From: Andrei Elkin Date: July 1 2011 1:42pm Subject: bzr push into mysql-next-mr-wl5569 branch (andrei.elkin:3329) List-Archive: http://lists.mysql.com/commits/140088 Message-Id: <201107011342.p61DgqmQ032118@mysql1000.dsl.inet.fi> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3329 Andrei Elkin 2011-07-01 [merge] wl5569 MTS merging from the main repo. removed: mysql-test/suite/innodb/include/innodb_stats_bootstrap.inc storage/innobase/scripts/ storage/innobase/scripts/persistent_storage.sql added: mysql-test/r/join_cache_jcl0.result mysql-test/r/mysql_embedded.result mysql-test/suite/innodb/r/innodb_buffer_pool_load.result mysql-test/suite/innodb/t/innodb_buffer_pool_load-master.opt mysql-test/suite/innodb/t/innodb_buffer_pool_load.test mysql-test/suite/perfschema/r/table_schema.result mysql-test/suite/perfschema/t/table_schema.test mysql-test/suite/sys_vars/r/innodb_buffer_pool_dump_at_shutdown_basic.result mysql-test/suite/sys_vars/r/innodb_buffer_pool_dump_now_basic.result mysql-test/suite/sys_vars/r/innodb_buffer_pool_filename_basic.result mysql-test/suite/sys_vars/r/innodb_buffer_pool_load_abort_basic.result mysql-test/suite/sys_vars/r/innodb_buffer_pool_load_at_startup_basic.result mysql-test/suite/sys_vars/r/innodb_buffer_pool_load_now_basic.result mysql-test/suite/sys_vars/t/innodb_buffer_pool_dump_at_shutdown_basic.test mysql-test/suite/sys_vars/t/innodb_buffer_pool_dump_now_basic.test mysql-test/suite/sys_vars/t/innodb_buffer_pool_filename_basic.test mysql-test/suite/sys_vars/t/innodb_buffer_pool_load_abort_basic.test mysql-test/suite/sys_vars/t/innodb_buffer_pool_load_at_startup_basic.test mysql-test/suite/sys_vars/t/innodb_buffer_pool_load_now_basic.test mysql-test/t/join_cache_jcl0.test mysql-test/t/mysql_embedded.test storage/innobase/buf/buf0dump.c storage/innobase/include/buf0dump.h unittest/gunit/bounds_checked_array-t.cc modified: .bzrignore cmake/make_dist.cmake.in cmake/os/WindowsCache.cmake 
cmake/plugin.cmake config.h.cmake configure.cmake extra/perror.c extra/yassl/src/yassl_error.cpp libmysqld/emb_qcache.cc libmysqld/lib_sql.cc mysql-test/collections/default.experimental mysql-test/extra/rpl_tests/rpl_loaddata.test mysql-test/extra/rpl_tests/rpl_mts_crash_safe.inc mysql-test/extra/rpl_tests/rpl_stm_EE_err2.test mysql-test/include/icp_tests.inc mysql-test/include/join_cache.inc mysql-test/include/subquery.inc mysql-test/include/subquery_mat.inc mysql-test/include/subquery_sj.inc mysql-test/mysql-test-run.pl mysql-test/r/1st.result mysql-test/r/alter_table.result mysql-test/r/connect.result mysql-test/r/count_distinct.result mysql-test/r/distinct.result mysql-test/r/events_bugs.result mysql-test/r/flush.result mysql-test/r/gis-precise.result mysql-test/r/gis-rtree.result mysql-test/r/group_by.result mysql-test/r/innodb_icp.result mysql-test/r/innodb_icp_all.result mysql-test/r/innodb_icp_none.result mysql-test/r/join_cache_jcl1.result mysql-test/r/join_cache_jcl2.result mysql-test/r/join_cache_jcl3.result mysql-test/r/join_cache_jcl4.result mysql-test/r/join_cache_jcl5.result mysql-test/r/join_cache_jcl6.result mysql-test/r/join_cache_jcl7.result mysql-test/r/join_cache_jcl8.result mysql-test/r/join_nested.result mysql-test/r/join_nested_jcl6.result mysql-test/r/join_outer.result mysql-test/r/join_outer_jcl6.result mysql-test/r/log_tables_upgrade.result mysql-test/r/lowercase_table4.result mysql-test/r/multi_update.result mysql-test/r/multi_update_innodb.result mysql-test/r/myisam_icp.result mysql-test/r/myisam_icp_all.result mysql-test/r/myisam_icp_none.result mysql-test/r/mysql_upgrade.result mysql-test/r/mysql_upgrade_ssl.result mysql-test/r/mysqlcheck.result mysql-test/r/null_key_all.result mysql-test/r/null_key_icp.result mysql-test/r/null_key_none.result mysql-test/r/order_by_all.result mysql-test/r/order_by_icp_mrr.result mysql-test/r/order_by_none.result mysql-test/r/partition.result mysql-test/r/partition_datatype.result 
mysql-test/r/plugin_auth.result mysql-test/r/ps.result mysql-test/r/query_cache_28249.result mysql-test/r/range_all.result mysql-test/r/select_all.result mysql-test/r/select_all_jcl6.result mysql-test/r/select_found.result mysql-test/r/select_icp_mrr.result mysql-test/r/select_icp_mrr_jcl6.result mysql-test/r/select_none.result mysql-test/r/select_none_jcl6.result mysql-test/r/sp_notembedded.result mysql-test/r/sp_sync.result mysql-test/r/subquery_all.result mysql-test/r/subquery_all_jcl6.result mysql-test/r/subquery_mat.result mysql-test/r/subquery_mat_all.result mysql-test/r/subquery_mat_none.result mysql-test/r/subquery_nomat_nosj.result mysql-test/r/subquery_nomat_nosj_jcl6.result mysql-test/r/subquery_none.result mysql-test/r/subquery_none_jcl6.result mysql-test/r/subquery_sj_all.result mysql-test/r/subquery_sj_all_jcl6.result mysql-test/r/subquery_sj_all_jcl7.result mysql-test/r/subquery_sj_dupsweed.result mysql-test/r/subquery_sj_dupsweed_jcl6.result mysql-test/r/subquery_sj_dupsweed_jcl7.result mysql-test/r/subquery_sj_firstmatch.result mysql-test/r/subquery_sj_firstmatch_jcl6.result mysql-test/r/subquery_sj_firstmatch_jcl7.result mysql-test/r/subquery_sj_loosescan.result mysql-test/r/subquery_sj_loosescan_jcl6.result mysql-test/r/subquery_sj_loosescan_jcl7.result mysql-test/r/subquery_sj_mat.result mysql-test/r/subquery_sj_mat_jcl6.result mysql-test/r/subquery_sj_mat_jcl7.result mysql-test/r/subquery_sj_mat_nosj.result mysql-test/r/subquery_sj_none.result mysql-test/r/subquery_sj_none_jcl6.result mysql-test/r/subquery_sj_none_jcl7.result mysql-test/r/subselect_innodb.result mysql-test/r/symlink.result mysql-test/r/system_mysql_db.result mysql-test/r/trigger-compat.result mysql-test/r/trigger.result mysql-test/r/type_datetime.result mysql-test/suite/funcs_1/r/is_columns_mysql.result mysql-test/suite/funcs_1/r/is_columns_mysql_embedded.result mysql-test/suite/funcs_1/r/is_key_column_usage.result mysql-test/suite/funcs_1/r/is_statistics.result 
mysql-test/suite/funcs_1/r/is_statistics_mysql.result mysql-test/suite/funcs_1/r/is_statistics_mysql_embedded.result mysql-test/suite/funcs_1/r/is_table_constraints.result mysql-test/suite/funcs_1/r/is_table_constraints_mysql.result mysql-test/suite/funcs_1/r/is_table_constraints_mysql_embedded.result mysql-test/suite/funcs_1/r/is_tables_mysql.result mysql-test/suite/funcs_1/r/is_tables_mysql_embedded.result mysql-test/suite/innodb/r/innodb-index.result mysql-test/suite/innodb/r/innodb-system-table-view.result mysql-test/suite/innodb/r/innodb-use-sys-malloc.result mysql-test/suite/innodb/r/innodb-zip.result mysql-test/suite/innodb/r/innodb_bug57904.result mysql-test/suite/innodb/t/innodb-index.test mysql-test/suite/innodb/t/innodb-system-table-view.test mysql-test/suite/innodb/t/innodb-use-sys-malloc-master.opt mysql-test/suite/innodb/t/innodb-use-sys-malloc.test mysql-test/suite/innodb/t/innodb-zip.test mysql-test/suite/innodb/t/innodb_bug11933790.test mysql-test/suite/innodb/t/innodb_bug57904.test mysql-test/suite/innodb/t/innodb_bug60049.test mysql-test/suite/innodb/t/innodb_prefix_index_restart_server.test mysql-test/suite/innodb/t/innodb_stats.test mysql-test/suite/innodb/t/innodb_stats_drop_locked.test mysql-test/suite/perfschema/include/cleanup_helper.inc mysql-test/suite/perfschema/include/upgrade_check.inc mysql-test/suite/perfschema/r/all_tests.result mysql-test/suite/perfschema/r/pfs_upgrade.result mysql-test/suite/perfschema/r/schema.result mysql-test/suite/perfschema/r/selects.result mysql-test/suite/perfschema/t/all_tests.test mysql-test/suite/perfschema/t/selects.test mysql-test/suite/rpl/r/rpl_loaddata.result mysql-test/suite/rpl/r/rpl_mixed_mts_crash_safe.result mysql-test/suite/rpl/r/rpl_parallel_start_stop.result mysql-test/suite/rpl/r/rpl_row_mts_crash_safe.result mysql-test/suite/rpl/r/rpl_stm_EE_err2.result mysql-test/suite/rpl/r/rpl_stm_loaddata_concurrent.result mysql-test/suite/rpl/r/rpl_stm_mts_crash_safe.result 
mysql-test/suite/rpl/t/rpl_mixed_mts_crash_safe.test mysql-test/suite/rpl/t/rpl_parallel_start_stop.test mysql-test/suite/rpl/t/rpl_row_mts_crash_safe.test mysql-test/suite/rpl/t/rpl_stm_mts_crash_safe.test mysql-test/t/alter_table.test mysql-test/t/disabled.def mysql-test/t/events_bugs.test mysql-test/t/flush.test mysql-test/t/gis-precise.test mysql-test/t/gis-rtree.test mysql-test/t/group_by.test mysql-test/t/join_outer.test mysql-test/t/multi_update.test mysql-test/t/multi_update_innodb.test mysql-test/t/partition.test mysql-test/t/partition_datatype.test mysql-test/t/ps.test mysql-test/t/query_cache_28249.test mysql-test/t/sp_notembedded.test mysql-test/t/sp_sync.test mysql-test/t/symlink.test mysql-test/t/system_mysql_db_fix40123.test mysql-test/t/system_mysql_db_fix50030.test mysql-test/t/system_mysql_db_fix50117.test mysql-test/t/trigger-compat.test mysql-test/t/type_datetime.test mysys/mf_pack.c scripts/mysql_install_db.pl.in scripts/mysql_install_db.sh scripts/mysql_system_tables.sql sql/binlog.cc sql/event_db_repository.cc sql/event_parse_data.cc sql/event_parse_data.h sql/event_scheduler.cc sql/field.cc sql/filesort.cc sql/gcalc_tools.cc sql/ha_ndbcluster_binlog.cc sql/ha_partition.cc sql/handler.cc sql/item.cc sql/item.h sql/item_cmpfunc.cc sql/item_cmpfunc.h sql/item_func.cc sql/item_func.h sql/item_row.cc sql/item_row.h sql/item_strfunc.cc sql/item_strfunc.h sql/item_subselect.cc sql/item_subselect.h sql/item_sum.cc sql/item_timefunc.cc sql/item_timefunc.h sql/log_event.cc sql/log_event_old.cc sql/mdl.h sql/opt_range.cc sql/opt_sum.cc sql/partition_info.cc sql/protocol.cc sql/rpl_master.cc sql/rpl_reporting.cc sql/rpl_rli.cc sql/rpl_rli_pdb.cc sql/rpl_slave.cc sql/share/errmsg-utf8.txt sql/sp.cc sql/sp_head.cc sql/spatial.cc sql/sql_acl.cc sql/sql_admin.cc sql/sql_alloc_error_handler.cc sql/sql_array.h sql/sql_audit.h sql/sql_base.cc sql/sql_base.h sql/sql_cache.cc sql/sql_class.cc sql/sql_class.h sql/sql_connect.cc sql/sql_db.cc sql/sql_derived.cc 
sql/sql_do.cc sql/sql_error.cc sql/sql_error.h sql/sql_insert.cc sql/sql_join_cache.cc sql/sql_lex.cc sql/sql_lex.h sql/sql_load.cc sql/sql_parse.cc sql/sql_plist.h sql/sql_prepare.cc sql/sql_prepare.h sql/sql_select.cc sql/sql_select.h sql/sql_servers.cc sql/sql_show.cc sql/sql_signal.cc sql/sql_table.cc sql/sql_test.cc sql/sql_time.cc sql/sql_trigger.cc sql/sql_trigger.h sql/sql_union.cc sql/sql_update.cc sql/sql_yacc.yy sql/structs.h sql/sys_vars.cc sql/table.cc sql/table.h sql/transaction.cc sql/tztime.cc storage/innobase/CMakeLists.txt storage/innobase/btr/btr0btr.c storage/innobase/btr/btr0cur.c storage/innobase/buf/buf0buddy.c storage/innobase/buf/buf0buf.c storage/innobase/buf/buf0flu.c storage/innobase/buf/buf0lru.c storage/innobase/buf/buf0rea.c storage/innobase/fil/fil0fil.c storage/innobase/handler/ha_innodb.cc storage/innobase/include/btr0btr.h storage/innobase/include/btr0cur.h storage/innobase/include/btr0cur.ic storage/innobase/include/buf0buddy.h storage/innobase/include/buf0buddy.ic storage/innobase/include/buf0buf.h storage/innobase/include/buf0buf.ic storage/innobase/include/buf0lru.h storage/innobase/include/buf0rea.h storage/innobase/include/buf0types.h storage/innobase/include/db0err.h storage/innobase/include/page0cur.ic storage/innobase/include/page0page.h storage/innobase/include/page0page.ic storage/innobase/include/rem0rec.h storage/innobase/include/rem0rec.ic storage/innobase/include/srv0srv.h storage/innobase/include/srv0start.h storage/innobase/include/sync0rw.ic storage/innobase/include/sync0sync.h storage/innobase/include/univ.i storage/innobase/include/ut0ut.h storage/innobase/page/page0cur.c storage/innobase/page/page0page.c storage/innobase/page/page0zip.c storage/innobase/rem/rem0rec.c storage/innobase/row/row0ins.c storage/innobase/row/row0mysql.c storage/innobase/row/row0row.c storage/innobase/row/row0upd.c storage/innobase/row/row0vers.c storage/innobase/srv/srv0srv.c storage/innobase/srv/srv0start.c 
storage/innobase/sync/sync0rw.c storage/innobase/sync/sync0sync.c storage/innobase/trx/trx0rec.c storage/innobase/trx/trx0sys.c storage/innobase/ut/ut0ut.c storage/myisam/mi_update.c storage/myisam/mi_write.c tests/mysql_client_test.c unittest/gunit/CMakeLists.txt === modified file 'mysql-test/suite/rpl/t/rpl_heartbeat_basic.test' --- a/mysql-test/suite/rpl/t/rpl_heartbeat_basic.test 2011-06-20 13:26:35 +0000 +++ b/mysql-test/suite/rpl/t/rpl_heartbeat_basic.test 2011-07-01 10:16:52 +0000 @@ -26,7 +26,7 @@ call mtr.add_suppression("The slave coor let $connect_retry= 20; --echo *** Preparing *** ---connection slave +sync_slave_with_master; --disable_query_log call mtr.add_suppression("The master's UUID has changed, although this should not happen unless you have changed it manually."); === modified file 'sql/log_event.cc' --- a/sql/log_event.cc 2011-06-29 07:04:19 +0000 +++ b/sql/log_event.cc 2011-07-01 13:41:35 +0000 @@ -2374,11 +2374,11 @@ bool Log_event::contains_partition_info( (Delete, Update, Write -rows) T - terminator of the group (XID, COMMIT, ROLLBACK, auto-commit query) - Only the first g-type event computes the assigned Worker which once + Only the first g-event computes the assigned Worker which once is determined remains to be for the rest of the group. - That is the g-type event solely carries partitioning info. - For B-type the assigned Worker is NULL to indicate Coordinator - has not yet decided. The same applies to p-type. + That is the g-event solely carries partitioning info. + For B-event the assigned Worker is NULL to indicate Coordinator + has not yet decided. The same applies to p-event. Notice, these is a special group consisting of optionally multiple p-events terminating with a g-event. @@ -2390,9 +2390,9 @@ bool Log_event::contains_partition_info( done. - @note The function can update APH (through map_db_to_worker()), GAQ objects - and relocate some temporary tables from Coordinator's list into - involved entries of APH. 
+ @note The function updates GAQ queue directly, updates APH hash + plus relocates some temporary tables from Coordinator's list into + involved entries of APH through @c map_db_to_worker. There's few memory allocations commented where to be freed. @return a pointer to the Worker struct or NULL. @@ -2406,6 +2406,7 @@ Slave_worker *Log_event::get_slave_worke Slave_worker *ret_worker= NULL; THD *thd= rli->info_thd; char llbuff[22]; + Slave_committed_queue *gaq= rli->gaq; /* checking partioning properties and perform corresponding actions */ @@ -2413,14 +2414,19 @@ Slave_worker *Log_event::get_slave_worke if ((is_b_event= starts_group()) || // or DDL:s or autocommit queries possibly associated with own p-events (!rli->curr_group_seen_begin && - // the following is a case of no-B group: { p_1,p_2,...,p_k, g} - (rli->gaq->empty() || - ((Slave_job_group *) - dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))-> + /* + the following is a special case of B-free still multi-event group like + { p_1,p_2,...,p_k, g }. + In that case either GAQ is empty (the very first group is being + assigned) or the last assigned group index points at one of + mapped-to-a-worker. 
+ */ + (gaq->empty() || + gaq->get_job_group(rli->gaq->assigned_group_index)-> worker_id != MTS_WORKER_UNDEF))) { ulong gaq_idx; - rli->mts_total_groups++; + rli->mts_groups_assigned++; rli->curr_group_isolated= FALSE; g.master_log_pos= log_pos; @@ -2428,7 +2434,7 @@ Slave_worker *Log_event::get_slave_worke g.group_master_log_name= NULL; // todo: remove g.group_relay_log_name= NULL; g.worker_id= MTS_WORKER_UNDEF; - g.total_seqno= rli->mts_total_groups; + g.total_seqno= rli->mts_groups_assigned; g.checkpoint_log_name= NULL; g.checkpoint_log_pos= 0; g.checkpoint_relay_log_name= NULL; @@ -2437,13 +2443,12 @@ Slave_worker *Log_event::get_slave_worke g.done= 0; // the last occupied GAQ's array index - gaq_idx= rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g); + gaq_idx= gaq->assigned_group_index= gaq->en_queue((void *) &g); - DBUG_ASSERT(gaq_idx != MTS_WORKER_UNDEF && gaq_idx < rli->gaq->size); - DBUG_ASSERT(((Slave_job_group *) - dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))-> + DBUG_ASSERT(gaq_idx != MTS_WORKER_UNDEF && gaq_idx < gaq->size); + DBUG_ASSERT(gaq->get_job_group(rli->gaq->assigned_group_index)-> group_relay_log_name == NULL); - DBUG_ASSERT(rli->gaq->assigned_group_index != MTS_WORKER_UNDEF); // gaq must have room + DBUG_ASSERT(gaq_idx != MTS_WORKER_UNDEF); // gaq must have room DBUG_ASSERT(rli->last_assigned_worker == NULL); if (is_b_event) @@ -2455,7 +2460,7 @@ Slave_worker *Log_event::get_slave_worke DBUG_ASSERT(rli->curr_group_da.elements == 1); - // mark the current grup as started with B-event + // mark the current group as started with explicit B-event rli->curr_group_seen_begin= TRUE; return ret_worker; @@ -2514,10 +2519,8 @@ Slave_worker *Log_event::get_slave_worke i++; } while (it++); - if ((ptr_g= ((Slave_job_group *) - dynamic_array_ptr(&rli->gaq->Q, - rli->gaq->assigned_group_index)))->worker_id - == MTS_WORKER_UNDEF) + if ((ptr_g= gaq->get_job_group(rli->gaq->assigned_group_index))-> + worker_id == 
MTS_WORKER_UNDEF) { ptr_g->worker_id= ret_worker->id; @@ -2585,12 +2588,11 @@ Slave_worker *Log_event::get_slave_worke if (ends_group() || !rli->curr_group_seen_begin) { // index of GAQ that this terminal event belongs to - mts_group_cnt= rli->gaq->assigned_group_index; + mts_group_cnt= gaq->assigned_group_index; rli->mts_group_status= Relay_log_info::MTS_END_GROUP; if (rli->curr_group_isolated) mts_do_isolate_group(); - ptr_g= (Slave_job_group *) - dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index); + ptr_g= gaq->get_job_group(rli->gaq->assigned_group_index); DBUG_ASSERT(ret_worker != NULL); @@ -2665,18 +2667,18 @@ Slave_worker *Log_event::get_slave_worke @return 0 as success, otherwise a failure. */ -int Log_event::apply_event(Relay_log_info const *rli) +int Log_event::apply_event(Relay_log_info *rli) { DBUG_ENTER("LOG_EVENT:apply_event"); - Relay_log_info *c_rli= const_cast(rli); // constless alias - bool parallel= FALSE, async_event= FALSE, seq_event= FALSE; - THD *thd= c_rli->info_thd; + bool parallel= FALSE; + enum enum_mts_event_exec_mode actual_exec_mode= EVENT_EXEC_PARALLEL; + THD *thd= rli->info_thd; - worker= c_rli; + worker= rli; if (rli->is_mts_recovery()) { - bool skip= bitmap_is_set(&c_rli->recovery_groups, c_rli->mts_recovery_index); + bool skip= bitmap_is_set(&rli->recovery_groups, rli->mts_recovery_index); if (skip) { @@ -2689,10 +2691,10 @@ int Log_event::apply_event(Relay_log_inf } if (!(parallel= rli->is_parallel_exec()) || - (async_event= - mts_async_exec_by_coordinator(::server_id, - rli->mts_group_status == Relay_log_info::MTS_IN_GROUP)) || - (seq_event= mts_sequential_exec())) + ((actual_exec_mode= + mts_execution_mode(::server_id, + rli->mts_group_status == Relay_log_info::MTS_IN_GROUP)) + != EVENT_EXEC_PARALLEL)) { if (parallel) { @@ -2704,7 +2706,7 @@ int Log_event::apply_event(Relay_log_inf for terminal events to finish. 
*/ - if (!async_event) + if (actual_exec_mode != EVENT_EXEC_ASYNC) { /* this event does not split the current group but is indeed @@ -2719,14 +2721,14 @@ int Log_event::apply_event(Relay_log_inf wrappped with BEGIN/COMMIT or preceeded by User|Int|Random- var. MTS has to stop to suggest restart in the permanent sequential mode. */ - llstr(c_rli->get_event_relay_log_pos(), llbuff); + llstr(rli->get_event_relay_log_pos(), llbuff); rli->report(ERROR_LEVEL, ER_MTS_CANT_PARALLEL, ER(ER_MTS_CANT_PARALLEL), - get_type_str(), c_rli->get_event_relay_log_name(), - c_rli->get_event_relay_log_pos()); + get_type_str(), rli->get_event_relay_log_name(), + rli->get_event_relay_log_pos()); /* Coordinator cant continue, it marks MTS group status accordingly */ - c_rli->mts_group_status= Relay_log_info::MTS_KILLED_GROUP; + rli->mts_group_status= Relay_log_info::MTS_KILLED_GROUP; goto err; } @@ -2742,32 +2744,37 @@ int Log_event::apply_event(Relay_log_inf #ifndef DBUG_OFF /* all Workers are idle as done through wait_for_workers_to_finish */ - for (uint k= 0; k < c_rli->curr_group_da.elements; k++) + for (uint k= 0; k < rli->curr_group_da.elements; k++) { DBUG_ASSERT(!(*(Slave_worker **) - dynamic_array_ptr(&c_rli->workers, k))->usage_partition); + dynamic_array_ptr(&rli->workers, k))->usage_partition); DBUG_ASSERT(!(*(Slave_worker **) - dynamic_array_ptr(&c_rli->workers, k))->jobs.len); + dynamic_array_ptr(&rli->workers, k))->jobs.len); } #endif } + else + { + DBUG_ASSERT(actual_exec_mode == EVENT_EXEC_ASYNC); + } } DBUG_RETURN(do_apply_event(rli)); } + DBUG_ASSERT(actual_exec_mode == EVENT_EXEC_PARALLEL); DBUG_ASSERT(!(rli->curr_group_seen_begin && ends_group()) || rli->last_assigned_worker); worker= NULL; - c_rli->mts_group_status= Relay_log_info::MTS_IN_GROUP; + rli->mts_group_status= Relay_log_info::MTS_IN_GROUP; worker= (Relay_log_info*) - (c_rli->last_assigned_worker= get_slave_worker(c_rli)); + (rli->last_assigned_worker= get_slave_worker(rli)); #ifndef DBUG_OFF - if 
(c_rli->last_assigned_worker) + if (rli->last_assigned_worker) DBUG_PRINT("mts", ("Assigning job to worker %lu", - c_rli->last_assigned_worker->id)); + rli->last_assigned_worker->id)); #endif err: @@ -6273,7 +6280,6 @@ int Rotate_log_event::do_update_pos(Rela rli->get_group_master_log_name(), (ulong) rli->get_group_master_log_pos())); mysql_mutex_unlock(&rli->data_lock); - rli->flush_info(TRUE); // todo: error branch if (rli->is_parallel_exec()) rli->reset_notified_checkpoint(0, when + (time_t) exec_time); @@ -6640,34 +6646,34 @@ int Xid_log_event::do_apply_event_worker { int error= 0; bool is_trans_repo= w->is_transactional(); + Slave_committed_queue *gaq= w->c_rli->gaq; DBUG_PRINT("mts", ("do_apply group master %s %llu group relay %s %llu event %s %llu.", - w->group_master_log_name, - w->group_master_log_pos, - w->group_relay_log_name, - w->group_relay_log_pos, - w->event_relay_log_name, - w->event_relay_log_pos)); + w->get_group_master_log_name(), + w->get_group_master_log_pos(), + w->get_group_relay_log_name(), + w->get_group_relay_log_pos(), + w->get_event_relay_log_name(), + w->get_event_relay_log_pos())); DBUG_EXECUTE_IF("crash_before_update_pos", DBUG_SUICIDE();); if (is_trans_repo) { ulong gaq_idx= mts_group_cnt; - Slave_job_group *ptr_g= - (Slave_job_group *) dynamic_array_ptr(&w->c_rli->gaq->Q, gaq_idx); + Slave_job_group *ptr_g= gaq->get_job_group(gaq_idx); if ((error= w->commit_positions(this, ptr_g))) goto err; } DBUG_PRINT("mts", ("do_apply group master %s %llu group relay %s %llu event %s %llu.", - w->group_master_log_name, - w->group_master_log_pos, - w->group_relay_log_name, - w->group_relay_log_pos, - w->event_relay_log_name, - w->event_relay_log_pos)); + w->get_group_master_log_name(), + w->get_group_master_log_pos(), + w->get_group_relay_log_name(), + w->get_group_relay_log_pos(), + w->get_event_relay_log_name(), + w->get_event_relay_log_pos())); DBUG_EXECUTE_IF("crash_after_update_pos_before_apply", DBUG_SUICIDE();); @@ -9513,7 +9519,6 @@ int 
Table_map_log_event::do_apply_event( int error= 0; - // mts-II todo: consider filtering if (rli->info_thd->slave_thread /* filtering is for slave only */ && (!rpl_filter->db_ok(table_list->db) || (rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list)))) === modified file 'sql/log_event.h' --- a/sql/log_event.h 2011-06-27 17:31:45 +0000 +++ b/sql/log_event.h 2011-07-01 12:48:25 +0000 @@ -262,7 +262,7 @@ struct sql_ex_info */ #define MAX_DBS_IN_EVENT_MTS 16 /* - When the actual number of db:s exceeds MAX_DBS_IN_EVENT_MTS + When the actual number of databases exceeds MAX_DBS_IN_EVENT_MTS the value of OVER_MAX_DBS_IN_EVENT_MTS is is put into the mts_accessed_dbs status. */ #define OVER_MAX_DBS_IN_EVENT_MTS 254 @@ -356,7 +356,7 @@ struct sql_ex_info #define Q_INVOKER 11 /* - Q_UPDATED_DB_NAMES status variable collects of the updated db:s + Q_UPDATED_DB_NAMES status variable collects of the updated databases total number and their names to be propagated to the slave in order to facilitate the parallel applying of the Query events. */ @@ -540,7 +540,7 @@ struct sql_ex_info MTS: group of events can be marked to force its execution in isolation from any other Workers. Typically that is done for a transaction that contains - a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS db:s. + a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS databases. The flag is set ON for an event that terminates its group. */ #define LOG_EVENT_MTS_ISOLATE_F 0x200 @@ -1270,21 +1270,44 @@ public: /* Return start of query time or current time */ #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) -public: + +private: + + /* + possible decisions by mts_execution_mode() + */ + enum enum_mts_event_exec_mode + { + /* + Event is run by a Worker. + */ + EVENT_EXEC_PARALLEL, + /* + Event is run by Coordinator. + */ + EVENT_EXEC_ASYNC, + /* + Event is run by Coordinator and requires W:s synchronization. 
+ */ + EVENT_EXEC_SYNC, + /* + Event can be executed neither by Workers nor by Coordinator. + */ + EVENT_EXEC_CAN_NOT + }; /** - MST: to execute some event types serially. + Is called from mts_execution_mode(). - @note There are incompatile combinations such the referred event - is wrapped with BEGIN/COMMIT. Such cases should be identified - by the caller and treates as an error. - - Notice, even though the func returns TRUE, some events - like old LOAD-DATA rooted EXEC_LOAD_EVENT can't run even - in isolated parallel mode and MTS would have to stop. - - @return TRUE if despite permanent parallel execution mode an event - needs applying in a real isolation that is sequentially. + + @return TRUE if the event needs applying with synchronization + against Workers, otherwise + FALSE + + @note There are incompatible combinations, such as when the events referred + to further are wrapped with BEGIN/COMMIT. Such cases should be identified + by the caller and treated correspondingly. + + todo: to mts-support Old master Load-data related events */ bool mts_sequential_exec() { @@ -1304,37 +1327,35 @@ public: } /** - MST: some events have to be applied by Coordinator concurrently with Workers. + MTS Coordinator finds out how to execute the current event. - *TODO*: combine with mts_sequential_exec() to have ternary outcome. - - @return TRUE if that's the case, - FALSE otherwise. - */ - bool mts_async_exec_by_coordinator(ulong slave_server_id, bool mts_in_group) - { - return - (get_type_code() == FORMAT_DESCRIPTION_EVENT && - ((server_id == (uint32) ::server_id) || (log_pos == 0))) - || - (get_type_code() == ROTATE_EVENT && - ((server_id == (uint32) ::server_id) || - (log_pos == 0 && mts_in_group))); + + Besides the parallelizable case, some events have to be applied by + Coordinator concurrently with Workers and some require synchronization + with Workers before being applied. 
+ + @retval EVENT_EXEC_PARALLEL if event is executed by a Worker + @retval EVENT_EXEC_ASYNC if event is executed by Coordinator + @retval EVENT_EXEC_SYNC if event is executed by Coordinator + with synchronization against the Workers + */ + enum enum_mts_event_exec_mode mts_execution_mode(ulong slave_server_id, + bool mts_in_group) + { + if ((get_type_code() == FORMAT_DESCRIPTION_EVENT && + ((server_id == (uint32) ::server_id) || (log_pos == 0))) + || + (get_type_code() == ROTATE_EVENT && + ((server_id == (uint32) ::server_id) || + (log_pos == 0 /* very first fake Rotate */ + && mts_in_group /* ignored events, R_f at slave restart */)))) + return EVENT_EXEC_ASYNC; + else if (mts_sequential_exec()) + return EVENT_EXEC_SYNC; + else + return EVENT_EXEC_PARALLEL; } /** - Events of a cetain type carry partitioning data such as db names. - */ - bool contains_partition_info(); - - /** - Events of a cetain type start or end a group of events treated - transactionally wrt binlog. - */ - virtual bool starts_group() { return FALSE; } - virtual bool ends_group() { return FALSE; } - - /** @return index in \in [0, M] range to indicate to be assigned worker; M is the max index of the worker pool. @@ -1353,16 +1374,10 @@ public: } /* - returns the number of updated by the event databases. - In other than Query-log-event case that's one. - */ - virtual uint8 mts_number_dbs() { return 1; } - - /* Group of events can be marked to force its execution in isolation from any other Workers. Typically that is done for a transaction that contains - a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS db:s + a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS databases. */ virtual void mts_do_isolate_group() { @@ -1373,13 +1388,48 @@ public: flags |= LOG_EVENT_MTS_ISOLATE_F; } + +public: + + /** + @return TRUE if the event carries partitioning data (database names). 
+ */ + bool contains_partition_info(); + /* - Verifying whether the terminal event of a group is marked to - execute in isolation. + @return the number of databases updated by the event. + + @note In other than Query-log-event case that's one. + */ + virtual uint8 mts_number_dbs() { return 1; } + + /** + @return TRUE if the terminal event of a group is marked to + execute in isolation from other Workers, + FALSE otherwise */ bool mts_is_group_isolated() { return flags & LOG_EVENT_MTS_ISOLATE_F; } /** + Events of a certain type can start or end a group of events treated + transactionally wrt binlog. + + Public access is required by implementation of recovery + skip. + + @return TRUE if the event starts a group (transaction) + FALSE otherwise + */ + virtual bool starts_group() { return FALSE; } + + /** + @return TRUE if the event ends a group (transaction) + FALSE otherwise + */ + virtual bool ends_group() { return FALSE; } + +public: + + /** Apply the event to the database. This function represents the public interface for applying an @@ -1387,7 +1437,7 @@ public: @see do_apply_event */ - int apply_event(Relay_log_info const *rli); + int apply_event(Relay_log_info *rli); /** Update the relay log position. @@ -1963,7 +2013,7 @@ public: */ uint32 master_data_written; /* - number of updated db:s by the query and their names. This info + number of updated databases by the query and their names. This info is requested by both Coordinator and Worker. */ uchar mts_accessed_dbs; @@ -1977,15 +2027,15 @@ public: const char* get_db() { return db; } /** - Returns a list of updated db:s or the default db single item list - in case of over-MAX_DBS_IN_EVENT_MTS actual db:s. + Returns a list of updated databases or the default db single item list + in case the number of databases exceeds MAX_DBS_IN_EVENT_MTS. 
*/ virtual List* mts_get_dbs(MEM_ROOT *mem_root) { List *res= new (mem_root) List; if (mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS) { - // "" == empty string db name is special to indicate sequential applying + // the empty string db name is special to indicate sequential applying mts_accessed_db_names[0][0]= 0; res->push_back((char*) mts_accessed_db_names[0]); } @@ -2066,10 +2116,9 @@ public: /* !!! Public in this pat !strncasecmp(query, "ROLLBACK", 8); } /** - todo: Parallel support for DDL:s. - DDL queries are logged without BEGIN/COMMIT parentheses - and can be regarded as the starting and the ending events of - its self-group. + Notice, DDL queries are logged without BEGIN/COMMIT parentheses + and identification of such single-query group + occures within logics of @c get_slave_worker(). */ bool starts_group() { return !strncmp(query, "BEGIN", q_len); } virtual bool ends_group() @@ -3161,7 +3210,6 @@ public: const char* get_db() { return db; } #endif #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - /* MTS executes this event sequentially */ virtual uint8 mts_number_dbs() { return OVER_MAX_DBS_IN_EVENT_MTS; } virtual List* mts_get_dbs(MEM_ROOT *mem_root) { @@ -4473,8 +4521,6 @@ private: int append_query_string(const CHARSET_INFO *csinfo, String const *from, String *to); bool sqlcom_can_generate_row_events(const THD *thd); -void handle_rows_query_log_event(Log_event *ev, Relay_log_info *rli); - bool event_checksum_test(uchar *buf, ulong event_len, uint8 alg); uint8 get_checksum_alg(const char* buf, ulong len); extern TYPELIB binlog_checksum_typelib; === modified file 'sql/rpl_reporting.cc' --- a/sql/rpl_reporting.cc 2011-06-29 07:04:19 +0000 +++ b/sql/rpl_reporting.cc 2011-07-01 13:41:35 +0000 @@ -110,7 +110,7 @@ Slave_reporting_capability::report(logle } void -Slave_reporting_capability::do_report(loglevel level, int err_code, +Slave_reporting_capability::va_report(loglevel level, int err_code, const char *msg, va_list args) const { #if 
!defined(EMBEDDED_LIBRARY) === modified file 'sql/rpl_reporting.h' --- a/sql/rpl_reporting.h 2011-06-20 13:26:35 +0000 +++ b/sql/rpl_reporting.h 2011-07-01 12:48:25 +0000 @@ -59,7 +59,7 @@ public: */ virtual void report(loglevel level, int err_code, const char *msg, ...) const ATTRIBUTE_FORMAT(printf, 4, 5); - void do_report(loglevel level, int err_code, + void va_report(loglevel level, int err_code, const char *msg, va_list v_args) const; /** @@ -130,6 +130,15 @@ public: bool is_error() const { return last_error().number != 0; } virtual ~Slave_reporting_capability()= 0; + +protected: + + virtual void do_report(loglevel level, int err_code, + const char *msg, va_list v_args) const + { + va_report(level, err_code, msg, v_args); + } + private: /** Last error produced by the I/O or SQL thread respectively. === modified file 'sql/rpl_rli.cc' --- a/sql/rpl_rli.cc 2011-07-01 08:45:11 +0000 +++ b/sql/rpl_rli.cc 2011-07-01 13:41:35 +0000 @@ -53,14 +53,13 @@ static PSI_cond_info *worker_conds= NULL PSI_mutex_key *key_mutex_slave_parallel_worker= NULL; PSI_mutex_key key_mutex_slave_parallel_pend_jobs; -PSI_mutex_key key_mutex_mts_temp_tables_lock; PSI_cond_key *key_cond_slave_parallel_worker= NULL; PSI_cond_key key_cond_slave_parallel_pend_jobs; #endif Relay_log_info::Relay_log_info(bool is_slave_recovery) - :Rpl_info("SQL", "SQL-Thread"), checkpoint_thd(0), checkpoint_running(0), + :Rpl_info("SQL", "SQL-Thread"), replicate_same_server_id(::replicate_same_server_id), cur_log_fd(-1), relay_log(&sync_relaylog_period), is_relay_log_recovery(is_slave_recovery), @@ -68,11 +67,11 @@ Relay_log_info::Relay_log_info(bool is_s cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0), group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0), last_master_timestamp(0), slave_skip_counter(0), - abort_pos_wait(0), slave_exec_mode(0), until_condition(UNTIL_NONE), + abort_pos_wait(0), until_condition(UNTIL_NONE), until_log_pos(0), retried_trans(0), 
tables_to_lock(0), tables_to_lock_count(0), rows_query_ev(NULL), last_event_start_time(0), - this_worker(NULL), slave_parallel_workers(0), + slave_parallel_workers(0), recovery_parallel_workers(0), checkpoint_seqno(0), checkpoint_group(mts_checkpoint_group), mts_recovery_group_cnt(0), mts_recovery_index(0), mts_recovery_group_seen_begin(0), @@ -112,7 +111,7 @@ void Relay_log_info::init_workers(ulong Parallel slave parameters initialization is done regardless whether the feature is or going to be active or not. */ - trans_jobs= stmt_jobs= pending_jobs= wq_size_waits= 0; + mts_groups_assigned= mts_events_assigned= pending_jobs= wq_size_waits= 0; mts_wq_excess= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0; my_init_dynamic_array(&workers, sizeof(Slave_worker *), n_workers, 4); @@ -142,12 +141,9 @@ void Relay_log_info::init_workers(ulong } mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock, MY_MUTEX_INIT_FAST); - mysql_mutex_init(key_mutex_mts_temp_tables_lock, &mts_temp_tables_lock, - MY_MUTEX_INIT_FAST); mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond, NULL); #else mysql_mutex_init(NULL, &pending_jobs_lock, MY_MUTEX_INIT_FAST); - mysql_mutex_init(NULL, &mts_temp_tables_lock, MY_MUTEX_INIT_FAST); mysql_cond_init(NULL, &pending_jobs_cond, NULL); #endif } @@ -169,7 +165,6 @@ void Relay_log_info::deinit_workers() mysql_mutex_destroy(&pending_jobs_lock); mysql_cond_destroy(&pending_jobs_cond); - mysql_mutex_destroy(&mts_temp_tables_lock); delete_dynamic(&workers); my_atomic_rwlock_destroy(&slave_open_temp_tables_lock); @@ -795,6 +790,18 @@ void Relay_log_info::inc_group_relay_log { group_master_log_pos= log_pos; } + + /* + In MTS mode FD or Rotate event commit their solitary group to + Coordinator's info table. Callers make sure that Workers have been + executed all assignements. + Broadcast to master_pos_wait() waiters should be done after + the table is updated. 
+ */ + DBUG_ASSERT(!is_parallel_exec() || + mts_group_status != Relay_log_info::MTS_IN_GROUP); + flush_info(TRUE); // todo: error branch + mysql_cond_broadcast(&data_cond); if (!skip_lock) mysql_mutex_unlock(&data_lock); @@ -1095,6 +1102,9 @@ void Relay_log_info::stmt_done(my_off_t { if (is_parallel_exec()) { + + DBUG_ASSERT(!mts_is_worker(info_thd)); + /* Format Description events are special events that are handled as a synchronization points. For that reason, the checkpoint routine is @@ -1103,10 +1113,6 @@ void Relay_log_info::stmt_done(my_off_t (void) mts_checkpoint_routine(this, 0, FALSE, FALSE); // TODO: ALFRANIO ERROR } inc_group_relay_log_pos(event_master_log_pos); - - DBUG_ASSERT(this_worker == NULL); - - flush_info(is_transactional() ? TRUE : FALSE); // TODO: ALFRANIO ERROR } } === modified file 'sql/rpl_rli.h' --- a/sql/rpl_rli.h 2011-06-27 17:31:45 +0000 +++ b/sql/rpl_rli.h 2011-07-01 10:16:52 +0000 @@ -108,9 +108,6 @@ class Relay_log_info : public Rpl_info friend class Rpl_info_factory; public: - THD* checkpoint_thd; - bool checkpoint_running; - /** Flags for the state of the replication. */ @@ -209,6 +206,7 @@ public: happen when, for example, the relay log gets rotated because of max_binlog_size. */ +protected: char group_relay_log_name[FN_REFLEN]; ulonglong group_relay_log_pos; char event_relay_log_name[FN_REFLEN]; @@ -230,8 +228,6 @@ public: char group_master_log_name[FN_REFLEN]; volatile my_off_t group_master_log_pos; -// TODO: Restore! -protected: /* When it commits, InnoDB internally stores the master log position it has processed so far; the position to store is the one of the end of the @@ -283,12 +279,6 @@ public: mysql_cond_t log_space_cond; /* - A cache for the global system variable's value. - The value is reset at the beginning of each statement execution. - */ - ulong slave_exec_mode; - - /* Condition and its parameters from START SLAVE UNTIL clause. 
UNTIL condition is tested with is_until_satisfied() method that is @@ -431,7 +421,7 @@ public: */ time_t last_event_start_time; - /* + /***************************************************************************** WL#5569 MTS legends: @@ -441,41 +431,36 @@ public: */ DYNAMIC_ARRAY workers; // number's is determined by global slave_parallel_workers volatile ulong pending_jobs; - ulong stmt_jobs; // statistics: number of events (statements) processed - ulong trans_jobs; // statistics: number of groups (transactions) processed - ulong wq_size_waits; // number of times C goes to sleep due to WQ:s oversize mysql_mutex_t pending_jobs_lock; mysql_cond_t pending_jobs_cond; ulong mts_slave_worker_queue_len_max; ulonglong mts_pending_jobs_size; // actual mem usage by WQ:s - ulonglong mts_pending_jobs_size_max; // the max forcing to wait by C - bool mts_wq_oversize; // C raises flag to wait some memory's released - Slave_worker *last_assigned_worker; // a hint to partitioning func for some events + ulonglong mts_pending_jobs_size_max; // max of WQ:s size forcing C to wait + bool mts_wq_oversize; // C raises flag to wait some memory's released + Slave_worker *last_assigned_worker;// is set to a Worker at assigning a group + /* + master-binlog ordered queue of Slave_job_group descriptors of groups + that are under processing + */ Slave_committed_queue *gaq; - DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP + /* + Container for references of involved partitions for the current event group + */ + DYNAMIC_ARRAY curr_group_assigned_parts; DYNAMIC_ARRAY curr_group_da; // deferred array to hold partition-info-free events bool curr_group_seen_begin; // current group started with B-event or not bool curr_group_isolated; // current group requires execution in isolation volatile ulong mts_wq_underrun_w_id; // Id of a Worker whose queue is getting empty - volatile long mts_wq_excess; // excessive overrun counter; when W increments and decrements it it also marks updates its own 
wq_overrun_set - volatile ulong mts_wq_overrun_cnt; // statistics of all mts_wq_excess increments - ulong mts_wq_no_underrun_cnt; // counts times of C goes to sleep when W:s are filled - ulong mts_wq_overfill_cnt; // counts C waits when a W's queue is full - long mts_worker_underrun_level; // percent of WQ size at which W is considered hungry - ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun - Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL. - ulonglong mts_total_groups; // total event groups distributed in current session - ulong opt_slave_parallel_workers; // auxiliary cache for ::opt_slave_parallel_workers - ulong slave_parallel_workers; // the one slave session time number of workers - ulong recovery_parallel_workers; // number of workers while recovering - /* - A sorted array of Worker current assignements number to provide - approximate view on Workers loading. - The first row of the least occupied Worker is queried at assigning - a new partition. Is updated at checkpoint commit to the main RLI. + excessive overrun counter; when W increments and decrements it it + also marks updates its own wq_overrun_set */ - DYNAMIC_ARRAY least_occupied_workers; + volatile long mts_wq_excess; + long mts_worker_underrun_level; // % of WQ size at which W is considered hungry + ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun + ulong opt_slave_parallel_workers; // cache for ::opt_slave_parallel_workers + ulong slave_parallel_workers; // the one slave session time number of workers + ulong recovery_parallel_workers; // number of workers while recovering uint checkpoint_seqno; // counter of groups executed after the most recent CP uint checkpoint_group; // number of groups in one checkpoint interval (period). 
MY_BITMAP recovery_groups; // bitmap used during recovery @@ -508,9 +493,9 @@ public: MTS_NOT_IN_GROUP => | {MTS_IN_GROUP => MTS_END_GROUP --+} while (!killed) => MTS_KILLED_GROUP - though MTS_END_GROUP has `->' link to MTS_NOT_IN_GROUP - when Coordinator synchronizes with Workers by demanding them to complete their - assignments. + MTS_END_GROUP has `->' loop breaking link to MTS_NOT_IN_GROUP when + Coordinator synchronizes with Workers by demanding them to + complete their assignments. */ enum { @@ -525,6 +510,24 @@ public: MTS_KILLED_GROUP /* Coordinator gave out to reach MTS_END_GROUP */ } mts_group_status; + /* + MTS statistics: + */ + ulong mts_events_assigned; // number of events (statements) scheduled + ulong mts_groups_assigned; // number of groups (transactions) scheduled + volatile ulong mts_wq_overrun_cnt; // counter of all mts_wq_excess increments + ulong wq_size_waits; // number of times C slept due to WQ:s oversize + ulong mts_wq_no_underrun_cnt;// number of times of C slept when W:s were filled + ulong mts_wq_overfill_cnt; // counter of C waited when a W's queue was full + /* + A sorted array of the Workers' current assignement numbers to provide + approximate view on Workers loading. + The first row of the least occupied Worker is queried at assigning + a new partition. Is updated at checkpoint commit to the main RLI. + */ + DYNAMIC_ARRAY least_occupied_workers; + /* end of MTS statistics */ + /* most of allocation in the coordinator rli is there */ void init_workers(ulong); @@ -576,6 +579,10 @@ public: */ void reset_notified_checkpoint(ulong, time_t); + /* + * End of MTS section ******************************************************/ + + /** Helper function to do after statement completion. 
=== modified file 'sql/rpl_rli_pdb.cc' --- a/sql/rpl_rli_pdb.cc 2011-07-01 08:45:11 +0000 +++ b/sql/rpl_rli_pdb.cc 2011-07-01 13:41:35 +0000 @@ -58,9 +58,9 @@ ulong mts_partition_hash_soft_max= 16; Slave_worker::Slave_worker(const char* type, const char* pfs, Relay_log_info *rli) : Relay_log_info(FALSE), c_rli(rli), - checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0), - inited_group_execed(0), processed_group(0), running_status(NOT_RUNNING), - last_event(NULL) + checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), + inited_group_executed(0), checkpoint_seqno(0), running_status(NOT_RUNNING), + inited_curr_group_exec_parts(0) { checkpoint_relay_log_name[0]= 0; checkpoint_master_log_name[0]= 0; @@ -68,15 +68,72 @@ Slave_worker::Slave_worker(const char* t Slave_worker::~Slave_worker() { - delete_dynamic(&curr_group_exec_parts); + if (inited_curr_group_exec_parts) + delete_dynamic(&curr_group_exec_parts); - if (inited_group_execed) + if (inited_group_executed) { - bitmap_free(&group_execed); + bitmap_free(&group_executed); bitmap_free(&group_shifted); } } +/** + Method is executed by Coordinator at Worker startup time to initialize + members parly with values supplied by Coordinator through rli. 
+ + @param rli Coordinator's Relay_log_info pointer + @param i identifier of the Worker + + @return 0 success + non-zero failure +*/ +int Slave_worker::init_worker(Relay_log_info * rli, ulong i) +{ + uint k; + Slave_job_item empty= {NULL}; + + c_rli= rli; + if (init_info()) + return 1; + + id= i; + curr_group_exec_parts.elements= 0; + relay_log_change_notified= FALSE; // the 1st group to contain relaylog name + checkpoint_notified= FALSE; + bitmap_shifted= 0; + workers= c_rli->workers; // shallow copying is sufficient + wq_size_waits= groups_done= events_done= curr_jobs= 0; + usage_partition= 0; + last_group_done_index= c_rli->gaq->size; // out of range + + jobs.avail= 0; + jobs.len= 0; + jobs.overfill= FALSE; // todo: move into Slave_jobs_queue constructor + jobs.waited_overfill= 0; + jobs.entry= jobs.size= c_rli->mts_slave_worker_queue_len_max; + curr_group_seen_begin= FALSE; + + my_init_dynamic_array(&jobs.Q, sizeof(Slave_job_item), jobs.size, 0); + for (k= 0; k < jobs.size; k++) + insert_dynamic(&jobs.Q, (uchar*) &empty); + + DBUG_ASSERT(jobs.Q.elements == jobs.size); + + wq_overrun_set= FALSE; + +#ifdef HAVE_PSI_INTERFACE + mysql_mutex_init(key_mutex_slave_parallel_worker[i], &jobs_lock, + MY_MUTEX_INIT_FAST); + mysql_cond_init(key_cond_slave_parallel_worker[i], &jobs_cond, NULL); +#else + mysql_mutex_init(NULL, &jobs_lock, MY_MUTEX_INIT_FAST); + mysql_cond_init(NULL, &jobs_cond, NULL); +#endif + + return 0; +} + int Slave_worker::init_info() { int necessary_to_configure= 0; @@ -88,8 +145,9 @@ int Slave_worker::init_info() my_init_dynamic_array(&curr_group_exec_parts, sizeof(db_worker_hash_entry*), SLAVE_INIT_DBS_IN_GROUP, 1); - - if (bitmap_init(&group_execed, NULL, + if (curr_group_exec_parts.max_element != 0) + inited_curr_group_exec_parts= 1; + if (bitmap_init(&group_executed, NULL, c_rli->checkpoint_group, FALSE)) goto err; @@ -97,7 +155,7 @@ int Slave_worker::init_info() c_rli->checkpoint_group, FALSE)) goto err; - inited_group_execed= 1; + 
inited_group_executed= 1; /* The init_info() is used to either create or read information @@ -171,7 +229,7 @@ bool Slave_worker::read_info(Rpl_info_ha ulong temp_checkpoint_master_log_pos= 0; ulong temp_checkpoint_seqno= 0; ulong nbytes= 0; - uchar *buffer= (uchar *) group_execed.bitmap; + uchar *buffer= (uchar *) group_executed.bitmap; if (from->prepare_info_for_read(nidx)) DBUG_RETURN(TRUE); @@ -216,8 +274,8 @@ bool Slave_worker::write_info(Rpl_info_h { DBUG_ENTER("Master_info::write_info"); - ulong nbytes= (ulong) no_bytes_in_map(&group_execed); - uchar *buffer= (uchar*) group_execed.bitmap; + ulong nbytes= (ulong) no_bytes_in_map(&group_executed); + uchar *buffer= (uchar*) group_executed.bitmap; if (to->prepare_info_for_write(nidx) || to->set_info(group_relay_log_name) || @@ -261,12 +319,12 @@ bool Slave_worker::commit_positions(Log_ my_free(ptr_g->checkpoint_relay_log_name); ptr_g->checkpoint_relay_log_name= NULL; - bitmap_copy(&group_shifted, &group_execed); - bitmap_clear_all(&group_execed); + bitmap_copy(&group_shifted, &group_executed); + bitmap_clear_all(&group_executed); for (uint pos= ptr_g->shifted; pos < c_rli->checkpoint_group; pos++) { if (bitmap_is_set(&group_shifted, pos)) - bitmap_set_bit(&group_execed, pos - ptr_g->shifted); + bitmap_set_bit(&group_executed, pos - ptr_g->shifted); } } /* @@ -282,7 +340,7 @@ bool Slave_worker::commit_positions(Log_ DBUG_ASSERT(ptr_g->checkpoint_seqno <= (c_rli->checkpoint_group - 1)); - bitmap_set_bit(&group_execed, ptr_g->checkpoint_seqno); + bitmap_set_bit(&group_executed, ptr_g->checkpoint_seqno); checkpoint_seqno= ptr_g->checkpoint_seqno; group_relay_log_pos= ev->future_event_relay_log_pos; group_master_log_pos= ev->log_pos; @@ -505,7 +563,8 @@ static void move_temp_tables_to_entry(TH CGAP .= D - and a possible D's Worker id is searched in Assigne Partition Hash + here .= is concatenate operation, + and a possible D's Worker id is searched in Assigned Partition Hash (APH) that collects tuples (P, W_id, U, 
mutex, cond). In case not found, @@ -539,6 +598,8 @@ static void move_temp_tables_to_entry(TH @param ptr_entry reference to a pointer to the resulted entry in the Assigne Partition Hash where the entry's pointer is stored at return. + @param need_temp_tables + if FALSE migration of temporary tables not needed @param last_worker caller opts for this Worker, it must be rli->last_assigned_worker if one is determined. @@ -613,7 +674,8 @@ Slave_worker *map_db_to_worker(const cha */ if (!(db= (char *) my_malloc((size_t) dblength + 1, MYF(0)))) goto err; - if (!(entry= (db_worker_hash_entry *) my_malloc(sizeof(db_worker_hash_entry), MYF(0)))) + if (!(entry= (db_worker_hash_entry *) + my_malloc(sizeof(db_worker_hash_entry), MYF(0)))) { my_free(db); goto err; @@ -801,9 +863,9 @@ void Slave_worker::slave_worker_ends_gro if (!error) { + Slave_committed_queue *gaq= c_rli->gaq; ulong gaq_idx= ev->mts_group_cnt; - Slave_job_group *ptr_g= - (Slave_job_group *) dynamic_array_ptr(&c_rli->gaq->Q, gaq_idx); + Slave_job_group *ptr_g= gaq->get_job_group(gaq_idx); // first ever group must have relay log name DBUG_ASSERT(last_group_done_index != c_rli->gaq->size || @@ -824,6 +886,7 @@ void Slave_worker::slave_worker_ends_gro ptr_g->done= 1; // GAQ index is available to C now last_group_done_index= gaq_idx; + groups_done++; } /* @@ -903,67 +966,79 @@ void Slave_worker::slave_worker_ends_gro /** - Class circular_buffer_queue + Class circular_buffer_queue. + + Content of the being dequeued item is copied to the arg-pointer + location. + + @return the queue's array index that the de-queued item + located at, or an error as an int outside the legacy + [0, size) (value `size' is excluded) range. 
*/ ulong circular_buffer_queue::de_queue(uchar *val) { ulong ret; - if (enter == size) + if (entry == size) { DBUG_ASSERT(len == 0); return (ulong) -1; } - ret= enter; - get_dynamic(&Q, val, enter); + ret= entry; + get_dynamic(&Q, val, entry); len--; // pre boundary cond if (avail == size) - avail= enter; - enter= (enter + 1) % size; + avail= entry; + entry= (entry + 1) % size; // post boundary cond - if (avail == enter) - enter= size; + if (avail == entry) + entry= size; - DBUG_ASSERT(enter == size || - (len == (avail >= enter)? (avail - enter) : - (size + avail - enter))); - DBUG_ASSERT(avail != enter); + DBUG_ASSERT(entry == size || + (len == (avail >= entry)? (avail - entry) : + (size + avail - entry))); + DBUG_ASSERT(avail != entry); return ret; } /** - removing an item from the tail side + Similar to de_queue() but removing an item from the tail side. + + return the queue's array index that the de-queued item + located at, or an error. */ ulong circular_buffer_queue::de_tail(uchar *val) { - if (enter == size) + if (entry == size) { DBUG_ASSERT(len == 0); return (ulong) -1; } - avail= (enter + len - 1) % size; + avail= (entry + len - 1) % size; get_dynamic(&Q, val, avail); len--; // post boundary cond - if (avail == enter) - enter= size; + if (avail == entry) + entry= size; - DBUG_ASSERT(enter == size || - (len == (avail >= enter)? (avail - enter) : - (size + avail - enter))); - DBUG_ASSERT(avail != enter); + DBUG_ASSERT(entry == size || + (len == (avail >= entry)? (avail - entry) : + (size + avail - entry))); + DBUG_ASSERT(avail != entry); return avail; } + /** - @return the used index at success or -1 when queue is full + @return the index where the arg item has been located + or an error. 
*/ ulong circular_buffer_queue::en_queue(void *item) { @@ -981,20 +1056,20 @@ ulong circular_buffer_queue::en_queue(vo // pre-boundary cond - if (enter == size) - enter= avail; + if (entry == size) + entry= avail; avail= (avail + 1) % size; len++; // post-boundary cond - if (avail == enter) + if (avail == entry) avail= size; - DBUG_ASSERT(avail == enter || - len == (avail >= enter) ? - (avail - enter) : (size + avail - enter)); - DBUG_ASSERT(avail != enter); + DBUG_ASSERT(avail == entry || + len == (avail >= entry) ? + (avail - entry) : (size + avail - entry)); + DBUG_ASSERT(avail != entry); return ret; } @@ -1002,13 +1077,13 @@ ulong circular_buffer_queue::en_queue(vo void* circular_buffer_queue::head_queue() { uchar *ret= NULL; - if (enter == size) + if (entry == size) { DBUG_ASSERT(len == 0); } else { - get_dynamic(&Q, (uchar*) ret, enter); + get_dynamic(&Q, (uchar*) ret, entry); } return (void*) ret; } @@ -1027,15 +1102,15 @@ void* circular_buffer_queue::head_queue( bool circular_buffer_queue::gt(ulong i, ulong k) { DBUG_ASSERT(i < size && k < size); - DBUG_ASSERT(avail != enter); + DBUG_ASSERT(avail != entry); - if (i >= enter) - if (k >= enter) + if (i >= entry) + if (k >= entry) return i > k; else return FALSE; else - if (k >= enter) + if (k >= entry) return TRUE; else return i > k; @@ -1046,7 +1121,7 @@ bool Slave_committed_queue::count_done(R { ulong i, cnt= 0; - for (i= enter; i != avail && !empty(); i= (i + 1) % size) + for (i= entry; i != avail && !empty(); i= (i + 1) % size) { Slave_job_group *ptr_g; @@ -1090,7 +1165,7 @@ ulong Slave_committed_queue::move_queue_ { ulong i, cnt= 0; - for (i= enter; i != avail && !empty();) + for (i= entry; i != avail && !empty();) { Slave_worker *w_i; Slave_job_group *ptr_g, g; @@ -1098,7 +1173,8 @@ ulong Slave_committed_queue::move_queue_ ulong ind; #ifndef DBUG_OFF - if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) && cnt == mts_checkpoint_period) + if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) && + cnt 
== mts_checkpoint_period) return cnt; #endif @@ -1175,14 +1251,14 @@ ulong Slave_committed_queue::move_queue_ } /** - Method should be executed at slave system shutdown to + Method should be executed at slave system stop to cleanup dynamically allocated items that remained as unprocessed by shutdown time. */ void Slave_committed_queue::free_dynamic_items() { ulong i; - for (i= enter; i != avail && !empty(); i= (i + 1) % size) + for (i= entry; i != avail && !empty(); i= (i + 1) % size) { Slave_job_group *ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i); if (ptr_g->group_relay_log_name) @@ -1200,18 +1276,11 @@ void Slave_committed_queue::free_dynamic } } -void Slave_worker::do_report(loglevel level, int err_code, const char *msg, va_list vargs) const -{ - c_rli->do_report(level, err_code, msg, vargs); -} -void Slave_worker::report(loglevel level, int err_code, const char *msg, ...) const +void Slave_worker::do_report(loglevel level, int err_code, const char *msg, + va_list args) const { - va_list vargs; - va_start(vargs, msg); - - do_report(level, err_code, msg, vargs); - va_end(vargs); + c_rli->va_report(level, err_code, msg, args); } /** @@ -1316,18 +1385,18 @@ static int en_queue(Slave_jobs_queue *jo set_dynamic(&jobs->Q, (uchar*) item, jobs->avail); // pre-boundary cond - if (jobs->enter == jobs->size) - jobs->enter= jobs->avail; + if (jobs->entry == jobs->size) + jobs->entry= jobs->avail; jobs->avail= (jobs->avail + 1) % jobs->size; jobs->len++; // post-boundary cond - if (jobs->avail == jobs->enter) + if (jobs->avail == jobs->entry) jobs->avail= jobs->size; - DBUG_ASSERT(jobs->avail == jobs->enter || - jobs->len == (jobs->avail >= jobs->enter) ? - (jobs->avail - jobs->enter) : (jobs->size + jobs->avail - jobs->enter)); + DBUG_ASSERT(jobs->avail == jobs->entry || + jobs->len == (jobs->avail >= jobs->entry) ? 
+ (jobs->avail - jobs->entry) : (jobs->size + jobs->avail - jobs->entry)); return jobs->avail; } @@ -1336,13 +1405,13 @@ static int en_queue(Slave_jobs_queue *jo */ static void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret) { - if (jobs->enter == jobs->size) + if (jobs->entry == jobs->size) { DBUG_ASSERT(jobs->len == 0); ret->data= NULL; // todo: move to caller return NULL; } - get_dynamic(&jobs->Q, (uchar*) ret, jobs->enter); + get_dynamic(&jobs->Q, (uchar*) ret, jobs->entry); DBUG_ASSERT(ret->data); // todo: move to caller @@ -1355,26 +1424,27 @@ static void * head_queue(Slave_jobs_queu */ Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret) { - if (jobs->enter == jobs->size) + if (jobs->entry == jobs->size) { DBUG_ASSERT(jobs->len == 0); return NULL; } - get_dynamic(&jobs->Q, (uchar*) ret, jobs->enter); + get_dynamic(&jobs->Q, (uchar*) ret, jobs->entry); jobs->len--; // pre boundary cond if (jobs->avail == jobs->size) - jobs->avail= jobs->enter; - jobs->enter= (jobs->enter + 1) % jobs->size; + jobs->avail= jobs->entry; + jobs->entry= (jobs->entry + 1) % jobs->size; // post boundary cond - if (jobs->avail == jobs->enter) - jobs->enter= jobs->size; + if (jobs->avail == jobs->entry) + jobs->entry= jobs->size; - DBUG_ASSERT(jobs->enter == jobs->size || - (jobs->len == (jobs->avail >= jobs->enter)? (jobs->avail - jobs->enter) : - (jobs->size + jobs->avail - jobs->enter))); + DBUG_ASSERT(jobs->entry == jobs->size || + (jobs->len == (jobs->avail >= jobs->entry) ? 
+ (jobs->avail - jobs->entry) : + (jobs->size + jobs->avail - jobs->entry))); return ret; } @@ -1423,7 +1493,7 @@ void append_item_to_jobs(slave_job_item } rli->pending_jobs++; rli->mts_pending_jobs_size= new_pend_size; - rli->stmt_jobs++; + rli->mts_events_assigned++; mysql_mutex_unlock(&rli->pending_jobs_lock); @@ -1621,13 +1691,12 @@ int slave_worker_exec_job(Slave_worker * worker->slave_worker_ends_group(ev, error); /* last done sets post exec */ #ifndef DBUG_OFF - worker->processed_group++; DBUG_PRINT("mts", ("Check_slave_debug_group worker %lu mts_checkpoint_group" - " %u processed %u debug %d\n", worker->id, mts_checkpoint_group, - worker->processed_group, + " %u processed %lu debug %d\n", worker->id, mts_checkpoint_group, + worker->groups_done, DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0))); if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) && - mts_checkpoint_group == worker->processed_group) + mts_checkpoint_group == worker->groups_done) { DBUG_PRINT("mts", ("Putting worker %lu in busy wait.", worker->id)); while (true) my_sleep(6000000); @@ -1693,7 +1762,7 @@ int slave_worker_exec_job(Slave_worker * mysql_mutex_unlock(&rli->pending_jobs_lock); - worker->stmt_jobs++; + worker->events_done++; err: if (error) @@ -1705,13 +1774,10 @@ err: worker->slave_worker_ends_group(ev, error); } - // rows_query_log_event is deleted as a part of the statement cleanup - - // todo: sync_slave_with_master fails when my_sleep(1000) is put here + // todo: similate delay in delete if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT) { - worker->last_event= ev; delete ev; } === modified file 'sql/rpl_rli_pdb.h' --- a/sql/rpl_rli_pdb.h 2011-06-27 17:31:45 +0000 +++ b/sql/rpl_rli_pdb.h 2011-07-01 12:48:25 +0000 @@ -7,7 +7,7 @@ #include #include -/* APH entry */ +/* Assigned Partition Hash (APH) entry */ typedef struct st_db_worker_hash_entry { uint db_len; @@ -18,12 +18,13 @@ typedef struct st_db_worker_hash_entry The list of temp tables belonging to @ db database is 
attached to an assigned @c worker to become its thd->temporary_tables. The list is updated with every ddl incl CREATE, DROP. - It is removed from the entry and merged to the coordinator's thd->temporary_tables - in case of events: slave stops, the db-to-worker hash oversize. + It is removed from the entry and merged to the coordinator's + thd->temporary_tables in case of events: slave stops, APH oversize. */ TABLE* volatile temporary_tables; - /* todo: relax concurrency after making APH mutex/cond pair has worked + /* todo: relax concurrency to mimic record-level locking. + That is to augmenting the entry with mutex/cond pair pthread_mutex_t pthread_cond_t timestamp updated_at; */ @@ -64,14 +65,14 @@ public: DYNAMIC_ARRAY Q; ulong size; // the Size of the queue in terms of element ulong avail; // first Available index to append at (next to tail) - ulong enter; // the head index + ulong entry; // the head index or the entry point to the queue. volatile ulong len; // actual length bool inited_queue; circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) : - size(max), avail(0), enter(max), len(0), inited_queue(FALSE) + size(max), avail(0), entry(max), len(0), inited_queue(FALSE) { - DBUG_ASSERT(size < ULONG_MAX); + DBUG_ASSERT(size < (ulong) -1); if (!my_init_dynamic_array(&Q, el_size, size, alloc_inc)) inited_queue= TRUE; } @@ -99,9 +100,7 @@ public: /** return the index where the arg item locates or an error encoded as a value in beyond of the legacy range - [0, circular_buffer_max_index]. - - Todo: define the range. + [0, size) (value `size' is excluded). */ ulong en_queue(void *item); /** @@ -111,8 +110,8 @@ public: bool gt(ulong i, ulong k); // comparision of ordering of two entities /* index is within the valid range */ bool in(ulong k) { return !empty() && - (enter > avail ? (k >= enter || k < avail) : (k >= enter && k < avail)); } - bool empty() { return enter == size; } + (entry > avail ? 
(k >= entry || k < avail) : (k >= entry && k < avail)); } + bool empty() { return entry == size; } bool full() { return avail == size; } }; @@ -132,13 +131,12 @@ typedef struct st_slave_job_group */ char *group_relay_log_name; // The value is last seen relay-log my_off_t group_relay_log_pos; // filled by W - ulong worker_id; Slave_worker *worker; ulonglong total_seqno; my_off_t master_log_pos; // B-event log_pos - /* checkpoint coord are reset by CP and rotate:s */ + /* checkpoint coord are reset by periodical and special (Rotate event) CP:s */ uint checkpoint_seqno; my_off_t checkpoint_log_pos; // T-event lop_pos filled by W for CheckPoint char* checkpoint_log_name; @@ -210,6 +208,15 @@ public: ulong move_queue_head(DYNAMIC_ARRAY *ws); /* Method is for slave shutdown time cleanup */ void free_dynamic_items(); + /* + returns a pointer to Slave_job_group struct instance as indexed by arg + in the circular buffer dyn-array + */ + Slave_job_group* get_job_group(ulong ind) + { + return (Slave_job_group*) dynamic_array_ptr(&Q, ind); + } + }; class Slave_jobs_queue : public circular_buffer_queue @@ -228,50 +235,43 @@ public: Relay_log_info *rli); virtual ~Slave_worker(); - mysql_mutex_t jobs_lock; - mysql_cond_t jobs_cond; - Slave_jobs_queue jobs; - - Relay_log_info *c_rli; - - DYNAMIC_ARRAY curr_group_exec_parts; // CGEP - - bool curr_group_seen_begin; // is set to TRUE with B-event at Worker exec - List data_in_use; // events are still in use by SQL thread - ulong id; - TABLE *current_table; - - // rbr - // RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */ - // uint tables_to_lock_count; /* RBR: Count of tables to lock */ - // table_mapping m_table_map; /* RBR: Mapping table-id to table */ - - // statictics + Slave_jobs_queue jobs; // assignment queue containing events to execute + mysql_mutex_t jobs_lock; // mutex for the jobs queue + mysql_cond_t jobs_cond; // condition variable for the jobs queue + Relay_log_info *c_rli; // pointer to Coordinator's rli + 
DYNAMIC_ARRAY curr_group_exec_parts; // Current Group Executed Partitions + bool curr_group_seen_begin; // is set to TRUE with explicit B-event + ulong id; // numberic identifier of the Worker /* - @c last_group_done_index is for statistics - to mean the index in GAQ of the last processed group. + Worker runtime statictics */ - volatile ulong last_group_done_index; // it's index in GAQ - ulong wq_empty_waits; // to gather statistics how many times got idle - ulong stmt_jobs; // how many jobs per stmt - ulong trans_jobs; // how many jobs per trns - volatile int curr_jobs; // the current assignments - long usage_partition; // number of different partitions handled by this worker + // the index in GAQ of the last processed group by this Worker + volatile ulong last_group_done_index; + ulong wq_empty_waits; // how many times got idle + ulong events_done; // how many events (statements) processed + ulong groups_done; // how many groups (transactions) processed + volatile int curr_jobs; // number of active assignments + // number of partitions allocated to the worker at point in time + long usage_partition; + volatile bool relay_log_change_notified; // Coord sets and resets, W can read volatile bool checkpoint_notified; // Coord sets and resets, W can read - ulong bitmap_shifted; // shift the last bitmap at receiving new CP - bool wq_overrun_set; // W monitors its queue usage to incr/decr rli->mts_wqs_overrun + ulong bitmap_shifted; // shift the last bitmap at receiving new CP + bool wq_overrun_set; // W marks inself as incrementer of rli->mts_wq_excess + + /* + Coordinatates of the last CheckPoint (CP) this Worker has + acknowledged; part of is persisent data + */ char checkpoint_relay_log_name[FN_REFLEN]; ulonglong checkpoint_relay_log_pos; - char checkpoint_master_log_name[FN_REFLEN]; ulonglong checkpoint_master_log_pos; - ulong checkpoint_seqno; - MY_BITMAP group_execed; - MY_BITMAP group_shifted; - bool inited_group_execed; - uint processed_group; + MY_BITMAP 
group_executed; // bitmap describes groups executed after last CP + MY_BITMAP group_shifted; // temporary bitmap to compute group_executed + bool inited_group_executed; + ulong checkpoint_seqno; // the most significant ON bit in group_executed enum en_running_state { NOT_RUNNING= 0, @@ -279,26 +279,24 @@ public: KILLED }; en_running_state volatile running_status; - Log_event *last_event; + bool inited_curr_group_exec_parts; + int init_worker(Relay_log_info*, ulong); int init_info(); void end_info(); int flush_info(bool force= FALSE); - size_t get_number_worker_fields(); - - void slave_worker_ends_group(Log_event*, int); // CGEP walk through to upd APH - + void slave_worker_ends_group(Log_event*, int); bool commit_positions(Log_event *evt, Slave_job_group *ptr_g); - void report(loglevel level, int err_code, const char *msg, ...) const - ATTRIBUTE_FORMAT(printf, 4, 5); - void do_report(loglevel level, int err_code, const char *msg, va_list vargs) const; +protected: + + virtual void do_report(loglevel level, int err_code, + const char *msg, va_list v_args) const; private: bool read_info(Rpl_info_handler *from); bool write_info(Rpl_info_handler *to); - Slave_worker& operator=(const Slave_worker& info); Slave_worker(const Slave_worker& info); }; === modified file 'sql/rpl_slave.cc' --- a/sql/rpl_slave.cc 2011-07-01 08:45:11 +0000 +++ b/sql/rpl_slave.cc 2011-07-01 13:41:35 +0000 @@ -84,24 +84,22 @@ const char *relay_log_basename= 0; of Relay_log_info::gaq (see @c slave_start_workers()). It can be set to any value in [1, ULONG_MAX - 1] range. */ -ulong mts_slave_worker_queue_len_max= 32768; +const ulong mts_slave_worker_queue_len_max= 32768; /* MTS load-ballancing parameter. Time in microsecs to sleep by MTS Coordinator to avoid the Worker queues room overrun. */ -ulong mts_coordinator_basic_nap= 5; +const ulong mts_coordinator_basic_nap= 5; /* MTS load-ballancing parameter. Percent of Worker queue size at which Worker is considered to become hungry. 
*/ -ulong mts_worker_underrun_level= 10; - - - +const ulong mts_worker_underrun_level= 10; +Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret); void append_item_to_jobs(slave_job_item *job_item, Slave_worker *w, Relay_log_info *rli); @@ -2820,10 +2818,11 @@ int apply_event_and_update_pos(Log_event // all events except BEGIN-query must be marked with a non-NULL Worker DBUG_ASSERT(((Slave_worker*) ev->worker) == rli->last_assigned_worker); - DBUG_PRINT("Log_event::apply_event:", ("-> job item data %p to W_%lu", job_item->data, w->id)); + DBUG_PRINT("Log_event::apply_event:", + ("-> job item data %p to W_%lu", job_item->data, w->id)); // Reset mts in-group state - if (ev->ends_group() || !rli->curr_group_seen_begin) + if (rli->mts_group_status == Relay_log_info::MTS_END_GROUP) { // CGAP cleanup for (uint i= rli->curr_group_assigned_parts.elements; i > 0; i--) @@ -2943,12 +2942,7 @@ int apply_event_and_update_pos(Log_event else { DBUG_ASSERT(*ptr_ev == ev || rli->is_parallel_exec()); - /* - event_relay_log_pos is an anchor to possible reading restart. - It may become lt than group_* value. - However event_relay_log_pos does not affect group_relay_log_pos - othen that through the sequentially executed events or via checkpoint. - */ + rli->inc_event_relay_log_pos(); } @@ -3074,15 +3068,14 @@ static int exec_relay_log_event(THD* thd /* This tests if the position of the beginning of the current event hits the UNTIL barrier. - MTS: since master,relay-group coordinates change per checkpoint - at the end of the checkpoint interval UNTIL can be left far behind. + MTS: since the master and the relay-group coordinates change + asynchronously logics of rli->is_until_satisfied() can't apply. Hence, UNTIL forces the sequential applying. 
*/ if (rli->until_condition != Relay_log_info::UNTIL_NONE && rli->is_until_satisfied(thd, ev)) { char buf[22]; - sql_print_information("Slave SQL thread stopped because it reached its" " UNTIL position %s", llstr(rli->until_pos(), buf)); /* @@ -3137,7 +3130,7 @@ static int exec_relay_log_event(THD* thd ev= NULL; } } - + /* update_log_pos failed: this should not happen, so we don't retry. @@ -3754,11 +3747,8 @@ int check_temp_dir(char* tmp_file) DBUG_RETURN(0); } -Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret); - /* - Worker thread for the parallel execution of the replication events - MHS_todo: consider how to handle error + Worker thread for the parallel execution of the replication events. */ pthread_handler_t handle_slave_worker(void *arg) { @@ -3848,8 +3838,9 @@ pthread_handler_t handle_slave_worker(vo sql_print_information("Worker %lu statistics: " "events processed = %lu " "hungry waits = %lu " - "priv queue overfills = %llu " - ,w->id, w->stmt_jobs, w->wq_size_waits, w->jobs.waited_overfill); + "priv queue overfills = %llu ", + w->id, w->events_done, w->wq_size_waits, + w->jobs.waited_overfill); mysql_cond_signal(&w->jobs_cond); // famous last goodbye mysql_mutex_unlock(&w->jobs_lock); @@ -3935,7 +3926,8 @@ bool mts_recovery_groups(Relay_log_info Slave_worker *worker= Rpl_info_factory::create_worker(opt_worker_repository_id, id, rli); worker->init_info(); - LOG_POS_COORD w_last= {worker->group_master_log_name, worker->group_master_log_pos}; + LOG_POS_COORD w_last= { const_cast(worker->get_group_master_log_name()), + worker->get_group_master_log_pos() }; if (mts_event_coord_cmp(&w_last, &cp) > 0) { /* @@ -3990,16 +3982,17 @@ bool mts_recovery_groups(Relay_log_info { Slave_worker *w= ((Slave_job_group *) dynamic_array_ptr(&above_lwm_jobs, it_job))->worker; - LOG_POS_COORD w_last= { w->group_master_log_name, w->group_master_log_pos }; + LOG_POS_COORD w_last= { const_cast(w->get_group_master_log_name()), + w->get_group_master_log_pos() }; 
sql_print_information("Recoverying relay log info based on Worker-Id %lu, " "group_relay_log_name %s, group_relay_log_pos %lu " "group_master_log_name %s, group_master_log_pos %lu", w->id, - w->group_relay_log_name, - (ulong) w->group_relay_log_pos, - w->group_master_log_name, - (ulong) w->group_master_log_pos); + w->get_group_relay_log_name(), + (ulong) w->get_group_relay_log_pos(), + w->get_group_master_log_name(), + (ulong) w->get_group_master_log_pos()); recovery_group_cnt= 0; not_reached_commit= true; @@ -4052,14 +4045,14 @@ bool mts_recovery_groups(Relay_log_info sql_print_information("Group Recoverying relay log info " "group_master_log_name %s, event_master_log_pos %llu.", - rli->group_master_log_name, + rli->get_group_master_log_name(), ev->log_pos); if ((ret= mts_event_coord_cmp(&ev_coord, &w_last)) == 0) { #ifndef DBUG_OFF for (uint i= 0; i <= w->checkpoint_seqno; i++) { - if (bitmap_is_set(&w->group_execed, i)) + if (bitmap_is_set(&w->group_executed, i)) DBUG_PRINT("mts", ("Bit %u is set.", i)); else DBUG_PRINT("mts", ("Bit %u is not set.", i)); @@ -4073,7 +4066,7 @@ bool mts_recovery_groups(Relay_log_info for (uint i= (w->checkpoint_seqno + 1) - recovery_group_cnt, j= 0; i <= w->checkpoint_seqno; i++, j++) { - if (bitmap_is_set(&w->group_execed, i)) + if (bitmap_is_set(&w->group_executed, i)) { DBUG_PRINT("mts", ("Setting bit %u.", j)); bitmap_fast_test_and_set(groups, j); @@ -4119,8 +4112,8 @@ err: } /** - Processing rli->gaq to find out the low-water-mark coordinates - stored into the cental recovery table. + Processing rli->gaq to find out the low-water-mark (lwm) coordinates + which is stored into the cental recovery table. @param rli pointer to Relay-log-info of Coordinator @param period period of processing GAQ, normally derived from @@ -4164,12 +4157,6 @@ bool mts_checkpoint_routine(Relay_log_in DBUG_RETURN(FALSE); } - /* - This checks how many consecutive jobs where processed. 
- If this value is different than zero the checkpoint - routine can proceed. Otherwise, there is nothing to be - done. - */ do { cnt= rli->gaq->move_queue_head(&rli->workers); @@ -4181,6 +4168,12 @@ bool mts_checkpoint_routine(Relay_log_in } while (cnt == 0 && (rli->gaq->full() || force) && !DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) && (my_sleep(rli->mts_coordinator_basic_nap), 1)); + /* + This checks how many consecutive jobs where processed. + If this value is different than zero the checkpoint + routine can proceed. Otherwise, there is nothing to be + done. + */ if (cnt == 0) goto end; @@ -4200,16 +4193,11 @@ bool mts_checkpoint_routine(Relay_log_in mysql_mutex_lock(&rli->data_lock); /* - Coordinator::commit_positions() { + "Coordinator::commit_positions" { - rli->gaq->lwm contains all but rli->group_master_log_name - - group_master_log_name is updated only by Coordinator and it can't change - within checkpoint interval because Coordinator flushes the updated value - at once. - Note, unlike group_master_log_name, event_relay_log_pos is updated solely - within Coordinator read loop context. Hence, it's possible at times - event_rlp > group_rlp. + rli->gaq->lwm has been updated in move_queue_head() and + to contain all but rli->group_master_log_name which + is altered solely by Coordinator at special checkpoints. */ rli->set_group_master_log_pos(rli->gaq->lwm.group_master_log_pos); rli->set_group_relay_log_pos(rli->gaq->lwm.group_relay_log_pos); @@ -4244,6 +4232,8 @@ bool mts_checkpoint_routine(Relay_log_in */ rli->reset_notified_checkpoint(cnt, rli->gaq->lwm.ts); + /* end-of "Coordinator::"commit_positions" */ + end: #ifndef DBUG_OFF if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0)) @@ -4255,17 +4245,18 @@ end: } /** - A single Worker thread is forked out. + Instantiation of a Slave_worker and forking out a single Worker thread. 
+ @param rli Coordinator's Relay_log_info pointer + @param i identifier of the Worker + @return 0 suppress or 1 if fails */ int slave_start_single_worker(Relay_log_info *rli, ulong i) { int error= 0; - uint k; pthread_t th; Slave_worker *w= NULL; - Slave_job_item empty= {NULL}; if (!(w= Rpl_info_factory::create_worker(opt_worker_repository_id, i, rli))) @@ -4275,52 +4266,14 @@ int slave_start_single_worker(Relay_log_ goto err; } - w->c_rli= rli; - w->tables_to_lock= NULL; - w->tables_to_lock_count= 0; - - if (w->init_info()) + if (w->init_worker(rli, i)) { sql_print_error("Failed during slave worker thread create"); error= 1; goto err; } - - w->curr_group_exec_parts.elements= 0; - w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name - w->checkpoint_notified= FALSE; - w->bitmap_shifted= 0; - w->workers= rli->workers; // shallow copying is sufficient - w->this_worker= w; - w->wq_size_waits= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0; - w->id= i; - w->current_table= NULL; - w->usage_partition= 0; - w->last_group_done_index= rli->gaq->size; // out of range - - w->jobs.avail= 0; - w->jobs.len= 0; - w->jobs.overfill= FALSE; // todo: move into Slave_jobs_queue constructor - w->jobs.waited_overfill= 0; - w->jobs.enter= w->jobs.size= rli->mts_slave_worker_queue_len_max; - my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.size, 0); - for (k= 0; k < w->jobs.size; k++) - insert_dynamic(&w->jobs.Q, (uchar*) &empty); - - DBUG_ASSERT(w->jobs.Q.elements == w->jobs.size); - - w->wq_overrun_set= FALSE; set_dynamic(&rli->workers, (uchar*) &w, i); -#ifdef HAVE_PSI_INTERFACE - mysql_mutex_init(key_mutex_slave_parallel_worker[i], &w->jobs_lock, - MY_MUTEX_INIT_FAST); - mysql_cond_init(key_cond_slave_parallel_worker[i], &w->jobs_cond, NULL); -#else - mysql_mutex_init(NULL, &w->jobs_lock, MY_MUTEX_INIT_FAST); - mysql_cond_init(NULL, &w->jobs_cond, NULL); -#endif - w->curr_group_seen_begin= FALSE; if (pthread_create(&th, &connection_attrib, 
handle_slave_worker, (void*) w)) { @@ -4340,7 +4293,14 @@ err: return error; } +/** + Initialization of the central rli members for Coordinator's role, + communication channels such as Assigned Partition Hash (APH), + and starting the Worker pool. + @return 0 success + non-zero as failure +*/ int slave_start_workers(Relay_log_info *rli, ulong n) { uint i; @@ -4354,7 +4314,8 @@ int slave_start_workers(Relay_log_info * rli->init_workers(n); // CGAP dynarray holds id:s of partitions of the Current being executed Group - my_init_dynamic_array(&rli->curr_group_assigned_parts, sizeof(db_worker_hash_entry*), + my_init_dynamic_array(&rli->curr_group_assigned_parts, + sizeof(db_worker_hash_entry*), SLAVE_INIT_DBS_IN_GROUP, 1); rli->last_assigned_worker= NULL; // associated with curr_group_assigned my_init_dynamic_array(&rli->curr_group_da, sizeof(Log_event*), 8, 2); @@ -4364,13 +4325,13 @@ int slave_start_workers(Relay_log_info * /* GAQ queue holds seqno:s of scheduled groups. C polls workers in @c lwm_checkpoint_period to update GAQ (see @c next_event()) - The length of GAQ is derived from @c opt_mts_slave_worker_queue_len_max to guarantee - each assigned job being sent to a WQ will find room in GAQ. + The length of GAQ is derived from @c opt_mts_slave_worker_queue_len_max + to guarantee each assigned job being sent to a WQ will find room in GAQ. mts_slave_worker_queue_len_max * num-of-W:s is the max length case all jobs contain one event. 
*/ - // size of WQ stays fixed in one slave session + // length of WQ is actually constant though can be made configurable rli->mts_slave_worker_queue_len_max= mts_slave_worker_queue_len_max; rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(), sizeof(Slave_job_group), @@ -4387,7 +4348,6 @@ int slave_start_workers(Relay_log_info * rli->mts_wq_oversize= FALSE; rli->mts_coordinator_basic_nap= mts_coordinator_basic_nap; rli->mts_worker_underrun_level= mts_worker_underrun_level; - rli->mts_total_groups= 0; rli->curr_group_seen_begin= FALSE; rli->curr_group_isolated= FALSE; rli->checkpoint_seqno= 0; @@ -4526,7 +4486,7 @@ void slave_stop_workers(Relay_log_info * "waited due a Worker queue full = %lu " "waited due the total size = %lu " "sleept when Workers occupied = %lu ", - rli->stmt_jobs, rli->mts_wq_overrun_cnt, + rli->mts_events_assigned, rli->mts_wq_overrun_cnt, rli->mts_wq_overfill_cnt, rli->wq_size_waits, rli->mts_wq_no_underrun_cnt); @@ -4586,7 +4546,6 @@ pthread_handler_t handle_slave_sql(void rli->slave_running = 1; pthread_detach_this_thread(); - if (init_slave_thread(thd, SLAVE_THD_SQL)) { /* @@ -4876,7 +4835,7 @@ llstr(rli->get_group_master_log_pos(), l thd->reset_query(); thd->reset_db(NULL, 0); - slave_stop_workers(rli); // mts-II: stopping the worker pool + slave_stop_workers(rli); THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit); mysql_mutex_lock(&rli->run_lock); @@ -7222,7 +7181,7 @@ bool change_master(THD* thd, Master_info in-memory value at restart (thus causing errors, as the old relay log does not exist anymore). - Notice that there are not writes to the rli table as slave is not + Notice that the rli table is available exclusively as slave is not running. 
*/ DBUG_ASSERT(!mi->rli->slave_running); === modified file 'sql/sql_class.h' --- a/sql/sql_class.h 2011-06-29 07:04:19 +0000 +++ b/sql/sql_class.h 2011-07-01 13:41:35 +0000 @@ -1945,7 +1945,8 @@ public: /* MTS: accessor to binlog_accessed_db_names list */ - List * get_binlog_accessed_db_names() { + List * get_binlog_accessed_db_names() + { return binlog_accessed_db_names; } No bundle (reason: useless for push emails).