List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:February 17 2011 7:53pm
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3273) WL#5754
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3273 Andrei Elkin	2011-02-17
      wl#5754 Query parallel replication.
      
      Fixed various issues incl the preliminary review comments, stored routines handling.
     @ mysql-test/suite/rpl/r/rpl_packet.result
        results updated.
     @ mysql-test/suite/rpl/t/rpl_packet.test
        making a hashing fixes in order the test to pass. 
        todo: refine logics of max_allowed_packed for master & slave.
     @ mysql-test/suite/rpl/t/rpl_parallel_multi_db.test
        adding stored routines testing.
     @ sql/binlog.cc
        correcting memory allocation to be in thd's memroot.
     @ sql/field.cc
        adding comments to asserts.
     @ sql/handler.cc
        adding comments to asserts.
     @ sql/log_event.cc
        adding comments and correcting clearence of binlog_updated_db_names to not let
        BEGIN, COMMIT in particular to get the updated list.
     @ sql/log_event.h
        adding commits, and interfaces to helper functions.
     @ sql/rpl_rli.cc
        relocalating helper functions to rpl_slave.cc.
     @ sql/rpl_rli.h
        improving comments.
     @ sql/rpl_slave.cc
        comments explaining close_temp_tables() not to run by Workers.
        Accepting relocated functions.
     @ sql/sql_base.cc
        Correcting and simplifying logics for the temp table parallel support.
        In particular close_temporary_tables() does not need to know about thd of the caller.
     @ sql/sql_class.cc
        Correcting logics of merging the updated db:s of a child to the parent's top-level.
     @ sql/sql_class.h
        adding a necessary cleanup method.
     @ sql/sys_vars.cc
        Added a system variable (todo/fixme: may turn out to be unnecessary though).

    modified:
      mysql-test/suite/rpl/r/rpl_packet.result
      mysql-test/suite/rpl/t/rpl_packet.test
      mysql-test/suite/rpl/t/rpl_parallel_multi_db.test
      sql/binlog.cc
      sql/field.cc
      sql/handler.cc
      sql/log_event.cc
      sql/log_event.h
      sql/mysqld.cc
      sql/mysqld.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_slave.cc
      sql/sql_base.cc
      sql/sql_class.cc
      sql/sql_class.h
      sql/sys_vars.cc
=== modified file 'mysql-test/suite/rpl/r/rpl_packet.result'
--- a/mysql-test/suite/rpl/r/rpl_packet.result	2010-12-19 17:22:30 +0000
+++ b/mysql-test/suite/rpl/r/rpl_packet.result	2011-02-17 19:53:30 +0000
@@ -23,12 +23,12 @@ select * from information_schema.session
 VARIABLE_NAME	VARIABLE_VALUE
 SLAVE_RUNNING	ON
 drop database DB_NAME_OF_MAX_LENGTH_AKA_NAME_LEN_64_BYTES_____________________;
-SET @@global.max_allowed_packet=4096;
-SET @@global.net_buffer_length=4096;
+SET @@global.max_allowed_packet=4096 + (floor(64 * 3 * 254 / 1024) + 1) * 1024;
+SET @@global.net_buffer_length=@@global.max_allowed_packet;
 include/stop_slave.inc
 include/start_slave.inc
 CREATE TABLE `t1` (`f1` LONGTEXT) ENGINE=MyISAM;
-INSERT INTO `t1`(`f1`) VALUES ('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2048');
+INSERT INTO t1 VALUES (REPEAT('a', @@global.max_allowed_packet));
 include/wait_for_slave_io_error.inc [errno=1153]
 Last_IO_Error = 'Got a packet bigger than 'max_allowed_packet' bytes'
 include/stop_slave_sql.inc
@@ -51,10 +51,10 @@ include/wait_for_slave_to_start.inc
 DROP TABLE t1;
 select @@global.max_allowed_packet;
 @@global.max_allowed_packet
-4096
+53248
 select @@global.net_buffer_length;
 @@global.net_buffer_length
-4096
+53248
 select @@global.max_allowed_packet;
 @@global.max_allowed_packet
 1024
@@ -62,7 +62,7 @@ select @@global.net_buffer_length;
 @@global.net_buffer_length
 1024
 CREATE TABLE t1 (a TEXT) ENGINE=MyISAM;
