List: Commits
From: Andrei Elkin
Date: June 12 2011 5:36pm
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3285) WL#5569
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3285 Andrei Elkin	2011-06-12
      wl#5569 MTS
      
      I. Addressing a segfault in the sp-binlogging stack that dealt with gathering accessed db:s.
         
         Fixed by dedicating the main mem_root of THD to holding the list of db:s. Also,
         as an optimization, the gathering routine is not executed unless the actual binlog
         format is STATEMENT.
         The rules for resetting the main mem_root at the end of the top-level query suffice, and
         the replication-rooted footprint looks quite tolerable (16 db:s max).
      
         Using this chance, the remaining cleanup issues and some code simplifications
         requested in a review mail are addressed too.
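
For readers unfamiliar with the gathering routine, the idea of a capped, abc-sorted list of unique db names can be sketched as follows. This is only an illustration under stated assumptions: `AccessedDbs`, `kMaxDbs` and `add()` are hypothetical names, a `std::vector` stands in for the server's `List<char>` allocated in the THD mem_root, and the overflow handling is simplified compared to the patch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical cap standing in for MAX_DBS_IN_EVENT_MTS (16 per the message above).
constexpr std::size_t kMaxDbs = 16;

// Hypothetical stand-in for the per-query list of accessed database names.
struct AccessedDbs {
  std::vector<std::string> names;  // kept sorted, without duplicates
  bool overflow = false;           // set once a name had to be dropped

  // Records `db`; returns true if the name is in the list after the call.
  bool add(const std::string& db) {
    auto pos = std::lower_bound(names.begin(), names.end(), db);
    if (pos != names.end() && *pos == db)
      return true;                 // already recorded, nothing to do
    if (names.size() >= kMaxDbs) {
      overflow = true;             // too many db:s, stop gathering
      return false;
    }
    names.insert(pos, db);         // insertion keeps the abc order
    assert(std::is_sorted(names.begin(), names.end()));
    return true;
  }

  // Mirrors the reset at the end of the top-level query.
  void reset() {
    names.clear();
    overflow = false;
  }
};
```

In this sketch a caller would invoke `add()` once per table of a statement and `reset()` when the top-level query ends; the real code performs the reset through the mem_root rules described above.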
      
      II. Fixes motivated by failing tests and the general cleanup.
      
      1. Implementing exit with an error for the case when MTS restarts from a relay-log position
         that does not correspond to the beginning of a group, or when the pattern of events in
         the group is not considered valid, e.g. with a (very) old master binlog file.
         Such an occurrence can be detected only some time after the restart, but no later than
         when a terminating event for the group is found.
      
      2. STOP SLAVE, KILL QUERY `sql_thread`. Acceptance of the exit status by the Coordinator is refined
         in sql_slave_killed() in line with wl#5569 requirements.
      
         Also, a graceful STOP SLAVE required changes in rpl_rli_pdb.cc.
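
The refined condition of item 2 can be read as a pure predicate over a handful of flags. The sketch below is an assumption-laden restatement of the sql_slave_killed() hunk, not server code: `KillContext`, `MtsGroupStatus` and both function names are hypothetical, and each field merely stands in for the server expression named in its comment.

```cpp
#include <cassert>

// Hypothetical mirror of Relay_log_info::mts_group_status values.
enum class MtsGroupStatus { NotInGroup, InGroup, EndGroup, KilledGroup };

// Hypothetical, flattened view of the state that sql_slave_killed() consults.
struct KillContext {
  bool parallel_exec;             // rli->is_parallel_exec()
  MtsGroupStatus group_status;    // rli->mts_group_status
  bool modified_non_trans_table;  // thd->transaction.all.modified_non_trans_table
  bool keep_log;                  // thd->variables.option_bits & OPTION_KEEP_LOG
  bool in_group;                  // rli->is_in_group()
};

// MTS case: the Coordinator has scheduled a group that is not finished yet.
inline bool is_parallel_group(const KillContext& c) {
  return c.parallel_exec && c.group_status == MtsGroupStatus::InGroup;
}

// True when the stop request arrives in the middle of a group that
// cannot be abandoned silently, in either MTS or STS mode.
inline bool stop_hits_unfinished_group(const KillContext& c) {
  return is_parallel_group(c) ||
         (!c.parallel_exec &&
          (c.modified_non_trans_table || c.keep_log) && c.in_group);
}
```

Under this reading, the MTS group status is consulted only when the slave actually runs in parallel mode, and the same `is_parallel_group` flag later selects between the STS and MTS variants of the reported message.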
       
     @ mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test
        left the test in rpl while keeping it useful for gathering benchmark statistics:
        created files are now removed at the beginning of the test rather than at the end,
        so MTR can save them between two successive runs.
        
        Decreased the stress parameters to keep the default run time small.
     @ mysql-test/suite/rpl/r/rpl_parallel_benchmark.result
        results updated.
     @ mysql-test/suite/rpl/r/rpl_slave_grp_exec.result
        results updated.
     @ mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master-slave.opt
        added an option that MTS requires.
     @ mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master.test
        comments improved.
     @ mysql-test/suite/rpl/t/rpl_slave_grp_exec.test
        Made the test run in STS mode for the time being, with explanatory comments.
     @ sql/binlog.cc
        simplifications of the accessed db:s gathering algorithm.
     @ sql/events.cc
        simplifications of the accessed db:s gathering algorithm.
     @ sql/log_event.cc
        Further clean up in Log_event::apply_event() and Log_event::get_worker_id().
        Error branches in the former are merged to perform necessary common actions.
     @ sql/rpl_rli.h
        comments added.
     @ sql/rpl_rli_pdb.cc
        wait_for_workers_to_finish() is refined because of an awake-wait loop of
        the terminating thread: the Coordinator could be awakened by something other
        than a Worker. In that case it resumes waiting.
     @ sql/sp.cc
        cleanup of the accessed db:s gathering algorithm.
     @ sql/sql_acl.cc
        deploying accessed db:s gathering for GRANT use cases.
        It's necessary to carry db:s mentioned in objects of the grant operations.
     @ sql/sql_class.h
        simplifications of the accessed db:s gathering algorithm.
     @ sql/sql_db.cc
        simplifications of the accessed db:s gathering algorithm.
     @ sql/sql_rename.cc
        simplifications of the accessed db:s gathering algorithm.
     @ sql/sql_table.cc
        simplifications of the accessed db:s gathering algorithm.
     @ sql/sql_trigger.cc
        simplifications of the accessed db:s gathering algorithm.
     @ sql/sql_view.cc
        simplifications of the accessed db:s gathering algorithm.
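
The rpl_rli_pdb.cc note above describes a standard pattern: a condition wait must be re-checked in a loop, because the waiter may be woken by something other than the event it is waiting for. A minimal sketch with the C++ standard library follows; it is not the server code, which uses mysql_cond_wait() with thd->enter_cond()/exit_cond(), and `Coordinator`, `PartitionEntry` and all method names are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical stand-in for db_worker_hash_entry.
struct PartitionEntry {
  long usage = 0;  // number of Workers still using the partition
};

struct Coordinator {
  std::mutex lock;               // stands in for slave_worker_hash_lock
  std::condition_variable cond;  // stands in for slave_worker_hash_cond
  bool killed = false;           // stands in for thd->killed

  // Blocks until the entry is released or the thread is killed.
  // Returns how many times the waiter was woken up.
  int wait_until_released(PartitionEntry& e) {
    std::unique_lock<std::mutex> guard(lock);
    int wakeups = 0;
    while (e.usage != 0 && !killed) {
      cond.wait(guard);  // may return without the entry being released
      ++wakeups;         // so the condition is checked again
    }
    return wakeups;
  }

  // Called by a Worker when it stops using the partition.
  void release(PartitionEntry& e) {
    {
      std::lock_guard<std::mutex> guard(lock);
      assert(e.usage > 0);
      --e.usage;
    }
    cond.notify_all();
  }

  // Called by the terminating thread; also wakes the waiter.
  void kill() {
    {
      std::lock_guard<std::mutex> guard(lock);
      killed = true;
    }
    cond.notify_all();
  }
};
```

In a real run `release()` would be called from Worker threads while the Coordinator sits in `wait_until_released()`; a wakeup that finds `usage` still non-zero simply sends the Coordinator back to waiting, which is the behavior the note describes.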

    added:
      mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master-slave.opt
    modified:
      mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test
      mysql-test/suite/rpl/r/rpl_parallel_benchmark.result
      mysql-test/suite/rpl/r/rpl_slave_grp_exec.result
      mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master.test
      mysql-test/suite/rpl/t/rpl_slave_grp_exec.test
      sql/binlog.cc
      sql/events.cc
      sql/log_event.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_slave.cc
      sql/sp.cc
      sql/sql_acl.cc
      sql/sql_class.h
      sql/sql_db.cc
      sql/sql_rename.cc
      sql/sql_table.cc
      sql/sql_trigger.cc
      sql/sql_view.cc
=== modified file 'mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test'
--- a/mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test	2011-06-05 17:01:51 +0000
+++ b/mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test	2011-06-12 17:36:17 +0000
@@ -6,13 +6,13 @@
 # load volume parameter
 #
 
-let $iter= 08;
-let $tables= 4;
-let $wk_i_queries= 4;
+let $iter= 02;
+let $tables= 2;
+let $wk_i_queries= 2;
 let $wk_m_queries= 0;
 let $nk_i_queries= 0;
 let $nk_m_queries= 0;
-let $pre_inserted_rows= 200;
+let $pre_inserted_rows= 50;
 
 connection slave;
 
@@ -264,14 +264,22 @@ let $wait_timeout= 600;
 let $wait_condition= SELECT count(*)+sleep(1) = 5 FROM test1.benchmark;
 source include/wait_condition.inc;
 
+
+let $MYSQLD_DATADIR= `select @@datadir`;
+
+# cleanup for files that could not be removed in the end of previous invocation.
+--remove_files_wildcard $MYSQLD_DATADIR *.out
+
 use test;
-select * from test1.benchmark into outfile 'benchmark.out';
+--replace_result $MYSQLD_DATADIR MYSQLD_DATADIR
+eval select * from test1.benchmark into outfile '$MYSQLD_DATADIR/benchmark.out';
 select ts from test1.benchmark where state like 'master started load' into @m_0;
 select ts from test1.benchmark where state like 'master ends load' into @m_1;
 select ts from test1.benchmark where state like 'slave takes on load' into @s_0;
 select ts from test1.benchmark where state like 'slave ends load' into @s_1;
-select time_to_sec(@m_1) - time_to_sec(@m_0) as 'delta_m', 
-       time_to_sec(@s_1) - time_to_sec(@s_0) as 'delta_s' into outfile 'delta.out';
+--replace_result $MYSQLD_DATADIR MYSQLD_DATADIR
+eval select time_to_sec(@m_1) - time_to_sec(@m_0) as 'delta_m', 
+       time_to_sec(@s_1) - time_to_sec(@s_0) as 'delta_s' into outfile '$MYSQLD_DATADIR/delta.out';
 
 --enable_result_log
 --enable_query_log

=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_benchmark.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_benchmark.result	2011-04-02 11:32:02 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_benchmark.result	2011-06-12 17:36:17 +0000
@@ -6,12 +6,12 @@ include/stop_slave.inc
 start slave;
 stop slave sql_thread;
 use test;
-select * from test1.benchmark into outfile 'benchmark.out';
+select * from test1.benchmark into outfile 'MYSQLD_DATADIR/benchmark.out';
 select ts from test1.benchmark where state like 'master started load' into @m_0;
 select ts from test1.benchmark where state like 'master ends load' into @m_1;
 select ts from test1.benchmark where state like 'slave takes on load' into @s_0;
 select ts from test1.benchmark where state like 'slave ends load' into @s_1;
 select time_to_sec(@m_1) - time_to_sec(@m_0) as 'delta_m', 
-time_to_sec(@s_1) - time_to_sec(@s_0) as 'delta_s' into outfile 'delta.out';
+time_to_sec(@s_1) - time_to_sec(@s_0) as 'delta_s' into outfile 'MYSQLD_DATADIR/delta.out';
 set @@global.mts_exp_slave_local_timestamp= @save.mts_exp_slave_local_timestamp;
 include/rpl_end.inc

=== modified file 'mysql-test/suite/rpl/r/rpl_slave_grp_exec.result'
--- a/mysql-test/suite/rpl/r/rpl_slave_grp_exec.result	2010-12-19 17:07:28 +0000
+++ b/mysql-test/suite/rpl/r/rpl_slave_grp_exec.result	2011-06-12 17:36:17 +0000
@@ -29,7 +29,7 @@ a	b
 SELECT * FROM t3 ORDER BY a;
 a	b
 1	ZZ
-include/wait_for_slave_sql_error.inc [errno=1146]
+include/wait_for_slave_sql_to_stop.inc
 SHOW TABLES LIKE 't%';
 Tables_in_test (t%)
 t1
@@ -57,7 +57,7 @@ INSERT INTO t3 VALUES(2, 'B');
 INSERT INTO t2 VALUES(2, 'B');
 INSERT INTO t1 VALUES(2, 'B');
 UPDATE t1 SET b = 'X' WHERE a = 2;
-include/wait_for_slave_sql_error.inc [errno=1146]
+include/wait_for_slave_sql_to_stop.inc
 SELECT * FROM t1 ORDER BY a;
 a	b
 2	X
@@ -93,7 +93,7 @@ INSERT INTO t1 VALUES (3, 'C'), (4, 'D')
 INSERT INTO t2 VALUES (3, 'C'), (4, 'D');
 INSERT INTO t3 VALUES (3, 'C'), (4, 'D');
 COMMIT;
-include/wait_for_slave_sql_error.inc [errno=1146]
+include/wait_for_slave_sql_to_stop.inc
 SELECT * FROM t1 ORDER BY a;
 a	b
 3	C

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master-slave.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master-slave.opt	2011-06-12 17:36:17 +0000
@@ -0,0 +1 @@
+--slave-transaction-retries=0

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master.test	2011-06-10 08:04:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master.test	2011-06-12 17:36:17 +0000
@@ -5,7 +5,7 @@
 # The number is either @@global.slave_checkpoint_group or less if
 # @@global.slave_checkpoint_period timer elapses first.
 # The value updates *after* the last group commit is executed.
-# Resetting to zero behaviour when Slave goes to read events is
+# Resetting to zero behavior when Slave goes to read events is
 # preserved.
 #
 

=== modified file 'mysql-test/suite/rpl/t/rpl_slave_grp_exec.test'
--- a/mysql-test/suite/rpl/t/rpl_slave_grp_exec.test	2011-06-08 20:18:08 +0000
+++ b/mysql-test/suite/rpl/t/rpl_slave_grp_exec.test	2011-06-12 17:36:17 +0000
@@ -23,6 +23,12 @@
 --source include/master-slave.inc
 --echo
 
+# Test is MTS unfriendly because of 
+# a. incompatible with STS error reporting (MTS stop due to an error in applying
+#    causes inconsistency so the latter is reported)
+# b. failing recovery
+-- source include/not_mts_slave_parallel_workers.inc
+
 # Create tables and data
 --echo *** Preparing data ***
 --connection master

=== modified file 'sql/binlog.cc'
--- a/sql/binlog.cc	2011-06-08 20:18:08 +0000
+++ b/sql/binlog.cc	2011-06-12 17:36:17 +0000
@@ -4537,16 +4537,21 @@ THD::binlog_set_pending_rows_event(Rows_
 /**
    @param db    db name c-string to be inserted into abc-sorted
                 THD::binlog_accessed_db_names list.
-
-                Note, as the list node data (explicitly) so the node
-                struct itself (implicitly) are allocated in
-                thd->mem_root to be cleared at the end of the query
-                processing (@c THD::cleanup_after_query()).
+                
+                Note, as the list node data so the node
+                struct itself are allocated in THD::main_mem_root.
+                The list lasts for the top-level query time and resets
+                in @c THD::cleanup_after_query() and Query_log_event::write().
 */
 void
-THD::add_to_binlog_updated_dbs(const char *db)
+THD::add_to_binlog_accessed_dbs(const char *db)
 {
   char *after_db;
+  MEM_ROOT *db_mem_root= &main_mem_root;
+
+  if (!binlog_accessed_db_names)
+    binlog_accessed_db_names= new (db_mem_root) List<char>;
+
   if (binlog_accessed_db_names->elements >  MAX_DBS_IN_EVENT_MTS)
   {
     push_warning_printf(this, MYSQL_ERROR::WARN_LEVEL_WARN,
@@ -4556,7 +4561,13 @@ THD::add_to_binlog_updated_dbs(const cha
     return;
   }
 
-  after_db= strdup_root(mem_root, db);
+  after_db= strdup_root(db_mem_root, db);
+
+  /* 
+     sorted insertion is implemented with first rearranging data
+     (pointer to char*) of the links and final appending of the least
+     ordered data to create a new link in the list.
+  */
   if (binlog_accessed_db_names->elements != 0)
   {
     List_iterator<char> it(*get_binlog_accessed_db_names());
@@ -4583,7 +4594,7 @@ THD::add_to_binlog_updated_dbs(const cha
     }
   }
   if (after_db)
-    binlog_accessed_db_names->push_back(after_db);
+    binlog_accessed_db_names->push_back(after_db, &main_mem_root);
 }
 
 
@@ -4810,27 +4821,6 @@ int THD::decide_logging_format(TABLE_LIS
       prev_access_table= table->table;
 
     }
-    
-    /*
-      Master side of DML in the STMT format events parallelization.
-      All involving table db:s are stored in a abc-ordered name list.
-      In case the number of databases exceeds MAX_DBS_IN_EVENT_MTS maximum
-      the list gathering breaks since it won't be sent to the slave.
-    */
-    if (is_write && variables.binlog_format != BINLOG_FORMAT_ROW &&
-        lex->sql_command != SQLCOM_END /* rows-event applying by slave */)
-    {
-      if (!binlog_accessed_db_names)
-      {
-        binlog_accessed_db_names= new List<char>; /* thd->mem_root is used */
-      }
-      for (TABLE_LIST *table= tables; table; table= table->next_global)
-      {
-        if (table->placeholder())
-          continue;
-        add_to_binlog_updated_dbs(table->db);
-      }
-    }
 
     DBUG_PRINT("info", ("flags_write_all_set: 0x%llx", flags_write_all_set));
     DBUG_PRINT("info", ("flags_write_some_set: 0x%llx", flags_write_some_set));
@@ -4965,6 +4955,23 @@ int THD::decide_logging_format(TABLE_LIS
       DBUG_PRINT("info", ("decision: no logging since an error was generated"));
       DBUG_RETURN(-1);
     }
+
+    if (is_write && !is_current_stmt_binlog_format_row() &&
+        lex->sql_command != SQLCOM_END /* rows-event applying by slave */)
+    {
+      /*
+        Master side of DML in the STMT format events parallelization.
+        All involving table db:s are stored in a abc-ordered name list.
+        In case the number of databases exceeds MAX_DBS_IN_EVENT_MTS maximum
+        the list gathering breaks since it won't be sent to the slave.
+      */
+      for (TABLE_LIST *table= tables; table; table= table->next_global)
+      {
+        if (table->placeholder())
+          continue;
+        add_to_binlog_accessed_dbs(table->db);
+      }
+    }
     DBUG_PRINT("info", ("decision: logging in %s format",
                         is_current_stmt_binlog_format_row() ?
                         "ROW" : "STATEMENT"));

=== modified file 'sql/events.cc'
--- a/sql/events.cc	2011-05-24 14:29:35 +0000
+++ b/sql/events.cc	2011-06-12 17:36:17 +0000
@@ -384,7 +384,7 @@ Events::create_event(THD *thd, Event_par
       }
       else
       {
-        thd->add_one_db_to_binlog_updated_dbs(parse_data->dbname.str);
+        thd->add_to_binlog_accessed_dbs(parse_data->dbname.str);
         /* If the definer is not set or set to CURRENT_USER, the value of CURRENT_USER
            will be written into the binary log as the definer for the SQL thread. */
         ret= write_bin_log(thd, TRUE, log_query.c_ptr(), log_query.length());
@@ -504,10 +504,9 @@ Events::update_event(THD *thd, Event_par
       /* Binlog the alter event. */
       DBUG_ASSERT(thd->query() && thd->query_length());
 
-      thd->set_binlog_accessed_db_names(new List<char>);
-      thd->add_to_binlog_updated_dbs(parse_data->dbname.str);
+      thd->add_to_binlog_accessed_dbs(parse_data->dbname.str);
       if (new_dbname)
-        thd->add_to_binlog_updated_dbs(new_dbname->str);
+        thd->add_to_binlog_accessed_dbs(new_dbname->str);
 
       ret= write_bin_log(thd, TRUE, thd->query(), thd->query_length());
     }
@@ -576,7 +575,7 @@ Events::drop_event(THD *thd, LEX_STRING 
     /* Binlog the drop event. */
     DBUG_ASSERT(thd->query() && thd->query_length());
 
-    thd->add_one_db_to_binlog_updated_dbs(dbname.str);
+    thd->add_to_binlog_accessed_dbs(dbname.str);
     ret= write_bin_log(thd, TRUE, thd->query(), thd->query_length());
   }
   /* Restore the state of binlog format */

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-06-10 08:04:00 +0000
+++ b/sql/log_event.cc	2011-06-12 17:36:17 +0000
@@ -2408,6 +2408,7 @@ Slave_worker *Log_event::get_slave_worke
   bool is_b_event;
   int  num_dbs= 0;
   Slave_worker *ret_worker= NULL;
+  THD *thd= rli->info_thd;
 
   /* checking partioning properties and perform corresponding actions */
 
@@ -2469,7 +2470,7 @@ Slave_worker *Log_event::get_slave_worke
   {
     int i= 0;
     num_dbs= mts_number_dbs();
-    List_iterator<char> it(*mts_get_dbs(rli->info_thd->mem_root));
+    List_iterator<char> it(*mts_get_dbs(thd->mem_root));
     it++;
 
     ret_worker= rli->last_assigned_worker;
@@ -2491,12 +2492,10 @@ Slave_worker *Log_event::get_slave_worke
                              get_type_code() == QUERY_EVENT,
                              ret_worker)))
       {
-        // destroy buffered events of the current group prior to exit
-        for (uint k= 0; k < rli->curr_group_da.elements; k++)
-        { 
-          delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
-        }
-
+        rli->report(ERROR_LEVEL, ER_MTS_CANT_PARALLEL,
+                    ER(ER_MTS_CANT_PARALLEL),
+                    get_type_str(), rli->get_event_relay_log_name(),
+                    rli->get_event_relay_log_pos());
         return ret_worker;
       }
 
@@ -2519,13 +2518,13 @@ Slave_worker *Log_event::get_slave_worke
 
     DBUG_ASSERT(i == num_dbs || num_dbs == OVER_MAX_DBS_IN_EVENT_MTS);
 
-    // TODO: convert to C's private mem_root.
+    // TODO: convert to C's private mem_root to reset not per event but rather realrely.
 
     // Releasing the Coord's mem-root from the updated dbs. It's safe to do at this
     // point because the root is no longer needed along remained part of Coordinator's
     // execution flow.
     
-    free_root(rli->info_thd->mem_root, MYF(MY_KEEP_PREALLOC));
+    free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC));
   }
   else 
   {
@@ -2541,12 +2540,21 @@ Slave_worker *Log_event::get_slave_worke
     {
       Log_event *ptr_curr_ev= this;
 
-      DBUG_ASSERT(get_type_code() == INTVAR_EVENT ||
-                  get_type_code() == RAND_EVENT ||
-                  get_type_code() == USER_VAR_EVENT ||
-                  get_type_code() == ROWS_QUERY_LOG_EVENT ||
-                  get_type_code() == BEGIN_LOAD_QUERY_EVENT ||
-                  get_type_code() == APPEND_BLOCK_EVENT);
+      if (!(get_type_code() == INTVAR_EVENT ||
+            get_type_code() == RAND_EVENT ||
+            get_type_code() == USER_VAR_EVENT ||
+            get_type_code() == ROWS_QUERY_LOG_EVENT ||
+            get_type_code() == BEGIN_LOAD_QUERY_EVENT ||
+            get_type_code() == APPEND_BLOCK_EVENT))
+      {
+        DBUG_ASSERT(!ret_worker);
+
+        rli->report(ERROR_LEVEL, ER_MTS_CANT_PARALLEL,
+                    ER(ER_MTS_CANT_PARALLEL),
+                    get_type_str(), rli->get_event_relay_log_name(),
+                    rli->get_event_relay_log_pos());
+        return ret_worker;
+      }
 
       insert_dynamic(&rli->curr_group_da, (uchar*) &ptr_curr_ev);
       
@@ -2822,14 +2830,17 @@ void append_item_to_jobs(slave_job_item 
    can't be decided yet.  In the single threaded sequential mode the
    event maps to SQL thread rli.
 
+   @note in case of MTS failure Coordinator destroys all gathered
+         deferred events.
+
    @return 0 as success, otherwise a failure.
 */
 int Log_event::apply_event(Relay_log_info const *rli)
 {
   DBUG_ENTER("LOG_EVENT:apply_event");
-  Slave_worker *w= NULL;
   Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli);  // constless alias
   bool parallel= FALSE, async_event= FALSE, seq_event= FALSE;
+  THD *thd= c_rli->info_thd;
 
   worker= c_rli;
 
@@ -2884,19 +2895,13 @@ int Log_event::apply_event(Relay_log_inf
           */
           rli->report(ERROR_LEVEL, ER_MTS_CANT_PARALLEL,
                       ER(ER_MTS_CANT_PARALLEL),
-                      get_type_code(), c_rli->get_event_relay_log_name(),
+                      get_type_str(), c_rli->get_event_relay_log_name(),
                       c_rli->get_event_relay_log_pos());
           
           /* Coordinator cant continue, it marks MTS group status accordingly */
           c_rli->mts_group_status= Relay_log_info::MTS_KILLED_GROUP;
 
-          /* destroy deferred events */
-          for (uint k= 0; k < rli->curr_group_da.elements; k++)
-          { 
-            delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
-          }
-
-          DBUG_RETURN(-1);
+          goto err;
         }
         /*
           Marking sure the event will be executed in sequential mode.
@@ -2929,13 +2934,28 @@ int Log_event::apply_event(Relay_log_inf
   worker= NULL;
   c_rli->mts_group_status= Relay_log_info::MTS_IN_GROUP;
 
-  c_rli->last_assigned_worker= w= get_slave_worker_id(c_rli);
-  if (!w || DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0))
-    DBUG_RETURN(rli->curr_group_assigned_parts.elements == 0 ? FALSE : TRUE);
+  worker= (Relay_log_info*)
+    (c_rli->last_assigned_worker= get_slave_worker_id(c_rli));
 
-  worker= (Relay_log_info*) w;
+err:
+  if (thd->is_error())
+  {
+    DBUG_ASSERT(!worker);
+
+    // destroy buffered events of the current group prior to exit
+    for (uint k= 0; k < rli->curr_group_da.elements; k++)
+    { 
+      delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
+    }
+  }
+  else
+  {
+    DBUG_ASSERT(worker || rli->curr_group_assigned_parts.elements == 0);
+  }
 
-  DBUG_RETURN(FALSE);
+  DBUG_RETURN((!thd->is_error() ||
+               DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)) ?
+              0 : -1);
 }
 
 

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2011-06-10 08:04:00 +0000
+++ b/sql/rpl_rli.h	2011-06-12 17:36:17 +0000
@@ -493,7 +493,8 @@ public:
        includes Single-Threaded-Slave case.
     */
     MTS_NOT_IN_GROUP,
-    MTS_IN_GROUP,    /* at least one event was scheduled to a Worker */
+
+    MTS_IN_GROUP,    /* at least one not-terminal event scheduled to a Worker */
     MTS_END_GROUP,   /* the last scheduled event is a terminal event */
     MTS_KILLED_GROUP /* Coordinator gave out to reach MTS_END_GROUP */
   } mts_group_status;

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2011-06-08 20:18:08 +0000
+++ b/sql/rpl_rli_pdb.cc	2011-06-12 17:36:17 +0000
@@ -726,6 +726,8 @@ Slave_worker *get_least_occupied_worker(
 
 void Slave_worker::slave_worker_ends_group(Log_event* ev, int error)
 {
+  DBUG_ENTER("Slave_worker::slave_worker_ends_group");
+
   if (!error)
   {
     ulong gaq_idx= ev->mts_group_cnt;
@@ -798,6 +800,9 @@ void Slave_worker::slave_worker_ends_gro
 #ifndef DBUG_OFF
         // TODO: open it! DBUG_ASSERT(usage_partition || !entry->worker->jobs.len);
 #endif
+        DBUG_PRINT("info",
+                   ("Notifying entry %p release by worker %lu", entry, this->id));
+
         mysql_cond_signal(&slave_worker_hash_cond);
       }
     }
@@ -828,6 +833,8 @@ void Slave_worker::slave_worker_ends_gro
     mysql_mutex_unlock(&c_rli->info_thd->LOCK_thd_data);
     mysql_mutex_unlock(&slave_worker_hash_lock);
   }
+
+  DBUG_VOID_RETURN;
 }
 
 
@@ -1106,6 +1113,9 @@ int wait_for_workers_to_finish(Relay_log
   THD *thd= rli->info_thd;
   const char info_format[]=
     "Waiting for Slave Worker %d to release partition `%s`";
+
+  DBUG_ENTER("wait_for_workers_to_finish");
+
   for (uint i= 0, ret= 0; i < hash->records; i++)
   {
     db_worker_hash_entry *entry;
@@ -1128,16 +1138,22 @@ int wait_for_workers_to_finish(Relay_log
 
     if (entry->usage > 0 && !thd->killed)
     {
+      long w_id= entry->worker->id;
       sprintf(wait_info, info_format, entry->worker->id, entry->db);
       entry->worker= NULL; // mark Worker to signal when  usage drops to 0
-
-      proc_info= thd->enter_cond(&slave_worker_hash_cond, &slave_worker_hash_lock,
-                               wait_info);
-      mysql_cond_wait(&slave_worker_hash_cond, &slave_worker_hash_lock);
+      do
+      {
+        proc_info= thd->enter_cond(&slave_worker_hash_cond,
+                                   &slave_worker_hash_lock,
+                                   wait_info);
+        mysql_cond_wait(&slave_worker_hash_cond, &slave_worker_hash_lock);
+        DBUG_PRINT("info",
+                   ("Either got awakened of notified: "
+                    "entry %p, usage %lu, worker %lu",
+                    entry, entry->usage, w_id));
+      } while (entry->usage != 0 && !thd->killed);
       thd->exit_cond(proc_info);
       ret++;
-
-      DBUG_ASSERT(entry->usage == 0 || thd->killed);
     }
     else
     {
@@ -1151,5 +1167,5 @@ int wait_for_workers_to_finish(Relay_log
   if (!ignore)
     const_cast<Relay_log_info*>(rli)->mts_group_status= Relay_log_info::MTS_NOT_IN_GROUP;
 
-  return ret;
+  DBUG_RETURN(ret);
 }

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2011-06-10 08:04:00 +0000
+++ b/sql/rpl_slave.cc	2011-06-12 17:36:17 +0000
@@ -1082,6 +1082,8 @@ static bool io_slave_killed(THD* thd, Ma
 static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
 {
   bool ret= FALSE;
+  bool is_parallel_group= FALSE;
+
   DBUG_ENTER("sql_slave_killed");
 
   DBUG_ASSERT(rli->info_thd == thd);
@@ -1096,10 +1098,12 @@ static bool sql_slave_killed(THD* thd, R
       as well.
       Example: OPTION_KEEP_LOG is set if a temporary table is created or dropped.
     */
-    if ((!rli->is_parallel_exec() &&
+    if ((is_parallel_group= (rli->is_parallel_exec() &&
+                       rli->mts_group_status == Relay_log_info::MTS_IN_GROUP))
+        ||
+        (!rli->is_parallel_exec() && 
          (thd->transaction.all.modified_non_trans_table ||
-          (thd->variables.option_bits & OPTION_KEEP_LOG)) && rli->is_in_group())
-        || (rli->mts_group_status == Relay_log_info::MTS_IN_GROUP))
+          (thd->variables.option_bits & OPTION_KEEP_LOG)) && rli->is_in_group()))
     {
       char msg_stopped[]=
         "... The slave SQL is stopped, leaving the current group "
@@ -1145,7 +1149,7 @@ static bool sql_slave_killed(THD* thd, R
         if (ret == 0)
         {
           rli->report(WARNING_LEVEL, 0,
-                      rli->mts_group_status == Relay_log_info::MTS_NOT_IN_GROUP ?
+                      !is_parallel_group ?
                       "slave SQL thread is being stopped in the middle "
                       "of applying of a group having updated a non-transaction "
                       "table; waiting for the group completion ... "
@@ -1158,16 +1162,14 @@ static bool sql_slave_killed(THD* thd, R
         {
           rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
                       ER(ER_SLAVE_FATAL_ERROR),
-                      rli->mts_group_status == Relay_log_info::MTS_NOT_IN_GROUP ?
-                      msg_stopped : msg_stopped_mts);
+                      !is_parallel_group ? msg_stopped : msg_stopped_mts);
         }
       }
       else
       {
         ret= TRUE;
         rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR),
-                    rli->mts_group_status == Relay_log_info::MTS_NOT_IN_GROUP ?
-                    msg_stopped : msg_stopped_mts);
+                    !is_parallel_group ? msg_stopped : msg_stopped_mts);
       }
     }
     else

=== modified file 'sql/sp.cc'
--- a/sql/sp.cc	2011-02-27 17:35:25 +0000
+++ b/sql/sp.cc	2011-06-12 17:36:17 +0000
@@ -1150,7 +1150,7 @@ sp_create_routine(THD *thd, int type, sp
       }
       /* restore sql_mode when binloging */
       thd->variables.sql_mode= saved_mode;
-      thd->add_one_db_to_binlog_updated_dbs(sp->m_db.str);
+      thd->add_to_binlog_accessed_dbs(sp->m_db.str);
       /* Such a statement can always go directly to binlog, no trans cache */
       if (thd->binlog_query(THD::STMT_QUERY_TYPE,
                             log_query.c_ptr(), log_query.length(),
@@ -1224,7 +1224,7 @@ sp_drop_routine(THD *thd, int type, sp_n
 
   if (ret == SP_OK)
   {
-    thd->add_one_db_to_binlog_updated_dbs(name->m_db.str);
+    thd->add_to_binlog_accessed_dbs(name->m_db.str);
     if (write_bin_log(thd, TRUE, thd->query(), thd->query_length()))
       ret= SP_INTERNAL_ERROR;
     sp_cache_invalidate();

=== modified file 'sql/sql_acl.cc'
--- a/sql/sql_acl.cc	2010-12-17 11:28:59 +0000
+++ b/sql/sql_acl.cc	2011-06-12 17:36:17 +0000
@@ -3687,6 +3687,7 @@ int mysql_table_grant(THD *thd, TABLE_LI
     should_write_to_binlog= TRUE;
 
     db_name= table_list->get_db_name();
+    thd->add_to_binlog_accessed_dbs(db_name); // collecting db:s for MTS
     table_name= table_list->get_table_name();
 
     /* Find/create cached table grant */
@@ -3910,8 +3911,9 @@ bool mysql_routine_grant(THD *thd, TABLE
     }
 
     db_name= table_list->db;
+    if (write_to_binlog)
+      thd->add_to_binlog_accessed_dbs(db_name);
     table_name= table_list->table_name;
-
     grant_name= routine_hash_search(Str->host.str, NullS, db_name,
                                     Str->user.str, table_name, is_proc, 1);
     if (!grant_name)
@@ -4098,6 +4100,7 @@ bool mysql_grant(THD *thd, const char *d
 	my_error(ER_WRONG_USAGE, MYF(0), "DB GRANT", "GLOBAL PRIVILEGES");
 	result= -1;
       }
+      thd->add_to_binlog_accessed_dbs(db);
     }
     else if (is_proxy)
     {

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2011-05-24 14:29:35 +0000
+++ b/sql/sql_class.h	2011-06-12 17:36:17 +0000
@@ -1744,31 +1744,13 @@ public:
   }
 
   /*
-     MTS: initializer of binlog_accessed_db_names list
-  */
-  void set_binlog_accessed_db_names(List<char>* arg)
-  {
-    binlog_accessed_db_names= arg;
-  }
-
-  /*
      MTS: resetter of binlog_accessed_db_names list normally
      at the end of the query execution
   */
   void clear_binlog_accessed_db_names() { binlog_accessed_db_names= NULL; }
 
   /* MTS: method inserts a new unique name into binlog_updated_dbs */
-  void add_to_binlog_updated_dbs(const char *db);
-
-  /* 
-     MTS: method shortcuts initialization and insertion of just one db name
-     into binlog_updated_dbs
-  */
-  void add_one_db_to_binlog_updated_dbs(const char *db)
-  {
-    set_binlog_accessed_db_names(new List<char>);
-    binlog_accessed_db_names->push_back(strdup_root(mem_root, db));
-  }
+  void add_to_binlog_accessed_dbs(const char *db);
 
 #endif /* MYSQL_CLIENT */
 

=== modified file 'sql/sql_db.cc'
--- a/sql/sql_db.cc	2011-02-27 17:35:25 +0000
+++ b/sql/sql_db.cc	2011-06-12 17:36:17 +0000
@@ -660,7 +660,7 @@ not_silent:
       */
       qinfo.db     = db;
       qinfo.db_len = strlen(db);
-      thd->add_one_db_to_binlog_updated_dbs(db);
+      thd->add_to_binlog_accessed_dbs(db);
       /*
         These DDL methods and logging are protected with the exclusive
         metadata lock on the schema
@@ -964,7 +964,7 @@ update_binlog:
 
     if (query_pos != query_data_start)
     {
-      thd->add_one_db_to_binlog_updated_dbs(db);
+      thd->add_to_binlog_accessed_dbs(db);
       /*
         These DDL methods and logging are protected with the exclusive
         metadata lock on the schema.

=== modified file 'sql/sql_rename.cc'
--- a/sql/sql_rename.cc	2011-05-24 14:29:35 +0000
+++ b/sql/sql_rename.cc	2011-06-12 17:36:17 +0000
@@ -318,12 +318,8 @@ do_rename(THD *thd, TABLE_LIST *ren_tabl
       break;
   }
 
-  if (!thd->get_binlog_accessed_db_names())
-  {
-    thd->set_binlog_accessed_db_names(new List<char>);
-  }
-  thd->add_to_binlog_updated_dbs(ren_table->db);
-  thd->add_to_binlog_updated_dbs(new_db);
+  thd->add_to_binlog_accessed_dbs(ren_table->db);
+  thd->add_to_binlog_accessed_dbs(new_db);
 
   if (rc && !skip_error)
     DBUG_RETURN(1);

=== modified file 'sql/sql_table.cc'
--- a/sql/sql_table.cc	2011-05-24 14:29:35 +0000
+++ b/sql/sql_table.cc	2011-06-12 17:36:17 +0000
@@ -2236,12 +2236,7 @@ int mysql_rm_table_no_locks(THD *thd, TA
                   find_temporary_table(thd, table) &&
                   table->mdl_request.ticket != NULL));
 
-    /* MTS: similarly to decide_logging_format() gathering of the db names */
-    if (!thd->get_binlog_accessed_db_names())
-    {
-      thd->set_binlog_accessed_db_names(new List<char>);
-    }
-    thd->add_to_binlog_updated_dbs(table->db);
+    thd->add_to_binlog_accessed_dbs(table->db);
 
     /*
       drop_temporary_table may return one of the following error codes:
@@ -4570,7 +4565,7 @@ bool mysql_create_table(THD *thd, TABLE_
        (thd->is_current_stmt_binlog_format_row() &&
         !(create_info->options & HA_LEX_CREATE_TMP_TABLE))))
   {
-    thd->add_one_db_to_binlog_updated_dbs(create_table->db);
+    thd->add_to_binlog_accessed_dbs(create_table->db);
     result= write_bin_log(thd, TRUE, thd->query(), thd->query_length(), is_trans);
   }
 
@@ -5953,13 +5948,9 @@ bool mysql_alter_table(THD *thd,char *ne
   if (!new_db || !my_strcasecmp(table_alias_charset, new_db, db))
     new_db= db;
 
-  if (!thd->get_binlog_accessed_db_names())
-  {
-    thd->set_binlog_accessed_db_names(new List<char>);
-  }
-  thd->add_to_binlog_updated_dbs(db);
+  thd->add_to_binlog_accessed_dbs(db);
   if (new_db != db)
-    thd->add_to_binlog_updated_dbs(new_db);
+    thd->add_to_binlog_accessed_dbs(new_db);
 
   build_table_filename(reg_path, sizeof(reg_path) - 1, db, table_name, reg_ext, 0);
   build_table_filename(path, sizeof(path) - 1, db, table_name, "", 0);

=== modified file 'sql/sql_trigger.cc'
--- a/sql/sql_trigger.cc	2011-02-27 17:35:25 +0000
+++ b/sql/sql_trigger.cc	2011-06-12 17:36:17 +0000
@@ -522,7 +522,7 @@ end:
   if (!result)
   {
     if (tables)
-      thd->add_one_db_to_binlog_updated_dbs(tables->db);
+      thd->add_to_binlog_accessed_dbs(tables->db);
     result= write_bin_log(thd, TRUE, stmt_query.ptr(), stmt_query.length());
   }
 

=== modified file 'sql/sql_view.cc'
--- a/sql/sql_view.cc	2011-05-24 14:29:35 +0000
+++ b/sql/sql_view.cc	2011-06-12 17:36:17 +0000
@@ -689,7 +689,7 @@ bool mysql_create_view(THD *thd, TABLE_L
     buff.append(views->source.str, views->source.length);
 
     int errcode= query_error_code(thd, TRUE);
-    thd->add_one_db_to_binlog_updated_dbs(views->db);
+    thd->add_to_binlog_accessed_dbs(views->db);
     if (thd->binlog_query(THD::STMT_QUERY_TYPE,
                           buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcode))
       res= TRUE;
@@ -1683,11 +1683,7 @@ bool mysql_drop_view(THD *thd, TABLE_LIS
       }
       continue;
     }
-    if (!thd->get_binlog_accessed_db_names())
-    {
-      thd->set_binlog_accessed_db_names(new List<char>);
-    }
-    thd->add_to_binlog_updated_dbs(view->db);
+    thd->add_to_binlog_accessed_dbs(view->db);
     if (mysql_file_delete(key_file_frm, path, MYF(MY_WME)))
       error= TRUE;
 


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110612173617-mfocuapqm7mb9n5b.bundle