List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:February 12 2011 8:32am
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3272) WL#5754
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3272 Andrei Elkin	2011-02-12
      wl#5754 Query parallelization
      
      intermediate commit with some cleanup after removing --mts-exp-slave-run-query-in-parallel

    renamed:
      mysql-test/suite/rpl/r/rpl_parallel_query.result => mysql-test/suite/rpl/r/rpl_parallel_temp_query.result
      mysql-test/suite/rpl/t/rpl_parallel_query.test => mysql-test/suite/rpl/t/rpl_parallel_temp_query.test
    modified:
      mysql-test/r/mysqld--help-notwin.result
      mysql-test/suite/rpl/t/rpl_packet.test
      mysql-test/suite/sys_vars/r/all_vars.result
      sql/binlog.cc
      sql/log_event.cc
      sql/log_event.h
      sql/mysqld.cc
      sql/mysqld.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_slave.cc
      sql/sql_base.cc
      sql/sql_class.cc
      sql/sql_class.h
      sql/sys_vars.cc
=== modified file 'mysql-test/r/mysqld--help-notwin.result'
--- a/mysql-test/r/mysqld--help-notwin.result	2011-01-11 23:01:02 +0000
+++ b/mysql-test/r/mysqld--help-notwin.result	2011-02-12 08:32:05 +0000
@@ -362,9 +362,6 @@ The following options may be given as th
  If enabled slave itself computes the event appying time
  value to implicitly affected timestamp columms. Otherwise
  (default) it installs prescribed by the master value
- --mts-exp-slave-run-query-in-parallel 
- The default not an actual database name is used as
- partition info for parallel execution of Query_log_event 
  --mts-partition-hash-soft-max=# 
  Number of records in the mts partition hash below which
  entries with zero usage are tolerated
@@ -910,7 +907,6 @@ mts-checkpoint-group 512
 mts-checkpoint-period 300
 mts-coordinator-basic-nap 5
 mts-exp-slave-local-timestamp FALSE
-mts-exp-slave-run-query-in-parallel FALSE
 mts-partition-hash-soft-max 16
 mts-pending-jobs-size-max 16777216
 mts-slave-parallel-workers 0

=== renamed file 'mysql-test/suite/rpl/r/rpl_parallel_query.result' => 'mysql-test/suite/rpl/r/rpl_parallel_temp_query.result'
=== modified file 'mysql-test/suite/rpl/t/rpl_packet.test'
--- a/mysql-test/suite/rpl/t/rpl_packet.test	2010-12-19 17:22:30 +0000
+++ b/mysql-test/suite/rpl/t/rpl_packet.test	2011-02-12 08:32:05 +0000
@@ -84,6 +84,11 @@ connection master;
 
 INSERT INTO `t1`(`f1`) VALUES ('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2048');
 
+--disable_query_log
+--disable_result_log
+select sleep(300);
+--enable_result_log
+--enable_query_log
 
 #
 # Bug#42914: The slave I/O thread must stop after trying to read the above
