From: Andrei Elkin Date: June 27 2011 5:13pm Subject: bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3322) WL#5569 List-Archive: http://lists.mysql.com/commits/139929 Message-Id: <201106271713.p5RHDgY7022033@mysql1000.dsl.inet.fi> MIME-Version: 1.0 Content-Type: multipart/mixed; boundary="===============1731202208==" --===============1731202208== MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Content-Disposition: inline #At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped 3322 Andrei Elkin 2011-06-27 wl#5569 MTS Cleanup and addressing sporadic rpl_temp_table_mix_row failure in post-execution mtr.check_testcase(). The check of the test failure was caused by faulty optimization in avoiding to migrate temporary tables from Coordinator to Workers in case of rows-event assignement. while it's correct with the homogenous rows-event only load, the mixture can fail. Fixed with removing the optimization so map_db_to_worker() always relocates which is somewhat suboptimal and should be improved in future. @ mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test Adding slave synchronization. @ sql/log_event.cc cleanup to move circular_buffer releated definitions into rpl_rli_pdb that is specialized on objects dealing with Worker, its assignement etc. improving comments; also instead of former separate flag indicating a T-event requires post-scheduling synchronization with the Worker is turned into a bit of existing Log_event::flags which also avoids ungliness of #if/#endif:s. @ sql/log_event.h instead of former separate flag indicating a T-event requires post-scheduling synchronization with the Worker is turned into a bit of existing Log_event::flags; @ sql/rpl_rli.cc cleanup: renaming. @ sql/rpl_rli.h cleanup: renaming, more comments. The former mts_wqs_overrun is converted into two: the statistics parameter mts_wq_overrun_cnt and the internal control parameter mts_wq_excess. @ sql/rpl_rli_pdb.cc Included rpl_slave.h that holds two necessary declarations; Cleanup: accepting circular_buffer related definitions migrated from log_event, improved comments, renaming, removing dead code @ sql/rpl_rli_pdb.h Cleanup: renaming and more comments are added. @ sql/rpl_slave.cc Augmenting print-out of statistics at the end of MTS session; cleanup: renaming. @ sql/rpl_slave.h Introducing two constants to define range of worker_id domain and a magic value of undefined worker. @ sql/sys_vars.cc replacing a literal int value with a symbilic constant. modified: mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test sql/log_event.cc sql/log_event.h sql/rpl_rli.cc sql/rpl_rli.h sql/rpl_rli_pdb.cc sql/rpl_rli_pdb.h sql/rpl_slave.cc sql/rpl_slave.h sql/sys_vars.cc === modified file 'mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test' --- a/mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test 2010-12-19 17:15:12 +0000 +++ b/mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test 2011-06-27 17:13:27 +0000 @@ -207,4 +207,7 @@ source include/show_binlog_events.inc; --echo connection master; DROP TABLE t1; + +sync_slave_with_master; + --source include/rpl_end.inc === modified file 'sql/log_event.cc' --- a/sql/log_event.cc 2011-06-24 12:38:19 +0000 +++ b/sql/log_event.cc 2011-06-27 17:13:27 +0000 @@ -672,9 +672,6 @@ Log_event::Log_event(enum_event_cache_ty :temp_buf(0), exec_time(0), flags(0), event_cache_type(cache_type_arg), event_logging_type(logging_type_arg), crc(0), thd(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) -#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - , m_mts_event_isolated_group(FALSE) -#endif { server_id= ::server_id; /* @@ -697,9 +694,6 @@ Log_event::Log_event(const char* buf, event_cache_type(EVENT_INVALID_CACHE), event_logging_type(EVENT_INVALID_LOGGING), crc(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF) -#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) - , m_mts_event_isolated_group(FALSE) -#endif { #ifndef MYSQL_CLIENT thd = 0; @@ -2368,12 +2362,8 @@ bool Log_event::contains_partition_info( } /** - General hashing function to compute the id of an applier for - the current event. - At computing the id few rules apply depending on partitioning properties - that the event instance can feature. - - Let's call the properties through the following legends: + The method maps the event to a Worker and return a pointer to it. + As a part of the group, an event belongs to one of the following types: B - beginning of a group of events (BEGIN query_log_event) g - mini-group representative event containing the partition info @@ -2382,30 +2372,33 @@ bool Log_event::contains_partition_info( (int_, rand_, user_ var:s) r - a mini-group internal "regular" event that follows its g-parent (Delete, Update, Write -rows) - S - sequentially applied event (may not be a part of any group). - Events of this type are determined via @c mts_sequential_exec() - earlier and don't cause calling this method . - T - terminator of the group (XID, COMMIT, ROLLBACK) - - Only `g' case really computes the assigned Worker id which must - be memorized by the caller and is available through @c rli argument. - For instance DUW-rows events are mapped to a Worker previously chosen - at assigning of their Table-map parent g-event. - In `B' case the assigned Worker is NULL to indicate the Coordinator will - postpone scheduling until a following `g' event decides on a Worker. + T - terminator of the group (XID, COMMIT, ROLLBACK, auto-commit query) + + Only the first g-type event computes the assigned Worker which once + is determined remains to be for the rest of the group. + That is the g-type event solely carries partitioning info. + For B-type the assigned Worker is NULL to indicate Coordinator + has not yet decided. The same applies to p-type. - A group can consist of multiple events still without explict B - event. This is a case of old master binlog or few corner-cases of - the current master version (todo: to fix). Such group structure is - supposed to be {{p_i},g} that is it ends with the first not p-event. - Such g-event is marked with set_mts_event_ends_group(). + Notice, these is a special group consisting of optionally multiple p-events + terminating with a g-event. + Such case is caused by old master binlog and a few corner-cases of + the current master version (todo: to fix). + + In case of the event accesses more than OVER_MAX_DBS the method + has to ensure sure previously assigned groups to all other workers are + done. - @note The function can update APH, CGAP, GAQ objects. + + @note The function can update APH (through map_db_to_worker()), GAQ objects + and relocate some temporary tables from Coordinator's list into + involved entries of APH. + There's few memory allocations commented where to be freed. - @return a pointer to the Worker stuct or NULL. + @return a pointer to the Worker struct or NULL. */ -Slave_worker *Log_event::get_slave_worker_id(Relay_log_info *rli) +Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli) { Slave_job_group g, *ptr_g; bool is_b_event; @@ -2424,7 +2417,7 @@ Slave_worker *Log_event::get_slave_worke (rli->gaq->empty() || ((Slave_job_group *) dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))-> - worker_id != (ulong) -1))) + worker_id != MTS_WORKER_UNDEF))) { ulong gaq_idx; rli->mts_total_groups++; @@ -2434,7 +2427,7 @@ Slave_worker *Log_event::get_slave_worke g.group_master_log_pos= g.group_relay_log_pos= 0; g.group_master_log_name= NULL; // todo: remove g.group_relay_log_name= NULL; - g.worker_id= (ulong) -1; + g.worker_id= MTS_WORKER_UNDEF; g.total_seqno= rli->mts_total_groups; g.checkpoint_log_name= NULL; g.checkpoint_log_pos= 0; @@ -2446,11 +2439,11 @@ Slave_worker *Log_event::get_slave_worke // the last occupied GAQ's array index gaq_idx= rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g); - DBUG_ASSERT(gaq_idx != (ulong) -1 && gaq_idx < rli->gaq->s); + DBUG_ASSERT(gaq_idx != MTS_WORKER_UNDEF && gaq_idx < rli->gaq->size); DBUG_ASSERT(((Slave_job_group *) dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))-> group_relay_log_name == NULL); - DBUG_ASSERT(rli->gaq->assigned_group_index != (ulong) -1); // gaq must have room + DBUG_ASSERT(rli->gaq->assigned_group_index != MTS_WORKER_UNDEF); // gaq must have room DBUG_ASSERT(rli->last_assigned_worker == NULL); if (is_b_event) @@ -2485,7 +2478,9 @@ Slave_worker *Log_event::get_slave_worke if (!ret_worker) ret_worker= *(Slave_worker**) dynamic_array_ptr(&rli->workers, 0); (void) wait_for_workers_to_finish(rli, ret_worker); - + /* + this marking is transferred further into T-event of the current group. + */ rli->curr_group_isolated= TRUE; } @@ -2494,10 +2489,14 @@ Slave_worker *Log_event::get_slave_worke char **ref_cur_db= it.ref(); if (!(ret_worker= - get_slave_worker(*ref_cur_db, rli, + map_db_to_worker(*ref_cur_db, rli, &mts_assigned_partitions[i], - // only rows-events do not need temporary tables - get_type_code() != TABLE_MAP_EVENT, ret_worker))) + /* + todo: optimize it. Although pure + rows- event load in insensetive to the flag value + */ + TRUE, + ret_worker))) { llstr(rli->get_event_relay_log_pos(), llbuff); rli->report(ERROR_LEVEL, ER_MTS_CANT_PARALLEL, @@ -2518,7 +2517,7 @@ Slave_worker *Log_event::get_slave_worke if ((ptr_g= ((Slave_job_group *) dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index)))->worker_id - == (ulong) -1) + == MTS_WORKER_UNDEF) { ptr_g->worker_id= ret_worker->id; @@ -2582,20 +2581,14 @@ Slave_worker *Log_event::get_slave_worke } } - // the group terminal event: - // Commit, Xid, a DDL query or dml query of B-less group. + // T-event: Commit, Xid, a DDL query or dml query of B-less group. if (ends_group() || !rli->curr_group_seen_begin) { // index of GAQ that this terminal event belongs to mts_group_cnt= rli->gaq->assigned_group_index; - /* - special marking for T event of a group containing over-max db:s event - including {p,g} B-less group. - */ + rli->mts_group_status= Relay_log_info::MTS_END_GROUP; if (rli->curr_group_isolated) mts_do_isolate_group(); - rli->mts_group_status= Relay_log_info::MTS_END_GROUP; - ptr_g= (Slave_job_group *) dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index); @@ -2650,7 +2643,7 @@ Slave_worker *Log_event::get_slave_worke ret_worker->checkpoint_notified= TRUE; } ptr_g->checkpoint_seqno= rli->checkpoint_seqno; - ptr_g->ts= when + (time_t) exec_time; + ptr_g->ts= when + (time_t) exec_time; // Seconds_behind_master related rli->checkpoint_seqno++; // reclaiming resources allocated during the group scheduling @@ -2660,179 +2653,6 @@ Slave_worker *Log_event::get_slave_worke return ret_worker; } -// returns the next available! (TODO: incompatible to circurla_buff method!!!) -static int en_queue(Slave_jobs_queue *jobs, Slave_job_item *item) -{ - if (jobs->a == jobs->s) - { - DBUG_ASSERT(jobs->a == jobs->Q.elements); - return -1; - } - - // store - - set_dynamic(&jobs->Q, (uchar*) item, jobs->a); - - // pre-boundary cond - if (jobs->e == jobs->s) - jobs->e= jobs->a; - - jobs->a= (jobs->a + 1) % jobs->s; - jobs->len++; - - // post-boundary cond - if (jobs->a == jobs->e) - jobs->a= jobs->s; - DBUG_ASSERT(jobs->a == jobs->e || - jobs->len == (jobs->a >= jobs->e) ? - (jobs->a - jobs->e) : (jobs->s + jobs->a - jobs->e)); - return jobs->a; -} - -/** - return the value of @c data member of the head of the queue. -*/ -static void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret) -{ - if (jobs->e == jobs->s) - { - DBUG_ASSERT(jobs->len == 0); - ret->data= NULL; // todo: move to caller - return NULL; - } - get_dynamic(&jobs->Q, (uchar*) ret, jobs->e); - - DBUG_ASSERT(ret->data); // todo: move to caller - - return ret; -} - - -/** - return a job item through a struct which point is supplied via argument. -*/ -Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret) -{ - if (jobs->e == jobs->s) - { - DBUG_ASSERT(jobs->len == 0); - return NULL; - } - get_dynamic(&jobs->Q, (uchar*) ret, jobs->e); - jobs->len--; - - // pre boundary cond - if (jobs->a == jobs->s) - jobs->a= jobs->e; - jobs->e= (jobs->e + 1) % jobs->s; - - // post boundary cond - if (jobs->a == jobs->e) - jobs->e= jobs->s; - - DBUG_ASSERT(jobs->e == jobs->s || - (jobs->len == (jobs->a >= jobs->e)? (jobs->a - jobs->e) : - (jobs->s + jobs->a - jobs->e))); - - return ret; -} - -void append_item_to_jobs(slave_job_item *job_item, - Slave_worker *w, Relay_log_info *rli) -{ - THD *thd= rli->info_thd; - int ret; - ulong ev_size= ((Log_event*) (job_item->data))->data_written; - ulonglong new_pend_size; - - DBUG_ASSERT(thd == current_thd); - thd_proc_info(thd, "Feeding an event to a worker thread"); - - mysql_mutex_lock(&rli->pending_jobs_lock); - new_pend_size= rli->mts_pending_jobs_size + ev_size; - // C waits basing on *data* sizes in the queues - while (new_pend_size > rli->mts_pending_jobs_size_max) - { - const char *old_msg; - const char info_format[]= - "Waiting for Slave Workers to free pending events, requested size %lu"; - char wait_info[sizeof(info_format) + 4*sizeof(new_pend_size)]; - - sprintf(wait_info, info_format, new_pend_size); - rli->mts_wqs_oversize= TRUE; - rli->wait_jobs++; // waiting due to the total size - old_msg= thd->enter_cond(&rli->pending_jobs_cond, &rli->pending_jobs_lock, - wait_info); - mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock); - thd->exit_cond(old_msg); - if (thd->killed) - return; - - mysql_mutex_lock(&rli->pending_jobs_lock); - - new_pend_size= rli->mts_pending_jobs_size + ev_size; - } - rli->pending_jobs++; - rli->mts_pending_jobs_size= new_pend_size; - rli->stmt_jobs++; - - mysql_mutex_unlock(&rli->pending_jobs_lock); - - // sleep while all queue lengths are gt Underrun - // sleep time lasts the longer the further WQ:s shift to Overrun - // Workers report their U,O status - - if (rli->mts_wqs_underrun_w_id != (ulong) -1) - { - // todo: experiment with weight to get a good approximation formula - ulong nap_weight= rli->mts_wqs_overrun + 1; - my_sleep(nap_weight * rli->mts_coordinator_basic_nap); - rli->mts_wqs_underrun_cnt++; - } - - ret= -1; - - mysql_mutex_lock(&w->jobs_lock); - - // possible WQ overfill - while (w->running_status == Slave_worker::RUNNING && !thd->killed && - (ret= en_queue(&w->jobs, job_item)) == -1) - { - const char *old_msg; - const char info_format[]= - "Waiting for Slave Worker %d queue: max len %lu, actual len %lu"; - char wait_info[sizeof(info_format) + 4*sizeof(w->id) + - 4*sizeof(w->jobs.s) + 4*sizeof(w->jobs.len)]; - - sprintf(wait_info, info_format, w->id, w->jobs.s, w->jobs.len); - old_msg= thd->enter_cond(&w->jobs_cond, &w->jobs_lock, wait_info); - w->jobs.overfill= TRUE; - w->jobs.waited_overfill++; - rli->mts_wqs_overfill_cnt++; - mysql_cond_wait(&w->jobs_cond, &w->jobs_lock); - thd->exit_cond(old_msg); - - mysql_mutex_lock(&w->jobs_lock); - } - if (ret != -1) - { - w->curr_jobs++; - if (w->jobs.len == 1) - mysql_cond_signal(&w->jobs_cond); - - mysql_mutex_unlock(&w->jobs_lock); - } - else - { - mysql_mutex_unlock(&w->jobs_lock); - - mysql_mutex_lock(&rli->pending_jobs_lock); - rli->pending_jobs--; // roll back of the prev incr - rli->mts_pending_jobs_size -= ev_size; - mysql_mutex_unlock(&rli->pending_jobs_lock); - } -} - /** Scheduling event to execute in parallel or execute it directly. In MTS case the event gets associated with either Coordinator or a @@ -2942,7 +2762,7 @@ int Log_event::apply_event(Relay_log_inf c_rli->mts_group_status= Relay_log_info::MTS_IN_GROUP; worker= (Relay_log_info*) - (c_rli->last_assigned_worker= get_slave_worker_id(c_rli)); + (c_rli->last_assigned_worker= get_slave_worker(c_rli)); err: if (thd->is_error()) @@ -2965,217 +2785,6 @@ err: 0 : -1); } - -/** - Worker's routine to wait for a new assignement in its - private queue. - - @return NULL failure or - a-pointer to an item. -*/ -struct slave_job_item* pop_jobs_item(Slave_worker *w, Slave_job_item *job_item) -{ - THD *thd= w->info_thd; - - mysql_mutex_lock(&w->jobs_lock); - - while (!job_item->data && !thd->killed && - w->running_status == Slave_worker::RUNNING) - { - const char *old_msg; - - head_queue(&w->jobs, job_item); - if (job_item->data == NULL) - { - w->wait_jobs++; - old_msg= thd->enter_cond(&w->jobs_cond, &w->jobs_lock, - "Waiting for an event from sql thread"); - mysql_cond_wait(&w->jobs_cond, &w->jobs_lock); - thd->exit_cond(old_msg); - mysql_mutex_lock(&w->jobs_lock); - } - } - if (job_item->data) - w->curr_jobs--; - - mysql_mutex_unlock(&w->jobs_lock); - - thd_proc_info(w->info_thd, "Executing event"); - return job_item; -} - - -/** - MTS worker main routine. - The worker thread waits for an event, execute it, fixes statistics counters. - - @note the function maintains CGEP and modifies APH, and causes - modification of GAQ. - - @return 0 success - -1 got killed or an error happened during appying -*/ -int slave_worker_exec_job(Slave_worker *w, Relay_log_info *rli) -{ - int error= 0; - struct slave_job_item item= {NULL}, *job_item= &item; - THD *thd= w->info_thd; - Log_event *ev= NULL; - bool part_event= FALSE; - - DBUG_ENTER("slave_worker_exec_job"); - - job_item= pop_jobs_item(w, job_item); - if (thd->killed || w->running_status != Slave_worker::RUNNING) - { - // de-queueing and decrement counters is in the caller's exit branch - error= -1; - goto err; - } - ev= static_cast(job_item->data); - thd->server_id = ev->server_id; - thd->set_time(); - thd->lex->current_select= 0; - if (!ev->when) - ev->when= my_time(0); - ev->thd= thd; // todo: assert because up to this point, ev->thd == 0 - - DBUG_PRINT("slave_worker_exec_job:", ("W_%lu <- job item: %p data: %p thd: %p", w->id, job_item, ev, thd)); - - if (ev->starts_group()) - { - w->curr_group_seen_begin= TRUE; // The current group is started with B-event - } - else - { - if ((part_event= ev->contains_partition_info())) - { - uint num_dbs= ev->mts_number_dbs(); - DYNAMIC_ARRAY *ep= &w->curr_group_exec_parts; - - if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS) - num_dbs= 1; - - DBUG_ASSERT(num_dbs > 0); - - for (uint k= 0; k < num_dbs; k++) - { - bool found= FALSE; - - for (uint i= 0; i < ep->elements && !found; i++) - { - found= - *((db_worker_hash_entry **) dynamic_array_ptr(ep, i)) == - ev->mts_assigned_partitions[k]; - } - if (!found) - { - /* - notice, can't assert - DBUG_ASSERT(ev->mts_assigned_partitions[k]->worker == w); - since entry could be marked as wanted by other worker. - */ - insert_dynamic(ep, (uchar*) &ev->mts_assigned_partitions[k]); - } - } - } - } - w->set_future_event_relay_log_pos(ev->future_event_relay_log_pos); - error= ev->do_apply_event_worker(w); - if (ev->ends_group() || (!w->curr_group_seen_begin && - /* - p-events of B/T-less {p,g} group (see - legends of Log_event::get_slave_worker) - obviously can't commit. - */ - part_event)) - { - DBUG_PRINT("slave_worker_exec_job:", - (" commits GAQ index %lu, last committed %lu", - ev->mts_group_cnt, w->last_group_done_index)); - w->slave_worker_ends_group(ev, error); /* last done sets post exec */ - } - - mysql_mutex_lock(&w->jobs_lock); - de_queue(&w->jobs, job_item); - - /* possible overfill */ - if (w->jobs.len == w->jobs.s - 1 && w->jobs.overfill == TRUE) - { - w->jobs.overfill= FALSE; - // todo: w->hungry_cnt++; - mysql_cond_signal(&w->jobs_cond); - } - mysql_mutex_unlock(&w->jobs_lock); - - /* statistics */ - - mysql_mutex_lock(&rli->pending_jobs_lock); - rli->pending_jobs--; - rli->mts_pending_jobs_size -= ev->data_written; - DBUG_ASSERT(rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max); - - // underrun - if ((rli->mts_worker_underrun_level * w->jobs.s) / 100 > w->jobs.len) - { - rli-> mts_wqs_underrun_w_id= w->id; - // todo: w->underrun_cnt++; - } else if (rli->mts_wqs_underrun_w_id == w->id) - { - rli->mts_wqs_underrun_w_id= (ulong) -1; - } - - // overrun exploits the underrun level param - if (((100 - rli->mts_worker_underrun_level) * w->jobs.s) / 100 < w->jobs.len) - { - rli->mts_wqs_overrun++; - w->wq_overrun_set= TRUE; - // todo: w->underrun_cnt++; - } - else if (w->wq_overrun_set == TRUE) - { - rli->mts_wqs_overrun--; - w->wq_overrun_set= FALSE; - } - - DBUG_ASSERT(rli->mts_wqs_overrun >= 0); - - /* coordinator can be waiting */ - if (rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max && - rli->mts_wqs_oversize) // TODO: unit/general test wqs_oversize - { - rli->mts_wqs_oversize= FALSE; - mysql_cond_signal(&rli->pending_jobs_cond); - } - - mysql_mutex_unlock(&rli->pending_jobs_lock); - - w->stmt_jobs++; - -err: - if (error) - { - sql_print_information("Worker %lu is exiting: killed %i, error %i, " - "running_status %d", - w->id, thd->killed, thd->is_error(), - w->running_status); - w->slave_worker_ends_group(ev, error); - } - - // rows_query_log_event is deleted as a part of the statement cleanup - - // todo: sync_slave_with_master fails when my_sleep(1000) is put here - - if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT) - { - w->last_event= ev; - delete ev; - } - - - DBUG_RETURN(error); -} - #endif /************************************************************************** === modified file 'sql/log_event.h' --- a/sql/log_event.h 2011-06-14 09:27:38 +0000 +++ b/sql/log_event.h 2011-06-27 17:13:27 +0000 @@ -536,6 +536,15 @@ struct sql_ex_info */ #define LOG_EVENT_NO_FILTER_F 0x100 +/** + MTS: group of events can be marked to force its execution + in isolation from any other Workers. + Typically that is done for a transaction that contains + a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS db:s. + The flag is set ON for an event that terminates its group. +*/ +#define LOG_EVENT_MTS_ISOLATE_F 0x200 + /** @def OPTIONS_WRITTEN_TO_BIN_LOG @@ -1330,7 +1339,7 @@ public: to be assigned worker; M is the max index of the worker pool. */ - Slave_worker *get_slave_worker_id(Relay_log_info *rli); + Slave_worker *get_slave_worker(Relay_log_info *rli); /* The method returns a list of updated by the event databases. @@ -1361,12 +1370,14 @@ public: get_type_code() == QUERY_EVENT || get_type_code() == EXEC_LOAD_EVENT || get_type_code() == EXECUTE_LOAD_QUERY_EVENT); - m_mts_event_isolated_group= TRUE; + flags |= LOG_EVENT_MTS_ISOLATE_F; } + /* - Verifying whether event is marked to execute in isolation. + Verifying whether the terminal event of a group is marked to + execute in isolation. */ - virtual bool mts_is_group_isolated() { return m_mts_event_isolated_group; } + bool mts_is_group_isolated() { return flags & LOG_EVENT_MTS_ISOLATE_F; } /** Apply the event to the database. @@ -1501,8 +1512,6 @@ protected: non-zero. The caller shall decrease the counter by one. */ virtual enum_skip_reason do_shall_skip(Relay_log_info *rli); - - bool m_mts_event_isolated_group; #endif }; === modified file 'sql/rpl_rli.cc' --- a/sql/rpl_rli.cc 2011-06-22 17:54:23 +0000 +++ b/sql/rpl_rli.cc 2011-06-27 17:13:27 +0000 @@ -112,8 +112,8 @@ void Relay_log_info::init_workers(ulong Parallel slave parameters initialization is done regardless whether the feature is or going to be active or not. */ - trans_jobs= stmt_jobs= pending_jobs= wait_jobs= 0; - mts_wqs_underrun_cnt= mts_wqs_overfill_cnt= 0; + trans_jobs= stmt_jobs= pending_jobs= wq_size_waits= 0; + mts_wq_excess= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0; my_init_dynamic_array(&workers, sizeof(Slave_worker *), n_workers, 4); my_atomic_rwlock_init(&slave_open_temp_tables_lock); === modified file 'sql/rpl_rli.h' --- a/sql/rpl_rli.h 2011-06-24 12:38:19 +0000 +++ b/sql/rpl_rli.h 2011-06-27 17:13:27 +0000 @@ -432,34 +432,42 @@ public: time_t last_event_start_time; /* - WL#5569 MTS-II + WL#5569 MTS + + legends: + C - Coordinator; + W - Worker; + WQ - Worker Query containing event assignments */ DYNAMIC_ARRAY workers; // number's is determined by global slave_parallel_workers volatile ulong pending_jobs; - ulong trans_jobs, wait_jobs, stmt_jobs; // wait_jobs - waiting times due to the total size + ulong stmt_jobs; // statistics: number of events (statements) processed + ulong trans_jobs; // statistics: number of groups (transactions) processed + ulong wq_size_waits; // number of times C goes to sleep due to WQ:s oversize mysql_mutex_t pending_jobs_lock; mysql_cond_t pending_jobs_cond; ulong mts_slave_worker_queue_len_max; ulonglong mts_pending_jobs_size; // actual mem usage by WQ:s ulonglong mts_pending_jobs_size_max; // the max forcing to wait by C - bool mts_wqs_oversize; // C raises flag to wait some memory's released + bool mts_wq_oversize; // C raises flag to wait some memory's released Slave_worker *last_assigned_worker; // a hint to partitioning func for some events Slave_committed_queue *gaq; DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP DYNAMIC_ARRAY curr_group_da; // deferred array to hold partition-info-free events bool curr_group_seen_begin; // current group started with B-event or not bool curr_group_isolated; // current group requires execution in isolation - volatile ulong mts_wqs_underrun_w_id; // Id of a Worker whose queue is getting empty - volatile long mts_wqs_overrun; // W to incr and decr - ulong mts_wqs_underrun_cnt; // Coord goes to sleep when senses Workers are content - ulong mts_wqs_overfill_cnt; // Coord waits if a W's queue is full - long mts_worker_underrun_level; // percent of WQ size at which Worker claims hungry + volatile ulong mts_wq_underrun_w_id; // Id of a Worker whose queue is getting empty + volatile long mts_wq_excess; // excessive overrun counter; when W increments and decrements it it also marks updates its own wq_overrun_set + volatile ulong mts_wq_overrun_cnt; // statistics of all mts_wq_excess increments + ulong mts_wq_no_underrun_cnt; // counts times of C goes to sleep when W:s are filled + ulong mts_wq_overfill_cnt; // counts C waits when a W's queue is full + long mts_worker_underrun_level; // percent of WQ size at which W is considered hungry ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL. ulonglong mts_total_groups; // total event groups distributed in current session ulong opt_slave_parallel_workers; // auxiliary cache for ::opt_slave_parallel_workers ulong slave_parallel_workers; // the one slave session time number of workers - ulong recovery_parallel_workers; // number of workers while recovering. + ulong recovery_parallel_workers; // number of workers while recovering /* A sorted array of Worker current assignements number to provide === modified file 'sql/rpl_rli_pdb.cc' --- a/sql/rpl_rli_pdb.cc 2011-06-24 12:38:19 +0000 +++ b/sql/rpl_rli_pdb.cc 2011-06-27 17:13:27 +0000 @@ -2,6 +2,7 @@ #include "sql_priv.h" #include "unireg.h" #include "rpl_rli_pdb.h" +#include "rpl_slave.h" #include "sql_string.h" #include @@ -251,6 +252,8 @@ bool Slave_worker::commit_positions(Log_ // extract an updated relay-log name to store in Worker's rli. if (ptr_g->group_relay_log_name) { + DBUG_ASSERT(strlen(ptr_g->group_relay_log_name) + 1 + <= sizeof(group_relay_log_name)); strmake(group_relay_log_name, ptr_g->group_relay_log_name, sizeof(group_relay_log_name) - 1); } @@ -516,7 +519,7 @@ static void move_temp_tables_to_entry(TH @return the pointer to a Worker struct */ -Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli, +Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli, db_worker_hash_entry **ptr_entry, bool need_temp_tables, Slave_worker *last_worker) { @@ -754,17 +757,16 @@ Slave_worker *get_least_occupied_worker( } /** - Deallocative routine that makes few things in opposite to - @c get_slave_worker(). - - Affected by the being committed group APH tuples are updated. - @c last_group_done_index member is set to the arg value. - + Deallocation routine to cancel out few effects of + @c map_db_to_worker(). + Involved into processing of the group APH tuples are updated. + @c last_group_done_index member is set to the GAQ index of + the current group. CGEP the Worker partition cache is cleaned up. - TODO: reclaim space if the actual size exceeds the limit. + @param ev a pointer to Log_event + @param error error code after processing the event by caller. */ - void Slave_worker::slave_worker_ends_group(Log_event* ev, int error) { DBUG_ENTER("Slave_worker::slave_worker_ends_group"); @@ -776,20 +778,10 @@ void Slave_worker::slave_worker_ends_gro (Slave_job_group *) dynamic_array_ptr(&c_rli->gaq->Q, gaq_idx); // first ever group must have relay log name - DBUG_ASSERT(last_group_done_index != c_rli->gaq->s || + DBUG_ASSERT(last_group_done_index != c_rli->gaq->size || ptr_g->group_relay_log_name != NULL); DBUG_ASSERT(ptr_g->worker_id == id); - if (ptr_g->group_relay_log_name != NULL) - { - // memorizing a new relay-log file name - - DBUG_ASSERT(strlen(ptr_g->group_relay_log_name) + 1 - <= sizeof(group_relay_log_name)); - - strcpy(group_relay_log_name, ptr_g->group_relay_log_name); - } - if (!(ev->get_type_code() == XID_EVENT && is_transactional())) { commit_positions(ev, ptr_g); @@ -800,6 +792,7 @@ void Slave_worker::slave_worker_ends_gro ptr_g->group_master_log_pos= group_master_log_pos; ptr_g->group_relay_log_pos= group_relay_log_pos; + ptr_g->done= 1; // GAQ index is available to C now last_group_done_index= gaq_idx; @@ -888,29 +881,29 @@ void Slave_worker::slave_worker_ends_gro ulong circular_buffer_queue::de_queue(uchar *val) { ulong ret; - if (e == s) + if (enter == size) { DBUG_ASSERT(len == 0); return (ulong) -1; } - ret= e; - get_dynamic(&Q, val, e); + ret= enter; + get_dynamic(&Q, val, enter); len--; // pre boundary cond - if (a == s) - a= e; - e= (e + 1) % s; + if (avail == size) + avail= enter; + enter= (enter + 1) % size; // post boundary cond - if (a == e) - e= s; + if (avail == enter) + enter= size; - DBUG_ASSERT(e == s || - (len == (a >= e)? (a - e) : - (s + a - e))); - DBUG_ASSERT(a != e); + DBUG_ASSERT(enter == size || + (len == (avail >= enter)? (avail - enter) : + (size + avail - enter))); + DBUG_ASSERT(avail != enter); return ret; } @@ -920,26 +913,26 @@ ulong circular_buffer_queue::de_queue(uc */ ulong circular_buffer_queue::de_tail(uchar *val) { - if (e == s) + if (enter == size) { DBUG_ASSERT(len == 0); return (ulong) -1; } - a= (e + len - 1) % s; - get_dynamic(&Q, val, a); + avail= (enter + len - 1) % size; + get_dynamic(&Q, val, avail); len--; // post boundary cond - if (a == e) - e= s; + if (avail == enter) + enter= size; - DBUG_ASSERT(e == s || - (len == (a >= e)? (a - e) : - (s + a - e))); - DBUG_ASSERT(a != e); + DBUG_ASSERT(enter == size || + (len == (avail >= enter)? (avail - enter) : + (size + avail - enter))); + DBUG_ASSERT(avail != enter); - return a; + return avail; } /** @return the used index at success or -1 when queue is full @@ -947,33 +940,33 @@ ulong circular_buffer_queue::de_tail(uch ulong circular_buffer_queue::en_queue(void *item) { ulong ret; - if (a == s) + if (avail == size) { - DBUG_ASSERT(a == Q.elements); + DBUG_ASSERT(avail == Q.elements); return (ulong) -1; } // store - ret= a; - set_dynamic(&Q, (uchar*) item, a); + ret= avail; + set_dynamic(&Q, (uchar*) item, avail); // pre-boundary cond - if (e == s) - e= a; + if (enter == size) + enter= avail; - a= (a + 1) % s; + avail= (avail + 1) % size; len++; // post-boundary cond - if (a == e) - a= s; + if (avail == enter) + avail= size; - DBUG_ASSERT(a == e || - len == (a >= e) ? - (a - e) : (s + a - e)); - DBUG_ASSERT(a != e); + DBUG_ASSERT(avail == enter || + len == (avail >= enter) ? + (avail - enter) : (size + avail - enter)); + DBUG_ASSERT(avail != enter); return ret; } @@ -981,13 +974,13 @@ ulong circular_buffer_queue::en_queue(vo void* circular_buffer_queue::head_queue() { uchar *ret= NULL; - if (e == s) + if (enter == size) { DBUG_ASSERT(len == 0); } else { - get_dynamic(&Q, (uchar*) ret, e); + get_dynamic(&Q, (uchar*) ret, enter); } return (void*) ret; } @@ -1005,16 +998,16 @@ void* circular_buffer_queue::head_queue( */ bool circular_buffer_queue::gt(ulong i, ulong k) { - DBUG_ASSERT(i < s && k < s); - DBUG_ASSERT(a != e); + DBUG_ASSERT(i < size && k < size); + DBUG_ASSERT(avail != enter); - if (i >= e) - if (k >= e) + if (i >= enter) + if (k >= enter) return i > k; else return FALSE; else - if (k >= e) + if (k >= enter) return TRUE; else return i > k; @@ -1045,7 +1038,7 @@ ulong Slave_committed_queue::move_queue_ { ulong i, cnt= 0; - for (i= e; i != a && !empty();) + for (i= enter; i != avail && !empty();) { Slave_worker *w_i; Slave_job_group *ptr_g, g; @@ -1059,8 +1052,12 @@ ulong Slave_committed_queue::move_queue_ The current job has not been processed or it was not even assigned, this means there is a gap. */ - if (ptr_g->worker_id == (ulong) -1 || !ptr_g->done) + if (ptr_g->worker_id == MTS_WORKER_UNDEF || !ptr_g->done) break; /* gap at i'th */ + + /* Worker-id domain guard */ + compile_time_assert(MTS_WORKER_UNDEF > MTS_MAX_WORKERS); + get_dynamic(ws, (uchar *) &w_i, ptr_g->worker_id); /* @@ -1114,7 +1111,7 @@ ulong Slave_committed_queue::move_queue_ set_dynamic(&last_done, &ptr_g->total_seqno, w_i->id); cnt++; - i= (i + 1) % s; + i= (i + 1) % size; } return cnt; @@ -1128,7 +1125,7 @@ ulong Slave_committed_queue::move_queue_ void Slave_committed_queue::free_dynamic_items() { ulong i; - for (i= e; i != a && !empty(); i= (i + 1) % s) + for (i= enter; i != avail && !empty(); i= (i + 1) % size) { Slave_job_group *ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i); if (ptr_g->group_relay_log_name) @@ -1246,3 +1243,407 @@ int wait_for_workers_to_finish(Relay_log DBUG_RETURN(ret); } + + +// returns the next available! (TODO: incompatible to circurla_buff method!!!) +static int en_queue(Slave_jobs_queue *jobs, Slave_job_item *item) +{ + if (jobs->avail == jobs->size) + { + DBUG_ASSERT(jobs->avail == jobs->Q.elements); + return -1; + } + + // store + + set_dynamic(&jobs->Q, (uchar*) item, jobs->avail); + + // pre-boundary cond + if (jobs->enter == jobs->size) + jobs->enter= jobs->avail; + + jobs->avail= (jobs->avail + 1) % jobs->size; + jobs->len++; + + // post-boundary cond + if (jobs->avail == jobs->enter) + jobs->avail= jobs->size; + DBUG_ASSERT(jobs->avail == jobs->enter || + jobs->len == (jobs->avail >= jobs->enter) ? + (jobs->avail - jobs->enter) : (jobs->size + jobs->avail - jobs->enter)); + return jobs->avail; +} + +/** + return the value of @c data member of the head of the queue. +*/ +static void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret) +{ + if (jobs->enter == jobs->size) + { + DBUG_ASSERT(jobs->len == 0); + ret->data= NULL; // todo: move to caller + return NULL; + } + get_dynamic(&jobs->Q, (uchar*) ret, jobs->enter); + + DBUG_ASSERT(ret->data); // todo: move to caller + + return ret; +} + + +/** + return a job item through a struct which point is supplied via argument. +*/ +Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret) +{ + if (jobs->enter == jobs->size) + { + DBUG_ASSERT(jobs->len == 0); + return NULL; + } + get_dynamic(&jobs->Q, (uchar*) ret, jobs->enter); + jobs->len--; + + // pre boundary cond + if (jobs->avail == jobs->size) + jobs->avail= jobs->enter; + jobs->enter= (jobs->enter + 1) % jobs->size; + + // post boundary cond + if (jobs->avail == jobs->enter) + jobs->enter= jobs->size; + + DBUG_ASSERT(jobs->enter == jobs->size || + (jobs->len == (jobs->avail >= jobs->enter)? (jobs->avail - jobs->enter) : + (jobs->size + jobs->avail - jobs->enter))); + + return ret; +} + +/** + Coordinator enqueues a job item into a Worker private queue. + + @param job_item a pointer to struct carrying a reference to an event + @param worker a pointer to the assigned Worker struct + @param rli a pointer to Relay_log_info of Coordinator +*/ +void append_item_to_jobs(slave_job_item *job_item, + Slave_worker *worker, Relay_log_info *rli) +{ + THD *thd= rli->info_thd; + int ret; + ulong ev_size= ((Log_event*) (job_item->data))->data_written; + ulonglong new_pend_size; + + DBUG_ASSERT(thd == current_thd); + thd_proc_info(thd, "Feeding an event to a worker thread"); + + mysql_mutex_lock(&rli->pending_jobs_lock); + new_pend_size= rli->mts_pending_jobs_size + ev_size; + // C waits basing on *data* sizes in the queues + while (new_pend_size > rli->mts_pending_jobs_size_max) + { + const char *old_msg; + const char info_format[]= + "Waiting for Slave Workers to free pending events, requested size %lu"; + char wait_info[sizeof(info_format) + 4*sizeof(new_pend_size)]; + + sprintf(wait_info, info_format, new_pend_size); + rli->mts_wq_oversize= TRUE; + rli->wq_size_waits++; // waiting due to the total size + old_msg= thd->enter_cond(&rli->pending_jobs_cond, &rli->pending_jobs_lock, + wait_info); + mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock); + thd->exit_cond(old_msg); + if (thd->killed) + return; + + mysql_mutex_lock(&rli->pending_jobs_lock); + + new_pend_size= rli->mts_pending_jobs_size + ev_size; + } + rli->pending_jobs++; + rli->mts_pending_jobs_size= new_pend_size; + rli->stmt_jobs++; + + mysql_mutex_unlock(&rli->pending_jobs_lock); + + /* + Sleep unless there is an underrunning Worker. + */ + if (rli->mts_wq_underrun_w_id == MTS_WORKER_UNDEF) + { + // todo: experiment with weight to get a good approximation formula + // The longer Sleep lasts the bigger is excessive overrun counter. + ulong nap_weight= rli->mts_wq_excess + 1; + my_sleep(nap_weight * rli->mts_coordinator_basic_nap); + rli->mts_wq_no_underrun_cnt++; + } + + ret= -1; + + mysql_mutex_lock(&worker->jobs_lock); + + // possible WQ overfill + while (worker->running_status == Slave_worker::RUNNING && !thd->killed && + (ret= en_queue(&worker->jobs, job_item)) == -1) + { + const char *old_msg; + const char info_format[]= + "Waiting for Slave Worker %d queue: max len %lu, actual len %lu"; + char wait_info[sizeof(info_format) + 4*sizeof(worker->id) + + 4*sizeof(worker->jobs.size) + 4*sizeof(worker->jobs.len)]; + + sprintf(wait_info, info_format, worker->id, worker->jobs.size, worker->jobs.len); + old_msg= thd->enter_cond(&worker->jobs_cond, &worker->jobs_lock, wait_info); + worker->jobs.overfill= TRUE; + worker->jobs.waited_overfill++; + rli->mts_wq_overfill_cnt++; + mysql_cond_wait(&worker->jobs_cond, &worker->jobs_lock); + thd->exit_cond(old_msg); + + mysql_mutex_lock(&worker->jobs_lock); + } + if (ret != -1) + { + worker->curr_jobs++; + if (worker->jobs.len == 1) + mysql_cond_signal(&worker->jobs_cond); + + mysql_mutex_unlock(&worker->jobs_lock); + } + else + { + mysql_mutex_unlock(&worker->jobs_lock); + + mysql_mutex_lock(&rli->pending_jobs_lock); + rli->pending_jobs--; // roll back of the prev incr + rli->mts_pending_jobs_size -= ev_size; + mysql_mutex_unlock(&rli->pending_jobs_lock); + } +} + + +/** + Worker's routine to wait for a new assignement through + @c append_item_to_jobs() + + @param worker a pointer to the waiting Worker struct + @param job_item a pointer to struct carrying a reference to an event + + @return NULL failure or + a-pointer to an item. +*/ +struct slave_job_item* pop_jobs_item(Slave_worker *worker, Slave_job_item *job_item) +{ + THD *thd= worker->info_thd; + + mysql_mutex_lock(&worker->jobs_lock); + + while (!job_item->data && !thd->killed && + worker->running_status == Slave_worker::RUNNING) + { + const char *old_msg; + + head_queue(&worker->jobs, job_item); + if (job_item->data == NULL) + { + worker->wq_empty_waits++; + old_msg= thd->enter_cond(&worker->jobs_cond, &worker->jobs_lock, + "Waiting for an event from sql thread"); + mysql_cond_wait(&worker->jobs_cond, &worker->jobs_lock); + thd->exit_cond(old_msg); + mysql_mutex_lock(&worker->jobs_lock); + } + } + if (job_item->data) + worker->curr_jobs--; + + mysql_mutex_unlock(&worker->jobs_lock); + + thd_proc_info(worker->info_thd, "Executing event"); + return job_item; +} + + +/** + MTS worker main routine. + The worker thread loops in waiting for an event, executing it and + fixing statistics counters. + + @param worker a pointer to the assigned Worker struct + @param rli a pointer to Relay_log_info of Coordinator + to update statistics. + + @note the function maintains worker's CGEP and modifies APH, updates + the current group item in GAQ via @c slave_worker_ends_group(). + + @return 0 success + -1 got killed or an error happened during appying +*/ +int slave_worker_exec_job(Slave_worker *worker, Relay_log_info *rli) +{ + int error= 0; + struct slave_job_item item= {NULL}, *job_item= &item; + THD *thd= worker->info_thd; + Log_event *ev= NULL; + bool part_event= FALSE; + + DBUG_ENTER("slave_worker_exec_job"); + + job_item= pop_jobs_item(worker, job_item); + if (thd->killed || worker->running_status != Slave_worker::RUNNING) + { + // de-queueing and decrement counters is in the caller's exit branch + error= -1; + goto err; + } + ev= static_cast(job_item->data); + thd->server_id = ev->server_id; + thd->set_time(); + thd->lex->current_select= 0; + if (!ev->when) + ev->when= my_time(0); + ev->thd= thd; // todo: assert because up to this point, ev->thd == 0 + + DBUG_PRINT("slave_worker_exec_job:", ("W_%lu <- job item: %p data: %p thd: %p", worker->id, job_item, ev, thd)); + + if (ev->starts_group()) + { + worker->curr_group_seen_begin= TRUE; // The current group is started with B-event + } + else + { + if ((part_event= ev->contains_partition_info())) + { + uint num_dbs= ev->mts_number_dbs(); + DYNAMIC_ARRAY *ep= &worker->curr_group_exec_parts; + + if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS) + num_dbs= 1; + + DBUG_ASSERT(num_dbs > 0); + + for (uint k= 0; k < num_dbs; k++) + { + bool found= FALSE; + + for (uint i= 0; i < ep->elements && !found; i++) + { + found= + *((db_worker_hash_entry **) dynamic_array_ptr(ep, i)) == + ev->mts_assigned_partitions[k]; + } + if (!found) + { + /* + notice, can't assert + DBUG_ASSERT(ev->mts_assigned_partitions[k]->worker == worker); + since entry could be marked as wanted by other worker. + */ + insert_dynamic(ep, (uchar*) &ev->mts_assigned_partitions[k]); + } + } + } + } + worker->set_future_event_relay_log_pos(ev->future_event_relay_log_pos); + error= ev->do_apply_event_worker(worker); + if (ev->ends_group() || (!worker->curr_group_seen_begin && + /* + p-events of B/T-less {p,g} group (see + legends of Log_event::get_slave_worker) + obviously can't commit. + */ + part_event)) + { + DBUG_PRINT("slave_worker_exec_job:", + (" commits GAQ index %lu, last committed %lu", + ev->mts_group_cnt, worker->last_group_done_index)); + worker->slave_worker_ends_group(ev, error); /* last done sets post exec */ + } + + mysql_mutex_lock(&worker->jobs_lock); + de_queue(&worker->jobs, job_item); + + /* possible overfill */ + if (worker->jobs.len == worker->jobs.size - 1 && worker->jobs.overfill == TRUE) + { + worker->jobs.overfill= FALSE; + // todo: worker->hungry_cnt++; + mysql_cond_signal(&worker->jobs_cond); + } + mysql_mutex_unlock(&worker->jobs_lock); + + /* statistics */ + + /* todo: convert to rwlock/atomic write */ + mysql_mutex_lock(&rli->pending_jobs_lock); + + rli->pending_jobs--; + rli->mts_pending_jobs_size -= ev->data_written; + DBUG_ASSERT(rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max); + + // underrun (number of pending assignments is less than underrun level) + if ((rli->mts_worker_underrun_level * worker->jobs.size) / 100.0 > + worker->jobs.len) + { + rli->mts_wq_underrun_w_id= worker->id; + } else if (rli->mts_wq_underrun_w_id == worker->id) + { + // reset only own marking + rli->mts_wq_underrun_w_id= MTS_WORKER_UNDEF; + } + + // overrun is symmetric to underrun. In a sense it's underrun to get to 100% + if (((100 - rli->mts_worker_underrun_level) * worker->jobs.size) / 100.0 + < worker->jobs.len) + { + rli->mts_wq_excess++; + worker->wq_overrun_set= TRUE; + rli->mts_wq_overrun_cnt++; + } + else if (worker->wq_overrun_set == TRUE) + { + rli->mts_wq_excess--; + worker->wq_overrun_set= FALSE; + } + + DBUG_ASSERT(rli->mts_wq_excess >= 0); + + /* coordinator can be waiting */ + if (rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max && + rli->mts_wq_oversize) // TODO: unit/general test wq_oversize + { + rli->mts_wq_oversize= FALSE; + mysql_cond_signal(&rli->pending_jobs_cond); + } + + mysql_mutex_unlock(&rli->pending_jobs_lock); + + worker->stmt_jobs++; + +err: + if (error) + { + sql_print_information("Worker %lu is exiting: killed %i, error %i, " + "running_status %d", + worker->id, thd->killed, thd->is_error(), + worker->running_status); + worker->slave_worker_ends_group(ev, error); + } + + // rows_query_log_event is deleted as a part of the statement cleanup + + // todo: sync_slave_with_master fails when my_sleep(1000) is put here + + if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT) + { + worker->last_event= ev; + delete ev; + } + + + DBUG_RETURN(error); +} === modified file 'sql/rpl_rli_pdb.h' --- a/sql/rpl_rli_pdb.h 2011-06-24 12:38:19 +0000 +++ b/sql/rpl_rli_pdb.h 2011-06-27 17:13:27 +0000 @@ -32,7 +32,7 @@ typedef struct st_db_worker_hash_entry bool init_hash_workers(ulong slave_parallel_workers); void destroy_hash_workers(Relay_log_info*); -Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli, +Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli, db_worker_hash_entry **ptr_entry, bool need_temp_tables, Slave_worker *w); Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers); @@ -62,17 +62,17 @@ class circular_buffer_queue public: DYNAMIC_ARRAY Q; - ulong s; // the Size of the queue in terms of element - ulong a; // first Available index to append at (next to tail) - ulong e; // the head index - volatile ulong len; // it is also queried to compute least occupied + ulong size; // the Size of the queue in terms of element + ulong avail; // first Available index to append at (next to tail) + ulong enter; // the head index + volatile ulong len; // actual length bool inited_queue; circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) : - s(max), a(0), e(max), len(0), inited_queue(FALSE) + size(max), avail(0), enter(max), len(0), inited_queue(FALSE) { - DBUG_ASSERT(s < ULONG_MAX); - if (!my_init_dynamic_array(&Q, el_size, s, alloc_inc)) + DBUG_ASSERT(size < ULONG_MAX); + if (!my_init_dynamic_array(&Q, el_size, size, alloc_inc)) inited_queue= TRUE; } circular_buffer_queue () : inited_queue(FALSE) {} @@ -111,9 +111,9 @@ public: bool gt(ulong i, ulong k); // comparision of ordering of two entities /* index is within the valid range */ bool in(ulong k) { return !empty() && - (e > a ? (k >= e || k < a) : (k >= e && k < a)); } - bool empty() { return e == s; } - bool full() { return a == s; } + (enter > avail ? (k >= enter || k < avail) : (k >= enter && k < avail)); } + bool empty() { return enter == size; } + bool full() { return avail == size; } }; typedef struct st_slave_job_group @@ -245,26 +245,23 @@ public: DYNAMIC_ARRAY curr_group_exec_parts; // CGEP bool curr_group_seen_begin; // is set to TRUE with B-event at Worker exec - // @c last_group_done_index is for recovery, although can be viewed - // as statistics as well. - // C marks a T-event with the incremented group_cnt that is - // an index in GAQ; W stores it at the event execution. - // C polls the value periodically to maintain an array - // of the indexes in order to progress on GAQ's lwm, see @c next_event(). - // see @c Log_event::group_cnt. - volatile ulong last_group_done_index; // it's index in GAQ - List data_in_use; // events are still in use by SQL thread ulong id; TABLE *current_table; // rbr - RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */ - uint tables_to_lock_count; /* RBR: Count of tables to lock */ - table_mapping m_table_map; /* RBR: Mapping table-id to table */ + // RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */ + // uint tables_to_lock_count; /* RBR: Count of tables to lock */ + // table_mapping m_table_map; /* RBR: Mapping table-id to table */ // statictics - ulong wait_jobs; // to gather statistics how many times got idle + + /* + @c last_group_done_index is for statistics + to mean the index in GAQ of the last processed group. + */ + volatile ulong last_group_done_index; // it's index in GAQ + ulong wq_empty_waits; // to gather statistics how many times got idle ulong stmt_jobs; // how many jobs per stmt ulong trans_jobs; // how many jobs per trns volatile int curr_jobs; // the current assignments === modified file 'sql/rpl_slave.cc' --- a/sql/rpl_slave.cc 2011-06-24 12:38:19 +0000 +++ b/sql/rpl_slave.cc 2011-06-27 17:13:27 +0000 @@ -3826,7 +3826,7 @@ pthread_handler_t handle_slave_worker(vo "events processed = %lu " "hungry waits = %lu " "priv queue overfills = %llu " - ,w->id, w->stmt_jobs, w->wait_jobs, w->jobs.waited_overfill); + ,w->id, w->stmt_jobs, w->wq_size_waits, w->jobs.waited_overfill); mysql_cond_signal(&w->jobs_cond); // famous last goodbye mysql_mutex_unlock(&w->jobs_lock); @@ -4259,22 +4259,22 @@ int slave_start_single_worker(Relay_log_ w->bitmap_shifted= 0; w->workers= rli->workers; // shallow copying is sufficient w->this_worker= w; - w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0; + w->wq_size_waits= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0; w->id= i; w->current_table= NULL; w->usage_partition= 0; - w->last_group_done_index= rli->gaq->s; // out of range + w->last_group_done_index= rli->gaq->size; // out of range - w->jobs.a= 0; + w->jobs.avail= 0; w->jobs.len= 0; w->jobs.overfill= FALSE; // todo: move into Slave_jobs_queue constructor w->jobs.waited_overfill= 0; - w->jobs.e= w->jobs.s= rli->mts_slave_worker_queue_len_max; - my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0); - for (k= 0; k < w->jobs.s; k++) + w->jobs.enter= w->jobs.size= rli->mts_slave_worker_queue_len_max; + my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.size, 0); + for (k= 0; k < w->jobs.size; k++) insert_dynamic(&w->jobs.Q, (uchar*) &empty); - DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s); + DBUG_ASSERT(w->jobs.Q.elements == w->jobs.size); w->wq_overrun_set= FALSE; set_dynamic(&rli->workers, (uchar*) &w, i); @@ -4348,9 +4348,10 @@ int slave_start_workers(Relay_log_info * rli->mts_pending_jobs_size= 0; rli->mts_pending_jobs_size_max= ::opt_mts_pending_jobs_size_max; - rli->mts_wqs_underrun_w_id= (ulong) -1; - rli->mts_wqs_overrun= 0; - rli->mts_wqs_oversize= FALSE; + rli->mts_wq_underrun_w_id= MTS_WORKER_UNDEF; + rli->mts_wq_excess= 0; + rli->mts_wq_overrun_cnt= 0; + rli->mts_wq_oversize= FALSE; rli->mts_coordinator_basic_nap= mts_coordinator_basic_nap; rli->mts_worker_underrun_level= mts_worker_underrun_level; rli->mts_total_groups= 0; @@ -4472,7 +4473,7 @@ void slave_stop_workers(Relay_log_info * w->end_info(); - DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s); + DBUG_ASSERT(w->jobs.Q.elements == w->jobs.size); delete_dynamic(&w->jobs.Q); delete_dynamic_element(&rli->workers, i); delete w; @@ -4480,10 +4481,13 @@ void slave_stop_workers(Relay_log_info * sql_print_information("MTS coordinator statistics: " "events processed = %lu " - "waits due a Worker queue full = %lu " - "waits due the total size = %lu " - "sleeps when Workers occupied = %lu " - ,rli->stmt_jobs, rli->mts_wqs_overfill_cnt, rli->wait_jobs, rli->mts_wqs_underrun_cnt); + "Worker queues filled over overrun level = %lu " + "waited due a Worker queue full = %lu " + "waited due the total size = %lu " + "sleept when Workers occupied = %lu ", + rli->stmt_jobs, rli->mts_wq_overrun_cnt, + rli->mts_wq_overfill_cnt, rli->wq_size_waits, + rli->mts_wq_no_underrun_cnt); DBUG_ASSERT(rli->pending_jobs == 0); DBUG_ASSERT(rli->mts_pending_jobs_size == 0); === modified file 'sql/rpl_slave.h' --- a/sql/rpl_slave.h 2011-06-09 15:27:47 +0000 +++ b/sql/rpl_slave.h 2011-06-27 17:13:27 +0000 @@ -55,6 +55,9 @@ typedef enum { SLAVE_THD_IO, SLAVE_THD_S #define MAX_SLAVE_ERROR 2000 +#define MTS_WORKER_UNDEF ((ulong) -1) +#define MTS_MAX_WORKERS 1024 + // Forward declarations class Relay_log_info; class Master_info; === modified file 'sql/sys_vars.cc' --- a/sql/sys_vars.cc 2011-06-17 15:34:16 +0000 +++ b/sql/sys_vars.cc 2011-06-27 17:13:27 +0000 @@ -3353,7 +3353,7 @@ static Sys_var_ulong Sys_slave_parallel_ "slave_parallel_workers", "Number of worker threads for executing events in parallel ", GLOBAL_VAR(opt_mts_slave_parallel_workers), CMD_LINE(REQUIRED_ARG), - VALID_RANGE(0, 1024), DEFAULT(0), BLOCK_SIZE(1)); + VALID_RANGE(0, MTS_MAX_WORKERS), DEFAULT(0), BLOCK_SIZE(1)); static Sys_var_ulonglong Sys_mts_pending_jobs_size_max( "slave_pending_jobs_size_max", --===============1731202208== MIME-Version: 1.0 Content-Type: text/bzr-bundle; charset="us-ascii"; name="bzr/andrei.elkin@stripped" Content-Transfer-Encoding: 7bit Content-Disposition: inline # Bazaar merge directive format 2 (Bazaar 0.90) # revision_id: andrei.elkin@stripped # target_branch: file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-\ # mr-wl5569/ # testament_sha1: 198de9219f13844b6bb05273cc17e3ee7586061d # timestamp: 2011-06-27 20:13:42 +0300 # source_branch: file:///home/andrei/MySQL/BZR/2a-23May/mysql-trunk/ # base_revision_id: andrei.elkin@stripped\ # buljr7f1ib9a7y49 # # Begin bundle IyBCYXphYXIgcmV2aXNpb24gYnVuZGxlIHY0CiMKQlpoOTFBWSZTWTWiWxkAF0R/gH/dh7h7//// /+//+r////5gLR973O9u7d7r3fONfJbw591B72a21syFTvvu7Web3dZzdm8wXvdVNeXW2pNvutWd BtbfZvPfPntxDTFHRpRooZtWlPiaKcVrQ2VTam2rKsqlNtVSb65zLaKraZlk1qlue7vVtlvN3Y1j NAbNT4YkBEYBMpk0xJPTFT0anqPU3kZKeKP1CNNPU2oNAGmhkND2qDQJkAhATQRMBNGnqgNGgaae oAABoAaADQGp6aCEhNNJqPU9pTI0MajQNAAAAAAA0AA0Ak1IgTQIyE0k9G1T8qP1GRPJpMmkzU8o 2o9IaMQ0NGQ0NGahhEkRGjQhkMiaAEmbSam1T9BMk8p6QxND1HpqNHqNABoeoCREIBAmQAJk0nkm TTUn6j1R7VPIajJhGmmjyJ5Q09QNAOAjwTAS+pgkl5//l6fbZ+Uy5/MwN13hg/ZUPjygH9mUtLJt z3WhdFlMrYKOuXr19XqTiey/ya9AbWbU1lom8dQIH48XYcPBf69yEnrlGj+xUsO12b6xwffkVDve IJ9ACQSBS+y3V/76mbQ7W/076NvDk/3tgkF9uIcacP7d5f6+EQ8oWhM90K1kEOESxccVnQX6WaKg 4MICkIZ6IrE9lLM3OIsSm+qIKWupQ7d0mLoNgQHw+wOTn6KuYhCP30p4TauAsK1+TqJins4jx8JN ccsb+NVV1fIPeNgr2mTd2jp6TCADIWzs27mgHZQcrd0/r2auBYg0IrvWcp0u8IoxGo8/9M46OoUM i3L0+NAXJPE3EbGWDXD1zI2XNRnPUTlNn2joYyQnEkqQu7KPwKNiLclU06jE1JBrhFOaNTH7UIPI fFnqYSBkQhtJVBmQOW7RuF800X9hv359y3dWhjN9tV0BDrCKTdNrAwYura+zt15Q8h6E7t3awt7p keSuTLT5Fu+nPqi/Xfdty23Kk81eLsqIWbQrmYNbHDQijpC7uYQ68tLQmrgZs4I58dal21vPfFRq /44tcVJeb1dpmb3icO9VBe9RdzCFSKLKLVNmlRJt0TlC2ls3IX3J1vwgOc9wYF03kKAgARSOnnHp 83Xg4dlrOW6jbF2QQFqMSIIsYokBcWAh3RUdn4UlEQJP3FULIsIRhAUUJEQIZ/vo/N+ypME5a2Ft AWfHYKQUVQSAgjFGMU+MaAUNz60hYSSKKTwZJTBQiwRhAO/7e916tQu4DmXUY+Mu8zHShFrU7agJ o/UMeoppMNkN8Pu9nw3t9vhw4ih75g87VdM/yfLf5DCFQQEdTWjW+Fgoo4hzCOnW7BEUyvcP3+aL 7QaREUrdnpsz8tM483HwUoN21WIfZU1ZtmXEOHaD/f+Beb6U1XMGa95pJeo2edSBPva/YNEyzLGU 95AUEUVIsWP4SGPMhoQYCgLJBVVSLAVRZJM/v3nJzEjwXEEbUQR1o9XUOtuuHYl5+l30b7311Awl u7B200VmOrRG1QaHEJkwNRO7GrCZgLOJsrhxDaabERTzodNHuCGfTVD3uhck6sFheGqWtqIyaplT WmIVEGGYzgAXzJyVL4dWGCcMSKvYLDjSz0qODJGmGtk5KLGmYNfFK5lkRNG2mWDJrgYpAl5Z4F4x EGoYIhk0OzFLMguCXnGHycmdN6zRZoi7HLGs0pLM7SeXwNX4bFrHs8l/cPEzvBqonw+GCe1V0DwV SKq3G2O6/mw2W2hpgjMfP6WamJX+vCvzV2fPTR910dpBPsoZYfGeesiDMJt5AScrVdRfVxKPd8zd 419gGoi3oopgDXGV3moOfCSZdPgsZ2wGsmcrFQuqpgtxF0p1FnMu+I3MfpvzokbWTlLwKEHFQprW mzxOtNdg4aQldBeu5t8TqoXAM2CLAlTdT5WN+cnmcnrPts4dBk1xmJdht8SR3d1PVtscUA5qL8HA 3P/2QnDPt2D3C39zdoMLIU6xbbfdBgTEhjhuFCDMYcrTDbVpXDOdB1SgKRTycLSzyQ3l3mKgP1cq tLblHdzR9BC9oyQ33x9KXG+1K33DYwwwYNCbhom227LkkPUpZYaMa7H6kNCB3lIoMgkBQRngxvx0 bzlhqRdYt04ZM2GbpDVgQzMDr4EiXPCE49WeOGcGOyfDl/GnTwpPJEDwnLghMdjUtRCp8Ruseey6 iu92gN12VnMNLLtnqoWgHVb4BRO+nceZXHNoUJo61rc6xKQ3sGnL4TsQgJAExQQok0vGN7CLxA1e lvgf3ldD59paDYHlvPgcZ7T1wWDWRZtFDl8ndAoPP5No9NIOhiAglV+kdxv2hoMNw9CcDeOpAiwr d3XVDs7taQkHhg4oH4kF+Lm3pHHep8Ldwz1buO+cH6sAtWnw6xnlYPDafbb008G+dMnCO2eK772o XklRaDeukqi+/Y9nU+wzVgsfdCUWIHjBEqbqnRjZX8GNo8A8NcRkhNXIWBY0pMOLeg4W0q5sQyQb Tv4mFDg5rl0oPPByM+YtyDhXDkt9INrNNuYIcwq3jlXSUAh3SkhsZgw42t5RtlbTtj1Q9XMtxr7e qjFRIQcDsvxCGDsNa6kvQdgFPjZBESyJHAU8nEOJCQd5E7oT4oFUWVYLJn7LfA0s8x5JjSIT2q1t 1o6UC+5nBJLawctTtYuVwxYVWXqWaNXfbjC8XBhXRiuD4Wv7t6snbYNRK+wMVk5Jbps3PsAIIMN4 Le1XnwIxcvIqDOtAlzFxd9Tsm0sw02QCp1r2r3aRZifmnFEveSRNHHDTpTWeZQVjQMoXyqpAxh02 jPkpIa8UDmGE1CFUHLtWK9pkFC8uy3wGGPikomQQQRODrZSZrqpvg2UUKXV228L32EbEEKODM40B pcJMVGufKNG7a+nds413KoumhKspCqOOwu5uqd8leEcW8TtgcPftFtmxldle1Ot0yCOgvtF4FI64 NpkRmEUEQYu7g4k/zCQRWxgOz0dnrTN7/phPDfwbz0afXJyTVsbb+fJgZ+kR642ZR1X5ztu3mnf7 +FJ3dQBU3YnT+k++v6tCDhH9ku3pAh+P2/yPshvyV/CId6CwSTZBGQUVSRVieHN76r+P46t6TGH9 yb/PyjeJoi6e31EwqMBAQECBTWvKj5J2UU0bh5oBotdFidV+yv3GmExrvPCi7/VsMwa54IAu/UzG Sgw7uvC387vHllTvoplCH0JtuVIsqjy2JVjjs0wzZZ8i1b5KfscOjhZDjSIOWBKsmBlxRaxAy3LY fgsH3fhHV8m10CJJC9MkvQ/o6sqPws0uUe19Wg0vo8lVapjB+beztv1jwhqs5E84vvhIL0IWAQVg QonLxsNwReA48xuuM8qsZH16TzPXkwmNaa9JOoPCyJZekOZugic/WERgEtdidBrILvNBfhr/rC+P s1qfmxqTy0Xfuqvx6vgyl+RXR0wtjx7LOw3EvkrZystntRziYvOPRxjBR/Zp46KTAcDvh+z9v+Fg PWOVKG6AfmCAv4ngOF+szHBiwUSDAu1h7NqWfRSJbABDf+Uj27aP2Ib/FtEhqhjW/vyv/evDfLRm pt+UQop5xGckeYgZfzqcD3jXSyqku9jxRKCzzonBrfVtJE/zsC3Ij8vfh2szolzneU/MU4OBYbPF g08UQ/S4eBD5Mn7E1GwPtih27E/X8mxOow7Z4uNZAnIa8gTDWmuJfCmyRGMiYIhEwTYVLKrbaAVQ mYihmxNg5KVh2JHdB9+KhUqf1LPq4lIFmu/6hf76vibM3tMrYaN720npOij5+4FAAysOrj27/bHt rpBcPiYGggTxFeOE6DiO0cfXAi0IvqSkj6UR6h0mbJ4ZZNfNKZ4t0KNbulbctg9eXXfSVCldaqFJ pXGgkG09YWSArQh2uahSpTiJogxYjAZn8EI8D4AC5oPh8Pm+H20/w/NIj1AuY/0ReilQgC0P6uEv pYTkwl1/M5yIPPq6NYh+Dt8CyfdVjZkaO/T/T4fV+N9TjjISkmDD4mQukPYn4s69DJDFhowBZoz+ JkzTV10x+5x3H0q4G3ZSTlOTo1K0QSpxHQTYmlGZqm5VFZA5QMEEKFlnnNSSxbHHFXXF55E3DQWT WWu9zLyMpulNfJksy5PCL3ZNCDGxTC1WsiywLirveUAnlhVEIIQXVhFbY3KdSRoWWB7grhc92Owv Kk+3RezSqZt1/MpEIhrW1UwgFo762vs0MXlsCzGTm5fOy1osonKqgL+ijzWgcGx1kOZkQCfFKek6 jjhxMHEyBDpbNNNhSMjMgdonxHGfq50RVKhOAKVZpZpx5V1dJoN9xbBfxRpqzAUrgwH/HqLGnDeC gWAKpv/Bwwob1no9AAlxXoJlkB9095iQVJcDqQBRW432GJi/DoWZjTpvWDvt9/hUDZbXdhkgqa5k b4xHGYFjjEXmBzqCAUFUbgOgAzfBEXHkmoPiYGZUytomlUQI8LdNalcbzCleyohLEkawCx28WeON Y71foqUZXlKq6q1FStDJZ2XZgoAeBcsZC8BgEHao67oWQx4kHZmXOdxowGFV3WeCDaZgOirnNycZ HhrKY3qums2E2KkHJJz56sUKptMNZ9C3FjWfQqRyi0YC/hLSENorPUaj10OBcQHA95wBqNVs8gBB MSrXs9mBoUGT8JZQmbU576ICGootKkU52jHGYvWOaDMggXNGWRACc4YZ2s1suJkjqM6SQtlQumUF OIMTUeJBJk5g+BYHouW+dY47kZUwBpqJs4Ci2Fjo0uyd96jVKxQbjEg2lTJByL4l1IOsA8/MeBd7 xoBtLHkbVezAOrfg+1XKGCFPFF7ppKToPZIShrRWL4lssIJzBgF4i6CnsBwBkqkeFYZoCMzFRhB6 NKgXWtgYEaxrMrF1orQxNVJavTh0EGkBkYPAJRUdqi1hNcvEqGDPPWACdixRDkSYE0Hl23k0fWBi fEfqZ95mx5QkbbYjNZRVNR3rtNoxneayLj8aDo7O5ehXLfympm5hzSTtiIp0dRh2RFCJLfOcEtOB eUGm3dXEpgUQiWKYNlDSC4cMcsN6KkIqUx4yAyEh7BxQVBUEcSkJm7wGExEfWMSQ12w85qpFFy5d JpiNZ6btSmjSM62Pg55EhxyMwW/RQUlahm8k0OogmnkvWuvD4K5Qtq9OU04gbzpwnaA2ytpHbykp iHe6dC0XhjctyygMArlCWFH9tuAMAAR6wWcNUpeUAZ1tjBtcdFmcQEMGvTshv5oE43DiT1bfZJax xjcHLFygYFO2OIahOE1THUCwKrAyxRcRiW6lx9VVku1Bh56LNStcaRprDCNclkkKnbLK4vTBLhu1 yQBbka9gM0IRkIWEAKajoqma7EFc6nIYYMpF7bjHImb6PhQoFOgcHCYSYg5DiPaJSmLvNEbAl4iq jCW5AL9QTewMbxMLIENfVfncJfmvXMAsryzgcXB6CnVMdABpETzHeagAdpxORvOEwBidOWnDNOxc loqjxZTaETI1lRplMqK6BzMpOXXxKsg46UAFAxIVwtbcipN3gabC2gi3gEhW/QbUOLBdBVADWTuE GrMMiQHmL26jYXAgWncnMa1WDUTdbPOAuDSeUkoierFMAGCo3pvBGRk94OcpArmjowpfYXEC8hAD hggYLZgmErkMSgoIDw18TARYHoiEAtXQaXgYDGvDJ6Uzg3mWeRvyfWBU5DDheTsMjcTvJNaTHtHL x8owPDbq1vMf1ndQfn/50XPFQUQEYLiGq7El9xqA2clecfMFDpSOui6Oe+1ByIuL6wRXPSyRRIoo AmBBK1nKX6Pra9tIciFDzez3d3i8pCZAmXKS9e3P1KPyVtxexiNn6GYJOYw3w8TyY4t7Qh9SLgqE YBDoQVFGedXTbjwPE4nq4ZTZWVWdahVqFimtqBSpyMA9tfR92WTJ01UOrK30UK1t8g+5HFD8eI11 rtBJnQKMHbiLX2PR5eY5yaVe+QtutS9Qgroe9aXeNMDetDCU4NhBk/7hI/Gv8aG7CRxDT/GcAM89 N0TY3aE4OX0VJfSQtJZgYrlExrnNy3Z0qxE2bm7DB8MxtV8b4GuGltjGxsQgCJfYDzVuSVybUKYW TaTnRbt+17WwAwMIP17ce1YRUuhQSnGWGIdqY2D0EXQiyVJCSIE5gf1QUPL/OFQYH2RTgomuKiNH iY/xROfltJEAsD/T8/4edfv+jEPvH+8JPfmP1yY/gyVYdge8AokVcv33xn6SAevncmHz19ew1tA/ yl+EsGbP1VSjPZVIiKs5SDu0CD5ww93x+4ohFLGkA1ZpES1/SZkgzgXEjKQMXrz83EwgyM+642vx t0MGf76NgL0hen6AzfB/ntxWswRNd2a6R6XCTtwf5mLWn29me0CrpOKmqT2ycPbB4vG9K/blUoxv ZChGDQU2fw0SOXrWVuCqqFUrwP4SRvNfKMORaclqcglSU3jlqCm0izBj/2+WQn2QhlbDG2XY2yu1 2HAXO/jhsyFedeLSvHAkS5rhnCic671Hb0utRmGgg80UyDugawqof92LB6JrnYbNqjWdw9m6+5u6 o/6wPCg9GKWN0JE/bM1/o4KuKF9eamLx4f6ZjZ4GkCkCUxd4QGSJSJNFyBZcNGPmHnP6h3DqC7xu 5+Uo+p8dHEj1O66V79+Zu8ksVjXf5cVNTipzfc+ze8Pp+Qt8tjG5/b9dM/PKJDLFyckMBnL7sNI/ SVGhXCPrKL91/4VVoBrI0uQXBfsFaQvFAo+mYGw+0eAhw2tkhYQxYG3GRZgWoWCWszG79qCBoDMu DBBoXmmMEQXzLhuQwGgsoJBdb+lDAU0mYuBuzlOiDZDeAZGzY6wysQENJYxKr+dI2pUEXmFi6gZZ UIcZalrNhdosUI12V9xdiDEYrATV7/ONkwUEQtgqiyKBTQ1IxpldxUFAuFeIHfjfeYm8Vrg3LQnR JeYnQTjNuoKcBgbAYmVLMGTVznBIBxkWOHiA2g+9ML7vlID2JoKI+7pD9Z0iRH1ms5QFuNxtPsZ+ lXEfYFjT9oFcSTecILFD68v3FbDSPU1gbDeq9G3cJBzlTToaTzUEjrtNhJTUmlTFHx2DzqhuibYA QLtiQmHoEPiWkaKnmOO32l5FFifSa7jWW9ARR3THe7wdkG2wlAwDHrLywDeExIWIA+hmOGYj8Z8E hiK/VpEz3zYBq6N0WFO6312+qFvhXBfPAOw6XxCo5bwlJDvFguRXFUamJ9Ice4w3aEYBuc9hQzaP ywEI2KzA1ocxclBCWaFXI1xM3ZOHOoe922COLGLK5FS7vLUijmYnCwjluEQLsqtX4QTpoKlBUEiU yigUFWP1emhttOTAF1z2rYlrKoT5fp8Fx9UEJHgfsPtKyetdlS3A9RegauOs2AZFzEJpXm3tf2X9 frOk+1WFmd1xOCfKBnxHF3qtWthw3NjANiwNAFY+pr0fLA8/wK+nx6G1YEIisgTKcYgeuWxDlju8 5IE//6FU2wpXqRzUA8ZSXjQwOsi8JNDqPlHRezisUsXnGdRddzkR5jbl2IvLzYbg41rCH6eR8CZC jORgN8fLzVwGMuscR6QRmaiET1M2q/ZyGw9DNC/JE0g96DVgpGSYGJdgYNQ9SXpKCjV8hZZrXazm WL0qGUKLChfk9PByOCuEmYoRLxxw2WTFEyo6i6UHfAwhkSoOJ4Hg8XVeE5iHRAoscSonZOwz1rcT bBcpOWgVFXVjjEepjcBQisk+ohZuRlRpEjB1rVfloqn/dKVVMkmWBhhLsXAzpRElFnBLyT3yFA1V OXm6uksd5irzeeolcoZeonvGlsBdxkuDFsBsegwhmazeGTMGmiQiJps5IUlzOpdCjiMTuMSoc5zs P7Ihe0XUxHHSRd11xIsDE4HyqexYnkSvUt15+eO/uNjYbxhPLh+9U3M+J4WoILRHE6zCwdSmvnQ4 3NGVCPqrVoQ5UEnWOIILn0XQJ4q8uDEdC8RE1LsfkNta+BSDv0kvDxvUpKDs1pxX2ynbC/21DB0Z aCBY4QcQ8RFUYGVvZQ7oCGArvKUG8jYMlquxzF0AiBWa6SrTAPRmOmEFXSDVkBuyWYMMtYZDQJ0S SzDAVdcmjpkB2YtIwFAehkCUyMyIwdjM4mJvGESuaLCQNJiNOBxms2+LDznrFHifEofaVJ23ZPuY /3wZUXpLjwx+X2YmhZnlt4+YA6GAmIzDrYbY1bt7DkCdo9oO1HXc1sHrM9VrjoAdZmdRf16YkQNb Sn5M8xGBU1AL28x0mBIp3cRTMf4tRA8+k0xRs81WcJlefQJ/FYjkYUiMqUjvrI63k6TeYQZVtLOW LjQx2Dbbpcxpu1B6aen5e8n++2PZ1WrI3LtOhCGp97nXA5NwQ54ex8fWbodXPuh6pfZnifzIGLbS 774bBpAVNlQv5gEU7iZncnKZMFCSQKn2nrLHSxDQxsWuxE7CLi9LzQ0HrQk8Do1lrZOPSO2b2RUd gI9dSWRSWaEgUQRGr7PuiCSmqgMRRGIkUUUH4UTnUwxmG5tZFBRQUzCd4GwWJb3aFq6QtZ1o3tJi Tb274kiJ5IVDVItd7o+2YGVwUuESipWaQpQFkUIiEVA4y1qeXRbtu1IJIBrpDXCRLaeOoNNplIZ9 H+GV9JO/qzQ7Oyl8qbc4MURRBBS1mDTZIGY2PKDsFNY8OEcHaCeFihaj5Cli3xFtly/Vj2y72VKJ pIMl3YmlkLu1wunb9NU2kEY1FFBWV06GZ07amK+yZ5yBqSGnVZKBH9Ffp/1uWTw7aO5hxg+0/JVU wpKBhQNUQUg0LqSdfmiQ6echmnWAaQM4wmuGqPmYZOKJl2h2CJETOAPwRMCPeIK+8r4e4D1niSSe YlB7jyIMRSMsUZjWhoe/jNwsDE9xZC9w9C49/voZnoyXn1GtI3lqHkgzr9Ymv4RIJqE8ikMlglvF PI9vYhrfCq6A+j7ISO8GzCsh4H+sfKsziUpcGr2bUzHwyAXDzwr0RQyJIiyMg8mGgNLRw1NYDWhF sITBoriHjBze9OpkAoAXKfThX5LzRht+OI0jdkDWeZiUoIOxBZGMkAbkNJiMNkKS82LH5Zbp74F5 9fX7PHnakpqUjX3kG6AXtA6J8Dh2WMKFKc7Kw3WD9HbV4oliISEIEVQFAaOp8mRtBMgMiYNZVXtx Idtj1riXrO3e2cmPO9ij68e30vlLHcfiBEqjdqQwbwkfrIIBdQIkueHa49U0mhTeaTmPD7HvA3ml 7+S3CfaCdvGyk60Xd0iOz4mDfUc5+lh1qgXZLJCdO7LrccDf0d+8rdn8HbSyRQE+BqCJ9HBHTx5P MIdZ875AH6X1Cp54AeB3iTokCpJOe2gmgLMQw+yw3oH1dP2medzZmrFC1lA5nekgct/ToDFHnrAj CEY7g2OVctmKlbnhd5IdYTuz+GYdzO2Td07L5t5xl7lhGEc/CGOsc3EVm+undquw0B2ELNHNsN3G dFAn2v42ezwZHRVW6zRSlrEA7Pdp0+Pacz1wSQiMU6z3VxdxuNEB2DBdfDixQoocuIjpNKEXRyHN MpqCDZVhcercUFtYGJJH2xakiQ+emobO5TQureaDNueDGBW0U3mHTcnz1WYjhCFfRQBDagL7k2T3 Z82/meqcJxnSCMnOqY1Xy+cKs3JTyaTrb4MIBA7ykpGjxWbEIJiI4gKogiiKIrFERhOkDdOc+Oe9 PcNKZBQoURLPa+nx7+Ljo0/c2OlzQiIh7nHoFnEIHRCYFcSmUeORPTaglVSyB9im6iDRejZefT31 McslZSNjCQswPTWAfNyz7oOp1el0TNo8GoagYECBmqZol4jNG7Ev6IFznznM66KV0NwHgcYRiirE ZEGIFoTMv6Eh7W6d4kFHfVII7iQEklAOP7WxrBbkGWYw/930zsLWu7cbg+YpwgJJ6ngHR0D5b+/W ZouQ08nwmYKnVrPb2GGRCgPu1qBttmLyxLWOTJLAXvRmdGYlHDiqMVixTwwqW+rN+qqaujfUkibo tTVWrtdtw8TteGP1aHzSGhiGk6TS3RPMba1V9E7MzABmRUSkapqhYiEH5PyPSA+7AMBnMm+xLEqb p0x/h9ZawCL5oGDVnuEy0iz4OGpoUCkEzhilY17/bwslqoKc3F99SZGThqvLqXMUpMBzG+L9byQ+ DS5Jug4PGIabQNhGGcG2xSUZwNZRLeWdeG+BU9cmz7RFJFFEVERiKe6wpji/E5AHwE+Kxk7yvwsW yymkRjyNNYx6x80kmhQhA1C2khC/NSChWlij2ALiAS4Idwg/OaLgySTJUqylhE2AIq4fqV0ku5wI hICCqCRqExFKdKrtLjQTipkl+6+lHA0Via27bZAzMQwDGBIFDaIXRpuVZpY2C8ULgF0X3rqBRghe UKiKTPaF3cYFQwZa98Ns20s4tccAA4ARmxkiwRIIjIgjFIsGMEGRGjY8eVXdd1YRcXURRQBFlJ9e sqLUGn6Du00uQR/QtyDTNRBCNShlYHEpEki8mgoSKiKbkKFQWWRBaqhpmDJL3SmQS8Tx4lpjdAsp FAxgyKVRCmAwfHWUFGJ1hIO9cDYfjZDN1gMVgPGTzh5SDEDqg+CCVGEasUAxZMTOu9pC/oXGGDIN ggdgUPqPfQDcQVQ9tusgXpKUT5ueHF2ZIYnYaUB6wO/V9pIRSA0K/5w9iFojF9K5KoZl0L5F4eL4 rY8mBl7KIfBCCetrsNL12H5YGuPhDng+6ClWEdNRD0daoroqkAWQHR6RVtkqsHOfKRtuxRVUOJHz 0gJG97VUsQpJGfAKGmkYX0oW7Q8r9hAGCyIisAWQAVQUgMVwJNoXVs2Qb2eSYtp56jNSzj6qzrWB lmG0N0pVQg05mp6bzjEtPv+kRDOxSMBSVfAdqdmJUNO/22HyeNRaSi1rSyPpaf1Ofobk2/ZCT6Jy 6tvdoTPXVDEOJE1DY18PEhRoPdQkBh+6erGoFxNOSWUpT405zyN10HzMgNCWt5t2j4kOpRlledCQ GzorRZI7Gg13lNRRHkzgK9jZZBKK3EQNG6ivu2XxnIN3+/f63gLIfSsuheZaI6kOL5Hi9ymBFAp8 70gpmzxOaGO0OjhGgZGPkre3DXqCoqjB3DGMgUKIkRBaKhKQpgUwUiwKGBTFk8VSoqRjYYmlgIGy JhA6PaB4LBpgVBokgzSU8oBoDBpY3aD8gDiCOdi4LAk4i8Vip4jwU39apcqIPUqxTMe+SJo1JVrV enk5mowyPiufCCGBRvCPknSEXSYJmoiE7rziOj7mpUrz1Cn/O3iJYpgex5GzhaQ2qe17mt05b50W erWHiarudLyREI305nB1yoc7Wi8mNioKNFfuoWjk0XKoR/EErC1YomtHcnpehCrbBCoV+VCuddzl 4E7mteHSDd8SIYN3B/Czb6fq9LVU1e1TuqpKqpaitEq96txN5O2dFktJawSCICMDuEoYxbVSVNqD YAHyYNWMfm23UvwCJiZIIhlIMCvnmEfC8Dk2C8joXnrSIDLAX/HcvT4EH2HjltCJnYRiIQBTGQxB By8YiNc1ebiRUEQsTKosefPX1QoykCWLRCKjSw2JMypjEDUymWbHviffOdK3bcOBzkY4XvMb0iRF fFsySSjo06VxxxC4kh1T1E9a+C18WRjreBlBlOxk0jZEBI7KFfKCOEw04ZYuwUr22oTnu1BJmcxS IYGSnZ8teM0dQPkuZybILiB+XmWVzOaqnsOKcLXDycDLHm82ONX2oZc2CmQmOsw0XbK/VC8OzaVF MaLaJDc6TecqLJdbie6yVDHGUmF2qqrWbVBKpovM0gpRNS0gk/IesI5EHS1FdqYNPKSBPE31C1EK 6jg8XUwxnQ0GInVOqqqqqqqqqqqz0E2Ici6EFLziK9bORStLT1Tityr32aTVlQf6FKCC4z4nv9pC RwFawy5GKFeFh4QlaMmjinTsrF2V5ZaZM5Ze/l0PcK8EdvotVtSFtHVYdTDTDYCtZpZrm6axHHSR AKhcbsgTDObBhAYYktWHl7IENdiSYI0rR3603CxoWyLC8uOKoitIGUhuCxGLgtYlNpD10DqXivzv x6E1MDGq2wpI82xw1uC7aI2tBxEuM8cjOAQwDC1QXICkOs4qRnclw6TIhMnrWWBWsG9hNOnwWk4F t+GUlR7LUiSaJCINYtReXAFi89cSkNggpECC+wWcq9anJ4d3fnNgCoIpcd3lkgxEMsB398ZEQvOe 2PeUIRgzs0QpH80VDs68+Fi5hLPPSavWho0QIdQmEo79AuvXiUXGuLAd5SOasA68lqUg8zTiCGpu aS1VOFEN0OPdNkm7VBYxJOKQ7ELDJF/cm/83IhaHsebv6khtJNkucvBCjtc3oxyYwGdTJ1IjVFFN XvYsxiLzaDPYF0kkFC4OLgE0Gnyc7cQsTYPsbO3rieQN97qZvpdbrQ8b2k3nr/R1DXShrehtAopl A/F/jAhNJHiuzl7QFAe7I9HC81nQr+mDA1mYHkGCDzQc5p8PYp1dKrpNYGhO1GG0hS4DcDB0UyJM KBpIrSbePMXLW7uJgSz0bsbIkR1UGmG5owmUMasAWbBfF3IfGPc8lTwDAMRF5TmHenZpbmdiiiqA GueTSnRFB3sS/Agjqi3J3vJuWcbRG3t1FJoOLvAcXSTD7fm7Swt3A3cR6XgOC4lP5a0kPjPxlDgB DIKqdzpsPvEE3w63bSWIaDv97lianFxXKgoI7/uR5erwmBd/+SkwlFeLTgNMq8gNFvyWmg7VkkwB e3KeFAgeukyJDhOEQoHCUM1h1r5zj8mpiFDUpEQzvcSI90ykUExC/S2sPpfblfBfg9b++/zer4XH FCAOjJ3+GPm74UtSiVMeqaSMmNxVQsv5QMlcqP0kXITcK45dtT1bxeJyUiaB/Xq0tzUg+9PYO9vT y0dKbOd+ewgvS9fS63a8ETynn8x6hun/IkUpiRiXuUDgI8ns/+LuSKcKEga0S2Mg --===============1731202208==--