List:Commits« Previous MessageNext Message »
From:Alfranio Correia Date:December 10 2010 12:11pm
Subject:bzr push into mysql-next-mr-wl5569 branch (alfranio.correia:3240 to 3241)
WL#5599
View as plain text  
 3241 Alfranio Correia	2010-12-10
      WL#5599
      
      Sketch of a recovery routine.

    modified:
      mysql-test/r/mysqld--help-notwin.result
      mysql-test/r/mysqld--help-win.result
      mysql-test/suite/rpl/t/rpl_parallel_start_stop.test
      sql/dynamic_ids.cc
      sql/dynamic_ids.h
      sql/mysqld.cc
      sql/mysqld.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/sys_vars.cc
 3240 Alfranio Correia	2010-12-09
      WL#5599
      
      Fixed the use of the dummy repository.

    modified:
      mysql-test/suite/rpl/t/rpl_parallel-slave.opt
      sql/rpl_info_dummy.cc
      sql/rpl_info_dummy.h
      sql/rpl_info_factory.cc
      sql/rpl_info_factory.h
      sql/rpl_slave.cc
=== modified file 'mysql-test/r/mysqld--help-notwin.result'
--- a/mysql-test/r/mysqld--help-notwin.result	2010-12-09 13:23:19 +0000
+++ b/mysql-test/r/mysqld--help-notwin.result	2010-12-10 12:10:20 +0000
@@ -339,6 +339,10 @@ The following options may be given as th
  --min-examined-row-limit=# 
  Don't write queries to slow log that examine fewer rows
  than that
+ --mts-checkpoint-period=# 
+ Gather workers' activities and synchronously flush relay
+ log info to disk after every #th mili-seconds. Use 0
+ (default) to disable checkpoint
  --mts-partition-hash-soft-max=# 
  Number of records in the mts partition hash below which
  entries with zero usage are tolerated
@@ -883,6 +887,7 @@ max-user-connections 0
 max-write-lock-count 18446744073709551615
 memlock FALSE
 min-examined-row-limit 0
+mts-checkpoint-period 0
 mts-partition-hash-soft-max 16
 mts-pending-jobs-size-max 16777216
 mts-slave-worker-queue-len-max 40000

=== modified file 'mysql-test/r/mysqld--help-win.result'
--- a/mysql-test/r/mysqld--help-win.result	2010-12-09 13:23:19 +0000
+++ b/mysql-test/r/mysqld--help-win.result	2010-12-10 12:10:20 +0000
@@ -338,6 +338,10 @@ The following options may be given as th
  --min-examined-row-limit=# 
  Don't write queries to slow log that examine fewer rows
  than that
+ --mts-checkpoint-period=# 
+ Gather workers' activities and synchronously flush relay
+ log info to disk after every #th mili-seconds. Use 0
+ (default) to disable checkpoint
  --mts-partition-hash-soft-max=# 
  Number of records in the mts partition hash below which
  entries with zero usage are tolerated
@@ -886,6 +890,7 @@ max-user-connections 0
 max-write-lock-count 18446744073709551615
 memlock FALSE
 min-examined-row-limit 0
+mts-checkpoint-period 0
 mts-partition-hash-soft-max 16
 mts-pending-jobs-size-max 16777216
 mts-slave-worker-queue-len-max 40000

=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_start_stop.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test	2010-12-07 17:35:16 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test	2010-12-10 12:10:20 +0000
@@ -6,6 +6,7 @@
 #
 
 source include/master-slave.inc;
+source include/have_binlog_format_row.inc;
 
 connection slave;
 

=== modified file 'sql/dynamic_ids.cc'
--- a/sql/dynamic_ids.cc	2010-12-08 12:59:07 +0000
+++ b/sql/dynamic_ids.cc	2010-12-10 12:10:20 +0000
@@ -1,6 +1,16 @@
 #include "dynamic_ids.h"
 
-Dynamic_ids::Dynamic_ids(size_t size)
+int cmp_string(const void *id1, const void *id2)
+{
+  return strcmp((char *) id1, (char *) id2);
+}
+
+int cmp_ulong(const void *id1, const void *id2)
+{
+  return ((*(ulong *) id1) - (* (ulong *)id2));
+}
+
+Dynamic_ids::Dynamic_ids(size_t param_size): size(param_size)
 {
   my_init_dynamic_array(&dynamic_ids, size, 16, 16);
 }
