List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:November 18 2010 2:01pm
Subject:bzr push into mysql-next-mr.crash-safe branch (andrei.elkin:3210 to 3211)
WL#5569
View as plain text  
 3211 Andrei Elkin	2010-11-18
      wl#5569 MTS
      
      Extending the wl#5563 prototype gradually.
      This commit addresses:
      1. recovery interface (a new Worker rli plus rli->gaq and pseudo-code for checkpoint
         to update GAQ and the central RLI recovery table.
         Wrt rli, C and W execute do_apply_event(c_rli) where c_rli is the central
         instance. C executes update_pos(c_rli), but W update_pos(w_rli).
      
      
      
      others:
      - decreased processing time for rpl_parallel, serial.
     @ sql/log_event.cc
        Enhance Log_event::get_slave_worker_id() to classify events by set of parallelization properties;
         Presence of a property in an event forces some actions both on C and W side.
        en_queue etc are prepared to turn into circular_buffer_queue methods.
        Pseudo-coded numerous todo:s wrt the low-level-design implementation.
        Deployed changes due to Worker private rli.
        Annotated on Deferred Array for B,p,r property events.
        
        delete ev is moved from C to W which is fault-prone but it could not be kept
        any longer as a part of de_queue() that transits into cir_buf_queue class.
     @ sql/log_event.h
        removed `soiled' that was used to make delete ev run safely.
        Added Log_event methods identifying the parallelization properties, incl
         - contains_partition_info() to identify events containing 
           info to be processed by the partition hash func
         - starts,ends_group()
         - also updated the list of only_serial().
     @ sql/rpl_rli.cc
        Only Coordinator can destroy Workers dynarray;
        Relay_log_info::get_current_worker() turned out to become more complicated, see comments;
        Reminder to migrate rli->future... into  ev-> future_event_relay_log_pos
        which would make the Worker find the value out of the event's context;
        Prototyped //  w->flush_info() in stmt_done;
     @ sql/rpl_rli.h
        The worker RLI has `this_worker' pointing to the actual worker instance.
     @ sql/rpl_rli_pdb.cc
        Annotated with fine details APH etc implementation.
     @ sql/rpl_rli_pdb.h
        Transformed earlier queue struct into a family of classes.
        Recovery interface: last_group_done_index of Slave_worker to be filled in with an index
        of GAQ queue by W. To poll the value by C at checkpoint.
        Added CGEP to W context (sim to CGAP of C).
     @ sql/rpl_slave.cc
        Simplified the Worker poll.
        Deployed worker rli initialization.
        Recovery: rli->gaq is instantiated by C at worker pool activation.
        Recovery: pseudo-code for checkpoint in next_event().
     @ sql/sys_vars.cc
        edited help lines for slave_max_pending_jobs.

    modified:
      mysql-test/extra/rpl_tests/rpl_parallel_load.test
      sql/log_event.cc
      sql/log_event.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/sys_vars.cc
 3210 Alfranio Correia	2010-11-14
      Post-push fix for WL#5599.

    modified:
      mysql-test/suite/funcs_1/r/is_columns_mysql.result
      mysql-test/suite/funcs_1/r/is_key_column_usage.result
      mysql-test/suite/funcs_1/r/is_statistics.result
      mysql-test/suite/funcs_1/r/is_statistics_mysql.result
      mysql-test/suite/funcs_1/r/is_table_constraints.result
      mysql-test/suite/funcs_1/r/is_table_constraints_mysql.result
      mysql-test/suite/funcs_1/r/is_tables_mysql.result
=== modified file 'mysql-test/extra/rpl_tests/rpl_parallel_load.test'
--- a/mysql-test/extra/rpl_tests/rpl_parallel_load.test	2010-09-21 22:19:05 +0000
+++ b/mysql-test/extra/rpl_tests/rpl_parallel_load.test	2010-11-18 14:00:52 +0000
@@ -13,7 +13,7 @@ let $workers = `select @@global.slave_pa
 #
 # load volume parameter
 #
-let $iter = 5000;
+let $iter = 1000;
 
 connection slave;
 
@@ -230,6 +230,7 @@ start slave sql_thread;
 let $wait_timeout= 600;
 let $wait_condition= SELECT count(*)+sleep(1) = 5 FROM test0.benchmark;
 source include/wait_condition.inc;
+
 use test;
 select * from test0.benchmark into outfile 'benchmark.out';
 select ts from test0.benchmark where state like 'master started load' into @m_0;
@@ -239,6 +240,13 @@ select ts from test0.benchmark where sta
 select time_to_sec(@m_1) - time_to_sec(@m_0) as 'delta_m', 
        time_to_sec(@s_1) - time_to_sec(@s_0) as 'delta_s' into outfile 'delta.out';
 
+# debug: pre diff check-out
+--disable_result_log
+--disable_query_log
+###select sleep(9999);
+--enable_result_log
+--enable_query_log
+
 let $i = $workers + 1;
 while($i)
 {
@@ -255,6 +263,13 @@ while($i)
 --enable_result_log
 --enable_query_log
 
+# debug: pre diff check-out
+--disable_result_log
+--disable_query_log
+###select sleep(9999);
+--enable_result_log
+--enable_query_log
+
 
 connection master;
 

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-11-11 11:53:01 +0000
+++ b/sql/log_event.cc	2010-11-18 14:00:52 +0000
@@ -2154,47 +2154,100 @@ Log_event::continue_group(Relay_log_info
   return Log_event::do_shall_skip(rli);
 }
 
+
+bool Log_event::contains_partition_info()
+{
+  return get_type_code() == TABLE_MAP_EVENT ||
+
+    // todo: the 4 types below are limitly parallel-supported (the default 
+    // session db not the actual db)
+      
+    get_type_code() == QUERY_EVENT;
+}
+
 /**
-   *** a template method to be replaced by the actual hash func   ***
-   *** Limitted to work for STMT binlog format events             *** 
-   *** Rows-event handling is limitted to one Worker              ***
-   @return index \in [0, M] range, where M is the max index of the worker pool.
+   General hashing function to compute the id of an applier for
+   the current event.
+   At computing the id few rules apply depending on partitioning properties
+   that the event instance can feature.
+
+   Let's call the properties.
+
+   B - beginning of a group of events (BEGIN query_log_event)
+   g - mini-group representative event containing the partition info
+      (any Table_map, a Query_log_event)
+   p - a mini-group internal event that *p*receeding its g-parent
+      (int_, rand_, user_ var:s) 
+   r - a mini-group internal "regular" event that follows its g-parent
+      (Write, Update, Delete -rows)
+   S - sequentially applied event (may not be a part of any group).
+       Events of this type are determined via @c only_serial_exec()
+       earlier and don't cause calling this method .
+   T - terminator of the group (XID, COMMIT, ROLLBACK)
+
+   Only `g' case requires to compute the assigned Worker id.
+   In `T, r' cases it's @c last_assigned_worker that is one that was
+   assigned at the last `g' processing.
+   In `B' case it's NULL to indicate the Coordinator will skip doing anything
+   more with the event. Its scheduling gets deffered until the following 
+   `g' event names a Worker.
+   
+   @note `p' and g-Query-log-event is not supported yet.
+
+   @note The function can update APH, CGAP objects.
+
+   @return a pointer to the Worker stuct or NULL.
 */
 
 Slave_worker *Log_event::get_slave_worker_id(Relay_log_info const *rli)
 {
-  if (!(get_type_code() == XID_EVENT  ||
-    get_type_code() == INTVAR_EVENT ||
-    get_type_code() == USER_VAR_EVENT ||
-    get_type_code() == RAND_EVENT ||
-    get_type_code() == DELETE_ROWS_EVENT ||
-    get_type_code() == UPDATE_ROWS_EVENT ||
-    get_type_code() == WRITE_ROWS_EVENT))
+  /* checking properties and perform corresponding actions */
+
+  // g
+  if (contains_partition_info())
   {
-    Slave_worker *worker= get_slave_worker(get_db(), rli->workers);
+    // a lot of things inside `get_slave_worker_id'
+    Slave_worker *worker= get_slave_worker(get_db(), rli->workers /*, W_c */);
     const_cast<Relay_log_info *>(rli)->last_assigned_worker= worker;
   }
 
