From: magnus.blaudd Date: October 31 2011 7:41am Subject: bzr push into mysql-5.5-cluster branch (magnus.blaudd:3619 to 3620) List-Archive: http://lists.mysql.com/commits/141662 Message-Id: <201110310741.p9V7fLPm008476@acsmt358.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3620 magnus.blaudd@stripped 2011-10-31 [merge] Merge in latest ndbcluster with schema dist refactorings added: sql/ndb_event_data.cc sql/ndb_event_data.h modified: sql/ha_ndbcluster.cc sql/ha_ndbcluster.h sql/ha_ndbcluster_binlog.cc sql/ha_ndbcluster_binlog.h sql/ha_ndbcluster_glue.h sql/ndb_thd.cc sql/ndb_thd.h storage/ndb/CMakeLists.txt 3619 magnus.blaudd@stripped 2011-10-27 [merge] Merge added: sql/ndb_anyvalue.cc sql/ndb_anyvalue.h sql/ndb_binlog_extra_row_info.cc sql/ndb_binlog_extra_row_info.h sql/ndb_ndbapi_util.cc sql/ndb_ndbapi_util.h modified: sql/ha_ndbcluster.cc sql/ha_ndbcluster.h sql/ha_ndbcluster_binlog.cc sql/ha_ndbcluster_binlog.h sql/ha_ndbcluster_connection.cc sql/ndb_share.h storage/ndb/CMakeLists.txt === modified file 'sql/ha_ndbcluster.cc' --- a/sql/ha_ndbcluster.cc 2011-10-27 11:57:04 +0000 +++ b/sql/ha_ndbcluster.cc 2011-10-29 09:02:21 +0000 @@ -53,6 +53,7 @@ #include "ndb_conflict_trans.h" #include "ndb_anyvalue.h" #include "ndb_binlog_extra_row_info.h" +#include "ndb_event_data.h" // ndb interface initialization/cleanup extern "C" void ndb_init_internal(); @@ -8760,6 +8761,31 @@ ha_ndbcluster::start_transaction_part_id /** + Static error print function called from static handler method + ndbcluster_commit and ndbcluster_rollback. +*/ +static void +ndbcluster_print_error(int error, const NdbOperation *error_op) +{ + DBUG_ENTER("ndbcluster_print_error"); + TABLE_SHARE share; + const char *tab_name= (error_op) ? error_op->getTableName() : ""; + if (tab_name == NULL) + { + DBUG_ASSERT(tab_name != NULL); + tab_name= ""; + } + share.db.str= (char*) ""; + share.db.length= 0; + share.table_name.str= (char *) tab_name; + share.table_name.length= strlen(tab_name); + ha_ndbcluster error_handler(ndbcluster_hton, &share); + error_handler.print_error(error, MYF(0)); + DBUG_VOID_RETURN; +} + + +/** Commit a transaction started in NDB. */ @@ -11841,7 +11867,7 @@ static int ndbcluster_close_connection(h /** Try to discover one table from NDB. */ - +static int ndbcluster_discover(handlerton *hton, THD* thd, const char *db, const char *name, uchar **frmblob, @@ -11962,7 +11988,7 @@ err: /** Check if a table exists in NDB. */ - +static int ndbcluster_table_exists_in_engine(handlerton *hton, THD* thd, const char *db, const char *name) @@ -12751,30 +12777,6 @@ void ha_ndbcluster::print_error(int erro /** - Static error print function called from static handler method - ndbcluster_commit and ndbcluster_rollback. -*/ - -void ndbcluster_print_error(int error, const NdbOperation *error_op) -{ - DBUG_ENTER("ndbcluster_print_error"); - TABLE_SHARE share; - const char *tab_name= (error_op) ? error_op->getTableName() : ""; - if (tab_name == NULL) - { - DBUG_ASSERT(tab_name != NULL); - tab_name= ""; - } - share.db.str= (char*) ""; - share.db.length= 0; - share.table_name.str= (char *) tab_name; - share.table_name.length= strlen(tab_name); - ha_ndbcluster error_handler(ndbcluster_hton, &share); - error_handler.print_error(error, MYF(0)); - DBUG_VOID_RETURN; -} - -/** Set a given location from full pathname to database name. */ === modified file 'sql/ha_ndbcluster.h' --- a/sql/ha_ndbcluster.h 2011-10-27 11:48:43 +0000 +++ b/sql/ha_ndbcluster.h 2011-10-28 07:14:45 +0000 @@ -42,7 +42,6 @@ class NdbBlob; class NdbIndexStat; class NdbEventOperation; class ha_ndbcluster_cond; -class Ndb_event_data; class NdbQuery; class NdbQueryOperation; class NdbQueryOperationTypeWrapper; @@ -497,9 +496,6 @@ private: const uchar *record, bool use_active_index); friend int ndbcluster_drop_database_impl(THD *thd, const char *path); - friend int ndb_handle_schema_change(THD *thd, - Ndb *ndb, NdbEventOperation *pOp, - NDB_SHARE *share); void check_read_before_write_removal(); static int drop_table_impl(THD *thd, ha_ndbcluster *h, Ndb *ndb, @@ -814,12 +810,6 @@ private: int add_handler_to_open_tables(THD*, Thd_ndb*, ha_ndbcluster* handler); }; -int ndbcluster_discover(THD* thd, const char* dbname, const char* name, - const void** frmblob, uint* frmlen); -int ndbcluster_table_exists_in_engine(THD* thd, - const char *db, const char *name); -void ndbcluster_print_error(int error, const NdbOperation *error_op); - static const char ndbcluster_hton_name[]= "ndbcluster"; static const int ndbcluster_hton_name_length=sizeof(ndbcluster_hton_name)-1; extern int ndbcluster_terminating; === modified file 'sql/ha_ndbcluster_binlog.cc' --- a/sql/ha_ndbcluster_binlog.cc 2011-10-27 09:34:06 +0000 +++ b/sql/ha_ndbcluster_binlog.cc 2011-10-29 14:21:55 +0000 @@ -58,6 +58,7 @@ bool ndb_log_empty_epochs(void); #include "ndb_dist_priv_util.h" #include "ndb_anyvalue.h" #include "ndb_binlog_extra_row_info.h" +#include "ndb_event_data.h" /* Timeout for syncing schema events between @@ -141,9 +142,9 @@ static int ndbcluster_binlog_terminating Mutex and condition used for interacting between client sql thread and injector thread */ -pthread_t ndb_binlog_thread; -pthread_mutex_t injector_mutex; -pthread_cond_t injector_cond; +static pthread_t ndb_binlog_thread; +static pthread_mutex_t injector_mutex; +static pthread_cond_t injector_cond; /* NDB Injector thread (used for binlog creation) */ static ulonglong ndb_latest_applied_binlog_epoch= 0; @@ -152,7 +153,7 @@ static ulonglong ndb_latest_received_bin NDB_SHARE *ndb_apply_status_share= 0; NDB_SHARE *ndb_schema_share= 0; -pthread_mutex_t ndb_schema_share_mutex; +static pthread_mutex_t ndb_schema_share_mutex; extern my_bool opt_log_slave_updates; static my_bool g_ndb_log_slave_updates; @@ -260,20 +261,6 @@ static void dbug_print_table(const char #define dbug_print_table(a,b) #endif -static inline void -print_warning_list(const char* prefix, List& list) -{ - List_iterator_fast it(list); - MYSQL_ERROR *err; - while ((err= it++)) - { - sql_print_warning("%s: (%d)%s", - prefix, - MYSQL_ERROR_get_sql_errno(err), - MYSQL_ERROR_get_message_text(err)); - } -} - static void run_query(THD *thd, char *buf, char *end, const int *no_print_error) @@ -2087,7 +2074,7 @@ end: static int ndb_handle_schema_change(THD *thd, Ndb *is_ndb, NdbEventOperation *pOp, - Ndb_event_data *event_data) + const Ndb_event_data *event_data) { DBUG_ENTER("ndb_handle_schema_change"); NDB_SHARE *share= event_data->share; @@ -2207,14 +2194,87 @@ private: class Ndb_schema_event_handler { - struct Cluster_schema + class Ndb_schema_op { + // Unpack Ndb_schema_op from event_data pointer + void unpack_event(const Ndb_event_data *event_data) + { + TABLE *table= event_data->shadow_table; + Field **field; + /* unpack blob values */ + uchar* blobs_buffer= 0; + uint blobs_buffer_size= 0; + my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set); + { + ptrdiff_t ptrdiff= 0; + int ret= get_ndb_blobs_value(table, event_data->ndb_value[0], + blobs_buffer, blobs_buffer_size, + ptrdiff); + if (ret != 0) + { + my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); + DBUG_PRINT("info", ("blob read error")); + DBUG_ASSERT(FALSE); + } + } + /* db varchar 1 length uchar */ + field= table->field; + db_length= *(uint8*)(*field)->ptr; + DBUG_ASSERT(db_length <= (*field)->field_length); + DBUG_ASSERT((*field)->field_length + 1 == sizeof(db)); + memcpy(db, (*field)->ptr + 1, db_length); + db[db_length]= 0; + /* name varchar 1 length uchar */ + field++; + name_length= *(uint8*)(*field)->ptr; + DBUG_ASSERT(name_length <= (*field)->field_length); + DBUG_ASSERT((*field)->field_length + 1 == sizeof(name)); + memcpy(name, (*field)->ptr + 1, name_length); + name[name_length]= 0; + /* slock fixed length */ + field++; + slock_length= (*field)->field_length; + DBUG_ASSERT((*field)->field_length == sizeof(slock_buf)); + memcpy(slock_buf, (*field)->ptr, slock_length); + /* query blob */ + field++; + { + Field_blob *field_blob= (Field_blob*)(*field); + uint blob_len= field_blob->get_length((*field)->ptr); + uchar *blob_ptr= 0; + field_blob->get_ptr(&blob_ptr); + DBUG_ASSERT(blob_len == 0 || blob_ptr != 0); + query_length= blob_len; + query= sql_strmake((char*) blob_ptr, blob_len); + } + /* node_id */ + field++; + node_id= (Uint32)((Field_long *)*field)->val_int(); + /* epoch */ + field++; + epoch= ((Field_long *)*field)->val_int(); + /* id */ + field++; + id= (Uint32)((Field_long *)*field)->val_int(); + /* version */ + field++; + version= (Uint32)((Field_long *)*field)->val_int(); + /* type */ + field++; + type= (Uint32)((Field_long *)*field)->val_int(); + /* free blobs buffer */ + my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); + dbug_tmp_restore_column_map(table->read_set, old_map); + } + + public: uchar db_length; char db[64]; uchar name_length; char name[64]; uchar slock_length; - uint32 slock[SCHEMA_SLOCK_SIZE/4]; + uint32 slock_buf[SCHEMA_SLOCK_SIZE/4]; + MY_BITMAP slock; unsigned short query_length; char *query; Uint64 epoch; @@ -2223,99 +2283,52 @@ class Ndb_schema_event_handler { uint32 version; uint32 type; uint32 any_value; - }; + /** + Create a Ndb_schema_op from event_data + */ + static Ndb_schema_op* + create(const Ndb_event_data* event_data, + Uint32 any_value) + { + DBUG_ENTER("Ndb_schema_op::create"); + Ndb_schema_op* schema_op= + (Ndb_schema_op*)sql_alloc(sizeof(Ndb_schema_op)); + bitmap_init(&schema_op->slock, + schema_op->slock_buf, 8*SCHEMA_SLOCK_SIZE, FALSE); + schema_op->unpack_event(event_data); + schema_op->any_value= any_value; + DBUG_PRINT("exit", ("%s.%s: query: '%s' type: %d", + schema_op->db, schema_op->name, + schema_op->query, + schema_op->type)); + DBUG_RETURN(schema_op); + } + }; static void print_could_not_discover_error(THD *thd, - const Cluster_schema *schema) + const Ndb_schema_op *schema) { sql_print_error("NDB Binlog: Could not discover table '%s.%s' from " "binlog schema event '%s' from node %d. " "my_errno: %d", schema->db, schema->name, schema->query, schema->node_id, my_errno); - print_warning_list("NDB Binlog", thd_warn_list(thd)); + thd_print_warning_list(thd, "NDB Binlog"); } - /* - Transfer schema table event data into Cluster_schema struct - */ - static void ndbcluster_get_schema(const Ndb_event_data *event_data, - Cluster_schema *s) + static void + write_schema_op_to_binlog(THD *thd, Ndb_schema_op *schema) { - TABLE *table= event_data->shadow_table; - Field **field; - /* unpack blob values */ - uchar* blobs_buffer= 0; - uint blobs_buffer_size= 0; - my_bitmap_map *old_map= dbug_tmp_use_all_columns(table, table->read_set); - { - ptrdiff_t ptrdiff= 0; - int ret= get_ndb_blobs_value(table, event_data->ndb_value[0], - blobs_buffer, blobs_buffer_size, - ptrdiff); - if (ret != 0) - { - my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); - DBUG_PRINT("info", ("blob read error")); - DBUG_ASSERT(FALSE); - } - } - /* db varchar 1 length uchar */ - field= table->field; - s->db_length= *(uint8*)(*field)->ptr; - DBUG_ASSERT(s->db_length <= (*field)->field_length); - DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->db)); - memcpy(s->db, (*field)->ptr + 1, s->db_length); - s->db[s->db_length]= 0; - /* name varchar 1 length uchar */ - field++; - s->name_length= *(uint8*)(*field)->ptr; - DBUG_ASSERT(s->name_length <= (*field)->field_length); - DBUG_ASSERT((*field)->field_length + 1 == sizeof(s->name)); - memcpy(s->name, (*field)->ptr + 1, s->name_length); - s->name[s->name_length]= 0; - /* slock fixed length */ - field++; - s->slock_length= (*field)->field_length; - DBUG_ASSERT((*field)->field_length == sizeof(s->slock)); - memcpy(s->slock, (*field)->ptr, s->slock_length); - /* query blob */ - field++; - { - Field_blob *field_blob= (Field_blob*)(*field); - uint blob_len= field_blob->get_length((*field)->ptr); - uchar *blob_ptr= 0; - field_blob->get_ptr(&blob_ptr); - DBUG_ASSERT(blob_len == 0 || blob_ptr != 0); - s->query_length= blob_len; - s->query= sql_strmake((char*) blob_ptr, blob_len); - } - /* node_id */ - field++; - s->node_id= (Uint32)((Field_long *)*field)->val_int(); - /* epoch */ - field++; - s->epoch= ((Field_long *)*field)->val_int(); - /* id */ - field++; - s->id= (Uint32)((Field_long *)*field)->val_int(); - /* version */ - field++; - s->version= (Uint32)((Field_long *)*field)->val_int(); - /* type */ - field++; - s->type= (Uint32)((Field_long *)*field)->val_int(); - /* free blobs buffer */ - my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); - dbug_tmp_restore_column_map(table->read_set, old_map); - } + if (!ndb_binlog_running) + { + // This mysqld is not writing a binlog + return; + } - static void ndb_binlog_query(THD *thd, Cluster_schema *schema) - { /* any_value == 0 means local cluster sourced change that * should be logged */ @@ -2379,24 +2392,23 @@ class Ndb_schema_event_handler { /* - acknowledge handling of schema operation + Acknowledge handling of schema operation + - Inform the other nodes that schema op has + been completed by this node (by updating the + row for this op in ndb_schema table) */ - static int - ndbcluster_update_slock(THD *thd, - const char *db, - const char *table_name, - uint32 table_id, - uint32 table_version) + int + ack_schema_op(const char *db, const char *table_name, + uint32 table_id, uint32 table_version) { - DBUG_ENTER("ndbcluster_update_slock"); + DBUG_ENTER("ack_schema_op"); if (!ndb_schema_share) { DBUG_RETURN(0); } const NdbError *ndb_error= 0; - uint32 node_id= g_ndb_cluster_connection->node_id(); - Ndb *ndb= check_ndb_in_thd(thd); + Ndb *ndb= check_ndb_in_thd(m_thd); char save_db[FN_HEADLEN]; strcpy(save_db, ndb->getDatabaseName()); @@ -2469,7 +2481,7 @@ class Ndb_schema_event_handler { { uint32 copy[SCHEMA_SLOCK_SIZE/4]; memcpy(copy, bitbuf, sizeof(copy)); - bitmap_clear_bit(&slock, node_id); + bitmap_clear_bit(&slock, own_nodeid()); sql_print_information("NDB: reply to %s.%s(%u/%u) from %x%x to %x%x", db, table_name, table_id, table_version, @@ -2479,7 +2491,7 @@ class Ndb_schema_event_handler { } else { - bitmap_clear_bit(&slock, node_id); + bitmap_clear_bit(&slock, own_nodeid()); } { @@ -2505,7 +2517,7 @@ class Ndb_schema_event_handler { r|= op->setValue(SCHEMA_SLOCK_I, (char*)slock.bitmap); DBUG_ASSERT(r == 0); /* node_id */ - r|= op->setValue(SCHEMA_NODE_ID_I, node_id); + r|= op->setValue(SCHEMA_NODE_ID_I, own_nodeid()); DBUG_ASSERT(r == 0); /* type */ r|= op->setValue(SCHEMA_TYPE_I, (uint32)SOT_CLEAR_SLOCK); @@ -2515,14 +2527,15 @@ class Ndb_schema_event_handler { NdbOperation::DefaultAbortOption, 1 /*force send*/) == 0) { DBUG_PRINT("info", ("node %d cleared lock on '%s.%s'", - node_id, db, table_name)); + own_nodeid(), db, table_name)); dict->forceGCPWait(1); break; } err: const NdbError *this_error= trans ? &trans->getNdbError() : &ndb->getNdbError(); - if (this_error->status == NdbError::TemporaryError && !thd->killed) + if (this_error->status == NdbError::TemporaryError && + !thd_killed(m_thd)) { if (retries--) { @@ -2538,12 +2551,10 @@ class Ndb_schema_event_handler { if (ndb_error) { - char buf[1024]; - my_snprintf(buf, sizeof(buf), "Could not release lock on '%s.%s'", - db, table_name); - push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN, - ER_GET_ERRMSG, ER(ER_GET_ERRMSG), - ndb_error->code, ndb_error->message, buf); + sql_print_warning("NDB: Could not release slock on '%s.%s', " + "Error code: %d Message: %s", + db, table_name, + ndb_error->code, ndb_error->message); } if (trans) ndb->closeTransaction(trans); @@ -2552,43 +2563,100 @@ class Ndb_schema_event_handler { } -static int -handle_schema_event(THD *thd, Ndb *s_ndb, - NdbEventOperation *pOp, - List *post_epoch_log_list, - List *post_epoch_unlock_list, - MEM_ROOT *mem_root) -{ - DBUG_ENTER("handle_schema_event"); - Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData(); - NDB_SHARE *tmp_share= event_data->share; - if (tmp_share && ndb_schema_share == tmp_share) + bool check_is_ndb_schema_event(const Ndb_event_data* event_data) const + { + if (!event_data) + { + // Received event without event data pointer + assert(false); + return false; + } + + NDB_SHARE *share= event_data->share; + if (!share) + { + // Received event where the event_data is not properly initialized + assert(false); + return false; + } + assert(event_data->shadow_table); + assert(event_data->ndb_value[0]); + assert(event_data->ndb_value[1]); + + pthread_mutex_lock(&ndb_schema_share_mutex); + if (share != ndb_schema_share) + { + // Received event from s_ndb not pointing at the ndb_schema_share + pthread_mutex_unlock(&ndb_schema_share_mutex); + assert(false); + return false; + } + assert(!strncmp(share->db, STRING_WITH_LEN(NDB_REP_DB))); + assert(!strncmp(share->table_name, STRING_WITH_LEN(NDB_SCHEMA_TABLE))); + pthread_mutex_unlock(&ndb_schema_share_mutex); + return true; + } + + + void + log_after_epoch(Ndb_schema_op* schema) + { + DBUG_ENTER("log_after_epoch"); + m_post_epoch_log_list.push_back(schema, m_mem_root); + DBUG_VOID_RETURN; + } + + + void + unlock_after_epoch(Ndb_schema_op* schema) + { + DBUG_ENTER("unlock_after_epoch"); + m_post_epoch_unlock_list.push_back(schema, m_mem_root); + DBUG_VOID_RETURN; + } + + + uint own_nodeid(void) const + { + return m_own_nodeid; + } + + + bool + check_if_local_tables_in_db(const char *dbname) const { - NDBEVENT::TableEvent ev_type= pOp->getEventType(); - DBUG_PRINT("enter", ("%s.%s ev_type: %d", - tmp_share->db, tmp_share->table_name, ev_type)); - if (ev_type == NDBEVENT::TE_UPDATE || - ev_type == NDBEVENT::TE_INSERT) - { - Thd_ndb *thd_ndb= get_thd_ndb(thd); - Ndb *ndb= thd_ndb->ndb; - NDBDICT *dict= ndb->getDictionary(); - Thd_ndb_options_guard thd_ndb_options(thd_ndb); - Cluster_schema *schema= (Cluster_schema *) - sql_alloc(sizeof(Cluster_schema)); - MY_BITMAP slock; - bitmap_init(&slock, schema->slock, 8*SCHEMA_SLOCK_SIZE, FALSE); - uint node_id= g_ndb_cluster_connection->node_id(); - { - ndbcluster_get_schema(event_data, schema); - schema->any_value= pOp->getAnyValue(); - } - enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type; - DBUG_PRINT("info", - ("%s.%s: log query_length: %d query: '%s' type: %d", - schema->db, schema->name, - schema->query_length, schema->query, - schema_type)); + DBUG_ENTER("check_if_local_tables_in_db"); + DBUG_PRINT("info", ("Looking for files in directory %s", dbname)); + List files; + char path[FN_REFLEN + 1]; + + build_table_filename(path, sizeof(path) - 1, dbname, "", "", 0); + if (find_files(m_thd, &files, dbname, path, NullS, 0) != FIND_FILES_OK) + { + m_thd->clear_error(); + DBUG_PRINT("info", ("Failed to find files")); + DBUG_RETURN(true); + } + DBUG_PRINT("info",("found: %d files", files.elements)); + + LEX_STRING *tabname; + while ((tabname= files.pop())) + { + DBUG_PRINT("info", ("Found table %s", tabname->str)); + if (ndbcluster_check_if_local_table(dbname, tabname->str)) + DBUG_RETURN(true); + } + + DBUG_RETURN(false); + } + + + int + handle_schema_op(Ndb_schema_op* schema) + { + DBUG_ENTER("handle_schema_op"); + { + const SCHEMA_OP_TYPE schema_type= (SCHEMA_OP_TYPE)schema->type; if (opt_ndb_extra_logging > 19) { @@ -2599,7 +2667,8 @@ handle_schema_event(THD *thd, Ndb *s_ndb get_schema_type_name(schema_type), schema_type, schema->node_id, - slock.bitmap[0], slock.bitmap[1]); + schema->slock.bitmap[0], + schema->slock.bitmap[1]); } if ((schema->db[0] == 0) && (schema->name[0] == 0)) @@ -2611,6 +2680,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb */ DBUG_RETURN(0); } + switch (schema_type) { case SOT_CLEAR_SLOCK: @@ -2619,24 +2689,29 @@ handle_schema_event(THD *thd, Ndb *s_ndb schema events get inserted in the binlog after any data events */ - post_epoch_log_list->push_back(schema, mem_root); + log_after_epoch(schema); DBUG_RETURN(0); case SOT_ALTER_TABLE_COMMIT: case SOT_RENAME_TABLE_PREPARE: case SOT_ONLINE_ALTER_TABLE_PREPARE: case SOT_ONLINE_ALTER_TABLE_COMMIT: - post_epoch_log_list->push_back(schema, mem_root); - post_epoch_unlock_list->push_back(schema, mem_root); + log_after_epoch(schema); + unlock_after_epoch(schema); DBUG_RETURN(0); default: break; } - if (schema->node_id != node_id) + if (schema->node_id != own_nodeid()) { - int log_query= 0, post_epoch_unlock= 0; + THD* thd= m_thd; // Code compatibility + Thd_ndb *thd_ndb= get_thd_ndb(thd); + Ndb *ndb= thd_ndb->ndb; + Thd_ndb_options_guard thd_ndb_options(thd_ndb); + + int post_epoch_unlock= 0; switch (schema_type) { @@ -2652,7 +2727,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb schema->query + schema->query_length, no_print_error); /* binlog dropping table after any table operations */ - post_epoch_log_list->push_back(schema, mem_root); + log_after_epoch(schema); /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; } @@ -2667,7 +2742,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb (schema_type == SOT_DROP_TABLE ? "dropp" : "renam"), schema->db, schema->name, schema->query, schema->node_id); - log_query= 1; + write_schema_op_to_binlog(thd, schema); } // Fall through case SOT_TRUNCATE_TABLE: @@ -2687,7 +2762,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb { { ndb->setDatabaseName(schema->db); - Ndb_table_guard ndbtab_g(dict, schema->name); + Ndb_table_guard ndbtab_g(ndb->getDictionary(), schema->name); ndbtab_g.invalidate(); } TABLE_LIST table_list; @@ -2722,20 +2797,20 @@ handle_schema_event(THD *thd, Ndb *s_ndb { print_could_not_discover_error(thd, schema); } - log_query= 1; + write_schema_op_to_binlog(thd, schema); break; case SOT_DROP_DB: /* Drop the database locally if it only contains ndb tables */ thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP); - if (! ndbcluster_check_if_local_tables_in_db(thd, schema->db)) + if (!check_if_local_tables_in_db(schema->db)) { const int no_print_error[1]= {0}; run_query(thd, schema->query, schema->query + schema->query_length, no_print_error); /* binlog dropping database after any table operations */ - post_epoch_log_list->push_back(schema, mem_root); + log_after_epoch(schema); /* acknowledge this query _after_ epoch completion */ post_epoch_unlock= 1; } @@ -2746,7 +2821,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb "binlog schema event '%s' from node %d. ", schema->db, schema->query, schema->node_id); - log_query= 1; + write_schema_op_to_binlog(thd, schema); } break; @@ -2758,7 +2833,7 @@ handle_schema_event(THD *thd, Ndb *s_ndb run_query(thd, schema->query, schema->query + schema->query_length, no_print_error); - log_query= 1; + write_schema_op_to_binlog(thd, schema); break; } @@ -2779,13 +2854,13 @@ handle_schema_event(THD *thd, Ndb *s_ndb run_query(thd, cmd, cmd + strlen(cmd), no_print_error); - log_query= 1; + write_schema_op_to_binlog(thd, schema); break; } case SOT_TABLESPACE: case SOT_LOGFILE_GROUP: - log_query= 1; + write_schema_op_to_binlog(thd, schema); break; case SOT_ALTER_TABLE_COMMIT: @@ -2799,159 +2874,44 @@ handle_schema_event(THD *thd, Ndb *s_ndb break; } - if (log_query && ndb_binlog_running) - ndb_binlog_query(thd, schema); + /* signal that schema operation has been handled */ - DBUG_DUMP("slock", (uchar*) schema->slock, schema->slock_length); - if (bitmap_is_set(&slock, node_id)) + DBUG_DUMP("slock", (uchar*) schema->slock_buf, schema->slock_length); + if (bitmap_is_set(&schema->slock, own_nodeid())) { if (post_epoch_unlock) - post_epoch_unlock_list->push_back(schema, mem_root); + unlock_after_epoch(schema); else - ndbcluster_update_slock(thd, schema->db, schema->name, - schema->id, schema->version); + ack_schema_op(schema->db, schema->name, + schema->id, schema->version); } } - DBUG_RETURN(0); - } - /* - the normal case of UPDATE/INSERT has already been handled - */ - switch (ev_type) - { - case NDBEVENT::TE_DELETE: - // skip - break; - case NDBEVENT::TE_CLUSTER_FAILURE: - if (opt_ndb_extra_logging) - sql_print_information("NDB Binlog: cluster failure for %s at epoch %u/%u.", - ndb_schema_share->key, - (uint)(pOp->getGCI() >> 32), - (uint)(pOp->getGCI())); - // fall through - case NDBEVENT::TE_DROP: - if (opt_ndb_extra_logging && - ndb_binlog_tables_inited && ndb_binlog_running) - sql_print_information("NDB Binlog: ndb tables initially " - "read only on reconnect."); - - /* begin protect ndb_schema_share */ - pthread_mutex_lock(&ndb_schema_share_mutex); - /* ndb_share reference binlog extra free */ - DBUG_PRINT("NDB_SHARE", ("%s binlog extra free use_count: %u", - ndb_schema_share->key, - ndb_schema_share->use_count)); - free_share(&ndb_schema_share); - ndb_schema_share= 0; - ndb_binlog_tables_inited= FALSE; - ndb_binlog_is_ready= FALSE; - pthread_mutex_unlock(&ndb_schema_share_mutex); - /* end protect ndb_schema_share */ - - close_cached_tables(NULL, NULL, FALSE, FALSE, FALSE); - // fall through - case NDBEVENT::TE_ALTER: - ndb_handle_schema_change(thd, s_ndb, pOp, event_data); - break; - case NDBEVENT::TE_NODE_FAILURE: - { - uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; - DBUG_ASSERT(node_id != 0xFF); - pthread_mutex_lock(&tmp_share->mutex); - bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]); - DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id)); - if (opt_ndb_extra_logging) - { - sql_print_information("NDB Binlog: Node: %d, down," - " Subscriber bitmask %x%x", - pOp->getNdbdNodeId(), - tmp_share->subscriber_bitmap[node_id].bitmap[1], - tmp_share->subscriber_bitmap[node_id].bitmap[0]); - } - pthread_mutex_unlock(&tmp_share->mutex); - (void) pthread_cond_signal(&injector_cond); - break; - } - case NDBEVENT::TE_SUBSCRIBE: - { - uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; - uint8 req_id= pOp->getReqNodeId(); - DBUG_ASSERT(req_id != 0 && node_id != 0xFF); - pthread_mutex_lock(&tmp_share->mutex); - bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id); - DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id)); - if (opt_ndb_extra_logging) - { - sql_print_information("NDB Binlog: Node: %d, subscribe from node %d," - " Subscriber bitmask %x%x", - pOp->getNdbdNodeId(), - req_id, - tmp_share->subscriber_bitmap[node_id].bitmap[1], - tmp_share->subscriber_bitmap[node_id].bitmap[0]); - } - pthread_mutex_unlock(&tmp_share->mutex); - (void) pthread_cond_signal(&injector_cond); - break; - } - case NDBEVENT::TE_UNSUBSCRIBE: - { - uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; - uint8 req_id= pOp->getReqNodeId(); - DBUG_ASSERT(req_id != 0 && node_id != 0xFF); - pthread_mutex_lock(&tmp_share->mutex); - bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id); - DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id)); - if (opt_ndb_extra_logging) - { - sql_print_information("NDB Binlog: Node: %d, unsubscribe from node %d," - " Subscriber bitmask %x%x", - pOp->getNdbdNodeId(), - req_id, - tmp_share->subscriber_bitmap[node_id].bitmap[1], - tmp_share->subscriber_bitmap[node_id].bitmap[0]); - } - pthread_mutex_unlock(&tmp_share->mutex); - (void) pthread_cond_signal(&injector_cond); - break; - } - default: - sql_print_error("NDB Binlog: unknown non data event %d for %s. " - "Ignoring...", (unsigned) ev_type, tmp_share->key); } + DBUG_RETURN(0); } - DBUG_RETURN(0); -} - -/* - process any operations that should be done after - the epoch is complete -*/ -static void -handle_schema_log_post_epoch(THD *thd, - List *log_list) -{ - DBUG_ENTER("handle_schema_log_post_epoch"); - Thd_ndb *thd_ndb= get_thd_ndb(thd); - Ndb *ndb= thd_ndb->ndb; - NDBDICT *dict= ndb->getDictionary(); - - Cluster_schema *schema; - while ((schema= log_list->pop())) + void + handle_schema_op_post_epoch(Ndb_schema_op* schema) { + DBUG_ENTER("handle_schema_op_post_epoch"); + THD* thd = m_thd; // Code compatibility + Thd_ndb *thd_ndb= get_thd_ndb(thd); + Ndb *ndb= thd_ndb->ndb; + NDBDICT *dict= ndb->getDictionary(); Thd_ndb_options_guard thd_ndb_options(thd_ndb); DBUG_PRINT("info", ("%s.%s: log query_length: %d query: '%s' type: %d", schema->db, schema->name, schema->query_length, schema->query, schema->type)); - int log_query= 0; + { - enum SCHEMA_OP_TYPE schema_type= (enum SCHEMA_OP_TYPE)schema->type; + const SCHEMA_OP_TYPE schema_type= (SCHEMA_OP_TYPE)schema->type; char key[FN_REFLEN + 1]; build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0); if (schema_type == SOT_CLEAR_SLOCK) { + /* Ack to any SQL thread waiting for schema op to complete */ pthread_mutex_lock(&ndbcluster_mutex); NDB_SCHEMA_OBJECT *ndb_schema_object= (NDB_SCHEMA_OBJECT*) my_hash_search(&ndb_schema_objects, @@ -2968,10 +2928,10 @@ handle_schema_log_post_epoch(THD *thd, key, schema->id, schema->version, ndb_schema_object->slock[0], ndb_schema_object->slock[1], - schema->slock[0], - schema->slock[1]); + schema->slock_buf[0], + schema->slock_buf[1]); } - memcpy(ndb_schema_object->slock, schema->slock, + memcpy(ndb_schema_object->slock, schema->slock_buf, sizeof(ndb_schema_object->slock)); DBUG_DUMP("ndb_schema_object->slock_bitmap.bitmap", (uchar*)ndb_schema_object->slock_bitmap.bitmap, @@ -2998,8 +2958,15 @@ handle_schema_log_post_epoch(THD *thd, } } pthread_mutex_unlock(&ndbcluster_mutex); - continue; + DBUG_VOID_RETURN; } + + if (opt_ndb_extra_logging > 9) + sql_print_information("%s - %s.%s", + get_schema_type_name(schema_type), + schema->db ? schema->db : "(null)", + schema->name ? schema->name : "(null)"); + /* ndb_share reference temporary, free below */ NDB_SHARE *share= get_share(key, 0, FALSE, FALSE); if (share) @@ -3010,12 +2977,11 @@ handle_schema_log_post_epoch(THD *thd, switch (schema_type) { case SOT_DROP_DB: - log_query= 1; + write_schema_op_to_binlog(thd, schema); break; + case SOT_DROP_TABLE: - if (opt_ndb_extra_logging > 9) - sql_print_information("SOT_DROP_TABLE %s.%s", schema->db, schema->name); - log_query= 1; + write_schema_op_to_binlog(thd, schema); { ndb->setDatabaseName(schema->db); Ndb_table_guard ndbtab_g(dict, schema->name); @@ -3029,29 +2995,25 @@ handle_schema_log_post_epoch(THD *thd, close_cached_tables(thd, &table_list, FALSE, FALSE, FALSE); } break; + case SOT_RENAME_TABLE: - if (opt_ndb_extra_logging > 9) - sql_print_information("SOT_RENAME_TABLE %s.%s", schema->db, schema->name); - log_query= 1; + write_schema_op_to_binlog(thd, schema); if (share) { ndbcluster_rename_share(thd, share); } break; + case SOT_RENAME_TABLE_PREPARE: - if (opt_ndb_extra_logging > 9) - sql_print_information("SOT_RENAME_TABLE_PREPARE %s.%s -> %s", - schema->db, schema->name, schema->query); if (share && schema->node_id != g_ndb_cluster_connection->node_id()) ndbcluster_prepare_rename_share(share, schema->query); break; + case SOT_ALTER_TABLE_COMMIT: - if (opt_ndb_extra_logging > 9) - sql_print_information("SOT_ALTER_TABLE_COMMIT %s.%s", schema->db, schema->name); if (schema->node_id == g_ndb_cluster_connection->node_id()) break; - log_query= 1; + write_schema_op_to_binlog(thd, schema); { ndb->setDatabaseName(schema->db); Ndb_table_guard ndbtab_g(dict, schema->name); @@ -3112,8 +3074,6 @@ handle_schema_log_post_epoch(THD *thd, case SOT_ONLINE_ALTER_TABLE_PREPARE: { - if (opt_ndb_extra_logging > 9) - sql_print_information("SOT_ONLINE_ALTER_TABLE_PREPARE %s.%s", schema->db, schema->name); int error= 0; ndb->setDatabaseName(schema->db); { @@ -3141,7 +3101,7 @@ handle_schema_log_post_epoch(THD *thd, DBUG_PRINT("info", ("Detected frm change of table %s.%s", schema->db, schema->name)); - log_query= 1; + write_schema_op_to_binlog(thd, schema); build_table_filename(key, FN_LEN-1, schema->db, schema->name, NullS, 0); /* If the there is no local table shadowing the altered table and @@ -3220,10 +3180,9 @@ handle_schema_log_post_epoch(THD *thd, } break; } + case SOT_ONLINE_ALTER_TABLE_COMMIT: { - if (opt_ndb_extra_logging > 9) - sql_print_information("SOT_ONLINE_ALTER_TABLE_COMMIT %s.%s", schema->db, schema->name); if (share) { pthread_mutex_lock(&share->mutex); @@ -3245,10 +3204,9 @@ handle_schema_log_post_epoch(THD *thd, } break; } + case SOT_RENAME_TABLE_NEW: - if (opt_ndb_extra_logging > 9) - sql_print_information("SOT_RENAME_TABLE_NEW %s.%s", schema->db, schema->name); - log_query= 1; + write_schema_op_to_binlog(thd, schema); if (ndb_binlog_running && (!share || !share->op)) { /* @@ -3279,9 +3237,11 @@ handle_schema_log_post_epoch(THD *thd, } } break; + default: DBUG_ASSERT(FALSE); } + if (share) { /* ndb_share reference temporary free */ @@ -3291,40 +3251,55 @@ handle_schema_log_post_epoch(THD *thd, share= 0; } } - if (ndb_binlog_running && log_query) - ndb_binlog_query(thd, schema); + + DBUG_VOID_RETURN; } - DBUG_VOID_RETURN; -} + /* + process any operations that should be done after + the epoch is complete + */ + void + handle_schema_log_post_epoch(List *log_list) + { + DBUG_ENTER("handle_schema_log_post_epoch"); + + Ndb_schema_op* schema; + while ((schema= log_list->pop())) + { + handle_schema_op_post_epoch(schema); + } + DBUG_VOID_RETURN; + } -static void -handle_schema_unlock_post_epoch(THD *thd, - List *unlock_list) -{ - DBUG_ENTER("handle_schema_unlock_post_epoch"); - Cluster_schema *schema; - while ((schema= unlock_list->pop())) + void + handle_schema_unlock_post_epoch(List *unlock_list) { - ndbcluster_update_slock(thd, schema->db, schema->name, - schema->id, schema->version); + DBUG_ENTER("handle_schema_unlock_post_epoch"); + + Ndb_schema_op *schema; + while ((schema= unlock_list->pop())) + { + ack_schema_op(schema->db, schema->name, + schema->id, schema->version); + } + DBUG_VOID_RETURN; } - DBUG_VOID_RETURN; -} THD* m_thd; MEM_ROOT* m_mem_root; + uint m_own_nodeid; - List m_post_epoch_log_list; - List m_post_epoch_unlock_list; + List m_post_epoch_log_list; + List m_post_epoch_unlock_list; public: Ndb_schema_event_handler(); // Not implemented Ndb_schema_event_handler(const Ndb_schema_event_handler&); // Not implemented - Ndb_schema_event_handler(THD* thd, MEM_ROOT* mem_root): - m_thd(thd), m_mem_root(mem_root) + Ndb_schema_event_handler(THD* thd, MEM_ROOT* mem_root, uint own_nodeid): + m_thd(thd), m_mem_root(mem_root), m_own_nodeid(own_nodeid) { } @@ -3337,19 +3312,148 @@ public: void handle_event(Ndb* s_ndb, NdbEventOperation *pOp) { - handle_schema_event(m_thd, s_ndb, pOp, - &m_post_epoch_log_list, - &m_post_epoch_unlock_list, - m_mem_root); + DBUG_ENTER("handle_event"); + + const Ndb_event_data *event_data= + static_cast(pOp->getCustomData()); + + if (!check_is_ndb_schema_event(event_data)) + DBUG_VOID_RETURN; + + const NDBEVENT::TableEvent ev_type= pOp->getEventType(); + switch (ev_type) + { + case NDBEVENT::TE_INSERT: + case NDBEVENT::TE_UPDATE: + { + /* ndb_schema table, row INSERTed or UPDATEed*/ + Ndb_schema_op* schema_op= + Ndb_schema_op::create(event_data, pOp->getAnyValue()); + handle_schema_op(schema_op); + break; + } + + case NDBEVENT::TE_DELETE: + /* ndb_schema table, row DELETEd */ + break; + + case NDBEVENT::TE_CLUSTER_FAILURE: + if (opt_ndb_extra_logging) + sql_print_information("NDB Binlog: cluster failure for %s at epoch %u/%u.", + ndb_schema_share->key, + (uint)(pOp->getGCI() >> 32), + (uint)(pOp->getGCI())); + // fall through + case NDBEVENT::TE_DROP: + /* ndb_schema table DROPped */ + if (opt_ndb_extra_logging && + ndb_binlog_tables_inited && ndb_binlog_running) + sql_print_information("NDB Binlog: ndb tables initially " + "read only on reconnect."); + + /* release the ndb_schema_share */ + pthread_mutex_lock(&ndb_schema_share_mutex); + free_share(&ndb_schema_share); + ndb_schema_share= 0; + ndb_binlog_tables_inited= FALSE; + ndb_binlog_is_ready= FALSE; + pthread_mutex_unlock(&ndb_schema_share_mutex); + + close_cached_tables(NULL, NULL, FALSE, FALSE, FALSE); + // fall through + case NDBEVENT::TE_ALTER: + /* ndb_schema table ALTERed */ + ndb_handle_schema_change(m_thd, s_ndb, pOp, event_data); + break; + + case NDBEVENT::TE_NODE_FAILURE: + { + /* Remove all subscribers for node from bitmap in ndb_schema_share */ + NDB_SHARE *tmp_share= event_data->share; + uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; + DBUG_ASSERT(node_id != 0xFF); + pthread_mutex_lock(&tmp_share->mutex); + bitmap_clear_all(&tmp_share->subscriber_bitmap[node_id]); + DBUG_PRINT("info",("NODE_FAILURE UNSUBSCRIBE[%d]", node_id)); + if (opt_ndb_extra_logging) + { + sql_print_information("NDB Binlog: Node: %d, down," + " Subscriber bitmask %x%x", + pOp->getNdbdNodeId(), + tmp_share->subscriber_bitmap[node_id].bitmap[1], + tmp_share->subscriber_bitmap[node_id].bitmap[0]); + } + pthread_mutex_unlock(&tmp_share->mutex); + (void) pthread_cond_signal(&injector_cond); + break; + } + + case NDBEVENT::TE_SUBSCRIBE: + { + /* Add node as subscriber from bitmap in ndb_schema_share */ + NDB_SHARE *tmp_share= event_data->share; + uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; + uint8 req_id= pOp->getReqNodeId(); + DBUG_ASSERT(req_id != 0 && node_id != 0xFF); + pthread_mutex_lock(&tmp_share->mutex); + bitmap_set_bit(&tmp_share->subscriber_bitmap[node_id], req_id); + DBUG_PRINT("info",("SUBSCRIBE[%d] %d", node_id, req_id)); + if (opt_ndb_extra_logging) + { + sql_print_information("NDB Binlog: Node: %d, subscribe from node %d," + " Subscriber bitmask %x%x", + pOp->getNdbdNodeId(), + req_id, + tmp_share->subscriber_bitmap[node_id].bitmap[1], + tmp_share->subscriber_bitmap[node_id].bitmap[0]); + } + pthread_mutex_unlock(&tmp_share->mutex); + (void) pthread_cond_signal(&injector_cond); + break; + } + + case NDBEVENT::TE_UNSUBSCRIBE: + { + /* Remove node as subscriber from bitmap in ndb_schema_share */ + NDB_SHARE *tmp_share= event_data->share; + uint8 node_id= g_node_id_map[pOp->getNdbdNodeId()]; + uint8 req_id= pOp->getReqNodeId(); + DBUG_ASSERT(req_id != 0 && node_id != 0xFF); + pthread_mutex_lock(&tmp_share->mutex); + bitmap_clear_bit(&tmp_share->subscriber_bitmap[node_id], req_id); + DBUG_PRINT("info",("UNSUBSCRIBE[%d] %d", node_id, req_id)); + if (opt_ndb_extra_logging) + { + sql_print_information("NDB Binlog: Node: %d, unsubscribe from node %d," + " Subscriber bitmask %x%x", + pOp->getNdbdNodeId(), + req_id, + tmp_share->subscriber_bitmap[node_id].bitmap[1], + tmp_share->subscriber_bitmap[node_id].bitmap[0]); + } + pthread_mutex_unlock(&tmp_share->mutex); + (void) pthread_cond_signal(&injector_cond); + break; + } + + default: + { + NDB_SHARE *tmp_share= event_data->share; + sql_print_error("NDB Binlog: unknown non data event %d for %s. " + "Ignoring...", (unsigned) ev_type, tmp_share->key); + } + } + + DBUG_VOID_RETURN; } void post_epoch() { if (m_post_epoch_log_list.elements > 0) { - handle_schema_log_post_epoch(m_thd, &m_post_epoch_log_list); + handle_schema_log_post_epoch(&m_post_epoch_log_list); // NOTE post_epoch_unlock_list may not be handled! - handle_schema_unlock_post_epoch(m_thd, &m_post_epoch_unlock_list); + handle_schema_unlock_post_epoch(&m_post_epoch_unlock_list); } // There should be no work left todo... DBUG_ASSERT(m_post_epoch_log_list.elements == 0); @@ -4784,7 +4888,7 @@ err: *conflict_fn_spec= NULL; if (ndberror.code && opt_ndb_extra_logging) - print_warning_list("NDB", thd_warn_list(thd)); + thd_print_warning_list(thd, "NDB"); DBUG_RETURN(ndberror.code); } @@ -4990,32 +5094,6 @@ ndbcluster_check_if_local_table(const ch DBUG_RETURN(false); } -bool -ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname) -{ - DBUG_ENTER("ndbcluster_check_if_local_tables_in_db"); - DBUG_PRINT("info", ("Looking for files in directory %s", dbname)); - LEX_STRING *tabname; - List files; - char path[FN_REFLEN + 1]; - - build_table_filename(path, sizeof(path) - 1, dbname, "", "", 0); - if (find_files(thd, &files, dbname, path, NullS, 0) != FIND_FILES_OK) - { - thd->clear_error(); - DBUG_PRINT("info", ("Failed to find files")); - DBUG_RETURN(true); - } - DBUG_PRINT("info",("found: %d files", files.elements)); - while ((tabname= files.pop())) - { - DBUG_PRINT("info", ("Found table %s", tabname->str)); - if (ndbcluster_check_if_local_table(dbname, tabname->str)) - DBUG_RETURN(true); - } - - DBUG_RETURN(false); -} /* Common function for setting up everything for logging a table at @@ -5930,7 +6008,8 @@ handle_non_data_event(THD *thd, NdbEventOperation *pOp, ndb_binlog_index_row &row) { - Ndb_event_data *event_data= (Ndb_event_data *) pOp->getCustomData(); + const Ndb_event_data* event_data= + static_cast(pOp->getCustomData()); NDB_SHARE *share= event_data->share; NDBEVENT::TableEvent type= pOp->getEventType(); @@ -6939,7 +7018,9 @@ restart_cluster_failure: // The Ndb_schema_event_handler does not necessarily need // to use the same memroot(or vice versa) - Ndb_schema_event_handler schema_event_handler(thd, &mem_root); + Ndb_schema_event_handler + schema_event_handler(thd, &mem_root, + g_ndb_cluster_connection->node_id()); *root_ptr= &mem_root; === modified file 'sql/ha_ndbcluster_binlog.h' --- a/sql/ha_ndbcluster_binlog.h 2011-10-27 09:34:06 +0000 +++ b/sql/ha_ndbcluster_binlog.h 2011-10-29 14:21:55 +0000 @@ -31,35 +31,6 @@ typedef NdbDictionary::Event NDBEVENT; extern handlerton *ndbcluster_hton; -class Ndb_event_data -{ -public: - Ndb_event_data(NDB_SHARE *the_share) : - shadow_table(0), - share(the_share) - { - ndb_value[0]= 0; - ndb_value[1]= 0; - } - ~Ndb_event_data() - { - if (shadow_table) - closefrm(shadow_table, 1); - shadow_table= 0; - free_root(&mem_root, MYF(0)); - share= 0; - /* - ndbvalue[] allocated with my_multi_malloc - so only first pointer should be freed - */ - my_free(ndb_value[0], MYF(MY_WME|MY_ALLOW_ZERO_PTR)); - } - MEM_ROOT mem_root; - TABLE *shadow_table; - NDB_SHARE *share; - NdbValue *ndb_value[2]; -}; - /* The numbers below must not change as they are passed between mysql servers, and if changed @@ -218,6 +189,3 @@ int cmp_frm(const NDBTAB *ndbtab, const */ bool ndbcluster_check_if_local_table(const char *dbname, const char *tabname); -bool -ndbcluster_check_if_local_tables_in_db(THD *thd, const char *dbname); - === modified file 'sql/ha_ndbcluster_glue.h' --- a/sql/ha_ndbcluster_glue.h 2011-10-05 07:16:59 +0000 +++ b/sql/ha_ndbcluster_glue.h 2011-10-29 09:18:21 +0000 @@ -129,40 +129,6 @@ Diagnostics_area* thd_stmt_da(THD* thd) #endif } -/* extract the list of warnings from THD */ -static inline -List& thd_warn_list(const THD * thd) -{ -#if MYSQL_VERSION_ID < 50500 - return const_cast(thd)->warn_list; -#else - /* "options" has moved to "variables.option_bits" */ - return thd->warning_info->warn_list(); -#endif -} - -static inline -const char* MYSQL_ERROR_get_message_text(const MYSQL_ERROR* err) -{ -#if MYSQL_VERSION_ID < 50500 - return err->msg; -#else - /* "msg" is gone, use accessor */ - return err->get_message_text(); -#endif -} - -static inline -uint MYSQL_ERROR_get_sql_errno(const MYSQL_ERROR* err) -{ -#if MYSQL_VERSION_ID < 50500 - return err->code; -#else - /* "code" is gone, use accessor */ - return err->get_sql_errno(); -#endif -} - #if MYSQL_VERSION_ID < 50500 /* === added file 'sql/ndb_event_data.cc' --- a/sql/ndb_event_data.cc 1970-01-01 00:00:00 +0000 +++ b/sql/ndb_event_data.cc 2011-10-29 09:02:21 +0000 @@ -0,0 +1,44 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include "ndb_event_data.h" + +#include + + +Ndb_event_data::Ndb_event_data(NDB_SHARE *the_share) : + shadow_table(NULL), + share(the_share) +{ + ndb_value[0]= NULL; + ndb_value[1]= NULL; +} + + +Ndb_event_data::~Ndb_event_data() +{ + if (shadow_table) + closefrm(shadow_table, 1); + shadow_table= NULL; + free_root(&mem_root, MYF(0)); + share= NULL; + /* + ndbvalue[] allocated with my_multi_malloc -> only + first pointer need to be freed + */ + my_free(ndb_value[0]); +} === added file 'sql/ndb_event_data.h' --- a/sql/ndb_event_data.h 1970-01-01 00:00:00 +0000 +++ b/sql/ndb_event_data.h 2011-10-29 09:02:21 +0000 @@ -0,0 +1,39 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#ifndef NDB_EVENT_DATA_H +#define NDB_EVENT_DATA_H + +#include // my_alloc.h +#include // MEM_ROOT + +class Ndb_event_data +{ +public: + Ndb_event_data(); // Not implemented + Ndb_event_data(const Ndb_event_data&); // Not implemented + Ndb_event_data(struct NDB_SHARE *the_share); + + ~Ndb_event_data(); + + MEM_ROOT mem_root; + struct TABLE *shadow_table; + struct NDB_SHARE *share; + union NdbValue *ndb_value[2]; +}; + +#endif === modified file 'sql/ndb_thd.cc' --- a/sql/ndb_thd.cc 2011-03-08 22:08:44 +0000 +++ b/sql/ndb_thd.cc 2011-10-29 09:18:21 +0000 @@ -43,3 +43,23 @@ Ndb* check_ndb_in_thd(THD* thd, bool val return thd_ndb->ndb; } + +#ifndef MYSQL_SERVER +#define MYSQL_SERVER +#endif + +#include + +void +thd_print_warning_list(THD* thd, const char* prefix) +{ + List_iterator_fast it(thd->warning_info->warn_list()); + MYSQL_ERROR *err; + while ((err= it++)) + { + sql_print_warning("%s: (%d)%s", + prefix, + err->get_sql_errno(), + err->get_message_text()); + } +} === modified file 'sql/ndb_thd.h' --- a/sql/ndb_thd.h 2011-03-08 12:59:56 +0000 +++ b/sql/ndb_thd.h 2011-10-29 09:18:21 +0000 @@ -51,4 +51,9 @@ thd_set_thd_ndb(THD *thd, class Thd_ndb /* Make sure THD has a Thd_ndb struct assigned */ class Ndb* check_ndb_in_thd(THD* thd, bool validate_ndb= false); + +/* Print thd's list of warnings to error log */ +void +thd_print_warning_list(THD* thd, const char* prefix); + #endif === modified file 'storage/ndb/CMakeLists.txt' --- a/storage/ndb/CMakeLists.txt 2011-10-27 09:34:06 +0000 +++ b/storage/ndb/CMakeLists.txt 2011-10-29 09:02:21 +0000 @@ -83,6 +83,7 @@ SET(NDBCLUSTER_SOURCES ../../sql/ndb_anyvalue.cc ../../sql/ndb_ndbapi_util.cc ../../sql/ndb_binlog_extra_row_info.cc + ../../sql/ndb_event_data.cc ) # Include directories used when building ha_ndbcluster No bundle (reason: useless for push emails).