List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:December 1 2010 5:08pm
Subject:bzr commit into mysql-next-mr.crash-safe branch (andrei.elkin:3223) WL#5569
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3223 Andrei Elkin	2010-12-01
      wl#5569 MTS
      
      Changes related to the limit conditions, such as the WQ length and the total size of all WQs.
      A new test file is also added.
     @ mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result
        new results file.
     @ mysql-test/suite/rpl/r/rpl_parallel_conflicts.result
        results updated.
     @ mysql-test/suite/rpl/t/rpl_parallel_conf_limits.test
        Testing the two parameters that limit RAM usage by Workers.
     @ mysql-test/suite/rpl/t/rpl_parallel_conflicts.test
        Converting an assert into a wait for that condition.
        Todo: improve the test to let it run with slave_run_query_in_parallel.
     @ sql/log_event.cc
        Changes related to the limit conditions (WQ length, total WQ sizes).
        Fixing a compilation warning.
     @ sql/mysqld.cc
        renaming.
     @ sql/mysqld.h
        renaming.
     @ sql/rpl_rli.cc
        renaming.
     @ sql/rpl_rli.h
        s / slave_max_pending_jobs / opt_mts_slave_worker_queue_len_max /
        (the Relay_log_info member slave_pending_jobs_max becomes mts_slave_worker_queue_len_max);
        the new name is supposed to indicate the purpose of the entity more clearly.
     @ sql/rpl_slave.cc
        renaming.
     @ sql/sys_vars.cc
        renaming.

    added:
      mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result
      mysql-test/suite/rpl/t/rpl_parallel_conf_limits.test
    modified:
      mysql-test/suite/rpl/r/rpl_parallel_conflicts.result
      mysql-test/suite/rpl/t/rpl_parallel_conflicts.test
      sql/log_event.cc
      sql/mysqld.cc
      sql/mysqld.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_slave.cc
      sql/sys_vars.cc
=== added file 'mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result	2010-12-01 17:08:21 +0000
@@ -0,0 +1,47 @@
+stop slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+reset master;
+reset slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+start slave;
+create view coord_wait_list  as SELECT id from Information_Schema.processlist where state like 'Waiting for Slave Worker%';
+include/stop_slave.inc
+set @save.slave_exec_mode= @@global.slave_exec_mode;
+set @@global.slave_exec_mode = 'Parallel';
+set @save_mts_slave_worker_queue_len_max= @@global.mts_slave_worker_queue_len_max;
+set @@global.mts_slave_worker_queue_len_max= 5;
+include/start_slave.inc
+create database d1;
+create table d1.t1 (a int auto_increment primary key) engine=innodb;
+select sleep(2);
+sleep(2)
+0
+begin;
+insert into d1.t1 set a=null;
+begin;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+commit;
+*** Coordinator must be waiting a for Worker to process its queue ***
+rollback;
+set @@global.mts_slave_worker_queue_len_max= @save_mts_slave_worker_queue_len_max;
+include/stop_slave.inc
+set @save_mts_pending_jobs_size_max = @@global.mts_pending_jobs_size_max;
+set @@global.mts_pending_jobs_size_max= 1024;
+include/start_slave.inc
+create table d1.t2 (a int auto_increment primary key, b text null) engine=innodb;
+begin;
+insert into d1.t2 set a= 1;
+begin;
+commit;
+*** Coordinator must be waiting for Workers have released pending events mem ***
+rollback;
+set @@global.mts_pending_jobs_size_max= @save_mts_pending_jobs_size_max;
+drop database d1;
+drop view coord_wait_list;
+set @@global.slave_exec_mode= @save.slave_exec_mode;

=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_conflicts.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_conflicts.result	2010-11-26 21:08:30 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_conflicts.result	2010-12-01 17:08:21 +0000
@@ -29,6 +29,7 @@ insert into d3.t1 values (null);
 use d1;
 insert into d1.t1 values (null);
 commit;
