List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:December 16 2010 9:41pm
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3247) WL#5569
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3247 Andrei Elkin	2010-12-16
      wl#5569 MTS
      
      Adding transparent support/fallback to the sequential execution cases of
      1. Query-log-event
      2. Rows_query_log_event info event
      
      Both cases can be fully parallelized in future project.
      
      Fixing an issue in move_queue_head() that was surficed as an assert in Slave_worker::slave_worker_group_ends().
      Fixing destoying an event by Worker.
     @ mysql-test/extra/rpl_tests/rpl_parallel_load.test
        Edited to compare all involved tables, add explicitly multi-statement transaction,
        and letting verification even in a case of statement format events.
     @ mysql-test/suite/rpl/r/rpl_parallel_fallback.result
        new results file is added.
     @ mysql-test/suite/rpl/t/rpl_parallel.test
        is changed to run with all formats because it starts verifying the transparent fallback
        to sequential for Query-log-event and related.
     @ mysql-test/suite/rpl/t/rpl_parallel_fallback.test
        A new test file is added to contain  cases of transparent fallback to the sequential execution.
        Rows_query_log_event case is placed there.
        Notice, the Query-log-event fallback is largely tested by rpl_parallel.
     @ sql/log_event.cc
        Refining event distribution logics in order to support fallback to the sequential execution.
        The following definions are framed out
        
        curr_group_is_parallel - indicates a Worker is engaged for all operation incl dectruction
           for all events of the group.
           The value lasts till a next group is decided to be "pure" sequential so that C will execute
           it, update rli synchronously and destroy the events.
        curr_group_seen_begin  - indicates if the current group is started with a B-event (BEGIN query).
           The value lasts till T-event is distributed.
        
        Deploying a w/a for  Rows_query_log_event that involves a nap to protect from a case 
        of multiple Rows_query_log in one group. Notice, a specific (w/a as well) rule of destroying
        the event.
     @ sql/log_event.h
        Rows_query_log_event fallback to sequential support is added.
     @ sql/rpl_rli.cc
        Rows_query_log_event fallback to sequential support is added.
     @ sql/rpl_rli.h
        curr_group_isolated is defined to be a parallel group that is executed in isolation from
        any other ahead and behind workers. Coordinator is supposed to provide such environment,
        the new member is a facility to control it.
     @ sql/rpl_rli_pdb.cc
        Fixing usage of circular_buffer_queue::gt() to deploy an assert suggested by
        the heading comments.
        Refining logics of finding a gap in GAQ.
        Adding 2nd arg to wait_for_workers...() to cover the 2nd use case of waiting Workers by C.
        The two are: wait for all, and wait for all but not one being currently scheduled.
     @ sql/rpl_slave.cc
        Refining logics of C's commit to the main rli due to a pure sequential event (e.g FD, Rotate),
        similarly refining logics of freeing.
        Deploying a w/a for Rows_query_log_event.

    added:
      mysql-test/suite/rpl/r/rpl_parallel_fallback.result
      mysql-test/suite/rpl/t/rpl_parallel-master.opt
      mysql-test/suite/rpl/t/rpl_parallel_fallback.test
    modified:
      mysql-test/extra/rpl_tests/rpl_parallel_load.test
      mysql-test/r/mysqld--help-win.result
      mysql-test/suite/rpl/r/rpl_parallel.result
      mysql-test/suite/rpl/r/rpl_sequential.result
      mysql-test/suite/rpl/t/rpl_parallel-slave.opt
      mysql-test/suite/rpl/t/rpl_parallel.test
      mysql-test/suite/sys_vars/r/all_vars.result
      sql/log_event.cc
      sql/log_event.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
=== modified file 'mysql-test/extra/rpl_tests/rpl_parallel_load.test'
--- a/mysql-test/extra/rpl_tests/rpl_parallel_load.test	2010-12-14 08:57:16 +0000
+++ b/mysql-test/extra/rpl_tests/rpl_parallel_load.test	2010-12-16 21:41:45 +0000
@@ -6,7 +6,7 @@
 # load volume parameter
 #
 
-let $iter = 2000;
+let $iter = 1000;
 let $databases = 4;
 
 connection slave;
@@ -71,10 +71,26 @@ while($i)
   
   eval create database test$i1;
   eval use test$i1;
-  create table tm_nk (a int, b int) engine=myisam;
-  create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
-  create table ti_nk (a int, b int) engine=innodb;
-  create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
+  create table tm_nk (a int, b int, c text) engine=myisam;
+  create table tm_wk (a int auto_increment primary key, b int, c text) engine=myisam;
+  create table ti_nk (a int, b int, c text) engine=innodb;
+  create table ti_wk (a int auto_increment primary key, b int, c text) engine=innodb;
+  
+  if (`select @@global.binlog_format like 'statement'`)
+  {
+     create view v_tm_nk as select a,b from tm_nk;
+     create view v_tm_wk as select a,b from tm_wk;
+     create view v_ti_nk as select a,b from ti_nk;
+     create view v_ti_wk as select a,b from ti_wk;
+  }
+  
+  if (`select @@global.binlog_format not like 'statement'`)
+  {
+     create view v_tm_nk as select a,b,c from tm_nk;
+     create view v_tm_wk as select a,b,c from tm_wk;
+     create view v_ti_nk as select a,b,c from ti_nk;
+     create view v_ti_wk as select a,b,c from ti_wk;
+  }
 
   # this table is special - just for timing. It's more special on test0 db
   # where it contains master timing of the load as well.
@@ -158,11 +174,19 @@ while ($iter)
 
 	eval use test$i1;
 	##call test.one_session(1);
