#At file:///home/andrei/MySQL/BZR/2a-23May/WL/wl5563-paraslave_part_db/ based on revid:aelkin@stripped
3200 Andrei Elkin 2010-09-17
wl#5563
improved test;
fixed a delete issue that was used to crash;
added @@global.slave_local_timestamp to fill in timestamp col slave clock value.
Performance growth can be seen through the test.
todo: merge with Alfranio work on hashing and dyn alloc of PFS obj:s.
modified:
mysql-test/suite/rpl/r/rpl_parallel.result
mysql-test/suite/rpl/t/rpl_parallel.test
sql/log_event.cc
sql/mysqld.cc
sql/mysqld.h
sql/rpl_rli.h
sql/rpl_slave.cc
sql/sys_vars.cc
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel.result 2010-09-15 11:51:49 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result 2010-09-17 08:49:00 +0000
@@ -4,37 +4,16 @@ reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
+call mtr.add_suppression('Parallel Slave: pending jobs get to the max');
include/stop_slave.inc
set @save.slave_exec_mode= @@global.slave_exec_mode;
set @@global.slave_exec_mode = 'Parallel';
-start slave io_thread;
-create database test3;
-use test3;
-create table tm_nk (a int, b int) engine=myisam;
-create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
-create table ti_nk (a int, b int) engine=innodb;
-create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
-create database test2;
-use test2;
-create table tm_nk (a int, b int) engine=myisam;
-create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
-create table ti_nk (a int, b int) engine=innodb;
-create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
-create database test1;
-use test1;
-create table tm_nk (a int, b int) engine=myisam;
-create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
-create table ti_nk (a int, b int) engine=innodb;
-create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
-create database test0;
-use test0;
-create table tm_nk (a int, b int) engine=myisam;
-create table tm_wk (a int auto_increment primary key, b int) engine=myisam;
-create table ti_nk (a int, b int) engine=innodb;
-create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
-start slave sql_thread;
+start slave;
+stop slave sql_thread;
+*** you can connect and change the exec mode as well now ***
+*** and select * from benchmark before to run consistency check ***
+Comparing tables master:test3.tm_nk and slave:test3.tm_nk
+Comparing tables master:test2.tm_nk and slave:test2.tm_nk
+Comparing tables master:test1.tm_nk and slave:test1.tm_nk
Comparing tables master:test0.tm_nk and slave:test0.tm_nk
-Comparing tables master:test0.tm_wk and slave:test0.tm_wk
-Comparing tables master:test0.ti_nk and slave:test0.ti_nk
-Comparing tables master:test0.ti_wk and slave:test0.ti_wk
set @@global.slave_exec_mode= @save.slave_exec_mode;
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel.test 2010-09-15 11:51:49 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test 2010-09-17 08:49:00 +0000
@@ -1,13 +1,16 @@
source include/master-slave.inc;
let $workers = 4;
-let $iter = 50;
+let $iter = 5000;
connection slave;
+
+call mtr.add_suppression('Parallel Slave: pending jobs get to the max');
+
source include/stop_slave.inc;
set @save.slave_exec_mode= @@global.slave_exec_mode;
set @@global.slave_exec_mode = 'Parallel';
-start slave io_thread;
+start slave;
connection master;
@@ -28,9 +31,6 @@ begin
end|
delimiter ;|
---enable_result_log
---enable_query_log
-
## let $i = $workers + 1;
##eval
# delimiter |;
@@ -72,17 +72,77 @@ while($i)
create table ti_nk (a int, b int) engine=innodb;
create table ti_wk (a int auto_increment primary key, b int) engine=innodb;
+ # this table is special - just for timing. It's more special on test0 db
+ # where it contains master timing of the load as well.
+ create table benchmark (state text) engine=myisam; # timestamp keep on the slave side
+
dec $i;
}
+--enable_result_log
+--enable_query_log
+
+
+#connection slave;
+sync_slave_with_master;
+
+--disable_query_log
+--disable_result_log
+
+let $i = $workers + 1;
+while($i)
+{
+ let $i1 = $i;
+ dec $i1;
+ eval use test$i1;
+ alter table benchmark add ts timestamp not null default current_timestamp;
+
+ dec $i;
+}
+--enable_result_log
+--enable_query_log
+
+
+# not gather events into relay log w/o executing yet
+stop slave sql_thread;
+
##call p1(1);
+connection master;
+
--disable_query_log
--disable_result_log
#
# Load producer
#
+
+# initial timestamp to record
+
+# the extra ts col on slave is effective only with the STMT format (todo: bug-report)
+set @save.binlog_format= @@session.binlog_format;
+set @@session.binlog_format=STATEMENT;
+let $i = $workers + 1;
+while($i)
+{
+ let $i1 = $i;
+ dec $i1;
+ eval use test$i1;
+
+ insert into benchmark set state='slave takes on load';
+
+ dec $i;
+}
+set @@session.binlog_format= @save.binlog_format;
+
+connection slave;
+
+use test0;
+insert into benchmark set state='master started load';
+
+
+connection master;
+
while ($iter)
{
let $i = $workers + 1;
@@ -105,6 +165,30 @@ while ($iter)
dec $iter;
}
+connection slave;
+
+use test0;
+insert into benchmark set state='master ends load';
+
+connection master;
+
+# terminal timestamp to record
+
+let $i = $workers + 1;
+set @save.binlog_format= @@session.binlog_format;
+set @@session.binlog_format=STATEMENT;
+while($i)
+{
+ let $i1 = $i;
+ dec $i1;
+ eval use test$i1;
+
+ insert into benchmark set state='slave ends load';
+
+ dec $i;
+}
+set @@session.binlog_format= @save.binlog_format;
+
--enable_result_log
--enable_query_log
@@ -113,26 +197,35 @@ connection slave;
## todo: record start and end time of appying to compare times of
# parallel and serial execution.
-start slave sql_thread;
+--disable_query_log
+--disable_result_log
+
+### --sleep 15 # todo: convert to wait for the last event has been applied
+
+--echo *** you can connect and change the exec mode as well now ***
+--echo *** and select * from benchmark before to run consistency check ***
+
+insert into test0.benchmark set state='slave is processing load';
---sleep 5 # todo: convert to wait for the last event has been applied
+select sleep(10000);
+### start slave sql_thread;
-let $diff_table_1=master:test0.tm_nk;
-let $diff_table_2=slave:test0.tm_nk;
-source include/diff_tables.inc;
-let $diff_table_1=master:test0.tm_wk;
-let $diff_table_2=slave:test0.tm_wk;
-source include/diff_tables.inc;
+let $i = $workers + 1;
+while($i)
+{
+ let $i1 = $i;
+ dec $i1;
-let $diff_table_1=master:test0.ti_nk;
-let $diff_table_2=slave:test0.ti_nk;
-source include/diff_tables.inc;
+ let $diff_table_1=master:test$i1.tm_nk;
+ let $diff_table_2=slave:test$i1.tm_nk;
+ source include/diff_tables.inc;
-let $diff_table_1=master:test0.ti_wk;
-let $diff_table_2=slave:test0.ti_wk;
-source include/diff_tables.inc;
+ dec $i;
+}
+--enable_result_log
+--enable_query_log
connection master;
@@ -156,6 +249,7 @@ connection slave;
set @@global.slave_exec_mode= @save.slave_exec_mode;
connection master;
-### select sleep(10000);
+
+###select sleep(10000);
# End of 4.1 tests
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-09-15 11:51:49 +0000
+++ b/sql/log_event.cc 2010-09-17 08:49:00 +0000
@@ -2167,7 +2167,10 @@ void append_item_to_jobs(struct slave_jo
mysql_mutex_lock(&w->jobs_lock);
if (!w->thd->is_slave_error) // error check exists at stmt commit as well
{
- w->jobs.push_back(job_item, &w->mem_root);
+ int err;
+ err= w->jobs.push_back(job_item, &w->mem_root);
+ DBUG_ASSERT(!err);
+
w->curr_jobs++;
if (w->jobs.elements == 1)
mysql_cond_signal(&w->jobs_cond);
@@ -2185,11 +2188,6 @@ void append_item_to_jobs(struct slave_jo
scheduling event execution either serially or in parallel
*/
int Log_event::apply_event(Relay_log_info const *rli)
-#if 0
-{
- return do_apply_event(rli);
-}
-#else
{
uint i_w;
DBUG_ENTER("LOG_EVENT:apply_event");
@@ -2208,7 +2206,6 @@ int Log_event::apply_event(Relay_log_inf
mysql_mutex_lock(&const_cast<Relay_log_info*>(rli)->pending_jobs_lock);
job_item= const_cast<Relay_log_info*>(rli)->free_jobs.pop();
mysql_mutex_unlock(&const_cast<Relay_log_info*>(rli)->pending_jobs_lock);
- DBUG_PRINT("Log_event::apply_event:", ("-> job item: %p data %p to W_%d", job_item, this, i_w));
if (!job_item)
{
/*
@@ -2219,13 +2216,14 @@ int Log_event::apply_event(Relay_log_inf
job_item= (struct slave_job_item *)
alloc_root(&const_cast<Relay_log_info*>(rli)->job_list_mem_root,
sizeof(struct slave_job_item));
+ DBUG_PRINT("Log_event::apply_event:", ("allocated: %p item", job_item));
}
+ DBUG_PRINT("Log_event::apply_event:", ("-> job item: %p data %p to W_%d", job_item, this, i_w));
job_item->data= this;
append_item_to_jobs(job_item, w, const_cast<Relay_log_info*>(rli));
DBUG_RETURN(0);
}
-#endif
/**
Worker's routine to wait for a new assignement in its
@@ -2274,6 +2272,7 @@ int slave_worker_exec_job(struct slave_w
struct slave_job_item *job_item;
THD *thd= w->thd;
Log_event *ev= NULL;
+ int err;
DBUG_ENTER("slave_worker_exec_job");
@@ -2284,6 +2283,7 @@ int slave_worker_exec_job(struct slave_w
goto err;
}
ev= static_cast<Log_event*>(job_item->data);
+ DBUG_PRINT("slave_worker_exec_job:", ("W_%lu <- job item: %p data %p", w->id, job_item, ev));
thd->server_id = ev->server_id;
thd->set_time();
@@ -2300,9 +2300,15 @@ int slave_worker_exec_job(struct slave_w
rli->pending_jobs--;
DBUG_ASSERT(rli->pending_jobs < rli->slave_pending_jobs_max &&
rli->pending_jobs >= 0);
- rli->free_jobs.push_back(job_item, &rli->job_list_mem_root);
- DBUG_PRINT("slave_worker_exec_job:", ("W_%lu <- job item: %p data %p", w->id, job_item, ev));
-
+
+ // todo: move afront of applying
+ DBUG_PRINT("slave_worker_exec_job:", ("W_%lu -> job item: %p data %p", w->id, job_item, ev));
+#ifndef DBUG_OFF
+ job_item->data= (char*) w->id; // mark as used by worker
+#endif
+ err= rli->free_jobs.push_back(job_item, &rli->job_list_mem_root);
+ DBUG_ASSERT(!err);
+
/* coordinator can be waiting */
if (rli->pending_jobs == rli->slave_pending_jobs_max - 1 ||
@@ -2318,17 +2324,31 @@ int slave_worker_exec_job(struct slave_w
}
else
{
- // Accumulating events is supposed to be rare.
- // todo: add a private memroot, max length param, overrun guard, statistics
- Log_event *used_ev;
- w->data_in_use.push_back(ev);
- while (w->data_in_use.is_empty() &&
- (used_ev=
- static_cast<Log_event *>(w->data_in_use.first_node()->info))->soiled)
- {
- DBUG_PRINT("slave_worker_exec_job GC:", ("W_%lu, event %p", w->id, used_ev));
- delete used_ev;
- w->data_in_use.pop();
+ // Accumulating to be-deleted events is supposed to be pretty rare,
+ // still necessary.
+ // todo: convert ev->update_pos() into rli->update_pos() and get rid of this
+ // workaround.
+
+ mysql_mutex_lock(&w->jobs_lock);
+ w->data_in_use.push_back(ev, &w->mem_root);
+ mysql_mutex_unlock(&w->jobs_lock);
+
+ while (!w->data_in_use.is_empty())
+ {
+ struct slave_job_item *item=
+ static_cast<struct slave_job_item *>(w->data_in_use.first_node()->info);
+ Log_event *used_ev= static_cast<Log_event *>(item->data);
+
+ if (used_ev->soiled)
+ {
+ DBUG_PRINT("GC:", ("W_%lu, event %p", w->id, used_ev));
+ delete used_ev;
+ w->data_in_use.pop();
+ }
+ else
+ {
+ break;
+ }
}
}
@@ -3481,7 +3501,7 @@ int Query_log_event::do_apply_event(Rela
*/
if (is_trans_keyword() || rpl_filter->db_ok(thd->db))
{
- thd->set_time((time_t)when);
+ thd->set_time(!slave_local_timestamp_opt ? (time_t)when : my_time(0));
thd->set_query_and_id((char*)query_arg, q_len_arg, next_query_id());
thd->variables.pseudo_thread_id= thread_id; // for temp tables
DBUG_PRINT("query",("%s", thd->query()));
@@ -5040,7 +5060,7 @@ int Load_log_event::do_apply_event(NET*
*/
if (rpl_filter->db_ok(thd->db))
{
- thd->set_time((time_t)when);
+ thd->set_time(!slave_local_timestamp_opt ? (time_t)when : my_time(0));
thd->set_query_id(next_query_id());
thd->warning_info->opt_clear_warning_info(thd->query_id);
@@ -7931,7 +7951,7 @@ int Rows_log_event::do_apply_event(Relay
TIMESTAMP column to a table with one.
So we call set_time(), like in SBR. Presently it changes nothing.
*/
- thd->set_time((time_t)when);
+ thd->set_time(!slave_local_timestamp_opt ? (time_t)when : my_time(0));
/*
Now we are in a statement and will stay in a statement until we
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2010-09-09 18:43:16 +0000
+++ b/sql/mysqld.cc 2010-09-17 08:49:00 +0000
@@ -461,6 +461,7 @@ uint slave_net_timeout;
ulong slave_exec_mode_options;
ulonglong slave_type_conversions_options;
ulong slave_parallel_workers;
+my_bool slave_local_timestamp_opt;
ulong thread_cache_size=0;
ulong binlog_cache_size=0;
ulonglong max_binlog_cache_size=0;
=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h 2010-09-09 18:43:16 +0000
+++ b/sql/mysqld.h 2010-09-17 08:49:00 +0000
@@ -173,6 +173,7 @@ extern LEX_CSTRING reason_slave_blocked;
extern ulong slave_trans_retries;
extern uint slave_net_timeout;
extern ulong slave_parallel_workers;
+extern my_bool slave_local_timestamp_opt;
extern uint max_user_connections;
extern ulong what_to_log,flush_time;
extern ulong max_prepared_stmt_count, prepared_stmt_count;
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2010-09-13 10:15:38 +0000
+++ b/sql/rpl_rli.h 2010-09-17 08:49:00 +0000
@@ -28,7 +28,6 @@ class Master_info;
extern uint sql_slave_skip_counter;
#define SLAVE_WORKER_QUEUE_SIZE 8096
-#define PARA_SLAVE_JOB_PARAM_QUEUE_SIZE 1024
struct slave_job_item
{
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-09-13 10:15:38 +0000
+++ b/sql/rpl_slave.cc 2010-09-17 08:49:00 +0000
@@ -5016,7 +5016,8 @@ static Log_event* next_event(Relay_log_i
DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos);
// mts-II: consider the following if the above fails:
DBUG_ASSERT(my_b_tell(cur_log) == rli->event_relay_log_pos ||
- bit_is_set(slave_exec_mode_options, SLAVE_EXEC_MODE_PARALLEL) == 1);
+ //bit_is_set(slave_exec_mode_options, SLAVE_EXEC_MODE_PARALLEL) == 1);
+ slave_exec_mode_options == SLAVE_EXEC_MODE_PARALLEL); // todo: fix slave_exec_mode enum->set
}
#endif
=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc 2010-09-09 18:43:16 +0000
+++ b/sql/sys_vars.cc 2010-09-17 08:49:00 +0000
@@ -3049,6 +3049,11 @@ static Sys_var_ulong Sys_slave_parallel_
"Number of worker threads for executing events in parallel ",
GLOBAL_VAR(slave_parallel_workers), CMD_LINE(REQUIRED_ARG),
VALID_RANGE(0, ULONG_MAX), DEFAULT(4), BLOCK_SIZE(1));
+static Sys_var_mybool Sys_slave_local_timestamp(
+ "slave_local_timestamp", "if enabled slave computes the event appying "
+ "time value to implicitly affected timestamp columms. Otherwise (default) "
+ "installs prescribed by the master value.",
+ GLOBAL_VAR(slave_local_timestamp_opt), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
#endif
static bool check_locale(sys_var *self, THD *thd, set_var *var)
Attachment: [text/bzr-bundle] bzr/aelkin@mysql.com-20100917084900-n4pohdldt6st9rws.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr branch (aelkin:3200) WL#5563 | Andrei Elkin | 17 Sep |