List:Commits« Previous MessageNext Message »
From:magnus.blaudd Date:October 31 2011 7:41am
Subject:bzr push into mysql-5.5-cluster branch (magnus.blaudd:3619 to 3620)
View as plain text  
 3620 magnus.blaudd@stripped	2011-10-31 [merge]
      Merge in latest ndbcluster with schema dist refactorings

    added:
      sql/ndb_event_data.cc
      sql/ndb_event_data.h
    modified:
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_binlog.h
      sql/ha_ndbcluster_glue.h
      sql/ndb_thd.cc
      sql/ndb_thd.h
      storage/ndb/CMakeLists.txt
 3619 magnus.blaudd@stripped	2011-10-27 [merge]
      Merge

    added:
      sql/ndb_anyvalue.cc
      sql/ndb_anyvalue.h
      sql/ndb_binlog_extra_row_info.cc
      sql/ndb_binlog_extra_row_info.h
      sql/ndb_ndbapi_util.cc
      sql/ndb_ndbapi_util.h
    modified:
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_binlog.h
      sql/ha_ndbcluster_connection.cc
      sql/ndb_share.h
      storage/ndb/CMakeLists.txt
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2011-10-27 11:57:04 +0000
+++ b/sql/ha_ndbcluster.cc	2011-10-29 09:02:21 +0000
@@ -53,6 +53,7 @@
 #include "ndb_conflict_trans.h"
 #include "ndb_anyvalue.h"
 #include "ndb_binlog_extra_row_info.h"
+#include "ndb_event_data.h"
 
 // ndb interface initialization/cleanup
 extern "C" void ndb_init_internal();
@@ -8760,6 +8761,31 @@ ha_ndbcluster::start_transaction_part_id
    
 
 /**
+  Static error print function called from static handler method
+  ndbcluster_commit and ndbcluster_rollback.
+*/
+static void
+ndbcluster_print_error(int error, const NdbOperation *error_op)
+{
+  DBUG_ENTER("ndbcluster_print_error");
+  TABLE_SHARE share;
+  const char *tab_name= (error_op) ? error_op->getTableName() : "";
+  if (tab_name == NULL)
+  {
+    DBUG_ASSERT(tab_name != NULL);
+    tab_name= "";
+  }
+  share.db.str= (char*) "";
+  share.db.length= 0;
+  share.table_name.str= (char *) tab_name;
+  share.table_name.length= strlen(tab_name);
+  ha_ndbcluster error_handler(ndbcluster_hton, &share);
+  error_handler.print_error(error, MYF(0));
+  DBUG_VOID_RETURN;
+}
+
+
+/**
   Commit a transaction started in NDB.
 */
 
