List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:June 5 2011 5:02pm
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3281) WL#5569
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3281 Andrei Elkin	2011-06-05
      wl#5569 MTS
      
      More cleanup, fixes for issues found when running tests, and some improvements,
      including in stopping Workers: the routine now distinguishes between killed and gracefully
      stopped cases, so that in the end STOP SLAVE guarantees a consistent state (some todos remain).
     @ mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test
        decreasing execution time.
     @ mysql-test/suite/rpl/t/rpl_begin_commit_rollback.test
        Marking the test as limited to Single-Thread-Slave.
     @ mysql-test/suite/rpl/t/rpl_deadlock_innodb.test
        Marking the test as limited to Single-Thread-Slave.
     @ mysql-test/suite/rpl/t/rpl_slave_skip.test
        Marking the test as limited to Single-Thread-Slave.
     @ sql/log_event.cc
        addressing a few review comments;
        asserting that do_update_pos() can't be run by Workers;
        cleaning up Slave_worker *Log_event::get_slave_worker_id() and separating it
        from its caller's interest in rli->last_assigned_worker;
        Deploying MTS group status marking in Log_event::apply_event();
        Making the Worker's exec loop break obey the new Worker running status too;
        Deploying mts_checkpoint_routine() in Rotate_log_event::do_update_pos()
          (a similar action is done for the FD event's handler);
        Fixing relay-log update notification in Log_event::get_slave_worker_id();
     @ sql/log_event.h
        renaming and re-typing of functions as suggested by the reviewer;
        leaving a todo item for the final cleanup;
        correcting the logic of mts_async_exec_by_coordinator();
     @ sql/rpl_rli.cc
        Initialization of a new MTS group status property: mts_group_status(MTS_NOT_IN_GROUP);
        asserting that Relay_log_info::stmt_done() can't be run by Workers;
        deploying mts_checkpoint_routine(), as in Rotate_log_event::do_update_pos(), this time
          in Relay_log_info::stmt_done() to cover the FD-event case, and consulting mts_group_status
          in order to decide which branch to follow;
     @ sql/rpl_rli.h
        Augmenting Relay_log_info with mts_group_status to contain
          MTS group status;
     @ sql/rpl_rli_pdb.cc
        Slave_worker::commit_positions() is fixed to carry updated relay-log info
        further to the following checkpoint routine action;
        Slave_worker *get_slave_worker() was cleaned up, its interface improved,
          and a few asserts corrected;
        Slave_worker::slave_worker_ends_group() is cleaned up a bit, and now frees extra
        memory of the CGEP dynarray.
        wait_for_workers_to_finish() is made to set the Coordinator's state to not
        in MTS group after synchronization with all workers;
     @ sql/rpl_rli_pdb.h
        Slave_worker's running_status member is re-typed from bool to an enum
        (NOT_RUNNING, RUNNING, KILLED).
     @ sql/rpl_slave.cc
        apply_event_and_update_pos(): corrects asserts, synchronizes with *all* Workers
        at the end of an event dynamically marked as End of group (mts_is_event_isolated() -> TRUE);
        exec_relay_log_event(): corrects the NULL event read-out case;
        slave_stop_workers(): simplifying the logic of stopping Workers, to mark them with
          w->running_status= Slave_worker::KILLED instead of killing workers' thd.
        
        slave_stop_workers() finalizes the reset of the Coordinator's state with
         rli->mts_group_status= Relay_log_info::MTS_NOT_IN_GROUP
        to make sure the next restart will proceed with the reset value.

    modified:
      mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test
      mysql-test/suite/rpl/t/rpl_begin_commit_rollback.test
      mysql-test/suite/rpl/t/rpl_deadlock_innodb.test
      mysql-test/suite/rpl/t/rpl_slave_skip.test
      sql/log_event.cc
      sql/log_event.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
=== modified file 'mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test'
--- a/mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test	2011-05-06 18:33:32 +0000
+++ b/mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test	2011-06-05 17:01:51 +0000
@@ -6,7 +6,7 @@
 # load volume parameter
 #
 
-let $iter= 16;
+let $iter= 08;
 let $tables= 4;
 let $wk_i_queries= 4;
 let $wk_m_queries= 0;

=== modified file 'mysql-test/suite/rpl/t/rpl_begin_commit_rollback.test'
--- a/mysql-test/suite/rpl/t/rpl_begin_commit_rollback.test	2010-12-19 17:22:30 +0000
+++ b/mysql-test/suite/rpl/t/rpl_begin_commit_rollback.test	2011-06-05 17:01:51 +0000
@@ -1,6 +1,8 @@
 source include/master-slave.inc;
 source include/have_innodb.inc;
 source include/have_binlog_format_statement.inc;
+# UNTIL is not supported yet (TODO: support and remove the guard)
+-- source include/not_mts_slave_parallel_workers.inc
 
 connection slave;
 call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT");

=== modified file 'mysql-test/suite/rpl/t/rpl_deadlock_innodb.test'
--- a/mysql-test/suite/rpl/t/rpl_deadlock_innodb.test	2011-05-30 10:05:07 +0000
+++ b/mysql-test/suite/rpl/t/rpl_deadlock_innodb.test	2011-06-05 17:01:51 +0000
@@ -1,6 +1,7 @@
 -- source include/not_ndb_default.inc
 -- source include/have_innodb.inc
-let $engine_type=innodb;
--- source extra/rpl_tests/rpl_deadlock.test
 # --slave-transaction-retries=0 in MTS
 -- source include/not_mts_slave_parallel_workers.inc
+
+let $engine_type=innodb;
+-- source extra/rpl_tests/rpl_deadlock.test

=== modified file 'mysql-test/suite/rpl/t/rpl_slave_skip.test'
--- a/mysql-test/suite/rpl/t/rpl_slave_skip.test	2010-12-19 17:22:30 +0000
+++ b/mysql-test/suite/rpl/t/rpl_slave_skip.test	2011-06-05 17:01:51 +0000
@@ -4,6 +4,9 @@
 # test for MIXED mode.
 source include/have_binlog_format_mixed.inc;
 
+# UNTIL is not supported yet (TODO: support and remove the guard)
+-- source include/not_mts_slave_parallel_workers.inc
+
 source include/master-slave.inc;
 source include/have_innodb.inc;
 

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-05-30 10:05:07 +0000
+++ b/sql/log_event.cc	2011-06-05 17:01:51 +0000
@@ -783,6 +783,9 @@ int Log_event::do_update_pos(Relay_log_i
 
     Matz: I don't think we will need this check with this refactoring.
   */
+
+  DBUG_ASSERT(!mts_is_worker(rli->info_thd));
+
   if (rli)
     rli->stmt_done(log_pos);
   return 0;                                   // Cannot fail currently
@@ -2357,10 +2360,6 @@ Log_event::continue_group(Relay_log_info
 bool Log_event::contains_partition_info()
 {
   return get_type_code() == TABLE_MAP_EVENT ||
-
-    // todo: Query event is limitly supported
-    // which ev->get_db() yields the session db not the actual db
-      
     (get_type_code() == QUERY_EVENT && !ends_group() && !starts_group()) ||
     (get_type_code() == EXECUTE_LOAD_QUERY_EVENT);
 }
@@ -2473,13 +2472,13 @@ Slave_worker *Log_event::get_slave_worke
     List_iterator<char> it(*mts_get_dbs(rli->info_thd->mem_root));
     it++;
 
+    ret_worker= rli->last_assigned_worker;
     if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
     {
-      // provide a hint - Worker with id 0 - to the following assign
-      if (!rli->last_assigned_worker)
-        rli->last_assigned_worker=
-          *(Slave_worker**) dynamic_array_ptr(&rli->workers, 0);
-      (void) wait_for_workers_to_finish(rli, rli->last_assigned_worker);
+      // Worker with id 0 to handle serial execution
+      if (!ret_worker)
+        ret_worker= *(Slave_worker**) dynamic_array_ptr(&rli->workers, 0);
+      (void) wait_for_workers_to_finish(rli, ret_worker);
     }
 
     do
@@ -2489,7 +2488,8 @@ Slave_worker *Log_event::get_slave_worke
       if (!(ret_worker=
             get_slave_worker(*ref_cur_db, rli,
                              &mts_assigned_partitions[i],
-                             get_type_code() == QUERY_EVENT)))
+                             get_type_code() == QUERY_EVENT,
+                             ret_worker)))
       {
         // destroy buffered events of the current group prior to exit
         for (uint k= 0; k < rli->curr_group_da.elements; k++)
@@ -2571,14 +2571,16 @@ Slave_worker *Log_event::get_slave_worke
     }
   }
 
