#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:alfranio.correia@stripped
3325 Andrei Elkin 2011-06-27
wl#5569 MTS
Cleanup and addressing sporadic rpl_temp_table_mix_row failure in
post-execution mtr.check_testcase().
The check-testcase failure was caused by a faulty optimization that
avoided migrating temporary tables from Coordinator to Workers in
case of rows-event assignment.
While that is correct with a homogeneous rows-event-only load, a mixed
load can fail.
Fixed by removing the optimization, so map_db_to_worker() always
relocates, which is somewhat suboptimal and should be improved in the future.
@ mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test
Adding slave synchronization.
@ sql/log_event.cc
cleanup: moved circular_buffer related definitions into rpl_rli_pdb, which is specialized
on objects dealing with Workers, their assignment etc.;
improved comments;
also, the former separate flag indicating that a T-event requires post-scheduling synchronization
with the Worker is turned into a bit of the existing Log_event::flags, which also avoids the ugliness of
#if/#endif:s.
@ sql/log_event.h
the former separate flag indicating that a T-event requires post-scheduling synchronization
with the Worker is turned into a bit of the existing Log_event::flags;
@ sql/rpl_rli.cc
cleanup: renaming.
@ sql/rpl_rli.h
cleanup: renaming, more comments.
The former mts_wqs_overrun is converted into two: the statistics parameter mts_wq_overrun_cnt
and the internal control parameter mts_wq_excess.
@ sql/rpl_rli_pdb.cc
Included rpl_slave.h that holds two necessary declarations;
Cleanup: accepting circular_buffer related definitions migrated from log_event,
improved comments, renaming, removing dead code
@ sql/rpl_rli_pdb.h
Cleanup: renaming and more comments are added.
@ sql/rpl_slave.cc
Augmenting print-out of statistics at the end of MTS session;
cleanup: renaming.
@ sql/rpl_slave.h
Introducing two constants to define range of worker_id domain and
a magic value of undefined worker.
@ sql/sys_vars.cc
replacing a literal int value with a symbolic constant.
modified:
mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test
sql/log_event.cc
sql/log_event.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
sql/rpl_slave.h
sql/sys_vars.cc
=== modified file 'mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test'
--- a/mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test 2010-12-19 17:15:12 +0000
+++ b/mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test 2011-06-27 17:31:45 +0000
@@ -207,4 +207,7 @@ source include/show_binlog_events.inc;
--echo
connection master;
DROP TABLE t1;
+
+sync_slave_with_master;
+
--source include/rpl_end.inc
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2011-06-27 12:12:52 +0000
+++ b/sql/log_event.cc 2011-06-27 17:31:45 +0000
@@ -672,9 +672,6 @@ Log_event::Log_event(enum_event_cache_ty
:temp_buf(0), exec_time(0), flags(0), event_cache_type(cache_type_arg),
event_logging_type(logging_type_arg), crc(0), thd(0),
checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
-#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- , m_mts_event_isolated_group(FALSE)
-#endif
{
server_id= ::server_id;
/*
@@ -697,9 +694,6 @@ Log_event::Log_event(const char* buf,
event_cache_type(EVENT_INVALID_CACHE),
event_logging_type(EVENT_INVALID_LOGGING),
crc(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
-#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
- , m_mts_event_isolated_group(FALSE)
-#endif
{
#ifndef MYSQL_CLIENT
thd = 0;
@@ -2368,12 +2362,8 @@ bool Log_event::contains_partition_info(
}
/**
- General hashing function to compute the id of an applier for
- the current event.
- At computing the id few rules apply depending on partitioning properties
- that the event instance can feature.
-
- Let's call the properties through the following legends:
+ The method maps the event to a Worker and returns a pointer to it.
+ As a part of the group, an event belongs to one of the following types:
B - beginning of a group of events (BEGIN query_log_event)
g - mini-group representative event containing the partition info
@@ -2382,30 +2372,33 @@ bool Log_event::contains_partition_info(
(int_, rand_, user_ var:s)
r - a mini-group internal "regular" event that follows its g-parent
(Delete, Update, Write -rows)
- S - sequentially applied event (may not be a part of any group).
- Events of this type are determined via @c mts_sequential_exec()
- earlier and don't cause calling this method .
- T - terminator of the group (XID, COMMIT, ROLLBACK)
-
- Only `g' case really computes the assigned Worker id which must
- be memorized by the caller and is available through @c rli argument.
- For instance DUW-rows events are mapped to a Worker previously chosen
- at assigning of their Table-map parent g-event.
- In `B' case the assigned Worker is NULL to indicate the Coordinator will
- postpone scheduling until a following `g' event decides on a Worker.
+ T - terminator of the group (XID, COMMIT, ROLLBACK, auto-commit query)
+
+ Only the first g-type event computes the assigned Worker which once
+ is determined remains to be for the rest of the group.
+ That is the g-type event solely carries partitioning info.
+ For B-type the assigned Worker is NULL to indicate Coordinator
+ has not yet decided. The same applies to p-type.
- A group can consist of multiple events still without explict B
- event. This is a case of old master binlog or few corner-cases of
- the current master version (todo: to fix). Such group structure is
- supposed to be {{p_i},g} that is it ends with the first not p-event.
- Such g-event is marked with set_mts_event_ends_group().
+ Notice, there is a special group consisting of optionally multiple p-events
+ terminating with a g-event.
+ Such a case is caused by an old master binlog and a few corner-cases of
+ the current master version (todo: to fix).
+
+ In case the event accesses more than OVER_MAX_DBS databases, the method
+ has to ensure that the groups previously assigned to all other workers are
+ done.
- @note The function can update APH, CGAP, GAQ objects.
+
+ @note The function can update APH (through map_db_to_worker()), GAQ objects
+ and relocate some temporary tables from Coordinator's list into
+ involved entries of APH.
+ There's few memory allocations commented where to be freed.
- @return a pointer to the Worker stuct or NULL.
+ @return a pointer to the Worker struct or NULL.
*/
-Slave_worker *Log_event::get_slave_worker_id(Relay_log_info *rli)
+Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli)
{
Slave_job_group g, *ptr_g;
bool is_b_event;
@@ -2424,7 +2417,7 @@ Slave_worker *Log_event::get_slave_worke
(rli->gaq->empty() ||
((Slave_job_group *)
dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))->
- worker_id != (ulong) -1)))
+ worker_id != MTS_WORKER_UNDEF)))
{
ulong gaq_idx;
rli->mts_total_groups++;
@@ -2434,7 +2427,7 @@ Slave_worker *Log_event::get_slave_worke
g.group_master_log_pos= g.group_relay_log_pos= 0;
g.group_master_log_name= NULL; // todo: remove
g.group_relay_log_name= NULL;
- g.worker_id= (ulong) -1;
+ g.worker_id= MTS_WORKER_UNDEF;
g.total_seqno= rli->mts_total_groups;
g.checkpoint_log_name= NULL;
g.checkpoint_log_pos= 0;
@@ -2446,11 +2439,11 @@ Slave_worker *Log_event::get_slave_worke
// the last occupied GAQ's array index
gaq_idx= rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
- DBUG_ASSERT(gaq_idx != (ulong) -1 && gaq_idx < rli->gaq->s);
+ DBUG_ASSERT(gaq_idx != MTS_WORKER_UNDEF && gaq_idx < rli->gaq->size);
DBUG_ASSERT(((Slave_job_group *)
dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))->
group_relay_log_name == NULL);
- DBUG_ASSERT(rli->gaq->assigned_group_index != (ulong) -1); // gaq must have room
+ DBUG_ASSERT(rli->gaq->assigned_group_index != MTS_WORKER_UNDEF); // gaq must have room
DBUG_ASSERT(rli->last_assigned_worker == NULL);
if (is_b_event)
@@ -2485,7 +2478,9 @@ Slave_worker *Log_event::get_slave_worke
if (!ret_worker)
ret_worker= *(Slave_worker**) dynamic_array_ptr(&rli->workers, 0);
(void) wait_for_workers_to_finish(rli, ret_worker);
-
+ /*
+ this marking is transferred further into T-event of the current group.
+ */
rli->curr_group_isolated= TRUE;
}
@@ -2494,10 +2489,14 @@ Slave_worker *Log_event::get_slave_worke
char **ref_cur_db= it.ref();
if (!(ret_worker=
- get_slave_worker(*ref_cur_db, rli,
+ map_db_to_worker(*ref_cur_db, rli,
&mts_assigned_partitions[i],
- // only rows-events do not need temporary tables
- get_type_code() != TABLE_MAP_EVENT, ret_worker)))
+ /*
+ todo: optimize it. Although pure
+ rows- event load in insensetive to the flag value
+ */
+ TRUE,
+ ret_worker)))
{
llstr(rli->get_event_relay_log_pos(), llbuff);
rli->report(ERROR_LEVEL, ER_MTS_CANT_PARALLEL,
@@ -2518,7 +2517,7 @@ Slave_worker *Log_event::get_slave_worke
if ((ptr_g= ((Slave_job_group *)
dynamic_array_ptr(&rli->gaq->Q,
rli->gaq->assigned_group_index)))->worker_id
- == (ulong) -1)
+ == MTS_WORKER_UNDEF)
{
ptr_g->worker_id= ret_worker->id;
@@ -2582,20 +2581,14 @@ Slave_worker *Log_event::get_slave_worke
}
}
- // the group terminal event:
- // Commit, Xid, a DDL query or dml query of B-less group.
+ // T-event: Commit, Xid, a DDL query or dml query of B-less group.
if (ends_group() || !rli->curr_group_seen_begin)
{
// index of GAQ that this terminal event belongs to
mts_group_cnt= rli->gaq->assigned_group_index;
- /*
- special marking for T event of a group containing over-max db:s event
- including {p,g} B-less group.
- */
+ rli->mts_group_status= Relay_log_info::MTS_END_GROUP;
if (rli->curr_group_isolated)
mts_do_isolate_group();
- rli->mts_group_status= Relay_log_info::MTS_END_GROUP;
-
ptr_g= (Slave_job_group *)
dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
@@ -2650,7 +2643,7 @@ Slave_worker *Log_event::get_slave_worke
ret_worker->checkpoint_notified= TRUE;
}
ptr_g->checkpoint_seqno= rli->checkpoint_seqno;
- ptr_g->ts= when + (time_t) exec_time;
+ ptr_g->ts= when + (time_t) exec_time; // Seconds_behind_master related
rli->checkpoint_seqno++;
// reclaiming resources allocated during the group scheduling
@@ -2660,179 +2653,6 @@ Slave_worker *Log_event::get_slave_worke
return ret_worker;
}
-// returns the next available! (TODO: incompatible to circurla_buff method!!!)
-static int en_queue(Slave_jobs_queue *jobs, Slave_job_item *item)
-{
- if (jobs->a == jobs->s)
- {
- DBUG_ASSERT(jobs->a == jobs->Q.elements);
- return -1;
- }
-
- // store
-
- set_dynamic(&jobs->Q, (uchar*) item, jobs->a);
-
- // pre-boundary cond
- if (jobs->e == jobs->s)
- jobs->e= jobs->a;
-
- jobs->a= (jobs->a + 1) % jobs->s;
- jobs->len++;
-
- // post-boundary cond
- if (jobs->a == jobs->e)
- jobs->a= jobs->s;
- DBUG_ASSERT(jobs->a == jobs->e ||
- jobs->len == (jobs->a >= jobs->e) ?
- (jobs->a - jobs->e) : (jobs->s + jobs->a - jobs->e));
- return jobs->a;
-}
-
-/**
- return the value of @c data member of the head of the queue.
-*/
-static void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
-{
- if (jobs->e == jobs->s)
- {
- DBUG_ASSERT(jobs->len == 0);
- ret->data= NULL; // todo: move to caller
- return NULL;
- }
- get_dynamic(&jobs->Q, (uchar*) ret, jobs->e);
-
- DBUG_ASSERT(ret->data); // todo: move to caller
-
- return ret;
-}
-
-
-/**
- return a job item through a struct which point is supplied via argument.
-*/
-Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
-{
- if (jobs->e == jobs->s)
- {
- DBUG_ASSERT(jobs->len == 0);
- return NULL;
- }
- get_dynamic(&jobs->Q, (uchar*) ret, jobs->e);
- jobs->len--;
-
- // pre boundary cond
- if (jobs->a == jobs->s)
- jobs->a= jobs->e;
- jobs->e= (jobs->e + 1) % jobs->s;
-
- // post boundary cond
- if (jobs->a == jobs->e)
- jobs->e= jobs->s;
-
- DBUG_ASSERT(jobs->e == jobs->s ||
- (jobs->len == (jobs->a >= jobs->e)? (jobs->a - jobs->e) :
- (jobs->s + jobs->a - jobs->e)));
-
- return ret;
-}
-
-void append_item_to_jobs(slave_job_item *job_item,
- Slave_worker *w, Relay_log_info *rli)
-{
- THD *thd= rli->info_thd;
- int ret;
- ulong ev_size= ((Log_event*) (job_item->data))->data_written;
- ulonglong new_pend_size;
-
- DBUG_ASSERT(thd == current_thd);
- thd_proc_info(thd, "Feeding an event to a worker thread");
-
- mysql_mutex_lock(&rli->pending_jobs_lock);
- new_pend_size= rli->mts_pending_jobs_size + ev_size;
- // C waits basing on *data* sizes in the queues
- while (new_pend_size > rli->mts_pending_jobs_size_max)
- {
- const char *old_msg;
- const char info_format[]=
- "Waiting for Slave Workers to free pending events, requested size %lu";
- char wait_info[sizeof(info_format) + 4*sizeof(new_pend_size)];
-
- sprintf(wait_info, info_format, new_pend_size);
- rli->mts_wqs_oversize= TRUE;
- rli->wait_jobs++; // waiting due to the total size
- old_msg= thd->enter_cond(&rli->pending_jobs_cond, &rli->pending_jobs_lock,
- wait_info);
- mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
- thd->exit_cond(old_msg);
- if (thd->killed)
- return;
-
- mysql_mutex_lock(&rli->pending_jobs_lock);
-
- new_pend_size= rli->mts_pending_jobs_size + ev_size;
- }
- rli->pending_jobs++;
- rli->mts_pending_jobs_size= new_pend_size;
- rli->stmt_jobs++;
-
- mysql_mutex_unlock(&rli->pending_jobs_lock);
-
- // sleep while all queue lengths are gt Underrun
- // sleep time lasts the longer the further WQ:s shift to Overrun
- // Workers report their U,O status
-
- if (rli->mts_wqs_underrun_w_id != (ulong) -1)
- {
- // todo: experiment with weight to get a good approximation formula
- ulong nap_weight= rli->mts_wqs_overrun + 1;
- my_sleep(nap_weight * rli->mts_coordinator_basic_nap);
- rli->mts_wqs_underrun_cnt++;
- }
-
- ret= -1;
-
- mysql_mutex_lock(&w->jobs_lock);
-
- // possible WQ overfill
- while (w->running_status == Slave_worker::RUNNING && !thd->killed &&
- (ret= en_queue(&w->jobs, job_item)) == -1)
- {
- const char *old_msg;
- const char info_format[]=
- "Waiting for Slave Worker %d queue: max len %lu, actual len %lu";
- char wait_info[sizeof(info_format) + 4*sizeof(w->id) +
- 4*sizeof(w->jobs.s) + 4*sizeof(w->jobs.len)];
-
- sprintf(wait_info, info_format, w->id, w->jobs.s, w->jobs.len);
- old_msg= thd->enter_cond(&w->jobs_cond, &w->jobs_lock, wait_info);
- w->jobs.overfill= TRUE;
- w->jobs.waited_overfill++;
- rli->mts_wqs_overfill_cnt++;
- mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
- thd->exit_cond(old_msg);
-
- mysql_mutex_lock(&w->jobs_lock);
- }
- if (ret != -1)
- {
- w->curr_jobs++;
- if (w->jobs.len == 1)
- mysql_cond_signal(&w->jobs_cond);
-
- mysql_mutex_unlock(&w->jobs_lock);
- }
- else
- {
- mysql_mutex_unlock(&w->jobs_lock);
-
- mysql_mutex_lock(&rli->pending_jobs_lock);
- rli->pending_jobs--; // roll back of the prev incr
- rli->mts_pending_jobs_size -= ev_size;
- mysql_mutex_unlock(&rli->pending_jobs_lock);
- }
-}
-
/**
Scheduling event to execute in parallel or execute it directly.
In MTS case the event gets associated with either Coordinator or a
@@ -2942,7 +2762,7 @@ int Log_event::apply_event(Relay_log_inf
c_rli->mts_group_status= Relay_log_info::MTS_IN_GROUP;
worker= (Relay_log_info*)
- (c_rli->last_assigned_worker= get_slave_worker_id(c_rli));
+ (c_rli->last_assigned_worker= get_slave_worker(c_rli));
#ifndef DBUG_OFF
if (c_rli->last_assigned_worker)
@@ -2971,231 +2791,6 @@ err:
0 : -1);
}
-
-/**
- Worker's routine to wait for a new assignement in its
- private queue.
-
- @return NULL failure or
- a-pointer to an item.
-*/
-struct slave_job_item* pop_jobs_item(Slave_worker *w, Slave_job_item *job_item)
-{
- THD *thd= w->info_thd;
-
- mysql_mutex_lock(&w->jobs_lock);
-
- while (!job_item->data && !thd->killed &&
- w->running_status == Slave_worker::RUNNING)
- {
- const char *old_msg;
-
- head_queue(&w->jobs, job_item);
- if (job_item->data == NULL)
- {
- w->wait_jobs++;
- old_msg= thd->enter_cond(&w->jobs_cond, &w->jobs_lock,
- "Waiting for an event from sql thread");
- mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
- thd->exit_cond(old_msg);
- mysql_mutex_lock(&w->jobs_lock);
- }
- }
- if (job_item->data)
- w->curr_jobs--;
-
- mysql_mutex_unlock(&w->jobs_lock);
-
- thd_proc_info(w->info_thd, "Executing event");
- return job_item;
-}
-
-
-/**
- MTS worker main routine.
- The worker thread waits for an event, execute it, fixes statistics counters.
-
- @note the function maintains CGEP and modifies APH, and causes
- modification of GAQ.
-
- @return 0 success
- -1 got killed or an error happened during appying
-*/
-int slave_worker_exec_job(Slave_worker *w, Relay_log_info *rli)
-{
- int error= 0;
- struct slave_job_item item= {NULL}, *job_item= &item;
- THD *thd= w->info_thd;
- Log_event *ev= NULL;
- bool part_event= FALSE;
-
- DBUG_ENTER("slave_worker_exec_job");
-
- job_item= pop_jobs_item(w, job_item);
- if (thd->killed || w->running_status != Slave_worker::RUNNING)
- {
- // de-queueing and decrement counters is in the caller's exit branch
- error= -1;
- goto err;
- }
- ev= static_cast<Log_event*>(job_item->data);
- thd->server_id = ev->server_id;
- thd->set_time();
- thd->lex->current_select= 0;
- if (!ev->when)
- ev->when= my_time(0);
- ev->thd= thd; // todo: assert because up to this point, ev->thd == 0
-
- DBUG_PRINT("slave_worker_exec_job:", ("W_%lu <- job item: %p data: %p thd: %p", w->id, job_item, ev, thd));
-
- if (ev->starts_group())
- {
- w->curr_group_seen_begin= TRUE; // The current group is started with B-event
- }
- else
- {
- if ((part_event= ev->contains_partition_info()))
- {
- uint num_dbs= ev->mts_number_dbs();
- DYNAMIC_ARRAY *ep= &w->curr_group_exec_parts;
-
- if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
- num_dbs= 1;
-
- DBUG_ASSERT(num_dbs > 0);
-
- for (uint k= 0; k < num_dbs; k++)
- {
- bool found= FALSE;
-
- for (uint i= 0; i < ep->elements && !found; i++)
- {
- found=
- *((db_worker_hash_entry **) dynamic_array_ptr(ep, i)) ==
- ev->mts_assigned_partitions[k];
- }
- if (!found)
- {
- /*
- notice, can't assert
- DBUG_ASSERT(ev->mts_assigned_partitions[k]->worker == w);
- since entry could be marked as wanted by other worker.
- */
- insert_dynamic(ep, (uchar*) &ev->mts_assigned_partitions[k]);
- }
- }
- }
- }
- w->set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
- error= ev->do_apply_event_worker(w);
- if (ev->ends_group() || (!w->curr_group_seen_begin &&
- /*
- p-events of B/T-less {p,g} group (see
- legends of Log_event::get_slave_worker)
- obviously can't commit.
- */
- part_event))
- {
- DBUG_PRINT("slave_worker_exec_job:",
- (" commits GAQ index %lu, last committed %lu",
- ev->mts_group_cnt, w->last_group_done_index));
- w->slave_worker_ends_group(ev, error); /* last done sets post exec */
-
-#ifndef DBUG_OFF
- w->processed_group++;
- DBUG_PRINT("mts", ("Check_slave_debug_group worker %lu mts_checkpoint_group "
- "%u processed %u debug %d\n", w->id, mts_checkpoint_group,
- w->processed_group,
- DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0)));
- if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) &&
- mts_checkpoint_group == w->processed_group)
- {
- DBUG_PRINT("mts", ("Putting worker %lu in busy wait.", w->id));
- while (true) my_sleep(6000000);
- }
-#endif
- }
-
- mysql_mutex_lock(&w->jobs_lock);
- de_queue(&w->jobs, job_item);
-
- /* possible overfill */
- if (w->jobs.len == w->jobs.s - 1 && w->jobs.overfill == TRUE)
- {
- w->jobs.overfill= FALSE;
- // todo: w->hungry_cnt++;
- mysql_cond_signal(&w->jobs_cond);
- }
- mysql_mutex_unlock(&w->jobs_lock);
-
- /* statistics */
-
- mysql_mutex_lock(&rli->pending_jobs_lock);
- rli->pending_jobs--;
- rli->mts_pending_jobs_size -= ev->data_written;
- DBUG_ASSERT(rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max);
-
- // underrun
- if ((rli->mts_worker_underrun_level * w->jobs.s) / 100 > w->jobs.len)
- {
- rli-> mts_wqs_underrun_w_id= w->id;
- // todo: w->underrun_cnt++;
- } else if (rli->mts_wqs_underrun_w_id == w->id)
- {
- rli->mts_wqs_underrun_w_id= (ulong) -1;
- }
-
- // overrun exploits the underrun level param
- if (((100 - rli->mts_worker_underrun_level) * w->jobs.s) / 100 < w->jobs.len)
- {
- rli->mts_wqs_overrun++;
- w->wq_overrun_set= TRUE;
- // todo: w->underrun_cnt++;
- }
- else if (w->wq_overrun_set == TRUE)
- {
- rli->mts_wqs_overrun--;
- w->wq_overrun_set= FALSE;
- }
-
- DBUG_ASSERT(rli->mts_wqs_overrun >= 0);
-
- /* coordinator can be waiting */
- if (rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max &&
- rli->mts_wqs_oversize) // TODO: unit/general test wqs_oversize
- {
- rli->mts_wqs_oversize= FALSE;
- mysql_cond_signal(&rli->pending_jobs_cond);
- }
-
- mysql_mutex_unlock(&rli->pending_jobs_lock);
-
- w->stmt_jobs++;
-
-err:
- if (error)
- {
- sql_print_information("Worker %lu is exiting: killed %i, error %i, "
- "running_status %d",
- w->id, thd->killed, thd->is_error(),
- w->running_status);
- w->slave_worker_ends_group(ev, error);
- }
-
- // rows_query_log_event is deleted as a part of the statement cleanup
-
- // todo: sync_slave_with_master fails when my_sleep(1000) is put here
-
- if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
- {
- w->last_event= ev;
- delete ev;
- }
-
-
- DBUG_RETURN(error);
-}
-
#endif
/**************************************************************************
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2011-06-14 09:27:38 +0000
+++ b/sql/log_event.h 2011-06-27 17:31:45 +0000
@@ -536,6 +536,15 @@ struct sql_ex_info
*/
#define LOG_EVENT_NO_FILTER_F 0x100
+/**
+ MTS: group of events can be marked to force its execution
+ in isolation from any other Workers.
+ Typically that is done for a transaction that contains
+ a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS db:s.
+ The flag is set ON for an event that terminates its group.
+*/
+#define LOG_EVENT_MTS_ISOLATE_F 0x200
+
/**
@def OPTIONS_WRITTEN_TO_BIN_LOG
@@ -1330,7 +1339,7 @@ public:
to be assigned worker;
M is the max index of the worker pool.
*/
- Slave_worker *get_slave_worker_id(Relay_log_info *rli);
+ Slave_worker *get_slave_worker(Relay_log_info *rli);
/*
The method returns a list of updated by the event databases.
@@ -1361,12 +1370,14 @@ public:
get_type_code() == QUERY_EVENT ||
get_type_code() == EXEC_LOAD_EVENT ||
get_type_code() == EXECUTE_LOAD_QUERY_EVENT);
- m_mts_event_isolated_group= TRUE;
+ flags |= LOG_EVENT_MTS_ISOLATE_F;
}
+
/*
- Verifying whether event is marked to execute in isolation.
+ Verifying whether the terminal event of a group is marked to
+ execute in isolation.
*/
- virtual bool mts_is_group_isolated() { return m_mts_event_isolated_group; }
+ bool mts_is_group_isolated() { return flags & LOG_EVENT_MTS_ISOLATE_F; }
/**
Apply the event to the database.
@@ -1501,8 +1512,6 @@ protected:
non-zero. The caller shall decrease the counter by one.
*/
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
-
- bool m_mts_event_isolated_group;
#endif
};
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2011-06-22 17:54:23 +0000
+++ b/sql/rpl_rli.cc 2011-06-27 17:31:45 +0000
@@ -112,8 +112,8 @@ void Relay_log_info::init_workers(ulong
Parallel slave parameters initialization is done regardless
whether the feature is or going to be active or not.
*/
- trans_jobs= stmt_jobs= pending_jobs= wait_jobs= 0;
- mts_wqs_underrun_cnt= mts_wqs_overfill_cnt= 0;
+ trans_jobs= stmt_jobs= pending_jobs= wq_size_waits= 0;
+ mts_wq_excess= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0;
my_init_dynamic_array(&workers, sizeof(Slave_worker *), n_workers, 4);
my_atomic_rwlock_init(&slave_open_temp_tables_lock);
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2011-06-24 12:38:19 +0000
+++ b/sql/rpl_rli.h 2011-06-27 17:31:45 +0000
@@ -432,34 +432,42 @@ public:
time_t last_event_start_time;
/*
- WL#5569 MTS-II
+ WL#5569 MTS
+
+ legends:
+ C - Coordinator;
+ W - Worker;
+ WQ - Worker Queue containing event assignments
*/
DYNAMIC_ARRAY workers; // number's is determined by global slave_parallel_workers
volatile ulong pending_jobs;
- ulong trans_jobs, wait_jobs, stmt_jobs; // wait_jobs - waiting times due to the total size
+ ulong stmt_jobs; // statistics: number of events (statements) processed
+ ulong trans_jobs; // statistics: number of groups (transactions) processed
+ ulong wq_size_waits; // number of times C goes to sleep due to WQ:s oversize
mysql_mutex_t pending_jobs_lock;
mysql_cond_t pending_jobs_cond;
ulong mts_slave_worker_queue_len_max;
ulonglong mts_pending_jobs_size; // actual mem usage by WQ:s
ulonglong mts_pending_jobs_size_max; // the max forcing to wait by C
- bool mts_wqs_oversize; // C raises flag to wait some memory's released
+ bool mts_wq_oversize; // C raises flag to wait some memory's released
Slave_worker *last_assigned_worker; // a hint to partitioning func for some events
Slave_committed_queue *gaq;
DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
DYNAMIC_ARRAY curr_group_da; // deferred array to hold partition-info-free events
bool curr_group_seen_begin; // current group started with B-event or not
bool curr_group_isolated; // current group requires execution in isolation
- volatile ulong mts_wqs_underrun_w_id; // Id of a Worker whose queue is getting empty
- volatile long mts_wqs_overrun; // W to incr and decr
- ulong mts_wqs_underrun_cnt; // Coord goes to sleep when senses Workers are content
- ulong mts_wqs_overfill_cnt; // Coord waits if a W's queue is full
- long mts_worker_underrun_level; // percent of WQ size at which Worker claims hungry
+ volatile ulong mts_wq_underrun_w_id; // Id of a Worker whose queue is getting empty
+ volatile long mts_wq_excess; // excessive overrun counter; when W increments and decrements it it also marks updates its own wq_overrun_set
+ volatile ulong mts_wq_overrun_cnt; // statistics of all mts_wq_excess increments
+ ulong mts_wq_no_underrun_cnt; // counts times of C goes to sleep when W:s are filled
+ ulong mts_wq_overfill_cnt; // counts C waits when a W's queue is full
+ long mts_worker_underrun_level; // percent of WQ size at which W is considered hungry
ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun
Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
ulonglong mts_total_groups; // total event groups distributed in current session
ulong opt_slave_parallel_workers; // auxiliary cache for ::opt_slave_parallel_workers
ulong slave_parallel_workers; // the one slave session time number of workers
- ulong recovery_parallel_workers; // number of workers while recovering.
+ ulong recovery_parallel_workers; // number of workers while recovering
/*
A sorted array of Worker current assignements number to provide
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2011-06-25 14:14:24 +0000
+++ b/sql/rpl_rli_pdb.cc 2011-06-27 17:31:45 +0000
@@ -2,6 +2,7 @@
#include "sql_priv.h"
#include "unireg.h"
#include "rpl_rli_pdb.h"
+#include "rpl_slave.h"
#include "sql_string.h"
#include <hash.h>
@@ -252,6 +253,8 @@ bool Slave_worker::commit_positions(Log_
// extract an updated relay-log name to store in Worker's rli.
if (ptr_g->group_relay_log_name)
{
+ DBUG_ASSERT(strlen(ptr_g->group_relay_log_name) + 1
+ <= sizeof(group_relay_log_name));
strmake(group_relay_log_name, ptr_g->group_relay_log_name,
sizeof(group_relay_log_name) - 1);
}
@@ -515,7 +518,7 @@ static void move_temp_tables_to_entry(TH
@return the pointer to a Worker struct
*/
-Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli,
+Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
db_worker_hash_entry **ptr_entry,
bool need_temp_tables, Slave_worker *last_worker)
{
@@ -753,17 +756,16 @@ Slave_worker *get_least_occupied_worker(
}
/**
- Deallocative routine that makes few things in opposite to
- @c get_slave_worker().
-
- Affected by the being committed group APH tuples are updated.
- @c last_group_done_index member is set to the arg value.
-
+ Deallocation routine to cancel out few effects of
+ @c map_db_to_worker().
+ Involved into processing of the group APH tuples are updated.
+ @c last_group_done_index member is set to the GAQ index of
+ the current group.
CGEP the Worker partition cache is cleaned up.
- TODO: reclaim space if the actual size exceeds the limit.
+ @param ev a pointer to Log_event
+ @param error error code after processing the event by caller.
*/
-
void Slave_worker::slave_worker_ends_group(Log_event* ev, int error)
{
DBUG_ENTER("Slave_worker::slave_worker_ends_group");
@@ -775,20 +777,10 @@ void Slave_worker::slave_worker_ends_gro
(Slave_job_group *) dynamic_array_ptr(&c_rli->gaq->Q, gaq_idx);
// first ever group must have relay log name
- DBUG_ASSERT(last_group_done_index != c_rli->gaq->s ||
+ DBUG_ASSERT(last_group_done_index != c_rli->gaq->size ||
ptr_g->group_relay_log_name != NULL);
DBUG_ASSERT(ptr_g->worker_id == id);
- if (ptr_g->group_relay_log_name != NULL)
- {
- // memorizing a new relay-log file name
-
- DBUG_ASSERT(strlen(ptr_g->group_relay_log_name) + 1
- <= sizeof(group_relay_log_name));
-
- strcpy(group_relay_log_name, ptr_g->group_relay_log_name);
- }
-
if (!(ev->get_type_code() == XID_EVENT && is_transactional()))
{
commit_positions(ev, ptr_g);
@@ -799,6 +791,7 @@ void Slave_worker::slave_worker_ends_gro
ptr_g->group_master_log_pos= group_master_log_pos;
ptr_g->group_relay_log_pos= group_relay_log_pos;
+
ptr_g->done= 1; // GAQ index is available to C now
last_group_done_index= gaq_idx;
@@ -887,29 +880,29 @@ void Slave_worker::slave_worker_ends_gro
ulong circular_buffer_queue::de_queue(uchar *val)
{
ulong ret;
- if (e == s)
+ if (enter == size)
{
DBUG_ASSERT(len == 0);
return (ulong) -1;
}
- ret= e;
- get_dynamic(&Q, val, e);
+ ret= enter;
+ get_dynamic(&Q, val, enter);
len--;
// pre boundary cond
- if (a == s)
- a= e;
- e= (e + 1) % s;
+ if (avail == size)
+ avail= enter;
+ enter= (enter + 1) % size;
// post boundary cond
- if (a == e)
- e= s;
+ if (avail == enter)
+ enter= size;
- DBUG_ASSERT(e == s ||
- (len == (a >= e)? (a - e) :
- (s + a - e)));
- DBUG_ASSERT(a != e);
+ DBUG_ASSERT(enter == size ||
+ (len == (avail >= enter)? (avail - enter) :
+ (size + avail - enter)));
+ DBUG_ASSERT(avail != enter);
return ret;
}
@@ -919,26 +912,26 @@ ulong circular_buffer_queue::de_queue(uc
*/
ulong circular_buffer_queue::de_tail(uchar *val)
{
- if (e == s)
+ if (enter == size)
{
DBUG_ASSERT(len == 0);
return (ulong) -1;
}
- a= (e + len - 1) % s;
- get_dynamic(&Q, val, a);
+ avail= (enter + len - 1) % size;
+ get_dynamic(&Q, val, avail);
len--;
// post boundary cond
- if (a == e)
- e= s;
+ if (avail == enter)
+ enter= size;
- DBUG_ASSERT(e == s ||
- (len == (a >= e)? (a - e) :
- (s + a - e)));
- DBUG_ASSERT(a != e);
+ DBUG_ASSERT(enter == size ||
+ (len == (avail >= enter)? (avail - enter) :
+ (size + avail - enter)));
+ DBUG_ASSERT(avail != enter);
- return a;
+ return avail;
}
/**
@return the used index at success or -1 when queue is full
@@ -946,33 +939,33 @@ ulong circular_buffer_queue::de_tail(uch
ulong circular_buffer_queue::en_queue(void *item)
{
ulong ret;
- if (a == s)
+ if (avail == size)
{
- DBUG_ASSERT(a == Q.elements);
+ DBUG_ASSERT(avail == Q.elements);
return (ulong) -1;
}
// store
- ret= a;
- set_dynamic(&Q, (uchar*) item, a);
+ ret= avail;
+ set_dynamic(&Q, (uchar*) item, avail);
// pre-boundary cond
- if (e == s)
- e= a;
+ if (enter == size)
+ enter= avail;
- a= (a + 1) % s;
+ avail= (avail + 1) % size;
len++;
// post-boundary cond
- if (a == e)
- a= s;
+ if (avail == enter)
+ avail= size;
- DBUG_ASSERT(a == e ||
- len == (a >= e) ?
- (a - e) : (s + a - e));
- DBUG_ASSERT(a != e);
+ DBUG_ASSERT(avail == enter ||
+ len == (avail >= enter) ?
+ (avail - enter) : (size + avail - enter));
+ DBUG_ASSERT(avail != enter);
return ret;
}
@@ -980,13 +973,13 @@ ulong circular_buffer_queue::en_queue(vo
void* circular_buffer_queue::head_queue()
{
uchar *ret= NULL;
- if (e == s)
+ if (enter == size)
{
DBUG_ASSERT(len == 0);
}
else
{
- get_dynamic(&Q, (uchar*) ret, e);
+ get_dynamic(&Q, (uchar*) ret, enter);
}
return (void*) ret;
}
@@ -1004,16 +997,16 @@ void* circular_buffer_queue::head_queue(
*/
bool circular_buffer_queue::gt(ulong i, ulong k)
{
- DBUG_ASSERT(i < s && k < s);
- DBUG_ASSERT(a != e);
+ DBUG_ASSERT(i < size && k < size);
+ DBUG_ASSERT(avail != enter);
- if (i >= e)
- if (k >= e)
+ if (i >= enter)
+ if (k >= enter)
return i > k;
else
return FALSE;
else
- if (k >= e)
+ if (k >= enter)
return TRUE;
else
return i > k;
@@ -1024,7 +1017,7 @@ bool Slave_committed_queue::count_done(R
{
ulong i, cnt= 0;
- for (i= e; i != a && !empty(); i= (i + 1) % s)
+ for (i= enter; i != avail && !empty(); i= (i + 1) % size)
{
Slave_job_group *ptr_g;
@@ -1068,7 +1061,7 @@ ulong Slave_committed_queue::move_queue_
{
ulong i, cnt= 0;
- for (i= e; i != a && !empty();)
+ for (i= enter; i != avail && !empty();)
{
Slave_worker *w_i;
Slave_job_group *ptr_g, g;
@@ -1087,8 +1080,12 @@ ulong Slave_committed_queue::move_queue_
The current job has not been processed or it was not
even assigned, this means there is a gap.
*/
- if (ptr_g->worker_id == (ulong) -1 || !ptr_g->done)
+ if (ptr_g->worker_id == MTS_WORKER_UNDEF || !ptr_g->done)
break; /* gap at i'th */
+
+ /* Worker-id domain guard */
+ compile_time_assert(MTS_WORKER_UNDEF > MTS_MAX_WORKERS);
+
get_dynamic(ws, (uchar *) &w_i, ptr_g->worker_id);
/*
@@ -1142,7 +1139,7 @@ ulong Slave_committed_queue::move_queue_
set_dynamic(&last_done, &ptr_g->total_seqno, w_i->id);
cnt++;
- i= (i + 1) % s;
+ i= (i + 1) % size;
}
return cnt;
@@ -1156,7 +1153,7 @@ ulong Slave_committed_queue::move_queue_
void Slave_committed_queue::free_dynamic_items()
{
ulong i;
- for (i= e; i != a && !empty(); i= (i + 1) % s)
+ for (i= enter; i != avail && !empty(); i= (i + 1) % size)
{
Slave_job_group *ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);
if (ptr_g->group_relay_log_name)
@@ -1274,3 +1271,421 @@ int wait_for_workers_to_finish(Relay_log
DBUG_RETURN(ret);
}
+
+
+// returns the next available index (TODO: incompatible with the circular_buffer_queue::en_queue() method, which returns the used index)
+static int en_queue(Slave_jobs_queue *jobs, Slave_job_item *item)
+{
+ if (jobs->avail == jobs->size)
+ {
+ DBUG_ASSERT(jobs->avail == jobs->Q.elements);
+ return -1;
+ }
+
+ // store
+
+ set_dynamic(&jobs->Q, (uchar*) item, jobs->avail);
+
+ // pre-boundary cond
+ if (jobs->enter == jobs->size)
+ jobs->enter= jobs->avail;
+
+ jobs->avail= (jobs->avail + 1) % jobs->size;
+ jobs->len++;
+
+ // post-boundary cond
+ if (jobs->avail == jobs->enter)
+ jobs->avail= jobs->size;
+ DBUG_ASSERT(jobs->avail == jobs->enter ||
+ jobs->len == (jobs->avail >= jobs->enter) ?
+ (jobs->avail - jobs->enter) : (jobs->size + jobs->avail - jobs->enter));
+ return jobs->avail;
+}
+
+/**
+ Copy the head item of the queue into @c ret without dequeuing it; return @c ret, or NULL (setting ret->data to NULL) when the queue is empty.
+*/
+static void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
+{
+ if (jobs->enter == jobs->size)
+ {
+ DBUG_ASSERT(jobs->len == 0);
+ ret->data= NULL; // todo: move to caller
+ return NULL;
+ }
+ get_dynamic(&jobs->Q, (uchar*) ret, jobs->enter);
+
+ DBUG_ASSERT(ret->data); // todo: move to caller
+
+ return ret;
+}
+
+
+/**
+ Remove the head job item from the queue and return it through the struct whose pointer is supplied via the argument; return NULL if the queue is empty.
+*/
+Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
+{
+ if (jobs->enter == jobs->size)
+ {
+ DBUG_ASSERT(jobs->len == 0);
+ return NULL;
+ }
+ get_dynamic(&jobs->Q, (uchar*) ret, jobs->enter);
+ jobs->len--;
+
+ // pre boundary cond
+ if (jobs->avail == jobs->size)
+ jobs->avail= jobs->enter;
+ jobs->enter= (jobs->enter + 1) % jobs->size;
+
+ // post boundary cond
+ if (jobs->avail == jobs->enter)
+ jobs->enter= jobs->size;
+
+ DBUG_ASSERT(jobs->enter == jobs->size ||
+ (jobs->len == (jobs->avail >= jobs->enter)? (jobs->avail - jobs->enter) :
+ (jobs->size + jobs->avail - jobs->enter)));
+
+ return ret;
+}
+
+/**
+ Coordinator enqueues a job item into a Worker private queue.
+
+ @param job_item a pointer to struct carrying a reference to an event
+ @param worker a pointer to the assigned Worker struct
+ @param rli a pointer to Relay_log_info of Coordinator
+*/
+void append_item_to_jobs(slave_job_item *job_item,
+ Slave_worker *worker, Relay_log_info *rli)
+{
+ THD *thd= rli->info_thd;
+ int ret;
+ ulong ev_size= ((Log_event*) (job_item->data))->data_written;
+ ulonglong new_pend_size;
+
+ DBUG_ASSERT(thd == current_thd);
+ thd_proc_info(thd, "Feeding an event to a worker thread");
+
+ mysql_mutex_lock(&rli->pending_jobs_lock);
+ new_pend_size= rli->mts_pending_jobs_size + ev_size;
+ // C waits basing on *data* sizes in the queues
+ while (new_pend_size > rli->mts_pending_jobs_size_max)
+ {
+ const char *old_msg;
+ const char info_format[]=
+ "Waiting for Slave Workers to free pending events, requested size %lu";
+ char wait_info[sizeof(info_format) + 4*sizeof(new_pend_size)];
+
+ sprintf(wait_info, info_format, new_pend_size);
+ rli->mts_wq_oversize= TRUE;
+ rli->wq_size_waits++; // waiting due to the total size
+ old_msg= thd->enter_cond(&rli->pending_jobs_cond, &rli->pending_jobs_lock,
+ wait_info);
+ mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
+ thd->exit_cond(old_msg);
+ if (thd->killed)
+ return;
+
+ mysql_mutex_lock(&rli->pending_jobs_lock);
+
+ new_pend_size= rli->mts_pending_jobs_size + ev_size;
+ }
+ rli->pending_jobs++;
+ rli->mts_pending_jobs_size= new_pend_size;
+ rli->stmt_jobs++;
+
+ mysql_mutex_unlock(&rli->pending_jobs_lock);
+
+ /*
+ Sleep unless there is an underrunning Worker.
+ */
+ if (rli->mts_wq_underrun_w_id == MTS_WORKER_UNDEF)
+ {
+ // todo: experiment with weight to get a good approximation formula
+ // The longer Sleep lasts the bigger is excessive overrun counter.
+ ulong nap_weight= rli->mts_wq_excess + 1;
+ my_sleep(nap_weight * rli->mts_coordinator_basic_nap);
+ rli->mts_wq_no_underrun_cnt++;
+ }
+
+ ret= -1;
+
+ mysql_mutex_lock(&worker->jobs_lock);
+
+ // possible WQ overfill
+ while (worker->running_status == Slave_worker::RUNNING && !thd->killed &&
+ (ret= en_queue(&worker->jobs, job_item)) == -1)
+ {
+ const char *old_msg;
+ const char info_format[]=
+ "Waiting for Slave Worker %d queue: max len %lu, actual len %lu";
+ char wait_info[sizeof(info_format) + 4*sizeof(worker->id) +
+ 4*sizeof(worker->jobs.size) + 4*sizeof(worker->jobs.len)];
+
+ sprintf(wait_info, info_format, worker->id, worker->jobs.size, worker->jobs.len);
+ old_msg= thd->enter_cond(&worker->jobs_cond, &worker->jobs_lock, wait_info);
+ worker->jobs.overfill= TRUE;
+ worker->jobs.waited_overfill++;
+ rli->mts_wq_overfill_cnt++;
+ mysql_cond_wait(&worker->jobs_cond, &worker->jobs_lock);
+ thd->exit_cond(old_msg);
+
+ mysql_mutex_lock(&worker->jobs_lock);
+ }
+ if (ret != -1)
+ {
+ worker->curr_jobs++;
+ if (worker->jobs.len == 1)
+ mysql_cond_signal(&worker->jobs_cond);
+
+ mysql_mutex_unlock(&worker->jobs_lock);
+ }
+ else
+ {
+ mysql_mutex_unlock(&worker->jobs_lock);
+
+ mysql_mutex_lock(&rli->pending_jobs_lock);
+ rli->pending_jobs--; // roll back of the prev incr
+ rli->mts_pending_jobs_size -= ev_size;
+ mysql_mutex_unlock(&rli->pending_jobs_lock);
+ }
+}
+
+
+/**
+ Worker's routine to wait for a new assignment through
+ @c append_item_to_jobs()
+
+ @param worker a pointer to the waiting Worker struct
+ @param job_item a pointer to struct carrying a reference to an event
+
+ @return the @c job_item pointer that was passed in; its @c data member
+ is NULL when no item was obtained.
+*/
+struct slave_job_item* pop_jobs_item(Slave_worker *worker, Slave_job_item *job_item)
+{
+ THD *thd= worker->info_thd;
+
+ mysql_mutex_lock(&worker->jobs_lock);
+
+ while (!job_item->data && !thd->killed &&
+ worker->running_status == Slave_worker::RUNNING)
+ {
+ const char *old_msg;
+
+ head_queue(&worker->jobs, job_item);
+ if (job_item->data == NULL)
+ {
+ worker->wq_empty_waits++;
+ old_msg= thd->enter_cond(&worker->jobs_cond, &worker->jobs_lock,
+ "Waiting for an event from sql thread");
+ mysql_cond_wait(&worker->jobs_cond, &worker->jobs_lock);
+ thd->exit_cond(old_msg);
+ mysql_mutex_lock(&worker->jobs_lock);
+ }
+ }
+ if (job_item->data)
+ worker->curr_jobs--;
+
+ mysql_mutex_unlock(&worker->jobs_lock);
+
+ thd_proc_info(worker->info_thd, "Executing event");
+ return job_item;
+}
+
+
+/**
+ MTS worker main routine.
+ The worker thread loops in waiting for an event, executing it and
+ fixing statistics counters.
+
+ @param worker a pointer to the assigned Worker struct
+ @param rli a pointer to Relay_log_info of Coordinator
+ to update statistics.
+
+ @note the function maintains worker's CGEP and modifies APH, updates
+ the current group item in GAQ via @c slave_worker_ends_group().
+
+ @return 0 success
+ -1 got killed or an error happened during applying
+*/
+int slave_worker_exec_job(Slave_worker *worker, Relay_log_info *rli)
+{
+ int error= 0;
+ struct slave_job_item item= {NULL}, *job_item= &item;
+ THD *thd= worker->info_thd;
+ Log_event *ev= NULL;
+ bool part_event= FALSE;
+
+ DBUG_ENTER("slave_worker_exec_job");
+
+ job_item= pop_jobs_item(worker, job_item);
+ if (thd->killed || worker->running_status != Slave_worker::RUNNING)
+ {
+ // de-queueing and decrement counters is in the caller's exit branch
+ error= -1;
+ goto err;
+ }
+ ev= static_cast<Log_event*>(job_item->data);
+ thd->server_id = ev->server_id;
+ thd->set_time();
+ thd->lex->current_select= 0;
+ if (!ev->when)
+ ev->when= my_time(0);
+ ev->thd= thd; // todo: assert because up to this point, ev->thd == 0
+
+ DBUG_PRINT("slave_worker_exec_job:", ("W_%lu <- job item: %p data: %p thd: %p", worker->id, job_item, ev, thd));
+
+ if (ev->starts_group())
+ {
+ worker->curr_group_seen_begin= TRUE; // The current group is started with B-event
+ }
+ else
+ {
+ if ((part_event= ev->contains_partition_info()))
+ {
+ uint num_dbs= ev->mts_number_dbs();
+ DYNAMIC_ARRAY *ep= &worker->curr_group_exec_parts;
+
+ if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
+ num_dbs= 1;
+
+ DBUG_ASSERT(num_dbs > 0);
+
+ for (uint k= 0; k < num_dbs; k++)
+ {
+ bool found= FALSE;
+
+ for (uint i= 0; i < ep->elements && !found; i++)
+ {
+ found=
+ *((db_worker_hash_entry **) dynamic_array_ptr(ep, i)) ==
+ ev->mts_assigned_partitions[k];
+ }
+ if (!found)
+ {
+ /*
+ notice, can't assert
+ DBUG_ASSERT(ev->mts_assigned_partitions[k]->worker == worker);
+ since entry could be marked as wanted by other worker.
+ */
+ insert_dynamic(ep, (uchar*) &ev->mts_assigned_partitions[k]);
+ }
+ }
+ }
+ }
+ worker->set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
+ error= ev->do_apply_event_worker(worker);
+ if (ev->ends_group() || (!worker->curr_group_seen_begin &&
+ /*
+ p-events of B/T-less {p,g} group (see
+ legends of Log_event::get_slave_worker)
+ obviously can't commit.
+ */
+ part_event))
+ {
+ DBUG_PRINT("slave_worker_exec_job:",
+ (" commits GAQ index %lu, last committed %lu",
+ ev->mts_group_cnt, worker->last_group_done_index));
+ worker->slave_worker_ends_group(ev, error); /* last done sets post exec */
+
+#ifndef DBUG_OFF
+ worker->processed_group++;
+ DBUG_PRINT("mts", ("Check_slave_debug_group worker %lu mts_checkpoint_group"
+ " %u processed %u debug %d\n", worker->id, mts_checkpoint_group,
+ worker->processed_group,
+ DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0)));
+ if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) &&
+ mts_checkpoint_group == worker->processed_group)
+ {
+ DBUG_PRINT("mts", ("Putting worker %lu in busy wait.", worker->id));
+ while (true) my_sleep(6000000);
+ }
+#endif
+ }
+
+ mysql_mutex_lock(&worker->jobs_lock);
+ de_queue(&worker->jobs, job_item);
+
+ /* possible overfill */
+ if (worker->jobs.len == worker->jobs.size - 1 && worker->jobs.overfill == TRUE)
+ {
+ worker->jobs.overfill= FALSE;
+ // todo: worker->hungry_cnt++;
+ mysql_cond_signal(&worker->jobs_cond);
+ }
+ mysql_mutex_unlock(&worker->jobs_lock);
+
+ /* statistics */
+
+ /* todo: convert to rwlock/atomic write */
+ mysql_mutex_lock(&rli->pending_jobs_lock);
+
+ rli->pending_jobs--;
+ rli->mts_pending_jobs_size -= ev->data_written;
+ DBUG_ASSERT(rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max);
+
+ // underrun (number of pending assignments is less than underrun level)
+ if ((rli->mts_worker_underrun_level * worker->jobs.size) / 100.0 >
+ worker->jobs.len)
+ {
+ rli->mts_wq_underrun_w_id= worker->id;
+ } else if (rli->mts_wq_underrun_w_id == worker->id)
+ {
+ // reset only own marking
+ rli->mts_wq_underrun_w_id= MTS_WORKER_UNDEF;
+ }
+
+ // overrun is symmetric to underrun. In a sense it's underrun to get to 100%
+ if (((100 - rli->mts_worker_underrun_level) * worker->jobs.size) / 100.0
+ < worker->jobs.len)
+ {
+ rli->mts_wq_excess++;
+ worker->wq_overrun_set= TRUE;
+ rli->mts_wq_overrun_cnt++;
+ }
+ else if (worker->wq_overrun_set == TRUE)
+ {
+ rli->mts_wq_excess--;
+ worker->wq_overrun_set= FALSE;
+ }
+
+ DBUG_ASSERT(rli->mts_wq_excess >= 0);
+
+ /* coordinator can be waiting */
+ if (rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max &&
+ rli->mts_wq_oversize) // TODO: unit/general test wq_oversize
+ {
+ rli->mts_wq_oversize= FALSE;
+ mysql_cond_signal(&rli->pending_jobs_cond);
+ }
+
+ mysql_mutex_unlock(&rli->pending_jobs_lock);
+
+ worker->stmt_jobs++;
+
+err:
+ if (error)
+ {
+ sql_print_information("Worker %lu is exiting: killed %i, error %i, "
+ "running_status %d",
+ worker->id, thd->killed, thd->is_error(),
+ worker->running_status);
+ worker->slave_worker_ends_group(ev, error);
+ }
+
+ // rows_query_log_event is deleted as a part of the statement cleanup
+
+ // todo: sync_slave_with_master fails when my_sleep(1000) is put here
+
+ if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
+ {
+ worker->last_event= ev;
+ delete ev;
+ }
+
+
+ DBUG_RETURN(error);
+}
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2011-06-25 14:14:24 +0000
+++ b/sql/rpl_rli_pdb.h 2011-06-27 17:31:45 +0000
@@ -32,7 +32,7 @@ typedef struct st_db_worker_hash_entry
bool init_hash_workers(ulong slave_parallel_workers);
void destroy_hash_workers(Relay_log_info*);
-Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli,
+Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
db_worker_hash_entry **ptr_entry,
bool need_temp_tables, Slave_worker *w);
Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
@@ -62,17 +62,17 @@ class circular_buffer_queue
public:
DYNAMIC_ARRAY Q;
- ulong s; // the Size of the queue in terms of element
- ulong a; // first Available index to append at (next to tail)
- ulong e; // the head index
- volatile ulong len; // it is also queried to compute least occupied
+ ulong size; // the Size of the queue in terms of element
+ ulong avail; // first Available index to append at (next to tail)
+ ulong enter; // the head index
+ volatile ulong len; // actual length
bool inited_queue;
circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
- s(max), a(0), e(max), len(0), inited_queue(FALSE)
+ size(max), avail(0), enter(max), len(0), inited_queue(FALSE)
{
- DBUG_ASSERT(s < ULONG_MAX);
- if (!my_init_dynamic_array(&Q, el_size, s, alloc_inc))
+ DBUG_ASSERT(size < ULONG_MAX);
+ if (!my_init_dynamic_array(&Q, el_size, size, alloc_inc))
inited_queue= TRUE;
}
circular_buffer_queue () : inited_queue(FALSE) {}
@@ -111,9 +111,9 @@ public:
bool gt(ulong i, ulong k); // comparision of ordering of two entities
/* index is within the valid range */
bool in(ulong k) { return !empty() &&
- (e > a ? (k >= e || k < a) : (k >= e && k < a)); }
- bool empty() { return e == s; }
- bool full() { return a == s; }
+ (enter > avail ? (k >= enter || k < avail) : (k >= enter && k < avail)); }
+ bool empty() { return enter == size; }
+ bool full() { return avail == size; }
};
typedef struct st_slave_job_group
@@ -237,26 +237,23 @@ public:
DYNAMIC_ARRAY curr_group_exec_parts; // CGEP
bool curr_group_seen_begin; // is set to TRUE with B-event at Worker exec
- // @c last_group_done_index is for recovery, although can be viewed
- // as statistics as well.
- // C marks a T-event with the incremented group_cnt that is
- // an index in GAQ; W stores it at the event execution.
- // C polls the value periodically to maintain an array
- // of the indexes in order to progress on GAQ's lwm, see @c next_event().
- // see @c Log_event::group_cnt.
- volatile ulong last_group_done_index; // it's index in GAQ
-
List<Log_event> data_in_use; // events are still in use by SQL thread
ulong id;
TABLE *current_table;
// rbr
- RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */
- uint tables_to_lock_count; /* RBR: Count of tables to lock */
- table_mapping m_table_map; /* RBR: Mapping table-id to table */
+ // RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */
+ // uint tables_to_lock_count; /* RBR: Count of tables to lock */
+ // table_mapping m_table_map; /* RBR: Mapping table-id to table */
// statictics
- ulong wait_jobs; // to gather statistics how many times got idle
+
+ /*
+ @c last_group_done_index is for statistics
+ to mean the index in GAQ of the last processed group.
+ */
+ volatile ulong last_group_done_index; // it's index in GAQ
+ ulong wq_empty_waits; // to gather statistics how many times got idle
ulong stmt_jobs; // how many jobs per stmt
ulong trans_jobs; // how many jobs per trns
volatile int curr_jobs; // the current assignments
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2011-06-27 12:12:52 +0000
+++ b/sql/rpl_slave.cc 2011-06-27 17:31:45 +0000
@@ -3830,7 +3830,7 @@ pthread_handler_t handle_slave_worker(vo
"events processed = %lu "
"hungry waits = %lu "
"priv queue overfills = %llu "
- ,w->id, w->stmt_jobs, w->wait_jobs, w->jobs.waited_overfill);
+ ,w->id, w->stmt_jobs, w->wq_size_waits, w->jobs.waited_overfill);
mysql_cond_signal(&w->jobs_cond); // famous last goodbye
mysql_mutex_unlock(&w->jobs_lock);
@@ -4249,22 +4249,22 @@ int slave_start_single_worker(Relay_log_
w->bitmap_shifted= 0;
w->workers= rli->workers; // shallow copying is sufficient
w->this_worker= w;
- w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
+ w->wq_size_waits= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
w->id= i;
w->current_table= NULL;
w->usage_partition= 0;
- w->last_group_done_index= rli->gaq->s; // out of range
+ w->last_group_done_index= rli->gaq->size; // out of range
- w->jobs.a= 0;
+ w->jobs.avail= 0;
w->jobs.len= 0;
w->jobs.overfill= FALSE; // todo: move into Slave_jobs_queue constructor
w->jobs.waited_overfill= 0;
- w->jobs.e= w->jobs.s= rli->mts_slave_worker_queue_len_max;
- my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0);
- for (k= 0; k < w->jobs.s; k++)
+ w->jobs.enter= w->jobs.size= rli->mts_slave_worker_queue_len_max;
+ my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.size, 0);
+ for (k= 0; k < w->jobs.size; k++)
insert_dynamic(&w->jobs.Q, (uchar*) &empty);
- DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
+ DBUG_ASSERT(w->jobs.Q.elements == w->jobs.size);
w->wq_overrun_set= FALSE;
set_dynamic(&rli->workers, (uchar*) &w, i);
@@ -4338,9 +4338,10 @@ int slave_start_workers(Relay_log_info *
rli->mts_pending_jobs_size= 0;
rli->mts_pending_jobs_size_max= ::opt_mts_pending_jobs_size_max;
- rli->mts_wqs_underrun_w_id= (ulong) -1;
- rli->mts_wqs_overrun= 0;
- rli->mts_wqs_oversize= FALSE;
+ rli->mts_wq_underrun_w_id= MTS_WORKER_UNDEF;
+ rli->mts_wq_excess= 0;
+ rli->mts_wq_overrun_cnt= 0;
+ rli->mts_wq_oversize= FALSE;
rli->mts_coordinator_basic_nap= mts_coordinator_basic_nap;
rli->mts_worker_underrun_level= mts_worker_underrun_level;
rli->mts_total_groups= 0;
@@ -4470,7 +4471,7 @@ void slave_stop_workers(Relay_log_info *
w->end_info();
- DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
+ DBUG_ASSERT(w->jobs.Q.elements == w->jobs.size);
delete_dynamic(&w->jobs.Q);
delete_dynamic_element(&rli->workers, i);
delete w;
@@ -4478,10 +4479,13 @@ void slave_stop_workers(Relay_log_info *
sql_print_information("MTS coordinator statistics: "
"events processed = %lu "
- "waits due a Worker queue full = %lu "
- "waits due the total size = %lu "
- "sleeps when Workers occupied = %lu "
- ,rli->stmt_jobs, rli->mts_wqs_overfill_cnt, rli->wait_jobs, rli->mts_wqs_underrun_cnt);
+ "Worker queues filled over overrun level = %lu "
+ "waited due a Worker queue full = %lu "
+ "waited due the total size = %lu "
+ "sleept when Workers occupied = %lu ",
+ rli->stmt_jobs, rli->mts_wq_overrun_cnt,
+ rli->mts_wq_overfill_cnt, rli->wq_size_waits,
+ rli->mts_wq_no_underrun_cnt);
DBUG_ASSERT(rli->pending_jobs == 0);
DBUG_ASSERT(rli->mts_pending_jobs_size == 0);
=== modified file 'sql/rpl_slave.h'
--- a/sql/rpl_slave.h 2011-06-09 15:27:47 +0000
+++ b/sql/rpl_slave.h 2011-06-27 17:31:45 +0000
@@ -55,6 +55,9 @@ typedef enum { SLAVE_THD_IO, SLAVE_THD_S
#define MAX_SLAVE_ERROR 2000
+#define MTS_WORKER_UNDEF ((ulong) -1)
+#define MTS_MAX_WORKERS 1024
+
// Forward declarations
class Relay_log_info;
class Master_info;
=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc 2011-06-25 14:14:24 +0000
+++ b/sql/sys_vars.cc 2011-06-27 17:31:45 +0000
@@ -3361,7 +3361,7 @@ static Sys_var_ulong Sys_slave_parallel_
"slave_parallel_workers",
"Number of worker threads for executing events in parallel ",
GLOBAL_VAR(opt_mts_slave_parallel_workers), CMD_LINE(REQUIRED_ARG),
- VALID_RANGE(0, 1024), DEFAULT(0), BLOCK_SIZE(1));
+ VALID_RANGE(0, MTS_MAX_WORKERS), DEFAULT(0), BLOCK_SIZE(1));
static Sys_var_ulonglong Sys_mts_pending_jobs_size_max(
"slave_pending_jobs_size_max",
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110627173145-dynwl2o5rr8iynhe.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3325) WL#5569 | Andrei Elkin | 28 Jun |