@@ -11841,7 +11867,7 @@ static int ndbcluster_close_connection(h
 /**
   Try to discover one table from NDB.
 */
-
+static
 int ndbcluster_discover(handlerton *hton, THD* thd, const char *db, 
                         const char *name,
                         uchar **frmblob, 
@@ -11962,7 +11988,7 @@ err:
 /**
   Check if a table exists in NDB.
 */
-
+static
 int ndbcluster_table_exists_in_engine(handlerton *hton, THD* thd, 
                                       const char *db,
                                       const char *name)
@@ -12751,30 +12777,6 @@ void ha_ndbcluster::print_error(int erro
 
 
 /**
-  Static error print function called from static handler method
-  ndbcluster_commit and ndbcluster_rollback.
-*/
-
-void ndbcluster_print_error(int error, const NdbOperation *error_op)
-{
-  DBUG_ENTER("ndbcluster_print_error");
-  TABLE_SHARE share;
-  const char *tab_name= (error_op) ? error_op->getTableName() : "";
-  if (tab_name == NULL) 
-  {
-    DBUG_ASSERT(tab_name != NULL);
-    tab_name= "";
-  }
-  share.db.str= (char*) "";
-  share.db.length= 0;
-  share.table_name.str= (char *) tab_name;
-  share.table_name.length= strlen(tab_name);
-  ha_ndbcluster error_handler(ndbcluster_hton, &share);
-  error_handler.print_error(error, MYF(0));
-  DBUG_VOID_RETURN;
-}
-
-/**
   Set a given location from full pathname to database name.
 */
 

=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2011-10-27 11:48:43 +0000
+++ b/sql/ha_ndbcluster.h	2011-10-28 07:14:45 +0000
@@ -42,7 +42,6 @@ class NdbBlob;
 class NdbIndexStat;
 class NdbEventOperation;
 class ha_ndbcluster_cond;
-class Ndb_event_data;
 class NdbQuery;
 class NdbQueryOperation;
 class NdbQueryOperationTypeWrapper;
@@ -497,9 +496,6 @@ private:
                                     const uchar *record,
                                     bool use_active_index);
   friend int ndbcluster_drop_database_impl(THD *thd, const char *path);
-  friend int ndb_handle_schema_change(THD *thd, 
-                                      Ndb *ndb, NdbEventOperation *pOp,
-                                      NDB_SHARE *share);
 
   void check_read_before_write_removal();
   static int drop_table_impl(THD *thd, ha_ndbcluster *h, Ndb *ndb,
@@ -814,12 +810,6 @@ private:
   int add_handler_to_open_tables(THD*, Thd_ndb*, ha_ndbcluster* handler);
 };
 
-int ndbcluster_discover(THD* thd, const char* dbname, const char* name,
-                        const void** frmblob, uint* frmlen);
-int ndbcluster_table_exists_in_engine(THD* thd,
-                                      const char *db, const char *name);
-void ndbcluster_print_error(int error, const NdbOperation *error_op);
-
 static const char ndbcluster_hton_name[]= "ndbcluster";
 static const int ndbcluster_hton_name_length=sizeof(ndbcluster_hton_name)-1;
 extern int ndbcluster_terminating;

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2011-10-27 09:34:06 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2011-10-29 14:21:55 +0000
@@ -58,6 +58,7 @@ bool ndb_log_empty_epochs(void);
 #include "ndb_dist_priv_util.h"
 #include "ndb_anyvalue.h"
 #include "ndb_binlog_extra_row_info.h"
+#include "ndb_event_data.h"
 
 /*
   Timeout for syncing schema events between
@@ -141,9 +142,9 @@ static int ndbcluster_binlog_terminating
   Mutex and condition used for interacting between client sql thread
   and injector thread
 */
-pthread_t ndb_binlog_thread;
-pthread_mutex_t injector_mutex;
-pthread_cond_t  injector_cond;
+static pthread_t ndb_binlog_thread;
+static pthread_mutex_t injector_mutex;
+static pthread_cond_t  injector_cond;
 
 /* NDB Injector thread (used for binlog creation) */
 static ulonglong ndb_latest_applied_binlog_epoch= 0;
@@ -152,7 +153,7 @@ static ulonglong ndb_latest_received_bin
 
 NDB_SHARE *ndb_apply_status_share= 0;
 NDB_SHARE *ndb_schema_share= 0;
-pthread_mutex_t ndb_schema_share_mutex;
+static pthread_mutex_t ndb_schema_share_mutex;
 
 extern my_bool opt_log_slave_updates;
 static my_bool g_ndb_log_slave_updates;
@@ -260,20 +261,6 @@ static void dbug_print_table(const char 
 #define dbug_print_table(a,b)
 #endif
 
-static inline void
-print_warning_list(const char* prefix, List<MYSQL_ERROR>& list)
-{
-  List_iterator_fast<MYSQL_ERROR> it(list);
-  MYSQL_ERROR *err;
-  while ((err= it++))
-  {
-    sql_print_warning("%s: (%d)%s",
-                      prefix,
-                      MYSQL_ERROR_get_sql_errno(err),
-                      MYSQL_ERROR_get_message_text(err));
-  }
-}
-
 
 static void run_query(THD *thd, char *buf, char *end,
                       const int *no_print_error)
@@ -2087,7 +2074,7 @@ end:
 static
 int
 ndb_handle_schema_change(THD *thd, Ndb *is_ndb, NdbEventOperation *pOp,
-                         Ndb_event_data *event_data)
+                         const Ndb_event_data *event_data)
 {
   DBUG_ENTER("ndb_handle_schema_change");
   NDB_SHARE *share= event_data->share;
@@ -2207,14 +2194,87 @@ private:
 
 class Ndb_schema_event_handler {
 
-  struct Cluster_schema
+  class Ndb_schema_op
   {
+    // Unpack Ndb_schema_op from event_data pointer
+    void unpack_event(const Ndb_event_data *event_data)
+    {
+      TABLE *table= event_data->shadow_table;
+      Field **field;
+      /* unpack blob values */
+      uchar* blobs_buffer= 0;
+      uint blobs_buffer_size= 0;
+      my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
+      {
+        ptrdiff_t ptrdiff= 0;
+        int ret= get_ndb_blobs_value(table, event_data->ndb_value[0],
+                                     blobs_buffer, blobs_buffer_size,
+                                     ptrdiff);
+        if (ret != 0)
+        {
+          my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
+          DBUG_PRINT("info", ("blob read error"));
+          DBUG_ASSERT(FALSE);
+        }
+      }
+      /* db varchar 1 length uchar */
+      field= table->field;
+      db_length= *(uint8*)(*field)->ptr;
+      DBUG_ASSERT(db_length <= (*field)->field_length);
+      DBUG_ASSERT((*field)->field_length + 1 == sizeof(db));
+      memcpy(db, (*field)->ptr + 1, db_length);
+      db[db_length]= 0;
+      /* name varchar 1 length uchar */
+      field++;
+      name_length= *(uint8*)(*field)->ptr;
+      DBUG_ASSERT(name_length <= (*field)->field_length);
+      DBUG_ASSERT((*field)->field_length + 1 == sizeof(name));
+      memcpy(name, (*field)->ptr + 1, name_length);
+      name[name_length]= 0;
+      /* slock fixed length */
+      field++;
+      slock_length= (*field)->field_length;
+      DBUG_ASSERT((*field)->field_length == sizeof(slock_buf));
+      memcpy(slock_buf, (*field)->ptr, slock_length);
+      /* query blob */
+      field++;
+      {
+        Field_blob *field_blob= (Field_blob*)(*field);
+        uint blob_len= field_blob->get_length((*field)->ptr);
+        uchar *blob_ptr= 0;
+        field_blob->get_ptr(&blob_ptr);
+        DBUG_ASSERT(blob_len == 0 || blob_ptr != 0);
+        query_length= blob_len;
+        query= sql_strmake((char*) blob_ptr, blob_len);
+      }
+      /* node_id */
+      field++;
+      node_id= (Uint32)((Field_long *)*field)->val_int();
+      /* epoch */
+      field++;
+      epoch= ((Field_long *)*field)->val_int();
+      /* id */
+      field++;
+      id= (Uint32)((Field_long *)*field)->val_int();
+      /* version */
+      field++;
+      version= (Uint32)((Field_long *)*field)->val_int();
+      /* type */
+      field++;
+      type= (Uint32)((Field_long *)*field)->val_int();
+      /* free blobs buffer */
+      my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
+      dbug_tmp_restore_column_map(table->read_set, old_map);
+    }
+
+  public:
     uchar db_length;
     char db[64];
     uchar name_length;
     char name[64];
     uchar slock_length;
-    uint32 slock[SCHEMA_SLOCK_SIZE/4];
+    uint32 slock_buf[SCHEMA_SLOCK_SIZE/4];
+    MY_BITMAP slock;
     unsigned short query_length;
     char *query;
     Uint64 epoch;
@@ -2223,99 +2283,52 @@ class Ndb_schema_event_handler {
     uint32 version;
     uint32 type;
     uint32 any_value;
-  };
 
+    /**
+      Create a Ndb_schema_op from event_data
+    */
+    static Ndb_schema_op*
+    create(const Ndb_event_data* event_data,
+           Uint32 any_value)
+    {
+      DBUG_ENTER("Ndb_schema_op::create");
+      Ndb_schema_op* schema_op=
+        (Ndb_schema_op*)sql_alloc(sizeof(Ndb_schema_op));
+      bitmap_init(&schema_op->slock,
+                  schema_op->slock_buf, 8*SCHEMA_SLOCK_SIZE, FALSE);
+      schema_op->unpack_event(event_data);
+      schema_op->any_value= any_value;
+      DBUG_PRINT("exit", ("%s.%s: query: '%s'  type: %d",
+                          schema_op->db, schema_op->name,
+                          schema_op->query,
+                          schema_op->type));
+      DBUG_RETURN(schema_op);
+    }
+  };
 
   static void
   print_could_not_discover_error(THD *thd,
-                                 const Cluster_schema *schema)
+                                 const Ndb_schema_op *schema)
   {
     sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
                     "binlog schema event '%s' from node %d. "
                     "my_errno: %d",
                      schema->db, schema->name, schema->query,
                      schema->node_id, my_errno);
-    print_warning_list("NDB Binlog", thd_warn_list(thd));
+    thd_print_warning_list(thd, "NDB Binlog");
   }
 
 
-  /*
-    Transfer schema table event data into Cluster_schema struct
-  */
-  static void ndbcluster_get_schema(const Ndb_event_data *event_data,
-                                    Cluster_schema *s)
+  static void
+  write_schema_op_to_binlog(THD *thd, Ndb_schema_op *schema)
   {
-    TABLE *table= event_data->shadow_table;
-    Field **field;
-    /* unpack blob values */
-    uchar* blobs_buffer= 0;
-    uint blobs_buffer_size= 0;
-    my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
-    {
-      ptrdiff_t ptrdiff= 0;
-      int ret= get_ndb_blobs_value(table, event_data->ndb_value[0],
-                                   blobs_buffer, blobs_buffer_size,
-                                   ptrdiff);
-      if (ret != 0)
-      {
-        my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
-        DBUG_PRINT("info", ("blob read error"));
-        DBUG_ASSERT(FALSE);
-      }
-    }
-    /* db varchar 1 length uchar */
-    field= table->field;
-    s->db_length= *(uint8*)(*field)->ptr;
-    DBUG_ASSERT(s->db_length <= (*field)->field_length);
-    DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->db));
-    memcpy(s->db, (*field)->ptr + 1, s->db_length);
-    s->db[s->db_length]= 0;
-    /* name varchar 1 length uchar */
-    field++;
-    s->name_length= *(uint8*)(*field)->ptr;
-    DBUG_ASSERT(s->name_length <= (*field)->field_length);
-    DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->name));
-    memcpy(s->name, (*field)->ptr + 1, s->name_length);
-    s->name[s->name_length]= 0;
-    /* slock fixed length */
-    field++;
-    s->slock_length= (*field)->field_length;
-    DBUG_ASSERT((*field)->field_length == sizeof(s->slock));
-    memcpy(s->slock, (*field)->ptr, s->slock_length);
-    /* query blob */
-    field++;
-    {
-      Field_blob *field_blob= (Field_blob*)(*field);
-      uint blob_len= field_blob->get_length((*field)->ptr);
-      uchar *blob_ptr= 0;
-      field_blob->get_ptr(&blob_ptr);
-      DBUG_ASSERT(blob_len == 0 || blob_ptr != 0);
-      s->query_length= blob_len;
-      s->query= sql_strmake((char*) blob_ptr, blob_len);
-    }
-    /* node_id */
-    field++;
-    s->node_id= (Uint32)((Field_long *)*field)->val_int();
-    /* epoch */
-    field++;
-    s->epoch= ((Field_long *)*field)->val_int();
-    /* id */
-    field++;
-    s->id= (Uint32)((Field_long *)*field)->val_int();
-    /* version */
-    field++;
-    s->version= (Uint32)((Field_long *)*field)->val_int();
-    /* type */
-    field++;
-    s->type= (Uint32)((Field_long *)*field)->val_int();
-    /* free blobs buffer */
-    my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
-    dbug_tmp_restore_column_map(table->read_set, old_map);
-  }
 
