List:Commits« Previous MessageNext Message »
From:magnus.blaudd Date:October 27 2011 12:44pm
Subject:bzr push into mysql-trunk-cluster branch (magnus.blaudd:3402 to 3403)
View as plain text  
 3403 magnus.blaudd@stripped	2011-10-27 [merge]
      Merge 5.5-cluster -> trunk-cluster

    added:
      sql/ndb_anyvalue.cc
      sql/ndb_anyvalue.h
      sql/ndb_binlog_extra_row_info.cc
      sql/ndb_binlog_extra_row_info.h
      sql/ndb_ndbapi_util.cc
      sql/ndb_ndbapi_util.h
    modified:
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_binlog.h
      sql/ha_ndbcluster_connection.cc
      sql/ndb_share.h
      storage/ndb/CMakeLists.txt
 3402 magnus.blaudd@stripped	2011-10-27 [merge]
      Merge 5.5-cluster -> trunk-cluster

    added:
      mysql-test/include/not_ndb_is.inc
      mysql-test/suite/rpl_ndb/r/rpl_ndb_not_null.result
      mysql-test/suite/rpl_ndb/t/rpl_ndb_not_null.test
      storage/ndb/cmake/ndb_get_config_value.cmake
    modified:
      mysql-test/r/information_schema.result
      mysql-test/r/information_schema_db.result
      mysql-test/suite/funcs_1/r/is_columns_is.result
      mysql-test/suite/funcs_1/r/is_tables_is.result
      mysql-test/suite/funcs_1/t/is_columns_is.test
      mysql-test/suite/funcs_1/t/is_tables_is.test
      mysql-test/suite/ndb/include/have_clusterj.inc
      mysql-test/suite/ndb/r/ndb_join_pushdown.result
      mysql-test/suite/ndb/t/have_ndb_dist_priv.inc
      mysql-test/suite/ndb/t/ndb_join_pushdown.test
      mysql-test/suite/ndb_big/rqg_spj.test
      mysql-test/t/information_schema.test
      mysql-test/t/information_schema_db.test
      mysql-test/t/mysqlshow.test
      sql/abstract_query_plan.cc
      sql/abstract_query_plan.h
      sql/ha_ndb_index_stat.cc
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_binlog.h
      sql/ha_ndbcluster_connection.cc
      sql/ha_ndbinfo.cc
      sql/ha_ndbinfo.h
      sql/ndb_local_connection.cc
      sql/ndb_thd_ndb.cc
      storage/ndb/CMakeLists.txt
      storage/ndb/VERSION
      storage/ndb/clusterj/CMakeLists.txt
      storage/ndb/clusterj/clusterj-api/CMakeLists.txt
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/AndPredicateImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/BetweenPredicateImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/CandidateIndexImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/ComparativePredicateImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/EqualPredicateImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/GreaterEqualPredicateImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/GreaterThanPredicateImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/InPredicateImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/LessEqualPredicateImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/LessThanPredicateImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/PredicateImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/PropertyImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/QueryDomainTypeImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/QueryExecutionContextImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/QueryExecutionContext.java
      storage/ndb/clusterj/clusterj-jpatest/CMakeLists.txt
      storage/ndb/clusterj/clusterj-openjpa/CMakeLists.txt
      storage/ndb/clusterj/clusterj-test/CMakeLists.txt
      storage/ndb/include/CMakeLists.txt
      storage/ndb/include/kernel/signaldata/DiGetNodes.hpp
      storage/ndb/include/ndb_global.h
      storage/ndb/include/ndbapi/Ndb.hpp
      storage/ndb/include/ndbapi/NdbScanOperation.hpp
      storage/ndb/include/util/OutputStream.hpp
      storage/ndb/src/CMakeLists.txt
      storage/ndb/src/common/debugger/EventLogger.cpp
      storage/ndb/src/common/debugger/SignalLoggerManager.cpp
      storage/ndb/src/common/logger/LogHandler.cpp
      storage/ndb/src/common/logger/Logger.cpp
      storage/ndb/src/common/portlib/NdbConfig.c
      storage/ndb/src/common/portlib/NdbDir.cpp
      storage/ndb/src/common/portlib/NdbThread.c
      storage/ndb/src/common/portlib/ndb_daemon.cc
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/common/util/BaseString.cpp
      storage/ndb/src/common/util/ConfigValues.cpp
      storage/ndb/src/common/util/File.cpp
      storage/ndb/src/common/util/InputStream.cpp
      storage/ndb/src/common/util/NdbSqlUtil.cpp
      storage/ndb/src/common/util/OutputStream.cpp
      storage/ndb/src/common/util/Parser.cpp
      storage/ndb/src/common/util/Properties.cpp
      storage/ndb/src/common/util/ndb_init.cpp
      storage/ndb/src/common/util/ndbzio.c
      storage/ndb/src/common/util/socket_io.cpp
      storage/ndb/src/cw/cpcd/APIService.cpp
      storage/ndb/src/cw/cpcd/CPCD.cpp
      storage/ndb/src/cw/cpcd/Monitor.cpp
      storage/ndb/src/cw/cpcd/Process.cpp
      storage/ndb/src/kernel/blocks/backup/read.cpp
      storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
      storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
      storage/ndb/src/kernel/blocks/dbdih/printSysfile.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/kernel/blocks/dblqh/redoLogReader/reader.cpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
      storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp
      storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp
      storage/ndb/src/kernel/blocks/ndbfs/AsyncFile.cpp
      storage/ndb/src/kernel/blocks/ndbfs/Filename.cpp
      storage/ndb/src/kernel/blocks/ndbfs/Win32AsyncFile.cpp
      storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
      storage/ndb/src/kernel/error/ErrorReporter.cpp
      storage/ndb/src/kernel/error/ndbd_exit_codes.c
      storage/ndb/src/kernel/vm/NdbinfoTables.cpp
      storage/ndb/src/kernel/vm/SimulatedBlock.cpp
      storage/ndb/src/mgmapi/mgmapi.cpp
      storage/ndb/src/mgmapi/ndb_logevent.cpp
      storage/ndb/src/mgmclient/CommandInterpreter.cpp
      storage/ndb/src/mgmsrv/Defragger.hpp
      storage/ndb/src/mgmsrv/InitConfigFileParser.cpp
      storage/ndb/src/mgmsrv/MgmtSrvr.cpp
      storage/ndb/src/mgmsrv/Services.cpp
      storage/ndb/src/ndbapi/Ndb.cpp
      storage/ndb/src/ndbapi/NdbBlob.cpp
      storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
      storage/ndb/src/ndbapi/NdbImpl.hpp
      storage/ndb/src/ndbapi/NdbOperationExec.cpp
      storage/ndb/src/ndbapi/NdbQueryBuilder.cpp
      storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbScanOperation.cpp
      storage/ndb/src/ndbapi/NdbTransaction.cpp
      storage/ndb/src/ndbapi/NdbWaitGroup.cpp
      storage/ndb/src/ndbapi/Ndbinit.cpp
      storage/ndb/src/ndbapi/TransporterFacade.cpp
      storage/ndb/src/ndbapi/ndberror.c
      storage/ndb/test/include/NDBT_Table.hpp
      storage/ndb/test/include/NdbMgmd.hpp
      storage/ndb/test/ndbapi/ScanFunctions.hpp
      storage/ndb/test/ndbapi/testDict.cpp
      storage/ndb/test/ndbapi/testMgm.cpp
      storage/ndb/test/ndbapi/testNodeRestart.cpp
      storage/ndb/test/rqg/parseargs.sh
      storage/ndb/test/rqg/run_rqg.sh
      storage/ndb/test/run-test/atrt.hpp
      storage/ndb/test/run-test/files.cpp
      storage/ndb/test/run-test/main.cpp
      storage/ndb/test/src/DbUtil.cpp
      storage/ndb/test/src/HugoQueries.cpp
      storage/ndb/test/src/HugoQueryBuilder.cpp
      storage/ndb/test/src/NDBT_Test.cpp
      storage/ndb/test/src/NdbBackup.cpp
      storage/ndb/test/src/NdbRestarter.cpp
      storage/ndb/test/src/getarg.c
      storage/ndb/test/tools/cpcc.cpp
      storage/ndb/tools/CMakeLists.txt
      storage/ndb/tools/ndb_dump_frm_data.cpp
      storage/ndb/tools/ndb_index_stat.cpp
      storage/ndb/tools/restore/consumer_restore.cpp
      storage/ndb/tools/waiter.cpp
      support-files/compiler_warnings.supp
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2011-10-27 12:19:57 +0000
+++ b/sql/ha_ndbcluster.cc	2011-10-27 12:33:39 +0000
@@ -51,6 +51,8 @@
 #include <ndb_version.h>
 #include "ndb_mi.h"
 #include "ndb_conflict_trans.h"
+#include "ndb_anyvalue.h"
+#include "ndb_binlog_extra_row_info.h"
 
 // ndb interface initialization/cleanup
 extern "C" void ndb_init_internal();
@@ -1876,87 +1878,6 @@ ha_ndbcluster::set_blob_values(const Ndb
   DBUG_RETURN(res);
 }
 
