List:Commits« Previous MessageNext Message »
From:tomas Date:January 5 2006 10:51am
Subject:bk commit into 5.1 tree (tomas:1.1992)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push, these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository,
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1992 06/01/05 11:51:00 tomas@stripped +12 -0
  Merge tulin@stripped:/home/bk/mysql-5.1-wl1012-v3.old
  into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-wl2325-repl

  sql/ha_ndbcluster.cc
    1.227 06/01/05 11:50:53 tomas@stripped +0 -0
    manual merge

  sql/Makefile.am
    1.126 06/01/05 11:50:53 tomas@stripped +1 -0
    manual merge

  mysql-test/r/information_schema.result
    1.93 06/01/05 11:50:53 tomas@stripped +0 -1
    manual merge

  sql/sql_parse.cc
    1.488 06/01/05 11:46:13 tomas@stripped +0 -0
    Auto merged

  sql/sql_class.h
    1.272 06/01/05 11:46:13 tomas@stripped +0 -0
    Auto merged

  sql/sql_base.cc
    1.287 06/01/05 11:46:13 tomas@stripped +0 -0
    Auto merged

  sql/set_var.cc
    1.151 06/01/05 11:46:13 tomas@stripped +0 -0
    Auto merged

  sql/mysqld.cc
    1.508 06/01/05 11:46:12 tomas@stripped +0 -0
    Auto merged

  sql/mysql_priv.h
    1.350 06/01/05 11:46:12 tomas@stripped +0 -0
    Auto merged

  sql/log.cc
    1.179 06/01/05 11:46:12 tomas@stripped +0 -0
    Auto merged

  sql/handler.h
    1.172 06/01/05 11:46:12 tomas@stripped +0 -0
    Auto merged

  sql/handler.cc
    1.197 06/01/05 11:46:11 tomas@stripped +0 -0
    Auto merged

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	tomas
# Host:	poseidon.ndb.mysql.com
# Root:	/home/tomas/mysql-5.1-wl2325-repl/RESYNC

--- 1.125/sql/Makefile.am	2006-01-04 16:08:46 +01:00
+++ 1.126/sql/Makefile.am	2006-01-05 11:50:53 +01:00
@@ -104,10 +104,8 @@
 			ha_innodb.h  ha_berkeley.h  ha_archive.h \
 			ha_blackhole.cc ha_federated.cc ha_ndbcluster.cc \
 			ha_blackhole.h  ha_federated.h  ha_ndbcluster.h \
-			ha_partition.cc ha_partition.h \
 			ha_ndbcluster_binlog.cc ha_ndbcluster_binlog.h \
-			examples/ha_tina.cc examples/ha_example.cc \
-			examples/ha_tina.h  examples/ha_example.h
+			ha_partition.cc ha_partition.h
 mysqld_DEPENDENCIES =	@mysql_se_objs@
 gen_lex_hash_SOURCES =	gen_lex_hash.cc
 gen_lex_hash_LDADD =	$(LDADD) $(CXXLDFLAGS)

--- 1.196/sql/handler.cc	2006-01-04 16:08:47 +01:00
+++ 1.197/sql/handler.cc	2006-01-05 11:46:11 +01:00
@@ -30,7 +30,7 @@
 
 #include <myisampack.h>
 #include <errno.h>
-  
+
 #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
 #define NDB_MAX_ATTRIBUTES_IN_TABLE 128
 #include "ha_ndbcluster.h"
@@ -38,10 +38,34 @@
 #ifdef WITH_PARTITION_STORAGE_ENGINE
 #include "ha_partition.h"
 #endif
+
+#ifdef WITH_INNOBASE_STORAGE_ENGINE
+#include "ha_innodb.h"
+#endif
+
 extern handlerton *sys_table_types[];
-  
+
 #define BITMAP_STACKBUF_SIZE (128/8)
 
+/* static functions defined in this file */
+
+static handler *create_default(TABLE_SHARE *table);
+
+const handlerton default_hton =
+{
+  MYSQL_HANDLERTON_INTERFACE_VERSION,
+  "DEFAULT",
+  SHOW_OPTION_YES,
+  NULL,
+  DB_TYPE_DEFAULT,
+  NULL,
+  0, 0,
+  NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL,
+  NULL, NULL, NULL,
+  create_default,
+  NULL, NULL, NULL, NULL, NULL,
+  HTON_NO_FLAGS
+};
 
 static SHOW_COMP_OPTION have_yes= SHOW_OPTION_YES;
 
