Below is the list of changes that have just been committed into a local
5.1 repository of tomas. When tomas does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.1830 05/05/09 05:10:56 tomas@stripped +12 -0
added "COM_DAEMON" to be used for e.g. injector thread
fixed some ndb rpl tests to become more flexible
added correct setup/stop of binlog thread with ndb table discovery at startime, and
cleanup
fixed drop database to handle event dropping correctly
fixed som time issues at startup beteen the "ndb" threads
added a state to ndb share to check if there has been a drop table or not
s removed apply status creation from slave thread
sql/sql_parse.cc
1.430 05/05/09 05:10:43 tomas@stripped +3 -1
added "COM_DAEMON" to be used for e.g. injector thread
sql/slave.cc
1.248 05/05/09 05:10:42 tomas@stripped +0 -5
removed apply status creation from slave thread
sql/rpl_injector.cc
1.14 05/05/09 05:10:42 tomas@stripped +4 -2
fixed link error with gcc
sql/ha_ndbcluster.h
1.95 05/05/09 05:10:42 tomas@stripped +11 -0
added a state to ndb share to check if there has been a drop table or not
sql/ha_ndbcluster.cc
1.236 05/05/09 05:10:41 tomas@stripped +390 -122
added correct setup/stop of binlog thread with ndb table discovery at startime, and
cleanup
fixed drop database to handle event dropping correctly
fixed som time issues at startup beteen the "ndb" threads
mysql-test/t/rpl_ndb_sync.test
1.2 05/05/09 05:10:41 tomas@stripped +3 -1
fixed some ndb rpl tests to become more flexible
mysql-test/t/rpl_ndb_idempotent.test
1.5 05/05/09 05:10:41 tomas@stripped +2 -2
fixed some ndb rpl tests to become more flexible
mysql-test/r/rpl_ndb_sync.result
1.2 05/05/09 05:10:41 tomas@stripped +3 -3
fixed some ndb rpl tests to become more flexible
mysql-test/r/rpl_ndb_idempotent.result
1.6 05/05/09 05:10:41 tomas@stripped +2 -2
fixed some ndb rpl tests to become more flexible
mysql-test/mysql-test-run.sh
1.269 05/05/09 05:10:41 tomas@stripped +1 -1
fixed bug in NDB_EXTRA_TEST check
mysql-test/include/have_ndb_extra.inc
1.2 05/05/09 05:10:41 tomas@stripped +2 -1
fixed bug in NDB_EXTRA_TEST check
include/mysql_com.h
1.96 05/05/09 05:10:41 tomas@stripped +1 -1
added "COM_DAEMON" to be used for e.g. injector thread
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: tomas
# Host: poseidon.ndb.mysql.com
# Root: /home/tomas/wl2325
--- 1.95/include/mysql_com.h 2005-02-28 15:22:22 +01:00
+++ 1.96/include/mysql_com.h 2005-05-09 05:10:41 +02:00
@@ -49,7 +49,7 @@
COM_TIME, COM_DELAYED_INSERT, COM_CHANGE_USER, COM_BINLOG_DUMP,
COM_TABLE_DUMP, COM_CONNECT_OUT, COM_REGISTER_SLAVE,
COM_PREPARE, COM_EXECUTE, COM_LONG_DATA, COM_CLOSE_STMT,
- COM_RESET_STMT, COM_SET_OPTION, COM_FETCH,
+ COM_RESET_STMT, COM_SET_OPTION, COM_FETCH, COM_DAEMON,
/* don't forget to update const char *command_name[] in sql_parse.cc */
/* Must be last */
--- 1.268/mysql-test/mysql-test-run.sh 2005-05-04 02:08:26 +02:00
+++ 1.269/mysql-test/mysql-test-run.sh 2005-05-09 05:10:41 +02:00
@@ -293,7 +293,7 @@
USE_NDBCLUSTER_SLAVE="--ndbcluster" ;
USE_RUNNING_NDBCLUSTER_SLAVE=`$ECHO "$1" | $SED -e
"s;--ndb-connectstring-slave=;;"` ;;
--ndb-extra-test)
-# NDBCLUSTER_EXTRA_OPTS=" "
+ NDBCLUSTER_EXTRA_OPTS=" "
NDB_EXTRA_TEST=1 ;
;;
--ndb_mgm-extra-opts=*)
--- 1.247/sql/slave.cc 2005-04-25 13:29:23 +02:00
+++ 1.248/sql/slave.cc 2005-05-09 05:10:42 +02:00
@@ -3266,10 +3266,6 @@
/* Slave SQL Thread entry point */
-#ifdef HAVE_NDBCLUSTER_DB
-int ndbcluster_create_apply_status_table(THD *thd);
-#endif
-
extern "C" pthread_handler_decl(handle_slave_sql,arg)
{
THD *thd; /* needs to be first for thread_stack */
@@ -3409,7 +3405,6 @@
#ifdef HAVE_NDBCLUSTER_DB
if (have_ndbcluster == SHOW_OPTION_YES)
{
- ndbcluster_create_apply_status_table(thd);
thd->variables.ndb_use_exact_count= 0;
}
#endif
--- 1.429/sql/sql_parse.cc 2005-05-03 12:45:07 +02:00
+++ 1.430/sql/sql_parse.cc 2005-05-09 05:10:43 +02:00
@@ -86,7 +86,7 @@
"Connect","Kill","Debug","Ping","Time","Delayed insert","Change user",
"Binlog Dump","Table Dump", "Connect Out", "Register Slave",
"Prepare", "Execute", "Long Data", "Close stmt",
- "Reset stmt", "Set option", "Fetch",
+ "Reset stmt", "Set option", "Fetch", "Daemon",
"Error" // Last command number
};
@@ -6314,6 +6314,8 @@
I_List_iterator<THD> it(threads);
while ((tmp=it++))
{
+ if (tmp->command == COM_DAEMON)
+ continue;
if (tmp->thread_id == id)
{
pthread_mutex_lock(&tmp->LOCK_delete); // Lock from delete
--- 1.235/sql/ha_ndbcluster.cc 2005-05-06 14:34:21 +02:00
+++ 1.236/sql/ha_ndbcluster.cc 2005-05-09 05:10:41 +02:00
@@ -89,7 +89,8 @@
typedef NdbDictionary::Dictionary NDBDICT;
typedef NdbDictionary::Event NDBEVENT;
-bool ndbcluster_inited= FALSE;
+static int ndbcluster_inited= 0;
+static int ndbcluster_util_inited= 0;
static Ndb* g_ndb= NULL;
static Ndb_cluster_connection* g_ndb_cluster_connection= NULL;
@@ -123,7 +124,7 @@
{
if (_db_on_)
{
- for (int j= 0; j < table->s->fields; j++)
+ for (uint j= 0; j < table->s->fields; j++)
{
char buf[1024];
int pos= 0;
@@ -171,7 +172,7 @@
static int replication_handle_drop_table(Ndb *ndb, const char* event_name,
NDB_SHARE *share);
inline void ndb_rep_event_name(String *event_name, const char* db, const char* tbl);
-
+static int ndbcluster_create_apply_status_table(THD *thd);
#ifndef DBUG_OFF
static void dbug_print_table(const char *info, TABLE *table);
#endif
@@ -234,6 +235,7 @@
static ulonglong ndb_latest_applied_binlog_epoch= 0;
static ulonglong ndb_latest_handled_binlog_epoch= 0;
static ulonglong ndb_latest_received_binlog_epoch= 0;
+/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */
extern Uint64 g_latest_trans_gci;
/*
@@ -1005,6 +1007,20 @@
of table accessed in NDB
*/
+static int cmp_frm(const NDBTAB *ndbtab, const void *pack_data, uint pack_length)
+{
+ DBUG_ENTER("cmp_frm");
+ /*
+ * Compare FrmData in NDB with frm file from disk.
+ */
+ if ((pack_length != ndbtab->getFrmLength()) ||
+ (memcmp(pack_data, ndbtab->getFrmData(), pack_length)))
+ {
+ DBUG_RETURN(1);
+ }
+ DBUG_RETURN(0);
+}
+
int ha_ndbcluster::get_metadata(const char *path)
{
Ndb *ndb= get_ndb();
@@ -1042,8 +1058,7 @@
DBUG_RETURN(1);
}
- if ((pack_length != tab->getFrmLength()) ||
- (memcmp(pack_data, tab->getFrmData(), pack_length)))
+ if (cmp_frm(tab, pack_data, pack_length))
{
if (!invalidating_ndb_table)
{
@@ -4007,6 +4022,73 @@
}
}
+#ifdef HAVE_REPLICATION
+static int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
+ const char* db, const char *table_name,
+ int do_binlog)
+{
+ DBUG_ENTER("ndbcluster_create_binlog_setup");
+ pthread_mutex_lock(&ndbcluster_mutex);
+ NDB_SHARE *share= (NDB_SHARE*) hash_search(&ndbcluster_open_tables,
+ (byte*) key, strlen(key));
+ pthread_mutex_unlock(&ndbcluster_mutex);
+
+ if (share)
+ handle_trailing_share(share);
+
+ /**
+ * Share is needed to hold replication information
+ */
+ if (!(share=get_share(key,true)))
+ {
+ sql_print_error("NDB Binlog: "
+ "allocating table share for %s failed", key);
+ // ToDo: return error here?;
+ }
+
+ while(share && do_binlog)
+ {
+ // ToDo make sanity check of share so that the table is actually the same
+ // I.e. we need to do openfrm in this case
+
+ // Create the event in NDB
+ NDBDICT *dict= ndb->getDictionary();
+ const NDBTAB *ndbtab= dict->getTable(table_name);
+ String event_name(INJECTOR_EVENT_LEN);
+ ndb_rep_event_name(&event_name,db,table_name);
+ /* event should have been created by someone else, but let's make sure */
+ if (!dict->getEvent(event_name.c_ptr()))
+ {
+ ndb->setDatabaseName(db);
+ if (ndbcluster_create_event(ndb,ndbtab,event_name.c_ptr()))
+ {
+ sql_print_error("NDB Binlog: "
+ "FAILED CREATE (DISCOVER) TABLE Event: %s",
+ event_name.c_ptr());
+ break;
+ }
+ sql_print_information("NDB Binlog: "
+ "CREATE (DISCOVER) TABLE Event: %s",
+ event_name.c_ptr());
+ }
+ else
+ sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
+ event_name.c_ptr());
+ if (ndbcluster_create_event_ops(share, ndbtab,
+ event_name.c_ptr()) < 0)
+ {
+ sql_print_error("NDB Binlog:"
+ "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
+ event_name.c_ptr());
+ /* TODO: Handle the error? */
+ DBUG_RETURN(0);
+ }
+ DBUG_RETURN(0);
+ }
+ DBUG_RETURN(-1);
+}
+#endif /* HAVE_REPLICATION */
+
int ha_ndbcluster::create(const char *name,
TABLE *form,
HA_CREATE_INFO *info)
@@ -4034,65 +4116,10 @@
if ((my_errno= write_ndb_file()))
DBUG_RETURN(my_errno);
#ifdef HAVE_REPLICATION
- while(ndb_injector_thread_running > 0 &&
- m_tabname[0] != '#')
- {
- if ((my_errno= check_ndb_connection()))
- DBUG_RETURN(my_errno);
- /**
- * Share is needed to hold replication information
- */
- NDB_SHARE *share;
- if (!(share=get_share(name2,true)))
- {
- sql_print_error("NDB Binlog: "
- "allocating table share for %s failed", name2);
- break;
- // ToDo: return error here?;
- }
-
- // ToDo make sanity check of share so that the table is actually the same
- // I.e. we need to do openfrm in this case
-
- // Create the event in NDB
- Ndb *ndb= get_ndb();
- NDBDICT *dict= ndb->getDictionary();
- const NDBTAB *ndbtab= dict->getTable(m_tabname);
- String event_name(INJECTOR_EVENT_LEN);
- ndb_rep_event_name(&event_name,m_dbname,m_tabname);
- /* event should have been created by someone else, but let's make sure */
- if (!dict->getEvent(event_name.c_ptr()))
- {
- ndb->setDatabaseName(m_dbname);
- if (ndbcluster_create_event(ndb,ndbtab,event_name.c_ptr()))
- {
- sql_print_error("NDB Binlog: "
- "FAILED CREATE (DISCOVER) TABLE Event: %s",
- event_name.c_ptr());
- free_share(&share);
- break;
- }
- sql_print_information("NDB Binlog: "
- "CREATE (DISCOVER) TABLE Event: %s",
- event_name.c_ptr());
- }
- else
- sql_print_information("NDB Binlog: DISCOVER TABLE Event: %s",
- event_name.c_ptr());
- if (ndbcluster_create_event_ops(share, ndbtab,
- event_name.c_ptr()) < 0)
- {
- sql_print_error("NDB Binlog:"
- "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s",
- event_name.c_ptr());
- /* TODO: Handle the error? */
- free_share(&share);
- DBUG_RETURN(0);
- }
- // keep lock on share, injector thread now has responsability
- DBUG_RETURN(0);
- }
-#endif
+ ndbcluster_create_binlog_setup(get_ndb(), name2, m_dbname, m_tabname,
+ ndb_injector_thread_running > 0 &&
+ m_tabname[0] != '#');
+#endif /* HAVE_REPLICATION */
DBUG_RETURN(my_errno);
} // if (create_from_engine)
@@ -4465,31 +4492,40 @@
*/
-int ha_ndbcluster::delete_table(const char *name)
-{
- DBUG_ENTER("ha_ndbcluster::delete_table");
- DBUG_PRINT("enter", ("name: %s", name));
- set_dbname(name);
- set_tabname(name);
-
- if (check_ndb_connection())
- DBUG_RETURN(HA_ERR_NO_CONNECTION);
-
- /* Call ancestor function to delete .ndb file */
- handler::delete_table(name);
+/* static version which does not need a handler */
+int
+ha_ndbcluster::delete_table(ha_ndbcluster *h, Ndb *ndb,
+ const char *path,
+ const char *db,
+ const char *table_name)
+{
+ DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table");
+ NdbDictionary::Dictionary *dict= ndb->getDictionary();
#ifdef HAVE_REPLICATION
NDB_SHARE *share= 0;
- share= get_share(name,false);
+ share= get_share(path,false);
#endif
/* Drop the table from NDB */
- int r= drop_table();
+
+ int r;
+ if (h)
+ {
+ r= h->drop_table();
+ }
+ else
+ {
+ ndb->setDatabaseName(db);
+ r= dict->dropTable(table_name);
+ }
+
if (r)
{
#ifdef HAVE_REPLICATION
if (share)
{
+ share->state= NSS_DROPPED;
free_share(&share);
free_share(&share);
}
@@ -4498,21 +4534,21 @@
}
#ifdef HAVE_REPLICATION
- Ndb *ndb= get_ndb();
if ( share && share->op)
{
- ndb->getDictionary()->forceGCPWait();
+ dict->forceGCPWait();
}
- if ( m_tabname[0] != '#' ) // FIXME tabname hack
+ if ( table_name[0] != '#' ) // FIXME tabname hack
{
String event_name(INJECTOR_EVENT_LEN);
- ndb_rep_event_name(&event_name,name+sizeof(share_prefix)-1,0);
+ ndb_rep_event_name(&event_name,path+sizeof(share_prefix)-1,0);
replication_handle_drop_table(ndb,event_name.c_ptr(),share);
}
if (share)
{
+ share->state= NSS_DROPPED;
free_share(&share);
free_share(&share);
}
@@ -4520,6 +4556,22 @@
DBUG_RETURN(0);
}
+int ha_ndbcluster::delete_table(const char *name)
+{
+ DBUG_ENTER("ha_ndbcluster::delete_table");
+ DBUG_PRINT("enter", ("name: %s", name));
+ set_dbname(name);
+ set_tabname(name);
+
+ if (check_ndb_connection())
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+ /* Call ancestor function to delete .ndb file */
+ handler::delete_table(name);
+
+ DBUG_RETURN(delete_table(this,get_ndb(),name,m_dbname,m_tabname));
+}
+
/*
Drop table in NDB Cluster
@@ -4938,14 +4990,14 @@
drop_list.push_back(thd->strdup(t.name));
}
// Drop any tables belonging to database
+ char full_path[FN_REFLEN];
+ char *t= strxnmov(full_path,FN_REFLEN,share_prefix,dbname,"/",NullS);
ndb->setDatabaseName(dbname);
List_iterator_fast<char> it(drop_list);
while ((tabname=it++))
{
-#ifdef HAVE_REPLICATION
- // ToDo free and associated shares
-#endif
- if (dict->dropTable(tabname))
+ strxnmov(t,FN_REFLEN-(t-full_path),tabname,NullS);
+ if (ha_ndbcluster::delete_table(0, ndb, full_path, dbname, tabname))
{
const NdbError err= dict->getNdbError();
if (err.code != 709)
@@ -4959,6 +5011,81 @@
}
+static int ndbcluster_find_all_files(THD *thd)
+{
+ DBUG_ENTER("ndbcluster_find_all_files");
+ Ndb* ndb;
+ char key[FN_REFLEN];
+ NdbDictionary::Dictionary::List list;
+
+ if (!(ndb= check_ndb_in_thd(thd)))
+ DBUG_RETURN(HA_ERR_NO_CONNECTION);
+
+ NDBDICT *dict= ndb->getDictionary();
+ if (dict->listObjects(list,
+ NdbDictionary::Object::UserTable) != 0)
+ ERR_RETURN(dict->getNdbError());
+
+ for (uint i= 0 ; i < list.count ; i++)
+ {
+ NdbDictionary::Dictionary::List::Element& t= list.elements[i];
+ DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));
+ ndb->setDatabaseName(t.database);
+ const NDBTAB *ndbtab;
+ if (!(ndbtab= dict->getTable(t.name)))
+ {
+ sql_print_error("NDB: failed to setup table %s.%s, error: %d, %s",
+ t.database,t.name,
+ dict->getNdbError().code,
+ dict->getNdbError().message);
+ continue;
+ }
+
+ strxnmov(key, FN_LEN, mysql_data_home,"/",t.database,"/",t.name, NullS);
+ const void *data, *pack_data;
+ uint length, pack_length;
+ int discover= 0;
+ if (readfrm(key, &data, &length) ||
+ packfrm(data, length, &pack_data, &pack_length))
+ {
+ discover= 1;
+ sql_print_information("NDB: missing frm for %s.%s, discovering...",
+ t.database,t.name);
+ }
+ else if (cmp_frm(ndbtab, pack_data, pack_length))
+ {
+ discover= 1;
+ sql_print_information("NDB: mismatch in frm for %s.%s, discovering...",
+ t.database,t.name);
+ }
+ my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
+ my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
+
+ if (discover)
+ {
+ // ToDo database needs to be created if missing
+ pthread_mutex_lock(&LOCK_open);
+ if (ha_create_table_from_engine(thd, t.database, t.name, TRUE))
+ {
+ ; // ToDo handle error?
+ }
+ pthread_mutex_unlock(&LOCK_open);
+ }
+ else
+ {
+#ifdef HAVE_REPLICATION
+ /*
+ * set up replication for this table
+ */
+ ndbcluster_create_binlog_setup(ndb, key, t.database, t.name,
+ ndb_injector_thread_running > 0 &&
+ t.name[0] != '#');
+#endif
+ }
+ }
+ DBUG_RETURN(0);
+}
+
int ndbcluster_find_files(THD *thd,const char *db,const char *path,
const char *wild, bool dir, List<char> *files)
{
@@ -5001,7 +5128,7 @@
for (i= 0 ; i < list.count ; i++)
{
NdbDictionary::Dictionary::List::Element& t= list.elements[i];
- DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));
+ DBUG_PRINT("info", ("Found %s/%s in NDB", t.database, t.name));
// Add only tables that belongs to db
if (my_strcasecmp(system_charset_info, t.database, db))
@@ -5126,12 +5253,7 @@
static int connect_callback()
{
update_status_variables(g_ndb_cluster_connection);
-#ifdef HAVE_REPLICATION
- if (ndb_injector_thread_running > 0)
- {
- pthread_cond_signal(&injector_cond);
- }
-#endif /* HAVE_REPLICATION */
+ pthread_cond_signal(&COND_ndb_util_thread);
return 0;
}
@@ -5260,6 +5382,46 @@
(void) pthread_cond_signal(&COND_ndb_util_thread);
(void) pthread_mutex_unlock(&LOCK_ndb_util_thread);
+#ifdef HAVE_REPLICATION
+ /* remove all shares */
+ {
+ pthread_mutex_lock(&ndbcluster_mutex);
+ for (uint i= 0; i < ndbcluster_open_tables.records; i++)
+ {
+ NDB_SHARE *share= (NDB_SHARE *)hash_element(&ndbcluster_open_tables, i);
+ if (share->table)
+ DBUG_PRINT("share",
+ ("table->s->db.table_name: %s.%s",
+ share->table->s->db, share->table->s->table_name));
+ if (share->state != NSS_DROPPED && !--share->use_count)
+ {
+ real_free_share(&share);
+ }
+ else
+ {
+ DBUG_PRINT("share",
+ ("[%d] 0x%x key: %s key_length: %d",
+ i, share, share->key, share->key_length));
+ DBUG_PRINT("share",
+ ("db.tablename: %s.%s use_count: %d commit_count: %d",
+ share->db, share->table_name,
+ share->use_count, share->commit_count));
+ }
+ }
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ }
+ /* wait for injector thread */
+ pthread_mutex_lock(&injector_mutex);
+ while (ndb_injector_thread_running > 0)
+ {
+ struct timespec abstime;
+ set_timespec(abstime, 1);
+ pthread_cond_timedwait(&injector_cond,&injector_mutex,&abstime);
+ }
+ pthread_mutex_unlock(&injector_mutex);
+#endif
+
+#if 1
if(g_ndb)
delete g_ndb;
g_ndb= NULL;
@@ -5271,7 +5433,9 @@
pthread_mutex_destroy(&ndbcluster_mutex);
pthread_mutex_destroy(&LOCK_ndb_util_thread);
pthread_cond_destroy(&COND_ndb_util_thread);
+#endif
ndbcluster_inited= 0;
+ ndbcluster_util_inited= 0;
DBUG_RETURN(0);
}
@@ -5792,9 +5956,9 @@
{
static ulong trailing_share_id= 0;
- DBUG_PRINT("error", ("rename_share: %s already exists, use_count=%d.",
+ DBUG_PRINT("error", ("NDB_SHARE: %s already exists, use_count=%d.",
share->key, share->use_count));
- sql_print_error("rename_share: %s already exists, use_count=%d."
+ sql_print_error("NDB_SHARE: %s already exists, use_count=%d."
" Moving away for safety, but possible memleak.",
share->key, share->use_count);
dbug_print_open_tables();
@@ -6689,6 +6853,7 @@
Ndb* ndb;
int error= 0;
struct timespec abstime;
+ List<NDB_SHARE> util_open_tables;
my_thread_init();
DBUG_ENTER("ndb_util_thread");
@@ -6709,10 +6874,68 @@
delete ndb;
DBUG_RETURN(NULL);
}
+ thd->init_for_queries();
+ thd->version=refresh_version;
+ thd->set_time();
+ thd->host_or_ip= "";
+ thd->client_capabilities = 0;
+ my_net_init(&thd->net, 0);
+ thd->master_access= ~0;
+ thd->priv_user = 0;
+
+ /**
+ * wait for mysql server to start
+ */
+ pthread_mutex_lock(&LOCK_server_started);
+ while(!mysqld_server_started)
+ pthread_cond_wait(&COND_server_started,&LOCK_server_started);
+ pthread_mutex_unlock(&LOCK_server_started);
+
+ /**
+ * Wait for cluster to start
+ */
+ pthread_mutex_lock(&LOCK_ndb_util_thread);
+ while (!ndb_cluster_node_id)
+ {
+ /* ndb not connected yet */
+ set_timespec(abstime, 1);
+ pthread_cond_timedwait(&COND_ndb_util_thread,
+ &LOCK_ndb_util_thread,
+ &abstime);
+ if (abort_loop)
+ {
+ pthread_mutex_unlock(&LOCK_ndb_util_thread);
+ goto ndb_util_thread_end;
+ }
+ }
+ pthread_mutex_unlock(&LOCK_ndb_util_thread);
+
+ /**
+ * Get all table definitions from the storage node
+ */
+ ndbcluster_find_all_files(thd);
+
+#ifdef HAVE_REPLICATION
+ /**
+ * create tables needed by the replication
+ */
+ ndbcluster_create_apply_status_table(thd);
+#endif
+
+ ndbcluster_util_inited= 1;
+
+#ifdef HAVE_REPLICATION
+ /**
+ * If running, signal injector thread that all is setup
+ */
+ if (ndb_injector_thread_running > 0)
+ {
+ pthread_cond_signal(&injector_cond);
+ }
+#endif
- List<NDB_SHARE> util_open_tables;
set_timespec(abstime, 0);
- for (;;)
+ for (;!abort_loop;)
{
pthread_mutex_lock(&LOCK_ndb_util_thread);
@@ -6747,7 +6970,7 @@
#ifndef DBUG_OFF
share->no_use_count_check= 1;
#endif
-#endif
+#endif /* HAVE_REPLICATION */
share->use_count++; /* Make sure the table can't be closed */
DBUG_PRINT("ndb_util_thread",
("Found open table[%d]: %s, use_count: %d",
@@ -6769,7 +6992,7 @@
free_share(&share);
continue;
}
-#endif
+#endif /* HAVE_REPLICATION */
DBUG_PRINT("ndb_util_thread",
("Fetching commit count for: %s",
share->key));
@@ -6830,7 +7053,7 @@
abstime.tv_nsec-= 1000000000;
}
}
-
+ndb_util_thread_end:
thd->cleanup();
delete thd;
delete ndb;
@@ -8027,22 +8250,58 @@
return get_share("./" NDB_REP_DB "/" NDB_APPLY_TABLE,false);
}
-int
+static int
ndbcluster_create_apply_status_table(THD *thd)
{
- /**
- * Note, updating this table schema must be reflected in ndb_restore
- */
char buf[1024], *end;
- end=strmov(buf, "CREATE TABLE IF NOT EXISTS " NDB_REP_DB "." NDB_APPLY_TABLE
- " ( server_id INT UNSIGNED NOT NULL, epoch BIGINT UNSIGNED NOT NULL, "
- " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB");
+ /* Check if we already have the apply status table.
+ * If so it should have been discovered at startup
+ * and thus have a share
+ */
+ end= strxnmov(buf, FN_REFLEN, mysql_data_home,
+ "/" NDB_REP_DB "/" NDB_APPLY_TABLE,NullS);
+ pthread_mutex_lock(&ndbcluster_mutex);
+ void *tmp= hash_search(&ndbcluster_open_tables, (byte*) buf, end-buf);
+ pthread_mutex_unlock(&ndbcluster_mutex);
+ if ( tmp )
+ return 0;
+
+ /* save some thread variables as not to changes them */
ulong save_query_length= thd->query_length;
char *save_query= thd->query;
ulong save_thread_id= thd->variables.pseudo_thread_id;
NET save_net= thd->net;
+ /* Check if apply status table exists in MySQL "dictionary"
+ * if so, remove it since there is none in Ndb
+ */
+ {
+ TABLE_LIST table_list;
+ end= strxnmov(buf, FN_REFLEN, NDB_REP_DB,NullS);
+ end++;
+ strxnmov(end, FN_REFLEN, NDB_REP_DB,NullS);
+ bzero((char*) &table_list,sizeof(table_list));
+ table_list.db= (char*)buf;
+ table_list.alias= table_list.table_name= (char*)end;
+ pthread_mutex_lock(&LOCK_open);
+ (void)mysql_rm_table_part2(thd, &table_list,
+ /* if_exists */ TRUE,
+ /* drop_temporary */ FALSE,
+ /* drop_view */ FALSE,
+ /* dont_log_query*/ TRUE);
+ pthread_mutex_unlock(&LOCK_open);
+ /* Clear error message that is returned when table is deleted */
+ thd->clear_error();
+ }
+
+ /**
+ * Note, updating this table schema must be reflected in ndb_restore
+ */
+ end=strmov(buf, "CREATE TABLE IF NOT EXISTS " NDB_REP_DB "." NDB_APPLY_TABLE
+ " ( server_id INT UNSIGNED NOT NULL, epoch BIGINT UNSIGNED NOT NULL, "
+ " PRIMARY KEY USING HASH (server_id) ) ENGINE=NDB");
+
bzero((char*)&thd->net,sizeof(NET));
thd->query_length= end-buf;
thd->query= buf;
@@ -8271,10 +8530,13 @@
event_name,
dict->getNdbError().code,
dict->getNdbError().message);
- push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
- ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
- dict->getNdbError().code, dict->getNdbError().message,
- "NDB");
+ if ( dict->getNdbError().code != 4710 )
+ {
+ push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+ ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
+ dict->getNdbError().code, dict->getNdbError().message,
+ "NDB");
+ }
ndb_error= dict->getNdbError();
DBUG_RETURN(-1);
}
@@ -8736,8 +8998,11 @@
pthread_exit(0);
DBUG_RETURN(NULL);
}
+
thd->init_for_queries();
+ thd->command= COM_DAEMON;
injector_thd= thd;
+
/**
* Set up ndb binlog
*/
@@ -8784,7 +9049,7 @@
thd->proc_info= "Waiting for ndbcluster to start";
pthread_mutex_lock(&injector_mutex);
- while (!ndb_cluster_node_id)
+ while (!ndbcluster_util_inited)
{
/* ndb not connected yet */
struct timespec abstime;
@@ -8798,12 +9063,6 @@
}
pthread_mutex_unlock(&injector_mutex);
- thd->proc_info= "Waiting for mysqld to finish starting";
- pthread_mutex_lock(&LOCK_server_started);
- while(!mysqld_server_started)
- pthread_cond_wait(&COND_server_started,&LOCK_server_started);
- pthread_mutex_unlock(&LOCK_server_started);
-
/**
* Main NDB Injector loop
*/
@@ -8818,21 +9077,26 @@
static char db[]="";
thd->db=db;
open_binlog_index(thd,&tables,&binlog_index);
- ndbcluster_create_apply_status_table(thd);
apply_status_share= ndbcluster_get_apply_status_share();
+ assert(apply_status_share);
thd->db=db;
}
- for (;!(abort_loop || do_ndbcluster_binlog_close_connection);)
+ for (;!((abort_loop || do_ndbcluster_binlog_close_connection) &&
+ ndb_latest_received_binlog_epoch >= g_latest_trans_gci);)
{
/**
* now we don't want any events before next gci is complete
*/
thd->proc_info= "Waiting for event from ndbcluster";
+ thd->set_time();
int res= ndb->pollEvents(1000); // wait for event or 1000 ms
- if (abort_loop || do_ndbcluster_binlog_close_connection)
+ if ( (abort_loop || do_ndbcluster_binlog_close_connection) &&
+ ndb_latest_received_binlog_epoch >= g_latest_trans_gci )
+ {
break; /* Shutting down server */
+ }
if (binlog_index && binlog_index->s->version < refresh_version)
{
@@ -8938,7 +9202,6 @@
}
err:
//close_thread_tables(thd);
- ndb_injector_thread_running= -1;
pthread_mutex_lock(&injector_mutex);
/* don't mess with the injector_ndb anymore from other threads */
injector_ndb= 0;
@@ -8951,6 +9214,7 @@
{
free_share(&apply_status_share);
}
+
/* remove all event operations */
if (ndb)
{
@@ -8968,10 +9232,14 @@
ndb= 0;
}
+ ndb_injector_thread_running= -1;
+ (void) pthread_cond_signal(&injector_cond);
+
thd->cleanup();
delete thd;
DBUG_PRINT("exit", ("ndb_injector_thread"));
my_thread_end();
+
pthread_exit(0);
DBUG_RETURN(NULL);
}
--- 1.94/sql/ha_ndbcluster.h 2005-05-02 17:06:31 +02:00
+++ 1.95/sql/ha_ndbcluster.h 2005-05-09 05:10:42 +02:00
@@ -60,6 +60,11 @@
typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
+typedef enum {
+ NSS_INITIAL= 0,
+ NSS_DROPPED
+} NDB_SHARE_STATE;
+
typedef struct st_ndbcluster_share {
MEM_ROOT mem_root;
THR_LOCK lock;
@@ -71,6 +76,7 @@
uint commit_count_lock;
ulonglong commit_count;
#ifdef HAVE_REPLICATION
+ NDB_SHARE_STATE state;
#ifndef DEBUG_OFF
int no_use_count_check;
#endif
@@ -527,7 +533,12 @@
qc_engine_callback *engine_callback,
ulonglong *engine_data);
private:
+ friend int ndbcluster_drop_database(const char *path);
int alter_table_name(const char *to);
+ static int delete_table(ha_ndbcluster *h, Ndb *ndb,
+ const char *path,
+ const char *db,
+ const char *table_name);
int drop_table();
int create_index(const char *name, KEY *key_info, bool unique);
int create_ordered_index(const char *name, KEY *key_info);
--- 1.13/sql/rpl_injector.cc 2005-04-26 09:05:04 +02:00
+++ 1.14/sql/rpl_injector.cc 2005-05-09 05:10:42 +02:00
@@ -98,11 +98,13 @@
{
}
+static injector *s_injector= 0;
injector* injector::
instance()
{
- static injector s_injector;
- return &s_injector; // "There can be only one [instance]"
+ if (s_injector == 0)
+ s_injector= new injector;
+ return s_injector; // "There can be only one [instance]"
}
--- 1.1/mysql-test/include/have_ndb_extra.inc 2005-05-04 12:20:33 +02:00
+++ 1.2/mysql-test/include/have_ndb_extra.inc 2005-05-09 05:10:41 +02:00
@@ -1 +1,2 @@
---exec test x$NDB_EXTRA_TEST = x1
+-- require r/have_ndb_extra.require
+eval select $NDB_EXTRA_TEST;
--- 1.5/mysql-test/r/rpl_ndb_idempotent.result 2005-04-29 18:10:42 +02:00
+++ 1.6/mysql-test/r/rpl_ndb_idempotent.result 2005-05-09 05:10:41 +02:00
@@ -34,14 +34,14 @@
row4 D 4
SHOW SLAVE STATUS;
Slave_IO_State Master_Host Master_User Master_Port Connect_Retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_Do_DB Replicate_Ignore_DB Replicate_Do_Table Replicate_Ignore_Table Replicate_Wild_Do_Table Replicate_Wild_Ignore_Table Last_Errno Last_Error Skip_Counter Exec_Master_Log_Pos Relay_Log_Space Until_Condition Until_Log_File Until_Log_Pos Master_SSL_Allowed Master_SSL_CA_File Master_SSL_CA_Path Master_SSL_Cert Master_SSL_Cipher Master_SSL_Key Seconds_Behind_Master
-# 127.0.0.1 root MASTER_PORT 1 master-bin.000001 1040 # # master-bin.000001 Yes Yes 0 0 1040 # None 0 No #
+<Slave_IO_State> 127.0.0.1 root MASTER_PORT 1 master-bin.000001 <Read_Master_Log_Pos> <Relay_Log_File> <Relay_Log_Pos> master-bin.000001 Yes Yes <Replicate_Ignore_Table> 0 0 <Exec_Master_Log_Pos> <Relay_Log_Space> None 0 No <Seconds_Behind_Master>
STOP SLAVE;
CHANGE MASTER TO
master_log_file = 'master-bin.000001',
master_log_pos = 246 ;
SHOW SLAVE STATUS;
Slave_IO_State Master_Host Master_User Master_Port Connect_Retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_Do_DB Replicate_Ignore_DB Replicate_Do_Table Replicate_Ignore_Table Replicate_Wild_Do_Table Replicate_Wild_Ignore_Table Last_Errno Last_Error Skip_Counter Exec_Master_Log_Pos Relay_Log_Space Until_Condition Until_Log_File Until_Log_Pos Master_SSL_Allowed Master_SSL_CA_File Master_SSL_CA_Path Master_SSL_Cert Master_SSL_Cipher Master_SSL_Key Seconds_Behind_Master
-# 127.0.0.1 root MASTER_PORT 1 master-bin.000001 246 # # master-bin.000001 No No 0 0 246 # None 0 No #
+<Slave_IO_State> 127.0.0.1 root MASTER_PORT 1 master-bin.000001 <Read_Master_Log_Pos> <Relay_Log_File> <Relay_Log_Pos> master-bin.000001 No No <Replicate_Ignore_Table> 0 0 <Exec_Master_Log_Pos> <Relay_Log_Space> None 0 No <Seconds_Behind_Master>
START SLAVE;
SELECT * FROM t1 ORDER BY c3;
c1 c2 c3
--- 1.1/mysql-test/r/rpl_ndb_sync.result 2005-05-01 16:03:56 +02:00
+++ 1.2/mysql-test/r/rpl_ndb_sync.result 2005-05-09 05:10:41 +02:00
@@ -58,14 +58,14 @@
SELECT @the_pos:=Position,@the_file:=SUBSTRING_INDEX(FILE, '/', -1)
FROM cluster_replication.binlog_index WHERE epoch > <the_epoch> ORDER BY epoch
ASC LIMIT 1;
@the_pos:=Position @the_file:=SUBSTRING_INDEX(FILE, '/', -1)
-1403 master-bin.000001
+<the_pos> master-bin.000001
CHANGE MASTER TO
master_log_file = 'master-bin.000001',
-master_log_pos = 1403 ;
+master_log_pos = <the_pos> ;
START SLAVE;
SHOW SLAVE STATUS;
Slave_IO_State Master_Host Master_User Master_Port Connect_Retry Master_Log_File Read_Master_Log_Pos Relay_Log_File Relay_Log_Pos Relay_Master_Log_File Slave_IO_Running Slave_SQL_Running Replicate_Do_DB Replicate_Ignore_DB Replicate_Do_Table Replicate_Ignore_Table Replicate_Wild_Do_Table Replicate_Wild_Ignore_Table Last_Errno Last_Error Skip_Counter Exec_Master_Log_Pos Relay_Log_Space Until_Condition Until_Log_File Until_Log_Pos Master_SSL_Allowed Master_SSL_CA_File Master_SSL_CA_Path Master_SSL_Cert Master_SSL_Cipher Master_SSL_Key Seconds_Behind_Master
-# 127.0.0.1 root MASTER_PORT 1 master-bin.000001 1708 # # master-bin.000001 Yes Yes 0 0 1708 # None 0 No #
+<Slave_IO_State> 127.0.0.1 root MASTER_PORT 1 master-bin.000001 <Read_Master_Log_Pos> <Relay_Log_File> <Relay_Log_Pos> master-bin.000001 Yes Yes <Replicate_Ignore_Table> 0 0 <Exec_Master_Log_Pos> <Relay_Log_Space> None 0 No <Seconds_Behind_Master>
SELECT hex(c1),hex(c2),c3 FROM t1 ORDER BY c3;
hex(c1) hex(c2) c3
1 1 row1
--- 1.4/mysql-test/t/rpl_ndb_idempotent.test 2005-04-29 18:10:42 +02:00
+++ 1.5/mysql-test/t/rpl_ndb_idempotent.test 2005-05-09 05:10:41 +02:00
@@ -41,7 +41,7 @@
SELECT * FROM t1 ORDER BY c3;
--replace_result $MASTER_MYPORT MASTER_PORT
---replace_column 1 # 8 # 9 # 23 # 33 #
+--replace_column 1 <Slave_IO_State> 7 <Read_Master_Log_Pos> 8
<Relay_Log_File> 9 <Relay_Log_Pos> 16 <Replicate_Ignore_Table> 22
<Exec_Master_Log_Pos> 23 <Relay_Log_Space> 33 <Seconds_Behind_Master>
SHOW SLAVE STATUS;
# stop slave and reset position to before the last changes
@@ -51,7 +51,7 @@
master_log_pos = $the_pos ;
--replace_result $MASTER_MYPORT MASTER_PORT
---replace_column 1 # 8 # 9 # 23 # 33 #
+--replace_column 1 <Slave_IO_State> 7 <Read_Master_Log_Pos> 8
<Relay_Log_File> 9 <Relay_Log_Pos> 16 <Replicate_Ignore_Table> 22
<Exec_Master_Log_Pos> 23 <Relay_Log_Space> 33 <Seconds_Behind_Master>
SHOW SLAVE STATUS;
# start the slave again
--- 1.1/mysql-test/t/rpl_ndb_sync.test 2005-05-01 16:03:56 +02:00
+++ 1.2/mysql-test/t/rpl_ndb_sync.test 2005-05-09 05:10:41 +02:00
@@ -67,6 +67,7 @@
# 2.
connection master;
--replace_result $the_epoch <the_epoch>
+--replace_column 1 <the_pos>
eval SELECT @the_pos:=Position,@the_file:=SUBSTRING_INDEX(FILE, '/', -1)
FROM cluster_replication.binlog_index WHERE epoch > $the_epoch ORDER BY epoch ASC
LIMIT 1;
let $the_pos= `SELECT @the_pos` ;
@@ -74,6 +75,7 @@
# 3.
connection slave;
+--replace_result $the_pos <the_pos>
eval CHANGE MASTER TO
master_log_file = '$the_file',
master_log_pos = $the_pos ;
@@ -87,7 +89,7 @@
--sleep 2
connection slave;
--replace_result $MASTER_MYPORT MASTER_PORT
---replace_column 1 # 8 # 9 # 23 # 33 #
+--replace_column 1 <Slave_IO_State> 7 <Read_Master_Log_Pos> 8
<Relay_Log_File> 9 <Relay_Log_Pos> 16 <Replicate_Ignore_Table> 22
<Exec_Master_Log_Pos> 23 <Relay_Log_Space> 33 <Seconds_Behind_Master>
SHOW SLAVE STATUS;
SELECT hex(c1),hex(c2),c3 FROM t1 ORDER BY c3;
| Thread |
|---|
| • bk commit into 5.1 tree (tomas:1.1830) | tomas | 9 May |