List: Commits
From: Andrei Elkin
Date: May 30 2011 9:38am
Subject: bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3286) WL#5569
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3286 Andrei Elkin	2011-05-30
      wl#5569 MTS
      
      An intermediate patch to address a few issues raised by reviewers.
      To sum up, it's about cleanup and logic simplification of event distribution
      to Workers and the subsequent actions.
      Some effort was made to support the old-master BEGIN-less group of events.
     @ sql/log_event.cc
        Cleanup and logic simplification in Log_event::get_slave_worker_id() and
        Log_event::apply_event().
        The essence is:
        a. to return to apply_event_and_update_pos() an event
           associated either with the single-threaded
           sql-thread rli, or with the Coordinator or a Worker;
        b. while the beginning of a group and the corresponding actions are left
           to Log_event::get_slave_worker_id(),
           other actions, including passing the event to a Worker and the final
           closure of the current group, are moved into apply_event_and_update_pos().
        
        Correcting Query_log_event::attach_temp_tables() and ::detach_temp_tables()
        to expect the magic empty-string-name ("") db partition through which the
        applier thread receives temp tables.
     @ sql/log_event.h
        More members are added to Log_event:
        a. to associate the event with its applier;
        b. to provide marking of a B-less (BEGIN-less) group of events (old master, select sf()).
     @ sql/rpl_rli.h
        Removing rli members that are no longer necessary.
     @ sql/rpl_rli_pdb.cc
        Cleanup, a new assert, and initialization of a debug-related member.
     @ sql/rpl_rli_pdb.h
        Memorizing the last deleted event for debugging purposes.
     @ sql/rpl_slave.cc
        Cleanup, and
        moving the append_item_to_jobs() invocation into apply_event_and_update_pos(),
        as well as other actions mentioned in the log_event.cc comments;
        changing the signature of apply_event_and_update_pos() so that it sets the
        referenced event pointer to NULL in case the event is handed over to a Worker;
        the pointer value is checked in the places dealing with update-pos and the
        event's destruction (see the sketch after these notes).

    modified:
      sql/log_event.cc
      sql/log_event.h
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-05-28 10:42:40 +0000
+++ b/sql/log_event.cc	2011-05-30 09:38:24 +0000
@@ -2368,11 +2368,10 @@ bool Log_event::contains_partition_info(
 /**
    General hashing function to compute the id of an applier for
    the current event.
-   The event is marked as belonging to a Worker.
    At computing the id few rules apply depending on partitioning properties
    that the event instance can feature.
 
-   Let's call the properties.
+   Let's name the properties using the following legend:
 
    B - beginning of a group of events (BEGIN query_log_event)
    g - mini-group representative event containing the partition info
@@ -2380,34 +2379,37 @@ bool Log_event::contains_partition_info(
    p - a mini-group internal event that *p*receeding its g-parent
       (int_, rand_, user_ var:s) 
    r - a mini-group internal "regular" event that follows its g-parent
-      (Write, Update, Delete -rows)
+      (Delete, Update, Write -rows)
    S - sequentially applied event (may not be a part of any group).
        Events of this type are determined via @c mts_sequential_exec()
        earlier and don't cause calling this method .
    T - terminator of the group (XID, COMMIT, ROLLBACK)
 
-   Only `g' case requires to compute the assigned Worker id.
-   In `T, r' cases it's @c last_assigned_worker that is one that was
-   assigned at the last `g' processing.
-   In `B' case it's NULL to indicate the Coordinator will skip doing anything
-   more with the event. Its scheduling gets deffered until the following 
-   `g' event names a Worker.
+   Only the `g' case actually computes the assigned Worker id, which must
+   be memorized by the caller and is available through the @c rli argument.
+   For instance, DUW-rows events are mapped to the Worker previously chosen
+   when their Table-map parent g-event was assigned.
+   In the `B' case the assigned Worker is NULL, to indicate the Coordinator
+   will postpone scheduling until a following `g' event decides on a Worker.
    
-   @note `p' and g-Query-log-event is not supported yet.
-
-   @note The function can update APH, CGAP objects.
+   A group can consist of multiple events yet have no explicit B
+   event.  This is the case with old master binlogs and a few corner
+   cases of the current master version (todo: to fix).  Such a group is
+   assumed to have the structure {{p_i},g}, i.e. it ends with the first
+   non-p event.  That g-event is marked with set_mts_event_ends_group().
 
+   @note The function can update APH, CGAP, GAQ objects.
+   
    @return a pointer to the Worker stuct or NULL.
 */
 
 Slave_worker *Log_event::get_slave_worker_id(Relay_log_info *rli)
 {
-  Slave_worker *worker= NULL;
   Slave_job_group g, *ptr_g;
   bool is_b_event;
+  int  num_dbs= 0;
+  Slave_worker *ret_worker= NULL;
 
-  rli->curr_event_is_parallel= TRUE; // event belongs to a Worker
-  
   /* checking partioning properties and perform corresponding actions */
 
   // Beginning of a group designated explicitly with BEGIN
@@ -2457,7 +2459,8 @@ Slave_worker *Log_event::get_slave_worke
 
       // mark the current grup as started with B-event
       rli->curr_group_seen_begin= TRUE;
-      return NULL;
+
+      return ret_worker;
     } 
   }
 
