List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:May 30 2011 10:05am
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3280) WL#5569
WL#5754
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3280 Andrei Elkin	2011-05-30
      WL#5569 MTS
      WL#5754 Query event parallel applying
      
      -----------------------------------------------------------------
      Aggregating 7 commits that are not pushed yet to the wl5569 repo.
      Find comments for each cset below.
      ------------------------------------------------------------------
      
      The current patch addresses concurrent updating slave_open_temp_tables
      status counter.
      The former declaration of the underlying server variable is changed from
      ulong to int32. While that might affect (shrink) the actual range, there has been
      no specified range and now after the number of bits is the same on all platforms
      the range cat be set to be
      
      [0, max(int32)]
      
      ******
      wl#5569 MTS
      Wl#5754 Query event parallel appying
      wl#5599 MTS recovery
      
      The patch includes
      some cleanup, including one for temp tables support, realization of few todo:s.
      
      ******
      wl#5569 MTS
      wl#5754 Query event parallel applying
      
      More cleanup is done;
      Fixing temp tables manipulation.
      Asserting an impossible to support use case of group of events
      not wrapped with BEGIN/COMMIT.
      
      Todo: recognize old master binlog to refuse to run in parallel.
      
      ******
      wl#5569 MTS
      
      Implementation of giving out the applier role to Worker for all cases but
      ones dealing with the Coordinators state.
      That includes Query event with over-max-db:s and Load-data related events.
      The current patch also makes old master binlog be handled by MTS though
      sometimes e.g for Query event to switch to the sequential mode.
      
      Fixing a race condition making C to wait endlessly if a Worker has exitted due to its
      applying error.
      
      ******
      wl#5569 MTS
      
      correcting an assert that was used to fire as warned in the previous commit.
      Parallel feature tests pass now.
      ******
      wl#5569 MTS
      
      This patch contains cleanup and simplification of logics of handling some events 
      sequentially by Coordinator and adds memory-allocation failure branch 
      to workers starting routine.
      ******
      wl#5569 MTS
      
      An intermediate patch to address few issues raised by reviewers.
      To sum up, it's about cleanup and logics simplification of event distribution
      to Worker and consequent actions.
      Some efforts were paid to support Old Master Begin-less group of events.
     @ mysql-test/extra/rpl_tests/rpl_parallel_load_innodb.test
        Elaborated version of rpl_parallel_load generator still narrowed down to
        test performance with Innodb.
     @ mysql-test/suite/rpl/r/rpl_parallel_ddl.result
        results updated.
     @ mysql-test/suite/rpl/r/rpl_parallel_multi_db.result
        results updated.
     @ mysql-test/suite/rpl/r/rpl_parallel_temp_query.result
        results got updated.
        ******
        results updated.
     @ mysql-test/suite/rpl/t/disabled.def
        Disabling few tests that triggers the assert installed in log_event.cc of this commit.
        
        ******
        Restoring tree tests as this patch makes them runable.
     @ mysql-test/suite/rpl/t/rpl_deadlock_innodb.test
        test can't run in MTS because of trans retry.
     @ mysql-test/suite/rpl/t/rpl_dual_pos_advance.test
        test can't run in MTS because of Until option of START SLAVE is not yet supported by  MTS.
     @ mysql-test/suite/rpl/t/rpl_parallel_ddl-slave.opt
        rpl_parallel tests need --slave-transaction-retries=0
     @ mysql-test/suite/rpl/t/rpl_parallel_innodb-master.opt
        new test opt file is added.
     @ mysql-test/suite/rpl/t/rpl_parallel_innodb-slave.opt
        new test opt file is added.
        ******
        rpl_parallel tests need --slave-transaction-retries=0
     @ mysql-test/suite/rpl/t/rpl_parallel_innodb.test
        Elaborated version of rpl_parallel narrowed down to
        test performance with Innodb.
     @ mysql-test/suite/rpl/t/rpl_parallel_multi_db-slave.opt
        rpl_parallel tests need --slave-transaction-retries=0
     @ mysql-test/suite/rpl/t/rpl_parallel_temp_query-slave.opt
        rpl_parallel tests need --slave-transaction-retries=0
     @ mysql-test/suite/rpl/t/rpl_parallel_temp_query.test
        Adding logics to watch Slave_open_temp_tables in face of its concurrent updating.
     @ sql/event_parse_data.cc
        Pleasing some tests.
     @ sql/field.cc
        Restoring asserts that were before changes to sql_base.cc.
        
        ******
        Old master binlog events can't be run in parallel 
        for few reasons. Therefore that paticular branch of code
        is irrelevant for MTS.
     @ sql/handler.cc
        Restoring assert that were before changes to sql_base.cc.
     @ sql/log_event.cc
        cleanup, incl restoration of the trunk version of some pieces of code.
        passing future_event_relay_log_pos to Worker to stike out a todo in rpl_rli.cc.
        
        ******
        Asserting a not-implemented support of group of events not braced
        with BEGIN/COMMIT(Xid).
        Such groups are possible in stored routine logging and when an old
        server binlog file is adopted by MTS-aware slave.
        
        ******
        Making a group of event w/o B/C braces be handled by Worker.
        Such group can happen from an old master or the current master bilogging 
        some SP queries.
        Also over-max db:s events are made to be handled by Worker.
        Coordinator only handles asyncrounously events dealing with Relay-log state
        and synchrounously events dealing with checkpoint changes (master-group coordinates).
        Also few types of events from OM are left to Coordinator to execute.
        
        
        ******
        Cleanup and simplification of logics of handling some events sequentially
        by Coordinator.
        An event is marked as parallel or sequential through C's rli that affects
        commit to info table by C as well as the event's destruction.
        
        ******
        Cleanup and logics simplification in Log_event::get_slave_worker_id(),
        Log_event::apply_event().
        The essense is:
        a. to return back to apply_event_and_update_pos() event
           associated either with the single-threaded
           sql-thread rli, or one of Coord or Worker.
        b. while the beginning of a group and corresponding actions are left
           to Log_event::get_slave_worker_id(),
           other actions including passing the event to a Worker and the final closure
           of the current group is moved into apply_event_and_update_pos().
        
        Correcting Query_log_event::at-,de-tach_temp_tables() to expect the magic "-empty
        string name db partition through which the applier thread receives temp tables.
     @ sql/log_event.h
        Leaving in mts_sequential_exec() only events that either
        can deal with Coordinator state, or are from old master.
        Making  Query_log_event::mts_get_dbs  to return a list with 
        a magic ""-empty string partition name
        in case of over-max db:s query.
        The empty magic is converted into a record to APH to indicate
        the whole hash records lock.
        ******
        More members are added to Log_event 
        a. to associate the event with applier.
        b. to provide marking a B-less group of events (old master, select sf()).
     @ sql/mysqld.cc
        Turning slave_open_temp_tables from ulong to int32 and adding
        atomic locks declaration for the counter updating.
     @ sql/mysqld.h
        Extern-lizing slave_open_temp_tables_lock;
     @ sql/rpl_rli.cc
        Initializing/destorying slave_open_temp_tables lock at the same time
        with Workers.
        ******
        passing future_event_relay_log_pos is done via an assignment to 
        Worker's member in slave_worker_exec_job().
     @ sql/rpl_rli.h
        restoring the original version of get_table_data() though
         no real changes.
        
        ******
        simplified (curr_group_is_parallel + curr_group_split) into curr_*event*_is_parallel.
        ******
        Removing rli members that aren't necessary any longer.
     @ sql/rpl_rli_pdb.cc
        cleanup.
        ******
        Removing redundant my_hash_update;
        cleanup;
        Fixing temp tables related issue of leaving wait_for_worker
        without all entries of APH given out their temp tables.
        ******
        Changes due to redifining an object responsible to hold assigned partitions
        in few methods incl  Slave_worker::slave_worker_ends_group().
        Some cleanup in get_slave_worker().
        ******
        cleanup, a new assert, and init of an debug-related member.
     @ sql/rpl_rli_pdb.h
        Redifining an object responsible to hold assigned partitions.
        Now it's a Dyn-array holding *pointers* to records on Assigned Partition Hash.
        That simplifies few routines for Worker. E.g search for the records (entries of APH) by Worker at time
        of committing.
        
        ******
        Adding GAQ memory-allocation failure notification.
        ******
        Memorizing last deleted event for debugging purpose.
     @ sql/rpl_slave.cc
        Adding info message to the error log;
        improving comments.
        ******
        Restoring original sequential mode version of assert
        in sql_slave_killed.
        Worker is not supposed to run this function.
        Testing of skipping logics is left to the rpl suite
         be run in the parallel mode.
        Cleanup.
        Marking recovery related todo items explicitly.
        Setting up guards to guarantee sequential mode in requested
          points of the code.
        
        ******
        Streamlining Workers state identification with a boolean running_status;
        worker start and stop are controlled by means of the disignator.
        
        ******
        simplified (curr_group_is_parallel + curr_group_split) into curr_event_is_parallel;
        GAQ memory-allocation failure branch is added to workers starting routine.
        ******
        Cleanup and,
        moving append_item_to_jobs() invocation into  apply_event_and_update_pos()
        as well as other actions mentioned in log_event.cc comments;
        changing signature of apply_event_and_update_pos() to return NULL in place of
        referrenced pointer in case the event is handed over to a Worker;
        checking of the pointer value is done in places dealing with update-pos and
        event's destruction.
     @ sql/sql_base.cc
        Replacing slave opened temp tables counter incr/decr with a function
        perfoming atomic locking in case Worker runs it.
        ******
        removing unnecessary return value in incr_slave_open_temp_tables def.
        ******
        Func is renamed.
        Removing all traces of previous idea to return value out of
        modify_slave_open_temp_tables.
     @ sql/sql_parse.cc
        cleanup.

    added:
      mysql-test/extra/rpl_tests/rpl_parallel_load_innodb.test
      mysql-test/suite/rpl/t/rpl_parallel_ddl-slave.opt
      mysql-test/suite/rpl/t/rpl_parallel_innodb-master.opt
      mysql-test/suite/rpl/t/rpl_parallel_innodb-slave.opt
      mysql-test/suite/rpl/t/rpl_parallel_innodb.test
      mysql-test/suite/rpl/t/rpl_parallel_temp_query-slave.opt
    modified:
      mysql-test/suite/rpl/r/rpl_parallel_ddl.result
      mysql-test/suite/rpl/r/rpl_parallel_multi_db.result
      mysql-test/suite/rpl/r/rpl_parallel_temp_query.result
      mysql-test/suite/rpl/t/disabled.def
      mysql-test/suite/rpl/t/rpl_deadlock_innodb.test
      mysql-test/suite/rpl/t/rpl_dual_pos_advance.test
      mysql-test/suite/rpl/t/rpl_parallel_multi_db-slave.opt
      mysql-test/suite/rpl/t/rpl_parallel_temp_query.test
      sql/event_parse_data.cc
      sql/field.cc
      sql/handler.cc
      sql/log_event.cc
      sql/log_event.h
      sql/mysqld.cc
      sql/mysqld.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/sql_base.cc
      sql/sql_parse.cc
