List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:November 27 2010 3:37pm
Subject:bzr push into mysql-next-mr.crash-safe branch (andrei.elkin:3219 to 3220)
WL#5569
View as plain text  
 3220 Andrei Elkin	2010-11-27
      wl#5569
      
      Providing relay-log name for wl#5599.
      Protocol of action on the C and W sides is described in rpl_rli_pdb.h.
      
      Erroring out in case of parallel exec and ROWS_QUERY_LOG_EVENT.
      
      (todo: the native sequential mode for the event needs some revision; in particular,
      `delete ev' shall *always* happen in rli->cleanup_context, not in two places as it does currently).
     @ sql/log_event.cc
        Erroring out in case of parallel exec and ROWS_QUERY_LOG_EVENT;
        Deploying C role of handling relay-log name change;
     @ sql/rpl_rli_pdb.cc
        Providing relay-log name for wl#5599.
        Freeing allocated memory for relay-log name at the end of the group execution by Worker.
     @ sql/rpl_rli_pdb.h
        Protocol of action on the C and W sides is here.
        Removing current_binlog;
        Adding a pointer group_relay_log_name member to st_slave_job_group.

    modified:
      sql/log_event.cc
      sql/log_event.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
 3219 Andrei Elkin	2010-11-26
      wl#5569 MTS
      
      Partitioning conflict detection and handling is implemented.
      A new option allows running Query events in parallel, though incompatibly with the Rows-event
      case, in that the default db rather than the actual db:s is used as the partition key.
      
      User interface gained the global var and the cmd line opt:
      
      slave_run_query_in_parallel (Welcome to the set! :-)
     @ mysql-test/suite/rpl/r/rpl_parallel_conflicts.result
        A new test result file is added.
     @ mysql-test/suite/rpl/t/rpl_parallel_conflicts.test
        A basic initial test of partitioning conflict detection and handling is added.
     @ sql/log_event.cc
        Refining parallel vs sequential decider to cover optional support for Query parallelization.
     @ sql/log_event.h
        Refining only_serial_exec() with providing hints through two new args.
     @ sql/mysqld.cc
        Related to the new limited Query parallelization support.
     @ sql/mysqld.h
        Related to the new limited Query parallelization support.
     @ sql/rpl_rli.h
        Changes are due to the new limited Query parallelization support.
     @ sql/rpl_rli_pdb.cc
        Conflict detection, waiting, partition release is implemented.

    added:
      mysql-test/suite/rpl/r/rpl_parallel_conflicts.result
      mysql-test/suite/rpl/t/rpl_parallel_conflicts.test
    modified:
      sql/log_event.cc
      sql/log_event.h
      sql/mysqld.cc
      sql/mysqld.h
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/sys_vars.cc
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-11-26 21:08:30 +0000
+++ b/sql/log_event.cc	2010-11-27 15:36:50 +0000
@@ -2181,7 +2181,7 @@ bool Log_event::contains_partition_info(
    r - a mini-group internal "regular" event that follows its g-parent
       (Write, Update, Delete -rows)
    S - sequentially applied event (may not be a part of any group).
-       Events of this type are determined via @c only_serial_exec()
+       Events of this type are determined via @c only_sequential_exec()
        earlier and don't cause calling this method .
    T - terminator of the group (XID, COMMIT, ROLLBACK)
 
@@ -2215,6 +2215,7 @@ Slave_worker *Log_event::get_slave_worke
     // rli->gaq->en_queue({NULL, W_s}); 
     g= {
       log_pos,
+      NULL,
       (ulong) -1,
       const_cast<Relay_log_info*>(rli)->mts_total_groups++
     };
@@ -2280,12 +2281,35 @@ Slave_worker *Log_event::get_slave_worke
   if (ends_group() || !rli->curr_group_seen_begin)
   {
     uint i;
-    //  assert (\exists P_k . W_s \in CGAP) if P_k is present in ev
     mts_group_cnt= rli->gaq->assigned_group_index;
-    
+
+    if (!worker->relay_log_change_notified)
+    {
+      /*
+        Prior this event, C rotated the relay log to drop each
+        Worker's notified flag.
+        Now group terminating event initiates the new name
+        delivery through the current group relaylog slot in GAQ.
+      */
+
+      Slave_job_group *ptr_g=
+        (Slave_job_group *)
+        dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
+
+      DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
+
+      ptr_g->group_relay_log_name= (char *)
+        my_malloc(strlen(const_cast<Relay_log_info*>(rli)->get_group_relay_log_name()) + 1, MYF(MY_WME));
+      strcpy(ptr_g->group_relay_log_name, const_cast<Relay_log_info*>(rli)->get_group_relay_log_name());
+      worker->relay_log_change_notified= TRUE;
+    }
+
     DBUG_ASSERT(worker == rli->last_assigned_worker);
+
     if (!worker)
     {
+      DBUG_ASSERT(0); 
+
       // a very special case of the empty group: {B, T}
       DBUG_ASSERT(rli->curr_group_assigned_parts.elements == 0
                   && rli->curr_group_da.elements == 1);
@@ -2450,7 +2474,7 @@ int Log_event::apply_event(Relay_log_inf
   bool parallel;
 
   if (!(parallel= rli->is_parallel_exec()) ||
-      only_serial_exec(rli->run_query_in_parallel, rli->curr_group_seen_begin))
+      only_sequential_exec(rli->run_query_in_parallel, rli->curr_group_seen_begin))
   {
     if (parallel)
     {
@@ -2478,15 +2502,18 @@ int Log_event::apply_event(Relay_log_inf
 
       DBUG_ASSERT(!rli->curr_group_seen_begin);
       c_rli->curr_group_is_parallel= FALSE;   // Coord will destruct all the rest of events
-
-      (void) wait_for_workers_to_finish(rli);
+      if (!parallel_exec_by_coordinator(::server_id))
+        (void) wait_for_workers_to_finish(rli);
     }
     DBUG_RETURN(do_apply_event(rli));
   }
 
-  // !!! TODO: suppress
-  // if (get_type_code() == ROWS_QUERY_LOG_EVENT)
-    
+  if (get_type_code() == ROWS_QUERY_LOG_EVENT)
+  {      
+    rli->report(ERROR_LEVEL, 0,
+                "No parallel support for ROWS_QUERY_LOG_EVENT");
+    DBUG_RETURN(1);
+  }
 
   if ((!(w= get_slave_worker_id(rli)) ||
        DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)))

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2010-11-26 21:08:30 +0000
+++ b/sql/log_event.h	2010-11-27 15:36:50 +0000
@@ -1155,9 +1155,10 @@ public:
   /**
      MST: to execute serially due to technical or conceptual limitation
 
-     @return TRUE for all but {Query,Rand,User_var,Intvar,Rows}_log_event
+     @return TRUE if despite permanent parallel execution mode an event
+                  needs applying in a real isolation that is sequentially.
   */
-  bool only_serial_exec(bool query_in_parallel, bool group_term_in_parallel)
+  bool only_sequential_exec(bool query_in_parallel, bool group_term_in_parallel)
   {
     return
        /* 
@@ -1194,7 +1195,20 @@ public:
       get_type_code() == PRE_GA_DELETE_ROWS_EVENT||
       get_type_code() == INCIDENT_EVENT;
   }
-  
+
+  /**
+     MST: some events can be applied by Coordinator concurrently with Workers.
+
+     @return TRUE  if that's the case,
+             FALSE otherwise.
+  */
+  bool parallel_exec_by_coordinator(ulong slave_server_id)
+  {
+    return
+      get_type_code() == FORMAT_DESCRIPTION_EVENT &&
+      (server_id == (uint32) ::server_id);
+  }
+
   /**
      Events of a cetain type carry partitioning data such as db names.
   */

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-11-26 21:08:30 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-11-27 15:36:50 +0000
@@ -448,7 +448,23 @@ void Slave_worker::slave_worker_ends_gro
   uint i;
 
   if (!error)
+  {
+    Slave_job_group *ptr_g=
+      (Slave_job_group *)
+      dynamic_array_ptr(&c_rli->gaq->Q, c_rli->gaq->assigned_group_index);
+    if (ptr_g->group_relay_log_name != NULL)
+    {
+      // memorizing a new relay-log file name
+
+      DBUG_ASSERT(strlen(ptr_g->group_relay_log_name) + 1
+                  <= sizeof(group_relay_log_name));
+
+      strcpy(group_relay_log_name, ptr_g->group_relay_log_name);
+      delete ptr_g->group_relay_log_name;  // C allocated
+      ptr_g->group_relay_log_name= NULL;   // mark freed
+    }
     last_group_done_index = gaq_idx;
+  }
 
   for (i= curr_group_exec_parts.elements; i > 0; i--)
   {

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-11-26 21:08:30 +0000
+++ b/sql/rpl_rli_pdb.h	2010-11-27 15:36:50 +0000
@@ -91,9 +91,19 @@ public:
 
 typedef struct st_slave_job_group
 {
-  //struct event_coordinates coord;
-  my_off_t  pos;  // filename in Slave_committed_queue::current_binlog[]
+  my_off_t master_log_pos; 
 
+  /* 
+     When RL name changes C allocates and fill in a new name of RL,
+     otherwise it fills in NULL.
+     C keeps track of each Worker has been notified on the updating
+     to make sure the routine runs once per change.
+
+     W checks the value at commit and memorizes a not-NULL
+     with prior freeing old one's allocation. The memorized value
+     plays its role at commit until a new has arrived.
+  */
+  char     *group_relay_log_name;
   ulong worker_id;
   ulonglong total_seqno;
 } Slave_job_group;
@@ -107,9 +117,6 @@ class Slave_committed_queue : public cir
 {
 public:
 
-  /* Allocation of file_name that is common for all Slave_assigned_job_group:s */
-  char current_binlog[FN_REFLEN];
-
   /* master's Rot-ev exec */
   void update_current_binlog(const char *post_rotate);
 
@@ -129,7 +136,6 @@ public:
     : circular_buffer_queue(el_size, max, inc)
   {
     uint k;
-    strmake(current_binlog, log, sizeof(current_binlog) - 1);
     my_init_dynamic_array(&last_done, sizeof(s), n, 0);
     for (k= 0; k < n; k++)
       insert_dynamic(&last_done, (uchar*) &s);  // empty for each Worker
@@ -188,6 +194,7 @@ public:
   ulong trans_jobs;  // how many jobs per trns
   volatile int curr_jobs; // the current assignments
   ulong usage_partition; // number of different partitions handled by this worker
+  volatile bool relay_log_change_notified; // Coord sets and resets, W can read
 
   /*
     We need to make this a dynamic field. /Alfranio

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-11-26 21:08:30 +0000
+++ b/sql/rpl_slave.cc	2010-11-27 15:36:50 +0000
@@ -2677,7 +2677,7 @@ int apply_event_and_update_pos(Log_event
 //      It was served so inside apply_event() above.
 //      The main RLI table is safe to update now.
 
-//      if (!rli->is_parallel_exec() || ev->only_serial_exec())
+//      if (!rli->is_parallel_exec() || ev->only_sequential_exec())
            error= ev->update_pos(rli);
 #if 1
 
@@ -2879,7 +2879,7 @@ static int exec_relay_log_event(THD* thd
         handle_rows_query_log_event(ev, rli);
 
       if ((!rli->is_parallel_exec() ||
-           ev->only_serial_exec(rli->run_query_in_parallel, rli->curr_group_is_parallel))
+           ev->only_sequential_exec(rli->run_query_in_parallel, rli->curr_group_is_parallel))
           && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)  // mts TODO: check this case
       {
 
@@ -3603,7 +3603,7 @@ err:
 
   my_thread_end();
   pthread_exit(0);
-  DBUG_RETURN(0);        
+  DBUG_RETURN(0); 
 }
 
 /**
@@ -3632,6 +3632,8 @@ int slave_start_single_worker(Relay_log_
   ulong key_worker_idx[]= { server_id, w->id };
   w->init_info(key_worker_idx, NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER);
 
+  w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
+
   // ALFRANIO --> The recovery procedure must be introduced here.
 
   w->w_rli->workers= rli->workers; // shallow copying is sufficient
@@ -5352,6 +5354,13 @@ static Log_event* next_event(Relay_log_i
         rli->flush_info(key_info_idx, key_info_size);
       }
 
+      /* Reset the relay-log-change-notified status of  Slave Workers */
+      for (uint i; i < rli->workers.elements; i++)
+      {
+        Slave_worker *w= (Slave_worker *) dynamic_array_ptr(&rli->workers, i);
+        w->relay_log_change_notified= FALSE;
+      }
+
       /*
         Now we want to open this next log. To know if it's a hot log (the one
         being written by the I/O thread now) or a cold log, we can use


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101127153650-tizvu3c0ovmoj1g5.bundle
Thread
bzr push into mysql-next-mr.crash-safe branch (andrei.elkin:3219 to 3220)WL#5569Andrei Elkin27 Nov