List: Commits  « Previous Message | Next Message »
From: Andrei Elkin  Date: December 21 2010 7:35pm
Subject: bzr push into mysql-next-mr-wl5569 branch (andrei.elkin:3255)
View as plain text  
 3255 Andrei Elkin	2010-12-21 [merge]
      merging with repo

    modified:
      mysql-test/suite/funcs_1/r/is_columns_mysql.result
      scripts/mysql_system_tables.sql
      sql/rpl_info_table.cc
      sql/rpl_info_table_access.cc
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_slave.cc
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2010-12-16 21:41:45 +0000
+++ b/sql/log_event.h	2010-12-21 19:31:29 +0000
@@ -757,11 +757,11 @@ typedef struct st_print_event_info
   Such identifier is not yet unique generally as the event originating master
   is resetable. Also the crashed master can be replaced with some other.
 */
-struct event_coordinates
+typedef struct event_coordinates
 {
   char * file_name; // binlog file name (directories stripped)
   my_off_t  pos;       // event's position in the binlog file
-};
+} LOG_POS_COORD;
 
 /**
   @class Log_event

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-12-20 22:18:33 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-12-21 19:31:29 +0000
@@ -15,10 +15,14 @@ const char *info_slave_worker_fields []=
   "relay_log_pos",
   "master_log_name",
   "master_log_pos",
+
+  // todo: remove the next four
   "checkpoint_relay_log_name",
   "checkpoint_relay_log_pos",
   "checkpoint_master_log_name",
   "checkpoint_master_log_pos",
+
+  "checkpoint_seqno", // index of the last committed group in the bitmap
   "checkpoint_group_size",
   "checkpoint_group_bitmap"
 };
@@ -27,7 +31,7 @@ Slave_worker::Slave_worker(const char* t
                            Relay_log_info *rli)
   : Rpl_info_worker(type, pfs), c_rli(rli), curr_group_exec_parts(0),
   group_relay_log_pos(0), group_master_log_pos(0),
-  checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0),
+  checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0),
   inited_group_execed(0)
 {
   group_relay_log_name[0]= 0;
@@ -133,6 +137,7 @@ bool Slave_worker::read_info(Rpl_info_ha
   ulong temp_group_master_log_pos= 0;
   ulong temp_checkpoint_relay_log_pos= 0;
   ulong temp_checkpoint_master_log_pos= 0;
+  ulong temp_checkpoint_seqno= 0;
   ulong nbytes= 0;
   uchar *buffer= (uchar *) group_execed.bitmap;
 
@@ -159,6 +164,8 @@ bool Slave_worker::read_info(Rpl_info_ha
                      (char *) "") ||
       from->get_info((ulong *) &temp_checkpoint_master_log_pos,
                      (ulong) 0) ||
+      from->get_info((ulong *) &temp_checkpoint_seqno,
+                     (ulong) 0) ||
       from->get_info(&nbytes, (ulong) 0) ||
       from->get_info(buffer, (size_t) nbytes,
                      (uchar *) 0))
@@ -168,6 +175,7 @@ bool Slave_worker::read_info(Rpl_info_ha
   group_master_log_pos= temp_group_master_log_pos;
   checkpoint_relay_log_pos=  temp_checkpoint_relay_log_pos;
   checkpoint_master_log_pos= temp_checkpoint_master_log_pos;
+  checkpoint_seqno= temp_checkpoint_seqno;
 
   DBUG_RETURN(FALSE);
 }
@@ -188,6 +196,7 @@ bool Slave_worker::write_info(Rpl_info_h
       to->set_info((ulong) checkpoint_relay_log_pos) ||
       to->set_info(checkpoint_master_log_name) ||
       to->set_info((ulong) checkpoint_master_log_pos) ||
+      to->set_info((ulong) checkpoint_seqno) ||
       to->set_info(nbytes) ||
       to->set_info(buffer, (size_t) nbytes))
     DBUG_RETURN(TRUE);
@@ -224,7 +233,7 @@ bool Slave_worker::commit_positions(Log_
   }
   
   bitmap_set_bit(&group_execed, ptr_g->checkpoint_seqno);
-
+  checkpoint_seqno= ptr_g->checkpoint_seqno;
   group_relay_log_pos= ev->future_event_relay_log_pos;
   group_master_log_pos= ev->log_pos;
   strmake(group_master_log_name, c_rli->get_group_master_log_name(),

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-12-20 22:18:33 +0000
+++ b/sql/rpl_rli_pdb.h	2010-12-21 19:31:29 +0000
@@ -139,6 +139,7 @@ typedef struct st_slave_job_group
   do \
   { \
     to.worker_id= from->id; \
+    to.checkpoint_seqno= from->checkpoint_seqno; \
     to.group_master_log_pos= from->checkpoint_master_log_pos; \
     to.group_master_log_name= from->checkpoint_master_log_name; \
     to.group_relay_log_pos= from->checkpoint_relay_log_pos; \
@@ -253,10 +254,14 @@ public:
   ulonglong group_relay_log_pos;
   char group_master_log_name[FN_REFLEN];
   ulonglong group_master_log_pos;
+
+  // todo: remove
   char checkpoint_relay_log_name[FN_REFLEN];
   ulonglong checkpoint_relay_log_pos;
+
   char checkpoint_master_log_name[FN_REFLEN];
   ulonglong checkpoint_master_log_pos;
+  ulong checkpoint_seqno;
 
   int init_info();
   void end_info();

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-12-21 14:11:05 +0000
+++ b/sql/rpl_slave.cc	2010-12-21 19:34:58 +0000
@@ -3791,69 +3791,185 @@ err:
 /**
    Orders jobs by comparing relay log information.
 */
