List:Commits« Previous MessageNext Message »
From:Nuno Carvalho Date:June 15 2012 4:39pm
Subject:bzr push into mysql-trunk branch (nuno.carvalho:4022 to 4023) Bug#13635612
View as plain text  
 4023 Nuno Carvalho	2012-06-15
      BUG#13635612: VALGRIND ERRORS IN REPLICATION
      
      There was a possible memory leak when the multi-threaded slave
      coordinator was waiting for a successful job enqueue on a worker queue
      and the slave was stopped (3rd error on the bug report).
      
      Ensure that when the job enqueue on a worker is interrupted, the current
      event is deleted and the error is propagated to the coordinator.
      
      @ sql/rpl_info_factory.cc
         Slave_worker::end_info() is now private.
      @ sql/rpl_rli_pdb.h
         Removal of unused code and make Slave_worker::end_info() private, now
         it is implicitly called by Slave_worker destructor.
      @ sql/rpl_rli_pdb.cc
         Refactored Slave_worker destructor to call end_info() and free worker
         jobs queues.
         Changed append_item_to_jobs() in order to inform the caller when it
         is interrupted.
      @ sql/rpl_slave.cc
         Ensure that slave_start_workers() and slave_stop_workers() have proper
         mutex locked.
         Changed apply_event_and_update_pos() in order to take into account
         possible append_item_to_jobs() interruptions.

    modified:
      sql/rpl_info_factory.cc
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
 4022 kevin.lewis@stripped	2012-06-15
      Compiler warning, unused variable.

    modified:
      storage/innobase/fil/fil0fil.cc
