List: Commits
From: Andrei Elkin
Date: June 8 2011 8:18pm
Subject: bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3283) WL#5569
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3283 Andrei Elkin	2011-06-08
      wl#5569 MTS
      
      The following has been done with this patch:
      Some cleanup;
      A skeleton of a fix for a recovery flaw that was discussed with Alfranio;
      Fixes for Blackhole (reviewer be warned!);
      Finalization of making STOP SLAVE end MTS in a consistent state;
      Covering old LOAD DATA related events, with possible erroring out;
      Fixing circular replication. The failure was due to the Rotate event that
      substitutes an ignored event not being treated as one that changes the
      group_master_log coordinates (and therefore triggers an MTS checkpoint).
      Three tests were blocked from MTS execution, with comments explaining why.
        Some of them might be converted to be MTS-friendly though.
     @ mysql-test/suite/rpl/t/rpl_bug26395.test
        Making the test execute only in STS (single-threaded slave) mode.
     @ mysql-test/suite/rpl/t/rpl_cross_version.test
        Making the test execute only in STS (single-threaded slave) mode.
     @ mysql-test/suite/rpl/t/rpl_packet.test
        Making the test execute only in STS (single-threaded slave) mode.
     @ mysql-test/suite/rpl/t/rpl_slave_grp_exec.test
        The test can run in both MTS and STS modes, but it required
        changes because MTS may be stopped with its own error,
        different from the one a failing STS expects.
        
        Another issue, *not sorted out yet*, is that this test requires
        recovery to work flawlessly. Leaving further care of it to Alfranio.
     @ sql/binlog.cc
        Error constant names changed.
     @ sql/log_event.cc
        Initialization of the relocated member Log_event::m_mts_event_ends_group;
        APPEND_BLOCK_EVENT will be accepted but never assigned, because the following
        *old* server EXEC_LOAD_EVENT can't be supported yet.
        See rpl_cross_version.test for an example of how MTS could fail with the ER_MTS_CANT_PARALLEL error.
        Part of the skeleton patch for recovery: rli->reset_notified_checkpoint(0).
        Also, making the server_id=0 Rotate that substitutes an ignored event enter the checkpoint branch
        by modifying the if condition.
     @ sql/log_event.h
        Some cleanup to shift a piece of definitions to a more appropriate position;
        Generalizing the special marking for a group that is not started with BEGIN
        and contains multiple events. Although such a group deals with Query_log_event,
        the bool member is moved to the base class and renamed: Log_event::m_mts_event_ends_group.
     @ sql/rpl_rli.cc
        Part of the skeleton patch for recovery: the added if (w->checkpoint_notified) branch etc.
     @ sql/rpl_rli.h
        reset_notified_checkpoint() now accepts an argument: the number of bits by which
        to shift the bitmap of every worker.
     @ sql/rpl_rli_pdb.cc
        Just a TODO, to be converted into a piece of code for recovery.
     @ sql/rpl_rli_pdb.h
        Recovery related new members are added.
     @ sql/rpl_slave.cc
        Comments and warnings related to the handling of a stopped MTS.
     @ sql/share/errmsg-utf8.txt
        Error constant names are edited;
        a new error is added.
     @ storage/blackhole/ha_blackhole.cc
        Blackhole's rule for slave thread is extended to allow MTS.
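
The recovery-skeleton notes for sql/rpl_rli.cc and sql/rpl_rli.h above can be read as a small accumulate-and-hand-over protocol. The following is only a sketch under assumptions: Worker is a hypothetical, stripped-down stand-in for Slave_worker, and notify_worker() is an illustrative helper that imitates the two assignments added to Log_event::get_slave_worker_id(); the condition guarding that branch is not visible in this patch.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for Slave_worker: only the two fields that the
// patched reset_notified_checkpoint() touches.
struct Worker {
  bool checkpoint_notified = false;
  unsigned long bitmap_shifted = 0;
};

// Follows the loop body of the patched
// Relay_log_info::reset_notified_checkpoint(ulong shift).
void reset_notified_checkpoint(std::vector<Worker> &workers,
                               unsigned long shift) {
  for (Worker &w : workers) {
    if (w.checkpoint_notified) {
      // The worker has already been told about the previous checkpoint,
      // so the shift accumulated before it is no longer pending.
      w.bitmap_shifted = 0;
      w.checkpoint_notified = false;
    }
    // Accumulate until the value is passed to the worker with its next group.
    w.bitmap_shifted += shift;
  }
}

// Illustrative helper (an assumption, not a function of the patch): the
// Coordinator hands the accumulated shift over and marks the worker notified.
unsigned long notify_worker(Worker &w) {
  unsigned long shifted = w.bitmap_shifted;
  w.bitmap_shifted = 0;
  w.checkpoint_notified = true;
  return shifted;
}
```

A worker that receives no group between two checkpoints keeps accumulating the shift, while a notified worker starts again from the shift of the newest checkpoint only.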

    modified:
      mysql-test/suite/rpl/t/rpl_bug26395.test
      mysql-test/suite/rpl/t/rpl_cross_version.test
      mysql-test/suite/rpl/t/rpl_packet.test
      mysql-test/suite/rpl/t/rpl_slave_grp_exec.test
      sql/binlog.cc
      sql/log_event.cc
      sql/log_event.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/share/errmsg-utf8.txt
      storage/blackhole/ha_blackhole.cc
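
The circular-replication fix in Rotate_log_event::do_update_pos() comes down to replacing an equality test with an inequality test on the MTS group status. A minimal sketch of the difference follows, under stated assumptions: only MTS_NOT_IN_GROUP, MTS_IN_GROUP and MTS_KILLED_GROUP appear in this patch, MTS_END_GROUP is assumed here for illustration, the helper names are hypothetical, and the server_id and is_relay_log_event() parts of the real condition are left out.

```cpp
#include <cassert>

// Assumed set of states; MTS_END_GROUP does not appear in this patch.
enum MtsGroupStatus {
  MTS_NOT_IN_GROUP,
  MTS_IN_GROUP,
  MTS_END_GROUP,
  MTS_KILLED_GROUP
};

// Condition before the patch: in parallel mode only the exact
// MTS_NOT_IN_GROUP state lets Rotate enter the checkpoint branch.
bool rotate_updates_pos_old(bool parallel, bool in_group, MtsGroupStatus s) {
  return (!parallel && !in_group) || s == MTS_NOT_IN_GROUP;
}

// Condition after the patch: any state other than MTS_IN_GROUP qualifies.
bool rotate_updates_pos_new(bool parallel, bool in_group, MtsGroupStatus s) {
  return (!parallel && !in_group) || s != MTS_IN_GROUP;
}
```

Under these assumptions the two predicates agree for MTS_NOT_IN_GROUP and MTS_IN_GROUP and differ only for the remaining states, which is where the substituting Rotate previously skipped the update of the group_master_log coordinates.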
=== modified file 'mysql-test/suite/rpl/t/rpl_bug26395.test'
--- a/mysql-test/suite/rpl/t/rpl_bug26395.test	2010-12-19 17:07:28 +0000
+++ b/mysql-test/suite/rpl/t/rpl_bug26395.test	2011-06-08 20:18:08 +0000
@@ -37,6 +37,10 @@ source include/have_innodb.inc;
 source include/have_debug.inc;
 source include/master-slave.inc;
 
+# The test simulates an incomplete transaction, which MTS does not tolerate
+# when it is stopped. So MTS reacts with an error whereas the single-threaded slave is fine.
+-- source include/not_mts_slave_parallel_workers.inc
+
 
 --echo ==== Initialize ====
 
@@ -67,7 +71,6 @@ source include/sync_slave_io_with_master
 # Sync slave's SQL thread.
 sync_with_master 0;
 
-
 --echo ==== Verify results on slave ====
 
 source include/stop_slave.inc;

=== modified file 'mysql-test/suite/rpl/t/rpl_cross_version.test'
--- a/mysql-test/suite/rpl/t/rpl_cross_version.test	2010-12-19 17:15:12 +0000
+++ b/mysql-test/suite/rpl/t/rpl_cross_version.test	2011-06-08 20:18:08 +0000
@@ -17,6 +17,9 @@
 # Todo: release it from not_windows
 --source include/not_windows.inc
 
+# EXEC_LOAD_EVENT of 4.1 binlog can't be supported
+-- source include/not_mts_slave_parallel_workers.inc
+
 #
 # Bug#31240 load data infile replication between (4.0 or 4.1) and 5.1 fails
 #

=== modified file 'mysql-test/suite/rpl/t/rpl_packet.test'
--- a/mysql-test/suite/rpl/t/rpl_packet.test	2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/t/rpl_packet.test	2011-06-08 20:18:08 +0000
@@ -11,6 +11,13 @@
 # max-out size db name 
 source include/master-slave.inc;
 source include/have_binlog_format_row.inc;
+
+# TODO: Fixing is handed over to Sergei.
+# The test runs slowly in MTS mode because of the state of MTS at the time of the graceful stop.
+# In this case MTS can't stop immediately if there is a Worker that received a BEGIN but never a COMMIT.
+-- source include/not_mts_slave_parallel_workers.inc
+
+
 call mtr.add_suppression("Slave I/O: Got a packet bigger than 'max_allowed_packet' bytes, Error_code: 1153");
 call mtr.add_suppression("Slave I/O: Got fatal error 1236 from master when reading data from binary log:");
 
@@ -283,6 +290,7 @@ eval SET @@global.max_allowed_packet= $o
 DROP TABLE t1;
 
 # Clear Last_IO_Error
+
 --source include/stop_slave_sql.inc
 RESET SLAVE;
 

=== modified file 'mysql-test/suite/rpl/t/rpl_slave_grp_exec.test'
--- a/mysql-test/suite/rpl/t/rpl_slave_grp_exec.test	2010-12-19 17:07:28 +0000
+++ b/mysql-test/suite/rpl/t/rpl_slave_grp_exec.test	2011-06-08 20:18:08 +0000
@@ -63,8 +63,20 @@ SELECT * FROM t3 ORDER BY a;
 
 --connection slave
 # 1146 = ER_NO_SUCH_TABLE
---let $slave_sql_errno= 1146
---source include/wait_for_slave_sql_error.inc
+# in MTS case error is either of two:
+#--let $slave_sql_errno= 1146,1593
+# whereas in the single-threaded case:
+#--let $slave_sql_errno= 1146
+
+--source include/wait_for_slave_sql_to_stop.inc
+let $slave_sql_errno= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1);
+if (`select $slave_sql_errno != 1146 and $slave_sql_errno != 1593`)
+{
+    --echo Unexpected error: $slave_sql_errno
+    --die
+}
+
+
 SHOW TABLES LIKE 't%';
 if (`SELECT @@BINLOG_FORMAT = 'ROW'`) {
 --replace_regex /AA/AA_for_row_or_XX_for_stmt_mixed/
@@ -110,8 +122,18 @@ UPDATE t1 SET b = 'X' WHERE a = 2;
 
 --connection slave
 # 1146 = ER_NO_SUCH_TABLE
---let $slave_sql_errno= 1146
---source include/wait_for_slave_sql_error.inc
+# in MTS case error is either of two:
+#--let $slave_sql_errno= 1146,1593
+# whereas in the single-threaded case:
+#--let $slave_sql_errno= 1146
+
+--source include/wait_for_slave_sql_to_stop.inc
+let $slave_sql_errno= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1);
+if (`select $slave_sql_errno != 1146 and $slave_sql_errno != 1593`)
+{
+    --echo Unexpected error: $slave_sql_errno
+    --die
+}
 
 --connection master
 SELECT * FROM t1 ORDER BY a;
@@ -125,6 +147,8 @@ SELECT * FROM t2 ORDER BY a;
 
 --source include/stop_slave_io.inc
 RENAME TABLE t3_bak TO t3;
+
+# TODO: recovery. Alfranio, it fails to recover here.
 --source include/start_slave.inc
 
 --connection master
@@ -156,8 +180,19 @@ COMMIT;
 
 --connection slave
 # 1146 = ER_NO_SUCH_TABLE
---let $slave_sql_errno= 1146
---source include/wait_for_slave_sql_error.inc
+# in MTS case error is either of two:
+#--let $slave_sql_errno= 1146,1593
+# whereas in the single-threaded case:
+#--let $slave_sql_errno= 1146
+
+--source include/wait_for_slave_sql_to_stop.inc
+let $slave_sql_errno= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1);
+if (`select $slave_sql_errno != 1146 and $slave_sql_errno != 1593`)
+{
+    --echo Unexpected error: $slave_sql_errno
+    --die
+}
+
 
 --connection master
 SELECT * FROM t1 ORDER BY a;

=== modified file 'sql/binlog.cc'
--- a/sql/binlog.cc	2011-05-24 14:29:35 +0000
+++ b/sql/binlog.cc	2011-06-08 20:18:08 +0000
@@ -4550,8 +4550,8 @@ THD::add_to_binlog_updated_dbs(const cha
   if (binlog_accessed_db_names->elements >  MAX_DBS_IN_EVENT_MTS)
   {
     push_warning_printf(this, MYSQL_ERROR::WARN_LEVEL_WARN,
-                        ER_UPDATED_DBS_GREATER_MAX,
-                        ER(ER_UPDATED_DBS_GREATER_MAX),
+                        ER_MTS_UPDATED_DBS_GREATER_MAX,
+                        ER(ER_MTS_UPDATED_DBS_GREATER_MAX),
                         MAX_DBS_IN_EVENT_MTS);
     return;
   }

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-06-06 10:51:19 +0000
+++ b/sql/log_event.cc	2011-06-08 20:18:08 +0000
@@ -675,7 +675,7 @@ Log_event::Log_event(THD* thd_arg, uint1
 
 Log_event::Log_event()
   :temp_buf(0), exec_time(0), flags(0),  crc(0), thd(0),
-   checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
+   checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), m_mts_event_ends_group(FALSE)
 {
   server_id=	::server_id;
   /*
@@ -695,7 +695,7 @@ Log_event::Log_event()
 Log_event::Log_event(const char* buf,
                      const Format_description_log_event* description_event)
   :temp_buf(0), exec_time(0), cache_type(Log_event::EVENT_INVALID_CACHE),
-    crc(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
+   crc(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), m_mts_event_ends_group(FALSE)
 {
 #ifndef MYSQL_CLIENT
   thd = 0;
@@ -2359,7 +2359,7 @@ Log_event::continue_group(Relay_log_info
 
 bool Log_event::contains_partition_info()
 {
-  return get_type_code() == TABLE_MAP_EVENT ||
+  return (get_type_code() == TABLE_MAP_EVENT) ||
     (get_type_code() == QUERY_EVENT && !ends_group() && !starts_group()) ||
     (get_type_code() == EXECUTE_LOAD_QUERY_EVENT);
 }
@@ -2537,7 +2537,7 @@ Slave_worker *Log_event::get_slave_worke
       DBUG_ASSERT(rli->curr_group_assigned_parts.elements > 0 ||
                   ret_worker->id == 0);
     }
-    else // int_, rand_, user_ var:s
+    else // int_, rand_, user_ var:s, load-data events
     {
       Log_event *ptr_curr_ev= this;
 
@@ -2545,7 +2545,8 @@ Slave_worker *Log_event::get_slave_worke
                   get_type_code() == RAND_EVENT ||
                   get_type_code() == USER_VAR_EVENT ||
                   get_type_code() == ROWS_QUERY_LOG_EVENT ||
-                  get_type_code() == BEGIN_LOAD_QUERY_EVENT);
+                  get_type_code() == BEGIN_LOAD_QUERY_EVENT ||
+                  get_type_code() == APPEND_BLOCK_EVENT);
 
       insert_dynamic(&rli->curr_group_da, (uchar*) &ptr_curr_ev);
       
@@ -2629,6 +2630,8 @@ Slave_worker *Log_event::get_slave_worke
       strcpy(ptr_g->checkpoint_relay_log_name,
              rli->get_group_relay_log_name());
       ptr_g->checkpoint_relay_log_pos= rli->get_group_relay_log_pos();
+      ptr_g->shifted= ret_worker->bitmap_shifted;
+      ret_worker->bitmap_shifted= 0;
       ret_worker->checkpoint_notified= TRUE;
     }
     ptr_g->checkpoint_seqno= rli->checkpoint_seqno;
@@ -2878,13 +2881,15 @@ int Log_event::apply_event(Relay_log_inf
              wrappped with BEGIN/COMMIT or preceeded by User|Int|Random- var.
              MTS has to stop to suggest restart in the permanent sequential mode.
           */
+          rli->report(ERROR_LEVEL, ER_MTS_CANT_PARALLEL,
+                      ER(ER_MTS_CANT_PARALLEL),
+                      get_type_code(), c_rli->get_event_relay_log_name(),
+                      c_rli->get_event_relay_log_pos());
+          
+          /* Coordinator can't continue, so it marks the MTS group status accordingly */
+          c_rli->mts_group_status= Relay_log_info::MTS_KILLED_GROUP;
 
-          // TODO: improve err msg
-          rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
-                      ER(ER_SLAVE_FATAL_ERROR),
-                      "Can't execute all binlog event in parallel mode");
-
-          // destroy possible buffered events of the current group prior to exit
+          /* destroy deferred events */
           for (uint k= 0; k < rli->curr_group_da.elements; k++)
           { 
             delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
@@ -3760,7 +3765,7 @@ Query_log_event::Query_log_event(const c
    auto_increment_increment(1), auto_increment_offset(1),
    time_zone_len(0), lc_time_names_number(0), charset_database_number(0),
    table_map_for_update(0), master_data_written(0),
-   mts_accessed_dbs(OVER_MAX_DBS_IN_EVENT_MTS), m_mts_query_ends_group(FALSE)
+   mts_accessed_dbs(OVER_MAX_DBS_IN_EVENT_MTS)
 {
   ulong data_len;
   uint32 tmp;
@@ -6507,7 +6512,7 @@ int Rotate_log_event::do_update_pos(Rela
   if ((server_id != ::server_id || rli->replicate_same_server_id) &&
       !is_relay_log_event() &&
       ((!rli->is_parallel_exec() && !rli->is_in_group()) ||
-       rli->mts_group_status == Relay_log_info::MTS_NOT_IN_GROUP))
+       rli->mts_group_status != Relay_log_info::MTS_IN_GROUP))
   {
     mysql_mutex_lock(&rli->data_lock);
     DBUG_PRINT("info", ("old group_master_log_name: '%s'  "
@@ -6532,7 +6537,7 @@ int Rotate_log_event::do_update_pos(Rela
     mysql_mutex_unlock(&rli->data_lock);
     rli->flush_info(TRUE);  // todo: error branch
     if (rli->is_parallel_exec())
-      rli->reset_notified_checkpoint();
+      rli->reset_notified_checkpoint(0);
 
     /*
       Reset thd->variables.option_bits and sql_mode etc, because this could be the signal of

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2011-06-05 17:01:51 +0000
+++ b/sql/log_event.h	2011-06-08 20:18:08 +0000
@@ -1099,37 +1099,6 @@ public:
   {
     return thd ? thd->db : 0;
   }
-
-  /*
-    The method returns a list of updated by the event databases.
-    Other than in the case of Query-log-event the list is just one item.
-  */
-  virtual List<char>* mts_get_dbs(MEM_ROOT *mem_root)
-  {
-    List<char> *res= new List<char>;
-    res->push_back(strdup_root(mem_root, get_db()));
-    return res;
-  }
-
-  /*
-    returns the number of updated by the event databases.
-    In other than Query-log-event case that's one.
-  */
-  virtual uint8 mts_number_dbs() { return 1; }
-
-  /*
-    Event can be exceptionally marked to force its execution.
-    in isolation from any other Workers.
-    Other than Query-log-event class should not have any implementation
-    of this method.
-  */
-  virtual void mts_do_isolate_event() { DBUG_ASSERT(0); }
-
-  /*
-    Verifying whether event is marked to execute in isolation.
-  */
-  virtual bool mts_is_event_isolated() { return FALSE; }
-
 #else
   Log_event() : temp_buf(0) {}
     /* avoid having to link mysqlbinlog against libpthread */
@@ -1257,6 +1226,10 @@ public:
      @note There are incompatile combinations such the referred event
            is wrapped with BEGIN/COMMIT. Such cases should be identified
            by the caller and treates as an error.
+
+           Notice, even though the func returns TRUE, some events
+           like old LOAD-DATA rooted EXEC_LOAD_EVENT can't run even
+           in isolated parallel mode and MTS would have to stop.
      
      @return TRUE if despite permanent parallel execution mode an event
                   needs applying in a real isolation that is sequentially.
@@ -1270,11 +1243,9 @@ public:
       get_type_code() == LOAD_EVENT              ||
       get_type_code() == SLAVE_EVENT             ||
       get_type_code() == CREATE_FILE_EVENT       ||
-      get_type_code() == APPEND_BLOCK_EVENT      ||
-      get_type_code() == EXEC_LOAD_EVENT         ||
       get_type_code() == DELETE_FILE_EVENT       ||
       get_type_code() == NEW_LOAD_EVENT          ||
-
+      get_type_code() == EXEC_LOAD_EVENT         ||
       get_type_code() == FORMAT_DESCRIPTION_EVENT||
 
       get_type_code() == INCIDENT_EVENT;
@@ -1318,6 +1289,45 @@ public:
   */
   Slave_worker *get_slave_worker_id(Relay_log_info *rli);
 
+  /*
+    The method returns a list of updated by the event databases.
+    Other than in the case of Query-log-event the list is just one item.
+  */
+  virtual List<char>* mts_get_dbs(MEM_ROOT *mem_root)
+  {
+    List<char> *res= new List<char>;
+    res->push_back(strdup_root(mem_root, get_db()));
+    return res;
+  }
+
+  /*
+    returns the number of updated by the event databases.
+    In other than Query-log-event case that's one.
+  */
+  virtual uint8 mts_number_dbs() { return 1; }
+
+  /*
+    Event can be exceptionally marked to force its execution
+    in isolation from any other Workers.
+    Other than Query-log-event class should not have any implementation
+    of this method.
+  */
+  /*
+    Event can be identified as a group terminator, and that fact
+    is memorized by the function.
+  */
+  virtual void mts_do_isolate_event()
+  { 
+    DBUG_ASSERT(get_type_code() == QUERY_EVENT ||
+                get_type_code() == EXEC_LOAD_EVENT ||
+                get_type_code() == EXECUTE_LOAD_QUERY_EVENT);
+    m_mts_event_ends_group= TRUE;
+  }
+  /*
+    Verifying whether event is marked to execute in isolation.
+  */
+  virtual bool mts_is_event_isolated() { return m_mts_event_ends_group; }
+
   /**
      Apply the event to the database.
 
@@ -1451,6 +1461,8 @@ protected:
      non-zero. The caller shall decrease the counter by one.
    */
   virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
+  
+  bool m_mts_event_ends_group;
 #endif
 };
 
@@ -1908,13 +1920,6 @@ public:
   uchar mts_accessed_dbs;
   char mts_accessed_db_names[MAX_DBS_IN_EVENT_MTS][NAME_LEN];
 
-  /*
-    Event can be indentified as a group terminator and such fact
-    is memoried by the function.
-  */
-  virtual void mts_do_isolate_event() { m_mts_query_ends_group= TRUE; }
-  virtual bool mts_is_event_isolated() { return m_mts_query_ends_group; }
-
 #ifdef MYSQL_SERVER
 
   Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length,
@@ -2024,9 +2029,6 @@ public:        /* !!! Public in this pat
       (!strncasecmp(query, STRING_WITH_LEN("ROLLBACK"))
        && strncasecmp(query, STRING_WITH_LEN("ROLLBACK TO ")));
   }
-private:
-  
-  bool m_mts_query_ends_group;
 };
 
 
@@ -3101,6 +3103,13 @@ public:
 #endif
   /* MTS executes this event sequentially */
   virtual uint8 mts_number_dbs() { return OVER_MAX_DBS_IN_EVENT_MTS; }
+  virtual List<char>* mts_get_dbs(MEM_ROOT *mem_root)
+  {
+    List<char> *res= new List<char>;
+    res->push_back(strdup_root(mem_root, ""));
+    return res;
+  }
+
 private:
 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
   virtual int do_apply_event(Relay_log_info const *rli);

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2011-06-06 10:51:19 +0000
+++ b/sql/rpl_rli.cc	2011-06-08 20:18:08 +0000
@@ -192,14 +192,19 @@ void Relay_log_info::reset_notified_rela
    to reset the current bitmap and starts using the clean bitmap
    indexed from zero of being reset checkpoint_seqno.
 */
-void Relay_log_info::reset_notified_checkpoint()
+void Relay_log_info::reset_notified_checkpoint(ulong shift)
 {
   if (!is_parallel_exec())
     return;
   for (uint i= 0; i < workers.elements; i++)
   {
     Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
-    w->checkpoint_notified= FALSE;
+    if (w->checkpoint_notified)
+    {
+      w->bitmap_shifted= 0;
+      w->checkpoint_notified= FALSE;
+    }
+    w->bitmap_shifted += shift; // to reset at passing the accumulate value into GAQ
   }
   checkpoint_seqno= 0;
 }
@@ -1036,7 +1041,7 @@ void Relay_log_info::stmt_done(my_off_t 
 
     flush_info(is_transactional() ? TRUE : FALSE); // Alfranio todo: error branch
     if (is_parallel_exec())
-      reset_notified_checkpoint();
+      reset_notified_checkpoint(0);
   }
 }
 

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2011-06-06 10:51:19 +0000
+++ b/sql/rpl_rli.h	2011-06-08 20:18:08 +0000
@@ -537,7 +537,7 @@ public:
      Coordinator notifies Workers about this event. Coordinator and Workers
      maintain a bitmap of executed group that is reset with a new checkpoint. 
   */
-  void reset_notified_checkpoint();
+  void reset_notified_checkpoint(ulong);
 
   /**
     Helper function to do after statement completion.

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2011-06-06 10:51:19 +0000
+++ b/sql/rpl_rli_pdb.cc	2011-06-08 20:18:08 +0000
@@ -225,6 +225,7 @@ bool Slave_worker::commit_positions(Log_
     my_free(ptr_g->checkpoint_relay_log_name);
     ptr_g->checkpoint_relay_log_name= NULL;
 
+    // TODO: shift `group_execed' << ptr_g->shifted
     bitmap_clear_all(&group_execed);
   }
   // extract an updated relay-log name to store in Worker's rli.

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2011-06-05 17:01:51 +0000
+++ b/sql/rpl_rli_pdb.h	2011-06-08 20:18:08 +0000
@@ -144,6 +144,7 @@ typedef struct st_slave_job_group
   my_off_t checkpoint_relay_log_pos; // T-event lop_pos filled by W for CheckPoint
   char*    checkpoint_relay_log_name;
   volatile uchar done;  // Flag raised by W,  read and reset by C
+  ulong shifted;          // shift the last CP bitmap at receiving a new CP
 } Slave_job_group;
 
 #define retrieve_job(from, to) \
@@ -265,6 +266,7 @@ public:
   long usage_partition; // number of different partitions handled by this worker
   volatile bool relay_log_change_notified; // Coord sets and resets, W can read
   volatile bool checkpoint_notified; // Coord sets and resets, W can read
+  ulong bitmap_shifted;   // shift the last bitmap at receiving new CP
   bool wq_overrun_set;  // W monitors its queue usage to incr/decr rli->mts_wqs_overrun
   /*
     We need to make this a dynamic field. /Alfranio

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2011-06-06 10:51:19 +0000
+++ b/sql/rpl_slave.cc	2011-06-08 20:18:08 +0000
@@ -1067,6 +1067,12 @@ static bool io_slave_killed(THD* thd, Ma
    In the event of deffering decision @rli->last_event_start_time waiting
    timer is set to force the killed status be accepted upon its expiration.
 
+   Notice that Multi-Threaded Slave behaves similarly: when it is being
+   stopped and the current group of assigned events has not yet been scheduled
+   completely, Coordinator defers leaving its read-distribute
+   state. The above timeout ensures the waiting won't last endlessly, and in
+   such a case an error is reported.
+
    @param thd   pointer to a THD instance
    @param rli   pointer to Relay_log_info instance
 
@@ -1139,9 +1145,14 @@ static bool sql_slave_killed(THD* thd, R
         if (ret == 0)
         {
           rli->report(WARNING_LEVEL, 0,
+                      rli->mts_group_status == Relay_log_info::MTS_NOT_IN_GROUP ?
                       "slave SQL thread is being stopped in the middle "
                       "of applying of a group having updated a non-transaction "
-                      "table; waiting for the group completion ... ");
+                      "table; waiting for the group completion ... "
+                      :
+                      "Coordinator thread of multi-threaded slave is being stopped in the middle "
+                      "of assigning a group of events; "
+                      "deferring to exit until the group completion ... ");
         }
         else
         {
@@ -1155,7 +1166,8 @@ static bool sql_slave_killed(THD* thd, R
       {
         ret= TRUE;
         rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR),
-                    msg_stopped);
+                    rli->mts_group_status == Relay_log_info::MTS_NOT_IN_GROUP ?
+                    msg_stopped : msg_stopped_mts);
       }
     }
     else
@@ -3815,6 +3827,12 @@ pthread_handler_t handle_slave_worker(vo
   {
       error= slave_worker_exec_job(w, rli);
   }
+
+  /* 
+     Cleanup after an error requires clear_error() to go first.
+     Otherwise assert(!all) fires in binlog_rollback()
+  */
+  thd->clear_error();
   w->cleanup_context(thd, error);
 
   mysql_mutex_lock(&w->jobs_lock);
@@ -3854,7 +3872,6 @@ err:
 
   if (thd)
   {
-    thd->clear_error();
     mysql_mutex_lock(&LOCK_thread_count);
     THD_CHECK_SENTRY(thd);
     /*
@@ -4136,7 +4153,7 @@ bool mts_checkpoint_routine(Relay_log_in
     } // end of commit_positions
   */
 
-  rli->reset_notified_checkpoint();
+  rli->reset_notified_checkpoint(cnt);
 
 end:
   set_timespec_nsec(rli->last_clock, 0);
@@ -4179,6 +4196,7 @@ int slave_start_single_worker(Relay_log_
   w->curr_group_exec_parts.elements= 0;
   w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
   w->checkpoint_notified= FALSE;
+  w->bitmap_shifted= 0;
   w->workers= rli->workers; // shallow copying is sufficient
   w->this_worker= w;
   w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
@@ -5928,6 +5946,9 @@ static Log_event* next_event(Relay_log_i
       rli->set_future_event_relay_log_pos(my_b_tell(cur_log));
       ev->future_event_relay_log_pos= rli->get_future_event_relay_log_pos();
 
+      if (hot_log)
+        mysql_mutex_unlock(log_lock);
+
       /* 
          MTS checkpoint in the successful read branch 
       */
@@ -5935,11 +5956,10 @@ static Log_event* next_event(Relay_log_i
       if (rli->is_parallel_exec() && (mts_checkpoint_period != 0 || force))
       {
         ulonglong period= static_cast<ulonglong>(mts_checkpoint_period * 1000000ULL);
+        mysql_mutex_unlock(&rli->data_lock);
         mts_checkpoint_routine(rli, period, force, TRUE); // TODO: ALFRANIO ERROR
+        mysql_mutex_lock(&rli->data_lock);
       }
-
-      if (hot_log)
-        mysql_mutex_unlock(log_lock);
       DBUG_RETURN(ev);
     }
     DBUG_ASSERT(thd==rli->info_thd);
@@ -6066,7 +6086,10 @@ static Log_event* next_event(Relay_log_i
 
           do
           {
+            mysql_mutex_unlock(log_lock);
             mts_checkpoint_routine(rli, period, FALSE, FALSE); // TODO: ALFRANIO ERROR
+            mysql_mutex_lock(log_lock);
+
             set_timespec_nsec(waittime, period);
             thd->enter_cond(log_cond, log_lock,
                             "Slave has read all relay log; "
@@ -6573,8 +6596,8 @@ int start_slave(THD* thd , Master_info* 
           {
             mi->rli->opt_slave_parallel_workers= 0;
             push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
-                                ER_NO_FEATURE_ON_PARALLEL_SLAVE,
-                                ER(ER_NO_FEATURE_ON_PARALLEL_SLAVE),
+                                ER_MTS_FEATURE_IS_NOT_SUPPORTED,
+                                ER(ER_MTS_FEATURE_IS_NOT_SUPPORTED),
                                 "UNTIL condtion",
                                 "Slave is started in the sequential execution mode.");
           }
@@ -6586,8 +6609,8 @@ int start_slave(THD* thd , Master_info* 
         if (mi->rli->opt_slave_parallel_workers != 0 && slave_trans_retries != 0)
         {
           push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
-                              ER_NO_FEATURE_ON_PARALLEL_SLAVE,
-                              ER(ER_NO_FEATURE_ON_PARALLEL_SLAVE),
+                              ER_MTS_FEATURE_IS_NOT_SUPPORTED,
+                              ER(ER_MTS_FEATURE_IS_NOT_SUPPORTED),
                               "Temporary failed transaction retry",
                               "Such failure will force the slave to stop.");
         }

=== modified file 'sql/share/errmsg-utf8.txt'
--- a/sql/share/errmsg-utf8.txt	2011-02-27 17:35:25 +0000
+++ b/sql/share/errmsg-utf8.txt	2011-06-08 20:18:08 +0000
@@ -6456,7 +6456,9 @@ ER_STMT_CACHE_FULL  
 ER_BINLOG_STMT_CACHE_SIZE_GREATER_THAN_MAX
   eng "Option binlog_stmt_cache_size (%lu) is greater than max_binlog_stmt_cache_size (%lu); setting binlog_stmt_cache_size equal to max_binlog_stmt_cache_size."
 
-ER_NO_FEATURE_ON_PARALLEL_SLAVE
+ER_MTS_FEATURE_IS_NOT_SUPPORTED
   eng "%s is not supported in Parallel Slave. %s"
-ER_UPDATED_DBS_GREATER_MAX
+ER_MTS_UPDATED_DBS_GREATER_MAX
   eng "Modified database names number exceeds the maximum %d; the names are not written into the replication event."
+ER_MTS_CANT_PARALLEL
+  eng "Can't execute the current event group in parallel mode running into event %d, relay-log name %s, position %llu."

=== modified file 'storage/blackhole/ha_blackhole.cc'
--- a/storage/blackhole/ha_blackhole.cc	2010-10-06 14:34:28 +0000
+++ b/storage/blackhole/ha_blackhole.cc	2011-06-08 20:18:08 +0000
@@ -23,7 +23,7 @@
 #include "unireg.h"
 #include "probes_mysql.h"
 #include "ha_blackhole.h"
-#include "sql_class.h"                          // THD, SYSTEM_THREAD_SLAVE_SQL
+#include "sql_class.h"                          // THD, SYSTEM_THREAD_SLAVE_*
 
 /* Static declarations for handlerton */
 
@@ -118,7 +118,8 @@ int ha_blackhole::update_row(const uchar
 {
   DBUG_ENTER("ha_blackhole::update_row");
   THD *thd= ha_thd();
-  if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query() == NULL)
+  if ((thd->system_thread == SYSTEM_THREAD_SLAVE_SQL ||
+       thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) && thd->query() == NULL)
     DBUG_RETURN(0);
   DBUG_RETURN(HA_ERR_WRONG_COMMAND);
 }
@@ -127,7 +128,8 @@ int ha_blackhole::delete_row(const uchar
 {
   DBUG_ENTER("ha_blackhole::delete_row");
   THD *thd= ha_thd();
-  if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query() == NULL)
+  if ((thd->system_thread == SYSTEM_THREAD_SLAVE_SQL ||
+       thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) && thd->query() == NULL)
     DBUG_RETURN(0);
   DBUG_RETURN(HA_ERR_WRONG_COMMAND);
 }
@@ -146,7 +148,8 @@ int ha_blackhole::rnd_next(uchar *buf)
   MYSQL_READ_ROW_START(table_share->db.str, table_share->table_name.str,
                        TRUE);
   THD *thd= ha_thd();
-  if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query() == NULL)
+  if ((thd->system_thread == SYSTEM_THREAD_SLAVE_SQL ||
+       thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) && thd->query() == NULL)
     rc= 0;
   else
     rc= HA_ERR_END_OF_FILE;
@@ -236,7 +239,8 @@ int ha_blackhole::index_read_map(uchar *
   DBUG_ENTER("ha_blackhole::index_read");
   MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
   THD *thd= ha_thd();
-  if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query() == NULL)
+  if ((thd->system_thread == SYSTEM_THREAD_SLAVE_SQL ||
+       thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) && thd->query() == NULL)
     rc= 0;
   else
     rc= HA_ERR_END_OF_FILE;
@@ -253,7 +257,8 @@ int ha_blackhole::index_read_idx_map(uch
   DBUG_ENTER("ha_blackhole::index_read_idx");
   MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
   THD *thd= ha_thd();
-  if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query() == NULL)
+  if ((thd->system_thread == SYSTEM_THREAD_SLAVE_SQL ||
+       thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) && thd->query() == NULL)
     rc= 0;
   else
     rc= HA_ERR_END_OF_FILE;
@@ -269,7 +274,8 @@ int ha_blackhole::index_read_last_map(uc
   DBUG_ENTER("ha_blackhole::index_read_last");
   MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
   THD *thd= ha_thd();
-  if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query() == NULL)
+  if ((thd->system_thread == SYSTEM_THREAD_SLAVE_SQL ||
+       thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) && thd->query() == NULL)
     rc= 0;
   else
     rc= HA_ERR_END_OF_FILE;


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110608201808-h9279ue551ngalo9.bundle