-  // the group terminal event (Commit, Xid or a DDL query)
+  // the group terminal event:
+  // Commit, Xid, a DDL query or dml query of B-less group.
   if (ends_group() || !rli->curr_group_seen_begin)
   {
     // index of GAQ that this terminal event belongs to
     mts_group_cnt= rli->gaq->assigned_group_index;
 
+    // special marking for T event of {p,g} B-less group
     if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
-      set_mts_event_ends_group();
+      mts_do_isolate_event();
 
     ptr_g= (Slave_job_group *)
       dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
@@ -2592,9 +2594,9 @@ Slave_worker *Log_event::get_slave_worke
     {
       /*
         Prior this event, C rotated the relay log to drop each
-        Worker's notified flag.
-        Now group terminating event initiates the new name
-        delivery through the current group relaylog slot in GAQ.
+        Worker's notified flag. Now group terminating event initiates
+        the new relay-log (where the current event is from) name
+        delivery to Worker that will receive it in commit_positions().
       */
       DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
 
@@ -2602,7 +2604,7 @@ Slave_worker *Log_event::get_slave_worke
         my_malloc(strlen(rli->
                          get_group_relay_log_name()) + 1, MYF(MY_WME));
       strcpy(ptr_g->group_relay_log_name,
-             rli->get_group_relay_log_name());
+             rli->get_event_relay_log_name());
 
       DBUG_ASSERT(ptr_g->group_relay_log_name != NULL);
 
@@ -2846,7 +2848,9 @@ int Log_event::apply_event(Relay_log_inf
   }
 
   if (!(parallel= rli->is_parallel_exec()) ||
-      (async_event= mts_async_exec_by_coordinator(::server_id)) ||
+      (async_event=
+       mts_async_exec_by_coordinator(::server_id, 
+                                     rli->mts_group_status == Relay_log_info::MTS_IN_GROUP)) ||
       (seq_event= mts_sequential_exec()))
   {
     if (parallel)
@@ -2866,7 +2870,6 @@ int Log_event::apply_event(Relay_log_inf
           a separator beetwen two master's binlog therefore requiring
           Workers to sync.
         */
-
         if (rli->curr_group_da.elements > 0)
         {
           /* 
@@ -2892,6 +2895,11 @@ int Log_event::apply_event(Relay_log_inf
           Marking sure the event will be executed in sequential mode.
         */
         (void) wait_for_workers_to_finish(rli);
+        /*
+          Given not in-group mark the event handler can invoke checkpoint
+          update routine in the following course.
+        */
+        DBUG_ASSERT(rli->mts_group_status == Relay_log_info::MTS_NOT_IN_GROUP);
 
 #ifndef DBUG_OFF
         /* all Workers are idle as done through wait_for_workers_to_finish */
@@ -2912,6 +2920,8 @@ int Log_event::apply_event(Relay_log_inf
               rli->last_assigned_worker);
 
   worker= NULL;
+  c_rli->mts_group_status= Relay_log_info::MTS_IN_GROUP;
+
   c_rli->last_assigned_worker= w= get_slave_worker_id(c_rli);
   if (!w || DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0))
     DBUG_RETURN(rli->curr_group_assigned_parts.elements == 0 ? FALSE : TRUE);
@@ -2935,7 +2945,8 @@ struct slave_job_item* pop_jobs_item(Sla
 
   mysql_mutex_lock(&w->jobs_lock);
 
-  while (!job_item->data && !thd->killed)
+  while (!job_item->data && !thd->killed &&
+         w->running_status == Slave_worker::RUNNING)
   {
     const char *old_msg;
 
@@ -2981,7 +2992,7 @@ int slave_worker_exec_job(Slave_worker *
   DBUG_ENTER("slave_worker_exec_job");
 
   job_item= pop_jobs_item(w, job_item);
-  if (thd->killed)
+  if (thd->killed || w->running_status != Slave_worker::RUNNING)
   {
     // de-queueing and decrement counters is in the caller's exit branch
     error= -1;
@@ -3020,11 +3031,16 @@ int slave_worker_exec_job(Slave_worker *
         for (uint i= 0; i < ep->elements && !found; i++)
         {
           found=
-            (* (db_worker_hash_entry **) dynamic_array_ptr(ep, i)) ==
+            *((db_worker_hash_entry **) dynamic_array_ptr(ep, i)) ==
             ev->mts_assigned_partitions[k];
         }
         if (!found)
         {
+          /*
+            notice, can't assert
+            DBUG_ASSERT(ev->mts_assigned_partitions[k]->worker == w);
+            since entry could be marked as wanted by other worker.
+          */
           insert_dynamic(ep, (uchar*) &ev->mts_assigned_partitions[k]);
         }
       }
@@ -3104,7 +3120,13 @@ int slave_worker_exec_job(Slave_worker *
 
 err:
   if (error)
+  {
+    sql_print_information("Worker %lu is exiting: killed %i, error %i, "
+                          "running_status %d",
+                          w->id, thd->killed, thd->is_error(),
+                          w->running_status);
     w->slave_worker_ends_group(ev, error);
+  }
   
   // rows_query_log_event is deleted as a part of the statement cleanup
 
@@ -4192,16 +4214,12 @@ void Query_log_event::print(FILE* file, 
 }
 #endif /* MYSQL_CLIENT */
 
-
-/*
-  Query_log_event::do_apply_event()
-*/
-
 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
 
 /**
    Associating slave Worker thread to a subset of temporary tables
    belonging to db-partitions the event accesses.
+   The pointer if all entries is cleaned.
 
    @param thd   THD instance pointer
 */
@@ -4282,6 +4300,9 @@ void Query_log_event::detach_temp_tables
 #endif
 }
 
+/*
+  Query_log_event::do_apply_event()
+*/
 int Query_log_event::do_apply_event(Relay_log_info const *rli)
 {
   return do_apply_event(rli, query, q_len);
@@ -6484,13 +6505,20 @@ int Rotate_log_event::do_update_pos(Rela
   */
   if ((server_id != ::server_id || rli->replicate_same_server_id) &&
       !is_relay_log_event() &&
-      !rli->is_in_group())
+      ((!rli->is_parallel_exec() && !rli->is_in_group()) ||
+       rli->mts_group_status == Relay_log_info::MTS_NOT_IN_GROUP))
   {
     mysql_mutex_lock(&rli->data_lock);
     DBUG_PRINT("info", ("old group_master_log_name: '%s'  "
                         "old group_master_log_pos: %lu",
                         rli->get_group_master_log_name(),
                         (ulong) rli->get_group_master_log_pos()));
+
+    if (rli->is_parallel_exec())
+    {
+      (void) mts_checkpoint_routine(rli, 0, FALSE, TRUE);  // todo: error branch
+    }
+
     memcpy((void *)rli->get_group_master_log_name(),
            new_log_ident, ident_len + 1);
     rli->notify_group_master_log_name_update();
@@ -6501,8 +6529,10 @@ int Rotate_log_event::do_update_pos(Rela
                         rli->get_group_master_log_name(),
                         (ulong) rli->get_group_master_log_pos()));
     mysql_mutex_unlock(&rli->data_lock);
-    rli->flush_info(TRUE);
-    
+    rli->flush_info(TRUE);  // todo: error branch
+    if (rli->is_parallel_exec())
+      rli->reset_notified_checkpoint();
+
     /*
       Reset thd->variables.option_bits and sql_mode etc, because this could be the signal of
       a master's downgrade from 5.0 to 4.0.
@@ -9247,6 +9277,9 @@ Rows_log_event::do_update_pos(Relay_log_
   DBUG_PRINT("info", ("flags: %s",
                       get_flags(STMT_END_F) ? "STMT_END_F " : ""));
 
+  /* Worker does not execute binlog update position logics */
+  DBUG_ASSERT(!mts_is_worker(rli->info_thd));
+
   if (get_flags(STMT_END_F))
   {
     /*

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2011-05-30 10:05:07 +0000
+++ b/sql/log_event.h	2011-06-05 17:01:51 +0000
@@ -1117,8 +1117,18 @@ public:
   */
   virtual uint8 mts_number_dbs() { return 1; }
 
-  virtual void set_mts_event_ends_group() { DBUG_ASSERT(0); }
-  virtual bool get_mts_event_ends_group() { DBUG_ASSERT(0); }
+  /*
+    Event can be exceptionally marked to force its execution.
+    in isolation from any other Workers.
+    Other than Query-log-event class should not have any implementation
+    of this method.
+  */
+  virtual void mts_do_isolate_event() { DBUG_ASSERT(0); }
+
+  /*
+    Verifying whether event is marked to execute in isolation.
+  */
+  virtual bool mts_is_event_isolated() { return FALSE; }
 
 #else
   Log_event() : temp_buf(0) {}
@@ -1273,15 +1283,20 @@ public:
   /**
      MST: some events have to be applied by Coordinator concurrently with Workers.
 
+     *TODO*: combine with mts_sequential_exec() to have ternary outcome.
+
      @return TRUE  if that's the case,
              FALSE otherwise.
   */
-  bool mts_async_exec_by_coordinator(ulong slave_server_id)
+  bool mts_async_exec_by_coordinator(ulong slave_server_id, bool mts_in_group)
   {
     return
-      (get_type_code() == FORMAT_DESCRIPTION_EVENT ||
-       get_type_code() == ROTATE_EVENT) &&
-      ((server_id == (uint32) ::server_id) || (log_pos == 0));
+      (get_type_code() == FORMAT_DESCRIPTION_EVENT &&
+       ((server_id == (uint32) ::server_id) || (log_pos == 0)))
+      ||
+      (get_type_code() == ROTATE_EVENT &&
+       ((server_id == (uint32) ::server_id) ||
+        (log_pos == 0 && mts_in_group)));
   }
 
   /**
@@ -1897,8 +1912,8 @@ public:
     Event can be indentified as a group terminator and such fact
     is memoried by the function.
   */
-  virtual void set_mts_event_ends_group() { m_mts_query_ends_group= TRUE; }
-  virtual bool get_mts_event_ends_group() { return m_mts_query_ends_group; }
+  virtual void mts_do_isolate_event() { m_mts_query_ends_group= TRUE; }
+  virtual bool mts_is_event_isolated() { return m_mts_query_ends_group; }
 
 #ifdef MYSQL_SERVER
 
@@ -3085,7 +3100,7 @@ public:
   const char* get_db() { return db; }
 #endif
   /* MTS executes this event sequentially */
-  virtual uchar mts_number_dbs() { return OVER_MAX_DBS_IN_EVENT_MTS; }
+  virtual uint8 mts_number_dbs() { return OVER_MAX_DBS_IN_EVENT_MTS; }
 private:
 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
   virtual int do_apply_event(Relay_log_info const *rli);

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2011-05-30 10:05:07 +0000
+++ b/sql/rpl_rli.cc	2011-06-05 17:01:51 +0000
@@ -73,7 +73,7 @@ Relay_log_info::Relay_log_info(bool is_s
    this_worker(NULL), slave_parallel_workers(0),
    recovery_parallel_workers(0),
    checkpoint_group(mts_checkpoint_group), mts_recovery_group_cnt(0),
-   mts_recovery_index(0),
+   mts_recovery_index(0), mts_group_status(MTS_NOT_IN_GROUP),
    sql_delay(0), sql_delay_end(0), m_flags(0)
 {
   DBUG_ENTER("Relay_log_info::Relay_log_info");
@@ -189,8 +189,8 @@ void Relay_log_info::reset_notified_rela
    whose coordinates is passed to it through GAQ index.
 
    Worker notices the new checkpoint value at the group commit
-   to reset the current bitmap and set ON a bit number put by C into GAQ index
-   as the first group committed after the new checkpoint.
+   to reset the current bitmap and starts using the clean bitmap
+   indexed from zero of being reset checkpoint_seqno.
 */
 void Relay_log_info::reset_notified_checkpoint()
 {
@@ -991,7 +991,10 @@ bool Relay_log_info::cached_charset_comp
 void Relay_log_info::stmt_done(my_off_t event_master_log_pos)
 {
   clear_flag(IN_STMT);
+
   DBUG_ASSERT(!belongs_to_client());
+  /* Worker does not execute binlog update position logics */
+  DBUG_ASSERT(!mts_is_worker(info_thd));
 
   /*
     If in a transaction, and if the slave supports transactions, just
@@ -1016,20 +1019,22 @@ void Relay_log_info::stmt_done(my_off_t 
     middle of the "transaction". START SLAVE will resume at BEGIN
     while the MyISAM table has already been updated.
   */
-  if ((info_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
+  if ((!is_parallel_exec() && is_in_group()) ||
+      mts_group_status == MTS_IN_GROUP)
+  {
     inc_event_relay_log_pos();
+  }
   else
   {
+    if (is_parallel_exec())
+    {
+      (void) mts_checkpoint_routine(this, 0, FALSE, FALSE);  // Alfranio todo: error branch
+    }
     inc_group_relay_log_pos(event_master_log_pos);
     
     DBUG_ASSERT(this_worker == NULL);
-    
-    flush_info(is_transactional() ? TRUE : FALSE);
 
-    /* 
-       The central recovery commit run in sequential mode forces
-       notification on the defacto new checkpoint.
-    */
+    flush_info(is_transactional() ? TRUE : FALSE); // Alfranio todo: error branch
     if (is_parallel_exec())
       reset_notified_checkpoint();
   }

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2011-05-30 10:05:07 +0000
+++ b/sql/rpl_rli.h	2011-06-05 17:01:51 +0000
@@ -464,8 +464,8 @@ public:
   */
   DYNAMIC_ARRAY least_occupied_workers;
   uint checkpoint_seqno;  // counter of groups executed after the most recent CP
-  uint checkpoint_group;  // counter of groups after which a checkpoint is called.
-  MY_BITMAP recovery_groups;  // bitmap used during recovery.
+  uint checkpoint_group;  // number of groups in one checkpoint interval (period).
+  MY_BITMAP recovery_groups;  // bitmap used during recovery
   ulong mts_recovery_group_cnt; // number of groups to execute at recovery
   ulong mts_recovery_index;     // running index of recoverable groups
   /*
@@ -479,6 +479,18 @@ public:
     While Worker utilize its thd->mem_root, Coordinator adopts a specific mem-root:
   */
   MEM_ROOT mts_coor_mem_root;
+  /*
+    While distibuting events basing on their properties MTS Coordinator
+    changes its mts group status.
+    Transition from NOT_IN to IN happens once an event is scheduled to a Worker.
+    Reverse transition occures when Coordinator requests synchronization with
+    Workers demanding them to complete their assignments.
+  */
+  enum
+  {
+    MTS_NOT_IN_GROUP, /* not in group includes Single-Threaded-Slave */
+    MTS_IN_GROUP      /* an event was scheduled to a Worker */
+  } mts_group_status;
 
   /* most of allocation in the coordinator rli is there */
   void init_workers(ulong);

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2011-05-30 10:05:07 +0000
+++ b/sql/rpl_rli_pdb.cc	2011-06-05 17:01:51 +0000
@@ -31,7 +31,7 @@ Slave_worker::Slave_worker(const char* t
                            Relay_log_info *rli)
   : Relay_log_info(FALSE), c_rli(rli),
     checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0),
-    inited_group_execed(0), running_status(FALSE), last_event(NULL)
+    inited_group_execed(0), running_status(NOT_RUNNING), last_event(NULL)
 {
   checkpoint_relay_log_name[0]= 0;
   checkpoint_master_log_name[0]= 0;
@@ -227,6 +227,12 @@ bool Slave_worker::commit_positions(Log_
 
     bitmap_clear_all(&group_execed);
   }
+  // extract an updated relay-log name to store in Worker's rli.
+  if (ptr_g->group_relay_log_name)
+  {
+    strmake(group_relay_log_name, ptr_g->group_relay_log_name,
+            sizeof(group_relay_log_name) - 1);
+  }
   
   bitmap_set_bit(&group_execed, ptr_g->checkpoint_seqno);
   checkpoint_seqno= ptr_g->checkpoint_seqno;
@@ -272,7 +278,8 @@ static void free_entry(db_worker_hash_en
   DBUG_PRINT("info", ("free_entry %s, %d", entry->db, (int) strlen(entry->db)));
 
   DBUG_ASSERT(c_thd->system_thread == SYSTEM_THREAD_SLAVE_SQL);
-  DBUG_ASSERT(entry->usage == 0 || !entry->worker->running_status);
+  DBUG_ASSERT(entry->usage == 0 ||
+              entry->worker->running_status != Slave_worker::RUNNING);
 
   mts_move_temp_tables_to_thd(c_thd, entry->temporary_tables);
   entry->temporary_tables= NULL;
@@ -452,23 +459,24 @@ static void move_temp_tables_to_entry(TH
      c. updates the APH record to point to the first Worker (naturally, U := 1),
         scheduled the event, and goes back into the parallel mode
 
-   @param  dbname    pointer to c-string containing database name
-                     It can be empty string to indicate specific locking
-                     to faciliate sequential applying.
-   @param  rli       pointer to Coordinators relay-log-info instance
-   @param  ptr_entry reference to a pointer to the resulted entry in
-                     the Assigne Partition Hash where
-                     the entry's pointer is stored at return.
+   @param  dbname      pointer to c-string containing database name
+                       It can be empty string to indicate specific locking
+                       to faciliate sequential applying.
+   @param  rli         pointer to Coordinators relay-log-info instance
+   @param  ptr_entry   reference to a pointer to the resulted entry in
+                       the Assigne Partition Hash where
+                       the entry's pointer is stored at return.
+   @param  last_worker caller opts for this Worker, it must be
+                       rli->last_assigned_worker if one is determined.
 
-   @note modifies  CGAP, APH and unlinks @c dbname -keyd temp tables 
+   @note modifies  CGAP, APH and unlinks @c dbname -keyd temporary tables 
          from C's thd->temporary_tables to move them into the entry record.
-         Caller can opt for a Worker via setting rli->last_assigned_worker.
 
    @return the pointer to a Worker struct
 */
 Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli,
                                db_worker_hash_entry **ptr_entry,
-                               bool need_temp_tables)
+                               bool need_temp_tables, Slave_worker *last_worker)
 {
   uint i;
   DYNAMIC_ARRAY *workers= &rli->workers;
@@ -476,6 +484,9 @@ Slave_worker *get_slave_worker(const cha
 
   DBUG_ENTER("get_slave_worker");
 
+  DBUG_ASSERT(!rli->last_assigned_worker ||
+              rli->last_assigned_worker == last_worker);
+
   if (!inited_hash_workers)
     DBUG_RETURN(NULL);
 
@@ -495,7 +506,7 @@ Slave_worker *get_slave_worker(const cha
       if (strncmp(entry->db, const_cast<char*>(dbname), dblength) == 0)
       {
         *ptr_entry= entry;
-        DBUG_RETURN(rli->last_assigned_worker);
+        DBUG_RETURN(last_worker);
       }
   }
 
@@ -543,8 +554,8 @@ Slave_worker *get_slave_worker(const cha
       Unless \exists the last assigned Worker, get a free worker based
       on a policy described in the function get_least_occupied_worker().
     */
-    entry->worker= !rli->last_assigned_worker ?
-      get_least_occupied_worker(workers) : rli->last_assigned_worker;
+    entry->worker= (!last_worker) ?
+      get_least_occupied_worker(workers) : last_worker;
     entry->worker->usage_partition++;
 
     mysql_mutex_lock(&slave_worker_hash_lock);
@@ -584,13 +595,12 @@ Slave_worker *get_slave_worker(const cha
     /* There is a record. Either  */
     if (entry->usage == 0)
     {
-      entry->worker= !rli->last_assigned_worker ? 
-        get_least_occupied_worker(workers) : rli->last_assigned_worker;
+      entry->worker= (!last_worker) ? 
+        get_least_occupied_worker(workers) : last_worker;
       entry->worker->usage_partition++;
       entry->usage++;
     }
-    else if (entry->worker == rli->last_assigned_worker ||
-             !rli->last_assigned_worker)
+    else if (entry->worker == last_worker || !last_worker)
     {
 
       DBUG_ASSERT(entry->worker);
@@ -609,11 +619,11 @@ Slave_worker *get_slave_worker(const cha
       char wait_info[sizeof(info_format) + 4*sizeof(entry->worker->id) +
                      NAME_LEN + 1];
 
-      DBUG_ASSERT(rli->last_assigned_worker != NULL &&
+      DBUG_ASSERT(last_worker != NULL &&
                   rli->curr_group_assigned_parts.elements > 0);
 
       // future assignenment and marking at the same time
-      entry->worker= rli->last_assigned_worker;
+      entry->worker= last_worker;
 
       sprintf(wait_info, info_format, entry->worker->id, entry->db);
 
@@ -623,7 +633,7 @@ Slave_worker *get_slave_worker(const cha
       thd->exit_cond(proc_info);
       mysql_mutex_lock(&slave_worker_hash_lock);
 
-      DBUG_ASSERT(entry->usage == 0);
+      DBUG_ASSERT(entry->usage == 0 || thd->killed);
 
       entry->usage= 1;
       entry->worker->usage_partition++;
@@ -631,7 +641,7 @@ Slave_worker *get_slave_worker(const cha
   }
 
   /* 
-     relocation belonging to db temp tables from C to W via entry
+     relocation belonging to db temporary tables from C to W via entry
   */
   if (entry->usage == 1 && need_temp_tables)
   {
@@ -651,7 +661,6 @@ Slave_worker *get_slave_worker(const cha
     else
     {
       // all entries must have been emptied from temps by the caller
-      DBUG_ASSERT(entry->db_len != 0);
 
       for (TABLE *table= thd->temporary_tables; table; table= table->next)
       {
@@ -683,7 +692,7 @@ err:
 */
 Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *ws)
 {
-  ulong usage= ULONG_MAX;
+  long usage= LONG_MAX;
   Slave_worker **ptr_current_worker= NULL, *worker= NULL;
   ulong i= 0;
 
@@ -757,29 +766,32 @@ void Slave_worker::slave_worker_ends_gro
   */
   DYNAMIC_ARRAY *ep= &curr_group_exec_parts;
 
-  for (int i= ep->elements; i > 0; i--)
+  for (uint i= 0; i < ep->elements; i++)
   {
     db_worker_hash_entry *entry=
-      (* (db_worker_hash_entry **) dynamic_array_ptr(ep, i - 1));
+      *((db_worker_hash_entry **) dynamic_array_ptr(ep, i));
+
     mysql_mutex_lock(&slave_worker_hash_lock);
 
-    DBUG_ASSERT(entry && entry->usage != 0);
+    DBUG_ASSERT(entry);
+
+    entry->usage --;
 
-    entry->usage--;
+    DBUG_ASSERT(entry->usage >= 0);
 
     if (entry->usage == 0)
     {
+      usage_partition--;
       /*
         The detached entry's temp table list, possibly updated, remains 
         with the entry at least until time Coordinator will deallocate it 
         from the hash, that is either due to stop or extra size of the hash.
       */
-
+      DBUG_ASSERT(usage_partition >= 0);
       DBUG_ASSERT(this->info_thd->temporary_tables == 0);
       DBUG_ASSERT(!entry->temporary_tables ||
                   !entry->temporary_tables->prev);
 
-      usage_partition--;
       if (entry->worker != this) // Coordinator is waiting
       {
 #ifndef DBUG_OFF
@@ -792,9 +804,17 @@ void Slave_worker::slave_worker_ends_gro
       DBUG_ASSERT(usage_partition != 0);
 
     mysql_mutex_unlock(&slave_worker_hash_lock);
+  }
 
-    delete_dynamic_element(ep, i - 1);
+  if (ep->elements > ep->max_element)
+  {
+    // reallocate to lessen mem
+    ep->elements= ep->max_element;
+    ep->max_element= 0;
+    freeze_size(ep); // restores max_element
   }
+  ep->elements= 0;
+
   curr_group_seen_begin= FALSE;
 }
 
@@ -1052,6 +1072,7 @@ void Slave_worker::report(loglevel level
    for the assigned to Workers tasks to be completed and their
    resources such as temporary tables be returned to Coordinator's
    repository.
+   In case all workers are waited Coordinator changes its group status.
 
    @param  rli     Relay_log_info instance of Coordinator
    @param  ignore  Optional Worker instance pointer if the sequential context
@@ -1114,5 +1135,9 @@ int wait_for_workers_to_finish(Relay_log
     mts_move_temp_tables_to_thd(thd, entry->temporary_tables);
     entry->temporary_tables= NULL;
   }
+
+  if (!ignore)
+    const_cast<Relay_log_info*>(rli)->mts_group_status= Relay_log_info::MTS_NOT_IN_GROUP;
+
   return ret;
 }

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2011-05-30 10:05:07 +0000
+++ b/sql/rpl_rli_pdb.h	2011-06-05 17:01:51 +0000
@@ -13,7 +13,7 @@ typedef struct st_db_worker_hash_entry
   uint  db_len;
   const char *db;
   Slave_worker *worker;
-  ulong usage;
+  long usage;
   /*
     The list of temp tables belonging to @ db database is
     attached to an assigned @c worker to become its thd->temporary_tables.
@@ -34,7 +34,7 @@ bool init_hash_workers(ulong slave_paral
 void destroy_hash_workers(Relay_log_info*);
 Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli,
                                db_worker_hash_entry **ptr_entry,
-                               bool need_temp_tables);
+                               bool need_temp_tables, Slave_worker *w);
 Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
 int wait_for_workers_to_finish(Relay_log_info const *rli,
                                Slave_worker *ignore= NULL);
@@ -262,7 +262,7 @@ public:
   ulong stmt_jobs;  // how many jobs per stmt
   ulong trans_jobs;  // how many jobs per trns
   volatile int curr_jobs; // the current assignments
-  ulong usage_partition; // number of different partitions handled by this worker
+  long usage_partition; // number of different partitions handled by this worker
   volatile bool relay_log_change_notified; // Coord sets and resets, W can read
   volatile bool checkpoint_notified; // Coord sets and resets, W can read
   bool wq_overrun_set;  // W monitors its queue usage to incr/decr rli->mts_wqs_overrun
@@ -279,7 +279,13 @@ public:
   ulong checkpoint_seqno;
   MY_BITMAP group_execed;
   bool inited_group_execed;
-  volatile bool  running_status; // TRUE when Worker is read-exec loop
+  enum en_running_state
+  {
+    NOT_RUNNING= 0,
+    RUNNING= 1,
+    KILLED
+  };
+  en_running_state volatile running_status;
   Log_event *last_event;
 
   int init_info();

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2011-05-30 10:05:07 +0000
+++ b/sql/rpl_slave.cc	2011-06-05 17:01:51 +0000
@@ -1079,7 +1079,7 @@ static bool sql_slave_killed(THD* thd, R
   DBUG_ENTER("sql_slave_killed");
 
   DBUG_ASSERT(rli->info_thd == thd);
-  DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
+  DBUG_ASSERT(rli->slave_running == 1);
   if (abort_loop || thd->killed || rli->abort_slave)
   {
     /*
@@ -2837,12 +2837,11 @@ int apply_event_and_update_pos(Log_event
       {
         Slave_job_item item= {ev}, *job_item= &item;
         Slave_worker *w= (Slave_worker *) ev->worker;
-        bool need_sync=
-          (ev->mts_number_dbs() == OVER_MAX_DBS_IN_EVENT_MTS) &&
-          ev->get_mts_event_ends_group();
+        // specially marked end of B-less group event requires sync with worker
+        bool need_sync= ev->mts_is_event_isolated();
 
-        DBUG_ASSERT(!(ev->ends_group() || !rli->curr_group_seen_begin) ||
-                    ((Slave_worker*) ev->worker) == rli->last_assigned_worker);
+        // all events except BEGIN-query must be marked with a non-NULL Worker
+        DBUG_ASSERT(((Slave_worker*) ev->worker) == rli->last_assigned_worker);
 
         DBUG_PRINT("Log_event::apply_event:", ("-> job item data %p to W_%lu", job_item->data, w->id));
 
@@ -2873,9 +2872,6 @@ int apply_event_and_update_pos(Log_event
           if (rli->curr_group_da.elements > rli->curr_group_da.max_element)
           {
             // reallocate to less mem
-            
-            DBUG_ASSERT(rli->curr_group_da.max_element < rli->curr_group_da.elements);
-          
             rli->curr_group_da.elements= rli->curr_group_da.max_element;
             rli->curr_group_da.max_element= 0;
             freeze_size(&rli->curr_group_da); // restores max_element
@@ -2891,9 +2887,12 @@ int apply_event_and_update_pos(Log_event
         {
           /*
             combination of over-max db:s and end of the current group
-            forces to wait for the group completion by the assigned worker.
+            forces waiting for completion of the assigned groups by the
+            Worker assigned to the event.
+            Indeed MTS group status can be safely set to MTS_NOT_IN_GROUP
+            after wait_for_workers_to_finish() returns.
           */
-          (void) wait_for_workers_to_finish(rli, w);
+          (void) wait_for_workers_to_finish(rli);
         }
 
       }
@@ -2961,7 +2960,7 @@ int apply_event_and_update_pos(Log_event
     }
     else
     {
-      DBUG_ASSERT(rli->is_parallel_exec());
+      DBUG_ASSERT(*ptr_ev == ev || rli->is_parallel_exec());
       /* 
          event_relay_log_pos is an anchor to possible reading restart.
          It may become lt than group_* value.
@@ -3036,7 +3035,7 @@ static int exec_relay_log_event(THD* thd
    */
   mysql_mutex_lock(&rli->data_lock);
 
-  Log_event *ev = next_event(rli), **ptr_ev= &ev;
+  Log_event *ev = next_event(rli), **ptr_ev;
 
   DBUG_ASSERT(rli->info_thd==thd);
 
@@ -3050,6 +3049,7 @@ static int exec_relay_log_event(THD* thd
   {
     int exec_res;
 
+    ptr_ev= &ev;
     /*
       Even if we don't execute this event, we keep the master timestamp,
       so that seconds behind master shows correct delta (there are events
@@ -3787,14 +3787,14 @@ pthread_handler_t handle_slave_worker(vo
   mysql_mutex_unlock(&LOCK_thread_count);
 
   mysql_mutex_lock(&w->jobs_lock);
-  w->running_status= TRUE;           // ready for duty
+  w->running_status= Slave_worker::RUNNING;
   mysql_cond_signal(&w->jobs_cond);
 
   mysql_mutex_unlock(&w->jobs_lock);
 
   DBUG_ASSERT(thd->is_slave_error == 0);
 
-  while (!thd->killed && !error)
+  while (!error)
   {
       error= slave_worker_exec_job(w, rli);
   }
@@ -3805,7 +3805,6 @@ pthread_handler_t handle_slave_worker(vo
     mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
     rli->info_thd->awake(THD::KILL_QUERY);          // notify Crdn
     mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
-    thd->clear_error();
   }
 
   mysql_mutex_lock(&w->jobs_lock);
@@ -3831,7 +3830,7 @@ pthread_handler_t handle_slave_worker(vo
 
   mysql_mutex_lock(&w->jobs_lock);
 
-  w->running_status= 0;
+  w->running_status= Slave_worker::NOT_RUNNING;
   sql_print_information("Worker %lu statistics: "
                         "events processed = %lu "
                         "hungry waits = %lu "
@@ -3845,6 +3844,7 @@ err:
 
   if (thd)
   {
+    thd->clear_error();
     mysql_mutex_lock(&LOCK_thread_count);
     THD_CHECK_SENTRY(thd);
     /*
@@ -4039,6 +4039,11 @@ err:
    Processing rli->gaq to find out the low-water-mark coordinates
    stored into the cental recovery table.
 
+   @param rli    pointer to Relay-log-info of Coordinator
+   @param period period of processing GAQ, normally derived from 
+                 @c mts_checkpoint_period
+   @param force  if TRUE then hang in a loop till some progress is made
+   @param locked TRUE if rli->data_lock mutex is acquired by the caller.
 
    @return FALSE success, TRUE otherwise
 */
@@ -4199,7 +4204,7 @@ int slave_start_single_worker(Relay_log_
   }
   
   mysql_mutex_lock(&w->jobs_lock);
-  if (!w->running_status)
+  if (w->running_status == Slave_worker::NOT_RUNNING)
     mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
   mysql_mutex_unlock(&w->jobs_lock);
   // Least occupied inited with zero
@@ -4256,6 +4261,7 @@ int slave_start_workers(Relay_log_info *
   rli->mts_total_groups= 0;
   rli->curr_group_seen_begin= FALSE;
   rli->checkpoint_seqno= 0;
+  rli->mts_group_status= Relay_log_info::MTS_NOT_IN_GROUP;
   /*
     dyn memory to consume by Coordinator per event
   */
@@ -4299,6 +4305,12 @@ void slave_stop_workers(Relay_log_info *
   if (rli->slave_parallel_workers == 0) 
     return;
   
+  /*
+    This is the soft stop. For the wait to be successful, Coordinator
+    needs (*TODO*) to guarantee that Workers were assigned full groups.
+  */
+  // (void) wait_for_workers_to_finish(rli);
+
   for (i= rli->workers.elements - 1; i >= 0; i--)
   {
     Slave_worker *w;
@@ -4306,19 +4318,21 @@ void slave_stop_workers(Relay_log_info *
     
     mysql_mutex_lock(&w->jobs_lock);
     
-    if (!w->running_status)
+    if (w->running_status != Slave_worker::RUNNING)
     {
       mysql_mutex_unlock(&w->jobs_lock);
       continue;
     }
+
+    w->running_status= Slave_worker::KILLED;
+    mysql_cond_signal(&w->jobs_cond);
+
     mysql_mutex_unlock(&w->jobs_lock);
-    sql_print_information("Notifying Worker %lu to exit", w->id);
-    
-    mysql_mutex_lock(&w->info_thd->LOCK_thd_data);
-    w->info_thd->awake(THD::KILL_QUERY);
-    mysql_mutex_unlock(&w->info_thd->LOCK_thd_data);
+
+    sql_print_information("Notifying Worker %lu to exit, thd %p", w->id,
+                          w->info_thd);
   }
-  
+
   thd_proc_info(thd, "Waiting for workers to exit");
 
   for (i= rli->workers.elements - 1; i >= 0; i--)
@@ -4327,9 +4341,12 @@ void slave_stop_workers(Relay_log_info *
     get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, i);
 
     mysql_mutex_lock(&w->jobs_lock);
-    while (w->running_status)
+    while (w->running_status != Slave_worker::NOT_RUNNING)
     {
       const char *save_proc_info;
+
+      DBUG_ASSERT(w->running_status == Slave_worker::KILLED);
+
       save_proc_info= thd->enter_cond(&w->jobs_cond, &w->jobs_lock,
                                       "Waiting for workers to exit");
       mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
@@ -4358,6 +4375,7 @@ void slave_stop_workers(Relay_log_info *
   DBUG_ASSERT(rli->pending_jobs == 0);
   DBUG_ASSERT(rli->mts_pending_jobs_size == 0);
 
+  rli->mts_group_status= Relay_log_info::MTS_NOT_IN_GROUP;
   destroy_hash_workers(rli);
   delete rli->gaq;
   delete_dynamic(&rli->least_occupied_workers);    // least occupied


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110605170151-eo9khhhrzf9op05z.bundle
Thread
bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3281) WL#5569Andrei Elkin6 Jun