From: Andrei Elkin
Date: June 27 2011 5:31pm
Subject: bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3325) WL#5569
List-Archive: http://lists.mysql.com/commits/139931
Message-Id: <201106271731.p5RHVp5B029431@mysql1000.dsl.inet.fi>

#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:alfranio.correia@stripped

3325 Andrei Elkin 2011-06-27
wl#5569 MTS

Cleanup, and a fix for the sporadic rpl_temp_table_mix_row failure in the post-execution mtr.check_testcase().
The check failure was caused by a faulty optimization that avoided migrating temporary tables from the Coordinator to Workers in the case of rows-event assignment. While that is correct for a homogeneous rows-event-only load, a mixed load can fail.
Fixed by removing the optimization, so map_db_to_worker() always relocates, which is somewhat suboptimal and should be improved in the future.
@ mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test
Adding slave synchronization.
@ sql/log_event.cc
Cleanup: moved circular_buffer related definitions into rpl_rli_pdb, which specializes in objects dealing with the Worker, its assignment etc.; improved comments. Also, the former separate flag indicating that a T-event requires post-scheduling synchronization with the Worker is turned into a bit of the existing Log_event::flags, which also avoids the ugliness of #if/#endif:s.
@ sql/log_event.h
The former separate flag indicating that a T-event requires post-scheduling synchronization with the Worker is turned into a bit of the existing Log_event::flags.
@ sql/rpl_rli.cc
cleanup: renaming.
@ sql/rpl_rli.h
cleanup: renaming, more comments.
The former mts_wqs_overrun is split into two: the statistics parameter mts_wq_overrun_cnt and the internal control parameter mts_wq_excess.
@ sql/rpl_rli_pdb.cc
Included rpl_slave.h, which holds two necessary declarations. Cleanup: accepting circular_buffer related definitions migrated from log_event, improved comments, renaming, removing dead code.
@ sql/rpl_rli_pdb.h
Cleanup: renaming, more comments added.
@ sql/rpl_slave.cc
Augmenting the print-out of statistics at the end of an MTS session; cleanup: renaming.
@ sql/rpl_slave.h
Introducing two constants to define the range of the worker_id domain and a magic value for an undefined worker.
@ sql/sys_vars.cc
Replacing a literal int value with a symbolic constant.

modified:
mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test
sql/log_event.cc
sql/log_event.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
sql/rpl_slave.h
sql/sys_vars.cc

