List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:November 26 2010 9:08pm
Subject:bzr commit into mysql-next-mr.crash-safe branch (andrei.elkin:3219) WL#5569
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:alfranio.correia@stripped

 3219 Andrei Elkin	2010-11-26
      wl#5569 MTS
      
      Partitioning conflict detection and handling is implemented.
      A new option to run Query in parallel though incompatibly with Rows- case in that
      the default db not the actual db:s are used as the partition key.
      
      User interface gained the global var and the cmd line opt:
      
      slave_run_query_in_parallel (Welcome to the set! :-)
     @ mysql-test/suite/rpl/r/rpl_parallel_conflicts.result
        new tests result file is added.
     @ mysql-test/suite/rpl/t/rpl_parallel_conflicts.test
        Partitioning conflicts detection, handling basic initial test is added.
     @ sql/log_event.cc
        Refining parallel vs sequential decider to cover optional support for Query parallelization.
     @ sql/log_event.h
        Refining only_serial_exec() with providing hints through two new args.
     @ sql/mysqld.cc
        new Query limited parallelization support related.
     @ sql/mysqld.h
        new Query limited parallelization support related.
     @ sql/rpl_rli.h
        changed are due to new Query limited parallelization support.
     @ sql/rpl_rli_pdb.cc
        Conflict detection, waiting, partition release is implemented.

    added:
      mysql-test/suite/rpl/r/rpl_parallel_conflicts.result
      mysql-test/suite/rpl/t/rpl_parallel_conflicts.test
    modified:
      sql/log_event.cc
      sql/log_event.h
      sql/mysqld.cc
      sql/mysqld.h
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/sys_vars.cc
=== added file 'mysql-test/suite/rpl/r/rpl_parallel_conflicts.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_conflicts.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_conflicts.result	2010-11-26 21:08:30 +0000
@@ -0,0 +1,78 @@
+stop slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+reset master;
+reset slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+start slave;
+create view coord_wait_list  as SELECT id from Information_Schema.processlist where state like 'Waiting for Slave Worker%';
+include/stop_slave.inc
+set @save.slave_exec_mode= @@global.slave_exec_mode;
+set @@global.slave_exec_mode = 'Parallel';
+include/start_slave.inc
+create database d1;
+create database d2;
+create database d3;
+create table d1.t1 (a int auto_increment primary key) engine=innodb;
+create table d2.t1 (a int auto_increment primary key) engine=innodb;
+create table d3.t1 (a int auto_increment primary key) engine=innodb;
+begin;
+insert into d2.t1 values (1);
+begin;
+use d1;
+insert into d1.t1 values (null);
+use d2;
+insert into d2.t1 values (1);
+commit;
+begin;
+use d3;
+insert into d3.t1 values (null);
+use d1;
+insert into d1.t1 values (null);
+commit;
+rollback;
+select count(*) from d1.t1 into @d1;
+select count(*) from d2.t1 into @d2;
+select count(*) from d3.t1 into @d3;
+use d1;
+create table `exists_only_on_slave` (a int);
+begin;
+insert into d1.t1 values (null);
+insert into d2.t1 values (null);
+insert into d3.t1 values (null);
+begin;
+use d1;
+insert into d1.t1 values (null);
+commit;
+begin;
+use d2;
+insert into d2.t1 values (null);
+commit;
+begin;
+use d3;
+insert into d3.t1 values (null);
+commit;
+use d1;
+drop table if exists `exists_only_on_slave`;
+select sleep(1);
+sleep(1)
+0
+select count(*) - @d1 as 'zero' from d1.t1;
+zero
+0
+select count(*) - @d2 as 'zero' from d2.t1;
+zero
+0
+select count(*) - @d3 as 'zero' from d3.t1;
+zero
+0
+use d1;
+select count(*) as 'zero' from `exists_only_on_slave`;
+zero
+0
+rollback;
+drop database d1;
+drop database d2;
+drop database d3;
+drop view coord_wait_list;
+set @@global.slave_exec_mode= @save.slave_exec_mode;
+*** End of the tests ***

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_conflicts.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_conflicts.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_conflicts.test	2010-11-26 21:08:30 +0000
@@ -0,0 +1,224 @@
+#
+# WL#5569 MTS
+#
+# The test checks cases of hashing conflicts forcing a special hanling.
+# The cases include
+#
+# I. two Worker jobs conflict to each other
+#
+#   a. two multi-statement transactions containing more than one partition
+#      in which one is common are mapped to different Workers.
+#   b. similarly two autocommit queries or ddl:s
+#
+# Handling of the cases is carried out as the following:
+# when Coordinator hits to an occupied by not the currenly assigned Worker
+# partition it marks the partition and goes to wait till the Worker-owner
+# has released it and signaled.
+#
+# II. An event requires the sequential execution
+#
+# Coordinator does not schedule the event and is waiting till all workers have 
+# released their partitions and signalled.
+
+source include/master-slave.inc;
+
+# 
+#  Testing with the statement format requires 
+#  @@global.slave_run_query_in_parallel = 1.
+#  Notice, parallelization for Query-log-event is limitted
+#  to the default dababase. That's why 'use db'.
+#  With the default @@global.slave_run_query_in_parallel == 0
+#  the tests in stmt format still run to prove switching to the sequential.
+
+#  TODO: convert this file into two tests for either value of 
+#        @@global.slave_run_query_in_parallel
+
+connection slave;
+
+--disable_query_log
+--disable_result_log
+call mtr.add_suppression('Error reading slave worker configuration');
+--enable_query_log
+--enable_result_log
+
+create view coord_wait_list  as SELECT id from Information_Schema.processlist where state like 'Waiting for Slave Worker%';
+
+source include/stop_slave.inc;
+
+set @save.slave_exec_mode= @@global.slave_exec_mode;
+set @@global.slave_exec_mode = 'Parallel';
+source include/start_slave.inc;
+
+
+connection master;
+
+create database d1;
+create database d2;
+create database d3;
+create table d1.t1 (a int auto_increment primary key) engine=innodb;
+create table d2.t1 (a int auto_increment primary key) engine=innodb;
+create table d3.t1 (a int auto_increment primary key) engine=innodb;
+
+#
+# I. Two parallel jobs conflict
+#
+# two conflicting jobs to follow
+
+# sync_slave_with_master
+
+# TODO: remove once `sync_slave_with_master' got fixed
+
+--sleep 3
+
+# To be really conflicting slave needs to block commit of the first.
+connection slave;
+
+begin;
+insert into d2.t1 values (1);
+
+connection master;
+
+# Job_1
+begin;
+use d1;
+insert into d1.t1 values (null);
+use d2;
+insert into d2.t1 values (1);   # will be block at this point on Worker
+commit;
+
+# Job_2
+begin;
+use d3;
+insert into d3.t1 values (null);
+use d1;
+insert into d1.t1 values (null); #  will be block at this point on Coord
+commit;
+
+--sleep 4
+
+connection slave;
+
+if (`SELECT @@global.binlog_format LIKE "row"`)
+{
+    if (`select COUNT(*) = 0 FROM coord_wait_list`)
+    {
+	SELECT *  from Information_Schema.processlist;
+	--die Appologies, coodinator is supposed to be in the waiting state but it is not
+    }
+}
+
+# release the Worker
+rollback;
+
+let $count= 2;
+let $table= d1.t1;
+source include/wait_until_rows_count.inc;
+
+
+#
+# II. The only-sequential conflicts with ongoing parallel applying
+#
+
+# a. DDL waits for all workers have processed their earlier scheduled assignments
+
+connection slave1;
+
+# fix the tables status. Tables are supposed to exist, possibly with data left
+# after previous part.
+
+select count(*) from d1.t1 into @d1;
+select count(*) from d2.t1 into @d2;
+select count(*) from d3.t1 into @d3;
+use d1;
+create table `exists_only_on_slave` (a int);
+
+connection slave;
+
+# put in the way of workers blocking load
+
+begin;
+insert into d1.t1 values (null);
+insert into d2.t1 values (null);
+insert into d3.t1 values (null);
+
+connection master;
+
+# Job_1
+begin;
+use d1;
+insert into d1.t1 values (null);
+commit;
+
+# Job_2
+begin;
+use d2;
+insert into d2.t1 values (null);
+commit;
+
+
+# Job_3
+begin;
+use d3;
+insert into d3.t1 values (null);
+commit;
+
+--disable_warnings
+use d1;
+drop table if exists `exists_only_on_slave`;
+--enable_warnings
+
+
+connection slave1;
+
+select sleep(1);  # give Workers a little time to process (but they won't)
+
+select count(*) - @d1 as 'zero' from d1.t1;
+select count(*) - @d2 as 'zero' from d2.t1;
+select count(*) - @d3 as 'zero' from d3.t1;
+
+# proof the master DDL has not got through
+use d1;
+select count(*) as 'zero' from `exists_only_on_slave`;
+
+connection slave;
+
+rollback; # release workers
+
+connection slave1;
+
+# to finish up with getting all committed.
+
+let $count= `select @d1 + 1`;
+let $table= d1.t1;
+source include/wait_until_rows_count.inc;
+
+let $count= `select @d2 + 1`;
+let $table= d2.t1;
+source include/wait_until_rows_count.inc;
+
+let $count= `select @d3 + 1`;
+let $table= d3.t1;
+source include/wait_until_rows_count.inc;
+connection slave;
+
+
+#
+# cleanup
+#
+
+connection master;
+
+drop database d1;
+drop database d2;
+drop database d3;
+
+--sleep 4
+
+connection slave;
+#sync_slave_with_master;
+
+drop view coord_wait_list;
+set @@global.slave_exec_mode= @save.slave_exec_mode;
+
+--echo *** End of the tests ***
+

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-11-25 08:47:39 +0000
+++ b/sql/log_event.cc	2010-11-26 21:08:30 +0000
@@ -2300,6 +2300,8 @@ Slave_worker *Log_event::get_slave_worke
 
     // reset the B-group marker
     const_cast<Relay_log_info*>(rli)->curr_group_seen_begin= FALSE;
