#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3239 Andrei Elkin 2010-12-10
wl#5569 MTS
Improving GAQ: a) size it so that it is capable of holding items while all WQ:s are full;
b) move_queue_head() contained a flaw that made it falsely report no progress;
c) never enqueue into GAQ while it is full.
@ sql/log_event.cc
Asserting that gaq_idx == -1 is impossible: GAQ must not be full at this point.
The total counter of groups starts from 1, that is, a value of 0 means nothing has been done yet.
@ sql/rpl_rli_pdb.cc
move_queue_head() contained a flaw that falsely broke out of the progress loop.
Fixed by comparing the current index with Worker::last_group_done_index
instead of this->last_done. The latter is changed to be of a purely statistical character and
to contain the total seqno, which is guaranteed to grow monotonically thanks to its ulonglong size.
@ sql/rpl_rli_pdb.h
Changes due to last_done being turned into a statistics holder.
@ sql/rpl_slave.cc
Improving the GAQ size limit so that GAQ is capable of holding items while all WQ:s are full.
The checkpoint routine waits for an item to be released when GAQ is full.
@ sql/sys_vars.cc
The default value of opt_mts_coordinator_basic_nap is set to a non-zero 5 msecs.
modified:
mysql-test/suite/rpl/t/disabled.def
sql/log_event.cc
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
sql/sys_vars.cc
=== modified file 'mysql-test/suite/rpl/t/disabled.def'
--- a/mysql-test/suite/rpl/t/disabled.def 2010-12-08 00:33:48 +0000
+++ b/mysql-test/suite/rpl/t/disabled.def 2010-12-10 15:50:03 +0000
@@ -16,3 +16,4 @@ rpl_row_event_max_size : Bug#55675 20
rpl_delayed_slave : Bug#57514 2010-11-09 andrei rpl_delayed_slave fails sporadically in pb
rpl_log_pos : BUG#55675 2010-09-10 alfranio rpl.rpl_log_pos fails sporadically with error binlog truncated in the middle
rpl_trigger : BUG#58258 2010-11-17 VasilDimov Valgrind: possibly lost from ib_bh_create()
+rpl_parallel_conf_limits : wl#5599 9-12-2010 Andrei Waiting for the recovery wl
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-12-09 17:45:02 +0000
+++ b/sql/log_event.cc 2010-12-10 15:50:03 +0000
@@ -2416,19 +2416,23 @@ Slave_worker *Log_event::get_slave_worke
// B or a DDL
if ((is_b_event= starts_group()) || !rli->curr_group_seen_begin)
{
+ ulong gaq_idx;
+ const_cast<Relay_log_info*>(rli)->mts_total_groups++;
+
g.master_log_pos= log_pos;
g.group_master_log_pos= g.group_relay_log_pos= 0;
g.group_master_log_name= NULL; // todo: remove
g.group_relay_log_name= NULL;
g.worker_id= (ulong) -1;
- g.total_seqno= const_cast<Relay_log_info*>(rli)->mts_total_groups++;
+ g.total_seqno= const_cast<Relay_log_info*>(rli)->mts_total_groups;
g.checkpoint_log_name= NULL;
g.checkpoint_log_pos= 0;
g.checkpoint_seqno= (uint) -1;
// the last occupied GAQ's array index
- rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
-
+ gaq_idx= rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
+
+ DBUG_ASSERT(gaq_idx != (ulong) -1);
DBUG_ASSERT(((Slave_job_group *)
dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))->
group_relay_log_name == NULL);
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-12-09 17:45:02 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-12-10 15:50:03 +0000
@@ -700,16 +700,18 @@ bool circular_buffer_queue::gt(ulong i,
/**
The queue is processed from the head item by item
to purge items representing committed groups.
- Progress of each Worker is monitored through @c last_done
- and @c last_group_done_index.
- It's compared first against the polled
- to break out of the loop at once if no progress.
+ Progress in GAQ is assessed through comparison of the GAQ index value
+ with Worker's @c last_group_done_index.
+ Purging breaks at the first discovered gap, that is an item
+ that the assigned item->w_id'th Worker has not completed yet.
The caller is supposed to be the checkpoint handler.
A copy of the last discarded item containing
the refreshed value of the committed low-water-mark is stored
into @c lwm container member for further caller's processing.
+ @last_done is updated with the latest total_seqno for each Worker
+ that was met during GAQ parse.
@note dyn-allocated members of Slave_job_group such as
group_relay_log_name are freed here.
@@ -719,27 +721,22 @@ bool circular_buffer_queue::gt(ulong i,
ulong Slave_committed_queue::move_queue_head(DYNAMIC_ARRAY *ws)
{
ulong i, cnt= 0;
+
for (i= e; i != a && !empty();)
{
Slave_worker *w_i;
- Slave_job_group *ptr_g;
- ulong l;
+ Slave_job_group *ptr_g, g;
char grl_name[FN_REFLEN];
+ ulong ind;
grl_name[0]= 0;
ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);
if (ptr_g->worker_id == (ulong) -1)
break; /* the head is not even assigned */
get_dynamic(ws, (uchar *) &w_i, ptr_g->worker_id);
- get_dynamic(&last_done, (uchar *) &l, w_i->id);
-
- DBUG_ASSERT(l <= s);
-
- if (l == w_i->last_group_done_index)
- break; /* no progress case */
-
- DBUG_ASSERT(w_i->last_group_done_index >= i ||
- (((i > a && e > a) || a == s) && (w_i->last_group_done_index < a)));
+
+ if (gt(i, w_i->last_group_done_index))
+ break; /* gap at i'th */
// memorize the last met group_relay_log_name
if (ptr_g->group_relay_log_name)
@@ -749,39 +746,39 @@ ulong Slave_committed_queue::move_queue_
ptr_g->group_relay_log_name= NULL; // mark freed
}
- if (w_i->last_group_done_index == i || gt(w_i->last_group_done_index, i))
- {
- Slave_job_group g;
- ulong ind= de_queue((uchar*) &g);
+ ind= de_queue((uchar*) &g);
+
+ // stored the memorized name into result struct
+ if (grl_name[0] != 0)
+ strcpy(lwm.group_relay_log_name, grl_name);
+ else
+ lwm.group_relay_log_name[0]= 0;
+
+ DBUG_ASSERT(!ptr_g->group_relay_log_name);
- // stored the memorized name into result struct
- if (grl_name[0] != 0)
- strcpy(lwm.group_relay_log_name, grl_name);
- else
- lwm.group_relay_log_name[0]= 0;
-
- DBUG_ASSERT(!ptr_g->group_relay_log_name);
-
- g.group_relay_log_name= lwm.group_relay_log_name;
- lwm= g; // the result struct is done for the current iteration
-
- /* todo/fixme: the least occupied sorting out can be triggered here */
- /* e.g
- set_dynamic(&w_id->c_rli->least_occupied_worker, &w_i->Q.len, w_i->id);
- sort_dynamic(&w_id->c_rli->least_occupied_worker, (qsort_cmp) ulong_cmp);
-
- int ulong_cmp(ulong *id1, ulong *id2)
- {
- return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0);
- }
- */
- DBUG_ASSERT(ind == i);
- DBUG_ASSERT(ptr_g->total_seqno == lwm.total_seqno);
+ g.group_relay_log_name= lwm.group_relay_log_name;
+ lwm= g; // the result struct is done for the current iteration
- set_dynamic(&last_done, (uchar*) &i, w_i->id);
+ /* todo/fixme: the least occupied sorting out can be triggered here */
+ /* e.g
+ set_dynamic(&w_id->c_rli->least_occupied_worker, &w_i->Q.len, w_i->id);
+ sort_dynamic(&w_id->c_rli->least_occupied_worker, (qsort_cmp) ulong_cmp);
+ int ulong_cmp(ulong *id1, ulong *id2)
+ {
+ return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0);
+ }
+ */
+ DBUG_ASSERT(ind == i);
+ DBUG_ASSERT(ptr_g->total_seqno == lwm.total_seqno);
+#ifndef DBUG_OFF
+ {
+ ulonglong l;
+ get_dynamic(&last_done, (uchar *) &l, w_i->id);
+ DBUG_ASSERT(l < ptr_g->total_seqno); // there must be some progress
}
- else
- break;
+#endif
+ set_dynamic(&last_done, &ptr_g->total_seqno, w_i->id);
+
cnt++;
i= (i + 1) % s;
}
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-12-09 17:46:27 +0000
+++ b/sql/rpl_rli_pdb.h 2010-12-10 15:50:03 +0000
@@ -188,9 +188,10 @@ public:
: circular_buffer_queue(el_size, max, inc)
{
uint k;
- my_init_dynamic_array(&last_done, sizeof(s), n, 0);
+ ulonglong l= 0;
+ my_init_dynamic_array(&last_done, sizeof(lwm.total_seqno), n, 0);
for (k= 0; k < n; k++)
- insert_dynamic(&last_done, (uchar*) &s); // empty for each Worker
+ insert_dynamic(&last_done, (uchar*) &l); // empty for each Worker
lwm.group_relay_log_name= (char *) my_malloc(FN_REFLEN + 1, MYF(0));
lwm.group_relay_log_name[0]= 0;
}
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-12-09 17:46:27 +0000
+++ b/sql/rpl_slave.cc 2010-12-10 15:50:03 +0000
@@ -3850,7 +3850,7 @@ bool mts_checkpoint_routine(Relay_log_in
set_timespec_nsec(curr_clock, 0);
ulong diff= diff_timespec(curr_clock, rli->last_clock);
- if (diff < period)
+ if (diff < period && !rli->gaq->full())
{
/*
We do not need to execute the checkpoint now because
@@ -3859,9 +3859,15 @@ bool mts_checkpoint_routine(Relay_log_in
DBUG_RETURN(FALSE);
}
- if (!(cnt= rli->gaq->move_queue_head(&rli->workers)))
+ do
+ {
+ cnt= rli->gaq->move_queue_head(&rli->workers);
+ } while (cnt == 0 && rli->gaq->full() &&
+ (my_sleep(rli->mts_coordinator_basic_nap), 1));
+ if (cnt == 0)
goto end;
+
/* TODO:
to turn the least occupied selection in terms of jobs pieces
*/
@@ -3955,7 +3961,7 @@ int slave_start_single_worker(Relay_log_
w->usage_partition= 0;
w->last_group_done_index= rli->gaq->s; // out of range
- w->jobs.s= rli->mts_slave_worker_queue_len_max + 1;
+ w->jobs.s= rli->mts_slave_worker_queue_len_max;
my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0); // todo: implement increment e.g n * 10;
for (k= 0; k < w->jobs.s; k++)
insert_dynamic(&w->jobs.Q, (uchar*) &empty);
@@ -4015,16 +4021,17 @@ int slave_start_workers(Relay_log_info *
// GAQ queue holds seqno:s of scheduled groups. C polls workers in
// @c lwm_checkpoint_period to update GAQ (see @c next_event())
- // The length of GAQ is derived from @c mts_slave_worker_queue_len_max to guarantee
- // each assigned job being sent to a WQ will be represented by an item in GAQ.
- // ::mts_slave_worker_queue_len_max is the worst case when all jobs contain
- // one event and map to one worker.
- rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(),
- sizeof(Slave_job_group),
- ::opt_mts_slave_worker_queue_len_max, n);
+ // The length of GAQ is derived from @c opt_mts_slave_worker_queue_len_max to guarantee
+ // each assigned job being sent to a WQ will find room in GAQ.
+ // opt_mts_slave_worker_queue_len_max * num-of-W:s is the max length case
+ // all jobs contain one event.
// size of WQ stays fixed in one slave session
rli->mts_slave_worker_queue_len_max= ::opt_mts_slave_worker_queue_len_max;
+ rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(),
+ sizeof(Slave_job_group),
+ rli->slave_parallel_workers *
+ rli->mts_slave_worker_queue_len_max, n);
rli->mts_pending_jobs_size= 0;
rli->mts_pending_jobs_size_max= ::opt_mts_pending_jobs_size_max;
rli->mts_wqs_underrun_w_id= (ulong) -1;
=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc 2010-12-09 13:23:19 +0000
+++ b/sql/sys_vars.cc 2010-12-10 15:50:03 +0000
@@ -3170,7 +3170,7 @@ static Sys_var_ulong Sys_mts_coordinator
"Time in msec to sleep by MTS Coordinator to avoid the Worker queues "
"room overrun",
GLOBAL_VAR(opt_mts_coordinator_basic_nap), CMD_LINE(REQUIRED_ARG),
- VALID_RANGE(0, ULONG_MAX), DEFAULT(0), BLOCK_SIZE(1));
+ VALID_RANGE(0, ULONG_MAX), DEFAULT(5), BLOCK_SIZE(1));
static Sys_var_ulong Sys_mts_worker_underrun_level(
"opt_mts_worker_underrun_level",
"percent of Worker queue size at which Worker is considered to become "
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101210155003-pknsk1xyuov5y54c.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3239) WL#5569 | Andrei Elkin | 10 Dec |