@@ -54,12 +78,12 @@
 
 struct show_table_alias_st sys_table_aliases[]=
 {
-  {"INNOBASE",	"InnoDB"},
-  {"NDB", "NDBCLUSTER"},
-  {"BDB", "BERKELEYDB"},
-  {"HEAP", "MEMORY"},
-  {"MERGE", "MRG_MYISAM"},
-  {NullS, NullS}
+  {"INNOBASE",  DB_TYPE_INNODB},
+  {"NDB",       DB_TYPE_NDBCLUSTER},
+  {"BDB",       DB_TYPE_BERKELEY_DB},
+  {"HEAP",      DB_TYPE_HEAP},
+  {"MERGE",     DB_TYPE_MRG_MYISAM},
+  {NullS,       DB_TYPE_UNKNOWN}
 };
 
 const char *ha_row_type[] = {
@@ -75,26 +99,22 @@
 static TYPELIB known_extensions= {0,"known_exts", NULL, NULL};
 uint known_extensions_id= 0;
 
-enum db_type ha_resolve_by_name(const char *name, uint namelen)
+handlerton *ha_resolve_by_name(THD *thd, LEX_STRING *name)
 {
-  THD *thd= current_thd;
   show_table_alias_st *table_alias;
-  handlerton **types;
+  st_plugin_int *plugin;
 
   if (thd && !my_strnncoll(&my_charset_latin1,
-                           (const uchar *)name, namelen,
+                           (const uchar *)name->str, name->length,
                            (const uchar *)"DEFAULT", 7))
-    return (enum db_type) thd->variables.table_type;
+    return ha_resolve_by_legacy_type(thd, DB_TYPE_DEFAULT);
 
-retest:
-  for (types= sys_table_types; *types; types++)
+  if ((plugin= plugin_lock(name, MYSQL_STORAGE_ENGINE_PLUGIN)))
   {
-    if ((!my_strnncoll(&my_charset_latin1,
-                       (const uchar *)name, namelen,
-                       (const uchar *)(*types)->name,
-                        strlen((*types)->name))) &&
-        !((*types)->flags & HTON_NOT_USER_SELECTABLE))
-      return (enum db_type) (*types)->db_type;
+    handlerton *hton= (handlerton *) plugin->plugin->info;
+    if (!(hton->flags & HTON_NOT_USER_SELECTABLE))
+      return hton;
+    plugin_unlock(plugin);
   }
 
   /*
@@ -103,63 +123,99 @@
   for (table_alias= sys_table_aliases; table_alias->type; table_alias++)
   {
     if (!my_strnncoll(&my_charset_latin1,
-                      (const uchar *)name, namelen,
+                      (const uchar *)name->str, name->length,
                       (const uchar *)table_alias->alias,
                       strlen(table_alias->alias)))
-    {
-      name= table_alias->type;
-      namelen= strlen(name);
-      goto retest;
-    }
+      return ha_resolve_by_legacy_type(thd, table_alias->type);
   }
 
-  return DB_TYPE_UNKNOWN;
+  return NULL;
 }
 
 
-const char *ha_get_storage_engine(enum db_type db_type)
+struct plugin_find_dbtype_st
 {
-  handlerton **types;
-  for (types= sys_table_types; *types; types++)
+  enum legacy_db_type db_type;
+  handlerton *hton;
+};
+
+
+static my_bool plugin_find_dbtype(THD *unused, st_plugin_int *plugin,
+                                  void *arg)
+{
+  handlerton *types= (handlerton *) plugin->plugin->info;
+  if (types->db_type == ((struct plugin_find_dbtype_st *)arg)->db_type)
   {
-    if (db_type == (*types)->db_type)
-      return (*types)->name;
+    ((struct plugin_find_dbtype_st *)arg)->hton= types;
+    return TRUE;
   }
-  return "*NONE*";
+  return FALSE;
 }
 
 
-bool ha_check_storage_engine_flag(enum db_type db_type, uint32 flag)
+const char *ha_get_storage_engine(enum legacy_db_type db_type)
 {
-  handlerton **types;
-  for (types= sys_table_types; *types; types++)
+  struct plugin_find_dbtype_st info;
+  
+  switch (db_type)
   {
-    if (db_type == (*types)->db_type)
-      return test((*types)->flags & flag);
+  case DB_TYPE_DEFAULT:
+    return "DEFAULT";
+  case DB_TYPE_UNKNOWN:
+    return "UNKNOWN";
+  default:
+    info.db_type= db_type;
+
+    if (!plugin_foreach(NULL, plugin_find_dbtype, 
+                        MYSQL_STORAGE_ENGINE_PLUGIN, &info))
+      return "*NONE*";
+
+    return info.hton->name;
   }
-  return FALSE;                                 // No matching engine
 }
 
 
-my_bool ha_storage_engine_is_enabled(enum db_type database_type)
+static handler *create_default(TABLE_SHARE *table)
 {
-  handlerton **types;
-  for (types= sys_table_types; *types; types++)
+  handlerton *hton=ha_resolve_by_legacy_type(current_thd, DB_TYPE_DEFAULT);
+  return (hton && hton != &default_hton && hton->create) ? 
+        hton->create(table) : NULL;
+}
+
+
+handlerton *ha_resolve_by_legacy_type(THD *thd, enum legacy_db_type db_type)
+{
+  struct plugin_find_dbtype_st info;
+
+  switch (db_type)
   {
-    if (database_type == (*types)->db_type)
-      return ((*types)->state == SHOW_OPTION_YES) ? TRUE : FALSE;
+  case DB_TYPE_DEFAULT:
+    return (thd->variables.table_type != NULL) ?
+            thd->variables.table_type :
+            (global_system_variables.table_type != NULL ?
+             global_system_variables.table_type : &myisam_hton);
+  case DB_TYPE_UNKNOWN:
+    return NULL;
+  default:
+    info.db_type= db_type;
+    if (!plugin_foreach(NULL, plugin_find_dbtype, 
+                        MYSQL_STORAGE_ENGINE_PLUGIN, &info))
+      return NULL;
+
+    return info.hton;
   }
-  return FALSE;
 }
 
 
 /* Use other database handler if databasehandler is not compiled in */
 
-enum db_type ha_checktype(THD *thd, enum db_type database_type,
+handlerton *ha_checktype(THD *thd, enum legacy_db_type database_type,
                           bool no_substitute, bool report_error)
 {
-  if (ha_storage_engine_is_enabled(database_type))
-    return database_type;
+  handlerton *hton= ha_resolve_by_legacy_type(thd, database_type);
+  if (ha_storage_engine_is_enabled(hton))
+    return hton;
+
   if (no_substitute)
   {
     if (report_error)
@@ -167,34 +223,28 @@
       const char *engine_name= ha_get_storage_engine(database_type);
       my_error(ER_FEATURE_DISABLED,MYF(0),engine_name,engine_name);
     }
-    return DB_TYPE_UNKNOWN;
+    return NULL;
   }
 
   switch (database_type) {
 #ifndef NO_HASH
   case DB_TYPE_HASH:
-    return (database_type);
+    return ha_resolve_by_legacy_type(thd, DB_TYPE_HASH);
 #endif
   case DB_TYPE_MRG_ISAM:
-    return (DB_TYPE_MRG_MYISAM);
+    return ha_resolve_by_legacy_type(thd, DB_TYPE_MRG_MYISAM);
   default:
     break;
   }
-  
-  return ((enum db_type) thd->variables.table_type != DB_TYPE_UNKNOWN ?
-          (enum db_type) thd->variables.table_type :
-          ((enum db_type) global_system_variables.table_type !=
-           DB_TYPE_UNKNOWN ?
-           (enum db_type) global_system_variables.table_type : DB_TYPE_MYISAM)
-          );
+
+  return ha_resolve_by_legacy_type(thd, DB_TYPE_DEFAULT);  
 } /* ha_checktype */
 
 
 handler *get_new_handler(TABLE_SHARE *share, MEM_ROOT *alloc,
-                         enum db_type db_type)
+                         handlerton *db_type)
 {
   handler *file= NULL;
-  handlerton **types;
   /*
     handlers are allocated with new in the handlerton create() function
     we need to set the thd mem_root for these to be allocated correctly
@@ -202,20 +252,15 @@
   THD *thd= current_thd;
   MEM_ROOT *thd_save_mem_root= thd->mem_root;
   thd->mem_root= alloc;
-  for (types= sys_table_types; *types; types++)
-  {
-    if (db_type == (*types)->db_type && (*types)->create)
-    {
-      file= ((*types)->state == SHOW_OPTION_YES) ?
-		(*types)->create(share) : NULL;
-      break;
-    }
-  }
+
+  if (db_type != NULL && db_type->state == SHOW_OPTION_YES && db_type->create)
+    file= db_type->create(share);
+
   thd->mem_root= thd_save_mem_root;
 
   if (!file)
   {
-    enum db_type def=(enum db_type) current_thd->variables.table_type;
+    handlerton *def= current_thd->variables.table_type;
     /* Try first with 'default table type' */
     if (db_type != def)
       return get_new_handler(share, alloc, def);
@@ -342,16 +387,55 @@
 }
 
 
-static inline void ha_was_inited_ok(handlerton **ht)
+static void ha_was_inited_ok(handlerton *ht)
 {
-  uint tmp= (*ht)->savepoint_offset;
-  (*ht)->savepoint_offset= savepoint_alloc_size;
+  uint tmp= ht->savepoint_offset;
+  ht->savepoint_offset= savepoint_alloc_size;
   savepoint_alloc_size+= tmp;
-  (*ht)->slot= total_ha++;
-  if ((*ht)->prepare)
+  ht->slot= total_ha++;
+  if (ht->prepare)
     total_ha_2pc++;
 }
 
+
+int ha_initialize_handlerton(handlerton *hton)
+{
+  DBUG_ENTER("ha_initialize_handlerton");
+
+  if (hton == NULL)
+    DBUG_RETURN(1);
+
+  switch (hton->state)
+  {
+  case SHOW_OPTION_NO:
+    break;
+  case SHOW_OPTION_YES:
+    if (!hton->init || !hton->init())
+    {
+      ha_was_inited_ok(hton);
+      break;
+    }
+    /* fall through */
+  default:
+    hton->state= SHOW_OPTION_DISABLED;
+    break;
+  }
+  DBUG_RETURN(0);
+}
+
+
+static my_bool init_handlerton(THD *unused1, st_plugin_int *plugin,
+                               void *unused2)
+{
+  if (plugin->state == PLUGIN_IS_UNINITIALIZED)
+  {
+    ha_initialize_handlerton((handlerton *) plugin->plugin->info);
+    plugin->state= PLUGIN_IS_READY;
+  }
+  return FALSE;
+}
+
+
 int ha_init()
 {
   int error= 0;
@@ -362,16 +446,8 @@
   if (ha_init_errors())
     return 1;
 
-  /*
-    We now initialize everything here.
-  */
-  for (types= sys_table_types; *types; types++)
-  {
-    if (!(*types)->init || !(*types)->init())
-      ha_was_inited_ok(types); 
-    else
-      (*types)->state= SHOW_OPTION_DISABLED;
-  }
+  if (plugin_foreach(NULL, init_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, 0))
+    return 1;
 
   DBUG_ASSERT(total_ha < MAX_HA);
   /*
@@ -384,43 +460,97 @@
   return error;
 }
 
-	/* close, flush or restart databases */
-	/* Ignore this for other databases than ours */
 
-int ha_panic(enum ha_panic_function flag)
+int ha_register_builtin_plugins()
 {
-  int error=0;
-  handlerton **types;
+  handlerton **hton;
+  uint size= 0;
+  struct st_mysql_plugin *plugin;
+  DBUG_ENTER("ha_register_builtin_plugins");
 
-  for (types= sys_table_types; *types; types++)
+  for (hton= sys_table_types; *hton; hton++)
+    size+= sizeof(struct st_mysql_plugin);
+  
+  if (!(plugin= (struct st_mysql_plugin *)
+        my_once_alloc(size, MYF(MY_WME | MY_ZEROFILL))))
+    DBUG_RETURN(1);
+  
+  for (hton= sys_table_types; *hton; hton++, plugin++)
   {
-    if ((*types)->state == SHOW_OPTION_YES && (*types)->panic)
-        error|= (*types)->panic(flag);
+    plugin->type= MYSQL_STORAGE_ENGINE_PLUGIN;
+    plugin->info= *hton;
+    plugin->version= 0;
+    plugin->name= (*hton)->name;
+    plugin->author= NULL;
+    plugin->descr= (*hton)->comment;
+    
+    if (plugin_register_builtin(plugin))
+      DBUG_RETURN(1);
   }
-  if (ha_finish_errors())
-    error= 1;
-  return error;
+  DBUG_RETURN(0);
+}
+
+
+
+
+	/* close, flush or restart databases */
+	/* Ignore this for other databases than ours */
+
+static my_bool panic_handlerton(THD *unused1, st_plugin_int *plugin,
+                               void *arg)
+{
+  handlerton *hton= (handlerton *) plugin->plugin->info;  
+  if (hton->state == SHOW_OPTION_YES && hton->panic)
+    ((int*)arg)[0]|= hton->panic((enum ha_panic_function)((int*)arg)[1]);
+  return FALSE;
+}
+
+
+int ha_panic(enum ha_panic_function flag)
+{
+  int error[2];
+  
+  error[0]= 0; error[1]= (int)flag;
+  plugin_foreach(NULL, panic_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, error);
+  
+  if (flag == HA_PANIC_CLOSE && ha_finish_errors())
+    error[0]= 1;
+  return error[0];
 } /* ha_panic */
 
+static my_bool dropdb_handlerton(THD *unused1, st_plugin_int *plugin,
+                                 void *path)
+{
+  handlerton *hton= (handlerton *) plugin->plugin->info;  
+  if (hton->state == SHOW_OPTION_YES && hton->drop_database)
+    hton->drop_database((char *)path);
+  return FALSE;
+}
+
+
 void ha_drop_database(char* path)
 {
-  handlerton **types;
+  plugin_foreach(NULL, dropdb_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, path);
+}
 
-  for (types= sys_table_types; *types; types++)
-  {
-    if ((*types)->state == SHOW_OPTION_YES && (*types)->drop_database)
-      (*types)->drop_database(path);
-  }
+
+static my_bool closecon_handlerton(THD *thd, st_plugin_int *plugin,
+                                   void *unused)
+{
+  handlerton *hton= (handlerton *) plugin->plugin->info;  
+  /* there's no need to rollback here as all transactions must 
+     be rolled back already */
+  if (hton->state == SHOW_OPTION_YES && hton->close_connection &&
+      thd->ha_data[hton->slot])
+    hton->close_connection(thd);
+  return FALSE;
 }
 
+
 /* don't bother to rollback here, it's done already */
 void ha_close_connection(THD* thd)
 {
-  handlerton **types;
-  for (types= sys_table_types; *types; types++)
-	/* XXX Maybe do a rollback if close_connection == NULL ? */
-    if (thd->ha_data[(*types)->slot] && (*types)->close_connection)
-      (*types)->close_connection(thd);
+  plugin_foreach(thd, closecon_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, 0);
 }
 
 /* ========================================================================
@@ -730,21 +860,46 @@
 }
 
 
-int ha_commit_or_rollback_by_xid(XID *xid, bool commit)
+struct xahton_st {
+  XID *xid;
+  int result;
+};
+
+static my_bool xacommit_handlerton(THD *unused1, st_plugin_int *plugin,
+                                   void *arg)
 {
-  handlerton **types;
-  int res= 1;
+  handlerton *hton= (handlerton *) plugin->plugin->info;
+  if (hton->state == SHOW_OPTION_YES && hton->recover)
+  {
+    hton->commit_by_xid(((struct xahton_st *)arg)->xid);
+    ((struct xahton_st *)arg)->result= 0;
+  }
+  return FALSE;
+}
 
-  for (types= sys_table_types; *types; types++)
+static my_bool xarollback_handlerton(THD *unused1, st_plugin_int *plugin,
+                                     void *arg)
+{
+  handlerton *hton= (handlerton *) plugin->plugin->info;
+  if (hton->state == SHOW_OPTION_YES && hton->recover)
   {
-    if ((*types)->state == SHOW_OPTION_YES && (*types)->recover)
-    {
-      if ((*(commit ? (*types)->commit_by_xid :
-             (*types)->rollback_by_xid))(xid));
-      res= 0;
-    }
+    hton->rollback_by_xid(((struct xahton_st *)arg)->xid);
+    ((struct xahton_st *)arg)->result= 0;
   }
-  return res;
+  return FALSE;
+}
+
+
+int ha_commit_or_rollback_by_xid(XID *xid, bool commit)
+{
+  struct xahton_st xaop;
+  xaop.xid= xid;
+  xaop.result= 1;
+  
+  plugin_foreach(NULL, commit ? xacommit_handlerton : xarollback_handlerton,
+                 MYSQL_STORAGE_ENGINE_PLUGIN, &xaop);
+
+  return xaop.result;
 }
 
 
@@ -820,99 +975,123 @@
      in this case commit_list==0, tc_heuristic_recover == 0
      there should be no prepared transactions in this case.
 */
-int ha_recover(HASH *commit_list)
-{
-  int len, got, found_foreign_xids=0, found_my_xids=0;
-  handlerton **types;
-  XID *list=0;
-  bool dry_run=(commit_list==0 && tc_heuristic_recover==0);
-  DBUG_ENTER("ha_recover");
-
-  /* commit_list and tc_heuristic_recover cannot be set both */
-  DBUG_ASSERT(commit_list==0 || tc_heuristic_recover==0);
-  /* if either is set, total_ha_2pc must be set too */
-  DBUG_ASSERT(dry_run || total_ha_2pc>(ulong)opt_bin_log);
-
-  if (total_ha_2pc <= (ulong)opt_bin_log)
-    DBUG_RETURN(0);
-
-  if (commit_list)
-    sql_print_information("Starting crash recovery...");
-
-#ifndef WILL_BE_DELETED_LATER
-  /*
-    for now, only InnoDB supports 2pc. It means we can always safely
-    rollback all pending transactions, without risking inconsistent data
-  */
-  DBUG_ASSERT(total_ha_2pc == (ulong) opt_bin_log+1); // only InnoDB and binlog
-  tc_heuristic_recover= TC_HEURISTIC_RECOVER_ROLLBACK; // forcing ROLLBACK
-  dry_run=FALSE;
-#endif
 
-  for (len= MAX_XID_LIST_SIZE ; list==0 && len > MIN_XID_LIST_SIZE; len/=2)
-  {
-    list=(XID *)my_malloc(len*sizeof(XID), MYF(0));
-  }
-  if (!list)
-  {
-    sql_print_error(ER(ER_OUTOFMEMORY), len*sizeof(XID));
-    DBUG_RETURN(1);
-  }
+struct xarecover_st
+{
+  int len, found_foreign_xids, found_my_xids;
+  XID *list;
+  HASH *commit_list;
+  bool dry_run;
+};
 
-  for (types= sys_table_types; *types; types++)
+static my_bool xarecover_handlerton(THD *unused, st_plugin_int *plugin,
+                                    void *arg)
+{
+  handlerton *hton= (handlerton *) plugin->plugin->info;
+  struct xarecover_st *info= (struct xarecover_st *) arg;
+  int got;
+  
+  if (hton->state == SHOW_OPTION_YES && hton->recover)
   {
-    if ((*types)->state != SHOW_OPTION_YES || !(*types)->recover)
-      continue;
-    while ((got=(*(*types)->recover)(list, len)) > 0 )
+    while ((got= hton->recover(info->list, info->len)) > 0 )
     {
       sql_print_information("Found %d prepared transaction(s) in %s",
-                            got, (*types)->name);
+                            got, hton->name);
       for (int i=0; i < got; i ++)
       {
-        my_xid x=list[i].get_my_xid();
+        my_xid x=info->list[i].get_my_xid();
         if (!x) // not "mine" - that is generated by external TM
         {
 #ifndef DBUG_OFF
           char buf[XIDDATASIZE*4+6]; // see xid_to_str
-          sql_print_information("ignore xid %s", xid_to_str(buf, list+i));
+          sql_print_information("ignore xid %s", xid_to_str(buf, info->list+i));
 #endif
-          xid_cache_insert(list+i, XA_PREPARED);
-          found_foreign_xids++;
+          xid_cache_insert(info->list+i, XA_PREPARED);
+          info->found_foreign_xids++;
           continue;
         }
-        if (dry_run)
+        if (info->dry_run)
         {
-          found_my_xids++;
+          info->found_my_xids++;
           continue;
         }
         // recovery mode
-        if (commit_list ?
-            hash_search(commit_list, (byte *)&x, sizeof(x)) != 0 :
+        if (info->commit_list ?
+            hash_search(info->commit_list, (byte *)&x, sizeof(x)) != 0 :
             tc_heuristic_recover == TC_HEURISTIC_RECOVER_COMMIT)
         {
 #ifndef DBUG_OFF
           char buf[XIDDATASIZE*4+6]; // see xid_to_str
-          sql_print_information("commit xid %s", xid_to_str(buf, list+i));
+          sql_print_information("commit xid %s", xid_to_str(buf, info->list+i));
 #endif
-          (*(*types)->commit_by_xid)(list+i);
+          hton->commit_by_xid(info->list+i);
         }
         else
         {
 #ifndef DBUG_OFF
           char buf[XIDDATASIZE*4+6]; // see xid_to_str
-          sql_print_information("rollback xid %s", xid_to_str(buf, list+i));
+          sql_print_information("rollback xid %s",
+                                xid_to_str(buf, info->list+i));
 #endif
-          (*(*types)->rollback_by_xid)(list+i);
+          hton->rollback_by_xid(info->list+i);
         }
       }
-      if (got < len)
+      if (got < info->len)
         break;
     }
   }
-  my_free((gptr)list, MYF(0));
-  if (found_foreign_xids)
-    sql_print_warning("Found %d prepared XA transactions", found_foreign_xids);
-  if (dry_run && found_my_xids)
+  return FALSE;
+}
+
+int ha_recover(HASH *commit_list)
+{
+  struct xarecover_st info;
+  DBUG_ENTER("ha_recover");
+  info.found_foreign_xids= info.found_my_xids= 0;
+  info.commit_list= commit_list;
+  info.dry_run= (info.commit_list==0 && tc_heuristic_recover==0);
+  info.list= NULL;
+
+  /* commit_list and tc_heuristic_recover cannot be set both */
+  DBUG_ASSERT(info.commit_list==0 || tc_heuristic_recover==0);
+  /* if either is set, total_ha_2pc must be set too */
+  DBUG_ASSERT(info.dry_run || total_ha_2pc>(ulong)opt_bin_log);
+
+  if (total_ha_2pc <= (ulong)opt_bin_log)
+    DBUG_RETURN(0);
+
+  if (info.commit_list)
+    sql_print_information("Starting crash recovery...");
+
+#ifndef WILL_BE_DELETED_LATER
+  /*
+    for now, only InnoDB supports 2pc. It means we can always safely
+    rollback all pending transactions, without risking inconsistent data
+  */
+  DBUG_ASSERT(total_ha_2pc == (ulong) opt_bin_log+1); // only InnoDB and binlog
+  tc_heuristic_recover= TC_HEURISTIC_RECOVER_ROLLBACK; // forcing ROLLBACK
+  info.dry_run=FALSE;
+#endif
+
+  for (info.len= MAX_XID_LIST_SIZE ; 
+       info.list==0 && info.len > MIN_XID_LIST_SIZE; info.len/=2)
+  {
+    info.list=(XID *)my_malloc(info.len*sizeof(XID), MYF(0));
+  }
+  if (!info.list)
+  {
+    sql_print_error(ER(ER_OUTOFMEMORY), info.len*sizeof(XID));
+    DBUG_RETURN(1);
+  }
+
+  plugin_foreach(NULL, xarecover_handlerton, 
+                 MYSQL_STORAGE_ENGINE_PLUGIN, &info);
+
+  my_free((gptr)info.list, MYF(0));
+  if (info.found_foreign_xids)
+    sql_print_warning("Found %d prepared XA transactions", 
+                      info.found_foreign_xids);
+  if (info.dry_run && info.found_my_xids)
   {
     sql_print_error("Found %d prepared transactions! It means that mysqld was "
                     "not shut down properly last time and critical recovery "
@@ -920,10 +1099,10 @@
                     "after a crash. You have to start mysqld with "
                     "--tc-heuristic-recover switch to commit or rollback "
                     "pending transactions.",
-                    found_my_xids, opt_tc_log_file);
+                    info.found_my_xids, opt_tc_log_file);
     DBUG_RETURN(1);
   }
-  if (commit_list)
+  if (info.commit_list)
     sql_print_information("Crash recovery finished.");
   DBUG_RETURN(0);
 }
@@ -996,32 +1175,17 @@
 
 int ha_release_temporary_latches(THD *thd)
 {
-  handlerton **types;
-
-  for (types= sys_table_types; *types; types++)
-  {
-    if ((*types)->state == SHOW_OPTION_YES &&
-        (*types)->release_temporary_latches)
-      (*types)->release_temporary_latches(thd);
-  }
-  return 0;
+#ifdef WITH_INNOBASE_STORAGE_ENGINE
+  innobase_release_temporary_latches(thd);
+#endif
 }
 
 
-/* 
-  Export statistics for different engines. Currently we use it only for
-  InnoDB.
-*/
-
 int ha_update_statistics()
 {
-  handlerton **types;
-
-  for (types= sys_table_types; *types; types++)
-  {
-    if ((*types)->state == SHOW_OPTION_YES && (*types)->update_statistics)
-      (*types)->update_statistics();
-  }
+#ifdef WITH_INNOBASE_STORAGE_ENGINE
+  innodb_export_status();
+#endif
   return 0;
 }
 
@@ -1130,20 +1294,25 @@
 }
 
 
+static my_bool snapshot_handlerton(THD *thd, st_plugin_int *plugin,
+                                   void *arg)
+{
+  handlerton *hton= (handlerton *) plugin->plugin->info;
+  if (hton->state == SHOW_OPTION_YES &&
+      hton->start_consistent_snapshot)
+  {
+    hton->start_consistent_snapshot(thd);
+    *((bool *)arg)= false;
+  }
+  return FALSE;
+}
+
 int ha_start_consistent_snapshot(THD *thd)
 {
   bool warn= true;
-  handlerton **types;
 
-  for (types= sys_table_types; *types; types++)
-  {
-    if ((*types)->state == SHOW_OPTION_YES &&
-        (*types)->start_consistent_snapshot)
-    {
-      (*types)->start_consistent_snapshot(thd);
-      warn= false; /* hope user is using engine */
-    }
-  }
+  plugin_foreach(thd, snapshot_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN, &warn);
+
   /*
     Same idea as when one wants to CREATE TABLE in one engine which does not
     exist:
@@ -1156,22 +1325,31 @@
 }
 
 
-bool ha_flush_logs(enum db_type db_type)
+static my_bool flush_handlerton(THD *thd, st_plugin_int *plugin,
+                                void *arg)
 {
-  bool result=0;
-  handlerton **types;
+  handlerton *hton= (handlerton *) plugin->plugin->info;
+  if (hton->state == SHOW_OPTION_YES && hton->flush_logs && hton->flush_logs())
+    return TRUE;
+  return FALSE;
+}
 
-  for (types= sys_table_types; *types; types++)
+
+bool ha_flush_logs(handlerton *db_type)
+{
+  if (db_type == NULL)
   {
-    if ((*types)->state == SHOW_OPTION_YES && 
-        (db_type == DB_TYPE_DEFAULT || db_type == (*types)->db_type) &&
-        (*types)->flush_logs)
-    {
-      if ((*types)->flush_logs())
-        result= 1;
-    }
+    if (plugin_foreach(NULL, flush_handlerton,
+                          MYSQL_STORAGE_ENGINE_PLUGIN, 0))
+      return TRUE;
   }
-  return result;
+  else
+  {
+    if (db_type->state != SHOW_OPTION_YES ||
+        (db_type->flush_logs && db_type->flush_logs()))
+      return TRUE;
+  }
+  return FALSE;
 }
 
 /*
@@ -1179,7 +1357,7 @@
   The .frm file will be deleted only if we return 0 or ENOENT
 */
 
-int ha_delete_table(THD *thd, enum db_type table_type, const char *path,
+int ha_delete_table(THD *thd, handlerton *table_type, const char *path,
                     const char *db, const char *alias, bool generate_warning)
 {
   handler *file;
@@ -1194,7 +1372,7 @@
   dummy_table.s= &dummy_share;
 
   /* DB_TYPE_UNKNOWN is used in ALTER TABLE when renaming only .frm files */
-  if (table_type == DB_TYPE_UNKNOWN ||
+  if (table_type == NULL ||
       ! (file=get_new_handler(&dummy_share, thd->mem_root, table_type)))
     DBUG_RETURN(ENOENT);
 
@@ -2600,40 +2778,50 @@
     pointer		pointer to TYPELIB structure
 */
 
+static my_bool exts_handlerton(THD *unused, st_plugin_int *plugin,
+                               void *arg)
+{
+  List<char> *found_exts= (List<char> *) arg;
+  handlerton *hton= (handlerton *) plugin->plugin->info;
+  handler *file;
+  if (hton->state == SHOW_OPTION_YES && hton->create &&
+      (file= hton->create((TABLE_SHARE*) 0)))
+  {
+    List_iterator_fast<char> it(*found_exts);
+    const char **ext, *old_ext;
+    
+    for (ext= file->bas_ext(); *ext; ext++)
+    {
+      while ((old_ext= it++))
+      {
+        if (!strcmp(old_ext, *ext))
+	  break;
+      }
+      if (!old_ext)
+        found_exts->push_back((char *) *ext);
+
+      it.rewind();
+    }
+    delete file;
+  }
+  return FALSE;
+}
+
 TYPELIB *ha_known_exts(void)
 {
   MEM_ROOT *mem_root= current_thd->mem_root;
   if (!known_extensions.type_names || mysys_usage_id != known_extensions_id)
   {
-    handlerton **types;
     List<char> found_exts;
-    List_iterator_fast<char> it(found_exts);
     const char **ext, *old_ext;
 
     known_extensions_id= mysys_usage_id;
     found_exts.push_back((char*) triggers_file_ext);
     found_exts.push_back((char*) trigname_file_ext);
-    for (types= sys_table_types; *types; types++)
-    {
-      if ((*types)->state == SHOW_OPTION_YES)
-      {
-	handler *file= get_new_handler((TABLE_SHARE*) 0, mem_root,
-                                       (enum db_type) (*types)->db_type);
-	for (ext= file->bas_ext(); *ext; ext++)
-	{
-	  while ((old_ext= it++))
-          {
-	    if (!strcmp(old_ext, *ext))
-	      break;
-          }
-	  if (!old_ext)
-	    found_exts.push_back((char *) *ext);
-
-	  it.rewind();
-	}
-	delete file;
-      }
-    }
+    
+    plugin_foreach(NULL, exts_handlerton, 
+                   MYSQL_STORAGE_ENGINE_PLUGIN, &found_exts);
+
     ext= (const char **) my_once_alloc(sizeof(char *)*
                                        (found_exts.elements+1),
                                        MYF(MY_WME | MY_FAE));
@@ -2642,6 +2830,7 @@
     known_extensions.count= found_exts.elements;
     known_extensions.type_names= ext;
 
+    List_iterator_fast<char> it(found_exts);
     while ((old_ext= it++))
       *ext++= old_ext;
     *ext= 0;
@@ -2649,24 +2838,38 @@
   return &known_extensions;
 }
 
-static bool stat_print(THD *thd, const char *type, const char *file,
-                       const char *status)
+
+static bool stat_print(THD *thd, const char *type, uint type_len,
+                       const char *file, uint file_len,
+                       const char *status, uint status_len)
 {
   Protocol *protocol= thd->protocol;
   protocol->prepare_for_resend();
-  protocol->store(type, system_charset_info);
-  protocol->store(file, system_charset_info);
-  protocol->store(status, system_charset_info);
+  protocol->store(type, type_len, system_charset_info);
+  protocol->store(file, file_len, system_charset_info);
+  protocol->store(status, status_len, system_charset_info);
   if (protocol->write())
     return TRUE;
   return FALSE;
 }
 
-bool ha_show_status(THD *thd, enum db_type db_type, enum ha_stat_type stat)
+
+static my_bool showstat_handlerton(THD *thd, st_plugin_int *plugin,
+                                   void *arg)
+{
+  enum ha_stat_type stat= *(enum ha_stat_type *) arg;
+  handlerton *hton= (handlerton *) plugin->plugin->info;
+  if (hton->state == SHOW_OPTION_YES && hton->show_status &&
+      hton->show_status(thd, stat_print, stat))
+    return TRUE;
+  return FALSE;
+}
+
+bool ha_show_status(THD *thd, handlerton *db_type, enum ha_stat_type stat)
 {
-  handlerton **types;
   List<Item> field_list;
   Protocol *protocol= thd->protocol;
+  bool result;
 
   field_list.push_back(new Item_empty_string("Type",10));
   field_list.push_back(new Item_empty_string("Name",FN_REFLEN));
@@ -2676,29 +2879,29 @@
                             Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
     return TRUE;
 
-  for (types= sys_table_types; *types; types++)
+  if (db_type == NULL)
   {
-    if ((*types)->state == SHOW_OPTION_YES &&
-        (db_type == DB_TYPE_DEFAULT || db_type == (*types)->db_type) &&
-        (*types)->show_status)
-    {
-      if ((*types)->show_status(thd, stat_print, stat))
-        return TRUE;
-    }
-    else if (db_type == (*types)->db_type &&
-             (*types)->state != SHOW_OPTION_YES)
-    {
-      if (stat_print(thd, (*types)->name, "", "DISABLED"))
-	return TRUE;
-    }
+    result= plugin_foreach(thd, showstat_handlerton, 
+                           MYSQL_STORAGE_ENGINE_PLUGIN, &stat);
+  }
+  else
+  {
+    if (db_type->state != SHOW_OPTION_YES)
+      result= stat_print(thd, db_type->name, strlen(db_type->name), 
+                         "", 0, "DISABLED", 8) ? 1 : 0;
+    else
+      result= db_type->show_status && 
+              db_type->show_status(thd, stat_print, stat) ? 1 : 0;
   }
 
-  send_eof(thd);
-  return FALSE;
+  if (!result)
+    send_eof(thd);
+  return result;
 }
+
 /*
   Function to check if the conditions for row-based binlogging is
-  correct for the table. 
+  correct for the table.
 
   A row in the given table should be replicated if:
   - Row-based replication is on
@@ -2706,7 +2909,7 @@
   - The binlog is enabled
   - The table shall be binlogged (binlog_*_db rules) [Seems disabled /Matz]
 */
-  
+
 #ifdef HAVE_ROW_BASED_REPLICATION
 static bool check_table_binlog_row_based(THD *thd, TABLE *table)
 {
@@ -2725,7 +2928,7 @@
   bool error= 0;
   THD *const thd= current_thd;
 
-  if (check_table_binlog_row_based(thd, table)) 
+  if (check_table_binlog_row_based(thd, table))
   {
     MY_BITMAP cols;
     /* Potential buffer on the stack for the bitmap */
@@ -2733,8 +2936,8 @@
     uint n_fields= table->s->fields;
     my_bool use_bitbuf= n_fields <= sizeof(bitbuf)*8;
     if (likely(!(error= bitmap_init(&cols,
-                                    use_bitbuf ? bitbuf : NULL, 
-                                    (n_fields + 7) & ~7UL, 
+                                    use_bitbuf ? bitbuf : NULL,
+                                    (n_fields + 7) & ~7UL,
                                     false))))
     {
       bitmap_set_all(&cols);
@@ -2762,7 +2965,7 @@
 
 #endif /* HAVE_ROW_BASED_REPLICATION */
 
-int handler::ha_write_row(byte *buf) 
+int handler::ha_write_row(byte *buf)
 {
   int error;
   if (likely(!(error= write_row(buf))))
@@ -2773,8 +2976,8 @@
   }
   return error;
 }
-  
-int handler::ha_update_row(const byte *old_data, byte *new_data) 
+
+int handler::ha_update_row(const byte *old_data, byte *new_data)
 {
   int error;
   if (likely(!(error= update_row(old_data, new_data))))
@@ -2785,8 +2988,8 @@
   }
   return error;
 }
-  
-int handler::ha_delete_row(const byte *buf) 
+
+int handler::ha_delete_row(const byte *buf)
 {
   int error;
   if (likely(!(error= delete_row(buf))))
@@ -2796,7 +2999,7 @@
 #endif
   }
   return error;
-}    
+}
 
 
 #ifdef HAVE_REPLICATION
@@ -2821,19 +3024,10 @@
 int ha_repl_report_sent_binlog(THD *thd, char *log_file_name,
                                my_off_t end_offset)
 {
-  int result= 0;
-  handlerton **types;
-
-  for (types= sys_table_types; *types; types++)
-  {
-    if ((*types)->state == SHOW_OPTION_YES &&
-        (*types)->repl_report_sent_binlog)
-    {
-      (*types)->repl_report_sent_binlog(thd,log_file_name,end_offset);
-      result= 0;
-    }
-  }
-  return result;
+#ifdef WITH_INNOBASE_STORAGE_ENGINE
+  innobase_repl_report_sent_binlog(thd, log_file_name, end_offset);
+#endif
+  return 0;
 }
 
 
@@ -2858,3 +3052,4 @@
   return 0;
 }
 #endif /* HAVE_REPLICATION */
+

--- 1.171/sql/handler.h	2006-01-04 16:08:47 +01:00
+++ 1.172/sql/handler.h	2006-01-05 11:46:12 +01:00
@@ -185,7 +185,7 @@
 /* Options of START TRANSACTION statement (and later of SET TRANSACTION stmt) */
 #define MYSQL_START_TRANS_OPT_WITH_CONS_SNAPSHOT 1
 
-enum db_type
+enum legacy_db_type
 {
   DB_TYPE_UNKNOWN=0,DB_TYPE_DIAB_ISAM=1,
   DB_TYPE_HASH,DB_TYPE_MISAM,DB_TYPE_PISAM,
@@ -198,7 +198,7 @@
   DB_TYPE_BLACKHOLE_DB,
   DB_TYPE_PARTITION_DB,
   DB_TYPE_BINLOG,
-  DB_TYPE_DEFAULT // Must be last
+  DB_TYPE_DEFAULT=127 // Must be last
 };
 
 enum row_type { ROW_TYPE_NOT_USED=-1, ROW_TYPE_DEFAULT, ROW_TYPE_FIXED,
@@ -322,8 +322,9 @@
 typedef struct st_table_share TABLE_SHARE;
 struct st_foreign_key_info;
 typedef struct st_foreign_key_info FOREIGN_KEY_INFO;
-typedef bool (stat_print_fn)(THD *thd, const char *type, const char *file,
-                             const char *status);
+typedef bool (stat_print_fn)(THD *thd, const char *type, uint type_len,
+                             const char *file, uint file_len,
+                             const char *status, uint status_len);
 enum ha_stat_type { HA_ENGINE_STATUS, HA_ENGINE_LOGS, HA_ENGINE_MUTEX };
 
 /*
@@ -340,6 +341,13 @@
 typedef struct
 {
   /*
+    handlerton structure version
+   */
+  const int interface_version;
+#define MYSQL_HANDLERTON_INTERFACE_VERSION 0x0000
+
+
+  /*
     storage engine name as it should be printed to a user
   */
   const char *name;
@@ -358,7 +366,7 @@
     Historical number used for frm file to determine the correct storage engine.
     This is going away and new engines will just use "name" for this.
   */
-  enum db_type db_type;
+  enum legacy_db_type db_type;
   /* 
     Method that initizlizes a storage engine
   */
@@ -423,13 +431,9 @@
    handler *(*create)(TABLE_SHARE *table);
    void (*drop_database)(char* path);
    int (*panic)(enum ha_panic_function flag);
-   int (*release_temporary_latches)(THD *thd);
-   int (*update_statistics)();
    int (*start_consistent_snapshot)(THD *thd);
    bool (*flush_logs)();
    bool (*show_status)(THD *thd, stat_print_fn *print, enum ha_stat_type stat);
-   int (*repl_report_sent_binlog)(THD *thd, char *log_file_name,
-                                  my_off_t end_offset);
    uint32 flags;                                /* global handler flags */
    int (*reset_logs)(THD *thd);
    int (*binlog_index_purge_file)(THD *thd, const char *file);
@@ -442,9 +446,11 @@
                          int query_length);
 } handlerton;
 
+extern const handlerton default_hton;
+
 struct show_table_alias_st {
   const char *alias;
-  const char *type;
+  enum legacy_db_type type;
 };
 
 /* Possible flags of a handlerton */
@@ -512,7 +518,7 @@
   char* part_comment;
   char* data_file_name;
   char* index_file_name;
-  enum db_type engine_type;
+  handlerton *engine_type;
   enum partition_state part_state;
   uint16 nodegroup_id;
   
@@ -520,7 +526,7 @@
   : part_max_rows(0), part_min_rows(0), partition_name(NULL),
     tablespace_name(NULL), range_value(0), part_comment(NULL),
     data_file_name(NULL), index_file_name(NULL),
-    engine_type(DB_TYPE_UNKNOWN),part_state(PART_NORMAL),
+    engine_type(NULL),part_state(PART_NORMAL),
     nodegroup_id(UNDEF_NODEGROUP)
   {
     subpartitions.empty();
@@ -583,7 +589,7 @@
   key_map all_fields_in_PF, all_fields_in_PPF, all_fields_in_SPF;
   key_map some_fields_in_PF;
 
-  enum db_type default_engine_type;
+  handlerton *default_engine_type;
   Item_result part_result_type;
   partition_type part_type;
   partition_type subpart_type;
@@ -624,7 +630,7 @@
     part_info_string(NULL),
     part_func_string(NULL), subpart_func_string(NULL),
     curr_part_elem(NULL), current_partition(NULL),
-    default_engine_type(DB_TYPE_UNKNOWN),
+    default_engine_type(NULL),
     part_result_type(INT_RESULT),
     part_type(NOT_A_PARTITION), subpart_type(NOT_A_PARTITION),
     part_info_len(0), part_func_len(0), subpart_func_len(0),
@@ -699,7 +705,7 @@
   ulong raid_chunksize;
   ulong used_fields;
   SQL_LIST merge_list;
-  enum db_type db_type;
+  handlerton *db_type;
   enum row_type row_type;
   uint null_bits;                       /* NULL bits at start of record */
   uint options;				/* OR of HA_CREATE_ options */
@@ -746,7 +752,7 @@
                          uint32 *old_part_id, uint32 *new_part_id);
 int get_part_for_delete(const byte *buf, const byte *rec0,
                         partition_info *part_info, uint32 *part_id);
-bool check_partition_info(partition_info *part_info,enum db_type eng_type,
+bool check_partition_info(partition_info *part_info,handlerton *eng_type,
                           handler *file, ulonglong max_rows);
 bool fix_partition_func(THD *thd, const char *name, TABLE *table);
 char *generate_partition_syntax(partition_info *part_info,
@@ -762,7 +768,7 @@
                                part_id_range *part_spec);
 bool mysql_unpack_partition(THD *thd, const uchar *part_buf,
                             uint part_info_len, TABLE *table,
-                            enum db_type default_db_type);
+                            handlerton *default_db_type);
 #endif
 
 
@@ -1481,32 +1487,56 @@
 #define ha_rollback(thd) (ha_rollback_trans((thd), TRUE))
 
 /* lookups */
-enum db_type ha_resolve_by_name(const char *name, uint namelen);
-const char *ha_get_storage_engine(enum db_type db_type);
+handlerton *ha_resolve_by_name(THD *thd, LEX_STRING *name);
+handlerton *ha_resolve_by_legacy_type(THD *thd, enum legacy_db_type db_type);
+const char *ha_get_storage_engine(enum legacy_db_type db_type);
 handler *get_new_handler(TABLE_SHARE *share, MEM_ROOT *alloc,
-                         enum db_type db_type);
-enum db_type ha_checktype(THD *thd, enum db_type database_type,
+                         handlerton *db_type);
+handlerton *ha_checktype(THD *thd, enum legacy_db_type database_type,
                           bool no_substitute, bool report_error);
-bool ha_check_storage_engine_flag(enum db_type db_type, uint32 flag);
+
+
+inline enum legacy_db_type ha_legacy_type(const handlerton *db_type)
+{
+  return (db_type == NULL) ? DB_TYPE_UNKNOWN : db_type->db_type;
+}
+
+inline const char *ha_resolve_storage_engine_name(const handlerton *db_type)
+{
+  return db_type == NULL ? "UNKNOWN" : db_type->name;
+}
+
+inline bool ha_check_storage_engine_flag(const handlerton *db_type, uint32 flag)
+{
+  return db_type == NULL ? FALSE : test(db_type->flags & flag);
+}
+
+inline bool ha_storage_engine_is_enabled(const handlerton *db_type)
+{
+  return (db_type && db_type->create) ? 
+         (db_type->state == SHOW_OPTION_YES) : FALSE;
+}
 
 /* basic stuff */
 int ha_init(void);
+int ha_register_builtin_plugins();
+int ha_initialize_handlerton(handlerton *hton);
+
 TYPELIB *ha_known_exts(void);
 int ha_panic(enum ha_panic_function flag);
 int ha_update_statistics();
 void ha_close_connection(THD* thd);
-my_bool ha_storage_engine_is_enabled(enum db_type database_type);
-bool ha_flush_logs(enum db_type db_type=DB_TYPE_DEFAULT);
+bool ha_flush_logs(handlerton *db_type);
 void ha_drop_database(char* path);
 int ha_create_table(THD *thd, const char *path,
                     const char *db, const char *table_name,
                     HA_CREATE_INFO *create_info,
 		    bool update_create_info);
-int ha_delete_table(THD *thd, enum db_type db_type, const char *path,
+int ha_delete_table(THD *thd, handlerton *db_type, const char *path,
                     const char *db, const char *alias, bool generate_warning);
 
 /* statistics and info */
-bool ha_show_status(THD *thd, enum db_type db_type, enum ha_stat_type stat);
+bool ha_show_status(THD *thd, handlerton *db_type, enum ha_stat_type stat);
 
 /* discovery */
 int ha_create_table_from_engine(THD* thd, const char *db, const char *name);

--- 1.178/sql/log.cc	2005-12-29 20:48:10 +01:00
+++ 1.179/sql/log.cc	2006-01-05 11:46:12 +01:00
@@ -61,6 +61,7 @@
 };
 
 handlerton binlog_hton = {
+  MYSQL_HANDLERTON_INTERFACE_VERSION,
   "binlog",
   SHOW_OPTION_YES,
   "This is a meta storage engine to represent the binlog in a transaction",
@@ -84,12 +85,9 @@
   NULL,                         /* Create a new handler */
   NULL,                         /* Drop a database */
   NULL,                         /* Panic call */
-  NULL,                         /* Release temporary latches */
-  NULL,                         /* Update Statistics */
   NULL,                         /* Start Consistent Snapshot */
   NULL,                         /* Flush logs */
   NULL,                         /* Show status */
-  NULL,                         /* Replication Report Sent Binlog */
   HTON_NOT_USER_SELECTABLE | HTON_HIDDEN
 };
 

--- 1.349/sql/mysql_priv.h	2006-01-04 16:08:47 +01:00
+++ 1.350/sql/mysql_priv.h	2006-01-05 11:46:12 +01:00
@@ -609,7 +609,7 @@
 int mysql_rm_table_part2_with_lock(THD *thd, TABLE_LIST *tables,
 				   bool if_exists, bool drop_temporary,
 				   bool log_query);
-bool quick_rm_table(enum db_type base,const char *db,
+bool quick_rm_table(handlerton *base,const char *db,
                     const char *table_name);
 void close_cached_table(THD *thd, TABLE *table);
 bool mysql_rename_tables(THD *thd, TABLE_LIST *table_list);
@@ -757,7 +757,7 @@
 bool mysql_create_like_table(THD *thd, TABLE_LIST *table,
                              HA_CREATE_INFO *create_info,
                              Table_ident *src_table);
-bool mysql_rename_table(enum db_type base,
+bool mysql_rename_table(handlerton *base,
 			const char *old_db,
 			const char * old_name,
 			const char *new_db,
@@ -1008,7 +1008,7 @@
 int lock_tables(THD *thd, TABLE_LIST *tables, uint counter, bool *need_reopen);
 TABLE *open_temporary_table(THD *thd, const char *path, const char *db,
 			    const char *table_name, bool link_in_list);
-bool rm_temporary_table(enum db_type base, char *path);
+bool rm_temporary_table(handlerton *base, char *path);
 void free_io_cache(TABLE *entry);
 void intern_close_table(TABLE *entry);
 bool close_thread_table(THD *thd, TABLE **table_ptr);
@@ -1342,6 +1342,10 @@
 #else
 extern SHOW_COMP_OPTION have_partition_db;
 #endif
+
+extern handlerton myisam_hton;
+extern handlerton myisammrg_hton;
+extern handlerton heap_hton;
 
 extern SHOW_COMP_OPTION have_isam;
 extern SHOW_COMP_OPTION have_row_based_replication;

--- 1.507/sql/mysqld.cc	2006-01-04 16:08:47 +01:00
+++ 1.508/sql/mysqld.cc	2006-01-05 11:46:12 +01:00
@@ -2658,6 +2658,18 @@
   strmake(pidfile_name, glob_hostname, sizeof(pidfile_name)-5);
   strmov(fn_ext(pidfile_name),".pid");		// Add proper extension
 
+  if (plugin_init())
+  {
+    sql_print_error("Failed to init plugins.");
+    return 1;
+  }
+
+  if (ha_register_builtin_plugins())
+  {
+    sql_print_error("Failed to register built-in storage engines.");
+    return 1;
+  }
+
   load_defaults(conf_file_name, groups, &argc, &argv);
   defaults_argv=argv;
   get_options(argc,argv);
@@ -3181,17 +3193,15 @@
   /*
     Check that the default storage engine is actually available.
   */
-  if (!ha_storage_engine_is_enabled((enum db_type)
-                                    global_system_variables.table_type))
+  if (!ha_storage_engine_is_enabled(global_system_variables.table_type))
   {
     if (!opt_bootstrap)
     {
       sql_print_error("Default storage engine (%s) is not available",
-                      ha_get_storage_engine((enum db_type)
-                                            global_system_variables.table_type));
+                      global_system_variables.table_type->name);
       unireg_abort(1);
     }
-    global_system_variables.table_type= DB_TYPE_MYISAM;
+    global_system_variables.table_type= &myisam_hton;
   }
 
   tc_log= (total_ha_2pc > 1 ? (opt_bin_log  ?
@@ -3546,7 +3556,7 @@
 
   if (!opt_noacl)
   {
-    plugin_init();
+    plugin_load();
 #ifdef HAVE_DLOPEN
     udf_init();
 #endif
@@ -6256,6 +6266,7 @@
   {"Com_show_master_status",   (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_MASTER_STAT]), SHOW_LONG_STATUS},
   {"Com_show_new_master",      (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_NEW_MASTER]), SHOW_LONG_STATUS},
   {"Com_show_open_tables",     (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_OPEN_TABLES]), SHOW_LONG_STATUS},
+  {"Com_show_plugins",         (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_PLUGINS]), SHOW_LONG_STATUS},
   {"Com_show_privileges",      (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_PRIVILEGES]), SHOW_LONG_STATUS},
   {"Com_show_processlist",     (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_PROCESSLIST]), SHOW_LONG_STATUS},
   {"Com_show_slave_hosts",     (char*) offsetof(STATUS_VAR, com_stat[(uint) SQLCOM_SHOW_SLAVE_HOSTS]), SHOW_LONG_STATUS},
@@ -6566,7 +6577,7 @@
 
 
   /* Set default values for some option variables */
-  global_system_variables.table_type=   DB_TYPE_MYISAM;
+  global_system_variables.table_type= &myisam_hton;
   global_system_variables.tx_isolation= ISO_REPEATABLE_READ;
   global_system_variables.select_limit= (ulonglong) HA_POS_ERROR;
   max_system_variables.select_limit=    (ulonglong) HA_POS_ERROR;
@@ -6986,9 +6997,9 @@
     break;
   case OPT_STORAGE_ENGINE:
   {
-    if ((enum db_type)((global_system_variables.table_type=
-                        ha_resolve_by_name(argument, strlen(argument)))) ==
-        DB_TYPE_UNKNOWN)
+    LEX_STRING name= { argument, strlen(argument) };
+    if ((global_system_variables.table_type=
+                        ha_resolve_by_name(current_thd, &name)) == NULL)
     {
       fprintf(stderr,"Unknown/unsupported table type: %s\n",argument);
       exit(1);

--- 1.286/sql/sql_base.cc	2006-01-04 16:08:48 +01:00
+++ 1.287/sql/sql_base.cc	2006-01-05 11:46:13 +01:00
@@ -1449,7 +1449,7 @@
 
 void close_temporary(TABLE *table, bool free_share, bool delete_table)
 {
-  db_type table_type= table->s->db_type;
+  handlerton *table_type= table->s->db_type;
   DBUG_ENTER("close_temporary");
 
   free_io_cache(table);
@@ -1818,7 +1818,7 @@
     */
     {
       char path[FN_REFLEN];
-      db_type not_used;
+      enum legacy_db_type not_used;
       strxnmov(path, FN_REFLEN-1, mysql_data_home, "/", table_list->db, "/",
                table_list->table_name, reg_ext, NullS);
       (void) unpack_filename(path, path);
@@ -3383,7 +3383,7 @@
 }
 
 
-bool rm_temporary_table(enum db_type base, char *path)
+bool rm_temporary_table(handlerton *base, char *path)
 {
   bool error=0;
   handler *file;

--- 1.271/sql/sql_class.h	2005-12-29 20:48:14 +01:00
+++ 1.272/sql/sql_class.h	2006-01-05 11:46:13 +01:00
@@ -212,7 +212,7 @@
   ulong read_rnd_buff_size;
   ulong div_precincrement;
   ulong sortbuff_size;
-  ulong table_type;
+  handlerton *table_type;
   ulong tmp_table_size;
   ulong tx_isolation;
   ulong completion_type;

--- 1.487/sql/sql_parse.cc	2005-12-29 20:48:15 +01:00
+++ 1.488/sql/sql_parse.cc	2006-01-05 11:46:13 +01:00
@@ -6485,7 +6485,7 @@
     rotate_relay_log(active_mi);
     pthread_mutex_unlock(&LOCK_active_mi);
 #endif
-    if (ha_flush_logs())
+    if (ha_flush_logs(NULL))
       result=1;
     if (flush_error_log())
       result=1;
@@ -6803,7 +6803,7 @@
   HA_CREATE_INFO create_info;
   DBUG_ENTER("mysql_create_index");
   bzero((char*) &create_info,sizeof(create_info));
-  create_info.db_type=DB_TYPE_DEFAULT;
+  create_info.db_type= (handlerton*) &default_hton;
   create_info.default_table_charset= thd->variables.collation_database;
   DBUG_RETURN(mysql_alter_table(thd,table_list->db,table_list->table_name,
 				&create_info, table_list,
@@ -6819,7 +6819,7 @@
   HA_CREATE_INFO create_info;
   DBUG_ENTER("mysql_drop_index");
   bzero((char*) &create_info,sizeof(create_info));
-  create_info.db_type=DB_TYPE_DEFAULT;
+  create_info.db_type= (handlerton*) &default_hton;
   create_info.default_table_charset= thd->variables.collation_database;
   alter_info->clear();
   alter_info->flags= ALTER_DROP_INDEX;

--- 1.92/mysql-test/r/information_schema.result	2005-12-21 19:24:56 +01:00
+++ 1.93/mysql-test/r/information_schema.result	2006-01-05 11:50:53 +01:00
@@ -14,6 +14,7 @@
 select schema_name from information_schema.schemata;
 schema_name
 information_schema
+cluster_replication
 mysql
 test
 show databases like 't%';
@@ -22,6 +23,7 @@
 show databases;
 Database
 information_schema
+cluster_replication
 mysql
 test
 show databases where `database` = 't%';
@@ -34,7 +36,7 @@
 create table t5 (id int auto_increment primary key);
 insert into t5 values (10);
 create view v1 (c) as select table_name from information_schema.TABLES;
-select * from v1;
+select * from v1 where c not in ('apply_status');
 c
 CHARACTER_SETS
 COLLATIONS
@@ -53,6 +55,7 @@
 TRIGGERS
 VIEWS
 USER_PRIVILEGES
+binlog_index
 columns_priv
 db
 func
@@ -328,6 +331,7 @@
 select * from v0;
 c
 information_schema
+cluster_replication
 mysql
 test
 explain select * from v0;
@@ -795,8 +799,9 @@
 flush privileges;
 SELECT table_schema, count(*) FROM information_schema.TABLES GROUP BY TABLE_SCHEMA;
 table_schema	count(*)
-information_schema	17
-mysql	18
+cluster_replication	<count>
+information_schema	<count>
+mysql	<count>
 create table t1 (i int, j int);
 create trigger trg1 before insert on t1 for each row
 begin

--- 1.226/sql/ha_ndbcluster.cc	2005-12-21 19:24:56 +01:00
+++ 1.227/sql/ha_ndbcluster.cc	2006-01-05 11:50:53 +01:00
@@ -33,6 +33,8 @@
 #include <../util/Bitmask.hpp>
 #include <ndbapi/NdbIndexStat.hpp>
 
+#include "ha_ndbcluster_binlog.h"
+
 // options from from mysqld.cc
 extern my_bool opt_ndb_optimized_node_selection;
 extern const char *opt_ndbcluster_connectstring;
@@ -50,13 +52,9 @@
 // createable against NDB from this handler
 static const int max_transactions= 3; // should really be 2 but there is a transaction to much allocated when loch table is used
 
-static const char *ha_ndb_ext=".ndb";
-static const char share_prefix[]= "./";
-
-static int ndbcluster_close_connection(THD *thd);
-static int ndbcluster_commit(THD *thd, bool all);
-static int ndbcluster_rollback(THD *thd, bool all);
-static handler* ndbcluster_create_handler(TABLE_SHARE *table);
+static bool ndbcluster_init(void);
+static int ndbcluster_end(ha_panic_function flag);
+static bool ndbcluster_show_status(THD*,stat_print_fn *,enum ha_stat_type);
 
 handlerton ndbcluster_hton = {
   MYSQL_HANDLERTON_INTERFACE_VERSION,
@@ -65,28 +63,7 @@
   "Clustered, fault-tolerant, memory-based tables", 
   DB_TYPE_NDBCLUSTER,
   ndbcluster_init,
-  0, /* slot */
-  0, /* savepoint size */
-  ndbcluster_close_connection,
-  NULL, /* savepoint_set */
-  NULL, /* savepoint_rollback */
-  NULL, /* savepoint_release */
-  ndbcluster_commit,
-  ndbcluster_rollback,
-  NULL, /* prepare */
-  NULL, /* recover */
-  NULL, /* commit_by_xid */
-  NULL, /* rollback_by_xid */
-  NULL, /* create_cursor_read_view */
-  NULL, /* set_cursor_read_view */
-  NULL, /* close_cursor_read_view */
-  ndbcluster_create_handler, /* Create a new handler */
-  ndbcluster_drop_database, /* Drop a database */
-  ndbcluster_end, /* Panic call */
-  NULL, /* Start Consistent Snapshot */
-  NULL, /* Flush logs */
-  ndbcluster_show_status, /* Show status */
-  HTON_NO_FLAGS
+  ~(uint)0, /* slot */
 };
 
 static handler *ndbcluster_create_handler(TABLE_SHARE *table)
@@ -119,33 +96,24 @@
   break;                                 \
 }
 
-// Typedefs for long names
-typedef NdbDictionary::Object NDBOBJ;
-typedef NdbDictionary::Column NDBCOL;
-typedef NdbDictionary::Table NDBTAB;
-typedef NdbDictionary::Index  NDBINDEX;
-typedef NdbDictionary::Dictionary  NDBDICT;
-typedef NdbDictionary::Event  NDBEVENT;
-
 static int ndbcluster_inited= 0;
-static int ndbcluster_util_inited= 0;
+int ndbcluster_util_inited= 0;
 
 static Ndb* g_ndb= NULL;
-static Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
+Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
+unsigned char g_node_id_map[max_ndb_nodes];
 
 // Handler synchronization
 pthread_mutex_t ndbcluster_mutex;
 
 // Table lock handling
-static HASH ndbcluster_open_tables;
+HASH ndbcluster_open_tables;
 
 static byte *ndbcluster_get_key(NDB_SHARE *share,uint *length,
                                 my_bool not_used __attribute__((unused)));
-static NDB_SHARE *get_share(const char *key,
-                            bool create_if_not_exists= TRUE,
-                            bool have_lock= FALSE);
-static void free_share(NDB_SHARE **share, bool have_lock= FALSE);
-static void real_free_share(NDB_SHARE **share);
+#ifdef HAVE_NDB_BINLOG
+static int rename_share(NDB_SHARE *share, const char *new_key, bool have_lock);
+#endif
 static void ndb_set_fragmentation(NDBTAB &tab, TABLE *table, uint pk_len);
 
 static int packfrm(const void *data, uint len, const void **pack_data, uint *pack_len);
@@ -155,35 +123,9 @@
 static int ndb_get_table_statistics(Ndb*, const char *, 
                                     struct Ndb_statistics *);
 
-#ifndef DBUG_OFF
-void print_records(TABLE *table, const char *record)
-{
-  if (_db_on_)
-  {
-    for (uint j= 0; j < table->s->fields; j++)
-    {
-      char buf[40];
-      int pos= 0;
-      Field *field= table->field[j];
-      const byte* field_ptr= field->ptr - table->record[0] + record;
-      int pack_len= field->pack_length();
-      int n= pack_len < 10 ? pack_len : 10;
-      
-      for (int i= 0; i < n && pos < 20; i++)
-      {
-	pos+= sprintf(&buf[pos]," %x", (int) (unsigned char) field_ptr[i]);
-      }
-      buf[pos]= 0;
-      DBUG_PRINT("info",("[%u]field_ptr[0->%d]: %s", j, n, buf));
-    }
-  }
-}
-#else
-#define print_records(a,b)
-#endif
 
 // Util thread variables
-static pthread_t ndb_util_thread;
+pthread_t ndb_util_thread;
 pthread_mutex_t LOCK_ndb_util_thread;
 pthread_cond_t COND_ndb_util_thread;
 pthread_handler_t ndb_util_thread_func(void *arg);
@@ -212,7 +154,7 @@
 static const char * ndb_connected_host= 0;
 static long ndb_connected_port= 0;
 static long ndb_number_of_replicas= 0;
-static long ndb_number_of_storage_nodes= 0;
+long ndb_number_of_storage_nodes= 0;
 
 static int update_status_variables(Ndb_cluster_connection *c)
 {
@@ -233,9 +175,6 @@
   {NullS, NullS, SHOW_LONG}
 };
 
-/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */
-extern Uint64 g_latest_trans_gci;
-
 /*
   Error handling functions
 */
@@ -363,6 +302,7 @@
   all= NULL;
   stmt= NULL;
   error= 0;
+  options= 0;
 }
 
 Thd_ndb::~Thd_ndb()
@@ -388,14 +328,6 @@
 }
 
 inline
-Thd_ndb *
-get_thd_ndb(THD *thd) { return (Thd_ndb *) thd->ha_data[ndbcluster_hton.slot]; }
-
-inline
-void
-set_thd_ndb(THD *thd, Thd_ndb *thd_ndb) { thd->ha_data[ndbcluster_hton.slot]= thd_ndb; }
-
-inline
 Ndb *ha_ndbcluster::get_ndb()
 {
   return get_thd_ndb(current_thd)->ndb;
@@ -2514,8 +2446,8 @@
     set to null.
 */
 
-static void ndb_unpack_record(TABLE *table, NdbValue *value,
-                              MY_BITMAP *defined, byte *buf)
+void ndb_unpack_record(TABLE *table, NdbValue *value,
+                       MY_BITMAP *defined, byte *buf)
 {
   Field **p_field= table->field, *field= *p_field;
   uint row_offset= (uint) (buf - table->record[0]);
@@ -2753,6 +2685,7 @@
   statistic_increment(current_thd->status_var.ha_read_key_count, &LOCK_status);
   DBUG_ENTER("ha_ndbcluster::index_read_idx");
   DBUG_PRINT("enter", ("index_no: %u, key_len: %u", index_no, key_len));  
+  close_scan();
   index_init(index_no, 0);  
   DBUG_RETURN(index_read(buf, key, key_len, find_flag));
 }
@@ -3149,6 +3082,16 @@
     m_use_write= FALSE;
     m_ignore_dup_key= FALSE;
     break;
+  case HA_EXTRA_IGNORE_NO_KEY:
+    DBUG_PRINT("info", ("HA_EXTRA_IGNORE_NO_KEY"));
+    DBUG_PRINT("info", ("Turning on AO_IgnoreError at Commit/NoCommit"));
+    m_ignore_no_key= TRUE;
+    break;
+  case HA_EXTRA_NO_IGNORE_NO_KEY:
+    DBUG_PRINT("info", ("HA_EXTRA_NO_IGNORE_NO_KEY"));
+    DBUG_PRINT("info", ("Turning on AO_IgnoreError at Commit/NoCommit"));
+    m_ignore_no_key= FALSE;
+    break;
   default:
     break;
   }
@@ -3577,7 +3520,7 @@
   Commit a transaction started in NDB
  */
 
-int ndbcluster_commit(THD *thd, bool all)
+static int ndbcluster_commit(THD *thd, bool all)
 {
   int res= 0;
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
@@ -3628,7 +3571,7 @@
   Rollback a transaction started in NDB
  */
 
-int ndbcluster_rollback(THD *thd, bool all)
+static int ndbcluster_rollback(THD *thd, bool all)
 {
   int res= 0;
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
@@ -3974,6 +3917,12 @@
     */
     if ((my_errno= write_ndb_file()))
       DBUG_RETURN(my_errno);
+#ifdef HAVE_NDB_BINLOG
+    ndbcluster_create_binlog_setup(get_ndb(), name2, m_dbname, m_tabname,
+                                   ndb_binlog_thread_running > 0 &&
+                                   !is_prefix(m_tabname, tmp_file_prefix),
+                                   0, TRUE);
+#endif /* HAVE_NDB_BINLOG */
     DBUG_RETURN(my_errno);
   }
 
@@ -4120,6 +4069,73 @@
   if (!my_errno)
     my_errno= write_ndb_file();
 
+#ifdef HAVE_NDB_BINLOG
+  if (!my_errno)
+  {
+    NDB_SHARE *share= 0;
+    pthread_mutex_lock(&ndbcluster_mutex);
+    /*
+      First make sure we get a "fresh" share here, not an old trailing one...
+    */
+    {
+      const char *key= name2;
+      uint length= (uint) strlen(key);
+      if ((share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
+                                           (byte*) key, length)))
+        handle_trailing_share(share, TRUE);
+    }
+    /*
+      get a new share
+    */
+    if (!(share= get_share(name2, true, true)))
+    {
+      sql_print_error("NDB: allocating table share for %s failed", name2);
+      /* my_errno is set */
+    }
+    pthread_mutex_unlock(&ndbcluster_mutex);
+
+    while (!is_prefix(m_tabname, tmp_file_prefix))
+    {
+      const NDBTAB *t= dict->getTable(m_tabname);
+      String event_name(INJECTOR_EVENT_LEN);
+      ndb_rep_event_name(&event_name,m_dbname,m_tabname);
+
+      /*
+        Always create an event for the table, as other mysql servers
+        expect it to be there.
+      */
+      if (ndbcluster_create_event(ndb, t, event_name.c_ptr(), share) < 0)
+      {
+        /* this is only a serious error if the binlog is on */
+	if (share && ndb_binlog_thread_running > 0)
+	{
+          push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                              ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+                              "Creating event for logging table failed. "
+                              "See error log for details.");
+	}
+        break;
+      }
+      sql_print_information("NDB Binlog: CREATE TABLE Event: %s",
+                            event_name.c_ptr());
+
+      if (share && ndb_binlog_thread_running > 0 &&
+          ndbcluster_create_event_ops(share, t, event_name.c_ptr()) < 0)
+      {
+        sql_print_error("NDB Binlog: FAILED CREATE TABLE event operations."
+                        " Event: %s", name2);
+        /* a warning has been issued to the client */
+      }
+      ndbcluster_log_schema_op(current_thd, share,
+                               current_thd->query, current_thd->query_length,
+                               share->db, share->table_name,
+                               0, 0,
+                               SOT_CREATE_TABLE);
+      break;
+    }
+  }
+#endif /* HAVE_NDB_BINLOG */
+
   DBUG_RETURN(my_errno);
 }
 
@@ -4214,6 +4230,15 @@
     if (!(orig_tab= dict->getTable(m_tabname)))
       ERR_RETURN(dict->getNdbError());
   }
+#ifdef HAVE_NDB_BINLOG
+  NDB_SHARE *share= 0;
+  if (ndb_binlog_thread_running > 0 &&
+      (share= get_share(from, false)))
+  {
+    int r= rename_share(share, to, TRUE);
+    DBUG_ASSERT(r == 0);
+  }
+#endif
   m_table= (void *)orig_tab;
   // Change current database to that of target table
   set_dbname(to);
@@ -4221,6 +4246,14 @@
 
   if ((result= alter_table_name(new_tabname)))
   {
+#ifdef HAVE_NDB_BINLOG
+    if (share)
+    {
+      int r= rename_share(share, from, TRUE);
+      DBUG_ASSERT(r == 0);
+      free_share(&share);
+    }
+#endif
     DBUG_RETURN(result);
   }
   
@@ -4228,9 +4261,75 @@
   if ((result= handler::rename_table(from, to)))
   {
     // ToDo in 4.1 should rollback alter table...
+#ifdef HAVE_NDB_BINLOG
+    if (share)
+      free_share(&share);
+#endif
     DBUG_RETURN(result);
   }
 
+#ifdef HAVE_NDB_BINLOG
+  int is_old_table_tmpfile= 1;
+  if (share && share->op)
+    dict->forceGCPWait();
+
+  /* handle old table */
+  if (!is_prefix(m_tabname, tmp_file_prefix))
+  {
+    is_old_table_tmpfile= 0;
+    String event_name(INJECTOR_EVENT_LEN);
+    ndb_rep_event_name(&event_name, from + sizeof(share_prefix) - 1, 0);
+    ndbcluster_handle_drop_table(ndb, event_name.c_ptr(), share);
+  }
+
+  if (!result && !is_prefix(new_tabname, tmp_file_prefix))
+  {
+    /* always create an event for the table */
+    String event_name(INJECTOR_EVENT_LEN);
+    ndb_rep_event_name(&event_name, to + sizeof(share_prefix) - 1, 0);
+    const NDBTAB *ndbtab= dict->getTable(new_tabname);
+
+    if (ndbcluster_create_event(ndb, ndbtab, event_name.c_ptr(), share) >= 0)
+    {
+      sql_print_information("NDB Binlog: RENAME Event: %s",
+                            event_name.c_ptr());
+      if (share)
+      {
+        if (ndbcluster_create_event_ops(share, ndbtab,
+                                        event_name.c_ptr()) < 0)
+        {
+          sql_print_error("NDB Binlog: FAILED create event operations "
+                          "during RENAME. Event %s", event_name.c_ptr());
+          /* a warning has been issued to the client */
+        }
+      }
+    }
+    else
+    {
+      sql_print_error("NDB Binlog: FAILED create event during RENAME."
+                      "Event: %s", event_name.c_ptr());
+      push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                          ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+                          "Creating event for logging table failed. "
+                          "See error log for details.");
+    }
+    if (is_old_table_tmpfile)
+      ndbcluster_log_schema_op(current_thd, share,
+                               current_thd->query, current_thd->query_length,
+                               share->db, share->table_name,
+                               0, 0,
+                               SOT_ALTER_TABLE);
+    else
+      ndbcluster_log_schema_op(current_thd, share,
+                               current_thd->query, current_thd->query_length,
+                               share->db, share->table_name,
+                               0, 0,
+                               SOT_RENAME_TABLE);
+  }
+  if (share)
+    free_share(&share);
+#endif
+
   DBUG_RETURN(result);
 }
 