-	eval insert into tm_nk values($iter, $i1);
-	eval insert into tm_wk values(null, $i1);
-	eval insert into ti_nk values($iter, $i1);
-	eval insert into ti_wk values(null, $i1);
-    
+	eval insert into tm_nk values($iter, $i1, uuid());
+	eval insert into tm_wk values(null,  $i1, uuid());
+	eval insert into ti_nk values($iter, $i1, uuid());
+	eval insert into ti_wk values(null,  $i1, uuid());
+
+	begin;
+  	eval insert into tm_nk values($iter, $i1, repeat('a', round(rand()*10)));
+	eval insert into tm_wk values(null,  $i1, uuid());
+	commit;
+	begin;
+	eval insert into ti_nk values($iter, $i1, uuid());
+	eval insert into ti_wk values(null,  $i1, repeat('a', round(rand()*10)));
+	commit;
 	dec $i;
     }
 
@@ -226,21 +250,34 @@ select ts from test0.benchmark where sta
 select time_to_sec(@m_1) - time_to_sec(@m_0) as 'delta_m', 
        time_to_sec(@s_1) - time_to_sec(@s_0) as 'delta_s' into outfile 'delta.out';
 
+--enable_result_log
+--enable_query_log
+
 let $i = $databases + 1;
 while($i)
 {
   let $i1 = $i;
   dec $i1;
 
-  let $diff_table_1=master:test$i1.tm_nk;
-  let $diff_table_2=slave:test$i1.tm_nk;
+  let $diff_table_1=master:test$i1.v_tm_nk;
+  let $diff_table_2=slave:test$i1.v_tm_nk;
+  source include/diff_tables.inc;
+
+  let $diff_table_1=master:test$i1.v_ti_nk;
+  let $diff_table_2=slave:test$i1.v_ti_nk;
+  source include/diff_tables.inc;
+
+  let $diff_table_1=master:test$i1.v_tm_wk;
+  let $diff_table_2=slave:test$i1.v_tm_wk;
+  source include/diff_tables.inc;
+
+  let $diff_table_1=master:test$i1.v_ti_wk;
+  let $diff_table_2=slave:test$i1.v_ti_wk;
   source include/diff_tables.inc;
 
   dec $i;
 }
 
---enable_result_log
---enable_query_log
 
 
 connection master;

=== modified file 'mysql-test/r/mysqld--help-win.result'
--- a/mysql-test/r/mysqld--help-win.result	2010-12-14 12:51:30 +0000
+++ b/mysql-test/r/mysqld--help-win.result	2010-12-16 21:41:45 +0000
@@ -345,6 +345,16 @@ The following options may be given as th
  disk after every #th milli-seconds. The zero value
  disables the checkpoint routine (makes sense for
  debugging).
+ --mts-coordinator-basic-nap=# 
+ Time in msec to sleep by MTS Coordinator to avoid the
+ Worker queues room overrun
+ --mts-exp-slave-local-timestamp 
+ If enabled slave itself computes the event appying time
+ value to implicitly affected timestamp columms. Otherwise
+ (default) it installs prescribed by the master value
+ --mts-exp-slave-run-query-in-parallel 
+ The default not an actual database name is used as
+ partition info for parallel execution of Query_log_event 
  --mts-partition-hash-soft-max=# 
  Number of records in the mts partition hash below which
  entries with zero usage are tolerated
@@ -352,6 +362,8 @@ The following options may be given as th
  Max size of Slave Worker queues holding yet not applied
  events.The least possible value must be not less than the
  master size max_allowed_packet.
+ --mts-slave-parallel-workers=# 
+ Number of worker threads for executing events in parallel
  --mts-slave-worker-queue-len-max=# 
  Max length of one MTS Worker queue. Presence in the queue
  indicates a replication event was read out of Relay log
@@ -360,6 +372,9 @@ The following options may be given as th
  Whichever limit is reached Coordinator threadsuspends
  further jobs assigning until conditions have been
  improved.
+ --mts-worker-underrun-level=# 
+ percent of Worker queue size at which Worker is
+ considered to become hungry
  --multi-range-count=# 
  Number of key ranges to request at once
  --myisam-block-size=# 
@@ -415,12 +430,6 @@ The following options may be given as th
  value is 0 then mysqld will reserve max_connections*5 or
  max_connections + table_cache*2 (whichever is larger)
  number of file descriptors
- --opt-mts-coordinator-basic-nap=# 
- Time in msec to sleep by MTS Coordinator to avoid the
- Worker queues room overrun
- --opt-mts-worker-underrun-level=# 
- percent of Worker queue size at which Worker is
- considered to become hungry
  --optimizer-join-cache-level=# 
  Controls what join operations can be executed with join
  buffers. Odd numbers are used for plain join buffers
@@ -664,18 +673,9 @@ The following options may be given as th
  --slave-load-tmpdir=name 
  The location where the slave should put its temporary
  files when replicating a LOAD DATA INFILE command
- --slave-local-timestamp 
- if enabled slave computes the event appying time value to
- implicitly affected timestamp columms. Otherwise
- (default) installs prescribed by the master value
  --slave-net-timeout=# 
  Number of seconds to wait for more data from a
  master/slave connection before aborting the read
- --slave-parallel-workers=# 
- Number of worker threads for executing events in parallel
- --slave-run-query-in-parallel 
- The default not an actual database name is used as
- partition info for parallel execution of Query_log_event 
  --slave-skip-errors=name 
  Tells the slave thread to continue replication when a
  query event returns an error from the provided list
@@ -899,9 +899,14 @@ max-write-lock-count 1844674407370955161
 memlock FALSE
 min-examined-row-limit 0
 mts-checkpoint-period 300
+mts-coordinator-basic-nap 5
+mts-exp-slave-local-timestamp FALSE
+mts-exp-slave-run-query-in-parallel FALSE
 mts-partition-hash-soft-max 16
 mts-pending-jobs-size-max 16777216
