From: Nuno Carvalho Date: June 15 2012 4:39pm Subject: bzr push into mysql-trunk branch (nuno.carvalho:4022 to 4023) Bug#13635612 List-Archive: http://lists.mysql.com/commits/144252 X-Bug: 13635612 Message-Id: <20120615163926.30139.68872.4023@localhost> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4023 Nuno Carvalho 2012-06-15 BUG#13635612: VALGRIND ERRORS IN REPLICATION There was a possible memory leak when multi thread slave coordinator is waiting for successful job enqueue on worker queue and slave is stopped (3rd error on bug report). Ensure that when job enqueue on worker is interrupted the current event is deleted and the error propagated to coordinator. @ sql/rpl_info_factory.cc Slave_worker::end_info() now is private. @ sql/rpl_rli_pdb.h Removal of unused code and make Slave_worker::end_info() private, now it is implicitly called by Slave_worker destructor. @ sql/rpl_rli_pdb.cc Refactored Slave_worker destructor to call end_info() and free worker jobs queues. Changed append_item_to_jobs() in order to inform caller when it is interrupted. @ sql/rpl_slave.cc Ensure that slave_start_workers() and slave_stop_workers() have proper mutex locked. Changed apply_event_and_update_pos() in order to take in account possible append_item_to_jobs() interruptions. modified: sql/rpl_info_factory.cc sql/rpl_rli_pdb.cc sql/rpl_rli_pdb.h sql/rpl_slave.cc 4022 kevin.lewis@stripped 2012-06-15 Compiler warning, unused variable. modified: storage/innobase/fil/fil0fil.cc === modified file 'sql/rpl_info_factory.cc' --- a/sql/rpl_info_factory.cc 2012-05-07 08:53:28 +0000 +++ b/sql/rpl_info_factory.cc 2012-06-15 16:37:59 +0000 @@ -375,7 +375,6 @@ Slave_worker *Rpl_info_factory::create_w if (worker->rli_init_info(is_gaps_collecting_phase)) { - worker->end_info(); msg= "Failed to intialize the worker info structure"; goto err; } === modified file 'sql/rpl_rli_pdb.cc' --- a/sql/rpl_rli_pdb.cc 2012-05-07 08:53:28 +0000 +++ b/sql/rpl_rli_pdb.cc 2012-06-15 16:37:59 +0000 @@ -93,6 +93,12 @@ Slave_worker::Slave_worker(Relay_log_inf Slave_worker::~Slave_worker() { + end_info(); + if (jobs.inited_queue) + { + DBUG_ASSERT(jobs.Q.elements == jobs.size); + delete_dynamic(&jobs.Q); + } delete_dynamic(&curr_group_exec_parts); mysql_mutex_destroy(&jobs_lock); mysql_cond_destroy(&jobs_cond); @@ -1578,12 +1584,16 @@ Slave_job_item * de_queue(Slave_jobs_que @param job_item a pointer to struct carrying a reference to an event @param worker a pointer to the assigned Worker struct @param rli a pointer to Relay_log_info of Coordinator + + @return false Success. + true Thread killed or worker stopped while waiting for + successful enqueue. */ -void append_item_to_jobs(slave_job_item *job_item, +bool append_item_to_jobs(slave_job_item *job_item, Slave_worker *worker, Relay_log_info *rli) { THD *thd= rli->info_thd; - int ret; + int ret= -1; ulong ev_size= ((Log_event*) (job_item->data))->data_written; ulonglong new_pend_size; PSI_stage_info old_stage; @@ -1603,7 +1613,7 @@ void append_item_to_jobs(slave_job_item mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock); thd->EXIT_COND(&old_stage); if (thd->killed) - return; + return true; mysql_mutex_lock(&rli->pending_jobs_lock); @@ -1627,8 +1637,6 @@ void append_item_to_jobs(slave_job_item rli->mts_wq_no_underrun_cnt++; } - ret= -1; - mysql_mutex_lock(&worker->jobs_lock); // possible WQ overfill @@ -1662,6 +1670,8 @@ void append_item_to_jobs(slave_job_item rli->mts_pending_jobs_size -= ev_size; mysql_mutex_unlock(&rli->pending_jobs_lock); } + + return (-1 != ret ? false : true); } === modified file 'sql/rpl_rli_pdb.h' --- a/sql/rpl_rli_pdb.h 2012-05-07 08:53:28 +0000 +++ b/sql/rpl_rli_pdb.h 2012-06-15 16:37:59 +0000 @@ -53,9 +53,7 @@ Slave_worker *map_db_to_worker(const cha Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers); int wait_for_workers_to_finish(Relay_log_info const *rli, Slave_worker *ignore= NULL); -bool critical_worker(Relay_log_info *rli); -#define SLAVE_WORKER_QUEUE_SIZE 8096 #define SLAVE_INIT_DBS_IN_GROUP 4 // initial allocation for CGEP dynarray #define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2 @@ -346,7 +344,6 @@ public: int init_worker(Relay_log_info*, ulong); int rli_init_info(bool); - void end_info(); int flush_info(bool force= FALSE); static size_t get_number_worker_fields(); void slave_worker_ends_group(Log_event*, int); @@ -359,6 +356,7 @@ protected: const char *msg, va_list v_args) const; private: + void end_info(); bool read_info(Rpl_info_handler *from); bool write_info(Rpl_info_handler *to); Slave_worker& operator=(const Slave_worker& info); === modified file 'sql/rpl_slave.cc' --- a/sql/rpl_slave.cc 2012-06-13 11:16:44 +0000 +++ b/sql/rpl_slave.cc 2012-06-15 16:37:59 +0000 @@ -104,7 +104,7 @@ const ulong mts_coordinator_basic_nap= 5 */ const ulong mts_worker_underrun_level= 10; Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret); -void append_item_to_jobs(slave_job_item *job_item, +bool append_item_to_jobs(slave_job_item *job_item, Slave_worker *w, Relay_log_info *rli); /* @@ -169,6 +169,15 @@ failed read" } }; +enum enum_slave_apply_event_and_update_pos_retval +{ + SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK= 0, + SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPLY_ERROR= 1, + SLAVE_APPLY_EVENT_AND_UPDATE_POS_UPDATE_POS_ERROR= 2, + SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPEND_JOB_ERROR= 3, + SLAVE_APPLY_EVENT_AND_UPDATE_POS_MAX +}; + static int process_io_rotate(Master_info* mi, Rotate_log_event* rev); static int process_io_create_file(Master_info* mi, Create_file_log_event* cev); @@ -3192,14 +3201,22 @@ int ulong_cmp(ulong *id1, ulong *id2) @note MTS can store NULL to @c ptr_ev location to indicate the event is taken over by a Worker. - @retval 0 OK. + @retval SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK + OK. - @retval 1 Error calling ev->apply_event(). + @retval SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPLY_ERROR + Error calling ev->apply_event(). - @retval 2 No error calling ev->apply_event(), but error calling - ev->update_pos(). + @retval SLAVE_APPLY_EVENT_AND_UPDATE_POS_UPDATE_POS_ERROR + No error calling ev->apply_event(), but error calling + ev->update_pos(). + + @retval SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPEND_JOB_ERROR + append_item_to_jobs() failed, thread was killed while waiting + for successful enqueue on worker. */ -int apply_event_and_update_pos(Log_event** ptr_ev, THD* thd, Relay_log_info* rli) +enum enum_slave_apply_event_and_update_pos_retval +apply_event_and_update_pos(Log_event** ptr_ev, THD* thd, Relay_log_info* rli) { int exec_res= 0; bool skip_event= FALSE; @@ -3276,7 +3293,7 @@ int apply_event_and_update_pos(Log_event { // Sleeps if needed, and unlocks rli->data_lock. if (sql_delay_event(ev, thd, rli)) - DBUG_RETURN(0); + DBUG_RETURN(SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK); exec_res= ev->apply_event(rli); @@ -3307,6 +3324,7 @@ int apply_event_and_update_pos(Log_event rli->last_assigned_worker= NULL; } + bool append_item_to_jobs_error= false; if (rli->curr_group_da.elements > 0) { /* @@ -3319,7 +3337,10 @@ int apply_event_and_update_pos(Log_event get_dynamic(&rli->curr_group_da, (uchar*) &da_item.data, i); DBUG_PRINT("mts", ("Assigning job %llu to worker %lu", ((Log_event* )da_item.data)->log_pos, w->id)); - append_item_to_jobs(&da_item, w, rli); + if (!append_item_to_jobs_error) + append_item_to_jobs_error= append_item_to_jobs(&da_item, w, rli); + if (append_item_to_jobs_error) + delete static_cast(da_item.data); } if (rli->curr_group_da.elements > rli->curr_group_da.max_element) { @@ -3330,12 +3351,15 @@ int apply_event_and_update_pos(Log_event } rli->curr_group_da.elements= 0; } + if (append_item_to_jobs_error) + DBUG_RETURN(SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPEND_JOB_ERROR); DBUG_PRINT("mts", ("Assigning job %llu to worker %lu\n", ((Log_event* )job_item->data)->log_pos, w->id)); /* Notice `ev' instance can be destoyed after `append()' */ - append_item_to_jobs(job_item, w, rli); + if (append_item_to_jobs(job_item, w, rli)) + DBUG_RETURN(SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPEND_JOB_ERROR); if (need_sync) { /* @@ -3490,11 +3514,12 @@ int apply_event_and_update_pos(Log_event " Stopped in %s position %s", rli->get_group_relay_log_name(), llstr(rli->get_group_relay_log_pos(), buf)); - DBUG_RETURN(2); + DBUG_RETURN(SLAVE_APPLY_EVENT_AND_UPDATE_POS_UPDATE_POS_ERROR); } } - DBUG_RETURN(exec_res ? 1 : 0); + DBUG_RETURN(exec_res ? SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPLY_ERROR + : SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK); } @@ -3550,7 +3575,7 @@ static int exec_relay_log_event(THD* thd } if (ev) { - int exec_res; + enum enum_slave_apply_event_and_update_pos_retval exec_res; ptr_ev= &ev; /* @@ -3639,11 +3664,18 @@ static int exec_relay_log_event(THD* thd } /* - update_log_pos failed: this should not happen, so we don't - retry. + exec_res == SLAVE_APPLY_EVENT_AND_UPDATE_POS_UPDATE_POS_ERROR + update_log_pos failed: this should not happen, so we + don't retry. + exec_res == SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPEND_JOB_ERROR + append_item_to_jobs() failed, this happened because + thread was killed while waiting for enqueue on worker. */ - if (exec_res == 2) + if (exec_res >= SLAVE_APPLY_EVENT_AND_UPDATE_POS_UPDATE_POS_ERROR) + { + delete ev; DBUG_RETURN(1); + } if (slave_trans_retries) { @@ -3684,7 +3716,7 @@ static int exec_relay_log_event(THD* thd errmsg); else { - exec_res= 0; + exec_res= SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK; rli->cleanup_context(thd, 1); /* chance for concurrent connection to get more locks */ slave_sleep(thd, min(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE), @@ -3723,6 +3755,8 @@ static int exec_relay_log_event(THD* thd rli->trans_retries)); } } + if (exec_res) + delete ev; DBUG_RETURN(exec_res); } mysql_mutex_unlock(&rli->data_lock); @@ -4525,7 +4559,6 @@ int mts_recovery_groups(Relay_log_info * Deletes the worker because its jobs are included in the latest checkpoint. */ - worker->end_info(); delete worker; } } @@ -4714,7 +4747,6 @@ err: for (uint it_job= 0; it_job < above_lwm_jobs.elements; it_job++) { get_dynamic(&above_lwm_jobs, (uchar *) &job_worker, it_job); - job_worker.worker->end_info(); delete job_worker.worker; } @@ -4888,6 +4920,8 @@ int slave_start_single_worker(Relay_log_ pthread_t th; Slave_worker *w= NULL; + mysql_mutex_assert_owner(&rli->run_lock); + if (!(w= Rpl_info_factory::create_worker(opt_rli_repository_id, i, rli, false))) { @@ -4923,9 +4957,6 @@ int slave_start_single_worker(Relay_log_ err: if (error && w) { - w->end_info(); - if (w->jobs.inited_queue) - delete_dynamic(&(w->jobs.Q)); delete w; /* Any failure after dynarray inserted must follow with deletion @@ -4952,6 +4983,8 @@ int slave_start_workers(Relay_log_info * uint i; int error= 0; + mysql_mutex_assert_owner(&rli->run_lock); + if (n == 0 && rli->mts_recovery_group_cnt == 0) { reset_dynamic(&rli->workers); @@ -5052,6 +5085,8 @@ void slave_stop_workers(Relay_log_info * int i; THD *thd= rli->info_thd; + mysql_mutex_assert_owner(&rli->run_lock); + if (!*mts_inited) return; else if (rli->slave_parallel_workers == 0) @@ -5130,10 +5165,7 @@ void slave_stop_workers(Relay_log_info * mysql_mutex_lock(&w->jobs_lock); } mysql_mutex_unlock(&w->jobs_lock); - w->end_info(); - DBUG_ASSERT(w->jobs.Q.elements == w->jobs.size); - delete_dynamic(&w->jobs.Q); delete_dynamic_element(&rli->workers, i); delete w; } @@ -5490,6 +5522,7 @@ llstr(rli->get_group_master_log_pos(), l err: + mysql_mutex_lock(&rli->run_lock); slave_stop_workers(rli, &mts_inited); // stopping worker pool if (rli->recovery_groups_inited) { @@ -5515,7 +5548,6 @@ llstr(rli->get_group_master_log_pos(), l thd->reset_db(NULL, 0); THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit); - mysql_mutex_lock(&rli->run_lock); /* We need data_lock, at least to wake up any waiting master_pos_wait() */ mysql_mutex_lock(&rli->data_lock); DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun No bundle (reason: useless for push emails).