List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:February 3 2011 9:28am
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3271) WL#5754
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3271 Andrei Elkin	2011-02-03
      wl#5754 Parallel Query replication
      
      This patch implements the core of the design, namely the handling of DML queries.
      Queries can contain arbitrary features including temp tables.
      
      Todo:
      
      1. Extend the framework to capture the remaining DDL:s
      2. fix the server startup opt (--mts-exp-run-query-in-parallel).
      3. Refine only_sequential() to return TRUE in case the list of the updated db:s
         is bigger than the max, *and* in case the query is from the Old master.
     @ mysql-test/suite/rpl/r/rpl_parallel_multi_db.result
        new result file is added.
     @ mysql-test/suite/rpl/r/rpl_parallel_query.result
        new result file is added.
     @ mysql-test/suite/rpl/t/rpl_parallel_multi_db.test
        multi-db DML query test is added.
        todo: add triggers, sf(), SP.
     @ mysql-test/suite/rpl/t/rpl_parallel_query.test
        query with temporary tables testing.
     @ sql/binlog.cc
        gathering the db:s to be updated in the DML case. A list bigger than MAX_DBS_IN_QUERY_MTS won't be shipped
        to the slave.
     @ sql/log_event.cc
        master and slave (Coord's and Worker's) handling of updated db:s.
        The Coordinator's distribution changed to involve a loop per db, similarly for the Worker
        at applying.
     @ sql/log_event.h
        Hardcoding the max updated db:s.
        Static allocation for updated db:s in Query log event is motivated
        by the fact that the event is shared by both C and W, so the standard malloc/free
        can't be a reasonable choice either.
        Added a new status and changed dependent info.
        Added a new method to return the *list* of updated db:s which in all but Query case
        is just a wrapper over get_db().
     @ sql/rpl_rli.cc
        fixing an error.
     @ sql/rpl_slave.cc
        Added a work-around/cleanup needed by the standard temp table closing algorithm.
     @ sql/sql_class.cc
        initialization of the new member for master side gathering of updated db:s.
     @ sql/sql_class.h
        master side gathering updated db:s list and accessor members.

    added:
      mysql-test/suite/rpl/r/rpl_parallel_multi_db.result
      mysql-test/suite/rpl/t/rpl_parallel_multi_db.test
      mysql-test/suite/rpl/t/rpl_parallel_query.test
    modified:
      mysql-test/suite/rpl/r/rpl_parallel_query.result
      sql/binlog.cc
      sql/log_event.cc
      sql/log_event.h
      sql/rpl_rli.cc
      sql/rpl_slave.cc
      sql/sql_class.cc
      sql/sql_class.h