+mts-slave-parallel-workers 0
 mts-slave-worker-queue-len-max 40000
+mts-worker-underrun-level 0
 multi-range-count 256
 myisam-block-size 1024
 myisam-data-pointer-size 6
@@ -922,8 +927,6 @@ old FALSE
 old-alter-table FALSE
 old-passwords FALSE
 old-style-user-limits FALSE
-opt-mts-coordinator-basic-nap 5
-opt-mts-worker-underrun-level 0
 optimizer-join-cache-level 4
 optimizer-prune-level 1
 optimizer-search-depth 62
@@ -989,10 +992,7 @@ skip-show-database FALSE
 skip-slave-start FALSE
 slave-compressed-protocol FALSE
 slave-exec-mode STRICT
-slave-local-timestamp FALSE
 slave-net-timeout 3600
-slave-parallel-workers 0
-slave-run-query-in-parallel FALSE
 slave-skip-errors (No default value)
 slave-sql-verify-checksum TRUE
 slave-transaction-retries 10

=== modified file 'mysql-test/suite/rpl/r/rpl_parallel.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel.result	2010-12-14 08:57:16 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result	2010-12-16 21:41:45 +0000
@@ -24,9 +24,21 @@ select ts from test0.benchmark where sta
 select ts from test0.benchmark where state like 'slave ends load' into @s_1;
 select time_to_sec(@m_1) - time_to_sec(@m_0) as 'delta_m', 
 time_to_sec(@s_1) - time_to_sec(@s_0) as 'delta_s' into outfile 'delta.out';
-Comparing tables master:test3.tm_nk and slave:test3.tm_nk
-Comparing tables master:test2.tm_nk and slave:test2.tm_nk
-Comparing tables master:test1.tm_nk and slave:test1.tm_nk
-Comparing tables master:test0.tm_nk and slave:test0.tm_nk
+Comparing tables master:test3.v_tm_nk and slave:test3.v_tm_nk
+Comparing tables master:test3.v_ti_nk and slave:test3.v_ti_nk
+Comparing tables master:test3.v_tm_wk and slave:test3.v_tm_wk
+Comparing tables master:test3.v_ti_wk and slave:test3.v_ti_wk
+Comparing tables master:test2.v_tm_nk and slave:test2.v_tm_nk
+Comparing tables master:test2.v_ti_nk and slave:test2.v_ti_nk
+Comparing tables master:test2.v_tm_wk and slave:test2.v_tm_wk
+Comparing tables master:test2.v_ti_wk and slave:test2.v_ti_wk
+Comparing tables master:test1.v_tm_nk and slave:test1.v_tm_nk
+Comparing tables master:test1.v_ti_nk and slave:test1.v_ti_nk
+Comparing tables master:test1.v_tm_wk and slave:test1.v_tm_wk
+Comparing tables master:test1.v_ti_wk and slave:test1.v_ti_wk
+Comparing tables master:test0.v_tm_nk and slave:test0.v_tm_nk
+Comparing tables master:test0.v_ti_nk and slave:test0.v_ti_nk
+Comparing tables master:test0.v_tm_wk and slave:test0.v_tm_wk
+Comparing tables master:test0.v_ti_wk and slave:test0.v_ti_wk
 set @@global.mts_exp_slave_local_timestamp= @save.mts_exp_slave_local_timestamp;
 set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;

=== added file 'mysql-test/suite/rpl/r/rpl_parallel_fallback.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_fallback.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_fallback.result	2010-12-16 21:41:45 +0000
@@ -0,0 +1,24 @@
+stop slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+reset master;
+reset slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+start slave;
+include/stop_slave.inc
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+set @@global.mts_slave_parallel_workers= 4;
+Warnings:
+Note	1724	Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
+include/start_slave.inc
+set @@session.binlog_format= row;
+create database d0;
+create table d0.t1 (a int auto_increment primary key) engine=innodb;
+set @@session.binlog_format= row;
+create database d1;
+create table d1.t1 (a int auto_increment primary key) engine=innodb;
+set @@session.binlog_rows_query_log_events= 1;
+set @@session.binlog_rows_query_log_events= 0;
+Comparing tables master:d0.t1 and slave:d0.t1
+drop database d0;
+drop database d1;
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;

=== modified file 'mysql-test/suite/rpl/r/rpl_sequential.result'
--- a/mysql-test/suite/rpl/r/rpl_sequential.result	2010-12-14 08:57:16 +0000
+++ b/mysql-test/suite/rpl/r/rpl_sequential.result	2010-12-16 21:41:45 +0000
@@ -20,8 +20,20 @@ select ts from test0.benchmark where sta
 select ts from test0.benchmark where state like 'slave ends load' into @s_1;
 select time_to_sec(@m_1) - time_to_sec(@m_0) as 'delta_m', 
 time_to_sec(@s_1) - time_to_sec(@s_0) as 'delta_s' into outfile 'delta.out';
