List: Commits
From: Andrei Elkin  Date: December 10 2010 4:26pm
Subject: bzr push into mysql-next-mr-wl5569 branch (andrei.elkin:3240)
 3240 Andrei Elkin	2010-12-10 [merge]
      merge from wl5569 repo to a local branch

    modified:
      mysql-test/r/mysqld--help-notwin.result
      mysql-test/r/mysqld--help-win.result
      mysql-test/suite/rpl/t/rpl_parallel_start_stop.test
      sql/dynamic_ids.cc
      sql/dynamic_ids.h
      sql/mysqld.cc
      sql/mysqld.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/sys_vars.cc
=== modified file 'mysql-test/suite/rpl/t/disabled.def'
--- a/mysql-test/suite/rpl/t/disabled.def	2010-12-08 00:33:48 +0000
+++ b/mysql-test/suite/rpl/t/disabled.def	2010-12-10 15:50:03 +0000
@@ -16,3 +16,4 @@ rpl_row_event_max_size    : Bug#55675 20
 rpl_delayed_slave         : Bug#57514 2010-11-09 andrei rpl_delayed_slave fails sporadically in pb
 rpl_log_pos               : BUG#55675 2010-09-10 alfranio rpl.rpl_log_pos fails sporadically with error binlog truncated in the middle
 rpl_trigger               : BUG#58258 2010-11-17 VasilDimov Valgrind: possibly lost from ib_bh_create()
+rpl_parallel_conf_limits  : wl#5599   9-12-2010 Andrei Waiting for the recovery wl

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-12-08 00:33:48 +0000
+++ b/sql/log_event.cc	2010-12-10 15:50:03 +0000
@@ -2416,15 +2416,23 @@ Slave_worker *Log_event::get_slave_worke
   // B or a DDL
   if ((is_b_event= starts_group()) || !rli->curr_group_seen_begin)
   {
+    ulong gaq_idx;
+    const_cast<Relay_log_info*>(rli)->mts_total_groups++;
+
     g.master_log_pos= log_pos;
     g.group_master_log_pos= g.group_relay_log_pos= 0;
+    g.group_master_log_name= NULL; // todo: remove
     g.group_relay_log_name= NULL;
     g.worker_id= (ulong) -1;
-    g.total_seqno= const_cast<Relay_log_info*>(rli)->mts_total_groups++;
+    g.total_seqno= const_cast<Relay_log_info*>(rli)->mts_total_groups;
+    g.checkpoint_log_name= NULL;
+    g.checkpoint_log_pos= 0;
+    g.checkpoint_seqno= (uint) -1;
 
     // the last occupied GAQ's array index
-    rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
-
+    gaq_idx= rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
+    
+    DBUG_ASSERT(gaq_idx != (ulong) -1);
     DBUG_ASSERT(((Slave_job_group *) 
                  dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))->
                 group_relay_log_name == NULL);
@@ -2490,6 +2498,10 @@ Slave_worker *Log_event::get_slave_worke
   {
     uint i;
     mts_group_cnt= rli->gaq->assigned_group_index;
+    Slave_job_group *ptr_g=
+      (Slave_job_group *)
+      dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
+
 
     // TODO: throw an error when relay-log reading starts from inside of a group!!
 
@@ -2501,11 +2513,6 @@ Slave_worker *Log_event::get_slave_worke
         Now group terminating event initiates the new name
         delivery through the current group relaylog slot in GAQ.
       */
-
-      Slave_job_group *ptr_g=
-        (Slave_job_group *)
-        dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
-
       DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
 
       ptr_g->group_relay_log_name= (char *)
@@ -2519,6 +2526,20 @@ Slave_worker *Log_event::get_slave_worke
       worker->relay_log_change_notified= TRUE;
     }
 
+    if (!worker->checkpoint_notified)
+    {
+      // the Worker deallocates this copy
+      ptr_g->checkpoint_log_name= (char *)
+        my_malloc(strlen(const_cast<Relay_log_info*>(rli)->
+                         get_group_master_log_name()) + 1, MYF(MY_WME));
+      strcpy(ptr_g->checkpoint_log_name,
+             const_cast<Relay_log_info*>(rli)->get_group_master_log_name());
+      ptr_g->checkpoint_log_pos= const_cast<Relay_log_info*>(rli)->get_group_master_log_pos();
+      worker->checkpoint_notified= TRUE;
+    }
+    ptr_g->checkpoint_seqno= rli->checkpoint_seqno;
+    const_cast<Relay_log_info*>(rli)->checkpoint_seqno++;
+
     DBUG_ASSERT(worker == rli->last_assigned_worker);
 
     if (!worker)
@@ -6641,7 +6662,11 @@ int Xid_log_event::do_apply_event(Relay_
   }
   else if (is_trans_repo && is_parallel)
   {
-    if ((error= w->commit_positions(this)))
+    ulong gaq_idx= mts_group_cnt;
+    Slave_job_group *ptr_g=
+      (Slave_job_group *) dynamic_array_ptr(&w->c_rli->gaq->Q, gaq_idx);
+
+    if ((error= w->commit_positions(this, ptr_g)))
       goto err;
   }
 

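A minimal stand-alone sketch of the checkpoint hand-off introduced above, not part of the patch or the attached bundle: Coordinator heap-allocates a copy of the current master log name into the group descriptor once per Worker per checkpoint, and the Worker consumes it in commit_positions(), copying the coordinates into its own buffers and freeing the heap copy. The types and helper names below (Group, Worker, notify_checkpoint, consume_checkpoint) are hypothetical stand-ins for Slave_job_group and Slave_worker, not server code.

// Stand-alone sketch only; shows the one-shot ownership hand-off of
// checkpoint coordinates between Coordinator and a Worker.
#include <cstdio>
#include <cstdlib>
#include <cstring>

struct Group {                            // stands in for Slave_job_group
  char *checkpoint_log_name;              // heap copy, owned until consumed
  unsigned long long checkpoint_log_pos;
};

struct Worker {                           // stands in for Slave_worker
  bool checkpoint_notified;
  char checkpoint_log_name[512];
  unsigned long long checkpoint_log_pos;
};

// Coordinator side: runs at most once per Worker after each checkpoint.
void notify_checkpoint(Group *g, Worker *w,
                       const char *master_log_name,
                       unsigned long long master_log_pos)
{
  if (!w->checkpoint_notified)
  {
    g->checkpoint_log_name= (char *) malloc(strlen(master_log_name) + 1);
    strcpy(g->checkpoint_log_name, master_log_name);  // Worker deallocates
    g->checkpoint_log_pos= master_log_pos;
    w->checkpoint_notified= true;
  }
}

// Worker side: the analogue of the new branch in commit_positions().
void consume_checkpoint(Group *g, Worker *w)
{
  if (g->checkpoint_log_name != NULL)
  {
    snprintf(w->checkpoint_log_name, sizeof(w->checkpoint_log_name), "%s",
             g->checkpoint_log_name);
    w->checkpoint_log_pos= g->checkpoint_log_pos;
    free(g->checkpoint_log_name);                     // consume exactly once
    g->checkpoint_log_name= NULL;
  }
}

int main()
{
  Group g= { NULL, 0 };
  Worker w= { false, "", 0 };
  notify_checkpoint(&g, &w, "master-bin.000007", 4711);
  consume_checkpoint(&g, &w);
  printf("worker checkpoint: %s:%llu\n", w.checkpoint_log_name,
         w.checkpoint_log_pos);
  return 0;
}
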
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-12-10 12:10:20 +0000
+++ b/sql/rpl_rli.cc	2010-12-10 16:25:27 +0000
@@ -89,52 +89,6 @@ Relay_log_info::Relay_log_info(bool is_s
   mysql_cond_init(key_checkpoint_stop_cond, &checkpoint_stop_cond, NULL);
   relay_log.init_pthread_objects();
 
-#if 0
-
- /*
-   Parallel slave parameters initialization is done regardless
-   whether the feature is or going to be active or not.
- */
-  trans_jobs= stmt_jobs= pending_jobs= wait_jobs= 0;
-  /*
-     parallel slave parameter to pospone Crdr reading when the number
-     of non-processed yet jobs becomes bigger than the limit's value
-     MHS_todo: consider a memory-size based param
-  */
-  mts_slave_worker_queue_len_max= ::opt_mts_slave_worker_queue_len_max;
-
-  //
-  // TODO -- ANDREI --- You need to take care of possible failures related to
-  //                    allocation.
-  //
-  uint wi= 0;
-  key_mutex_slave_parallel_worker= new PSI_mutex_key[slave_parallel_workers];
-  key_cond_slave_parallel_worker= new PSI_cond_key[slave_parallel_workers];
-  worker_mutexes= new PSI_mutex_info[slave_parallel_workers];
-  worker_conds= new PSI_cond_info[slave_parallel_workers];
-  for (wi= 0; wi < slave_parallel_workers; wi++)
-  {
-     worker_mutexes[wi].m_key= (PSI_mutex_key *) &(key_mutex_slave_parallel_worker[wi]);
-     worker_mutexes[wi].m_name= "Slave_worker::jobs_lock";
-     worker_mutexes[wi].m_flags= 0;
-     worker_conds[wi].m_key= (PSI_cond_key *) &(key_cond_slave_parallel_worker[wi]);
-     worker_conds[wi].m_name= "Slave_worker::jobs_cond";
-     worker_conds[wi].m_flags= 0;
-  }
-  if (PSI_server)
-  {
-    PSI_server->register_mutex("worker", worker_mutexes,
-                               slave_parallel_workers);
-    PSI_server->register_cond("worker", worker_conds,
-                               slave_parallel_workers);
-  }
-  mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
-                   MY_MUTEX_INIT_FAST);
-  mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond, NULL);
-  my_init_dynamic_array(&workers, sizeof(Slave_worker *), slave_parallel_workers, 4);
-
-#endif
-
   DBUG_VOID_RETURN;
 }
 
@@ -200,16 +154,6 @@ Relay_log_info::~Relay_log_info()
   mysql_cond_destroy(&log_space_cond);
   relay_log.cleanup();
 
-#if 0
-
-  mysql_mutex_destroy(&pending_jobs_lock);
-  mysql_cond_destroy(&pending_jobs_cond);
-
-  if (!this_worker)
-    delete_dynamic(&workers);
-
-#endif
-
   DBUG_VOID_RETURN;
 }
 
@@ -234,6 +178,27 @@ void Relay_log_info::reset_notified_rela
 }
 
 /**
+   The method is called in mts_checkpoint_routine()
+   to mark each Worker as required to adapt to a new checkpoint interval
+   whose coordinates are passed to it through a GAQ index.
+
+   The Worker notices the new checkpoint value at group commit time,
+   resets its current bitmap and sets ON the bit whose number C put into
+   the GAQ element, marking the first group committed after the new checkpoint.
+*/
+void Relay_log_info::reset_notified_checkpoint()
+{
+  if (!is_parallel_exec())
+    return;
+  for (uint i= 0; i < workers.elements; i++)
+  {
+    Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
+    w->checkpoint_notified= FALSE;
+  }
+  checkpoint_seqno= 0;
+}
+
+/**
    The method can be run both by C having the Main (coord) rli context and
    by W having both the main and the Local (worker) rli context.
    Decision matrix:
@@ -1106,7 +1071,16 @@ void Relay_log_info::stmt_done(my_off_t 
     /* Alfranio needs to update the coordinator and workers. */
     
     if ((w= get_current_worker()) == NULL)
+    {
       flush_info(is_transactional() ? TRUE : FALSE);
+
+      /* 
+         The central recovery commit, run in sequential mode, forces
+         notification of the de facto new checkpoint.
+      */
+      if (is_parallel_exec())
+        reset_notified_checkpoint();
+    }
   }
 }
 

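For orientation, a tiny stand-alone simulation of the checkpoint sequence numbering added in this file, assuming only what the hunks above state: each scheduled group records its offset from the latest checkpoint, and reset_notified_checkpoint() rewinds that counter and clears every Worker's notification flag. The Coordinator class and its methods below are hypothetical stand-ins, not server code.

// Stand-alone sketch only; mirrors the intended behaviour of
// rli->checkpoint_seqno and reset_notified_checkpoint().
#include <cstdio>
#include <vector>

struct Coordinator {
  unsigned checkpoint_seqno;               // groups assigned since the last CP
  std::vector<bool> checkpoint_notified;   // one flag per Worker

  explicit Coordinator(size_t n_workers)
    : checkpoint_seqno(0), checkpoint_notified(n_workers, false) {}

  // Each scheduled group is stamped with its offset from the current CP,
  // as done for ptr_g->checkpoint_seqno in Log_event::get_slave_worker().
  unsigned schedule_group() { return checkpoint_seqno++; }

  // Analogue of Relay_log_info::reset_notified_checkpoint(): force the
  // Workers to re-read checkpoint coordinates and restart the numbering.
  void checkpoint()
  {
    for (size_t i= 0; i < checkpoint_notified.size(); i++)
      checkpoint_notified[i]= false;
    checkpoint_seqno= 0;
  }
};

int main()
{
  Coordinator c(4);
  for (int i= 0; i < 3; i++)
    printf("before CP: group seqno %u\n", c.schedule_group()); // 0, 1, 2
  c.checkpoint();
  printf("after CP:  group seqno %u\n", c.schedule_group());   // restarts at 0
  return 0;
}
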
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-12-10 12:10:20 +0000
+++ b/sql/rpl_rli.h	2010-12-10 16:25:27 +0000
@@ -470,6 +470,7 @@ public:
      a new partition. Is updated at checkpoint commit to the main RLI.
   */
   DYNAMIC_ARRAY least_occupied_workers;
+  uint checkpoint_seqno;  // counter of groups executed after the most recent CP
 
   /* most of allocation in the coordinator rli is there */
   void init_workers(ulong);
@@ -489,9 +490,21 @@ public:
     return ret;
   }
 
+  /**
+     While a group is being executed by a Worker the relay log can change.
+     Coordinator notifies Workers about this event. A Worker is supposed
+     to commit to the recovery table with the new info.
+  */
   void reset_notified_relay_log_change();
 
   /**
+     While groups are executed by Workers, Coordinator periodically commits
+     a checkpoint and notifies Workers about it. Coordinator and Workers
+     maintain a bitmap of executed groups that is reset with each new checkpoint.
+  */
+  void reset_notified_checkpoint();
+
+  /**
     Helper function to do after statement completion.
 
     This function is called from an event to complete the group by

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-12-08 13:59:07 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-12-10 15:50:03 +0000
@@ -20,10 +20,11 @@ const char *info_slave_worker_fields []=
 
 Slave_worker::Slave_worker(const char* type, const char* pfs)
   : Rpl_info_worker(type, pfs), group_relay_log_pos(0),
-  group_master_log_pos(0)
+  group_master_log_pos(0), checkpoint_log_pos(0)
 {
   group_relay_log_name[0]= 0;
   group_master_log_name[0]= 0;
+  checkpoint_log_name[0]= 0;
   curr_group_exec_parts= new Database_ids(NAME_LEN);
 }
 
@@ -157,12 +158,24 @@ size_t Slave_worker::get_number_worker_f
   return sizeof(info_slave_worker_fields)/sizeof(info_slave_worker_fields[0]);
 }
 
-bool Slave_worker::commit_positions(Log_event *ev)
+bool Slave_worker::commit_positions(Log_event *ev, Slave_job_group* ptr_g)
 {
   DBUG_ENTER("Slave_worker::checkpoint_positions");
 
   bool error= FALSE;
 
+  if (ptr_g->checkpoint_log_name != NULL)
+  {
+    strmake(checkpoint_log_name, ptr_g->checkpoint_log_name,
+            sizeof(checkpoint_log_name) - 1);
+    checkpoint_log_pos= ptr_g->checkpoint_log_pos;
+
+    my_free(ptr_g->checkpoint_log_name);
+    ptr_g->checkpoint_log_name= NULL;
+  }
+      
+  // TODO: update the group bitmap ptr_g->checkpoint_seqno 'th bit
+
   group_relay_log_pos= ev->future_event_relay_log_pos;
   group_master_log_pos= ev->log_pos;
   strmake(group_master_log_name, c_rli->get_group_master_log_name(),
@@ -502,16 +515,10 @@ void Slave_worker::slave_worker_ends_gro
       strcpy(group_relay_log_name, ptr_g->group_relay_log_name);
     }
 
-    // GAQ is updated with the checkpoint info
-    
-    // delete ptr_g->group_relay_log_name;  // C allocated
-    // ptr_g->group_relay_log_name= NULL;   // mark freed
-
-
-    // TODO: as it's same as ev->update_pos(w_rli) remove the latter.
-
     if (!(ev->get_type_code() == XID_EVENT && is_transactional()))
-      commit_positions(ev);
+    {
+      commit_positions(ev, ptr_g);
+    }
 
     ptr_g->group_master_log_pos= group_master_log_pos;
     ptr_g->group_relay_log_pos= group_relay_log_pos;
@@ -536,7 +543,7 @@ void Slave_worker::slave_worker_ends_gro
       my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
                                       (uchar*) key + 1, key[0]);
 
-    DBUG_ASSERT(entry && entry->usage != 0);
+    DBUG_ASSERT(entry && entry->usage != 0); // was used to break
     DBUG_ASSERT(strlen(key + 1) == (uchar) key[0]);
 
     entry->usage--;
@@ -693,16 +700,18 @@ bool circular_buffer_queue::gt(ulong i, 
 /**
    The queue is processed from the head item by item
    to purge items representing committed groups.
-   Progress of each Worker is monitored through @c last_done
-   and @c last_group_done_index.
-   It's compared first against the polled
-   to break out of the loop at once if no progress.
+   Progress in GAQ is assessed through comparison of the GAQ index value
+   with the Worker's @c last_group_done_index.
+   Purging breaks at the first discovered gap, that is, an item
+   that the assigned item->w_id'th Worker has not completed yet.
 
    The caller is supposed to be the checkpoint handler.
 
    A copy of the last discarded item containing
    the refreshed value of the committed low-water-mark is stored
    into @c lwm container member for further caller's processing.
+   @c last_done is updated with the latest total_seqno of each Worker
+   that was met during the GAQ scan.
 
    @note dyn-allocated members of Slave_job_group such as
          group_relay_log_name as freed here.
@@ -712,27 +721,22 @@ bool circular_buffer_queue::gt(ulong i, 
 ulong Slave_committed_queue::move_queue_head(DYNAMIC_ARRAY *ws)
 {
   ulong i, cnt= 0;
+
   for (i= e; i != a && !empty();)
   {
     Slave_worker *w_i;
-    Slave_job_group *ptr_g;
-    ulong l;
+    Slave_job_group *ptr_g, g;
     char grl_name[FN_REFLEN];
+    ulong ind;
 
     grl_name[0]= 0;
     ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);
     if (ptr_g->worker_id == (ulong) -1)
       break; /* the head is not even assigned */
     get_dynamic(ws, (uchar *) &w_i, ptr_g->worker_id);
-    get_dynamic(&last_done, (uchar *) &l, w_i->id);
-
-    DBUG_ASSERT(l <= s);
-
-    if (l == w_i->last_group_done_index)
-      break; /* no progress case */
-
-    DBUG_ASSERT(w_i->last_group_done_index >= i ||
-                (((i > a && e > a)  || a == s) && (w_i->last_group_done_index < a)));
+    
+    if (gt(i, w_i->last_group_done_index))
+      break; /* gap at i'th */
 
     // memorize the last met group_relay_log_name
     if (ptr_g->group_relay_log_name)
@@ -742,39 +746,39 @@ ulong Slave_committed_queue::move_queue_
       ptr_g->group_relay_log_name= NULL;   // mark freed
     }
 
-    if (w_i->last_group_done_index == i || gt(w_i->last_group_done_index, i))
-    {
-      Slave_job_group g;
-      ulong ind= de_queue((uchar*) &g);
+    ind= de_queue((uchar*) &g);
+        
+    // store the memorized name into the result struct
+    if (grl_name[0] != 0)
+      strcpy(lwm.group_relay_log_name, grl_name);
+    else
+      lwm.group_relay_log_name[0]= 0;
+
+    DBUG_ASSERT(!ptr_g->group_relay_log_name);
 
-      // stored the memorized name into result struct
-      if (grl_name[0] != 0)
-        strcpy(lwm.group_relay_log_name, grl_name);
-      else
-        lwm.group_relay_log_name[0]= 0;
-
-      DBUG_ASSERT(!ptr_g->group_relay_log_name);
-
-      g.group_relay_log_name= lwm.group_relay_log_name;
-      lwm= g; // the result struct is done for the current iteration
-
-      /* todo/fixme: the least occupied sorting out can be triggered here */
-      /* e.g 
-         set_dynamic(&w_id->c_rli->least_occupied_worker, &w_i->Q.len, w_i->id);
-         sort_dynamic(&w_id->c_rli->least_occupied_worker, (qsort_cmp) ulong_cmp);
-                  
-         int ulong_cmp(ulong *id1, ulong *id2)
-         {
-            return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0);
-         }
-      */
-      DBUG_ASSERT(ind == i);
-      DBUG_ASSERT(ptr_g->total_seqno == lwm.total_seqno);
+    g.group_relay_log_name= lwm.group_relay_log_name;
+    lwm= g; // the result struct is done for the current iteration
 
-      set_dynamic(&last_done, (uchar*) &i, w_i->id);
+    /* todo/fixme: the least occupied sorting out can be triggered here */
+    /* e.g 
+       set_dynamic(&w_id->c_rli->least_occupied_worker, &w_i->Q.len, w_i->id);
+       sort_dynamic(&w_id->c_rli->least_occupied_worker, (qsort_cmp) ulong_cmp);
+       int ulong_cmp(ulong *id1, ulong *id2)
+       {
+       return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0);
+       }
+    */
+    DBUG_ASSERT(ind == i);
+    DBUG_ASSERT(ptr_g->total_seqno == lwm.total_seqno);
+#ifndef DBUG_OFF
+    {
+      ulonglong l;
+      get_dynamic(&last_done, (uchar *) &l, w_i->id);
+      DBUG_ASSERT(l < ptr_g->total_seqno); // there must be some progress
     }
-    else
-      break;
+#endif
+    set_dynamic(&last_done, &ptr_g->total_seqno, w_i->id);
+
     cnt++;
     i= (i + 1) % s;
   }

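The purge rule documented in the rewritten move_queue_head() comment can be reduced to the following stand-alone sketch, greatly simplified (no circular-buffer arithmetic, no Worker lookup, no last_done bookkeeping) and with hypothetical names: scan the assigned-group queue from the head, discard committed groups, stop at the first gap, and keep the last discarded descriptor as the low-water mark.

// Stand-alone sketch only; not server code.
#include <cstdio>
#include <deque>

struct GroupDesc {
  unsigned long long total_seqno;
  bool committed;      // in the server this is derived from the Worker's progress
};

// Returns the number of purged items and leaves the low-water mark in *lwm.
size_t move_queue_head(std::deque<GroupDesc> &gaq, GroupDesc *lwm)
{
  size_t cnt= 0;
  while (!gaq.empty() && gaq.front().committed)   // stop at the first gap
  {
    *lwm= gaq.front();      // refreshed committed low-water mark
    gaq.pop_front();
    cnt++;
  }
  return cnt;
}

int main()
{
  std::deque<GroupDesc> gaq= { {1, true}, {2, true}, {3, false}, {4, true} };
  GroupDesc lwm= { 0, false };
  size_t purged= move_queue_head(gaq, &lwm);
  // Only seqno 1 and 2 are purged; 4 stays behind the gap at 3.
  printf("purged=%zu lwm.seqno=%llu head.seqno=%llu\n",
         purged, lwm.total_seqno, gaq.front().total_seqno);
  return 0;
}
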
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-12-10 12:10:20 +0000
+++ b/sql/rpl_rli_pdb.h	2010-12-10 16:25:27 +0000
@@ -95,7 +95,7 @@ public:
 
 typedef struct st_slave_job_group
 {
-  char     *group_master_log_name; // This is used upon recovery.
+  char     *group_master_log_name; // (actually redundant)
   Dynamic_ids *db_ids;             // This is used upon recovery.
 
   my_off_t master_log_pos;       // B-event log_pos
@@ -114,6 +114,11 @@ typedef struct st_slave_job_group
   char     *group_relay_log_name; // The value is last seen relay-log 
   ulong worker_id;
   ulonglong total_seqno;
+
+  /* checkpoint coordinates are reset by CP and rotations */
+  uint  checkpoint_seqno;
+  my_off_t checkpoint_log_pos; // T-event log_pos filled by W for CheckPoint
+  char*    checkpoint_log_name;
 } Slave_job_group;
 
 #define get_job(from, to) \
@@ -155,9 +160,10 @@ public:
     : circular_buffer_queue(el_size, max, inc)
   {
     uint k;
-    my_init_dynamic_array(&last_done, sizeof(s), n, 0);
+    ulonglong l= 0;
+    my_init_dynamic_array(&last_done, sizeof(lwm.total_seqno), n, 0);
     for (k= 0; k < n; k++)
-      insert_dynamic(&last_done, (uchar*) &s);  // empty for each Worker
+      insert_dynamic(&last_done, (uchar*) &l);  // empty for each Worker
     lwm.group_relay_log_name= (char *) my_malloc(FN_REFLEN + 1, MYF(0));
     lwm.group_relay_log_name[0]= 0;
   }
@@ -222,6 +228,7 @@ public:
   volatile int curr_jobs; // the current assignments
   ulong usage_partition; // number of different partitions handled by this worker
   volatile bool relay_log_change_notified; // Coord sets and resets, W can read
+  volatile bool checkpoint_notified; // Coord sets and resets, W can read
   bool wq_overrun_set;  // W monitors its queue usage to incr/decr rli->mts_wqs_overrun
   /*
     We need to make this a dynamic field. /Alfranio
@@ -231,6 +238,8 @@ public:
   ulonglong group_relay_log_pos;
   char group_master_log_name[FN_REFLEN];
   ulonglong group_master_log_pos;
+  char checkpoint_log_name[FN_REFLEN];
+  ulonglong checkpoint_log_pos;
 
   int init_info();
   void end_info();
@@ -240,7 +249,7 @@ public:
 
   void slave_worker_ends_group(Log_event*, int);  // CGEP walk through to upd APH
 
-  bool commit_positions(Log_event *evt);
+  bool commit_positions(Log_event *evt, Slave_job_group *ptr_g);
 
 private:
   bool read_info(Rpl_info_handler *from);

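A note on the last_done change in the Slave_committed_queue constructor above: each per-Worker slot now stores the ulonglong total_seqno of the last group purged on that Worker's behalf (starting at 0) instead of a GAQ index, which is what makes the "there must be some progress" assertion in move_queue_head() possible. A stand-alone sketch under that assumption, with std::vector replacing the DYNAMIC_ARRAY and hypothetical names:

// Stand-alone sketch only; not server code.
#include <cassert>
#include <cstddef>
#include <vector>

struct CommittedQueue {
  std::vector<unsigned long long> last_done;   // one slot per Worker

  explicit CommittedQueue(size_t n_workers) : last_done(n_workers, 0) {}

  // Called while purging the GAQ head: the group with 'seqno', executed by
  // Worker 'w_id', has been discarded, so progress must be monotonic.
  void record_purged(size_t w_id, unsigned long long seqno)
  {
    assert(last_done[w_id] < seqno);           // there must be some progress
    last_done[w_id]= seqno;
  }
};

int main()
{
  CommittedQueue q(2);
  q.record_purged(0, 3);   // Worker 0 advanced to total_seqno 3
  q.record_purged(1, 5);
  return 0;
}
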
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-12-10 12:10:20 +0000
+++ b/sql/rpl_slave.cc	2010-12-10 16:25:27 +0000
@@ -3751,6 +3751,8 @@ pthread_handler_t handle_slave_worker(vo
     mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
     rli->info_thd->awake(THD::KILL_QUERY);          // notify Crdn
     mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
+    // Todo: add necessary stuff to clean up after Q-log-event, a Q trans
+    rli->cleanup_context(thd, error);
   }
 
   mysql_mutex_lock(&w->jobs_lock);
@@ -3995,7 +3997,7 @@ bool mts_checkpoint_routine(Relay_log_in
 
   set_timespec_nsec(curr_clock, 0);
   ulong diff= diff_timespec(curr_clock, rli->last_clock);
-  if (diff < period)
+  if (diff < period && !rli->gaq->full())
   {
     /*
       We do not need to execute the checkpoint now because
@@ -4004,9 +4006,15 @@ bool mts_checkpoint_routine(Relay_log_in
     DBUG_RETURN(FALSE);
   }
       
-  if (!(cnt= rli->gaq->move_queue_head(&rli->workers)))
+  do
+  {
+    cnt= rli->gaq->move_queue_head(&rli->workers);
+  } while (cnt == 0 && rli->gaq->full() &&
+           (my_sleep(rli->mts_coordinator_basic_nap), 1));
+  if (cnt == 0)
     goto end;
 
+
   /* TODO: 
      to turn the least occupied selection in terms of jobs pieces
   */
@@ -4052,6 +4060,8 @@ bool mts_checkpoint_routine(Relay_log_in
   error= rli->flush_info(TRUE);
   // end of commit_positions
 
+  rli->reset_notified_checkpoint();
+
 end:
   set_timespec_nsec(rli->last_clock, 0);
   
@@ -4082,9 +4092,13 @@ int slave_start_single_worker(Relay_log_
   Rpl_info_dummy *dummy_handler= new Rpl_info_dummy(rli->get_number_info_rli_fields());
   w->w_rli->set_rpl_info_handler(dummy_handler);
   w->init_info();
+  
+  // TODO: remove after dynamic_ids is sorted out (removed/refined); otherwise
+  // the entry->usage assert fires
+  w->curr_group_exec_parts->dynamic_ids.elements= 0;
 
   w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
-
+  w->checkpoint_notified= FALSE;
   w->w_rli->workers= rli->workers; // shallow copying is sufficient
   w->w_rli->this_worker= w;
 
@@ -4094,7 +4108,7 @@ int slave_start_single_worker(Relay_log_
   w->usage_partition= 0;
   w->last_group_done_index= rli->gaq->s; // out of range
 
-  w->jobs.s= rli->mts_slave_worker_queue_len_max + 1;
+  w->jobs.s= rli->mts_slave_worker_queue_len_max;
   my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0); // todo: implement increment e.g  n * 10;
   for (k= 0; k < w->jobs.s; k++)
     insert_dynamic(&w->jobs.Q, (uchar*) &empty);
@@ -4154,16 +4168,17 @@ int slave_start_workers(Relay_log_info *
 
   // GAQ  queue holds seqno:s of scheduled groups. C polls workers in 
   //      @c lwm_checkpoint_period to update GAQ (see @c next_event())
-  // The length of GAQ is derived from @c mts_slave_worker_queue_len_max to guarantee
-  // each assigned job being sent to a WQ will be represented by an item in GAQ.
-  // ::mts_slave_worker_queue_len_max is the worst case when all jobs contain
-  // one event and map to one worker.
-  rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(),
-                                      sizeof(Slave_job_group),
-                                      ::opt_mts_slave_worker_queue_len_max, n);
+  // The length of GAQ is derived from @c opt_mts_slave_worker_queue_len_max to guarantee
+  // that each assigned job sent to a WQ will find room in GAQ.
+  // opt_mts_slave_worker_queue_len_max * number-of-Workers is the worst-case length,
+  // when all jobs contain one event.
 
   // size of WQ stays fixed in one slave session
   rli->mts_slave_worker_queue_len_max= ::opt_mts_slave_worker_queue_len_max;
+  rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(),
+                                      sizeof(Slave_job_group),
+                                      rli->slave_parallel_workers *
+                                      rli->mts_slave_worker_queue_len_max, n);
   rli->mts_pending_jobs_size= 0;
   rli->mts_pending_jobs_size_max= ::opt_mts_pending_jobs_size_max;
   rli->mts_wqs_underrun_w_id= (ulong) -1;
@@ -4173,6 +4188,8 @@ int slave_start_workers(Relay_log_info *
   rli->mts_total_groups= 0;
   rli->curr_group_seen_begin= 0;
   rli->run_query_in_parallel= opt_slave_run_query_in_parallel;
+  rli->checkpoint_seqno= 0;
+
   for (i= 0; i < n; i++)
   {
     if ((error= slave_start_single_worker(rli, i)))

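Two related changes in this file, the GAQ sizing and the checkpoint trigger, can be summarized in one stand-alone sketch: the GAQ must hold one descriptor per assigned-but-not-yet-purged group, so its length becomes slave_parallel_workers * mts_slave_worker_queue_len_max (the worst case of one-event groups), and mts_checkpoint_routine() now also fires when the GAQ is full, retrying the purge after a short nap until space is reclaimed. The helper names (gaq_is_full, move_queue_head) and the constants below are hypothetical, and std::this_thread replaces my_sleep.

// Stand-alone sketch only; not server code.
#include <chrono>
#include <cstddef>
#include <cstdio>
#include <thread>

static bool gaq_is_full();          // would test rli->gaq->full() in the server
static size_t move_queue_head();    // would purge committed groups from the GAQ head

int main()
{
  const unsigned long workers= 4;        // slave_parallel_workers
  const unsigned long wq_len_max= 16384; // mts_slave_worker_queue_len_max
  printf("GAQ length = %lu descriptors\n", workers * wq_len_max);

  // A short nap between retries; cf. the new DEFAULT(5) for
  // mts_coordinator_basic_nap further below.
  const std::chrono::milliseconds nap(5);
  size_t cnt;
  do
  {
    cnt= move_queue_head();
  } while (cnt == 0 && gaq_is_full() &&
           (std::this_thread::sleep_for(nap), true));
  return 0;
}

// Stubs so the sketch links; the server versions operate on the real GAQ.
static bool gaq_is_full() { return false; }
static size_t move_queue_head() { return 1; }
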
=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2010-12-10 12:10:20 +0000
+++ b/sql/sys_vars.cc	2010-12-10 16:25:27 +0000
@@ -3177,7 +3177,7 @@ static Sys_var_ulong Sys_mts_coordinator
        "Time in msec to sleep by MTS Coordinator to avoid the Worker queues "
        "room overrun",
        GLOBAL_VAR(opt_mts_coordinator_basic_nap), CMD_LINE(REQUIRED_ARG),
-       VALID_RANGE(0, ULONG_MAX), DEFAULT(0), BLOCK_SIZE(1));
+       VALID_RANGE(0, ULONG_MAX), DEFAULT(5), BLOCK_SIZE(1));
 static Sys_var_ulong Sys_mts_worker_underrun_level(
        "opt_mts_worker_underrun_level",
        "percent of Worker queue size at which Worker is considered to become "


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101210162527-3rrmlpigndb4fu6i.bundle