=== added file 'mysql-test/suite/rpl/r/rpl_parallel_multi_db.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_multi_db.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_multi_db.result	2011-02-03 09:28:27 +0000
@@ -0,0 +1,141 @@
+include/master-slave.inc
+[connection master]
+include/stop_slave.inc
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+set @@global.mts_slave_parallel_workers= 4;
+include/start_slave.inc
+Warnings:
+Note	1726	Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
+create database d4;
+create table d4.t8 (a int);
+select round(rand()*8) into @var;
+insert into d4.t8 values (@var);
+create table d4.t7 (a int);
+select round(rand()*8) into @var;
+insert into d4.t7 values (@var);
+create table d4.t6 (a int);
+select round(rand()*8) into @var;
+insert into d4.t6 values (@var);
+create table d4.t5 (a int);
+select round(rand()*8) into @var;
+insert into d4.t5 values (@var);
+create table d4.t4 (a int);
+select round(rand()*8) into @var;
+insert into d4.t4 values (@var);
+create table d4.t3 (a int);
+select round(rand()*8) into @var;
+insert into d4.t3 values (@var);
+create table d4.t2 (a int);
+select round(rand()*8) into @var;
+insert into d4.t2 values (@var);
+create table d4.t1 (a int);
+select round(rand()*8) into @var;
+insert into d4.t1 values (@var);
+create database d3;
+create table d3.t8 (a int);
+select round(rand()*8) into @var;
+insert into d3.t8 values (@var);
+create table d3.t7 (a int);
+select round(rand()*8) into @var;
+insert into d3.t7 values (@var);
+create table d3.t6 (a int);
+select round(rand()*8) into @var;
+insert into d3.t6 values (@var);
+create table d3.t5 (a int);
+select round(rand()*8) into @var;
+insert into d3.t5 values (@var);
+create table d3.t4 (a int);
+select round(rand()*8) into @var;
+insert into d3.t4 values (@var);
+create table d3.t3 (a int);
+select round(rand()*8) into @var;
+insert into d3.t3 values (@var);
+create table d3.t2 (a int);
+select round(rand()*8) into @var;
+insert into d3.t2 values (@var);
+create table d3.t1 (a int);
+select round(rand()*8) into @var;
+insert into d3.t1 values (@var);
+create database d2;
+create table d2.t8 (a int);
+select round(rand()*8) into @var;
+insert into d2.t8 values (@var);
+create table d2.t7 (a int);
+select round(rand()*8) into @var;
+insert into d2.t7 values (@var);
+create table d2.t6 (a int);
+select round(rand()*8) into @var;
+insert into d2.t6 values (@var);
+create table d2.t5 (a int);
+select round(rand()*8) into @var;
+insert into d2.t5 values (@var);
+create table d2.t4 (a int);
+select round(rand()*8) into @var;
+insert into d2.t4 values (@var);
+create table d2.t3 (a int);
+select round(rand()*8) into @var;
+insert into d2.t3 values (@var);
+create table d2.t2 (a int);
+select round(rand()*8) into @var;
+insert into d2.t2 values (@var);
+create table d2.t1 (a int);
+select round(rand()*8) into @var;
+insert into d2.t1 values (@var);
+create database d1;
+create table d1.t8 (a int);
+select round(rand()*8) into @var;
+insert into d1.t8 values (@var);
+create table d1.t7 (a int);
+select round(rand()*8) into @var;
+insert into d1.t7 values (@var);
+create table d1.t6 (a int);
+select round(rand()*8) into @var;
+insert into d1.t6 values (@var);
+create table d1.t5 (a int);
+select round(rand()*8) into @var;
+insert into d1.t5 values (@var);
+create table d1.t4 (a int);
+select round(rand()*8) into @var;
+insert into d1.t4 values (@var);
+create table d1.t3 (a int);
+select round(rand()*8) into @var;
+insert into d1.t3 values (@var);
+create table d1.t2 (a int);
+select round(rand()*8) into @var;
+insert into d1.t2 values (@var);
+create table d1.t1 (a int);
+select round(rand()*8) into @var;
+insert into d1.t1 values (@var);
+include/diff_tables.inc [master:d4.t8, slave:d4.t8]
+include/diff_tables.inc [master:d4.t7, slave:d4.t7]
+include/diff_tables.inc [master:d4.t6, slave:d4.t6]
+include/diff_tables.inc [master:d4.t5, slave:d4.t5]
+include/diff_tables.inc [master:d4.t4, slave:d4.t4]
+include/diff_tables.inc [master:d4.t3, slave:d4.t3]
+include/diff_tables.inc [master:d4.t2, slave:d4.t2]
+include/diff_tables.inc [master:d4.t1, slave:d4.t1]
+include/diff_tables.inc [master:d3.t8, slave:d3.t8]
+include/diff_tables.inc [master:d3.t7, slave:d3.t7]
+include/diff_tables.inc [master:d3.t6, slave:d3.t6]
+include/diff_tables.inc [master:d3.t5, slave:d3.t5]
+include/diff_tables.inc [master:d3.t4, slave:d3.t4]
+include/diff_tables.inc [master:d3.t3, slave:d3.t3]
+include/diff_tables.inc [master:d3.t2, slave:d3.t2]
+include/diff_tables.inc [master:d3.t1, slave:d3.t1]
+include/diff_tables.inc [master:d2.t8, slave:d2.t8]
+include/diff_tables.inc [master:d2.t7, slave:d2.t7]
+include/diff_tables.inc [master:d2.t6, slave:d2.t6]
+include/diff_tables.inc [master:d2.t5, slave:d2.t5]
+include/diff_tables.inc [master:d2.t4, slave:d2.t4]
+include/diff_tables.inc [master:d2.t3, slave:d2.t3]
+include/diff_tables.inc [master:d2.t2, slave:d2.t2]
+include/diff_tables.inc [master:d2.t1, slave:d2.t1]
+include/diff_tables.inc [master:d1.t8, slave:d1.t8]
+include/diff_tables.inc [master:d1.t7, slave:d1.t7]
+include/diff_tables.inc [master:d1.t6, slave:d1.t6]
+include/diff_tables.inc [master:d1.t5, slave:d1.t5]
+include/diff_tables.inc [master:d1.t4, slave:d1.t4]
+include/diff_tables.inc [master:d1.t3, slave:d1.t3]
+include/diff_tables.inc [master:d1.t2, slave:d1.t2]
+include/diff_tables.inc [master:d1.t1, slave:d1.t1]
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;

