List:Internals« Previous MessageNext Message »
From:tomas Date:May 9 2005 5:11am
Subject:bk commit into 5.1 tree (tomas:1.1830)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet
  1.1830 05/05/09 05:10:56 tomas@stripped +12 -0
      added "COM_DAEMON" to be used for e.g. injector thread
      fixed some ndb rpl tests to become more flexible
     added correct setup/stop of binlog thread with ndb table discovery at startime, and
cleanup
      fixed drop database to handle event dropping correctly
      fixed som time issues at startup beteen the "ndb" threads
     added a state to ndb share to check if there has been a drop table or not
  s    removed apply status creation from slave thread

  sql/sql_parse.cc
    1.430 05/05/09 05:10:43 tomas@stripped +3 -1
    added "COM_DAEMON" to be used for e.g. injector thread

  sql/slave.cc
    1.248 05/05/09 05:10:42 tomas@stripped +0 -5
    removed apply status creation from slave thread

  sql/rpl_injector.cc
    1.14 05/05/09 05:10:42 tomas@stripped +4 -2
    fixed link error with gcc

  sql/ha_ndbcluster.h
    1.95 05/05/09 05:10:42 tomas@stripped +11 -0
    added a state to ndb share to check if there has been a drop table or not

  sql/ha_ndbcluster.cc
    1.236 05/05/09 05:10:41 tomas@stripped +390 -122
    added correct setup/stop of binlog thread with ndb table discovery at startime, and
cleanup
    fixed drop database to handle event dropping correctly
    fixed som time issues at startup beteen the "ndb" threads

  mysql-test/t/rpl_ndb_sync.test
    1.2 05/05/09 05:10:41 tomas@stripped +3 -1
    fixed some ndb rpl tests to become more flexible

  mysql-test/t/rpl_ndb_idempotent.test
    1.5 05/05/09 05:10:41 tomas@stripped +2 -2
    fixed some ndb rpl tests to become more flexible

  mysql-test/r/rpl_ndb_sync.result
    1.2 05/05/09 05:10:41 tomas@stripped +3 -3
    fixed some ndb rpl tests to become more flexible

  mysql-test/r/rpl_ndb_idempotent.result
    1.6 05/05/09 05:10:41 tomas@stripped +2 -2
    fixed some ndb rpl tests to become more flexible

  mysql-test/mysql-test-run.sh
    1.269 05/05/09 05:10:41 tomas@stripped +1 -1
    fixed bug in NDB_EXTRA_TEST check

  mysql-test/include/have_ndb_extra.inc
    1.2 05/05/09 05:10:41 tomas@stripped +2 -1
    fixed bug in NDB_EXTRA_TEST check

  include/mysql_com.h
    1.96 05/05/09 05:10:41 tomas@stripped +1 -1
    added "COM_DAEMON" to be used for e.g. injector thread

# This is a BitKeeper patch.  What follows are the unified diffs for the
# set of deltas contained in the patch.  The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User:	tomas
# Host:	poseidon.ndb.mysql.com
# Root:	/home/tomas/wl2325

--- 1.95/include/mysql_com.h	2005-02-28 15:22:22 +01:00
+++ 1.96/include/mysql_com.h	2005-05-09 05:10:41 +02:00
@@ -49,7 +49,7 @@
   COM_TIME, COM_DELAYED_INSERT, COM_CHANGE_USER, COM_BINLOG_DUMP,
   COM_TABLE_DUMP, COM_CONNECT_OUT, COM_REGISTER_SLAVE,
   COM_PREPARE, COM_EXECUTE, COM_LONG_DATA, COM_CLOSE_STMT,
-  COM_RESET_STMT, COM_SET_OPTION, COM_FETCH,
+  COM_RESET_STMT, COM_SET_OPTION, COM_FETCH, COM_DAEMON,
   /* don't forget to update const char *command_name[] in sql_parse.cc */
 
   /* Must be last */

--- 1.268/mysql-test/mysql-test-run.sh	2005-05-04 02:08:26 +02:00
+++ 1.269/mysql-test/mysql-test-run.sh	2005-05-09 05:10:41 +02:00
@@ -293,7 +293,7 @@
       USE_NDBCLUSTER_SLAVE="--ndbcluster" ;
       USE_RUNNING_NDBCLUSTER_SLAVE=`$ECHO "$1" | $SED -e
"s;--ndb-connectstring-slave=;;"` ;;
     --ndb-extra-test)
-#      NDBCLUSTER_EXTRA_OPTS=" "
+      NDBCLUSTER_EXTRA_OPTS=" "
       NDB_EXTRA_TEST=1 ;
       ;;
     --ndb_mgm-extra-opts=*)

--- 1.247/sql/slave.cc	2005-04-25 13:29:23 +02:00
+++ 1.248/sql/slave.cc	2005-05-09 05:10:42 +02:00
@@ -3266,10 +3266,6 @@
 
 /* Slave SQL Thread entry point */
 
-#ifdef HAVE_NDBCLUSTER_DB
-int ndbcluster_create_apply_status_table(THD *thd);
-#endif
-
 extern "C" pthread_handler_decl(handle_slave_sql,arg)
 {
   THD *thd;			/* needs to be first for thread_stack */
@@ -3409,7 +3405,6 @@
 #ifdef HAVE_NDBCLUSTER_DB
   if (have_ndbcluster == SHOW_OPTION_YES)
   {
-    ndbcluster_create_apply_status_table(thd);
     thd->variables.ndb_use_exact_count= 0;
   }
 #endif

--- 1.429/sql/sql_parse.cc	2005-05-03 12:45:07 +02:00
+++ 1.430/sql/sql_parse.cc	2005-05-09 05:10:43 +02:00
@@ -86,7 +86,7 @@
   "Connect","Kill","Debug","Ping","Time","Delayed insert","Change user",
   "Binlog Dump","Table Dump",  "Connect Out", "Register Slave",
   "Prepare", "Execute", "Long Data", "Close stmt",
-  "Reset stmt", "Set option", "Fetch",
+  "Reset stmt", "Set option", "Fetch", "Daemon",
   "Error"					// Last command number
 };
 