-Comparing tables master:test3.tm_nk and slave:test3.tm_nk
-Comparing tables master:test2.tm_nk and slave:test2.tm_nk
-Comparing tables master:test1.tm_nk and slave:test1.tm_nk
-Comparing tables master:test0.tm_nk and slave:test0.tm_nk
+Comparing tables master:test3.v_tm_nk and slave:test3.v_tm_nk
+Comparing tables master:test3.v_ti_nk and slave:test3.v_ti_nk
+Comparing tables master:test3.v_tm_wk and slave:test3.v_tm_wk
+Comparing tables master:test3.v_ti_wk and slave:test3.v_ti_wk
+Comparing tables master:test2.v_tm_nk and slave:test2.v_tm_nk
+Comparing tables master:test2.v_ti_nk and slave:test2.v_ti_nk
+Comparing tables master:test2.v_tm_wk and slave:test2.v_tm_wk
+Comparing tables master:test2.v_ti_wk and slave:test2.v_ti_wk
+Comparing tables master:test1.v_tm_nk and slave:test1.v_tm_nk
+Comparing tables master:test1.v_ti_nk and slave:test1.v_ti_nk
+Comparing tables master:test1.v_tm_wk and slave:test1.v_tm_wk
+Comparing tables master:test1.v_ti_wk and slave:test1.v_ti_wk
+Comparing tables master:test0.v_tm_nk and slave:test0.v_tm_nk
+Comparing tables master:test0.v_ti_nk and slave:test0.v_ti_nk
+Comparing tables master:test0.v_tm_wk and slave:test0.v_tm_wk
+Comparing tables master:test0.v_ti_wk and slave:test0.v_ti_wk
 set @@global.mts_exp_slave_local_timestamp= @save.mts_exp_slave_local_timestamp;

=== added file 'mysql-test/suite/rpl/t/rpl_parallel-master.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel-master.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel-master.opt	2010-12-16 21:41:45 +0000
@@ -0,0 +1 @@
+--log-warnings=0

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel-slave.opt	2010-12-14 08:57:16 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel-slave.opt	2010-12-16 21:41:45 +0000
@@ -1 +1,2 @@
---mts-slave-parallel-workers=4
+--mts-slave-parallel-workers=4 --log-warnings=0
+

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel.test	2010-12-14 08:57:16 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test	2010-12-16 21:41:45 +0000
@@ -34,7 +34,7 @@
 #
 
 source include/master-slave.inc;
-source include/have_binlog_format_row.inc;
+# source include/have_binlog_format_row.inc;
 
 connection slave;
 set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;

=== added file 'mysql-test/suite/rpl/t/rpl_parallel_fallback.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_fallback.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_fallback.test	2010-12-16 21:41:45 +0000
@@ -0,0 +1,93 @@
+#
+# WL#5569 MTS
+#
+# The test lists cases of transparent fallback to the sequential execution and
+# verifies it correctness.
+# Notice, the Query-log-event fallback is largely tested by rpl_parallel.
+#
+
+source include/master-slave.inc;
+source include/have_binlog_format_mixed.inc;
+
+let $workers= 4;
+
+connection slave;
+
+# restart in Parallel
+source include/stop_slave.inc;
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+eval set @@global.mts_slave_parallel_workers= $workers;
+source include/start_slave.inc;
+
+connection master;
+
+set @@session.binlog_format= row;
+create database d0;
+create table d0.t1 (a int auto_increment primary key) engine=innodb;
+
+connection master1;
+
+set @@session.binlog_format= row;
+create database d1;
+create table d1.t1 (a int auto_increment primary key) engine=innodb;
+
+#
+# Rows_query_log_event case
+#
+
+let $iter= 100;
+let $i= $iter;
+
+connection master;
+set @@session.binlog_rows_query_log_events= 1;
+
+connection master1;
+set @@session.binlog_rows_query_log_events= 0;
+
+--disable_query_log
+--disable_result_log
+
+while ($i)
+{
+  connection master;
+  begin;
+  insert into d0.t1 values(null);
+  insert into d1.t1 values(null);
+  commit;
+
+  connection master1;
+  begin;
+  insert into d1.t1 values(null);
+  insert into d0.t1 values(null);
+  commit;
+
+  dec $i;
+}
+
+--enable_result_log
+--enable_query_log
+
+sync_slave_with_master;
+
+# verification
+
+let $diff_table_1=master:d0.t1;
+let $diff_table_2= slave:d0.t1;
+source include/diff_tables.inc;
+
+
+#
+# cleanup
+#
+
+connection master;
+
+drop database d0;
+drop database d1;
+
+
+sync_slave_with_master;
+#connection slave;
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;
+
+

=== modified file 'mysql-test/suite/sys_vars/r/all_vars.result'
--- a/mysql-test/suite/sys_vars/r/all_vars.result	2010-12-13 21:16:31 +0000
+++ b/mysql-test/suite/sys_vars/r/all_vars.result	2010-12-16 21:41:45 +0000
@@ -11,16 +11,17 @@ There should be *no* long test name list
 select variable_name as `There should be *no* variables listed below:` from t2
 left join t1 on variable_name=test_name where test_name is null;
 There should be *no* variables listed below:
-OPT_MTS_COORDINATOR_BASIC_NAP
 INNODB_STATS_TRANSIENT_SAMPLE_PAGES
 MTS_PARTITION_HASH_SOFT_MAX
-OPT_MTS_WORKER_UNDERRUN_LEVEL
-SLAVE_RUN_QUERY_IN_PARALLEL
+MTS_EXP_SLAVE_LOCAL_TIMESTAMP
+MTS_EXP_SLAVE_RUN_QUERY_IN_PARALLEL
 INNODB_STATS_PERSISTENT_SAMPLE_PAGES
 RELAY_LOG_BASENAME
 LOG_BIN_BASENAME
 INNODB_ANALYZE_IS_PERSISTENT
 INNODB_RESET_MONITOR_COUNTER
+MTS_SLAVE_PARALLEL_WORKERS
+MTS_WORKER_UNDERRUN_LEVEL
 INNODB_RESET_ALL_MONITOR_COUNTER
 LOG_BIN_INDEX
 INNODB_DISABLE_MONITOR_COUNTER
@@ -28,18 +29,18 @@ INNODB_ENABLE_MONITOR_COUNTER
 MTS_SLAVE_WORKER_QUEUE_LEN_MAX
 INNODB_FILE_FORMAT_MAX
 MTS_PENDING_JOBS_SIZE_MAX