@@ -169,6 +174,7 @@ if (`SELECT NOT(@max_allowed_packet_0 = 
 #
 
 connection slave;
+
 START SLAVE;
 --source include/wait_for_slave_to_start.inc
 

=== renamed file 'mysql-test/suite/rpl/t/rpl_parallel_query.test' => 'mysql-test/suite/rpl/t/rpl_parallel_temp_query.test'
=== modified file 'mysql-test/suite/sys_vars/r/all_vars.result'
--- a/mysql-test/suite/sys_vars/r/all_vars.result	2010-12-27 18:54:41 +0000
+++ b/mysql-test/suite/sys_vars/r/all_vars.result	2011-02-12 08:32:05 +0000
@@ -14,9 +14,7 @@ left join t1 on variable_name=test_name 
 There should be *no* variables listed below:
 INNODB_STATS_TRANSIENT_SAMPLE_PAGES
 MTS_PARTITION_HASH_SOFT_MAX
-MTS_PENDING_JOBS_SIZE_MAX
 MTS_EXP_SLAVE_LOCAL_TIMESTAMP
-MTS_EXP_SLAVE_RUN_QUERY_IN_PARALLEL
 INNODB_STATS_PERSISTENT_SAMPLE_PAGES
 RELAY_LOG_BASENAME
 LOG_BIN_BASENAME
@@ -25,18 +23,17 @@ INNODB_PRINT_ALL_DEADLOCKS
 INNODB_RESET_MONITOR_COUNTER
 MTS_SLAVE_PARALLEL_WORKERS
 MTS_WORKER_UNDERRUN_LEVEL
-MTS_SLAVE_WORKER_QUEUE_LEN_MAX
 INNODB_RESET_ALL_MONITOR_COUNTER
 LOG_BIN_INDEX
 INNODB_DISABLE_MONITOR_COUNTER
 INNODB_ENABLE_MONITOR_COUNTER
+MTS_SLAVE_WORKER_QUEUE_LEN_MAX
 INNODB_FILE_FORMAT_MAX
+MTS_PENDING_JOBS_SIZE_MAX
 MTS_COORDINATOR_BASIC_NAP
 INNODB_STATS_TRANSIENT_SAMPLE_PAGES
 MTS_PARTITION_HASH_SOFT_MAX
-MTS_PENDING_JOBS_SIZE_MAX
 MTS_EXP_SLAVE_LOCAL_TIMESTAMP
-MTS_EXP_SLAVE_RUN_QUERY_IN_PARALLEL
 INNODB_STATS_PERSISTENT_SAMPLE_PAGES
 RELAY_LOG_BASENAME
 LOG_BIN_BASENAME
@@ -45,12 +42,13 @@ INNODB_PRINT_ALL_DEADLOCKS
 INNODB_RESET_MONITOR_COUNTER
 MTS_SLAVE_PARALLEL_WORKERS
 MTS_WORKER_UNDERRUN_LEVEL
-MTS_SLAVE_WORKER_QUEUE_LEN_MAX
 INNODB_RESET_ALL_MONITOR_COUNTER
 LOG_BIN_INDEX
 INNODB_DISABLE_MONITOR_COUNTER
 INNODB_ENABLE_MONITOR_COUNTER
+MTS_SLAVE_WORKER_QUEUE_LEN_MAX
 INNODB_FILE_FORMAT_MAX
+MTS_PENDING_JOBS_SIZE_MAX
 MTS_COORDINATOR_BASIC_NAP
 drop table t1;
 drop table t2;

=== modified file 'sql/binlog.cc'
--- a/sql/binlog.cc	2011-02-03 09:28:27 +0000
+++ b/sql/binlog.cc	2011-02-12 08:32:05 +0000
@@ -4734,7 +4734,8 @@ int THD::decide_logging_format(TABLE_LIS
 
         prev_write_table= table->table;
 
-        if (variables.binlog_format != BINLOG_FORMAT_ROW)
+        if (variables.binlog_format != BINLOG_FORMAT_ROW &&
+            lex->sql_command != SQLCOM_END /* rows-event applying by slave */)
         {
           /*
             Master side of the STMT format events parallelization.

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-02-03 09:28:27 +0000
+++ b/sql/log_event.cc	2011-02-12 08:32:05 +0000
@@ -2408,7 +2408,7 @@ bool Log_event::contains_partition_info(
    r - a mini-group internal "regular" event that follows its g-parent
       (Write, Update, Delete -rows)
    S - sequentially applied event (may not be a part of any group).
-       Events of this type are determined via @c only_sequential_exec()
+       Events of this type are determined via @c mts_sequential_exec()
        earlier and don't cause calling this method .
    T - terminator of the group (XID, COMMIT, ROLLBACK)
 
@@ -2490,7 +2490,9 @@ Slave_worker *Log_event::get_slave_worke
   if (contains_partition_info())
   {
     List_iterator<char> it(*mts_get_dbs());
-    while (it++)
+
+    it++;
+    do
     {
       char **ref_cur_db= it.ref();
       // a lot of things inside `get_slave_worker_id'
@@ -2504,9 +2506,8 @@ Slave_worker *Log_event::get_slave_worke
         
         DBUG_ASSERT(g.group_relay_log_name == NULL);
       }
-    }
+    } while (it++ && mts_number_dbs() != MAX_DBS_IN_QUERY_MTS + 1);
     // releasing the Coord's mem-root from the updated dbs
-    // if (get_type_code() == QUERY_EVENT)
     free_root(rli->info_thd->mem_root, MYF(MY_KEEP_PREALLOC));
   }
   else // r
@@ -2537,6 +2538,8 @@ Slave_worker *Log_event::get_slave_worke
       (Slave_job_group *)
       dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
 
+    if (!rli->curr_group_is_parallel)
+      sleep(1000);
     DBUG_ASSERT(rli->curr_group_is_parallel);
 
     // TODO: throw an error when relay-log reading starts from inside of a group!!
@@ -2790,53 +2793,50 @@ int Log_event::apply_event(Relay_log_inf
   Slave_worker *w= NULL;
   Slave_job_item item= {NULL}, *job_item= &item;
   Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli);  // constless alias
