From: magnus.blaudd Date: November 21 2011 8:19am Subject: bzr push into mysql-5.5-cluster branch (magnus.blaudd:3654 to 3657) List-Archive: http://lists.mysql.com/commits/142048 Message-Id: <201111210819.pAL8JLCP008140@acsmt358.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3657 magnus.blaudd@stripped 2011-11-21 [merge] Merge 7.1 -> 5.5-cluster 3656 magnus.blaudd@stripped 2011-11-21 [merge] Merge schema dist to 7.2 modified: sql/ha_ndbcluster.cc sql/ha_ndbcluster_binlog.cc sql/ndb_event_data.cc sql/ndb_event_data.h sql/ndb_schema_dist.cc sql/ndb_schema_dist.h sql/ndb_share.cc sql/ndb_share.h 3655 magnus.blaudd@stripped 2011-11-21 [merge] Merge modified: mysql-test/suite/rpl/r/rpl_row_basic_allow_batching.result mysql-test/suite/rpl/t/rpl_row_basic_allow_batching.test sql/log_event.cc sql/mysqld.h sql/sys_vars.cc 3654 jonas oreland 2011-11-19 [merge] ndb - merge 71 to 72 modified: mysql-test/suite/ndb/r/ndb_index_stat.result mysql-test/suite/ndb/t/ndb_index_stat.test storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp storage/ndb/src/kernel/blocks/trix/Trix.cpp === modified file 'mysql-test/suite/rpl/r/rpl_row_basic_allow_batching.result' --- a/mysql-test/suite/rpl/r/rpl_row_basic_allow_batching.result 2011-11-15 14:25:58 +0000 +++ b/mysql-test/suite/rpl/r/rpl_row_basic_allow_batching.result 2011-11-18 09:46:57 +0000 @@ -3,22 +3,10 @@ include/master-slave.inc show variables like 'slave_allow_batching'; Variable_name Value slave_allow_batching OFF -Show that slave_allow_batching cannot be changed while slave is running -set global slave_allow_batching=ON; -ERROR HY000: This operation cannot be performed with a running slave; run STOP SLAVE first -show warnings; -Level Code Message -Error 1198 This operation cannot be performed with a running slave; run STOP SLAVE first -show variables like 'slave_allow_batching'; -Variable_name Value -slave_allow_batching OFF -Now stop slave and change it -stop slave; set global slave_allow_batching=ON; show variables like 'slave_allow_batching'; Variable_name Value slave_allow_batching ON -start slave; Now the normal test CREATE TABLE t1 (C1 CHAR(1), C2 CHAR(1), INDEX (C1)) ENGINE = 'INNODB' ; SELECT * FROM t1; === modified file 'mysql-test/suite/rpl/t/rpl_row_basic_allow_batching.test' --- a/mysql-test/suite/rpl/t/rpl_row_basic_allow_batching.test 2011-11-15 14:25:58 +0000 +++ b/mysql-test/suite/rpl/t/rpl_row_basic_allow_batching.test 2011-11-18 09:46:57 +0000 @@ -4,18 +4,9 @@ --connection slave show variables like 'slave_allow_batching'; ---echo Show that slave_allow_batching cannot be changed while slave is running ---error ER_SLAVE_MUST_STOP set global slave_allow_batching=ON; -show warnings; show variables like 'slave_allow_batching'; ---echo Now stop slave and change it -stop slave; -set global slave_allow_batching=ON; -show variables like 'slave_allow_batching'; -start slave; - --echo Now the normal test --connection master === modified file 'sql/ha_ndbcluster.cc' --- a/sql/ha_ndbcluster.cc 2011-11-16 08:17:17 +0000 +++ b/sql/ha_ndbcluster.cc 2011-11-21 07:53:04 +0000 @@ -11007,6 +11007,7 @@ static void delete_table_drop_share(NDB_SHARE* share, const char * path) { + DBUG_ENTER("delete_table_drop_share"); if (share) { pthread_mutex_lock(&ndbcluster_mutex); @@ -11038,6 +11039,7 @@ do_drop: } pthread_mutex_unlock(&ndbcluster_mutex); } + DBUG_VOID_RETURN; } /* static version which does not need a handler */ @@ -11048,7 +11050,7 @@ ha_ndbcluster::drop_table_impl(THD *thd, const char *db, const char *table_name) { - DBUG_ENTER("ha_ndbcluster::ndbcluster_delete_table"); + DBUG_ENTER("ha_ndbcluster::drop_table_impl"); NDBDICT *dict= ndb->getDictionary(); int ndb_table_id= 0; int ndb_table_version= 0; @@ -11185,8 +11187,7 @@ int ha_ndbcluster::delete_table(const ch { THD *thd= current_thd; Thd_ndb *thd_ndb= get_thd_ndb(thd); - Ndb *ndb; - int error= 0; + DBUG_ENTER("ha_ndbcluster::delete_table"); DBUG_PRINT("enter", ("name: %s", name)); @@ -11198,6 +11199,7 @@ int ha_ndbcluster::delete_table(const ch dropped inside ndb. Just drop local files. */ + DBUG_PRINT("info", ("Table is already dropped in NDB")); delete_table_drop_share(0, name); DBUG_RETURN(handler::delete_table(name)); } @@ -11213,16 +11215,12 @@ int ha_ndbcluster::delete_table(const ch if (check_ndb_connection(thd)) { - error= HA_ERR_NO_CONNECTION; - goto err; + DBUG_RETURN(HA_ERR_NO_CONNECTION); } - ndb= thd_ndb->ndb; - if (!thd_ndb->has_required_global_schema_lock("ha_ndbcluster::delete_table")) { - error= HA_ERR_NO_CONNECTION; - goto err; + DBUG_RETURN(HA_ERR_NO_CONNECTION); } /* @@ -11230,6 +11228,8 @@ int ha_ndbcluster::delete_table(const ch If it was already gone it might have been dropped remotely, give a warning and then drop .ndb file. */ + int error; + Ndb* ndb= thd_ndb->ndb; if (!(error= drop_table_impl(thd, this, ndb, name, m_dbname, m_tabname)) || error == HA_ERR_NO_SUCH_TABLE) @@ -11240,7 +11240,6 @@ int ha_ndbcluster::delete_table(const ch error= error1; } -err: DBUG_RETURN(error); } @@ -13380,38 +13379,15 @@ static uchar *ndbcluster_get_key(NDB_SHA #ifndef DBUG_OFF -static void print_share(const char* where, NDB_SHARE* share) -{ - fprintf(DBUG_FILE, - "%s %s.%s: use_count: %u, commit_count: %lu\n", - where, share->db, share->table_name, share->use_count, - (ulong) share->commit_count); - fprintf(DBUG_FILE, - " - key: %s, key_length: %d\n", - share->key, share->key_length); - - Ndb_event_data *event_data= 0; - if (share->event_data) - event_data= share->event_data; - else if (share->op) - event_data= (Ndb_event_data *) share->op->getCustomData(); - if (event_data) - { - fprintf(DBUG_FILE, - " - event_data->shadow_table: %p %s.%s\n", - event_data->shadow_table, event_data->shadow_table->s->db.str, - event_data->shadow_table->s->table_name.str); - } -} - - static void print_ndbcluster_open_tables() { DBUG_LOCK_FILE; fprintf(DBUG_FILE, ">ndbcluster_open_tables\n"); for (uint i= 0; i < ndbcluster_open_tables.records; i++) - print_share("", - (NDB_SHARE*)my_hash_element(&ndbcluster_open_tables, i)); + { + NDB_SHARE* share= (NDB_SHARE*)my_hash_element(&ndbcluster_open_tables, i); + share->print("", DBUG_FILE); + } fprintf(DBUG_FILE, "print((t), DBUG_FILE);); \ DBUG_UNLOCK_FILE; @@ -13573,6 +13549,7 @@ int ndbcluster_undo_rename_share(THD *th return 0; } + int ndbcluster_rename_share(THD *thd, NDB_SHARE *share) { NDB_SHARE *tmp; @@ -13621,11 +13598,7 @@ int ndbcluster_rename_share(THD *thd, ND ha_ndbcluster::set_tabname(share->new_key, share->table_name); dbug_print_share("ndbcluster_rename_share:", share); - Ndb_event_data *event_data= 0; - if (share->event_data) - event_data= share->event_data; - else if (share->op) - event_data= (Ndb_event_data *) share->op->getCustomData(); + Ndb_event_data *event_data= share->get_event_data_ptr(); if (event_data && event_data->shadow_table) { if (!IS_TMP_PREFIX(share->table_name)) @@ -13674,6 +13647,75 @@ NDB_SHARE *ndbcluster_get_share(NDB_SHAR } + +NDB_SHARE* +NDB_SHARE::create(const char* key, size_t key_length, + TABLE* table, const char* db_name, const char* table_name) +{ + NDB_SHARE* share; + if (!(share= (NDB_SHARE*) my_malloc(sizeof(*share), + MYF(MY_WME | MY_ZEROFILL)))) + return NULL; + + MEM_ROOT **root_ptr= + my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC); + MEM_ROOT *old_root= *root_ptr; + + init_sql_alloc(&share->mem_root, 1024, 0); + *root_ptr= &share->mem_root; // remember to reset before return + share->flags= 0; + share->state= NSS_INITIAL; + /* Allocate enough space for key, db, and table_name */ + share->key= (char*) alloc_root(*root_ptr, 2 * (key_length + 1)); + share->key_length= key_length; + strmov(share->key, key); + share->db= share->key + key_length + 1; + strmov(share->db, db_name); + share->table_name= share->db + strlen(share->db) + 1; + strmov(share->table_name, table_name); + thr_lock_init(&share->lock); + pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST); + share->commit_count= 0; + share->commit_count_lock= 0; + +#ifdef HAVE_NDB_BINLOG + share->m_cfn_share= NULL; +#endif + + share->op= 0; + share->new_op= 0; + share->event_data= 0; + + { + // Create array of bitmap for keeping track of subscribed nodes + // NOTE! Only the NDB_SHARE for ndb_schema really needs this + int no_nodes= g_ndb_cluster_connection->no_db_nodes(); + share->subscriber_bitmap= (MY_BITMAP*) + alloc_root(&share->mem_root, no_nodes * sizeof(MY_BITMAP)); + for (int i= 0; i < no_nodes; i++) + { + bitmap_init(&share->subscriber_bitmap[i], + (Uint32*)alloc_root(&share->mem_root, max_ndb_nodes/8), + max_ndb_nodes, FALSE); + bitmap_clear_all(&share->subscriber_bitmap[i]); + } + } + + if (ndbcluster_binlog_init_share(current_thd, share, table)) + { + DBUG_PRINT("error", ("get_share: %s could not init share", key)); + free_root(&share->mem_root, MYF(0)); + my_free(share, 0); + *root_ptr= old_root; + return NULL; + } + + *root_ptr= old_root; + + return share; +} + + static inline NDB_SHARE *ndbcluster_get_share(const char *key, TABLE *table, bool create_if_not_exists) @@ -13694,50 +13736,30 @@ NDB_SHARE *ndbcluster_get_share(const ch DBUG_PRINT("error", ("get_share: %s does not exist", key)); DBUG_RETURN(0); } - if ((share= (NDB_SHARE*) my_malloc(sizeof(*share), - MYF(MY_WME | MY_ZEROFILL)))) - { - MEM_ROOT **root_ptr= - my_pthread_getspecific_ptr(MEM_ROOT**, THR_MALLOC); - MEM_ROOT *old_root= *root_ptr; - init_sql_alloc(&share->mem_root, 1024, 0); - *root_ptr= &share->mem_root; // remember to reset before return - share->flags= 0; - share->state= NSS_INITIAL; - /* enough space for key, db, and table_name */ - share->key= (char*) alloc_root(*root_ptr, 2 * (length + 1)); - share->key_length= length; - strmov(share->key, key); - if (my_hash_insert(&ndbcluster_open_tables, (uchar*) share)) - { - free_root(&share->mem_root, MYF(0)); - my_free((uchar*) share, 0); - *root_ptr= old_root; - DBUG_RETURN(0); - } - thr_lock_init(&share->lock); - pthread_mutex_init(&share->mutex, MY_MUTEX_INIT_FAST); - share->commit_count= 0; - share->commit_count_lock= 0; - share->db= share->key + length + 1; - ha_ndbcluster::set_dbname(key, share->db); - share->table_name= share->db + strlen(share->db) + 1; - ha_ndbcluster::set_tabname(key, share->table_name); - if (ndbcluster_binlog_init_share(current_thd, share, table)) - { - DBUG_PRINT("error", ("get_share: %s could not init share", key)); - ndbcluster_real_free_share(&share); - *root_ptr= old_root; - DBUG_RETURN(0); - } - *root_ptr= old_root; - } - else + + /* + Extract db and table name from key (to avoid that NDB_SHARE + dependens on ha_ndbcluster) + */ + char db_name_buf[FN_HEADLEN]; + char table_name_buf[FN_HEADLEN]; + ha_ndbcluster::set_dbname(key, db_name_buf); + ha_ndbcluster::set_tabname(key, table_name_buf); + + if (!(share= NDB_SHARE::create(key, length, table, + db_name_buf, table_name_buf))) { DBUG_PRINT("error", ("get_share: failed to alloc share")); my_error(ER_OUTOFMEMORY, MYF(0), static_cast(sizeof(*share))); DBUG_RETURN(0); } + + // Insert the new share in list of open shares + if (my_hash_insert(&ndbcluster_open_tables, (uchar*) share)) + { + NDB_SHARE::destroy(share); + DBUG_RETURN(0); + } } share->use_count++; if (opt_ndb_extra_logging > 9) === modified file 'sql/ha_ndbcluster_binlog.cc' --- a/sql/ha_ndbcluster_binlog.cc 2011-11-10 20:35:28 +0000 +++ b/sql/ha_ndbcluster_binlog.cc 2011-11-21 08:15:07 +0000 @@ -342,6 +342,13 @@ ndb_binlog_open_shadow_table(THD *thd, N /* We can't use 'use_all_columns()' as the file object is not setup yet */ shadow_table->column_bitmaps_set_no_signal(&shadow_table->s->all_set, &shadow_table->s->all_set); + + if (shadow_table->s->primary_key == MAX_KEY) + share->flags|= NSF_HIDDEN_PK; + + if (shadow_table->s->blob_fields != 0) + share->flags|= NSF_BLOB_FLAG; + #ifndef DBUG_OFF dbug_print_table("table", shadow_table); #endif @@ -355,48 +362,9 @@ ndb_binlog_open_shadow_table(THD *thd, N */ int ndbcluster_binlog_init_share(THD *thd, NDB_SHARE *share, TABLE *_table) { - MEM_ROOT *mem_root= &share->mem_root; - int do_event_op= ndb_binlog_running; - int error= 0; DBUG_ENTER("ndbcluster_binlog_init_share"); -#ifdef HAVE_NDB_BINLOG - share->m_cfn_share= NULL; -#endif - - share->op= 0; - share->new_op= 0; - share->event_data= 0; - - if (!ndb_schema_share && - strcmp(share->db, NDB_REP_DB) == 0 && - strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0) - do_event_op= 1; - else if (!ndb_apply_status_share && - strcmp(share->db, NDB_REP_DB) == 0 && - strcmp(share->table_name, NDB_APPLY_TABLE) == 0) - do_event_op= 1; - - if (Ndb_dist_priv_util::is_distributed_priv_table(share->db, - share->table_name)) - { - do_event_op= 0; - } - - { - int i, no_nodes= g_ndb_cluster_connection->no_db_nodes(); - share->subscriber_bitmap= (MY_BITMAP*) - alloc_root(mem_root, no_nodes * sizeof(MY_BITMAP)); - for (i= 0; i < no_nodes; i++) - { - bitmap_init(&share->subscriber_bitmap[i], - (Uint32*)alloc_root(mem_root, max_ndb_nodes/8), - max_ndb_nodes, FALSE); - bitmap_clear_all(&share->subscriber_bitmap[i]); - } - } - - if (!do_event_op) + if (!share->need_events(ndb_binlog_running)) { if (_table) { @@ -409,19 +377,10 @@ int ndbcluster_binlog_init_share(THD *th { share->flags|= NSF_NO_BINLOG; } - DBUG_RETURN(error); - } - while (1) - { - if ((error= ndb_binlog_open_shadow_table(thd, share))) - break; - if (share->event_data->shadow_table->s->primary_key == MAX_KEY) - share->flags|= NSF_HIDDEN_PK; - if (share->event_data->shadow_table->s->blob_fields != 0) - share->flags|= NSF_BLOB_FLAG; - break; + DBUG_RETURN(0); } - DBUG_RETURN(error); + + DBUG_RETURN(ndb_binlog_open_shadow_table(thd, share)); } /***************************************************************** @@ -2011,19 +1970,21 @@ ndb_handle_schema_change(THD *thd, Ndb * const Ndb_event_data *event_data) { DBUG_ENTER("ndb_handle_schema_change"); - NDB_SHARE *share= event_data->share; - TABLE *shadow_table= event_data->shadow_table; - const char *tabname= shadow_table->s->table_name.str; - const char *dbname= shadow_table->s->db.str; - bool do_close_cached_tables= FALSE; - bool is_remote_change= !ndb_has_node_id(pOp->getReqNodeId()); if (pOp->getEventType() == NDBEVENT::TE_ALTER) { + DBUG_PRINT("exit", ("Event type is TE_ALTER")); DBUG_RETURN(0); } + + DBUG_ASSERT(event_data); DBUG_ASSERT(pOp->getEventType() == NDBEVENT::TE_DROP || pOp->getEventType() == NDBEVENT::TE_CLUSTER_FAILURE); + + NDB_SHARE *share= event_data->share; + TABLE *shadow_table= event_data->shadow_table; + const char *tabname= shadow_table->s->table_name.str; + const char *dbname= shadow_table->s->db.str; { Thd_ndb *thd_ndb= get_thd_ndb(thd); Ndb *ndb= thd_ndb->ndb; @@ -2049,10 +2010,10 @@ ndb_handle_schema_change(THD *thd, Ndb * { share->op= 0; } - // either just us or drop table handling as well - - /* Signal ha_ndbcluster::delete/rename_table that drop is done */ pthread_mutex_unlock(&share->mutex); + + /* Signal ha_ndbcluster::delete/rename_table that drop is done */ + DBUG_PRINT("info", ("signal that drop is done")); (void) pthread_cond_signal(&injector_cond); pthread_mutex_lock(&ndbcluster_mutex); @@ -2060,6 +2021,9 @@ ndb_handle_schema_change(THD *thd, Ndb * DBUG_PRINT("NDB_SHARE", ("%s binlog free use_count: %u", share->key, share->use_count)); free_share(&share, TRUE); + + bool do_close_cached_tables= FALSE; + bool is_remote_change= !ndb_has_node_id(pOp->getReqNodeId()); if (is_remote_change && share && share->state != NSS_DROPPED) { DBUG_PRINT("info", ("remote change")); @@ -2083,15 +2047,13 @@ ndb_handle_schema_change(THD *thd, Ndb * share= 0; pthread_mutex_unlock(&ndbcluster_mutex); - if (event_data) - { - delete event_data; - pOp->setCustomData(NULL); - } + DBUG_PRINT("info", ("Deleting event_data")); + delete event_data; + pOp->setCustomData(NULL); + DBUG_PRINT("info", ("Dropping event operation")); pthread_mutex_lock(&injector_mutex); is_ndb->dropEventOperation(pOp); - pOp= 0; pthread_mutex_unlock(&injector_mutex); if (do_close_cached_tables) @@ -2559,18 +2521,21 @@ class Ndb_schema_event_handler { void ndbapi_invalidate_table(const char* db_name, const char* table_name) const { + DBUG_ENTER("ndbapi_invalidate_table"); Thd_ndb *thd_ndb= get_thd_ndb(m_thd); Ndb *ndb= thd_ndb->ndb; ndb->setDatabaseName(db_name); Ndb_table_guard ndbtab_g(ndb->getDictionary(), table_name); ndbtab_g.invalidate(); + DBUG_VOID_RETURN; } void mysqld_close_cached_table(const char* db_name, const char* table_name) const { + DBUG_ENTER("mysqld_close_cached_table"); // Just mark table as "need reopen" const bool wait_for_refresh = false; // Not waiting -> no timeout needed @@ -2583,11 +2548,57 @@ class Ndb_schema_event_handler { close_cached_tables(m_thd, &table_list, wait_for_refresh, timeout); + DBUG_VOID_RETURN; + } + + + void + mysqld_write_frm_from_ndb(const char* db_name, + const char* table_name) const + { + DBUG_ENTER("mysqld_write_frm_from_ndb"); + Thd_ndb *thd_ndb= get_thd_ndb(m_thd); + Ndb *ndb= thd_ndb->ndb; + Ndb_table_guard ndbtab_g(ndb->getDictionary(), table_name); + const NDBTAB *ndbtab= ndbtab_g.get_table(); + + char key[FN_REFLEN]; + build_table_filename(key, sizeof(key)-1, + db_name, table_name, NullS, 0); + + uchar *data= 0, *pack_data= 0; + size_t length, pack_length; + + if (readfrm(key, &data, &length) == 0 && + packfrm(data, length, &pack_data, &pack_length) == 0 && + cmp_frm(ndbtab, pack_data, pack_length)) + { + DBUG_PRINT("info", ("Detected frm change of table %s.%s", + db_name, table_name)); + + DBUG_DUMP("frm", (uchar*) ndbtab->getFrmData(), + ndbtab->getFrmLength()); + my_free(data); + data= NULL; + + int error; + if ((error= unpackfrm(&data, &length, + (const uchar*) ndbtab->getFrmData())) || + (error= writefrm(key, data, length))) + { + sql_print_error("NDB: Failed write frm for %s.%s, error %d", + db_name, table_name, error); + } + } + my_free(data); + my_free(pack_data); + DBUG_VOID_RETURN; } NDB_SHARE* get_share(Ndb_schema_op* schema) const { + DBUG_ENTER("get_share(Ndb_schema_op*)"); char key[FN_REFLEN + 1]; build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0); @@ -2597,7 +2608,7 @@ class Ndb_schema_event_handler { DBUG_PRINT("NDB_SHARE", ("%s temporary use_count: %u", share->key, share->use_count)); } - return share; + DBUG_RETURN(share); } @@ -2630,18 +2641,17 @@ class Ndb_schema_event_handler { } + bool is_local_table(const char* db_name, const char* table_name) const + { + return ndbcluster_check_if_local_table(db_name, table_name); + } + + void handle_clear_slock(Ndb_schema_op* schema) { - if (!is_post_epoch()) - { - /* - handle slock after epoch is completed to ensure that - schema events get inserted in the binlog after any data - events - */ - log_after_epoch(schema); - return; - } + DBUG_ENTER("handle_clear_slock"); + + assert(is_post_epoch()); char key[FN_REFLEN + 1]; build_table_filename(key, sizeof(key) - 1, schema->db, schema->name, "", 0); @@ -2654,7 +2664,7 @@ class Ndb_schema_event_handler { if (opt_ndb_extra_logging > 19) sql_print_information("NDB: Discarding event...no obj: %s (%u/%u)", key, schema->id, schema->version); - return; + DBUG_VOID_RETURN; } if (ndb_schema_object->table_id != schema->id || @@ -2670,7 +2680,7 @@ class Ndb_schema_event_handler { schema->id, schema->version); ndb_free_schema_object(&ndb_schema_object); - return; + DBUG_VOID_RETURN; } /* @@ -2699,17 +2709,20 @@ class Ndb_schema_event_handler { /* Wake up the waiter */ pthread_cond_signal(&injector_cond); - return; + + DBUG_VOID_RETURN; } void handle_offline_alter_table_commit(Ndb_schema_op* schema) { + DBUG_ENTER("handle_offline_alter_table_commit"); + assert(is_post_epoch()); // Always after epoch if (schema->node_id == own_nodeid()) - return; + DBUG_VOID_RETURN; write_schema_op_to_binlog(m_thd, schema); ndbapi_invalidate_table(schema->db, schema->name); @@ -2749,7 +2762,7 @@ class Ndb_schema_event_handler { free_share(&share); } - if (ndbcluster_check_if_local_table(schema->db, schema->name) && + if (is_local_table(schema->db, schema->name) && !Ndb_dist_priv_util::is_distributed_priv_table(schema->db, schema->name)) { @@ -2757,13 +2770,14 @@ class Ndb_schema_event_handler { "from binlog schema event '%s' from node %d.", schema->db, schema->name, schema->query, schema->node_id); - return; + DBUG_VOID_RETURN; } if (ndb_create_table_from_engine(m_thd, schema->db, schema->name)) { print_could_not_discover_error(m_thd, schema); } + DBUG_VOID_RETURN; } @@ -2775,96 +2789,63 @@ class Ndb_schema_event_handler { ndbapi_invalidate_table(schema->db, schema->name); mysqld_close_cached_table(schema->db, schema->name); - int error= 0; - Thd_ndb *thd_ndb= get_thd_ndb(m_thd); - Ndb *ndb= thd_ndb->ndb; - Ndb_table_guard ndbtab_g(ndb->getDictionary(), schema->name); - const NDBTAB *ndbtab= ndbtab_g.get_table(); if (schema->node_id != own_nodeid()) { - char key[FN_REFLEN]; - uchar *data= 0, *pack_data= 0; - size_t length, pack_length; - - DBUG_PRINT("info", ("Detected frm change of table %s.%s", - schema->db, schema->name)); write_schema_op_to_binlog(m_thd, schema); - build_table_filename(key, FN_LEN-1, schema->db, schema->name, NullS, 0); - /* - If the there is no local table shadowing the altered table and - it has an frm that is different than the one on disk then - overwrite it with the new table definition - */ - if (!ndbcluster_check_if_local_table(schema->db, schema->name) && - readfrm(key, &data, &length) == 0 && - packfrm(data, length, &pack_data, &pack_length) == 0 && - cmp_frm(ndbtab, pack_data, pack_length)) - { - DBUG_DUMP("frm", (uchar*) ndbtab->getFrmData(), - ndbtab->getFrmLength()); - my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR)); - data= NULL; - if ((error= unpackfrm(&data, &length, - (const uchar*) ndbtab->getFrmData())) || - (error= writefrm(key, data, length))) - { - sql_print_error("NDB: Failed write frm for %s.%s, error %d", - schema->db, schema->name, error); - } + if (!is_local_table(schema->db, schema->name)) + { + mysqld_write_frm_from_ndb(schema->db, schema->name); } - my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR)); - my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR)); } NDB_SHARE *share= get_share(schema); if (share) { if (opt_ndb_extra_logging > 9) - sql_print_information("NDB Binlog: handeling online alter/rename"); + sql_print_information("NDB Binlog: handling online alter/rename"); pthread_mutex_lock(&share->mutex); ndb_binlog_close_shadow_table(share); - if ((error= ndb_binlog_open_shadow_table(m_thd, share))) + if (ndb_binlog_open_shadow_table(m_thd, share)) + { sql_print_error("NDB Binlog: Failed to re-open shadow table %s.%s", schema->db, schema->name); - if (error) pthread_mutex_unlock(&share->mutex); - } - if (!error && share) - { - if (share->event_data->shadow_table->s->primary_key == MAX_KEY) - share->flags|= NSF_HIDDEN_PK; - /* - Refresh share->flags to handle added BLOB columns - */ - if (share->event_data->shadow_table->s->blob_fields != 0) - share->flags|= NSF_BLOB_FLAG; - - /* - Start subscribing to data changes to the new table definition - */ - String event_name(INJECTOR_EVENT_LEN); - ndb_rep_event_name(&event_name, schema->db, schema->name, - get_binlog_full(share)); - NdbEventOperation *tmp_op= share->op; - share->new_op= 0; - share->op= 0; - - if (ndbcluster_create_event_ops(m_thd, share, ndbtab, event_name.c_ptr())) - { - sql_print_error("NDB Binlog:" - "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s", - event_name.c_ptr()); } else { - share->new_op= share->op; - } - share->op= tmp_op; - pthread_mutex_unlock(&share->mutex); + /* + Start subscribing to data changes to the new table definition + */ + String event_name(INJECTOR_EVENT_LEN); + ndb_rep_event_name(&event_name, schema->db, schema->name, + get_binlog_full(share)); + NdbEventOperation *tmp_op= share->op; + share->new_op= 0; + share->op= 0; - if (opt_ndb_extra_logging > 9) - sql_print_information("NDB Binlog: handeling online alter/rename done"); + Thd_ndb *thd_ndb= get_thd_ndb(m_thd); + Ndb *ndb= thd_ndb->ndb; + Ndb_table_guard ndbtab_g(ndb->getDictionary(), schema->name); + const NDBTAB *ndbtab= ndbtab_g.get_table(); + if (ndbcluster_create_event_ops(m_thd, share, ndbtab, + event_name.c_ptr())) + { + sql_print_error("NDB Binlog:" + "FAILED CREATE (DISCOVER) EVENT OPERATIONS Event: %s", + event_name.c_ptr()); + } + else + { + share->new_op= share->op; + } + share->op= tmp_op; + pthread_mutex_unlock(&share->mutex); + + if (opt_ndb_extra_logging > 9) + sql_print_information("NDB Binlog: handling online " + "alter/rename done"); + } } if (share) { @@ -2904,6 +2885,314 @@ class Ndb_schema_event_handler { } + void + handle_drop_table(Ndb_schema_op* schema) + { + DBUG_ENTER("handle_drop_table"); + + assert(is_post_epoch()); // Always after epoch + + if (schema->node_id == own_nodeid()) + DBUG_VOID_RETURN; + + write_schema_op_to_binlog(m_thd, schema); + + if (is_local_table(schema->db, schema->name)) + { + /* Tables exists as a local table, print error and leave it */ + sql_print_error("NDB Binlog: Skipping dropping locally " + "defined table '%s.%s' from binlog schema " + "event '%s' from node %d. ", + schema->db, schema->name, schema->query, + schema->node_id); + DBUG_VOID_RETURN; + } + + Thd_ndb *thd_ndb= get_thd_ndb(m_thd); + Thd_ndb_options_guard thd_ndb_options(thd_ndb); + thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP); + const int no_print_error[2]= + {ER_BAD_TABLE_ERROR, 0}; /* ignore missing table */ + run_query(m_thd, schema->query, + schema->query + schema->query_length, + no_print_error); + + NDB_SHARE *share= get_share(schema); + // invalidation already handled by binlog thread + if (!share || !share->op) + { + ndbapi_invalidate_table(schema->db, schema->name); + mysqld_close_cached_table(schema->db, schema->name); + } + if (share) + free_share(&share); + + ndbapi_invalidate_table(schema->db, schema->name); + mysqld_close_cached_table(schema->db, schema->name); + + DBUG_VOID_RETURN; + } + + + void + handle_rename_table_prepare(Ndb_schema_op* schema) + { + DBUG_ENTER("handle_rename_table_prepare"); + + assert(is_post_epoch()); // Always after epoch + + if (schema->node_id == own_nodeid()) + DBUG_VOID_RETURN; + + NDB_SHARE *share= get_share(schema); + if (share) + { + ndbcluster_prepare_rename_share(share, schema->query); + free_share(&share); + } + DBUG_VOID_RETURN; + } + + + void + handle_rename_table(Ndb_schema_op* schema) + { + DBUG_ENTER("handle_rename_table"); + + assert(is_post_epoch()); // Always after epoch + + if (schema->node_id == own_nodeid()) + DBUG_VOID_RETURN; + + write_schema_op_to_binlog(m_thd, schema); + + if (is_local_table(schema->db, schema->name)) + { + /* Tables exists as a local table, print error and leave it */ + sql_print_error("NDB Binlog: Skipping renaming locally " + "defined table '%s.%s' from binlog schema " + "event '%s' from node %d. ", + schema->db, schema->name, schema->query, + schema->node_id); + DBUG_VOID_RETURN; + } + + Thd_ndb *thd_ndb= get_thd_ndb(m_thd); + Thd_ndb_options_guard thd_ndb_options(thd_ndb); + thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP); + const int no_print_error[2]= + {ER_BAD_TABLE_ERROR, 0}; /* ignore missing table */ + run_query(m_thd, schema->query, + schema->query + schema->query_length, + no_print_error); + + NDB_SHARE *share= get_share(schema); + // invalidation already handled by binlog thread + if (!share || !share->op) + { + ndbapi_invalidate_table(schema->db, schema->name); + mysqld_close_cached_table(schema->db, schema->name); + } + if (share) + free_share(&share); + + share= get_share(schema); + if (share) + { + ndbcluster_rename_share(m_thd, share); + free_share(&share); + } + DBUG_VOID_RETURN; + } + + + void + handle_drop_db(Ndb_schema_op* schema) + { + DBUG_ENTER("handle_drop_db"); + + assert(is_post_epoch()); // Always after epoch + + if (schema->node_id == own_nodeid()) + DBUG_VOID_RETURN; + + write_schema_op_to_binlog(m_thd, schema); + + Thd_ndb *thd_ndb= get_thd_ndb(m_thd); + Thd_ndb_options_guard thd_ndb_options(thd_ndb); + // Set NO_LOCK_SCHEMA_OP before 'check_if_local_tables_indb' + // until ndbcluster_find_files does not take GSL + thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP); + + if (check_if_local_tables_in_db(schema->db)) + { + /* Tables exists as a local table, print error and leave it */ + sql_print_error("NDB Binlog: Skipping drop database '%s' since " + "it contained local tables " + "binlog schema event '%s' from node %d. ", + schema->db, schema->query, + schema->node_id); + DBUG_VOID_RETURN; + } + + const int no_print_error[1]= {0}; + run_query(m_thd, schema->query, + schema->query + schema->query_length, + no_print_error); + + DBUG_VOID_RETURN; + } + + + void + handle_truncate_table(Ndb_schema_op* schema) + { + DBUG_ENTER("handle_truncate_table"); + + assert(!is_post_epoch()); // Always directly + + if (schema->node_id == own_nodeid()) + DBUG_VOID_RETURN; + + write_schema_op_to_binlog(m_thd, schema); + + NDB_SHARE *share= get_share(schema); + // invalidation already handled by binlog thread + if (!share || !share->op) + { + ndbapi_invalidate_table(schema->db, schema->name); + mysqld_close_cached_table(schema->db, schema->name); + } + if (share) + free_share(&share); + + if (is_local_table(schema->db, schema->name)) + { + sql_print_error("NDB Binlog: Skipping locally defined table " + "'%s.%s' from binlog schema event '%s' from " + "node %d. ", + schema->db, schema->name, schema->query, + schema->node_id); + DBUG_VOID_RETURN; + } + + if (ndb_create_table_from_engine(m_thd, schema->db, schema->name)) + { + print_could_not_discover_error(m_thd, schema); + } + + DBUG_VOID_RETURN; + } + + + void + handle_create_table(Ndb_schema_op* schema) + { + DBUG_ENTER("handle_create_table"); + + assert(!is_post_epoch()); // Always directly + + if (schema->node_id == own_nodeid()) + DBUG_VOID_RETURN; + + write_schema_op_to_binlog(m_thd, schema); + + if (is_local_table(schema->db, schema->name)) + { + sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from " + "binlog schema event '%s' from node %d. ", + schema->db, schema->name, schema->query, + schema->node_id); + DBUG_VOID_RETURN; + } + + if (ndb_create_table_from_engine(m_thd, schema->db, schema->name)) + { + print_could_not_discover_error(m_thd, schema); + } + + DBUG_VOID_RETURN; + } + + + void + handle_create_db(Ndb_schema_op* schema) + { + DBUG_ENTER("handle_create_db"); + + assert(!is_post_epoch()); // Always directly + + if (schema->node_id == own_nodeid()) + DBUG_VOID_RETURN; + + write_schema_op_to_binlog(m_thd, schema); + + Thd_ndb *thd_ndb= get_thd_ndb(m_thd); + Thd_ndb_options_guard thd_ndb_options(thd_ndb); + thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP); + const int no_print_error[1]= {0}; + run_query(m_thd, schema->query, + schema->query + schema->query_length, + no_print_error); + + DBUG_VOID_RETURN; + } + + + void + handle_alter_db(Ndb_schema_op* schema) + { + DBUG_ENTER("handle_alter_db"); + + assert(!is_post_epoch()); // Always directly + + if (schema->node_id == own_nodeid()) + DBUG_VOID_RETURN; + + write_schema_op_to_binlog(m_thd, schema); + + Thd_ndb *thd_ndb= get_thd_ndb(m_thd); + Thd_ndb_options_guard thd_ndb_options(thd_ndb); + thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP); + const int no_print_error[1]= {0}; + run_query(m_thd, schema->query, + schema->query + schema->query_length, + no_print_error); + + DBUG_VOID_RETURN; + } + + + void + handle_grant_op(Ndb_schema_op* schema) + { + DBUG_ENTER("handle_grant_op"); + + assert(!is_post_epoch()); // Always directly + + if (schema->node_id == own_nodeid()) + DBUG_VOID_RETURN; + + write_schema_op_to_binlog(m_thd, schema); + + if (opt_ndb_extra_logging > 9) + sql_print_information("Got dist_priv event: %s, " + "flushing privileges", + get_schema_type_name(schema->type)); + + Thd_ndb *thd_ndb= get_thd_ndb(m_thd); + Thd_ndb_options_guard thd_ndb_options(thd_ndb); + thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP); + const int no_print_error[1]= {0}; + char *cmd= (char *) "flush privileges"; + run_query(m_thd, cmd, + cmd + strlen(cmd), + no_print_error); + + DBUG_VOID_RETURN; + } + + int handle_schema_op(Ndb_schema_op* schema) { @@ -2937,179 +3226,64 @@ class Ndb_schema_event_handler { switch (schema_type) { case SOT_CLEAR_SLOCK: - handle_clear_slock(schema); + /* + handle slock after epoch is completed to ensure that + schema events get inserted in the binlog after any data + events + */ + log_after_epoch(schema); DBUG_RETURN(0); case SOT_ALTER_TABLE_COMMIT: case SOT_RENAME_TABLE_PREPARE: case SOT_ONLINE_ALTER_TABLE_PREPARE: case SOT_ONLINE_ALTER_TABLE_COMMIT: + case SOT_RENAME_TABLE: + case SOT_DROP_TABLE: + case SOT_DROP_DB: log_after_epoch(schema); unlock_after_epoch(schema); DBUG_RETURN(0); - default: + case SOT_TRUNCATE_TABLE: + handle_truncate_table(schema); break; - } - - if (schema->node_id != own_nodeid()) - { - THD* thd= m_thd; // Code compatibility - Thd_ndb *thd_ndb= get_thd_ndb(thd); - Thd_ndb_options_guard thd_ndb_options(thd_ndb); - int post_epoch_unlock= 0; - - switch (schema_type) - { - case SOT_RENAME_TABLE: - case SOT_RENAME_TABLE_NEW: - case SOT_DROP_TABLE: - if (! ndbcluster_check_if_local_table(schema->db, schema->name)) - { - thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP); - const int no_print_error[2]= - {ER_BAD_TABLE_ERROR, 0}; /* ignore missing table */ - run_query(thd, schema->query, - schema->query + schema->query_length, - no_print_error); - /* binlog dropping table after any table operations */ - log_after_epoch(schema); - /* acknowledge this query _after_ epoch completion */ - post_epoch_unlock= 1; - } - else - { - /* Tables exists as a local table, print error and leave it */ - DBUG_PRINT("info", ("Found local table '%s.%s', leaving it", - schema->db, schema->name)); - sql_print_error("NDB Binlog: Skipping %sing locally " - "defined table '%s.%s' from binlog schema " - "event '%s' from node %d. ", - (schema_type == SOT_DROP_TABLE ? "dropp" : "renam"), - schema->db, schema->name, schema->query, - schema->node_id); - write_schema_op_to_binlog(thd, schema); - } - // Fall through - case SOT_TRUNCATE_TABLE: - { - NDB_SHARE *share= get_share(schema); - // invalidation already handled by binlog thread - if (!share || !share->op) - { - ndbapi_invalidate_table(schema->db, schema->name); - mysqld_close_cached_table(schema->db, schema->name); - } - if (share) - free_share(&share); - } - if (schema_type != SOT_TRUNCATE_TABLE) - break; - // fall through - case SOT_CREATE_TABLE: - thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP); - if (ndbcluster_check_if_local_table(schema->db, schema->name)) - { - DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'", - schema->db, schema->name)); - sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from " - "binlog schema event '%s' from node %d. ", - schema->db, schema->name, schema->query, - schema->node_id); - } - else if (ndb_create_table_from_engine(thd, schema->db, schema->name)) - { - print_could_not_discover_error(thd, schema); - } - write_schema_op_to_binlog(thd, schema); - break; + case SOT_CREATE_TABLE: + handle_create_table(schema); + break; - case SOT_DROP_DB: - /* Drop the database locally if it only contains ndb tables */ - thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP); - if (!check_if_local_tables_in_db(schema->db)) - { - const int no_print_error[1]= {0}; - run_query(thd, schema->query, - schema->query + schema->query_length, - no_print_error); - /* binlog dropping database after any table operations */ - log_after_epoch(schema); - /* acknowledge this query _after_ epoch completion */ - post_epoch_unlock= 1; - } - else - { - /* Database contained local tables, leave it */ - sql_print_error("NDB Binlog: Skipping drop database '%s' since it contained local tables " - "binlog schema event '%s' from node %d. ", - schema->db, schema->query, - schema->node_id); - write_schema_op_to_binlog(thd, schema); - } - break; + case SOT_CREATE_DB: + handle_create_db(schema); + break; - case SOT_CREATE_DB: - case SOT_ALTER_DB: - { - thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP); - const int no_print_error[1]= {0}; - run_query(thd, schema->query, - schema->query + schema->query_length, - no_print_error); - write_schema_op_to_binlog(thd, schema); - break; - } + case SOT_ALTER_DB: + handle_alter_db(schema); + break; - case SOT_CREATE_USER: - case SOT_DROP_USER: - case SOT_RENAME_USER: - case SOT_GRANT: - case SOT_REVOKE: - { - if (opt_ndb_extra_logging > 9) - sql_print_information("Got dist_priv event: %s, " - "flushing privileges", - get_schema_type_name(schema_type)); - - thd_ndb_options.set(TNO_NO_LOCK_SCHEMA_OP); - const int no_print_error[1]= {0}; - char *cmd= (char *) "flush privileges"; - run_query(thd, cmd, - cmd + strlen(cmd), - no_print_error); - write_schema_op_to_binlog(thd, schema); - break; - } - - case SOT_TABLESPACE: - case SOT_LOGFILE_GROUP: - write_schema_op_to_binlog(thd, schema); - break; + case SOT_CREATE_USER: + case SOT_DROP_USER: + case SOT_RENAME_USER: + case SOT_GRANT: + case SOT_REVOKE: + handle_grant_op(schema); + break; - case SOT_ALTER_TABLE_COMMIT: - case SOT_RENAME_TABLE_PREPARE: - case SOT_ONLINE_ALTER_TABLE_PREPARE: - case SOT_ONLINE_ALTER_TABLE_COMMIT: - case SOT_CLEAR_SLOCK: - // Impossible to come here, the above types has already - // been handled and caused the function to return - abort(); + case SOT_TABLESPACE: + case SOT_LOGFILE_GROUP: + if (schema->node_id == own_nodeid()) break; + write_schema_op_to_binlog(m_thd, schema); + break; - } + } - /* signal that schema operation has been handled */ - DBUG_DUMP("slock", (uchar*) schema->slock_buf, schema->slock_length); - if (bitmap_is_set(&schema->slock, own_nodeid())) - { - if (post_epoch_unlock) - unlock_after_epoch(schema); - else - ack_schema_op(schema->db, schema->name, - schema->id, schema->version); - } + /* signal that schema operation has been handled */ + DBUG_DUMP("slock", (uchar*) schema->slock_buf, schema->slock_length); + if (bitmap_is_set(&schema->slock, own_nodeid())) + { + ack_schema_op(schema->db, schema->name, + schema->id, schema->version); } } DBUG_RETURN(0); @@ -3140,39 +3314,20 @@ class Ndb_schema_event_handler { break; case SOT_DROP_DB: - write_schema_op_to_binlog(thd, schema); + handle_drop_db(schema); break; case SOT_DROP_TABLE: - write_schema_op_to_binlog(thd, schema); - ndbapi_invalidate_table(schema->db, schema->name); - mysqld_close_cached_table(schema->db, schema->name); + handle_drop_table(schema); break; - case SOT_RENAME_TABLE: - { - write_schema_op_to_binlog(thd, schema); - NDB_SHARE *share= get_share(schema); - if (share) - { - ndbcluster_rename_share(thd, share); - free_share(&share); - } + case SOT_RENAME_TABLE_PREPARE: + handle_rename_table_prepare(schema); break; - } - case SOT_RENAME_TABLE_PREPARE: - { - if (schema->node_id == own_nodeid()) - break; - NDB_SHARE *share= get_share(schema); - if (share) - { - ndbcluster_prepare_rename_share(share, schema->query); - free_share(&share); - } + case SOT_RENAME_TABLE: + handle_rename_table(schema); break; - } case SOT_ALTER_TABLE_COMMIT: handle_offline_alter_table_commit(schema); @@ -3186,46 +3341,6 @@ class Ndb_schema_event_handler { handle_online_alter_table_commit(schema); break; - case SOT_RENAME_TABLE_NEW: - { - write_schema_op_to_binlog(thd, schema); - NDB_SHARE *share= get_share(schema); - if (ndb_binlog_running && (!share || !share->op)) - { - /* - we need to free any share here as command below - may need to call handle_trailing_share - */ - if (share) - { - /* ndb_share reference temporary free */ - DBUG_PRINT("NDB_SHARE", ("%s temporary free use_count: %u", - share->key, share->use_count)); - free_share(&share); - share= 0; - } - - if (ndbcluster_check_if_local_table(schema->db, schema->name)) - { - DBUG_PRINT("info", ("NDB Binlog: Skipping locally defined table '%s.%s'", - schema->db, schema->name)); - sql_print_error("NDB Binlog: Skipping locally defined table '%s.%s' from " - "binlog schema event '%s' from node %d. ", - schema->db, schema->name, schema->query, - schema->node_id); - } - else if (ndb_create_table_from_engine(thd, schema->db, schema->name)) - { - print_could_not_discover_error(thd, schema); - } - } - if (share) - { - free_share(&share); - } - break; - } - default: DBUG_ASSERT(FALSE); } @@ -3452,7 +3567,7 @@ public: }; /********************************************************************* - Internal helper functions for handeling of the cluster replication tables + Internal helper functions for handling of the cluster replication tables - ndb_binlog_index - ndb_apply_status *********************************************************************/ @@ -5081,8 +5196,9 @@ ndbcluster_check_if_local_table(const ch char ndb_file[FN_REFLEN + 1]; DBUG_ENTER("ndbcluster_check_if_local_table"); - build_table_filename(key, FN_LEN-1, dbname, tabname, reg_ext, 0); - build_table_filename(ndb_file, FN_LEN-1, dbname, tabname, ha_ndb_ext, 0); + build_table_filename(key, sizeof(key)-1, dbname, tabname, reg_ext, 0); + build_table_filename(ndb_file, sizeof(ndb_file)-1, + dbname, tabname, ha_ndb_ext, 0); /* Check that any defined table is an ndb table */ DBUG_PRINT("info", ("Looking for file %s and %s", key, ndb_file)); if ((! my_access(key, F_OK)) && my_access(ndb_file, F_OK)) @@ -5107,7 +5223,6 @@ int ndbcluster_create_binlog_setup(THD * const char *table_name, TABLE * table) { - int do_event_op= ndb_binlog_running; DBUG_ENTER("ndbcluster_create_binlog_setup"); DBUG_PRINT("enter",("key: %s key_len: %d %s.%s", key, key_len, db, table_name)); @@ -5131,25 +5246,7 @@ int ndbcluster_create_binlog_setup(THD * DBUG_RETURN(0); // replication already setup, or should not } - if (Ndb_dist_priv_util::is_distributed_priv_table(db, table_name)) - { - // The distributed privilege tables are distributed by writing - // the CREATE USER, GRANT, REVOKE etc. to ndb_schema -> no need - // to listen to events from this table - DBUG_PRINT("info", ("Skipping binlogging of table %s/%s", db, table_name)); - do_event_op= 0; - } - - if (!ndb_schema_share && - strcmp(share->db, NDB_REP_DB) == 0 && - strcmp(share->table_name, NDB_SCHEMA_TABLE) == 0) - do_event_op= 1; - else if (!ndb_apply_status_share && - strcmp(share->db, NDB_REP_DB) == 0 && - strcmp(share->table_name, NDB_APPLY_TABLE) == 0) - do_event_op= 1; - - if (!do_event_op) + if (!share->need_events(ndb_binlog_running)) { set_binlog_nologging(share); pthread_mutex_unlock(&share->mutex); @@ -5256,11 +5353,8 @@ ndbcluster_create_event(THD *thd, Ndb *n ndbtab->getName(), ndbtab->getObjectVersion(), event_name, share ? share->key : "(nil)")); DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName())); - if (!share) - { - DBUG_PRINT("info", ("share == NULL")); - DBUG_RETURN(0); - } + DBUG_ASSERT(share); + if (get_binlog_nologging(share)) { if (opt_ndb_extra_logging && ndb_binlog_running) @@ -5440,8 +5534,7 @@ ndbcluster_create_event_ops(THD *thd, ND DBUG_ENTER("ndbcluster_create_event_ops"); DBUG_PRINT("enter", ("table: %s event: %s", ndbtab->getName(), event_name)); DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName())); - - DBUG_ASSERT(share != 0); + DBUG_ASSERT(share); if (get_binlog_nologging(share)) { @@ -5455,7 +5548,6 @@ ndbcluster_create_event_ops(THD *thd, ND assert(!Ndb_dist_priv_util::is_distributed_priv_table(share->db, share->table_name)); - Ndb_event_data *event_data= share->event_data; int do_ndb_schema_share= 0, do_ndb_apply_status_share= 0; #ifdef HAVE_NDB_BINLOG uint len= (int)strlen(share->table_name); @@ -5480,6 +5572,10 @@ ndbcluster_create_event_ops(THD *thd, ND DBUG_RETURN(0); } + // Check that the share agrees + DBUG_ASSERT(share->need_events(ndb_binlog_running)); + + Ndb_event_data *event_data= share->event_data; if (share->op) { event_data= (Ndb_event_data *) share->op->getCustomData(); === modified file 'sql/log_event.cc' --- a/sql/log_event.cc 2011-11-15 14:45:16 +0000 +++ b/sql/log_event.cc 2011-11-18 09:24:36 +0000 @@ -7960,7 +7960,7 @@ int Rows_log_event::do_apply_event(Relay Note that unlike the other thd options set here, this one comes from a global, and not from the incoming event. */ - if (slave_allow_batching) + if (opt_slave_allow_batching) thd->variables.option_bits|= OPTION_ALLOW_BATCH; else thd->variables.option_bits&= ~OPTION_ALLOW_BATCH; === modified file 'sql/mysqld.h' --- a/sql/mysqld.h 2011-11-15 13:32:16 +0000 +++ b/sql/mysqld.h 2011-11-18 09:24:36 +0000 @@ -166,7 +166,9 @@ extern ulong slow_launch_threads, slow_l extern ulong table_cache_size, table_def_size; extern MYSQL_PLUGIN_IMPORT ulong max_connections; extern ulong max_connect_errors, connect_timeout; -extern my_bool slave_allow_batching; +#ifndef MCP_WL3733 +extern my_bool opt_slave_allow_batching; +#endif extern my_bool allow_slave_start; extern LEX_CSTRING reason_slave_blocked; extern ulong slave_trans_retries; === modified file 'sql/ndb_event_data.cc' --- a/sql/ndb_event_data.cc 2011-10-29 09:02:21 +0000 +++ b/sql/ndb_event_data.cc 2011-11-10 08:21:36 +0000 @@ -42,3 +42,13 @@ Ndb_event_data::~Ndb_event_data() */ my_free(ndb_value[0]); } + + +void Ndb_event_data::print(const char* where, FILE* file) const +{ + fprintf(file, + "%s shadow_table: %p '%s.%s'\n", + where, + shadow_table, shadow_table->s->db.str, + shadow_table->s->table_name.str); +} === modified file 'sql/ndb_event_data.h' --- a/sql/ndb_event_data.h 2011-10-29 09:02:21 +0000 +++ b/sql/ndb_event_data.h 2011-11-10 08:21:36 +0000 @@ -34,6 +34,8 @@ public: struct TABLE *shadow_table; struct NDB_SHARE *share; union NdbValue *ndb_value[2]; + + void print(const char* where, FILE* file) const; }; #endif === modified file 'sql/ndb_schema_dist.cc' --- a/sql/ndb_schema_dist.cc 2011-11-02 09:28:48 +0000 +++ b/sql/ndb_schema_dist.cc 2011-11-10 08:29:32 +0000 @@ -26,8 +26,6 @@ get_schema_type_name(uint type) return "DROP_TABLE"; case SOT_CREATE_TABLE: return "CREATE_TABLE"; - case SOT_RENAME_TABLE_NEW: - return "RENAME_TABLE_NEW"; case SOT_ALTER_TABLE_COMMIT: return "ALTER_TABLE_COMMIT"; case SOT_DROP_DB: === modified file 'sql/ndb_schema_dist.h' --- a/sql/ndb_schema_dist.h 2011-11-02 09:28:48 +0000 +++ b/sql/ndb_schema_dist.h 2011-11-10 08:29:32 +0000 @@ -40,7 +40,7 @@ enum SCHEMA_OP_TYPE { SOT_DROP_TABLE= 0, SOT_CREATE_TABLE= 1, - SOT_RENAME_TABLE_NEW= 2, + SOT_RENAME_TABLE_NEW= 2, // Unused, but still reserved SOT_ALTER_TABLE_COMMIT= 3, SOT_DROP_DB= 4, SOT_CREATE_DB= 5, === modified file 'sql/ndb_share.cc' --- a/sql/ndb_share.cc 2011-11-07 19:00:35 +0000 +++ b/sql/ndb_share.cc 2011-11-10 08:21:36 +0000 @@ -17,6 +17,10 @@ #include "ndb_share.h" #include "ndb_event_data.h" +#include "ndb_dist_priv_util.h" +#include "ha_ndbcluster_tables.h" + +#include #include @@ -37,12 +41,109 @@ NDB_SHARE::destroy(NDB_SHARE* share) } #endif share->new_op= 0; - if (share->event_data) + Ndb_event_data* event_data = share->event_data; + if (event_data) { - delete share->event_data; - share->event_data= 0; + delete event_data; + event_data= 0; } free_root(&share->mem_root, MYF(0)); my_free(share); } + +bool +NDB_SHARE::need_events(bool default_on) const +{ + DBUG_ENTER("NDB_SHARE::need_events"); + DBUG_PRINT("enter", ("db: %s, table_name: %s", + db, table_name)); + + if (default_on) + { + // Events are on by default, check if it should be turned off + + if (Ndb_dist_priv_util::is_distributed_priv_table(db, table_name)) + { + /* + The distributed privilege tables are distributed by writing + the CREATE USER, GRANT, REVOKE etc. to ndb_schema -> no need + to listen to events from those table + */ + DBUG_PRINT("exit", ("no events for dist priv table")); + DBUG_RETURN(false); + } + + DBUG_PRINT("exit", ("need events(the default for this mysqld)")); + DBUG_RETURN(true); + } + + // Events are off by default, check if it should be turned on + if (strcmp(db, NDB_REP_DB) == 0) + { + // The table is in "mysql" database + if (strcmp(table_name, NDB_SCHEMA_TABLE) == 0) + { + DBUG_PRINT("exit", ("need events for " NDB_SCHEMA_TABLE)); + DBUG_RETURN(true); + } + + if (strcmp(table_name, NDB_APPLY_TABLE) == 0) + { + DBUG_PRINT("exit", ("need events for " NDB_APPLY_TABLE)); + DBUG_RETURN(true); + } + } + + DBUG_PRINT("exit", ("no events(the default for this mysqld)")); + DBUG_RETURN(false); +} + + +Ndb_event_data* NDB_SHARE::get_event_data_ptr() const +{ + if (event_data) + { + // The event_data pointer is only used before + // creating the NdbEventoperation -> check no op yet + assert(!op); + + return event_data; + } + + if (op) + { + // The event_data should now be empty since it's been moved to + // op's custom data + assert(!event_data); + + // Check that op has custom data + assert(op->getCustomData()); + + return (Ndb_event_data*)op->getCustomData(); + } + + return NULL; +} + + +void NDB_SHARE::print(const char* where, FILE* file) const +{ + fprintf(file, "%s %s.%s: use_count: %u\n", + where, db, table_name, use_count); + fprintf(file, " - key: '%s', key_length: %d\n", key, key_length); + fprintf(file, " - commit_count: %llu\n", commit_count); + if (new_key) + fprintf(file, " - new_key: %p, '%s'\n", + new_key, new_key); + if (event_data) + fprintf(file, " - event_data: %p\n", event_data); + if (op) + fprintf(file, " - op: %p\n", op); + if (new_op) + fprintf(file, " - new_op: %p\n", new_op); + + Ndb_event_data *event_data_ptr= get_event_data_ptr(); + if (event_data_ptr) + event_data_ptr->print(" -", file); +} === modified file 'sql/ndb_share.h' --- a/sql/ndb_share.h 2011-11-05 11:35:57 +0000 +++ b/sql/ndb_share.h 2011-11-10 08:21:36 +0000 @@ -188,7 +188,20 @@ struct NDB_SHARE { MY_BITMAP *subscriber_bitmap; class NdbEventOperation *new_op; + static NDB_SHARE* create(const char* key, size_t key_length, + struct TABLE* table, const char* db_name, + const char* table_name); static void destroy(NDB_SHARE* share); + + class Ndb_event_data* get_event_data_ptr() const; + + void print(const char* where, FILE* file = stderr) const; + + /* + Returns true if this share need to subscribe to + events from the table. + */ + bool need_events(bool default_on) const; }; === modified file 'sql/sys_vars.cc' --- a/sql/sys_vars.cc 2011-11-15 14:25:58 +0000 +++ b/sql/sys_vars.cc 2011-11-18 09:24:36 +0000 @@ -2470,40 +2470,6 @@ static Sys_var_ulong Sys_profiling_histo VALID_RANGE(0, 100), DEFAULT(15), BLOCK_SIZE(1)); #endif -#ifndef MCP_WL3733 -#ifndef EMBEDDED_LIBRARY -my_bool slave_allow_batching; - -/* - Take Active MI lock while checking/updating slave_allow_batching - to give atomicity w.r.t. slave state changes -*/ -static PolyLock_mutex PLock_active_mi(&LOCK_active_mi); - -static bool slave_allow_batching_check(sys_var *self, THD *thd, set_var *var) -{ - /* Only allow a change if the slave SQL thread is currently stopped */ - bool slave_sql_running = active_mi->rli.slave_running; - - if (slave_sql_running) - { - my_error(ER_SLAVE_MUST_STOP, MYF(0)); - return true; - } - - return false; -} - -static Sys_var_mybool Sys_slave_allow_batching( - "slave_allow_batching", "Allow slave to batch requests", - GLOBAL_VAR(slave_allow_batching), - CMD_LINE(OPT_ARG), DEFAULT(FALSE), - &PLock_active_mi, - NOT_IN_BINLOG, - ON_CHECK(slave_allow_batching_check)); -#endif -#endif - static Sys_var_harows Sys_select_limit( "sql_select_limit", "The maximum number of rows to return from SELECT statements", @@ -3041,6 +3007,14 @@ static Sys_var_mybool Sys_relay_log_reco "processed", GLOBAL_VAR(relay_log_recovery), CMD_LINE(OPT_ARG), DEFAULT(FALSE)); +#ifndef MCP_WL3733 +my_bool opt_slave_allow_batching; +static Sys_var_mybool Sys_slave_allow_batching( + "slave_allow_batching", "Allow slave to batch requests", + GLOBAL_VAR(opt_slave_allow_batching), + CMD_LINE(OPT_ARG), DEFAULT(FALSE)); +#endif + static Sys_var_charptr Sys_slave_load_tmpdir( "slave_load_tmpdir", "The location where the slave should put " "its temporary files when replicating a LOAD DATA INFILE command", No bundle (reason: useless for push emails).