@@ -4273,6 +4372,9 @@
 {
   DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
   NDBDICT *dict= ndb->getDictionary();
+#ifdef HAVE_NDB_BINLOG
+  NDB_SHARE *share= get_share(path, false);
+#endif
 
   /* Drop the table from NDB */
   
@@ -4289,9 +4391,75 @@
 
   if (res)
   {
+#ifdef HAVE_NDB_BINLOG
+    /* the drop table failed for some reason, drop the share anyways */
+    if (share)
+    {
+      pthread_mutex_lock(&ndbcluster_mutex);
+      if (share->state != NSS_DROPPED)
+      {
+        /*
+          The share kept by the server has not been freed, free it
+        */
+        share->state= NSS_DROPPED;
+        free_share(&share, TRUE);
+      }
+      /* free the share taken above */
+      free_share(&share, TRUE);
+      pthread_mutex_unlock(&ndbcluster_mutex);
+    }
+#endif
     DBUG_RETURN(res);
   }
 
+#ifdef HAVE_NDB_BINLOG
+  /* stop the logging of the dropped table, and cleanup */
+
+  /*
+    drop table is successful even if table does not exist in ndb
+    and in case table was actually not dropped, there is no need
+    to force a gcp, and setting the event_name to null will indicate
+    that there is no event to be dropped
+  */
+  int table_dropped= dict->getNdbError().code != 709;
+
+  if (!is_prefix(table_name, tmp_file_prefix) && share)
+  {
+    ndbcluster_log_schema_op(current_thd, share,
+                             current_thd->query, current_thd->query_length,
+                             share->db, share->table_name,
+                             0, 0,
+                             SOT_DROP_TABLE);
+  }
+  else if (table_dropped && share && share->op) /* ndbcluster_log_schema_op
+                                                   will do a force GCP */
+    dict->forceGCPWait();
+
+  if (!is_prefix(table_name, tmp_file_prefix))
+  {
+    String event_name(INJECTOR_EVENT_LEN);
+    ndb_rep_event_name(&event_name, path + sizeof(share_prefix) - 1, 0);
+    ndbcluster_handle_drop_table(ndb,
+                                 table_dropped ? event_name.c_ptr() : 0,
+                                 share);
+  }
+
+  if (share)
+  {
+    pthread_mutex_lock(&ndbcluster_mutex);
+    if (share->state != NSS_DROPPED)
+    {
+      /*
+        The share kept by the server has not been freed, free it
+      */
+      share->state= NSS_DROPPED;
+      free_share(&share, TRUE);
+    }
+    /* free the share taken above */
+    free_share(&share, TRUE);
+    pthread_mutex_unlock(&ndbcluster_mutex);
+  }
+#endif
   DBUG_RETURN(0);
 }
 
