List:Internals« Previous MessageNext Message »
From:tomas Date:January 4 2006 4:30pm
Subject:bk commit into 5.1 tree (tomas:1.1987)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1987 06/01/04 16:30:28 tomas@stripped +21 -0
  Merge tulin@stripped:/home/bk/mysql-5.1-wl2325-v5
  into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-wl2325-repl

  storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
    1.25 06/01/04 16:30:20 tomas@stripped +1 -43
    manual merge

  sql/ha_ndbcluster.cc
    1.230 06/01/04 16:30:20 tomas@stripped +0 -44
    manual merge

  mysql-test/t/ndb_multi.test
    1.9 06/01/04 16:30:20 tomas@stripped +1 -1
    manual merge

  mysql-test/t/disabled.def
    1.17 06/01/04 16:30:20 tomas@stripped +0 -0
    manual merge

  mysql-test/r/ndb_multi.result
    1.8 06/01/04 16:30:20 tomas@stripped +0 -0
    manual merge

  storage/ndb/test/run-test/make-config.sh
    1.6 06/01/04 16:08:48 tomas@stripped +0 -1
    Auto merged

  storage/ndb/test/ndbapi/bank/BankLoad.cpp
    1.12 06/01/04 16:08:48 tomas@stripped +0 -1
    Auto merged

  storage/ndb/src/ndbapi/ndberror.c
    1.46 06/01/04 16:08:48 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
    1.36 06/01/04 16:08:48 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
    1.9 06/01/04 16:08:48 tomas@stripped +0 -1
    Auto merged

  storage/ndb/src/ndbapi/Ndb.cpp
    1.61 06/01/04 16:08:48 tomas@stripped +0 -0
    Auto merged

  storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
    1.65 06/01/04 16:08:48 tomas@stripped +0 -0
    Auto merged

  sql/sql_db.cc
    1.123 06/01/04 16:08:48 tomas@stripped +0 -0
    Auto merged

  sql/sql_base.cc
    1.286 06/01/04 16:08:48 tomas@stripped +0 -0
    Auto merged

  sql/mysqld.cc
    1.507 06/01/04 16:08:47 tomas@stripped +0 -0
    Auto merged

  sql/mysql_priv.h
    1.349 06/01/04 16:08:47 tomas@stripped +0 -0
    Auto merged

  sql/handler.h
    1.171 06/01/04 16:08:47 tomas@stripped +0 -0
    Auto merged

  sql/handler.cc
    1.196 06/01/04 16:08:47 tomas@stripped +0 -0
    Auto merged

  sql/ha_ndbcluster.h
    1.103 06/01/04 16:08:47 tomas@stripped +0 -0
    Auto merged

  sql/Makefile.am
    1.125 06/01/04 16:08:46 tomas@stripped +0 -0
    Auto merged

  libmysqld/Makefile.am
    1.74 06/01/04 16:08:46 tomas@stripped +0 -0
    Auto merged

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	tomas
# Host:	poseidon.ndb.mysql.com
# Root:	/home/tomas/mysql-5.1-wl2325-repl/RESYNC

--- 1.124/sql/Makefile.am	2005-12-15 09:21:04 +01:00
+++ 1.125/sql/Makefile.am	2006-01-04 16:08:46 +01:00
@@ -46,6 +46,7 @@
                         @yassl_libs@ @openssl_libs@
 noinst_HEADERS =	item.h item_func.h item_sum.h item_cmpfunc.h \
 			item_strfunc.h item_timefunc.h item_uniq.h \
+			item_xmlfunc.h \
 			item_create.h item_subselect.h item_row.h \
 			mysql_priv.h item_geofunc.h sql_bitmap.h \
 			procedure.h sql_class.h sql_lex.h sql_list.h \
@@ -63,12 +64,12 @@
 			sp_head.h sp_pcontext.h sp_rcontext.h sp.h sp_cache.h \
 			parse_file.h sql_view.h	sql_trigger.h \
 			sql_array.h sql_cursor.h \
-			sql_plugin.h
+			sql_plugin.h authors.h
 mysqld_SOURCES =	sql_lex.cc sql_handler.cc sql_partition.cc \
 			item.cc item_sum.cc item_buff.cc item_func.cc \
 			item_cmpfunc.cc item_strfunc.cc item_timefunc.cc \
 			thr_malloc.cc item_create.cc item_subselect.cc \
-			item_row.cc item_geofunc.cc \
+			item_row.cc item_geofunc.cc item_xmlfunc.cc \
 			field.cc strfunc.cc key.cc sql_class.cc sql_list.cc \
 			net_serv.cc protocol.cc sql_state.c \
 			lock.cc my_lock.c \
@@ -97,7 +98,7 @@
                         tztime.cc my_time.c my_decimal.cc\
 			sp_head.cc sp_pcontext.cc  sp_rcontext.cc sp.cc \
 			sp_cache.cc parse_file.cc sql_trigger.cc \
-			sql_plugin.cc\
+			sql_plugin.cc sql_binlog.cc \
 			handlerton.cc
 EXTRA_mysqld_SOURCES =	ha_innodb.cc ha_berkeley.cc ha_archive.cc \
 			ha_innodb.h  ha_berkeley.h  ha_archive.h \
@@ -121,23 +122,23 @@
 			@DEFS@
 
 BUILT_SOURCES =		sql_yacc.cc sql_yacc.h lex_hash.h
-EXTRA_DIST =		udf_example.cc $(BUILT_SOURCES)
+EXTRA_DIST =		udf_example.cc handlerton-win.cc $(BUILT_SOURCES)
 DISTCLEANFILES =        lex_hash.h
 AM_YFLAGS =		-d
 
 mysql_tzinfo_to_sql.cc:
 	rm -f mysql_tzinfo_to_sql.cc
-	@LN_CP_F@ tztime.cc mysql_tzinfo_to_sql.cc
+	@LN_CP_F@ $(srcdir)/tztime.cc mysql_tzinfo_to_sql.cc
 
 link_sources: mysql_tzinfo_to_sql.cc
 	rm -f mini_client_errors.c
-	@LN_CP_F@ ../libmysql/errmsg.c mini_client_errors.c
+	@LN_CP_F@ $(top_srcdir)/libmysql/errmsg.c mini_client_errors.c
 	rm -f pack.c
-	@LN_CP_F@ ../sql-common/pack.c pack.c
+	@LN_CP_F@ $(top_srcdir)/sql-common/pack.c pack.c
 	rm -f client.c
-	@LN_CP_F@ ../sql-common/client.c client.c
+	@LN_CP_F@ $(top_srcdir)/sql-common/client.c client.c
 	rm -f my_time.c
-	@LN_CP_F@ ../sql-common/my_time.c my_time.c
+	@LN_CP_F@ $(top_srcdir)/sql-common/my_time.c my_time.c
 
 mysql_tzinfo_to_sql.o:	$(mysql_tzinfo_to_sql_SOURCES)
 			$(CXXCOMPILE) -c $(INCLUDES) -DTZINFO2SQL $<

--- 1.195/sql/handler.cc	2005-12-15 09:21:04 +01:00
+++ 1.196/sql/handler.cc	2006-01-04 16:08:47 +01:00
@@ -190,7 +190,8 @@
 } /* ha_checktype */
 
 
-handler *get_new_handler(TABLE *table, MEM_ROOT *alloc, enum db_type db_type)
+handler *get_new_handler(TABLE_SHARE *share, MEM_ROOT *alloc,
+                         enum db_type db_type)
 {
   handler *file= NULL;
   handlerton **types;
@@ -206,7 +207,7 @@
     if (db_type == (*types)->db_type && (*types)->create)
     {
       file= ((*types)->state == SHOW_OPTION_YES) ?
-		(*types)->create(table) : NULL;
+		(*types)->create(share) : NULL;
       break;
     }
   }
@@ -217,7 +218,7 @@
     enum db_type def=(enum db_type) current_thd->variables.table_type;
     /* Try first with 'default table type' */
     if (db_type != def)
-      return get_new_handler(table, alloc, def);
+      return get_new_handler(share, alloc, def);
   }
   if (file)
   {
@@ -680,83 +681,6 @@
       thd->variables.tx_isolation=thd->session_tx_isolation;
       thd->transaction.cleanup();
     }
-    /*
-      TODO: REORGANIZATION (Guilhem 27 Oct 2005).
-
-      1) as the pending event is a member of thd->transaction, it would be
-      tidy to move prepare_for_commit|rollback into the st_transaction class;
-      roughly they are dependent on nothing which is in thd and which is not in
-      thd->transaction. So we would do:
-      thd->transaction.prepare_for_rollback().
-
-
-      2) prepare_for_rollback(), intended to do thread's preparations for
-      rollback, not only related to binlog, currently does only binlog things,
-      while some preparation is done in ha_rollback_trans(thd). Not optimal.
-      Guilhem thinks that prepare_for_commit|rollback() is not a symmetric
-      couple in reality: see how in ha_rollback_trans we have to call
-      prepare_for_rollback while in ha_commit_trans we don't have to call
-      prepare_for_commit (because it was already called by binlog_query).
-      So maybe names should not be symmetric...
-
-      3) prepare_for_rollback() can be seen as some cleanup to do: when they
-      execute a statement (on a transactional table), engines allocate
-      resources, write to them, and they free these resources above in
-      (*ht)->rollback. The pending rows event is also a resource allocated by
-      THD during the statement, and at rollback THD also needs to free this
-      resource.
-
-      4) What bothers Guilhem is that this resource is in THD, so again it makes
-      the binlog a special case, by requiring an explicit line "rollback
-      binlog" here in ha_rollback_trans(). In his XA patch, Serg had organized
-      things so that the binlog looks like a storage engine, registers in the
-      transaction if needed, and so does its cleanup in (*ht)->rollback 
-      above. But "pending" happens earlier in the process, so we may have a
-      pending event, without the binlog being registered as a participant in
-      the transaction.
-      As "pending" is "handler's private per-connection data" (quoting
-      sql_class.h) it should be in thd->ha_data[binlog_hton.slot], Serg
-      confirms.
-      Then we should register the binlog as soon as we create a pending event.
-      Then pending would be wiped out by binlog_rollback() and that would be
-      tidy.
-      The fact to have a binlog transaction cache is considered as having
-      binlog a participant. But a pending rows event is yet another binlog
-      transaction cache, so it should be placed in the same containers.
-      
-      https://intranet.mysql.com/worklog/Server-RawIdeaBin/?tid=2967
-      is the Worklog entry for this reorganization.
-      When this is done, we won't need the explicit call below, as binlog will
-      be registered as participant even in the case where this is an
-      autocommit statement on a transactional table, one row has been updated
-      but the statement rolls back because it fails on another row. With the
-      current code and without the call below, the updated row causes a
-      pending event which is not flushed (because binlog_query() is not called
-      because the statement is going to be rolled back); this pending event
-      then influences the next statement. Testcase extracted
-      from the sp_trans test, which causes a crash without the below call:
--- source include/have_innodb.inc
-delimiter |;
-create table t1 (id int) engine=innodb|
-create table t2 (id int primary key, j int) engine=innodb|
-      insert into t2 values (1, 0)|
-create function bug10015_5(i int) returns int
-  begin
-    if (i = 5) then
-      insert into t2 values (1, 0);
-    end if;
-    return i;
-  end|
---error 1062
-insert into t1 values (bug10015_5(4)), (bug10015_5(5))|
-### if you exit here master won't crash
-drop function bug10015_5|
-# if you exit here master will crash when it shuts down
-
-    */
-
-    thd->prepare_for_rollback();
-
   }
 #endif /* USING_TRANSACTIONS */
   /*
@@ -1104,10 +1028,10 @@
 int ha_rollback_to_savepoint(THD *thd, SAVEPOINT *sv)
 {
   int error=0;
-  THD_TRANS *trans=&thd->transaction.all;
+  THD_TRANS *trans= (thd->in_sub_stmt ? &thd->transaction.stmt :
+                                        &thd->transaction.all);
   handlerton **ht=trans->ht, **end_ht;
   DBUG_ENTER("ha_rollback_to_savepoint");
-  DBUG_ASSERT(thd->transaction.stmt.ht[0] == 0);
 
   trans->nht=sv->nht;
   trans->no_2pc=0;
@@ -1125,7 +1049,8 @@
       my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
       error=1;
     }
-    statistic_increment(thd->status_var.ha_savepoint_rollback_count,&LOCK_status);
+    statistic_increment(thd->status_var.ha_savepoint_rollback_count,
+                        &LOCK_status);
     trans->no_2pc|=(*ht)->prepare == 0;
   }
   /*
@@ -1135,7 +1060,7 @@
   for (; *ht ; ht++)
   {
     int err;
-    if ((err= (*(*ht)->rollback)(thd, 1)))
+    if ((err= (*(*ht)->rollback)(thd, !thd->in_sub_stmt)))
     { // cannot happen
       my_error(ER_ERROR_DURING_ROLLBACK, MYF(0), err);
       error=1;
@@ -1155,10 +1080,10 @@
 int ha_savepoint(THD *thd, SAVEPOINT *sv)
 {
   int error=0;
-  THD_TRANS *trans=&thd->transaction.all;
+  THD_TRANS *trans= (thd->in_sub_stmt ? &thd->transaction.stmt :
+                                        &thd->transaction.all);
   handlerton **ht=trans->ht;
   DBUG_ENTER("ha_savepoint");
-  DBUG_ASSERT(thd->transaction.stmt.ht[0] == 0);
 #ifdef USING_TRANSACTIONS
   for (; *ht; ht++)
   {
@@ -1184,9 +1109,10 @@
 int ha_release_savepoint(THD *thd, SAVEPOINT *sv)
 {
   int error=0;
-  handlerton **ht=thd->transaction.all.ht, **end_ht;
+  THD_TRANS *trans= (thd->in_sub_stmt ? &thd->transaction.stmt :
+                                        &thd->transaction.all);
+  handlerton **ht=trans->ht, **end_ht;
   DBUG_ENTER("ha_release_savepoint");
-  DBUG_ASSERT(thd->transaction.stmt.ht[0] == 0);
 
   end_ht=ht+sv->nht;
   for (; ht < end_ht; ht++)
@@ -1254,7 +1180,7 @@
 */
 
 int ha_delete_table(THD *thd, enum db_type table_type, const char *path,
-                    const char *alias, bool generate_warning)
+                    const char *db, const char *alias, bool generate_warning)
 {
   handler *file;
   char tmp_path[FN_REFLEN];
@@ -1269,7 +1195,7 @@
 
   /* DB_TYPE_UNKNOWN is used in ALTER TABLE when renaming only .frm files */
   if (table_type == DB_TYPE_UNKNOWN ||
-      ! (file=get_new_handler(&dummy_table, thd->mem_root, table_type)))
+      ! (file=get_new_handler(&dummy_share, thd->mem_root, table_type)))
     DBUG_RETURN(ENOENT);
 
   if (lower_case_table_names == 2 && !(file->table_flags() & HA_FILE_BASED))
@@ -1302,7 +1228,12 @@
     thd->net.last_error[0]= 0;
 
     /* Fill up strucutures that print_error may need */
-    dummy_table.s->path= path;
+    dummy_share.path.str= (char*) path;
+    dummy_share.path.length= strlen(path);
+    dummy_share.db.str= (char*) db;
+    dummy_share.db.length= strlen(db);
+    dummy_share.table_name.str= (char*) alias;
+    dummy_share.table_name.length= strlen(alias);
     dummy_table.alias= alias;
 
     file->print_error(error, 0);
@@ -1324,16 +1255,26 @@
 ** General handler functions
 ****************************************************************************/
 
-	/* Open database-handler. Try O_RDONLY if can't open as O_RDWR */
-	/* Don't wait for locks if not HA_OPEN_WAIT_IF_LOCKED is set */
+/*
+  Open database-handler.
+
+  IMPLEMENTATION
+    Try O_RDONLY if cannot open as O_RDWR
+    Don't wait for locks if not HA_OPEN_WAIT_IF_LOCKED is set
+*/
 
-int handler::ha_open(const char *name, int mode, int test_if_locked)
+int handler::ha_open(TABLE *table_arg, const char *name, int mode,
+                     int test_if_locked)
 {
   int error;
   DBUG_ENTER("handler::ha_open");
-  DBUG_PRINT("enter",("name: %s  db_type: %d  db_stat: %d  mode: %d  lock_test: %d",
-                      name, table->s->db_type, table->db_stat, mode,
-                      test_if_locked));
+  DBUG_PRINT("enter",
+             ("name: %s  db_type: %d  db_stat: %d  mode: %d  lock_test: %d",
+              name, table_share->db_type, table_arg->db_stat, mode,
+              test_if_locked));
+
+  table= table_arg;
+  DBUG_ASSERT(table->s == table_share);
 
   if ((error=open(name,mode,test_if_locked)))
   {
@@ -1346,7 +1287,7 @@
   }
   if (error)
   {
-    my_errno=error;			/* Safeguard */
+    my_errno= error;                            /* Safeguard */
     DBUG_PRINT("error",("error: %d  errno: %d",error,errno));
   }
   else
@@ -1364,74 +1305,51 @@
     }
     else
       dupp_ref=ref+ALIGN_SIZE(ref_length);
+
+    if (ha_allocate_read_write_set(table->s->fields))
+      error= 1;
   }
   DBUG_RETURN(error);
 }
 
+
 int handler::ha_initialise()
 {
   DBUG_ENTER("ha_initialise");
-  if (table && table->s->fields &&
-      ha_allocate_read_write_set(table->s->fields))
-  {
-    DBUG_RETURN(TRUE);
-  }
   DBUG_RETURN(FALSE);
 }
 
