List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:December 9 2010 3:51pm
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3232)
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/wl5569-r3232_andrei.elkin%40oracle.com-20101207180139-08mzrmaq8k5qygaf/ based on revid:andrei.elkin@stripped

 3232 Andrei Elkin	2010-12-09 [merge]
      merging up to r 3232 containing checkpoint in the read event loop plus TABLE storage for W

    modified:
      mysql-test/suite/rpl/r/rpl_parallel.result
      mysql-test/suite/rpl/t/rpl_parallel-slave.opt
      mysql-test/suite/rpl/t/rpl_parallel.test
      sql/rpl_info_dummy.cc
      sql/rpl_info_dummy.h
      sql/rpl_info_factory.cc
      sql/rpl_info_file.cc
      sql/rpl_info_file.h
      sql/rpl_info_handler.h
      sql/rpl_info_table.cc
      sql/rpl_info_table.h
      sql/rpl_mi.cc
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/rpl_slave.h
=== modified file 'mysql-test/suite/rpl/r/rpl_parallel.result'
--- a/mysql-test/suite/rpl/r/rpl_parallel.result	2010-12-07 17:35:16 +0000
+++ b/mysql-test/suite/rpl/r/rpl_parallel.result	2010-12-07 18:01:39 +0000
@@ -25,3 +25,9 @@ Comparing tables master:test2.tm_nk and 
 Comparing tables master:test1.tm_nk and slave:test1.tm_nk
 Comparing tables master:test0.tm_nk and slave:test0.tm_nk
 set @@global.slave_parallel_workers= @save.slave_parallel_workers;
+SELECT * FROM mysql.slave_worker_info;
+Master_id	Worker_id	Partitions	Relay_log_name	Relay_log_pos	Master_log_name	Master_log_pos
+2	0	1 test0	./slave-relay-bin.000004	6452394	master-bin.000001	6452245
+2	1	1 test1	./slave-relay-bin.000004	6451588	master-bin.000001	6451439
+2	2	1 test2	./slave-relay-bin.000004	6450782	master-bin.000001	6450633
+2	3	1 test3	./slave-relay-bin.000004	6449976	master-bin.000001	6449827

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_parallel-slave.opt	2010-12-02 17:46:46 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel-slave.opt	2010-12-07 03:05:41 +0000
@@ -1,2 +1 @@
---relay-log-info-repository=FILE --slave-parallel-workers=4
-
+--relay-log-info-repository=TABLE --slave-parallel-workers=4

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel.test	2010-12-02 18:13:12 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel.test	2010-12-07 03:05:41 +0000
@@ -46,3 +46,4 @@ source extra/rpl_tests/rpl_parallel_load
 
 connection slave;
 set @@global.slave_parallel_workers= @save.slave_parallel_workers;
+SELECT * FROM mysql.slave_worker_info;

=== modified file 'sql/rpl_info_dummy.cc'
--- a/sql/rpl_info_dummy.cc	2010-12-01 19:15:08 +0000
+++ b/sql/rpl_info_dummy.cc	2010-12-07 03:05:41 +0000
@@ -27,13 +27,15 @@ int Rpl_info_dummy::do_init_info(const u
   return 0;
 }
 
-int Rpl_info_dummy::do_prepare_info_for_read()
+int Rpl_info_dummy::do_prepare_info_for_read(const uint nidx
+                                             __attribute__((unused)))
 {
   if (abort) DBUG_ASSERT(0);
   return 0;
 }
 
-int Rpl_info_dummy::do_prepare_info_for_write()
+int Rpl_info_dummy::do_prepare_info_for_write(const uint nidx
+                                              __attribute__((unused)))
 {
   if (abort) DBUG_ASSERT(0);
   return 0;

=== modified file 'sql/rpl_info_dummy.h'
--- a/sql/rpl_info_dummy.h	2010-12-01 19:15:08 +0000
+++ b/sql/rpl_info_dummy.h	2010-12-07 03:05:41 +0000
@@ -41,8 +41,8 @@ private:
                     const bool force);
   int do_remove_info(const ulong *uidx, const uint nidx);
 
-  int do_prepare_info_for_read();
-  int do_prepare_info_for_write();
+  int do_prepare_info_for_read(const uint nidx);
+  int do_prepare_info_for_write(const uint nidx);
   bool do_set_info(const int pos, const char *value);
   bool do_set_info(const int pos, const int value);
   bool do_set_info(const int pos, const ulong value);