@@ -4380,7 +4548,8 @@
                 HA_NO_PREFIX_CHAR_KEYS | \
                 HA_NEED_READ_RANGE_BUFFER | \
                 HA_CAN_GEOMETRY | \
-                HA_CAN_BIT_FIELD
+                HA_CAN_BIT_FIELD | \
+                HA_PRIMARY_KEY_ALLOW_RANDOM_ACCESS
 
 ha_ndbcluster::ha_ndbcluster(TABLE_SHARE *table_arg):
   handler(&ndbcluster_hton, table_arg),
@@ -4616,7 +4785,7 @@
 }
 
 
-int ndbcluster_close_connection(THD *thd)
+static int ndbcluster_close_connection(THD *thd)
 {
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
   DBUG_ENTER("ndbcluster_close_connection");
@@ -4779,14 +4948,21 @@
   DBUG_RETURN(ret);      
 }
 
-void ndbcluster_drop_database(char *path)
+static void ndbcluster_drop_database(char *path)
 {
   ndbcluster_drop_database_impl(path);
+#ifdef HAVE_NDB_BINLOG
+  char db[FN_REFLEN];
+  ha_ndbcluster::set_dbname(path, db);
+  ndbcluster_log_schema_op(current_thd, 0,
+                           current_thd->query, current_thd->query_length,
+                           db, "", 0, 0, SOT_DROP_DB);
+#endif
 }
 /*
   find all tables in ndb and discover those needed
 */
