List: Commits
From: Andrei Elkin
Date: November 25 2010 9:14am
Subject: bzr push into mysql-next-mr.crash-safe branch (andrei.elkin:3217) WL#5569 WL#5599
 3217 Andrei Elkin	2010-11-25 [merge]
      WL#5569
      merging with the WL#5599 piece of code

    added:
      mysql-test/suite/rpl/t/rpl_parallel-slave.opt
      mysql-test/suite/rpl/t/rpl_parallel_start_stop-slave.opt
      sql/rpl_info_dummy.cc
      sql/rpl_info_dummy.h
    modified:
      sql/CMakeLists.txt
      sql/Makefile.am
      sql/mysqld.cc
      sql/rpl_info.cc
      sql/rpl_info.h
      sql/rpl_info_factory.cc
      sql/rpl_mi.cc
      sql/rpl_mi.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_start_stop.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test	2010-11-20 17:23:42 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test	2010-11-25 08:47:39 +0000
@@ -79,8 +79,24 @@ CREATE TABLE t1 (a int primary key);
 
 insert into t1 values (1),(2);
 
-#connection slave;
-sync_slave_with_master;
+#
+# todo: remove when recovery recovers `sync_slave_with_master'
+#
+
+--sleep 3
+
+--disable_result_log
+--disable_query_log
+#select sleep(600);
+--enable_result_log
+--enable_query_log
+
+connection slave;
+# sync_slave_with_master;
+let $count= 2;
+let $table= t1;
+source include/wait_until_rows_count.inc;
+
 # create an offending record
 insert into t1 values (3);
 
@@ -95,6 +111,12 @@ let $count= 0;
 let $table= worker_proc_list;
 source include/wait_until_rows_count.inc;
 
+--disable_result_log
+--disable_query_log
+#select sleep(600);
+--enable_result_log
+--enable_query_log
+
 source include/wait_for_slave_sql_to_stop.inc;
 delete from t1;
 
@@ -105,8 +127,14 @@ source include/start_slave.inc;
 connection master;
 drop table t1;
 
-#connection slave;
-sync_slave_with_master;
+#
+# todo: remove when recovery recovers `sync_slave_with_master'
+#
+
+--sleep 3
+
+connection slave;
+#sync_slave_with_master;
 
 drop view worker_proc_list;
 drop view coord_proc_list;

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-11-22 18:57:13 +0000
+++ b/sql/log_event.cc	2010-11-25 08:47:39 +0000
@@ -2202,52 +2202,107 @@ bool Log_event::contains_partition_info(
 Slave_worker *Log_event::get_slave_worker_id(Relay_log_info const *rli)
 {
   Slave_worker *worker= NULL;
-  /* checking properties and perform corresponding actions */
+  Slave_job_group g;
+  bool is_b_event;
 
-  // g
-  if (contains_partition_info())
-  {
-    // a lot of things inside `get_slave_worker_id'
-    worker= get_slave_worker(get_db(), rli->workers /*, W_c */);
-    const_cast<Relay_log_info *>(rli)->last_assigned_worker= worker;
-  }
+  /* checking properties and perform corresponding actions */
 
-  // B
-  if (starts_group())
+  // B or a DDL
+  if ((is_b_event= starts_group()) || !rli->curr_group_seen_begin)
   {
     // insert {NULL, W_s} to yield a new Group_cnt indexed element in GAQ
     // Group_cnt= rli->gaq->e;
     // rli->gaq->en_queue({NULL, W_s}); 
-    Slave_job_group gaq_item=
-      {
-        //{NULL, log_pos},
-        log_pos,
+    g= {
+      log_pos,
+      (ulong) -1,
+      const_cast<Relay_log_info*>(rli)->mts_total_groups++
+    };
 
-        worker->id,   // todo: -> NULL and implement set_dynamic in DA
-        
-        const_cast<Relay_log_info*>(rli)->mts_total_groups++};
-
-    rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &gaq_item);
+    // the last occupied GAQ's array index
+    rli->gaq->assigned_group_index= rli->gaq->en_queue((void *) &g);
 
     DBUG_ASSERT(rli->gaq->assigned_group_index != (ulong) -1); // gaq must have room
+    DBUG_ASSERT(rli->last_assigned_worker == NULL);
+    if (is_b_event)
+    {
+      Log_event *ptr_curr_ev= this;
+      // B-event is appended to the Deferred Array associated with GCAP
+      insert_dynamic(&const_cast<Relay_log_info*>(rli)->curr_group_da,
+                     (uchar*) &ptr_curr_ev);
+
+      DBUG_ASSERT(rli->curr_group_da.elements == 1);
+
+      // mark the current group as started with B-event
+      const_cast<Relay_log_info*>(rli)->curr_group_seen_begin= is_b_event;
+      return NULL;
+    } 
+    else 
+    { 
+      DBUG_ASSERT(!rli->curr_group_seen_begin);
+    }
+  }
+
+  //else // g
 
-    // B-event is appended to the Deferred Array associated with GCAP
-    // TODO: refine  contains_partition_info() to not include BEGIN
+  if (contains_partition_info())
+  {
+    // a lot of things inside `get_slave_worker_id'
+    const_cast<Relay_log_info *>(rli)->last_assigned_worker=
+      worker= get_slave_worker(get_db(), const_cast<Relay_log_info *>(rli));
+    get_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
+    if (g.worker_id == (ulong) -1)  // assign "officially" the current group
+    {
+      g.worker_id= worker->id;       // todo/fixme: think of Slave_worker* here
+      set_dynamic(&rli->gaq->Q, (uchar*) &g, rli->gaq->assigned_group_index);
+    }
   }
+  else // r
+    if (rli->last_assigned_worker)
+    {
+      worker= rli->last_assigned_worker;
+      
+      DBUG_ASSERT(rli->curr_group_assigned_parts.elements > 0); // g must've done
+    }
+    else // p
+    {
+      Log_event *ptr_curr_ev= this;
+
+      // TODO: assert possible event types
+
+      insert_dynamic(&const_cast<Relay_log_info*>(rli)->curr_group_da,
+                     (uchar*) &ptr_curr_ev);
+      
+      DBUG_ASSERT(rli->curr_group_da.elements > 0);
+    }
 
   // T
-  if (ends_group())
+  if (ends_group() || !rli->curr_group_seen_begin)
   {
+    uint i;
     //  assert (\exists P_k . W_s \in CGAP) if P_k is present in ev
     mts_group_cnt= rli->gaq->assigned_group_index;
+    
+    DBUG_ASSERT(worker == rli->last_assigned_worker);
+    if (!worker)
+    {
+      // a very special case of the empty group: {B, T}
+      DBUG_ASSERT(rli->curr_group_assigned_parts.elements == 0
+                  && rli->curr_group_da.elements == 1);
+      worker= get_slave_worker("", const_cast<Relay_log_info *>(rli));
+    }
+    
+    // CGAP cleanup
+    for (i= rli->curr_group_assigned_parts.elements; i > 0; i--)
+      delete_dynamic_element(&const_cast<Relay_log_info*>(rli)->
+                             curr_group_assigned_parts, i - 1);
+    const_cast<Relay_log_info*>(rli)->last_assigned_worker= NULL;
 
-    // *TODO* cleanup: CGAP := nil
+    // reset the B-group marker
+    const_cast<Relay_log_info*>(rli)->curr_group_seen_begin= FALSE;
   }
-
-  // todo: p. the first p-event clears CGAP. Each p-event is appened to DA.
-
-  // r
-  return (rli->last_assigned_worker);
+  
+  return worker;
 }
 
 static int en_queue(Slave_jobs_queue *jobs, Slave_job_item *item)
@@ -2385,29 +2440,45 @@ void append_item_to_jobs(slave_job_item 
 */
 int Log_event::apply_event(Relay_log_info const *rli)
 {
+  uint i;
   DBUG_ENTER("LOG_EVENT:apply_event");
   Slave_worker *w= NULL;
   Slave_job_item item= {NULL}, *job_item= &item;
+  Relay_log_info *c_rli= const_cast<Relay_log_info*>(rli);  // constless alias
 
   if (!rli->is_parallel_exec() || only_serial_exec() /* || wait(APH.N == 0) */)
     DBUG_RETURN(do_apply_event(rli));
  
-  if (!(w= get_slave_worker_id(rli)) ||
-      DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0))
-    DBUG_RETURN(TRUE);
+  if ((!(w= get_slave_worker_id(rli)) ||
+       DBUG_EVALUATE_IF("fault_injection_get_slave_worker", 1, 0)))
+    DBUG_RETURN(rli->curr_group_assigned_parts.elements == 0 ? FALSE : TRUE);
 
   job_item->data= this;
 
   DBUG_PRINT("Log_event::apply_event:", ("-> job item data %p to W_%lu", job_item->data, w->id));
 
-  // TODO: MTS-APH  DA (BEGIN)
-  // for (i= 0; i < CGDA.elements; i++)
-  // {
-  //   append_item_to_jobs(job_item, CGDA[i], const_cast<Relay_log_info*>(rli));
-  // }
-  // todo: resize if elements > min
-  // CGDA.elements= 0;
-  append_item_to_jobs(job_item, w, const_cast<Relay_log_info*>(rli));
+  if (rli->curr_group_da.elements > 0)
+  {
+    // the current event sorted out which partition the current group belongs to.
+    // It's time now to process the deferred array events.
+    for (i= 0; i < rli->curr_group_da.elements; i++)
+    { 
+      Slave_job_item da_item;
+      get_dynamic(&c_rli->curr_group_da, (uchar*) &da_item.data, i);
+      append_item_to_jobs(&da_item, w, c_rli);
+    }
+    
+    if (rli->curr_group_da.elements > rli->curr_group_da.max_element)
+    {
+      DBUG_ASSERT(rli->curr_group_da.max_element < rli->curr_group_da.elements);
+      c_rli->curr_group_da.elements= rli->curr_group_da.max_element;
+      c_rli->curr_group_da.max_element= 0;
+      freeze_size(&c_rli->curr_group_da); // restores max_element
+    }
+    c_rli->curr_group_da.elements= 0;     // to keep max_element-based allocation
+  }
+
+  append_item_to_jobs(job_item, w, c_rli);
   DBUG_RETURN(FALSE);
 }
 
@@ -2486,20 +2557,42 @@ int slave_worker_exec_job(Slave_worker *
 
   if (ev->starts_group())
   {
-    // ... ? do nothing ?
-  }
-  if (ev->contains_partition_info())
-  {
-    // TODO: w->CGEP .= ev->get_db()
-  }
-  if (ev->ends_group())
+    w->curr_group_seen_begin= TRUE; // The current group is started with B-event
+  } 
+  else
   {
-    w->slave_worker_ends_group(ev->mts_group_cnt);
+    if (ev->contains_partition_info())
+    {
+      DYNAMIC_ARRAY *ep= &w->curr_group_exec_parts;
+      uint i;
+      char key[NAME_LEN + 2];
+      bool found= FALSE;
+      const char *dbname= ev->get_db();
+      uchar dblength= (uint) strlen(dbname);
+      
+      for (i= 0; i < ep->elements && !found; i++)
+      {
+        get_dynamic(ep, (uchar*) key, i);
+        found=
+          (key[0] == dblength) &&
+          (strncmp(key + 1, const_cast<char*>(dbname), dblength) == 0);
+      }
+      if (!found)
+      {
+        key[0]= dblength;
+        memcpy(key + 1, dbname, dblength + 1);
+        insert_dynamic(ep, (uchar*) key);
+      }
+    }
   }
-
   w->w_rli->set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
   error= ev->do_apply_event(rli);
 
+  if (ev->ends_group() || !w->curr_group_seen_begin)
+  {
+    w->slave_worker_ends_group(ev->mts_group_cnt, error); /* last done sets post exec */
+  }
+
   mysql_mutex_lock(&w->jobs_lock);
   de_queue(&w->jobs, job_item);
   /*
@@ -2507,7 +2600,8 @@ int slave_worker_exec_job(Slave_worker *
     todo: convert update_pos(w->w_rli) -> update_pos(w)
           to remove w_rli w/a
   */
-  ev->update_pos(w->w_rli);
+  if (!error)
+    ev->update_pos(w->w_rli);
 
   // delete ev;  // after ev->update_pos() event is garbage
 

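[Editor's note] As an aside, the B/g/p/r/T routing that get_slave_worker_id() implements in the
log_event.cc hunk above can be summarized with a small standalone C++11 sketch. The names below
(EventKind, Scheduler, assign, worker_for_db) are invented purely for illustration and are not
part of the server source; the empty-group {B, T} special case, the GAQ bookkeeping and the APH
lookup are all omitted, so this is a hedged outline of the decision flow, not the implementation.

    // Hedged sketch only -- simplified state, no GAQ, no APH.
    #include <string>
    #include <vector>

    enum class EventKind { Begin, Partitioned, Rest, Terminal };

    struct Worker { unsigned long id; };

    struct Scheduler
    {
      std::vector<const void*> deferred;        // stand-in for rli->curr_group_da
      std::vector<std::string> assigned_parts;  // stand-in for CGAP
      Worker *last_assigned= nullptr;
      bool seen_begin= false;

      // Returns the Worker the event is routed to, or nullptr if it is deferred.
      Worker* assign(EventKind kind, const std::string &db, const void *ev,
                     Worker *worker_for_db)
      {
        if (kind == EventKind::Begin)           // B: open the group, defer the event
        {
          deferred.push_back(ev);
          seen_begin= true;
          return nullptr;
        }
        if (kind == EventKind::Partitioned)     // g: choose the Worker by partition (db)
        {
          last_assigned= worker_for_db;
          assigned_parts.push_back(db);
          deferred.clear();                     // deferred events now follow this Worker
          return worker_for_db;
        }
        if (kind == EventKind::Rest)
        {
          if (last_assigned)                    // r: follow the group's Worker
            return last_assigned;
          deferred.push_back(ev);               // p: partition not known yet, defer
          return nullptr;
        }
        Worker *w= last_assigned;               // T: close the group, reset per-group state
        assigned_parts.clear();
        last_assigned= nullptr;
        seen_begin= false;
        return w;
      }
    };
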
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-11-23 09:03:37 +0000
+++ b/sql/rpl_rli.cc	2010-11-25 09:03:54 +0000
@@ -68,7 +68,7 @@ Relay_log_info::Relay_log_info(bool is_s
    until_log_pos(0), retried_trans(0),
    tables_to_lock(0), tables_to_lock_count(0),
    rows_query_ev(NULL), last_event_start_time(0),
-   this_worker(NULL),
+   this_worker(NULL), slave_worker_is_error(NULL),
    sql_delay(0), sql_delay_end(0),
    m_flags(0)
 {
@@ -147,7 +147,7 @@ Relay_log_info::~Relay_log_info()
 /**
    The method can be run both by C having the Main (coord) rli context and
    by W having both the main and the Local (worker) rli context.
-   Decistion matrix:
+   Decision matrix:
 
        Main   Local
      -+-------------

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-11-23 09:03:37 +0000
+++ b/sql/rpl_rli.h	2010-11-25 09:03:54 +0000
@@ -428,12 +428,22 @@ public:
   Slave_worker  *last_assigned_worker; // a hint to partitioning func for some events
   Slave_committed_queue *gaq;
   DYNAMIC_ARRAY curr_group_assigned_parts; // CGAP
+  DYNAMIC_ARRAY curr_group_da;   // deferred array to hold part-info-free events
+  bool curr_group_seen_begin;     // current group started with B-event or not
+  volatile Slave_worker* slave_worker_is_error;
 
   Slave_worker* get_current_worker() const;
   Slave_worker* set_this_worker(Slave_worker *w) { return this_worker= w; }
   Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
   ulonglong mts_total_groups; // total event groups distributed in current session
-  ulong least_occupied_worker; // ... todo: APH ...
+
+  /*
+     A sorted array of the Workers' current assignment counts, giving an
+     approximate view of Worker load.
+     The first row (the least occupied Worker) is queried when assigning
+     a new partition. It is updated at checkpoint commit to the main RLI.
+  */
+  DYNAMIC_ARRAY least_occupied_workers;
 
   /**
     Helper function to do after statement completion.

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-11-19 14:51:58 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-11-25 08:47:39 +0000
@@ -155,6 +155,12 @@ size_t Slave_worker::get_number_worker_f
 static HASH mapping_db_to_worker;
 static bool inited_hash_workers= FALSE;
 
+PSI_mutex_key key_mutex_slave_worker_hash;
+PSI_cond_key key_cond_slave_worker_hash;
+static  mysql_mutex_t slave_worker_hash_lock;
+static  mysql_cond_t slave_worker_hash_cond;
+
+
 extern "C" uchar *get_key(const uchar *record, size_t *length,
                           my_bool not_used __attribute__((unused)))
 {
@@ -175,17 +181,20 @@ static void free_entry(db_worker *entry)
 
   DBUG_PRINT("info", ("free_entry %s, %d", entry->db, strlen(entry->db)));
 
-  entry->worker->usage_partition--;
   my_free((void *) entry->db);
   my_free(entry);
 
   DBUG_VOID_RETURN;
 }
 
-
 bool init_hash_workers(ulong slave_parallel_workers)
 {
   DBUG_ENTER("init_hash_workers");
+
+  mysql_mutex_init(key_mutex_slave_worker_hash, &slave_worker_hash_lock,
+                   MY_MUTEX_INIT_FAST);
+  mysql_cond_init(key_cond_slave_worker_hash, &slave_worker_hash_cond, NULL);
+
   inited_hash_workers=
     (my_hash_init(&mapping_db_to_worker, &my_charset_bin,
                  0, 0, 0, get_key,
@@ -198,6 +207,9 @@ void destroy_hash_workers()
   DBUG_ENTER("destroy_hash_workers");
   if (inited_hash_workers)
     my_hash_free(&mapping_db_to_worker);
+  mysql_mutex_destroy(&slave_worker_hash_lock);
+  mysql_cond_destroy(&slave_worker_hash_cond);
+
   DBUG_VOID_RETURN;
 }
 
@@ -217,7 +229,8 @@ void destroy_hash_workers()
 
         W_d := W_c unless W_c is NULL.
 
-   When W_c is NULL it is assigned to a least occupied,
+   When W_c is NULL it is assigned to the least occupied Worker as defined by
+   @c get_least_occupied_worker().
 
         W_d := W_c := W_{least_occupied}
 
@@ -241,8 +254,12 @@ void destroy_hash_workers()
 
    @return the pointer to a Worker struct 
 */
-Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers /*, W_c */)
+Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli)
 {
+  uint i;
+  char key[NAME_LEN + 2];
+  DYNAMIC_ARRAY *workers= &rli->workers;
+
   DBUG_ENTER("get_slave_worker");
 
   if (!inited_hash_workers)
@@ -250,7 +267,21 @@ Slave_worker *get_slave_worker(const cha
 
   db_worker *entry= NULL;
   my_hash_value_type hash_value;
-  uint dblength= (uint) strlen(dbname);
+  uchar dblength= (uint) strlen(dbname);
+
+  // Search in CGAP
+  for (i= 0; i < rli->curr_group_assigned_parts.elements; i++)
+  {
+    get_dynamic(&rli->curr_group_assigned_parts, (uchar*) key, i);
+    if ((uchar) key[0] != dblength)
+      continue;
+    else
+      if (strncmp(key + 1, const_cast<char*>(dbname), dblength) == 0)
+        DBUG_RETURN(rli->last_assigned_worker);
+  }
+  key[0]= dblength;
+  memcpy(key + 1, dbname, dblength + 1);
+  insert_dynamic(&rli->curr_group_assigned_parts, (uchar*) key);
 
   DBUG_PRINT("info", ("Searching for %s, %d", dbname, dblength));
   /*
@@ -262,10 +293,17 @@ Slave_worker *get_slave_worker(const cha
 
   hash_value= my_calc_hash(&mapping_db_to_worker, (uchar*) dbname,
                            dblength);
-  if (!(entry= (db_worker *)
-      my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
-      (uchar*) dbname, dblength)))
+
+  mysql_mutex_lock(&slave_worker_hash_lock);
+
+  entry= (db_worker *)
+    my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
+                                    (uchar*) dbname, dblength);
+  if (!entry)
   {
+    my_bool ret;
+    mysql_mutex_unlock(&slave_worker_hash_lock);
+
     DBUG_PRINT("info", ("Inserting %s, %d", dbname, dblength));
     /*
       Allocate an entry to be inserted and if the operation fails
@@ -281,15 +319,18 @@ Slave_worker *get_slave_worker(const cha
     }
     strmov(db, dbname);
     entry->db= db;
+    entry->usage= 1;
     /*
-      Get a free worker based on a policy described in the function
-      get_free_worker().
+      Unless \exists the last assigned Worker, get a free worker based
+      on a policy described in the function get_least_occupied_worker().
     */
-    Slave_worker *worker= get_free_worker(workers);
-    entry->worker= worker;
-    worker->usage_partition++;
+    entry->worker= !rli->last_assigned_worker ?
+      get_least_occupied_worker(workers) : rli->last_assigned_worker;
 
-    if (my_hash_insert(&mapping_db_to_worker, (uchar*) entry))
+    mysql_mutex_lock(&slave_worker_hash_lock);
+    ret= my_hash_insert(&mapping_db_to_worker, (uchar*) entry);
+    mysql_mutex_unlock(&slave_worker_hash_lock);
+    if (ret)
     {
       my_free(db);
       my_free(entry);
@@ -298,6 +339,45 @@ Slave_worker *get_slave_worker(const cha
     }
     DBUG_PRINT("info", ("Inserted %s, %d", entry->db, (int) strlen(entry->db)));
   }
+  else
+  {
+    /* There is a record: handle one of the three cases below. */
+    if (entry->usage == 0)
+    {
+      entry->worker= !rli->last_assigned_worker ? 
+        get_least_occupied_worker(workers) : rli->last_assigned_worker;
+      entry->usage++;
+      my_hash_update(&mapping_db_to_worker, (uchar*) entry,
+                     (uchar*) dbname, dblength);
+    }
+    else if (entry->worker == rli->last_assigned_worker ||
+             !rli->last_assigned_worker)
+    {
+
+      DBUG_ASSERT(entry->worker);
+
+      entry->usage++;
+      my_hash_update(&mapping_db_to_worker, (uchar*) entry,
+                     (uchar*) dbname, dblength);
+    }
+    else  // possibly a hashing conflict
+    {
+      DBUG_ASSERT(rli->last_assigned_worker == NULL ||
+                  rli->curr_group_assigned_parts.elements > 1);
+
+      DBUG_ASSERT(0); // ... TODO ... *not* ready yet 
+
+      // future assignment and marking at the same time
+      entry->worker= rli->last_assigned_worker;
+
+      wait();
+
+      DBUG_ASSERT(entry->usage == 0);
+      entry->usage= 1;
+
+    }
+    mysql_mutex_unlock(&slave_worker_hash_lock);
+  }
 
 err:
   if (entry)
@@ -306,15 +386,18 @@ err:
   DBUG_RETURN(entry ? entry->worker : NULL);
 }
 
-Slave_worker *get_free_worker(DYNAMIC_ARRAY workers)
+/**
+   least_occupied in partition number sense.
+*/
+Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *ws)
 {
   ulong usage= ULONG_MAX;
   Slave_worker *current_worker= NULL, *worker= NULL;
   ulong i= 0;
 
-  for (i= 0; i< workers.elements; i++)
+  for (i= 0; i< ws->elements; i++)
   {
-    get_dynamic(const_cast<DYNAMIC_ARRAY*>(&workers), (uchar*) &current_worker, i);
+    get_dynamic(ws, (uchar*) &current_worker, i);
     if (current_worker->usage_partition <= usage)
     {
       worker= current_worker;
@@ -323,6 +406,11 @@ Slave_worker *get_free_worker(DYNAMIC_AR
   }
   
   DBUG_ASSERT(worker != NULL);
+
+  worker->usage_partition++;
+
+  DBUG_ASSERT(worker->usage_partition != 0);
+
   return(worker);
 }
 
@@ -332,18 +420,58 @@ Slave_worker *get_free_worker(DYNAMIC_AR
 
    Affected by the being committed group APH tuples are updated.
    @c last_group_done_index member is set to the arg value.
-   for each D_i in CGEP
-       assert (W_id == APH.W_id|P_d == D_i)
-       update APH set U-- where P_d = D_i
-       if U == 0 \and count(APH) > max 
-          delete from APH where U = 0
-       
-    CGEP the Worker partition cache is cleaned up.
+
+   CGEP, the Worker partition cache, is cleaned up.
+
+   TODO: reclaim space if the actual size exceeds the limit.
 */
 
-void Slave_worker::slave_worker_ends_group(ulong gaq_idx)
+void Slave_worker::slave_worker_ends_group(ulong gaq_idx, int error)
 {
-  last_group_done_index = gaq_idx;
+  uint i;
+
+  if (!error)
+    last_group_done_index = gaq_idx;
+
+  for (i= curr_group_exec_parts.elements; i > 0; i--)
+  {
+    db_worker *entry= NULL;
+    my_hash_value_type hash_value;
+    char key[NAME_LEN + 2];
+
+    get_dynamic(&curr_group_exec_parts, (uchar*) key, i - 1);
+    hash_value= my_calc_hash(&mapping_db_to_worker, (uchar*) key + 1, key[0]);
+    
+    mysql_mutex_lock(&slave_worker_hash_lock);
+
+    entry= (db_worker *)
+      my_hash_search_using_hash_value(&mapping_db_to_worker, hash_value,
+                                      (uchar*) key + 1, key[0]);
+
+    DBUG_ASSERT(entry && entry->usage != 0 && entry->worker == this);
+
+    DBUG_ASSERT(strlen(key + 1) == (uchar) key[0]);
+
+    entry->usage--;
+    my_hash_update(&mapping_db_to_worker, (uchar*) entry,
+                   (uchar*) key + 1, key[0]);
+
+    if (entry->usage == 0)
+      usage_partition--;
+    else
+      DBUG_ASSERT(usage_partition != 0);
+    /*
+     TODO:
+       if U == 0 \and count(APH) > max 
+          delete from APH where U = 0;
+          delete entry;
+    */  
+
+    mysql_mutex_unlock(&slave_worker_hash_lock);
+    
+    delete_dynamic_element(&curr_group_exec_parts, i - 1);
+  }
+  curr_group_seen_begin= FALSE;
 }
 
 
@@ -476,6 +604,8 @@ ulong Slave_committed_queue::move_queue_
     Slave_job_group g;
     ulong l;
     get_dynamic(&Q, (uchar *) &g, i);
+    if (g.worker_id == (ulong) -1)
+      break; /* the head is not even assigned */
     get_dynamic(ws, (uchar *) &w_i, g.worker_id);
     get_dynamic(&last_done, (uchar *) &l, w_i->id);
 
@@ -491,6 +621,16 @@ ulong Slave_committed_queue::move_queue_
     {
       ulong ind= de_queue((uchar*) &lwm);
 
+      /* todo/fixme: sorting out the least occupied Worker can be triggered here */
+      /* e.g 
+         set_dynamic(&w_id->c_rli->least_occupied_worker, &w_i->Q.len, w_i->id);
+         sort_dynamic(&w_id->c_rli->least_occupied_worker, (qsort_cmp) ulong_cmp);
+                  
+         int ulong_cmp(ulong *id1, ulong *id2)
+         {
+            return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0);
+         }
+      */
       DBUG_ASSERT(ind == i);
       DBUG_ASSERT(g.total_seqno == lwm.total_seqno);
 

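[Editor's note] For reference, the CGAP/CGEP entries manipulated in the rpl_rli_pdb.cc hunks above
use a small fixed layout: byte 0 carries the db-name length and bytes 1.. carry the NUL-terminated
name, which is why lookups first compare key[0] and then strncmp() the tail. Below is a minimal
standalone C++ sketch of that encoding; it is not MySQL code, and NAME_LEN here is a placeholder
value rather than the server's definition.

    // Illustration only; NAME_LEN here is a placeholder, not the server's value.
    #include <cstdio>
    #include <cstring>

    enum { NAME_LEN= 64 };

    // Encode a CGAP/CGEP entry: length byte first, then the NUL-terminated name.
    static void make_key(char key[NAME_LEN + 2], const char *dbname)
    {
      unsigned char len= (unsigned char) std::strlen(dbname);
      key[0]= (char) len;
      std::memcpy(key + 1, dbname, len + 1);    // copy the trailing NUL too
    }

    // Match the way get_slave_worker() scans CGAP: length first, then the name.
    static bool key_matches(const char *key, const char *dbname)
    {
      unsigned char len= (unsigned char) std::strlen(dbname);
      return (unsigned char) key[0] == len &&
             std::strncmp(key + 1, dbname, len) == 0;
    }

    int main()
    {
      char key[NAME_LEN + 2];
      make_key(key, "test");
      std::printf("match: %d\n", key_matches(key, "test"));   // prints 1
      return 0;
    }
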
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-11-23 09:03:37 +0000
+++ b/sql/rpl_rli_pdb.h	2010-11-25 09:03:54 +0000
@@ -6,16 +6,24 @@
 #include "rpl_rli.h"
 #include <my_sys.h>
 
+/* APH entry */
 struct db_worker
 {
   const char *db;
   Slave_worker *worker;
+  ulong usage;
+
+  // todo: relax concurrency after the APH mutex/cond pair has proven to work
+  // pthread_mutex_t
+  // pthread_cond_t
+  // timestamp updated_at;
+
 } typedef db_worker;
 
 bool init_hash_workers(ulong slave_parallel_workers);
 void destroy_hash_workers();
-Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers);
-Slave_worker *get_free_worker(DYNAMIC_ARRAY workers);
+Slave_worker *get_slave_worker(const char *dbname, Relay_log_info *rli);
+Slave_worker *get_least_occupied_worker(DYNAMIC_ARRAY *workers);
 
 #define SLAVE_WORKER_QUEUE_SIZE 8096
 #define SLAVE_INIT_DBS_IN_GROUP 4     // initial allocation for CGEP dynarray
@@ -154,7 +162,7 @@ public:
   Relay_log_info *w_rli;
 
   DYNAMIC_ARRAY curr_group_exec_parts; // CGEP
-
+  bool curr_group_seen_begin; // is set to TRUE with B-event at Worker exec
   // @c last_group_done_index is for recovery, although can be viewed
   //    as statistics as well.
   // C marks a T-event with the incremented group_cnt that is
@@ -195,7 +203,7 @@ public:
 
   size_t get_number_worker_fields();
 
-  void slave_worker_ends_group(ulong);  // CGEP walk through to upd APH
+  void slave_worker_ends_group(ulong, int);  // CGEP walk through to upd APH
 
 private:
   bool read_info(Rpl_info_handler *from);

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-11-23 09:03:37 +0000
+++ b/sql/rpl_slave.cc	2010-11-25 09:03:54 +0000
@@ -1041,7 +1041,7 @@ static bool sql_slave_killed(THD* thd, R
 
   DBUG_ASSERT(rli->info_thd == thd || thd->slave_thread);
   DBUG_ASSERT(rli->slave_running == 1 || thd->slave_thread);// tracking buffer overrun
-  if (abort_loop || thd->killed || rli->abort_slave)
+  if (abort_loop || thd->killed || rli->abort_slave || rli->slave_worker_is_error)
   {
     if (thd->transaction.all.modified_non_trans_table && rli->is_in_group())
     {
@@ -2509,6 +2509,14 @@ static int sql_delay_event(Log_event *ev
   DBUG_RETURN(0);
 }
 
+/**
+   A sort_dynamic comparison function on the ulong type.
+   Returns as specified by @c qsort_cmp.
+*/
+int ulong_cmp(ulong *id1, ulong *id2)
+{
+  return *id1 < *id2? -1 : (*id1 > *id2? 1 : 0);
+}
 
 /**
   Applies the given event and advances the relay log position.
@@ -2673,10 +2681,24 @@ int apply_event_and_update_pos(Log_event
            error= ev->update_pos(rli);
 #if 1
 
-  // experimental checkpoint per each scheduling attempt
-  // logics of next_event()
+    // experimental checkpoint per each scheduling attempt
+    // logics of next_event()
+    {
+      uint i;
+      rli->gaq->move_queue_head(&rli->workers);
 
-    rli->gaq->move_queue_head(&rli->workers);
+      /* TODO:
+         sorting out the least occupied Worker needs to move to the
+         actual checkpoint location - next_event()
+      */
+      for (i= 0; i < rli->workers.elements; i++)
+      {
+        Slave_worker *w_i;
+        get_dynamic(&rli->workers, (uchar *) &w_i, i);
+        set_dynamic(&rli->least_occupied_workers, (uchar*) &w_i->jobs.len, w_i->id);
+      };
+      sort_dynamic(&rli->least_occupied_workers, (qsort_cmp) ulong_cmp);
+    }           
 
 #endif
 
@@ -3538,8 +3560,9 @@ pthread_handler_t handle_slave_worker(vo
       error= slave_worker_exec_job(w, rli);
   }
 
-  if (!rli->info_thd->killed)
+  if (error)
   {
+    rli->slave_worker_is_error= w;
     mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
     rli->info_thd->awake(THD::KILL_QUERY);          // notify Crdn
     mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
@@ -3642,8 +3665,7 @@ int slave_start_single_worker(Relay_log_
   // CGEP dynarray holds id:s of partitions of the Current being executed Group
   my_init_dynamic_array(&w->curr_group_exec_parts, NAME_LEN,
                         SLAVE_INIT_DBS_IN_GROUP, 1);
-
-  
+  w->curr_group_seen_begin= FALSE;
 
   if (pthread_create(&th, &connection_attrib, handle_slave_worker,
                      (void*) w))
@@ -3657,6 +3679,8 @@ int slave_start_single_worker(Relay_log_
   if (w->jobs.len != 0)
     mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
   mysql_mutex_unlock(&w->jobs_lock);
+  // Least occupied inited with zero
+  insert_dynamic(&rli->least_occupied_workers, (uchar*) &w->jobs.len);
 
 err:
   return error;
@@ -3669,7 +3693,12 @@ int slave_start_workers(Relay_log_info *
   int error= 0;
 
   // CGAP dynarray holds id:s of partitions of the Current being executed Group
-  my_init_dynamic_array(&rli->curr_group_assigned_parts, NAME_LEN, SLAVE_INIT_DBS_IN_GROUP, 1);
+  my_init_dynamic_array(&rli->curr_group_assigned_parts, 1 + NAME_LEN + 1, SLAVE_INIT_DBS_IN_GROUP, 1);
+  rli->last_assigned_worker= NULL; /* associated with curr_group_assigned */
+  my_init_dynamic_array(&rli->curr_group_da, sizeof(Log_event*), 8, 2);
+  // Least_occupied_workers array
+  my_init_dynamic_array(&rli->least_occupied_workers,
+                        sizeof(Slave_jobs_queue::len), n, 0);
 
   // GAQ  queue holds seqno:s of scheduled groups. C polls workers in 
   //      @c lwm_checkpoint_period to update GAQ (see @c next_event())
@@ -3681,15 +3710,16 @@ int slave_start_workers(Relay_log_info *
                                       sizeof(Slave_job_group),
                                       ::slave_max_pending_jobs, n);
   rli->mts_total_groups= 0;
+  rli->slave_worker_is_error= NULL;
+  rli->curr_group_seen_begin= FALSE;
+
   for (i= 0; i < n; i++)
   {
     if ((error= slave_start_single_worker(rli, i)))
-    {
       goto err;
-    }
   }
 
-  if (init_hash_workers(n))  // MTS TODO: mapping_db_to_worker -> APH
+  if (init_hash_workers(n))  // MTS: mapping_db_to_worker
   {
     sql_print_error("Failed to init partitions hash");
     error= 1;
@@ -3706,7 +3736,8 @@ err:
 void slave_stop_workers(Relay_log_info *rli)
 {
   int i;
-
+  THD *thd= rli->info_thd;
+  
   for (i= rli->workers.elements - 1; i >= 0; i--)
   {
     Slave_worker *w;
@@ -3726,7 +3757,7 @@ void slave_stop_workers(Relay_log_info *
     mysql_mutex_unlock(&w->info_thd->LOCK_thd_data);
   }
   
-  thd_proc_info(rli->info_thd, "Waiting for workers to exit");
+  thd_proc_info(thd, "Waiting for workers to exit");
 
   for (i= rli->workers.elements - 1; i >= 0; i--)
   {
@@ -3737,16 +3768,22 @@ void slave_stop_workers(Relay_log_info *
     w->end_info(key_worker_idx, NUMBER_OF_FIELDS_TO_IDENTIFY_WORKER);
 
     mysql_mutex_lock(&w->jobs_lock);
-    if (w->jobs.len != rli->slave_pending_jobs_max + 1)
+    while (w->jobs.len != rli->slave_pending_jobs_max + 1)
+    {
+      const char *save_proc_info;
+      save_proc_info= thd->enter_cond(&w->jobs_cond, &w->jobs_lock,
+                                      "Waiting for workers to exit");
       mysql_cond_wait(&w->jobs_cond, &w->jobs_lock);
+      thd->exit_cond(save_proc_info);
+      mysql_mutex_lock(&w->jobs_lock);
+    }
     mysql_mutex_unlock(&w->jobs_lock);
-
     mysql_mutex_destroy(&w->jobs_lock);
     mysql_cond_destroy(&w->jobs_cond);
 
     DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
     delete_dynamic(&w->jobs.Q);
-
+    delete_dynamic(&w->curr_group_exec_parts);   // CGEP
     delete_dynamic_element(&rli->workers, i);
     delete w;
   }
@@ -3755,6 +3792,9 @@ void slave_stop_workers(Relay_log_info *
 
   destroy_hash_workers();
   delete rli->gaq;
+  delete_dynamic(&rli->least_occupied_workers);    // least occupied
+  delete_dynamic(&rli->curr_group_da);             // CGDA
+  delete_dynamic(&rli->curr_group_assigned_parts); // CGAP
 }
 
 /**


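[Editor's note] The least_occupied_workers maintenance added in apply_event_and_update_pos()
amounts to snapshotting each Worker's jobs-queue length and keeping the snapshot sorted
ascending, consistent with ulong_cmp(). A rough plain-C++ equivalent of that refresh is sketched
below; it assumes Worker ids are contiguous 0..n-1 as in the patch, and WorkerStub and
refresh_least_occupied are invented names, not server API.

    // Illustration only; WorkerStub and refresh_least_occupied are invented names.
    #include <algorithm>
    #include <vector>

    struct WorkerStub
    {
      unsigned long id;         // assumed contiguous 0..n-1, as in the patch
      unsigned long queue_len;  // current jobs queue length (w->jobs.len)
    };

    // Rebuild the per-Worker load snapshot and sort it ascending so that the
    // front holds the smallest queue length, mirroring ulong_cmp() ordering.
    std::vector<unsigned long>
    refresh_least_occupied(const std::vector<WorkerStub> &workers)
    {
      std::vector<unsigned long> loads(workers.size());
      for (const WorkerStub &w : workers)
        loads[w.id]= w.queue_len;               // slot indexed by Worker id
      std::sort(loads.begin(), loads.end());    // least occupied first
      return loads;
    }
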
Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101125090354-b3lesk38qgv8xvha.bundle