=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_query.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_query.result	2011-01-20 13:39:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_query.result	2011-02-03 09:28:27 +0000
@@ -3,28 +3,49 @@ include/master-slave.inc
 call mtr.add_suppression('Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement accesses nontransactional table as well as transactional or temporary table.*');
 include/stop_slave.inc
 set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
-set @@global.mts_slave_parallel_workers= 2;
+set @@global.mts_slave_parallel_workers= 4;
 include/start_slave.inc
 Warnings:
 Note	1726	Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
-create database d1;
-use d1;
-create table d1.t1 (a int auto_increment primary key, b int) engine=innodb;
-create temporary table tt_1 (a int auto_increment primary key);
-insert into tt_1 values (null);
-insert into tt_1 values (null);
-insert into d1.t1 (b) select count(*) from tt_1;
 create database d2;
 use d2;
 create table d2.t1 (a int auto_increment primary key, b int) engine=innodb;
-create temporary table tt_1 (a int auto_increment primary key);
-insert into tt_1 values (null);
-insert into tt_1 values (null);
-insert into d2.t1 (b) select count(*) from tt_1;
+insert into d2.t1 (b) select count(*) from tt_##;
+create database d1;
+use d1;
+create table d1.t1 (a int auto_increment primary key, b int) engine=innodb;
+insert into d1.t1 (b) select count(*) from tt_##;
+create database d4;
+use d4;
+create table d4.t1 (a int auto_increment primary key, b int) engine=innodb;
+insert into d4.t1 (b) select count(*) from tt_##;
+create database d3;
+use d3;
+create table d3.t1 (a int auto_increment primary key, b int) engine=innodb;
+insert into d3.t1 (b) select count(*) from tt_##;
+include/diff_tables.inc [master:d4.t1, slave:d4.t1]
+include/diff_tables.inc [master:d3.t1, slave:d3.t1]
 include/diff_tables.inc [master:d2.t1, slave:d2.t1]
 include/diff_tables.inc [master:d1.t1, slave:d1.t1]
+drop temporary table tt_8;
+drop temporary table tt_7;
+drop temporary table tt_6;
+drop temporary table tt_5;
+drop temporary table tt_4;
+drop temporary table tt_3;
+drop temporary table tt_2;
 drop temporary table tt_1;
+drop database d2;
 drop database d1;
+drop temporary table tt_8;
+drop temporary table tt_7;
+drop temporary table tt_6;
+drop temporary table tt_5;
+drop temporary table tt_4;
+drop temporary table tt_3;
+drop temporary table tt_2;
 drop temporary table tt_1;
+drop database d4;
+drop database d3;
 include/stop_slave.inc
 set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_multi_db.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_multi_db.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_multi_db.test	2011-02-03 09:28:27 +0000
