List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:May 24 2011 2:31pm
Subject:bzr push into mysql-next-mr-wl5569 branch (andrei.elkin:3275 to 3279)
WL#5569 WL#5754
View as plain text  
 3279 Andrei Elkin	2011-05-24
      WL#5569 MTS
      WL#5754 Query parallel appying
      
      Changing implementation of temporary tables support in MTS.
      Cleanup, fixing few todo:s and few potential issues found.
     @ mysql-test/suite/rpl/t/rpl_parallel.test
        commetting failure in /include/rpl_end.inc (todo: explore and fix).
     @ mysql-test/suite/rpl/t/rpl_parallel_fallback.test
        The only Rows_query_log_event case of testing is no longer valid
        because the event is parallizable now.
        The test is removed.
     @ mysql-test/suite/rpl/t/rpl_parallel_multi_db.test
        Adding comments about possible issue of somewhat loose behaviour
        of sync_slave_with_master in parallel mode.
        TODO: investigate and fix.
     @ sql/binlog.cc
        Renaming only.
     @ sql/events.cc
        Renaming only.
     @ sql/log_event.cc
        Fixing found issues, cleanup and temp tables support.
        
        The assigned partition as represented by an entry is passed through the assigned Worker.
        via Log_event::get_slave_worker().
        The method attaches the entry to the Query event which do_exec_event()
        calls new attach and detach methods that grabs temp tables list on each involved db
        and returns possibly updated lists back to APH at the end of Query event applying.
     @ sql/log_event.h
        Mostly renaming.
     @ sql/rpl_rli.cc
        relocating mts_get_coordinator_thd() definition.
     @ sql/rpl_rli.h
        re-defining mts_is_worker() through SYSTEM_THREAD_SLAVE_WORKER.
     @ sql/rpl_rli_pdb.cc
        Changes mostly due to temp table support.
        Coordinator disaccosiates temporary tables of a being schedule db-partition 
        from its thd and attaches the list to APH's entry.
        In the following the Worker finds the list and adopts it to return possibly updated
        version back to the entry at the end of the query.
        The list resides most of time in either APH's passive (usage == 0) entry,
        or in Worker's thd->temporary_tables.
        It can be relocated back to the Coordinator's repository via wait_for_workers_to_finish()
        that is called in case an event requires the sequential execution.
        
        Few auxiliary functions are defined dealing with migration and merging temp tables.
     @ sql/rpl_rli_pdb.h
        Adding TABLE* pointer to list of temp tables in entry of Assigned Partition Hash.
        The entry pointer carries temp tables from C to W and backward.
        Changes in few function signitures motivated by temp table support.
        Adding auxiliary funcs to help with temp tables manipulations.
     @ sql/rpl_slave.cc
        renaming, cleanup and improving Worker identification.
     @ sql/rpl_slave.h
        cleanup.
     @ sql/rpl_utility.h
        cleanup.
     @ sql/sql_base.cc
        removing a hack to access temp tables in MTS.
     @ sql/sql_class.cc
        Renaming only.
     @ sql/sql_class.h
        Renaming only.
     @ sql/sql_rename.cc
        Renaming only.
     @ sql/sql_table.cc
        Renaming only.
     @ sql/sql_view.cc
        Renaming only.

    removed:
      mysql-test/suite/rpl/t/rpl_parallel_fallback.test
    modified:
      mysql-test/suite/rpl/t/rpl_parallel.test
      mysql-test/suite/rpl/t/rpl_parallel_multi_db.test
      sql/binlog.cc
      sql/events.cc
      sql/log_event.cc
      sql/log_event.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/rpl_slave.h
      sql/rpl_utility.h
      sql/sql_base.cc
      sql/sql_class.cc
      sql/sql_class.h
      sql/sql_rename.cc
      sql/sql_table.cc
      sql/sql_view.cc
 3278 Andrei Elkin	2011-05-19
      wl#5569 MTS
      
      Support for ROWS_QUERY_LOG_EVENT is added.
      It required refactoring of its handling in the canonical sequential mode.
      The event life suggests its behavior similar to objects associated with Table_map,
      in particural, its destoying to occur at the end-of-statement time.
      
      Tested against existing ROWS_QUERY_LOG_EVENT feature tests incl
      rpl_row_ignorable_event in both sequential and parallel mode.
     @ sql/log_event.cc
        cleanup of MTS code;
        relocating handle_rows_query_log_event() logics into 
        a. do_apply_event() and
        b. rli->cleanup_context().
     @ sql/log_event.h
        cleanup of MTS code;
     @ sql/rpl_rli.cc
        Deploying ROWS_QUERY_LOG_EVENT destruction in context_cleanup().
     @ sql/rpl_rli.h
        cleanup of MTS code;
     @ sql/rpl_slave.cc
        cleanup of MTS code;
     @ sql/sql_binlog.cc
        Simplifying ROWS_QUERY_LOG_EVENT handling in the case
        of BINLOG pseudo-query.

    modified:
      sql/log_event.cc
      sql/log_event.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_slave.cc
      sql/sql_binlog.cc
 3277 Andrei Elkin	2011-05-16
      wl#5569 MTS
      
      Simplifying Coordinator-Worker interfaces. In essence after this patch Worker execute
        events in its private context (class Slave_worker :public Relay_log_info).
        The only exception is Query referring to temporary table. The temp:s are maintained 
        in the Coordinator's "central" rli;
      removing some dead code;
      performing a lot of cleanup.
      
      There are few todo items incl:
      
      1. To implement several todo:s scattered across MTS' code and tests (e.g to restore 
         protected for few members of RLI of rpl_rli.h);
      2. to cover Rows_query_log_event that currently can cause hanging (e.g rpl_parallel_fallback)
      3. To sort out names of classes based on Rpl_info, possibly remove  Rpl_info_worker
     @ mysql-test/suite/rpl/t/rpl_parallel_start_stop.test
        The test as most of rpl_parallel* bunch can't yet stand `include/rpl_end.inc'.
     @ sql/log_event.cc
        Defining the default Log_event::do_apply_event_worker() that simply executes
          canonical do_apply_event() however supplying Slave_worker intance reference
          that is critical in order to execute different rli->methods(), e.g `report'.
        Xid_log_event::do_apply_event_worker() runs the Worker version of Xid commit;
        simplifying Rows event parallel applying to remove or elaborate some host of the early prototype code incl. rli->get_tables_to_lock() and related logics;
     @ sql/log_event.h
        Adding virtual int do_apply_event_worker() to Log_event and specializing
        it for Xid class;
     @ sql/rpl_reporting.cc
        Spliting report() into two methods in order to make possible
        to call the functional part of the two  with va_list as an arg 
        be called from Slave_worker class.
     @ sql/rpl_reporting.h
        New  va_list version of report method is declared.
     @ sql/rpl_rli.cc
        removing early prototype time support to Rows-event parallel execution.
        The new scheme of applying is almost equivalent to the standard sequential algorith
        thanks to Slave_worker :public Relay_log_info inheritence implementation.
     @ sql/rpl_rli.h
        Removing unnecessary interfaces;
        TODO: restore `protected' for few members.
     @ sql/rpl_rli_pdb.cc
        Some cleanup and 
        defining Slave_worker::report() to eventially call the Coordinator's rli->report() and exploit
        a fact that the latter was designed for concurrent use.
     @ sql/rpl_rli_pdb.h
        Changing base class for Slave_worker to make it behaving
        as Relay_log_info when needed;
        Removing some dead code;
        Adding report() methods to run it in do_apply_event().
     @ sql/rpl_slave.cc
        Removed UNTIL todo as it's actually not supported with a warning;
        Removed a todo for cleanup of error-out statement format transaction
          because  w->cleanup_context() impelements it indeed;
        Cleanup or transition from w->w_rli (of Relay_log_info) to w (of Slave_worker);
        Adding forgotten unlock_mutex;
        Simplifying definitions of few func:s (mts_is_worker() etc);

    modified:
      mysql-test/suite/rpl/t/rpl_parallel_start_stop.test
      sql/log_event.cc
      sql/log_event.h
      sql/rpl_reporting.cc
      sql/rpl_reporting.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
 3276 Andrei Elkin	2011-05-06
      wl#5569 MTS
      
      improving benchmarking test.

    modified:
      mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test
      mysql-test/extra/rpl_tests/rpl_parallel_load.test
      mysql-test/suite/rpl/t/rpl_parallel-slave.opt
      mysql-test/suite/rpl/t/rpl_parallel.test
 3275 Andrei Elkin	2011-04-06
      wl#5569 MTS
      
      Statistics for Workers and Coordinator incl waiting times, sleeping is reported
      now into the error log as slave stopping time.
     @ sql/log_event.cc
        statistics addded.
     @ sql/rpl_rli.h
        statistics added.
     @ sql/rpl_slave.cc
        print-out mts statistics into the error log at stopping the slave.

    modified:
      mysql-test/extra/rpl_tests/rpl_parallel_load.test
      mysql-test/suite/rpl/r/rpl_sequential.result
      sql/log_event.cc
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_slave.cc
=== modified file 'mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test'
--- a/mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test	2011-04-02 11:32:02 +0000
+++ b/mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test	2011-05-06 18:33:32 +0000
@@ -6,11 +6,13 @@
 # load volume parameter
 #
 
-let $iter= 128;
+let $iter= 16;
 let $tables= 4;
-let $wk_queries= 16;
-let $nk_queries= 0;
-let $pre_inserted_rows= 400;
+let $wk_i_queries= 4;
+let $wk_m_queries= 0;
+let $nk_i_queries= 0;
+let $nk_m_queries= 0;
+let $pre_inserted_rows= 200;
 
 connection slave;
 
@@ -83,6 +85,8 @@ while($i)
   {
    eval insert into test$i.ti_wk_$m values(null,  $i, uuid());
    eval insert into test$i.ti_nk_$m values(null,  $i, uuid());
+   eval insert into test$i.tm_wk_$m values(null,  $i, uuid());
+   eval insert into test$i.tm_nk_$m values(null,  $i, uuid());
 
    dec $k;
   }
@@ -163,19 +167,44 @@ while ($iter)
 	begin;
    	   ###eval insert into tm_nk values($iter, $i1, repeat('a', round(rand()*10)));
 
-    	   let $q= $wk_queries;
+    	   let $q= $wk_m_queries;
 	   while ($q)
 	   {
 	       let $m= `select 1 + floor(rand() * $tables)`;
+	       eval update      test$i.tm_wk_$m set c= uuid();
+	       eval insert into test$i.tm_wk_$m values(null,  $i, uuid());
+
+	       dec $q;
+	   }
+
+    	   let $q= $wk_i_queries;
+	   while ($q)
+	   {
+	       let $m= `select 1 + floor(rand() * $tables)`;
+	       eval update      test$i.ti_wk_$m set c= uuid();
+	       
 	       eval insert into test$i.ti_wk_$m values(null,  $i, uuid());
 
 	       dec $q;
 	   }
+            
+	   # NK
+
+    	   let $q= $nk_m_queries;
+	   while ($q)
+	   {
+	       let $m= `select 1 + floor(rand() * $tables)`;
+	       eval update test$i.tm_nk_$m set c= uuid();
+	       eval insert into test$i.tm_nk_$m values(null,  $i, uuid());
+
+	       dec $q;
+	   }
 
