#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3281 Andrei Elkin 2011-06-05
wl#5569 MTS
More cleanup, fixes due to found issues when running tests, some improvements
incl in stopping Workers to make routine to distinguish between killed and gracefully stopped
cases so in the end STOP SLAVE will guarantee consistent state (some todo remains still).
@ mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test
decreasing execution time.
@ mysql-test/suite/rpl/t/rpl_begin_commit_rollback.test
Marking the test as limited to Single-Thread-Slave.
@ mysql-test/suite/rpl/t/rpl_deadlock_innodb.test
Marking the test as limited to Single-Thread-Slave.
@ mysql-test/suite/rpl/t/rpl_slave_skip.test
Marking the test as limited to Single-Thread-Slave.
@ sql/log_event.cc
addressing few reviewing comments;
asserting do_update_pos() can't run by Workers;
cleaning up and separating Slave_worker *Log_event::get_slave_worker_id()
and its caller's interest to rli-> last_assigned_worker;
Deploying MTS group status marking in Log_event::apply_event();
Making Worker's exec loop break to obey to a new Worker's running status too;
Deploying mts_checkpoint_routine() in Rotate_log_event::do_update_pos()
(sim action for FD event's handler);
Fixing relay-log update notification in Log_event::get_slave_worker_id();
@ sql/log_event.h
renaming and re-typing of func:s as suggested by reviewer;
leaving a todo item for the final cleanup;
correcting logics of mts_async_exec_by_coordinator();
@ sql/rpl_rli.cc
Initialization of a new MTS group status proverty: mts_group_status(MTS_NOT_IN_GROUP);
asserting Relay_log_info::stmt_done() can't be run by Workers;
deploying mts_checkpoint_routine() alike Rotate_log_event::do_update_pos() this time
in Relay_log_info::stmt_done() to cover FD-event case and consulting mts_group_status
in order to decide which branch to follow;
@ sql/rpl_rli.h
Augmenting Relay_log_info with mts_group_status to contain
MTS group status;
@ sql/rpl_rli_pdb.cc
Slave_worker::commit_positions() is fixed to carry update relay-log info
further to the following checkpoint routine action;
Slave_worker *get_slave_worker() was cleaned, interfaces improved,
few asserts corrected;
Slave_worker::slave_worker_ends_group() cleaned a bit, and now frees extra
memory of CGEP dynarray.
wait_for_workers_to_finish() is made to set the Coordinator's state as not
in MTS group after synchronization with all workers;
@ sql/rpl_rli_pdb.h
Slave_jobs_queue is augmented with running_status member.
@ sql/rpl_slave.cc
apply_event_and_update_pos(): corrects asserts, synch with *all* Workers
at the end of dynamically marked as End of group event (mts_is_event_isolated() -> TRUE);
exec_relay_log_event(): correts NULL event read out case;
slave_stop_workers(): simplifying logics of stopping Workers, to mark them with
w->running_status= Slave_worker::KILLED instead of killing workers' thd.
slave_stop_workers() finilizes reset of Coordinator's state with
rli->mts_group_status= Relay_log_info::MTS_NOT_IN_GROUP
to make sure a next restart will proceed with the reset value.
modified:
mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test
mysql-test/suite/rpl/t/rpl_begin_commit_rollback.test
mysql-test/suite/rpl/t/rpl_deadlock_innodb.test
mysql-test/suite/rpl/t/rpl_slave_skip.test
sql/log_event.cc
sql/log_event.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
=== modified file 'mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test'
--- a/mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test 2011-05-06 18:33:32 +0000
+++ b/mysql-test/extra/rpl_tests/rpl_parallel_benchmark_load.test 2011-06-05 17:01:51 +0000
@@ -6,7 +6,7 @@
# load volume parameter
#
-let $iter= 16;
+let $iter= 08;
let $tables= 4;
let $wk_i_queries= 4;
let $wk_m_queries= 0;
=== modified file 'mysql-test/suite/rpl/t/rpl_begin_commit_rollback.test'
--- a/mysql-test/suite/rpl/t/rpl_begin_commit_rollback.test 2010-12-19 17:22:30 +0000
+++ b/mysql-test/suite/rpl/t/rpl_begin_commit_rollback.test 2011-06-05 17:01:51 +0000
@@ -1,6 +1,8 @@
source include/master-slave.inc;
source include/have_innodb.inc;
source include/have_binlog_format_statement.inc;
+# UNTIL is not supported yet (TODO: support and remove the guard)
+-- source include/not_mts_slave_parallel_workers.inc
connection slave;
call mtr.add_suppression("Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT");
=== modified file 'mysql-test/suite/rpl/t/rpl_deadlock_innodb.test'
--- a/mysql-test/suite/rpl/t/rpl_deadlock_innodb.test 2011-05-30 10:05:07 +0000
+++ b/mysql-test/suite/rpl/t/rpl_deadlock_innodb.test 2011-06-05 17:01:51 +0000
@@ -1,6 +1,7 @@
-- source include/not_ndb_default.inc
-- source include/have_innodb.inc
-let $engine_type=innodb;
--- source extra/rpl_tests/rpl_deadlock.test
# --slave-transaction-retries=0 in MTS
-- source include/not_mts_slave_parallel_workers.inc
+
+let $engine_type=innodb;
+-- source extra/rpl_tests/rpl_deadlock.test
=== modified file 'mysql-test/suite/rpl/t/rpl_slave_skip.test'
--- a/mysql-test/suite/rpl/t/rpl_slave_skip.test 2010-12-19 17:22:30 +0000
+++ b/mysql-test/suite/rpl/t/rpl_slave_skip.test 2011-06-05 17:01:51 +0000
@@ -4,6 +4,9 @@
# test for MIXED mode.
source include/have_binlog_format_mixed.inc;
+# UNTIL is not supported yet (TODO: support and remove the guard)
+-- source include/not_mts_slave_parallel_workers.inc
+
source include/master-slave.inc;
source include/have_innodb.inc;
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2011-05-30 10:05:07 +0000
+++ b/sql/log_event.cc 2011-06-05 17:01:51 +0000
@@ -783,6 +783,9 @@ int Log_event::do_update_pos(Relay_log_i
Matz: I don't think we will need this check with this refactoring.
*/
+
+ DBUG_ASSERT(!mts_is_worker(rli->info_thd));
+
if (rli)
rli->stmt_done(log_pos);
return 0; // Cannot fail currently
@@ -2357,10 +2360,6 @@ Log_event::continue_group(Relay_log_info
bool Log_event::contains_partition_info()
{
return get_type_code() == TABLE_MAP_EVENT ||
-
- // todo: Query event is limitly supported
- // which ev->get_db() yields the session db not the actual db
-
(get_type_code() == QUERY_EVENT && !ends_group() && !starts_group()) ||
(get_type_code() == EXECUTE_LOAD_QUERY_EVENT);
}
@@ -2473,13 +2472,13 @@ Slave_worker *Log_event::get_slave_worke
List_iterator<char> it(*mts_get_dbs(rli->info_thd->mem_root));
it++;
+ ret_worker= rli->last_assigned_worker;
if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
{
- // provide a hint - Worker with id 0 - to the following assign
- if (!rli->last_assigned_worker)
- rli->last_assigned_worker=
- *(Slave_worker**) dynamic_array_ptr(&rli->workers, 0);
- (void) wait_for_workers_to_finish(rli, rli->last_assigned_worker);
+ // Worker with id 0 to handle serial execution
+ if (!ret_worker)
+ ret_worker= *(Slave_worker**) dynamic_array_ptr(&rli->workers, 0);
+ (void) wait_for_workers_to_finish(rli, ret_worker);
}
do
@@ -2489,7 +2488,8 @@ Slave_worker *Log_event::get_slave_worke
if (!(ret_worker=
get_slave_worker(*ref_cur_db, rli,
&mts_assigned_partitions[i],
- get_type_code() == QUERY_EVENT)))
+ get_type_code() == QUERY_EVENT,
+ ret_worker)))
{
// destroy buffered events of the current group prior to exit
for (uint k= 0; k < rli->curr_group_da.elements; k++)
@@ -2571,14 +2571,16 @@ Slave_worker *Log_event::get_slave_worke
}
}
- // the group terminal event (Commit, Xid or a DDL query)
+ // the group terminal event:
+ // Commit, Xid, a DDL query or dml query of B-less group.
if (ends_group() || !rli->curr_group_seen_begin)
{
// index of GAQ that this terminal event belongs to
mts_group_cnt= rli->gaq->assigned_group_index;
+ // special marking for T event of {p,g} B-less group
if (num_dbs == OVER_MAX_DBS_IN_EVENT_MTS)
- set_mts_event_ends_group();
+ mts_do_isolate_event();
ptr_g= (Slave_job_group *)
dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
@@ -2592,9 +2594,9 @@ Slave_worker *Log_event::get_slave_worke
{
/*
Prior this event, C rotated the relay log to drop each
- Worker's notified flag.
- Now group terminating event initiates the new name
- delivery through the current group relaylog slot in GAQ.
+ Worker's notified flag. Now group terminating event initiates
+ the new relay-log (where the current event is from) name
+ delivery to Worker that will receive it in commit_positions().
*/
DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
@@ -2602,7 +2604,7 @@ Slave_worker *Log_event::get_slave_worke
my_malloc(strlen(rli->
get_group_relay_log_name()) + 1, MYF(MY_WME));
strcpy(ptr_g->group_relay_log_name,
- rli->get_group_relay_log_name());
+ rli->get_event_relay_log_name());
DBUG_ASSERT(ptr_g->group_relay_log_name != NULL);
@@ -2846,7 +2848,9 @@ int Log_event::apply_event(Relay_log_inf
}
if (!(parallel= rli->is_parallel_exec()) ||
- (async_event= mts_async_exec_by_coordinator(::server_id)) ||
+ (async_event=
+ mts_async_exec_by_coordinator(::server_id,
+ rli->mts_group_status == Relay_log_info::MTS_IN_GROUP)) ||
(seq_event= mts_sequential_exec()))
{
if (parallel)
@@ -2866,7 +2870,6 @@ int Log_event::apply_event(Relay_log_inf
a separator beetwen two master's binlog therefore requiring
Workers to sync.
*/
-
if (rli->curr_group_da.elements > 0)
{
/*
@@ -2892,6 +2895,11 @@ int Log_event::apply_event(Relay_log_inf
Marking sure the event will be executed in sequential mode.
*/
(void) wait_for_workers_to_finish(rli);
+ /*
+ Given not in-group mark the event handler can invoke checkpoint
+ update routine in the following course.
+ */
+ DBUG_ASSERT(rli->mts_group_status == Relay_log_info::MTS_NOT_IN_GROUP);
#ifndef DBUG_OFF
/* all Workers are idle as done through wait_for_workers_to_finish */
@@ -2912,6 +2920,8 @@ int Log_event::apply_event(Relay_log_inf
rli->last_assigned_worker);
worker= NULL;
+ c_rli->mts_group_status= Relay_log_info::MTS_IN_GROUP;
+
c_rli->last_assigned_worker= w= get_slave_worker_id(c_rli);
if (!w || DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0))
DBUG_RETURN(rli->curr_group_assigned_parts.elements == 0 ? FALSE : TRUE);
@@ -2935,7 +2945,8 @@ struct slave_job_item* pop_jobs_item(Sla
mysql_mutex_lock(&w->jobs_lock);
- while (!job_item->data && !thd->killed)
+ while (!job_item->data && !thd->killed &&
+ w->running_status == Slave_worker::RUNNING)
{
const char *old_msg;
@@ -2981,7 +2992,7 @@ int slave_worker_exec_job(Slave_worker *
DBUG_ENTER("slave_worker_exec_job");
job_item= pop_jobs_item(w, job_item);
- if (thd->killed)
+ if (thd->killed || w->running_status != Slave_worker::RUNNING)
{
// de-queueing and decrement counters is in the caller's exit branch
error= -1;
@@ -3020,11 +3031,16 @@ int slave_worker_exec_job(Slave_worker *
for (uint i= 0; i < ep->elements && !found; i++)
{
found=
- (* (db_worker_hash_entry **) dynamic_array_ptr(ep, i)) ==
+ *((db_worker_hash_entry **) dynamic_array_ptr(ep, i)) ==
ev->mts_assigned_partitions[k];
}
if (!found)
{
+ /*
+ notice, can't assert
+ DBUG_ASSERT(ev->mts_assigned_partitions[k]->worker == w);
+ since entry could be marked as wanted by other worker.
+ */
insert_dynamic(ep, (uchar*) &ev->mts_assigned_partitions[k]);
}
}
@@ -3104,7 +3120,13 @@ int slave_worker_exec_job(Slave_worker *
err:
if (error)
+ {
+ sql_print_information("Worker %lu is exiting: killed %i, error %i, "
+ "running_status %d",
+ w->id, thd->killed, thd->is_error(),
+ w->running_status);
w->slave_worker_ends_group(ev, error);
+ }
// rows_query_log_event is deleted as a part of the statement cleanup
@@ -4192,16 +4214,12 @@ void Query_log_event::print(FILE* file,
}
#endif /* MYSQL_CLIENT */
-
-/*
- Query_log_event::do_apply_event()
-*/
-
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
/**
Associating slave Worker thread to a subset of temporary tables
belonging to db-partitions the event accesses.
+ The pointer if all entries is cleaned.
@param thd THD instance pointer
*/
@@ -4282,6 +4300,9 @@ void Query_log_event::detach_temp_tables
#endif
}
+/*
+ Query_log_event::do_apply_event()
+*/
int Query_log_event::do_apply_event(Relay_log_info const *rli)
{
return do_apply_event(rli, query, q_len);
@@ -6484,13 +6505,20 @@ int Rotate_log_event::do_update_pos(Rela
*/
if ((server_id != ::server_id || rli->replicate_same_server_id) &&
!is_relay_log_event() &&
- !rli->is_in_group())
+ ((!rli->is_parallel_exec() && !rli->is_in_group()) ||
+ rli->mts_group_status == Relay_log_info::MTS_NOT_IN_GROUP))
{
mysql_mutex_lock(&rli->data_lock);
DBUG_PRINT("info", ("old group_master_log_name: '%s' "
"old group_master_log_pos: %lu",
rli->get_group_master_log_name(),
(ulong) rli->get_group_master_log_pos()));
+
+ if (rli->is_parallel_exec())
+ {
+ (void) mts_checkpoint_routine(rli, 0, FALSE, TRUE); // todo: error branch
+ }
+
memcpy((void *)rli->get_group_master_log_name(),
new_log_ident, ident_len + 1);
rli->notify_group_master_log_name_update();
@@ -6501,8 +6529,10 @@ int Rotate_log_event::do_update_pos(Rela
rli->get_group_master_log_name(),
(ulong) rli->get_group_master_log_pos()));
mysql_mutex_unlock(&rli->data_lock);
- rli->flush_info(TRUE);
-
+ rli->flush_info(TRUE); // todo: error branch
+ if (rli->is_parallel_exec())
+ rli->reset_notified_checkpoint();
+
/*
Reset thd->variables.option_bits and sql_mode etc, because this could be the signal of
a master's downgrade from 5.0 to 4.0.
@@ -9247,6 +9277,9 @@ Rows_log_event::do_update_pos(Relay_log_
DBUG_PRINT("info", ("flags: %s",
get_flags(STMT_END_F) ? "STMT_END_F " : ""));
+ /* Worker does not execute binlog update position logics */
+ DBUG_ASSERT(!mts_is_worker(rli->info_thd));
+
if (get_flags(STMT_END_F))
{
/*
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2011-05-30 10:05:07 +0000
+++ b/sql/log_event.h 2011-06-05 17:01:51 +0000
@@ -1117,8 +1117,18 @@ public:
*/
virtual uint8 mts_number_dbs() { return 1; }
- virtual void set_mts_event_ends_group() { DBUG_ASSERT(0); }
- virtual bool get_mts_event_ends_group() { DBUG_ASSERT(0); }
+ /*
+ Event can be exceptionally marked to force its execution.
+ in isolation from any other Workers.
+ Other than Query-log-event class should not have any implementation
+ of this method.
+ */
+ virtual void mts_do_isolate_event() { DBUG_ASSERT(0); }
+
+ /*
+ Verifying whether event is marked to execute in isolation.
+ */
+ virtual bool mts_is_event_isolated() { return FALSE; }
#else
Log_event() : temp_buf(0) {}
@@ -1273,15 +1283,20 @@ public:
/**
MST: some events have to be applied by Coordinator concurrently with Workers.
+ *TODO*: combine with mts_sequential_exec() to have ternary outcome.
+
@return TRUE if that's the case,
FALSE otherwise.
*/
- bool mts_async_exec_by_coordinator(ulong slave_server_id)
+ bool mts_async_exec_by_coordinator(ulong slave_server_id, bool mts_in_group)
{
return
- (get_type_code() == FORMAT_DESCRIPTION_EVENT ||
- get_type_code() == ROTATE_EVENT) &&
- ((server_id == (uint32) ::server_id) || (log_pos == 0));
+ (get_type_code() == FORMAT_DESCRIPTION_EVENT &&
+ ((server_id == (uint32) ::server_id) || (log_pos == 0)))
+ ||
+ (get_type_code() == ROTATE_EVENT &&
+ ((server_id == (uint32) ::server_id) ||
+ (log_pos == 0 && mts_in_group)));
}
/**
@@ -1897,8 +1912,8 @@ public:
Event can be indentified as a group terminator and such fact
is memoried by the function.
*/
- virtual void set_mts_event_ends_group() { m_mts_query_ends_group= TRUE; }
- virtual bool get_mts_event_ends_group() { return m_mts_query_ends_group; }
+ virtual void mts_do_isolate_event() { m_mts_query_ends_group= TRUE; }
+ virtual bool mts_is_event_isolated() { return m_mts_query_ends_group; }
#ifdef MYSQL_SERVER
@@ -3085,7 +3100,7 @@ public:
const char* get_db() { return db; }
#endif
/* MTS executes this event sequentially */
- virtual uchar mts_number_dbs() { return OVER_MAX_DBS_IN_EVENT_MTS; }
+ virtual uint8 mts_number_dbs() { return OVER_MAX_DBS_IN_EVENT_MTS; }
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli);
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2011-05-30 10:05:07 +0000
+++ b/sql/rpl_rli.cc 2011-06-05 17:01:51 +0000
@@ -73,7 +73,7 @@ Relay_log_info::Relay_log_info(bool is_s
this_worker(NULL), slave_parallel_workers(0),
recovery_parallel_workers(0),
checkpoint_group(mts_checkpoint_group), mts_recovery_group_cnt(0),
- mts_recovery_index(0),
+ mts_recovery_index(0), mts_group_status(MTS_NOT_IN_GROUP),
sql_delay(0), sql_delay_end(0), m_flags(0)
{
DBUG_ENTER("Relay_log_info::Relay_log_info");
@@ -189,8 +189,8 @@ void Relay_log_info::reset_notified_rela
whose coordinates is passed to it through GAQ index.
Worker notices the new checkpoint value at the group commit
- to reset the current bitmap and set ON a bit number put by C into GAQ index
- as the first group committed after the new checkpoint.
+ to reset the current bitmap and starts using the clean bitmap
+ indexed from zero of being reset checkpoint_seqno.
*/
void Relay_log_info::reset_notified_checkpoint()
{
@@ -991,7 +991,10 @@ bool Relay_log_info::cached_charset_comp
void Relay_log_info::stmt_done(my_off_t event_master_log_pos)
{
clear_flag(IN_STMT);
+
DBUG_ASSERT(!belongs_to_client());
+ /* Worker does not execute binlog update position logics */
+ DBUG_ASSERT(!mts_is_worker(info_thd));
/*
If in a transaction, and if the slave supports transactions, just
@@ -1016,20 +1019,22 @@ void Relay_log_info::stmt_done(my_off_t
middle of the "transaction". START SLAVE will resume at BEGIN
while the MyISAM table has already been updated.
*/
- if ((info_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
+ if ((!is_parallel_exec() && is_in_group()) ||
+ mts_group_status == MTS_IN_GROUP)
+ {
inc_event_relay_log_pos();
+ }
else
{
+ if (is_parallel_exec())
+ {
+ (void) mts_checkpoint_routine(this, 0, FALSE, FALSE); // Alfranio todo: error branch
+ }
inc_group_relay_log_pos(event_master_log_pos);
DBUG_ASSERT(this_worker == NULL);
-
- flush_info(is_transactional() ? TRUE : FALSE);
- /*
- The central recovery commit run in sequential mode forces
- notification on the defacto new checkpoint.
- */
+ flush_info(is_transactional() ? TRUE : FALSE); // Alfranio todo: error branch
if (is_parallel_exec())
reset_notified_checkpoint();
}
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2011-05-30 10:05:07 +0000
+++ b/sql/rpl_rli.h 2011-06-05 17:01:51 +0000
@@ -464,8 +464,8 @@ public:
*/
DYNAMIC_ARRAY least_occupied_workers;
uint checkpoint_seqno; // counter of groups executed after the most recent CP
- uint checkpoint_group; // counter of groups after which a checkpoint is called.
- MY_BITMAP recovery_groups; // bitmap used during recovery.
+ uint checkpoint_group; // number of groups in one checkpoint interval (period).
+ MY_BITMAP recovery_groups; // bitmap used during recovery
ulong mts_recovery_group_cnt; // number of groups to execute at recovery
ulong mts_recovery_index; // running index of recoverable groups
/*
@@ -479,6 +479,18 @@ public:
While Worker utilize its thd->mem_root, Coordinator adopts a specific mem-root:
*/
MEM_ROOT mts_coor_mem_root;
+ /*
+ While distibuting events basing on their properties MTS Coordinator
+ changes its mts group status.
+ Transition from NOT_IN to IN happens once an event is scheduled to a Worker.
+ Reverse transition occures when Coordinator requests synchronization with
+ Workers demanding them to complete their assignments.
+ */
+ enum
+ {
+ MTS_NOT_IN_GROUP, /* not in group includes Single-Threaded-Slave */
+ MTS_IN_GROUP /* an event was scheduled to a Worker */
+ } mts_group_status;
/* most of allocation in the coordinator rli is there */
void init_workers(ulong);
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2011-05-30 10:05:07 +0000
+++ b/sql/rpl_rli_pdb.cc 2011-06-05 17:01:51 +0000
@@ -31,7 +31,7 @@ Slave_worker::Slave_worker(const char* t
Relay_log_info *rli)
: Relay_log_info(FALSE), c_rli(rli),
checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0),
- inited_group_execed(0), running_status(FALSE), last_event(NULL)
+ inited_group_execed(0), running_status(NOT_RUNNING), last_event(NULL)
{
checkpoint_relay_log_name[0]= 0;
checkpoint_master_log_name[0]= 0;
@@ -227,6 +227,12 @@ bool Slave_worker::commit_positions(Log_
bitmap_clear_all(&group_execed);
}
+ // extract an updated relay-log name to store in Worker's rli.
+ if (ptr_g->group_relay_log_name)
+ {
+ strmake(group_relay_log_name, ptr_g->group_relay_log_name,
+ sizeof(group_relay_log_name) - 1);
+ }
bitmap_set_bit(&group_execed, ptr_g->checkpoint_seqno);
checkpoint_seqno= ptr_g->checkpoint_seqno;
@@ -272,7 +278,8 @@ static void free_entry(db_worker_hash_en
DBUG_PRINT("info", ("free_entry %s, %d", entry->db, (int) strlen(entry->db)));
DBUG_ASSERT(c_thd->system_thread == SYSTEM_THREAD_SLAVE_SQL);
- DBUG_ASSERT(entry->usage == 0 || !entry->worker->running_status);
+ DBUG_ASSERT(entry->usage == 0 ||
+ entry->worker->running_status != Slave_worker::RUNNING);
mts_move_temp_tables_to_thd(c_thd, entry->temporary_tables);
entry->temporary_tables= NULL;
@@ -452,23 +459,24 @@ static void move_temp_tables_to_entry(TH
c. updates the APH record to point to the first Worker (naturally, U := 1),
scheduled the event, and goes back into the parallel mode
- @param dbname pointer to c-string containing database name
- It can be empty string to indicate specific locking
- to faciliate sequential applying.
- @param rli pointer to Coordinators relay-log-info instance
- @param ptr_entry reference to a pointer to the resulted entry in
- the Assigne Partition Hash where
- the entry's pointer is stored at return.
+ @param dbname pointer to c-string containing database name
+ It can be empty string to indicate specific locking
+ to faciliate sequential applying.
+ @param rli pointer to Coordinators relay-log-info instance
+ @param ptr_entry reference to a pointer to the resulted entry in
+ the Assigne Partition Hash where
+ the entry's pointer is stored at return.
+ @param last_worker caller opts for this Worker, it must be
+ rli->last_assigned_worker if one is determined.
- @note modifies CGAP, APH and unlinks @c dbname -keyd temp tables
+ @note modifies CGAP, APH and unlinks @c dbname -keyd temporary tables
from C's thd->temporary_tables to move them into the entry record.
- Caller can opt for a Worker via setting rli->last_assigned_worker.
@return the pointer to a Worker struct
*/
Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli,
db_worker_hash_entry **ptr_entry,
- bool need_temp_tables)
+ bool need_temp_tables, Slave_worker *last_worker)
{
uint i;
DYNAMIC_ARRAY *workers= &rli->workers;
@@ -476,6 +484,9 @@ Slave_worker *get_slave_worker(const cha
DBUG_ENTER("get_slave_worker");
+ DBUG_ASSERT(!rli->last_assigned_worker ||
+ rli->last_assigned_worker == last_worker);
+
if (!inited_hash_workers)
DBUG_RETURN(NULL);
@@ -495,7 +506,7 @@ Slave_worker *get_slave_worker(const cha
if (strncmp(entry->db, const_cast<char*>(dbname), dblength) == 0)
{
*ptr_entry= entry;
- DBUG_RETURN(rli->last_assigned_worker);
+ DBUG_RETURN(last_worker);
}
}
@@ -543,8 +554,8 @@ Slave_worker *get_slave_worker(const cha
Unless \exists the last assigned Worker, get a free worker based
on a policy described in the function get_least_occupied_worker().
*/
- entry->worker= !rli->last_assigned_worker ?
- get_least_occupied_worker(workers) : rli->last_assigned_worker;
+ entry->worker= (!last_worker) ?
+ get_least_occupied_worker(workers) : last_worker;
entry->worker->usage_partition++;
mysql_mutex_lock(&slave_worker_hash_lock);
@@ -584,13 +595,12 @@ Slave_worker *get_slave_worker(const cha
/* There is a record. Either */
if (entry->usage == 0)
{
- entry->worker= !rli->last_assigned_worker ?
- get_least_occupied_worker(workers) : rli->last_assigned_worker;
+ entry->worker= (!last_worker) ?
+ get_least_occupied_worker(workers) : last_worker;
entry->worker->usage_partition++;
entry->usage++;
}
- else if (entry->worker == rli->last_assigned_worker ||
- !rli->last_assigned_worker)
+ else if (entry->worker == last_worker || !last_worker)
{
DBUG_ASSERT(entry->worker);
@@ -609,11 +619,11 @@ Slave_worker *get_slave_worker(const cha
char wait_info[sizeof(info_format) + 4*sizeof(entry->worker->id) +
NAME_LEN + 1];
- DBUG_ASSERT(rli->last_assigned_worker != NULL &&
+ DBUG_ASSERT(last_worker != NULL &&
rli->curr_group_assigned_parts.elements > 0);
// future assignenment and marking at the same time
- entry->worker= rli->last_assigned_worker;
+ entry->worker= last_worker;
sprintf(wait_info, info_format, entry->worker->id, entry->db);
@@ -623,7 +633,7 @@ Slave_worker *get_slave_worker(const cha
thd->exit_cond(proc_info);
mysql_mutex_lock(&slave_worker_hash_lock);
- DBUG_ASSERT(entry->usage == 0);
+ DBUG_ASSERT(entry->usage == 0 || thd->killed);
entry->usage= 1;
entry->worker->usage_partition++;
@@ -631,7 +641,7 @@ Slave_worker *get_slave_worker(const cha
}
/*
- relocation belonging to db temp tables from C to W via entry
+ relocation belonging to db temporary tables from C to W via entry
*/
if (entry->usage == 1 && need_temp_tables)
{
@@ -651,7 +661,6 @@ Slave_worker *get_slave_worker(const cha
else
{
// all entries must have been emptied from temps by the caller
- DBUG_ASSERT(entry->db_len != 0);
for (TABLE *table= thd->temporary_tables; table; table= table->next)
{
@@ -683,7 +692,7 @@ err:
*/
Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *ws)
{
- ulong usage= ULONG_MAX;
+ long usage= LONG_MAX;
Slave_worker **ptr_current_worker= NULL, *worker= NULL;
ulong i= 0;
@@ -757,29 +766,32 @@ void Slave_worker::slave_worker_ends_gro
*/
DYNAMIC_ARRAY *ep= &curr_group_exec_parts;
- for (int i= ep->elements; i > 0; i--)
+ for (uint i= 0; i < ep->elements; i++)
{
db_worker_hash_entry *entry=
- (* (db_worker_hash_entry **) dynamic_array_ptr(ep, i - 1));
+ *((db_worker_hash_entry **) dynamic_array_ptr(ep, i));
+
mysql_mutex_lock(&slave_worker_hash_lock);
- DBUG_ASSERT(entry && entry->usage != 0);
+ DBUG_ASSERT(entry);
+
+ entry->usage --;
- entry->usage--;
+ DBUG_ASSERT(entry->usage >= 0);
if (entry->usage == 0)
{
+ usage_partition--;
/*
The detached entry's temp table list, possibly updated, remains
with the entry at least until time Coordinator will deallocate it
from the hash, that is either due to stop or extra size of the hash.
*/
-
+ DBUG_ASSERT(usage_partition >= 0);
DBUG_ASSERT(this->info_thd->temporary_tables == 0);
DBUG_ASSERT(!entry->temporary_tables ||
!entry->temporary_tables->prev);
- usage_partition--;
if (entry->worker != this) // Coordinator is waiting
{
#ifndef DBUG_OFF
@@ -792,9 +804,17 @@ void Slave_worker::slave_worker_ends_gro
DBUG_ASSERT(usage_partition != 0);
mysql_mutex_unlock(&slave_worker_hash_lock);
+ }
- delete_dynamic_element(ep, i - 1);
+ if (ep->elements > ep->max_element)
+ {
+ // reallocate to lessen mem
+ ep->elements= ep->max_element;
+ ep->max_element= 0;
+ freeze_size(ep); // restores max_element
}
+ ep->elements= 0;
+
curr_group_seen_begin= FALSE;
}
@@ -1052,6 +1072,7 @@ void Slave_worker::report(loglevel level
for the assigned to Workers tasks to be completed and their
resources such as temporary tables be returned to Coordinator's
repository.
+ In case all workers are waited Coordinator changes its group status.
@param rli Relay_log_info instance of Coordinator
@param ignore Optional Worker instance pointer if the sequential context
@@ -1114,5 +1135,9 @@ int wait_for_workers_to_finish(Relay_log
mts_move_temp_tables_to_thd(thd, entry->temporary_tables);
entry->temporary_tables= NULL;
}
+
+ if (!ignore)
+ const_cast<Relay_log_info*>(rli)->mts_group_status= Relay_log_info::MTS_NOT_IN_GROUP;
+
return ret;
}
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2011-05-30 10:05:07 +0000
+++ b/sql/rpl_rli_pdb.h 2011-06-05 17:01:51 +0000
@@ -13,7 +13,7 @@ typedef struct st_db_worker_hash_entry
uint db_len;
const char *db;
Slave_worker *worker;
- ulong usage;
+ long usage;
/*
The list of temp tables belonging to @ db database is
attached to an assigned @c worker to become its thd->temporary_tables.
@@ -34,7 +34,7 @@ bool init_hash_workers(ulong slave_paral
void destroy_hash_workers(Relay_log_info*);
Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli,
db_worker_hash_entry **ptr_entry,
- bool need_temp_tables);
+ bool need_temp_tables, Slave_worker *w);
Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
int wait_for_workers_to_finish(Relay_log_info const *rli,
Slave_worker *ignore= NULL);
@@ -262,7 +262,7 @@ public:
ulong stmt_jobs; // how many jobs per stmt
ulong trans_jobs; // how many jobs per trns
volatile int curr_jobs; // the current assignments
- ulong usage_partition; // number of different partitions handled by this worker
+ long usage_partition; // number of different partitions handled by this worker
volatile bool relay_log_change_notified; // Coord sets and resets, W can read
volatile bool checkpoint_notified; // Coord sets and resets, W can read
bool wq_overrun_set; // W monitors its queue usage to incr/decr rli->mts_wqs_overrun
@@ -279,7 +279,13 @@ public:
ulong checkpoint_seqno;
MY_BITMAP group_execed;
bool inited_group_execed;
- volatile bool running_status; // TRUE when Worker is read-exec loop
+ enum en_running_state
+ {
+ NOT_RUNNING= 0,
+ RUNNING= 1,
+ KILLED
+ };
+ en_running_state volatile running_status;
Log_event *last_event;
int init_info();
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2011-05-30 10:05:07 +0000
+++ b/sql/rpl_slave.cc 2011-06-05 17:01:51 +0000
@@ -1079,7 +1079,7 @@ static bool sql_slave_killed(THD* thd, R
DBUG_ENTER("sql_slave_killed");
DBUG_ASSERT(rli->info_thd == thd);
- DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
+ DBUG_ASSERT(rli->slave_running == 1);
if (abort_loop || thd->killed || rli->abort_slave)
{
/*
@@ -2837,12 +2837,11 @@ int apply_event_and_update_pos(Log_event
{
Slave_job_item item= {ev}, *job_item= &item;
Slave_worker *w= (Slave_worker *) ev->worker;
- bool need_sync=
- (ev->mts_number_dbs() == OVER_MAX_DBS_IN_EVENT_MTS) &&
- ev->get_mts_event_ends_group();
+ // specially marked end of B-less group event requires sync with worker
+ bool need_sync= ev->mts_is_event_isolated();
- DBUG_ASSERT(!(ev->ends_group() || !rli->curr_group_seen_begin) ||
- ((Slave_worker*) ev->worker) == rli->last_assigned_worker);
+ // all events except BEGIN-query must be marked with a non-NULL Worker
+ DBUG_ASSERT(((Slave_worker*) ev->worker) == rli->last_assigned_worker);
DBUG_PRINT("Log_event::apply_event:", ("-> job item data %p to W_%lu", job_item->data, w->id));
@@ -2873,9 +2872,6 @@ int apply_event_and_update_pos(Log_event
if (rli->curr_group_da.elements > rli->curr_group_da.max_element)
{
// reallocate to less mem
-
- DBUG_ASSERT(rli->curr_group_da.max_element < rli->curr_group_da.elements);
-
rli->curr_group_da.elements= rli->curr_group_da.max_element;
rli->curr_group_da.max_element= 0;
freeze_size(&rli->curr_group_da); // restores max_element
@@ -2891,9 +2887,12 @@ int apply_event_and_update_pos(Log_event
{
/*
combination of over-max db:s and end of the current group
- forces to wait for the group completion by the assigned worker.
+ forces to wait for the assigned groups completion by assigned
+ to the event worker.
+ Indeed MTS group status could be safely set to MTS_NOT_IN_GROUP
+ after wait_() returns.
*/
- (void) wait_for_workers_to_finish(rli, w);
+ (void) wait_for_workers_to_finish(rli);
}
}
@@ -2961,7 +2960,7 @@ int apply_event_and_update_pos(Log_event
}
else
{
- DBUG_ASSERT(rli->is_parallel_exec());
+ DBUG_ASSERT(*ptr_ev == ev || rli->is_parallel_exec());
/*
event_relay_log_pos is an anchor to possible reading restart.
It may become lt than group_* value.
@@ -3036,7 +3035,7 @@ static int exec_relay_log_event(THD* thd
*/
mysql_mutex_lock(&rli->data_lock);
- Log_event *ev = next_event(rli), **ptr_ev= &ev;
+ Log_event *ev = next_event(rli), **ptr_ev;
DBUG_ASSERT(rli->info_thd==thd);
@@ -3050,6 +3049,7 @@ static int exec_relay_log_event(THD* thd
{
int exec_res;
+ ptr_ev= &ev;
/*
Even if we don't execute this event, we keep the master timestamp,
so that seconds behind master shows correct delta (there are events
@@ -3787,14 +3787,14 @@ pthread_handler_t handle_slave_worker(vo
mysql_mutex_unlock(&LOCK_thread_count);
mysql_mutex_lock(&w->jobs_lock);
- w->running_status= TRUE; // ready for duty
+ w->running_status= Slave_worker::RUNNING;
mysql_cond_signal(&w->jobs_cond);
mysql_mutex_unlock(&w->jobs_lock);
DBUG_ASSERT(thd->is_slave_error == 0);
- while (!thd->killed && !error)
+ while (!error)
{
error= slave_worker_exec_job(w, rli);
}
@@ -3805,7 +3805,6 @@ pthread_handler_t handle_slave_worker(vo
mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
rli->info_thd->awake(THD::KILL_QUERY); // notify Crdn
mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
- thd->clear_error();
}
mysql_mutex_lock(&w->jobs_lock);
@@ -3831,7 +3830,7 @@ pthread_handler_t handle_slave_worker(vo
mysql_mutex_lock(&w->jobs_lock);
- w->running_status= 0;
+ w->running_status= Slave_worker::NOT_RUNNING;
sql_print_information("Worker %lu statistics: "
"events processed = %lu "
"hungry waits = %lu "
@@ -3845,6 +3844,7 @@ err:
if (thd)
{
+ thd->clear_error();
mysql_mutex_lock(&LOCK_thread_count);
THD_CHECK_SENTRY(thd);
/*
@@ -4039,6 +4039,11 @@ err:
Processing rli->gaq to find out the low-water-mark coordinates
stored into the cental recovery table.
+ @param rli pointer to Relay-log-info of Coordinator
+ @param period period of processing GAQ, normally derived from
+ @c mts_checkpoint_period
+ @param force if TRUE then hang in a loop till some some progress
+ @param locked TRUE if rli->data_lock mutex is aquired by the caller.
@return FALSE success, TRUE otherwise
*/
@@ -4199,7 +4204,7 @@ int slave_start_single_worker(Relay_log_
}
mysql_mutex_lock(&w->jobs_lock);
- if (!w->running_status)
+ if (w->running_status == Slave_worker::NOT_RUNNING)
mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
mysql_mutex_unlock(&w->jobs_lock);
// Least occupied inited with zero
@@ -4256,6 +4261,7 @@ int slave_start_workers(Relay_log_info *
rli->mts_total_groups= 0;
rli->curr_group_seen_begin= FALSE;
rli->checkpoint_seqno= 0;
+ rli->mts_group_status= Relay_log_info::MTS_NOT_IN_GROUP;
/*
dyn memory to consume by Coordinator per event
*/
@@ -4299,6 +4305,12 @@ void slave_stop_workers(Relay_log_info *
if (rli->slave_parallel_workers == 0)
return;
+ /*
+ this is the soft stop. In order for waiting be successful Coordinator
+ needs (*TODO*) to guarantee Workers were assigned with full groups.
+ */
+ // (void) wait_for_workers_to_finish(rli);
+
for (i= rli->workers.elements - 1; i >= 0; i--)
{
Slave_worker *w;
@@ -4306,19 +4318,21 @@ void slave_stop_workers(Relay_log_info *
mysql_mutex_lock(&w->jobs_lock);
- if (!w->running_status)
+ if (w->running_status != Slave_worker::RUNNING)
{
mysql_mutex_unlock(&w->jobs_lock);
continue;
}
+
+ w->running_status= Slave_worker::KILLED;
+ mysql_cond_signal(&w->jobs_cond);
+
mysql_mutex_unlock(&w->jobs_lock);
- sql_print_information("Notifying Worker %lu to exit", w->id);
-
- mysql_mutex_lock(&w->info_thd->LOCK_thd_data);
- w->info_thd->awake(THD::KILL_QUERY);
- mysql_mutex_unlock(&w->info_thd->LOCK_thd_data);
+
+ sql_print_information("Notifying Worker %lu to exit, thd %p", w->id,
+ w->info_thd);
}
-
+
thd_proc_info(thd, "Waiting for workers to exit");
for (i= rli->workers.elements - 1; i >= 0; i--)
@@ -4327,9 +4341,12 @@ void slave_stop_workers(Relay_log_info *
get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, i);
mysql_mutex_lock(&w->jobs_lock);
- while (w->running_status)
+ while (w->running_status != Slave_worker::NOT_RUNNING)
{
const char *save_proc_info;
+
+ DBUG_ASSERT(w->running_status == Slave_worker::KILLED);
+
save_proc_info= thd->enter_cond(&w->jobs_cond, &w->jobs_lock,
"Waiting for workers to exit");
mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
@@ -4358,6 +4375,7 @@ void slave_stop_workers(Relay_log_info *
DBUG_ASSERT(rli->pending_jobs == 0);
DBUG_ASSERT(rli->mts_pending_jobs_size == 0);
+ rli->mts_group_status= Relay_log_info::MTS_NOT_IN_GROUP;
destroy_hash_workers(rli);
delete rli->gaq;
delete_dynamic(&rli->least_occupied_workers); // least occupied
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110605170151-eo9khhhrzf9op05z.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3281) WL#5569 | Andrei Elkin | 6 Jun |