#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3281 Andrei Elkin 2011-05-25
wl#5569 MTS
Wl#5754 Query event parallel appying
wl#5599 MTS recovery
The patch includes
some cleanup, including one for temp tables support, realization of few todo:s.
@ sql/field.cc
Restoring asserts that were before changes to sql_base.cc.
@ sql/handler.cc
Restoring assert that were before changes to sql_base.cc.
@ sql/log_event.cc
cleanup, incl restoration of the trunk version of some pieces of code.
passing future_event_relay_log_pos to Worker to stike out a todo in rpl_rli.cc.
@ sql/rpl_rli.cc
passing future_event_relay_log_pos is done via an assignment to
Worker's member in slave_worker_exec_job().
@ sql/rpl_rli.h
restoring the original version of get_table_data() though
no real changes.
@ sql/rpl_rli_pdb.cc
cleanup.
@ sql/rpl_slave.cc
Restoring original sequential mode version of assert
in sql_slave_killed.
Worker is not supposed to run this function.
Testing of skipping logics is left to the rpl suite
be run in the parallel mode.
Cleanup.
Marking recovery related todo items explicitly.
Setting up guards to guarantee sequential mode in requested
points of the code.
@ sql/sql_base.cc
removing unnecessary return value in incr_slave_open_temp_tables def.
@ sql/sql_parse.cc
cleanup.
modified:
sql/field.cc
sql/handler.cc
sql/log_event.cc
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_slave.cc
sql/sql_base.cc
sql/sql_parse.cc
=== modified file 'sql/field.cc'
--- a/sql/field.cc 2011-02-27 17:35:25 +0000
+++ b/sql/field.cc 2011-05-25 16:02:13 +0000
@@ -3736,12 +3736,7 @@ longlong Field_long::val_int(void)
ASSERT_COLUMN_MARKED_FOR_READ;
int32 j;
/* See the comment in Field_long::store(long long) */
- /*
- In case the method is executed not by the table's owner
- that one must be a Slave worker thread.
- */
- DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
-
+ DBUG_ASSERT(table->in_use == current_thd);
#ifdef WORDS_BIGENDIAN
if (table->s->db_low_byte_first)
j=sint4korr(ptr);
@@ -6313,8 +6308,8 @@ int Field_string::store(const char *from
const char *cannot_convert_error_pos;
const char *from_end_pos;
- /* See the comment for Field_long::store(long long) and Field_long::val_int */
- DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
+ /* See the comment for Field_long::store(long long) */
+ DBUG_ASSERT(table->in_use == current_thd);
copy_length= well_formed_copy_nchars(field_charset,
(char*) ptr, field_length,
@@ -6463,8 +6458,8 @@ String *Field_string::val_str(String *va
String *val_ptr)
{
ASSERT_COLUMN_MARKED_FOR_READ;
- /* See the comment for Field_long::store(long long) and Field_long::val_int */
- DBUG_ASSERT(table->in_use == current_thd || (current_thd)->slave_thread);
+ /* See the comment for Field_long::store(long long) */
+ DBUG_ASSERT(table->in_use == current_thd);
uint length;
if (table->in_use->variables.sql_mode &
MODE_PAD_CHAR_TO_FULL_LENGTH)
=== modified file 'sql/handler.cc'
--- a/sql/handler.cc 2011-02-27 17:35:25 +0000
+++ b/sql/handler.cc 2011-05-25 16:02:13 +0000
@@ -2127,13 +2127,7 @@ void **handler::ha_data(THD *thd) const
THD *handler::ha_thd(void) const
{
- /*
- About current_thd->slave_thread alternative,
- MTS coordinator open/closes a temp table while the rest of operation
- is done by Workers.
- */
- DBUG_ASSERT(!table || !table->in_use || table->in_use == current_thd ||
- current_thd->slave_thread);
+ DBUG_ASSERT(!table || !table->in_use || table->in_use == current_thd);
return (table && table->in_use) ? table->in_use : current_thd;
}
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2011-05-24 14:29:35 +0000
+++ b/sql/log_event.cc 2011-05-25 16:02:13 +0000
@@ -2932,14 +2932,14 @@ int Log_event::apply_event(Relay_log_inf
struct slave_job_item* pop_jobs_item(Slave_worker *w, Slave_job_item *job_item)
{
THD *thd= w->info_thd;
+
mysql_mutex_lock(&w->jobs_lock);
+
while (!job_item->data && !thd->killed)
{
const char *old_msg;
- //job_item= w->jobs.pop(); // LABS-TODO de_queue()
head_queue(&w->jobs, job_item);
-
if (job_item->data == NULL)
{
w->wait_jobs++;
@@ -2952,6 +2952,7 @@ struct slave_job_item* pop_jobs_item(Sla
}
if (job_item->data)
w->curr_jobs--;
+
mysql_mutex_unlock(&w->jobs_lock);
thd_proc_info(w->info_thd, "Executing event");
@@ -2960,7 +2961,7 @@ struct slave_job_item* pop_jobs_item(Sla
/**
- mts-II worker main routine.
+ MTS worker main routine.
The worker thread waits for an event, execute it, fixes statistics counters.
@note the function maintains CGEP and modifies APH, and causes
@@ -3030,7 +3031,7 @@ int slave_worker_exec_job(Slave_worker *
}
}
}
-
+ w->set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
error= ev->do_apply_event_worker(w);
if (ev->ends_group() || !w->curr_group_seen_begin)
@@ -6920,7 +6921,7 @@ int Xid_log_event::do_apply_event(Relay_
{
rli_ptr->set_group_master_log_pos(log_pos);
}
-
+
if ((error= rli_ptr->flush_info(TRUE)))
goto err;
}
@@ -8923,7 +8924,7 @@ int Rows_log_event::do_apply_event(Relay
Rows_log_event, we can invalidate the query cache for the
associated table.
*/
- for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr; ptr= ptr->next_global)
+ for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
{
const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
}
@@ -8932,9 +8933,10 @@ int Rows_log_event::do_apply_event(Relay
#endif
}
- TABLE* table=
+ TABLE*
+ table=
m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
-
+
DBUG_PRINT("debug", ("m_table: 0x%lx, m_table_id: %lu", (ulong) m_table, m_table_id));
if (table)
@@ -9717,7 +9719,6 @@ int Table_map_log_event::do_apply_event(
table_list->next_global= table_list->next_local= rli->tables_to_lock;
const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
-
/* 'memory' is freed in clear_tables_to_lock */
}
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2011-05-25 07:36:36 +0000
+++ b/sql/rpl_rli.cc 2011-05-25 16:02:13 +0000
@@ -1017,7 +1017,7 @@ void Relay_log_info::stmt_done(my_off_t
while the MyISAM table has already been updated.
*/
if ((info_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
- inc_event_relay_log_pos(); // todo: ev-> future_event_relay_log_pos
+ inc_event_relay_log_pos();
else
{
inc_group_relay_log_pos(event_master_log_pos);
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2011-05-24 14:29:35 +0000
+++ b/sql/rpl_rli.h 2011-05-25 16:02:13 +0000
@@ -392,10 +392,8 @@ public:
bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const
{
- TABLE_LIST *tables= tables_to_lock;
-
DBUG_ASSERT(tabledef_var && conv_table_var);
- for (TABLE_LIST *ptr= tables ; ptr != NULL ; ptr= ptr->next_global)
+ for (TABLE_LIST *ptr= tables_to_lock ; ptr != NULL ; ptr= ptr->next_global)
if (ptr->table == table_arg)
{
*tabledef_var= &static_cast<RPL_TABLE_LIST*>(ptr)->m_tabledef;
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2011-05-24 14:29:35 +0000
+++ b/sql/rpl_rli_pdb.cc 2011-05-25 16:02:13 +0000
@@ -63,7 +63,7 @@ int Slave_worker::init_info()
goto err;
inited_group_execed= 1;
-
+
/*
The init_info() is used to either create or read information
from the repository, in order to initialize the Slave_worker.
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2011-05-25 07:36:36 +0000
+++ b/sql/rpl_slave.cc 2011-05-25 16:02:13 +0000
@@ -1075,8 +1075,8 @@ static bool sql_slave_killed(THD* thd, R
bool ret= FALSE;
DBUG_ENTER("sql_slave_killed");
- DBUG_ASSERT(rli->info_thd == thd || thd->slave_thread);
- DBUG_ASSERT(rli->slave_running == 1 || thd->slave_thread);// tracking buffer overrun
+ DBUG_ASSERT(rli->info_thd == thd);
+ DBUG_ASSERT(rli->slave_running == 1);// tracking buffer overrun
if (abort_loop || thd->killed || rli->abort_slave)
{
/*
@@ -2819,13 +2819,7 @@ int apply_event_and_update_pos(Log_event
}
if (reason == Log_event::EVENT_SKIP_NOT)
{
- /*
- MTS-todo: to test neither skipping nor delayed-exec logics
- are affected by parallel exec mode.
- */
-
// Sleeps if needed, and unlocks rli->data_lock.
-
if (sql_delay_event(ev, thd, rli))
DBUG_RETURN(0);
exec_res= ev->apply_event(rli);
@@ -2923,6 +2917,7 @@ int apply_event_and_update_pos(Log_event
DBUG_RETURN(exec_res ? 1 : 0);
}
+
/**
Top-level function for executing the next event in the relay log.
This is called from the SQL thread.
@@ -3798,9 +3793,6 @@ err:
/**
Orders jobs by comparing relay log information.
*/
-#if 0
-int mts_recovery_cmp(Slave_job_group *id1, Slave_job_group *id2)
-#endif
int mts_event_coord_cmp(LOG_POS_COORD *id1, LOG_POS_COORD *id2)
{
@@ -3954,126 +3946,6 @@ bool mts_recovery_groups(Relay_log_info
DBUG_ASSERT(rli->mts_recovery_group_cnt < groups->n_bits);
-#if 0
- for (uint it_job= 0; it_job < above_lwm_jobs.elements; it_job++)
- {
- group_worker_counter= 0;
- group_lwm_counter= 0;
- get_dynamic(&above_lwm_jobs, (uchar *) &job_worker, it_job);
-
- sql_print_information("Recoverying relay log info based on Worker-Id %lu, "
- "group_relay_log_name %s, group_relay_log_pos %lu "
- "group_master_log_name %s, group_master_log_pos %lu",
- job_worker.worker_id,
- job_worker.group_relay_log_name,
- (ulong) job_worker.group_relay_log_pos,
- job_worker.group_master_log_name,
- (ulong) job_worker.group_master_log_pos);
-
- for (uint it_file= 0; it_file < above_lwm_jobs.elements; it_file++)
- {
- get_dynamic(&above_lwm_jobs, (uchar *) &job_file, it_file);
-
- /*
- Either the current relay log file was already processed by the
- current worker or all groups were analyzed. So, the next file
- is checked.
- */
- if ((strcmp(job_worker.group_relay_log_name,
- job_file.group_relay_log_name) > 0) ||
- (group_worker_counter > (rli->checkpoint_group - 1)))
- continue;
-
- if (desc)
- {
- delete desc;
- desc= NULL;
- }
-
- if (log_name)
- {
- end_io_cache(&log);
- mysql_file_close(file, MYF(MY_WME));
- log_name= NULL;
- }
-
- if ((file= open_binlog(&log, job_file.group_relay_log_name, &errmsg)) < 0)
- {
- sql_print_error("%s", errmsg);
- goto end;
- }
- log_name= job_file.group_relay_log_name;
- my_stat(log_name, &s, MYF(0));
-
- if (!((desc= Log_event::read_log_event(&log, 0, &fdle,
- opt_master_verify_checksum)) &&
- desc->get_type_code() == FORMAT_DESCRIPTION_EVENT))
- goto end;
-
- my_b_seek(&log, (my_off_t) 0);
- while ((ev= Log_event::read_log_event(&log, 0, &fdle,
- opt_master_verify_checksum)))
- {
- DBUG_ASSERT(ev->is_valid());
-
- /*
- All groups were analyzed. So, the next worker needs to
- be checked.
- */
- if (group_worker_counter > (rli->checkpoint_group - 1))
- break;
-
- filecmp= strcmp(job_file.group_relay_log_name,
- job_worker.group_relay_log_name);
- poscmp= ev->log_pos -
- job_worker.group_master_log_pos;
-
- if (filecmp > 0 || (filecmp == 0 && poscmp > 0))
- {
- bool unhandled= !bitmap_is_set(&job_worker.worker->group_execed,
- group_worker_counter);
- if (ev->starts_group())
- curr_group_seen_begin= TRUE;
-
- if (ev->ends_group() || !curr_group_seen_begin)
- {
- filecmp= strcmp(job_file.group_relay_log_name,
- rli->get_group_master_log_name());
- poscmp= ev->log_pos -
- rli->get_group_master_log_pos();
- if (filecmp > 0 || (filecmp == 0 && poscmp > 0))
- {
- if (unhandled)
- bitmap_is_set(groups, group_lwm_counter);
- group_lwm_counter++;
- }
- curr_group_seen_begin= FALSE;
- group_worker_counter++;
- }
- }
-
- delete ev;
- ev= NULL;
- }
- }
- }
-
-end:
- if (desc)
- {
- delete desc;
- desc= NULL;
- }
-
-
- if (log_name)
- {
- end_io_cache(&log);
- mysql_file_close(file, MYF(MY_WME));
- log_name= NULL;
- }
-#endif
-
end_io_cache(&log);
mysql_file_close(file, MYF(MY_WME));
log_name= NULL;
@@ -4471,7 +4343,7 @@ pthread_handler_t handle_slave_sql(void
pthread_detach_this_thread();
- /* mts-II: starting the worker pool */
+ /* MTS: starting the worker pool */
if (slave_start_workers(rli, rli->opt_slave_parallel_workers) != 0)
{
mysql_mutex_unlock(&rli->run_lock);
@@ -4539,7 +4411,6 @@ pthread_handler_t handle_slave_sql(void
goto err;
}
THD_CHECK_SENTRY(thd);
-
#ifndef DBUG_OFF
{
char llbuf1[22], llbuf2[22];
@@ -5839,7 +5710,6 @@ static Log_event* next_event(Relay_log_i
mysql_mutex_t *log_lock = rli->relay_log.get_log_lock();
const char* errmsg=0;
THD* thd = rli->info_thd;
-
DBUG_ENTER("next_event");
DBUG_ASSERT(thd != 0);
@@ -5954,7 +5824,7 @@ static Log_event* next_event(Relay_log_i
if (rli->is_parallel_exec() && (mts_checkpoint_period != 0 || force))
{
ulonglong period= static_cast<ulonglong>(mts_checkpoint_period * 1000000ULL);
- mts_checkpoint_routine(rli, period, force, TRUE); // ALFRANIO ERROR
+ mts_checkpoint_routine(rli, period, force, TRUE); // TODO: ALFRANIO ERROR
}
if (hot_log)
@@ -6085,7 +5955,7 @@ static Log_event* next_event(Relay_log_i
do
{
- mts_checkpoint_routine(rli, period, FALSE, FALSE); // ALFRANIO ERROR
+ mts_checkpoint_routine(rli, period, FALSE, FALSE); // TODO: ALFRANIO ERROR
set_timespec_nsec(waittime, period);
thd->enter_cond(log_cond, log_lock,
"Slave has read all relay log; "
@@ -6751,9 +6621,8 @@ int reset_slave(THD *thd, Master_info* m
/*
Clear master's log coordinates
-
- Andrei needs to guarantee that this done in sequential mode.
*/
+ DBUG_ASSERT(!mi->rli || !mi->rli->slave_running); // none writes in rli table
mi->init_master_log_pos();
if (remove_info(mi))
@@ -7074,9 +6943,9 @@ bool change_master(THD* thd, Master_info
info and relay log info are prepared to handle events from all
masters. In such case, we need to execute the code below for each
master and correctly set the key_info_idx. /Alfranio
-
- Andrei needs to guarantee that this done in sequential mode.
*/
+
+ DBUG_ASSERT(!mi->rli->slave_running); // none writes in rli table
ret= mi->rli->flush_info(TRUE);
mysql_cond_broadcast(&mi->data_cond);
mysql_mutex_unlock(&mi->rli->data_lock);
=== modified file 'sql/sql_base.cc'
--- a/sql/sql_base.cc 2011-05-25 07:36:36 +0000
+++ b/sql/sql_base.cc 2011-05-25 16:02:13 +0000
@@ -125,7 +125,7 @@ static void init_tdc_psi_keys(void)
}
#endif /* HAVE_PSI_INTERFACE */
-static int32 incr_slave_open_temp_tables(THD *thd, int inc)
+static void incr_slave_open_temp_tables(THD *thd, int inc)
{
int32 ret;
@@ -139,7 +139,7 @@ static int32 incr_slave_open_temp_tables
else
ret= (slave_open_temp_tables += inc);
- return ret;
+ return;
}
/**
=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc 2011-01-11 23:01:02 +0000
+++ b/sql/sql_parse.cc 2011-05-25 16:02:13 +0000
@@ -5164,7 +5164,7 @@ bool check_stack_overrun(THD *thd, long
uchar *buf __attribute__((unused)))
{
long stack_used;
- DBUG_ASSERT(thd == current_thd); // mts-II: be prepared to hit it
+ DBUG_ASSERT(thd == current_thd);
if ((stack_used=used_stack(thd->thread_stack,(char*) &stack_used)) >=
(long) (my_thread_stack_size - margin))
{
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110525160213-yg6qlowj5fitnvop.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3281) WL#5569WL#5599 WL#5754 | Andrei Elkin | 25 May |