-  bool parallel;
-  bool seq_event;
-  bool term_event;
+  bool parallel, seq_event, term_event;
+
+  if (rli->is_mts_recovery())
+  {
+    bool skip= _bitmap_is_set(&c_rli->recovery_groups, c_rli->mts_recovery_index);
+
+    if (ends_group()) // TODO: || ! seen_begin
+    {
+      c_rli->mts_recovery_index++;
+      if (--c_rli->mts_recovery_group_cnt == 0)
+      {
+        c_rli->recovery_parallel_workers= c_rli->slave_parallel_workers;
+        c_rli->mts_recovery_index= 0;
+      }
+    }
+    if (skip)
+      DBUG_RETURN(0);
+    else 
+      DBUG_RETURN(do_apply_event(rli));
+  }
 
   if (!(parallel= rli->is_parallel_exec()) ||
-      rli->mts_recovery_group_cnt != 0 ||
-      ((seq_event=
-        only_sequential_exec(rli->run_query_in_parallel,
-                             rli->curr_group_seen_begin /* todo: obs 2nd arg */))
-       // rli->curr_group_seen_begin && ends_group() => rli->last_assigned_worker
-       && (!rli->curr_group_seen_begin || parallel_exec_by_coordinator(::server_id))))
+      ((seq_event= mts_sequential_exec()) &&
+       (!rli->curr_group_seen_begin ||
+        mts_async_exec_by_coordinator(::server_id))))
   {
     if (parallel)
     {
-      // This `only-sequential' case relates to a DDL Query case
-      // or a group split apart by FD event
-      DBUG_ASSERT(seq_event &&
-                  (rli->curr_group_da.elements == 0 || rli->curr_group_seen_begin));
-      
-      if (!parallel_exec_by_coordinator(::server_id))
+      if (!mts_async_exec_by_coordinator(::server_id))
       {
+        // current isn't group split and therefore requires Workers to sync
         DBUG_ASSERT(!rli->curr_group_seen_begin);
 
-        c_rli->curr_group_is_parallel= FALSE;   // Coord will destruct events
+        c_rli->curr_group_is_parallel= FALSE;
         (void) wait_for_workers_to_finish(rli);
       }
       else
       {
-        c_rli->curr_event_is_not_in_group= TRUE;
-      }
-    }
-    else if (rli->is_mts_recovery())
-    {
-      // recovery
-      bool skip= _bitmap_is_set(&c_rli->recovery_groups, c_rli->mts_recovery_index);
-
-      if (ends_group()) // todo: || rli->run_query_in_parallel && ! seen_begin
-      {
-        c_rli->mts_recovery_index++;
-        if (--c_rli->mts_recovery_group_cnt == 0)
+        if (rli->curr_group_is_parallel)
         {
-          c_rli->recovery_parallel_workers= c_rli->slave_parallel_workers;
-          c_rli->mts_recovery_index= 0;
+          c_rli->curr_group_split= TRUE;
+          c_rli->curr_group_is_parallel= FALSE;
         }
       }
-      if (skip)
-        DBUG_RETURN(0);
     }
     DBUG_RETURN(do_apply_event(rli));
   }
@@ -2845,10 +2845,6 @@ int Log_event::apply_event(Relay_log_inf
               rli->last_assigned_worker);
 
   /* 
-     Work-around:s for B, T,..., Q case and ROWS_QUERY_LOG_EVENT
-     A worker has been assigned but it needs sequential environment.
-
-     Todo: support Query parallelization.
      Todo: disassociate Rows_* events from the central rli.
   */
   if (seq_event)
@@ -2865,9 +2861,10 @@ int Log_event::apply_event(Relay_log_inf
         my_sleep(10);
       }
       c_rli->rows_query_ev= (Rows_query_log_event*) this;
-    }
-  }
+     }
+   }
 
+  // getting Worker's id
   if ((!(w= get_slave_worker_id(rli)) ||
        DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)))
     DBUG_RETURN(rli->curr_group_assigned_parts.elements == 0 ? FALSE : TRUE);
@@ -3369,7 +3366,7 @@ bool Query_log_event::write(IO_CACHE* fi
     }
   }
 
-  if (thd->get_binlog_updated_db_names() != NULL)
+  if (thd && thd->get_binlog_updated_db_names() != NULL)
   {
     uchar dbs;
     *start++= Q_UPDATED_DB_NAMES;

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2011-02-03 09:28:27 +0000
+++ b/sql/log_event.h	2011-02-12 08:32:05 +0000
@@ -1089,6 +1089,8 @@ public:
     return res;
   }
 
+  virtual uchar mts_number_dbs() { return 1; }
+
 #else
   Log_event() : temp_buf(0) {}
     /* avoid having to link mysqlbinlog against libpthread */
@@ -1210,30 +1212,27 @@ public:
 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
 public:
 
+  virtual uchar mts_number_of_updated_dbs() { return 1; }
+
   /**
      MST: to execute serially due to technical or conceptual limitation
+     
+     TODO: add version of the server check to fall back to seq in the OM case.
 
      @return TRUE if despite permanent parallel execution mode an event
                   needs applying in a real isolation that is sequentially.
   */