+
+/*
+  Initalize bit maps for used fields
+
+  Called from open_table_from_share()
+*/
+
 int handler::ha_allocate_read_write_set(ulong no_fields)
 {
-  uint bitmap_size= 4*(((no_fields+1)+31)/32);
+  uint bitmap_size= bitmap_buffer_size(no_fields+1);
   uint32 *read_buf, *write_buf;
-#ifndef DEBUG_OFF
-  my_bool r;
-#endif
   DBUG_ENTER("ha_allocate_read_write_set");
   DBUG_PRINT("enter", ("no_fields = %d", no_fields));
 
-  if (table)
+  if (!multi_alloc_root(&table->mem_root,
+                        &read_set, sizeof(MY_BITMAP),
+                        &write_set, sizeof(MY_BITMAP),
+                        &read_buf, bitmap_size,
+                        &write_buf, bitmap_size,
+                        NullS))
   {
-    if (table->read_set == NULL)
-    {
-      read_set= (MY_BITMAP*)sql_alloc(sizeof(MY_BITMAP));
-      write_set= (MY_BITMAP*)sql_alloc(sizeof(MY_BITMAP));
-      read_buf= (uint32*)sql_alloc(bitmap_size);
-      write_buf= (uint32*)sql_alloc(bitmap_size);
-      if (!read_set || !write_set || !read_buf || !write_buf)
-      {
-        ha_deallocate_read_write_set();
-        DBUG_RETURN(TRUE);
-      }
-#ifndef DEBUG_OFF
-      r =
-#endif
-        bitmap_init(read_set, read_buf, no_fields+1, FALSE);
-      DBUG_ASSERT(!r /*bitmap_init(read_set...)*/);
-#ifndef DEBUG_OFF
-      r =
-#endif
-        bitmap_init(write_set, write_buf, no_fields+1, FALSE);
-      DBUG_ASSERT(!r /*bitmap_init(write_set...)*/);
-      table->read_set= read_set;
-      table->write_set= write_set;
-      ha_clear_all_set();
-    }
-    else
-    {
-      read_set= table->read_set;
-      write_set= table->write_set;
-    }
+    DBUG_RETURN(TRUE);
   }
+  bitmap_init(read_set, read_buf, no_fields+1, FALSE);
+  bitmap_init(write_set, write_buf, no_fields+1, FALSE);
+  table->read_set= read_set;
+  table->write_set= write_set;
+  ha_clear_all_set();
   DBUG_RETURN(FALSE);
 }
 
-void handler::ha_deallocate_read_write_set()
-{
-  DBUG_ENTER("ha_deallocate_read_write_set");
-  read_set=write_set=0;
-  DBUG_VOID_RETURN;
-}
-
 void handler::ha_clear_all_set()
 {
   DBUG_ENTER("ha_clear_all_set");
@@ -1473,6 +1391,7 @@
 }
 
 
+
 /*
   Read first row (only) from a table
   This is never called for InnoDB or BDB tables, as these table types
@@ -1484,7 +1403,8 @@
   register int error;
   DBUG_ENTER("handler::read_first_row");
 
-  statistic_increment(current_thd->status_var.ha_read_first_count,&LOCK_status);
+  statistic_increment(table->in_use->status_var.ha_read_first_count,
+                      &LOCK_status);
 
   /*
     If there is very few deleted rows in the table, find the first row by
@@ -1750,15 +1670,16 @@
     uint key_nr=get_dup_key(error);
     if ((int) key_nr >= 0)
     {
-      /* Write the dupplicated key in the error message */
+      /* Write the duplicated key in the error message */
       char key[MAX_KEY_LENGTH];
       String str(key,sizeof(key),system_charset_info);
+      /* Table is opened and defined at this point */
       key_unpack(&str,table,(uint) key_nr);
       uint max_length=MYSQL_ERRMSG_SIZE-(uint) strlen(ER(ER_DUP_ENTRY));
       if (str.length() >= max_length)
       {
 	str.length(max_length-4);
-	str.append("...");
+	str.append(STRING_WITH_LEN("..."));
       }
       my_error(ER_DUP_ENTRY, MYF(0), str.c_ptr(), key_nr+1);
       DBUG_VOID_RETURN;
@@ -1839,20 +1760,9 @@
     textno=ER_TABLE_DEF_CHANGED;
     break;
   case HA_ERR_NO_SUCH_TABLE:
-  {
-    /*
-      We have to use path to find database name instead of using
-      table->table_cache_key because if the table didn't exist, then
-      table_cache_key was not set up
-    */
-    char *db;
-    char buff[FN_REFLEN];
-    uint length= dirname_part(buff,table->s->path);
-    buff[length-1]=0;
-    db=buff+dirname_length(buff);
-    my_error(ER_NO_SUCH_TABLE, MYF(0), db, table->alias);
+    my_error(ER_NO_SUCH_TABLE, MYF(0), table_share->db.str,
+             table_share->table_name.str);
     break;
-  }
   case HA_ERR_RBR_LOGGING_FAILED:
     textno= ER_BINLOG_ROW_LOGGING_FAILED;
     break;
@@ -1876,7 +1786,7 @@
       DBUG_VOID_RETURN;
     }
   }
-  my_error(textno, errflag, table->alias, error);
+  my_error(textno, errflag, table_share->table_name.str, error);
   DBUG_VOID_RETURN;
 }
 
@@ -2021,23 +1931,37 @@
 
 /*
   Initiates table-file and calls apropriate database-creator
-  Returns 1 if something got wrong
+
+  NOTES
+    We must have a write lock on LOCK_open to be sure no other thread
+    interfers with table
+    
+  RETURN
+   0  ok
+   1  error
 */
 
-int ha_create_table(const char *name, HA_CREATE_INFO *create_info,
+int ha_create_table(THD *thd, const char *path,
+                    const char *db, const char *table_name,
+                    HA_CREATE_INFO *create_info,
 		    bool update_create_info)
 {
-  int error;
+  int error= 1;
   TABLE table;
   char name_buff[FN_REFLEN];
+  const char *name;
+  TABLE_SHARE share;
   DBUG_ENTER("ha_create_table");
+  
+  init_tmp_table_share(&share, db, 0, table_name, path);
+  if (open_table_def(thd, &share, 0) ||
+      open_table_from_share(thd, &share, "", 0, (uint) READ_ALL, 0, &table))
+    goto err;
 
-  if (openfrm(current_thd, name,"",0,(uint) READ_ALL, 0, &table))
-    DBUG_RETURN(1);
   if (update_create_info)
-  {
     update_create_info_from_table(create_info, &table);
-  }
+
+  name= share.path.str;
   if (lower_case_table_names == 2 &&
       !(table.file->table_flags() & HA_FILE_BASED))
   {
@@ -2047,27 +1971,32 @@
     name= name_buff;
   }
 
-  error=table.file->create(name,&table,create_info);
-  VOID(closefrm(&table));
+  error= table.file->create(name, &table, create_info);
+  VOID(closefrm(&table, 0));
   if (error)
-    my_error(ER_CANT_CREATE_TABLE, MYF(ME_BELL+ME_WAITTANG), name,error);
+  {
+    strxmov(name_buff, db, ".", table_name, NullS);
+    my_error(ER_CANT_CREATE_TABLE, MYF(ME_BELL+ME_WAITTANG), name_buff, error);
+  }
+err:
+  free_table_share(&share);
   DBUG_RETURN(error != 0);
 }
 
 /*
-  Try to discover table from engine and
-  if found, write the frm file to disk.
+  Try to discover table from engine
+
+  NOTES
+    If found, write the frm file to disk.
 
   RETURN VALUES:
-  -1 : Table did not exists
-   0 : Table created ok
-   > 0 : Error, table existed but could not be created
+  -1    Table did not exists
+   0    Table created ok
+   > 0  Error, table existed but could not be created
 
 */
 
-int ha_create_table_from_engine(THD* thd,
-				const char *db,
-				const char *name)
+int ha_create_table_from_engine(THD* thd, const char *db, const char *name)
 {
   int error;
   const void *frmblob;
@@ -2075,6 +2004,7 @@
   char path[FN_REFLEN];
   HA_CREATE_INFO create_info;
   TABLE table;
+  TABLE_SHARE share;
   DBUG_ENTER("ha_create_table_from_engine");
   DBUG_PRINT("enter", ("name '%s'.'%s'", db, name));
 
@@ -2090,15 +2020,23 @@
     frmblob and frmlen are set, write the frm to disk
   */
 
-  (void)strxnmov(path,FN_REFLEN,mysql_data_home,"/",db,"/",name,NullS);
+  (void)strxnmov(path,FN_REFLEN-1,mysql_data_home,"/",db,"/",name,NullS);
   // Save the frm file
   error= writefrm(path, frmblob, frmlen);
   my_free((char*) frmblob, MYF(0));
   if (error)
     DBUG_RETURN(2);
 
-  if (openfrm(thd, path,"",0,(uint) READ_ALL, 0, &table))
+  init_tmp_table_share(&share, db, 0, name, path);
+  if (open_table_def(thd, &share, 0))
+  {
     DBUG_RETURN(3);
+  }
+  if (open_table_from_share(thd, &share, "" ,0, 0, 0, &table))
+  {
+    free_table_share(&share);
+    DBUG_RETURN(3);
+  }
 
   update_create_info_from_table(&create_info, &table);
   create_info.table_options|= HA_OPTION_CREATE_FROM_ENGINE;
@@ -2110,7 +2048,7 @@
     my_casedn_str(files_charset_info, path);
   }
   error=table.file->create(path,&table,&create_info);
-  VOID(closefrm(&table));
+  VOID(closefrm(&table, 1));
 
   DBUG_RETURN(error != 0);
 }