@@ -0,0 +1,123 @@
+#
+# WL#5569 MTS
+#
+# The test verifies correctness of Query event parallelization when
+# a DML Query modifies multiple databases.
+#
+
+--source include/master-slave.inc
+--source include/have_binlog_format_statement.inc
+
+--disable_query_log
+call mtr.add_suppression('.*Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT.*');
+--enable_query_log
+
+# restart in Parallel
+
+let $workers= 4;
+
+connection slave;
+
+source include/stop_slave.inc;
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+eval set @@global.mts_slave_parallel_workers= $workers;
+
+source include/start_slave.inc;
+
+let $dbs= 4;
+let $tables= 8;
+let $queries= `select $dbs*$tables * 8`;
+
+#
+# 1. Case of multi-update
+#
+
+connection master;
+
+# create & populate
+
+let $n= $dbs;
+while ($n)
+{
+  eval create database d$n;
+  let $m= $tables;
+  while ($m)
+  {
+     eval create table d$n.t$m (a int);
+     eval select round(rand()*$tables) into @var;
+     eval insert into d$n.t$m values (@var);
+     dec $m;
+  }
+  dec $n;
+}
+
+
+# operate to check consistency in the end
+
+let $k= $queries;
+
+--disable_query_log
+--disable_warnings
+while ($k)
+{
+   let $n1= `select floor(rand()*$dbs + 1)`;
+   let $m1= `select floor(rand()*$tables + 1)`;
+   let $n2= `select floor(rand()*$dbs + 1)`;
+   let $m2= `select floor(rand()*$tables + 1)`;
+   let $n3= `select floor(rand()*$dbs + 1)`;
+   let $m3= `select floor(rand()*$tables + 1)`;
+   let $n4= `select floor(rand()*$dbs + 1)`;
+   let $m4= `select floor(rand()*$tables + 1)`;
+   eval update d$n1.t$m1 as t_1, d$n2.t$m2 as t_2, d$n3.t$m3, d$n4.t$m4 as t_3 set t_1.a=t_2.a+ round(rand(10)), t_2.a=t_3.a+ round(rand(10)), t_3.a=t_1.a+ round(rand(10)), t_3.a=t_1.a+ round(rand(10));
+   dec $k;
+}
+--enable_warnings
+--enable_query_log
+
+sync_slave_with_master; 
+
+#
+# Consistency check
+#
+
+let $n = $dbs;
+while($n)
+{
+  let $m= $tables;
+  while ($m)
+  {
+    let $diff_tables=master:d$n.t$m, slave:d$n.t$m;
+    source include/diff_tables.inc;
+    dec $m;
+  }
+  dec $n;
+}
+
+
+#
+# 2. Case of invoked routines (TODO: add parallel support to CREATE trig,sf().
+#
+
+
+#
+# Clean-up
+#
+
+connection master;
+
+--disable_query_log
+
+let $n= $dbs;
+while ($n)
+{
+  eval drop database d$n;
+  dec $n;
+}
+
+--enable_query_log
+
+sync_slave_with_master;
+
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;
+
+### TODO: --source include/rpl_end.inc

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_query.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_query.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_query.test	2011-02-03 09:28:27 +0000
@@ -0,0 +1,169 @@
+#
+# WL#5569 MTS
+#
+# The test verifies correctness of Query events parallelization.
+#
+
+--source include/master-slave.inc
+--source include/have_binlog_format_statement.inc
+
+call mtr.add_suppression('Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement accesses nontransactional table as well as transactional or temporary table.*');
+
+let $temp_tables= 16;
+let $workers= 4;
+
+connection slave;
+
+# restart in Parallel
+source include/stop_slave.inc;
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+eval set @@global.mts_slave_parallel_workers= $workers;
+
+source include/start_slave.inc;
+
+# Two connections each create 2 db:s a regular table and a set of temp tables.
+# The temp tables contribute content to the regular tables.
+# In the end there must be consistent data on both sides.
+
+connection master;
+
+let $n= `select round($workers/2)`;
+let $n1= `select $n`;
+while ($n)
+{
+  eval create database d$n1;
+  eval use d$n1;
+  eval create table d$n1.t1 (a int auto_increment primary key, b int) engine=innodb;
+  let $i= $temp_tables;
+
+--disable_query_log
+  while($i)
+  {
+    let $temp_rows= `select round(rand()*$temp_tables) + 1`;
+    let $k= $temp_rows;
+    eval create temporary table tt_$i (a int auto_increment primary key);
+    while($k)
+    {
+	eval insert into tt_$i values (null);
+	dec $k;
+    }
+    dec $i;
+  }
+--enable_query_log
+
+  let $j= `select floor(rand()*$temp_tables) + 1`;
+--replace_regex /tt_.*/tt_##/
+  eval insert into d$n1.t1 (b) select count(*) from tt_$j;
+  dec $n;
+  dec $n1;
+}
+
+connection master1;
+
+let $n= `select round($workers/2)`;
+let $n1= `select 2*$n`;
+while ($n)
+{
+  eval create database d$n1;
+  eval use d$n1;
+  eval create table d$n1.t1 (a int auto_increment primary key, b int) engine=innodb;
+  let $i= $temp_tables;
+
+--disable_query_log
+  while($i)
+  {
+    let $temp_rows= `select round(rand()*$temp_tables) + 1`;
+    let $k= $temp_rows;
+    eval create temporary table tt_$i (a int auto_increment primary key);
+    while($k)
+    {
+	eval insert into tt_$i values (null);
+	dec $k;
+    }
+    dec $i;
+  }
+--enable_query_log
+
+  let $j= `select floor(rand()*$temp_tables) + 1`;
+--replace_regex /tt_.*/tt_##/
+  eval insert into d$n1.t1 (b) select count(*) from tt_$j;
+  dec $n;
+  dec $n1;
+}
+
+sync_slave_with_master;
+
+#
+# Consistency check
+#
+
+let $n = $workers;
+while($n)
+{
+  let $diff_tables=master:d$n.t1, slave:d$n.t1;
+  source include/diff_tables.inc;
+
+  dec $n;
+}
+
+#
+# cleanup
+#
+# Temp tables are removed two ways explicitly and implicitly by disconnecting.
+#
+
+connection master;
+
+let $i= `select round($temp_tables/2)`;
+while($i)
+{
+  eval drop temporary table tt_$i;
+  dec $i;
+}
+
+let $n= `select round($workers/2)`;
+let $n1= `select $n`;
+while ($n)
+{
+  eval drop database d$n1;
+  dec $n;
+  dec $n1;
+}
+
+
+connection master1;
+
+let $i= `select round($temp_tables/2)`;
+while($i)
+{
+  eval drop temporary table tt_$i;
+  dec $i;
+}
+
+sync_slave_with_master;
+#connection slave;
+
+connection master1;
+disconnect master1;
+
+#
+# Clean-up
+#
+
+connection master;
+
+let $n= `select round($workers/2)`;
+let $n1= `select 2*$n`;
+while ($n)
+{
+  eval drop database d$n1;
+  dec $n;
+  dec $n1;
+}
+
+sync_slave_with_master;
+source include/stop_slave.inc;
+
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;
+
+### TODO: --source include/rpl_end.inc