-  bool only_sequential_exec(bool query_in_parallel, bool group_term_in_parallel)
+  bool mts_sequential_exec()
   {
     return
-       /* 
+      /* 
          the 4 types below are limitly parallel-supported (the default 
          session db not the actual db).
-         Decision on BEGIN is deferred till the following event.
-         Decision on Commit or Xid is forced by the one for BEGIN.
+         Decision on BEGIN, COMMIT, Xid is the parallel.
       */
-      
-      (!query_in_parallel &&
-       ((get_type_code() == QUERY_EVENT
-         && !starts_group() && !ends_group())    ||
-        get_type_code() == INTVAR_EVENT          ||
-        get_type_code() == USER_VAR_EVENT        ||
-        get_type_code() == RAND_EVENT))          ||
-
-      (!group_term_in_parallel && ends_group())  ||
+      (get_type_code() == QUERY_EVENT &&
+       !starts_group() && !ends_group() &&
+       (mts_number_of_updated_dbs() ==  MAX_DBS_IN_QUERY_MTS + 1)) ||
 
       get_type_code() == START_EVENT_V3          ||
       get_type_code() == STOP_EVENT              ||
@@ -1252,7 +1251,7 @@ public:
       get_type_code() == PRE_GA_UPDATE_ROWS_EVENT||
       get_type_code() == PRE_GA_DELETE_ROWS_EVENT||
 
-      get_type_code() == ROWS_QUERY_LOG_EVENT    || /* todo: make parallel */
+      get_type_code() == ROWS_QUERY_LOG_EVENT    || /* TODO: make parallel */
 
       get_type_code() == INCIDENT_EVENT;
   }
@@ -1263,7 +1262,7 @@ public:
      @return TRUE  if that's the case,
              FALSE otherwise.
   */
