List: Commits
From:Andrei Elkin Date:June 27 2011 5:32pm
Subject:bzr push into mysql-next-mr-wl5569 branch (andrei.elkin:3324 to 3325) WL#5569
 3325 Andrei Elkin	2011-06-27
      wl#5569 MTS
      
      Cleanup, and a fix for the sporadic rpl_temp_table_mix_row failure in
      post-execution mtr.check_testcase().
      
      The check failure was caused by a faulty optimization that avoided
      migrating temporary tables from the Coordinator to Workers in
      the case of rows-event assignment.
      While that is correct for a homogeneous rows-event-only load, a mixed
      load can fail.
      Fixed by removing the optimization so that map_db_to_worker() always
      relocates, which is somewhat suboptimal and should be improved in future.
     @ mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test
        Adding slave synchronization.
     @ sql/log_event.cc
        Cleanup: moved circular_buffer-related definitions into rpl_rli_pdb, which is
        specialized on objects dealing with Workers, their assignment etc.
        Improved comments.
        Also, the former separate flag indicating that a T-event requires
        post-scheduling synchronization with the Worker is turned into a bit of
        the existing Log_event::flags, which also avoids the ugliness of #if/#endif:s.
     @ sql/log_event.h
        The former separate flag indicating that a T-event requires post-scheduling
        synchronization with the Worker is turned into a bit of the existing
        Log_event::flags.
     @ sql/rpl_rli.cc
        cleanup: renaming.
     @ sql/rpl_rli.h
        cleanup: renaming, more comments.
        The former mts_wqs_overrun is converted into two: the statistics parameter mts_wq_overrun_cnt
        and the internal control parameter mts_wq_excess.
     @ sql/rpl_rli_pdb.cc
        Included rpl_slave.h, which holds two necessary declarations.
        Cleanup: accepted circular_buffer-related definitions migrated from log_event;
                 improved comments, renaming, removed dead code.
     @ sql/rpl_rli_pdb.h
        Cleanup: renaming and more comments are added.
     @ sql/rpl_slave.cc
        Augmenting print-out of statistics at the end of MTS session;
        cleanup: renaming.
     @ sql/rpl_slave.h
        Introducing two constants to define the range of the worker_id domain and
        a magic value for an undefined worker.
     @ sql/sys_vars.cc
        replacing a literal int value with a symbolic constant.

    modified:
      mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test
      sql/log_event.cc
      sql/log_event.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/rpl_slave.h
      sql/sys_vars.cc
 3324 Alfranio Correia	2011-06-27
      Ensured that updates to the worker_info_repository are transactional and fixed
      the slave_checkpoint_group_basic test case.

    modified:
      mysql-test/extra/rpl_tests/rpl_crash_safe.inc
      mysql-test/suite/rpl/t/rpl_row_crash_safe-slave.opt
      mysql-test/suite/rpl/t/rpl_row_crash_safe.test
      mysql-test/suite/rpl/t/rpl_stm_mixed_crash_safe-slave.opt
      mysql-test/suite/rpl/t/rpl_stm_mixed_crash_safe.test
      mysql-test/suite/sys_vars/r/slave_checkpoint_group_basic.result
      mysql-test/suite/sys_vars/t/slave_checkpoint_group_basic.test
      sql/log_event.cc
      sql/rpl_slave.cc
=== modified file 'mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test'
--- a/mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test	2010-12-19 17:15:12 +0000
+++ b/mysql-test/suite/rpl/t/rpl_temp_table_mix_row.test	2011-06-27 17:31:45 +0000
@@ -207,4 +207,7 @@ source include/show_binlog_events.inc;
 --echo
 connection master;
 DROP TABLE t1;
+
+sync_slave_with_master;
+
 --source include/rpl_end.inc

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-06-27 12:12:52 +0000
+++ b/sql/log_event.cc	2011-06-27 17:31:45 +0000
@@ -672,9 +672,6 @@ Log_event::Log_event(enum_event_cache_ty
   :temp_buf(0), exec_time(0), flags(0), event_cache_type(cache_type_arg),
   event_logging_type(logging_type_arg), crc(0), thd(0),
   checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
-#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
-  , m_mts_event_isolated_group(FALSE)
-#endif
 {
   server_id=	::server_id;
   /*
@@ -697,9 +694,6 @@ Log_event::Log_event(const char* buf,
   event_cache_type(EVENT_INVALID_CACHE),
   event_logging_type(EVENT_INVALID_LOGGING),
   crc(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
-#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
-  , m_mts_event_isolated_group(FALSE)
-#endif
 {
 #ifndef MYSQL_CLIENT
   thd = 0;
@@ -2368,12 +2362,8 @@ bool Log_event::contains_partition_info(
 }
 
 /**
-   General hashing function to compute the id of an applier for
-   the current event.
-   At computing the id few rules apply depending on partitioning properties
-   that the event instance can feature.
-
-   Let's call the properties through the following legends:
+   The method maps the event to a Worker and returns a pointer to it.
+   As a part of the group, an event belongs to one of the following types:
 
    B - beginning of a group of events (BEGIN query_log_event)
    g - mini-group representative event containing the partition info
@@ -2382,30 +2372,33 @@ bool Log_event::contains_partition_info(
       (int_, rand_, user_ var:s) 
    r - a mini-group internal "regular" event that follows its g-parent
       (Delete, Update, Write -rows)
-   S - sequentially applied event (may not be a part of any group).
-       Events of this type are determined via @c mts_sequential_exec()
-       earlier and don't cause calling this method .
-   T - terminator of the group (XID, COMMIT, ROLLBACK)
-
-   Only `g' case really computes the assigned Worker id which must
-   be memorized by the caller and is available through @c rli argument.
-   For instance DUW-rows events are mapped to a Worker previously chosen
-   at assigning of their Table-map parent g-event.
-   In `B' case the assigned Worker is NULL to indicate the Coordinator will
-   postpone scheduling until a following `g' event decides on a Worker.
+   T - terminator of the group (XID, COMMIT, ROLLBACK, auto-commit query)
+
+   Only the first g-type event computes the assigned Worker which, once
+   determined, remains in effect for the rest of the group.
+   That is, the g-type event solely carries the partitioning info.
+   For B-type the assigned Worker is NULL to indicate the Coordinator
+   has not yet decided. The same applies to p-type.
    
-   A group can consist of multiple events still without explict B
-   event.  This is a case of old master binlog or few corner-cases of
-   the current master version (todo: to fix).  Such group structure is
-   supposed to be {{p_i},g} that is it ends with the first not p-event.
-   Such g-event is marked with set_mts_event_ends_group().
+   Notice, there is a special group consisting of optionally multiple p-events
+   terminating with a g-event.
+   Such a case is caused by an old master binlog and a few corner-cases of
+   the current master version (todo: to fix).
+
+   In case the event accesses more than OVER_MAX_DBS databases, the method
+   has to ensure that groups previously assigned to all other Workers are
+   done.
 
-   @note The function can update APH, CGAP, GAQ objects.
+
+   @note The function can update APH (through map_db_to_worker()), GAQ objects
+         and relocate some temporary tables from Coordinator's list into
+         involved entries of APH.
+         A few memory allocations are commented with where they are to be freed.
    
-   @return a pointer to the Worker stuct or NULL.
+   @return a pointer to the Worker struct or NULL.
 */
 
-Slave_worker *Log_event::get_slave_worker_id(Relay_log_info *rli)
+Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli)
 {
   Slave_job_group g, *ptr_g;
   bool is_b_event;
@@ -2424,7 +2417,7 @@ Slave_worker *Log_event::get_slave_worke
        (rli->gaq->empty() ||
         ((Slave_job_group *)
          dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))->
-        worker_id != (ulong) -1)))
+        worker_id != MTS_WORKER_UNDEF)))
   {
     ulong gaq_idx;
     rli->mts_total_groups++;
@@ -2434,7 +2427,7 @@ Slave_worker *Log_event::get_slave_worke
     g.group_master_log_pos= g.group_relay_log_pos= 0;
     g.group_master_log_name= NULL; // todo: remove
     g.group_relay_log_name= NULL;
-    g.worker_id= (ulong) -1;
+    g.worker_id= MTS_WORKER_UNDEF;
     g.total_seqno= rli->mts_total_groups;
     g.checkpoint_log_name= NULL;
     g.checkpoint_log_pos= 0;
@@ -2446,11 +2439,11 @@ Slave_worker *Log_event::get_slave_worke
     // the last occupied GAQ's array index
     gaq_idx= rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
     
-    DBUG_ASSERT(gaq_idx != (ulong) -1 && gaq_idx < rli->gaq->s);
+    DBUG_ASSERT(gaq_idx != MTS_WORKER_UNDEF && gaq_idx < rli->gaq->size);
     DBUG_ASSERT(((Slave_job_group *) 
                  dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))->
                 group_relay_log_name == NULL);
-    DBUG_ASSERT(rli->gaq->assigned_group_index != (ulong) -1); // gaq must have room
+    DBUG_ASSERT(rli->gaq->assigned_group_index != MTS_WORKER_UNDEF); // gaq must have room
     DBUG_ASSERT(rli->last_assigned_worker == NULL);
 
     if (is_b_event)
@@ -2485,7 +2478,9 @@ Slave_worker *Log_event::get_slave_worke
       if (!ret_worker)
         ret_worker= *(Slave_worker**) dynamic_array_ptr(&rli->workers, 0);
       (void) wait_for_workers_to_finish(rli, ret_worker);
-
+      /*
+        This marking is transferred further into the T-event of the current group.
+      */
       rli->curr_group_isolated= TRUE;
     }
 
@@ -2494,10 +2489,14 @@ Slave_worker *Log_event::get_slave_worke
       char **ref_cur_db= it.ref();
       
       if (!(ret_worker=
-            get_slave_worker(*ref_cur_db, rli,
+            map_db_to_worker(*ref_cur_db, rli,
                              &mts_assigned_partitions[i],
-                             // only rows-events do not need temporary tables
-                             get_type_code() != TABLE_MAP_EVENT, ret_worker)))
+                             /*
+                               todo: optimize it. Although a pure
+                               rows-event load is insensitive to the flag value
+                             */
+                             TRUE,
+                             ret_worker)))
       {
         llstr(rli->get_event_relay_log_pos(), llbuff);
         rli->report(ERROR_LEVEL, ER_MTS_CANT_PARALLEL,
@@ -2518,7 +2517,7 @@ Slave_worker *Log_event::get_slave_worke
     if ((ptr_g= ((Slave_job_group *)
                  dynamic_array_ptr(&rli->gaq->Q,
                                    rli->gaq->assigned_group_index)))->worker_id
-        == (ulong) -1)
+        == MTS_WORKER_UNDEF)
     {
       ptr_g->worker_id= ret_worker->id;
       
@@ -2582,20 +2581,14 @@ Slave_worker *Log_event::get_slave_worke
     }
   }
 
-  // the group terminal event:
-  // Commit, Xid, a DDL query or dml query of B-less group.
+  // T-event: Commit, Xid, a DDL query or dml query of B-less group.
   if (ends_group() || !rli->curr_group_seen_begin)
   {
     // index of GAQ that this terminal event belongs to
     mts_group_cnt= rli->gaq->assigned_group_index;
-    /*
-      special marking for T event of a group containing over-max db:s event
-      including {p,g} B-less group.
-    */
+    rli->mts_group_status= Relay_log_info::MTS_END_GROUP;
     if (rli->curr_group_isolated)
       mts_do_isolate_group();
-    rli->mts_group_status= Relay_log_info::MTS_END_GROUP;
-
     ptr_g= (Slave_job_group *)
       dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
 
@@ -2650,7 +2643,7 @@ Slave_worker *Log_event::get_slave_worke
       ret_worker->checkpoint_notified= TRUE;
     }
     ptr_g->checkpoint_seqno= rli->checkpoint_seqno;
-    ptr_g->ts= when + (time_t) exec_time;
+    ptr_g->ts= when + (time_t) exec_time;       // Seconds_behind_master related
     rli->checkpoint_seqno++;
 
     // reclaiming resources allocated during the group scheduling
@@ -2660,179 +2653,6 @@ Slave_worker *Log_event::get_slave_worke
   return ret_worker;
 }
 
-// returns the next available! (TODO: incompatible to circurla_buff method!!!)
-static int en_queue(Slave_jobs_queue *jobs, Slave_job_item *item)
-{
-  if (jobs->a == jobs->s)
-  {
-    DBUG_ASSERT(jobs->a == jobs->Q.elements);
-    return -1;
-  }
-
-  // store
-
-  set_dynamic(&jobs->Q, (uchar*) item, jobs->a);
-
-  // pre-boundary cond
-  if (jobs->e == jobs->s)
-    jobs->e= jobs->a;
-  
-  jobs->a= (jobs->a + 1) % jobs->s;
-  jobs->len++;
-
-  // post-boundary cond
-  if (jobs->a == jobs->e)
-    jobs->a= jobs->s;
-  DBUG_ASSERT(jobs->a == jobs->e || 
-              jobs->len == (jobs->a >= jobs->e) ?
-              (jobs->a - jobs->e) : (jobs->s + jobs->a - jobs->e));
-  return jobs->a;
-}
-
-/**
-   return the value of @c data member of the head of the queue.
-*/
-static void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
-{
-  if (jobs->e == jobs->s)
-  {
-    DBUG_ASSERT(jobs->len == 0);
-    ret->data= NULL;               // todo: move to caller
-    return NULL;
-  }
-  get_dynamic(&jobs->Q, (uchar*) ret, jobs->e);
-
-  DBUG_ASSERT(ret->data);         // todo: move to caller
- 
-  return ret;
-}
-
-
-/**
-   return a job item through a struct which point is supplied via argument.
-*/
-Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
-{
-  if (jobs->e == jobs->s)
-  {
-    DBUG_ASSERT(jobs->len == 0);
-    return NULL;
-  }
-  get_dynamic(&jobs->Q, (uchar*) ret, jobs->e);
-  jobs->len--;
-  
-  // pre boundary cond
-  if (jobs->a == jobs->s)
-    jobs->a= jobs->e;
-  jobs->e= (jobs->e + 1) % jobs->s;
-
-  // post boundary cond
-  if (jobs->a == jobs->e)
-    jobs->e= jobs->s;
-
-  DBUG_ASSERT(jobs->e == jobs->s ||
-              (jobs->len == (jobs->a >= jobs->e)? (jobs->a - jobs->e) :
-               (jobs->s + jobs->a - jobs->e)));
-
-  return ret;
-}
-
-void append_item_to_jobs(slave_job_item *job_item,
-                         Slave_worker *w, Relay_log_info *rli)
-{
-  THD *thd= rli->info_thd;
-  int ret;
-  ulong ev_size= ((Log_event*) (job_item->data))->data_written;
-  ulonglong new_pend_size;
-
-  DBUG_ASSERT(thd == current_thd);
-  thd_proc_info(thd, "Feeding an event to a worker thread");
-
-  mysql_mutex_lock(&rli->pending_jobs_lock);
-  new_pend_size= rli->mts_pending_jobs_size + ev_size;
-  // C waits basing on *data* sizes in the queues
-  while (new_pend_size > rli->mts_pending_jobs_size_max)
-  {
-    const char *old_msg;
-    const char info_format[]=
-      "Waiting for Slave Workers to free pending events, requested size %lu";
-    char wait_info[sizeof(info_format) + 4*sizeof(new_pend_size)];
-
-    sprintf(wait_info, info_format, new_pend_size);
-    rli->mts_wqs_oversize= TRUE;
-    rli->wait_jobs++; // waiting due to the total size
-    old_msg= thd->enter_cond(&rli->pending_jobs_cond, &rli->pending_jobs_lock,
-                             wait_info);
-    mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
-    thd->exit_cond(old_msg);
-    if (thd->killed)
-      return;
-
-    mysql_mutex_lock(&rli->pending_jobs_lock);
-
-    new_pend_size= rli->mts_pending_jobs_size + ev_size;
-  }
-  rli->pending_jobs++;
-  rli->mts_pending_jobs_size= new_pend_size;
-  rli->stmt_jobs++;
-
-  mysql_mutex_unlock(&rli->pending_jobs_lock);
-
-  // sleep while all queue lengths are gt Underrun
-  // sleep time lasts the longer the further WQ:s shift to Overrun
-  // Workers report their U,O status
-
-  if (rli->mts_wqs_underrun_w_id != (ulong) -1)
-  {
-    // todo: experiment with weight to get a good approximation formula
-    ulong nap_weight= rli->mts_wqs_overrun + 1;
-    my_sleep(nap_weight * rli->mts_coordinator_basic_nap);
-    rli->mts_wqs_underrun_cnt++;
-  }
-
-  ret= -1;
-
-  mysql_mutex_lock(&w->jobs_lock);
-
-  // possible WQ overfill
-  while (w->running_status == Slave_worker::RUNNING && !thd->killed &&
-         (ret= en_queue(&w->jobs, job_item)) == -1)
-  {
-    const char *old_msg;
-    const char info_format[]=
-      "Waiting for Slave Worker %d queue: max len %lu, actual len %lu";
-    char wait_info[sizeof(info_format) + 4*sizeof(w->id) +
-                   4*sizeof(w->jobs.s) + 4*sizeof(w->jobs.len)];
-    
-    sprintf(wait_info, info_format, w->id, w->jobs.s, w->jobs.len);
-    old_msg= thd->enter_cond(&w->jobs_cond, &w->jobs_lock, wait_info);
-    w->jobs.overfill= TRUE;
-    w->jobs.waited_overfill++;
-    rli->mts_wqs_overfill_cnt++;
-    mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
-    thd->exit_cond(old_msg);
-    
-    mysql_mutex_lock(&w->jobs_lock);
-  }
-  if (ret != -1)
-  {
-    w->curr_jobs++;
-    if (w->jobs.len == 1)
-      mysql_cond_signal(&w->jobs_cond);
-    
-    mysql_mutex_unlock(&w->jobs_lock);
-  }
-  else
-  {
-    mysql_mutex_unlock(&w->jobs_lock);
-
-    mysql_mutex_lock(&rli->pending_jobs_lock);
-    rli->pending_jobs--;                  // roll back of the prev incr
-    rli->mts_pending_jobs_size -= ev_size;
-    mysql_mutex_unlock(&rli->pending_jobs_lock);
-  }
-}
-
 /**
    Scheduling event to execute in parallel or execute it directly.
    In MTS case the event gets associated with either Coordinator or a
@@ -2942,7 +2762,7 @@ int Log_event::apply_event(Relay_log_inf
   c_rli->mts_group_status= Relay_log_info::MTS_IN_GROUP;
 
   worker= (Relay_log_info*)
-    (c_rli->last_assigned_worker= get_slave_worker_id(c_rli));
+    (c_rli->last_assigned_worker= get_slave_worker(c_rli));
 
 #ifndef DBUG_OFF
   if (c_rli->last_assigned_worker)
@@ -2971,231 +2791,6 @@ err:
               0 : -1);
 }
 
-
-/**
-   Worker's routine to wait for a new assignement in its
-   private queue.
-   
-   @return NULL failure or
-           a-pointer to an item.
-*/
-struct slave_job_item* pop_jobs_item(Slave_worker *w, Slave_job_item *job_item)
-{
-  THD *thd= w->info_thd;
-
-  mysql_mutex_lock(&w->jobs_lock);
-
-  while (!job_item->data && !thd->killed &&
-         w->running_status == Slave_worker::RUNNING)
-  {
-    const char *old_msg;
-
-    head_queue(&w->jobs, job_item);
-    if (job_item->data == NULL)
-    {
-      w->wait_jobs++;
-      old_msg= thd->enter_cond(&w->jobs_cond, &w->jobs_lock,
-                               "Waiting for an event from sql thread");
-      mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
-      thd->exit_cond(old_msg);
-      mysql_mutex_lock(&w->jobs_lock);
-    }
-  }
-  if (job_item->data)
-    w->curr_jobs--;
-
-  mysql_mutex_unlock(&w->jobs_lock);
-
-  thd_proc_info(w->info_thd, "Executing event");
-  return job_item;
-}
-
-
-/**
-  MTS worker main routine.
-  The worker thread waits for an event, execute it, fixes statistics counters.
-
-  @note the function maintains CGEP and modifies APH, and causes
-        modification of GAQ.
-
-  @return 0 success 
-         -1 got killed or an error happened during appying
-*/
-int slave_worker_exec_job(Slave_worker *w, Relay_log_info *rli)
-{
-  int error= 0;
-  struct slave_job_item item= {NULL}, *job_item= &item;
-  THD *thd= w->info_thd;
-  Log_event *ev= NULL;
-  bool part_event= FALSE;
-
-  DBUG_ENTER("slave_worker_exec_job");
-
-  job_item= pop_jobs_item(w, job_item);
-  if (thd->killed || w->running_status != Slave_worker::RUNNING)
-  {
-    // de-queueing and decrement counters is in the caller's exit branch
-    error= -1;
-    goto err;
-  }
-  ev= static_cast<Log_event*>(job_item->data);
-  thd->server_id = ev->server_id;
-  thd->set_time();
-  thd->lex->current_select= 0;
-  if (!ev->when)
-    ev->when= my_time(0);
-  ev->thd= thd; // todo: assert because up to this point, ev->thd == 0
-
-  DBUG_PRINT("slave_worker_exec_job:", ("W_%lu <- job item: %p data: %p thd: %p", w->id, job_item, ev, thd));
-
-  if (ev->starts_group())
-  {
-    w->curr_group_seen_begin= TRUE; // The current group is started with B-event
-  } 
-  else
-  {
-    if ((part_event= ev->contains_partition_info()))
-    {
-      uint num_dbs=  ev->mts_number_dbs();
-      DYNAMIC_ARRAY *ep= &w->curr_group_exec_parts;
-
-      if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
-        num_dbs= 1;
-
-      DBUG_ASSERT(num_dbs > 0);
-
-      for (uint k= 0; k < num_dbs; k++)
-      {
-        bool found= FALSE;
-
-        for (uint i= 0; i < ep->elements && !found; i++)
-        {
-          found=
-            *((db_worker_hash_entry **) dynamic_array_ptr(ep, i)) ==
-            ev->mts_assigned_partitions[k];
-        }
-        if (!found)
-        {
-          /*
-            notice, can't assert
-            DBUG_ASSERT(ev->mts_assigned_partitions[k]->worker == w);
-            since entry could be marked as wanted by other worker.
-          */
-          insert_dynamic(ep, (uchar*) &ev->mts_assigned_partitions[k]);
-        }
-      }
-    }
-  }
-  w->set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
-  error= ev->do_apply_event_worker(w);
-  if (ev->ends_group() || (!w->curr_group_seen_begin && 
-                           /* 
-                              p-events of B/T-less {p,g} group (see
-                              legends of Log_event::get_slave_worker)
-                              obviously can't commit.
-                           */
-                           part_event))
-  {
-    DBUG_PRINT("slave_worker_exec_job:",
-               (" commits GAQ index %lu, last committed  %lu",
-                ev->mts_group_cnt, w->last_group_done_index));
-    w->slave_worker_ends_group(ev, error); /* last done sets post exec */
-
-#ifndef DBUG_OFF
-    w->processed_group++;
-    DBUG_PRINT("mts", ("Check_slave_debug_group worker %lu mts_checkpoint_group "
-               "%u processed %u debug %d\n", w->id, mts_checkpoint_group,
-               w->processed_group,
-               DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0)));
-    if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) &&
-        mts_checkpoint_group == w->processed_group)
-    {
-      DBUG_PRINT("mts", ("Putting worker %lu in busy wait.", w->id));
-      while (true) my_sleep(6000000);
-    }
-#endif
-  }
-
-  mysql_mutex_lock(&w->jobs_lock);
-  de_queue(&w->jobs, job_item);
-
-  /* possible overfill */
-  if (w->jobs.len == w->jobs.s - 1 && w->jobs.overfill == TRUE)
-  {
-    w->jobs.overfill= FALSE;
-    // todo: w->hungry_cnt++;
-    mysql_cond_signal(&w->jobs_cond);
-  }
-  mysql_mutex_unlock(&w->jobs_lock);
-
-  /* statistics */
-
-  mysql_mutex_lock(&rli->pending_jobs_lock);
-  rli->pending_jobs--;
-  rli->mts_pending_jobs_size -= ev->data_written;
-  DBUG_ASSERT(rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max);
-  
-  // underrun
-  if ((rli->mts_worker_underrun_level * w->jobs.s) / 100 >  w->jobs.len)
-  {
-    rli-> mts_wqs_underrun_w_id= w->id;
-    // todo: w->underrun_cnt++;
-  } else if (rli->mts_wqs_underrun_w_id == w->id)
-  {
-    rli->mts_wqs_underrun_w_id= (ulong) -1;
-  }
-
-  // overrun exploits the underrun level param
-  if (((100 - rli->mts_worker_underrun_level) * w->jobs.s) / 100 < w->jobs.len)
-  {
-    rli->mts_wqs_overrun++;
-    w->wq_overrun_set= TRUE;
-    // todo: w->underrun_cnt++;
-  }
-  else if (w->wq_overrun_set == TRUE)
-  {
-    rli->mts_wqs_overrun--;
-    w->wq_overrun_set= FALSE;
-  }
-
-  DBUG_ASSERT(rli->mts_wqs_overrun >= 0);
-
-  /* coordinator can be waiting */
-  if (rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max &&
-      rli->mts_wqs_oversize)  // TODO: unit/general test wqs_oversize
-  {
-    rli->mts_wqs_oversize= FALSE;
-    mysql_cond_signal(&rli->pending_jobs_cond);
-  }
-  
-  mysql_mutex_unlock(&rli->pending_jobs_lock);
-
-  w->stmt_jobs++;
-
-err:
-  if (error)
-  {
-    sql_print_information("Worker %lu is exiting: killed %i, error %i, "
-                          "running_status %d",
-                          w->id, thd->killed, thd->is_error(),
-                          w->running_status);
-    w->slave_worker_ends_group(ev, error);
-  }
-  
-  // rows_query_log_event is deleted as a part of the statement cleanup
-
-  // todo: sync_slave_with_master fails when my_sleep(1000) is put here
-
-  if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
-  {
-    w->last_event= ev;
-    delete ev;
-  }
-  
-
-  DBUG_RETURN(error);
-}
-
 #endif
 
 /**************************************************************************

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2011-06-14 09:27:38 +0000
+++ b/sql/log_event.h	2011-06-27 17:31:45 +0000
@@ -536,6 +536,15 @@ struct sql_ex_info
 */
 #define LOG_EVENT_NO_FILTER_F 0x100
 
+/**
+   MTS: group of events can be marked to force its execution
+   in isolation from any other Workers.
+   Typically that is done for a transaction that contains 
+   a query accessing more than OVER_MAX_DBS_IN_EVENT_MTS db:s.
+   The flag is set ON for an event that terminates its group.
+*/
+#define LOG_EVENT_MTS_ISOLATE_F 0x200
+
 
 /**
   @def OPTIONS_WRITTEN_TO_BIN_LOG
@@ -1330,7 +1339,7 @@ public:
              to be assigned worker;
              M is the max index of the worker pool.
   */
-  Slave_worker *get_slave_worker_id(Relay_log_info *rli);
+  Slave_worker *get_slave_worker(Relay_log_info *rli);
 
   /*
     The method returns a list of updated by the event databases.
@@ -1361,12 +1370,14 @@ public:
                 get_type_code() == QUERY_EVENT ||
                 get_type_code() == EXEC_LOAD_EVENT ||
                 get_type_code() == EXECUTE_LOAD_QUERY_EVENT);
-    m_mts_event_isolated_group= TRUE;
+    flags |= LOG_EVENT_MTS_ISOLATE_F;
   }
+
   /*
-    Verifying whether event is marked to execute in isolation.
+    Verifying whether the terminal event of a group is marked to
+    execute in isolation.
   */
-  virtual bool mts_is_group_isolated() { return m_mts_event_isolated_group; }
+  bool mts_is_group_isolated() { return flags & LOG_EVENT_MTS_ISOLATE_F; }
 
   /**
      Apply the event to the database.
@@ -1501,8 +1512,6 @@ protected:
      non-zero. The caller shall decrease the counter by one.
    */
   virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
-  
-  bool m_mts_event_isolated_group;
 #endif
 };
 

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2011-06-22 17:54:23 +0000
+++ b/sql/rpl_rli.cc	2011-06-27 17:31:45 +0000
@@ -112,8 +112,8 @@ void Relay_log_info::init_workers(ulong 
     Parallel slave parameters initialization is done regardless
     whether the feature is or going to be active or not.
   */
-  trans_jobs= stmt_jobs= pending_jobs= wait_jobs= 0;
-  mts_wqs_underrun_cnt= mts_wqs_overfill_cnt= 0;
+  trans_jobs= stmt_jobs= pending_jobs= wq_size_waits= 0;
+  mts_wq_excess= mts_wq_no_underrun_cnt= mts_wq_overfill_cnt= 0;
 
   my_init_dynamic_array(&workers, sizeof(Slave_worker *), n_workers, 4);
   my_atomic_rwlock_init(&slave_open_temp_tables_lock);

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2011-06-24 12:38:19 +0000
+++ b/sql/rpl_rli.h	2011-06-27 17:31:45 +0000
@@ -432,34 +432,42 @@ public:
   time_t last_event_start_time;
 
   /*
-    WL#5569 MTS-II
+    WL#5569 MTS
+
+    legends:
+    C  - Coordinator;
+    W  - Worker;
+    WQ - Worker Queue containing event assignments
   */
   DYNAMIC_ARRAY workers; // number's is determined by global slave_parallel_workers
   volatile ulong pending_jobs;
-  ulong trans_jobs, wait_jobs, stmt_jobs; // wait_jobs - waiting times due to the total size
+  ulong stmt_jobs;      // statistics: number of events (statements) processed
+  ulong trans_jobs;     // statistics: number of groups (transactions) processed
+  ulong wq_size_waits;  // number of times C goes to sleep due to WQ:s oversize
   mysql_mutex_t pending_jobs_lock;
   mysql_cond_t pending_jobs_cond;
   ulong       mts_slave_worker_queue_len_max;
   ulonglong   mts_pending_jobs_size;      // actual mem usage by WQ:s
   ulonglong   mts_pending_jobs_size_max;  // the max forcing to wait by C
-  bool    mts_wqs_oversize;           // C raises flag to wait some memory's released
+  bool    mts_wq_oversize;           // C raises flag to wait some memory's released
   Slave_worker  *last_assigned_worker; // a hint to partitioning func for some events
   Slave_committed_queue *gaq;
   DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
   DYNAMIC_ARRAY curr_group_da;  // deferred array to hold partition-info-free events
   bool curr_group_seen_begin;   // current group started with B-event or not
   bool curr_group_isolated;     // current group requires execution in isolation
-  volatile ulong mts_wqs_underrun_w_id;  // Id of a Worker whose queue is getting empty
-  volatile long mts_wqs_overrun;   // W to incr and decr
-  ulong mts_wqs_underrun_cnt;  // Coord goes to sleep when senses Workers are content
-  ulong mts_wqs_overfill_cnt;  // Coord waits if a W's queue is full
-  long  mts_worker_underrun_level; // percent of WQ size at which Worker claims hungry
+  volatile ulong mts_wq_underrun_w_id;  // Id of a Worker whose queue is getting empty
+  volatile long mts_wq_excess;   // excessive overrun counter; when a W increments or decrements it, it also updates its own wq_overrun_set
+  volatile ulong mts_wq_overrun_cnt; // statistics of all mts_wq_excess increments
+  ulong mts_wq_no_underrun_cnt;  // counts times of C goes to sleep when W:s are filled
+  ulong mts_wq_overfill_cnt;  // counts C waits when a W's queue is full
+  long  mts_worker_underrun_level; // percent of WQ size at which W is considered hungry
   ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun
   Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
   ulonglong mts_total_groups; // total event groups distributed in current session
   ulong opt_slave_parallel_workers; // auxiliary cache for ::opt_slave_parallel_workers
   ulong slave_parallel_workers;     // the one slave session time number of workers
-  ulong recovery_parallel_workers; // number of workers while recovering.
+  ulong recovery_parallel_workers; // number of workers while recovering
 
   /* 
      A sorted array of Worker current assignements number to provide

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2011-06-25 14:14:24 +0000
+++ b/sql/rpl_rli_pdb.cc	2011-06-27 17:31:45 +0000
@@ -2,6 +2,7 @@
 #include "sql_priv.h"
 #include "unireg.h"
 #include "rpl_rli_pdb.h"
+#include "rpl_slave.h"
 #include "sql_string.h"
 #include <hash.h>
 
@@ -252,6 +253,8 @@ bool Slave_worker::commit_positions(Log_
   // extract an updated relay-log name to store in Worker's rli.
   if (ptr_g->group_relay_log_name)
   {
+    DBUG_ASSERT(strlen(ptr_g->group_relay_log_name) + 1
+                <= sizeof(group_relay_log_name));
     strmake(group_relay_log_name, ptr_g->group_relay_log_name,
             sizeof(group_relay_log_name) - 1);
   }
@@ -515,7 +518,7 @@ static void move_temp_tables_to_entry(TH
 
    @return the pointer to a Worker struct
 */
-Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli,
+Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
                                db_worker_hash_entry **ptr_entry,
                                bool need_temp_tables, Slave_worker *last_worker)
 {
@@ -753,17 +756,16 @@ Slave_worker *get_least_occupied_worker(
 }
 
 /**
-   Deallocative routine that makes few things in opposite to
-   @c get_slave_worker().
-
-   Affected by the being committed group APH tuples are updated.
-   @c last_group_done_index member is set to the arg value.
-
+   Deallocation routine to cancel out a few effects of
+   @c map_db_to_worker().
+   The APH tuples involved in processing of the group are updated.
+   The @c last_group_done_index member is set to the GAQ index of
+   the current group.
    CGEP the Worker partition cache is cleaned up.
 
-   TODO: reclaim space if the actual size exceeds the limit.
+   @param ev     a pointer to Log_event
+   @param error  error code after processing the event by caller.
 */
-
 void Slave_worker::slave_worker_ends_group(Log_event* ev, int error)
 {
   DBUG_ENTER("Slave_worker::slave_worker_ends_group");
@@ -775,20 +777,10 @@ void Slave_worker::slave_worker_ends_gro
       (Slave_job_group *) dynamic_array_ptr(&c_rli->gaq->Q, gaq_idx);
 
     // first ever group must have relay log name
-    DBUG_ASSERT(last_group_done_index != c_rli->gaq->s ||
+    DBUG_ASSERT(last_group_done_index != c_rli->gaq->size ||
                 ptr_g->group_relay_log_name != NULL);
     DBUG_ASSERT(ptr_g->worker_id == id);
 
-    if (ptr_g->group_relay_log_name != NULL)
-    {
-      // memorizing a new relay-log file name
-
-      DBUG_ASSERT(strlen(ptr_g->group_relay_log_name) + 1
-                  <= sizeof(group_relay_log_name));
-
-      strcpy(group_relay_log_name, ptr_g->group_relay_log_name);
-    }
-
     if (!(ev->get_type_code() == XID_EVENT && is_transactional()))
     {
       commit_positions(ev, ptr_g);
@@ -799,6 +791,7 @@ void Slave_worker::slave_worker_ends_gro
 
     ptr_g->group_master_log_pos= group_master_log_pos;
     ptr_g->group_relay_log_pos= group_relay_log_pos;
+
     ptr_g->done= 1;    // GAQ index is available to C now
 
     last_group_done_index= gaq_idx;
@@ -887,29 +880,29 @@ void Slave_worker::slave_worker_ends_gro
 ulong circular_buffer_queue::de_queue(uchar *val)
 {
   ulong ret;
-  if (e == s)
+  if (enter == size)
   {
     DBUG_ASSERT(len == 0);
     return (ulong) -1;
   }
 
-  ret= e;
-  get_dynamic(&Q, val, e);
+  ret= enter;
+  get_dynamic(&Q, val, enter);
   len--;
   
   // pre boundary cond
-  if (a == s)
-    a= e;
-  e= (e + 1) % s;
+  if (avail == size)
+    avail= enter;
+  enter= (enter + 1) % size;
 
   // post boundary cond
-  if (a == e)
-    e= s;
+  if (avail == enter)
+    enter= size;
 
-  DBUG_ASSERT(e == s ||
-              (len == (a >= e)? (a - e) :
-               (s + a - e)));
-  DBUG_ASSERT(a != e);
+  DBUG_ASSERT(enter == size ||
+              (len == (avail >= enter)? (avail - enter) :
+               (size + avail - enter)));
+  DBUG_ASSERT(avail != enter);
 
   return ret;
 }
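
For readers tracking the `s`/`a`/`e` → `size`/`avail`/`enter` rename, the queue's sentinel convention (`enter == size` means empty, `avail == size` means full) can be sketched stand-alone. The class below is an illustrative reimplementation with hypothetical names, not the server's `circular_buffer_queue`:

```cpp
#include <cassert>
#include <vector>

// Minimal ring buffer mirroring the patch's sentinel convention:
//   enter == size  means the queue is empty,
//   avail == size  means the queue is full.
struct MiniQueue {
  std::vector<int> q;
  unsigned long size, avail, enter, len;

  explicit MiniQueue(unsigned long max)
    : q(max), size(max), avail(0), enter(max), len(0) {}

  bool empty() const { return enter == size; }
  bool full()  const { return avail == size; }

  // Returns the used index, or -1 when full.
  long en_queue(int item) {
    if (avail == size) return -1;
    unsigned long ret = avail;
    q[avail] = item;
    if (enter == size) enter = avail;   // pre-boundary: first element
    avail = (avail + 1) % size;
    len++;
    if (avail == enter) avail = size;   // post-boundary: became full
    return (long) ret;
  }

  // Returns the freed index, or -1 when empty.
  long de_queue(int *val) {
    if (enter == size) return -1;
    unsigned long ret = enter;
    *val = q[enter];
    len--;
    if (avail == size) avail = enter;   // pre-boundary: was full
    enter = (enter + 1) % size;
    if (avail == enter) enter = size;   // post-boundary: became empty
    return (long) ret;
  }
};
```

The sentinel encoding avoids a separate empty/full flag at the cost of the boundary fix-ups seen in the patch's pre/post conditions.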
@@ -919,26 +912,26 @@ ulong circular_buffer_queue::de_queue(uc
 */
 ulong circular_buffer_queue::de_tail(uchar *val)
 {
-  if (e == s)
+  if (enter == size)
   {
     DBUG_ASSERT(len == 0);
     return (ulong) -1;
   }
 
-  a= (e + len - 1) % s;
-  get_dynamic(&Q, val, a);
+  avail= (enter + len - 1) % size;
+  get_dynamic(&Q, val, avail);
   len--;
   
   // post boundary cond
-  if (a == e)
-    e= s;
+  if (avail == enter)
+    enter= size;
 
-  DBUG_ASSERT(e == s ||
-              (len == (a >= e)? (a - e) :
-               (s + a - e)));
-  DBUG_ASSERT(a != e);
+  DBUG_ASSERT(enter == size ||
+              (len == (avail >= enter)? (avail - enter) :
+               (size + avail - enter)));
+  DBUG_ASSERT(avail != enter);
 
-  return a;
+  return avail;
 }
 /** 
     @return the used index at success or -1 when queue is full
@@ -946,33 +939,33 @@ ulong circular_buffer_queue::de_tail(uch
 ulong circular_buffer_queue::en_queue(void *item)
 {
   ulong ret;
-  if (a == s)
+  if (avail == size)
   {
-    DBUG_ASSERT(a == Q.elements);
+    DBUG_ASSERT(avail == Q.elements);
     return (ulong) -1;
   }
 
   // store
 
-  ret= a;
-  set_dynamic(&Q, (uchar*) item, a);
+  ret= avail;
+  set_dynamic(&Q, (uchar*) item, avail);
 
 
   // pre-boundary cond
-  if (e == s)
-    e= a;
+  if (enter == size)
+    enter= avail;
   
-  a= (a + 1) % s;
+  avail= (avail + 1) % size;
   len++;
 
   // post-boundary cond
-  if (a == e)
-    a= s;
+  if (avail == enter)
+    avail= size;
 
-  DBUG_ASSERT(a == e || 
-              len == (a >= e) ?
-              (a - e) : (s + a - e));
-  DBUG_ASSERT(a != e);
+  DBUG_ASSERT(avail == enter || 
+              len == (avail >= enter) ?
+              (avail - enter) : (size + avail - enter));
+  DBUG_ASSERT(avail != enter);
 
   return ret;
 }
@@ -980,13 +973,13 @@ ulong circular_buffer_queue::en_queue(vo
 void* circular_buffer_queue::head_queue()
 {
   uchar *ret= NULL;
-  if (e == s)
+  if (enter == size)
   {
     DBUG_ASSERT(len == 0);
   }
   else
   {
-    get_dynamic(&Q, (uchar*) ret, e);
+    get_dynamic(&Q, (uchar*) ret, enter);
   }
   return (void*) ret;
 }
@@ -1004,16 +997,16 @@ void* circular_buffer_queue::head_queue(
 */
 bool circular_buffer_queue::gt(ulong i, ulong k)
 {
-  DBUG_ASSERT(i < s && k < s);
-  DBUG_ASSERT(a != e);
+  DBUG_ASSERT(i < size && k < size);
+  DBUG_ASSERT(avail != enter);
 
-  if (i >= e)
-    if (k >= e)
+  if (i >= enter)
+    if (k >= enter)
       return i > k;
     else
       return FALSE;
   else
-    if (k >= e)
+    if (k >= enter)
       return TRUE;
     else
       return i > k;
@@ -1024,7 +1017,7 @@ bool Slave_committed_queue::count_done(R
 {
   ulong i, cnt= 0;
 
-  for (i= e; i != a && !empty(); i= (i + 1) % s)
+  for (i= enter; i != avail && !empty(); i= (i + 1) % size)
   {
     Slave_job_group *ptr_g;
 
@@ -1068,7 +1061,7 @@ ulong Slave_committed_queue::move_queue_
 {
   ulong i, cnt= 0;
 
-  for (i= e; i != a && !empty();)
+  for (i= enter; i != avail && !empty();)
   {
     Slave_worker *w_i;
     Slave_job_group *ptr_g, g;
@@ -1087,8 +1080,12 @@ ulong Slave_committed_queue::move_queue_
       The current job has not been processed or it was not
       even assigned, this means there is a gap.
     */
-    if (ptr_g->worker_id == (ulong) -1 || !ptr_g->done)
+    if (ptr_g->worker_id == MTS_WORKER_UNDEF || !ptr_g->done)
       break; /* gap at i'th */
+
+    /* Worker-id domain guard */
+    compile_time_assert(MTS_WORKER_UNDEF > MTS_MAX_WORKERS);
+
     get_dynamic(ws, (uchar *) &w_i, ptr_g->worker_id);
 
     /*
@@ -1142,7 +1139,7 @@ ulong Slave_committed_queue::move_queue_
     set_dynamic(&last_done, &ptr_g->total_seqno, w_i->id);
 
     cnt++;
-    i= (i + 1) % s;
+    i= (i + 1) % size;
   }
 
   return cnt;
@@ -1156,7 +1153,7 @@ ulong Slave_committed_queue::move_queue_
 void Slave_committed_queue::free_dynamic_items()
 {
   ulong i;
-  for (i= e; i != a && !empty(); i= (i + 1) % s)
+  for (i= enter; i != avail && !empty(); i= (i + 1) % size)
   {
     Slave_job_group *ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);
     if (ptr_g->group_relay_log_name)
@@ -1274,3 +1271,421 @@ int wait_for_workers_to_finish(Relay_log
 
   DBUG_RETURN(ret);
 }
+
+
+// returns the next available index (TODO: incompatible with the
+// circular_buffer_queue method, which returns the used index)
+static int en_queue(Slave_jobs_queue *jobs, Slave_job_item *item)
+{
+  if (jobs->avail == jobs->size)
+  {
+    DBUG_ASSERT(jobs->avail == jobs->Q.elements);
+    return -1;
+  }
+
+  // store
+
+  set_dynamic(&jobs->Q, (uchar*) item, jobs->avail);
+
+  // pre-boundary cond
+  if (jobs->enter == jobs->size)
+    jobs->enter= jobs->avail;
+  
+  jobs->avail= (jobs->avail + 1) % jobs->size;
+  jobs->len++;
+
+  // post-boundary cond
+  if (jobs->avail == jobs->enter)
+    jobs->avail= jobs->size;
+  DBUG_ASSERT(jobs->avail == jobs->enter || 
+              jobs->len == (jobs->avail >= jobs->enter) ?
+              (jobs->avail - jobs->enter) : (jobs->size + jobs->avail - jobs->enter));
+  return jobs->avail;
+}
+
+/**
+   return the value of @c data member of the head of the queue.
+*/
+static void * head_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
+{
+  if (jobs->enter == jobs->size)
+  {
+    DBUG_ASSERT(jobs->len == 0);
+    ret->data= NULL;               // todo: move to caller
+    return NULL;
+  }
+  get_dynamic(&jobs->Q, (uchar*) ret, jobs->enter);
+
+  DBUG_ASSERT(ret->data);         // todo: move to caller
+ 
+  return ret;
+}
+
+
+/**
+   return a job item through a struct whose pointer is supplied via argument.
+*/
+Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
+{
+  if (jobs->enter == jobs->size)
+  {
+    DBUG_ASSERT(jobs->len == 0);
+    return NULL;
+  }
+  get_dynamic(&jobs->Q, (uchar*) ret, jobs->enter);
+  jobs->len--;
+  
+  // pre boundary cond
+  if (jobs->avail == jobs->size)
+    jobs->avail= jobs->enter;
+  jobs->enter= (jobs->enter + 1) % jobs->size;
+
+  // post boundary cond
+  if (jobs->avail == jobs->enter)
+    jobs->enter= jobs->size;
+
+  DBUG_ASSERT(jobs->enter == jobs->size ||
+              (jobs->len == (jobs->avail >= jobs->enter)? (jobs->avail - jobs->enter) :
+               (jobs->size + jobs->avail - jobs->enter)));
+
+  return ret;
+}
+
+/**
+   Coordinator enqueues a job item into a Worker private queue.
+
+   @param job_item  a pointer to struct carrying a reference to an event
+   @param worker    a pointer to the assigned Worker struct
+   @param rli       a pointer to Relay_log_info of Coordinator
+*/
+void append_item_to_jobs(slave_job_item *job_item,
+                         Slave_worker *worker, Relay_log_info *rli)
+{
+  THD *thd= rli->info_thd;
+  int ret;
+  ulong ev_size= ((Log_event*) (job_item->data))->data_written;
+  ulonglong new_pend_size;
+
+  DBUG_ASSERT(thd == current_thd);
+  thd_proc_info(thd, "Feeding an event to a worker thread");
+
+  mysql_mutex_lock(&rli->pending_jobs_lock);
+  new_pend_size= rli->mts_pending_jobs_size + ev_size;
+  // C waits based on *data* sizes in the queues
+  while (new_pend_size > rli->mts_pending_jobs_size_max)
+  {
+    const char *old_msg;
+    const char info_format[]=
+      "Waiting for Slave Workers to free pending events, requested size %llu";
+    char wait_info[sizeof(info_format) + 4*sizeof(new_pend_size)];
+
+    sprintf(wait_info, info_format, new_pend_size);
+    rli->mts_wq_oversize= TRUE;
+    rli->wq_size_waits++; // waiting due to the total size
+    old_msg= thd->enter_cond(&rli->pending_jobs_cond, &rli->pending_jobs_lock,
+                             wait_info);
+    mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
+    thd->exit_cond(old_msg);
+    if (thd->killed)
+      return;
+
+    mysql_mutex_lock(&rli->pending_jobs_lock);
+
+    new_pend_size= rli->mts_pending_jobs_size + ev_size;
+  }
+  rli->pending_jobs++;
+  rli->mts_pending_jobs_size= new_pend_size;
+  rli->stmt_jobs++;
+
+  mysql_mutex_unlock(&rli->pending_jobs_lock);
+
+  /*
+    Sleep unless there is an underrunning Worker.
+  */
+  if (rli->mts_wq_underrun_w_id == MTS_WORKER_UNDEF)
+  {
+    // todo: experiment with weight to get a good approximation formula
+    // The longer the sleep lasts, the bigger the excess overrun counter grows.
+    ulong nap_weight= rli->mts_wq_excess + 1;
+    my_sleep(nap_weight * rli->mts_coordinator_basic_nap);
+    rli->mts_wq_no_underrun_cnt++;
+  }
+
+  ret= -1;
+
+  mysql_mutex_lock(&worker->jobs_lock);
+
+  // possible WQ overfill
+  while (worker->running_status == Slave_worker::RUNNING && !thd->killed &&
+         (ret= en_queue(&worker->jobs, job_item)) == -1)
+  {
+    const char *old_msg;
+    const char info_format[]=
+      "Waiting for Slave Worker %lu queue: max len %lu, actual len %lu";
+    char wait_info[sizeof(info_format) + 4*sizeof(worker->id) +
+                   4*sizeof(worker->jobs.size) + 4*sizeof(worker->jobs.len)];
+    
+    sprintf(wait_info, info_format, worker->id, worker->jobs.size, worker->jobs.len);
+    old_msg= thd->enter_cond(&worker->jobs_cond, &worker->jobs_lock, wait_info);
+    worker->jobs.overfill= TRUE;
+    worker->jobs.waited_overfill++;
+    rli->mts_wq_overfill_cnt++;
+    mysql_cond_wait(&worker->jobs_cond, &worker->jobs_lock);
+    thd->exit_cond(old_msg);
+    
+    mysql_mutex_lock(&worker->jobs_lock);
+  }
+  if (ret != -1)
+  {
+    worker->curr_jobs++;
+    if (worker->jobs.len == 1)
+      mysql_cond_signal(&worker->jobs_cond);
+    
+    mysql_mutex_unlock(&worker->jobs_lock);
+  }
+  else
+  {
+    mysql_mutex_unlock(&worker->jobs_lock);
+
+    mysql_mutex_lock(&rli->pending_jobs_lock);
+    rli->pending_jobs--;                  // roll back of the prev incr
+    rli->mts_pending_jobs_size -= ev_size;
+    mysql_mutex_unlock(&rli->pending_jobs_lock);
+  }
+}
+
+
+/**
+   Worker's routine to wait for a new assignment through
+   @c append_item_to_jobs()
+
+   @param worker    a pointer to the waiting Worker struct
+   @param job_item  a pointer to struct carrying a reference to an event
+   
+   @return NULL on failure, or
+           a pointer to an item.
+*/
+struct slave_job_item* pop_jobs_item(Slave_worker *worker, Slave_job_item *job_item)
+{
+  THD *thd= worker->info_thd;
+
+  mysql_mutex_lock(&worker->jobs_lock);
+
+  while (!job_item->data && !thd->killed &&
+         worker->running_status == Slave_worker::RUNNING)
+  {
+    const char *old_msg;
+
+    head_queue(&worker->jobs, job_item);
+    if (job_item->data == NULL)
+    {
+      worker->wq_empty_waits++;
+      old_msg= thd->enter_cond(&worker->jobs_cond, &worker->jobs_lock,
+                               "Waiting for an event from sql thread");
+      mysql_cond_wait(&worker->jobs_cond, &worker->jobs_lock);
+      thd->exit_cond(old_msg);
+      mysql_mutex_lock(&worker->jobs_lock);
+    }
+  }
+  if (job_item->data)
+    worker->curr_jobs--;
+
+  mysql_mutex_unlock(&worker->jobs_lock);
+
+  thd_proc_info(worker->info_thd, "Executing event");
+  return job_item;
+}
+
+
+/**
+  MTS worker main routine.
+  The worker thread loops in waiting for an event, executing it and 
+  fixing statistics counters.
+
+  @param worker    a pointer to the assigned Worker struct
+  @param rli       a pointer to Relay_log_info of Coordinator
+                   to update statistics.
+
+  @note the function maintains worker's CGEP and modifies APH, updates
+        the current group item in GAQ via @c slave_worker_ends_group().
+
+  @return 0 success 
+         -1 got killed or an error happened during applying
+*/
+int slave_worker_exec_job(Slave_worker *worker, Relay_log_info *rli)
+{
+  int error= 0;
+  struct slave_job_item item= {NULL}, *job_item= &item;
+  THD *thd= worker->info_thd;
+  Log_event *ev= NULL;
+  bool part_event= FALSE;
+
+  DBUG_ENTER("slave_worker_exec_job");
+
+  job_item= pop_jobs_item(worker, job_item);
+  if (thd->killed || worker->running_status != Slave_worker::RUNNING)
+  {
+    // de-queueing and decrement counters is in the caller's exit branch
+    error= -1;
+    goto err;
+  }
+  ev= static_cast<Log_event*>(job_item->data);
+  thd->server_id = ev->server_id;
+  thd->set_time();
+  thd->lex->current_select= 0;
+  if (!ev->when)
+    ev->when= my_time(0);
+  ev->thd= thd; // todo: assert because up to this point, ev->thd == 0
+
+  DBUG_PRINT("slave_worker_exec_job:", ("W_%lu <- job item: %p data: %p thd: %p", worker->id, job_item, ev, thd));
+
+  if (ev->starts_group())
+  {
+    worker->curr_group_seen_begin= TRUE; // The current group is started with B-event
+  } 
+  else
+  {
+    if ((part_event= ev->contains_partition_info()))
+    {
+      uint num_dbs=  ev->mts_number_dbs();
+      DYNAMIC_ARRAY *ep= &worker->curr_group_exec_parts;
+
+      if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
+        num_dbs= 1;
+
+      DBUG_ASSERT(num_dbs > 0);
+
+      for (uint k= 0; k < num_dbs; k++)
+      {
+        bool found= FALSE;
+
+        for (uint i= 0; i < ep->elements && !found; i++)
+        {
+          found=
+            *((db_worker_hash_entry **) dynamic_array_ptr(ep, i)) ==
+            ev->mts_assigned_partitions[k];
+        }
+        if (!found)
+        {
+          /*
+            notice, can't assert
+            DBUG_ASSERT(ev->mts_assigned_partitions[k]->worker == worker);
+            since entry could be marked as wanted by other worker.
+          */
+          insert_dynamic(ep, (uchar*) &ev->mts_assigned_partitions[k]);
+        }
+      }
+    }
+  }
+  worker->set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
+  error= ev->do_apply_event_worker(worker);
+  if (ev->ends_group() || (!worker->curr_group_seen_begin && 
+                           /* 
+                              p-events of B/T-less {p,g} group (see
+                              legends of Log_event::get_slave_worker)
+                              obviously can't commit.
+                           */
+                           part_event))
+  {
+    DBUG_PRINT("slave_worker_exec_job:",
+               (" commits GAQ index %lu, last committed  %lu",
+                ev->mts_group_cnt, worker->last_group_done_index));
+    worker->slave_worker_ends_group(ev, error); /* last done sets post exec */
+
+#ifndef DBUG_OFF
+    worker->processed_group++;
+    DBUG_PRINT("mts", ("Check_slave_debug_group worker %lu mts_checkpoint_group"
+               " %u processed %u debug %d\n", worker->id, mts_checkpoint_group,
+               worker->processed_group,
+               DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0)));
+    if (DBUG_EVALUATE_IF("check_slave_debug_group", 1, 0) &&
+        mts_checkpoint_group == worker->processed_group)
+    {
+      DBUG_PRINT("mts", ("Putting worker %lu in busy wait.", worker->id));
+      while (true) my_sleep(6000000);
+    }
+#endif
+  }
+
+  mysql_mutex_lock(&worker->jobs_lock);
+  de_queue(&worker->jobs, job_item);
+
+  /* possible overfill */
+  if (worker->jobs.len == worker->jobs.size - 1 && worker->jobs.overfill == TRUE)
+  {
+    worker->jobs.overfill= FALSE;
+    // todo: worker->hungry_cnt++;
+    mysql_cond_signal(&worker->jobs_cond);
+  }
+  mysql_mutex_unlock(&worker->jobs_lock);
+
+  /* statistics */
+
+  /* todo: convert to rwlock/atomic write */
+  mysql_mutex_lock(&rli->pending_jobs_lock);
+
+  rli->pending_jobs--;
+  rli->mts_pending_jobs_size -= ev->data_written;
+  DBUG_ASSERT(rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max);
+  
+  // underrun (number of pending assignments is less than underrun level)
+  if ((rli->mts_worker_underrun_level * worker->jobs.size) / 100.0 >
+      worker->jobs.len)
+  {
+    rli->mts_wq_underrun_w_id= worker->id;
+  } else if (rli->mts_wq_underrun_w_id == worker->id)
+  {
+    // reset only own marking
+    rli->mts_wq_underrun_w_id= MTS_WORKER_UNDEF;
+  }
+
+  // overrun is symmetric to underrun: the queue is overrun when its fill
+  // exceeds the (100 - underrun_level) percent mark
+  if (((100 - rli->mts_worker_underrun_level) * worker->jobs.size) / 100.0
+      < worker->jobs.len)
+  {
+    rli->mts_wq_excess++;
+    worker->wq_overrun_set= TRUE;
+    rli->mts_wq_overrun_cnt++;
+  }
+  else if (worker->wq_overrun_set == TRUE)
+  {
+    rli->mts_wq_excess--;
+    worker->wq_overrun_set= FALSE;
+  }
+
+  DBUG_ASSERT(rli->mts_wq_excess >= 0);
+
+  /* coordinator can be waiting */
+  if (rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max &&
+      rli->mts_wq_oversize)  // TODO: unit/general test wq_oversize
+  {
+    rli->mts_wq_oversize= FALSE;
+    mysql_cond_signal(&rli->pending_jobs_cond);
+  }
+  
+  mysql_mutex_unlock(&rli->pending_jobs_lock);
+
+  worker->stmt_jobs++;
+
+err:
+  if (error)
+  {
+    sql_print_information("Worker %lu is exiting: killed %i, error %i, "
+                          "running_status %d",
+                          worker->id, thd->killed, thd->is_error(),
+                          worker->running_status);
+    worker->slave_worker_ends_group(ev, error);
+  }
+  
+  // rows_query_log_event is deleted as a part of the statement cleanup
+
+  // todo: sync_slave_with_master fails when my_sleep(1000) is put here
+
+  if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
+  {
+    worker->last_event= ev;
+    delete ev;
+  }
+  
+
+  DBUG_RETURN(error);
+}
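
The `append_item_to_jobs()`/`pop_jobs_item()` handshake above — Coordinator blocks while the total pending payload would exceed `mts_pending_jobs_size_max`, and a Worker signals after freeing space — can be sketched with standard C++ primitives. All names and the cap value below are hypothetical, and the sketch deliberately omits the per-Worker queue, kill checks, naps, and statistics:

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

// Illustrative sketch of the Coordinator/Worker size-based backpressure.
class PendingJobs {
  std::mutex lock;
  std::condition_variable cond;
  std::queue<unsigned long> jobs;       // each item carries its event size
  unsigned long pending_size = 0;
  const unsigned long size_max;         // analogue of mts_pending_jobs_size_max

public:
  explicit PendingJobs(unsigned long cap) : size_max(cap) {}

  // Coordinator side: block while the new total would exceed the cap.
  void append(unsigned long ev_size) {
    std::unique_lock<std::mutex> guard(lock);
    cond.wait(guard, [&] { return pending_size + ev_size <= size_max; });
    jobs.push(ev_size);
    pending_size += ev_size;
    cond.notify_all();                  // wake an idle worker
  }

  // Worker side: take one job and release its size back to the budget.
  unsigned long pop() {
    std::unique_lock<std::mutex> guard(lock);
    cond.wait(guard, [&] { return !jobs.empty(); });
    unsigned long ev_size = jobs.front();
    jobs.pop();
    pending_size -= ev_size;
    cond.notify_all();                  // Coordinator may be waiting on size
    return ev_size;
  }

  unsigned long pending() {
    std::lock_guard<std::mutex> guard(lock);
    return pending_size;
  }
};
```

The real code uses two condition variables (`pending_jobs_cond` for the global size, `jobs_cond` per Worker queue); a single one suffices for the sketch.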

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2011-06-25 14:14:24 +0000
+++ b/sql/rpl_rli_pdb.h	2011-06-27 17:31:45 +0000
@@ -32,7 +32,7 @@ typedef struct st_db_worker_hash_entry
 
 bool init_hash_workers(ulong slave_parallel_workers);
 void destroy_hash_workers(Relay_log_info*);
-Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli,
+Slave_worker *map_db_to_worker(const char *dbname, Relay_log_info *rli,
                                db_worker_hash_entry **ptr_entry,
                                bool need_temp_tables, Slave_worker *w);
 Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
@@ -62,17 +62,17 @@ class circular_buffer_queue
 public:
 
   DYNAMIC_ARRAY Q;
-  ulong s;              // the Size of the queue in terms of element
-  ulong a;              // first Available index to append at (next to tail)
-  ulong e;              // the head index
-  volatile ulong len;   // it is also queried to compute least occupied
+  ulong size;           // the Size of the queue in terms of element
+  ulong avail;          // first Available index to append at (next to tail)
+  ulong enter;          // the head index
+  volatile ulong len;   // actual length
   bool inited_queue;
 
   circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
-    s(max), a(0), e(max), len(0), inited_queue(FALSE)
+    size(max), avail(0), enter(max), len(0), inited_queue(FALSE)
   {
-    DBUG_ASSERT(s < ULONG_MAX);
-    if (!my_init_dynamic_array(&Q, el_size, s, alloc_inc))
+    DBUG_ASSERT(size < ULONG_MAX);
+    if (!my_init_dynamic_array(&Q, el_size, size, alloc_inc))
       inited_queue= TRUE;
   }
   circular_buffer_queue () : inited_queue(FALSE) {}
@@ -111,9 +111,9 @@ public:
   bool   gt(ulong i, ulong k); // comparision of ordering of two entities
   /* index is within the valid range */
   bool in(ulong k) { return !empty() && 
-      (e > a ? (k >= e || k < a) : (k >= e && k < a)); }
-  bool empty() { return e == s; }
-  bool full() { return a == s; }
+      (enter > avail ? (k >= enter || k < avail) : (k >= enter && k < avail)); }
+  bool empty() { return enter == size; }
+  bool full() { return avail == size; }
 };
 
 typedef struct st_slave_job_group
@@ -237,26 +237,23 @@ public:
   DYNAMIC_ARRAY curr_group_exec_parts; // CGEP
 
   bool curr_group_seen_begin; // is set to TRUE with B-event at Worker exec
-  // @c last_group_done_index is for recovery, although can be viewed
-  //    as statistics as well.
-  // C marks a T-event with the incremented group_cnt that is
-  // an index in GAQ; W stores it at the event execution. 
-  // C polls the value periodically to maintain an array
-  // of the indexes in order to progress on GAQ's lwm, see @c next_event().
-  // see @c Log_event::group_cnt.
-  volatile ulong last_group_done_index; // it's index in GAQ
-
   List<Log_event> data_in_use; // events are still in use by SQL thread
   ulong id;
   TABLE *current_table;
 
   // rbr
-  RPL_TABLE_LIST *tables_to_lock;           /* RBR: Tables to lock  */
-  uint tables_to_lock_count;        /* RBR: Count of tables to lock */
-  table_mapping m_table_map;      /* RBR: Mapping table-id to table */
+  // RPL_TABLE_LIST *tables_to_lock;           /* RBR: Tables to lock  */
+  // uint tables_to_lock_count;        /* RBR: Count of tables to lock */
+  // table_mapping m_table_map;      /* RBR: Mapping table-id to table */
 
   // statictics
-  ulong wait_jobs;  // to gather statistics how many times got idle
+
+  /*
+    @c last_group_done_index is kept for statistics:
+    it holds the index in GAQ of the last processed group.
+  */
+  volatile ulong last_group_done_index; // it's index in GAQ
+  ulong wq_empty_waits;  // statistics: how many times the worker went idle
   ulong stmt_jobs;  // how many jobs per stmt
   ulong trans_jobs;  // how many jobs per trns
   volatile int curr_jobs; // the current assignments

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2011-06-27 12:12:52 +0000
+++ b/sql/rpl_slave.cc	2011-06-27 17:31:45 +0000
@@ -3830,7 +3830,7 @@ pthread_handler_t handle_slave_worker(vo
                         "events processed = %lu "
                         "hungry waits = %lu "
                         "priv queue overfills = %llu "
-                        ,w->id, w->stmt_jobs, w->wait_jobs, w->jobs.waited_overfill);
+                        ,w->id, w->stmt_jobs, w->wq_empty_waits, w->jobs.waited_overfill);
   mysql_cond_signal(&w->jobs_cond);  // famous last goodbye
 
   mysql_mutex_unlock(&w->jobs_lock);
@@ -4249,22 +4249,22 @@ int slave_start_single_worker(Relay_log_
   w->bitmap_shifted= 0;
   w->workers= rli->workers; // shallow copying is sufficient
   w->this_worker= w;
-  w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
+  w->wq_size_waits= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
   w->id= i;
   w->current_table= NULL;
   w->usage_partition= 0;
-  w->last_group_done_index= rli->gaq->s; // out of range
+  w->last_group_done_index= rli->gaq->size; // out of range
 
-  w->jobs.a= 0;
+  w->jobs.avail= 0;
   w->jobs.len= 0;
   w->jobs.overfill= FALSE;    //  todo: move into Slave_jobs_queue constructor
   w->jobs.waited_overfill= 0;
-  w->jobs.e= w->jobs.s= rli->mts_slave_worker_queue_len_max;
-  my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0);
-  for (k= 0; k < w->jobs.s; k++)
+  w->jobs.enter= w->jobs.size= rli->mts_slave_worker_queue_len_max;
+  my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.size, 0);
+  for (k= 0; k < w->jobs.size; k++)
     insert_dynamic(&w->jobs.Q, (uchar*) &empty);
   
-  DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
+  DBUG_ASSERT(w->jobs.Q.elements == w->jobs.size);
   
   w->wq_overrun_set= FALSE;
   set_dynamic(&rli->workers, (uchar*) &w, i);
@@ -4338,9 +4338,10 @@ int slave_start_workers(Relay_log_info *
 
   rli->mts_pending_jobs_size= 0;
   rli->mts_pending_jobs_size_max= ::opt_mts_pending_jobs_size_max;
-  rli->mts_wqs_underrun_w_id= (ulong) -1;
-  rli->mts_wqs_overrun= 0;
-  rli->mts_wqs_oversize= FALSE;
+  rli->mts_wq_underrun_w_id= MTS_WORKER_UNDEF;
+  rli->mts_wq_excess= 0;
+  rli->mts_wq_overrun_cnt= 0;
+  rli->mts_wq_oversize= FALSE;
   rli->mts_coordinator_basic_nap= mts_coordinator_basic_nap;
   rli->mts_worker_underrun_level= mts_worker_underrun_level;
   rli->mts_total_groups= 0;
@@ -4470,7 +4471,7 @@ void slave_stop_workers(Relay_log_info *
 
     w->end_info();
 
-    DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
+    DBUG_ASSERT(w->jobs.Q.elements == w->jobs.size);
     delete_dynamic(&w->jobs.Q);
     delete_dynamic_element(&rli->workers, i);
     delete w;
@@ -4478,10 +4479,13 @@ void slave_stop_workers(Relay_log_info *
 
   sql_print_information("MTS coordinator statistics: "
                         "events processed = %lu "
-                        "waits due a Worker queue full = %lu "
-                        "waits due the total size = %lu "
-                        "sleeps when Workers occupied = %lu "
-                        ,rli->stmt_jobs, rli->mts_wqs_overfill_cnt, rli->wait_jobs, rli->mts_wqs_underrun_cnt);
+                        "Worker queues filled over overrun level = %lu "
+                        "waits due to a full Worker queue = %lu "
+                        "waits due to the total size = %lu "
+                        "slept when Workers occupied = %lu ",
+                        rli->stmt_jobs, rli->mts_wq_overrun_cnt,
+                        rli->mts_wq_overfill_cnt, rli->wq_size_waits,
+                        rli->mts_wq_no_underrun_cnt);
 
   DBUG_ASSERT(rli->pending_jobs == 0);
   DBUG_ASSERT(rli->mts_pending_jobs_size == 0);
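
The symmetric under-/overrun watermarks feeding the new `mts_wq_overrun_cnt`/`mts_wq_excess` counters reduce to two comparisons. The sketch below is an assumption-level restatement of the checks in `slave_worker_exec_job()`, not server code; names and the level value are illustrative:

```cpp
#include <cassert>

// A Worker queue is "underrun" when it holds less than level% of its
// capacity, and "overrun" when it holds more than (100 - level)% of it.
static bool wq_underrun(unsigned long level, unsigned long size,
                        unsigned long len) {
  return (level * size) / 100.0 > len;
}

static bool wq_overrun(unsigned long level, unsigned long size,
                       unsigned long len) {
  return ((100 - level) * size) / 100.0 < len;
}
```

An underrunning Worker suppresses the Coordinator's nap, while each overrun bumps the excess counter that lengthens it.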

=== modified file 'sql/rpl_slave.h'
--- a/sql/rpl_slave.h	2011-06-09 15:27:47 +0000
+++ b/sql/rpl_slave.h	2011-06-27 17:31:45 +0000
@@ -55,6 +55,9 @@ typedef enum { SLAVE_THD_IO, SLAVE_THD_S
 
 #define MAX_SLAVE_ERROR    2000
 
+#define MTS_WORKER_UNDEF ((ulong) -1)
+#define MTS_MAX_WORKERS  1024
+
 // Forward declarations
 class Relay_log_info;
 class Master_info;
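
The `compile_time_assert(MTS_WORKER_UNDEF > MTS_MAX_WORKERS)` guard added in `move_queue_head()` can be mirrored with standard C++ `static_assert`; the sketch below restates the two new macros purely for illustration:

```cpp
#include <cassert>

// The undefined-worker sentinel must stay outside the valid worker-id
// domain [0, MTS_MAX_WORKERS]; a static_assert (the standard analogue of
// the server's compile_time_assert) catches accidental overlap at build time.
#define MTS_WORKER_UNDEF ((unsigned long) -1)
#define MTS_MAX_WORKERS  1024

static_assert(MTS_WORKER_UNDEF > MTS_MAX_WORKERS,
              "the undefined-worker sentinel must not be a valid worker id");
```

Because the sentinel is `(ulong) -1`, the guard holds on any platform where `ulong` is at least 16 bits wide.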

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2011-06-25 14:14:24 +0000
+++ b/sql/sys_vars.cc	2011-06-27 17:31:45 +0000
@@ -3361,7 +3361,7 @@ static Sys_var_ulong Sys_slave_parallel_
        "slave_parallel_workers",
        "Number of worker threads for executing events in parallel ",
        GLOBAL_VAR(opt_mts_slave_parallel_workers), CMD_LINE(REQUIRED_ARG),
-       VALID_RANGE(0, 1024), DEFAULT(0), BLOCK_SIZE(1));
+       VALID_RANGE(0, MTS_MAX_WORKERS), DEFAULT(0), BLOCK_SIZE(1));
 
 static Sys_var_ulonglong Sys_mts_pending_jobs_size_max(
        "slave_pending_jobs_size_max",