+*** Coordinator must be in waiting for a Worker to unassign from a partition ***
 rollback;
 select count(*) from d1.t1 into @d1;
 select count(*) from d2.t1 into @d2;

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_conf_limits.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_conf_limits.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_conf_limits.test	2010-12-01 17:08:21 +0000
@@ -0,0 +1,164 @@
+#
+# WL#5569 MTS
+#
+# The test verifies correctness of MTS execution when system meets
+# various limits due to configuration options.
+#
+
+source include/master-slave.inc;
+# no support for Query-log-event in this test
+source include/have_binlog_format_row.inc;
+
+connection slave;
+
+create view coord_wait_list  as SELECT id from Information_Schema.processlist where state like 'Waiting for Slave Worker%';
+
+# restart in Parallel
+source include/stop_slave.inc;
+set @save.slave_exec_mode= @@global.slave_exec_mode;
+set @@global.slave_exec_mode = 'Parallel';
+
+
+# max len of WQ
+set @save_mts_slave_worker_queue_len_max= @@global.mts_slave_worker_queue_len_max;
+set @@global.mts_slave_worker_queue_len_max= 5;
+source include/start_slave.inc;
+
+
+connection master;
+create database d1;
+create table d1.t1 (a int auto_increment primary key) engine=innodb;
+
+
+connection slave;
+
+select sleep(2);
+begin;
+insert into d1.t1 set a=null; # lock a row that master has inserted into
+
+
+connection master;
+
+begin;
+
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+
+commit;
+
+let $d1_t1_count=`select count(*) from d1.t1`;
+
+connection slave;
+
+--echo *** Coordinator must be waiting a for Worker to process its queue ***
+
+let $count= 1;
+let $table= coord_wait_list;
+source include/wait_until_rows_count.inc;
+
+rollback;
+
+let $count= $d1_t1_count;
+let $table= d1.t1;
+source include/wait_until_rows_count.inc;
+
+# cleanup of the max len
+set @@global.mts_slave_worker_queue_len_max= @save_mts_slave_worker_queue_len_max;
+
+
+#
+# Max size of Worker queues
+#
+
+--let $mts_max_q_size=1024
+
+connection slave;
+
+source include/stop_slave.inc;
+set @save_mts_pending_jobs_size_max = @@global.mts_pending_jobs_size_max;
+eval set @@global.mts_pending_jobs_size_max= $mts_max_q_size;
+source include/start_slave.inc;
+
+connection master;
+create table d1.t2 (a int auto_increment primary key, b text null) engine=innodb;
+
+connection slave;
+
+# sync_slave_with_master
+--disable_query_log
+--disable_result_log
+select sleep(2);
+--enable_result_log
+--enable_query_log
+
+begin;
+insert into d1.t2 set a= 1;
+
+# master trans structure aims at testing C's wait loop
+
+let $i_loop= 10;
+
+connection master;
+
+begin;
+--disable_query_log
+  eval insert into d1.t2 set a= 1, b=  REPEAT('b', 1);
+
+while ($i_loop)
+{
+  eval insert into d1.t2 set       b=  REPEAT('b', 1);
+  eval insert into d1.t2 set       b=  REPEAT('b', 1);
+  eval insert into d1.t2 set       b=  REPEAT('b', 1);
+  eval insert into d1.t2 set       b=  REPEAT('b', 1);
+  eval insert into d1.t2 set       b=  REPEAT('b', 1);
+  eval insert into d1.t2 set       b=  REPEAT('b', 1);
+  eval insert into d1.t2 set       b=  REPEAT('b', 2* $mts_max_q_size/3);
+  dec $i_loop;
+}
+--enable_query_log
+commit;
+
+let $d1_t2_count=`select count(*) from d1.t2`;
+
+connection slave;
+
+--echo *** Coordinator must be waiting for Workers have released pending events mem ***
+
+let $count= 1;
+let $table= coord_wait_list;
+source include/wait_until_rows_count.inc;
+
+rollback;
+
+let $count= $d1_t2_count;
+let $table= d1.t2;
+source include/wait_until_rows_count.inc;
+
+# cleanup of the max len
+set @@global.mts_pending_jobs_size_max= @save_mts_pending_jobs_size_max;
+
+
+#
+# cleanup
+#
+
+connection master;
+
+drop database d1;
+
+
+# sync_slave_with_master
+connection slave;
+
+drop view coord_wait_list;
+
+--sleep 2
+
+set @@global.slave_exec_mode= @save.slave_exec_mode;
+
+

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_conflicts.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_conflicts.test	2010-11-26 21:08:30 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_conflicts.test	2010-12-01 17:08:21 +0000
@@ -32,6 +32,7 @@ source include/master-slave.inc;
 
 #  TODO: convert this file into two tests for either value of 
 #        @@global.slave_run_query_in_parallel
