List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:May 27 2011 9:29pm
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3283) WL#5569
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3283 Andrei Elkin	2011-05-28
      wl#5569 MTS
      
      Implementation of giving out the applier role to Worker for all cases but
      the ones dealing with the Coordinator's state.
      That includes Query events with over-max-db:s and Load-data related events.
      The current patch also makes an old master's binlog be handled by MTS, though
      sometimes, e.g. for a Query event, by switching to the sequential mode.
      
      Fixing a race condition that made the Coordinator (C) wait endlessly if a
      Worker had exited due to an error while applying.
     @ mysql-test/suite/rpl/r/rpl_parallel_ddl.result
        results updated.
     @ mysql-test/suite/rpl/r/rpl_parallel_multi_db.result
        results updated.
     @ mysql-test/suite/rpl/r/rpl_parallel_temp_query.result
        results updated.
     @ mysql-test/suite/rpl/t/disabled.def
        Restoring three tests as this patch makes them runnable.
     @ mysql-test/suite/rpl/t/rpl_deadlock_innodb.test
        test can't run in MTS because of trans retry.
     @ mysql-test/suite/rpl/t/rpl_dual_pos_advance.test
        test can't run in MTS because the UNTIL option of START SLAVE is not yet supported by MTS.
     @ mysql-test/suite/rpl/t/rpl_parallel_ddl-slave.opt
        rpl_parallel tests need --slave-transaction-retries=0
     @ mysql-test/suite/rpl/t/rpl_parallel_innodb-slave.opt
        rpl_parallel tests need --slave-transaction-retries=0
     @ mysql-test/suite/rpl/t/rpl_parallel_multi_db-slave.opt
        rpl_parallel tests need --slave-transaction-retries=0
     @ mysql-test/suite/rpl/t/rpl_parallel_temp_query-slave.opt
        rpl_parallel tests need --slave-transaction-retries=0
     @ sql/event_parse_data.cc
        Pleasing some tests.
     @ sql/log_event.cc
        Making a group of events w/o B/C braces be handled by Worker.
        Such a group can come from an old master or from the current master binlogging
        some SP queries.
        Also over-max db:s events are made to be handled by Worker.
        Coordinator only handles asynchronously the events dealing with Relay-log state
        and synchronously the events dealing with checkpoint changes (master-group coordinates).
        Also a few types of events from an old master (OM) are left to Coordinator to execute.
     @ sql/log_event.h
        Leaving in mts_sequential_exec() only events that either
        can deal with Coordinator state, or are from an old master.
        Making Query_log_event::mts_get_dbs return a list holding a single
        magic "" (empty string) partition name
        in case of an over-max db:s query.
        The magic empty name is converted into a record in the APH (Assigned
        Partition Hash) to indicate a lock over all the hash records.
     @ sql/rpl_rli_pdb.cc
        Changes due to redefining the object responsible for holding assigned partitions
        in a few methods incl. Slave_worker::slave_worker_ends_group().
        Some cleanup in get_slave_worker().
     @ sql/rpl_rli_pdb.h
        Redefining the object responsible for holding assigned partitions.
        Now it's a Dyn-array holding *pointers* to records in the Assigned Partition Hash (APH).
        That simplifies a few routines for Worker, e.g. the search for the records
        (entries of the APH) by Worker at the time of committing.
     @ sql/rpl_slave.cc
        Streamlining identification of the Workers' state with a boolean running_status;
        worker start and stop are controlled by means of this designator.

    added:
      mysql-test/suite/rpl/t/rpl_parallel_ddl-slave.opt
      mysql-test/suite/rpl/t/rpl_parallel_temp_query-slave.opt
    modified:
      mysql-test/suite/rpl/r/rpl_parallel_ddl.result
      mysql-test/suite/rpl/r/rpl_parallel_multi_db.result
      mysql-test/suite/rpl/r/rpl_parallel_temp_query.result
      mysql-test/suite/rpl/t/disabled.def
      mysql-test/suite/rpl/t/rpl_deadlock_innodb.test
      mysql-test/suite/rpl/t/rpl_dual_pos_advance.test
      mysql-test/suite/rpl/t/rpl_parallel_innodb-slave.opt
      mysql-test/suite/rpl/t/rpl_parallel_multi_db-slave.opt
      sql/event_parse_data.cc
      sql/log_event.cc
      sql/log_event.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_ddl.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_ddl.result	2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_ddl.result	2011-05-27 21:29:14 +0000
@@ -4,8 +4,6 @@ include/stop_slave.inc
 set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
 set @@global.mts_slave_parallel_workers= 4;
 include/start_slave.inc
-Warnings:
-Note	1726	Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
 include/diff_tables.inc [master:d32.t8, slave:d32.t8]
 include/diff_tables.inc [master:d32.t7, slave:d32.t7]
 include/diff_tables.inc [master:d32.t6, slave:d32.t6]

=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_multi_db.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_multi_db.result	2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_multi_db.result	2011-05-27 21:29:14 +0000
@@ -4,8 +4,6 @@ include/stop_slave.inc
 set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
 set @@global.mts_slave_parallel_workers= 4;
 include/start_slave.inc
-Warnings:
-Note	1726	Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
 create database d8;
 create table d8.t8 (a int);
 select round(rand()*8) into @var;

