#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3270 Andrei Elkin 2011-01-20
WL#5754 Query event parallel execution
This is a patch skeletoning parallel applying of Queries containing a temporary table.
A new specialized test proves stability and data consistency.
@ mysql-test/suite/rpl/r/rpl_parallel_query.result
a new results file
@ sql/field.cc
relaxing an assert (todo: add to it more specific claim field->table is temp).
@ sql/handler.cc
relaxing an assert (todo: add to it more specific claim table is temp).
@ sql/rpl_rli.cc
a new temp table mutex init, destroy and a set of helper functions providing access
to C,W's thd:s in arbitrary place of the server code are added.
@ sql/rpl_rli.h
a new temp table mutex is added to RLI class.
@ sql/rpl_slave.cc
SLAVE_THD_WORKER appeared to be redundant. Worker's thd->system_thread is set to the same
as the Coordinator thread constant.
@ sql/sql_base.cc
replacing refs to thd->temporary with an appropriate one corresponding to the Coord's thd->t_t:s.
Also surrounding critical sections of codes dealing with opening, finding, closing or changing
temproray_tables' list with a specific mutex lock/unlock.
added:
mysql-test/suite/rpl/r/rpl_parallel_query.result
modified:
sql/field.cc
sql/handler.cc
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_slave.cc
sql/sql_base.cc
=== added file 'mysql-test/suite/rpl/r/rpl_parallel_query.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel_query.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel_query.result 2011-01-20 13:39:00 +0000
@@ -0,0 +1,30 @@
+include/master-slave.inc
+[connection master]
+call mtr.add_suppression('Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statement accesses nontransactional table as well as transactional or temporary table.*');
+include/stop_slave.inc
+set @save.mts_slave_parallel_workers= @@global.mts_slave_parallel_workers;
+set @@global.mts_slave_parallel_workers= 2;
+include/start_slave.inc
+Warnings:
+Note 1726 Temporary failed transaction retry is not supported in Parallel Slave. Such failure will force the slave to stop.
+create database d1;
+use d1;
+create table d1.t1 (a int auto_increment primary key, b int) engine=innodb;
+create temporary table tt_1 (a int auto_increment primary key);
+insert into tt_1 values (null);
+insert into tt_1 values (null);
+insert into d1.t1 (b) select count(*) from tt_1;
+create database d2;
+use d2;
+create table d2.t1 (a int auto_increment primary key, b int) engine=innodb;
+create temporary table tt_1 (a int auto_increment primary key);
+insert into tt_1 values (null);
+insert into tt_1 values (null);
+insert into d2.t1 (b) select count(*) from tt_1;
+include/diff_tables.inc [master:d2.t1, slave:d2.t1]
+include/diff_tables.inc [master:d1.t1, slave:d1.t1]
+drop temporary table tt_1;
+drop database d1;
+drop temporary table tt_1;
+include/stop_slave.inc
+set @@global.mts_slave_parallel_workers= @save.mts_slave_parallel_workers;
=== modified file 'sql/field.cc'
--- a/sql/field.cc 2010-12-29 00:38:59 +0000
+++ b/sql/field.cc 2011-01-20 13:39:00 +0000
@@ -3736,7 +3736,7 @@ longlong Field_long::val_int(void)
ASSERT_COLUMN_MARKED_FOR_READ;
int32 j;
/* See the comment in Field_long::store(long long) */
- DBUG_ASSERT(table->in_use == current_thd);
+ DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
#ifdef WORDS_BIGENDIAN
if (table->s->db_low_byte_first)
j=sint4korr(ptr);
@@ -6309,7 +6309,7 @@ int Field_string::store(const char *from
const char *from_end_pos;
/* See the comment for Field_long::store(long long) */
- DBUG_ASSERT(table->in_use == current_thd);
+ DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
copy_length= well_formed_copy_nchars(field_charset,
(char*) ptr, field_length,
@@ -6459,7 +6459,7 @@ String *Field_string::val_str(String *va
{
ASSERT_COLUMN_MARKED_FOR_READ;
/* See the comment for Field_long::store(long long) */
- DBUG_ASSERT(table->in_use == current_thd);
+ DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
uint length;
if (table->in_use->variables.sql_mode &
MODE_PAD_CHAR_TO_FULL_LENGTH)
=== modified file 'sql/handler.cc'
--- a/sql/handler.cc 2011-01-11 11:45:02 +0000
+++ b/sql/handler.cc 2011-01-20 13:39:00 +0000
@@ -2127,7 +2127,8 @@ void **handler::ha_data(THD *thd) const
THD *handler::ha_thd(void) const
{
- DBUG_ASSERT(!table || !table->in_use || table->in_use == current_thd);
+ DBUG_ASSERT(!table || !table->in_use || table->in_use == current_thd ||
+ current_thd->slave_thread /* mts worker */);
return (table && table->in_use) ? table->in_use : current_thd;
}
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2010-12-27 18:54:41 +0000
+++ b/sql/rpl_rli.cc 2011-01-20 13:39:00 +0000
@@ -52,6 +52,7 @@ static PSI_cond_info *worker_conds= NULL
PSI_mutex_key *key_mutex_slave_parallel_worker= NULL;
PSI_mutex_key key_mutex_slave_parallel_pend_jobs;
+PSI_mutex_key key_mutex_mts_temp_tables_lock;
PSI_cond_key *key_cond_slave_parallel_worker= NULL;
PSI_cond_key key_cond_slave_parallel_pend_jobs;
@@ -130,6 +131,8 @@ void Relay_log_info::init_workers(ulong
mysql_mutex_init(key_mutex_slave_parallel_pend_jobs, &pending_jobs_lock,
MY_MUTEX_INIT_FAST);
mysql_cond_init(key_cond_slave_parallel_pend_jobs, &pending_jobs_cond, NULL);
+ mysql_mutex_init(key_mutex_mts_temp_tables_lock, &mts_temp_tables_lock,
+ MY_MUTEX_INIT_FAST);
my_init_dynamic_array(&workers, sizeof(Slave_worker *), slave_parallel_workers, 4);
}
@@ -140,6 +143,7 @@ void Relay_log_info::deinit_workers()
{
mysql_mutex_destroy(&pending_jobs_lock);
mysql_cond_destroy(&pending_jobs_cond);
+ mysql_mutex_destroy(&mts_temp_tables_lock);
if (!this_worker)
delete_dynamic(&workers);
@@ -198,6 +202,36 @@ void Relay_log_info::reset_notified_chec
checkpoint_seqno= 0;
}
+mysql_mutex_t* mts_get_temp_table_mutex()
+{
+ return &active_mi->rli->mts_temp_tables_lock;
+}
+
+THD* mts_get_coordinator_thd()
+{
+ Slave_worker *w;
+ return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
+ NULL : !(w= active_mi->rli->get_current_worker()) ?
+ NULL : w->c_rli->info_thd;
+}
+
+THD* mts_get_worker_thd()
+{
+ Slave_worker *w;
+ return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
+ NULL : !(w= active_mi->rli->get_current_worker()) ?
+ NULL : w->w_rli->info_thd;
+}
+
+bool mts_is_coord_or_worker(THD *thd)
+{
+ return
+ thd->slave_thread &&
+ thd->system_thread != SYSTEM_THREAD_SLAVE_IO &&
+ (mts_get_worker_thd() != NULL);
+}
+
+
/**
The method can be run both by C having the Main (coord) rli context and
by W having both the main and the Local (worker) rli context.
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2011-01-11 22:54:12 +0000
+++ b/sql/rpl_rli.h 2011-01-20 13:39:00 +0000
@@ -476,6 +476,15 @@ public:
ulong mts_recovery_group_cnt; // number of groups to execute at recovery
ulong mts_recovery_index; // running index of recoverable groups
bool curr_event_is_not_in_group; // a special case of group split apart by FD
+ /*
+ temporary tables are held by Coordinator though are created and dropped
+ if explicilty by Workers. The following lock has to be taken by either party
+ in order to conduct any operation in the temp tables placeholder, incl.
+ find, drop, create, open.
+ */
+ mysql_mutex_t mts_temp_tables_lock;
+
+
/* most of allocation in the coordinator rli is there */
void init_workers(ulong);
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2011-01-11 22:54:12 +0000
+++ b/sql/rpl_slave.cc 2011-01-20 13:39:00 +0000
@@ -143,7 +143,7 @@ failed read"
};
-typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL, SLAVE_THD_WORKER, SLAVE_THD_CHECKPOINT } SLAVE_THD_TYPE;
+typedef enum { SLAVE_THD_IO, SLAVE_THD_SQL, SLAVE_THD_CHECKPOINT } SLAVE_THD_TYPE;
static int process_io_rotate(Master_info* mi, Rotate_log_event* rev);
static int process_io_create_file(Master_info* mi, Create_file_log_event* cev);
@@ -3730,14 +3730,13 @@ pthread_handler_t handle_slave_worker(vo
sql_print_error("Failed during slave worker initialization");
goto err;
}
-
w->info_thd= thd;
w->w_rli->info_thd= thd;
thd->thread_stack = (char*)&thd;
pthread_detach_this_thread();
- if (init_slave_thread(thd, SLAVE_THD_WORKER))
+ if (init_slave_thread(thd, SLAVE_THD_SQL))
{
// todo make SQL thread killed
sql_print_error("Failed during slave worker initialization");
=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc 2011-01-11 11:45:02 +0000
+++ b/sql/sql_base.cc 2011-01-20 13:39:00 +0000
@@ -58,6 +58,11 @@
#include <io.h>
#endif
+// MTS temp table support
+extern THD* mts_get_coordinator_thd();
+extern THD* mts_get_worker_thd();
+extern mysql_mutex_t* mts_get_temp_table_mutex();
+extern bool mts_is_coord_or_worker(THD *thd);
bool
No_such_table_error_handler::handle_condition(THD *,
@@ -1192,11 +1197,26 @@ bool close_cached_connection_tables(THD
static void mark_temp_tables_as_free_for_reuse(THD *thd)
{
- for (TABLE *table= thd->temporary_tables ; table ; table= table->next)
+ TABLE *temporary_tables;
+ bool mts_slave= mts_is_coord_or_worker(thd);
+
+ if (mts_slave)
+ {
+ temporary_tables= mts_get_coordinator_thd()->temporary_tables;
+ mysql_mutex_lock(mts_get_temp_table_mutex());
+ }
+ else
+ {
+ temporary_tables= thd->temporary_tables;
+ }
+
+ for (TABLE *table= temporary_tables; table ; table=table->next)
{
if ((table->query_id == thd->query_id) && ! table->open_by_handler)
mark_tmp_table_for_reuse(table);
}
+ if (mts_slave)
+ mysql_mutex_unlock(mts_get_temp_table_mutex());
}
@@ -1587,19 +1607,38 @@ bool close_temporary_tables(THD *thd)
/* Assume thd->variables.option_bits has OPTION_QUOTE_SHOW_CREATE */
bool was_quote_show= TRUE;
bool error= 0;
+ bool mts_slave= mts_is_coord_or_worker(thd);
+ TABLE *temporary_tables, **ptr_temporary_tables;
+
+ if (mts_slave)
+ {
+ temporary_tables= mts_get_coordinator_thd()->temporary_tables;
+ mysql_mutex_lock(mts_get_temp_table_mutex());
+ }
+ else
+ {
+ temporary_tables= thd->temporary_tables;
+ }
+ ptr_temporary_tables= &temporary_tables;
- if (!thd->temporary_tables)
+ if (!temporary_tables)
+ {
+ if (mts_slave)
+ mysql_mutex_unlock(mts_get_temp_table_mutex());
DBUG_RETURN(FALSE);
+ }
if (!mysql_bin_log.is_open())
{
TABLE *tmp_next;
- for (table= thd->temporary_tables; table; table= tmp_next)
+ for (table= temporary_tables; table; table= tmp_next)
{
tmp_next= table->next;
close_temporary(table, 1, 1);
}
- thd->temporary_tables= 0;
+ *ptr_temporary_tables= 0;
+ if (mts_slave)
+ mysql_mutex_unlock(mts_get_temp_table_mutex());
DBUG_RETURN(FALSE);
}
@@ -1617,7 +1656,7 @@ bool close_temporary_tables(THD *thd)
of sublists of equal pseudo_thread_id
*/
- for (prev_table= thd->temporary_tables, table= prev_table->next;
+ for (prev_table= temporary_tables, table= prev_table->next;
table;
prev_table= table, table= table->next)
{
@@ -1626,7 +1665,7 @@ bool close_temporary_tables(THD *thd)
{
if (!found_user_tables)
found_user_tables= true;
- for (prev_sorted= NULL, sorted= thd->temporary_tables; sorted != table;
+ for (prev_sorted= NULL, sorted= temporary_tables; sorted != table;
prev_sorted= sorted, sorted= sorted->next)
{
if (!is_user_table(sorted) ||
@@ -1641,7 +1680,7 @@ bool close_temporary_tables(THD *thd)
}
else
{
- thd->temporary_tables= table;
+ *ptr_temporary_tables= table;
}
table= prev_table;
break;
@@ -1658,7 +1697,7 @@ bool close_temporary_tables(THD *thd)
}
/* scan sorted tmps to generate sequence of DROP */
- for (table= thd->temporary_tables; table; table= next)
+ for (table= temporary_tables; table; table= next)
{
if (is_user_table(table))
{
@@ -1731,7 +1770,10 @@ bool close_temporary_tables(THD *thd)
}
if (!was_quote_show)
thd->variables.option_bits&= ~OPTION_QUOTE_SHOW_CREATE; /* restore option */
- thd->temporary_tables=0;
+ *ptr_temporary_tables= 0;
+
+ if (mts_slave)
+ mysql_mutex_unlock(mts_get_temp_table_mutex());
DBUG_RETURN(error);
}
@@ -2025,16 +2067,31 @@ TABLE *find_temporary_table(THD *thd,
const char *table_key,
uint table_key_length)
{
- for (TABLE *table= thd->temporary_tables; table; table= table->next)
+ TABLE *table= NULL, *temporary_tables;
+ bool mts_slave= mts_is_coord_or_worker(thd);
+ if (mts_slave)
+ {
+ temporary_tables= mts_get_coordinator_thd()->temporary_tables;
+ mysql_mutex_lock(mts_get_temp_table_mutex());
+ }
+ else
+ {
+ temporary_tables= thd->temporary_tables;
+ }
+
+ for (table= temporary_tables; table; table= table->next)
{
if (table->s->table_cache_key.length == table_key_length &&
!memcmp(table->s->table_cache_key.str, table_key, table_key_length))
{
- return table;
+ break;
}
}
- return NULL;
+ if (mts_slave)
+ mysql_mutex_unlock(mts_get_temp_table_mutex());
+
+ return table;
}
@@ -2072,6 +2129,9 @@ TABLE *find_temporary_table(THD *thd,
int drop_temporary_table(THD *thd, TABLE_LIST *table_list, bool *is_trans)
{
TABLE *table;
+ bool mts_slave= mts_is_coord_or_worker(thd);
+ THD *thd_temp;
+
DBUG_ENTER("drop_temporary_table");
DBUG_PRINT("tmptable", ("closing table: '%s'.'%s'",
table_list->db, table_list->table_name));
@@ -2094,7 +2154,22 @@ int drop_temporary_table(THD *thd, TABLE
unlock the table and remove the table from this list.
*/
mysql_lock_remove(thd, thd->lock, table);
- close_temporary_table(thd, table, 1, 1);
+
+ if (mts_slave)
+ {
+ thd_temp= mts_get_coordinator_thd();
+ mysql_mutex_lock(mts_get_temp_table_mutex());
+ }
+ else
+ {
+ thd_temp= thd;
+ }
+
+ close_temporary_table(thd_temp, table, 1, 1);
+
+ if (mts_slave)
+ mysql_mutex_unlock(mts_get_temp_table_mutex());
+
DBUG_RETURN(0);
}
@@ -2125,7 +2200,7 @@ void close_temporary_table(THD *thd, TAB
passing non-zero value to end_slave via rli->save_temporary_tables
when no temp tables opened, see an invariant below.
*/
- thd->temporary_tables= table->next;
+ thd->temporary_tables= table->next; // mts: see drop_temporary_table()
if (thd->temporary_tables)
table->next->prev= 0;
}
@@ -2631,7 +2706,19 @@ bool open_table(THD *thd, TABLE_LIST *ta
if (table_list->open_type != OT_BASE_ONLY &&
! (flags & MYSQL_OPEN_SKIP_TEMPORARY))
{
- for (table= thd->temporary_tables; table ; table=table->next)
+ bool mts_slave= mts_is_coord_or_worker(thd);
+ TABLE *temporary_tables;
+ if (mts_slave)
+ {
+ temporary_tables= mts_get_coordinator_thd()->temporary_tables;
+ mysql_mutex_lock(mts_get_temp_table_mutex());
+ }
+ else
+ {
+ temporary_tables= thd->temporary_tables;
+ }
+
+ for (table= temporary_tables; table ; table=table->next)
{
if (table->s->table_cache_key.length == key_length +
TMP_TABLE_KEY_EXTRA &&
@@ -2651,14 +2738,20 @@ bool open_table(THD *thd, TABLE_LIST *ta
(ulong) table->query_id, (uint) thd->server_id,
(ulong) thd->variables.pseudo_thread_id));
my_error(ER_CANT_REOPEN_TABLE, MYF(0), table->alias);
+ if (mts_slave)
+ mysql_mutex_unlock(mts_get_temp_table_mutex());
DBUG_RETURN(TRUE);
}
table->query_id= thd->query_id;
thd->thread_specific_used= TRUE;
DBUG_PRINT("info",("Using temporary table"));
+ if (mts_slave)
+ mysql_mutex_unlock(mts_get_temp_table_mutex());
goto reset;
}
}
+ if (mts_slave)
+ mysql_mutex_unlock(mts_get_temp_table_mutex());
}
if (table_list->open_type == OT_TEMPORARY_ONLY ||
@@ -5851,14 +5944,29 @@ TABLE *open_table_uncached(THD *thd, con
if (add_to_temporary_tables_list)
{
+ TABLE **ptr_temporary_tables;
+ bool mts_slave= mts_is_coord_or_worker(thd);
+
+ if (mts_slave)
+ {
+ ptr_temporary_tables= &mts_get_coordinator_thd()->temporary_tables;
+ mysql_mutex_lock(mts_get_temp_table_mutex());
+ }
+ else
+ {
+ ptr_temporary_tables= &thd->temporary_tables;
+ }
+
/* growing temp list at the head */
- tmp_table->next= thd->temporary_tables;
+ tmp_table->next= *ptr_temporary_tables;
if (tmp_table->next)
tmp_table->next->prev= tmp_table;
- thd->temporary_tables= tmp_table;
- thd->temporary_tables->prev= 0;
+ *ptr_temporary_tables= tmp_table;
+ (*ptr_temporary_tables)->prev= 0;
if (thd->slave_thread)
slave_open_temp_tables++;
+ if (mts_slave)
+ mysql_mutex_unlock(mts_get_temp_table_mutex());
}
tmp_table->pos_in_table_list= 0;
DBUG_PRINT("tmptable", ("opened table: '%s'.'%s' 0x%lx", tmp_table->s->db.str,
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110120133900-ygoyseep8798hy84.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3270) WL#5754 | Andrei Elkin | 20 Jan |