@@ -6314,6 +6314,8 @@
   I_List_iterator<THD> it(threads);
   while ((tmp=it++))
   {
+    if (tmp->command == COM_DAEMON)
+      continue;
     if (tmp->thread_id == id)
     {
       pthread_mutex_lock(&tmp->LOCK_delete);	// Lock from delete

--- 1.235/sql/ha_ndbcluster.cc	2005-05-06 14:34:21 +02:00
+++ 1.236/sql/ha_ndbcluster.cc	2005-05-09 05:10:41 +02:00
@@ -89,7 +89,8 @@
 typedef NdbDictionary::Dictionary  NDBDICT;
 typedef NdbDictionary::Event  NDBEVENT;
 
-bool ndbcluster_inited= FALSE;
+static int ndbcluster_inited= 0;
+static int ndbcluster_util_inited= 0;
 
 static Ndb* g_ndb= NULL;
 static Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
@@ -123,7 +124,7 @@
 {
   if (_db_on_)
   {
-    for (int j= 0; j < table->s->fields; j++)
+    for (uint j= 0; j < table->s->fields; j++)
     {
       char buf[1024];
       int pos= 0;
@@ -171,7 +172,7 @@
 static int replication_handle_drop_table(Ndb *ndb, const char* event_name,
 					 NDB_SHARE *share);
 inline void ndb_rep_event_name(String *event_name, const char* db, const char* tbl);
-
+static int ndbcluster_create_apply_status_table(THD *thd);
 #ifndef DBUG_OFF
 static void dbug_print_table(const char *info, TABLE *table);
 #endif
@@ -234,6 +235,7 @@
 static ulonglong ndb_latest_applied_binlog_epoch= 0;
 static ulonglong ndb_latest_handled_binlog_epoch= 0;
 static ulonglong ndb_latest_received_binlog_epoch= 0;
+/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */
 extern Uint64 g_latest_trans_gci;
 
 /*
@@ -1005,6 +1007,20 @@
       of table accessed in NDB
 */
 
+static int cmp_frm(const NDBTAB *ndbtab, const void *pack_data, uint pack_length)
+{
+  DBUG_ENTER("cmp_frm");
+  /*
+   * Compare FrmData in NDB with frm file from disk.
+   */
+  if ((pack_length != ndbtab->getFrmLength()) || 
+      (memcmp(pack_data, ndbtab->getFrmData(), pack_length)))
+  {
+    DBUG_RETURN(1);
+  }
+  DBUG_RETURN(0);
+}
+
 int ha_ndbcluster::get_metadata(const char *path)
 {
   Ndb *ndb= get_ndb();
@@ -1042,8 +1058,7 @@
       DBUG_RETURN(1);
     }
     
-    if ((pack_length != tab->getFrmLength()) || 
-        (memcmp(pack_data, tab->getFrmData(), pack_length)))
+    if (cmp_frm(tab, pack_data, pack_length))
     {
       if (!invalidating_ndb_table)
       {
@@ -4007,6 +4022,73 @@
   }
 }
 
+#ifdef HAVE_REPLICATION
+static int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
+					  const char* db, const char *table_name,
+					  int do_binlog)
+{
+  DBUG_ENTER("ndbcluster_create_binlog_setup");
+  pthread_mutex_lock(&ndbcluster_mutex);
+  NDB_SHARE *share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
+					     (byte*) key, strlen(key));
+  pthread_mutex_unlock(&ndbcluster_mutex);
+
+  if (share)
+    handle_trailing_share(share);
+  
+  /**
+   * Share is needed to hold replication information
+   */
+  if (!(share=get_share(key,true)))
+  {
+    sql_print_error("NDB Binlog: "
+		    "allocating table share for %s failed", key);
+    // ToDo: return error here?;
+  }
+
+  while(share && do_binlog)
+  {
+    // ToDo make sanity check of share so that the table is actually the same
+    // I.e. we need to do openfrm in this case
+
+    // Create the event in NDB
+    NDBDICT *dict= ndb->getDictionary();
+    const NDBTAB *ndbtab= dict->getTable(table_name);
+    String event_name(INJECTOR_EVENT_LEN);
+    ndb_rep_event_name(&event_name,db,table_name);
+    /* event should have been created by someone else, but let's make sure */ 
+    if (!dict->getEvent(event_name.c_ptr()))
+    {
+      ndb->setDatabaseName(db);
+      if (ndbcluster_create_event(ndb,ndbtab,event_name.c_ptr()))
+      {
+	sql_print_error("NDB Binlog: "
+			"FAILED CREATE (DISCOVER) TABLE Event: %s",
+			event_name.c_ptr());
+	break;
+      }
+      sql_print_information("NDB Binlog: "
+			    "CREATE (DISCOVER) TABLE Event: %s",
+			    event_name.c_ptr());
+    }
+    else
+      sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
+			    event_name.c_ptr());
+    if (ndbcluster_create_event_ops(share, ndbtab,
+				    event_name.c_ptr()) < 0)
+    {
+      sql_print_error("NDB Binlog:"
+		      "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
+		      event_name.c_ptr());
+      /* TODO: Handle the error? */
+      DBUG_RETURN(0);
+    }
+    DBUG_RETURN(0);
+  }
+  DBUG_RETURN(-1);
+}
+#endif /* HAVE_REPLICATION */
+
 int ha_ndbcluster::create(const char *name, 
                           TABLE *form, 
                           HA_CREATE_INFO *info)
@@ -4034,65 +4116,10 @@
     if ((my_errno= write_ndb_file()))
       DBUG_RETURN(my_errno);
 #ifdef HAVE_REPLICATION
-    while(ndb_injector_thread_running > 0 &&
-	  m_tabname[0] != '#')
-    {
-      if ((my_errno= check_ndb_connection()))
-	DBUG_RETURN(my_errno);
-      /**
-       * Share is needed to hold replication information
-       */
-      NDB_SHARE *share;
-      if (!(share=get_share(name2,true)))
-      {
-	sql_print_error("NDB Binlog: "
-			"allocating table share for %s failed", name2);
-	break;
-	// ToDo: return error here?;
-      }
-
-      // ToDo make sanity check of share so that the table is actually the same
-      // I.e. we need to do openfrm in this case
-
-      // Create the event in NDB     
-      Ndb *ndb= get_ndb();
-      NDBDICT *dict= ndb->getDictionary();
-      const NDBTAB *ndbtab= dict->getTable(m_tabname);
-      String event_name(INJECTOR_EVENT_LEN);
-      ndb_rep_event_name(&event_name,m_dbname,m_tabname);
-      /* event should have been created by someone else, but let's make sure */ 
-      if (!dict->getEvent(event_name.c_ptr()))
-      {
-	ndb->setDatabaseName(m_dbname);
-	if (ndbcluster_create_event(ndb,ndbtab,event_name.c_ptr()))
-	{
-	  sql_print_error("NDB Binlog: "
-			  "FAILED CREATE (DISCOVER) TABLE Event: %s",
-			  event_name.c_ptr());
-	  free_share(&share);
-	  break;
-	}
-	sql_print_information("NDB Binlog: "
-			      "CREATE (DISCOVER) TABLE Event: %s",
-			      event_name.c_ptr());
-      }
-      else
-	sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
-			      event_name.c_ptr());
-      if (ndbcluster_create_event_ops(share, ndbtab,
-				      event_name.c_ptr()) < 0)
-      {
-	sql_print_error("NDB Binlog:"
-			"FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
-			event_name.c_ptr());
-	/* TODO: Handle the error? */
-	free_share(&share);
-	DBUG_RETURN(0);
-      }
-      // keep lock on share, injector thread now has responsability
-      DBUG_RETURN(0);
-    }
-#endif
+    ndbcluster_create_binlog_setup(get_ndb(), name2, m_dbname, m_tabname,
+				   ndb_injector_thread_running > 0 &&
+				   m_tabname[0] != '#');
+#endif /* HAVE_REPLICATION */
     DBUG_RETURN(my_errno);
   } // if (create_from_engine)
 