@@ -2679,7 +2617,7 @@
     {
       if ((*types)->state == SHOW_OPTION_YES)
       {
-	handler *file= get_new_handler(0, mem_root,
+	handler *file= get_new_handler((TABLE_SHARE*) 0, mem_root,
                                        (enum db_type) (*types)->db_type);
 	for (ext= file->bas_ext(); *ext; ext++)
 	{
@@ -2765,14 +2703,16 @@
   A row in the given table should be replicated if:
   - Row-based replication is on
   - It is not a temporary table
-  - The table shall be binlogged (binlog_*_db rules)
+  - The binlog is enabled
+  - The table shall be binlogged (binlog_*_db rules) [Seems disabled /Matz]
 */
   
 #ifdef HAVE_ROW_BASED_REPLICATION
-static bool check_table_binlog_row_based(TABLE *table) 
+static bool check_table_binlog_row_based(THD *thd, TABLE *table)
 {
   return
     binlog_row_based &&
+    thd && (thd->options & OPTION_BIN_LOG) &&
     (table->s->tmp_table == NO_TMP_TABLE);
 }
 
@@ -2783,7 +2723,9 @@
   if (h->is_injective())
     return 0;
   bool error= 0;
-  if (check_table_binlog_row_based(table)) 
+  THD *const thd= current_thd;
+
+  if (check_table_binlog_row_based(thd, table)) 
   {
     MY_BITMAP cols;
     /* Potential buffer on the stack for the bitmap */
@@ -2797,7 +2739,7 @@
     {
       bitmap_set_all(&cols);
       error=
-        RowsEventT::binlog_row_logging_function(current_thd, table,
+        RowsEventT::binlog_row_logging_function(thd, table,
                                                 table->file->has_transactions(),
                                                 &cols, table->s->fields,
                                                 before_record, after_record);

--- 1.170/sql/handler.h	2005-12-15 09:21:04 +01:00
+++ 1.171/sql/handler.h	2006-01-04 16:08:47 +01:00
@@ -93,6 +93,7 @@
 #define HA_CAN_BIT_FIELD       (1 << 28) /* supports bit fields */
 #define HA_NEED_READ_RANGE_BUFFER (1 << 29) /* for read_multi_range */
 #define HA_ANY_INDEX_MAY_BE_UNIQUE (1 << 30)
+#define HA_NO_COPY_ON_ALTER    (1 << 31)
 
 /* Flags for partition handlers */
 #define HA_CAN_PARTITION       (1 << 0) /* Partition support */
@@ -318,6 +319,7 @@
 
 struct st_table;
 typedef struct st_table TABLE;
+typedef struct st_table_share TABLE_SHARE;
 struct st_foreign_key_info;
 typedef struct st_foreign_key_info FOREIGN_KEY_INFO;
 typedef bool (stat_print_fn)(THD *thd, const char *type, const char *file,
@@ -418,7 +420,7 @@
    void *(*create_cursor_read_view)();
    void (*set_cursor_read_view)(void *);
    void (*close_cursor_read_view)(void *);
-   handler *(*create)(TABLE *table);
+   handler *(*create)(TABLE_SHARE *table);
    void (*drop_database)(char* path);
    int (*panic)(enum ha_panic_function flag);
    int (*release_temporary_latches)(THD *thd);
@@ -731,6 +733,9 @@
 bool is_partition_in_list(char *part_name, List<char> list_part_names);
 bool is_partitions_in_table(partition_info *new_part_info,
                             partition_info *old_part_info);
+bool check_reorganise_list(partition_info *new_part_info,
+                           partition_info *old_part_info,
+                           List<char> list_part_names);
 bool set_up_defaults_for_partitioning(partition_info *part_info,
                                       handler *file,
                                       ulonglong max_rows,
@@ -755,8 +760,9 @@
                                KEY *key_info,
                                const key_range *key_spec,
                                part_id_range *part_spec);
-bool mysql_unpack_partition(THD *thd, uchar *part_buf, uint part_info_len,
-                            TABLE* table, enum db_type default_db_type);
+bool mysql_unpack_partition(THD *thd, const uchar *part_buf,
+                            uint part_info_len, TABLE *table,
+                            enum db_type default_db_type);
 #endif
 
 
@@ -781,7 +787,8 @@
  friend class ha_partition;
 #endif
  protected:
-  struct st_table *table;		/* The table definition */
+  struct st_table_share *table_share;   /* The table definition */
+  struct st_table *table;               /* The current open table */
 
   virtual int index_init(uint idx, bool sorted) { active_index=idx; return 0; }
   virtual int index_end() { active_index=MAX_KEY; return 0; }
@@ -842,8 +849,8 @@
   MY_BITMAP *read_set;
   MY_BITMAP *write_set;
 
-  handler(const handlerton *ht_arg, TABLE *table_arg) :table(table_arg),
-    ht(ht_arg),
+  handler(const handlerton *ht_arg, TABLE_SHARE *share_arg)
+    :table_share(share_arg), ht(ht_arg),
     ref(0), data_file_length(0), max_data_file_length(0), index_file_length(0),
     delete_length(0), auto_increment_value(0),
     records(0), deleted(0), mean_rec_length(0),
@@ -855,16 +862,19 @@
     {}
   virtual ~handler(void)
   {
-    ha_deallocate_read_write_set();
     /* TODO: DBUG_ASSERT(inited == NONE); */
   }
   virtual int ha_initialise();
-  int ha_open(const char *name, int mode, int test_if_locked);
+  int ha_open(TABLE *table, const char *name, int mode, int test_if_locked);
   bool update_auto_increment();
   virtual void print_error(int error, myf errflag);
   virtual bool get_error_message(int error, String *buf);
   uint get_dup_key(int error);
-  void change_table_ptr(TABLE *table_arg) { table=table_arg; }
+  void change_table_ptr(TABLE *table_arg, TABLE_SHARE *share)
+  {
+    table= table_arg;
+    table_share= share;
+  }
   virtual double scan_time()
     { return ulonglong2double(data_file_length) / IO_SIZE + 2; }
   virtual double read_time(uint index, uint ranges, ha_rows rows)
@@ -1050,7 +1060,6 @@
   }
   void ha_set_primary_key_in_read_set();
   int ha_allocate_read_write_set(ulong no_fields);
-  void ha_deallocate_read_write_set();
   void ha_clear_all_set();
   uint get_index(void) const { return active_index; }
   virtual int open(const char *name, int mode, uint test_if_locked)=0;
@@ -1188,12 +1197,32 @@
                                    key_range *max_key)
     { return (ha_rows) 10; }
   virtual void position(const byte *record)=0;
-  virtual void info(uint)=0;
+  virtual void info(uint)=0; // see my_base.h for full description
   virtual int extra(enum ha_extra_function operation)
   { return 0; }
   virtual int extra_opt(enum ha_extra_function operation, ulong cache_size)
   { return extra(operation); }
   virtual int external_lock(THD *thd, int lock_type) { return 0; }
+  /*
+    In an UPDATE or DELETE, if the row under the cursor was locked by another
+    transaction, and the engine used an optimistic read of the last
+    committed row value under the cursor, then the engine returns 1 from this
+    function. MySQL must NOT try to update this optimistic value. If the
+    optimistic value does not match the WHERE condition, MySQL can decide to
+    skip over this row. Currently only works for InnoDB. This can be used to
+    avoid unnecessary lock waits.
+
+    If this method returns nonzero, it will also signal the storage
+    engine that the next read will be a locking re-read of the row.
+  */
+  virtual bool was_semi_consistent_read() { return 0; }
+  /*
+    Tell the engine whether it should avoid unnecessary lock waits.
+    If yes, in an UPDATE or DELETE, if the row under the cursor was locked
+    by another transaction, the engine may try an optimistic read of
+    the last committed row value under the cursor.
+  */
+  virtual void try_semi_consistent_read(bool) {}
   virtual void unlock_row() {}
   virtual int start_stmt(THD *thd, thr_lock_type lock_type) {return 0;}
   /*
@@ -1287,6 +1316,7 @@
 #ifdef WITH_PARTITION_STORAGE_ENGINE
   virtual ulong partition_flags(void) const { return 0;}
   virtual int get_default_no_partitions(ulonglong max_rows) { return 1;}
+  virtual void set_part_info(partition_info *part_info) { return; }
 #endif
   virtual ulong index_flags(uint idx, uint part, bool all_parts) const =0;
   virtual ulong index_ddl_flags(KEY *wanted_index) const
@@ -1419,17 +1449,18 @@
   */
   friend int ndb_add_binlog_index(THD *, void *);
 
-  virtual int write_row(byte *buf) 
+  virtual int write_row(byte *buf __attribute__((unused))) 
   { 
     return HA_ERR_WRONG_COMMAND; 
   }
 
-  virtual int update_row(const byte *old_data, byte *new_data)
+  virtual int update_row(const byte *old_data __attribute__((unused)),
+                         byte *new_data __attribute__((unused)))
   { 
     return HA_ERR_WRONG_COMMAND; 
   }
 
-  virtual int delete_row(const byte *buf)
+  virtual int delete_row(const byte *buf __attribute__((unused)))
   { 
     return HA_ERR_WRONG_COMMAND; 
   }
@@ -1452,7 +1483,8 @@
 /* lookups */
 enum db_type ha_resolve_by_name(const char *name, uint namelen);
 const char *ha_get_storage_engine(enum db_type db_type);
-handler *get_new_handler(TABLE *table, MEM_ROOT *alloc, enum db_type db_type);
+handler *get_new_handler(TABLE_SHARE *share, MEM_ROOT *alloc,
+                         enum db_type db_type);
 enum db_type ha_checktype(THD *thd, enum db_type database_type,
                           bool no_substitute, bool report_error);
 bool ha_check_storage_engine_flag(enum db_type db_type, uint32 flag);
@@ -1466,10 +1498,12 @@
 my_bool ha_storage_engine_is_enabled(enum db_type database_type);
 bool ha_flush_logs(enum db_type db_type=DB_TYPE_DEFAULT);
 void ha_drop_database(char* path);
-int ha_create_table(const char *name, HA_CREATE_INFO *create_info,
+int ha_create_table(THD *thd, const char *path,
+                    const char *db, const char *table_name,
+                    HA_CREATE_INFO *create_info,
 		    bool update_create_info);
 int ha_delete_table(THD *thd, enum db_type db_type, const char *path,
-                    const char *alias, bool generate_warning);
+                    const char *db, const char *alias, bool generate_warning);
 
 /* statistics and info */
 bool ha_show_status(THD *thd, enum db_type db_type, enum ha_stat_type stat);

--- 1.348/sql/mysql_priv.h	2006-01-02 21:46:39 +01:00
+++ 1.349/sql/mysql_priv.h	2006-01-04 16:08:47 +01:00
@@ -1039,7 +1039,7 @@
 bool remove_table_from_cache(THD *thd, const char *db, const char *table,
                              uint flags);
 
-bool close_cached_tables(THD *thd, bool wait_for_refresh, TABLE_LIST *tables);
+bool close_cached_tables(THD *thd, bool wait_for_refresh, TABLE_LIST *tables, bool have_lock = FALSE);
 void copy_field_from_tmp_record(Field *field,int offset);
 bool fill_record(THD *thd, Field **field, List<Item> &values,
                  bool ignore_errors);

--- 1.506/sql/mysqld.cc	2005-12-29 20:48:12 +01:00
+++ 1.507/sql/mysqld.cc	2006-01-04 16:08:47 +01:00
@@ -416,6 +416,8 @@
 ulong opt_ndb_cache_check_time;
 const char *opt_ndb_mgmd;
 ulong opt_ndb_nodeid;
+ulong ndb_report_thresh_binlog_epoch_slip;
+ulong ndb_report_thresh_binlog_mem_usage;
 
 extern struct show_var_st ndb_status_variables[];
 extern const char *ndb_distribution_names[];
@@ -5259,7 +5261,6 @@
    (gptr*) &global_system_variables.ndb_force_send,
    (gptr*) &global_system_variables.ndb_force_send,
    0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
-#ifdef HAVE_NDB_BINLOG
   {"ndb-report-thresh-binlog-epoch-slip", OPT_NDB_REPORT_THRESH_BINLOG_EPOCH_SLIP,
    "Threshold on number of epochs to be behind before reporting binlog status. "
    "E.g. 3 means that if the difference between what epoch has been received "
@@ -5276,7 +5277,6 @@
    (gptr*) &ndb_report_thresh_binlog_mem_usage,
    (gptr*) &ndb_report_thresh_binlog_mem_usage,
    0, GET_ULONG, REQUIRED_ARG, 10, 0, 100, 0, 0, 0},
-#endif
   {"ndb-use-exact-count", OPT_NDB_USE_EXACT_COUNT,
    "Use exact records count during query planning and for fast "
    "select count(*), disable for faster queries.",

--- 1.285/sql/sql_base.cc	2006-01-02 21:46:39 +01:00
+++ 1.286/sql/sql_base.cc	2006-01-04 16:08:48 +01:00
@@ -803,13 +803,14 @@
 */
 
 bool close_cached_tables(THD *thd, bool if_wait_for_refresh,
-			 TABLE_LIST *tables)
+			 TABLE_LIST *tables, bool have_lock)
 {
   bool result=0;
   DBUG_ENTER("close_cached_tables");
   DBUG_ASSERT(thd || (!if_wait_for_refresh && !tables));
 
-  VOID(pthread_mutex_lock(&LOCK_open));
+  if (!have_lock)
+    VOID(pthread_mutex_lock(&LOCK_open));
   if (!tables)
   {
     refresh_version++;				// Force close of open tables
@@ -888,7 +889,8 @@
     for (TABLE *table=thd->open_tables; table ; table= table->next)
       table->s->version= refresh_version;
   }
-  VOID(pthread_mutex_unlock(&LOCK_open));
+  if (!have_lock)
+    VOID(pthread_mutex_unlock(&LOCK_open));
   if (if_wait_for_refresh)
   {
     pthread_mutex_lock(&thd->mysys_var->mutex);

--- 1.122/sql/sql_db.cc	2005-12-12 20:55:13 +01:00
+++ 1.123/sql/sql_db.cc	2006-01-04 16:08:48 +01:00
@@ -401,6 +401,7 @@
                      bool silent)
 {
   char	 path[FN_REFLEN+16];
+  char	 tmp_query[FN_REFLEN+16];
   long result= 1;
   int error= 0;
   MY_STAT stat_info;
@@ -487,15 +488,18 @@
 
     if (!thd->query)				// Only in replication
     {
-      query= 	     path;
-      query_length= (uint) (strxmov(path,"create database `", db, "`", NullS) -
-			    path);
+      query= 	     tmp_query;
+      query_length= (uint) (strxmov(tmp_query,"create database `",
+                                    db, "`", NullS) - tmp_query);
     }
     else
     {
       query= 	    thd->query;
       query_length= thd->query_length;
     }
+
+    ha_create_database(thd, db, query, query_length);
+
     if (mysql_bin_log.is_open())
     {
       Query_log_event qinfo(thd, query, query_length, 0, 
@@ -566,6 +570,8 @@
 		     thd->variables.collation_server;
     thd->variables.collation_database= thd->db_charset;
   }
+
+  ha_alter_database(thd, db, thd->query, thd->query_length);
 
   if (mysql_bin_log.is_open())
   {

--- 1.16/mysql-test/t/disabled.def	2005-12-15 09:21:04 +01:00
+++ 1.17/mysql-test/t/disabled.def	2006-01-04 16:30:20 +01:00
@@ -1,6 +1,6 @@
 ##############################################################################
 #
-#  List the test cases that are to be disabled temporarely.
+#  List the test cases that are to be disabled temporarily.
 #
 #  Separate the test case name and the comment with ':'.
 #
@@ -11,15 +11,17 @@
 ##############################################################################
 
 sp-goto         : GOTO is currently is disabled - will be fixed in the future
-rpl_relayrotate : Unstable test case, bug#12429
-rpl_until       : Unstable test case, bug#12429
-rpl_deadlock    : Unstable test case, bug#12429
 kill            : Unstable test case, bug#9712
-rpl_row_mystery22:shows rbr slave issues Bug 12418
-rpl_row_relayrotate:Bug 14082
-rpl_row_000002:create table from temporary Bug 12345
-rpl_row_sp006_InnoDB:Bug 12586
-rpl_row_view01:Bug 12687
-rpl_row_NOW:Bug 12574 
-rpl_bit_npk:Bug 13418
-compress        : Magnus will fix
+rpl_bit_npk     : Bug #13418
+ndb_cache2      : Bug #15004
+ndb_cache_multi2: Bug #15004
+func_group      : Bug #15448
+func_math       : Bug #15448
+group_min_max   : Bug #15448 
+#mysqlslap       : Bug #15483
+innodb_concurrent : Results are not deterministic, Elliot will fix (BUG#3300)
+subselect       : Bug#15706
+type_time       : Bug#15805
+rpl000002       : Bug#15920 Temporary tables are not binlogged in SBR
+ps_7ndb         : Bug#15923 Core dump in RBR mode when executing test suite
+sp_trans        : Bug#15924 Code dump in RBR mode when executing test suite

--- 1.64/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2005-12-29 20:48:17 +01:00
+++ 1.65/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2006-01-04 16:08:48 +01:00
@@ -1994,9 +1994,9 @@
   c_opCreateTable.setSize(8);
   c_opDropTable.setSize(8);
   c_opCreateIndex.setSize(8);
-  c_opCreateEvent.setSize(8);
-  c_opSubEvent.setSize(8);
-  c_opDropEvent.setSize(8);
+  c_opCreateEvent.setSize(2);
+  c_opSubEvent.setSize(2);
+  c_opDropEvent.setSize(2);
   c_opSignalUtil.setSize(8);
   c_opDropIndex.setSize(8);
   c_opAlterIndex.setSize(8);
@@ -8602,6 +8602,23 @@
   const CreateEvntReq::RequestType requestType = req->getRequestType();
   const Uint32                     requestFlag = req->getRequestFlag();
 
+  if (refToBlock(signal->senderBlockRef()) != DBDICT &&
+      getOwnNodeId() != c_masterNodeId)
+  {
+    jam();
+    releaseSections(signal);
+    
+    CreateEvntRef * ref = (CreateEvntRef *)signal->getDataPtrSend();
+    ref->setUserRef(reference());
+    ref->setErrorCode(CreateEvntRef::NotMaster);
+    ref->setErrorLine(__LINE__);
+    ref->setErrorNode(reference());
+    ref->setMasterNode(c_masterNodeId);
+    sendSignal(signal->senderBlockRef(), GSN_CREATE_EVNT_REF, signal,
+	       CreateEvntRef::SignalLength2, JBB);
+    return;
+  }
+
   OpCreateEventPtr evntRecPtr;
   // Seize a Create Event record
   if (!c_opCreateEvent.seize(evntRecPtr)) {
@@ -8893,7 +8910,8 @@
       break;
     case CreateEvntReq::RT_USER_CREATE:
       {
-	evntRecPtr.p->m_eventRec.EVENT_TYPE = evntRecPtr.p->m_request.getEventType();
+	evntRecPtr.p->m_eventRec.EVENT_TYPE =
+          evntRecPtr.p->m_request.getEventType() | evntRecPtr.p->m_request.getReportFlags();
 	evntRecPtr.p->m_eventRec.TABLEID  = evntRecPtr.p->m_request.getTableId();
 	evntRecPtr.p->m_eventRec.TABLEVERSION=evntRecPtr.p->m_request.getTableVersion();
 	AttributeMask m = evntRecPtr.p->m_request.getAttrListBitmask();
@@ -9108,6 +9126,7 @@
       parseReadEventSys(signal, evntRecPtr.p->m_eventRec);
 
       evntRec->m_request.setEventType(evntRecPtr.p->m_eventRec.EVENT_TYPE);
+      evntRec->m_request.setReportFlags(evntRecPtr.p->m_eventRec.EVENT_TYPE);
       evntRec->m_request.setTableId(evntRecPtr.p->m_eventRec.TABLEID);
       evntRec->m_request.setTableVersion(evntRecPtr.p->m_eventRec.TABLEVERSION);
       evntRec->m_request.setAttrListBitmask(*(AttributeMask*)
@@ -9360,10 +9379,10 @@
   sumaReq->subscriptionId   = evntRecPtr.p->m_request.getEventId();
   sumaReq->subscriptionKey  = evntRecPtr.p->m_request.getEventKey();
   sumaReq->subscriptionType = SubCreateReq::TableEvent;
-  if (evntRecPtr.p->m_reportAll)
-  {
+  if (evntRecPtr.p->m_request.getReportAll())
     sumaReq->subscriptionType|= SubCreateReq::ReportAll;
-  }
+  if (evntRecPtr.p->m_request.getReportSubscribe())
+    sumaReq->subscriptionType|= SubCreateReq::ReportSubscribe;
   sumaReq->tableId          = evntRecPtr.p->m_request.getTableId();
     
 #ifdef EVENT_PH2_DEBUG
@@ -9535,6 +9554,20 @@
 
   Uint32 origSenderRef = signal->senderBlockRef();
 
+  if (refToBlock(origSenderRef) != DBDICT &&
+      getOwnNodeId() != c_masterNodeId)
+  {
+    /*
+     * Coordinator but not master
+     */
+    SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
+    ref->senderRef = reference();
+    ref->errorCode = SubStartRef::NotMaster;
+    ref->m_masterNodeId = c_masterNodeId;
+    sendSignal(origSenderRef, GSN_SUB_START_REF, signal,
+	       SubStartRef::SignalLength2, JBB);
+    return;
+  }
   OpSubEventPtr subbPtr;
   if (!c_opSubEvent.seize(subbPtr)) {
     SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
@@ -9714,6 +9747,9 @@
 #ifdef EVENT_DEBUG
     ndbout_c("SUB_START_REF");
 #endif
+    SubStartRef * ref = (SubStartRef *)signal->getDataPtrSend();
+    ref->senderRef = reference();
+    ref->errorCode = subbPtr.p->m_errorCode;
     sendSignal(subbPtr.p->m_senderRef, GSN_SUB_START_REF,
 	       signal, SubStartRef::SignalLength, JBB);
     if (subbPtr.p->m_reqTracker.hasConf()) {
@@ -9742,6 +9778,20 @@
 
   Uint32 origSenderRef = signal->senderBlockRef();
 
+  if (refToBlock(origSenderRef) != DBDICT &&
+      getOwnNodeId() != c_masterNodeId)
+  {
+    /*
+     * Coordinator but not master
+     */
+    SubStopRef * ref = (SubStopRef *)signal->getDataPtrSend();
+    ref->senderRef = reference();
+    ref->errorCode = SubStopRef::NotMaster;
+    ref->m_masterNodeId = c_masterNodeId;
+    sendSignal(origSenderRef, GSN_SUB_STOP_REF, signal,
+	       SubStopRef::SignalLength2, JBB);
+    return;
+  }
   OpSubEventPtr subbPtr;
   if (!c_opSubEvent.seize(subbPtr)) {
     SubStopRef * ref = (SubStopRef *)signal->getDataPtrSend();
@@ -9935,6 +9985,23 @@
   const Uint32 senderRef = signal->senderBlockRef();
   OpDropEventPtr evntRecPtr;
 
+  if (refToBlock(senderRef) != DBDICT &&
+      getOwnNodeId() != c_masterNodeId)
+  {
+    jam();
+    releaseSections(signal);
+
+    DropEvntRef * ref = (DropEvntRef *)signal->getDataPtrSend();
+    ref->setUserRef(reference());
+    ref->setErrorCode(DropEvntRef::NotMaster);
+    ref->setErrorLine(__LINE__);
+    ref->setErrorNode(reference());
+    ref->setMasterNode(c_masterNodeId);
+    sendSignal(senderRef, GSN_DROP_EVNT_REF, signal,
+	       DropEvntRef::SignalLength2, JBB);
+    return;
+  }
+
   // Seize a Create Event record
   if (!c_opDropEvent.seize(evntRecPtr)) {
     // Failed to allocate event record
@@ -10352,7 +10419,6 @@
     ret->setErrorCode(evntRecPtr.p->m_errorCode);
     ret->setErrorLine(evntRecPtr.p->m_errorLine);
     ret->setErrorNode(evntRecPtr.p->m_errorNode);
-
     sendSignal(senderRef, GSN_DROP_EVNT_REF, signal,
 	       DropEvntRef::SignalLength, JBB);
   } else {

--- 1.60/storage/ndb/src/ndbapi/Ndb.cpp	2005-11-22 18:04:54 +01:00
+++ 1.61/storage/ndb/src/ndbapi/Ndb.cpp	2006-01-04 16:08:48 +01:00
@@ -1310,7 +1310,12 @@
 
 void Ndb::setReportThreshEventFreeMem(unsigned thresh)
 {
-  theEventBuffer->m_free_thresh= thresh;
+  if (theEventBuffer->m_free_thresh != thresh)
+  {
+    theEventBuffer->m_free_thresh= thresh;
+    theEventBuffer->m_min_free_thresh= thresh;
+    theEventBuffer->m_max_free_thresh= 100;
+  }
 }
 
 #ifdef VM_TRACE

--- 1.24/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2005-11-22 18:04:55 +01:00
+++ 1.25/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2006-01-04 16:30:20 +01:00
@@ -268,6 +268,7 @@
   m_state= EO_ERROR;
   mi_type= 0;
   m_magic_number= 0;
+  m_error.code= myDict->getNdbError().code;
   m_ndb->theEventBuffer->remove_op();
   m_ndb->theEventBuffer->add_drop_unlock();
   DBUG_RETURN(r);
@@ -670,7 +671,7 @@
 
   NdbMutex_Lock(m_mutex);
   NdbEventOperationImpl *ev_op= move_data();
-  if (unlikely(ev_op == 0))
+  if (unlikely(ev_op == 0 && aMillisecondNumber))
   {
     NdbCondition_WaitTimeout(p_cond, m_mutex, aMillisecondNumber);
     ev_op= move_data();
@@ -994,6 +995,33 @@
 }
 
 void
+NdbEventBuffer::report_node_failure(Uint32 node_id)
+{
+  DBUG_ENTER("NdbEventBuffer::report_node_failure");
+  SubTableData data;
+  LinearSectionPtr ptr[3];
+  bzero(&data, sizeof(data));
+  bzero(ptr, sizeof(ptr));
+
+  data.tableId = ~0;
+  data.operation = NdbDictionary::Event::_TE_NODE_FAILURE;
+  data.req_nodeid = (Uint8)node_id;
+  data.ndbd_nodeid = (Uint8)node_id;
+  data.logType = SubTableData::LOG;
+  /**
+   * Insert this event for each operation
+   */
+  NdbEventOperation* op= 0;
+  while((op = m_ndb->getEventOperation(op)))
+  {
+    NdbEventOperationImpl* impl= &op->m_impl;
+    data.senderData = impl->m_oid;
+    insertDataL(impl, &data, ptr); 
+  }
+  DBUG_VOID_RETURN;
+}
+
+void
 NdbEventBuffer::completeClusterFailed()
 {
   DBUG_ENTER("NdbEventBuffer::completeClusterFailed");
@@ -1376,7 +1404,7 @@
     m_min_free_thresh= m_free_thresh;
     m_max_free_thresh= 100;
     goto send_report;
- }
+  }
   if (latest_gci-apply_gci >=  m_gci_slip_thresh)
   {
     goto send_report;

--- 1.8/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	2005-11-22 18:04:55 +01:00
+++ 1.9/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	2006-01-04 16:08:48 +01:00
@@ -221,7 +221,7 @@
   void execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep);
   void complete_outof_order_gcis();
   
-  void reportClusterFailed(NdbEventOperationImpl *op);
+  void report_node_failure(Uint32 node_id);
   void completeClusterFailed();
 
   // used by user thread 

--- 1.229/sql/ha_ndbcluster.cc	2006-01-03 16:01:34 +01:00
+++ 1.230/sql/ha_ndbcluster.cc	2006-01-04 16:30:20 +01:00
@@ -33,10 +33,7 @@
 #include <../util/Bitmask.hpp>
 #include <ndbapi/NdbIndexStat.hpp>
 
-#ifdef HAVE_NDB_BINLOG
-#include "rpl_injector.h"
-#include "slave.h"
-#endif
+#include "ha_ndbcluster_binlog.h"
 
 // options from from mysqld.cc
 extern my_bool opt_ndb_optimized_node_selection;
@@ -55,13 +52,9 @@
 // createable against NDB from this handler
 static const int max_transactions= 3; // should really be 2 but there is a transaction to much allocated when loch table is used
 
-static const char *ha_ndb_ext=".ndb";
-static const char share_prefix[]= "./";
-
-static int ndbcluster_close_connection(THD *thd);
-static int ndbcluster_commit(THD *thd, bool all);
-static int ndbcluster_rollback(THD *thd, bool all);
-static handler* ndbcluster_create_handler(TABLE_SHARE *table);
+static bool ndbcluster_init(void);
+static int ndbcluster_end(ha_panic_function flag);
+static bool ndbcluster_show_status(THD*,stat_print_fn *,enum ha_stat_type);
 
 handlerton ndbcluster_hton = {
   "ndbcluster",
@@ -69,31 +62,7 @@
   "Clustered, fault-tolerant, memory-based tables", 
   DB_TYPE_NDBCLUSTER,
   ndbcluster_init,
-  0, /* slot */
-  0, /* savepoint size */
-  ndbcluster_close_connection,
-  NULL, /* savepoint_set */
-  NULL, /* savepoint_rollback */
-  NULL, /* savepoint_release */
-  ndbcluster_commit,
-  ndbcluster_rollback,
-  NULL, /* prepare */
-  NULL, /* recover */
-  NULL, /* commit_by_xid */
-  NULL, /* rollback_by_xid */
-  NULL, /* create_cursor_read_view */
-  NULL, /* set_cursor_read_view */
-  NULL, /* close_cursor_read_view */
-  ndbcluster_create_handler, /* Create a new handler */
-  ndbcluster_drop_database, /* Drop a database */
-  ndbcluster_end, /* Panic call */
-  NULL, /* Release temporary latches */
-  NULL, /* Update Statistics */
-  NULL, /* Start Consistent Snapshot */
-  NULL, /* Flush logs */
-  ndbcluster_show_status, /* Show status */
-  NULL, /* Replication Report Sent Binlog */
-  HTON_NO_FLAGS
+  ~(uint)0, /* slot */
 };
 
 static handler *ndbcluster_create_handler(TABLE_SHARE *table)
@@ -126,38 +95,24 @@
   break;                                 \
 }
 
-// Typedefs for long names
-typedef NdbDictionary::Object NDBOBJ;
-typedef NdbDictionary::Column NDBCOL;
-typedef NdbDictionary::Table NDBTAB;
-typedef NdbDictionary::Index  NDBINDEX;
-typedef NdbDictionary::Dictionary  NDBDICT;
-typedef NdbDictionary::Event NDBEVENT;
-
 static int ndbcluster_inited= 0;
-static int ndbcluster_util_inited= 0;
+int ndbcluster_util_inited= 0;
 
 static Ndb* g_ndb= NULL;
-static Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
+Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
+unsigned char g_node_id_map[max_ndb_nodes];
 
 // Handler synchronization
 pthread_mutex_t ndbcluster_mutex;
 
 // Table lock handling
-static HASH ndbcluster_open_tables;
+HASH ndbcluster_open_tables;
 
 static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
                                 my_bool not_used __attribute__((unused)));
-static NDB_SHARE *get_share(const char *key,
-                            bool create_if_not_exists= TRUE,
-                            bool have_lock= FALSE);
 #ifdef HAVE_NDB_BINLOG
-/* you should have lock on ndbcluster_mutex when calling */
-static int handle_trailing_share(NDB_SHARE *share);
-static int rename_share(NDB_SHARE *share, const char *new_key);
+static int rename_share(NDB_SHARE *share, const char *new_key, bool have_lock);
 #endif
-static void free_share(NDB_SHARE **share, bool have_lock= FALSE);
-static void real_free_share(NDB_SHARE **share);
 static void ndb_set_fragmentation(NDBTAB &tab, TABLE *table, uint pk_len);
 
 static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len);
@@ -167,35 +122,9 @@
 static int ndb_get_table_statistics(Ndb*, const char *, 
                                     struct Ndb_statistics *);
 
-#ifndef DBUG_OFF
-void print_records(TABLE *table, const char *record)
-{
-  if (_db_on_)
-  {
-    for (uint j= 0; j < table->s->fields; j++)
-    {
-      char buf[40];
-      int pos= 0;
-      Field *field= table->field[j];
-      const byte* field_ptr= field->ptr - table->record[0] + record;
-      int pack_len= field->pack_length();
-      int n= pack_len < 10 ? pack_len : 10;
-      
-      for (int i= 0; i < n && pos < 20; i++)
-      {
-	pos+= sprintf(&buf[pos]," %x", (int) (unsigned char) field_ptr[i]);
-      }
-      buf[pos]= 0;
-      DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
-    }
-  }
-}
-#else
-#define print_records(a,b)
-#endif
 
 // Util thread variables
-static pthread_t ndb_util_thread;
+pthread_t ndb_util_thread;
 pthread_mutex_t LOCK_ndb_util_thread;
 pthread_cond_t COND_ndb_util_thread;
 pthread_handler_t ndb_util_thread_func(void *arg);
@@ -207,75 +136,6 @@
 */
 static uint32 dummy_buf;
 
-#ifdef HAVE_NDB_BINLOG
-#define INJECTOR_EVENT_LEN 200
-/* NDB Injector thread (used for binlog creation) */
-ulong ndb_report_thresh_binlog_epoch_slip;
-ulong ndb_report_thresh_binlog_mem_usage;
-static ulonglong ndb_latest_applied_binlog_epoch= 0;
-static ulonglong ndb_latest_handled_binlog_epoch= 0;
-static ulonglong ndb_latest_received_binlog_epoch= 0;
-static pthread_t ndb_binlog_thread;
-static int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
-                                          const char *db,
-                                          const char *table_name,
-                                          bool do_binlog,
-                                          NDB_SHARE *share= 0);
-static int ndbcluster_create_event(Ndb *ndb, const NDBTAB *table,
-                                   const char *event_name, NDB_SHARE *share);
-static int ndbcluster_create_event_ops(NDB_SHARE *share,
-                                       const NDBTAB *ndbtab,
-                                       const char *event_name);
-static int ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
-                                        NDB_SHARE *share);
-static void ndb_rep_event_name(String *event_name,
-                               const char *db, const char *tbl);
-#ifndef DBUG_OFF
-static void dbug_print_table(const char *info, TABLE *table);
-#endif
-static int ndbcluster_binlog_start();
-pthread_handler_t ndb_binlog_thread_func(void *arg);
-
-/*
-  Mutex and condition used for interacting between client sql thread
-  and injector thread
-*/
-pthread_mutex_t injector_mutex;
-pthread_cond_t  injector_cond;
-/*
-  Flag showing if the ndb injector thread is running, if so == 1
-*/
-static int ndb_binlog_thread_running= 0;
-
-/*
-  table cluster_replication.apply_status
-*/
-static int ndbcluster_create_apply_status_table(THD *thd);
-static NDB_SHARE *ndbcluster_check_apply_status_share();
-static NDB_SHARE *ndbcluster_get_apply_status_share();
-static NDB_SHARE *apply_status_share= 0;
-
-/*
-  Global reference to the ndb injector thread THD oject
-
-  Has one sole purpose, for setting the in_use table member variable
-  in get_share(...)
-*/
-static THD *injector_thd= 0;
-
-/*
-  Global reference to ndb injector thd object.
-
-  Used mainly by the binlog index thread, but exposed to the client sql
-  thread for one reason; to setup the events operations for a table
-  to enable ndb injector thread receiving events.
-
-  Must therefore always be used with a surrounding
-  pthread_mutex_lock(&injector_mutex), when doing create/dropEventOperation
-*/
-static Ndb *injector_ndb= 0;
-#endif /* HAVE_NDB_BINLOG */
-
 /*
   Stats that can be retrieved from ndb
 */
@@ -293,7 +153,7 @@
 static const char * ndb_connected_host= 0;
 static long ndb_connected_port= 0;
 static long ndb_number_of_replicas= 0;
-static long ndb_number_of_storage_nodes= 0;
+long ndb_number_of_storage_nodes= 0;
 
 static int update_status_variables(Ndb_cluster_connection *c)
 {
@@ -314,9 +174,6 @@
   {NullS, NullS, SHOW_LONG}
 };
 
-/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */
-extern Uint64 g_latest_trans_gci;
-
 /*
   Error handling functions
 */
@@ -444,6 +301,7 @@
   all= NULL;
   stmt= NULL;
   error= 0;
+  options= 0;
 }
 
 Thd_ndb::~Thd_ndb()
@@ -469,14 +327,6 @@
 }
 
 inline
-Thd_ndb *
-get_thd_ndb(THD *thd) { return (Thd_ndb *) thd->ha_data[ndbcluster_hton.slot]; }
-
-inline
-void
-set_thd_ndb(THD *thd, Thd_ndb *thd_ndb) { thd->ha_data[ndbcluster_hton.slot]= thd_ndb; }
-
-inline
 Ndb *ha_ndbcluster::get_ndb()
 {
   return get_thd_ndb(current_thd)->ndb;
@@ -2595,8 +2445,8 @@
     set to null.
 */
 
-static void ndb_unpack_record(TABLE *table, NdbValue *value,
-                              MY_BITMAP *defined, byte *buf)
+void ndb_unpack_record(TABLE *table, NdbValue *value,
+                       MY_BITMAP *defined, byte *buf)
 {
   Field **p_field= table->field, *field= *p_field;
   uint row_offset= (uint) (buf - table->record[0]);
@@ -3669,7 +3519,7 @@
   Commit a transaction started in NDB
  */
 
-int ndbcluster_commit(THD *thd, bool all)
+static int ndbcluster_commit(THD *thd, bool all)
 {
   int res= 0;
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
@@ -3720,7 +3570,7 @@
   Rollback a transaction started in NDB
  */
 
-int ndbcluster_rollback(THD *thd, bool all)
+static int ndbcluster_rollback(THD *thd, bool all)
 {
   int res= 0;
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
@@ -4069,7 +3919,8 @@
 #ifdef HAVE_NDB_BINLOG
     ndbcluster_create_binlog_setup(get_ndb(), name2, m_dbname, m_tabname,
                                    ndb_binlog_thread_running > 0 &&
-                                   !is_prefix(m_tabname, tmp_file_prefix));
+                                   !is_prefix(m_tabname, tmp_file_prefix),
+                                   0, TRUE);
 #endif /* HAVE_NDB_BINLOG */
     DBUG_RETURN(my_errno);
   }
@@ -4230,7 +4081,7 @@
       uint length= (uint) strlen(key);
       if ((share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
                                            (byte*) key, length)))
-        handle_trailing_share(share);
+        handle_trailing_share(share, TRUE);
     }
     /*
       get a new share
@@ -4273,8 +4124,12 @@
         sql_print_error("NDB Binlog: FAILED CREATE TABLE event operations."
                         " Event: %s", name2);
         /* a warning has been issued to the client */
-        break;
       }
+      ndbcluster_log_schema_op(current_thd, share,
+                               current_thd->query, current_thd->query_length,
+                               share->db, share->table_name,
+                               0, 0,
+                               SOT_CREATE_TABLE);
       break;
     }
   }
@@ -4379,7 +4234,7 @@
   if (ndb_binlog_thread_running > 0 &&
       (share= get_share(from, false)))
   {
-    int r= rename_share(share, to);
+    int r= rename_share(share, to, TRUE);
     DBUG_ASSERT(r == 0);
   }
 #endif
@@ -4393,7 +4248,7 @@
 #ifdef HAVE_NDB_BINLOG
     if (share)
     {
-      int r= rename_share(share, from);
+      int r= rename_share(share, from, TRUE);
       DBUG_ASSERT(r == 0);
       free_share(&share);
     }
@@ -4413,12 +4268,14 @@
   }
 
 #ifdef HAVE_NDB_BINLOG
+  int is_old_table_tmpfile= 1;
   if (share && share->op)
     dict->forceGCPWait();
 
   /* handle old table */
   if (!is_prefix(m_tabname, tmp_file_prefix))
   {
+    is_old_table_tmpfile= 0;
     String event_name(INJECTOR_EVENT_LEN);
     ndb_rep_event_name(&event_name, from + sizeof(share_prefix) - 1, 0);
     ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share);
@@ -4455,6 +4312,18 @@
                           "Creating event for logging table failed. "
                           "See error log for details.");
     }
+    if (is_old_table_tmpfile)
+      ndbcluster_log_schema_op(current_thd, share,
+                               current_thd->query, current_thd->query_length,
+                               share->db, share->table_name,
+                               0, 0,
+                               SOT_ALTER_TABLE);
+    else
+      ndbcluster_log_schema_op(current_thd, share,
+                               current_thd->query, current_thd->query_length,
+                               share->db, share->table_name,
+                               0, 0,
+                               SOT_RENAME_TABLE);
   }
   if (share)
     free_share(&share);
@@ -4553,7 +4422,16 @@
   */
   int table_dropped= dict->getNdbError().code != 709;
 
-  if (table_dropped && share && share->op)
+  if (!is_prefix(table_name, tmp_file_prefix) && share)
+  {
+    ndbcluster_log_schema_op(current_thd, share,
+                             current_thd->query, current_thd->query_length,
+                             share->db, share->table_name,
+                             0, 0,
+                             SOT_DROP_TABLE);
+  }
+  else if (table_dropped && share && share->op) /* ndbcluster_log_schema_op
+                                                   will do a force GCP */
     dict->forceGCPWait();
 
   if (!is_prefix(table_name, tmp_file_prefix))
@@ -4906,7 +4784,7 @@
 }
 
 
-int ndbcluster_close_connection(THD *thd)
+static int ndbcluster_close_connection(THD *thd)
 {
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
   DBUG_ENTER("ndbcluster_close_connection");
@@ -5069,14 +4947,21 @@
   DBUG_RETURN(ret);      
 }
 
-void ndbcluster_drop_database(char *path)
+static void ndbcluster_drop_database(char *path)
 {
   ndbcluster_drop_database_impl(path);
+#ifdef HAVE_NDB_BINLOG
+  char db[FN_REFLEN];
+  ha_ndbcluster::set_dbname(path, db);
+  ndbcluster_log_schema_op(current_thd, 0,
+                           current_thd->query, current_thd->query_length,
+                           db, "", 0, 0, SOT_DROP_DB);
+#endif
 }
 /*
   find all tables in ndb and discover those needed
 */
-static int ndbcluster_find_all_files(THD *thd)
+int ndbcluster_find_all_files(THD *thd)
 {
   DBUG_ENTER("ndbcluster_find_all_files");
   Ndb* ndb;
@@ -5111,10 +4996,11 @@
 
       if (!(ndbtab= dict->getTable(elmt.name)))
       {
-        sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
-                        elmt.database, elmt.name,
-                        dict->getNdbError().code,
-                        dict->getNdbError().message);
+        if (elmt.state == NDBOBJ::StateOnline)
+          sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
+                          elmt.database, elmt.name,
+                          dict->getNdbError().code,
+                          dict->getNdbError().message);
         unhandled++;
         continue;
       }
@@ -5173,7 +5059,7 @@
                                          ndb_binlog_thread_running > 0 &&
                                          !is_prefix(elmt.name,
                                                     tmp_file_prefix),
-                                         share);
+                                         share, FALSE);
         }
         else
           pthread_mutex_unlock(&ndbcluster_mutex);