@@ -2466,27 +2469,24 @@ Slave_worker *Log_event::get_slave_worke
   if (contains_partition_info())
   {
     int i= 0;
-    int num_dbs= mts_number_dbs();
+    num_dbs= mts_number_dbs();
     List_iterator<char> it(*mts_get_dbs(rli->info_thd->mem_root));
     it++;
 
     if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
     {
-      // Temporary tables of Coordinator are relocated by Worker
+      // provide a hint - Worker with id 0 - to the following assignment
       if (!rli->last_assigned_worker)
         rli->last_assigned_worker=
           *(Slave_worker**) dynamic_array_ptr(&rli->workers, 0);
-
-      if (!rli->curr_group_isolated)
-        (void) wait_for_workers_to_finish(rli, rli->last_assigned_worker);
-      rli->curr_group_isolated= TRUE;
+      (void) wait_for_workers_to_finish(rli, rli->last_assigned_worker);
     }
 
     do
     {
       char **ref_cur_db= it.ref();
       
-      if (!(rli->last_assigned_worker=
+      if (!(ret_worker=
             get_slave_worker(*ref_cur_db, rli,
                              &mts_assigned_partitions[i],
                              get_type_code() == QUERY_EVENT)))
@@ -2496,24 +2496,23 @@ Slave_worker *Log_event::get_slave_worke
         { 
           delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
         }
-        return NULL;
+
+        return ret_worker;
       }
 
       DBUG_ASSERT(!strcmp(mts_assigned_partitions[i]->db, *ref_cur_db));
-      DBUG_ASSERT(rli->last_assigned_worker ==
-                  mts_assigned_partitions[i]->worker);
+      DBUG_ASSERT(ret_worker == mts_assigned_partitions[i]->worker);
       DBUG_ASSERT(mts_assigned_partitions[i]->usage > 0);
 
       i++;
     } while (it++);
 
-    worker= rli->last_assigned_worker;
     if ((ptr_g= ((Slave_job_group *)
                  dynamic_array_ptr(&rli->gaq->Q,
-                                   rli->gaq->assigned_group_index)))-> worker_id
+                                   rli->gaq->assigned_group_index)))->worker_id
         == (ulong) -1)
     {
-      ptr_g->worker_id= worker->id;
+      ptr_g->worker_id= ret_worker->id;
       
       DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
     }