=== modified file 'sql/rpl_info_factory.cc'
--- a/sql/rpl_info_factory.cc	2012-05-07 08:53:28 +0000
+++ b/sql/rpl_info_factory.cc	2012-06-15 16:37:59 +0000
@@ -375,7 +375,6 @@ Slave_worker *Rpl_info_factory::create_w
        
   if (worker->rli_init_info(is_gaps_collecting_phase))
   {
-    worker->end_info();
     msg= "Failed to intialize the worker info structure";
     goto err;
   }

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2012-05-07 08:53:28 +0000
+++ b/sql/rpl_rli_pdb.cc	2012-06-15 16:37:59 +0000
@@ -93,6 +93,12 @@ Slave_worker::Slave_worker(Relay_log_inf
 
 Slave_worker::~Slave_worker() 
 {
+  end_info();
+  if (jobs.inited_queue)
+  {
+    DBUG_ASSERT(jobs.Q.elements == jobs.size);
+    delete_dynamic(&jobs.Q);
+  }
   delete_dynamic(&curr_group_exec_parts);
   mysql_mutex_destroy(&jobs_lock);
   mysql_cond_destroy(&jobs_cond);
@@ -1578,12 +1584,16 @@ Slave_job_item * de_queue(Slave_jobs_que
    @param job_item  a pointer to struct carrying a reference to an event
    @param worker    a pointer to the assigned Worker struct
    @param rli       a pointer to Relay_log_info of Coordinator
+
+   @return false Success.
+           true  Thread killed or worker stopped while waiting for
+                 successful enqueue.
 */
-void append_item_to_jobs(slave_job_item *job_item,
+bool append_item_to_jobs(slave_job_item *job_item,
                          Slave_worker *worker, Relay_log_info *rli)
 {
   THD *thd= rli->info_thd;
-  int ret;
+  int ret= -1;
   ulong ev_size= ((Log_event*) (job_item->data))->data_written;
   ulonglong new_pend_size;
   PSI_stage_info old_stage;
@@ -1603,7 +1613,7 @@ void append_item_to_jobs(slave_job_item
     mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
     thd->EXIT_COND(&old_stage);
     if (thd->killed)
-      return;
+      return true;
 
     mysql_mutex_lock(&rli->pending_jobs_lock);
 
@@ -1627,8 +1637,6 @@ void append_item_to_jobs(slave_job_item
     rli->mts_wq_no_underrun_cnt++;
   }
 
-  ret= -1;
-
   mysql_mutex_lock(&worker->jobs_lock);
 
   // possible WQ overfill
@@ -1662,6 +1670,8 @@ void append_item_to_jobs(slave_job_item
     rli->mts_pending_jobs_size -= ev_size;
     mysql_mutex_unlock(&rli->pending_jobs_lock);
   }
+
+  return (-1 != ret ? false : true);
 }
 
 

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2012-05-07 08:53:28 +0000
+++ b/sql/rpl_rli_pdb.h	2012-06-15 16:37:59 +0000
@@ -53,9 +53,7 @@ Slave_worker *map_db_to_worker(const cha
 Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
 int wait_for_workers_to_finish(Relay_log_info const *rli,
                                Slave_worker *ignore= NULL);
-bool critical_worker(Relay_log_info *rli);
 
-#define SLAVE_WORKER_QUEUE_SIZE 8096
 #define SLAVE_INIT_DBS_IN_GROUP 4     // initial allocation for CGEP dynarray
 
 #define NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER 2
@@ -346,7 +344,6 @@ public:
 
   int init_worker(Relay_log_info*, ulong);
   int rli_init_info(bool);
-  void end_info();
   int flush_info(bool force= FALSE);
   static size_t get_number_worker_fields();
   void slave_worker_ends_group(Log_event*, int);
@@ -359,6 +356,7 @@ protected:
                          const char *msg, va_list v_args) const;
 
 private:
+  void end_info();
   bool read_info(Rpl_info_handler *from);
   bool write_info(Rpl_info_handler *to);
   Slave_worker& operator=(const Slave_worker& info);

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2012-06-13 11:16:44 +0000
+++ b/sql/rpl_slave.cc	2012-06-15 16:37:59 +0000
@@ -104,7 +104,7 @@ const ulong mts_coordinator_basic_nap= 5
 */
 const ulong mts_worker_underrun_level= 10;
 Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret);
-void append_item_to_jobs(slave_job_item *job_item,
+bool append_item_to_jobs(slave_job_item *job_item,
                          Slave_worker *w, Relay_log_info *rli);
 
 /*
@@ -169,6 +169,15 @@ failed read"
   }
 };
 
+enum enum_slave_apply_event_and_update_pos_retval
+{
+  SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK= 0,
+  SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPLY_ERROR= 1,
+  SLAVE_APPLY_EVENT_AND_UPDATE_POS_UPDATE_POS_ERROR= 2,
+  SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPEND_JOB_ERROR= 3,
+  SLAVE_APPLY_EVENT_AND_UPDATE_POS_MAX
+};
+
 
 static int process_io_rotate(Master_info* mi, Rotate_log_event* rev);
 static int process_io_create_file(Master_info* mi, Create_file_log_event* cev);
@@ -3192,14 +3201,22 @@ int ulong_cmp(ulong *id1, ulong *id2)
   @note MTS can store NULL to @c ptr_ev location to indicate
         the event is taken over by a Worker.
 
-  @retval 0 OK.
+  @retval SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK
+          OK.
 
-  @retval 1 Error calling ev->apply_event().
+  @retval SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPLY_ERROR
+          Error calling ev->apply_event().
 
-  @retval 2 No error calling ev->apply_event(), but error calling
-  ev->update_pos().
+  @retval SLAVE_APPLY_EVENT_AND_UPDATE_POS_UPDATE_POS_ERROR
+          No error calling ev->apply_event(), but error calling
+          ev->update_pos().
+
+  @retval SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPEND_JOB_ERROR
+          append_item_to_jobs() failed, thread was killed while waiting
+          for successful enqueue on worker.
 */
-int apply_event_and_update_pos(Log_event** ptr_ev, THD* thd, Relay_log_info* rli)
+enum enum_slave_apply_event_and_update_pos_retval
+apply_event_and_update_pos(Log_event** ptr_ev, THD* thd, Relay_log_info* rli)
 {
   int exec_res= 0;
   bool skip_event= FALSE;
@@ -3276,7 +3293,7 @@ int apply_event_and_update_pos(Log_event
   {
     // Sleeps if needed, and unlocks rli->data_lock.
     if (sql_delay_event(ev, thd, rli))
-      DBUG_RETURN(0);
+      DBUG_RETURN(SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK);
 
     exec_res= ev->apply_event(rli);
 
@@ -3307,6 +3324,7 @@ int apply_event_and_update_pos(Log_event
           rli->last_assigned_worker= NULL;
         }
 
+        bool append_item_to_jobs_error= false;
         if (rli->curr_group_da.elements > 0)
         {
           /*
@@ -3319,7 +3337,10 @@ int apply_event_and_update_pos(Log_event
             get_dynamic(&rli->curr_group_da, (uchar*) &da_item.data, i);
             DBUG_PRINT("mts", ("Assigning job %llu to worker %lu",
                       ((Log_event* )da_item.data)->log_pos, w->id));
-            append_item_to_jobs(&da_item, w, rli);
+            if (!append_item_to_jobs_error)
+              append_item_to_jobs_error= append_item_to_jobs(&da_item, w, rli);
+            if (append_item_to_jobs_error)
+              delete static_cast<Log_event*>(da_item.data);
           }
           if (rli->curr_group_da.elements > rli->curr_group_da.max_element)
           {
@@ -3330,12 +3351,15 @@ int apply_event_and_update_pos(Log_event
           }
           rli->curr_group_da.elements= 0;
         }
+        if (append_item_to_jobs_error)
+          DBUG_RETURN(SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPEND_JOB_ERROR);
 
         DBUG_PRINT("mts", ("Assigning job %llu to worker %lu\n",
                    ((Log_event* )job_item->data)->log_pos, w->id));
 
         /* Notice `ev' instance can be destoyed after `append()' */
-        append_item_to_jobs(job_item, w, rli);
+        if (append_item_to_jobs(job_item, w, rli))
+          DBUG_RETURN(SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPEND_JOB_ERROR);
         if (need_sync)
         {
           /*
@@ -3490,11 +3514,12 @@ int apply_event_and_update_pos(Log_event
                   " Stopped in %s position %s",
                   rli->get_group_relay_log_name(),
                   llstr(rli->get_group_relay_log_pos(), buf));
-      DBUG_RETURN(2);
+      DBUG_RETURN(SLAVE_APPLY_EVENT_AND_UPDATE_POS_UPDATE_POS_ERROR);
     }
   }
 
-  DBUG_RETURN(exec_res ? 1 : 0);
+  DBUG_RETURN(exec_res ? SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPLY_ERROR
+                       : SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK);
 }
 
 
@@ -3550,7 +3575,7 @@ static int exec_relay_log_event(THD* thd
   }
   if (ev)
   {
-    int exec_res;
+    enum enum_slave_apply_event_and_update_pos_retval exec_res;
 
     ptr_ev= &ev;
     /*
@@ -3639,11 +3664,18 @@ static int exec_relay_log_event(THD* thd
     }
 
     /*
-      update_log_pos failed: this should not happen, so we don't
-      retry.
+      exec_res == SLAVE_APPLY_EVENT_AND_UPDATE_POS_UPDATE_POS_ERROR
+                  update_log_pos failed: this should not happen, so we
+                  don't retry.
+      exec_res == SLAVE_APPLY_EVENT_AND_UPDATE_POS_APPEND_JOB_ERROR
+                  append_item_to_jobs() failed, this happened because
+                  thread was killed while waiting for enqueue on worker.
     */
-    if (exec_res == 2)
+    if (exec_res >= SLAVE_APPLY_EVENT_AND_UPDATE_POS_UPDATE_POS_ERROR)
+    {
+      delete ev;
       DBUG_RETURN(1);
+    }
 
     if (slave_trans_retries)
     {
@@ -3684,7 +3716,7 @@ static int exec_relay_log_event(THD* thd
                             errmsg);
           else
           {
-            exec_res= 0;
+            exec_res= SLAVE_APPLY_EVENT_AND_UPDATE_POS_OK;
             rli->cleanup_context(thd, 1);
             /* chance for concurrent connection to get more locks */
             slave_sleep(thd, min<ulong>(rli->trans_retries, MAX_SLAVE_RETRY_PAUSE),
@@ -3723,6 +3755,8 @@ static int exec_relay_log_event(THD* thd
                             rli->trans_retries));
       }
     }
+    if (exec_res)
+      delete ev;
     DBUG_RETURN(exec_res);
   }
   mysql_mutex_unlock(&rli->data_lock);
@@ -4525,7 +4559,6 @@ int mts_recovery_groups(Relay_log_info *
         Deletes the worker because its jobs are included in the latest
         checkpoint.
       */
-      worker->end_info();
       delete worker;
     }
   }
@@ -4714,7 +4747,6 @@ err:
   for (uint it_job= 0; it_job < above_lwm_jobs.elements; it_job++)
   {
     get_dynamic(&above_lwm_jobs, (uchar *) &job_worker, it_job);
-    job_worker.worker->end_info();
     delete job_worker.worker;
   }
 
@@ -4888,6 +4920,8 @@ int slave_start_single_worker(Relay_log_
   pthread_t th;
   Slave_worker *w= NULL;
 
+  mysql_mutex_assert_owner(&rli->run_lock);
+
   if (!(w=
         Rpl_info_factory::create_worker(opt_rli_repository_id, i, rli, false)))
   {
@@ -4923,9 +4957,6 @@ int slave_start_single_worker(Relay_log_
 err:
   if (error && w)
   {
-    w->end_info();
-    if (w->jobs.inited_queue)
-      delete_dynamic(&(w->jobs.Q));
     delete w;
     /*
       Any failure after dynarray inserted must follow with deletion
@@ -4952,6 +4983,8 @@ int slave_start_workers(Relay_log_info *
   uint i;
   int error= 0;
 
+  mysql_mutex_assert_owner(&rli->run_lock);
+
   if (n == 0 && rli->mts_recovery_group_cnt == 0)
   {
     reset_dynamic(&rli->workers);
@@ -5052,6 +5085,8 @@ void slave_stop_workers(Relay_log_info *
   int i;
   THD *thd= rli->info_thd;
 
+  mysql_mutex_assert_owner(&rli->run_lock);
+
   if (!*mts_inited) 
     return;
   else if (rli->slave_parallel_workers == 0)
@@ -5130,10 +5165,7 @@ void slave_stop_workers(Relay_log_info *
       mysql_mutex_lock(&w->jobs_lock);
     }
     mysql_mutex_unlock(&w->jobs_lock);
-    w->end_info();
 
-    DBUG_ASSERT(w->jobs.Q.elements == w->jobs.size);
-    delete_dynamic(&w->jobs.Q);
     delete_dynamic_element(&rli->workers, i);
     delete w;
   }
@@ -5490,6 +5522,7 @@ llstr(rli->get_group_master_log_pos(), l
 
  err:
 
+  mysql_mutex_lock(&rli->run_lock);
   slave_stop_workers(rli, &mts_inited); // stopping worker pool
   if (rli->recovery_groups_inited)
   {
@@ -5515,7 +5548,6 @@ llstr(rli->get_group_master_log_pos(), l
   thd->reset_db(NULL, 0);
 
   THD_STAGE_INFO(thd, stage_waiting_for_slave_mutex_on_exit);
-  mysql_mutex_lock(&rli->run_lock);
   /* We need data_lock, at least to wake up any waiting master_pos_wait() */
   mysql_mutex_lock(&rli->data_lock);
   DBUG_ASSERT(rli->slave_running == 1); // tracking buffer overrun

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-trunk branch (nuno.carvalho:4022 to 4023) Bug#13635612Nuno Carvalho18 Jun