@@ -5311,7 +5197,7 @@
         pthread_mutex_unlock(&ndbcluster_mutex);
         ndbcluster_create_binlog_setup(ndb, name, db, file_name,
                                        !is_prefix(file_name, tmp_file_prefix),
-                                       share);
+                                       share, FALSE);
         pthread_mutex_lock(&ndbcluster_mutex);
       }
     }
@@ -5391,11 +5277,18 @@
 static int connect_callback()
 {
   update_status_variables(g_ndb_cluster_connection);
+
+  uint node_id, i= 0;
+  Ndb_cluster_connection_node_iter node_iter;
+  memset((void *)g_node_id_map, 0xFFFF, sizeof(g_node_id_map));
+  while ((node_id= g_ndb_cluster_connection->get_next_node(node_iter)))
+    g_node_id_map[node_id]= i++;
+
   pthread_cond_signal(&COND_ndb_util_thread);
   return 0;
 }
 
-bool ndbcluster_init()
+static bool ndbcluster_init()
 {
   int res;
   DBUG_ENTER("ndbcluster_init");
@@ -5403,6 +5296,21 @@
   if (have_ndbcluster != SHOW_OPTION_YES)
     goto ndbcluster_init_error;
 
+  {
+    handlerton &h= ndbcluster_hton;
+    h.close_connection= ndbcluster_close_connection;
+    h.commit=           ndbcluster_commit;
+    h.rollback=         ndbcluster_rollback;
+    h.create=           ndbcluster_create_handler; /* Create a new handler */
+    h.drop_database=    ndbcluster_drop_database;  /* Drop a database */
+    h.panic=            ndbcluster_end;            /* Panic call */
+    h.show_status=      ndbcluster_show_status;    /* Show status */
+#ifdef HAVE_NDB_BINLOG
+    ndbcluster_binlog_init_handlerton();
+#endif
+    h.flags=            HTON_NO_FLAGS;
+  }
+
   // Set connectstring if specified
   if (opt_ndbcluster_connectstring != 0)
     DBUG_PRINT("connectstring", ("%s", opt_ndbcluster_connectstring));     
@@ -5512,72 +5420,7 @@
   DBUG_RETURN(TRUE);
 }
 
