List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:December 5 2010 8:04pm
Subject:bzr push into mysql-next-mr-wl5569 branch (andrei.elkin:3228 to 3230)
WL#5569 WL#5599
View as plain text  
 3230 Andrei Elkin	2010-12-05
      wl#5569 WL#5599 MTS & recovery
      
      Refining and correcting two wl:s integration.
      The main achievement is events execution status is consistently recorded into 
      the Worker and the central RL recovery tables.
      That was tested manually in rather agressive env where IO was used to reconnect
      randomly and load from Master contained Rotate events.
      
      TODO: 
      
        to fix: rpl.rpl_parallel_conf_limits may not pass
      
        to address: Multi-stmt Query-log-event transaction case (see todo in sources).
        to destruct by Workers their executed events (was deferred until ev->update_pos
        started working).
      
      (Alfranio)
        to deploy mts_checkpoint_routine() call inside the successful event read branch of 
           next_event(). Otherwise no calling happens when Coord is constanly busy with
           read/distribute.

    modified:
      sql/log_event.cc
      sql/rpl_slave.cc
 3229 Andrei Elkin	2010-12-04
      merging from the repo wl5569

    modified:
      mysql-test/extra/rpl_tests/rpl_parallel_load.test
      sql/log_event.cc
      sql/rpl_rli.cc
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/share/errmsg-utf8.txt
 3228 Alfranio Correia	2010-12-04
      Added mutex to the checkpoint_routine.

    modified:
      sql/rpl_slave.cc
=== modified file 'mysql-test/extra/rpl_tests/rpl_parallel_load.test'
--- a/mysql-test/extra/rpl_tests/rpl_parallel_load.test	2010-12-02 17:46:46 +0000
+++ b/mysql-test/extra/rpl_tests/rpl_parallel_load.test	2010-12-04 17:14:50 +0000
@@ -91,6 +91,11 @@ while($i)
 
 # Exec log position is not accurate in the prototype
 --sleep 2
+--disable_query_log
+--disable_result_log
+###select sleep(300);
+--enable_result_log
+--enable_query_log
 
 sync_slave_with_master;
 

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-12-02 18:13:12 +0000
+++ b/sql/log_event.cc	2010-12-05 20:04:17 +0000
@@ -2211,6 +2211,7 @@ Slave_worker *Log_event::get_slave_worke
   if ((is_b_event= starts_group()) || !rli->curr_group_seen_begin)
   {
     g.master_log_pos= log_pos;
+    g.group_master_log_pos= g.group_relay_log_pos= 0;
     g.group_relay_log_name= NULL;
     g.worker_id= (ulong) -1;
     g.total_seqno= const_cast<Relay_log_info*>(rli)->mts_total_groups++;
@@ -2233,7 +2234,7 @@ Slave_worker *Log_event::get_slave_worke
       DBUG_ASSERT(rli->curr_group_da.elements == 1);
 
       // mark the current grup as started with B-event
-      const_cast<Relay_log_info*>(rli)->curr_group_seen_begin= is_b_event;
+      const_cast<Relay_log_info*>(rli)->curr_group_seen_begin= TRUE;
       return NULL;
     } 
     else 
@@ -2284,6 +2285,8 @@ Slave_worker *Log_event::get_slave_worke
     uint i;
     mts_group_cnt= rli->gaq->assigned_group_index;
 
