#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3277 Andrei Elkin 2011-05-16
wl#5569 MTS
Simplifying Coordinator-Worker interfaces. In essence, after this patch a Worker executes
events in its private context (class Slave_worker : public Relay_log_info).
The only exception is a Query event referring to a temporary table. The temporary tables
are maintained in the Coordinator's "central" rli;
removing some dead code;
performing a lot of cleanup.
There are a few todo items, including:
1. To implement several todos scattered across the MTS code and tests (e.g. to restore
`protected' for a few members of RLI in rpl_rli.h);
2. To cover Rows_query_log_event, which currently can cause hanging (e.g. in rpl_parallel_fallback);
3. To sort out the names of classes based on Rpl_info, and possibly remove Rpl_info_worker.
@ mysql-test/suite/rpl/t/rpl_parallel_start_stop.test
The test, like most of the rpl_parallel* bunch, can't yet stand `include/rpl_end.inc'.
@ sql/log_event.cc
Defining the default Log_event::do_apply_event_worker() that simply executes the
canonical do_apply_event(), however supplying a Slave_worker instance reference,
which is critical in order to execute different rli->methods(), e.g. `report'.
Xid_log_event::do_apply_event_worker() runs the Worker version of Xid commit;
simplifying Rows event parallel applying by removing or reworking a host of early prototype code, incl. rli->get_tables_to_lock() and related logic;
@ sql/log_event.h
Adding virtual int do_apply_event_worker() to Log_event and specializing
it for Xid class;
@ sql/rpl_reporting.cc
Splitting report() into two methods in order to make it possible
for the functional part of the two, which takes a va_list as an argument,
to be called from the Slave_worker class.
@ sql/rpl_reporting.h
New va_list version of report method is declared.
@ sql/rpl_rli.cc
Removing the early-prototype-time support for Rows-event parallel execution.
The new scheme of applying is almost equivalent to the standard sequential algorithm
thanks to the Slave_worker : public Relay_log_info inheritance implementation.
@ sql/rpl_rli.h
Removing unnecessary interfaces;
TODO: restore `protected' for few members.
@ sql/rpl_rli_pdb.cc
Some cleanup and
defining Slave_worker::report() to eventually call the Coordinator's rli->do_report() and exploit
the fact that the latter was designed for concurrent use.
@ sql/rpl_rli_pdb.h
Changing the base class of Slave_worker to make it behave
as Relay_log_info when needed;
Removing some dead code;
Adding report() methods to run it in do_apply_event().
@ sql/rpl_slave.cc
Removed the UNTIL todo, as UNTIL is actually not supported, with a warning;
Removed a todo for cleanup of an errored-out statement-format transaction
because w->cleanup_context() indeed implements it;
Cleanup or transition from w->w_rli (of Relay_log_info) to w (of Slave_worker);
Adding a forgotten mutex unlock;
Simplifying definitions of a few functions (mts_is_worker() etc);
modified:
mysql-test/suite/rpl/t/rpl_parallel_start_stop.test
sql/log_event.cc
sql/log_event.h
sql/rpl_reporting.cc
sql/rpl_reporting.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_start_stop.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test 2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test 2011-05-16 19:43:58 +0000
@@ -273,5 +273,5 @@ set @@global.mts_slave_parallel_workers=
set @@global.slave_transaction_retries= @save.slave_transaction_retries;
--echo end of the tests
---source include/rpl_end.inc
-
+### TODO: restore --source include/rpl_end.inc
+--echo include/rpl_end.inc
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2011-04-06 12:51:58 +0000
+++ b/sql/log_event.cc 2011-05-16 19:43:58 +0000
@@ -795,6 +795,10 @@ Log_event::Log_event(const char* buf,
#ifndef MYSQL_CLIENT
#ifdef HAVE_REPLICATION
+inline int Log_event::do_apply_event_worker(Slave_worker *w)
+{
+ return do_apply_event(w);
+}
int Log_event::do_update_pos(Relay_log_info *rli)
{
@@ -3044,9 +3048,9 @@ int slave_worker_exec_job(Slave_worker *
}
}
}
- w->w_rli->set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
- error= ev->do_apply_event(rli);
+ error= ev->do_apply_event_worker(w);
+
if (ev->ends_group() || !w->curr_group_seen_begin)
{
DBUG_PRINT("slave_worker_exec_job:", (" commits GAQ index %lu, last committed %lu", ev->mts_group_cnt, w->last_group_done_index));
@@ -3054,13 +3058,6 @@ int slave_worker_exec_job(Slave_worker *
w->slave_worker_ends_group(ev, error); /* last done sets post exec */
}
- /*
- commit_positions() fullfils group pos incr and flush
- TODO: remove
- if (!error)
- ev->update_pos(w->w_rli);
- */
-
mysql_mutex_lock(&w->jobs_lock);
de_queue(&w->jobs, job_item);
@@ -3119,7 +3116,7 @@ int slave_worker_exec_job(Slave_worker *
err:
- // todo: fix w/a for Rows_query_log_event
+ // TODO: fix w/a for Rows_query_log_event
if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
delete ev; // after ev->update_pos() event is garbage
@@ -4266,8 +4263,12 @@ int Query_log_event::do_apply_event(Rela
const_cast<Relay_log_info*>(rli)->set_future_group_master_log_pos(log_pos);
DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
+ /*
+ todo: such cleanup should not be specific to Query event and therefore
+ is preferable at a common with other event pre-execution point
+ */
clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
- if (strcmp("COMMIT", query) == 0 && *(rli->get_tables_to_lock()) != NULL)
+ if (strcmp("COMMIT", query) == 0 && rli->tables_to_lock != NULL)
{
/*
Cleaning-up the last statement context:
@@ -4579,6 +4580,9 @@ Default database: '%s'. Query: '%s'",
The sql thread receives the killed status and will proceed
to shutdown trying to finish incomplete events group.
*/
+
+ // TODO: address the middle-group killing in MTS case
+
DBUG_EXECUTE_IF("stop_slave_middle_group",
if (strcmp("COMMIT", query) != 0 &&
strcmp("BEGIN", query) != 0)
@@ -6774,6 +6778,28 @@ void Xid_log_event::print(FILE* file, PR
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+
+int Xid_log_event::do_apply_event_worker(Slave_worker *w)
+{
+ int error= 0;
+ bool is_trans_repo= w->is_transactional();
+
+ if (is_trans_repo)
+ {
+ ulong gaq_idx= mts_group_cnt;
+ Slave_job_group *ptr_g=
+ (Slave_job_group *) dynamic_array_ptr(&w->c_rli->gaq->Q, gaq_idx);
+
+ if ((error= w->commit_positions(this, ptr_g)))
+ goto err;
+ }
+ error= trans_commit(thd); /* Automatically rolls back on error. */
+ thd->mdl_context.release_transactional_locks();
+
+err:
+ return error;
+}
+
int Xid_log_event::do_apply_event(Relay_log_info const *rli)
{
int error= 0;
@@ -6785,15 +6811,13 @@ int Xid_log_event::do_apply_event(Relay_
the context of the current transaction in order to provide
data integrity. See sql/rpl_rli.h for further details.
*/
- Slave_worker *w= rli_ptr->get_current_worker();
- bool is_parallel= (w != NULL);
- bool is_trans_repo= (is_parallel ? w->is_transactional() : rli_ptr->is_transactional());
+ bool is_trans_repo= rli_ptr->is_transactional();
/* For a slave Xid_log_event is COMMIT */
general_log_print(thd, COM_QUERY,
"COMMIT /* implicit, from Xid_log_event */");
- if (is_trans_repo && !is_parallel)
+ if (is_trans_repo)
{
mysql_mutex_lock(&rli_ptr->data_lock);
}
@@ -6811,7 +6835,7 @@ int Xid_log_event::do_apply_event(Relay_
/*
We need to update the positions in here to make it transactional.
*/
- if (is_trans_repo && !is_parallel)
+ if (is_trans_repo)
{
rli_ptr->inc_event_relay_log_pos();
rli_ptr->set_group_relay_log_pos(rli_ptr->get_event_relay_log_pos());
@@ -6827,15 +6851,6 @@ int Xid_log_event::do_apply_event(Relay_
if ((error= rli_ptr->flush_info(TRUE)))
goto err;
}
- else if (is_trans_repo && is_parallel)
- {
- ulong gaq_idx= mts_group_cnt;
- Slave_job_group *ptr_g=
- (Slave_job_group *) dynamic_array_ptr(&w->c_rli->gaq->Q, gaq_idx);
-
- if ((error= w->commit_positions(this, ptr_g)))
- goto err;
- }
DBUG_PRINT("info", ("do_apply group master %s %lu group relay %s %lu event %s %lu\n",
rli_ptr->get_group_master_log_name(),
@@ -6852,7 +6867,7 @@ int Xid_log_event::do_apply_event(Relay_
thd->mdl_context.release_transactional_locks();
err:
- if (is_trans_repo && !is_parallel)
+ if (is_trans_repo)
{
mysql_cond_broadcast(&rli_ptr->data_cond);
mysql_mutex_unlock(&rli_ptr->data_lock);
@@ -8711,7 +8726,7 @@ int Rows_log_event::do_apply_event(Relay
do_apply_event(). We still check here to prevent future coding
errors.
*/
- DBUG_ASSERT(rli->info_thd == thd || rli->is_parallel_exec());
+ DBUG_ASSERT(rli->info_thd == thd);
/*
If there is no locks taken, this is the first binrow event seen
@@ -8763,7 +8778,7 @@ int Rows_log_event::do_apply_event(Relay
/* A small test to verify that objects have consistent types */
DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
- if (open_and_lock_tables(thd, *rli->get_tables_to_lock(), FALSE, 0))
+ if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
{
uint actual_error= thd->stmt_da->sql_errno();
if (thd->is_slave_error || thd->is_fatal_error)
@@ -8794,8 +8809,8 @@ int Rows_log_event::do_apply_event(Relay
{
DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p",
- *rli->get_tables_to_lock()));
- RPL_TABLE_LIST *ptr= *rli->get_tables_to_lock();
+ rli->tables_to_lock));
+ RPL_TABLE_LIST *ptr= rli->tables_to_lock;
for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
{
TABLE *conv_table;
@@ -8835,22 +8850,17 @@ int Rows_log_event::do_apply_event(Relay
Rows_log_event, we can invalidate the query cache for the
associated table.
*/
- for (TABLE_LIST *ptr= *rli->get_tables_to_lock() ; ptr ; ptr= ptr->next_global)
+ for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr; ptr= ptr->next_global)
{
- if (!rli->is_parallel_exec())
- const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
- else
- rli->get_current_worker()->m_table_map.set_table(ptr->table_id, ptr->table);
+ const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
}
#ifdef HAVE_QUERY_CACHE
- query_cache.invalidate_locked_for_write(*rli->get_tables_to_lock());
+ query_cache.invalidate_locked_for_write(rli->tables_to_lock);
#endif
}
- TABLE*
- table= m_table= (!rli->is_parallel_exec()) ?
- const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id) :
- rli->get_current_worker()->m_table_map.get_table(m_table_id);
+ TABLE* table=
+ m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
DBUG_PRINT("debug", ("m_table: 0x%lx, m_table_id: %lu", (ulong) m_table, m_table_id));
@@ -9566,7 +9576,7 @@ int Table_map_log_event::do_apply_event(
size_t dummy_len;
void *memory;
DBUG_ENTER("Table_map_log_event::do_apply_event(Relay_log_info*)");
- DBUG_ASSERT(rli->info_thd == thd || rli->is_parallel_exec());
+ DBUG_ASSERT(rli->info_thd == thd);
/* Step the query id to mark what columns that are actually used. */
thd->set_query_id(next_query_id());
@@ -9631,11 +9641,9 @@ int Table_map_log_event::do_apply_event(
We record in the slave's information that the table should be
locked by linking the table into the list of tables to lock.
*/
- table_list->next_global= table_list->next_local= *rli->get_tables_to_lock();
-
- //const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
- //const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
- const_cast<Relay_log_info*>(rli)->add_table_to_lock(table_list);
+ table_list->next_global= table_list->next_local= rli->tables_to_lock;
+ const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
+ const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
/* 'memory' is freed in clear_tables_to_lock */
}
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2011-02-27 17:35:25 +0000
+++ b/sql/log_event.h 2011-05-16 19:43:58 +0000
@@ -1356,6 +1356,8 @@ public:
return 0; /* Default implementation does nothing */
}
+ virtual int do_apply_event_worker(Slave_worker *w);
+
protected:
/**
@@ -2663,6 +2665,7 @@ class Xid_log_event: public Log_event
private:
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
virtual int do_apply_event(Relay_log_info const *rli);
+ virtual int do_apply_event_worker(Slave_worker *rli);
enum_skip_reason do_shall_skip(Relay_log_info *rli);
#endif
};
=== modified file 'sql/rpl_reporting.cc'
--- a/sql/rpl_reporting.cc 2010-08-05 17:45:25 +0000
+++ b/sql/rpl_reporting.cc 2011-05-16 19:43:58 +0000
@@ -29,12 +29,20 @@ void
Slave_reporting_capability::report(loglevel level, int err_code,
const char *msg, ...) const
{
+ va_list args;
+ va_start(args, msg);
+ do_report(level, err_code, msg, args);
+ va_end(args);
+}
+
+void
+Slave_reporting_capability::do_report(loglevel level, int err_code,
+ const char *msg, va_list args) const
+{
void (*report_function)(const char *, ...);
char buff[MAX_SLAVE_ERRMSG];
char *pbuff= buff;
uint pbuffsize= sizeof(buff);
- va_list args;
- va_start(args, msg);
mysql_mutex_lock(&err_lock);
switch (level)
@@ -64,7 +72,6 @@ Slave_reporting_capability::report(logle
my_vsnprintf(pbuff, pbuffsize, msg, args);
mysql_mutex_unlock(&err_lock);
- va_end(args);
/* If the msg string ends with '.', do not add a ',' it would be ugly */
report_function("Slave %s: %s%s Error_code: %d",
=== modified file 'sql/rpl_reporting.h'
--- a/sql/rpl_reporting.h 2010-08-05 17:45:25 +0000
+++ b/sql/rpl_reporting.h 2011-05-16 19:43:58 +0000
@@ -52,8 +52,10 @@ public:
code, but can contain more information), in
printf() format.
*/
- void report(loglevel level, int err_code, const char *msg, ...) const
+ virtual void report(loglevel level, int err_code, const char *msg, ...) const
ATTRIBUTE_FORMAT(printf, 4, 5);
+ void do_report(loglevel level, int err_code,
+ const char *msg, va_list v_args) const;
/**
Clear errors. They will not show up under <code>SHOW SLAVE
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2011-04-06 12:51:58 +0000
+++ b/sql/rpl_rli.cc 2011-05-16 19:43:58 +0000
@@ -146,8 +146,7 @@ void Relay_log_info::deinit_workers()
mysql_cond_destroy(&pending_jobs_cond);
mysql_mutex_destroy(&mts_temp_tables_lock);
- if (!this_worker)
- delete_dynamic(&workers);
+ delete_dynamic(&workers);
}
Relay_log_info::~Relay_log_info()
@@ -203,57 +202,6 @@ void Relay_log_info::reset_notified_chec
checkpoint_seqno= 0;
}
-/**
- The method can be run both by C having the Main (coord) rli context and
- by W having both the main and the Local (worker) rli context.
- Decision matrix:
-
- Main Local
- -+-------------
- C| this_w -
- W| w_i this_w
-
-*/
-Slave_worker* Relay_log_info::get_current_worker() const
-{
- uint i;
- if (!is_parallel_exec() || info_thd == current_thd)
- return this_worker; // can be asserted: !this_worker => C
- for (i= 0; i< workers.elements; i++)
- {
- Slave_worker* w_i= *(Slave_worker**)
- dynamic_array_ptr(const_cast<DYNAMIC_ARRAY*>(&workers), i);
- if (w_i->info_thd == current_thd)
- {
- return w_i;
- }
- }
- DBUG_ASSERT(0);
-}
-
-/**
- The method can be run both by C having the Main context.
-*/
-RPL_TABLE_LIST** Relay_log_info::get_tables_to_lock() const
-{
- return
- ((!is_parallel_exec()) || info_thd == current_thd) ?
- const_cast<RPL_TABLE_LIST**>(&tables_to_lock) :
- &get_current_worker()->tables_to_lock;
-}
-
-uint Relay_log_info::add_table_to_lock(RPL_TABLE_LIST *table_list)
-{
- if (!is_parallel_exec())
- tables_to_lock= table_list;
- else
- get_current_worker()->tables_to_lock= table_list;
- return
- (!is_parallel_exec()) ?
- tables_to_lock_count++ :
- get_current_worker()->tables_to_lock_count++;
-}
-
static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
{
MY_STAT s;
@@ -1070,22 +1018,18 @@ void Relay_log_info::stmt_done(my_off_t
inc_event_relay_log_pos(); // todo: ev-> future_event_relay_log_pos
else
{
- Slave_worker *w;
inc_group_relay_log_pos(event_master_log_pos);
- /* Alfranio needs to update the coordinator and workers. */
+ DBUG_ASSERT(this_worker == NULL);
- if ((w= get_current_worker()) == NULL)
- {
- flush_info(is_transactional() ? TRUE : FALSE);
+ flush_info(is_transactional() ? TRUE : FALSE);
- /*
- The central recovery commit run in sequential mode forces
- notification on the defacto new checkpoint.
- */
- if (is_parallel_exec())
- reset_notified_checkpoint();
- }
+ /*
+ The central recovery commit run in sequential mode forces
+ notification on the defacto new checkpoint.
+ */
+ if (is_parallel_exec())
+ reset_notified_checkpoint();
}
}
@@ -1094,7 +1038,7 @@ void Relay_log_info::cleanup_context(THD
{
DBUG_ENTER("Relay_log_info::cleanup_context");
- DBUG_ASSERT((info_thd == thd) || is_parallel_exec() || is_mts_recovery());
+ DBUG_ASSERT((info_thd == thd));
/*
1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
may have opened tables, which we cannot be sure have been closed (because
@@ -1116,7 +1060,7 @@ void Relay_log_info::cleanup_context(THD
MTS W/a for Rows_query_log_event.
Cleanup of rows_query_ev at the end of the current statement.
- todo: move handle_rows_query_log_event() cleanup logics into this method
+ TODO: move handle_rows_query_log_event() cleanup logics into this method
inconditionally.
*/
if (error || is_parallel_exec())
@@ -1127,10 +1071,7 @@ void Relay_log_info::cleanup_context(THD
info_thd->set_query(NULL, 0);
}
- if (!is_parallel_exec() || thd == info_thd)
- m_table_map.clear_tables();
- else
- get_current_worker()->m_table_map.clear_tables();
+ m_table_map.clear_tables();
slave_close_thread_tables(thd);
if (error)
@@ -1146,26 +1087,20 @@ void Relay_log_info::cleanup_context(THD
void Relay_log_info::clear_tables_to_lock()
{
- RPL_TABLE_LIST **p_tables= get_tables_to_lock();
- Slave_worker* w_c= get_current_worker();
- while ((*p_tables))
+ while (tables_to_lock)
{
- uchar* to_free= reinterpret_cast<uchar*>((*p_tables));
- if ((*p_tables)->m_tabledef_valid)
+ uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
+ if (tables_to_lock->m_tabledef_valid)
{
- (*p_tables)->m_tabledef.table_def::~table_def();
- (*p_tables)->m_tabledef_valid= FALSE;
+ tables_to_lock->m_tabledef.table_def::~table_def();
+ tables_to_lock->m_tabledef_valid= FALSE;
}
- (*p_tables)=
- static_cast<RPL_TABLE_LIST*>((*p_tables)->next_global);
- if (w_c)
- w_c->tables_to_lock_count--;
- else
- tables_to_lock_count--;
+ tables_to_lock=
+ static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
+ tables_to_lock_count--;
my_free(to_free);
}
- DBUG_ASSERT((*p_tables) == NULL &&
- ((!w_c && tables_to_lock_count == 0) || w_c->tables_to_lock_count == 0));
+ DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
}
void Relay_log_info::slave_close_thread_tables(THD *thd)
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2011-04-06 12:51:58 +0000
+++ b/sql/rpl_rli.h 2011-05-16 19:43:58 +0000
@@ -206,7 +206,6 @@ public:
happen when, for example, the relay log gets rotated because of
max_binlog_size.
*/
-protected:
char group_relay_log_name[FN_REFLEN];
ulonglong group_relay_log_pos;
char event_relay_log_name[FN_REFLEN];
@@ -228,6 +227,8 @@ protected:
char group_master_log_name[FN_REFLEN];
volatile my_off_t group_master_log_pos;
+// TODO: Restore!
+protected:
/*
When it commits, InnoDB internally stores the master log position it has
processed so far; the position to store is the one of the end of the
@@ -284,9 +285,6 @@ public:
*/
ulong slave_exec_mode;
- RPL_TABLE_LIST** get_tables_to_lock() const;
- uint add_table_to_lock(RPL_TABLE_LIST *table_list);
-
/*
Condition and its parameters from START SLAVE UNTIL clause.
@@ -394,7 +392,8 @@ public:
bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const
{
- TABLE_LIST *tables= *get_tables_to_lock();
+ TABLE_LIST *tables= tables_to_lock;
+
DBUG_ASSERT(tabledef_var && conv_table_var);
for (TABLE_LIST *ptr= tables ; ptr != NULL ; ptr= ptr->next_global)
if (ptr->table == table_arg)
@@ -455,8 +454,6 @@ public:
ulong mts_wqs_overfill_cnt; // Coord waits if a W's queue is full
long mts_worker_underrun_level; // percent of WQ size at which Worker claims hungry
ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun
- Slave_worker* get_current_worker() const;
- Slave_worker* set_this_worker(Slave_worker *w) { return this_worker= w; }
Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
ulonglong mts_total_groups; // total event groups distributed in current session
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-12-23 13:34:02 +0000
+++ b/sql/rpl_rli_pdb.cc 2011-05-16 19:43:58 +0000
@@ -29,13 +29,10 @@ const char *info_slave_worker_fields []=
Slave_worker::Slave_worker(const char* type, const char* pfs,
Relay_log_info *rli)
- : Rpl_info_worker(type, pfs), c_rli(rli), curr_group_exec_parts(0),
- group_relay_log_pos(0), group_master_log_pos(0),
+ : Relay_log_info(FALSE), c_rli(rli), curr_group_exec_parts(0),
checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0),
inited_group_execed(0)
{
- group_relay_log_name[0]= 0;
- group_master_log_name[0]= 0;
checkpoint_relay_log_name[0]= 0;
checkpoint_master_log_name[0]= 0;
}
@@ -777,14 +774,14 @@ bool circular_buffer_queue::gt(ulong i,
Progress in GAQ is assessed through comparision of GAQ index value
with Worker's @c last_group_done_index.
Purging breaks at a first discovered gap, that is an item
- that the assinged item->w_id'th Worker has not completed yet.
+ that the assinged item->w_id'th Worker has not yet completed.
The caller is supposed to be the checkpoint handler.
A copy of the last discarded item containing
the refreshed value of the committed low-water-mark is stored
into @c lwm container member for further caller's processing.
- @last_done is updated with the latests total_seqno for each Worker
+ @c last_done is updated with the latest total_seqno for each Worker
that was met during GAQ parse.
@note dyn-allocated members of Slave_job_group such as
@@ -864,6 +861,19 @@ ulong Slave_committed_queue::move_queue_
return cnt;
}
+void Slave_worker::do_report(loglevel level, int err_code, const char *msg, va_list vargs) const
+{
+ c_rli->do_report(level, err_code, msg, vargs);
+}
+
+void Slave_worker::report(loglevel level, int err_code, const char *msg, ...) const
+{
+ va_list vargs;
+ va_start(vargs, msg);
+
+ do_report(level, err_code, msg, vargs);
+ va_end(vargs);
+}
int wait_for_workers_to_finish(Relay_log_info const *rli, Slave_worker *ignore)
{
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-12-21 19:31:29 +0000
+++ b/sql/rpl_rli_pdb.h 2011-05-16 19:43:58 +0000
@@ -202,7 +202,7 @@ public:
ulonglong waited_overfill;
};
-class Slave_worker : public Rpl_info_worker
+class Slave_worker : public Relay_log_info
{
public:
Slave_worker(const char *type, const char *pfs,
@@ -214,8 +214,6 @@ public:
Slave_jobs_queue jobs;
Relay_log_info *c_rli;
- // fixme: experimental
- Relay_log_info *w_rli;
Dynamic_ids *curr_group_exec_parts; // CGEP
bool curr_group_seen_begin; // is set to TRUE with B-event at Worker exec
@@ -250,11 +248,6 @@ public:
We need to make this a dynamic field. /Alfranio
*/
char partitions[FN_REFLEN];
- char group_relay_log_name[FN_REFLEN];
- ulonglong group_relay_log_pos;
- char group_master_log_name[FN_REFLEN];
- ulonglong group_master_log_pos;
-
// todo: remove
char checkpoint_relay_log_name[FN_REFLEN];
ulonglong checkpoint_relay_log_pos;
@@ -273,6 +266,10 @@ public:
bool commit_positions(Log_event *evt, Slave_job_group *ptr_g);
+ void report(loglevel level, int err_code, const char *msg, ...) const
+ ATTRIBUTE_FORMAT(printf, 4, 5);
+ void do_report(loglevel level, int err_code, const char *msg, va_list vargs) const;
+
MY_BITMAP group_execed;
bool inited_group_execed;
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2011-04-06 12:51:58 +0000
+++ b/sql/rpl_slave.cc 2011-05-16 19:43:58 +0000
@@ -2998,8 +2998,6 @@ static int exec_relay_log_event(THD* thd
MTS: since master,relay-group coordinates change per checkpoint
at the end of the checkpoint interval UNTIL can be left far behind.
Hence, UNTIL forces the sequential applying.
-
- TODO: to not let to start with UNTIL whenever @@global.max_slave_workers>0.
*/
if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
rli->is_until_satisfied(thd, ev))
@@ -3720,8 +3718,6 @@ pthread_handler_t handle_slave_worker(vo
goto err;
}
w->info_thd= thd;
- w->w_rli->info_thd= thd;
-
thd->thread_stack = (char*)&thd;
pthread_detach_this_thread();
@@ -3756,9 +3752,8 @@ pthread_handler_t handle_slave_worker(vo
mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
rli->info_thd->awake(THD::KILL_QUERY); // notify Crdn
mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
- // Todo: add necessary stuff to clean up after Q-log-event, a Q trans
thd->clear_error();
- rli->cleanup_context(thd, error);
+ w->cleanup_context(thd, error);
}
mysql_mutex_lock(&w->jobs_lock);
@@ -3871,26 +3866,6 @@ bool mts_recovery_groups(Relay_log_info
delete worker;
};
-#if 0
- for (uint id= 0; id < rli->slave_parallel_workers; id++)
- {
- Slave_worker *worker=
- Rpl_info_factory::create_worker(opt_worker_repository_id, id, rli);
- worker->init_info();
- retrieve_job(worker, job_file);
- /*
- This avoids gathering information on workers that haven't
- processed anything.
- */
-
- // TODO: disregard W_i | W_i->coord < LWM
-
- if (job_file.group_relay_log_name != NULL && strcmp(job_file.group_relay_log_name, "") &&
- job_file.group_master_log_name != NULL && strcmp(job_file.group_master_log_name, ""))
- insert_dynamic(&above_lwm_jobs, (uchar*) &job_file);
- }
-#endif
-
sort_dynamic(&above_lwm_jobs, (qsort_cmp) mts_event_coord_cmp);
/*
In what follows, the group Recovery Bitmap is constructed.
@@ -3962,9 +3937,11 @@ bool mts_recovery_groups(Relay_log_info
ev->log_pos };
if ((ret= mts_event_coord_cmp(&ev_coord, &w_last)) == 0)
{
- // hit it
- // w.B << group_cnt++;
- // RB |= w.B;
+ /*
+ hit it
+ w.B << group_cnt++;
+ RB |= w.B;
+ */
for (uint i= w->checkpoint_seqno - rli->mts_recovery_group_cnt, j= 0;
i <= w->checkpoint_seqno; i++, j++)
{
@@ -4178,36 +4155,41 @@ bool mts_checkpoint_routine(Relay_log_in
if (!locked)
mysql_mutex_lock(&rli->data_lock);
- // Coordinator::commit_positions() {
-
- // rli->gaq->lwm contains all but rli->group_master_log_name
+ /*
+ Coordinator::commit_positions() {
- // group_master_log_name is updated only by Coordinator and it can't change
- // within checkpoint interval because Coordinator flushes the updated value
- // at once.
- // Note, unlike group_master_log_name, event_relay_log_pos is updated solely
- // within Coordinator read loop context. Hence, it's possible at times
- // event_rlp > group_rlp.
+ rli->gaq->lwm contains all but rli->group_master_log_name
+ group_master_log_name is updated only by Coordinator and it can't change
+ within checkpoint interval because Coordinator flushes the updated value
+ at once.
+ Note, unlike group_master_log_name, event_relay_log_pos is updated solely
+ within Coordinator read loop context. Hence, it's possible at times
+ event_rlp > group_rlp.
+ */
rli->set_group_master_log_pos(rli->gaq->lwm.group_master_log_pos);
rli->set_group_relay_log_pos(rli->gaq->lwm.group_relay_log_pos);
if (rli->gaq->lwm.group_relay_log_name[0] != 0)
rli->set_group_relay_log_name(rli->gaq->lwm.group_relay_log_name);
- //todo: uncomment notifies when UNTIL will be supported
+ /*
+ todo: uncomment notifies when UNTIL will be supported
- //rli->notify_group_master_log_name_update();
- //rli->notify_group_relay_log_name_update();
+ rli->notify_group_master_log_name_update();
+ rli->notify_group_relay_log_name_update();
- // todo: optimize with if (wait_flag) broadcast
- // waiter: set wait_flag; waits....; drops wait_flag;
+ Todo: optimize with if (wait_flag) broadcast
+ waiter: set wait_flag; waits....; drops wait_flag;
+ */
mysql_cond_broadcast(&rli->data_cond);
if (!locked)
mysql_mutex_unlock(&rli->data_lock);
error= rli->flush_info(TRUE);
- // end of commit_positions
+ /*
+ } // end of commit_positions
+ */
rli->reset_notified_checkpoint();
@@ -4229,7 +4211,6 @@ int slave_start_single_worker(Relay_log_
pthread_t th;
Slave_worker *w= NULL;
Slave_job_item empty= {NULL};
- Rpl_info_dummy *dummy_handler= NULL;
if (!(w=
Rpl_info_factory::create_worker(opt_worker_repository_id, i, rli)))
@@ -4243,11 +4224,6 @@ int slave_start_single_worker(Relay_log_
w->tables_to_lock= NULL;
w->tables_to_lock_count= 0;
- // fixme: experimenting to make Workers to run ev->update_pos(w->w_rli)
- // fixme: a real hack! part of Rpl_info_factory::create_rli(RLI_REPOSITORY_FILE, FALSE);
- w->w_rli= new Relay_log_info(FALSE);
- dummy_handler= new Rpl_info_dummy(rli->get_number_info_rli_fields());
- w->w_rli->set_rpl_info_handler(dummy_handler);
if (w->init_info())
{
sql_print_error("Failed during slave worker thread create");
@@ -4258,12 +4234,10 @@ int slave_start_single_worker(Relay_log_
// TODO: remove after dynamic_ids will be sorted out (removed/refined) otherwise
// entry->usage assert
w->curr_group_exec_parts->dynamic_ids.elements= 0;
-
w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
w->checkpoint_notified= FALSE;
- w->w_rli->workers= rli->workers; // shallow copying is sufficient
- w->w_rli->this_worker= w;
-
+ w->workers= rli->workers; // shallow copying is sufficient
+ w->this_worker= w;
w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
w->id= i;
w->current_table= NULL;
@@ -4438,8 +4412,6 @@ void slave_stop_workers(Relay_log_info *
DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
delete_dynamic(&w->jobs.Q);
delete_dynamic_element(&rli->workers, i);
- delete w->w_rli;
-
delete w;
}
@@ -4508,8 +4480,10 @@ pthread_handler_t handle_slave_sql(void
/* mts-II: starting the worker pool */
if (slave_start_workers(rli, rli->opt_slave_parallel_workers) != 0)
- goto err;
-
+ {
+ mysql_mutex_unlock(&rli->run_lock);
+ goto err;
+ }
if (init_slave_thread(thd, SLAVE_THD_SQL))
{
/*
@@ -7143,25 +7117,13 @@ mysql_mutex_t* mts_get_temp_table_mutex(
*/
THD* mts_get_coordinator_thd()
{
- Slave_worker *w= NULL;
return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
- NULL : !(w= active_mi->rli->get_current_worker()) ?
- NULL : w->c_rli->info_thd;
+ NULL : active_mi->rli->info_thd;
}
/**
- @return a reference to THD of a Worker thread or NULL
- in case of no replication is set up or it's in the sequential mode.
-*/
-THD* mts_get_worker_thd()
-{
- Slave_worker *w= NULL;
- return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
- NULL : !(w= active_mi->rli->get_current_worker()) ?
- NULL : w->w_rli->info_thd;
-}
+ TODO: exploint new slave_worker system thread type property
-/**
@param thd a reference to THD
@return TRUE if thd belongs to a Worker thread and FALSE otherwise.
@@ -7169,9 +7131,7 @@ THD* mts_get_worker_thd()
bool mts_is_worker(THD *thd)
{
return
- thd->slave_thread &&
- thd->system_thread == SYSTEM_THREAD_SLAVE_SQL &&
- (mts_get_worker_thd() != NULL);
+ thd->slave_thread && active_mi->rli->info_thd != thd;
}
/* end of MTS temp table support section */
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110516194358-bebbn66gdh75adif.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3277) WL#5569 | Andrei Elkin | 16 May |