+    if (!ndb_binlog_running)
+    {
+      // This mysqld is not writing a binlog
+      return;
+    }
 
-  static void ndb_binlog_query(THD *thd, Cluster_schema *schema)
-  {
     /* any_value == 0 means local cluster sourced change that
      * should be logged
      */
@@ -2379,24 +2392,23 @@ class Ndb_schema_event_handler {
 
 
   /*
-    acknowledge handling of schema operation
+    Acknowledge handling of schema operation
+    - Inform the other nodes that schema op has
+      been completed by this node (by updating the
+      row for this op in ndb_schema table)
   */
-  static int
-  ndbcluster_update_slock(THD *thd,
-                          const char *db,
-                          const char *table_name,
-                          uint32 table_id,
-                          uint32 table_version)
+  int
+  ack_schema_op(const char *db, const char *table_name,
+                uint32 table_id, uint32 table_version)
   {
-    DBUG_ENTER("ndbcluster_update_slock");
+    DBUG_ENTER("ack_schema_op");
     if (!ndb_schema_share)
     {
       DBUG_RETURN(0);
     }
 
     const NdbError *ndb_error= 0;
-    uint32 node_id= g_ndb_cluster_connection->node_id();
-    Ndb *ndb= check_ndb_in_thd(thd);
+    Ndb *ndb= check_ndb_in_thd(m_thd);
     char save_db[FN_HEADLEN];
     strcpy(save_db, ndb->getDatabaseName());
 
@@ -2469,7 +2481,7 @@ class Ndb_schema_event_handler {
       {
         uint32 copy[SCHEMA_SLOCK_SIZE/4];
         memcpy(copy, bitbuf, sizeof(copy));
-        bitmap_clear_bit(&slock, node_id);
+        bitmap_clear_bit(&slock, own_nodeid());
         sql_print_information("NDB: reply to %s.%s(%u/%u) from %x%x to %x%x",
                               db, table_name,
                               table_id, table_version,
@@ -2479,7 +2491,7 @@ class Ndb_schema_event_handler {
       }
       else
       {
-        bitmap_clear_bit(&slock, node_id);
+        bitmap_clear_bit(&slock, own_nodeid());
       }
 
       {
@@ -2505,7 +2517,7 @@ class Ndb_schema_event_handler {
         r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
         DBUG_ASSERT(r == 0);
         /* node_id */
-        r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
+        r|= op->setValue(SCHEMA_NODE_ID_I, own_nodeid());
         DBUG_ASSERT(r == 0);
         /* type */
         r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
@@ -2515,14 +2527,15 @@ class Ndb_schema_event_handler {
                          NdbOperation::DefaultAbortOption, 1 /*force send*/) == 0)
       {
         DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
-                            node_id, db, table_name));
+                            own_nodeid(), db, table_name));
         dict->forceGCPWait(1);
         break;
       }
     err:
       const NdbError *this_error= trans ?
         &trans->getNdbError() : &ndb->getNdbError();
-      if (this_error->status == NdbError::TemporaryError && !thd->killed)
+      if (this_error->status == NdbError::TemporaryError &&
+          !thd_killed(m_thd))
       {
         if (retries--)
         {
@@ -2538,12 +2551,10 @@ class Ndb_schema_event_handler {
 
     if (ndb_error)
     {
-      char buf[1024];
-      my_snprintf(buf, sizeof(buf), "Could not release lock on '%s.%s'",
-                  db, table_name);
-      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
-                          ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
-                          ndb_error->code, ndb_error->message, buf);
+      sql_print_warning("NDB: Could not release slock on '%s.%s', "
+                        "Error code: %d Message: %s",
+                        db, table_name,
+                        ndb_error->code, ndb_error->message);
     }
     if (trans)
       ndb->closeTransaction(trans);
@@ -2552,43 +2563,100 @@ class Ndb_schema_event_handler {
   }
 
 
-static int
-handle_schema_event(THD *thd, Ndb *s_ndb,
-                    NdbEventOperation *pOp,
-                    List<Cluster_schema> *post_epoch_log_list,
-                    List<Cluster_schema> *post_epoch_unlock_list,
-                    MEM_ROOT *mem_root)
-{
-  DBUG_ENTER("handle_schema_event");
-  Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
-  NDB_SHARE *tmp_share= event_data->share;
-  if (tmp_share && ndb_schema_share == tmp_share)
+  bool check_is_ndb_schema_event(const Ndb_event_data* event_data) const
+  {
+    if (!event_data)
+    {
+      // Received event without event data pointer
+      assert(false);
+      return false;
+    }
+
+    NDB_SHARE *share= event_data->share;
+    if (!share)
+    {
+      // Received event where the event_data is not properly initialized
+      assert(false);
+      return false;
+    }
+    assert(event_data->shadow_table);
+    assert(event_data->ndb_value[0]);
+    assert(event_data->ndb_value[1]);
+
+    pthread_mutex_lock(&ndb_schema_share_mutex);
+    if (share != ndb_schema_share)
+    {
+      // Received event from s_ndb not pointing at the ndb_schema_share
+      pthread_mutex_unlock(&ndb_schema_share_mutex);
+      assert(false);
+      return false;
+    }
+    assert(!strncmp(share->db, STRING_WITH_LEN(NDB_REP_DB)));
+    assert(!strncmp(share->table_name, STRING_WITH_LEN(NDB_SCHEMA_TABLE)));
+    pthread_mutex_unlock(&ndb_schema_share_mutex);
+    return true;
+  }
+
+
+  void
+  log_after_epoch(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("log_after_epoch");
+    m_post_epoch_log_list.push_back(schema, m_mem_root);
+    DBUG_VOID_RETURN;
+  }
+
+
+  void
+  unlock_after_epoch(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("unlock_after_epoch");
+    m_post_epoch_unlock_list.push_back(schema, m_mem_root);
+    DBUG_VOID_RETURN;
+  }
+
+
+  uint own_nodeid(void) const
+  {
+    return m_own_nodeid;
+  }
+
+
+  bool
+  check_if_local_tables_in_db(const char *dbname) const
   {
-    NDBEVENT::TableEvent ev_type= pOp->getEventType();
-    DBUG_PRINT("enter", ("%s.%s  ev_type: %d",
-                         tmp_share->db, tmp_share->table_name, ev_type));
-    if (ev_type == NDBEVENT::TE_UPDATE ||
-        ev_type == NDBEVENT::TE_INSERT)
-    {
-      Thd_ndb *thd_ndb= get_thd_ndb(thd);
-      Ndb *ndb= thd_ndb->ndb;
-      NDBDICT *dict= ndb->getDictionary();
-      Thd_ndb_options_guard thd_ndb_options(thd_ndb);
-      Cluster_schema *schema= (Cluster_schema *)
-        sql_alloc(sizeof(Cluster_schema));
-      MY_BITMAP slock;
-      bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, FALSE);
-      uint node_id= g_ndb_cluster_connection->node_id();
-      {
-        ndbcluster_get_schema(event_data, schema);
-        schema->any_value= pOp->getAnyValue();
-      }
-      enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
-      DBUG_PRINT("info",
-                 ("%s.%s: log query_length: %d  query: '%s'  type: %d",
-                  schema->db, schema->name,
-                  schema->query_length, schema->query,
-                  schema_type));
+    DBUG_ENTER("check_if_local_tables_in_db");
+    DBUG_PRINT("info", ("Looking for files in directory %s", dbname));
+    List<LEX_STRING> files;
+    char path[FN_REFLEN + 1];
+
+    build_table_filename(path, sizeof(path) - 1, dbname, "", "", 0);
+    if (find_files(m_thd, &files, dbname, path, NullS, 0) != FIND_FILES_OK)
+    {
+      m_thd->clear_error();
+      DBUG_PRINT("info", ("Failed to find files"));
+      DBUG_RETURN(true);
+    }
+    DBUG_PRINT("info",("found: %d files", files.elements));
+
+    LEX_STRING *tabname;
+    while ((tabname= files.pop()))
+    {
+      DBUG_PRINT("info", ("Found table %s", tabname->str));
+      if (ndbcluster_check_if_local_table(dbname, tabname->str))
+        DBUG_RETURN(true);
+    }
+
+    DBUG_RETURN(false);
+  }
+
+
+  int
+  handle_schema_op(Ndb_schema_op* schema)
+  {
+    DBUG_ENTER("handle_schema_op");
+    {
+      const SCHEMA_OP_TYPE schema_type= (SCHEMA_OP_TYPE)schema->type;
 
       if (opt_ndb_extra_logging > 19)
       {
@@ -2599,7 +2667,8 @@ handle_schema_event(THD *thd, Ndb *s_ndb
                               get_schema_type_name(schema_type),
                               schema_type,
                               schema->node_id,
-                              slock.bitmap[0], slock.bitmap[1]);
+                              schema->slock.bitmap[0],
+                              schema->slock.bitmap[1]);
       }
 
       if ((schema->db[0] == 0) && (schema->name[0] == 0))
@@ -2611,6 +2680,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb
          */
         DBUG_RETURN(0);
       }
+
       switch (schema_type)
       {
       case SOT_CLEAR_SLOCK:
@@ -2619,24 +2689,29 @@ handle_schema_event(THD *thd, Ndb *s_ndb
           schema events get inserted in the binlog after any data
           events
         */
-        post_epoch_log_list->push_back(schema, mem_root);
+        log_after_epoch(schema);
         DBUG_RETURN(0);
 
       case SOT_ALTER_TABLE_COMMIT:
       case SOT_RENAME_TABLE_PREPARE:
       case SOT_ONLINE_ALTER_TABLE_PREPARE:
       case SOT_ONLINE_ALTER_TABLE_COMMIT:
-        post_epoch_log_list->push_back(schema, mem_root);
-        post_epoch_unlock_list->push_back(schema, mem_root);
+        log_after_epoch(schema);
+        unlock_after_epoch(schema);
         DBUG_RETURN(0);
 
       default:
         break;
       }
 
-      if (schema->node_id != node_id)
+      if (schema->node_id != own_nodeid())
       {
-        int log_query= 0, post_epoch_unlock= 0;
+        THD* thd= m_thd; // Code compatibility
+        Thd_ndb *thd_ndb= get_thd_ndb(thd);
+        Ndb *ndb= thd_ndb->ndb;
+        Thd_ndb_options_guard thd_ndb_options(thd_ndb);
+
+        int post_epoch_unlock= 0;
  
         switch (schema_type)
         {
@@ -2652,7 +2727,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb
                       schema->query + schema->query_length,
                       no_print_error);
             /* binlog dropping table after any table operations */
-            post_epoch_log_list->push_back(schema, mem_root);
+            log_after_epoch(schema);
             /* acknowledge this query _after_ epoch completion */
             post_epoch_unlock= 1;
           }
@@ -2667,7 +2742,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb
                             (schema_type == SOT_DROP_TABLE ? "dropp" : "renam"),
                             schema->db, schema->name, schema->query,
                             schema->node_id);
-            log_query= 1;
+            write_schema_op_to_binlog(thd, schema);
           }
           // Fall through
 	case SOT_TRUNCATE_TABLE:
@@ -2687,7 +2762,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb
           {
             {
               ndb->setDatabaseName(schema->db);
-              Ndb_table_guard ndbtab_g(dict, schema->name);
+              Ndb_table_guard ndbtab_g(ndb->getDictionary(), schema->name);
               ndbtab_g.invalidate();
             }
             TABLE_LIST table_list;
@@ -2722,20 +2797,20 @@ handle_schema_event(THD *thd, Ndb *s_ndb
           {
             print_could_not_discover_error(thd, schema);
           }
-          log_query= 1;
+          write_schema_op_to_binlog(thd, schema);
           break;
 
         case SOT_DROP_DB:
           /* Drop the database locally if it only contains ndb tables */
           thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP);
-          if (! ndbcluster_check_if_local_tables_in_db(thd, schema->db))
+          if (!check_if_local_tables_in_db(schema->db))
           {
             const int no_print_error[1]= {0};
             run_query(thd, schema->query,
                       schema->query + schema->query_length,
                       no_print_error);
             /* binlog dropping database after any table operations */
-            post_epoch_log_list->push_back(schema, mem_root);
+            log_after_epoch(schema);
             /* acknowledge this query _after_ epoch completion */
             post_epoch_unlock= 1;
           }
@@ -2746,7 +2821,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb
                             "binlog schema event '%s' from node %d. ",
                             schema->db, schema->query,
                             schema->node_id);
-            log_query= 1;
+            write_schema_op_to_binlog(thd, schema);
           }
           break;
 
@@ -2758,7 +2833,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb
           run_query(thd, schema->query,
                     schema->query + schema->query_length,
                     no_print_error);
-          log_query= 1;
+          write_schema_op_to_binlog(thd, schema);
           break;
         }
 
@@ -2779,13 +2854,13 @@ handle_schema_event(THD *thd, Ndb *s_ndb
           run_query(thd, cmd,
                     cmd + strlen(cmd),
                     no_print_error);
-          log_query= 1;
+          write_schema_op_to_binlog(thd, schema);
 	  break;
         }
 
         case SOT_TABLESPACE:
         case SOT_LOGFILE_GROUP:
-          log_query= 1;
+          write_schema_op_to_binlog(thd, schema);
           break;
 
         case SOT_ALTER_TABLE_COMMIT:
@@ -2799,159 +2874,44 @@ handle_schema_event(THD *thd, Ndb *s_ndb
           break;
 
         }
-        if (log_query && ndb_binlog_running)
-          ndb_binlog_query(thd, schema);
+
         /* signal that schema operation has been handled */
-        DBUG_DUMP("slock", (uchar*) schema->slock, schema->slock_length);
-        if (bitmap_is_set(&slock, node_id))
+        DBUG_DUMP("slock", (uchar*) schema->slock_buf, schema->slock_length);
+        if (bitmap_is_set(&schema->slock, own_nodeid()))
         {
           if (post_epoch_unlock)
-            post_epoch_unlock_list->push_back(schema, mem_root);
+            unlock_after_epoch(schema);
           else
-            ndbcluster_update_slock(thd, schema->db, schema->name,
-                                    schema->id, schema->version);
+            ack_schema_op(schema->db, schema->name,
+                          schema->id, schema->version);
         }
       }
-      DBUG_RETURN(0);
-    }
-    /*
-      the normal case of UPDATE/INSERT has already been handled
-    */
-    switch (ev_type)
-    {
-    case NDBEVENT::TE_DELETE:
-      // skip
-      break;
-    case NDBEVENT::TE_CLUSTER_FAILURE:
-      if (opt_ndb_extra_logging)
-        sql_print_information("NDB Binlog: cluster failure for %s at epoch %u/%u.",
-                              ndb_schema_share->key,
-                              (uint)(pOp->getGCI() >> 32),
-                              (uint)(pOp->getGCI()));
-      // fall through
-    case NDBEVENT::TE_DROP:
-      if (opt_ndb_extra_logging &&
-          ndb_binlog_tables_inited && ndb_binlog_running)
-        sql_print_information("NDB Binlog: ndb tables initially "
-                              "read only on reconnect.");
-
-      /* begin protect ndb_schema_share */
-      pthread_mutex_lock(&ndb_schema_share_mutex);
-      /* ndb_share reference binlog extra free */
-      DBUG_PRINT("NDB_SHARE", ("%s binlog extra free  use_count: %u",
-                               ndb_schema_share->key,
-                               ndb_schema_share->use_count));
-      free_share(&ndb_schema_share);
-      ndb_schema_share= 0;
-      ndb_binlog_tables_inited= FALSE;
-      ndb_binlog_is_ready= FALSE;
-      pthread_mutex_unlock(&ndb_schema_share_mutex);
-      /* end protect ndb_schema_share */
-
-      close_cached_tables(NULL, NULL, FALSE, FALSE, FALSE);
-      // fall through
-    case NDBEVENT::TE_ALTER:
-      ndb_handle_schema_change(thd, s_ndb, pOp, event_data);
-      break;
-    case NDBEVENT::TE_NODE_FAILURE:
-    {
-      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
-      DBUG_ASSERT(node_id != 0xFF);
-      pthread_mutex_lock(&tmp_share->mutex);
-      bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]);
-      DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id));
-      if (opt_ndb_extra_logging)
-      {
-        sql_print_information("NDB Binlog: Node: %d, down,"
-                              " Subscriber bitmask %x%x",
-                              pOp->getNdbdNodeId(),
-                              tmp_share->subscriber_bitmap[node_id].bitmap[1],
-                              tmp_share->subscriber_bitmap[node_id].bitmap[0]);
-      }
-      pthread_mutex_unlock(&tmp_share->mutex);
-      (void) pthread_cond_signal(&injector_cond);
-      break;
-    }
-    case NDBEVENT::TE_SUBSCRIBE:
-    {
-      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
-      uint8 req_id= pOp->getReqNodeId();
-      DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
-      pthread_mutex_lock(&tmp_share->mutex);
-      bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
-      DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id));
-      if (opt_ndb_extra_logging)
-      {
-        sql_print_information("NDB Binlog: Node: %d, subscribe from node %d,"
-                              " Subscriber bitmask %x%x",
-                              pOp->getNdbdNodeId(),
-                              req_id,
-                              tmp_share->subscriber_bitmap[node_id].bitmap[1],
-                              tmp_share->subscriber_bitmap[node_id].bitmap[0]);
-      }
-      pthread_mutex_unlock(&tmp_share->mutex);
-      (void) pthread_cond_signal(&injector_cond);
-      break;
-    }
-    case NDBEVENT::TE_UNSUBSCRIBE:
-    {
-      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
-      uint8 req_id= pOp->getReqNodeId();
-      DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
-      pthread_mutex_lock(&tmp_share->mutex);
-      bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
-      DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id));
-      if (opt_ndb_extra_logging)
-      {
-        sql_print_information("NDB Binlog: Node: %d, unsubscribe from node %d,"
-                              " Subscriber bitmask %x%x",
-                              pOp->getNdbdNodeId(),
-                              req_id,
-                              tmp_share->subscriber_bitmap[node_id].bitmap[1],
-                              tmp_share->subscriber_bitmap[node_id].bitmap[0]);
-      }
-      pthread_mutex_unlock(&tmp_share->mutex);
-      (void) pthread_cond_signal(&injector_cond);
-      break;
-    }
-    default:
-      sql_print_error("NDB Binlog: unknown non data event %d for %s. "
-                      "Ignoring...", (unsigned) ev_type, tmp_share->key);
     }