-/*
-  This routine is shared by injector.  There is no common blobs buffer
-  so the buffer and length are passed by reference.  Injector also
-  passes a record pointer diff.
- */
-int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
-                        uchar*& buffer, uint& buffer_size,
-                        my_ptrdiff_t ptrdiff)
-{
-  DBUG_ENTER("get_ndb_blobs_value");
-
-  // Field has no field number so cannot use TABLE blob_field
-  // Loop twice, first only counting total buffer size
-  for (int loop= 0; loop <= 1; loop++)
-  {
-    uint32 offset= 0;
-    for (uint i= 0; i < table->s->fields; i++)
-    {
-      Field *field= table->field[i];
-      NdbValue value= value_array[i];
-      if (! (field->flags & BLOB_FLAG))
-        continue;
-      if (value.blob == NULL)
-      {
-        DBUG_PRINT("info",("[%u] skipped", i));
-        continue;
-      }
-      Field_blob *field_blob= (Field_blob *)field;
-      NdbBlob *ndb_blob= value.blob;
-      int isNull;
-      if (ndb_blob->getNull(isNull) != 0)
-        ERR_RETURN(ndb_blob->getNdbError());
-      if (isNull == 0) {
-        Uint64 len64= 0;
-        if (ndb_blob->getLength(len64) != 0)
-          ERR_RETURN(ndb_blob->getNdbError());
-        // Align to Uint64
-        uint32 size= Uint32(len64);
-        if (size % 8 != 0)
-          size+= 8 - size % 8;
-        if (loop == 1)
-        {
-          uchar *buf= buffer + offset;
-          uint32 len= 0xffffffff;  // Max uint32
-          if (ndb_blob->readData(buf, len) != 0)
-            ERR_RETURN(ndb_blob->getNdbError());
-          DBUG_PRINT("info", ("[%u] offset: %u  buf: 0x%lx  len=%u  [ptrdiff=%d]",
-                              i, offset, (long) buf, len, (int)ptrdiff));
-          DBUG_ASSERT(len == len64);
-          // Ugly hack assumes only ptr needs to be changed
-          field_blob->set_ptr_offset(ptrdiff, len, buf);
-        }
-        offset+= size;
-      }
-      else if (loop == 1) // undefined or null
-      {
-        // have to set length even in this case
-        uchar *buf= buffer + offset; // or maybe NULL
-        uint32 len= 0;
-	field_blob->set_ptr_offset(ptrdiff, len, buf);
-        DBUG_PRINT("info", ("[%u] isNull=%d", i, isNull));
-      }
-    }
-    if (loop == 0 && offset > buffer_size)
-    {
-      my_free(buffer, MYF(MY_ALLOW_ZERO_PTR));
-      buffer_size= 0;
-      DBUG_PRINT("info", ("allocate blobs buffer size %u", offset));
-      buffer= (uchar*) my_malloc(offset, MYF(MY_WME));
-      if (buffer == NULL)
-      {
-        sql_print_error("ha_ndbcluster::get_ndb_blobs_value: "
-                        "my_malloc(%u) failed", offset);
-        DBUG_RETURN(-1);
-      }
-      buffer_size= offset;
-    }
-  }
-  DBUG_RETURN(0);
-}
-
 
 /**
   Check if any set or get of blob value in current query.
@@ -4373,6 +4294,24 @@ ha_ndbcluster::set_auto_inc(THD *thd, Fi
   DBUG_RETURN(set_auto_inc_val(thd, next_val));
 }
 
+
+class Ndb_tuple_id_range_guard {
+  NDB_SHARE* m_share;
+public:
+  Ndb_tuple_id_range_guard(NDB_SHARE* share) :
+    m_share(share),
+    range(share->tuple_id_range)
+  {
+    pthread_mutex_lock(&m_share->mutex);
+  }
+  ~Ndb_tuple_id_range_guard()
+  {
+    pthread_mutex_unlock(&m_share->mutex);
+  }
+  Ndb::TupleIdRange& range;
+};
+
+
 inline
 int
 ha_ndbcluster::set_auto_inc_val(THD *thd, Uint64 value)
@@ -12186,137 +12125,6 @@ int ndb_create_table_from_engine(THD *th
   return res;
 }
 
-/*
-  find all tables in ndb and discover those needed
-*/
-int ndbcluster_find_all_files(THD *thd)
-{
-  Ndb* ndb;
-  char key[FN_REFLEN + 1];
-  NDBDICT *dict;
-  int unhandled, retries= 5, skipped;
-  DBUG_ENTER("ndbcluster_find_all_files");
-
-  if (!(ndb= check_ndb_in_thd(thd)))
-    DBUG_RETURN(HA_ERR_NO_CONNECTION);
-
-  dict= ndb->getDictionary();
-
-  LINT_INIT(unhandled);
-  LINT_INIT(skipped);
-  do
-  {
-    NdbDictionary::Dictionary::List list;
-    if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
-      ERR_RETURN(dict->getNdbError());
-    unhandled= 0;
-    skipped= 0;
-    retries--;
-    for (uint i= 0 ; i < list.count ; i++)
-    {
-      NDBDICT::List::Element& elmt= list.elements[i];
-      if (IS_TMP_PREFIX(elmt.name) || IS_NDB_BLOB_PREFIX(elmt.name))
-      {
-        DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name));
-        continue;
-      }
-      DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name));
-      if (elmt.state != NDBOBJ::StateOnline &&
-          elmt.state != NDBOBJ::StateBackup &&
-          elmt.state != NDBOBJ::StateBuilding)
-      {
-        sql_print_information("NDB: skipping setup table %s.%s, in state %d",
-                              elmt.database, elmt.name, elmt.state);
-        skipped++;
-        continue;
-      }
-
-      ndb->setDatabaseName(elmt.database);
-      Ndb_table_guard ndbtab_g(dict, elmt.name);
-      const NDBTAB *ndbtab= ndbtab_g.get_table();
-      if (!ndbtab)
-      {
-        if (retries == 0)
-          sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
-                          elmt.database, elmt.name,
-                          dict->getNdbError().code,
-                          dict->getNdbError().message);
-        unhandled++;
-        continue;
-      }
-
-      if (ndbtab->getFrmLength() == 0)
-        continue;
-    
-      /* check if database exists */
-      char *end= key +
-        build_table_filename(key, sizeof(key) - 1, elmt.database, "", "", 0);
-      if (my_access(key, F_OK))
-      {
-        /* no such database defined, skip table */
-        continue;
-      }
-      /* finalize construction of path */
-      end+= tablename_to_filename(elmt.name, end,
-                                  (uint)(sizeof(key)-(end-key)));
-      uchar *data= 0, *pack_data= 0;
-      size_t length, pack_length;
-      int discover= 0;
-      if (readfrm(key, &data, &length) ||
-          packfrm(data, length, &pack_data, &pack_length))
-      {
-        discover= 1;
-        sql_print_information("NDB: missing frm for %s.%s, discovering...",
-                              elmt.database, elmt.name);
-      }
-      else if (cmp_frm(ndbtab, pack_data, pack_length))
-      {
-        /* ndb_share reference temporary */
-        NDB_SHARE *share= get_share(key, 0, FALSE);
-        if (share)
-        {
-          DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
-                                   share->key, share->use_count));
-        }
-        if (!share || get_ndb_share_state(share) != NSS_ALTERED)
-        {
-          discover= 1;
-          sql_print_information("NDB: mismatch in frm for %s.%s, discovering...",
-                                elmt.database, elmt.name);
-        }
-        if (share)
-        {
-          /* ndb_share reference temporary free */
-          DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
-                                   share->key, share->use_count));
-          free_share(&share);
-        }
-      }
-      my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR));
-      my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR));
-
-      if (discover)
-      {
-        /* ToDo 4.1 database needs to be created if missing */
-        if (ndb_create_table_from_engine(thd, elmt.database, elmt.name))
-        {
-          /* ToDo 4.1 handle error */
-        }
-      }
-      else
-      {
-        /* set up replication for this table */
-        ndbcluster_create_binlog_setup(thd, ndb, key, (uint)(end-key),
-                                       elmt.database, elmt.name,
-                                       0);
-      }
-    }
-  }
-  while (unhandled && retries);
-
-  DBUG_RETURN(-(skipped + unhandled));
-}
-
 
 static int
 ndbcluster_find_files(handlerton *hton, THD *thd,

=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2011-10-27 12:19:57 +0000
+++ b/sql/ha_ndbcluster.h	2011-10-27 12:33:39 +0000
@@ -108,71 +108,9 @@ public:
   Uint32 old_table_version;
 };
 
-typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
-
-int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
-                        uchar*& buffer, uint& buffer_size,
-                        my_ptrdiff_t ptrdiff);
-
+#include "ndb_ndbapi_util.h"
 #include "ndb_share.h"
 