=== modified file 'sql/rpl_info_factory.cc'
--- a/sql/rpl_info_factory.cc	2010-11-30 02:08:01 +0000
+++ b/sql/rpl_info_factory.cc	2010-12-07 03:05:41 +0000
@@ -460,7 +460,7 @@ Slave_worker *Rpl_info_factory::create_w
                                     info_fname)))
       goto err;
 
-    if (!(w_table= new Rpl_info_table(worker->get_number_worker_fields() + 1,
+    if (!(w_table= new Rpl_info_table(worker->get_number_worker_fields() + 2,
                                       WORKER_SCHEMA, WORKER_TABLE)))
       goto err;
 

=== modified file 'sql/rpl_info_file.cc'
--- a/sql/rpl_info_file.cc	2010-12-01 19:15:08 +0000
+++ b/sql/rpl_info_file.cc	2010-12-07 03:05:41 +0000
@@ -107,14 +107,16 @@ file '%s')", info_fname);
   DBUG_RETURN(error);
 }
 
-int Rpl_info_file::do_prepare_info_for_read()
+int Rpl_info_file::do_prepare_info_for_read(const uint nidx
+                                            __attribute__((unused)))
 {
   cursor= 0;
   prv_error= FALSE;
   return (reinit_io_cache(&info_file, READ_CACHE, 0L, 0, 0));
 }
 
-int Rpl_info_file::do_prepare_info_for_write()
+int Rpl_info_file::do_prepare_info_for_write(const uint nidx
+                                             __attribute__((unused)))
 {
   cursor= 0;
   prv_error= FALSE;

=== modified file 'sql/rpl_info_file.h'
--- a/sql/rpl_info_file.h	2010-12-01 19:15:08 +0000
+++ b/sql/rpl_info_file.h	2010-12-07 03:05:41 +0000
@@ -48,8 +48,8 @@ private:
                     const bool force);
   int do_remove_info(const ulong *uidx, const uint nidx);
 
-  int do_prepare_info_for_read();
-  int do_prepare_info_for_write();
+  int do_prepare_info_for_read(const uint nidx);
+  int do_prepare_info_for_write(const uint nidx);
   bool do_set_info(const int pos, const char *value);
   bool do_set_info(const int pos, const int value);
   bool do_set_info(const int pos, const ulong value);

=== modified file 'sql/rpl_info_handler.h'
--- a/sql/rpl_info_handler.h	2010-12-01 19:15:08 +0000
+++ b/sql/rpl_info_handler.h	2010-12-07 03:05:41 +0000
@@ -118,9 +118,9 @@ public:
     @retval FALSE No error
     @retval TRUE  Failure
   */
-  int prepare_info_for_read()
+  int prepare_info_for_read(const uint nidx)
   {
-    return (do_prepare_info_for_read());
+    return (do_prepare_info_for_read(nidx));
   }
 
   /**
@@ -130,9 +130,9 @@ public:
     @retval FALSE No error
     @retval TRUE  Failure
   */
-  int prepare_info_for_write()
+  int prepare_info_for_write(const uint nidx)
   {
-    return (do_prepare_info_for_write());
+    return (do_prepare_info_for_write(nidx));
   }
 
   /**
@@ -310,8 +310,8 @@ private:
                             const bool force)= 0;
   virtual int do_remove_info(const ulong *uidx, const uint nidx)= 0;
   virtual void do_end_info(const ulong *uidx, const uint nidx)= 0;
-  virtual int do_prepare_info_for_read()= 0;
-  virtual int do_prepare_info_for_write()= 0;
+  virtual int do_prepare_info_for_read(const uint nidx)= 0;
+  virtual int do_prepare_info_for_write(const uint nidx)= 0;
 
   virtual bool do_set_info(const int pos, const char *value)= 0;
   virtual bool do_set_info(const int pos, const ulong value)= 0;

=== modified file 'sql/rpl_info_table.cc'
--- a/sql/rpl_info_table.cc	2010-12-01 19:15:08 +0000
+++ b/sql/rpl_info_table.cc	2010-12-07 03:05:41 +0000
@@ -315,19 +315,19 @@ void Rpl_info_table::do_end_info(const u
 {
 }
 
-int Rpl_info_table::do_prepare_info_for_read()
+int Rpl_info_table::do_prepare_info_for_read(const uint nidx)
 {
   if (!field_values)
     return TRUE;
 
-  cursor= 1;
+  cursor= nidx;
 
   return FALSE;
 }
 
-int Rpl_info_table::do_prepare_info_for_write()
+int Rpl_info_table::do_prepare_info_for_write(const uint nidx)
 {
-  return(do_prepare_info_for_read());
+  return(do_prepare_info_for_read(nidx));
 }
 
 bool Rpl_info_table::do_set_info(const int pos, const char *value)

=== modified file 'sql/rpl_info_table.h'
--- a/sql/rpl_info_table.h	2010-12-01 19:15:08 +0000
+++ b/sql/rpl_info_table.h	2010-12-07 03:05:41 +0000
@@ -67,8 +67,8 @@ private:
                     const bool force);
   int do_remove_info(const ulong *uidx, const uint nidx);
 
-  int do_prepare_info_for_read();
-  int do_prepare_info_for_write();
+  int do_prepare_info_for_read(const uint nidx);
+  int do_prepare_info_for_write(const uint nidx);
   bool do_set_info(const int pos, const char *value);
   bool do_set_info(const int pos, const int value);
   bool do_set_info(const int pos, const ulong value);

=== modified file 'sql/rpl_mi.cc'
--- a/sql/rpl_mi.cc	2010-12-01 19:15:08 +0000
+++ b/sql/rpl_mi.cc	2010-12-07 03:05:41 +0000
@@ -77,7 +77,7 @@ const char *info_mi_fields []=
 };
 
 Master_info::Master_info()
-   :Rpl_info_coordinator("I/O"),
+   :Rpl_info_coordinator("IO"),
    ssl(0), ssl_verify_server_cert(0),
    port(MYSQL_PORT), connect_retry(DEFAULT_CONNECT_RETRY),
    clock_diff_with_master(0), heartbeat_period(0),
@@ -300,7 +300,7 @@ bool Master_info::read_info(Rpl_info_han
      is this.
   */
 
-  if (from->prepare_info_for_read() || 
+  if (from->prepare_info_for_read(nidx) || 
       from->get_info(master_log_name, sizeof(master_log_name), ""))
     DBUG_RETURN(TRUE);
 
@@ -427,7 +427,7 @@ bool Master_info::write_info(Rpl_info_ha
      of file we don't care about this garbage.
   */
 
-  if (to->prepare_info_for_write() ||
+  if (to->prepare_info_for_write(nidx) ||
       to->set_info((int) LINES_IN_MASTER_INFO) ||
       to->set_info(master_log_name) ||
       to->set_info((ulong)master_log_pos) ||

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-12-04 17:14:50 +0000
+++ b/sql/rpl_rli.cc	2010-12-07 03:05:41 +0000
@@ -1281,6 +1281,9 @@ int Relay_log_info::init_info()
     if (hot_log)
       mysql_mutex_unlock(log_lock);
 
+    // The correct place should be here. // ANDREI
+    // mts_recovery_routine(this);
+
     DBUG_RETURN(0);
   }
 
@@ -1644,7 +1647,7 @@ bool Relay_log_info::read_info(Rpl_info_
     it is line count and not binlog name (new format) it will be
     overwritten by the second row later.
   */
-  if (from->prepare_info_for_read() ||
+  if (from->prepare_info_for_read(nidx) ||
       from->get_info(group_relay_log_name, sizeof(group_relay_log_name), ""))
     DBUG_RETURN(TRUE);
 
@@ -1691,7 +1694,7 @@ bool Relay_log_info::write_info(Rpl_info
   */
   //DBUG_ASSERT(!belongs_to_client());
 
-  if (to->prepare_info_for_write() ||
+  if (to->prepare_info_for_write(nidx) ||
       to->set_info((int) LINES_IN_RELAY_LOG_INFO_WITH_DELAY) ||
       to->set_info(group_relay_log_name) ||
       to->set_info((ulong) group_relay_log_pos) ||

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-12-03 16:56:11 +0000
+++ b/sql/rpl_rli.h	2010-12-07 03:05:41 +0000
@@ -333,7 +333,7 @@ public:
   char slave_patternload_file[FN_REFLEN]; 
   size_t slave_patternload_file_size;
 
-  struct timespec curr_clock, last_clock;
+  struct timespec last_clock;
   float lwm_period;
 
   Relay_log_info(bool is_slave_recovery);

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-12-07 17:35:16 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-12-07 18:01:39 +0000
@@ -18,8 +18,12 @@ const char *info_slave_worker_fields []=
   "master_log_pos"
 };
 
-Slave_worker::Slave_worker(const char* type): Rpl_info_worker(type)
+Slave_worker::Slave_worker(const char* type)
+  : Rpl_info_worker(type), group_relay_log_pos(0),
+  group_master_log_pos(0)
 {
+  group_relay_log_name[0]= 0;
+  group_master_log_name[0]= 0;
   curr_group_exec_parts= new Database_ids();
 }
 
@@ -105,7 +109,7 @@ bool Slave_worker::read_info(Rpl_info_ha
   ulong temp_group_relay_log_pos= 0;
   ulong temp_group_master_log_pos= 0;
 
-  if (from->prepare_info_for_read())
+  if (from->prepare_info_for_read(nidx))
     DBUG_RETURN(TRUE);
 
   if (from->get_info(curr_group_exec_parts, (Dynamic_ids *) NULL) ||
@@ -137,9 +141,8 @@ bool Slave_worker::write_info(Rpl_info_h
      of file we don't care about this garbage.
   */
 
-  if (to->prepare_info_for_write() ||
-      //to->set_info(curr_group_exec_parts) ||
-      to->set_info("") ||
+  if (to->prepare_info_for_write(nidx) ||
+      to->set_info(curr_group_exec_parts) ||
       to->set_info(group_relay_log_name) ||
       to->set_info((ulong)group_relay_log_pos) ||
       to->set_info(group_master_log_name) ||

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-12-07 17:35:16 +0000
+++ b/sql/rpl_rli_pdb.h	2010-12-07 18:01:39 +0000
@@ -95,6 +95,9 @@ public:
 
 typedef struct st_slave_job_group
 {
+  char     *group_master_log_name; // This is used upon recovery.
+  Dynamic_ids *db_ids;             // This is used upon recovery.
+
   my_off_t master_log_pos;       // B-event log_pos
   my_off_t group_master_log_pos; // T-event lop_pos filled by W for CheckPoint
   my_off_t group_relay_log_pos;  // filled by W
@@ -113,6 +116,69 @@ typedef struct st_slave_job_group
   ulonglong total_seqno;
 } Slave_job_group;
 
+#define copy_job(from, to) \
+  do \
+  { \
+    to.group_master_log_pos= from.group_master_log_pos; \
+    to.group_relay_log_pos= from.group_relay_log_pos; \
+    to.group_relay_log_name= from.group_relay_log_name; \
+    to.group_master_log_name= from.group_master_log_name; \
+    to.db_ids= from.db_ids; \
+    to.worker_id= from.worker_id; \
+  } while (0)
+
+#define exchange_jobs(j1, j2) \
+  do \
+  { \
+    Slave_job_group tmp; \
+    copy_job(j2, tmp); \
+    copy_job(j1, j2); \
+    copy_job(tmp, j1); \
+  } while (0)
+
+#define greater_jobs(j1, j2) \
+  (j1.group_relay_log_pos > j2.group_relay_log_pos)
+
+#define lower_jobs(j1, j2) \
+  (j1.group_relay_log_pos < j2.group_relay_log_pos)
+
+#define init_job(from, to) \
+  do \
+  { \
+    to.worker_id= from->id; \
+    fprintf(stderr, "DEBUGGING Worker-Id %lu Worker-Id %lu\n", \
+                     to.worker_id, from->id); \
+    to.group_relay_log_pos= from->group_relay_log_pos; \
+    fprintf(stderr, "DEBUGGING group_relay_log_pos %lu group_relay_log_pos %lu\n", \
+                     (ulong) to.group_relay_log_pos, (ulong) from->group_relay_log_pos); \
+    to.group_relay_log_name= from->group_relay_log_name; \
+    fprintf(stderr, "DEBUGGING group_relay_log_name %s group_relay_log_name %s\n", \
+                     to.group_relay_log_name, from->group_relay_log_name); \
+    to.group_master_log_pos= from->group_master_log_pos; \
+    fprintf(stderr, "DEBUGGING to group_master_log_pos %lu group_master_log_pos %lu\n", \
+                     (ulong) to.group_master_log_pos, (ulong) from->group_master_log_pos); \
+    to.group_master_log_name= from->group_master_log_name; \
+    fprintf(stderr, "DEBUGGING to group_master_log_name %s group_master_log_pos %s\n", \
+                     to.group_master_log_name, from->group_master_log_name); \
+    to.db_ids= from->curr_group_exec_parts; \
+  } while (0)
+
+#define debug_jobs(jobs, size) \
+  do \
+  { \
+    for (uint pos= 0; pos < size; pos++) \
+    { \
+      fprintf(stderr, "DEBUGGING Worker-Id %lu, " \
+                       "group_relay_log_name %s, group_relay_log_pos %lu, " \
+                       "group_master_log_name %s, group_master_lo_pos %lu\n", \
+                       jobs[pos].worker_id, \
+                       jobs[pos].group_relay_log_name, \
+                       (ulong) jobs[pos].group_relay_log_pos, \
+                       jobs[pos].group_master_log_name, \
+                       (ulong) jobs[pos].group_master_log_pos); \
+    } \
+  } while (0)
+
 /**
   Group Assigned Queue whose first element identifies first gap
   in committed sequence. The head of the queue is therefore next to 

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-12-07 17:35:16 +0000
+++ b/sql/rpl_slave.cc	2010-12-07 18:01:39 +0000
@@ -168,7 +168,8 @@ static int terminate_slave_thread(THD *t
                                   bool skip_lock);
 static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
 int slave_worker_exec_job(Slave_worker * w, Relay_log_info *rli);
-static bool mts_checkpoint_routine(Relay_log_info *rli, bool locked);
+static bool mts_checkpoint_routine(Relay_log_info *rli, ulong period, bool locked);
+bool mts_recovery_routine(Relay_log_info *rli);
 
 /*
   Find out which replications threads are running
@@ -364,8 +365,15 @@ int init_recovery(Master_info* mi, const
 {
   DBUG_ENTER("init_recovery");
 
+  int error= 0;
   Relay_log_info *rli= mi->rli;
-  const char *group_master_log_name=  rli->get_group_master_log_name();
+  char *group_master_log_name= NULL;
+
+  // ANDREI The correct place should be here.
+  // if (rli->is_parallel_exec() && !(error= mts_recovery_routine(rli)))
+  //  goto err;
+
+  group_master_log_name= const_cast<char *>(rli->get_group_master_log_name());
   if (group_master_log_name[0])
   {
     mi->set_master_log_pos(max(BIN_LOG_HEADER_SIZE,
@@ -381,7 +389,8 @@ int init_recovery(Master_info* mi, const
     rli->set_event_relay_log_pos(BIN_LOG_HEADER_SIZE);
   }
 
-  DBUG_RETURN(0);
+err:
+  DBUG_RETURN(error);
 }
 
 int init_info(Master_info* mi, bool ignore_if_no_info, int thread_mask)
@@ -3610,6 +3619,70 @@ err:
   DBUG_RETURN(0); 
 }
 
+int mts_recovery_partition(Slave_job_group *jobs, int p /* pos_ini */, int r /* pos_end */)
+{
+  Slave_job_group x;
+
+  copy_job(jobs[p], x);
+
+  int i= p - 1;
+  int j= r + 1;
+
+  for (;;)
+  {
+    do
+    {
+      j= j - 1;
+    }
+    while (greater_jobs(jobs[j], x));
+
+    do
+    {
+      i= i + 1;
+    }
+    while (lower_jobs(jobs[i], x));
+
+    if (i < j)
+       exchange_jobs(jobs[i], jobs[j]); 
+    else
+      return j;
+  }
+}
+
+void mts_recovery_quicksort(Slave_job_group *jobs, int p /* pos_ini */, int r /* pos_end */)
+{
+  if (p < r)
+  {
+    int q= mts_recovery_partition(jobs, p, r);
+    mts_recovery_quicksort(jobs, p, q);
+    mts_recovery_quicksort(jobs, q + 1, r);
+  }
+}
+
+bool mts_recovery_routine(Relay_log_info *rli)
+{
+  DBUG_ENTER("mts_recovery_routine");
+
+  DBUG_ASSERT(rli->workers.elements > 0);
+  Slave_job_group *jobs= new Slave_job_group[rli->workers.elements];
+  
+  for (uint i= 0; i < rli->workers.elements; i++)
+  {
+    Slave_worker *w_i;
+    get_dynamic(&rli->workers, (uchar *) &w_i, i);
+    init_job(w_i, jobs[i]);
+  }
+
+  mts_recovery_quicksort(jobs, 0, rli->workers.elements - 1);
+
+  debug_jobs(jobs, rli->workers.elements);
+
+  // TODO -- ALFRANIO CORE OF RECOVERY
+
+  delete []jobs;
+  DBUG_RETURN(rli->flush_info(TRUE));
+}
+
 /**
    Processing rli->gaq to find out the low-water-mark coordinates
    stored into the cental recovery table.
@@ -3617,13 +3690,25 @@ err:
 
    @return FALSE success, TRUE otherwise
 */
-bool mts_checkpoint_routine(Relay_log_info *rli, bool locked)
+bool mts_checkpoint_routine(Relay_log_info *rli, ulong period, bool locked)
 {
-  bool error= FALSE;
   ulong cnt;
+  bool error= FALSE;
+  struct timespec curr_clock;
 
   DBUG_ENTER("checkpoint_routine");
 
+  set_timespec_nsec(curr_clock, 0);
+  ulong diff= diff_timespec(curr_clock, rli->last_clock);
+  if (diff < period)
+  {
+    /*
+      We do not need to execute the checkpoint now because
+      the time elapsed is not enough.
+    */
+    DBUG_RETURN(FALSE);
+  }
+      
   if (!(cnt= rli->gaq->move_queue_head(&rli->workers)))
     goto end;
 
@@ -3673,6 +3758,7 @@ bool mts_checkpoint_routine(Relay_log_in
   // end of commit_positions
 
 end:
+  set_timespec_nsec(rli->last_clock, 0);
   
   DBUG_RETURN(error);
 }
@@ -3704,8 +3790,6 @@ int slave_start_single_worker(Relay_log_
 
   w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
 
-  // ALFRANIO --> The recovery procedure must be introduced here.
-
   w->w_rli->workers= rli->workers; // shallow copying is sufficient
   w->w_rli->this_worker= w;
 
@@ -4033,6 +4117,9 @@ log '%s' at position %s, relay log '%s' 
     goto err;
   }
 
+  /* Recovery routine */
+  mts_recovery_routine(rli);
+
   /* execute init_slave variable */
   if (opt_init_slave.length)
   {
@@ -5244,25 +5331,18 @@ static Log_event* next_event(Relay_log_i
       */
       rli->set_future_event_relay_log_pos(my_b_tell(cur_log));
       ev->future_event_relay_log_pos= rli->get_future_event_relay_log_pos();
-      if (hot_log)
-        mysql_mutex_unlock(log_lock);
+
       /* 
          MTS checkpoint in the successful read branch 
       */
       if (rli->is_parallel_exec() && rli->lwm_period != 0.0)
       {
-        int ret= 0;
-        struct timespec waittime;
         ulong period= rli->lwm_period * 1000000000UL;
-        set_timespec_nsec(rli->curr_clock, 0);
-        ulong diff= diff_timespec(rli->curr_clock, rli->last_clock);
-        if (diff > period)
-        {
-          mts_checkpoint_routine(rli, TRUE);
-          set_timespec_nsec(rli->last_clock, 0);
-        }
+        mts_checkpoint_routine(rli, period, TRUE); // ALFRANIO --- WHAT TO DO with ERRORS?
       }
 
+      if (hot_log)
+        mysql_mutex_unlock(log_lock);
       DBUG_RETURN(ev);
     }
     DBUG_ASSERT(thd==rli->info_thd);
@@ -5384,16 +5464,10 @@ static Log_event* next_event(Relay_log_i
         {
           int ret= 0;
           struct timespec waittime;
+          ulong period= rli->lwm_period * 1000000000UL;
           do
           {
-            ulong period= rli->lwm_period * 1000000000UL;
-            set_timespec_nsec(rli->curr_clock, 0);
-            ulong diff= diff_timespec(rli->curr_clock, rli->last_clock);
-            if (diff > period)
-            {
-              mts_checkpoint_routine(rli, FALSE);
-              set_timespec_nsec(rli->last_clock, 0);
-            }
+            mts_checkpoint_routine(rli, period, FALSE); // ALFRANIO ERROR
             set_timespec_nsec(waittime, period);
             thd->enter_cond(log_cond, log_lock,
                             "Slave has read all relay log; "

=== modified file 'sql/rpl_slave.h'
--- a/sql/rpl_slave.h	2010-10-25 10:39:01 +0000
+++ b/sql/rpl_slave.h	2010-12-07 03:05:41 +0000
@@ -235,6 +235,8 @@ extern char *master_ssl_cipher, *master_
        
 extern I_List<THD> threads;
 
+bool mts_recovery_routine(Relay_log_info *rli);
+
 #endif /* HAVE_REPLICATION */
 
 /* masks for start/stop operations on io and sql slave threads */


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101209155112-219sizqfqz061tww.bundle
Thread
bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3232) Andrei Elkin9 Dec