3219 Andrei Elkin 2010-11-26
wl#5569 MTS
Partitioning conflict detection and handling is implemented.
A new option allows Query events to run in parallel, though incompatibly with the Rows-event case in that
the default db rather than the actual db:s is used as the partition key.
User interface gained the global var and the cmd line opt:
slave_run_query_in_parallel (Welcome to the set! :-)
@ mysql-test/suite/rpl/r/rpl_parallel_conflicts.result
A new test result file is added.
@ mysql-test/suite/rpl/t/rpl_parallel_conflicts.test
A basic initial test of partitioning conflict detection and handling is added.
@ sql/log_event.cc
Refining parallel vs sequential decider to cover optional support for Query parallelization.
@ sql/log_event.h
Refining only_serial_exec() by providing hints through two new args.
@ sql/mysqld.cc
new Query limited parallelization support related.
@ sql/mysqld.h
new Query limited parallelization support related.
@ sql/rpl_rli.h
Changes are due to the new limited Query parallelization support.
@ sql/rpl_rli_pdb.cc
Conflict detection, waiting, partition release is implemented.
added:
mysql-test/suite/rpl/r/rpl_parallel_conflicts.result
mysql-test/suite/rpl/t/rpl_parallel_conflicts.test
modified:
sql/log_event.cc
sql/log_event.h
sql/mysqld.cc
sql/mysqld.h
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
sql/sys_vars.cc
3218 Alfranio Correia 2010-11-26
There was a mismatch between the number of fields read and written, and
as a consequence the read was failing for the Slave_worker.
modified:
sql/rpl_rli_pdb.cc
=== added file 'mysql-test/suite/rpl/r/rpl_parallel_conflicts.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_conflicts.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_conflicts.result 2010-11-26 21:08:30 +0000
@@ -0,0 +1,78 @@
+stop slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+reset master;
+reset slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+start slave;
+create view coord_wait_list as SELECT id from Information_Schema.processlist where state like 'Waiting for Slave Worker%';
+include/stop_slave.inc
+set @save.slave_exec_mode= @@global.slave_exec_mode;
+set @@global.slave_exec_mode = 'Parallel';
+include/start_slave.inc
+create database d1;
+create database d2;
+create database d3;
+create table d1.t1 (a int auto_increment primary key) engine=innodb;
+create table d2.t1 (a int auto_increment primary key) engine=innodb;
+create table d3.t1 (a int auto_increment primary key) engine=innodb;
+begin;
+insert into d2.t1 values (1);
+begin;
+use d1;
+insert into d1.t1 values (null);
+use d2;
+insert into d2.t1 values (1);
+commit;
+begin;
+use d3;
+insert into d3.t1 values (null);
+use d1;
+insert into d1.t1 values (null);
+commit;
+rollback;
+select count(*) from d1.t1 into @d1;
+select count(*) from d2.t1 into @d2;
+select count(*) from d3.t1 into @d3;
+use d1;
+create table `exists_only_on_slave` (a int);
+begin;
+insert into d1.t1 values (null);
+insert into d2.t1 values (null);
+insert into d3.t1 values (null);
+begin;
+use d1;
+insert into d1.t1 values (null);
+commit;
+begin;
+use d2;
+insert into d2.t1 values (null);
+commit;
+begin;
+use d3;
+insert into d3.t1 values (null);
+commit;
+use d1;
+drop table if exists `exists_only_on_slave`;
+select sleep(1);
+sleep(1)
+0
+select count(*) - @d1 as 'zero' from d1.t1;
+zero
+0
+select count(*) - @d2 as 'zero' from d2.t1;
+zero
+0
+select count(*) - @d3 as 'zero' from d3.t1;
+zero
+0
+use d1;
+select count(*) as 'zero' from `exists_only_on_slave`;
+zero
+0
+rollback;
+drop database d1;
+drop database d2;
+drop database d3;
+drop view coord_wait_list;
+set @@global.slave_exec_mode= @save.slave_exec_mode;
+*** End of the tests ***
=== added file 'mysql-test/suite/rpl/t/rpl_parallel_conflicts.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_conflicts.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_conflicts.test 2010-11-26 21:08:30 +0000
@@ -0,0 +1,224 @@
+#
+# WL#5569 MTS
+#
+# The test checks cases of hashing conflicts forcing a special handling.
+# The cases include
+#
+# I. two Worker jobs conflict with each other
+#
+# a. two multi-statement transactions containing more than one partition
+# in which one is common are mapped to different Workers.
+# b. similarly two autocommit queries or ddl:s
+#
+# Handling of the cases is carried out as the following:
+# when Coordinator hits a partition occupied by a Worker other than the currently
+# assigned one, it marks the partition and goes to wait till the Worker-owner
+# has released it and signaled.
+#
+# II. An event requires the sequential execution
+#
+# Coordinator does not schedule the event and is waiting till all workers have
+# released their partitions and signalled.
+
+source include/master-slave.inc;
+
+#
+# Testing with the statement format requires
+# @@global.slave_run_query_in_parallel = 1.
+# Notice, parallelization for Query-log-event is limited
+# to the default database. That's why 'use db'.
+# With the default @@global.slave_run_query_in_parallel == 0
+# the tests in stmt format still run to prove switching to the sequential.
+
+# TODO: convert this file into two tests for either value of
+# @@global.slave_run_query_in_parallel
+
+connection slave;
+
+--disable_query_log
+--disable_result_log
+call mtr.add_suppression('Error reading slave worker configuration');
+--enable_query_log
+--enable_result_log
+
+create view coord_wait_list as SELECT id from Information_Schema.processlist where state like 'Waiting for Slave Worker%';
+
+source include/stop_slave.inc;
+
+set @save.slave_exec_mode= @@global.slave_exec_mode;
+set @@global.slave_exec_mode = 'Parallel';
+source include/start_slave.inc;
+
+
+connection master;
+
+create database d1;
+create database d2;
+create database d3;
+create table d1.t1 (a int auto_increment primary key) engine=innodb;
+create table d2.t1 (a int auto_increment primary key) engine=innodb;
+create table d3.t1 (a int auto_increment primary key) engine=innodb;
+
+#
+# I. Two parallel jobs conflict
+#
+# two conflicting jobs to follow
+
+# sync_slave_with_master
+
+# TODO: remove once `sync_slave_with_master' got fixed
+
+--sleep 3
+
+# To be really conflicting slave needs to block commit of the first.
+connection slave;
+
+begin;
+insert into d2.t1 values (1);
+
+connection master;
+
+# Job_1
+begin;
+use d1;
+insert into d1.t1 values (null);
+use d2;
+insert into d2.t1 values (1); # will be block at this point on Worker
+commit;
+
+# Job_2
+begin;
+use d3;
+insert into d3.t1 values (null);
+use d1;
+insert into d1.t1 values (null); # will be block at this point on Coord
+commit;
+
+--sleep 4
+
+connection slave;
+
+if (`SELECT @@global.binlog_format LIKE "row"`)
+{
+ if (`select COUNT(*) = 0 FROM coord_wait_list`)
+ {
+ SELECT * from Information_Schema.processlist;
+ --die Appologies, coodinator is supposed to be in the waiting state but it is not
+ }
+}
+
+# release the Worker
+rollback;
+
+let $count= 2;
+let $table= d1.t1;
+source include/wait_until_rows_count.inc;
+
+
+#
+# II. The only-sequential conflicts with ongoing parallel applying
+#
+
+# a. DDL waits for all workers have processed their earlier scheduled assignments
+
+connection slave1;
+
+# fix the tables status. Tables are supposed to exist, possibly with data left
+# after previous part.
+
+select count(*) from d1.t1 into @d1;
+select count(*) from d2.t1 into @d2;
+select count(*) from d3.t1 into @d3;
+use d1;
+create table `exists_only_on_slave` (a int);
+
+connection slave;
+
+# put in the way of workers blocking load
+
+begin;
+insert into d1.t1 values (null);
+insert into d2.t1 values (null);
+insert into d3.t1 values (null);
+
+connection master;
+
+# Job_1
+begin;
+use d1;
+insert into d1.t1 values (null);
+commit;
+
+# Job_2
+begin;
+use d2;
+insert into d2.t1 values (null);
+commit;
+
+
+# Job_3
+begin;
+use d3;
+insert into d3.t1 values (null);
+commit;
+
+--disable_warnings
+use d1;
+drop table if exists `exists_only_on_slave`;
+--enable_warnings
+
+
+connection slave1;
+
+select sleep(1); # give Workers a little time to process (but they won't)
+
+select count(*) - @d1 as 'zero' from d1.t1;
+select count(*) - @d2 as 'zero' from d2.t1;
+select count(*) - @d3 as 'zero' from d3.t1;
+
+# proof the master DDL has not got through
+use d1;
+select count(*) as 'zero' from `exists_only_on_slave`;
+
+connection slave;
+
+rollback; # release workers
+
+connection slave1;
+
+# to finish up with getting all committed.
+
+let $count= `select @d1 + 1`;
+let $table= d1.t1;
+source include/wait_until_rows_count.inc;
+
+let $count= `select @d2 + 1`;
+let $table= d2.t1;
+source include/wait_until_rows_count.inc;
+
+let $count= `select @d3 + 1`;
+let $table= d3.t1;
+source include/wait_until_rows_count.inc;
+connection slave;
+
+
+#
+# cleanup
+#
+
+connection master;
+
+drop database d1;
+drop database d2;
+drop database d3;
+
+--sleep 4
+
+connection slave;
+#sync_slave_with_master;
+
+drop view coord_wait_list;
+set @@global.slave_exec_mode= @save.slave_exec_mode;
+
+--echo *** End of the tests ***
+
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-11-25 08:47:39 +0000
+++ b/sql/log_event.cc 2010-11-26 21:08:30 +0000
@@ -2300,6 +2300,8 @@ Slave_worker *Log_event::get_slave_worke
// reset the B-group marker
const_cast<Relay_log_info*>(rli)->curr_group_seen_begin= FALSE;
+
+ const_cast<Relay_log_info*>(rli)->curr_group_is_parallel= TRUE; // mark for Coord's T-event delete
}
return worker;
@@ -2445,10 +2447,47 @@ int Log_event::apply_event(Relay_log_inf
Slave_worker *w= NULL;
Slave_job_item item= {NULL}, *job_item= &item;
Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli); // constless alias
+ bool parallel;
- if (!rli->is_parallel_exec() || only_serial_exec() /* || wait(APH.N == 0) */)
+ if (!(parallel= rli->is_parallel_exec()) ||
+ only_serial_exec(rli->run_query_in_parallel, rli->curr_group_seen_begin))
+ {
+ if (parallel)
+ {
+ // This case relates to Query parallel apply which breaks into
+ // DDL and {B, Q, T} group, where Q owns g-parallel property.
+
+ // Apply possibly deferred B
+ if (rli->curr_group_da.elements > 0)
+ {
+ int res;
+ Log_event *ev_begin= * (Log_event**) pop_dynamic(&c_rli->curr_group_da);
+
+ DBUG_ASSERT(rli->curr_group_da.elements == 0);
+ DBUG_ASSERT(rli->curr_group_seen_begin);
+
+ res= ev_begin->do_apply_event(rli);
+ delete ev_begin;
+ /* B appears to be serial, reset parallel stautus of group
+ because the following T won't do that */
+ c_rli->curr_group_seen_begin= FALSE;
+
+ if (res)
+ DBUG_RETURN(res);
+ }
+
+ DBUG_ASSERT(!rli->curr_group_seen_begin);
+ c_rli->curr_group_is_parallel= FALSE; // Coord will destruct all the rest of events
+
+ (void) wait_for_workers_to_finish(rli);
+ }
DBUG_RETURN(do_apply_event(rli));
-
+ }
+
+ // !!! TODO: suppress
+ // if (get_type_code() == ROWS_QUERY_LOG_EVENT)
+
+
if ((!(w= get_slave_worker_id(rli)) ||
DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)))
DBUG_RETURN(rli->curr_group_assigned_parts.elements == 0 ? FALSE : TRUE);
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2010-11-22 18:57:13 +0000
+++ b/sql/log_event.h 2010-11-26 21:08:30 +0000
@@ -1153,37 +1153,45 @@ public:
public:
/**
- mst-II: to execute serially due to
- technical or conceptual limitation
+ MST: to execute serially due to technical or conceptual limitation
@return TRUE for all but {Query,Rand,User_var,Intvar,Rows}_log_event
*/
- bool only_serial_exec()
+ bool only_serial_exec(bool query_in_parallel, bool group_term_in_parallel)
{
return
- // todo: the 4 types below are limitly parallel-supported (the default
- // session db not the actual db)
+ /*
+ the 4 types below are limitly parallel-supported (the default
+ session db not the actual db).
+ Decision on BEGIN is deferred till the following event.
+ Decision on Commit or Xid is forced by the one for BEGIN.
+ */
- // get_type_code() == QUERY_EVENT ||
- // get_type_code() == INTVAR_EVENT ||
- // get_type_code() == USER_VAR_EVENT ||
- // get_type_code() == RAND_EVENT ||
-
- get_type_code() == STOP_EVENT ||
- get_type_code() == ROTATE_EVENT ||
- get_type_code() == LOAD_EVENT ||
- get_type_code() == SLAVE_EVENT ||
- get_type_code() == CREATE_FILE_EVENT ||
- get_type_code() == APPEND_BLOCK_EVENT ||
- get_type_code() == EXEC_LOAD_EVENT ||
- get_type_code() == DELETE_FILE_EVENT ||
- get_type_code() == NEW_LOAD_EVENT ||
- get_type_code() == FORMAT_DESCRIPTION_EVENT ||
- get_type_code() == BEGIN_LOAD_QUERY_EVENT ||
- get_type_code() == EXECUTE_LOAD_QUERY_EVENT ||
+ (!query_in_parallel &&
+ ((get_type_code() == QUERY_EVENT
+ && !starts_group() && !ends_group()) ||
+ get_type_code() == INTVAR_EVENT ||
+ get_type_code() == USER_VAR_EVENT ||
+ get_type_code() == RAND_EVENT)) ||
+
+ (!group_term_in_parallel && ends_group()) ||
+
+ get_type_code() == START_EVENT_V3 ||
+ get_type_code() == STOP_EVENT ||
+ get_type_code() == ROTATE_EVENT ||
+ get_type_code() == LOAD_EVENT ||
+ get_type_code() == SLAVE_EVENT ||
+ get_type_code() == CREATE_FILE_EVENT ||
+ get_type_code() == APPEND_BLOCK_EVENT ||
+ get_type_code() == EXEC_LOAD_EVENT ||
+ get_type_code() == DELETE_FILE_EVENT ||
+ get_type_code() == NEW_LOAD_EVENT ||
+ get_type_code() == FORMAT_DESCRIPTION_EVENT||
+ get_type_code() == BEGIN_LOAD_QUERY_EVENT ||
+ get_type_code() == EXECUTE_LOAD_QUERY_EVENT|| /* todo: make parallel */
get_type_code() == PRE_GA_WRITE_ROWS_EVENT ||
- get_type_code() == PRE_GA_UPDATE_ROWS_EVENT ||
- get_type_code() == PRE_GA_DELETE_ROWS_EVENT ||
+ get_type_code() == PRE_GA_UPDATE_ROWS_EVENT||
+ get_type_code() == PRE_GA_DELETE_ROWS_EVENT||
get_type_code() == INCIDENT_EVENT;
}
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2010-11-23 09:03:37 +0000
+++ b/sql/mysqld.cc 2010-11-26 21:08:30 +0000
@@ -463,6 +463,7 @@ ulonglong slave_type_conversions_options
ulong slave_parallel_workers;
ulong slave_max_pending_jobs;
my_bool slave_local_timestamp_opt;
+my_bool opt_slave_run_query_in_parallel;
ulong thread_cache_size=0;
ulong binlog_cache_size=0;
ulonglong max_binlog_cache_size=0;
=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h 2010-09-21 22:19:05 +0000
+++ b/sql/mysqld.h 2010-11-26 21:08:30 +0000
@@ -175,6 +175,7 @@ extern uint slave_net_timeout;
extern ulong slave_parallel_workers;
extern ulong slave_max_pending_jobs;
extern my_bool slave_local_timestamp_opt;
+extern my_bool opt_slave_run_query_in_parallel;
extern uint max_user_connections;
extern ulong what_to_log,flush_time;
extern ulong max_prepared_stmt_count, prepared_stmt_count;
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2010-11-25 09:03:54 +0000
+++ b/sql/rpl_rli.h 2010-11-26 21:08:30 +0000
@@ -428,15 +428,16 @@ public:
Slave_worker *last_assigned_worker; // a hint to partitioning func for some events
Slave_committed_queue *gaq;
DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
- DYNAMIC_ARRAY curr_group_da; // deferred array to hold part-info-free events
- bool curr_group_seen_begin; // current group started with B-event or not
- volatile Slave_worker* slave_worker_is_error;
+ DYNAMIC_ARRAY curr_group_da; // deferred array to hold part-info-free events
+ bool curr_group_seen_begin; // current group started with B-event or not
+ bool run_query_in_parallel; // Query's default db not the actual db as part
Slave_worker* get_current_worker() const;
Slave_worker* set_this_worker(Slave_worker *w) { return this_worker= w; }
Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
ulonglong mts_total_groups; // total event groups distributed in current session
-
+ volatile Slave_worker* slave_worker_is_error;
+ bool curr_group_is_parallel; // a mark for Coord to indicate on T-event of the curr group at delete
/*
A sorted array of Worker current assignements number to provide
approximate view on Workers loading.
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-11-26 16:15:37 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-11-26 21:08:30 +0000
@@ -285,12 +285,6 @@ Slave_worker *get_slave_worker(const cha
insert_dynamic(&rli->curr_group_assigned_parts, (uchar*) key);
DBUG_PRINT("info", ("Searching for %s, %d", dbname, dblength));
- /*
- The database name was not found which means that a worker never
- processed events from that database. In such case, we need to
- map the database to a worker my inserting an entry into the
- hash map.
- */
hash_value= my_calc_hash(&mapping_db_to_worker, (uchar*) dbname,
dblength);
@@ -302,6 +296,12 @@ Slave_worker *get_slave_worker(const cha
(uchar*) dbname, dblength);
if (!entry)
{
+ /*
+ The database name was not found which means that a worker never
+ processed events from that database. In such case, we need to
+ map the database to a worker my inserting an entry into the
+ hash map.
+ */
my_bool ret;
mysql_mutex_unlock(&slave_worker_hash_lock);
@@ -327,6 +327,7 @@ Slave_worker *get_slave_worker(const cha
*/
entry->worker= !rli->last_assigned_worker ?
get_least_occupied_worker(workers) : rli->last_assigned_worker;
+ entry->worker->usage_partition++;
mysql_mutex_lock(&slave_worker_hash_lock);
ret= my_hash_insert(&mapping_db_to_worker, (uchar*) entry);
@@ -347,7 +348,9 @@ Slave_worker *get_slave_worker(const cha
{
entry->worker= !rli->last_assigned_worker ?
get_least_occupied_worker(workers) : rli->last_assigned_worker;
+ entry->worker->usage_partition++;
entry->usage++;
+
my_hash_update(&mapping_db_to_worker, (uchar*) entry,
(uchar*) dbname, dblength);
}
@@ -361,20 +364,37 @@ Slave_worker *get_slave_worker(const cha
my_hash_update(&mapping_db_to_worker, (uchar*) entry,
(uchar*) dbname, dblength);
}
- else // may be the hashing conflict
+ else
{
- DBUG_ASSERT(rli->last_assigned_worker == NULL ||
- rli->curr_group_assigned_parts.elements > 1);
+ // The case APH contains a W_d != W_c != NULL assigned to
+ // D-partition represents
+ // the hashing conflict and is handled as the following:
+
+ THD *thd= rli->info_thd;
+ const char *proc_info;
+ const char info_format[]=
+ "Waiting for Slave Worker %d to release partition `%s`";
+ char wait_info[sizeof(info_format) + 4*sizeof(entry->worker->id) +
+ NAME_LEN + 1];
- DBUG_ASSERT(0); // ... TODO ... *not* ready yet
+ DBUG_ASSERT(rli->last_assigned_worker != NULL &&
+ rli->curr_group_assigned_parts.elements > 1);
// future assignenment and marking at the same time
entry->worker= rli->last_assigned_worker;
- wait();
+ sprintf(wait_info, info_format, entry->worker->id, entry->db);
+
+ proc_info= thd->enter_cond(&slave_worker_hash_cond, &slave_worker_hash_lock,
+ wait_info);
+ mysql_cond_wait(&slave_worker_hash_cond, &slave_worker_hash_lock);
+ thd->exit_cond(proc_info);
+ mysql_mutex_lock(&slave_worker_hash_lock);
DBUG_ASSERT(entry->usage == 0);
+
entry->usage= 1;
+ entry->worker->usage_partition++;
}
mysql_mutex_unlock(&slave_worker_hash_lock);
@@ -408,10 +428,6 @@ Slave_worker *get_least_occupied_worker(
DBUG_ASSERT(worker != NULL);
- worker->usage_partition++;
-
- DBUG_ASSERT(worker->usage_partition != 0);
-
return(worker);
}
@@ -449,7 +465,7 @@ void Slave_worker::slave_worker_ends_gro
my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
(uchar*) key + 1, key[0]);
- DBUG_ASSERT(entry && entry->usage != 0 && entry->worker == this);
+ DBUG_ASSERT(entry && entry->usage != 0);
DBUG_ASSERT(strlen(key + 1) == (uchar) key[0]);
@@ -458,7 +474,11 @@ void Slave_worker::slave_worker_ends_gro
(uchar*) key + 1, key[0]);
if (entry->usage == 0)
+ {
usage_partition--;
+ if (entry->worker != this) // Coordinator is waiting
+ mysql_cond_signal(&slave_worker_hash_cond);
+ }
else
DBUG_ASSERT(usage_partition != 0);
/*
@@ -645,3 +665,45 @@ ulong Slave_committed_queue::move_queue_
return cnt;
}
+/* Coordinator waits, entry by entry, until no mapping_db_to_worker partition is in use. */
+/* @return the number of partitions that had to be waited for. */
+int wait_for_workers_to_finish(Relay_log_info const *rli)
+{
+  uint ret= 0;
+  HASH *hash= &mapping_db_to_worker;
+  for (uint i= 0; i < hash->records; i++) // `ret' must not be redeclared here: that shadowed the counter
+  {
+    db_worker *entry;
+    THD *thd= rli->info_thd;
+    const char *proc_info;
+    const char info_format[]=
+      "Waiting for Slave Worker %d to release partition `%s`";
+    char wait_info[sizeof(info_format) + 4*sizeof(entry->worker->id) +
+                   NAME_LEN + 1];
+
+    mysql_mutex_lock(&slave_worker_hash_lock);
+
+    entry= (db_worker*) my_hash_element(hash, i);
+
+    DBUG_ASSERT(entry);
+
+    if (entry->usage > 0)
+    {
+      sprintf(wait_info, info_format, entry->worker->id, entry->db);
+      entry->worker= NULL;
+      // a NULL owner differs from any Worker, so the releasing Worker signals the condition
+      proc_info= thd->enter_cond(&slave_worker_hash_cond, &slave_worker_hash_lock,
+                                 wait_info);
+      mysql_cond_wait(&slave_worker_hash_cond, &slave_worker_hash_lock);
+      thd->exit_cond(proc_info);
+      ret++;
+      // NOTE(review): exit_cond() presumably released slave_worker_hash_lock - confirm
+      DBUG_ASSERT(entry->usage == 0);
+    }
+    else
+    {
+      mysql_mutex_unlock(&slave_worker_hash_lock);
+    }
+  }
+  return ret;
+}
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-11-25 09:03:54 +0000
+++ b/sql/rpl_rli_pdb.h 2010-11-26 21:08:30 +0000
@@ -24,6 +24,7 @@ bool init_hash_workers(ulong slave_paral
void destroy_hash_workers();
Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli);
Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
+int wait_for_workers_to_finish(Relay_log_info const *rli);
#define SLAVE_WORKER_QUEUE_SIZE 8096
#define SLAVE_INIT_DBS_IN_GROUP 4 // initial allocation for CGEP dynarray
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-11-25 09:03:54 +0000
+++ b/sql/rpl_slave.cc 2010-11-26 21:08:30 +0000
@@ -2615,7 +2615,7 @@ int apply_event_and_update_pos(Log_event
if (!rli->is_in_group() && rli->slave_exec_mode != slave_exec_mode_options)
rli->slave_exec_mode= slave_exec_mode_options;
- int reason= ev->shall_skip(rli);
+ int reason= ev->shall_skip(rli); // TODO: MTS skip handling
if (reason == Log_event::EVENT_SKIP_COUNT)
{
sql_slave_skip_counter= --rli->slave_skip_counter;
@@ -2878,8 +2878,9 @@ static int exec_relay_log_event(THD* thd
if (thd->variables.binlog_rows_query_log_events)
handle_rows_query_log_event(ev, rli);
- if (!rli->is_parallel_exec() && !ev->only_serial_exec() &&
- ev->get_type_code() != ROWS_QUERY_LOG_EVENT) // mts todo: check this case
+ if ((!rli->is_parallel_exec() ||
+ ev->only_serial_exec(rli->run_query_in_parallel, rli->curr_group_is_parallel))
+ && ev->get_type_code() != ROWS_QUERY_LOG_EVENT) // mts TODO: check this case
{
DBUG_PRINT("info", ("Deleting the event after it has been executed"));
@@ -3600,8 +3601,6 @@ err:
mysql_mutex_unlock(&LOCK_thread_count);
}
- delete w->w_rli; // fixme: experimenting
-
my_thread_end();
pthread_exit(0);
DBUG_RETURN(0);
@@ -3628,7 +3627,7 @@ int slave_start_single_worker(Relay_log_
// fixme: experimenting to make Workers to run ev->update_pos(w->w_rli)
// fixme: a real hack! part of Rpl_info_factory::create_rli(RLI_REPOSITORY_FILE, FALSE);
w->w_rli= new Relay_log_info(FALSE);
- Rpl_info_dummy *dummy_handler= new Rpl_info_dummy(FALSE);
+ Rpl_info_dummy *dummy_handler= new Rpl_info_dummy(TRUE);
w->w_rli->set_rpl_info_handler(dummy_handler);
ulong key_worker_idx[]= { server_id, w->id };
w->init_info(key_worker_idx, NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER);
@@ -3712,7 +3711,7 @@ int slave_start_workers(Relay_log_info *
rli->mts_total_groups= 0;
rli->slave_worker_is_error= NULL;
rli->curr_group_seen_begin= NULL;
-
+ rli->run_query_in_parallel= opt_slave_run_query_in_parallel;
for (i= 0; i < n; i++)
{
if ((error= slave_start_single_worker(rli, i)))
@@ -3785,6 +3784,8 @@ void slave_stop_workers(Relay_log_info *
delete_dynamic(&w->jobs.Q);
delete_dynamic(&w->curr_group_exec_parts); // GCEP
delete_dynamic_element(&rli->workers, i);
+ delete w->w_rli;
+
delete w;
}
=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc 2010-11-20 17:23:42 +0000
+++ b/sql/sys_vars.cc 2010-11-26 21:08:30 +0000
@@ -3121,6 +3121,12 @@ static Sys_var_mybool Sys_slave_local_ti
"time value to implicitly affected timestamp columms. Otherwise (default) "
"installs prescribed by the master value.",
GLOBAL_VAR(slave_local_timestamp_opt), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+static Sys_var_mybool Sys_slave_run_query_in_parallel(
+ "slave_run_query_in_parallel",
+ "The default not an actual database name is used as partition info "
+ "for parallel execution of Query_log_event ",
+ GLOBAL_VAR(opt_slave_run_query_in_parallel), CMD_LINE(OPT_ARG),
+ DEFAULT(FALSE));
#endif
static bool check_locale(sys_var *self, THD *thd, set_var *var)
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101126210830-eqhloz0dt6xp23pz.bundle
| Thread |
|---|
| • bzr push into mysql-next-mr.crash-safe branch (andrei.elkin:3218 to 3219)WL#5569 | Andrei Elkin | 26 Nov |