From:Andrei Elkin Date:February 17 2011 7:56pm
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3270) WL#5754
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3270 Andrei Elkin	2011-02-17
      WL#5754 Query event parallel execution
      
      Bundling together four revisions to wrap up the DML stage of the parallel query project.
      Changeset comments of the former patches follow.
      
      This patch provides a skeleton for parallel applying of Queries containing a temporary table.
      A new specialized test proves stability and data consistency.
      
      ******
      wl#5754 Parallel Query replication
      
      This patch implements the core of the design, namely the DML queries.
      Queries can contain arbitrary features, including temp tables.
      
      Todo:
      
      1. Extend the framework to cover the remaining DDL statements.
      2. Fix the server startup option (--mts-exp-run-query-in-parallel).
      3. Refine only_sequential() to return TRUE when the list of updated databases
         is bigger than the maximum, *and* when the query comes from an old master.
      
      
      ******
      wl#5754 Query parallelization
      
      intermediate commit with some cleanup after removing --mts-exp-query-in-parall
      ******
      wl#5754 Query parallel replication.
      
      Fixed various issues, including the preliminary review comments and stored routines handling.
     @ mysql-test/suite/rpl/r/rpl_packet.result
        results updated.
     @ mysql-test/suite/rpl/r/rpl_parallel_multi_db.result
        new result file is added.
     @ mysql-test/suite/rpl/r/rpl_parallel_temp_query.result
        a new results file
        ******
        new result file is added.
     @ mysql-test/suite/rpl/t/rpl_packet.test
        making hashing-related fixes so that the test passes.
        todo: refine the logic of max_allowed_packet for master & slave.
     @ mysql-test/suite/rpl/t/rpl_parallel_multi_db.test
        multi-db DML query test is added.
        todo: add triggers, sf(), SP.
        ******
        adding stored routines testing.
     @ mysql-test/suite/rpl/t/rpl_parallel_temp_query.test
        query with temporary tables testing.
     @ sql/binlog.cc
        gathering the to-be-updated databases in the DML case. A list larger than
        MAX_DBS_IN_QUERY_MTS won't be shipped to the slave.
        ******
        correcting memory allocation to be done in thd's mem_root.
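The master-side bookkeeping described above can be sketched as an ordered, duplicate-free list with a cap, after which the slave falls back to sequential applying. This is an illustrative sketch only: the cap value, function name, and container choice are assumptions, not the server's actual code.

```cpp
#include <cstring>
#include <list>
#include <string>

// Assumed stand-in for the real MAX_DBS_IN_QUERY_MTS constant.
static const size_t MAX_DBS_SKETCH = 16;

// Record one write-locked table's database in an alphabetically ordered,
// unique list; once the list overflows the cap, stop tracking (the real
// patch then refrains from shipping the list to the slave).
void note_updated_db(std::list<std::string> &dbs, const char *db) {
  if (dbs.size() > MAX_DBS_SKETCH)
    return;  // already overflowed; further tracking is pointless
  std::list<std::string>::iterator it = dbs.begin();
  for (; it != dbs.end(); ++it) {
    int cmp = strcmp(db, it->c_str());
    if (cmp == 0)
      return;  // duplicate: keep the list unique
    if (cmp < 0)
      break;   // found the alphabetical insertion point
  }
  dbs.insert(it, std::string(db));
}
```

Keeping the list sorted at insertion time mirrors the strcmp-driven loop visible in the binlog.cc hunk at the end of this patch.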
     @ sql/field.cc
        relaxing an assert (todo: strengthen it with the more specific claim that field->table is temporary).
        ******
        adding comments to asserts.
     @ sql/handler.cc
        relaxing an assert (todo: strengthen it with the more specific claim that the table is temporary).
        ******
        adding comments to asserts.
     @ sql/log_event.cc
        master and slave (Coordinator's and Worker's) handling of updated databases.
        The Coordinator's distribution now involves a loop per database; similarly
        for the Worker when applying.
        ******
        adding comments, and correcting the clearing of binlog_updated_db_names so
        that BEGIN and COMMIT in particular do not pick up the updated list.
     @ sql/log_event.h
        Hardcoding the maximum number of updated databases.
        Static allocation for the updated databases in the Query log event is
        motivated by the fact that the event is shared by both Coordinator and
        Worker, so the standard malloc/free can't be a reasonable choice either.
        Added a new status and changed dependent info.
        Added a new method to return the *list* of updated databases, which in
        all but the Query case is just a wrapper over get_db().
        
        ******
        adding comments, and interfaces to helper functions.
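The static-allocation rationale above can be illustrated with a fixed-size array inside the event: no heap allocation ever crosses the Coordinator/Worker boundary. This is a hypothetical sketch; the struct name, the `add` helper, and the size constants are illustrative stand-ins for MAX_DBS_IN_QUERY_MTS and NAME_LEN, not the real declarations.

```cpp
#include <cstring>

// Sketch of a Query-event payload whose updated-db names live in a fixed
// array, so the event can be shared by Coordinator and Workers without
// malloc/free. Sizes are assumed, not the server's actual constants.
struct Query_event_dbs_sketch {
  static const int kMaxDbs = 16;      // stand-in for MAX_DBS_IN_QUERY_MTS
  static const int kNameLen = 64;     // stand-in for NAME_LEN
  int count;
  char names[kMaxDbs][kNameLen + 1];  // statically allocated slots

  // Returns false once the cap is reached, mirroring the rule that an
  // over-sized list is simply not shipped to the slave.
  bool add(const char *db) {
    if (count >= kMaxDbs)
      return false;
    strncpy(names[count], db, kNameLen);
    names[count][kNameLen] = '\0';
    ++count;
    return true;
  }
};
```
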
     @ sql/rpl_rli.cc
        a new temp table mutex init and destroy, plus a set of helper functions
        providing access to the Coordinator's and Workers' thd:s from arbitrary
        places in the server code, are added.
        ******
        fixing an error.
        ******
        relocating helper functions to rpl_slave.cc.
     @ sql/rpl_rli.h
        a new temp table mutex is added to RLI class.
        ******
        improving comments.
     @ sql/rpl_slave.cc
        SLAVE_THD_WORKER turned out to be redundant: the Worker's thd->system_thread
        is set to the same constant as the Coordinator thread's.
        ******
        Added a work-around/cleanup needed by the standard temp table closing algorithm.
        ******
        comments explaining that close_temp_tables() is not to be run by Workers.
        Accepting relocated functions.
     @ sql/sql_base.cc
        replacing references to thd->temporary_tables with appropriate ones
        corresponding to the Coordinator's thd's temporary tables. Also
        surrounding the critical sections of code that open, find, close, or
        change the temporary_tables list with a specific mutex lock/unlock.
        ******
        Correcting and simplifying the logic for the temp table parallel support.
        In particular, close_temporary_tables() does not need to know about the
        caller's thd.
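The locking pattern described for sql_base.cc can be sketched as one dedicated mutex bracketing every open/find/close on the shared temp-table list. This is a minimal, hypothetical sketch: the registry type and method names are invented for illustration, and std::mutex stands in for the server's mysys mutex around the Coordinator's thd->temporary_tables.

```cpp
#include <mutex>
#include <string>
#include <vector>

// Workers share the Coordinator's temporary-table list, so each access
// is serialized through one "specific mutex" (name here is an assumption).
struct TempTableRegistry {
  std::mutex lock;                  // the dedicated temp-table mutex
  std::vector<std::string> tables;  // stand-in for THD::temporary_tables

  void open(const std::string &name) {
    std::lock_guard<std::mutex> guard(lock);
    tables.push_back(name);
  }
  bool find(const std::string &name) {
    std::lock_guard<std::mutex> guard(lock);
    for (size_t i = 0; i < tables.size(); ++i)
      if (tables[i] == name)
        return true;
    return false;
  }
  void close_all() {
    std::lock_guard<std::mutex> guard(lock);
    tables.clear();
  }
};
```

Keeping the list ownership with the Coordinator, and only the lock shared, is what lets close_temporary_tables() stay ignorant of the caller's thd, as the comment above notes.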
     @ sql/sql_class.cc
        master-side gathering of updated databases: new member initializations.
        ******
        Correcting the logic of merging a child's updated databases into the
        parent's top-level list.
     @ sql/sql_class.h
        master-side gathering of the updated databases list, plus accessor members.
        ******
        adding a necessary cleanup method.
     @ sql/sys_vars.cc
        Added a system variable (todo/fixme: may turn out to be unnecessary though).

    added:
      mysql-test/suite/rpl/r/rpl_parallel_multi_db.result
      mysql-test/suite/rpl/r/rpl_parallel_temp_query.result
      mysql-test/suite/rpl/t/rpl_parallel_multi_db.test
      mysql-test/suite/rpl/t/rpl_parallel_temp_query.test
    modified:
      mysql-test/r/mysqld--help-notwin.result
      mysql-test/suite/rpl/r/rpl_packet.result
      mysql-test/suite/rpl/t/rpl_packet.test
      mysql-test/suite/sys_vars/r/all_vars.result
      sql/binlog.cc
      sql/field.cc
      sql/handler.cc
      sql/log_event.cc
      sql/log_event.h
      sql/mysqld.cc
      sql/mysqld.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_slave.cc
      sql/sql_base.cc
      sql/sql_class.cc
      sql/sql_class.h
      sql/sys_vars.cc
=== modified file 'mysql-test/r/mysqld--help-notwin.result'
--- a/mysql-test/r/mysqld--help-notwin.result	2011-01-11 23:01:02 +0000
+++ b/mysql-test/r/mysqld--help-notwin.result	2011-02-17 19:56:48 +0000
@@ -362,9 +362,6 @@ The following options may be given as th
  If enabled slave itself computes the event appying time
  value to implicitly affected timestamp columms. Otherwise
  (default) it installs prescribed by the master value
- --mts-exp-slave-run-query-in-parallel 
- The default not an actual database name is used as
- partition info for parallel execution of Query_log_event 
  --mts-partition-hash-soft-max=# 
  Number of records in the mts partition hash below which
  entries with zero usage are tolerated
@@ -910,7 +907,6 @@ mts-checkpoint-group 512
 mts-checkpoint-period 300
 mts-coordinator-basic-nap 5
 mts-exp-slave-local-timestamp FALSE
-mts-exp-slave-run-query-in-parallel FALSE
 mts-partition-hash-soft-max 16
 mts-pending-jobs-size-max 16777216
 mts-slave-parallel-workers 0

=== modified file 'mysql-test/suite/rpl/r/rpl_packet.result'
--- a/mysql-test/suite/rpl/r/rpl_packet.result	2010-12-19 17:22:30 +0000
+++ b/mysql-test/suite/rpl/r/rpl_packet.result	2011-02-17 19:56:48 +0000
@@ -23,12 +23,12 @@ select * from information_schema.session
 VARIABLE_NAME	VARIABLE_VALUE
 SLAVE_RUNNING	ON
 drop database DB_NAME_OF_MAX_LENGTH_AKA_NAME_LEN_64_BYTES_____________________;
-SET @@global.max_allowed_packet=4096;
-SET @@global.net_buffer_length=4096;
+SET @@global.max_allowed_packet=4096 + (floor(64 * 3 * 254 / 1024) + 1) * 1024;
+SET @@global.net_buffer_length=@@global.max_allowed_packet;
 include/stop_slave.inc
 include/start_slave.inc
 CREATE TABLE `t1` (`f1` LONGTEXT) ENGINE=MyISAM;
-INSERT INTO `t1`(`f1`) VALUES ('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2048');
+INSERT INTO t1 VALUES (REPEAT('a', @@global.max_allowed_packet));
 include/wait_for_slave_io_error.inc [errno=1153]
 Last_IO_Error = 'Got a packet bigger than 'max_allowed_packet' bytes'
 include/stop_slave_sql.inc
@@ -51,10 +51,10 @@ include/wait_for_slave_to_start.inc
 DROP TABLE t1;
 select @@global.max_allowed_packet;
 @@global.max_allowed_packet
-4096
+53248
 select @@global.net_buffer_length;
 @@global.net_buffer_length
-4096
+53248
 select @@global.max_allowed_packet;
 @@global.max_allowed_packet
 1024
@@ -62,7 +62,7 @@ select @@global.net_buffer_length;
 @@global.net_buffer_length
 1024
 CREATE TABLE t1 (a TEXT) ENGINE=MyISAM;
-INSERT INTO t1 VALUES (REPEAT('a', 2048));
+INSERT INTO t1 VALUES (REPEAT('a', @@global.max_allowed_packet));
 # 1153 = ER_NET_PACKET_TOO_LARGE
 include/wait_for_slave_io_error.inc [errno=1153]
 Last_IO_Error = 'Got a packet bigger than 'max_allowed_packet' bytes'

=== added file 'mysql-test/suite/rpl/r/rpl_parallel_multi_db.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_multi_db.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_multi_db.result	2011-02-17 19:56:48 +0000
@@ -0,0 +1,141 @@
+include/master-slave.inc
+[connection master]
+include/stop_slave.inc
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+set @@global.mts_slave_parallel_workers= 4;
+include/start_slave.inc
+Warnings:
+Note	1726	Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
+create database d4;
+create table d4.t8 (a int);
+select round(rand()*8) into @var;
+insert into d4.t8 values (@var);
+create table d4.t7 (a int);
+select round(rand()*8) into @var;
+insert into d4.t7 values (@var);
+create table d4.t6 (a int);
+select round(rand()*8) into @var;
+insert into d4.t6 values (@var);
+create table d4.t5 (a int);
+select round(rand()*8) into @var;
+insert into d4.t5 values (@var);
+create table d4.t4 (a int);
+select round(rand()*8) into @var;
+insert into d4.t4 values (@var);
+create table d4.t3 (a int);
+select round(rand()*8) into @var;
+insert into d4.t3 values (@var);
+create table d4.t2 (a int);
+select round(rand()*8) into @var;
+insert into d4.t2 values (@var);
+create table d4.t1 (a int);
+select round(rand()*8) into @var;
+insert into d4.t1 values (@var);
+create database d3;
+create table d3.t8 (a int);
+select round(rand()*8) into @var;
+insert into d3.t8 values (@var);
+create table d3.t7 (a int);
+select round(rand()*8) into @var;
+insert into d3.t7 values (@var);
+create table d3.t6 (a int);
+select round(rand()*8) into @var;
+insert into d3.t6 values (@var);
+create table d3.t5 (a int);
+select round(rand()*8) into @var;
+insert into d3.t5 values (@var);
+create table d3.t4 (a int);
+select round(rand()*8) into @var;
+insert into d3.t4 values (@var);
+create table d3.t3 (a int);
+select round(rand()*8) into @var;
+insert into d3.t3 values (@var);
+create table d3.t2 (a int);
+select round(rand()*8) into @var;
+insert into d3.t2 values (@var);
+create table d3.t1 (a int);
+select round(rand()*8) into @var;
+insert into d3.t1 values (@var);
+create database d2;
+create table d2.t8 (a int);
+select round(rand()*8) into @var;
+insert into d2.t8 values (@var);
+create table d2.t7 (a int);
+select round(rand()*8) into @var;
+insert into d2.t7 values (@var);
+create table d2.t6 (a int);
+select round(rand()*8) into @var;
+insert into d2.t6 values (@var);
+create table d2.t5 (a int);
+select round(rand()*8) into @var;
+insert into d2.t5 values (@var);
+create table d2.t4 (a int);
+select round(rand()*8) into @var;
+insert into d2.t4 values (@var);
+create table d2.t3 (a int);
+select round(rand()*8) into @var;
+insert into d2.t3 values (@var);
+create table d2.t2 (a int);
+select round(rand()*8) into @var;
+insert into d2.t2 values (@var);
+create table d2.t1 (a int);
+select round(rand()*8) into @var;
+insert into d2.t1 values (@var);
+create database d1;
+create table d1.t8 (a int);
+select round(rand()*8) into @var;
+insert into d1.t8 values (@var);
+create table d1.t7 (a int);
+select round(rand()*8) into @var;
+insert into d1.t7 values (@var);
+create table d1.t6 (a int);
+select round(rand()*8) into @var;
+insert into d1.t6 values (@var);
+create table d1.t5 (a int);
+select round(rand()*8) into @var;
+insert into d1.t5 values (@var);
+create table d1.t4 (a int);
+select round(rand()*8) into @var;
+insert into d1.t4 values (@var);
+create table d1.t3 (a int);
+select round(rand()*8) into @var;
+insert into d1.t3 values (@var);
+create table d1.t2 (a int);
+select round(rand()*8) into @var;
+insert into d1.t2 values (@var);
+create table d1.t1 (a int);
+select round(rand()*8) into @var;
+insert into d1.t1 values (@var);
+include/diff_tables.inc [master:d4.t8, slave:d4.t8]
+include/diff_tables.inc [master:d4.t7, slave:d4.t7]
+include/diff_tables.inc [master:d4.t6, slave:d4.t6]
+include/diff_tables.inc [master:d4.t5, slave:d4.t5]
+include/diff_tables.inc [master:d4.t4, slave:d4.t4]
+include/diff_tables.inc [master:d4.t3, slave:d4.t3]
+include/diff_tables.inc [master:d4.t2, slave:d4.t2]
+include/diff_tables.inc [master:d4.t1, slave:d4.t1]
+include/diff_tables.inc [master:d3.t8, slave:d3.t8]
+include/diff_tables.inc [master:d3.t7, slave:d3.t7]
+include/diff_tables.inc [master:d3.t6, slave:d3.t6]
+include/diff_tables.inc [master:d3.t5, slave:d3.t5]
+include/diff_tables.inc [master:d3.t4, slave:d3.t4]
+include/diff_tables.inc [master:d3.t3, slave:d3.t3]
+include/diff_tables.inc [master:d3.t2, slave:d3.t2]
+include/diff_tables.inc [master:d3.t1, slave:d3.t1]
+include/diff_tables.inc [master:d2.t8, slave:d2.t8]
+include/diff_tables.inc [master:d2.t7, slave:d2.t7]
+include/diff_tables.inc [master:d2.t6, slave:d2.t6]
+include/diff_tables.inc [master:d2.t5, slave:d2.t5]
+include/diff_tables.inc [master:d2.t4, slave:d2.t4]
+include/diff_tables.inc [master:d2.t3, slave:d2.t3]
+include/diff_tables.inc [master:d2.t2, slave:d2.t2]
+include/diff_tables.inc [master:d2.t1, slave:d2.t1]
+include/diff_tables.inc [master:d1.t8, slave:d1.t8]
+include/diff_tables.inc [master:d1.t7, slave:d1.t7]
+include/diff_tables.inc [master:d1.t6, slave:d1.t6]
+include/diff_tables.inc [master:d1.t5, slave:d1.t5]
+include/diff_tables.inc [master:d1.t4, slave:d1.t4]
+include/diff_tables.inc [master:d1.t3, slave:d1.t3]
+include/diff_tables.inc [master:d1.t2, slave:d1.t2]
+include/diff_tables.inc [master:d1.t1, slave:d1.t1]
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;

=== added file 'mysql-test/suite/rpl/r/rpl_parallel_temp_query.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_temp_query.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_temp_query.result	2011-02-17 19:56:48 +0000
@@ -0,0 +1,51 @@
+include/master-slave.inc
+[connection master]
+call mtr.add_suppression('Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement accesses nontransactional table as well as transactional or temporary table.*');
+include/stop_slave.inc
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+set @@global.mts_slave_parallel_workers= 4;
+include/start_slave.inc
+Warnings:
+Note	1726	Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
+create database d2;
+use d2;
+create table d2.t1 (a int auto_increment primary key, b int) engine=innodb;
+insert into d2.t1 (b) select count(*) from tt_##;
+create database d1;
+use d1;
+create table d1.t1 (a int auto_increment primary key, b int) engine=innodb;
+insert into d1.t1 (b) select count(*) from tt_##;
+create database d4;
+use d4;
+create table d4.t1 (a int auto_increment primary key, b int) engine=innodb;
+insert into d4.t1 (b) select count(*) from tt_##;
+create database d3;
+use d3;
+create table d3.t1 (a int auto_increment primary key, b int) engine=innodb;
+insert into d3.t1 (b) select count(*) from tt_##;
+include/diff_tables.inc [master:d4.t1, slave:d4.t1]
+include/diff_tables.inc [master:d3.t1, slave:d3.t1]
+include/diff_tables.inc [master:d2.t1, slave:d2.t1]
+include/diff_tables.inc [master:d1.t1, slave:d1.t1]
+drop temporary table tt_8;
+drop temporary table tt_7;
+drop temporary table tt_6;
+drop temporary table tt_5;
+drop temporary table tt_4;
+drop temporary table tt_3;
+drop temporary table tt_2;
+drop temporary table tt_1;
+drop database d2;
+drop database d1;
+drop temporary table tt_8;
+drop temporary table tt_7;
+drop temporary table tt_6;
+drop temporary table tt_5;
+drop temporary table tt_4;
+drop temporary table tt_3;
+drop temporary table tt_2;
+drop temporary table tt_1;
+drop database d4;
+drop database d3;
+include/stop_slave.inc
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;

=== modified file 'mysql-test/suite/rpl/t/rpl_packet.test'
--- a/mysql-test/suite/rpl/t/rpl_packet.test	2010-12-19 17:22:30 +0000
+++ b/mysql-test/suite/rpl/t/rpl_packet.test	2011-02-17 19:56:48 +0000
@@ -63,8 +63,20 @@ connection master;
 
 # Change the max packet size on master
 
-SET @@global.max_allowed_packet=4096;
-SET @@global.net_buffer_length=4096;
+# Todo: improve the blocking of over-max_allowed_packet-sized events on
+# the slave. The current size-checking algorithm is not precise, allowing
+# a large event to slip through. Rejection follows the guard:
+#   if (data_len > max(max_allowed_packet,
+#       opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER))
+# However, MAX_LOG_EVENT_HEADER is a conservative estimate, so if the
+# actual header size is smaller, the extra data is let into the slave.
+
+# Adding the maximum size of the Query log event status,
+# MAX_DBS_IN_QUERY_MTS * (1 + NAME_LEN), so that the master itself does
+# not fail to read the event.
+
+SET @@global.max_allowed_packet=4096 + (floor(64 * 3 * 254 / 1024) + 1) * 1024;
+SET @@global.net_buffer_length=@@global.max_allowed_packet;
 
 # Restart slave for new setting to take effect
 connection slave;
@@ -82,7 +94,7 @@ sync_slave_with_master;
 
 connection master;
 
-INSERT INTO `t1`(`f1`) VALUES ('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2048');
+INSERT INTO t1 VALUES (REPEAT('a', @@global.max_allowed_packet));
 
 
 #
@@ -169,6 +181,7 @@ if (`SELECT NOT(@max_allowed_packet_0 = 
 #
 
 connection slave;
+
 START SLAVE;
 --source include/wait_for_slave_to_start.inc
 
@@ -186,7 +199,7 @@ CREATE TABLE t1 (a TEXT) ENGINE=MyISAM;
 
 # Create big row event.
 --connection master
-INSERT INTO t1 VALUES (REPEAT('a', 2048));
+INSERT INTO t1 VALUES (REPEAT('a', @@global.max_allowed_packet));
 
 # Slave IO thread should stop with error when trying to read the big event.
 --connection slave

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_multi_db.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_multi_db.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_multi_db.test	2011-02-17 19:56:48 +0000
@@ -0,0 +1,228 @@
+#
+# WL#5569 MTS
+#
+# The test verifies correctness of Query event parallelization when
+# a DML Query modifies multiple databases.
+#
+
+--source include/master-slave.inc
+--source include/have_binlog_format_statement.inc
+
+--disable_query_log
+call mtr.add_suppression('.*Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT.*');
+--enable_query_log
+
+# restart in Parallel
+
+let $workers= 4;
+
+connection slave;
+
+source include/stop_slave.inc;
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+eval set @@global.mts_slave_parallel_workers= $workers;
+
+source include/start_slave.inc;
+
+let $dbs= 4;
+let $tables= 8;
+let $queries= `select $dbs*$tables * 8`;
+
+#
+# 1. Case of multi-update
+#
+
+connection master;
+
+# create & populate
+
+let $n= $dbs;
+while ($n)
+{
+  eval create database d$n;
+  let $m= $tables;
+  while ($m)
+  {
+     eval create table d$n.t$m (a int);
+     eval select round(rand()*$tables) into @var;
+     eval insert into d$n.t$m values (@var);
+     dec $m;
+  }
+  dec $n;
+}
+
+
+# operate to check consistency in the end
+
+let $k= $queries;
+
+--disable_query_log
+--disable_warnings
+while ($k)
+{
+   let $n1= `select floor(rand()*$dbs + 1)`;
+   let $m1= `select floor(rand()*$tables + 1)`;
+   let $n2= `select floor(rand()*$dbs + 1)`;
+   let $m2= `select floor(rand()*$tables + 1)`;
+   let $n3= `select floor(rand()*$dbs + 1)`;
+   let $m3= `select floor(rand()*$tables + 1)`;
+   let $n4= `select floor(rand()*$dbs + 1)`;
+   let $m4= `select floor(rand()*$tables + 1)`;
+   eval update d$n1.t$m1 as t_1, d$n2.t$m2 as t_2, d$n3.t$m3, d$n4.t$m4 as t_3 set t_1.a=t_2.a+ round(rand(10)), t_2.a=t_3.a+ round(rand(10)), t_3.a=t_1.a+ round(rand(10)), t_3.a=t_1.a+ round(rand(10));
+   dec $k;
+}
+--enable_warnings
+--enable_query_log
+
+sync_slave_with_master; 
+
+#
+# 2. Case of invoked routines
+#
+
+# A. Functions
+
+# create functions & run load
+let $n= $dbs;
+# n'th db func is defined through n-1'th except n == 1
+let $n1= $n;
+dec $n1;
+
+connection master;
+
+--disable_query_log
+
+while ($n1)
+{
+  let $m= `select floor(rand()*$tables) + 1`;
+
+  delimiter |;
+
+  eval create function `d$n`.`func` (a int) returns int
+     begin
+     insert into `d$n`.`t$m` values (`d$n1`.`func`(1));
+     return 1;
+     end|
+
+  delimiter ;|
+
+  dec $n;
+  dec $n1;
+}
+
+delimiter |;
+
+eval create function `d1`.`func` (a int) returns int
+     begin
+     insert into `d1`.`t$m` values (0);
+     return 1;
+     end|
+
+delimiter ;|
+
+
+# invoke...
+
+let $k= $queries;
+
+while ($k)
+{
+   let $n= `select floor(rand()*$dbs) + 1`;
+   let $m= `select floor(rand()*$tables) + 1`;
+   let $n1= $n;
+   dec $n1;
+   if ($n1)
+   {
+       eval insert into d$n.t$m values (`d$n1`.`func`(1));
+   }
+   dec $k;
+}
+
+--enable_query_log
+
+sync_slave_with_master;
+
+# B. Triggers
+
+connection master;
+
+# create triggers & run load
+let $n= $dbs;
+# n'th db tables won't have any trigger to avoid circular dependency
+let $n1= $n;
+dec $n1;
+
+--disable_query_log
+while ($n1)
+{
+  let $m= $tables;
+
+  while ($m)
+  {
+     eval create trigger `d$n1`.`trg_t$m` before insert on `d$n1`.`t$m` for each row insert into `d$n`.`t$m` values(1);
+     dec $m;
+  }
+  dec $n;
+  dec $n1;
+}
+--enable_query_log
+
+# invoke...
+
+let $k= $queries;
+
+--disable_query_log
+--disable_warnings
+while ($k)
+{
+   let $n= `select floor(rand()*$dbs + 1)`;
+   let $m= `select floor(rand()*$tables + 1)`;
+   eval insert into d$n.t$n values (2);
+   dec $k;
+}
+--enable_warnings
+--enable_query_log
+
+
+sync_slave_with_master;
+
+#
+# Consistency check
+#
+
+let $n = $dbs;
+while($n)
+{
+  let $m= $tables;
+  while ($m)
+  {
+    let $diff_tables=master:d$n.t$m, slave:d$n.t$m;
+    source include/diff_tables.inc;
+    dec $m;
+  }
+  dec $n;
+}
+
+
+#
+# Clean-up
+#
+
+connection master;
+
+--disable_query_log
+
+let $n= $dbs;
+while ($n)
+{
+  eval drop database d$n;
+  dec $n;
+}
+
+--enable_query_log
+
+sync_slave_with_master;
+
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;
+
+### TODO: --source include/rpl_end.inc

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_temp_query.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_temp_query.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_temp_query.test	2011-02-17 19:56:48 +0000
@@ -0,0 +1,169 @@
+#
+# WL#5569 MTS
+#
+# The test verifies correctness of Query events parallelization.
+#
+
+--source include/master-slave.inc
+--source include/have_binlog_format_statement.inc
+
+call mtr.add_suppression('Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement accesses nontransactional table as well as transactional or temporary table.*');
+
+let $temp_tables= 16;
+let $workers= 4;
+
+connection slave;
+
+# restart in Parallel
+source include/stop_slave.inc;
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+eval set @@global.mts_slave_parallel_workers= $workers;
+
+source include/start_slave.inc;
+
+# Two connections each create 2 databases (each with a regular table) and a set of temp tables.
+# The temp tables contribute content to the regular tables.
+# In the end there must be consistent data on both sides.
+
+connection master;
+
+let $n= `select round($workers/2)`;
+let $n1= `select $n`;
+while ($n)
+{
+  eval create database d$n1;
+  eval use d$n1;
+  eval create table d$n1.t1 (a int auto_increment primary key, b int) engine=innodb;
+  let $i= $temp_tables;
+
+--disable_query_log
+  while($i)
+  {
+    let $temp_rows= `select round(rand()*$temp_tables) + 1`;
+    let $k= $temp_rows;
+    eval create temporary table tt_$i (a int auto_increment primary key);
+    while($k)
+    {
+	eval insert into tt_$i values (null);
+	dec $k;
+    }
+    dec $i;
+  }
+--enable_query_log
+
+  let $j= `select floor(rand()*$temp_tables) + 1`;
+--replace_regex /tt_.*/tt_##/
+  eval insert into d$n1.t1 (b) select count(*) from tt_$j;
+  dec $n;
+  dec $n1;
+}
+
+connection master1;
+
+let $n= `select round($workers/2)`;
+let $n1= `select 2*$n`;
+while ($n)
+{
+  eval create database d$n1;
+  eval use d$n1;
+  eval create table d$n1.t1 (a int auto_increment primary key, b int) engine=innodb;
+  let $i= $temp_tables;
+
+--disable_query_log
+  while($i)
+  {
+    let $temp_rows= `select round(rand()*$temp_tables) + 1`;
+    let $k= $temp_rows;
+    eval create temporary table tt_$i (a int auto_increment primary key);
+    while($k)
+    {
+	eval insert into tt_$i values (null);
+	dec $k;
+    }
+    dec $i;
+  }
+--enable_query_log
+
+  let $j= `select floor(rand()*$temp_tables) + 1`;
+--replace_regex /tt_.*/tt_##/
+  eval insert into d$n1.t1 (b) select count(*) from tt_$j;
+  dec $n;
+  dec $n1;
+}
+
+sync_slave_with_master;
+
+#
+# Consistency check
+#
+
+let $n = $workers;
+while($n)
+{
+  let $diff_tables=master:d$n.t1, slave:d$n.t1;
+  source include/diff_tables.inc;
+
+  dec $n;
+}
+
+#
+# cleanup
+#
+# Temp tables are removed in two ways: explicitly, and implicitly by disconnecting.
+#
+
+connection master;
+
+let $i= `select round($temp_tables/2)`;
+while($i)
+{
+  eval drop temporary table tt_$i;
+  dec $i;
+}
+
+let $n= `select round($workers/2)`;
+let $n1= `select $n`;
+while ($n)
+{
+  eval drop database d$n1;
+  dec $n;
+  dec $n1;
+}
+
+
+connection master1;
+
+let $i= `select round($temp_tables/2)`;
+while($i)
+{
+  eval drop temporary table tt_$i;
+  dec $i;
+}
+
+sync_slave_with_master;
+#connection slave;
+
+connection master1;
+disconnect master1;
+
+#
+# Clean-up
+#
+
+connection master;
+
+let $n= `select round($workers/2)`;
+let $n1= `select 2*$n`;
+while ($n)
+{
+  eval drop database d$n1;
+  dec $n;
+  dec $n1;
+}
+
+sync_slave_with_master;
+source include/stop_slave.inc;
+
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;
+
+### TODO: --source include/rpl_end.inc

=== modified file 'mysql-test/suite/sys_vars/r/all_vars.result'
--- a/mysql-test/suite/sys_vars/r/all_vars.result	2010-12-27 18:54:41 +0000
+++ b/mysql-test/suite/sys_vars/r/all_vars.result	2011-02-17 19:56:48 +0000
@@ -14,9 +14,7 @@ left join t1 on variable_name=test_name 
 There should be *no* variables listed below:
 INNODB_STATS_TRANSIENT_SAMPLE_PAGES
 MTS_PARTITION_HASH_SOFT_MAX
-MTS_PENDING_JOBS_SIZE_MAX
 MTS_EXP_SLAVE_LOCAL_TIMESTAMP
-MTS_EXP_SLAVE_RUN_QUERY_IN_PARALLEL
 INNODB_STATS_PERSISTENT_SAMPLE_PAGES
 RELAY_LOG_BASENAME
 LOG_BIN_BASENAME
@@ -25,18 +23,17 @@ INNODB_PRINT_ALL_DEADLOCKS
 INNODB_RESET_MONITOR_COUNTER
 MTS_SLAVE_PARALLEL_WORKERS
 MTS_WORKER_UNDERRUN_LEVEL
-MTS_SLAVE_WORKER_QUEUE_LEN_MAX
 INNODB_RESET_ALL_MONITOR_COUNTER
 LOG_BIN_INDEX
 INNODB_DISABLE_MONITOR_COUNTER
 INNODB_ENABLE_MONITOR_COUNTER
+MTS_SLAVE_WORKER_QUEUE_LEN_MAX
 INNODB_FILE_FORMAT_MAX
+MTS_PENDING_JOBS_SIZE_MAX
 MTS_COORDINATOR_BASIC_NAP
 INNODB_STATS_TRANSIENT_SAMPLE_PAGES
 MTS_PARTITION_HASH_SOFT_MAX
-MTS_PENDING_JOBS_SIZE_MAX
 MTS_EXP_SLAVE_LOCAL_TIMESTAMP
-MTS_EXP_SLAVE_RUN_QUERY_IN_PARALLEL
 INNODB_STATS_PERSISTENT_SAMPLE_PAGES
 RELAY_LOG_BASENAME
 LOG_BIN_BASENAME
@@ -45,12 +42,13 @@ INNODB_PRINT_ALL_DEADLOCKS
 INNODB_RESET_MONITOR_COUNTER
 MTS_SLAVE_PARALLEL_WORKERS
 MTS_WORKER_UNDERRUN_LEVEL
-MTS_SLAVE_WORKER_QUEUE_LEN_MAX
 INNODB_RESET_ALL_MONITOR_COUNTER
 LOG_BIN_INDEX
 INNODB_DISABLE_MONITOR_COUNTER
 INNODB_ENABLE_MONITOR_COUNTER
+MTS_SLAVE_WORKER_QUEUE_LEN_MAX
 INNODB_FILE_FORMAT_MAX
+MTS_PENDING_JOBS_SIZE_MAX
 MTS_COORDINATOR_BASIC_NAP
 drop table t1;
 drop table t2;

=== modified file 'sql/binlog.cc'
--- a/sql/binlog.cc	2010-12-27 18:54:41 +0000
+++ b/sql/binlog.cc	2011-02-17 19:56:48 +0000
@@ -4733,7 +4733,57 @@ int THD::decide_logging_format(TABLE_LIS
         is_write= TRUE;
 
         prev_write_table= table->table;
+
+        if (variables.binlog_format != BINLOG_FORMAT_ROW &&
+            lex->sql_command != SQLCOM_END /* rows-event applying by slave */)
+        {
+          /*
+            Master side of the STMT-format event parallelization.
+            The db names of write-locked tables are stored in an
+            alphabetically ordered name list.
+            The list can remain empty if the only database that
+            is updated is the default one.
+            If the number of databases exceeds MAX_DBS_IN_QUERY_MTS,
+            the list is not sent to the slave either.
+          */
+
+          if (!binlog_updated_db_names)
+          {
+            binlog_updated_db_names= new List<char>; /* thd->mem_root is used */
+          }
+          if (binlog_updated_db_names->elements <  MAX_DBS_IN_QUERY_MTS + 1)
+          {
+            char *after_db= strdup_root(mem_root, table->db);
+            if (binlog_updated_db_names->elements != 0)
+            {
+              List_iterator<char> it(*get_binlog_updated_db_names());
+              
+              char *swap= NULL;  /* must survive iterations for the ripple insert */
+              while (it++)
+              {
+                char **ref_cur_db= it.ref();
+                int cmp= strcmp(after_db, *ref_cur_db);
+                
+                DBUG_ASSERT(!swap || cmp < 0);
+                
+                if (cmp == 0)
+                {
+                  after_db= NULL;  /* dup to ignore */
+                  break;
+                }
+                else if (swap || cmp > 0)
+                {
+                  swap= *ref_cur_db;
+                  *ref_cur_db= after_db;
+                  after_db= swap;
+                }
+              }
+            }
+            if (after_db)
+              binlog_updated_db_names->push_back(after_db);
+          }
+        }
       }
+
       flags_access_some_set |= flags;
 
       if (lex->sql_command != SQLCOM_CREATE_TABLE ||

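(Editor's note on the decide_logging_format() hunk above: the sorted-insert logic can be illustrated in isolation. This is a minimal sketch using std::list and std::string as stand-ins for the server's List<char> and mem_root strings; the name note_updated_db is illustrative, not the server's.)

```cpp
#include <cassert>
#include <list>
#include <string>

// Stand-in for the patch's binlog_updated_db_names maintenance: keep an
// alphabetically ordered, duplicate-free list of updated db names, and
// stop maintaining it once the cap has been exceeded (the overflowed
// list is later encoded as the MAX_DBS_IN_QUERY_MTS + 1 sentinel).
static const size_t kMaxDbs = 16;   // mirrors MAX_DBS_IN_QUERY_MTS

void note_updated_db(std::list<std::string> &dbs, const std::string &db)
{
  if (dbs.size() >= kMaxDbs + 1)
    return;                         // over the cap: list no longer maintained
  for (std::list<std::string>::iterator it = dbs.begin();
       it != dbs.end(); ++it)
  {
    int cmp = db.compare(*it);
    if (cmp == 0)
      return;                       // duplicate: ignore
    if (cmp < 0)
    {
      dbs.insert(it, db);           // keep alphabetical order
      return;
    }
  }
  dbs.push_back(db);                // largest name so far goes to the tail
}
```

Unlike the patch's in-place ripple swap, the sketch inserts directly at the first greater element; the observable result, a sorted, deduplicated list, is the same.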
=== modified file 'sql/field.cc'
--- a/sql/field.cc	2010-12-29 00:38:59 +0000
+++ b/sql/field.cc	2011-02-17 19:56:48 +0000
@@ -3736,7 +3736,13 @@ longlong Field_long::val_int(void)
   ASSERT_COLUMN_MARKED_FOR_READ;
   int32 j;
   /* See the comment in Field_long::store(long long) */
-  DBUG_ASSERT(table->in_use == current_thd);
+  /*
+     The (current_thd)->slave_thread alternative is needed because the
+     MTS Coordinator opens/closes a temp table while the rest of the
+     operation is done by Workers.
+  */
+  DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
+
 #ifdef WORDS_BIGENDIAN
   if (table->s->db_low_byte_first)
     j=sint4korr(ptr);
@@ -6308,8 +6314,8 @@ int Field_string::store(const char *from
   const char *cannot_convert_error_pos;
   const char *from_end_pos;
 
-  /* See the comment for Field_long::store(long long) */
-  DBUG_ASSERT(table->in_use == current_thd);
+  /* See the comment for Field_long::store(long long) and Field_long::val_int */
+  DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
 
   copy_length= well_formed_copy_nchars(field_charset,
                                        (char*) ptr, field_length,
@@ -6458,8 +6464,8 @@ String *Field_string::val_str(String *va
 			      String *val_ptr)
 {
   ASSERT_COLUMN_MARKED_FOR_READ;
-  /* See the comment for Field_long::store(long long) */
-  DBUG_ASSERT(table->in_use == current_thd);
+  /* See the comment for Field_long::store(long long) and Field_long::val_int */
+  DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
   uint length;
   if (table->in_use->variables.sql_mode &
       MODE_PAD_CHAR_TO_FULL_LENGTH)

=== modified file 'sql/handler.cc'
--- a/sql/handler.cc	2011-01-11 11:45:02 +0000
+++ b/sql/handler.cc	2011-02-17 19:56:48 +0000
@@ -2127,7 +2127,13 @@ void **handler::ha_data(THD *thd) const
 
 THD *handler::ha_thd(void) const
 {
-  DBUG_ASSERT(!table || !table->in_use || table->in_use == current_thd);
+  /*
+     The current_thd->slave_thread alternative is needed because the
+     MTS Coordinator opens/closes a temp table while the rest of the
+     operation is done by Workers.
+  */
+  DBUG_ASSERT(!table || !table->in_use || table->in_use == current_thd ||
+              current_thd->slave_thread);
   return (table && table->in_use) ? table->in_use : current_thd;
 }
 

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-01-11 23:01:02 +0000
+++ b/sql/log_event.cc	2011-02-17 19:56:48 +0000
@@ -2408,7 +2408,7 @@ bool Log_event::contains_partition_info(
    r - a mini-group internal "regular" event that follows its g-parent
       (Write, Update, Delete -rows)
    S - sequentially applied event (may not be a part of any group).
-       Events of this type are determined via @c only_sequential_exec()
+       Events of this type are determined via @c mts_sequential_exec()
        earlier and do not cause this method to be called.
    T - terminator of the group (XID, COMMIT, ROLLBACK)
 
@@ -2489,17 +2489,29 @@ Slave_worker *Log_event::get_slave_worke
 
   if (contains_partition_info())
   {
-    // a lot of things inside `get_slave_worker_id'
-    const_cast<Relay_log_info *>(rli)->last_assigned_worker=
-      worker= get_slave_worker(get_db(), const_cast<Relay_log_info *>(rli));
-    get_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
-    if (g.worker_id == (ulong) -1)  // assign "offically" the current group
+    List_iterator<char> it(*mts_get_dbs());
+
+    it++;
+    do
     {
-      g.worker_id= worker->id;       // todo/fixme: think of Slave_worker* here
-      set_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
+      char **ref_cur_db= it.ref();
+      // a lot of things inside `get_slave_worker_id'
+      const_cast<Relay_log_info *>(rli)->last_assigned_worker=
+        worker= get_slave_worker(*ref_cur_db, const_cast<Relay_log_info *>(rli));
+      get_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
+      if (g.worker_id == (ulong) -1)  // assign "offically" the current group
+      {
+        g.worker_id= worker->id;       // todo/fixme: think of Slave_worker* here
+        set_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
+        
+        DBUG_ASSERT(g.group_relay_log_name == NULL);
+      }
+    } while (it++ && mts_number_dbs() != MAX_DBS_IN_QUERY_MTS + 1);
 
-      DBUG_ASSERT(g.group_relay_log_name == NULL);
-    }
+    // Release the updated dbs from the Coordinator's mem-root. It is safe to
+    // do at this point because the root is no longer needed for the remaining
+    // part of the Coordinator's execution flow.
+    free_root(rli->info_thd->mem_root, MYF(MY_KEEP_PREALLOC));
   }
   else // r
     if (rli->last_assigned_worker)
@@ -2529,6 +2541,8 @@ Slave_worker *Log_event::get_slave_worke
       (Slave_job_group *)
       dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
 
+    if (!rli->curr_group_is_parallel)
+      sleep(1000);
     DBUG_ASSERT(rli->curr_group_is_parallel);
 
     // TODO: throw an error when relay-log reading starts from inside of a group!!
@@ -2782,53 +2796,50 @@ int Log_event::apply_event(Relay_log_inf
   Slave_worker *w= NULL;
   Slave_job_item item= {NULL}, *job_item= &item;
   Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli);  // constless alias
-  bool parallel;
-  bool seq_event;
-  bool term_event;
+  bool parallel, seq_event, term_event;
+
+  if (rli->is_mts_recovery())
+  {
+    bool skip= _bitmap_is_set(&c_rli->recovery_groups, c_rli->mts_recovery_index);
+
+    if (ends_group()) // TODO: || ! seen_begin
+    {
+      c_rli->mts_recovery_index++;
+      if (--c_rli->mts_recovery_group_cnt == 0)
+      {
+        c_rli->recovery_parallel_workers= c_rli->slave_parallel_workers;
+        c_rli->mts_recovery_index= 0;
+      }
+    }
+    if (skip)
+      DBUG_RETURN(0);
+    else 
+      DBUG_RETURN(do_apply_event(rli));
+  }
 
   if (!(parallel= rli->is_parallel_exec()) ||
-      rli->mts_recovery_group_cnt != 0 ||
-      ((seq_event=
-        only_sequential_exec(rli->run_query_in_parallel,
-                             rli->curr_group_seen_begin /* todo: obs 2nd arg */))
-       // rli->curr_group_seen_begin && ends_group() => rli->last_assigned_worker
-       && (!rli->curr_group_seen_begin || parallel_exec_by_coordinator(::server_id))))
+      ((seq_event= mts_sequential_exec()) &&
+       (!rli->curr_group_seen_begin ||
+        mts_async_exec_by_coordinator(::server_id))))
   {
     if (parallel)
     {
-      // This `only-sequential' case relates to a DDL Query case
-      // or a group split apart by FD event
-      DBUG_ASSERT(seq_event &&
-                  (rli->curr_group_da.elements == 0 || rli->curr_group_seen_begin));
-      
-      if (!parallel_exec_by_coordinator(::server_id))
+      if (!mts_async_exec_by_coordinator(::server_id))
       {
+        // the current event is not a group split and therefore requires Workers to sync
         DBUG_ASSERT(!rli->curr_group_seen_begin);
 
-        c_rli->curr_group_is_parallel= FALSE;   // Coord will destruct events
+        c_rli->curr_group_is_parallel= FALSE;
         (void) wait_for_workers_to_finish(rli);
       }
       else
       {
-        c_rli->curr_event_is_not_in_group= TRUE;
-      }
-    }
-    else if (rli->is_mts_recovery())
-    {
-      // recovery
-      bool skip= _bitmap_is_set(&c_rli->recovery_groups, c_rli->mts_recovery_index);
-
-      if (ends_group()) // todo: || rli->run_query_in_parallel && ! seen_begin
-      {
-        c_rli->mts_recovery_index++;
-        if (--c_rli->mts_recovery_group_cnt == 0)
+        if (rli->curr_group_is_parallel)
         {
-          c_rli->recovery_parallel_workers= c_rli->slave_parallel_workers;
-          c_rli->mts_recovery_index= 0;
+          c_rli->curr_group_split= TRUE;
+          c_rli->curr_group_is_parallel= FALSE;
         }
       }
-      if (skip)
-        DBUG_RETURN(0);
     }
     DBUG_RETURN(do_apply_event(rli));
   }
@@ -2837,10 +2848,6 @@ int Log_event::apply_event(Relay_log_inf
               rli->last_assigned_worker);
 
   /* 
-     Work-around:s for B, T,..., Q case and ROWS_QUERY_LOG_EVENT
-     A worker has been assigned but it needs sequential environment.
-
-     Todo: support Query parallelization.
      Todo: disassociate Rows_* events from the central rli.
   */
   if (seq_event)
@@ -2857,9 +2864,10 @@ int Log_event::apply_event(Relay_log_inf
         my_sleep(10);
       }
       c_rli->rows_query_ev= (Rows_query_log_event*) this;
-    }
-  }
+    }
+  }
 
+  // getting Worker's id
   if ((!(w= get_slave_worker_id(rli)) ||
        DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)))
     DBUG_RETURN(rli->curr_group_assigned_parts.elements == 0 ? FALSE : TRUE);
@@ -2987,25 +2995,29 @@ int slave_worker_exec_job(Slave_worker *
   {
     if (ev->contains_partition_info())
     {
+      List_iterator<char> it(*ev->mts_get_dbs());
       DYNAMIC_ARRAY *ep= &(w->curr_group_exec_parts->dynamic_ids);
-      uint i;
-      char key[NAME_LEN + 2];
-      bool found= FALSE;
-      const char *dbname= ev->get_db();
-      uchar dblength= (uint) strlen(dbname);
       
-      for (i= 0; i < ep->elements && !found; i++)
+      while (it++)
       {
-        get_dynamic(ep, (uchar*) key, i);
-        found=
-          (key[0] == dblength) &&
-          (strncmp(key + 1, const_cast<char*>(dbname), dblength) == 0);
-      }
-      if (!found)
-      {
-        key[0]= dblength;
-        memcpy(key + 1, dbname, dblength + 1);
-        insert_dynamic(ep, (uchar*) key);
+        bool found= FALSE;
+        char key[NAME_LEN + 2];
+        const char *dbname= *it.ref();
+        uchar dblength= (uint) strlen(dbname);
+
+        for (uint i= 0; i < ep->elements && !found; i++)
+        {
+          get_dynamic(ep, (uchar*) key, i);
+          found=
+            (key[0] == dblength) &&
+            (strncmp(key + 1, const_cast<char*>(dbname), dblength) == 0);
+        }
+        if (!found)
+        {
+          key[0]= dblength;
+          memcpy(key + 1, dbname, dblength + 1);
+          insert_dynamic(ep, (uchar*) key);
+        }
       }
     }
   }
@@ -3089,6 +3101,8 @@ err:
   if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
     delete ev;  // after ev->update_pos() event is garbage
 
+  free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC));
+  
   DBUG_RETURN(error);
 }
 
@@ -3354,6 +3368,34 @@ bool Query_log_event::write(IO_CACHE* fi
       start+= host.length;
     }
   }
+
+  if (thd && thd->get_binlog_updated_db_names() != NULL)
+  {
+    uchar dbs;
+    *start++= Q_UPDATED_DB_NAMES;
+    dbs= *start++= thd->get_binlog_updated_db_names()->elements;
+
+    DBUG_ASSERT(dbs != 0);
+    /*
+       MAX_DBS_IN_QUERY_MTS + 1 is a special value: no db names are
+       written and the event requires sequential applying on the slave.
+    */
+    if (dbs != MAX_DBS_IN_QUERY_MTS + 1)
+    {
+      List_iterator_fast<char> it(*thd->get_binlog_updated_db_names());
+      char *db_name;
+
+      DBUG_ASSERT(dbs <= MAX_DBS_IN_QUERY_MTS);
+
+      while ((db_name= it++))
+      {
+        strcpy((char*) start, db_name);
+        start += strlen(db_name) + 1;
+      }
+    }
+    thd->clear_binlog_updated_db_names();
+  }
+
   /*
     NOTE: When adding new status vars, please don't forget to update
     the MAX_SIZE_LOG_EVENT_STATUS in log_event.h and update the function
@@ -3437,7 +3479,7 @@ Query_log_event::Query_log_event(THD* th
    lc_time_names_number(thd_arg->variables.lc_time_names->number),
    charset_database_number(0),
    table_map_for_update((ulonglong)thd_arg->table_map_for_update),
-   master_data_written(0)
+   master_data_written(0), mts_updated_dbs(0)
 {
   time_t end_time;
 
@@ -3638,6 +3680,7 @@ code_name(int code)
   case Q_CHARSET_DATABASE_CODE: return "Q_CHARSET_DATABASE_CODE";
   case Q_TABLE_MAP_FOR_UPDATE_CODE: return "Q_TABLE_MAP_FOR_UPDATE_CODE";
   case Q_MASTER_DATA_WRITTEN_CODE: return "Q_MASTER_DATA_WRITTEN_CODE";
+  case Q_UPDATED_DB_NAMES: return "Q_UPDATED_DB_NAMES";
   }
   sprintf(buf, "CODE#%d", code);
   return buf;
@@ -3675,7 +3718,8 @@ Query_log_event::Query_log_event(const c
    flags2_inited(0), sql_mode_inited(0), charset_inited(0),
    auto_increment_increment(1), auto_increment_offset(1),
    time_zone_len(0), lc_time_names_number(0), charset_database_number(0),
-   table_map_for_update(0), master_data_written(0)
+   table_map_for_update(0), master_data_written(0),
+   mts_updated_dbs(MAX_DBS_IN_QUERY_MTS + 1)
 {
   ulong data_len;
   uint32 tmp;
@@ -3683,6 +3727,7 @@ Query_log_event::Query_log_event(const c
   Log_event::Byte *start;
   const Log_event::Byte *end;
   bool catalog_nz= 1;
+
   DBUG_ENTER("Query_log_event::Query_log_event(char*,...)");
 
   memset(&user, 0, sizeof(user));
@@ -3854,6 +3899,23 @@ Query_log_event::Query_log_event(const c
       CHECK_SPACE(pos, end, host.length);
       host.str= (char *)pos;
       pos+= host.length;
+      break;
+    }
+    case Q_UPDATED_DB_NAMES:
+    {
+      CHECK_SPACE(pos, end, 1);
+      mts_updated_dbs= *pos++;
+      if (mts_updated_dbs == MAX_DBS_IN_QUERY_MTS + 1)
+        break;
+
+      DBUG_ASSERT(mts_updated_dbs != 0);
+
+      for (uchar i= 0; i < mts_updated_dbs; i++)
+      {
+        strcpy(mts_updated_db_names[i], (char*) pos);
+        pos+= 1 + strlen((const char*) pos);
+      }
+      break;
     }
     default:
       /* That's why you must write status vars in growing order of code */

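(Editor's note on the Q_UPDATED_DB_NAMES write/read hunks above: the status variable has a small wire layout, one code byte, one count byte, then `count` NUL-terminated names, where a count of MAX_DBS_IN_QUERY_MTS + 1 is the "too many dbs, apply sequentially" sentinel and carries no names. A sketch of that layout follows; the helper names are illustrative, not the server's.)

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

static const uint8_t Q_UPDATED_DB_NAMES_CODE = 12; // mirrors Q_UPDATED_DB_NAMES
static const uint8_t MAX_DBS = 16;                 // mirrors MAX_DBS_IN_QUERY_MTS

std::vector<uint8_t> encode_updated_dbs(const std::vector<std::string> &dbs)
{
  std::vector<uint8_t> out;
  out.push_back(Q_UPDATED_DB_NAMES_CODE);
  if (dbs.size() > MAX_DBS)
  {
    out.push_back(MAX_DBS + 1);     // sentinel: no names are written
    return out;
  }
  out.push_back((uint8_t) dbs.size());
  for (size_t i = 0; i < dbs.size(); i++)
  {
    out.insert(out.end(), dbs[i].begin(), dbs[i].end());
    out.push_back('\0');            // names are NUL-terminated on the wire
  }
  return out;
}

std::vector<std::string> decode_updated_dbs(const std::vector<uint8_t> &buf)
{
  std::vector<std::string> dbs;
  uint8_t count = buf[1];
  if (count == MAX_DBS + 1)
    return dbs;                     // sentinel: nothing follows the count
  size_t pos = 2;                   // skip the code and count bytes
  for (uint8_t i = 0; i < count; i++)
  {
    std::string db((const char *) &buf[pos]);
    pos += db.size() + 1;
    dbs.push_back(db);
  }
  return dbs;
}
```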
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2011-01-11 23:01:02 +0000
+++ b/sql/log_event.h	2011-02-17 19:56:48 +0000
@@ -258,6 +258,14 @@ struct sql_ex_info
 #define INCIDENT_HEADER_LEN    2
 #define HEARTBEAT_HEADER_LEN   0
 #define IGNORABLE_HEADER_LEN   0
+
+/**
+   The maximum value for the @@global.mts_max_updated_dbs server variable.
+   When the actual number of dbs exceeds @@global.mts_max_updated_dbs,
+   the max + 1 is put into the mts_updated_dbs status.
+*/
+#define MAX_DBS_IN_QUERY_MTS 16
+
 /* 
   Max number of possible extra bytes in a replication event compared to a
   packet (i.e. a query) sent from client to master;
@@ -273,6 +281,8 @@ struct sql_ex_info
                                    1 + 2          /* type, charset_database_number */ + \
                                    1 + 8          /* type, table_map_for_update */ + \
                                    1 + 4          /* type, master_data_written */ + \
+                                                  /* type, db_1, db_2, ... */  \
+                                   1 + (MAX_DBS_IN_QUERY_MTS * (1 + NAME_LEN)) + \
                                    1 + 16 + 1 + 60/* type, user_len, user, host_len, host */)
 #define MAX_LOG_EVENT_HEADER   ( /* in order of Query_log_event::write */ \
   LOG_EVENT_HEADER_LEN + /* write_header */ \
@@ -344,6 +354,12 @@ struct sql_ex_info
 
 #define Q_INVOKER 11
 
+/*
+  The number of updated dbs and their names are propagated to the slave
+  in order to facilitate parallel applying of Query events.
+*/
+#define Q_UPDATED_DB_NAMES 12
+
 /* Intvar event post-header */
 
 /* Intvar event data */
@@ -1069,6 +1085,20 @@ public:
   {
     return thd ? thd->db : 0;
   }
+
+  /*
+    Returns the list of databases updated by the event.
+    Except for Query_log_event, the list contains just one item.
+  */
+  virtual List<char>* mts_get_dbs()
+  {
+    List<char> *res= new List<char>;
+    res->push_back(strdup(get_db()));
+    return res;
+  }
+
+  virtual uchar mts_number_dbs() { return 1; }
+
 #else
   Log_event() : temp_buf(0) {}
     /* avoid having to link mysqlbinlog against libpthread */
@@ -1190,30 +1220,27 @@ public:
 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
 public:
 
+  virtual uchar mts_number_of_updated_dbs() { return 1; }
+
   /**
     MTS: to execute serially due to a technical or conceptual limitation
+     
+     TODO: add version of the server check to fall back to seq in the OM case.
 
     @return TRUE if, despite the permanent parallel execution mode, an event
                  needs applying in real isolation, that is, sequentially.
   */
-  bool only_sequential_exec(bool query_in_parallel, bool group_term_in_parallel)
+  bool mts_sequential_exec()
   {
     return
-       /* 
+      /* 
         the 4 types below have limited parallel support (the default
         session db, not the actual db).
-         Decision on BEGIN is deferred till the following event.
-         Decision on Commit or Xid is forced by the one for BEGIN.
+         The decision on BEGIN, COMMIT, and Xid is that they are parallel.
       */
-      
-      (!query_in_parallel &&
-       ((get_type_code() == QUERY_EVENT
-         && !starts_group() && !ends_group())    ||
-        get_type_code() == INTVAR_EVENT          ||
-        get_type_code() == USER_VAR_EVENT        ||
-        get_type_code() == RAND_EVENT))          ||
-
-      (!group_term_in_parallel && ends_group())  ||
+      (get_type_code() == QUERY_EVENT &&
+       !starts_group() && !ends_group() &&
+       (mts_number_of_updated_dbs() ==  MAX_DBS_IN_QUERY_MTS + 1)) ||
 
       get_type_code() == START_EVENT_V3          ||
       get_type_code() == STOP_EVENT              ||
@@ -1232,7 +1259,7 @@ public:
       get_type_code() == PRE_GA_UPDATE_ROWS_EVENT||
       get_type_code() == PRE_GA_DELETE_ROWS_EVENT||
 
-      get_type_code() == ROWS_QUERY_LOG_EVENT    || /* todo: make parallel */
+      get_type_code() == ROWS_QUERY_LOG_EVENT    || /* TODO: make parallel */
 
       get_type_code() == INCIDENT_EVENT;
   }
@@ -1243,7 +1270,7 @@ public:
      @return TRUE  if that's the case,
              FALSE otherwise.
   */
-  bool parallel_exec_by_coordinator(ulong slave_server_id)
+  bool mts_async_exec_by_coordinator(ulong slave_server_id)
   {
     return
       (get_type_code() == FORMAT_DESCRIPTION_EVENT ||
@@ -1851,12 +1878,38 @@ public:
     Q_MASTER_DATA_WRITTEN_CODE to the slave's server binlog.
   */
   uint32 master_data_written;
+  /*
+    The number of dbs updated by the query, and their names. This info
+    is required by both the Coordinator and the Workers.
+  */
+  uchar mts_updated_dbs;
+  char mts_updated_db_names[MAX_DBS_IN_QUERY_MTS][NAME_LEN];
 
 #ifdef MYSQL_SERVER
 
   Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length,
                   bool using_trans, bool direct, bool suppress_use, int error);
   const char* get_db() { return db; }
+
+  /**
+     Returns the default db when the actual number of dbs exceeds
+     MAX_DBS_IN_QUERY_MTS.
+  */
+  virtual List<char>* mts_get_dbs()
+  {
+    //compile_time_assert(mts_updated_dbs <= MAX_DBS_IN_QUERY_MTS + 1);
+    List<char> *res= new List<char>;
+    if (mts_updated_dbs == MAX_DBS_IN_QUERY_MTS + 1)
+      res->push_back((char*) get_db());
+    else
+      for (uchar i= 0; i < mts_updated_dbs; i++)
+        res->push_back(mts_updated_db_names[i]);
+    return res;
+  }
+
+  virtual uchar mts_number_dbs() { return mts_updated_dbs; }
+
+  virtual uchar mts_number_of_updated_dbs() { return mts_updated_dbs; }
+
 #ifdef HAVE_REPLICATION
   void pack_info(Protocol* protocol);
 #endif /* HAVE_REPLICATION */
@@ -4311,6 +4364,12 @@ bool event_checksum_test(uchar *buf, ulo
 uint8 get_checksum_alg(const char* buf, ulong len);
 extern TYPELIB binlog_checksum_typelib;
 
+// MTS temp table support needed by sql_base.cc
+THD* mts_get_coordinator_thd();
+THD* mts_get_worker_thd();
+mysql_mutex_t* mts_get_temp_table_mutex();
+bool mts_is_worker(THD *thd);
+
 /**
   @} (end of group Replication)
 */

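(Editor's note on the mts_get_dbs() declarations above: the base event reports exactly one database, while Query_log_event reports either its recorded name list or, when the count carries the MAX_DBS_IN_QUERY_MTS + 1 sentinel, falls back to the single default db. A sketch of that dispatch, with simplified stand-in class and member names:)

```cpp
#include <cassert>
#include <string>
#include <vector>

static const unsigned kMaxDbs = 16; // mirrors MAX_DBS_IN_QUERY_MTS

struct Event
{
  virtual ~Event() {}
  virtual std::string default_db() const { return "test"; }
  // Non-Query events update at most one database: a one-item list.
  virtual std::vector<std::string> mts_get_dbs() const
  {
    return std::vector<std::string>(1, default_db());
  }
};

struct QueryEvent : Event
{
  unsigned updated_dbs;             // count, or kMaxDbs + 1 sentinel
  std::vector<std::string> names;
  // A freshly parsed event starts at the sentinel, as in the patch's
  // reader-side constructor.
  QueryEvent() : updated_dbs(kMaxDbs + 1) {}
  virtual std::vector<std::string> mts_get_dbs() const
  {
    if (updated_dbs == kMaxDbs + 1)
      return std::vector<std::string>(1, default_db()); // over-max fallback
    return names;
  }
};
```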
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2011-01-11 23:01:02 +0000
+++ b/sql/mysqld.cc	2011-02-17 19:56:48 +0000
@@ -468,11 +468,11 @@ ulonglong slave_type_conversions_options
 ulong opt_mts_slave_parallel_workers;
 ulong opt_mts_slave_worker_queue_len_max;
 my_bool opt_mts_slave_local_timestamp;
-my_bool opt_mts_slave_run_query_in_parallel;
 ulong opt_mts_partition_hash_soft_max;
 ulonglong opt_mts_pending_jobs_size_max;
 ulong opt_mts_coordinator_basic_nap;
 ulong opt_mts_worker_underrun_level;
+ulong opt_mts_master_updated_dbs_max;
 ulong thread_cache_size=0;
 ulong binlog_cache_size=0;
 ulonglong  max_binlog_cache_size=0;

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h	2010-12-27 18:54:41 +0000
+++ b/sql/mysqld.h	2011-02-17 19:56:48 +0000
@@ -182,11 +182,11 @@ extern uint  slave_net_timeout;
 extern ulong opt_mts_slave_parallel_workers;
 extern ulong opt_mts_slave_worker_queue_len_max;
 extern my_bool opt_mts_slave_local_timestamp;
-extern my_bool opt_mts_slave_run_query_in_parallel;
 extern ulong opt_mts_partition_hash_soft_max;
 extern ulonglong opt_mts_pending_jobs_size_max;
 extern ulong opt_mts_coordinator_basic_nap;
 extern ulong opt_mts_worker_underrun_level;
+extern ulong opt_mts_master_updated_dbs_max;
 
 extern uint max_user_connections;
 extern ulong what_to_log,flush_time;

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-12-27 18:54:41 +0000
+++ b/sql/rpl_rli.cc	2011-02-17 19:56:48 +0000
@@ -52,6 +52,7 @@ static PSI_cond_info *worker_conds= NULL
 
 PSI_mutex_key *key_mutex_slave_parallel_worker= NULL;
 PSI_mutex_key key_mutex_slave_parallel_pend_jobs;
+PSI_mutex_key key_mutex_mts_temp_tables_lock;
 
 PSI_cond_key *key_cond_slave_parallel_worker= NULL;
 PSI_cond_key key_cond_slave_parallel_pend_jobs;
@@ -72,7 +73,7 @@ Relay_log_info::Relay_log_info(bool is_s
    this_worker(NULL), slave_parallel_workers(0),
    recovery_parallel_workers(0),
    checkpoint_group(mts_checkpoint_group), mts_recovery_group_cnt(0),
-   mts_recovery_index(0), curr_event_is_not_in_group(0),
+   mts_recovery_index(0),
    sql_delay(0), sql_delay_end(0), m_flags(0)
 {
   DBUG_ENTER("Relay_log_info::Relay_log_info");
@@ -130,6 +131,8 @@ void Relay_log_info::init_workers(ulong 
   mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
                    MY_MUTEX_INIT_FAST);
   mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond, NULL);
+  mysql_mutex_init(key_mutex_mts_temp_tables_lock, &mts_temp_tables_lock,
+                   MY_MUTEX_INIT_FAST);
   my_init_dynamic_array(&workers, sizeof(Slave_worker *), slave_parallel_workers, 4);
 }
 
@@ -140,6 +143,7 @@ void Relay_log_info::deinit_workers()
 {
   mysql_mutex_destroy(&pending_jobs_lock);
   mysql_cond_destroy(&pending_jobs_cond);
+  mysql_mutex_destroy(&mts_temp_tables_lock);
 
   if (!this_worker)
     delete_dynamic(&workers);

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2011-01-11 22:54:12 +0000
+++ b/sql/rpl_rli.h	2011-02-17 19:56:48 +0000
@@ -448,7 +448,6 @@ public:
   DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
   DYNAMIC_ARRAY curr_group_da;  // deferred array to hold partition-info-free events
   bool curr_group_seen_begin;   // current group started with B-event or not
-  bool run_query_in_parallel;   // Query's default db not the actual db as part
   bool curr_group_isolated;     // Trans is exec:d by Worker but in exclusive env
   volatile ulong mts_wqs_underrun_w_id;  // Id of a Worker whose queue is getting empty
   volatile long mts_wqs_overrun;   // W to incr and decr
@@ -459,7 +458,8 @@ public:
   Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
   ulonglong mts_total_groups; // total event groups distributed in current session
  
-  bool curr_group_is_parallel; // a mark for Coord to indicate on T-event of the curr group at delete
+  bool curr_group_is_parallel; // an event to process by Coordinator
+  bool curr_group_split;       // an event split the current group forcing C to exec it
   ulong opt_slave_parallel_workers; // auxiliary cache for ::opt_slave_parallel_workers
   ulong slave_parallel_workers;     // the one slave session time number of workers
   ulong recovery_parallel_workers; // number of workers while recovering.
@@ -475,7 +475,15 @@ public:
   MY_BITMAP recovery_groups;  // bitmap used during recovery.
   ulong mts_recovery_group_cnt; // number of groups to execute at recovery
   ulong mts_recovery_index;     // running index of recoverable groups
-  bool curr_event_is_not_in_group; // a special case of group split apart by FD
+  /*
+    Temporary tables are held by the Coordinator though they are created and
+    dropped explicitly by Workers. The following lock has to be taken by
+    either party before any operation on the temp-table placeholder, incl.
+    find, drop, create, open.
+  */
+  mysql_mutex_t mts_temp_tables_lock;
+
+
   /* most of allocation in the coordinator rli is there */
   void init_workers(ulong);
 
@@ -724,4 +732,5 @@ private:
 };
 
 bool mysql_show_relaylog_events(THD* thd);
+
 #endif /* RPL_RLI_H */

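(Editor's note on the mts_temp_tables_lock added to Relay_log_info above: the Coordinator owns the temp-table placeholder, but Workers create and drop entries in it, so each party takes the shared mutex before any find/drop/create/open. A sketch of the protocol with std::mutex; the container and names are illustrative, not the server's.)

```cpp
#include <cassert>
#include <mutex>
#include <string>
#include <vector>

struct TempTableStore
{
  std::mutex lock;                  // plays the role of mts_temp_tables_lock
  std::vector<std::string> tables;  // stand-in for the temp-table placeholder

  void create(const std::string &name)
  {
    std::lock_guard<std::mutex> guard(lock); // taken by Worker or Coordinator
    tables.push_back(name);
  }

  bool drop(const std::string &name)
  {
    std::lock_guard<std::mutex> guard(lock);
    for (std::vector<std::string>::iterator it = tables.begin();
         it != tables.end(); ++it)
    {
      if (*it == name)
      {
        tables.erase(it);
        return true;
      }
    }
    return false;                   // not found under the lock
  }
};
```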
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2011-01-11 22:54:12 +0000
+++ b/sql/rpl_slave.cc	2011-02-17 19:56:48 +0000
@@ -143,7 +143,7 @@ failed read"
 };
 
 
-typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL, SLAVE_THD_WORKER, SLAVE_THD_CHECKPOINT } SLAVE_THD_TYPE;
+typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL, SLAVE_THD_CHECKPOINT } SLAVE_THD_TYPE;
 
 static int process_io_rotate(Master_info* mi, Rotate_log_event* rev);
 static int process_io_create_file(Master_info* mi, Create_file_log_event* cev);
@@ -2847,17 +2847,8 @@ int apply_event_and_update_pos(Log_event
       See sql/rpl_rli.h for further details.
     */
     int error= 0;
-    if (skip_event || 
-        (!rli->is_parallel_exec() ||
-         (!rli->curr_group_is_parallel)))
-    {
-      DBUG_ASSERT(skip_event || !rli->is_parallel_exec() ||
-                  (!rli->curr_group_is_parallel ||
-                   rli->curr_event_is_not_in_group) ||
-                  (ev->only_sequential_exec(rli->run_query_in_parallel,
-                                            (rli->curr_group_seen_begin ||
-                                             rli->last_assigned_worker != NULL))
-                   && !rli->curr_group_seen_begin));
+    if (skip_event || !rli->is_parallel_exec() || !rli->curr_group_is_parallel)
+    {
 #ifndef DBUG_OFF
       /*
         This only prints information to the debug trace.
@@ -3056,40 +3047,37 @@ static int exec_relay_log_event(THD* thd
     */
     // if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
     {
-      if ((!rli->is_parallel_exec() ||
-           !rli->curr_group_is_parallel || rli->curr_event_is_not_in_group)
-          && ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
-      {
-        DBUG_ASSERT(!rli->is_parallel_exec() 
-                    ||
-                    (ev->only_sequential_exec(rli->run_query_in_parallel,
-                                              // rli->curr_group_is_parallel
-                                              (rli->curr_group_seen_begin ||
-                                               rli->last_assigned_worker != NULL))
-                     && (!rli->curr_group_seen_begin ||
-                         ev->parallel_exec_by_coordinator(::server_id)))
-                    || (ev->shall_skip(rli) != Log_event::EVENT_SKIP_NOT));
-      /* MTS:  Observation/todo.
-
-         ROWS_QUERY_LOG_EVENT could be supported easier if
-         destructing part of handle_rows_query_log_event would be merged
-         with rli->cleanup_context() and the rest move into 
-         ROWS...::do_apply_event
-      */
-        
-        if (!rli->is_parallel_exec())
-          if (thd->variables.binlog_rows_query_log_events)
-            handle_rows_query_log_event(ev, rli);
+      if ((!rli->is_parallel_exec() || !rli->curr_group_is_parallel))
+      {
+        DBUG_ASSERT(!rli->is_parallel_exec() || !rli->curr_group_is_parallel ||
+                    ev->shall_skip(rli) != Log_event::EVENT_SKIP_NOT);
 
-        if (ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
+        if (rli->curr_group_split) // an artificial FD requires some handling
         {
-          DBUG_PRINT("info", ("Deleting the event after it has been executed"));
-          delete ev;
-          ev= NULL;
+          rli->curr_group_is_parallel= TRUE;
+          rli->curr_group_split= FALSE;
+        }
+        if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+        {
+          /* MTS TODO:
+
+             ROWS_QUERY_LOG_EVENT could be supported easier if
+             destructing part of handle_rows_query_log_event would be merged
+             with rli->cleanup_context() and the rest move into 
+             ROWS...::do_apply_event
+          */
+          if (!rli->is_parallel_exec())
+            if (thd->variables.binlog_rows_query_log_events)
+              handle_rows_query_log_event(ev, rli);
+          
+          if (ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
+          {
+            DBUG_PRINT("info", ("Deleting the event after it has been executed"));
+            delete ev;
+            ev= NULL;
+          }
         }
       }
-      if (rli->curr_event_is_not_in_group)
-        rli->curr_event_is_not_in_group= FALSE;
     }
 
     /*
@@ -3730,14 +3718,13 @@ pthread_handler_t handle_slave_worker(vo
     sql_print_error("Failed during slave worker initialization");
     goto err;
   }
-
   w->info_thd= thd;
   w->w_rli->info_thd= thd;
 
   thd->thread_stack = (char*)&thd;
   
   pthread_detach_this_thread();
-  if (init_slave_thread(thd, SLAVE_THD_WORKER))
+  if (init_slave_thread(thd, SLAVE_THD_SQL))  // todo: make thd->sys_thr= worker
   {
     // todo make SQL thread killed
     sql_print_error("Failed during slave worker initialization");
@@ -3805,6 +3792,9 @@ err:
   {
     mysql_mutex_lock(&LOCK_thread_count);
     THD_CHECK_SENTRY(thd);
+    // Reset so that close_temporary_tables() does not close temp tables;
+    // those are the Coordinator's responsibility.
+    thd->system_thread= NON_SYSTEM_THREAD;
     delete thd;
     mysql_mutex_unlock(&LOCK_thread_count);
   }
@@ -3953,7 +3943,6 @@ bool mts_recovery_groups(Relay_log_info 
         DBUG_ASSERT(ev->is_valid());
         DBUG_ASSERT(rli->mts_recovery_group_cnt < rli->checkpoint_group);
 
-        // TODO: relax condition to allow --mts_exp_run_query_in_parallel= 1
         if (ev->starts_group())
           curr_group_seen_begin= TRUE;
         else
@@ -4351,12 +4340,11 @@ int slave_start_workers(Relay_log_info *
   rli->mts_coordinator_basic_nap= ::opt_mts_coordinator_basic_nap;
   rli->mts_worker_underrun_level= ::opt_mts_worker_underrun_level;
   rli->mts_total_groups= 0;
-  rli->curr_group_seen_begin= FALSE; // initial presumtion, will change
-  rli->curr_group_is_parallel= FALSE; // initial presumtion, will change
+  rli->curr_group_seen_begin= FALSE;
+  rli->curr_group_is_parallel= FALSE;
   rli->curr_group_isolated= FALSE;
-  rli->run_query_in_parallel= opt_mts_slave_run_query_in_parallel;
+  rli->curr_group_split= FALSE;
   rli->checkpoint_seqno= 0;
-  rli->curr_event_is_not_in_group= FALSE;
   //rli->worker_bitmap_buf= my_malloc(n/8 + 1,MYF(MY_WME));
 
   for (i= 0; i < n; i++)
@@ -7115,6 +7103,38 @@ err:
   DBUG_RETURN(ret);
 }
 
+
+/* MTS temp table support */
+
+mysql_mutex_t* mts_get_temp_table_mutex()
+{
+  return &active_mi->rli->mts_temp_tables_lock;
+}
+
+THD* mts_get_coordinator_thd()
+{
+  Slave_worker *w;
+  if (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec())
+    return NULL;
+  if (!(w= active_mi->rli->get_current_worker()))
+    return NULL;
+  return w->c_rli->info_thd;
+}
+
+THD* mts_get_worker_thd()
+{
+  Slave_worker *w;
+  if (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec())
+    return NULL;
+  if (!(w= active_mi->rli->get_current_worker()))
+    return NULL;
+  return w->w_rli->info_thd;
+}
+
+bool mts_is_worker(THD *thd)
+{
+  return
+    thd->slave_thread &&
+    thd->system_thread == SYSTEM_THREAD_SLAVE_SQL &&
+    (mts_get_worker_thd() != NULL);
+}
+
 /**
   @} (end of group Replication)
 */

=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc	2011-01-11 11:45:02 +0000
+++ b/sql/sql_base.cc	2011-02-17 19:56:48 +0000
@@ -58,7 +58,6 @@
 #include <io.h>
 #endif
 
-
 bool
 No_such_table_error_handler::handle_condition(THD *,
                                               uint sql_errno,
@@ -1192,11 +1191,25 @@ bool close_cached_connection_tables(THD 
 
 static void mark_temp_tables_as_free_for_reuse(THD *thd)
 {
-  for (TABLE *table= thd->temporary_tables ; table ; table= table->next)
+#ifndef EMBEDDED_LIBRARY
+  bool mts_slave= mts_is_worker(thd);
+  TABLE **ptr_temporary_tables= mts_slave ?
+    &mts_get_coordinator_thd()->temporary_tables : &thd->temporary_tables;
+  if (mts_slave)
+    mysql_mutex_lock(mts_get_temp_table_mutex());
+#else
+  TABLE **ptr_temporary_tables= &thd->temporary_tables;
+#endif
+
+  for (TABLE *table= *ptr_temporary_tables; table ; table=table->next)
   {
     if ((table->query_id == thd->query_id) && ! table->open_by_handler)
       mark_tmp_table_for_reuse(table);
   }
+#ifndef EMBEDDED_LIBRARY
+  if (mts_slave)
+    mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
 }
 
 
@@ -1588,6 +1601,8 @@ bool close_temporary_tables(THD *thd)
   bool was_quote_show= TRUE;
   bool error= 0;
 
+  /* TODO mts: assert that if this is a Worker then thd->temporary_tables == NULL */
+
   if (!thd->temporary_tables)
     DBUG_RETURN(FALSE);
 
@@ -2025,16 +2040,29 @@ TABLE *find_temporary_table(THD *thd,
                             const char *table_key,
                             uint table_key_length)
 {
-  for (TABLE *table= thd->temporary_tables; table; table= table->next)
+  TABLE *table= NULL;
+#ifndef EMBEDDED_LIBRARY
+  bool mts_slave= mts_is_worker(thd);
+  TABLE **ptr_temporary_tables= mts_slave ?
+    &mts_get_coordinator_thd()->temporary_tables : &thd->temporary_tables;
+  if (mts_slave)
+    mysql_mutex_lock(mts_get_temp_table_mutex());  
+#else
+  TABLE **ptr_temporary_tables= &thd->temporary_tables;
+#endif
+  for (table= *ptr_temporary_tables; table; table= table->next)
   {
     if (table->s->table_cache_key.length == table_key_length &&
         !memcmp(table->s->table_cache_key.str, table_key, table_key_length))
     {
-      return table;
+      break;
     }
   }
-
-  return NULL;
+#ifndef EMBEDDED_LIBRARY
+  if (mts_slave)
+    mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
+  return table;
 }
 
 
@@ -2072,6 +2100,11 @@ TABLE *find_temporary_table(THD *thd,
 int drop_temporary_table(THD *thd, TABLE_LIST *table_list, bool *is_trans)
 {
   TABLE *table;
+#ifndef EMBEDDED_LIBRARY
+  bool mts_slave= mts_is_worker(thd);
+#endif
+  THD *thd_temp;
+
   DBUG_ENTER("drop_temporary_table");
   DBUG_PRINT("tmptable", ("closing table: '%s'.'%s'",
                           table_list->db, table_list->table_name));
@@ -2094,7 +2127,26 @@ int drop_temporary_table(THD *thd, TABLE
     unlock the table and remove the table from this list.
   */
   mysql_lock_remove(thd, thd->lock, table);
-  close_temporary_table(thd, table, 1, 1);
+
+#ifndef EMBEDDED_LIBRARY
+  if (mts_slave)
+  {
+    thd_temp= mts_get_coordinator_thd();
+    mysql_mutex_lock(mts_get_temp_table_mutex());
+  }
+  else
+#endif
+  {
+    thd_temp= thd;
+  }
+
+  close_temporary_table(thd_temp, table, 1, 1);
+
+#ifndef EMBEDDED_LIBRARY
+  if (mts_slave)
+     mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
+
   DBUG_RETURN(0);
 }
 
@@ -2125,7 +2177,7 @@ void close_temporary_table(THD *thd, TAB
       passing non-zero value to end_slave via rli->save_temporary_tables
       when no temp tables opened, see an invariant below.
     */
-    thd->temporary_tables= table->next;
+    thd->temporary_tables= table->next; // mts: see drop_temporary_table()
     if (thd->temporary_tables)
       table->next->prev= 0;
   }
@@ -2631,7 +2683,17 @@ bool open_table(THD *thd, TABLE_LIST *ta
   if (table_list->open_type != OT_BASE_ONLY &&
       ! (flags & MYSQL_OPEN_SKIP_TEMPORARY))
   {
-    for (table= thd->temporary_tables; table ; table=table->next)
+#ifndef EMBEDDED_LIBRARY
+    bool mts_slave= mts_is_worker(thd);
+    TABLE **ptr_temporary_tables= mts_slave ?
+      &mts_get_coordinator_thd()->temporary_tables : &thd->temporary_tables;
+    if (mts_slave)
+      mysql_mutex_lock(mts_get_temp_table_mutex());
+#else
+    TABLE **ptr_temporary_tables= &thd->temporary_tables;
+#endif
+
+    for (table= *ptr_temporary_tables; table ; table=table->next)
     {
       if (table->s->table_cache_key.length == key_length +
           TMP_TABLE_KEY_EXTRA &&
@@ -2651,14 +2713,26 @@ bool open_table(THD *thd, TABLE_LIST *ta
                       (ulong) table->query_id, (uint) thd->server_id,
                       (ulong) thd->variables.pseudo_thread_id));
 	  my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias);
+#ifndef EMBEDDED_LIBRARY
+          if (mts_slave)
+            mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
 	  DBUG_RETURN(TRUE);
 	}
 	table->query_id= thd->query_id;
 	thd->thread_specific_used= TRUE;
         DBUG_PRINT("info",("Using temporary table"));
+#ifndef EMBEDDED_LIBRARY
+        if (mts_slave)
+          mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
         goto reset;
       }
     }
+#ifndef EMBEDDED_LIBRARY
+    if (mts_slave)
+      mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
   }
 
   if (table_list->open_type == OT_TEMPORARY_ONLY ||
@@ -5851,14 +5925,28 @@ TABLE *open_table_uncached(THD *thd, con
 
   if (add_to_temporary_tables_list)
   {
+#ifndef EMBEDDED_LIBRARY
+    TABLE **ptr_temporary_tables;
+    bool mts_slave= mts_is_worker(thd);
+    ptr_temporary_tables= mts_slave? 
+      &mts_get_coordinator_thd()->temporary_tables : &thd->temporary_tables;
+    if (mts_slave)
+      mysql_mutex_lock(mts_get_temp_table_mutex());
+#else
+    TABLE **ptr_temporary_tables= &thd->temporary_tables;
+#endif
     /* growing temp list at the head */
-    tmp_table->next= thd->temporary_tables;
+    tmp_table->next= *ptr_temporary_tables;
     if (tmp_table->next)
       tmp_table->next->prev= tmp_table;
-    thd->temporary_tables= tmp_table;
-    thd->temporary_tables->prev= 0;
+    *ptr_temporary_tables= tmp_table;
+    (*ptr_temporary_tables)->prev= 0;
     if (thd->slave_thread)
       slave_open_temp_tables++;
+#ifndef EMBEDDED_LIBRARY
+    if (mts_slave)
+       mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
   }
   tmp_table->pos_in_table_list= 0;
   DBUG_PRINT("tmptable", ("opened table: '%s'.'%s' 0x%lx", tmp_table->s->db.str,

=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc	2010-12-17 16:14:15 +0000
+++ b/sql/sql_class.cc	2011-02-17 19:56:48 +0000
@@ -503,6 +503,7 @@ THD::THD()
    user_time(0), in_sub_stmt(0),
    binlog_unsafe_warning_flags(0),
    binlog_table_maps(0),
+   binlog_updated_db_names(NULL),
    table_map_for_update(0),
    arg_of_last_insert_id_function(FALSE),
    first_successful_insert_id_in_prev_stmt(0),
@@ -1397,6 +1398,7 @@ void THD::cleanup_after_query()
     stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
     auto_inc_intervals_in_cur_stmt_for_binlog.empty();
     rand_used= 0;
+    binlog_updated_db_names= NULL;
   }
   if (first_successful_insert_id_in_cur_stmt > 0)
   {
@@ -3394,6 +3396,7 @@ void THD::reset_sub_statement_state(Sub_
     first_successful_insert_id_in_prev_stmt;
   backup->first_successful_insert_id_in_cur_stmt= 
     first_successful_insert_id_in_cur_stmt;
+  //backup->binlog_updated_db_names= binlog_updated_db_names;
 
   if ((!lex->requires_prelocking() || is_update_query(lex->sql_command)) &&
       !is_current_stmt_binlog_format_row())
@@ -3414,6 +3417,7 @@ void THD::reset_sub_statement_state(Sub_
   cuted_fields= 0;
   transaction.savepoints= 0;
   first_successful_insert_id_in_cur_stmt= 0;
+  //binlog_updated_db_names= NULL;
 }
 
 
@@ -3476,6 +3480,8 @@ void THD::restore_sub_statement_state(Su
   */
   examined_row_count+= backup->examined_row_count;
   cuted_fields+=       backup->cuted_fields;
+  //if (binlog_updated_db_names)
+  //  binlog_updated_db_names->concat(backup->binlog_updated_db_names);
   DBUG_VOID_RETURN;
 }
 

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2011-01-11 23:01:02 +0000
+++ b/sql/sql_class.h	2011-02-17 19:56:48 +0000
@@ -1184,6 +1184,7 @@ public:
   bool last_insert_id_used;
   SAVEPOINT *savepoints;
   enum enum_check_fields count_cuted_fields;
+  List<char> *binlog_updated_db_names;
 };
 
 
@@ -1721,6 +1722,11 @@ private:
     transaction cache.
   */
   uint binlog_table_maps;
+  /*
+    List of the names of the databases updated by the query
+  */
+  List<char> *binlog_updated_db_names;
+
 public:
   void issue_unsafe_warnings();
 
@@ -1730,6 +1736,11 @@ public:
   void clear_binlog_table_maps() {
     binlog_table_maps= 0;
   }
+  List<char> * get_binlog_updated_db_names() {
+    return binlog_updated_db_names;
+  }
+  void clear_binlog_updated_db_names() { binlog_updated_db_names= NULL; }
+
 #endif /* MYSQL_CLIENT */
 
 public:

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2011-01-11 23:01:02 +0000
+++ b/sql/sys_vars.cc	2011-02-17 19:56:48 +0000
@@ -3198,12 +3198,13 @@ static Sys_var_mybool Sys_slave_local_ti
        "time value to implicitly affected timestamp columms. Otherwise (default) "
        "it installs prescribed by the master value",
        GLOBAL_VAR(opt_mts_slave_local_timestamp), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
-static Sys_var_mybool Sys_slave_run_query_in_parallel(
-       "mts_exp_slave_run_query_in_parallel",
-       "The default not an actual database name is used as partition info "
-       "for parallel execution of Query_log_event ",
-       GLOBAL_VAR(opt_mts_slave_run_query_in_parallel), CMD_LINE(OPT_ARG),
-       DEFAULT(FALSE));
+static Sys_var_ulong Sys_master_updated_dbs_max(
+       "mts_master_updated_dbs_max",
+       "The maximum number of databases that a query log event can contain in its header "
+       "in order to facilitate parallel applying on the slave.",
+       GLOBAL_VAR(opt_mts_master_updated_dbs_max), CMD_LINE(REQUIRED_ARG),
+       VALID_RANGE(0, MAX_DBS_IN_QUERY_MTS),
+       DEFAULT(16), BLOCK_SIZE(1));
 static Sys_var_ulong Sys_mts_partition_hash_soft_max(
        "mts_partition_hash_soft_max",
        "Number of records in the mts partition hash below which "


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110217195648-qr2w5j4q802celrj.bundle