List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:December 4 2010 5:14pm
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3229)
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:alfranio.correia@stripped

 3229 Andrei Elkin	2010-12-04
      merging from the repo wl5569

    modified:
      mysql-test/extra/rpl_tests/rpl_parallel_load.test
      sql/log_event.cc
      sql/rpl_rli.cc
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/share/errmsg-utf8.txt
=== modified file 'mysql-test/extra/rpl_tests/rpl_parallel_load.test'
--- a/mysql-test/extra/rpl_tests/rpl_parallel_load.test	2010-12-02 17:46:46 +0000
+++ b/mysql-test/extra/rpl_tests/rpl_parallel_load.test	2010-12-04 17:14:50 +0000
@@ -91,6 +91,11 @@ while($i)
 
 # Exec log position is not accurate in the prototype
 --sleep 2
+--disable_query_log
+--disable_result_log
+###select sleep(300);
+--enable_result_log
+--enable_query_log
 
 sync_slave_with_master;
 

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-12-02 18:13:12 +0000
+++ b/sql/log_event.cc	2010-12-04 17:14:50 +0000
@@ -2211,6 +2211,7 @@ Slave_worker *Log_event::get_slave_worke
   if ((is_b_event= starts_group()) || !rli->curr_group_seen_begin)
   {
     g.master_log_pos= log_pos;
+    g.group_master_log_pos= g.group_relay_log_pos= 0;
     g.group_relay_log_name= NULL;
     g.worker_id= (ulong) -1;
     g.total_seqno= const_cast<Relay_log_info*>(rli)->mts_total_groups++;
@@ -2284,6 +2285,8 @@ Slave_worker *Log_event::get_slave_worke
     uint i;
     mts_group_cnt= rli->gaq->assigned_group_index;
 
+    // TODO: throw an error when relay-log reading starts from inside of a group!!
+
     if (!worker->relay_log_change_notified)
     {
       /*
@@ -2300,8 +2303,10 @@ Slave_worker *Log_event::get_slave_worke
       DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
 
       ptr_g->group_relay_log_name= (char *)
-        my_malloc(strlen(const_cast<Relay_log_info*>(rli)->get_group_relay_log_name()) + 1, MYF(MY_WME));
-      strcpy(ptr_g->group_relay_log_name, const_cast<Relay_log_info*>(rli)->get_group_relay_log_name());
+        my_malloc(strlen(const_cast<Relay_log_info*>(rli)->
+                         get_group_relay_log_name()) + 1, MYF(MY_WME));
+      strcpy(ptr_g->group_relay_log_name,
+             const_cast<Relay_log_info*>(rli)->get_group_relay_log_name());
 
       DBUG_ASSERT(ptr_g->group_relay_log_name != NULL);
 
@@ -2416,15 +2421,15 @@ void append_item_to_jobs(slave_job_item 
                          Slave_worker *w, Relay_log_info *rli)
 {
   THD *thd= rli->info_thd;
-  int ret= -1;
-  ulonglong new_pend_size= rli->mts_pending_jobs_size +
-    ((Log_event*) (job_item->data))->data_written;
+  int ret;
+  ulong ev_size= ((Log_event*) (job_item->data))->data_written;
+  ulonglong new_pend_size;
 
   DBUG_ASSERT(thd == current_thd);
   thd_proc_info(thd, "Feeding an event to a worker thread");
 
   mysql_mutex_lock(&rli->pending_jobs_lock);
-
+  new_pend_size= rli->mts_pending_jobs_size + ev_size;
   // C waits basing on *data* sizes in the queues
   while (new_pend_size > rli->mts_pending_jobs_size_max)
   {
@@ -2440,11 +2445,12 @@ void append_item_to_jobs(slave_job_item 
                              wait_info);
     mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
     thd->exit_cond(old_msg);
-    mysql_mutex_lock(&rli->pending_jobs_lock);
     if (thd->killed)
       return;
-    new_pend_size= rli->mts_pending_jobs_size +
-      ((Log_event*) (job_item->data))->data_written;
+
+    mysql_mutex_lock(&rli->pending_jobs_lock);
+
+    new_pend_size= rli->mts_pending_jobs_size + ev_size;
   }
   rli->pending_jobs++;
   rli->mts_pending_jobs_size= new_pend_size;
@@ -2463,6 +2469,8 @@ void append_item_to_jobs(slave_job_item 
     my_sleep(nap_weight * rli->mts_coordinator_basic_nap);
   }
 
+  ret= -1;
+
   mysql_mutex_lock(&w->jobs_lock);
 
   // possible WQ overfill
@@ -2495,9 +2503,10 @@ void append_item_to_jobs(slave_job_item 
   else
   {
     mysql_mutex_unlock(&w->jobs_lock);
+
     mysql_mutex_lock(&rli->pending_jobs_lock);
     rli->pending_jobs--;                  // roll back of the prev incr
-    rli->mts_pending_jobs_size -= new_pend_size;
+    rli->mts_pending_jobs_size -= ev_size;
     mysql_mutex_unlock(&rli->pending_jobs_lock);
   }
 }
@@ -2651,6 +2660,7 @@ int slave_worker_exec_job(Slave_worker *
   job_item= pop_jobs_item(w, job_item);
   if (thd->killed)
   {
+    // de-queueing and decrement counters is in the caller's exit branch
     error= -1;
     goto err;
   }
@@ -2701,9 +2711,7 @@ int slave_worker_exec_job(Slave_worker *
   {
     DBUG_PRINT("slave_worker_exec_job:", (" commits GAQ index %lu, last committed  %lu", ev->mts_group_cnt, w->last_group_done_index));
 
-    w->slave_worker_ends_group(ev->mts_group_cnt, error); /* last done sets post exec */
-    if (!(ev->get_type_code() == XID_EVENT && w->is_transactional()))
-      w->commit_positions(ev);
+    w->slave_worker_ends_group(ev, error); /* last done sets post exec */
   }
 
   mysql_mutex_lock(&w->jobs_lock);
@@ -2720,6 +2728,7 @@ int slave_worker_exec_job(Slave_worker *
     preserving signatures of existing methods.
     todo: convert update_pos(w->w_rli) -> update_pos(w)
           to remove w_rli w/a
+    TODO: remove ?
   */
   if (!error)
     ev->update_pos(w->w_rli);
@@ -2769,16 +2778,13 @@ int slave_worker_exec_job(Slave_worker *
     mysql_cond_signal(&rli->pending_jobs_cond);
   }
   
-  //DBUG_ASSERT(rli->pending_jobs < rli->slave_pending_jobs_max);
-  // if (rli->pending_jobs == rli->slave_pending_jobs_max - 1 ||
-  //     rli->pending_jobs == 0)
-  //   mysql_cond_signal(&rli->pending_jobs_cond);
-  
   mysql_mutex_unlock(&rli->pending_jobs_lock);
 
   w->stmt_jobs++;
+
 err:
 
+  // TODO!!! ANDREI to RESTORE
   // if (!ev)
   //    delete ev;  // after ev->update_pos() event is garbage
 

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-12-03 16:56:11 +0000
+++ b/sql/rpl_rli.cc	2010-12-04 17:14:50 +0000
@@ -152,7 +152,6 @@ void Relay_log_info::init_workers(ulong 
 {
   uint wi= 0;
 
-  slave_parallel_workers= n;
   /*
     Parallel slave parameters initialization is done regardless
     whether the feature is or going to be active or not.

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-12-03 10:15:45 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-12-04 17:14:50 +0000
@@ -138,7 +138,8 @@ bool Slave_worker::write_info(Rpl_info_h
   */
 
   if (to->prepare_info_for_write() ||
-      to->set_info(curr_group_exec_parts) ||
+      //to->set_info(curr_group_exec_parts) ||
+      to->set_info("") ||
       to->set_info(group_relay_log_name) ||
       to->set_info((ulong)group_relay_log_pos) ||
       to->set_info(group_master_log_name) ||
@@ -474,9 +475,10 @@ Slave_worker *get_least_occupied_worker(
    TODO: reclaim space if the actual size exceeds the limit.
 */
 
-void Slave_worker::slave_worker_ends_group(ulong gaq_idx, int error)
+void Slave_worker::slave_worker_ends_group(Log_event* ev, int error)
 {
   int i;
+  ulong gaq_idx= ev->mts_group_cnt;
 
   if (!error)
   {
@@ -495,12 +497,27 @@ void Slave_worker::slave_worker_ends_gro
                   <= sizeof(group_relay_log_name));
 
       strcpy(group_relay_log_name, ptr_g->group_relay_log_name);
-      delete ptr_g->group_relay_log_name;  // C allocated
-      ptr_g->group_relay_log_name= NULL;   // mark freed
     }
-    last_group_done_index = gaq_idx;
+
+    // GAQ is updated with the checkpoint info
+    
+    // delete ptr_g->group_relay_log_name;  // C allocated
+    // ptr_g->group_relay_log_name= NULL;   // mark freed
+
+
+    // TODO: as it's same as ev->update_pos(w_rli) remove the latter.
+
+    if (!(ev->get_type_code() == XID_EVENT && is_transactional()))
+      commit_positions(ev);
+
+    ptr_g->group_master_log_pos= group_master_log_pos;
+    ptr_g->group_relay_log_pos= group_relay_log_pos;
+
+    last_group_done_index = gaq_idx;   // GAQ index is available to C now
   }
 
+  // cleanup relating to the last executed group regardless of error
+
   for (i= curr_group_exec_parts->dynamic_ids.elements; i > 0; i--)
   {
     db_worker *entry= NULL;
@@ -531,12 +548,6 @@ void Slave_worker::slave_worker_ends_gro
     }
     else
       DBUG_ASSERT(usage_partition != 0);
-    /*
-     TODO:
-       if U == 0 \and count(APH) > max 
-          delete from APH where U = 0;
-          delete entry;
-    */  
 
     mysql_mutex_unlock(&slave_worker_hash_lock);
 
@@ -659,13 +670,14 @@ bool circular_buffer_queue::gt(ulong i, 
    It's compared first against the polled
    to break out of the loop at once if no progress.
 
-
-   The caller is expected to be the checkpoint handler.
+   The caller is supposed to be the checkpoint handler.
 
    A copy of the last discarded item containing
    the refreshed value of the committed low-water-mark is stored
-   into @c lwm member for further caller's processing.
+   into @c lwm container member for further caller's processing.
 
+   @note dyn-allocated members of Slave_job_group such as
+         group_relay_log_name as freed here.
 
    @return number of discarded items
 */
@@ -675,12 +687,15 @@ ulong Slave_committed_queue::move_queue_
   for (i= e; i != a && !empty();)
   {
     Slave_worker *w_i;
-    Slave_job_group g;
+    Slave_job_group *ptr_g;
     ulong l;
-    get_dynamic(&Q, (uchar *) &g, i);
-    if (g.worker_id == (ulong) -1)
+    char grl_name[FN_REFLEN];
+
+    grl_name[0]= 0;
+    ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);
+    if (ptr_g->worker_id == (ulong) -1)
       break; /* the head is not even assigned */
-    get_dynamic(ws, (uchar *) &w_i, g.worker_id);
+    get_dynamic(ws, (uchar *) &w_i, ptr_g->worker_id);
     get_dynamic(&last_done, (uchar *) &l, w_i->id);
 
     DBUG_ASSERT(l <= s);
@@ -691,9 +706,29 @@ ulong Slave_committed_queue::move_queue_
     DBUG_ASSERT(w_i->last_group_done_index >= i ||
                 (((i > a && e > a)  || a == s) && (w_i->last_group_done_index < a)));
 
+    // memorize the last met group_relay_log_name
+    if (ptr_g->group_relay_log_name)
+    {
+      strcpy(grl_name, ptr_g->group_relay_log_name);
+      my_free(ptr_g->group_relay_log_name);
+      ptr_g->group_relay_log_name= NULL;   // mark freed
+    }
+
     if (w_i->last_group_done_index == i || gt(w_i->last_group_done_index, i))
     {
-      ulong ind= de_queue((uchar*) &lwm);
+      Slave_job_group g;
+      ulong ind= de_queue((uchar*) &g);
+
+      // stored the memorized name into result struct
+      if (grl_name[0] != 0)
+        strcpy(lwm.group_relay_log_name, grl_name);
+      else
+        lwm.group_relay_log_name[0]= 0;
+
+      DBUG_ASSERT(!ptr_g->group_relay_log_name);
+
+      g.group_relay_log_name= lwm.group_relay_log_name;
+      lwm= g; // the result struct is done for the current iteration
 
       /* todo/fixme: the least occupied sorting out can be triggered here */
       /* e.g 
@@ -706,7 +741,7 @@ ulong Slave_committed_queue::move_queue_
          }
       */
       DBUG_ASSERT(ind == i);
-      DBUG_ASSERT(g.total_seqno == lwm.total_seqno);
+      DBUG_ASSERT(ptr_g->total_seqno == lwm.total_seqno);
 
       set_dynamic(&last_done, (uchar*) &i, w_i->id);
     }
@@ -751,7 +786,7 @@ int wait_for_workers_to_finish(Relay_log
       thd->exit_cond(proc_info);
       ret++;
 
-      DBUG_ASSERT(entry->usage == 0);
+      DBUG_ASSERT(entry->usage == 0 || thd->killed);
     }
     else
     {

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-12-01 19:15:08 +0000
+++ b/sql/rpl_rli_pdb.h	2010-12-04 17:14:50 +0000
@@ -91,7 +91,9 @@ public:
 
 typedef struct st_slave_job_group
 {
-  my_off_t master_log_pos; 
+  my_off_t master_log_pos;       // B-event log_pos
+  my_off_t group_master_log_pos; // T-event lop_pos filled by W for CheckPoint
+  my_off_t group_relay_log_pos;  // filled by W
 
   /* 
      When RL name changes C allocates and fill in a new name of RL,
@@ -99,11 +101,10 @@ typedef struct st_slave_job_group
      C keeps track of each Worker has been notified on the updating
      to make sure the routine runs once per change.
 
-     W checks the value at commit and memoriezes a not-NULL
-     with prior freeing old one's allocation. The memorized value
-     plays its role at commit until a new has arrived.
+     W checks the value at commit and memoriezes a not-NULL.
+     Freeing unless NULL is left to C at CheckPoint.
   */
-  char     *group_relay_log_name;
+  char     *group_relay_log_name; // The value is last seen relay-log 
   ulong worker_id;
   ulonglong total_seqno;
 } Slave_job_group;
@@ -139,11 +140,14 @@ public:
     my_init_dynamic_array(&last_done, sizeof(s), n, 0);
     for (k= 0; k < n; k++)
       insert_dynamic(&last_done, (uchar*) &s);  // empty for each Worker
+    lwm.group_relay_log_name= (char *) my_malloc(FN_REFLEN + 1, MYF(0));
+    lwm.group_relay_log_name[0]= 0;
   }
 
   ~Slave_committed_queue ()
   { 
     delete_dynamic(&last_done);
+    my_free(lwm.group_relay_log_name);
   }
 
   /* Checkpoint routine refreshes the queue */
@@ -216,7 +220,7 @@ public:
 
   size_t get_number_worker_fields();
 
-  void slave_worker_ends_group(ulong, int);  // CGEP walk through to upd APH
+  void slave_worker_ends_group(Log_event*, int);  // CGEP walk through to upd APH
 
   bool commit_positions(Log_event *evt);
 

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-12-04 15:45:02 +0000
+++ b/sql/rpl_slave.cc	2010-12-04 17:14:50 +0000
@@ -168,7 +168,7 @@ static int terminate_slave_thread(THD *t
                                   bool skip_lock);
 static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
 int slave_worker_exec_job(Slave_worker * w, Relay_log_info *rli);
-bool checkpoint_routine(Relay_log_info *rli);
+bool mts_checkpoint_routine(Relay_log_info *rli);
 
 /*
   Find out which replications threads are running
@@ -3571,6 +3571,8 @@ pthread_handler_t handle_slave_worker(vo
   mysql_mutex_lock(&rli->pending_jobs_lock);
   rli->pending_jobs -= purge_cnt;
   rli->mts_pending_jobs_size -= purge_size;
+  DBUG_ASSERT(rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max);
+
   mysql_mutex_unlock(&rli->pending_jobs_lock);
 
   mysql_mutex_lock(&w->jobs_lock);
@@ -3593,22 +3595,27 @@ err:
   DBUG_RETURN(0); 
 }
 
-bool checkpoint_routine(Relay_log_info *rli)
+/**
+   Processing rli->gaq to find out the low-water-mark coordinates
+   stored into the cental recovery table.
+
+
+   @return FALSE success, TRUE otherwise
+*/
+bool mts_checkpoint_routine(Relay_log_info *rli)
 {
   bool error= FALSE;
+  ulong cnt;
 
   DBUG_ENTER("checkpoint_routine");
 
-  mysql_mutex_lock(&rli->data_lock); 
-
-  uint i;
-  rli->gaq->move_queue_head(&rli->workers);
+  if (!(cnt= rli->gaq->move_queue_head(&rli->workers)))
+    DBUG_RETURN(error);
 
   /* TODO: 
-     the least occupied sorting out needs moving to the actual
-     checkpoint location - next_event()
+     to turn the least occupied selection in terms of jobs pieces
   */
-  for (i= 0; i < rli->workers.elements; i++)
+  for (uint i= 0; i < rli->workers.elements; i++)
   {
     Slave_worker *w_i;
     get_dynamic(&rli->workers, (uchar *) &w_i, i);
@@ -3616,6 +3623,21 @@ bool checkpoint_routine(Relay_log_info *
   };
   sort_dynamic(&rli->least_occupied_workers, (qsort_cmp) ulong_cmp);
 
+  // Coordinator::commit_positions() {
+
+  // Alfranio, rli->gaq->lwm contains all but rli->group_master_log_name
+
+  // group_master_log_name is updated only by Coordinator and it can't change
+  // within checkpoint interval because Coordinator flushes the updated value
+  // at once.
+
+  mysql_mutex_lock(&rli->data_lock); 
+
+  rli->set_group_master_log_pos(rli->gaq->lwm.group_master_log_pos);
+  rli->set_group_relay_log_pos(rli->gaq->lwm.group_relay_log_pos);
+  if (rli->gaq->lwm.group_relay_log_name[0] != 0)
+    rli->set_group_relay_log_name(rli->gaq->lwm.group_relay_log_name);
+
   error= rli->flush_info(TRUE);
 
   mysql_mutex_unlock(&rli->data_lock);
@@ -3766,7 +3788,7 @@ void slave_stop_workers(Relay_log_info *
   int i;
   THD *thd= rli->info_thd;
 
-  if (rli->workers.elements == 0) 
+  if (rli->slave_parallel_workers == 0) 
     return;
   
   for (i= rli->workers.elements - 1; i >= 0; i--)
@@ -3875,7 +3897,7 @@ pthread_handler_t handle_slave_sql(void 
   pthread_detach_this_thread();
 
   /* mts-II: starting the worker pool */
-  if (slave_start_workers(rli, opt_slave_parallel_workers) != 0)
+  if (slave_start_workers(rli, rli->slave_parallel_workers) != 0)
     goto err;
   
   if (init_slave_thread(thd, SLAVE_THD_SQL))
@@ -5321,7 +5343,7 @@ static Log_event* next_event(Relay_log_i
             ulong diff= diff_timespec(rli->curr_clock, rli->last_clock);
             if (diff > period)
             {
-               checkpoint_routine(rli);
+               mts_checkpoint_routine(rli);
                set_timespec_nsec(rli->last_clock, 0);
             }
             set_timespec_nsec(waittime, period);
@@ -5707,6 +5729,12 @@ int start_slave(THD* thd , Master_info* 
       */
       if (thread_mask & SLAVE_SQL)
       {
+        /*
+          To cache the system var value and used it in the following.
+          The system var can change but not the cached.
+        */
+        mi->rli->slave_parallel_workers= opt_slave_parallel_workers;
+
         mysql_mutex_lock(&mi->rli->data_lock);
 
         if (thd->lex->mi.pos)
@@ -5759,6 +5787,13 @@ int start_slave(THD* thd , Master_info* 
             push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
                          ER_MISSING_SKIP_SLAVE,
                          ER(ER_MISSING_SKIP_SLAVE));
+          if (mi->rli->slave_parallel_workers != 0)
+          {
+            mi->rli->slave_parallel_workers= 0;
+            push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
+                         ER_NO_UNTIL_COND_WITH_PARALLEL_SLAVE,
+                         ER(ER_NO_UNTIL_COND_WITH_PARALLEL_SLAVE));
+          }
         }
 
         mysql_mutex_unlock(&mi->rli->data_lock);

=== modified file 'sql/share/errmsg-utf8.txt'
--- a/sql/share/errmsg-utf8.txt	2010-10-25 10:39:01 +0000
+++ b/sql/share/errmsg-utf8.txt	2010-12-04 17:14:50 +0000
@@ -6399,3 +6399,6 @@ ER_RPL_INFO_DATA_TOO_LONG
   eng "Data for column '%s' too long"
 ER_CANT_LOCK_RPL_INFO_TABLE
         eng "You can't use locks with rpl info tables."
+
+ER_NO_UNTIL_COND_WITH_PARALLEL_SLAVE
+  eng "Until condition is not supported in Parallel Slave. Slave is started in the sequential mode."


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101204171450-bvuh8o3qb4h1lkt7.bundle
Thread
bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3229) Andrei Elkin4 Dec