#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3273 Andrei Elkin 2011-02-17
wl#5754 Query parallel replication.
Fixed various issues incl the preliminary review comments, stored routines handling.
@ mysql-test/suite/rpl/r/rpl_packet.result
results updated.
@ mysql-test/suite/rpl/t/rpl_packet.test
making a hashing fixes in order the test to pass.
todo: refine logics of max_allowed_packed for master & slave.
@ mysql-test/suite/rpl/t/rpl_parallel_multi_db.test
adding stored routines testing.
@ sql/binlog.cc
correcting memory allocation to be in thd's memroot.
@ sql/field.cc
adding comments to asserts.
@ sql/handler.cc
adding comments to asserts.
@ sql/log_event.cc
adding comments and correcting clearence of binlog_updated_db_names to not let
BEGIN, COMMIT in particular to get the updated list.
@ sql/log_event.h
adding commits, and interfaces to helper functions.
@ sql/rpl_rli.cc
relocalating helper functions to rpl_slave.cc.
@ sql/rpl_rli.h
improving comments.
@ sql/rpl_slave.cc
comments explaining close_temp_tables() not to run by Workers.
Accepting relocated functions.
@ sql/sql_base.cc
Correcting and simplifying logics for the temp table parallel support.
In particular close_temporary_tables() does not need to know about thd of the caller.
@ sql/sql_class.cc
Correcting logics of merging the updated db:s of a child to the parent's top-level.
@ sql/sql_class.h
adding a necessary cleanup method.
@ sql/sys_vars.cc
Added a system variable (todo/fixme: may turn out to be unnecessary though).
modified:
mysql-test/suite/rpl/r/rpl_packet.result
mysql-test/suite/rpl/t/rpl_packet.test
mysql-test/suite/rpl/t/rpl_parallel_multi_db.test
sql/binlog.cc
sql/field.cc
sql/handler.cc
sql/log_event.cc
sql/log_event.h
sql/mysqld.cc
sql/mysqld.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_slave.cc
sql/sql_base.cc
sql/sql_class.cc
sql/sql_class.h
sql/sys_vars.cc
=== modified file 'mysql-test/suite/rpl/r/rpl_packet.result'
--- a/mysql-test/suite/rpl/r/rpl_packet.result 2010-12-19 17:22:30 +0000
+++ b/mysql-test/suite/rpl/r/rpl_packet.result 2011-02-17 19:53:30 +0000
@@ -23,12 +23,12 @@ select * from information_schema.session
VARIABLE_NAME VARIABLE_VALUE
SLAVE_RUNNING ON
drop database DB_NAME_OF_MAX_LENGTH_AKA_NAME_LEN_64_BYTES_____________________;
-SET @@global.max_allowed_packet=4096;
-SET @@global.net_buffer_length=4096;
+SET @@global.max_allowed_packet=4096 + (floor(64 * 3 * 254 / 1024) + 1) * 1024;
+SET @@global.net_buffer_length=@@global.max_allowed_packet;
include/stop_slave.inc
include/start_slave.inc
CREATE TABLE `t1` (`f1` LONGTEXT) ENGINE=MyISAM;
-INSERT INTO `t1`(`f1`) VALUES ('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2048');
+INSERT INTO t1 VALUES (REPEAT('a', @@global.max_allowed_packet));
include/wait_for_slave_io_error.inc [errno=1153]
Last_IO_Error = 'Got a packet bigger than 'max_allowed_packet' bytes'
include/stop_slave_sql.inc
@@ -51,10 +51,10 @@ include/wait_for_slave_to_start.inc
DROP TABLE t1;
select @@global.max_allowed_packet;
@@global.max_allowed_packet
-4096
+53248
select @@global.net_buffer_length;
@@global.net_buffer_length
-4096
+53248
select @@global.max_allowed_packet;
@@global.max_allowed_packet
1024
@@ -62,7 +62,7 @@ select @@global.net_buffer_length;
@@global.net_buffer_length
1024
CREATE TABLE t1 (a TEXT) ENGINE=MyISAM;
-INSERT INTO t1 VALUES (REPEAT('a', 2048));
+INSERT INTO t1 VALUES (REPEAT('a', @@global.max_allowed_packet));
# 1153 = ER_NET_PACKET_TOO_LARGE
include/wait_for_slave_io_error.inc [errno=1153]
Last_IO_Error = 'Got a packet bigger than 'max_allowed_packet' bytes'
=== modified file 'mysql-test/suite/rpl/t/rpl_packet.test'
--- a/mysql-test/suite/rpl/t/rpl_packet.test 2011-02-12 08:32:05 +0000
+++ b/mysql-test/suite/rpl/t/rpl_packet.test 2011-02-17 19:53:30 +0000
@@ -63,8 +63,20 @@ connection master;
# Change the max packet size on master
-SET @@global.max_allowed_packet=4096;
-SET @@global.net_buffer_length=4096;
+# Todo: improve over-max_allowed_packet size events block on the slave.
+# The current size checking algorithm is not presize to allow large event
+# to slip it. Reject happens according to the guard:
+# if (data_len > max(max_allowed_packet,
+# opt_binlog_rows_event_max_size + MAX_LOG_EVENT_HEADER))
+# However, MAX_LOG_EVENT_HEADER is a conservative estimate so if the actual
+# header size is less the extra data let in the slave.
+
+# Adding the max size of the query log event status as
+# MAX_DBS_IN_QUERY_MTS * (1 + NAME_LEN) to make the master not fail to read
+# an event itself.
+
+SET @@global.max_allowed_packet=4096 + (floor(64 * 3 * 254 / 1024) + 1) * 1024;
+SET @@global.net_buffer_length=@@global.max_allowed_packet;
# Restart slave for new setting to take effect
connection slave;
@@ -82,13 +94,8 @@ sync_slave_with_master;
connection master;
-INSERT INTO `t1`(`f1`) VALUES ('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa!
aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa2048');
+INSERT INTO t1 VALUES (REPEAT('a', @@global.max_allowed_packet));
---disable_query_log
---disable_result_log
-select sleep(300);
---enable_result_log
---enable_query_log
#
# Bug#42914: The slave I/O thread must stop after trying to read the above
@@ -192,7 +199,7 @@ CREATE TABLE t1 (a TEXT) ENGINE=MyISAM;
# Create big row event.
--connection master
-INSERT INTO t1 VALUES (REPEAT('a', 2048));
+INSERT INTO t1 VALUES (REPEAT('a', @@global.max_allowed_packet));
# Slave IO thread should stop with error when trying to read the big event.
--connection slave
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_multi_db.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_multi_db.test 2011-02-03 09:28:27 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_multi_db.test 2011-02-17 19:53:30 +0000
@@ -77,6 +77,116 @@ while ($k)
sync_slave_with_master;
#
+# 2. Case of invoked routines
+#
+
+# A. Functions
+
+# create functions & run load
+let $n= $dbs;
+# n'th db func is defined through n-1'th except n == 1
+let $n1= $n;
+dec $n1;
+
+connection master;
+
+--disable_query_log
+
+while ($n1)
+{
+ let $m= `select floor(rand()*$tables) + 1`;
+
+ delimiter |;
+
+ eval create function `d$n`.`func` (a int) returns int
+ begin
+ insert into `d$n`.`t$m` values (`d$n1`.`func`(1));
+ return 1;
+ end|
+
+ delimiter ;|
+
+ dec $n;
+ dec $n1;
+}
+
+delimiter |;
+
+eval create function `d1`.`func` (a int) returns int
+ begin
+ insert into `d1`.`t$m` values (0);
+ return 1;
+ end|
+
+delimiter ;|
+
+
+# invoke...
+
+let $k= $queries;
+
+while ($k)
+{
+ let $n= `select floor(rand()*$dbs) + 1`;
+ let $m= `select floor(rand()*$tables) + 1`;
+ let $n1= $n;
+ dec $n1;
+ if ($n1)
+ {
+ eval insert into d$n.t$m values (`d$n1`.`func`(1));
+ }
+ dec $k;
+}
+
+--enable_query_log
+
+sync_slave_with_master;
+
+# B. Triggers
+
+connection master;
+
+# create triggers & run load
+let $n= $dbs;
+# n'th db tables won't have any trigger to avoid circular dependency
+let $n1= $n;
+dec $n1;
+
+--disable_query_log
+while ($n1)
+{
+ let $m= $tables;
+
+ while ($m)
+ {
+ eval create trigger `d$n1`.`trg_t$m` before insert on `d$n1`.`t$m` for each row insert into `d$n`.`t$m` values(1);
+ dec $m;
+ }
+ dec $n;
+ dec $n1;
+}
+--enable_query_log
+
+# invoke...
+
+let $k= $queries;
+
+--disable_query_log
+--disable_warnings
+while ($k)
+{
+ let $n= `select floor(rand()*$dbs + 1)`;
+ let $m= `select floor(rand()*$tables + 1)`;
+ eval insert into d$n.t$n values (2);
+ dec $k;
+}
+--enable_warnings
+--enable_query_log
+
+
+sync_slave_with_master;
+
+#
# Consistency check
#
@@ -95,11 +205,6 @@ while($n)
#
-# 2. Case of invoked routines (TODO: add parallel support to CREATE trig,sf().
-#
-
-
-#
# Clean-up
#
=== modified file 'sql/binlog.cc'
--- a/sql/binlog.cc 2011-02-12 08:32:05 +0000
+++ b/sql/binlog.cc 2011-02-17 19:53:30 +0000
@@ -4752,7 +4752,7 @@ int THD::decide_logging_format(TABLE_LIS
}
if (binlog_updated_db_names->elements < MAX_DBS_IN_QUERY_MTS + 1)
{
- char *after_db= strdup(table->db);
+ char *after_db= strdup_root(mem_root, table->db);
if (binlog_updated_db_names->elements != 0)
{
List_iterator<char> it(*get_binlog_updated_db_names());
=== modified file 'sql/field.cc'
--- a/sql/field.cc 2011-01-20 13:39:00 +0000
+++ b/sql/field.cc 2011-02-17 19:53:30 +0000
@@ -3736,7 +3736,13 @@ longlong Field_long::val_int(void)
ASSERT_COLUMN_MARKED_FOR_READ;
int32 j;
/* See the comment in Field_long::store(long long) */
+ /*
+ About (current_thd)->slave_thread alternative,
+ MTS coordinator open/closes a temp table while the rest of operation
+ is done by Workers.
+ */
DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
+
#ifdef WORDS_BIGENDIAN
if (table->s->db_low_byte_first)
j=sint4korr(ptr);
@@ -6308,7 +6314,7 @@ int Field_string::store(const char *from
const char *cannot_convert_error_pos;
const char *from_end_pos;
- /* See the comment for Field_long::store(long long) */
+ /* See the comment for Field_long::store(long long) and Field_long::val_int */
DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
copy_length= well_formed_copy_nchars(field_charset,
@@ -6458,7 +6464,7 @@ String *Field_string::val_str(String *va
String *val_ptr)
{
ASSERT_COLUMN_MARKED_FOR_READ;
- /* See the comment for Field_long::store(long long) */
+ /* See the comment for Field_long::store(long long) and Field_long::val_int */
DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
uint length;
if (table->in_use->variables.sql_mode &
=== modified file 'sql/handler.cc'
--- a/sql/handler.cc 2011-01-20 13:39:00 +0000
+++ b/sql/handler.cc 2011-02-17 19:53:30 +0000
@@ -2127,8 +2127,13 @@ void **handler::ha_data(THD *thd) const
THD *handler::ha_thd(void) const
{
+ /*
+ About current_thd->slave_thread alternative,
+ MTS coordinator open/closes a temp table while the rest of operation
+ is done by Workers.
+ */
DBUG_ASSERT(!table || !table->in_use || table->in_use == current_thd ||
- current_thd->slave_thread /* mts worker */);
+ current_thd->slave_thread);
return (table && table->in_use) ? table->in_use : current_thd;
}
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2011-02-12 08:32:05 +0000
+++ b/sql/log_event.cc 2011-02-17 19:53:30 +0000
@@ -2507,7 +2507,10 @@ Slave_worker *Log_event::get_slave_worke
DBUG_ASSERT(g.group_relay_log_name == NULL);
}
} while (it++ && mts_number_dbs() != MAX_DBS_IN_QUERY_MTS + 1);
- // releasing the Coord's mem-root from the updated dbs
+
+ // Releasing the Coord's mem-root from the updated dbs. It's safe to do at this
+ // point because the root is no longer needed along remained part of Coordinator's
+ // execution flow.
free_root(rli->info_thd->mem_root, MYF(MY_KEEP_PREALLOC));
}
else // r
@@ -3381,12 +3384,16 @@ bool Query_log_event::write(IO_CACHE* fi
{
List_iterator_fast<char> it(*thd->get_binlog_updated_db_names());
char *db_name;
+
+ DBUG_ASSERT(dbs <= MAX_DBS_IN_QUERY_MTS);
+
while ((db_name= it++))
{
strcpy((char*) start, db_name);
start += strlen(db_name) + 1;
}
}
+ thd->clear_binlog_updated_db_names();
}
/*
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2011-02-12 08:32:05 +0000
+++ b/sql/log_event.h 2011-02-17 19:53:30 +0000
@@ -260,11 +260,11 @@ struct sql_ex_info
#define IGNORABLE_HEADER_LEN 0
/**
- MTS parameter. The maximum number of databases that a query modifies
- and be logged in the binarry log with a special status variable containing
- the database names to facilitate the parallel applying of the Query-event.
+ The maximum value for @@global.mts_max_updated_dbs server variable.
+ When the actual number of db:s exceeds @@global.mts_max_updated_dbs, the max + 1
+ is put into the mts_updated_dbs status.
*/
-#define MAX_DBS_IN_QUERY_MTS 16
+#define MAX_DBS_IN_QUERY_MTS 16
/*
Max number of possible extra bytes in a replication event compared to a
@@ -354,6 +354,10 @@ struct sql_ex_info
#define Q_INVOKER 11
+/*
+ Number of the updated db:s and their names to be propagated to the slave
+ in order to facilitate the parallel applying of the Query events
+*/
#define Q_UPDATED_DB_NAMES 12
/* Intvar event post-header */
@@ -1082,6 +1086,10 @@ public:
return thd ? thd->db : 0;
}
+ /*
+ The method returns a list of updated by the event databases.
+ Other than in the case of Query-log-event the list is just one item.
+ */
virtual List<char>* mts_get_dbs()
{
List<char> *res= new List<char>;
@@ -1888,7 +1896,7 @@ public:
*/
virtual List<char>* mts_get_dbs()
{
- DBUG_ASSERT(mts_updated_dbs > 0 && mts_updated_dbs <= MAX_DBS_IN_QUERY_MTS + 1);
+ //compile_time_assert(mts_updated_dbs <= MAX_DBS_IN_QUERY_MTS + 1);
List<char> *res= new List<char>;
if (mts_updated_dbs == MAX_DBS_IN_QUERY_MTS + 1)
res->push_back((char*) get_db());
@@ -4356,6 +4364,12 @@ bool event_checksum_test(uchar *buf, ulo
uint8 get_checksum_alg(const char* buf, ulong len);
extern TYPELIB binlog_checksum_typelib;
+// MTS temp table support needed by sql_base.cc
+THD* mts_get_coordinator_thd();
+THD* mts_get_worker_thd();
+mysql_mutex_t* mts_get_temp_table_mutex();
+bool mts_is_worker(THD *thd);
+
/**
@} (end of group Replication)
*/
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2011-02-12 08:32:05 +0000
+++ b/sql/mysqld.cc 2011-02-17 19:53:30 +0000
@@ -472,6 +472,7 @@ ulong opt_mts_partition_hash_soft_max;
ulonglong opt_mts_pending_jobs_size_max;
ulong opt_mts_coordinator_basic_nap;
ulong opt_mts_worker_underrun_level;
+ulong opt_mts_master_updated_dbs_max;
ulong thread_cache_size=0;
ulong binlog_cache_size=0;
ulonglong max_binlog_cache_size=0;
=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h 2011-02-12 08:32:05 +0000
+++ b/sql/mysqld.h 2011-02-17 19:53:30 +0000
@@ -186,6 +186,7 @@ extern ulong opt_mts_partition_hash_soft
extern ulonglong opt_mts_pending_jobs_size_max;
extern ulong opt_mts_coordinator_basic_nap;
extern ulong opt_mts_worker_underrun_level;
+extern ulong opt_mts_master_updated_dbs_max;
extern uint max_user_connections;
extern ulong what_to_log,flush_time;
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2011-02-12 08:32:05 +0000
+++ b/sql/rpl_rli.cc 2011-02-17 19:53:30 +0000
@@ -202,38 +202,6 @@ void Relay_log_info::reset_notified_chec
checkpoint_seqno= 0;
}
-mysql_mutex_t* mts_get_temp_table_mutex()
-{
- return &active_mi->rli->mts_temp_tables_lock;
-}
-
-THD* mts_get_coordinator_thd()
-{
- Slave_worker *w;
- return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
- NULL : !(w= active_mi->rli->get_current_worker()) ?
- NULL : w->c_rli->info_thd;
-}
-
-THD* mts_get_worker_thd()
-{
- Slave_worker *w;
- return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
- NULL : !(w= active_mi->rli->get_current_worker()) ?
- NULL : w->w_rli->info_thd;
-}
-
-// TODO: remove the query in parallel option, rename mts_is_worker and
-// s/ SYSTEM_THREAD_SLAVE_SQL/SYSTEM_THREAD_SLAVE_WORKER/
-bool mts_is_coord_or_worker(THD *thd)
-{
- return
- thd->slave_thread &&
- thd->system_thread == SYSTEM_THREAD_SLAVE_SQL &&
- (mts_get_worker_thd() != NULL);
-}
-
-
/**
The method can be run both by C having the Main (coord) rli context and
by W having both the main and the Local (worker) rli context.
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2011-02-12 08:32:05 +0000
+++ b/sql/rpl_rli.h 2011-02-17 19:53:30 +0000
@@ -477,7 +477,7 @@ public:
ulong mts_recovery_index; // running index of recoverable groups
/*
temporary tables are held by Coordinator though are created and dropped
- if explicilty by Workers. The following lock has to be taken by either party
+ explicilty by Workers. The following lock has to be taken by either party
in order to conduct any operation in the temp tables placeholder, incl.
find, drop, create, open.
*/
@@ -732,4 +732,5 @@ private:
};
bool mysql_show_relaylog_events(THD* thd);
+
#endif /* RPL_RLI_H */
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2011-02-12 08:32:05 +0000
+++ b/sql/rpl_slave.cc 2011-02-17 19:53:30 +0000
@@ -3792,7 +3792,9 @@ err:
{
mysql_mutex_lock(&LOCK_thread_count);
THD_CHECK_SENTRY(thd);
- thd->system_thread= NON_SYSTEM_THREAD; // tt closing work/around
+ // to avoid close_temporary_tables() closing temp tables as those
+ // are Coordinator's burden.
+ thd->system_thread= NON_SYSTEM_THREAD;
delete thd;
mysql_mutex_unlock(&LOCK_thread_count);
}
@@ -7101,6 +7103,38 @@ err:
DBUG_RETURN(ret);
}
+
+/* MTS temp table support */
+
+mysql_mutex_t* mts_get_temp_table_mutex()
+{
+ return &active_mi->rli->mts_temp_tables_lock;
+}
+
+THD* mts_get_coordinator_thd()
+{
+ Slave_worker *w;
+ return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
+ NULL : !(w= active_mi->rli->get_current_worker()) ?
+ NULL : w->c_rli->info_thd;
+}
+
+THD* mts_get_worker_thd()
+{
+ Slave_worker *w;
+ return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
+ NULL : !(w= active_mi->rli->get_current_worker()) ?
+ NULL : w->w_rli->info_thd;
+}
+
+bool mts_is_worker(THD *thd)
+{
+ return
+ thd->slave_thread &&
+ thd->system_thread == SYSTEM_THREAD_SLAVE_SQL &&
+ (mts_get_worker_thd() != NULL);
+}
+
/**
@} (end of group Replication)
*/
=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc 2011-02-12 08:32:05 +0000
+++ b/sql/sql_base.cc 2011-02-17 19:53:30 +0000
@@ -58,12 +58,6 @@
#include <io.h>
#endif
-// MTS temp table support
-extern THD* mts_get_coordinator_thd();
-extern THD* mts_get_worker_thd();
-extern mysql_mutex_t* mts_get_temp_table_mutex();
-extern bool mts_is_coord_or_worker(THD *thd);
-
bool
No_such_table_error_handler::handle_condition(THD *,
uint sql_errno,
@@ -1198,16 +1192,16 @@ bool close_cached_connection_tables(THD
static void mark_temp_tables_as_free_for_reuse(THD *thd)
{
#ifndef EMBEDDED_LIBRARY
- bool mts_slave= mts_is_coord_or_worker(thd);
- TABLE *temporary_tables= mts_slave ?
- mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
+ bool mts_slave= mts_is_worker(thd);
+ TABLE **ptr_temporary_tables= mts_slave ?
+ &mts_get_coordinator_thd()->temporary_tables : &thd->temporary_tables;
if (mts_slave)
mysql_mutex_lock(mts_get_temp_table_mutex());
#else
- TABLE *temporary_tables= thd->temporary_tables;
+ TABLE **ptr_temporary_tables= &thd->temporary_tables;
#endif
- for (TABLE *table= temporary_tables; table ; table=table->next)
+ for (TABLE *table= *ptr_temporary_tables; table ; table=table->next)
{
if ((table->query_id == thd->query_id) && ! table->open_by_handler)
mark_tmp_table_for_reuse(table);
@@ -1606,43 +1600,21 @@ bool close_temporary_tables(THD *thd)
/* Assume thd->variables.option_bits has OPTION_QUOTE_SHOW_CREATE */
bool was_quote_show= TRUE;
bool error= 0;
- TABLE **ptr_temporary_tables;
-#ifndef EMBEDDED_LIBRARY
- bool mts_slave= mts_is_coord_or_worker(thd);
- TABLE *temporary_tables= mts_slave ?
- mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
-
- if (mts_slave)
- mysql_mutex_lock(mts_get_temp_table_mutex());
-#else
- TABLE *temporary_tables= thd->temporary_tables;
-#endif
- ptr_temporary_tables= &temporary_tables;
+ /* TODO mts: assert if Woker then thd->temporary_tables == NULL */
- if (!temporary_tables)
- {
-#ifndef EMBEDDED_LIBRARY
- if (mts_slave)
- mysql_mutex_lock(mts_get_temp_table_mutex());
-#endif
+ if (!thd->temporary_tables)
DBUG_RETURN(FALSE);
- }
if (!mysql_bin_log.is_open())
{
TABLE *tmp_next;
- for (table= temporary_tables; table; table= tmp_next)
+ for (table= thd->temporary_tables; table; table= tmp_next)
{
tmp_next= table->next;
close_temporary(table, 1, 1);
}
- *ptr_temporary_tables= 0;
-#ifndef EMBEDDED_LIBRARY
- if (mts_slave)
- mysql_mutex_unlock(mts_get_temp_table_mutex());
-#endif
-
+ thd->temporary_tables= 0;
DBUG_RETURN(FALSE);
}
@@ -1660,7 +1632,7 @@ bool close_temporary_tables(THD *thd)
of sublists of equal pseudo_thread_id
*/
- for (prev_table= temporary_tables, table= prev_table->next;
+ for (prev_table= thd->temporary_tables, table= prev_table->next;
table;
prev_table= table, table= table->next)
{
@@ -1669,7 +1641,7 @@ bool close_temporary_tables(THD *thd)
{
if (!found_user_tables)
found_user_tables= true;
- for (prev_sorted= NULL, sorted= temporary_tables; sorted != table;
+ for (prev_sorted= NULL, sorted= thd->temporary_tables; sorted != table;
prev_sorted= sorted, sorted= sorted->next)
{
if (!is_user_table(sorted) ||
@@ -1684,7 +1656,7 @@ bool close_temporary_tables(THD *thd)
}
else
{
- *ptr_temporary_tables= table;
+ thd->temporary_tables= table;
}
table= prev_table;
break;
@@ -1701,7 +1673,7 @@ bool close_temporary_tables(THD *thd)
}
/* scan sorted tmps to generate sequence of DROP */
- for (table= temporary_tables; table; table= next)
+ for (table= thd->temporary_tables; table; table= next)
{
if (is_user_table(table))
{
@@ -1774,12 +1746,8 @@ bool close_temporary_tables(THD *thd)
}
if (!was_quote_show)
thd->variables.option_bits&= ~OPTION_QUOTE_SHOW_CREATE; /* restore option */
- *ptr_temporary_tables= 0;
+ thd->temporary_tables=0;
-#ifndef EMBEDDED_LIBRARY
- if (mts_slave)
- mysql_mutex_unlock(mts_get_temp_table_mutex());
-#endif
DBUG_RETURN(error);
}
@@ -2074,15 +2042,15 @@ TABLE *find_temporary_table(THD *thd,
{
TABLE *table= NULL;
#ifndef EMBEDDED_LIBRARY
- bool mts_slave= mts_is_coord_or_worker(thd);
- TABLE *temporary_tables= mts_slave ?
- mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
+ bool mts_slave= mts_is_worker(thd);
+ TABLE **ptr_temporary_tables= mts_slave ?
+ &mts_get_coordinator_thd()->temporary_tables : &thd->temporary_tables;
if (mts_slave)
mysql_mutex_lock(mts_get_temp_table_mutex());
#else
- TABLE *temporary_tables= thd->temporary_tables;
+ TABLE **ptr_temporary_tables= &thd->temporary_tables;
#endif
- for (table= temporary_tables; table; table= table->next)
+ for (table= *ptr_temporary_tables; table; table= table->next)
{
if (table->s->table_cache_key.length == table_key_length &&
!memcmp(table->s->table_cache_key.str, table_key, table_key_length))
@@ -2133,7 +2101,7 @@ int drop_temporary_table(THD *thd, TABLE
{
TABLE *table;
#ifndef EMBEDDED_LIBRARY
- bool mts_slave= mts_is_coord_or_worker(thd);
+ bool mts_slave= mts_is_worker(thd);
#endif
THD *thd_temp;
@@ -2716,16 +2684,16 @@ bool open_table(THD *thd, TABLE_LIST *ta
! (flags & MYSQL_OPEN_SKIP_TEMPORARY))
{
#ifndef EMBEDDED_LIBRARY
- bool mts_slave= mts_is_coord_or_worker(thd);
- TABLE *temporary_tables= mts_slave ?
- mts_get_coordinator_thd()->temporary_tables : thd->temporary_tables;
+ bool mts_slave= mts_is_worker(thd);
+ TABLE **ptr_temporary_tables= mts_slave ?
+ &mts_get_coordinator_thd()->temporary_tables : &thd->temporary_tables;
if (mts_slave)
mysql_mutex_lock(mts_get_temp_table_mutex());
#else
- TABLE *temporary_tables= thd->temporary_tables;
+ TABLE **ptr_temporary_tables= &thd->temporary_tables;
#endif
- for (table= temporary_tables; table ; table=table->next)
+ for (table= *ptr_temporary_tables; table ; table=table->next)
{
if (table->s->table_cache_key.length == key_length +
TMP_TABLE_KEY_EXTRA &&
@@ -5959,7 +5927,7 @@ TABLE *open_table_uncached(THD *thd, con
{
#ifndef EMBEDDED_LIBRARY
TABLE **ptr_temporary_tables;
- bool mts_slave= mts_is_coord_or_worker(thd);
+ bool mts_slave= mts_is_worker(thd);
ptr_temporary_tables= mts_slave?
&mts_get_coordinator_thd()->temporary_tables : &thd->temporary_tables;
if (mts_slave)
=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc 2011-02-12 08:32:05 +0000
+++ b/sql/sql_class.cc 2011-02-17 19:53:30 +0000
@@ -982,6 +982,7 @@ void THD::init_for_queries()
transaction.xid_state.in_thd=1;
}
+
/*
Do what's needed when one invokes change user
@@ -1069,6 +1070,7 @@ void THD::cleanup(void)
mysql_mutex_unlock(&LOCK_user_locks);
ull= NULL;
}
+
cleanup_done=1;
DBUG_VOID_RETURN;
}
@@ -1396,6 +1398,7 @@ void THD::cleanup_after_query()
stmt_depends_on_first_successful_insert_id_in_prev_stmt= 0;
auto_inc_intervals_in_cur_stmt_for_binlog.empty();
rand_used= 0;
+ binlog_updated_db_names= NULL;
}
if (first_successful_insert_id_in_cur_stmt > 0)
{
@@ -1413,7 +1416,6 @@ void THD::cleanup_after_query()
/* reset table map for multi-table update */
table_map_for_update= 0;
m_binlog_invoker= FALSE;
- binlog_updated_db_names= NULL;
}
@@ -3394,7 +3396,7 @@ void THD::reset_sub_statement_state(Sub_
first_successful_insert_id_in_prev_stmt;
backup->first_successful_insert_id_in_cur_stmt=
first_successful_insert_id_in_cur_stmt;
- backup->binlog_updated_db_names= binlog_updated_db_names;
+ //backup->binlog_updated_db_names= binlog_updated_db_names;
if ((!lex->requires_prelocking() || is_update_query(lex->sql_command)) &&
!is_current_stmt_binlog_format_row())
@@ -3415,7 +3417,7 @@ void THD::reset_sub_statement_state(Sub_
cuted_fields= 0;
transaction.savepoints= 0;
first_successful_insert_id_in_cur_stmt= 0;
- binlog_updated_db_names= NULL;
+ //binlog_updated_db_names= NULL;
}
@@ -3478,7 +3480,8 @@ void THD::restore_sub_statement_state(Su
*/
examined_row_count+= backup->examined_row_count;
cuted_fields+= backup->cuted_fields;
- binlog_updated_db_names= backup->binlog_updated_db_names;
+ //if (binlog_updated_db_names)
+ // binlog_updated_db_names->concat(backup->binlog_updated_db_names);
DBUG_VOID_RETURN;
}
=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h 2011-02-12 08:32:05 +0000
+++ b/sql/sql_class.h 2011-02-17 19:53:30 +0000
@@ -1739,6 +1739,8 @@ public:
List<char> * get_binlog_updated_db_names() {
return binlog_updated_db_names;
}
+ void clear_binlog_updated_db_names() { binlog_updated_db_names= NULL; }
+
#endif /* MYSQL_CLIENT */
public:
=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc 2011-02-12 08:32:05 +0000
+++ b/sql/sys_vars.cc 2011-02-17 19:53:30 +0000
@@ -3198,6 +3198,13 @@ static Sys_var_mybool Sys_slave_local_ti
"time value to implicitly affected timestamp columms. Otherwise (default) "
"it installs prescribed by the master value",
GLOBAL_VAR(opt_mts_slave_local_timestamp), CMD_LINE(OPT_ARG), DEFAULT(FALSE));
+static Sys_var_ulong Sys_master_updated_dbs_max(
+ "mts_master_updated_dbs_max",
+ "The maximum number of databases that a query log event can contain in its header "
+ "in order to faciliate the parallel applying on the slave.",
+ GLOBAL_VAR(opt_mts_master_updated_dbs_max), CMD_LINE(REQUIRED_ARG),
+ VALID_RANGE(0, MAX_DBS_IN_QUERY_MTS),
+ DEFAULT(16), BLOCK_SIZE(1));
static Sys_var_ulong Sys_mts_partition_hash_soft_max(
"mts_partition_hash_soft_max",
"Number of records in the mts partition hash below which "
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110217195330-b8fjghldzq2lgri2.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3273) WL#5754 | Andrei Elkin | 17 Feb |