+    // TODO: throw an error when relay-log reading starts from inside of a group!!
+
     if (!worker->relay_log_change_notified)
     {
       /*
@@ -2300,8 +2303,10 @@ Slave_worker *Log_event::get_slave_worke
       DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
 
       ptr_g->group_relay_log_name= (char *)
-        my_malloc(strlen(const_cast<Relay_log_info*>(rli)->get_group_relay_log_name()) + 1, MYF(MY_WME));
-      strcpy(ptr_g->group_relay_log_name, const_cast<Relay_log_info*>(rli)->get_group_relay_log_name());
+        my_malloc(strlen(const_cast<Relay_log_info*>(rli)->
+                         get_group_relay_log_name()) + 1, MYF(MY_WME));
+      strcpy(ptr_g->group_relay_log_name,
+             const_cast<Relay_log_info*>(rli)->get_group_relay_log_name());
 
       DBUG_ASSERT(ptr_g->group_relay_log_name != NULL);
 
@@ -2328,7 +2333,6 @@ Slave_worker *Log_event::get_slave_worke
 
     // reset the B-group marker
     const_cast<Relay_log_info*>(rli)->curr_group_seen_begin= FALSE;
-
     const_cast<Relay_log_info*>(rli)->curr_group_is_parallel= TRUE;  // mark for Coord's T-event delete
   }
   
@@ -2416,15 +2420,15 @@ void append_item_to_jobs(slave_job_item 
                          Slave_worker *w, Relay_log_info *rli)
 {
   THD *thd= rli->info_thd;
-  int ret= -1;
-  ulonglong new_pend_size= rli->mts_pending_jobs_size +
-    ((Log_event*) (job_item->data))->data_written;
+  int ret;
+  ulong ev_size= ((Log_event*) (job_item->data))->data_written;
+  ulonglong new_pend_size;
 
   DBUG_ASSERT(thd == current_thd);
   thd_proc_info(thd, "Feeding an event to a worker thread");
 
   mysql_mutex_lock(&rli->pending_jobs_lock);
-
+  new_pend_size= rli->mts_pending_jobs_size + ev_size;
   // C waits basing on *data* sizes in the queues
   while (new_pend_size > rli->mts_pending_jobs_size_max)
   {
@@ -2440,11 +2444,12 @@ void append_item_to_jobs(slave_job_item 
                              wait_info);
     mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
     thd->exit_cond(old_msg);
-    mysql_mutex_lock(&rli->pending_jobs_lock);
     if (thd->killed)
       return;
-    new_pend_size= rli->mts_pending_jobs_size +
-      ((Log_event*) (job_item->data))->data_written;
+
+    mysql_mutex_lock(&rli->pending_jobs_lock);
+
+    new_pend_size= rli->mts_pending_jobs_size + ev_size;
   }
   rli->pending_jobs++;
   rli->mts_pending_jobs_size= new_pend_size;
@@ -2463,6 +2468,8 @@ void append_item_to_jobs(slave_job_item 
     my_sleep(nap_weight * rli->mts_coordinator_basic_nap);
   }
 
+  ret= -1;
+
   mysql_mutex_lock(&w->jobs_lock);
 
   // possible WQ overfill
@@ -2495,9 +2502,10 @@ void append_item_to_jobs(slave_job_item 
   else
   {
     mysql_mutex_unlock(&w->jobs_lock);
+
     mysql_mutex_lock(&rli->pending_jobs_lock);
     rli->pending_jobs--;                  // roll back of the prev incr
-    rli->mts_pending_jobs_size -= new_pend_size;
+    rli->mts_pending_jobs_size -= ev_size;
     mysql_mutex_unlock(&rli->pending_jobs_lock);
   }
 }
@@ -2531,6 +2539,9 @@ int Log_event::apply_event(Relay_log_inf
         DBUG_ASSERT(rli->curr_group_da.elements == 0);
         DBUG_ASSERT(rli->curr_group_seen_begin);
 
+        // TODO: rollback
+        // c_rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
+
         res= ev_begin->do_apply_event(rli);
         delete ev_begin;
         /* B appears to be serial, reset parallel status of group 
@@ -2651,6 +2662,7 @@ int slave_worker_exec_job(Slave_worker *
   job_item= pop_jobs_item(w, job_item);
   if (thd->killed)
   {
+    // de-queueing and decrement counters is in the caller's exit branch
     error= -1;
     goto err;
   }
@@ -2701,9 +2713,7 @@ int slave_worker_exec_job(Slave_worker *
   {
     DBUG_PRINT("slave_worker_exec_job:", (" commits GAQ index %lu, last committed  %lu", ev->mts_group_cnt, w->last_group_done_index));
 
-    w->slave_worker_ends_group(ev->mts_group_cnt, error); /* last done sets post exec */
-    if (!(ev->get_type_code() == XID_EVENT && w->is_transactional()))
-      w->commit_positions(ev);
+    w->slave_worker_ends_group(ev, error); /* last done sets post exec */
   }
 
   mysql_mutex_lock(&w->jobs_lock);