@@ -58,6 +68,15 @@ bool Server_ids::do_pack_dynamic_ids(Str
   DBUG_RETURN(FALSE);
 }
 
+bool Server_ids::do_search_id(const void *id)
+{
+  return (bsearch((ulong *) id, dynamic_ids.buffer,
+          dynamic_ids.elements, size,
+          (int (*) (const void*, const void*))
+          cmp_ulong) != NULL);
+}
+
+
 bool Database_ids::do_unpack_dynamic_ids(char *param_dynamic_ids)
 {
   char *token= NULL, *last= NULL;
@@ -115,3 +134,11 @@ bool Database_ids::do_pack_dynamic_ids(S
 
   DBUG_RETURN(FALSE);
 }
+
+bool Database_ids::do_search_id(const void *id)
+{
+  return (bsearch((const char *) id, dynamic_ids.buffer,
+          dynamic_ids.elements, size,
+          (int (*) (const void*, const void*))
+          cmp_string) != NULL);
+}

=== modified file 'sql/dynamic_ids.h'
--- a/sql/dynamic_ids.h	2010-12-08 12:59:07 +0000
+++ b/sql/dynamic_ids.h	2010-12-10 12:10:20 +0000
@@ -25,7 +25,7 @@ class Dynamic_ids
 public:
     DYNAMIC_ARRAY dynamic_ids;
 
-    Dynamic_ids(size_t size);
+    Dynamic_ids(size_t param_size);
     virtual ~Dynamic_ids();
 
     bool pack_dynamic_ids(String *buffer)
@@ -38,9 +38,18 @@ public:
       return(do_unpack_dynamic_ids(param_dynamic_ids));
     }
 
+    bool search_id(const void *id)
+    {
+      return (do_search_id(id));
+    }
+
+protected:
+    size_t size;
+
 private:
     virtual bool do_pack_dynamic_ids(String *buffer)= 0;
     virtual bool do_unpack_dynamic_ids(char *param_dynamic_ids)= 0;
+    virtual bool do_search_id(const void *id)= 0;
 };
 
 class Server_ids : public Dynamic_ids
@@ -52,6 +61,7 @@ public:
 private:
     bool do_pack_dynamic_ids(String *buffer);
     bool do_unpack_dynamic_ids(char *param_dynamic_ids);
+    bool do_search_id(const void *id);
 };
 
 class Database_ids : public Dynamic_ids
@@ -63,5 +73,6 @@ public:
 private:
     bool do_pack_dynamic_ids(String *buffer);
     bool do_unpack_dynamic_ids(char *param_dynamic_ids);
+    bool do_search_id(const void *id);
 };
 #endif

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2010-12-08 00:33:48 +0000
+++ b/sql/mysqld.cc	2010-12-10 12:10:20 +0000
@@ -503,7 +503,8 @@ ulong prepared_stmt_count=0;
 ulong thread_id=1L,current_pid;
 ulong slow_launch_threads = 0;
 uint sync_binlog_period= 0, sync_relaylog_period= 0,
