#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3285 Andrei Elkin 2011-06-12
wl#5569 MTS
I. Addressing a segfault in sp-binlogging stack that dealt with gathering accessed db:s.
Fixed with specializing the main memroot of THD to hold the list of db:s. Also
an optimization is done in not letting the gathering routine to execute if the actual binlog
format is not STATEMENT.
Rules of resetting the main memroot at the end of the top level query suffice, also
replication-rooted footprint looks quite tollerable (16 max db:s).
Using this chance, the remained issues of cleanup and some code simpifications
as requested by a review mail are addressed too.
II. Fixes motivated by failing tests and the genaral cleanup.
1. Implementing exit with an error for a case MTS restarts from a position in relay-log
that does not correspond to a beginning of a group or a pattern of events sequence in
the group is not considered to be valid e.g (very) old master binlog file.
Such occurance could be detected only in a while after restart still not later than
finding a terminating event for the group.
2. STOP SLAVE, KILL Query `sql_thread`. Acceptance of the exit status by Coordinator is refined
in sql_slave_killed() in line with wl#5569 requirements.
Also graceful STOP SLAVE required changes in rpl_rli_pdb.cc
@ mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test
left the test be in rpl yet still be useful in gathering benchmark statistics
with removal of created files in the beginning not the end of the test.
So the created files can be saved by MTR called in between of two successive runs.
Decreased stress parameters to make default time run small.
@ mysql-test/suite/rpl/r/rpl_parallel_benchmark.result
results updated.
@ mysql-test/suite/rpl/r/rpl_slave_grp_exec.result
results updated.
@ mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master-slave.opt
added necessary to MTS option.
@ mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master.test
comments improved.
@ mysql-test/suite/rpl/t/rpl_slave_grp_exec.test
Making the test to run in STS mode for a while with explanatory comments.
@ sql/binlog.cc
simpifications of alg of accessed db:s gathering.
@ sql/events.cc
simpifications of alg of accessed db:s gathering.
@ sql/log_event.cc
Further clean up in Log_event::apply_event() and Log_event::get_worker_id().
Error branches in the former are merged to perform necessary common actions.
@ sql/rpl_rli.h
comments added.
@ sql/rpl_rli_pdb.cc
refining wait_for_workers_to_finish() is caused by a awake-wait loop of
the terminating thread so Coordiantor could be awakened not by Worker.
In that case it resumes to waiting.
@ sql/sp.cc
cleanup of alg of accessed db:s gathering.
@ sql/sql_acl.cc
deploying accessed db:s gathering for GRANT use cases.
It's necessary to carry db:s mentioned in objects of the grant operations.
@ sql/sql_class.h
simpifications of alg of accessed db:s gathering.
@ sql/sql_db.cc
simpifications of alg of accessed db:s gathering.
@ sql/sql_rename.cc
simpifications of alg of accessed db:s gathering.
@ sql/sql_table.cc
simpifications of alg of accessed db:s gathering.
@ sql/sql_trigger.cc
simpifications of alg of accessed db:s gathering.
@ sql/sql_view.cc
simpifications of alg of accessed db:s gathering.
added:
mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master-slave.opt
modified:
mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test
mysql-test/suite/rpl/r/rpl_parallel_benchmark.result
mysql-test/suite/rpl/r/rpl_slave_grp_exec.result
mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master.test
mysql-test/suite/rpl/t/rpl_slave_grp_exec.test
sql/binlog.cc
sql/events.cc
sql/log_event.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_slave.cc
sql/sp.cc
sql/sql_acl.cc
sql/sql_class.h
sql/sql_db.cc
sql/sql_rename.cc
sql/sql_table.cc
sql/sql_trigger.cc
sql/sql_view.cc
=== modified file 'mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test'
--- a/mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test 2011-06-05 17:01:51 +0000
+++ b/mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test 2011-06-12 17:36:17 +0000
@@ -6,13 +6,13 @@
# load volume parameter
#
-let $iter= 08;
-let $tables= 4;
-let $wk_i_queries= 4;
+let $iter= 02;
+let $tables= 2;
+let $wk_i_queries= 2;
let $wk_m_queries= 0;
let $nk_i_queries= 0;
let $nk_m_queries= 0;
-let $pre_inserted_rows= 200;
+let $pre_inserted_rows= 50;
connection slave;
@@ -264,14 +264,22 @@ let $wait_timeout= 600;
let $wait_condition= SELECT count(*)+sleep(1) = 5 FROM test1.benchmark;
source include/wait_condition.inc;
+
+let $MYSQLD_DATADIR= `select @@datadir`;
+
+# cleanup for files that could not be removed in the end of previous invocation.
+--remove_files_wildcard $MYSQLD_DATADIR *.out
+
use test;
-select * from test1.benchmark into outfile 'benchmark.out';
+--replace_result $MYSQLD_DATADIR MYSQLD_DATADIR
+eval select * from test1.benchmark into outfile '$MYSQLD_DATADIR/benchmark.out';
select ts from test1.benchmark where state like 'master started load' into @m_0;
select ts from test1.benchmark where state like 'master ends load' into @m_1;
select ts from test1.benchmark where state like 'slave takes on load' into @s_0;
select ts from test1.benchmark where state like 'slave ends load' into @s_1;
-select time_to_sec(@m_1) - time_to_sec(@m_0) as 'delta_m',
- time_to_sec(@s_1) - time_to_sec(@s_0) as 'delta_s' into outfile 'delta.out';
+--replace_result $MYSQLD_DATADIR MYSQLD_DATADIR
+eval select time_to_sec(@m_1) - time_to_sec(@m_0) as 'delta_m',
+ time_to_sec(@s_1) - time_to_sec(@s_0) as 'delta_s' into outfile '$MYSQLD_DATADIR/delta.out';
--enable_result_log
--enable_query_log
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel_benchmark.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_benchmark.result 2011-04-02 11:32:02 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_benchmark.result 2011-06-12 17:36:17 +0000
@@ -6,12 +6,12 @@ include/stop_slave.inc
start slave;
stop slave sql_thread;
use test;
-select * from test1.benchmark into outfile 'benchmark.out';
+select * from test1.benchmark into outfile 'MYSQLD_DATADIR/benchmark.out';
select ts from test1.benchmark where state like 'master started load' into @m_0;
select ts from test1.benchmark where state like 'master ends load' into @m_1;
select ts from test1.benchmark where state like 'slave takes on load' into @s_0;
select ts from test1.benchmark where state like 'slave ends load' into @s_1;
select time_to_sec(@m_1) - time_to_sec(@m_0) as 'delta_m',
-time_to_sec(@s_1) - time_to_sec(@s_0) as 'delta_s' into outfile 'delta.out';
+time_to_sec(@s_1) - time_to_sec(@s_0) as 'delta_s' into outfile 'MYSQLD_DATADIR/delta.out';
set @@global.mts_exp_slave_local_timestamp= @save.mts_exp_slave_local_timestamp;
include/rpl_end.inc
=== modified file 'mysql-test/suite/rpl/r/rpl_slave_grp_exec.result'
--- a/mysql-test/suite/rpl/r/rpl_slave_grp_exec.result 2010-12-19 17:07:28 +0000
+++ b/mysql-test/suite/rpl/r/rpl_slave_grp_exec.result 2011-06-12 17:36:17 +0000
@@ -29,7 +29,7 @@ a b
SELECT * FROM t3 ORDER BY a;
a b
1 ZZ
-include/wait_for_slave_sql_error.inc [errno=1146]
+include/wait_for_slave_sql_to_stop.inc
SHOW TABLES LIKE 't%';
Tables_in_test (t%)
t1
@@ -57,7 +57,7 @@ INSERT INTO t3 VALUES(2, 'B');
INSERT INTO t2 VALUES(2, 'B');
INSERT INTO t1 VALUES(2, 'B');
UPDATE t1 SET b = 'X' WHERE a = 2;
-include/wait_for_slave_sql_error.inc [errno=1146]
+include/wait_for_slave_sql_to_stop.inc
SELECT * FROM t1 ORDER BY a;
a b
2 X
@@ -93,7 +93,7 @@ INSERT INTO t1 VALUES (3, 'C'), (4, 'D')
INSERT INTO t2 VALUES (3, 'C'), (4, 'D');
INSERT INTO t3 VALUES (3, 'C'), (4, 'D');
COMMIT;
-include/wait_for_slave_sql_error.inc [errno=1146]
+include/wait_for_slave_sql_to_stop.inc
SELECT * FROM t1 ORDER BY a;
a b
3 C
=== added file 'mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master-slave.opt 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master-slave.opt 2011-06-12 17:36:17 +0000
@@ -0,0 +1 @@
+--slave-transaction-retries=0
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master.test 2011-06-10 08:04:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_seconds_behind_master.test 2011-06-12 17:36:17 +0000
@@ -5,7 +5,7 @@
# The number is either @@global.slave_checkpoint_group or less if
# @@global.slave_checkpoint_period timer elapses first.
# The value updates *after* the last group commit is executed.
-# Resetting to zero behaviour when Slave goes to read events is
+# Resetting to zero behavior when Slave goes to read events is
# preserved.
#
=== modified file 'mysql-test/suite/rpl/t/rpl_slave_grp_exec.test'
--- a/mysql-test/suite/rpl/t/rpl_slave_grp_exec.test 2011-06-08 20:18:08 +0000
+++ b/mysql-test/suite/rpl/t/rpl_slave_grp_exec.test 2011-06-12 17:36:17 +0000
@@ -23,6 +23,12 @@
--source include/master-slave.inc
--echo
+# Test is MTS unfriendly because of
+# a. incompatible with STS error reporting (MTS stop due to an error in applying
+# causes inconsistency so the latter is reported)
+# b. failing recovery
+-- source include/not_mts_slave_parallel_workers.inc
+
# Create tables and data
--echo *** Preparing data ***
--connection master
=== modified file 'sql/binlog.cc'
--- a/sql/binlog.cc 2011-06-08 20:18:08 +0000
+++ b/sql/binlog.cc 2011-06-12 17:36:17 +0000
@@ -4537,16 +4537,21 @@ THD::binlog_set_pending_rows_event(Rows_
/**
@param db db name c-string to be inserted into abc-sorted
THD::binlog_accessed_db_names list.
-
- Note, as the list node data (explicitly) so the node
- struct itself (implicitly) are allocated in
- thd->mem_root to be cleared at the end of the query
- processing (@c THD::cleanup_after_query()).
+
+ Note, as the list node data so the node
+ struct itself are allocated in THD::main_mem_root.
+ The list lasts for the top-level query time and resets
+ in @c THD::cleanup_after_query() and Query_log_event::write().
*/
void
-THD::add_to_binlog_updated_dbs(const char *db)
+THD::add_to_binlog_accessed_dbs(const char *db)
{
char *after_db;
+ MEM_ROOT *db_mem_root= &main_mem_root;
+
+ if (!binlog_accessed_db_names)
+ binlog_accessed_db_names= new (db_mem_root) List<char>;
+
if (binlog_accessed_db_names->elements > MAX_DBS_IN_EVENT_MTS)
{
push_warning_printf(this, MYSQL_ERROR::WARN_LEVEL_WARN,
@@ -4556,7 +4561,13 @@ THD::add_to_binlog_updated_dbs(const cha
return;
}
- after_db= strdup_root(mem_root, db);
+ after_db= strdup_root(db_mem_root, db);
+
+ /*
+ sorted insertion is implemented with first rearranging data
+ (pointer to char*) of the links and final appending of the least
+ ordered data to create a new link in the list.
+ */
if (binlog_accessed_db_names->elements != 0)
{
List_iterator<char> it(*get_binlog_accessed_db_names());
@@ -4583,7 +4594,7 @@ THD::add_to_binlog_updated_dbs(const cha
}
}
if (after_db)
- binlog_accessed_db_names->push_back(after_db);
+ binlog_accessed_db_names->push_back(after_db, &main_mem_root);
}
@@ -4810,27 +4821,6 @@ int THD::decide_logging_format(TABLE_LIS
prev_access_table= table->table;
}
-
- /*
- Master side of DML in the STMT format events parallelization.
- All involving table db:s are stored in a abc-ordered name list.
- In case the number of databases exceeds MAX_DBS_IN_EVENT_MTS maximum
- the list gathering breaks since it won't be sent to the slave.
- */
- if (is_write && variables.binlog_format != BINLOG_FORMAT_ROW &&
- lex->sql_command != SQLCOM_END /* rows-event applying by slave */)
- {
- if (!binlog_accessed_db_names)
- {
- binlog_accessed_db_names= new List<char>; /* thd->mem_root is used */
- }
- for (TABLE_LIST *table= tables; table; table= table->next_global)
- {
- if (table->placeholder())
- continue;
- add_to_binlog_updated_dbs(table->db);
- }
- }
DBUG_PRINT("info", ("flags_write_all_set: 0x%llx", flags_write_all_set));
DBUG_PRINT("info", ("flags_write_some_set: 0x%llx", flags_write_some_set));
@@ -4965,6 +4955,23 @@ int THD::decide_logging_format(TABLE_LIS
DBUG_PRINT("info", ("decision: no logging since an error was generated"));
DBUG_RETURN(-1);
}
+
+ if (is_write && !is_current_stmt_binlog_format_row() &&
+ lex->sql_command != SQLCOM_END /* rows-event applying by slave */)
+ {
+ /*
+ Master side of DML in the STMT format events parallelization.
+ All involving table db:s are stored in a abc-ordered name list.
+ In case the number of databases exceeds MAX_DBS_IN_EVENT_MTS maximum
+ the list gathering breaks since it won't be sent to the slave.
+ */
+ for (TABLE_LIST *table= tables; table; table= table->next_global)
+ {
+ if (table->placeholder())
+ continue;
+ add_to_binlog_accessed_dbs(table->db);
+ }
+ }
DBUG_PRINT("info", ("decision: logging in %s format",
is_current_stmt_binlog_format_row() ?
"ROW" : "STATEMENT"));
=== modified file 'sql/events.cc'
--- a/sql/events.cc 2011-05-24 14:29:35 +0000
+++ b/sql/events.cc 2011-06-12 17:36:17 +0000
@@ -384,7 +384,7 @@ Events::create_event(THD *thd, Event_par
}
else
{
- thd->add_one_db_to_binlog_updated_dbs(parse_data->dbname.str);
+ thd->add_to_binlog_accessed_dbs(parse_data->dbname.str);
/* If the definer is not set or set to CURRENT_USER, the value of CURRENT_USER
will be written into the binary log as the definer for the SQL thread. */
ret= write_bin_log(thd, TRUE, log_query.c_ptr(), log_query.length());
@@ -504,10 +504,9 @@ Events::update_event(THD *thd, Event_par
/* Binlog the alter event. */
DBUG_ASSERT(thd->query() && thd->query_length());
- thd->set_binlog_accessed_db_names(new List<char>);
- thd->add_to_binlog_updated_dbs(parse_data->dbname.str);
+ thd->add_to_binlog_accessed_dbs(parse_data->dbname.str);
if (new_dbname)
- thd->add_to_binlog_updated_dbs(new_dbname->str);
+ thd->add_to_binlog_accessed_dbs(new_dbname->str);
ret= write_bin_log(thd, TRUE, thd->query(), thd->query_length());
}
@@ -576,7 +575,7 @@ Events::drop_event(THD *thd, LEX_STRING
/* Binlog the drop event. */
DBUG_ASSERT(thd->query() && thd->query_length());
- thd->add_one_db_to_binlog_updated_dbs(dbname.str);
+ thd->add_to_binlog_accessed_dbs(dbname.str);
ret= write_bin_log(thd, TRUE, thd->query(), thd->query_length());
}
/* Restore the state of binlog format */
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2011-06-10 08:04:00 +0000
+++ b/sql/log_event.cc 2011-06-12 17:36:17 +0000
@@ -2408,6 +2408,7 @@ Slave_worker *Log_event::get_slave_worke
bool is_b_event;
int num_dbs= 0;
Slave_worker *ret_worker= NULL;
+ THD *thd= rli->info_thd;
/* checking partioning properties and perform corresponding actions */
@@ -2469,7 +2470,7 @@ Slave_worker *Log_event::get_slave_worke
{
int i= 0;
num_dbs= mts_number_dbs();
- List_iterator<char> it(*mts_get_dbs(rli->info_thd->mem_root));
+ List_iterator<char> it(*mts_get_dbs(thd->mem_root));
it++;
ret_worker= rli->last_assigned_worker;
@@ -2491,12 +2492,10 @@ Slave_worker *Log_event::get_slave_worke
get_type_code() == QUERY_EVENT,
ret_worker)))
{
- // destroy buffered events of the current group prior to exit
- for (uint k= 0; k < rli->curr_group_da.elements; k++)
- {
- delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
- }
-
+ rli->report(ERROR_LEVEL, ER_MTS_CANT_PARALLEL,
+ ER(ER_MTS_CANT_PARALLEL),
+ get_type_str(), rli->get_event_relay_log_name(),
+ rli->get_event_relay_log_pos());
return ret_worker;
}
@@ -2519,13 +2518,13 @@ Slave_worker *Log_event::get_slave_worke
DBUG_ASSERT(i == num_dbs || num_dbs == OVER_MAX_DBS_IN_EVENT_MTS);
- // TODO: convert to C's private mem_root.
+ // TODO: convert to C's private mem_root to reset not per event but rather realrely.
// Releasing the Coord's mem-root from the updated dbs. It's safe to do at this
// point because the root is no longer needed along remained part of Coordinator's
// execution flow.
- free_root(rli->info_thd->mem_root, MYF(MY_KEEP_PREALLOC));
+ free_root(thd->mem_root, MYF(MY_KEEP_PREALLOC));
}
else
{
@@ -2541,12 +2540,21 @@ Slave_worker *Log_event::get_slave_worke
{
Log_event *ptr_curr_ev= this;
- DBUG_ASSERT(get_type_code() == INTVAR_EVENT ||
- get_type_code() == RAND_EVENT ||
- get_type_code() == USER_VAR_EVENT ||
- get_type_code() == ROWS_QUERY_LOG_EVENT ||
- get_type_code() == BEGIN_LOAD_QUERY_EVENT ||
- get_type_code() == APPEND_BLOCK_EVENT);
+ if (!(get_type_code() == INTVAR_EVENT ||
+ get_type_code() == RAND_EVENT ||
+ get_type_code() == USER_VAR_EVENT ||
+ get_type_code() == ROWS_QUERY_LOG_EVENT ||
+ get_type_code() == BEGIN_LOAD_QUERY_EVENT ||
+ get_type_code() == APPEND_BLOCK_EVENT))
+ {
+ DBUG_ASSERT(!ret_worker);
+
+ rli->report(ERROR_LEVEL, ER_MTS_CANT_PARALLEL,
+ ER(ER_MTS_CANT_PARALLEL),
+ get_type_str(), rli->get_event_relay_log_name(),
+ rli->get_event_relay_log_pos());
+ return ret_worker;
+ }
insert_dynamic(&rli->curr_group_da, (uchar*) &ptr_curr_ev);
@@ -2822,14 +2830,17 @@ void append_item_to_jobs(slave_job_item
can't be decided yet. In the single threaded sequential mode the
event maps to SQL thread rli.
+ @note in case of MTS failure Coordinator destroys all gathered
+ deferred events.
+
@return 0 as success, otherwise a failure.
*/
int Log_event::apply_event(Relay_log_info const *rli)
{
DBUG_ENTER("LOG_EVENT:apply_event");
- Slave_worker *w= NULL;
Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli); // constless alias
bool parallel= FALSE, async_event= FALSE, seq_event= FALSE;
+ THD *thd= c_rli->info_thd;
worker= c_rli;
@@ -2884,19 +2895,13 @@ int Log_event::apply_event(Relay_log_inf
*/
rli->report(ERROR_LEVEL, ER_MTS_CANT_PARALLEL,
ER(ER_MTS_CANT_PARALLEL),
- get_type_code(), c_rli->get_event_relay_log_name(),
+ get_type_str(), c_rli->get_event_relay_log_name(),
c_rli->get_event_relay_log_pos());
/* Coordinator cant continue, it marks MTS group status accordingly */
c_rli->mts_group_status= Relay_log_info::MTS_KILLED_GROUP;
- /* destroy deferred events */
- for (uint k= 0; k < rli->curr_group_da.elements; k++)
- {
- delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
- }
-
- DBUG_RETURN(-1);
+ goto err;
}
/*
Marking sure the event will be executed in sequential mode.
@@ -2929,13 +2934,28 @@ int Log_event::apply_event(Relay_log_inf
worker= NULL;
c_rli->mts_group_status= Relay_log_info::MTS_IN_GROUP;
- c_rli->last_assigned_worker= w= get_slave_worker_id(c_rli);
- if (!w || DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0))
- DBUG_RETURN(rli->curr_group_assigned_parts.elements == 0 ? FALSE : TRUE);
+ worker= (Relay_log_info*)
+ (c_rli->last_assigned_worker= get_slave_worker_id(c_rli));
- worker= (Relay_log_info*) w;
+err:
+ if (thd->is_error())
+ {
+ DBUG_ASSERT(!worker);
+
+ // destroy buffered events of the current group prior to exit
+ for (uint k= 0; k < rli->curr_group_da.elements; k++)
+ {
+ delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
+ }
+ }
+ else
+ {
+ DBUG_ASSERT(worker || rli->curr_group_assigned_parts.elements == 0);
+ }
- DBUG_RETURN(FALSE);
+ DBUG_RETURN((!thd->is_error() ||
+ DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)) ?
+ 0 : -1);
}
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2011-06-10 08:04:00 +0000
+++ b/sql/rpl_rli.h 2011-06-12 17:36:17 +0000
@@ -493,7 +493,8 @@ public:
includes Single-Threaded-Slave case.
*/
MTS_NOT_IN_GROUP,
- MTS_IN_GROUP, /* at least one event was scheduled to a Worker */
+
+ MTS_IN_GROUP, /* at least one not-terminal event scheduled to a Worker */
MTS_END_GROUP, /* the last scheduled event is a terminal event */
MTS_KILLED_GROUP /* Coordinator gave out to reach MTS_END_GROUP */
} mts_group_status;
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2011-06-08 20:18:08 +0000
+++ b/sql/rpl_rli_pdb.cc 2011-06-12 17:36:17 +0000
@@ -726,6 +726,8 @@ Slave_worker *get_least_occupied_worker(
void Slave_worker::slave_worker_ends_group(Log_event* ev, int error)
{
+ DBUG_ENTER("Slave_worker::slave_worker_ends_group");
+
if (!error)
{
ulong gaq_idx= ev->mts_group_cnt;
@@ -798,6 +800,9 @@ void Slave_worker::slave_worker_ends_gro
#ifndef DBUG_OFF
// TODO: open it! DBUG_ASSERT(usage_partition || !entry->worker->jobs.len);
#endif
+ DBUG_PRINT("info",
+ ("Notifying entry %p release by worker %lu", entry, this->id));
+
mysql_cond_signal(&slave_worker_hash_cond);
}
}
@@ -828,6 +833,8 @@ void Slave_worker::slave_worker_ends_gro
mysql_mutex_unlock(&c_rli->info_thd->LOCK_thd_data);
mysql_mutex_unlock(&slave_worker_hash_lock);
}
+
+ DBUG_VOID_RETURN;
}
@@ -1106,6 +1113,9 @@ int wait_for_workers_to_finish(Relay_log
THD *thd= rli->info_thd;
const char info_format[]=
"Waiting for Slave Worker %d to release partition `%s`";
+
+ DBUG_ENTER("wait_for_workers_to_finish");
+
for (uint i= 0, ret= 0; i < hash->records; i++)
{
db_worker_hash_entry *entry;
@@ -1128,16 +1138,22 @@ int wait_for_workers_to_finish(Relay_log
if (entry->usage > 0 && !thd->killed)
{
+ long w_id= entry->worker->id;
sprintf(wait_info, info_format, entry->worker->id, entry->db);
entry->worker= NULL; // mark Worker to signal when usage drops to 0
-
- proc_info= thd->enter_cond(&slave_worker_hash_cond, &slave_worker_hash_lock,
- wait_info);
- mysql_cond_wait(&slave_worker_hash_cond, &slave_worker_hash_lock);
+ do
+ {
+ proc_info= thd->enter_cond(&slave_worker_hash_cond,
+ &slave_worker_hash_lock,
+ wait_info);
+ mysql_cond_wait(&slave_worker_hash_cond, &slave_worker_hash_lock);
+ DBUG_PRINT("info",
+ ("Either got awakened of notified: "
+ "entry %p, usage %lu, worker %lu",
+ entry, entry->usage, w_id));
+ } while (entry->usage != 0 && !thd->killed);
thd->exit_cond(proc_info);
ret++;
-
- DBUG_ASSERT(entry->usage == 0 || thd->killed);
}
else
{
@@ -1151,5 +1167,5 @@ int wait_for_workers_to_finish(Relay_log
if (!ignore)
const_cast<Relay_log_info*>(rli)->mts_group_status= Relay_log_info::MTS_NOT_IN_GROUP;
- return ret;
+ DBUG_RETURN(ret);
}
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2011-06-10 08:04:00 +0000
+++ b/sql/rpl_slave.cc 2011-06-12 17:36:17 +0000
@@ -1082,6 +1082,8 @@ static bool io_slave_killed(THD* thd, Ma
static bool sql_slave_killed(THD* thd, Relay_log_info* rli)
{
bool ret= FALSE;
+ bool is_parallel_group= FALSE;
+
DBUG_ENTER("sql_slave_killed");
DBUG_ASSERT(rli->info_thd == thd);
@@ -1096,10 +1098,12 @@ static bool sql_slave_killed(THD* thd, R
as well.
Example: OPTION_KEEP_LOG is set if a temporary table is created or dropped.
*/
- if ((!rli->is_parallel_exec() &&
+ if ((is_parallel_group= (rli->is_parallel_exec() &&
+ rli->mts_group_status == Relay_log_info::MTS_IN_GROUP))
+ ||
+ (!rli->is_parallel_exec() &&
(thd->transaction.all.modified_non_trans_table ||
- (thd->variables.option_bits & OPTION_KEEP_LOG)) && rli->is_in_group())
- || (rli->mts_group_status == Relay_log_info::MTS_IN_GROUP))
+ (thd->variables.option_bits & OPTION_KEEP_LOG)) && rli->is_in_group()))
{
char msg_stopped[]=
"... The slave SQL is stopped, leaving the current group "
@@ -1145,7 +1149,7 @@ static bool sql_slave_killed(THD* thd, R
if (ret == 0)
{
rli->report(WARNING_LEVEL, 0,
- rli->mts_group_status == Relay_log_info::MTS_NOT_IN_GROUP ?
+ !is_parallel_group ?
"slave SQL thread is being stopped in the middle "
"of applying of a group having updated a non-transaction "
"table; waiting for the group completion ... "
@@ -1158,16 +1162,14 @@ static bool sql_slave_killed(THD* thd, R
{
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
ER(ER_SLAVE_FATAL_ERROR),
- rli->mts_group_status == Relay_log_info::MTS_NOT_IN_GROUP ?
- msg_stopped : msg_stopped_mts);
+ !is_parallel_group ? msg_stopped : msg_stopped_mts);
}
}
else
{
ret= TRUE;
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR),
- rli->mts_group_status == Relay_log_info::MTS_NOT_IN_GROUP ?
- msg_stopped : msg_stopped_mts);
+ !is_parallel_group ? msg_stopped : msg_stopped_mts);
}
}
else
=== modified file 'sql/sp.cc'
--- a/sql/sp.cc 2011-02-27 17:35:25 +0000
+++ b/sql/sp.cc 2011-06-12 17:36:17 +0000
@@ -1150,7 +1150,7 @@ sp_create_routine(THD *thd, int type, sp
}
/* restore sql_mode when binloging */
thd->variables.sql_mode= saved_mode;
- thd->add_one_db_to_binlog_updated_dbs(sp->m_db.str);
+ thd->add_to_binlog_accessed_dbs(sp->m_db.str);
/* Such a statement can always go directly to binlog, no trans cache */
if (thd->binlog_query(THD::STMT_QUERY_TYPE,
log_query.c_ptr(), log_query.length(),
@@ -1224,7 +1224,7 @@ sp_drop_routine(THD *thd, int type, sp_n
if (ret == SP_OK)
{
- thd->add_one_db_to_binlog_updated_dbs(name->m_db.str);
+ thd->add_to_binlog_accessed_dbs(name->m_db.str);
if (write_bin_log(thd, TRUE, thd->query(), thd->query_length()))
ret= SP_INTERNAL_ERROR;
sp_cache_invalidate();
=== modified file 'sql/sql_acl.cc'
--- a/sql/sql_acl.cc 2010-12-17 11:28:59 +0000
+++ b/sql/sql_acl.cc 2011-06-12 17:36:17 +0000
@@ -3687,6 +3687,7 @@ int mysql_table_grant(THD *thd, TABLE_LI
should_write_to_binlog= TRUE;
db_name= table_list->get_db_name();
+ thd->add_to_binlog_accessed_dbs(db_name); // collecting db:s for MTS
table_name= table_list->get_table_name();
/* Find/create cached table grant */
@@ -3910,8 +3911,9 @@ bool mysql_routine_grant(THD *thd, TABLE
}
db_name= table_list->db;
+ if (write_to_binlog)
+ thd->add_to_binlog_accessed_dbs(db_name);
table_name= table_list->table_name;
-
grant_name= routine_hash_search(Str->host.str, NullS, db_name,
Str->user.str, table_name, is_proc, 1);
if (!grant_name)
@@ -4098,6 +4100,7 @@ bool mysql_grant(THD *thd, const char *d
my_error(ER_WRONG_USAGE, MYF(0), "DB GRANT", "GLOBAL PRIVILEGES");
result= -1;
}
+ thd->add_to_binlog_accessed_dbs(db);
}
else if (is_proxy)
{
=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h 2011-05-24 14:29:35 +0000
+++ b/sql/sql_class.h 2011-06-12 17:36:17 +0000
@@ -1744,31 +1744,13 @@ public:
}
/*
- MTS: initializer of binlog_accessed_db_names list
- */
- void set_binlog_accessed_db_names(List<char>* arg)
- {
- binlog_accessed_db_names= arg;
- }
-
- /*
MTS: resetter of binlog_accessed_db_names list normally
at the end of the query execution
*/
void clear_binlog_accessed_db_names() { binlog_accessed_db_names= NULL; }
/* MTS: method inserts a new unique name into binlog_updated_dbs */
- void add_to_binlog_updated_dbs(const char *db);
-
- /*
- MTS: method shortcuts initialization and insertion of just one db name
- into binlog_updated_dbs
- */
- void add_one_db_to_binlog_updated_dbs(const char *db)
- {
- set_binlog_accessed_db_names(new List<char>);
- binlog_accessed_db_names->push_back(strdup_root(mem_root, db));
- }
+ void add_to_binlog_accessed_dbs(const char *db);
#endif /* MYSQL_CLIENT */
=== modified file 'sql/sql_db.cc'
--- a/sql/sql_db.cc 2011-02-27 17:35:25 +0000
+++ b/sql/sql_db.cc 2011-06-12 17:36:17 +0000
@@ -660,7 +660,7 @@ not_silent:
*/
qinfo.db = db;
qinfo.db_len = strlen(db);
- thd->add_one_db_to_binlog_updated_dbs(db);
+ thd->add_to_binlog_accessed_dbs(db);
/*
These DDL methods and logging are protected with the exclusive
metadata lock on the schema
@@ -964,7 +964,7 @@ update_binlog:
if (query_pos != query_data_start)
{
- thd->add_one_db_to_binlog_updated_dbs(db);
+ thd->add_to_binlog_accessed_dbs(db);
/*
These DDL methods and logging are protected with the exclusive
metadata lock on the schema.
=== modified file 'sql/sql_rename.cc'
--- a/sql/sql_rename.cc 2011-05-24 14:29:35 +0000
+++ b/sql/sql_rename.cc 2011-06-12 17:36:17 +0000
@@ -318,12 +318,8 @@ do_rename(THD *thd, TABLE_LIST *ren_tabl
break;
}
- if (!thd->get_binlog_accessed_db_names())
- {
- thd->set_binlog_accessed_db_names(new List<char>);
- }
- thd->add_to_binlog_updated_dbs(ren_table->db);
- thd->add_to_binlog_updated_dbs(new_db);
+ thd->add_to_binlog_accessed_dbs(ren_table->db);
+ thd->add_to_binlog_accessed_dbs(new_db);
if (rc && !skip_error)
DBUG_RETURN(1);
=== modified file 'sql/sql_table.cc'
--- a/sql/sql_table.cc 2011-05-24 14:29:35 +0000
+++ b/sql/sql_table.cc 2011-06-12 17:36:17 +0000
@@ -2236,12 +2236,7 @@ int mysql_rm_table_no_locks(THD *thd, TA
find_temporary_table(thd, table) &&
table->mdl_request.ticket != NULL));
- /* MTS: similarly to decide_logging_format() gathering of the db names */
- if (!thd->get_binlog_accessed_db_names())
- {
- thd->set_binlog_accessed_db_names(new List<char>);
- }
- thd->add_to_binlog_updated_dbs(table->db);
+ thd->add_to_binlog_accessed_dbs(table->db);
/*
drop_temporary_table may return one of the following error codes:
@@ -4570,7 +4565,7 @@ bool mysql_create_table(THD *thd, TABLE_
(thd->is_current_stmt_binlog_format_row() &&
!(create_info->options & HA_LEX_CREATE_TMP_TABLE))))
{
- thd->add_one_db_to_binlog_updated_dbs(create_table->db);
+ thd->add_to_binlog_accessed_dbs(create_table->db);
result= write_bin_log(thd, TRUE, thd->query(), thd->query_length(), is_trans);
}
@@ -5953,13 +5948,9 @@ bool mysql_alter_table(THD *thd,char *ne
if (!new_db || !my_strcasecmp(table_alias_charset, new_db, db))
new_db= db;
- if (!thd->get_binlog_accessed_db_names())
- {
- thd->set_binlog_accessed_db_names(new List<char>);
- }
- thd->add_to_binlog_updated_dbs(db);
+ thd->add_to_binlog_accessed_dbs(db);
if (new_db != db)
- thd->add_to_binlog_updated_dbs(new_db);
+ thd->add_to_binlog_accessed_dbs(new_db);
build_table_filename(reg_path, sizeof(reg_path) - 1, db, table_name, reg_ext, 0);
build_table_filename(path, sizeof(path) - 1, db, table_name, "", 0);
=== modified file 'sql/sql_trigger.cc'
--- a/sql/sql_trigger.cc 2011-02-27 17:35:25 +0000
+++ b/sql/sql_trigger.cc 2011-06-12 17:36:17 +0000
@@ -522,7 +522,7 @@ end:
if (!result)
{
if (tables)
- thd->add_one_db_to_binlog_updated_dbs(tables->db);
+ thd->add_to_binlog_accessed_dbs(tables->db);
result= write_bin_log(thd, TRUE, stmt_query.ptr(), stmt_query.length());
}
=== modified file 'sql/sql_view.cc'
--- a/sql/sql_view.cc 2011-05-24 14:29:35 +0000
+++ b/sql/sql_view.cc 2011-06-12 17:36:17 +0000
@@ -689,7 +689,7 @@ bool mysql_create_view(THD *thd, TABLE_L
buff.append(views->source.str, views->source.length);
int errcode= query_error_code(thd, TRUE);
- thd->add_one_db_to_binlog_updated_dbs(views->db);
+ thd->add_to_binlog_accessed_dbs(views->db);
if (thd->binlog_query(THD::STMT_QUERY_TYPE,
buff.ptr(), buff.length(), FALSE, FALSE, FALSE, errcode))
res= TRUE;
@@ -1683,11 +1683,7 @@ bool mysql_drop_view(THD *thd, TABLE_LIS
}
continue;
}
- if (!thd->get_binlog_accessed_db_names())
- {
- thd->set_binlog_accessed_db_names(new List<char>);
- }
- thd->add_to_binlog_updated_dbs(view->db);
+ thd->add_to_binlog_accessed_dbs(view->db);
if (mysql_file_delete(key_file_frm, path, MYF(MY_WME)))
error= TRUE;
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110612173617-mfocuapqm7mb9n5b.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3285) WL#5569 | Andrei Elkin | 13 Jun |