=== modified file 'mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test' --- a/mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test 2010-12-19 17:15:12 +0000 +++ b/mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test 2011-06-27 17:31:45 +0000 @@ -207,4 +207,7 @@ source include/show_binlog_events.inc; --echo connection master; DROP TABLE t1; + +sync_slave_with_master; + --source include/rpl_end.inc === modified file 'sql/log_event.cc' --- a/sql/log_event.cc 2011-06-27 12:12:52 +0000 +++ b/sql/log_event.cc 2011-06-27 17:31:45 +0000 @@ -672,9 +672,6 @@ Log_event::Log_event(enum_event_cache_ty :temp_buf(0), exec_time(0), flags(0), event_cache_type(cache_type_arg), event_logging_type(logging_type_arg), crc(0), thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) -#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - , m_mts_event_isolated_group(FALSE) -#endif { server_id= ::server_id; /* @@ -697,9 +694,6 @@ Log_event::Log_event(const char* buf, event_cache_type(EVENT_INVALID_CACHE), event_logging_type(EVENT_INVALID_LOGGING), crc(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
-#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - , m_mts_event_isolated_group(FALSE) -#endif { #ifndef MYSQL_CLIENT thd = 0; @@ -2368,12 +2362,8 @@ bool Log_event::contains_partition_info( } /** - General hashing function to compute the id of an applier for - the current event. - At computing the id few rules apply depending on partitioning properties - that the event instance can feature. - - Let's call the properties through the following legends: + The method maps the event to a Worker and return a pointer to it. + As a part of the group, an event belongs to one of the following types: B - beginning of a group of events (BEGIN query_log_event) g - mini-group representative event containing the partition info @@ -2382,30 +2372,33 @@ bool Log_event::contains_partition_info( (int_, rand_, user_ var:s) r - a mini-group internal "regular" event that follows its g-parent (Delete, Update, Write -rows) - S - sequentially applied event (may not be a part of any group). - Events of this type are determined via @c mts_sequential_exec() - earlier and don't cause calling this method . - T - terminator of the group (XID, COMMIT, ROLLBACK) - - Only `g' case really computes the assigned Worker id which must - be memorized by the caller and is available through @c rli argument. - For instance DUW-rows events are mapped to a Worker previously chosen - at assigning of their Table-map parent g-event. - In `B' case the assigned Worker is NULL to indicate the Coordinator will - postpone scheduling until a following `g' event decides on a Worker. + T - terminator of the group (XID, COMMIT, ROLLBACK, auto-commit query) + + Only the first g-type event computes the assigned Worker which once + is determined remains to be for the rest of the group. + That is the g-type event solely carries partitioning info. + For B-type the assigned Worker is NULL to indicate Coordinator + has not yet decided. The same applies to p-type. 
- A group can consist of multiple events still without explict B - event. This is a case of old master binlog or few corner-cases of - the current master version (todo: to fix). Such group structure is - supposed to be {{p_i},g} that is it ends with the first not p-event. - Such g-event is marked with set_mts_event_ends_group(). + Notice, these is a special group consisting of optionally multiple p-events + terminating with a g-event. + Such case is caused by old master binlog and a few corner-cases of + the current master version (todo: to fix). + + In case of the event accesses more than OVER_MAX_DBS the method + has to ensure sure previously assigned groups to all other workers are + done. - @note The function can update APH, CGAP, GAQ objects. + + @note The function can update APH (through map_db_to_worker()), GAQ objects + and relocate some temporary tables from Coordinator's list into + involved entries of APH. + There's few memory allocations commented where to be freed. - @return a pointer to the Worker stuct or NULL. + @return a pointer to the Worker struct or NULL. 
*/ -Slave_worker *Log_event::get_slave_worker_id(Relay_log_info *rli) +Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli) { Slave_job_group g, *ptr_g; bool is_b_event; @@ -2424,7 +2417,7 @@ Slave_worker *Log_event::get_slave_worke (rli->gaq->empty() || ((Slave_job_group *) dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))-> - worker_id != (ulong) -1))) + worker_id != MTS_WORKER_UNDEF))) { ulong gaq_idx; rli->mts_total_groups++; @@ -2434,7 +2427,7 @@ Slave_worker *Log_event::get_slave_worke g.group_master_log_pos= g.group_relay_log_pos= 0; g.group_master_log_name= NULL; // todo: remove g.group_relay_log_name= NULL; - g.worker_id= (ulong) -1; + g.worker_id= MTS_WORKER_UNDEF; g.total_seqno= rli->mts_total_groups; g.checkpoint_log_name= NULL; g.checkpoint_log_pos= 0; @@ -2446,11 +2439,11 @@ Slave_worker *Log_event::get_slave_worke // the last occupied GAQ's array index gaq_idx= rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g); - DBUG_ASSERT(gaq_idx != (ulong) -1 && gaq_idx < rli->gaq->s); + DBUG_ASSERT(gaq_idx != MTS_WORKER_UNDEF && gaq_idx < rli->gaq->size); DBUG_ASSERT(((Slave_job_group *) dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))-> group_relay_log_name == NULL); - DBUG_ASSERT(rli->gaq->assigned_group_index != (ulong) -1); // gaq must have room + DBUG_ASSERT(rli->gaq->assigned_group_index != MTS_WORKER_UNDEF); // gaq must have room DBUG_ASSERT(rli->last_assigned_worker == NULL); if (is_b_event) @@ -2485,7 +2478,9 @@ Slave_worker *Log_event::get_slave_worke if (!ret_worker) ret_worker= *(Slave_worker**) dynamic_array_ptr(&rli->workers, 0); (void) wait_for_workers_to_finish(rli, ret_worker); - + /* + this marking is transferred further into T-event of the current group. 
+ */ rli->curr_group_isolated= TRUE; } @@ -2494,10 +2489,14 @@ Slave_worker *Log_event::get_slave_worke char **ref_cur_db= it.ref(); if (!(ret_worker= - get_slave_worker(*ref_cur_db, rli, + map_db_to_worker(*ref_cur_db, rli, &mts_assigned_partitions[i], - // only rows-events do not need temporary tables - get_type_code() != TABLE_MAP_EVENT, ret_worker))) + /* + todo: optimize it. Although pure + rows- event load in insensetive to the flag value + */ + TRUE, + ret_worker))) { llstr(rli->get_event_relay_log_pos(), llbuff); rli->report(ERROR_LEVEL, ER_MTS_CANT_PARALLEL, @@ -2518,7 +2517,7 @@ Slave_worker *Log_event::get_slave_worke if ((ptr_g= ((Slave_job_group *) dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index)))->worker_id - == (ulong) -1) + == MTS_WORKER_UNDEF) { ptr_g->worker_id= ret_worker->id; @@ -2582,20 +2581,14 @@ Slave_worker *Log_event::get_slave_worke } } - // the group terminal event: - // Commit, Xid, a DDL query or dml query of B-less group. + // T-event: Commit, Xid, a DDL query or dml query of B-less group. if (ends_group() || !rli->curr_group_seen_begin) { // index of GAQ that this terminal event belongs to mts_group_cnt= rli->gaq->assigned_group_index; - /* - special marking for T event of a group containing over-max db:s event - including {p,g} B-less group. 
- */ + rli->mts_group_status= Relay_log_info::MTS_END_GROUP; if (rli->curr_group_isolated) mts_do_isolate_group(); - rli->mts_group_status= Relay_log_info::MTS_END_GROUP; - ptr_g= (Slave_job_group *) dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index); @@ -2650,7 +2643,7 @@ Slave_worker *Log_event::get_slave_worke ret_worker->checkpoint_notified= TRUE; } ptr_g->checkpoint_seqno= rli->checkpoint_seqno; - ptr_g->ts= when + (time_t) exec_time; + ptr_g->ts= when + (time_t) exec_time; // Seconds_behind_master related rli->checkpoint_seqno++; // reclaiming resources allocated during the group scheduling @@ -2660,179 +2653,6 @@ Slave_worker *Log_event::get_slave_worke return ret_worker; } -// returns the next available! (TODO: incompatible to circurla_buff method!!!) -static int en_queue(Slave_jobs_queue *jobs, Slave_job_item *item) -{ - if (jobs->a == jobs->s) - { - DBUG_ASSERT(jobs->a == jobs->Q.elements); - return -1; - } - - // store - - set_dynamic(&jobs->Q, (uchar*) item, jobs->a); - - // pre-boundary cond - if (jobs->e == jobs->s) - jobs->e= jobs->a; - - jobs->a= (jobs->a + 1) % jobs->s; - jobs->len++; - - // post-boundary cond - if (jobs->a == jobs->e) - jobs->a= jobs->s; - DBUG_ASSERT(jobs->a == jobs->e || - jobs->len == (jobs->a >= jobs->e) ? - (jobs->a - jobs->e) : (jobs->s + jobs->a - jobs->e)); - return jobs->a; -} - -/** - return the value of @c data member of the head of the queue. -*/ -static void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret) -{ - if (jobs->e == jobs->s) - { - DBUG_ASSERT(jobs->len == 0); - ret->data= NULL; // todo: move to caller - return NULL; - } - get_dynamic(&jobs->Q, (uchar*) ret, jobs->e); - - DBUG_ASSERT(ret->data); // todo: move to caller - - return ret; -} - - -/** - return a job item through a struct which point is supplied via argument. 
-*/ -Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret) -{ - if (jobs->e == jobs->s) - { - DBUG_ASSERT(jobs->len == 0); - return NULL; - } - get_dynamic(&jobs->Q, (uchar*) ret, jobs->e); - jobs->len--; - - // pre boundary cond - if (jobs->a == jobs->s) - jobs->a= jobs->e; - jobs->e= (jobs->e + 1) % jobs->s; - - // post boundary cond - if (jobs->a == jobs->e) - jobs->e= jobs->s; - - DBUG_ASSERT(jobs->e == jobs->s || - (jobs->len == (jobs->a >= jobs->e)? (jobs->a - jobs->e) : - (jobs->s + jobs->a - jobs->e))); - - return ret; -} - -void append_item_to_jobs(slave_job_item *job_item, - Slave_worker *w, Relay_log_info *rli) -{ - THD *thd= rli->info_thd; - int ret; - ulong ev_size= ((Log_event*) (job_item->data))->data_written; - ulonglong new_pend_size; - - DBUG_ASSERT(thd == current_thd); - thd_proc_info(thd, "Feeding an event to a worker thread"); - - mysql_mutex_lock(&rli->pending_jobs_lock); - new_pend_size= rli->mts_pending_jobs_size + ev_size; - // C waits basing on *data* sizes in the queues - while (new_pend_size > rli->mts_pending_jobs_size_max) - { - const char *old_msg; - const char info_format[]= - "Waiting for Slave Workers to free pending events, requested size %lu"; - char wait_info[sizeof(info_format) + 4*sizeof(new_pend_size)]; - - sprintf(wait_info, info_format, new_pend_size); - rli->mts_wqs_oversize= TRUE; - rli->wait_jobs++; // waiting due to the total size - old_msg= thd->enter_cond(&rli->pending_jobs_cond, &rli->pending_jobs_lock, - wait_info); - mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock); - thd->exit_cond(old_msg); - if (thd->killed) - return; - - mysql_mutex_lock(&rli->pending_jobs_lock); - - new_pend_size= rli->mts_pending_jobs_size + ev_size; - } - rli->pending_jobs++; - rli->mts_pending_jobs_size= new_pend_size; - rli->stmt_jobs++; - - mysql_mutex_unlock(&rli->pending_jobs_lock); - - // sleep while all queue lengths are gt Underrun - // sleep time lasts the longer the further WQ:s shift to Overrun - 
// Workers report their U,O status - - if (rli->mts_wqs_underrun_w_id != (ulong) -1) - { - // todo: experiment with weight to get a good approximation formula - ulong nap_weight= rli->mts_wqs_overrun + 1; - my_sleep(nap_weight * rli->mts_coordinator_basic_nap); - rli->mts_wqs_underrun_cnt++; - } - - ret= -1; - - mysql_mutex_lock(&w->jobs_lock); - - // possible WQ overfill - while (w->running_status == Slave_worker::RUNNING && !thd->killed && - (ret= en_queue(&w->jobs, job_item)) == -1) - { - const char *old_msg; - const char info_format[]= - "Waiting for Slave Worker %d queue: max len %lu, actual len %lu"; - char wait_info[sizeof(info_format) + 4*sizeof(w->id) + - 4*sizeof(w->jobs.s) + 4*sizeof(w->jobs.len)]; - - sprintf(wait_info, info_format, w->id, w->jobs.s, w->jobs.len); - old_msg= thd->enter_cond(&w->jobs_cond, &w->jobs_lock, wait_info); - w->jobs.overfill= TRUE; - w->jobs.waited_overfill++; - rli->mts_wqs_overfill_cnt++; - mysql_cond_wait(&w->jobs_cond, &w->jobs_lock); - thd->exit_cond(old_msg); - - mysql_mutex_lock(&w->jobs_lock); - } - if (ret != -1) - { - w->curr_jobs++; - if (w->jobs.len == 1) - mysql_cond_signal(&w->jobs_cond); - - mysql_mutex_unlock(&w->jobs_lock); - } - else - { - mysql_mutex_unlock(&w->jobs_lock); - - mysql_mutex_lock(&rli->pending_jobs_lock); - rli->pending_jobs--; // roll back of the prev incr - rli->mts_pending_jobs_size -= ev_size; - mysql_mutex_unlock(&rli->pending_jobs_lock); - } -} - /** Scheduling event to execute in parallel or execute it directly. 
In MTS case the event gets associated with either Coordinator or a @@ -2942,7 +2762,7 @@ int Log_event::apply_event(Relay_log_inf c_rli->mts_group_status= Relay_log_info::MTS_IN_GROUP; worker= (Relay_log_info*) - (c_rli->last_assigned_worker= get_slave_worker_id(c_rli)); + (c_rli->last_assigned_worker= get_slave_worker(c_rli)); #ifndef DBUG_OFF if (c_rli->last_assigned_worker) @@ -2971,231 +2791,6 @@ err: 0 : -1); } - -/** - Worker's routine to wait for a new assignement in its - private queue. - - @return NULL failure or - a-pointer to an item. -*/ -struct slave_job_item* pop_jobs_item(Slave_worker *w, Slave_job_item *job_item) -{ - THD *thd= w->info_thd; - - mysql_mutex_lock(&w->jobs_lock); - - while (!job_item->data && !thd->killed && - w->running_status == Slave_worker::RUNNING) - { - const char *old_msg; - - head_queue(&w->jobs, job_item); - if (job_item->data == NULL) - { - w->wait_jobs++; - old_msg= thd->enter_cond(&w->jobs_cond, &w->jobs_lock, - "Waiting for an event from sql thread"); - mysql_cond_wait(&w->jobs_cond, &w->jobs_lock); - thd->exit_cond(old_msg); - mysql_mutex_lock(&w->jobs_lock); - } - } - if (job_item->data) - w->curr_jobs--; - - mysql_mutex_unlock(&w->jobs_lock); - - thd_proc_info(w->info_thd, "Executing event"); - return job_item; -} - - -/** - MTS worker main routine. - The worker thread waits for an event, execute it, fixes statistics counters. - - @note the function maintains CGEP and modifies APH, and causes - modification of GAQ. 
- - @return 0 success - -1 got killed or an error happened during appying -*/ -int slave_worker_exec_job(Slave_worker *w, Relay_log_info *rli) -{ - int error= 0; - struct slave_job_item item= {NULL}, *job_item= &item; - THD *thd= w->info_thd; - Log_event *ev= NULL; - bool part_event= FALSE; - - DBUG_ENTER("slave_worker_exec_job"); - - job_item= pop_jobs_item(w, job_item); - if (thd->killed || w->running_status != Slave_worker::RUNNING) - { - // de-queueing and decrement counters is in the caller's exit branch - error= -1; - goto err; - } - ev= static_cast(job_item->data); - thd->server_id = ev->server_id; - thd->set_time(); - thd->lex->current_select= 0; - if (!ev->when) - ev->when= my_time(0); - ev->thd= thd; // todo: assert because up to this point, ev->thd == 0 - - DBUG_PRINT("slave_worker_exec_job:", ("W_%lu <- job item: %p data: %p thd: %p", w->id, job_item, ev, thd)); - - if (ev->starts_group()) - { - w->curr_group_seen_begin= TRUE; // The current group is started with B-event - } - else - { - if ((part_event= ev->contains_partition_info())) - { - uint num_dbs= ev->mts_number_dbs(); - DYNAMIC_ARRAY *ep= &w->curr_group_exec_parts; - - if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS) - num_dbs= 1; - - DBUG_ASSERT(num_dbs > 0); - - for (uint k= 0; k < num_dbs; k++) - { - bool found= FALSE; - - for (uint i= 0; i < ep->elements && !found; i++) - { - found= - *((db_worker_hash_entry **) dynamic_array_ptr(ep, i)) == - ev->mts_assigned_partitions[k]; - } - if (!found) - { - /* - notice, can't assert - DBUG_ASSERT(ev->mts_assigned_partitions[k]->worker == w); - since entry could be marked as wanted by other worker. - */ - insert_dynamic(ep, (uchar*) &ev->mts_assigned_partitions[k]); - } - } - } - } - w->set_future_event_relay_log_pos(ev->future_event_relay_log_pos); - error= ev->do_apply_event_worker(w); - if (ev->ends_group() || (!w->curr_group_seen_begin && - /* - p-events of B/T-less {p,g} group (see - legends of Log_event::get_slave_worker) - obviously can't commit. 
- */ - part_event)) - { - DBUG_PRINT("slave_worker_exec_job:", - (" commits GAQ index %lu, last committed %lu", - ev->mts_group_cnt, w->last_group_done_index)); - w->slave_worker_ends_group(ev, error); /* last done sets post exec */ - -#ifndef DBUG_OFF - w->processed_group++; - DBUG_PRINT("mts", ("Check_slave_debug_group worker %lu mts_checkpoint_group " - "%u processed %u debug %d\n", w->id, mts_checkpoint_group, - w->processed_group, - DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0))); - if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) && - mts_checkpoint_group == w->processed_group) - { - DBUG_PRINT("mts", ("Putting worker %lu in busy wait.", w->id)); - while (true) my_sleep(6000000); - } -#endif - } - - mysql_mutex_lock(&w->jobs_lock); - de_queue(&w->jobs, job_item); - - /* possible overfill */ - if (w->jobs.len == w->jobs.s - 1 && w->jobs.overfill == TRUE) - { - w->jobs.overfill= FALSE; - // todo: w->hungry_cnt++; - mysql_cond_signal(&w->jobs_cond); - } - mysql_mutex_unlock(&w->jobs_lock); - - /* statistics */ - - mysql_mutex_lock(&rli->pending_jobs_lock); - rli->pending_jobs--; - rli->mts_pending_jobs_size -= ev->data_written; - DBUG_ASSERT(rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max); - - // underrun - if ((rli->mts_worker_underrun_level * w->jobs.s) / 100 > w->jobs.len) - { - rli-> mts_wqs_underrun_w_id= w->id; - // todo: w->underrun_cnt++; - } else if (rli->mts_wqs_underrun_w_id == w->id) - { - rli->mts_wqs_underrun_w_id= (ulong) -1; - } - - // overrun exploits the underrun level param - if (((100 - rli->mts_worker_underrun_level) * w->jobs.s) / 100 < w->jobs.len) - { - rli->mts_wqs_overrun++; - w->wq_overrun_set= TRUE; - // todo: w->underrun_cnt++; - } - else if (w->wq_overrun_set == TRUE) - { - rli->mts_wqs_overrun--; - w->wq_overrun_set= FALSE; - } - - DBUG_ASSERT(rli->mts_wqs_overrun >= 0); - - /* coordinator can be waiting */ - if (rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max && - rli->mts_wqs_oversize) // TODO: 
unit/general test wqs_oversize - { - rli->mts_wqs_oversize= FALSE; - mysql_cond_signal(&rli->pending_jobs_cond); - } - - mysql_mutex_unlock(&rli->pending_jobs_lock); - - w->stmt_jobs++; - -err: - if (error) - { - sql_print_information("Worker %lu is exiting: killed %i, error %i, " - "running_status %d", - w->id, thd->killed, thd->is_error(), - w->running_status); - w->slave_worker_ends_group(ev, error); - } - - // rows_query_log_event is deleted as a part of the statement cleanup - - // todo: sync_slave_with_master fails when my_sleep(1000) is put here - - if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT) - { - w->last_event= ev; - delete ev; - } - - - DBUG_RETURN(error); -} - #endif /************************************************************************** === modified file 'sql/log_event.h' --- a/sql/log_event.h 2011-06-14 09:27:38 +0000 +++ b/sql/log_event.h 2011-06-27 17:31:45 +0000 @@ -536,6 +536,15 @@ struct sql_ex_info */ #define LOG_EVENT_NO_FILTER_F 0x100 +/** + MTS: group of events can be marked to force its execution + in isolation from any other Workers. + Typically that is done for a transaction that contains + a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS db:s. + The flag is set ON for an event that terminates its group. +*/ +#define LOG_EVENT_MTS_ISOLATE_F 0x200 + /** @def OPTIONS_WRITTEN_TO_BIN_LOG @@ -1330,7 +1339,7 @@ public: to be assigned worker; M is the max index of the worker pool. */ - Slave_worker *get_slave_worker_id(Relay_log_info *rli); + Slave_worker *get_slave_worker(Relay_log_info *rli); /* The method returns a list of updated by the event databases. @@ -1361,12 +1370,14 @@ public: get_type_code() == QUERY_EVENT || get_type_code() == EXEC_LOAD_EVENT || get_type_code() == EXECUTE_LOAD_QUERY_EVENT); - m_mts_event_isolated_group= TRUE; + flags |= LOG_EVENT_MTS_ISOLATE_F; } + /* - Verifying whether event is marked to execute in isolation. + Verifying whether the terminal event of a group is marked to + execute in isolation. 
*/ - virtual bool mts_is_group_isolated() { return m_mts_event_isolated_group; } + bool mts_is_group_isolated() { return flags & LOG_EVENT_MTS_ISOLATE_F; } /** Apply the event to the database. @@ -1501,8 +1512,6 @@ protected: non-zero. The caller shall decrease the counter by one. */ virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); - - bool m_mts_event_isolated_group; #endif }; === modified file 'sql/rpl_rli.cc' --- a/sql/rpl_rli.cc 2011-06-22 17:54:23 +0000 +++ b/sql/rpl_rli.cc 2011-06-27 17:31:45 +0000 @@ -112,8 +112,8 @@ void Relay_log_info::init_workers(ulong Parallel slave parameters initialization is done regardless whether the feature is or going to be active or not. */ - trans_jobs= stmt_jobs= pending_jobs= wait_jobs= 0; - mts_wqs_underrun_cnt= mts_wqs_overfill_cnt= 0; + trans_jobs= stmt_jobs= pending_jobs= wq_size_waits= 0; + mts_wq_excess= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0; my_init_dynamic_array(&workers, sizeof(Slave_worker *), n_workers, 4); my_atomic_rwlock_init(&slave_open_temp_tables_lock); === modified file 'sql/rpl_rli.h' --- a/sql/rpl_rli.h 2011-06-24 12:38:19 +0000 +++ b/sql/rpl_rli.h 2011-06-27 17:31:45 +0000 @@ -432,34 +432,42 @@ public: time_t last_event_start_time; /* - WL#5569 MTS-II + WL#5569 MTS + + legends: + C - Coordinator; + W - Worker; + WQ - Worker Query containing event assignments */ DYNAMIC_ARRAY workers; // number's is determined by global slave_parallel_workers volatile ulong pending_jobs; - ulong trans_jobs, wait_jobs, stmt_jobs; // wait_jobs - waiting times due to the total size + ulong stmt_jobs; // statistics: number of events (statements) processed + ulong trans_jobs; // statistics: number of groups (transactions) processed + ulong wq_size_waits; // number of times C goes to sleep due to WQ:s oversize mysql_mutex_t pending_jobs_lock; mysql_cond_t pending_jobs_cond; ulong mts_slave_worker_queue_len_max; ulonglong mts_pending_jobs_size; // actual mem usage by WQ:s ulonglong mts_pending_jobs_size_max; 
// the max forcing to wait by C - bool mts_wqs_oversize; // C raises flag to wait some memory's released + bool mts_wq_oversize; // C raises flag to wait some memory's released Slave_worker *last_assigned_worker; // a hint to partitioning func for some events Slave_committed_queue *gaq; DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP DYNAMIC_ARRAY curr_group_da; // deferred array to hold partition-info-free events bool curr_group_seen_begin; // current group started with B-event or not bool curr_group_isolated; // current group requires execution in isolation - volatile ulong mts_wqs_underrun_w_id; // Id of a Worker whose queue is getting empty - volatile long mts_wqs_overrun; // W to incr and decr - ulong mts_wqs_underrun_cnt; // Coord goes to sleep when senses Workers are content - ulong mts_wqs_overfill_cnt; // Coord waits if a W's queue is full - long mts_worker_underrun_level; // percent of WQ size at which Worker claims hungry + volatile ulong mts_wq_underrun_w_id; // Id of a Worker whose queue is getting empty + volatile long mts_wq_excess; // excessive overrun counter; when W increments and decrements it it also marks updates its own wq_overrun_set + volatile ulong mts_wq_overrun_cnt; // statistics of all mts_wq_excess increments + ulong mts_wq_no_underrun_cnt; // counts times of C goes to sleep when W:s are filled + ulong mts_wq_overfill_cnt; // counts C waits when a W's queue is full + long mts_worker_underrun_level; // percent of WQ size at which W is considered hungry ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL. ulonglong mts_total_groups; // total event groups distributed in current session ulong opt_slave_parallel_workers; // auxiliary cache for ::opt_slave_parallel_workers ulong slave_parallel_workers; // the one slave session time number of workers - ulong recovery_parallel_workers; // number of workers while recovering. 
+ ulong recovery_parallel_workers; // number of workers while recovering /* A sorted array of Worker current assignements number to provide === modified file 'sql/rpl_rli_pdb.cc' --- a/sql/rpl_rli_pdb.cc 2011-06-25 14:14:24 +0000 +++ b/sql/rpl_rli_pdb.cc 2011-06-27 17:31:45 +0000 @@ -2,6 +2,7 @@ #include "sql_priv.h" #include "unireg.h" #include "rpl_rli_pdb.h" +#include "rpl_slave.h" #include "sql_string.h" #include @@ -252,6 +253,8 @@ bool Slave_worker::commit_positions(Log_ // extract an updated relay-log name to store in Worker's rli. if (ptr_g->group_relay_log_name) { + DBUG_ASSERT(strlen(ptr_g->group_relay_log_name) + 1 + <= sizeof(group_relay_log_name)); strmake(group_relay_log_name, ptr_g->group_relay_log_name, sizeof(group_relay_log_name) - 1); } @@ -515,7 +518,7 @@ static void move_temp_tables_to_entry(TH @return the pointer to a Worker struct */ -Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli, +Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli, db_worker_hash_entry **ptr_entry, bool need_temp_tables, Slave_worker *last_worker) { @@ -753,17 +756,16 @@ Slave_worker *get_least_occupied_worker( } /** - Deallocative routine that makes few things in opposite to - @c get_slave_worker(). - - Affected by the being committed group APH tuples are updated. - @c last_group_done_index member is set to the arg value. - + Deallocation routine to cancel out few effects of + @c map_db_to_worker(). + Involved into processing of the group APH tuples are updated. + @c last_group_done_index member is set to the GAQ index of + the current group. CGEP the Worker partition cache is cleaned up. - TODO: reclaim space if the actual size exceeds the limit. + @param ev a pointer to Log_event + @param error error code after processing the event by caller. 
*/ - void Slave_worker::slave_worker_ends_group(Log_event* ev, int error) { DBUG_ENTER("Slave_worker::slave_worker_ends_group"); @@ -775,20 +777,10 @@ void Slave_worker::slave_worker_ends_gro (Slave_job_group *) dynamic_array_ptr(&c_rli->gaq->Q, gaq_idx); // first ever group must have relay log name - DBUG_ASSERT(last_group_done_index != c_rli->gaq->s || + DBUG_ASSERT(last_group_done_index != c_rli->gaq->size || ptr_g->group_relay_log_name != NULL); DBUG_ASSERT(ptr_g->worker_id == id); - if (ptr_g->group_relay_log_name != NULL) - { - // memorizing a new relay-log file name - - DBUG_ASSERT(strlen(ptr_g->group_relay_log_name) + 1 - <= sizeof(group_relay_log_name)); - - strcpy(group_relay_log_name, ptr_g->group_relay_log_name); - } - if (!(ev->get_type_code() == XID_EVENT && is_transactional())) { commit_positions(ev, ptr_g); @@ -799,6 +791,7 @@ void Slave_worker::slave_worker_ends_gro ptr_g->group_master_log_pos= group_master_log_pos; ptr_g->group_relay_log_pos= group_relay_log_pos; + ptr_g->done= 1; // GAQ index is available to C now last_group_done_index= gaq_idx; @@ -887,29 +880,29 @@ void Slave_worker::slave_worker_ends_gro ulong circular_buffer_queue::de_queue(uchar *val) { ulong ret; - if (e == s) + if (enter == size) { DBUG_ASSERT(len == 0); return (ulong) -1; } - ret= e; - get_dynamic(&Q, val, e); + ret= enter; + get_dynamic(&Q, val, enter); len--; // pre boundary cond - if (a == s) - a= e; - e= (e + 1) % s; + if (avail == size) + avail= enter; + enter= (enter + 1) % size; // post boundary cond - if (a == e) - e= s; + if (avail == enter) + enter= size; - DBUG_ASSERT(e == s || - (len == (a >= e)? (a - e) : - (s + a - e))); - DBUG_ASSERT(a != e); + DBUG_ASSERT(enter == size || + (len == (avail >= enter)? 
(avail - enter) : + (size + avail - enter))); + DBUG_ASSERT(avail != enter); return ret; } @@ -919,26 +912,26 @@ ulong circular_buffer_queue::de_queue(uc */ ulong circular_buffer_queue::de_tail(uchar *val) { - if (e == s) + if (enter == size) { DBUG_ASSERT(len == 0); return (ulong) -1; } - a= (e + len - 1) % s; - get_dynamic(&Q, val, a); + avail= (enter + len - 1) % size; + get_dynamic(&Q, val, avail); len--; // post boundary cond - if (a == e) - e= s; + if (avail == enter) + enter= size; - DBUG_ASSERT(e == s || - (len == (a >= e)? (a - e) : - (s + a - e))); - DBUG_ASSERT(a != e); + DBUG_ASSERT(enter == size || + (len == (avail >= enter)? (avail - enter) : + (size + avail - enter))); + DBUG_ASSERT(avail != enter); - return a; + return avail; } /** @return the used index at success or -1 when queue is full @@ -946,33 +939,33 @@ ulong circular_buffer_queue::de_tail(uch ulong circular_buffer_queue::en_queue(void *item) { ulong ret; - if (a == s) + if (avail == size) { - DBUG_ASSERT(a == Q.elements); + DBUG_ASSERT(avail == Q.elements); return (ulong) -1; } // store - ret= a; - set_dynamic(&Q, (uchar*) item, a); + ret= avail; + set_dynamic(&Q, (uchar*) item, avail); // pre-boundary cond - if (e == s) - e= a; + if (enter == size) + enter= avail; - a= (a + 1) % s; + avail= (avail + 1) % size; len++; // post-boundary cond - if (a == e) - a= s; + if (avail == enter) + avail= size; - DBUG_ASSERT(a == e || - len == (a >= e) ? - (a - e) : (s + a - e)); - DBUG_ASSERT(a != e); + DBUG_ASSERT(avail == enter || + len == (avail >= enter) ? 
+ (avail - enter) : (size + avail - enter)); + DBUG_ASSERT(avail != enter); return ret; } @@ -980,13 +973,13 @@ ulong circular_buffer_queue::en_queue(vo void* circular_buffer_queue::head_queue() { uchar *ret= NULL; - if (e == s) + if (enter == size) { DBUG_ASSERT(len == 0); } else { - get_dynamic(&Q, (uchar*) ret, e); + get_dynamic(&Q, (uchar*) ret, enter); } return (void*) ret; } @@ -1004,16 +997,16 @@ void* circular_buffer_queue::head_queue( */ bool circular_buffer_queue::gt(ulong i, ulong k) { - DBUG_ASSERT(i < s && k < s); - DBUG_ASSERT(a != e); + DBUG_ASSERT(i < size && k < size); + DBUG_ASSERT(avail != enter); - if (i >= e) - if (k >= e) + if (i >= enter) + if (k >= enter) return i > k; else return FALSE; else - if (k >= e) + if (k >= enter) return TRUE; else return i > k; @@ -1024,7 +1017,7 @@ bool Slave_committed_queue::count_done(R { ulong i, cnt= 0; - for (i= e; i != a && !empty(); i= (i + 1) % s) + for (i= enter; i != avail && !empty(); i= (i + 1) % size) { Slave_job_group *ptr_g; @@ -1068,7 +1061,7 @@ ulong Slave_committed_queue::move_queue_ { ulong i, cnt= 0; - for (i= e; i != a && !empty();) + for (i= enter; i != avail && !empty();) { Slave_worker *w_i; Slave_job_group *ptr_g, g; @@ -1087,8 +1080,12 @@ ulong Slave_committed_queue::move_queue_ The current job has not been processed or it was not even assigned, this means there is a gap. 
     */
-    if (ptr_g->worker_id == (ulong) -1 || !ptr_g->done)
+    if (ptr_g->worker_id == MTS_WORKER_UNDEF || !ptr_g->done)
       break; /* gap at i'th */
+
+    /* Worker-id domain guard */
+    compile_time_assert(MTS_WORKER_UNDEF > MTS_MAX_WORKERS);
+
     get_dynamic(ws, (uchar *) &w_i, ptr_g->worker_id);
     /*
@@ -1142,7 +1139,7 @@ ulong Slave_committed_queue::move_queue_
     set_dynamic(&last_done, &ptr_g->total_seqno, w_i->id);
     cnt++;
-    i= (i + 1) % s;
+    i= (i + 1) % size;
   }
   return cnt;
@@ -1156,7 +1153,7 @@ ulong Slave_committed_queue::move_queue_
 void Slave_committed_queue::free_dynamic_items()
 {
   ulong i;
-  for (i= e; i != a && !empty(); i= (i + 1) % s)
+  for (i= enter; i != avail && !empty(); i= (i + 1) % size)
   {
     Slave_job_group *ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);
     if (ptr_g->group_relay_log_name)
@@ -1274,3 +1271,421 @@ int wait_for_workers_to_finish(Relay_log
   DBUG_RETURN(ret);
 }
+
+
+// returns the next available! (TODO: incompatible to circular_buffer method!!!)
+static int en_queue(Slave_jobs_queue *jobs, Slave_job_item *item)
+{
+  if (jobs->avail == jobs->size)
+  {
+    DBUG_ASSERT(jobs->avail == jobs->Q.elements);
+    return -1;
+  }
+
+  // store
+
+  set_dynamic(&jobs->Q, (uchar*) item, jobs->avail);
+
+  // pre-boundary cond
+  if (jobs->enter == jobs->size)
+    jobs->enter= jobs->avail;
+
+  jobs->avail= (jobs->avail + 1) % jobs->size;
+  jobs->len++;
+
+  // post-boundary cond
+  if (jobs->avail == jobs->enter)
+    jobs->avail= jobs->size;
+  DBUG_ASSERT(jobs->avail == jobs->enter ||
+              jobs->len == (jobs->avail >= jobs->enter) ?
+              (jobs->avail - jobs->enter) : (jobs->size + jobs->avail - jobs->enter));
+  return jobs->avail;
+}
+
+/**
+   return the value of @c data member of the head of the queue.
+*/
+static void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
+{
+  if (jobs->enter == jobs->size)
+  {
+    DBUG_ASSERT(jobs->len == 0);
+    ret->data= NULL; // todo: move to caller
+    return NULL;
+  }
+  get_dynamic(&jobs->Q, (uchar*) ret, jobs->enter);
+
+  DBUG_ASSERT(ret->data); // todo: move to caller
+
+  return ret;
+}
+
+
+/**
+   return a job item through a struct whose pointer is supplied via argument.
+*/
+Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
+{
+  if (jobs->enter == jobs->size)
+  {
+    DBUG_ASSERT(jobs->len == 0);
+    return NULL;
+  }
+  get_dynamic(&jobs->Q, (uchar*) ret, jobs->enter);
+  jobs->len--;
+
+  // pre boundary cond
+  if (jobs->avail == jobs->size)
+    jobs->avail= jobs->enter;
+  jobs->enter= (jobs->enter + 1) % jobs->size;
+
+  // post boundary cond
+  if (jobs->avail == jobs->enter)
+    jobs->enter= jobs->size;
+
+  DBUG_ASSERT(jobs->enter == jobs->size ||
+              (jobs->len == (jobs->avail >= jobs->enter)? (jobs->avail - jobs->enter) :
+               (jobs->size + jobs->avail - jobs->enter)));
+
+  return ret;
+}
+
+/**
+   Coordinator enqueues a job item into a Worker private queue.
+
+   @param job_item a pointer to struct carrying a reference to an event
+   @param worker a pointer to the assigned Worker struct
+   @param rli a pointer to Relay_log_info of Coordinator
+*/
+void append_item_to_jobs(slave_job_item *job_item,
+                         Slave_worker *worker, Relay_log_info *rli)
+{
+  THD *thd= rli->info_thd;
+  int ret;
+  ulong ev_size= ((Log_event*) (job_item->data))->data_written;
+  ulonglong new_pend_size;
+
+  DBUG_ASSERT(thd == current_thd);
+  thd_proc_info(thd, "Feeding an event to a worker thread");
+
+  mysql_mutex_lock(&rli->pending_jobs_lock);
+  new_pend_size= rli->mts_pending_jobs_size + ev_size;
+  // C waits basing on *data* sizes in the queues
+  while (new_pend_size > rli->mts_pending_jobs_size_max)
+  {
+    const char *old_msg;
+    const char info_format[]=
+      "Waiting for Slave Workers to free pending events, requested size %lu";
+    char wait_info[sizeof(info_format) + 4*sizeof(new_pend_size)];
+
+    sprintf(wait_info, info_format, new_pend_size);
+    rli->mts_wq_oversize= TRUE;
+    rli->wq_size_waits++; // waiting due to the total size
+    old_msg= thd->enter_cond(&rli->pending_jobs_cond, &rli->pending_jobs_lock,
+                             wait_info);
+    mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
+    thd->exit_cond(old_msg);
+    if (thd->killed)
+      return;
+
+    mysql_mutex_lock(&rli->pending_jobs_lock);
+
+    new_pend_size= rli->mts_pending_jobs_size + ev_size;
+  }
+  rli->pending_jobs++;
+  rli->mts_pending_jobs_size= new_pend_size;
+  rli->stmt_jobs++;
+
+  mysql_mutex_unlock(&rli->pending_jobs_lock);
+
+  /*
+    Sleep unless there is an underrunning Worker.
+  */
+  if (rli->mts_wq_underrun_w_id == MTS_WORKER_UNDEF)
+  {
+    // todo: experiment with weight to get a good approximation formula
+    // The longer Sleep lasts the bigger is excessive overrun counter.
+    ulong nap_weight= rli->mts_wq_excess + 1;
+    my_sleep(nap_weight * rli->mts_coordinator_basic_nap);
+    rli->mts_wq_no_underrun_cnt++;
+  }
+
+  ret= -1;
+
+  mysql_mutex_lock(&worker->jobs_lock);
+
+  // possible WQ overfill
+  while (worker->running_status == Slave_worker::RUNNING && !thd->killed &&
+         (ret= en_queue(&worker->jobs, job_item)) == -1)
+  {
+    const char *old_msg;
+    const char info_format[]=
+      "Waiting for Slave Worker %d queue: max len %lu, actual len %lu";
+    char wait_info[sizeof(info_format) + 4*sizeof(worker->id) +
+                   4*sizeof(worker->jobs.size) + 4*sizeof(worker->jobs.len)];
+
+    sprintf(wait_info, info_format, worker->id, worker->jobs.size, worker->jobs.len);
+    old_msg= thd->enter_cond(&worker->jobs_cond, &worker->jobs_lock, wait_info);
+    worker->jobs.overfill= TRUE;
+    worker->jobs.waited_overfill++;
+    rli->mts_wq_overfill_cnt++;
+    mysql_cond_wait(&worker->jobs_cond, &worker->jobs_lock);
+    thd->exit_cond(old_msg);
+
+    mysql_mutex_lock(&worker->jobs_lock);
+  }
+  if (ret != -1)
+  {
+    worker->curr_jobs++;
+    if (worker->jobs.len == 1)
+      mysql_cond_signal(&worker->jobs_cond);
+
+    mysql_mutex_unlock(&worker->jobs_lock);
+  }
+  else
+  {
+    mysql_mutex_unlock(&worker->jobs_lock);
+
+    mysql_mutex_lock(&rli->pending_jobs_lock);
+    rli->pending_jobs--; // roll back of the prev incr
+    rli->mts_pending_jobs_size -= ev_size;
+    mysql_mutex_unlock(&rli->pending_jobs_lock);
+  }
+}
+
+
+/**
+   Worker's routine to wait for a new assignment through
+   @c append_item_to_jobs()
+
+   @param worker a pointer to the waiting Worker struct
+   @param job_item a pointer to struct carrying a reference to an event
+
+   @return NULL failure or
+           a pointer to an item.
+*/
+struct slave_job_item* pop_jobs_item(Slave_worker *worker, Slave_job_item *job_item)
+{
+  THD *thd= worker->info_thd;
+
+  mysql_mutex_lock(&worker->jobs_lock);
+
+  while (!job_item->data && !thd->killed &&
+         worker->running_status == Slave_worker::RUNNING)
+  {
+    const char *old_msg;
+
+    head_queue(&worker->jobs, job_item);
+    if (job_item->data == NULL)
+    {
+      worker->wq_empty_waits++;
+      old_msg= thd->enter_cond(&worker->jobs_cond, &worker->jobs_lock,
+                               "Waiting for an event from sql thread");
+      mysql_cond_wait(&worker->jobs_cond, &worker->jobs_lock);
+      thd->exit_cond(old_msg);
+      mysql_mutex_lock(&worker->jobs_lock);
+    }
+  }
+  if (job_item->data)
+    worker->curr_jobs--;
+
+  mysql_mutex_unlock(&worker->jobs_lock);
+
+  thd_proc_info(worker->info_thd, "Executing event");
+  return job_item;
+}
+
+
+/**
+  MTS worker main routine.
+  The worker thread loops in waiting for an event, executing it and
+  fixing statistics counters.
+
+  @param worker a pointer to the assigned Worker struct
+  @param rli a pointer to Relay_log_info of Coordinator
+             to update statistics.
+
+  @note the function maintains worker's CGEP and modifies APH, updates
+        the current group item in GAQ via @c slave_worker_ends_group().
+
+  @return 0 success
+         -1 got killed or an error happened during applying
+*/
+int slave_worker_exec_job(Slave_worker *worker, Relay_log_info *rli)
+{
+  int error= 0;
+  struct slave_job_item item= {NULL}, *job_item= &item;
+  THD *thd= worker->info_thd;
+  Log_event *ev= NULL;
+  bool part_event= FALSE;
+
+  DBUG_ENTER("slave_worker_exec_job");
+
+  job_item= pop_jobs_item(worker, job_item);
+  if (thd->killed || worker->running_status != Slave_worker::RUNNING)
+  {
+    // de-queueing and decrement counters is in the caller's exit branch
+    error= -1;
+    goto err;
+  }
+  ev= static_cast<Log_event*>(job_item->data);
+  thd->server_id = ev->server_id;
+  thd->set_time();
+  thd->lex->current_select= 0;
+  if (!ev->when)
+    ev->when= my_time(0);
+  ev->thd= thd; // todo: assert because up to this point, ev->thd == 0
+
+  DBUG_PRINT("slave_worker_exec_job:", ("W_%lu <- job item: %p data: %p thd: %p", worker->id, job_item, ev, thd));
+
+  if (ev->starts_group())
+  {
+    worker->curr_group_seen_begin= TRUE; // The current group is started with B-event
+  }
+  else
+  {
+    if ((part_event= ev->contains_partition_info()))
+    {
+      uint num_dbs= ev->mts_number_dbs();
+      DYNAMIC_ARRAY *ep= &worker->curr_group_exec_parts;
+
+      if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
+        num_dbs= 1;
+
+      DBUG_ASSERT(num_dbs > 0);
+
+      for (uint k= 0; k < num_dbs; k++)
+      {
+        bool found= FALSE;
+
+        for (uint i= 0; i < ep->elements && !found; i++)
+        {
+          found=
+            *((db_worker_hash_entry **) dynamic_array_ptr(ep, i)) ==
+            ev->mts_assigned_partitions[k];
+        }
+        if (!found)
+        {
+          /*
+            notice, can't assert
+            DBUG_ASSERT(ev->mts_assigned_partitions[k]->worker == worker);
+            since entry could be marked as wanted by other worker.
+          */
+          insert_dynamic(ep, (uchar*) &ev->mts_assigned_partitions[k]);
+        }
+      }
+    }
+  }
+  worker->set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
+  error= ev->do_apply_event_worker(worker);
+  if (ev->ends_group() || (!worker->curr_group_seen_begin &&
+                           /*
+                              p-events of B/T-less {p,g} group (see
+                              legends of Log_event::get_slave_worker)
+                              obviously can't commit.
+                           */
+                           part_event))
+  {
+    DBUG_PRINT("slave_worker_exec_job:",
+               (" commits GAQ index %lu, last committed %lu",
+                ev->mts_group_cnt, worker->last_group_done_index));
+    worker->slave_worker_ends_group(ev, error); /* last done sets post exec */
+
+#ifndef DBUG_OFF
+    worker->processed_group++;
+    DBUG_PRINT("mts", ("Check_slave_debug_group worker %lu mts_checkpoint_group"
+               " %u processed %u debug %d\n", worker->id, mts_checkpoint_group,
+               worker->processed_group,
+               DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0)));
+    if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) &&
+        mts_checkpoint_group == worker->processed_group)
+    {
+      DBUG_PRINT("mts", ("Putting worker %lu in busy wait.", worker->id));
+      while (true) my_sleep(6000000);
+    }
+#endif
+  }
+
+  mysql_mutex_lock(&worker->jobs_lock);
+  de_queue(&worker->jobs, job_item);
+
+  /* possible overfill */
+  if (worker->jobs.len == worker->jobs.size - 1 && worker->jobs.overfill == TRUE)
+  {
+    worker->jobs.overfill= FALSE;
+    // todo: worker->hungry_cnt++;
+    mysql_cond_signal(&worker->jobs_cond);
+  }
+  mysql_mutex_unlock(&worker->jobs_lock);
+
+  /* statistics */
+
+  /* todo: convert to rwlock/atomic write */
+  mysql_mutex_lock(&rli->pending_jobs_lock);
+
+  rli->pending_jobs--;
+  rli->mts_pending_jobs_size -= ev->data_written;
+  DBUG_ASSERT(rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max);
+
+  // underrun (number of pending assignments is less than underrun level)
+  if ((rli->mts_worker_underrun_level * worker->jobs.size) / 100.0 >
+      worker->jobs.len)
+  {
+    rli->mts_wq_underrun_w_id= worker->id;
+  } else if (rli->mts_wq_underrun_w_id ==
worker->id)
+  {
+    // reset only own marking
+    rli->mts_wq_underrun_w_id= MTS_WORKER_UNDEF;
+  }
+
+  // overrun is symmetric to underrun. In a sense it's underrun to get to 100%
+  if (((100 - rli->mts_worker_underrun_level) * worker->jobs.size) / 100.0
+      < worker->jobs.len)
+  {
+    rli->mts_wq_excess++;
+    worker->wq_overrun_set= TRUE;
+    rli->mts_wq_overrun_cnt++;
+  }
+  else if (worker->wq_overrun_set == TRUE)
+  {
+    rli->mts_wq_excess--;
+    worker->wq_overrun_set= FALSE;
+  }
+
+  DBUG_ASSERT(rli->mts_wq_excess >= 0);
+
+  /* coordinator can be waiting */
+  if (rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max &&
+      rli->mts_wq_oversize) // TODO: unit/general test wq_oversize
+  {
+    rli->mts_wq_oversize= FALSE;
+    mysql_cond_signal(&rli->pending_jobs_cond);
+  }
+
+  mysql_mutex_unlock(&rli->pending_jobs_lock);
+
+  worker->stmt_jobs++;
+
+err:
+  if (error)
+  {
+    sql_print_information("Worker %lu is exiting: killed %i, error %i, "
+                          "running_status %d",
+                          worker->id, thd->killed, thd->is_error(),
+                          worker->running_status);
+    worker->slave_worker_ends_group(ev, error);
+  }
+
+  // rows_query_log_event is deleted as a part of the statement cleanup
+
+  // todo: sync_slave_with_master fails when my_sleep(1000) is put here
+
+  if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
+  {
+    worker->last_event= ev;
+    delete ev;
+  }
+
+
+  DBUG_RETURN(error);
+}

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2011-06-25 14:14:24 +0000
+++ b/sql/rpl_rli_pdb.h 2011-06-27 17:31:45 +0000
@@ -32,7 +32,7 @@ typedef struct st_db_worker_hash_entry
 bool init_hash_workers(ulong slave_parallel_workers);
 void destroy_hash_workers(Relay_log_info*);
-Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli,
+Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
                                db_worker_hash_entry **ptr_entry, bool need_temp_tables, Slave_worker *w);
 Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
@@ -62,17 +62,17 @@ class circular_buffer_queue
 public:
   DYNAMIC_ARRAY Q;
-  ulong s; // the Size of the queue in terms of element
-  ulong a; // first Available index to append at (next to tail)
-  ulong e; // the head index
-  volatile ulong len; // it is also queried to compute least occupied
+  ulong size; // the Size of the queue in terms of element
+  ulong avail; // first Available index to append at (next to tail)
+  ulong enter; // the head index
+  volatile ulong len; // actual length
   bool inited_queue;
   circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
-    s(max), a(0), e(max), len(0), inited_queue(FALSE)
+    size(max), avail(0), enter(max), len(0), inited_queue(FALSE)
   {
-    DBUG_ASSERT(s < ULONG_MAX);
-    if (!my_init_dynamic_array(&Q, el_size, s, alloc_inc))
+    DBUG_ASSERT(size < ULONG_MAX);
+    if (!my_init_dynamic_array(&Q, el_size, size, alloc_inc))
       inited_queue= TRUE;
   }
   circular_buffer_queue () : inited_queue(FALSE) {}
@@ -111,9 +111,9 @@ public:
   bool gt(ulong i, ulong k); // comparision of ordering of two entities
   /* index is within the valid range */
   bool in(ulong k) { return !empty() &&
-      (e > a ? (k >= e || k < a) : (k >= e && k < a)); }
-  bool empty() { return e == s; }
-  bool full() { return a == s; }
+      (enter > avail ? (k >= enter || k < avail) : (k >= enter && k < avail)); }
+  bool empty() { return enter == size; }
+  bool full() { return avail == size; }
 };
 typedef struct st_slave_job_group
@@ -237,26 +237,23 @@ public:
   DYNAMIC_ARRAY curr_group_exec_parts; // CGEP
   bool curr_group_seen_begin; // is set to TRUE with B-event at Worker exec
-  // @c last_group_done_index is for recovery, although can be viewed
-  // as statistics as well.
-  // C marks a T-event with the incremented group_cnt that is
-  // an index in GAQ; W stores it at the event execution.
-  // C polls the value periodically to maintain an array
-  // of the indexes in order to progress on GAQ's lwm, see @c next_event().
-  // see @c Log_event::group_cnt.
-  volatile ulong last_group_done_index; // it's index in GAQ
-  List data_in_use; // events are still in use by SQL thread
   ulong id;
   TABLE *current_table;
   // rbr
-  RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */
-  uint tables_to_lock_count; /* RBR: Count of tables to lock */
-  table_mapping m_table_map; /* RBR: Mapping table-id to table */
+  // RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */
+  // uint tables_to_lock_count; /* RBR: Count of tables to lock */
+  // table_mapping m_table_map; /* RBR: Mapping table-id to table */
   // statictics
-  ulong wait_jobs; // to gather statistics how many times got idle
+
+  /*
+    @c last_group_done_index is for statistics
+    to mean the index in GAQ of the last processed group.
+  */
+  volatile ulong last_group_done_index; // it's index in GAQ
+  ulong wq_empty_waits; // to gather statistics how many times got idle
   ulong stmt_jobs; // how many jobs per stmt
   ulong trans_jobs; // how many jobs per trns
   volatile int curr_jobs; // the current assignments

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2011-06-27 12:12:52 +0000
+++ b/sql/rpl_slave.cc 2011-06-27 17:31:45 +0000
@@ -3830,7 +3830,7 @@ pthread_handler_t handle_slave_worker(vo
                         "events processed = %lu "
                         "hungry waits = %lu "
                         "priv queue overfills = %llu "
-                        ,w->id, w->stmt_jobs, w->wait_jobs, w->jobs.waited_overfill);
+                        ,w->id, w->stmt_jobs, w->wq_size_waits, w->jobs.waited_overfill);
   mysql_cond_signal(&w->jobs_cond); // famous last goodbye
   mysql_mutex_unlock(&w->jobs_lock);
@@ -4249,22 +4249,22 @@ int slave_start_single_worker(Relay_log_
   w->bitmap_shifted= 0;
   w->workers= rli->workers; // shallow copying is sufficient
   w->this_worker= w;
-  w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
+  w->wq_size_waits= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
   w->id= i;
   w->current_table= NULL;
   w->usage_partition= 0;
-  w->last_group_done_index= rli->gaq->s; // out of range
+  w->last_group_done_index= rli->gaq->size; // out of range
-  w->jobs.a= 0;
+  w->jobs.avail= 0;
   w->jobs.len= 0;
   w->jobs.overfill= FALSE; // todo: move into Slave_jobs_queue constructor
   w->jobs.waited_overfill= 0;
-  w->jobs.e= w->jobs.s= rli->mts_slave_worker_queue_len_max;
-  my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0);
-  for (k= 0; k < w->jobs.s; k++)
+  w->jobs.enter= w->jobs.size= rli->mts_slave_worker_queue_len_max;
+  my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.size, 0);
+  for (k= 0; k < w->jobs.size; k++)
     insert_dynamic(&w->jobs.Q, (uchar*) &empty);
-  DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
+  DBUG_ASSERT(w->jobs.Q.elements == w->jobs.size);
   w->wq_overrun_set= FALSE;
   set_dynamic(&rli->workers, (uchar*) &w, i);
@@ -4338,9 +4338,10 @@ int slave_start_workers(Relay_log_info *
   rli->mts_pending_jobs_size= 0;
   rli->mts_pending_jobs_size_max= ::opt_mts_pending_jobs_size_max;
-  rli->mts_wqs_underrun_w_id= (ulong) -1;
-  rli->mts_wqs_overrun= 0;
-  rli->mts_wqs_oversize= FALSE;
+  rli->mts_wq_underrun_w_id= MTS_WORKER_UNDEF;
+  rli->mts_wq_excess= 0;
+  rli->mts_wq_overrun_cnt= 0;
+  rli->mts_wq_oversize= FALSE;
   rli->mts_coordinator_basic_nap= mts_coordinator_basic_nap;
   rli->mts_worker_underrun_level= mts_worker_underrun_level;
   rli->mts_total_groups= 0;
@@ -4470,7 +4471,7 @@ void slave_stop_workers(Relay_log_info *
     w->end_info();
-    DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
+    DBUG_ASSERT(w->jobs.Q.elements == w->jobs.size);
     delete_dynamic(&w->jobs.Q);
     delete_dynamic_element(&rli->workers, i);
     delete w;
@@ -4478,10 +4479,13 @@ void slave_stop_workers(Relay_log_info *
   sql_print_information("MTS coordinator statistics: "
                         "events processed = %lu "
-                        "waits due a Worker queue full = %lu "
-                        "waits due the total size = %lu "
-                        "sleeps when Workers occupied = %lu "
-                        ,rli->stmt_jobs, rli->mts_wqs_overfill_cnt, rli->wait_jobs, rli->mts_wqs_underrun_cnt);
+                        "Worker queues filled over overrun level = %lu "
+                        "waited due a Worker queue full = %lu "
+                        "waited due the total size = %lu "
+                        "sleept when
Workers occupied = %lu ",
+                        rli->stmt_jobs, rli->mts_wq_overrun_cnt,
+                        rli->mts_wq_overfill_cnt, rli->wq_size_waits,
+                        rli->mts_wq_no_underrun_cnt);
   DBUG_ASSERT(rli->pending_jobs == 0);
   DBUG_ASSERT(rli->mts_pending_jobs_size == 0);

=== modified file 'sql/rpl_slave.h'
--- a/sql/rpl_slave.h 2011-06-09 15:27:47 +0000
+++ b/sql/rpl_slave.h 2011-06-27 17:31:45 +0000
@@ -55,6 +55,9 @@ typedef enum { SLAVE_THD_IO, SLAVE_THD_S
 #define MAX_SLAVE_ERROR 2000
+#define MTS_WORKER_UNDEF ((ulong) -1)
+#define MTS_MAX_WORKERS 1024
+
 // Forward declarations
 class Relay_log_info;
 class Master_info;

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc 2011-06-25 14:14:24 +0000
+++ b/sql/sys_vars.cc 2011-06-27 17:31:45 +0000
@@ -3361,7 +3361,7 @@ static Sys_var_ulong Sys_slave_parallel_
        "slave_parallel_workers",
        "Number of worker threads for executing events in parallel ",
        GLOBAL_VAR(opt_mts_slave_parallel_workers), CMD_LINE(REQUIRED_ARG),
-       VALID_RANGE(0, 1024), DEFAULT(0), BLOCK_SIZE(1));
+       VALID_RANGE(0, MTS_MAX_WORKERS), DEFAULT(0), BLOCK_SIZE(1));
 static Sys_var_ulonglong Sys_mts_pending_jobs_size_max(
        "slave_pending_jobs_size_max",

--===============0566371524==
MIME-Version: 1.0
Content-Type: text/bzr-bundle; charset="us-ascii"; name="bzr/andrei.elkin@stripped"
Content-Transfer-Encoding: 7bit
Content-Disposition: inline

# Bazaar merge directive format 2 (Bazaar 0.90)
# revision_id: andrei.elkin@stripped
# target_branch: file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-\
#   mr-wl5569/
# testament_sha1: a2feefefc8b105c3b1b9056d1bb51b4390b690c9
# timestamp: 2011-06-27 20:31:51 +0300
# source_branch: file:///home/andrei/MySQL/BZR/2a-23May/mysql-trunk/
# base_revision_id: alfranio.correia@stripped\
#   u4ejo1tmrjkd6zmi
--===============0566371524==--