List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:May 28 2011 10:42am
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3285) WL#5569
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3285 Andrei Elkin	2011-05-28
      wl#5569 MTS
      
      This patch contains cleanup and simplification of logics of handling some events 
      sequentially by Coordinator and adds memory-allocation failure branch 
      to workers starting routine.
     @ sql/log_event.cc
        Cleanup and simplification of logics of handling some events sequentially
        by Coordinator.
        An event is marked as parallel or sequential through C's rli that affects
        commit to info table by C as well as the event's destruction.
     @ sql/rpl_rli.h
        simplified (curr_group_is_parallel + curr_group_split) into curr_*event*_is_parallel.
     @ sql/rpl_rli_pdb.h
        Adding GAQ memory-allocation failure notification.
     @ sql/rpl_slave.cc
        simplified (curr_group_is_parallel + curr_group_split) into curr_event_is_parallel;
        GAQ memory-allocation failure branch is added to workers starting routine.

    modified:
      sql/log_event.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-05-27 21:29:14 +0000
+++ b/sql/log_event.cc	2011-05-28 10:42:40 +0000
@@ -2368,6 +2368,7 @@ bool Log_event::contains_partition_info(
 /**
    General hashing function to compute the id of an applier for
    the current event.
+   The event is marked as belonging to a Worker.
    At computing the id few rules apply depending on partitioning properties
    that the event instance can feature.
 
@@ -2405,6 +2406,8 @@ Slave_worker *Log_event::get_slave_worke
   Slave_job_group g, *ptr_g;
   bool is_b_event;
 
+  rli->curr_event_is_parallel= TRUE; // event belongs to a Worker
+  
   /* checking partioning properties and perform corresponding actions */
 
   // Beginning of a group designated explicitly with BEGIN
@@ -2435,8 +2438,6 @@ Slave_worker *Log_event::get_slave_worke
 
     // the last occupied GAQ's array index
     gaq_idx= rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
-    // serves as a mark for Coord to delete events otherwise
-    rli->curr_group_is_parallel= TRUE;
     
     DBUG_ASSERT(gaq_idx != (ulong) -1 && gaq_idx < rli->gaq->s);
     DBUG_ASSERT(((Slave_job_group *) 
@@ -2519,14 +2520,6 @@ Slave_worker *Log_event::get_slave_worke
 
     DBUG_ASSERT(i == num_dbs || num_dbs == OVER_MAX_DBS_IN_EVENT_MTS);
 
-    /* 
-       Either old master binlog (todo: assert), or a specific "corner"
-       case (todo: wrap with BEGIN/COMMIT on master anyway) of logging 
-       like SELECT sf(), where sf() has a side effect.
-    */
-    if (!rli->curr_group_seen_begin)
-      rli->curr_group_is_parallel= TRUE;
-
     // TODO: convert to C's private mem_root.
 
     // Releasing the Coord's mem-root from the updated dbs. It's safe to do at this
@@ -2579,12 +2572,13 @@ Slave_worker *Log_event::get_slave_worke
   if (ends_group() || !rli->curr_group_seen_begin)
   {
     uint i;
+
+    // index of GAQ that this terminal event belongs to
     mts_group_cnt= rli->gaq->assigned_group_index;
+
     ptr_g= (Slave_job_group *)
       dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
 
-    DBUG_ASSERT(rli->curr_group_is_parallel);
-
     // TODO: throw an error when relay-log reading starts from inside of a group!!
 
     if (!worker->relay_log_change_notified)
@@ -2855,6 +2849,7 @@ int Log_event::apply_event(Relay_log_inf
   {
     if (parallel)
     {
+      c_rli->curr_event_is_parallel= FALSE; // mark event as belonging to Coordinator
       /* 
          There are two classes of events that Coordinator executes
          itself. One e.g the master Rotate requires all Workers to finish up 
@@ -2893,10 +2888,8 @@ int Log_event::apply_event(Relay_log_inf
           DBUG_RETURN(-1);
         }
         /*
-          Marking sure the event won't be executed in parallel.
-          That affects memory deallocation in the following execution path.
+          Marking sure the event will be executed in sequential mode.
         */
-        c_rli->curr_group_is_parallel= FALSE;
         (void) wait_for_workers_to_finish(rli);
 
 #ifndef DBUG_OFF
@@ -2910,19 +2903,6 @@ int Log_event::apply_event(Relay_log_inf
         }
 #endif
       }
-      else
-      {
-        if (rli->curr_group_is_parallel)
-        {
-          /* 
-             the event is artifical to splits the current group into separate
-             relay-logs. Differently to the previous events of the group this one
-             is applied by Coordinator and w/o any synchronization with Workers.
-          */
-          c_rli->curr_group_split= TRUE;
-          c_rli->curr_group_is_parallel= FALSE;
-        }
-      }
     }
     DBUG_RETURN(do_apply_event(rli));
   }

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2011-05-25 16:02:13 +0000
+++ b/sql/rpl_rli.h	2011-05-28 10:42:40 +0000
@@ -455,8 +455,7 @@ public:
   Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
   ulonglong mts_total_groups; // total event groups distributed in current session
  
-  bool curr_group_is_parallel; // an event to process by Coordinator
-  bool curr_group_split;       // an event split the current group forcing C to exec it
+  bool curr_event_is_parallel; // if FALSE the current event is processed by C
   ulong opt_slave_parallel_workers; // auxiliary cache for ::opt_slave_parallel_workers
   ulong slave_parallel_workers;     // the one slave session time number of workers
   ulong recovery_parallel_workers; // number of workers while recovering.

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2011-05-27 21:29:14 +0000
+++ b/sql/rpl_rli_pdb.h	2011-05-28 10:42:40 +0000
@@ -166,6 +166,8 @@ typedef struct st_slave_job_group
 class Slave_committed_queue : public circular_buffer_queue
 {
 public:
+  
+  bool inited;
 
   /* master's Rot-ev exec */
   void update_current_binlog(const char *post_rotate);
@@ -183,10 +185,15 @@ public:
 
   Slave_committed_queue (const char *log, uint el_size, ulong max, uint n,
                          uint inc= 0)
-    : circular_buffer_queue(el_size, max, inc)
+    : circular_buffer_queue(el_size, max, inc), inited(FALSE)
   {
     uint k;
     ulonglong l= 0;
+    
+    if (max >= (ulong) -1 || !circular_buffer_queue::inited_queue)
+      return;
+    else
+      inited= TRUE;
     my_init_dynamic_array(&last_done, sizeof(lwm.total_seqno), n, 0);
     for (k= 0; k < n; k++)
       insert_dynamic(&last_done, (uchar*) &l);  // empty for each Worker
@@ -196,8 +203,11 @@ public:
 
   ~Slave_committed_queue ()
   { 
-    delete_dynamic(&last_done);
-    my_free(lwm.group_relay_log_name);
+    if (inited)
+    {
+      delete_dynamic(&last_done);
+      my_free(lwm.group_relay_log_name);
+    }
   }
 
   /* Checkpoint routine refreshes the queue */

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2011-05-27 21:29:14 +0000
+++ b/sql/rpl_slave.cc	2011-05-28 10:42:40 +0000
@@ -2842,7 +2842,7 @@ int apply_event_and_update_pos(Log_event
       See sql/rpl_rli.h for further details.
     */
     int error= 0;
-    if (skip_event || !rli->is_parallel_exec() || !rli->curr_group_is_parallel)
+    if (skip_event || !rli->is_parallel_exec() || !rli->curr_event_is_parallel)
     {
 #ifndef DBUG_OFF
       /*
@@ -3034,17 +3034,10 @@ static int exec_relay_log_event(THD* thd
 
     exec_res= apply_event_and_update_pos(ev, thd, rli);
 
-    if ((!rli->is_parallel_exec() || !rli->curr_group_is_parallel))
+    if ((!rli->is_parallel_exec() || !rli->curr_event_is_parallel))
     {
-      DBUG_ASSERT(!rli->is_parallel_exec() || !rli->curr_group_is_parallel ||
+      DBUG_ASSERT(!rli->is_parallel_exec() || !rli->curr_event_is_parallel ||
                   ev->shall_skip(rli) != Log_event::EVENT_SKIP_NOT);
-      
-      if (rli->curr_group_split)
-      {
-        // the current group split status is reset
-        rli->curr_group_is_parallel= TRUE;
-        rli->curr_group_split= FALSE;
-      }
       /*
         Format_description_log_event should not be deleted because it will be
         used to read info about the relay log's format; it will be deleted when
@@ -4171,6 +4164,9 @@ int slave_start_workers(Relay_log_info *
                                       sizeof(Slave_job_group),
                                       1 + rli->opt_slave_parallel_workers *
                                       rli->mts_slave_worker_queue_len_max, n);
+  if (!rli->gaq->inited)
+    return 1;
+
   rli->mts_pending_jobs_size= 0;
   rli->mts_pending_jobs_size_max= ::opt_mts_pending_jobs_size_max;
   rli->mts_wqs_underrun_w_id= (ulong) -1;
@@ -4180,9 +4176,8 @@ int slave_start_workers(Relay_log_info *
   rli->mts_worker_underrun_level= ::opt_mts_worker_underrun_level;
   rli->mts_total_groups= 0;
   rli->curr_group_seen_begin= FALSE;
-  rli->curr_group_is_parallel= FALSE;
+  rli->curr_event_is_parallel= FALSE;
   rli->curr_group_isolated= FALSE;
-  rli->curr_group_split= FALSE;
   rli->checkpoint_seqno= 0;
   /*
     dyn memory to consume by Coordinator per event
@@ -4343,6 +4338,8 @@ pthread_handler_t handle_slave_sql(void 
   if (slave_start_workers(rli, rli->opt_slave_parallel_workers) != 0)
   {
       mysql_mutex_unlock(&rli->run_lock);
+      rli->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, 
+                  "Failed during slave workers initialization");
       goto err;
   }
   if (init_slave_thread(thd, SLAVE_THD_SQL))


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110528104240-ofiubd6s1y8b9m6e.bundle
Thread
bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3285) WL#5569Andrei Elkin31 May