List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:November 20 2010 5:24pm
Subject:bzr push into mysql-next-mr.crash-safe branch (andrei.elkin:3213 to 3214)
WL#5569
View as plain text  
 3214 Andrei Elkin	2010-11-20
      wl#5569 MTS
      
      Worker pool start, stop, kills, error out implementation.
     @ mysql-test/extra/rpl_tests/rpl_parallel_load.test
        increasing the load param to get more reliable benchmarking data out of the test.
     @ mysql-test/suite/rpl/r/rpl_parallel_start_stop.result
        a new tests results.
     @ mysql-test/suite/rpl/t/rpl_parallel_start_stop.test
        worker pool start, stop, kills, errors testing.
     @ sql/log_event.cc
        removing a false and unnessary extention-arg to exit_cond();
        Refining start-stop alg to base on the Worker private info, not
        the common info.
        In particular handshakes organized through magic value of length of the Worker
        private queue to is set by an initiator.
     @ sql/rpl_slave.cc
        Starting a worker thread with passing its Slave_worker * pointer.
        Simplifying and refining start-stop.
     @ sql/sql_class.h
        removing a false and unnessary extention-arg to exit_cond();
     @ sql/sys_vars.cc
        Reckoning a magic value outside of the valid range for pending_jobs.

    added:
      mysql-test/suite/rpl/r/rpl_parallel_start_stop.result
      mysql-test/suite/rpl/t/rpl_parallel_start_stop.test
    modified:
      mysql-test/extra/rpl_tests/rpl_parallel_load.test
      sql/log_event.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/sql_class.h
      sql/sys_vars.cc
 3213 Andrei Elkin	2010-11-19
      wl#5569 
      
      recovery interfaces for wl#5599 implementation.
      
      The essence of this patch is to provide GAQ object implimentation
      and valid life cycle. 
      The checkpoint handler prior to call store methods of wl#5599 is supposed
      to invoke rli->gaq->move_queue_head(&rli->workers).
      
      See a simulation of that near ev->update_pos() of the mail sql thread loop.
      The checkpoint info is composed as instance of Slave_job_group to reside
      as rli->gap->lwm.
      
      
      Todo: uncomment 
      +  // delete ev;  // after ev->update_pos() event is garbage
      once the real checkpoint has been done.
      
      Todo: the real implemention needs to take care of filing
      Slave_job_group::update_current_binlog as initially so at time of executing
      Rotate/FD methods.
      
      
      +  // experimental checkpoint per each scheduling attempt
      +  // logics of next_event()
      +
      +    rli->gaq->move_queue_head(&rli->workers);
     @ sql/log_event.cc
        Log_event::get_slave_worker_id() got shaped more to the final version with elements
        necessary to rli->gaq lify cycle.
     @ sql/log_event.h
        Log_event::mts_group_cnt is added as a part of GAQ index propagation path 
        from C to W.
     @ sql/rpl_rli.h
        Further extension to RLI necessary to the distribution hash function (APH).
     @ sql/rpl_rli_pdb.cc
        Implementing circular_buffer_queue::*queue and few other methods incl
        ulong Slave_committed_queue::move_queue_head()
        the main concern for checkpoint.
     @ sql/rpl_rli_pdb.h
        Extending classes with few new member definitions necessary for GAQ interface / checkpoint / recovery.
     @ sql/rpl_slave.cc
        Simulation of the lwm-checkpoint and changes due to rpl_rli_pdb classes extensions.

    modified:
      sql/log_event.cc
      sql/log_event.h
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
=== modified file 'mysql-test/extra/rpl_tests/rpl_parallel_load.test'
--- a/mysql-test/extra/rpl_tests/rpl_parallel_load.test	2010-11-18 14:00:52 +0000
+++ b/mysql-test/extra/rpl_tests/rpl_parallel_load.test	2010-11-20 17:23:42 +0000
@@ -13,7 +13,7 @@ let $workers = `select @@global.slave_pa
 #
 # load volume parameter
 #
-let $iter = 1000;
+let $iter = 2000;
 
 connection slave;
 

=== added file 'mysql-test/suite/rpl/r/rpl_parallel_start_stop.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_start_stop.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_start_stop.result	2010-11-20 17:23:42 +0000
@@ -0,0 +1,30 @@
+stop slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+reset master;
+reset slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+start slave;
+create view worker_proc_list as SELECT id  from Information_Schema.processlist
+where state like 'Waiting for an event from sql thread%';
+create view coord_proc_list  as SELECT id from Information_Schema.processlist where state like 'Slave has read all relay log%';
+include/stop_slave.inc
+set @save.slave_exec_mode= @@global.slave_exec_mode;
+set @@global.slave_exec_mode = 'Parallel';
+include/start_slave.inc
+select min(id) from worker_proc_list into @w_id;
+kill query @w_id;
+include/start_slave.inc
+select id from coord_proc_list into @c_id;
+kill query @c_id;
+include/start_slave.inc
+CREATE TABLE t1 (a int primary key);
+insert into t1 values (1),(2);
+insert into t1 values (3);
+insert into t1 values (3);
+delete from t1;
+include/start_slave.inc
+drop table t1;
+drop view worker_proc_list;
+drop view coord_proc_list;
+set @@global.slave_exec_mode= @save.slave_exec_mode;
+end of the tests

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_start_stop.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test	2010-11-20 17:23:42 +0000
@@ -0,0 +1,116 @@
+#
+# WL#5569 MTS
+#
+# The test checks START and STOP, graceful, killing or 
+# due to an error of a Worker.
+#
+
+source include/master-slave.inc;
+
+connection slave;
+
+create view worker_proc_list as SELECT id  from Information_Schema.processlist
+       where state like 'Waiting for an event from sql thread%';
+create view coord_proc_list  as SELECT id from Information_Schema.processlist where state like 'Slave has read all relay log%';
+
+source include/stop_slave.inc;
+
+set @save.slave_exec_mode= @@global.slave_exec_mode;
+set @@global.slave_exec_mode = 'Parallel';
+source include/start_slave.inc;
+
+let $count= `select @@global.slave_parallel_workers`;
+let $table= worker_proc_list;
+source include/wait_until_rows_count.inc;
+
+#
+# KILL of a Worker stops the pool and the Coordinator
+#
+select min(id) from worker_proc_list into @w_id;
+kill query @w_id;
+
+let $count= 0;
+let $table= worker_proc_list;
+source include/wait_until_rows_count.inc;
+
+source include/wait_for_slave_sql_to_stop.inc;
+
+#
+# KILL of the Coordinator stops the pool as well
+#
+source include/start_slave.inc;
+
+# testing of the poll is up
+
+let $count= `select @@global.slave_parallel_workers`;
+let $table= worker_proc_list;
+source include/wait_until_rows_count.inc;
+
+let $count= 1;
+let $table= coord_proc_list;
+source include/wait_until_rows_count.inc;
+
+select id from coord_proc_list into @c_id;
+
+--disable_query_log
+--disable_result_log
+#select sleep(300);
+--enable_query_log
+--enable_result_log
+
+kill query @c_id;
+
+let $count= 0;
+let $table= worker_proc_list;
+source include/wait_until_rows_count.inc;
+
+source include/wait_for_slave_sql_to_stop.inc;
+
+source include/start_slave.inc;
+
+#
+# Errored-out Worker stops the pool and the Coordinator
+#
+connection master;
+
+# make some load
+
+CREATE TABLE t1 (a int primary key);
+
+insert into t1 values (1),(2);
+
+#connection slave;
+sync_slave_with_master;
+# create an offending record
+insert into t1 values (3);
+
+connection master;
+
+# hit it
+insert into t1 values (3);
+
+connection slave;
+
+let $count= 0;
+let $table= worker_proc_list;
+source include/wait_until_rows_count.inc;
+
+source include/wait_for_slave_sql_to_stop.inc;
+delete from t1;
+
+source include/start_slave.inc;
+
+# cleanup
+
+connection master;
+drop table t1;
+
+#connection slave;
+sync_slave_with_master;
+
+drop view worker_proc_list;
+drop view coord_proc_list;
+set @@global.slave_exec_mode= @save.slave_exec_mode;
+
+--echo end of the tests
+

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-11-19 14:51:58 +0000
+++ b/sql/log_event.cc	2010-11-20 17:23:42 +0000
@@ -2300,7 +2300,7 @@ static void * head_queue(Slave_jobs_queu
 /**
    return a job item through a struct which point is supplied via argument.
 */
-static Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
+Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
 {
   if (jobs->e == jobs->s)
   {
@@ -2346,7 +2346,8 @@ void append_item_to_jobs(slave_job_item 
                                       &rli->pending_jobs_lock,
                                       "Waiting for an event from sql thread");
     mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
-    thd->exit_cond(old_msg, FALSE);
+    thd->exit_cond(old_msg);
+    mysql_mutex_lock(&rli->pending_jobs_lock);
     if (thd->killed)
       return;
   }
@@ -2355,15 +2356,12 @@ void append_item_to_jobs(slave_job_item 
   
   mysql_mutex_unlock(&rli->pending_jobs_lock);
 
-  mysql_mutex_lock(&w->jobs_lock);
-  if (!w->info_thd->is_slave_error)  // error check exists at stmt commit as well
+  if (!w->info_thd->killed)
   {
     int ret;
 
-    // LABS-TODO: GC delete (JQ[a]->data)
-     // LABS-TODO: en_queue()
+    mysql_mutex_lock(&w->jobs_lock);
 
-    //err= w->jobs.push_back(job_item, &w->mem_root);
     ret= en_queue(&w->jobs, job_item);
 
     DBUG_ASSERT(ret >= 0);
@@ -2371,6 +2369,8 @@ void append_item_to_jobs(slave_job_item 
     w->curr_jobs++;
     if (w->jobs.len == 1)
       mysql_cond_signal(&w->jobs_cond);
+    
+    mysql_mutex_unlock(&w->jobs_lock);
   }
   else
   {
@@ -2378,7 +2378,6 @@ void append_item_to_jobs(slave_job_item 
     rli->pending_jobs--; // roll back of the prev incr
     mysql_mutex_unlock(&rli->pending_jobs_lock);
   }
-  mysql_mutex_unlock(&w->jobs_lock);
 }
 
 /**
@@ -2437,10 +2436,11 @@ struct slave_job_item* pop_jobs_item(Sla
       old_msg= thd->enter_cond(&w->jobs_cond, &w->jobs_lock,
                                "Waiting for an event from sql thread");
       mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
-      thd->exit_cond(old_msg, FALSE);
+      thd->exit_cond(old_msg);
+      mysql_mutex_lock(&w->jobs_lock);
     }
   }
-  if (job_item)
+  if (job_item->data)
     w->curr_jobs--;
   mysql_mutex_unlock(&w->jobs_lock);
 
@@ -2469,9 +2469,8 @@ int slave_worker_exec_job(Slave_worker *
   DBUG_ENTER("slave_worker_exec_job");
 
   job_item= pop_jobs_item(w, job_item);
-  if (!job_item->data) // LABS-TODO  de_queue()
+  if (thd->killed)
   {
-    DBUG_ASSERT(thd->killed);
     error= -1;
     goto err;
   }
@@ -2517,8 +2516,7 @@ int slave_worker_exec_job(Slave_worker *
 
   mysql_mutex_lock(&rli->pending_jobs_lock);
   rli->pending_jobs--;
-  DBUG_ASSERT(rli->pending_jobs < rli->slave_pending_jobs_max &&
-              rli->pending_jobs >= 0);
+  DBUG_ASSERT(rli->pending_jobs < rli->slave_pending_jobs_max);
 
   /* coordinator can be waiting */
 
@@ -2527,32 +2525,9 @@ int slave_worker_exec_job(Slave_worker *
     mysql_cond_signal(&rli->pending_jobs_cond);
   mysql_mutex_unlock(&rli->pending_jobs_lock);
 
-
+  w->stmt_jobs++;
 err:
 
-  if (error)
-  {
-    thd->is_slave_error= 1; // mts-II_todo: consider volatile, Crdn reads it.
-    mysql_mutex_lock(&w->jobs_lock);
-
-    //while(w->jobs.pop())  // LABS-TODO de_queue()
-    while(de_queue(&w->jobs, job_item))
-    {
-      // purging the local jobs queue
-      mysql_mutex_lock(&rli->pending_jobs_lock); // todo: check mutex:s order
-      rli->pending_jobs--;
-      if (rli->pending_jobs == rli->slave_pending_jobs_max - 1 ||
-          rli->pending_jobs == 0)
-        mysql_cond_signal(&rli->pending_jobs_cond);
-      mysql_mutex_unlock(&rli->pending_jobs_lock);
-    }
-    mysql_mutex_unlock(&w->jobs_lock);
-  }
-  else
-  {
-    w->stmt_jobs++;
-  }
-
   DBUG_RETURN(error);
 }
 

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-11-19 14:51:58 +0000
+++ b/sql/rpl_rli.h	2010-11-20 17:23:42 +0000
@@ -425,11 +425,11 @@ public:
     WL#5569 MTS-II
   */
   DYNAMIC_ARRAY workers; // number's is determined by global slave_parallel_workers
-  volatile int pending_jobs;
+  volatile ulong pending_jobs;
   ulong trans_jobs, wait_jobs, stmt_jobs; // live time is one trans, statement (ndb epoch)
   mysql_mutex_t pending_jobs_lock;
   mysql_cond_t pending_jobs_cond;
-  int   slave_pending_jobs_max;
+  ulong   slave_pending_jobs_max;
   Slave_worker  *last_assigned_worker; // a hint to partitioning func for some events
   Slave_committed_queue *gaq;
   DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-11-19 14:51:58 +0000
+++ b/sql/rpl_rli_pdb.h	2010-11-20 17:23:42 +0000
@@ -147,6 +147,7 @@ public:
   mysql_cond_t  jobs_cond;
   Slave_jobs_queue jobs;
 
+  Relay_log_info *c_rli;
   // fixme: experimental
   Relay_log_info *w_rli;
 

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-11-19 14:51:58 +0000
+++ b/sql/rpl_slave.cc	2010-11-20 17:23:42 +0000
@@ -70,12 +70,6 @@ MY_BITMAP slave_error_mask;
 char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE];
 
 typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
-typedef struct st_slave_worker_init_args
-{
-  Relay_log_info *rli;
-  Slave_worker *w;
-} SLAVE_WORKER_INIT_ARGS;
-
 
 char* slave_load_tmpdir = 0;
 Master_info *active_mi= 0;
@@ -3486,7 +3480,7 @@ int check_temp_dir(char* tmp_file)
   DBUG_RETURN(0);
 }
 
-
+Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
 
 /*
   Worker thread for the parallel execution of the replication events
@@ -3495,15 +3489,11 @@ int check_temp_dir(char* tmp_file)
 pthread_handler_t handle_slave_worker(void *arg)
 {
   THD *thd;                     /* needs to be first for thread_stack */
-  Slave_worker *w;
   int error= 0;
-
-  // TODO: remove pending_jobs ref:s to
-  // TODO: ulong id;
-
-  // rli= ((SLAVE_WORKER_INIT_ARGS *) arg)->rli;
-  
-  Relay_log_info* rli = ((Master_info*)arg)->rli;
+  Slave_worker *w= (Slave_worker *) arg;
+  Relay_log_info* rli= w->c_rli;
+  ulong purge_cnt= 0;
+  struct slave_job_item _item, *job_item= &_item;
 
   my_thread_init();
   DBUG_ENTER("handle_slave_worker");
@@ -3512,41 +3502,13 @@ pthread_handler_t handle_slave_worker(vo
   if (!thd)
   {
     sql_print_error("Failed during slave worker initialization");
-
-    mysql_mutex_lock(&rli->pending_jobs_lock);
-    rli->pending_jobs= 0;
-    mysql_cond_signal(&rli->pending_jobs_cond); // informing the parent
-    mysql_mutex_unlock(&rli->pending_jobs_lock);
-
     goto err;
   }
 
-  /* handshake with each started worker */
-  mysql_mutex_lock(&rli->pending_jobs_lock);
-  get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, rli->pending_jobs - 1);
-  rli->pending_jobs= 0;
-
-  // TODO: w= ((SLAVE_WORKER_INIT_ARGS *) arg)->w;
-
   w->info_thd= thd;
-  w->tables_to_lock= NULL;
-  w->tables_to_lock_count= 0;
-
-  // fixme: experimenting to make Workers to run ev->update_pos(w->w_rli)
-  // fixme: a real hack! part of Rpl_info_factory::create_rli(RLI_REPOSITORY_FILE, FALSE);
-  w->w_rli= new Relay_log_info(FALSE,
-                               &key_relay_log_info_run_lock,
-                               &key_relay_log_info_data_lock,
-                               &key_relay_log_info_data_cond,
-                               &key_relay_log_info_start_cond,
-                               &key_relay_log_info_stop_cond);
   w->w_rli->info_thd= thd;
-  w->w_rli->workers= rli->workers; // shallow copying is sufficient
-  w->w_rli->this_worker= w;
 
   thd->thread_stack = (char*)&thd;
-  mysql_cond_signal(&rli->pending_jobs_cond); // informing the parent
-  mysql_mutex_unlock(&rli->pending_jobs_lock);
   
   pthread_detach_this_thread();
   if (init_slave_thread(thd, SLAVE_THD_WORKER))
@@ -3560,19 +3522,50 @@ pthread_handler_t handle_slave_worker(vo
   threads.append(thd);
   mysql_mutex_unlock(&LOCK_thread_count);
 
+  mysql_mutex_lock(&w->jobs_lock);
+
+  DBUG_ASSERT(w->jobs.len == rli->slave_pending_jobs_max + 1);
+  w->jobs.len= 0;
+  mysql_cond_signal(&w->jobs_cond);  // ready for duty
+
+  mysql_mutex_unlock(&w->jobs_lock);
+
   DBUG_ASSERT(thd->is_slave_error == 0);
 
-  while (!thd->killed)
+  while (!thd->killed && !error)
   {
       error= slave_worker_exec_job(w, rli);
   }
 
+  if (!rli->info_thd->killed)
+  {
+    mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
+    rli->info_thd->awake(THD::KILL_QUERY);          // notify Crdn
+    mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
+  }
+
+  mysql_mutex_lock(&w->jobs_lock);
+
+  while(de_queue(&w->jobs, job_item))
+  {
+    purge_cnt++;
+    DBUG_ASSERT(job_item->data);
+    delete static_cast<Log_event*>(job_item->data);
+  }
+
+  DBUG_ASSERT(w->jobs.len == 0);
+
+  mysql_mutex_unlock(&w->jobs_lock);
+
   mysql_mutex_lock(&rli->pending_jobs_lock);
-  rli->pending_jobs--;
-  if (rli->pending_jobs == 0)
-    mysql_cond_signal(&rli->pending_jobs_cond);
+  rli->pending_jobs -= purge_cnt;
   mysql_mutex_unlock(&rli->pending_jobs_lock);
- 
+
+  mysql_mutex_lock(&w->jobs_lock);
+  w->jobs.len= rli->slave_pending_jobs_max + 1;
+  mysql_cond_signal(&w->jobs_cond);  // famous last goodbye
+  mysql_mutex_unlock(&w->jobs_lock);
+
 err:
 
   if (thd)
@@ -3603,7 +3596,21 @@ int slave_start_single_worker(Relay_log_
   Slave_worker *w=
     Rpl_info_factory::create_worker(opt_rli_repository_id, rli, i);
   Slave_job_item empty= {NULL};
-  SLAVE_WORKER_INIT_ARGS worker_args= {rli, w};
+
+  w->c_rli= rli;
+  w->tables_to_lock= NULL;
+  w->tables_to_lock_count= 0;
+
+  // fixme: experimenting to make Workers to run ev->update_pos(w->w_rli)
+  // fixme: a real hack! part of Rpl_info_factory::create_rli(RLI_REPOSITORY_FILE, FALSE);
+  w->w_rli= new Relay_log_info(FALSE,
+                               &key_relay_log_info_run_lock,
+                               &key_relay_log_info_data_lock,
+                               &key_relay_log_info_data_cond,
+                               &key_relay_log_info_start_cond,
+                               &key_relay_log_info_stop_cond);
+  w->w_rli->workers= rli->workers; // shallow copying is sufficient
+  w->w_rli->this_worker= w;
 
   w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
   w->id= i;
@@ -3621,8 +3628,9 @@ int slave_start_single_worker(Relay_log_
   DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
   
   w->jobs.e= w->jobs.s;
-  w->jobs.len= w->jobs.a= 0;
-  
+  w->jobs.a= 0;
+  w->jobs.len= rli->slave_pending_jobs_max + 1; // to first handshake
+
   set_dynamic(&rli->workers, (uchar*) &w, i);
   mysql_mutex_init(key_mutex_slave_parallel_worker[i], &w->jobs_lock,
                    MY_MUTEX_INIT_FAST);
@@ -3633,25 +3641,23 @@ int slave_start_single_worker(Relay_log_
                         SLAVE_INIT_DBS_IN_GROUP, 1);
 
   if (pthread_create(&th, &connection_attrib, handle_slave_worker,
-                     (void*) &worker_args))
+                     (void*) w))
   {
     sql_print_error("Failed during slave worker thread create");
     error= 1;
     goto err;
   }
+  
+  mysql_mutex_lock(&w->jobs_lock);
+  if (w->jobs.len != 0)
+    mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
+  mysql_mutex_unlock(&w->jobs_lock);
 
 err:
   return error;
 }
 
-/**
-   The @c init_slave_workers number of Worker threads start one-by-one with synch
-   through rli->pending_jobs.
-   Also objects are initialized that Coordinator and Workers will maintain during their
-   session life time.
 
-   @return 0 success or 1 if fails 
-*/
 int slave_start_workers(Relay_log_info *rli, ulong n)
 {
   uint i;
@@ -3661,7 +3667,7 @@ int slave_start_workers(Relay_log_info *
   my_init_dynamic_array(&rli->curr_group_assigned_parts, NAME_LEN, SLAVE_INIT_DBS_IN_GROUP, 1);
 
   // GAQ  queue holds seqno:s of scheduled groups. C polls workers in 
-  //      @c lwm_checkpoint_period to update GAQ (see @c @next_event())
+  //      @c lwm_checkpoint_period to update GAQ (see @c next_event())
   // The length of GAQ is derived from @c slave_max_pending_jobs to guarantee
   // each assigned job being sent to a WQ will be represented by an item in GAQ.
   // ::slave_max_pending_jobs is the worst case when all jobs contain
@@ -3672,54 +3678,10 @@ int slave_start_workers(Relay_log_info *
   rli->mts_total_groups= 0;
   for (i= 0; i < n; i++)
   {
-    uint k;
-    Slave_worker *w=
-      Rpl_info_factory::create_worker(opt_rli_repository_id, rli, i);
-    Slave_job_item empty= {NULL};
-    w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
-    w->id= i;
-    w->current_table= NULL;
-    w->usage_partition= 0;
-    w->last_group_done_index= rli->gaq->s; // out of range
-
-    // Queue initialization
-    w->jobs.s= rli->slave_pending_jobs_max + 1;
-    my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0); // todo: implement increment e.g  n * 10;
-    for (k= 0; k < w->jobs.s; k++)
-      insert_dynamic(&w->jobs.Q, (uchar*) &empty);
-
-    DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
-
-    w->jobs.e= w->jobs.s;
-    w->jobs.len= w->jobs.a= 0;
-    
-    set_dynamic(&rli->workers, (uchar*) &w, i);
-    mysql_mutex_init(key_mutex_slave_parallel_worker[i], &w->jobs_lock,
-                     MY_MUTEX_INIT_FAST);
-    mysql_cond_init(key_cond_slave_parallel_worker[i], &w->jobs_cond, NULL);
-  }
-  for (i= 0; i < rli->workers.elements; i++)
-  {
-    pthread_t th;
-    Slave_worker *w;
-    get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, i);
-
-    /* handshake with each started workers */
-    mysql_mutex_lock(&rli->pending_jobs_lock);
-    rli->pending_jobs= i + 1;
-    mysql_mutex_unlock(&rli->pending_jobs_lock);
-    if (pthread_create(&th, &connection_attrib, handle_slave_worker, (void*) rli->mi))
+    if ((error= slave_start_single_worker(rli, i)))
     {
-      sql_print_error("Failed during slave worker thread create");
-      error= 1;
       goto err;
     }
-
-    mysql_mutex_lock(&rli->pending_jobs_lock);
-    /* wait till the Worker is of full legal age */
-    if (rli->pending_jobs > 0)
-      mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
-    mysql_mutex_unlock(&rli->pending_jobs_lock);
   }
 
   if (init_hash_workers(n))  // MTS TODO: mapping_db_to_worker -> APH
@@ -3740,31 +3702,37 @@ void slave_stop_workers(Relay_log_info *
 {
   int i;
 
-  
-  mysql_mutex_lock(&rli->pending_jobs_lock);
-  rli->pending_jobs += rli->workers.elements;
-
   for (i= rli->workers.elements - 1; i >= 0; i--)
   {
     Slave_worker *w;
     get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, i);
+    
+    mysql_mutex_lock(&w->jobs_lock);
+    
+    if (w->jobs.len == rli->slave_pending_jobs_max + 1)
+    {
+      mysql_mutex_unlock(&w->jobs_lock);
+      continue;
+    }
+    mysql_mutex_unlock(&w->jobs_lock);
+    
     mysql_mutex_lock(&w->info_thd->LOCK_thd_data);
     w->info_thd->awake(THD::KILL_QUERY);
     mysql_mutex_unlock(&w->info_thd->LOCK_thd_data);
   }
   
-  while (rli->pending_jobs > 0)
-  {
-    thd_proc_info(rli->info_thd, "Waiting for workers to exit");
-    mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
-  }
-
-  mysql_mutex_unlock(&rli->pending_jobs_lock);
+  thd_proc_info(rli->info_thd, "Waiting for workers to exit");
 
   for (i= rli->workers.elements - 1; i >= 0; i--)
   {
     Slave_worker *w;
     get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, i);
+
+    mysql_mutex_lock(&w->jobs_lock);
+    if (w->jobs.len != rli->slave_pending_jobs_max + 1)
+      mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
+    mysql_mutex_unlock(&w->jobs_lock);
+
     mysql_mutex_destroy(&w->jobs_lock);
     mysql_cond_destroy(&w->jobs_cond);
 
@@ -3774,6 +3742,9 @@ void slave_stop_workers(Relay_log_info *
     delete_dynamic_element(&rli->workers, i);
     delete w;
   }
+
+  DBUG_ASSERT(rli->pending_jobs == 0);
+
   destroy_hash_workers();
   delete rli->gaq;
 }

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2010-11-09 13:04:14 +0000
+++ b/sql/sql_class.h	2010-11-20 17:23:42 +0000
@@ -2198,7 +2198,7 @@ public:
     proc_info = msg;
     return old_msg;
   }
-  inline void exit_cond(const char* old_msg, bool release_lock= TRUE)
+  inline void exit_cond(const char* old_msg)
   {
     /*
       Putting the mutex unlock in thd->exit_cond() ensures that
@@ -2206,8 +2206,7 @@ public:
       locked (if that would not be the case, you'll get a deadlock if someone
       does a THD::awake() on you).
     */
-    if (release_lock)
-      mysql_mutex_unlock(mysys_var->current_mutex);
+    mysql_mutex_unlock(mysys_var->current_mutex);
     mysql_mutex_lock(&mysys_var->mutex);
     mysys_var->current_mutex = 0;
     mysys_var->current_cond = 0;

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2010-11-18 14:00:52 +0000
+++ b/sql/sys_vars.cc	2010-11-20 17:23:42 +0000
@@ -3115,7 +3115,7 @@ static Sys_var_ulong Sys_slave_max_pendi
        "The coordinator thread suspends further jobs assigning until "
        "conditions have been improved ",
        GLOBAL_VAR(slave_max_pending_jobs), CMD_LINE(REQUIRED_ARG),
-       VALID_RANGE(0, ULONG_MAX), DEFAULT(40000), BLOCK_SIZE(1));
+       VALID_RANGE(0, ULONG_MAX - 1), DEFAULT(40000), BLOCK_SIZE(1));
 static Sys_var_mybool Sys_slave_local_timestamp(
        "slave_local_timestamp", "if enabled slave computes the event appying "
        "time value to implicitly affected timestamp columms. Otherwise (default) "


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101120172342-99nahb2qf8fir7ma.bundle
Thread
bzr push into mysql-next-mr.crash-safe branch (andrei.elkin:3213 to 3214)WL#5569Andrei Elkin20 Nov