-static int ndbcluster_find_all_files(THD *thd)
+int ndbcluster_find_all_files(THD *thd)
 {
   DBUG_ENTER("ndbcluster_find_all_files");
   Ndb* ndb;
@@ -4821,10 +4997,11 @@
 
       if (!(ndbtab= dict->getTable(elmt.name)))
       {
-        sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
-                        elmt.database, elmt.name,
-                        dict->getNdbError().code,
-                        dict->getNdbError().message);
+        if (elmt.state == NDBOBJ::StateOnline)
+          sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
+                          elmt.database, elmt.name,
+                          dict->getNdbError().code,
+                          dict->getNdbError().message);
         unhandled++;
         continue;
       }
@@ -4863,6 +5040,34 @@
         }
         pthread_mutex_unlock(&LOCK_open);
       }
+#ifdef HAVE_NDB_BINLOG
+      else
+      {
+        /* set up replication for this table */
+        NDB_SHARE *share;
+        pthread_mutex_lock(&ndbcluster_mutex);
+        if (((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables,
+                                            (byte*) key, strlen(key)))
+              && share->op == 0 && share->op_old == 0)
+            || share == 0)
+        {
+          /*
+            there is no binlog creation setup for this table
+            attempt to do it
+          */
+          pthread_mutex_unlock(&ndbcluster_mutex);
+          pthread_mutex_lock(&LOCK_open);
+          ndbcluster_create_binlog_setup(ndb, key, elmt.database, elmt.name,
+                                         ndb_binlog_thread_running > 0 &&
+                                         !is_prefix(elmt.name,
+                                                    tmp_file_prefix),
+                                         share, TRUE);
+          pthread_mutex_unlock(&LOCK_open);
+        }
+        else
+          pthread_mutex_unlock(&ndbcluster_mutex);
+      }
+#endif
     }
   }
   while (unhandled && retries--);