-struct Ndb_tuple_id_range_guard {
-  Ndb_tuple_id_range_guard(NDB_SHARE* _share) :
-    share(_share),
-    range(share->tuple_id_range) {
-    pthread_mutex_lock(&share->mutex);
-  }
-  ~Ndb_tuple_id_range_guard() {
-    pthread_mutex_unlock(&share->mutex);
-  }
-  NDB_SHARE* share;
-  Ndb::TupleIdRange& range;
-};
-
-/* NDB_SHARE.flags */
-#define NSF_HIDDEN_PK   1u /* table has hidden primary key */
-#define NSF_BLOB_FLAG   2u /* table has blob attributes */
-#define NSF_NO_BINLOG   4u /* table should not be binlogged */
-#define NSF_BINLOG_FULL 8u /* table should be binlogged with full rows */
-#define NSF_BINLOG_USE_UPDATE 16u  /* table update should be binlogged using
-                                     update log event */
-inline void set_binlog_logging(NDB_SHARE *share)
-{
-  DBUG_PRINT("info", ("set_binlog_logging"));
-  share->flags&= ~NSF_NO_BINLOG;
-}
-inline void set_binlog_nologging(NDB_SHARE *share)
-{
-  DBUG_PRINT("info", ("set_binlog_nologging"));
-  share->flags|= NSF_NO_BINLOG;
-}
-inline my_bool get_binlog_nologging(NDB_SHARE *share)
-{ return (share->flags & NSF_NO_BINLOG) != 0; }
-inline void set_binlog_updated_only(NDB_SHARE *share)
-{
-  DBUG_PRINT("info", ("set_binlog_updated_only"));
-  share->flags&= ~NSF_BINLOG_FULL;
-}
-inline void set_binlog_full(NDB_SHARE *share)
-{
-  DBUG_PRINT("info", ("set_binlog_full"));
-  share->flags|= NSF_BINLOG_FULL;
-}
-inline my_bool get_binlog_full(NDB_SHARE *share)
-{ return (share->flags & NSF_BINLOG_FULL) != 0; }
-inline void set_binlog_use_write(NDB_SHARE *share)
-{
-  DBUG_PRINT("info", ("set_binlog_use_write"));
-  share->flags&= ~NSF_BINLOG_USE_UPDATE;
-}
-inline void set_binlog_use_update(NDB_SHARE *share)
-{
-  DBUG_PRINT("info", ("set_binlog_use_update"));
-  share->flags|= NSF_BINLOG_USE_UPDATE;
-}
-inline my_bool get_binlog_use_update(NDB_SHARE *share)
-{ return (share->flags & NSF_BINLOG_USE_UPDATE) != 0; }
-
 enum enum_slave_trans_conflict_apply_state
 {
   /* Normal with optional row-level conflict detection */

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2011-10-27 12:19:57 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2011-10-27 12:33:39 +0000
@@ -56,6 +56,8 @@ bool ndb_log_empty_epochs(void);
 #include "ha_ndbcluster_tables.h"
 
 #include "ndb_dist_priv_util.h"
+#include "ndb_anyvalue.h"
+#include "ndb_binlog_extra_row_info.h"
 
 /*
   Timeout for syncing schema events between
@@ -1313,6 +1315,141 @@ static int ndbcluster_find_all_databases
   }
 }
 
+
+/*
+  find all tables in ndb and discover those needed
+*/
+static
+int ndbcluster_find_all_files(THD *thd)
+{
+  Ndb* ndb;
+  char key[FN_REFLEN + 1];
+  NDBDICT *dict;
+  int unhandled, retries= 5, skipped;
+  DBUG_ENTER("ndbcluster_find_all_files");
+
+  if (!(ndb= check_ndb_in_thd(thd)))
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+  dict= ndb->getDictionary();
+
+  LINT_INIT(unhandled);
+  LINT_INIT(skipped);
+  do
+  {
+    NdbDictionary::Dictionary::List list;
+    if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
+      DBUG_RETURN(1);
+    unhandled= 0;
+    skipped= 0;
+    retries--;
+    for (uint i= 0 ; i < list.count ; i++)
+    {
+      NDBDICT::List::Element& elmt= list.elements[i];
+      if (IS_TMP_PREFIX(elmt.name) || IS_NDB_BLOB_PREFIX(elmt.name))
+      {
+        DBUG_PRINT("info", ("Skipping %s.%s in NDB", elmt.database, elmt.name));
+        continue;
+      }
+      DBUG_PRINT("info", ("Found %s.%s in NDB", elmt.database, elmt.name));
+      if (elmt.state != NDBOBJ::StateOnline &&
+          elmt.state != NDBOBJ::StateBackup &&
+          elmt.state != NDBOBJ::StateBuilding)
+      {
+        sql_print_information("NDB: skipping setup table %s.%s, in state %d",
+                              elmt.database, elmt.name, elmt.state);
+        skipped++;
+        continue;
+      }
+
+      ndb->setDatabaseName(elmt.database);
+      Ndb_table_guard ndbtab_g(dict, elmt.name);
+      const NDBTAB *ndbtab= ndbtab_g.get_table();
+      if (!ndbtab)
+      {
+        if (retries == 0)
+          sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
+                          elmt.database, elmt.name,
+                          dict->getNdbError().code,
+                          dict->getNdbError().message);
+        unhandled++;
+        continue;
+      }
+
+      if (ndbtab->getFrmLength() == 0)
+        continue;
+
+      /* check if database exists */
+      char *end= key +
+        build_table_filename(key, sizeof(key) - 1, elmt.database, "", "", 0);
+      if (my_access(key, F_OK))
+      {
+        /* no such database defined, skip table */
+        continue;
+      }
+      /* finalize construction of path */
+      end+= tablename_to_filename(elmt.name, end,
+                                  (uint)(sizeof(key)-(end-key)));
+      uchar *data= 0, *pack_data= 0;
+      size_t length, pack_length;
+      int discover= 0;
+      if (readfrm(key, &data, &length) ||
+          packfrm(data, length, &pack_data, &pack_length))
+      {
+        discover= 1;
+        sql_print_information("NDB: missing frm for %s.%s, discovering...",
+                              elmt.database, elmt.name);
+      }
+      else if (cmp_frm(ndbtab, pack_data, pack_length))
+      {
+        /* ndb_share reference temporary */
+        NDB_SHARE *share= get_share(key, 0, FALSE);
+        if (share)
+        {
+          DBUG_PRINT("NDB_SHARE", ("%s temporary  use_count: %u",
+                                   share->key, share->use_count));
+        }
+        if (!share || get_ndb_share_state(share) != NSS_ALTERED)
+        {
+          discover= 1;
+          sql_print_information("NDB: mismatch in frm for %s.%s,"
+                                " discovering...",
+                                elmt.database, elmt.name);
+        }
+        if (share)
+        {
+          /* ndb_share reference temporary free */
+          DBUG_PRINT("NDB_SHARE", ("%s temporary free  use_count: %u",
+                                   share->key, share->use_count));
+          free_share(&share);
+        }
+      }
+      my_free((char*) data, MYF(MY_ALLOW_ZERO_PTR));
+      my_free((char*) pack_data, MYF(MY_ALLOW_ZERO_PTR));
+
+      if (discover)
+      {
+        /* ToDo 4.1 database needs to be created if missing */
+        if (ndb_create_table_from_engine(thd, elmt.database, elmt.name))
+        {
+          /* ToDo 4.1 handle error */
+        }
+      }
+      else
+      {
+        /* set up replication for this table */
+        ndbcluster_create_binlog_setup(thd, ndb, key, (uint)(end-key),
+                                       elmt.database, elmt.name,
+                                       0);
+      }
+    }
+  }
+  while (unhandled && retries);
+
+  DBUG_RETURN(-(skipped + unhandled));
+}
+
+
 bool
 ndb_binlog_setup(THD *thd)
 {
@@ -1391,307 +1528,6 @@ ndb_binlog_setup(THD *thd)
 #define SCHEMA_SIZE 9u
 #define SCHEMA_SLOCK_SIZE 32u
 
-struct Cluster_schema
-{
-  uchar db_length;
-  char db[64];
-  uchar name_length;
-  char name[64];
-  uchar slock_length;
-  uint32 slock[SCHEMA_SLOCK_SIZE/4];
-  unsigned short query_length;
-  char *query;
-  Uint64 epoch;
-  uint32 node_id;
-  uint32 id;
-  uint32 version;
-  uint32 type;
-  uint32 any_value;
-};
-
-static void
-print_could_not_discover_error(THD *thd,
-                               const Cluster_schema *schema)
-{
-  sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
-                  "binlog schema event '%s' from node %d. "
-                  "my_errno: %d",
-                   schema->db, schema->name, schema->query,
-                   schema->node_id, my_errno);
-  print_warning_list("NDB Binlog", thd);
-}
-
-
-/*
-  Transfer schema table data into corresponding struct
-*/
-static void ndbcluster_get_schema(Ndb_event_data *event_data,
-                                  Cluster_schema *s)
-{
-  TABLE *table= event_data->shadow_table;
-  Field **field;
-  /* unpack blob values */
-  uchar* blobs_buffer= 0;
-  uint blobs_buffer_size= 0;
-  my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
-  {
-    ptrdiff_t ptrdiff= 0;
-    int ret= get_ndb_blobs_value(table, event_data->ndb_value[0],
-                                 blobs_buffer, blobs_buffer_size,
-                                 ptrdiff);
-    if (ret != 0)
-    {
-      my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
-      DBUG_PRINT("info", ("blob read error"));
-      DBUG_ASSERT(FALSE);
-    }
-  }
-  /* db varchar 1 length uchar */
-  field= table->field;
-  s->db_length= *(uint8*)(*field)->ptr;
-  DBUG_ASSERT(s->db_length <= (*field)->field_length);
-  DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->db));
-  memcpy(s->db, (*field)->ptr + 1, s->db_length);
-  s->db[s->db_length]= 0;
-  /* name varchar 1 length uchar */
-  field++;
-  s->name_length= *(uint8*)(*field)->ptr;
-  DBUG_ASSERT(s->name_length <= (*field)->field_length);
-  DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->name));
-  memcpy(s->name, (*field)->ptr + 1, s->name_length);
-  s->name[s->name_length]= 0;
-  /* slock fixed length */
-  field++;
-  s->slock_length= (*field)->field_length;
-  DBUG_ASSERT((*field)->field_length == sizeof(s->slock));
-  memcpy(s->slock, (*field)->ptr, s->slock_length);
-  /* query blob */
-  field++;
-  {
-    Field_blob *field_blob= (Field_blob*)(*field);
-    uint blob_len= field_blob->get_length((*field)->ptr);
-    uchar *blob_ptr= 0;
-    field_blob->get_ptr(&blob_ptr);
-    DBUG_ASSERT(blob_len == 0 || blob_ptr != 0);
-    s->query_length= blob_len;
-    s->query= sql_strmake((char*) blob_ptr, blob_len);
-  }
-  /* node_id */
-  field++;
-  s->node_id= (Uint32)((Field_long *)*field)->val_int();
-  /* epoch */
-  field++;
-  s->epoch= ((Field_long *)*field)->val_int();
-  /* id */
-  field++;
-  s->id= (Uint32)((Field_long *)*field)->val_int();
-  /* version */
-  field++;
-  s->version= (Uint32)((Field_long *)*field)->val_int();
-  /* type */
-  field++;
-  s->type= (Uint32)((Field_long *)*field)->val_int();
-  /* free blobs buffer */
-  my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
-  dbug_tmp_restore_column_map(table->read_set, old_map);
-}
-
-/*
-  helper function to pack a ndb varchar
-*/
-char *ndb_pack_varchar(const NDBCOL *col, char *buf,
-                       const char *str, int sz)
-{
-  switch (col->getArrayType())
-  {
-    case NDBCOL::ArrayTypeFixed:
-      memcpy(buf, str, sz);
-      break;
-    case NDBCOL::ArrayTypeShortVar:
-      *(uchar*)buf= (uchar)sz;
-      memcpy(buf + 1, str, sz);
-      break;
-    case NDBCOL::ArrayTypeMediumVar:
-      int2store(buf, sz);
-      memcpy(buf + 2, str, sz);
-      break;
-  }
-  return buf;
-}
-
-/*
-  acknowledge handling of schema operation
-*/
-static int
-ndbcluster_update_slock(THD *thd,
-                        const char *db,
-                        const char *table_name,
-                        uint32 table_id,
-                        uint32 table_version)
-{
-  DBUG_ENTER("ndbcluster_update_slock");
-  if (!ndb_schema_share)
-  {
-    DBUG_RETURN(0);
-  }
-
-  const NdbError *ndb_error= 0;
-  uint32 node_id= g_ndb_cluster_connection->node_id();
-  Ndb *ndb= check_ndb_in_thd(thd);
-  char save_db[FN_HEADLEN];
-  strcpy(save_db, ndb->getDatabaseName());
-
-  char tmp_buf[FN_REFLEN];
-  NDBDICT *dict= ndb->getDictionary();
-  ndb->setDatabaseName(NDB_REP_DB);
-  Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
-  const NDBTAB *ndbtab= ndbtab_g.get_table();
-  NdbTransaction *trans= 0;
-  int retries= 100;
-  int retry_sleep= 30; /* 30 milliseconds, transaction */
-  const NDBCOL *col[SCHEMA_SIZE];
-  unsigned sz[SCHEMA_SIZE];
-
-  MY_BITMAP slock;
-  uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
-  bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);
-
-  if (ndbtab == 0)
-  {
-    if (dict->getNdbError().code != 4009)
-      abort();
-    DBUG_RETURN(0);
-  }
-
-  {
-    uint i;
-    for (i= 0; i < SCHEMA_SIZE; i++)
-    {
-      col[i]= ndbtab->getColumn(i);
-      if (i != SCHEMA_QUERY_I)
-      {
-        sz[i]= col[i]->getLength();
-        DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
-      }
-    }
-  }
-
-  while (1)
-  {
-    if ((trans= ndb->startTransaction()) == 0)
-      goto err;
-    {
-      NdbOperation *op= 0;
-      int r= 0;
-
-      /* read the bitmap exlusive */
-      r|= (op= trans->getNdbOperation(ndbtab)) == 0;
-      DBUG_ASSERT(r == 0);
-      r|= op->readTupleExclusive();
-      DBUG_ASSERT(r == 0);
-    
-      /* db */
-      ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, (int)strlen(db));
-      r|= op->equal(SCHEMA_DB_I, tmp_buf);
-      DBUG_ASSERT(r == 0);
-      /* name */
-      ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
-                       (int)strlen(table_name));
-      r|= op->equal(SCHEMA_NAME_I, tmp_buf);
-      DBUG_ASSERT(r == 0);
-      /* slock */
-      r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
-      DBUG_ASSERT(r == 0);
-    }
-    if (trans->execute(NdbTransaction::NoCommit))
-      goto err;
-
-    if (opt_ndb_extra_logging > 19)
-    {
-      uint32 copy[SCHEMA_SLOCK_SIZE/4];
-      memcpy(copy, bitbuf, sizeof(copy));
-      bitmap_clear_bit(&slock, node_id);
-      sql_print_information("NDB: reply to %s.%s(%u/%u) from %x%x to %x%x",
-                            db, table_name,
-                            table_id, table_version,
-                            copy[0], copy[1],
-                            slock.bitmap[0],
-                            slock.bitmap[1]);
-    }
-    else
-    {
-      bitmap_clear_bit(&slock, node_id);
-    }
-
-    {
-      NdbOperation *op= 0;
-      int r= 0;
-
-      /* now update the tuple */
-      r|= (op= trans->getNdbOperation(ndbtab)) == 0;
-      DBUG_ASSERT(r == 0);
-      r|= op->updateTuple();
-      DBUG_ASSERT(r == 0);
-
-      /* db */
-      ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, (int)strlen(db));
-      r|= op->equal(SCHEMA_DB_I, tmp_buf);
-      DBUG_ASSERT(r == 0);
-      /* name */
-      ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
-                       (int)strlen(table_name));
-      r|= op->equal(SCHEMA_NAME_I, tmp_buf);
-      DBUG_ASSERT(r == 0);
-      /* slock */
-      r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
-      DBUG_ASSERT(r == 0);
-      /* node_id */
-      r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
-      DBUG_ASSERT(r == 0);
-      /* type */
-      r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
-      DBUG_ASSERT(r == 0);
-    }
-    if (trans->execute(NdbTransaction::Commit, 
-                       NdbOperation::DefaultAbortOption, 1 /*force send*/) == 0)
-    {
-      DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
-                          node_id, db, table_name));
-      dict->forceGCPWait(1);
-      break;
-    }
-  err:
-    const NdbError *this_error= trans ?
-      &trans->getNdbError() : &ndb->getNdbError();
-    if (this_error->status == NdbError::TemporaryError && !thd->killed)
-    {
-      if (retries--)
-      {
-        if (trans)
-          ndb->closeTransaction(trans);
-        do_retry_sleep(retry_sleep);
-        continue; // retry
-      }
-    }
-    ndb_error= this_error;
-    break;
-  }
-
-  if (ndb_error)
-  {
-    char buf[1024];
-    my_snprintf(buf, sizeof(buf), "Could not release lock on '%s.%s'",
-                db, table_name);
-    push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
-                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
-                        ndb_error->code, ndb_error->message, buf);
-  }
-  if (trans)
-    ndb->closeTransaction(trans);
-  ndb->setDatabaseName(save_db);
-  DBUG_RETURN(0);
-}
-
 
 /*
   log query in schema table
@@ -2355,68 +2191,6 @@ ndb_handle_schema_change(THD *thd, Ndb *
   DBUG_RETURN(0);
 }
 
-static void ndb_binlog_query(THD *thd, Cluster_schema *schema)
-{
-  /* any_value == 0 means local cluster sourced change that
-   * should be logged
-   */
-  if (ndbcluster_anyvalue_is_reserved(schema->any_value))
-  {
-    /* Originating SQL node did not want this query logged */
-    if (!ndbcluster_anyvalue_is_nologging(schema->any_value))
-      sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
-                        "query not logged",
-                        schema->any_value);
-    return;
-  }
-  
-  Uint32 queryServerId = ndbcluster_anyvalue_get_serverid(schema->any_value);
-  /* 
-     Start with serverId as received AnyValue, in case it's a composite
-     (server_id_bits < 31).
-     This is for 'future', as currently schema ops do not have composite
-     AnyValues.
-     In future it may be useful to support *not* mapping composite
-     AnyValues to/from Binlogged server-ids.
-  */
-  Uint32 loggedServerId = schema->any_value;
-
-  if (queryServerId)
-  {
-    /* 
-       AnyValue has non-zero serverId, must be a query applied by a slave 
-       mysqld.
-       TODO : Assert that we are running in the Binlog injector thread?
-    */
-    if (! g_ndb_log_slave_updates)
-    {
-      /* This MySQLD does not log slave updates */
-      return;
-    }
-  }
-  else
-  {
-    /* No ServerId associated with this query, mark it as ours */
-    ndbcluster_anyvalue_set_serverid(loggedServerId, ::server_id);
-  }
-
-  uint32 thd_server_id_save= thd->server_id;
-  DBUG_ASSERT(sizeof(thd_server_id_save) == sizeof(thd->server_id));
-  char *thd_db_save= thd->db;
-  thd->server_id = loggedServerId;
-  thd->db= schema->db;
-  int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED);
-  thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
-                    schema->query_length, FALSE,
-#ifdef NDB_THD_BINLOG_QUERY_HAS_DIRECT
-                    TRUE,
-#endif
-                    schema->name[0] == 0 || thd->db[0] == 0,
-                    errcode);
-  thd->server_id= thd_server_id_save;
-  thd->db= thd_db_save;
-}
-
 
 class Mutex_guard
 {
@@ -2434,16 +2208,361 @@ private:
 };
 
 