+    DBUG_RETURN(0);
   }
-  DBUG_RETURN(0);
-}
-
-/*
-  process any operations that should be done after
-  the epoch is complete
-*/
-static void
-handle_schema_log_post_epoch(THD *thd,
-                             List<Cluster_schema> *log_list)
-{
-  DBUG_ENTER("handle_schema_log_post_epoch");
 
-  Thd_ndb *thd_ndb= get_thd_ndb(thd);
-  Ndb *ndb= thd_ndb->ndb;
-  NDBDICT *dict= ndb->getDictionary();
-
-  Cluster_schema *schema;
-  while ((schema= log_list->pop()))
+  void
+  handle_schema_op_post_epoch(Ndb_schema_op* schema)
   {
+    DBUG_ENTER("handle_schema_op_post_epoch");
+    THD* thd = m_thd; // Code compatibility
+    Thd_ndb *thd_ndb= get_thd_ndb(thd);
+    Ndb *ndb= thd_ndb->ndb;
+    NDBDICT *dict= ndb->getDictionary();
     Thd_ndb_options_guard thd_ndb_options(thd_ndb);
     DBUG_PRINT("info",
                ("%s.%s: log query_length: %d  query: '%s'  type: %d",
                 schema->db, schema->name,
                 schema->query_length, schema->query,
                 schema->type));
-    int log_query= 0;
+
     {
-      enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type;
+      const SCHEMA_OP_TYPE schema_type= (SCHEMA_OP_TYPE)schema->type;
       char key[FN_REFLEN + 1];
       build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0);
       if (schema_type == SOT_CLEAR_SLOCK)
       {
+        /* Ack to any SQL thread waiting for schema op to complete */
         pthread_mutex_lock(&ndbcluster_mutex);
         NDB_SCHEMA_OBJECT *ndb_schema_object=
           (NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects,
@@ -2968,10 +2928,10 @@ handle_schema_log_post_epoch(THD *thd,
                                   key, schema->id, schema->version,
                                   ndb_schema_object->slock[0],
                                   ndb_schema_object->slock[1],
-                                  schema->slock[0],
-                                  schema->slock[1]);
+                                  schema->slock_buf[0],
+                                  schema->slock_buf[1]);
           }
-          memcpy(ndb_schema_object->slock, schema->slock,
+          memcpy(ndb_schema_object->slock, schema->slock_buf,
                  sizeof(ndb_schema_object->slock));
           DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap",
                     (uchar*)ndb_schema_object->slock_bitmap.bitmap,
@@ -2998,8 +2958,15 @@ handle_schema_log_post_epoch(THD *thd,
           }
         }
         pthread_mutex_unlock(&ndbcluster_mutex);
-        continue;
+        DBUG_VOID_RETURN;
       }
+
+      if (opt_ndb_extra_logging > 9)
+        sql_print_information("%s - %s.%s",
+                              get_schema_type_name(schema_type),
+                              schema->db ? schema->db : "(null)",
+                              schema->name ? schema->name : "(null)");
+
       /* ndb_share reference temporary, free below */
       NDB_SHARE *share= get_share(key, 0, FALSE, FALSE);
       if (share)
@@ -3010,12 +2977,11 @@ handle_schema_log_post_epoch(THD *thd,
       switch (schema_type)
       {
       case SOT_DROP_DB:
-        log_query= 1;
+        write_schema_op_to_binlog(thd, schema);
         break;
+
       case SOT_DROP_TABLE:
-        if (opt_ndb_extra_logging > 9)
-          sql_print_information("SOT_DROP_TABLE %s.%s", schema->db, schema->name);
-        log_query= 1;
+        write_schema_op_to_binlog(thd, schema);
         {
           ndb->setDatabaseName(schema->db);
           Ndb_table_guard ndbtab_g(dict, schema->name);
@@ -3029,29 +2995,25 @@ handle_schema_log_post_epoch(THD *thd,
           close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE);
         }
         break;
+
       case SOT_RENAME_TABLE:
-        if (opt_ndb_extra_logging > 9)
-          sql_print_information("SOT_RENAME_TABLE %s.%s", schema->db, schema->name);
-        log_query= 1;
+        write_schema_op_to_binlog(thd, schema);
         if (share)
         {
           ndbcluster_rename_share(thd, share);
         }
         break;
+
       case SOT_RENAME_TABLE_PREPARE:
-        if (opt_ndb_extra_logging > 9)
-          sql_print_information("SOT_RENAME_TABLE_PREPARE %s.%s -> %s",
-                                schema->db, schema->name, schema->query);
         if (share &&
             schema->node_id != g_ndb_cluster_connection->node_id())
           ndbcluster_prepare_rename_share(share, schema->query);
         break;
+
       case SOT_ALTER_TABLE_COMMIT:
-        if (opt_ndb_extra_logging > 9)
-          sql_print_information("SOT_ALTER_TABLE_COMMIT %s.%s", schema->db, schema->name);
         if (schema->node_id == g_ndb_cluster_connection->node_id())
           break;
-        log_query= 1;
+        write_schema_op_to_binlog(thd, schema);
         {
           ndb->setDatabaseName(schema->db);
           Ndb_table_guard ndbtab_g(dict, schema->name);
@@ -3112,8 +3074,6 @@ handle_schema_log_post_epoch(THD *thd,
 
       case SOT_ONLINE_ALTER_TABLE_PREPARE:
       {
-        if (opt_ndb_extra_logging > 9)
-          sql_print_information("SOT_ONLINE_ALTER_TABLE_PREPARE %s.%s", schema->db, schema->name);
         int error= 0;
         ndb->setDatabaseName(schema->db);
         {
@@ -3141,7 +3101,7 @@ handle_schema_log_post_epoch(THD *thd,
  
           DBUG_PRINT("info", ("Detected frm change of table %s.%s",
                               schema->db, schema->name));
-          log_query= 1;
+          write_schema_op_to_binlog(thd, schema);
           build_table_filename(key, FN_LEN-1, schema->db, schema->name, NullS, 0);
           /*
             If the there is no local table shadowing the altered table and 
@@ -3220,10 +3180,9 @@ handle_schema_log_post_epoch(THD *thd,
         }
         break;
       }
+
       case SOT_ONLINE_ALTER_TABLE_COMMIT:
       {
-        if (opt_ndb_extra_logging > 9)
-          sql_print_information("SOT_ONLINE_ALTER_TABLE_COMMIT %s.%s", schema->db, schema->name);
         if (share)
         {
           pthread_mutex_lock(&share->mutex);
@@ -3245,10 +3204,9 @@ handle_schema_log_post_epoch(THD *thd,
         }
         break;
       }
+
       case SOT_RENAME_TABLE_NEW:
-        if (opt_ndb_extra_logging > 9)
-          sql_print_information("SOT_RENAME_TABLE_NEW %s.%s", schema->db, schema->name);
-        log_query= 1;
+        write_schema_op_to_binlog(thd, schema);
         if (ndb_binlog_running && (!share || !share->op))
         {
           /*
@@ -3279,9 +3237,11 @@ handle_schema_log_post_epoch(THD *thd,
           }
         }
         break;
+
       default:
         DBUG_ASSERT(FALSE);
       }
+
       if (share)
       {
         /* ndb_share reference temporary free */
@@ -3291,40 +3251,55 @@ handle_schema_log_post_epoch(THD *thd,
         share= 0;
       }
     }
-    if (ndb_binlog_running && log_query)
-      ndb_binlog_query(thd, schema);
+
+    DBUG_VOID_RETURN;
   }
-  DBUG_VOID_RETURN;
-}
 
+  /*
+    process any operations that should be done after
+    the epoch is complete
+  */
+  void
+  handle_schema_log_post_epoch(List<Ndb_schema_op> *log_list)
+  {
+    DBUG_ENTER("handle_schema_log_post_epoch");
+
+    Ndb_schema_op* schema;
+    while ((schema= log_list->pop()))
+    {
+      handle_schema_op_post_epoch(schema);
+    }
+    DBUG_VOID_RETURN;
+  }
 
-static void
-handle_schema_unlock_post_epoch(THD *thd,
-                                List<Cluster_schema> *unlock_list)
-{
-  DBUG_ENTER("handle_schema_unlock_post_epoch");
 
-  Cluster_schema *schema;
-  while ((schema= unlock_list->pop()))
+  void
+  handle_schema_unlock_post_epoch(List<Ndb_schema_op> *unlock_list)
   {
-    ndbcluster_update_slock(thd, schema->db, schema->name,
-                            schema->id, schema->version);
+    DBUG_ENTER("handle_schema_unlock_post_epoch");
+
+    Ndb_schema_op *schema;
+    while ((schema= unlock_list->pop()))
+    {
+      ack_schema_op(schema->db, schema->name,
+                    schema->id, schema->version);
+    }
+    DBUG_VOID_RETURN;
   }
-  DBUG_VOID_RETURN;
-}
 
   THD* m_thd;
   MEM_ROOT* m_mem_root;
+  uint m_own_nodeid;
 
-  List<Cluster_schema> m_post_epoch_log_list;
-  List<Cluster_schema> m_post_epoch_unlock_list;
+  List<Ndb_schema_op> m_post_epoch_log_list;
+  List<Ndb_schema_op> m_post_epoch_unlock_list;
 
 public:
   Ndb_schema_event_handler(); // Not implemented
   Ndb_schema_event_handler(const Ndb_schema_event_handler&); // Not implemented
 
-  Ndb_schema_event_handler(THD* thd, MEM_ROOT* mem_root):
-    m_thd(thd), m_mem_root(mem_root)
+  Ndb_schema_event_handler(THD* thd, MEM_ROOT* mem_root, uint own_nodeid):
+    m_thd(thd), m_mem_root(mem_root), m_own_nodeid(own_nodeid)
   {
   }
 
@@ -3337,19 +3312,148 @@ public:
 
   void handle_event(Ndb* s_ndb, NdbEventOperation *pOp)
   {
-    handle_schema_event(m_thd, s_ndb, pOp,
-                        &m_post_epoch_log_list,
-                        &m_post_epoch_unlock_list,
-                        m_mem_root);
+    DBUG_ENTER("handle_event");
+
+    const Ndb_event_data *event_data=
+      static_cast<const Ndb_event_data*>(pOp->getCustomData());
+
+    if (!check_is_ndb_schema_event(event_data))
+      DBUG_VOID_RETURN;
+
+    const NDBEVENT::TableEvent ev_type= pOp->getEventType();
+    switch (ev_type)
+    {
+    case NDBEVENT::TE_INSERT:
+    case NDBEVENT::TE_UPDATE:
+    {
+      /* ndb_schema table, row INSERTed or UPDATEed*/
+      Ndb_schema_op* schema_op=
+        Ndb_schema_op::create(event_data, pOp->getAnyValue());
+      handle_schema_op(schema_op);
+      break;
+    }
+
+    case NDBEVENT::TE_DELETE:
+      /* ndb_schema table, row DELETEd */
+      break;
+
+    case NDBEVENT::TE_CLUSTER_FAILURE:
+      if (opt_ndb_extra_logging)
+        sql_print_information("NDB Binlog: cluster failure for %s at epoch %u/%u.",
+                              ndb_schema_share->key,
+                              (uint)(pOp->getGCI() >> 32),
+                              (uint)(pOp->getGCI()));
+      // fall through
+    case NDBEVENT::TE_DROP:
+      /* ndb_schema table DROPped */
+      if (opt_ndb_extra_logging &&
+          ndb_binlog_tables_inited && ndb_binlog_running)
+        sql_print_information("NDB Binlog: ndb tables initially "
+                              "read only on reconnect.");
+
+      /* release the ndb_schema_share */
+      pthread_mutex_lock(&ndb_schema_share_mutex);
+      free_share(&ndb_schema_share);
+      ndb_schema_share= 0;
+      ndb_binlog_tables_inited= FALSE;
+      ndb_binlog_is_ready= FALSE;
+      pthread_mutex_unlock(&ndb_schema_share_mutex);
+
+      close_cached_tables(NULL, NULL, FALSE, FALSE, FALSE);
+      // fall through
+    case NDBEVENT::TE_ALTER:
+      /* ndb_schema table ALTERed */
+      ndb_handle_schema_change(m_thd, s_ndb, pOp, event_data);
+      break;
+
+    case NDBEVENT::TE_NODE_FAILURE:
+    {
+      /* Remove all subscribers for node from bitmap in ndb_schema_share */
+      NDB_SHARE *tmp_share= event_data->share;
+      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
+      DBUG_ASSERT(node_id != 0xFF);
+      pthread_mutex_lock(&tmp_share->mutex);
+      bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]);
+      DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id));
+      if (opt_ndb_extra_logging)
+      {
+        sql_print_information("NDB Binlog: Node: %d, down,"
+                              " Subscriber bitmask %x%x",
+                              pOp->getNdbdNodeId(),
+                              tmp_share->subscriber_bitmap[node_id].bitmap[1],
+                              tmp_share->subscriber_bitmap[node_id].bitmap[0]);
+      }
+      pthread_mutex_unlock(&tmp_share->mutex);
+      (void) pthread_cond_signal(&injector_cond);
+      break;
+    }
+
+    case NDBEVENT::TE_SUBSCRIBE:
+    {
+      /* Add node as subscriber from bitmap in ndb_schema_share */
+      NDB_SHARE *tmp_share= event_data->share;
+      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
+      uint8 req_id= pOp->getReqNodeId();
+      DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
+      pthread_mutex_lock(&tmp_share->mutex);
+      bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
+      DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id));
+      if (opt_ndb_extra_logging)
+      {
+        sql_print_information("NDB Binlog: Node: %d, subscribe from node %d,"
+                              " Subscriber bitmask %x%x",
+                              pOp->getNdbdNodeId(),
+                              req_id,
+                              tmp_share->subscriber_bitmap[node_id].bitmap[1],
+                              tmp_share->subscriber_bitmap[node_id].bitmap[0]);
+      }
+      pthread_mutex_unlock(&tmp_share->mutex);
+      (void) pthread_cond_signal(&injector_cond);
+      break;
+    }
+
+    case NDBEVENT::TE_UNSUBSCRIBE:
+    {
+      /* Remove node as subscriber from bitmap in ndb_schema_share */
+      NDB_SHARE *tmp_share= event_data->share;
+      uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()];
+      uint8 req_id= pOp->getReqNodeId();
+      DBUG_ASSERT(req_id != 0 && node_id != 0xFF);
+      pthread_mutex_lock(&tmp_share->mutex);
+      bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id);
+      DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id));
+      if (opt_ndb_extra_logging)
+      {
+        sql_print_information("NDB Binlog: Node: %d, unsubscribe from node %d,"
+                              " Subscriber bitmask %x%x",
+                              pOp->getNdbdNodeId(),
+                              req_id,
+                              tmp_share->subscriber_bitmap[node_id].bitmap[1],
+                              tmp_share->subscriber_bitmap[node_id].bitmap[0]);
+      }
+      pthread_mutex_unlock(&tmp_share->mutex);
+      (void) pthread_cond_signal(&injector_cond);
+      break;
+    }
+
+    default:
+    {
+      NDB_SHARE *tmp_share= event_data->share;
+      sql_print_error("NDB Binlog: unknown non data event %d for %s. "
+                      "Ignoring...", (unsigned) ev_type, tmp_share->key);
+    }
+    }
+
+    DBUG_VOID_RETURN;
   }
 
   void post_epoch()
   {
     if (m_post_epoch_log_list.elements > 0)
     {
-      handle_schema_log_post_epoch(m_thd, &m_post_epoch_log_list);
+      handle_schema_log_post_epoch(&m_post_epoch_log_list);
       // NOTE post_epoch_unlock_list may not be handled!
-      handle_schema_unlock_post_epoch(m_thd, &m_post_epoch_unlock_list);
+      handle_schema_unlock_post_epoch(&m_post_epoch_unlock_list);
     }
     // There should be no work left todo...
     DBUG_ASSERT(m_post_epoch_log_list.elements == 0);
@@ -4784,7 +4888,7 @@ err:
   *conflict_fn_spec= NULL;
 
   if (ndberror.code && opt_ndb_extra_logging)
-    print_warning_list("NDB", thd_warn_list(thd));
+    thd_print_warning_list(thd, "NDB");
   DBUG_RETURN(ndberror.code);
 }
 
