List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:November 19 2010 2:53pm
Subject:bzr push into mysql-next-mr.crash-safe branch (andrei.elkin:3212 to 3213)
WL#5569
View as plain text  
 3213 Andrei Elkin	2010-11-19
      wl#5569 
      
      recovery interfaces for wl#5599 implementation.
      
      The essence of this patch is to provide GAQ object implimentation
      and valid life cycle. 
      The checkpoint handler prior to call store methods of wl#5599 is supposed
      to invoke rli->gaq->move_queue_head(&rli->workers).
      
      See a simulation of that near ev->update_pos() of the mail sql thread loop.
      The checkpoint info is composed as instance of Slave_job_group to reside
      as rli->gap->lwm.
      
      
      Todo: uncomment 
      +  // delete ev;  // after ev->update_pos() event is garbage
      once the real checkpoint has been done.
      
      Todo: the real implemention needs to take care of filing
      Slave_job_group::update_current_binlog as initially so at time of executing
      Rotate/FD methods.
      
      
      +  // experimental checkpoint per each scheduling attempt
      +  // logics of next_event()
      +
      +    rli->gaq->move_queue_head(&rli->workers);
     @ sql/log_event.cc
        Log_event::get_slave_worker_id() got shaped more to the final version with elements
        necessary to rli->gaq lify cycle.
     @ sql/log_event.h
        Log_event::mts_group_cnt is added as a part of GAQ index propagation path 
        from C to W.
     @ sql/rpl_rli.h
        Further extension to RLI necessary to the distribution hash function (APH).
     @ sql/rpl_rli_pdb.cc
        Implementing circular_buffer_queue::*queue and few other methods incl
        ulong Slave_committed_queue::move_queue_head()
        the main concern for checkpoint.
     @ sql/rpl_rli_pdb.h
        Extending classes with few new member definitions necessary for GAQ interface / checkpoint / recovery.
     @ sql/rpl_slave.cc
        Simulation of the lwm-checkpoint and changes due to rpl_rli_pdb classes extensions.

    modified:
      sql/log_event.cc
      sql/log_event.h
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
 3212 Andrei Elkin	2010-11-18
      wl#5569 wl#5599
      
      Recovery related. Prototyping the worker RLI instantiation, to be elaborated on.

    modified:
      sql/rpl_slave.cc
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-11-18 14:00:52 +0000
+++ b/sql/log_event.cc	2010-11-19 14:51:58 +0000
@@ -2201,13 +2201,14 @@ bool Log_event::contains_partition_info(
 
 Slave_worker *Log_event::get_slave_worker_id(Relay_log_info const *rli)
 {
+  Slave_worker *worker= NULL;
   /* checking properties and perform corresponding actions */
 
   // g
   if (contains_partition_info())
   {
     // a lot of things inside `get_slave_worker_id'
-    Slave_worker *worker= get_slave_worker(get_db(), rli->workers /*, W_c */);
+    worker= get_slave_worker(get_db(), rli->workers /*, W_c */);
     const_cast<Relay_log_info *>(rli)->last_assigned_worker= worker;
   }
 
@@ -2217,18 +2218,30 @@ Slave_worker *Log_event::get_slave_worke
     // insert {NULL, W_s} to yield a new Group_cnt indexed element in GAQ
     // Group_cnt= rli->gaq->e;
     // rli->gaq->en_queue({NULL, W_s}); 
+    Slave_job_group gaq_item=
+      {
+        //{NULL, log_pos},
+        log_pos,
+
+        worker->id,   // todo: -> NULL and implement set_dynamic in DA
+        
+        const_cast<Relay_log_info*>(rli)->mts_total_groups++};
+
+    rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &gaq_item);
+
+    DBUG_ASSERT(rli->gaq->assigned_group_index != (ulong) -1); // gaq must have room
+
     // B-event is appended to the Deferred Array associated with GCAP
+    // TODO: refine  contains_partition_info() to not include BEGIN
   }
 
   // T
   if (ends_group())
   {
     //  assert (\exists P_k . W_s \in CGAP) if P_k is present in ev
+    mts_group_cnt= rli->gaq->assigned_group_index;
 
-    // cleanup: CGAP := nil
-
-    //  ev->group_cnt := GAQ.Group_cnt, so that
-    //  at procesing *Worker*.WQ.last_group_cnt := ev->group_cnt
+    // *TODO* cleanup: CGAP := nil
   }
 
   // todo: p. the first p-event clears CGAP. Each p-event is appened to DA.
