3223 Andrei Elkin 2010-12-01
wl#5569 MTS
The limit conditions such as WQ len, total WQ:s size related changes.
Also a new test file is added.
@ mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result
new results file.
@ mysql-test/suite/rpl/r/rpl_parallel_conflicts.result
results updated.
@ mysql-test/suite/rpl/t/rpl_parallel_conf_limits.test
Testing two RAM usage by Workers limit parameters.
@ mysql-test/suite/rpl/t/rpl_parallel_conflicts.test
Converting an assert into wait for that condition.
Todo: improve the test to let it run with slave_run_query_in_parallel.
@ sql/log_event.cc
limit condition (wq len, total wql sizes) related changes.
fixing a compilation warn.
@ sql/mysqld.cc
renaming.
@ sql/mysqld.h
renaming.
@ sql/rpl_rli.cc
renaming.
@ sql/rpl_rli.h
s / slave_max_pending_jobs / opt_mts_slave_worker_queue_len_max /
the new name is supposed to indicate the purpose of the entity more clearly.
@ sql/rpl_slave.cc
renaming.
@ sql/sys_vars.cc
renaming.
added:
mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result
mysql-test/suite/rpl/t/rpl_parallel_conf_limits.test
modified:
mysql-test/suite/rpl/r/rpl_parallel_conflicts.result
mysql-test/suite/rpl/t/rpl_parallel_conflicts.test
sql/log_event.cc
sql/mysqld.cc
sql/mysqld.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_slave.cc
sql/sys_vars.cc
3222 Andrei Elkin 2010-11-30 [merge]
merging from from wl#5569 repo containing wl#5599 integration
modified:
sql/binlog.cc
sql/log_event.cc
sql/rpl_constants.h
sql/rpl_info.cc
sql/rpl_info.h
sql/rpl_info_dummy.cc
sql/rpl_info_dummy.h
sql/rpl_info_factory.cc
sql/rpl_info_factory.h
sql/rpl_mi.cc
sql/rpl_mi.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
=== added file 'mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_conf_limits.result 2010-12-01 17:08:21 +0000
@@ -0,0 +1,47 @@
+stop slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+reset master;
+reset slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+start slave;
+create view coord_wait_list as SELECT id from Information_Schema.processlist where state like 'Waiting for Slave Worker%';
+include/stop_slave.inc
+set @save.slave_exec_mode= @@global.slave_exec_mode;
+set @@global.slave_exec_mode = 'Parallel';
+set @save_mts_slave_worker_queue_len_max= @@global.mts_slave_worker_queue_len_max;
+set @@global.mts_slave_worker_queue_len_max= 5;
+include/start_slave.inc
+create database d1;
+create table d1.t1 (a int auto_increment primary key) engine=innodb;
+select sleep(2);
+sleep(2)
+0
+begin;
+insert into d1.t1 set a=null;
+begin;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+commit;
+*** Coordinator must be waiting a for Worker to process its queue ***
+rollback;
+set @@global.mts_slave_worker_queue_len_max= @save_mts_slave_worker_queue_len_max;
+include/stop_slave.inc
+set @save_mts_pending_jobs_size_max = @@global.mts_pending_jobs_size_max;
+set @@global.mts_pending_jobs_size_max= 1024;
+include/start_slave.inc
+create table d1.t2 (a int auto_increment primary key, b text null) engine=innodb;
+begin;
+insert into d1.t2 set a= 1;
+begin;
+commit;
+*** Coordinator must be waiting for Workers have released pending events mem ***
+rollback;
+set @@global.mts_pending_jobs_size_max= @save_mts_pending_jobs_size_max;
+drop database d1;
+drop view coord_wait_list;
+set @@global.slave_exec_mode= @save.slave_exec_mode;
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_conflicts.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_conflicts.result 2010-11-26 21:08:30 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_conflicts.result 2010-12-01 17:08:21 +0000
@@ -29,6 +29,7 @@ insert into d3.t1 values (null);
use d1;
insert into d1.t1 values (null);
commit;
+*** Coordinator must be in waiting for a Worker to unassign from a partition ***
rollback;
select count(*) from d1.t1 into @d1;
select count(*) from d2.t1 into @d2;
=== added file 'mysql-test/suite/rpl/t/rpl_parallel_conf_limits.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_conf_limits.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_conf_limits.test 2010-12-01 17:08:21 +0000
@@ -0,0 +1,164 @@
+#
+# WL#5569 MTS
+#
+# The test verifies correctness of MTS execution when system meets
+# various limits due to configuration options.
+#
+
+source include/master-slave.inc;
+# no support for Query-log-event in this test
+source include/have_binlog_format_row.inc;
+
+connection slave;
+
+create view coord_wait_list as SELECT id from Information_Schema.processlist where state like 'Waiting for Slave Worker%';
+
+# restart in Parallel
+source include/stop_slave.inc;
+set @save.slave_exec_mode= @@global.slave_exec_mode;
+set @@global.slave_exec_mode = 'Parallel';
+
+
+# max len of WQ
+set @save_mts_slave_worker_queue_len_max= @@global.mts_slave_worker_queue_len_max;
+set @@global.mts_slave_worker_queue_len_max= 5;
+source include/start_slave.inc;
+
+
+connection master;
+create database d1;
+create table d1.t1 (a int auto_increment primary key) engine=innodb;
+
+
+connection slave;
+
+select sleep(2);
+begin;
+insert into d1.t1 set a=null; # lock a row that master has inserted into
+
+
+connection master;
+
+begin;
+
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+insert into d1.t1 set a=null;
+
+commit;
+
+let $d1_t1_count=`select count(*) from d1.t1`;
+
+connection slave;
+
+--echo *** Coordinator must be waiting a for Worker to process its queue ***
+
+let $count= 1;
+let $table= coord_wait_list;
+source include/wait_until_rows_count.inc;
+
+rollback;
+
+let $count= $d1_t1_count;
+let $table= d1.t1;
+source include/wait_until_rows_count.inc;
+
+# cleanup of the max len
+set @@global.mts_slave_worker_queue_len_max= @save_mts_slave_worker_queue_len_max;
+
+
+#
+# Max size of Worker queues
+#
+
+--let $mts_max_q_size=1024
+
+connection slave;
+
+source include/stop_slave.inc;
+set @save_mts_pending_jobs_size_max = @@global.mts_pending_jobs_size_max;
+eval set @@global.mts_pending_jobs_size_max= $mts_max_q_size;
+source include/start_slave.inc;
+
+connection master;
+create table d1.t2 (a int auto_increment primary key, b text null) engine=innodb;
+
+connection slave;
+
+# sync_slave_with_master
+--disable_query_log
+--disable_result_log
+select sleep(2);
+--enable_result_log
+--enable_query_log
+
+begin;
+insert into d1.t2 set a= 1;
+
+# master trans structure aims at testing C's wait loop
+
+let $i_loop= 10;
+
+connection master;
+
+begin;
+--disable_query_log
+ eval insert into d1.t2 set a= 1, b= REPEAT('b', 1);
+
+while ($i_loop)
+{
+ eval insert into d1.t2 set b= REPEAT('b', 1);
+ eval insert into d1.t2 set b= REPEAT('b', 1);
+ eval insert into d1.t2 set b= REPEAT('b', 1);
+ eval insert into d1.t2 set b= REPEAT('b', 1);
+ eval insert into d1.t2 set b= REPEAT('b', 1);
+ eval insert into d1.t2 set b= REPEAT('b', 1);
+ eval insert into d1.t2 set b= REPEAT('b', 2* $mts_max_q_size/3);
+ dec $i_loop;
+}
+--enable_query_log
+commit;
+
+let $d1_t2_count=`select count(*) from d1.t2`;
+
+connection slave;
+
+--echo *** Coordinator must be waiting for Workers have released pending events mem ***
+
+let $count= 1;
+let $table= coord_wait_list;
+source include/wait_until_rows_count.inc;
+
+rollback;
+
+let $count= $d1_t2_count;
+let $table= d1.t2;
+source include/wait_until_rows_count.inc;
+
+# cleanup of the max len
+set @@global.mts_pending_jobs_size_max= @save_mts_pending_jobs_size_max;
+
+
+#
+# cleanup
+#
+
+connection master;
+
+drop database d1;
+
+
+# sync_slave_with_master
+connection slave;
+
+drop view coord_wait_list;
+
+--sleep 2
+
+set @@global.slave_exec_mode= @save.slave_exec_mode;
+
+
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_conflicts.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_conflicts.test 2010-11-26 21:08:30 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_conflicts.test 2010-12-01 17:08:21 +0000
@@ -32,6 +32,7 @@ source include/master-slave.inc;
# TODO: convert this file into two tests for either value of
# @@global.slave_run_query_in_parallel
+source include/have_binlog_format_row.inc;
connection slave;
@@ -94,18 +95,13 @@ use d1;
insert into d1.t1 values (null); # will be block at this point on Coord
commit;
---sleep 4
-
connection slave;
-if (`SELECT @@global.binlog_format LIKE "row"`)
-{
- if (`select COUNT(*) = 0 FROM coord_wait_list`)
- {
- SELECT * from Information_Schema.processlist;
- --die Appologies, coodinator is supposed to be in the waiting state but it is not
- }
-}
+--echo *** Coordinator must be in waiting for a Worker to unassign from a partition ***
+
+let $count= 1;
+let $table= coord_wait_list;
+source include/wait_until_rows_count.inc;
# release the Worker
rollback;
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-11-30 14:39:40 +0000
+++ b/sql/log_event.cc 2010-12-01 17:08:21 +0000
@@ -2210,15 +2210,10 @@ Slave_worker *Log_event::get_slave_worke
// B or a DDL
if ((is_b_event= starts_group()) || !rli->curr_group_seen_begin)
{
- // insert {NULL, W_s} to yield a new Group_cnt indexed element in GAQ
- // Group_cnt= rli->gaq->e;
- // rli->gaq->en_queue({NULL, W_s});
- g= {
- log_pos,
- NULL,
- (ulong) -1,
- const_cast<Relay_log_info*>(rli)->mts_total_groups++
- };
+ g.master_log_pos= log_pos;
+ g.group_relay_log_name= NULL;
+ g.worker_id= (ulong) -1;
+ g.total_seqno= const_cast<Relay_log_info*>(rli)->mts_total_groups++;
// the last occupied GAQ's array index
rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
@@ -2411,6 +2406,9 @@ void append_item_to_jobs(slave_job_item
Slave_worker *w, Relay_log_info *rli)
{
THD *thd= rli->info_thd;
+ int ret= -1;
+ ulonglong new_pend_size= rli->mts_pending_jobs_size +
+ ((Log_event*) (job_item->data))->data_written;
DBUG_ASSERT(thd == current_thd);
thd_proc_info(thd, "Feeding an event to a worker thread");
@@ -2418,44 +2416,30 @@ void append_item_to_jobs(slave_job_item
mysql_mutex_lock(&rli->pending_jobs_lock);
// C waits basing on *data* sizes in the queues
- while (rli->mts_pending_jobs_size +
- ((Log_event*) (job_item->data))->data_written
- > rli->mts_pending_jobs_size_max)
+ while (new_pend_size > rli->mts_pending_jobs_size_max)
{
const char *old_msg;
- rli->mts_wqs_oversize= TRUE;
- rli->wait_jobs++;
- old_msg= thd->enter_cond(&rli->pending_jobs_cond,
- &rli->pending_jobs_lock,
- "Waiting for Workers to unload queues");
- mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
- thd->exit_cond(old_msg);
- mysql_mutex_lock(&rli->pending_jobs_lock);
- if (thd->killed)
- return;
- }
- rli->mts_pending_jobs_size += ((Log_event*) (job_item->data))->data_written;
+ const char info_format[]=
+ "Waiting for Slave Workers to free pending events, requested size %lu";
+ char wait_info[sizeof(info_format) + 4*sizeof(new_pend_size)];
-#if 0
- while (rli->pending_jobs >= rli->slave_pending_jobs_max)
- {
- const char *old_msg;
- // Coordinator needs to wait to avoid the pending jobs queue overrun
+ sprintf(wait_info, info_format, new_pend_size);
+ rli->mts_wqs_oversize= TRUE;
rli->wait_jobs++;
- old_msg= thd->enter_cond(&rli->pending_jobs_cond,
- &rli->pending_jobs_lock,
- "Waiting for an event from sql thread");
+ old_msg= thd->enter_cond(&rli->pending_jobs_cond, &rli->pending_jobs_lock,
+ wait_info);
mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
thd->exit_cond(old_msg);
mysql_mutex_lock(&rli->pending_jobs_lock);
if (thd->killed)
return;
+ new_pend_size= rli->mts_pending_jobs_size +
+ ((Log_event*) (job_item->data))->data_written;
}
-#endif
-
+ rli->mts_pending_jobs_size= new_pend_size;
rli->stmt_jobs++;
rli->pending_jobs++;
-
+
mysql_mutex_unlock(&rli->pending_jobs_lock);
// sleep while all queue lengths are gt Underrun
@@ -2469,33 +2453,38 @@ void append_item_to_jobs(slave_job_item
my_sleep(nap_weight * rli->mts_coordinator_basic_nap);
}
- if (!w->info_thd->killed)
- {
- int ret;
+ mysql_mutex_lock(&w->jobs_lock);
+ // possible WQ overfill
+ while (!w->info_thd->killed && !thd->killed &&
+ (ret= en_queue(&w->jobs, job_item)) == -1)
+ {
+ const char *old_msg;
+ const char info_format[]=
+ "Waiting for Slave Worker %d queue: max len %lu, actual len %lu";
+ char wait_info[sizeof(info_format) + 4*sizeof(w->id) +
+ 4*sizeof(w->jobs.s) + 4*sizeof(w->jobs.len)];
+
+ sprintf(wait_info, info_format, w->id, w->jobs.s, w->jobs.len);
+ old_msg= thd->enter_cond(&w->jobs_cond, &w->jobs_lock, wait_info);
+ w->jobs.overfill= TRUE;
+ w->jobs.waited_overfill++;
+ mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
+ thd->exit_cond(old_msg);
+
mysql_mutex_lock(&w->jobs_lock);
-
- // possible WQ overfill
- while (!thd->killed && (ret= en_queue(&w->jobs, job_item)) == -1)
- {
- const char *old_msg;
- old_msg= thd->enter_cond(&w->jobs_cond, &w->jobs_lock,
- "Waiting for an event from sql thread");
- w->jobs.overfill= TRUE;
- w->jobs.waited_overfill++;
- mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
- thd->exit_cond(old_msg);
-
- mysql_mutex_lock(&w->jobs_lock);
- }
+ }
+ if (ret != -1)
+ {
w->curr_jobs++;
if (w->jobs.len == 1)
mysql_cond_signal(&w->jobs_cond);
-
+
mysql_mutex_unlock(&w->jobs_lock);
}
else
{
+ mysql_mutex_unlock(&w->jobs_lock);
mysql_mutex_lock(&rli->pending_jobs_lock);
rli->pending_jobs--; // roll back of the prev incr
mysql_mutex_unlock(&rli->pending_jobs_lock);
@@ -2533,7 +2522,7 @@ int Log_event::apply_event(Relay_log_inf
res= ev_begin->do_apply_event(rli);
delete ev_begin;
- /* B appears to be serial, reset parallel stautus of group
+ /* B appears to be serial, reset parallel status of group
because the following T won't do that */
c_rli->curr_group_seen_begin= FALSE;
@@ -2574,15 +2563,17 @@ int Log_event::apply_event(Relay_log_inf
get_dynamic(&c_rli->curr_group_da, (uchar*) &da_item.data, i);
append_item_to_jobs(&da_item, w, c_rli);
}
-
if (rli->curr_group_da.elements > rli->curr_group_da.max_element)
{
+ // reallocate to less mem
+
DBUG_ASSERT(rli->curr_group_da.max_element < rli->curr_group_da.elements);
+
c_rli->curr_group_da.elements= rli->curr_group_da.max_element;
c_rli->curr_group_da.max_element= 0;
freeze_size(&c_rli->curr_group_da); // restores max_element
}
- c_rli->curr_group_da.elements= 0; // to keep max_element-based allocation
+ c_rli->curr_group_da.elements= 0;
}
append_item_to_jobs(job_item, w, c_rli);
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2010-11-30 14:02:15 +0000
+++ b/sql/mysqld.cc 2010-12-01 17:08:21 +0000
@@ -461,7 +461,7 @@ uint slave_net_timeout;
ulong slave_exec_mode_options;
ulonglong slave_type_conversions_options;
ulong slave_parallel_workers;
-ulong slave_max_pending_jobs;
+ulong opt_mts_slave_worker_queue_len_max;
my_bool slave_local_timestamp_opt;
my_bool opt_slave_run_query_in_parallel;
ulong opt_mts_partition_hash_soft_max;
=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h 2010-11-30 14:02:15 +0000
+++ b/sql/mysqld.h 2010-12-01 17:08:21 +0000
@@ -173,7 +173,7 @@ extern LEX_CSTRING reason_slave_blocked;
extern ulong slave_trans_retries;
extern uint slave_net_timeout;
extern ulong slave_parallel_workers;
-extern ulong slave_max_pending_jobs;
+extern ulong opt_mts_slave_worker_queue_len_max;
extern my_bool slave_local_timestamp_opt;
extern my_bool opt_slave_run_query_in_parallel;
extern ulong opt_mts_partition_hash_soft_max;
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2010-11-30 14:39:40 +0000
+++ b/sql/rpl_rli.cc 2010-12-01 17:08:21 +0000
@@ -93,7 +93,7 @@ Relay_log_info::Relay_log_info(bool is_s
of non-processed yet jobs becomes bigger than the limit's value
MHS_todo: consider a memory-size based param
*/
- slave_pending_jobs_max= ::slave_max_pending_jobs;
+ mts_slave_worker_queue_len_max= ::opt_mts_slave_worker_queue_len_max;
//
// TODO -- ANDREI --- You need to take care of possible failures related to
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2010-11-30 14:39:40 +0000
+++ b/sql/rpl_rli.h 2010-12-01 17:08:21 +0000
@@ -424,18 +424,18 @@ public:
ulong trans_jobs, wait_jobs, stmt_jobs; // live time is one trans, statement (ndb epoch)
mysql_mutex_t pending_jobs_lock;
mysql_cond_t pending_jobs_cond;
- ulong slave_pending_jobs_max;
- ulonglong mts_pending_jobs_size; // actual mem usage by WQ:s
- ulonglong mts_pending_jobs_size_max; // the max forcing to wait by C
- bool mts_wqs_oversize; // C raises flag to wait some memory's released
+ ulong mts_slave_worker_queue_len_max;
+ ulonglong mts_pending_jobs_size; // actual mem usage by WQ:s
+ ulonglong mts_pending_jobs_size_max; // the max forcing to wait by C
+ bool mts_wqs_oversize; // C raises flag to wait some memory's released
Slave_worker *last_assigned_worker; // a hint to partitioning func for some events
Slave_committed_queue *gaq;
DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
- DYNAMIC_ARRAY curr_group_da; // deferred array to hold part-info-free events
+ DYNAMIC_ARRAY curr_group_da; // deferred array to hold partition-info-free events
bool curr_group_seen_begin; // current group started with B-event or not
bool run_query_in_parallel; // Query's default db not the actual db as part
volatile ulong mts_wqs_underrun_w_id; // Id of a Worker whose queue is getting empty
- volatile long mts_wqs_overrun; // W to incr and decr
+ volatile long mts_wqs_overrun; // W to incr and decr
long mts_worker_underrun_level; // percent of WQ size at which Worker claims hungry
ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun
Slave_worker* get_current_worker() const;
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-11-30 14:39:40 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-12-01 17:08:21 +0000
@@ -441,6 +441,9 @@ err:
/**
least_occupied in partition number sense.
+ This might be too coarse and computing based on assigned task
+ is a possibility.
+ Todo: combine two e.g by means of 2-index vector of weights.
*/
Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *ws)
{
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-11-30 14:39:40 +0000
+++ b/sql/rpl_slave.cc 2010-12-01 17:08:21 +0000
@@ -3563,7 +3563,7 @@ pthread_handler_t handle_slave_worker(vo
mysql_mutex_lock(&w->jobs_lock);
- DBUG_ASSERT(w->jobs.len == rli->slave_pending_jobs_max + 1);
+ DBUG_ASSERT(w->jobs.len == rli->mts_slave_worker_queue_len_max + 1);
w->jobs.len= 0;
mysql_cond_signal(&w->jobs_cond); // ready for duty
@@ -3604,7 +3604,7 @@ pthread_handler_t handle_slave_worker(vo
mysql_mutex_unlock(&rli->pending_jobs_lock);
mysql_mutex_lock(&w->jobs_lock);
- w->jobs.len= rli->slave_pending_jobs_max + 1;
+ w->jobs.len= rli->mts_slave_worker_queue_len_max + 1;
mysql_cond_signal(&w->jobs_cond); // famous last goodbye
mysql_mutex_unlock(&w->jobs_lock);
@@ -3661,7 +3661,7 @@ int slave_start_single_worker(Relay_log_
w->usage_partition= 0;
w->last_group_done_index= rli->gaq->s; // out of range
- w->jobs.s= rli->slave_pending_jobs_max + 1;
+ w->jobs.s= rli->mts_slave_worker_queue_len_max + 1;
my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0); // todo: implement increment e.g n * 10;
for (k= 0; k < w->jobs.s; k++)
insert_dynamic(&w->jobs.Q, (uchar*) &empty);
@@ -3670,7 +3670,7 @@ int slave_start_single_worker(Relay_log_
w->jobs.e= w->jobs.s;
w->jobs.a= 0;
- w->jobs.len= rli->slave_pending_jobs_max + 1; // to first handshake
+ w->jobs.len= rli->mts_slave_worker_queue_len_max + 1; // to first handshake
w->jobs.overfill= FALSE; // todo: move into Slave_jobs_queue constructor
w->jobs.waited_overfill= 0;
w->wq_overrun_set= FALSE;
@@ -3719,16 +3719,16 @@ int slave_start_workers(Relay_log_info *
// GAQ queue holds seqno:s of scheduled groups. C polls workers in
// @c lwm_checkpoint_period to update GAQ (see @c next_event())
- // The length of GAQ is derived from @c slave_max_pending_jobs to guarantee
+ // The length of GAQ is derived from @c mts_slave_worker_queue_len_max to guarantee
// each assigned job being sent to a WQ will be represented by an item in GAQ.
- // ::slave_max_pending_jobs is the worst case when all jobs contain
+ // ::mts_slave_worker_queue_len_max is the worst case when all jobs contain
// one event and map to one worker.
rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(),
sizeof(Slave_job_group),
- ::slave_max_pending_jobs, n);
+ ::opt_mts_slave_worker_queue_len_max, n);
// size of WQ stays fixed in one slave session
- rli->slave_pending_jobs_max= ::slave_max_pending_jobs;
+ rli->mts_slave_worker_queue_len_max= ::opt_mts_slave_worker_queue_len_max;
rli->mts_pending_jobs_size= 0;
rli->mts_pending_jobs_size_max= ::opt_mts_pending_jobs_size_max;
rli->mts_wqs_underrun_w_id= (ulong) -1;
@@ -3771,7 +3771,7 @@ void slave_stop_workers(Relay_log_info *
mysql_mutex_lock(&w->jobs_lock);
- if (w->jobs.len == rli->slave_pending_jobs_max + 1)
+ if (w->jobs.len == rli->mts_slave_worker_queue_len_max + 1)
{
mysql_mutex_unlock(&w->jobs_lock);
continue;
@@ -3793,7 +3793,7 @@ void slave_stop_workers(Relay_log_info *
w->end_info();
mysql_mutex_lock(&w->jobs_lock);
- while (w->jobs.len != rli->slave_pending_jobs_max + 1)
+ while (w->jobs.len != rli->mts_slave_worker_queue_len_max + 1)
{
const char *save_proc_info;
save_proc_info= thd->enter_cond(&w->jobs_cond, &w->jobs_lock,
=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc 2010-11-30 14:02:15 +0000
+++ b/sql/sys_vars.cc 2010-12-01 17:08:21 +0000
@@ -3109,15 +3109,14 @@ static Sys_var_ulong Sys_slave_parallel_
"Number of worker threads for executing events in parallel ",
GLOBAL_VAR(slave_parallel_workers), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, ULONG_MAX), DEFAULT(4), BLOCK_SIZE(1));
-
-// TODO: redefine slave_max_pending_jobs
-
-static Sys_var_ulong Sys_slave_max_pending_jobs(
- "slave_max_pending_jobs",
- "Number of replication events read out of Relay log and still not applied. "
- "The coordinator thread suspends further jobs assigning until "
- "conditions have been improved ",
- GLOBAL_VAR(slave_max_pending_jobs), CMD_LINE(REQUIRED_ARG),
+static Sys_var_ulong Sys_mts_slave_worker_queue_len_max(
+ "mts_slave_worker_queue_len_max",
+ "Max length of one MTS Worker queue. Presence in the queue indicates "
+ "a replication event was read out of Relay log and not yet applied. "
+ "Notice the max size of event data in all queues are governed by "
+ "mts_pending_jobs_size_max. Whichever limit is reached Coordinator thread"
+ "suspends further jobs assigning until conditions have been improved.",
+ GLOBAL_VAR(opt_mts_slave_worker_queue_len_max), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, ULONG_MAX - 1), DEFAULT(40000), BLOCK_SIZE(1));
static Sys_var_mybool Sys_slave_local_timestamp(
"slave_local_timestamp", "if enabled slave computes the event appying "
@@ -3137,7 +3136,7 @@ static Sys_var_ulong Sys_mts_partition_h
GLOBAL_VAR(opt_mts_partition_hash_soft_max), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, ULONG_MAX), DEFAULT(16), BLOCK_SIZE(1));
static Sys_var_ulonglong Sys_mts_pending_jobs_size_max(
- "opt_mts_pending_jobs_size_max",
+ "mts_pending_jobs_size_max",
"Max size of Slave Worker queues holding yet not applied events."
"The least possible value must be not less than the master size "
"max_allowed_packet.",
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101201170821-7iwx57b885qn2ij3.bundle
| Thread |
|---|
| • bzr push into mysql-next-mr.crash-safe branch (andrei.elkin:3222 to 3223)WL#5569 | Andrei Elkin | 1 Dec |