3329 Andrei Elkin 2011-07-01 [merge]
wl5569 MTS
merging from the main repo.
removed:
mysql-test/suite/innodb/include/innodb_stats_bootstrap.inc
storage/innobase/scripts/
storage/innobase/scripts/persistent_storage.sql
added:
mysql-test/r/join_cache_jcl0.result
mysql-test/r/mysql_embedded.result
mysql-test/suite/innodb/r/innodb_buffer_pool_load.result
mysql-test/suite/innodb/t/innodb_buffer_pool_load-master.opt
mysql-test/suite/innodb/t/innodb_buffer_pool_load.test
mysql-test/suite/perfschema/r/table_schema.result
mysql-test/suite/perfschema/t/table_schema.test
mysql-test/suite/sys_vars/r/innodb_buffer_pool_dump_at_shutdown_basic.result
mysql-test/suite/sys_vars/r/innodb_buffer_pool_dump_now_basic.result
mysql-test/suite/sys_vars/r/innodb_buffer_pool_filename_basic.result
mysql-test/suite/sys_vars/r/innodb_buffer_pool_load_abort_basic.result
mysql-test/suite/sys_vars/r/innodb_buffer_pool_load_at_startup_basic.result
mysql-test/suite/sys_vars/r/innodb_buffer_pool_load_now_basic.result
mysql-test/suite/sys_vars/t/innodb_buffer_pool_dump_at_shutdown_basic.test
mysql-test/suite/sys_vars/t/innodb_buffer_pool_dump_now_basic.test
mysql-test/suite/sys_vars/t/innodb_buffer_pool_filename_basic.test
mysql-test/suite/sys_vars/t/innodb_buffer_pool_load_abort_basic.test
mysql-test/suite/sys_vars/t/innodb_buffer_pool_load_at_startup_basic.test
mysql-test/suite/sys_vars/t/innodb_buffer_pool_load_now_basic.test
mysql-test/t/join_cache_jcl0.test
mysql-test/t/mysql_embedded.test
storage/innobase/buf/buf0dump.c
storage/innobase/include/buf0dump.h
unittest/gunit/bounds_checked_array-t.cc
modified:
.bzrignore
cmake/make_dist.cmake.in
cmake/os/WindowsCache.cmake
cmake/plugin.cmake
config.h.cmake
configure.cmake
extra/perror.c
extra/yassl/src/yassl_error.cpp
libmysqld/emb_qcache.cc
libmysqld/lib_sql.cc
mysql-test/collections/default.experimental
mysql-test/extra/rpl_tests/rpl_loaddata.test
mysql-test/extra/rpl_tests/rpl_mts_crash_safe.inc
mysql-test/extra/rpl_tests/rpl_stm_EE_err2.test
mysql-test/include/icp_tests.inc
mysql-test/include/join_cache.inc
mysql-test/include/subquery.inc
mysql-test/include/subquery_mat.inc
mysql-test/include/subquery_sj.inc
mysql-test/mysql-test-run.pl
mysql-test/r/1st.result
mysql-test/r/alter_table.result
mysql-test/r/connect.result
mysql-test/r/count_distinct.result
mysql-test/r/distinct.result
mysql-test/r/events_bugs.result
mysql-test/r/flush.result
mysql-test/r/gis-precise.result
mysql-test/r/gis-rtree.result
mysql-test/r/group_by.result
mysql-test/r/innodb_icp.result
mysql-test/r/innodb_icp_all.result
mysql-test/r/innodb_icp_none.result
mysql-test/r/join_cache_jcl1.result
mysql-test/r/join_cache_jcl2.result
mysql-test/r/join_cache_jcl3.result
mysql-test/r/join_cache_jcl4.result
mysql-test/r/join_cache_jcl5.result
mysql-test/r/join_cache_jcl6.result
mysql-test/r/join_cache_jcl7.result
mysql-test/r/join_cache_jcl8.result
mysql-test/r/join_nested.result
mysql-test/r/join_nested_jcl6.result
mysql-test/r/join_outer.result
mysql-test/r/join_outer_jcl6.result
mysql-test/r/log_tables_upgrade.result
mysql-test/r/lowercase_table4.result
mysql-test/r/multi_update.result
mysql-test/r/multi_update_innodb.result
mysql-test/r/myisam_icp.result
mysql-test/r/myisam_icp_all.result
mysql-test/r/myisam_icp_none.result
mysql-test/r/mysql_upgrade.result
mysql-test/r/mysql_upgrade_ssl.result
mysql-test/r/mysqlcheck.result
mysql-test/r/null_key_all.result
mysql-test/r/null_key_icp.result
mysql-test/r/null_key_none.result
mysql-test/r/order_by_all.result
mysql-test/r/order_by_icp_mrr.result
mysql-test/r/order_by_none.result
mysql-test/r/partition.result
mysql-test/r/partition_datatype.result
mysql-test/r/plugin_auth.result
mysql-test/r/ps.result
mysql-test/r/query_cache_28249.result
mysql-test/r/range_all.result
mysql-test/r/select_all.result
mysql-test/r/select_all_jcl6.result
mysql-test/r/select_found.result
mysql-test/r/select_icp_mrr.result
mysql-test/r/select_icp_mrr_jcl6.result
mysql-test/r/select_none.result
mysql-test/r/select_none_jcl6.result
mysql-test/r/sp_notembedded.result
mysql-test/r/sp_sync.result
mysql-test/r/subquery_all.result
mysql-test/r/subquery_all_jcl6.result
mysql-test/r/subquery_mat.result
mysql-test/r/subquery_mat_all.result
mysql-test/r/subquery_mat_none.result
mysql-test/r/subquery_nomat_nosj.result
mysql-test/r/subquery_nomat_nosj_jcl6.result
mysql-test/r/subquery_none.result
mysql-test/r/subquery_none_jcl6.result
mysql-test/r/subquery_sj_all.result
mysql-test/r/subquery_sj_all_jcl6.result
mysql-test/r/subquery_sj_all_jcl7.result
mysql-test/r/subquery_sj_dupsweed.result
mysql-test/r/subquery_sj_dupsweed_jcl6.result
mysql-test/r/subquery_sj_dupsweed_jcl7.result
mysql-test/r/subquery_sj_firstmatch.result
mysql-test/r/subquery_sj_firstmatch_jcl6.result
mysql-test/r/subquery_sj_firstmatch_jcl7.result
mysql-test/r/subquery_sj_loosescan.result
mysql-test/r/subquery_sj_loosescan_jcl6.result
mysql-test/r/subquery_sj_loosescan_jcl7.result
mysql-test/r/subquery_sj_mat.result
mysql-test/r/subquery_sj_mat_jcl6.result
mysql-test/r/subquery_sj_mat_jcl7.result
mysql-test/r/subquery_sj_mat_nosj.result
mysql-test/r/subquery_sj_none.result
mysql-test/r/subquery_sj_none_jcl6.result
mysql-test/r/subquery_sj_none_jcl7.result
mysql-test/r/subselect_innodb.result
mysql-test/r/symlink.result
mysql-test/r/system_mysql_db.result
mysql-test/r/trigger-compat.result
mysql-test/r/trigger.result
mysql-test/r/type_datetime.result
mysql-test/suite/funcs_1/r/is_columns_mysql.result
mysql-test/suite/funcs_1/r/is_columns_mysql_embedded.result
mysql-test/suite/funcs_1/r/is_key_column_usage.result
mysql-test/suite/funcs_1/r/is_statistics.result
mysql-test/suite/funcs_1/r/is_statistics_mysql.result
mysql-test/suite/funcs_1/r/is_statistics_mysql_embedded.result
mysql-test/suite/funcs_1/r/is_table_constraints.result
mysql-test/suite/funcs_1/r/is_table_constraints_mysql.result
mysql-test/suite/funcs_1/r/is_table_constraints_mysql_embedded.result
mysql-test/suite/funcs_1/r/is_tables_mysql.result
mysql-test/suite/funcs_1/r/is_tables_mysql_embedded.result
mysql-test/suite/innodb/r/innodb-index.result
mysql-test/suite/innodb/r/innodb-system-table-view.result
mysql-test/suite/innodb/r/innodb-use-sys-malloc.result
mysql-test/suite/innodb/r/innodb-zip.result
mysql-test/suite/innodb/r/innodb_bug57904.result
mysql-test/suite/innodb/t/innodb-index.test
mysql-test/suite/innodb/t/innodb-system-table-view.test
mysql-test/suite/innodb/t/innodb-use-sys-malloc-master.opt
mysql-test/suite/innodb/t/innodb-use-sys-malloc.test
mysql-test/suite/innodb/t/innodb-zip.test
mysql-test/suite/innodb/t/innodb_bug11933790.test
mysql-test/suite/innodb/t/innodb_bug57904.test
mysql-test/suite/innodb/t/innodb_bug60049.test
mysql-test/suite/innodb/t/innodb_prefix_index_restart_server.test
mysql-test/suite/innodb/t/innodb_stats.test
mysql-test/suite/innodb/t/innodb_stats_drop_locked.test
mysql-test/suite/perfschema/include/cleanup_helper.inc
mysql-test/suite/perfschema/include/upgrade_check.inc
mysql-test/suite/perfschema/r/all_tests.result
mysql-test/suite/perfschema/r/pfs_upgrade.result
mysql-test/suite/perfschema/r/schema.result
mysql-test/suite/perfschema/r/selects.result
mysql-test/suite/perfschema/t/all_tests.test
mysql-test/suite/perfschema/t/selects.test
mysql-test/suite/rpl/r/rpl_loaddata.result
mysql-test/suite/rpl/r/rpl_mixed_mts_crash_safe.result
mysql-test/suite/rpl/r/rpl_parallel_start_stop.result
mysql-test/suite/rpl/r/rpl_row_mts_crash_safe.result
mysql-test/suite/rpl/r/rpl_stm_EE_err2.result
mysql-test/suite/rpl/r/rpl_stm_loaddata_concurrent.result
mysql-test/suite/rpl/r/rpl_stm_mts_crash_safe.result
mysql-test/suite/rpl/t/rpl_mixed_mts_crash_safe.test
mysql-test/suite/rpl/t/rpl_parallel_start_stop.test
mysql-test/suite/rpl/t/rpl_row_mts_crash_safe.test
mysql-test/suite/rpl/t/rpl_stm_mts_crash_safe.test
mysql-test/t/alter_table.test
mysql-test/t/disabled.def
mysql-test/t/events_bugs.test
mysql-test/t/flush.test
mysql-test/t/gis-precise.test
mysql-test/t/gis-rtree.test
mysql-test/t/group_by.test
mysql-test/t/join_outer.test
mysql-test/t/multi_update.test
mysql-test/t/multi_update_innodb.test
mysql-test/t/partition.test
mysql-test/t/partition_datatype.test
mysql-test/t/ps.test
mysql-test/t/query_cache_28249.test
mysql-test/t/sp_notembedded.test
mysql-test/t/sp_sync.test
mysql-test/t/symlink.test
mysql-test/t/system_mysql_db_fix40123.test
mysql-test/t/system_mysql_db_fix50030.test
mysql-test/t/system_mysql_db_fix50117.test
mysql-test/t/trigger-compat.test
mysql-test/t/type_datetime.test
mysys/mf_pack.c
scripts/mysql_install_db.pl.in
scripts/mysql_install_db.sh
scripts/mysql_system_tables.sql
sql/binlog.cc
sql/event_db_repository.cc
sql/event_parse_data.cc
sql/event_parse_data.h
sql/event_scheduler.cc
sql/field.cc
sql/filesort.cc
sql/gcalc_tools.cc
sql/ha_ndbcluster_binlog.cc
sql/ha_partition.cc
sql/handler.cc
sql/item.cc
sql/item.h
sql/item_cmpfunc.cc
sql/item_cmpfunc.h
sql/item_func.cc
sql/item_func.h
sql/item_row.cc
sql/item_row.h
sql/item_strfunc.cc
sql/item_strfunc.h
sql/item_subselect.cc
sql/item_subselect.h
sql/item_sum.cc
sql/item_timefunc.cc
sql/item_timefunc.h
sql/log_event.cc
sql/log_event_old.cc
sql/mdl.h
sql/opt_range.cc
sql/opt_sum.cc
sql/partition_info.cc
sql/protocol.cc
sql/rpl_master.cc
sql/rpl_reporting.cc
sql/rpl_rli.cc
sql/rpl_rli_pdb.cc
sql/rpl_slave.cc
sql/share/errmsg-utf8.txt
sql/sp.cc
sql/sp_head.cc
sql/spatial.cc
sql/sql_acl.cc
sql/sql_admin.cc
sql/sql_alloc_error_handler.cc
sql/sql_array.h
sql/sql_audit.h
sql/sql_base.cc
sql/sql_base.h
sql/sql_cache.cc
sql/sql_class.cc
sql/sql_class.h
sql/sql_connect.cc
sql/sql_db.cc
sql/sql_derived.cc
sql/sql_do.cc
sql/sql_error.cc
sql/sql_error.h
sql/sql_insert.cc
sql/sql_join_cache.cc
sql/sql_lex.cc
sql/sql_lex.h
sql/sql_load.cc
sql/sql_parse.cc
sql/sql_plist.h
sql/sql_prepare.cc
sql/sql_prepare.h
sql/sql_select.cc
sql/sql_select.h
sql/sql_servers.cc
sql/sql_show.cc
sql/sql_signal.cc
sql/sql_table.cc
sql/sql_test.cc
sql/sql_time.cc
sql/sql_trigger.cc
sql/sql_trigger.h
sql/sql_union.cc
sql/sql_update.cc
sql/sql_yacc.yy
sql/structs.h
sql/sys_vars.cc
sql/table.cc
sql/table.h
sql/transaction.cc
sql/tztime.cc
storage/innobase/CMakeLists.txt
storage/innobase/btr/btr0btr.c
storage/innobase/btr/btr0cur.c
storage/innobase/buf/buf0buddy.c
storage/innobase/buf/buf0buf.c
storage/innobase/buf/buf0flu.c
storage/innobase/buf/buf0lru.c
storage/innobase/buf/buf0rea.c
storage/innobase/fil/fil0fil.c
storage/innobase/handler/ha_innodb.cc
storage/innobase/include/btr0btr.h
storage/innobase/include/btr0cur.h
storage/innobase/include/btr0cur.ic
storage/innobase/include/buf0buddy.h
storage/innobase/include/buf0buddy.ic
storage/innobase/include/buf0buf.h
storage/innobase/include/buf0buf.ic
storage/innobase/include/buf0lru.h
storage/innobase/include/buf0rea.h
storage/innobase/include/buf0types.h
storage/innobase/include/db0err.h
storage/innobase/include/page0cur.ic
storage/innobase/include/page0page.h
storage/innobase/include/page0page.ic
storage/innobase/include/rem0rec.h
storage/innobase/include/rem0rec.ic
storage/innobase/include/srv0srv.h
storage/innobase/include/srv0start.h
storage/innobase/include/sync0rw.ic
storage/innobase/include/sync0sync.h
storage/innobase/include/univ.i
storage/innobase/include/ut0ut.h
storage/innobase/page/page0cur.c
storage/innobase/page/page0page.c
storage/innobase/page/page0zip.c
storage/innobase/rem/rem0rec.c
storage/innobase/row/row0ins.c
storage/innobase/row/row0mysql.c
storage/innobase/row/row0row.c
storage/innobase/row/row0upd.c
storage/innobase/row/row0vers.c
storage/innobase/srv/srv0srv.c
storage/innobase/srv/srv0start.c
storage/innobase/sync/sync0rw.c
storage/innobase/sync/sync0sync.c
storage/innobase/trx/trx0rec.c
storage/innobase/trx/trx0sys.c
storage/innobase/ut/ut0ut.c
storage/myisam/mi_update.c
storage/myisam/mi_write.c
tests/mysql_client_test.c
unittest/gunit/CMakeLists.txt
=== modified file 'mysql-test/suite/rpl/t/rpl_heartbeat_basic.test'
--- a/mysql-test/suite/rpl/t/rpl_heartbeat_basic.test 2011-06-20 13:26:35 +0000
+++ b/mysql-test/suite/rpl/t/rpl_heartbeat_basic.test 2011-07-01 10:16:52 +0000
@@ -26,7 +26,7 @@ call mtr.add_suppression("The slave coor
let $connect_retry= 20;
--echo *** Preparing ***
---connection slave
+sync_slave_with_master;
--disable_query_log
call mtr.add_suppression("The master's UUID has changed, although this should not happen unless you have changed it manually.");
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2011-06-29 07:04:19 +0000
+++ b/sql/log_event.cc 2011-07-01 13:41:35 +0000
@@ -2374,11 +2374,11 @@ bool Log_event::contains_partition_info(
(Delete, Update, Write -rows)
T - terminator of the group (XID, COMMIT, ROLLBACK, auto-commit query)
- Only the first g-type event computes the assigned Worker which once
+ Only the first g-event computes the assigned Worker which once
is determined remains to be for the rest of the group.
- That is the g-type event solely carries partitioning info.
- For B-type the assigned Worker is NULL to indicate Coordinator
- has not yet decided. The same applies to p-type.
+ That is the g-event solely carries partitioning info.
+ For B-event the assigned Worker is NULL to indicate Coordinator
+ has not yet decided. The same applies to p-event.
Notice, these is a special group consisting of optionally multiple p-events
terminating with a g-event.
@@ -2390,9 +2390,9 @@ bool Log_event::contains_partition_info(
done.
- @note The function can update APH (through map_db_to_worker()), GAQ objects
- and relocate some temporary tables from Coordinator's list into
- involved entries of APH.
+ @note The function updates GAQ queue directly, updates APH hash
+ plus relocates some temporary tables from Coordinator's list into
+ involved entries of APH through @c map_db_to_worker.
There's few memory allocations commented where to be freed.
@return a pointer to the Worker struct or NULL.
@@ -2406,6 +2406,7 @@ Slave_worker *Log_event::get_slave_worke
Slave_worker *ret_worker= NULL;
THD *thd= rli->info_thd;
char llbuff[22];
+ Slave_committed_queue *gaq= rli->gaq;
/* checking partioning properties and perform corresponding actions */
@@ -2413,14 +2414,19 @@ Slave_worker *Log_event::get_slave_worke
if ((is_b_event= starts_group()) ||
// or DDL:s or autocommit queries possibly associated with own p-events
(!rli->curr_group_seen_begin &&
- // the following is a case of no-B group: { p_1,p_2,...,p_k, g}
- (rli->gaq->empty() ||
- ((Slave_job_group *)
- dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))->
+ /*
+ the following is a special case of B-free still multi-event group like
+ { p_1,p_2,...,p_k, g }.
+ In that case either GAQ is empty (the very first group is being
+ assigned) or the last assigned group index points at one of
+ mapped-to-a-worker.
+ */
+ (gaq->empty() ||
+ gaq->get_job_group(rli->gaq->assigned_group_index)->
worker_id != MTS_WORKER_UNDEF)))
{
ulong gaq_idx;
- rli->mts_total_groups++;
+ rli->mts_groups_assigned++;
rli->curr_group_isolated= FALSE;
g.master_log_pos= log_pos;
@@ -2428,7 +2434,7 @@ Slave_worker *Log_event::get_slave_worke
g.group_master_log_name= NULL; // todo: remove
g.group_relay_log_name= NULL;
g.worker_id= MTS_WORKER_UNDEF;
- g.total_seqno= rli->mts_total_groups;
+ g.total_seqno= rli->mts_groups_assigned;
g.checkpoint_log_name= NULL;
g.checkpoint_log_pos= 0;
g.checkpoint_relay_log_name= NULL;
@@ -2437,13 +2443,12 @@ Slave_worker *Log_event::get_slave_worke
g.done= 0;
// the last occupied GAQ's array index
- gaq_idx= rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
+ gaq_idx= gaq->assigned_group_index= gaq->en_queue((void *) &g);
- DBUG_ASSERT(gaq_idx != MTS_WORKER_UNDEF && gaq_idx < rli->gaq->size);
- DBUG_ASSERT(((Slave_job_group *)
- dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))->
+ DBUG_ASSERT(gaq_idx != MTS_WORKER_UNDEF && gaq_idx < gaq->size);
+ DBUG_ASSERT(gaq->get_job_group(rli->gaq->assigned_group_index)->
group_relay_log_name == NULL);
- DBUG_ASSERT(rli->gaq->assigned_group_index != MTS_WORKER_UNDEF); // gaq must have room
+ DBUG_ASSERT(gaq_idx != MTS_WORKER_UNDEF); // gaq must have room
DBUG_ASSERT(rli->last_assigned_worker == NULL);
if (is_b_event)
@@ -2455,7 +2460,7 @@ Slave_worker *Log_event::get_slave_worke
DBUG_ASSERT(rli->curr_group_da.elements == 1);
- // mark the current grup as started with B-event
+ // mark the current group as started with explicit B-event
rli->curr_group_seen_begin= TRUE;
return ret_worker;
@@ -2514,10 +2519,8 @@ Slave_worker *Log_event::get_slave_worke
i++;
} while (it++);
- if ((ptr_g= ((Slave_job_group *)
- dynamic_array_ptr(&rli->gaq->Q,
- rli->gaq->assigned_group_index)))->worker_id
- == MTS_WORKER_UNDEF)
+ if ((ptr_g= gaq->get_job_group(rli->gaq->assigned_group_index))->
+ worker_id == MTS_WORKER_UNDEF)
{
ptr_g->worker_id= ret_worker->id;
@@ -2585,12 +2588,11 @@ Slave_worker *Log_event::get_slave_worke
if (ends_group() || !rli->curr_group_seen_begin)
{
// index of GAQ that this terminal event belongs to
- mts_group_cnt= rli->gaq->assigned_group_index;
+ mts_group_cnt= gaq->assigned_group_index;
rli->mts_group_status= Relay_log_info::MTS_END_GROUP;
if (rli->curr_group_isolated)
mts_do_isolate_group();
- ptr_g= (Slave_job_group *)
- dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
+ ptr_g= gaq->get_job_group(rli->gaq->assigned_group_index);
DBUG_ASSERT(ret_worker != NULL);
@@ -2665,18 +2667,18 @@ Slave_worker *Log_event::get_slave_worke
@return 0 as success, otherwise a failure.
*/
-int Log_event::apply_event(Relay_log_info const *rli)
+int Log_event::apply_event(Relay_log_info *rli)
{
DBUG_ENTER("LOG_EVENT:apply_event");
- Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli); // constless alias
- bool parallel= FALSE, async_event= FALSE, seq_event= FALSE;
- THD *thd= c_rli->info_thd;
+ bool parallel= FALSE;
+ enum enum_mts_event_exec_mode actual_exec_mode= EVENT_EXEC_PARALLEL;
+ THD *thd= rli->info_thd;
- worker= c_rli;
+ worker= rli;
if (rli->is_mts_recovery())
{
- bool skip= bitmap_is_set(&c_rli->recovery_groups, c_rli->mts_recovery_index);
+ bool skip= bitmap_is_set(&rli->recovery_groups, rli->mts_recovery_index);
if (skip)
{
@@ -2689,10 +2691,10 @@ int Log_event::apply_event(Relay_log_inf
}
if (!(parallel= rli->is_parallel_exec()) ||
- (async_event=
- mts_async_exec_by_coordinator(::server_id,
- rli->mts_group_status == Relay_log_info::MTS_IN_GROUP)) ||
- (seq_event= mts_sequential_exec()))
+ ((actual_exec_mode=
+ mts_execution_mode(::server_id,
+ rli->mts_group_status == Relay_log_info::MTS_IN_GROUP))
+ != EVENT_EXEC_PARALLEL))
{
if (parallel)
{
@@ -2704,7 +2706,7 @@ int Log_event::apply_event(Relay_log_inf
for terminal events to finish.
*/
- if (!async_event)
+ if (actual_exec_mode != EVENT_EXEC_ASYNC)
{
/*
this event does not split the current group but is indeed
@@ -2719,14 +2721,14 @@ int Log_event::apply_event(Relay_log_inf
wrappped with BEGIN/COMMIT or preceeded by User|Int|Random- var.
MTS has to stop to suggest restart in the permanent sequential mode.
*/
- llstr(c_rli->get_event_relay_log_pos(), llbuff);
+ llstr(rli->get_event_relay_log_pos(), llbuff);
rli->report(ERROR_LEVEL, ER_MTS_CANT_PARALLEL,
ER(ER_MTS_CANT_PARALLEL),
- get_type_str(), c_rli->get_event_relay_log_name(),
- c_rli->get_event_relay_log_pos());
+ get_type_str(), rli->get_event_relay_log_name(),
+ rli->get_event_relay_log_pos());
/* Coordinator cant continue, it marks MTS group status accordingly */
- c_rli->mts_group_status= Relay_log_info::MTS_KILLED_GROUP;
+ rli->mts_group_status= Relay_log_info::MTS_KILLED_GROUP;
goto err;
}
@@ -2742,32 +2744,37 @@ int Log_event::apply_event(Relay_log_inf
#ifndef DBUG_OFF
/* all Workers are idle as done through wait_for_workers_to_finish */
- for (uint k= 0; k < c_rli->curr_group_da.elements; k++)
+ for (uint k= 0; k < rli->curr_group_da.elements; k++)
{
DBUG_ASSERT(!(*(Slave_worker **)
- dynamic_array_ptr(&c_rli->workers, k))->usage_partition);
+ dynamic_array_ptr(&rli->workers, k))->usage_partition);
DBUG_ASSERT(!(*(Slave_worker **)
- dynamic_array_ptr(&c_rli->workers, k))->jobs.len);
+ dynamic_array_ptr(&rli->workers, k))->jobs.len);
}
#endif
}
+ else
+ {
+ DBUG_ASSERT(actual_exec_mode == EVENT_EXEC_ASYNC);
+ }
}
DBUG_RETURN(do_apply_event(rli));
}
+ DBUG_ASSERT(actual_exec_mode == EVENT_EXEC_PARALLEL);
DBUG_ASSERT(!(rli->curr_group_seen_begin && ends_group()) ||
rli->last_assigned_worker);
worker= NULL;
- c_rli->mts_group_status= Relay_log_info::MTS_IN_GROUP;
+ rli->mts_group_status= Relay_log_info::MTS_IN_GROUP;
worker= (Relay_log_info*)
- (c_rli->last_assigned_worker= get_slave_worker(c_rli));
+ (rli->last_assigned_worker= get_slave_worker(rli));
#ifndef DBUG_OFF
- if (c_rli->last_assigned_worker)
+ if (rli->last_assigned_worker)
DBUG_PRINT("mts", ("Assigning job to worker %lu",
- c_rli->last_assigned_worker->id));
+ rli->last_assigned_worker->id));
#endif
err:
@@ -6273,7 +6280,6 @@ int Rotate_log_event::do_update_pos(Rela
rli->get_group_master_log_name(),
(ulong) rli->get_group_master_log_pos()));
mysql_mutex_unlock(&rli->data_lock);
- rli->flush_info(TRUE); // todo: error branch
if (rli->is_parallel_exec())
rli->reset_notified_checkpoint(0, when + (time_t) exec_time);
@@ -6640,34 +6646,34 @@ int Xid_log_event::do_apply_event_worker
{
int error= 0;
bool is_trans_repo= w->is_transactional();
+ Slave_committed_queue *gaq= w->c_rli->gaq;
DBUG_PRINT("mts", ("do_apply group master %s %llu group relay %s %llu event %s %llu.",
- w->group_master_log_name,
- w->group_master_log_pos,
- w->group_relay_log_name,
- w->group_relay_log_pos,
- w->event_relay_log_name,
- w->event_relay_log_pos));
+ w->get_group_master_log_name(),
+ w->get_group_master_log_pos(),
+ w->get_group_relay_log_name(),
+ w->get_group_relay_log_pos(),
+ w->get_event_relay_log_name(),
+ w->get_event_relay_log_pos()));
DBUG_EXECUTE_IF("crash_before_update_pos", DBUG_SUICIDE(););
if (is_trans_repo)
{
ulong gaq_idx= mts_group_cnt;
- Slave_job_group *ptr_g=
- (Slave_job_group *) dynamic_array_ptr(&w->c_rli->gaq->Q, gaq_idx);
+ Slave_job_group *ptr_g= gaq->get_job_group(gaq_idx);
if ((error= w->commit_positions(this, ptr_g)))
goto err;
}
DBUG_PRINT("mts", ("do_apply group master %s %llu group relay %s %llu event %s %llu.",
- w->group_master_log_name,
- w->group_master_log_pos,
- w->group_relay_log_name,
- w->group_relay_log_pos,
- w->event_relay_log_name,
- w->event_relay_log_pos));
+ w->get_group_master_log_name(),
+ w->get_group_master_log_pos(),
+ w->get_group_relay_log_name(),
+ w->get_group_relay_log_pos(),
+ w->get_event_relay_log_name(),
+ w->get_event_relay_log_pos()));
DBUG_EXECUTE_IF("crash_after_update_pos_before_apply", DBUG_SUICIDE(););
@@ -9513,7 +9519,6 @@ int Table_map_log_event::do_apply_event(
int error= 0;
- // mts-II todo: consider filtering
if (rli->info_thd->slave_thread /* filtering is for slave only */ &&
(!rpl_filter->db_ok(table_list->db) ||
(rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list))))
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2011-06-27 17:31:45 +0000
+++ b/sql/log_event.h 2011-07-01 12:48:25 +0000
@@ -262,7 +262,7 @@ struct sql_ex_info
*/
#define MAX_DBS_IN_EVENT_MTS 16
/*
- When the actual number of db:s exceeds MAX_DBS_IN_EVENT_MTS
+ When the actual number of databases exceeds MAX_DBS_IN_EVENT_MTS
the value of OVER_MAX_DBS_IN_EVENT_MTS is is put into the mts_accessed_dbs status.
*/
#define OVER_MAX_DBS_IN_EVENT_MTS 254
@@ -356,7 +356,7 @@ struct sql_ex_info
#define Q_INVOKER 11
/*
- Q_UPDATED_DB_NAMES status variable collects of the updated db:s
+ Q_UPDATED_DB_NAMES status variable collects of the updated databases
total number and their names to be propagated to the slave in order
to facilitate the parallel applying of the Query events.
*/
@@ -540,7 +540,7 @@ struct sql_ex_info
MTS: group of events can be marked to force its execution
in isolation from any other Workers.
Typically that is done for a transaction that contains
- a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS db:s.
+ a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS databases.
The flag is set ON for an event that terminates its group.
*/
#define LOG_EVENT_MTS_ISOLATE_F 0x200
@@ -1270,21 +1270,44 @@ public:
/* Return start of query time or current time */
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
-public:
+
+private:
+
+ /*
+ possible decisions by mts_execution_mode()
+ */
+ enum enum_mts_event_exec_mode
+ {
+ /*
+ Event is run by a Worker.
+ */
+ EVENT_EXEC_PARALLEL,
+ /*
+ Event is run by Coordinator.
+ */
+ EVENT_EXEC_ASYNC,
+ /*
+ Event is run by Coordinator and requires W:s synchronization.
+ */
+ EVENT_EXEC_SYNC,
+ /*
+ Event can't be executed neither by Workers nor Coordinator.
+ */
+ EVENT_EXEC_CAN_NOT
+ };
/**
- MST: to execute some event types serially.
+ Is called from mts_execution_mode() to
- @note There are incompatile combinations such the referred event
- is wrapped with BEGIN/COMMIT. Such cases should be identified
- by the caller and treates as an error.
-
- Notice, even though the func returns TRUE, some events
- like old LOAD-DATA rooted EXEC_LOAD_EVENT can't run even
- in isolated parallel mode and MTS would have to stop.
-
- @return TRUE if despite permanent parallel execution mode an event
- needs applying in a real isolation that is sequentially.
+ @return TRUE if the event needs applying with synchronization
+ agaist Workers, otherwise
+ FALSE
+
+ @note There are incompatile combinations such as referred further events
+ are wrapped with BEGIN/COMMIT. Such cases should be identified
+ by the caller and treats correspondingly.
+
+ todo: to mts-support Old master Load-data related events
*/
bool mts_sequential_exec()
{
@@ -1304,37 +1327,35 @@ public:
}
/**
- MST: some events have to be applied by Coordinator concurrently with Workers.
+ MTS Coordinator finds out a way how to execute the current event.
- *TODO*: combine with mts_sequential_exec() to have ternary outcome.
-
- @return TRUE if that's the case,
- FALSE otherwise.
- */
- bool mts_async_exec_by_coordinator(ulong slave_server_id, bool mts_in_group)
- {
- return
- (get_type_code() == FORMAT_DESCRIPTION_EVENT &&
- ((server_id == (uint32) ::server_id) || (log_pos == 0)))
- ||
- (get_type_code() == ROTATE_EVENT &&
- ((server_id == (uint32) ::server_id) ||
- (log_pos == 0 && mts_in_group)));
+ Besides the parallelizable case, some events have to be applied by
+ Coordinator concurrently with Workers and some to require synchronization
+ with Workers before to apply them.
+
+ @retval EVENT_EXEC_PARALLEL if event is executed by a Worker
+ @retval EVENT_EXEC_ASYNC if event is executed by Coordinator
+ @retval EVENT_EXEC_ASYNC if event is executed by Coordinator
+ with synchronization against the Workers
+ */
+ enum enum_mts_event_exec_mode mts_execution_mode(ulong slave_server_id,
+ bool mts_in_group)
+ {
+ if ((get_type_code() == FORMAT_DESCRIPTION_EVENT &&
+ ((server_id == (uint32) ::server_id) || (log_pos == 0)))
+ ||
+ (get_type_code() == ROTATE_EVENT &&
+ ((server_id == (uint32) ::server_id) ||
+ (log_pos == 0 /* very first fake Rotate */
+ && mts_in_group /* ignored events, R_f at slave restart */))))
+ return EVENT_EXEC_ASYNC;
+ else if (mts_sequential_exec())
+ return EVENT_EXEC_SYNC;
+ else
+ return EVENT_EXEC_PARALLEL;
}
/**
- Events of a cetain type carry partitioning data such as db names.
- */
- bool contains_partition_info();
-
- /**
- Events of a cetain type start or end a group of events treated
- transactionally wrt binlog.
- */
- virtual bool starts_group() { return FALSE; }
- virtual bool ends_group() { return FALSE; }
-
- /**
@return index in \in [0, M] range to indicate
to be assigned worker;
M is the max index of the worker pool.
@@ -1353,16 +1374,10 @@ public:
}
/*
- returns the number of updated by the event databases.
- In other than Query-log-event case that's one.
- */
- virtual uint8 mts_number_dbs() { return 1; }
-
- /*
Group of events can be marked to force its execution
in isolation from any other Workers.
Typically that is done for a transaction that contains
- a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS db:s
+ a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS databases.
*/
virtual void mts_do_isolate_group()
{
@@ -1373,13 +1388,48 @@ public:
flags |= LOG_EVENT_MTS_ISOLATE_F;
}
+
+public:
+
+ /**
+ @return TRUE if events carries partitioning data (database names).
+ */
+ bool contains_partition_info();
+
/*
- Verifying whether the terminal event of a group is marked to
- execute in isolation.
+ @return the number of updated by the event databases.
+
+ @note In other than Query-log-event case that's one.
+ */
+ virtual uint8 mts_number_dbs() { return 1; }
+
+ /**
+ @return TRUE if the terminal event of a group is marked to
+ execute in isolation from other Workers,
+ FASE otherwise
*/
bool mts_is_group_isolated() { return flags & LOG_EVENT_MTS_ISOLATE_F; }
/**
+ Events of a cetain type can start or end a group of events treated
+ transactionally wrt binlog.
+
+ Public access is required by implementation of recovery + skip.
+
+ @return TRUE if the event starts a group (transaction)
+ FASE otherwise
+ */
+ virtual bool starts_group() { return FALSE; }
+
+ /**
+ @return TRUE if the event starts a group (transaction)
+ FASE otherwise
+ */
+ virtual bool ends_group() { return FALSE; }
+
+public:
+
+ /**
Apply the event to the database.
This function represents the public interface for applying an
@@ -1387,7 +1437,7 @@ public:
@see do_apply_event
*/
- int apply_event(Relay_log_info const *rli);
+ int apply_event(Relay_log_info *rli);
/**
Update the relay log position.
@@ -1963,7 +2013,7 @@ public:
*/
uint32 master_data_written;
/*
- number of updated db:s by the query and their names. This info
+ number of updated databases by the query and their names. This info
is requested by both Coordinator and Worker.
*/
uchar mts_accessed_dbs;
@@ -1977,15 +2027,15 @@ public:
const char* get_db() { return db; }
/**
- Returns a list of updated db:s or the default db single item list
- in case of over-MAX_DBS_IN_EVENT_MTS actual db:s.
+ Returns a list of updated databases or the default db single item list
+ in case of the number of databases exceeds MAX_DBS_IN_EVENT_MTS.
*/
virtual List<char>* mts_get_dbs(MEM_ROOT *mem_root)
{
List<char> *res= new (mem_root) List<char>;
if (mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
{
- // "" == empty string db name is special to indicate sequential applying
+ // the empty string db name is special to indicate sequential applying
mts_accessed_db_names[0][0]= 0;
res->push_back((char*) mts_accessed_db_names[0]);
}
@@ -2066,10 +2116,9 @@ public: /* !!! Public in this pat
!strncasecmp(query, "ROLLBACK", 8);
}
/**
- todo: Parallel support for DDL:s.
- DDL queries are logged without BEGIN/COMMIT parentheses
- and can be regarded as the starting and the ending events of
- its self-group.
+ Notice, DDL queries are logged without BEGIN/COMMIT parentheses
+ and identification of such single-query group
+ occures within logics of @c get_slave_worker().
*/
bool starts_group() { return !strncmp(query, "BEGIN", q_len); }
virtual bool ends_group()
@@ -3161,7 +3210,6 @@ public:
const char* get_db() { return db; }
#endif
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- /* MTS executes this event sequentially */
virtual uint8 mts_number_dbs() { return OVER_MAX_DBS_IN_EVENT_MTS; }
virtual List<char>* mts_get_dbs(MEM_ROOT *mem_root)
{
@@ -4473,8 +4521,6 @@ private:
int append_query_string(const CHARSET_INFO *csinfo,
String const *from, String *to);
bool sqlcom_can_generate_row_events(const THD *thd);
-void handle_rows_query_log_event(Log_event *ev, Relay_log_info *rli);
-
bool event_checksum_test(uchar *buf, ulong event_len, uint8 alg);
uint8 get_checksum_alg(const char* buf, ulong len);
extern TYPELIB binlog_checksum_typelib;
=== modified file 'sql/rpl_reporting.cc'
--- a/sql/rpl_reporting.cc 2011-06-29 07:04:19 +0000
+++ b/sql/rpl_reporting.cc 2011-07-01 13:41:35 +0000
@@ -110,7 +110,7 @@ Slave_reporting_capability::report(logle
}
void
-Slave_reporting_capability::do_report(loglevel level, int err_code,
+Slave_reporting_capability::va_report(loglevel level, int err_code,
const char *msg, va_list args) const
{
#if !defined(EMBEDDED_LIBRARY)
=== modified file 'sql/rpl_reporting.h'
--- a/sql/rpl_reporting.h 2011-06-20 13:26:35 +0000
+++ b/sql/rpl_reporting.h 2011-07-01 12:48:25 +0000
@@ -59,7 +59,7 @@ public:
*/
virtual void report(loglevel level, int err_code, const char *msg, ...) const
ATTRIBUTE_FORMAT(printf, 4, 5);
- void do_report(loglevel level, int err_code,
+ void va_report(loglevel level, int err_code,
const char *msg, va_list v_args) const;
/**
@@ -130,6 +130,15 @@ public:
bool is_error() const { return last_error().number != 0; }
virtual ~Slave_reporting_capability()= 0;
+
+protected:
+
+ virtual void do_report(loglevel level, int err_code,
+ const char *msg, va_list v_args) const
+ {
+ va_report(level, err_code, msg, v_args);
+ }
+
private:
/**
Last error produced by the I/O or SQL thread respectively.
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2011-07-01 08:45:11 +0000
+++ b/sql/rpl_rli.cc 2011-07-01 13:41:35 +0000
@@ -53,14 +53,13 @@ static PSI_cond_info *worker_conds= NULL
PSI_mutex_key *key_mutex_slave_parallel_worker= NULL;
PSI_mutex_key key_mutex_slave_parallel_pend_jobs;
-PSI_mutex_key key_mutex_mts_temp_tables_lock;
PSI_cond_key *key_cond_slave_parallel_worker= NULL;
PSI_cond_key key_cond_slave_parallel_pend_jobs;
#endif
Relay_log_info::Relay_log_info(bool is_slave_recovery)
- :Rpl_info("SQL", "SQL-Thread"), checkpoint_thd(0), checkpoint_running(0),
+ :Rpl_info("SQL", "SQL-Thread"),
replicate_same_server_id(::replicate_same_server_id),
cur_log_fd(-1), relay_log(&sync_relaylog_period),
is_relay_log_recovery(is_slave_recovery),
@@ -68,11 +67,11 @@ Relay_log_info::Relay_log_info(bool is_s
cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
last_master_timestamp(0), slave_skip_counter(0),
- abort_pos_wait(0), slave_exec_mode(0), until_condition(UNTIL_NONE),
+ abort_pos_wait(0), until_condition(UNTIL_NONE),
until_log_pos(0), retried_trans(0),
tables_to_lock(0), tables_to_lock_count(0),
rows_query_ev(NULL), last_event_start_time(0),
- this_worker(NULL), slave_parallel_workers(0),
+ slave_parallel_workers(0),
recovery_parallel_workers(0), checkpoint_seqno(0),
checkpoint_group(mts_checkpoint_group), mts_recovery_group_cnt(0),
mts_recovery_index(0), mts_recovery_group_seen_begin(0),
@@ -112,7 +111,7 @@ void Relay_log_info::init_workers(ulong
Parallel slave parameters initialization is done regardless
whether the feature is or going to be active or not.
*/
- trans_jobs= stmt_jobs= pending_jobs= wq_size_waits= 0;
+ mts_groups_assigned= mts_events_assigned= pending_jobs= wq_size_waits= 0;
mts_wq_excess= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0;
my_init_dynamic_array(&workers, sizeof(Slave_worker *), n_workers, 4);
@@ -142,12 +141,9 @@ void Relay_log_info::init_workers(ulong
}
mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
MY_MUTEX_INIT_FAST);
- mysql_mutex_init(key_mutex_mts_temp_tables_lock, &mts_temp_tables_lock,
- MY_MUTEX_INIT_FAST);
mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond, NULL);
#else
mysql_mutex_init(NULL, &pending_jobs_lock, MY_MUTEX_INIT_FAST);
- mysql_mutex_init(NULL, &mts_temp_tables_lock, MY_MUTEX_INIT_FAST);
mysql_cond_init(NULL, &pending_jobs_cond, NULL);
#endif
}
@@ -169,7 +165,6 @@ void Relay_log_info::deinit_workers()
mysql_mutex_destroy(&pending_jobs_lock);
mysql_cond_destroy(&pending_jobs_cond);
- mysql_mutex_destroy(&mts_temp_tables_lock);
delete_dynamic(&workers);
my_atomic_rwlock_destroy(&slave_open_temp_tables_lock);
@@ -795,6 +790,18 @@ void Relay_log_info::inc_group_relay_log
{
group_master_log_pos= log_pos;
}
+
+ /*
+ In MTS mode FD or Rotate event commit their solitary group to
+ Coordinator's info table. Callers make sure that Workers have been
+ executed all assignements.
+ Broadcast to master_pos_wait() waiters should be done after
+ the table is updated.
+ */
+ DBUG_ASSERT(!is_parallel_exec() ||
+ mts_group_status != Relay_log_info::MTS_IN_GROUP);
+ flush_info(TRUE); // todo: error branch
+
mysql_cond_broadcast(&data_cond);
if (!skip_lock)
mysql_mutex_unlock(&data_lock);
@@ -1095,6 +1102,9 @@ void Relay_log_info::stmt_done(my_off_t
{
if (is_parallel_exec())
{
+
+ DBUG_ASSERT(!mts_is_worker(info_thd));
+
/*
Format Description events are special events that are handled as a
synchronization points. For that reason, the checkpoint routine is
@@ -1103,10 +1113,6 @@ void Relay_log_info::stmt_done(my_off_t
(void) mts_checkpoint_routine(this, 0, FALSE, FALSE); // TODO: ALFRANIO ERROR
}
inc_group_relay_log_pos(event_master_log_pos);
-
- DBUG_ASSERT(this_worker == NULL);
-
- flush_info(is_transactional() ? TRUE : FALSE); // TODO: ALFRANIO ERROR
}
}
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2011-06-27 17:31:45 +0000
+++ b/sql/rpl_rli.h 2011-07-01 10:16:52 +0000
@@ -108,9 +108,6 @@ class Relay_log_info : public Rpl_info
friend class Rpl_info_factory;
public:
- THD* checkpoint_thd;
- bool checkpoint_running;
-
/**
Flags for the state of the replication.
*/
@@ -209,6 +206,7 @@ public:
happen when, for example, the relay log gets rotated because of
max_binlog_size.
*/
+protected:
char group_relay_log_name[FN_REFLEN];
ulonglong group_relay_log_pos;
char event_relay_log_name[FN_REFLEN];
@@ -230,8 +228,6 @@ public:
char group_master_log_name[FN_REFLEN];
volatile my_off_t group_master_log_pos;
-// TODO: Restore!
-protected:
/*
When it commits, InnoDB internally stores the master log position it has
processed so far; the position to store is the one of the end of the
@@ -283,12 +279,6 @@ public:
mysql_cond_t log_space_cond;
/*
- A cache for the global system variable's value.
- The value is reset at the beginning of each statement execution.
- */
- ulong slave_exec_mode;
-
- /*
Condition and its parameters from START SLAVE UNTIL clause.
UNTIL condition is tested with is_until_satisfied() method that is
@@ -431,7 +421,7 @@ public:
*/
time_t last_event_start_time;
- /*
+ /*****************************************************************************
WL#5569 MTS
legends:
@@ -441,41 +431,36 @@ public:
*/
DYNAMIC_ARRAY workers; // number's is determined by global slave_parallel_workers
volatile ulong pending_jobs;
- ulong stmt_jobs; // statistics: number of events (statements) processed
- ulong trans_jobs; // statistics: number of groups (transactions) processed
- ulong wq_size_waits; // number of times C goes to sleep due to WQ:s oversize
mysql_mutex_t pending_jobs_lock;
mysql_cond_t pending_jobs_cond;
ulong mts_slave_worker_queue_len_max;
ulonglong mts_pending_jobs_size; // actual mem usage by WQ:s
- ulonglong mts_pending_jobs_size_max; // the max forcing to wait by C
- bool mts_wq_oversize; // C raises flag to wait some memory's released
- Slave_worker *last_assigned_worker; // a hint to partitioning func for some events
+ ulonglong mts_pending_jobs_size_max; // max of WQ:s size forcing C to wait
+ bool mts_wq_oversize; // C raises flag to wait some memory's released
+ Slave_worker *last_assigned_worker;// is set to a Worker at assigning a group
+ /*
+ master-binlog ordered queue of Slave_job_group descriptors of groups
+ that are under processing
+ */
Slave_committed_queue *gaq;
- DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
+ /*
+ Container for references of involved partitions for the current event group
+ */
+ DYNAMIC_ARRAY curr_group_assigned_parts;
DYNAMIC_ARRAY curr_group_da; // deferred array to hold partition-info-free events
bool curr_group_seen_begin; // current group started with B-event or not
bool curr_group_isolated; // current group requires execution in isolation
volatile ulong mts_wq_underrun_w_id; // Id of a Worker whose queue is getting empty
- volatile long mts_wq_excess; // excessive overrun counter; when W increments and decrements it it also marks updates its own wq_overrun_set
- volatile ulong mts_wq_overrun_cnt; // statistics of all mts_wq_excess increments
- ulong mts_wq_no_underrun_cnt; // counts times of C goes to sleep when W:s are filled
- ulong mts_wq_overfill_cnt; // counts C waits when a W's queue is full
- long mts_worker_underrun_level; // percent of WQ size at which W is considered hungry
- ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun
- Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
- ulonglong mts_total_groups; // total event groups distributed in current session
- ulong opt_slave_parallel_workers; // auxiliary cache for ::opt_slave_parallel_workers
- ulong slave_parallel_workers; // the one slave session time number of workers
- ulong recovery_parallel_workers; // number of workers while recovering
-
/*
- A sorted array of Worker current assignements number to provide
- approximate view on Workers loading.
- The first row of the least occupied Worker is queried at assigning
- a new partition. Is updated at checkpoint commit to the main RLI.
+ excessive overrun counter; when W increments and decrements it it
+ also marks updates its own wq_overrun_set
*/
- DYNAMIC_ARRAY least_occupied_workers;
+ volatile long mts_wq_excess;
+ long mts_worker_underrun_level; // % of WQ size at which W is considered hungry
+ ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun
+ ulong opt_slave_parallel_workers; // cache for ::opt_slave_parallel_workers
+ ulong slave_parallel_workers; // the one slave session time number of workers
+ ulong recovery_parallel_workers; // number of workers while recovering
uint checkpoint_seqno; // counter of groups executed after the most recent CP
uint checkpoint_group; // number of groups in one checkpoint interval (period).
MY_BITMAP recovery_groups; // bitmap used during recovery
@@ -508,9 +493,9 @@ public:
MTS_NOT_IN_GROUP => |
{MTS_IN_GROUP => MTS_END_GROUP --+} while (!killed) => MTS_KILLED_GROUP
- though MTS_END_GROUP has `->' link to MTS_NOT_IN_GROUP
- when Coordinator synchronizes with Workers by demanding them to complete their
- assignments.
+ MTS_END_GROUP has `->' loop breaking link to MTS_NOT_IN_GROUP when
+ Coordinator synchronizes with Workers by demanding them to
+ complete their assignments.
*/
enum
{
@@ -525,6 +510,24 @@ public:
MTS_KILLED_GROUP /* Coordinator gave out to reach MTS_END_GROUP */
} mts_group_status;
+ /*
+ MTS statistics:
+ */
+ ulong mts_events_assigned; // number of events (statements) scheduled
+ ulong mts_groups_assigned; // number of groups (transactions) scheduled
+ volatile ulong mts_wq_overrun_cnt; // counter of all mts_wq_excess increments
+ ulong wq_size_waits; // number of times C slept due to WQ:s oversize
+ ulong mts_wq_no_underrun_cnt;// number of times of C slept when W:s were filled
+ ulong mts_wq_overfill_cnt; // counter of C waited when a W's queue was full
+ /*
+ A sorted array of the Workers' current assignement numbers to provide
+ approximate view on Workers loading.
+ The first row of the least occupied Worker is queried at assigning
+ a new partition. Is updated at checkpoint commit to the main RLI.
+ */
+ DYNAMIC_ARRAY least_occupied_workers;
+ /* end of MTS statistics */
+
/* most of allocation in the coordinator rli is there */
void init_workers(ulong);
@@ -576,6 +579,10 @@ public:
*/
void reset_notified_checkpoint(ulong, time_t);
+ /*
+ * End of MTS section ******************************************************/
+
+
/**
Helper function to do after statement completion.
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2011-07-01 08:45:11 +0000
+++ b/sql/rpl_rli_pdb.cc 2011-07-01 13:41:35 +0000
@@ -58,9 +58,9 @@ ulong mts_partition_hash_soft_max= 16;
Slave_worker::Slave_worker(const char* type, const char* pfs,
Relay_log_info *rli)
: Relay_log_info(FALSE), c_rli(rli),
- checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0),
- inited_group_execed(0), processed_group(0), running_status(NOT_RUNNING),
- last_event(NULL)
+ checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0),
+ inited_group_executed(0), checkpoint_seqno(0), running_status(NOT_RUNNING),
+ inited_curr_group_exec_parts(0)
{
checkpoint_relay_log_name[0]= 0;
checkpoint_master_log_name[0]= 0;
@@ -68,15 +68,72 @@ Slave_worker::Slave_worker(const char* t
Slave_worker::~Slave_worker()
{
- delete_dynamic(&curr_group_exec_parts);
+ if (inited_curr_group_exec_parts)
+ delete_dynamic(&curr_group_exec_parts);
- if (inited_group_execed)
+ if (inited_group_executed)
{
- bitmap_free(&group_execed);
+ bitmap_free(&group_executed);
bitmap_free(&group_shifted);
}
}
+/**
+ Method is executed by Coordinator at Worker startup time to initialize
+ members parly with values supplied by Coordinator through rli.
+
+ @param rli Coordinator's Relay_log_info pointer
+ @param i identifier of the Worker
+
+ @return 0 success
+ non-zero failure
+*/
+int Slave_worker::init_worker(Relay_log_info * rli, ulong i)
+{
+ uint k;
+ Slave_job_item empty= {NULL};
+
+ c_rli= rli;
+ if (init_info())
+ return 1;
+
+ id= i;
+ curr_group_exec_parts.elements= 0;
+ relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
+ checkpoint_notified= FALSE;
+ bitmap_shifted= 0;
+ workers= c_rli->workers; // shallow copying is sufficient
+ wq_size_waits= groups_done= events_done= curr_jobs= 0;
+ usage_partition= 0;
+ last_group_done_index= c_rli->gaq->size; // out of range
+
+ jobs.avail= 0;
+ jobs.len= 0;
+ jobs.overfill= FALSE; // todo: move into Slave_jobs_queue constructor
+ jobs.waited_overfill= 0;
+ jobs.entry= jobs.size= c_rli->mts_slave_worker_queue_len_max;
+ curr_group_seen_begin= FALSE;
+
+ my_init_dynamic_array(&jobs.Q, sizeof(Slave_job_item), jobs.size, 0);
+ for (k= 0; k < jobs.size; k++)
+ insert_dynamic(&jobs.Q, (uchar*) &empty);
+
+ DBUG_ASSERT(jobs.Q.elements == jobs.size);
+
+ wq_overrun_set= FALSE;
+
+#ifdef HAVE_PSI_INTERFACE
+ mysql_mutex_init(key_mutex_slave_parallel_worker[i], &jobs_lock,
+ MY_MUTEX_INIT_FAST);
+ mysql_cond_init(key_cond_slave_parallel_worker[i], &jobs_cond, NULL);
+#else
+ mysql_mutex_init(NULL, &jobs_lock, MY_MUTEX_INIT_FAST);
+ mysql_cond_init(NULL, &jobs_cond, NULL);
+#endif
+
+ return 0;
+}
+
int Slave_worker::init_info()
{
int necessary_to_configure= 0;
@@ -88,8 +145,9 @@ int Slave_worker::init_info()
my_init_dynamic_array(&curr_group_exec_parts, sizeof(db_worker_hash_entry*),
SLAVE_INIT_DBS_IN_GROUP, 1);
-
- if (bitmap_init(&group_execed, NULL,
+ if (curr_group_exec_parts.max_element != 0)
+ inited_curr_group_exec_parts= 1;
+ if (bitmap_init(&group_executed, NULL,
c_rli->checkpoint_group, FALSE))
goto err;
@@ -97,7 +155,7 @@ int Slave_worker::init_info()
c_rli->checkpoint_group, FALSE))
goto err;
- inited_group_execed= 1;
+ inited_group_executed= 1;
/*
The init_info() is used to either create or read information
@@ -171,7 +229,7 @@ bool Slave_worker::read_info(Rpl_info_ha
ulong temp_checkpoint_master_log_pos= 0;
ulong temp_checkpoint_seqno= 0;
ulong nbytes= 0;
- uchar *buffer= (uchar *) group_execed.bitmap;
+ uchar *buffer= (uchar *) group_executed.bitmap;
if (from->prepare_info_for_read(nidx))
DBUG_RETURN(TRUE);
@@ -216,8 +274,8 @@ bool Slave_worker::write_info(Rpl_info_h
{
DBUG_ENTER("Master_info::write_info");
- ulong nbytes= (ulong) no_bytes_in_map(&group_execed);
- uchar *buffer= (uchar*) group_execed.bitmap;
+ ulong nbytes= (ulong) no_bytes_in_map(&group_executed);
+ uchar *buffer= (uchar*) group_executed.bitmap;
if (to->prepare_info_for_write(nidx) ||
to->set_info(group_relay_log_name) ||
@@ -261,12 +319,12 @@ bool Slave_worker::commit_positions(Log_
my_free(ptr_g->checkpoint_relay_log_name);
ptr_g->checkpoint_relay_log_name= NULL;
- bitmap_copy(&group_shifted, &group_execed);
- bitmap_clear_all(&group_execed);
+ bitmap_copy(&group_shifted, &group_executed);
+ bitmap_clear_all(&group_executed);
for (uint pos= ptr_g->shifted; pos < c_rli->checkpoint_group; pos++)
{
if (bitmap_is_set(&group_shifted, pos))
- bitmap_set_bit(&group_execed, pos - ptr_g->shifted);
+ bitmap_set_bit(&group_executed, pos - ptr_g->shifted);
}
}
/*
@@ -282,7 +340,7 @@ bool Slave_worker::commit_positions(Log_
DBUG_ASSERT(ptr_g->checkpoint_seqno <= (c_rli->checkpoint_group - 1));
- bitmap_set_bit(&group_execed, ptr_g->checkpoint_seqno);
+ bitmap_set_bit(&group_executed, ptr_g->checkpoint_seqno);
checkpoint_seqno= ptr_g->checkpoint_seqno;
group_relay_log_pos= ev->future_event_relay_log_pos;
group_master_log_pos= ev->log_pos;
@@ -505,7 +563,8 @@ static void move_temp_tables_to_entry(TH
CGAP .= D
- and a possible D's Worker id is searched in Assigne Partition Hash
+ here .= is concatenate operation,
+ and a possible D's Worker id is searched in Assigned Partition Hash
(APH) that collects tuples (P, W_id, U, mutex, cond).
In case not found,
@@ -539,6 +598,8 @@ static void move_temp_tables_to_entry(TH
@param ptr_entry reference to a pointer to the resulted entry in
the Assigne Partition Hash where
the entry's pointer is stored at return.
+ @param need_temp_tables
+ if FALSE migration of temporary tables not needed
@param last_worker caller opts for this Worker, it must be
rli->last_assigned_worker if one is determined.
@@ -613,7 +674,8 @@ Slave_worker *map_db_to_worker(const cha
*/
if (!(db= (char *) my_malloc((size_t) dblength + 1, MYF(0))))
goto err;
- if (!(entry= (db_worker_hash_entry *) my_malloc(sizeof(db_worker_hash_entry), MYF(0))))
+ if (!(entry= (db_worker_hash_entry *)
+ my_malloc(sizeof(db_worker_hash_entry), MYF(0))))
{
my_free(db);
goto err;
@@ -801,9 +863,9 @@ void Slave_worker::slave_worker_ends_gro
if (!error)
{
+ Slave_committed_queue *gaq= c_rli->gaq;
ulong gaq_idx= ev->mts_group_cnt;
- Slave_job_group *ptr_g=
- (Slave_job_group *) dynamic_array_ptr(&c_rli->gaq->Q, gaq_idx);
+ Slave_job_group *ptr_g= gaq->get_job_group(gaq_idx);
// first ever group must have relay log name
DBUG_ASSERT(last_group_done_index != c_rli->gaq->size ||
@@ -824,6 +886,7 @@ void Slave_worker::slave_worker_ends_gro
ptr_g->done= 1; // GAQ index is available to C now
last_group_done_index= gaq_idx;
+ groups_done++;
}
/*
@@ -903,67 +966,79 @@ void Slave_worker::slave_worker_ends_gro
/**
- Class circular_buffer_queue
+ Class circular_buffer_queue.
+
+ Content of the being dequeued item is copied to the arg-pointer
+ location.
+
+ @return the queue's array index that the de-queued item
+ located at, or an error as an int outside the legacy
+ [0, size) (value `size' is excluded) range.
*/
ulong circular_buffer_queue::de_queue(uchar *val)
{
ulong ret;
- if (enter == size)
+ if (entry == size)
{
DBUG_ASSERT(len == 0);
return (ulong) -1;
}
- ret= enter;
- get_dynamic(&Q, val, enter);
+ ret= entry;
+ get_dynamic(&Q, val, entry);
len--;
// pre boundary cond
if (avail == size)
- avail= enter;
- enter= (enter + 1) % size;
+ avail= entry;
+ entry= (entry + 1) % size;
// post boundary cond
- if (avail == enter)
- enter= size;
+ if (avail == entry)
+ entry= size;
- DBUG_ASSERT(enter == size ||
- (len == (avail >= enter)? (avail - enter) :
- (size + avail - enter)));
- DBUG_ASSERT(avail != enter);
+ DBUG_ASSERT(entry == size ||
+ (len == (avail >= entry)? (avail - entry) :
+ (size + avail - entry)));
+ DBUG_ASSERT(avail != entry);
return ret;
}
/**
- removing an item from the tail side
+ Similar to de_queue() but removing an item from the tail side.
+
+ return the queue's array index that the de-queued item
+ located at, or an error.
*/
ulong circular_buffer_queue::de_tail(uchar *val)
{
- if (enter == size)
+ if (entry == size)
{
DBUG_ASSERT(len == 0);
return (ulong) -1;
}
- avail= (enter + len - 1) % size;
+ avail= (entry + len - 1) % size;
get_dynamic(&Q, val, avail);
len--;
// post boundary cond
- if (avail == enter)
- enter= size;
+ if (avail == entry)
+ entry= size;
- DBUG_ASSERT(enter == size ||
- (len == (avail >= enter)? (avail - enter) :
- (size + avail - enter)));
- DBUG_ASSERT(avail != enter);
+ DBUG_ASSERT(entry == size ||
+ (len == (avail >= entry)? (avail - entry) :
+ (size + avail - entry)));
+ DBUG_ASSERT(avail != entry);
return avail;
}
+
/**
- @return the used index at success or -1 when queue is full
+ @return the index where the arg item has been located
+ or an error.
*/
ulong circular_buffer_queue::en_queue(void *item)
{
@@ -981,20 +1056,20 @@ ulong circular_buffer_queue::en_queue(vo
// pre-boundary cond
- if (enter == size)
- enter= avail;
+ if (entry == size)
+ entry= avail;
avail= (avail + 1) % size;
len++;
// post-boundary cond
- if (avail == enter)
+ if (avail == entry)
avail= size;
- DBUG_ASSERT(avail == enter ||
- len == (avail >= enter) ?
- (avail - enter) : (size + avail - enter));
- DBUG_ASSERT(avail != enter);
+ DBUG_ASSERT(avail == entry ||
+ len == (avail >= entry) ?
+ (avail - entry) : (size + avail - entry));
+ DBUG_ASSERT(avail != entry);
return ret;
}
@@ -1002,13 +1077,13 @@ ulong circular_buffer_queue::en_queue(vo
void* circular_buffer_queue::head_queue()
{
uchar *ret= NULL;
- if (enter == size)
+ if (entry == size)
{
DBUG_ASSERT(len == 0);
}
else
{
- get_dynamic(&Q, (uchar*) ret, enter);
+ get_dynamic(&Q, (uchar*) ret, entry);
}
return (void*) ret;
}
@@ -1027,15 +1102,15 @@ void* circular_buffer_queue::head_queue(
bool circular_buffer_queue::gt(ulong i, ulong k)
{
DBUG_ASSERT(i < size && k < size);
- DBUG_ASSERT(avail != enter);
+ DBUG_ASSERT(avail != entry);
- if (i >= enter)
- if (k >= enter)
+ if (i >= entry)
+ if (k >= entry)
return i > k;
else
return FALSE;
else
- if (k >= enter)
+ if (k >= entry)
return TRUE;
else
return i > k;
@@ -1046,7 +1121,7 @@ bool Slave_committed_queue::count_done(R
{
ulong i, cnt= 0;
- for (i= enter; i != avail && !empty(); i= (i + 1) % size)
+ for (i= entry; i != avail && !empty(); i= (i + 1) % size)
{
Slave_job_group *ptr_g;
@@ -1090,7 +1165,7 @@ ulong Slave_committed_queue::move_queue_
{
ulong i, cnt= 0;
- for (i= enter; i != avail && !empty();)
+ for (i= entry; i != avail && !empty();)
{
Slave_worker *w_i;
Slave_job_group *ptr_g, g;
@@ -1098,7 +1173,8 @@ ulong Slave_committed_queue::move_queue_
ulong ind;
#ifndef DBUG_OFF
- if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) && cnt == mts_checkpoint_period)
+ if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) &&
+ cnt == mts_checkpoint_period)
return cnt;
#endif
@@ -1175,14 +1251,14 @@ ulong Slave_committed_queue::move_queue_
}
/**
- Method should be executed at slave system shutdown to
+ Method should be executed at slave system stop to
cleanup dynamically allocated items that remained as unprocessed
by shutdown time.
*/
void Slave_committed_queue::free_dynamic_items()
{
ulong i;
- for (i= enter; i != avail && !empty(); i= (i + 1) % size)
+ for (i= entry; i != avail && !empty(); i= (i + 1) % size)
{
Slave_job_group *ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);
if (ptr_g->group_relay_log_name)
@@ -1200,18 +1276,11 @@ void Slave_committed_queue::free_dynamic
}
}
-void Slave_worker::do_report(loglevel level, int err_code, const char *msg, va_list vargs) const
-{
- c_rli->do_report(level, err_code, msg, vargs);
-}
-void Slave_worker::report(loglevel level, int err_code, const char *msg, ...) const
+void Slave_worker::do_report(loglevel level, int err_code, const char *msg,
+ va_list args) const
{
- va_list vargs;
- va_start(vargs, msg);
-
- do_report(level, err_code, msg, vargs);
- va_end(vargs);
+ c_rli->va_report(level, err_code, msg, args);
}
/**
@@ -1316,18 +1385,18 @@ static int en_queue(Slave_jobs_queue *jo
set_dynamic(&jobs->Q, (uchar*) item, jobs->avail);
// pre-boundary cond
- if (jobs->enter == jobs->size)
- jobs->enter= jobs->avail;
+ if (jobs->entry == jobs->size)
+ jobs->entry= jobs->avail;
jobs->avail= (jobs->avail + 1) % jobs->size;
jobs->len++;
// post-boundary cond
- if (jobs->avail == jobs->enter)
+ if (jobs->avail == jobs->entry)
jobs->avail= jobs->size;
- DBUG_ASSERT(jobs->avail == jobs->enter ||
- jobs->len == (jobs->avail >= jobs->enter) ?
- (jobs->avail - jobs->enter) : (jobs->size + jobs->avail - jobs->enter));
+ DBUG_ASSERT(jobs->avail == jobs->entry ||
+ jobs->len == (jobs->avail >= jobs->entry) ?
+ (jobs->avail - jobs->entry) : (jobs->size + jobs->avail - jobs->entry));
return jobs->avail;
}
@@ -1336,13 +1405,13 @@ static int en_queue(Slave_jobs_queue *jo
*/
static void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
{
- if (jobs->enter == jobs->size)
+ if (jobs->entry == jobs->size)
{
DBUG_ASSERT(jobs->len == 0);
ret->data= NULL; // todo: move to caller
return NULL;
}
- get_dynamic(&jobs->Q, (uchar*) ret, jobs->enter);
+ get_dynamic(&jobs->Q, (uchar*) ret, jobs->entry);
DBUG_ASSERT(ret->data); // todo: move to caller
@@ -1355,26 +1424,27 @@ static void * head_queue(Slave_jobs_queu
*/
Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
{
- if (jobs->enter == jobs->size)
+ if (jobs->entry == jobs->size)
{
DBUG_ASSERT(jobs->len == 0);
return NULL;
}
- get_dynamic(&jobs->Q, (uchar*) ret, jobs->enter);
+ get_dynamic(&jobs->Q, (uchar*) ret, jobs->entry);
jobs->len--;
// pre boundary cond
if (jobs->avail == jobs->size)
- jobs->avail= jobs->enter;
- jobs->enter= (jobs->enter + 1) % jobs->size;
+ jobs->avail= jobs->entry;
+ jobs->entry= (jobs->entry + 1) % jobs->size;
// post boundary cond
- if (jobs->avail == jobs->enter)
- jobs->enter= jobs->size;
+ if (jobs->avail == jobs->entry)
+ jobs->entry= jobs->size;
- DBUG_ASSERT(jobs->enter == jobs->size ||
- (jobs->len == (jobs->avail >= jobs->enter)? (jobs->avail - jobs->enter) :
- (jobs->size + jobs->avail - jobs->enter)));
+ DBUG_ASSERT(jobs->entry == jobs->size ||
+ (jobs->len == (jobs->avail >= jobs->entry) ?
+ (jobs->avail - jobs->entry) :
+ (jobs->size + jobs->avail - jobs->entry)));
return ret;
}
@@ -1423,7 +1493,7 @@ void append_item_to_jobs(slave_job_item
}
rli->pending_jobs++;
rli->mts_pending_jobs_size= new_pend_size;
- rli->stmt_jobs++;
+ rli->mts_events_assigned++;
mysql_mutex_unlock(&rli->pending_jobs_lock);
@@ -1621,13 +1691,12 @@ int slave_worker_exec_job(Slave_worker *
worker->slave_worker_ends_group(ev, error); /* last done sets post exec */
#ifndef DBUG_OFF
- worker->processed_group++;
DBUG_PRINT("mts", ("Check_slave_debug_group worker %lu mts_checkpoint_group"
- " %u processed %u debug %d\n", worker->id, mts_checkpoint_group,
- worker->processed_group,
+ " %u processed %lu debug %d\n", worker->id, mts_checkpoint_group,
+ worker->groups_done,
DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0)));
if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) &&
- mts_checkpoint_group == worker->processed_group)
+ mts_checkpoint_group == worker->groups_done)
{
DBUG_PRINT("mts", ("Putting worker %lu in busy wait.", worker->id));
while (true) my_sleep(6000000);
@@ -1693,7 +1762,7 @@ int slave_worker_exec_job(Slave_worker *
mysql_mutex_unlock(&rli->pending_jobs_lock);
- worker->stmt_jobs++;
+ worker->events_done++;
err:
if (error)
@@ -1705,13 +1774,10 @@ err:
worker->slave_worker_ends_group(ev, error);
}
- // rows_query_log_event is deleted as a part of the statement cleanup
-
- // todo: sync_slave_with_master fails when my_sleep(1000) is put here
+ // todo: similate delay in delete
if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
{
- worker->last_event= ev;
delete ev;
}
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2011-06-27 17:31:45 +0000
+++ b/sql/rpl_rli_pdb.h 2011-07-01 12:48:25 +0000
@@ -7,7 +7,7 @@
#include <my_sys.h>
#include <my_bitmap.h>
-/* APH entry */
+/* Assigned Partition Hash (APH) entry */
typedef struct st_db_worker_hash_entry
{
uint db_len;
@@ -18,12 +18,13 @@ typedef struct st_db_worker_hash_entry
The list of temp tables belonging to @ db database is
attached to an assigned @c worker to become its thd->temporary_tables.
The list is updated with every ddl incl CREATE, DROP.
- It is removed from the entry and merged to the coordinator's thd->temporary_tables
- in case of events: slave stops, the db-to-worker hash oversize.
+ It is removed from the entry and merged to the coordinator's
+ thd->temporary_tables in case of events: slave stops, APH oversize.
*/
TABLE* volatile temporary_tables;
- /* todo: relax concurrency after making APH mutex/cond pair has worked
+ /* todo: relax concurrency to mimic record-level locking.
+ That is to augmenting the entry with mutex/cond pair
pthread_mutex_t
pthread_cond_t
timestamp updated_at; */
@@ -64,14 +65,14 @@ public:
DYNAMIC_ARRAY Q;
ulong size; // the Size of the queue in terms of element
ulong avail; // first Available index to append at (next to tail)
- ulong enter; // the head index
+ ulong entry; // the head index or the entry point to the queue.
volatile ulong len; // actual length
bool inited_queue;
circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
- size(max), avail(0), enter(max), len(0), inited_queue(FALSE)
+ size(max), avail(0), entry(max), len(0), inited_queue(FALSE)
{
- DBUG_ASSERT(size < ULONG_MAX);
+ DBUG_ASSERT(size < (ulong) -1);
if (!my_init_dynamic_array(&Q, el_size, size, alloc_inc))
inited_queue= TRUE;
}
@@ -99,9 +100,7 @@ public:
/**
return the index where the arg item locates
or an error encoded as a value in beyond of the legacy range
- [0, circular_buffer_max_index].
-
- Todo: define the range.
+ [0, size) (value `size' is excluded).
*/
ulong en_queue(void *item);
/**
@@ -111,8 +110,8 @@ public:
bool gt(ulong i, ulong k); // comparision of ordering of two entities
/* index is within the valid range */
bool in(ulong k) { return !empty() &&
- (enter > avail ? (k >= enter || k < avail) : (k >= enter && k < avail)); }
- bool empty() { return enter == size; }
+ (entry > avail ? (k >= entry || k < avail) : (k >= entry && k < avail)); }
+ bool empty() { return entry == size; }
bool full() { return avail == size; }
};
@@ -132,13 +131,12 @@ typedef struct st_slave_job_group
*/
char *group_relay_log_name; // The value is last seen relay-log
my_off_t group_relay_log_pos; // filled by W
-
ulong worker_id;
Slave_worker *worker;
ulonglong total_seqno;
my_off_t master_log_pos; // B-event log_pos
- /* checkpoint coord are reset by CP and rotate:s */
+ /* checkpoint coord are reset by periodical and special (Rotate event) CP:s */
uint checkpoint_seqno;
my_off_t checkpoint_log_pos; // T-event lop_pos filled by W for CheckPoint
char* checkpoint_log_name;
@@ -210,6 +208,15 @@ public:
ulong move_queue_head(DYNAMIC_ARRAY *ws);
/* Method is for slave shutdown time cleanup */
void free_dynamic_items();
+ /*
+ returns a pointer to Slave_job_group struct instance as indexed by arg
+ in the circular buffer dyn-array
+ */
+ Slave_job_group* get_job_group(ulong ind)
+ {
+ return (Slave_job_group*) dynamic_array_ptr(&Q, ind);
+ }
+
};
class Slave_jobs_queue : public circular_buffer_queue
@@ -228,50 +235,43 @@ public:
Relay_log_info *rli);
virtual ~Slave_worker();
- mysql_mutex_t jobs_lock;
- mysql_cond_t jobs_cond;
- Slave_jobs_queue jobs;
-
- Relay_log_info *c_rli;
-
- DYNAMIC_ARRAY curr_group_exec_parts; // CGEP
-
- bool curr_group_seen_begin; // is set to TRUE with B-event at Worker exec
- List<Log_event> data_in_use; // events are still in use by SQL thread
- ulong id;
- TABLE *current_table;
-
- // rbr
- // RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */
- // uint tables_to_lock_count; /* RBR: Count of tables to lock */
- // table_mapping m_table_map; /* RBR: Mapping table-id to table */
-
- // statictics
+ Slave_jobs_queue jobs; // assignment queue containing events to execute
+ mysql_mutex_t jobs_lock; // mutex for the jobs queue
+ mysql_cond_t jobs_cond; // condition variable for the jobs queue
+ Relay_log_info *c_rli; // pointer to Coordinator's rli
+ DYNAMIC_ARRAY curr_group_exec_parts; // Current Group Executed Partitions
+ bool curr_group_seen_begin; // is set to TRUE with explicit B-event
+ ulong id; // numberic identifier of the Worker
/*
- @c last_group_done_index is for statistics
- to mean the index in GAQ of the last processed group.
+ Worker runtime statictics
*/
- volatile ulong last_group_done_index; // it's index in GAQ
- ulong wq_empty_waits; // to gather statistics how many times got idle
- ulong stmt_jobs; // how many jobs per stmt
- ulong trans_jobs; // how many jobs per trns
- volatile int curr_jobs; // the current assignments
- long usage_partition; // number of different partitions handled by this worker
+ // the index in GAQ of the last processed group by this Worker
+ volatile ulong last_group_done_index;
+ ulong wq_empty_waits; // how many times got idle
+ ulong events_done; // how many events (statements) processed
+ ulong groups_done; // how many groups (transactions) processed
+ volatile int curr_jobs; // number of active assignments
+ // number of partitions allocated to the worker at point in time
+ long usage_partition;
+
volatile bool relay_log_change_notified; // Coord sets and resets, W can read
volatile bool checkpoint_notified; // Coord sets and resets, W can read
- ulong bitmap_shifted; // shift the last bitmap at receiving new CP
- bool wq_overrun_set; // W monitors its queue usage to incr/decr rli->mts_wqs_overrun
+ ulong bitmap_shifted; // shift the last bitmap at receiving new CP
+ bool wq_overrun_set; // W marks inself as incrementer of rli->mts_wq_excess
+
+ /*
+ Coordinatates of the last CheckPoint (CP) this Worker has
+ acknowledged; part of is persisent data
+ */
char checkpoint_relay_log_name[FN_REFLEN];
ulonglong checkpoint_relay_log_pos;
-
char checkpoint_master_log_name[FN_REFLEN];
ulonglong checkpoint_master_log_pos;
- ulong checkpoint_seqno;
- MY_BITMAP group_execed;
- MY_BITMAP group_shifted;
- bool inited_group_execed;
- uint processed_group;
+ MY_BITMAP group_executed; // bitmap describes groups executed after last CP
+ MY_BITMAP group_shifted; // temporary bitmap to compute group_executed
+ bool inited_group_executed;
+ ulong checkpoint_seqno; // the most significant ON bit in group_executed
enum en_running_state
{
NOT_RUNNING= 0,
@@ -279,26 +279,24 @@ public:
KILLED
};
en_running_state volatile running_status;
- Log_event *last_event;
+ bool inited_curr_group_exec_parts;
+ int init_worker(Relay_log_info*, ulong);
int init_info();
void end_info();
int flush_info(bool force= FALSE);
-
size_t get_number_worker_fields();
-
- void slave_worker_ends_group(Log_event*, int); // CGEP walk through to upd APH
-
+ void slave_worker_ends_group(Log_event*, int);
bool commit_positions(Log_event *evt, Slave_job_group *ptr_g);
- void report(loglevel level, int err_code, const char *msg, ...) const
- ATTRIBUTE_FORMAT(printf, 4, 5);
- void do_report(loglevel level, int err_code, const char *msg, va_list vargs) const;
+protected:
+
+ virtual void do_report(loglevel level, int err_code,
+ const char *msg, va_list v_args) const;
private:
bool read_info(Rpl_info_handler *from);
bool write_info(Rpl_info_handler *to);
-
Slave_worker& operator=(const Slave_worker& info);
Slave_worker(const Slave_worker& info);
};
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2011-07-01 08:45:11 +0000
+++ b/sql/rpl_slave.cc 2011-07-01 13:41:35 +0000
@@ -84,24 +84,22 @@ const char *relay_log_basename= 0;
of Relay_log_info::gaq (see @c slave_start_workers()).
It can be set to any value in [1, ULONG_MAX - 1] range.
*/
-ulong mts_slave_worker_queue_len_max= 32768;
+const ulong mts_slave_worker_queue_len_max= 32768;
/*
MTS load-ballancing parameter.
Time in microsecs to sleep by MTS Coordinator to avoid the Worker queues
room overrun.
*/
-ulong mts_coordinator_basic_nap= 5;
+const ulong mts_coordinator_basic_nap= 5;
/*
MTS load-ballancing parameter.
Percent of Worker queue size at which Worker is considered to become
hungry.
*/
-ulong mts_worker_underrun_level= 10;
-
-
-
+const ulong mts_worker_underrun_level= 10;
+Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
void append_item_to_jobs(slave_job_item *job_item,
Slave_worker *w, Relay_log_info *rli);
@@ -2820,10 +2818,11 @@ int apply_event_and_update_pos(Log_event
// all events except BEGIN-query must be marked with a non-NULL Worker
DBUG_ASSERT(((Slave_worker*) ev->worker) == rli->last_assigned_worker);
- DBUG_PRINT("Log_event::apply_event:", ("-> job item data %p to W_%lu", job_item->data, w->id));
+ DBUG_PRINT("Log_event::apply_event:",
+ ("-> job item data %p to W_%lu", job_item->data, w->id));
// Reset mts in-group state
- if (ev->ends_group() || !rli->curr_group_seen_begin)
+ if (rli->mts_group_status == Relay_log_info::MTS_END_GROUP)
{
// CGAP cleanup
for (uint i= rli->curr_group_assigned_parts.elements; i > 0; i--)
@@ -2943,12 +2942,7 @@ int apply_event_and_update_pos(Log_event
else
{
DBUG_ASSERT(*ptr_ev == ev || rli->is_parallel_exec());
- /*
- event_relay_log_pos is an anchor to possible reading restart.
- It may become lt than group_* value.
- However event_relay_log_pos does not affect group_relay_log_pos
- othen that through the sequentially executed events or via checkpoint.
- */
+
rli->inc_event_relay_log_pos();
}
@@ -3074,15 +3068,14 @@ static int exec_relay_log_event(THD* thd
/*
This tests if the position of the beginning of the current event
hits the UNTIL barrier.
- MTS: since master,relay-group coordinates change per checkpoint
- at the end of the checkpoint interval UNTIL can be left far behind.
+ MTS: since the master and the relay-group coordinates change
+ asynchronously logics of rli->is_until_satisfied() can't apply.
Hence, UNTIL forces the sequential applying.
*/
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
rli->is_until_satisfied(thd, ev))
{
char buf[22];
-
sql_print_information("Slave SQL thread stopped because it reached its"
" UNTIL position %s", llstr(rli->until_pos(), buf));
/*
@@ -3137,7 +3130,7 @@ static int exec_relay_log_event(THD* thd
ev= NULL;
}
}
-
+
/*
update_log_pos failed: this should not happen, so we don't
retry.
@@ -3754,11 +3747,8 @@ int check_temp_dir(char* tmp_file)
DBUG_RETURN(0);
}
-Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
-
/*
- Worker thread for the parallel execution of the replication events
- MHS_todo: consider how to handle error
+ Worker thread for the parallel execution of the replication events.
*/
pthread_handler_t handle_slave_worker(void *arg)
{
@@ -3848,8 +3838,9 @@ pthread_handler_t handle_slave_worker(vo
sql_print_information("Worker %lu statistics: "
"events processed = %lu "
"hungry waits = %lu "
- "priv queue overfills = %llu "
- ,w->id, w->stmt_jobs, w->wq_size_waits, w->jobs.waited_overfill);
+ "priv queue overfills = %llu ",
+ w->id, w->events_done, w->wq_size_waits,
+ w->jobs.waited_overfill);
mysql_cond_signal(&w->jobs_cond); // famous last goodbye
mysql_mutex_unlock(&w->jobs_lock);
@@ -3935,7 +3926,8 @@ bool mts_recovery_groups(Relay_log_info
Slave_worker *worker=
Rpl_info_factory::create_worker(opt_worker_repository_id, id, rli);
worker->init_info();
- LOG_POS_COORD w_last= {worker->group_master_log_name, worker->group_master_log_pos};
+ LOG_POS_COORD w_last= { const_cast<char*>(worker->get_group_master_log_name()),
+ worker->get_group_master_log_pos() };
if (mts_event_coord_cmp(&w_last, &cp) > 0)
{
/*
@@ -3990,16 +3982,17 @@ bool mts_recovery_groups(Relay_log_info
{
Slave_worker *w= ((Slave_job_group *)
dynamic_array_ptr(&above_lwm_jobs, it_job))->worker;
- LOG_POS_COORD w_last= { w->group_master_log_name, w->group_master_log_pos };
+ LOG_POS_COORD w_last= { const_cast<char*>(w->get_group_master_log_name()),
+ w->get_group_master_log_pos() };
sql_print_information("Recoverying relay log info based on Worker-Id %lu, "
"group_relay_log_name %s, group_relay_log_pos %lu "
"group_master_log_name %s, group_master_log_pos %lu",
w->id,
- w->group_relay_log_name,
- (ulong) w->group_relay_log_pos,
- w->group_master_log_name,
- (ulong) w->group_master_log_pos);
+ w->get_group_relay_log_name(),
+ (ulong) w->get_group_relay_log_pos(),
+ w->get_group_master_log_name(),
+ (ulong) w->get_group_master_log_pos());
recovery_group_cnt= 0;
not_reached_commit= true;
@@ -4052,14 +4045,14 @@ bool mts_recovery_groups(Relay_log_info
sql_print_information("Group Recoverying relay log info "
"group_master_log_name %s, event_master_log_pos %llu.",
- rli->group_master_log_name,
+ rli->get_group_master_log_name(),
ev->log_pos);
if ((ret= mts_event_coord_cmp(&ev_coord, &w_last)) == 0)
{
#ifndef DBUG_OFF
for (uint i= 0; i <= w->checkpoint_seqno; i++)
{
- if (bitmap_is_set(&w->group_execed, i))
+ if (bitmap_is_set(&w->group_executed, i))
DBUG_PRINT("mts", ("Bit %u is set.", i));
else
DBUG_PRINT("mts", ("Bit %u is not set.", i));
@@ -4073,7 +4066,7 @@ bool mts_recovery_groups(Relay_log_info
for (uint i= (w->checkpoint_seqno + 1) - recovery_group_cnt,
j= 0; i <= w->checkpoint_seqno; i++, j++)
{
- if (bitmap_is_set(&w->group_execed, i))
+ if (bitmap_is_set(&w->group_executed, i))
{
DBUG_PRINT("mts", ("Setting bit %u.", j));
bitmap_fast_test_and_set(groups, j);
@@ -4119,8 +4112,8 @@ err:
}
/**
- Processing rli->gaq to find out the low-water-mark coordinates
- stored into the cental recovery table.
+ Processing rli->gaq to find out the low-water-mark (lwm) coordinates
+ which is stored into the cental recovery table.
@param rli pointer to Relay-log-info of Coordinator
@param period period of processing GAQ, normally derived from
@@ -4164,12 +4157,6 @@ bool mts_checkpoint_routine(Relay_log_in
DBUG_RETURN(FALSE);
}
- /*
- This checks how many consecutive jobs where processed.
- If this value is different than zero the checkpoint
- routine can proceed. Otherwise, there is nothing to be
- done.
- */
do
{
cnt= rli->gaq->move_queue_head(&rli->workers);
@@ -4181,6 +4168,12 @@ bool mts_checkpoint_routine(Relay_log_in
} while (cnt == 0 && (rli->gaq->full() || force) &&
!DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) &&
(my_sleep(rli->mts_coordinator_basic_nap), 1));
+ /*
+ This checks how many consecutive jobs where processed.
+ If this value is different than zero the checkpoint
+ routine can proceed. Otherwise, there is nothing to be
+ done.
+ */
if (cnt == 0)
goto end;
@@ -4200,16 +4193,11 @@ bool mts_checkpoint_routine(Relay_log_in
mysql_mutex_lock(&rli->data_lock);
/*
- Coordinator::commit_positions() {
+ "Coordinator::commit_positions" {
- rli->gaq->lwm contains all but rli->group_master_log_name
-
- group_master_log_name is updated only by Coordinator and it can't change
- within checkpoint interval because Coordinator flushes the updated value
- at once.
- Note, unlike group_master_log_name, event_relay_log_pos is updated solely
- within Coordinator read loop context. Hence, it's possible at times
- event_rlp > group_rlp.
+ rli->gaq->lwm has been updated in move_queue_head() and
+ to contain all but rli->group_master_log_name which
+ is altered solely by Coordinator at special checkpoints.
*/
rli->set_group_master_log_pos(rli->gaq->lwm.group_master_log_pos);
rli->set_group_relay_log_pos(rli->gaq->lwm.group_relay_log_pos);
@@ -4244,6 +4232,8 @@ bool mts_checkpoint_routine(Relay_log_in
*/
rli->reset_notified_checkpoint(cnt, rli->gaq->lwm.ts);
+ /* end-of "Coordinator::"commit_positions" */
+
end:
#ifndef DBUG_OFF
if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0))
@@ -4255,17 +4245,18 @@ end:
}
/**
- A single Worker thread is forked out.
+ Instantiation of a Slave_worker and forking out a single Worker thread.
+ @param rli Coordinator's Relay_log_info pointer
+ @param i identifier of the Worker
+
@return 0 suppress or 1 if fails
*/
int slave_start_single_worker(Relay_log_info *rli, ulong i)
{
int error= 0;
- uint k;
pthread_t th;
Slave_worker *w= NULL;
- Slave_job_item empty= {NULL};
if (!(w=
Rpl_info_factory::create_worker(opt_worker_repository_id, i, rli)))
@@ -4275,52 +4266,14 @@ int slave_start_single_worker(Relay_log_
goto err;
}
- w->c_rli= rli;
- w->tables_to_lock= NULL;
- w->tables_to_lock_count= 0;
-
- if (w->init_info())
+ if (w->init_worker(rli, i))
{
sql_print_error("Failed during slave worker thread create");
error= 1;
goto err;
}
-
- w->curr_group_exec_parts.elements= 0;
- w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
- w->checkpoint_notified= FALSE;
- w->bitmap_shifted= 0;
- w->workers= rli->workers; // shallow copying is sufficient
- w->this_worker= w;
- w->wq_size_waits= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
- w->id= i;
- w->current_table= NULL;
- w->usage_partition= 0;
- w->last_group_done_index= rli->gaq->size; // out of range
-
- w->jobs.avail= 0;
- w->jobs.len= 0;
- w->jobs.overfill= FALSE; // todo: move into Slave_jobs_queue constructor
- w->jobs.waited_overfill= 0;
- w->jobs.enter= w->jobs.size= rli->mts_slave_worker_queue_len_max;
- my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.size, 0);
- for (k= 0; k < w->jobs.size; k++)
- insert_dynamic(&w->jobs.Q, (uchar*) &empty);
-
- DBUG_ASSERT(w->jobs.Q.elements == w->jobs.size);
-
- w->wq_overrun_set= FALSE;
set_dynamic(&rli->workers, (uchar*) &w, i);
-#ifdef HAVE_PSI_INTERFACE
- mysql_mutex_init(key_mutex_slave_parallel_worker[i], &w->jobs_lock,
- MY_MUTEX_INIT_FAST);
- mysql_cond_init(key_cond_slave_parallel_worker[i], &w->jobs_cond, NULL);
-#else
- mysql_mutex_init(NULL, &w->jobs_lock, MY_MUTEX_INIT_FAST);
- mysql_cond_init(NULL, &w->jobs_cond, NULL);
-#endif
- w->curr_group_seen_begin= FALSE;
if (pthread_create(&th, &connection_attrib, handle_slave_worker,
(void*) w))
{
@@ -4340,7 +4293,14 @@ err:
return error;
}
+/**
+ Initialization of the central rli members for Coordinator's role,
+ communication channels such as Assigned Partition Hash (APH),
+ and starting the Worker pool.
+ @return 0 success
+ non-zero as failure
+*/
int slave_start_workers(Relay_log_info *rli, ulong n)
{
uint i;
@@ -4354,7 +4314,8 @@ int slave_start_workers(Relay_log_info *
rli->init_workers(n);
// CGAP dynarray holds id:s of partitions of the Current being executed Group
- my_init_dynamic_array(&rli->curr_group_assigned_parts, sizeof(db_worker_hash_entry*),
+ my_init_dynamic_array(&rli->curr_group_assigned_parts,
+ sizeof(db_worker_hash_entry*),
SLAVE_INIT_DBS_IN_GROUP, 1);
rli->last_assigned_worker= NULL; // associated with curr_group_assigned
my_init_dynamic_array(&rli->curr_group_da, sizeof(Log_event*), 8, 2);
@@ -4364,13 +4325,13 @@ int slave_start_workers(Relay_log_info *
/*
GAQ queue holds seqno:s of scheduled groups. C polls workers in
@c lwm_checkpoint_period to update GAQ (see @c next_event())
- The length of GAQ is derived from @c opt_mts_slave_worker_queue_len_max to guarantee
- each assigned job being sent to a WQ will find room in GAQ.
+ The length of GAQ is derived from @c opt_mts_slave_worker_queue_len_max
+ to guarantee each assigned job being sent to a WQ will find room in GAQ.
mts_slave_worker_queue_len_max * num-of-W:s is the max length case
all jobs contain one event.
*/
- // size of WQ stays fixed in one slave session
+ // length of WQ is actually constant though can be made configurable
rli->mts_slave_worker_queue_len_max= mts_slave_worker_queue_len_max;
rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(),
sizeof(Slave_job_group),
@@ -4387,7 +4348,6 @@ int slave_start_workers(Relay_log_info *
rli->mts_wq_oversize= FALSE;
rli->mts_coordinator_basic_nap= mts_coordinator_basic_nap;
rli->mts_worker_underrun_level= mts_worker_underrun_level;
- rli->mts_total_groups= 0;
rli->curr_group_seen_begin= FALSE;
rli->curr_group_isolated= FALSE;
rli->checkpoint_seqno= 0;
@@ -4526,7 +4486,7 @@ void slave_stop_workers(Relay_log_info *
"waited due a Worker queue full = %lu "
"waited due the total size = %lu "
"sleept when Workers occupied = %lu ",
- rli->stmt_jobs, rli->mts_wq_overrun_cnt,
+ rli->mts_events_assigned, rli->mts_wq_overrun_cnt,
rli->mts_wq_overfill_cnt, rli->wq_size_waits,
rli->mts_wq_no_underrun_cnt);
@@ -4586,7 +4546,6 @@ pthread_handler_t handle_slave_sql(void
rli->slave_running = 1;
pthread_detach_this_thread();
-
if (init_slave_thread(thd, SLAVE_THD_SQL))
{
/*
@@ -4876,7 +4835,7 @@ llstr(rli->get_group_master_log_pos(), l
thd->reset_query();
thd->reset_db(NULL, 0);
- slave_stop_workers(rli); // mts-II: stopping the worker pool
+ slave_stop_workers(rli);
THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit);
mysql_mutex_lock(&rli->run_lock);
@@ -7222,7 +7181,7 @@ bool change_master(THD* thd, Master_info
in-memory value at restart (thus causing errors, as the old relay log does
not exist anymore).
- Notice that there are not writes to the rli table as slave is not
+ Notice that the rli table is available exclusively as slave is not
running.
*/
DBUG_ASSERT(!mi->rli->slave_running);
=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h 2011-06-29 07:04:19 +0000
+++ b/sql/sql_class.h 2011-07-01 13:41:35 +0000
@@ -1945,7 +1945,8 @@ public:
/*
MTS: accessor to binlog_accessed_db_names list
*/
- List<char> * get_binlog_accessed_db_names() {
+ List<char> * get_binlog_accessed_db_names()
+ {
return binlog_accessed_db_names;
}
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-next-mr-wl5569 branch (andrei.elkin:3329) | Andrei Elkin | 4 Jul |