-
-/*
-  End use of the NDB Cluster table handler
-  - free all global variables allocated by 
-    ndbcluster_init()
-*/
-
-int ndbcluster_binlog_end()
-{
-  DBUG_ENTER("ndb_binlog_end");
-
-  if (!ndbcluster_util_inited)
-    DBUG_RETURN(0);
-
-  // Kill ndb utility thread
-  (void) pthread_mutex_lock(&LOCK_ndb_util_thread);  
-  DBUG_PRINT("exit",("killing ndb util thread: %lx", ndb_util_thread));
-  (void) pthread_cond_signal(&COND_ndb_util_thread);
-  (void) pthread_mutex_unlock(&LOCK_ndb_util_thread);
-
-#ifdef HAVE_NDB_BINLOG
-  /* wait for injector thread to finish */
-  if (ndb_binlog_thread_running > 0)
-  {
-    pthread_mutex_lock(&injector_mutex);
-    while (ndb_binlog_thread_running > 0)
-    {
-      struct timespec abstime;
-      set_timespec(abstime, 1);
-      pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
-    }
-    pthread_mutex_unlock(&injector_mutex);
-  }
-
-  /* remove all shares */
-  {
-    pthread_mutex_lock(&ndbcluster_mutex);
-    for (uint i= 0; i < ndbcluster_open_tables.records; i++)
-    {
-      NDB_SHARE *share=
-        (NDB_SHARE*) hash_element(&ndbcluster_open_tables, i);
-      if (share->table)
-        DBUG_PRINT("share",
-                   ("table->s->db.table_name: %s.%s",
-                    share->table->s->db.str, share->table->s->table_name.str));
-      if (share->state != NSS_DROPPED && !--share->use_count)
-        real_free_share(&share);
-      else
-      {
-        DBUG_PRINT("share",
-                   ("[%d] 0x%lx  key: %s  key_length: %d",
-                    i, share, share->key, share->key_length));
-        DBUG_PRINT("share",
-                   ("db.tablename: %s.%s  use_count: %d  commit_count: %d",
-                    share->db, share->table_name,
-                    share->use_count, share->commit_count));
-      }
-    }
-    pthread_mutex_unlock(&ndbcluster_mutex);
-  }
-#endif
-  ndbcluster_util_inited= 0;
-  DBUG_RETURN(0);
-}
-
-int ndbcluster_end(ha_panic_function type)
+static int ndbcluster_end(ha_panic_function type)
 {
   DBUG_ENTER("ndbcluster_end");
 
@@ -6076,60 +5919,6 @@
 }
 
 
-#ifndef DBUG_OFF
-static void dbug_print_table(const char *info, TABLE *table)
-{
-  if (table == 0)
-  {
-    DBUG_PRINT("info",("%s: (null)", info));
-    return;
-  }
-  DBUG_PRINT("info",
-             ("%s: %s.%s s->fields: %d  "
-              "reclength: %d  rec_buff_length: %d  record[0]: %lx  "
-              "record[1]: %lx",
-              info,
-              table->s->db.str,
-              table->s->table_name.str,
-              table->s->fields,
-              table->s->reclength,
-              table->s->rec_buff_length,
-              table->record[0],
-              table->record[1]));
-
-  for (unsigned int i= 0; i < table->s->fields; i++) 
-  {
-    Field *f= table->field[i];
-    DBUG_PRINT("info",
-               ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d  pack_length: %d  "
-                "ptr: 0x%lx[+%d]  null_bit: %u  null_ptr: 0x%lx[+%d]",
-                i,
-                f->field_name,
-                f->flags,
-                (f->flags & PRI_KEY_FLAG)  ? "pri"       : "attr",
-                (f->flags & NOT_NULL_FLAG) ? ""          : ",nullable",
-                (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
-                (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
-                (f->flags & BLOB_FLAG)     ? ",blob"     : "",
-                (f->flags & BINARY_FLAG)   ? ",binary"   : "",
-                f->real_type(),
-                f->pack_length(),
-                f->ptr, f->ptr - table->record[0],
-                f->null_bit,
-                f->null_ptr, (byte*) f->null_ptr - table->record[0]));
-    if (f->type() == MYSQL_TYPE_BIT)
-    {
-      Field_bit *g= (Field_bit*) f;
-      DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d  bit_ptr: 0x%lx[+%d] "
-                                   "bit_ofs: %u  bit_len: %u",
-                                   g->field_length, g->bit_ptr,
-                                   (byte*) g->bit_ptr-table->record[0],
-                                   g->bit_ofs, g->bit_len));
-    }
-  }
-}
-#endif
-
 /*
   Handling the shared NDB_SHARE structure that is needed to
   provide table locking.
@@ -6183,7 +5972,7 @@
   
   Must be called with previous pthread_mutex_lock(&ndbcluster_mutex)
 */
-static int handle_trailing_share(NDB_SHARE *share)
+int handle_trailing_share(NDB_SHARE *share, bool have_lock)
 {
   static ulong trailing_share_id= 0;
   DBUG_ENTER("handle_trailing_share");
@@ -6191,7 +5980,7 @@
   ++share->use_count;
   pthread_mutex_unlock(&ndbcluster_mutex);
 
-  close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0);
+  close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, have_lock);
 
   pthread_mutex_lock(&ndbcluster_mutex);
   if (!--share->use_count)
@@ -6253,7 +6042,7 @@
 /*
   Rename share is used during rename table.
 */
-static int rename_share(NDB_SHARE *share, const char *new_key)
+static int rename_share(NDB_SHARE *share, const char *new_key, bool have_lock)
 {
   NDB_SHARE *tmp;
   pthread_mutex_lock(&ndbcluster_mutex);
@@ -6262,7 +6051,7 @@
                               share->key, share->key_length));
   if ((tmp= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
                                      (byte*) new_key, new_length)))
-    handle_trailing_share(tmp);
+    handle_trailing_share(tmp, have_lock);
 
   /* remove the share from hash */
   hash_delete(&ndbcluster_open_tables, (byte*) share);
@@ -6335,7 +6124,7 @@
   Increase refcount on existing share.
   Always returns share and cannot fail.
 */
-static NDB_SHARE *get_share(NDB_SHARE *share)
+NDB_SHARE *ndbcluster_get_share(NDB_SHARE *share)
 {
   pthread_mutex_lock(&ndbcluster_mutex);
   share->use_count++;
@@ -6367,8 +6156,8 @@
 
   have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken
 */
-static NDB_SHARE *get_share(const char *key, bool create_if_not_exists,
-                            bool have_lock)
+NDB_SHARE *ndbcluster_get_share(const char *key, bool create_if_not_exists,
+                                bool have_lock)
 {
   DBUG_ENTER("get_share");
   DBUG_PRINT("info", ("get_share: key %s", key));
@@ -6419,70 +6208,7 @@
       share->table_name= share->db + strlen(share->db) + 1;
       ha_ndbcluster::set_tabname(key, share->table_name);
 #ifdef HAVE_NDB_BINLOG
-      share->op= 0;
-      share->table= 0;
-      while (ndb_binlog_thread_running > 0)
-      {
-        TABLE_SHARE *table_share= 
-          (TABLE_SHARE *) my_malloc(sizeof(*table_share), MYF(MY_WME));
-        TABLE *table= (TABLE*) my_malloc(sizeof(*table), MYF(MY_WME));
-        int error;
-
-        init_tmp_table_share(table_share, share->db, 0, share->table_name, 
-                             share->key);
-        if ((error= open_table_def(thd, table_share, 0)))
-        {
-          sql_print_error("Unable to get table share for %s, error=%d",
-                          share->key, error);
-          DBUG_PRINT("error", ("open_table_def failed %d", error));
-          my_free((gptr) table_share, MYF(0));
-          table_share= 0;
-          my_free((gptr) table, MYF(0));
-          table= 0;
-          break;
-        }
-        if ((error= open_table_from_share(thd, table_share, "", 0, 
-                                          (uint) READ_ALL, 0, table)))
-        {
-          sql_print_error("Unable to open table for %s, error=%d(%d)",
-                          share->key, error, my_errno);
-          DBUG_PRINT("error", ("open_table_from_share failed %d", error));
-          my_free((gptr) table_share, MYF(0));
-          table_share= 0;
-          my_free((gptr) table, MYF(0));
-          table= 0;
-          break;
-        }
-        assign_new_table_id(table);
-        if (!table->record[1] || table->record[1] == table->record[0])
-        {
-          table->record[1]= alloc_root(&table->mem_root,
-                                       table->s->rec_buff_length);
-        }
-        table->in_use= injector_thd;
-        
-        table->s->db.str= share->db;
-        table->s->db.length= strlen(share->db);
-        table->s->table_name.str= share->table_name;
-        table->s->table_name.length= strlen(share->table_name);
- 
-        share->table_share= table_share;
-        share->table= table;
-#ifndef DBUG_OFF
-        dbug_print_table("table", table);
-#endif
-        /*
-          ! do not touch the contents of the table
-          it may be in use by the injector thread
-	*/
-        share->ndb_value[0]= (NdbValue*)
-          alloc_root(*root_ptr, sizeof(NdbValue) * table->s->fields
-                     + 1 /*extra for hidden key*/);
-        share->ndb_value[1]= (NdbValue*)
-          alloc_root(*root_ptr, sizeof(NdbValue) * table->s->fields
-                     +1 /*extra for hidden key*/);
-        break;
-      }
+      ndbcluster_binlog_init_share(share);
 #endif
       *root_ptr= old_root;
     }
@@ -6511,7 +6237,7 @@
   return share;
 }
 
-static void real_free_share(NDB_SHARE **share)
+void ndbcluster_real_free_share(NDB_SHARE **share)
 {
   DBUG_PRINT("real_free_share",
              ("0x%lx key: %s  key_length: %d",
@@ -6558,7 +6284,7 @@
 
   have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken
 */
-static void free_share(NDB_SHARE **share, bool have_lock)
+void ndbcluster_free_share(NDB_SHARE **share, bool have_lock)
 {
   if (!have_lock)
     pthread_mutex_lock(&ndbcluster_mutex);
@@ -6584,7 +6310,6 @@
 }
 
 
-
 /*
   Internal representation of the frm blob
    
@@ -7258,7 +6983,7 @@
     Wait for cluster to start
   */
   pthread_mutex_lock(&LOCK_ndb_util_thread);
-  while (!ndb_cluster_node_id)
+  while (!ndb_cluster_node_id && (ndbcluster_hton.slot != ~(uint)0))
   {
     /* ndb not connected yet */
     set_timespec(abstime, 1);
@@ -7273,14 +6998,25 @@
   }
   pthread_mutex_unlock(&LOCK_ndb_util_thread);
 
+  {
+    Thd_ndb *thd_ndb;
+    if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
+    {
+      sql_print_error("Could not allocate Thd_ndb object");
+      goto ndb_util_thread_end;
+    }
+    set_thd_ndb(thd, thd_ndb);
+    thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
+  }
+
+#ifdef HAVE_NDB_BINLOG
+  /* create tables needed by the replication */
+  ndbcluster_setup_binlog_table_shares(thd);
+#else
   /*
     Get all table definitions from the storage node
   */
   ndbcluster_find_all_files(thd);
-
-#ifdef HAVE_NDB_BINLOG
-  /* create tables needed by the replication */
-  ndbcluster_create_apply_status_table(thd);
 #endif
 
   ndbcluster_util_inited= 1;
@@ -7309,15 +7045,11 @@
 
 #ifdef HAVE_NDB_BINLOG
     /*
-      Check that the apply_status_share has been created.
+      Check that the apply_status_share and schema_share has been created.
       If not try to create it
     */
-    if (!apply_status_share &&
-        ndbcluster_check_apply_status_share() == 0)
-    {
-      ndbcluster_find_all_files(thd);
-      ndbcluster_create_apply_status_table(thd);
-    }
+    if (!apply_status_share || !schema_share)
+      ndbcluster_setup_binlog_table_shares(thd);
 #endif
 
     if (ndb_cache_check_time == 0)
@@ -7425,6 +7157,7 @@
     }
   }
 ndb_util_thread_end:
+  sql_print_information("Stopping Cluster Utility thread");
   net_end(&thd->net);
   thd->cleanup();
   delete thd;
@@ -8770,7 +8503,6 @@
                        enum ha_stat_type stat_type)
 {
   char buf[IO_SIZE];
-  ulonglong ndb_latest_epoch= 0;
   DBUG_ENTER("ndbcluster_show_status");
   
   if (have_ndbcluster != SHOW_OPTION_YES) 
@@ -8809,1449 +8541,11 @@
     }
   }
 #ifdef HAVE_NDB_BINLOG
-  pthread_mutex_lock(&injector_mutex);
-  if (injector_ndb)
-  {
-    ndb_latest_epoch= injector_ndb->getLatestGCI();
-    pthread_mutex_unlock(&injector_mutex);
-
-    snprintf(buf, sizeof(buf),
-             "latest_epoch=%llu, "
-             "latest_trans_epoch=%llu, "
-             "latest_received_binlog_epoch=%llu, "
-             "latest_handled_binlog_epoch=%llu, "
-             "latest_applied_binlog_epoch=%llu",
-             ndb_latest_epoch,
-             g_latest_trans_gci,
-             ndb_latest_received_binlog_epoch,
-             ndb_latest_handled_binlog_epoch,
-             ndb_latest_applied_binlog_epoch);
-    if (stat_print(thd, ndbcluster_hton.name, "binlog", buf))
-      DBUG_RETURN(TRUE);
-  }
-  else
-    pthread_mutex_unlock(&injector_mutex);
+  ndbcluster_show_status_binlog(thd, stat_print, stat_type);
 #endif
 
   DBUG_RETURN(FALSE);
 }
