List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:December 9 2010 5:45pm
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3237) WL#5569
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:alfranio.correia@stripped

 3237 Andrei Elkin	2010-12-09
      wl#5569 MTS
      
      Integration with wl#5599 recovery for MTS and fixing two asserts.
      One is due to missed cleanup of errored-out rows-events;
      the other is a work-around for
        w->curr_group_exec_parts->dynamic_ids
      being initialized with one partition on Worker startup, which it should not be.
     @ sql/log_event.cc
        Propagating CP related info from C to W.
     @ sql/rpl_rli.cc
        Added a part of CP info from C to W propagation.
     @ sql/rpl_rli.h
        New members to RLI due to CP info from C to W propagation.
     @ sql/rpl_rli_pdb.cc
        Worker stores the new CP to mention it in flush_info() along with
        (todo) a bitmap of the executed groups within the checkpoint interval.
     @ sql/rpl_rli_pdb.h
        New members to a transport and the Worker class due to CP info.
     @ sql/rpl_slave.cc
        missed cleanup of errored-out rows-events;
        work-around for
          w->curr_group_exec_parts->dynamic_ids
        being initialized with one partition on Worker startup, which it should not be.

    modified:
      sql/log_event.cc
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-12-08 00:33:48 +0000
+++ b/sql/log_event.cc	2010-12-09 17:45:02 +0000
@@ -2418,9 +2418,13 @@ Slave_worker *Log_event::get_slave_worke
   {
     g.master_log_pos= log_pos;
     g.group_master_log_pos= g.group_relay_log_pos= 0;
+    g.group_master_log_name= NULL; // todo: remove
     g.group_relay_log_name= NULL;
     g.worker_id= (ulong) -1;
     g.total_seqno= const_cast<Relay_log_info*>(rli)->mts_total_groups++;
+    g.checkpoint_log_name= NULL;
+    g.checkpoint_log_pos= 0;
+    g.checkpoint_seqno= (uint) -1;
 
     // the last occupied GAQ's array index
     rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
@@ -2490,6 +2494,10 @@ Slave_worker *Log_event::get_slave_worke
   {
     uint i;
     mts_group_cnt= rli->gaq->assigned_group_index;
+    Slave_job_group *ptr_g=
+      (Slave_job_group *)
+      dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
+
 
     // TODO: throw an error when relay-log reading starts from inside of a group!!
 
@@ -2501,11 +2509,6 @@ Slave_worker *Log_event::get_slave_worke
         Now group terminating event initiates the new name
         delivery through the current group relaylog slot in GAQ.
       */
-
-      Slave_job_group *ptr_g=
-        (Slave_job_group *)
-        dynamic_array_ptr(&rli->gaq->Q, rli->gaq->assigned_group_index);
-
       DBUG_ASSERT(ptr_g->group_relay_log_name == NULL);
 
       ptr_g->group_relay_log_name= (char *)
@@ -2519,6 +2522,20 @@ Slave_worker *Log_event::get_slave_worke
       worker->relay_log_change_notified= TRUE;
     }
 
+    if (!worker->checkpoint_notified)
+    {
+      // Worker dealloc
+      ptr_g->checkpoint_log_name= (char *)
+        my_malloc(strlen(const_cast<Relay_log_info*>(rli)->
+                         get_group_master_log_name()) + 1, MYF(MY_WME));
+      strcpy(ptr_g->checkpoint_log_name,
+             const_cast<Relay_log_info*>(rli)->get_group_master_log_name());
+      ptr_g->checkpoint_log_pos= const_cast<Relay_log_info*>(rli)->get_group_master_log_pos();
+      worker->checkpoint_notified= TRUE;
+    }
+    ptr_g->checkpoint_seqno= rli->checkpoint_seqno;
+    const_cast<Relay_log_info*>(rli)->checkpoint_seqno++;
+
     DBUG_ASSERT(worker == rli->last_assigned_worker);
 
     if (!worker)
@@ -6641,7 +6658,11 @@ int Xid_log_event::do_apply_event(Relay_
   }
   else if (is_trans_repo && is_parallel)
   {
-    if ((error= w->commit_positions(this)))
+    ulong gaq_idx= mts_group_cnt;
+    Slave_job_group *ptr_g=
+      (Slave_job_group *) dynamic_array_ptr(&w->c_rli->gaq->Q, gaq_idx);
+
+    if ((error= w->commit_positions(this, ptr_g)))
       goto err;
   }
 

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-12-08 01:30:32 +0000
+++ b/sql/rpl_rli.cc	2010-12-09 17:45:02 +0000
@@ -82,9 +82,9 @@ Relay_log_info::Relay_log_info(bool is_s
     We need to decide if you are going to use an
     option and store it in a storage.
 
-    100 msec. 
+    500 msec. 
   */
-  lwm_period= 0.100;
+  lwm_period= 0.500;
   set_timespec_nsec(last_clock, 0);
 
   bzero((char*) &cache_buf, sizeof(cache_buf));
@@ -241,6 +241,27 @@ void Relay_log_info::reset_notified_rela
 }
 
 /**
+   Method is called in mts_checkpoint_routine(), and in stmt_done() when no
+   current Worker exists, to mark each Worker as requiring to adapt to a new
+   checkpoint interval whose coordinates are passed to it through GAQ index.
+
+   Worker notices the new checkpoint value at the group commit
+   to reset the current bitmap and set ON a bit number put by C into GAQ index
+   as the first group committed after the new checkpoint.
+*/
+void Relay_log_info::reset_notified_checkpoint()
+{
+  if (!is_parallel_exec())
+    return; // nothing to notify when not executing in parallel
+  for (uint i= 0; i < workers.elements; i++)
+  {
+    Slave_worker *w= *(Slave_worker **) dynamic_array_ptr(&workers, i);
+    w->checkpoint_notified= FALSE; // C re-sends checkpoint coordinates to w
+  }
+  checkpoint_seqno= 0; // restart the count of groups since the checkpoint
+}
+
+/**
    The method can be run both by C having the Main (coord) rli context and
    by W having both the main and the Local (worker) rli context.
    Decision matrix:
@@ -1113,7 +1134,16 @@ void Relay_log_info::stmt_done(my_off_t 
     /* Alfranio needs to update the coordinator and workers. */
     
     if ((w= get_current_worker()) == NULL)
+    {
       flush_info(is_transactional() ? TRUE : FALSE);
+
+      /* 
+         The central recovery commit run in sequential mode forces
+         notification on the defacto new checkpoint.
+      */
+      if (is_parallel_exec())
+        reset_notified_checkpoint();
+    }
   }
 }
 

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-12-08 00:33:48 +0000
+++ b/sql/rpl_rli.h	2010-12-09 17:45:02 +0000
@@ -471,6 +471,7 @@ public:
      a new partition. Is updated at checkpoint commit to the main RLI.
   */
   DYNAMIC_ARRAY least_occupied_workers;
