List:Commits« Previous MessageNext Message »
From:Alfranio Correia Date:September 16 2010 7:29pm
Subject:bzr commit into mysql-next-mr-rpl-merge branch (alfranio.correia:3037)
View as plain text  
#At file:///home/acorreia/workspace.sun/repository.mysql.new/bzrwork/wl-5563/mysql-next-mr.crash-safe/ based on revid:alfranio.correia@stripped

 3037 Alfranio Correia	2010-09-16
      Reported errors while getting a valid worker to execut events.

    modified:
      sql/log_event.cc
      sql/log_event.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-09-15 17:26:44 +0000
+++ b/sql/log_event.cc	2010-09-16 19:29:26 +0000
@@ -2107,7 +2107,7 @@ Log_event::continue_group(Relay_log_info
    @return index \in [0, M] range, where M is the max index of the worker pool.
 */
 
-uint Log_event::get_slave_worker_id(Relay_log_info const *rli)
+Slave_worker *Log_event::get_slave_worker_id(Relay_log_info const *rli)
 {
   if (!(get_type_code() == XID_EVENT  ||
     get_type_code() == INTVAR_EVENT ||
@@ -2117,17 +2117,15 @@ uint Log_event::get_slave_worker_id(Rela
     get_type_code() == UPDATE_ROWS_EVENT ||
     get_type_code() == WRITE_ROWS_EVENT))
   {
-    // TODO --- ANDREI, please you need to take care of errors at this point
-    // worker == NULL means that it was not possible to get an worker.
     Slave_worker *worker= get_slave_worker(get_db(), rli->workers);
-    const_cast<Relay_log_info *>(rli)->last_assigned_worker= worker->id;
+    const_cast<Relay_log_info *>(rli)->last_assigned_worker= worker;
   }
 
   return (rli->last_assigned_worker);
 }
 
-void append_item_to_jobs(struct slave_job_item *job_item,
-                          struct slave_worker *w, Relay_log_info *rli)
+void append_item_to_jobs(slave_job_item *job_item,
+                         Slave_worker *w, Relay_log_info *rli)
 {
   THD *thd= rli->info_thd;
   DBUG_ASSERT(thd == current_thd);
@@ -2142,17 +2140,6 @@ void append_item_to_jobs(struct slave_jo
     const char *old_msg;
     //   Coordinator needs to wait to avoid the pending jobs queue overrun
     rli->wait_jobs++;
-    if (rli->wait_jobs % 100 == 0)  // debug
-    {
-      ulong curr_jobs[256];
-      for (uint i= 0; i < rli->workers.elements; i++)
-      {
-        struct slave_worker *w_i;
-        get_dynamic(&rli->workers, (uchar*) &w_i, i);
-        curr_jobs[i]= w_i->curr_jobs;
-      }
-      sql_print_warning("Parallel Slave: pending jobs get to the max: pending %d, stmts (%lu, %lu), waits %lu; curr worker %lu; workers: %lu, %lu, %lu, %lu", rli->pending_jobs, rli->stmt_jobs, rli->trans_jobs, rli->wait_jobs, w->id,curr_jobs[0], curr_jobs[1], curr_jobs[2], curr_jobs[3]);
-    }
     old_msg= thd->enter_cond(&rli->pending_jobs_cond,
                                       &rli->pending_jobs_lock,
                                       "Waiting for an event from sql thread");
@@ -2193,24 +2180,21 @@ int Log_event::apply_event(Relay_log_inf
 }
 #else
 {
-  uint i_w;
   DBUG_ENTER("LOG_EVENT:apply_event");
-  struct slave_worker *w;
+  Slave_worker *w= NULL;
   struct slave_job_item *job_item= NULL;
 
   if (!rli->is_parallel_exec() || only_serial_exec())
     DBUG_RETURN(do_apply_event(rli));
+ 
+  if (!(w= get_slave_worker_id(rli)) ||
+      DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0))
+    DBUG_RETURN(TRUE);
 
-  /* mst-II TODO: to convert into hashing per db name */
-
-  i_w= get_slave_worker_id(rli);
-  get_dynamic((DYNAMIC_ARRAY*) &rli->workers, (uchar*) &w, i_w);
-
-  /* preparing task: get transport, fill it in data */
   mysql_mutex_lock(&const_cast<Relay_log_info*>(rli)->pending_jobs_lock);
   job_item= const_cast<Relay_log_info*>(rli)->free_jobs.pop();
   mysql_mutex_unlock(&const_cast<Relay_log_info*>(rli)->pending_jobs_lock);
-  DBUG_PRINT("Log_event::apply_event:", ("-> job item: %p data %p to W_%d", job_item, this, i_w));
+  DBUG_PRINT("Log_event::apply_event:", ("-> job item: %p data %p to W_%lu", job_item, this, w->id));
   if (!job_item)
   { 
     /*
@@ -2218,13 +2202,14 @@ int Log_event::apply_event(Relay_log_inf
        0. statistics 
        1. reallocate... or shrink back not letting the free jobs grow over
     */
-    job_item= (struct slave_job_item *)
+    if (!(job_item= (struct slave_job_item *)
       alloc_root(&const_cast<Relay_log_info*>(rli)->job_list_mem_root,
-                 sizeof(struct slave_job_item));
+                 sizeof(struct slave_job_item))))
+      DBUG_RETURN(TRUE);
   }
   job_item->data= this;
   append_item_to_jobs(job_item, w, const_cast<Relay_log_info*>(rli));
-  DBUG_RETURN(0);
+  DBUG_RETURN(FALSE);
 }
 
 #endif
@@ -2236,7 +2221,7 @@ int Log_event::apply_event(Relay_log_inf
    @return NULL failure or
            a-pointer to an item.
 */
-struct slave_job_item* pop_jobs_item(struct slave_worker *w)
+struct slave_job_item* pop_jobs_item(Slave_worker *w)
 {
   struct slave_job_item *job_item= NULL;
   THD *thd= w->thd;
@@ -2270,7 +2255,7 @@ struct slave_job_item* pop_jobs_item(str
   @return 0 success 
          -1 got killed or an error happened during appying
 */
-int slave_worker_exec_job(struct slave_worker *w, Relay_log_info *rli)
+int slave_worker_exec_job(Slave_worker *w, Relay_log_info *rli)
 {
   int error= 0;
   struct slave_job_item *job_item;

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2010-09-15 01:39:03 +0000
+++ b/sql/log_event.h	2010-09-16 19:29:26 +0000
@@ -623,6 +623,7 @@ class THD;
 
 class Format_description_log_event;
 class Relay_log_info;
+struct Slave_worker;
 
 #ifdef MYSQL_CLIENT
 enum enum_base64_output_mode {
@@ -1157,7 +1158,7 @@ public:
              to be assigned worker;
              M is the max index of the worker pool.
   */
-  uint get_slave_worker_id(Relay_log_info const *rli);
+  Slave_worker *get_slave_worker_id(Relay_log_info const *rli);
 
   /**
      Apply the event to the database.

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-09-15 17:26:44 +0000
+++ b/sql/rpl_rli.cc	2010-09-16 19:29:26 +0000
@@ -97,8 +97,11 @@ Relay_log_info::Relay_log_info(bool is_s
   */
   slave_pending_jobs_max= 50000; // TODO: convert into a param
 
-  // TODO -- ANDREI --- You need to move this to a init routine because it may fail.
-  my_init_dynamic_array(&workers, sizeof(struct slave_worker *), slave_parallel_workers, 4);
+  //
+  // TODO -- ANDREI --- You need to take care of possible failures related to
+  //                    the dynamic array.
+  //
+  my_init_dynamic_array(&workers, sizeof(Slave_worker *), slave_parallel_workers, 4);
   init_hash_workers(slave_parallel_workers);
 
   DBUG_VOID_RETURN;
@@ -114,9 +117,10 @@ Relay_log_info::~Relay_log_info()
 
   mysql_mutex_destroy(&pending_jobs_lock);
   mysql_cond_destroy(&pending_jobs_cond);
-
+  //
   // ANDREI ---- See comment on the init routine in the constructor.
   //free_root(&jobs_mem_root, MYF(0));
+  //
   destroy_hash_workers();
   free_root(&job_list_mem_root, MYF(0));
   delete_dynamic(&workers);

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-09-15 17:26:44 +0000
+++ b/sql/rpl_rli.h	2010-09-16 19:29:26 +0000
@@ -39,7 +39,7 @@ struct slave_job_item
   void *data;
 };
 
-typedef struct slave_worker
+typedef struct Slave_worker
 {
   // operational
   THD *thd;
@@ -320,7 +320,7 @@ public:
   mysql_mutex_t pending_jobs_lock;
   mysql_cond_t pending_jobs_cond;
   int   slave_pending_jobs_max;
-  ulong  last_assigned_worker; // a hint to partitioning func for some events
+  Slave_worker  *last_assigned_worker; // a hint to partitioning func for some events
   /*
     Keeper of `free_jobs'. Items in `free_jobs' migrate to workers' assignement
     private queues, processed and become back free. At the end of event execution

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-09-15 17:26:44 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-09-16 19:29:26 +0000
@@ -5,7 +5,8 @@
 #include "sql_string.h"
 #include <hash.h>
 
-HASH mapping_db_to_worker;
+static HASH mapping_db_to_worker;
+static bool inited_hash_workers= FALSE;
 
 extern "C" uchar *get_key(const uchar *record, size_t *length,
                           my_bool not_used __attribute__((unused)))
@@ -35,29 +36,35 @@ static void free_entry(db_worker *entry)
 }
 
 
-int init_hash_workers(ulong slave_parallel_workers)
+bool init_hash_workers(ulong slave_parallel_workers)
 {
   DBUG_ENTER("init_hash_workers");
-  DBUG_RETURN (my_hash_init(&mapping_db_to_worker, &my_charset_bin,
-                            0, 0, 0, get_key,
-                            (my_hash_free_key) free_entry, 0) != 0);
+  inited_hash_workers=
+    (my_hash_init(&mapping_db_to_worker, &my_charset_bin,
+                 0, 0, 0, get_key,
+                 (my_hash_free_key) free_entry, 0) == 0);
+  DBUG_RETURN (!inited_hash_workers);
 }
 
 void destroy_hash_workers()
 {
   DBUG_ENTER("destroy_hash_workers");
-  my_hash_free(&mapping_db_to_worker);
+  if (inited_hash_workers)
+    my_hash_free(&mapping_db_to_worker);
   DBUG_VOID_RETURN;
 }
 
 Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers)
 {
+  DBUG_ENTER("get_slave_worker");
+
+  if (!inited_hash_workers)
+    DBUG_RETURN(NULL);
+
   db_worker *entry= NULL;
   my_hash_value_type hash_value;
   uint dblength= (uint) strlen(dbname);
 
-  DBUG_ENTER("get_slave_worker");
-
   DBUG_PRINT("info", ("Searching for %s, %d", dbname, dblength));
   /*
     The database name was not found which means that a worker never

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-09-15 17:26:44 +0000
+++ b/sql/rpl_rli_pdb.h	2010-09-16 19:29:26 +0000
@@ -9,7 +9,7 @@
     Slave_worker *worker;
   } typedef db_worker;
 
-  int init_hash_workers(ulong slave_parallel_workers);
+  bool init_hash_workers(ulong slave_parallel_workers);
   void destroy_hash_workers();
   Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers);
   Slave_worker *get_free_worker(DYNAMIC_ARRAY workers);

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-09-15 17:26:44 +0000
+++ b/sql/rpl_slave.cc	2010-09-16 19:29:26 +0000
@@ -165,7 +165,7 @@ static int terminate_slave_thread(THD *t
                                   volatile uint *slave_running,
                                   bool skip_lock);
 static bool check_io_slave_killed(THD *thd, Master_info *mi, const char *info);
-int slave_worker_exec_job(struct slave_worker * w, Relay_log_info *rli);
+int slave_worker_exec_job(Slave_worker * w, Relay_log_info *rli);
 
 /*
   Find out which replications threads are running
@@ -3402,7 +3402,7 @@ int check_temp_dir(char* tmp_file)
 pthread_handler_t handle_slave_worker(void *arg)
 {
   THD *thd;                     /* needs to be first for thread_stack */
-  struct slave_worker *w;
+  Slave_worker *w;
   int error= 0;
 
   Relay_log_info* rli = ((Master_info*)arg)->rli;
@@ -3463,7 +3463,7 @@ int slave_start_workers(Relay_log_info *
   int error= 0;
   for (i= 0; i < n; i++)
   {
-    struct slave_worker *w= new struct slave_worker;
+    Slave_worker *w= new Slave_worker;
     w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
     w->id= i;
     w->current_table= NULL;
@@ -3477,7 +3477,7 @@ int slave_start_workers(Relay_log_info *
   for (i= 0; i < rli->workers.elements; i++)
   {
     pthread_t th;
-    struct slave_worker *w;
+    Slave_worker *w;
     get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, i);
     mysql_mutex_lock(&rli->pending_jobs_lock);
     rli->pending_jobs= i + 1;
@@ -3505,7 +3505,7 @@ void slave_stop_workers(Relay_log_info *
   int i;
   for (i= rli->workers.elements - 1; i >= 0; i--)
   {
-    struct slave_worker *w;
+    Slave_worker *w;
     get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, i);
     mysql_mutex_lock(&rli->pending_jobs_lock);
     rli->pending_jobs= 1;


Attachment: [text/bzr-bundle] bzr/alfranio.correia@oracle.com-20100916192926-lyvc290hy1t9vlw1.bundle
Thread
bzr commit into mysql-next-mr-rpl-merge branch (alfranio.correia:3037) Alfranio Correia16 Sep