#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3220 Andrei Elkin 2010-11-27
wl#5569
Providing relay-log name for wl#5599.
Protocol of action on the C and W sides is described in rpl_rli_pdb.h.
Erroring out in case of parallel exec and ROWS_QUERY_LOG_EVENT.
(todo: the native sequential mode for the event needs some revision, in particular
`delete ev' shall happen *always* in rli->cleanup_context not in two places as of current).
@ sql/log_event.cc
Erroring out in case of parallel exec and ROWS_QUERY_LOG_EVENT;
Deploying C role of handling relay-log name change;
@ sql/rpl_rli_pdb.cc
Providing relay-log name for wl#5599.
Freeing allocated memory for relay-log name at the end of the group execution by Worker.
@ sql/rpl_rli_pdb.h
Protocol of action on the C and W sides is here.
Removing current_binlog;
Adding a pointer group_relay_log_name member to st_slave_job_group.
modified:
sql/log_event.cc
sql/log_event.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-11-26 21:08:30 +0000
+++ b/sql/log_event.cc 2010-11-27 15:36:50 +0000
@@ -2181,7 +2181,7 @@ bool Log_event::contains_partition_info(
r - a mini-group internal "regular" event that follows its g-parent
(Write, Update, Delete -rows)
S - sequentially applied event (may not be a part of any group).
- Events of this type are determined via @c only_serial_exec()
+ Events of this type are determined via @c only_sequential_exec()
earlier and don't cause calling this method .
T - terminator of the group (XID, COMMIT, ROLLBACK)
@@ -2215,6 +2215,7 @@ Slave_worker *Log_event::get_slave_worke
// rli->gaq->en_queue({NULL, W_s});
g= {
log_pos,
+ NULL,
(ulong) -1,
const_cast<Relay_log_info*>(rli)->mts_total_groups++
};
@@ -2280,12 +2281,35 @@ Slave_worker *Log_event::get_slave_worke
if (ends_group() || !rli->curr_group_seen_begin)
{
uint i;
- // assert (\exists P_k . W_s \in CGAP) if P_k is present in ev
mts_group_cnt= rli->gaq->assigned_group_index;
-
+
+ if (!worker->relay_log_change_notified)
+ {
+ /*
+ Prior this event, C rotated the relay log to drop each
+ Worker's notified flag.
+ Now group terminating event initiates the new name
+ delivery through the current group relaylog slot in GAQ.
+ */
+
+ Slave_job_group *ptr_g=
+ (Slave_job_group *)
+ dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
+
+ DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
+
+ ptr_g->group_relay_log_name= (char *)
+ my_malloc(strlen(const_cast<Relay_log_info*>(rli)->get_group_relay_log_name()) + 1, MYF(MY_WME));
+ strcpy(ptr_g->group_relay_log_name, const_cast<Relay_log_info*>(rli)->get_group_relay_log_name());
+ worker->relay_log_change_notified= TRUE;
+ }
+
DBUG_ASSERT(worker == rli->last_assigned_worker);
+
if (!worker)
{
+ DBUG_ASSERT(0);
+
// a very special case of the empty group: {B, T}
DBUG_ASSERT(rli->curr_group_assigned_parts.elements == 0
&& rli->curr_group_da.elements == 1);
@@ -2450,7 +2474,7 @@ int Log_event::apply_event(Relay_log_inf
bool parallel;
if (!(parallel= rli->is_parallel_exec()) ||
- only_serial_exec(rli->run_query_in_parallel, rli->curr_group_seen_begin))
+ only_sequential_exec(rli->run_query_in_parallel, rli->curr_group_seen_begin))
{
if (parallel)
{
@@ -2478,15 +2502,18 @@ int Log_event::apply_event(Relay_log_inf
DBUG_ASSERT(!rli->curr_group_seen_begin);
c_rli->curr_group_is_parallel= FALSE; // Coord will destruct all the rest of events
-
- (void) wait_for_workers_to_finish(rli);
+ if (!parallel_exec_by_coordinator(::server_id))
+ (void) wait_for_workers_to_finish(rli);
}
DBUG_RETURN(do_apply_event(rli));
}
- // !!! TODO: suppress
- // if (get_type_code() == ROWS_QUERY_LOG_EVENT)
-
+ if (get_type_code() == ROWS_QUERY_LOG_EVENT)
+ {
+ rli->report(ERROR_LEVEL, 0,
+ "No parallel support for ROWS_QUERY_LOG_EVENT");
+ DBUG_RETURN(1);
+ }
if ((!(w= get_slave_worker_id(rli)) ||
DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)))
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2010-11-26 21:08:30 +0000
+++ b/sql/log_event.h 2010-11-27 15:36:50 +0000
@@ -1155,9 +1155,10 @@ public:
/**
MST: to execute serially due to technical or conceptual limitation
- @return TRUE for all but {Query,Rand,User_var,Intvar,Rows}_log_event
+ @return TRUE if despite permanent parallel execution mode an event
+ needs applying in a real isolation that is sequentially.
*/
- bool only_serial_exec(bool query_in_parallel, bool group_term_in_parallel)
+ bool only_sequential_exec(bool query_in_parallel, bool group_term_in_parallel)
{
return
/*
@@ -1194,7 +1195,20 @@ public:
get_type_code() == PRE_GA_DELETE_ROWS_EVENT||
get_type_code() == INCIDENT_EVENT;
}
-
+
+ /**
+ MST: some events can be applied by Coordinator concurrently with Workers.
+
+ @return TRUE if that's the case,
+ FALSE otherwise.
+ */
+ bool parallel_exec_by_coordinator(ulong slave_server_id)
+ {
+ return
+ get_type_code() == FORMAT_DESCRIPTION_EVENT &&
+ (server_id == (uint32) ::server_id);
+ }
+
/**
Events of a cetain type carry partitioning data such as db names.
*/
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-11-26 21:08:30 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-11-27 15:36:50 +0000
@@ -448,7 +448,23 @@ void Slave_worker::slave_worker_ends_gro
uint i;
if (!error)
+ {
+ Slave_job_group *ptr_g=
+ (Slave_job_group *)
+ dynamic_array_ptr(&c_rli->gaq->Q, c_rli->gaq->assigned_group_index);
+ if (ptr_g->group_relay_log_name != NULL)
+ {
+ // memorizing a new relay-log file name
+
+ DBUG_ASSERT(strlen(ptr_g->group_relay_log_name) + 1
+ <= sizeof(group_relay_log_name));
+
+ strcpy(group_relay_log_name, ptr_g->group_relay_log_name);
+ delete ptr_g->group_relay_log_name; // C allocated
+ ptr_g->group_relay_log_name= NULL; // mark freed
+ }
last_group_done_index = gaq_idx;
+ }
for (i= curr_group_exec_parts.elements; i > 0; i--)
{
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-11-26 21:08:30 +0000
+++ b/sql/rpl_rli_pdb.h 2010-11-27 15:36:50 +0000
@@ -91,9 +91,19 @@ public:
typedef struct st_slave_job_group
{
- //struct event_coordinates coord;
- my_off_t pos; // filename in Slave_committed_queue::current_binlog[]
+ my_off_t master_log_pos;
+ /*
+ When RL name changes C allocates and fill in a new name of RL,
+ otherwise it fills in NULL.
+ C keeps track of each Worker has been notified on the updating
+ to make sure the routine runs once per change.
+
+ W checks the value at commit and memoriezes a not-NULL
+ with prior freeing old one's allocation. The memorized value
+ plays its role at commit until a new has arrived.
+ */
+ char *group_relay_log_name;
ulong worker_id;
ulonglong total_seqno;
} Slave_job_group;
@@ -107,9 +117,6 @@ class Slave_committed_queue : public cir
{
public:
- /* Allocation of file_name that is common for all Slave_assigned_job_group:s */
- char current_binlog[FN_REFLEN];
-
/* master's Rot-ev exec */
void update_current_binlog(const char *post_rotate);
@@ -129,7 +136,6 @@ public:
: circular_buffer_queue(el_size, max, inc)
{
uint k;
- strmake(current_binlog, log, sizeof(current_binlog) - 1);
my_init_dynamic_array(&last_done, sizeof(s), n, 0);
for (k= 0; k < n; k++)
insert_dynamic(&last_done, (uchar*) &s); // empty for each Worker
@@ -188,6 +194,7 @@ public:
ulong trans_jobs; // how many jobs per trns
volatile int curr_jobs; // the current assignments
ulong usage_partition; // number of different partitions handled by this worker
+ volatile bool relay_log_change_notified; // Coord sets and resets, W can read
/*
We need to make this a dynamic field. /Alfranio
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-11-26 21:08:30 +0000
+++ b/sql/rpl_slave.cc 2010-11-27 15:36:50 +0000
@@ -2677,7 +2677,7 @@ int apply_event_and_update_pos(Log_event
// It was served so inside apply_event() above.
// The main RLI table is safe to update now.
-// if (!rli->is_parallel_exec() || ev->only_serial_exec())
+// if (!rli->is_parallel_exec() || ev->only_sequential_exec())
error= ev->update_pos(rli);
#if 1
@@ -2879,7 +2879,7 @@ static int exec_relay_log_event(THD* thd
handle_rows_query_log_event(ev, rli);
if ((!rli->is_parallel_exec() ||
- ev->only_serial_exec(rli->run_query_in_parallel, rli->curr_group_is_parallel))
+ ev->only_sequential_exec(rli->run_query_in_parallel, rli->curr_group_is_parallel))
&& ev->get_type_code() != ROWS_QUERY_LOG_EVENT) // mts TODO: check this case
{
@@ -3603,7 +3603,7 @@ err:
my_thread_end();
pthread_exit(0);
- DBUG_RETURN(0);
+ DBUG_RETURN(0);
}
/**
@@ -3632,6 +3632,8 @@ int slave_start_single_worker(Relay_log_
ulong key_worker_idx[]= { server_id, w->id };
w->init_info(key_worker_idx, NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER);
+ w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
+
// ALFRANIO --> The recovery procedure must be introduced here.
w->w_rli->workers= rli->workers; // shallow copying is sufficient
@@ -5352,6 +5354,13 @@ static Log_event* next_event(Relay_log_i
rli->flush_info(key_info_idx, key_info_size);
}
+ /* Reset the relay-log-change-notified status of Slave Workers */
+ for (uint i; i < rli->workers.elements; i++)
+ {
+ Slave_worker *w= (Slave_worker *) dynamic_array_ptr(&rli->workers, i);
+ w->relay_log_change_notified= FALSE;
+ }
+
/*
Now we want to open this next log. To know if it's a hot log (the one
being written by the I/O thread now) or a cold log, we can use
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101127153650-tizvu3c0ovmoj1g5.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr.crash-safe branch (andrei.elkin:3220) WL#5569 | Andrei Elkin | 27 Nov |