@@ -2720,6 +2730,7 @@ int slave_worker_exec_job(Slave_worker *
     preserving signatures of existing methods.
     todo: convert update_pos(w->w_rli) -> update_pos(w)
           to remove w_rli w/a
+    TODO: remove ?
   */
   if (!error)
     ev->update_pos(w->w_rli);
@@ -2769,16 +2780,13 @@ int slave_worker_exec_job(Slave_worker *
     mysql_cond_signal(&rli->pending_jobs_cond);
   }
   
-  //DBUG_ASSERT(rli->pending_jobs < rli->slave_pending_jobs_max);
-  // if (rli->pending_jobs == rli->slave_pending_jobs_max - 1 ||
-  //     rli->pending_jobs == 0)
-  //   mysql_cond_signal(&rli->pending_jobs_cond);
-  
   mysql_mutex_unlock(&rli->pending_jobs_lock);
 
   w->stmt_jobs++;
+
 err:
 
+  // TODO!!! ANDREI to RESTORE
   // if (!ev)
   //    delete ev;  // after ev->update_pos() event is garbage
 

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-12-03 16:56:11 +0000
+++ b/sql/rpl_rli.cc	2010-12-04 17:14:50 +0000
@@ -152,7 +152,6 @@ void Relay_log_info::init_workers(ulong 
 {
   uint wi= 0;
 
-  slave_parallel_workers= n;
   /*
     Parallel slave parameters initialization is done regardless
     whether the feature is or going to be active or not.

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-12-03 10:15:45 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-12-04 17:14:50 +0000
@@ -138,7 +138,8 @@ bool Slave_worker::write_info(Rpl_info_h
   */
 
   if (to->prepare_info_for_write() ||
-      to->set_info(curr_group_exec_parts) ||
+      //to->set_info(curr_group_exec_parts) ||
+      to->set_info("") ||
       to->set_info(group_relay_log_name) ||
       to->set_info((ulong)group_relay_log_pos) ||
       to->set_info(group_master_log_name) ||
@@ -474,9 +475,10 @@ Slave_worker *get_least_occupied_worker(
    TODO: reclaim space if the actual size exceeds the limit.
 */
 
-void Slave_worker::slave_worker_ends_group(ulong gaq_idx, int error)
+void Slave_worker::slave_worker_ends_group(Log_event* ev, int error)
 {
   int i;
+  ulong gaq_idx= ev->mts_group_cnt;
 
   if (!error)
   {
@@ -495,12 +497,27 @@ void Slave_worker::slave_worker_ends_gro
                   <= sizeof(group_relay_log_name));
 
       strcpy(group_relay_log_name, ptr_g->group_relay_log_name);
-      delete ptr_g->group_relay_log_name;  // C allocated
-      ptr_g->group_relay_log_name= NULL;   // mark freed
     }
-    last_group_done_index = gaq_idx;
+
+    // GAQ is updated with the checkpoint info
+    
+    // delete ptr_g->group_relay_log_name;  // C allocated
+    // ptr_g->group_relay_log_name= NULL;   // mark freed
+
+
+    // TODO: as it's same as ev->update_pos(w_rli) remove the latter.
+
+    if (!(ev->get_type_code() == XID_EVENT && is_transactional()))
+      commit_positions(ev);
+
+    ptr_g->group_master_log_pos= group_master_log_pos;
+    ptr_g->group_relay_log_pos= group_relay_log_pos;
+
+    last_group_done_index = gaq_idx;   // GAQ index is available to C now
   }
 
+  // cleanup relating to the last executed group regardless of error
+
   for (i= curr_group_exec_parts->dynamic_ids.elements; i > 0; i--)
   {
     db_worker *entry= NULL;
@@ -531,12 +548,6 @@ void Slave_worker::slave_worker_ends_gro
     }
     else
       DBUG_ASSERT(usage_partition != 0);
-    /*
-     TODO:
-       if U == 0 \and count(APH) > max 
-          delete from APH where U = 0;
-          delete entry;
-    */  
 
     mysql_mutex_unlock(&slave_worker_hash_lock);
 
@@ -659,13 +670,14 @@ bool circular_buffer_queue::gt(ulong i, 
    It's compared first against the polled
    to break out of the loop at once if no progress.
 
-
-   The caller is expected to be the checkpoint handler.
+   The caller is supposed to be the checkpoint handler.
 
    A copy of the last discarded item containing
    the refreshed value of the committed low-water-mark is stored
-   into @c lwm member for further caller's processing.
+   into @c lwm container member for further caller's processing.
 
+   @note dyn-allocated members of Slave_job_group such as
+         group_relay_log_name as freed here.
 
    @return number of discarded items
 */
@@ -675,12 +687,15 @@ ulong Slave_committed_queue::move_queue_
   for (i= e; i != a && !empty();)
   {
     Slave_worker *w_i;
-    Slave_job_group g;
+    Slave_job_group *ptr_g;
     ulong l;
-    get_dynamic(&Q, (uchar *) &g, i);
-    if (g.worker_id == (ulong) -1)
+    char grl_name[FN_REFLEN];
+
+    grl_name[0]= 0;
+    ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);
+    if (ptr_g->worker_id == (ulong) -1)
       break; /* the head is not even assigned */
-    get_dynamic(ws, (uchar *) &w_i, g.worker_id);
+    get_dynamic(ws, (uchar *) &w_i, ptr_g->worker_id);
     get_dynamic(&last_done, (uchar *) &l, w_i->id);
 
     DBUG_ASSERT(l <= s);
@@ -691,9 +706,29 @@ ulong Slave_committed_queue::move_queue_
     DBUG_ASSERT(w_i->last_group_done_index >= i ||
                 (((i > a && e > a)  || a == s) && (w_i->last_group_done_index < a)));
 
+    // memorize the last met group_relay_log_name
+    if (ptr_g->group_relay_log_name)
+    {
+      strcpy(grl_name, ptr_g->group_relay_log_name);
+      my_free(ptr_g->group_relay_log_name);
+      ptr_g->group_relay_log_name= NULL;   // mark freed
+    }
+
     if (w_i->last_group_done_index == i || gt(w_i->last_group_done_index, i))
     {
-      ulong ind= de_queue((uchar*) &lwm);
+      Slave_job_group g;
+      ulong ind= de_queue((uchar*) &g);
+
+      // stored the memorized name into result struct
+      if (grl_name[0] != 0)
+        strcpy(lwm.group_relay_log_name, grl_name);
+      else
+        lwm.group_relay_log_name[0]= 0;
+
+      DBUG_ASSERT(!ptr_g->group_relay_log_name);
+
+      g.group_relay_log_name= lwm.group_relay_log_name;
+      lwm= g; // the result struct is done for the current iteration
 
       /* todo/fixme: the least occupied sorting out can be triggered here */
       /* e.g 
@@ -706,7 +741,7 @@ ulong Slave_committed_queue::move_queue_
          }
       */
       DBUG_ASSERT(ind == i);
-      DBUG_ASSERT(g.total_seqno == lwm.total_seqno);
+      DBUG_ASSERT(ptr_g->total_seqno == lwm.total_seqno);
 
       set_dynamic(&last_done, (uchar*) &i, w_i->id);
     }
@@ -751,7 +786,7 @@ int wait_for_workers_to_finish(Relay_log
       thd->exit_cond(proc_info);
       ret++;
 
-      DBUG_ASSERT(entry->usage == 0);
+      DBUG_ASSERT(entry->usage == 0 || thd->killed);
     }
     else
     {

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-12-01 19:15:08 +0000
+++ b/sql/rpl_rli_pdb.h	2010-12-04 17:14:50 +0000
@@ -91,7 +91,9 @@ public:
 
 typedef struct st_slave_job_group
 {
-  my_off_t master_log_pos; 
+  my_off_t master_log_pos;       // B-event log_pos
+  my_off_t group_master_log_pos; // T-event lop_pos filled by W for CheckPoint
+  my_off_t group_relay_log_pos;  // filled by W
 
   /* 
      When RL name changes C allocates and fill in a new name of RL,
@@ -99,11 +101,10 @@ typedef struct st_slave_job_group
      C keeps track of each Worker has been notified on the updating
      to make sure the routine runs once per change.
 
-     W checks the value at commit and memoriezes a not-NULL
-     with prior freeing old one's allocation. The memorized value
-     plays its role at commit until a new has arrived.
+     W checks the value at commit and memoriezes a not-NULL.
+     Freeing unless NULL is left to C at CheckPoint.
   */
-  char     *group_relay_log_name;
+  char     *group_relay_log_name; // The value is last seen relay-log 
   ulong worker_id;
   ulonglong total_seqno;
 } Slave_job_group;
@@ -139,11 +140,14 @@ public:
     my_init_dynamic_array(&last_done, sizeof(s), n, 0);
     for (k= 0; k < n; k++)
       insert_dynamic(&last_done, (uchar*) &s);  // empty for each Worker
+    lwm.group_relay_log_name= (char *) my_malloc(FN_REFLEN + 1, MYF(0));
+    lwm.group_relay_log_name[0]= 0;
   }
 
   ~Slave_committed_queue ()
   { 
     delete_dynamic(&last_done);
+    my_free(lwm.group_relay_log_name);
   }
 
   /* Checkpoint routine refreshes the queue */
@@ -216,7 +220,7 @@ public:
 
   size_t get_number_worker_fields();
 
-  void slave_worker_ends_group(ulong, int);  // CGEP walk through to upd APH
+  void slave_worker_ends_group(Log_event*, int);  // CGEP walk through to upd APH
 
   bool commit_positions(Log_event *evt);
 

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-12-04 15:45:02 +0000
+++ b/sql/rpl_slave.cc	2010-12-05 20:04:17 +0000
@@ -168,7 +168,7 @@ static int terminate_slave_thread(THD *t
                                   bool skip_lock);
 static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
 int slave_worker_exec_job(Slave_worker * w, Relay_log_info *rli);
-bool checkpoint_routine(Relay_log_info *rli);
+bool mts_checkpoint_routine(Relay_log_info *rli);
 
 /*
   Find out which replications threads are running
@@ -2669,10 +2669,25 @@ int apply_event_and_update_pos(Log_event
       See sql/rpl_rli.h for further details.
     */
     int error= 0;
-    if ((rli->curr_group_is_parallel == FALSE && 
-        !(ev->get_type_code() == XID_EVENT && rli->is_transactional())) ||
-        skip_event)
+    if (!rli->is_parallel_exec() ||
+        ev->only_sequential_exec(rli->run_query_in_parallel,
+                                 ev->ends_group() ?
+                                 rli->curr_group_is_parallel :
+                                 rli->curr_group_seen_begin))
+    {
       error= ev->update_pos(rli);
+    }
+    else
+    {
+      DBUG_ASSERT(rli->is_parallel_exec());
+      /* 
+         event_relay_log_pos is an anchor to possible reading restart.
+         It may become lt than group_* value.
+         However event_relay_log_pos does not affect group_relay_log_pos
+         othen that through the sequentially executed events or via checkpoint.
+      */
+      rli->inc_event_relay_log_pos();
+    }
 
 #ifndef DBUG_OFF
     DBUG_PRINT("info", ("update_pos error = %d", error));
@@ -3571,6 +3586,8 @@ pthread_handler_t handle_slave_worker(vo
   mysql_mutex_lock(&rli->pending_jobs_lock);
   rli->pending_jobs -= purge_cnt;
   rli->mts_pending_jobs_size -= purge_size;
+  DBUG_ASSERT(rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max);
+
   mysql_mutex_unlock(&rli->pending_jobs_lock);
 
   mysql_mutex_lock(&w->jobs_lock);
@@ -3593,22 +3610,27 @@ err:
   DBUG_RETURN(0); 
 }
 
-bool checkpoint_routine(Relay_log_info *rli)
+/**
+   Processing rli->gaq to find out the low-water-mark coordinates
+   stored into the cental recovery table.
+
+
+   @return FALSE success, TRUE otherwise
+*/
+bool mts_checkpoint_routine(Relay_log_info *rli)
 {
   bool error= FALSE;
+  ulong cnt;
 
   DBUG_ENTER("checkpoint_routine");
 
-  mysql_mutex_lock(&rli->data_lock); 
-
-  uint i;
-  rli->gaq->move_queue_head(&rli->workers);
+  if (!(cnt= rli->gaq->move_queue_head(&rli->workers)))
+    goto end;
 
   /* TODO: 
-     the least occupied sorting out needs moving to the actual
-     checkpoint location - next_event()
+     to turn the least occupied selection in terms of jobs pieces
   */
-  for (i= 0; i < rli->workers.elements; i++)
+  for (uint i= 0; i < rli->workers.elements; i++)
   {
     Slave_worker *w_i;
     get_dynamic(&rli->workers, (uchar *) &w_i, i);
@@ -3616,10 +3638,30 @@ bool checkpoint_routine(Relay_log_info *
   };
   sort_dynamic(&rli->least_occupied_workers, (qsort_cmp) ulong_cmp);
 
+  mysql_mutex_lock(&rli->data_lock);
+
+  // Coordinator::commit_positions() {
+
+  // rli->gaq->lwm contains all but rli->group_master_log_name
+
+  // group_master_log_name is updated only by Coordinator and it can't change
+  // within checkpoint interval because Coordinator flushes the updated value
+  // at once.
+
+  rli->set_group_master_log_pos(rli->gaq->lwm.group_master_log_pos);
+  rli->set_group_relay_log_pos(rli->gaq->lwm.group_relay_log_pos);
+
+  if (rli->gaq->lwm.group_relay_log_name[0] != 0)
+    rli->set_group_relay_log_name(rli->gaq->lwm.group_relay_log_name);
+
   error= rli->flush_info(TRUE);
 
+  // end of commit_positions
+
   mysql_mutex_unlock(&rli->data_lock);
 
+end:
+  
   // ANDREI NOTIFICATIONS?
   DBUG_RETURN(error);
 }
@@ -3766,7 +3808,7 @@ void slave_stop_workers(Relay_log_info *
   int i;
   THD *thd= rli->info_thd;
 
-  if (rli->workers.elements == 0) 
+  if (rli->slave_parallel_workers == 0) 
     return;
   
   for (i= rli->workers.elements - 1; i >= 0; i--)
@@ -3875,7 +3917,7 @@ pthread_handler_t handle_slave_sql(void 
   pthread_detach_this_thread();
 
   /* mts-II: starting the worker pool */
-  if (slave_start_workers(rli, opt_slave_parallel_workers) != 0)
+  if (slave_start_workers(rli, rli->slave_parallel_workers) != 0)
     goto err;
   
   if (init_slave_thread(thd, SLAVE_THD_SQL))
@@ -5321,7 +5363,7 @@ static Log_event* next_event(Relay_log_i
             ulong diff= diff_timespec(rli->curr_clock, rli->last_clock);
             if (diff > period)
             {
-               checkpoint_routine(rli);
+               mts_checkpoint_routine(rli);
                set_timespec_nsec(rli->last_clock, 0);
             }
             set_timespec_nsec(waittime, period);
@@ -5334,6 +5376,10 @@ static Log_event* next_event(Relay_log_i
         }
         else
         {
+          thd->enter_cond(log_cond, log_lock,
+                          "Slave has read all relay log; "
+                          "waiting for the slave I/O "
+                          "thread to update it");
           rli->relay_log.wait_for_update_relay_log(thd, NULL);
         }
         
@@ -5707,6 +5753,12 @@ int start_slave(THD* thd , Master_info* 
       */
       if (thread_mask & SLAVE_SQL)
       {
+        /*
+          To cache the system var value and used it in the following.
+          The system var can change but not the cached.
+        */
+        mi->rli->slave_parallel_workers= opt_slave_parallel_workers;
+
         mysql_mutex_lock(&mi->rli->data_lock);
 
         if (thd->lex->mi.pos)
@@ -5759,6 +5811,13 @@ int start_slave(THD* thd , Master_info* 
             push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
                          ER_MISSING_SKIP_SLAVE,
                          ER(ER_MISSING_SKIP_SLAVE));
+          if (mi->rli->slave_parallel_workers != 0)
+          {
+            mi->rli->slave_parallel_workers= 0;
+            push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
+                         ER_NO_UNTIL_COND_WITH_PARALLEL_SLAVE,
+                         ER(ER_NO_UNTIL_COND_WITH_PARALLEL_SLAVE));
+          }
         }
 
         mysql_mutex_unlock(&mi->rli->data_lock);

=== modified file 'sql/share/errmsg-utf8.txt'
--- a/sql/share/errmsg-utf8.txt	2010-10-25 10:39:01 +0000
+++ b/sql/share/errmsg-utf8.txt	2010-12-04 17:14:50 +0000
@@ -6399,3 +6399,6 @@ ER_RPL_INFO_DATA_TOO_LONG
   eng "Data for column '%s' too long"
 ER_CANT_LOCK_RPL_INFO_TABLE
         eng "You can't use locks with rpl info tables."
+
+ER_NO_UNTIL_COND_WITH_PARALLEL_SLAVE
+  eng "Until condition is not supported in Parallel Slave. Slave is started in the sequential mode."


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101205200417-lezfgpc9q24mezc3.bundle
Thread
bzr push into mysql-next-mr-wl5569 branch (andrei.elkin:3228 to 3230)WL#5569 WL#5599Andrei Elkin5 Dec