=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_temp_query.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_temp_query.result	2011-05-25 07:36:36 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_temp_query.result	2011-05-27 21:29:14 +0000
@@ -6,8 +6,6 @@ include/stop_slave.inc
 set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
 set @@global.mts_slave_parallel_workers= 4;
 include/start_slave.inc
-Warnings:
-Note	1726	Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
 create database d2;
 use d2;
 create table d2.t1 (a int auto_increment primary key, b int) engine=innodb;

=== modified file 'mysql-test/suite/rpl/t/disabled.def'
--- a/mysql-test/suite/rpl/t/disabled.def	2011-05-26 17:03:08 +0000
+++ b/mysql-test/suite/rpl/t/disabled.def	2011-05-27 21:29:14 +0000
@@ -16,6 +16,3 @@ rpl_row_event_max_size    : Bug#55675 20
 rpl_delayed_slave         : Bug#57514 2010-11-09 andrei rpl_delayed_slave fails sporadically in pb
 rpl_log_pos               : BUG#55675 2010-09-10 alfranio rpl.rpl_log_pos fails sporadically with error binlog truncated in the middle
 
-rpl_sp_effects	  	  : bug@MTS SELECT with sf() logging does not follow the correct pattern (no BEGIN/COMMIT) Thu May 26 15:48:06 EEST 2011 Andrei
-rpl_auto_increment_bug33029  : same as rpl_sp_effects
-rpl.rpl_cross_version	  : same as rpl_sp_effects

=== modified file 'mysql-test/suite/rpl/t/rpl_deadlock_innodb.test'
--- a/mysql-test/suite/rpl/t/rpl_deadlock_innodb.test	2010-12-19 17:07:28 +0000
+++ b/mysql-test/suite/rpl/t/rpl_deadlock_innodb.test	2011-05-27 21:29:14 +0000
@@ -2,3 +2,5 @@
 -- source include/have_innodb.inc
 let $engine_type=innodb;
 -- source extra/rpl_tests/rpl_deadlock.test
+# --slave-transaction-retries=0 in MTS
+-- source include/not_mts_slave_parallel_workers.inc

=== modified file 'mysql-test/suite/rpl/t/rpl_dual_pos_advance.test'
--- a/mysql-test/suite/rpl/t/rpl_dual_pos_advance.test	2010-12-19 17:22:30 +0000
+++ b/mysql-test/suite/rpl/t/rpl_dual_pos_advance.test	2011-05-27 21:29:14 +0000
@@ -7,6 +7,8 @@
 # It also will test BUG#13861.
 
 source include/have_innodb.inc;
+# Until option of START SLAVE is not yet supported by  MTS
+source include/not_mts_slave_parallel_workers.inc;
 
 --let $rpl_topology= 1->2->1
 --source include/rpl_init.inc

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_ddl-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel_ddl-slave.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_ddl-slave.opt	2011-05-27 21:29:14 +0000
@@ -0,0 +1 @@
+--slave-transaction-retries=0

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_innodb-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel_innodb-slave.opt	2011-05-26 17:03:08 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_innodb-slave.opt	2011-05-27 21:29:14 +0000
@@ -1,4 +1,4 @@
---log-warnings=0 --slave-transaction-=0 --innodb_flush_log_at_trx_commit=0  --skip-log-bin --skip-log-slave-updates --sync_binlog=0
+--log-warnings=0 --slave-transaction-retries=0 --innodb_flush_log_at_trx_commit=0  --skip-log-bin --skip-log-slave-updates --sync_binlog=0
 
 
 

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_multi_db-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel_multi_db-slave.opt	2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_multi_db-slave.opt	2011-05-27 21:29:14 +0000
@@ -1 +1,2 @@
---thread_stack=512K
+--thread_stack=512K --slave-transaction-retries=0
+

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_temp_query-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel_temp_query-slave.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_temp_query-slave.opt	2011-05-27 21:29:14 +0000
@@ -0,0 +1 @@
+ --log-warnings=0 --slave-transaction-retries=0