@@ -4990,32 +5094,6 @@ ndbcluster_check_if_local_table(const ch
   DBUG_RETURN(false);
 }
 
-bool
-ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname)
-{
-  DBUG_ENTER("ndbcluster_check_if_local_tables_in_db");
-  DBUG_PRINT("info", ("Looking for files in directory %s", dbname));
-  LEX_STRING *tabname;
-  List<LEX_STRING> files;
-  char path[FN_REFLEN + 1];
-
-  build_table_filename(path, sizeof(path) - 1, dbname, "", "", 0);
-  if (find_files(thd, &files, dbname, path, NullS, 0) != FIND_FILES_OK)
-  {
-    thd->clear_error();
-    DBUG_PRINT("info", ("Failed to find files"));
-    DBUG_RETURN(true);
-  }
-  DBUG_PRINT("info",("found: %d files", files.elements));
-  while ((tabname= files.pop()))
-  {
-    DBUG_PRINT("info", ("Found table %s", tabname->str));
-    if (ndbcluster_check_if_local_table(dbname, tabname->str))
-      DBUG_RETURN(true);
-  }
-  
-  DBUG_RETURN(false);
-}
 
 /*
   Common function for setting up everything for logging a table at
@@ -5930,7 +6008,8 @@ handle_non_data_event(THD *thd,
                       NdbEventOperation *pOp,
                       ndb_binlog_index_row &row)
 {
-  Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
+  const Ndb_event_data* event_data=
+    static_cast<const Ndb_event_data*>(pOp->getCustomData());
   NDB_SHARE *share= event_data->share;
   NDBEVENT::TableEvent type= pOp->getEventType();
 
@@ -6939,7 +7018,9 @@ restart_cluster_failure:
 
     // The Ndb_schema_event_handler does not necessarily need
     // to use the same memroot(or vice versa)
-    Ndb_schema_event_handler schema_event_handler(thd, &mem_root);
+    Ndb_schema_event_handler
+      schema_event_handler(thd, &mem_root,
+                           g_ndb_cluster_connection->node_id());
 
     *root_ptr= &mem_root;
 

=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h	2011-10-27 09:34:06 +0000
+++ b/sql/ha_ndbcluster_binlog.h	2011-10-29 14:21:55 +0000
@@ -31,35 +31,6 @@ typedef NdbDictionary::Event  NDBEVENT;
 
 extern handlerton *ndbcluster_hton;
 
-class Ndb_event_data
-{
-public:
-  Ndb_event_data(NDB_SHARE *the_share) :
-    shadow_table(0),
-    share(the_share)
-  {
-    ndb_value[0]= 0;
-    ndb_value[1]= 0;
-  }
-  ~Ndb_event_data()
-  {
-    if (shadow_table)
-      closefrm(shadow_table, 1);
-    shadow_table= 0;
-    free_root(&mem_root, MYF(0));
-    share= 0;
-    /*
-       ndbvalue[] allocated with my_multi_malloc
-       so only first pointer should be freed  
-    */
-    my_free(ndb_value[0], MYF(MY_WME|MY_ALLOW_ZERO_PTR));
-  }
-  MEM_ROOT mem_root;
-  TABLE *shadow_table;
-  NDB_SHARE *share;
-  NdbValue *ndb_value[2];
-};
-
 /*
   The numbers below must not change as they
   are passed between mysql servers, and if changed
@@ -218,6 +189,3 @@ int cmp_frm(const NDBTAB *ndbtab, const 
 */
 bool
 ndbcluster_check_if_local_table(const char *dbname, const char *tabname);
