#At file:///home/andrei/MySQL/BZR/2a-23May/WL/wl5569-merge/ based on revid:andrei.elkin@stripped
3219 Andrei Elkin 2010-11-27 [merge]
merging from from wl5569 repo
added:
mysql-test/suite/rpl/r/rpl_parallel_conflicts.result
mysql-test/suite/rpl/t/rpl_parallel_conflicts.test
modified:
sql/log_event.cc
sql/log_event.h
sql/mysqld.cc
sql/mysqld.h
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
sql/sys_vars.cc
=== added file 'mysql-test/suite/rpl/r/rpl_parallel_conflicts.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_conflicts.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_conflicts.result 2010-11-26 21:08:30 +0000
@@ -0,0 +1,78 @@
+stop slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+reset master;
+reset slave;
+drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
+start slave;
+create view coord_wait_list as SELECT id from Information_Schema.processlist where state like 'Waiting for Slave Worker%';
+include/stop_slave.inc
+set @save.slave_exec_mode= @@global.slave_exec_mode;
+set @@global.slave_exec_mode = 'Parallel';
+include/start_slave.inc
+create database d1;
+create database d2;
+create database d3;
+create table d1.t1 (a int auto_increment primary key) engine=innodb;
+create table d2.t1 (a int auto_increment primary key) engine=innodb;
+create table d3.t1 (a int auto_increment primary key) engine=innodb;
+begin;
+insert into d2.t1 values (1);
+begin;
+use d1;
+insert into d1.t1 values (null);
+use d2;
+insert into d2.t1 values (1);
+commit;
+begin;
+use d3;
+insert into d3.t1 values (null);
+use d1;
+insert into d1.t1 values (null);
+commit;
+rollback;
+select count(*) from d1.t1 into @d1;
+select count(*) from d2.t1 into @d2;
+select count(*) from d3.t1 into @d3;
+use d1;
+create table `exists_only_on_slave` (a int);
+begin;
+insert into d1.t1 values (null);
+insert into d2.t1 values (null);
+insert into d3.t1 values (null);
+begin;
+use d1;
+insert into d1.t1 values (null);
+commit;
+begin;
+use d2;
+insert into d2.t1 values (null);
+commit;
+begin;
+use d3;
+insert into d3.t1 values (null);
+commit;
+use d1;
+drop table if exists `exists_only_on_slave`;
+select sleep(1);
+sleep(1)
+0
+select count(*) - @d1 as 'zero' from d1.t1;
+zero
+0
+select count(*) - @d2 as 'zero' from d2.t1;
+zero
+0
+select count(*) - @d3 as 'zero' from d3.t1;
+zero
+0
+use d1;
+select count(*) as 'zero' from `exists_only_on_slave`;
+zero
+0
+rollback;
+drop database d1;
+drop database d2;
+drop database d3;
+drop view coord_wait_list;
+set @@global.slave_exec_mode= @save.slave_exec_mode;
+*** End of the tests ***
=== added file 'mysql-test/suite/rpl/t/rpl_parallel_conflicts.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_conflicts.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_conflicts.test 2010-11-26 21:08:30 +0000
@@ -0,0 +1,224 @@
+#
+# WL#5569 MTS
+#
+# The test checks cases of hashing conflicts forcing special handling.
+# The cases include
+#
+# I. two Worker jobs conflict to each other
+#
+# a. two multi-statement transactions containing more than one partition
+# in which one is common are mapped to different Workers.
+# b. similarly two autocommit queries or ddl:s
+#
+# Handling of the cases is carried out as the following:
+# when the Coordinator hits a partition occupied by a Worker other than the
+# currently assigned one, it marks the partition and waits until the
+# owning Worker has released it and signaled.
+#
+# II. An event requires the sequential execution
+#
+# Coordinator does not schedule the event and is waiting till all workers have
+# released their partitions and signalled.
+
+source include/master-slave.inc;
+
+#
+# Testing with the statement format requires
+# @@global.slave_run_query_in_parallel = 1.
+# Notice, parallelization for Query-log-event is limited
+# to the default database. That's why 'use db'.
+# With the default @@global.slave_run_query_in_parallel == 0
+# the tests in stmt format still run to prove switching to the sequential.
+
+# TODO: convert this file into two tests for either value of
+# @@global.slave_run_query_in_parallel
+
+connection slave;
+
+--disable_query_log
+--disable_result_log
+call mtr.add_suppression('Error reading slave worker configuration');
+--enable_query_log
+--enable_result_log
+
+create view coord_wait_list as SELECT id from Information_Schema.processlist where state like 'Waiting for Slave Worker%';
+
+source include/stop_slave.inc;
+
+set @save.slave_exec_mode= @@global.slave_exec_mode;
+set @@global.slave_exec_mode = 'Parallel';
+source include/start_slave.inc;
+
+
+connection master;
+
+create database d1;
+create database d2;
+create database d3;
+create table d1.t1 (a int auto_increment primary key) engine=innodb;
+create table d2.t1 (a int auto_increment primary key) engine=innodb;
+create table d3.t1 (a int auto_increment primary key) engine=innodb;
+
+#
+# I. Two parallel jobs conflict
+#
+# two conflicting jobs to follow
+
+# sync_slave_with_master
+
+# TODO: remove once `sync_slave_with_master' got fixed
+
+--sleep 3
+
+# To be really conflicting slave needs to block commit of the first.
+connection slave;
+
+begin;
+insert into d2.t1 values (1);
+
+connection master;
+
+# Job_1
+begin;
+use d1;
+insert into d1.t1 values (null);
+use d2;
+insert into d2.t1 values (1); # will be block at this point on Worker
+commit;
+
+# Job_2
+begin;
+use d3;
+insert into d3.t1 values (null);
+use d1;
+insert into d1.t1 values (null); # will be block at this point on Coord
+commit;
+
+--sleep 4
+
+connection slave;
+
+if (`SELECT @@global.binlog_format LIKE "row"`)
+{
+ if (`select COUNT(*) = 0 FROM coord_wait_list`)
+ {
+ SELECT * from Information_Schema.processlist;
+ --die Appologies, coodinator is supposed to be in the waiting state but it is not
+ }
+}
+
+# release the Worker
+rollback;
+
+let $count= 2;
+let $table= d1.t1;
+source include/wait_until_rows_count.inc;
+
+
+#
+# II. The only-sequential conflicts with ongoing parallel applying
+#
+
+# a. DDL waits until all workers have processed their earlier scheduled assignments
+
+connection slave1;
+
+# fix the tables status. Tables are supposed to exist, possibly with data left
+# after previous part.
+
+select count(*) from d1.t1 into @d1;
+select count(*) from d2.t1 into @d2;
+select count(*) from d3.t1 into @d3;
+use d1;
+create table `exists_only_on_slave` (a int);
+
+connection slave;
+
+# put in the way of workers blocking load
+
+begin;
+insert into d1.t1 values (null);
+insert into d2.t1 values (null);
+insert into d3.t1 values (null);
+
+connection master;
+
+# Job_1
+begin;
+use d1;
+insert into d1.t1 values (null);
+commit;
+
+# Job_2
+begin;
+use d2;
+insert into d2.t1 values (null);
+commit;
+
+
+# Job_3
+begin;
+use d3;
+insert into d3.t1 values (null);
+commit;
+
+--disable_warnings
+use d1;
+drop table if exists `exists_only_on_slave`;
+--enable_warnings
+
+
+connection slave1;
+
+select sleep(1); # give Workers a little time to process (but they won't)
+
+select count(*) - @d1 as 'zero' from d1.t1;
+select count(*) - @d2 as 'zero' from d2.t1;
+select count(*) - @d3 as 'zero' from d3.t1;
+
+# proof the master DDL has not got through
+use d1;
+select count(*) as 'zero' from `exists_only_on_slave`;
+
+connection slave;
+
+rollback; # release workers
+
+connection slave1;
+
+# to finish up with getting all committed.
+
+let $count= `select @d1 + 1`;
+let $table= d1.t1;
+source include/wait_until_rows_count.inc;
+
+let $count= `select @d2 + 1`;
+let $table= d2.t1;
+source include/wait_until_rows_count.inc;
+
+let $count= `select @d3 + 1`;
+let $table= d3.t1;
+source include/wait_until_rows_count.inc;
+connection slave;
+
+
+#
+# cleanup
+#
+
+connection master;
+
+drop database d1;
+drop database d2;
+drop database d3;
+
+--sleep 4
+
+connection slave;
+#sync_slave_with_master;
+
+drop view coord_wait_list;
+set @@global.slave_exec_mode= @save.slave_exec_mode;
+
+--echo *** End of the tests ***
+
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-11-27 09:17:41 +0000
+++ b/sql/log_event.cc 2010-11-27 17:17:41 +0000
@@ -2106,7 +2106,7 @@ bool Log_event::contains_partition_info(
r - a mini-group internal "regular" event that follows its g-parent
(Write, Update, Delete -rows)
S - sequentially applied event (may not be a part of any group).
- Events of this type are determined via @c only_serial_exec()
+ Events of this type are determined via @c only_sequential_exec()
earlier and don't cause calling this method .
T - terminator of the group (XID, COMMIT, ROLLBACK)
@@ -2140,6 +2140,7 @@ Slave_worker *Log_event::get_slave_worke
// rli->gaq->en_queue({NULL, W_s});
g= {
log_pos,
+ NULL,
(ulong) -1,
const_cast<Relay_log_info*>(rli)->mts_total_groups++
};
@@ -2205,12 +2206,35 @@ Slave_worker *Log_event::get_slave_worke
if (ends_group() || !rli->curr_group_seen_begin)
{
uint i;
- // assert (\exists P_k . W_s \in CGAP) if P_k is present in ev
mts_group_cnt= rli->gaq->assigned_group_index;
-
+
+ if (!worker->relay_log_change_notified)
+ {
+ /*
+ Prior this event, C rotated the relay log to drop each
+ Worker's notified flag.
+ Now group terminating event initiates the new name
+ delivery through the current group relaylog slot in GAQ.
+ */
+
+ Slave_job_group *ptr_g=
+ (Slave_job_group *)
+ dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
+
+ DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
+
+ ptr_g->group_relay_log_name= (char *)
+ my_malloc(strlen(const_cast<Relay_log_info*>(rli)->get_group_relay_log_name()) + 1, MYF(MY_WME));
+ strcpy(ptr_g->group_relay_log_name, const_cast<Relay_log_info*>(rli)->get_group_relay_log_name());
+ worker->relay_log_change_notified= TRUE;
+ }
+
DBUG_ASSERT(worker == rli->last_assigned_worker);
+
if (!worker)
{
+ DBUG_ASSERT(0);
+
// a very special case of the empty group: {B, T}
DBUG_ASSERT(rli->curr_group_assigned_parts.elements == 0
&& rli->curr_group_da.elements == 1);
@@ -2225,6 +2249,8 @@ Slave_worker *Log_event::get_slave_worke
// reset the B-group marker
const_cast<Relay_log_info*>(rli)->curr_group_seen_begin= FALSE;
+
+ const_cast<Relay_log_info*>(rli)->curr_group_is_parallel= TRUE; // mark for Coord's T-event delete
}
return worker;
@@ -2370,10 +2396,50 @@ int Log_event::apply_event(Relay_log_inf
Slave_worker *w= NULL;
Slave_job_item item= {NULL}, *job_item= &item;
Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli); // constless alias
+ bool parallel;
+
+ if (!(parallel= rli->is_parallel_exec()) ||
+ only_sequential_exec(rli->run_query_in_parallel, rli->curr_group_seen_begin))
+ {
+ if (parallel)
+ {
+ // This case relates to Query parallel apply which breaks into
+ // DDL and {B, Q, T} group, where Q owns g-parallel property.
- if (!rli->is_parallel_exec() || only_serial_exec() /* || wait(APH.N == 0) */)
+ // Apply possibly deferred B
+ if (rli->curr_group_da.elements > 0)
+ {
+ int res;
+ Log_event *ev_begin= * (Log_event**) pop_dynamic(&c_rli->curr_group_da);
+
+ DBUG_ASSERT(rli->curr_group_da.elements == 0);
+ DBUG_ASSERT(rli->curr_group_seen_begin);
+
+ res= ev_begin->do_apply_event(rli);
+ delete ev_begin;
+ /* B appears to be serial; reset the parallel status of the group
+ because the following T won't do that */
+ c_rli->curr_group_seen_begin= FALSE;
+
+ if (res)
+ DBUG_RETURN(res);
+ }
+
+ DBUG_ASSERT(!rli->curr_group_seen_begin);
+ c_rli->curr_group_is_parallel= FALSE; // Coord will destruct all the rest of events
+ if (!parallel_exec_by_coordinator(::server_id))
+ (void) wait_for_workers_to_finish(rli);
+ }
DBUG_RETURN(do_apply_event(rli));
-
+ }
+
+ if (get_type_code() == ROWS_QUERY_LOG_EVENT)
+ {
+ rli->report(ERROR_LEVEL, 0,
+ "No parallel support for ROWS_QUERY_LOG_EVENT");
+ DBUG_RETURN(1);
+ }
+
if ((!(w= get_slave_worker_id(rli)) ||
DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)))
DBUG_RETURN(rli->curr_group_assigned_parts.elements == 0 ? FALSE : TRUE);
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2010-11-27 09:17:41 +0000
+++ b/sql/log_event.h 2010-11-27 17:17:41 +0000
@@ -1153,40 +1153,62 @@ public:
public:
/**
- mst-II: to execute serially due to
- technical or conceptual limitation
+ MST: to execute serially due to technical or conceptual limitation
- @return TRUE for all but {Query,Rand,User_var,Intvar,Rows}_log_event
+ @return TRUE if despite permanent parallel execution mode an event
+ needs applying in a real isolation that is sequentially.
*/
- bool only_serial_exec()
+ bool only_sequential_exec(bool query_in_parallel, bool group_term_in_parallel)
{
return
- // todo: the 4 types below are limitly parallel-supported (the default
- // session db not the actual db)
+ /*
+ the 4 types below are limitly parallel-supported (the default
+ session db not the actual db).
+ Decision on BEGIN is deferred till the following event.
+ Decision on Commit or Xid is forced by the one for BEGIN.
+ */
- // get_type_code() == QUERY_EVENT ||
- // get_type_code() == INTVAR_EVENT ||
- // get_type_code() == USER_VAR_EVENT ||
- // get_type_code() == RAND_EVENT ||
-
- get_type_code() == STOP_EVENT ||
- get_type_code() == ROTATE_EVENT ||
- get_type_code() == LOAD_EVENT ||
- get_type_code() == SLAVE_EVENT ||
- get_type_code() == CREATE_FILE_EVENT ||
- get_type_code() == APPEND_BLOCK_EVENT ||
- get_type_code() == EXEC_LOAD_EVENT ||
- get_type_code() == DELETE_FILE_EVENT ||
- get_type_code() == NEW_LOAD_EVENT ||
- get_type_code() == FORMAT_DESCRIPTION_EVENT ||
- get_type_code() == BEGIN_LOAD_QUERY_EVENT ||
- get_type_code() == EXECUTE_LOAD_QUERY_EVENT ||
+ (!query_in_parallel &&
+ ((get_type_code() == QUERY_EVENT
+ && !starts_group() && !ends_group()) ||
+ get_type_code() == INTVAR_EVENT ||
+ get_type_code() == USER_VAR_EVENT ||
+ get_type_code() == RAND_EVENT)) ||
+
+ (!group_term_in_parallel && ends_group()) ||
+
+ get_type_code() == START_EVENT_V3 ||
+ get_type_code() == STOP_EVENT ||
+ get_type_code() == ROTATE_EVENT ||
+ get_type_code() == LOAD_EVENT ||
+ get_type_code() == SLAVE_EVENT ||
+ get_type_code() == CREATE_FILE_EVENT ||
+ get_type_code() == APPEND_BLOCK_EVENT ||
+ get_type_code() == EXEC_LOAD_EVENT ||
+ get_type_code() == DELETE_FILE_EVENT ||
+ get_type_code() == NEW_LOAD_EVENT ||
+ get_type_code() == FORMAT_DESCRIPTION_EVENT||
+ get_type_code() == BEGIN_LOAD_QUERY_EVENT ||
+ get_type_code() == EXECUTE_LOAD_QUERY_EVENT|| /* todo: make parallel */
get_type_code() == PRE_GA_WRITE_ROWS_EVENT ||
- get_type_code() == PRE_GA_UPDATE_ROWS_EVENT ||
- get_type_code() == PRE_GA_DELETE_ROWS_EVENT ||
+ get_type_code() == PRE_GA_UPDATE_ROWS_EVENT||
+ get_type_code() == PRE_GA_DELETE_ROWS_EVENT||
get_type_code() == INCIDENT_EVENT;
}
-
+
+ /**
+ MST: a Format_description event whose server_id equals the slave's own
+ one can be applied by Coordinator concurrently with Workers.
+
+ @param slave_server_id server id of the slave that applies the event
+ @return TRUE if that's the case, FALSE otherwise.
+ */
+ bool parallel_exec_by_coordinator(ulong slave_server_id)
+ {
+ return get_type_code() == FORMAT_DESCRIPTION_EVENT &&
+ server_id == (uint32) slave_server_id; // honour the argument, not the global
+ }
+
/**
Events of a cetain type carry partitioning data such as db names.
*/
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2010-11-27 09:17:41 +0000
+++ b/sql/mysqld.cc 2010-11-27 17:17:41 +0000
@@ -459,6 +459,7 @@ ulonglong slave_type_conversions_options
ulong slave_parallel_workers;
ulong slave_max_pending_jobs;
my_bool slave_local_timestamp_opt;
+my_bool opt_slave_run_query_in_parallel;
ulong thread_cache_size=0;
ulong binlog_cache_size=0;
ulonglong max_binlog_cache_size=0;
=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h 2010-11-27 09:17:41 +0000
+++ b/sql/mysqld.h 2010-11-27 17:17:41 +0000
@@ -176,6 +176,7 @@ extern uint slave_net_timeout;
extern ulong slave_parallel_workers;
extern ulong slave_max_pending_jobs;
extern my_bool slave_local_timestamp_opt;
+extern my_bool opt_slave_run_query_in_parallel;
extern uint max_user_connections;
extern ulong what_to_log,flush_time;
extern ulong max_prepared_stmt_count, prepared_stmt_count;
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2010-11-27 09:17:41 +0000
+++ b/sql/rpl_rli.h 2010-11-27 17:17:41 +0000
@@ -438,15 +438,16 @@ public:
Slave_worker *last_assigned_worker; // a hint to partitioning func for some events
Slave_committed_queue *gaq;
DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
- DYNAMIC_ARRAY curr_group_da; // deferred array to hold part-info-free events
- bool curr_group_seen_begin; // current group started with B-event or not
- volatile Slave_worker* slave_worker_is_error;
+ DYNAMIC_ARRAY curr_group_da; // deferred array to hold part-info-free events
+ bool curr_group_seen_begin; // current group started with B-event or not
+ bool run_query_in_parallel; // Query's default db not the actual db as part
Slave_worker* get_current_worker() const;
Slave_worker* set_this_worker(Slave_worker *w) { return this_worker= w; }
Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
ulonglong mts_total_groups; // total event groups distributed in current session
-
+ volatile Slave_worker* slave_worker_is_error;
+ bool curr_group_is_parallel; // a mark for Coord to indicate on T-event of the curr group at delete
/*
A sorted array of Worker current assignements number to provide
approximate view on Workers loading.
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-11-25 08:47:39 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-11-27 15:36:50 +0000
@@ -138,6 +138,7 @@ bool Slave_worker::write_info(Rpl_info_h
*/
if (to->prepare_info_for_write() ||
+ to->set_info(partitions) ||
to->set_info(group_relay_log_name) ||
to->set_info((ulong)group_relay_log_pos) ||
to->set_info(group_master_log_name) ||
@@ -284,12 +285,6 @@ Slave_worker *get_slave_worker(const cha
insert_dynamic(&rli->curr_group_assigned_parts, (uchar*) key);
DBUG_PRINT("info", ("Searching for %s, %d", dbname, dblength));
- /*
- The database name was not found which means that a worker never
- processed events from that database. In such case, we need to
- map the database to a worker my inserting an entry into the
- hash map.
- */
hash_value= my_calc_hash(&mapping_db_to_worker, (uchar*) dbname,
dblength);
@@ -301,6 +296,12 @@ Slave_worker *get_slave_worker(const cha
(uchar*) dbname, dblength);
if (!entry)
{
+ /*
+ The database name was not found which means that a worker never
+ processed events from that database. In such case, we need to
+ map the database to a worker my inserting an entry into the
+ hash map.
+ */
my_bool ret;
mysql_mutex_unlock(&slave_worker_hash_lock);
@@ -326,6 +327,7 @@ Slave_worker *get_slave_worker(const cha
*/
entry->worker= !rli->last_assigned_worker ?
get_least_occupied_worker(workers) : rli->last_assigned_worker;
+ entry->worker->usage_partition++;
mysql_mutex_lock(&slave_worker_hash_lock);
ret= my_hash_insert(&mapping_db_to_worker, (uchar*) entry);
@@ -346,7 +348,9 @@ Slave_worker *get_slave_worker(const cha
{
entry->worker= !rli->last_assigned_worker ?
get_least_occupied_worker(workers) : rli->last_assigned_worker;
+ entry->worker->usage_partition++;
entry->usage++;
+
my_hash_update(&mapping_db_to_worker, (uchar*) entry,
(uchar*) dbname, dblength);
}
@@ -360,20 +364,37 @@ Slave_worker *get_slave_worker(const cha
my_hash_update(&mapping_db_to_worker, (uchar*) entry,
(uchar*) dbname, dblength);
}
- else // may be the hashing conflict
+ else
{
- DBUG_ASSERT(rli->last_assigned_worker == NULL ||
- rli->curr_group_assigned_parts.elements > 1);
+ // The case APH contains a W_d != W_c != NULL assigned to
+ // D-partition represents
+ // the hashing conflict and is handled as the following:
+
+ THD *thd= rli->info_thd;
+ const char *proc_info;
+ const char info_format[]=
+ "Waiting for Slave Worker %d to release partition `%s`";
+ char wait_info[sizeof(info_format) + 4*sizeof(entry->worker->id) +
+ NAME_LEN + 1];
- DBUG_ASSERT(0); // ... TODO ... *not* ready yet
+ DBUG_ASSERT(rli->last_assigned_worker != NULL &&
+ rli->curr_group_assigned_parts.elements > 1);
// future assignenment and marking at the same time
entry->worker= rli->last_assigned_worker;
- wait();
+ sprintf(wait_info, info_format, entry->worker->id, entry->db);
+
+ proc_info= thd->enter_cond(&slave_worker_hash_cond, &slave_worker_hash_lock,
+ wait_info);
+ mysql_cond_wait(&slave_worker_hash_cond, &slave_worker_hash_lock);
+ thd->exit_cond(proc_info);
+ mysql_mutex_lock(&slave_worker_hash_lock);
DBUG_ASSERT(entry->usage == 0);
+
entry->usage= 1;
+ entry->worker->usage_partition++;
}
mysql_mutex_unlock(&slave_worker_hash_lock);
@@ -407,10 +428,6 @@ Slave_worker *get_least_occupied_worker(
DBUG_ASSERT(worker != NULL);
- worker->usage_partition++;
-
- DBUG_ASSERT(worker->usage_partition != 0);
-
return(worker);
}
@@ -431,7 +448,23 @@ void Slave_worker::slave_worker_ends_gro
uint i;
if (!error)
+ {
+ // NOTE(review): the slot is looked up by Coordinator's current
+ // assigned_group_index rather than by this group's gaq_idx -- confirm.
+ Slave_job_group *ptr_g= (Slave_job_group *)
+ dynamic_array_ptr(&c_rli->gaq->Q, c_rli->gaq->assigned_group_index);
+ if (ptr_g->group_relay_log_name != NULL)
+ {
+ // memorizing a new relay-log file name in the Worker's own buffer
+ DBUG_ASSERT(strlen(ptr_g->group_relay_log_name) + 1
+ <= sizeof(group_relay_log_name));
+ strcpy(group_relay_log_name, ptr_g->group_relay_log_name);
+ // Coordinator my_malloc()-ed the name, hence my_free() and not delete
+ my_free(ptr_g->group_relay_log_name);
+ ptr_g->group_relay_log_name= NULL; // mark freed
+ }
last_group_done_index = gaq_idx;
+ }
for (i= curr_group_exec_parts.elements; i > 0; i--)
{
@@ -448,7 +481,7 @@ void Slave_worker::slave_worker_ends_gro
my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
(uchar*) key + 1, key[0]);
- DBUG_ASSERT(entry && entry->usage != 0 && entry->worker == this);
+ DBUG_ASSERT(entry && entry->usage != 0);
DBUG_ASSERT(strlen(key + 1) == (uchar) key[0]);
@@ -457,7 +490,11 @@ void Slave_worker::slave_worker_ends_gro
(uchar*) key + 1, key[0]);
if (entry->usage == 0)
+ {
usage_partition--;
+ if (entry->worker != this) // Coordinator is waiting
+ mysql_cond_signal(&slave_worker_hash_cond);
+ }
else
DBUG_ASSERT(usage_partition != 0);
/*
@@ -644,3 +681,45 @@ ulong Slave_committed_queue::move_queue_
return cnt;
}
+
+
+/**
+ Make Coordinator wait until Workers have released every partition of the
+ db-to-worker hash that is still in use.
+
+ @param rli Coordinator's Relay_log_info; its info_thd does the waiting
+ @return the number of partitions Coordinator had to wait for
+*/
+int wait_for_workers_to_finish(Relay_log_info const *rli)
+{
+ uint ret= 0;
+ HASH *hash= &mapping_db_to_worker;
+ THD *thd= rli->info_thd;
+ const char info_format[]=
+ "Waiting for Slave Worker %d to release partition `%s`";
+ for (uint i= 0; i < hash->records; i++)
+ {
+ db_worker *entry;
+ const char *proc_info;
+ char wait_info[sizeof(info_format) + 4*sizeof(entry->worker->id) +
+ NAME_LEN + 1];
+
+ mysql_mutex_lock(&slave_worker_hash_lock);
+ entry= (db_worker*) my_hash_element(hash, i);
+ DBUG_ASSERT(entry);
+ if (!(entry->usage > 0))
+ {
+ mysql_mutex_unlock(&slave_worker_hash_lock);
+ continue;
+ }
+ sprintf(wait_info, info_format, entry->worker->id, entry->db);
+ entry->worker= NULL; // a Worker signals when entry->worker != itself
+ proc_info= thd->enter_cond(&slave_worker_hash_cond, &slave_worker_hash_lock,
+ wait_info);
+ while (entry->usage > 0 && !thd->killed) // tolerate stray wakeups
+ mysql_cond_wait(&slave_worker_hash_cond, &slave_worker_hash_lock);
+ thd->exit_cond(proc_info); // NOTE(review): presumably unlocks the mutex
+ ret++;
+ }
+ return ret;
+}
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-11-25 09:03:54 +0000
+++ b/sql/rpl_rli_pdb.h 2010-11-27 15:36:50 +0000
@@ -24,6 +24,7 @@ bool init_hash_workers(ulong slave_paral
void destroy_hash_workers();
Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli);
Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
+int wait_for_workers_to_finish(Relay_log_info const *rli);
#define SLAVE_WORKER_QUEUE_SIZE 8096
#define SLAVE_INIT_DBS_IN_GROUP 4 // initial allocation for CGEP dynarray
@@ -90,9 +91,19 @@ public:
typedef struct st_slave_job_group
{
- //struct event_coordinates coord;
- my_off_t pos; // filename in Slave_committed_queue::current_binlog[]
+ my_off_t master_log_pos;
+ /*
+ When the relay log (RL) name changes, C allocates and fills in the new
+ RL name; otherwise it fills in NULL.
+ C keeps track of whether each Worker has been notified of the update
+ to make sure the routine runs once per change.
+
+ W checks the value at commit and memorizes a non-NULL name,
+ freeing C's allocation once it is copied. The memorized value
+ plays its role at commit until a new one has arrived.
+ */
+ char *group_relay_log_name;
ulong worker_id;
ulonglong total_seqno;
} Slave_job_group;
@@ -106,9 +117,6 @@ class Slave_committed_queue : public cir
{
public:
- /* Allocation of file_name that is common for all Slave_assigned_job_group:s */
- char current_binlog[FN_REFLEN];
-
/* master's Rot-ev exec */
void update_current_binlog(const char *post_rotate);
@@ -128,7 +136,6 @@ public:
: circular_buffer_queue(el_size, max, inc)
{
uint k;
- strmake(current_binlog, log, sizeof(current_binlog) - 1);
my_init_dynamic_array(&last_done, sizeof(s), n, 0);
for (k= 0; k < n; k++)
insert_dynamic(&last_done, (uchar*) &s); // empty for each Worker
@@ -187,6 +194,7 @@ public:
ulong trans_jobs; // how many jobs per trns
volatile int curr_jobs; // the current assignments
ulong usage_partition; // number of different partitions handled by this worker
+ volatile bool relay_log_change_notified; // Coord sets and resets, W can read
/*
We need to make this a dynamic field. /Alfranio
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-11-27 09:17:41 +0000
+++ b/sql/rpl_slave.cc 2010-11-27 17:17:41 +0000
@@ -2625,7 +2625,7 @@ int apply_event_and_update_pos(Log_event
if (!rli->is_in_group() && rli->slave_exec_mode != slave_exec_mode_options)
rli->slave_exec_mode= slave_exec_mode_options;
- int reason= ev->shall_skip(rli);
+ int reason= ev->shall_skip(rli); // TODO: MTS skip handling
if (reason == Log_event::EVENT_SKIP_COUNT)
{
sql_slave_skip_counter= --rli->slave_skip_counter;
@@ -2687,7 +2687,7 @@ int apply_event_and_update_pos(Log_event
// It was served so inside apply_event() above.
// The main RLI table is safe to update now.
-// if (!rli->is_parallel_exec() || ev->only_serial_exec())
+// if (!rli->is_parallel_exec() || ev->only_sequential_exec())
error= ev->update_pos(rli);
#if 1
@@ -2888,8 +2888,9 @@ static int exec_relay_log_event(THD* thd
if (thd->variables.binlog_rows_query_log_events)
handle_rows_query_log_event(ev, rli);
- if (!rli->is_parallel_exec() && !ev->only_serial_exec() &&
- ev->get_type_code() != ROWS_QUERY_LOG_EVENT) // mts todo: check this case
+ if ((!rli->is_parallel_exec() ||
+ ev->only_sequential_exec(rli->run_query_in_parallel, rli->curr_group_is_parallel))
+ && ev->get_type_code() != ROWS_QUERY_LOG_EVENT) // mts TODO: check this case
{
DBUG_PRINT("info", ("Deleting the event after it has been executed"));
@@ -3608,11 +3609,9 @@ err:
mysql_mutex_unlock(&LOCK_thread_count);
}
- delete w->w_rli; // fixme: experimenting
-
my_thread_end();
pthread_exit(0);
- DBUG_RETURN(0);
+ DBUG_RETURN(0);
}
/**
@@ -3636,11 +3635,13 @@ int slave_start_single_worker(Relay_log_
// fixme: experimenting to make Workers to run ev->update_pos(w->w_rli)
// fixme: a real hack! part of Rpl_info_factory::create_rli(RLI_REPOSITORY_FILE, FALSE);
w->w_rli= new Relay_log_info(FALSE);
- Rpl_info_dummy *dummy_handler= new Rpl_info_dummy(FALSE);
+ Rpl_info_dummy *dummy_handler= new Rpl_info_dummy(TRUE);
w->w_rli->set_rpl_info_handler(dummy_handler);
ulong key_worker_idx[]= { server_id, w->id };
w->init_info(key_worker_idx, NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER);
+ w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
+
// ALFRANIO --> The recovery procedure must be introduced here.
w->w_rli->workers= rli->workers; // shallow copying is sufficient
@@ -3720,7 +3721,7 @@ int slave_start_workers(Relay_log_info *
rli->mts_total_groups= 0;
rli->slave_worker_is_error= NULL;
rli->curr_group_seen_begin= NULL;
-
+ rli->run_query_in_parallel= opt_slave_run_query_in_parallel;
for (i= 0; i < n; i++)
{
if ((error= slave_start_single_worker(rli, i)))
@@ -3793,6 +3794,8 @@ void slave_stop_workers(Relay_log_info *
delete_dynamic(&w->jobs.Q);
delete_dynamic(&w->curr_group_exec_parts); // GCEP
delete_dynamic_element(&rli->workers, i);
+ delete w->w_rli;
+
delete w;
}
@@ -5360,6 +5363,13 @@ static Log_event* next_event(Relay_log_i
rli->flush_info(key_info_idx, key_info_size);
}
+ /* Reset the relay-log-change-notified status of Slave Workers */
+ for (uint i= 0; i < rli->workers.elements; i++) // index starts at 0
+ {
+ Slave_worker *w= (Slave_worker *) dynamic_array_ptr(&rli->workers, i);
+ w->relay_log_change_notified= FALSE;
+ }
+
/*
Now we want to open this next log. To know if it's a hot log (the one
being written by the I/O thread now) or a cold log, we can use
=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc 2010-11-27 09:17:41 +0000
+++ b/sql/sys_vars.cc 2010-11-27 17:17:41 +0000
@@ -3052,6 +3052,12 @@ static Sys_var_mybool Sys_slave_local_ti
"time value to implicitly affected timestamp columms. Otherwise (default) "
"installs prescribed by the master value.",
GLOBAL_VAR(slave_local_timestamp_opt), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+static Sys_var_mybool Sys_slave_run_query_in_parallel(
+ "slave_run_query_in_parallel",
+ "The default not an actual database name is used as partition info "
+ "for parallel execution of Query_log_event ",
+ GLOBAL_VAR(opt_slave_run_query_in_parallel), CMD_LINE(OPT_ARG),
+ DEFAULT(FALSE));
#endif
static bool check_locale(sys_var *self, THD *thd, set_var *var)
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101127171741-fwobpf952r5jdudi.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr.crash-safe branch (andrei.elkin:3219) | Andrei Elkin | 27 Nov |