+
+    const_cast<Relay_log_info*>(rli)->curr_group_is_parallel= TRUE;  // mark for Coord's T-event delete
   }
   
   return worker;
@@ -2445,10 +2447,47 @@ int Log_event::apply_event(Relay_log_inf
   Slave_worker *w= NULL;
   Slave_job_item item= {NULL}, *job_item= &item;
   Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli);  // constless alias
+  bool parallel;
 
-  if (!rli->is_parallel_exec() || only_serial_exec() /* || wait(APH.N == 0) */)
+  if (!(parallel= rli->is_parallel_exec()) ||
+      only_serial_exec(rli->run_query_in_parallel, rli->curr_group_seen_begin))
+  {
+    if (parallel)
+    {
+      // This case relates to Query parallel apply which breaks into
+      // DDL and {B, Q, T} group, where Q owns g-parallel property.
+
+      // Apply possibly deferred B
+      if (rli->curr_group_da.elements > 0)
+      {
+        int res;
+        Log_event *ev_begin= * (Log_event**) pop_dynamic(&c_rli->curr_group_da);
+
+        DBUG_ASSERT(rli->curr_group_da.elements == 0);
+        DBUG_ASSERT(rli->curr_group_seen_begin);
+
+        res= ev_begin->do_apply_event(rli);
+        delete ev_begin;
+        /* B appears to be serial, reset parallel stautus of group 
+           because the following T won't do that */
+        c_rli->curr_group_seen_begin= FALSE;
+
+        if (res)
+          DBUG_RETURN(res);
+      }
+
+      DBUG_ASSERT(!rli->curr_group_seen_begin);
+      c_rli->curr_group_is_parallel= FALSE;   // Coord will destruct all the rest of events
+
+      (void) wait_for_workers_to_finish(rli);
+    }
     DBUG_RETURN(do_apply_event(rli));
- 
+  }
+
+  // !!! TODO: suppress
+  // if (get_type_code() == ROWS_QUERY_LOG_EVENT)
+    
+
   if ((!(w= get_slave_worker_id(rli)) ||
        DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)))
     DBUG_RETURN(rli->curr_group_assigned_parts.elements == 0 ? FALSE : TRUE);

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2010-11-22 18:57:13 +0000
+++ b/sql/log_event.h	2010-11-26 21:08:30 +0000
@@ -1153,37 +1153,45 @@ public:
 public:
 
   /**
-     mst-II: to execute serially due to 
-             technical or conceptual limitation
+     MST: to execute serially due to technical or conceptual limitation
 
      @return TRUE for all but {Query,Rand,User_var,Intvar,Rows}_log_event
   */
