#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3283 Andrei Elkin 2011-05-28
wl#5569 MTS
Implementation of giving out the applier role to Worker for all cases but
ones dealing with the Coordinators state.
That includes Query event with over-max-db:s and Load-data related events.
The current patch also makes old master binlog be handled by MTS though
sometimes e.g for Query event to switch to the sequential mode.
Fixing a race condition making C to wait endlessly if a Worker has exitted due to its
applying error.
@ mysql-test/suite/rpl/r/rpl_parallel_ddl.result
results updated.
@ mysql-test/suite/rpl/r/rpl_parallel_multi_db.result
results updated.
@ mysql-test/suite/rpl/r/rpl_parallel_temp_query.result
results updated.
@ mysql-test/suite/rpl/t/disabled.def
Restoring tree tests as this patch makes them runable.
@ mysql-test/suite/rpl/t/rpl_deadlock_innodb.test
test can't run in MTS because of trans retry.
@ mysql-test/suite/rpl/t/rpl_dual_pos_advance.test
test can't run in MTS because of Until option of START SLAVE is not yet supported by MTS.
@ mysql-test/suite/rpl/t/rpl_parallel_ddl-slave.opt
rpl_parallel tests need --slave-transaction-retries=0
@ mysql-test/suite/rpl/t/rpl_parallel_innodb-slave.opt
rpl_parallel tests need --slave-transaction-retries=0
@ mysql-test/suite/rpl/t/rpl_parallel_multi_db-slave.opt
rpl_parallel tests need --slave-transaction-retries=0
@ mysql-test/suite/rpl/t/rpl_parallel_temp_query-slave.opt
rpl_parallel tests need --slave-transaction-retries=0
@ sql/event_parse_data.cc
Pleasing some tests.
@ sql/log_event.cc
Making a group of event w/o B/C braces be handled by Worker.
Such group can happen from an old master or the current master bilogging
some SP queries.
Also over-max db:s events are made to be handled by Worker.
Coordinator only handles asyncrounously events dealing with Relay-log state
and synchrounously events dealing with checkpoint changes (master-group coordinates).
Also few types of events from OM are left to Coordinator to execute.
@ sql/log_event.h
Leaving in mts_sequential_exec() only events that either
can deal with Coordinator state, or are from old master.
Making Query_log_event::mts_get_dbs to return a list with
a magic ""-empty string partition name
in case of over-max db:s query.
The empty magic is converted into a record to APH to indicate
the whole hash records lock.
@ sql/rpl_rli_pdb.cc
Changes due to redifining an object responsible to hold assigned partitions
in few methods incl Slave_worker::slave_worker_ends_group().
Some cleanup in get_slave_worker().
@ sql/rpl_rli_pdb.h
Redifining an object responsible to hold assigned partitions.
Now it's a Dyn-array holding *pointers* to records on Assigned Partition Hash.
That simplifies few routines for Worker. E.g search for the records (entries of APH) by Worker at time
of committing.
@ sql/rpl_slave.cc
Streamlining Workers state identification with a boolean running_status;
worker start and stop are controlled by means of the disignator.
added:
mysql-test/suite/rpl/t/rpl_parallel_ddl-slave.opt
mysql-test/suite/rpl/t/rpl_parallel_temp_query-slave.opt
modified:
mysql-test/suite/rpl/r/rpl_parallel_ddl.result
mysql-test/suite/rpl/r/rpl_parallel_multi_db.result
mysql-test/suite/rpl/r/rpl_parallel_temp_query.result
mysql-test/suite/rpl/t/disabled.def
mysql-test/suite/rpl/t/rpl_deadlock_innodb.test
mysql-test/suite/rpl/t/rpl_dual_pos_advance.test
mysql-test/suite/rpl/t/rpl_parallel_innodb-slave.opt
mysql-test/suite/rpl/t/rpl_parallel_multi_db-slave.opt
sql/event_parse_data.cc
sql/log_event.cc
sql/log_event.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_ddl.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_ddl.result 2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_ddl.result 2011-05-27 21:29:14 +0000
@@ -4,8 +4,6 @@ include/stop_slave.inc
set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
set @@global.mts_slave_parallel_workers= 4;
include/start_slave.inc
-Warnings:
-Note 1726 Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
include/diff_tables.inc [master:d32.t8, slave:d32.t8]
include/diff_tables.inc [master:d32.t7, slave:d32.t7]
include/diff_tables.inc [master:d32.t6, slave:d32.t6]
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_multi_db.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_multi_db.result 2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_multi_db.result 2011-05-27 21:29:14 +0000
@@ -4,8 +4,6 @@ include/stop_slave.inc
set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
set @@global.mts_slave_parallel_workers= 4;
include/start_slave.inc
-Warnings:
-Note 1726 Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
create database d8;
create table d8.t8 (a int);
select round(rand()*8) into @var;
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_temp_query.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_temp_query.result 2011-05-25 07:36:36 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_temp_query.result 2011-05-27 21:29:14 +0000
@@ -6,8 +6,6 @@ include/stop_slave.inc
set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
set @@global.mts_slave_parallel_workers= 4;
include/start_slave.inc
-Warnings:
-Note 1726 Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
create database d2;
use d2;
create table d2.t1 (a int auto_increment primary key, b int) engine=innodb;
=== modified file 'mysql-test/suite/rpl/t/disabled.def'
--- a/mysql-test/suite/rpl/t/disabled.def 2011-05-26 17:03:08 +0000
+++ b/mysql-test/suite/rpl/t/disabled.def 2011-05-27 21:29:14 +0000
@@ -16,6 +16,3 @@ rpl_row_event_max_size : Bug#55675 20
rpl_delayed_slave : Bug#57514 2010-11-09 andrei rpl_delayed_slave fails sporadically in pb
rpl_log_pos : BUG#55675 2010-09-10 alfranio rpl.rpl_log_pos fails sporadically with error binlog truncated in the middle
-rpl_sp_effects : bug@MTS SELECT with sf() logging does not follow the correct pattern (no BEGIN/COMMIT) Thu May 26 15:48:06 EEST 2011 Andrei
-rpl_auto_increment_bug33029 : same as rpl_sp_effects
-rpl.rpl_cross_version : same as rpl_sp_effects
=== modified file 'mysql-test/suite/rpl/t/rpl_deadlock_innodb.test'
--- a/mysql-test/suite/rpl/t/rpl_deadlock_innodb.test 2010-12-19 17:07:28 +0000
+++ b/mysql-test/suite/rpl/t/rpl_deadlock_innodb.test 2011-05-27 21:29:14 +0000
@@ -2,3 +2,5 @@
-- source include/have_innodb.inc
let $engine_type=innodb;
-- source extra/rpl_tests/rpl_deadlock.test
+# --slave-transaction-retries=0 in MTS
+-- source include/not_mts_slave_parallel_workers.inc
=== modified file 'mysql-test/suite/rpl/t/rpl_dual_pos_advance.test'
--- a/mysql-test/suite/rpl/t/rpl_dual_pos_advance.test 2010-12-19 17:22:30 +0000
+++ b/mysql-test/suite/rpl/t/rpl_dual_pos_advance.test 2011-05-27 21:29:14 +0000
@@ -7,6 +7,8 @@
# It also will test BUG#13861.
source include/have_innodb.inc;
+# Until option of START SLAVE is not yet supported by MTS
+source include/not_mts_slave_parallel_workers.inc;
--let $rpl_topology= 1->2->1
--source include/rpl_init.inc
=== added file 'mysql-test/suite/rpl/t/rpl_parallel_ddl-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel_ddl-slave.opt 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_ddl-slave.opt 2011-05-27 21:29:14 +0000
@@ -0,0 +1 @@
+--slave-transaction-retries=0
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_innodb-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel_innodb-slave.opt 2011-05-26 17:03:08 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_innodb-slave.opt 2011-05-27 21:29:14 +0000
@@ -1,4 +1,4 @@
---log-warnings=0 --slave-transaction-=0 --innodb_flush_log_at_trx_commit=0 --skip-log-bin --skip-log-slave-updates --sync_binlog=0
+--log-warnings=0 --slave-transaction-retries=0 --innodb_flush_log_at_trx_commit=0 --skip-log-bin --skip-log-slave-updates --sync_binlog=0
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_multi_db-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel_multi_db-slave.opt 2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_multi_db-slave.opt 2011-05-27 21:29:14 +0000
@@ -1 +1,2 @@
---thread_stack=512K
+--thread_stack=512K --slave-transaction-retries=0
+
=== added file 'mysql-test/suite/rpl/t/rpl_parallel_temp_query-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel_temp_query-slave.opt 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_temp_query-slave.opt 2011-05-27 21:29:14 +0000
@@ -0,0 +1 @@
+ --log-warnings=0 --slave-transaction-retries=0
=== modified file 'sql/event_parse_data.cc'
--- a/sql/event_parse_data.cc 2010-07-02 02:58:51 +0000
+++ b/sql/event_parse_data.cc 2011-05-27 21:29:14 +0000
@@ -566,6 +566,7 @@ void Event_parse_data::check_originator_
{
/* Disable replicated events on slave. */
if ((thd->system_thread == SYSTEM_THREAD_SLAVE_SQL) ||
+ (thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) ||
(thd->system_thread == SYSTEM_THREAD_SLAVE_IO))
{
DBUG_PRINT("info", ("Invoked object status set to SLAVESIDE_DISABLED."));
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2011-05-26 17:03:08 +0000
+++ b/sql/log_event.cc 2011-05-27 21:29:14 +0000
@@ -2361,7 +2361,8 @@ bool Log_event::contains_partition_info(
// todo: Query event is limitly supported
// which ev->get_db() yields the session db not the actual db
- (get_type_code() == QUERY_EVENT && !ends_group() && !starts_group());
+ (get_type_code() == QUERY_EVENT && !ends_group() && !starts_group()) ||
+ (get_type_code() == EXECUTE_LOAD_QUERY_EVENT);
}
/**
@@ -2401,13 +2402,20 @@ bool Log_event::contains_partition_info(
Slave_worker *Log_event::get_slave_worker_id(Relay_log_info *rli)
{
Slave_worker *worker= NULL;
- Slave_job_group g;
+ Slave_job_group g, *ptr_g;
bool is_b_event;
- /* checking properties and perform corresponding actions */
+ /* checking partioning properties and perform corresponding actions */
- // Beginning of a group or a DDL
- if ((is_b_event= starts_group()) || !rli->curr_group_seen_begin)
+ // Beginning of a group designated explicitly with BEGIN
+ if ((is_b_event= starts_group()) ||
+ // or DDL:s or autocommit queries possibly associated with own p-events
+ (!rli->curr_group_seen_begin &&
+ // the following is a case of no-B group: { p_1,p_2,...,p_k, g}
+ (rli->gaq->empty() ||
+ ((Slave_job_group *)
+ dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index))->
+ worker_id != (ulong) -1)))
{
ulong gaq_idx;
rli->mts_total_groups++;
@@ -2436,6 +2444,7 @@ Slave_worker *Log_event::get_slave_worke
group_relay_log_name == NULL);
DBUG_ASSERT(rli->gaq->assigned_group_index != (ulong) -1); // gaq must have room
DBUG_ASSERT(rli->last_assigned_worker == NULL);
+
if (is_b_event)
{
Log_event *ptr_curr_ev= this;
@@ -2460,60 +2469,75 @@ Slave_worker *Log_event::get_slave_worke
List_iterator<char> it(*mts_get_dbs(rli->info_thd->mem_root));
it++;
- if (num_dbs != OVER_MAX_DBS_IN_EVENT_MTS)
- {
- do
- {
- char **ref_cur_db= it.ref();
-
- if (!(rli->last_assigned_worker=
- get_slave_worker(*ref_cur_db, rli,
- &mts_assigned_partitions[i],
- get_type_code() == QUERY_EVENT)))
- {
- for (uint k= 0; k < rli->curr_group_da.elements; k++)
- {
- delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
- }
- return NULL;
- }
-
- DBUG_ASSERT(!strcmp(mts_assigned_partitions[i]->db, *ref_cur_db));
- DBUG_ASSERT(rli->last_assigned_worker ==
- mts_assigned_partitions[i]->worker);
- DBUG_ASSERT(mts_assigned_partitions[i]->usage > 0);
-
- i++;
- } while (it++);
- }
- else
+ if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
{
// Temporary tables of Coordinator are relocated by Worker
if (!rli->last_assigned_worker)
rli->last_assigned_worker=
*(Slave_worker**) dynamic_array_ptr(&rli->workers, 0);
+
+ if (!rli->curr_group_isolated)
+ (void) wait_for_workers_to_finish(rli, rli->last_assigned_worker);
+ rli->curr_group_isolated= TRUE;
}
- worker= rli->last_assigned_worker;
- get_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
- if (g.worker_id == (ulong) -1) // assign "offically" the current group
+ do
{
- g.worker_id= worker->id;
- set_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
+ char **ref_cur_db= it.ref();
+
+ if (!(rli->last_assigned_worker=
+ get_slave_worker(*ref_cur_db, rli,
+ &mts_assigned_partitions[i],
+ get_type_code() == QUERY_EVENT)))
+ {
+ // destroy buffered events of the current group prior to exit
+ for (uint k= 0; k < rli->curr_group_da.elements; k++)
+ {
+ delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
+ }
+ return NULL;
+ }
- DBUG_ASSERT(g.group_relay_log_name == NULL);
+ DBUG_ASSERT(!strcmp(mts_assigned_partitions[i]->db, *ref_cur_db));
+ DBUG_ASSERT(rli->last_assigned_worker ==
+ mts_assigned_partitions[i]->worker);
+ DBUG_ASSERT(mts_assigned_partitions[i]->usage > 0);
+
+ i++;
+ } while (it++);
+
+ worker= rli->last_assigned_worker;
+ if ((ptr_g= ((Slave_job_group *)
+ dynamic_array_ptr(&rli->gaq->Q,
+ rli->gaq->assigned_group_index)))-> worker_id
+ == (ulong) -1)
+ {
+ ptr_g->worker_id= worker->id;
+
+ DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
}
DBUG_ASSERT(i == num_dbs || num_dbs == OVER_MAX_DBS_IN_EVENT_MTS);
+ /*
+ Either old master binlog (todo: assert), or a specific "corner"
+ case (todo: wrap with BEGIN/COMMIT on master anyway) of logging
+ like SELECT sf(), where sf() has a side effect.
+ */
+ if (!rli->curr_group_seen_begin)
+ rli->curr_group_is_parallel= TRUE;
+
// TODO: convert to C's private mem_root.
// Releasing the Coord's mem-root from the updated dbs. It's safe to do at this
// point because the root is no longer needed along remained part of Coordinator's
// execution flow.
+
free_root(rli->info_thd->mem_root, MYF(MY_KEEP_PREALLOC));
}
- else // a mini-group internal "regular" event
+ else
+ {
+ // a mini-group internal "regular" event
if (rli->last_assigned_worker)
{
worker= rli->last_assigned_worker;
@@ -2527,30 +2551,36 @@ Slave_worker *Log_event::get_slave_worke
DBUG_ASSERT(get_type_code() == INTVAR_EVENT ||
get_type_code() == RAND_EVENT ||
get_type_code() == USER_VAR_EVENT ||
- get_type_code() == ROWS_QUERY_LOG_EVENT);
+ get_type_code() == ROWS_QUERY_LOG_EVENT ||
+ get_type_code() == BEGIN_LOAD_QUERY_EVENT);
insert_dynamic(&rli->curr_group_da, (uchar*) &ptr_curr_ev);
if (!rli->curr_group_seen_begin)
{
- // TODO: fix the master side to wrap with B/T cases like
- // `set @user_var, select f()' that are logged w/o B-event
- // Notice, while the select-f() can be mended in the current
- // master version, the old server binlogs can't brought to MTS because
- // of not following B|T-bracing rule for DML events.
- DBUG_ASSERT(0);
+ /*
+ This is a case of B/T-less group like
+ `set @user_var, select f()' that are logged w/o B-event.
+ Notice, while the select-f() can be mended in the current
+ master version, the old server binlogs can't since it bring in
+ the same B/T-less {p, g} group.
+ */
+ DBUG_ASSERT(rli->curr_group_da.elements > 0);
+ }
+ else
+ {
+ DBUG_ASSERT(rli->curr_group_da.elements > 1);
}
-
- DBUG_ASSERT(rli->curr_group_da.elements > 1);
+ return NULL;
}
+ }
// the group terminal event (Commit, Xid or a DDL query)
if (ends_group() || !rli->curr_group_seen_begin)
{
uint i;
mts_group_cnt= rli->gaq->assigned_group_index;
- Slave_job_group *ptr_g=
- (Slave_job_group *)
+ ptr_g= (Slave_job_group *)
dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
DBUG_ASSERT(rli->curr_group_is_parallel);
@@ -2798,7 +2828,7 @@ int Log_event::apply_event(Relay_log_inf
Slave_worker *w= NULL;
Slave_job_item item= {NULL}, *job_item= &item;
Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli); // constless alias
- bool parallel, seq_event, term_event;
+ bool parallel= FALSE, async_event= FALSE, seq_event= FALSE, term_event= FALSE;
if (rli->is_mts_recovery())
{
@@ -2820,40 +2850,65 @@ int Log_event::apply_event(Relay_log_inf
}
if (!(parallel= rli->is_parallel_exec()) ||
- ((seq_event= mts_sequential_exec()) &&
- (!rli->curr_group_seen_begin ||
- mts_async_exec_by_coordinator(::server_id))))
+ (async_event= mts_async_exec_by_coordinator(::server_id)) ||
+ (seq_event= mts_sequential_exec()))
{
if (parallel)
{
/*
There are two classes of events that Coordinator executes
- itself. One requires all Workers to finish up their assignments.
- The other does not need (actually can not have) this synchronization.
+ itself. One e.g the master Rotate requires all Workers to finish up
+ their assignments. The other async class, e.g the slave Rotate,
+ can't have this such synchronization because Worker might be waiting
+ for terminal events to finish.
*/
- if (!mts_async_exec_by_coordinator(::server_id))
- {
+ if (!async_event)
+ {
/*
this event does not split the current group but is indeed
a separator beetwen two master's binlog therefore requiring
Workers to sync.
*/
- DBUG_ASSERT(!rli->curr_group_seen_begin);
+ if (rli->curr_group_da.elements > 0)
+ {
+ /*
+ Possible reason is a old version binlog sequential event
+ wrappped with BEGIN/COMMIT or preceeded by User|Int|Random- var.
+ MTS has to stop to suggest restart in the permanent sequential mode.
+ */
+
+ // TODO: improve err msg
+ rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ ER(ER_SLAVE_FATAL_ERROR),
+ "Can't execute all binlog event in parallel mode");
+
+ // destroy possible buffered events of the current group prior to exit
+ for (uint k= 0; k < rli->curr_group_da.elements; k++)
+ {
+ delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
+ }
+ DBUG_RETURN(-1);
+ }
/*
Marking sure the event won't be executed in parallel.
That affects memory deallocation in the following execution path.
*/
c_rli->curr_group_is_parallel= FALSE;
(void) wait_for_workers_to_finish(rli);
-
- /* any Worker is idle as done through wait_for_workers_to_finish */
- DBUG_ASSERT((*(Slave_worker **)
- dynamic_array_ptr(&c_rli->workers,
- rand() % c_rli->workers.elements))->
- usage_partition == 0);
+
+#ifndef DBUG_OFF
+ /* all Workers are idle as done through wait_for_workers_to_finish */
+ for (uint k= 0; k < c_rli->curr_group_da.elements; k++)
+ {
+ DBUG_ASSERT(!(*(Slave_worker **)
+ dynamic_array_ptr(&c_rli->workers, k))->usage_partition);
+ DBUG_ASSERT(!(*(Slave_worker **)
+ dynamic_array_ptr(&c_rli->workers, k))->jobs.len);
+ }
+#endif
}
else
{
@@ -2875,13 +2930,6 @@ int Log_event::apply_event(Relay_log_inf
DBUG_ASSERT(!(rli->curr_group_seen_begin && ends_group()) ||
rli->last_assigned_worker);
- if (seq_event)
- { // rli->last_assigned_worker != NULL if BTQ but not BQT
- DBUG_ASSERT(rli->curr_group_seen_begin || ends_group());
- if (!c_rli->curr_group_isolated)
- (void) wait_for_workers_to_finish(rli, rli->last_assigned_worker);
- c_rli->curr_group_isolated= TRUE;
- }
// getting Worker's id
if ((!(w= get_slave_worker_id(c_rli)) ||
DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)))
@@ -2986,6 +3034,7 @@ int slave_worker_exec_job(Slave_worker *
struct slave_job_item item= {NULL}, *job_item= &item;
THD *thd= w->info_thd;
Log_event *ev= NULL;
+ bool part_event= FALSE;
DBUG_ENTER("slave_worker_exec_job");
@@ -3012,42 +3061,46 @@ int slave_worker_exec_job(Slave_worker *
}
else
{
- if (ev->contains_partition_info() &&
- ev->mts_number_dbs() < OVER_MAX_DBS_IN_EVENT_MTS)
+ if ((part_event= ev->contains_partition_info()))
{
- List_iterator<char> it(*ev->mts_get_dbs(thd->mem_root));
- DYNAMIC_ARRAY *ep= &(w->curr_group_exec_parts->dynamic_ids);
-
- while (it++)
+ uint num_dbs= ev->mts_number_dbs();
+ DYNAMIC_ARRAY *ep= &w->curr_group_exec_parts;
+
+ if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
+ num_dbs= 1;
+
+ DBUG_ASSERT(num_dbs > 0);
+
+ for (uint k= 0; k < num_dbs; k++)
{
bool found= FALSE;
- char key[NAME_LEN + 2];
- const char *dbname= *it.ref();
- uchar dblength= (uint) strlen(dbname);
for (uint i= 0; i < ep->elements && !found; i++)
{
- get_dynamic(ep, (uchar*) key, i);
found=
- (key[0] == dblength) &&
- (strncmp(key + 1, const_cast<char*>(dbname), dblength) == 0);
+ (* (db_worker_hash_entry **) dynamic_array_ptr(ep, i)) ==
+ ev->mts_assigned_partitions[k];
}
if (!found)
{
- key[0]= dblength;
- memcpy(key + 1, dbname, dblength + 1);
- insert_dynamic(ep, (uchar*) key);
+ insert_dynamic(ep, (uchar*) &ev->mts_assigned_partitions[k]);
}
}
}
}
w->set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
error= ev->do_apply_event_worker(w);
-
- if (ev->ends_group() || !w->curr_group_seen_begin)
- {
- DBUG_PRINT("slave_worker_exec_job:", (" commits GAQ index %lu, last committed %lu", ev->mts_group_cnt, w->last_group_done_index));
-
+ if (ev->ends_group() || (!w->curr_group_seen_begin &&
+ /*
+ p-events of B/T-less {p,g} group (see
+ legends of Log_event::get_slave_worker)
+ obviously can't commit.
+ */
+ part_event))
+ {
+ DBUG_PRINT("slave_worker_exec_job:",
+ (" commits GAQ index %lu, last committed %lu",
+ ev->mts_group_cnt, w->last_group_done_index));
w->slave_worker_ends_group(ev, error); /* last done sets post exec */
}
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2011-05-24 14:29:35 +0000
+++ b/sql/log_event.h 2011-05-27 21:29:14 +0000
@@ -1235,7 +1235,11 @@ public:
public:
/**
- MST: to execute serially due to technical or conceptual limitation
+ MST: to execute some event types serially.
+
+ @note There are incompatile combinations such the referred event
+ is wrapped with BEGIN/COMMIT. Such cases should be identified
+ by the caller and treates as an error.
@return TRUE if despite permanent parallel execution mode an event
needs applying in a real isolation that is sequentially.
@@ -1243,15 +1247,6 @@ public:
bool mts_sequential_exec()
{
return
- /*
- the 4 types below are limitly parallel-supported (the default
- session db not the actual db).
- Decision on BEGIN, COMMIT, Xid is the parallel.
- */
- (get_type_code() == QUERY_EVENT &&
- !starts_group() && !ends_group() &&
- (mts_number_dbs() == OVER_MAX_DBS_IN_EVENT_MTS)) ||
-
get_type_code() == START_EVENT_V3 ||
get_type_code() == STOP_EVENT ||
get_type_code() == ROTATE_EVENT ||
@@ -1262,12 +1257,8 @@ public:
get_type_code() == EXEC_LOAD_EVENT ||
get_type_code() == DELETE_FILE_EVENT ||
get_type_code() == NEW_LOAD_EVENT ||
+
get_type_code() == FORMAT_DESCRIPTION_EVENT||
- get_type_code() == BEGIN_LOAD_QUERY_EVENT ||
- get_type_code() == EXECUTE_LOAD_QUERY_EVENT|| /* todo: make parallel */
- get_type_code() == PRE_GA_WRITE_ROWS_EVENT ||
- get_type_code() == PRE_GA_UPDATE_ROWS_EVENT||
- get_type_code() == PRE_GA_DELETE_ROWS_EVENT||
get_type_code() == INCIDENT_EVENT;
}
@@ -1909,10 +1900,16 @@ public:
{
List<char> *res= new (mem_root) List<char>;
if (mts_accessed_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
- res->push_back((char*) get_db());
+ {
+ // "" == empty string db name is special to indicate sequential applying
+ mts_accessed_db_names[0][0]= 0;
+ res->push_back((char*) mts_accessed_db_names[0]);
+ }
else
+ {
for (uchar i= 0; i < mts_accessed_dbs; i++)
res->push_back(mts_accessed_db_names[i]);
+ }
return res;
}
@@ -3070,7 +3067,8 @@ public:
bool write(IO_CACHE* file);
const char* get_db() { return db; }
#endif
-
+ /* MTS executes this event sequentially */
+ virtual uchar mts_number_dbs() { return OVER_MAX_DBS_IN_EVENT_MTS; }
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli);
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2011-05-26 17:03:08 +0000
+++ b/sql/rpl_rli_pdb.cc 2011-05-27 21:29:14 +0000
@@ -29,9 +29,9 @@ const char *info_slave_worker_fields []=
Slave_worker::Slave_worker(const char* type, const char* pfs,
Relay_log_info *rli)
- : Relay_log_info(FALSE), c_rli(rli), curr_group_exec_parts(0),
- checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0),
- inited_group_execed(0)
+ : Relay_log_info(FALSE), c_rli(rli),
+ checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0),
+ inited_group_execed(0), running_status(FALSE)
{
checkpoint_relay_log_name[0]= 0;
checkpoint_master_log_name[0]= 0;
@@ -39,8 +39,7 @@ Slave_worker::Slave_worker(const char* t
Slave_worker::~Slave_worker()
{
- if (curr_group_exec_parts)
- delete curr_group_exec_parts;
+ delete_dynamic(&curr_group_exec_parts);
if (inited_group_execed)
bitmap_free(&group_execed);
@@ -55,8 +54,8 @@ int Slave_worker::init_info()
if (inited)
DBUG_RETURN(0);
- if (!(curr_group_exec_parts= new Database_ids(NAME_LEN)))
- goto err;
+ my_init_dynamic_array(&curr_group_exec_parts, sizeof(db_worker_hash_entry*),
+ SLAVE_INIT_DBS_IN_GROUP, 1);
if (bitmap_init(&group_execed, NULL,
c_rli->checkpoint_group, FALSE))
@@ -273,7 +272,7 @@ static void free_entry(db_worker_hash_en
DBUG_PRINT("info", ("free_entry %s, %d", entry->db, (int) strlen(entry->db)));
DBUG_ASSERT(c_thd->system_thread == SYSTEM_THREAD_SLAVE_SQL);
- DBUG_ASSERT(entry->usage == 0);
+ DBUG_ASSERT(entry->usage == 0 || !entry->worker->running_status);
mts_move_temp_tables_to_thd(c_thd, entry->temporary_tables);
entry->temporary_tables= NULL;
@@ -454,6 +453,8 @@ static void move_temp_tables_to_entry(TH
scheduled the event, and goes back into the parallel mode
@param dbname pointer to c-string containing database name
+ It can be empty string to indicate specific locking
+ to faciliate sequential applying.
@param rli pointer to Coordinators relay-log-info instance
@param ptr_entry reference to a pointer to the resulted entry in
the Assigne Partition Hash where
@@ -481,7 +482,6 @@ Slave_worker *get_slave_worker(const cha
my_hash_value_type hash_value;
uchar dblength= (uint) strlen(dbname);
- DBUG_ASSERT(dblength != 0);
// Search in CGAP
for (i= 0; i < rli->curr_group_assigned_parts.elements; i++)
@@ -545,11 +545,6 @@ Slave_worker *get_slave_worker(const cha
entry->worker= !rli->last_assigned_worker ?
get_least_occupied_worker(workers) : rli->last_assigned_worker;
entry->worker->usage_partition++;
- /*
- relocation belonging to db temp tables from C to W via entry
- */
- if (need_temp_tables)
- move_temp_tables_to_entry(thd, entry);
mysql_mutex_lock(&slave_worker_hash_lock);
@@ -574,8 +569,6 @@ Slave_worker *get_slave_worker(const cha
ret= my_hash_insert(&mapping_db_to_worker, (uchar*) entry);
- mysql_mutex_unlock(&slave_worker_hash_lock);
-
if (ret)
{
my_free(db);
@@ -634,26 +627,36 @@ Slave_worker *get_slave_worker(const cha
entry->usage= 1;
entry->worker->usage_partition++;
}
+ }
- if (entry->usage == 1 && need_temp_tables)
+ /*
+ relocation belonging to db temp tables from C to W via entry
+ */
+ if (entry->usage == 1 && need_temp_tables)
+ {
+ if (!entry->temporary_tables)
{
- if (!entry->temporary_tables)
+ if (entry->db_len != 0)
{
move_temp_tables_to_entry(thd, entry);
}
-#ifndef DBUG_OFF
else
{
- for (TABLE *table= thd->temporary_tables; table; table= table->next)
- {
- DBUG_ASSERT(0 != strcmp(table->s->db.str, entry->db));
- }
+ entry->temporary_tables= thd->temporary_tables;
+ thd->temporary_tables= NULL;
}
-#endif
}
-
- mysql_mutex_unlock(&slave_worker_hash_lock);
+#ifndef DBUG_OFF
+ else
+ {
+ for (TABLE *table= thd->temporary_tables; table; table= table->next)
+ {
+ DBUG_ASSERT(0 != strcmp(table->s->db.str, entry->db));
+ }
+ }
+#endif
}
+ mysql_mutex_unlock(&slave_worker_hash_lock);
DBUG_ASSERT(entry);
@@ -748,24 +751,15 @@ void Slave_worker::slave_worker_ends_gro
/*
Cleanup relating to the last executed group regardless of error.
*/
+ DYNAMIC_ARRAY *ep= &curr_group_exec_parts;
- for (int i= curr_group_exec_parts->dynamic_ids.elements; i > 0; i--)
+ for (int i= ep->elements; i > 0; i--)
{
- db_worker_hash_entry *entry= NULL;
- my_hash_value_type hash_value;
- char key[NAME_LEN + 2];
-
- get_dynamic(&(curr_group_exec_parts->dynamic_ids), (uchar*) key, i - 1);
- hash_value= my_calc_hash(&mapping_db_to_worker, (uchar*) key + 1, key[0]);
-
+ db_worker_hash_entry *entry=
+ (* (db_worker_hash_entry **) dynamic_array_ptr(ep, i - 1));
mysql_mutex_lock(&slave_worker_hash_lock);
- entry= (db_worker_hash_entry *)
- my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
- (uchar*) key + 1, key[0]);
-
- DBUG_ASSERT(entry && entry->usage != 0); // was used to break
- DBUG_ASSERT(strlen(key + 1) == (uchar) key[0]);
+ DBUG_ASSERT(entry && entry->usage != 0);
entry->usage--;
@@ -783,14 +777,19 @@ void Slave_worker::slave_worker_ends_gro
usage_partition--;
if (entry->worker != this) // Coordinator is waiting
+ {
+#ifndef DBUG_OFF
+ // TODO: open it! DBUG_ASSERT(usage_partition || !entry->worker->jobs.len);
+#endif
mysql_cond_signal(&slave_worker_hash_cond);
+ }
}
else
DBUG_ASSERT(usage_partition != 0);
mysql_mutex_unlock(&slave_worker_hash_lock);
- delete_dynamic_element(&(curr_group_exec_parts->dynamic_ids), i - 1);
+ delete_dynamic_element(ep, i - 1);
}
curr_group_seen_begin= FALSE;
}
@@ -1101,7 +1100,8 @@ int wait_for_workers_to_finish(Relay_log
thd->exit_cond(proc_info);
ret++;
- DBUG_ASSERT(entry->usage == 0 || thd->killed || rli->abort_slave);
+ DBUG_ASSERT((entry->usage == 0 && entry->worker) ||
+ thd->killed || rli->abort_slave);
}
else
{
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2011-05-24 14:29:35 +0000
+++ b/sql/rpl_rli_pdb.h 2011-05-27 21:29:14 +0000
@@ -226,7 +226,8 @@ public:
Relay_log_info *c_rli;
- Dynamic_ids *curr_group_exec_parts; // CGEP
+ DYNAMIC_ARRAY curr_group_exec_parts; // CGEP
+
bool curr_group_seen_begin; // is set to TRUE with B-event at Worker exec
// @c last_group_done_index is for recovery, although can be viewed
// as statistics as well.
@@ -266,6 +267,9 @@ public:
char checkpoint_master_log_name[FN_REFLEN];
ulonglong checkpoint_master_log_pos;
ulong checkpoint_seqno;
+ MY_BITMAP group_execed;
+ bool inited_group_execed;
+ volatile bool running_status; // TRUE when Worker is read-exec loop
int init_info();
void end_info();
@@ -281,10 +285,6 @@ public:
ATTRIBUTE_FORMAT(printf, 4, 5);
void do_report(loglevel level, int err_code, const char *msg, va_list vargs) const;
- MY_BITMAP group_execed;
-
- bool inited_group_execed;
-
private:
bool read_info(Rpl_info_handler *from);
bool write_info(Rpl_info_handler *to);
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2011-05-25 16:02:13 +0000
+++ b/sql/rpl_slave.cc 2011-05-27 21:29:14 +0000
@@ -3715,10 +3715,8 @@ pthread_handler_t handle_slave_worker(vo
mysql_mutex_unlock(&LOCK_thread_count);
mysql_mutex_lock(&w->jobs_lock);
-
- DBUG_ASSERT(w->jobs.len == rli->mts_slave_worker_queue_len_max + 1);
- w->jobs.len= 0;
- mysql_cond_signal(&w->jobs_cond); // ready for duty
+ w->running_status= TRUE; // ready for duty
+ mysql_cond_signal(&w->jobs_cond);
mysql_mutex_unlock(&w->jobs_lock);
@@ -3729,13 +3727,13 @@ pthread_handler_t handle_slave_worker(vo
error= slave_worker_exec_job(w, rli);
}
+ w->cleanup_context(thd, error);
if (error)
{
mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
rli->info_thd->awake(THD::KILL_QUERY); // notify Crdn
mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
thd->clear_error();
- w->cleanup_context(thd, error);
}
mysql_mutex_lock(&w->jobs_lock);
@@ -3760,14 +3758,15 @@ pthread_handler_t handle_slave_worker(vo
mysql_mutex_unlock(&rli->pending_jobs_lock);
mysql_mutex_lock(&w->jobs_lock);
- w->jobs.len= rli->mts_slave_worker_queue_len_max + 1;
+
+ w->running_status= 0;
sql_print_information("Worker %lu statistics: "
"events processed = %lu "
"hungry waits = %lu "
"priv queue overfills = %llu "
,w->id, w->stmt_jobs, w->wait_jobs, w->jobs.waited_overfill);
-
mysql_cond_signal(&w->jobs_cond); // famous last goodbye
+
mysql_mutex_unlock(&w->jobs_lock);
err:
@@ -4090,9 +4089,7 @@ int slave_start_single_worker(Relay_log_
goto err;
}
- // TODO: remove after dynamic_ids will be sorted out (removed/refined) otherwise
- // entry->usage assert
- w->curr_group_exec_parts->dynamic_ids.elements= 0;
+ w->curr_group_exec_parts.elements= 0;
w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
w->checkpoint_notified= FALSE;
w->workers= rli->workers; // shallow copying is sufficient
@@ -4103,18 +4100,17 @@ int slave_start_single_worker(Relay_log_
w->usage_partition= 0;
w->last_group_done_index= rli->gaq->s; // out of range
- w->jobs.s= rli->mts_slave_worker_queue_len_max;
- my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0); // todo: implement increment e.g n * 10;
+ w->jobs.a= 0;
+ w->jobs.len= 0;
+ w->jobs.overfill= FALSE; // todo: move into Slave_jobs_queue constructor
+ w->jobs.waited_overfill= 0;
+ w->jobs.e= w->jobs.s= rli->mts_slave_worker_queue_len_max;
+ my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0);
for (k= 0; k < w->jobs.s; k++)
insert_dynamic(&w->jobs.Q, (uchar*) &empty);
DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
- w->jobs.e= w->jobs.s;
- w->jobs.a= 0;
- w->jobs.len= rli->mts_slave_worker_queue_len_max + 1; // to first handshake
- w->jobs.overfill= FALSE; // todo: move into Slave_jobs_queue constructor
- w->jobs.waited_overfill= 0;
w->wq_overrun_set= FALSE;
set_dynamic(&rli->workers, (uchar*) &w, i);
mysql_mutex_init(key_mutex_slave_parallel_worker[i], &w->jobs_lock,
@@ -4131,7 +4127,7 @@ int slave_start_single_worker(Relay_log_
}
mysql_mutex_lock(&w->jobs_lock);
- if (w->jobs.len != 0)
+ if (!w->running_status)
mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
mysql_mutex_unlock(&w->jobs_lock);
// Least occupied inited with zero
@@ -4221,7 +4217,7 @@ err:
Workers are notified with setting KILLED status
and waited for their acknowledgment as specified by
- a "magic" (out-of-operational range) value of w->jobs.len.
+ worker's running_status.
*/
void slave_stop_workers(Relay_log_info *rli)
{
@@ -4238,7 +4234,7 @@ void slave_stop_workers(Relay_log_info *
mysql_mutex_lock(&w->jobs_lock);
- if (w->jobs.len == rli->mts_slave_worker_queue_len_max + 1)
+ if (!w->running_status)
{
mysql_mutex_unlock(&w->jobs_lock);
continue;
@@ -4259,7 +4255,7 @@ void slave_stop_workers(Relay_log_info *
get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, i);
mysql_mutex_lock(&w->jobs_lock);
- while (w->jobs.len != rli->mts_slave_worker_queue_len_max + 1)
+ while (w->running_status)
{
const char *save_proc_info;
save_proc_info= thd->enter_cond(&w->jobs_cond, &w->jobs_lock,
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110527212914-k720id4iq6eh9ihb.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3283) WL#5569 | Andrei Elkin | 31 May |