#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3272 Andrei Elkin 2011-02-12
wl#5754 Query parallelization
Intermediate commit with some cleanup after removing the --mts-exp-slave-run-query-in-parallel option.
renamed:
mysql-test/suite/rpl/r/rpl_parallel_query.result => mysql-test/suite/rpl/r/rpl_parallel_temp_query.result
mysql-test/suite/rpl/t/rpl_parallel_query.test => mysql-test/suite/rpl/t/rpl_parallel_temp_query.test
modified:
mysql-test/r/mysqld--help-notwin.result
mysql-test/suite/rpl/t/rpl_packet.test
mysql-test/suite/sys_vars/r/all_vars.result
sql/binlog.cc
sql/log_event.cc
sql/log_event.h
sql/mysqld.cc
sql/mysqld.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_slave.cc
sql/sql_base.cc
sql/sql_class.cc
sql/sql_class.h
sql/sys_vars.cc
=== modified file 'mysql-test/r/mysqld--help-notwin.result'
--- a/mysql-test/r/mysqld--help-notwin.result 2011-01-11 23:01:02 +0000
+++ b/mysql-test/r/mysqld--help-notwin.result 2011-02-12 08:32:05 +0000
@@ -362,9 +362,6 @@ The following options may be given as th
If enabled slave itself computes the event appying time
value to implicitly affected timestamp columms. Otherwise
(default) it installs prescribed by the master value
- --mts-exp-slave-run-query-in-parallel
- The default not an actual database name is used as
- partition info for parallel execution of Query_log_event
--mts-partition-hash-soft-max=#
Number of records in the mts partition hash below which
entries with zero usage are tolerated
@@ -910,7 +907,6 @@ mts-checkpoint-group 512
mts-checkpoint-period 300
mts-coordinator-basic-nap 5
mts-exp-slave-local-timestamp FALSE
-mts-exp-slave-run-query-in-parallel FALSE
mts-partition-hash-soft-max 16
mts-pending-jobs-size-max 16777216
mts-slave-parallel-workers 0
=== renamed file 'mysql-test/suite/rpl/r/rpl_parallel_query.result' => 'mysql-test/suite/rpl/r/rpl_parallel_temp_query.result'
=== modified file 'mysql-test/suite/rpl/t/rpl_packet.test'
--- a/mysql-test/suite/rpl/t/rpl_packet.test 2010-12-19 17:22:30 +0000
+++ b/mysql-test/suite/rpl/t/rpl_packet.test 2011-02-12 08:32:05 +0000
@@ -84,6 +84,11 @@ connection master;
INSERT INTO `t1`(`f1`) VALUES ('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2048');
+--disable_query_log
+--disable_result_log
+select sleep(300);
+--enable_result_log
+--enable_query_log
#
# Bug#42914: The slave I/O thread must stop after trying to read the above
@@ -169,6 +174,7 @@ if (`SELECT NOT(@max_allowed_packet_0 =
#
connection slave;
+
START SLAVE;
--source include/wait_for_slave_to_start.inc
=== renamed file 'mysql-test/suite/rpl/t/rpl_parallel_query.test' => 'mysql-test/suite/rpl/t/rpl_parallel_temp_query.test'
=== modified file 'mysql-test/suite/sys_vars/r/all_vars.result'
--- a/mysql-test/suite/sys_vars/r/all_vars.result 2010-12-27 18:54:41 +0000
+++ b/mysql-test/suite/sys_vars/r/all_vars.result 2011-02-12 08:32:05 +0000
@@ -14,9 +14,7 @@ left join t1 on variable_name=test_name
There should be *no* variables listed below:
INNODB_STATS_TRANSIENT_SAMPLE_PAGES
MTS_PARTITION_HASH_SOFT_MAX
-MTS_PENDING_JOBS_SIZE_MAX
MTS_EXP_SLAVE_LOCAL_TIMESTAMP
-MTS_EXP_SLAVE_RUN_QUERY_IN_PARALLEL
INNODB_STATS_PERSISTENT_SAMPLE_PAGES
RELAY_LOG_BASENAME
LOG_BIN_BASENAME
@@ -25,18 +23,17 @@ INNODB_PRINT_ALL_DEADLOCKS
INNODB_RESET_MONITOR_COUNTER
MTS_SLAVE_PARALLEL_WORKERS
MTS_WORKER_UNDERRUN_LEVEL
-MTS_SLAVE_WORKER_QUEUE_LEN_MAX
INNODB_RESET_ALL_MONITOR_COUNTER
LOG_BIN_INDEX
INNODB_DISABLE_MONITOR_COUNTER
INNODB_ENABLE_MONITOR_COUNTER
+MTS_SLAVE_WORKER_QUEUE_LEN_MAX
INNODB_FILE_FORMAT_MAX
+MTS_PENDING_JOBS_SIZE_MAX
MTS_COORDINATOR_BASIC_NAP
INNODB_STATS_TRANSIENT_SAMPLE_PAGES
MTS_PARTITION_HASH_SOFT_MAX
-MTS_PENDING_JOBS_SIZE_MAX
MTS_EXP_SLAVE_LOCAL_TIMESTAMP
-MTS_EXP_SLAVE_RUN_QUERY_IN_PARALLEL
INNODB_STATS_PERSISTENT_SAMPLE_PAGES
RELAY_LOG_BASENAME
LOG_BIN_BASENAME
@@ -45,12 +42,13 @@ INNODB_PRINT_ALL_DEADLOCKS
INNODB_RESET_MONITOR_COUNTER
MTS_SLAVE_PARALLEL_WORKERS
MTS_WORKER_UNDERRUN_LEVEL
-MTS_SLAVE_WORKER_QUEUE_LEN_MAX
INNODB_RESET_ALL_MONITOR_COUNTER
LOG_BIN_INDEX
INNODB_DISABLE_MONITOR_COUNTER
INNODB_ENABLE_MONITOR_COUNTER
+MTS_SLAVE_WORKER_QUEUE_LEN_MAX
INNODB_FILE_FORMAT_MAX
+MTS_PENDING_JOBS_SIZE_MAX
MTS_COORDINATOR_BASIC_NAP
drop table t1;
drop table t2;
=== modified file 'sql/binlog.cc'
--- a/sql/binlog.cc 2011-02-03 09:28:27 +0000
+++ b/sql/binlog.cc 2011-02-12 08:32:05 +0000
@@ -4734,7 +4734,8 @@ int THD::decide_logging_format(TABLE_LIS
prev_write_table= table->table;
- if (variables.binlog_format != BINLOG_FORMAT_ROW)
+ if (variables.binlog_format != BINLOG_FORMAT_ROW &&
+ lex->sql_command != SQLCOM_END /* rows-event applying by slave */)
{
/*
Master side of the STMT format events parallelization.
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2011-02-03 09:28:27 +0000
+++ b/sql/log_event.cc 2011-02-12 08:32:05 +0000
@@ -2408,7 +2408,7 @@ bool Log_event::contains_partition_info(
r - a mini-group internal "regular" event that follows its g-parent
(Write, Update, Delete -rows)
S - sequentially applied event (may not be a part of any group).
- Events of this type are determined via @c only_sequential_exec()
+ Events of this type are determined via @c mts_sequential_exec()
earlier and don't cause calling this method .
T - terminator of the group (XID, COMMIT, ROLLBACK)
@@ -2490,7 +2490,9 @@ Slave_worker *Log_event::get_slave_worke
if (contains_partition_info())
{
List_iterator<char> it(*mts_get_dbs());
- while (it++)
+
+ it++;
+ do
{
char **ref_cur_db= it.ref();
// a lot of things inside `get_slave_worker_id'
@@ -2504,9 +2506,8 @@ Slave_worker *Log_event::get_slave_worke
DBUG_ASSERT(g.group_relay_log_name == NULL);
}
- }
+ } while (it++ && mts_number_dbs() != MAX_DBS_IN_QUERY_MTS + 1);
// releasing the Coord's mem-root from the updated dbs
- // if (get_type_code() == QUERY_EVENT)
free_root(rli->info_thd->mem_root, MYF(MY_KEEP_PREALLOC));
}
else // r
@@ -2537,6 +2538,8 @@ Slave_worker *Log_event::get_slave_worke
(Slave_job_group *)
dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
+ if (!rli->curr_group_is_parallel)
+ sleep(1000);
DBUG_ASSERT(rli->curr_group_is_parallel);
// TODO: throw an error when relay-log reading starts from inside of a group!!
@@ -2790,53 +2793,50 @@ int Log_event::apply_event(Relay_log_inf
Slave_worker *w= NULL;
Slave_job_item item= {NULL}, *job_item= &item;
Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli); // constless alias
- bool parallel;
- bool seq_event;
- bool term_event;
+ bool parallel, seq_event, term_event;
+
+ if (rli->is_mts_recovery())
+ {
+ bool skip= _bitmap_is_set(&c_rli->recovery_groups, c_rli->mts_recovery_index);
+
+ if (ends_group()) // TODO: || ! seen_begin
+ {
+ c_rli->mts_recovery_index++;
+ if (--c_rli->mts_recovery_group_cnt == 0)
+ {
+ c_rli->recovery_parallel_workers= c_rli->slave_parallel_workers;
+ c_rli->mts_recovery_index= 0;
+ }
+ }
+ if (skip)
+ DBUG_RETURN(0);
+ else
+ DBUG_RETURN(do_apply_event(rli));
+ }
if (!(parallel= rli->is_parallel_exec()) ||
- rli->mts_recovery_group_cnt != 0 ||
- ((seq_event=
- only_sequential_exec(rli->run_query_in_parallel,
- rli->curr_group_seen_begin /* todo: obs 2nd arg */))
- // rli->curr_group_seen_begin && ends_group() => rli->last_assigned_worker
- && (!rli->curr_group_seen_begin || parallel_exec_by_coordinator(::server_id))))
+ ((seq_event= mts_sequential_exec()) &&
+ (!rli->curr_group_seen_begin ||
+ mts_async_exec_by_coordinator(::server_id))))
{
if (parallel)
{
- // This `only-sequential' case relates to a DDL Query case
- // or a group split apart by FD event
- DBUG_ASSERT(seq_event &&
- (rli->curr_group_da.elements == 0 || rli->curr_group_seen_begin));
-
- if (!parallel_exec_by_coordinator(::server_id))
+ if (!mts_async_exec_by_coordinator(::server_id))
{
+ // current isn't group split and therefore requires Workers to sync
DBUG_ASSERT(!rli->curr_group_seen_begin);
- c_rli->curr_group_is_parallel= FALSE; // Coord will destruct events
+ c_rli->curr_group_is_parallel= FALSE;
(void) wait_for_workers_to_finish(rli);
}
else
{
- c_rli->curr_event_is_not_in_group= TRUE;
- }
- }
- else if (rli->is_mts_recovery())
- {
- // recovery
- bool skip= _bitmap_is_set(&c_rli->recovery_groups, c_rli->mts_recovery_index);
-
- if (ends_group()) // todo: || rli->run_query_in_parallel && ! seen_begin
- {
- c_rli->mts_recovery_index++;
- if (--c_rli->mts_recovery_group_cnt == 0)
+ if (rli->curr_group_is_parallel)
{
- c_rli->recovery_parallel_workers= c_rli->slave_parallel_workers;
- c_rli->mts_recovery_index= 0;
+ c_rli->curr_group_split= TRUE;
+ c_rli->curr_group_is_parallel= FALSE;
}
}
- if (skip)
- DBUG_RETURN(0);
}
DBUG_RETURN(do_apply_event(rli));
}
@@ -2845,10 +2845,6 @@ int Log_event::apply_event(Relay_log_inf
rli->last_assigned_worker);
/*
- Work-around:s for B, T,..., Q case and ROWS_QUERY_LOG_EVENT
- A worker has been assigned but it needs sequential environment.
-
- Todo: support Query parallelization.
Todo: disassociate Rows_* events from the central rli.
*/
if (seq_event)
@@ -2865,9 +2861,10 @@ int Log_event::apply_event(Relay_log_inf
my_sleep(10);
}
c_rli->rows_query_ev= (Rows_query_log_event*) this;
- }
- }
+ }
+ }
+ // getting Worker's id
if ((!(w= get_slave_worker_id(rli)) ||
DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)))
DBUG_RETURN(rli->curr_group_assigned_parts.elements == 0 ? FALSE : TRUE);
@@ -3369,7 +3366,7 @@ bool Query_log_event::write(IO_CACHE* fi
}
}
- if (thd->get_binlog_updated_db_names() != NULL)
+ if (thd && thd->get_binlog_updated_db_names() != NULL)
{
uchar dbs;
*start++= Q_UPDATED_DB_NAMES;
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2011-02-03 09:28:27 +0000
+++ b/sql/log_event.h 2011-02-12 08:32:05 +0000
@@ -1089,6 +1089,8 @@ public:
return res;
}
+ virtual uchar mts_number_dbs() { return 1; }
+
#else
Log_event() : temp_buf(0) {}
/* avoid having to link mysqlbinlog against libpthread */
@@ -1210,30 +1212,27 @@ public:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
public:
+ virtual uchar mts_number_of_updated_dbs() { return 1; }
+
/**
MST: to execute serially due to technical or conceptual limitation
+
+ TODO: add version of the server check to fall back to seq in the OM case.
@return TRUE if despite permanent parallel execution mode an event
needs applying in a real isolation that is sequentially.
*/
- bool only_sequential_exec(bool query_in_parallel, bool group_term_in_parallel)
+ bool mts_sequential_exec()
{
return
- /*
+ /*
the 4 types below are limitly parallel-supported (the default
session db not the actual db).
- Decision on BEGIN is deferred till the following event.
- Decision on Commit or Xid is forced by the one for BEGIN.
+ Decision on BEGIN, COMMIT, Xid is the parallel.
*/
-
- (!query_in_parallel &&
- ((get_type_code() == QUERY_EVENT
- && !starts_group() && !ends_group()) ||
- get_type_code() == INTVAR_EVENT ||
- get_type_code() == USER_VAR_EVENT ||
- get_type_code() == RAND_EVENT)) ||
-
- (!group_term_in_parallel && ends_group()) ||
+ (get_type_code() == QUERY_EVENT &&
+ !starts_group() && !ends_group() &&
+ (mts_number_of_updated_dbs() == MAX_DBS_IN_QUERY_MTS + 1)) ||
get_type_code() == START_EVENT_V3 ||
get_type_code() == STOP_EVENT ||
@@ -1252,7 +1251,7 @@ public:
get_type_code() == PRE_GA_UPDATE_ROWS_EVENT||
get_type_code() == PRE_GA_DELETE_ROWS_EVENT||
- get_type_code() == ROWS_QUERY_LOG_EVENT || /* todo: make parallel */
+ get_type_code() == ROWS_QUERY_LOG_EVENT || /* TODO: make parallel */
get_type_code() == INCIDENT_EVENT;
}
@@ -1263,7 +1262,7 @@ public:
@return TRUE if that's the case,
FALSE otherwise.
*/
- bool parallel_exec_by_coordinator(ulong slave_server_id)
+ bool mts_async_exec_by_coordinator(ulong slave_server_id)
{
return
(get_type_code() == FORMAT_DESCRIPTION_EVENT ||
@@ -1883,14 +1882,26 @@ public:
Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length,
bool using_trans, bool direct, bool suppress_use, int error);
const char* get_db() { return db; }
+
+ /**
+ Returns a default db in case of over-MAX_DBS_IN_QUERY_MTS actual db:s
+ */
virtual List<char>* mts_get_dbs()
{
- DBUG_ASSERT(mts_updated_dbs > 0 && mts_updated_dbs < MAX_DBS_IN_QUERY_MTS + 1);
+ DBUG_ASSERT(mts_updated_dbs > 0 && mts_updated_dbs <= MAX_DBS_IN_QUERY_MTS + 1);
List<char> *res= new List<char>;
- for (uchar i= 0; i < mts_updated_dbs; i++)
- res->push_back(mts_updated_db_names[i]);
+ if (mts_updated_dbs == MAX_DBS_IN_QUERY_MTS + 1)
+ res->push_back((char*) get_db());
+ else
+ for (uchar i= 0; i < mts_updated_dbs; i++)
+ res->push_back(mts_updated_db_names[i]);
return res;
}
+
+ virtual uchar mts_number_dbs() { return mts_updated_dbs; }
+
+ virtual uchar mts_number_of_updated_dbs() { return mts_updated_dbs; }
+
#ifdef HAVE_REPLICATION
void pack_info(Protocol* protocol);
#endif /* HAVE_REPLICATION */
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2011-01-11 23:01:02 +0000
+++ b/sql/mysqld.cc 2011-02-12 08:32:05 +0000
@@ -468,7 +468,6 @@ ulonglong slave_type_conversions_options
ulong opt_mts_slave_parallel_workers;
ulong opt_mts_slave_worker_queue_len_max;
my_bool opt_mts_slave_local_timestamp;
-my_bool opt_mts_slave_run_query_in_parallel;
ulong opt_mts_partition_hash_soft_max;
ulonglong opt_mts_pending_jobs_size_max;
ulong opt_mts_coordinator_basic_nap;
=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h 2010-12-27 18:54:41 +0000
+++ b/sql/mysqld.h 2011-02-12 08:32:05 +0000
@@ -182,7 +182,6 @@ extern uint slave_net_timeout;
extern ulong opt_mts_slave_parallel_workers;
extern ulong opt_mts_slave_worker_queue_len_max;
extern my_bool opt_mts_slave_local_timestamp;
-extern my_bool opt_mts_slave_run_query_in_parallel;
extern ulong opt_mts_partition_hash_soft_max;
extern ulonglong opt_mts_pending_jobs_size_max;
extern ulong opt_mts_coordinator_basic_nap;
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2011-02-03 09:28:27 +0000
+++ b/sql/rpl_rli.cc 2011-02-12 08:32:05 +0000
@@ -73,7 +73,7 @@ Relay_log_info::Relay_log_info(bool is_s
this_worker(NULL), slave_parallel_workers(0),
recovery_parallel_workers(0),
checkpoint_group(mts_checkpoint_group), mts_recovery_group_cnt(0),
- mts_recovery_index(0), curr_event_is_not_in_group(0),
+ mts_recovery_index(0),
sql_delay(0), sql_delay_end(0), m_flags(0)
{
DBUG_ENTER("Relay_log_info::Relay_log_info");
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2011-01-20 13:39:00 +0000
+++ b/sql/rpl_rli.h 2011-02-12 08:32:05 +0000
@@ -448,7 +448,6 @@ public:
DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
DYNAMIC_ARRAY curr_group_da; // deferred array to hold partition-info-free events
bool curr_group_seen_begin; // current group started with B-event or not
- bool run_query_in_parallel; // Query's default db not the actual db as part
bool curr_group_isolated; // Trans is exec:d by Worker but in exclusive env
volatile ulong mts_wqs_underrun_w_id; // Id of a Worker whose queue is getting empty
volatile long mts_wqs_overrun; // W to incr and decr
@@ -459,7 +458,8 @@ public:
Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
ulonglong mts_total_groups; // total event groups distributed in current session
- bool curr_group_is_parallel; // a mark for Coord to indicate on T-event of the curr group at delete
+ bool curr_group_is_parallel; // an event to process by Coordinator
+ bool curr_group_split; // an event split the current group forcing C to exec it
ulong opt_slave_parallel_workers; // auxiliary cache for ::opt_slave_parallel_workers
ulong slave_parallel_workers; // the one slave session time number of workers
ulong recovery_parallel_workers; // number of workers while recovering.
@@ -475,7 +475,6 @@ public:
MY_BITMAP recovery_groups; // bitmap used during recovery.
ulong mts_recovery_group_cnt; // number of groups to execute at recovery
ulong mts_recovery_index; // running index of recoverable groups
- bool curr_event_is_not_in_group; // a special case of group split apart by FD
/*
temporary tables are held by Coordinator though are created and dropped
if explicilty by Workers. The following lock has to be taken by either party
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2011-02-03 09:28:27 +0000
+++ b/sql/rpl_slave.cc 2011-02-12 08:32:05 +0000
@@ -2847,17 +2847,8 @@ int apply_event_and_update_pos(Log_event
See sql/rpl_rli.h for further details.
*/
int error= 0;
- if (skip_event ||
- (!rli->is_parallel_exec() ||
- (!rli->curr_group_is_parallel)))
- {
- DBUG_ASSERT(skip_event || !rli->is_parallel_exec() ||
- (!rli->curr_group_is_parallel ||
- rli->curr_event_is_not_in_group) ||
- (ev->only_sequential_exec(rli->run_query_in_parallel,
- (rli->curr_group_seen_begin ||
- rli->last_assigned_worker != NULL))
- && !rli->curr_group_seen_begin));
+ if (skip_event || !rli->is_parallel_exec() || !rli->curr_group_is_parallel)
+ {
#ifndef DBUG_OFF
/*
This only prints information to the debug trace.
@@ -3056,40 +3047,37 @@ static int exec_relay_log_event(THD* thd
*/
// if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
{
- if ((!rli->is_parallel_exec() ||
- !rli->curr_group_is_parallel || rli->curr_event_is_not_in_group)
- && ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
+ if ((!rli->is_parallel_exec() || !rli->curr_group_is_parallel))
{
- DBUG_ASSERT(!rli->is_parallel_exec()
- ||
- (ev->only_sequential_exec(rli->run_query_in_parallel,
- // rli->curr_group_is_parallel
- (rli->curr_group_seen_begin ||
- rli->last_assigned_worker != NULL))
- && (!rli->curr_group_seen_begin ||
- ev->parallel_exec_by_coordinator(::server_id)))
- || (ev->shall_skip(rli) != Log_event::EVENT_SKIP_NOT));
- /* MTS: Observation/todo.
-
- ROWS_QUERY_LOG_EVENT could be supported easier if
- destructing part of handle_rows_query_log_event would be merged
- with rli->cleanup_context() and the rest move into
- ROWS...::do_apply_event
- */
-
- if (!rli->is_parallel_exec())
- if (thd->variables.binlog_rows_query_log_events)
- handle_rows_query_log_event(ev, rli);
+ DBUG_ASSERT(!rli->is_parallel_exec() || !rli->curr_group_is_parallel ||
+ ev->shall_skip(rli) != Log_event::EVENT_SKIP_NOT);
- if (ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
+ if (rli->curr_group_split) // an artifical FD requires some handling
+ {
+ rli->curr_group_is_parallel= TRUE;
+ rli->curr_group_split= FALSE;
+ }
+ if (ev->get_type_code() != FORMAT_DESCRIPTION_EVENT)
{
- DBUG_PRINT("info", ("Deleting the event after it has been executed"));
- delete ev;
- ev= NULL;
+ /* MTS/ TODO.
+
+ ROWS_QUERY_LOG_EVENT could be supported easier if
+ destructing part of handle_rows_query_log_event would be merged
+ with rli->cleanup_context() and the rest move into
+ ROWS...::do_apply_event
+ */
+ if (!rli->is_parallel_exec())
+ if (thd->variables.binlog_rows_query_log_events)
+ handle_rows_query_log_event(ev, rli);
+
+ if (ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
+ {
+ DBUG_PRINT("info", ("Deleting the event after it has been executed"));
+ delete ev;
+ ev= NULL;
+ }
}
}
- if (rli->curr_event_is_not_in_group)
- rli->curr_event_is_not_in_group= FALSE;
}
/*
@@ -3953,7 +3941,6 @@ bool mts_recovery_groups(Relay_log_info
DBUG_ASSERT(ev->is_valid());
DBUG_ASSERT(rli->mts_recovery_group_cnt < rli->checkpoint_group);
- // TODO: relax condition to allow --mts_exp_run_query_in_parallel= 1
if (ev->starts_group())
curr_group_seen_begin= TRUE;
else
@@ -4351,12 +4338,11 @@ int slave_start_workers(Relay_log_info *
rli->mts_coordinator_basic_nap= ::opt_mts_coordinator_basic_nap;
rli->mts_worker_underrun_level= ::opt_mts_worker_underrun_level;
rli->mts_total_groups= 0;
- rli->curr_group_seen_begin= FALSE; // initial presumtion, will change
- rli->curr_group_is_parallel= FALSE; // initial presumtion, will change
+ rli->curr_group_seen_begin= FALSE;
+ rli->curr_group_is_parallel= FALSE;
rli->curr_group_isolated= FALSE;
- rli->run_query_in_parallel= opt_mts_slave_run_query_in_parallel;
+ rli->curr_group_split= FALSE;
rli->checkpoint_seqno= 0;
- rli->curr_event_is_not_in_group= FALSE;
//rli->worker_bitmap_buf= my_malloc(n/8 + 1,MYF(MY_WME));
for (i= 0; i < n; i++)
=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc 2011-01-20 13:39:00 +0000
+++ b/sql/sql_base.cc 2011-02-12 08:32:05 +0000
@@ -1197,26 +1197,25 @@ bool close_cached_connection_tables(THD
static void mark_temp_tables_as_free_for_reuse(THD *thd)
{
- TABLE *temporary_tables;
+#ifndef EMBEDDED_LIBRARY
bool mts_slave= mts_is_coord_or_worker(thd);
-
+ TABLE *temporary_tables= mts_slave ?
+ mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
if (mts_slave)
- {
- temporary_tables= mts_get_coordinator_thd()->temporary_tables;
mysql_mutex_lock(mts_get_temp_table_mutex());
- }
- else
- {
- temporary_tables= thd->temporary_tables;
- }
+#else
+ TABLE *temporary_tables= thd->temporary_tables;
+#endif
for (TABLE *table= temporary_tables; table ; table=table->next)
{
if ((table->query_id == thd->query_id) && ! table->open_by_handler)
mark_tmp_table_for_reuse(table);
}
+#ifndef EMBEDDED_LIBRARY
if (mts_slave)
- mysql_mutex_unlock(mts_get_temp_table_mutex());
+ mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
}
@@ -1607,24 +1606,26 @@ bool close_temporary_tables(THD *thd)
/* Assume thd->variables.option_bits has OPTION_QUOTE_SHOW_CREATE */
bool was_quote_show= TRUE;
bool error= 0;
+ TABLE **ptr_temporary_tables;
+#ifndef EMBEDDED_LIBRARY
bool mts_slave= mts_is_coord_or_worker(thd);
- TABLE *temporary_tables, **ptr_temporary_tables;
-
+ TABLE *temporary_tables= mts_slave ?
+ mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
+
if (mts_slave)
- {
- temporary_tables= mts_get_coordinator_thd()->temporary_tables;
mysql_mutex_lock(mts_get_temp_table_mutex());
- }
- else
- {
- temporary_tables= thd->temporary_tables;
- }
+#else
+ TABLE *temporary_tables= thd->temporary_tables;
+#endif
+
ptr_temporary_tables= &temporary_tables;
if (!temporary_tables)
{
- if (mts_slave)
- mysql_mutex_unlock(mts_get_temp_table_mutex());
+#ifndef EMBEDDED_LIBRARY
+ if (mts_slave)
+ mysql_mutex_unlock(mts_get_temp_table_mutex()); /* release the lock taken above before the early return; re-locking here would never release it */
+#endif
DBUG_RETURN(FALSE);
}
@@ -1637,8 +1638,11 @@ bool close_temporary_tables(THD *thd)
close_temporary(table, 1, 1);
}
*ptr_temporary_tables= 0;
+#ifndef EMBEDDED_LIBRARY
if (mts_slave)
mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
+
DBUG_RETURN(FALSE);
}
@@ -1772,9 +1776,10 @@ bool close_temporary_tables(THD *thd)
thd->variables.option_bits&= ~OPTION_QUOTE_SHOW_CREATE; /* restore option */
*ptr_temporary_tables= 0;
+#ifndef EMBEDDED_LIBRARY
if (mts_slave)
mysql_mutex_unlock(mts_get_temp_table_mutex());
-
+#endif
DBUG_RETURN(error);
}
@@ -2067,18 +2072,16 @@ TABLE *find_temporary_table(THD *thd,
const char *table_key,
uint table_key_length)
{
- TABLE *table= NULL, *temporary_tables;
+ TABLE *table= NULL;
+#ifndef EMBEDDED_LIBRARY
bool mts_slave= mts_is_coord_or_worker(thd);
+ TABLE *temporary_tables= mts_slave ?
+ mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
if (mts_slave)
- {
- temporary_tables= mts_get_coordinator_thd()->temporary_tables;
- mysql_mutex_lock(mts_get_temp_table_mutex());
- }
- else
- {
- temporary_tables= thd->temporary_tables;
- }
-
+ mysql_mutex_lock(mts_get_temp_table_mutex());
+#else
+ TABLE *temporary_tables= thd->temporary_tables;
+#endif
for (table= temporary_tables; table; table= table->next)
{
if (table->s->table_cache_key.length == table_key_length &&
@@ -2087,10 +2090,10 @@ TABLE *find_temporary_table(THD *thd,
break;
}
}
-
+#ifndef EMBEDDED_LIBRARY
if (mts_slave)
mysql_mutex_unlock(mts_get_temp_table_mutex());
-
+#endif
return table;
}
@@ -2129,7 +2132,9 @@ TABLE *find_temporary_table(THD *thd,
int drop_temporary_table(THD *thd, TABLE_LIST *table_list, bool *is_trans)
{
TABLE *table;
+#ifndef EMBEDDED_LIBRARY
bool mts_slave= mts_is_coord_or_worker(thd);
+#endif
THD *thd_temp;
DBUG_ENTER("drop_temporary_table");
@@ -2155,20 +2160,24 @@ int drop_temporary_table(THD *thd, TABLE
*/
mysql_lock_remove(thd, thd->lock, table);
+#ifndef EMBEDDED_LIBRARY
if (mts_slave)
{
thd_temp= mts_get_coordinator_thd();
mysql_mutex_lock(mts_get_temp_table_mutex());
}
else
+#endif
{
thd_temp= thd;
}
close_temporary_table(thd_temp, table, 1, 1);
+#ifndef EMBEDDED_LIBRARY
if (mts_slave)
mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
DBUG_RETURN(0);
}
@@ -2706,17 +2715,15 @@ bool open_table(THD *thd, TABLE_LIST *ta
if (table_list->open_type != OT_BASE_ONLY &&
! (flags & MYSQL_OPEN_SKIP_TEMPORARY))
{
+#ifndef EMBEDDED_LIBRARY
bool mts_slave= mts_is_coord_or_worker(thd);
- TABLE *temporary_tables;
+ TABLE *temporary_tables= mts_slave ?
+ mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
if (mts_slave)
- {
- temporary_tables= mts_get_coordinator_thd()->temporary_tables;
mysql_mutex_lock(mts_get_temp_table_mutex());
- }
- else
- {
- temporary_tables= thd->temporary_tables;
- }
+#else
+ TABLE *temporary_tables= thd->temporary_tables;
+#endif
for (table= temporary_tables; table ; table=table->next)
{
@@ -2738,20 +2745,26 @@ bool open_table(THD *thd, TABLE_LIST *ta
(ulong) table->query_id, (uint) thd->server_id,
(ulong) thd->variables.pseudo_thread_id));
my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias);
+#ifndef EMBEDDED_LIBRARY
if (mts_slave)
mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
DBUG_RETURN(TRUE);
}
table->query_id= thd->query_id;
thd->thread_specific_used= TRUE;
DBUG_PRINT("info",("Using temporary table"));
+#ifndef EMBEDDED_LIBRARY
if (mts_slave)
mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
goto reset;
}
}
+#ifndef EMBEDDED_LIBRARY
if (mts_slave)
mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
}
if (table_list->open_type == OT_TEMPORARY_ONLY ||
@@ -5944,19 +5957,16 @@ TABLE *open_table_uncached(THD *thd, con
if (add_to_temporary_tables_list)
{
+#ifndef EMBEDDED_LIBRARY
TABLE **ptr_temporary_tables;
bool mts_slave= mts_is_coord_or_worker(thd);
-
+ ptr_temporary_tables= mts_slave?
+ &mts_get_coordinator_thd()->temporary_tables : &thd->temporary_tables;
if (mts_slave)
- {
- ptr_temporary_tables= &mts_get_coordinator_thd()->temporary_tables;
mysql_mutex_lock(mts_get_temp_table_mutex());
- }
- else
- {
- ptr_temporary_tables= &thd->temporary_tables;
- }
-
+#else
+ TABLE **ptr_temporary_tables= &thd->temporary_tables;
+#endif
/* growing temp list at the head */
tmp_table->next= *ptr_temporary_tables;
if (tmp_table->next)
@@ -5965,8 +5975,10 @@ TABLE *open_table_uncached(THD *thd, con
(*ptr_temporary_tables)->prev= 0;
if (thd->slave_thread)
slave_open_temp_tables++;
+#ifndef EMBEDDED_LIBRARY
if (mts_slave)
mysql_mutex_unlock(mts_get_temp_table_mutex());
+#endif
}
tmp_table->pos_in_table_list= 0;
DBUG_PRINT("tmptable", ("opened table: '%s'.'%s' 0x%lx", tmp_table->s->db.str,
=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc 2011-02-03 09:28:27 +0000
+++ b/sql/sql_class.cc 2011-02-12 08:32:05 +0000
@@ -3394,6 +3394,7 @@ void THD::reset_sub_statement_state(Sub_
first_successful_insert_id_in_prev_stmt;
backup->first_successful_insert_id_in_cur_stmt=
first_successful_insert_id_in_cur_stmt;
+ backup->binlog_updated_db_names= binlog_updated_db_names;
if ((!lex->requires_prelocking() || is_update_query(lex->sql_command)) &&
!is_current_stmt_binlog_format_row())
@@ -3414,6 +3415,7 @@ void THD::reset_sub_statement_state(Sub_
cuted_fields= 0;
transaction.savepoints= 0;
first_successful_insert_id_in_cur_stmt= 0;
+ binlog_updated_db_names= NULL;
}
@@ -3476,6 +3478,7 @@ void THD::restore_sub_statement_state(Su
*/
examined_row_count+= backup->examined_row_count;
cuted_fields+= backup->cuted_fields;
+ binlog_updated_db_names= backup->binlog_updated_db_names;
DBUG_VOID_RETURN;
}
=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h 2011-02-03 09:28:27 +0000
+++ b/sql/sql_class.h 2011-02-12 08:32:05 +0000
@@ -1184,6 +1184,7 @@ public:
bool last_insert_id_used;
SAVEPOINT *savepoints;
enum enum_check_fields count_cuted_fields;
+ List<char> *binlog_updated_db_names;
};
=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc 2011-01-11 23:01:02 +0000
+++ b/sql/sys_vars.cc 2011-02-12 08:32:05 +0000
@@ -3198,12 +3198,6 @@ static Sys_var_mybool Sys_slave_local_ti
"time value to implicitly affected timestamp columms. Otherwise (default) "
"it installs prescribed by the master value",
GLOBAL_VAR(opt_mts_slave_local_timestamp), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
-static Sys_var_mybool Sys_slave_run_query_in_parallel(
- "mts_exp_slave_run_query_in_parallel",
- "The default not an actual database name is used as partition info "
- "for parallel execution of Query_log_event ",
- GLOBAL_VAR(opt_mts_slave_run_query_in_parallel), CMD_LINE(OPT_ARG),
- DEFAULT(FALSE));
static Sys_var_ulong Sys_mts_partition_hash_soft_max(
"mts_partition_hash_soft_max",
"Number of records in the mts partition hash below which "
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110212083205-a48u8h9mjqe1jejb.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3272) WL#5754 | Andrei Elkin | 12 Feb |