+#if 0
 int mts_recovery_cmp(Slave_job_group *id1, Slave_job_group *id2)
+#endif
+
+int mts_event_coord_cmp(LOG_POS_COORD *id1, LOG_POS_COORD *id2)
 {
-  longlong filecmp= strcmp(id1->group_relay_log_name, id2->group_relay_log_name);
-  longlong poscmp= id1->group_relay_log_pos - id2->group_relay_log_pos;
+  longlong filecmp= strcmp(id1->file_name, id2->file_name);
+  longlong poscmp= id1->pos - id2->pos;
   return (filecmp < 0  ? -1 : (filecmp > 0  ?  1 :
          (poscmp  < 0  ? -1 : (poscmp  > 0  ?  1 : 0))));
 }
 
 bool mts_recovery_groups(Relay_log_info *rli, MY_BITMAP *groups)
 { 
-  Log_event *ev= NULL, *desc= NULL;
-  char *log_name= NULL;
+  Log_event *ev= NULL; // , *desc= NULL;
+  const char *log_name= NULL;
   const char *errmsg= NULL;
   bool error= FALSE;
-  DYNAMIC_ARRAY jobs;
-  uint group_worker_counter;
-  uint group_lwm_counter;
+  DYNAMIC_ARRAY above_lwm_jobs;
   bool curr_group_seen_begin= FALSE;
   Slave_job_group job_worker;
   Slave_job_group job_file;
   IO_CACHE log;
   File file;
   MY_STAT s;
-  longlong filecmp= 0;
-  longlong poscmp= 0;
+
+  LOG_POS_COORD cp=
+  {
+    (char *) rli->get_group_master_log_name(),
+    rli->get_group_master_log_pos()
+  };
 
   DBUG_ENTER("mts_recovery_groups");
   DBUG_ASSERT(rli->recovery_parallel_workers > 0);
 
   /*
-    Gathers information on workers and stores it in jobs ordered by 
-    relay log information.
+    Gathers information on valuable workers and stores it in 
+    above_lwm_jobs in asc ordered by the master binlog coordinates.
   */
-  my_init_dynamic_array(&jobs, sizeof(Slave_job_group),
+  my_init_dynamic_array(&above_lwm_jobs, sizeof(Slave_job_group),
                         rli->recovery_parallel_workers, rli->recovery_parallel_workers);
+
   for (uint id= 0; id < rli->recovery_parallel_workers; id++)
   {
     Slave_worker *worker=
       Rpl_info_factory::create_worker(opt_worker_repository_id, id, rli);
     worker->init_info();
     retrieve_job(worker, job_file);
+    LOG_POS_COORD w_last= {worker->group_master_log_name, worker->group_master_log_pos};
+    if (mts_event_coord_cmp(&w_last, &cp) > 0)
+      insert_dynamic(&above_lwm_jobs, (uchar*) &job_file);
+    else
+      delete worker;
+  };
+
+#if 0
+  for (uint id= 0; id < rli->slave_parallel_workers; id++)
+  {
+    Slave_worker *worker=
+      Rpl_info_factory::create_worker(opt_worker_repository_id, id, rli);
+    worker->init_info();
+    retrieve_job(worker, job_file);
     /*
       This avoids gathering information on workers that haven't
       processed anything.
     */
+
+    // TODO: disregard W_i | W_i->coord < LWM
+
     if (job_file.group_relay_log_name != NULL && strcmp(job_file.group_relay_log_name, "") &&
         job_file.group_master_log_name != NULL && strcmp(job_file.group_master_log_name, ""))
-      insert_dynamic(&jobs, (uchar*) &job_file);
+      insert_dynamic(&above_lwm_jobs, (uchar*) &job_file);
   }
-  sort_dynamic(&jobs, (qsort_cmp) mts_recovery_cmp);
+#endif
 
+  sort_dynamic(&above_lwm_jobs, (qsort_cmp) mts_event_coord_cmp);
   /*
-    In what follows, the group bitmap is constructed.
+    In what follows, the group Recovery Bitmap is constructed.
+
+     seek(lwm);
+
+     while(w= next(above_lwm_w))
+       do
+         read G
+         if G == w->last_comm
+           w.B << group_cnt++;
+           RB |= w.B;
+            break;
+         else
+           group_cnt++;
+        while(!eof);
+        continue;
   */
   Format_description_log_event fdle(BINLOG_VERSION);
   if (!fdle.is_valid())
-    goto end;
+  {
+    error= TRUE;
+    goto err;
+  }
 
-  for (uint it_job= 0; it_job < jobs.elements; it_job++)
+  log_name= const_cast<Relay_log_info*>(rli)->get_group_relay_log_name();
+  if ((file= open_binlog(&log, log_name, &errmsg)) < 0)
+  {
+    error= TRUE;
+    sql_print_error("%s", errmsg);
+    goto err;
+  }
+  DBUG_ASSERT(my_stat(log_name, &s, MYF(0))); // TODO: Alfranio, why my_stat?
+
+  my_b_seek(&log, (my_off_t) rli->get_group_relay_log_pos());
+
+  bitmap_clear_all(groups);
+
+  for (uint it_job= 0, group_worker_counter= 0; it_job < above_lwm_jobs.elements; it_job++)
+  {
+    Slave_worker *w= ((Slave_job_group *)
+                      dynamic_array_ptr(&above_lwm_jobs, it_job))->worker;
+    LOG_POS_COORD w_last= { w->group_master_log_name, w->group_master_log_pos };
+
+    sql_print_information("Recoverying relay log info based on Worker-Id %lu, "
+                          "group_relay_log_name %s, group_relay_log_pos %lu "
+                          "group_master_log_name %s, group_master_log_pos %lu",
+                          w->id,
+                          w->group_relay_log_name,
+                          (ulong) w->group_relay_log_pos,
+                          w->group_master_log_name,
+                          (ulong) w->group_master_log_pos);
+
+    // TODO: extend to handle sequence of relay logs (read(ev) -> EOF)
+
+    while ((ev= Log_event::read_log_event(&log, 0, &fdle,
+                                          opt_master_verify_checksum)))
+    {
+        DBUG_ASSERT(ev->is_valid());
+        DBUG_ASSERT(group_worker_counter < rli->checkpoint_group);
+
+        // TODO: relax condition to allow --mts_exp_run_query_in_parallel= 1
+        if (ev->starts_group())
+          curr_group_seen_begin= TRUE;
+        else
+          if (ev->ends_group())
+          {
+            int ret;
+            LOG_POS_COORD ev_coord= { (char *) rli->get_group_master_log_name(),
+                                      ev->log_pos };
+            if ((ret= mts_event_coord_cmp(&ev_coord, &w_last)) == 0)
+            {
+              // hit it
+              // w.B << group_cnt++;
+              // RB |= w.B;
+              for (uint i= w->checkpoint_seqno - group_worker_counter, j= 0;
+                   i <= w->checkpoint_seqno; i++, j++)
+              {
+                //bitmap_intersect(&rli->groups, &w->group_execed);
+                if (_bitmap_is_set(&w->group_execed, i))
+                  bitmap_fast_test_and_set(groups, j);
+              }
+              group_worker_counter++;
+              delete ev;
+              ev= NULL;
+              break;
+            }
+            else
+            {
+              DBUG_ASSERT(ret < 0);
+              group_worker_counter++;
+            }
+          }
+        delete ev;
+        ev= NULL;
+    }
+  }
+
+#if 0
+  for (uint it_job= 0; it_job < above_lwm_jobs.elements; it_job++)
   {
     group_worker_counter= 0;
     group_lwm_counter= 0;
-    get_dynamic(&jobs, (uchar *) &job_worker, it_job);
+    get_dynamic(&above_lwm_jobs, (uchar *) &job_worker, it_job);
 
     sql_print_information("Recoverying relay log info based on Worker-Id %lu, "
                           "group_relay_log_name %s, group_relay_log_pos %lu "
@@ -3864,9 +3980,9 @@ bool mts_recovery_groups(Relay_log_info 
                           job_worker.group_master_log_name,
                           (ulong) job_worker.group_master_log_pos);
 
-    for (uint it_file= 0; it_file < jobs.elements; it_file++)
+    for (uint it_file= 0; it_file < above_lwm_jobs.elements; it_file++)
     {
-      get_dynamic(&jobs, (uchar *) &job_file, it_file);
+      get_dynamic(&above_lwm_jobs, (uchar *) &job_file, it_file);
 
       /*
         Either the current relay log file was already processed by the
@@ -3959,21 +4075,29 @@ end:
     desc= NULL;
   }
 
+
   if (log_name)
   {
     end_io_cache(&log);
     mysql_file_close(file, MYF(MY_WME));
     log_name= NULL;
   }
+#endif
 
-  for (uint it_job= 0; it_job < jobs.elements; it_job++)
+  end_io_cache(&log);
+  mysql_file_close(file, MYF(MY_WME));
+  log_name= NULL;
+
+err:
+  
+  for (uint it_job= 0; it_job < above_lwm_jobs.elements; it_job++)
   {
-    get_dynamic(&jobs, (uchar *) &job_worker, it_job);
+    get_dynamic(&above_lwm_jobs, (uchar *) &job_worker, it_job);
     job_worker.worker->end_info();
     delete job_worker.worker;
   }
 
-  delete_dynamic(&jobs);
+  delete_dynamic(&above_lwm_jobs);
 
   DBUG_RETURN(error);
 }


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101221193458-dha63d1348f3trsz.bundle
Thread
bzr push into mysql-next-mr-wl5569 branch (andrei.elkin:3255) - Andrei Elkin, 21 Dec