+class Ndb_schema_event_handler {
+
+  struct Cluster_schema
+  {
+    uchar db_length;
+    char db[64];
+    uchar name_length;
+    char name[64];
+    uchar slock_length;
+    uint32 slock[SCHEMA_SLOCK_SIZE/4];
+    unsigned short query_length;
+    char *query;
+    Uint64 epoch;
+    uint32 node_id;
+    uint32 id;
+    uint32 version;
+    uint32 type;
+    uint32 any_value;
+  };
+
+
+  static void
+  print_could_not_discover_error(THD *thd,
+                                 const Cluster_schema *schema)
+  {
+    sql_print_error("NDB Binlog: Could not discover table '%s.%s' from "
+                    "binlog schema event '%s' from node %d. "
+                    "my_errno: %d",
+                     schema->db, schema->name, schema->query,
+                     schema->node_id, my_errno);
+    print_warning_list("NDB Binlog", thd);
+  }
+
+
+  /*
+    Transfer schema table event data into Cluster_schema struct
+  */
+  static void ndbcluster_get_schema(const Ndb_event_data *event_data,
+                                    Cluster_schema *s)
+  {
+    TABLE *table= event_data->shadow_table;
+    Field **field;
+    /* unpack blob values */
+    uchar* blobs_buffer= 0;
+    uint blobs_buffer_size= 0;
+    my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set);
+    {
+      ptrdiff_t ptrdiff= 0;
+      int ret= get_ndb_blobs_value(table, event_data->ndb_value[0],
+                                   blobs_buffer, blobs_buffer_size,
+                                   ptrdiff);
+      if (ret != 0)
+      {
+        my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
+        DBUG_PRINT("info", ("blob read error"));
+        DBUG_ASSERT(FALSE);
+      }
+    }
+    /* db varchar 1 length uchar */
+    field= table->field;
+    s->db_length= *(uint8*)(*field)->ptr;
+    DBUG_ASSERT(s->db_length <= (*field)->field_length);
+    DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->db));
+    memcpy(s->db, (*field)->ptr + 1, s->db_length);
+    s->db[s->db_length]= 0;
+    /* name varchar 1 length uchar */
+    field++;
+    s->name_length= *(uint8*)(*field)->ptr;
+    DBUG_ASSERT(s->name_length <= (*field)->field_length);
+    DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->name));
+    memcpy(s->name, (*field)->ptr + 1, s->name_length);
+    s->name[s->name_length]= 0;
+    /* slock fixed length */
+    field++;
+    s->slock_length= (*field)->field_length;
+    DBUG_ASSERT((*field)->field_length == sizeof(s->slock));
+    memcpy(s->slock, (*field)->ptr, s->slock_length);
+    /* query blob */
+    field++;
+    {
+      Field_blob *field_blob= (Field_blob*)(*field);
+      uint blob_len= field_blob->get_length((*field)->ptr);
+      uchar *blob_ptr= 0;
+      field_blob->get_ptr(&blob_ptr);
+      DBUG_ASSERT(blob_len == 0 || blob_ptr != 0);
+      s->query_length= blob_len;
+      s->query= sql_strmake((char*) blob_ptr, blob_len);
+    }
+    /* node_id */
+    field++;
+    s->node_id= (Uint32)((Field_long *)*field)->val_int();
+    /* epoch */
+    field++;
+    s->epoch= ((Field_long *)*field)->val_int();
+    /* id */
+    field++;
+    s->id= (Uint32)((Field_long *)*field)->val_int();
+    /* version */
+    field++;
+    s->version= (Uint32)((Field_long *)*field)->val_int();
+    /* type */
+    field++;
+    s->type= (Uint32)((Field_long *)*field)->val_int();
+    /* free blobs buffer */
+    my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
+    dbug_tmp_restore_column_map(table->read_set, old_map);
+  }
+
+
+  static void ndb_binlog_query(THD *thd, Cluster_schema *schema)
+  {
+    /* any_value == 0 means local cluster sourced change that
+     * should be logged
+     */
+    if (ndbcluster_anyvalue_is_reserved(schema->any_value))
+    {
+      /* Originating SQL node did not want this query logged */
+      if (!ndbcluster_anyvalue_is_nologging(schema->any_value))
+        sql_print_warning("NDB: unknown value for binlog signalling 0x%X, "
+                          "query not logged",
+                          schema->any_value);
+      return;
+    }
+
+    Uint32 queryServerId = ndbcluster_anyvalue_get_serverid(schema->any_value);
+    /*
+       Start with serverId as received AnyValue, in case it's a composite
+       (server_id_bits < 31).
+       This is for 'future', as currently schema ops do not have composite
+       AnyValues.
+       In future it may be useful to support *not* mapping composite
+       AnyValues to/from Binlogged server-ids.
+    */
+    Uint32 loggedServerId = schema->any_value;
+
+    if (queryServerId)
+    {
+      /*
+         AnyValue has non-zero serverId, must be a query applied by a slave
+         mysqld.
+         TODO : Assert that we are running in the Binlog injector thread?
+      */
+      if (! g_ndb_log_slave_updates)
+      {
+        /* This MySQLD does not log slave updates */
+        return;
+      }
+    }
+    else
+    {
+      /* No ServerId associated with this query, mark it as ours */
+      ndbcluster_anyvalue_set_serverid(loggedServerId, ::server_id);
+    }
+
+    uint32 thd_server_id_save= thd->server_id;
+    DBUG_ASSERT(sizeof(thd_server_id_save) == sizeof(thd->server_id));
+    char *thd_db_save= thd->db;
+    thd->server_id = loggedServerId;
+    thd->db= schema->db;
+    int errcode = query_error_code(thd, thd->killed == THD::NOT_KILLED);
+    thd->binlog_query(THD::STMT_QUERY_TYPE, schema->query,
+                      schema->query_length, FALSE,
+  #ifdef NDB_THD_BINLOG_QUERY_HAS_DIRECT
+                      TRUE,
+  #endif
+                      schema->name[0] == 0 || thd->db[0] == 0,
+                      errcode);
+    thd->server_id= thd_server_id_save;
+    thd->db= thd_db_save;
+  }
+
+
+
+  /*
+    acknowledge handling of schema operation
+  */
+  static int
+  ndbcluster_update_slock(THD *thd,
+                          const char *db,
+                          const char *table_name,
+                          uint32 table_id,
+                          uint32 table_version)
+  {
+    DBUG_ENTER("ndbcluster_update_slock");
+    if (!ndb_schema_share)
+    {
+      DBUG_RETURN(0);
+    }
+
+    const NdbError *ndb_error= 0;
+    uint32 node_id= g_ndb_cluster_connection->node_id();
+    Ndb *ndb= check_ndb_in_thd(thd);
+    char save_db[FN_HEADLEN];
+    strcpy(save_db, ndb->getDatabaseName());
+
+    char tmp_buf[FN_REFLEN];
+    NDBDICT *dict= ndb->getDictionary();
+    ndb->setDatabaseName(NDB_REP_DB);
+    Ndb_table_guard ndbtab_g(dict, NDB_SCHEMA_TABLE);
+    const NDBTAB *ndbtab= ndbtab_g.get_table();
+    NdbTransaction *trans= 0;
+    int retries= 100;
+    int retry_sleep= 30; /* 30 milliseconds, transaction */
+    const NDBCOL *col[SCHEMA_SIZE];
+    unsigned sz[SCHEMA_SIZE];
+
+    MY_BITMAP slock;
+    uint32 bitbuf[SCHEMA_SLOCK_SIZE/4];
+    bitmap_init(&slock, bitbuf, sizeof(bitbuf)*8, false);
+
+    if (ndbtab == 0)
+    {
+      if (dict->getNdbError().code != 4009)
+        abort();
+      DBUG_RETURN(0);
+    }
+
+    {
+      uint i;
+      for (i= 0; i < SCHEMA_SIZE; i++)
+      {
+        col[i]= ndbtab->getColumn(i);
+        if (i != SCHEMA_QUERY_I)
+        {
+          sz[i]= col[i]->getLength();
+          DBUG_ASSERT(sz[i] <= sizeof(tmp_buf));
+        }
+      }
+    }
+
+    while (1)
+    {
+      if ((trans= ndb->startTransaction()) == 0)
+        goto err;
+      {
+        NdbOperation *op= 0;
+        int r= 0;
+
+        /* read the bitmap exlusive */
+        r|= (op= trans->getNdbOperation(ndbtab)) == 0;
+        DBUG_ASSERT(r == 0);
+        r|= op->readTupleExclusive();
+        DBUG_ASSERT(r == 0);
+
+        /* db */
+        ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, (int)strlen(db));
+        r|= op->equal(SCHEMA_DB_I, tmp_buf);
+        DBUG_ASSERT(r == 0);
+        /* name */
+        ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
+                         (int)strlen(table_name));
+        r|= op->equal(SCHEMA_NAME_I, tmp_buf);
+        DBUG_ASSERT(r == 0);
+        /* slock */
+        r|= op->getValue(SCHEMA_SLOCK_I, (char*)slock.bitmap) == 0;
+        DBUG_ASSERT(r == 0);
+      }
+      if (trans->execute(NdbTransaction::NoCommit))
+        goto err;
+
+      if (opt_ndb_extra_logging > 19)
+      {
+        uint32 copy[SCHEMA_SLOCK_SIZE/4];
+        memcpy(copy, bitbuf, sizeof(copy));
+        bitmap_clear_bit(&slock, node_id);
+        sql_print_information("NDB: reply to %s.%s(%u/%u) from %x%x to %x%x",
+                              db, table_name,
+                              table_id, table_version,
+                              copy[0], copy[1],
+                              slock.bitmap[0],
+                              slock.bitmap[1]);
+      }
+      else
+      {
+        bitmap_clear_bit(&slock, node_id);
+      }
+
+      {
+        NdbOperation *op= 0;
+        int r= 0;
+
+        /* now update the tuple */
+        r|= (op= trans->getNdbOperation(ndbtab)) == 0;
+        DBUG_ASSERT(r == 0);
+        r|= op->updateTuple();
+        DBUG_ASSERT(r == 0);
+
+        /* db */
+        ndb_pack_varchar(col[SCHEMA_DB_I], tmp_buf, db, (int)strlen(db));
+        r|= op->equal(SCHEMA_DB_I, tmp_buf);
+        DBUG_ASSERT(r == 0);
+        /* name */
+        ndb_pack_varchar(col[SCHEMA_NAME_I], tmp_buf, table_name,
+                         (int)strlen(table_name));
+        r|= op->equal(SCHEMA_NAME_I, tmp_buf);
+        DBUG_ASSERT(r == 0);
+        /* slock */
+        r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap);
+        DBUG_ASSERT(r == 0);
+        /* node_id */
+        r|= op->setValue(SCHEMA_NODE_ID_I, node_id);
+        DBUG_ASSERT(r == 0);
+        /* type */
+        r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK);
+        DBUG_ASSERT(r == 0);
+      }
+      if (trans->execute(NdbTransaction::Commit,
+                         NdbOperation::DefaultAbortOption, 1 /*force send*/) == 0)
+      {
+        DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'",
+                            node_id, db, table_name));
+        dict->forceGCPWait(1);
+        break;
+      }
+    err:
+      const NdbError *this_error= trans ?
+        &trans->getNdbError() : &ndb->getNdbError();
+      if (this_error->status == NdbError::TemporaryError && !thd->killed)
+      {
+        if (retries--)
+        {
+          if (trans)
+            ndb->closeTransaction(trans);
+          do_retry_sleep(retry_sleep);
+          continue; // retry
+        }
+      }
+      ndb_error= this_error;
+      break;
+    }
+
+    if (ndb_error)
+    {
+      char buf[1024];
+      my_snprintf(buf, sizeof(buf), "Could not release lock on '%s.%s'",
+                  db, table_name);
+      push_warning_printf(thd, Sql_condition::WARN_LEVEL_WARN,
+                          ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+                          ndb_error->code, ndb_error->message, buf);
+    }
+    if (trans)
+      ndb->closeTransaction(trans);
+    ndb->setDatabaseName(save_db);
+    DBUG_RETURN(0);
+  }
+
+
 static int