-  bool only_serial_exec()
+  bool only_serial_exec(bool query_in_parallel, bool group_term_in_parallel)
   {
     return
-      // todo: the 4 types below are limitly parallel-supported (the default 
-      // session db not the actual db)
+       /* 
+         the 4 types below are limitly parallel-supported (the default 
+         session db not the actual db).
+         Decision on BEGIN is deferred till the following event.
+         Decision on Commit or Xid is forced by the one for BEGIN.
+      */
       
-      // get_type_code() == QUERY_EVENT ||
-      // get_type_code() == INTVAR_EVENT ||
-      // get_type_code() == USER_VAR_EVENT ||
-      // get_type_code() == RAND_EVENT ||
-
-      get_type_code() == STOP_EVENT ||
-      get_type_code() == ROTATE_EVENT ||
-      get_type_code() == LOAD_EVENT ||
-      get_type_code() == SLAVE_EVENT ||
-      get_type_code() == CREATE_FILE_EVENT ||
-      get_type_code() == APPEND_BLOCK_EVENT ||
-      get_type_code() == EXEC_LOAD_EVENT ||
-      get_type_code() == DELETE_FILE_EVENT ||
-      get_type_code() == NEW_LOAD_EVENT ||
-      get_type_code() == FORMAT_DESCRIPTION_EVENT ||
-      get_type_code() == BEGIN_LOAD_QUERY_EVENT ||
-      get_type_code() == EXECUTE_LOAD_QUERY_EVENT ||
+      (!query_in_parallel &&
+       ((get_type_code() == QUERY_EVENT
+         && !starts_group() && !ends_group())    ||
+        get_type_code() == INTVAR_EVENT          ||
+        get_type_code() == USER_VAR_EVENT        ||
+        get_type_code() == RAND_EVENT))          ||
+
+      (!group_term_in_parallel && ends_group())  ||
+
+      get_type_code() == START_EVENT_V3          ||
+      get_type_code() == STOP_EVENT              ||
+      get_type_code() == ROTATE_EVENT            ||
+      get_type_code() == LOAD_EVENT              ||
+      get_type_code() == SLAVE_EVENT             ||
+      get_type_code() == CREATE_FILE_EVENT       ||
+      get_type_code() == APPEND_BLOCK_EVENT      ||
+      get_type_code() == EXEC_LOAD_EVENT         ||
+      get_type_code() == DELETE_FILE_EVENT       ||
+      get_type_code() == NEW_LOAD_EVENT          ||
+      get_type_code() == FORMAT_DESCRIPTION_EVENT||
+      get_type_code() == BEGIN_LOAD_QUERY_EVENT  ||
+      get_type_code() == EXECUTE_LOAD_QUERY_EVENT|| /* todo: make parallel */
       get_type_code() == PRE_GA_WRITE_ROWS_EVENT ||
-      get_type_code() == PRE_GA_UPDATE_ROWS_EVENT ||
-      get_type_code() == PRE_GA_DELETE_ROWS_EVENT ||
+      get_type_code() == PRE_GA_UPDATE_ROWS_EVENT||
+      get_type_code() == PRE_GA_DELETE_ROWS_EVENT||
       get_type_code() == INCIDENT_EVENT;
   }
   

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2010-11-23 09:03:37 +0000
+++ b/sql/mysqld.cc	2010-11-26 21:08:30 +0000
@@ -463,6 +463,7 @@ ulonglong slave_type_conversions_options
 ulong slave_parallel_workers;
 ulong slave_max_pending_jobs;
 my_bool slave_local_timestamp_opt;