@@ -2533,9 +2532,10 @@ Slave_worker *Log_event::get_slave_worke
     // a mini-group internal "regular" event
     if (rli->last_assigned_worker)
     {
-      worker= rli->last_assigned_worker;
+      ret_worker= rli->last_assigned_worker;
       
-      DBUG_ASSERT(rli->curr_group_assigned_parts.elements > 0 || worker->id == 0);
+      DBUG_ASSERT(rli->curr_group_assigned_parts.elements > 0 ||
+                  ret_worker->id == 0);
     }
     else // int_, rand_, user_ var:s
     {
@@ -2564,24 +2564,31 @@ Slave_worker *Log_event::get_slave_worke
       {
         DBUG_ASSERT(rli->curr_group_da.elements > 1);
       }
-      return NULL;
+
+      DBUG_ASSERT(!ret_worker);
+
+      return ret_worker;
     }
   }
 
   // the group terminal event (Commit, Xid or a DDL query)
   if (ends_group() || !rli->curr_group_seen_begin)
   {
-    uint i;
-
     // index of GAQ that this terminal event belongs to
     mts_group_cnt= rli->gaq->assigned_group_index;
 
+    if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
+      set_mts_event_ends_group();
+
     ptr_g= (Slave_job_group *)
       dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
 
-    // TODO: throw an error when relay-log reading starts from inside of a group!!
+    DBUG_ASSERT(ret_worker != NULL);
+    
+    // TODO: UNTIL option, throw an error when relay-log reading
+    // starts from inside of a group!!
 
-    if (!worker->relay_log_change_notified)
+    if (!ret_worker->relay_log_change_notified)
     {
       /*
         Prior this event, C rotated the relay log to drop each
@@ -2599,10 +2606,10 @@ Slave_worker *Log_event::get_slave_worke
 
       DBUG_ASSERT(ptr_g->group_relay_log_name != NULL);
 
-      worker->relay_log_change_notified= TRUE;
+      ret_worker->relay_log_change_notified= TRUE;
     }
 
-    if (!worker->checkpoint_notified)
+    if (!ret_worker->checkpoint_notified)
     {
       // Worker to dealloc
       // master binlog checkpoint
@@ -2619,24 +2626,13 @@ Slave_worker *Log_event::get_slave_worke
       strcpy(ptr_g->checkpoint_relay_log_name,
              rli->get_group_relay_log_name());
       ptr_g->checkpoint_relay_log_pos= rli->get_group_relay_log_pos();
-      worker->checkpoint_notified= TRUE;
+      ret_worker->checkpoint_notified= TRUE;
     }
     ptr_g->checkpoint_seqno= rli->checkpoint_seqno;
     rli->checkpoint_seqno++;
-
-    DBUG_ASSERT(worker != NULL && worker == rli->last_assigned_worker);
-    
-    // CGAP cleanup
-    for (i= rli->curr_group_assigned_parts.elements; i > 0; i--)
-      delete_dynamic_element(&rli->
-                             curr_group_assigned_parts, i - 1);
-    rli->last_assigned_worker= NULL;
-
-    // reset the B-group marker
-    rli->curr_group_seen_begin= FALSE;
   }
   
-  return worker;
+  return ret_worker;
 }
 
 // returns the next available! (TODO: incompatible to circurla_buff method!!!)
@@ -2813,16 +2809,22 @@ void append_item_to_jobs(slave_job_item 
 }
 
 /**
-   scheduling event execution either serially or in parallel
+   Schedules the event for parallel execution or executes it directly.
+   In the MTS case the event gets associated with either the Coordinator
+   or a Worker.  A special case of the association is NULL, when the
+   Worker can't be decided yet.  In the single-threaded sequential mode
+   the event maps to the SQL thread's rli.
+
+   @return 0 on success, non-zero on failure.
 */
 int Log_event::apply_event(Relay_log_info const *rli)
 {
-  uint i;
   DBUG_ENTER("LOG_EVENT:apply_event");
   Slave_worker *w= NULL;
-  Slave_job_item item= {NULL}, *job_item= &item;
   Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli);  // constless alias
-  bool parallel= FALSE, async_event= FALSE, seq_event= FALSE, term_event= FALSE;
+  bool parallel= FALSE, async_event= FALSE, seq_event= FALSE;
+
+  worker= c_rli;
 
   if (rli->is_mts_recovery())
   {
@@ -2849,7 +2851,6 @@ int Log_event::apply_event(Relay_log_inf
   {
     if (parallel)
     {
-      c_rli->curr_event_is_parallel= FALSE; // mark event as belonging to Coordinator
       /* 
          There are two classes of events that Coordinator executes
          itself. One e.g the master Rotate requires all Workers to finish up 
@@ -2910,51 +2911,12 @@ int Log_event::apply_event(Relay_log_inf
   DBUG_ASSERT(!(rli->curr_group_seen_begin && ends_group()) ||
               rli->last_assigned_worker);
 
-  // getting Worker's id
-  if ((!(w= get_slave_worker_id(c_rli)) ||
-       DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)))
+  worker= NULL;
+  c_rli->last_assigned_worker= w= get_slave_worker_id(c_rli);
+  if (!w || DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0))
     DBUG_RETURN(rli->curr_group_assigned_parts.elements == 0 ? FALSE : TRUE);
 
-  job_item->data= this;
-
-  DBUG_PRINT("Log_event::apply_event:", ("-> job item data %p to W_%lu", job_item->data, w->id));
-
-  if (rli->curr_group_da.elements > 0)
-  {
-    /*
-      the current event sorted out which partion the current group belongs to.
-      It's time now to processed deferred array events.
-    */
-    for (i= 0; i < rli->curr_group_da.elements; i++)
-    { 
-      Slave_job_item da_item;
-      get_dynamic(&c_rli->curr_group_da, (uchar*) &da_item.data, i);
-      append_item_to_jobs(&da_item, w, c_rli);
-    }
-    if (rli->curr_group_da.elements > rli->curr_group_da.max_element)
-    {
-      // reallocate to less mem
-      
-      DBUG_ASSERT(rli->curr_group_da.max_element < rli->curr_group_da.elements);
-      
-      c_rli->curr_group_da.elements= rli->curr_group_da.max_element;
-      c_rli->curr_group_da.max_element= 0;
-      freeze_size(&c_rli->curr_group_da); // restores max_element
-    }
-    c_rli->curr_group_da.elements= 0;
-  }
-
-  if (c_rli->curr_group_isolated)
-    term_event= ends_group();
-
-  append_item_to_jobs(job_item, w, c_rli);
-
-  if (c_rli->curr_group_isolated && term_event)
-  {
-    // to make sure the isolated group terminates in isolation as well
-    (void) wait_for_workers_to_finish(rli, w);
-    c_rli->curr_group_isolated= FALSE;
-  }
+  worker= (Relay_log_info*) w;
 
   DBUG_RETURN(FALSE);
 }
@@ -3145,8 +3107,15 @@ err:
     w->slave_worker_ends_group(ev, error);
   
   // rows_query_log_event is deleted as a part of the statement cleanup
+
+  // todo: sync_slave_with_master fails when my_sleep(1000) is put here
+
   if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
+  {
+    w->last_event= ev;
     delete ev;
+  }
+  
 
   DBUG_RETURN(error);
 }
@@ -3768,7 +3737,7 @@ Query_log_event::Query_log_event(const c
    auto_increment_increment(1), auto_increment_offset(1),
    time_zone_len(0), lc_time_names_number(0), charset_database_number(0),
    table_map_for_update(0), master_data_written(0),
-   mts_accessed_dbs(OVER_MAX_DBS_IN_EVENT_MTS)
+   mts_accessed_dbs(OVER_MAX_DBS_IN_EVENT_MTS), m_mts_query_ends_group(FALSE)
 {
   ulong data_len;
   uint32 tmp;
@@ -4241,22 +4210,17 @@ void Query_log_event::attach_temp_tables
   if (!mts_is_worker(thd) || !contains_partition_info())
     return;
   
+  // in the over max-db:s case just one special partition is locked
+  int parts= ((mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS) ?
+              1 : mts_accessed_dbs);
+
   DBUG_ASSERT(!thd->temporary_tables);
 
-  if (mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
-  {
-    THD *c_thd= mts_get_coordinator_thd();
-    mts_move_temp_tables_to_thd(thd, c_thd->temporary_tables);
-    c_thd->temporary_tables= NULL;
-  }
-  else
+  for (int i= 0; i < parts; i++)
   {
-    for (int i= 0; i < mts_accessed_dbs; i++)
-    {
-      mts_move_temp_tables_to_thd(thd,
-                                  mts_assigned_partitions[i]->temporary_tables);
-      mts_assigned_partitions[i]->temporary_tables= NULL;
-    }
+    mts_move_temp_tables_to_thd(thd,
+                                mts_assigned_partitions[i]->temporary_tables);
+    mts_assigned_partitions[i]->temporary_tables= NULL;
   }
 }
 
@@ -4272,15 +4236,8 @@ void Query_log_event::detach_temp_tables
   if (!mts_is_worker(thd))
     return;
 
-  if (mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
-  {
-    THD *c_thd= mts_get_coordinator_thd();
-    /* back to coordinator */
-    mts_move_temp_tables_to_thd(c_thd, thd->temporary_tables);
-    thd->temporary_tables=  NULL;
-    return;
-  }
-
+  int parts= ((mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS) ?
+              1 : mts_accessed_dbs);
   /*
     todo: optimize for a case of 
 
@@ -4293,7 +4250,7 @@ void Query_log_event::detach_temp_tables
        unmodified lists provided that the attach_ method does not
        destroy references to them.
   */
-  for (int i= 0; i < mts_accessed_dbs; i++)
+  for (int i= 0; i < parts; i++)
   {
     mts_assigned_partitions[i]->temporary_tables= NULL;
   }
@@ -4303,7 +4260,7 @@ void Query_log_event::detach_temp_tables
     int i;
 
     // find which entry to go
-    for (i= 0; i <  mts_accessed_dbs; i++)
+    for (i= 0; i < parts; i++)
       if (strcmp(table->s->db.str, mts_accessed_db_names[i]) < 0)
         continue;
       else
@@ -4317,7 +4274,7 @@ void Query_log_event::detach_temp_tables
 
   DBUG_ASSERT(!thd->temporary_tables);
 #ifndef DBUG_OFF
-  for (int i= 0; i < mts_accessed_dbs; i++)
+  for (int i= 0; i < parts; i++)
   {
     DBUG_ASSERT(!mts_assigned_partitions[i]->temporary_tables ||
                 !mts_assigned_partitions[i]->temporary_tables->prev);

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2011-05-27 21:29:14 +0000
+++ b/sql/log_event.h	2011-05-30 09:38:24 +0000
@@ -1038,6 +1038,10 @@ public:
     the event execution. The indexed data represent the Worker progress status.
   */
   ulong mts_group_cnt;
+  /**
+     MTS: associates the event with either an assigned Worker or the Coordinator.
+  */
+  Relay_log_info *worker;
 
   /* a copy of the main rli value stored into event to pass to MTS worker rli */
   ulonglong future_event_relay_log_pos;
@@ -1113,6 +1117,9 @@ public:
   */
   virtual uint8 mts_number_dbs() { return 1; }
 
+  virtual void set_mts_event_ends_group() { DBUG_ASSERT(0); }
+  virtual bool get_mts_event_ends_group() { DBUG_ASSERT(0); }
+
 #else
   Log_event() : temp_buf(0) {}
     /* avoid having to link mysqlbinlog against libpthread */
@@ -1886,6 +1893,13 @@ public:
   uchar mts_accessed_dbs;
   char mts_accessed_db_names[MAX_DBS_IN_EVENT_MTS][NAME_LEN];
 
+  /*
+    An event can be identified as a group terminator, and that fact
+    is memorized by this function.
+  */
+  virtual void set_mts_event_ends_group() { m_mts_query_ends_group= TRUE; }
+  virtual bool get_mts_event_ends_group() { return m_mts_query_ends_group; }
+
 #ifdef MYSQL_SERVER
 
   Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length,
@@ -1988,13 +2002,16 @@ public:        /* !!! Public in this pat
      its self-group.
   */
   bool starts_group() { return !strncmp(query, "BEGIN", q_len); }
-  bool ends_group()
+  virtual bool ends_group()
   {  
     return
       !strncmp(query, "COMMIT", q_len) ||
       (!strncasecmp(query, STRING_WITH_LEN("ROLLBACK"))
        && strncasecmp(query, STRING_WITH_LEN("ROLLBACK TO ")));
   }
+private:
+  
+  bool m_mts_query_ends_group;
 };
 
 
@@ -2662,7 +2679,7 @@ class Xid_log_event: public Log_event
   bool write(IO_CACHE* file);
 #endif
   bool is_valid() const { return 1; }
-  bool ends_group() { return TRUE; }
+  virtual bool ends_group() { return TRUE; }
 private:
 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
   virtual int do_apply_event(Relay_log_info const *rli);

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2011-05-28 10:42:40 +0000
+++ b/sql/rpl_rli.h	2011-05-30 09:38:24 +0000
@@ -445,7 +445,6 @@ public:
   DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
   DYNAMIC_ARRAY curr_group_da;  // deferred array to hold partition-info-free events
   bool curr_group_seen_begin;   // current group started with B-event or not
-  bool curr_group_isolated;     // Trans is exec:d by Worker but in exclusive env
   volatile ulong mts_wqs_underrun_w_id;  // Id of a Worker whose queue is getting empty
   volatile long mts_wqs_overrun;   // W to incr and decr
   ulong mts_wqs_underrun_cnt;  // Coord goes to sleep when senses Workers are content
@@ -454,8 +453,6 @@ public:
   ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun
   Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
   ulonglong mts_total_groups; // total event groups distributed in current session
- 
-  bool curr_event_is_parallel; // if FALSE the current event is processed by C
   ulong opt_slave_parallel_workers; // auxiliary cache for ::opt_slave_parallel_workers
   ulong slave_parallel_workers;     // the one slave session time number of workers
   ulong recovery_parallel_workers; // number of workers while recovering.

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2011-05-28 08:19:43 +0000
+++ b/sql/rpl_rli_pdb.cc	2011-05-30 09:38:24 +0000
@@ -31,7 +31,7 @@ Slave_worker::Slave_worker(const char* t
                            Relay_log_info *rli)
   : Relay_log_info(FALSE), c_rli(rli),
     checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0),
-    inited_group_execed(0), running_status(FALSE)
+    inited_group_execed(0), running_status(FALSE), last_event(NULL)
 {
   checkpoint_relay_log_name[0]= 0;
   checkpoint_master_log_name[0]= 0;
@@ -462,6 +462,7 @@ static void move_temp_tables_to_entry(TH
 
    @note modifies  CGAP, APH and unlinks @c dbname -keyd temp tables 
          from C's thd->temporary_tables to move them into the entry record.
+         The caller can opt for a specific Worker by setting rli->last_assigned_worker.
 
    @return the pointer to a Worker struct
 */
@@ -649,6 +650,9 @@ Slave_worker *get_slave_worker(const cha
 #ifndef DBUG_OFF      
     else
     {
+      // all entries must have been emptied from temps by the caller
+      DBUG_ASSERT(entry->db_len != 0);
+
       for (TABLE *table= thd->temporary_tables; table; table= table->next)
       {
         DBUG_ASSERT(0 != strcmp(table->s->db.str, entry->db));

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2011-05-28 10:42:40 +0000
+++ b/sql/rpl_rli_pdb.h	2011-05-30 09:38:24 +0000
@@ -280,6 +280,7 @@ public:
   MY_BITMAP group_execed;
   bool inited_group_execed;
   volatile bool  running_status; // TRUE when Worker is read-exec loop
+  Log_event *last_event;
 
   int init_info();
   void end_info();

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2011-05-28 10:42:40 +0000
+++ b/sql/rpl_slave.cc	2011-05-30 09:38:24 +0000
@@ -80,6 +80,9 @@ ulonglong relay_log_space_limit = 0;
 const char *relay_log_index= 0;
 const char *relay_log_basename= 0;
 
+void append_item_to_jobs(slave_job_item *job_item,
+                         Slave_worker *w, Relay_log_info *rli);
+
 /*
   When slave thread exits, we need to remember the temporary tables so we
   can re-use them on slave start.
@@ -2748,7 +2751,7 @@ int ulong_cmp(ulong *id1, ulong *id2)
 
    - Reports errors as needed.
 
-  @param ev The event to apply.
+  @param ptr_ev a pointer to the pointer to the event to apply.
 
   @param thd The client thread that executes the event (i.e., the
   slave sql thread if called from a replication slave, or the client
@@ -2758,6 +2761,9 @@ int ulong_cmp(ulong *id1, ulong *id2)
   a replication slave, or the client's thd->rli_fake if called to
   execute a BINLOG statement).
 
+  @note MTS can store NULL into the @c ptr_ev location to indicate
+        that the event has been taken over by a Worker.
+
   @retval 0 OK.
 
   @retval 1 Error calling ev->apply_event().
@@ -2765,10 +2771,11 @@ int ulong_cmp(ulong *id1, ulong *id2)
   @retval 2 No error calling ev->apply_event(), but error calling
   ev->update_pos().
 */
-int apply_event_and_update_pos(Log_event* ev, THD* thd, Relay_log_info* rli)
+int apply_event_and_update_pos(Log_event** ptr_ev, THD* thd, Relay_log_info* rli)
 {
   int exec_res= 0;
   bool skip_event= FALSE;
+  Log_event *ev= *ptr_ev;
 
   DBUG_ENTER("apply_event_and_update_pos");
 
@@ -2823,6 +2830,75 @@ int apply_event_and_update_pos(Log_event
     if (sql_delay_event(ev, thd, rli))
       DBUG_RETURN(0);
     exec_res= ev->apply_event(rli);
+
+    if (!exec_res && (ev->worker != rli))
+    {
+      if (ev->worker)
+      {
+        Slave_job_item item= {ev}, *job_item= &item;
+        Slave_worker *w= (Slave_worker *) ev->worker;
+        bool need_sync=
+          (ev->mts_number_dbs() == OVER_MAX_DBS_IN_EVENT_MTS) &&
+          ev->get_mts_event_ends_group();
+
+        DBUG_ASSERT(!(ev->ends_group() || !rli->curr_group_seen_begin) ||
+                    ((Slave_worker*) ev->worker) == rli->last_assigned_worker);
+
+        DBUG_PRINT("Log_event::apply_event:", ("-> job item data %p to W_%lu", job_item->data, w->id));
+
+        // Reset mts in-group state
+        if (ev->ends_group() || !rli->curr_group_seen_begin)
+        {
+          // CGAP cleanup
+          for (uint i= rli->curr_group_assigned_parts.elements; i > 0; i--)
+            delete_dynamic_element(&rli->
+                                   curr_group_assigned_parts, i - 1);
+          // reset the B-group marker
+          rli->curr_group_seen_begin= FALSE;
+          rli->last_assigned_worker= NULL;
+        }
+
+        if (rli->curr_group_da.elements > 0)
+        {
+          /*
+            the current event sorted out which partition the current group belongs to.
+            It's time now to process the deferred array of events.
+          */
+          for (uint i= 0; i < rli->curr_group_da.elements; i++)
+          { 
+            Slave_job_item da_item;
+            get_dynamic(&rli->curr_group_da, (uchar*) &da_item.data, i);
+            append_item_to_jobs(&da_item, w, rli);
+          }
+          if (rli->curr_group_da.elements > rli->curr_group_da.max_element)
+          {
+            // reallocate to less mem
+            
+            DBUG_ASSERT(rli->curr_group_da.max_element < rli->curr_group_da.elements);
+          
+            rli->curr_group_da.elements= rli->curr_group_da.max_element;
+            rli->curr_group_da.max_element= 0;
+            freeze_size(&rli->curr_group_da); // restores max_element
+          }
+          rli->curr_group_da.elements= 0;
+        }
+
+        //job_item->data= ev;
+        /* Notice the `ev' instance can be destroyed after `append()' */
+        append_item_to_jobs(job_item, w, rli);
+
+        if (need_sync)
+        {
+          /*
+            the combination of over-max db:s and the end of the current group
+            forces waiting for the group's completion by the assigned Worker.
+          */
+          (void) wait_for_workers_to_finish(rli, w);
+        }
+
+      }
+      *ptr_ev= NULL; // announce that the event has been passed to a Worker
+    }
   }
   else
     mysql_mutex_unlock(&rli->data_lock);
@@ -2842,7 +2918,9 @@ int apply_event_and_update_pos(Log_event
       See sql/rpl_rli.h for further details.
     */
     int error= 0;
-    if (skip_event || !rli->is_parallel_exec() || !rli->curr_event_is_parallel)
+    if (*ptr_ev &&
+        (!(ev->get_type_code() == XID_EVENT && rli->is_transactional()) ||
+         skip_event))
     {
 #ifndef DBUG_OFF
       /*
@@ -2958,7 +3036,7 @@ static int exec_relay_log_event(THD* thd
    */
   mysql_mutex_lock(&rli->data_lock);
 
-  Log_event * ev = next_event(rli);
+  Log_event *ev = next_event(rli), **ptr_ev= &ev;
 
   DBUG_ASSERT(rli->info_thd==thd);
 
@@ -3032,12 +3110,13 @@ static int exec_relay_log_event(THD* thd
                       };);
     }
 
-    exec_res= apply_event_and_update_pos(ev, thd, rli);
+    /* ptr_ev can change to NULL indicating the MTS Coordinator passed the event to a Worker */
+    exec_res= apply_event_and_update_pos(ptr_ev, thd, rli);
 
-    if ((!rli->is_parallel_exec() || !rli->curr_event_is_parallel))
+    if (*ptr_ev)
     {
-      DBUG_ASSERT(!rli->is_parallel_exec() || !rli->curr_event_is_parallel ||
-                  ev->shall_skip(rli) != Log_event::EVENT_SKIP_NOT);
+      DBUG_ASSERT(*ptr_ev == ev); // the event still belongs to the Coordinator
+
       /*
         Format_description_log_event should not be deleted because it will be
         used to read info about the relay log's format; it will be deleted when
@@ -4176,8 +4255,6 @@ int slave_start_workers(Relay_log_info *
   rli->mts_worker_underrun_level= ::opt_mts_worker_underrun_level;
   rli->mts_total_groups= 0;
   rli->curr_group_seen_begin= FALSE;
-  rli->curr_event_is_parallel= FALSE;
-  rli->curr_group_isolated= FALSE;
   rli->checkpoint_seqno= 0;
   /*
     dyn memory to consume by Coordinator per event


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110530093824-okvolsssp7rb22qd.bundle