-bool
-ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname);
-

=== modified file 'sql/ha_ndbcluster_glue.h'
--- a/sql/ha_ndbcluster_glue.h	2011-10-05 07:16:59 +0000
+++ b/sql/ha_ndbcluster_glue.h	2011-10-29 09:18:21 +0000
@@ -129,40 +129,6 @@ Diagnostics_area* thd_stmt_da(THD* thd)
 #endif
 }
 
-/* extract the list of warnings from THD */
-static inline
-List<MYSQL_ERROR>& thd_warn_list(const THD * thd)
-{
-#if MYSQL_VERSION_ID < 50500
-  return const_cast<THD*>(thd)->warn_list;
-#else
-  /* "options" has moved to "variables.option_bits" */
-  return thd->warning_info->warn_list();
-#endif
-}
-
-static inline
-const char* MYSQL_ERROR_get_message_text(const MYSQL_ERROR* err)
-{
-#if MYSQL_VERSION_ID < 50500
-  return err->msg;
-#else
-  /* "msg" is gone, use accessor */
-  return err->get_message_text();
-#endif
-}
-
-static inline
-uint MYSQL_ERROR_get_sql_errno(const MYSQL_ERROR* err)
-{
-#if MYSQL_VERSION_ID < 50500
-  return err->code;
-#else
-  /* "code" is gone, use accessor */
-  return err->get_sql_errno();
-#endif
-}
-
 #if MYSQL_VERSION_ID < 50500
 
 /*

=== added file 'sql/ndb_event_data.cc'
--- a/sql/ndb_event_data.cc	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_event_data.cc	2011-10-29 09:02:21 +0000
@@ -0,0 +1,44 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include "ndb_event_data.h"
+
+#include <table.h>
+
+
+Ndb_event_data::Ndb_event_data(NDB_SHARE *the_share) :
+  shadow_table(NULL),
+  share(the_share)
+{
+  ndb_value[0]= NULL;
+  ndb_value[1]= NULL;
+}
+
+
+Ndb_event_data::~Ndb_event_data()
+{
+  if (shadow_table)
+    closefrm(shadow_table, 1);
+  shadow_table= NULL;
+  free_root(&mem_root, MYF(0));
+  share= NULL;
+  /*
+    ndbvalue[] allocated with my_multi_malloc -> only
+    first pointer need to be freed
+  */
+  my_free(ndb_value[0]);
+}