+  // B
+  if (starts_group())
+  {
+    // insert {NULL, W_s} to yield a new Group_cnt indexed element in GAQ
+    // Group_cnt= rli->gaq->e;
+    // rli->gaq->en_queue({NULL, W_s}); 
+    // B-event is appended to the Deferred Array associated with GCAP
+  }
+
+  // T
+  if (ends_group())
+  {
+    //  assert (\exists P_k . W_s \in CGAP) if P_k is present in ev
+
+    // cleanup: CGAP := nil
+
+    //  ev->group_cnt := GAQ.Group_cnt, so that
+    //  at procesing *Worker*.WQ.last_group_cnt := ev->group_cnt
+  }
+
+  // todo: p. the first p-event clears CGAP. Each p-event is appened to DA.
+
+  // r
   return (rli->last_assigned_worker);
 }
 
 static int en_queue(Slave_jobs_queue *jobs, Slave_job_item *item)
 {
-  Slave_job_item free;
-
   if (jobs->a == jobs->s)
   {
     DBUG_ASSERT(jobs->a == jobs->Q.elements);
     return -1;
   }
-  // GC
-  get_dynamic(&jobs->Q, (uchar*) &free, jobs->a);
-  if (free.data != NULL)
-    delete static_cast<Log_event *>(free.data);
 
   // store
-  free.data= item->data;
-  set_dynamic(&jobs->Q, (uchar*) &free, jobs->a);
+
+  set_dynamic(&jobs->Q, (uchar*) item, jobs->a);
 
   // pre-boundary cond
   if (jobs->e == jobs->s)
@@ -2220,12 +2273,12 @@ static void * head_queue(Slave_jobs_queu
   if (jobs->e == jobs->s)
   {
     DBUG_ASSERT(jobs->len == 0);
-    ret->data= NULL;
+    ret->data= NULL;               // todo: move to caller
     return NULL;
   }
   get_dynamic(&jobs->Q, (uchar*) ret, jobs->e);
 
-  DBUG_ASSERT(ret->data);
+  DBUG_ASSERT(ret->data);         // todo: move to caller
  
   return ret;
 }
@@ -2325,7 +2378,7 @@ int Log_event::apply_event(Relay_log_inf
   Slave_worker *w= NULL;
   Slave_job_item item= {NULL}, *job_item= &item;
 
-  if (!rli->is_parallel_exec() || only_serial_exec())
+  if (!rli->is_parallel_exec() || only_serial_exec() /* || wait(APH.N == 0) */)
     DBUG_RETURN(do_apply_event(rli));
  
   if (!(w= get_slave_worker_id(rli)) ||
@@ -2335,6 +2388,14 @@ int Log_event::apply_event(Relay_log_inf
   job_item->data= this;
 
   DBUG_PRINT("Log_event::apply_event:", ("-> job item data %p to W_%lu", job_item->data, w->id));
+
+  // TODO: MTS-APH  DA (BEGIN)
+  // for (i= 0; i < CGDA.elements; i++)
+  // {
+  //   append_item_to_jobs(job_item, CGDA[i], const_cast<Relay_log_info*>(rli));
+  // }
+  // todo: resize if elements > min
+  // CGDA.elements= 0;
   append_item_to_jobs(job_item, w, const_cast<Relay_log_info*>(rli));
   DBUG_RETURN(FALSE);
 }
@@ -2380,6 +2441,9 @@ struct slave_job_item* pop_jobs_item(Sla
   mts-II worker main routine.
   The worker thread waits for an event, execute it, fixes statistics counters.
 
+  @note the function maintains CGEP and modifies APH, and causes
+        modification of GAQ.
+
   @return 0 success 
          -1 got killed or an error happened during appying
 */
@@ -2409,10 +2473,33 @@ int slave_worker_exec_job(Slave_worker *
 
   DBUG_PRINT("slave_worker_exec_job:", ("W_%lu <- job item: %p data: %p thd: %p", w->id, job_item, ev, thd));
 
+  if (ev->starts_group())
+  {
+    // ... ? do nothing ?
+  }
+  if (ev->contains_partition_info())
+  {
+    // TODO: w->CGEP .= ev->get_db()
+  }
+  if (ev->ends_group())
+  {
+    w->slave_worker_ends_group();
+
+    // TODO: GAQ related
+    // w->last_group_done_index = ev->group_cnt
+  }
+
   error= ev->do_apply_event(rli);
 
   mysql_mutex_lock(&w->jobs_lock);
-  de_queue(&w->jobs, job_item); // now event of the item is garbage
+  de_queue(&w->jobs, job_item);
+  /*
+    preserving signatures of existing methods.
+    todo: convert update_pos(w->w_rli) -> update_pos(w)
+          to remove w_rli w/a
+  */
+  ev->update_pos(w->w_rli);
+  delete ev;  // after ev->update_pos() event is garbage
   mysql_mutex_unlock(&w->jobs_lock);
 
   /* statistics */

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2010-11-10 10:57:13 +0000
+++ b/sql/log_event.h	2010-11-18 14:00:52 +0000
@@ -645,6 +645,7 @@ class THD;
 class Format_description_log_event;
 class Relay_log_info;
 class Slave_worker;
+class Slave_committed_queue;
 
 #ifdef MYSQL_CLIENT
 enum enum_base64_output_mode {
@@ -1141,13 +1142,6 @@ public:
 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
 public:
 
-    /**
-     mts-II (todo: may change in favor of relocatin update_pos to rli class)
-     Coordinator to mark the instance as available to delete.
-     The member is designed for the slave side instance.
-  */
-  volatile bool soiled;
-
   /**
      mst-II: to execute serially due to 
              technical or conceptual limitation
@@ -1157,6 +1151,14 @@ public:
   bool only_serial_exec()
   {
     return
+      // todo: the 4 types below are limitly parallel-supported (the default 
+      // session db not the actual db)
+      
+      // get_type_code() == QUERY_EVENT ||
+      // get_type_code() == INTVAR_EVENT ||
+      // get_type_code() == USER_VAR_EVENT ||
+      // get_type_code() == RAND_EVENT ||
+
       get_type_code() == STOP_EVENT ||
       get_type_code() == ROTATE_EVENT ||
       get_type_code() == LOAD_EVENT ||
@@ -1174,6 +1176,18 @@ public:
       get_type_code() == PRE_GA_DELETE_ROWS_EVENT ||
       get_type_code() == INCIDENT_EVENT;
   }
+  
+  /**
+     Events of a cetain type carry partitioning data such as db names.
+  */
+  bool contains_partition_info();
+
+  /**
+     Events of a cetain type start or end a group of events treated
+     transactionally wrt binlog.
+  */
+  virtual bool starts_group() { return FALSE; }
+  virtual bool ends_group()   { return FALSE; }
 
   /**
      @return index  in \in [0, M] range to indicate
@@ -1832,6 +1846,19 @@ public:        /* !!! Public in this pat
       !strncasecmp(query, "SAVEPOINT", 9) ||
       !strncasecmp(query, "ROLLBACK", 8);
   }
+  /**
+     todo: Parallel support for DDL:s.
+     DDL queries are logged without BEGIN/COMMIT parentheses
+     and can be regarded as the starting and the ending events of 
+     its self-group.
+  */
+  bool starts_group() { return !strncmp(query, "BEGIN", q_len); }
+  bool ends_group()
+  {  
+    return
+      !strncmp(query, "COMMIT", q_len) ||
+      !strncasecmp(query, STRING_WITH_LEN("ROLLBACK"));
+  }
 };
 
 
@@ -2579,7 +2606,7 @@ class Xid_log_event: public Log_event
   bool write(IO_CACHE* file);
 #endif
   bool is_valid() const { return 1; }
-
+  bool ends_group() { return TRUE; }
 private:
 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
   virtual int do_apply_event(Relay_log_info const *rli);

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-11-11 11:53:01 +0000
+++ b/sql/rpl_rli.cc	2010-11-18 14:00:52 +0000
@@ -76,6 +76,7 @@ Relay_log_info::Relay_log_info(bool is_s
    until_log_pos(0), retried_trans(0),
    tables_to_lock(0), tables_to_lock_count(0),
    rows_query_ev(NULL), last_event_start_time(0),
+   this_worker(NULL),
    sql_delay(0), sql_delay_end(0),
    m_flags(0)
 {
@@ -145,31 +146,48 @@ Relay_log_info::~Relay_log_info()
   mysql_mutex_destroy(&pending_jobs_lock);
   mysql_cond_destroy(&pending_jobs_cond);
 
-  delete_dynamic(&workers);
+  if (!this_worker)
+    delete_dynamic(&workers);
 
   DBUG_VOID_RETURN;
 }
 
+/**
+   The method can be run both by C having the Main (coord) rli context and
+   by W having both the main and the Local (worker) rli context.
+   Decistion matrix:
+
+       Main   Local
+     -+-------------
+     C| this_w  -
+     W| w_i    this_w
+
+*/
 Slave_worker* Relay_log_info::get_current_worker() const
 { 
   uint i;
-  Slave_worker* w_i;
   if (!is_parallel_exec() || info_thd == current_thd)
-    return NULL;
+    return this_worker; //  can be asserted:  !this_worker => C
   for (i= 0; i< workers.elements; i++)
   {
-    // convert to use hashing
+    Slave_worker* w_i;
+    // todo: optimaze/replace the loop
     get_dynamic(const_cast<DYNAMIC_ARRAY*>(&workers), (uchar*) &w_i, i);
     if (w_i->info_thd == current_thd)
+    {
       return w_i;
+    }
   }
   DBUG_ASSERT(0);
 }
 
+/**
+   The method can be run both by C having the Main context.
+*/
 RPL_TABLE_LIST** Relay_log_info::get_tables_to_lock() const
 {
   return
-    ((!is_parallel_exec()) || (info_thd == current_thd)) ?
+    ((!is_parallel_exec()) || info_thd == current_thd) ?
     const_cast<RPL_TABLE_LIST**>(&tables_to_lock) :
     &get_current_worker()->tables_to_lock;
 }
@@ -998,12 +1016,22 @@ void Relay_log_info::stmt_done(my_off_t 
     while the MyISAM table has already been updated.
   */
   if ((info_thd->variables.option_bits & OPTION_BEGIN) && opt_using_transactions)
-    inc_event_relay_log_pos();
+    inc_event_relay_log_pos(); // todo: ev-> future_event_relay_log_pos
   else
   {
+    Slave_worker *w;
     inc_group_relay_log_pos(event_master_log_pos);
+    
     /* Alfranio needs to update the coordinator and workers. */
-    flush_info(key_info_idx, key_info_size, is_transactional() ? TRUE : FALSE);
+    
+    if ((w= get_current_worker()) == NULL)
+      flush_info(key_info_idx, key_info_size, is_transactional() ? TRUE : FALSE);
+    
+    // Andrei: tried testing the worker's method but got a segfault
+    // because in to->set_info(group_relay_log_name) the arg is NULL.
+
+    //else
+    //  w->flush_info(key_info_idx, key_info_size, is_transactional() ? TRUE : FALSE);
   }
 }
 
@@ -1012,7 +1040,7 @@ void Relay_log_info::cleanup_context(THD
 {
   DBUG_ENTER("Relay_log_info::cleanup_context");
 
-  DBUG_ASSERT(info_thd == thd || is_parallel_exec());
+  DBUG_ASSERT((info_thd == thd) || is_parallel_exec());
   /*
     1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
     may have opened tables, which we cannot be sure have been closed (because

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-11-11 11:53:01 +0000
+++ b/sql/rpl_rli.h	2010-11-18 14:00:52 +0000
@@ -271,20 +271,6 @@ public:
   */
   ulong slave_exec_mode;
 
-  /*
-    WL#5563 mts-II
-  */
-  DYNAMIC_ARRAY workers; // number's is determined by global slave_parallel_workers
-  volatile int pending_jobs;
-  ulong trans_jobs, wait_jobs, stmt_jobs; // live time is one trans, statement (ndb epoch)
-  mysql_mutex_t pending_jobs_lock;
-  mysql_cond_t pending_jobs_cond;
-  int   slave_pending_jobs_max;
-  Slave_worker  *last_assigned_worker; // a hint to partitioning func for some events
-
-  // List<slave_job_item> free_jobs;
-
-  Slave_worker* get_current_worker() const;
   RPL_TABLE_LIST** get_tables_to_lock() const;
   uint add_table_to_lock(RPL_TABLE_LIST *table_list);
 
@@ -435,6 +421,22 @@ public:
   */
   time_t last_event_start_time;
 
+  /*
+    WL#5569 MTS-II
+  */
+  DYNAMIC_ARRAY workers; // number's is determined by global slave_parallel_workers
+  volatile int pending_jobs;
+  ulong trans_jobs, wait_jobs, stmt_jobs; // live time is one trans, statement (ndb epoch)
+  mysql_mutex_t pending_jobs_lock;
+  mysql_cond_t pending_jobs_cond;
+  int   slave_pending_jobs_max;
+  Slave_worker  *last_assigned_worker; // a hint to partitioning func for some events
+  Slave_committed_queue *gaq;
+
+  Slave_worker* get_current_worker() const;
+  Slave_worker* set_this_worker(Slave_worker *w) { return this_worker= w; }
+  Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
+
   /**
     Helper function to do after statement completion.
 

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-11-11 11:53:01 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-11-18 14:00:52 +0000
@@ -201,7 +201,47 @@ void destroy_hash_workers()
   DBUG_VOID_RETURN;
 }
 
-Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers)
+/**
+   The function produces a reference to the struct of a Worker
+   that has been or will be engaged to process the @c dbname -keyed  partition (D).
+   It checks a local to Coordinator CGAP list first and returns 
+   @c last_assigned_worker when found (todo: assert).
+
+   Otherwise, the partition is appended to the current group list:
+
+        CGAP .= D
+
+   and a possible  D's Worker id is searched in APH that collects tuples
+   (P, W_id, U, mutex, cond).
+   In case not found,
+
+        W_d := W_c unless W_c is NULL.
+
+   When W_c is NULL it is assigned to a least occupied,
+
+        W_d := W_c := W_{least_occupied}
+
+        APH .=  a new (D, W_d, 1) 
+
+   In a case APH contains W_d == W_c, (assert U >= 1)
+
+        update APH set  U++ where  APH.P = D
+
+   The case APH contains a W_d != W_c != NULL assigned to D-partition represents
+   the hashing conflict and is handled as the following:
+
+     a. marks the record of APH with a flag requesting to signal in the
+        cond var when `U' the usage counter drops to zero by the other Worker;
+     b. waits for the other Worker to finish tasks on that partition and
+        gets the signal;
+     c. updates the APH record to point to the first Worker (naturally, U := 1),
+        scheduled the event, and goes back into the parallel mode
+
+   @note modifies  CGAP, APH
+
+   @return the pointer to a Worker struct 
+*/
+Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers /*, W_c */)
 {
   DBUG_ENTER("get_slave_worker");
 
@@ -285,3 +325,22 @@ Slave_worker *get_free_worker(DYNAMIC_AR
   DBUG_ASSERT(worker != NULL);
   return(worker);
 }
+
+/**
+   Deallocative routine that makes few things in opposite to
+   @c get_slave_worker().
+
+   Affected by the being committed group APH tuples are updated.
+
+   for each D_i in CGEP
+       assert (W_id == APH.W_id|P_d == D_i)
+       update APH set U-- where P_d = D_i
+       if U == 0 \and count(APH) > max 
+          delete from APH where U = 0
+       
+    CGEP the Worker partition cache is cleaned up.
+*/
+
+void Slave_worker::slave_worker_ends_group()
+{
+}

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-11-11 11:53:01 +0000
+++ b/sql/rpl_rli_pdb.h	2010-11-18 14:00:52 +0000
@@ -18,20 +18,59 @@ Slave_worker *get_slave_worker(const cha
 Slave_worker *get_free_worker(DYNAMIC_ARRAY workers);
 
 #define SLAVE_WORKER_QUEUE_SIZE 8096
+#define SLAVE_INIT_DBS_IN_GROUP 4     // initial allocation for CGEP dynarray
 
 typedef struct slave_job_item
 {
   void *data;
 } Slave_job_item;
 
-typedef struct slave_jobs_queue
+class circular_buffer_queue
 {
+public:
+
   DYNAMIC_ARRAY Q;
   ulong s;
   ulong a;
   ulong e;
-  ulong len;
-} Slave_jobs_queue;
+  volatile ulong len;   // it is also queried to compute least occupied
+
+  circular_buffer_queue(uint el_size, ulong max, uint alloc_inc= 0) :
+    s(max), a(0), e(max), len(0)
+  {
+    my_init_dynamic_array(&Q, el_size, s, alloc_inc);
+  };
+  circular_buffer_queue () {};
+};
+
+/**
+  Group Assigned Queue whose first element identifies first gap
+  in committed sequence. The head of the queue is therefore next to 
+  the low-water-mark.
+*/
+class Slave_committed_queue : public circular_buffer_queue
+{
+public:
+  // Allocation of file_name that is common for all Slave_assigned_job_group:s
+  char current_binlog[FN_REFLEN];
+  void update_current_binlog(const char *post_rotate); //master's Rotate exec it 
+  Slave_committed_queue (const char *log, uint el_size, ulong max, uint inc= 0)
+    : circular_buffer_queue(el_size, max, inc)
+  {
+    strmake(current_binlog, log, sizeof(current_binlog) - 1);
+  };
+};
+
+typedef struct st_slave_assigned_job_group
+{
+  struct event_coordinates coord;
+  ulong worker_id;
+  ulonglong total_seqno;
+} Slave_assigned_job_group;
+
+class Slave_jobs_queue : public circular_buffer_queue
+{
+};
 
 class Slave_worker : public Rpl_info_worker
 {
@@ -41,10 +80,21 @@ public:
 
   mysql_mutex_t jobs_lock;
   mysql_cond_t  jobs_cond;
-
-  //List<struct slave_job_item> jobs;
   Slave_jobs_queue jobs;
 
+  // fixme: experimental
+  Relay_log_info *w_rli;
+
+  DYNAMIC_ARRAY curr_group_exec_parts; // CGEP
+
+  // @c last_group_done_index is for recovery, although can be viewed
+  //    as statistics as well.
+  // C marks a T-event with the incremented group_cnt that is
+  // an index in GAQ; W stores it  at the event execution. 
+  // C polls the value periodically to maintain an array
+  // of the indexes in order to progress on GAQ's lwm, see @c next_event().
+  volatile ulong last_group_done_index; // it's index in GAQ
+
   List<Log_event> data_in_use; // events are still in use by SQL thread
   ulong id;
   TABLE *current_table;
@@ -76,6 +126,8 @@ public:
 
   size_t get_number_worker_fields();
 
+  void slave_worker_ends_group();  // CGEP walk through to upd APH
+
 private:
   bool read_info(Rpl_info_handler *from);
   bool write_info(Rpl_info_handler *to);

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-11-11 11:53:01 +0000
+++ b/sql/rpl_slave.cc	2010-11-18 14:00:52 +0000
@@ -70,6 +70,12 @@ MY_BITMAP slave_error_mask;
 char slave_skip_error_names[SHOW_VAR_FUNC_BUFF_SIZE];
 
 typedef bool (*CHECK_KILLED_FUNC)(THD*,void*);
+typedef struct st_slave_worker_init_args
+{
+  Relay_log_info *rli;
+  Slave_worker *w;
+} SLAVE_WORKER_INIT_ARGS;
+
 
 char* slave_load_tmpdir = 0;
 Master_info *active_mi= 0;
@@ -2617,8 +2623,6 @@ int apply_event_and_update_pos(Log_event
     // Sleeps if needed, and unlocks rli->data_lock.
     if (sql_delay_event(ev, thd, rli))
       DBUG_RETURN(0);
-    if (rli->is_parallel_exec()) // todo: consider adding `ev' as an arg 
-      ev->soiled= FALSE;
     exec_res= ev->apply_event(rli);
   }
   else
@@ -2662,7 +2666,16 @@ int apply_event_and_update_pos(Log_event
     int error= 0;
     if (!(ev->get_type_code() == XID_EVENT && rli->is_transactional()) ||
         skip_event)
-      error= ev->update_pos(rli);
+
+//  TODO: Alfranio to fix/restore the condition to update the main RLI
+//      It is kept the prototype time way in order to process rpl_parallel.
+//      This is parallel exec and event required the sequential mode
+//      that also includes all Workers finished their assignements.
+//      It was served so inside apply_event() above.
+//      The main RLI table is safe to update now.
+
+//      if (!rli->is_parallel_exec() || ev->only_serial_exec())
+           error= ev->update_pos(rli);
 #ifndef DBUG_OFF
     DBUG_PRINT("info", ("update_pos error = %d", error));
     if (!rli->belongs_to_client())
@@ -2693,15 +2706,10 @@ int apply_event_and_update_pos(Log_event
                   " Stopped in %s position %s",
                   rli->get_group_relay_log_name(),
                   llstr(rli->get_group_relay_log_pos(), buf));
-      if (rli->is_parallel_exec())
-        ev->soiled= TRUE;
       DBUG_RETURN(2);
     }
   }
 
-  if (rli->is_parallel_exec())
-    ev->soiled= TRUE;
-
   DBUG_RETURN(exec_res ? 1 : 0);
 }
 
@@ -2845,7 +2853,7 @@ static int exec_relay_log_event(THD* thd
         handle_rows_query_log_event(ev, rli);
 
       if (!rli->is_parallel_exec() &&
-           ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
+          ev->get_type_code() != ROWS_QUERY_LOG_EVENT)  // mts todo: check this case
       {
 
         DBUG_PRINT("info", ("Deleting the event after it has been executed"));
@@ -3481,7 +3489,13 @@ pthread_handler_t handle_slave_worker(vo
   Slave_worker *w;
   int error= 0;
 
+  // TODO: remove pending_jobs ref:s to
+  // TODO: ulong id;
+
+  // rli= ((SLAVE_WORKER_INIT_ARGS *) arg)->rli;
+  
   Relay_log_info* rli = ((Master_info*)arg)->rli;
+
   my_thread_init();
   DBUG_ENTER("handle_slave_worker");
 
@@ -3498,13 +3512,23 @@ pthread_handler_t handle_slave_worker(vo
     goto err;
   }
 
-  /* handshake with each started workers */
+  /* handshake with each started worker */
   mysql_mutex_lock(&rli->pending_jobs_lock);
   get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, rli->pending_jobs - 1);
   rli->pending_jobs= 0;
+
+  // TODO: w= ((SLAVE_WORKER_INIT_ARGS *) arg)->w;
+
   w->info_thd= thd;
   w->tables_to_lock= NULL;
   w->tables_to_lock_count= 0;
+
+  // fixme: experimenting to make Workers to run ev->update_pos(w->w_rli)
+  w->w_rli= Rpl_info_factory::create_rli(RLI_REPOSITORY_FILE, FALSE);
+  w->w_rli->info_thd= thd;
+  w->w_rli->workers= rli->workers; // shallow copying is sufficient
+  w->w_rli->this_worker= w;
+
   thd->thread_stack = (char*)&thd;
   mysql_cond_signal(&rli->pending_jobs_cond); // informing the parent
   mysql_mutex_unlock(&rli->pending_jobs_lock);
@@ -3543,21 +3567,92 @@ err:
     delete thd;
     mysql_mutex_unlock(&LOCK_thread_count);
   }
+
+  delete w->w_rli; // fixme: experimenting
+
   my_thread_end();
   pthread_exit(0);
   DBUG_RETURN(0);        
 }
 
+/**
+   A single Worker thread is forked out.
+   
+   @return 0 suppress or 1 if fails
+*/
+int slave_start_single_worker(Relay_log_info *rli, ulong i)
+{
+  int error= 0;
+  uint k;
+  pthread_t th;
+  Slave_worker *w=
+    Rpl_info_factory::create_worker(opt_rli_repository_id, rli, i);
+  Slave_job_item empty= {NULL};
+  SLAVE_WORKER_INIT_ARGS worker_args= {rli, w};
+
+  w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
+  w->id= i;
+  w->current_table= NULL;
+  w->usage_partition= 0;
+  w->last_group_done_index= rli->gaq->s; // out of range
+
+  // Queue initialization
+  rli->slave_pending_jobs_max= ::slave_max_pending_jobs; // may change while offline
+  w->jobs.s= rli->slave_pending_jobs_max + 1;
+  my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0); // todo: implement increment e.g  n * 10;
+  for (k= 0; k < w->jobs.s; k++)
+    insert_dynamic(&w->jobs.Q, (uchar*) &empty);
+  
+  DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
+  
+  w->jobs.e= w->jobs.s;
+  w->jobs.len= w->jobs.a= 0;
+  
+  set_dynamic(&rli->workers, (uchar*) &w, i);
+  mysql_mutex_init(key_mutex_slave_parallel_worker[i], &w->jobs_lock,
+                   MY_MUTEX_INIT_FAST);
+  mysql_cond_init(key_cond_slave_parallel_worker[i], &w->jobs_cond, NULL);
+
+  // CGEP dynarray holds id:s of partitions of the Current being executed Group
+  my_init_dynamic_array(&w->curr_group_exec_parts, NAME_LEN,
+                        SLAVE_INIT_DBS_IN_GROUP, 1);
 
+  if (pthread_create(&th, &connection_attrib, handle_slave_worker,
+                     (void*) &worker_args))
+  {
+    sql_print_error("Failed during slave worker thread create");
+    error= 1;
+    goto err;
+  }
 
-/* 
-   Worker threads start one-by-one with synch through rli->pending_jobs
-   returns 0 success or 1 if fails 
+err:
+  return error;
+}
+
+/**
+   The @c init_slave_workers number of Worker threads start one-by-one with synch
+   through rli->pending_jobs.
+   Also objects are initialized that Coordinator and Workers will maintain during their
+   session life time.
+
+   @return 0 success or 1 if fails 
 */
 int slave_start_workers(Relay_log_info *rli, ulong n)
 {
   uint i;
   int error= 0;
+
+  // TODO: CGEP dynarray holds id:s of partitions of the Current being executed Group
+
+  // GAQ  queue holds seqno:s of scheduled groups. C polls workers in 
+  //      @c lwm_checkpoint_period to update GAQ (see @c @next_event())
+  // The length of GAQ is derived from @c slave_max_pending_jobs to guarantee
+  // each assigned job being sent to a WQ will be represented by an item in GAQ.
+  // ::slave_max_pending_jobs is the worst case when all jobs contain
+  // one event and map to one worker.
+  rli->gaq= new Slave_committed_queue(rli->get_group_master_log_name(),
+                                      sizeof(Slave_assigned_job_group),
+                                      ::slave_max_pending_jobs);
   for (i= 0; i < n; i++)
   {
     uint k;
@@ -3568,9 +3663,9 @@ int slave_start_workers(Relay_log_info *
     w->id= i;
     w->current_table= NULL;
     w->usage_partition= 0;
+    w->last_group_done_index= rli->gaq->s; // out of range
 
     // Queue initialization
-    rli->slave_pending_jobs_max= ::slave_max_pending_jobs; // may change while offline
     w->jobs.s= rli->slave_pending_jobs_max + 1;
     my_init_dynamic_array(&w->jobs.Q, sizeof(Slave_job_item), w->jobs.s, 0); // todo: implement increment e.g  n * 10;
     for (k= 0; k < w->jobs.s; k++)
@@ -3610,11 +3705,12 @@ int slave_start_workers(Relay_log_info *
     mysql_mutex_unlock(&rli->pending_jobs_lock);
   }
 
-  /*
-    Error checking is done internally when creating, destroying and using the
-    associated hash functions.
-  */
-  init_hash_workers(n);
+  if (init_hash_workers(n))  // MTS TODO: mapping_db_to_worker -> APH
+  {
+    sql_print_error("Failed to init partitions hash");
+    error= 1;
+    goto err;
+  }
 
 err:
   return error;
@@ -3627,6 +3723,7 @@ void slave_stop_workers(Relay_log_info *
 {
   int i;
 
+  
   mysql_mutex_lock(&rli->pending_jobs_lock);
   rli->pending_jobs += rli->workers.elements;
 
@@ -3661,6 +3758,7 @@ void slave_stop_workers(Relay_log_info *
     delete w;
   }
   destroy_hash_workers();
+  delete rli->gaq;
 }
 
 /**
@@ -5020,7 +5118,7 @@ static Log_event* next_event(Relay_log_i
         read it while we have a lock, to avoid a mutex lock in
         inc_event_relay_log_pos()
       */
-      rli->set_future_event_relay_log_pos(my_b_tell(cur_log));
+      rli->set_future_event_relay_log_pos(my_b_tell(cur_log)); // TODO_MTS: ev->f_rlp
       if (hot_log)
         mysql_mutex_unlock(log_lock);
       DBUG_RETURN(ev);
@@ -5134,6 +5232,21 @@ static Log_event* next_event(Relay_log_i
         mysql_mutex_unlock(&rli->log_space_lock);
         mysql_cond_broadcast(&rli->log_space_cond);
         // Note that wait_for_update_relay_log unlocks lock_log !
+
+        // TODO: MTS LWM
+        // show-slave-status and 
+        // if (rli->is_parallel_exec()
+        // {
+        //   do
+        //   {
+        //     if (curr_clock - last_clock > lwm_period)
+        //       last_clock= curr_clock
+        //       \foreach W poll i:= W->last_group_done_index
+        //       update rli->gaq[i > lwm] and RLI.exec_master_log_pos
+        //     ret= mysql_bin_log.wait_for_update_relay_log(thd, lwm_period);
+        //    } while ((ret == ETIMEDOUT || ret == ETIME) && && !thd->killed);
+        // }
+        
         rli->relay_log.wait_for_update_relay_log(rli->info_thd);
         // re-acquire data lock since we released it earlier
         mysql_mutex_lock(&rli->data_lock);

=== modified file 'sql/sys_vars.cc'
--- a/sql/sys_vars.cc	2010-11-09 13:04:14 +0000
+++ b/sql/sys_vars.cc	2010-11-18 14:00:52 +0000
@@ -3111,8 +3111,8 @@ static Sys_var_ulong Sys_slave_parallel_
        VALID_RANGE(0, ULONG_MAX), DEFAULT(4), BLOCK_SIZE(1));
 static Sys_var_ulong Sys_slave_max_pending_jobs(
        "slave_max_pending_jobs",
-       "Number of replication events read out of Relay log and still not applied "
-       "at one time. The coordinator thread suspends further jobs assigning until "
+       "Number of replication events read out of Relay log and still not applied. "
+       "The coordinator thread suspends further jobs assigning until "
        "conditions have been improved ",
        GLOBAL_VAR(slave_max_pending_jobs), CMD_LINE(REQUIRED_ARG),
        VALID_RANGE(0, ULONG_MAX), DEFAULT(40000), BLOCK_SIZE(1));


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20101118140052-mqha481cp01dqa6i.bundle
Thread
bzr push into mysql-next-mr.crash-safe branch (andrei.elkin:3210 to 3211)WL#5569Andrei Elkin19 Nov