=== modified file 'sql/binlog.cc'
--- a/sql/binlog.cc	2010-12-27 18:54:41 +0000
+++ b/sql/binlog.cc	2011-02-03 09:28:27 +0000
@@ -4733,7 +4733,56 @@ int THD::decide_logging_format(TABLE_LIS
         is_write= TRUE;
 
         prev_write_table= table->table;
+
+        if (variables.binlog_format != BINLOG_FORMAT_ROW)
+        {
+          /*
+            Master side of the STMT format events parallelization.
+            Write-locked table's db:s are stored in a abc-ordered name list.
+            The list can remain empty if the only database that 
+            is updated is the default one.
+            In case the number of databases exceeds MAX_DBS_IN_QUERY_MTS
+            the list won't be sent to the slave either.
+          */
+
+          if (!binlog_updated_db_names)
+          {
+            binlog_updated_db_names= new List<char>; /* thd->mem_root is used */
+          }
+          if (binlog_updated_db_names->elements <  MAX_DBS_IN_QUERY_MTS + 1)
+          {
+            char *after_db= strdup(table->db);
+            if (binlog_updated_db_names->elements != 0)
+            {
+              List_iterator<char> it(*get_binlog_updated_db_names());
+              
+              while (it++)
+              {
+                char *swap= NULL;
+                char **ref_cur_db= it.ref();
+                int cmp= strcmp(after_db, *ref_cur_db);
+                
+                DBUG_ASSERT(!swap || cmp < 0);
+                
+                if (cmp == 0)
+                {
+                  after_db= NULL;  /* dup to ignore */
+                  break;
+                }
+                else if (swap || cmp > 0)
+                {
+                  swap= *ref_cur_db;
+                  *ref_cur_db= after_db;
+                  after_db= swap;
+                }
+              }
+            }
+            if (after_db)
+              binlog_updated_db_names->push_back(after_db);
+          }
+        }
       }
