List: Commits
From: Andrei Elkin  Date: July 1 2011 1:42pm
Subject: bzr push into mysql-next-mr-wl5569 branch (andrei.elkin:3329)
 3329 Andrei Elkin	2011-07-01 [merge]
      wl5569 MTS
      
      merging from the main repo.

    removed:
      mysql-test/suite/innodb/include/innodb_stats_bootstrap.inc
      storage/innobase/scripts/
      storage/innobase/scripts/persistent_storage.sql
    added:
      mysql-test/r/join_cache_jcl0.result
      mysql-test/r/mysql_embedded.result
      mysql-test/suite/innodb/r/innodb_buffer_pool_load.result
      mysql-test/suite/innodb/t/innodb_buffer_pool_load-master.opt
      mysql-test/suite/innodb/t/innodb_buffer_pool_load.test
      mysql-test/suite/perfschema/r/table_schema.result
      mysql-test/suite/perfschema/t/table_schema.test
      mysql-test/suite/sys_vars/r/innodb_buffer_pool_dump_at_shutdown_basic.result
      mysql-test/suite/sys_vars/r/innodb_buffer_pool_dump_now_basic.result
      mysql-test/suite/sys_vars/r/innodb_buffer_pool_filename_basic.result
      mysql-test/suite/sys_vars/r/innodb_buffer_pool_load_abort_basic.result
      mysql-test/suite/sys_vars/r/innodb_buffer_pool_load_at_startup_basic.result
      mysql-test/suite/sys_vars/r/innodb_buffer_pool_load_now_basic.result
      mysql-test/suite/sys_vars/t/innodb_buffer_pool_dump_at_shutdown_basic.test
      mysql-test/suite/sys_vars/t/innodb_buffer_pool_dump_now_basic.test
      mysql-test/suite/sys_vars/t/innodb_buffer_pool_filename_basic.test
      mysql-test/suite/sys_vars/t/innodb_buffer_pool_load_abort_basic.test
      mysql-test/suite/sys_vars/t/innodb_buffer_pool_load_at_startup_basic.test
      mysql-test/suite/sys_vars/t/innodb_buffer_pool_load_now_basic.test
      mysql-test/t/join_cache_jcl0.test
      mysql-test/t/mysql_embedded.test
      storage/innobase/buf/buf0dump.c
      storage/innobase/include/buf0dump.h
      unittest/gunit/bounds_checked_array-t.cc
    modified:
      .bzrignore
      cmake/make_dist.cmake.in
      cmake/os/WindowsCache.cmake
      cmake/plugin.cmake
      config.h.cmake
      configure.cmake
      extra/perror.c
      extra/yassl/src/yassl_error.cpp
      libmysqld/emb_qcache.cc
      libmysqld/lib_sql.cc
      mysql-test/collections/default.experimental
      mysql-test/extra/rpl_tests/rpl_loaddata.test
      mysql-test/extra/rpl_tests/rpl_mts_crash_safe.inc
      mysql-test/extra/rpl_tests/rpl_stm_EE_err2.test
      mysql-test/include/icp_tests.inc
      mysql-test/include/join_cache.inc
      mysql-test/include/subquery.inc
      mysql-test/include/subquery_mat.inc
      mysql-test/include/subquery_sj.inc
      mysql-test/mysql-test-run.pl
      mysql-test/r/1st.result
      mysql-test/r/alter_table.result
      mysql-test/r/connect.result
      mysql-test/r/count_distinct.result
      mysql-test/r/distinct.result
      mysql-test/r/events_bugs.result
      mysql-test/r/flush.result
      mysql-test/r/gis-precise.result
      mysql-test/r/gis-rtree.result
      mysql-test/r/group_by.result
      mysql-test/r/innodb_icp.result
      mysql-test/r/innodb_icp_all.result
      mysql-test/r/innodb_icp_none.result
      mysql-test/r/join_cache_jcl1.result
      mysql-test/r/join_cache_jcl2.result
      mysql-test/r/join_cache_jcl3.result
      mysql-test/r/join_cache_jcl4.result
      mysql-test/r/join_cache_jcl5.result
      mysql-test/r/join_cache_jcl6.result
      mysql-test/r/join_cache_jcl7.result
      mysql-test/r/join_cache_jcl8.result
      mysql-test/r/join_nested.result
      mysql-test/r/join_nested_jcl6.result
      mysql-test/r/join_outer.result
      mysql-test/r/join_outer_jcl6.result
      mysql-test/r/log_tables_upgrade.result
      mysql-test/r/lowercase_table4.result
      mysql-test/r/multi_update.result
      mysql-test/r/multi_update_innodb.result
      mysql-test/r/myisam_icp.result
      mysql-test/r/myisam_icp_all.result
      mysql-test/r/myisam_icp_none.result
      mysql-test/r/mysql_upgrade.result
      mysql-test/r/mysql_upgrade_ssl.result
      mysql-test/r/mysqlcheck.result
      mysql-test/r/null_key_all.result
      mysql-test/r/null_key_icp.result
      mysql-test/r/null_key_none.result
      mysql-test/r/order_by_all.result
      mysql-test/r/order_by_icp_mrr.result
      mysql-test/r/order_by_none.result
      mysql-test/r/partition.result
      mysql-test/r/partition_datatype.result
      mysql-test/r/plugin_auth.result
      mysql-test/r/ps.result
      mysql-test/r/query_cache_28249.result
      mysql-test/r/range_all.result
      mysql-test/r/select_all.result
      mysql-test/r/select_all_jcl6.result
      mysql-test/r/select_found.result
      mysql-test/r/select_icp_mrr.result
      mysql-test/r/select_icp_mrr_jcl6.result
      mysql-test/r/select_none.result
      mysql-test/r/select_none_jcl6.result
      mysql-test/r/sp_notembedded.result
      mysql-test/r/sp_sync.result
      mysql-test/r/subquery_all.result
      mysql-test/r/subquery_all_jcl6.result
      mysql-test/r/subquery_mat.result
      mysql-test/r/subquery_mat_all.result
      mysql-test/r/subquery_mat_none.result
      mysql-test/r/subquery_nomat_nosj.result
      mysql-test/r/subquery_nomat_nosj_jcl6.result
      mysql-test/r/subquery_none.result
      mysql-test/r/subquery_none_jcl6.result
      mysql-test/r/subquery_sj_all.result
      mysql-test/r/subquery_sj_all_jcl6.result
      mysql-test/r/subquery_sj_all_jcl7.result
      mysql-test/r/subquery_sj_dupsweed.result
      mysql-test/r/subquery_sj_dupsweed_jcl6.result
      mysql-test/r/subquery_sj_dupsweed_jcl7.result
      mysql-test/r/subquery_sj_firstmatch.result
      mysql-test/r/subquery_sj_firstmatch_jcl6.result
      mysql-test/r/subquery_sj_firstmatch_jcl7.result
      mysql-test/r/subquery_sj_loosescan.result
      mysql-test/r/subquery_sj_loosescan_jcl6.result
      mysql-test/r/subquery_sj_loosescan_jcl7.result
      mysql-test/r/subquery_sj_mat.result
      mysql-test/r/subquery_sj_mat_jcl6.result
      mysql-test/r/subquery_sj_mat_jcl7.result
      mysql-test/r/subquery_sj_mat_nosj.result
      mysql-test/r/subquery_sj_none.result
      mysql-test/r/subquery_sj_none_jcl6.result
      mysql-test/r/subquery_sj_none_jcl7.result
      mysql-test/r/subselect_innodb.result
      mysql-test/r/symlink.result
      mysql-test/r/system_mysql_db.result
      mysql-test/r/trigger-compat.result
      mysql-test/r/trigger.result
      mysql-test/r/type_datetime.result
      mysql-test/suite/funcs_1/r/is_columns_mysql.result
      mysql-test/suite/funcs_1/r/is_columns_mysql_embedded.result
      mysql-test/suite/funcs_1/r/is_key_column_usage.result
      mysql-test/suite/funcs_1/r/is_statistics.result
      mysql-test/suite/funcs_1/r/is_statistics_mysql.result
      mysql-test/suite/funcs_1/r/is_statistics_mysql_embedded.result
      mysql-test/suite/funcs_1/r/is_table_constraints.result
      mysql-test/suite/funcs_1/r/is_table_constraints_mysql.result
      mysql-test/suite/funcs_1/r/is_table_constraints_mysql_embedded.result
      mysql-test/suite/funcs_1/r/is_tables_mysql.result
      mysql-test/suite/funcs_1/r/is_tables_mysql_embedded.result
      mysql-test/suite/innodb/r/innodb-index.result
      mysql-test/suite/innodb/r/innodb-system-table-view.result
      mysql-test/suite/innodb/r/innodb-use-sys-malloc.result
      mysql-test/suite/innodb/r/innodb-zip.result
      mysql-test/suite/innodb/r/innodb_bug57904.result
      mysql-test/suite/innodb/t/innodb-index.test
      mysql-test/suite/innodb/t/innodb-system-table-view.test
      mysql-test/suite/innodb/t/innodb-use-sys-malloc-master.opt
      mysql-test/suite/innodb/t/innodb-use-sys-malloc.test
      mysql-test/suite/innodb/t/innodb-zip.test
      mysql-test/suite/innodb/t/innodb_bug11933790.test
      mysql-test/suite/innodb/t/innodb_bug57904.test
      mysql-test/suite/innodb/t/innodb_bug60049.test
      mysql-test/suite/innodb/t/innodb_prefix_index_restart_server.test
      mysql-test/suite/innodb/t/innodb_stats.test
      mysql-test/suite/innodb/t/innodb_stats_drop_locked.test
      mysql-test/suite/perfschema/include/cleanup_helper.inc
      mysql-test/suite/perfschema/include/upgrade_check.inc
      mysql-test/suite/perfschema/r/all_tests.result
      mysql-test/suite/perfschema/r/pfs_upgrade.result
      mysql-test/suite/perfschema/r/schema.result
      mysql-test/suite/perfschema/r/selects.result
      mysql-test/suite/perfschema/t/all_tests.test
      mysql-test/suite/perfschema/t/selects.test
      mysql-test/suite/rpl/r/rpl_loaddata.result
      mysql-test/suite/rpl/r/rpl_mixed_mts_crash_safe.result
      mysql-test/suite/rpl/r/rpl_parallel_start_stop.result
      mysql-test/suite/rpl/r/rpl_row_mts_crash_safe.result
      mysql-test/suite/rpl/r/rpl_stm_EE_err2.result
      mysql-test/suite/rpl/r/rpl_stm_loaddata_concurrent.result
      mysql-test/suite/rpl/r/rpl_stm_mts_crash_safe.result
      mysql-test/suite/rpl/t/rpl_mixed_mts_crash_safe.test
      mysql-test/suite/rpl/t/rpl_parallel_start_stop.test
      mysql-test/suite/rpl/t/rpl_row_mts_crash_safe.test
      mysql-test/suite/rpl/t/rpl_stm_mts_crash_safe.test
      mysql-test/t/alter_table.test
      mysql-test/t/disabled.def
      mysql-test/t/events_bugs.test
      mysql-test/t/flush.test
      mysql-test/t/gis-precise.test
      mysql-test/t/gis-rtree.test
      mysql-test/t/group_by.test
      mysql-test/t/join_outer.test
      mysql-test/t/multi_update.test
      mysql-test/t/multi_update_innodb.test
      mysql-test/t/partition.test
      mysql-test/t/partition_datatype.test
      mysql-test/t/ps.test
      mysql-test/t/query_cache_28249.test
      mysql-test/t/sp_notembedded.test
      mysql-test/t/sp_sync.test
      mysql-test/t/symlink.test
      mysql-test/t/system_mysql_db_fix40123.test
      mysql-test/t/system_mysql_db_fix50030.test
      mysql-test/t/system_mysql_db_fix50117.test
      mysql-test/t/trigger-compat.test
      mysql-test/t/type_datetime.test
      mysys/mf_pack.c
      scripts/mysql_install_db.pl.in
      scripts/mysql_install_db.sh
      scripts/mysql_system_tables.sql
      sql/binlog.cc
      sql/event_db_repository.cc
      sql/event_parse_data.cc
      sql/event_parse_data.h
      sql/event_scheduler.cc
      sql/field.cc
      sql/filesort.cc
      sql/gcalc_tools.cc
      sql/ha_ndbcluster_binlog.cc
      sql/ha_partition.cc
      sql/handler.cc
      sql/item.cc
      sql/item.h
      sql/item_cmpfunc.cc
      sql/item_cmpfunc.h
      sql/item_func.cc
      sql/item_func.h
      sql/item_row.cc
      sql/item_row.h
      sql/item_strfunc.cc
      sql/item_strfunc.h
      sql/item_subselect.cc
      sql/item_subselect.h
      sql/item_sum.cc
      sql/item_timefunc.cc
      sql/item_timefunc.h
      sql/log_event.cc
      sql/log_event_old.cc
      sql/mdl.h
      sql/opt_range.cc
      sql/opt_sum.cc
      sql/partition_info.cc
      sql/protocol.cc
      sql/rpl_master.cc
      sql/rpl_reporting.cc
      sql/rpl_rli.cc
      sql/rpl_rli_pdb.cc
      sql/rpl_slave.cc
      sql/share/errmsg-utf8.txt
      sql/sp.cc
      sql/sp_head.cc
      sql/spatial.cc
      sql/sql_acl.cc
      sql/sql_admin.cc
      sql/sql_alloc_error_handler.cc
      sql/sql_array.h
      sql/sql_audit.h
      sql/sql_base.cc
      sql/sql_base.h
      sql/sql_cache.cc
      sql/sql_class.cc
      sql/sql_class.h
      sql/sql_connect.cc
      sql/sql_db.cc
      sql/sql_derived.cc
      sql/sql_do.cc
      sql/sql_error.cc
      sql/sql_error.h
      sql/sql_insert.cc
      sql/sql_join_cache.cc
      sql/sql_lex.cc
      sql/sql_lex.h
      sql/sql_load.cc
      sql/sql_parse.cc
      sql/sql_plist.h
      sql/sql_prepare.cc
      sql/sql_prepare.h
      sql/sql_select.cc
      sql/sql_select.h
      sql/sql_servers.cc
      sql/sql_show.cc
      sql/sql_signal.cc
      sql/sql_table.cc
      sql/sql_test.cc
      sql/sql_time.cc
      sql/sql_trigger.cc
      sql/sql_trigger.h
      sql/sql_union.cc
      sql/sql_update.cc
      sql/sql_yacc.yy
      sql/structs.h
      sql/sys_vars.cc
      sql/table.cc
      sql/table.h
      sql/transaction.cc
      sql/tztime.cc
      storage/innobase/CMakeLists.txt
      storage/innobase/btr/btr0btr.c
      storage/innobase/btr/btr0cur.c
      storage/innobase/buf/buf0buddy.c
      storage/innobase/buf/buf0buf.c
      storage/innobase/buf/buf0flu.c
      storage/innobase/buf/buf0lru.c
      storage/innobase/buf/buf0rea.c
      storage/innobase/fil/fil0fil.c
      storage/innobase/handler/ha_innodb.cc
      storage/innobase/include/btr0btr.h
      storage/innobase/include/btr0cur.h
      storage/innobase/include/btr0cur.ic
      storage/innobase/include/buf0buddy.h
      storage/innobase/include/buf0buddy.ic
      storage/innobase/include/buf0buf.h
      storage/innobase/include/buf0buf.ic
      storage/innobase/include/buf0lru.h
      storage/innobase/include/buf0rea.h
      storage/innobase/include/buf0types.h
      storage/innobase/include/db0err.h
      storage/innobase/include/page0cur.ic
      storage/innobase/include/page0page.h
      storage/innobase/include/page0page.ic
      storage/innobase/include/rem0rec.h
      storage/innobase/include/rem0rec.ic
      storage/innobase/include/srv0srv.h
      storage/innobase/include/srv0start.h
      storage/innobase/include/sync0rw.ic
      storage/innobase/include/sync0sync.h
      storage/innobase/include/univ.i
      storage/innobase/include/ut0ut.h
      storage/innobase/page/page0cur.c
      storage/innobase/page/page0page.c
      storage/innobase/page/page0zip.c
      storage/innobase/rem/rem0rec.c
      storage/innobase/row/row0ins.c
      storage/innobase/row/row0mysql.c
      storage/innobase/row/row0row.c
      storage/innobase/row/row0upd.c
      storage/innobase/row/row0vers.c
      storage/innobase/srv/srv0srv.c
      storage/innobase/srv/srv0start.c
      storage/innobase/sync/sync0rw.c
      storage/innobase/sync/sync0sync.c
      storage/innobase/trx/trx0rec.c
      storage/innobase/trx/trx0sys.c
      storage/innobase/ut/ut0ut.c
      storage/myisam/mi_update.c
      storage/myisam/mi_write.c
      tests/mysql_client_test.c
      unittest/gunit/CMakeLists.txt
=== modified file 'mysql-test/suite/rpl/t/rpl_heartbeat_basic.test'
--- a/mysql-test/suite/rpl/t/rpl_heartbeat_basic.test	2011-06-20 13:26:35 +0000
+++ b/mysql-test/suite/rpl/t/rpl_heartbeat_basic.test	2011-07-01 10:16:52 +0000
@@ -26,7 +26,7 @@ call mtr.add_suppression("The slave coor
 let $connect_retry= 20;
 
 --echo *** Preparing ***
---connection slave
+sync_slave_with_master;
 
 --disable_query_log
 call mtr.add_suppression("The master's UUID has changed, although this should not happen unless you have changed it manually.");

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-06-29 07:04:19 +0000
+++ b/sql/log_event.cc	2011-07-01 13:41:35 +0000
@@ -2374,11 +2374,11 @@ bool Log_event::contains_partition_info(
       (Delete, Update, Write -rows)
    T - terminator of the group (XID, COMMIT, ROLLBACK, auto-commit query)
 
-   Only the first g-type event computes the assigned Worker which once 
+   Only the first g-event computes the assigned Worker which once 
    is determined remains to be for the rest of the group.
-   That is the g-type event solely carries partitioning info.
-   For B-type the assigned Worker is NULL to indicate Coordinator 
-   has not yet decided. The same applies to p-type.
+   That is, the g-event alone carries the partitioning info.
+   For a B-event the assigned Worker is NULL to indicate Coordinator 
+   has not yet decided. The same applies to a p-event.
    
    Notice, these is a special group consisting of optionally multiple p-events
    terminating with a g-event.
@@ -2390,9 +2390,9 @@ bool Log_event::contains_partition_info(
    done.
 
 
-   @note The function can update APH (through map_db_to_worker()), GAQ objects
-         and relocate some temporary tables from Coordinator's list into
-         involved entries of APH.
+   @note The function updates the GAQ queue directly, updates the APH hash,
+         and relocates some temporary tables from Coordinator's list into
+         the involved APH entries through @c map_db_to_worker.
          There's few memory allocations commented where to be freed.
    
    @return a pointer to the Worker struct or NULL.
@@ -2406,6 +2406,7 @@ Slave_worker *Log_event::get_slave_worke
   Slave_worker *ret_worker= NULL;
   THD *thd= rli->info_thd;
   char llbuff[22];
+  Slave_committed_queue *gaq= rli->gaq;
 
   /* checking partioning properties and perform corresponding actions */
 
@@ -2413,14 +2414,19 @@ Slave_worker *Log_event::get_slave_worke
   if ((is_b_event= starts_group()) ||
       // or DDL:s or autocommit queries possibly associated with own p-events
       (!rli->curr_group_seen_begin &&
-       // the following is a case of no-B group: { p_1,p_2,...,p_k, g}
-       (rli->gaq->empty() ||
-        ((Slave_job_group *)
-         dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))->
+       /*
+         The following is a special case of a B-free yet multi-event group like
+         { p_1,p_2,...,p_k, g }.
+         In that case either GAQ is empty (the very first group is being
+         assigned) or the last assigned group index points at a group
+         already mapped to a Worker.
+       */
+       (gaq->empty() ||
+        gaq->get_job_group(rli->gaq->assigned_group_index)->
         worker_id != MTS_WORKER_UNDEF)))
   {
     ulong gaq_idx;
-    rli->mts_total_groups++;
+    rli->mts_groups_assigned++;
 
     rli->curr_group_isolated= FALSE;
     g.master_log_pos= log_pos;
@@ -2428,7 +2434,7 @@ Slave_worker *Log_event::get_slave_worke
     g.group_master_log_name= NULL; // todo: remove
     g.group_relay_log_name= NULL;
     g.worker_id= MTS_WORKER_UNDEF;
-    g.total_seqno= rli->mts_total_groups;
+    g.total_seqno= rli->mts_groups_assigned;
     g.checkpoint_log_name= NULL;
     g.checkpoint_log_pos= 0;
     g.checkpoint_relay_log_name= NULL;
@@ -2437,13 +2443,12 @@ Slave_worker *Log_event::get_slave_worke
     g.done= 0;
 
     // the last occupied GAQ's array index
-    gaq_idx= rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
+    gaq_idx= gaq->assigned_group_index= gaq->en_queue((void *) &g);
     
-    DBUG_ASSERT(gaq_idx != MTS_WORKER_UNDEF && gaq_idx < rli->gaq->size);
-    DBUG_ASSERT(((Slave_job_group *) 
-                 dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))->
+    DBUG_ASSERT(gaq_idx != MTS_WORKER_UNDEF && gaq_idx < gaq->size);
+    DBUG_ASSERT(gaq->get_job_group(rli->gaq->assigned_group_index)->
                 group_relay_log_name == NULL);
-    DBUG_ASSERT(rli->gaq->assigned_group_index != MTS_WORKER_UNDEF); // gaq must have room
+    DBUG_ASSERT(gaq_idx != MTS_WORKER_UNDEF);  // gaq must have room
     DBUG_ASSERT(rli->last_assigned_worker == NULL);
 
     if (is_b_event)
@@ -2455,7 +2460,7 @@ Slave_worker *Log_event::get_slave_worke
 
       DBUG_ASSERT(rli->curr_group_da.elements == 1);
 
-      // mark the current grup as started with B-event
+      // mark the current group as started with explicit B-event
       rli->curr_group_seen_begin= TRUE;
 
       return ret_worker;
@@ -2514,10 +2519,8 @@ Slave_worker *Log_event::get_slave_worke
       i++;
     } while (it++);
 
-    if ((ptr_g= ((Slave_job_group *)
-                 dynamic_array_ptr(&rli->gaq->Q,
-                                   rli->gaq->assigned_group_index)))->worker_id
-        == MTS_WORKER_UNDEF)
+    if ((ptr_g= gaq->get_job_group(rli->gaq->assigned_group_index))->
+        worker_id == MTS_WORKER_UNDEF)
     {
       ptr_g->worker_id= ret_worker->id;
       
@@ -2585,12 +2588,11 @@ Slave_worker *Log_event::get_slave_worke
   if (ends_group() || !rli->curr_group_seen_begin)
   {
     // index of GAQ that this terminal event belongs to
-    mts_group_cnt= rli->gaq->assigned_group_index;
+    mts_group_cnt= gaq->assigned_group_index;
     rli->mts_group_status= Relay_log_info::MTS_END_GROUP;
     if (rli->curr_group_isolated)
       mts_do_isolate_group();
-    ptr_g= (Slave_job_group *)
-      dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
+    ptr_g= gaq->get_job_group(rli->gaq->assigned_group_index);
 
     DBUG_ASSERT(ret_worker != NULL);
     
@@ -2665,18 +2667,18 @@ Slave_worker *Log_event::get_slave_worke
 
    @return 0 as success, otherwise a failure.
 */
-int Log_event::apply_event(Relay_log_info const *rli)
+int Log_event::apply_event(Relay_log_info *rli)
 {
   DBUG_ENTER("LOG_EVENT:apply_event");
-  Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli);  // constless alias
-  bool parallel= FALSE, async_event= FALSE, seq_event= FALSE;
-  THD *thd= c_rli->info_thd;
+  bool parallel= FALSE;
+  enum enum_mts_event_exec_mode actual_exec_mode= EVENT_EXEC_PARALLEL;
+  THD *thd= rli->info_thd;
 
-  worker= c_rli;
+  worker= rli;
 
   if (rli->is_mts_recovery())
   {
-    bool skip= bitmap_is_set(&c_rli->recovery_groups, c_rli->mts_recovery_index);
+    bool skip= bitmap_is_set(&rli->recovery_groups, rli->mts_recovery_index);
 
     if (skip)
     {
@@ -2689,10 +2691,10 @@ int Log_event::apply_event(Relay_log_inf
   }
 
   if (!(parallel= rli->is_parallel_exec()) ||
-      (async_event=
-       mts_async_exec_by_coordinator(::server_id, 
-                                     rli->mts_group_status == Relay_log_info::MTS_IN_GROUP)) ||
-      (seq_event= mts_sequential_exec()))
+      ((actual_exec_mode= 
+        mts_execution_mode(::server_id, 
+                           rli->mts_group_status == Relay_log_info::MTS_IN_GROUP))
+       != EVENT_EXEC_PARALLEL))
   {
     if (parallel)
     {
@@ -2704,7 +2706,7 @@ int Log_event::apply_event(Relay_log_inf
          for terminal events to finish.
       */
 
-      if (!async_event)
+      if (actual_exec_mode != EVENT_EXEC_ASYNC)
       {     
         /*
           this  event does not split the current group but is indeed
@@ -2719,14 +2721,14 @@ int Log_event::apply_event(Relay_log_inf
              wrappped with BEGIN/COMMIT or preceeded by User|Int|Random- var.
              MTS has to stop to suggest restart in the permanent sequential mode.
           */
-          llstr(c_rli->get_event_relay_log_pos(), llbuff);
+          llstr(rli->get_event_relay_log_pos(), llbuff);
           rli->report(ERROR_LEVEL, ER_MTS_CANT_PARALLEL,
                       ER(ER_MTS_CANT_PARALLEL),
-                      get_type_str(), c_rli->get_event_relay_log_name(),
-                      c_rli->get_event_relay_log_pos());
+                      get_type_str(), rli->get_event_relay_log_name(),
+                      rli->get_event_relay_log_pos());
           
           /* Coordinator cant continue, it marks MTS group status accordingly */
-          c_rli->mts_group_status= Relay_log_info::MTS_KILLED_GROUP;
+          rli->mts_group_status= Relay_log_info::MTS_KILLED_GROUP;
 
           goto err;
         }
@@ -2742,32 +2744,37 @@ int Log_event::apply_event(Relay_log_inf
 
 #ifndef DBUG_OFF
         /* all Workers are idle as done through wait_for_workers_to_finish */
-        for (uint k= 0; k < c_rli->curr_group_da.elements; k++)
+        for (uint k= 0; k < rli->curr_group_da.elements; k++)
         {
           DBUG_ASSERT(!(*(Slave_worker **)
-                        dynamic_array_ptr(&c_rli->workers, k))->usage_partition);
+                        dynamic_array_ptr(&rli->workers, k))->usage_partition);
           DBUG_ASSERT(!(*(Slave_worker **)
-                        dynamic_array_ptr(&c_rli->workers, k))->jobs.len);
+                        dynamic_array_ptr(&rli->workers, k))->jobs.len);
         }
 #endif
       }
+      else
+      {
+        DBUG_ASSERT(actual_exec_mode == EVENT_EXEC_ASYNC);
+      }
     }
     DBUG_RETURN(do_apply_event(rli));
   }
 
+  DBUG_ASSERT(actual_exec_mode == EVENT_EXEC_PARALLEL);
   DBUG_ASSERT(!(rli->curr_group_seen_begin && ends_group()) ||
               rli->last_assigned_worker);
 
   worker= NULL;
-  c_rli->mts_group_status= Relay_log_info::MTS_IN_GROUP;
+  rli->mts_group_status= Relay_log_info::MTS_IN_GROUP;
 
   worker= (Relay_log_info*)
-    (c_rli->last_assigned_worker= get_slave_worker(c_rli));
+    (rli->last_assigned_worker= get_slave_worker(rli));
 
 #ifndef DBUG_OFF
-  if (c_rli->last_assigned_worker)
+  if (rli->last_assigned_worker)
     DBUG_PRINT("mts", ("Assigning job to worker %lu",
-               c_rli->last_assigned_worker->id));
+               rli->last_assigned_worker->id));
 #endif
 
 err:
@@ -6273,7 +6280,6 @@ int Rotate_log_event::do_update_pos(Rela
                         rli->get_group_master_log_name(),
                         (ulong) rli->get_group_master_log_pos()));
     mysql_mutex_unlock(&rli->data_lock);
-    rli->flush_info(TRUE);  // todo: error branch
     if (rli->is_parallel_exec())
       rli->reset_notified_checkpoint(0, when + (time_t) exec_time);
 
@@ -6640,34 +6646,34 @@ int Xid_log_event::do_apply_event_worker
 {
   int error= 0;
   bool is_trans_repo= w->is_transactional();
+  Slave_committed_queue *gaq= w->c_rli->gaq;
 
   DBUG_PRINT("mts", ("do_apply group master %s %llu  group relay %s %llu event %s %llu.",
-    w->group_master_log_name,
-    w->group_master_log_pos,
-    w->group_relay_log_name,
-    w->group_relay_log_pos,
-    w->event_relay_log_name,
-    w->event_relay_log_pos));
+                     w->get_group_master_log_name(),
+                     w->get_group_master_log_pos(),
+                     w->get_group_relay_log_name(),
+                     w->get_group_relay_log_pos(),
+                     w->get_event_relay_log_name(),
+                     w->get_event_relay_log_pos()));
 
   DBUG_EXECUTE_IF("crash_before_update_pos", DBUG_SUICIDE(););
 
   if (is_trans_repo)
   {
     ulong gaq_idx= mts_group_cnt;
-    Slave_job_group *ptr_g=
-      (Slave_job_group *) dynamic_array_ptr(&w->c_rli->gaq->Q, gaq_idx);
+    Slave_job_group *ptr_g= gaq->get_job_group(gaq_idx);
 
     if ((error= w->commit_positions(this, ptr_g)))
       goto err;
   }
 
   DBUG_PRINT("mts", ("do_apply group master %s %llu  group relay %s %llu event %s %llu.",
-    w->group_master_log_name,
-    w->group_master_log_pos,
-    w->group_relay_log_name,
-    w->group_relay_log_pos,
-    w->event_relay_log_name,
-    w->event_relay_log_pos));
+                     w->get_group_master_log_name(),
+                     w->get_group_master_log_pos(),
+                     w->get_group_relay_log_name(),
+                     w->get_group_relay_log_pos(),
+                     w->get_event_relay_log_name(),
+                     w->get_event_relay_log_pos()));
 
   DBUG_EXECUTE_IF("crash_after_update_pos_before_apply", DBUG_SUICIDE(););
 
@@ -9513,7 +9519,6 @@ int Table_map_log_event::do_apply_event(
 
   int error= 0;
 
-  // mts-II todo: consider filtering
   if (rli->info_thd->slave_thread /* filtering is for slave only */ &&
       (!rpl_filter->db_ok(table_list->db) ||
        (rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list))))

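A note for readers of the hunks above: the Slave_committed_queue::get_job_group()
accessor that replaces the repeated dynamic_array_ptr() casts is declared outside
this excerpt, so the sketch below only illustrates the shape such a wrapper
presumably has; it is not code from the patch.

    // presumed shape of the accessor behind gaq->get_job_group(idx):
    // a typed wrapper over the cast it replaces throughout log_event.cc
    Slave_job_group* Slave_committed_queue::get_job_group(ulong ind)
    {
      return (Slave_job_group *) dynamic_array_ptr(&Q, ind);
    }
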
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2011-06-27 17:31:45 +0000
+++ b/sql/log_event.h	2011-07-01 12:48:25 +0000
@@ -262,7 +262,7 @@ struct sql_ex_info
 */
 #define MAX_DBS_IN_EVENT_MTS 16
 /*
-   When the actual number of db:s exceeds MAX_DBS_IN_EVENT_MTS
+   When the actual number of databases exceeds MAX_DBS_IN_EVENT_MTS
    the value of OVER_MAX_DBS_IN_EVENT_MTS is is put into the mts_accessed_dbs status.
 */
 #define OVER_MAX_DBS_IN_EVENT_MTS 254
@@ -356,7 +356,7 @@ struct sql_ex_info
 #define Q_INVOKER 11
 
 /*
-  Q_UPDATED_DB_NAMES status variable collects of the updated db:s
+  Q_UPDATED_DB_NAMES status variable collects the updated databases'
   total number and their names to be propagated to the slave in order
   to facilitate the parallel applying of the Query events.
 */
@@ -540,7 +540,7 @@ struct sql_ex_info
    MTS: group of events can be marked to force its execution
    in isolation from any other Workers.
    Typically that is done for a transaction that contains 
-   a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS db:s.
+   a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS databases.
    The flag is set ON for an event that terminates its group.
 */
 #define LOG_EVENT_MTS_ISOLATE_F 0x200
@@ -1270,21 +1270,44 @@ public:
   /* Return start of query time or current time */
 
 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
-public:
+
+private:
+
+  /*
+    possible decisions by mts_execution_mode()
+  */
+  enum enum_mts_event_exec_mode
+  {
+    /*
+      Event is run by a Worker.
+    */
+    EVENT_EXEC_PARALLEL,
+    /*
+      Event is run by Coordinator.
+    */
+    EVENT_EXEC_ASYNC,
+    /*
+      Event is run by Coordinator and requires W:s synchronization.
+    */
+    EVENT_EXEC_SYNC,
+    /*
+      Event can be executed neither by Workers nor by Coordinator.
+    */
+    EVENT_EXEC_CAN_NOT
+  };
 
   /**
-     MST: to execute some event types serially.
+     Is called from mts_execution_mode() to detect events that must run serially.
 
-     @note There are incompatile combinations such the referred event
-           is wrapped with BEGIN/COMMIT. Such cases should be identified
-           by the caller and treates as an error.
-
-           Notice, even though the func returns TRUE, some events
-           like old LOAD-DATA rooted EXEC_LOAD_EVENT can't run even
-           in isolated parallel mode and MTS would have to stop.
-     
-     @return TRUE if despite permanent parallel execution mode an event
-                  needs applying in a real isolation that is sequentially.
+     @return TRUE  if the event needs applying with synchronization
+                   against Workers, otherwise
+             FALSE
+
+     @note There are incompatible combinations, such as when the referred events
+           are wrapped with BEGIN/COMMIT. Such cases should be identified
+           by the caller and treated correspondingly.
+
+           todo: to mts-support Old master Load-data related events
   */
   bool mts_sequential_exec()
   {
@@ -1304,37 +1327,35 @@ public:
   }
 
   /**
-     MST: some events have to be applied by Coordinator concurrently with Workers.
+     MTS Coordinator finds out how to execute the current event.
 
-     *TODO*: combine with mts_sequential_exec() to have ternary outcome.
-
-     @return TRUE  if that's the case,
-             FALSE otherwise.
-  */
-  bool mts_async_exec_by_coordinator(ulong slave_server_id, bool mts_in_group)
-  {
-    return
-      (get_type_code() == FORMAT_DESCRIPTION_EVENT &&
-       ((server_id == (uint32) ::server_id) || (log_pos == 0)))
-      ||
-      (get_type_code() == ROTATE_EVENT &&
-       ((server_id == (uint32) ::server_id) ||
-        (log_pos == 0 && mts_in_group)));
+     Besides the parallelizable case, some events have to be applied by
+     Coordinator concurrently with Workers and some require synchronization
+     with Workers before being applied.
+
+     @retval EVENT_EXEC_PARALLEL  if event is executed by a Worker
+     @retval EVENT_EXEC_ASYNC     if event is executed by Coordinator
+     @retval EVENT_EXEC_SYNC      if event is executed by Coordinator
+                                  with synchronization against the Workers
+  */
+  enum enum_mts_event_exec_mode mts_execution_mode(ulong slave_server_id,
+                                                   bool mts_in_group)
+  {
+    if ((get_type_code() == FORMAT_DESCRIPTION_EVENT &&
+         ((server_id == (uint32) ::server_id) || (log_pos == 0)))
+        ||
+        (get_type_code() == ROTATE_EVENT &&
+         ((server_id == (uint32) ::server_id) ||
+          (log_pos == 0    /* very first fake Rotate */
+           && mts_in_group /* ignored events, R_f at slave restart */))))
+      return EVENT_EXEC_ASYNC;
+    else if (mts_sequential_exec())
+      return EVENT_EXEC_SYNC;
+    else
+      return EVENT_EXEC_PARALLEL;
   }
 
   /**
-     Events of a cetain type carry partitioning data such as db names.
-  */
-  bool contains_partition_info();
-
-  /**
-     Events of a cetain type start or end a group of events treated
-     transactionally wrt binlog.
-  */
-  virtual bool starts_group() { return FALSE; }
-  virtual bool ends_group()   { return FALSE; }
-
-  /**
      @return index  in \in [0, M] range to indicate
              to be assigned worker;
              M is the max index of the worker pool.
@@ -1353,16 +1374,10 @@ public:
   }
 
   /*
-    returns the number of updated by the event databases.
-    In other than Query-log-event case that's one.
-  */
-  virtual uint8 mts_number_dbs() { return 1; }
-
-  /*
     Group of events can be marked to force its execution
     in isolation from any other Workers.
     Typically that is done for a transaction that contains 
-    a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS db:s
+    a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS databases.
   */
   virtual void mts_do_isolate_group()
   { 
@@ -1373,13 +1388,48 @@ public:
     flags |= LOG_EVENT_MTS_ISOLATE_F;
   }
 
+
+public:
+
+  /**
+     @return TRUE  if the event carries partitioning data (database names).
+  */
+  bool contains_partition_info();
+
   /*
-    Verifying whether the terminal event of a group is marked to
-    execute in isolation.
+    @return  the number of databases updated by the event.
+
+    @note In other than Query-log-event case that's one.
+  */
+  virtual uint8 mts_number_dbs() { return 1; }
+
+  /**
+    @return TRUE  if the terminal event of a group is marked to
+                  execute in isolation from other Workers,
+            FALSE otherwise
   */
   bool mts_is_group_isolated() { return flags & LOG_EVENT_MTS_ISOLATE_F; }
 
   /**
+     Events of a certain type can start or end a group of events treated
+     transactionally wrt binlog.
+
+     Public access is required by implementation of recovery + skip.
+
+     @return TRUE  if the event starts a group (transaction)
+             FALSE otherwise
+  */
+  virtual bool starts_group() { return FALSE; }
+
+  /**
+     @return TRUE  if the event ends a group (transaction)
+             FALSE otherwise
+  */
+  virtual bool ends_group()   { return FALSE; }
+
+public:
+
+  /**
      Apply the event to the database.
 
      This function represents the public interface for applying an
@@ -1387,7 +1437,7 @@ public:
 
      @see do_apply_event
    */
-  int apply_event(Relay_log_info const *rli);
+  int apply_event(Relay_log_info *rli);
 
   /**
      Update the relay log position.
@@ -1963,7 +2013,7 @@ public:
   */
   uint32 master_data_written;
   /*
+    number of databases updated by the query and their names. This info
+    number of updated databases by the query and their names. This info
     is requested by both Coordinator and Worker.
   */
   uchar mts_accessed_dbs;
@@ -1977,15 +2027,15 @@ public:
   const char* get_db() { return db; }
 
   /**
-     Returns a list of updated db:s or the default db single item list
-     in case of over-MAX_DBS_IN_EVENT_MTS actual db:s.
+     Returns a list of updated databases or the default db single item list
+     in case the number of databases exceeds MAX_DBS_IN_EVENT_MTS.
   */
   virtual List<char>* mts_get_dbs(MEM_ROOT *mem_root)
   {
     List<char> *res= new (mem_root) List<char>;
     if (mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
     {
-      // "" == empty string db name is special to indicate sequential applying
+      // the empty string db name is special to indicate sequential applying
       mts_accessed_db_names[0][0]= 0;
       res->push_back((char*) mts_accessed_db_names[0]);
     }
@@ -2066,10 +2116,9 @@ public:        /* !!! Public in this pat
       !strncasecmp(query, "ROLLBACK", 8);
   }
   /**
-     todo: Parallel support for DDL:s.
-     DDL queries are logged without BEGIN/COMMIT parentheses
-     and can be regarded as the starting and the ending events of 
-     its self-group.
+     Notice, DDL queries are logged without BEGIN/COMMIT parentheses
+     and identification of such a single-query group
+     occurs within the logic of @c get_slave_worker().
   */
   bool starts_group() { return !strncmp(query, "BEGIN", q_len); }
   virtual bool ends_group()
@@ -3161,7 +3210,6 @@ public:
   const char* get_db() { return db; }
 #endif
 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
-  /* MTS executes this event sequentially */
   virtual uint8 mts_number_dbs() { return OVER_MAX_DBS_IN_EVENT_MTS; }
   virtual List<char>* mts_get_dbs(MEM_ROOT *mem_root)
   {
@@ -4473,8 +4521,6 @@ private:
 int append_query_string(const CHARSET_INFO *csinfo,
                         String const *from, String *to);
 bool sqlcom_can_generate_row_events(const THD *thd);
-void handle_rows_query_log_event(Log_event *ev, Relay_log_info *rli);
-
 bool event_checksum_test(uchar *buf, ulong event_len, uint8 alg);
 uint8 get_checksum_alg(const char* buf, ulong len);
 extern TYPELIB binlog_checksum_typelib;

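The enum_mts_event_exec_mode introduced above folds the old pair of booleans
(async_event, seq_event) into a single outcome, so contradictory flag
combinations can no longer be expressed. A minimal sketch of the resulting
dispatch, written as it would sit inside Log_event::apply_event() since the
enum is private to Log_event; this is an illustration simplified from the
rewritten apply_event() earlier in this mail, not a verbatim excerpt:

    switch (mts_execution_mode(::server_id,
                               rli->mts_group_status == Relay_log_info::MTS_IN_GROUP))
    {
    case EVENT_EXEC_PARALLEL:  // handed to a Worker via get_slave_worker()
      break;
    case EVENT_EXEC_ASYNC:     // applied by Coordinator without stopping Workers
      break;
    case EVENT_EXEC_SYNC:      // Coordinator waits for Workers, then applies it
      break;
    case EVENT_EXEC_CAN_NOT:   // cannot run under MTS at all
      break;
    }
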
=== modified file 'sql/rpl_reporting.cc'
--- a/sql/rpl_reporting.cc	2011-06-29 07:04:19 +0000
+++ b/sql/rpl_reporting.cc	2011-07-01 13:41:35 +0000
@@ -110,7 +110,7 @@ Slave_reporting_capability::report(logle
 }
 
 void
-Slave_reporting_capability::do_report(loglevel level, int err_code,
+Slave_reporting_capability::va_report(loglevel level, int err_code,
                                       const char *msg, va_list args) const
 {
 #if !defined(EMBEDDED_LIBRARY)

=== modified file 'sql/rpl_reporting.h'
--- a/sql/rpl_reporting.h	2011-06-20 13:26:35 +0000
+++ b/sql/rpl_reporting.h	2011-07-01 12:48:25 +0000
@@ -59,7 +59,7 @@ public:
   */
   virtual void report(loglevel level, int err_code, const char *msg, ...) const
     ATTRIBUTE_FORMAT(printf, 4, 5);
-  void do_report(loglevel level, int err_code,
+  void va_report(loglevel level, int err_code,
                  const char *msg, va_list v_args) const;
 
   /**
@@ -130,6 +130,15 @@ public:
   bool is_error() const { return last_error().number != 0; }
 
   virtual ~Slave_reporting_capability()= 0;
+
+protected:
+
+  virtual void do_report(loglevel level, int err_code,
+                 const char *msg, va_list v_args) const
+  {
+    va_report(level, err_code, msg, v_args);
+  }
+
 private:
   /**
      Last error produced by the I/O or SQL thread respectively.

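For context on the rename: report() keeps the actual formatting in the shared
va_report(), while the new protected virtual do_report() gives subclasses a
hook to intercept a message before delegating. A self-contained sketch of that
pattern follows; the names mirror the patch but the classes are invented for
illustration and are not MySQL code:

    #include <cstdarg>
    #include <cstdio>

    class Reporting
    {
    public:
      void report(const char *msg, ...) const
      {
        va_list args;
        va_start(args, msg);
        do_report(msg, args);          // virtual: subclasses may intercept here
        va_end(args);
      }
      virtual ~Reporting() {}
    protected:
      virtual void do_report(const char *msg, va_list args) const
      {
        va_report(msg, args);          // default: plain formatting
      }
      void va_report(const char *msg, va_list args) const
      {
        vfprintf(stderr, msg, args);
        fputc('\n', stderr);
      }
    };

    class Worker_reporting : public Reporting
    {
    protected:
      void do_report(const char *msg, va_list args) const
      {
        fputs("Worker: ", stderr);     // prepend per-thread context, then delegate
        va_report(msg, args);
      }
    };
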
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2011-07-01 08:45:11 +0000
+++ b/sql/rpl_rli.cc	2011-07-01 13:41:35 +0000
@@ -53,14 +53,13 @@ static PSI_cond_info *worker_conds= NULL
 
 PSI_mutex_key *key_mutex_slave_parallel_worker= NULL;
 PSI_mutex_key key_mutex_slave_parallel_pend_jobs;
-PSI_mutex_key key_mutex_mts_temp_tables_lock;
 
 PSI_cond_key *key_cond_slave_parallel_worker= NULL;
 PSI_cond_key key_cond_slave_parallel_pend_jobs;
 #endif
 
 Relay_log_info::Relay_log_info(bool is_slave_recovery)
-   :Rpl_info("SQL", "SQL-Thread"), checkpoint_thd(0), checkpoint_running(0),
+   :Rpl_info("SQL", "SQL-Thread"),
    replicate_same_server_id(::replicate_same_server_id),
    cur_log_fd(-1), relay_log(&sync_relaylog_period),
    is_relay_log_recovery(is_slave_recovery),
@@ -68,11 +67,11 @@ Relay_log_info::Relay_log_info(bool is_s
    cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
    group_master_log_pos(0), log_space_total(0), ignore_log_space_limit(0),
    last_master_timestamp(0), slave_skip_counter(0),
-   abort_pos_wait(0), slave_exec_mode(0), until_condition(UNTIL_NONE),
+   abort_pos_wait(0), until_condition(UNTIL_NONE),
    until_log_pos(0), retried_trans(0),
    tables_to_lock(0), tables_to_lock_count(0),
    rows_query_ev(NULL), last_event_start_time(0),
-   this_worker(NULL), slave_parallel_workers(0),
+   slave_parallel_workers(0),
    recovery_parallel_workers(0), checkpoint_seqno(0),
    checkpoint_group(mts_checkpoint_group), mts_recovery_group_cnt(0),
    mts_recovery_index(0), mts_recovery_group_seen_begin(0),
@@ -112,7 +111,7 @@ void Relay_log_info::init_workers(ulong 
     Parallel slave parameters initialization is done regardless
     whether the feature is or going to be active or not.
   */
-  trans_jobs= stmt_jobs= pending_jobs= wq_size_waits= 0;
+  mts_groups_assigned= mts_events_assigned= pending_jobs= wq_size_waits= 0;
   mts_wq_excess= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0;
 
   my_init_dynamic_array(&workers, sizeof(Slave_worker *), n_workers, 4);
@@ -142,12 +141,9 @@ void Relay_log_info::init_workers(ulong 
   }
   mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
                    MY_MUTEX_INIT_FAST);
-  mysql_mutex_init(key_mutex_mts_temp_tables_lock, &mts_temp_tables_lock,
-                   MY_MUTEX_INIT_FAST);
   mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond, NULL);
 #else
   mysql_mutex_init(NULL, &pending_jobs_lock, MY_MUTEX_INIT_FAST);
-  mysql_mutex_init(NULL, &mts_temp_tables_lock, MY_MUTEX_INIT_FAST);
   mysql_cond_init(NULL, &pending_jobs_cond, NULL);
 #endif
 }
@@ -169,7 +165,6 @@ void Relay_log_info::deinit_workers()
 
   mysql_mutex_destroy(&pending_jobs_lock);
   mysql_cond_destroy(&pending_jobs_cond);
-  mysql_mutex_destroy(&mts_temp_tables_lock);
 
   delete_dynamic(&workers);
   my_atomic_rwlock_destroy(&slave_open_temp_tables_lock);
@@ -795,6 +790,18 @@ void Relay_log_info::inc_group_relay_log
   {
     group_master_log_pos= log_pos;
   }
+
+  /*
+    In MTS mode the FD or Rotate event commits its solitary group to
+    Coordinator's info table. Callers make sure that Workers have
+    executed all their assignments.
+    Broadcast to master_pos_wait() waiters should be done after
+    the table is updated.
+  */
+  DBUG_ASSERT(!is_parallel_exec() ||
+              mts_group_status != Relay_log_info::MTS_IN_GROUP);
+  flush_info(TRUE);  // todo: error branch
+
   mysql_cond_broadcast(&data_cond);
   if (!skip_lock)
     mysql_mutex_unlock(&data_lock);
@@ -1095,6 +1102,9 @@ void Relay_log_info::stmt_done(my_off_t 
   {
     if (is_parallel_exec())
     {
+
+      DBUG_ASSERT(!mts_is_worker(info_thd));
+
       /*
         Format Description events are special events that are handled as a
         synchronization points. For that reason, the checkpoint routine is
@@ -1103,10 +1113,6 @@ void Relay_log_info::stmt_done(my_off_t 
       (void) mts_checkpoint_routine(this, 0, FALSE, FALSE); // TODO: ALFRANIO ERROR
     }
     inc_group_relay_log_pos(event_master_log_pos);
-    
-    DBUG_ASSERT(this_worker == NULL);
-
-    flush_info(is_transactional() ? TRUE : FALSE); // TODO: ALFRANIO ERROR
   }
 }
 

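Taken together with the stmt_done() hunk above, the Coordinator's info table is
now flushed from a single place, inc_group_relay_log_pos(); the flush removed
from Rotate_log_event::do_update_pos() in log_event.cc is presumably covered the
same way, via its call to inc_group_relay_log_pos() (that call is not visible in
the excerpt). The resulting chain, sketched from the hunks rather than quoted:

    // stmt_done()  -- and, presumably, Rotate_log_event::do_update_pos() --
    //   -> inc_group_relay_log_pos()
    //        -> flush_info(TRUE);                  // persist positions first
    //        -> mysql_cond_broadcast(&data_cond);  // then wake master_pos_wait() waiters
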
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2011-06-27 17:31:45 +0000
+++ b/sql/rpl_rli.h	2011-07-01 10:16:52 +0000
@@ -108,9 +108,6 @@ class Relay_log_info : public Rpl_info
   friend class Rpl_info_factory;
 
 public:
-  THD* checkpoint_thd;
-  bool checkpoint_running;
-
   /**
      Flags for the state of the replication.
    */
@@ -209,6 +206,7 @@ public:
     happen when, for example, the relay log gets rotated because of
     max_binlog_size.
   */
+protected:
   char group_relay_log_name[FN_REFLEN];
   ulonglong group_relay_log_pos;
   char event_relay_log_name[FN_REFLEN];
@@ -230,8 +228,6 @@ public:
   char group_master_log_name[FN_REFLEN];
   volatile my_off_t group_master_log_pos;
 
-// TODO: Restore!
-protected:
   /*
     When it commits, InnoDB internally stores the master log position it has
     processed so far; the position to store is the one of the end of the
@@ -283,12 +279,6 @@ public:
   mysql_cond_t log_space_cond;
 
   /*
-    A cache for the global system variable's value.
-    The value is reset at the beginning of each statement execution.
-  */
-  ulong slave_exec_mode;
-
-  /*
      Condition and its parameters from START SLAVE UNTIL clause.
      
      UNTIL condition is tested with is_until_satisfied() method that is
@@ -431,7 +421,7 @@ public:
   */
   time_t last_event_start_time;
 
-  /*
+  /*****************************************************************************
     WL#5569 MTS
 
     legends:
@@ -441,41 +431,36 @@ public:
   */
   DYNAMIC_ARRAY workers; // number's is determined by global slave_parallel_workers
   volatile ulong pending_jobs;
-  ulong stmt_jobs;      // statistics: number of events (statements) processed
-  ulong trans_jobs;     // statistics: number of groups (transactions) processed
-  ulong wq_size_waits;  // number of times C goes to sleep due to WQ:s oversize
   mysql_mutex_t pending_jobs_lock;
   mysql_cond_t pending_jobs_cond;
   ulong       mts_slave_worker_queue_len_max;
   ulonglong   mts_pending_jobs_size;      // actual mem usage by WQ:s
-  ulonglong   mts_pending_jobs_size_max;  // the max forcing to wait by C
-  bool    mts_wq_oversize;           // C raises flag to wait some memory's released
-  Slave_worker  *last_assigned_worker; // a hint to partitioning func for some events
+  ulonglong   mts_pending_jobs_size_max;  // max of WQ:s size forcing C to wait
+  bool    mts_wq_oversize;      // C raises flag to wait till some memory is released
+  Slave_worker  *last_assigned_worker; // set to a Worker when a group is assigned
+  /*
+    master-binlog ordered queue of Slave_job_group descriptors of groups
+    that are under processing
+  */
   Slave_committed_queue *gaq;
-  DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
+  /*
+    Container for references of involved partitions for the current event group
+  */
+  DYNAMIC_ARRAY curr_group_assigned_parts;
   DYNAMIC_ARRAY curr_group_da;  // deferred array to hold partition-info-free events
   bool curr_group_seen_begin;   // current group started with B-event or not
   bool curr_group_isolated;     // current group requires execution in isolation
   volatile ulong mts_wq_underrun_w_id;  // Id of a Worker whose queue is getting empty
-  volatile long mts_wq_excess;   // excessive overrun counter; when W increments and decrements it it also marks updates its own wq_overrun_set
-  volatile ulong mts_wq_overrun_cnt; // statistics of all mts_wq_excess increments
-  ulong mts_wq_no_underrun_cnt;  // counts times of C goes to sleep when W:s are filled
-  ulong mts_wq_overfill_cnt;  // counts C waits when a W's queue is full
-  long  mts_worker_underrun_level; // percent of WQ size at which W is considered hungry
-  ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun
-  Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
-  ulonglong mts_total_groups; // total event groups distributed in current session
-  ulong opt_slave_parallel_workers; // auxiliary cache for ::opt_slave_parallel_workers
-  ulong slave_parallel_workers;     // the one slave session time number of workers
-  ulong recovery_parallel_workers; // number of workers while recovering
-
   /* 
-     A sorted array of Worker current assignements number to provide
-     approximate view on Workers loading.
-     The first row of the least occupied Worker is queried at assigning 
-     a new partition. Is updated at checkpoint commit to the main RLI.
+     excessive overrun counter; when W increments or decrements it, it
+     also updates its own wq_overrun_set
   */
-  DYNAMIC_ARRAY least_occupied_workers;
+  volatile long mts_wq_excess;
+  long  mts_worker_underrun_level; // % of WQ size at which W is considered hungry
+  ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun
+  ulong opt_slave_parallel_workers; // cache for ::opt_slave_parallel_workers
+  ulong slave_parallel_workers; // number of workers for the current slave session
+  ulong recovery_parallel_workers; // number of workers while recovering
   uint checkpoint_seqno;  // counter of groups executed after the most recent CP
   uint checkpoint_group;  // number of groups in one checkpoint interval (period).
   MY_BITMAP recovery_groups;  // bitmap used during recovery
@@ -508,9 +493,9 @@ public:
     MTS_NOT_IN_GROUP =>                  |
         {MTS_IN_GROUP => MTS_END_GROUP --+} while (!killed) => MTS_KILLED_GROUP
       
-    though MTS_END_GROUP has `->' link to MTS_NOT_IN_GROUP
-    when Coordinator synchronizes with Workers by demanding them to complete their
-    assignments.
+    MTS_END_GROUP has `->' loop breaking link to MTS_NOT_IN_GROUP when
+    Coordinator synchronizes with Workers by demanding them to
+    complete their assignments.
   */
   enum
   {
@@ -525,6 +510,24 @@ public:
     MTS_KILLED_GROUP /* Coordinator gave out to reach MTS_END_GROUP */
   } mts_group_status;
 
+  /*
+    MTS statistics: 
+  */
+  ulong mts_events_assigned; // number of events (statements) scheduled
+  ulong mts_groups_assigned; // number of groups (transactions) scheduled
+  volatile ulong mts_wq_overrun_cnt; // counter of all mts_wq_excess increments
+  ulong wq_size_waits;    // number of times C slept due to WQ:s oversize
+  ulong mts_wq_no_underrun_cnt; // number of times C slept when W:s were filled
+  ulong mts_wq_overfill_cnt;  // number of times C waited when a W's queue was full
+  /* 
+     A sorted array of the Workers' current assignment numbers to provide
+     approximate view on Workers loading.
+     The first row of the least occupied Worker is queried at assigning 
+     a new partition. Is updated at checkpoint commit to the main RLI.
+  */
+  DYNAMIC_ARRAY least_occupied_workers;
+  /* end of MTS statistics */
+
   /* most of allocation in the coordinator rli is there */
   void init_workers(ulong);
 
@@ -576,6 +579,10 @@ public:
   */
   void reset_notified_checkpoint(ulong, time_t);
 
+  /*
+   * End of MTS section ******************************************************/
+
+
   /**
     Helper function to do after statement completion.
 

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2011-07-01 08:45:11 +0000
+++ b/sql/rpl_rli_pdb.cc	2011-07-01 13:41:35 +0000
@@ -58,9 +58,9 @@ ulong mts_partition_hash_soft_max= 16;
 Slave_worker::Slave_worker(const char* type, const char* pfs,
                            Relay_log_info *rli)
   : Relay_log_info(FALSE), c_rli(rli),
-    checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0),
-    inited_group_execed(0), processed_group(0), running_status(NOT_RUNNING),
-    last_event(NULL)
+    checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0),
+    inited_group_executed(0),  checkpoint_seqno(0), running_status(NOT_RUNNING),
+    inited_curr_group_exec_parts(0)
 {
   checkpoint_relay_log_name[0]= 0;
   checkpoint_master_log_name[0]= 0;
@@ -68,15 +68,72 @@ Slave_worker::Slave_worker(const char* t
 
 Slave_worker::~Slave_worker() 
 {
-  delete_dynamic(&curr_group_exec_parts);
+  if (inited_curr_group_exec_parts)
+    delete_dynamic(&curr_group_exec_parts);
 
-  if (inited_group_execed)
+  if (inited_group_executed)
   {
-    bitmap_free(&group_execed);
+    bitmap_free(&group_executed);
     bitmap_free(&group_shifted);
   }
 }
 
+/**
+   Method is executed by Coordinator at Worker startup time to initialize
+   members parly with values supplied by Coordinator through rli.
+
+   @param  rli  Coordinator's Relay_log_info pointer
+   @param  i    identifier of the Worker
+
+   @return 0          success
+           non-zero   failure
+*/
+int Slave_worker::init_worker(Relay_log_info * rli, ulong i)
+{
+  uint k;
+  Slave_job_item empty= {NULL};
+
+  c_rli= rli;
+  if (init_info())
+    return 1;
+
+  id= i;
+  curr_group_exec_parts.elements= 0;
+  relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
+  checkpoint_notified= FALSE;
+  bitmap_shifted= 0;
+  workers= c_rli->workers; // shallow copying is sufficient
+  wq_size_waits= groups_done= events_done= curr_jobs= 0;
+  usage_partition= 0;
+  last_group_done_index= c_rli->gaq->size; // out of range
+
+  jobs.avail= 0;
+  jobs.len= 0;
+  jobs.overfill= FALSE;    //  todo: move into Slave_jobs_queue constructor
+  jobs.waited_overfill= 0;
+  jobs.entry= jobs.size= c_rli->mts_slave_worker_queue_len_max;
+  curr_group_seen_begin= FALSE;
+
+  my_init_dynamic_array(&jobs.Q, sizeof(Slave_job_item), jobs.size, 0);
+  for (k= 0; k < jobs.size; k++)
+    insert_dynamic(&jobs.Q, (uchar*) &empty);
+  
+  DBUG_ASSERT(jobs.Q.elements == jobs.size);
+  
+  wq_overrun_set= FALSE;
+
+#ifdef HAVE_PSI_INTERFACE
+  mysql_mutex_init(key_mutex_slave_parallel_worker[i], &jobs_lock,
+                   MY_MUTEX_INIT_FAST);
+  mysql_cond_init(key_cond_slave_parallel_worker[i], &jobs_cond, NULL);
+#else
+  mysql_mutex_init(NULL, &jobs_lock, MY_MUTEX_INIT_FAST);
+  mysql_cond_init(NULL, &jobs_cond, NULL);
+#endif
+
+  return 0;
+}
+
 int Slave_worker::init_info()
 {
   int necessary_to_configure= 0;
@@ -88,8 +145,9 @@ int Slave_worker::init_info()
 
   my_init_dynamic_array(&curr_group_exec_parts, sizeof(db_worker_hash_entry*),
                         SLAVE_INIT_DBS_IN_GROUP, 1);
-
-  if (bitmap_init(&group_execed, NULL,
+  if (curr_group_exec_parts.max_element != 0)
+    inited_curr_group_exec_parts= 1;
+  if (bitmap_init(&group_executed, NULL,
                   c_rli->checkpoint_group, FALSE))
     goto err;
   
@@ -97,7 +155,7 @@ int Slave_worker::init_info()
                   c_rli->checkpoint_group, FALSE))
     goto err;
 
-  inited_group_execed= 1;
+  inited_group_executed= 1;
 
   /*
     The init_info() is used to either create or read information
@@ -171,7 +229,7 @@ bool Slave_worker::read_info(Rpl_info_ha
   ulong temp_checkpoint_master_log_pos= 0;
   ulong temp_checkpoint_seqno= 0;
   ulong nbytes= 0;
-  uchar *buffer= (uchar *) group_execed.bitmap;
+  uchar *buffer= (uchar *) group_executed.bitmap;
 
   if (from->prepare_info_for_read(nidx))
     DBUG_RETURN(TRUE);
@@ -216,8 +274,8 @@ bool Slave_worker::write_info(Rpl_info_h
 {
   DBUG_ENTER("Master_info::write_info");
 
-  ulong nbytes= (ulong) no_bytes_in_map(&group_execed);
-  uchar *buffer= (uchar*) group_execed.bitmap;
+  ulong nbytes= (ulong) no_bytes_in_map(&group_executed);
+  uchar *buffer= (uchar*) group_executed.bitmap;
 
   if (to->prepare_info_for_write(nidx) ||
       to->set_info(group_relay_log_name) ||
@@ -261,12 +319,12 @@ bool Slave_worker::commit_positions(Log_
     my_free(ptr_g->checkpoint_relay_log_name);
     ptr_g->checkpoint_relay_log_name= NULL;
 
-    bitmap_copy(&group_shifted, &group_execed);
-    bitmap_clear_all(&group_execed);
+    bitmap_copy(&group_shifted, &group_executed);
+    bitmap_clear_all(&group_executed);
     for (uint pos= ptr_g->shifted; pos < c_rli->checkpoint_group; pos++)
     {
       if (bitmap_is_set(&group_shifted, pos))
-        bitmap_set_bit(&group_execed, pos - ptr_g->shifted);
+        bitmap_set_bit(&group_executed, pos - ptr_g->shifted);
     }
   }
   /*
@@ -282,7 +340,7 @@ bool Slave_worker::commit_positions(Log_
 
   DBUG_ASSERT(ptr_g->checkpoint_seqno <= (c_rli->checkpoint_group - 1));
   
-  bitmap_set_bit(&group_execed, ptr_g->checkpoint_seqno);
+  bitmap_set_bit(&group_executed, ptr_g->checkpoint_seqno);
   checkpoint_seqno= ptr_g->checkpoint_seqno;
   group_relay_log_pos= ev->future_event_relay_log_pos;
   group_master_log_pos= ev->log_pos;
@@ -505,7 +563,8 @@ static void move_temp_tables_to_entry(TH
 
         CGAP .= D
 
-   and a possible D's Worker id is searched in Assigne Partition Hash
+   here .= is the concatenation operator,
+   and a possible D's Worker id is searched in Assigned Partition Hash
    (APH) that collects tuples (P, W_id, U, mutex, cond).
    In case not found,
 
@@ -539,6 +598,8 @@ static void move_temp_tables_to_entry(TH
    @param  ptr_entry   reference to a pointer to the resulted entry in
                        the Assigne Partition Hash where
                        the entry's pointer is stored at return.
+   @param  need_temp_tables
+                       if FALSE, migration of temporary tables is not needed
    @param  last_worker caller opts for this Worker, it must be
                        rli->last_assigned_worker if one is determined.
 
@@ -613,7 +674,8 @@ Slave_worker *map_db_to_worker(const cha
     */
     if (!(db= (char *) my_malloc((size_t) dblength + 1, MYF(0))))
       goto err;
-    if (!(entry= (db_worker_hash_entry *) my_malloc(sizeof(db_worker_hash_entry), MYF(0))))
+    if (!(entry= (db_worker_hash_entry *)
+          my_malloc(sizeof(db_worker_hash_entry), MYF(0))))
     {
       my_free(db);
       goto err;
@@ -801,9 +863,9 @@ void Slave_worker::slave_worker_ends_gro
 
   if (!error)
   {
+    Slave_committed_queue *gaq= c_rli->gaq;
     ulong gaq_idx= ev->mts_group_cnt;
-    Slave_job_group *ptr_g=
-      (Slave_job_group *) dynamic_array_ptr(&c_rli->gaq->Q, gaq_idx);
+    Slave_job_group *ptr_g= gaq->get_job_group(gaq_idx);
 
     // first ever group must have relay log name
     DBUG_ASSERT(last_group_done_index != c_rli->gaq->size ||
@@ -824,6 +886,7 @@ void Slave_worker::slave_worker_ends_gro
     ptr_g->done= 1;    // GAQ index is available to C now
 
     last_group_done_index= gaq_idx;
+    groups_done++;
   }
 
   /*
@@ -903,67 +966,79 @@ void Slave_worker::slave_worker_ends_gro
 
 
 /**
-   Class circular_buffer_queue
+   Class circular_buffer_queue.
+
+   The content of the dequeued item is copied to the location
+   the argument points at.
+
+   @return the queue's array index the dequeued item was located at,
+           or an error encoded as a value outside the legal
+           [0, size) range (the value `size' itself is excluded).
 */
 
 ulong circular_buffer_queue::de_queue(uchar *val)
 {
   ulong ret;
-  if (enter == size)
+  if (entry == size)
   {
     DBUG_ASSERT(len == 0);
     return (ulong) -1;
   }
 
-  ret= enter;
-  get_dynamic(&Q, val, enter);
+  ret= entry;
+  get_dynamic(&Q, val, entry);
   len--;
   
   // pre boundary cond
   if (avail == size)
-    avail= enter;
-  enter= (enter + 1) % size;
+    avail= entry;
+  entry= (entry + 1) % size;
 
   // post boundary cond
-  if (avail == enter)
-    enter= size;
+  if (avail == entry)
+    entry= size;
 
-  DBUG_ASSERT(enter == size ||
-              (len == (avail >= enter)? (avail - enter) :
-               (size + avail - enter)));
-  DBUG_ASSERT(avail != enter);
+  DBUG_ASSERT(entry == size ||
+              (len == (avail >= entry)? (avail - entry) :
+               (size + avail - entry)));
+  DBUG_ASSERT(avail != entry);
 
   return ret;
 }
 
 /**
-   removing an item from the tail side
+   Similar to de_queue() but removes an item from the tail side.
+
+   @return the queue's array index the dequeued item was located at,
+           or an error.
 */
 ulong circular_buffer_queue::de_tail(uchar *val)
 {
-  if (enter == size)
+  if (entry == size)
   {
     DBUG_ASSERT(len == 0);
     return (ulong) -1;
   }
 
-  avail= (enter + len - 1) % size;
+  avail= (entry + len - 1) % size;
   get_dynamic(&Q, val, avail);
   len--;
   
   // post boundary cond
-  if (avail == enter)
-    enter= size;
+  if (avail == entry)
+    entry= size;
 
-  DBUG_ASSERT(enter == size ||
-              (len == (avail >= enter)? (avail - enter) :
-               (size + avail - enter)));
-  DBUG_ASSERT(avail != enter);
+  DBUG_ASSERT(entry == size ||
+              (len == (avail >= entry)? (avail - entry) :
+               (size + avail - entry)));
+  DBUG_ASSERT(avail != entry);
 
   return avail;
 }
+
 /** 
-    @return the used index at success or -1 when queue is full
+    @return  the index at which the arg item has been placed,
+             or an error.
 */
 ulong circular_buffer_queue::en_queue(void *item)
 {
@@ -981,20 +1056,20 @@ ulong circular_buffer_queue::en_queue(vo
 
 
   // pre-boundary cond
-  if (enter == size)
-    enter= avail;
+  if (entry == size)
+    entry= avail;
   
   avail= (avail + 1) % size;
   len++;
 
   // post-boundary cond
-  if (avail == enter)
+  if (avail == entry)
     avail= size;
 
-  DBUG_ASSERT(avail == enter || 
-              len == (avail >= enter) ?
-              (avail - enter) : (size + avail - enter));
-  DBUG_ASSERT(avail != enter);
+  DBUG_ASSERT(avail == entry || 
+              len == (avail >= entry) ?
+              (avail - entry) : (size + avail - entry));
+  DBUG_ASSERT(avail != entry);
 
   return ret;
 }
@@ -1002,13 +1077,13 @@ ulong circular_buffer_queue::en_queue(vo
 void* circular_buffer_queue::head_queue()
 {
   uchar *ret= NULL;
-  if (enter == size)
+  if (entry == size)
   {
     DBUG_ASSERT(len == 0);
   }
   else
   {
-    get_dynamic(&Q, (uchar*) ret, enter);
+    get_dynamic(&Q, (uchar*) ret, entry);
   }
   return (void*) ret;
 }
@@ -1027,15 +1102,15 @@ void* circular_buffer_queue::head_queue(
 bool circular_buffer_queue::gt(ulong i, ulong k)
 {
   DBUG_ASSERT(i < size && k < size);
-  DBUG_ASSERT(avail != enter);
+  DBUG_ASSERT(avail != entry);
 
-  if (i >= enter)
-    if (k >= enter)
+  if (i >= entry)
+    if (k >= entry)
       return i > k;
     else
       return FALSE;
   else
-    if (k >= enter)
+    if (k >= entry)
       return TRUE;
     else
       return i > k;
@@ -1046,7 +1121,7 @@ bool Slave_committed_queue::count_done(R
 {
   ulong i, cnt= 0;
 
-  for (i= enter; i != avail && !empty(); i= (i + 1) % size)
+  for (i= entry; i != avail && !empty(); i= (i + 1) % size)
   {
     Slave_job_group *ptr_g;
 
@@ -1090,7 +1165,7 @@ ulong Slave_committed_queue::move_queue_
 {
   ulong i, cnt= 0;
 
-  for (i= enter; i != avail && !empty();)
+  for (i= entry; i != avail && !empty();)
   {
     Slave_worker *w_i;
     Slave_job_group *ptr_g, g;
@@ -1098,7 +1173,8 @@ ulong Slave_committed_queue::move_queue_
     ulong ind;
 
 #ifndef DBUG_OFF
-    if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) && cnt == mts_checkpoint_period)
+    if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) &&
+        cnt == mts_checkpoint_period)
       return cnt;
 #endif
 
@@ -1175,14 +1251,14 @@ ulong Slave_committed_queue::move_queue_
 }
 
 /**
-   Method should be executed at slave system shutdown to 
+   Method should be executed at slave system stop to 
    cleanup dynamically allocated items that remained as unprocessed
    by shutdown time.
 */
 void Slave_committed_queue::free_dynamic_items()
 {
   ulong i;
-  for (i= enter; i != avail && !empty(); i= (i + 1) % size)
+  for (i= entry; i != avail && !empty(); i= (i + 1) % size)
   {
     Slave_job_group *ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);
     if (ptr_g->group_relay_log_name)
@@ -1200,18 +1276,11 @@ void Slave_committed_queue::free_dynamic
   }
 }
 
-void Slave_worker::do_report(loglevel level, int err_code, const char *msg, va_list vargs) const
-{
-  c_rli->do_report(level, err_code, msg, vargs);
-}
 
-void Slave_worker::report(loglevel level, int err_code, const char *msg, ...) const
+void Slave_worker::do_report(loglevel level, int err_code, const char *msg,
+                             va_list args) const
 {
-  va_list vargs;
-  va_start(vargs, msg);
-
-  do_report(level, err_code, msg, vargs);
-  va_end(vargs);
+  c_rli->va_report(level, err_code, msg, args);
 }
 
 /**
@@ -1316,18 +1385,18 @@ static int en_queue(Slave_jobs_queue *jo
   set_dynamic(&jobs->Q, (uchar*) item, jobs->avail);
 
   // pre-boundary cond
-  if (jobs->enter == jobs->size)
-    jobs->enter= jobs->avail;
+  if (jobs->entry == jobs->size)
+    jobs->entry= jobs->avail;
   
   jobs->avail= (jobs->avail + 1) % jobs->size;
   jobs->len++;
 
   // post-boundary cond
-  if (jobs->avail == jobs->enter)
+  if (jobs->avail == jobs->entry)
     jobs->avail= jobs->size;
-  DBUG_ASSERT(jobs->avail == jobs->enter || 
-              jobs->len == (jobs->avail >= jobs->enter) ?
-              (jobs->avail - jobs->enter) : (jobs->size + jobs->avail - jobs->enter));
+  DBUG_ASSERT(jobs->avail == jobs->entry || 
+              jobs->len == (jobs->avail >= jobs->entry) ?
+              (jobs->avail - jobs->entry) : (jobs->size + jobs->avail - jobs->entry));
   return jobs->avail;
 }
 
@@ -1336,13 +1405,13 @@ static int en_queue(Slave_jobs_queue *jo
 */
 static void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
 {
-  if (jobs->enter == jobs->size)
+  if (jobs->entry == jobs->size)
   {
     DBUG_ASSERT(jobs->len == 0);
     ret->data= NULL;               // todo: move to caller
     return NULL;
   }
-  get_dynamic(&jobs->Q, (uchar*) ret, jobs->enter);
+  get_dynamic(&jobs->Q, (uchar*) ret, jobs->entry);
 
   DBUG_ASSERT(ret->data);         // todo: move to caller
  
@@ -1355,26 +1424,27 @@ static void * head_queue(Slave_jobs_queu
 */
 Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
 {
-  if (jobs->enter == jobs->size)
+  if (jobs->entry == jobs->size)
   {
     DBUG_ASSERT(jobs->len == 0);
     return NULL;
   }
-  get_dynamic(&jobs->Q, (uchar*) ret, jobs->enter);
+  get_dynamic(&jobs->Q, (uchar*) ret, jobs->entry);
   jobs->len--;
   
   // pre boundary cond
   if (jobs->avail == jobs->size)
-    jobs->avail= jobs->enter;
-  jobs->enter= (jobs->enter + 1) % jobs->size;
+    jobs->avail= jobs->entry;
+  jobs->entry= (jobs->entry + 1) % jobs->size;
 
   // post boundary cond
-  if (jobs->avail == jobs->enter)
-    jobs->enter= jobs->size;
+  if (jobs->avail == jobs->entry)
+    jobs->entry= jobs->size;
 
-  DBUG_ASSERT(jobs->enter == jobs->size ||
-              (jobs->len == (jobs->avail >= jobs->enter)? (jobs->avail - jobs->enter) :
-               (jobs->size + jobs->avail - jobs->enter)));
+  DBUG_ASSERT(jobs->entry == jobs->size ||
+              (jobs->len == (jobs->avail >= jobs->entry) ?
+               (jobs->avail - jobs->entry) :
+               (jobs->size + jobs->avail - jobs->entry)));
 
   return ret;
 }
@@ -1423,7 +1493,7 @@ void append_item_to_jobs(slave_job_item 
   }
   rli->pending_jobs++;
   rli->mts_pending_jobs_size= new_pend_size;
-  rli->stmt_jobs++;
+  rli->mts_events_assigned++;
 
   mysql_mutex_unlock(&rli->pending_jobs_lock);
 
@@ -1621,13 +1691,12 @@ int slave_worker_exec_job(Slave_worker *
     worker->slave_worker_ends_group(ev, error); /* last done sets post exec */
 
 #ifndef DBUG_OFF
-    worker->processed_group++;
     DBUG_PRINT("mts", ("Check_slave_debug_group worker %lu mts_checkpoint_group"
-               " %u processed %u debug %d\n", worker->id, mts_checkpoint_group,
-               worker->processed_group,
+               " %u processed %lu debug %d\n", worker->id, mts_checkpoint_group,
+               worker->groups_done,
                DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0)));
     if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) &&
-        mts_checkpoint_group == worker->processed_group)
+        mts_checkpoint_group == worker->groups_done)
     {
       DBUG_PRINT("mts", ("Putting worker %lu in busy wait.", worker->id));
       while (true) my_sleep(6000000);
@@ -1693,7 +1762,7 @@ int slave_worker_exec_job(Slave_worker *
   
   mysql_mutex_unlock(&rli->pending_jobs_lock);
 
-  worker->stmt_jobs++;
+  worker->events_done++;
 
 err:
   if (error)
@@ -1705,13 +1774,10 @@ err:
     worker->slave_worker_ends_group(ev, error);
   }
   
-  // rows_query_log_event is deleted as a part of the statement cleanup
-
-  // todo: sync_slave_with_master fails when my_sleep(1000) is put here
 
+  // todo: simulate delay in delete
   if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
   {
-    worker->last_event= ev;
     delete ev;
   }
   

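The following is a minimal standalone sketch, not part of the patch, of the queue convention the de_queue()/en_queue() hunks above rely on after the enter -> entry rename: `entry' is the head index, `avail' the next free slot, entry == size encodes an empty queue and avail == size a full one. The toy_cbq type, its members and its error convention (returning `size') are assumptions made for illustration only.

#include <cassert>
#include <cstdio>
#include <vector>

struct toy_cbq
{
  std::vector<long> buf;
  unsigned long size, entry, avail, len;

  explicit toy_cbq(unsigned long max)
    : buf(max), size(max), entry(max), avail(0), len(0) {}

  bool empty() const { return entry == size; }
  bool full()  const { return avail == size; }

  // returns the slot used, or `size' on error (queue full)
  unsigned long en_queue(long item)
  {
    if (full())
      return size;
    unsigned long ret= avail;
    buf[avail]= item;
    if (entry == size)      // pre-boundary: the first element re-arms the head
      entry= avail;
    avail= (avail + 1) % size;
    len++;
    if (avail == entry)     // post-boundary: the queue became full
      avail= size;
    return ret;
  }

  // returns the slot the item was read from, or `size' on error (queue empty)
  unsigned long de_queue(long *item)
  {
    if (empty())
      return size;
    unsigned long ret= entry;
    *item= buf[entry];
    len--;
    if (avail == size)      // pre-boundary: leaving the "full" state
      avail= entry;
    entry= (entry + 1) % size;
    if (avail == entry)     // post-boundary: the queue became empty
      entry= size;
    return ret;
  }
};

int main()
{
  toy_cbq q(4);
  for (long v= 0; v < 4; v++)
    q.en_queue(v);
  assert(q.full());
  long v;
  while (q.de_queue(&v) != q.size)
    printf("%ld\n", v);            // prints 0 1 2 3 in FIFO order
  assert(q.empty());
  return 0;
}

The pre/post boundary checks mirror the assertions in the hunks above: leaving the full state re-arms `avail', and consuming the last element parks `entry' at `size'.
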
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2011-06-27 17:31:45 +0000
+++ b/sql/rpl_rli_pdb.h	2011-07-01 12:48:25 +0000
@@ -7,7 +7,7 @@
 #include <my_sys.h>
 #include <my_bitmap.h>
 
-/* APH entry */
+/* Assigned Partition Hash (APH) entry */
 typedef struct st_db_worker_hash_entry
 {
   uint  db_len;
@@ -18,12 +18,13 @@ typedef struct st_db_worker_hash_entry
     The list of temp tables belonging to @ db database is
     attached to an assigned @c worker to become its thd->temporary_tables.
     The list is updated with every ddl incl CREATE, DROP.
-    It is removed from the entry and merged to the coordinator's thd->temporary_tables
-    in case of events: slave stops, the db-to-worker hash oversize.
+    It is removed from the entry and merged into the coordinator's
+    thd->temporary_tables when the slave stops or the APH oversizes.
   */
   TABLE* volatile temporary_tables;
 
-  /* todo: relax concurrency after making APH mutex/cond pair has worked
+  /* todo: relax concurrency to mimic record-level locking.
+     That is, to augment the entry with a mutex/cond pair
      pthread_mutex_t
      pthread_cond_t
      timestamp updated_at; */
@@ -64,14 +65,14 @@ public:
   DYNAMIC_ARRAY Q;
   ulong size;           // the Size of the queue in terms of element
   ulong avail;          // first Available index to append at (next to tail)
-  ulong enter;          // the head index
+  ulong entry;          // the head index or the entry point to the queue.
   volatile ulong len;   // actual length
   bool inited_queue;
 
   circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
-    size(max), avail(0), enter(max), len(0), inited_queue(FALSE)
+    size(max), avail(0), entry(max), len(0), inited_queue(FALSE)
   {
-    DBUG_ASSERT(size < ULONG_MAX);
+    DBUG_ASSERT(size < (ulong) -1);
     if (!my_init_dynamic_array(&Q, el_size, size, alloc_inc))
       inited_queue= TRUE;
   }
@@ -99,9 +100,7 @@ public:
   /**
     return the index where the arg item locates
            or an error encoded as a value in beyond of the legacy range
-           [0, circular_buffer_max_index].
-           
-           Todo: define the range.
+           [0, size) (value `size' is excluded).
   */
   ulong en_queue(void *item);
   /**
@@ -111,8 +110,8 @@ public:
   bool   gt(ulong i, ulong k); // comparision of ordering of two entities
   /* index is within the valid range */
   bool in(ulong k) { return !empty() && 
-      (enter > avail ? (k >= enter || k < avail) : (k >= enter && k < avail)); }
-  bool empty() { return enter == size; }
+      (entry > avail ? (k >= entry || k < avail) : (k >= entry && k < avail)); }
+  bool empty() { return entry == size; }
   bool full() { return avail == size; }
 };
 
@@ -132,13 +131,12 @@ typedef struct st_slave_job_group
   */
   char     *group_relay_log_name; // The value is last seen relay-log 
   my_off_t group_relay_log_pos;  // filled by W
-
   ulong worker_id;
   Slave_worker *worker;
   ulonglong total_seqno;
 
   my_off_t master_log_pos;       // B-event log_pos
-  /* checkpoint coord are reset by CP and rotate:s */
+  /* checkpoint coords are reset by periodic and special (Rotate event) CP:s */
   uint  checkpoint_seqno;
   my_off_t checkpoint_log_pos; // T-event lop_pos filled by W for CheckPoint
   char*    checkpoint_log_name;
@@ -210,6 +208,15 @@ public:
   ulong move_queue_head(DYNAMIC_ARRAY *ws);
   /* Method is for slave shutdown time cleanup */
   void free_dynamic_items();
+  /* 
+     returns a pointer to the Slave_job_group struct instance indexed by the
+     arg in the circular buffer dyn-array
+  */
+  Slave_job_group* get_job_group(ulong ind)
+  {
+    return (Slave_job_group*) dynamic_array_ptr(&Q, ind);
+  }
+
 };
 
 class Slave_jobs_queue : public circular_buffer_queue
@@ -228,50 +235,43 @@ public:
                Relay_log_info *rli);
   virtual ~Slave_worker();
 
-  mysql_mutex_t jobs_lock;
-  mysql_cond_t  jobs_cond;
-  Slave_jobs_queue jobs;
-
-  Relay_log_info *c_rli;
-
-  DYNAMIC_ARRAY curr_group_exec_parts; // CGEP
-
-  bool curr_group_seen_begin; // is set to TRUE with B-event at Worker exec
-  List<Log_event> data_in_use; // events are still in use by SQL thread
-  ulong id;
-  TABLE *current_table;
-
-  // rbr
-  // RPL_TABLE_LIST *tables_to_lock;           /* RBR: Tables to lock  */
-  // uint tables_to_lock_count;        /* RBR: Count of tables to lock */
-  // table_mapping m_table_map;      /* RBR: Mapping table-id to table */
-
-  // statictics
+  Slave_jobs_queue jobs;   // assignment queue containing events to execute
+  mysql_mutex_t jobs_lock; // mutex for the jobs queue
+  mysql_cond_t  jobs_cond; // condition variable for the jobs queue
+  Relay_log_info *c_rli;   // pointer to Coordinator's rli
+  DYNAMIC_ARRAY curr_group_exec_parts; // Current Group Executed Partitions
+  bool curr_group_seen_begin; // is set to TRUE with explicit B-event
+  ulong id;                 // numeric identifier of the Worker
 
   /*
-    @c last_group_done_index is for statistics
-    to mean the index in GAQ of the last processed group.
+    Worker runtime statistics
   */
-  volatile ulong last_group_done_index; // it's index in GAQ
-  ulong wq_empty_waits;  // to gather statistics how many times got idle
-  ulong stmt_jobs;  // how many jobs per stmt
-  ulong trans_jobs;  // how many jobs per trns
-  volatile int curr_jobs; // the current assignments
-  long usage_partition; // number of different partitions handled by this worker
+  // the index in GAQ of the last processed group by this Worker
+  volatile ulong last_group_done_index;
+  ulong wq_empty_waits;  // how many times got idle
+  ulong events_done;     // how many events (statements) processed
+  ulong groups_done;     // how many groups (transactions) processed
+  volatile int curr_jobs; // number of active assignments
+  // number of partitions allocated to the worker at a point in time
+  long usage_partition;
+
   volatile bool relay_log_change_notified; // Coord sets and resets, W can read
   volatile bool checkpoint_notified; // Coord sets and resets, W can read
-  ulong bitmap_shifted;   // shift the last bitmap at receiving new CP
-  bool wq_overrun_set;  // W monitors its queue usage to incr/decr rli->mts_wqs_overrun
+  ulong bitmap_shifted;  // shift the last bitmap at receiving new CP
+  bool wq_overrun_set;   // W marks itself as incrementer of rli->mts_wq_excess
+
+  /*
+    Coordinatates of the last CheckPoint (CP) this Worker has
+    acknowledged; part of is persisent data
+  */
   char checkpoint_relay_log_name[FN_REFLEN];
   ulonglong checkpoint_relay_log_pos;
-
   char checkpoint_master_log_name[FN_REFLEN];
   ulonglong checkpoint_master_log_pos;
-  ulong checkpoint_seqno;
-  MY_BITMAP group_execed;
-  MY_BITMAP group_shifted;
-  bool inited_group_execed;
-  uint processed_group;
+  MY_BITMAP group_executed; // bitmap describes groups executed after last CP
+  MY_BITMAP group_shifted;  // temporary bitmap to compute group_executed
+  bool inited_group_executed;
+  ulong checkpoint_seqno;   // the most significant ON bit in group_executed
   enum en_running_state
   {
     NOT_RUNNING= 0,
@@ -279,26 +279,24 @@ public:
     KILLED
   };
   en_running_state volatile running_status;
-  Log_event *last_event;
+  bool inited_curr_group_exec_parts;
 
+  int init_worker(Relay_log_info*, ulong);
   int init_info();
   void end_info();
   int flush_info(bool force= FALSE);
-
   size_t get_number_worker_fields();
-
-  void slave_worker_ends_group(Log_event*, int);  // CGEP walk through to upd APH
-
+  void slave_worker_ends_group(Log_event*, int);
   bool commit_positions(Log_event *evt, Slave_job_group *ptr_g);
 
-  void report(loglevel level, int err_code, const char *msg, ...) const
-    ATTRIBUTE_FORMAT(printf, 4, 5);
-  void do_report(loglevel level, int err_code, const char *msg, va_list vargs) const;
+protected:
+
+  virtual void do_report(loglevel level, int err_code,
+                         const char *msg, va_list v_args) const;
 
 private:
   bool read_info(Rpl_info_handler *from);
   bool write_info(Rpl_info_handler *to);
-
   Slave_worker& operator=(const Slave_worker& info);
   Slave_worker(const Slave_worker& info);
 };

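For the group_executed / group_shifted pair declared above (and re-based in the Slave_worker::commit_positions() hunk earlier in this patch), here is a rough illustration of what happens when the Coordinator advances a checkpoint: bits below `shifted' describe groups already behind the new low-water-mark and are dropped, while the remaining bits slide down by `shifted' positions. This is a sketch only; std::vector<bool> stands in for MY_BITMAP and shift_group_executed is a made-up helper, not server code.

#include <cstdio>
#include <vector>

static std::vector<bool> shift_group_executed(const std::vector<bool> &executed,
                                              unsigned shifted)
{
  std::vector<bool> group_shifted(executed);                 // as bitmap_copy()
  std::vector<bool> group_executed(executed.size(), false);  // as bitmap_clear_all()
  for (unsigned pos= shifted; pos < executed.size(); pos++)
    if (group_shifted[pos])
      group_executed[pos - shifted]= true;
  return group_executed;
}

int main()
{
  // groups 2, 3 and 5 of the old checkpoint interval were executed
  std::vector<bool> executed(8, false);
  executed[2]= executed[3]= executed[5]= true;

  // the new checkpoint discarded the first 3 GAQ slots
  std::vector<bool> rebased= shift_group_executed(executed, 3);

  for (unsigned i= 0; i < rebased.size(); i++)
    printf("%u", (unsigned) rebased[i]);   // prints 10100000
  printf("\n");
  return 0;
}
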
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2011-07-01 08:45:11 +0000
+++ b/sql/rpl_slave.cc	2011-07-01 13:41:35 +0000
@@ -84,24 +84,22 @@ const char *relay_log_basename= 0;
   of Relay_log_info::gaq (see @c slave_start_workers()).
   It can be set to any value in [1, ULONG_MAX - 1] range.
 */
-ulong mts_slave_worker_queue_len_max= 32768;
+const ulong mts_slave_worker_queue_len_max= 32768;
 
 /*
   MTS load-ballancing parameter.
   Time in microsecs to sleep by MTS Coordinator to avoid the Worker queues
   room overrun.
 */
-ulong mts_coordinator_basic_nap= 5;
+const ulong mts_coordinator_basic_nap= 5;
 
 /*
   MTS load-ballancing parameter.
   Percent of Worker queue size at which Worker is considered to become
   hungry.
 */
-ulong mts_worker_underrun_level= 10;
-
-
-
+const ulong mts_worker_underrun_level= 10;
+Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
 void append_item_to_jobs(slave_job_item *job_item,
                          Slave_worker *w, Relay_log_info *rli);
 
@@ -2820,10 +2818,11 @@ int apply_event_and_update_pos(Log_event
         // all events except BEGIN-query must be marked with a non-NULL Worker
         DBUG_ASSERT(((Slave_worker*) ev->worker) == rli->last_assigned_worker);
 
-        DBUG_PRINT("Log_event::apply_event:", ("-> job item data %p to W_%lu", job_item->data, w->id));
+        DBUG_PRINT("Log_event::apply_event:",
+                   ("-> job item data %p to W_%lu", job_item->data, w->id));
 
         // Reset mts in-group state
-        if (ev->ends_group() || !rli->curr_group_seen_begin)
+        if (rli->mts_group_status == Relay_log_info::MTS_END_GROUP)
         {
           // CGAP cleanup
           for (uint i= rli->curr_group_assigned_parts.elements; i > 0; i--)
@@ -2943,12 +2942,7 @@ int apply_event_and_update_pos(Log_event
     else
     {
       DBUG_ASSERT(*ptr_ev == ev || rli->is_parallel_exec());
-      /* 
-         event_relay_log_pos is an anchor to possible reading restart.
-         It may become lt than group_* value.
-         However event_relay_log_pos does not affect group_relay_log_pos
-         othen that through the sequentially executed events or via checkpoint.
-      */
+
       rli->inc_event_relay_log_pos();
     }
 
@@ -3074,15 +3068,14 @@ static int exec_relay_log_event(THD* thd
     /*
       This tests if the position of the beginning of the current event
       hits the UNTIL barrier.
-      MTS: since master,relay-group coordinates change per checkpoint
-      at the end of the checkpoint interval UNTIL can be left far behind.
+      MTS: since the master and the relay-group coordinates change
+      asynchronously, the logic of rli->is_until_satisfied() can't apply.
       Hence, UNTIL forces the sequential applying.
     */
     if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
         rli->is_until_satisfied(thd, ev))
     {
       char buf[22];
-
       sql_print_information("Slave SQL thread stopped because it reached its"
                             " UNTIL position %s", llstr(rli->until_pos(), buf));
       /*
@@ -3137,7 +3130,7 @@ static int exec_relay_log_event(THD* thd
         ev= NULL;
       }
     }
-  
+
     /*
       update_log_pos failed: this should not happen, so we don't
       retry.
@@ -3754,11 +3747,8 @@ int check_temp_dir(char* tmp_file)
   DBUG_RETURN(0);
 }
 
-Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
-
 /*
-  Worker thread for the parallel execution of the replication events
-  MHS_todo: consider how to handle error
+  Worker thread for the parallel execution of the replication events.
 */
 pthread_handler_t handle_slave_worker(void *arg)
 {
@@ -3848,8 +3838,9 @@ pthread_handler_t handle_slave_worker(vo
   sql_print_information("Worker %lu statistics: "
                         "events processed = %lu "
                         "hungry waits = %lu "
-                        "priv queue overfills = %llu "
-                        ,w->id, w->stmt_jobs, w->wq_size_waits, w->jobs.waited_overfill);
+                        "priv queue overfills = %llu ",
+                        w->id, w->events_done, w->wq_size_waits,
+                        w->jobs.waited_overfill);
   mysql_cond_signal(&w->jobs_cond);  // famous last goodbye
 
   mysql_mutex_unlock(&w->jobs_lock);
@@ -3935,7 +3926,8 @@ bool mts_recovery_groups(Relay_log_info 
     Slave_worker *worker=
       Rpl_info_factory::create_worker(opt_worker_repository_id, id, rli);
     worker->init_info();
-    LOG_POS_COORD w_last= {worker->group_master_log_name, worker->group_master_log_pos};
+    LOG_POS_COORD w_last= { const_cast<char*>(worker->get_group_master_log_name()),
+                            worker->get_group_master_log_pos() };
     if (mts_event_coord_cmp(&w_last, &cp) > 0)
     {
       /*
@@ -3990,16 +3982,17 @@ bool mts_recovery_groups(Relay_log_info 
   {
     Slave_worker *w= ((Slave_job_group *)
                       dynamic_array_ptr(&above_lwm_jobs, it_job))->worker;
-    LOG_POS_COORD w_last= { w->group_master_log_name, w->group_master_log_pos };
+    LOG_POS_COORD w_last= { const_cast<char*>(w->get_group_master_log_name()),
+                            w->get_group_master_log_pos() };
 
     sql_print_information("Recoverying relay log info based on Worker-Id %lu, "
                           "group_relay_log_name %s, group_relay_log_pos %lu "
                           "group_master_log_name %s, group_master_log_pos %lu",
                           w->id,
-                          w->group_relay_log_name,
-                          (ulong) w->group_relay_log_pos,
-                          w->group_master_log_name,
-                          (ulong) w->group_master_log_pos);
+                          w->get_group_relay_log_name(),
+                          (ulong) w->get_group_relay_log_pos(),
+                          w->get_group_master_log_name(),
+                          (ulong) w->get_group_master_log_pos());
 
     recovery_group_cnt= 0;
     not_reached_commit= true;
@@ -4052,14 +4045,14 @@ bool mts_recovery_groups(Relay_log_info 
 
           sql_print_information("Group Recoverying relay log info "
                                 "group_master_log_name %s, event_master_log_pos %llu.",
-                                rli->group_master_log_name,
+                                rli->get_group_master_log_name(),
                                 ev->log_pos);
           if ((ret= mts_event_coord_cmp(&ev_coord, &w_last)) == 0)
           {
 #ifndef DBUG_OFF
             for (uint i= 0; i <= w->checkpoint_seqno; i++)
             {
-              if (bitmap_is_set(&w->group_execed, i))
+              if (bitmap_is_set(&w->group_executed, i))
                 DBUG_PRINT("mts", ("Bit %u is set.", i));
               else
                 DBUG_PRINT("mts", ("Bit %u is not set.", i));
@@ -4073,7 +4066,7 @@ bool mts_recovery_groups(Relay_log_info 
             for (uint i= (w->checkpoint_seqno + 1) - recovery_group_cnt,
                  j= 0; i <= w->checkpoint_seqno; i++, j++)
             {
-              if (bitmap_is_set(&w->group_execed, i))
+              if (bitmap_is_set(&w->group_executed, i))
               {
                 DBUG_PRINT("mts", ("Setting bit %u.", j));
                 bitmap_fast_test_and_set(groups, j);
@@ -4119,8 +4112,8 @@ err:
 }
 
 /**
-   Processing rli->gaq to find out the low-water-mark coordinates
-   stored into the cental recovery table.
+   Processing rli->gaq to find out the low-water-mark (lwm) coordinates,
+   which are stored into the central recovery table.
 
    @param rli    pointer to Relay-log-info of Coordinator
    @param period period of processing GAQ, normally derived from 
@@ -4164,12 +4157,6 @@ bool mts_checkpoint_routine(Relay_log_in
     DBUG_RETURN(FALSE);
   }
 
-  /*
-    This checks how many consecutive jobs where processed.
-    If this value is different than zero the checkpoint
-    routine can proceed. Otherwise, there is nothing to be
-    done.
-  */      
   do
   {
     cnt= rli->gaq->move_queue_head(&rli->workers);
@@ -4181,6 +4168,12 @@ bool mts_checkpoint_routine(Relay_log_in
   } while (cnt == 0 && (rli->gaq->full()  || force) &&
            !DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) &&
            (my_sleep(rli->mts_coordinator_basic_nap), 1));
+  /*
+    This checks how many consecutive jobs were processed.
+    If this value is different from zero the checkpoint
+    routine can proceed. Otherwise, there is nothing to be
+    done.
+  */      
   if (cnt == 0)
     goto end;
 
@@ -4200,16 +4193,11 @@ bool mts_checkpoint_routine(Relay_log_in
     mysql_mutex_lock(&rli->data_lock);
 
   /*
-    Coordinator::commit_positions() {
+    "Coordinator::commit_positions" {
 
-    rli->gaq->lwm contains all but rli->group_master_log_name
-
-    group_master_log_name is updated only by Coordinator and it can't change
-    within checkpoint interval because Coordinator flushes the updated value
-    at once.
-    Note, unlike group_master_log_name, event_relay_log_pos is updated solely 
-    within Coordinator read loop context. Hence, it's possible at times 
-    event_rlp > group_rlp.
+    rli->gaq->lwm has been updated in move_queue_head() and
+    now contains all but rli->group_master_log_name, which
+    is altered solely by the Coordinator at special checkpoints.
   */
   rli->set_group_master_log_pos(rli->gaq->lwm.group_master_log_pos);
   rli->set_group_relay_log_pos(rli->gaq->lwm.group_relay_log_pos);
@@ -4244,6 +4232,8 @@ bool mts_checkpoint_routine(Relay_log_in
   */
   rli->reset_notified_checkpoint(cnt, rli->gaq->lwm.ts);
 
+  /* end of "Coordinator::commit_positions" */
+
 end:
 #ifndef DBUG_OFF
   if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0))
@@ -4255,17 +4245,18 @@ end:
 }
 
 /**
-   A single Worker thread is forked out.
+   Instantiation of a Slave_worker and forking out a single Worker thread.
    
+   @param  rli  Coordinator's Relay_log_info pointer
+   @param  i    identifier of the Worker
+
    @return 0 suppress or 1 if fails
 */
 int slave_start_single_worker(Relay_log_info *rli, ulong i)
 {
   int error= 0;
-  uint k;
   pthread_t th;
   Slave_worker *w= NULL;
-  Slave_job_item empty= {NULL};
 
   if (!(w=
       Rpl_info_factory::create_worker(opt_worker_repository_id, i, rli)))
@@ -4275,52 +4266,14 @@ int slave_start_single_worker(Relay_log_
     goto err;
   }
 
-  w->c_rli= rli;
-  w->tables_to_lock= NULL;
-  w->tables_to_lock_count= 0;
-
-  if (w->init_info())
+  if (w->init_worker(rli, i))
   {
     sql_print_error("Failed during slave worker thread create");
     error= 1;
     goto err;
   }
-  
-  w->curr_group_exec_parts.elements= 0;
-  w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
-  w->checkpoint_notified= FALSE;
-  w->bitmap_shifted= 0;
-  w->workers= rli->workers; // shallow copying is sufficient
-  w->this_worker= w;
-  w->wq_size_waits= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
-  w->id= i;
-  w->current_table= NULL;
-  w->usage_partition= 0;
-  w->last_group_done_index= rli->gaq->size; // out of range
-
-  w->jobs.avail= 0;
-  w->jobs.len= 0;
-  w->jobs.overfill= FALSE;    //  todo: move into Slave_jobs_queue constructor
-  w->jobs.waited_overfill= 0;
-  w->jobs.enter= w->jobs.size= rli->mts_slave_worker_queue_len_max;
-  my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.size, 0);
-  for (k= 0; k < w->jobs.size; k++)
-    insert_dynamic(&w->jobs.Q, (uchar*) &empty);
-  
-  DBUG_ASSERT(w->jobs.Q.elements == w->jobs.size);
-  
-  w->wq_overrun_set= FALSE;
   set_dynamic(&rli->workers, (uchar*) &w, i);
-#ifdef HAVE_PSI_INTERFACE
-  mysql_mutex_init(key_mutex_slave_parallel_worker[i], &w->jobs_lock,
-                   MY_MUTEX_INIT_FAST);
-  mysql_cond_init(key_cond_slave_parallel_worker[i], &w->jobs_cond, NULL);
-#else
-  mysql_mutex_init(NULL, &w->jobs_lock, MY_MUTEX_INIT_FAST);
-  mysql_cond_init(NULL, &w->jobs_cond, NULL);
-#endif
 
-  w->curr_group_seen_begin= FALSE;
   if (pthread_create(&th, &connection_attrib, handle_slave_worker,
                      (void*) w))
   {
@@ -4340,7 +4293,14 @@ err:
   return error;
 }
 
+/**
+   Initialization of the central rli members for Coordinator's role,
+   communication channels such as Assigned Partition Hash (APH),
+   and starting the Worker pool.
 
+   @return 0         success
+           non-zero  on failure
+*/
 int slave_start_workers(Relay_log_info *rli, ulong n)
 {
   uint i;
@@ -4354,7 +4314,8 @@ int slave_start_workers(Relay_log_info *
   rli->init_workers(n);
 
   // CGAP dynarray holds id:s of partitions of the Current being executed Group
-  my_init_dynamic_array(&rli->curr_group_assigned_parts, sizeof(db_worker_hash_entry*),
+  my_init_dynamic_array(&rli->curr_group_assigned_parts,
+                        sizeof(db_worker_hash_entry*),
                         SLAVE_INIT_DBS_IN_GROUP, 1);
   rli->last_assigned_worker= NULL;     // associated with curr_group_assigned
   my_init_dynamic_array(&rli->curr_group_da, sizeof(Log_event*), 8, 2);
@@ -4364,13 +4325,13 @@ int slave_start_workers(Relay_log_info *
   /* 
      GAQ  queue holds seqno:s of scheduled groups. C polls workers in 
      @c lwm_checkpoint_period to update GAQ (see @c next_event())
-     The length of GAQ is derived from @c opt_mts_slave_worker_queue_len_max to guarantee
-     each assigned job being sent to a WQ will find room in GAQ.
+     The length of GAQ is derived from @c opt_mts_slave_worker_queue_len_max
+     to guarantee each assigned job being sent to a WQ will find room in GAQ.
      mts_slave_worker_queue_len_max * num-of-W:s is the max length case 
      all jobs contain one event.
   */
 
-  // size of WQ stays fixed in one slave session
+  // length of WQ is actually constant though can be made configurable
   rli->mts_slave_worker_queue_len_max= mts_slave_worker_queue_len_max;
   rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(),
                                       sizeof(Slave_job_group),
@@ -4387,7 +4348,6 @@ int slave_start_workers(Relay_log_info *
   rli->mts_wq_oversize= FALSE;
   rli->mts_coordinator_basic_nap= mts_coordinator_basic_nap;
   rli->mts_worker_underrun_level= mts_worker_underrun_level;
-  rli->mts_total_groups= 0;
   rli->curr_group_seen_begin= FALSE;
   rli->curr_group_isolated= FALSE;
   rli->checkpoint_seqno= 0;
@@ -4526,7 +4486,7 @@ void slave_stop_workers(Relay_log_info *
                         "waited due a Worker queue full = %lu "
                         "waited due the total size = %lu "
                         "sleept when Workers occupied = %lu ",
-                        rli->stmt_jobs, rli->mts_wq_overrun_cnt,
+                        rli->mts_events_assigned, rli->mts_wq_overrun_cnt,
                         rli->mts_wq_overfill_cnt, rli->wq_size_waits,
                         rli->mts_wq_no_underrun_cnt);
 
@@ -4586,7 +4546,6 @@ pthread_handler_t handle_slave_sql(void 
   rli->slave_running = 1;
 
   pthread_detach_this_thread();
-
   if (init_slave_thread(thd, SLAVE_THD_SQL))
   {
     /*
@@ -4876,7 +4835,7 @@ llstr(rli->get_group_master_log_pos(), l
   thd->reset_query();
   thd->reset_db(NULL, 0);
 
-  slave_stop_workers(rli); // mts-II: stopping the worker pool
+  slave_stop_workers(rli);
 
   THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit);
   mysql_mutex_lock(&rli->run_lock);
@@ -7222,7 +7181,7 @@ bool change_master(THD* thd, Master_info
     in-memory value at restart (thus causing errors, as the old relay log does
     not exist anymore).
 
-    Notice that there are not writes to the rli table as slave is not
+    Notice that the rli table is available exclusively as the slave is not
     running.
   */
   DBUG_ASSERT(!mi->rli->slave_running);

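To make the rule in the mts_checkpoint_routine() hunks above concrete: the Coordinator advances its group coordinates only to the low-water-mark produced by gaq->move_queue_head(), i.e. to the last group at the head of the GAQ that a Worker has marked done; a completed group sitting behind an uncompleted one does not move the mark. Below is a simplified standalone sketch of that rule; toy_group and the std::deque are assumptions, not the server's Slave_committed_queue.

#include <cstdio>
#include <deque>

struct toy_group { unsigned long master_log_pos; bool done; };

// collects consecutive done groups from the head; 'lwm' receives the last one
static unsigned long move_queue_head(std::deque<toy_group> &gaq, toy_group *lwm)
{
  unsigned long cnt= 0;
  while (!gaq.empty() && gaq.front().done)
  {
    *lwm= gaq.front();
    gaq.pop_front();
    cnt++;
  }
  return cnt;
}

int main()
{
  std::deque<toy_group> gaq=
    { {100, true}, {200, true}, {300, false}, {400, true} };
  toy_group lwm= {0, false};

  unsigned long cnt= move_queue_head(gaq, &lwm);
  // prints: collected 2 groups, lwm master pos 200
  printf("collected %lu groups, lwm master pos %lu\n", cnt, lwm.master_log_pos);
  return 0;
}

The group at master position 400 is done, yet the checkpoint can only publish position 200 because the group at 300 is still in flight.
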
=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2011-06-29 07:04:19 +0000
+++ b/sql/sql_class.h	2011-07-01 13:41:35 +0000
@@ -1945,7 +1945,8 @@ public:
   /*
     MTS: accessor to binlog_accessed_db_names list
   */
-  List<char> * get_binlog_accessed_db_names() {
+  List<char> * get_binlog_accessed_db_names()
+  {
     return binlog_accessed_db_names;
   }
 

No bundle (reason: useless for push emails).