=== added file 'sql/ndb_event_data.h'
--- a/sql/ndb_event_data.h	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_event_data.h	2011-10-29 09:02:21 +0000
@@ -0,0 +1,39 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef NDB_EVENT_DATA_H
+#define NDB_EVENT_DATA_H
+
+#include <my_global.h> // my_alloc.h
+#include <my_alloc.h> // MEM_ROOT
+
+class Ndb_event_data
+{
+public:
+  Ndb_event_data(); // Not implemented
+  Ndb_event_data(const Ndb_event_data&); // Not implemented
+  Ndb_event_data(struct NDB_SHARE *the_share);
+
+  ~Ndb_event_data();
+
+  MEM_ROOT mem_root;
+  struct TABLE *shadow_table;
+  struct NDB_SHARE *share;
+  union NdbValue *ndb_value[2];
+};
+
+#endif

=== modified file 'sql/ndb_thd.cc'
--- a/sql/ndb_thd.cc	2011-03-08 22:08:44 +0000
+++ b/sql/ndb_thd.cc	2011-10-29 09:18:21 +0000
@@ -43,3 +43,23 @@ Ndb* check_ndb_in_thd(THD* thd, bool val
 
   return thd_ndb->ndb;
 }
+
+#ifndef MYSQL_SERVER
+#define MYSQL_SERVER
+#endif
+
+#include <sql_class.h>
+
+void
+thd_print_warning_list(THD* thd, const char* prefix)
+{
+  List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list());
+  MYSQL_ERROR *err;
+  while ((err= it++))
+  {
+    sql_print_warning("%s: (%d)%s",
+                      prefix,
+                      err->get_sql_errno(),
+                      err->get_message_text());
+  }
+}

=== modified file 'sql/ndb_thd.h'
--- a/sql/ndb_thd.h	2011-03-08 12:59:56 +0000
+++ b/sql/ndb_thd.h	2011-10-29 09:18:21 +0000
@@ -51,4 +51,9 @@ thd_set_thd_ndb(THD *thd, class Thd_ndb 
 /* Make sure THD has a Thd_ndb struct assigned */
 class Ndb* check_ndb_in_thd(THD* thd, bool validate_ndb= false);
 
+
+/* Print thd's list of warnings to error log */
+void
+thd_print_warning_list(THD* thd, const char* prefix);
+
 #endif

=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt	2011-10-27 09:34:06 +0000
+++ b/storage/ndb/CMakeLists.txt	2011-10-29 09:02:21 +0000
@@ -83,6 +83,7 @@ SET(NDBCLUSTER_SOURCES
   ../../sql/ndb_anyvalue.cc
   ../../sql/ndb_ndbapi_util.cc
   ../../sql/ndb_binlog_extra_row_info.cc
+  ../../sql/ndb_event_data.cc
 )
 
 # Include directories used when building ha_ndbcluster

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster branch (magnus.blaudd:3619 to 3620) magnus.blaudd1 Nov