+  uint checkpoint_seqno;  // counter of groups executed after the most recent CP
 
   /* most of allocation in the coordinator rli is there */
   void init_workers(ulong);
@@ -490,9 +491,21 @@ public:
     return ret;
   }
 
+  /**
+     While a group is executed by a Worker the relay log can change.
+     Coordinator notifies Workers about this event. Worker is supposed
+     to commit to the recovery table with the new info.
+  */
   void reset_notified_relay_log_change();
 
   /**
+     While Workers execute groups the Coordinator can establish a new checkpoint.
+     Coordinator notifies Workers about this event. Coordinator and Workers
+     maintain a bitmap of executed groups that is reset with a new checkpoint.
+  */
+  void reset_notified_checkpoint();
+
+  /**
     Helper function to do after statement completion.
 
     This function is called from an event to complete the group by

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-12-08 13:59:07 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-12-09 17:45:02 +0000
@@ -20,10 +20,11 @@ const char *info_slave_worker_fields []=
 
 Slave_worker::Slave_worker(const char* type, const char* pfs)
   : Rpl_info_worker(type, pfs), group_relay_log_pos(0),
-  group_master_log_pos(0)
+  group_master_log_pos(0), checkpoint_log_pos(0)
 {
   group_relay_log_name[0]= 0;
   group_master_log_name[0]= 0;
+  checkpoint_log_name[0]= 0;
   curr_group_exec_parts= new Database_ids(NAME_LEN);
 }
 
@@ -157,12 +158,24 @@ size_t Slave_worker::get_number_worker_f
   return sizeof(info_slave_worker_fields)/sizeof(info_slave_worker_fields[0]);
 }
 
-bool Slave_worker::commit_positions(Log_event *ev)
+bool Slave_worker::commit_positions(Log_event *ev, Slave_job_group* ptr_g)
 {
   DBUG_ENTER("Slave_worker::checkpoint_positions");
 
   bool error= FALSE;
 
+  if (ptr_g->checkpoint_log_name != NULL)
+  {
+    strmake(checkpoint_log_name, ptr_g->checkpoint_log_name,
+            sizeof(checkpoint_log_name) - 1);
+    checkpoint_log_pos= ptr_g->checkpoint_log_pos;
+
+    my_free(ptr_g->checkpoint_log_name);
+    ptr_g->checkpoint_log_name= NULL;
+  }
+      
+  // TODO: update the group bitmap ptr_g->checkpoint_seqno 'th bit
+
   group_relay_log_pos= ev->future_event_relay_log_pos;
   group_master_log_pos= ev->log_pos;
   strmake(group_master_log_name, c_rli->get_group_master_log_name(),
@@ -502,16 +515,10 @@ void Slave_worker::slave_worker_ends_gro
       strcpy(group_relay_log_name, ptr_g->group_relay_log_name);
     }
 
-    // GAQ is updated with the checkpoint info
-    
-    // delete ptr_g->group_relay_log_name;  // C allocated
-    // ptr_g->group_relay_log_name= NULL;   // mark freed
-
-
-    // TODO: as it's same as ev->update_pos(w_rli) remove the latter.
-
     if (!(ev->get_type_code() == XID_EVENT && is_transactional()))
-      commit_positions(ev);
+    {
+      commit_positions(ev, ptr_g);
+    }
 
     ptr_g->group_master_log_pos= group_master_log_pos;
     ptr_g->group_relay_log_pos= group_relay_log_pos;
@@ -536,7 +543,7 @@ void Slave_worker::slave_worker_ends_gro
       my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
                                       (uchar*) key + 1, key[0]);
 
-    DBUG_ASSERT(entry && entry->usage != 0);
+    DBUG_ASSERT(entry && entry->usage != 0); // was used to break
     DBUG_ASSERT(strlen(key + 1) == (uchar) key[0]);
 
     entry->usage--;

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-12-08 01:30:32 +0000
+++ b/sql/rpl_rli_pdb.h	2010-12-09 17:45:02 +0000
@@ -95,7 +95,7 @@ public:
 
 typedef struct st_slave_job_group
 {
-  char     *group_master_log_name; // This is used upon recovery.
+  char     *group_master_log_name; // (actually redundant)
   Dynamic_ids *db_ids;             // This is used upon recovery.
 
   my_off_t master_log_pos;       // B-event log_pos
@@ -114,6 +114,11 @@ typedef struct st_slave_job_group
   char     *group_relay_log_name; // The value is last seen relay-log 
   ulong worker_id;
   ulonglong total_seqno;
+
+  /* checkpoint coord are reset by CP and rotate:s */
+  uint  checkpoint_seqno;
+  my_off_t checkpoint_log_pos; // T-event lop_pos filled by W for CheckPoint
+  char*    checkpoint_log_name;
 } Slave_job_group;
 
 #define copy_job(from, to) \