@@ -2288,7 +2301,6 @@ static void * head_queue(Slave_jobs_queu
    return a job item through a struct which point is supplied via argument.
 */
 static Slave_job_item * de_queue(Slave_jobs_queue *jobs, Slave_job_item *ret)
-
 {
   if (jobs->e == jobs->s)
   {
@@ -2483,10 +2495,7 @@ int slave_worker_exec_job(Slave_worker *
   }
   if (ev->ends_group())
   {
-    w->slave_worker_ends_group();
-
-    // TODO: GAQ related
-    // w->last_group_done_index = ev->group_cnt
+    w->slave_worker_ends_group(ev->mts_group_cnt);
   }
 
   error= ev->do_apply_event(rli);
@@ -2499,7 +2508,9 @@ int slave_worker_exec_job(Slave_worker *
           to remove w_rli w/a
   */
   ev->update_pos(w->w_rli);
-  delete ev;  // after ev->update_pos() event is garbage
+
+  // delete ev;  // after ev->update_pos() event is garbage
+
   mysql_mutex_unlock(&w->jobs_lock);
 
   /* statistics */

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2010-11-18 14:00:52 +0000
+++ b/sql/log_event.h	2010-11-19 14:51:58 +0000
@@ -992,6 +992,13 @@ public:
   */
   ulong slave_exec_mode;
 
+  /**
+    Index in @c rli->gaq array to indicate a group that this event is purging.
+    The index is set by C:r to a group terminator event is checked by W at 
+    the event execution. The indexed data represent the Worker progress status.
+  */
+  ulong mts_group_cnt;
+
 #ifdef MYSQL_SERVER
   THD* thd;
 

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-11-18 14:00:52 +0000
+++ b/sql/rpl_rli.h	2010-11-19 14:51:58 +0000
@@ -432,10 +432,13 @@ public:
   int   slave_pending_jobs_max;
   Slave_worker  *last_assigned_worker; // a hint to partitioning func for some events
   Slave_committed_queue *gaq;
+  DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
 
   Slave_worker* get_current_worker() const;
   Slave_worker* set_this_worker(Slave_worker *w) { return this_worker= w; }
   Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
+  ulonglong mts_total_groups; // total event groups distributed in current session
+  ulong least_occupied_worker; // ... todo: APH ...
 
   /**
     Helper function to do after statement completion.

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-11-18 14:00:52 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-11-19 14:51:58 +0000
@@ -331,7 +331,7 @@ Slave_worker *get_free_worker(DYNAMIC_AR
    @c get_slave_worker().
 
    Affected by the being committed group APH tuples are updated.
-
+   @c last_group_done_index member is set to the arg value.
    for each D_i in CGEP
        assert (W_id == APH.W_id|P_d == D_i)
        update APH set U-- where P_d = D_i
@@ -341,6 +341,166 @@ Slave_worker *get_free_worker(DYNAMIC_AR
     CGEP the Worker partition cache is cleaned up.
 */
 
-void Slave_worker::slave_worker_ends_group()
+void Slave_worker::slave_worker_ends_group(ulong gaq_idx)
+{
+  last_group_done_index = gaq_idx;
+}
+
+
+/**
+   Class circular_buffer_queue
+*/
+
+ulong circular_buffer_queue::de_queue(uchar *val)
+{
+  ulong ret;
+  if (e == s)
+  {
+    DBUG_ASSERT(len == 0);
+    return (ulong) -1;
+  }
+
+  ret= e;
+  get_dynamic(&Q, val, e);
+  len--;
+  
+  // pre boundary cond
+  if (a == s)
+    a= e;
+  e= (e + 1) % s;
+
+  // post boundary cond
+  if (a == e)
+    e= s;
+
+  DBUG_ASSERT(e == s ||
+              (len == (a >= e)? (a - e) :
+               (s + a - e)));
+
+  return ret;
+}
+
+ulong circular_buffer_queue::en_queue(void *item)
+{
+  ulong ret;
+  if (a == s)
+  {
+    DBUG_ASSERT(a == Q.elements);
+    return (ulong) -1;
+  }
+
+  // store
+
+  ret= a;
+  set_dynamic(&Q, (uchar*) item, ret= a);
+
+
+  // pre-boundary cond
+  if (e == s)
+    e= a;
+  
+  a= (a + 1) % s;
+  len++;
+
+  // post-boundary cond
+  if (a == e)
+    a= s;
+  DBUG_ASSERT(a == e || 
+              len == (a >= e) ?
+              (a - e) : (s + a - e));
+  return ret;
+}
+
+void* circular_buffer_queue::head_queue()
+{
+  uchar *ret= NULL;
+  if (e == s)
+  {
+    DBUG_ASSERT(len == 0);
+  }
+  else
+  {
+    get_dynamic(&Q, (uchar*) ret, e);
+  }
+  return (void*) ret;
+}
+
+/**
+   two index comparision.
+
+   @note   The caller makes sure the args are within the valid
+           range, incl cases the queue is empty or full.
+
+   @return TRUE  if the first arg identifies a queue entity ordered
+                 before one defined by the 2nd arg,
+           FALSE otherwise.
+*/
+bool circular_buffer_queue::gt(ulong i, ulong k)
+{
+  if (i >= e)
+    if (k >= e)
+      return i > k;
+    else
+      return FALSE;
+  else
+    if (k >= e)
+      return TRUE;
+    else
+      return i > k;
+}
+
+/**
+   The queue is processed from the head item by item
+   to purge items representing committed groups.
+   Progress of each Worker is monitored through @c last_done
+   and @c last_group_done_index.
+   It's compared first against the polled
+   to break out of the loop at once if no progress.
+
+
+   The caller is expected to be the checkpoint handler.
+
+   A copy of the last discarded item containing
+   the refreshed value of the committed low-water-mark is stored
+   into @c lwm member for further caller's processing.
+
+
+   @return number of discarded items
+*/
+ulong Slave_committed_queue::move_queue_head(DYNAMIC_ARRAY *ws)
 {
+  ulong i, cnt= 0;
+  for (i= e; i != a && !empty();)
+  {
+    Slave_worker *w_i;
+    Slave_job_group g;
+    ulong l;
+    get_dynamic(&Q, (uchar *) &g, i);
+    get_dynamic(ws, (uchar *) &w_i, g.worker_id);
+    get_dynamic(&last_done, (uchar *) &l, w_i->id);
+
+    DBUG_ASSERT(l <= s);
+
+    if (l == w_i->last_group_done_index)
+      break; /* no progress case */
+
+    DBUG_ASSERT(w_i->last_group_done_index >= i ||
+                (((i > a && e > a)  || a == s) && (w_i->last_group_done_index < a)));
+
+    if (w_i->last_group_done_index == i || gt(w_i->last_group_done_index, i))
+    {
+      ulong ind= de_queue((uchar*) &lwm);
+
+      DBUG_ASSERT(ind == i);
+      DBUG_ASSERT(g.total_seqno == lwm.total_seqno);
+
+      set_dynamic(&last_done, (uchar*) &i, w_i->id);
+    }
+    else
+      break;
+    cnt++;
+    i= (i + 1) % s;
+  }
+
+  return cnt;
 }

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-11-18 14:00:52 +0000
+++ b/sql/rpl_rli_pdb.h	2010-11-19 14:51:58 +0000
@@ -25,24 +25,68 @@ typedef struct slave_job_item
   void *data;
 } Slave_job_item;
 
+/**
+   The class defines a type of queue with a predefined max size that is
+   implemented using the circular memory buffer.
+   That is items of the queue are accessed as indexed elements of
+   the array buffer in a way that when the index value reaches
+   a max value it wraps around to point to the first buffer element.
+*/
 class circular_buffer_queue
 {
 public:
 
   DYNAMIC_ARRAY Q;
-  ulong s;
-  ulong a;
-  ulong e;
+  ulong s;              // the Size of the queue in terms of element
+  ulong a;              // first Available index to append at (next to tail)
+  ulong e;              // the head index
   volatile ulong len;   // it is also queried to compute least occupied
 
   circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
     s(max), a(0), e(max), len(0)
   {
+    DBUG_ASSERT(s < ULONG_MAX);
     my_init_dynamic_array(&Q, el_size, s, alloc_inc);
-  };
-  circular_buffer_queue () {};
+  }
+  circular_buffer_queue () {}
+  ~circular_buffer_queue () { delete_dynamic(&Q); }
+
+   /**
+      Content of the being dequeued item is copied to the arg-pointer
+      location.
+      
+      @return the queue's array index that the de-queued item
+      locates at, or
+      an error encoded in beyond the index legacy range.
+   */
+  ulong de_queue(uchar *);
+
+  /**
+    return the index where the arg item locates
+           or an error encoded as a value in beyond of the legacy range
+           [0, circular_buffer_max_index].
+           
+           Todo: define the range.
+  */
+  ulong en_queue(void *item);
+  /**
+     return the value of @c data member of the head of the queue.
+  */
+  void* head_queue();
+  bool   gt(ulong i, ulong k); // comparision of ordering of two entities
+  bool empty() { return e == s; }
+  bool full() { return a == s; }
 };
 
+typedef struct st_slave_job_group
+{
+  //struct event_coordinates coord;
+  my_off_t  pos;  // filename in Slave_committed_queue::current_binlog[]
+
+  ulong worker_id;
+  ulonglong total_seqno;
+} Slave_job_group;
+
 /**
   Group Assigned Queue whose first element identifies first gap
   in committed sequence. The head of the queue is therefore next to 
@@ -51,22 +95,43 @@ public:
 class Slave_committed_queue : public circular_buffer_queue
 {
 public:
-  // Allocation of file_name that is common for all Slave_assigned_job_group:s
+
+  /* Allocation of file_name that is common for all Slave_assigned_job_group:s */
   char current_binlog[FN_REFLEN];
-  void update_current_binlog(const char *post_rotate); //master's Rotate exec it 
-  Slave_committed_queue (const char *log, uint el_size, ulong max, uint inc= 0)
+
+  /* master's Rot-ev exec */
+  void update_current_binlog(const char *post_rotate);
+
+  /*
+     The last checkpoint time Low-Water-Mark
+  */
+  Slave_job_group lwm;
+  
+  /* last time processed indexes for each worker */
+  DYNAMIC_ARRAY last_done;
+
+  /* the being assigned group index in GAQ */
+  ulong assigned_group_index;
+
+  Slave_committed_queue (const char *log, uint el_size, ulong max, uint n,
+                         uint inc= 0)
     : circular_buffer_queue(el_size, max, inc)
   {
+    uint k;
     strmake(current_binlog, log, sizeof(current_binlog) - 1);
-  };
-};
+    my_init_dynamic_array(&last_done, sizeof(s), n, 0);
+    for (k= 0; k < n; k++)
+      insert_dynamic(&last_done, (uchar*) &s);  // empty for each Worker
+  }
+
+  ~Slave_committed_queue ()
+  { 
+    delete_dynamic(&last_done);
+  }
 
