#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:alfranio.correia@stripped
3229 Andrei Elkin 2010-12-04
merging from the repo wl5569
modified:
mysql-test/extra/rpl_tests/rpl_parallel_load.test
sql/log_event.cc
sql/rpl_rli.cc
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
sql/share/errmsg-utf8.txt
=== modified file 'mysql-test/extra/rpl_tests/rpl_parallel_load.test'
--- a/mysql-test/extra/rpl_tests/rpl_parallel_load.test 2010-12-02 17:46:46 +0000
+++ b/mysql-test/extra/rpl_tests/rpl_parallel_load.test 2010-12-04 17:14:50 +0000
@@ -91,6 +91,11 @@ while($i)
# Exec log position is not accurate in the prototype
--sleep 2
+--disable_query_log
+--disable_result_log
+###select sleep(300);
+--enable_result_log
+--enable_query_log
sync_slave_with_master;
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-12-02 18:13:12 +0000
+++ b/sql/log_event.cc 2010-12-04 17:14:50 +0000
@@ -2211,6 +2211,7 @@ Slave_worker *Log_event::get_slave_worke
if ((is_b_event= starts_group()) || !rli->curr_group_seen_begin)
{
g.master_log_pos= log_pos;
+ g.group_master_log_pos= g.group_relay_log_pos= 0;
g.group_relay_log_name= NULL;
g.worker_id= (ulong) -1;
g.total_seqno= const_cast<Relay_log_info*>(rli)->mts_total_groups++;
@@ -2284,6 +2285,8 @@ Slave_worker *Log_event::get_slave_worke
uint i;
mts_group_cnt= rli->gaq->assigned_group_index;
+ // TODO: throw an error when relay-log reading starts from inside of a group!!
+
if (!worker->relay_log_change_notified)
{
/*
@@ -2300,8 +2303,10 @@ Slave_worker *Log_event::get_slave_worke
DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
ptr_g->group_relay_log_name= (char *)
- my_malloc(strlen(const_cast<Relay_log_info*>(rli)->get_group_relay_log_name()) + 1, MYF(MY_WME));
- strcpy(ptr_g->group_relay_log_name, const_cast<Relay_log_info*>(rli)->get_group_relay_log_name());
+ my_malloc(strlen(const_cast<Relay_log_info*>(rli)->
+ get_group_relay_log_name()) + 1, MYF(MY_WME));
+ strcpy(ptr_g->group_relay_log_name,
+ const_cast<Relay_log_info*>(rli)->get_group_relay_log_name());
DBUG_ASSERT(ptr_g->group_relay_log_name != NULL);
@@ -2416,15 +2421,15 @@ void append_item_to_jobs(slave_job_item
Slave_worker *w, Relay_log_info *rli)
{
THD *thd= rli->info_thd;
- int ret= -1;
- ulonglong new_pend_size= rli->mts_pending_jobs_size +
- ((Log_event*) (job_item->data))->data_written;
+ int ret;
+ ulong ev_size= ((Log_event*) (job_item->data))->data_written;
+ ulonglong new_pend_size;
DBUG_ASSERT(thd == current_thd);
thd_proc_info(thd, "Feeding an event to a worker thread");
mysql_mutex_lock(&rli->pending_jobs_lock);
-
+ new_pend_size= rli->mts_pending_jobs_size + ev_size;
// C waits basing on *data* sizes in the queues
while (new_pend_size > rli->mts_pending_jobs_size_max)
{
@@ -2440,11 +2445,12 @@ void append_item_to_jobs(slave_job_item
wait_info);
mysql_cond_wait(&rli->pending_jobs_cond, &rli->pending_jobs_lock);
thd->exit_cond(old_msg);
- mysql_mutex_lock(&rli->pending_jobs_lock);
if (thd->killed)
return;
- new_pend_size= rli->mts_pending_jobs_size +
- ((Log_event*) (job_item->data))->data_written;
+
+ mysql_mutex_lock(&rli->pending_jobs_lock);
+
+ new_pend_size= rli->mts_pending_jobs_size + ev_size;
}
rli->pending_jobs++;
rli->mts_pending_jobs_size= new_pend_size;
@@ -2463,6 +2469,8 @@ void append_item_to_jobs(slave_job_item
my_sleep(nap_weight * rli->mts_coordinator_basic_nap);
}
+ ret= -1;
+
mysql_mutex_lock(&w->jobs_lock);
// possible WQ overfill
@@ -2495,9 +2503,10 @@ void append_item_to_jobs(slave_job_item
else
{
mysql_mutex_unlock(&w->jobs_lock);
+
mysql_mutex_lock(&rli->pending_jobs_lock);
rli->pending_jobs--; // roll back of the prev incr
- rli->mts_pending_jobs_size -= new_pend_size;
+ rli->mts_pending_jobs_size -= ev_size;
mysql_mutex_unlock(&rli->pending_jobs_lock);
}
}
@@ -2651,6 +2660,7 @@ int slave_worker_exec_job(Slave_worker *
job_item= pop_jobs_item(w, job_item);
if (thd->killed)
{
+ // de-queueing and decrement counters is in the caller's exit branch
error= -1;
goto err;
}
@@ -2701,9 +2711,7 @@ int slave_worker_exec_job(Slave_worker *
{
DBUG_PRINT("slave_worker_exec_job:", (" commits GAQ index %lu, last committed %lu", ev->mts_group_cnt, w->last_group_done_index));
- w->slave_worker_ends_group(ev->mts_group_cnt, error); /* last done sets post exec */
- if (!(ev->get_type_code() == XID_EVENT && w->is_transactional()))
- w->commit_positions(ev);
+ w->slave_worker_ends_group(ev, error); /* last done sets post exec */
}
mysql_mutex_lock(&w->jobs_lock);
@@ -2720,6 +2728,7 @@ int slave_worker_exec_job(Slave_worker *
preserving signatures of existing methods.
todo: convert update_pos(w->w_rli) -> update_pos(w)
to remove w_rli w/a
+ TODO: remove ?
*/
if (!error)
ev->update_pos(w->w_rli);
@@ -2769,16 +2778,13 @@ int slave_worker_exec_job(Slave_worker *
mysql_cond_signal(&rli->pending_jobs_cond);
}
- //DBUG_ASSERT(rli->pending_jobs < rli->slave_pending_jobs_max);
- // if (rli->pending_jobs == rli->slave_pending_jobs_max - 1 ||
- // rli->pending_jobs == 0)
- // mysql_cond_signal(&rli->pending_jobs_cond);
-
mysql_mutex_unlock(&rli->pending_jobs_lock);
w->stmt_jobs++;
+
err:
+ // TODO!!! ANDREI to RESTORE
// if (!ev)
// delete ev; // after ev->update_pos() event is garbage
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2010-12-03 16:56:11 +0000
+++ b/sql/rpl_rli.cc 2010-12-04 17:14:50 +0000
@@ -152,7 +152,6 @@ void Relay_log_info::init_workers(ulong
{
uint wi= 0;
- slave_parallel_workers= n;
/*
Parallel slave parameters initialization is done regardless
whether the feature is or going to be active or not.
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-12-03 10:15:45 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-12-04 17:14:50 +0000
@@ -138,7 +138,8 @@ bool Slave_worker::write_info(Rpl_info_h
*/
if (to->prepare_info_for_write() ||
- to->set_info(curr_group_exec_parts) ||
+ //to->set_info(curr_group_exec_parts) ||
+ to->set_info("") ||
to->set_info(group_relay_log_name) ||
to->set_info((ulong)group_relay_log_pos) ||
to->set_info(group_master_log_name) ||
@@ -474,9 +475,10 @@ Slave_worker *get_least_occupied_worker(
TODO: reclaim space if the actual size exceeds the limit.
*/
-void Slave_worker::slave_worker_ends_group(ulong gaq_idx, int error)
+void Slave_worker::slave_worker_ends_group(Log_event* ev, int error)
{
int i;
+ ulong gaq_idx= ev->mts_group_cnt;
if (!error)
{
@@ -495,12 +497,27 @@ void Slave_worker::slave_worker_ends_gro
<= sizeof(group_relay_log_name));
strcpy(group_relay_log_name, ptr_g->group_relay_log_name);
- delete ptr_g->group_relay_log_name; // C allocated
- ptr_g->group_relay_log_name= NULL; // mark freed
}
- last_group_done_index = gaq_idx;
+
+ // GAQ is updated with the checkpoint info
+
+ // delete ptr_g->group_relay_log_name; // C allocated
+ // ptr_g->group_relay_log_name= NULL; // mark freed
+
+
+ // TODO: as it's same as ev->update_pos(w_rli) remove the latter.
+
+ if (!(ev->get_type_code() == XID_EVENT && is_transactional()))
+ commit_positions(ev);
+
+ ptr_g->group_master_log_pos= group_master_log_pos;
+ ptr_g->group_relay_log_pos= group_relay_log_pos;
+
+ last_group_done_index = gaq_idx; // GAQ index is available to C now
}
+ // cleanup relating to the last executed group regardless of error
+
for (i= curr_group_exec_parts->dynamic_ids.elements; i > 0; i--)
{
db_worker *entry= NULL;
@@ -531,12 +548,6 @@ void Slave_worker::slave_worker_ends_gro
}
else
DBUG_ASSERT(usage_partition != 0);
- /*
- TODO:
- if U == 0 \and count(APH) > max
- delete from APH where U = 0;
- delete entry;
- */
mysql_mutex_unlock(&slave_worker_hash_lock);
@@ -659,13 +670,14 @@ bool circular_buffer_queue::gt(ulong i,
It's compared first against the polled
to break out of the loop at once if no progress.
-
- The caller is expected to be the checkpoint handler.
+ The caller is supposed to be the checkpoint handler.
A copy of the last discarded item containing
the refreshed value of the committed low-water-mark is stored
- into @c lwm member for further caller's processing.
+ into @c lwm container member for further caller's processing.
+ @note dyn-allocated members of Slave_job_group such as
+ group_relay_log_name as freed here.
@return number of discarded items
*/
@@ -675,12 +687,15 @@ ulong Slave_committed_queue::move_queue_
for (i= e; i != a && !empty();)
{
Slave_worker *w_i;
- Slave_job_group g;
+ Slave_job_group *ptr_g;
ulong l;
- get_dynamic(&Q, (uchar *) &g, i);
- if (g.worker_id == (ulong) -1)
+ char grl_name[FN_REFLEN];
+
+ grl_name[0]= 0;
+ ptr_g= (Slave_job_group *) dynamic_array_ptr(&Q, i);
+ if (ptr_g->worker_id == (ulong) -1)
break; /* the head is not even assigned */
- get_dynamic(ws, (uchar *) &w_i, g.worker_id);
+ get_dynamic(ws, (uchar *) &w_i, ptr_g->worker_id);
get_dynamic(&last_done, (uchar *) &l, w_i->id);
DBUG_ASSERT(l <= s);
@@ -691,9 +706,29 @@ ulong Slave_committed_queue::move_queue_
DBUG_ASSERT(w_i->last_group_done_index >= i ||
(((i > a && e > a) || a == s) && (w_i->last_group_done_index < a)));
+ // memorize the last met group_relay_log_name
+ if (ptr_g->group_relay_log_name)
+ {
+ strcpy(grl_name, ptr_g->group_relay_log_name);
+ my_free(ptr_g->group_relay_log_name);
+ ptr_g->group_relay_log_name= NULL; // mark freed
+ }
+
if (w_i->last_group_done_index == i || gt(w_i->last_group_done_index, i))
{
- ulong ind= de_queue((uchar*) &lwm);
+ Slave_job_group g;
+ ulong ind= de_queue((uchar*) &g);
+
+ // stored the memorized name into result struct
+ if (grl_name[0] != 0)
+ strcpy(lwm.group_relay_log_name, grl_name);
+ else
+ lwm.group_relay_log_name[0]= 0;
+
+ DBUG_ASSERT(!ptr_g->group_relay_log_name);
+
+ g.group_relay_log_name= lwm.group_relay_log_name;
+ lwm= g; // the result struct is done for the current iteration
/* todo/fixme: the least occupied sorting out can be triggered here */
/* e.g
@@ -706,7 +741,7 @@ ulong Slave_committed_queue::move_queue_
}
*/
DBUG_ASSERT(ind == i);
- DBUG_ASSERT(g.total_seqno == lwm.total_seqno);
+ DBUG_ASSERT(ptr_g->total_seqno == lwm.total_seqno);
set_dynamic(&last_done, (uchar*) &i, w_i->id);
}
@@ -751,7 +786,7 @@ int wait_for_workers_to_finish(Relay_log
thd->exit_cond(proc_info);
ret++;
- DBUG_ASSERT(entry->usage == 0);
+ DBUG_ASSERT(entry->usage == 0 || thd->killed);
}
else
{
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-12-01 19:15:08 +0000
+++ b/sql/rpl_rli_pdb.h 2010-12-04 17:14:50 +0000
@@ -91,7 +91,9 @@ public:
typedef struct st_slave_job_group
{
- my_off_t master_log_pos;
+ my_off_t master_log_pos; // B-event log_pos
+ my_off_t group_master_log_pos; // T-event lop_pos filled by W for CheckPoint
+ my_off_t group_relay_log_pos; // filled by W
/*
When RL name changes C allocates and fill in a new name of RL,
@@ -99,11 +101,10 @@ typedef struct st_slave_job_group
C keeps track of each Worker has been notified on the updating
to make sure the routine runs once per change.
- W checks the value at commit and memoriezes a not-NULL
- with prior freeing old one's allocation. The memorized value
- plays its role at commit until a new has arrived.
+ W checks the value at commit and memoriezes a not-NULL.
+ Freeing unless NULL is left to C at CheckPoint.
*/
- char *group_relay_log_name;
+ char *group_relay_log_name; // The value is last seen relay-log
ulong worker_id;
ulonglong total_seqno;
} Slave_job_group;
@@ -139,11 +140,14 @@ public:
my_init_dynamic_array(&last_done, sizeof(s), n, 0);
for (k= 0; k < n; k++)
insert_dynamic(&last_done, (uchar*) &s); // empty for each Worker
+ lwm.group_relay_log_name= (char *) my_malloc(FN_REFLEN + 1, MYF(0));
+ lwm.group_relay_log_name[0]= 0;
}
~Slave_committed_queue ()
{
delete_dynamic(&last_done);
+ my_free(lwm.group_relay_log_name);
}
/* Checkpoint routine refreshes the queue */
@@ -216,7 +220,7 @@ public:
size_t get_number_worker_fields();
- void slave_worker_ends_group(ulong, int); // CGEP walk through to upd APH
+ void slave_worker_ends_group(Log_event*, int); // CGEP walk through to upd APH
bool commit_positions(Log_event *evt);
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-12-04 15:45:02 +0000
+++ b/sql/rpl_slave.cc 2010-12-04 17:14:50 +0000
@@ -168,7 +168,7 @@ static int terminate_slave_thread(THD *t
bool skip_lock);
static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
int slave_worker_exec_job(Slave_worker * w, Relay_log_info *rli);
-bool checkpoint_routine(Relay_log_info *rli);
+bool mts_checkpoint_routine(Relay_log_info *rli);
/*
Find out which replications threads are running
@@ -3571,6 +3571,8 @@ pthread_handler_t handle_slave_worker(vo
mysql_mutex_lock(&rli->pending_jobs_lock);
rli->pending_jobs -= purge_cnt;
rli->mts_pending_jobs_size -= purge_size;
+ DBUG_ASSERT(rli->mts_pending_jobs_size < rli->mts_pending_jobs_size_max);
+
mysql_mutex_unlock(&rli->pending_jobs_lock);
mysql_mutex_lock(&w->jobs_lock);
@@ -3593,22 +3595,27 @@ err:
DBUG_RETURN(0);
}
-bool checkpoint_routine(Relay_log_info *rli)
+/**
+ Processing rli->gaq to find out the low-water-mark coordinates
+ stored into the cental recovery table.
+
+
+ @return FALSE success, TRUE otherwise
+*/
+bool mts_checkpoint_routine(Relay_log_info *rli)
{
bool error= FALSE;
+ ulong cnt;
DBUG_ENTER("checkpoint_routine");
- mysql_mutex_lock(&rli->data_lock);
-
- uint i;
- rli->gaq->move_queue_head(&rli->workers);
+ if (!(cnt= rli->gaq->move_queue_head(&rli->workers)))
+ DBUG_RETURN(error);
/* TODO:
- the least occupied sorting out needs moving to the actual
- checkpoint location - next_event()
+ to turn the least occupied selection in terms of jobs pieces
*/
- for (i= 0; i < rli->workers.elements; i++)
+ for (uint i= 0; i < rli->workers.elements; i++)
{
Slave_worker *w_i;
get_dynamic(&rli->workers, (uchar *) &w_i, i);
@@ -3616,6 +3623,21 @@ bool checkpoint_routine(Relay_log_info *
};
sort_dynamic(&rli->least_occupied_workers, (qsort_cmp) ulong_cmp);
+ // Coordinator::commit_positions() {
+
+ // Alfranio, rli->gaq->lwm contains all but rli->group_master_log_name
+
+ // group_master_log_name is updated only by Coordinator and it can't change
+ // within checkpoint interval because Coordinator flushes the updated value
+ // at once.
+
+ mysql_mutex_lock(&rli->data_lock);
+
+ rli->set_group_master_log_pos(rli->gaq->lwm.group_master_log_pos);
+ rli->set_group_relay_log_pos(rli->gaq->lwm.group_relay_log_pos);
+ if (rli->gaq->lwm.group_relay_log_name[0] != 0)
+ rli->set_group_relay_log_name(rli->gaq->lwm.group_relay_log_name);
+
error= rli->flush_info(TRUE);
mysql_mutex_unlock(&rli->data_lock);
@@ -3766,7 +3788,7 @@ void slave_stop_workers(Relay_log_info *
int i;
THD *thd= rli->info_thd;
- if (rli->workers.elements == 0)
+ if (rli->slave_parallel_workers == 0)
return;
for (i= rli->workers.elements - 1; i >= 0; i--)
@@ -3875,7 +3897,7 @@ pthread_handler_t handle_slave_sql(void
pthread_detach_this_thread();
/* mts-II: starting the worker pool */
- if (slave_start_workers(rli, opt_slave_parallel_workers) != 0)
+ if (slave_start_workers(rli, rli->slave_parallel_workers) != 0)
goto err;
if (init_slave_thread(thd, SLAVE_THD_SQL))
@@ -5321,7 +5343,7 @@ static Log_event* next_event(Relay_log_i
ulong diff= diff_timespec(rli->curr_clock, rli->last_clock);
if (diff > period)
{
- checkpoint_routine(rli);
+ mts_checkpoint_routine(rli);
set_timespec_nsec(rli->last_clock, 0);
}
set_timespec_nsec(waittime, period);
@@ -5707,6 +5729,12 @@ int start_slave(THD* thd , Master_info*
*/
if (thread_mask & SLAVE_SQL)
{
+ /*
+ To cache the system var value and used it in the following.
+ The system var can change but not the cached.
+ */
+ mi->rli->slave_parallel_workers= opt_slave_parallel_workers;
+
mysql_mutex_lock(&mi->rli->data_lock);
if (thd->lex->mi.pos)
@@ -5759,6 +5787,13 @@ int start_slave(THD* thd , Master_info*
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
ER_MISSING_SKIP_SLAVE,
ER(ER_MISSING_SKIP_SLAVE));
+ if (mi->rli->slave_parallel_workers != 0)
+ {
+ mi->rli->slave_parallel_workers= 0;
+ push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE,
+ ER_NO_UNTIL_COND_WITH_PARALLEL_SLAVE,
+ ER(ER_NO_UNTIL_COND_WITH_PARALLEL_SLAVE));
+ }
}
mysql_mutex_unlock(&mi->rli->data_lock);
=== modified file 'sql/share/errmsg-utf8.txt'
--- a/sql/share/errmsg-utf8.txt 2010-10-25 10:39:01 +0000
+++ b/sql/share/errmsg-utf8.txt 2010-12-04 17:14:50 +0000
@@ -6399,3 +6399,6 @@ ER_RPL_INFO_DATA_TOO_LONG
eng "Data for column '%s' too long"
ER_CANT_LOCK_RPL_INFO_TABLE
eng "You can't use locks with rpl info tables."
+
+ER_NO_UNTIL_COND_WITH_PARALLEL_SLAVE
+ eng "Until condition is not supported in Parallel Slave. Slave is started in the sequential mode."
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101204171450-bvuh8o3qb4h1lkt7.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3229) | Andrei Elkin | 4 Dec |