@@ -4970,6 +5175,41 @@
     }
   }
 
+#ifdef HAVE_NDB_BINLOG
+  /* setup logging to binlog for all discovered tables */
+  if (ndb_binlog_thread_running > 0)
+  {
+    char *end;
+    char *end1=
+      strxnmov(name, sizeof(name), mysql_data_home, "/", db, "/", NullS);
+    NDB_SHARE *share;
+    pthread_mutex_lock(&ndbcluster_mutex);
+    for (i= 0; i < ok_tables.records; i++)
+    {
+      file_name= (char*)hash_element(&ok_tables, i);
+      end= strxnmov(end1, sizeof(name) - (end1 - name), file_name, NullS);
+      if ((share= (NDB_SHARE*)hash_search(&ndbcluster_open_tables,
+                                          (byte*)name, end - name))
+          && share->op == 0 && share->op_old == 0)
+      {
+        /*
+          there is no binlog creation setup for this table
+          attempt to do it
+	*/
+        
+        pthread_mutex_unlock(&ndbcluster_mutex);
+        pthread_mutex_lock(&LOCK_open);
+        ndbcluster_create_binlog_setup(ndb, name, db, file_name,
+                                       !is_prefix(file_name, tmp_file_prefix),
+                                       share, TRUE);
+        pthread_mutex_unlock(&LOCK_open);
+        pthread_mutex_lock(&ndbcluster_mutex);
+      }
+    }
+    pthread_mutex_unlock(&ndbcluster_mutex);
+  }
+#endif
+
   // Check for new files to discover
   DBUG_PRINT("info", ("Checking for new files to discover"));       
   List<char> create_list;