-typedef struct st_slave_assigned_job_group
-{
-  struct event_coordinates coord;
-  ulong worker_id;
-  ulonglong total_seqno;
-} Slave_assigned_job_group;
+  /* Checkpoint routine refreshes the queue */
+  ulong move_queue_head(DYNAMIC_ARRAY *ws);
+};
 
 class Slave_jobs_queue : public circular_buffer_queue
 {
@@ -90,9 +155,10 @@ public:
   // @c last_group_done_index is for recovery, although can be viewed
   //    as statistics as well.
   // C marks a T-event with the incremented group_cnt that is
-  // an index in GAQ; W stores it  at the event execution. 
+  // an index in GAQ; W stores it at the event execution. 
   // C polls the value periodically to maintain an array
   // of the indexes in order to progress on GAQ's lwm, see @c next_event().
+  // see @c Log_event::group_cnt.
   volatile ulong last_group_done_index; // it's index in GAQ
 
   List<Log_event> data_in_use; // events are still in use by SQL thread
@@ -126,7 +192,7 @@ public:
 
   size_t get_number_worker_fields();
 
-  void slave_worker_ends_group();  // CGEP walk through to upd APH
+  void slave_worker_ends_group(ulong);  // CGEP walk through to upd APH
 
 private:
   bool read_info(Rpl_info_handler *from);

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-11-18 14:50:54 +0000
+++ b/sql/rpl_slave.cc	2010-11-19 14:51:58 +0000
@@ -2676,6 +2676,15 @@ int apply_event_and_update_pos(Log_event
 
 //      if (!rli->is_parallel_exec() || ev->only_serial_exec())
            error= ev->update_pos(rli);
+#if 1
+
+  // experimental checkpoint per each scheduling attempt
+  // logics of next_event()
+
+    rli->gaq->move_queue_head(&rli->workers);
+
+#endif
+
 #ifndef DBUG_OFF
     DBUG_PRINT("info", ("update_pos error = %d", error));
     if (!rli->belongs_to_client())
@@ -2852,7 +2861,7 @@ static int exec_relay_log_event(THD* thd
       if (thd->variables.binlog_rows_query_log_events)
         handle_rows_query_log_event(ev, rli);
 
-      if (!rli->is_parallel_exec() &&
+      if (!rli->is_parallel_exec() && !ev->only_serial_exec() &&
           ev->get_type_code() != ROWS_QUERY_LOG_EVENT)  // mts todo: check this case
       {
 
@@ -3648,7 +3657,8 @@ int slave_start_workers(Relay_log_info *
   uint i;
   int error= 0;
 
-  // TODO: CGEP dynarray holds id:s of partitions of the Current being executed Group
+  // CGAP dynarray holds id:s of partitions of the Current being executed Group
+  my_init_dynamic_array(&rli->curr_group_assigned_parts, NAME_LEN, SLAVE_INIT_DBS_IN_GROUP, 1);
 
   // GAQ  queue holds seqno:s of scheduled groups. C polls workers in 
   //      @c lwm_checkpoint_period to update GAQ (see @c @next_event())
@@ -3657,8 +3667,9 @@ int slave_start_workers(Relay_log_info *
   // ::slave_max_pending_jobs is the worst case when all jobs contain
   // one event and map to one worker.
   rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(),
-                                      sizeof(Slave_assigned_job_group),
-                                      ::slave_max_pending_jobs);
+                                      sizeof(Slave_job_group),
+                                      ::slave_max_pending_jobs, n);
+  rli->mts_total_groups= 0;
   for (i= 0; i < n; i++)
   {
     uint k;


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101119145158-e984dthkb1ussm1b.bundle
Thread
bzr push into mysql-next-mr.crash-safe branch (andrei.elkin:3212 to 3213)WL#5569Andrei Elkin19 Nov