3240 Andrei Elkin 2010-12-10 [merge]
merge from wl5569 repo to a local branch
modified:
mysql-test/r/mysqld--help-notwin.result
mysql-test/r/mysqld--help-win.result
mysql-test/suite/rpl/t/rpl_parallel_start_stop.test
sql/dynamic_ids.cc
sql/dynamic_ids.h
sql/mysqld.cc
sql/mysqld.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
sql/sys_vars.cc
=== modified file 'mysql-test/suite/rpl/t/disabled.def'
--- a/mysql-test/suite/rpl/t/disabled.def 2010-12-08 00:33:48 +0000
+++ b/mysql-test/suite/rpl/t/disabled.def 2010-12-10 15:50:03 +0000
@@ -16,3 +16,4 @@ rpl_row_event_max_size : Bug#55675 20
rpl_delayed_slave : Bug#57514 2010-11-09 andrei rpl_delayed_slave fails sporadically in pb
rpl_log_pos : BUG#55675 2010-09-10 alfranio rpl.rpl_log_pos fails sporadically with error binlog truncated in the middle
rpl_trigger : BUG#58258 2010-11-17 VasilDimov Valgrind: possibly lost from ib_bh_create()
+rpl_parallel_conf_limits : wl#5599 9-12-2010 Andrei Waiting for the recovery wl
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-12-08 00:33:48 +0000
+++ b/sql/log_event.cc 2010-12-10 15:50:03 +0000
@@ -2416,15 +2416,23 @@ Slave_worker *Log_event::get_slave_worke
// B or a DDL
if ((is_b_event= starts_group()) || !rli->curr_group_seen_begin)
{
+ ulong gaq_idx;
+ const_cast<Relay_log_info*>(rli)->mts_total_groups++;
+
g.master_log_pos= log_pos;
g.group_master_log_pos= g.group_relay_log_pos= 0;
+ g.group_master_log_name= NULL; // todo: remove
g.group_relay_log_name= NULL;
g.worker_id= (ulong) -1;
- g.total_seqno= const_cast<Relay_log_info*>(rli)->mts_total_groups++;
+ g.total_seqno= const_cast<Relay_log_info*>(rli)->mts_total_groups;
+ g.checkpoint_log_name= NULL;
+ g.checkpoint_log_pos= 0;
+ g.checkpoint_seqno= (uint) -1;
// the last occupied GAQ's array index
- rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
-
+ gaq_idx= rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
+
+ DBUG_ASSERT(gaq_idx != (ulong) -1);
DBUG_ASSERT(((Slave_job_group *)
dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))->
group_relay_log_name == NULL);
@@ -2490,6 +2498,10 @@ Slave_worker *Log_event::get_slave_worke
{
uint i;
mts_group_cnt= rli->gaq->assigned_group_index;
+ Slave_job_group *ptr_g=
+ (Slave_job_group *)
+ dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
+
// TODO: throw an error when relay-log reading starts from inside of a group!!
@@ -2501,11 +2513,6 @@ Slave_worker *Log_event::get_slave_worke
Now group terminating event initiates the new name
delivery through the current group relaylog slot in GAQ.
*/
-
- Slave_job_group *ptr_g=
- (Slave_job_group *)
- dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
-
DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
ptr_g->group_relay_log_name= (char *)
@@ -2519,6 +2526,20 @@ Slave_worker *Log_event::get_slave_worke
worker->relay_log_change_notified= TRUE;
}
+ if (!worker->checkpoint_notified)
+ {
+ // Worker dealloc
+ ptr_g->checkpoint_log_name= (char *)
+ my_malloc(strlen(const_cast<Relay_log_info*>(rli)->
+ get_group_master_log_name()) + 1, MYF(MY_WME));
+ strcpy(ptr_g->checkpoint_log_name,
+ const_cast<Relay_log_info*>(rli)->get_group_master_log_name());
+ ptr_g->checkpoint_log_pos= const_cast<Relay_log_info*>(rli)->get_group_master_log_pos();
+ worker->checkpoint_notified= TRUE;
+ }
+ ptr_g->checkpoint_seqno= rli->checkpoint_seqno;
+ const_cast<Relay_log_info*>(rli)->checkpoint_seqno++;
+
DBUG_ASSERT(worker == rli->last_assigned_worker);
if (!worker)
@@ -6641,7 +6662,11 @@ int Xid_log_event::do_apply_event(Relay_
}
else if (is_trans_repo && is_parallel)
{
- if ((error= w->commit_positions(this)))
+ ulong gaq_idx= mts_group_cnt;
+ Slave_job_group *ptr_g=
+ (Slave_job_group *) dynamic_array_ptr(&w->c_rli->gaq->Q, gaq_idx);
+
+ if ((error= w->commit_positions(this, ptr_g)))
goto err;
}
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2010-12-10 12:10:20 +0000
+++ b/sql/rpl_rli.cc 2010-12-10 16:25:27 +0000
@@ -89,52 +89,6 @@ Relay_log_info::Relay_log_info(bool is_s
mysql_cond_init(key_checkpoint_stop_cond, &checkpoint_stop_cond, NULL);
relay_log.init_pthread_objects();
-#if 0
-
- /*
- Parallel slave parameters initialization is done regardless
- whether the feature is or going to be active or not.
- */
- trans_jobs= stmt_jobs= pending_jobs= wait_jobs= 0;
- /*
- parallel slave parameter to pospone Crdr reading when the number
- of non-processed yet jobs becomes bigger than the limit's value
- MHS_todo: consider a memory-size based param
- */
- mts_slave_worker_queue_len_max= ::opt_mts_slave_worker_queue_len_max;
-
- //
- // TODO -- ANDREI --- You need to take care of possible failures related to
- // allocation.
- //
- uint wi= 0;
- key_mutex_slave_parallel_worker= new PSI_mutex_key[slave_parallel_workers];
- key_cond_slave_parallel_worker= new PSI_cond_key[slave_parallel_workers];
- worker_mutexes= new PSI_mutex_info[slave_parallel_workers];
- worker_conds= new PSI_cond_info[slave_parallel_workers];
- for (wi= 0; wi < slave_parallel_workers; wi++)
- {
- worker_mutexes[wi].m_key= (PSI_mutex_key *) &(key_mutex_slave_parallel_worker[wi]);
- worker_mutexes[wi].m_name= "Slave_worker::jobs_lock";
- worker_mutexes[wi].m_flags= 0;
- worker_conds[wi].m_key= (PSI_cond_key *) &(key_cond_slave_parallel_worker[wi]);
- worker_conds[wi].m_name= "Slave_worker::jobs_cond";
- worker_conds[wi].m_flags= 0;
- }
- if (PSI_server)
- {
- PSI_server->register_mutex("worker", worker_mutexes,
- slave_parallel_workers);
- PSI_server->register_cond("worker", worker_conds,
- slave_parallel_workers);
- }
- mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
- MY_MUTEX_INIT_FAST);
- mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond, NULL);
- my_init_dynamic_array(&workers, sizeof(Slave_worker *), slave_parallel_workers, 4);
-
-#endif
-
DBUG_VOID_RETURN;
}
@@ -200,16 +154,6 @@ Relay_log_info::~Relay_log_info()
mysql_cond_destroy(&log_space_cond);
relay_log.cleanup();
-#if 0
-
- mysql_mutex_destroy(&pending_jobs_lock);
- mysql_cond_destroy(&pending_jobs_cond);
-
- if (!this_worker)
- delete_dynamic(&workers);
-
-#endif
-
DBUG_VOID_RETURN;
}
@@ -234,6 +178,27 @@ void Relay_log_info::reset_notified_rela
}
/**
+ Method is called in mts_checkpoint_routine()
+ to mark each Worker as requiring to adapt to a new checkpoint interval
+ whose coordinates are passed to it through GAQ index.
+
+ Worker notices the new checkpoint value at the group commit
+ to reset the current bitmap and set ON a bit number put by C into GAQ index
+ as the first group committed after the new checkpoint.
+*/
+void Relay_log_info::reset_notified_checkpoint()
+{
+ if (!is_parallel_exec())
+ return;
+ for (uint i= 0; i < workers.elements; i++)
+ {
+ Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
+ w->checkpoint_notified= FALSE;
+ }
+ checkpoint_seqno= 0;
+}
+
+/**
The method can be run both by C having the Main (coord) rli context and
by W having both the main and the Local (worker) rli context.
Decision matrix:
@@ -1106,7 +1071,16 @@ void Relay_log_info::stmt_done(my_off_t
/* Alfranio needs to update the coordinator and workers. */
if ((w= get_current_worker()) == NULL)
+ {
flush_info(is_transactional() ? TRUE : FALSE);
+
+ /*
+ The central recovery commit run in sequential mode forces
+ notification on the de facto new checkpoint.
+ */
+ if (is_parallel_exec())
+ reset_notified_checkpoint();
+ }
}
}
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2010-12-10 12:10:20 +0000
+++ b/sql/rpl_rli.h 2010-12-10 16:25:27 +0000
@@ -470,6 +470,7 @@ public:
a new partition. Is updated at checkpoint commit to the main RLI.
*/
DYNAMIC_ARRAY least_occupied_workers;
+ uint checkpoint_seqno; // counter of groups executed after the most recent CP
/* most of allocation in the coordinator rli is there */
void init_workers(ulong);
@@ -489,9 +490,21 @@ public:
return ret;
}
+ /**
+ While a group is executed by a Worker the relay log can change.
+ Coordinator notifies Workers about this event. Worker is supposed
+ to commit to the recovery table with the new info.
+ */
void reset_notified_relay_log_change();
/**
+ While a group is executed by a Worker a new checkpoint can be committed.
+ Coordinator notifies Workers about this event. Coordinator and Workers
+ maintain a bitmap of executed groups that is reset with a new checkpoint.
+ */
+ void reset_notified_checkpoint();
+
+ /**
Helper function to do after statement completion.
This function is called from an event to complete the group by
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-12-08 13:59:07 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-12-10 15:50:03 +0000
@@ -20,10 +20,11 @@ const char *info_slave_worker_fields []=
Slave_worker::Slave_worker(const char* type, const char* pfs)
: Rpl_info_worker(type, pfs), group_relay_log_pos(0),
- group_master_log_pos(0)
+ group_master_log_pos(0), checkpoint_log_pos(0)
{
group_relay_log_name[0]= 0;
group_master_log_name[0]= 0;
+ checkpoint_log_name[0]= 0;
curr_group_exec_parts= new Database_ids(NAME_LEN);
}
@@ -157,12 +158,24 @@ size_t Slave_worker::get_number_worker_f
return sizeof(info_slave_worker_fields)/sizeof(info_slave_worker_fields[0]);
}
-bool Slave_worker::commit_positions(Log_event *ev)
+bool Slave_worker::commit_positions(Log_event *ev, Slave_job_group* ptr_g)
{
DBUG_ENTER("Slave_worker::checkpoint_positions");
bool error= FALSE;
+ if (ptr_g->checkpoint_log_name != NULL)
+ {
+ strmake(checkpoint_log_name, ptr_g->checkpoint_log_name,
+ sizeof(checkpoint_log_name) - 1);
+ checkpoint_log_pos= ptr_g->checkpoint_log_pos;
+
+ my_free(ptr_g->checkpoint_log_name);
+ ptr_g->checkpoint_log_name= NULL;
+ }
+
+ // TODO: update the group bitmap ptr_g->checkpoint_seqno 'th bit
+
group_relay_log_pos= ev->future_event_relay_log_pos;
group_master_log_pos= ev->log_pos;
strmake(group_master_log_name, c_rli->get_group_master_log_name(),
@@ -502,16 +515,10 @@ void Slave_worker::slave_worker_ends_gro
strcpy(group_relay_log_name, ptr_g->group_relay_log_name);
}
- // GAQ is updated with the checkpoint info
-
- // delete ptr_g->group_relay_log_name; // C allocated
- // ptr_g->group_relay_log_name= NULL; // mark freed
-
-
- // TODO: as it's same as ev->update_pos(w_rli) remove the latter.
-
if (!(ev->get_type_code() == XID_EVENT && is_transactional()))
- commit_positions(ev);
+ {
+ commit_positions(ev, ptr_g);
+ }
ptr_g->group_master_log_pos= group_master_log_pos;
ptr_g->group_relay_log_pos= group_relay_log_pos;
@@ -536,7 +543,7 @@ void Slave_worker::slave_worker_ends_gro
my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
(uchar*) key + 1, key[0]);
- DBUG_ASSERT(entry && entry->usage != 0);
+ DBUG_ASSERT(entry && entry->usage != 0); // was used to break
DBUG_ASSERT(strlen(key + 1) == (uchar) key[0]);
entry->usage--;
@@ -693,16 +700,18 @@ bool circular_buffer_queue::gt(ulong i,
/**
The queue is processed from the head item by item
to purge items representing committed groups.
- Progress of each Worker is monitored through @c last_done
- and @c last_group_done_index.
- It's compared first against the polled
- to break out of the loop at once if no progress.
+ Progress in GAQ is assessed through comparison of GAQ index value
+ with Worker's @c last_group_done_index.
+ Purging breaks at a first discovered gap, that is an item
+ that the assigned item->w_id'th Worker has not completed yet.
The caller is supposed to be the checkpoint handler.
A copy of the last discarded item containing
the refreshed value of the committed low-water-mark is stored
into @c lwm container member for further caller's processing.
+ @last_done is updated with the latest total_seqno for each Worker
+ that was met during GAQ parse.
@note dyn-allocated members of Slave_job_group such as
group_relay_log_name as freed here.
@@ -712,27 +721,22 @@ bool circular_buffer_queue::gt(ulong i,
ulong Slave_committed_queue::move_queue_head(DYNAMIC_ARRAY *ws)
{
ulong i, cnt= 0;
+
for (i= e; i != a && !empty();)
{
Slave_worker *w_i;
- Slave_job_group *ptr_g;
- ulong l;
+ Slave_job_group *ptr_g, g;
char grl_name[FN_REFLEN];
+ ulong ind;
grl_name[0]= 0;
ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);
if (ptr_g->worker_id == (ulong) -1)
break; /* the head is not even assigned */
get_dynamic(ws, (uchar *) &w_i, ptr_g->worker_id);
- get_dynamic(&last_done, (uchar *) &l, w_i->id);
-
- DBUG_ASSERT(l <= s);
-
- if (l == w_i->last_group_done_index)
- break; /* no progress case */
-
- DBUG_ASSERT(w_i->last_group_done_index >= i ||
- (((i > a && e > a) || a == s) && (w_i->last_group_done_index < a)));
+
+ if (gt(i, w_i->last_group_done_index))
+ break; /* gap at i'th */
// memorize the last met group_relay_log_name
if (ptr_g->group_relay_log_name)
@@ -742,39 +746,39 @@ ulong Slave_committed_queue::move_queue_
ptr_g->group_relay_log_name= NULL; // mark freed
}
- if (w_i->last_group_done_index == i || gt(w_i->last_group_done_index, i))
- {
- Slave_job_group g;
- ulong ind= de_queue((uchar*) &g);
+ ind= de_queue((uchar*) &g);
+
+ // stored the memorized name into result struct
+ if (grl_name[0] != 0)
+ strcpy(lwm.group_relay_log_name, grl_name);
+ else
+ lwm.group_relay_log_name[0]= 0;
+
+ DBUG_ASSERT(!ptr_g->group_relay_log_name);
- // stored the memorized name into result struct
- if (grl_name[0] != 0)
- strcpy(lwm.group_relay_log_name, grl_name);
- else
- lwm.group_relay_log_name[0]= 0;
-
- DBUG_ASSERT(!ptr_g->group_relay_log_name);
-
- g.group_relay_log_name= lwm.group_relay_log_name;
- lwm= g; // the result struct is done for the current iteration
-
- /* todo/fixme: the least occupied sorting out can be triggered here */
- /* e.g
- set_dynamic(&w_id->c_rli->least_occupied_worker, &w_i->Q.len, w_i->id);
- sort_dynamic(&w_id->c_rli->least_occupied_worker, (qsort_cmp) ulong_cmp);
-
- int ulong_cmp(ulong *id1, ulong *id2)
- {
- return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0);
- }
- */
- DBUG_ASSERT(ind == i);
- DBUG_ASSERT(ptr_g->total_seqno == lwm.total_seqno);
+ g.group_relay_log_name= lwm.group_relay_log_name;
+ lwm= g; // the result struct is done for the current iteration
- set_dynamic(&last_done, (uchar*) &i, w_i->id);
+ /* todo/fixme: the least occupied sorting out can be triggered here */
+ /* e.g
+ set_dynamic(&w_id->c_rli->least_occupied_worker, &w_i->Q.len, w_i->id);
+ sort_dynamic(&w_id->c_rli->least_occupied_worker, (qsort_cmp) ulong_cmp);
+ int ulong_cmp(ulong *id1, ulong *id2)
+ {
+ return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0);
+ }
+ */
+ DBUG_ASSERT(ind == i);
+ DBUG_ASSERT(ptr_g->total_seqno == lwm.total_seqno);
+#ifndef DBUG_OFF
+ {
+ ulonglong l;
+ get_dynamic(&last_done, (uchar *) &l, w_i->id);
+ DBUG_ASSERT(l < ptr_g->total_seqno); // there must be some progress
}
- else
- break;
+#endif
+ set_dynamic(&last_done, &ptr_g->total_seqno, w_i->id);
+
cnt++;
i= (i + 1) % s;
}
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-12-10 12:10:20 +0000
+++ b/sql/rpl_rli_pdb.h 2010-12-10 16:25:27 +0000
@@ -95,7 +95,7 @@ public:
typedef struct st_slave_job_group
{
- char *group_master_log_name; // This is used upon recovery.
+ char *group_master_log_name; // (actually redundant)
Dynamic_ids *db_ids; // This is used upon recovery.
my_off_t master_log_pos; // B-event log_pos
@@ -114,6 +114,11 @@ typedef struct st_slave_job_group
char *group_relay_log_name; // The value is last seen relay-log
ulong worker_id;
ulonglong total_seqno;
+
+ /* checkpoint coord are reset by CP and rotate:s */
+ uint checkpoint_seqno;
+ my_off_t checkpoint_log_pos; // T-event log_pos filled by W for CheckPoint
+ char* checkpoint_log_name;
} Slave_job_group;
#define get_job(from, to) \
@@ -155,9 +160,10 @@ public:
: circular_buffer_queue(el_size, max, inc)
{
uint k;
- my_init_dynamic_array(&last_done, sizeof(s), n, 0);
+ ulonglong l= 0;
+ my_init_dynamic_array(&last_done, sizeof(lwm.total_seqno), n, 0);
for (k= 0; k < n; k++)
- insert_dynamic(&last_done, (uchar*) &s); // empty for each Worker
+ insert_dynamic(&last_done, (uchar*) &l); // empty for each Worker
lwm.group_relay_log_name= (char *) my_malloc(FN_REFLEN + 1, MYF(0));
lwm.group_relay_log_name[0]= 0;
}
@@ -222,6 +228,7 @@ public:
volatile int curr_jobs; // the current assignments
ulong usage_partition; // number of different partitions handled by this worker
volatile bool relay_log_change_notified; // Coord sets and resets, W can read
+ volatile bool checkpoint_notified; // Coord sets and resets, W can read
bool wq_overrun_set; // W monitors its queue usage to incr/decr rli->mts_wqs_overrun
/*
We need to make this a dynamic field. /Alfranio
@@ -231,6 +238,8 @@ public:
ulonglong group_relay_log_pos;
char group_master_log_name[FN_REFLEN];
ulonglong group_master_log_pos;
+ char checkpoint_log_name[FN_REFLEN];
+ ulonglong checkpoint_log_pos;
int init_info();
void end_info();
@@ -240,7 +249,7 @@ public:
void slave_worker_ends_group(Log_event*, int); // CGEP walk through to upd APH
- bool commit_positions(Log_event *evt);
+ bool commit_positions(Log_event *evt, Slave_job_group *ptr_g);
private:
bool read_info(Rpl_info_handler *from);
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-12-10 12:10:20 +0000
+++ b/sql/rpl_slave.cc 2010-12-10 16:25:27 +0000
@@ -3751,6 +3751,8 @@ pthread_handler_t handle_slave_worker(vo
mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
rli->info_thd->awake(THD::KILL_QUERY); // notify Crdn
mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
+ // Todo: add necessary stuff to clean up after Q-log-event, a Q trans
+ rli->cleanup_context(thd, error);
}
mysql_mutex_lock(&w->jobs_lock);
@@ -3995,7 +3997,7 @@ bool mts_checkpoint_routine(Relay_log_in
set_timespec_nsec(curr_clock, 0);
ulong diff= diff_timespec(curr_clock, rli->last_clock);
- if (diff < period)
+ if (diff < period && !rli->gaq->full())
{
/*
We do not need to execute the checkpoint now because
@@ -4004,9 +4006,15 @@ bool mts_checkpoint_routine(Relay_log_in
DBUG_RETURN(FALSE);
}
- if (!(cnt= rli->gaq->move_queue_head(&rli->workers)))
+ do
+ {
+ cnt= rli->gaq->move_queue_head(&rli->workers);
+ } while (cnt == 0 && rli->gaq->full() &&
+ (my_sleep(rli->mts_coordinator_basic_nap), 1));
+ if (cnt == 0)
goto end;
+
/* TODO:
to turn the least occupied selection in terms of jobs pieces
*/
@@ -4052,6 +4060,8 @@ bool mts_checkpoint_routine(Relay_log_in
error= rli->flush_info(TRUE);
// end of commit_positions
+ rli->reset_notified_checkpoint();
+
end:
set_timespec_nsec(rli->last_clock, 0);
@@ -4082,9 +4092,13 @@ int slave_start_single_worker(Relay_log_
Rpl_info_dummy *dummy_handler= new Rpl_info_dummy(rli->get_number_info_rli_fields());
w->w_rli->set_rpl_info_handler(dummy_handler);
w->init_info();
+
+ // TODO: remove after dynamic_ids will be sorted out (removed/refined) otherwise
+ // entry->usage assert
+ w->curr_group_exec_parts->dynamic_ids.elements= 0;
w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
-
+ w->checkpoint_notified= FALSE;
w->w_rli->workers= rli->workers; // shallow copying is sufficient
w->w_rli->this_worker= w;
@@ -4094,7 +4108,7 @@ int slave_start_single_worker(Relay_log_
w->usage_partition= 0;
w->last_group_done_index= rli->gaq->s; // out of range
- w->jobs.s= rli->mts_slave_worker_queue_len_max + 1;
+ w->jobs.s= rli->mts_slave_worker_queue_len_max;
my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0); // todo: implement increment e.g n * 10;
for (k= 0; k < w->jobs.s; k++)
insert_dynamic(&w->jobs.Q, (uchar*) &empty);
@@ -4154,16 +4168,17 @@ int slave_start_workers(Relay_log_info *
// GAQ queue holds seqno:s of scheduled groups. C polls workers in
// @c lwm_checkpoint_period to update GAQ (see @c next_event())
- // The length of GAQ is derived from @c mts_slave_worker_queue_len_max to guarantee
- // each assigned job being sent to a WQ will be represented by an item in GAQ.
- // ::mts_slave_worker_queue_len_max is the worst case when all jobs contain
- // one event and map to one worker.
- rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(),
- sizeof(Slave_job_group),
- ::opt_mts_slave_worker_queue_len_max, n);
+ // The length of GAQ is derived from @c opt_mts_slave_worker_queue_len_max to guarantee
+ // each assigned job being sent to a WQ will find room in GAQ.
+ // opt_mts_slave_worker_queue_len_max * num-of-W:s is the max length, reached
+ // when all jobs contain one event.
// size of WQ stays fixed in one slave session
rli->mts_slave_worker_queue_len_max= ::opt_mts_slave_worker_queue_len_max;
+ rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(),
+ sizeof(Slave_job_group),
+ rli->slave_parallel_workers *
+ rli->mts_slave_worker_queue_len_max, n);
rli->mts_pending_jobs_size= 0;
rli->mts_pending_jobs_size_max= ::opt_mts_pending_jobs_size_max;
rli->mts_wqs_underrun_w_id= (ulong) -1;
@@ -4173,6 +4188,8 @@ int slave_start_workers(Relay_log_info *
rli->mts_total_groups= 0;
rli->curr_group_seen_begin= 0;
rli->run_query_in_parallel= opt_slave_run_query_in_parallel;
+ rli->checkpoint_seqno= 0;
+
for (i= 0; i < n; i++)
{
if ((error= slave_start_single_worker(rli, i)))
=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc 2010-12-10 12:10:20 +0000
+++ b/sql/sys_vars.cc 2010-12-10 16:25:27 +0000
@@ -3177,7 +3177,7 @@ static Sys_var_ulong Sys_mts_coordinator
"Time in msec to sleep by MTS Coordinator to avoid the Worker queues "
"room overrun",
GLOBAL_VAR(opt_mts_coordinator_basic_nap), CMD_LINE(REQUIRED_ARG),
- VALID_RANGE(0, ULONG_MAX), DEFAULT(0), BLOCK_SIZE(1));
+ VALID_RANGE(0, ULONG_MAX), DEFAULT(5), BLOCK_SIZE(1));
static Sys_var_ulong Sys_mts_worker_underrun_level(
"opt_mts_worker_underrun_level",
"percent of Worker queue size at which Worker is considered to become "
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101210162527-3rrmlpigndb4fu6i.bundle
| Thread |
|---|
| • bzr push into mysql-next-mr-wl5569 branch (andrei.elkin:3240) | Andrei Elkin | 10 Dec |