@@ -5042,11 +5282,18 @@
 static int connect_callback()
 {
   update_status_variables(g_ndb_cluster_connection);
+
+  uint node_id, i= 0;
+  Ndb_cluster_connection_node_iter node_iter;
+  memset((void *)g_node_id_map, 0xFFFF, sizeof(g_node_id_map));
+  while ((node_id= g_ndb_cluster_connection->get_next_node(node_iter)))
+    g_node_id_map[node_id]= i++;
+
   pthread_cond_signal(&COND_ndb_util_thread);
   return 0;
 }
 
-bool ndbcluster_init()
+static bool ndbcluster_init()
 {
   int res;
   DBUG_ENTER("ndbcluster_init");
@@ -5054,6 +5301,21 @@
   if (have_ndbcluster != SHOW_OPTION_YES)
     goto ndbcluster_init_error;
 
+  {
+    handlerton &h= ndbcluster_hton;
+    h.close_connection= ndbcluster_close_connection;
+    h.commit=           ndbcluster_commit;
+    h.rollback=         ndbcluster_rollback;
+    h.create=           ndbcluster_create_handler; /* Create a new handler */
+    h.drop_database=    ndbcluster_drop_database;  /* Drop a database */
+    h.panic=            ndbcluster_end;            /* Panic call */
+    h.show_status=      ndbcluster_show_status;    /* Show status */
+#ifdef HAVE_NDB_BINLOG
+    ndbcluster_binlog_init_handlerton();
+#endif
+    h.flags=            HTON_NO_FLAGS;
+  }
+
   // Set connectstring if specified
   if (opt_ndbcluster_connectstring != 0)
     DBUG_PRINT("connectstring", ("%s", opt_ndbcluster_connectstring));     
@@ -5117,6 +5379,22 @@
   (void) hash_init(&ndbcluster_open_tables,system_charset_info,32,0,0,
                    (hash_get_key) ndbcluster_get_key,0,0);
   pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
+#ifdef HAVE_NDB_BINLOG
+  /* start the ndb injector thread */
+  if (opt_bin_log)
+  {
+    if (binlog_row_based)
+    {
+      if (ndbcluster_binlog_start())
+        goto ndbcluster_init_error;
+    }
+    else
+    {
+      sql_print_error("NDB: only row based binary logging is supported");
+    }
+  }
+#endif /* HAVE_NDB_BINLOG */
+  
   pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
   pthread_cond_init(&COND_ndb_util_thread, NULL);
 
@@ -5147,26 +5425,13 @@
   DBUG_RETURN(TRUE);
 }
 
-
-/*
-  End use of the NDB Cluster table handler
-  - free all global variables allocated by 
-    ndbcluster_init()
-*/
-
-int ndbcluster_end(ha_panic_function type)
+static int ndbcluster_end(ha_panic_function type)
 {
   DBUG_ENTER("ndbcluster_end");
 
   if (!ndbcluster_inited)
     DBUG_RETURN(0);
 
-  // Kill ndb utility thread
-  (void) pthread_mutex_lock(&LOCK_ndb_util_thread);  
-  DBUG_PRINT("exit",("killing ndb util thread: %lx", ndb_util_thread));
-  (void) pthread_cond_signal(&COND_ndb_util_thread);
-  (void) pthread_mutex_unlock(&LOCK_ndb_util_thread);
-
   if (g_ndb)
   {
 #ifndef DBUG_OFF
@@ -5193,7 +5458,6 @@
   pthread_mutex_destroy(&LOCK_ndb_util_thread);
   pthread_cond_destroy(&COND_ndb_util_thread);
   ndbcluster_inited= 0;
-  ndbcluster_util_inited= 0;
   DBUG_RETURN(0);
 }
 
@@ -5660,60 +5924,6 @@
 }
 
 
-#ifndef DBUG_OFF
-static void dbug_print_table(const char *info, TABLE *table)
-{
-  if (table == 0)
-  {
-    DBUG_PRINT("info",("%s: (null)", info));
-    return;
-  }
-  DBUG_PRINT("info",
-             ("%s: %s.%s s->fields: %d  "
-              "reclength: %d  rec_buff_length: %d  record[0]: %lx  "
-              "record[1]: %lx",
-              info,
-              table->s->db,
-              table->s->table_name,
-              table->s->fields,
-              table->s->reclength,
-              table->s->rec_buff_length,
-              table->record[0],
-              table->record[1]));
-
-  for (unsigned int i= 0; i < table->s->fields; i++) 
-  {
-    Field *f= table->field[i];
-    DBUG_PRINT("info",
-               ("[%d] \"%s\"(0x%lx:%s%s%s%s%s%s) type: %d  pack_length: %d  "
-                "ptr: 0x%lx[+%d]  null_bit: %u  null_ptr: 0x%lx[+%d]",
-                i,
-                f->field_name,
-                f->flags,
-                (f->flags & PRI_KEY_FLAG)  ? "pri"       : "attr",
-                (f->flags & NOT_NULL_FLAG) ? ""          : ",nullable",
-                (f->flags & UNSIGNED_FLAG) ? ",unsigned" : ",signed",
-                (f->flags & ZEROFILL_FLAG) ? ",zerofill" : "",
-                (f->flags & BLOB_FLAG)     ? ",blob"     : "",
-                (f->flags & BINARY_FLAG)   ? ",binary"   : "",
-                f->real_type(),
-                f->pack_length(),
-                f->ptr, f->ptr - table->record[0],
-                f->null_bit,
-                f->null_ptr, (byte*) f->null_ptr - table->record[0]));
-    if (f->type() == MYSQL_TYPE_BIT)
-    {
-      Field_bit *g= (Field_bit*) f;
-      DBUG_PRINT("MYSQL_TYPE_BIT",("field_length: %d  bit_ptr: 0x%lx[+%d] "
-                                   "bit_ofs: %u  bit_len: %u",
-                                   g->field_length, g->bit_ptr,
-                                   (byte*) g->bit_ptr-table->record[0],
-                                   g->bit_ofs, g->bit_len));
-    }
-  }
-}
-#endif
-
 /*
   Handling the shared NDB_SHARE structure that is needed to
   provide table locking.
@@ -5743,6 +5953,12 @@
                ("db.tablename: %s.%s  use_count: %d  commit_count: %d",
                 share->db, share->table_name,
                 share->use_count, share->commit_count));
+#ifdef HAVE_NDB_BINLOG
+    if (share->table)
+      DBUG_PRINT("share",
+                 ("table->s->db.table_name: %s.%s",
+                  share->table->s->db.str, share->table->s->table_name.str));
+#endif
   }
   DBUG_VOID_RETURN;
 }
@@ -5750,11 +5966,170 @@
 #define dbug_print_open_tables()
 #endif
 
+#ifdef HAVE_NDB_BINLOG
+/*
+  For some reason a share is still around, try to salvage the situation
+  by closing all cached tables. If the share still exists, there is an
+  error somewhere but only report this to the error log.  Keep this
+  "trailing share" but rename it since there are still references to it
+  to avoid segmentation faults.  There is a risk that the memory for
+  this trailing share leaks.
+  
+  Must be called with previous pthread_mutex_lock(&ndbcluster_mutex)
+*/
+int handle_trailing_share(NDB_SHARE *share, bool have_lock)
+{
+  static ulong trailing_share_id= 0;
+  DBUG_ENTER("handle_trailing_share");
+
+  ++share->use_count;
+  pthread_mutex_unlock(&ndbcluster_mutex);
+
+  close_cached_tables((THD*) 0, 0, (TABLE_LIST*) 0, have_lock);
+
+  pthread_mutex_lock(&ndbcluster_mutex);
+  if (!--share->use_count)
+  {
+    DBUG_PRINT("info", ("NDB_SHARE: close_cashed_tables %s freed share.",
+               share->key)); 
+    real_free_share(&share);
+    DBUG_RETURN(0);
+  }
+
+  /*
+    share still exists, if share has not been dropped by server
+    release that share
+  */
+  if (share->state != NSS_DROPPED && !--share->use_count)
+  {
+    DBUG_PRINT("info", ("NDB_SHARE: %s already exists, "
+                        "use_count=%d  state != NSS_DROPPED.",
+                        share->key, share->use_count)); 
+    real_free_share(&share);
+    DBUG_RETURN(0);
+  }
+  DBUG_PRINT("error", ("NDB_SHARE: %s already exists  use_count=%d.",
+                       share->key, share->use_count));
+
+  sql_print_error("NDB_SHARE: %s already exists  use_count=%d."
+                  " Moving away for safety, but possible memleak.",
+                  share->key, share->use_count);
+  dbug_print_open_tables();
+
+  /*
+    This is probably an error.  We can however save the situation
+    at the cost of a possible mem leak, by "renaming" the share
+    - First remove from hash
+  */
+  hash_delete(&ndbcluster_open_tables, (byte*) share);
+
+  /*
+    now give it a new name, just a running number
+    if space is not enough allocate some more
+  */
+  {
+    const uint min_key_length= 10;
+    if (share->key_length < min_key_length)
+    {
+      share->key= alloc_root(&share->mem_root, min_key_length + 1);
+      share->key_length= min_key_length;
+    }
+    share->key_length=
+      my_snprintf(share->key, min_key_length + 1, "#leak%d",
+                  trailing_share_id++);
+  }
+  /* Keep it for possible the future trailing free */
+  my_hash_insert(&ndbcluster_open_tables, (byte*) share);
+
+  DBUG_RETURN(0);
+}
+
+/*
+  Rename share is used during rename table.
+*/
+static int rename_share(NDB_SHARE *share, const char *new_key, bool have_lock)
+{
+  NDB_SHARE *tmp;
+  pthread_mutex_lock(&ndbcluster_mutex);
+  uint new_length= (uint) strlen(new_key);
+  DBUG_PRINT("rename_share", ("old_key: %s  old__length: %d",
+                              share->key, share->key_length));
+  if ((tmp= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
+                                     (byte*) new_key, new_length)))
+    handle_trailing_share(tmp, have_lock);
+
+  /* remove the share from hash */
+  hash_delete(&ndbcluster_open_tables, (byte*) share);
+  dbug_print_open_tables();
+
+  /* save old stuff if insert should fail */
+  uint old_length= share->key_length;
+  char *old_key= share->key;
+
+  /*
+    now allocate and set the new key, db etc
+    enough space for key, db, and table_name
+  */
+  share->key= alloc_root(&share->mem_root, 2 * (new_length + 1));
+  strmov(share->key, new_key);
+  share->key_length= new_length;
+
+  if (my_hash_insert(&ndbcluster_open_tables, (byte*) share))
+  {
+    // ToDo free the allocated stuff above?
+    DBUG_PRINT("error", ("rename_share: my_hash_insert %s failed",
+                         share->key));
+    share->key= old_key;
+    share->key_length= old_length;
+    if (my_hash_insert(&ndbcluster_open_tables, (byte*) share))
+    {
+      sql_print_error("rename_share: failed to recover %s", share->key);
+      DBUG_PRINT("error", ("rename_share: my_hash_insert %s failed",
+                           share->key));
+    }
+    dbug_print_open_tables();
+    pthread_mutex_unlock(&ndbcluster_mutex);
+    return -1;
+  }
+  dbug_print_open_tables();
+
+  share->db= share->key + new_length + 1;
+  ha_ndbcluster::set_dbname(new_key, share->db);
+  share->table_name= share->db + strlen(share->db) + 1;
+  ha_ndbcluster::set_tabname(new_key, share->table_name);
+
+  DBUG_PRINT("rename_share",
+             ("0x%lx key: %s  key_length: %d",
+              share, share->key, share->key_length));
+  DBUG_PRINT("rename_share",
+             ("db.tablename: %s.%s  use_count: %d  commit_count: %d",
+              share->db, share->table_name,
+              share->use_count, share->commit_count));
+  DBUG_PRINT("rename_share",
+             ("table->s->db.table_name: %s.%s",
+              share->table->s->db.str, share->table->s->table_name.str));
+
+  if (share->op == 0)
+  {
+    share->table->s->db.str= share->db;
+    share->table->s->db.length= strlen(share->db);
+    share->table->s->table_name.str= share->table_name;
+    share->table->s->table_name.length= strlen(share->table_name);
+  }
+  /* else rename will be handled when the ALTER event comes */
+  share->old_names= old_key;
+  // ToDo free old_names after ALTER EVENT
+
+  pthread_mutex_unlock(&ndbcluster_mutex);
+  return 0;
+}
+#endif
+
 /*
   Increase refcount on existing share.
   Always returns share and cannot fail.
 */