-INSERT INTO t1 VALUES (REPEAT('a', 2048));
+INSERT INTO t1 VALUES (REPEAT('a', @@global.max_allowed_packet));
 # 1153 = ER_NET_PACKET_TOO_LARGE
 include/wait_for_slave_io_error.inc [errno=1153]
 Last_IO_Error = 'Got a packet bigger than 'max_allowed_packet' bytes'

=== modified file 'mysql-test/suite/rpl/t/rpl_packet.test'
--- a/mysql-test/suite/rpl/t/rpl_packet.test	2011-02-12 08:32:05 +0000
+++ b/mysql-test/suite/rpl/t/rpl_packet.test	2011-02-17 19:53:30 +0000
@@ -63,8 +63,20 @@ connection master;
 
 # Change the max packet size on master
 
-SET @@global.max_allowed_packet=4096;
-SET @@global.net_buffer_length=4096;
+# Todo: improve over-max_allowed_packet size events block on the slave.
+# The current size checking algorithm is not presize to allow large event
+# to slip it. Reject happens according to the guard:
+#   if (data_len > max(max_allowed_packet,
+#       opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER))
+# However, MAX_LOG_EVENT_HEADER is a conservative estimate so if the actual
+# header size is less the extra data let in the slave.
+
+# Adding the max size of the query log event status as
+# MAX_DBS_IN_QUERY_MTS * (1 + NAME_LEN) to make the master not fail to read
+# an event itself.
+
+SET @@global.max_allowed_packet=4096 + (floor(64 * 3 * 254 / 1024) + 1) * 1024;
+SET @@global.net_buffer_length=@@global.max_allowed_packet;
 
 # Restart slave for new setting to take effect
 connection slave;
@@ -82,13 +94,8 @@ sync_slave_with_master;
 
 connection master;
 
-INSERT INTO `t1`(`f1`) VALUES ('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2048');
+INSERT INTO t1 VALUES (REPEAT('a', @@global.max_allowed_packet));
 
---disable_query_log
---disable_result_log
-select sleep(300);
---enable_result_log
---enable_query_log
 
 #
 # Bug#42914: The slave I/O thread must stop after trying to read the above
@@ -192,7 +199,7 @@ CREATE TABLE t1 (a TEXT) ENGINE=MyISAM;
 
 # Create big row event.
 --connection master
-INSERT INTO t1 VALUES (REPEAT('a', 2048));
+INSERT INTO t1 VALUES (REPEAT('a', @@global.max_allowed_packet));
 
 # Slave IO thread should stop with error when trying to read the big event.
 --connection slave

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_multi_db.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_multi_db.test	2011-02-03 09:28:27 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_multi_db.test	2011-02-17 19:53:30 +0000
@@ -77,6 +77,116 @@ while ($k)
 sync_slave_with_master; 
 
 #
+# 2. Case of invoked routines
+#
+
+# A. Functions
+
+# create functions & run load
+let $n= $dbs;
+# n'th db func is defined through n-1'th except n == 1
+let $n1= $n;
+dec $n1;
+
+connection master;
+
+--disable_query_log
+
+while ($n1)
+{
+  let $m= `select floor(rand()*$tables) + 1`;
+
+  delimiter |;
+
+  eval create function `d$n`.`func` (a int) returns int
+     begin
+     insert into `d$n`.`t$m` values (`d$n1`.`func`(1));
+     return 1;
+     end|
+
+  delimiter ;|
+
+  dec $n;
+  dec $n1;
+}
+
+delimiter |;
+
+eval create function `d1`.`func` (a int) returns int
+     begin
+     insert into `d1`.`t$m` values (0);
+     return 1;
+     end|
+
+delimiter ;|
+
+
+# invoke...
+
+let $k= $queries;
+
+while ($k)
+{
+   let $n= `select floor(rand()*$dbs) + 1`;
+   let $m= `select floor(rand()*$tables) + 1`;
+   let $n1= $n;
+   dec $n1;
+   if ($n1)
+   {
+       eval insert into d$n.t$m values (`d$n1`.`func`(1));
+   }
+   dec $k;
+}
+
+--enable_query_log
+
+sync_slave_with_master;
+
+# B. Triggers
+
+connection master;
+
+# create triggers & run load
+let $n= $dbs;
+# n'th db tables won't have any trigger to avoid circular dependency
+let $n1= $n;
+dec $n1;
+
+--disable_query_log
+while ($n1)
+{
+  let $m= $tables;
+
+  while ($m)
+  {
+     eval create trigger `d$n1`.`trg_t$m` before insert on `d$n1`.`t$m` for each row insert into `d$n`.`t$m` values(1);
+     dec $m;
+  }
+  dec $n;
+  dec $n1;
+}
+--enable_query_log
+
+# invoke...
+
+let $k= $queries;
+
+--disable_query_log
+--disable_warnings
+while ($k)
+{
+   let $n= `select floor(rand()*$dbs + 1)`;
+   let $m= `select floor(rand()*$tables + 1)`;
+   eval insert into d$n.t$n values (2);
+   dec $k;
+}
+--enable_warnings
+--enable_query_log
+
+
+sync_slave_with_master;
+
+#
 # Consistency check
 #
 