+source include/have_binlog_format_row.inc;
 
 connection slave;
 
@@ -94,18 +95,13 @@ use d1;
 insert into d1.t1 values (null); #  will be block at this point on Coord
 commit;
 
---sleep 4
-
 connection slave;
 
-if (`SELECT @@global.binlog_format LIKE "row"`)
-{
-    if (`select COUNT(*) = 0 FROM coord_wait_list`)
-    {
-	SELECT *  from Information_Schema.processlist;
-	--die Appologies, coodinator is supposed to be in the waiting state but it is not
-    }
-}
+--echo *** Coordinator must be in waiting for a Worker to unassign from a partition ***
+
+let $count= 1;
+let $table= coord_wait_list;
+source include/wait_until_rows_count.inc;
 
 # release the Worker
 rollback;

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-11-30 14:39:40 +0000
+++ b/sql/log_event.cc	2010-12-01 17:08:21 +0000
@@ -2210,15 +2210,10 @@ Slave_worker *Log_event::get_slave_worke
   // B or a DDL
   if ((is_b_event= starts_group()) || !rli->curr_group_seen_begin)
   {
-    // insert {NULL, W_s} to yield a new Group_cnt indexed element in GAQ
-    // Group_cnt= rli->gaq->e;
-    // rli->gaq->en_queue({NULL, W_s}); 
-    g= {
-      log_pos,
-      NULL,
-      (ulong) -1,
-      const_cast<Relay_log_info*>(rli)->mts_total_groups++
-    };
+    g.master_log_pos= log_pos;
+    g.group_relay_log_name= NULL;
+    g.worker_id= (ulong) -1;
+    g.total_seqno= const_cast<Relay_log_info*>(rli)->mts_total_groups++;
 
     // the last occupied GAQ's array index
     rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
@@ -2411,6 +2406,9 @@ void append_item_to_jobs(slave_job_item 
                          Slave_worker *w, Relay_log_info *rli)
 {
   THD *thd= rli->info_thd;
+  int ret= -1;
+  ulonglong new_pend_size= rli->mts_pending_jobs_size +
+    ((Log_event*) (job_item->data))->data_written;
 
   DBUG_ASSERT(thd == current_thd);
   thd_proc_info(thd, "Feeding an event to a worker thread");
@@ -2418,44 +2416,30 @@ void append_item_to_jobs(slave_job_item 
   mysql_mutex_lock(&rli->pending_jobs_lock);
 
   // C waits basing on *data* sizes in the queues
-  while (rli->mts_pending_jobs_size +
-         ((Log_event*) (job_item->data))->data_written
-         > rli->mts_pending_jobs_size_max)
+  while (new_pend_size > rli->mts_pending_jobs_size_max)
   {
     const char *old_msg;
-    rli->mts_wqs_oversize= TRUE;
-    rli->wait_jobs++;
-    old_msg= thd->enter_cond(&rli->pending_jobs_cond,
-                             &rli->pending_jobs_lock,
-                             "Waiting for Workers to unload queues");
-    mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
-    thd->exit_cond(old_msg);
-    mysql_mutex_lock(&rli->pending_jobs_lock);
-    if (thd->killed)
-      return;
-  }
-  rli->mts_pending_jobs_size += ((Log_event*) (job_item->data))->data_written;
+    const char info_format[]=
+      "Waiting for Slave Workers to free pending events, requested size %lu";
+    char wait_info[sizeof(info_format) + 4*sizeof(new_pend_size)];
 
-#if 0  
-  while (rli->pending_jobs >= rli->slave_pending_jobs_max)
-  {
-    const char *old_msg;
-    //   Coordinator needs to wait to avoid the pending jobs queue overrun
+    sprintf(wait_info, info_format, new_pend_size);
+    rli->mts_wqs_oversize= TRUE;
     rli->wait_jobs++;
-    old_msg= thd->enter_cond(&rli->pending_jobs_cond,
-                                      &rli->pending_jobs_lock,
-                                      "Waiting for an event from sql thread");
+    old_msg= thd->enter_cond(&rli->pending_jobs_cond, &rli->pending_jobs_lock,
+                             wait_info);
     mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
     thd->exit_cond(old_msg);
     mysql_mutex_lock(&rli->pending_jobs_lock);
     if (thd->killed)
       return;
+    new_pend_size= rli->mts_pending_jobs_size +
+      ((Log_event*) (job_item->data))->data_written;
   }
-#endif
-
+  rli->mts_pending_jobs_size= new_pend_size;
   rli->stmt_jobs++;
   rli->pending_jobs++;
-  
+
   mysql_mutex_unlock(&rli->pending_jobs_lock);
 
   // sleep while all queue lengths are gt Underrun
@@ -2469,33 +2453,38 @@ void append_item_to_jobs(slave_job_item 
     my_sleep(nap_weight * rli->mts_coordinator_basic_nap);
   }
 
-  if (!w->info_thd->killed)
-  {
-    int ret;
+  mysql_mutex_lock(&w->jobs_lock);
 
+  // possible WQ overfill
+  while (!w->info_thd->killed && !thd->killed &&
+         (ret= en_queue(&w->jobs, job_item)) == -1)
+  {
+    const char *old_msg;
+    const char info_format[]=
+      "Waiting for Slave Worker %d queue: max len %lu, actual len %lu";
+    char wait_info[sizeof(info_format) + 4*sizeof(w->id) +
+                   4*sizeof(w->jobs.s) + 4*sizeof(w->jobs.len)];
+    
+    sprintf(wait_info, info_format, w->id, w->jobs.s, w->jobs.len);
+    old_msg= thd->enter_cond(&w->jobs_cond, &w->jobs_lock, wait_info);
+    w->jobs.overfill= TRUE;
+    w->jobs.waited_overfill++;
+    mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
+    thd->exit_cond(old_msg);
+    
     mysql_mutex_lock(&w->jobs_lock);
-
-    // possible WQ overfill
-    while (!thd->killed && (ret= en_queue(&w->jobs, job_item)) == -1)
-    {
-      const char *old_msg;
-      old_msg= thd->enter_cond(&w->jobs_cond, &w->jobs_lock,
-                               "Waiting for an event from sql thread");
-      w->jobs.overfill= TRUE;
-      w->jobs.waited_overfill++;
-      mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
-      thd->exit_cond(old_msg);
-
-      mysql_mutex_lock(&w->jobs_lock);
-    }
+  }
+  if (ret != -1)
+  {
     w->curr_jobs++;
     if (w->jobs.len == 1)
       mysql_cond_signal(&w->jobs_cond);
-
+    
     mysql_mutex_unlock(&w->jobs_lock);
   }
   else
   {
+    mysql_mutex_unlock(&w->jobs_lock);
     mysql_mutex_lock(&rli->pending_jobs_lock);
     rli->pending_jobs--; // roll back of the prev incr
     mysql_mutex_unlock(&rli->pending_jobs_lock);
@@ -2533,7 +2522,7 @@ int Log_event::apply_event(Relay_log_inf
 
         res= ev_begin->do_apply_event(rli);
         delete ev_begin;
-        /* B appears to be serial, reset parallel stautus of group 
+        /* B appears to be serial, reset parallel status of group 
            because the following T won't do that */
         c_rli->curr_group_seen_begin= FALSE;
 
@@ -2574,15 +2563,17 @@ int Log_event::apply_event(Relay_log_inf
       get_dynamic(&c_rli->curr_group_da, (uchar*) &da_item.data, i);
       append_item_to_jobs(&da_item, w, c_rli);
     }
-    
     if (rli->curr_group_da.elements > rli->curr_group_da.max_element)
     {
+      // reallocate to less mem
+      
       DBUG_ASSERT(rli->curr_group_da.max_element < rli->curr_group_da.elements);
+      
       c_rli->curr_group_da.elements= rli->curr_group_da.max_element;
       c_rli->curr_group_da.max_element= 0;
       freeze_size(&c_rli->curr_group_da); // restores max_element
     }
-    c_rli->curr_group_da.elements= 0;     // to keep max_element-based allocation
+    c_rli->curr_group_da.elements= 0;
   }
 
   append_item_to_jobs(job_item, w, c_rli);

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2010-11-30 14:02:15 +0000
+++ b/sql/mysqld.cc	2010-12-01 17:08:21 +0000
@@ -461,7 +461,7 @@ uint  slave_net_timeout;
 ulong slave_exec_mode_options;
 ulonglong slave_type_conversions_options;
 ulong slave_parallel_workers;
-ulong slave_max_pending_jobs;
+ulong opt_mts_slave_worker_queue_len_max;
 my_bool slave_local_timestamp_opt;
 my_bool opt_slave_run_query_in_parallel;
 ulong opt_mts_partition_hash_soft_max;

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h	2010-11-30 14:02:15 +0000
+++ b/sql/mysqld.h	2010-12-01 17:08:21 +0000
@@ -173,7 +173,7 @@ extern LEX_CSTRING reason_slave_blocked;
 extern ulong slave_trans_retries;
 extern uint  slave_net_timeout;
 extern ulong slave_parallel_workers;
-extern ulong slave_max_pending_jobs;
+extern ulong opt_mts_slave_worker_queue_len_max;
 extern my_bool slave_local_timestamp_opt;
 extern my_bool opt_slave_run_query_in_parallel;
 extern ulong opt_mts_partition_hash_soft_max;

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-11-30 14:39:40 +0000
+++ b/sql/rpl_rli.cc	2010-12-01 17:08:21 +0000
@@ -93,7 +93,7 @@ Relay_log_info::Relay_log_info(bool is_s
      of non-processed yet jobs becomes bigger than the limit's value
      MHS_todo: consider a memory-size based param
   */
-  slave_pending_jobs_max= ::slave_max_pending_jobs;
+  mts_slave_worker_queue_len_max= ::opt_mts_slave_worker_queue_len_max;
 
   //
   // TODO -- ANDREI --- You need to take care of possible failures related to

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-11-30 14:39:40 +0000
+++ b/sql/rpl_rli.h	2010-12-01 17:08:21 +0000
@@ -424,18 +424,18 @@ public:
   ulong trans_jobs, wait_jobs, stmt_jobs; // live time is one trans, statement (ndb epoch)
   mysql_mutex_t pending_jobs_lock;
   mysql_cond_t pending_jobs_cond;
-  ulong   slave_pending_jobs_max;
-  ulonglong   mts_pending_jobs_size;           // actual mem usage by WQ:s
-  ulonglong   mts_pending_jobs_size_max;       // the max forcing to wait by C
-  bool    mts_wqs_oversize;                // C raises flag to wait some memory's released
+  ulong       mts_slave_worker_queue_len_max;
+  ulonglong   mts_pending_jobs_size;      // actual mem usage by WQ:s
+  ulonglong   mts_pending_jobs_size_max;  // the max forcing to wait by C
+  bool    mts_wqs_oversize;           // C raises flag to wait some memory's released
   Slave_worker  *last_assigned_worker; // a hint to partitioning func for some events
   Slave_committed_queue *gaq;
   DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
-  DYNAMIC_ARRAY curr_group_da;  // deferred array to hold part-info-free events
+  DYNAMIC_ARRAY curr_group_da;  // deferred array to hold partition-info-free events
   bool curr_group_seen_begin;   // current group started with B-event or not
   bool run_query_in_parallel;   // Query's default db not the actual db as part
   volatile ulong mts_wqs_underrun_w_id;  // Id of a Worker whose queue is getting empty
-  volatile long mts_wqs_overrun;    // W to incr and decr
+  volatile long mts_wqs_overrun;   // W to incr and decr
   long  mts_worker_underrun_level; // percent of WQ size at which Worker claims hungry
   ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun
   Slave_worker* get_current_worker() const;

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-11-30 14:39:40 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-12-01 17:08:21 +0000
@@ -441,6 +441,9 @@ err:
 
 /**
    least_occupied in partition number sense.
+   This might be too coarse and computing based on assigned task
+   is a possibility.
+   Todo: combine two e.g by means of 2-index vector of weights.
 */
 Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *ws)
 {

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-11-30 14:39:40 +0000
+++ b/sql/rpl_slave.cc	2010-12-01 17:08:21 +0000
@@ -3563,7 +3563,7 @@ pthread_handler_t handle_slave_worker(vo
 
   mysql_mutex_lock(&w->jobs_lock);
 
-  DBUG_ASSERT(w->jobs.len == rli->slave_pending_jobs_max + 1);
+  DBUG_ASSERT(w->jobs.len == rli->mts_slave_worker_queue_len_max + 1);
   w->jobs.len= 0;
   mysql_cond_signal(&w->jobs_cond);  // ready for duty
 
@@ -3604,7 +3604,7 @@ pthread_handler_t handle_slave_worker(vo
   mysql_mutex_unlock(&rli->pending_jobs_lock);
 
   mysql_mutex_lock(&w->jobs_lock);
-  w->jobs.len= rli->slave_pending_jobs_max + 1;
+  w->jobs.len= rli->mts_slave_worker_queue_len_max + 1;
   mysql_cond_signal(&w->jobs_cond);  // famous last goodbye
   mysql_mutex_unlock(&w->jobs_lock);
 
@@ -3661,7 +3661,7 @@ int slave_start_single_worker(Relay_log_
   w->usage_partition= 0;
   w->last_group_done_index= rli->gaq->s; // out of range
 
-  w->jobs.s= rli->slave_pending_jobs_max + 1;
+  w->jobs.s= rli->mts_slave_worker_queue_len_max + 1;
   my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0); // todo: implement increment e.g  n * 10;
   for (k= 0; k < w->jobs.s; k++)
     insert_dynamic(&w->jobs.Q, (uchar*) &empty);
@@ -3670,7 +3670,7 @@ int slave_start_single_worker(Relay_log_
   
   w->jobs.e= w->jobs.s;
   w->jobs.a= 0;
-  w->jobs.len= rli->slave_pending_jobs_max + 1; // to first handshake
+  w->jobs.len= rli->mts_slave_worker_queue_len_max + 1; // to first handshake
   w->jobs.overfill= FALSE;    //  todo: move into Slave_jobs_queue constructor
   w->jobs.waited_overfill= 0;
   w->wq_overrun_set= FALSE;
@@ -3719,16 +3719,16 @@ int slave_start_workers(Relay_log_info *
 
   // GAQ  queue holds seqno:s of scheduled groups. C polls workers in 
   //      @c lwm_checkpoint_period to update GAQ (see @c next_event())
-  // The length of GAQ is derived from @c slave_max_pending_jobs to guarantee
+  // The length of GAQ is derived from @c mts_slave_worker_queue_len_max to guarantee
   // each assigned job being sent to a WQ will be represented by an item in GAQ.
-  // ::slave_max_pending_jobs is the worst case when all jobs contain
+  // ::mts_slave_worker_queue_len_max is the worst case when all jobs contain
   // one event and map to one worker.
   rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(),
                                       sizeof(Slave_job_group),
-                                      ::slave_max_pending_jobs, n);
+                                      ::opt_mts_slave_worker_queue_len_max, n);
 
   // size of WQ stays fixed in one slave session
-  rli->slave_pending_jobs_max= ::slave_max_pending_jobs;
+  rli->mts_slave_worker_queue_len_max= ::opt_mts_slave_worker_queue_len_max;
   rli->mts_pending_jobs_size= 0;
   rli->mts_pending_jobs_size_max= ::opt_mts_pending_jobs_size_max;
   rli->mts_wqs_underrun_w_id= (ulong) -1;
@@ -3771,7 +3771,7 @@ void slave_stop_workers(Relay_log_info *
     
     mysql_mutex_lock(&w->jobs_lock);
     
-    if (w->jobs.len == rli->slave_pending_jobs_max + 1)
+    if (w->jobs.len == rli->mts_slave_worker_queue_len_max + 1)
     {
       mysql_mutex_unlock(&w->jobs_lock);
       continue;
@@ -3793,7 +3793,7 @@ void slave_stop_workers(Relay_log_info *
     w->end_info();
 
     mysql_mutex_lock(&w->jobs_lock);
-    while (w->jobs.len != rli->slave_pending_jobs_max + 1)
+    while (w->jobs.len != rli->mts_slave_worker_queue_len_max + 1)
     {
       const char *save_proc_info;
       save_proc_info= thd->enter_cond(&w->jobs_cond, &w->jobs_lock,

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2010-11-30 14:02:15 +0000
+++ b/sql/sys_vars.cc	2010-12-01 17:08:21 +0000
@@ -3109,15 +3109,14 @@ static Sys_var_ulong Sys_slave_parallel_
        "Number of worker threads for executing events in parallel ",
        GLOBAL_VAR(slave_parallel_workers), CMD_LINE(REQUIRED_ARG),
        VALID_RANGE(0, ULONG_MAX), DEFAULT(4), BLOCK_SIZE(1));
-
-// TODO: redefine slave_max_pending_jobs
-
-static Sys_var_ulong Sys_slave_max_pending_jobs(
-       "slave_max_pending_jobs",
-       "Number of replication events read out of Relay log and still not applied. "
-       "The coordinator thread suspends further jobs assigning until "
-       "conditions have been improved ",
-       GLOBAL_VAR(slave_max_pending_jobs), CMD_LINE(REQUIRED_ARG),
+static Sys_var_ulong Sys_mts_slave_worker_queue_len_max(
+       "mts_slave_worker_queue_len_max",
+       "Max length of one MTS Worker queue. Presence in the queue indicates "
+       "a replication event was read out of Relay log and not yet applied. "
+       "Notice the max size of event data in all queues are governed by "
+       "mts_pending_jobs_size_max. Whichever limit is reached Coordinator thread"
+       "suspends further jobs assigning until conditions have been improved.",
+       GLOBAL_VAR(opt_mts_slave_worker_queue_len_max), CMD_LINE(REQUIRED_ARG),
        VALID_RANGE(0, ULONG_MAX - 1), DEFAULT(40000), BLOCK_SIZE(1));
 static Sys_var_mybool Sys_slave_local_timestamp(
        "slave_local_timestamp", "if enabled slave computes the event appying "
@@ -3137,7 +3136,7 @@ static Sys_var_ulong Sys_mts_partition_h
        GLOBAL_VAR(opt_mts_partition_hash_soft_max), CMD_LINE(REQUIRED_ARG),
        VALID_RANGE(0, ULONG_MAX), DEFAULT(16), BLOCK_SIZE(1));
 static Sys_var_ulonglong Sys_mts_pending_jobs_size_max(
-       "opt_mts_pending_jobs_size_max",
+       "mts_pending_jobs_size_max",
        "Max size of Slave Worker queues holding yet not applied events."
        "The least possible value must be not less than the master size "
        "max_allowed_packet.",


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101201170821-7iwx57b885qn2ij3.bundle
Thread
bzr commit into mysql-next-mr.crash-safe branch (andrei.elkin:3223) WL#5569Andrei Elkin1 Dec