@@ -4465,31 +4492,40 @@
 
  */
 
-int ha_ndbcluster::delete_table(const char *name)
-{
-  DBUG_ENTER("ha_ndbcluster::delete_table");
-  DBUG_PRINT("enter", ("name: %s", name));
-  set_dbname(name);
-  set_tabname(name);
-
-  if (check_ndb_connection())
-    DBUG_RETURN(HA_ERR_NO_CONNECTION);
-
-  /* Call ancestor function to delete .ndb file */
-  handler::delete_table(name);
+/* static version which does not need a handler */
 
+int
+ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
+			    const char *path,
+			    const char *db,
+			    const char *table_name)
+{
+  DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
+  NdbDictionary::Dictionary *dict= ndb->getDictionary();
 #ifdef HAVE_REPLICATION
   NDB_SHARE *share= 0;
-  share= get_share(name,false);
+  share= get_share(path,false);
 #endif
 
   /* Drop the table from NDB */
-  int r= drop_table();
+  
+  int r;
+  if (h)
+  {
+    r= h->drop_table();
+  }
+  else
+  {
+    ndb->setDatabaseName(db);
+    r= dict->dropTable(table_name);
+  }
+
   if (r)
   {
 #ifdef HAVE_REPLICATION
     if (share)
     {
+      share->state= NSS_DROPPED;
       free_share(&share);
       free_share(&share);
     }
@@ -4498,21 +4534,21 @@
   }
 
 #ifdef HAVE_REPLICATION
-  Ndb *ndb= get_ndb();
   if ( share && share->op)
   {
-    ndb->getDictionary()->forceGCPWait();
+    dict->forceGCPWait();
   }
 
-  if ( m_tabname[0] != '#' ) // FIXME tabname hack
+  if ( table_name[0] != '#' ) // FIXME tabname hack
   {
     String event_name(INJECTOR_EVENT_LEN);
-    ndb_rep_event_name(&event_name,name+sizeof(share_prefix)-1,0);
+    ndb_rep_event_name(&event_name,path+sizeof(share_prefix)-1,0);
     replication_handle_drop_table(ndb,event_name.c_ptr(),share);
   }
 
   if (share)
   {
+    share->state= NSS_DROPPED;
     free_share(&share);
     free_share(&share);
   }
@@ -4520,6 +4556,22 @@
   DBUG_RETURN(0);
 }
 