+
       flags_access_some_set |= flags;
 
       if (lex->sql_command != SQLCOM_CREATE_TABLE ||

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-01-11 23:01:02 +0000
+++ b/sql/log_event.cc	2011-02-03 09:28:27 +0000
@@ -2489,17 +2489,25 @@ Slave_worker *Log_event::get_slave_worke
 
   if (contains_partition_info())
   {
-    // a lot of things inside `get_slave_worker_id'
-    const_cast<Relay_log_info *>(rli)->last_assigned_worker=
-      worker= get_slave_worker(get_db(), const_cast<Relay_log_info *>(rli));
-    get_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
-    if (g.worker_id == (ulong) -1)  // assign "offically" the current group
+    List_iterator<char> it(*mts_get_dbs());
+    while (it++)
     {
-      g.worker_id= worker->id;       // todo/fixme: think of Slave_worker* here
-      set_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
-
-      DBUG_ASSERT(g.group_relay_log_name == NULL);
+      char **ref_cur_db= it.ref();
+      // a lot of things inside `get_slave_worker_id'
+      const_cast<Relay_log_info *>(rli)->last_assigned_worker=
+        worker= get_slave_worker(*ref_cur_db, const_cast<Relay_log_info *>(rli));
+      get_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
+      if (g.worker_id == (ulong) -1)  // assign "offically" the current group
+      {
+        g.worker_id= worker->id;       // todo/fixme: think of Slave_worker* here
+        set_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
+        
+        DBUG_ASSERT(g.group_relay_log_name == NULL);
+      }
     }
+    // releasing the Coord's mem-root from the updated dbs
+    // if (get_type_code() == QUERY_EVENT)
+    free_root(rli->info_thd->mem_root, MYF(MY_KEEP_PREALLOC));
   }
   else // r
     if (rli->last_assigned_worker)
@@ -2987,25 +2995,29 @@ int slave_worker_exec_job(Slave_worker *
   {
     if (ev->contains_partition_info())
     {
+      List_iterator<char> it(*ev->mts_get_dbs());
       DYNAMIC_ARRAY *ep= &(w->curr_group_exec_parts->dynamic_ids);
-      uint i;
-      char key[NAME_LEN + 2];
-      bool found= FALSE;
-      const char *dbname= ev->get_db();
-      uchar dblength= (uint) strlen(dbname);
       
-      for (i= 0; i < ep->elements && !found; i++)
+      while (it++)
       {
-        get_dynamic(ep, (uchar*) key, i);
-        found=
-          (key[0] == dblength) &&
-          (strncmp(key + 1, const_cast<char*>(dbname), dblength) == 0);
-      }
-      if (!found)
-      {
-        key[0]= dblength;
-        memcpy(key + 1, dbname, dblength + 1);
-        insert_dynamic(ep, (uchar*) key);
+        bool found= FALSE;
+        char key[NAME_LEN + 2];
+        const char *dbname= *it.ref();
+        uchar dblength= (uint) strlen(dbname);
+
+        for (uint i= 0; i < ep->elements && !found; i++)
+        {
+          get_dynamic(ep, (uchar*) key, i);
+          found=
+            (key[0] == dblength) &&
+            (strncmp(key + 1, const_cast<char*>(dbname), dblength) == 0);
+        }
+        if (!found)
+        {
+          key[0]= dblength;
+          memcpy(key + 1, dbname, dblength + 1);
+          insert_dynamic(ep, (uchar*) key);
+        }
       }
     }
   }
@@ -3089,6 +3101,8 @@ err:
   if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
     delete ev;  // after ev->update_pos() event is garbage
 
+  free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC));
+  
   DBUG_RETURN(error);
 }
 
@@ -3354,6 +3368,30 @@ bool Query_log_event::write(IO_CACHE* fi
       start+= host.length;
     }
   }
