From: Andrei Elkin Date: November 20 2010 5:24pm Subject: bzr push into mysql-next-mr.crash-safe branch (andrei.elkin:3213 to 3214) WL#5569 List-Archive: http://lists.mysql.com/commits/124556 Message-Id: <201011201724.oAKHOIUk013929@mysql1000.dsl.inet.fi> MIME-Version: 1.0 Content-Type: multipart/mixed; boundary="===============0701247318==" --===============0701247318== MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Content-Disposition: inline 3214 Andrei Elkin 2010-11-20 wl#5569 MTS Worker pool start, stop, kills, error out implementation. @ mysql-test/extra/rpl_tests/rpl_parallel_load.test increasing the load param to get more reliable benchmarking data out of the test. @ mysql-test/suite/rpl/r/rpl_parallel_start_stop.result a new tests results. @ mysql-test/suite/rpl/t/rpl_parallel_start_stop.test worker pool start, stop, kills, errors testing. @ sql/log_event.cc removing a false and unnessary extention-arg to exit_cond(); Refining start-stop alg to base on the Worker private info, not the common info. In particular handshakes organized through magic value of length of the Worker private queue to is set by an initiator. @ sql/rpl_slave.cc Starting a worker thread with passing its Slave_worker * pointer. Simplifying and refining start-stop. @ sql/sql_class.h removing a false and unnessary extention-arg to exit_cond(); @ sql/sys_vars.cc Reckoning a magic value outside of the valid range for pending_jobs. added: mysql-test/suite/rpl/r/rpl_parallel_start_stop.result mysql-test/suite/rpl/t/rpl_parallel_start_stop.test modified: mysql-test/extra/rpl_tests/rpl_parallel_load.test sql/log_event.cc sql/rpl_rli.h sql/rpl_rli_pdb.h sql/rpl_slave.cc sql/sql_class.h sql/sys_vars.cc 3213 Andrei Elkin 2010-11-19 wl#5569 recovery interfaces for wl#5599 implementation. The essence of this patch is to provide GAQ object implimentation and valid life cycle. The checkpoint handler prior to call store methods of wl#5599 is supposed to invoke rli->gaq->move_queue_head(&rli->workers). See a simulation of that near ev->update_pos() of the mail sql thread loop. The checkpoint info is composed as instance of Slave_job_group to reside as rli->gap->lwm. Todo: uncomment + // delete ev; // after ev->update_pos() event is garbage once the real checkpoint has been done. Todo: the real implemention needs to take care of filing Slave_job_group::update_current_binlog as initially so at time of executing Rotate/FD methods. + // experimental checkpoint per each scheduling attempt + // logics of next_event() + + rli->gaq->move_queue_head(&rli->workers); @ sql/log_event.cc Log_event::get_slave_worker_id() got shaped more to the final version with elements necessary to rli->gaq lify cycle. @ sql/log_event.h Log_event::mts_group_cnt is added as a part of GAQ index propagation path from C to W. @ sql/rpl_rli.h Further extension to RLI necessary to the distribution hash function (APH). @ sql/rpl_rli_pdb.cc Implementing circular_buffer_queue::*queue and few other methods incl ulong Slave_committed_queue::move_queue_head() the main concern for checkpoint. @ sql/rpl_rli_pdb.h Extending classes with few new member definitions necessary for GAQ interface / checkpoint / recovery. @ sql/rpl_slave.cc Simulation of the lwm-checkpoint and changes due to rpl_rli_pdb classes extensions. modified: sql/log_event.cc sql/log_event.h sql/rpl_rli.h sql/rpl_rli_pdb.cc sql/rpl_rli_pdb.h sql/rpl_slave.cc === modified file 'mysql-test/extra/rpl_tests/rpl_parallel_load.test' --- a/mysql-test/extra/rpl_tests/rpl_parallel_load.test 2010-11-18 14:00:52 +0000 +++ b/mysql-test/extra/rpl_tests/rpl_parallel_load.test 2010-11-20 17:23:42 +0000 @@ -13,7 +13,7 @@ let $workers = `select @@global.slave_pa # # load volume parameter # -let $iter = 1000; +let $iter = 2000; connection slave; === added file 'mysql-test/suite/rpl/r/rpl_parallel_start_stop.result' --- a/mysql-test/suite/rpl/r/rpl_parallel_start_stop.result 1970-01-01 00:00:00 +0000 +++ b/mysql-test/suite/rpl/r/rpl_parallel_start_stop.result 2010-11-20 17:23:42 +0000 @@ -0,0 +1,30 @@ +stop slave; +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9; +reset master; +reset slave; +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9; +start slave; +create view worker_proc_list as SELECT id from Information_Schema.processlist +where state like 'Waiting for an event from sql thread%'; +create view coord_proc_list as SELECT id from Information_Schema.processlist where state like 'Slave has read all relay log%'; +include/stop_slave.inc +set @save.slave_exec_mode= @@global.slave_exec_mode; +set @@global.slave_exec_mode = 'Parallel'; +include/start_slave.inc +select min(id) from worker_proc_list into @w_id; +kill query @w_id; +include/start_slave.inc +select id from coord_proc_list into @c_id; +kill query @c_id; +include/start_slave.inc +CREATE TABLE t1 (a int primary key); +insert into t1 values (1),(2); +insert into t1 values (3); +insert into t1 values (3); +delete from t1; +include/start_slave.inc +drop table t1; +drop view worker_proc_list; +drop view coord_proc_list; +set @@global.slave_exec_mode= @save.slave_exec_mode; +end of the tests === added file 'mysql-test/suite/rpl/t/rpl_parallel_start_stop.test' --- a/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test 1970-01-01 00:00:00 +0000 +++ b/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test 2010-11-20 17:23:42 +0000 @@ -0,0 +1,116 @@ +# +# WL#5569 MTS +# +# The test checks START and STOP, graceful, killing or +# due to an error of a Worker. +# + +source include/master-slave.inc; + +connection slave; + +create view worker_proc_list as SELECT id from Information_Schema.processlist + where state like 'Waiting for an event from sql thread%'; +create view coord_proc_list as SELECT id from Information_Schema.processlist where state like 'Slave has read all relay log%'; + +source include/stop_slave.inc; + +set @save.slave_exec_mode= @@global.slave_exec_mode; +set @@global.slave_exec_mode = 'Parallel'; +source include/start_slave.inc; + +let $count= `select @@global.slave_parallel_workers`; +let $table= worker_proc_list; +source include/wait_until_rows_count.inc; + +# +# KILL of a Worker stops the pool and the Coordinator +# +select min(id) from worker_proc_list into @w_id; +kill query @w_id; + +let $count= 0; +let $table= worker_proc_list; +source include/wait_until_rows_count.inc; + +source include/wait_for_slave_sql_to_stop.inc; + +# +# KILL of the Coordinator stops the pool as well +# +source include/start_slave.inc; + +# testing of the poll is up + +let $count= `select @@global.slave_parallel_workers`; +let $table= worker_proc_list; +source include/wait_until_rows_count.inc; + +let $count= 1; +let $table= coord_proc_list; +source include/wait_until_rows_count.inc; + +select id from coord_proc_list into @c_id; + +--disable_query_log +--disable_result_log +#select sleep(300); +--enable_query_log +--enable_result_log + +kill query @c_id; + +let $count= 0; +let $table= worker_proc_list; +source include/wait_until_rows_count.inc; + +source include/wait_for_slave_sql_to_stop.inc; + +source include/start_slave.inc; + +# +# Errored-out Worker stops the pool and the Coordinator +# +connection master; + +# make some load + +CREATE TABLE t1 (a int primary key); + +insert into t1 values (1),(2); + +#connection slave; +sync_slave_with_master; +# create an offending record +insert into t1 values (3); + +connection master; + +# hit it +insert into t1 values (3); + +connection slave; + +let $count= 0; +let $table= worker_proc_list; +source include/wait_until_rows_count.inc; + +source include/wait_for_slave_sql_to_stop.inc; +delete from t1; + +source include/start_slave.inc; + +# cleanup + +connection master; +drop table t1; + +#connection slave; +sync_slave_with_master; + +drop view worker_proc_list; +drop view coord_proc_list; +set @@global.slave_exec_mode= @save.slave_exec_mode; + +--echo end of the tests + === modified file 'sql/log_event.cc' --- a/sql/log_event.cc 2010-11-19 14:51:58 +0000 +++ b/sql/log_event.cc 2010-11-20 17:23:42 +0000 @@ -2300,7 +2300,7 @@ static void * head_queue(Slave_jobs_queu /** return a job item through a struct which point is supplied via argument. */ -static Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret) +Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret) { if (jobs->e == jobs->s) { @@ -2346,7 +2346,8 @@ void append_item_to_jobs(slave_job_item &rli->pending_jobs_lock, "Waiting for an event from sql thread"); mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock); - thd->exit_cond(old_msg, FALSE); + thd->exit_cond(old_msg); + mysql_mutex_lock(&rli->pending_jobs_lock); if (thd->killed) return; } @@ -2355,15 +2356,12 @@ void append_item_to_jobs(slave_job_item mysql_mutex_unlock(&rli->pending_jobs_lock); - mysql_mutex_lock(&w->jobs_lock); - if (!w->info_thd->is_slave_error) // error check exists at stmt commit as well + if (!w->info_thd->killed) { int ret; - // LABS-TODO: GC delete (JQ[a]->data) - // LABS-TODO: en_queue() + mysql_mutex_lock(&w->jobs_lock); - //err= w->jobs.push_back(job_item, &w->mem_root); ret= en_queue(&w->jobs, job_item); DBUG_ASSERT(ret >= 0); @@ -2371,6 +2369,8 @@ void append_item_to_jobs(slave_job_item w->curr_jobs++; if (w->jobs.len == 1) mysql_cond_signal(&w->jobs_cond); + + mysql_mutex_unlock(&w->jobs_lock); } else { @@ -2378,7 +2378,6 @@ void append_item_to_jobs(slave_job_item rli->pending_jobs--; // roll back of the prev incr mysql_mutex_unlock(&rli->pending_jobs_lock); } - mysql_mutex_unlock(&w->jobs_lock); } /** @@ -2437,10 +2436,11 @@ struct slave_job_item* pop_jobs_item(Sla old_msg= thd->enter_cond(&w->jobs_cond, &w->jobs_lock, "Waiting for an event from sql thread"); mysql_cond_wait(&w->jobs_cond, &w->jobs_lock); - thd->exit_cond(old_msg, FALSE); + thd->exit_cond(old_msg); + mysql_mutex_lock(&w->jobs_lock); } } - if (job_item) + if (job_item->data) w->curr_jobs--; mysql_mutex_unlock(&w->jobs_lock); @@ -2469,9 +2469,8 @@ int slave_worker_exec_job(Slave_worker * DBUG_ENTER("slave_worker_exec_job"); job_item= pop_jobs_item(w, job_item); - if (!job_item->data) // LABS-TODO de_queue() + if (thd->killed) { - DBUG_ASSERT(thd->killed); error= -1; goto err; } @@ -2517,8 +2516,7 @@ int slave_worker_exec_job(Slave_worker * mysql_mutex_lock(&rli->pending_jobs_lock); rli->pending_jobs--; - DBUG_ASSERT(rli->pending_jobs < rli->slave_pending_jobs_max && - rli->pending_jobs >= 0); + DBUG_ASSERT(rli->pending_jobs < rli->slave_pending_jobs_max); /* coordinator can be waiting */ @@ -2527,32 +2525,9 @@ int slave_worker_exec_job(Slave_worker * mysql_cond_signal(&rli->pending_jobs_cond); mysql_mutex_unlock(&rli->pending_jobs_lock); - + w->stmt_jobs++; err: - if (error) - { - thd->is_slave_error= 1; // mts-II_todo: consider volatile, Crdn reads it. - mysql_mutex_lock(&w->jobs_lock); - - //while(w->jobs.pop()) // LABS-TODO de_queue() - while(de_queue(&w->jobs, job_item)) - { - // purging the local jobs queue - mysql_mutex_lock(&rli->pending_jobs_lock); // todo: check mutex:s order - rli->pending_jobs--; - if (rli->pending_jobs == rli->slave_pending_jobs_max - 1 || - rli->pending_jobs == 0) - mysql_cond_signal(&rli->pending_jobs_cond); - mysql_mutex_unlock(&rli->pending_jobs_lock); - } - mysql_mutex_unlock(&w->jobs_lock); - } - else - { - w->stmt_jobs++; - } - DBUG_RETURN(error); } === modified file 'sql/rpl_rli.h' --- a/sql/rpl_rli.h 2010-11-19 14:51:58 +0000 +++ b/sql/rpl_rli.h 2010-11-20 17:23:42 +0000 @@ -425,11 +425,11 @@ public: WL#5569 MTS-II */ DYNAMIC_ARRAY workers; // number's is determined by global slave_parallel_workers - volatile int pending_jobs; + volatile ulong pending_jobs; ulong trans_jobs, wait_jobs, stmt_jobs; // live time is one trans, statement (ndb epoch) mysql_mutex_t pending_jobs_lock; mysql_cond_t pending_jobs_cond; - int slave_pending_jobs_max; + ulong slave_pending_jobs_max; Slave_worker *last_assigned_worker; // a hint to partitioning func for some events Slave_committed_queue *gaq; DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP === modified file 'sql/rpl_rli_pdb.h' --- a/sql/rpl_rli_pdb.h 2010-11-19 14:51:58 +0000 +++ b/sql/rpl_rli_pdb.h 2010-11-20 17:23:42 +0000 @@ -147,6 +147,7 @@ public: mysql_cond_t jobs_cond; Slave_jobs_queue jobs; + Relay_log_info *c_rli; // fixme: experimental Relay_log_info *w_rli; === modified file 'sql/rpl_slave.cc' --- a/sql/rpl_slave.cc 2010-11-19 14:51:58 +0000 +++ b/sql/rpl_slave.cc 2010-11-20 17:23:42 +0000 @@ -70,12 +70,6 @@ MY_BITMAP slave_error_mask; char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE]; typedef bool (*CHECK_KILLED_FUNC)(THD*,void*); -typedef struct st_slave_worker_init_args -{ - Relay_log_info *rli; - Slave_worker *w; -} SLAVE_WORKER_INIT_ARGS; - char* slave_load_tmpdir = 0; Master_info *active_mi= 0; @@ -3486,7 +3480,7 @@ int check_temp_dir(char* tmp_file) DBUG_RETURN(0); } - +Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret); /* Worker thread for the parallel execution of the replication events @@ -3495,15 +3489,11 @@ int check_temp_dir(char* tmp_file) pthread_handler_t handle_slave_worker(void *arg) { THD *thd; /* needs to be first for thread_stack */ - Slave_worker *w; int error= 0; - - // TODO: remove pending_jobs ref:s to - // TODO: ulong id; - - // rli= ((SLAVE_WORKER_INIT_ARGS *) arg)->rli; - - Relay_log_info* rli = ((Master_info*)arg)->rli; + Slave_worker *w= (Slave_worker *) arg; + Relay_log_info* rli= w->c_rli; + ulong purge_cnt= 0; + struct slave_job_item _item, *job_item= &_item; my_thread_init(); DBUG_ENTER("handle_slave_worker"); @@ -3512,41 +3502,13 @@ pthread_handler_t handle_slave_worker(vo if (!thd) { sql_print_error("Failed during slave worker initialization"); - - mysql_mutex_lock(&rli->pending_jobs_lock); - rli->pending_jobs= 0; - mysql_cond_signal(&rli->pending_jobs_cond); // informing the parent - mysql_mutex_unlock(&rli->pending_jobs_lock); - goto err; } - /* handshake with each started worker */ - mysql_mutex_lock(&rli->pending_jobs_lock); - get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, rli->pending_jobs - 1); - rli->pending_jobs= 0; - - // TODO: w= ((SLAVE_WORKER_INIT_ARGS *) arg)->w; - w->info_thd= thd; - w->tables_to_lock= NULL; - w->tables_to_lock_count= 0; - - // fixme: experimenting to make Workers to run ev->update_pos(w->w_rli) - // fixme: a real hack! part of Rpl_info_factory::create_rli(RLI_REPOSITORY_FILE, FALSE); - w->w_rli= new Relay_log_info(FALSE, - &key_relay_log_info_run_lock, - &key_relay_log_info_data_lock, - &key_relay_log_info_data_cond, - &key_relay_log_info_start_cond, - &key_relay_log_info_stop_cond); w->w_rli->info_thd= thd; - w->w_rli->workers= rli->workers; // shallow copying is sufficient - w->w_rli->this_worker= w; thd->thread_stack = (char*)&thd; - mysql_cond_signal(&rli->pending_jobs_cond); // informing the parent - mysql_mutex_unlock(&rli->pending_jobs_lock); pthread_detach_this_thread(); if (init_slave_thread(thd, SLAVE_THD_WORKER)) @@ -3560,19 +3522,50 @@ pthread_handler_t handle_slave_worker(vo threads.append(thd); mysql_mutex_unlock(&LOCK_thread_count); + mysql_mutex_lock(&w->jobs_lock); + + DBUG_ASSERT(w->jobs.len == rli->slave_pending_jobs_max + 1); + w->jobs.len= 0; + mysql_cond_signal(&w->jobs_cond); // ready for duty + + mysql_mutex_unlock(&w->jobs_lock); + DBUG_ASSERT(thd->is_slave_error == 0); - while (!thd->killed) + while (!thd->killed && !error) { error= slave_worker_exec_job(w, rli); } + if (!rli->info_thd->killed) + { + mysql_mutex_lock(&rli->info_thd->LOCK_thd_data); + rli->info_thd->awake(THD::KILL_QUERY); // notify Crdn + mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data); + } + + mysql_mutex_lock(&w->jobs_lock); + + while(de_queue(&w->jobs, job_item)) + { + purge_cnt++; + DBUG_ASSERT(job_item->data); + delete static_cast(job_item->data); + } + + DBUG_ASSERT(w->jobs.len == 0); + + mysql_mutex_unlock(&w->jobs_lock); + mysql_mutex_lock(&rli->pending_jobs_lock); - rli->pending_jobs--; - if (rli->pending_jobs == 0) - mysql_cond_signal(&rli->pending_jobs_cond); + rli->pending_jobs -= purge_cnt; mysql_mutex_unlock(&rli->pending_jobs_lock); - + + mysql_mutex_lock(&w->jobs_lock); + w->jobs.len= rli->slave_pending_jobs_max + 1; + mysql_cond_signal(&w->jobs_cond); // famous last goodbye + mysql_mutex_unlock(&w->jobs_lock); + err: if (thd) @@ -3603,7 +3596,21 @@ int slave_start_single_worker(Relay_log_ Slave_worker *w= Rpl_info_factory::create_worker(opt_rli_repository_id, rli, i); Slave_job_item empty= {NULL}; - SLAVE_WORKER_INIT_ARGS worker_args= {rli, w}; + + w->c_rli= rli; + w->tables_to_lock= NULL; + w->tables_to_lock_count= 0; + + // fixme: experimenting to make Workers to run ev->update_pos(w->w_rli) + // fixme: a real hack! part of Rpl_info_factory::create_rli(RLI_REPOSITORY_FILE, FALSE); + w->w_rli= new Relay_log_info(FALSE, + &key_relay_log_info_run_lock, + &key_relay_log_info_data_lock, + &key_relay_log_info_data_cond, + &key_relay_log_info_start_cond, + &key_relay_log_info_stop_cond); + w->w_rli->workers= rli->workers; // shallow copying is sufficient + w->w_rli->this_worker= w; w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0; w->id= i; @@ -3621,8 +3628,9 @@ int slave_start_single_worker(Relay_log_ DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s); w->jobs.e= w->jobs.s; - w->jobs.len= w->jobs.a= 0; - + w->jobs.a= 0; + w->jobs.len= rli->slave_pending_jobs_max + 1; // to first handshake + set_dynamic(&rli->workers, (uchar*) &w, i); mysql_mutex_init(key_mutex_slave_parallel_worker[i], &w->jobs_lock, MY_MUTEX_INIT_FAST); @@ -3633,25 +3641,23 @@ int slave_start_single_worker(Relay_log_ SLAVE_INIT_DBS_IN_GROUP, 1); if (pthread_create(&th, &connection_attrib, handle_slave_worker, - (void*) &worker_args)) + (void*) w)) { sql_print_error("Failed during slave worker thread create"); error= 1; goto err; } + + mysql_mutex_lock(&w->jobs_lock); + if (w->jobs.len != 0) + mysql_cond_wait(&w->jobs_cond, &w->jobs_lock); + mysql_mutex_unlock(&w->jobs_lock); err: return error; } -/** - The @c init_slave_workers number of Worker threads start one-by-one with synch - through rli->pending_jobs. - Also objects are initialized that Coordinator and Workers will maintain during their - session life time. - @return 0 success or 1 if fails -*/ int slave_start_workers(Relay_log_info *rli, ulong n) { uint i; @@ -3661,7 +3667,7 @@ int slave_start_workers(Relay_log_info * my_init_dynamic_array(&rli->curr_group_assigned_parts, NAME_LEN, SLAVE_INIT_DBS_IN_GROUP, 1); // GAQ queue holds seqno:s of scheduled groups. C polls workers in - // @c lwm_checkpoint_period to update GAQ (see @c @next_event()) + // @c lwm_checkpoint_period to update GAQ (see @c next_event()) // The length of GAQ is derived from @c slave_max_pending_jobs to guarantee // each assigned job being sent to a WQ will be represented by an item in GAQ. // ::slave_max_pending_jobs is the worst case when all jobs contain @@ -3672,54 +3678,10 @@ int slave_start_workers(Relay_log_info * rli->mts_total_groups= 0; for (i= 0; i < n; i++) { - uint k; - Slave_worker *w= - Rpl_info_factory::create_worker(opt_rli_repository_id, rli, i); - Slave_job_item empty= {NULL}; - w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0; - w->id= i; - w->current_table= NULL; - w->usage_partition= 0; - w->last_group_done_index= rli->gaq->s; // out of range - - // Queue initialization - w->jobs.s= rli->slave_pending_jobs_max + 1; - my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0); // todo: implement increment e.g n * 10; - for (k= 0; k < w->jobs.s; k++) - insert_dynamic(&w->jobs.Q, (uchar*) &empty); - - DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s); - - w->jobs.e= w->jobs.s; - w->jobs.len= w->jobs.a= 0; - - set_dynamic(&rli->workers, (uchar*) &w, i); - mysql_mutex_init(key_mutex_slave_parallel_worker[i], &w->jobs_lock, - MY_MUTEX_INIT_FAST); - mysql_cond_init(key_cond_slave_parallel_worker[i], &w->jobs_cond, NULL); - } - for (i= 0; i < rli->workers.elements; i++) - { - pthread_t th; - Slave_worker *w; - get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, i); - - /* handshake with each started workers */ - mysql_mutex_lock(&rli->pending_jobs_lock); - rli->pending_jobs= i + 1; - mysql_mutex_unlock(&rli->pending_jobs_lock); - if (pthread_create(&th, &connection_attrib, handle_slave_worker, (void*) rli->mi)) + if ((error= slave_start_single_worker(rli, i))) { - sql_print_error("Failed during slave worker thread create"); - error= 1; goto err; } - - mysql_mutex_lock(&rli->pending_jobs_lock); - /* wait till the Worker is of full legal age */ - if (rli->pending_jobs > 0) - mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock); - mysql_mutex_unlock(&rli->pending_jobs_lock); } if (init_hash_workers(n)) // MTS TODO: mapping_db_to_worker -> APH @@ -3740,31 +3702,37 @@ void slave_stop_workers(Relay_log_info * { int i; - - mysql_mutex_lock(&rli->pending_jobs_lock); - rli->pending_jobs += rli->workers.elements; - for (i= rli->workers.elements - 1; i >= 0; i--) { Slave_worker *w; get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, i); + + mysql_mutex_lock(&w->jobs_lock); + + if (w->jobs.len == rli->slave_pending_jobs_max + 1) + { + mysql_mutex_unlock(&w->jobs_lock); + continue; + } + mysql_mutex_unlock(&w->jobs_lock); + mysql_mutex_lock(&w->info_thd->LOCK_thd_data); w->info_thd->awake(THD::KILL_QUERY); mysql_mutex_unlock(&w->info_thd->LOCK_thd_data); } - while (rli->pending_jobs > 0) - { - thd_proc_info(rli->info_thd, "Waiting for workers to exit"); - mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock); - } - - mysql_mutex_unlock(&rli->pending_jobs_lock); + thd_proc_info(rli->info_thd, "Waiting for workers to exit"); for (i= rli->workers.elements - 1; i >= 0; i--) { Slave_worker *w; get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, i); + + mysql_mutex_lock(&w->jobs_lock); + if (w->jobs.len != rli->slave_pending_jobs_max + 1) + mysql_cond_wait(&w->jobs_cond, &w->jobs_lock); + mysql_mutex_unlock(&w->jobs_lock); + mysql_mutex_destroy(&w->jobs_lock); mysql_cond_destroy(&w->jobs_cond); @@ -3774,6 +3742,9 @@ void slave_stop_workers(Relay_log_info * delete_dynamic_element(&rli->workers, i); delete w; } + + DBUG_ASSERT(rli->pending_jobs == 0); + destroy_hash_workers(); delete rli->gaq; } === modified file 'sql/sql_class.h' --- a/sql/sql_class.h 2010-11-09 13:04:14 +0000 +++ b/sql/sql_class.h 2010-11-20 17:23:42 +0000 @@ -2198,7 +2198,7 @@ public: proc_info = msg; return old_msg; } - inline void exit_cond(const char* old_msg, bool release_lock= TRUE) + inline void exit_cond(const char* old_msg) { /* Putting the mutex unlock in thd->exit_cond() ensures that @@ -2206,8 +2206,7 @@ public: locked (if that would not be the case, you'll get a deadlock if someone does a THD::awake() on you). */ - if (release_lock) - mysql_mutex_unlock(mysys_var->current_mutex); + mysql_mutex_unlock(mysys_var->current_mutex); mysql_mutex_lock(&mysys_var->mutex); mysys_var->current_mutex = 0; mysys_var->current_cond = 0; === modified file 'sql/sys_vars.cc' --- a/sql/sys_vars.cc 2010-11-18 14:00:52 +0000 +++ b/sql/sys_vars.cc 2010-11-20 17:23:42 +0000 @@ -3115,7 +3115,7 @@ static Sys_var_ulong Sys_slave_max_pendi "The coordinator thread suspends further jobs assigning until " "conditions have been improved ", GLOBAL_VAR(slave_max_pending_jobs), CMD_LINE(REQUIRED_ARG), - VALID_RANGE(0, ULONG_MAX), DEFAULT(40000), BLOCK_SIZE(1)); + VALID_RANGE(0, ULONG_MAX - 1), DEFAULT(40000), BLOCK_SIZE(1)); static Sys_var_mybool Sys_slave_local_timestamp( "slave_local_timestamp", "if enabled slave computes the event appying " "time value to implicitly affected timestamp columms. Otherwise (default) " --===============0701247318== MIME-Version: 1.0 Content-Type: text/bzr-bundle; charset="us-ascii"; name="bzr/andrei.elkin@stripped" Content-Transfer-Encoding: 7bit Content-Disposition: inline # Bazaar merge directive format 2 (Bazaar 0.90) # revision_id: andrei.elkin@stripped # target_branch: file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-\ # mr-wl5569/ # testament_sha1: e6c1c6826f8bcdd0a2fdb3b08196bb664cb849ad # timestamp: 2010-11-20 19:24:18 +0200 # base_revision_id: andrei.elkin@stripped\ # e984dthkb1ussm1b # # Begin bundle IyBCYXphYXIgcmV2aXNpb24gYnVuZGxlIHY0CiMKQlpoOTFBWSZTWRui374AC9z/gFkQIAh///// f+//8P////pgFu7r6y+u4fAwO331w+6++qfJXW261hV7u6N6vvt4u5N33pcW77HpJT1saqa1Vfdv u3rM2wsU9m9fXz7Ma2EkSaaSeI0ZApjRoYk8kzRAjRoGjTQAPUNDaglEAExIxI1MRNMmIMTQANAy A0xMgB6QSmhNBIg1E/Qp6mmjagAAAAAAAABoJNSJDQU2mCUzSbIJp5E0aZMmmgAAAGgAEUhNJlNo NJpqaNo00obU9GmgjIGg0A0BpoAAkUCAmmIARkTajTQSeg0mQ02kAaABpoCQZfsHR7WQLv1KDGlu Twdof9DnYFyUWU+vVrqL0Kd1JzKrEYZ6aVqk2VYS4D6Gej1T6z8O99xtwMu8s/4JjXgud5DcERTm Fzc/fiZ4fvMddqnGHEoif6Xgg5M9Mcvh02llOthzs574KLbr2lk0JV85Y5rLnu+8+7cidWiCODbj CFly1NdGso301RhpjOPZZc7CSAXnZSYplXbcveBy2obcLEKYcbM4qDJjZDrZZyJjbLuP6srdsp3W ve1S/LETjac25JipxqsWvNC1sxtZ+90OWl2Q1OM/Fy8c3OpBQFrUDJxJib4EMyBDBJIMCeNkwScG kiFqYEkCsCnKeDkqjXyzINoeuIOgZ6mf2QH8HGdGOJcv/sPy0ccNh4oHpSckiqqKSLBJsG0sVraX 3Aly7+ff3bnGBThdH6T8TnK2E65EXBeLatyoNr7sKey8mkWRlXSp9ImnB994pTZ27IdX0VnYih8z EU4Ti0EQ7RhyEFk6HRky6dXmQfTwgWrXtbSusite9InKkWrzaajMQnRhQhYvqcrCLrt83UdMQ3A5 hBbr9EUkkEHO9G8Qbn2WP6AaZlI9K9a6HXoMd4iblqY8Ay2RU7SeFfBc02NaeePeGndwaDpvcm5m xuFNyup9Iy8+OwqajU366Gx3c0CDCw9zP8bqTDQgbE232ZnA3P4A8og8ZRgBvHZjxCuMmspcVOOl RHHb+Gtc7pkFxD2UDjGd+EfJthl979R9+LH6zs5kcIH0Tq+dJA0qLm/gD62MXg6BCFxDfND7FLbQ yzMdVWCzkyvVLGCKNhmq7+hgKBSxvQEDfKB01CCzWcjPTCtMMMSltKM1YSjKdsnAjplxOooGQxjq 57YJDr2kS3hrMBlSMqB0a+xejwx8oR9Dlqafr785ZqZIb6+J5M+9l2xBIu3l0tEb9YBLOACPmBHP 0wUkLnyH80T60cEIgxv7FY3FByWETAiAShaeJE9Iw83s68LibZXDR4kir6NFR4WzKHwZBvzbMq+D s8V07TOcvj9sR6N2HTlze+zcGhXt+agc2u3r906LG13NNJodnNgauPGa3/iQ/ici8yuzYCcWfEYM QQ5NRsTNT0crOx65exM4Fpcek9J1HaaTJuRKrs5Xd4O7w3qZMvAcdMbVnEaxgU1jtxFMO4+yUqzO MKRAASCQkmP+cHPKuouPUZR0nWfxKPFioUlFehYSeKvrFdJQuoNbN1yVMTNL0nN1a4uznydFcTxt 6jOEaswAx4ABbyVUSirkw01KVqo2A6Qo6hgIiGsLL5QOuHzITj3ZoGjGlM+hNuFYJHAhYoqBt5OG 4SzzmqSdtDlS7AphdJkaZA7jIdd3GunU+HGCWz5jvm76jLBuHIOgdssU/MLCGmhAoUAGAXA2Fjnh sQowVM2WllkriVQyaJK2zyyl53u7wwVoVKW08tsmE2jZg35ISqYOZe8a2KBUdSpEDSu4uJWFlbPM FohSaCcBZ4c2MX2nh7yDpiSjQMoGyqLAolEsjkpNdV4IWpoIbP2lgbjZA6aNl6twWwghBnjK97hl xEVEytRoIm2dxYm69JbmTm1VyW7ZxYZ7+BJEH3Mkq0nYt+UiI33QJ21JeBpqKQiMjFZAlguATUH8 GHmLTtuklUgvhnKGq3kYMZrQRNhA24nGZrStJGi+ZkFBaZzjnikRJGRLHRYFdrp2K6IV2DkrcDTc FGguJCRjKKh3eSOCA6QisEnWDySlIML+cuhbO5VDIqY1DcNQEhhsGKtjka7inCBRHAaNlygjMZle R0bWUpZMyUZzLuMQWYYiR4LEPYYpyBB1iNJESHFrPgdK15SdYrrnGEzKE+2O2+cGQIPVXvj2h2kV cTUnqOBB3X3IJlrO04bM2xuN/VXJfG4ZjB1wQm4k2mWGEF2zvkgOTGMTYsIq1R3DRMrUAg1Ss2KR OfGsMjOT1V14jhICtskWN3GEc4DVULRy8SxrlFBYSJjusceeO7hHgRF+W3WVJ1uWwOzDndbyMa40 +/eZI30MxQopwSvo5HOFl6KzeULbEvXdEqJRcA418O7gu6zK50EjM61hsVzLXaYO0pE+vt94VKJo IbKVFMyRzyMiZmb0rS02HXjE1DOI//C6aZby4yhfopKRKGePTuvL90tMZQ6a1M+xJhmnqktHA4j3 7j2XIPcdfiK4iUT7FEwxn9Z56t26KCzinNFa1nS4kaTPJokDmM5xJPQKHKYqXkCTqVKnYeNMZHL1 x/sIskgnuJvL5rbe3FnMV5O0XQcxj+cgRhzc7it560JJs+ze+KjzcnU9YdhUmm8yMZNejRX8WJYk Yi0dYJu5BxkLYycQTeSGOL8cPUQ2FMishUiYNxIUwM9S49N6wyFcr9Ove4a9MY6YzdDGFw8ooLbW bBlqXXYkVsovtwiCIuDuiMxFnBUUWCWdgkcsDkMFI6E3dDAcOPPQaJuo1poi1Uajil357H6Xwc0i 7hkVZ14i006byrnJG98OD1iml7jy5Grdw8UpMoxNUzo5+5D7lOH6RzDNTbUZbRzm020dWib2onwv HaiSrJjUToFSTTCaisIFMDxhBzWv4yMQMDaSKFDou0SMtFUKGcm4HKF92mBCYh5YECtVaCw5krUl CDRVTUyyGpQewoji3PzwbEsSQNDaDZuO5uJ5oEbRN5BEmMebSjLUsXNE7giFuR1WcRhAyCBwMx1y YiHcRxu3UYJ5u1nyLq+lxBmqtgwZZEvJYbPICGKrNRZmKZnlTDPdxjMvgg3A1sqs9yLzLKGkaMLc OmUwvOx25emBr6HqbgreohMiv8HUSpMWR5qrkdcrjv8UBsjRjUOTymY3OrlM7R3zZrr1BeLzCmgX lZefVO28Rc5d9fsJ+1trBPUHVAfJsoxcTiw2LlhrxqShGZIG+palEFSRNId/3kPizIYwgBYB6YT8 ANWUAJgfIf36j+vXT/k9hH39lq0AMkzDHeVdyKP9URwmD+/sdHvvm4s8O8boYYF3A5UOmifjfZvS 9BbU4dKUG6SE4dKOPUvjNsBx3tM1f6RrxY/1pYfzMeGh6HW+AjIzF5GvEhv0yCpkMDVCFJoOGBHI UaVFVaaWCrEHFaEWaaCP+Y0Z4UfB8BtexkRrKeRHe1mibj6A9W33I51vR+PmN1qMASe34j8TD4CY kKsypNHnRU1WheAhkLIb4fSOyH+oqlG2BJC2iFqegTMoZ+DAuPSJ+ZVbhNDZeCfEwrlPOwtIsbCC AggWUZRmTGFFnZLzEpqZ0lzVp0STQJDAQ3IOmYmaZchKQqZAUqUTgaS01kMCWYnASE8Ri4yDBsG8 dDkyD1tBDAX2R7TMew+0u8ZPgCH5DENxAm4eeQmef8+oniSpY8p5joPP1+BwzyP5GFKCOXgOBUZF mWqCAMO/cNpo0HQrJHLWaALgZmYU3qfK53U56lkr7TaBp5ub1cUjMoXayYEbEEdtESYq0H2nyCGg h8LSmT8sfRuppD4GaSDzGnbMlkgTfAkTCse2+9Fb7qkHYgndcESCcWrZZbEg2Pa47BDMECJ6KFVm J7/Vlek60tkLhI5k/SMOTm008sxOLUGboHNYjUMBxw2bSUByxA0dBzIDloM6fBxqoQLSasapyHRJ HoEAe4qIxtBXjDktMRcZcn6mslIcqiZCpSKDCk59Aa1C8T7ugqcR83HLwA6zPJue8rQ9WwxYzBs3 GGfLF/YEtE9tpagRFklWiSAK6gRFaUAb3NAOdD0nv+V4FJRRPUJuOjKNQyakxNVAv3t3U0T0yViS GIgiHaO05LJiGkdkjykJg5GlCmjYeOHcUMfigdp6TsMF5DR7e8gax5pHoLaNF6R3rMmUrLNJMtSF rLk9qVYaTjvOxte5GV+ZxxU2TOBBHvXUoZcwRmvSaziUA8MzifvpiI3LcJbjdDmJkAUAMWciVdRT k6/iYLbjKR0LysTvzkpNBnI9xE6pKKPlpddZCIT2XMIWEgmQhENWfHbUrK6k8ENADSTQXxGkHQxs tth4ETgTKc02i8pECNN48T/zCDFJWahyVOseoaA2YgGBLtM5C0xsnTDBYLuQRQawwMLiKlQIkAMk 4JoJxK0yCighsOSOTItLE4HyiNSz+FZVWl2pPZqgpRNpTSRHtWrnp8I0nxd0OgtOijQBpdDvs5Gq 2vFATl5Mi+lnT9gAdeIFaFERgMhi1Y9HihU8OgWNDEd5eRUUjPv4hNpNvPtglEFkGvQYn9PVCgpi n1FFieRlpMgO05hgIi0EkVRvA0xMhm3jletbAb+AIgIbkSglg9g5EkggfTPBLjfhaB60xgdEjwPA 1dSRI+LsSpW/fOTms8ZvEjxlJbjUSVZfOpaki0RbxcgpvjQY91CFwQ4jsFJPO5xxeYEompgyTDpb aA3WcWk3YhcCZN5KZzMux3NgWC0Rl3jZPkAowDyaLJlnmAzGLxjbfFexvHuM3q6eEl4HFhd1EpMh sBG4vgmRZmJEwF9rDhMsYYhmmTyQtNEc7MOE07kU5KDyF0KEwAhmS8bNCD5S9xqW4jLjdG07WFnt I5ruDyYkr4fIa7doFx8rYIPGJTC48BIPZKDuxQlKQjYpqaF3mim8gALlzBEuQMGxHSYLRLWEzXAX xi1HDDttgoa4XYUJSIoUMy80CZwGI41evKnsu20QXNiGDbqkkUrNfMQKwSOkjF9YyGQbQygvPpVk VX2ZLA9mSCrDgixd+S8TmsUYcl9EQZ0g0DYhsGEL73S9sucXAFO28eRmca3dIQ3jkFdk8T6kUO0M QktiAMgVl5k2pdyuglA9ho9RM5yEkBHXDFA79dol2yzbB7dyF00kCSFOEYIiGoqBfpRnp9JoQAwb GkmqCMfPNdiv9jQgIb0AhC4mMgCA79LlxCsKGfqAwrOKFUslpN6AIwrqDXbccAGMRT91aAM8guaq CpAVN+gzQ2AOlQYqIoshLHluXnmAktisVJ0qYLlM+UERAp2rQrQtBmFNkBgA1sSZDnoBRBiKw8Nh lEyGcEoJTsZIFJc4Clk76UOzmmwlRgmQg59ai1EKlicDVHGbpU7PFeEOk29aaA6h7nblDSULEWnO vV5ZmQ1nizY2n9cETGjr1ZUooJAA0VRPeKdBnQq72ATvGW2aJVnMhLgaAwExX6JCAlrSmyw3LOEw RiFIITGkFTs4qNU5ll71ayFyCYmonjnnnYJlJl0yhO6kxJSpKIWWiVdLEGLIO+aanYe1DnJzMkoP DIpG8YaEEML0roEgCBBzqklErhaJTEGwfRCN5ylqSs3FLQdSAdqC/gW2FCYBsuXvaig3Wogl9q4q /UMBoTa2KcC/Wpjs1TU80ALBMNpDXky/H4IrEWEUMZMuGrEI7zz8ARZVHWrlz4MjGo6+UvE5YEvL bwTDreLm1ZGjmu3oEuqEIUhhcCbflE8jUii8MX90drqPjhRuN/NwLlv7XPnyDV8VVDyBjoHrqS9p 5R6XrN72Cm7MbgBhhtdQ9blDQEQ4GSIBnfRagqtWbY3o9lncJBJZwjIVO6epUEiQI89uO9JTSPU7 AmbEzqk5PYgDpAyQ+BoESaRyOVB0+cqnERQ3p26VhbvIEYGamJKlD2HznP8t8iUBTbE2NqpK8XMV ySOG8FTYBfeTkuTlnk4NDVVTS2rtDZGw6t/APK2TgIc2KCQm446rOtAoUY1BUPCAuKmFJ3TkRtGZ ZpkZWW2r2jvFZBus6ogBH1HkrL1OoYe0TBKV82LBOMUiOwI3yzk/g5w+ji0igLPeOHin7tycDmTl gdkx6ZyXN2yt4RgXAytYxgKZpOkgp4SVaKYFYyGknCW2STXA43XVLVVVzSTqhEJRU5AK5NYo1CGw 70u9ahbZT5SFETENYF7hQCoCgxBw9EU0tF4yAERqYBFirBkJEyKJ+5alYY5V/HPG/KKuQsWG8Zqk QZUIklw7Q2KsKLUPElN21dqsIoXY5wqWYW9GoS+8RgmedMKTSGBpvRbdOKmg34ECgqIIhCAalRzO QDKHwjxlRH4YS+paQMQHI3HQZaEy3CJA0Ilk2yHC4ja5GZNNZsRJCCbxdtNaTEg7PVvhrZcAuVkG TnScOgVOip+EhkFHqSmts2+9JlEavacqTC8QxPFERBUDBSVfIhFWsNzv3zSJ1+RKz8agLitdlwI5 jIwC7xGgwOGGuQdmEYs0yPAYiULUZDSZ2q2K999i+fyrDNEjRpHxGOoyoZh0lhxAAVQ0PY6HjbgM CoPlbmm6JUSyEm8/klVIZCkMoaOJhUPN3BmC060FvDtoY5nPu2OZE14v693PMoRcYFfJG/YOdeGP C7fOzoCTgTkYA913HbZ90hB2ZQ390DwavLKiJrTUgIopJVejQblSC3jTUQAzV8xVGUFjrpsMCRap Y1uViznkYb/xdyRThQkBui374A== --===============0701247318==--