+int ha_ndbcluster::delete_table(const char *name)
+{
+  DBUG_ENTER("ha_ndbcluster::delete_table");
+  DBUG_PRINT("enter", ("name: %s", name));
+  set_dbname(name);
+  set_tabname(name);
+
+  if (check_ndb_connection())
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+  /* Call ancestor function to delete .ndb file */
+  handler::delete_table(name);
+
+  DBUG_RETURN(delete_table(this,get_ndb(),name,m_dbname,m_tabname));
+}
+
 
 /*
   Drop table in NDB Cluster
@@ -4938,14 +4990,14 @@
     drop_list.push_back(thd->strdup(t.name));
   }
   // Drop any tables belonging to database
+  char full_path[FN_REFLEN];
+  char *t= strxnmov(full_path,FN_REFLEN,share_prefix,dbname,"/",NullS);
   ndb->setDatabaseName(dbname);
   List_iterator_fast<char> it(drop_list);
   while ((tabname=it++))
   {
-#ifdef HAVE_REPLICATION
-    // ToDo free and associated shares
-#endif
-    if (dict->dropTable(tabname))
+    strxnmov(t,FN_REFLEN-(t-full_path),tabname,NullS);
+    if (ha_ndbcluster::delete_table(0, ndb, full_path, dbname, tabname))
     {
       const NdbError err= dict->getNdbError();
       if (err.code != 709)
@@ -4959,6 +5011,81 @@
 }
 
 
+static int ndbcluster_find_all_files(THD *thd)
+{
+  DBUG_ENTER("ndbcluster_find_all_files");
+  Ndb* ndb;
+  char key[FN_REFLEN];
+  NdbDictionary::Dictionary::List list;
+
+  if (!(ndb= check_ndb_in_thd(thd)))
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+  NDBDICT *dict= ndb->getDictionary();
+  if (dict->listObjects(list, 
+                        NdbDictionary::Object::UserTable) != 0)
+    ERR_RETURN(dict->getNdbError());
+
+  for (uint i= 0 ; i < list.count ; i++)
+  {
+    NdbDictionary::Dictionary::List::Element& t= list.elements[i];
+    DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));
+    ndb->setDatabaseName(t.database);
+    const NDBTAB *ndbtab;
+    if (!(ndbtab= dict->getTable(t.name)))
+    {
+      sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
+		      t.database,t.name,
+		      dict->getNdbError().code,
+		      dict->getNdbError().message);
+      continue;
+    }
+
+    strxnmov(key, FN_LEN, mysql_data_home,"/",t.database,"/",t.name, NullS);
+    const void *data, *pack_data;
+    uint length, pack_length;
+    int discover= 0;
+    if (readfrm(key, &data, &length) ||
+        packfrm(data, length, &pack_data, &pack_length))
+    {
+      discover= 1;
+      sql_print_information("NDB: missing frm for %s.%s, discovering...",
+			    t.database,t.name);
+    }
+    else if (cmp_frm(ndbtab, pack_data, pack_length))
+    {
+      discover= 1;
+      sql_print_information("NDB: mismatch in frm for %s.%s, discovering...",
+			    t.database,t.name);
+    }
+    my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
+    my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
+
+    if (discover)
+    {
+      // ToDo database needs to be created if missing
+      pthread_mutex_lock(&LOCK_open);
+      if (ha_create_table_from_engine(thd, t.database, t.name, TRUE))
+      {
+	; // ToDo handle error?
+      }
+      pthread_mutex_unlock(&LOCK_open);
+    }
+    else
+    {
+#ifdef HAVE_REPLICATION
+      /*
+       * set up replication for this table
+       */
+      ndbcluster_create_binlog_setup(ndb, key, t.database, t.name,
+				     ndb_injector_thread_running > 0 &&
+				     t.name[0] != '#');
+#endif
+    }
+  }
+  DBUG_RETURN(0);
+}
+
 int ndbcluster_find_files(THD *thd,const char *db,const char *path,
                           const char *wild, bool dir, List<char> *files)
 {
@@ -5001,7 +5128,7 @@
   for (i= 0 ; i < list.count ; i++)
   {
     NdbDictionary::Dictionary::List::Element& t= list.elements[i];
-    DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));     
+    DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));
 
     // Add only tables that belongs to db
     if (my_strcasecmp(system_charset_info, t.database, db))
@@ -5126,12 +5253,7 @@
 static int connect_callback()
 {
   update_status_variables(g_ndb_cluster_connection);
-#ifdef HAVE_REPLICATION
-  if (ndb_injector_thread_running > 0)
-  {
-    pthread_cond_signal(&injector_cond);
-  }
-#endif /* HAVE_REPLICATION */
+  pthread_cond_signal(&COND_ndb_util_thread);
   return 0;
 }
 
@@ -5260,6 +5382,46 @@
   (void) pthread_cond_signal(&COND_ndb_util_thread);
   (void) pthread_mutex_unlock(&LOCK_ndb_util_thread);
 
+#ifdef HAVE_REPLICATION
+  /* remove all shares */
+  {
+    pthread_mutex_lock(&ndbcluster_mutex);
+    for (uint i= 0; i < ndbcluster_open_tables.records; i++)
+    {
+      NDB_SHARE *share= (NDB_SHARE *)hash_element(&ndbcluster_open_tables, i);
+      if (share->table)
+	DBUG_PRINT("share",
+		   ("table->s->db.table_name: %s.%s",
+		    share->table->s->db, share->table->s->table_name));
+      if (share->state != NSS_DROPPED && !--share->use_count)
+      {
+	real_free_share(&share);
+      }
+      else
+      {
+	DBUG_PRINT("share",
+		   ("[%d] 0x%x key: %s key_length: %d",
+		    i, share, share->key, share->key_length));
+	DBUG_PRINT("share",
+		   ("db.tablename: %s.%s use_count: %d commit_count: %d",
+		    share->db, share->table_name,
+		    share->use_count, share->commit_count));
+      }
+    }
+    pthread_mutex_unlock(&ndbcluster_mutex);
+  }
+  /* wait for injector thread */
+  pthread_mutex_lock(&injector_mutex);
+  while (ndb_injector_thread_running > 0)
+  {
+    struct timespec abstime;
+    set_timespec(abstime, 1);
+    pthread_cond_timedwait(&injector_cond,&injector_mutex,&abstime);
+  }
+  pthread_mutex_unlock(&injector_mutex);
+#endif
+
+#if 1
   if(g_ndb)
     delete g_ndb;
   g_ndb= NULL;