-    	   let $q= $nk_queries;
+    	   let $q= $nk_i_queries;
 	   while ($q)
 	   {
 	       let $m= `select 1 + floor(rand() * $tables)`;
+	       eval update test$i.ti_nk_$m set c= uuid();
 	       eval insert into test$i.ti_nk_$m values(null,  $i, uuid());
 
 	       dec $q;

=== modified file 'mysql-test/extra/rpl_tests/rpl_parallel_load.test'
--- a/mysql-test/extra/rpl_tests/rpl_parallel_load.test	2011-04-06 12:51:58 +0000
+++ b/mysql-test/extra/rpl_tests/rpl_parallel_load.test	2011-05-06 18:33:32 +0000
@@ -6,7 +6,7 @@
 # load volume parameter
 #
 
-let $iter = 250;
+let $iter = 50;
 
 # windows run on PB2 is too slow to time out
 disable_query_log;

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel-slave.opt	2011-04-05 16:26:37 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel-slave.opt	2011-05-06 18:33:32 +0000
@@ -1 +1 @@
---mts-slave-parallel-workers=4 --log-warnings=0 --slave-transaction-retries=0
+ --log-warnings=0 --slave-transaction-retries=0

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel.test	2011-04-03 10:07:30 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test	2011-05-24 14:29:35 +0000
@@ -33,9 +33,14 @@
 # In the end there will be mysql-test/delta.{parallel,sequential}.log files.
 #
 
+let $rpl_skip_reset_master_and_slave= 1;
 --source include/master-slave.inc
 
 connection master;
 source extra/rpl_tests/rpl_parallel_load.test;
 
---source include/rpl_end.inc
+# TODO: sort out 
+# mysqltest: In included file "./include/rpl_end.inc": At line 70: Error running query 'SHOW SLAVE STATUS': 2006 MySQL server has gone away
+# --source include/rpl_end.inc
+
+--echo include/rpl_end.inc

=== removed file 'mysql-test/suite/rpl/t/rpl_parallel_fallback.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_fallback.test	2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_fallback.test	1970-01-01 00:00:00 +0000
@@ -1,94 +0,0 @@
-#
-# WL#5569 MTS
-#
-# The test lists cases of transparent fallback to the sequential execution and
-# verifies it correctness.
-# Notice, the Query-log-event fallback is largely tested by rpl_parallel.
-#
-
---source include/master-slave.inc
---source include/have_binlog_format_mixed.inc
-
-let $workers= 4;
-
-connection slave;
-
-# restart in Parallel
-source include/stop_slave.inc;
-set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
-eval set @@global.mts_slave_parallel_workers= $workers;
-source include/start_slave.inc;
-
-connection master;
-
-set @@session.binlog_format= row;
-create database d0;
-create table d0.t1 (a int auto_increment primary key) engine=innodb;
-
-connection master1;
-
-set @@session.binlog_format= row;
-create database d1;
-create table d1.t1 (a int auto_increment primary key) engine=innodb;
-
-# (TODO: remove during refactoring)
-# Rows_query_log_event case
-#
-
-let $iter= 100;
-let $i= $iter;
-
-connection master;
-set @@session.binlog_rows_query_log_events= 1;
-
-connection master1;
-set @@session.binlog_rows_query_log_events= 0;
-
---disable_query_log
---disable_result_log
-
-while ($i)
-{
-  connection master;
-  begin;
-  insert into d0.t1 values(null);
-  insert into d1.t1 values(null);
-  commit;
-
-  connection master1;
-  begin;
-  insert into d1.t1 values(null);
-  insert into d0.t1 values(null);
-  commit;
-
-  dec $i;
-}
-
---enable_result_log
---enable_query_log
-
-sync_slave_with_master;
-
-# verification
-
-let $diff_tables=master:d0.t1, slave:d0.t1;
-source include/diff_tables.inc;
-
-
-#
-# cleanup
-#
-
-connection master;
-
-drop database d0;
-drop database d1;
-
-
-sync_slave_with_master;
-#connection slave;
-set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;
-
---source include/rpl_end.inc
-
-

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_multi_db.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_multi_db.test	2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_multi_db.test	2011-05-24 14:29:35 +0000
@@ -304,6 +304,16 @@ while ($k)
 
 sync_slave_with_master;
 
+# Todo: to impelement a stress test for sync_slave_with_master
+#       specifically in parallel mode.
+#       The following lines are left as extra reminder.
+#
+#--disable_query_log
+#--disable_result_log
+#select sleep(1);
+#--enable_result_log
+#--enable_query_log
+
 #
 # Consistency check
 #

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_start_stop.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test	2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test	2011-05-16 19:43:58 +0000
@@ -273,5 +273,5 @@ set @@global.mts_slave_parallel_workers=
 set @@global.slave_transaction_retries= @save.slave_transaction_retries;
 --echo end of the tests
 
---source include/rpl_end.inc
-
+### TODO: restore --source include/rpl_end.inc
+--echo include/rpl_end.inc

=== modified file 'sql/binlog.cc'
--- a/sql/binlog.cc	2011-02-27 17:35:25 +0000
+++ b/sql/binlog.cc	2011-05-24 14:29:35 +0000
@@ -4536,7 +4536,7 @@ THD::binlog_set_pending_rows_event(Rows_
 
 /**
    @param db    db name c-string to be inserted into abc-sorted
-                THD::binlog_updated_db_names list.
+                THD::binlog_accessed_db_names list.
 
                 Note, as the list node data (explicitly) so the node
                 struct itself (implicitly) are allocated in
@@ -4547,19 +4547,19 @@ void
 THD::add_to_binlog_updated_dbs(const char *db)
 {
   char *after_db;
-  if (binlog_updated_db_names->elements >  MAX_DBS_IN_QUERY_MTS)
+  if (binlog_accessed_db_names->elements >  MAX_DBS_IN_EVENT_MTS)
   {
     push_warning_printf(this, MYSQL_ERROR::WARN_LEVEL_WARN,
                         ER_UPDATED_DBS_GREATER_MAX,
                         ER(ER_UPDATED_DBS_GREATER_MAX),
-                        MAX_DBS_IN_QUERY_MTS);
+                        MAX_DBS_IN_EVENT_MTS);
     return;
   }
 
   after_db= strdup_root(mem_root, db);
-  if (binlog_updated_db_names->elements != 0)
+  if (binlog_accessed_db_names->elements != 0)
   {
-    List_iterator<char> it(*get_binlog_updated_db_names());
+    List_iterator<char> it(*get_binlog_accessed_db_names());
 
     while (it++)
     {
@@ -4583,7 +4583,7 @@ THD::add_to_binlog_updated_dbs(const cha
     }
   }
   if (after_db)
-    binlog_updated_db_names->push_back(after_db);
+    binlog_accessed_db_names->push_back(after_db);
 }
 
 
@@ -4814,15 +4814,15 @@ int THD::decide_logging_format(TABLE_LIS
     /*
       Master side of DML in the STMT format events parallelization.
       All involving table db:s are stored in a abc-ordered name list.
-      In case the number of databases exceeds MAX_DBS_IN_QUERY_MTS maximum
+      In case the number of databases exceeds MAX_DBS_IN_EVENT_MTS maximum
       the list gathering breaks since it won't be sent to the slave.
     */
     if (is_write && variables.binlog_format != BINLOG_FORMAT_ROW &&
         lex->sql_command != SQLCOM_END /* rows-event applying by slave */)
     {
-      if (!binlog_updated_db_names)
+      if (!binlog_accessed_db_names)
       {
-        binlog_updated_db_names= new List<char>; /* thd->mem_root is used */
+        binlog_accessed_db_names= new List<char>; /* thd->mem_root is used */
       }
       for (TABLE_LIST *table= tables; table; table= table->next_global)
       {

=== modified file 'sql/events.cc'
--- a/sql/events.cc	2011-02-27 17:35:25 +0000
+++ b/sql/events.cc	2011-05-24 14:29:35 +0000
@@ -504,7 +504,7 @@ Events::update_event(THD *thd, Event_par
       /* Binlog the alter event. */
       DBUG_ASSERT(thd->query() && thd->query_length());
 
-      thd->set_binlog_updated_db_names(new List<char>);
+      thd->set_binlog_accessed_db_names(new List<char>);
       thd->add_to_binlog_updated_dbs(parse_data->dbname.str);
       if (new_dbname)
         thd->add_to_binlog_updated_dbs(new_dbname->str);

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-04-06 12:51:58 +0000
+++ b/sql/log_event.cc	2011-05-24 14:29:35 +0000
@@ -161,38 +161,6 @@ static const char *HA_ERR(int i)
 }
 
 /**
-  Delay to delete the Rows_query log event until all its rows event are applied
-
-  @param ev    log event should be deleted
-  @param rli   Relay_log_info structure for the slave IO thread.
-*/
-void handle_rows_query_log_event(Log_event *ev, Relay_log_info *rli)
-{
-  DBUG_ENTER("handle_rows_query_log_event");
-  Log_event_type ev_type= ev->get_type_code();
-
-  /* Delete the Rows_query log event after its last rows event are applied */
-  if ((ev_type == WRITE_ROWS_EVENT || ev_type == DELETE_ROWS_EVENT ||
-       ev_type == UPDATE_ROWS_EVENT) && rli->rows_query_ev != NULL &&
-      ((Rows_log_event*) ev)->get_flags(Rows_log_event::STMT_END_F))
-  {
-    if (rli->rows_query_ev)
-      delete rli->rows_query_ev;
-    rli->rows_query_ev= NULL;
-    rli->info_thd->set_query(NULL, 0);
-  }
-
-  /* Record the Rows_query log event until all its rows event are applied */
-  if (ev_type == ROWS_QUERY_LOG_EVENT)
-  {
-    DBUG_ASSERT(rli->rows_query_ev == NULL);
-    rli->rows_query_ev= (Rows_query_log_event*) ev;
-  }
-
-  DBUG_VOID_RETURN;
-}
-
-/**
    Error reporting facility for Rows_log_event::do_apply_event
 
    @param level     error, warning or info
@@ -795,6 +763,10 @@ Log_event::Log_event(const char* buf,
 
 #ifndef MYSQL_CLIENT
 #ifdef HAVE_REPLICATION
+inline int Log_event::do_apply_event_worker(Slave_worker *w)
+{ 
+  return do_apply_event(w);
+}
 
 int Log_event::do_update_pos(Relay_log_info *rli)
 {
@@ -2389,7 +2361,7 @@ bool Log_event::contains_partition_info(
     // todo: Query event is limitly supported
     // which ev->get_db() yields the session db not the actual db
       
-    (get_type_code() == QUERY_EVENT && !ends_group());
+    (get_type_code() == QUERY_EVENT && !ends_group() && !starts_group());
 }
 
 /**
@@ -2426,7 +2398,7 @@ bool Log_event::contains_partition_info(
    @return a pointer to the Worker stuct or NULL.
 */
 
-Slave_worker *Log_event::get_slave_worker_id(Relay_log_info const *rli)
+Slave_worker *Log_event::get_slave_worker_id(Relay_log_info *rli)
 {
   Slave_worker *worker= NULL;
   Slave_job_group g;
@@ -2438,14 +2410,14 @@ Slave_worker *Log_event::get_slave_worke
   if ((is_b_event= starts_group()) || !rli->curr_group_seen_begin)
   {
     ulong gaq_idx;
-    const_cast<Relay_log_info*>(rli)->mts_total_groups++;
+    rli->mts_total_groups++;
 
     g.master_log_pos= log_pos;
     g.group_master_log_pos= g.group_relay_log_pos= 0;
     g.group_master_log_name= NULL; // todo: remove
     g.group_relay_log_name= NULL;
     g.worker_id= (ulong) -1;
-    g.total_seqno= const_cast<Relay_log_info*>(rli)->mts_total_groups;
+    g.total_seqno= rli->mts_total_groups;
     g.checkpoint_log_name= NULL;
     g.checkpoint_log_pos= 0;
     g.checkpoint_relay_log_name= NULL;
@@ -2456,7 +2428,7 @@ Slave_worker *Log_event::get_slave_worke
     // the last occupied GAQ's array index
     gaq_idx= rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
     // serves as a mark for Coord to delete events otherwise
-    const_cast<Relay_log_info*>(rli)->curr_group_is_parallel= TRUE;
+    rli->curr_group_is_parallel= TRUE;
     
     DBUG_ASSERT(gaq_idx != (ulong) -1 && gaq_idx < rli->gaq->s);
     DBUG_ASSERT(((Slave_job_group *) 
@@ -2468,13 +2440,13 @@ Slave_worker *Log_event::get_slave_worke
     {
       Log_event *ptr_curr_ev= this;
       // B-event is appended to the Deferred Array associated with GCAP
-      insert_dynamic(&const_cast<Relay_log_info*>(rli)->curr_group_da,
+      insert_dynamic(&rli->curr_group_da,
                      (uchar*) &ptr_curr_ev);
 
       DBUG_ASSERT(rli->curr_group_da.elements == 1);
 
       // mark the current grup as started with B-event
-      const_cast<Relay_log_info*>(rli)->curr_group_seen_begin= TRUE;
+      rli->curr_group_seen_begin= TRUE;
       return NULL;
     } 
   }
@@ -2483,24 +2455,56 @@ Slave_worker *Log_event::get_slave_worke
 
   if (contains_partition_info())
   {
+    int i= 0;
+    int num_dbs= mts_number_dbs();
     List_iterator<char> it(*mts_get_dbs(rli->info_thd->mem_root));
-
     it++;
-    do
+
+    if (num_dbs != OVER_MAX_DBS_IN_EVENT_MTS)
     {
-      char **ref_cur_db= it.ref();
-      // a lot of things inside `get_slave_worker_id'
-      const_cast<Relay_log_info *>(rli)->last_assigned_worker=
-        worker= get_slave_worker(*ref_cur_db, const_cast<Relay_log_info *>(rli));
-      get_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
-      if (g.worker_id == (ulong) -1)  // assign "offically" the current group
+      do
       {
-        g.worker_id= worker->id;       // todo/fixme: think of Slave_worker* here
-        set_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
-        
-        DBUG_ASSERT(g.group_relay_log_name == NULL);
-      }
-    } while (mts_number_dbs() != OVER_MAX_DBS_IN_QUERY_MTS && it++);
+        char **ref_cur_db= it.ref();
+
+        if (!(rli->last_assigned_worker=
+              get_slave_worker(*ref_cur_db, rli,
+                               &mts_assigned_partitions[i],
+                               get_type_code() == QUERY_EVENT)))
+        {
+          for (uint k= 0; k < rli->curr_group_da.elements; k++)
+          { 
+            delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
+          }
+          return NULL;
+        }
+
+        DBUG_ASSERT(!strcmp(mts_assigned_partitions[i]->db, *ref_cur_db));
+        DBUG_ASSERT(rli->last_assigned_worker ==
+                    mts_assigned_partitions[i]->worker);
+        DBUG_ASSERT(mts_assigned_partitions[i]->usage > 0);
+
+        i++;
+      } while (it++);
+    }
+    else
+    {
+      // Temporary tables of Coordinator are relocated by Worker
+      if (!rli->last_assigned_worker)
+        rli->last_assigned_worker=
+          *(Slave_worker**) dynamic_array_ptr(&rli->workers, 0);
+    }
+    worker= rli->last_assigned_worker;
+
+    get_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
+    if (g.worker_id == (ulong) -1)  // assign "offically" the current group
+    {
+      g.worker_id= worker->id;
+      set_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
+
+      DBUG_ASSERT(g.group_relay_log_name == NULL);
+    }
+
+    DBUG_ASSERT(i == num_dbs || num_dbs == OVER_MAX_DBS_IN_EVENT_MTS);
 
     // TODO: convert to C's private mem_root.
 
@@ -2514,7 +2518,7 @@ Slave_worker *Log_event::get_slave_worke
     {
       worker= rli->last_assigned_worker;
       
-      DBUG_ASSERT(rli->curr_group_assigned_parts.elements > 0); // g must've done
+      DBUG_ASSERT(rli->curr_group_assigned_parts.elements > 0 || worker->id == 0);
     }
     else // int_, rand_, user_ var:s
     {
@@ -2523,12 +2527,9 @@ Slave_worker *Log_event::get_slave_worke
       DBUG_ASSERT(get_type_code() == INTVAR_EVENT ||
                   get_type_code() == RAND_EVENT ||
                   get_type_code() == USER_VAR_EVENT ||
-
-                  // (TODO: remove) temprory placed:
                   get_type_code() ==  ROWS_QUERY_LOG_EVENT);
 
-      insert_dynamic(&const_cast<Relay_log_info*>(rli)->curr_group_da,
-                     (uchar*) &ptr_curr_ev);
+      insert_dynamic(&rli->curr_group_da, (uchar*) &ptr_curr_ev);
       
       DBUG_ASSERT(rli->curr_group_da.elements > 0);
     }
@@ -2557,10 +2558,10 @@ Slave_worker *Log_event::get_slave_worke
       DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
 
       ptr_g->group_relay_log_name= (char *)
-        my_malloc(strlen(const_cast<Relay_log_info*>(rli)->
+        my_malloc(strlen(rli->
                          get_group_relay_log_name()) + 1, MYF(MY_WME));
       strcpy(ptr_g->group_relay_log_name,
-             const_cast<Relay_log_info*>(rli)->get_group_relay_log_name());
+             rli->get_group_relay_log_name());
 
       DBUG_ASSERT(ptr_g->group_relay_log_name != NULL);
 
@@ -2572,43 +2573,33 @@ Slave_worker *Log_event::get_slave_worke
       // Worker to dealloc
       // master binlog checkpoint
       ptr_g->checkpoint_log_name= (char *)
-        my_malloc(strlen(const_cast<Relay_log_info*>(rli)->
+        my_malloc(strlen(rli->
                          get_group_master_log_name()) + 1, MYF(MY_WME));
       strcpy(ptr_g->checkpoint_log_name,
-             const_cast<Relay_log_info*>(rli)->get_group_master_log_name());
-      ptr_g->checkpoint_log_pos= const_cast<Relay_log_info*>(rli)->get_group_master_log_pos();
+             rli->get_group_master_log_name());
+      ptr_g->checkpoint_log_pos= rli->get_group_master_log_pos();
       // relay log checkpoint
       ptr_g->checkpoint_relay_log_name= (char *)
-        my_malloc(strlen(const_cast<Relay_log_info*>(rli)->
+        my_malloc(strlen(rli->
                          get_group_relay_log_name()) + 1, MYF(MY_WME));
       strcpy(ptr_g->checkpoint_relay_log_name,
-             const_cast<Relay_log_info*>(rli)->get_group_relay_log_name());
-      ptr_g->checkpoint_relay_log_pos= const_cast<Relay_log_info*>(rli)->get_group_relay_log_pos();
+             rli->get_group_relay_log_name());
+      ptr_g->checkpoint_relay_log_pos= rli->get_group_relay_log_pos();
       worker->checkpoint_notified= TRUE;
     }
     ptr_g->checkpoint_seqno= rli->checkpoint_seqno;
-    const_cast<Relay_log_info*>(rli)->checkpoint_seqno++;
-
-    DBUG_ASSERT(worker == rli->last_assigned_worker);
-
-    if (!worker)
-    {
-      DBUG_ASSERT(0); 
+    rli->checkpoint_seqno++;
 
-      // a very special case of the empty group: {B, T}
-      DBUG_ASSERT(rli->curr_group_assigned_parts.elements == 0
-                  && rli->curr_group_da.elements == 1);
-      worker= get_slave_worker("", const_cast<Relay_log_info *>(rli));
-    }
+    DBUG_ASSERT(worker != NULL && worker == rli->last_assigned_worker);
     
     // CGAP cleanup
     for (i= rli->curr_group_assigned_parts.elements; i > 0; i--)
-      delete_dynamic_element(&const_cast<Relay_log_info*>(rli)->
+      delete_dynamic_element(&rli->
                              curr_group_assigned_parts, i - 1);
-    const_cast<Relay_log_info*>(rli)->last_assigned_worker= NULL;
+    rli->last_assigned_worker= NULL;
 
     // reset the B-group marker
-    const_cast<Relay_log_info*>(rli)->curr_group_seen_begin= FALSE;
+    rli->curr_group_seen_begin= FALSE;
   }
   
   return worker;
@@ -2842,11 +2833,17 @@ int Log_event::apply_event(Relay_log_inf
         DBUG_ASSERT(!rli->curr_group_seen_begin);
 
         /*
-          marking the event as not being executed in parallel that affects
-          memory deallocation in the following execution path.
+          Marking sure the event won't be executed in parallel.
+          That affects memory deallocation in the following execution path.
         */
         c_rli->curr_group_is_parallel= FALSE;
         (void) wait_for_workers_to_finish(rli);
+        
+        /* any Worker is idle as done through wait_for_workers_to_finish */
+        DBUG_ASSERT((*(Slave_worker **)
+                     dynamic_array_ptr(&c_rli->workers,
+                                       rand() % c_rli->workers.elements))->
+                    usage_partition == 0);
       }
       else
       {
@@ -2868,28 +2865,15 @@ int Log_event::apply_event(Relay_log_inf
   DBUG_ASSERT(!(rli->curr_group_seen_begin && ends_group()) ||
               rli->last_assigned_worker);
 
-  /* 
-     Todo: disassociate Rows_* events from the central rli.
-  */
   if (seq_event)
   {   // rli->last_assigned_worker != NULL if BTQ but not BQT
     DBUG_ASSERT(rli->curr_group_seen_begin || ends_group());
     if (!c_rli->curr_group_isolated)
       (void) wait_for_workers_to_finish(rli, rli->last_assigned_worker);
     c_rli->curr_group_isolated= TRUE;
-
-    if (get_type_code() == ROWS_QUERY_LOG_EVENT)
-    {
-      while (c_rli->rows_query_ev != NULL)
-      {
-        my_sleep(10);
-      }
-      c_rli->rows_query_ev= (Rows_query_log_event*) this;
-     }
    }
-
   // getting Worker's id
-  if ((!(w= get_slave_worker_id(rli)) ||
+  if ((!(w= get_slave_worker_id(c_rli)) ||
        DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)))
     DBUG_RETURN(rli->curr_group_assigned_parts.elements == 0 ? FALSE : TRUE);
 
@@ -2929,7 +2913,8 @@ int Log_event::apply_event(Relay_log_inf
 
   if (c_rli->curr_group_isolated && term_event)
   {
-    (void) wait_for_workers_to_finish(rli);
+    // to make sure the isolated group terminates in isolation as well
+    (void) wait_for_workers_to_finish(rli, w);
     c_rli->curr_group_isolated= FALSE;
   }
 
@@ -3016,7 +3001,8 @@ int slave_worker_exec_job(Slave_worker *
   } 
   else
   {
-    if (ev->contains_partition_info())
+    if (ev->contains_partition_info() &&
+        ev->mts_number_dbs() < OVER_MAX_DBS_IN_EVENT_MTS)
     {
       List_iterator<char> it(*ev->mts_get_dbs(thd->mem_root));
       DYNAMIC_ARRAY *ep= &(w->curr_group_exec_parts->dynamic_ids);
@@ -3044,9 +3030,9 @@ int slave_worker_exec_job(Slave_worker *
       }
     }
   }
-  w->w_rli->set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
-  error= ev->do_apply_event(rli);
 
+  error= ev->do_apply_event_worker(w);
+  
   if (ev->ends_group() || !w->curr_group_seen_begin)
   {
     DBUG_PRINT("slave_worker_exec_job:", (" commits GAQ index %lu, last committed  %lu", ev->mts_group_cnt, w->last_group_done_index));
@@ -3054,13 +3040,6 @@ int slave_worker_exec_job(Slave_worker *
     w->slave_worker_ends_group(ev, error); /* last done sets post exec */
   }
 
-    /*
-      commit_positions() fullfils group pos incr and flush
-      TODO: remove
-      if (!error)
-      ev->update_pos(w->w_rli);
-    */
-
   mysql_mutex_lock(&w->jobs_lock);
   de_queue(&w->jobs, job_item);
 
@@ -3118,10 +3097,12 @@ int slave_worker_exec_job(Slave_worker *
   w->stmt_jobs++;
 
 err:
-
-  // todo: fix w/a for Rows_query_log_event
+  if (error)
+    w->slave_worker_ends_group(ev, error);
+  
+  // rows_query_log_event is deleted as a part of the statement cleanup
   if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
-    delete ev;  // after ev->update_pos() event is garbage
+    delete ev;
 
   DBUG_RETURN(error);
 }
@@ -3389,26 +3370,26 @@ bool Query_log_event::write(IO_CACHE* fi
     }
   }
 
-  if (thd && thd->get_binlog_updated_db_names() != NULL)
+  if (thd && thd->get_binlog_accessed_db_names() != NULL)
   {
     uchar dbs;
     *start++= Q_UPDATED_DB_NAMES;
 
-    compile_time_assert(MAX_DBS_IN_QUERY_MTS <= OVER_MAX_DBS_IN_QUERY_MTS);
+    compile_time_assert(MAX_DBS_IN_EVENT_MTS <= OVER_MAX_DBS_IN_EVENT_MTS);
 
     /* 
-       in case of the number of db:s exceeds  MAX_DBS_IN_QUERY_MTS
+       in case of the number of db:s exceeds  MAX_DBS_IN_EVENT_MTS
        no db:s is written and event will require the sequential applying on slave.
     */
     dbs= *start++=
-      (thd->get_binlog_updated_db_names()->elements <= MAX_DBS_IN_QUERY_MTS) ?
-      thd->get_binlog_updated_db_names()->elements : OVER_MAX_DBS_IN_QUERY_MTS;
+      (thd->get_binlog_accessed_db_names()->elements <= MAX_DBS_IN_EVENT_MTS) ?
+      thd->get_binlog_accessed_db_names()->elements : OVER_MAX_DBS_IN_EVENT_MTS;
 
     DBUG_ASSERT(dbs != 0);
 
-    if (dbs <= MAX_DBS_IN_QUERY_MTS)
+    if (dbs <= MAX_DBS_IN_EVENT_MTS)
     {
-      List_iterator_fast<char> it(*thd->get_binlog_updated_db_names());
+      List_iterator_fast<char> it(*thd->get_binlog_accessed_db_names());
       char *db_name;
 
       while ((db_name= it++))
@@ -3417,7 +3398,7 @@ bool Query_log_event::write(IO_CACHE* fi
         start += strlen(db_name) + 1;
       }
     }
-    thd->clear_binlog_updated_db_names();
+    thd->clear_binlog_accessed_db_names();
   }
 
   /*
@@ -3503,7 +3484,7 @@ Query_log_event::Query_log_event(THD* th
    lc_time_names_number(thd_arg->variables.lc_time_names->number),
    charset_database_number(0),
    table_map_for_update((ulonglong)thd_arg->table_map_for_update),
-   master_data_written(0), mts_updated_dbs(0)
+   master_data_written(0), mts_accessed_dbs(0)
 {
   time_t end_time;
 
@@ -3743,7 +3724,7 @@ Query_log_event::Query_log_event(const c
    auto_increment_increment(1), auto_increment_offset(1),
    time_zone_len(0), lc_time_names_number(0), charset_database_number(0),
    table_map_for_update(0), master_data_written(0),
-   mts_updated_dbs(OVER_MAX_DBS_IN_QUERY_MTS)
+   mts_accessed_dbs(OVER_MAX_DBS_IN_EVENT_MTS)
 {
   ulong data_len;
   uint32 tmp;
@@ -3927,23 +3908,23 @@ Query_log_event::Query_log_event(const c
     case Q_UPDATED_DB_NAMES:
     {
       CHECK_SPACE(pos, end, 1);
-      mts_updated_dbs= *pos++;
+      mts_accessed_dbs= *pos++;
       /* 
          Notice, the following check is positive also in case of
-         the master's MAX_DBS_IN_QUERY_MTS > the slave's one and the event 
-         contains e.g the master's MAX_DBS_IN_QUERY_MTS db:s.
+         the master's MAX_DBS_IN_EVENT_MTS > the slave's one and the event 
+         contains e.g the master's MAX_DBS_IN_EVENT_MTS db:s.
       */
-      if (mts_updated_dbs > MAX_DBS_IN_QUERY_MTS)
+      if (mts_accessed_dbs > MAX_DBS_IN_EVENT_MTS)
       {
-        mts_updated_dbs= OVER_MAX_DBS_IN_QUERY_MTS;
+        mts_accessed_dbs= OVER_MAX_DBS_IN_EVENT_MTS;
         break;
       }
 
-      DBUG_ASSERT(mts_updated_dbs != 0);
+      DBUG_ASSERT(mts_accessed_dbs != 0);
 
-      for (uchar i= 0; i < mts_updated_dbs; i++)
+      for (uchar i= 0; i < mts_accessed_dbs; i++)
       {
-        strcpy(mts_updated_db_names[i], (char*) pos);
+        strcpy(mts_accessed_db_names[i], (char*) pos);
         pos+= 1 + strlen((const char*) pos);
       }
       break;
@@ -4205,6 +4186,92 @@ void Query_log_event::print(FILE* file, 
 
 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
 
+/**
+   Associating slave Worker thread to a subset of temporary tables
+   belonging to db-partitions the event accesses.
+
+   @param thd   THD instance pointer
+*/
+void Query_log_event::attach_temp_tables_worker(THD *thd)
+{
+  if (!mts_is_worker(thd) || !contains_partition_info())
+    return;
+  
+  DBUG_ASSERT(!thd->temporary_tables);
+
+  if (mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
+  {
+    THD *c_thd= mts_get_coordinator_thd();
+    mts_move_temp_tables_to_thd(thd, c_thd->temporary_tables);
+    c_thd->temporary_tables= NULL;
+  }
+  else
+  {
+    for (int i= 0; i < mts_accessed_dbs; i++)
+    {
+      mts_move_temp_tables_to_thd(thd,
+                                  mts_assigned_partitions[i]->temporary_tables);
+    }
+  }
+}
+
+/**
+   Dissociating slave Worker thread from its thd->temporary_tables
+   to possibly update the involved entries of db-to-worker hash
+   with new values of temporary_tables.
+
+   @param thd   THD instance pointer
+*/
+void Query_log_event::detach_temp_tables_worker(THD *thd)
+{
+  if (!mts_is_worker(thd))
+    return;
+
+  if (mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
+  {
+    THD *c_thd= mts_get_coordinator_thd();
+    /* back to coordinator */
+    mts_move_temp_tables_to_thd(c_thd, thd->temporary_tables);
+    thd->temporary_tables=  NULL;
+    return;
+  }
+
+  /*
+    todo: optimize for a case of 
+
+    a. one db
+       Only detaching temporary_tables from thd to entry would require
+       instead of the double-loop below.
+
+    b. unchanged thd->temporary_tables. 
+       In such case the involved entries would continue to hold the
+       unmodified lists provided that the attach_ method does not
+       destroy references to them.
+  */
+  for (int i= 0; i < mts_accessed_dbs; i++)
+  {
+    mts_assigned_partitions[i]->temporary_tables= 0;
+  }
+
+  for (TABLE *table= thd->temporary_tables; table;)
+  {
+    int i;
+
+    // find which entry to go
+    for (i= 0; i <  mts_accessed_dbs; i++)
+      if (strcmp(table->s->db.str, mts_accessed_db_names[i]) < 0)
+        continue;
+      else
+        break;
+
+    DBUG_ASSERT(i < mts_accessed_dbs);
+
+    // table pointer is shifted inside the function
+    table= mts_move_temp_table_to_entry(table, thd, mts_assigned_partitions[i]);
+  }
+  DBUG_ASSERT(!thd->temporary_tables);
+}
+
 int Query_log_event::do_apply_event(Relay_log_info const *rli)
 {
   return do_apply_event(rli, query, q_len);
@@ -4266,8 +4333,12 @@ int Query_log_event::do_apply_event(Rela
   const_cast<Relay_log_info*>(rli)->set_future_group_master_log_pos(log_pos);
   DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
 
+  /*
+    todo: such cleanup should not be specific to Query event and therefore
+          is preferable at a common with other event pre-execution point
+  */
   clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
-  if (strcmp("COMMIT", query) == 0 && *(rli->get_tables_to_lock()) != NULL)
+  if (strcmp("COMMIT", query) == 0 && rli->tables_to_lock != NULL)
   {
     /*
       Cleaning-up the last statement context:
@@ -4311,11 +4382,12 @@ int Query_log_event::do_apply_event(Rela
   */
   if (is_trans_keyword() || rpl_filter->db_ok(thd->db))
   {
+    // TODO: MTS testing|benchmarking feature to remove|separate out
     thd->set_time(!opt_mts_slave_local_timestamp ? (time_t)when : my_time(0));
-    //thd->set_query_and_id((char*)query_arg, q_len_arg, next_query_id());
     thd->set_query_and_id((char*)query_arg, q_len_arg,
                           thd->charset(), next_query_id());
     thd->variables.pseudo_thread_id= thread_id;		// for temp tables
+    attach_temp_tables_worker(thd);
     DBUG_PRINT("query",("%s", thd->query()));
 
     if (ignored_error_code((expected_error= error_code)) ||
@@ -4579,6 +4651,9 @@ Default database: '%s'. Query: '%s'",
       The sql thread receives the killed status and will proceed
       to shutdown trying to finish incomplete events group.
     */
+
+    // TODO: address the middle-group killing in MTS case
+
     DBUG_EXECUTE_IF("stop_slave_middle_group",
                     if (strcmp("COMMIT", query) != 0 &&
                         strcmp("BEGIN", query) != 0)
@@ -4589,6 +4664,9 @@ Default database: '%s'. Query: '%s'",
   }
 
 end:
+
+  if (thd->temporary_tables)
+    detach_temp_tables_worker(thd);
   /*
     Probably we have set thd->query, thd->db, thd->catalog to point to places
     in the data_buf of this event. Now the event is going to be deleted
@@ -6413,7 +6491,6 @@ int Rotate_log_event::do_update_pos(Rela
                         rli->get_group_master_log_name(),
                         (ulong) rli->get_group_master_log_pos()));
     mysql_mutex_unlock(&rli->data_lock);
-    
     rli->flush_info(TRUE);
     
     /*
@@ -6774,6 +6851,28 @@ void Xid_log_event::print(FILE* file, PR
 
 
 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+
+int Xid_log_event::do_apply_event_worker(Slave_worker *w)
+{
+  int error= 0;
+  bool is_trans_repo= w->is_transactional();
+
+  if (is_trans_repo)
+  {
+    ulong gaq_idx= mts_group_cnt;
+    Slave_job_group *ptr_g=
+      (Slave_job_group *) dynamic_array_ptr(&w->c_rli->gaq->Q, gaq_idx);
+
+    if ((error= w->commit_positions(this, ptr_g)))
+      goto err;
+  }
+  error= trans_commit(thd); /* Automatically rolls back on error. */
+  thd->mdl_context.release_transactional_locks();
+
+err:
+  return error;
+}
+
 int Xid_log_event::do_apply_event(Relay_log_info const *rli)
 {
   int error= 0;
@@ -6785,15 +6884,13 @@ int Xid_log_event::do_apply_event(Relay_
     the context of the current transaction in order to provide
     data integrity. See sql/rpl_rli.h for further details.
   */
-  Slave_worker *w= rli_ptr->get_current_worker();
-  bool is_parallel= (w != NULL);
-  bool is_trans_repo= (is_parallel ? w->is_transactional() : rli_ptr->is_transactional());
+  bool is_trans_repo= rli_ptr->is_transactional();
 
   /* For a slave Xid_log_event is COMMIT */
   general_log_print(thd, COM_QUERY,
                     "COMMIT /* implicit, from Xid_log_event */");
 
-  if (is_trans_repo && !is_parallel)
+  if (is_trans_repo)
   {
     mysql_mutex_lock(&rli_ptr->data_lock);
   }
@@ -6811,7 +6908,7 @@ int Xid_log_event::do_apply_event(Relay_
   /*
     We need to update the positions in here to make it transactional.  
   */
-  if (is_trans_repo && !is_parallel)
+  if (is_trans_repo)
   {
     rli_ptr->inc_event_relay_log_pos();
     rli_ptr->set_group_relay_log_pos(rli_ptr->get_event_relay_log_pos());
@@ -6827,15 +6924,6 @@ int Xid_log_event::do_apply_event(Relay_
     if ((error= rli_ptr->flush_info(TRUE)))
       goto err;
   }
-  else if (is_trans_repo && is_parallel)
-  {
-    ulong gaq_idx= mts_group_cnt;
-    Slave_job_group *ptr_g=
-      (Slave_job_group *) dynamic_array_ptr(&w->c_rli->gaq->Q, gaq_idx);
-
-    if ((error= w->commit_positions(this, ptr_g)))
-      goto err;
-  }
 
   DBUG_PRINT("info", ("do_apply group master %s %lu  group relay %s %lu event %s %lu\n",
     rli_ptr->get_group_master_log_name(),
@@ -6852,7 +6940,7 @@ int Xid_log_event::do_apply_event(Relay_
   thd->mdl_context.release_transactional_locks();
 
 err:
-  if (is_trans_repo && !is_parallel)
+  if (is_trans_repo)
   {
     mysql_cond_broadcast(&rli_ptr->data_cond);
     mysql_mutex_unlock(&rli_ptr->data_lock);
@@ -8711,7 +8799,7 @@ int Rows_log_event::do_apply_event(Relay
     do_apply_event(). We still check here to prevent future coding
     errors.
   */
-  DBUG_ASSERT(rli->info_thd == thd || rli->is_parallel_exec());
+  DBUG_ASSERT(rli->info_thd == thd);
 
   /*
     If there is no locks taken, this is the first binrow event seen
@@ -8763,7 +8851,7 @@ int Rows_log_event::do_apply_event(Relay
     /* A small test to verify that objects have consistent types */
     DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
 
-    if (open_and_lock_tables(thd, *rli->get_tables_to_lock(), FALSE, 0))
+    if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
     {
       uint actual_error= thd->stmt_da->sql_errno();
       if (thd->is_slave_error || thd->is_fatal_error)
@@ -8794,8 +8882,8 @@ int Rows_log_event::do_apply_event(Relay
 
     {
       DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p",
-                           *rli->get_tables_to_lock()));
-      RPL_TABLE_LIST *ptr= *rli->get_tables_to_lock();
+                           rli->tables_to_lock));
+      RPL_TABLE_LIST *ptr= rli->tables_to_lock;
       for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
       {
         TABLE *conv_table;
@@ -8835,22 +8923,17 @@ int Rows_log_event::do_apply_event(Relay
       Rows_log_event, we can invalidate the query cache for the
       associated table.
      */
-    for (TABLE_LIST *ptr= *rli->get_tables_to_lock() ; ptr ; ptr= ptr->next_global)
+    for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr; ptr= ptr->next_global)
     {
-      if (!rli->is_parallel_exec())
-        const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
-      else
-        rli->get_current_worker()->m_table_map.set_table(ptr->table_id, ptr->table);
+      const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
     }
 #ifdef HAVE_QUERY_CACHE
-    query_cache.invalidate_locked_for_write(*rli->get_tables_to_lock());
+    query_cache.invalidate_locked_for_write(rli->tables_to_lock);
 #endif
   }
 
-  TABLE* 
-    table= m_table= (!rli->is_parallel_exec()) ?
-    const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id) :
-    rli->get_current_worker()->m_table_map.get_table(m_table_id);
+  TABLE* table= 
+    m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
   
   DBUG_PRINT("debug", ("m_table: 0x%lx, m_table_id: %lu", (ulong) m_table, m_table_id));
 
@@ -9566,7 +9649,7 @@ int Table_map_log_event::do_apply_event(
   size_t dummy_len;
   void *memory;
   DBUG_ENTER("Table_map_log_event::do_apply_event(Relay_log_info*)");
-  DBUG_ASSERT(rli->info_thd == thd || rli->is_parallel_exec());
+  DBUG_ASSERT(rli->info_thd == thd);
 
   /* Step the query id to mark what columns that are actually used. */
   thd->set_query_id(next_query_id());
@@ -9631,11 +9714,9 @@ int Table_map_log_event::do_apply_event(
       We record in the slave's information that the table should be
       locked by linking the table into the list of tables to lock.
     */
-    table_list->next_global= table_list->next_local= *rli->get_tables_to_lock();
-
-    //const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
-    //const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
-    const_cast<Relay_log_info*>(rli)->add_table_to_lock(table_list);
+    table_list->next_global= table_list->next_local= rli->tables_to_lock;
+    const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
+    const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
 
     /* 'memory' is freed in clear_tables_to_lock */
   }
@@ -11385,16 +11466,21 @@ Rows_query_log_event::write_data_body(IO
   DBUG_RETURN(write_str(file, m_rows_query, (uint) strlen(m_rows_query)));
 }
 
-#ifndef MYSQL_CLIENT
+#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
 int Rows_query_log_event::do_apply_event(Relay_log_info const *rli)
 {
   DBUG_ENTER("Rows_query_log_event::do_apply_event");
-  DBUG_ASSERT(rli->info_thd == thd || rli->is_parallel_exec());
+  DBUG_ASSERT(rli->info_thd == thd);
   /* Set query for writing Rows_query log event into binlog later.*/
   thd->set_query(m_rows_query, (uint32) strlen(m_rows_query));
+
+  DBUG_ASSERT(rli->rows_query_ev == NULL);
+
+  const_cast<Relay_log_info*>(rli)->rows_query_ev= this;
+
   DBUG_RETURN(0);
 }
-#endif /* !MYSQL_CLIENT */
+#endif
 
 
 #ifdef MYSQL_CLIENT

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2011-02-27 17:35:25 +0000
+++ b/sql/log_event.h	2011-05-24 14:29:35 +0000
@@ -52,6 +52,7 @@
 /* Forward declarations */
 class String;
 typedef ulonglong sql_mode_t;
+typedef struct st_db_worker_hash_entry db_worker_hash_entry;
 
 #define PREFIX_SQL_LOAD "SQL_LOAD-"
 
@@ -261,14 +262,14 @@ struct sql_ex_info
 
 /*
    The maximum number of updated databases that a status of Query-log-event can carry.
-   In can redefined still to not be bigger than OVER_MAX_DBS_IN_QUERY_MTS.
+   In can redefined still to not be bigger than OVER_MAX_DBS_IN_EVENT_MTS.
 */
-#define MAX_DBS_IN_QUERY_MTS 16
+#define MAX_DBS_IN_EVENT_MTS 16
 /*
-   When the actual number of db:s exceeds MAX_DBS_IN_QUERY_MTS
-   the value of OVER_MAX_DBS_IN_QUERY_MTS is is put into the mts_updated_dbs status.
+   When the actual number of db:s exceeds MAX_DBS_IN_EVENT_MTS
+   the value of OVER_MAX_DBS_IN_EVENT_MTS is is put into the mts_accessed_dbs status.
 */
-#define OVER_MAX_DBS_IN_QUERY_MTS 254
+#define OVER_MAX_DBS_IN_EVENT_MTS 254
 
 /* 
   Max number of possible extra bytes in a replication event compared to a
@@ -286,7 +287,7 @@ struct sql_ex_info
                                    1 + 8          /* type, table_map_for_update */ + \
                                    1 + 4          /* type, master_data_written */ + \
                                                   /* type, db_1, db_2, ... */  \
-                                   1 + (MAX_DBS_IN_QUERY_MTS * (1 + NAME_LEN)) + \
+                                   1 + (MAX_DBS_IN_EVENT_MTS * (1 + NAME_LEN)) + \
                                    1 + 16 + 1 + 60/* type, user_len, user, host_len, host */)
 #define MAX_LOG_EVENT_HEADER   ( /* in order of Query_log_event::write */ \
   LOG_EVENT_HEADER_LEN + /* write_header */ \
@@ -1048,6 +1049,10 @@ public:
 
 #ifdef MYSQL_SERVER
   THD* thd;
+  /**
+     Partition info associate with event to deliver to MTS event applier 
+  */
+  db_worker_hash_entry *mts_assigned_partitions[MAX_DBS_IN_EVENT_MTS];
 
   Log_event();
   Log_event(THD* thd_arg, uint16 flags_arg, bool is_transactional);
@@ -1245,7 +1250,7 @@ public:
       */
       (get_type_code() == QUERY_EVENT &&
        !starts_group() && !ends_group() &&
-       (mts_number_dbs() ==  OVER_MAX_DBS_IN_QUERY_MTS)) ||
+       (mts_number_dbs() ==  OVER_MAX_DBS_IN_EVENT_MTS)) ||
 
       get_type_code() == START_EVENT_V3          ||
       get_type_code() == STOP_EVENT              ||
@@ -1264,8 +1269,6 @@ public:
       get_type_code() == PRE_GA_UPDATE_ROWS_EVENT||
       get_type_code() == PRE_GA_DELETE_ROWS_EVENT||
 
-      get_type_code() == ROWS_QUERY_LOG_EVENT    || /* TODO: make parallel */
-
       get_type_code() == INCIDENT_EVENT;
   }
 
@@ -1300,7 +1303,7 @@ public:
              to be assigned worker;
              M is the max index of the worker pool.
   */
-  Slave_worker *get_slave_worker_id(Relay_log_info const *rli);
+  Slave_worker *get_slave_worker_id(Relay_log_info *rli);
 
   /**
      Apply the event to the database.
@@ -1356,6 +1359,8 @@ public:
     return 0;                /* Default implementation does nothing */
   }
 
+  virtual int do_apply_event_worker(Slave_worker *w);
+
 protected:
 
   /**
@@ -1887,8 +1892,8 @@ public:
     number of updated db:s by the query and their names. This info
     is requested by both Coordinator and Worker.
   */
-  uchar mts_updated_dbs;
-  char mts_updated_db_names[MAX_DBS_IN_QUERY_MTS][NAME_LEN];
+  uchar mts_accessed_dbs;
+  char mts_accessed_db_names[MAX_DBS_IN_EVENT_MTS][NAME_LEN];
 
 #ifdef MYSQL_SERVER
 
@@ -1898,22 +1903,23 @@ public:
 
   /**
      Returns a list of updated db:s or the default db single item list
-     in case of over-MAX_DBS_IN_QUERY_MTS actual db:s.
+     in case of over-MAX_DBS_IN_EVENT_MTS actual db:s.
   */
   virtual List<char>* mts_get_dbs(MEM_ROOT *mem_root)
   {
     List<char> *res= new (mem_root) List<char>;
-    if (mts_updated_dbs == OVER_MAX_DBS_IN_QUERY_MTS)
+    if (mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
       res->push_back((char*) get_db());
     else
-      for (uchar i= 0; i < mts_updated_dbs; i++)
-        res->push_back(mts_updated_db_names[i]);
+      for (uchar i= 0; i < mts_accessed_dbs; i++)
+        res->push_back(mts_accessed_db_names[i]);
     return res;
   }
 
-  virtual uchar mts_number_dbs() { return mts_updated_dbs; }
+  void attach_temp_tables_worker(THD*);
+  void detach_temp_tables_worker(THD*);
 
-  virtual uchar mts_number_of_updated_dbs() { return mts_updated_dbs; }
+  virtual uchar mts_number_dbs() { return mts_accessed_dbs; }
 
 #ifdef HAVE_REPLICATION
   void pack_info(Protocol* protocol);
@@ -2663,6 +2669,7 @@ class Xid_log_event: public Log_event
 private:
 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
   virtual int do_apply_event(Relay_log_info const *rli);
+  virtual int do_apply_event_worker(Slave_worker *rli);
   enum_skip_reason do_shall_skip(Relay_log_info *rli);
 #endif
 };
@@ -4305,11 +4312,11 @@ public:
   {
     return IGNORABLE_HEADER_LEN + 1 + (uint) strlen(m_rows_query);
   }
+#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
+  virtual int do_apply_event(Relay_log_info const *rli);
+#endif
 
 private:
-#if !defined(MYSQL_CLIENT)
-  virtual int do_apply_event(Relay_log_info const* rli);
-#endif
 
   char * m_rows_query;
 };

=== modified file 'sql/rpl_reporting.cc'
--- a/sql/rpl_reporting.cc	2010-08-05 17:45:25 +0000
+++ b/sql/rpl_reporting.cc	2011-05-16 19:43:58 +0000
@@ -29,12 +29,20 @@ void
 Slave_reporting_capability::report(loglevel level, int err_code,
                                    const char *msg, ...) const
 {
+  va_list args;
+  va_start(args, msg);
+  do_report(level,  err_code, msg, args);
+  va_end(args);
+}
+
+void
+Slave_reporting_capability::do_report(loglevel level, int err_code,
+                                   const char *msg, va_list args) const
+{
   void (*report_function)(const char *, ...);
   char buff[MAX_SLAVE_ERRMSG];
   char *pbuff= buff;
   uint pbuffsize= sizeof(buff);
-  va_list args;
-  va_start(args, msg);
 
   mysql_mutex_lock(&err_lock);
   switch (level)
@@ -64,7 +72,6 @@ Slave_reporting_capability::report(logle
   my_vsnprintf(pbuff, pbuffsize, msg, args);
 
   mysql_mutex_unlock(&err_lock);
-  va_end(args);
 
   /* If the msg string ends with '.', do not add a ',' it would be ugly */
   report_function("Slave %s: %s%s Error_code: %d",

=== modified file 'sql/rpl_reporting.h'
--- a/sql/rpl_reporting.h	2010-08-05 17:45:25 +0000
+++ b/sql/rpl_reporting.h	2011-05-16 19:43:58 +0000
@@ -52,8 +52,10 @@ public:
                         code, but can contain more information), in
                         printf() format.
   */
-  void report(loglevel level, int err_code, const char *msg, ...) const
+  virtual void report(loglevel level, int err_code, const char *msg, ...) const
     ATTRIBUTE_FORMAT(printf, 4, 5);
+  void do_report(loglevel level, int err_code,
+                 const char *msg, va_list v_args) const;
 
   /**
      Clear errors. They will not show up under <code>SHOW SLAVE

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2011-04-06 12:51:58 +0000
+++ b/sql/rpl_rli.cc	2011-05-24 14:29:35 +0000
@@ -146,8 +146,7 @@ void Relay_log_info::deinit_workers()
   mysql_cond_destroy(&pending_jobs_cond);
   mysql_mutex_destroy(&mts_temp_tables_lock);
 
-  if (!this_worker)
-    delete_dynamic(&workers);
+  delete_dynamic(&workers);
 }
 
 Relay_log_info::~Relay_log_info()
@@ -203,57 +202,6 @@ void Relay_log_info::reset_notified_chec
   checkpoint_seqno= 0;
 }
 
-/**
-   The method can be run both by C having the Main (coord) rli context and
-   by W having both the main and the Local (worker) rli context.
-   Decision matrix:
-
-       Main   Local
-     -+-------------
-     C| this_w  -
-     W| w_i    this_w
-
-*/
-Slave_worker* Relay_log_info::get_current_worker() const
-{ 
-  uint i;
-  if (!is_parallel_exec() || info_thd == current_thd)
-    return this_worker; //  can be asserted:  !this_worker => C
-  for (i= 0; i< workers.elements; i++)
-  {
-    Slave_worker* w_i= *(Slave_worker**)
-      dynamic_array_ptr(const_cast<DYNAMIC_ARRAY*>(&workers), i);
-    if (w_i->info_thd == current_thd)
-    {
-      return w_i;
-    }
-  }
-  DBUG_ASSERT(0);
-}
-
-/**
-   The method can be run both by C having the Main context.
-*/
-RPL_TABLE_LIST** Relay_log_info::get_tables_to_lock() const
-{
-  return
-    ((!is_parallel_exec()) || info_thd == current_thd) ?
-    const_cast<RPL_TABLE_LIST**>(&tables_to_lock) :
-    &get_current_worker()->tables_to_lock;
-}
-
-uint Relay_log_info::add_table_to_lock(RPL_TABLE_LIST *table_list)
-{
-  if (!is_parallel_exec())
-    tables_to_lock= table_list;
-  else
-    get_current_worker()->tables_to_lock= table_list;
-  return 
-    (!is_parallel_exec()) ? 
-    tables_to_lock_count++ : 
-    get_current_worker()->tables_to_lock_count++;
-}
-
 static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
 {
   MY_STAT s;
@@ -1070,22 +1018,18 @@ void Relay_log_info::stmt_done(my_off_t 
     inc_event_relay_log_pos(); // todo: ev-> future_event_relay_log_pos
   else
   {
-    Slave_worker *w;
     inc_group_relay_log_pos(event_master_log_pos);
     
-    /* Alfranio needs to update the coordinator and workers. */
+    DBUG_ASSERT(this_worker == NULL);
     
-    if ((w= get_current_worker()) == NULL)
-    {
-      flush_info(is_transactional() ? TRUE : FALSE);
+    flush_info(is_transactional() ? TRUE : FALSE);
 
-      /* 
-         The central recovery commit run in sequential mode forces
-         notification on the defacto new checkpoint.
-      */
-      if (is_parallel_exec())
-        reset_notified_checkpoint();
-    }
+    /* 
+       The central recovery commit run in sequential mode forces
+       notification on the defacto new checkpoint.
+    */
+    if (is_parallel_exec())
+      reset_notified_checkpoint();
   }
 }
 
@@ -1094,7 +1038,7 @@ void Relay_log_info::cleanup_context(THD
 {
   DBUG_ENTER("Relay_log_info::cleanup_context");
 
-  DBUG_ASSERT((info_thd == thd) || is_parallel_exec() || is_mts_recovery());
+  DBUG_ASSERT((info_thd == thd));
   /*
     1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
     may have opened tables, which we cannot be sure have been closed (because
@@ -1112,26 +1056,13 @@ void Relay_log_info::cleanup_context(THD
     trans_rollback_stmt(thd); // if a "statement transaction"
     trans_rollback(thd);      // if a "real transaction"
   }
-  /*
-    MTS W/a for Rows_query_log_event.
-    Cleanup of rows_query_ev at the end of the current statement.
-
-    todo: move handle_rows_query_log_event() cleanup logics into this method
-          inconditionally.
-  */
-  if (error || is_parallel_exec())
-    if (rows_query_ev)
-    {
-      delete rows_query_ev;
-      rows_query_ev= NULL;
-      info_thd->set_query(NULL, 0);
-    }
-
-  if (!is_parallel_exec() || thd == info_thd)
-    m_table_map.clear_tables();
-  else
-    get_current_worker()->m_table_map.clear_tables();
-
+  if (rows_query_ev)
+  {
+    delete rows_query_ev;
+    rows_query_ev= NULL;
+    info_thd->set_query(NULL, 0);
+  }
+  m_table_map.clear_tables();
   slave_close_thread_tables(thd);
   if (error)
     thd->mdl_context.release_transactional_locks();
@@ -1146,26 +1077,20 @@ void Relay_log_info::cleanup_context(THD
 
 void Relay_log_info::clear_tables_to_lock()
 {
-  RPL_TABLE_LIST **p_tables= get_tables_to_lock();
-  Slave_worker* w_c= get_current_worker();
-  while ((*p_tables))
+  while (tables_to_lock)
   {
-    uchar* to_free= reinterpret_cast<uchar*>((*p_tables));
-    if ((*p_tables)->m_tabledef_valid)
+    uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
+    if (tables_to_lock->m_tabledef_valid)
     {
-      (*p_tables)->m_tabledef.table_def::~table_def();
-      (*p_tables)->m_tabledef_valid= FALSE;
+      tables_to_lock->m_tabledef.table_def::~table_def();
+      tables_to_lock->m_tabledef_valid= FALSE;
     }
-    (*p_tables)=
-      static_cast<RPL_TABLE_LIST*>((*p_tables)->next_global);
-    if (w_c)
-      w_c->tables_to_lock_count--;
-    else
-      tables_to_lock_count--;
+    tables_to_lock=
+      static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
+    tables_to_lock_count--;
     my_free(to_free);
   }
-  DBUG_ASSERT((*p_tables) == NULL &&
-              ((!w_c && tables_to_lock_count == 0) || w_c->tables_to_lock_count == 0));
+  DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
 }
 
 void Relay_log_info::slave_close_thread_tables(THD *thd)
@@ -1701,3 +1626,11 @@ bool Relay_log_info::write_info(Rpl_info
 
   DBUG_RETURN(FALSE);
 }
+
+
+THD* mts_get_coordinator_thd()
+{
+  return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
+    NULL : active_mi->rli->info_thd;
+}
+

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2011-04-06 12:51:58 +0000
+++ b/sql/rpl_rli.h	2011-05-24 14:29:35 +0000
@@ -206,7 +206,6 @@ public:
     happen when, for example, the relay log gets rotated because of
     max_binlog_size.
   */
-protected:
   char group_relay_log_name[FN_REFLEN];
   ulonglong group_relay_log_pos;
   char event_relay_log_name[FN_REFLEN];
@@ -228,6 +227,8 @@ protected:
   char group_master_log_name[FN_REFLEN];
   volatile my_off_t group_master_log_pos;
 
+// TODO: Restore!
+protected:
   /*
     When it commits, InnoDB internally stores the master log position it has
     processed so far; the position to store is the one of the end of the
@@ -284,9 +285,6 @@ public:
   */
   ulong slave_exec_mode;
 
-  RPL_TABLE_LIST** get_tables_to_lock() const;
-  uint add_table_to_lock(RPL_TABLE_LIST *table_list);
-
   /*
      Condition and its parameters from START SLAVE UNTIL clause.
      
@@ -390,11 +388,12 @@ public:
   uint tables_to_lock_count;        /* RBR: Count of tables to lock */
   table_mapping m_table_map;      /* RBR: Mapping table-id to table */
   /* RBR: Record Rows_query log event */
-  volatile Rows_query_log_event* rows_query_ev;  // mts w/a makes it volatile
+  Rows_query_log_event* rows_query_ev;
 
   bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const
   {
-    TABLE_LIST *tables= *get_tables_to_lock();
+    TABLE_LIST *tables= tables_to_lock;
+
     DBUG_ASSERT(tabledef_var && conv_table_var);
     for (TABLE_LIST *ptr= tables ; ptr != NULL ; ptr= ptr->next_global)
       if (ptr->table == table_arg)
@@ -455,8 +454,6 @@ public:
   ulong mts_wqs_overfill_cnt;  // Coord waits if a W's queue is full
   long  mts_worker_underrun_level; // percent of WQ size at which Worker claims hungry
   ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun
-  Slave_worker* get_current_worker() const;
-  Slave_worker* set_this_worker(Slave_worker *w) { return this_worker= w; }
   Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
   ulonglong mts_total_groups; // total event groups distributed in current session
  
@@ -738,4 +735,15 @@ private:
 
 bool mysql_show_relaylog_events(THD* thd);
 
+THD* mts_get_coordinator_thd();
+
+/**
+   @param  thd a reference to THD
+   @return TRUE if thd belongs to a Worker thread and FALSE otherwise.
+*/
+inline bool mts_is_worker(THD *thd)
+{
+  return thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER;
+}
+
 #endif /* RPL_RLI_H */

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-12-23 13:34:02 +0000
+++ b/sql/rpl_rli_pdb.cc	2011-05-24 14:29:35 +0000
@@ -29,13 +29,10 @@ const char *info_slave_worker_fields []=
 
 Slave_worker::Slave_worker(const char* type, const char* pfs,
                            Relay_log_info *rli)
-  : Rpl_info_worker(type, pfs), c_rli(rli), curr_group_exec_parts(0),
-  group_relay_log_pos(0), group_master_log_pos(0),
+  : Relay_log_info(FALSE), c_rli(rli), curr_group_exec_parts(0),
   checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0),
   inited_group_execed(0)
 {
-  group_relay_log_name[0]= 0;
-  group_master_log_name[0]= 0;
   checkpoint_relay_log_name[0]= 0;
   checkpoint_master_log_name[0]= 0;
 }
@@ -258,7 +255,7 @@ extern "C" uchar *get_key(const uchar *r
 {
   DBUG_ENTER("get_key");
 
-  db_worker *entry=(db_worker *) record;
+  db_worker_hash_entry *entry=(db_worker_hash_entry *) record;
   *length= strlen(entry->db);
 
   DBUG_PRINT("info", ("get_key  %s, %d", entry->db, (int) *length));
@@ -267,12 +264,20 @@ extern "C" uchar *get_key(const uchar *r
 }
 
 
-static void free_entry(db_worker *entry)
+static void free_entry(db_worker_hash_entry *entry)
 {
+  THD *c_thd= current_thd;
+
   DBUG_ENTER("free_entry");
 
   DBUG_PRINT("info", ("free_entry %s, %d", entry->db, (int) strlen(entry->db)));
 
+  DBUG_ASSERT(c_thd->system_thread == SYSTEM_THREAD_SLAVE_SQL);
+  DBUG_ASSERT(entry->usage == 0);
+
+  mts_move_temp_tables_to_thd(c_thd, entry->temporary_tables);
+  entry->temporary_tables= NULL;
+
   my_free((void *) entry->db);
   my_free(entry);
 
@@ -294,11 +299,13 @@ bool init_hash_workers(ulong slave_paral
   DBUG_RETURN (!inited_hash_workers);
 }
 
-void destroy_hash_workers()
+void destroy_hash_workers(Relay_log_info *rli)
 {
   DBUG_ENTER("destroy_hash_workers");
   if (inited_hash_workers)
+  {
     my_hash_free(&mapping_db_to_worker);
+  }
   mysql_mutex_destroy(&slave_worker_hash_lock);
   mysql_cond_destroy(&slave_worker_hash_cond);
 
@@ -306,6 +313,110 @@ void destroy_hash_workers()
 }
 
 /**
+   Relocating temporary table reference into @c entry location.
+   Sources can be the coordinator's and the Worker's thd->temporary_tables.
+
+   @param table   TABLE instance pointer
+   @param thd     THD instance pointer of the source of relocation
+   @param entry   db_worker_hash_entry instance pointer
+
+   @note  thd->temporary_tables can become NULL
+
+   @return the pointer to a table following the unlinked
+*/
+TABLE* mts_move_temp_table_to_entry(TABLE *table, THD *thd,
+                                    db_worker_hash_entry *entry)
+{
+  TABLE *ret= table->next;
+
+  if (table->prev)
+  {
+    table->prev->next= table->next;
+    if (table->prev->next)
+      table->next->prev= table->prev;
+  }
+  else
+  {
+    /* removing the first item from the list */
+    DBUG_ASSERT(table == thd->temporary_tables);
+
+    thd->temporary_tables= table->next;
+    if (thd->temporary_tables)
+      table->next->prev= 0;
+  }
+  table->next= entry->temporary_tables;
+  table->prev= 0;
+  if (table->next)
+    table->next->prev= table;
+  entry->temporary_tables= table;
+
+  return ret;
+}
+
+
+/**
+   Relocation of the list of temporary tables to thd->temporary_tables.
+
+   @param thd     THD instance pointer of the destination
+   @param temporary_tables
+                  the source temporary_tables list
+
+   @note     destorying references to the source list, if necessary,
+             is left to the caller.
+
+   @return   the post-merge value of thd->temporary_tables.
+*/
+TABLE* mts_move_temp_tables_to_thd(THD *thd, TABLE *temporary_tables)
+{
+  TABLE *table= temporary_tables;
+  if (!table)
+    return NULL;
+  
+  // accept only the list head 
+  DBUG_ASSERT(!temporary_tables->prev);
+
+  // walk along the source list and associate the tables with thd
+  do
+  {
+    table->in_use= thd;
+  } while(table->next && (table= table->next));
+
+  // link the former list against the tail of the source list
+  if (thd->temporary_tables)
+    thd->temporary_tables->prev= table;
+  table->next= thd->temporary_tables;
+  thd->temporary_tables= temporary_tables;
+
+  return thd->temporary_tables;
+}
+
+/**
+   Relocating references of temporary tables of a database
+   of the entry argument from THD into the entry.
+   
+   @param thd    THD pointer of the source temporary_tables list
+   @param entry  a pointer to db_worker_hash_entry record
+                 containing database descriptor and temporary_tables list.
+                 
+*/
+static void move_temp_tables_to_entry(THD* thd, db_worker_hash_entry* entry)
+{
+  for (TABLE *table= thd->temporary_tables; table;)
+  {
+    if (strcmp(table->s->db.str, entry->db) == 0)
+    {
+      // table pointer is shifted inside the function
+      table= mts_move_temp_table_to_entry(table, thd, entry);
+    }
+    else
+    {
+      table= table->next;
+    }
+  }
+}
+
+
+/**
    The function produces a reference to the struct of a Worker
    that has been or will be engaged to process the @c dbname -keyed  partition (D).
    It checks a local to Coordinator CGAP list first and returns 
@@ -315,8 +426,8 @@ void destroy_hash_workers()
 
         CGAP .= D
 
-   and a possible  D's Worker id is searched in APH that collects tuples
-   (P, W_id, U, mutex, cond).
+   and a possible D's Worker id is searched in Assigne Partition Hash
+   (APH) that collects tuples (P, W_id, U, mutex, cond).
    In case not found,
 
         W_d := W_c unless W_c is NULL.
@@ -342,22 +453,31 @@ void destroy_hash_workers()
      c. updates the APH record to point to the first Worker (naturally, U := 1),
         scheduled the event, and goes back into the parallel mode
 
-   @note modifies  CGAP, APH
+   @param  dbname    pointer to c-string containing database name
+   @param  rli       pointer to Coordinators relay-log-info instance
+   @param  ptr_entry reference to a pointer to the resulted entry in
+                     the Assigne Partition Hash where
+                     the entry's pointer is stored at return.
 
-   @return the pointer to a Worker struct 
+   @note modifies  CGAP, APH and unlinks @c dbname -keyd temp tables 
+         from C's thd->temporary_tables to move them into the entry record.
+
+   @return the pointer to a Worker struct
 */
-Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli)
+Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli,
+                               db_worker_hash_entry **ptr_entry,
+                               bool need_temp_tables)
 {
   uint i;
-  char key[NAME_LEN + 2];
   DYNAMIC_ARRAY *workers= &rli->workers;
+  THD *thd= rli->info_thd;
 
   DBUG_ENTER("get_slave_worker");
 
   if (!inited_hash_workers)
     DBUG_RETURN(NULL);
 
-  db_worker *entry= NULL;
+  db_worker_hash_entry *entry= NULL;
   my_hash_value_type hash_value;
   uchar dblength= (uint) strlen(dbname);
 
@@ -366,16 +486,17 @@ Slave_worker *get_slave_worker(const cha
   // Search in CGAP
   for (i= 0; i < rli->curr_group_assigned_parts.elements; i++)
   {
-    get_dynamic(&rli->curr_group_assigned_parts, (uchar*) key, i);
-    if ((uchar) key[0] != dblength)
+    entry= * (db_worker_hash_entry **)
+      dynamic_array_ptr(&rli->curr_group_assigned_parts, i);
+    if ((uchar) entry->db_len != dblength)
       continue;
     else
-      if (strncmp(key + 1, const_cast<char*>(dbname), dblength) == 0)
+      if (strncmp(entry->db, const_cast<char*>(dbname), dblength) == 0)
+      {
+        *ptr_entry= entry;
         DBUG_RETURN(rli->last_assigned_worker);
+      }
   }
-  key[0]= dblength;
-  memcpy(key + 1, dbname, dblength + 1);
-  insert_dynamic(&rli->curr_group_assigned_parts, (uchar*) key);
 
   DBUG_PRINT("info", ("Searching for %s, %d", dbname, dblength));
 
@@ -384,7 +505,7 @@ Slave_worker *get_slave_worker(const cha
 
   mysql_mutex_lock(&slave_worker_hash_lock);
 
-  entry= (db_worker *)
+  entry= (db_worker_hash_entry *)
     my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
                                     (uchar*) dbname, dblength);
   if (!entry)
@@ -398,17 +519,6 @@ Slave_worker *get_slave_worker(const cha
     my_bool ret;
     char *db= NULL;
 
-    if (mapping_db_to_worker.records > opt_mts_partition_hash_soft_max)
-    {
-      /* remove zero-usage (todo: relatively rare scheduled) records */
-      for (uint i= 0; i < mapping_db_to_worker.records; i++)
-      {
-        db_worker *entry= (db_worker*) my_hash_element(&mapping_db_to_worker, i);
-        if (entry->usage == 0)
-          my_hash_delete(&mapping_db_to_worker, (uchar*) entry);
-      }
-    }
-
     mysql_mutex_unlock(&slave_worker_hash_lock);
 
     DBUG_PRINT("info", ("Inserting %s, %d", dbname, dblength));
@@ -418,14 +528,16 @@ Slave_worker *get_slave_worker(const cha
     */
     if (!(db= (char *) my_malloc((size_t) dblength + 1, MYF(0))))
       goto err;
-    if (!(entry= (db_worker *) my_malloc(sizeof(db_worker), MYF(0))))
+    if (!(entry= (db_worker_hash_entry *) my_malloc(sizeof(db_worker_hash_entry), MYF(0))))
     {
       my_free(db);
       goto err;
     }
     strmov(db, dbname);
     entry->db= db;
+    entry->db_len= strlen(db);
     entry->usage= 1;
+    entry->temporary_tables= NULL;
     /*
       Unless \exists the last assigned Worker, get a free worker based
       on a policy described in the function get_least_occupied_worker().
@@ -433,8 +545,33 @@ Slave_worker *get_slave_worker(const cha
     entry->worker= !rli->last_assigned_worker ?
       get_least_occupied_worker(workers) : rli->last_assigned_worker;
     entry->worker->usage_partition++;
+    /* 
+       relocation belonging to db temp tables from C to W via entry
+    */
+    if (need_temp_tables)
+      move_temp_tables_to_entry(thd, entry);
 
     mysql_mutex_lock(&slave_worker_hash_lock);
+
+    if (mapping_db_to_worker.records > opt_mts_partition_hash_soft_max)
+    {
+      /* remove zero-usage (todo: rare or long ago scheduled) records */
+      for (uint i= 0; i < mapping_db_to_worker.records; i++)
+      {
+        db_worker_hash_entry *entry=
+          (db_worker_hash_entry*) my_hash_element(&mapping_db_to_worker, i);
+        if (entry->usage == 0)
+        {
+          DBUG_ASSERT(!entry->temporary_tables || !entry->temporary_tables->prev);
+          DBUG_ASSERT(!thd->temporary_tables || !thd->temporary_tables->prev);
+          
+          mts_move_temp_tables_to_thd(thd, entry->temporary_tables);
+          entry->temporary_tables= NULL;
+          my_hash_delete(&mapping_db_to_worker, (uchar*) entry);
+        }
+      }
+    }
+
     ret= my_hash_insert(&mapping_db_to_worker, (uchar*) entry);
     mysql_mutex_unlock(&slave_worker_hash_lock);
     if (ret)
@@ -475,7 +612,6 @@ Slave_worker *get_slave_worker(const cha
       // D-partition represents
       // the hashing conflict and is handled as the following:
 
-      THD *thd= rli->info_thd;
       const char *proc_info;
       const char info_format[]=
         "Waiting for Slave Worker %d to release partition `%s`";
@@ -483,7 +619,7 @@ Slave_worker *get_slave_worker(const cha
                      NAME_LEN + 1];
 
       DBUG_ASSERT(rli->last_assigned_worker != NULL &&
-                  rli->curr_group_assigned_parts.elements > 1);
+                  rli->curr_group_assigned_parts.elements > 0);
 
       // future assignenment and marking at the same time
       entry->worker= rli->last_assigned_worker;
@@ -500,15 +636,38 @@ Slave_worker *get_slave_worker(const cha
 
       entry->usage= 1;
       entry->worker->usage_partition++;
+    }
 
+    if (entry->usage == 1 && need_temp_tables)
+    {
+      if (!entry->temporary_tables)
+      {
+        move_temp_tables_to_entry(thd, entry);
+      }
+#ifndef DBUG_OFF      
+      else
+      {
+        for (TABLE *table= thd->temporary_tables; table; table= table->next)
+        {
+          DBUG_ASSERT(0 != strcmp(table->s->db.str, entry->db));
+        }
+      }
+#endif
     }
+
     mysql_mutex_unlock(&slave_worker_hash_lock);
   }
 
+  DBUG_ASSERT(entry);
+
 err:
   if (entry)
-    DBUG_PRINT("info", ("Updating %s with worker %lu", entry->db, entry->worker->id));
-    
+  {
+    DBUG_PRINT("info",
+               ("Updating %s with worker %lu", entry->db, entry->worker->id));
+    insert_dynamic(&rli->curr_group_assigned_parts, (uchar*) &entry);
+    *ptr_entry= entry;
+  }
   DBUG_RETURN(entry ? entry->worker : NULL);
 }
 
@@ -521,16 +680,16 @@ err:
 Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *ws)
 {
   ulong usage= ULONG_MAX;
-  Slave_worker *current_worker= NULL, *worker= NULL;
+  Slave_worker **ptr_current_worker= NULL, *worker= NULL;
   ulong i= 0;
 
   for (i= 0; i< ws->elements; i++)
   {
-    get_dynamic(ws, (uchar*) &current_worker, i);
-    if (current_worker->usage_partition <= usage)
+    ptr_current_worker= (Slave_worker **) dynamic_array_ptr(ws, i);
+    if ((*ptr_current_worker)->usage_partition <= usage)
     {
-      worker= current_worker;
-      usage= current_worker->usage_partition;
+      worker= *ptr_current_worker;
+      usage= (*ptr_current_worker)->usage_partition;
     }
   }
   
@@ -553,11 +712,9 @@ Slave_worker *get_least_occupied_worker(
 
 void Slave_worker::slave_worker_ends_group(Log_event* ev, int error)
 {
-  int i;
-  ulong gaq_idx= ev->mts_group_cnt;
-
   if (!error)
   {
+    ulong gaq_idx= ev->mts_group_cnt;
     Slave_job_group *ptr_g=
       (Slave_job_group *) dynamic_array_ptr(&c_rli->gaq->Q, gaq_idx);
 
@@ -591,11 +748,13 @@ void Slave_worker::slave_worker_ends_gro
     last_group_done_index= gaq_idx;
   }
 
-  // cleanup relating to the last executed group regardless of error
+  /*
+    Cleanup relating to the last executed group regardless of error.
+  */
 
-  for (i= curr_group_exec_parts->dynamic_ids.elements; i > 0; i--)
+  for (int i= curr_group_exec_parts->dynamic_ids.elements; i > 0; i--)
   {
-    db_worker *entry= NULL;
+    db_worker_hash_entry *entry= NULL;
     my_hash_value_type hash_value;
     char key[NAME_LEN + 2];
 
@@ -604,7 +763,7 @@ void Slave_worker::slave_worker_ends_gro
 
     mysql_mutex_lock(&slave_worker_hash_lock);
 
-    entry= (db_worker *)
+    entry= (db_worker_hash_entry *)
       my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
                                       (uchar*) key + 1, key[0]);
 
@@ -617,6 +776,14 @@ void Slave_worker::slave_worker_ends_gro
 
     if (entry->usage == 0)
     {
+      /*
+        The detached entry's temp table list, possibly updated, remains 
+        with the entry at least until time Coordinator will deallocate it 
+        from the hash, that is either due to stop or extra size of the hash.
+      */
+
+      DBUG_ASSERT(this->info_thd->temporary_tables == 0);
+
       usage_partition--;
       if (entry->worker != this) // Coordinator is waiting
         mysql_cond_signal(&slave_worker_hash_cond);
@@ -777,14 +944,14 @@ bool circular_buffer_queue::gt(ulong i, 
    Progress in GAQ is assessed through comparision of GAQ index value 
    with Worker's @c last_group_done_index.
    Purging breaks at a first discovered gap, that is an item
-   that the assinged item->w_id'th Worker has not completed yet.
+   that the assinged item->w_id'th Worker has not yet completed.
 
    The caller is supposed to be the checkpoint handler.
 
    A copy of the last discarded item containing
    the refreshed value of the committed low-water-mark is stored
    into @c lwm container member for further caller's processing.
-   @last_done is updated with the latests total_seqno for each Worker
+   @c last_done is updated with the latest total_seqno for each Worker
    that was met during GAQ parse.
 
    @note dyn-allocated members of Slave_job_group such as
@@ -864,28 +1031,63 @@ ulong Slave_committed_queue::move_queue_
   return cnt;
 }
 
+void Slave_worker::do_report(loglevel level, int err_code, const char *msg, va_list vargs) const
+{
+  c_rli->do_report(level, err_code, msg, vargs);
+}
+
+void Slave_worker::report(loglevel level, int err_code, const char *msg, ...) const
+{
+  va_list vargs;
+  va_start(vargs, msg);
+
+  do_report(level, err_code, msg, vargs);
+  va_end(vargs);
+}
+
+/**
+   Function is called by Coordinator when it identified an event
+   requiring sequential execution. 
+   Creating sequential context for the event includes waiting
+   for the assigned to Workers tasks to be completed and their
+   resources such as temporary tables be returned to Coordinator's
+   repository.
+
+   @param  rli     Relay_log_info instance of Coordinator
+   @param  ignore  Optional Worker instance pointer if the sequential context
+                   is established due for the ignore Worker. Its resources 
+                   are to be retained.
+                   
+   @note   Resources that are not occupied by Workers such as
+           a list of temporary tables held in unused (zero-usage) records
+           of APH are relocated to the Coordinator placeholder.
+
+   @return non-negative number of released by Workers partitions 
+           (one partition by one Worker can count multiple times).
+*/
 
 int wait_for_workers_to_finish(Relay_log_info const *rli, Slave_worker *ignore)
 {
   uint ret= 0;
   HASH *hash= &mapping_db_to_worker;
+  THD *thd= rli->info_thd;
+  const char info_format[]=
+    "Waiting for Slave Worker %d to release partition `%s`";
   for (uint i= 0, ret= 0; i < hash->records; i++)
   {
-    db_worker *entry;
-    THD *thd= rli->info_thd;
+    db_worker_hash_entry *entry;
     const char *proc_info;
-    const char info_format[]=
-      "Waiting for Slave Worker %d to release partition `%s`";
     char wait_info[sizeof(info_format) + 4*sizeof(entry->worker->id) +
                    NAME_LEN + 1];
    
     mysql_mutex_lock(&slave_worker_hash_lock);
   
-    entry= (db_worker*) my_hash_element(hash, i);
+    entry= (db_worker_hash_entry*) my_hash_element(hash, i);
 
     DBUG_ASSERT(entry);
 
-    if (ignore && entry->worker == ignore)
+    // the ignore Worker retains its active resources
+    if (ignore && entry->worker == ignore && entry->usage > 0)
     {
       mysql_mutex_unlock(&slave_worker_hash_lock);
       continue;
@@ -894,7 +1096,7 @@ int wait_for_workers_to_finish(Relay_log
     if (entry->usage > 0)
     {
       sprintf(wait_info, info_format, entry->worker->id, entry->db);
-      entry->worker= NULL;
+      entry->worker= NULL; // mark Worker to signal when  usage drops to 0
 
       proc_info= thd->enter_cond(&slave_worker_hash_cond, &slave_worker_hash_lock,
                                wait_info);
@@ -906,6 +1108,9 @@ int wait_for_workers_to_finish(Relay_log
     }
     else
     {
+      // resources relocation
+      mts_move_temp_tables_to_thd(thd, entry->temporary_tables);
+      entry->temporary_tables= NULL;
       mysql_mutex_unlock(&slave_worker_hash_lock);
     }
   }

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-12-21 19:31:29 +0000
+++ b/sql/rpl_rli_pdb.h	2011-05-24 14:29:35 +0000
@@ -8,22 +8,33 @@
 #include <my_bitmap.h>
 
 /* APH entry */
-struct db_worker
+typedef struct st_db_worker_hash_entry
 {
+  uint  db_len;
   const char *db;
   Slave_worker *worker;
   ulong usage;
+  /*
+    The list of temp tables belonging to @ db database is
+    attached to an assigned @c worker to become its thd->temporary_tables.
+    The list is updated with every ddl incl CREATE, DROP.
+    It is removed from the entry and merged to the coordinator's thd->temporary_tables
+    in case of events: slave stops, the db-to-worker hash oversize.
+  */
+  TABLE* volatile temporary_tables;
 
-  // todo: relax concurrency after making APH mutex/cond pair has worked
-  // pthread_mutex_t
-  // pthread_cond_t
-  // timestamp updated_at;
+  /* todo: relax concurrency after making APH mutex/cond pair has worked
+     pthread_mutex_t
+     pthread_cond_t
+     timestamp updated_at; */
 
-} typedef db_worker;
+} db_worker_hash_entry;
 
 bool init_hash_workers(ulong slave_parallel_workers);
-void destroy_hash_workers();
-Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli);
+void destroy_hash_workers(Relay_log_info*);
+Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli,
+                               db_worker_hash_entry **ptr_entry,
+                               bool need_temp_tables);
 Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
 int wait_for_workers_to_finish(Relay_log_info const *rli,
                                Slave_worker *ignore= NULL);
@@ -202,7 +213,7 @@ public:
   ulonglong waited_overfill;
 };
 
-class Slave_worker : public Rpl_info_worker
+class Slave_worker : public Relay_log_info
 {
 public:
   Slave_worker(const char *type, const char *pfs,
@@ -214,8 +225,6 @@ public:
   Slave_jobs_queue jobs;
 
   Relay_log_info *c_rli;
-  // fixme: experimental
-  Relay_log_info *w_rli;
 
   Dynamic_ids *curr_group_exec_parts; // CGEP
   bool curr_group_seen_begin; // is set to TRUE with B-event at Worker exec
@@ -250,11 +259,6 @@ public:
     We need to make this a dynamic field. /Alfranio
   */
   char partitions[FN_REFLEN];
-  char group_relay_log_name[FN_REFLEN];
-  ulonglong group_relay_log_pos;
-  char group_master_log_name[FN_REFLEN];
-  ulonglong group_master_log_pos;
-
   // todo: remove
   char checkpoint_relay_log_name[FN_REFLEN];
   ulonglong checkpoint_relay_log_pos;
@@ -273,6 +277,10 @@ public:
 
   bool commit_positions(Log_event *evt, Slave_job_group *ptr_g);
 
+  void report(loglevel level, int err_code, const char *msg, ...) const
+    ATTRIBUTE_FORMAT(printf, 4, 5);
+  void do_report(loglevel level, int err_code, const char *msg, va_list vargs) const;
+
   MY_BITMAP group_execed;
   
   bool inited_group_execed;
@@ -285,6 +293,9 @@ private:
   Slave_worker(const Slave_worker& info);
 };
 
+TABLE* mts_move_temp_table_to_entry(TABLE*, THD*, db_worker_hash_entry*);
+TABLE* mts_move_temp_tables_to_thd(THD*, TABLE*);
+
 extern PSI_mutex_key *key_mutex_slave_parallel_worker;
 extern PSI_mutex_key key_mutex_slave_parallel_pend_jobs;
 

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2011-04-06 12:51:58 +0000
+++ b/sql/rpl_slave.cc	2011-05-24 14:29:35 +0000
@@ -143,7 +143,7 @@ failed read"
 };
 
 
-typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL, SLAVE_THD_CHECKPOINT } SLAVE_THD_TYPE;
+typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL, SLAVE_THD_CHECKPOINT, SLAVE_THD_WORKER } SLAVE_THD_TYPE;
 
 static int process_io_rotate(Master_info* mi, Rotate_log_event* rev);
 static int process_io_create_file(Master_info* mi, Create_file_log_event* cev);
@@ -2387,7 +2387,8 @@ static int init_slave_thread(THD* thd, S
 #if !defined(DBUG_OFF)
   int simulate_error= 0;
 #endif
-  thd->system_thread = (thd_type == SLAVE_THD_SQL) ?
+  thd->system_thread= (thd_type == SLAVE_THD_WORKER) ? 
+    SYSTEM_THREAD_SLAVE_WORKER : (thd_type == SLAVE_THD_SQL) ?
     SYSTEM_THREAD_SLAVE_SQL : SYSTEM_THREAD_SLAVE_IO;
   thd->security_ctx->skip_grants();
   my_net_init(&thd->net, 0);
@@ -2998,8 +2999,6 @@ static int exec_relay_log_event(THD* thd
       MTS: since master,relay-group coordinates change per checkpoint
       at the end of the checkpoint interval UNTIL can be left far behind.
       Hence, UNTIL forces the sequential applying.
-
-      TODO: to not let to start with UNTIL whenever @@global.max_slave_workers>0.
     */
     if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
         rli->is_until_satisfied(thd, ev))
@@ -3040,47 +3039,33 @@ static int exec_relay_log_event(THD* thd
 
     exec_res= apply_event_and_update_pos(ev, thd, rli);
 
-    /*
-      Format_description_log_event should not be deleted because it will be
-      used to read info about the relay log's format; it will be deleted when
-      the SQL thread does not need it, i.e. when this thread terminates.
-    */
-    // if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+    if ((!rli->is_parallel_exec() || !rli->curr_group_is_parallel))
     {
-      if ((!rli->is_parallel_exec() || !rli->curr_group_is_parallel))
+      DBUG_ASSERT(!rli->is_parallel_exec() || !rli->curr_group_is_parallel ||
+                  ev->shall_skip(rli) != Log_event::EVENT_SKIP_NOT);
+      
+      if (rli->curr_group_split)
       {
-        DBUG_ASSERT(!rli->is_parallel_exec() || !rli->curr_group_is_parallel ||
-                    ev->shall_skip(rli) != Log_event::EVENT_SKIP_NOT);
-
-        if (rli->curr_group_split)
-        {
-         // the current group split status is reset
-          rli->curr_group_is_parallel= TRUE;
-          rli->curr_group_split= FALSE;
-        }
-        if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
-        {
-          /* MTS/ TODO.
-
-             ROWS_QUERY_LOG_EVENT could be supported easier if
-             destructing part of handle_rows_query_log_event would be merged
-             with rli->cleanup_context() and the rest move into 
-             ROWS...::do_apply_event
-          */
-          if (!rli->is_parallel_exec())
-            if (thd->variables.binlog_rows_query_log_events)
-              handle_rows_query_log_event(ev, rli);
-          
-          if (ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
-          {
-            DBUG_PRINT("info", ("Deleting the event after it has been executed"));
-            delete ev;
-            ev= NULL;
-          }
-        }
+        // the current group split status is reset
+        rli->curr_group_is_parallel= TRUE;
+        rli->curr_group_split= FALSE;
+      }
+      /*
+        Format_description_log_event should not be deleted because it will be
+        used to read info about the relay log's format; it will be deleted when
+        the SQL thread does not need it, i.e. when this thread terminates.
+        ROWS_QUERY_LOG_EVENT is destroyed at the end of the current statement
+        clean-up routine.
+      */
+      if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT &&
+          ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
+      {
+        DBUG_PRINT("info", ("Deleting the event after it has been executed"));
+        delete ev;
+        ev= NULL;
       }
     }
-
+  
     /*
       update_log_pos failed: this should not happen, so we don't
       retry.
@@ -3720,12 +3705,10 @@ pthread_handler_t handle_slave_worker(vo
     goto err;
   }
   w->info_thd= thd;
-  w->w_rli->info_thd= thd;
-
   thd->thread_stack = (char*)&thd;
   
   pthread_detach_this_thread();
-  if (init_slave_thread(thd, SLAVE_THD_SQL))  // todo: make thd->sys_thr= worker
+  if (init_slave_thread(thd, SLAVE_THD_WORKER))
   {
     // todo make SQL thread killed
     sql_print_error("Failed during slave worker initialization");
@@ -3756,9 +3739,8 @@ pthread_handler_t handle_slave_worker(vo
     mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
     rli->info_thd->awake(THD::KILL_QUERY);          // notify Crdn
     mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
-    // Todo: add necessary stuff to clean up after Q-log-event, a Q trans
     thd->clear_error();
-    rli->cleanup_context(thd, error);
+    w->cleanup_context(thd, error);
   }
 
   mysql_mutex_lock(&w->jobs_lock);
@@ -3871,26 +3853,6 @@ bool mts_recovery_groups(Relay_log_info 
       delete worker;
   };
 
-#if 0
-  for (uint id= 0; id < rli->slave_parallel_workers; id++)
-  {
-    Slave_worker *worker=
-      Rpl_info_factory::create_worker(opt_worker_repository_id, id, rli);
-    worker->init_info();
-    retrieve_job(worker, job_file);
-    /*
-      This avoids gathering information on workers that haven't
-      processed anything.
-    */
-
-    // TODO: disregard W_i | W_i->coord < LWM
-
-    if (job_file.group_relay_log_name != NULL && strcmp(job_file.group_relay_log_name, "") &&
-        job_file.group_master_log_name != NULL && strcmp(job_file.group_master_log_name, ""))
-      insert_dynamic(&above_lwm_jobs, (uchar*) &job_file);
-  }
-#endif
-
   sort_dynamic(&above_lwm_jobs, (qsort_cmp) mts_event_coord_cmp);
   /*
     In what follows, the group Recovery Bitmap is constructed.
@@ -3962,9 +3924,11 @@ bool mts_recovery_groups(Relay_log_info 
                                       ev->log_pos };
             if ((ret= mts_event_coord_cmp(&ev_coord, &w_last)) == 0)
             {
-              // hit it
-              // w.B << group_cnt++;
-              // RB |= w.B;
+              /* 
+                 hit it
+                 w.B << group_cnt++;
+                 RB |= w.B;
+              */
               for (uint i= w->checkpoint_seqno - rli->mts_recovery_group_cnt, j= 0;
                    i <= w->checkpoint_seqno; i++, j++)
               {
@@ -4178,36 +4142,41 @@ bool mts_checkpoint_routine(Relay_log_in
   if (!locked)
     mysql_mutex_lock(&rli->data_lock);
 
-  // Coordinator::commit_positions() {
-
-  // rli->gaq->lwm contains all but rli->group_master_log_name
+  /*
+    Coordinator::commit_positions() {
 
-  // group_master_log_name is updated only by Coordinator and it can't change
-  // within checkpoint interval because Coordinator flushes the updated value
-  // at once.
-  // Note, unlike group_master_log_name, event_relay_log_pos is updated solely 
-  // within Coordinator read loop context. Hence, it's possible at times 
-  // event_rlp > group_rlp.
+    rli->gaq->lwm contains all but rli->group_master_log_name
 
+    group_master_log_name is updated only by Coordinator and it can't change
+    within checkpoint interval because Coordinator flushes the updated value
+    at once.
+    Note, unlike group_master_log_name, event_relay_log_pos is updated solely 
+    within Coordinator read loop context. Hence, it's possible at times 
+    event_rlp > group_rlp.
+  */
   rli->set_group_master_log_pos(rli->gaq->lwm.group_master_log_pos);
   rli->set_group_relay_log_pos(rli->gaq->lwm.group_relay_log_pos);
 
   if (rli->gaq->lwm.group_relay_log_name[0] != 0)
     rli->set_group_relay_log_name(rli->gaq->lwm.group_relay_log_name);
 
-  //todo: uncomment notifies when UNTIL will be supported
+  /* 
+     todo: uncomment notifies when UNTIL will be supported
 
-  //rli->notify_group_master_log_name_update();
-  //rli->notify_group_relay_log_name_update();
+     rli->notify_group_master_log_name_update();
+     rli->notify_group_relay_log_name_update();
 
-  // todo: optimize with if (wait_flag) broadcast
-  //       waiter: set wait_flag; waits....; drops wait_flag;
+     Todo: optimize with if (wait_flag) broadcast
+         waiter: set wait_flag; waits....; drops wait_flag;
+  */
   mysql_cond_broadcast(&rli->data_cond);
   if (!locked)
     mysql_mutex_unlock(&rli->data_lock);
 
   error= rli->flush_info(TRUE);
-  // end of commit_positions
+  /*
+    } // end of commit_positions
+  */
 
   rli->reset_notified_checkpoint();
 
@@ -4229,7 +4198,6 @@ int slave_start_single_worker(Relay_log_
   pthread_t th;
   Slave_worker *w= NULL;
   Slave_job_item empty= {NULL};
-  Rpl_info_dummy *dummy_handler= NULL;
 
   if (!(w=
       Rpl_info_factory::create_worker(opt_worker_repository_id, i, rli)))
@@ -4243,11 +4211,6 @@ int slave_start_single_worker(Relay_log_
   w->tables_to_lock= NULL;
   w->tables_to_lock_count= 0;
 
-  // fixme: experimenting to make Workers to run ev->update_pos(w->w_rli)
-  // fixme: a real hack! part of Rpl_info_factory::create_rli(RLI_REPOSITORY_FILE, FALSE);
-  w->w_rli= new Relay_log_info(FALSE);
-  dummy_handler= new Rpl_info_dummy(rli->get_number_info_rli_fields());
-  w->w_rli->set_rpl_info_handler(dummy_handler);
   if (w->init_info())
   {
     sql_print_error("Failed during slave worker thread create");
@@ -4258,12 +4221,10 @@ int slave_start_single_worker(Relay_log_
   // TODO: remove after dynamic_ids will be sorted out (removed/refined) otherwise
   // entry->usage assert
   w->curr_group_exec_parts->dynamic_ids.elements= 0;
-
   w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
   w->checkpoint_notified= FALSE;
-  w->w_rli->workers= rli->workers; // shallow copying is sufficient
-  w->w_rli->this_worker= w;
-
+  w->workers= rli->workers; // shallow copying is sufficient
+  w->this_worker= w;
   w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
   w->id= i;
   w->current_table= NULL;
@@ -4322,7 +4283,8 @@ int slave_start_workers(Relay_log_info *
   rli->init_workers(n);
 
   // CGAP dynarray holds id:s of partitions of the Current being executed Group
-  my_init_dynamic_array(&rli->curr_group_assigned_parts, 1 + NAME_LEN + 1, SLAVE_INIT_DBS_IN_GROUP, 1);
+  my_init_dynamic_array(&rli->curr_group_assigned_parts, sizeof(db_worker_hash_entry*),
+                        SLAVE_INIT_DBS_IN_GROUP, 1);
   rli->last_assigned_worker= NULL; /* associated with curr_group_assigned */
   my_init_dynamic_array(&rli->curr_group_da, sizeof(Log_event*), 8, 2);
   // Least_occupied_workers array to hold items size of Slave_jobs_queue::len
@@ -4358,7 +4320,7 @@ int slave_start_workers(Relay_log_info *
     dyn memory to consume by Coordinator per event
   */
   init_alloc_root(&rli->mts_coor_mem_root, NAME_LEN,
-                  (MAX_DBS_IN_QUERY_MTS / 2) * NAME_LEN);
+                  (MAX_DBS_IN_EVENT_MTS / 2) * NAME_LEN);
 
   for (i= 0; i < n; i++)
   {
@@ -4438,8 +4400,6 @@ void slave_stop_workers(Relay_log_info *
     DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
     delete_dynamic(&w->jobs.Q);
     delete_dynamic_element(&rli->workers, i);
-    delete w->w_rli;
-
     delete w;
   }
 
@@ -4453,7 +4413,7 @@ void slave_stop_workers(Relay_log_info *
   DBUG_ASSERT(rli->pending_jobs == 0);
   DBUG_ASSERT(rli->mts_pending_jobs_size == 0);
 
-  destroy_hash_workers();
+  destroy_hash_workers(rli);
   delete rli->gaq;
   delete_dynamic(&rli->least_occupied_workers);    // least occupied
   delete_dynamic(&rli->curr_group_da);             // GCDA
@@ -4508,8 +4468,10 @@ pthread_handler_t handle_slave_sql(void 
 
   /* mts-II: starting the worker pool */
   if (slave_start_workers(rli, rli->opt_slave_parallel_workers) != 0)
-    goto err;
-  
+  {
+      mysql_mutex_unlock(&rli->run_lock);
+      goto err;
+  }
   if (init_slave_thread(thd, SLAVE_THD_SQL))
   {
     /*
@@ -7124,58 +7086,6 @@ err:
 }
 
 
-/******************************************/
-/*   MTS temporary table support section  */
-
-
-/**
-   @return   a mutex that guards access to the SQL thread controlled
-             temporary tables list.
-*/
-mysql_mutex_t* mts_get_temp_table_mutex()
-{
-  return &active_mi->rli->mts_temp_tables_lock;
-}
-
-/**
-   @return a reference to THD of the Coordinator thread or NULL
-           in case of no replication is set up or it's in the sequential mode.
-*/
-THD* mts_get_coordinator_thd()
-{
-  Slave_worker *w= NULL;
-  return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
-    NULL : !(w= active_mi->rli->get_current_worker()) ?
-    NULL : w->c_rli->info_thd;
-}
-
-/**
-   @return a reference to THD of a Worker thread or NULL
-           in case of no replication is set up or it's in the sequential mode.
-*/
-THD* mts_get_worker_thd()
-{
-  Slave_worker *w= NULL;
-  return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
-    NULL : !(w= active_mi->rli->get_current_worker()) ?
-    NULL : w->w_rli->info_thd;
-}
-
-/**
-   @param  thd a reference to THD
-
-   @return TRUE if thd belongs to a Worker thread and FALSE otherwise.
-*/
-bool mts_is_worker(THD *thd)
-{
-  return
-    thd->slave_thread &&
-    thd->system_thread == SYSTEM_THREAD_SLAVE_SQL &&
-    (mts_get_worker_thd() != NULL);
-}
-
-/* end of MTS temp table support section */
-
 /**
   @} (end of group Replication)
 */

=== modified file 'sql/rpl_slave.h'
--- a/sql/rpl_slave.h	2011-02-27 17:35:25 +0000
+++ b/sql/rpl_slave.h	2011-05-24 14:29:35 +0000
@@ -242,11 +242,6 @@ extern I_List<THD> threads;
 bool mts_recovery_groups(Relay_log_info *rli, MY_BITMAP *groups);
 bool mts_checkpoint_routine(Relay_log_info *rli, ulonglong period,
                             bool force, bool locked);
-THD* mts_get_coordinator_thd();
-THD* mts_get_worker_thd();
-mysql_mutex_t* mts_get_temp_table_mutex();
-bool mts_is_worker(THD *thd);
-
 #endif /* HAVE_REPLICATION */
 
 /* masks for start/stop operations on io and sql slave threads */

=== modified file 'sql/rpl_utility.h'
--- a/sql/rpl_utility.h	2010-09-09 18:43:16 +0000
+++ b/sql/rpl_utility.h	2011-05-24 14:29:35 +0000
@@ -27,16 +27,6 @@
 #endif
 #include "mysql_com.h"
 
-/*
-  mts-II prototype macros (once were a part of my_bitmap.h...)
-*/
-#define bit_is_set(I,B)   (sizeof(I) * CHAR_BIT > (B) ?                 \
-                           (((I) & (ULL(1) << (B))) == 0 ? 0 : 1) : -1)
-#define bit_do_set(I,B)   (sizeof(I) * CHAR_BIT > (B) ?         \
-                           ((I) |= (ULL(1) << (B)), 1) : -1)
-#define bit_do_clear(I,B) (sizeof(I) * CHAR_BIT > (B) ?         \
-                           ((I) &= ~(ULL(1) << (B)), 0) : -1)
-
 class Relay_log_info;
 
 

=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc	2011-02-27 17:35:25 +0000
+++ b/sql/sql_base.cc	2011-05-24 14:29:35 +0000
@@ -40,7 +40,6 @@
 #include "sql_handler.h" // mysql_ha_flush
 #include "sql_partition.h"                      // ALTER_PARTITION_PARAM_TYPE
 #include "log_event.h"                          // Query_log_event
-#include "rpl_slave.h"                          // MTS temp table support
 #include "sql_select.h"
 #include "sp_head.h"
 #include "sp.h"
@@ -59,6 +58,7 @@
 #include <io.h>
 #endif
 
+
 bool
 No_such_table_error_handler::handle_condition(THD *,
                                               uint sql_errno,
@@ -1192,25 +1192,11 @@ bool close_cached_connection_tables(THD 
 
 static void mark_temp_tables_as_free_for_reuse(THD *thd)
 {
-#ifndef EMBEDDED_LIBRARY
-  bool mts_slave= mts_is_worker(thd);
-  TABLE *temporary_tables= mts_slave ?
-    mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
-  if (mts_slave)
-    mysql_mutex_lock(mts_get_temp_table_mutex());
-#else
-  TABLE *temporary_tables= thd->temporary_tables;
-#endif
-
-  for (TABLE *table= temporary_tables; table ; table=table->next)
+  for (TABLE *table= thd->temporary_tables ; table ; table= table->next)
   {
     if ((table->query_id == thd->query_id) && ! table->open_by_handler)
       mark_tmp_table_for_reuse(table);
   }
-#ifndef EMBEDDED_LIBRARY
-  if (mts_slave)
-    mysql_mutex_unlock(mts_get_temp_table_mutex());
-#endif
 }
 
 
@@ -1602,8 +1588,6 @@ bool close_temporary_tables(THD *thd)
   bool was_quote_show= TRUE;
   bool error= 0;
 
-  DBUG_ASSERT(!thd->slave_thread || thd->temporary_tables == NULL);
-
   if (!thd->temporary_tables)
     DBUG_RETURN(FALSE);
 
@@ -2041,29 +2025,16 @@ TABLE *find_temporary_table(THD *thd,
                             const char *table_key,
                             uint table_key_length)
 {
-  TABLE *table= NULL;
-#ifndef EMBEDDED_LIBRARY
-  bool mts_slave= mts_is_worker(thd);
-  TABLE *temporary_tables= mts_slave ?
-    mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
-  if (mts_slave)
-    mysql_mutex_lock(mts_get_temp_table_mutex());  
-#else
-  TABLE *temporary_tables= thd->temporary_tables;
-#endif
-  for (table= temporary_tables; table; table= table->next)
+  for (TABLE *table= thd->temporary_tables; table; table= table->next)
   {
     if (table->s->table_cache_key.length == table_key_length &&
         !memcmp(table->s->table_cache_key.str, table_key, table_key_length))
     {
-      break;
+      return table;
     }
   }
-#ifndef EMBEDDED_LIBRARY
-  if (mts_slave)
-    mysql_mutex_unlock(mts_get_temp_table_mutex());
-#endif
-  return table;
+
+  return NULL;
 }
 
 
@@ -2101,11 +2072,6 @@ TABLE *find_temporary_table(THD *thd,
 int drop_temporary_table(THD *thd, TABLE_LIST *table_list, bool *is_trans)
 {
   TABLE *table;
-#ifndef EMBEDDED_LIBRARY
-  bool mts_slave= mts_is_worker(thd);
-#endif
-  THD *thd_temp= NULL;
-
   DBUG_ENTER("drop_temporary_table");
   DBUG_PRINT("tmptable", ("closing table: '%s'.'%s'",
                           table_list->db, table_list->table_name));
@@ -2128,26 +2094,7 @@ int drop_temporary_table(THD *thd, TABLE
     unlock the table and remove the table from this list.
   */
   mysql_lock_remove(thd, thd->lock, table);
-
-#ifndef EMBEDDED_LIBRARY
-  if (mts_slave)
-  {
-    thd_temp= mts_get_coordinator_thd();
-    mysql_mutex_lock(mts_get_temp_table_mutex());
-  }
-  else
-#endif
-  {
-    thd_temp= thd;
-  }
-
-  close_temporary_table(thd_temp, table, 1, 1);
-
-#ifndef EMBEDDED_LIBRARY
-  if (mts_slave)
-     mysql_mutex_unlock(mts_get_temp_table_mutex());
-#endif
-
+  close_temporary_table(thd, table, 1, 1);
   DBUG_RETURN(0);
 }
 
@@ -2178,7 +2125,7 @@ void close_temporary_table(THD *thd, TAB
       passing non-zero value to end_slave via rli->save_temporary_tables
       when no temp tables opened, see an invariant below.
     */
-    thd->temporary_tables= table->next; // mts: see drop_temporary_table()
+    thd->temporary_tables= table->next;
     if (thd->temporary_tables)
       table->next->prev= 0;
   }
@@ -2684,17 +2631,7 @@ bool open_table(THD *thd, TABLE_LIST *ta
   if (table_list->open_type != OT_BASE_ONLY &&
       ! (flags & MYSQL_OPEN_SKIP_TEMPORARY))
   {
-#ifndef EMBEDDED_LIBRARY
-    bool mts_slave= mts_is_worker(thd);
-    TABLE *temporary_tables= mts_slave ?
-      mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
-    if (mts_slave)
-      mysql_mutex_lock(mts_get_temp_table_mutex());
-#else
-    TABLE *temporary_tables= thd->temporary_tables;
-#endif
-
-    for (table= temporary_tables; table ; table=table->next)
+    for (table= thd->temporary_tables; table ; table=table->next)
     {
       if (table->s->table_cache_key.length == key_length +
           TMP_TABLE_KEY_EXTRA &&
@@ -2714,26 +2651,14 @@ bool open_table(THD *thd, TABLE_LIST *ta
                       (ulong) table->query_id, (uint) thd->server_id,
                       (ulong) thd->variables.pseudo_thread_id));
 	  my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias);
-#ifndef EMBEDDED_LIBRARY
-          if (mts_slave)
-            mysql_mutex_unlock(mts_get_temp_table_mutex());
-#endif
 	  DBUG_RETURN(TRUE);
 	}
 	table->query_id= thd->query_id;
 	thd->thread_specific_used= TRUE;
         DBUG_PRINT("info",("Using temporary table"));
-#ifndef EMBEDDED_LIBRARY
-        if (mts_slave)
-          mysql_mutex_unlock(mts_get_temp_table_mutex());
-#endif
         goto reset;
       }
     }
-#ifndef EMBEDDED_LIBRARY
-    if (mts_slave)
-      mysql_mutex_unlock(mts_get_temp_table_mutex());
-#endif
   }
 
   if (table_list->open_type == OT_TEMPORARY_ONLY ||
@@ -5926,28 +5851,14 @@ TABLE *open_table_uncached(THD *thd, con
 
   if (add_to_temporary_tables_list)
   {
-#ifndef EMBEDDED_LIBRARY
-    TABLE **ptr_temporary_tables;
-    bool mts_slave= mts_is_worker(thd);
-    ptr_temporary_tables= mts_slave? 
-      &mts_get_coordinator_thd()->temporary_tables : &thd->temporary_tables;
-    if (mts_slave)
-      mysql_mutex_lock(mts_get_temp_table_mutex());
-#else
-    TABLE **ptr_temporary_tables= &thd->temporary_tables;
-#endif
     /* growing temp list at the head */
-    tmp_table->next= *ptr_temporary_tables;
+    tmp_table->next= thd->temporary_tables;
     if (tmp_table->next)
       tmp_table->next->prev= tmp_table;
-    *ptr_temporary_tables= tmp_table;
-    (*ptr_temporary_tables)->prev= 0;
+    thd->temporary_tables= tmp_table;
+    thd->temporary_tables->prev= 0;
     if (thd->slave_thread)
       slave_open_temp_tables++;
-#ifndef EMBEDDED_LIBRARY
-    if (mts_slave)
-       mysql_mutex_unlock(mts_get_temp_table_mutex());
-#endif
   }
   tmp_table->pos_in_table_list= 0;
   DBUG_PRINT("tmptable", ("opened table: '%s'.'%s' 0x%lx", tmp_table->s->db.str,

=== modified file 'sql/sql_binlog.cc'
--- a/sql/sql_binlog.cc	2010-12-02 13:44:21 +0000
+++ b/sql/sql_binlog.cc	2011-05-19 09:36:28 +0000
@@ -284,16 +284,14 @@ void mysql_client_binlog_statement(THD* 
         will be used to read info about the relay log's format; it
         will be deleted when the SQL thread does not need it,
         i.e. when this thread terminates.
+        ROWS_QUERY_LOG_EVENT if present in rli is deleted at the end
+        of the event.
       */
-      if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+      if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT &&
+          ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
       {
-        if (thd->variables.binlog_rows_query_log_events)
-          handle_rows_query_log_event(ev, rli);
-        if (ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
-        {
-          delete ev;
-          ev= NULL;
-        }
+        delete ev;
+        ev= NULL;
       }
       if (err)
       {

=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc	2011-02-27 17:35:25 +0000
+++ b/sql/sql_class.cc	2011-05-24 14:29:35 +0000
@@ -503,7 +503,7 @@ THD::THD()
    user_time(0), in_sub_stmt(0),
    binlog_unsafe_warning_flags(0),
    binlog_table_maps(0),
-   binlog_updated_db_names(NULL),
+   binlog_accessed_db_names(NULL),
    table_map_for_update(0),
    arg_of_last_insert_id_function(FALSE),
    first_successful_insert_id_in_prev_stmt(0),
@@ -1398,7 +1398,7 @@ void THD::cleanup_after_query()
     stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
     auto_inc_intervals_in_cur_stmt_for_binlog.empty();
     rand_used= 0;
-    binlog_updated_db_names= NULL;
+    binlog_accessed_db_names= NULL;
   }
   if (first_successful_insert_id_in_cur_stmt > 0)
   {

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2011-02-27 17:35:25 +0000
+++ b/sql/sql_class.h	2011-05-24 14:29:35 +0000
@@ -1724,7 +1724,7 @@ private:
   /*
     MTS: db names listing to be updated by the query databases
   */
-  List<char> *binlog_updated_db_names;
+  List<char> *binlog_accessed_db_names;
 
 public:
   void issue_unsafe_warnings();
@@ -1737,25 +1737,25 @@ public:
   }
 
   /*
-    MTS: accessor to binlog_updated_db_names list
+    MTS: accessor to binlog_accessed_db_names list
   */
-  List<char> * get_binlog_updated_db_names() {
-    return binlog_updated_db_names;
+  List<char> * get_binlog_accessed_db_names() {
+    return binlog_accessed_db_names;
   }
 
   /*
-     MTS: initializer of binlog_updated_db_names list
+     MTS: initializer of binlog_accessed_db_names list
   */
-  void set_binlog_updated_db_names(List<char>* arg)
+  void set_binlog_accessed_db_names(List<char>* arg)
   {
-    binlog_updated_db_names= arg;
+    binlog_accessed_db_names= arg;
   }
 
   /*
-     MTS: resetter of binlog_updated_db_names list normally
+     MTS: resetter of binlog_accessed_db_names list normally
      at the end of the query execution
   */
-  void clear_binlog_updated_db_names() { binlog_updated_db_names= NULL; }
+  void clear_binlog_accessed_db_names() { binlog_accessed_db_names= NULL; }
 
   /* MTS: method inserts a new unique name into binlog_updated_dbs */
   void add_to_binlog_updated_dbs(const char *db);
@@ -1766,8 +1766,8 @@ public:
   */
   void add_one_db_to_binlog_updated_dbs(const char *db)
   {
-    set_binlog_updated_db_names(new List<char>);
-    binlog_updated_db_names->push_back(strdup_root(mem_root, db));
+    set_binlog_accessed_db_names(new List<char>);
+    binlog_accessed_db_names->push_back(strdup_root(mem_root, db));
   }
 
 #endif /* MYSQL_CLIENT */

=== modified file 'sql/sql_rename.cc'
--- a/sql/sql_rename.cc	2011-02-27 17:35:25 +0000
+++ b/sql/sql_rename.cc	2011-05-24 14:29:35 +0000
@@ -318,9 +318,9 @@ do_rename(THD *thd, TABLE_LIST *ren_tabl
       break;
   }
 
-  if (!thd->get_binlog_updated_db_names())
+  if (!thd->get_binlog_accessed_db_names())
   {
-    thd->set_binlog_updated_db_names(new List<char>);
+    thd->set_binlog_accessed_db_names(new List<char>);
   }
   thd->add_to_binlog_updated_dbs(ren_table->db);
   thd->add_to_binlog_updated_dbs(new_db);

=== modified file 'sql/sql_table.cc'
--- a/sql/sql_table.cc	2011-02-27 17:35:25 +0000
+++ b/sql/sql_table.cc	2011-05-24 14:29:35 +0000
@@ -2237,9 +2237,9 @@ int mysql_rm_table_no_locks(THD *thd, TA
                   table->mdl_request.ticket != NULL));
 
     /* MTS: similarly to decide_logging_format() gathering of the db names */
-    if (!thd->get_binlog_updated_db_names())
+    if (!thd->get_binlog_accessed_db_names())
     {
-      thd->set_binlog_updated_db_names(new List<char>);
+      thd->set_binlog_accessed_db_names(new List<char>);
     }
     thd->add_to_binlog_updated_dbs(table->db);
 
@@ -5953,9 +5953,9 @@ bool mysql_alter_table(THD *thd,char *ne
   if (!new_db || !my_strcasecmp(table_alias_charset, new_db, db))
     new_db= db;
 
-  if (!thd->get_binlog_updated_db_names())
+  if (!thd->get_binlog_accessed_db_names())
   {
-    thd->set_binlog_updated_db_names(new List<char>);
+    thd->set_binlog_accessed_db_names(new List<char>);
   }
   thd->add_to_binlog_updated_dbs(db);
   if (new_db != db)

=== modified file 'sql/sql_view.cc'
--- a/sql/sql_view.cc	2011-02-27 17:35:25 +0000
+++ b/sql/sql_view.cc	2011-05-24 14:29:35 +0000
@@ -1683,9 +1683,9 @@ bool mysql_drop_view(THD *thd, TABLE_LIS
       }
       continue;
     }
-    if (!thd->get_binlog_updated_db_names())
+    if (!thd->get_binlog_accessed_db_names())
     {
-      thd->set_binlog_updated_db_names(new List<char>);
+      thd->set_binlog_accessed_db_names(new List<char>);
     }
     thd->add_to_binlog_updated_dbs(view->db);
     if (mysql_file_delete(key_file_frm, path, MYF(MY_WME)))


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110524142935-0bcifpp0qvpa5sq7.bundle
Thread
bzr push into mysql-next-mr-wl5569 branch (andrei.elkin:3275 to 3279)WL#5569 WL#5754Andrei Elkin24 May