-SLAVE_PARALLEL_WORKERS
-SLAVE_LOCAL_TIMESTAMP
-OPT_MTS_COORDINATOR_BASIC_NAP
+MTS_COORDINATOR_BASIC_NAP
 INNODB_STATS_TRANSIENT_SAMPLE_PAGES
 MTS_PARTITION_HASH_SOFT_MAX
-OPT_MTS_WORKER_UNDERRUN_LEVEL
-SLAVE_RUN_QUERY_IN_PARALLEL
+MTS_EXP_SLAVE_LOCAL_TIMESTAMP
+MTS_EXP_SLAVE_RUN_QUERY_IN_PARALLEL
 INNODB_STATS_PERSISTENT_SAMPLE_PAGES
 RELAY_LOG_BASENAME
 LOG_BIN_BASENAME
 INNODB_ANALYZE_IS_PERSISTENT
 INNODB_RESET_MONITOR_COUNTER
+MTS_SLAVE_PARALLEL_WORKERS
+MTS_WORKER_UNDERRUN_LEVEL
 INNODB_RESET_ALL_MONITOR_COUNTER
 LOG_BIN_INDEX
 INNODB_DISABLE_MONITOR_COUNTER
@@ -47,7 +48,6 @@ INNODB_ENABLE_MONITOR_COUNTER
 MTS_SLAVE_WORKER_QUEUE_LEN_MAX
 INNODB_FILE_FORMAT_MAX
 MTS_PENDING_JOBS_SIZE_MAX
-SLAVE_PARALLEL_WORKERS
-SLAVE_LOCAL_TIMESTAMP
+MTS_COORDINATOR_BASIC_NAP
 drop table t1;
 drop table t2;

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-12-14 08:57:16 +0000
+++ b/sql/log_event.cc	2010-12-16 21:41:45 +0000
@@ -176,7 +176,8 @@ void handle_rows_query_log_event(Log_eve
        ev_type == UPDATE_ROWS_EVENT) && rli->rows_query_ev != NULL &&
       ((Rows_log_event*) ev)->get_flags(Rows_log_event::STMT_END_F))
   {
-    delete rli->rows_query_ev;
+    if (rli->rows_query_ev)
+      delete rli->rows_query_ev;
     rli->rows_query_ev= NULL;
     rli->info_thd->set_query(NULL, 0);
   }
@@ -2365,8 +2366,8 @@ bool Log_event::contains_partition_info(
 {
   return get_type_code() == TABLE_MAP_EVENT ||
 
-    // todo: the 4 types below are limitly parallel-supported (the default 
-    // session db not the actual db)
+    // todo: Query event is limitly supported
+    // which ev->get_db() yields the session db not the actual db
       
     get_type_code() == QUERY_EVENT;
 }
@@ -2449,12 +2450,14 @@ Slave_worker *Log_event::get_slave_worke
 
       // mark the current grup as started with B-event
       const_cast<Relay_log_info*>(rli)->curr_group_seen_begin= TRUE;
-      return NULL;
     } 
     else 
     { 
       DBUG_ASSERT(!rli->curr_group_seen_begin);
     }
+    // serves as a mark for Coord to delete events otherwise
+    const_cast<Relay_log_info*>(rli)->curr_group_is_parallel= TRUE;
+    return NULL;
   }
 
   //else // g
@@ -2471,7 +2474,6 @@ Slave_worker *Log_event::get_slave_worke
       set_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
 
       DBUG_ASSERT(g.group_relay_log_name == NULL);
-
     }
   }
   else // r
@@ -2502,6 +2504,7 @@ Slave_worker *Log_event::get_slave_worke
       (Slave_job_group *)
       dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
 
+    DBUG_ASSERT(rli->curr_group_is_parallel);
 
     // TODO: throw an error when relay-log reading starts from inside of a group!!
 
@@ -2560,7 +2563,6 @@ Slave_worker *Log_event::get_slave_worke
 
     // reset the B-group marker
     const_cast<Relay_log_info*>(rli)->curr_group_seen_begin= FALSE;
-    const_cast<Relay_log_info*>(rli)->curr_group_is_parallel= TRUE;  // mark for Coord's T-event delete
   }
   
   return worker;
@@ -2748,16 +2750,22 @@ int Log_event::apply_event(Relay_log_inf
   Slave_job_item item= {NULL}, *job_item= &item;
   Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli);  // constless alias
   bool parallel;
+  bool seq_event;
+  bool term_event;
 
   if (!(parallel= rli->is_parallel_exec()) ||
-      only_sequential_exec(rli->run_query_in_parallel, rli->curr_group_seen_begin))
+      ((seq_event=
+        only_sequential_exec(rli->run_query_in_parallel,
+                             rli->curr_group_seen_begin /* todo: obs 2nd arg */))
+       // rli->curr_group_seen_begin && ends_group() => rli->last_assigned_worker
+       && !rli->curr_group_seen_begin))
   {
     if (parallel)
     {
-      // This `only-sequential' case relates to Query parallel apply which
-      // breaks into DDL and {B, Q, T} group, where Q owns g-parallel property.
+      // This `only-sequential' case relates to a DDL Query case
 
-      // Apply possibly deferred B
+      DBUG_ASSERT(rli->curr_group_da.elements == 0);
+#if 0      
       if (rli->curr_group_da.elements > 0)
       {
         int res;
@@ -2778,27 +2786,45 @@ int Log_event::apply_event(Relay_log_inf
         }
         res= ev_begin->do_apply_event(rli);
         delete ev_begin;
-        /* B appears to be serial, reset parallel status of group 
-           because the following T won't do that */
-        c_rli->curr_group_seen_begin= FALSE;
-
+        c_rli->curr_group_is_parallel= FALSE;   // Coord will destruct events
         if (res)
           DBUG_RETURN(res);
       }
-
+#endif
       DBUG_ASSERT(!rli->curr_group_seen_begin);
-      c_rli->curr_group_is_parallel= FALSE;   // Coord will destruct all the rest of events
+      
+      c_rli->curr_group_is_parallel= FALSE;   // Coord will destruct events
       if (!parallel_exec_by_coordinator(::server_id))
         (void) wait_for_workers_to_finish(rli);
     }
     DBUG_RETURN(do_apply_event(rli));
   }
