#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3247 Andrei Elkin 2010-12-16
wl#5569 MTS
Adding transparent support/fallback to the sequential execution cases of
1. Query-log-event
2. Rows_query_log_event info event
Both cases can be fully parallelized in future project.
Fixing an issue in move_queue_head() that was surficed as an assert in Slave_worker::slave_worker_group_ends().
Fixing destoying an event by Worker.
@ mysql-test/extra/rpl_tests/rpl_parallel_load.test
Edited to compare all involved tables, add explicitly multi-statement transaction,
and letting verification even in a case of statement format events.
@ mysql-test/suite/rpl/r/rpl_parallel_fallback.result
new results file is added.
@ mysql-test/suite/rpl/t/rpl_parallel.test
is changed to run with all formats because it starts verifying the transparent fallback
to sequential for Query-log-event and related.
@ mysql-test/suite/rpl/t/rpl_parallel_fallback.test
A new test file is added to contain cases of transparent fallback to the sequential execution.
Rows_query_log_event case is placed there.
Notice, the Query-log-event fallback is largely tested by rpl_parallel.
@ sql/log_event.cc
Refining event distribution logics in order to support fallback to the sequential execution.
The following definions are framed out
curr_group_is_parallel - indicates a Worker is engaged for all operation incl dectruction
for all events of the group.
The value lasts till a next group is decided to be "pure" sequential so that C will execute
it, update rli synchronously and destroy the events.
curr_group_seen_begin - indicates if the current group is started with a B-event (BEGIN query).
The value lasts till T-event is distributed.
Deploying a w/a for Rows_query_log_event that involves a nap to protect from a case
of multiple Rows_query_log in one group. Notice, a specific (w/a as well) rule of destroying
the event.
@ sql/log_event.h
Rows_query_log_event fallback to sequential support is added.
@ sql/rpl_rli.cc
Rows_query_log_event fallback to sequential support is added.
@ sql/rpl_rli.h
curr_group_isolated is defined to be a parallel group that is executed in isolation from
any other ahead and behind workers. Coordinator is supposed to provide such environment,
the new member is a facility to control it.
@ sql/rpl_rli_pdb.cc
Fixing usage of circular_buffer_queue::gt() to deploy an assert suggested by
the heading comments.
Refining logics of finding a gap in GAQ.
Adding 2nd arg to wait_for_workers...() to cover the 2nd use case of waiting Workers by C.
The two are: wait for all, and wait for all but not one being currently scheduled.
@ sql/rpl_slave.cc
Refining logics of C's commit to the main rli due to a pure sequential event (e.g FD, Rotate),
similarly refining logics of freeing.
Deploying a w/a for Rows_query_log_event.
added:
mysql-test/suite/rpl/r/rpl_parallel_fallback.result
mysql-test/suite/rpl/t/rpl_parallel-master.opt
mysql-test/suite/rpl/t/rpl_parallel_fallback.test
modified:
mysql-test/extra/rpl_tests/rpl_parallel_load.test
mysql-test/r/mysqld--help-win.result
mysql-test/suite/rpl/r/rpl_parallel.result
mysql-test/suite/rpl/r/rpl_sequential.result
mysql-test/suite/rpl/t/rpl_parallel-slave.opt
mysql-test/suite/rpl/t/rpl_parallel.test
mysql-test/suite/sys_vars/r/all_vars.result
sql/log_event.cc
sql/log_event.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
=== modified file 'mysql-test/extra/rpl_tests/rpl_parallel_load.test'
--- a/mysql-test/extra/rpl_tests/rpl_parallel_load.test 2010-12-14 08:57:16 +0000
+++ b/mysql-test/extra/rpl_tests/rpl_parallel_load.test 2010-12-16 21:41:45 +0000
@@ -6,7 +6,7 @@
# load volume parameter
#
-let $iter = 2000;
+let $iter = 1000;
let $databases = 4;
connection slave;
@@ -71,10 +71,26 @@ while($i)
eval create database test$i1;
eval use test$i1;
- create table tm_nk (a int, b int) engine=myisam;
- create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
- create table ti_nk (a int, b int) engine=innodb;
- create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
+ create table tm_nk (a int, b int, c text) engine=myisam;
+ create table tm_wk (a int auto_increment primary key, b int, c text) engine=myisam;
+ create table ti_nk (a int, b int, c text) engine=innodb;
+ create table ti_wk (a int auto_increment primary key, b int, c text) engine=innodb;
+
+ if (`select @@global.binlog_format like 'statement'`)
+ {
+ create view v_tm_nk as select a,b from tm_nk;
+ create view v_tm_wk as select a,b from tm_wk;
+ create view v_ti_nk as select a,b from ti_nk;
+ create view v_ti_wk as select a,b from ti_wk;
+ }
+
+ if (`select @@global.binlog_format not like 'statement'`)
+ {
+ create view v_tm_nk as select a,b,c from tm_nk;
+ create view v_tm_wk as select a,b,c from tm_wk;
+ create view v_ti_nk as select a,b,c from ti_nk;
+ create view v_ti_wk as select a,b,c from ti_wk;
+ }
# this table is special - just for timing. It's more special on test0 db
# where it contains master timing of the load as well.
@@ -158,11 +174,19 @@ while ($iter)
eval use test$i1;
##call test.one_session(1);
- eval insert into tm_nk values($iter, $i1);
- eval insert into tm_wk values(null, $i1);
- eval insert into ti_nk values($iter, $i1);
- eval insert into ti_wk values(null, $i1);
-
+ eval insert into tm_nk values($iter, $i1, uuid());
+ eval insert into tm_wk values(null, $i1, uuid());
+ eval insert into ti_nk values($iter, $i1, uuid());
+ eval insert into ti_wk values(null, $i1, uuid());
+
+ begin;
+ eval insert into tm_nk values($iter, $i1, repeat('a', round(rand()*10)));
+ eval insert into tm_wk values(null, $i1, uuid());
+ commit;
+ begin;
+ eval insert into ti_nk values($iter, $i1, uuid());
+ eval insert into ti_wk values(null, $i1, repeat('a', round(rand()*10)));
+ commit;
dec $i;
}
@@ -226,21 +250,34 @@ select ts from test0.benchmark where sta
select time_to_sec(@m_1) - time_to_sec(@m_0) as 'delta_m',
time_to_sec(@s_1) - time_to_sec(@s_0) as 'delta_s' into outfile 'delta.out';
+--enable_result_log
+--enable_query_log
+
let $i = $databases + 1;
while($i)
{
let $i1 = $i;
dec $i1;
- let $diff_table_1=master:test$i1.tm_nk;
- let $diff_table_2=slave:test$i1.tm_nk;
+ let $diff_table_1=master:test$i1.v_tm_nk;
+ let $diff_table_2=slave:test$i1.v_tm_nk;
+ source include/diff_tables.inc;
+
+ let $diff_table_1=master:test$i1.v_ti_nk;
+ let $diff_table_2=slave:test$i1.v_ti_nk;
+ source include/diff_tables.inc;
+
+ let $diff_table_1=master:test$i1.v_tm_wk;
+ let $diff_table_2=slave:test$i1.v_tm_wk;
+ source include/diff_tables.inc;
+
+ let $diff_table_1=master:test$i1.v_ti_wk;
+ let $diff_table_2=slave:test$i1.v_ti_wk;
source include/diff_tables.inc;
dec $i;
}
---enable_result_log
---enable_query_log
connection master;
=== modified file 'mysql-test/r/mysqld--help-win.result'
--- a/mysql-test/r/mysqld--help-win.result 2010-12-14 12:51:30 +0000
+++ b/mysql-test/r/mysqld--help-win.result 2010-12-16 21:41:45 +0000
@@ -345,6 +345,16 @@ The following options may be given as th
disk after every #th milli-seconds. The zero value
disables the checkpoint routine (makes sense for
debugging).
+ --mts-coordinator-basic-nap=#
+ Time in msec to sleep by MTS Coordinator to avoid the
+ Worker queues room overrun
+ --mts-exp-slave-local-timestamp
+ If enabled slave itself computes the event appying time
+ value to implicitly affected timestamp columms. Otherwise
+ (default) it installs prescribed by the master value
+ --mts-exp-slave-run-query-in-parallel
+ The default not an actual database name is used as
+ partition info for parallel execution of Query_log_event
--mts-partition-hash-soft-max=#
Number of records in the mts partition hash below which
entries with zero usage are tolerated
@@ -352,6 +362,8 @@ The following options may be given as th
Max size of Slave Worker queues holding yet not applied
events.The least possible value must be not less than the
master size max_allowed_packet.
+ --mts-slave-parallel-workers=#
+ Number of worker threads for executing events in parallel
--mts-slave-worker-queue-len-max=#
Max length of one MTS Worker queue. Presence in the queue
indicates a replication event was read out of Relay log
@@ -360,6 +372,9 @@ The following options may be given as th
Whichever limit is reached Coordinator threadsuspends
further jobs assigning until conditions have been
improved.
+ --mts-worker-underrun-level=#
+ percent of Worker queue size at which Worker is
+ considered to become hungry
--multi-range-count=#
Number of key ranges to request at once
--myisam-block-size=#
@@ -415,12 +430,6 @@ The following options may be given as th
value is 0 then mysqld will reserve max_connections*5 or
max_connections + table_cache*2 (whichever is larger)
number of file descriptors
- --opt-mts-coordinator-basic-nap=#
- Time in msec to sleep by MTS Coordinator to avoid the
- Worker queues room overrun
- --opt-mts-worker-underrun-level=#
- percent of Worker queue size at which Worker is
- considered to become hungry
--optimizer-join-cache-level=#
Controls what join operations can be executed with join
buffers. Odd numbers are used for plain join buffers
@@ -664,18 +673,9 @@ The following options may be given as th
--slave-load-tmpdir=name
The location where the slave should put its temporary
files when replicating a LOAD DATA INFILE command
- --slave-local-timestamp
- if enabled slave computes the event appying time value to
- implicitly affected timestamp columms. Otherwise
- (default) installs prescribed by the master value
--slave-net-timeout=#
Number of seconds to wait for more data from a
master/slave connection before aborting the read
- --slave-parallel-workers=#
- Number of worker threads for executing events in parallel
- --slave-run-query-in-parallel
- The default not an actual database name is used as
- partition info for parallel execution of Query_log_event
--slave-skip-errors=name
Tells the slave thread to continue replication when a
query event returns an error from the provided list
@@ -899,9 +899,14 @@ max-write-lock-count 1844674407370955161
memlock FALSE
min-examined-row-limit 0
mts-checkpoint-period 300
+mts-coordinator-basic-nap 5
+mts-exp-slave-local-timestamp FALSE
+mts-exp-slave-run-query-in-parallel FALSE
mts-partition-hash-soft-max 16
mts-pending-jobs-size-max 16777216
+mts-slave-parallel-workers 0
mts-slave-worker-queue-len-max 40000
+mts-worker-underrun-level 0
multi-range-count 256
myisam-block-size 1024
myisam-data-pointer-size 6
@@ -922,8 +927,6 @@ old FALSE
old-alter-table FALSE
old-passwords FALSE
old-style-user-limits FALSE
-opt-mts-coordinator-basic-nap 5
-opt-mts-worker-underrun-level 0
optimizer-join-cache-level 4
optimizer-prune-level 1
optimizer-search-depth 62
@@ -989,10 +992,7 @@ skip-show-database FALSE
skip-slave-start FALSE
slave-compressed-protocol FALSE
slave-exec-mode STRICT
-slave-local-timestamp FALSE
slave-net-timeout 3600
-slave-parallel-workers 0
-slave-run-query-in-parallel FALSE
slave-skip-errors (No default value)
slave-sql-verify-checksum TRUE
slave-transaction-retries 10
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel.result 2010-12-14 08:57:16 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result 2010-12-16 21:41:45 +0000
@@ -24,9 +24,21 @@ select ts from test0.benchmark where sta
select ts from test0.benchmark where state like 'slave ends load' into @s_1;
select time_to_sec(@m_1) - time_to_sec(@m_0) as 'delta_m',
time_to_sec(@s_1) - time_to_sec(@s_0) as 'delta_s' into outfile 'delta.out';
-Comparing tables master:test3.tm_nk and slave:test3.tm_nk
-Comparing tables master:test2.tm_nk and slave:test2.tm_nk
-Comparing tables master:test1.tm_nk and slave:test1.tm_nk
-Comparing tables master:test0.tm_nk and slave:test0.tm_nk
+Comparing tables master:test3.v_tm_nk and slave:test3.v_tm_nk
+Comparing tables master:test3.v_ti_nk and slave:test3.v_ti_nk
+Comparing tables master:test3.v_tm_wk and slave:test3.v_tm_wk
+Comparing tables master:test3.v_ti_wk and slave:test3.v_ti_wk
+Comparing tables master:test2.v_tm_nk and slave:test2.v_tm_nk
+Comparing tables master:test2.v_ti_nk and slave:test2.v_ti_nk
+Comparing tables master:test2.v_tm_wk and slave:test2.v_tm_wk
+Comparing tables master:test2.v_ti_wk and slave:test2.v_ti_wk
+Comparing tables master:test1.v_tm_nk and slave:test1.v_tm_nk
+Comparing tables master:test1.v_ti_nk and slave:test1.v_ti_nk
+Comparing tables master:test1.v_tm_wk and slave:test1.v_tm_wk
+Comparing tables master:test1.v_ti_wk and slave:test1.v_ti_wk
+Comparing tables master:test0.v_tm_nk and slave:test0.v_tm_nk
+Comparing tables master:test0.v_ti_nk and slave:test0.v_ti_nk
+Comparing tables master:test0.v_tm_wk and slave:test0.v_tm_wk
+Comparing tables master:test0.v_ti_wk and slave:test0.v_ti_wk
set @@global.mts_exp_slave_local_timestamp= @save.mts_exp_slave_local_timestamp;
set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;
=== added file 'mysql-test/suite/rpl/r/rpl_parallel_fallback.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_fallback.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_fallback.result 2010-12-16 21:41:45 +0000
@@ -0,0 +1,24 @@
+stop slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+reset master;
+reset slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+start slave;
+include/stop_slave.inc
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+set @@global.mts_slave_parallel_workers= 4;
+Warnings:
+Note 1724 Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
+include/start_slave.inc
+set @@session.binlog_format= row;
+create database d0;
+create table d0.t1 (a int auto_increment primary key) engine=innodb;
+set @@session.binlog_format= row;
+create database d1;
+create table d1.t1 (a int auto_increment primary key) engine=innodb;
+set @@session.binlog_rows_query_log_events= 1;
+set @@session.binlog_rows_query_log_events= 0;
+Comparing tables master:d0.t1 and slave:d0.t1
+drop database d0;
+drop database d1;
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;
=== modified file 'mysql-test/suite/rpl/r/rpl_sequential.result'
--- a/mysql-test/suite/rpl/r/rpl_sequential.result 2010-12-14 08:57:16 +0000
+++ b/mysql-test/suite/rpl/r/rpl_sequential.result 2010-12-16 21:41:45 +0000
@@ -20,8 +20,20 @@ select ts from test0.benchmark where sta
select ts from test0.benchmark where state like 'slave ends load' into @s_1;
select time_to_sec(@m_1) - time_to_sec(@m_0) as 'delta_m',
time_to_sec(@s_1) - time_to_sec(@s_0) as 'delta_s' into outfile 'delta.out';
-Comparing tables master:test3.tm_nk and slave:test3.tm_nk
-Comparing tables master:test2.tm_nk and slave:test2.tm_nk
-Comparing tables master:test1.tm_nk and slave:test1.tm_nk
-Comparing tables master:test0.tm_nk and slave:test0.tm_nk
+Comparing tables master:test3.v_tm_nk and slave:test3.v_tm_nk
+Comparing tables master:test3.v_ti_nk and slave:test3.v_ti_nk
+Comparing tables master:test3.v_tm_wk and slave:test3.v_tm_wk
+Comparing tables master:test3.v_ti_wk and slave:test3.v_ti_wk
+Comparing tables master:test2.v_tm_nk and slave:test2.v_tm_nk
+Comparing tables master:test2.v_ti_nk and slave:test2.v_ti_nk
+Comparing tables master:test2.v_tm_wk and slave:test2.v_tm_wk
+Comparing tables master:test2.v_ti_wk and slave:test2.v_ti_wk
+Comparing tables master:test1.v_tm_nk and slave:test1.v_tm_nk
+Comparing tables master:test1.v_ti_nk and slave:test1.v_ti_nk
+Comparing tables master:test1.v_tm_wk and slave:test1.v_tm_wk
+Comparing tables master:test1.v_ti_wk and slave:test1.v_ti_wk
+Comparing tables master:test0.v_tm_nk and slave:test0.v_tm_nk
+Comparing tables master:test0.v_ti_nk and slave:test0.v_ti_nk
+Comparing tables master:test0.v_tm_wk and slave:test0.v_tm_wk
+Comparing tables master:test0.v_ti_wk and slave:test0.v_ti_wk
set @@global.mts_exp_slave_local_timestamp= @save.mts_exp_slave_local_timestamp;
=== added file 'mysql-test/suite/rpl/t/rpl_parallel-master.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel-master.opt 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel-master.opt 2010-12-16 21:41:45 +0000
@@ -0,0 +1 @@
+--log-warnings=0
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel-slave.opt 2010-12-14 08:57:16 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel-slave.opt 2010-12-16 21:41:45 +0000
@@ -1 +1,2 @@
---mts-slave-parallel-workers=4
+--mts-slave-parallel-workers=4 --log-warnings=0
+
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel.test 2010-12-14 08:57:16 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test 2010-12-16 21:41:45 +0000
@@ -34,7 +34,7 @@
#
source include/master-slave.inc;
-source include/have_binlog_format_row.inc;
+# source include/have_binlog_format_row.inc;
connection slave;
set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
=== added file 'mysql-test/suite/rpl/t/rpl_parallel_fallback.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_fallback.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_fallback.test 2010-12-16 21:41:45 +0000
@@ -0,0 +1,93 @@
+#
+# WL#5569 MTS
+#
+# The test lists cases of transparent fallback to the sequential execution and
+# verifies it correctness.
+# Notice, the Query-log-event fallback is largely tested by rpl_parallel.
+#
+
+source include/master-slave.inc;
+source include/have_binlog_format_mixed.inc;
+
+let $workers= 4;
+
+connection slave;
+
+# restart in Parallel
+source include/stop_slave.inc;
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+eval set @@global.mts_slave_parallel_workers= $workers;
+source include/start_slave.inc;
+
+connection master;
+
+set @@session.binlog_format= row;
+create database d0;
+create table d0.t1 (a int auto_increment primary key) engine=innodb;
+
+connection master1;
+
+set @@session.binlog_format= row;
+create database d1;
+create table d1.t1 (a int auto_increment primary key) engine=innodb;
+
+#
+# Rows_query_log_event case
+#
+
+let $iter= 100;
+let $i= $iter;
+
+connection master;
+set @@session.binlog_rows_query_log_events= 1;
+
+connection master1;
+set @@session.binlog_rows_query_log_events= 0;
+
+--disable_query_log
+--disable_result_log
+
+while ($i)
+{
+ connection master;
+ begin;
+ insert into d0.t1 values(null);
+ insert into d1.t1 values(null);
+ commit;
+
+ connection master1;
+ begin;
+ insert into d1.t1 values(null);
+ insert into d0.t1 values(null);
+ commit;
+
+ dec $i;
+}
+
+--enable_result_log
+--enable_query_log
+
+sync_slave_with_master;
+
+# verification
+
+let $diff_table_1=master:d0.t1;
+let $diff_table_2= slave:d0.t1;
+source include/diff_tables.inc;
+
+
+#
+# cleanup
+#
+
+connection master;
+
+drop database d0;
+drop database d1;
+
+
+sync_slave_with_master;
+#connection slave;
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;
+
+
=== modified file 'mysql-test/suite/sys_vars/r/all_vars.result'
--- a/mysql-test/suite/sys_vars/r/all_vars.result 2010-12-13 21:16:31 +0000
+++ b/mysql-test/suite/sys_vars/r/all_vars.result 2010-12-16 21:41:45 +0000
@@ -11,16 +11,17 @@ There should be *no* long test name list
select variable_name as `There should be *no* variables listed below:` from t2
left join t1 on variable_name=test_name where test_name is null;
There should be *no* variables listed below:
-OPT_MTS_COORDINATOR_BASIC_NAP
INNODB_STATS_TRANSIENT_SAMPLE_PAGES
MTS_PARTITION_HASH_SOFT_MAX
-OPT_MTS_WORKER_UNDERRUN_LEVEL
-SLAVE_RUN_QUERY_IN_PARALLEL
+MTS_EXP_SLAVE_LOCAL_TIMESTAMP
+MTS_EXP_SLAVE_RUN_QUERY_IN_PARALLEL
INNODB_STATS_PERSISTENT_SAMPLE_PAGES
RELAY_LOG_BASENAME
LOG_BIN_BASENAME
INNODB_ANALYZE_IS_PERSISTENT
INNODB_RESET_MONITOR_COUNTER
+MTS_SLAVE_PARALLEL_WORKERS
+MTS_WORKER_UNDERRUN_LEVEL
INNODB_RESET_ALL_MONITOR_COUNTER
LOG_BIN_INDEX
INNODB_DISABLE_MONITOR_COUNTER
@@ -28,18 +29,18 @@ INNODB_ENABLE_MONITOR_COUNTER
MTS_SLAVE_WORKER_QUEUE_LEN_MAX
INNODB_FILE_FORMAT_MAX
MTS_PENDING_JOBS_SIZE_MAX
-SLAVE_PARALLEL_WORKERS
-SLAVE_LOCAL_TIMESTAMP
-OPT_MTS_COORDINATOR_BASIC_NAP
+MTS_COORDINATOR_BASIC_NAP
INNODB_STATS_TRANSIENT_SAMPLE_PAGES
MTS_PARTITION_HASH_SOFT_MAX
-OPT_MTS_WORKER_UNDERRUN_LEVEL
-SLAVE_RUN_QUERY_IN_PARALLEL
+MTS_EXP_SLAVE_LOCAL_TIMESTAMP
+MTS_EXP_SLAVE_RUN_QUERY_IN_PARALLEL
INNODB_STATS_PERSISTENT_SAMPLE_PAGES
RELAY_LOG_BASENAME
LOG_BIN_BASENAME
INNODB_ANALYZE_IS_PERSISTENT
INNODB_RESET_MONITOR_COUNTER
+MTS_SLAVE_PARALLEL_WORKERS
+MTS_WORKER_UNDERRUN_LEVEL
INNODB_RESET_ALL_MONITOR_COUNTER
LOG_BIN_INDEX
INNODB_DISABLE_MONITOR_COUNTER
@@ -47,7 +48,6 @@ INNODB_ENABLE_MONITOR_COUNTER
MTS_SLAVE_WORKER_QUEUE_LEN_MAX
INNODB_FILE_FORMAT_MAX
MTS_PENDING_JOBS_SIZE_MAX
-SLAVE_PARALLEL_WORKERS
-SLAVE_LOCAL_TIMESTAMP
+MTS_COORDINATOR_BASIC_NAP
drop table t1;
drop table t2;
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-12-14 08:57:16 +0000
+++ b/sql/log_event.cc 2010-12-16 21:41:45 +0000
@@ -176,7 +176,8 @@ void handle_rows_query_log_event(Log_eve
ev_type == UPDATE_ROWS_EVENT) && rli->rows_query_ev != NULL &&
((Rows_log_event*) ev)->get_flags(Rows_log_event::STMT_END_F))
{
- delete rli->rows_query_ev;
+ if (rli->rows_query_ev)
+ delete rli->rows_query_ev;
rli->rows_query_ev= NULL;
rli->info_thd->set_query(NULL, 0);
}
@@ -2365,8 +2366,8 @@ bool Log_event::contains_partition_info(
{
return get_type_code() == TABLE_MAP_EVENT ||
- // todo: the 4 types below are limitly parallel-supported (the default
- // session db not the actual db)
+ // todo: Query event is limitly supported
+ // which ev->get_db() yields the session db not the actual db
get_type_code() == QUERY_EVENT;
}
@@ -2449,12 +2450,14 @@ Slave_worker *Log_event::get_slave_worke
// mark the current grup as started with B-event
const_cast<Relay_log_info*>(rli)->curr_group_seen_begin= TRUE;
- return NULL;
}
else
{
DBUG_ASSERT(!rli->curr_group_seen_begin);
}
+ // serves as a mark for Coord to delete events otherwise
+ const_cast<Relay_log_info*>(rli)->curr_group_is_parallel= TRUE;
+ return NULL;
}
//else // g
@@ -2471,7 +2474,6 @@ Slave_worker *Log_event::get_slave_worke
set_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
DBUG_ASSERT(g.group_relay_log_name == NULL);
-
}
}
else // r
@@ -2502,6 +2504,7 @@ Slave_worker *Log_event::get_slave_worke
(Slave_job_group *)
dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
+ DBUG_ASSERT(rli->curr_group_is_parallel);
// TODO: throw an error when relay-log reading starts from inside of a group!!
@@ -2560,7 +2563,6 @@ Slave_worker *Log_event::get_slave_worke
// reset the B-group marker
const_cast<Relay_log_info*>(rli)->curr_group_seen_begin= FALSE;
- const_cast<Relay_log_info*>(rli)->curr_group_is_parallel= TRUE; // mark for Coord's T-event delete
}
return worker;
@@ -2748,16 +2750,22 @@ int Log_event::apply_event(Relay_log_inf
Slave_job_item item= {NULL}, *job_item= &item;
Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli); // constless alias
bool parallel;
+ bool seq_event;
+ bool term_event;
if (!(parallel= rli->is_parallel_exec()) ||
- only_sequential_exec(rli->run_query_in_parallel, rli->curr_group_seen_begin))
+ ((seq_event=
+ only_sequential_exec(rli->run_query_in_parallel,
+ rli->curr_group_seen_begin /* todo: obs 2nd arg */))
+ // rli->curr_group_seen_begin && ends_group() => rli->last_assigned_worker
+ && !rli->curr_group_seen_begin))
{
if (parallel)
{
- // This `only-sequential' case relates to Query parallel apply which
- // breaks into DDL and {B, Q, T} group, where Q owns g-parallel property.
+ // This `only-sequential' case relates to a DDL Query case
- // Apply possibly deferred B
+ DBUG_ASSERT(rli->curr_group_da.elements == 0);
+#if 0
if (rli->curr_group_da.elements > 0)
{
int res;
@@ -2778,27 +2786,45 @@ int Log_event::apply_event(Relay_log_inf
}
res= ev_begin->do_apply_event(rli);
delete ev_begin;
- /* B appears to be serial, reset parallel status of group
- because the following T won't do that */
- c_rli->curr_group_seen_begin= FALSE;
-
+ c_rli->curr_group_is_parallel= FALSE; // Coord will destruct events
if (res)
DBUG_RETURN(res);
}
-
+#endif
DBUG_ASSERT(!rli->curr_group_seen_begin);
- c_rli->curr_group_is_parallel= FALSE; // Coord will destruct all the rest of events
+
+ c_rli->curr_group_is_parallel= FALSE; // Coord will destruct events
if (!parallel_exec_by_coordinator(::server_id))
(void) wait_for_workers_to_finish(rli);
}
DBUG_RETURN(do_apply_event(rli));
}
+
+ DBUG_ASSERT(!(rli->curr_group_seen_begin && ends_group()) ||
+ rli->last_assigned_worker);
- if (get_type_code() == ROWS_QUERY_LOG_EVENT)
- {
- rli->report(ERROR_LEVEL, 0,
- "No parallel support for ROWS_QUERY_LOG_EVENT");
- DBUG_RETURN(1);
+ /*
+ Work-around:s for B, T,..., Q case and ROWS_QUERY_LOG_EVENT
+ A worker has been assigned but it needs sequential environment.
+
+ Todo: support Query parallelization.
+ Todo: disassociate Rows_* events from the central rli.
+ */
+ if (seq_event)
+ { // rli->last_assigned_worker != NULL if BTQ but not BQT
+ DBUG_ASSERT(rli->curr_group_seen_begin || ends_group());
+ if (!c_rli->curr_group_isolated)
+ (void) wait_for_workers_to_finish(rli, rli->last_assigned_worker);
+ c_rli->curr_group_isolated= TRUE;
+
+ if (get_type_code() == ROWS_QUERY_LOG_EVENT)
+ {
+ while (c_rli->rows_query_ev != NULL)
+ {
+ my_sleep(10);
+ }
+ c_rli->rows_query_ev= (Rows_query_log_event*) this;
+ }
}
if ((!(w= get_slave_worker_id(rli)) ||
@@ -2832,7 +2858,17 @@ int Log_event::apply_event(Relay_log_inf
c_rli->curr_group_da.elements= 0;
}
+ if (c_rli->curr_group_isolated)
+ term_event= ends_group();
+
append_item_to_jobs(job_item, w, c_rli);
+
+ if (c_rli->curr_group_isolated && term_event)
+ {
+ (void) wait_for_workers_to_finish(rli);
+ c_rli->curr_group_isolated= FALSE;
+ }
+
DBUG_RETURN(FALSE);
}
@@ -3016,7 +3052,8 @@ int slave_worker_exec_job(Slave_worker *
err:
- if (!ev)
+ // todo: fix w/a for Rows_query_log_event
+ if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
delete ev; // after ev->update_pos() event is garbage
DBUG_RETURN(error);
@@ -11196,7 +11233,7 @@ Rows_query_log_event::write_data_body(IO
int Rows_query_log_event::do_apply_event(Relay_log_info const *rli)
{
DBUG_ENTER("Rows_query_log_event::do_apply_event");
- DBUG_ASSERT(rli->info_thd == thd);
+ DBUG_ASSERT(rli->info_thd == thd || rli->is_parallel_exec());
/* Set query for writing Rows_query log event into binlog later.*/
thd->set_query(m_rows_query, (uint32) strlen(m_rows_query));
DBUG_RETURN(0);
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2010-12-08 00:33:48 +0000
+++ b/sql/log_event.h 2010-12-16 21:41:45 +0000
@@ -1231,6 +1231,9 @@ public:
get_type_code() == PRE_GA_WRITE_ROWS_EVENT ||
get_type_code() == PRE_GA_UPDATE_ROWS_EVENT||
get_type_code() == PRE_GA_DELETE_ROWS_EVENT||
+
+ get_type_code() == ROWS_QUERY_LOG_EVENT || /* todo: make parallel */
+
get_type_code() == INCIDENT_EVENT;
}
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2010-12-14 12:51:30 +0000
+++ b/sql/rpl_rli.cc 2010-12-16 21:41:45 +0000
@@ -1102,12 +1102,22 @@ void Relay_log_info::cleanup_context(THD
{
trans_rollback_stmt(thd); // if a "statement transaction"
trans_rollback(thd); // if a "real transaction"
+ }
+ /*
+ MTS W/a for Rows_query_log_event.
+ Cleanup of rows_query_ev at the end of the current statement.
+
+ todo: move handle_rows_query_log_event() cleanup logics into this method
+ inconditionally.
+ */
+ if (error || is_parallel_exec())
if (rows_query_ev)
{
delete rows_query_ev;
rows_query_ev= NULL;
+ info_thd->set_query(NULL, 0);
}
- }
+
if (!is_parallel_exec() || thd == info_thd)
m_table_map.clear_tables();
else
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2010-12-13 16:53:32 +0000
+++ b/sql/rpl_rli.h 2010-12-16 21:41:45 +0000
@@ -390,7 +390,7 @@ public:
uint tables_to_lock_count; /* RBR: Count of tables to lock */
table_mapping m_table_map; /* RBR: Mapping table-id to table */
/* RBR: Record Rows_query log event */
- Rows_query_log_event* rows_query_ev;
+ volatile Rows_query_log_event* rows_query_ev; // mts w/a makes it volatile
bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const
{
@@ -449,6 +449,7 @@ public:
DYNAMIC_ARRAY curr_group_da; // deferred array to hold partition-info-free events
bool curr_group_seen_begin; // current group started with B-event or not
bool run_query_in_parallel; // Query's default db not the actual db as part
+ bool curr_group_isolated; // Trans is exec:d by Worker but in exclusive env
volatile ulong mts_wqs_underrun_w_id; // Id of a Worker whose queue is getting empty
volatile long mts_wqs_overrun; // W to incr and decr
long mts_worker_underrun_level; // percent of WQ size at which Worker claims hungry
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-12-14 08:57:16 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-12-16 21:41:45 +0000
@@ -677,7 +677,8 @@ void* circular_buffer_queue::head_queue(
}
/**
- two index comparision.
+ two index comparision to determine which of the two
+ is ordered first.
@note The caller makes sure the args are within the valid
range, incl cases the queue is empty or full.
@@ -688,6 +689,8 @@ void* circular_buffer_queue::head_queue(
*/
bool circular_buffer_queue::gt(ulong i, ulong k)
{
+ DBUG_ASSERT(i < s && k < s);
+
if (i >= e)
if (k >= e)
return i > k;
@@ -738,7 +741,7 @@ ulong Slave_committed_queue::move_queue_
break; /* the head is not even assigned */
get_dynamic(ws, (uchar *) &w_i, ptr_g->worker_id);
- if (gt(i, w_i->last_group_done_index))
+ if (w_i->last_group_done_index == s || gt(i, w_i->last_group_done_index))
break; /* gap at i'th */
// memorize the last met group_relay_log_name
@@ -790,7 +793,7 @@ ulong Slave_committed_queue::move_queue_
}
-int wait_for_workers_to_finish(Relay_log_info const *rli)
+int wait_for_workers_to_finish(Relay_log_info const *rli, Slave_worker *ignore)
{
uint ret= 0;
HASH *hash= &mapping_db_to_worker;
@@ -809,7 +812,13 @@ int wait_for_workers_to_finish(Relay_log
entry= (db_worker*) my_hash_element(hash, i);
DBUG_ASSERT(entry);
-
+
+ if (ignore && entry->worker == ignore)
+ {
+ mysql_mutex_unlock(&slave_worker_hash_lock);
+ continue;
+ }
+
if (entry->usage > 0)
{
sprintf(wait_info, info_format, entry->worker->id, entry->db);
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-12-10 16:25:27 +0000
+++ b/sql/rpl_rli_pdb.h 2010-12-16 21:41:45 +0000
@@ -24,7 +24,8 @@ bool init_hash_workers(ulong slave_paral
void destroy_hash_workers();
Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli);
Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
-int wait_for_workers_to_finish(Relay_log_info const *rli);
+int wait_for_workers_to_finish(Relay_log_info const *rli,
+ Slave_worker *ignore= NULL);
#define SLAVE_WORKER_QUEUE_SIZE 8096
#define SLAVE_INIT_DBS_IN_GROUP 4 // initial allocation for CGEP dynarray
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-12-14 14:46:20 +0000
+++ b/sql/rpl_slave.cc 2010-12-16 21:41:45 +0000
@@ -2817,27 +2817,6 @@ int apply_event_and_update_pos(Log_event
else
mysql_mutex_unlock(&rli->data_lock);
-#ifndef DBUG_OFF
- /*
- This only prints information to the debug trace.
-
- TODO: Print an informational message to the error log?
- */
- static const char *const explain[] = {
- // EVENT_SKIP_NOT,
- "not skipped",
- // EVENT_SKIP_IGNORE,
- "skipped because event should be ignored",
- // EVENT_SKIP_COUNT
- "skipped because event skip counter was non-zero"
- };
- DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d",
- test(thd->variables.option_bits & OPTION_BEGIN),
- rli->get_flag(Relay_log_info::IN_STMT)));
- DBUG_PRINT("skip_event", ("%s event was %s",
- ev->get_type_str(), explain[reason]));
-#endif
-
DBUG_PRINT("info", ("apply_event error = %d", exec_res));
if (exec_res == 0)
{
@@ -2854,13 +2833,51 @@ int apply_event_and_update_pos(Log_event
*/
int error= 0;
if (skip_event ||
- !rli->is_parallel_exec() ||
- ev->only_sequential_exec(rli->run_query_in_parallel,
- ev->ends_group() ?
- rli->curr_group_is_parallel :
- rli->curr_group_seen_begin))
+ (!rli->is_parallel_exec() ||
+ !rli->curr_group_is_parallel))
{
+ DBUG_ASSERT(skip_event || !rli->is_parallel_exec() ||
+ !rli->curr_group_is_parallel ||
+ (ev->only_sequential_exec(rli->run_query_in_parallel,
+ (rli->curr_group_seen_begin ||
+ rli->last_assigned_worker != NULL))
+ && !rli->curr_group_seen_begin));
+#ifndef DBUG_OFF
+ /*
+ This only prints information to the debug trace.
+
+ TODO: Print an informational message to the error log?
+ */
+ static const char *const explain[] = {
+ // EVENT_SKIP_NOT,
+ "not skipped",
+ // EVENT_SKIP_IGNORE,
+ "skipped because event should be ignored",
+ // EVENT_SKIP_COUNT
+ "skipped because event skip counter was non-zero"
+ };
+ DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d",
+ test(thd->variables.option_bits & OPTION_BEGIN),
+ rli->get_flag(Relay_log_info::IN_STMT)));
+ DBUG_PRINT("skip_event", ("%s event was %s",
+ ev->get_type_str(), explain[reason]));
+#endif
+
error= ev->update_pos(rli);
+
+#ifndef DBUG_OFF
+ DBUG_PRINT("info", ("update_pos error = %d", error));
+ if (!rli->belongs_to_client())
+ {
+ char buf[22];
+ DBUG_PRINT("info", ("group %s %s",
+ llstr(rli->get_group_relay_log_pos(), buf),
+ rli->get_group_relay_log_name()));
+ DBUG_PRINT("info", ("event %s %s",
+ llstr(rli->get_event_relay_log_pos(), buf),
+ rli->get_event_relay_log_name()));
+ }
+#endif
}
else
{
@@ -2874,28 +2891,15 @@ int apply_event_and_update_pos(Log_event
rli->inc_event_relay_log_pos();
}
-#ifndef DBUG_OFF
- DBUG_PRINT("info", ("update_pos error = %d", error));
- if (!rli->belongs_to_client())
- {
- char buf[22];
- DBUG_PRINT("info", ("group %s %s",
- llstr(rli->get_group_relay_log_pos(), buf),
- rli->get_group_relay_log_name()));
- DBUG_PRINT("info", ("event %s %s",
- llstr(rli->get_event_relay_log_pos(), buf),
- rli->get_event_relay_log_name()));
- }
-#endif
- /*
- The update should not fail, so print an error message and
- return an error code.
-
- TODO: Replace this with a decent error message when merged
- with BUG#24954 (which adds several new error message).
- */
if (error)
{
+ /*
+ The update should not fail, so print an error message and
+ return an error code.
+
+ TODO: Replace this with a decent error message when merged
+ with BUG#24954 (which adds several new error message).
+ */
char buf[22];
rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
"It was not possible to update the positions"
@@ -3034,25 +3038,18 @@ static int exec_relay_log_event(THD* thd
used to read info about the relay log's format; it will be deleted when
the SQL thread does not need it, i.e. when this thread terminates.
*/
- if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+ // if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
{
-
- /*
- WL5363 MTS-II:
-
- ev is deleted by Coordinator when Worker has released the event's item
- from its queue. de_queue() just returns the item for extracting ev and
- processing, and once that has been done the queue get the item as free.
-
- if (slave_exec_mode == SLAVE_PARALLEL)
- {
- if (parallel_terminate(ev))
- delete dyn_array the sequence of events comprising the current group;
- else push(ev, dyn_array);
- }
- else
- */
-
+ if ((!rli->is_parallel_exec() ||
+ !rli->curr_group_is_parallel)
+ && ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+ {
+ DBUG_ASSERT(!rli->is_parallel_exec() ||
+ (ev->only_sequential_exec(rli->run_query_in_parallel,
+ // rli->curr_group_is_parallel
+ (rli->curr_group_seen_begin ||
+ rli->last_assigned_worker != NULL))
+ && !rli->curr_group_seen_begin));
/* MTS: Observation/todo.
ROWS_QUERY_LOG_EVENT could be supported easier if
@@ -3060,17 +3057,17 @@ static int exec_relay_log_event(THD* thd
with rli->cleanup_context() and the rest move into
ROWS...::do_apply_event
*/
- if (thd->variables.binlog_rows_query_log_events)
- handle_rows_query_log_event(ev, rli);
-
- if ((!rli->is_parallel_exec() ||
- ev->only_sequential_exec(rli->run_query_in_parallel, rli->curr_group_is_parallel))
- && ev->get_type_code() != ROWS_QUERY_LOG_EVENT) // mts TODO: check this case
- {
+
+ if (!rli->is_parallel_exec())
+ if (thd->variables.binlog_rows_query_log_events)
+ handle_rows_query_log_event(ev, rli);
- DBUG_PRINT("info", ("Deleting the event after it has been executed"));
- delete ev;
- ev= NULL;
+ if (ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
+ {
+ DBUG_PRINT("info", ("Deleting the event after it has been executed"));
+ delete ev;
+ ev= NULL;
+ }
}
}
@@ -4130,10 +4127,12 @@ int slave_start_workers(Relay_log_info *
rli->mts_coordinator_basic_nap= ::opt_mts_coordinator_basic_nap;
rli->mts_worker_underrun_level= ::opt_mts_worker_underrun_level;
rli->mts_total_groups= 0;
- rli->curr_group_seen_begin= 0;
+ rli->curr_group_seen_begin= FALSE; // initial presumtion, will change
+ rli->curr_group_is_parallel= TRUE; // initial presumtion, will change
+ rli->curr_group_isolated= FALSE;
rli->run_query_in_parallel= opt_mts_slave_run_query_in_parallel;
rli->checkpoint_seqno= 0;
-
+ //rli->worker_bitmap_buf= my_malloc(n/8 + 1,MYF(MY_WME));
for (i= 0; i < n; i++)
{
if ((error= slave_start_single_worker(rli, i)))
@@ -4220,7 +4219,7 @@ void slave_stop_workers(Relay_log_info *
delete_dynamic(&rli->least_occupied_workers); // least occupied
delete_dynamic(&rli->curr_group_da); // GCDA
delete_dynamic(&rli->curr_group_assigned_parts); // GCAP
-
+ //my_free(rli->worker_bitmap_buf);
rli->deinit_workers();
}
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101216214145-cof1c6wxz6fsfp29.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3247) WL#5569 | Andrei Elkin | 16 Dec |