+my_bool opt_slave_run_query_in_parallel;
 ulong thread_cache_size=0;
 ulong binlog_cache_size=0;
 ulonglong  max_binlog_cache_size=0;

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h	2010-09-21 22:19:05 +0000
+++ b/sql/mysqld.h	2010-11-26 21:08:30 +0000
@@ -175,6 +175,7 @@ extern uint  slave_net_timeout;
 extern ulong slave_parallel_workers;
 extern ulong slave_max_pending_jobs;
 extern my_bool slave_local_timestamp_opt;
+extern my_bool opt_slave_run_query_in_parallel;
 extern uint max_user_connections;
 extern ulong what_to_log,flush_time;
 extern ulong max_prepared_stmt_count, prepared_stmt_count;

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-11-25 09:03:54 +0000
+++ b/sql/rpl_rli.h	2010-11-26 21:08:30 +0000
@@ -428,15 +428,16 @@ public:
   Slave_worker  *last_assigned_worker; // a hint to partitioning func for some events
   Slave_committed_queue *gaq;
   DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
-  DYNAMIC_ARRAY curr_group_da;   // deferred array to hold part-info-free events
-  bool curr_group_seen_begin;     // current group started with B-event or not
-  volatile Slave_worker* slave_worker_is_error;
+  DYNAMIC_ARRAY curr_group_da;  // deferred array to hold part-info-free events
+  bool curr_group_seen_begin;   // current group started with B-event or not
+  bool run_query_in_parallel;   // Query's default db not the actual db as part
 
   Slave_worker* get_current_worker() const;
   Slave_worker* set_this_worker(Slave_worker *w) { return this_worker= w; }
   Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
   ulonglong mts_total_groups; // total event groups distributed in current session
