#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:alfranio.correia@stripped
3237 Andrei Elkin 2010-12-09
wl#5569 MTS
Integration with wl#5599 recovery for MTS, plus fixes for two asserts.
The first assert was due to a missed cleanup of errored-out rows-events;
the second is worked around for now: the array
w->curr_group_exec_parts->dynamic_ids
is initialized with one partition at Worker startup, but it should not be.
@ sql/log_event.cc
Propagating CP related info from C to W.
@ sql/rpl_rli.cc
Added a part of CP info from C to W propagation.
@ sql/rpl_rli.h
New members to RLI due to CP info from C to W propagation.
@ sql/rpl_rli_pdb.cc
Worker stores the new CP so that flush_info() can record it along with
(todo) a bitmap of the groups executed within the checkpoint interval.
@ sql/rpl_rli_pdb.h
New members to a transport and the Worker class due to CP info.
@ sql/rpl_slave.cc
Added the missed cleanup of errored-out rows-events;
added a work-around for the array
w->curr_group_exec_parts->dynamic_ids
being initialized with one partition at Worker startup, which it should not be.
modified:
sql/log_event.cc
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-12-08 00:33:48 +0000
+++ b/sql/log_event.cc 2010-12-09 17:45:02 +0000
@@ -2418,9 +2418,13 @@ Slave_worker *Log_event::get_slave_worke
{
g.master_log_pos= log_pos;
g.group_master_log_pos= g.group_relay_log_pos= 0;
+ g.group_master_log_name= NULL; // todo: remove
g.group_relay_log_name= NULL;
g.worker_id= (ulong) -1;
g.total_seqno= const_cast<Relay_log_info*>(rli)->mts_total_groups++;
+ g.checkpoint_log_name= NULL;
+ g.checkpoint_log_pos= 0;
+ g.checkpoint_seqno= (uint) -1;
// the last occupied GAQ's array index
rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
@@ -2490,6 +2494,10 @@ Slave_worker *Log_event::get_slave_worke
{
uint i;
mts_group_cnt= rli->gaq->assigned_group_index;
+ Slave_job_group *ptr_g=
+ (Slave_job_group *)
+ dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
+
// TODO: throw an error when relay-log reading starts from inside of a group!!
@@ -2501,11 +2509,6 @@ Slave_worker *Log_event::get_slave_worke
Now group terminating event initiates the new name
delivery through the current group relaylog slot in GAQ.
*/
-
- Slave_job_group *ptr_g=
- (Slave_job_group *)
- dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
-
DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
ptr_g->group_relay_log_name= (char *)
@@ -2519,6 +2522,20 @@ Slave_worker *Log_event::get_slave_worke
worker->relay_log_change_notified= TRUE;
}
+ if (!worker->checkpoint_notified)
+ {
+ // Worker dealloc
+ ptr_g->checkpoint_log_name= (char *)
+ my_malloc(strlen(const_cast<Relay_log_info*>(rli)->
+ get_group_master_log_name()) + 1, MYF(MY_WME));
+ strcpy(ptr_g->checkpoint_log_name,
+ const_cast<Relay_log_info*>(rli)->get_group_master_log_name());
+ ptr_g->checkpoint_log_pos= const_cast<Relay_log_info*>(rli)->get_group_master_log_pos();
+ worker->checkpoint_notified= TRUE;
+ }
+ ptr_g->checkpoint_seqno= rli->checkpoint_seqno;
+ const_cast<Relay_log_info*>(rli)->checkpoint_seqno++;
+
DBUG_ASSERT(worker == rli->last_assigned_worker);
if (!worker)
@@ -6641,7 +6658,11 @@ int Xid_log_event::do_apply_event(Relay_
}
else if (is_trans_repo && is_parallel)
{
- if ((error= w->commit_positions(this)))
+ ulong gaq_idx= mts_group_cnt;
+ Slave_job_group *ptr_g=
+ (Slave_job_group *) dynamic_array_ptr(&w->c_rli->gaq->Q, gaq_idx);
+
+ if ((error= w->commit_positions(this, ptr_g)))
goto err;
}
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2010-12-08 01:30:32 +0000
+++ b/sql/rpl_rli.cc 2010-12-09 17:45:02 +0000
@@ -82,9 +82,9 @@ Relay_log_info::Relay_log_info(bool is_s
We need to decide if you are going to use an
option and store it in a storage.
- 100 msec.
+ 500 msec.
*/
- lwm_period= 0.100;
+ lwm_period= 0.500;
set_timespec_nsec(last_clock, 0);
bzero((char*) &cache_buf, sizeof(cache_buf));
@@ -241,6 +241,27 @@ void Relay_log_info::reset_notified_rela
}
/**
+ Method is called in mts_checkpoint_routine()
+ to mark each Worker as needing to adapt to a new checkpoint interval
+ whose coordinates are passed to it through the GAQ index.
+
+ Worker notices the new checkpoint value at the group commit
+ to reset the current bitmap and set ON a bit number put by C into GAQ index
+ as the first group committed after the new checkpoint.
+*/
+void Relay_log_info::reset_notified_checkpoint()
+{
+ if (!is_parallel_exec())
+ return;
+ for (uint i= 0; i < workers.elements; i++)
+ {
+ Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
+ w->checkpoint_notified= FALSE;
+ }
+ checkpoint_seqno= 0;
+}
+
+/**
The method can be run both by C having the Main (coord) rli context and
by W having both the main and the Local (worker) rli context.
Decision matrix:
@@ -1113,7 +1134,16 @@ void Relay_log_info::stmt_done(my_off_t
/* Alfranio needs to update the coordinator and workers. */
if ((w= get_current_worker()) == NULL)
+ {
flush_info(is_transactional() ? TRUE : FALSE);
+
+ /*
+ The central recovery commit, run in sequential mode, forces
+ notification of the de facto new checkpoint.
+ */
+ if (is_parallel_exec())
+ reset_notified_checkpoint();
+ }
}
}
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2010-12-08 00:33:48 +0000
+++ b/sql/rpl_rli.h 2010-12-09 17:45:02 +0000
@@ -471,6 +471,7 @@ public:
a new partition. Is updated at checkpoint commit to the main RLI.
*/
DYNAMIC_ARRAY least_occupied_workers;
+ uint checkpoint_seqno; // counter of groups executed after the most recent CP
/* most of allocation in the coordinator rli is there */
void init_workers(ulong);
@@ -490,9 +491,21 @@ public:
return ret;
}
+ /**
+ While a group is executed by a Worker the relay log can change.
+ Coordinator notifies Workers about this event. Worker is supposed
+ to commit to the recovery table with the new info.
+ */
void reset_notified_relay_log_change();
/**
+ While groups are executed by Workers the Coordinator can take a new checkpoint.
+ Coordinator notifies Workers about this event. Coordinator and Workers
+ maintain a bitmap of executed groups that is reset with a new checkpoint.
+ */
+ void reset_notified_checkpoint();
+
+ /**
Helper function to do after statement completion.
This function is called from an event to complete the group by
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-12-08 13:59:07 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-12-09 17:45:02 +0000
@@ -20,10 +20,11 @@ const char *info_slave_worker_fields []=
Slave_worker::Slave_worker(const char* type, const char* pfs)
: Rpl_info_worker(type, pfs), group_relay_log_pos(0),
- group_master_log_pos(0)
+ group_master_log_pos(0), checkpoint_log_pos(0)
{
group_relay_log_name[0]= 0;
group_master_log_name[0]= 0;
+ checkpoint_log_name[0]= 0;
curr_group_exec_parts= new Database_ids(NAME_LEN);
}
@@ -157,12 +158,24 @@ size_t Slave_worker::get_number_worker_f
return sizeof(info_slave_worker_fields)/sizeof(info_slave_worker_fields[0]);
}
-bool Slave_worker::commit_positions(Log_event *ev)
+bool Slave_worker::commit_positions(Log_event *ev, Slave_job_group* ptr_g)
{
DBUG_ENTER("Slave_worker::checkpoint_positions");
bool error= FALSE;
+ if (ptr_g->checkpoint_log_name != NULL)
+ {
+ strmake(checkpoint_log_name, ptr_g->checkpoint_log_name,
+ sizeof(checkpoint_log_name) - 1);
+ checkpoint_log_pos= ptr_g->checkpoint_log_pos;
+
+ my_free(ptr_g->checkpoint_log_name);
+ ptr_g->checkpoint_log_name= NULL;
+ }
+
+ // TODO: update the group bitmap ptr_g->checkpoint_seqno 'th bit
+
group_relay_log_pos= ev->future_event_relay_log_pos;
group_master_log_pos= ev->log_pos;
strmake(group_master_log_name, c_rli->get_group_master_log_name(),
@@ -502,16 +515,10 @@ void Slave_worker::slave_worker_ends_gro
strcpy(group_relay_log_name, ptr_g->group_relay_log_name);
}
- // GAQ is updated with the checkpoint info
-
- // delete ptr_g->group_relay_log_name; // C allocated
- // ptr_g->group_relay_log_name= NULL; // mark freed
-
-
- // TODO: as it's same as ev->update_pos(w_rli) remove the latter.
-
if (!(ev->get_type_code() == XID_EVENT && is_transactional()))
- commit_positions(ev);
+ {
+ commit_positions(ev, ptr_g);
+ }
ptr_g->group_master_log_pos= group_master_log_pos;
ptr_g->group_relay_log_pos= group_relay_log_pos;
@@ -536,7 +543,7 @@ void Slave_worker::slave_worker_ends_gro
my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
(uchar*) key + 1, key[0]);
- DBUG_ASSERT(entry && entry->usage != 0);
+ DBUG_ASSERT(entry && entry->usage != 0); // was used to break
DBUG_ASSERT(strlen(key + 1) == (uchar) key[0]);
entry->usage--;
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-12-08 01:30:32 +0000
+++ b/sql/rpl_rli_pdb.h 2010-12-09 17:45:02 +0000
@@ -95,7 +95,7 @@ public:
typedef struct st_slave_job_group
{
- char *group_master_log_name; // This is used upon recovery.
+ char *group_master_log_name; // (actually redundant)
Dynamic_ids *db_ids; // This is used upon recovery.
my_off_t master_log_pos; // B-event log_pos
@@ -114,6 +114,11 @@ typedef struct st_slave_job_group
char *group_relay_log_name; // The value is last seen relay-log
ulong worker_id;
ulonglong total_seqno;
+
+ /* checkpoint coord are reset by CP and rotate:s */
+ uint checkpoint_seqno;
+ my_off_t checkpoint_log_pos; // T-event lop_pos filled by W for CheckPoint
+ char* checkpoint_log_name;
} Slave_job_group;
#define copy_job(from, to) \
@@ -274,6 +279,7 @@ public:
volatile int curr_jobs; // the current assignments
ulong usage_partition; // number of different partitions handled by this worker
volatile bool relay_log_change_notified; // Coord sets and resets, W can read
+ volatile bool checkpoint_notified; // Coord sets and resets, W can read
bool wq_overrun_set; // W monitors its queue usage to incr/decr rli->mts_wqs_overrun
/*
We need to make this a dynamic field. /Alfranio
@@ -283,6 +289,8 @@ public:
ulonglong group_relay_log_pos;
char group_master_log_name[FN_REFLEN];
ulonglong group_master_log_pos;
+ char checkpoint_log_name[FN_REFLEN];
+ ulonglong checkpoint_log_pos;
int init_info();
void end_info();
@@ -292,7 +300,7 @@ public:
void slave_worker_ends_group(Log_event*, int); // CGEP walk through to upd APH
- bool commit_positions(Log_event *evt);
+ bool commit_positions(Log_event *evt, Slave_job_group *ptr_g);
private:
bool read_info(Rpl_info_handler *from);
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-12-08 13:59:07 +0000
+++ b/sql/rpl_slave.cc 2010-12-09 17:45:02 +0000
@@ -3751,6 +3751,8 @@ pthread_handler_t handle_slave_worker(vo
mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
rli->info_thd->awake(THD::KILL_QUERY); // notify Crdn
mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
+ // Todo: add necessary stuff to clean up after Q-log-event, a Q trans
+ rli->cleanup_context(thd, error);
}
mysql_mutex_lock(&w->jobs_lock);
@@ -3932,6 +3934,8 @@ bool mts_checkpoint_routine(Relay_log_in
error= rli->flush_info(TRUE);
// end of commit_positions
+ rli->reset_notified_checkpoint();
+
end:
set_timespec_nsec(rli->last_clock, 0);
@@ -3962,9 +3966,13 @@ int slave_start_single_worker(Relay_log_
Rpl_info_dummy *dummy_handler= new Rpl_info_dummy();
w->w_rli->set_rpl_info_handler(dummy_handler);
w->init_info();
+
+ // TODO: remove once dynamic_ids is sorted out (removed/refined); until then
+ // this reset is needed to avoid the entry->usage assert
+ w->curr_group_exec_parts->dynamic_ids.elements= 0;
w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
-
+ w->checkpoint_notified= FALSE;
w->w_rli->workers= rli->workers; // shallow copying is sufficient
w->w_rli->this_worker= w;
@@ -4053,6 +4061,8 @@ int slave_start_workers(Relay_log_info *
rli->mts_total_groups= 0;
rli->curr_group_seen_begin= 0;
rli->run_query_in_parallel= opt_slave_run_query_in_parallel;
+ rli->checkpoint_seqno= 0;
+
for (i= 0; i < n; i++)
{
if ((error= slave_start_single_worker(rli, i)))
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101209174502-qai8fb14z3hn96m4.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3237) WL#5569 | Andrei Elkin | 9 Dec |