@@ -5271,7 +5433,9 @@
   pthread_mutex_destroy(&ndbcluster_mutex);
   pthread_mutex_destroy(&LOCK_ndb_util_thread);
   pthread_cond_destroy(&COND_ndb_util_thread);
+#endif
   ndbcluster_inited= 0;
+  ndbcluster_util_inited= 0;
   DBUG_RETURN(0);
 }
 
@@ -5792,9 +5956,9 @@
 {
   static ulong trailing_share_id= 0;
 
-  DBUG_PRINT("error", ("rename_share: %s already exists, use_count=%d.",
+  DBUG_PRINT("error", ("NDB_SHARE: %s already exists, use_count=%d.",
 		       share->key, share->use_count));
-  sql_print_error("rename_share: %s already exists, use_count=%d."
+  sql_print_error("NDB_SHARE: %s already exists, use_count=%d."
 		  " Moving away for safety, but possible memleak.",
 		  share->key, share->use_count);
   dbug_print_open_tables();
@@ -6689,6 +6853,7 @@
   Ndb* ndb;
   int error= 0;
   struct timespec abstime;
+  List<NDB_SHARE> util_open_tables;
 
   my_thread_init();
   DBUG_ENTER("ndb_util_thread");
@@ -6709,10 +6874,68 @@
     delete ndb;
     DBUG_RETURN(NULL);
   }
+  thd->init_for_queries();
+  thd->version=refresh_version;
+  thd->set_time();
+  thd->host_or_ip= "";
+  thd->client_capabilities = 0;
+  my_net_init(&thd->net, 0);
+  thd->master_access= ~0;
+  thd->priv_user = 0;
+
+  /**
+   * wait for mysql server to start
+   */
+  pthread_mutex_lock(&LOCK_server_started);
+  while(!mysqld_server_started)
+    pthread_cond_wait(&COND_server_started,&LOCK_server_started);
+  pthread_mutex_unlock(&LOCK_server_started);
+
+  /**
+   * Wait for cluster to start
+   */
+  pthread_mutex_lock(&LOCK_ndb_util_thread);
+  while (!ndb_cluster_node_id)
+  {
+    /* ndb not connected yet */
+    set_timespec(abstime, 1);
+    pthread_cond_timedwait(&COND_ndb_util_thread,
+			   &LOCK_ndb_util_thread,
+			   &abstime);
+    if (abort_loop)
+    {
+      pthread_mutex_unlock(&LOCK_ndb_util_thread);
+      goto ndb_util_thread_end;
+    }
+  }
+  pthread_mutex_unlock(&LOCK_ndb_util_thread);
+
+  /**
+   * Get all table definitions from the storage node
+   */
+  ndbcluster_find_all_files(thd);
+
+#ifdef HAVE_REPLICATION
+  /**
+   * create tables needed by the replication
+   */
+  ndbcluster_create_apply_status_table(thd);
+#endif
+
+  ndbcluster_util_inited= 1;
+
+#ifdef HAVE_REPLICATION
+  /**
+   * If running, signal injector thread that all is setup
+   */
+  if (ndb_injector_thread_running > 0)
+  {
+    pthread_cond_signal(&injector_cond);
+  }
+#endif
 
-  List<NDB_SHARE> util_open_tables;
   set_timespec(abstime, 0);
-  for (;;)
+  for (;!abort_loop;)
   {
 
     pthread_mutex_lock(&LOCK_ndb_util_thread);
@@ -6747,7 +6970,7 @@
 #ifndef DBUG_OFF
       share->no_use_count_check= 1;
 #endif
-#endif
+#endif /* HAVE_REPLICATION */
       share->use_count++; /* Make sure the table can't be closed */
       DBUG_PRINT("ndb_util_thread",
 		 ("Found open table[%d]: %s, use_count: %d",
@@ -6769,7 +6992,7 @@
 	free_share(&share);
 	continue;
       }
-#endif
+#endif /* HAVE_REPLICATION */
       DBUG_PRINT("ndb_util_thread",
                  ("Fetching commit count for: %s",
                   share->key));
@@ -6830,7 +7053,7 @@
       abstime.tv_nsec-= 1000000000;
     }
   }
-
+ndb_util_thread_end:
   thd->cleanup();
   delete thd;
   delete ndb;
@@ -8027,22 +8250,58 @@
   return get_share("./" NDB_REP_DB "/" NDB_APPLY_TABLE,false);
 }
 
-int
+static int
 ndbcluster_create_apply_status_table(THD *thd)
 {
-  /**
-   * Note, updating this table schema must be reflected in ndb_restore
-   */
   char buf[1024], *end;
-  end=strmov(buf, "CREATE TABLE IF NOT EXISTS " NDB_REP_DB "." NDB_APPLY_TABLE
-	     " ( server_id INT UNSIGNED NOT NULL, epoch BIGINT UNSIGNED NOT NULL, "
-	     " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB");
 
+  /* Check if we already have the apply status table.
+   * If so it should have been discovered at startup
+   * and thus have a share
+   */
+  end= strxnmov(buf, FN_REFLEN, mysql_data_home,
+		"/" NDB_REP_DB "/" NDB_APPLY_TABLE,NullS);
+  pthread_mutex_lock(&ndbcluster_mutex);
+  void *tmp= hash_search(&ndbcluster_open_tables, (byte*) buf, end-buf);
+  pthread_mutex_unlock(&ndbcluster_mutex);
+  if ( tmp )
+    return 0;
+
+  /* save some thread variables as not to changes them */
   ulong save_query_length= thd->query_length;
   char *save_query= thd->query;
   ulong save_thread_id= thd->variables.pseudo_thread_id;
   NET save_net= thd->net;
 
+  /* Check if apply status table exists in MySQL "dictionary"
+   * if so, remove it since there is none in Ndb
+   */
+  {
+    TABLE_LIST table_list;
+    end= strxnmov(buf, FN_REFLEN, NDB_REP_DB,NullS);
+    end++;
+    strxnmov(end, FN_REFLEN, NDB_REP_DB,NullS);
+    bzero((char*) &table_list,sizeof(table_list));
+    table_list.db= (char*)buf;
+    table_list.alias= table_list.table_name= (char*)end;
+    pthread_mutex_lock(&LOCK_open);
+    (void)mysql_rm_table_part2(thd, &table_list,
+			       /* if_exists */ TRUE,
+			       /* drop_temporary */ FALSE,
+			       /* drop_view */ FALSE,
+			       /* dont_log_query*/ TRUE);
+    pthread_mutex_unlock(&LOCK_open);
+    /* Clear error message that is returned when table is deleted */
+    thd->clear_error();
+  }
+
+  /**
+   * Note, updating this table schema must be reflected in ndb_restore
+   */
+  end=strmov(buf, "CREATE TABLE IF NOT EXISTS " NDB_REP_DB "." NDB_APPLY_TABLE
+	     " ( server_id INT UNSIGNED NOT NULL, epoch BIGINT UNSIGNED NOT NULL, "
+	     " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB");
+
   bzero((char*)&thd->net,sizeof(NET));
   thd->query_length= end-buf;
   thd->query= buf;
@@ -8271,10 +8530,13 @@
 		    event_name,
 		    dict->getNdbError().code,
 		    dict->getNdbError().message);
-    push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-			ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
-			dict->getNdbError().code, dict->getNdbError().message,
-			"NDB");
+    if ( dict->getNdbError().code != 4710 )
+    {
+      push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+			  ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+			  dict->getNdbError().code, dict->getNdbError().message,
+			  "NDB");
+    }
     ndb_error= dict->getNdbError();
     DBUG_RETURN(-1);
   }