-ndb_binlog_thread_handle_schema_event(THD *thd, Ndb *s_ndb,
-                                      NdbEventOperation *pOp,
-                                      List<Cluster_schema> 
-                                      *post_epoch_log_list,
-                                      List<Cluster_schema> 
-                                      *post_epoch_unlock_list,
-                                      MEM_ROOT *mem_root)
+handle_schema_event(THD *thd, Ndb *s_ndb,
+                    NdbEventOperation *pOp,
+                    List<Cluster_schema> *post_epoch_log_list,
+                    List<Cluster_schema> *post_epoch_unlock_list,
+                    MEM_ROOT *mem_root)
 {
-  DBUG_ENTER("ndb_binlog_thread_handle_schema_event");
+  DBUG_ENTER("handle_schema_event");
   Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
   NDB_SHARE *tmp_share= event_data->share;
   if (tmp_share && ndb_schema_share == tmp_share)
@@ -2811,20 +2930,17 @@ ndb_binlog_thread_handle_schema_event(TH
   the epoch is complete
 */
 static void
-ndb_binlog_thread_handle_schema_event_post_epoch(THD *thd,
-                                                 List<Cluster_schema>
-                                                 *post_epoch_log_list,
-                                                 List<Cluster_schema>
-                                                 *post_epoch_unlock_list)
+handle_schema_log_post_epoch(THD *thd,
+                             List<Cluster_schema> *log_list)
 {
-  if (post_epoch_log_list->elements == 0)
-    return;
-  DBUG_ENTER("ndb_binlog_thread_handle_schema_event_post_epoch");
-  Cluster_schema *schema;
+  DBUG_ENTER("handle_schema_log_post_epoch");
+
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
   Ndb *ndb= thd_ndb->ndb;
   NDBDICT *dict= ndb->getDictionary();
-  while ((schema= post_epoch_log_list->pop()))
+
+  Cluster_schema *schema;
+  while ((schema= log_list->pop()))
   {
     Thd_ndb_options_guard thd_ndb_options(thd_ndb);
     DBUG_PRINT("info",
@@ -3181,7 +3297,18 @@ ndb_binlog_thread_handle_schema_event_po
     if (ndb_binlog_running && log_query)
       ndb_binlog_query(thd, schema);
   }
-  while ((schema= post_epoch_unlock_list->pop()))
+  DBUG_VOID_RETURN;
+}
+
+
+static void
+handle_schema_unlock_post_epoch(THD *thd,
+                                List<Cluster_schema> *unlock_list)
+{
+  DBUG_ENTER("handle_schema_unlock_post_epoch");
+
+  Cluster_schema *schema;
+  while ((schema= unlock_list->pop()))
   {
     ndbcluster_update_slock(thd, schema->db, schema->name,
                             schema->id, schema->version);
@@ -3189,9 +3316,49 @@ ndb_binlog_thread_handle_schema_event_po
   DBUG_VOID_RETURN;
 }
 
-/*
-  Timer class for doing performance measurements
-*/
+  THD* m_thd;
+  MEM_ROOT* m_mem_root;
+
+  List<Cluster_schema> m_post_epoch_log_list;
+  List<Cluster_schema> m_post_epoch_unlock_list;
+
+public:
+  Ndb_schema_event_handler(); // Not implemented
+  Ndb_schema_event_handler(const Ndb_schema_event_handler&); // Not implemented
+
+  Ndb_schema_event_handler(THD* thd, MEM_ROOT* mem_root):
+    m_thd(thd), m_mem_root(mem_root)
+  {
+  }
+
+  ~Ndb_schema_event_handler()
+  {
+    // There should be no work left todo...
+    DBUG_ASSERT(m_post_epoch_log_list.elements == 0);
+    DBUG_ASSERT(m_post_epoch_unlock_list.elements == 0);
+  }
+
+  void handle_event(Ndb* s_ndb, NdbEventOperation *pOp)
+  {
+    handle_schema_event(m_thd, s_ndb, pOp,
+                        &m_post_epoch_log_list,
+                        &m_post_epoch_unlock_list,
+                        m_mem_root);
+  }
+
+  void post_epoch()
+  {
+    if (m_post_epoch_log_list.elements > 0)
+    {
+      handle_schema_log_post_epoch(m_thd, &m_post_epoch_log_list);
+      // NOTE post_epoch_unlock_list may not be handled!
+      handle_schema_unlock_post_epoch(m_thd, &m_post_epoch_unlock_list);
+    }
+    // There should be no work left todo...
+    DBUG_ASSERT(m_post_epoch_log_list.elements == 0);
+    DBUG_ASSERT(m_post_epoch_unlock_list.elements == 0);
+  }
+};
 
 /*********************************************************************
   Internal helper functions for handeling of the cluster replication tables
@@ -5721,11 +5888,11 @@ static void ndb_unpack_record(TABLE *tab
   Handle error states on events from the storage nodes
 */
 static int
-ndb_binlog_thread_handle_error(NdbEventOperation *pOp)
+handle_error(NdbEventOperation *pOp)
 {
   Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
   NDB_SHARE *share= event_data->share;
-  DBUG_ENTER("ndb_binlog_thread_handle_error");
+  DBUG_ENTER("handle_error");
 
   int overrun= pOp->isOverrun();
   if (overrun)
@@ -5762,9 +5929,9 @@ ndb_binlog_thread_handle_error(NdbEventO
 }
 
 static int
-ndb_binlog_thread_handle_non_data_event(THD *thd,
-                                        NdbEventOperation *pOp,
-                                        ndb_binlog_index_row &row)
+handle_non_data_event(THD *thd,
+                      NdbEventOperation *pOp,
+                      ndb_binlog_index_row &row)
 {
   Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
   NDB_SHARE *share= event_data->share;
@@ -5897,11 +6064,11 @@ ndb_find_binlog_index_row(ndb_binlog_ind
 
 
 static int
-ndb_binlog_thread_handle_data_event(THD* thd, Ndb *ndb, NdbEventOperation *pOp,
-                                    ndb_binlog_index_row **rows,
-                                    injector::transaction &trans,
-                                    unsigned &trans_row_count,
-                                    unsigned &trans_slave_row_count)
+handle_data_event(THD* thd, Ndb *ndb, NdbEventOperation *pOp,
+                  ndb_binlog_index_row **rows,
+                  injector::transaction &trans,
+                  unsigned &trans_row_count,
+                  unsigned &trans_slave_row_count)
 {
   Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData();
   TABLE *table= event_data->shadow_table;
@@ -6243,25 +6410,6 @@ ndb_binlog_thread_handle_data_event(THD*
   return 0;
 }
 
-//#define RUN_NDB_BINLOG_TIMER
-#ifdef RUN_NDB_BINLOG_TIMER
-class Timer
-{
-public:
-  Timer() { start(); }
-  void start() { gettimeofday(&m_start, 0); }
-  void stop() { gettimeofday(&m_stop, 0); }
-  ulong elapsed_ms()
-  {
-    return (ulong)
-      (((longlong) m_stop.tv_sec - (longlong) m_start.tv_sec) * 1000 +
-       ((longlong) m_stop.tv_usec -
-        (longlong) m_start.tv_usec + 999) / 1000);
-  }
-private:
-  struct timeval m_start,m_stop;
-};
-#endif
 
 /****************************************************************
   Injector thread main loop
@@ -6409,12 +6557,6 @@ void updateInjectorStats(Ndb* schemaNdb,
     dataNdb->getClientStat(Ndb::EventBytesRecvdCount);
 }
 
-enum Binlog_thread_state
-{
-  BCCC_running= 0,
-  BCCC_exit= 1,
-  BCCC_restart= 2
-};
 
 extern ulong opt_ndb_report_thresh_binlog_epoch_slip;
 extern ulong opt_ndb_report_thresh_binlog_mem_usage;
@@ -6428,7 +6570,8 @@ ndb_binlog_thread_func(void *arg)
   Thd_ndb *thd_ndb=0;
   injector *inj= injector::instance();
   uint incident_id= 0;
-  Binlog_thread_state do_ndbcluster_binlog_close_connection;
+
+  enum { BCCC_running, BCCC_exit, BCCC_restart } binlog_thread_state;
 
   /**
    * If we get error after having reported incident
@@ -6437,10 +6580,6 @@ ndb_binlog_thread_func(void *arg)
    */
   bool do_incident = true;
 
-#ifdef RUN_NDB_BINLOG_TIMER
-  Timer main_timer;
-#endif
-
   pthread_mutex_lock(&injector_mutex);
   /*
     Set up the Thread
@@ -6502,7 +6641,7 @@ ndb_binlog_thread_func(void *arg)
 
 restart_cluster_failure:
   int have_injector_mutex_lock= 0;
-  do_ndbcluster_binlog_close_connection= BCCC_exit;
+  binlog_thread_state= BCCC_exit;
 
   if (!(thd_ndb= Thd_ndb::seize(thd)))
   {
@@ -6633,7 +6772,7 @@ restart_cluster_failure:
           thread to get rid of any garbage on the ndb objects
         */
         have_injector_mutex_lock= 1;
-        do_ndbcluster_binlog_close_connection= BCCC_restart;
+        binlog_thread_state= BCCC_restart;
         goto err;
       }
       /* ndb not connected yet */
@@ -6739,30 +6878,25 @@ restart_cluster_failure:
     thd->db= db;
   }
   do_incident = true; // If we get disconnected again...do incident report
-  do_ndbcluster_binlog_close_connection= BCCC_running;
+  binlog_thread_state= BCCC_running;
   for ( ; !((ndbcluster_binlog_terminating ||
-             do_ndbcluster_binlog_close_connection) &&
+             binlog_thread_state) &&
             ndb_latest_handled_binlog_epoch >= ndb_get_latest_trans_gci()) &&
-          do_ndbcluster_binlog_close_connection != BCCC_restart; )
+          binlog_thread_state != BCCC_restart; )
   {
 #ifndef DBUG_OFF
-    if (do_ndbcluster_binlog_close_connection)
+    if (binlog_thread_state)
     {
-      DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection: %d, "
+      DBUG_PRINT("info", ("binlog_thread_state: %d, "
                           "ndb_latest_handled_binlog_epoch: %u/%u, "
                           "*get_latest_trans_gci(): %u/%u",
-                          do_ndbcluster_binlog_close_connection,
+                          binlog_thread_state,
                           (uint)(ndb_latest_handled_binlog_epoch >> 32),
                           (uint)(ndb_latest_handled_binlog_epoch),
                           (uint)(ndb_get_latest_trans_gci() >> 32),
                           (uint)(ndb_get_latest_trans_gci())));
     }
 #endif
-#ifdef RUN_NDB_BINLOG_TIMER
-    main_timer.stop();
-    sql_print_information("main_timer %ld ms",  main_timer.elapsed_ms());
-    main_timer.start();
-#endif
 
     /*
       now we don't want any events before next gci is complete
@@ -6795,7 +6929,7 @@ restart_cluster_failure:
     }
 
     if ((ndbcluster_binlog_terminating ||
-         do_ndbcluster_binlog_close_connection) &&
+         binlog_thread_state) &&
         (ndb_latest_handled_binlog_epoch >= ndb_get_latest_trans_gci() ||
          !ndb_binlog_running))
       break; /* Shutting down server */
@@ -6805,8 +6939,11 @@ restart_cluster_failure:
     MEM_ROOT *old_root= *root_ptr;
     MEM_ROOT mem_root;
     init_sql_alloc(&mem_root, 4096, 0);
-    List<Cluster_schema> post_epoch_log_list;
-    List<Cluster_schema> post_epoch_unlock_list;
+
+    // The Ndb_schema_event_handler does not necessarily need
+    // to use the same memroot(or vice versa)
+    Ndb_schema_event_handler schema_event_handler(thd, &mem_root);
+
     *root_ptr= &mem_root;
 
     if (unlikely(schema_res > 0))
@@ -6822,10 +6959,8 @@ restart_cluster_failure:
       {
         if (!pOp->hasError())
         {
-          ndb_binlog_thread_handle_schema_event(thd, s_ndb, pOp,
-                                                &post_epoch_log_list,
-                                                &post_epoch_unlock_list,
-                                                &mem_root);
+          schema_event_handler.handle_event(s_ndb, pOp);
+
           DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
                               s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
                               "<empty>"));
@@ -6834,10 +6969,10 @@ restart_cluster_failure:
                               "<empty>"));
           if (i_ndb->getEventOperation() == NULL &&
               s_ndb->getEventOperation() == NULL &&
-              do_ndbcluster_binlog_close_connection == BCCC_running)
+              binlog_thread_state == BCCC_running)
           {
-            DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
-            do_ndbcluster_binlog_close_connection= BCCC_restart;
+            DBUG_PRINT("info", ("binlog_thread_state= BCCC_restart"));
+            binlog_thread_state= BCCC_restart;
             if (ndb_latest_received_binlog_epoch < ndb_get_latest_trans_gci() && ndb_binlog_running)
             {
               sql_print_error("NDB Binlog: latest transaction in epoch %u/%u not in binlog "
@@ -6875,15 +7010,15 @@ restart_cluster_failure:
               (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
           {
             ndb_binlog_index_row row;
-            ndb_binlog_thread_handle_non_data_event(thd, pOp, row);
+            handle_non_data_event(thd, pOp, row);
           }
         }
         if (i_ndb->getEventOperation() == NULL &&
             s_ndb->getEventOperation() == NULL &&
-            do_ndbcluster_binlog_close_connection == BCCC_running)
+            binlog_thread_state == BCCC_running)
         {
-          DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
-          do_ndbcluster_binlog_close_connection= BCCC_restart;
+          DBUG_PRINT("info", ("binlog_thread_state= BCCC_restart"));
+          binlog_thread_state= BCCC_restart;
         }
       }
       updateInjectorStats(s_ndb, i_ndb);
@@ -6958,11 +7093,6 @@ restart_cluster_failure:
       while (pOp != NULL)
       {
         rows= &_row;
-#ifdef RUN_NDB_BINLOG_TIMER
-        Timer gci_timer, write_timer;
-        int event_count= 0;
-        gci_timer.start();
-#endif
         gci= pOp->getGCI();
         DBUG_PRINT("info", ("Handling gci: %u/%u",
                             (uint)(gci >> 32),
@@ -7090,16 +7220,11 @@ restart_cluster_failure:
             sql_print_error("NDB: Could not get apply status share");
           }
         }
-#ifdef RUN_NDB_BINLOG_TIMER
-        write_timer.start();
-#endif
+
         do
         {
-#ifdef RUN_NDB_BINLOG_TIMER
-          event_count++;
-#endif
           if (pOp->hasError() &&
-              ndb_binlog_thread_handle_error(pOp) < 0)
+              handle_error(pOp) < 0)
             goto err;
 
 #ifndef DBUG_OFF
@@ -7137,11 +7262,11 @@ restart_cluster_failure:
 #endif
           if ((unsigned) pOp->getEventType() <
               (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
-            ndb_binlog_thread_handle_data_event(thd, i_ndb, pOp, &rows, trans,
-                                                trans_row_count, trans_slave_row_count);
+            handle_data_event(thd, i_ndb, pOp, &rows, trans,
+                              trans_row_count, trans_slave_row_count);
           else
           {
-            ndb_binlog_thread_handle_non_data_event(thd, pOp, *rows);
+            handle_non_data_event(thd, pOp, *rows);
             DBUG_PRINT("info", ("s_ndb first: %s", s_ndb->getEventOperation() ?
                                 s_ndb->getEventOperation()->getEvent()->getTable()->getName() :
                                 "<empty>"));
@@ -7150,10 +7275,10 @@ restart_cluster_failure:
                                 "<empty>"));
             if (i_ndb->getEventOperation() == NULL &&
                 s_ndb->getEventOperation() == NULL &&
-                do_ndbcluster_binlog_close_connection == BCCC_running)
+                binlog_thread_state == BCCC_running)
             {
-              DBUG_PRINT("info", ("do_ndbcluster_binlog_close_connection= BCCC_restart"));
-              do_ndbcluster_binlog_close_connection= BCCC_restart;
+              DBUG_PRINT("info", ("binlog_thread_state= BCCC_restart"));
+              binlog_thread_state= BCCC_restart;
               if (ndb_latest_received_binlog_epoch < ndb_get_latest_trans_gci() && ndb_binlog_running)
               {
                 sql_print_error("NDB Binlog: latest transaction in epoch %lu not in binlog "
@@ -7173,9 +7298,6 @@ restart_cluster_failure:
           note! pOp is not referring to an event in the next epoch
           or is == 0
         */
-#ifdef RUN_NDB_BINLOG_TIMER
-        write_timer.stop();
-#endif
 
         while (trans.good())
         {
@@ -7250,18 +7372,8 @@ restart_cluster_failure:
           break;
         }
         ndb_latest_handled_binlog_epoch= gci;
-
-#ifdef RUN_NDB_BINLOG_TIMER
-        gci_timer.stop();
-        sql_print_information("gci %ld event_count %d write time "
-                              "%ld(%d e/s), total time %ld(%d e/s)",
-                              (ulong)gci, event_count,
-                              write_timer.elapsed_ms(),
-                              (1000*event_count) / write_timer.elapsed_ms(),
-                              gci_timer.elapsed_ms(),
-                              (1000*event_count) / gci_timer.elapsed_ms());
-#endif
       }
+
       if(!i_ndb->isConsistent(gci))
       {
         char errmsg[64];
@@ -7277,15 +7389,16 @@ restart_cluster_failure:
       }
     }
 
-    ndb_binlog_thread_handle_schema_event_post_epoch(thd,
-                                                     &post_epoch_log_list,
-                                                     &post_epoch_unlock_list);
+    // Notify the schema event handler about post_epoch so it may finish
+    // any outstanding business
+    schema_event_handler.post_epoch();
+
     free_root(&mem_root, MYF(0));
     *root_ptr= old_root;
     ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
   }
  err:
-  if (do_ndbcluster_binlog_close_connection != BCCC_restart)
+  if (binlog_thread_state != BCCC_restart)
   {
     sql_print_information("Stopping Cluster Binlog");
     DBUG_PRINT("info",("Shutting down cluster binlog thread"));
@@ -7411,7 +7524,7 @@ restart_cluster_failure:
     }
   }
 
-  if (do_ndbcluster_binlog_close_connection == BCCC_restart)
+  if (binlog_thread_state == BCCC_restart)
   {
     pthread_mutex_lock(&injector_mutex);
     goto restart_cluster_failure;
@@ -7471,88 +7584,6 @@ ndbcluster_show_status_binlog(THD* thd, 
   DBUG_RETURN(FALSE);
 }
 
-/*
-   AnyValue carries ServerId or Reserved codes
-   Bits from opt_server_id_bits to 30 may carry other data
-   so we ignore them when reading/setting AnyValue.
- 
-   332        21        10        0
-   10987654321098765432109876543210
-   roooooooooooooooooooooooosssssss
- 
-   r = Reserved bit indicates whether
-   bits 0-7+ have ServerId (0) or
-   some special reserved code (1).
-   o = Optional bits, depending on value
-       of server-id-bits will be 
-       serverid bits or user-specific 
-       data
-   s = Serverid bits or reserved codes
-       At least 7 bits will be available
-       for serverid or reserved codes
-  
-*/
-
-#define NDB_ANYVALUE_RESERVED_BIT   0x80000000
-#define NDB_ANYVALUE_RESERVED_MASK  0x8000007f
-
-#define NDB_ANYVALUE_NOLOGGING_CODE 0x8000007f
-
-#ifndef DBUG_OFF
-void dbug_ndbcluster_anyvalue_set_userbits(Uint32& anyValue)
-{
-  /* 
-     Set userData part of AnyValue (if there is one) to
-     all 1s to test that it is ignored
-  */
-  const Uint32 userDataMask = ~(opt_server_id_mask | 
-                                NDB_ANYVALUE_RESERVED_BIT);
-
-  anyValue |= userDataMask;
-}
-#endif
-
-bool ndbcluster_anyvalue_is_reserved(Uint32 anyValue)
-{
-  return ((anyValue & NDB_ANYVALUE_RESERVED_BIT) != 0);
-}
-
-bool ndbcluster_anyvalue_is_nologging(Uint32 anyValue)
-{
-  return ((anyValue & NDB_ANYVALUE_RESERVED_MASK) ==
-          NDB_ANYVALUE_NOLOGGING_CODE);
-}
-
-void ndbcluster_anyvalue_set_nologging(Uint32& anyValue)
-{
-  anyValue |= NDB_ANYVALUE_NOLOGGING_CODE;
-}
-
-void ndbcluster_anyvalue_set_normal(Uint32& anyValue)
-{
-  /* Clear reserved bit and serverid bits */
-  anyValue &= ~(NDB_ANYVALUE_RESERVED_BIT);
-  anyValue &= ~(opt_server_id_mask);
-}
-
-bool ndbcluster_anyvalue_is_serverid_in_range(Uint32 serverId)
-{
-  return ((serverId & ~opt_server_id_mask) == 0);
-}
-
-Uint32 ndbcluster_anyvalue_get_serverid(Uint32 anyValue)
-{
-  assert(! (anyValue & NDB_ANYVALUE_RESERVED_BIT) );
-
-  return (anyValue & opt_server_id_mask);
-}
-
-void ndbcluster_anyvalue_set_serverid(Uint32& anyValue, Uint32 serverId)
-{
-  assert(! (anyValue & NDB_ANYVALUE_RESERVED_BIT) );
-  anyValue &= ~(opt_server_id_mask);
-  anyValue |= (serverId & opt_server_id_mask); 
-}
 
 #ifdef NDB_WITHOUT_SERVER_ID_BITS
 
@@ -7561,121 +7592,5 @@ ulong opt_server_id_mask = ~0;
 
 #endif
 
-Ndb_binlog_extra_row_info::
-Ndb_binlog_extra_row_info()
-{
-  flags = 0;
-  transactionId = InvalidTransactionId;
-  /* Prepare buffer with extra row info buffer bytes */
-  buff[ EXTRA_ROW_INFO_LEN_OFFSET ] = 0;
-  buff[ EXTRA_ROW_INFO_FORMAT_OFFSET ] = ERIF_NDB;
-}
-
-void
-Ndb_binlog_extra_row_info::
-setFlags(Uint16 _flags)
-{
-  flags = _flags;
-}
-
-void
-Ndb_binlog_extra_row_info::
-setTransactionId(Uint64 _transactionId)
-{
-  assert(_transactionId != InvalidTransactionId);
-  transactionId = _transactionId;
-};
-
-int
-Ndb_binlog_extra_row_info::
-loadFromBuffer(const uchar* extra_row_info)
-{
-  assert(extra_row_info);
-
-  Uint8 length = extra_row_info[ EXTRA_ROW_INFO_LEN_OFFSET ];
-  assert(length >= EXTRA_ROW_INFO_HDR_BYTES);
-  Uint8 payload_length = length - EXTRA_ROW_INFO_HDR_BYTES;
-  Uint8 format = extra_row_info[ EXTRA_ROW_INFO_FORMAT_OFFSET ];
-
-  if (likely(format == ERIF_NDB))
-  {
-    if (likely(payload_length >= FLAGS_SIZE))
-    {
-      const uchar* data = &extra_row_info[ EXTRA_ROW_INFO_HDR_BYTES ];
-      Uint8 nextPos = 0;
-
-      /* Have flags at least */
-      Uint16 netFlags;
-      memcpy(&netFlags, &data[ nextPos ], FLAGS_SIZE);
-      nextPos += FLAGS_SIZE;
-      flags = uint2korr((const char*) &netFlags);
-
-      if (flags & NDB_ERIF_TRANSID)
-      {
-        if (likely((nextPos + TRANSID_SIZE) <= payload_length))
-        {
-          /*
-            Correct length, retrieve transaction id, converting from
-            little endian if necessary.
-          */
-          Uint64 netTransId;
-          memcpy(&netTransId,
-                 &data[ nextPos ],
-                 TRANSID_SIZE);
-          nextPos += TRANSID_SIZE;
-          transactionId = uint8korr((const char*) &netTransId);
-        }
-        else
-        {
-          /*
-             Error - supposed to have transaction id, but
-             buffer too short
-          */
-          return -1;
-        }
-      }
-    }
-  }
-
-  /* We currently ignore other formats of extra binlog info, and
-   * different lengths.
-   */
-
-  return 0;
-}
-
-uchar*
-Ndb_binlog_extra_row_info::generateBuffer()
-{
-  /*
-    Here we write out the buffer in network format,
-    based on the current member settings.
-  */
-  Uint8 nextPos = EXTRA_ROW_INFO_HDR_BYTES;
-
-  if (flags)
-  {
-    /* Write current flags into buff */
-    Uint16 netFlags = uint2korr((const char*) &flags);
-    memcpy(&buff[ nextPos ], &netFlags, FLAGS_SIZE);
-    nextPos += FLAGS_SIZE;
-
-    if (flags & NDB_ERIF_TRANSID)
-    {
-      Uint64 netTransactionId = uint8korr((const char*) &transactionId);
-      memcpy(&buff[ nextPos ], &netTransactionId, TRANSID_SIZE);
-      nextPos += TRANSID_SIZE;
-    }
-
-    assert( nextPos <= MaxLen );
-    /* Set length */
-    assert( buff[ EXTRA_ROW_INFO_FORMAT_OFFSET ] == ERIF_NDB );
-    buff[ EXTRA_ROW_INFO_LEN_OFFSET ] = nextPos;
-
-    return buff;
-  }
-  return 0;
-}
-
 // #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
 #endif

=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h	2011-10-27 12:19:57 +0000
+++ b/sql/ha_ndbcluster_binlog.h	2011-10-27 12:33:39 +0000
@@ -212,42 +212,6 @@ ndbcluster_show_status_binlog(THD* thd, 
 */
 int cmp_frm(const NDBTAB *ndbtab, const void *pack_data,
             size_t pack_length);
-int ndbcluster_find_all_files(THD *thd);
-
-char *ndb_pack_varchar(const NDBCOL *col, char *buf,
-                       const char *str, int sz);
-
-NDB_SHARE *ndbcluster_get_share(const char *key,
-                                TABLE *table,
-                                bool create_if_not_exists,
-                                bool have_lock);
-NDB_SHARE *ndbcluster_get_share(NDB_SHARE *share);
-void ndbcluster_free_share(NDB_SHARE **share, bool have_lock);
-void ndbcluster_real_free_share(NDB_SHARE **share);
-int handle_trailing_share(THD *thd, NDB_SHARE *share);
-int ndbcluster_prepare_rename_share(NDB_SHARE *share, const char *new_key);
-int ndbcluster_rename_share(THD *thd, NDB_SHARE *share);
-int ndbcluster_undo_rename_share(THD *thd, NDB_SHARE *share);
-void ndbcluster_mark_share_dropped(NDB_SHARE*);
-inline NDB_SHARE *get_share(const char *key,
-                            TABLE *table,
-                            bool create_if_not_exists= TRUE,
-                            bool have_lock= FALSE)
-{
-  return ndbcluster_get_share(key, table, create_if_not_exists, have_lock);
-}
-
-inline NDB_SHARE *get_share(NDB_SHARE *share)
-{
-  return ndbcluster_get_share(share);
-}
-
-inline void free_share(NDB_SHARE **share, bool have_lock= FALSE)
-{
-  ndbcluster_free_share(share, have_lock);
-}
-
-void set_binlog_flags(NDB_SHARE *share);
 
 /*
   Helper functions
@@ -257,60 +221,3 @@ ndbcluster_check_if_local_table(const ch
 bool
 ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname);
 
-bool ndbcluster_anyvalue_is_reserved(Uint32 anyValue);
-bool ndbcluster_anyvalue_is_nologging(Uint32 anyValue);
-void ndbcluster_anyvalue_set_nologging(Uint32& anyValue);
-bool ndbcluster_anyvalue_is_serverid_in_range(Uint32 serverId);
-void ndbcluster_anyvalue_set_normal(Uint32& anyValue);
-Uint32 ndbcluster_anyvalue_get_serverid(Uint32 anyValue);
-void ndbcluster_anyvalue_set_serverid(Uint32& anyValue, Uint32 serverId);
-
-#ifndef DBUG_OFF
-void dbug_ndbcluster_anyvalue_set_userbits(Uint32& anyValue);
-#endif
-
-/*
-   Helper for reading/writing Binlog extra row info
-   in Ndb format.
-   It contains an internal buffer, which can be passed
-   in the thd variable when writing binlog entries if
-   the object stays in scope around the write.
-*/
-class Ndb_binlog_extra_row_info
-{
-public:
-  static const Uint32 FLAGS_SIZE = sizeof(Uint16);
-  static const Uint32 TRANSID_SIZE = sizeof(Uint64);
-  static const Uint32 MaxLen =
-    EXTRA_ROW_INFO_HDR_BYTES +
-    FLAGS_SIZE +
-    TRANSID_SIZE;
-
-  static const Uint64 InvalidTransactionId = ~Uint64(0);
-
-  enum Flags
-  {
-    NDB_ERIF_TRANSID = 0x1
-  };
-
-  Ndb_binlog_extra_row_info();
-
-  int loadFromBuffer(const uchar* extra_row_info_ptr);
-
-  Uint16 getFlags() const
-  {
-    return flags;
-  }
-  void setFlags(Uint16 _flags);
-  Uint64 getTransactionId() const
-  { return transactionId; };
-  void setTransactionId(Uint64 _transactionId);
-  uchar* getBuffPtr()
-  { return buff; };
-  uchar* generateBuffer();
-private:
-  uchar buff[MaxLen];
-  Uint16 flags;
-  Uint64 transactionId;
-};
-

=== modified file 'sql/ha_ndbcluster_connection.cc'
--- a/sql/ha_ndbcluster_connection.cc	2011-10-27 12:19:57 +0000
+++ b/sql/ha_ndbcluster_connection.cc	2011-10-27 12:33:39 +0000
@@ -341,7 +341,8 @@ static ST_FIELD_INFO ndb_transid_mysql_c
 
 static
 int
-ndb_transid_mysql_connection_map_fill_table(THD* thd, TABLE_LIST* tables, Item* cond)
+ndb_transid_mysql_connection_map_fill_table(THD* thd, TABLE_LIST* tables,
+                                            Item*)
 {
   DBUG_ENTER("ndb_transid_mysql_connection_map_fill_table");
 

=== added file 'sql/ndb_anyvalue.cc'
--- a/sql/ndb_anyvalue.cc	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_anyvalue.cc	2011-10-27 07:42:25 +0000
@@ -0,0 +1,105 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include "ndb_anyvalue.h"
+
+/*
+   AnyValue carries ServerId or Reserved codes
+   Bits from opt_server_id_bits to 30 may carry other data
+   so we ignore them when reading/setting AnyValue.
+
+   332        21        10        0
+   10987654321098765432109876543210
+   roooooooooooooooooooooooosssssss
+
+   r = Reserved bit indicates whether
+   bits 0-7+ have ServerId (0) or
+   some special reserved code (1).
+   o = Optional bits, depending on value
+       of server-id-bits will be
+       serverid bits or user-specific
+       data
+   s = Serverid bits or reserved codes
+       At least 7 bits will be available
+       for serverid or reserved codes
+
+*/
+
+#include <my_global.h>
+
+extern ulong opt_server_id_mask;
+
+#define NDB_ANYVALUE_RESERVED_BIT   0x80000000
+#define NDB_ANYVALUE_RESERVED_MASK  0x8000007f
+
+#define NDB_ANYVALUE_NOLOGGING_CODE 0x8000007f
+
+#ifndef DBUG_OFF
+void dbug_ndbcluster_anyvalue_set_userbits(Uint32& anyValue)
+{
+  /*
+     Set userData part of AnyValue (if there is one) to
+     all 1s to test that it is ignored
+  */
+  const Uint32 userDataMask = ~(opt_server_id_mask |
+                                NDB_ANYVALUE_RESERVED_BIT);
+
+  anyValue |= userDataMask;
+}
+#endif
+
+bool ndbcluster_anyvalue_is_reserved(Uint32 anyValue)
+{
+  return ((anyValue & NDB_ANYVALUE_RESERVED_BIT) != 0);
+}
+
+bool ndbcluster_anyvalue_is_nologging(Uint32 anyValue)
+{
+  return ((anyValue & NDB_ANYVALUE_RESERVED_MASK) ==
+          NDB_ANYVALUE_NOLOGGING_CODE);
+}
+
+void ndbcluster_anyvalue_set_nologging(Uint32& anyValue)
+{
+  anyValue |= NDB_ANYVALUE_NOLOGGING_CODE;
+}
+
+void ndbcluster_anyvalue_set_normal(Uint32& anyValue)
+{
+  /* Clear reserved bit and serverid bits */
+  anyValue &= ~(NDB_ANYVALUE_RESERVED_BIT);
+  anyValue &= ~(opt_server_id_mask);
+}
+
+bool ndbcluster_anyvalue_is_serverid_in_range(Uint32 serverId)
+{
+  return ((serverId & ~opt_server_id_mask) == 0);
+}
+
+Uint32 ndbcluster_anyvalue_get_serverid(Uint32 anyValue)
+{
+  assert(! (anyValue & NDB_ANYVALUE_RESERVED_BIT) );
+
+  return (anyValue & opt_server_id_mask);
+}
+
+void ndbcluster_anyvalue_set_serverid(Uint32& anyValue, Uint32 serverId)
+{
+  assert(! (anyValue & NDB_ANYVALUE_RESERVED_BIT) );
+  anyValue &= ~(opt_server_id_mask);
+  anyValue |= (serverId & opt_server_id_mask);
+}

=== added file 'sql/ndb_anyvalue.h'
--- a/sql/ndb_anyvalue.h	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_anyvalue.h	2011-10-27 07:42:25 +0000
@@ -0,0 +1,35 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef NDB_ANYVALUE_H
+#define NDB_ANYVALUE_H
+
+#include <ndb_types.h>
+
+bool ndbcluster_anyvalue_is_reserved(Uint32 anyValue);
+bool ndbcluster_anyvalue_is_nologging(Uint32 anyValue);
+void ndbcluster_anyvalue_set_nologging(Uint32& anyValue);
+bool ndbcluster_anyvalue_is_serverid_in_range(Uint32 serverId);
+void ndbcluster_anyvalue_set_normal(Uint32& anyValue);
+Uint32 ndbcluster_anyvalue_get_serverid(Uint32 anyValue);
+void ndbcluster_anyvalue_set_serverid(Uint32& anyValue, Uint32 serverId);
+
+#ifndef DBUG_OFF
+void dbug_ndbcluster_anyvalue_set_userbits(Uint32& anyValue);
+#endif
+
+#endif

=== added file 'sql/ndb_binlog_extra_row_info.cc'
--- a/sql/ndb_binlog_extra_row_info.cc	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_binlog_extra_row_info.cc	2011-10-27 09:34:06 +0000
@@ -0,0 +1,136 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include "ndb_binlog_extra_row_info.h"
+#include <string.h> // memcpy
+
+
+Ndb_binlog_extra_row_info::
+Ndb_binlog_extra_row_info()
+{
+  flags = 0;
+  transactionId = InvalidTransactionId;
+  /* Prepare buffer with extra row info buffer bytes */
+  buff[ EXTRA_ROW_INFO_LEN_OFFSET ] = 0;
+  buff[ EXTRA_ROW_INFO_FORMAT_OFFSET ] = ERIF_NDB;
+}
+
+void
+Ndb_binlog_extra_row_info::
+setFlags(Uint16 _flags)
+{
+  flags = _flags;
+}
+
+void
+Ndb_binlog_extra_row_info::
+setTransactionId(Uint64 _transactionId)
+{
+  assert(_transactionId != InvalidTransactionId);
+  transactionId = _transactionId;
+};
+
+int
+Ndb_binlog_extra_row_info::
+loadFromBuffer(const uchar* extra_row_info)
+{
+  assert(extra_row_info);
+
+  Uint8 length = extra_row_info[ EXTRA_ROW_INFO_LEN_OFFSET ];
+  assert(length >= EXTRA_ROW_INFO_HDR_BYTES);
+  Uint8 payload_length = length - EXTRA_ROW_INFO_HDR_BYTES;
+  Uint8 format = extra_row_info[ EXTRA_ROW_INFO_FORMAT_OFFSET ];
+
+  if (likely(format == ERIF_NDB))
+  {
+    if (likely(payload_length >= FLAGS_SIZE))
+    {
+      const uchar* data = &extra_row_info[ EXTRA_ROW_INFO_HDR_BYTES ];
+      Uint8 nextPos = 0;
+
+      /* Have flags at least */
+      Uint16 netFlags;
+      memcpy(&netFlags, &data[ nextPos ], FLAGS_SIZE);
+      nextPos += FLAGS_SIZE;
+      flags = uint2korr((const char*) &netFlags);
+
+      if (flags & NDB_ERIF_TRANSID)
+      {
+        if (likely((nextPos + TRANSID_SIZE) <= payload_length))
+        {
+          /*
+            Correct length, retrieve transaction id, converting from
+            little endian if necessary.
+          */
+          Uint64 netTransId;
+          memcpy(&netTransId,
+                 &data[ nextPos ],
+                 TRANSID_SIZE);
+          nextPos += TRANSID_SIZE;
+          transactionId = uint8korr((const char*) &netTransId);
+        }
+        else
+        {
+          /*
+             Error - supposed to have transaction id, but
+             buffer too short
+          */
+          return -1;
+        }
+      }
+    }
+  }
+
+  /* We currently ignore other formats of extra binlog info, and
+   * different lengths.
+   */
+
+  return 0;
+}
+
+uchar*
+Ndb_binlog_extra_row_info::generateBuffer()
+{
+  /*
+    Here we write out the buffer in network format,
+    based on the current member settings.
+  */
+  Uint8 nextPos = EXTRA_ROW_INFO_HDR_BYTES;
+
+  if (flags)
+  {
+    /* Write current flags into buff */
+    Uint16 netFlags = uint2korr((const char*) &flags);
+    memcpy(&buff[ nextPos ], &netFlags, FLAGS_SIZE);
+    nextPos += FLAGS_SIZE;
+
+    if (flags & NDB_ERIF_TRANSID)
+    {
+      Uint64 netTransactionId = uint8korr((const char*) &transactionId);
+      memcpy(&buff[ nextPos ], &netTransactionId, TRANSID_SIZE);
+      nextPos += TRANSID_SIZE;
+    }
+
+    assert( nextPos <= MaxLen );
+    /* Set length */
+    assert( buff[ EXTRA_ROW_INFO_FORMAT_OFFSET ] == ERIF_NDB );
+    buff[ EXTRA_ROW_INFO_LEN_OFFSET ] = nextPos;
+
+    return buff;
+  }
+  return 0;
+}

=== added file 'sql/ndb_binlog_extra_row_info.h'
--- a/sql/ndb_binlog_extra_row_info.h	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_binlog_extra_row_info.h	2011-10-27 09:34:06 +0000
@@ -0,0 +1,71 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef NDB_BINLOG_EXTRA_ROW_INFO_H
+#define NDB_BINLOG_EXTRA_ROW_INFO_H
+
+#include <my_global.h>
+#include <ndb_types.h>
+#include <rpl_constants.h>
+
+/*
+   Helper for reading/writing Binlog extra row info
+   in Ndb format.
+   It contains an internal buffer, which can be passed
+   in the thd variable when writing binlog entries if
+   the object stays in scope around the write.
+*/
+class Ndb_binlog_extra_row_info
+{
+public:
+  static const Uint32 FLAGS_SIZE = sizeof(Uint16);
+  static const Uint32 TRANSID_SIZE = sizeof(Uint64);
+  static const Uint32 MaxLen =
+    EXTRA_ROW_INFO_HDR_BYTES +
+    FLAGS_SIZE +
+    TRANSID_SIZE;
+
+  static const Uint64 InvalidTransactionId = ~Uint64(0);
+
+  enum Flags
+  {
+    NDB_ERIF_TRANSID = 0x1
+  };
+
+  Ndb_binlog_extra_row_info();
+
+  int loadFromBuffer(const uchar* extra_row_info_ptr);
+
+  Uint16 getFlags() const
+  {
+    return flags;
+  }
+  void setFlags(Uint16 _flags);
+  Uint64 getTransactionId() const
+  { return transactionId; };
+  void setTransactionId(Uint64 _transactionId);
+  uchar* getBuffPtr()
+  { return buff; };
+  uchar* generateBuffer();
+private:
+  uchar buff[MaxLen];
+  Uint16 flags;
+  Uint64 transactionId;
+};
+
+
+#endif

=== added file 'sql/ndb_ndbapi_util.cc'
--- a/sql/ndb_ndbapi_util.cc	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_ndbapi_util.cc	2011-10-27 08:27:44 +0000
@@ -0,0 +1,131 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include "ndb_ndbapi_util.h"
+
+#include <string.h> // memcpy
+
+/*
+  helper function to pack a ndb varchar
+*/
+char *ndb_pack_varchar(const NdbDictionary::Column *col, char *buf,
+                       const char *str, int sz)
+{
+  switch (col->getArrayType())
+  {
+    case NdbDictionary::Column::ArrayTypeFixed:
+      memcpy(buf, str, sz);
+      break;
+    case NdbDictionary::Column::ArrayTypeShortVar:
+      *(uchar*)buf= (uchar)sz;
+      memcpy(buf + 1, str, sz);
+      break;
+    case NdbDictionary::Column::ArrayTypeMediumVar:
+      int2store(buf, sz);
+      memcpy(buf + 2, str, sz);
+      break;
+  }
+  return buf;
+}
+
+
+#ifndef MYSQL_SERVER
+#define MYSQL_SERVER
+#endif
+
+#include <sql_class.h> // TABLE, Field etc.
+
+/*
+  This routine is shared by injector.  There is no common blobs buffer
+  so the buffer and length are passed by reference.  Injector also
+  passes a record pointer diff.
+ */
+int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
+                        uchar*& buffer, uint& buffer_size,
+                        my_ptrdiff_t ptrdiff)
+{
+  DBUG_ENTER("get_ndb_blobs_value");
+
+  // Field has no field number so cannot use TABLE blob_field
+  // Loop twice, first only counting total buffer size
+  for (int loop= 0; loop <= 1; loop++)
+  {
+    uint32 offset= 0;
+    for (uint i= 0; i < table->s->fields; i++)
+    {
+      Field *field= table->field[i];
+      NdbValue value= value_array[i];
+      if (! (field->flags & BLOB_FLAG))
+        continue;
+      if (value.blob == NULL)
+      {
+        DBUG_PRINT("info",("[%u] skipped", i));
+        continue;
+      }
+      Field_blob *field_blob= (Field_blob *)field;
+      NdbBlob *ndb_blob= value.blob;
+      int isNull;
+      if (ndb_blob->getNull(isNull) != 0)
+        DBUG_RETURN(-1);
+      if (isNull == 0) {
+        Uint64 len64= 0;
+        if (ndb_blob->getLength(len64) != 0)
+          DBUG_RETURN(-1);
+        // Align to Uint64
+        uint32 size= Uint32(len64);
+        if (size % 8 != 0)
+          size+= 8 - size % 8;
+        if (loop == 1)
+        {
+          uchar *buf= buffer + offset;
+          uint32 len= 0xffffffff;  // Max uint32
+          if (ndb_blob->readData(buf, len) != 0)
+            DBUG_RETURN(-1);
+          DBUG_PRINT("info", ("[%u] offset: %u  buf: 0x%lx  len=%u  [ptrdiff=%d]",
+                              i, offset, (long) buf, len, (int)ptrdiff));
+          DBUG_ASSERT(len == len64);
+          // Ugly hack assumes only ptr needs to be changed
+          field_blob->set_ptr_offset(ptrdiff, len, buf);
+        }
+        offset+= size;
+      }
+      else if (loop == 1) // undefined or null
+      {
+        // have to set length even in this case
+        uchar *buf= buffer + offset; // or maybe NULL
+        uint32 len= 0;
+        field_blob->set_ptr_offset(ptrdiff, len, buf);
+        DBUG_PRINT("info", ("[%u] isNull=%d", i, isNull));
+        }
+    }
+    if (loop == 0 && offset > buffer_size)
+    {
+      my_free(buffer);
+      buffer_size= 0;
+      DBUG_PRINT("info", ("allocate blobs buffer size %u", offset));
+      buffer= (uchar*) my_malloc(offset, MYF(MY_WME));
+      if (buffer == NULL)
+      {
+        sql_print_error("ha_ndbcluster::get_ndb_blobs_value: "
+                        "my_malloc(%u) failed", offset);
+        DBUG_RETURN(-1);
+      }
+      buffer_size= offset;
+    }
+  }
+  DBUG_RETURN(0);
+}

=== added file 'sql/ndb_ndbapi_util.h'
--- a/sql/ndb_ndbapi_util.h	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_ndbapi_util.h	2011-10-27 08:27:44 +0000
@@ -0,0 +1,41 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef NDB_NDBAPI_UTIL_H
+#define NDB_NDBAPI_UTIL_H
+
+#include <my_global.h>
+
+#include <ndbapi/NdbRecAttr.hpp>
+#include <ndbapi/NdbBlob.hpp>
+#include <ndbapi/NdbDictionary.hpp>
+
+union NdbValue
+{
+  const NdbRecAttr *rec;
+  NdbBlob *blob;
+  void *ptr;
+};
+
+int get_ndb_blobs_value(struct TABLE* table, NdbValue* value_array,
+                        uchar*& buffer, uint& buffer_size,
+                        my_ptrdiff_t ptrdiff);
+
+char *ndb_pack_varchar(const NdbDictionary::Column *col,
+                       char *buf, const char *str, int sz);
+
+#endif

=== modified file 'sql/ndb_share.h'
--- a/sql/ndb_share.h	2011-09-09 09:30:43 +0000
+++ b/sql/ndb_share.h	2011-10-27 09:10:08 +0000
@@ -212,4 +212,80 @@ set_ndb_share_state(NDB_SHARE *share, ND
   pthread_mutex_unlock(&share->mutex);
 }
 
+
+/* NDB_SHARE.flags */
+#define NSF_HIDDEN_PK   1u /* table has hidden primary key */
+#define NSF_BLOB_FLAG   2u /* table has blob attributes */
+#define NSF_NO_BINLOG   4u /* table should not be binlogged */
+#define NSF_BINLOG_FULL 8u /* table should be binlogged with full rows */
+#define NSF_BINLOG_USE_UPDATE 16u  /* table update should be binlogged using
+                                     update log event */
+inline void set_binlog_logging(NDB_SHARE *share)
+{
+  DBUG_PRINT("info", ("set_binlog_logging"));
+  share->flags&= ~NSF_NO_BINLOG;
+}
+inline void set_binlog_nologging(NDB_SHARE *share)
+{
+  DBUG_PRINT("info", ("set_binlog_nologging"));
+  share->flags|= NSF_NO_BINLOG;
+}
+inline my_bool get_binlog_nologging(NDB_SHARE *share)
+{ return (share->flags & NSF_NO_BINLOG) != 0; }
+inline void set_binlog_updated_only(NDB_SHARE *share)
+{
+  DBUG_PRINT("info", ("set_binlog_updated_only"));
+  share->flags&= ~NSF_BINLOG_FULL;
+}
+inline void set_binlog_full(NDB_SHARE *share)
+{
+  DBUG_PRINT("info", ("set_binlog_full"));
+  share->flags|= NSF_BINLOG_FULL;
+}
+inline my_bool get_binlog_full(NDB_SHARE *share)
+{ return (share->flags & NSF_BINLOG_FULL) != 0; }
+inline void set_binlog_use_write(NDB_SHARE *share)
+{
+  DBUG_PRINT("info", ("set_binlog_use_write"));
+  share->flags&= ~NSF_BINLOG_USE_UPDATE;
+}
+inline void set_binlog_use_update(NDB_SHARE *share)
+{
+  DBUG_PRINT("info", ("set_binlog_use_update"));
+  share->flags|= NSF_BINLOG_USE_UPDATE;
+}
+inline my_bool get_binlog_use_update(NDB_SHARE *share)
+{ return (share->flags & NSF_BINLOG_USE_UPDATE) != 0; }
+
+
+NDB_SHARE *ndbcluster_get_share(const char *key,
+                                struct TABLE *table,
+                                bool create_if_not_exists,
+                                bool have_lock);
+NDB_SHARE *ndbcluster_get_share(NDB_SHARE *share);
+void ndbcluster_free_share(NDB_SHARE **share, bool have_lock);
+void ndbcluster_real_free_share(NDB_SHARE **share);
+int handle_trailing_share(THD *thd, NDB_SHARE *share);
+int ndbcluster_prepare_rename_share(NDB_SHARE *share, const char *new_key);
+int ndbcluster_rename_share(THD *thd, NDB_SHARE *share);
+int ndbcluster_undo_rename_share(THD *thd, NDB_SHARE *share);
+void ndbcluster_mark_share_dropped(NDB_SHARE*);
+inline NDB_SHARE *get_share(const char *key,
+                            struct TABLE *table,
+                            bool create_if_not_exists= TRUE,
+                            bool have_lock= FALSE)
+{
+  return ndbcluster_get_share(key, table, create_if_not_exists, have_lock);
+}
+
+inline NDB_SHARE *get_share(NDB_SHARE *share)
+{
+  return ndbcluster_get_share(share);
+}
+
+inline void free_share(NDB_SHARE **share, bool have_lock= FALSE)
+{
+  ndbcluster_free_share(share, have_lock);
+}
+
 #endif

=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt	2011-10-27 12:19:57 +0000
+++ b/storage/ndb/CMakeLists.txt	2011-10-27 12:33:39 +0000
@@ -80,6 +80,9 @@ SET(NDBCLUSTER_SOURCES
   ../../sql/ndb_global_schema_lock.cc
   ../../sql/ndb_mi.cc
   ../../sql/ndb_conflict_trans.cc
+  ../../sql/ndb_anyvalue.cc
+  ../../sql/ndb_ndbapi_util.cc
+  ../../sql/ndb_binlog_extra_row_info.cc
 )
 
 # Include directories used when building ha_ndbcluster

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-trunk-cluster branch (magnus.blaudd:3402 to 3403) magnus.blaudd28 Oct