#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3283 Andrei Elkin 2011-06-08
wl#5569 MTS
The following has been done with this patch.
Some cleanup;
Skeleton of a fix for a recovery flaw that was discussed with Alfranio;
Fixes for Blackhole (reviewer warned!);
Finalization of making STOP SLAVE end MTS in a consistent state;
Covering old LOAD DATA related events, with possible erroring out;
Fixing circular replication. The failure was due to the Rotate event that substitutes
an ignored event not being treated as one that changes group_master_log coordinates
(and therefore requires an MTS checkpoint).
Three tests were blocked from MTS execution, with comments explaining why. Some of them
might be converted to be MTS-friendly, though.
@ mysql-test/suite/rpl/t/rpl_bug26395.test
making the test executed only in STS (single-threaded slave).
@ mysql-test/suite/rpl/t/rpl_cross_version.test
making the test executed only in STS (single-threaded slave).
@ mysql-test/suite/rpl/t/rpl_packet.test
making the test executed only in STS (single-threaded slave).
@ mysql-test/suite/rpl/t/rpl_slave_grp_exec.test
Test can run in both MTS and STS modes but required
changes because MTS may be stopped with its own
error, different from the one that a failing STS expects.
Another issue, *not sorted out yet*, with this test is that it requires
recovery to work flawlessly. Leaving further care of it to Alfranio.
@ sql/binlog.cc
Error constant names changed.
@ sql/log_event.cc
Initialization of the member relocated to Log_event::m_mts_event_ends_group;
APPEND_BLOCK_EVENT will be accepted but never assigned because the following
*old* server EXEC_LOAD_EVENT can't be supported yet.
See rpl_cross_version.test as an example of how MTS could fail with the ER_MTS_CANT_PARALLEL error.
Part of the skeleton patch for recovery - rli->reset_notified_checkpoint(0).
And, making the server_id=0 Rotate that substitutes an ignored event enter the checkpoint branch
by modifying the if condition.
@ sql/log_event.h
Some cleanup to shift a piece of definitions to a more appropriate position;
Generalizing a special marking for a group that is not started with BEGIN
and contains multiple events. Although such a group deals with Query_log_event,
the bool member is moved to the base class and renamed: Log_event::m_mts_event_ends_group.
@ sql/rpl_rli.cc
Part of the skeleton patch for recovery - + if (w->checkpoint_notified) etc.
@ sql/rpl_rli.h
reset_notified_checkpoint() accepts an argument giving how many bits to shift, for all
workers, in their bitmaps.
@ sql/rpl_rli_pdb.cc
just a todo to convert to a piece of code for recovery.
@ sql/rpl_rli_pdb.h
Recovery related new members are added.
@ sql/rpl_slave.cc
comments and warnings relating to handling of stopped MTS.
@ sql/share/errmsg-utf8.txt
Error constant names are edited;
a new error is added.
@ storage/blackhole/ha_blackhole.cc
Blackhole's rule for slave thread is extended to allow MTS.
modified:
mysql-test/suite/rpl/t/rpl_bug26395.test
mysql-test/suite/rpl/t/rpl_cross_version.test
mysql-test/suite/rpl/t/rpl_packet.test
mysql-test/suite/rpl/t/rpl_slave_grp_exec.test
sql/binlog.cc
sql/log_event.cc
sql/log_event.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
sql/share/errmsg-utf8.txt
storage/blackhole/ha_blackhole.cc
=== modified file 'mysql-test/suite/rpl/t/rpl_bug26395.test'
--- a/mysql-test/suite/rpl/t/rpl_bug26395.test 2010-12-19 17:07:28 +0000
+++ b/mysql-test/suite/rpl/t/rpl_bug26395.test 2011-06-08 20:18:08 +0000
@@ -37,6 +37,10 @@ source include/have_innodb.inc;
source include/have_debug.inc;
source include/master-slave.inc;
+# test adapts simulation of incomplete transaction that MTS does not tolerate
+# when is stopped. So it reacts with an error whereas the single-threaded is fine.
+-- source include/not_mts_slave_parallel_workers.inc
+
--echo ==== Initialize ====
@@ -67,7 +71,6 @@ source include/sync_slave_io_with_master
# Sync slave's SQL thread.
sync_with_master 0;
-
--echo ==== Verify results on slave ====
source include/stop_slave.inc;
=== modified file 'mysql-test/suite/rpl/t/rpl_cross_version.test'
--- a/mysql-test/suite/rpl/t/rpl_cross_version.test 2010-12-19 17:15:12 +0000
+++ b/mysql-test/suite/rpl/t/rpl_cross_version.test 2011-06-08 20:18:08 +0000
@@ -17,6 +17,9 @@
# Todo: release it from not_windows
--source include/not_windows.inc
+# EXEC_LOAD_EVENT of 4.1 binlog can't be supported
+-- source include/not_mts_slave_parallel_workers.inc
+
#
# Bug#31240 load data infile replication between (4.0 or 4.1) and 5.1 fails
#
=== modified file 'mysql-test/suite/rpl/t/rpl_packet.test'
--- a/mysql-test/suite/rpl/t/rpl_packet.test 2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/t/rpl_packet.test 2011-06-08 20:18:08 +0000
@@ -11,6 +11,13 @@
# max-out size db name
source include/master-slave.inc;
source include/have_binlog_format_row.inc;
+
+# TODO: Fixing is handed over to Sergei.
+# The test runs slow in MTS mode because of state of MTS at time of the graceful stop.
+# In this case MTS can't stop immediately if there is a Worker that received a BEGIN but never COMMIT.
+-- source include/not_mts_slave_parallel_workers.inc
+
+
call mtr.add_suppression("Slave I/O: Got a packet bigger than 'max_allowed_packet' bytes, Error_code: 1153");
call mtr.add_suppression("Slave I/O: Got fatal error 1236 from master when reading data from binary log:");
@@ -283,6 +290,7 @@ eval SET @@global.max_allowed_packet= $o
DROP TABLE t1;
# Clear Last_IO_Error
+
--source include/stop_slave_sql.inc
RESET SLAVE;
=== modified file 'mysql-test/suite/rpl/t/rpl_slave_grp_exec.test'
--- a/mysql-test/suite/rpl/t/rpl_slave_grp_exec.test 2010-12-19 17:07:28 +0000
+++ b/mysql-test/suite/rpl/t/rpl_slave_grp_exec.test 2011-06-08 20:18:08 +0000
@@ -63,8 +63,20 @@ SELECT * FROM t3 ORDER BY a;
--connection slave
# 1146 = ER_NO_SUCH_TABLE
---let $slave_sql_errno= 1146
---source include/wait_for_slave_sql_error.inc
+# in MTS case error is either of two:
+#--let $slave_sql_errno= 1146,1593
+# whereas in the single-threaded case:
+#--let $slave_sql_errno= 1146
+
+--source include/wait_for_slave_sql_to_stop.inc
+let $slave_sql_errno= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1);
+if (`select $slave_sql_errno != 1146 and $slave_sql_errno != 1593`)
+{
+ --echo Unexpected error: $slave_sql_errno
+ --die
+}
+
+
SHOW TABLES LIKE 't%';
if (`SELECT @@BINLOG_FORMAT = 'ROW'`) {
--replace_regex /AA/AA_for_row_or_XX_for_stmt_mixed/
@@ -110,8 +122,18 @@ UPDATE t1 SET b = 'X' WHERE a = 2;
--connection slave
# 1146 = ER_NO_SUCH_TABLE
---let $slave_sql_errno= 1146
---source include/wait_for_slave_sql_error.inc
+# in MTS case error is either of two:
+#--let $slave_sql_errno= 1146,1593
+# whereas in the single-threaded case:
+#--let $slave_sql_errno= 1146
+
+--source include/wait_for_slave_sql_to_stop.inc
+let $slave_sql_errno= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1);
+if (`select $slave_sql_errno != 1146 and $slave_sql_errno != 1593`)
+{
+ --echo Unexpected error: $slave_sql_errno
+ --die
+}
--connection master
SELECT * FROM t1 ORDER BY a;
@@ -125,6 +147,8 @@ SELECT * FROM t2 ORDER BY a;
--source include/stop_slave_io.inc
RENAME TABLE t3_bak TO t3;
+
+# TODO: recovery. Alfranio it fails to recover here.
--source include/start_slave.inc
--connection master
@@ -156,8 +180,19 @@ COMMIT;
--connection slave
# 1146 = ER_NO_SUCH_TABLE
---let $slave_sql_errno= 1146
---source include/wait_for_slave_sql_error.inc
+# in MTS case error is either of two:
+#--let $slave_sql_errno= 1146,1593
+# whereas in the single-threaded case:
+#--let $slave_sql_errno= 1146
+
+--source include/wait_for_slave_sql_to_stop.inc
+let $slave_sql_errno= query_get_value(SHOW SLAVE STATUS, Last_SQL_Errno, 1);
+if (`select $slave_sql_errno != 1146 and $slave_sql_errno != 1593`)
+{
+ --echo Unexpected error: $slave_sql_errno
+ --die
+}
+
--connection master
SELECT * FROM t1 ORDER BY a;
=== modified file 'sql/binlog.cc'
--- a/sql/binlog.cc 2011-05-24 14:29:35 +0000
+++ b/sql/binlog.cc 2011-06-08 20:18:08 +0000
@@ -4550,8 +4550,8 @@ THD::add_to_binlog_updated_dbs(const cha
if (binlog_accessed_db_names->elements > MAX_DBS_IN_EVENT_MTS)
{
push_warning_printf(this, MYSQL_ERROR::WARN_LEVEL_WARN,
- ER_UPDATED_DBS_GREATER_MAX,
- ER(ER_UPDATED_DBS_GREATER_MAX),
+ ER_MTS_UPDATED_DBS_GREATER_MAX,
+ ER(ER_MTS_UPDATED_DBS_GREATER_MAX),
MAX_DBS_IN_EVENT_MTS);
return;
}
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2011-06-06 10:51:19 +0000
+++ b/sql/log_event.cc 2011-06-08 20:18:08 +0000
@@ -675,7 +675,7 @@ Log_event::Log_event(THD* thd_arg, uint1
Log_event::Log_event()
:temp_buf(0), exec_time(0), flags(0), crc(0), thd(0),
- checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
+ checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), m_mts_event_ends_group(FALSE)
{
server_id= ::server_id;
/*
@@ -695,7 +695,7 @@ Log_event::Log_event()
Log_event::Log_event(const char* buf,
const Format_description_log_event* description_event)
:temp_buf(0), exec_time(0), cache_type(Log_event::EVENT_INVALID_CACHE),
- crc(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF)
+ crc(0), checksum_alg(BINLOG_CHECKSUM_ALG_UNDEF), m_mts_event_ends_group(FALSE)
{
#ifndef MYSQL_CLIENT
thd = 0;
@@ -2359,7 +2359,7 @@ Log_event::continue_group(Relay_log_info
bool Log_event::contains_partition_info()
{
- return get_type_code() == TABLE_MAP_EVENT ||
+ return (get_type_code() == TABLE_MAP_EVENT) ||
(get_type_code() == QUERY_EVENT && !ends_group() && !starts_group()) ||
(get_type_code() == EXECUTE_LOAD_QUERY_EVENT);
}
@@ -2537,7 +2537,7 @@ Slave_worker *Log_event::get_slave_worke
DBUG_ASSERT(rli->curr_group_assigned_parts.elements > 0 ||
ret_worker->id == 0);
}
- else // int_, rand_, user_ var:s
+ else // int_, rand_, user_ var:s, load-data events
{
Log_event *ptr_curr_ev= this;
@@ -2545,7 +2545,8 @@ Slave_worker *Log_event::get_slave_worke
get_type_code() == RAND_EVENT ||
get_type_code() == USER_VAR_EVENT ||
get_type_code() == ROWS_QUERY_LOG_EVENT ||
- get_type_code() == BEGIN_LOAD_QUERY_EVENT);
+ get_type_code() == BEGIN_LOAD_QUERY_EVENT ||
+ get_type_code() == APPEND_BLOCK_EVENT);
insert_dynamic(&rli->curr_group_da, (uchar*) &ptr_curr_ev);
@@ -2629,6 +2630,8 @@ Slave_worker *Log_event::get_slave_worke
strcpy(ptr_g->checkpoint_relay_log_name,
rli->get_group_relay_log_name());
ptr_g->checkpoint_relay_log_pos= rli->get_group_relay_log_pos();
+ ptr_g->shifted= ret_worker->bitmap_shifted;
+ ret_worker->bitmap_shifted= 0;
ret_worker->checkpoint_notified= TRUE;
}
ptr_g->checkpoint_seqno= rli->checkpoint_seqno;
@@ -2878,13 +2881,15 @@ int Log_event::apply_event(Relay_log_inf
wrappped with BEGIN/COMMIT or preceeded by User|Int|Random- var.
MTS has to stop to suggest restart in the permanent sequential mode.
*/
+ rli->report(ERROR_LEVEL, ER_MTS_CANT_PARALLEL,
+ ER(ER_MTS_CANT_PARALLEL),
+ get_type_code(), c_rli->get_event_relay_log_name(),
+ c_rli->get_event_relay_log_pos());
+
+ /* Coordinator cant continue, it marks MTS group status accordingly */
+ c_rli->mts_group_status= Relay_log_info::MTS_KILLED_GROUP;
- // TODO: improve err msg
- rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
- ER(ER_SLAVE_FATAL_ERROR),
- "Can't execute all binlog event in parallel mode");
-
- // destroy possible buffered events of the current group prior to exit
+ /* destroy deferred events */
for (uint k= 0; k < rli->curr_group_da.elements; k++)
{
delete *(Log_event**) dynamic_array_ptr(&rli->curr_group_da, k);
@@ -3760,7 +3765,7 @@ Query_log_event::Query_log_event(const c
auto_increment_increment(1), auto_increment_offset(1),
time_zone_len(0), lc_time_names_number(0), charset_database_number(0),
table_map_for_update(0), master_data_written(0),
- mts_accessed_dbs(OVER_MAX_DBS_IN_EVENT_MTS), m_mts_query_ends_group(FALSE)
+ mts_accessed_dbs(OVER_MAX_DBS_IN_EVENT_MTS)
{
ulong data_len;
uint32 tmp;
@@ -6507,7 +6512,7 @@ int Rotate_log_event::do_update_pos(Rela
if ((server_id != ::server_id || rli->replicate_same_server_id) &&
!is_relay_log_event() &&
((!rli->is_parallel_exec() && !rli->is_in_group()) ||
- rli->mts_group_status == Relay_log_info::MTS_NOT_IN_GROUP))
+ rli->mts_group_status != Relay_log_info::MTS_IN_GROUP))
{
mysql_mutex_lock(&rli->data_lock);
DBUG_PRINT("info", ("old group_master_log_name: '%s' "
@@ -6532,7 +6537,7 @@ int Rotate_log_event::do_update_pos(Rela
mysql_mutex_unlock(&rli->data_lock);
rli->flush_info(TRUE); // todo: error branch
if (rli->is_parallel_exec())
- rli->reset_notified_checkpoint();
+ rli->reset_notified_checkpoint(0);
/*
Reset thd->variables.option_bits and sql_mode etc, because this could be the signal of
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2011-06-05 17:01:51 +0000
+++ b/sql/log_event.h 2011-06-08 20:18:08 +0000
@@ -1099,37 +1099,6 @@ public:
{
return thd ? thd->db : 0;
}
-
- /*
- The method returns a list of updated by the event databases.
- Other than in the case of Query-log-event the list is just one item.
- */
- virtual List<char>* mts_get_dbs(MEM_ROOT *mem_root)
- {
- List<char> *res= new List<char>;
- res->push_back(strdup_root(mem_root, get_db()));
- return res;
- }
-
- /*
- returns the number of updated by the event databases.
- In other than Query-log-event case that's one.
- */
- virtual uint8 mts_number_dbs() { return 1; }
-
- /*
- Event can be exceptionally marked to force its execution.
- in isolation from any other Workers.
- Other than Query-log-event class should not have any implementation
- of this method.
- */
- virtual void mts_do_isolate_event() { DBUG_ASSERT(0); }
-
- /*
- Verifying whether event is marked to execute in isolation.
- */
- virtual bool mts_is_event_isolated() { return FALSE; }
-
#else
Log_event() : temp_buf(0) {}
/* avoid having to link mysqlbinlog against libpthread */
@@ -1257,6 +1226,10 @@ public:
@note There are incompatile combinations such the referred event
is wrapped with BEGIN/COMMIT. Such cases should be identified
by the caller and treates as an error.
+
+ Notice, even though the func returns TRUE, some events
+ like old LOAD-DATA rooted EXEC_LOAD_EVENT can't run even
+ in isolated parallel mode and MTS would have to stop.
@return TRUE if despite permanent parallel execution mode an event
needs applying in a real isolation that is sequentially.
@@ -1270,11 +1243,9 @@ public:
get_type_code() == LOAD_EVENT ||
get_type_code() == SLAVE_EVENT ||
get_type_code() == CREATE_FILE_EVENT ||
- get_type_code() == APPEND_BLOCK_EVENT ||
- get_type_code() == EXEC_LOAD_EVENT ||
get_type_code() == DELETE_FILE_EVENT ||
get_type_code() == NEW_LOAD_EVENT ||
-
+ get_type_code() == EXEC_LOAD_EVENT ||
get_type_code() == FORMAT_DESCRIPTION_EVENT||
get_type_code() == INCIDENT_EVENT;
@@ -1318,6 +1289,45 @@ public:
*/
Slave_worker *get_slave_worker_id(Relay_log_info *rli);
+ /*
+ The method returns a list of updated by the event databases.
+ Other than in the case of Query-log-event the list is just one item.
+ */
+ virtual List<char>* mts_get_dbs(MEM_ROOT *mem_root)
+ {
+ List<char> *res= new List<char>;
+ res->push_back(strdup_root(mem_root, get_db()));
+ return res;
+ }
+
+ /*
+ returns the number of updated by the event databases.
+ In other than Query-log-event case that's one.
+ */
+ virtual uint8 mts_number_dbs() { return 1; }
+
+ /*
+ Event can be exceptionally marked to force its execution.
+ in isolation from any other Workers.
+ Other than Query-log-event class should not have any implementation
+ of this method.
+ */
+ /*
+ Event can be indentified as a group terminator and such fact
+ is memoried by the function.
+ */
+ virtual void mts_do_isolate_event()
+ {
+ DBUG_ASSERT(get_type_code() == QUERY_EVENT ||
+ get_type_code() == EXEC_LOAD_EVENT ||
+ get_type_code() == EXECUTE_LOAD_QUERY_EVENT);
+ m_mts_event_ends_group= TRUE;
+ }
+ /*
+ Verifying whether event is marked to execute in isolation.
+ */
+ virtual bool mts_is_event_isolated() { return m_mts_event_ends_group; }
+
/**
Apply the event to the database.
@@ -1451,6 +1461,8 @@ protected:
non-zero. The caller shall decrease the counter by one.
*/
virtual enum_skip_reason do_shall_skip(Relay_log_info *rli);
+
+ bool m_mts_event_ends_group;
#endif
};
@@ -1908,13 +1920,6 @@ public:
uchar mts_accessed_dbs;
char mts_accessed_db_names[MAX_DBS_IN_EVENT_MTS][NAME_LEN];
- /*
- Event can be indentified as a group terminator and such fact
- is memoried by the function.
- */
- virtual void mts_do_isolate_event() { m_mts_query_ends_group= TRUE; }
- virtual bool mts_is_event_isolated() { return m_mts_query_ends_group; }
-
#ifdef MYSQL_SERVER
Query_log_event(THD* thd_arg, const char* query_arg, ulong query_length,
@@ -2024,9 +2029,6 @@ public: /* !!! Public in this pat
(!strncasecmp(query, STRING_WITH_LEN("ROLLBACK"))
&& strncasecmp(query, STRING_WITH_LEN("ROLLBACK TO ")));
}
-private:
-
- bool m_mts_query_ends_group;
};
@@ -3101,6 +3103,13 @@ public:
#endif
/* MTS executes this event sequentially */
virtual uint8 mts_number_dbs() { return OVER_MAX_DBS_IN_EVENT_MTS; }
+ virtual List<char>* mts_get_dbs(MEM_ROOT *mem_root)
+ {
+ List<char> *res= new List<char>;
+ res->push_back(strdup_root(mem_root, ""));
+ return res;
+ }
+
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli);
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2011-06-06 10:51:19 +0000
+++ b/sql/rpl_rli.cc 2011-06-08 20:18:08 +0000
@@ -192,14 +192,19 @@ void Relay_log_info::reset_notified_rela
to reset the current bitmap and starts using the clean bitmap
indexed from zero of being reset checkpoint_seqno.
*/
-void Relay_log_info::reset_notified_checkpoint()
+void Relay_log_info::reset_notified_checkpoint(ulong shift)
{
if (!is_parallel_exec())
return;
for (uint i= 0; i < workers.elements; i++)
{
Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
- w->checkpoint_notified= FALSE;
+ if (w->checkpoint_notified)
+ {
+ w->bitmap_shifted= 0;
+ w->checkpoint_notified= FALSE;
+ }
+ w->bitmap_shifted += shift; // to reset at passing the accumulate value into GAQ
}
checkpoint_seqno= 0;
}
@@ -1036,7 +1041,7 @@ void Relay_log_info::stmt_done(my_off_t
flush_info(is_transactional() ? TRUE : FALSE); // Alfranio todo: error branch
if (is_parallel_exec())
- reset_notified_checkpoint();
+ reset_notified_checkpoint(0);
}
}
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2011-06-06 10:51:19 +0000
+++ b/sql/rpl_rli.h 2011-06-08 20:18:08 +0000
@@ -537,7 +537,7 @@ public:
Coordinator notifies Workers about this event. Coordinator and Workers
maintain a bitmap of executed group that is reset with a new checkpoint.
*/
- void reset_notified_checkpoint();
+ void reset_notified_checkpoint(ulong);
/**
Helper function to do after statement completion.
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2011-06-06 10:51:19 +0000
+++ b/sql/rpl_rli_pdb.cc 2011-06-08 20:18:08 +0000
@@ -225,6 +225,7 @@ bool Slave_worker::commit_positions(Log_
my_free(ptr_g->checkpoint_relay_log_name);
ptr_g->checkpoint_relay_log_name= NULL;
+ // TODO: shift `group_execed' << ptr_g->shifted
bitmap_clear_all(&group_execed);
}
// extract an updated relay-log name to store in Worker's rli.
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2011-06-05 17:01:51 +0000
+++ b/sql/rpl_rli_pdb.h 2011-06-08 20:18:08 +0000
@@ -144,6 +144,7 @@ typedef struct st_slave_job_group
my_off_t checkpoint_relay_log_pos; // T-event lop_pos filled by W for CheckPoint
char* checkpoint_relay_log_name;
volatile uchar done; // Flag raised by W, read and reset by C
+ ulong shifted; // shift the last CP bitmap at receiving a new CP
} Slave_job_group;
#define retrieve_job(from, to) \
@@ -265,6 +266,7 @@ public:
long usage_partition; // number of different partitions handled by this worker
volatile bool relay_log_change_notified; // Coord sets and resets, W can read
volatile bool checkpoint_notified; // Coord sets and resets, W can read
+ ulong bitmap_shifted; // shift the last bitmap at receiving new CP
bool wq_overrun_set; // W monitors its queue usage to incr/decr rli->mts_wqs_overrun
/*
We need to make this a dynamic field. /Alfranio
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2011-06-06 10:51:19 +0000
+++ b/sql/rpl_slave.cc 2011-06-08 20:18:08 +0000
@@ -1067,6 +1067,12 @@ static bool io_slave_killed(THD* thd, Ma
In the event of deffering decision @rli->last_event_start_time waiting
timer is set to force the killed status be accepted upon its expiration.
+ Notice Multi-Threaded-Slave behaives similarly in that when it's being
+ stopped and the current group of assigned events has not yet scheduled
+ completely, Coordinator deferres to accept to leave its read-distribute
+ state. The above timeout ensures waiting won't last endlessly, and in
+ such case an error is repoted.
+
@param thd pointer to a THD instance
@param rli pointer to Relay_log_info instance
@@ -1139,9 +1145,14 @@ static bool sql_slave_killed(THD* thd, R
if (ret == 0)
{
rli->report(WARNING_LEVEL, 0,
+ rli->mts_group_status == Relay_log_info::MTS_NOT_IN_GROUP ?
"slave SQL thread is being stopped in the middle "
"of applying of a group having updated a non-transaction "
- "table; waiting for the group completion ... ");
+ "table; waiting for the group completion ... "
+ :
+ "Coordinator thread of multi-threaded slave is being stopped in the middle "
+ "of assigning a group of events; "
+ "deferring to exit until the group completion ... ");
}
else
{
@@ -1155,7 +1166,8 @@ static bool sql_slave_killed(THD* thd, R
{
ret= TRUE;
rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, ER(ER_SLAVE_FATAL_ERROR),
- msg_stopped);
+ rli->mts_group_status == Relay_log_info::MTS_NOT_IN_GROUP ?
+ msg_stopped : msg_stopped_mts);
}
}
else
@@ -3815,6 +3827,12 @@ pthread_handler_t handle_slave_worker(vo
{
error= slave_worker_exec_job(w, rli);
}
+
+ /*
+ Cleanup after an error requires clear_error() go first.
+ Otherwise assert(!all) in binlog_rollback()
+ */
+ thd->clear_error();
w->cleanup_context(thd, error);
mysql_mutex_lock(&w->jobs_lock);
@@ -3854,7 +3872,6 @@ err:
if (thd)
{
- thd->clear_error();
mysql_mutex_lock(&LOCK_thread_count);
THD_CHECK_SENTRY(thd);
/*
@@ -4136,7 +4153,7 @@ bool mts_checkpoint_routine(Relay_log_in
} // end of commit_positions
*/
- rli->reset_notified_checkpoint();
+ rli->reset_notified_checkpoint(cnt);
end:
set_timespec_nsec(rli->last_clock, 0);
@@ -4179,6 +4196,7 @@ int slave_start_single_worker(Relay_log_
w->curr_group_exec_parts.elements= 0;
w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
w->checkpoint_notified= FALSE;
+ w-> bitmap_shifted= 0;
w->workers= rli->workers; // shallow copying is sufficient
w->this_worker= w;
w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
@@ -5928,6 +5946,9 @@ static Log_event* next_event(Relay_log_i
rli->set_future_event_relay_log_pos(my_b_tell(cur_log));
ev->future_event_relay_log_pos= rli->get_future_event_relay_log_pos();
+ if (hot_log)
+ mysql_mutex_unlock(log_lock);
+
/*
MTS checkpoint in the successful read branch
*/
@@ -5935,11 +5956,10 @@ static Log_event* next_event(Relay_log_i
if (rli->is_parallel_exec() && (mts_checkpoint_period != 0 || force))
{
ulonglong period= static_cast<ulonglong>(mts_checkpoint_period * 1000000ULL);
+ mysql_mutex_unlock(&rli->data_lock);
mts_checkpoint_routine(rli, period, force, TRUE); // TODO: ALFRANIO ERROR
+ mysql_mutex_lock(&rli->data_lock);
}
-
- if (hot_log)
- mysql_mutex_unlock(log_lock);
DBUG_RETURN(ev);
}
DBUG_ASSERT(thd==rli->info_thd);
@@ -6066,7 +6086,10 @@ static Log_event* next_event(Relay_log_i
do
{
+ mysql_mutex_unlock(log_lock);
mts_checkpoint_routine(rli, period, FALSE, FALSE); // TODO: ALFRANIO ERROR
+ mysql_mutex_lock(log_lock);
+
set_timespec_nsec(waittime, period);
thd->enter_cond(log_cond, log_lock,
"Slave has read all relay log; "
@@ -6573,8 +6596,8 @@ int start_slave(THD* thd , Master_info*
{
mi->rli->opt_slave_parallel_workers= 0;
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
- ER_NO_FEATURE_ON_PARALLEL_SLAVE,
- ER(ER_NO_FEATURE_ON_PARALLEL_SLAVE),
+ ER_MTS_FEATURE_IS_NOT_SUPPORTED,
+ ER(ER_MTS_FEATURE_IS_NOT_SUPPORTED),
"UNTIL condtion",
"Slave is started in the sequential execution mode.");
}
@@ -6586,8 +6609,8 @@ int start_slave(THD* thd , Master_info*
if (mi->rli->opt_slave_parallel_workers != 0 && slave_trans_retries != 0)
{
push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
- ER_NO_FEATURE_ON_PARALLEL_SLAVE,
- ER(ER_NO_FEATURE_ON_PARALLEL_SLAVE),
+ ER_MTS_FEATURE_IS_NOT_SUPPORTED,
+ ER(ER_MTS_FEATURE_IS_NOT_SUPPORTED),
"Temporary failed transaction retry",
"Such failure will force the slave to stop.");
}
=== modified file 'sql/share/errmsg-utf8.txt'
--- a/sql/share/errmsg-utf8.txt 2011-02-27 17:35:25 +0000
+++ b/sql/share/errmsg-utf8.txt 2011-06-08 20:18:08 +0000
@@ -6456,7 +6456,9 @@ ER_STMT_CACHE_FULL
ER_BINLOG_STMT_CACHE_SIZE_GREATER_THAN_MAX
eng "Option binlog_stmt_cache_size (%lu) is greater than max_binlog_stmt_cache_size (%lu); setting binlog_stmt_cache_size equal to max_binlog_stmt_cache_size."
-ER_NO_FEATURE_ON_PARALLEL_SLAVE
+ER_MTS_FEATURE_IS_NOT_SUPPORTED
eng "%s is not supported in Parallel Slave. %s"
-ER_UPDATED_DBS_GREATER_MAX
+ER_MTS_UPDATED_DBS_GREATER_MAX
eng "Modified database names number exceeds the maximum %d; the names are not written into the replication event."
+ER_MTS_CANT_PARALLEL
+ eng "Can't execute the current event group in parallel mode running into event %s, relay-log name %s, position %s."
=== modified file 'storage/blackhole/ha_blackhole.cc'
--- a/storage/blackhole/ha_blackhole.cc 2010-10-06 14:34:28 +0000
+++ b/storage/blackhole/ha_blackhole.cc 2011-06-08 20:18:08 +0000
@@ -23,7 +23,7 @@
#include "unireg.h"
#include "probes_mysql.h"
#include "ha_blackhole.h"
-#include "sql_class.h" // THD, SYSTEM_THREAD_SLAVE_SQL
+#include "sql_class.h" // THD, SYSTEM_THREAD_SLAVE_*
/* Static declarations for handlerton */
@@ -118,7 +118,8 @@ int ha_blackhole::update_row(const uchar
{
DBUG_ENTER("ha_blackhole::update_row");
THD *thd= ha_thd();
- if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query() == NULL)
+ if ((thd->system_thread == SYSTEM_THREAD_SLAVE_SQL ||
+ thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) && thd->query() == NULL)
DBUG_RETURN(0);
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
}
@@ -127,7 +128,8 @@ int ha_blackhole::delete_row(const uchar
{
DBUG_ENTER("ha_blackhole::delete_row");
THD *thd= ha_thd();
- if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query() == NULL)
+ if ((thd->system_thread == SYSTEM_THREAD_SLAVE_SQL ||
+ thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) && thd->query() == NULL)
DBUG_RETURN(0);
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
}
@@ -146,7 +148,8 @@ int ha_blackhole::rnd_next(uchar *buf)
MYSQL_READ_ROW_START(table_share->db.str, table_share->table_name.str,
TRUE);
THD *thd= ha_thd();
- if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query() == NULL)
+ if ((thd->system_thread == SYSTEM_THREAD_SLAVE_SQL ||
+ thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) && thd->query() == NULL)
rc= 0;
else
rc= HA_ERR_END_OF_FILE;
@@ -236,7 +239,8 @@ int ha_blackhole::index_read_map(uchar *
DBUG_ENTER("ha_blackhole::index_read");
MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
THD *thd= ha_thd();
- if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query() == NULL)
+ if ((thd->system_thread == SYSTEM_THREAD_SLAVE_SQL ||
+ thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) && thd->query() == NULL)
rc= 0;
else
rc= HA_ERR_END_OF_FILE;
@@ -253,7 +257,8 @@ int ha_blackhole::index_read_idx_map(uch
DBUG_ENTER("ha_blackhole::index_read_idx");
MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
THD *thd= ha_thd();
- if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query() == NULL)
+ if ((thd->system_thread == SYSTEM_THREAD_SLAVE_SQL ||
+ thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) && thd->query() == NULL)
rc= 0;
else
rc= HA_ERR_END_OF_FILE;
@@ -269,7 +274,8 @@ int ha_blackhole::index_read_last_map(uc
DBUG_ENTER("ha_blackhole::index_read_last");
MYSQL_INDEX_READ_ROW_START(table_share->db.str, table_share->table_name.str);
THD *thd= ha_thd();
- if (thd->system_thread == SYSTEM_THREAD_SLAVE_SQL && thd->query() == NULL)
+ if ((thd->system_thread == SYSTEM_THREAD_SLAVE_SQL ||
+ thd->system_thread == SYSTEM_THREAD_SLAVE_WORKER) && thd->query() == NULL)
rc= 0;
else
rc= HA_ERR_END_OF_FILE;
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110608201808-h9279ue551ngalo9.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3283) WL#5569 | Andrei Elkin | 9 Jun |