List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:December 10 2010 3:50pm
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3239) WL#5569
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3239 Andrei Elkin	2010-12-10
      wl#5569 MTS
      
      Improving GAQ in a) limit size to be capable to hold items while all WQ:s are full
      b) move_queue_head() contained a flaw to make no progress falsely
      c) never let to enque in GAQ while it's full
     @ sql/log_event.cc
        Fixing impossible gaq_idx == -1. GAQ may not be full at this point.
        The total counter of executed groups starts from 1, that is nothing is done yet when 0.
     @ sql/rpl_rli_pdb.cc
        move_queue_head() contained a flaw to break the progress loop falsely.
        Fixed with comparing the current index with the Worker::last_group_done_index
        instead of this->last_done. The latter changed to become of pure statictics character and
        to contain the total seqno which is guaranteed to grow monotonically by its ulonglong size.
     @ sql/rpl_rli_pdb.h
        changes due to last_done turned into statistics holder.
     @ sql/rpl_slave.cc
        Improving GAQ in limit size to be capable to hold items while all WQ:s are full.
        Wait to release item at checkpoint() when GAQ is full.
     @ sql/sys_vars.cc
        opt_mts_coordinator_basic_nap is set to non-zero 5 msecs default value.

    modified:
      mysql-test/suite/rpl/t/disabled.def
      sql/log_event.cc
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/sys_vars.cc
=== modified file 'mysql-test/suite/rpl/t/disabled.def'
--- a/mysql-test/suite/rpl/t/disabled.def	2010-12-08 00:33:48 +0000
+++ b/mysql-test/suite/rpl/t/disabled.def	2010-12-10 15:50:03 +0000
@@ -16,3 +16,4 @@ rpl_row_event_max_size    : Bug#55675 20
 rpl_delayed_slave         : Bug#57514 2010-11-09 andrei rpl_delayed_slave fails sporadically in pb
 rpl_log_pos               : BUG#55675 2010-09-10 alfranio rpl.rpl_log_pos fails sporadically with error binlog truncated in the middle
 rpl_trigger               : BUG#58258 2010-11-17 VasilDimov Valgrind: possibly lost from ib_bh_create()
+rpl_parallel_conf_limits  : wl#5599   9-12-2010 Andrei Waiting for the recovery wl

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-12-09 17:45:02 +0000
+++ b/sql/log_event.cc	2010-12-10 15:50:03 +0000
@@ -2416,19 +2416,23 @@ Slave_worker *Log_event::get_slave_worke
   // B or a DDL
   if ((is_b_event= starts_group()) || !rli->curr_group_seen_begin)
   {
+    ulong gaq_idx;
+    const_cast<Relay_log_info*>(rli)->mts_total_groups++;
+
     g.master_log_pos= log_pos;
     g.group_master_log_pos= g.group_relay_log_pos= 0;
     g.group_master_log_name= NULL; // todo: remove
     g.group_relay_log_name= NULL;
     g.worker_id= (ulong) -1;
-    g.total_seqno= const_cast<Relay_log_info*>(rli)->mts_total_groups++;
+    g.total_seqno= const_cast<Relay_log_info*>(rli)->mts_total_groups;
     g.checkpoint_log_name= NULL;
     g.checkpoint_log_pos= 0;
     g.checkpoint_seqno= (uint) -1;
 
     // the last occupied GAQ's array index
-    rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
-
+    gaq_idx= rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
+    
+    DBUG_ASSERT(gaq_idx != (ulong) -1);
     DBUG_ASSERT(((Slave_job_group *) 
                  dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))->
                 group_relay_log_name == NULL);

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-12-09 17:45:02 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-12-10 15:50:03 +0000
@@ -700,16 +700,18 @@ bool circular_buffer_queue::gt(ulong i, 
 /**
    The queue is processed from the head item by item
    to purge items representing committed groups.
-   Progress of each Worker is monitored through @c last_done
-   and @c last_group_done_index.
-   It's compared first against the polled
-   to break out of the loop at once if no progress.
+   Progress in GAQ is assessed through comparision of GAQ index value 
+   with Worker's @c last_group_done_index.
+   Purging breaks at a first discovered gap, that is an item
+   that the assinged item->w_id'th Worker has not completed yet.
 
    The caller is supposed to be the checkpoint handler.
 
    A copy of the last discarded item containing
    the refreshed value of the committed low-water-mark is stored
    into @c lwm container member for further caller's processing.
+   @last_done is updated with the latests total_seqno for each Worker
+   that was met during GAQ parse.
 
    @note dyn-allocated members of Slave_job_group such as
          group_relay_log_name as freed here.
@@ -719,27 +721,22 @@ bool circular_buffer_queue::gt(ulong i, 
 ulong Slave_committed_queue::move_queue_head(DYNAMIC_ARRAY *ws)
 {
   ulong i, cnt= 0;
+
   for (i= e; i != a && !empty();)
   {
     Slave_worker *w_i;
-    Slave_job_group *ptr_g;
-    ulong l;
+    Slave_job_group *ptr_g, g;
     char grl_name[FN_REFLEN];
+    ulong ind;
 
     grl_name[0]= 0;
     ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);
     if (ptr_g->worker_id == (ulong) -1)
       break; /* the head is not even assigned */
     get_dynamic(ws, (uchar *) &w_i, ptr_g->worker_id);
-    get_dynamic(&last_done, (uchar *) &l, w_i->id);
-
-    DBUG_ASSERT(l <= s);
-
-    if (l == w_i->last_group_done_index)
-      break; /* no progress case */
-
-    DBUG_ASSERT(w_i->last_group_done_index >= i ||
-                (((i > a && e > a)  || a == s) && (w_i->last_group_done_index < a)));
+    
+    if (gt(i, w_i->last_group_done_index))
+      break; /* gap at i'th */
 
     // memorize the last met group_relay_log_name
     if (ptr_g->group_relay_log_name)
@@ -749,39 +746,39 @@ ulong Slave_committed_queue::move_queue_
       ptr_g->group_relay_log_name= NULL;   // mark freed
     }
 