-
+  volatile Slave_worker* slave_worker_is_error;
+  bool curr_group_is_parallel; // a mark for Coord to indicate on T-event of the curr group at delete
   /* 
      A sorted array of Worker current assignements number to provide
      approximate view on Workers loading.

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-11-26 16:15:37 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-11-26 21:08:30 +0000
@@ -285,12 +285,6 @@ Slave_worker *get_slave_worker(const cha
   insert_dynamic(&rli->curr_group_assigned_parts, (uchar*) key);
 
   DBUG_PRINT("info", ("Searching for %s, %d", dbname, dblength));
-  /*
-    The database name was not found which means that a worker never
-    processed events from that database. In such case, we need to
-    map the database to a worker my inserting an entry into the
-    hash map.
-  */
 
   hash_value= my_calc_hash(&mapping_db_to_worker, (uchar*) dbname,
                            dblength);
@@ -302,6 +296,12 @@ Slave_worker *get_slave_worker(const cha
                                     (uchar*) dbname, dblength);
   if (!entry)
   {
+    /*
+      The database name was not found which means that a worker never
+      processed events from that database. In such case, we need to
+      map the database to a worker my inserting an entry into the
+      hash map.
+    */
     my_bool ret;
     mysql_mutex_unlock(&slave_worker_hash_lock);
 
@@ -327,6 +327,7 @@ Slave_worker *get_slave_worker(const cha
     */
     entry->worker= !rli->last_assigned_worker ?
       get_least_occupied_worker(workers) : rli->last_assigned_worker;
+    entry->worker->usage_partition++;
 
     mysql_mutex_lock(&slave_worker_hash_lock);
     ret= my_hash_insert(&mapping_db_to_worker, (uchar*) entry);
@@ -347,7 +348,9 @@ Slave_worker *get_slave_worker(const cha
     {
       entry->worker= !rli->last_assigned_worker ? 
         get_least_occupied_worker(workers) : rli->last_assigned_worker;
+      entry->worker->usage_partition++;
       entry->usage++;
+
       my_hash_update(&mapping_db_to_worker, (uchar*) entry,
                      (uchar*) dbname, dblength);
     }
@@ -361,20 +364,37 @@ Slave_worker *get_slave_worker(const cha
       my_hash_update(&mapping_db_to_worker, (uchar*) entry,
                      (uchar*) dbname, dblength);
     }
-    else  // may be the hashing conflict
+    else
     {
-      DBUG_ASSERT(rli->last_assigned_worker == NULL ||
-                  rli->curr_group_assigned_parts.elements > 1);
+      // The case APH contains a W_d != W_c != NULL assigned to
+      // D-partition represents
+      // the hashing conflict and is handled as the following:
+
+      THD *thd= rli->info_thd;
+      const char *proc_info;
+      const char info_format[]=
+        "Waiting for Slave Worker %d to release partition `%s`";
+      char wait_info[sizeof(info_format) + 4*sizeof(entry->worker->id) +
+                     NAME_LEN + 1];
 
-      DBUG_ASSERT(0); // ... TODO ... *not* ready yet 
+      DBUG_ASSERT(rli->last_assigned_worker != NULL &&
+                  rli->curr_group_assigned_parts.elements > 1);
 
       // future assignenment and marking at the same time
       entry->worker= rli->last_assigned_worker;
 
-      wait();
+      sprintf(wait_info, info_format, entry->worker->id, entry->db);
+
+      proc_info= thd->enter_cond(&slave_worker_hash_cond, &slave_worker_hash_lock,
+                                 wait_info);
+      mysql_cond_wait(&slave_worker_hash_cond, &slave_worker_hash_lock);
+      thd->exit_cond(proc_info);
+      mysql_mutex_lock(&slave_worker_hash_lock);
 
       DBUG_ASSERT(entry->usage == 0);
+
       entry->usage= 1;
+      entry->worker->usage_partition++;
 
     }
     mysql_mutex_unlock(&slave_worker_hash_lock);
@@ -408,10 +428,6 @@ Slave_worker *get_least_occupied_worker(
   
   DBUG_ASSERT(worker != NULL);
 
-  worker->usage_partition++;
-
-  DBUG_ASSERT(worker->usage_partition != 0);
-
   return(worker);
 }
 
@@ -449,7 +465,7 @@ void Slave_worker::slave_worker_ends_gro
       my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
                                       (uchar*) key + 1, key[0]);
 
-    DBUG_ASSERT(entry && entry->usage != 0 && entry->worker == this);
+    DBUG_ASSERT(entry && entry->usage != 0);
 
     DBUG_ASSERT(strlen(key + 1) == (uchar) key[0]);
 
@@ -458,7 +474,11 @@ void Slave_worker::slave_worker_ends_gro
                    (uchar*) key + 1, key[0]);
 
     if (entry->usage == 0)
+    {
       usage_partition--;
+      if (entry->worker != this) // Coordinator is waiting
+        mysql_cond_signal(&slave_worker_hash_cond);
+    }
     else
       DBUG_ASSERT(usage_partition != 0);
     /*
@@ -645,3 +665,45 @@ ulong Slave_committed_queue::move_queue_
 
   return cnt;
 }
+
+
+int wait_for_workers_to_finish(Relay_log_info const *rli)
+{
+  uint ret= 0;
+  HASH *hash= &mapping_db_to_worker;
+  for (uint i= 0, ret= 0; i < hash->records; i++)
+  {
+    db_worker *entry;
+    THD *thd= rli->info_thd;
+    const char *proc_info;
+    const char info_format[]=
+      "Waiting for Slave Worker %d to release partition `%s`";
+    char wait_info[sizeof(info_format) + 4*sizeof(entry->worker->id) +
+                   NAME_LEN + 1];
+   
+    mysql_mutex_lock(&slave_worker_hash_lock);
+  
+    entry= (db_worker*) my_hash_element(hash, i);
+
+    DBUG_ASSERT(entry);
+    
+    if (entry->usage > 0)
+    {
+      sprintf(wait_info, info_format, entry->worker->id, entry->db);
+      entry->worker= NULL;
+
+      proc_info= thd->enter_cond(&slave_worker_hash_cond, &slave_worker_hash_lock,
+                               wait_info);
+      mysql_cond_wait(&slave_worker_hash_cond, &slave_worker_hash_lock);
+      thd->exit_cond(proc_info);
+      ret++;
+
+      DBUG_ASSERT(entry->usage == 0);
+    }
+    else
+    {
+      mysql_mutex_unlock(&slave_worker_hash_lock);
+    }
+  }
+  return ret;
+}

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-11-25 09:03:54 +0000
+++ b/sql/rpl_rli_pdb.h	2010-11-26 21:08:30 +0000
@@ -24,6 +24,7 @@ bool init_hash_workers(ulong slave_paral
 void destroy_hash_workers();
 Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli);
 Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
+int wait_for_workers_to_finish(Relay_log_info const *rli);
 
 #define SLAVE_WORKER_QUEUE_SIZE 8096
 #define SLAVE_INIT_DBS_IN_GROUP 4     // initial allocation for CGEP dynarray

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-11-25 09:03:54 +0000
+++ b/sql/rpl_slave.cc	2010-11-26 21:08:30 +0000
@@ -2615,7 +2615,7 @@ int apply_event_and_update_pos(Log_event
   if (!rli->is_in_group() && rli->slave_exec_mode != slave_exec_mode_options)
     rli->slave_exec_mode= slave_exec_mode_options;
 
-  int reason= ev->shall_skip(rli);
+  int reason= ev->shall_skip(rli);      // TODO: MTS skip handling
   if (reason == Log_event::EVENT_SKIP_COUNT)
   {
     sql_slave_skip_counter= --rli->slave_skip_counter;
@@ -2878,8 +2878,9 @@ static int exec_relay_log_event(THD* thd
       if (thd->variables.binlog_rows_query_log_events)
         handle_rows_query_log_event(ev, rli);
 
-      if (!rli->is_parallel_exec() && !ev->only_serial_exec() &&
-          ev->get_type_code() != ROWS_QUERY_LOG_EVENT)  // mts todo: check this case
+      if ((!rli->is_parallel_exec() ||
+           ev->only_serial_exec(rli->run_query_in_parallel, rli->curr_group_is_parallel))
+          && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)  // mts TODO: check this case
       {
 
         DBUG_PRINT("info", ("Deleting the event after it has been executed"));
@@ -3600,8 +3601,6 @@ err:
     mysql_mutex_unlock(&LOCK_thread_count);
   }
 
-  delete w->w_rli; // fixme: experimenting
-
   my_thread_end();
   pthread_exit(0);
   DBUG_RETURN(0);        
@@ -3628,7 +3627,7 @@ int slave_start_single_worker(Relay_log_
   // fixme: experimenting to make Workers to run ev->update_pos(w->w_rli)
   // fixme: a real hack! part of Rpl_info_factory::create_rli(RLI_REPOSITORY_FILE, FALSE);
   w->w_rli= new Relay_log_info(FALSE);
-  Rpl_info_dummy *dummy_handler= new Rpl_info_dummy(FALSE);
+  Rpl_info_dummy *dummy_handler= new Rpl_info_dummy(TRUE);
   w->w_rli->set_rpl_info_handler(dummy_handler);
   ulong key_worker_idx[]= { server_id, w->id };
   w->init_info(key_worker_idx, NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER);
@@ -3712,7 +3711,7 @@ int slave_start_workers(Relay_log_info *
   rli->mts_total_groups= 0;
   rli->slave_worker_is_error= NULL;
   rli->curr_group_seen_begin= NULL;
-
+  rli->run_query_in_parallel= opt_slave_run_query_in_parallel;
   for (i= 0; i < n; i++)
   {
     if ((error= slave_start_single_worker(rli, i)))
@@ -3785,6 +3784,8 @@ void slave_stop_workers(Relay_log_info *
     delete_dynamic(&w->jobs.Q);
     delete_dynamic(&w->curr_group_exec_parts);   // GCEP
     delete_dynamic_element(&rli->workers, i);
+    delete w->w_rli;
+
     delete w;
   }
 

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2010-11-20 17:23:42 +0000
+++ b/sql/sys_vars.cc	2010-11-26 21:08:30 +0000
@@ -3121,6 +3121,12 @@ static Sys_var_mybool Sys_slave_local_ti
        "time value to implicitly affected timestamp columms. Otherwise (default) "
        "installs prescribed by the master value.",
        GLOBAL_VAR(slave_local_timestamp_opt), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+static Sys_var_mybool Sys_slave_run_query_in_parallel(
+       "slave_run_query_in_parallel",
+       "The default not an actual database name is used as partition info "
+       "for parallel execution of Query_log_event ",
+       GLOBAL_VAR(opt_slave_run_query_in_parallel), CMD_LINE(OPT_ARG),
+       DEFAULT(FALSE));
 #endif
 
 static bool check_locale(sys_var *self, THD *thd, set_var *var)


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101126210830-eqhloz0dt6xp23pz.bundle
Thread
bzr commit into mysql-next-mr.crash-safe branch (andrei.elkin:3219) WL#5569Andrei Elkin26 Nov