+  
+  DBUG_ASSERT(!(rli->curr_group_seen_begin && ends_group()) ||
+              rli->last_assigned_worker);
 
-  if (get_type_code() == ROWS_QUERY_LOG_EVENT)
-  {      
-    rli->report(ERROR_LEVEL, 0,
-                "No parallel support for ROWS_QUERY_LOG_EVENT");
-    DBUG_RETURN(1);
+  /* 
+     Work-around:s for B, T,..., Q case and ROWS_QUERY_LOG_EVENT
+     A worker has been assigned but it needs sequential environment.
+
+     Todo: support Query parallelization.
+     Todo: disassociate Rows_* events from the central rli.
+  */
+  if (seq_event)
+  {   // rli->last_assigned_worker != NULL if BTQ but not BQT
+    DBUG_ASSERT(rli->curr_group_seen_begin || ends_group());
+    if (!c_rli->curr_group_isolated)
+      (void) wait_for_workers_to_finish(rli, rli->last_assigned_worker);
+    c_rli->curr_group_isolated= TRUE;
+
+    if (get_type_code() == ROWS_QUERY_LOG_EVENT)
+    {
+      while (c_rli->rows_query_ev != NULL)
+      {
+        my_sleep(10);
+      }
+      c_rli->rows_query_ev= (Rows_query_log_event*) this;
+    }
   }
 
   if ((!(w= get_slave_worker_id(rli)) ||
@@ -2832,7 +2858,17 @@ int Log_event::apply_event(Relay_log_inf
     c_rli->curr_group_da.elements= 0;
   }
 
+  if (c_rli->curr_group_isolated)
+    term_event= ends_group();
+
   append_item_to_jobs(job_item, w, c_rli);
+
+  if (c_rli->curr_group_isolated && term_event)
+  {
+    (void) wait_for_workers_to_finish(rli);
+    c_rli->curr_group_isolated= FALSE;
+  }
+
   DBUG_RETURN(FALSE);
 }
 
@@ -3016,7 +3052,8 @@ int slave_worker_exec_job(Slave_worker *
 
 err:
 
-  if (!ev)
+  // todo: fix w/a for Rows_query_log_event
+  if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
     delete ev;  // after ev->update_pos() event is garbage
 
   DBUG_RETURN(error);
@@ -11196,7 +11233,7 @@ Rows_query_log_event::write_data_body(IO
 int Rows_query_log_event::do_apply_event(Relay_log_info const *rli)
 {
   DBUG_ENTER("Rows_query_log_event::do_apply_event");
-  DBUG_ASSERT(rli->info_thd == thd);
+  DBUG_ASSERT(rli->info_thd == thd || rli->is_parallel_exec());
   /* Set query for writing Rows_query log event into binlog later.*/
   thd->set_query(m_rows_query, (uint32) strlen(m_rows_query));
   DBUG_RETURN(0);

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2010-12-08 00:33:48 +0000
+++ b/sql/log_event.h	2010-12-16 21:41:45 +0000
@@ -1231,6 +1231,9 @@ public:
       get_type_code() == PRE_GA_WRITE_ROWS_EVENT ||
       get_type_code() == PRE_GA_UPDATE_ROWS_EVENT||
       get_type_code() == PRE_GA_DELETE_ROWS_EVENT||
+
+      get_type_code() == ROWS_QUERY_LOG_EVENT    || /* todo: make parallel */
+
       get_type_code() == INCIDENT_EVENT;
   }
 

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-12-14 12:51:30 +0000
+++ b/sql/rpl_rli.cc	2010-12-16 21:41:45 +0000
@@ -1102,12 +1102,22 @@ void Relay_log_info::cleanup_context(THD
   {
     trans_rollback_stmt(thd); // if a "statement transaction"
     trans_rollback(thd);      // if a "real transaction"
+  }
+  /*
+    MTS W/a for Rows_query_log_event.
+    Cleanup of rows_query_ev at the end of the current statement.
+
+    todo: move handle_rows_query_log_event() cleanup logics into this method
+          inconditionally.
+  */
+  if (error || is_parallel_exec())
     if (rows_query_ev)
     {
       delete rows_query_ev;
       rows_query_ev= NULL;
+      info_thd->set_query(NULL, 0);
     }
-  }
+
   if (!is_parallel_exec() || thd == info_thd)
     m_table_map.clear_tables();
   else

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-12-13 16:53:32 +0000
+++ b/sql/rpl_rli.h	2010-12-16 21:41:45 +0000
@@ -390,7 +390,7 @@ public:
   uint tables_to_lock_count;        /* RBR: Count of tables to lock */
   table_mapping m_table_map;      /* RBR: Mapping table-id to table */
   /* RBR: Record Rows_query log event */
-  Rows_query_log_event* rows_query_ev;
+  volatile Rows_query_log_event* rows_query_ev;  // mts w/a makes it volatile
 
   bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const
   {
@@ -449,6 +449,7 @@ public:
   DYNAMIC_ARRAY curr_group_da;  // deferred array to hold partition-info-free events
   bool curr_group_seen_begin;   // current group started with B-event or not
   bool run_query_in_parallel;   // Query's default db not the actual db as part
+  bool curr_group_isolated;     // Trans is exec:d by Worker but in exclusive env
   volatile ulong mts_wqs_underrun_w_id;  // Id of a Worker whose queue is getting empty
   volatile long mts_wqs_overrun;   // W to incr and decr
   long  mts_worker_underrun_level; // percent of WQ size at which Worker claims hungry

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-12-14 08:57:16 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-12-16 21:41:45 +0000
@@ -677,7 +677,8 @@ void* circular_buffer_queue::head_queue(
 }
 
 /**
-   two index comparision.
+   two index comparision to determine which of the two
+   is ordered first.
 
    @note   The caller makes sure the args are within the valid
            range, incl cases the queue is empty or full.
@@ -688,6 +689,8 @@ void* circular_buffer_queue::head_queue(
 */
 bool circular_buffer_queue::gt(ulong i, ulong k)
 {
+  DBUG_ASSERT(i < s && k < s);
+
   if (i >= e)
     if (k >= e)
       return i > k;
@@ -738,7 +741,7 @@ ulong Slave_committed_queue::move_queue_
       break; /* the head is not even assigned */
     get_dynamic(ws, (uchar *) &w_i, ptr_g->worker_id);
     
-    if (gt(i, w_i->last_group_done_index))
+    if (w_i->last_group_done_index == s || gt(i, w_i->last_group_done_index))
       break; /* gap at i'th */
 
     // memorize the last met group_relay_log_name
@@ -790,7 +793,7 @@ ulong Slave_committed_queue::move_queue_
 }
 
 
-int wait_for_workers_to_finish(Relay_log_info const *rli)
+int wait_for_workers_to_finish(Relay_log_info const *rli, Slave_worker *ignore)
 {
   uint ret= 0;
   HASH *hash= &mapping_db_to_worker;
@@ -809,7 +812,13 @@ int wait_for_workers_to_finish(Relay_log
     entry= (db_worker*) my_hash_element(hash, i);
 
     DBUG_ASSERT(entry);
-    
+
+    if (ignore && entry->worker == ignore)
+    {
+      mysql_mutex_unlock(&slave_worker_hash_lock);
+      continue;
+    }
+
     if (entry->usage > 0)
     {
       sprintf(wait_info, info_format, entry->worker->id, entry->db);

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-12-10 16:25:27 +0000
+++ b/sql/rpl_rli_pdb.h	2010-12-16 21:41:45 +0000
@@ -24,7 +24,8 @@ bool init_hash_workers(ulong slave_paral
 void destroy_hash_workers();
 Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli);
 Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
-int wait_for_workers_to_finish(Relay_log_info const *rli);
+int wait_for_workers_to_finish(Relay_log_info const *rli,
+                               Slave_worker *ignore= NULL);
 
 #define SLAVE_WORKER_QUEUE_SIZE 8096
 #define SLAVE_INIT_DBS_IN_GROUP 4     // initial allocation for CGEP dynarray

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-12-14 14:46:20 +0000
+++ b/sql/rpl_slave.cc	2010-12-16 21:41:45 +0000
@@ -2817,27 +2817,6 @@ int apply_event_and_update_pos(Log_event
   else
     mysql_mutex_unlock(&rli->data_lock);
 
-#ifndef DBUG_OFF
-  /*
-    This only prints information to the debug trace.
-
-    TODO: Print an informational message to the error log?
-  */
-  static const char *const explain[] = {
-    // EVENT_SKIP_NOT,
-    "not skipped",
-    // EVENT_SKIP_IGNORE,
-    "skipped because event should be ignored",
-    // EVENT_SKIP_COUNT
-    "skipped because event skip counter was non-zero"
-  };
-  DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d",
-                      test(thd->variables.option_bits & OPTION_BEGIN),
-                      rli->get_flag(Relay_log_info::IN_STMT)));
-  DBUG_PRINT("skip_event", ("%s event was %s",
-                            ev->get_type_str(), explain[reason]));
-#endif
-
   DBUG_PRINT("info", ("apply_event error = %d", exec_res));
   if (exec_res == 0)
   {
@@ -2854,13 +2833,51 @@ int apply_event_and_update_pos(Log_event
     */
     int error= 0;
     if (skip_event || 
-        !rli->is_parallel_exec() ||
-        ev->only_sequential_exec(rli->run_query_in_parallel,
-                                 ev->ends_group() ?
-                                 rli->curr_group_is_parallel :
-                                 rli->curr_group_seen_begin))
+        (!rli->is_parallel_exec() ||
+         !rli->curr_group_is_parallel))
     {
+      DBUG_ASSERT(skip_event || !rli->is_parallel_exec() ||
+                  !rli->curr_group_is_parallel ||
+                  (ev->only_sequential_exec(rli->run_query_in_parallel,
+                                            (rli->curr_group_seen_begin ||
+                                             rli->last_assigned_worker != NULL))
+                   && !rli->curr_group_seen_begin));
+#ifndef DBUG_OFF
+      /*
+        This only prints information to the debug trace.
+        
+        TODO: Print an informational message to the error log?
+      */
+      static const char *const explain[] = {
+        // EVENT_SKIP_NOT,
+        "not skipped",
+        // EVENT_SKIP_IGNORE,
+        "skipped because event should be ignored",
+        // EVENT_SKIP_COUNT
+        "skipped because event skip counter was non-zero"
+      };
+      DBUG_PRINT("info", ("OPTION_BEGIN: %d; IN_STMT: %d",
+                          test(thd->variables.option_bits & OPTION_BEGIN),
+                          rli->get_flag(Relay_log_info::IN_STMT)));
+      DBUG_PRINT("skip_event", ("%s event was %s",
+                                ev->get_type_str(), explain[reason]));
+#endif
+
       error= ev->update_pos(rli);
+
+#ifndef DBUG_OFF
+      DBUG_PRINT("info", ("update_pos error = %d", error));
+      if (!rli->belongs_to_client())
+      {
+        char buf[22];
+        DBUG_PRINT("info", ("group %s %s",
+                            llstr(rli->get_group_relay_log_pos(), buf),
+                            rli->get_group_relay_log_name()));
+        DBUG_PRINT("info", ("event %s %s",
+                            llstr(rli->get_event_relay_log_pos(), buf),
+                            rli->get_event_relay_log_name()));
+      }
+#endif
     }
     else
     {
@@ -2874,28 +2891,15 @@ int apply_event_and_update_pos(Log_event
       rli->inc_event_relay_log_pos();
     }
 
-#ifndef DBUG_OFF
-    DBUG_PRINT("info", ("update_pos error = %d", error));
-    if (!rli->belongs_to_client())
-    {
-      char buf[22];
-      DBUG_PRINT("info", ("group %s %s",
-                          llstr(rli->get_group_relay_log_pos(), buf),
-                          rli->get_group_relay_log_name()));
-      DBUG_PRINT("info", ("event %s %s",
-                          llstr(rli->get_event_relay_log_pos(), buf),
-                          rli->get_event_relay_log_name()));
-    }
-#endif
-    /*
-      The update should not fail, so print an error message and
-      return an error code.
-
-      TODO: Replace this with a decent error message when merged
-      with BUG#24954 (which adds several new error message).
-    */
     if (error)
     {
+      /*
+        The update should not fail, so print an error message and
+        return an error code.
+        
+        TODO: Replace this with a decent error message when merged
+        with BUG#24954 (which adds several new error message).
+      */
       char buf[22];
       rli->report(ERROR_LEVEL, ER_UNKNOWN_ERROR,
                   "It was not possible to update the positions"
@@ -3034,25 +3038,18 @@ static int exec_relay_log_event(THD* thd
       used to read info about the relay log's format; it will be deleted when
       the SQL thread does not need it, i.e. when this thread terminates.
     */
-    if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+    // if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
     {
-
-      /*
-        WL5363 MTS-II: 
-        
-        ev is deleted by Coordinator when Worker has released the event's item 
-        from its queue. de_queue() just returns the item for extracting ev and
-        processing, and once that has been done the queue get the item as free.
-
-      if (slave_exec_mode == SLAVE_PARALLEL)
-      {
-        if (parallel_terminate(ev))
-          delete dyn_array the sequence of events comprising the current group;
-        else push(ev, dyn_array);
-      } 
-      else
-      */
- 
+      if ((!rli->is_parallel_exec() ||
+           !rli->curr_group_is_parallel)
+          && ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+      {
+        DBUG_ASSERT(!rli->is_parallel_exec() ||
+                    (ev->only_sequential_exec(rli->run_query_in_parallel,
+                                              // rli->curr_group_is_parallel
+                                              (rli->curr_group_seen_begin ||
+                                               rli->last_assigned_worker != NULL))
+                     && !rli->curr_group_seen_begin));
       /* MTS:  Observation/todo.
 
          ROWS_QUERY_LOG_EVENT could be supported easier if
@@ -3060,17 +3057,17 @@ static int exec_relay_log_event(THD* thd
          with rli->cleanup_context() and the rest move into 
          ROWS...::do_apply_event
       */
-      if (thd->variables.binlog_rows_query_log_events)
-        handle_rows_query_log_event(ev, rli);
-
-      if ((!rli->is_parallel_exec() ||
-           ev->only_sequential_exec(rli->run_query_in_parallel, rli->curr_group_is_parallel))
-          && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)  // mts TODO: check this case
-      {
+        
+        if (!rli->is_parallel_exec())
+          if (thd->variables.binlog_rows_query_log_events)
+            handle_rows_query_log_event(ev, rli);
 
-        DBUG_PRINT("info", ("Deleting the event after it has been executed"));
-        delete ev;
-        ev= NULL;
+        if (ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
+        {
+          DBUG_PRINT("info", ("Deleting the event after it has been executed"));
+          delete ev;
+          ev= NULL;
+        }
       }
     }
 
@@ -4130,10 +4127,12 @@ int slave_start_workers(Relay_log_info *
   rli->mts_coordinator_basic_nap= ::opt_mts_coordinator_basic_nap;
   rli->mts_worker_underrun_level= ::opt_mts_worker_underrun_level;
   rli->mts_total_groups= 0;
-  rli->curr_group_seen_begin= 0;
+  rli->curr_group_seen_begin= FALSE; // initial presumtion, will change
+  rli->curr_group_is_parallel= TRUE; // initial presumtion, will change
+  rli->curr_group_isolated= FALSE;
   rli->run_query_in_parallel= opt_mts_slave_run_query_in_parallel;
   rli->checkpoint_seqno= 0;
-
+  //rli->worker_bitmap_buf= my_malloc(n/8 + 1,MYF(MY_WME));
   for (i= 0; i < n; i++)
   {
     if ((error= slave_start_single_worker(rli, i)))
@@ -4220,7 +4219,7 @@ void slave_stop_workers(Relay_log_info *
   delete_dynamic(&rli->least_occupied_workers);    // least occupied
   delete_dynamic(&rli->curr_group_da);             // GCDA
   delete_dynamic(&rli->curr_group_assigned_parts); // GCAP
-
+  //my_free(rli->worker_bitmap_buf);
   rli->deinit_workers();
 }
 


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101216214145-cof1c6wxz6fsfp29.bundle
Thread
bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3247) WL#5569Andrei Elkin16 Dec