-  bool parallel_exec_by_coordinator(ulong slave_server_id)
+  bool mts_async_exec_by_coordinator(ulong slave_server_id)
   {
     return
       (get_type_code() == FORMAT_DESCRIPTION_EVENT ||
@@ -1883,14 +1882,26 @@ public:
   Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length,
                   bool using_trans, bool direct, bool suppress_use, int error);
   const char* get_db() { return db; }
+
+  /**
+     Returns a default db in case of over-MAX_DBS_IN_QUERY_MTS actual db:s
+  */
   virtual List<char>* mts_get_dbs()
   {
-    DBUG_ASSERT(mts_updated_dbs > 0 && mts_updated_dbs < MAX_DBS_IN_QUERY_MTS + 1);
+    DBUG_ASSERT(mts_updated_dbs > 0 && mts_updated_dbs <= MAX_DBS_IN_QUERY_MTS + 1);
     List<char> *res= new List<char>;
-    for (uchar i= 0; i < mts_updated_dbs; i++)
-      res->push_back(mts_updated_db_names[i]);
+    if (mts_updated_dbs == MAX_DBS_IN_QUERY_MTS + 1)
+      res->push_back((char*) get_db());
+    else
+      for (uchar i= 0; i < mts_updated_dbs; i++)
+        res->push_back(mts_updated_db_names[i]);
     return res;
   }
+
+  virtual uchar mts_number_dbs() { return mts_updated_dbs; }
+
+  virtual uchar mts_number_of_updated_dbs() { return mts_updated_dbs; }
+
 #ifdef HAVE_REPLICATION
   void pack_info(Protocol* protocol);
 #endif /* HAVE_REPLICATION */

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2011-01-11 23:01:02 +0000
+++ b/sql/mysqld.cc	2011-02-12 08:32:05 +0000
@@ -468,7 +468,6 @@ ulonglong slave_type_conversions_options
 ulong opt_mts_slave_parallel_workers;
 ulong opt_mts_slave_worker_queue_len_max;
 my_bool opt_mts_slave_local_timestamp;
-my_bool opt_mts_slave_run_query_in_parallel;
 ulong opt_mts_partition_hash_soft_max;
 ulonglong opt_mts_pending_jobs_size_max;
 ulong opt_mts_coordinator_basic_nap;

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h	2010-12-27 18:54:41 +0000
+++ b/sql/mysqld.h	2011-02-12 08:32:05 +0000
@@ -182,7 +182,6 @@ extern uint  slave_net_timeout;
 extern ulong opt_mts_slave_parallel_workers;
 extern ulong opt_mts_slave_worker_queue_len_max;
 extern my_bool opt_mts_slave_local_timestamp;
-extern my_bool opt_mts_slave_run_query_in_parallel;
 extern ulong opt_mts_partition_hash_soft_max;
 extern ulonglong opt_mts_pending_jobs_size_max;
 extern ulong opt_mts_coordinator_basic_nap;

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2011-02-03 09:28:27 +0000
+++ b/sql/rpl_rli.cc	2011-02-12 08:32:05 +0000
@@ -73,7 +73,7 @@ Relay_log_info::Relay_log_info(bool is_s
    this_worker(NULL), slave_parallel_workers(0),
    recovery_parallel_workers(0),
    checkpoint_group(mts_checkpoint_group), mts_recovery_group_cnt(0),
-   mts_recovery_index(0), curr_event_is_not_in_group(0),
+   mts_recovery_index(0),
    sql_delay(0), sql_delay_end(0), m_flags(0)
 {
   DBUG_ENTER("Relay_log_info::Relay_log_info");

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2011-01-20 13:39:00 +0000
+++ b/sql/rpl_rli.h	2011-02-12 08:32:05 +0000
@@ -448,7 +448,6 @@ public:
   DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
   DYNAMIC_ARRAY curr_group_da;  // deferred array to hold partition-info-free events
   bool curr_group_seen_begin;   // current group started with B-event or not
-  bool run_query_in_parallel;   // Query's default db not the actual db as part
   bool curr_group_isolated;     // Trans is exec:d by Worker but in exclusive env
   volatile ulong mts_wqs_underrun_w_id;  // Id of a Worker whose queue is getting empty
   volatile long mts_wqs_overrun;   // W to incr and decr
@@ -459,7 +458,8 @@ public:
   Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
   ulonglong mts_total_groups; // total event groups distributed in current session
  
-  bool curr_group_is_parallel; // a mark for Coord to indicate on T-event of the curr group at delete
+  bool curr_group_is_parallel; // an event to process by Coordinator
+  bool curr_group_split;       // an event split the current group forcing C to exec it
   ulong opt_slave_parallel_workers; // auxiliary cache for ::opt_slave_parallel_workers
   ulong slave_parallel_workers;     // the one slave session time number of workers
   ulong recovery_parallel_workers; // number of workers while recovering.
@@ -475,7 +475,6 @@ public:
   MY_BITMAP recovery_groups;  // bitmap used during recovery.
   ulong mts_recovery_group_cnt; // number of groups to execute at recovery
   ulong mts_recovery_index;     // running index of recoverable groups
-  bool curr_event_is_not_in_group; // a special case of group split apart by FD
   /*
     temporary tables are held by Coordinator though are created and dropped
     if explicilty by Workers. The following lock has to be taken by either party

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2011-02-03 09:28:27 +0000
+++ b/sql/rpl_slave.cc	2011-02-12 08:32:05 +0000
@@ -2847,17 +2847,8 @@ int apply_event_and_update_pos(Log_event
       See sql/rpl_rli.h for further details.
     */
     int error= 0;
-    if (skip_event || 
-        (!rli->is_parallel_exec() ||
-         (!rli->curr_group_is_parallel)))
-    {
-      DBUG_ASSERT(skip_event || !rli->is_parallel_exec() ||
-                  (!rli->curr_group_is_parallel ||
-                   rli->curr_event_is_not_in_group) ||
-                  (ev->only_sequential_exec(rli->run_query_in_parallel,
-                                            (rli->curr_group_seen_begin ||
-                                             rli->last_assigned_worker != NULL))
-                   && !rli->curr_group_seen_begin));
+    if (skip_event || !rli->is_parallel_exec() || !rli->curr_group_is_parallel)
+    {
 #ifndef DBUG_OFF
       /*
         This only prints information to the debug trace.
@@ -3056,40 +3047,37 @@ static int exec_relay_log_event(THD* thd
     */
     // if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
     {
-      if ((!rli->is_parallel_exec() ||
-           !rli->curr_group_is_parallel || rli->curr_event_is_not_in_group)
-          && ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+      if ((!rli->is_parallel_exec() || !rli->curr_group_is_parallel))
       {
-        DBUG_ASSERT(!rli->is_parallel_exec() 
-                    ||
-                    (ev->only_sequential_exec(rli->run_query_in_parallel,
-                                              // rli->curr_group_is_parallel
-                                              (rli->curr_group_seen_begin ||
-                                               rli->last_assigned_worker != NULL))
-                     && (!rli->curr_group_seen_begin ||
-                         ev->parallel_exec_by_coordinator(::server_id)))
-                    || (ev->shall_skip(rli) != Log_event::EVENT_SKIP_NOT));
-      /* MTS:  Observation/todo.
-
-         ROWS_QUERY_LOG_EVENT could be supported easier if
-         destructing part of handle_rows_query_log_event would be merged
-         with rli->cleanup_context() and the rest move into 
-         ROWS...::do_apply_event
-      */
-        
-        if (!rli->is_parallel_exec())
-          if (thd->variables.binlog_rows_query_log_events)
-            handle_rows_query_log_event(ev, rli);
+        DBUG_ASSERT(!rli->is_parallel_exec() || !rli->curr_group_is_parallel ||
+                    ev->shall_skip(rli) != Log_event::EVENT_SKIP_NOT);
 
-        if (ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
+        if (rli->curr_group_split) // an artifical FD requires some handling
+        {
+          rli->curr_group_is_parallel= TRUE;
+          rli->curr_group_split= FALSE;
+        }
+        if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
         {
-          DBUG_PRINT("info", ("Deleting the event after it has been executed"));
-          delete ev;
-          ev= NULL;
+          /* MTS/ TODO.
+
+             ROWS_QUERY_LOG_EVENT could be supported easier if
+             destructing part of handle_rows_query_log_event would be merged
+             with rli->cleanup_context() and the rest move into 
+             ROWS...::do_apply_event
+          */
+          if (!rli->is_parallel_exec())
+            if (thd->variables.binlog_rows_query_log_events)
+              handle_rows_query_log_event(ev, rli);
+          
+          if (ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
+          {
+            DBUG_PRINT("info", ("Deleting the event after it has been executed"));
+            delete ev;
+            ev= NULL;
+          }
         }
       }
-      if (rli->curr_event_is_not_in_group)
-        rli->curr_event_is_not_in_group= FALSE;
     }
 
     /*
@@ -3953,7 +3941,6 @@ bool mts_recovery_groups(Relay_log_info 
         DBUG_ASSERT(ev->is_valid());
         DBUG_ASSERT(rli->mts_recovery_group_cnt < rli->checkpoint_group);
 
-        // TODO: relax condition to allow --mts_exp_run_query_in_parallel= 1
         if (ev->starts_group())
           curr_group_seen_begin= TRUE;
         else
@@ -4351,12 +4338,11 @@ int slave_start_workers(Relay_log_info *
   rli->mts_coordinator_basic_nap= ::opt_mts_coordinator_basic_nap;
   rli->mts_worker_underrun_level= ::opt_mts_worker_underrun_level;
   rli->mts_total_groups= 0;
-  rli->curr_group_seen_begin= FALSE; // initial presumtion, will change
-  rli->curr_group_is_parallel= FALSE; // initial presumtion, will change
+  rli->curr_group_seen_begin= FALSE;
+  rli->curr_group_is_parallel= FALSE;
   rli->curr_group_isolated= FALSE;
-  rli->run_query_in_parallel= opt_mts_slave_run_query_in_parallel;
+  rli->curr_group_split= FALSE;
   rli->checkpoint_seqno= 0;
-  rli->curr_event_is_not_in_group= FALSE;
   //rli->worker_bitmap_buf= my_malloc(n/8 + 1,MYF(MY_WME));
 
   for (i= 0; i < n; i++)

=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc	2011-01-20 13:39:00 +0000
+++ b/sql/sql_base.cc	2011-02-12 08:32:05 +0000
@@ -1197,26 +1197,25 @@ bool close_cached_connection_tables(THD 
 
 static void mark_temp_tables_as_free_for_reuse(THD *thd)
 {
-  TABLE *temporary_tables;
+#ifndef EMBEDDED_LIBRARY
   bool mts_slave= mts_is_coord_or_worker(thd);
-
+  TABLE *temporary_tables= mts_slave ?
+    mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
   if (mts_slave)
-  {
-    temporary_tables= mts_get_coordinator_thd()->temporary_tables;
     mysql_mutex_lock(mts_get_temp_table_mutex());
-  }
-  else
-  {
-    temporary_tables= thd->temporary_tables;
-  }
+#else
+  TABLE *temporary_tables= thd->temporary_tables;
+#endif
 
   for (TABLE *table= temporary_tables; table ; table=table->next)
   {
     if ((table->query_id == thd->query_id) && ! table->open_by_handler)
       mark_tmp_table_for_reuse(table);
   }
+#ifndef EMBEDDED_LIBRARY
   if (mts_slave)
-  mysql_mutex_unlock(mts_get_temp_table_mutex());
+    mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
 }
 
 
@@ -1607,24 +1606,26 @@ bool close_temporary_tables(THD *thd)
   /* Assume thd->variables.option_bits has OPTION_QUOTE_SHOW_CREATE */
   bool was_quote_show= TRUE;
   bool error= 0;
+  TABLE **ptr_temporary_tables;
+#ifndef EMBEDDED_LIBRARY
   bool mts_slave= mts_is_coord_or_worker(thd);
-  TABLE *temporary_tables, **ptr_temporary_tables;
-
+  TABLE *temporary_tables= mts_slave ?
+    mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
+  
   if (mts_slave)
-  {
-    temporary_tables= mts_get_coordinator_thd()->temporary_tables;
     mysql_mutex_lock(mts_get_temp_table_mutex());
-  }
-  else
-  {
-    temporary_tables= thd->temporary_tables;
-  }
+#else
+  TABLE *temporary_tables= thd->temporary_tables;
+#endif
+
   ptr_temporary_tables= &temporary_tables;
 
   if (!temporary_tables)
   {
-    if (mts_slave)
-      mysql_mutex_unlock(mts_get_temp_table_mutex());
+#ifndef EMBEDDED_LIBRARY
+    if (mts_slave)
+      mysql_mutex_unlock(mts_get_temp_table_mutex()); /* taken above */
+#endif
     DBUG_RETURN(FALSE);
   }
 
@@ -1637,8 +1638,11 @@ bool close_temporary_tables(THD *thd)
       close_temporary(table, 1, 1);
     }
     *ptr_temporary_tables= 0;
+#ifndef EMBEDDED_LIBRARY
     if (mts_slave)
       mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
+
     DBUG_RETURN(FALSE);
   }
 
@@ -1772,9 +1776,10 @@ bool close_temporary_tables(THD *thd)
     thd->variables.option_bits&= ~OPTION_QUOTE_SHOW_CREATE; /* restore option */
   *ptr_temporary_tables= 0;
 
+#ifndef EMBEDDED_LIBRARY
   if (mts_slave)
     mysql_mutex_unlock(mts_get_temp_table_mutex());
-
+#endif
   DBUG_RETURN(error);
 }
 
@@ -2067,18 +2072,16 @@ TABLE *find_temporary_table(THD *thd,
                             const char *table_key,
                             uint table_key_length)
 {
-  TABLE *table= NULL, *temporary_tables;
+  TABLE *table= NULL;
+#ifndef EMBEDDED_LIBRARY
   bool mts_slave= mts_is_coord_or_worker(thd);
+  TABLE *temporary_tables= mts_slave ?
+    mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
   if (mts_slave)
-  {
-    temporary_tables= mts_get_coordinator_thd()->temporary_tables;
-    mysql_mutex_lock(mts_get_temp_table_mutex());
-  }
-  else
-  {
-    temporary_tables= thd->temporary_tables;
-  }
-
+    mysql_mutex_lock(mts_get_temp_table_mutex());  
+#else
+  TABLE *temporary_tables= thd->temporary_tables;
+#endif
   for (table= temporary_tables; table; table= table->next)
   {
     if (table->s->table_cache_key.length == table_key_length &&
@@ -2087,10 +2090,10 @@ TABLE *find_temporary_table(THD *thd,
       break;
     }
   }
-
+#ifndef EMBEDDED_LIBRARY
   if (mts_slave)
     mysql_mutex_unlock(mts_get_temp_table_mutex());
-
+#endif
   return table;
 }
 
@@ -2129,7 +2132,9 @@ TABLE *find_temporary_table(THD *thd,
 int drop_temporary_table(THD *thd, TABLE_LIST *table_list, bool *is_trans)
 {
   TABLE *table;
+#ifndef EMBEDDED_LIBRARY
   bool mts_slave= mts_is_coord_or_worker(thd);
+#endif
   THD *thd_temp;
 
   DBUG_ENTER("drop_temporary_table");
@@ -2155,20 +2160,24 @@ int drop_temporary_table(THD *thd, TABLE
   */
   mysql_lock_remove(thd, thd->lock, table);
 
+#ifndef EMBEDDED_LIBRARY
   if (mts_slave)
   {
     thd_temp= mts_get_coordinator_thd();
     mysql_mutex_lock(mts_get_temp_table_mutex());
   }
   else
+#endif
   {
     thd_temp= thd;
   }
 
   close_temporary_table(thd_temp, table, 1, 1);
 
+#ifndef EMBEDDED_LIBRARY
   if (mts_slave)
      mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
 
   DBUG_RETURN(0);
 }
@@ -2706,17 +2715,15 @@ bool open_table(THD *thd, TABLE_LIST *ta
   if (table_list->open_type != OT_BASE_ONLY &&
       ! (flags & MYSQL_OPEN_SKIP_TEMPORARY))
   {
+#ifndef EMBEDDED_LIBRARY
     bool mts_slave= mts_is_coord_or_worker(thd);
-    TABLE *temporary_tables;
+    TABLE *temporary_tables= mts_slave ?
+      mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
     if (mts_slave)
-    {
-      temporary_tables= mts_get_coordinator_thd()->temporary_tables;
       mysql_mutex_lock(mts_get_temp_table_mutex());
-    }
-    else
-    {
-      temporary_tables= thd->temporary_tables;
-    }
+#else
+    TABLE *temporary_tables= thd->temporary_tables;
+#endif
 
     for (table= temporary_tables; table ; table=table->next)
     {
@@ -2738,20 +2745,26 @@ bool open_table(THD *thd, TABLE_LIST *ta
                       (ulong) table->query_id, (uint) thd->server_id,
                       (ulong) thd->variables.pseudo_thread_id));
 	  my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias);
+#ifndef EMBEDDED_LIBRARY
           if (mts_slave)
             mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
 	  DBUG_RETURN(TRUE);
 	}
 	table->query_id= thd->query_id;
 	thd->thread_specific_used= TRUE;
         DBUG_PRINT("info",("Using temporary table"));
+#ifndef EMBEDDED_LIBRARY
         if (mts_slave)
           mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
         goto reset;
       }
     }
+#ifndef EMBEDDED_LIBRARY
     if (mts_slave)
       mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
   }
 
   if (table_list->open_type == OT_TEMPORARY_ONLY ||
@@ -5944,19 +5957,16 @@ TABLE *open_table_uncached(THD *thd, con
 
   if (add_to_temporary_tables_list)
   {
+#ifndef EMBEDDED_LIBRARY
     TABLE **ptr_temporary_tables;
     bool mts_slave= mts_is_coord_or_worker(thd);
-
+    ptr_temporary_tables= mts_slave? 
+      &mts_get_coordinator_thd()->temporary_tables : &thd->temporary_tables;
     if (mts_slave)
-    {
-      ptr_temporary_tables= &mts_get_coordinator_thd()->temporary_tables;
       mysql_mutex_lock(mts_get_temp_table_mutex());
-    }
-    else
-    {
-      ptr_temporary_tables= &thd->temporary_tables;
-    }
-
+#else
+    TABLE **ptr_temporary_tables= &thd->temporary_tables;
+#endif
     /* growing temp list at the head */
     tmp_table->next= *ptr_temporary_tables;
     if (tmp_table->next)
@@ -5965,8 +5975,10 @@ TABLE *open_table_uncached(THD *thd, con
     (*ptr_temporary_tables)->prev= 0;
     if (thd->slave_thread)
       slave_open_temp_tables++;
+#ifndef EMBEDDED_LIBRARY
     if (mts_slave)
        mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
   }
   tmp_table->pos_in_table_list= 0;
   DBUG_PRINT("tmptable", ("opened table: '%s'.'%s' 0x%lx", tmp_table->s->db.str,

=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc	2011-02-03 09:28:27 +0000
+++ b/sql/sql_class.cc	2011-02-12 08:32:05 +0000
@@ -3394,6 +3394,7 @@ void THD::reset_sub_statement_state(Sub_
     first_successful_insert_id_in_prev_stmt;
   backup->first_successful_insert_id_in_cur_stmt= 
     first_successful_insert_id_in_cur_stmt;
+  backup->binlog_updated_db_names= binlog_updated_db_names;
 
   if ((!lex->requires_prelocking() || is_update_query(lex->sql_command)) &&
       !is_current_stmt_binlog_format_row())
@@ -3414,6 +3415,7 @@ void THD::reset_sub_statement_state(Sub_
   cuted_fields= 0;
   transaction.savepoints= 0;
   first_successful_insert_id_in_cur_stmt= 0;
+  binlog_updated_db_names= NULL;
 }
 
 
@@ -3476,6 +3478,7 @@ void THD::restore_sub_statement_state(Su
   */
   examined_row_count+= backup->examined_row_count;
   cuted_fields+=       backup->cuted_fields;
+  binlog_updated_db_names= backup->binlog_updated_db_names;
   DBUG_VOID_RETURN;
 }
 

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2011-02-03 09:28:27 +0000
+++ b/sql/sql_class.h	2011-02-12 08:32:05 +0000
@@ -1184,6 +1184,7 @@ public:
   bool last_insert_id_used;
   SAVEPOINT *savepoints;
   enum enum_check_fields count_cuted_fields;
+  List<char> *binlog_updated_db_names;
 };
 
 

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2011-01-11 23:01:02 +0000
+++ b/sql/sys_vars.cc	2011-02-12 08:32:05 +0000
@@ -3198,12 +3198,6 @@ static Sys_var_mybool Sys_slave_local_ti
        "time value to implicitly affected timestamp columms. Otherwise (default) "
        "it installs prescribed by the master value",
        GLOBAL_VAR(opt_mts_slave_local_timestamp), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
-static Sys_var_mybool Sys_slave_run_query_in_parallel(
-       "mts_exp_slave_run_query_in_parallel",
-       "The default not an actual database name is used as partition info "
-       "for parallel execution of Query_log_event ",
-       GLOBAL_VAR(opt_mts_slave_run_query_in_parallel), CMD_LINE(OPT_ARG),
-       DEFAULT(FALSE));
 static Sys_var_ulong Sys_mts_partition_hash_soft_max(
        "mts_partition_hash_soft_max",
        "Number of records in the mts partition hash below which "


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110212083205-a48u8h9mjqe1jejb.bundle
Thread
bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3272) WL#5754Andrei Elkin12 Feb