-    if (w_i->last_group_done_index == i || gt(w_i->last_group_done_index, i))
-    {
-      Slave_job_group g;
-      ulong ind= de_queue((uchar*) &g);
+    ind= de_queue((uchar*) &g);
+        
+    // stored the memorized name into result struct
+    if (grl_name[0] != 0)
+      strcpy(lwm.group_relay_log_name, grl_name);
+    else
+      lwm.group_relay_log_name[0]= 0;
+
+    DBUG_ASSERT(!ptr_g->group_relay_log_name);
 
-      // stored the memorized name into result struct
-      if (grl_name[0] != 0)
-        strcpy(lwm.group_relay_log_name, grl_name);
-      else
-        lwm.group_relay_log_name[0]= 0;
-
-      DBUG_ASSERT(!ptr_g->group_relay_log_name);
-
-      g.group_relay_log_name= lwm.group_relay_log_name;
-      lwm= g; // the result struct is done for the current iteration
-
-      /* todo/fixme: the least occupied sorting out can be triggered here */
-      /* e.g 
-         set_dynamic(&w_id->c_rli->least_occupied_worker, &w_i->Q.len, w_i->id);
-         sort_dynamic(&w_id->c_rli->least_occupied_worker, (qsort_cmp) ulong_cmp);
-                  
-         int ulong_cmp(ulong *id1, ulong *id2)
-         {
-            return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0);
-         }
-      */
-      DBUG_ASSERT(ind == i);
-      DBUG_ASSERT(ptr_g->total_seqno == lwm.total_seqno);
+    g.group_relay_log_name= lwm.group_relay_log_name;
+    lwm= g; // the result struct is done for the current iteration
 
-      set_dynamic(&last_done, (uchar*) &i, w_i->id);
+    /* todo/fixme: the least occupied sorting out can be triggered here */
+    /* e.g 
+       set_dynamic(&w_id->c_rli->least_occupied_worker, &w_i->Q.len, w_i->id);
+       sort_dynamic(&w_id->c_rli->least_occupied_worker, (qsort_cmp) ulong_cmp);
+       int ulong_cmp(ulong *id1, ulong *id2)
+       {
+       return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0);
+       }
+    */
+    DBUG_ASSERT(ind == i);
+    DBUG_ASSERT(ptr_g->total_seqno == lwm.total_seqno);
+#ifndef DBUG_OFF
+    {
+      ulonglong l;
+      get_dynamic(&last_done, (uchar *) &l, w_i->id);
+      DBUG_ASSERT(l < ptr_g->total_seqno); // there must be some progress
     }
-    else
-      break;
+#endif
+    set_dynamic(&last_done, &ptr_g->total_seqno, w_i->id);
+
     cnt++;
     i= (i + 1) % s;
   }

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-12-09 17:46:27 +0000
+++ b/sql/rpl_rli_pdb.h	2010-12-10 15:50:03 +0000
@@ -188,9 +188,10 @@ public:
     : circular_buffer_queue(el_size, max, inc)
   {
     uint k;
-    my_init_dynamic_array(&last_done, sizeof(s), n, 0);
+    ulonglong l= 0;
+    my_init_dynamic_array(&last_done, sizeof(lwm.total_seqno), n, 0);
     for (k= 0; k < n; k++)
-      insert_dynamic(&last_done, (uchar*) &s);  // empty for each Worker
+      insert_dynamic(&last_done, (uchar*) &l);  // empty for each Worker
     lwm.group_relay_log_name= (char *) my_malloc(FN_REFLEN + 1, MYF(0));
     lwm.group_relay_log_name[0]= 0;
   }

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-12-09 17:46:27 +0000
+++ b/sql/rpl_slave.cc	2010-12-10 15:50:03 +0000
@@ -3850,7 +3850,7 @@ bool mts_checkpoint_routine(Relay_log_in
 
   set_timespec_nsec(curr_clock, 0);
   ulong diff= diff_timespec(curr_clock, rli->last_clock);
-  if (diff < period)
+  if (diff < period && !rli->gaq->full())
   {
     /*
       We do not need to execute the checkpoint now because
@@ -3859,9 +3859,15 @@ bool mts_checkpoint_routine(Relay_log_in
     DBUG_RETURN(FALSE);
   }
       
-  if (!(cnt= rli->gaq->move_queue_head(&rli->workers)))
+  do
+  {
+    cnt= rli->gaq->move_queue_head(&rli->workers);
+  } while (cnt == 0 && rli->gaq->full() &&
+           (my_sleep(rli->mts_coordinator_basic_nap), 1));
+  if (cnt == 0)
     goto end;
 
+
   /* TODO: 
      to turn the least occupied selection in terms of jobs pieces
   */
@@ -3955,7 +3961,7 @@ int slave_start_single_worker(Relay_log_
   w->usage_partition= 0;
   w->last_group_done_index= rli->gaq->s; // out of range
 
-  w->jobs.s= rli->mts_slave_worker_queue_len_max + 1;
+  w->jobs.s= rli->mts_slave_worker_queue_len_max;
   my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0); // todo: implement increment e.g  n * 10;
   for (k= 0; k < w->jobs.s; k++)
     insert_dynamic(&w->jobs.Q, (uchar*) &empty);
@@ -4015,16 +4021,17 @@ int slave_start_workers(Relay_log_info *
 
   // GAQ  queue holds seqno:s of scheduled groups. C polls workers in 
   //      @c lwm_checkpoint_period to update GAQ (see @c next_event())
-  // The length of GAQ is derived from @c mts_slave_worker_queue_len_max to guarantee
-  // each assigned job being sent to a WQ will be represented by an item in GAQ.
-  // ::mts_slave_worker_queue_len_max is the worst case when all jobs contain
-  // one event and map to one worker.
-  rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(),
-                                      sizeof(Slave_job_group),
-                                      ::opt_mts_slave_worker_queue_len_max, n);
+  // The length of GAQ is derived from @c opt_mts_slave_worker_queue_len_max to guarantee
+  // each assigned job being sent to a WQ will find room in GAQ.
+  // opt_mts_slave_worker_queue_len_max * num-of-W:s is the max length case 
+  // all jobs contain one event.
 
   // size of WQ stays fixed in one slave session
   rli->mts_slave_worker_queue_len_max= ::opt_mts_slave_worker_queue_len_max;
+  rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(),
+                                      sizeof(Slave_job_group),
+                                      rli->slave_parallel_workers *
+                                      rli->mts_slave_worker_queue_len_max, n);
   rli->mts_pending_jobs_size= 0;
   rli->mts_pending_jobs_size_max= ::opt_mts_pending_jobs_size_max;
   rli->mts_wqs_underrun_w_id= (ulong) -1;

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2010-12-09 13:23:19 +0000
+++ b/sql/sys_vars.cc	2010-12-10 15:50:03 +0000
@@ -3170,7 +3170,7 @@ static Sys_var_ulong Sys_mts_coordinator
        "Time in msec to sleep by MTS Coordinator to avoid the Worker queues "
        "room overrun",
        GLOBAL_VAR(opt_mts_coordinator_basic_nap), CMD_LINE(REQUIRED_ARG),
-       VALID_RANGE(0, ULONG_MAX), DEFAULT(0), BLOCK_SIZE(1));
+       VALID_RANGE(0, ULONG_MAX), DEFAULT(5), BLOCK_SIZE(1));
 static Sys_var_ulong Sys_mts_worker_underrun_level(
        "opt_mts_worker_underrun_level",
        "percent of Worker queue size at which Worker is considered to become "


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101210155003-pknsk1xyuov5y54c.bundle
Thread
bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3239) WL#5569Andrei Elkin10 Dec