+
+  if (thd->get_binlog_updated_db_names() != NULL)
+  {
+    uchar dbs;
+    *start++= Q_UPDATED_DB_NAMES;
+    dbs= *start++= thd->get_binlog_updated_db_names()->elements;
+
+    DBUG_ASSERT(dbs != 0);
+    /* 
+       MAX_DBS_IN_QUERY_MTS + 1 is special no db:s is written
+       and event requires the sequential applying on slave.
+    */
+    if (dbs != MAX_DBS_IN_QUERY_MTS + 1)
+    {
+      List_iterator_fast<char> it(*thd->get_binlog_updated_db_names());
+      char *db_name;
+      while ((db_name= it++))
+      {
+        strcpy((char*) start, db_name);
+        start += strlen(db_name) + 1;
+      }
+    }
+  }
+
   /*
     NOTE: When adding new status vars, please don't forget to update
     the MAX_SIZE_LOG_EVENT_STATUS in log_event.h and update the function
@@ -3437,7 +3475,7 @@ Query_log_event::Query_log_event(THD* th
    lc_time_names_number(thd_arg->variables.lc_time_names->number),
    charset_database_number(0),
    table_map_for_update((ulonglong)thd_arg->table_map_for_update),
-   master_data_written(0)
+   master_data_written(0), mts_updated_dbs(0)
 {
   time_t end_time;
 
@@ -3638,6 +3676,7 @@ code_name(int code)
   case Q_CHARSET_DATABASE_CODE: return "Q_CHARSET_DATABASE_CODE";
   case Q_TABLE_MAP_FOR_UPDATE_CODE: return "Q_TABLE_MAP_FOR_UPDATE_CODE";
   case Q_MASTER_DATA_WRITTEN_CODE: return "Q_MASTER_DATA_WRITTEN_CODE";
+  case Q_UPDATED_DB_NAMES: return "Q_UPDATED_DB_NAMES";
   }
   sprintf(buf, "CODE#%d", code);
   return buf;
@@ -3675,7 +3714,8 @@ Query_log_event::Query_log_event(const c
    flags2_inited(0), sql_mode_inited(0), charset_inited(0),
    auto_increment_increment(1), auto_increment_offset(1),
    time_zone_len(0), lc_time_names_number(0), charset_database_number(0),
-   table_map_for_update(0), master_data_written(0)
+   table_map_for_update(0), master_data_written(0),
+   mts_updated_dbs(MAX_DBS_IN_QUERY_MTS + 1)
 {
   ulong data_len;
   uint32 tmp;
@@ -3683,6 +3723,7 @@ Query_log_event::Query_log_event(const c
   Log_event::Byte *start;
   const Log_event::Byte *end;
   bool catalog_nz= 1;
+
   DBUG_ENTER("Query_log_event::Query_log_event(char*,...)");
 
   memset(&user, 0, sizeof(user));
@@ -3854,6 +3895,23 @@ Query_log_event::Query_log_event(const c
       CHECK_SPACE(pos, end, host.length);
       host.str= (char *)pos;
       pos+= host.length;
+      break;
+    }
+    case Q_UPDATED_DB_NAMES:
+    {
+      CHECK_SPACE(pos, end, 1);
+      mts_updated_dbs= *pos++;
+      if (mts_updated_dbs == MAX_DBS_IN_QUERY_MTS + 1)
+        break;
+
+      DBUG_ASSERT(mts_updated_dbs != 0);
+
+      for (uchar i= 0; i < mts_updated_dbs; i++)
+      {
+        strcpy(mts_updated_db_names[i], (char*) pos);
+        pos+= 1 + strlen((const char*) pos);
+      }
+      break;
     }
     default:
       /* That's why you must write status vars in growing order of code */

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2011-01-11 23:01:02 +0000
+++ b/sql/log_event.h	2011-02-03 09:28:27 +0000
@@ -258,6 +258,14 @@ struct sql_ex_info
 #define INCIDENT_HEADER_LEN    2
 #define HEARTBEAT_HEADER_LEN   0
 #define IGNORABLE_HEADER_LEN   0
+
+/**
+   MTS parameter. The maximum number of databases that a query may modify
+   and still be logged in the binary log with a special status variable containing
+   the database names to facilitate the parallel applying of the Query-event.
+*/
+#define MAX_DBS_IN_QUERY_MTS   16
+
 /* 
   Max number of possible extra bytes in a replication event compared to a
   packet (i.e. a query) sent from client to master;
@@ -273,6 +281,8 @@ struct sql_ex_info
                                    1 + 2          /* type, charset_database_number */ + \
                                    1 + 8          /* type, table_map_for_update */ + \
                                    1 + 4          /* type, master_data_written */ + \
+                                                  /* type, db_1, db_2, ... */  \
+                                   1 + (MAX_DBS_IN_QUERY_MTS * (1 + NAME_LEN)) + \
                                    1 + 16 + 1 + 60/* type, user_len, user, host_len, host */)
 #define MAX_LOG_EVENT_HEADER   ( /* in order of Query_log_event::write */ \
   LOG_EVENT_HEADER_LEN + /* write_header */ \
@@ -344,6 +354,8 @@ struct sql_ex_info
 
 #define Q_INVOKER 11
 
+#define Q_UPDATED_DB_NAMES 12
+
 /* Intvar event post-header */
 
 /* Intvar event data */
@@ -1069,6 +1081,14 @@ public:
   {
     return thd ? thd->db : 0;
   }
+
+  virtual List<char>* mts_get_dbs()
+  {
+    List<char> *res= new List<char>;
+    res->push_back(strdup(get_db()));
+    return res;
+  }
+
 #else
   Log_event() : temp_buf(0) {}
     /* avoid having to link mysqlbinlog against libpthread */
@@ -1851,12 +1871,26 @@ public:
     Q_MASTER_DATA_WRITTEN_CODE to the slave's server binlog.
   */
   uint32 master_data_written;
+  /*
+    number of updated db:s by the query and their names. This info
+    is requested by both Coordinator and Worker.
+  */
+  uchar mts_updated_dbs;
+  char mts_updated_db_names[MAX_DBS_IN_QUERY_MTS][NAME_LEN];
 
 #ifdef MYSQL_SERVER
 
   Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length,
                   bool using_trans, bool direct, bool suppress_use, int error);
   const char* get_db() { return db; }