@@ -8736,8 +8998,11 @@
     pthread_exit(0);
     DBUG_RETURN(NULL);
   }
+
   thd->init_for_queries();
+  thd->command= COM_DAEMON;
   injector_thd= thd;
+
   /**
    * Set up ndb binlog
    */
@@ -8784,7 +9049,7 @@
   thd->proc_info= "Waiting for ndbcluster to start";
 
   pthread_mutex_lock(&injector_mutex);
-  while (!ndb_cluster_node_id)
+  while (!ndbcluster_util_inited)
   {
     /* ndb not connected yet */
     struct timespec abstime;
@@ -8798,12 +9063,6 @@
   }
   pthread_mutex_unlock(&injector_mutex);
 
-  thd->proc_info= "Waiting for mysqld to finish starting";
-  pthread_mutex_lock(&LOCK_server_started);
-  while(!mysqld_server_started)
-    pthread_cond_wait(&COND_server_started,&LOCK_server_started);
-  pthread_mutex_unlock(&LOCK_server_started);
-
   /**
    * Main NDB Injector loop
    */
@@ -8818,21 +9077,26 @@
     static char db[]="";
     thd->db=db;
     open_binlog_index(thd,&tables,&binlog_index);
-    ndbcluster_create_apply_status_table(thd);
     apply_status_share= ndbcluster_get_apply_status_share();
+    assert(apply_status_share);
     thd->db=db;
   }
-  for (;!(abort_loop || do_ndbcluster_binlog_close_connection);)
+  for (;!((abort_loop || do_ndbcluster_binlog_close_connection) &&
+	  ndb_latest_received_binlog_epoch >= g_latest_trans_gci);)
   {
     /**
      * now we don't want any events before next gci is complete
      */
     thd->proc_info= "Waiting for event from ndbcluster";
+    thd->set_time();
     
     int res= ndb->pollEvents(1000); // wait for event or 1000 ms
 
-    if (abort_loop || do_ndbcluster_binlog_close_connection)
+    if ( (abort_loop || do_ndbcluster_binlog_close_connection) &&
+	 ndb_latest_received_binlog_epoch >= g_latest_trans_gci )
+    {
       break; /* Shutting down server */
+    }
 
     if (binlog_index && binlog_index->s->version < refresh_version)
     {
@@ -8938,7 +9202,6 @@
   }
 err:
   //close_thread_tables(thd);
-  ndb_injector_thread_running= -1;
   pthread_mutex_lock(&injector_mutex);
   /* don't mess with the injector_ndb anymore from other threads */
   injector_ndb= 0;
@@ -8951,6 +9214,7 @@
   {
     free_share(&apply_status_share);
   }
+
   /* remove all event operations */
   if (ndb)
   {
@@ -8968,10 +9232,14 @@
     ndb= 0;
   }
 
+  ndb_injector_thread_running= -1;
+  (void) pthread_cond_signal(&injector_cond);
+
   thd->cleanup();
   delete thd;
   DBUG_PRINT("exit", ("ndb_injector_thread"));
   my_thread_end();
+
   pthread_exit(0);
   DBUG_RETURN(NULL);
 }

