Below is the list of changes that have just been committed into a local
5.1 repository of stewart. When stewart does a push these changes will
be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet
1.1803 05/03/15 17:59:14 stewart@stripped +1 -0
Further work on WL2325 - NDB Injector
When creating table, add it to NDB replication
sql/ha_ndbcluster.cc
1.168 05/03/15 17:59:04 stewart@stripped +73 -2
Injector Thread
- have prototypes for utility functions
- mutex and cond mutex for starting the injector thread
- global variable for the injector thread's Ndb object
create table
- on create table, add it to ndb replication
ndb init
- wait for injector thread to have set up
# This is a BitKeeper patch. What follows are the unified diffs for the
# set of deltas contained in the patch. The rest of the patch, the part
# that BitKeeper cares about, is below these diffs.
# User: stewart
# Host: kennedy.(none)
# Root: /home/stewart/Documents/MySQL/5.1/ndb-wl2325
--- 1.167/sql/ha_ndbcluster.cc 2005-03-11 17:58:00 +11:00
+++ 1.168/sql/ha_ndbcluster.cc 2005-03-15 17:59:04 +11:00
@@ -122,7 +122,24 @@
#ifdef HAVE_REPLICATION
// NDB Injector thread (used for replication)
static pthread_t ndb_injector_thread;
+
+int ndbcluster_create_event(Ndb *ndb, const char* db_name, const char* table_name, const char* eventName);
+int ndbcluster_setup_repl_for_table(Ndb *ndb, const char* db_name, const char* table_name, const char* eventName);
+inline void ndb_rep_event_name(String *eventName,const char* db, const char* tbl);
+
extern "C" pthread_handler_decl(ndb_injector_thread_func, arg);
+
+pthread_mutex_t injector_startup_mutex;
+pthread_cond_t injector_startup_cond;
+
+/**
+ * NDB object that belongs to the injector thread.
+ * No locking is performed on this, so *only* call methods
+ * that you know are thread safe.
+ *
+ * You have been warned.
+ */
+Ndb *injector_ndb;
#endif
/*
@@ -3832,6 +3849,17 @@
my_errno= ndb_to_mysql_error(&err);
DBUG_RETURN(my_errno);
}
+
+#ifdef HAVE_REPLICATION
+ if (opt_bin_log) {
+ String eventName(200);
+ ndb_rep_event_name(&eventName,m_dbname,m_tabname);
+ ndbcluster_create_event(ndb,m_dbname,m_tabname,eventName.c_ptr());
+ ndbcluster_setup_repl_for_table(injector_ndb, m_dbname, m_tabname,
+ eventName.c_ptr());
+ }
+#endif
+
DBUG_PRINT("info", ("Table %s/%s created successfully",
m_dbname, m_tabname));
@@ -4605,6 +4633,10 @@
#ifdef HAVE_REPLICATION
if (opt_bin_log)
{
+ pthread_mutex_init(&injector_startup_mutex,MY_MUTEX_INIT_FAST);
+ pthread_cond_init(&injector_startup_cond,NULL);
+ pthread_mutex_lock(&injector_startup_mutex);
+
// Create injector thread
pthread_t inj;
if (pthread_create(&inj, &connection_attrib, ndb_injector_thread_func, 0))
@@ -4612,6 +4644,14 @@
DBUG_PRINT("error", ("Could not create ndb injector thread"));
goto ndbcluster_init_error;
}
+ /**
+ * Wait for the ndb injector thread to finish starting up.
+ * Afterwards, we no longer need this mutex
+ */
+ pthread_cond_wait(&injector_startup_cond,&injector_startup_mutex);
+ pthread_mutex_unlock(&injector_startup_mutex);
+ pthread_cond_destroy(&injector_startup_cond);
+ pthread_mutex_destroy(&injector_startup_mutex);
}
#endif
@@ -6886,8 +6926,9 @@
return error;
}
-int ndbcluster_setup_repl_for_table(Ndb *ndb, const char* db_name, const char* table_name, const char* eventName)
-{
+int
+ndbcluster_create_event(Ndb *ndb, const char* db_name, const char* table_name, const char* eventName)
+{
NDBDICT *dict;
dict= ndb->getDictionary();
@@ -6926,8 +6967,24 @@
eventName,
dict->getNdbError().code,
dict->getNdbError().message);
+ return res;
+ }
+
+ return 0;
+}
+
+int ndbcluster_setup_repl_for_table(Ndb *ndb, const char* db_name, const char* table_name, const char* eventName)
+{
+ NDBDICT *dict;
+
+ dict= ndb->getDictionary();
+ if(!dict)
+ {
+ sql_print_error("NDB Replication: could not setup replication, "
+ "Invalid NdbDictionary");
return -1;
}
+ const NdbDictionary::Table *table= dict->getTable(table_name);
NdbEventOperation *op= ndb->createEventOperation(eventName,100);
if(!op)
@@ -7041,6 +7098,9 @@
ndb_rep_event_name(&eventName,t.database,t.name);
+ if(ndbcluster_create_event(ndb,t.database,t.name,eventName.c_ptr())<0)
+ goto err;
+
if(ndbcluster_setup_repl_for_table(ndb, t.database, t.name,
eventName.c_ptr()) < 0)
goto err;
@@ -7049,7 +7109,18 @@
t.database,t.name,eventName.c_ptr());
} // for
+ /**
+ * Global reference to our ndb object
+ */
+ injector_ndb= ndb;
+
sql_print_information("Starting Cluster Replication");
+
+ /**
+ * We signal the thread that started us that we've finished
+ * starting up.
+ */
+ pthread_cond_signal(&injector_startup_cond);
/**
* Main NDB Injector loop
| Thread |
|---|
| • bk commit into 5.1 tree (stewart:1.1803) | Stewart Smith | 15 Mar |