#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3285 Andrei Elkin 2011-05-28
wl#5569 MTS
This patch contains cleanup and simplification of logics of handling some events
sequentially by Coordinator and adds memory-allocation failure branch
to workers starting routine.
@ sql/log_event.cc
Cleanup and simplification of logics of handling some events sequentially
by Coordinator.
An event is marked as parallel or sequential through C's rli that affects
commit to info table by C as well as the event's destruction.
@ sql/rpl_rli.h
simplified (curr_group_is_parallel + curr_group_split) into curr_*event*_is_parallel.
@ sql/rpl_rli_pdb.h
Adding GAQ memory-allocation failure notification.
@ sql/rpl_slave.cc
simplified (curr_group_is_parallel + curr_group_split) into curr_event_is_parallel;
GAQ memory-allocation failure branch is added to workers starting routine.
modified:
sql/log_event.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2011-05-27 21:29:14 +0000
+++ b/sql/log_event.cc 2011-05-28 10:42:40 +0000
@@ -2368,6 +2368,7 @@ bool Log_event::contains_partition_info(
/**
General hashing function to compute the id of an applier for
the current event.
+ The event is marked as belonging to a Worker.
At computing the id few rules apply depending on partitioning properties
that the event instance can feature.
@@ -2405,6 +2406,8 @@ Slave_worker *Log_event::get_slave_worke
Slave_job_group g, *ptr_g;
bool is_b_event;
+ rli->curr_event_is_parallel= TRUE; // event belongs to a Worker
+
/* checking partioning properties and perform corresponding actions */
// Beginning of a group designated explicitly with BEGIN
@@ -2435,8 +2438,6 @@ Slave_worker *Log_event::get_slave_worke
// the last occupied GAQ's array index
gaq_idx= rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
- // serves as a mark for Coord to delete events otherwise
- rli->curr_group_is_parallel= TRUE;
DBUG_ASSERT(gaq_idx != (ulong) -1 && gaq_idx < rli->gaq->s);
DBUG_ASSERT(((Slave_job_group *)
@@ -2519,14 +2520,6 @@ Slave_worker *Log_event::get_slave_worke
DBUG_ASSERT(i == num_dbs || num_dbs == OVER_MAX_DBS_IN_EVENT_MTS);
- /*
- Either old master binlog (todo: assert), or a specific "corner"
- case (todo: wrap with BEGIN/COMMIT on master anyway) of logging
- like SELECT sf(), where sf() has a side effect.
- */
- if (!rli->curr_group_seen_begin)
- rli->curr_group_is_parallel= TRUE;
-
// TODO: convert to C's private mem_root.
// Releasing the Coord's mem-root from the updated dbs. It's safe to do at this
@@ -2579,12 +2572,13 @@ Slave_worker *Log_event::get_slave_worke
if (ends_group() || !rli->curr_group_seen_begin)
{
uint i;
+
+ // index of GAQ that this terminal event belongs to
mts_group_cnt= rli->gaq->assigned_group_index;
+
ptr_g= (Slave_job_group *)
dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
- DBUG_ASSERT(rli->curr_group_is_parallel);
-
// TODO: throw an error when relay-log reading starts from inside of a group!!
if (!worker->relay_log_change_notified)
@@ -2855,6 +2849,7 @@ int Log_event::apply_event(Relay_log_inf
{
if (parallel)
{
+ c_rli->curr_event_is_parallel= FALSE; // mark event as belonging to Coordinator
/*
There are two classes of events that Coordinator executes
itself. One e.g the master Rotate requires all Workers to finish up
@@ -2893,10 +2888,8 @@ int Log_event::apply_event(Relay_log_inf
DBUG_RETURN(-1);
}
/*
- Marking sure the event won't be executed in parallel.
- That affects memory deallocation in the following execution path.
+ Marking sure the event will be executed in sequential mode.
*/
- c_rli->curr_group_is_parallel= FALSE;
(void) wait_for_workers_to_finish(rli);
#ifndef DBUG_OFF
@@ -2910,19 +2903,6 @@ int Log_event::apply_event(Relay_log_inf
}
#endif
}
- else
- {
- if (rli->curr_group_is_parallel)
- {
- /*
- the event is artifical to splits the current group into separate
- relay-logs. Differently to the previous events of the group this one
- is applied by Coordinator and w/o any synchronization with Workers.
- */
- c_rli->curr_group_split= TRUE;
- c_rli->curr_group_is_parallel= FALSE;
- }
- }
}
DBUG_RETURN(do_apply_event(rli));
}
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2011-05-25 16:02:13 +0000
+++ b/sql/rpl_rli.h 2011-05-28 10:42:40 +0000
@@ -455,8 +455,7 @@ public:
Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
ulonglong mts_total_groups; // total event groups distributed in current session
- bool curr_group_is_parallel; // an event to process by Coordinator
- bool curr_group_split; // an event split the current group forcing C to exec it
+ bool curr_event_is_parallel; // if FALSE the current event is processed by C
ulong opt_slave_parallel_workers; // auxiliary cache for ::opt_slave_parallel_workers
ulong slave_parallel_workers; // the one slave session time number of workers
ulong recovery_parallel_workers; // number of workers while recovering.
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2011-05-27 21:29:14 +0000
+++ b/sql/rpl_rli_pdb.h 2011-05-28 10:42:40 +0000
@@ -166,6 +166,8 @@ typedef struct st_slave_job_group
class Slave_committed_queue : public circular_buffer_queue
{
public:
+
+ bool inited;
/* master's Rot-ev exec */
void update_current_binlog(const char *post_rotate);
@@ -183,10 +185,15 @@ public:
Slave_committed_queue (const char *log, uint el_size, ulong max, uint n,
uint inc= 0)
- : circular_buffer_queue(el_size, max, inc)
+ : circular_buffer_queue(el_size, max, inc), inited(FALSE)
{
uint k;
ulonglong l= 0;
+
+ if (max >= (ulong) -1 || !circular_buffer_queue::inited_queue)
+ return;
+ else
+ inited= TRUE;
my_init_dynamic_array(&last_done, sizeof(lwm.total_seqno), n, 0);
for (k= 0; k < n; k++)
insert_dynamic(&last_done, (uchar*) &l); // empty for each Worker
@@ -196,8 +203,11 @@ public:
~Slave_committed_queue ()
{
- delete_dynamic(&last_done);
- my_free(lwm.group_relay_log_name);
+ if (inited)
+ {
+ delete_dynamic(&last_done);
+ my_free(lwm.group_relay_log_name);
+ }
}
/* Checkpoint routine refreshes the queue */
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2011-05-27 21:29:14 +0000
+++ b/sql/rpl_slave.cc 2011-05-28 10:42:40 +0000
@@ -2842,7 +2842,7 @@ int apply_event_and_update_pos(Log_event
See sql/rpl_rli.h for further details.
*/
int error= 0;
- if (skip_event || !rli->is_parallel_exec() || !rli->curr_group_is_parallel)
+ if (skip_event || !rli->is_parallel_exec() || !rli->curr_event_is_parallel)
{
#ifndef DBUG_OFF
/*
@@ -3034,17 +3034,10 @@ static int exec_relay_log_event(THD* thd
exec_res= apply_event_and_update_pos(ev, thd, rli);
- if ((!rli->is_parallel_exec() || !rli->curr_group_is_parallel))
+ if ((!rli->is_parallel_exec() || !rli->curr_event_is_parallel))
{
- DBUG_ASSERT(!rli->is_parallel_exec() || !rli->curr_group_is_parallel ||
+ DBUG_ASSERT(!rli->is_parallel_exec() || !rli->curr_event_is_parallel ||
ev->shall_skip(rli) != Log_event::EVENT_SKIP_NOT);
-
- if (rli->curr_group_split)
- {
- // the current group split status is reset
- rli->curr_group_is_parallel= TRUE;
- rli->curr_group_split= FALSE;
- }
/*
Format_description_log_event should not be deleted because it will be
used to read info about the relay log's format; it will be deleted when
@@ -4171,6 +4164,9 @@ int slave_start_workers(Relay_log_info *
sizeof(Slave_job_group),
1 + rli->opt_slave_parallel_workers *
rli->mts_slave_worker_queue_len_max, n);
+ if (!rli->gaq->inited)
+ return 1;
+
rli->mts_pending_jobs_size= 0;
rli->mts_pending_jobs_size_max= ::opt_mts_pending_jobs_size_max;
rli->mts_wqs_underrun_w_id= (ulong) -1;
@@ -4180,9 +4176,8 @@ int slave_start_workers(Relay_log_info *
rli->mts_worker_underrun_level= ::opt_mts_worker_underrun_level;
rli->mts_total_groups= 0;
rli->curr_group_seen_begin= FALSE;
- rli->curr_group_is_parallel= FALSE;
+ rli->curr_event_is_parallel= FALSE;
rli->curr_group_isolated= FALSE;
- rli->curr_group_split= FALSE;
rli->checkpoint_seqno= 0;
/*
dyn memory to consume by Coordinator per event
@@ -4343,6 +4338,8 @@ pthread_handler_t handle_slave_sql(void
if (slave_start_workers(rli, rli->opt_slave_parallel_workers) != 0)
{
mysql_mutex_unlock(&rli->run_lock);
+ rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
+ "Failed during slave workers initialization");
goto err;
}
if (init_slave_thread(thd, SLAVE_THD_SQL))
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110528104240-ofiubd6s1y8b9m6e.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3285) WL#5569 | Andrei Elkin | 31 May |