+  virtual List<char>* mts_get_dbs()
+  {
+    DBUG_ASSERT(mts_updated_dbs > 0 && mts_updated_dbs < MAX_DBS_IN_QUERY_MTS + 1);
+    List<char> *res= new List<char>;
+    for (uchar i= 0; i < mts_updated_dbs; i++)
+      res->push_back(mts_updated_db_names[i]);
+    return res;
+  }
 #ifdef HAVE_REPLICATION
   void pack_info(Protocol* protocol);
 #endif /* HAVE_REPLICATION */

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2011-01-20 13:39:00 +0000
+++ b/sql/rpl_rli.cc	2011-02-03 09:28:27 +0000
@@ -223,11 +223,13 @@ THD* mts_get_worker_thd()
     NULL : w->w_rli->info_thd;
 }
 
+// TODO: remove the query in parallel option, rename mts_is_worker and
+//       s/ SYSTEM_THREAD_SLAVE_SQL/SYSTEM_THREAD_SLAVE_WORKER/
 bool mts_is_coord_or_worker(THD *thd)
 {
   return
     thd->slave_thread &&
-    thd->system_thread != SYSTEM_THREAD_SLAVE_IO &&
+    thd->system_thread == SYSTEM_THREAD_SLAVE_SQL &&
     (mts_get_worker_thd() != NULL);
 }
 

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2011-01-20 13:39:00 +0000
+++ b/sql/rpl_slave.cc	2011-02-03 09:28:27 +0000
@@ -3736,7 +3736,7 @@ pthread_handler_t handle_slave_worker(vo
   thd->thread_stack = (char*)&thd;
   
   pthread_detach_this_thread();
-  if (init_slave_thread(thd, SLAVE_THD_SQL))
+  if (init_slave_thread(thd, SLAVE_THD_SQL))  // todo: make thd->sys_thr= worker
   {
     // todo make SQL thread killed
     sql_print_error("Failed during slave worker initialization");
@@ -3804,6 +3804,7 @@ err:
   {
     mysql_mutex_lock(&LOCK_thread_count);
     THD_CHECK_SENTRY(thd);
+    thd->system_thread= NON_SYSTEM_THREAD; // tt closing work/around
     delete thd;
     mysql_mutex_unlock(&LOCK_thread_count);
   }

=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc	2010-12-17 16:14:15 +0000
+++ b/sql/sql_class.cc	2011-02-03 09:28:27 +0000
@@ -503,6 +503,7 @@ THD::THD()
    user_time(0), in_sub_stmt(0),
    binlog_unsafe_warning_flags(0),
    binlog_table_maps(0),
+   binlog_updated_db_names(NULL),
    table_map_for_update(0),
    arg_of_last_insert_id_function(FALSE),
    first_successful_insert_id_in_prev_stmt(0),
@@ -981,7 +982,6 @@ void THD::init_for_queries()
   transaction.xid_state.in_thd=1;
 }
 
-
 /*
   Do what's needed when one invokes change user
 
@@ -1069,7 +1069,6 @@ void THD::cleanup(void)
     mysql_mutex_unlock(&LOCK_user_locks);
     ull= NULL;
   }
-
   cleanup_done=1;
   DBUG_VOID_RETURN;
 }
@@ -1414,6 +1413,7 @@ void THD::cleanup_after_query()
   /* reset table map for multi-table update */
   table_map_for_update= 0;
   m_binlog_invoker= FALSE;
+  binlog_updated_db_names= NULL;
 }
 
 

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2011-01-11 23:01:02 +0000
+++ b/sql/sql_class.h	2011-02-03 09:28:27 +0000
@@ -1721,6 +1721,11 @@ private:
     transaction cache.
   */
   uint binlog_table_maps;
+  /*
+    List of the names of the databases to be updated by the query
+  */
+  List<char> *binlog_updated_db_names;
+
 public:
   void issue_unsafe_warnings();
 
@@ -1730,6 +1735,9 @@ public:
   void clear_binlog_table_maps() {
     binlog_table_maps= 0;
   }
+  List<char> * get_binlog_updated_db_names() {
+    return binlog_updated_db_names;
+  }
 #endif /* MYSQL_CLIENT */
 
 public:


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110203092827-kd9tg0kdsqtnrizu.bundle
Thread
bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3271) WL#5754Andrei Elkin3 Feb
  • Re: bzr commit into mysql-next-mr-wl5569 branch(andrei.elkin:3271) WL#5754Bjorn Munch3 Feb