#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3271 Andrei Elkin 2011-02-03
wl#5754 Parallel Query replication
This patch implements a core of the design that is the DML queries.
Queries can contain arbitrary features including temp tables.
Todo:
1. Extend the framework to capture the remained DDL:s
2. fix the server startup opt (--mts-exp-run-query-in-parallel).
3. Refine only_sequential() to return TRUE in case the list of the updated db:s
is bigger than the max, *and* in case the query is from the Old master.
@ mysql-test/suite/rpl/r/rpl_parallel_multi_db.result
new result file is added.
@ mysql-test/suite/rpl/r/rpl_parallel_query.result
new result file is added.
@ mysql-test/suite/rpl/t/rpl_parallel_multi_db.test
multi-db DML query test is added.
todo: add triggers, sf(), SP.
@ mysql-test/suite/rpl/t/rpl_parallel_query.test
query with temporary tables testing.
@ sql/binlog.cc
gathering to be updated db in the DML case. Over-MAX_DBS_IN_QUERY_MTS-sized list won't be shipped
to the slave.
@ sql/log_event.cc
master and slave (Coord's and Worker's) handling of updated db:s.
The Coordinator's distribution changed to involve a loop per db, similaraly for the Worker
at applying.
@ sql/log_event.h
Hardcoding the max updated db:s.
Static allocation for updated db:s in Query log event is motivated
by the fact of the event is shared by both C and W and the standard malloc/free
can't be a reasonble choice either.
Added a new status and changed dependent info.
Added a new method to return the *list* of updated db:s which in all but Query case
is just a wrapper over get_db().
@ sql/rpl_rli.cc
fixing an error.
@ sql/rpl_slave.cc
Added a work-around/cleanup needed by the standard temp table closing algorithm.
@ sql/sql_class.cc
master side gathering updated db:s new memeber initializations.
@ sql/sql_class.h
master side gathering updated db:s list and accessor members.
added:
mysql-test/suite/rpl/r/rpl_parallel_multi_db.result
mysql-test/suite/rpl/t/rpl_parallel_multi_db.test
mysql-test/suite/rpl/t/rpl_parallel_query.test
modified:
mysql-test/suite/rpl/r/rpl_parallel_query.result
sql/binlog.cc
sql/log_event.cc
sql/log_event.h
sql/rpl_rli.cc
sql/rpl_slave.cc
sql/sql_class.cc
sql/sql_class.h
=== added file 'mysql-test/suite/rpl/r/rpl_parallel_multi_db.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_multi_db.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_multi_db.result 2011-02-03 09:28:27 +0000
@@ -0,0 +1,141 @@
+include/master-slave.inc
+[connection master]
+include/stop_slave.inc
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+set @@global.mts_slave_parallel_workers= 4;
+include/start_slave.inc
+Warnings:
+Note 1726 Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
+create database d4;
+create table d4.t8 (a int);
+select round(rand()*8) into @var;
+insert into d4.t8 values (@var);
+create table d4.t7 (a int);
+select round(rand()*8) into @var;
+insert into d4.t7 values (@var);
+create table d4.t6 (a int);
+select round(rand()*8) into @var;
+insert into d4.t6 values (@var);
+create table d4.t5 (a int);
+select round(rand()*8) into @var;
+insert into d4.t5 values (@var);
+create table d4.t4 (a int);
+select round(rand()*8) into @var;
+insert into d4.t4 values (@var);
+create table d4.t3 (a int);
+select round(rand()*8) into @var;
+insert into d4.t3 values (@var);
+create table d4.t2 (a int);
+select round(rand()*8) into @var;
+insert into d4.t2 values (@var);
+create table d4.t1 (a int);
+select round(rand()*8) into @var;
+insert into d4.t1 values (@var);
+create database d3;
+create table d3.t8 (a int);
+select round(rand()*8) into @var;
+insert into d3.t8 values (@var);
+create table d3.t7 (a int);
+select round(rand()*8) into @var;
+insert into d3.t7 values (@var);
+create table d3.t6 (a int);
+select round(rand()*8) into @var;
+insert into d3.t6 values (@var);
+create table d3.t5 (a int);
+select round(rand()*8) into @var;
+insert into d3.t5 values (@var);
+create table d3.t4 (a int);
+select round(rand()*8) into @var;
+insert into d3.t4 values (@var);
+create table d3.t3 (a int);
+select round(rand()*8) into @var;
+insert into d3.t3 values (@var);
+create table d3.t2 (a int);
+select round(rand()*8) into @var;
+insert into d3.t2 values (@var);
+create table d3.t1 (a int);
+select round(rand()*8) into @var;
+insert into d3.t1 values (@var);
+create database d2;
+create table d2.t8 (a int);
+select round(rand()*8) into @var;
+insert into d2.t8 values (@var);
+create table d2.t7 (a int);
+select round(rand()*8) into @var;
+insert into d2.t7 values (@var);
+create table d2.t6 (a int);
+select round(rand()*8) into @var;
+insert into d2.t6 values (@var);
+create table d2.t5 (a int);
+select round(rand()*8) into @var;
+insert into d2.t5 values (@var);
+create table d2.t4 (a int);
+select round(rand()*8) into @var;
+insert into d2.t4 values (@var);
+create table d2.t3 (a int);
+select round(rand()*8) into @var;
+insert into d2.t3 values (@var);
+create table d2.t2 (a int);
+select round(rand()*8) into @var;
+insert into d2.t2 values (@var);
+create table d2.t1 (a int);
+select round(rand()*8) into @var;
+insert into d2.t1 values (@var);
+create database d1;
+create table d1.t8 (a int);
+select round(rand()*8) into @var;
+insert into d1.t8 values (@var);
+create table d1.t7 (a int);
+select round(rand()*8) into @var;
+insert into d1.t7 values (@var);
+create table d1.t6 (a int);
+select round(rand()*8) into @var;
+insert into d1.t6 values (@var);
+create table d1.t5 (a int);
+select round(rand()*8) into @var;
+insert into d1.t5 values (@var);
+create table d1.t4 (a int);
+select round(rand()*8) into @var;
+insert into d1.t4 values (@var);
+create table d1.t3 (a int);
+select round(rand()*8) into @var;
+insert into d1.t3 values (@var);
+create table d1.t2 (a int);
+select round(rand()*8) into @var;
+insert into d1.t2 values (@var);
+create table d1.t1 (a int);
+select round(rand()*8) into @var;
+insert into d1.t1 values (@var);
+include/diff_tables.inc [master:d4.t8, slave:d4.t8]
+include/diff_tables.inc [master:d4.t7, slave:d4.t7]
+include/diff_tables.inc [master:d4.t6, slave:d4.t6]
+include/diff_tables.inc [master:d4.t5, slave:d4.t5]
+include/diff_tables.inc [master:d4.t4, slave:d4.t4]
+include/diff_tables.inc [master:d4.t3, slave:d4.t3]
+include/diff_tables.inc [master:d4.t2, slave:d4.t2]
+include/diff_tables.inc [master:d4.t1, slave:d4.t1]
+include/diff_tables.inc [master:d3.t8, slave:d3.t8]
+include/diff_tables.inc [master:d3.t7, slave:d3.t7]
+include/diff_tables.inc [master:d3.t6, slave:d3.t6]
+include/diff_tables.inc [master:d3.t5, slave:d3.t5]
+include/diff_tables.inc [master:d3.t4, slave:d3.t4]
+include/diff_tables.inc [master:d3.t3, slave:d3.t3]
+include/diff_tables.inc [master:d3.t2, slave:d3.t2]
+include/diff_tables.inc [master:d3.t1, slave:d3.t1]
+include/diff_tables.inc [master:d2.t8, slave:d2.t8]
+include/diff_tables.inc [master:d2.t7, slave:d2.t7]
+include/diff_tables.inc [master:d2.t6, slave:d2.t6]
+include/diff_tables.inc [master:d2.t5, slave:d2.t5]
+include/diff_tables.inc [master:d2.t4, slave:d2.t4]
+include/diff_tables.inc [master:d2.t3, slave:d2.t3]
+include/diff_tables.inc [master:d2.t2, slave:d2.t2]
+include/diff_tables.inc [master:d2.t1, slave:d2.t1]
+include/diff_tables.inc [master:d1.t8, slave:d1.t8]
+include/diff_tables.inc [master:d1.t7, slave:d1.t7]
+include/diff_tables.inc [master:d1.t6, slave:d1.t6]
+include/diff_tables.inc [master:d1.t5, slave:d1.t5]
+include/diff_tables.inc [master:d1.t4, slave:d1.t4]
+include/diff_tables.inc [master:d1.t3, slave:d1.t3]
+include/diff_tables.inc [master:d1.t2, slave:d1.t2]
+include/diff_tables.inc [master:d1.t1, slave:d1.t1]
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_query.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_query.result 2011-01-20 13:39:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_query.result 2011-02-03 09:28:27 +0000
@@ -3,28 +3,49 @@ include/master-slave.inc
call mtr.add_suppression('Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement accesses nontransactional table as well as transactional or temporary table.*');
include/stop_slave.inc
set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
-set @@global.mts_slave_parallel_workers= 2;
+set @@global.mts_slave_parallel_workers= 4;
include/start_slave.inc
Warnings:
Note 1726 Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
-create database d1;
-use d1;
-create table d1.t1 (a int auto_increment primary key, b int) engine=innodb;
-create temporary table tt_1 (a int auto_increment primary key);
-insert into tt_1 values (null);
-insert into tt_1 values (null);
-insert into d1.t1 (b) select count(*) from tt_1;
create database d2;
use d2;
create table d2.t1 (a int auto_increment primary key, b int) engine=innodb;
-create temporary table tt_1 (a int auto_increment primary key);
-insert into tt_1 values (null);
-insert into tt_1 values (null);
-insert into d2.t1 (b) select count(*) from tt_1;
+insert into d2.t1 (b) select count(*) from tt_##;
+create database d1;
+use d1;
+create table d1.t1 (a int auto_increment primary key, b int) engine=innodb;
+insert into d1.t1 (b) select count(*) from tt_##;
+create database d4;
+use d4;
+create table d4.t1 (a int auto_increment primary key, b int) engine=innodb;
+insert into d4.t1 (b) select count(*) from tt_##;
+create database d3;
+use d3;
+create table d3.t1 (a int auto_increment primary key, b int) engine=innodb;
+insert into d3.t1 (b) select count(*) from tt_##;
+include/diff_tables.inc [master:d4.t1, slave:d4.t1]
+include/diff_tables.inc [master:d3.t1, slave:d3.t1]
include/diff_tables.inc [master:d2.t1, slave:d2.t1]
include/diff_tables.inc [master:d1.t1, slave:d1.t1]
+drop temporary table tt_8;
+drop temporary table tt_7;
+drop temporary table tt_6;
+drop temporary table tt_5;
+drop temporary table tt_4;
+drop temporary table tt_3;
+drop temporary table tt_2;
drop temporary table tt_1;
+drop database d2;
drop database d1;
+drop temporary table tt_8;
+drop temporary table tt_7;
+drop temporary table tt_6;
+drop temporary table tt_5;
+drop temporary table tt_4;
+drop temporary table tt_3;
+drop temporary table tt_2;
drop temporary table tt_1;
+drop database d4;
+drop database d3;
include/stop_slave.inc
set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;
=== added file 'mysql-test/suite/rpl/t/rpl_parallel_multi_db.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_multi_db.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_multi_db.test 2011-02-03 09:28:27 +0000
@@ -0,0 +1,123 @@
+#
+# WL#5569 MTS
+#
+# The test verifies correctness of Query event parallelization when
+# a DML Query modifies multiple databases.
+#
+
+--source include/master-slave.inc
+--source include/have_binlog_format_statement.inc
+
+--disable_query_log
+call mtr.add_suppression('.*Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT.*');
+--enable_query_log
+
+# restart in Parallel
+
+let $workers= 4;
+
+connection slave;
+
+source include/stop_slave.inc;
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+eval set @@global.mts_slave_parallel_workers= $workers;
+
+source include/start_slave.inc;
+
+let $dbs= 4;
+let $tables= 8;
+let $queries= `select $dbs*$tables * 8`;
+
+#
+# 1. Case of multi-update
+#
+
+connection master;
+
+# create & populate
+
+let $n= $dbs;
+while ($n)
+{
+ eval create database d$n;
+ let $m= $tables;
+ while ($m)
+ {
+ eval create table d$n.t$m (a int);
+ eval select round(rand()*$tables) into @var;
+ eval insert into d$n.t$m values (@var);
+ dec $m;
+ }
+ dec $n;
+}
+
+
+# operate to check consistency in the end
+
+let $k= $queries;
+
+--disable_query_log
+--disable_warnings
+while ($k)
+{
+ let $n1= `select floor(rand()*$dbs + 1)`;
+ let $m1= `select floor(rand()*$tables + 1)`;
+ let $n2= `select floor(rand()*$dbs + 1)`;
+ let $m2= `select floor(rand()*$tables + 1)`;
+ let $n3= `select floor(rand()*$dbs + 1)`;
+ let $m3= `select floor(rand()*$tables + 1)`;
+ let $n4= `select floor(rand()*$dbs + 1)`;
+ let $m4= `select floor(rand()*$tables + 1)`;
+ eval update d$n1.t$m1 as t_1, d$n2.t$m2 as t_2, d$n3.t$m3, d$n4.t$m4 as t_3 set t_1.a=t_2.a+ round(rand(10)), t_2.a=t_3.a+ round(rand(10)), t_3.a=t_1.a+ round(rand(10)), t_3.a=t_1.a+ round(rand(10));
+ dec $k;
+}
+--enable_warnings
+--enable_query_log
+
+sync_slave_with_master;
+
+#
+# Consistency check
+#
+
+let $n = $dbs;
+while($n)
+{
+ let $m= $tables;
+ while ($m)
+ {
+ let $diff_tables=master:d$n.t$m, slave:d$n.t$m;
+ source include/diff_tables.inc;
+ dec $m;
+ }
+ dec $n;
+}
+
+
+#
+# 2. Case of invoked routines (TODO: add parallel support to CREATE trig,sf().
+#
+
+
+#
+# Clean-up
+#
+
+connection master;
+
+--disable_query_log
+
+let $n= $dbs;
+while ($n)
+{
+ eval drop database d$n;
+ dec $n;
+}
+
+--enable_query_log
+
+sync_slave_with_master;
+
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;
+
+### TODO: --source include/rpl_end.inc
=== added file 'mysql-test/suite/rpl/t/rpl_parallel_query.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_query.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_query.test 2011-02-03 09:28:27 +0000
@@ -0,0 +1,169 @@
+#
+# WL#5569 MTS
+#
+# The test verifies correctness of Query events parallelization.
+#
+
+--source include/master-slave.inc
+--source include/have_binlog_format_statement.inc
+
+call mtr.add_suppression('Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement accesses nontransactional table as well as transactional or temporary table.*');
+
+let $temp_tables= 16;
+let $workers= 4;
+
+connection slave;
+
+# restart in Parallel
+source include/stop_slave.inc;
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+eval set @@global.mts_slave_parallel_workers= $workers;
+
+source include/start_slave.inc;
+
+# Two connections each create 2 db:s a regular table and a set of temp tables.
+# The temp tables contribute content to the regular tables.
+# In the end there must be consistent data on both sides.
+
+connection master;
+
+let $n= `select round($workers/2)`;
+let $n1= `select $n`;
+while ($n)
+{
+ eval create database d$n1;
+ eval use d$n1;
+ eval create table d$n1.t1 (a int auto_increment primary key, b int) engine=innodb;
+ let $i= $temp_tables;
+
+--disable_query_log
+ while($i)
+ {
+ let $temp_rows= `select round(rand()*$temp_tables) + 1`;
+ let $k= $temp_rows;
+ eval create temporary table tt_$i (a int auto_increment primary key);
+ while($k)
+ {
+ eval insert into tt_$i values (null);
+ dec $k;
+ }
+ dec $i;
+ }
+--enable_query_log
+
+ let $j= `select floor(rand()*$temp_tables) + 1`;
+--replace_regex /tt_.*/tt_##/
+ eval insert into d$n1.t1 (b) select count(*) from tt_$j;
+ dec $n;
+ dec $n1;
+}
+
+connection master1;
+
+let $n= `select round($workers/2)`;
+let $n1= `select 2*$n`;
+while ($n)
+{
+ eval create database d$n1;
+ eval use d$n1;
+ eval create table d$n1.t1 (a int auto_increment primary key, b int) engine=innodb;
+ let $i= $temp_tables;
+
+--disable_query_log
+ while($i)
+ {
+ let $temp_rows= `select round(rand()*$temp_tables) + 1`;
+ let $k= $temp_rows;
+ eval create temporary table tt_$i (a int auto_increment primary key);
+ while($k)
+ {
+ eval insert into tt_$i values (null);
+ dec $k;
+ }
+ dec $i;
+ }
+--enable_query_log
+
+ let $j= `select floor(rand()*$temp_tables) + 1`;
+--replace_regex /tt_.*/tt_##/
+ eval insert into d$n1.t1 (b) select count(*) from tt_$j;
+ dec $n;
+ dec $n1;
+}
+
+sync_slave_with_master;
+
+#
+# Consistency check
+#
+
+let $n = $workers;
+while($n)
+{
+ let $diff_tables=master:d$n.t1, slave:d$n.t1;
+ source include/diff_tables.inc;
+
+ dec $n;
+}
+
+#
+# cleanup
+#
+# Temp tables are removed two ways explicitly and implicitly by disconnecting.
+#
+
+connection master;
+
+let $i= `select round($temp_tables/2)`;
+while($i)
+{
+ eval drop temporary table tt_$i;
+ dec $i;
+}
+
+let $n= `select round($workers/2)`;
+let $n1= `select $n`;
+while ($n)
+{
+ eval drop database d$n1;
+ dec $n;
+ dec $n1;
+}
+
+
+connection master1;
+
+let $i= `select round($temp_tables/2)`;
+while($i)
+{
+ eval drop temporary table tt_$i;
+ dec $i;
+}
+
+sync_slave_with_master;
+#connection slave;
+
+connection master1;
+disconnect master1;
+
+#
+# Clean-up
+#
+
+connection master;
+
+let $n= `select round($workers/2)`;
+let $n1= `select 2*$n`;
+while ($n)
+{
+ eval drop database d$n1;
+ dec $n;
+ dec $n1;
+}
+
+sync_slave_with_master;
+source include/stop_slave.inc;
+
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;
+
+### TODO: --source include/rpl_end.inc
=== modified file 'sql/binlog.cc'
--- a/sql/binlog.cc 2010-12-27 18:54:41 +0000
+++ b/sql/binlog.cc 2011-02-03 09:28:27 +0000
@@ -4733,7 +4733,56 @@ int THD::decide_logging_format(TABLE_LIS
is_write= TRUE;
prev_write_table= table->table;
+
+ if (variables.binlog_format != BINLOG_FORMAT_ROW)
+ {
+ /*
+ Master side of the STMT format events parallelization.
+ Write-locked table's db:s are stored in a abc-ordered name list.
+ The list can remain empty if the only database that
+ is updated is the default one.
+ In case the number of databases exceeds MAX_DBS_IN_QUERY_MTS
+ the list won't be sent to the slave either.
+ */
+
+ if (!binlog_updated_db_names)
+ {
+ binlog_updated_db_names= new List<char>; /* thd->mem_root is used */
+ }
+ if (binlog_updated_db_names->elements < MAX_DBS_IN_QUERY_MTS + 1)
+ {
+ char *after_db= strdup(table->db);
+ if (binlog_updated_db_names->elements != 0)
+ {
+ List_iterator<char> it(*get_binlog_updated_db_names());
+
+ while (it++)
+ {
+ char *swap= NULL;
+ char **ref_cur_db= it.ref();
+ int cmp= strcmp(after_db, *ref_cur_db);
+
+ DBUG_ASSERT(!swap || cmp < 0);
+
+ if (cmp == 0)
+ {
+ after_db= NULL; /* dup to ignore */
+ break;
+ }
+ else if (swap || cmp > 0)
+ {
+ swap= *ref_cur_db;
+ *ref_cur_db= after_db;
+ after_db= swap;
+ }
+ }
+ }
+ if (after_db)
+ binlog_updated_db_names->push_back(after_db);
+ }
+ }
}
+
flags_access_some_set |= flags;
if (lex->sql_command != SQLCOM_CREATE_TABLE ||
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2011-01-11 23:01:02 +0000
+++ b/sql/log_event.cc 2011-02-03 09:28:27 +0000
@@ -2489,17 +2489,25 @@ Slave_worker *Log_event::get_slave_worke
if (contains_partition_info())
{
- // a lot of things inside `get_slave_worker_id'
- const_cast<Relay_log_info *>(rli)->last_assigned_worker=
- worker= get_slave_worker(get_db(), const_cast<Relay_log_info *>(rli));
- get_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
- if (g.worker_id == (ulong) -1) // assign "offically" the current group
+ List_iterator<char> it(*mts_get_dbs());
+ while (it++)
{
- g.worker_id= worker->id; // todo/fixme: think of Slave_worker* here
- set_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
-
- DBUG_ASSERT(g.group_relay_log_name == NULL);
+ char **ref_cur_db= it.ref();
+ // a lot of things inside `get_slave_worker_id'
+ const_cast<Relay_log_info *>(rli)->last_assigned_worker=
+ worker= get_slave_worker(*ref_cur_db, const_cast<Relay_log_info *>(rli));
+ get_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
+ if (g.worker_id == (ulong) -1) // assign "offically" the current group
+ {
+ g.worker_id= worker->id; // todo/fixme: think of Slave_worker* here
+ set_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
+
+ DBUG_ASSERT(g.group_relay_log_name == NULL);
+ }
}
+ // releasing the Coord's mem-root from the updated dbs
+ // if (get_type_code() == QUERY_EVENT)
+ free_root(rli->info_thd->mem_root, MYF(MY_KEEP_PREALLOC));
}
else // r
if (rli->last_assigned_worker)
@@ -2987,25 +2995,29 @@ int slave_worker_exec_job(Slave_worker *
{
if (ev->contains_partition_info())
{
+ List_iterator<char> it(*ev->mts_get_dbs());
DYNAMIC_ARRAY *ep= &(w->curr_group_exec_parts->dynamic_ids);
- uint i;
- char key[NAME_LEN + 2];
- bool found= FALSE;
- const char *dbname= ev->get_db();
- uchar dblength= (uint) strlen(dbname);
- for (i= 0; i < ep->elements && !found; i++)
+ while (it++)
{
- get_dynamic(ep, (uchar*) key, i);
- found=
- (key[0] == dblength) &&
- (strncmp(key + 1, const_cast<char*>(dbname), dblength) == 0);
- }
- if (!found)
- {
- key[0]= dblength;
- memcpy(key + 1, dbname, dblength + 1);
- insert_dynamic(ep, (uchar*) key);
+ bool found= FALSE;
+ char key[NAME_LEN + 2];
+ const char *dbname= *it.ref();
+ uchar dblength= (uint) strlen(dbname);
+
+ for (uint i= 0; i < ep->elements && !found; i++)
+ {
+ get_dynamic(ep, (uchar*) key, i);
+ found=
+ (key[0] == dblength) &&
+ (strncmp(key + 1, const_cast<char*>(dbname), dblength) == 0);
+ }
+ if (!found)
+ {
+ key[0]= dblength;
+ memcpy(key + 1, dbname, dblength + 1);
+ insert_dynamic(ep, (uchar*) key);
+ }
}
}
}
@@ -3089,6 +3101,8 @@ err:
if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
delete ev; // after ev->update_pos() event is garbage
+ free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC));
+
DBUG_RETURN(error);
}
@@ -3354,6 +3368,30 @@ bool Query_log_event::write(IO_CACHE* fi
start+= host.length;
}
}
+
+ if (thd->get_binlog_updated_db_names() != NULL)
+ {
+ uchar dbs;
+ *start++= Q_UPDATED_DB_NAMES;
+ dbs= *start++= thd->get_binlog_updated_db_names()->elements;
+
+ DBUG_ASSERT(dbs != 0);
+ /*
+ MAX_DBS_IN_QUERY_MTS + 1 is special no db:s is written
+ and event requires the sequential applying on slave.
+ */
+ if (dbs != MAX_DBS_IN_QUERY_MTS + 1)
+ {
+ List_iterator_fast<char> it(*thd->get_binlog_updated_db_names());
+ char *db_name;
+ while ((db_name= it++))
+ {
+ strcpy((char*) start, db_name);
+ start += strlen(db_name) + 1;
+ }
+ }
+ }
+
/*
NOTE: When adding new status vars, please don't forget to update
the MAX_SIZE_LOG_EVENT_STATUS in log_event.h and update the function
@@ -3437,7 +3475,7 @@ Query_log_event::Query_log_event(THD* th
lc_time_names_number(thd_arg->variables.lc_time_names->number),
charset_database_number(0),
table_map_for_update((ulonglong)thd_arg->table_map_for_update),
- master_data_written(0)
+ master_data_written(0), mts_updated_dbs(0)
{
time_t end_time;
@@ -3638,6 +3676,7 @@ code_name(int code)
case Q_CHARSET_DATABASE_CODE: return "Q_CHARSET_DATABASE_CODE";
case Q_TABLE_MAP_FOR_UPDATE_CODE: return "Q_TABLE_MAP_FOR_UPDATE_CODE";
case Q_MASTER_DATA_WRITTEN_CODE: return "Q_MASTER_DATA_WRITTEN_CODE";
+ case Q_UPDATED_DB_NAMES: return "Q_UPDATED_DB_NAMES";
}
sprintf(buf, "CODE#%d", code);
return buf;
@@ -3675,7 +3714,8 @@ Query_log_event::Query_log_event(const c
flags2_inited(0), sql_mode_inited(0), charset_inited(0),
auto_increment_increment(1), auto_increment_offset(1),
time_zone_len(0), lc_time_names_number(0), charset_database_number(0),
- table_map_for_update(0), master_data_written(0)
+ table_map_for_update(0), master_data_written(0),
+ mts_updated_dbs(MAX_DBS_IN_QUERY_MTS + 1)
{
ulong data_len;
uint32 tmp;
@@ -3683,6 +3723,7 @@ Query_log_event::Query_log_event(const c
Log_event::Byte *start;
const Log_event::Byte *end;
bool catalog_nz= 1;
+
DBUG_ENTER("Query_log_event::Query_log_event(char*,...)");
memset(&user, 0, sizeof(user));
@@ -3854,6 +3895,23 @@ Query_log_event::Query_log_event(const c
CHECK_SPACE(pos, end, host.length);
host.str= (char *)pos;
pos+= host.length;
+ break;
+ }
+ case Q_UPDATED_DB_NAMES:
+ {
+ CHECK_SPACE(pos, end, 1);
+ mts_updated_dbs= *pos++;
+ if (mts_updated_dbs == MAX_DBS_IN_QUERY_MTS + 1)
+ break;
+
+ DBUG_ASSERT(mts_updated_dbs != 0);
+
+ for (uchar i= 0; i < mts_updated_dbs; i++)
+ {
+ strcpy(mts_updated_db_names[i], (char*) pos);
+ pos+= 1 + strlen((const char*) pos);
+ }
+ break;
}
default:
/* That's why you must write status vars in growing order of code */
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2011-01-11 23:01:02 +0000
+++ b/sql/log_event.h 2011-02-03 09:28:27 +0000
@@ -258,6 +258,14 @@ struct sql_ex_info
#define INCIDENT_HEADER_LEN 2
#define HEARTBEAT_HEADER_LEN 0
#define IGNORABLE_HEADER_LEN 0
+
+/**
+ MTS parameter. The maximum number of databases that a query modifies
+ and be logged in the binarry log with a special status variable containing
+ the database names to facilitate the parallel applying of the Query-event.
+*/
+#define MAX_DBS_IN_QUERY_MTS 16
+
/*
Max number of possible extra bytes in a replication event compared to a
packet (i.e. a query) sent from client to master;
@@ -273,6 +281,8 @@ struct sql_ex_info
1 + 2 /* type, charset_database_number */ + \
1 + 8 /* type, table_map_for_update */ + \
1 + 4 /* type, master_data_written */ + \
+ /* type, db_1, db_2, ... */ \
+ 1 + (MAX_DBS_IN_QUERY_MTS * (1 + NAME_LEN)) + \
1 + 16 + 1 + 60/* type, user_len, user, host_len, host */)
#define MAX_LOG_EVENT_HEADER ( /* in order of Query_log_event::write */ \
LOG_EVENT_HEADER_LEN + /* write_header */ \
@@ -344,6 +354,8 @@ struct sql_ex_info
#define Q_INVOKER 11
+#define Q_UPDATED_DB_NAMES 12
+
/* Intvar event post-header */
/* Intvar event data */
@@ -1069,6 +1081,14 @@ public:
{
return thd ? thd->db : 0;
}
+
+ virtual List<char>* mts_get_dbs()
+ {
+ List<char> *res= new List<char>;
+ res->push_back(strdup(get_db()));
+ return res;
+ }
+
#else
Log_event() : temp_buf(0) {}
/* avoid having to link mysqlbinlog against libpthread */
@@ -1851,12 +1871,26 @@ public:
Q_MASTER_DATA_WRITTEN_CODE to the slave's server binlog.
*/
uint32 master_data_written;
+ /*
+ number of updated db:s by the query and their names. This info
+ is requested by both Coordinator and Worker.
+ */
+ uchar mts_updated_dbs;
+ char mts_updated_db_names[MAX_DBS_IN_QUERY_MTS][NAME_LEN];
#ifdef MYSQL_SERVER
Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length,
bool using_trans, bool direct, bool suppress_use, int error);
const char* get_db() { return db; }
+ virtual List<char>* mts_get_dbs()
+ {
+ DBUG_ASSERT(mts_updated_dbs > 0 && mts_updated_dbs < MAX_DBS_IN_QUERY_MTS + 1);
+ List<char> *res= new List<char>;
+ for (uchar i= 0; i < mts_updated_dbs; i++)
+ res->push_back(mts_updated_db_names[i]);
+ return res;
+ }
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
#endif /* HAVE_REPLICATION */
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2011-01-20 13:39:00 +0000
+++ b/sql/rpl_rli.cc 2011-02-03 09:28:27 +0000
@@ -223,11 +223,13 @@ THD* mts_get_worker_thd()
NULL : w->w_rli->info_thd;
}
+// TODO: remove the query in parallel option, rename mts_is_worker and
+// s/ SYSTEM_THREAD_SLAVE_SQL/SYSTEM_THREAD_SLAVE_WORKER/
bool mts_is_coord_or_worker(THD *thd)
{
return
thd->slave_thread &&
- thd->system_thread != SYSTEM_THREAD_SLAVE_IO &&
+ thd->system_thread == SYSTEM_THREAD_SLAVE_SQL &&
(mts_get_worker_thd() != NULL);
}
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2011-01-20 13:39:00 +0000
+++ b/sql/rpl_slave.cc 2011-02-03 09:28:27 +0000
@@ -3736,7 +3736,7 @@ pthread_handler_t handle_slave_worker(vo
thd->thread_stack = (char*)&thd;
pthread_detach_this_thread();
- if (init_slave_thread(thd, SLAVE_THD_SQL))
+ if (init_slave_thread(thd, SLAVE_THD_SQL)) // todo: make thd->sys_thr= worker
{
// todo make SQL thread killed
sql_print_error("Failed during slave worker initialization");
@@ -3804,6 +3804,7 @@ err:
{
mysql_mutex_lock(&LOCK_thread_count);
THD_CHECK_SENTRY(thd);
+ thd->system_thread= NON_SYSTEM_THREAD; // tt closing work/around
delete thd;
mysql_mutex_unlock(&LOCK_thread_count);
}
=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc 2010-12-17 16:14:15 +0000
+++ b/sql/sql_class.cc 2011-02-03 09:28:27 +0000
@@ -503,6 +503,7 @@ THD::THD()
user_time(0), in_sub_stmt(0),
binlog_unsafe_warning_flags(0),
binlog_table_maps(0),
+ binlog_updated_db_names(NULL),
table_map_for_update(0),
arg_of_last_insert_id_function(FALSE),
first_successful_insert_id_in_prev_stmt(0),
@@ -981,7 +982,6 @@ void THD::init_for_queries()
transaction.xid_state.in_thd=1;
}
-
/*
Do what's needed when one invokes change user
@@ -1069,7 +1069,6 @@ void THD::cleanup(void)
mysql_mutex_unlock(&LOCK_user_locks);
ull= NULL;
}
-
cleanup_done=1;
DBUG_VOID_RETURN;
}
@@ -1414,6 +1413,7 @@ void THD::cleanup_after_query()
/* reset table map for multi-table update */
table_map_for_update= 0;
m_binlog_invoker= FALSE;
+ binlog_updated_db_names= NULL;
}
=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h 2011-01-11 23:01:02 +0000
+++ b/sql/sql_class.h 2011-02-03 09:28:27 +0000
@@ -1721,6 +1721,11 @@ private:
transaction cache.
*/
uint binlog_table_maps;
+ /*
+ String space-separated db names listing to be updated by the query databases
+ */
+ List<char> *binlog_updated_db_names;
+
public:
void issue_unsafe_warnings();
@@ -1730,6 +1735,9 @@ public:
void clear_binlog_table_maps() {
binlog_table_maps= 0;
}
+ List<char> * get_binlog_updated_db_names() {
+ return binlog_updated_db_names;
+ }
#endif /* MYSQL_CLIENT */
public:
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110203092827-kd9tg0kdsqtnrizu.bundle