-
-#ifdef HAVE_NDB_BINLOG
-
-/*
-  Run a query through mysql_parse
-
-  Used to:
-  - purging the cluster_replication.binlog_index
-  - creating the cluster_replication.apply_status table
-*/
-static void run_query(THD *thd, char *buf, char *end, my_bool print_error)
-{
-  ulong save_query_length= thd->query_length;
-  char *save_query= thd->query;
-  ulong save_thread_id= thd->variables.pseudo_thread_id;
-  NET save_net= thd->net;
-
-  bzero((char*) &thd->net, sizeof(NET));
-  thd->query_length= end - buf;
-  thd->query= buf;
-  thd->variables.pseudo_thread_id= thread_id;
-  DBUG_PRINT("query", ("%s", thd->query));
-
-  mysql_parse(thd, thd->query, thd->query_length);
-
-  if (print_error && thd->query_error)
-  {
-    sql_print_error("NDB: %s: error %s %d %d %d",
-                    buf, thd->net.last_error, thd->net.last_errno,
-                    thd->net.report_error, thd->query_error);
-  }
-
-  thd->query_length= save_query_length;
-  thd->query= save_query;
-  thd->variables.pseudo_thread_id= save_thread_id;
-  thd->net= save_net;
-}
-
-
-/*********************************************************************
-  Internal helper functions for handeling of the cluster replication tables
-  - cluster_replication.binlog_index
-  - cluster_replication.apply_status
-*********************************************************************/
-
-/*
-  defines for cluster replication table names
-*/
-#define NDB_REP_DB      "cluster_replication"
-#define NDB_REP_TABLE   "binlog_index"
-#define NDB_APPLY_TABLE "apply_status"
-#define NDB_APPLY_TABLE_FILE "./" NDB_REP_DB "/" NDB_APPLY_TABLE
-
-/*
-  Global variables for holding the binlog_index table reference
-*/
-TABLE *binlog_index= 0;
-TABLE_LIST binlog_tables;
-
-/*
-  struct to hold the data to be inserted into the
-  cluster_replication.binlog_index table
-*/
-struct Binlog_index_row {
-  longlong gci;
-  const char *master_log_file;
-  longlong master_log_pos;
-  longlong n_inserts;
-  longlong n_updates;
-  longlong n_deletes;
-  longlong n_schemaops;
-};
-
-/*
-  Open the cluster_replication.binlog_index table
-*/
-static int open_binlog_index(THD *thd, TABLE_LIST *tables,
-                             TABLE **binlog_index)
-{
-  static char repdb[]= NDB_REP_DB;
-  static char reptable[]= NDB_REP_TABLE;
-  const char *save_proc_info= thd->proc_info;
-
-  bzero((char*) tables, sizeof(*tables));
-  tables->db= repdb;
-  tables->alias= tables->table_name= reptable;
-  tables->lock_type= TL_WRITE;
-  thd->proc_info= "Opening " NDB_REP_DB "." NDB_REP_TABLE;
-  tables->required_type= FRMTYPE_TABLE;
-  uint counter;
-  thd->clear_error();
-  if (open_tables(thd, &tables, &counter, MYSQL_LOCK_IGNORE_FLUSH))
-  {
-    sql_print_error("NDB Binlog: Opening binlog_index: %d, '%s'",
-                    thd->net.last_errno,
-                    thd->net.last_error ? thd->net.last_error : "");
-    thd->proc_info= save_proc_info;
-    return -1;
-  }
-  *binlog_index= tables->table;
-  thd->proc_info= save_proc_info;
-  return 0;
-}
-
-/*
-  Insert one row in the cluster_replication.binlog_index
-
-  declared friend in handler.h to be able to call write_row directly
-  so that this insert is not replicated
-*/
-int ndb_add_binlog_index(THD *thd, void *_row)
-{
-  Binlog_index_row &row= *(Binlog_index_row *) _row;
-  int error= 0;
-  bool need_reopen;
-  for ( ; ; ) /* loop for need_reopen */
-  {
-    if (!binlog_index && open_binlog_index(thd, &binlog_tables, &binlog_index))
-    {
-      error= -1;
-      goto add_binlog_index_err;
-    }
-
-    if (lock_tables(thd, &binlog_tables, 1, &need_reopen))
-    {
-      if (need_reopen)
-      {
-        close_tables_for_reopen(thd, &binlog_tables);
-	binlog_index= 0;
-        continue;
-      }
-      sql_print_error("NDB Binlog: Unable to lock table binlog_index");
-      error= -1;
-      goto add_binlog_index_err;
-    }
-    break;
-  }
-
-  binlog_index->field[0]->store(row.master_log_pos);
-  binlog_index->field[1]->store(row.master_log_file,
-                                strlen(row.master_log_file),
-                                &my_charset_bin);
-  binlog_index->field[2]->store(row.gci);
-  binlog_index->field[3]->store(row.n_inserts);
-  binlog_index->field[4]->store(row.n_updates);
-  binlog_index->field[5]->store(row.n_deletes);
-  binlog_index->field[6]->store(row.n_schemaops);
-
-  int r;
-  if ((r= binlog_index->file->write_row(binlog_index->record[0])))
-  {
-    sql_print_error("NDB Binlog: Writing row to binlog_index: %d", r);
-    error= -1;
-    goto add_binlog_index_err;
-  }
-
-  mysql_unlock_tables(thd, thd->lock);
-  thd->lock= 0;
-  return 0;
-add_binlog_index_err:
-  close_thread_tables(thd);
-  binlog_index= 0;
-  return error;
-}
-
-/*
-  check the availability af the cluster_replication.apply_status share
-  - return share, but do not increase refcount
-  - return 0 if there is no share
-*/
-static NDB_SHARE *ndbcluster_check_apply_status_share()
-{
-  pthread_mutex_lock(&ndbcluster_mutex);
-
-  void *share= hash_search(&ndbcluster_open_tables, 
-                           NDB_APPLY_TABLE_FILE,
-                           sizeof(NDB_APPLY_TABLE_FILE) - 1);
-  DBUG_PRINT("info",("ndbcluster_check_apply_status_share %s %p",
-                     NDB_APPLY_TABLE_FILE, share));
-  pthread_mutex_unlock(&ndbcluster_mutex);
-  return (NDB_SHARE*) share;
-}
-
-/*
-  Get the share for the cluster_replication.apply_status share
-
-  - return 0 if share does not exist
-*/
-static NDB_SHARE *ndbcluster_get_apply_status_share()
-{
-  return get_share(NDB_APPLY_TABLE_FILE, false);
-}
-
-/*
-  Create the cluster_replication.apply_status table
-*/
-static int ndbcluster_create_apply_status_table(THD *thd)
-{
-  DBUG_ENTER("ndbcluster_create_apply_status_table");
-
-  /*
-    Check if we already have the apply status table.
-    If so it should have been discovered at startup
-    and thus have a share
-  */
-
-  if (ndbcluster_check_apply_status_share())
-    DBUG_RETURN(0);
-
-  if (g_ndb_cluster_connection->get_no_ready() <= 0)
-    DBUG_RETURN(0);
-
-  char buf[1024], *end;
-
-  sql_print_information("NDB: Creating " NDB_REP_DB "." NDB_APPLY_TABLE);
-
-  /*
-    Check if apply status table exists in MySQL "dictionary"
-    if so, remove it since there is none in Ndb
-  */
-  {
-    strxnmov(buf, sizeof(buf),
-             mysql_data_home,
-             "/" NDB_REP_DB "/" NDB_APPLY_TABLE,
-             reg_ext, NullS);
-    unpack_filename(buf,buf);
-    my_delete(buf, MYF(0));
-  }
-
-  /*
-    Note, updating this table schema must be reflected in ndb_restore
-  */
-  end= strmov(buf, "CREATE TABLE IF NOT EXISTS "
-                   NDB_REP_DB "." NDB_APPLY_TABLE
-                   " ( server_id INT UNSIGNED NOT NULL,"
-                   " epoch BIGINT UNSIGNED NOT NULL, "
-                   " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB");
-
-  run_query(thd, buf, end, TRUE);
-
-  DBUG_RETURN(0);
-}
-
-
-/*********************************************************************
-  Functions for start, stop, wait for ndbcluster binlog thread
-*********************************************************************/
-
-static int do_ndbcluster_binlog_close_connection= 0;
-
-static int ndbcluster_binlog_start()
-{
-  DBUG_ENTER("ndbcluster_binlog_start");
-
-  pthread_mutex_init(&injector_mutex, MY_MUTEX_INIT_FAST);
-  pthread_cond_init(&injector_cond, NULL);
-
-  /* Create injector thread */
-  if (pthread_create(&ndb_binlog_thread, &connection_attrib,
-                     ndb_binlog_thread_func, 0))
-  {
-    DBUG_PRINT("error", ("Could not create ndb injector thread"));
-    pthread_cond_destroy(&injector_cond);
-    pthread_mutex_destroy(&injector_mutex);
-    DBUG_RETURN(-1);
-  }
-
-  /*
-    Wait for the ndb injector thread to finish starting up.
-  */
-  pthread_mutex_lock(&injector_mutex);
-  while (!ndb_binlog_thread_running)
-    pthread_cond_wait(&injector_cond, &injector_mutex);
-  pthread_mutex_unlock(&injector_mutex);
-  
-  if (ndb_binlog_thread_running < 0)
-    DBUG_RETURN(-1);
-
-  DBUG_RETURN(0);
-}
-
-static void ndbcluster_binlog_close_connection(THD *thd)
-{
-  DBUG_ENTER("ndbcluster_binlog_close_connection");
-  const char *save_info= thd->proc_info;
-  thd->proc_info= "ndbcluster_binlog_close_connection";
-  do_ndbcluster_binlog_close_connection= 1;
-  while (ndb_binlog_thread_running > 0)
-    sleep(1);
-  thd->proc_info= save_info;
-  DBUG_VOID_RETURN;
-}
-
-/*
-  called in mysql_show_binlog_events and reset_logs to make sure we wait for
-  all events originating from this mysql server to arrive in the binlog
-
-  Wait for the last epoch in which the last transaction is a part of.
-
-  Wait a maximum of 30 seconds.
-*/
-void ndbcluster_binlog_wait(THD *thd)
-{
-  if (ndb_binlog_thread_running > 0)
-  {
-    DBUG_ENTER("ndbcluster_binlog_wait");
-    const char *save_info= thd ? thd->proc_info : 0;
-    ulonglong wait_epoch= g_latest_trans_gci;
-    int count= 30;
-    if (thd)
-      thd->proc_info= "Waiting for ndbcluster binlog update to "
-	"reach current position";
-    while (count && ndb_binlog_thread_running > 0 &&
-           ndb_latest_handled_binlog_epoch < wait_epoch)
-    {
-      count--;
-      sleep(1);
-    }
-    if (thd)
-      thd->proc_info= save_info;
-    DBUG_VOID_RETURN;
-  }
-}
-
-/*****************************************************************
-  functions called from master sql client threads
-****************************************************************/
-
-/*
- Called from MYSQL_LOG::reset_logs in log.cc when binlog is emptied
-*/
-int ndbcluster_reset_logs(THD *thd)
-{
-  if (ndb_binlog_thread_running <= 0)
-    return 0;
-
-  DBUG_ENTER("ndbcluster_reset_logs");
-
-  /*
-    Wait for all events orifinating from this mysql server has
-    reached the binlog before continuing to reset
-  */
-  ndbcluster_binlog_wait(thd);
-
-  char buf[1024];
-  char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_REP_TABLE);
-
-  run_query(thd, buf, end, FALSE);
-
-  DBUG_RETURN(0);
-}
-
-/*
-  Called from MYSQL_LOG::purge_logs in log.cc when the binlog "file"
-  is removed
-*/
-
-int ndbcluster_binlog_index_purge_file(THD *thd, const char *file)
-{
-  if (ndb_binlog_thread_running <= 0)
-    return 0;
-
-  DBUG_ENTER("ndbcluster_binlog_index_purge_file");
-  DBUG_PRINT("enter", ("file: %s", file));
-
-  char buf[1024];
-  char *end= strmov(strmov(strmov(buf,
-                                  "DELETE FROM "
-                                  NDB_REP_DB "." NDB_REP_TABLE
-                                  " WHERE File='"), file), "'");
-
-  run_query(thd, buf, end, FALSE);
-
-  DBUG_RETURN(0);
-}
-
-/*****************************************************************
-  functions called from slave sql client threads
-****************************************************************/
-void ndbcluster_reset_slave(THD *thd)
-{
-  if (ndb_binlog_thread_running <= 0)
-    return;
-
-  DBUG_ENTER("ndbcluster_reset_slave");
-  char buf[1024];
-  char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_APPLY_TABLE);
-  run_query(thd, buf, end, FALSE);
-  DBUG_VOID_RETURN;
-}
-
-
-/**************************************************************
-  Internal helper functions for creating/dropping ndb events
-  used by the client sql threads
-**************************************************************/
-static void
-ndb_rep_event_name(String *event_name,const char *db, const char *tbl)
-{
-  event_name->set_ascii("REPL$", 5);
-  event_name->append(db);
-  if (tbl)
-  {
-    event_name->append('/');
-    event_name->append(tbl);
-  }
-}
-
-/*
-  Common function for setting up everything for logging a table at
-  create/discover.
-*/
-static int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
-                                          const char *db,
-                                          const char *table_name,
-                                          bool do_binlog,
-                                          NDB_SHARE *share)
-{
-  DBUG_ENTER("ndbcluster_create_binlog_setup");
-
-  pthread_mutex_lock(&ndbcluster_mutex);
-
-  /* Handle any trailing share */
-  if (share == 0)
-  {
-    share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
-                                    (byte*) key, strlen(key));
-    if (share)
-      handle_trailing_share(share);
-  }
-  else
-    handle_trailing_share(share);
-  
-  /* Create share which is needed to hold replication information */
-  if (!(share= get_share(key, true, true)))
-  {
-    sql_print_error("NDB Binlog: "
-                    "allocating table share for %s failed", key);
-  }
-  pthread_mutex_unlock(&ndbcluster_mutex);
-
-  while (share && do_binlog)
-  {
-    /*
-      ToDo make sanity check of share so that the table is actually the same
-      I.e. we need to do open file from frm in this case
-      Currently awaiting this to be fixed in the 4.1 tree in the general
-      case
-    */
-
-    /* Create the event in NDB */
-    ndb->setDatabaseName(db);
-
-    NDBDICT *dict= ndb->getDictionary();
-    const NDBTAB *ndbtab= dict->getTable(table_name);
-    if (ndbtab == 0)
-    {
-      sql_print_information("NDB Binlog: Failed to get table %s from ndb: "
-                            "%s, %d", key, dict->getNdbError().message,
-                            dict->getNdbError().code);
-      break; // error
-    }
-    String event_name(INJECTOR_EVENT_LEN);
-    ndb_rep_event_name(&event_name, db, table_name);
-    /*
-      event should have been created by someone else,
-      but let's make sure, and create if it doesn't exist
-    */
-    if (!dict->getEvent(event_name.c_ptr()))
-    {
-      if (ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr(), share))
-      {
-        sql_print_error("NDB Binlog: "
-                        "FAILED CREATE (DISCOVER) TABLE Event: %s",
-                        event_name.c_ptr());
-        break; // error
-      }
-      sql_print_information("NDB Binlog: "
-                            "CREATE (DISCOVER) TABLE Event: %s",
-                            event_name.c_ptr());
-    }
-    else
-      sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
-                            event_name.c_ptr());
-
-    /*
-      create the event operations for receiving logging events
-    */
-    if (ndbcluster_create_event_ops(share, ndbtab,
-                                    event_name.c_ptr()) < 0)
-    {
-      sql_print_error("NDB Binlog:"
-                      "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
-                      event_name.c_ptr());
-      /* a warning has been issued to the client */
-      DBUG_RETURN(0);
-    }
-    DBUG_RETURN(0);
-  }
-  DBUG_RETURN(-1);
-}
-
-static int
-ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
-                        const char *event_name, 
-                        NDB_SHARE *share)
-{
-  DBUG_ENTER("ndbcluster_create_event");
-  NDBDICT *dict= ndb->getDictionary();
-
-  if (!dict)
-  {
-    sql_print_error("NDB Binlog: could not setup binlog, "
-                    "Invalid NdbDictionary");
-    DBUG_RETURN(-1);
-  }
-
-  NDBEVENT my_event(event_name);
-  my_event.setTable(*ndbtab);
-  my_event.addTableEvent(NDBEVENT::TE_ALL);
-  if (share->table->s->primary_key == MAX_KEY)
-    /* No primary key, susbscribe for all attributes */
-    my_event.setReport(NDBEVENT::ER_ALL);
-  else
-    my_event.setReport(NDBEVENT::ER_UPDATED);
-  /* add all columns to the event */
-  int n_cols= ndbtab->getNoOfColumns();
-  for(int a= 0; a < n_cols; a++)
-    my_event.addEventColumn(a);
-
-  if (dict->createEvent(my_event)) // Add event to database
-  {
-#ifdef NDB_BINLOG_EXTRA_WARNINGS
-    /*
-      failed, print a warning
-    */
-    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
-                        dict->getNdbError().code,
-                        dict->getNdbError().message, "NDB");
-#endif
-    if (dict->getNdbError().classification != NdbError::SchemaObjectExists)
-    {
-      sql_print_error("NDB Binlog: Unable to create event in database. "
-                      "Event: %s  Error Code: %d  Message: %s", event_name,
-                      dict->getNdbError().code, dict->getNdbError().message);
-      DBUG_RETURN(-1);
-    }
-
-    /*
-      trailing event from before; an error, but try to correct it
-    */
-    if (dict->dropEvent(my_event.getName()))
-    {
-      sql_print_error("NDB Binlog: Unable to create event in database. "
-                      " Attempt to correct with drop failed. "
-                      "Event: %s Error Code: %d Message: %s",
-                      event_name,
-                      dict->getNdbError().code,
-                      dict->getNdbError().message);
-      DBUG_RETURN(-1);
-    }
-
-    /*
-      try to add the event again
-    */
-    if (dict->createEvent(my_event))
-    {
-      sql_print_error("NDB Binlog: Unable to create event in database. "
-                      " Attempt to correct with drop ok, but create failed. "
-                      "Event: %s Error Code: %d Message: %s",
-                      event_name,
-                      dict->getNdbError().code,
-                      dict->getNdbError().message);
-      DBUG_RETURN(-1);
-    }
-#ifdef NDB_BINLOG_EXTRA_WARNINGS
-    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
-                        0, "NDB Binlog: Removed trailing event",
-                        "NDB");
-#endif
-  }
-
-  DBUG_RETURN(0);
-}
-
-inline int is_ndb_compatible_type(Field *field)
-{
-  return
-    !(field->flags & BLOB_FLAG) &&
-    field->type() != MYSQL_TYPE_BIT &&
-    field->pack_length() != 0;
-}
-
-/*
-  - create eventOperations for receiving log events
-  - setup ndb recattrs for reception of log event data
-  - "start" the event operation
-
-  used at create/discover of tables
-*/
-static int
-ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
-                            const char *event_name)
-{
-  /*
-    we are in either create table or rename table so table should be
-    locked, hence we can work with the share without locks
-  */
-
-  DBUG_ENTER("ndbcluster_create_event_ops");
-
-  DBUG_ASSERT(share != 0);
-
-  if (share->op)
-  {
-    assert(share->op->getCustomData() == (void *) share);
-
-    DBUG_ASSERT(share->use_count > 1);
-    sql_print_error("NDB Binlog: discover reusing old ev op");
-    free_share(&share); // old event op already has reference
-    DBUG_RETURN(0);
-  }
-
-  TABLE *table= share->table;
-  if (table)
-  {
-    /*
-      Logging of blob tables is not yet implemented, it would require:
-      1. setup of events also on the blob attribute tables
-      2. collect the pieces of the blob into one from an epoch to
-         provide a full blob to binlog
-    */
-    if (table->s->blob_fields)
-    {
-      sql_print_error("NDB Binlog: logging of blob table %s "
-                      "is not supported", share->key);
-      DBUG_RETURN(0);
-    }
-  }
-
-  pthread_mutex_lock(&injector_mutex);
-  if (injector_ndb == 0)
-  {
-    pthread_mutex_unlock(&injector_mutex);
-    DBUG_RETURN(-1);
-  }
-
-  NdbEventOperation *op= injector_ndb->createEventOperation(event_name);
-  if (!op)
-  {
-    pthread_mutex_unlock(&injector_mutex);
-    sql_print_error("NDB Binlog: Creating NdbEventOperation failed for"
-                    " %s",event_name);
-    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
-                        injector_ndb->getNdbError().code,
-                        injector_ndb->getNdbError().message,
-                        "NDB");
-    DBUG_RETURN(-1);
-  }
-
-  int n_columns= ndbtab->getNoOfColumns();
-  int n_fields= table ? table->s->fields : 0;
-  for (int j= 0; j < n_columns; j++)
-  {
-    const char *col_name= ndbtab->getColumn(j)->getName();
-    NdbRecAttr *attr0, *attr1;
-    if (j < n_fields)
-    {
-      Field *f= share->table->field[j];
-      if (is_ndb_compatible_type(f))
-      {
-        DBUG_PRINT("info", ("%s compatible", col_name));
-        attr0= op->getValue(col_name, f->ptr);
-        attr1= op->getPreValue(col_name, (f->ptr-share->table->record[0]) +
-                               share->table->record[1]);
-      }
-      else
-      {
-        DBUG_PRINT("info", ("%s non compatible", col_name));
-        attr0= op->getValue(col_name);
-        attr1= op->getPreValue(col_name);
-      }
-    }
-    else
-    {
-      DBUG_PRINT("info", ("%s hidden key", col_name));
-      attr0= op->getValue(col_name);
-      attr1= op->getPreValue(col_name);
-    }
-    share->ndb_value[0][j].rec= attr0;
-    share->ndb_value[1][j].rec= attr1;
-  }
-  op->setCustomData((void *) share); // set before execute
-  share->op= op; // assign op in NDB_SHARE
-  if (op->execute())
-  {
-    share->op= NULL;
-    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG), 
-                        op->getNdbError().code, op->getNdbError().message,
-                        "NDB");
-    sql_print_error("NDB Binlog: ndbevent->execute failed for %s; %d %s",
-                    event_name,
-                    op->getNdbError().code, op->getNdbError().message);
-    injector_ndb->dropEventOperation(op);
-    pthread_mutex_unlock(&injector_mutex);
-    DBUG_RETURN(-1);
-  }
-  pthread_mutex_unlock(&injector_mutex);
-
-  get_share(share);
-
-  DBUG_PRINT("info",("%s share->op: 0x%lx, share->use_count: %u",
-                     share->key, share->op, share->use_count));
-
-  sql_print_information("NDB Binlog: logging %s", share->key);
-  DBUG_RETURN(0);
-}
-
-/*
-  when entering the calling thread should have a share lock id share != 0
-  then the injector thread will have  one as well, i.e. share->use_count == 0
-  (unless it has already dropped... then share->op == 0)
-*/
-static int
-ndbcluster_handle_drop_table(Ndb *ndb, const char *event_name,
-                             NDB_SHARE *share)
-{
-  DBUG_ENTER("ndbcluster_handle_drop_table");
-
-  NDBDICT *dict= ndb->getDictionary();
-  if (event_name && dict->dropEvent(event_name))
-  {
-    /* drop event failed for some reason, issue a warning */
-    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                        ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
-                        dict->getNdbError().code,
-                        dict->getNdbError().message, "NDB");
-    if (dict->getNdbError().code != 4710)
-    {
-      /* error is not that the event did not exist */
-      sql_print_error("NDB Binlog: Unable to drop event in database. "
-                      "Event: %s Error Code: %d Message: %s",
-                      event_name,
-                      dict->getNdbError().code,
-                      dict->getNdbError().message);
-      /* ToDo; handle error? */
-      if (share && share->op &&
-          share->op->getState() == NdbEventOperation::EO_EXECUTING &&
-          dict->getNdbError().code != 4009)
-      {  
-        DBUG_ASSERT(false);
-        DBUG_RETURN(-1);
-      }
-    }
-  }
-
-  if (share == 0 || share->op == 0)
-  {
-    DBUG_RETURN(0);
-  }
-
-/*
-  Syncronized drop between client thread and injector thread is
-  neccessary in order to maintain ordering in the binlog,
-  such that the drop occurs _after_ any inserts/updates/deletes.
-
-  The penalty for this is that the drop table becomes slow.
-
-  This wait is however not strictly neccessary to produce a binlog
-  that is usable.  However the slave does not currently handle
-  these out of order, thus we are keeping the SYNC_DROP_ defined
-  for now.
-*/
-#define SYNC_DROP_
-#ifdef SYNC_DROP_
-  (void) pthread_mutex_lock(&share->mutex);
-  int max_timeout= 10;
-  while (share->op)
-  {
-    struct timespec abstime;
-    set_timespec(abstime, 1);
-    (void) pthread_cond_timedwait(&injector_cond,
-                                  &share->mutex,
-                                  &abstime);
-    max_timeout--;
-    if (share->op == 0)
-      break;
-    if (max_timeout == 0)
-    {
-      sql_print_error("NDB delete table: timed out. Ignoring...");
-      break;
-    }
-    sql_print_information("NDB delete table: "
-                          "waiting max %u sec for drop table %s.",
-                          max_timeout, share->key);
-  }
-  (void) pthread_mutex_unlock(&share->mutex);
-#else
-  (void) pthread_mutex_lock(&share->mutex);
-  share->op_old= share->op;
-  share->op= 0;
-  (void) pthread_mutex_unlock(&share->mutex);
-#endif
-
-  DBUG_RETURN(0);
-}
-
-
-/********************************************************************
-  Internal helper functions for differentd events from the stoarage nodes
-  used by the ndb injector thread
-********************************************************************/
-
-/*
-  Handle error states on events from the storage nodes
-*/
-static int ndb_binlog_thread_handle_error(Ndb *ndb, NdbEventOperation *pOp,
-                                          Binlog_index_row &row)
-{
-  NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
-  DBUG_ENTER("ndb_binlog_thread_handle_error");
-
-  int overrun= pOp->isOverrun();
-  if (overrun)
-  {
-    /*
-      ToDo: this error should rather clear the binlog_index...
-      and continue
-    */
-    sql_print_error("NDB Binlog: Overrun in event buffer, "
-                    "this means we have dropped events. Cannot "
-                    "continue binlog for %s", share->key);
-    pOp->clearError();
-    DBUG_RETURN(-1);
-  }
-
-  if (!pOp->isConsistent())
-  {
-    /*
-      ToDo: this error should rather clear the binlog_index...
-      and continue
-    */
-    sql_print_error("NDB Binlog: Not Consistent. Cannot "
-                    "continue binlog for %s. Error code: %d"
-                    " Message: %s", share->key,
-                    pOp->getNdbError().code,
-                    pOp->getNdbError().message);
-    pOp->clearError();
-    DBUG_RETURN(-1);
-  }
-  sql_print_error("NDB Binlog: unhandled error %d for table %s",
-                  pOp->hasError(), share->key);
-  pOp->clearError();
-  DBUG_RETURN(0);
-}
-
-/*
-  Handle _non_ data events from the storage nodes
-*/
-static int
-ndb_binlog_thread_handle_non_data_event(Ndb *ndb, NdbEventOperation *pOp,
-                                        Binlog_index_row &row)
-{
-  NDB_SHARE *share= (NDB_SHARE *)pOp->getCustomData();
-  NDBEVENT::TableEvent type= pOp->getEventType();
-  int remote_drop_table= 0, do_close_cached_tables= 0;
-
-  /* make sure to flush any pending events as they can be dependent
-     on one of the tables being changed below
-  */
-  injector_thd->binlog_flush_pending_rows_event(true);
-
-  switch (type)
-  {
-  case NDBEVENT::TE_CLUSTER_FAILURE:
-    sql_print_information("NDB Binlog: cluster failure for %s.", share->key);
-
-    DBUG_PRINT("info", ("CLUSTER FAILURE EVENT: "
-                        "%s  received share: 0x%lx  op: %lx  share op: %lx  "
-                        "op_old: %lx",
-                       share->key, share, pOp, share->op, share->op_old));
-    if (apply_status_share)
-    {
-      free_share(&apply_status_share);
-      apply_status_share= 0;
-    }
-    break;
-  case NDBEVENT::TE_ALTER:
-    /* ToDo: remove printout */
-    sql_print_information("NDB Binlog: rename table %s%s/%s -> %s.",
-                          share_prefix, share->table->s->db.str,
-                          share->table->s->table_name.str,
-                          share->key);
-    /* do the rename of the table in the share */
-    share->table->s->db.str= share->db;
-    share->table->s->db.length= strlen(share->db);
-    share->table->s->table_name.str= share->table_name;
-    share->table->s->table_name.length= strlen(share->table_name);
-    goto drop_alter_common;
-  case NDBEVENT::TE_DROP:
-    /* ToDo: remove printout */
-    sql_print_information("NDB Binlog: drop table %s.",
-                          share->key);
-drop_alter_common:
-    row.n_schemaops++;
-    DBUG_PRINT("info", ("TABLE %s EVENT: %s  received share: 0x%lx  op: %lx  "
-                        "share op: %lx  op_old: %lx",
-                       type == NDBEVENT::TE_DROP ? "DROP" : "ALTER",
-                       share->key, share, pOp, share->op, share->op_old));
-    if (pOp->getReqNodeId() != ndb_cluster_node_id)
-    {
-      ndb->setDatabaseName(share->table->s->db.str);
-      ha_ndbcluster::invalidate_dictionary_cache(share->table,
-                                                 ndb,
-                                                 share->table->s->table_name.str,
-                                                 TRUE);
-      remote_drop_table= 1;
-    }
-    break;
-  default:
-    sql_print_error("NDB Binlog: unknown non data event %d for %s. "
-                    "Ignoring...", (unsigned) type, share->key);
-    return 0;
-  }
-
-  (void) pthread_mutex_lock(&share->mutex);
-  DBUG_ASSERT(share->op == pOp || share->op_old == pOp);
-  if (share->op_old == pOp)
-    share->op_old= 0;
-  else
-    share->op= 0;
-  // either just us or drop table handling as well
-      
-  /* Signal ha_ndbcluster::delete/rename_table that drop is done */
-  (void) pthread_mutex_unlock(&share->mutex);
-  (void) pthread_cond_signal(&injector_cond);
-
-  pthread_mutex_lock(&ndbcluster_mutex);
-  free_share(&share, TRUE);
-  if (remote_drop_table && share && share->state != NSS_DROPPED)
-  {
-    DBUG_PRINT("info", ("remote drop table"));
-    if (share->use_count != 1)
-      do_close_cached_tables= 1;
-    share->state= NSS_DROPPED;
-    free_share(&share, TRUE);
-  }
-  pthread_mutex_unlock(&ndbcluster_mutex);
-
-  share= 0;
-  pOp->setCustomData(0);
-          
-  pthread_mutex_lock(&injector_mutex);
-  injector_ndb->dropEventOperation(pOp);
-  pOp= 0;
-  pthread_mutex_unlock(&injector_mutex);
-
-  if (do_close_cached_tables)
-    close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0);
-
-  return 0;
-}
-
-/*
-  Handle data events from the storage nodes
-*/
-static int
-ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
-                                    Binlog_index_row &row,
-                                    injector::transaction &trans)
-{
-  NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
-  TABLE *table= share->table;
-  
-  assert(table != 0);
-#ifndef DBUG_OFF
-  dbug_print_table("table", table);
-#endif
-  TABLE_SHARE *table_s= table->s;
-  uint n_fields= table_s->fields;
-  MY_BITMAP b;
-  /* Potential buffer for the bitmap */
-  uint32 bitbuf[128 / (sizeof(uint32) * 8)];
-  bitmap_init(&b, n_fields <= sizeof(bitbuf) * 8 ? bitbuf : NULL, 
-              n_fields, false);
-  bitmap_set_all(&b);
-
-  /*
-   row data is already in table->record[0]
-   As we told the NdbEventOperation to do this
-   (saves moving data about many times)
-  */
-
-  switch(pOp->getEventType())
-  {
-  case NDBEVENT::TE_INSERT:
-    row.n_inserts++;
-    DBUG_PRINT("info", ("INSERT INTO %s", share->key));
-    {
-      ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
-      trans.write_row(::server_id, injector::transaction::table(table, true),
-                      &b, n_fields, table->record[0]);
-    }
-    break;
-  case NDBEVENT::TE_DELETE:
-    row.n_deletes++;
-    DBUG_PRINT("info",("DELETE FROM %s", share->key));
-    {
-      /*
-        table->record[0] contains only the primary key in this case
-        since we do not have an after image
-      */
-      int n;
-      if (table->s->primary_key != MAX_KEY)
-        n= 0; /*
-                use the primary key only as it save time and space and
-                it is the only thing needed to log the delete
-	      */
-      else
-        n= 1; /*
-                we use the before values since we don't have a primary key
-                since the mysql server does not handle the hidden primary
-                key
-	      */
-
-      ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
-      print_records(table, table->record[n]);
-      trans.delete_row(::server_id, injector::transaction::table(table, true),
-                       &b, n_fields, table->record[n]);
-    }
-    break;
-  case NDBEVENT::TE_UPDATE:
-    row.n_updates++;
-    DBUG_PRINT("info", ("UPDATE %s", share->key));
-    {
-      ndb_unpack_record(table, share->ndb_value[0],
-                        &b, table->record[0]);
-      print_records(table, table->record[0]);
-      if (table->s->primary_key != MAX_KEY) 
-      {
-        /*
-          since table has a primary key, we can to a write
-          using only after values
-	*/
-        trans.write_row(::server_id, injector::transaction::table(table, true),
-                        &b, n_fields, table->record[0]);// after values
-      }
-      else
-      {
-        /*
-          mysql server cannot handle the ndb hidden key and
-          therefore needs the before image as well
-	*/
-        ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
-        print_records(table, table->record[1]);
-        trans.update_row(::server_id,
-                         injector::transaction::table(table, true),
-                         &b, n_fields,
-                         table->record[1], // before values
-                         table->record[0]);// after values
-      }
-    }
-    break;
-  default:
-    /* We should REALLY never get here. */
-    DBUG_PRINT("info", ("default - uh oh, a brain exploded."));
-    break;
-  }
-
-  return 0;
-}
-
-/*
-  Timer class for doing performance measurements
-*/
-//#define RUN_NDB_BINLOG_TIMER
-#ifdef RUN_NDB_BINLOG_TIMER
-class Timer
-{
-public:
-  Timer() { start(); }
-  void start() { gettimeofday(&m_start, 0); }
-  void stop() { gettimeofday(&m_stop, 0); }
-  ulong elapsed_ms()
-  {
-    return (ulong)
-      (((longlong) m_stop.tv_sec - (longlong) m_start.tv_sec) * 1000 +
-       ((longlong) m_stop.tv_usec -
-        (longlong) m_start.tv_usec + 999) / 1000);
-  }
-private:
-  struct timeval m_start,m_stop;
-};
-#endif
-
-
-/****************************************************************
-  Injector thread main loop
-****************************************************************/
-
-pthread_handler_t ndb_binlog_thread_func(void *arg)
-{
-  THD *thd; /* needs to be first for thread_stack */
-  Ndb *ndb= 0;
-  int ndb_update_binlog_index= 1;
-  injector *inj= injector::instance();
-
-  pthread_mutex_lock(&injector_mutex);
-  /*
-    Set up the Thread
-  */
-  my_thread_init();
-  DBUG_ENTER("ndb_binlog_thread");
-
-  thd= new THD; /* note that contructor of THD uses DBUG_ */
-  THD_CHECK_SENTRY(thd);
-
-  thd->thread_stack= (char*) &thd; /* remember where our stack is */
-  if (thd->store_globals())
-  {
-    thd->cleanup();
-    delete thd;
-    ndb_binlog_thread_running= -1;
-    pthread_mutex_unlock(&injector_mutex);
-    pthread_cond_signal(&injector_cond);
-    my_thread_end();
-    pthread_exit(0);
-    DBUG_RETURN(NULL);
-  }
-
-  thd->init_for_queries();
-  thd->command= COM_DAEMON;
-  injector_thd= thd;
-
-  /*
-    Set up ndb binlog
-  */
-  sql_print_information("Starting Cluster Binlog");
-
-  pthread_detach_this_thread();
-  thd->real_id= pthread_self();
-  pthread_mutex_lock(&LOCK_thread_count);
-  thd->thread_id= thread_id++;
-  threads.append(thd);
-  pthread_mutex_unlock(&LOCK_thread_count);
-  thd->lex->start_transaction_opt= 0;
-
-  if (!(ndb= new Ndb(g_ndb_cluster_connection, "")) ||
-      ndb->init())
-  {
-    sql_print_error("NDB Binlog: Getting Ndb object failed");
-    ndb_binlog_thread_running= -1;
-    pthread_mutex_unlock(&injector_mutex);
-    pthread_cond_signal(&injector_cond);
-    goto err;
-  }
-
-  /*
-    Expose global reference to our ndb object.
-
-    Used by both sql client thread and binlog thread to interact
-    with the storage
-    pthread_mutex_lock(&injector_mutex);
-  */
-  injector_ndb= ndb;
-  ndb_binlog_thread_running= 1;
-
-  /*
-    We signal the thread that started us that we've finished
-    starting up.
-  */
-  pthread_mutex_unlock(&injector_mutex);
-  pthread_cond_signal(&injector_cond);
-
-  thd->system_thread= SYSTEM_THREAD_NDBCLUSTER_BINLOG;
-  thd->version= refresh_version;
-  thd->set_time();
-  thd->main_security_ctx.host_or_ip= "";
-  thd->client_capabilities= 0;
-  my_net_init(&thd->net, 0);
-  thd->main_security_ctx.master_access= ~0;
-  thd->main_security_ctx.priv_user= 0;
-
-  thd->proc_info= "Waiting for ndbcluster to start";
-
-  pthread_mutex_lock(&injector_mutex);
-  while (!ndbcluster_util_inited)
-  {
-    /* ndb not connected yet */
-    struct timespec abstime;
-    set_timespec(abstime, 1);
-    pthread_cond_timedwait(&injector_cond, &injector_mutex, &abstime);
-    if (abort_loop)
-    {
-      pthread_mutex_unlock(&injector_mutex);
-      goto err;
-    }
-  }
-  pthread_mutex_unlock(&injector_mutex);
-
-  /*
-    Main NDB Injector loop
-  */
-
-  thd->query_id= 0; // to keep valgrind quiet
-  {
-    static char db[]= "";
-    thd->db= db;
-    open_binlog_index(thd, &binlog_tables, &binlog_index);
-    if (!(apply_status_share= ndbcluster_get_apply_status_share()))
-    {
-      sql_print_error("NDB: Could not get apply status share");
-    }
-    thd->db= db;
-  }
-
-#ifdef RUN_NDB_BINLOG_TIMER
-  Timer main_timer;
-#endif
-  for ( ; !((abort_loop || do_ndbcluster_binlog_close_connection) &&
-            ndb_latest_handled_binlog_epoch >= g_latest_trans_gci); )
-  {
-
-#ifdef RUN_NDB_BINLOG_TIMER
-    main_timer.stop();
-    sql_print_information("main_timer %ld ms",  main_timer.elapsed_ms());
-    main_timer.start();
-#endif
-
-    /*
-      now we don't want any events before next gci is complete
-    */
-    thd->proc_info= "Waiting for event from ndbcluster";
-    thd->set_time();
-    
-    /* wait for event or 1000 ms */
-    Uint64 gci;
-    int res= ndb->pollEvents(1000, &gci);
-    ndb_latest_received_binlog_epoch= gci;
-
-    if ((abort_loop || do_ndbcluster_binlog_close_connection) &&
-        ndb_latest_handled_binlog_epoch >= g_latest_trans_gci)
-      break; /* Shutting down server */
-
-    if (binlog_index && binlog_index->s->version < refresh_version)
-    {
-      if (binlog_index->s->version < refresh_version)
-      {
-        close_thread_tables(thd);
-        binlog_index= 0;
-      }
-    }
-
-    if (res > 0)
-    {
-      DBUG_PRINT("info", ("pollEvents res: %d", res));
-#ifdef RUN_NDB_BINLOG_TIMER
-      Timer gci_timer, write_timer;
-      int event_count= 0;
-#endif
-      thd->proc_info= "Processing events";
-      NdbEventOperation *pOp= ndb->nextEvent();
-      Binlog_index_row row;
-      while (pOp != NULL)
-      {
-        ndb->
-          setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
-        ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
-
-        assert(pOp->getGCI() <= ndb_latest_received_binlog_epoch);
-        if (!apply_status_share)
-        {
-          if (!(apply_status_share= ndbcluster_get_apply_status_share()))
-            sql_print_error("NDB: Could not get apply status share");
-        }
-        bzero((char*) &row, sizeof(row));
-        injector::transaction trans= inj->new_trans(thd);
-        gci= pOp->getGCI();
-        if (apply_status_share)
-        {
-          TABLE *table= apply_status_share->table;
-          MY_BITMAP b;
-          uint32 bitbuf;
-          DBUG_ASSERT(table->s->fields <= sizeof(bitbuf) * 8);
-          bitmap_init(&b, &bitbuf, table->s->fields, false);
-          bitmap_set_all(&b);
-          table->field[0]->store((longlong)::server_id);
-          table->field[1]->store((longlong)gci);
-          trans.write_row(::server_id,
-                          injector::transaction::table(table, true),
-                          &b, table->s->fields,
-                          table->record[0]);
-        }
-#ifdef RUN_NDB_BINLOG_TIMER
-        write_timer.start();
-#endif
-        do
-        {
-#ifdef RUN_NDB_BINLOG_TIMER
-          event_count++;
-#endif
-          if (pOp->hasError() &&
-              ndb_binlog_thread_handle_error(ndb, pOp, row) < 0)
-            goto err;
-
-#ifndef DBUG_OFF
-          {
-            NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
-            DBUG_PRINT("info",
-                       ("EVENT TYPE:%d  GCI:%lld  last applied: %lld  "
-                        "share: 0x%lx", pOp->getEventType(), gci,
-                        ndb_latest_applied_binlog_epoch, share));
-            DBUG_ASSERT(share != 0);
-          }
-#endif
-          if ((unsigned) pOp->getEventType() <
-              (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
-            ndb_binlog_thread_handle_data_event(ndb, pOp, row, trans);
-          else
-            ndb_binlog_thread_handle_non_data_event(ndb, pOp, row);
-
-          pOp= ndb->nextEvent();
-        } while (pOp && pOp->getGCI() == gci);
-
-        /*
-          note! pOp is not referring to an event in the next epoch
-          or is == 0
-	*/
-#ifdef RUN_NDB_BINLOG_TIMER
-        write_timer.stop();
-#endif
-
-        if (row.n_inserts || row.n_updates
-            || row.n_deletes || row.n_schemaops)
-        {
-          injector::transaction::binlog_pos start= trans.start_pos();
-          if (int r= trans.commit())
-          {
-            sql_print_error("NDB binlog:"
-                            "Error during COMMIT of GCI. Error: %d",
-                            r);
-            /* TODO: Further handling? */
-          }
-          row.gci= gci;
-          row.master_log_file= start.file_name();
-          row.master_log_pos= start.file_pos();
-
-          DBUG_PRINT("info",("COMMIT gci %lld",gci));
-          if (ndb_update_binlog_index)
-            ndb_add_binlog_index(thd, &row);
-          ndb_latest_applied_binlog_epoch= gci;
-        }
-        else
-          trans.commit();
-        ndb_latest_handled_binlog_epoch= gci;
-#ifdef RUN_NDB_BINLOG_TIMER
-        gci_timer.stop();
-        sql_print_information("gci %ld event_count %d write time "
-                              "%ld(%d e/s), total time %ld(%d e/s)",
-                              (ulong)gci, event_count,
-                              write_timer.elapsed_ms(),
-                              event_count / write_timer.elapsed_ms(),
-                              gci_timer.elapsed_ms(),
-                              event_count / gci_timer.elapsed_ms());
-#endif
-      }
-    }
-    ndb_latest_handled_binlog_epoch= ndb_latest_received_binlog_epoch;
-  }
-err:
-  DBUG_PRINT("info",("Shutting down cluster binlog thread"));
-  close_thread_tables(thd);
-  pthread_mutex_lock(&injector_mutex);
-  /* don't mess with the injector_ndb anymore from other threads */
-  injector_ndb= 0;
-  pthread_mutex_unlock(&injector_mutex);
-  thd->db= 0; // as not to try to free memory
-  sql_print_information("Stopping Cluster Binlog");
-
-  if (apply_status_share)
-    free_share(&apply_status_share);
-
-  /* remove all event operations */
-  if (ndb)
-  {
-    NdbEventOperation *op;
-    DBUG_PRINT("info",("removing all event operations"));
-    while ((op= ndb->getEventOperation()))
-    {
-      DBUG_PRINT("info",("removing event operation on %s",
-                         op->getEvent()->getName()));
-      NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
-      free_share(&share);
-      ndb->dropEventOperation(op);
-    }
-    delete ndb;
-    ndb= 0;
-  }
-
-  net_end(&thd->net);
-  thd->cleanup();
-  delete thd;
-
-  ndb_binlog_thread_running= -1;
-  (void) pthread_cond_signal(&injector_cond);
-
-  DBUG_PRINT("exit", ("ndb_binlog_thread"));
-  my_thread_end();
-
-  pthread_exit(0);
-  DBUG_RETURN(NULL);
-}
-#endif /* HAVE_NDB_BINLOG */
 
 
 /*

--- 1.102/sql/ha_ndbcluster.h	2006-01-02 21:46:38 +01:00
+++ 1.103/sql/ha_ndbcluster.h	2006-01-04 16:08:47 +01:00
@@ -98,6 +98,9 @@
   TABLE_SHARE *table_share;
   TABLE *table;
   NdbValue *ndb_value[2];
+  MY_BITMAP *subscriber_bitmap;
+  MY_BITMAP slock_bitmap;
+  uint32 slock[256/32]; // 256 bits for lock status of table
 #endif
 } NDB_SHARE;
 
@@ -474,6 +477,11 @@
   Place holder for ha_ndbcluster thread specific data
 */
 
+enum THD_NDB_OPTIONS
+{
+  TNO_NO_LOG_SCHEMA_OP= 1 << 0
+};
+
 class Thd_ndb 
 {
  public:
@@ -485,6 +493,7 @@
   NdbTransaction *all;
   NdbTransaction *stmt;
   int error;
+  uint32 options;
   List<NDB_SHARE> changed_tables;
 };
 
@@ -787,25 +796,10 @@
 
 extern struct show_var_st ndb_status_variables[];
 
-bool ndbcluster_init(void);
-int ndbcluster_end(ha_panic_function flag);
-
 int ndbcluster_discover(THD* thd, const char* dbname, const char* name,
                         const void** frmblob, uint* frmlen);
 int ndbcluster_find_files(THD *thd,const char *db,const char *path,
                           const char *wild, bool dir, List<char> *files);
 int ndbcluster_table_exists_in_engine(THD* thd,
                                       const char *db, const char *name);
-void ndbcluster_drop_database(char* path);
-
 void ndbcluster_print_error(int error, const NdbOperation *error_op);
-
-bool ndbcluster_show_status(THD*,stat_print_fn *,enum ha_stat_type);
-
-#ifdef HAVE_NDB_BINLOG
-int ndbcluster_reset_logs(THD *thd);
-int ndbcluster_binlog_index_purge_file(THD *thd, const char *file);
-void ndbcluster_reset_slave(THD *thd);
-void ndbcluster_binlog_wait(THD *thd);
-int ndbcluster_binlog_end();
-#endif

--- 1.73/libmysqld/Makefile.am	2005-12-15 09:21:04 +01:00
+++ 1.74/libmysqld/Makefile.am	2006-01-04 16:08:46 +01:00
@@ -45,10 +45,10 @@
 sqlsources = derror.cc field.cc field_conv.cc strfunc.cc filesort.cc \
 	ha_heap.cc ha_myisam.cc ha_myisammrg.cc handler.cc sql_handler.cc \
 	hostname.cc init.cc password.c \
-	rpl_filter.cc \
 	item.cc item_buff.cc item_cmpfunc.cc item_create.cc \
 	item_func.cc item_strfunc.cc item_sum.cc item_timefunc.cc \
 	item_geofunc.cc item_uniq.cc item_subselect.cc item_row.cc\
+	item_xmlfunc.cc \
 	key.cc lock.cc log.cc log_event.cc sql_state.c \
 	protocol.cc net_serv.cc opt_range.cc \
 	opt_sum.cc procedure.cc records.cc sql_acl.cc \
@@ -149,19 +149,19 @@
 link_sources:
 	  set -x; \
 	  for f in $(sqlsources); do \
-	    rm -f $(srcdir)/$$f; \
-	    @LN_CP_F@ $(srcdir)/../sql/$$f $(srcdir)/$$f; \
+	    rm -f $$f; \
+	    @LN_CP_F@ $(top_srcdir)/sql/$$f $$f; \
 	  done; \
 	  for f in $(libmysqlsources); do \
-	    rm -f $(srcdir)/$$f; \
-	    @LN_CP_F@ $(srcdir)/../libmysql/$$f $(srcdir)/$$f; \
+	    rm -f $$f; \
+	    @LN_CP_F@ $(top_srcdir)/libmysql/$$f $$f; \
 	  done; \
 	  for f in $(sqlstoragesources); do \
-	    rm -f $(srcdir)/$$f; \
-	    @LN_CP_F@ `find $(srcdir)/../sql -name $$f` $(srcdir)/$$f; \
+	    rm -f $$f; \
+	    @LN_CP_F@ `find $(srcdir)/../sql -name $$f` $$f; \
 	  done; \
-	  rm -f $(srcdir)/client_settings.h; \
-	  @LN_CP_F@ $(srcdir)/../libmysql/client_settings.h $(srcdir)/client_settings.h;
+	  rm -f client_settings.h; \
+	  @LN_CP_F@ $(top_srcdir)/libmysql/client_settings.h client_settings.h
 
 
 clean-local:

--- 1.7/mysql-test/r/ndb_multi.result	2005-12-29 20:48:07 +01:00
+++ 1.8/mysql-test/r/ndb_multi.result	2006-01-04 16:30:20 +01:00
@@ -30,14 +30,6 @@
 create table t1 (a int) engine=ndbcluster;
 insert into t1 value (2);
 select * from t1;
-ERROR HY000: Table definition has changed, please retry transaction
-show warnings;
-Level	Code	Message
-Error	1296	Got error 241 'Invalid schema object version' from NDB
-Error	1412	Table definition has changed, please retry transaction
-Error	1105	Unknown error
-flush table t1;
-select * from t1;
 a
 2
 flush status;
@@ -60,13 +52,13 @@
 1	Hi!	89	Longtext column
 show status like 'handler_discover%';
 Variable_name	Value
-Handler_discover	1
+Handler_discover	0
 show tables like 't4';
 Tables_in_test (t4)
 t4
 show status like 'handler_discover%';
 Variable_name	Value
-Handler_discover	2
+Handler_discover	0
 show tables;
 Tables_in_test
 t1

--- 1.8/mysql-test/t/ndb_multi.test	2005-12-29 20:48:08 +01:00
+++ 1.9/mysql-test/t/ndb_multi.test	2006-01-04 16:30:20 +01:00
@@ -40,11 +40,12 @@
 create table t1 (a int) engine=ndbcluster;
 insert into t1 value (2);
 connection server1;
-# Currently a retry is required remotely
---error 1412
-select * from t1;
-show warnings;
-flush table t1;
+## Currently a retry is required remotely
+#--error 1412
+#select * from t1;
+#show warnings;
+#flush table t1;
+# Table definition change should be propagated automatically
 select * from t1;
 
 # Connect to server2 and use the tables from there

--- 1.45/storage/ndb/src/ndbapi/ndberror.c	2005-12-14 11:06:38 +01:00
+++ 1.46/storage/ndb/src/ndbapi/ndberror.c	2006-01-04 16:08:48 +01:00
@@ -59,6 +59,8 @@
 
 #define OE ndberror_cl_schema_object_already_exists
 
+#define IT ndberror_cl_internal_temporary
+
 /* default mysql error code for unmapped codes */
 #define DMEC -1
 
@@ -207,6 +209,7 @@
   /**
    * OverloadError
    */
+  { 701,  DMEC, OL, "System busy with other schema operation" },
   { 410,  DMEC, OL, "REDO log files overloaded, consult online manual (decrease TimeBetweenLocalCheckpoints, and|or increase NoOfFragmentLogFiles)" },
   { 677,  DMEC, OL, "Index UNDO buffers overloaded (increase UndoIndexBuffer)" },
   { 891,  DMEC, OL, "Data UNDO buffers overloaded (increase UndoDataBuffer)" },
@@ -214,6 +217,10 @@
   { 4006, DMEC, OL, "Connect failure - out of connection objects (increase MaxNoOfConcurrentTransactions)" }, 
 
 
+  /*
+   * Internal Temporary
+   */
+  { 702,  DMEC, IT, "Request to non-master" },
   
   /**
    * Internal errors
@@ -239,7 +246,6 @@
   { 290,  DMEC, IE, "Corrupt key in TC, unable to xfrm" },
   { 631,  DMEC, IE, "631" },
   { 632,  DMEC, IE, "632" },
-  { 702,  DMEC, IE, "Request to non-master" },
   { 706,  DMEC, IE, "Inconsistency during table creation" },
   { 809,  DMEC, IE, "809" },
   { 812,  DMEC, IE, "812" },
@@ -334,7 +340,6 @@
   /**
    * SchemaError
    */
-  { 701,  DMEC, SE, "System busy with other schema operation" },
   { 703,  DMEC, SE, "Invalid table format" },
   { 704,  DMEC, SE, "Attribute name too long" },
   { 705,  DMEC, SE, "Table name too long" },
@@ -614,6 +619,7 @@
   { ST_T, OL, "Overload error"},
   { ST_T, TO, "Timeout expired"},
   { ST_T, NS, "Node shutdown"},
+  { ST_T, IT, "Internal temporary"},
   
   { ST_U , UR, "Unknown result error"},
   { ST_U , UE, "Unknown error code"},
Thread
bk commit into 5.1 tree (tomas:1.1987)tomas4 Jan