@@ -95,11 +205,6 @@ while($n)
 
 
 #
-# 2. Case of invoked routines (TODO: add parallel support to CREATE trig,sf().
-#
-
-
-#
 # Clean-up
 #
 

=== modified file 'sql/binlog.cc'
--- a/sql/binlog.cc	2011-02-12 08:32:05 +0000
+++ b/sql/binlog.cc	2011-02-17 19:53:30 +0000
@@ -4752,7 +4752,7 @@ int THD::decide_logging_format(TABLE_LIS
           }
           if (binlog_updated_db_names->elements <  MAX_DBS_IN_QUERY_MTS + 1)
           {
-            char *after_db= strdup(table->db);
+            char *after_db= strdup_root(mem_root, table->db);
             if (binlog_updated_db_names->elements != 0)
             {
               List_iterator<char> it(*get_binlog_updated_db_names());

=== modified file 'sql/field.cc'
--- a/sql/field.cc	2011-01-20 13:39:00 +0000
+++ b/sql/field.cc	2011-02-17 19:53:30 +0000
@@ -3736,7 +3736,13 @@ longlong Field_long::val_int(void)
   ASSERT_COLUMN_MARKED_FOR_READ;
   int32 j;
   /* See the comment in Field_long::store(long long) */
+  /* 
+     About  (current_thd)->slave_thread alternative,
+     MTS coordinator open/closes a temp table while the rest of operation
+     is done by Workers.
+  */
   DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
+
 #ifdef WORDS_BIGENDIAN
   if (table->s->db_low_byte_first)
     j=sint4korr(ptr);
@@ -6308,7 +6314,7 @@ int Field_string::store(const char *from
   const char *cannot_convert_error_pos;
   const char *from_end_pos;
 
-  /* See the comment for Field_long::store(long long) */
+  /* See the comment for Field_long::store(long long) and Field_long::val_int */
   DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
 
   copy_length= well_formed_copy_nchars(field_charset,
@@ -6458,7 +6464,7 @@ String *Field_string::val_str(String *va
 			      String *val_ptr)
 {
   ASSERT_COLUMN_MARKED_FOR_READ;
-  /* See the comment for Field_long::store(long long) */
+  /* See the comment for Field_long::store(long long) and Field_long::val_int */
   DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
   uint length;
   if (table->in_use->variables.sql_mode &

=== modified file 'sql/handler.cc'
--- a/sql/handler.cc	2011-01-20 13:39:00 +0000
+++ b/sql/handler.cc	2011-02-17 19:53:30 +0000
@@ -2127,8 +2127,13 @@ void **handler::ha_data(THD *thd) const
 
 THD *handler::ha_thd(void) const
 {
+  /* 
+     About current_thd->slave_thread alternative,
+     MTS coordinator open/closes a temp table while the rest of operation
+     is done by Workers.
+  */
   DBUG_ASSERT(!table || !table->in_use || table->in_use == current_thd ||
-              current_thd->slave_thread /* mts worker */);
+              current_thd->slave_thread);
   return (table && table->in_use) ? table->in_use : current_thd;
 }
 

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-02-12 08:32:05 +0000
+++ b/sql/log_event.cc	2011-02-17 19:53:30 +0000
@@ -2507,7 +2507,10 @@ Slave_worker *Log_event::get_slave_worke
         DBUG_ASSERT(g.group_relay_log_name == NULL);
       }
     } while (it++ && mts_number_dbs() != MAX_DBS_IN_QUERY_MTS + 1);
-    // releasing the Coord's mem-root from the updated dbs
+
+    // Releasing the Coord's mem-root from the updated dbs. It's safe to do at this
+    // point because the root is no longer needed along remained part of Coordinator's
+    // execution flow.
     free_root(rli->info_thd->mem_root, MYF(MY_KEEP_PREALLOC));
   }
   else // r
@@ -3381,12 +3384,16 @@ bool Query_log_event::write(IO_CACHE* fi
     {
       List_iterator_fast<char> it(*thd->get_binlog_updated_db_names());
       char *db_name;
+
+      DBUG_ASSERT(dbs <= MAX_DBS_IN_QUERY_MTS);
+
       while ((db_name= it++))
       {
         strcpy((char*) start, db_name);
         start += strlen(db_name) + 1;
       }
     }
+    thd->clear_binlog_updated_db_names();
   }
 
   /*

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2011-02-12 08:32:05 +0000
+++ b/sql/log_event.h	2011-02-17 19:53:30 +0000
@@ -260,11 +260,11 @@ struct sql_ex_info
 #define IGNORABLE_HEADER_LEN   0
 
 /**
-   MTS parameter. The maximum number of databases that a query modifies
-   and be logged in the binarry log with a special status variable containing
-   the database names to facilitate the parallel applying of the Query-event.
+   The maximum value for @@global.mts_max_updated_dbs server variable.
+   When the actual number of db:s exceeds  @@global.mts_max_updated_dbs, the max + 1
+   is put into the mts_updated_dbs status.
 */
-#define MAX_DBS_IN_QUERY_MTS   16
+#define MAX_DBS_IN_QUERY_MTS 16
 
 /* 
   Max number of possible extra bytes in a replication event compared to a
@@ -354,6 +354,10 @@ struct sql_ex_info
 
 #define Q_INVOKER 11
 
+/*
+  Number of the updated db:s and their names to be propagated to the slave
+  in order to facilitate the parallel applying of the Query events
+*/
 #define Q_UPDATED_DB_NAMES 12
 
 /* Intvar event post-header */
@@ -1082,6 +1086,10 @@ public:
     return thd ? thd->db : 0;
   }
 
+  /*
+    The method returns a list of updated by the event databases.
+    Other than in the case of Query-log-event the list is just one item.
+  */
   virtual List<char>* mts_get_dbs()
   {
     List<char> *res= new List<char>;
@@ -1888,7 +1896,7 @@ public:
   */
   virtual List<char>* mts_get_dbs()
   {
-    DBUG_ASSERT(mts_updated_dbs > 0 && mts_updated_dbs <= MAX_DBS_IN_QUERY_MTS + 1);
+    //compile_time_assert(mts_updated_dbs <= MAX_DBS_IN_QUERY_MTS + 1);
     List<char> *res= new List<char>;
     if (mts_updated_dbs == MAX_DBS_IN_QUERY_MTS + 1)
       res->push_back((char*) get_db());
@@ -4356,6 +4364,12 @@ bool event_checksum_test(uchar *buf, ulo
 uint8 get_checksum_alg(const char* buf, ulong len);
 extern TYPELIB binlog_checksum_typelib;
 
+// MTS temp table support needed by sql_base.cc
+THD* mts_get_coordinator_thd();
+THD* mts_get_worker_thd();
+mysql_mutex_t* mts_get_temp_table_mutex();
+bool mts_is_worker(THD *thd);
+
 /**
   @} (end of group Replication)
 */

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2011-02-12 08:32:05 +0000
+++ b/sql/mysqld.cc	2011-02-17 19:53:30 +0000
@@ -472,6 +472,7 @@ ulong opt_mts_partition_hash_soft_max;
 ulonglong opt_mts_pending_jobs_size_max;
 ulong opt_mts_coordinator_basic_nap;
 ulong opt_mts_worker_underrun_level;
+ulong opt_mts_master_updated_dbs_max;
 ulong thread_cache_size=0;
 ulong binlog_cache_size=0;
 ulonglong  max_binlog_cache_size=0;

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h	2011-02-12 08:32:05 +0000
+++ b/sql/mysqld.h	2011-02-17 19:53:30 +0000
@@ -186,6 +186,7 @@ extern ulong opt_mts_partition_hash_soft
 extern ulonglong opt_mts_pending_jobs_size_max;
 extern ulong opt_mts_coordinator_basic_nap;
 extern ulong opt_mts_worker_underrun_level;
+extern ulong opt_mts_master_updated_dbs_max;
 
 extern uint max_user_connections;
 extern ulong what_to_log,flush_time;

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2011-02-12 08:32:05 +0000
+++ b/sql/rpl_rli.cc	2011-02-17 19:53:30 +0000
@@ -202,38 +202,6 @@ void Relay_log_info::reset_notified_chec
   checkpoint_seqno= 0;
 }
 
-mysql_mutex_t* mts_get_temp_table_mutex()
-{
-  return &active_mi->rli->mts_temp_tables_lock;
-}
-
-THD* mts_get_coordinator_thd()
-{
-  Slave_worker *w;
-  return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
-    NULL : !(w= active_mi->rli->get_current_worker()) ?
-    NULL : w->c_rli->info_thd;
-}
-
-THD* mts_get_worker_thd()
-{
-  Slave_worker *w;
-  return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
-    NULL : !(w= active_mi->rli->get_current_worker()) ?
-    NULL : w->w_rli->info_thd;
-}
-
-// TODO: remove the query in parallel option, rename mts_is_worker and
-//       s/ SYSTEM_THREAD_SLAVE_SQL/SYSTEM_THREAD_SLAVE_WORKER/
-bool mts_is_coord_or_worker(THD *thd)
-{
-  return
-    thd->slave_thread &&
-    thd->system_thread == SYSTEM_THREAD_SLAVE_SQL &&
-    (mts_get_worker_thd() != NULL);
-}
-
-
 /**
    The method can be run both by C having the Main (coord) rli context and
    by W having both the main and the Local (worker) rli context.

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2011-02-12 08:32:05 +0000
+++ b/sql/rpl_rli.h	2011-02-17 19:53:30 +0000
@@ -477,7 +477,7 @@ public:
   ulong mts_recovery_index;     // running index of recoverable groups
   /*
     temporary tables are held by Coordinator though are created and dropped
-    if explicilty by Workers. The following lock has to be taken by either party
+    explicilty by Workers. The following lock has to be taken by either party
     in order to conduct any operation in the temp tables placeholder, incl.
     find, drop, create, open.
   */
@@ -732,4 +732,5 @@ private:
 };
 
 bool mysql_show_relaylog_events(THD* thd);
+
 #endif /* RPL_RLI_H */

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2011-02-12 08:32:05 +0000
+++ b/sql/rpl_slave.cc	2011-02-17 19:53:30 +0000
@@ -3792,7 +3792,9 @@ err:
   {
     mysql_mutex_lock(&LOCK_thread_count);
     THD_CHECK_SENTRY(thd);
-    thd->system_thread= NON_SYSTEM_THREAD; // tt closing work/around
+    // to avoid close_temporary_tables() closing temp tables as those
+    // are Coordinator's burden.
+    thd->system_thread= NON_SYSTEM_THREAD;
     delete thd;
     mysql_mutex_unlock(&LOCK_thread_count);
   }
@@ -7101,6 +7103,38 @@ err:
   DBUG_RETURN(ret);
 }
 
+
+/* MTS temp table support */
+
+mysql_mutex_t* mts_get_temp_table_mutex()
+{
+  return &active_mi->rli->mts_temp_tables_lock;
+}
+
+THD* mts_get_coordinator_thd()
+{
+  Slave_worker *w;
+  return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
+    NULL : !(w= active_mi->rli->get_current_worker()) ?
+    NULL : w->c_rli->info_thd;
+}
+
+THD* mts_get_worker_thd()
+{
+  Slave_worker *w;
+  return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
+    NULL : !(w= active_mi->rli->get_current_worker()) ?
+    NULL : w->w_rli->info_thd;
+}
+
+bool mts_is_worker(THD *thd)
+{
+  return
+    thd->slave_thread &&
+    thd->system_thread == SYSTEM_THREAD_SLAVE_SQL &&
+    (mts_get_worker_thd() != NULL);
+}
+
 /**
   @} (end of group Replication)
 */

=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc	2011-02-12 08:32:05 +0000
+++ b/sql/sql_base.cc	2011-02-17 19:53:30 +0000
@@ -58,12 +58,6 @@
 #include <io.h>
 #endif
 
-// MTS temp table support
-extern THD* mts_get_coordinator_thd();
-extern THD* mts_get_worker_thd();
-extern mysql_mutex_t* mts_get_temp_table_mutex();
-extern bool mts_is_coord_or_worker(THD *thd);
-
 bool
 No_such_table_error_handler::handle_condition(THD *,
                                               uint sql_errno,
@@ -1198,16 +1192,16 @@ bool close_cached_connection_tables(THD 
 static void mark_temp_tables_as_free_for_reuse(THD *thd)
 {
 #ifndef EMBEDDED_LIBRARY
-  bool mts_slave= mts_is_coord_or_worker(thd);
-  TABLE *temporary_tables= mts_slave ?
-    mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
+  bool mts_slave= mts_is_worker(thd);
+  TABLE **ptr_temporary_tables= mts_slave ?
+    &mts_get_coordinator_thd()->temporary_tables : &thd->temporary_tables;
   if (mts_slave)
     mysql_mutex_lock(mts_get_temp_table_mutex());
 #else
-  TABLE *temporary_tables= thd->temporary_tables;
+  TABLE **ptr_temporary_tables= &thd->temporary_tables;
 #endif
 
-  for (TABLE *table= temporary_tables; table ; table=table->next)
+  for (TABLE *table= *ptr_temporary_tables; table ; table=table->next)
   {
     if ((table->query_id == thd->query_id) && ! table->open_by_handler)
       mark_tmp_table_for_reuse(table);
@@ -1606,43 +1600,21 @@ bool close_temporary_tables(THD *thd)
   /* Assume thd->variables.option_bits has OPTION_QUOTE_SHOW_CREATE */
   bool was_quote_show= TRUE;
   bool error= 0;
-  TABLE **ptr_temporary_tables;
-#ifndef EMBEDDED_LIBRARY
-  bool mts_slave= mts_is_coord_or_worker(thd);
-  TABLE *temporary_tables= mts_slave ?
-    mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
-  
-  if (mts_slave)
-    mysql_mutex_lock(mts_get_temp_table_mutex());
-#else
-  TABLE *temporary_tables= thd->temporary_tables;
-#endif
 
-  ptr_temporary_tables= &temporary_tables;
+  /* TODO mts: assert if Woker then thd->temporary_tables == NULL */
 
-  if (!temporary_tables)
-  {
-#ifndef EMBEDDED_LIBRARY
-  if (mts_slave)
-    mysql_mutex_lock(mts_get_temp_table_mutex());
-#endif
+  if (!thd->temporary_tables)
     DBUG_RETURN(FALSE);
-  }
 
   if (!mysql_bin_log.is_open())
   {
     TABLE *tmp_next;
-    for (table= temporary_tables; table; table= tmp_next)
+    for (table= thd->temporary_tables; table; table= tmp_next)
     {
       tmp_next= table->next;
       close_temporary(table, 1, 1);
     }
-    *ptr_temporary_tables= 0;
-#ifndef EMBEDDED_LIBRARY
-    if (mts_slave)
-      mysql_mutex_unlock(mts_get_temp_table_mutex());
-#endif
-
+    thd->temporary_tables= 0;
     DBUG_RETURN(FALSE);
   }
 
@@ -1660,7 +1632,7 @@ bool close_temporary_tables(THD *thd)
     of sublists of equal pseudo_thread_id
   */
 
-  for (prev_table= temporary_tables, table= prev_table->next;
+  for (prev_table= thd->temporary_tables, table= prev_table->next;
        table;
        prev_table= table, table= table->next)
   {
@@ -1669,7 +1641,7 @@ bool close_temporary_tables(THD *thd)
     {
       if (!found_user_tables)
         found_user_tables= true;
-      for (prev_sorted= NULL, sorted= temporary_tables; sorted != table;
+      for (prev_sorted= NULL, sorted= thd->temporary_tables; sorted != table;
            prev_sorted= sorted, sorted= sorted->next)
       {
         if (!is_user_table(sorted) ||
@@ -1684,7 +1656,7 @@ bool close_temporary_tables(THD *thd)
           }
           else
           {
-            *ptr_temporary_tables= table;
+            thd->temporary_tables= table;
           }
           table= prev_table;
           break;
@@ -1701,7 +1673,7 @@ bool close_temporary_tables(THD *thd)
   }
 
   /* scan sorted tmps to generate sequence of DROP */
-  for (table= temporary_tables; table; table= next)
+  for (table= thd->temporary_tables; table; table= next)
   {
     if (is_user_table(table))
     {
@@ -1774,12 +1746,8 @@ bool close_temporary_tables(THD *thd)
   }
   if (!was_quote_show)
     thd->variables.option_bits&= ~OPTION_QUOTE_SHOW_CREATE; /* restore option */
-  *ptr_temporary_tables= 0;
+  thd->temporary_tables=0;
 
-#ifndef EMBEDDED_LIBRARY
-  if (mts_slave)
-    mysql_mutex_unlock(mts_get_temp_table_mutex());
-#endif
   DBUG_RETURN(error);
 }
 
@@ -2074,15 +2042,15 @@ TABLE *find_temporary_table(THD *thd,
 {
   TABLE *table= NULL;
 #ifndef EMBEDDED_LIBRARY
-  bool mts_slave= mts_is_coord_or_worker(thd);
-  TABLE *temporary_tables= mts_slave ?
-    mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
+  bool mts_slave= mts_is_worker(thd);
+  TABLE **ptr_temporary_tables= mts_slave ?
+    &mts_get_coordinator_thd()->temporary_tables : &thd->temporary_tables;
   if (mts_slave)
     mysql_mutex_lock(mts_get_temp_table_mutex());  
 #else
-  TABLE *temporary_tables= thd->temporary_tables;
+  TABLE **ptr_temporary_tables= &thd->temporary_tables;
 #endif
-  for (table= temporary_tables; table; table= table->next)
+  for (table= *ptr_temporary_tables; table; table= table->next)
   {
     if (table->s->table_cache_key.length == table_key_length &&
         !memcmp(table->s->table_cache_key.str, table_key, table_key_length))
@@ -2133,7 +2101,7 @@ int drop_temporary_table(THD *thd, TABLE
 {
   TABLE *table;
 #ifndef EMBEDDED_LIBRARY
-  bool mts_slave= mts_is_coord_or_worker(thd);
+  bool mts_slave= mts_is_worker(thd);
 #endif
   THD *thd_temp;
 
@@ -2716,16 +2684,16 @@ bool open_table(THD *thd, TABLE_LIST *ta
       ! (flags & MYSQL_OPEN_SKIP_TEMPORARY))
   {
 #ifndef EMBEDDED_LIBRARY
-    bool mts_slave= mts_is_coord_or_worker(thd);
-    TABLE *temporary_tables= mts_slave ?
-      mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
+    bool mts_slave= mts_is_worker(thd);
+    TABLE **ptr_temporary_tables= mts_slave ?
+      &mts_get_coordinator_thd()->temporary_tables : &thd->temporary_tables;
     if (mts_slave)
       mysql_mutex_lock(mts_get_temp_table_mutex());
 #else
-    TABLE *temporary_tables= thd->temporary_tables;
+    TABLE **ptr_temporary_tables= &thd->temporary_tables;
 #endif
 
-    for (table= temporary_tables; table ; table=table->next)
+    for (table= *ptr_temporary_tables; table ; table=table->next)
     {
       if (table->s->table_cache_key.length == key_length +
           TMP_TABLE_KEY_EXTRA &&
@@ -5959,7 +5927,7 @@ TABLE *open_table_uncached(THD *thd, con
   {
 #ifndef EMBEDDED_LIBRARY
     TABLE **ptr_temporary_tables;
-    bool mts_slave= mts_is_coord_or_worker(thd);
+    bool mts_slave= mts_is_worker(thd);
     ptr_temporary_tables= mts_slave? 
       &mts_get_coordinator_thd()->temporary_tables : &thd->temporary_tables;
     if (mts_slave)

=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc	2011-02-12 08:32:05 +0000
+++ b/sql/sql_class.cc	2011-02-17 19:53:30 +0000
@@ -982,6 +982,7 @@ void THD::init_for_queries()
   transaction.xid_state.in_thd=1;
 }
 
+
 /*
   Do what's needed when one invokes change user
 
@@ -1069,6 +1070,7 @@ void THD::cleanup(void)
     mysql_mutex_unlock(&LOCK_user_locks);
     ull= NULL;
   }
+
   cleanup_done=1;
   DBUG_VOID_RETURN;
 }
@@ -1396,6 +1398,7 @@ void THD::cleanup_after_query()
     stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
     auto_inc_intervals_in_cur_stmt_for_binlog.empty();
     rand_used= 0;
+    binlog_updated_db_names= NULL;
   }
   if (first_successful_insert_id_in_cur_stmt > 0)
   {
@@ -1413,7 +1416,6 @@ void THD::cleanup_after_query()
   /* reset table map for multi-table update */
   table_map_for_update= 0;
   m_binlog_invoker= FALSE;
-  binlog_updated_db_names= NULL;
 }
 
 
@@ -3394,7 +3396,7 @@ void THD::reset_sub_statement_state(Sub_
     first_successful_insert_id_in_prev_stmt;
   backup->first_successful_insert_id_in_cur_stmt= 
     first_successful_insert_id_in_cur_stmt;
-  backup->binlog_updated_db_names= binlog_updated_db_names;
+  //backup->binlog_updated_db_names= binlog_updated_db_names;
 
   if ((!lex->requires_prelocking() || is_update_query(lex->sql_command)) &&
       !is_current_stmt_binlog_format_row())
@@ -3415,7 +3417,7 @@ void THD::reset_sub_statement_state(Sub_
   cuted_fields= 0;
   transaction.savepoints= 0;
   first_successful_insert_id_in_cur_stmt= 0;
-  binlog_updated_db_names= NULL;
+  //binlog_updated_db_names= NULL;
 }
 
 
@@ -3478,7 +3480,8 @@ void THD::restore_sub_statement_state(Su
   */
   examined_row_count+= backup->examined_row_count;
   cuted_fields+=       backup->cuted_fields;
-  binlog_updated_db_names= backup->binlog_updated_db_names;
+  //if (binlog_updated_db_names)
+  //  binlog_updated_db_names->concat(backup->binlog_updated_db_names);
   DBUG_VOID_RETURN;
 }
 

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2011-02-12 08:32:05 +0000
+++ b/sql/sql_class.h	2011-02-17 19:53:30 +0000
@@ -1739,6 +1739,8 @@ public:
   List<char> * get_binlog_updated_db_names() {
     return binlog_updated_db_names;
   }
+  void clear_binlog_updated_db_names() { binlog_updated_db_names= NULL; }
+
 #endif /* MYSQL_CLIENT */
 
 public:

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2011-02-12 08:32:05 +0000
+++ b/sql/sys_vars.cc	2011-02-17 19:53:30 +0000
@@ -3198,6 +3198,13 @@ static Sys_var_mybool Sys_slave_local_ti
        "time value to implicitly affected timestamp columms. Otherwise (default) "
        "it installs prescribed by the master value",
        GLOBAL_VAR(opt_mts_slave_local_timestamp), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+static Sys_var_ulong Sys_master_updated_dbs_max(
+       "mts_master_updated_dbs_max",
+       "The maximum number of databases that a query log event can contain in its header "
+       "in order to faciliate the parallel applying on the slave.",
+       GLOBAL_VAR(opt_mts_master_updated_dbs_max), CMD_LINE(REQUIRED_ARG),
+       VALID_RANGE(0, MAX_DBS_IN_QUERY_MTS),
+       DEFAULT(16), BLOCK_SIZE(1));
 static Sys_var_ulong Sys_mts_partition_hash_soft_max(
        "mts_partition_hash_soft_max",
        "Number of records in the mts partition hash below which "


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110217195330-b8fjghldzq2lgri2.bundle
Thread
bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3273) WL#5754Andrei Elkin17 Feb