=== added file 'mysql-test/extra/rpl_tests/rpl_parallel_load_innodb.test'
--- a/mysql-test/extra/rpl_tests/rpl_parallel_load_innodb.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/extra/rpl_tests/rpl_parallel_load_innodb.test	2011-05-30 10:05:07 +0000
@@ -0,0 +1,333 @@
+#
+# This is a load generator to call from rpl_parallel and rpl_sequential tests
+
+#
+#
+# load volume parameter
+#
+
+let $iter = 20;
+let $init_rows= 50;
+
+#
+# Distribution of queries within an iteration:
+# legends:
+# auto = auto_increment=1, trans = inside BEGIN-COMMIT, 
+# del = Delete, ins =- Insert, upd = Update
+#
+let $ins_auto_wk= 1;
+let $ins_auto_nk= 1;
+let $ins_trans_wk= 1;
+let $ins_trans_nk= 1;
+let $upd_trans_nk= 0;
+let $upd_trans_wk= 1;
+let $del_trans_nk= 0;
+let $del_trans_wk= 1;
+
+# windows run on PB2 is too slow to time out
+disable_query_log;
+if (`select convert(@@version_compile_os using latin1) IN ("Win32","Win64","Windows") as "TRUE"`)
+{
+  let $iter = 100;
+}
+enable_query_log;
+
+let $databases = 16;
+
+connection slave;
+
+call mtr.add_suppression('Slave: Error dropping database'); ## todo: fix
+
+source include/stop_slave.inc;
+start slave;
+
+connection master;
+
+--disable_query_log
+--disable_result_log
+
+
+let $i = $databases + 1;
+while($i)
+{
+  let $i1 = $i;
+  dec $i1;
+  
+  eval create database test$i1;
+  eval use test$i1;
+  create table ti_nk (a int, b int, c text) engine=innodb;
+  create table ti_wk (a int auto_increment primary key, b int, c text) engine=innodb;
+  let $l1= $init_rows;
+  while($l1)
+  {
+       	eval insert into ti_nk values($l1, $i1, uuid());
+	dec $l1;
+  }
+  
+  if (`select @@global.binlog_format like 'statement'`)
+  {
+     create view v_ti_nk as select a,b from ti_nk;
+     create view v_ti_wk as select a,b from ti_wk;
+  }
+  
+  if (`select @@global.binlog_format not like 'statement'`)
+  {
+     create view v_ti_nk as select a,b,c from ti_nk;
+     create view v_ti_wk as select a,b,c from ti_wk;
+  }
+
+  # this table is special - just for timing. It's more special on test0 db
+  # where it contains master timing of the load as well.
+  create table benchmark (state text) engine=myisam; # timestamp keep on the slave side
+
+  dec $i;
+}
+
+--enable_result_log
+--enable_query_log
+
+
+sync_slave_with_master;
+#connection slave;
+
+--disable_query_log
+--disable_result_log
+
+let $i = $databases + 1;
+while($i)
+{
+  let $i1 = $i;
+  dec $i1;
+  eval use test$i1;
+  alter table benchmark add ts timestamp not null default current_timestamp;
+
+  dec $i;
+}
+--enable_result_log
+--enable_query_log
+
+
+# not gather events into relay log w/o executing yet
+stop slave sql_thread;
+
+##call p1(1);
+
+connection master;
+
+--disable_query_log
+--disable_result_log
+
+#
+# Load producer
+#
+
+# initial timestamp to record
+
+# the extra ts col on slave is effective only with the STMT format (todo: bug-report)
+set @save.binlog_format= @@session.binlog_format;
+set @@session.binlog_format=STATEMENT;
+let $i = $databases + 1;
+while($i)
+{
+  let $i1 = $i;
+  dec $i1;
+  eval use test$i1;
+
+  insert into benchmark set state='slave takes on load';
+
+  dec $i;
+}
+set @@session.binlog_format= @save.binlog_format;
+
+connection slave;
+
+use test0;
+insert into benchmark set state='master started load';
+
+
+connection master;
+
+while ($iter)
+{
+    let $i = $databases + 1;
+
+    while ($i)
+    {
+	let $i1 = $i;
+	dec $i1;
+
+	eval use test$i1;
+
+        let $ins= $ins_auto_nk;
+	while ($ins)
+	{
+	    eval insert into ti_nk values($iter, $i1, uuid());
+	    dec $ins;
+	}
+
+        let $ins= $ins_auto_wk;
+	while ($ins)
+	{
+	    eval insert into ti_wk values(null,  $i1, uuid());
+	    dec $ins;
+	}
+
+	begin;
+
+        let $ins= $ins_trans_nk;
+	while ($ins)
+	{
+	    eval insert into ti_nk values($iter, $i1, uuid());
+	    dec $ins;
+	}
+
+        let $ins= $ins_trans_wk;
+	while ($ins)
+	{
+	    eval insert into ti_wk values(null,  $i1, repeat('a', round(rand()*10)));
+	    dec $ins;
+	}
+
+	let $min=`select min(a) from ti_nk`;
+        let $del= $del_trans_nk;
+	while ($del)
+	{
+	    eval delete from ti_nk where a= $min + $del;
+	    dec $del;
+	}
+
+	let $min=`select min(a) from ti_nk`;
+        let $del= $del_trans_wk;
+	while ($del)
+	{
+	    eval delete from ti_wk where a= $min + $del;
+	    dec $del;
+	}
+
+        let $upd= $upd_trans_nk;
+	while ($upd)
+	{
+	    update ti_nk set c= uuid();
+	    dec $upd;
+	}
+
+        let $upd= $upd_trans_wk;
+	while ($upd)
+	{
+	    update ti_wk set c= uuid();
+	    dec $upd;
+	}
+
+	commit;
+	dec $i;
+    }
+
+    dec $iter;
+}
+
+connection slave;
+
+use test0;
+insert into benchmark set state='master ends load';
+
+connection master;
+
+# terminal timestamp to record
+
+let $i = $databases + 1;
+set @save.binlog_format= @@session.binlog_format;
+set @@session.binlog_format=STATEMENT;
+while($i)
+{
+  let $i1 = $i;
+  dec $i1;
+  eval use test$i1;
+
+  insert into benchmark set state='slave is supposed to finish with load';
+
+  dec $i;
+}
+set @@session.binlog_format= @save.binlog_format;
+
+--enable_result_log
+--enable_query_log
+
+connection slave;
+
+## todo: record start and end time of appying to compare times of
+#  parallel and sequential execution.
+
+--disable_query_log
+--disable_result_log
+
+insert into test0.benchmark set state='slave is processing load';
+
+# To force filling timestamp cols with the slave local clock values
+# to implement benchmarking.
+
+set @save.mts_exp_slave_local_timestamp=@@global.mts_exp_slave_local_timestamp;
+set @@global.mts_exp_slave_local_timestamp=1;
+start slave sql_thread;
+
+let $wait_timeout= 600;
+let $wait_condition= SELECT count(*)+sleep(1) = 5 FROM test0.benchmark;
+source include/wait_condition.inc;
+
+use test0;
+insert into benchmark set state='slave ends load';
+
+use test;
+select * from test0.benchmark into outfile 'benchmark.out';
+select ts from test0.benchmark where state like 'master started load' into @m_0;
+select ts from test0.benchmark where state like 'master ends load' into @m_1;
+select ts from test0.benchmark where state like 'slave takes on load' into @s_m0;
+select ts from test0.benchmark where state like 'slave is supposed to finish with load' into @s_m1;
+
+select ts from test0.benchmark where state like 'slave ends load' into @s_1;
+select ts from test0.benchmark where state like 'slave is processing load' into @s_0;
+select time_to_sec(@m_1) - time_to_sec(@m_0) as 'delta_m', 
+       time_to_sec(@s_1) - time_to_sec(@s_0) as 'delta_s',
+       time_to_sec(@s_m1) - time_to_sec(@s_m0) as 'delta_sm'  into outfile 'delta.out';
+
+
+let $i = $databases + 1;
+while($i)
+{
+  let $i1 = $i;
+  dec $i1;
+
+  let $diff_tables=master:test$i1.v_ti_nk, slave:test$i1.v_ti_nk;
+  source include/diff_tables.inc;
+
+  let $diff_tables=master:test$i1.v_ti_wk, slave:test$i1.v_ti_wk;
+  source include/diff_tables.inc;
+
+  dec $i;
+}
+--enable_result_log
+--enable_query_log
+
+
+connection master;
+
+--disable_query_log
+--disable_result_log
+
+let $i = $databases + 1;
+while($i)
+{
+  let $i1 = $i;
+  dec $i1;
+
+  eval drop database test$i1;
+  dec $i;
+}
+
+--enable_result_log
+--enable_query_log
+
+sync_slave_with_master;
+#connection slave;
+set @@global.mts_exp_slave_local_timestamp= @save.mts_exp_slave_local_timestamp;
+
+# End of the tests

=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_ddl.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_ddl.result	2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_ddl.result	2011-05-30 10:05:07 +0000
@@ -4,8 +4,6 @@ include/stop_slave.inc
 set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
 set @@global.mts_slave_parallel_workers= 4;
 include/start_slave.inc
-Warnings:
-Note	1726	Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
 include/diff_tables.inc [master:d32.t8, slave:d32.t8]
 include/diff_tables.inc [master:d32.t7, slave:d32.t7]
 include/diff_tables.inc [master:d32.t6, slave:d32.t6]

=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_multi_db.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_multi_db.result	2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_multi_db.result	2011-05-30 10:05:07 +0000
@@ -4,8 +4,6 @@ include/stop_slave.inc
 set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
 set @@global.mts_slave_parallel_workers= 4;
 include/start_slave.inc
-Warnings:
-Note	1726	Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
 create database d8;
 create table d8.t8 (a int);
 select round(rand()*8) into @var;

=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_temp_query.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_temp_query.result	2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_temp_query.result	2011-05-30 10:05:07 +0000
@@ -1,12 +1,11 @@
 include/master-slave.inc
 [connection master]
 call mtr.add_suppression('Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement accesses nontransactional table as well as transactional or temporary table.*');
+flush status;
 include/stop_slave.inc
 set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
 set @@global.mts_slave_parallel_workers= 4;
 include/start_slave.inc
-Warnings:
-Note	1726	Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
 create database d2;
 use d2;
 create table d2.t1 (a int auto_increment primary key, b int) engine=innodb;

=== modified file 'mysql-test/suite/rpl/t/disabled.def'
--- a/mysql-test/suite/rpl/t/disabled.def	2011-01-11 23:01:02 +0000
+++ b/mysql-test/suite/rpl/t/disabled.def	2011-05-30 10:05:07 +0000
@@ -15,3 +15,4 @@ rpl_spec_variables        : BUG#47661 20
 rpl_row_event_max_size    : Bug#55675 2010-10-25 andrei mysql_binlog_send attempts to read events partly
 rpl_delayed_slave         : Bug#57514 2010-11-09 andrei rpl_delayed_slave fails sporadically in pb
 rpl_log_pos               : BUG#55675 2010-09-10 alfranio rpl.rpl_log_pos fails sporadically with error binlog truncated in the middle
+

=== modified file 'mysql-test/suite/rpl/t/rpl_deadlock_innodb.test'
--- a/mysql-test/suite/rpl/t/rpl_deadlock_innodb.test	2010-12-19 17:07:28 +0000
+++ b/mysql-test/suite/rpl/t/rpl_deadlock_innodb.test	2011-05-30 10:05:07 +0000
@@ -2,3 +2,5 @@
 -- source include/have_innodb.inc
 let $engine_type=innodb;
 -- source extra/rpl_tests/rpl_deadlock.test
+# --slave-transaction-retries=0 in MTS
+-- source include/not_mts_slave_parallel_workers.inc

=== modified file 'mysql-test/suite/rpl/t/rpl_dual_pos_advance.test'
--- a/mysql-test/suite/rpl/t/rpl_dual_pos_advance.test	2010-12-19 17:22:30 +0000
+++ b/mysql-test/suite/rpl/t/rpl_dual_pos_advance.test	2011-05-30 10:05:07 +0000
@@ -7,6 +7,8 @@
 # It also will test BUG#13861.
 
 source include/have_innodb.inc;
+# Until option of START SLAVE is not yet supported by  MTS
+source include/not_mts_slave_parallel_workers.inc;
 
 --let $rpl_topology= 1->2->1
 --source include/rpl_init.inc

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_ddl-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel_ddl-slave.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_ddl-slave.opt	2011-05-30 10:05:07 +0000
@@ -0,0 +1 @@
+--slave-transaction-retries=0

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_innodb-master.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel_innodb-master.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_innodb-master.opt	2011-05-30 10:05:07 +0000
@@ -0,0 +1 @@
+--log-warnings=0

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_innodb-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel_innodb-slave.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_innodb-slave.opt	2011-05-30 10:05:07 +0000
@@ -0,0 +1,5 @@
+--log-warnings=0 --slave-transaction-retries=0 --innodb_flush_log_at_trx_commit=0  --skip-log-bin --skip-log-slave-updates --sync_binlog=0
+
+
+
+

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_innodb.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_innodb.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_innodb.test	2011-05-30 10:05:07 +0000
@@ -0,0 +1,43 @@
+#
+# WL#5563 Prototype for Parallel Slave with db name partitioning.
+#
+# The test checks correctness of replication and is designed for 
+# benchmarking and comparision with results of its sequential
+# counterpart rpl_sequential.test.
+# Both tests leave mysqld.2/data/test/delta.out file
+# that contains a row with two columns.
+#  1. the duration (in seconds) of execution on the master
+#  2. the duration of execution on the slave
+#  The 2nd column of the rpl_parallel can be compared with the 2nd of rpl_sequential.test.
+#
+#  The duration recorded in the file accounts the SQL thread/workers work.
+#  That is benchmarking on the slave side is effectively started with
+#  `start slave sql_thread'.
+#  NOTICE, there is set @@global.slave_local_timestamp=1;
+#  
+#
+# of load that rpl_parallel_load.test represents.
+# See there how to tune load and concurrency parameters.
+#
+# Example of usage.
+# To gather a collection of figures:
+#  mysql-test$ export slave;
+#  mysql-test$ slave=parallel; for n in `seq 1 10`; 
+#  do ./mtr  --vardir=/dev/shm/var1 --mtr-build-thread=765 rpl_$slave 
+#             --mysqld=--binlog-format=statement; 
+#     find /dev/shm/var1 -name delta.out -exec cat {} \; | cat >> delta.$slave.log; 
+#  done
+#
+# mysql-test$ slave=sequential; ...
+#
+# In the end there will be mysql-test/delta.{parallel,sequential}.log files.
+#
+
+let $rpl_skip_reset_master_and_slave= 1;
+
+--source include/master-slave.inc
+
+connection master;
+source extra/rpl_tests/rpl_parallel_load_innodb.test;
+
+--source include/rpl_end.inc

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_multi_db-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel_multi_db-slave.opt	2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_multi_db-slave.opt	2011-05-30 10:05:07 +0000
@@ -1 +1,2 @@
---thread_stack=512K
+--thread_stack=512K --slave-transaction-retries=0
+

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_temp_query-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel_temp_query-slave.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_temp_query-slave.opt	2011-05-30 10:05:07 +0000
@@ -0,0 +1 @@
+ --log-warnings=0 --slave-transaction-retries=0

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_temp_query.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_temp_query.test	2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_temp_query.test	2011-05-30 10:05:07 +0000
@@ -14,6 +14,8 @@ let $workers= 4;
 
 connection slave;
 
+flush status; # to nullify Slave_open_temp_tables
+
 # restart in Parallel
 source include/stop_slave.inc;
 set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