--- 1.94/sql/ha_ndbcluster.h	2005-05-02 17:06:31 +02:00
+++ 1.95/sql/ha_ndbcluster.h	2005-05-09 05:10:42 +02:00
@@ -60,6 +60,11 @@
 
 typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
 
+typedef enum {
+  NSS_INITIAL= 0,
+  NSS_DROPPED
+} NDB_SHARE_STATE;
+
 typedef struct st_ndbcluster_share {
   MEM_ROOT mem_root;
   THR_LOCK lock;
@@ -71,6 +76,7 @@
   uint commit_count_lock;
   ulonglong commit_count;
 #ifdef HAVE_REPLICATION
+  NDB_SHARE_STATE state;
 #ifndef DEBUG_OFF
   int no_use_count_check;
 #endif
@@ -527,7 +533,12 @@
                                      qc_engine_callback *engine_callback,
                                      ulonglong *engine_data);
 private:
+  friend int ndbcluster_drop_database(const char *path);
   int alter_table_name(const char *to);
+  static int delete_table(ha_ndbcluster *h, Ndb *ndb,
+			  const char *path,
+			  const char *db,
+			  const char *table_name);
   int drop_table();
   int create_index(const char *name, KEY *key_info, bool unique);
   int create_ordered_index(const char *name, KEY *key_info);

--- 1.13/sql/rpl_injector.cc	2005-04-26 09:05:04 +02:00
+++ 1.14/sql/rpl_injector.cc	2005-05-09 05:10:42 +02:00
@@ -98,11 +98,13 @@
 {
 }
 
+static injector *s_injector= 0;
 injector* injector::
 instance()
 {
-  static injector s_injector;
-  return &s_injector;		// "There can be only one [instance]"
+  if (s_injector == 0)
+    s_injector= new injector;
+  return s_injector;		// "There can be only one [instance]"
 }
 
 

--- 1.1/mysql-test/include/have_ndb_extra.inc	2005-05-04 12:20:33 +02:00
+++ 1.2/mysql-test/include/have_ndb_extra.inc	2005-05-09 05:10:41 +02:00
@@ -1 +1,2 @@
---exec test x$NDB_EXTRA_TEST = x1
+-- require r/have_ndb_extra.require
+eval select $NDB_EXTRA_TEST;

--- 1.5/mysql-test/r/rpl_ndb_idempotent.result	2005-04-29 18:10:42 +02:00
+++ 1.6/mysql-test/r/rpl_ndb_idempotent.result	2005-05-09 05:10:41 +02:00
@@ -34,14 +34,14 @@
 row4	D	4
 SHOW SLAVE STATUS;
 Slave_IO_State	Master_Host	Master_User	Master_Port	Connect_Retry	Master_Log_File	Read_Master_Log_Pos	Relay_Log_File	Relay_Log_Pos	Relay_Master_Log_File	Slave_IO_Running	Slave_SQL_Running	Replicate_Do_DB	Replicate_Ignore_DB	Replicate_Do_Table	Replicate_Ignore_Table	Replicate_Wild_Do_Table	Replicate_Wild_Ignore_Table	Last_Errno	Last_Error	Skip_Counter	Exec_Master_Log_Pos	Relay_Log_Space	Until_Condition	Until_Log_File	Until_Log_Pos	Master_SSL_Allowed	Master_SSL_CA_File	Master_SSL_CA_Path	Master_SSL_Cert	Master_SSL_Cipher	Master_SSL_Key	Seconds_Behind_Master
-#	127.0.0.1	root	MASTER_PORT	1	master-bin.000001	1040	#	#	master-bin.000001	Yes	Yes							0		0	1040	#	None		0	No						#
+<Slave_IO_State>	127.0.0.1	root	MASTER_PORT	1	master-bin.000001	<Read_Master_Log_Pos>	<Relay_Log_File>	<Relay_Log_Pos>	master-bin.000001	Yes	Yes				<Replicate_Ignore_Table>			0		0	<Exec_Master_Log_Pos>	<Relay_Log_Space>	None		0	No						<Seconds_Behind_Master>
 STOP SLAVE;
 CHANGE MASTER TO
 master_log_file = 'master-bin.000001',
 master_log_pos = 246 ;
 SHOW SLAVE STATUS;
 Slave_IO_State	Master_Host	Master_User	Master_Port	Connect_Retry	Master_Log_File	Read_Master_Log_Pos	Relay_Log_File	Relay_Log_Pos	Relay_Master_Log_File	Slave_IO_Running	Slave_SQL_Running	Replicate_Do_DB	Replicate_Ignore_DB	Replicate_Do_Table	Replicate_Ignore_Table	Replicate_Wild_Do_Table	Replicate_Wild_Ignore_Table	Last_Errno	Last_Error	Skip_Counter	Exec_Master_Log_Pos	Relay_Log_Space	Until_Condition	Until_Log_File	Until_Log_Pos	Master_SSL_Allowed	Master_SSL_CA_File	Master_SSL_CA_Path	Master_SSL_Cert	Master_SSL_Cipher	Master_SSL_Key	Seconds_Behind_Master
-#	127.0.0.1	root	MASTER_PORT	1	master-bin.000001	246	#	#	master-bin.000001	No	No							0		0	246	#	None		0	No						#
+<Slave_IO_State>	127.0.0.1	root	MASTER_PORT	1	master-bin.000001	<Read_Master_Log_Pos>	<Relay_Log_File>	<Relay_Log_Pos>	master-bin.000001	No	No				<Replicate_Ignore_Table>			0		0	<Exec_Master_Log_Pos>	<Relay_Log_Space>	None		0	No						<Seconds_Behind_Master>
 START SLAVE;
 SELECT * FROM t1 ORDER BY c3;
 c1	c2	c3