-static NDB_SHARE *get_share(NDB_SHARE *share)
+NDB_SHARE *ndbcluster_get_share(NDB_SHARE *share)
 {
   pthread_mutex_lock(&ndbcluster_mutex);
   share->use_count++;
@@ -5786,9 +6161,12 @@
 
   have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken
 */
-static NDB_SHARE *get_share(const char *key, bool create_if_not_exists,
-                            bool have_lock)
+NDB_SHARE *ndbcluster_get_share(const char *key, bool create_if_not_exists,
+                                bool have_lock)
 {
+  DBUG_ENTER("get_share");
+  DBUG_PRINT("info", ("get_share: key %s", key));
+  THD *thd= current_thd;
   NDB_SHARE *share;
   if (!have_lock)
     pthread_mutex_lock(&ndbcluster_mutex);
@@ -5834,6 +6212,9 @@
       ha_ndbcluster::set_dbname(key, share->db);
       share->table_name= share->db + strlen(share->db) + 1;
       ha_ndbcluster::set_tabname(key, share->table_name);
+#ifdef HAVE_NDB_BINLOG
+      ndbcluster_binlog_init_share(share);
+#endif
       *root_ptr= old_root;
     }
     else
@@ -5861,7 +6242,7 @@
   return share;
 }
 
-static void real_free_share(NDB_SHARE **share)
+void ndbcluster_real_free_share(NDB_SHARE **share)
 {
   DBUG_PRINT("real_free_share",
              ("0x%lx key: %s  key_length: %d",
@@ -5876,6 +6257,26 @@
   pthread_mutex_destroy(&(*share)->mutex);
   free_root(&(*share)->mem_root, MYF(0));
 
+#ifdef HAVE_NDB_BINLOG
+  if ((*share)->table)
+  {
+    closefrm((*share)->table, 0);
+#if 0 // todo ?
+    free_root(&(*share)->table->mem_root, MYF(0));
+#endif
+
+#ifndef DBUG_OFF
+    bzero((gptr)(*share)->table_share, sizeof(*(*share)->table_share));
+    bzero((gptr)(*share)->table, sizeof(*(*share)->table));
+#endif
+    my_free((gptr) (*share)->table_share, MYF(0));
+    my_free((gptr) (*share)->table, MYF(0));
+#ifndef DBUG_OFF
+    (*share)->table_share= 0;
+    (*share)->table= 0;
+#endif
+  }
+#endif
   my_free((gptr) *share, MYF(0));
   *share= 0;
 
@@ -5888,7 +6289,7 @@
 
   have_lock == TRUE, pthread_mutex_lock(&ndbcluster_mutex) already taken
 */
-static void free_share(NDB_SHARE **share, bool have_lock)
+void ndbcluster_free_share(NDB_SHARE **share, bool have_lock)
 {
   if (!have_lock)
     pthread_mutex_lock(&ndbcluster_mutex);
@@ -5914,7 +6315,6 @@
 }
 
 
-
 /*
   Internal representation of the frm blob
    
@@ -6588,7 +6988,7 @@
     Wait for cluster to start
   */
   pthread_mutex_lock(&LOCK_ndb_util_thread);
-  while (!ndb_cluster_node_id)
+  while (!ndb_cluster_node_id && (ndbcluster_hton.slot != ~(uint)0))
   {
     /* ndb not connected yet */
     set_timespec(abstime, 1);
@@ -6603,13 +7003,35 @@
   }
   pthread_mutex_unlock(&LOCK_ndb_util_thread);
 
+  {
+    Thd_ndb *thd_ndb;
+    if (!(thd_ndb= ha_ndbcluster::seize_thd_ndb()))
+    {
+      sql_print_error("Could not allocate Thd_ndb object");
+      goto ndb_util_thread_end;
+    }
+    set_thd_ndb(thd, thd_ndb);
+    thd_ndb->options|= TNO_NO_LOG_SCHEMA_OP;
+  }
+
+#ifdef HAVE_NDB_BINLOG
+  /* create tables needed by the replication */
+  ndbcluster_setup_binlog_table_shares(thd);
+#else
   /*
     Get all table definitions from the storage node
   */
   ndbcluster_find_all_files(thd);
+#endif
 
   ndbcluster_util_inited= 1;
 
+#ifdef HAVE_NDB_BINLOG
+  /* If running, signal injector thread that all is setup */
+  if (ndb_binlog_thread_running > 0)
+    pthread_cond_signal(&injector_cond);
+#endif
+
   set_timespec(abstime, 0);
   for (;!abort_loop;)
   {
@@ -6626,6 +7048,15 @@
     if (abort_loop)
       break; /* Shutting down server */
 
+#ifdef HAVE_NDB_BINLOG
+    /*
+      Check that the apply_status_share and schema_share has been created.
+      If not try to create it
+    */
+    if (!apply_status_share || !schema_share)
+      ndbcluster_setup_binlog_table_shares(thd);
+#endif
+
     if (ndb_cache_check_time == 0)
     {
       /* Wake up in 1 second to check if value has changed */
@@ -6639,6 +7070,12 @@
     for (uint i= 0; i < ndbcluster_open_tables.records; i++)
     {
       share= (NDB_SHARE *)hash_element(&ndbcluster_open_tables, i);
+#ifdef HAVE_NDB_BINLOG
+      if ((share->use_count - (int) (share->op != 0) - (int) (share->op != 0))
+          <= 0)
+        continue; // injector thread is the only user, skip statistics
+      share->util_lock= current_thd; // Mark that util thread has lock
+#endif /* HAVE_NDB_BINLOG */
       share->use_count++; /* Make sure the table can't be closed */
       DBUG_PRINT("ndb_util_thread",
                  ("Found open table[%d]: %s, use_count: %d",
@@ -6653,6 +7090,17 @@
     List_iterator_fast<NDB_SHARE> it(util_open_tables);
     while ((share= it++))
     {
+#ifdef HAVE_NDB_BINLOG
+      if ((share->use_count - (int) (share->op != 0) - (int) (share->op != 0))
+          <= 1)
+      {
+        /*
+          Util thread and injector thread is the only user, skip statistics
+	*/
+        free_share(&share);
+        continue;
+      }
+#endif /* HAVE_NDB_BINLOG */
       DBUG_PRINT("ndb_util_thread",
                  ("Fetching commit count for: %s",
                   share->key));
@@ -6714,6 +7162,7 @@
     }
   }
 ndb_util_thread_end:
+  sql_print_information("Stopping Cluster Utility thread");
   net_end(&thd->net);
   thd->cleanup();
   delete thd;
@@ -8069,7 +8518,20 @@
   {
     DBUG_RETURN(FALSE);
   }
-  
+
+  update_status_variables(g_ndb_cluster_connection);
+  my_snprintf(buf, sizeof(buf),
+              "cluster_node_id=%u, "
+              "connected_host=%s, "
+              "connected_port=%u, "
+              "number_of_storage_nodes=%u",
+              ndb_cluster_node_id,
+              ndb_connected_host,
+              ndb_connected_port,
+              ndb_number_of_storage_nodes);
+  if (stat_print(thd, ndbcluster_hton.name, "connection", buf))
+    DBUG_RETURN(TRUE);
+
   if (get_thd_ndb(thd) && get_thd_ndb(thd)->ndb)
   {
     Ndb* ndb= (get_thd_ndb(thd))->ndb;
@@ -8085,10 +8547,13 @@
         DBUG_RETURN(TRUE);
     }
   }
-  send_eof(thd);
-  
+#ifdef HAVE_NDB_BINLOG
+  ndbcluster_show_status_binlog(thd, stat_print, stat_type);
+#endif
+
   DBUG_RETURN(FALSE);
 }
+
 
 /*
   Create a table in NDB Cluster

--- 1.150/sql/set_var.cc	2005-12-29 20:48:13 +01:00
+++ 1.151/sql/set_var.cc	2006-01-05 11:46:13 +01:00
@@ -3054,11 +3054,12 @@
 
   if (var->value->result_type() == STRING_RESULT)
   {
-    enum db_type db_type;
+    LEX_STRING name;
+    handlerton *db_type;
     if (!(res=var->value->val_str(&str)) ||
-	!(var->save_result.ulong_value=
-          (ulong) (db_type= ha_resolve_by_name(res->ptr(), res->length()))) ||
-        ha_checktype(thd, db_type, 1, 0) != db_type)
+        !(name.str= (char *)res->ptr()) || !(name.length= res->length()) ||
+	!(var->save_result.hton= db_type= ha_resolve_by_name(thd, &name)) ||
+        ha_checktype(thd, ha_legacy_type(db_type), 1, 0) != db_type)
     {
       value= res ? res->c_ptr() : "NULL";
       goto err;
@@ -3076,29 +3077,28 @@
 byte *sys_var_thd_storage_engine::value_ptr(THD *thd, enum_var_type type,
 					    LEX_STRING *base)
 {
-  ulong val;
-  val= ((type == OPT_GLOBAL) ? global_system_variables.*offset :
-        thd->variables.*offset);
-  const char *table_type= ha_get_storage_engine((enum db_type)val);
-  return (byte *) table_type;
+  handlerton *val;
+  val= (type == OPT_GLOBAL) ? global_system_variables.*offset :
+        thd->variables.*offset;
+  return (byte *) val->name;
 }
 
 
 void sys_var_thd_storage_engine::set_default(THD *thd, enum_var_type type)
 {
   if (type == OPT_GLOBAL)
-    global_system_variables.*offset= (ulong) DB_TYPE_MYISAM;
+    global_system_variables.*offset= &myisam_hton;
   else
-    thd->variables.*offset= (ulong) (global_system_variables.*offset);
+    thd->variables.*offset= global_system_variables.*offset;
 }
 
 
 bool sys_var_thd_storage_engine::update(THD *thd, set_var *var)
 {
-  if (var->type == OPT_GLOBAL)
-    global_system_variables.*offset= var->save_result.ulong_value;
-  else
-    thd->variables.*offset= var->save_result.ulong_value;
+  handlerton **value= &(global_system_variables.*offset);
+  if (var->type != OPT_GLOBAL)
+    value= &(thd->variables.*offset);
+  *value= var->save_result.hton;
   return 0;
 }
 
Thread
bk commit into 5.1 tree (tomas:1.1992)tomas5 Jan