=== modified file 'sql/event_parse_data.cc'
--- a/sql/event_parse_data.cc	2010-07-02 02:58:51 +0000
+++ b/sql/event_parse_data.cc	2011-05-27 21:29:14 +0000
@@ -566,6 +566,7 @@ void Event_parse_data::check_originator_
 {
   /* Disable replicated events on slave. */
   if ((thd->system_thread == SYSTEM_THREAD_SLAVE_SQL) ||
+      (thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) ||
       (thd->system_thread == SYSTEM_THREAD_SLAVE_IO))
   {
     DBUG_PRINT("info", ("Invoked object status set to SLAVESIDE_DISABLED."));

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-05-26 17:03:08 +0000
+++ b/sql/log_event.cc	2011-05-27 21:29:14 +0000
@@ -2361,7 +2361,8 @@ bool Log_event::contains_partition_info(
     // todo: Query event is limitly supported
     // which ev->get_db() yields the session db not the actual db
       
-    (get_type_code() == QUERY_EVENT && !ends_group() && !starts_group());
+    (get_type_code() == QUERY_EVENT && !ends_group() && !starts_group()) ||
+    (get_type_code() == EXECUTE_LOAD_QUERY_EVENT);
 }
 
 /**
@@ -2401,13 +2402,20 @@ bool Log_event::contains_partition_info(
 Slave_worker *Log_event::get_slave_worker_id(Relay_log_info *rli)
 {
   Slave_worker *worker= NULL;
-  Slave_job_group g;
+  Slave_job_group g, *ptr_g;
   bool is_b_event;
 
-  /* checking properties and perform corresponding actions */
+  /* checking partioning properties and perform corresponding actions */
 
-  // Beginning of a group or a DDL
-  if ((is_b_event= starts_group()) || !rli->curr_group_seen_begin)
+  // Beginning of a group designated explicitly with BEGIN
+  if ((is_b_event= starts_group()) ||
+      // or DDL:s or autocommit queries possibly associated with own p-events
+      (!rli->curr_group_seen_begin &&
+       // the following is a case of no-B group: { p_1,p_2,...,p_k, g}
+       (rli->gaq->empty() ||
+        ((Slave_job_group *)
+         dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))->
+        worker_id != (ulong) -1)))
   {
     ulong gaq_idx;
     rli->mts_total_groups++;
@@ -2436,6 +2444,7 @@ Slave_worker *Log_event::get_slave_worke
                 group_relay_log_name == NULL);
     DBUG_ASSERT(rli->gaq->assigned_group_index != (ulong) -1); // gaq must have room
     DBUG_ASSERT(rli->last_assigned_worker == NULL);
+
     if (is_b_event)
     {
       Log_event *ptr_curr_ev= this;
@@ -2460,60 +2469,75 @@ Slave_worker *Log_event::get_slave_worke
     List_iterator<char> it(*mts_get_dbs(rli->info_thd->mem_root));
     it++;
 
-    if (num_dbs != OVER_MAX_DBS_IN_EVENT_MTS)
-    {
-      do
-      {
-        char **ref_cur_db= it.ref();
-
-        if (!(rli->last_assigned_worker=
-              get_slave_worker(*ref_cur_db, rli,
-                               &mts_assigned_partitions[i],
-                               get_type_code() == QUERY_EVENT)))
-        {
-          for (uint k= 0; k < rli->curr_group_da.elements; k++)
-          { 
-            delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
-          }
-          return NULL;
-        }
-
-        DBUG_ASSERT(!strcmp(mts_assigned_partitions[i]->db, *ref_cur_db));
-        DBUG_ASSERT(rli->last_assigned_worker ==
-                    mts_assigned_partitions[i]->worker);
-        DBUG_ASSERT(mts_assigned_partitions[i]->usage > 0);
-
-        i++;
-      } while (it++);
-    }
-    else
+    if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
     {
       // Temporary tables of Coordinator are relocated by Worker
       if (!rli->last_assigned_worker)
         rli->last_assigned_worker=
           *(Slave_worker**) dynamic_array_ptr(&rli->workers, 0);
+
+      if (!rli->curr_group_isolated)
+        (void) wait_for_workers_to_finish(rli, rli->last_assigned_worker);
+      rli->curr_group_isolated= TRUE;
     }
-    worker= rli->last_assigned_worker;
 
-    get_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
-    if (g.worker_id == (ulong) -1)  // assign "offically" the current group
+    do
     {
-      g.worker_id= worker->id;
-      set_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
+      char **ref_cur_db= it.ref();
+      
+      if (!(rli->last_assigned_worker=
+            get_slave_worker(*ref_cur_db, rli,
+                             &mts_assigned_partitions[i],
+                             get_type_code() == QUERY_EVENT)))
+      {
+        // destroy buffered events of the current group prior to exit
+        for (uint k= 0; k < rli->curr_group_da.elements; k++)
+        { 
+          delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
+        }
+        return NULL;
+      }
 
-      DBUG_ASSERT(g.group_relay_log_name == NULL);
+      DBUG_ASSERT(!strcmp(mts_assigned_partitions[i]->db, *ref_cur_db));
+      DBUG_ASSERT(rli->last_assigned_worker ==
+                  mts_assigned_partitions[i]->worker);
+      DBUG_ASSERT(mts_assigned_partitions[i]->usage > 0);
+
+      i++;
+    } while (it++);
+
+    worker= rli->last_assigned_worker;
+    if ((ptr_g= ((Slave_job_group *)
+                 dynamic_array_ptr(&rli->gaq->Q,
+                                   rli->gaq->assigned_group_index)))-> worker_id
+        == (ulong) -1)
+    {
+      ptr_g->worker_id= worker->id;
+      
+      DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
     }
 
     DBUG_ASSERT(i == num_dbs || num_dbs == OVER_MAX_DBS_IN_EVENT_MTS);
 
+    /* 
+       Either old master binlog (todo: assert), or a specific "corner"
+       case (todo: wrap with BEGIN/COMMIT on master anyway) of logging 
+       like SELECT sf(), where sf() has a side effect.
+    */
+    if (!rli->curr_group_seen_begin)
+      rli->curr_group_is_parallel= TRUE;
+
     // TODO: convert to C's private mem_root.
 
     // Releasing the Coord's mem-root from the updated dbs. It's safe to do at this
     // point because the root is no longer needed along remained part of Coordinator's
     // execution flow.
+    
     free_root(rli->info_thd->mem_root, MYF(MY_KEEP_PREALLOC));
   }
-  else // a mini-group internal "regular" event
+  else 
+  {
+    // a mini-group internal "regular" event
     if (rli->last_assigned_worker)
     {
       worker= rli->last_assigned_worker;
@@ -2527,30 +2551,36 @@ Slave_worker *Log_event::get_slave_worke
       DBUG_ASSERT(get_type_code() == INTVAR_EVENT ||
                   get_type_code() == RAND_EVENT ||
                   get_type_code() == USER_VAR_EVENT ||
-                  get_type_code() == ROWS_QUERY_LOG_EVENT);
+                  get_type_code() == ROWS_QUERY_LOG_EVENT ||
+                  get_type_code() == BEGIN_LOAD_QUERY_EVENT);
 
       insert_dynamic(&rli->curr_group_da, (uchar*) &ptr_curr_ev);
       
       if (!rli->curr_group_seen_begin)
       {
-        // TODO: fix the master side to wrap with B/T cases like
-        // `set @user_var, select f()' that are logged w/o B-event
-        // Notice, while the select-f() can be mended in the current
-        // master version, the old server binlogs can't brought to MTS because
-        // of not following B|T-bracing rule for DML events.
-        DBUG_ASSERT(0);
+        /*
+          This is a case of B/T-less group like
+          `set @user_var, select f()' that are logged w/o B-event.
+          Notice, while the select-f() can be mended in the current
+          master version, the old server binlogs can't since it bring in
+          the same B/T-less {p, g} group.
+        */
+        DBUG_ASSERT(rli->curr_group_da.elements > 0);
+      }
+      else
+      {
+        DBUG_ASSERT(rli->curr_group_da.elements > 1);
       }
-
-      DBUG_ASSERT(rli->curr_group_da.elements > 1);
+      return NULL;
     }
+  }
 
   // the group terminal event (Commit, Xid or a DDL query)
   if (ends_group() || !rli->curr_group_seen_begin)
   {
     uint i;
     mts_group_cnt= rli->gaq->assigned_group_index;
-    Slave_job_group *ptr_g=
-      (Slave_job_group *)
+    ptr_g= (Slave_job_group *)
       dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
 
     DBUG_ASSERT(rli->curr_group_is_parallel);
@@ -2798,7 +2828,7 @@ int Log_event::apply_event(Relay_log_inf
   Slave_worker *w= NULL;
   Slave_job_item item= {NULL}, *job_item= &item;
   Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli);  // constless alias
-  bool parallel, seq_event, term_event;
+  bool parallel= FALSE, async_event= FALSE, seq_event= FALSE, term_event= FALSE;
 
   if (rli->is_mts_recovery())
   {
@@ -2820,40 +2850,65 @@ int Log_event::apply_event(Relay_log_inf
   }
 
   if (!(parallel= rli->is_parallel_exec()) ||
-      ((seq_event= mts_sequential_exec()) &&
-       (!rli->curr_group_seen_begin ||
-        mts_async_exec_by_coordinator(::server_id))))
+      (async_event= mts_async_exec_by_coordinator(::server_id)) ||
+      (seq_event= mts_sequential_exec()))
   {
     if (parallel)
     {
       /* 
          There are two classes of events that Coordinator executes
-         itself. One requires all Workers to finish up their assignments.
-         The other does not need (actually can not have) this synchronization.
+         itself. One e.g the master Rotate requires all Workers to finish up 
+         their assignments. The other async class, e.g the slave Rotate,
+         can't have this such synchronization because Worker might be waiting
+         for terminal events to finish.
       */
 
-      if (!mts_async_exec_by_coordinator(::server_id))
-      {
+      if (!async_event)
+      {     
         /*
           this  event does not split the current group but is indeed
           a separator beetwen two master's binlog therefore requiring
           Workers to sync.
         */
 
-        DBUG_ASSERT(!rli->curr_group_seen_begin);
+        if (rli->curr_group_da.elements > 0)
+        {
+          /* 
+             Possible reason is a old version binlog sequential event
+             wrappped with BEGIN/COMMIT or preceeded by User|Int|Random- var.
+             MTS has to stop to suggest restart in the permanent sequential mode.
+          */
+
+          // TODO: improve err msg
+          rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+                      ER(ER_SLAVE_FATAL_ERROR),
+                      "Can't execute all binlog event in parallel mode");
+
+          // destroy possible buffered events of the current group prior to exit
+          for (uint k= 0; k < rli->curr_group_da.elements; k++)
+          { 
+            delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
+          }
 
+          DBUG_RETURN(-1);
+        }
         /*
           Marking sure the event won't be executed in parallel.
           That affects memory deallocation in the following execution path.
         */
         c_rli->curr_group_is_parallel= FALSE;
         (void) wait_for_workers_to_finish(rli);
-        
-        /* any Worker is idle as done through wait_for_workers_to_finish */
-        DBUG_ASSERT((*(Slave_worker **)
-                     dynamic_array_ptr(&c_rli->workers,
-                                       rand() % c_rli->workers.elements))->
-                    usage_partition == 0);
+
+#ifndef DBUG_OFF
+        /* all Workers are idle as done through wait_for_workers_to_finish */
+        for (uint k= 0; k < c_rli->curr_group_da.elements; k++)
+        {
+          DBUG_ASSERT(!(*(Slave_worker **)
+                        dynamic_array_ptr(&c_rli->workers, k))->usage_partition);
+          DBUG_ASSERT(!(*(Slave_worker **)
+                        dynamic_array_ptr(&c_rli->workers, k))->jobs.len);
+        }
+#endif
       }
       else
       {
@@ -2875,13 +2930,6 @@ int Log_event::apply_event(Relay_log_inf
   DBUG_ASSERT(!(rli->curr_group_seen_begin && ends_group()) ||
               rli->last_assigned_worker);
 
-  if (seq_event)
-  {   // rli->last_assigned_worker != NULL if BTQ but not BQT
-    DBUG_ASSERT(rli->curr_group_seen_begin || ends_group());
-    if (!c_rli->curr_group_isolated)
-      (void) wait_for_workers_to_finish(rli, rli->last_assigned_worker);
-    c_rli->curr_group_isolated= TRUE;
-   }
   // getting Worker's id
   if ((!(w= get_slave_worker_id(c_rli)) ||
        DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)))
@@ -2986,6 +3034,7 @@ int slave_worker_exec_job(Slave_worker *
   struct slave_job_item item= {NULL}, *job_item= &item;
   THD *thd= w->info_thd;
   Log_event *ev= NULL;
+  bool part_event= FALSE;
 
   DBUG_ENTER("slave_worker_exec_job");
 
@@ -3012,42 +3061,46 @@ int slave_worker_exec_job(Slave_worker *
   } 
   else
   {
-    if (ev->contains_partition_info() &&
-        ev->mts_number_dbs() < OVER_MAX_DBS_IN_EVENT_MTS)
+    if ((part_event= ev->contains_partition_info()))
     {
-      List_iterator<char> it(*ev->mts_get_dbs(thd->mem_root));
-      DYNAMIC_ARRAY *ep= &(w->curr_group_exec_parts->dynamic_ids);
-      
-      while (it++)
+      uint num_dbs=  ev->mts_number_dbs();
+      DYNAMIC_ARRAY *ep= &w->curr_group_exec_parts;
+
+      if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
+        num_dbs= 1;
+
+      DBUG_ASSERT(num_dbs > 0);
+
+      for (uint k= 0; k < num_dbs; k++)
       {
         bool found= FALSE;
-        char key[NAME_LEN + 2];
-        const char *dbname= *it.ref();
-        uchar dblength= (uint) strlen(dbname);
 
         for (uint i= 0; i < ep->elements && !found; i++)
         {
-          get_dynamic(ep, (uchar*) key, i);
           found=
-            (key[0] == dblength) &&
-            (strncmp(key + 1, const_cast<char*>(dbname), dblength) == 0);
+            (* (db_worker_hash_entry **) dynamic_array_ptr(ep, i)) ==
+            ev->mts_assigned_partitions[k];
         }
         if (!found)
         {
-          key[0]= dblength;
-          memcpy(key + 1, dbname, dblength + 1);
-          insert_dynamic(ep, (uchar*) key);
+          insert_dynamic(ep, (uchar*) &ev->mts_assigned_partitions[k]);
         }
       }
     }
   }
   w->set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
   error= ev->do_apply_event_worker(w);
-  
-  if (ev->ends_group() || !w->curr_group_seen_begin)
-  {
-    DBUG_PRINT("slave_worker_exec_job:", (" commits GAQ index %lu, last committed  %lu", ev->mts_group_cnt, w->last_group_done_index));
-
+  if (ev->ends_group() || (!w->curr_group_seen_begin && 
+                           /* 
+                              p-events of B/T-less {p,g} group (see
+                              legends of Log_event::get_slave_worker)
+                              obviously can't commit.
+                           */
+                           part_event))
+  {
+    DBUG_PRINT("slave_worker_exec_job:",
+               (" commits GAQ index %lu, last committed  %lu",
+                ev->mts_group_cnt, w->last_group_done_index));
     w->slave_worker_ends_group(ev, error); /* last done sets post exec */
   }
 

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2011-05-24 14:29:35 +0000
+++ b/sql/log_event.h	2011-05-27 21:29:14 +0000
@@ -1235,7 +1235,11 @@ public:
 public:
 
   /**
-     MST: to execute serially due to technical or conceptual limitation
+     MST: to execute some event types serially.
+
+     @note There are incompatile combinations such the referred event
+           is wrapped with BEGIN/COMMIT. Such cases should be identified
+           by the caller and treates as an error.
      
      @return TRUE if despite permanent parallel execution mode an event
                   needs applying in a real isolation that is sequentially.
@@ -1243,15 +1247,6 @@ public:
   bool mts_sequential_exec()
   {
     return
-      /* 
-         the 4 types below are limitly parallel-supported (the default 
-         session db not the actual db).
-         Decision on BEGIN, COMMIT, Xid is the parallel.
-      */
-      (get_type_code() == QUERY_EVENT &&
-       !starts_group() && !ends_group() &&
-       (mts_number_dbs() ==  OVER_MAX_DBS_IN_EVENT_MTS)) ||
-
       get_type_code() == START_EVENT_V3          ||
       get_type_code() == STOP_EVENT              ||
       get_type_code() == ROTATE_EVENT            ||
@@ -1262,12 +1257,8 @@ public:
       get_type_code() == EXEC_LOAD_EVENT         ||
       get_type_code() == DELETE_FILE_EVENT       ||
       get_type_code() == NEW_LOAD_EVENT          ||
+
       get_type_code() == FORMAT_DESCRIPTION_EVENT||
-      get_type_code() == BEGIN_LOAD_QUERY_EVENT  ||
-      get_type_code() == EXECUTE_LOAD_QUERY_EVENT|| /* todo: make parallel */
-      get_type_code() == PRE_GA_WRITE_ROWS_EVENT ||
-      get_type_code() == PRE_GA_UPDATE_ROWS_EVENT||
-      get_type_code() == PRE_GA_DELETE_ROWS_EVENT||
 
       get_type_code() == INCIDENT_EVENT;
   }
@@ -1909,10 +1900,16 @@ public:
   {
     List<char> *res= new (mem_root) List<char>;
     if (mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
-      res->push_back((char*) get_db());
+    {
+      // "" == empty string db name is special to indicate sequential applying
+      mts_accessed_db_names[0][0]= 0;
+      res->push_back((char*) mts_accessed_db_names[0]);
+    }
     else
+    {
       for (uchar i= 0; i < mts_accessed_dbs; i++)
         res->push_back(mts_accessed_db_names[i]);
+    }
     return res;
   }
 
@@ -3070,7 +3067,8 @@ public:
   bool write(IO_CACHE* file);
   const char* get_db() { return db; }
 #endif
-
+  /* MTS executes this event sequentially */
+  virtual uchar mts_number_dbs() { return OVER_MAX_DBS_IN_EVENT_MTS; }
 private:
 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
   virtual int do_apply_event(Relay_log_info const *rli);

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2011-05-26 17:03:08 +0000
+++ b/sql/rpl_rli_pdb.cc	2011-05-27 21:29:14 +0000
@@ -29,9 +29,9 @@ const char *info_slave_worker_fields []=
 
 Slave_worker::Slave_worker(const char* type, const char* pfs,
                            Relay_log_info *rli)
-  : Relay_log_info(FALSE), c_rli(rli), curr_group_exec_parts(0),
-  checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0),
-  inited_group_execed(0)
+  : Relay_log_info(FALSE), c_rli(rli),
+    checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0),
+    inited_group_execed(0), running_status(FALSE)
 {
   checkpoint_relay_log_name[0]= 0;
   checkpoint_master_log_name[0]= 0;
@@ -39,8 +39,7 @@ Slave_worker::Slave_worker(const char* t
 
 Slave_worker::~Slave_worker() 
 {
-  if (curr_group_exec_parts)
-    delete curr_group_exec_parts;
+  delete_dynamic(&curr_group_exec_parts);
 
   if (inited_group_execed)
     bitmap_free(&group_execed);
@@ -55,8 +54,8 @@ int Slave_worker::init_info()
   if (inited)
     DBUG_RETURN(0);
 
-  if (!(curr_group_exec_parts= new Database_ids(NAME_LEN)))
-    goto err;
+  my_init_dynamic_array(&curr_group_exec_parts, sizeof(db_worker_hash_entry*),
+                        SLAVE_INIT_DBS_IN_GROUP, 1);
 
   if (bitmap_init(&group_execed, NULL,
                   c_rli->checkpoint_group, FALSE))
@@ -273,7 +272,7 @@ static void free_entry(db_worker_hash_en
   DBUG_PRINT("info", ("free_entry %s, %d", entry->db, (int) strlen(entry->db)));
 
   DBUG_ASSERT(c_thd->system_thread == SYSTEM_THREAD_SLAVE_SQL);
-  DBUG_ASSERT(entry->usage == 0);
+  DBUG_ASSERT(entry->usage == 0 || !entry->worker->running_status);
 
   mts_move_temp_tables_to_thd(c_thd, entry->temporary_tables);
   entry->temporary_tables= NULL;
@@ -454,6 +453,8 @@ static void move_temp_tables_to_entry(TH
         scheduled the event, and goes back into the parallel mode
 
    @param  dbname    pointer to c-string containing database name
+                     It can be empty string to indicate specific locking
+                     to faciliate sequential applying.
    @param  rli       pointer to Coordinators relay-log-info instance
    @param  ptr_entry reference to a pointer to the resulted entry in
                      the Assigne Partition Hash where
@@ -481,7 +482,6 @@ Slave_worker *get_slave_worker(const cha
   my_hash_value_type hash_value;
   uchar dblength= (uint) strlen(dbname);
 
-  DBUG_ASSERT(dblength != 0);
 
   // Search in CGAP
   for (i= 0; i < rli->curr_group_assigned_parts.elements; i++)
@@ -545,11 +545,6 @@ Slave_worker *get_slave_worker(const cha
     entry->worker= !rli->last_assigned_worker ?
       get_least_occupied_worker(workers) : rli->last_assigned_worker;
     entry->worker->usage_partition++;
-    /* 
-       relocation belonging to db temp tables from C to W via entry
-    */
-    if (need_temp_tables)
-      move_temp_tables_to_entry(thd, entry);
 
     mysql_mutex_lock(&slave_worker_hash_lock);
 
@@ -574,8 +569,6 @@ Slave_worker *get_slave_worker(const cha
 
     ret= my_hash_insert(&mapping_db_to_worker, (uchar*) entry);
 
-    mysql_mutex_unlock(&slave_worker_hash_lock);
-
     if (ret)
     {
       my_free(db);
@@ -634,26 +627,36 @@ Slave_worker *get_slave_worker(const cha
       entry->usage= 1;
       entry->worker->usage_partition++;
     }
+  }
 
-    if (entry->usage == 1 && need_temp_tables)
+  /* 
+     relocation belonging to db temp tables from C to W via entry
+  */
+  if (entry->usage == 1 && need_temp_tables)
+  {
+    if (!entry->temporary_tables)
     {
-      if (!entry->temporary_tables)
+      if (entry->db_len != 0)
       {
         move_temp_tables_to_entry(thd, entry);
       }
-#ifndef DBUG_OFF      
       else
       {
-        for (TABLE *table= thd->temporary_tables; table; table= table->next)
-        {
-          DBUG_ASSERT(0 != strcmp(table->s->db.str, entry->db));
-        }
+        entry->temporary_tables= thd->temporary_tables;
+        thd->temporary_tables= NULL;
       }
-#endif
     }
-
-    mysql_mutex_unlock(&slave_worker_hash_lock);
+#ifndef DBUG_OFF      
+    else
+    {
+      for (TABLE *table= thd->temporary_tables; table; table= table->next)
+      {
+        DBUG_ASSERT(0 != strcmp(table->s->db.str, entry->db));
+      }
+    }
+#endif
   }
+  mysql_mutex_unlock(&slave_worker_hash_lock);
 
   DBUG_ASSERT(entry);
 
@@ -748,24 +751,15 @@ void Slave_worker::slave_worker_ends_gro
   /*
     Cleanup relating to the last executed group regardless of error.
   */
+  DYNAMIC_ARRAY *ep= &curr_group_exec_parts;
 
-  for (int i= curr_group_exec_parts->dynamic_ids.elements; i > 0; i--)
+  for (int i= ep->elements; i > 0; i--)
   {
-    db_worker_hash_entry *entry= NULL;
-    my_hash_value_type hash_value;
-    char key[NAME_LEN + 2];
-
-    get_dynamic(&(curr_group_exec_parts->dynamic_ids), (uchar*) key, i - 1);
-    hash_value= my_calc_hash(&mapping_db_to_worker, (uchar*) key + 1, key[0]);
-
+    db_worker_hash_entry *entry=
+      (* (db_worker_hash_entry **) dynamic_array_ptr(ep, i - 1));
     mysql_mutex_lock(&slave_worker_hash_lock);
 
-    entry= (db_worker_hash_entry *)
-      my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
-                                      (uchar*) key + 1, key[0]);
-
-    DBUG_ASSERT(entry && entry->usage != 0); // was used to break
-    DBUG_ASSERT(strlen(key + 1) == (uchar) key[0]);
+    DBUG_ASSERT(entry && entry->usage != 0);
 
     entry->usage--;
 
@@ -783,14 +777,19 @@ void Slave_worker::slave_worker_ends_gro
 
       usage_partition--;
       if (entry->worker != this) // Coordinator is waiting
+      {
+#ifndef DBUG_OFF
+        // TODO: open it! DBUG_ASSERT(usage_partition || !entry->worker->jobs.len);
+#endif
         mysql_cond_signal(&slave_worker_hash_cond);
+      }
     }
     else
       DBUG_ASSERT(usage_partition != 0);
 
     mysql_mutex_unlock(&slave_worker_hash_lock);
 
-    delete_dynamic_element(&(curr_group_exec_parts->dynamic_ids), i - 1);
+    delete_dynamic_element(ep, i - 1);
   }
   curr_group_seen_begin= FALSE;
 }
@@ -1101,7 +1100,8 @@ int wait_for_workers_to_finish(Relay_log
       thd->exit_cond(proc_info);
       ret++;
 
-      DBUG_ASSERT(entry->usage == 0 || thd->killed || rli->abort_slave);
+      DBUG_ASSERT((entry->usage == 0 && entry->worker) || 
+                  thd->killed || rli->abort_slave);
     }
     else
     {

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2011-05-24 14:29:35 +0000
+++ b/sql/rpl_rli_pdb.h	2011-05-27 21:29:14 +0000
@@ -226,7 +226,8 @@ public:
 
   Relay_log_info *c_rli;
 
-  Dynamic_ids *curr_group_exec_parts; // CGEP
+  DYNAMIC_ARRAY curr_group_exec_parts; // CGEP
+
   bool curr_group_seen_begin; // is set to TRUE with B-event at Worker exec
   // @c last_group_done_index is for recovery, although can be viewed
   //    as statistics as well.
@@ -266,6 +267,9 @@ public:
   char checkpoint_master_log_name[FN_REFLEN];
   ulonglong checkpoint_master_log_pos;
   ulong checkpoint_seqno;
+  MY_BITMAP group_execed;
+  bool inited_group_execed;
+  volatile bool  running_status; // TRUE while the Worker is in its read-exec loop
 
   int init_info();
   void end_info();
@@ -281,10 +285,6 @@ public:
     ATTRIBUTE_FORMAT(printf, 4, 5);
   void do_report(loglevel level, int err_code, const char *msg, va_list vargs) const;
 
-  MY_BITMAP group_execed;
-  
-  bool inited_group_execed;
-
 private:
   bool read_info(Rpl_info_handler *from);
   bool write_info(Rpl_info_handler *to);

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2011-05-25 16:02:13 +0000
+++ b/sql/rpl_slave.cc	2011-05-27 21:29:14 +0000
@@ -3715,10 +3715,8 @@ pthread_handler_t handle_slave_worker(vo
   mysql_mutex_unlock(&LOCK_thread_count);
 
   mysql_mutex_lock(&w->jobs_lock);
-
-  DBUG_ASSERT(w->jobs.len == rli->mts_slave_worker_queue_len_max + 1);
-  w->jobs.len= 0;
-  mysql_cond_signal(&w->jobs_cond);  // ready for duty
+  w->running_status= TRUE;           // ready for duty
+  mysql_cond_signal(&w->jobs_cond);
 
   mysql_mutex_unlock(&w->jobs_lock);
 
@@ -3729,13 +3727,13 @@ pthread_handler_t handle_slave_worker(vo
       error= slave_worker_exec_job(w, rli);
   }
 
+  w->cleanup_context(thd, error);
   if (error)
   {
     mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
     rli->info_thd->awake(THD::KILL_QUERY);          // notify Crdn
     mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
     thd->clear_error();
-    w->cleanup_context(thd, error);
   }
 
   mysql_mutex_lock(&w->jobs_lock);
@@ -3760,14 +3758,15 @@ pthread_handler_t handle_slave_worker(vo
   mysql_mutex_unlock(&rli->pending_jobs_lock);
 
   mysql_mutex_lock(&w->jobs_lock);
-  w->jobs.len= rli->mts_slave_worker_queue_len_max + 1;
+
+  w->running_status= 0;
   sql_print_information("Worker %lu statistics: "
                         "events processed = %lu "
                         "hungry waits = %lu "
                         "priv queue overfills = %llu "
                         ,w->id, w->stmt_jobs, w->wait_jobs, w->jobs.waited_overfill);
-
   mysql_cond_signal(&w->jobs_cond);  // famous last goodbye
+
   mysql_mutex_unlock(&w->jobs_lock);
 
 err:
@@ -4090,9 +4089,7 @@ int slave_start_single_worker(Relay_log_
     goto err;
   }
   
-  // TODO: remove after dynamic_ids will be sorted out (removed/refined) otherwise
-  // entry->usage assert
-  w->curr_group_exec_parts->dynamic_ids.elements= 0;
+  w->curr_group_exec_parts.elements= 0;
   w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
   w->checkpoint_notified= FALSE;
   w->workers= rli->workers; // shallow copying is sufficient
@@ -4103,18 +4100,17 @@ int slave_start_single_worker(Relay_log_
   w->usage_partition= 0;
   w->last_group_done_index= rli->gaq->s; // out of range
 
-  w->jobs.s= rli->mts_slave_worker_queue_len_max;
-  my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0); // todo: implement increment e.g  n * 10;
+  w->jobs.a= 0;
+  w->jobs.len= 0;
+  w->jobs.overfill= FALSE;    //  todo: move into Slave_jobs_queue constructor
+  w->jobs.waited_overfill= 0;
+  w->jobs.e= w->jobs.s= rli->mts_slave_worker_queue_len_max;
+  my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0);
   for (k= 0; k < w->jobs.s; k++)
     insert_dynamic(&w->jobs.Q, (uchar*) &empty);
   
   DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
   
-  w->jobs.e= w->jobs.s;
-  w->jobs.a= 0;
-  w->jobs.len= rli->mts_slave_worker_queue_len_max + 1; // to first handshake
-  w->jobs.overfill= FALSE;    //  todo: move into Slave_jobs_queue constructor
-  w->jobs.waited_overfill= 0;
   w->wq_overrun_set= FALSE;
   set_dynamic(&rli->workers, (uchar*) &w, i);
   mysql_mutex_init(key_mutex_slave_parallel_worker[i], &w->jobs_lock,
@@ -4131,7 +4127,7 @@ int slave_start_single_worker(Relay_log_
   }
   
   mysql_mutex_lock(&w->jobs_lock);
-  if (w->jobs.len != 0)
+  if (!w->running_status)
     mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
   mysql_mutex_unlock(&w->jobs_lock);
   // Least occupied inited with zero
@@ -4221,7 +4217,7 @@ err:
 
    Workers are notified with setting KILLED status
    and waited for their acknowledgment as specified by
-   a "magic" (out-of-operational range) value of w->jobs.len.
+   worker's running_status.
 */
 void slave_stop_workers(Relay_log_info *rli)
 {
@@ -4238,7 +4234,7 @@ void slave_stop_workers(Relay_log_info *
     
     mysql_mutex_lock(&w->jobs_lock);
     
-    if (w->jobs.len == rli->mts_slave_worker_queue_len_max + 1)
+    if (!w->running_status)
     {
       mysql_mutex_unlock(&w->jobs_lock);
       continue;
@@ -4259,7 +4255,7 @@ void slave_stop_workers(Relay_log_info *
     get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, i);
 
     mysql_mutex_lock(&w->jobs_lock);
-    while (w->jobs.len != rli->mts_slave_worker_queue_len_max + 1)
+    while (w->running_status)
     {
       const char *save_proc_info;
       save_proc_info= thd->enter_cond(&w->jobs_cond, &w->jobs_lock,


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110527212914-k720id4iq6eh9ihb.bundle
Thread
bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3283) WL#5569Andrei Elkin31 May