--- 1.1/mysql-test/r/rpl_ndb_sync.result	2005-05-01 16:03:56 +02:00
+++ 1.2/mysql-test/r/rpl_ndb_sync.result	2005-05-09 05:10:41 +02:00
@@ -58,14 +58,14 @@
 SELECT @the_pos:=Position,@the_file:=SUBSTRING_INDEX(FILE, '/', -1)
 FROM cluster_replication.binlog_index WHERE epoch > <the_epoch> ORDER BY epoch
ASC LIMIT 1;
 @the_pos:=Position	@the_file:=SUBSTRING_INDEX(FILE, '/', -1)
-1403	master-bin.000001
+<the_pos>	master-bin.000001
 CHANGE MASTER TO
 master_log_file = 'master-bin.000001',
-master_log_pos = 1403 ;
+master_log_pos = <the_pos> ;
 START SLAVE;
 SHOW SLAVE STATUS;
 Slave_IO_State	Master_Host	Master_User	Master_Port	Connect_Retry	Master_Log_File	Read_Master_Log_Pos	Relay_Log_File	Relay_Log_Pos	Relay_Master_Log_File	Slave_IO_Running	Slave_SQL_Running	Replicate_Do_DB	Replicate_Ignore_DB	Replicate_Do_Table	Replicate_Ignore_Table	Replicate_Wild_Do_Table	Replicate_Wild_Ignore_Table	Last_Errno	Last_Error	Skip_Counter	Exec_Master_Log_Pos	Relay_Log_Space	Until_Condition	Until_Log_File	Until_Log_Pos	Master_SSL_Allowed	Master_SSL_CA_File	Master_SSL_CA_Path	Master_SSL_Cert	Master_SSL_Cipher	Master_SSL_Key	Seconds_Behind_Master
-#	127.0.0.1	root	MASTER_PORT	1	master-bin.000001	1708	#	#	master-bin.000001	Yes	Yes							0		0	1708	#	None		0	No						#
+<Slave_IO_State>	127.0.0.1	root	MASTER_PORT	1	master-bin.000001	<Read_Master_Log_Pos>	<Relay_Log_File>	<Relay_Log_Pos>	master-bin.000001	Yes	Yes				<Replicate_Ignore_Table>			0		0	<Exec_Master_Log_Pos>	<Relay_Log_Space>	None		0	No						<Seconds_Behind_Master>
 SELECT hex(c1),hex(c2),c3 FROM t1 ORDER BY c3;
 hex(c1)	hex(c2)	c3
 1	1	row1

--- 1.4/mysql-test/t/rpl_ndb_idempotent.test	2005-04-29 18:10:42 +02:00
+++ 1.5/mysql-test/t/rpl_ndb_idempotent.test	2005-05-09 05:10:41 +02:00
@@ -41,7 +41,7 @@
 SELECT * FROM t1 ORDER BY c3;
 
 --replace_result $MASTER_MYPORT MASTER_PORT
---replace_column 1 # 8 # 9 # 23 # 33 #
+--replace_column 1 <Slave_IO_State> 7 <Read_Master_Log_Pos> 8
<Relay_Log_File> 9 <Relay_Log_Pos> 16 <Replicate_Ignore_Table> 22
<Exec_Master_Log_Pos> 23 <Relay_Log_Space> 33 <Seconds_Behind_Master>
 SHOW SLAVE STATUS;
 
 # stop slave and reset position to before the last changes
@@ -51,7 +51,7 @@
   master_log_pos = $the_pos ;
 
 --replace_result $MASTER_MYPORT MASTER_PORT
---replace_column 1 # 8 # 9 # 23 # 33 #
+--replace_column 1 <Slave_IO_State> 7 <Read_Master_Log_Pos> 8
<Relay_Log_File> 9 <Relay_Log_Pos> 16 <Replicate_Ignore_Table> 22
<Exec_Master_Log_Pos> 23 <Relay_Log_Space> 33 <Seconds_Behind_Master>
 SHOW SLAVE STATUS;
 
 # start the slave again

--- 1.1/mysql-test/t/rpl_ndb_sync.test	2005-05-01 16:03:56 +02:00
+++ 1.2/mysql-test/t/rpl_ndb_sync.test	2005-05-09 05:10:41 +02:00
@@ -67,6 +67,7 @@
 # 2.
 connection master;
 --replace_result $the_epoch <the_epoch>
+--replace_column 1 <the_pos>
 eval SELECT @the_pos:=Position,@the_file:=SUBSTRING_INDEX(FILE, '/', -1)
    FROM cluster_replication.binlog_index WHERE epoch > $the_epoch ORDER BY epoch ASC
LIMIT 1;
 let $the_pos= `SELECT @the_pos` ;
@@ -74,6 +75,7 @@
 
 # 3.
 connection slave;
+--replace_result $the_pos <the_pos>
 eval CHANGE MASTER TO
   master_log_file = '$the_file',
   master_log_pos = $the_pos ;
@@ -87,7 +89,7 @@
 --sleep 2
 connection slave;
 --replace_result $MASTER_MYPORT MASTER_PORT
---replace_column 1 # 8 # 9 # 23 # 33 #
+--replace_column 1 <Slave_IO_State> 7 <Read_Master_Log_Pos> 8
<Relay_Log_File> 9 <Relay_Log_Pos> 16 <Replicate_Ignore_Table> 22
<Exec_Master_Log_Pos> 23 <Relay_Log_Space> 33 <Seconds_Behind_Master>
 SHOW SLAVE STATUS;
 
 SELECT hex(c1),hex(c2),c3 FROM t1 ORDER BY c3;
Thread
bk commit into 5.1 tree (tomas:1.1830)tomas9 May