@@ -274,6 +279,7 @@ public:
   volatile int curr_jobs; // the current assignments
   ulong usage_partition; // number of different partitions handled by this worker
   volatile bool relay_log_change_notified; // Coord sets and resets, W can read
+  volatile bool checkpoint_notified; // Coord sets and resets, W can read
   bool wq_overrun_set;  // W monitors its queue usage to incr/decr rli->mts_wqs_overrun
   /*
     We need to make this a dynamic field. /Alfranio
@@ -283,6 +289,8 @@ public:
   ulonglong group_relay_log_pos;
   char group_master_log_name[FN_REFLEN];
   ulonglong group_master_log_pos;
+  char checkpoint_log_name[FN_REFLEN];
+  ulonglong checkpoint_log_pos;
 
   int init_info();
   void end_info();
@@ -292,7 +300,7 @@ public:
 
   void slave_worker_ends_group(Log_event*, int);  // CGEP walk through to upd APH
 
-  bool commit_positions(Log_event *evt);
+  bool commit_positions(Log_event *evt, Slave_job_group *ptr_g);
 
 private:
   bool read_info(Rpl_info_handler *from);

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-12-08 13:59:07 +0000
+++ b/sql/rpl_slave.cc	2010-12-09 17:45:02 +0000
@@ -3751,6 +3751,8 @@ pthread_handler_t handle_slave_worker(vo
     mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
     rli->info_thd->awake(THD::KILL_QUERY);          // notify Crdn
     mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
+    // Todo: add necessary stuff to clean up after Q-log-event, a Q trans
+    rli->cleanup_context(thd, error);
   }
 
   mysql_mutex_lock(&w->jobs_lock);
@@ -3932,6 +3934,8 @@ bool mts_checkpoint_routine(Relay_log_in
   error= rli->flush_info(TRUE);
   // end of commit_positions
 
+  rli->reset_notified_checkpoint();
+
 end:
   set_timespec_nsec(rli->last_clock, 0);
   
@@ -3962,9 +3966,13 @@ int slave_start_single_worker(Relay_log_
   Rpl_info_dummy *dummy_handler= new Rpl_info_dummy();
   w->w_rli->set_rpl_info_handler(dummy_handler);
   w->init_info();
+  
+  // TODO: remove after dynamic_ids will be sorted out (removed/refined) otherwise
+  // entry->usage assert
+  w->curr_group_exec_parts->dynamic_ids.elements= 0;
 
   w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
-
+  w->checkpoint_notified= FALSE;
   w->w_rli->workers= rli->workers; // shallow copying is sufficient
   w->w_rli->this_worker= w;
 
@@ -4053,6 +4061,8 @@ int slave_start_workers(Relay_log_info *
   rli->mts_total_groups= 0;
   rli->curr_group_seen_begin= 0;
   rli->run_query_in_parallel= opt_slave_run_query_in_parallel;
+  rli->checkpoint_seqno= 0;
+
   for (i= 0; i < n; i++)
   {
     if ((error= slave_start_single_worker(rli, i)))


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101209174502-qai8fb14z3hn96m4.bundle
Thread
bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3237) WL#5569Andrei Elkin9 Dec