#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped
3213 Andrei Elkin 2010-11-19
wl#5569
recovery interfaces for wl#5599 implementation.
The essence of this patch is to provide the GAQ object implementation
and a valid life cycle for it.
Before calling the store methods of wl#5599, the checkpoint handler is supposed
to invoke rli->gaq->move_queue_head(&rli->workers).
See a simulation of that near ev->update_pos() in the main sql thread loop.
The checkpoint info is composed as an instance of Slave_job_group that resides
at rli->gaq->lwm.
Todo: uncomment
+ // delete ev; // after ev->update_pos() event is garbage
once the real checkpoint has been done.
Todo: the real implementation needs to take care of filling in the current
binlog name through Slave_committed_queue::update_current_binlog, initially
as well as at the time of executing Rotate/FD methods.
+ // experimental checkpoint per each scheduling attempt
+ // logics of next_event()
+
+ rli->gaq->move_queue_head(&rli->workers);
@ sql/log_event.cc
Log_event::get_slave_worker_id() was shaped closer to its final version, with the
elements necessary for the rli->gaq life cycle.
@ sql/log_event.h
Log_event::mts_group_cnt is added as a part of GAQ index propagation path
from C to W.
@ sql/rpl_rli.h
Further extension to RLI necessary to the distribution hash function (APH).
@ sql/rpl_rli_pdb.cc
Implementing circular_buffer_queue::*queue and few other methods incl
ulong Slave_committed_queue::move_queue_head()
the main concern for checkpoint.
@ sql/rpl_rli_pdb.h
Extending classes with few new member definitions necessary for GAQ interface / checkpoint / recovery.
@ sql/rpl_slave.cc
Simulation of the lwm-checkpoint and changes due to rpl_rli_pdb classes extensions.
modified:
sql/log_event.cc
sql/log_event.h
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-11-18 14:00:52 +0000
+++ b/sql/log_event.cc 2010-11-19 14:51:58 +0000
@@ -2201,13 +2201,14 @@ bool Log_event::contains_partition_info(
Slave_worker *Log_event::get_slave_worker_id(Relay_log_info const *rli)
{
+ Slave_worker *worker= NULL;
/* checking properties and perform corresponding actions */
// g
if (contains_partition_info())
{
// a lot of things inside `get_slave_worker_id'
- Slave_worker *worker= get_slave_worker(get_db(), rli->workers /*, W_c */);
+ worker= get_slave_worker(get_db(), rli->workers /*, W_c */);
const_cast<Relay_log_info *>(rli)->last_assigned_worker= worker;
}
@@ -2217,18 +2218,30 @@ Slave_worker *Log_event::get_slave_worke
// insert {NULL, W_s} to yield a new Group_cnt indexed element in GAQ
// Group_cnt= rli->gaq->e;
// rli->gaq->en_queue({NULL, W_s});
+ Slave_job_group gaq_item=
+ {
+ //{NULL, log_pos},
+ log_pos,
+
+ worker->id, // todo: -> NULL and implement set_dynamic in DA
+
+ const_cast<Relay_log_info*>(rli)->mts_total_groups++};
+
+ rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &gaq_item);
+
+ DBUG_ASSERT(rli->gaq->assigned_group_index != (ulong) -1); // gaq must have room
+
// B-event is appended to the Deferred Array associated with GCAP
+ // TODO: refine contains_partition_info() to not include BEGIN
}
// T
if (ends_group())
{
// assert (\exists P_k . W_s \in CGAP) if P_k is present in ev
+ mts_group_cnt= rli->gaq->assigned_group_index;
- // cleanup: CGAP := nil
-
- // ev->group_cnt := GAQ.Group_cnt, so that
- // at procesing *Worker*.WQ.last_group_cnt := ev->group_cnt
+ // *TODO* cleanup: CGAP := nil
}
// todo: p. the first p-event clears CGAP. Each p-event is appened to DA.
@@ -2288,7 +2301,6 @@ static void * head_queue(Slave_jobs_queu
return a job item through a struct which point is supplied via argument.
*/
static Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
-
{
if (jobs->e == jobs->s)
{
@@ -2483,10 +2495,7 @@ int slave_worker_exec_job(Slave_worker *
}
if (ev->ends_group())
{
- w->slave_worker_ends_group();
-
- // TODO: GAQ related
- // w->last_group_done_index = ev->group_cnt
+ w->slave_worker_ends_group(ev->mts_group_cnt);
}
error= ev->do_apply_event(rli);
@@ -2499,7 +2508,9 @@ int slave_worker_exec_job(Slave_worker *
to remove w_rli w/a
*/
ev->update_pos(w->w_rli);
- delete ev; // after ev->update_pos() event is garbage
+
+ // delete ev; // after ev->update_pos() event is garbage
+
mysql_mutex_unlock(&w->jobs_lock);
/* statistics */
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2010-11-18 14:00:52 +0000
+++ b/sql/log_event.h 2010-11-19 14:51:58 +0000
@@ -992,6 +992,13 @@ public:
*/
ulong slave_exec_mode;
+ /**
+ Index in @c rli->gaq array to indicate a group that this event is purging.
+ The index is set by C (presumably the Coordinator) on a group terminator
+ event and is checked by W (the Worker) at the event execution.
+ The indexed data represent the Worker progress status.
+ */
+ ulong mts_group_cnt;
+
#ifdef MYSQL_SERVER
THD* thd;
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2010-11-18 14:00:52 +0000
+++ b/sql/rpl_rli.h 2010-11-19 14:51:58 +0000
@@ -432,10 +432,13 @@ public:
int slave_pending_jobs_max;
Slave_worker *last_assigned_worker; // a hint to partitioning func for some events
Slave_committed_queue *gaq;
+ DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
Slave_worker* get_current_worker() const;
Slave_worker* set_this_worker(Slave_worker *w) { return this_worker= w; }
Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
+ ulonglong mts_total_groups; // total event groups distributed in current session
+ ulong least_occupied_worker; // ... todo: APH ...
/**
Helper function to do after statement completion.
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-11-18 14:00:52 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-11-19 14:51:58 +0000
@@ -331,7 +331,7 @@ Slave_worker *get_free_worker(DYNAMIC_AR
@c get_slave_worker().
Affected by the being committed group APH tuples are updated.
-
+ @c last_group_done_index member is set to the arg value.
for each D_i in CGEP
assert (W_id == APH.W_id|P_d == D_i)
update APH set U-- where P_d = D_i
@@ -341,6 +341,166 @@ Slave_worker *get_free_worker(DYNAMIC_AR
CGEP the Worker partition cache is cleaned up.
*/
-void Slave_worker::slave_worker_ends_group()
+void Slave_worker::slave_worker_ends_group(ulong gaq_idx)
+{
+ // Remember the GAQ index handed in by the caller as the index of the
+ // last group this Worker has finished.
+ last_group_done_index = gaq_idx;
+}
+
+
+/**
+ Class circular_buffer_queue
+*/
+
+/**
+  Remove the head item of the queue, copying its content out.
+
+  @param val  destination that get_dynamic() copies the head item into.
+
+  @return the array index the removed item was located at, or
+          (ulong) -1 when the queue is empty.
+*/
+ulong circular_buffer_queue::de_queue(uchar *val)
+{
+  ulong ret;
+  if (e == s)
+  {
+    DBUG_ASSERT(len == 0);
+    return (ulong) -1;
+  }
+
+  ret= e;
+  get_dynamic(&Q, val, e);
+  len--;
+
+  // pre boundary cond: the queue was full (a == s), so the slot that has
+  // just been freed becomes the first available one
+  if (a == s)
+    a= e;
+  e= (e + 1) % s;
+
+  // post boundary cond: the head caught up with the first available index,
+  // i.e. the queue got empty, which is encoded as e == s
+  if (a == e)
+    e= s;
+
+  // `==' binds tighter than `?:', so the conditional expression has to be
+  // parenthesized as a whole in order to really be compared against len.
+  DBUG_ASSERT(e == s ||
+              len == ((a >= e) ? (a - e) : (s + a - e)));
+
+  return ret;
+}
+
+/**
+  Append an item at the first available index of the queue.
+
+  @param item  pointer to the element that set_dynamic() stores
+               into the underlying array.
+
+  @return the array index the item has been stored at, or (ulong) -1
+          when the queue is full or the element could not be stored.
+*/
+ulong circular_buffer_queue::en_queue(void *item)
+{
+  ulong ret;
+  if (a == s)
+  {
+    DBUG_ASSERT(a == Q.elements);
+    return (ulong) -1;
+  }
+
+  // store; no queue state has been modified yet, therefore a failed store
+  // can be reported with the same error value as the "no room" case
+  ret= a;
+  if (set_dynamic(&Q, (uchar*) item, ret))
+    return (ulong) -1;
+
+  // pre-boundary cond: the queue was empty (e == s), so the stored item
+  // becomes the head
+  if (e == s)
+    e= a;
+
+  a= (a + 1) % s;
+  len++;
+
+  // post-boundary cond: the first available index caught up with the head,
+  // i.e. the queue got full, which is encoded as a == s
+  if (a == e)
+    a= s;
+
+  // Full queue: len must be the whole size. Otherwise len must be the
+  // distance from the head to the first available index. The conditional
+  // expression is parenthesized as a whole because `==' binds tighter
+  // than `?:'.
+  DBUG_ASSERT(a == s ?
+              len == s :
+              len == ((a >= e) ? (a - e) : (s + a - e)));
+  return ret;
+}
+
+/**
+  Peek at the head of the queue without removing it.
+
+  The head element is copied into a local Slave_job_item and its
+  @c data member is returned. The former code handed a NULL destination
+  to get_dynamic(), which made any call on a non-empty queue write
+  through a NULL pointer.
+
+  @note Only meaningful for queues whose elements are Slave_job_item;
+        this is asserted in debug builds.
+
+  @return the @c data member of the head item, or NULL when the queue
+          is empty.
+*/
+void* circular_buffer_queue::head_queue()
+{
+  Slave_job_item item= {NULL};
+  if (e == s)
+  {
+    DBUG_ASSERT(len == 0);
+  }
+  else
+  {
+    DBUG_ASSERT(Q.size_of_element == sizeof(item));
+    get_dynamic(&Q, (uchar*) &item, e);
+  }
+  return item.data;
+}
+
+/**
+ Comparison of two queue indexes.
+
+ @note The caller makes sure the args are within the valid
+ range, incl cases the queue is empty or full.
+
+ @return TRUE if the first arg identifies a queue entity ordered
+ after the one defined by the 2nd arg,
+ FALSE otherwise.
+*/
+bool circular_buffer_queue::gt(ulong i, ulong k)
+{
+  // An index at or past the head `e' lies in the part of the buffer that
+  // has not wrapped around; an index below `e' lies in the wrapped part.
+  const bool i_from_head= (i >= e);
+  const bool k_from_head= (k >= e);
+
+  // Both indexes in the same part: plain numeric comparison decides.
+  if (i_from_head == k_from_head)
+    return i > k;
+
+  // Different parts: the answer is TRUE exactly when `k' is the index
+  // located at or past the head.
+  return k_from_head;
+}
+
+/**
+ The queue is processed from the head, item by item,
+ to purge items representing committed groups.
+ Progress of each Worker is monitored through @c last_done
+ and @c last_group_done_index: the index stored in @c last_done for
+ the Worker is compared first against the Worker's current
+ @c last_group_done_index, to break out of the loop at once if there
+ has been no progress.
+
+
+ The caller is expected to be the checkpoint handler.
+
+ A copy of the last discarded item containing
+ the refreshed value of the committed low-water-mark is stored
+ into @c lwm member for further caller's processing.
+
+ @param ws array that is indexed by the @c worker_id of a queue item to
+ fetch a Slave_worker pointer
+
+ @return number of discarded items
+*/
+ulong Slave_committed_queue::move_queue_head(DYNAMIC_ARRAY *ws)
{
+ ulong i, cnt= 0;
+ // Walk from the head; the loop also ends once de_queue() below has
+ // emptied the queue.
+ for (i= e; i != a && !empty();)
+ {
+ Slave_worker *w_i;
+ Slave_job_group g;
+ ulong l;
+ // g: copy of the queue item at index i
+ get_dynamic(&Q, (uchar *) &g, i);
+ // w_i: the Worker recorded in the item
+ get_dynamic(ws, (uchar *) &w_i, g.worker_id);
+ // l: the index stored for that Worker in last_done
+ get_dynamic(&last_done, (uchar *) &l, w_i->id);
+
+ DBUG_ASSERT(l <= s);
+
+ // NOTE(review): last_group_done_index is declared volatile and is read
+ // several times within one iteration; presumably the Worker may update
+ // it in between -- confirm that is acceptable here.
+ if (l == w_i->last_group_done_index)
+ break; /* no progress case */
+
+ DBUG_ASSERT(w_i->last_group_done_index >= i ||
+ (((i > a && e > a) || a == s) && (w_i->last_group_done_index < a)));
+
+ // The item is discarded when the Worker's last done index is i itself
+ // or is ordered after i according to gt().
+ if (w_i->last_group_done_index == i || gt(w_i->last_group_done_index, i))
+ {
+ // the discarded head item is copied into lwm
+ ulong ind= de_queue((uchar*) &lwm);
+
+ DBUG_ASSERT(ind == i);
+ DBUG_ASSERT(g.total_seqno == lwm.total_seqno);
+
+ // remember i as the last index processed for this Worker
+ set_dynamic(&last_done, (uchar*) &i, w_i->id);
+ }
+ else
+ break;
+ cnt++;
+ i= (i + 1) % s;
+ }
+
+ return cnt;
}
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-11-18 14:00:52 +0000
+++ b/sql/rpl_rli_pdb.h 2010-11-19 14:51:58 +0000
@@ -25,24 +25,68 @@ typedef struct slave_job_item
void *data;
} Slave_job_item;
+/**
+ The class defines a type of queue with a predefined max size that is
+ implemented using the circular memory buffer.
+ That is items of the queue are accessed as indexed elements of
+ the array buffer in a way that when the index value reaches
+ a max value it wraps around to point to the first buffer element.
+*/
class circular_buffer_queue
{
public:
DYNAMIC_ARRAY Q;
- ulong s;
- ulong a;
- ulong e;
+ ulong s; // the Size of the queue in terms of elements
+ ulong a; // first Available index to append at (next to tail); a == s means full
+ ulong e; // the head index; e == s means empty
volatile ulong len; // it is also queried to compute least occupied
circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
s(max), a(0), e(max), len(0)
{
+ DBUG_ASSERT(s < ULONG_MAX);
my_init_dynamic_array(&Q, el_size, s, alloc_inc);
- };
- circular_buffer_queue () {};
+ }
+ // NOTE(review): this constructor does not initialize Q (nor s, a, e, len)
+ // while the destructor below passes Q to delete_dynamic() -- confirm that
+ // objects built this way always get Q initialized before destruction.
+ circular_buffer_queue () {}
+ ~circular_buffer_queue () { delete_dynamic(&Q); }
+
+ /**
+ Content of the item being dequeued is copied to the arg-pointer
+ location.
+
+ @return the queue's array index that the de-queued item
+ was located at, or
+ an error encoded as a value beyond the legal index range.
+ */
+ ulong de_queue(uchar *);
+
+ /**
+ return the index where the arg item is located
+ or an error encoded as a value beyond the legal range
+ [0, circular_buffer_max_index].
+
+ Todo: define the range.
+ */
+ ulong en_queue(void *item);
+ /**
+ return the value of @c data member of the head of the queue.
+ */
+ void* head_queue();
+ bool gt(ulong i, ulong k); // comparison of ordering of two entities
+ bool empty() { return e == s; }
+ bool full() { return a == s; }
};
+typedef struct st_slave_job_group
+{
+ //struct event_coordinates coord;
+ my_off_t pos; // filename in Slave_committed_queue::current_binlog[]
+
+ ulong worker_id;
+ ulonglong total_seqno;
+} Slave_job_group;
+
/**
Group Assigned Queue whose first element identifies first gap
in committed sequence. The head of the queue is therefore next to
@@ -51,22 +95,43 @@ public:
class Slave_committed_queue : public circular_buffer_queue
{
public:
- // Allocation of file_name that is common for all Slave_assigned_job_group:s
+
+ /* Allocation of file_name that is common for all Slave_assigned_job_group:s */
char current_binlog[FN_REFLEN];
- void update_current_binlog(const char *post_rotate); //master's Rotate exec it
- Slave_committed_queue (const char *log, uint el_size, ulong max, uint inc= 0)
+
+ /* master's Rot-ev exec */
+ void update_current_binlog(const char *post_rotate);
+
+ /*
+ The last checkpoint time Low-Water-Mark
+ */
+ Slave_job_group lwm;
+
+ /* last time processed indexes for each worker */
+ DYNAMIC_ARRAY last_done;
+
+ /* the being assigned group index in GAQ */
+ ulong assigned_group_index;
+
+ Slave_committed_queue (const char *log, uint el_size, ulong max, uint n,
+ uint inc= 0)
: circular_buffer_queue(el_size, max, inc)
{
+ uint k;
strmake(current_binlog, log, sizeof(current_binlog) - 1);
- };
-};
+ my_init_dynamic_array(&last_done, sizeof(s), n, 0);
+ for (k= 0; k < n; k++)
+ insert_dynamic(&last_done, (uchar*) &s); // empty for each Worker
+ }
+
+ ~Slave_committed_queue ()
+ {
+ delete_dynamic(&last_done);
+ }
-typedef struct st_slave_assigned_job_group
-{
- struct event_coordinates coord;
- ulong worker_id;
- ulonglong total_seqno;
-} Slave_assigned_job_group;
+ /* Checkpoint routine refreshes the queue */
+ ulong move_queue_head(DYNAMIC_ARRAY *ws);
+};
class Slave_jobs_queue : public circular_buffer_queue
{
@@ -90,9 +155,10 @@ public:
// @c last_group_done_index is for recovery, although can be viewed
// as statistics as well.
// C marks a T-event with the incremented group_cnt that is
- // an index in GAQ; W stores it at the event execution.
+ // an index in GAQ; W stores it at the event execution.
// C polls the value periodically to maintain an array
// of the indexes in order to progress on GAQ's lwm, see @c next_event().
+ // see @c Log_event::mts_group_cnt.
volatile ulong last_group_done_index; // it's index in GAQ
List<Log_event> data_in_use; // events are still in use by SQL thread
@@ -126,7 +192,7 @@ public:
size_t get_number_worker_fields();
- void slave_worker_ends_group(); // CGEP walk through to upd APH
+ void slave_worker_ends_group(ulong); // CGEP walk through to upd APH
private:
bool read_info(Rpl_info_handler *from);
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-11-18 14:50:54 +0000
+++ b/sql/rpl_slave.cc 2010-11-19 14:51:58 +0000
@@ -2676,6 +2676,15 @@ int apply_event_and_update_pos(Log_event
// if (!rli->is_parallel_exec() || ev->only_serial_exec())
error= ev->update_pos(rli);
+#if 1
+
+ // experimental checkpoint per each scheduling attempt
+ // logics of next_event()
+
+ rli->gaq->move_queue_head(&rli->workers);
+
+#endif
+
#ifndef DBUG_OFF
DBUG_PRINT("info", ("update_pos error = %d", error));
if (!rli->belongs_to_client())
@@ -2852,7 +2861,7 @@ static int exec_relay_log_event(THD* thd
if (thd->variables.binlog_rows_query_log_events)
handle_rows_query_log_event(ev, rli);
- if (!rli->is_parallel_exec() &&
+ if (!rli->is_parallel_exec() && !ev->only_serial_exec() &&
ev->get_type_code() != ROWS_QUERY_LOG_EVENT) // mts todo: check this case
{
@@ -3648,7 +3657,8 @@ int slave_start_workers(Relay_log_info *
uint i;
int error= 0;
- // TODO: CGEP dynarray holds id:s of partitions of the Current being executed Group
+ // CGAP dynarray holds id:s of partitions of the Current being executed Group
+ my_init_dynamic_array(&rli->curr_group_assigned_parts, NAME_LEN, SLAVE_INIT_DBS_IN_GROUP, 1);
// GAQ queue holds seqno:s of scheduled groups. C polls workers in
// @c lwm_checkpoint_period to update GAQ (see @c @next_event())
@@ -3657,8 +3667,9 @@ int slave_start_workers(Relay_log_info *
// ::slave_max_pending_jobs is the worst case when all jobs contain
// one event and map to one worker.
rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(),
- sizeof(Slave_assigned_job_group),
- ::slave_max_pending_jobs);
+ sizeof(Slave_job_group),
+ ::slave_max_pending_jobs, n);
+ rli->mts_total_groups= 0;
for (i= 0; i < n; i++)
{
uint k;
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101119145158-e984dthkb1ussm1b.bundle
| Thread |
|---|
| • bzr commit into mysql-next-mr.crash-safe branch (andrei.elkin:3213) WL#5569 | Andrei Elkin | 19 Nov |