-     sync_relayloginfo_period= 0, sync_masterinfo_period= 0;
+     sync_relayloginfo_period= 0, sync_masterinfo_period= 0,
+     mts_checkpoint_period= 0;
 ulong expire_logs_days = 0;
 
 const double log_10[] = {

=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h	2010-12-08 00:33:48 +0000
+++ b/sql/mysqld.h	2010-12-10 12:10:20 +0000
@@ -129,7 +129,8 @@ extern ulong current_pid;
 extern ulong expire_logs_days;
 extern my_bool relay_log_recovery;
 extern uint sync_binlog_period, sync_relaylog_period, 
-            sync_relayloginfo_period, sync_masterinfo_period;
+            sync_relayloginfo_period, sync_masterinfo_period,
+            mts_checkpoint_period;
 extern ulong opt_tc_log_size, tc_log_max_pages_used, tc_log_page_size;
 extern ulong tc_log_page_waits;
 extern my_bool relay_log_purge, opt_innodb_safe_binlog, opt_innodb;

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-12-08 01:30:32 +0000
+++ b/sql/rpl_rli.cc	2010-12-10 12:10:20 +0000
@@ -78,13 +78,6 @@ Relay_log_info::Relay_log_info(bool is_s
     group_master_log_name[0]= 0;
   until_log_name[0]= ign_master_log_name_end[0]= 0;
 
-  /* 
-    We need to decide if you are going to use an
-    option and store it in a storage.
-
-    100 msec. 
-  */
-  lwm_period= 0.100;
   set_timespec_nsec(last_clock, 0);
 
   bzero((char*) &cache_buf, sizeof(cache_buf));

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-12-08 00:33:48 +0000
+++ b/sql/rpl_rli.h	2010-12-10 12:10:20 +0000
@@ -344,7 +344,6 @@ public:
   size_t slave_patternload_file_size;
 
   struct timespec last_clock;
-  float lwm_period;
 
   Relay_log_info(bool is_slave_recovery);
   virtual ~Relay_log_info();

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-12-08 16:15:41 +0000
+++ b/sql/rpl_rli_pdb.h	2010-12-10 12:10:20 +0000
@@ -120,41 +120,13 @@ typedef struct st_slave_job_group
   do \
   { \
     to.worker_id= from->id; \
-    fprintf(stderr, "DEBUGGING Worker-Id %lu Worker-Id %lu\n", \
-                     to.worker_id, from->id); \
     to.group_relay_log_pos= from->group_relay_log_pos; \
-    fprintf(stderr, "DEBUGGING group_relay_log_pos %lu group_relay_log_pos %lu\n", \
-                     (ulong) to.group_relay_log_pos, (ulong) from->group_relay_log_pos); \
     to.group_relay_log_name= from->group_relay_log_name; \
-    fprintf(stderr, "DEBUGGING group_relay_log_name %s group_relay_log_name %s\n", \
-                     to.group_relay_log_name, from->group_relay_log_name); \
     to.group_master_log_pos= from->group_master_log_pos; \
-    fprintf(stderr, "DEBUGGING to group_master_log_pos %lu group_master_log_pos %lu\n", \
-                     (ulong) to.group_master_log_pos, (ulong) from->group_master_log_pos); \
     to.group_master_log_name= from->group_master_log_name; \
-    fprintf(stderr, "DEBUGGING to group_master_log_name %s group_master_log_pos %s\n", \
-                     to.group_master_log_name, from->group_master_log_name); \
     to.db_ids= from->curr_group_exec_parts; \
   } while (0)
 
-#define debug_jobs(jobs) \
-  do \
-  { \
-     Slave_job_group job; \
-    for (uint pos= 0; pos < jobs.elements; pos++) \
-    { \
-      get_dynamic(&jobs, (uchar *) &job, pos); \
-      fprintf(stderr, "DEBUGGING Worker-Id %lu, " \
-                       "group_relay_log_name %s, group_relay_log_pos %lu, " \
-                       "group_master_log_name %s, group_master_lo_pos %lu\n", \
-                       job.worker_id, \
-                       job.group_relay_log_name, \
-                       (ulong) job.group_relay_log_pos, \
-                       job.group_master_log_name, \
-                       (ulong) job.group_master_log_pos); \
-    } \
-  } while (0)
-
 /**
   Group Assigned Queue whose first element identifies first gap
   in committed sequence. The head of the queue is therefore next to 

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-12-09 16:17:32 +0000
+++ b/sql/rpl_slave.cc	2010-12-10 12:10:20 +0000
@@ -171,7 +171,7 @@ static int terminate_slave_thread(THD *t
                                   bool skip_lock);
 static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
 int slave_worker_exec_job(Slave_worker * w, Relay_log_info *rli);
-static bool mts_checkpoint_routine(Relay_log_info *rli, ulong period, bool locked);
+static bool mts_checkpoint_routine(Relay_log_info *rli, ulonglong period, bool locked);
 bool mts_recovery_routine(Relay_log_info *rli);
 
 /*
@@ -3794,17 +3794,23 @@ err:
   DBUG_RETURN(0); 
 }
 
-int mts_jobs_cmp(Slave_job_group *id1, Slave_job_group *id2)
+int mts_recovery_cmp(Slave_job_group *id1, Slave_job_group *id2)
 {
   return id1->group_relay_log_pos < id2->group_relay_log_pos ? -1 :
          (id1->group_relay_log_pos > id2->group_relay_log_pos ? 1 : 0);
 }
 
-
 bool mts_recovery_routine(Relay_log_info *rli)
 {
+  Log_event *ev= NULL, *desc= NULL;
+  char *log_name= NULL;
+  const char *errmsg= NULL;
+  bool error= TRUE;
   DYNAMIC_ARRAY jobs;
   Slave_job_group job;
+  IO_CACHE log;
+  File file;
+  MY_STAT s;
 
   DBUG_ENTER("mts_recovery_routine");
   DBUG_ASSERT(rli->workers.elements > 0);
@@ -3819,16 +3825,157 @@ bool mts_recovery_routine(Relay_log_info
     get_job(worker, job);
     insert_dynamic(&jobs, (uchar*) &job);
   }
-  sort_dynamic(&jobs, (qsort_cmp) change_master_server_id_cmp);
+  sort_dynamic(&jobs, (qsort_cmp) mts_recovery_cmp);
   
   DBUG_ASSERT(rli->workers.elements == jobs.elements);
-  debug_jobs(jobs);
 
-  // TODO -- ALFRANIO CORE OF RECOVERY
+  Format_description_log_event fdle(BINLOG_VERSION);
+  if (!fdle.is_valid())
+    goto end;
+
+  for (uint pos= 0; pos < jobs.elements; pos++)
+  {
+    String buffer;
+    get_dynamic(&jobs, (uchar *) &job, pos);
+
+    job.db_ids->pack_dynamic_ids(&buffer);
+    sql_print_information("Recoverying relay log info based on Worker-Id %lu, partitions %s, "
+                          "group_relay_log_name %s, group_relay_log_pos %lu, "
+                          "group_master_log_name %s, group_master_lo_pos %lu",
+                          job.worker_id,
+                          buffer.c_ptr_safe(),
+                          job.group_relay_log_name,
+                          (ulong) job.group_relay_log_pos,
+                          job.group_master_log_name,
+                          (ulong) job.group_master_log_pos);
+
+    if (job.group_relay_log_name == NULL || job.group_relay_log_pos == 0 ||
+        rli->get_group_relay_log_pos() >= job.group_relay_log_pos)
+      continue;
+
+    if (log_name == NULL || strcmp(log_name, job.group_relay_log_name))
+    {
+      if (ev)
+      {
+        delete ev;
+        ev= NULL;
+      }
+
+      if (desc)
+      {
+        delete desc;
+        desc= NULL;
+      }
+
+      if (log_name)
+      {
+        end_io_cache(&log);
+        mysql_file_close(file, MYF(MY_WME));
+        log_name= NULL;
+      }
+
+      if ((file= open_binlog(&log, job.group_relay_log_name, &errmsg)) < 0)
+      {
+        sql_print_error("%s", errmsg);
+        goto end;
+      }
+      log_name= job.group_relay_log_name;
+
+      my_stat(log_name, &s, MYF(0));
+
+      if (!((desc= Log_event::read_log_event(&log, 0, &fdle,
+                                             opt_master_verify_checksum)) &&
+           desc->get_type_code() == FORMAT_DESCRIPTION_EVENT))
+      {
+        goto end;
+      }
+    }
+    my_b_seek(&log, job.group_relay_log_pos);
+    sql_print_information("Recoverying relay log info. Checking relay log name "
+                          "%s from pos %lu, maxsize %lu.", log_name,
+                          (ulong) job.group_relay_log_pos, (ulong) s.st_size);
+
+    bool found= FALSE;
+    int res= 0;
+    while ((ev= Log_event::read_log_event(&log, 0, &fdle,
+            opt_master_verify_checksum)))
+    {
+      DBUG_ASSERT(ev->is_valid());
+
+      String buffer;
+      const char *db= ev->get_db();
+      
+      if (db != NULL)
+      {
+        buffer.set_int(strlen(db), FALSE, &my_charset_bin);
+        buffer.append(db);
+        found= job.db_ids->search_id(buffer.c_ptr_safe());
+      }
 
+      sql_print_information("Recovery relay log info. Event %s on db %s "
+                            "sets cursor to pos %lu and was handled by worker(%d).",
+                            ev->get_type_str(), db, (ulong) my_b_tell(&log),
+                            found);
+      if (!found)
+      {
+        delete ev;
+        ev= NULL;
+        break;
+      }
+
+      res= ev->apply_event(rli);
+      delete ev;
+      ev= NULL;
+
+      if (res)
+        goto end;
+    }
+    if (!found)
+    {
+      sql_print_information("Before updating RLI "
+                            "group_relay_log_name %s, "
+                            "group_relay_log_pos %lu, "
+                            "group_master_log_name %s, "
+                            "group_master_lo_pos %lu.",
+                            rli->get_group_relay_log_name(),
+                            (ulong) rli->get_group_relay_log_pos(),
+                            rli->get_group_master_log_name(),
+                            (ulong) rli->get_group_master_log_pos());
+
+      rli->set_group_relay_log_pos(job.group_relay_log_pos);
+      rli->set_group_relay_log_name(job.group_relay_log_name);
+      rli->set_group_master_log_pos(job.group_master_log_pos);
+      rli->set_group_master_log_name(job.group_master_log_name);
+
+      sql_print_information("After updating RLI "
+                            "group_relay_log_name %s, "
+                            "group_relay_log_pos %lu, "
+                            "group_master_log_name %s, "
+                            "group_master_lo_pos %lu.",
+                            rli->get_group_relay_log_name(),
+                            (ulong) rli->get_group_relay_log_pos(),
+                            rli->get_group_master_log_name(),
+                            (ulong) rli->get_group_master_log_pos());
+    }
+  }
+  error= FALSE;
+
+end:
+  if (desc)
+  {
+    delete desc;
+    desc= NULL;
+  }
+
+  if (log_name)
+  {
+    end_io_cache(&log);
+    mysql_file_close(file, MYF(MY_WME));
+    log_name= NULL;
+  }
   delete_dynamic(&jobs);
-  
-  DBUG_RETURN(rli->flush_info(TRUE));
+
+  DBUG_RETURN(error ? error : rli->flush_info(TRUE));
 }
 
 /**
@@ -3838,7 +3985,7 @@ bool mts_recovery_routine(Relay_log_info
 
    @return FALSE success, TRUE otherwise
 */
-bool mts_checkpoint_routine(Relay_log_info *rli, ulong period, bool locked)
+bool mts_checkpoint_routine(Relay_log_info *rli, ulonglong period, bool locked)
 {
   ulong cnt;
   bool error= FALSE;
@@ -5642,9 +5789,9 @@ static Log_event* next_event(Relay_log_i
       /* 
          MTS checkpoint in the successful read branch 
       */
-      if (rli->is_parallel_exec() && rli->lwm_period != 0.0)
+      if (rli->is_parallel_exec() && mts_checkpoint_period != 0)
       {
-        ulong period= static_cast<ulong>(rli->lwm_period * 1000000000UL);
+        ulonglong period= static_cast<ulonglong>(mts_checkpoint_period * 1000000ULL);
         mts_checkpoint_routine(rli, period, TRUE); // ALFRANIO --- WHAT TO DO with ERRORS?
       }
 
@@ -5767,11 +5914,11 @@ static Log_event* next_event(Relay_log_i
 
         const char* old_msg= thd->proc_info;
 
-        if (rli->is_parallel_exec() && rli->lwm_period != 0.0)
+        if (rli->is_parallel_exec() && mts_checkpoint_period != 0)
         {
           int ret= 0;
           struct timespec waittime;
-          ulong period= static_cast<ulong>(rli->lwm_period * 1000000000UL);
+          ulonglong period= static_cast<ulonglong>(mts_checkpoint_period * 1000000ULL);
           do
           {
             mts_checkpoint_routine(rli, period, FALSE); // ALFRANIO ERROR

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2010-12-09 13:23:19 +0000
+++ b/sql/sys_vars.cc	2010-12-10 12:10:20 +0000
@@ -3104,6 +3104,13 @@ static Sys_var_uint Sys_sync_relayloginf
        "synchronous flushing",
        GLOBAL_VAR(sync_relayloginfo_period), CMD_LINE(REQUIRED_ARG),
        VALID_RANGE(0, UINT_MAX), DEFAULT(0), BLOCK_SIZE(1));
+
+static Sys_var_uint Sys_checkpoint_mts_period(
+       "mts_checkpoint_period", "Gather workers' activities and synchronously "
+       "flush relay log info to disk after every #th mili-seconds. Use 0 "
+       "(default) to disable checkpoint",
+       GLOBAL_VAR(mts_checkpoint_period), CMD_LINE(REQUIRED_ARG),
+       VALID_RANGE(0, UINT_MAX), DEFAULT(0), BLOCK_SIZE(1));
 #endif
 
 static Sys_var_uint Sys_sync_binlog_period(


Attachment: [text/bzr-bundle] bzr/alfranio.correia@oracle.com-20101210121020-da0dlx60unxbtxol.bundle
Thread
bzr push into mysql-next-mr-wl5569 branch (alfranio.correia:3240 to 3241)WL#5599Alfranio Correia10 Dec