@@ -41,7 +43,8 @@ while ($n)
   {
     let $temp_rows= `select round(rand()*$temp_tables) + 1`;
     let $k= $temp_rows;
-    eval create temporary table tt_$i (a int auto_increment primary key);
+    # create makes a table in explicit db
+    eval create temporary table d$n1.tt_$i (a int auto_increment primary key);
     while($k)
     {
 	eval insert into tt_$i values (null);
@@ -74,6 +77,7 @@ while ($n)
   {
     let $temp_rows= `select round(rand()*$temp_tables) + 1`;
     let $k= $temp_rows;
+    # create makes a table in the default db
     eval create temporary table tt_$i (a int auto_increment primary key);
     while($k)
     {
@@ -93,6 +97,14 @@ while ($n)
 
 sync_slave_with_master;
 
+if (`select variable_value - $workers*$temp_tables as must_be_zero from information_schema.global_status where variable_name like 'Slave_open_temp_tables'`)
+{
+   --let $actual_temps= `select variable_value from information_schema.global_status where variable_name like 'Slave_open_temp_tables'`
+   --let $exected= `select  $workers*$temp_tables`
+   --echo *** Wrong value of Slave_open_temp_tables: got $actual_temps, expected $expected ***
+   --die
+}
+
 #
 # Consistency check
 #

=== modified file 'sql/event_parse_data.cc'
--- a/sql/event_parse_data.cc	2010-07-02 02:58:51 +0000
+++ b/sql/event_parse_data.cc	2011-05-30 10:05:07 +0000
@@ -566,6 +566,7 @@ void Event_parse_data::check_originator_
 {
   /* Disable replicated events on slave. */
   if ((thd->system_thread == SYSTEM_THREAD_SLAVE_SQL) ||
+      (thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) ||
       (thd->system_thread == SYSTEM_THREAD_SLAVE_IO))
   {
     DBUG_PRINT("info", ("Invoked object status set to SLAVESIDE_DISABLED."));

=== modified file 'sql/field.cc'
--- a/sql/field.cc	2011-02-27 17:35:25 +0000
+++ b/sql/field.cc	2011-05-30 10:05:07 +0000
@@ -3736,12 +3736,7 @@ longlong Field_long::val_int(void)
   ASSERT_COLUMN_MARKED_FOR_READ;
   int32 j;
   /* See the comment in Field_long::store(long long) */
-  /* 
-     In case the method is executed not by the table's owner
-     that one must be a Slave worker thread.
-  */
-  DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
-
+  DBUG_ASSERT(table->in_use == current_thd);
 #ifdef WORDS_BIGENDIAN
   if (table->s->db_low_byte_first)
     j=sint4korr(ptr);
@@ -6313,8 +6308,8 @@ int Field_string::store(const char *from
   const char *cannot_convert_error_pos;
   const char *from_end_pos;
 
-  /* See the comment for Field_long::store(long long) and Field_long::val_int */
-  DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
+  /* See the comment for Field_long::store(long long) */
+  DBUG_ASSERT(table->in_use == current_thd);
 
   copy_length= well_formed_copy_nchars(field_charset,
                                        (char*) ptr, field_length,
@@ -6463,8 +6458,8 @@ String *Field_string::val_str(String *va
 			      String *val_ptr)
 {
   ASSERT_COLUMN_MARKED_FOR_READ;
-  /* See the comment for Field_long::store(long long) and Field_long::val_int */
-  DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
+  /* See the comment for Field_long::store(long long) */
+  DBUG_ASSERT(table->in_use == current_thd);
   uint length;
   if (table->in_use->variables.sql_mode &
       MODE_PAD_CHAR_TO_FULL_LENGTH)
@@ -6522,7 +6517,7 @@ Field_string::compatible_field_size(uint
 {
 #ifdef HAVE_REPLICATION
   const Check_field_param check_param = { this };
-  if (rpl_master_has_bug(rli_arg, 37426, TRUE,
+  if (!mts_is_worker(rli_arg->info_thd) && rpl_master_has_bug(rli_arg, 37426, TRUE,
                          check_field_for_37426, &check_param))
     return FALSE;                        // Not compatible field sizes
 #endif

=== modified file 'sql/handler.cc'
--- a/sql/handler.cc	2011-02-27 17:35:25 +0000
+++ b/sql/handler.cc	2011-05-30 10:05:07 +0000
@@ -2127,13 +2127,7 @@ void **handler::ha_data(THD *thd) const
 
 THD *handler::ha_thd(void) const
 {
-  /* 
-     About current_thd->slave_thread alternative,
-     MTS coordinator open/closes a temp table while the rest of operation
-     is done by Workers.
-  */
-  DBUG_ASSERT(!table || !table->in_use || table->in_use == current_thd ||
-              current_thd->slave_thread);
+  DBUG_ASSERT(!table || !table->in_use || table->in_use == current_thd);
   return (table && table->in_use) ? table->in_use : current_thd;
 }
 

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-05-24 14:29:35 +0000
+++ b/sql/log_event.cc	2011-05-30 10:05:07 +0000
@@ -2361,7 +2361,8 @@ bool Log_event::contains_partition_info(
     // todo: Query event is limitly supported
     // which ev->get_db() yields the session db not the actual db
       
-    (get_type_code() == QUERY_EVENT && !ends_group() && !starts_group());
+    (get_type_code() == QUERY_EVENT && !ends_group() && !starts_group()) ||
+    (get_type_code() == EXECUTE_LOAD_QUERY_EVENT);
 }
 
 /**
@@ -2370,7 +2371,7 @@ bool Log_event::contains_partition_info(
    At computing the id few rules apply depending on partitioning properties
    that the event instance can feature.
 
-   Let's call the properties.
+   Let's call the properties through the following legends:
 
    B - beginning of a group of events (BEGIN query_log_event)
    g - mini-group representative event containing the partition info
@@ -2378,36 +2379,48 @@ bool Log_event::contains_partition_info(
    p - a mini-group internal event that *p*receeding its g-parent
       (int_, rand_, user_ var:s) 
    r - a mini-group internal "regular" event that follows its g-parent
-      (Write, Update, Delete -rows)
+      (Delete, Update, Write -rows)
    S - sequentially applied event (may not be a part of any group).
        Events of this type are determined via @c mts_sequential_exec()
        earlier and don't cause calling this method .
    T - terminator of the group (XID, COMMIT, ROLLBACK)
 
-   Only `g' case requires to compute the assigned Worker id.
-   In `T, r' cases it's @c last_assigned_worker that is one that was
-   assigned at the last `g' processing.
-   In `B' case it's NULL to indicate the Coordinator will skip doing anything
-   more with the event. Its scheduling gets deffered until the following 
-   `g' event names a Worker.
+   Only `g' case really computes the assigned Worker id which must
+   be memorized by the caller and is available through @c rli argument.
+   For instance DUW-rows events are mapped to a Worker previously chosen
+   at assigning of their Table-map parent g-event.
+   In `B' case the assigned Worker is NULL to indicate the Coordinator will
+   postpone scheduling until a following `g' event decides on a Worker.
    
-   @note `p' and g-Query-log-event is not supported yet.
-
-   @note The function can update APH, CGAP objects.
+   A group can consist of multiple events still without explict B
+   event.  This is a case of old master binlog or few corner-cases of
+   the current master version (todo: to fix).  Such group structure is
+   supposed to be {{p_i},g} that is it ends with the first not p-event.
+   Such g-event is marked with set_mts_event_ends_group().
 
+   @note The function can update APH, CGAP, GAQ objects.
+   
    @return a pointer to the Worker stuct or NULL.
 */
 
 Slave_worker *Log_event::get_slave_worker_id(Relay_log_info *rli)
 {
-  Slave_worker *worker= NULL;
-  Slave_job_group g;
+  Slave_job_group g, *ptr_g;
   bool is_b_event;
+  int  num_dbs= 0;
+  Slave_worker *ret_worker= NULL;
 
-  /* checking properties and perform corresponding actions */
+  /* checking partioning properties and perform corresponding actions */
 
-  // Beginning of a group or a DDL
-  if ((is_b_event= starts_group()) || !rli->curr_group_seen_begin)
+  // Beginning of a group designated explicitly with BEGIN
+  if ((is_b_event= starts_group()) ||
+      // or DDL:s or autocommit queries possibly associated with own p-events
+      (!rli->curr_group_seen_begin &&
+       // the following is a case of no-B group: { p_1,p_2,...,p_k, g}
+       (rli->gaq->empty() ||
+        ((Slave_job_group *)
+         dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))->
+        worker_id != (ulong) -1)))
   {
     ulong gaq_idx;
     rli->mts_total_groups++;
@@ -2427,8 +2440,6 @@ Slave_worker *Log_event::get_slave_worke
 
     // the last occupied GAQ's array index
     gaq_idx= rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
-    // serves as a mark for Coord to delete events otherwise
-    rli->curr_group_is_parallel= TRUE;
     
     DBUG_ASSERT(gaq_idx != (ulong) -1 && gaq_idx < rli->gaq->s);
     DBUG_ASSERT(((Slave_job_group *) 
@@ -2436,6 +2447,7 @@ Slave_worker *Log_event::get_slave_worke
                 group_relay_log_name == NULL);
     DBUG_ASSERT(rli->gaq->assigned_group_index != (ulong) -1); // gaq must have room
     DBUG_ASSERT(rli->last_assigned_worker == NULL);
+
     if (is_b_event)
     {
       Log_event *ptr_curr_ev= this;
@@ -2447,7 +2459,8 @@ Slave_worker *Log_event::get_slave_worke
 
       // mark the current grup as started with B-event
       rli->curr_group_seen_begin= TRUE;
-      return NULL;
+
+      return ret_worker;
     } 
   }
 
@@ -2456,52 +2469,52 @@ Slave_worker *Log_event::get_slave_worke
   if (contains_partition_info())
   {
     int i= 0;
-    int num_dbs= mts_number_dbs();
+    num_dbs= mts_number_dbs();
     List_iterator<char> it(*mts_get_dbs(rli->info_thd->mem_root));
     it++;
 
-    if (num_dbs != OVER_MAX_DBS_IN_EVENT_MTS)
-    {
-      do
-      {
-        char **ref_cur_db= it.ref();
-
-        if (!(rli->last_assigned_worker=
-              get_slave_worker(*ref_cur_db, rli,
-                               &mts_assigned_partitions[i],
-                               get_type_code() == QUERY_EVENT)))
-        {
-          for (uint k= 0; k < rli->curr_group_da.elements; k++)
-          { 
-            delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
-          }
-          return NULL;
-        }
-
-        DBUG_ASSERT(!strcmp(mts_assigned_partitions[i]->db, *ref_cur_db));
-        DBUG_ASSERT(rli->last_assigned_worker ==
-                    mts_assigned_partitions[i]->worker);
-        DBUG_ASSERT(mts_assigned_partitions[i]->usage > 0);
-
-        i++;
-      } while (it++);
-    }
-    else
+    if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
     {
-      // Temporary tables of Coordinator are relocated by Worker
+      // provide a hint - Worker with id 0 - to the following assign
       if (!rli->last_assigned_worker)
         rli->last_assigned_worker=
           *(Slave_worker**) dynamic_array_ptr(&rli->workers, 0);
+      (void) wait_for_workers_to_finish(rli, rli->last_assigned_worker);
     }
-    worker= rli->last_assigned_worker;
 
-    get_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
-    if (g.worker_id == (ulong) -1)  // assign "offically" the current group
+    do
     {
-      g.worker_id= worker->id;
-      set_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
+      char **ref_cur_db= it.ref();
+      
+      if (!(ret_worker=
+            get_slave_worker(*ref_cur_db, rli,
+                             &mts_assigned_partitions[i],
+                             get_type_code() == QUERY_EVENT)))
+      {
+        // destroy buffered events of the current group prior to exit
+        for (uint k= 0; k < rli->curr_group_da.elements; k++)
+        { 
+          delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
+        }
+
+        return ret_worker;
+      }
+
+      DBUG_ASSERT(!strcmp(mts_assigned_partitions[i]->db, *ref_cur_db));
+      DBUG_ASSERT(ret_worker == mts_assigned_partitions[i]->worker);
+      DBUG_ASSERT(mts_assigned_partitions[i]->usage > 0);
 
-      DBUG_ASSERT(g.group_relay_log_name == NULL);
+      i++;
+    } while (it++);
+
+    if ((ptr_g= ((Slave_job_group *)
+                 dynamic_array_ptr(&rli->gaq->Q,
+                                   rli->gaq->assigned_group_index)))->worker_id
+        == (ulong) -1)
+    {
+      ptr_g->worker_id= ret_worker->id;
+      
+      DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
     }
 
     DBUG_ASSERT(i == num_dbs || num_dbs == OVER_MAX_DBS_IN_EVENT_MTS);
@@ -2511,14 +2524,18 @@ Slave_worker *Log_event::get_slave_worke
     // Releasing the Coord's mem-root from the updated dbs. It's safe to do at this
     // point because the root is no longer needed along remained part of Coordinator's
     // execution flow.
+    
     free_root(rli->info_thd->mem_root, MYF(MY_KEEP_PREALLOC));
   }
-  else // a mini-group internal "regular" event
+  else 
+  {
+    // a mini-group internal "regular" event
     if (rli->last_assigned_worker)
     {
-      worker= rli->last_assigned_worker;
+      ret_worker= rli->last_assigned_worker;
       
-      DBUG_ASSERT(rli->curr_group_assigned_parts.elements > 0 || worker->id == 0);
+      DBUG_ASSERT(rli->curr_group_assigned_parts.elements > 0 ||
+                  ret_worker->id == 0);
     }
     else // int_, rand_, user_ var:s
     {
@@ -2527,27 +2544,51 @@ Slave_worker *Log_event::get_slave_worke
       DBUG_ASSERT(get_type_code() == INTVAR_EVENT ||
                   get_type_code() == RAND_EVENT ||
                   get_type_code() == USER_VAR_EVENT ||
-                  get_type_code() ==  ROWS_QUERY_LOG_EVENT);
+                  get_type_code() == ROWS_QUERY_LOG_EVENT ||
+                  get_type_code() == BEGIN_LOAD_QUERY_EVENT);
 
       insert_dynamic(&rli->curr_group_da, (uchar*) &ptr_curr_ev);
       
-      DBUG_ASSERT(rli->curr_group_da.elements > 0);
+      if (!rli->curr_group_seen_begin)
+      {
+        /*
+          This is a case of B/T-less group like
+          `set @user_var, select f()' that are logged w/o B-event.
+          Notice, while the select-f() can be mended in the current
+          master version, the old server binlogs can't since it bring in
+          the same B/T-less {p, g} group.
+        */
+        DBUG_ASSERT(rli->curr_group_da.elements > 0);
+      }
+      else
+      {
+        DBUG_ASSERT(rli->curr_group_da.elements > 1);
+      }
+
+      DBUG_ASSERT(!ret_worker);
+
+      return ret_worker;
     }
+  }
 
   // the group terminal event (Commit, Xid or a DDL query)
   if (ends_group() || !rli->curr_group_seen_begin)
   {
-    uint i;
+    // index of GAQ that this terminal event belongs to
     mts_group_cnt= rli->gaq->assigned_group_index;
-    Slave_job_group *ptr_g=
-      (Slave_job_group *)
-      dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
 
-    DBUG_ASSERT(rli->curr_group_is_parallel);
+    if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
+      set_mts_event_ends_group();
+
+    ptr_g= (Slave_job_group *)
+      dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
 
-    // TODO: throw an error when relay-log reading starts from inside of a group!!
+    DBUG_ASSERT(ret_worker != NULL);
+    
+    // TODO: UNTIL option, throw an error when relay-log reading
+    // starts from inside of a group!!
 
-    if (!worker->relay_log_change_notified)
+    if (!ret_worker->relay_log_change_notified)
     {
       /*
         Prior this event, C rotated the relay log to drop each
@@ -2565,10 +2606,10 @@ Slave_worker *Log_event::get_slave_worke
 
       DBUG_ASSERT(ptr_g->group_relay_log_name != NULL);
 
-      worker->relay_log_change_notified= TRUE;
+      ret_worker->relay_log_change_notified= TRUE;
     }
 
-    if (!worker->checkpoint_notified)
+    if (!ret_worker->checkpoint_notified)
     {
       // Worker to dealloc
       // master binlog checkpoint
@@ -2585,24 +2626,13 @@ Slave_worker *Log_event::get_slave_worke
       strcpy(ptr_g->checkpoint_relay_log_name,
              rli->get_group_relay_log_name());
       ptr_g->checkpoint_relay_log_pos= rli->get_group_relay_log_pos();
-      worker->checkpoint_notified= TRUE;
+      ret_worker->checkpoint_notified= TRUE;
     }
     ptr_g->checkpoint_seqno= rli->checkpoint_seqno;
     rli->checkpoint_seqno++;
-
-    DBUG_ASSERT(worker != NULL && worker == rli->last_assigned_worker);
-    
-    // CGAP cleanup
-    for (i= rli->curr_group_assigned_parts.elements; i > 0; i--)
-      delete_dynamic_element(&rli->
-                             curr_group_assigned_parts, i - 1);
-    rli->last_assigned_worker= NULL;
-
-    // reset the B-group marker
-    rli->curr_group_seen_begin= FALSE;
   }
   
-  return worker;
+  return ret_worker;
 }
 
 // returns the next available! (TODO: incompatible to circurla_buff method!!!)
@@ -2779,16 +2809,22 @@ void append_item_to_jobs(slave_job_item 
 }
 
 /**
-   scheduling event execution either serially or in parallel
+   Scheduling event to execute in parallel or execute it directly.
+   In MTS case the event gets associated with either Coordinator or a
+   Worker.  A special case of the association is NULL when the Worker
+   can't be decided yet.  In the single threaded sequential mode the
+   event maps to SQL thread rli.
+
+   @return 0 as success, otherwise a failure.
 */
 int Log_event::apply_event(Relay_log_info const *rli)
 {
-  uint i;
   DBUG_ENTER("LOG_EVENT:apply_event");
   Slave_worker *w= NULL;
-  Slave_job_item item= {NULL}, *job_item= &item;
   Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli);  // constless alias
-  bool parallel, seq_event, term_event;
+  bool parallel= FALSE, async_event= FALSE, seq_event= FALSE;
+
+  worker= c_rli;
 
   if (rli->is_mts_recovery())
   {
@@ -2810,53 +2846,63 @@ int Log_event::apply_event(Relay_log_inf
   }
 
   if (!(parallel= rli->is_parallel_exec()) ||
-      ((seq_event= mts_sequential_exec()) &&
-       (!rli->curr_group_seen_begin ||
-        mts_async_exec_by_coordinator(::server_id))))
+      (async_event= mts_async_exec_by_coordinator(::server_id)) ||
+      (seq_event= mts_sequential_exec()))
   {
     if (parallel)
     {
       /* 
          There are two classes of events that Coordinator executes
-         itself. One requires all Workers to finish up their assignments.
-         The other does not need (actually can not have) this synchronization.
+         itself. One e.g the master Rotate requires all Workers to finish up 
+         their assignments. The other async class, e.g the slave Rotate,
+         can't have this such synchronization because Worker might be waiting
+         for terminal events to finish.
       */
 
-      if (!mts_async_exec_by_coordinator(::server_id))
-      {
+      if (!async_event)
+      {     
         /*
           this  event does not split the current group but is indeed
           a separator beetwen two master's binlog therefore requiring
           Workers to sync.
         */
 
-        DBUG_ASSERT(!rli->curr_group_seen_begin);
+        if (rli->curr_group_da.elements > 0)
+        {
+          /* 
+             Possible reason is a old version binlog sequential event
+             wrappped with BEGIN/COMMIT or preceeded by User|Int|Random- var.
+             MTS has to stop to suggest restart in the permanent sequential mode.
+          */
 
+          // TODO: improve err msg
+          rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+                      ER(ER_SLAVE_FATAL_ERROR),
+                      "Can't execute all binlog event in parallel mode");
+
+          // destroy possible buffered events of the current group prior to exit
+          for (uint k= 0; k < rli->curr_group_da.elements; k++)
+          { 
+            delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
+          }
+
+          DBUG_RETURN(-1);
+        }
         /*
-          Marking sure the event won't be executed in parallel.
-          That affects memory deallocation in the following execution path.
+          Marking sure the event will be executed in sequential mode.
         */
-        c_rli->curr_group_is_parallel= FALSE;
         (void) wait_for_workers_to_finish(rli);
-        
-        /* any Worker is idle as done through wait_for_workers_to_finish */
-        DBUG_ASSERT((*(Slave_worker **)
-                     dynamic_array_ptr(&c_rli->workers,
-                                       rand() % c_rli->workers.elements))->
-                    usage_partition == 0);
-      }
-      else
-      {
-        if (rli->curr_group_is_parallel)
+
+#ifndef DBUG_OFF
+        /* all Workers are idle as done through wait_for_workers_to_finish */
+        for (uint k= 0; k < c_rli->curr_group_da.elements; k++)
         {
-          /* 
-             the event is artifical to splits the current group into separate
-             relay-logs. Differently to the previous events of the group this one
-             is applied by Coordinator and w/o any synchronization with Workers.
-          */
-          c_rli->curr_group_split= TRUE;
-          c_rli->curr_group_is_parallel= FALSE;
+          DBUG_ASSERT(!(*(Slave_worker **)
+                        dynamic_array_ptr(&c_rli->workers, k))->usage_partition);
+          DBUG_ASSERT(!(*(Slave_worker **)
+                        dynamic_array_ptr(&c_rli->workers, k))->jobs.len);
         }
+#endif
       }
     }
     DBUG_RETURN(do_apply_event(rli));
@@ -2865,58 +2911,12 @@ int Log_event::apply_event(Relay_log_inf
   DBUG_ASSERT(!(rli->curr_group_seen_begin && ends_group()) ||
               rli->last_assigned_worker);
 
-  if (seq_event)
-  {   // rli->last_assigned_worker != NULL if BTQ but not BQT
-    DBUG_ASSERT(rli->curr_group_seen_begin || ends_group());
-    if (!c_rli->curr_group_isolated)
-      (void) wait_for_workers_to_finish(rli, rli->last_assigned_worker);
-    c_rli->curr_group_isolated= TRUE;
-   }
-  // getting Worker's id
-  if ((!(w= get_slave_worker_id(c_rli)) ||
-       DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)))
+  worker= NULL;
+  c_rli->last_assigned_worker= w= get_slave_worker_id(c_rli);
+  if (!w || DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0))
     DBUG_RETURN(rli->curr_group_assigned_parts.elements == 0 ? FALSE : TRUE);
 
-  job_item->data= this;
-
-  DBUG_PRINT("Log_event::apply_event:", ("-> job item data %p to W_%lu", job_item->data, w->id));
-
-  if (rli->curr_group_da.elements > 0)
-  {
-    /*
-      the current event sorted out which partion the current group belongs to.
-      It's time now to processed deferred array events.
-    */
-    for (i= 0; i < rli->curr_group_da.elements; i++)
-    { 
-      Slave_job_item da_item;
-      get_dynamic(&c_rli->curr_group_da, (uchar*) &da_item.data, i);
-      append_item_to_jobs(&da_item, w, c_rli);
-    }
-    if (rli->curr_group_da.elements > rli->curr_group_da.max_element)
-    {
-      // reallocate to less mem
-      
-      DBUG_ASSERT(rli->curr_group_da.max_element < rli->curr_group_da.elements);
-      
-      c_rli->curr_group_da.elements= rli->curr_group_da.max_element;
-      c_rli->curr_group_da.max_element= 0;
-      freeze_size(&c_rli->curr_group_da); // restores max_element
-    }
-    c_rli->curr_group_da.elements= 0;
-  }
-
-  if (c_rli->curr_group_isolated)
-    term_event= ends_group();
-
-  append_item_to_jobs(job_item, w, c_rli);
-
-  if (c_rli->curr_group_isolated && term_event)
-  {
-    // to make sure the isolated group terminates in isolation as well
-    (void) wait_for_workers_to_finish(rli, w);
-    c_rli->curr_group_isolated= FALSE;
-  }
+  worker= (Relay_log_info*) w;
 
   DBUG_RETURN(FALSE);
 }
@@ -2932,14 +2932,14 @@ int Log_event::apply_event(Relay_log_inf
 struct slave_job_item* pop_jobs_item(Slave_worker *w, Slave_job_item *job_item)
 {
   THD *thd= w->info_thd;
+
   mysql_mutex_lock(&w->jobs_lock);
+
   while (!job_item->data && !thd->killed)
   {
     const char *old_msg;
 
-    //job_item= w->jobs.pop(); // LABS-TODO de_queue()
     head_queue(&w->jobs, job_item);
-
     if (job_item->data == NULL)
     {
       w->wait_jobs++;
@@ -2952,6 +2952,7 @@ struct slave_job_item* pop_jobs_item(Sla
   }
   if (job_item->data)
     w->curr_jobs--;
+
   mysql_mutex_unlock(&w->jobs_lock);
 
   thd_proc_info(w->info_thd, "Executing event");
@@ -2960,7 +2961,7 @@ struct slave_job_item* pop_jobs_item(Sla
 
 
 /**
-  mts-II worker main routine.
+  MTS worker main routine.
   The worker thread waits for an event, execute it, fixes statistics counters.
 
   @note the function maintains CGEP and modifies APH, and causes
@@ -2975,6 +2976,7 @@ int slave_worker_exec_job(Slave_worker *
   struct slave_job_item item= {NULL}, *job_item= &item;
   THD *thd= w->info_thd;
   Log_event *ev= NULL;
+  bool part_event= FALSE;
 
   DBUG_ENTER("slave_worker_exec_job");
 
@@ -3001,42 +3003,46 @@ int slave_worker_exec_job(Slave_worker *
   } 
   else
   {
-    if (ev->contains_partition_info() &&
-        ev->mts_number_dbs() < OVER_MAX_DBS_IN_EVENT_MTS)
+    if ((part_event= ev->contains_partition_info()))
     {
-      List_iterator<char> it(*ev->mts_get_dbs(thd->mem_root));
-      DYNAMIC_ARRAY *ep= &(w->curr_group_exec_parts->dynamic_ids);
-      
-      while (it++)
+      uint num_dbs=  ev->mts_number_dbs();
+      DYNAMIC_ARRAY *ep= &w->curr_group_exec_parts;
+
+      if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
+        num_dbs= 1;
+
+      DBUG_ASSERT(num_dbs > 0);
+
+      for (uint k= 0; k < num_dbs; k++)
       {
         bool found= FALSE;
-        char key[NAME_LEN + 2];
-        const char *dbname= *it.ref();
-        uchar dblength= (uint) strlen(dbname);
 
         for (uint i= 0; i < ep->elements && !found; i++)
         {
-          get_dynamic(ep, (uchar*) key, i);
           found=
-            (key[0] == dblength) &&
-            (strncmp(key + 1, const_cast<char*>(dbname), dblength) == 0);
+            (* (db_worker_hash_entry **) dynamic_array_ptr(ep, i)) ==
+            ev->mts_assigned_partitions[k];
         }
         if (!found)
         {
-          key[0]= dblength;
-          memcpy(key + 1, dbname, dblength + 1);
-          insert_dynamic(ep, (uchar*) key);
+          insert_dynamic(ep, (uchar*) &ev->mts_assigned_partitions[k]);
         }
       }
     }
   }
-
+  w->set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
   error= ev->do_apply_event_worker(w);
-  
-  if (ev->ends_group() || !w->curr_group_seen_begin)
-  {
-    DBUG_PRINT("slave_worker_exec_job:", (" commits GAQ index %lu, last committed  %lu", ev->mts_group_cnt, w->last_group_done_index));
-
+  if (ev->ends_group() || (!w->curr_group_seen_begin && 
+                           /* 
+                              p-events of B/T-less {p,g} group (see
+                              legends of Log_event::get_slave_worker)
+                              obviously can't commit.
+                           */
+                           part_event))
+  {
+    DBUG_PRINT("slave_worker_exec_job:",
+               (" commits GAQ index %lu, last committed  %lu",
+                ev->mts_group_cnt, w->last_group_done_index));
     w->slave_worker_ends_group(ev, error); /* last done sets post exec */
   }
 
@@ -3101,8 +3107,15 @@ err:
     w->slave_worker_ends_group(ev, error);
   
   // rows_query_log_event is deleted as a part of the statement cleanup
+
+  // todo: sync_slave_with_master fails when my_sleep(1000) is put here
+
   if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
+  {
+    w->last_event= ev;
     delete ev;
+  }
+  
 
   DBUG_RETURN(error);
 }
@@ -3724,7 +3737,7 @@ Query_log_event::Query_log_event(const c
    auto_increment_increment(1), auto_increment_offset(1),
    time_zone_len(0), lc_time_names_number(0), charset_database_number(0),
    table_map_for_update(0), master_data_written(0),
-   mts_accessed_dbs(OVER_MAX_DBS_IN_EVENT_MTS)
+   mts_accessed_dbs(OVER_MAX_DBS_IN_EVENT_MTS), m_mts_query_ends_group(FALSE)
 {
   ulong data_len;
   uint32 tmp;
@@ -4197,21 +4210,17 @@ void Query_log_event::attach_temp_tables
   if (!mts_is_worker(thd) || !contains_partition_info())
     return;
   
+  // in over max-db:s case just one special parttion is locked
+  int parts= ((mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS) ?
+              1 : mts_accessed_dbs);
+
   DBUG_ASSERT(!thd->temporary_tables);
 
-  if (mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
+  for (int i= 0; i < parts; i++)
   {
-    THD *c_thd= mts_get_coordinator_thd();
-    mts_move_temp_tables_to_thd(thd, c_thd->temporary_tables);
-    c_thd->temporary_tables= NULL;
-  }
-  else
-  {
-    for (int i= 0; i < mts_accessed_dbs; i++)
-    {
-      mts_move_temp_tables_to_thd(thd,
-                                  mts_assigned_partitions[i]->temporary_tables);
-    }
+    mts_move_temp_tables_to_thd(thd,
+                                mts_assigned_partitions[i]->temporary_tables);
+    mts_assigned_partitions[i]->temporary_tables= NULL;
   }
 }
 
@@ -4227,15 +4236,8 @@ void Query_log_event::detach_temp_tables
   if (!mts_is_worker(thd))
     return;
 
-  if (mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
-  {
-    THD *c_thd= mts_get_coordinator_thd();
-    /* back to coordinator */
-    mts_move_temp_tables_to_thd(c_thd, thd->temporary_tables);
-    thd->temporary_tables=  NULL;
-    return;
-  }
-
+  int parts= ((mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS) ?
+              1 : mts_accessed_dbs);
   /*
     todo: optimize for a case of 
 
@@ -4248,9 +4250,9 @@ void Query_log_event::detach_temp_tables
        unmodified lists provided that the attach_ method does not
        destroy references to them.
   */
-  for (int i= 0; i < mts_accessed_dbs; i++)
+  for (int i= 0; i < parts; i++)
   {
-    mts_assigned_partitions[i]->temporary_tables= 0;
+    mts_assigned_partitions[i]->temporary_tables= NULL;
   }
 
   for (TABLE *table= thd->temporary_tables; table;)
@@ -4258,7 +4260,7 @@ void Query_log_event::detach_temp_tables
     int i;
 
     // find which entry to go
-    for (i= 0; i <  mts_accessed_dbs; i++)
+    for (i= 0; i < parts; i++)
       if (strcmp(table->s->db.str, mts_accessed_db_names[i]) < 0)
         continue;
       else
@@ -4269,7 +4271,15 @@ void Query_log_event::detach_temp_tables
     // table pointer is shifted inside the function
     table= mts_move_temp_table_to_entry(table, thd, mts_assigned_partitions[i]);
   }
+
   DBUG_ASSERT(!thd->temporary_tables);
+#ifndef DBUG_OFF
+  for (int i= 0; i < parts; i++)
+  {
+    DBUG_ASSERT(!mts_assigned_partitions[i]->temporary_tables ||
+                !mts_assigned_partitions[i]->temporary_tables->prev);
+  }
+#endif
 }
 
 int Query_log_event::do_apply_event(Relay_log_info const *rli)
@@ -6920,7 +6930,7 @@ int Xid_log_event::do_apply_event(Relay_
     {
       rli_ptr->set_group_master_log_pos(log_pos);
     }
-
+  
     if ((error= rli_ptr->flush_info(TRUE)))
       goto err;
   }
@@ -8923,7 +8933,7 @@ int Rows_log_event::do_apply_event(Relay
       Rows_log_event, we can invalidate the query cache for the
       associated table.
      */
-    for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr; ptr= ptr->next_global)
+    for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
     {
       const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
     }
@@ -8932,9 +8942,10 @@ int Rows_log_event::do_apply_event(Relay
 #endif
   }
 
-  TABLE* table= 
+  TABLE* 
+    table= 
     m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
-  
+
   DBUG_PRINT("debug", ("m_table: 0x%lx, m_table_id: %lu", (ulong) m_table, m_table_id));
 
   if (table)
@@ -9717,7 +9728,6 @@ int Table_map_log_event::do_apply_event(
     table_list->next_global= table_list->next_local= rli->tables_to_lock;
     const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
     const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
-
     /* 'memory' is freed in clear_tables_to_lock */
   }
 

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2011-05-24 14:29:35 +0000
+++ b/sql/log_event.h	2011-05-30 10:05:07 +0000
@@ -1038,6 +1038,10 @@ public:
     the event execution. The indexed data represent the Worker progress status.
   */
   ulong mts_group_cnt;
+  /**
+     MTS: associating the event with either an assigned Worker or Coordinator.
+  */
+  Relay_log_info *worker;
 
   /* a copy of the main rli value stored into event to pass to MTS worker rli */
   ulonglong future_event_relay_log_pos;
@@ -1113,6 +1117,9 @@ public:
   */
   virtual uint8 mts_number_dbs() { return 1; }
 
+  virtual void set_mts_event_ends_group() { DBUG_ASSERT(0); }
+  virtual bool get_mts_event_ends_group() { DBUG_ASSERT(0); }
+
 #else
   Log_event() : temp_buf(0) {}
     /* avoid having to link mysqlbinlog against libpthread */
@@ -1235,7 +1242,11 @@ public:
 public:
 
   /**
-     MST: to execute serially due to technical or conceptual limitation
+     MST: to execute some event types serially.
+
+     @note There are incompatile combinations such the referred event
+           is wrapped with BEGIN/COMMIT. Such cases should be identified
+           by the caller and treates as an error.
      
      @return TRUE if despite permanent parallel execution mode an event
                   needs applying in a real isolation that is sequentially.
@@ -1243,15 +1254,6 @@ public:
   bool mts_sequential_exec()
   {
     return
-      /* 
-         the 4 types below are limitly parallel-supported (the default 
-         session db not the actual db).
-         Decision on BEGIN, COMMIT, Xid is the parallel.
-      */
-      (get_type_code() == QUERY_EVENT &&
-       !starts_group() && !ends_group() &&
-       (mts_number_dbs() ==  OVER_MAX_DBS_IN_EVENT_MTS)) ||
-
       get_type_code() == START_EVENT_V3          ||
       get_type_code() == STOP_EVENT              ||
       get_type_code() == ROTATE_EVENT            ||
@@ -1262,12 +1264,8 @@ public:
       get_type_code() == EXEC_LOAD_EVENT         ||
       get_type_code() == DELETE_FILE_EVENT       ||
       get_type_code() == NEW_LOAD_EVENT          ||
+
       get_type_code() == FORMAT_DESCRIPTION_EVENT||
-      get_type_code() == BEGIN_LOAD_QUERY_EVENT  ||
-      get_type_code() == EXECUTE_LOAD_QUERY_EVENT|| /* todo: make parallel */
-      get_type_code() == PRE_GA_WRITE_ROWS_EVENT ||
-      get_type_code() == PRE_GA_UPDATE_ROWS_EVENT||
-      get_type_code() == PRE_GA_DELETE_ROWS_EVENT||
 
       get_type_code() == INCIDENT_EVENT;
   }
@@ -1895,6 +1893,13 @@ public:
   uchar mts_accessed_dbs;
   char mts_accessed_db_names[MAX_DBS_IN_EVENT_MTS][NAME_LEN];
 
+  /*
+    Event can be indentified as a group terminator and such fact
+    is memoried by the function.
+  */
+  virtual void set_mts_event_ends_group() { m_mts_query_ends_group= TRUE; }
+  virtual bool get_mts_event_ends_group() { return m_mts_query_ends_group; }
+
 #ifdef MYSQL_SERVER
 
   Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length,
@@ -1909,10 +1914,16 @@ public:
   {
     List<char> *res= new (mem_root) List<char>;
     if (mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
-      res->push_back((char*) get_db());
+    {
+      // "" == empty string db name is special to indicate sequential applying
+      mts_accessed_db_names[0][0]= 0;
+      res->push_back((char*) mts_accessed_db_names[0]);
+    }
     else
+    {
       for (uchar i= 0; i < mts_accessed_dbs; i++)
         res->push_back(mts_accessed_db_names[i]);
+    }
     return res;
   }
 
@@ -1991,13 +2002,16 @@ public:        /* !!! Public in this pat
      its self-group.
   */
   bool starts_group() { return !strncmp(query, "BEGIN", q_len); }
-  bool ends_group()
+  virtual bool ends_group()
   {  
     return
       !strncmp(query, "COMMIT", q_len) ||
       (!strncasecmp(query, STRING_WITH_LEN("ROLLBACK"))
        && strncasecmp(query, STRING_WITH_LEN("ROLLBACK TO ")));
   }
+private:
+  
+  bool m_mts_query_ends_group;
 };
 
 
@@ -2665,7 +2679,7 @@ class Xid_log_event: public Log_event
   bool write(IO_CACHE* file);
 #endif
   bool is_valid() const { return 1; }
-  bool ends_group() { return TRUE; }
+  virtual bool ends_group() { return TRUE; }
 private:
 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
   virtual int do_apply_event(Relay_log_info const *rli);
@@ -3070,7 +3084,8 @@ public:
   bool write(IO_CACHE* file);
   const char* get_db() { return db; }
 #endif
-
+  /* MTS executes this event sequentially */
+  virtual uchar mts_number_dbs() { return OVER_MAX_DBS_IN_EVENT_MTS; }
 private:
 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
   virtual int do_apply_event(Relay_log_info const *rli);

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2011-02-27 17:35:25 +0000
+++ b/sql/mysqld.cc	2011-05-30 10:05:07 +0000
@@ -459,7 +459,8 @@ ulong thread_created;
 ulong back_log, connect_timeout, concurrency, server_id;
 ulong table_cache_size, table_def_size;
 ulong what_to_log;
-ulong slow_launch_time, slave_open_temp_tables;
+ulong slow_launch_time;
+int32 slave_open_temp_tables;
 ulong open_files_limit, max_binlog_size, max_relay_log_size;
 ulong slave_trans_retries;
 uint  slave_net_timeout;
@@ -482,6 +483,7 @@ ulong refresh_version;  /* Increments on
 query_id_t global_query_id;
 my_atomic_rwlock_t global_query_id_lock;
 my_atomic_rwlock_t thread_running_lock;
+my_atomic_rwlock_t slave_open_temp_tables_lock;
 ulong aborted_threads, aborted_connects;
 ulong delayed_insert_timeout, delayed_insert_limit, delayed_queue_size;
 ulong delayed_insert_threads, delayed_insert_writes, delayed_rows_in_use;
@@ -6734,7 +6736,7 @@ SHOW_VAR status_vars[]= {
   {"Select_range",             (char*) offsetof(STATUS_VAR, select_range_count), SHOW_LONG_STATUS},
   {"Select_range_check",       (char*) offsetof(STATUS_VAR, select_range_check_count), SHOW_LONG_STATUS},
   {"Select_scan",	       (char*) offsetof(STATUS_VAR, select_scan_count), SHOW_LONG_STATUS},
-  {"Slave_open_temp_tables",   (char*) &slave_open_temp_tables, SHOW_LONG},
+  {"Slave_open_temp_tables",   (char*) &slave_open_temp_tables, SHOW_INT},
 #ifdef HAVE_REPLICATION
   {"Slave_retried_transactions",(char*) &show_slave_retried_trans, SHOW_FUNC},
   {"Slave_heartbeat_period",   (char*) &show_heartbeat_period, SHOW_FUNC},

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h	2011-02-27 17:35:25 +0000
+++ b/sql/mysqld.h	2011-05-30 10:05:07 +0000
@@ -168,7 +168,7 @@ extern ulong delayed_insert_timeout;
 extern ulong delayed_insert_limit, delayed_queue_size;
 extern ulong delayed_insert_threads, delayed_insert_writes;
 extern ulong delayed_rows_in_use,delayed_insert_errors;
-extern ulong slave_open_temp_tables;
+extern int32 slave_open_temp_tables;
 extern ulong query_cache_size, query_cache_min_res_unit;
 extern ulong slow_launch_threads, slow_launch_time;
 extern ulong table_cache_size, table_def_size;
@@ -360,6 +360,7 @@ extern mysql_cond_t COND_thread_count;
 extern mysql_cond_t COND_manager;
 extern int32 thread_running;
 extern my_atomic_rwlock_t thread_running_lock;
+extern my_atomic_rwlock_t slave_open_temp_tables_lock;
 
 extern char *opt_ssl_ca, *opt_ssl_capath, *opt_ssl_cert, *opt_ssl_cipher,
             *opt_ssl_key;

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2011-05-24 14:29:35 +0000
+++ b/sql/rpl_rli.cc	2011-05-30 10:05:07 +0000
@@ -135,6 +135,7 @@ void Relay_log_info::init_workers(ulong 
   mysql_mutex_init(key_mutex_mts_temp_tables_lock, &mts_temp_tables_lock,
                    MY_MUTEX_INIT_FAST);
   my_init_dynamic_array(&workers, sizeof(Slave_worker *), slave_parallel_workers, 4);
+  my_atomic_rwlock_init(&slave_open_temp_tables_lock);
 }
 
 /**
@@ -147,6 +148,7 @@ void Relay_log_info::deinit_workers()
   mysql_mutex_destroy(&mts_temp_tables_lock);
 
   delete_dynamic(&workers);
+  my_atomic_rwlock_destroy(&slave_open_temp_tables_lock);
 }
 
 Relay_log_info::~Relay_log_info()
@@ -1015,7 +1017,7 @@ void Relay_log_info::stmt_done(my_off_t 
     while the MyISAM table has already been updated.
   */
   if ((info_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
-    inc_event_relay_log_pos(); // todo: ev-> future_event_relay_log_pos
+    inc_event_relay_log_pos();
   else
   {
     inc_group_relay_log_pos(event_master_log_pos);

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2011-05-24 14:29:35 +0000
+++ b/sql/rpl_rli.h	2011-05-30 10:05:07 +0000
@@ -392,10 +392,8 @@ public:
 
   bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const
   {
-    TABLE_LIST *tables= tables_to_lock;
-
     DBUG_ASSERT(tabledef_var && conv_table_var);
-    for (TABLE_LIST *ptr= tables ; ptr != NULL ; ptr= ptr->next_global)
+    for (TABLE_LIST *ptr= tables_to_lock ; ptr != NULL ; ptr= ptr->next_global)
       if (ptr->table == table_arg)
       {
         *tabledef_var= &static_cast<RPL_TABLE_LIST*>(ptr)->m_tabledef;
@@ -447,7 +445,6 @@ public:
   DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
   DYNAMIC_ARRAY curr_group_da;  // deferred array to hold partition-info-free events
   bool curr_group_seen_begin;   // current group started with B-event or not
-  bool curr_group_isolated;     // Trans is exec:d by Worker but in exclusive env
   volatile ulong mts_wqs_underrun_w_id;  // Id of a Worker whose queue is getting empty
   volatile long mts_wqs_overrun;   // W to incr and decr
   ulong mts_wqs_underrun_cnt;  // Coord goes to sleep when senses Workers are content
@@ -456,9 +453,6 @@ public:
   ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun
   Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
   ulonglong mts_total_groups; // total event groups distributed in current session
- 
-  bool curr_group_is_parallel; // an event to process by Coordinator
-  bool curr_group_split;       // an event split the current group forcing C to exec it
   ulong opt_slave_parallel_workers; // auxiliary cache for ::opt_slave_parallel_workers
   ulong slave_parallel_workers;     // the one slave session time number of workers
   ulong recovery_parallel_workers; // number of workers while recovering.

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2011-05-24 14:29:35 +0000
+++ b/sql/rpl_rli_pdb.cc	2011-05-30 10:05:07 +0000
@@ -29,9 +29,9 @@ const char *info_slave_worker_fields []=
 
 Slave_worker::Slave_worker(const char* type, const char* pfs,
                            Relay_log_info *rli)
-  : Relay_log_info(FALSE), c_rli(rli), curr_group_exec_parts(0),
-  checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0),
-  inited_group_execed(0)
+  : Relay_log_info(FALSE), c_rli(rli),
+    checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0),
+    inited_group_execed(0), running_status(FALSE), last_event(NULL)
 {
   checkpoint_relay_log_name[0]= 0;
   checkpoint_master_log_name[0]= 0;
@@ -39,8 +39,7 @@ Slave_worker::Slave_worker(const char* t
 
 Slave_worker::~Slave_worker() 
 {
-  if (curr_group_exec_parts)
-    delete curr_group_exec_parts;
+  delete_dynamic(&curr_group_exec_parts);
 
   if (inited_group_execed)
     bitmap_free(&group_execed);
@@ -55,15 +54,15 @@ int Slave_worker::init_info()
   if (inited)
     DBUG_RETURN(0);
 
-  if (!(curr_group_exec_parts= new Database_ids(NAME_LEN)))
-    goto err;
+  my_init_dynamic_array(&curr_group_exec_parts, sizeof(db_worker_hash_entry*),
+                        SLAVE_INIT_DBS_IN_GROUP, 1);
 
   if (bitmap_init(&group_execed, NULL,
                   c_rli->checkpoint_group, FALSE))
     goto err;
   
   inited_group_execed= 1;
-  
+
   /*
     The init_info() is used to either create or read information
     from the repository, in order to initialize the Slave_worker.
@@ -273,7 +272,7 @@ static void free_entry(db_worker_hash_en
   DBUG_PRINT("info", ("free_entry %s, %d", entry->db, (int) strlen(entry->db)));
 
   DBUG_ASSERT(c_thd->system_thread == SYSTEM_THREAD_SLAVE_SQL);
-  DBUG_ASSERT(entry->usage == 0);
+  DBUG_ASSERT(entry->usage == 0 || !entry->worker->running_status);
 
   mts_move_temp_tables_to_thd(c_thd, entry->temporary_tables);
   entry->temporary_tables= NULL;
@@ -313,7 +312,7 @@ void destroy_hash_workers(Relay_log_info
 }
 
 /**
-   Relocating temporary table reference into @c entry location.
+   Relocating temporary table reference into @c entry's table list head.
    Sources can be the coordinator's and the Worker's thd->temporary_tables.
 
    @param table   TABLE instance pointer
@@ -454,6 +453,8 @@ static void move_temp_tables_to_entry(TH
         scheduled the event, and goes back into the parallel mode
 
    @param  dbname    pointer to c-string containing database name
+                     It can be empty string to indicate specific locking
+                     to faciliate sequential applying.
    @param  rli       pointer to Coordinators relay-log-info instance
    @param  ptr_entry reference to a pointer to the resulted entry in
                      the Assigne Partition Hash where
@@ -461,6 +462,7 @@ static void move_temp_tables_to_entry(TH
 
    @note modifies  CGAP, APH and unlinks @c dbname -keyd temp tables 
          from C's thd->temporary_tables to move them into the entry record.
+         Caller can opt for a Worker via setting rli->last_assigned_worker.
 
    @return the pointer to a Worker struct
 */
@@ -481,7 +483,6 @@ Slave_worker *get_slave_worker(const cha
   my_hash_value_type hash_value;
   uchar dblength= (uint) strlen(dbname);
 
-  DBUG_ASSERT(dblength != 0);
 
   // Search in CGAP
   for (i= 0; i < rli->curr_group_assigned_parts.elements; i++)
@@ -545,11 +546,6 @@ Slave_worker *get_slave_worker(const cha
     entry->worker= !rli->last_assigned_worker ?
       get_least_occupied_worker(workers) : rli->last_assigned_worker;
     entry->worker->usage_partition++;
-    /* 
-       relocation belonging to db temp tables from C to W via entry
-    */
-    if (need_temp_tables)
-      move_temp_tables_to_entry(thd, entry);
 
     mysql_mutex_lock(&slave_worker_hash_lock);
 
@@ -573,7 +569,7 @@ Slave_worker *get_slave_worker(const cha
     }
 
     ret= my_hash_insert(&mapping_db_to_worker, (uchar*) entry);
-    mysql_mutex_unlock(&slave_worker_hash_lock);
+
     if (ret)
     {
       my_free(db);
@@ -592,9 +588,6 @@ Slave_worker *get_slave_worker(const cha
         get_least_occupied_worker(workers) : rli->last_assigned_worker;
       entry->worker->usage_partition++;
       entry->usage++;
-
-      my_hash_update(&mapping_db_to_worker, (uchar*) entry,
-                     (uchar*) dbname, dblength);
     }
     else if (entry->worker == rli->last_assigned_worker ||
              !rli->last_assigned_worker)
@@ -603,8 +596,6 @@ Slave_worker *get_slave_worker(const cha
       DBUG_ASSERT(entry->worker);
 
       entry->usage++;
-      my_hash_update(&mapping_db_to_worker, (uchar*) entry,
-                     (uchar*) dbname, dblength);
     }
     else
     {
@@ -637,26 +628,39 @@ Slave_worker *get_slave_worker(const cha
       entry->usage= 1;
       entry->worker->usage_partition++;
     }
+  }
 
-    if (entry->usage == 1 && need_temp_tables)
+  /* 
+     relocation belonging to db temp tables from C to W via entry
+  */
+  if (entry->usage == 1 && need_temp_tables)
+  {
+    if (!entry->temporary_tables)
     {
-      if (!entry->temporary_tables)
+      if (entry->db_len != 0)
       {
         move_temp_tables_to_entry(thd, entry);
       }
-#ifndef DBUG_OFF      
       else
       {
-        for (TABLE *table= thd->temporary_tables; table; table= table->next)
-        {
-          DBUG_ASSERT(0 != strcmp(table->s->db.str, entry->db));
-        }
+        entry->temporary_tables= thd->temporary_tables;
+        thd->temporary_tables= NULL;
       }
-#endif
     }
+#ifndef DBUG_OFF      
+    else
+    {
+      // all entries must have been emptied from temps by the caller
+      DBUG_ASSERT(entry->db_len != 0);
 
-    mysql_mutex_unlock(&slave_worker_hash_lock);
+      for (TABLE *table= thd->temporary_tables; table; table= table->next)
+      {
+        DBUG_ASSERT(0 != strcmp(table->s->db.str, entry->db));
+      }
+    }
+#endif
   }
+  mysql_mutex_unlock(&slave_worker_hash_lock);
 
   DBUG_ASSERT(entry);
 
@@ -751,28 +755,17 @@ void Slave_worker::slave_worker_ends_gro
   /*
     Cleanup relating to the last executed group regardless of error.
   */
+  DYNAMIC_ARRAY *ep= &curr_group_exec_parts;
 
-  for (int i= curr_group_exec_parts->dynamic_ids.elements; i > 0; i--)
+  for (int i= ep->elements; i > 0; i--)
   {
-    db_worker_hash_entry *entry= NULL;
-    my_hash_value_type hash_value;
-    char key[NAME_LEN + 2];
-
-    get_dynamic(&(curr_group_exec_parts->dynamic_ids), (uchar*) key, i - 1);
-    hash_value= my_calc_hash(&mapping_db_to_worker, (uchar*) key + 1, key[0]);
-
+    db_worker_hash_entry *entry=
+      (* (db_worker_hash_entry **) dynamic_array_ptr(ep, i - 1));
     mysql_mutex_lock(&slave_worker_hash_lock);
 
-    entry= (db_worker_hash_entry *)
-      my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
-                                      (uchar*) key + 1, key[0]);
-
-    DBUG_ASSERT(entry && entry->usage != 0); // was used to break
-    DBUG_ASSERT(strlen(key + 1) == (uchar) key[0]);
+    DBUG_ASSERT(entry && entry->usage != 0);
 
     entry->usage--;
-    my_hash_update(&mapping_db_to_worker, (uchar*) entry,
-                   (uchar*) key + 1, key[0]);
 
     if (entry->usage == 0)
     {
@@ -783,17 +776,24 @@ void Slave_worker::slave_worker_ends_gro
       */
 
       DBUG_ASSERT(this->info_thd->temporary_tables == 0);
+      DBUG_ASSERT(!entry->temporary_tables ||
+                  !entry->temporary_tables->prev);
 
       usage_partition--;
       if (entry->worker != this) // Coordinator is waiting
+      {
+#ifndef DBUG_OFF
+        // TODO: open it! DBUG_ASSERT(usage_partition || !entry->worker->jobs.len);
+#endif
         mysql_cond_signal(&slave_worker_hash_cond);
+      }
     }
     else
       DBUG_ASSERT(usage_partition != 0);
 
     mysql_mutex_unlock(&slave_worker_hash_lock);
 
-    delete_dynamic_element(&(curr_group_exec_parts->dynamic_ids), i - 1);
+    delete_dynamic_element(ep, i - 1);
   }
   curr_group_seen_begin= FALSE;
 }
@@ -1108,11 +1108,11 @@ int wait_for_workers_to_finish(Relay_log
     }
     else
     {
-      // resources relocation
-      mts_move_temp_tables_to_thd(thd, entry->temporary_tables);
-      entry->temporary_tables= NULL;
       mysql_mutex_unlock(&slave_worker_hash_lock);
     }
+    // resources relocation
+    mts_move_temp_tables_to_thd(thd, entry->temporary_tables);
+    entry->temporary_tables= NULL;
   }
   return ret;
 }

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2011-05-24 14:29:35 +0000
+++ b/sql/rpl_rli_pdb.h	2011-05-30 10:05:07 +0000
@@ -166,6 +166,8 @@ typedef struct st_slave_job_group
 class Slave_committed_queue : public circular_buffer_queue
 {
 public:
+  
+  bool inited;
 
   /* master's Rot-ev exec */
   void update_current_binlog(const char *post_rotate);
@@ -183,10 +185,15 @@ public:
 
   Slave_committed_queue (const char *log, uint el_size, ulong max, uint n,
                          uint inc= 0)
-    : circular_buffer_queue(el_size, max, inc)
+    : circular_buffer_queue(el_size, max, inc), inited(FALSE)
   {
     uint k;
     ulonglong l= 0;
+    
+    if (max >= (ulong) -1 || !circular_buffer_queue::inited_queue)
+      return;
+    else
+      inited= TRUE;
     my_init_dynamic_array(&last_done, sizeof(lwm.total_seqno), n, 0);
     for (k= 0; k < n; k++)
       insert_dynamic(&last_done, (uchar*) &l);  // empty for each Worker
@@ -196,8 +203,11 @@ public:
 
   ~Slave_committed_queue ()
   { 
-    delete_dynamic(&last_done);
-    my_free(lwm.group_relay_log_name);
+    if (inited)
+    {
+      delete_dynamic(&last_done);
+      my_free(lwm.group_relay_log_name);
+    }
   }
 
   /* Checkpoint routine refreshes the queue */
@@ -226,7 +236,8 @@ public:
 
   Relay_log_info *c_rli;
 
-  Dynamic_ids *curr_group_exec_parts; // CGEP
+  DYNAMIC_ARRAY curr_group_exec_parts; // CGEP
+
   bool curr_group_seen_begin; // is set to TRUE with B-event at Worker exec
   // @c last_group_done_index is for recovery, although can be viewed
   //    as statistics as well.
@@ -266,6 +277,10 @@ public:
   char checkpoint_master_log_name[FN_REFLEN];
   ulonglong checkpoint_master_log_pos;
   ulong checkpoint_seqno;
+  MY_BITMAP group_execed;
+  bool inited_group_execed;
+  volatile bool  running_status; // TRUE when Worker is read-exec loop
+  Log_event *last_event;
 
   int init_info();
   void end_info();
@@ -281,10 +296,6 @@ public:
     ATTRIBUTE_FORMAT(printf, 4, 5);
   void do_report(loglevel level, int err_code, const char *msg, va_list vargs) const;
 
-  MY_BITMAP group_execed;
-  
-  bool inited_group_execed;
-
 private:
   bool read_info(Rpl_info_handler *from);
   bool write_info(Rpl_info_handler *to);

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2011-05-24 14:29:35 +0000
+++ b/sql/rpl_slave.cc	2011-05-30 10:05:07 +0000
@@ -80,6 +80,9 @@ ulonglong relay_log_space_limit = 0;
 const char *relay_log_index= 0;
 const char *relay_log_basename= 0;
 
+void append_item_to_jobs(slave_job_item *job_item,
+                         Slave_worker *w, Relay_log_info *rli);
+
 /*
   When slave thread exits, we need to remember the temporary tables so we
   can re-use them on slave start.
@@ -1075,8 +1078,8 @@ static bool sql_slave_killed(THD* thd, R
   bool ret= FALSE;
   DBUG_ENTER("sql_slave_killed");
 
-  DBUG_ASSERT(rli->info_thd == thd || thd->slave_thread);
-  DBUG_ASSERT(rli->slave_running == 1 || thd->slave_thread);// tracking buffer overrun
+  DBUG_ASSERT(rli->info_thd == thd);
+  DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
   if (abort_loop || thd->killed || rli->abort_slave)
   {
     /*
@@ -2748,7 +2751,7 @@ int ulong_cmp(ulong *id1, ulong *id2)
 
    - Reports errors as needed.
 
-  @param ev The event to apply.
+  @param ptr_ev a pointer to a reference to the event to apply.
 
   @param thd The client thread that executes the event (i.e., the
   slave sql thread if called from a replication slave, or the client
@@ -2758,6 +2761,9 @@ int ulong_cmp(ulong *id1, ulong *id2)
   a replication slave, or the client's thd->rli_fake if called to
   execute a BINLOG statement).
 
+  @note MTS can store NULL to @c ptr_ev location to indicate
+        the event is taken over by a Worker.
+
   @retval 0 OK.
 
   @retval 1 Error calling ev->apply_event().
@@ -2765,10 +2771,11 @@ int ulong_cmp(ulong *id1, ulong *id2)
   @retval 2 No error calling ev->apply_event(), but error calling
   ev->update_pos().
 */
-int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
+int apply_event_and_update_pos(Log_event** ptr_ev, THD* thd, Relay_log_info* rli)
 {
   int exec_res= 0;
   bool skip_event= FALSE;
+  Log_event *ev= *ptr_ev;
 
   DBUG_ENTER("apply_event_and_update_pos");
 
@@ -2819,16 +2826,79 @@ int apply_event_and_update_pos(Log_event
   }
   if (reason == Log_event::EVENT_SKIP_NOT)
   {
-    /* 
-       MTS-todo: to test neither skipping nor delayed-exec logics
-       are affected by parallel exec mode.
-    */
-
     // Sleeps if needed, and unlocks rli->data_lock.
-
     if (sql_delay_event(ev, thd, rli))
       DBUG_RETURN(0);
     exec_res= ev->apply_event(rli);
+
+    if (!exec_res && (ev->worker != rli))
+    {
+      if (ev->worker)
+      {
+        Slave_job_item item= {ev}, *job_item= &item;
+        Slave_worker *w= (Slave_worker *) ev->worker;
+        bool need_sync=
+          (ev->mts_number_dbs() == OVER_MAX_DBS_IN_EVENT_MTS) &&
+          ev->get_mts_event_ends_group();
+
+        DBUG_ASSERT(!(ev->ends_group() || !rli->curr_group_seen_begin) ||
+                    ((Slave_worker*) ev->worker) == rli->last_assigned_worker);
+
+        DBUG_PRINT("Log_event::apply_event:", ("-> job item data %p to W_%lu", job_item->data, w->id));
+
+        // Reset mts in-group state
+        if (ev->ends_group() || !rli->curr_group_seen_begin)
+        {
+          // CGAP cleanup
+          for (uint i= rli->curr_group_assigned_parts.elements; i > 0; i--)
+            delete_dynamic_element(&rli->
+                                   curr_group_assigned_parts, i - 1);
+          // reset the B-group marker
+          rli->curr_group_seen_begin= FALSE;
+          rli->last_assigned_worker= NULL;
+        }
+
+        if (rli->curr_group_da.elements > 0)
+        {
+          /*
+            the current event sorted out which partion the current group belongs to.
+            It's time now to processed deferred array events.
+          */
+          for (uint i= 0; i < rli->curr_group_da.elements; i++)
+          { 
+            Slave_job_item da_item;
+            get_dynamic(&rli->curr_group_da, (uchar*) &da_item.data, i);
+            append_item_to_jobs(&da_item, w, rli);
+          }
+          if (rli->curr_group_da.elements > rli->curr_group_da.max_element)
+          {
+            // reallocate to less mem
+            
+            DBUG_ASSERT(rli->curr_group_da.max_element < rli->curr_group_da.elements);
+          
+            rli->curr_group_da.elements= rli->curr_group_da.max_element;
+            rli->curr_group_da.max_element= 0;
+            freeze_size(&rli->curr_group_da); // restores max_element
+          }
+          rli->curr_group_da.elements= 0;
+        }
+
+        //job_item->data= ev;
+        /* Notice `ev' instance can be destoyed after `append()' */
+        append_item_to_jobs(job_item, w, rli);
+
+        if (need_sync)
+        {
+          /*
+            combination of over-max db:s and end of the current group
+            forces to wait for the group completion by the assigned worker.
+          */
+          (void) wait_for_workers_to_finish(rli, w);
+        }
+
+      }
+      *ptr_ev= NULL; // announcing the event is passed to w-worker
+    }
   }
   else
     mysql_mutex_unlock(&rli->data_lock);
@@ -2848,7 +2918,9 @@ int apply_event_and_update_pos(Log_event
       See sql/rpl_rli.h for further details.
     */
     int error= 0;
-    if (skip_event || !rli->is_parallel_exec() || !rli->curr_group_is_parallel)
+    if (*ptr_ev &&
+        (!(ev->get_type_code() == XID_EVENT && rli->is_transactional()) ||
+         skip_event))
     {
 #ifndef DBUG_OFF
       /*
@@ -2923,6 +2995,7 @@ int apply_event_and_update_pos(Log_event
   DBUG_RETURN(exec_res ? 1 : 0);
 }
 
+
 /**
   Top-level function for executing the next event in the relay log.
   This is called from the SQL thread.
@@ -2963,7 +3036,7 @@ static int exec_relay_log_event(THD* thd
    */
   mysql_mutex_lock(&rli->data_lock);
 
-  Log_event * ev = next_event(rli);
+  Log_event *ev = next_event(rli), **ptr_ev= &ev;
 
   DBUG_ASSERT(rli->info_thd==thd);
 
@@ -3037,19 +3110,13 @@ static int exec_relay_log_event(THD* thd
                       };);
     }
 
-    exec_res= apply_event_and_update_pos(ev, thd, rli);
+    /* ptr_ev can change to NULL indicating MTS coorinator passed to a Worker */
+    exec_res= apply_event_and_update_pos(ptr_ev, thd, rli);
 
-    if ((!rli->is_parallel_exec() || !rli->curr_group_is_parallel))
+    if (*ptr_ev)
     {
-      DBUG_ASSERT(!rli->is_parallel_exec() || !rli->curr_group_is_parallel ||
-                  ev->shall_skip(rli) != Log_event::EVENT_SKIP_NOT);
-      
-      if (rli->curr_group_split)
-      {
-        // the current group split status is reset
-        rli->curr_group_is_parallel= TRUE;
-        rli->curr_group_split= FALSE;
-      }
+      DBUG_ASSERT(*ptr_ev == ev); // event remains to belong to Coordinator
+
       /*
         Format_description_log_event should not be deleted because it will be
         used to read info about the relay log's format; it will be deleted when
@@ -3720,10 +3787,8 @@ pthread_handler_t handle_slave_worker(vo
   mysql_mutex_unlock(&LOCK_thread_count);
 
   mysql_mutex_lock(&w->jobs_lock);
-
-  DBUG_ASSERT(w->jobs.len == rli->mts_slave_worker_queue_len_max + 1);
-  w->jobs.len= 0;
-  mysql_cond_signal(&w->jobs_cond);  // ready for duty
+  w->running_status= TRUE;           // ready for duty
+  mysql_cond_signal(&w->jobs_cond);
 
   mysql_mutex_unlock(&w->jobs_lock);
 
@@ -3734,13 +3799,13 @@ pthread_handler_t handle_slave_worker(vo
       error= slave_worker_exec_job(w, rli);
   }
 
+  w->cleanup_context(thd, error);
   if (error)
   {
     mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
     rli->info_thd->awake(THD::KILL_QUERY);          // notify Crdn
     mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
     thd->clear_error();
-    w->cleanup_context(thd, error);
   }
 
   mysql_mutex_lock(&w->jobs_lock);
@@ -3765,14 +3830,15 @@ pthread_handler_t handle_slave_worker(vo
   mysql_mutex_unlock(&rli->pending_jobs_lock);
 
   mysql_mutex_lock(&w->jobs_lock);
-  w->jobs.len= rli->mts_slave_worker_queue_len_max + 1;
+
+  w->running_status= 0;
   sql_print_information("Worker %lu statistics: "
                         "events processed = %lu "
                         "hungry waits = %lu "
                         "priv queue overfills = %llu "
                         ,w->id, w->stmt_jobs, w->wait_jobs, w->jobs.waited_overfill);
-
   mysql_cond_signal(&w->jobs_cond);  // famous last goodbye
+
   mysql_mutex_unlock(&w->jobs_lock);
 
 err:
@@ -3798,9 +3864,6 @@ err:
 /**
    Orders jobs by comparing relay log information.
 */
-#if 0
-int mts_recovery_cmp(Slave_job_group *id1, Slave_job_group *id2)
-#endif
 
 int mts_event_coord_cmp(LOG_POS_COORD *id1, LOG_POS_COORD *id2)
 {
@@ -3954,126 +4017,6 @@ bool mts_recovery_groups(Relay_log_info 
 
   DBUG_ASSERT(rli->mts_recovery_group_cnt < groups->n_bits);
 
-#if 0
-  for (uint it_job= 0; it_job < above_lwm_jobs.elements; it_job++)
-  {
-    group_worker_counter= 0;
-    group_lwm_counter= 0;
-    get_dynamic(&above_lwm_jobs, (uchar *) &job_worker, it_job);
-
-    sql_print_information("Recoverying relay log info based on Worker-Id %lu, "
-                          "group_relay_log_name %s, group_relay_log_pos %lu "
-                          "group_master_log_name %s, group_master_log_pos %lu",
-                          job_worker.worker_id,
-                          job_worker.group_relay_log_name,
-                          (ulong) job_worker.group_relay_log_pos,
-                          job_worker.group_master_log_name,
-                          (ulong) job_worker.group_master_log_pos);
-
-    for (uint it_file= 0; it_file < above_lwm_jobs.elements; it_file++)
-    {
-      get_dynamic(&above_lwm_jobs, (uchar *) &job_file, it_file);
-
-      /*
-        Either the current relay log file was already processed by the
-        current worker or all groups were analyzed. So, the next file
-        is checked.
-      */
-      if  ((strcmp(job_worker.group_relay_log_name,
-                  job_file.group_relay_log_name) > 0) ||
-           (group_worker_counter > (rli->checkpoint_group - 1)))
-        continue;
-
-      if (desc)
-      {
-        delete desc;
-        desc= NULL;
-      }
-
-      if (log_name)
-      {
-        end_io_cache(&log);
-        mysql_file_close(file, MYF(MY_WME));
-        log_name= NULL;
-      }
-
-      if ((file= open_binlog(&log, job_file.group_relay_log_name, &errmsg)) < 0)
-      {
-        sql_print_error("%s", errmsg);
-        goto end;
-      }
-      log_name= job_file.group_relay_log_name;
-      my_stat(log_name, &s, MYF(0));
-
-      if (!((desc= Log_event::read_log_event(&log, 0, &fdle,
-                                             opt_master_verify_checksum)) &&
-           desc->get_type_code() == FORMAT_DESCRIPTION_EVENT))
-        goto end;
-    
-      my_b_seek(&log, (my_off_t) 0);
-      while ((ev= Log_event::read_log_event(&log, 0, &fdle,
-              opt_master_verify_checksum)))
-      {
-        DBUG_ASSERT(ev->is_valid());
-
-        /*
-          All groups were analyzed. So, the next worker needs to
-          be checked.
-        */
-        if (group_worker_counter > (rli->checkpoint_group - 1))
-          break;
-
-        filecmp= strcmp(job_file.group_relay_log_name,
-                        job_worker.group_relay_log_name);
-        poscmp= ev->log_pos -
-                job_worker.group_master_log_pos;
-
-        if (filecmp > 0 || (filecmp == 0 && poscmp > 0))
-        {
-          bool unhandled= !bitmap_is_set(&job_worker.worker->group_execed,
-                                         group_worker_counter);
-          if (ev->starts_group())
-            curr_group_seen_begin= TRUE;
-
-          if (ev->ends_group() || !curr_group_seen_begin)
-          {
-            filecmp= strcmp(job_file.group_relay_log_name,
-                            rli->get_group_master_log_name());
-            poscmp= ev->log_pos -
-                    rli->get_group_master_log_pos();
-            if (filecmp > 0 || (filecmp == 0 && poscmp > 0))
-            {
-              if (unhandled)
-                bitmap_is_set(groups, group_lwm_counter);
-              group_lwm_counter++;
-            }
-            curr_group_seen_begin= FALSE;
-            group_worker_counter++;
-          }
-        }
-
-        delete ev;
-        ev= NULL;
-      }
-    }
-  }
-
-end:
-  if (desc)
-  {
-    delete desc;
-    desc= NULL;
-  }
-
-
-  if (log_name)
-  {
-    end_io_cache(&log);
-    mysql_file_close(file, MYF(MY_WME));
-    log_name= NULL;
-  }
-#endif
-
   end_io_cache(&log);
   mysql_file_close(file, MYF(MY_WME));
   log_name= NULL;
@@ -4218,9 +4161,7 @@ int slave_start_single_worker(Relay_log_
     goto err;
   }
   
-  // TODO: remove after dynamic_ids will be sorted out (removed/refined) otherwise
-  // entry->usage assert
-  w->curr_group_exec_parts->dynamic_ids.elements= 0;
+  w->curr_group_exec_parts.elements= 0;
   w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
   w->checkpoint_notified= FALSE;
   w->workers= rli->workers; // shallow copying is sufficient
@@ -4231,18 +4172,17 @@ int slave_start_single_worker(Relay_log_
   w->usage_partition= 0;
   w->last_group_done_index= rli->gaq->s; // out of range
 
-  w->jobs.s= rli->mts_slave_worker_queue_len_max;
-  my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0); // todo: implement increment e.g  n * 10;
+  w->jobs.a= 0;
+  w->jobs.len= 0;
+  w->jobs.overfill= FALSE;    //  todo: move into Slave_jobs_queue constructor
+  w->jobs.waited_overfill= 0;
+  w->jobs.e= w->jobs.s= rli->mts_slave_worker_queue_len_max;
+  my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0);
   for (k= 0; k < w->jobs.s; k++)
     insert_dynamic(&w->jobs.Q, (uchar*) &empty);
   
   DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
   
-  w->jobs.e= w->jobs.s;
-  w->jobs.a= 0;
-  w->jobs.len= rli->mts_slave_worker_queue_len_max + 1; // to first handshake
-  w->jobs.overfill= FALSE;    //  todo: move into Slave_jobs_queue constructor
-  w->jobs.waited_overfill= 0;
   w->wq_overrun_set= FALSE;
   set_dynamic(&rli->workers, (uchar*) &w, i);
   mysql_mutex_init(key_mutex_slave_parallel_worker[i], &w->jobs_lock,
@@ -4259,7 +4199,7 @@ int slave_start_single_worker(Relay_log_
   }
   
   mysql_mutex_lock(&w->jobs_lock);
-  if (w->jobs.len != 0)
+  if (!w->running_status)
     mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
   mysql_mutex_unlock(&w->jobs_lock);
   // Least occupied inited with zero
@@ -4303,6 +4243,9 @@ int slave_start_workers(Relay_log_info *
                                       sizeof(Slave_job_group),
                                       1 + rli->opt_slave_parallel_workers *
                                       rli->mts_slave_worker_queue_len_max, n);
+  if (!rli->gaq->inited)
+    return 1;
+
   rli->mts_pending_jobs_size= 0;
   rli->mts_pending_jobs_size_max= ::opt_mts_pending_jobs_size_max;
   rli->mts_wqs_underrun_w_id= (ulong) -1;
@@ -4312,9 +4255,6 @@ int slave_start_workers(Relay_log_info *
   rli->mts_worker_underrun_level= ::opt_mts_worker_underrun_level;
   rli->mts_total_groups= 0;
   rli->curr_group_seen_begin= FALSE;
-  rli->curr_group_is_parallel= FALSE;
-  rli->curr_group_isolated= FALSE;
-  rli->curr_group_split= FALSE;
   rli->checkpoint_seqno= 0;
   /*
     dyn memory to consume by Coordinator per event
@@ -4345,7 +4285,11 @@ err:
 }
 
 /* 
-   Worker threads ends one-by-one with synch through rli->pending_jobs
+   Ending Worker threads.
+
+   Workers are notified with setting KILLED status
+   and waited for their acknowledgment as specified by
+   worker's running_status.
 */
 void slave_stop_workers(Relay_log_info *rli)
 {
@@ -4362,12 +4306,13 @@ void slave_stop_workers(Relay_log_info *
     
     mysql_mutex_lock(&w->jobs_lock);
     
-    if (w->jobs.len == rli->mts_slave_worker_queue_len_max + 1)
+    if (!w->running_status)
     {
       mysql_mutex_unlock(&w->jobs_lock);
       continue;
     }
     mysql_mutex_unlock(&w->jobs_lock);
+    sql_print_information("Notifying Worker %lu to exit", w->id);
     
     mysql_mutex_lock(&w->info_thd->LOCK_thd_data);
     w->info_thd->awake(THD::KILL_QUERY);
@@ -4382,7 +4327,7 @@ void slave_stop_workers(Relay_log_info *
     get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, i);
 
     mysql_mutex_lock(&w->jobs_lock);
-    while (w->jobs.len != rli->mts_slave_worker_queue_len_max + 1)
+    while (w->running_status)
     {
       const char *save_proc_info;
       save_proc_info= thd->enter_cond(&w->jobs_cond, &w->jobs_lock,
@@ -4466,10 +4411,12 @@ pthread_handler_t handle_slave_sql(void 
 
   pthread_detach_this_thread();
 
-  /* mts-II: starting the worker pool */
+  /* MTS: starting the worker pool */
   if (slave_start_workers(rli, rli->opt_slave_parallel_workers) != 0)
   {
       mysql_mutex_unlock(&rli->run_lock);
+      rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, 
+                  "Failed during slave workers initialization");
       goto err;
   }
   if (init_slave_thread(thd, SLAVE_THD_SQL))
@@ -4534,7 +4481,6 @@ pthread_handler_t handle_slave_sql(void 
     goto err;
   }
   THD_CHECK_SENTRY(thd);
-
 #ifndef DBUG_OFF
   {
     char llbuf1[22], llbuf2[22];
@@ -5834,7 +5780,6 @@ static Log_event* next_event(Relay_log_i
   mysql_mutex_t *log_lock = rli->relay_log.get_log_lock();
   const char* errmsg=0;
   THD* thd = rli->info_thd;
-
   DBUG_ENTER("next_event");
 
   DBUG_ASSERT(thd != 0);
@@ -5949,7 +5894,7 @@ static Log_event* next_event(Relay_log_i
       if (rli->is_parallel_exec() && (mts_checkpoint_period != 0 || force))
       {
         ulonglong period= static_cast<ulonglong>(mts_checkpoint_period * 1000000ULL);
-        mts_checkpoint_routine(rli, period, force, TRUE); // ALFRANIO ERROR
+        mts_checkpoint_routine(rli, period, force, TRUE); // TODO: ALFRANIO ERROR
       }
 
       if (hot_log)
@@ -6080,7 +6025,7 @@ static Log_event* next_event(Relay_log_i
 
           do
           {
-            mts_checkpoint_routine(rli, period, FALSE, FALSE); // ALFRANIO ERROR
+            mts_checkpoint_routine(rli, period, FALSE, FALSE); // TODO: ALFRANIO ERROR
             set_timespec_nsec(waittime, period);
             thd->enter_cond(log_cond, log_lock,
                             "Slave has read all relay log; "
@@ -6746,9 +6691,8 @@ int reset_slave(THD *thd, Master_info* m
 
   /* 
     Clear master's log coordinates 
-
-    Andrei needs to guarantee that this done in sequential mode.
   */
+  DBUG_ASSERT(!mi->rli || !mi->rli->slave_running); // none writes in rli table
   mi->init_master_log_pos();
 
   if (remove_info(mi))
@@ -7069,9 +7013,9 @@ bool change_master(THD* thd, Master_info
     info and relay log info are prepared to handle events from all
     masters. In such case, we need to execute the code below for each
     master and correctly set the key_info_idx. /Alfranio
-
-    Andrei needs to guarantee that this done in sequential mode.
   */
+
+  DBUG_ASSERT(!mi->rli->slave_running); // none writes in rli table
   ret= mi->rli->flush_info(TRUE);
   mysql_cond_broadcast(&mi->data_cond);
   mysql_mutex_unlock(&mi->rli->data_lock);

=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc	2011-05-24 14:29:35 +0000
+++ b/sql/sql_base.cc	2011-05-30 10:05:07 +0000
@@ -125,6 +125,19 @@ static void init_tdc_psi_keys(void)
 }
 #endif /* HAVE_PSI_INTERFACE */
 
+static void modify_slave_open_temp_tables(THD *thd, int inc)
+{
+  if (thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER)
+  {
+    my_atomic_rwlock_wrlock(&slave_open_temp_tables_lock);
+    my_atomic_add32(&slave_open_temp_tables, inc);
+    my_atomic_rwlock_wrlock(&slave_open_temp_tables_unlock);
+  }
+  else
+  {
+    slave_open_temp_tables += inc;
+  }
+}
 
 /**
    Total number of TABLE instances for tables in the table definition cache
@@ -2133,7 +2146,7 @@ void close_temporary_table(THD *thd, TAB
   {
     /* natural invariant of temporary_tables */
     DBUG_ASSERT(slave_open_temp_tables || !thd->temporary_tables);
-    slave_open_temp_tables--;
+    modify_slave_open_temp_tables(thd, -1);
   }
   close_temporary(table, free_share, delete_table);
   DBUG_VOID_RETURN;
@@ -5858,7 +5871,7 @@ TABLE *open_table_uncached(THD *thd, con
     thd->temporary_tables= tmp_table;
     thd->temporary_tables->prev= 0;
     if (thd->slave_thread)
-      slave_open_temp_tables++;
+      modify_slave_open_temp_tables(thd, 1);
   }
   tmp_table->pos_in_table_list= 0;
   DBUG_PRINT("tmptable", ("opened table: '%s'.'%s' 0x%lx", tmp_table->s->db.str,

=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc	2011-01-11 23:01:02 +0000
+++ b/sql/sql_parse.cc	2011-05-30 10:05:07 +0000
@@ -5164,7 +5164,7 @@ bool check_stack_overrun(THD *thd, long 
 			 uchar *buf __attribute__((unused)))
 {
   long stack_used;
-  DBUG_ASSERT(thd == current_thd);  // mts-II: be prepared to hit it
+  DBUG_ASSERT(thd == current_thd);
   if ((stack_used=used_stack(thd->thread_stack,(char*) &stack_used)) >=
       (long) (my_thread_stack_size - margin))
   {


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110530100507-4f5mbtdvz6yzct14.bundle
Thread
bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3280) WL#5569WL#5754Andrei Elkin31 May