List:Commits« Previous MessageNext Message »
From:Andrei Elkin Date:May 16 2011 7:44pm
Subject:bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3277) WL#5569
View as plain text  
#At file:///home/andrei/MySQL/BZR/2a-23May/WL/mysql-next-mr-wl5569/ based on revid:andrei.elkin@stripped

 3277 Andrei Elkin	2011-05-16
      wl#5569 MTS
      
      Simplifying Coordinator-Worker interfaces. In essence after this patch Worker execute
        events in its private context (class Slave_worker :public Relay_log_info).
        The only exception is Query referring to temporary table. The temp:s are maintained 
        in the Coordinator's "central" rli;
      removing some dead code;
      performing a lot of cleanup.
      
      There are few todo items incl:
      
      1. To implement several todo:s scattered across MTS' code and tests (e.g to restore 
         protected for few members of RLI of rpl_rli.h);
      2. to cover Rows_query_log_event that currently can cause hanging (e.g rpl_parallel_fallback)
      3. To sort out names of classes based on Rpl_info, possibly remove  Rpl_info_worker
     @ mysql-test/suite/rpl/t/rpl_parallel_start_stop.test
        The test as most of rpl_parallel* bunch can't yet stand `include/rpl_end.inc'.
     @ sql/log_event.cc
        Defining the default Log_event::do_apply_event_worker() that simply executes
          canonical do_apply_event() however supplying Slave_worker intance reference
          that is critical in order to execute different rli->methods(), e.g `report'.
        Xid_log_event::do_apply_event_worker() runs the Worker version of Xid commit;
        simplifying Rows event parallel applying to remove or elaborate some host of the early prototype code incl. rli->get_tables_to_lock() and related logics;
     @ sql/log_event.h
        Adding virtual int do_apply_event_worker() to Log_event and specializing
        it for Xid class;
     @ sql/rpl_reporting.cc
        Spliting report() into two methods in order to make possible
        to call the functional part of the two  with va_list as an arg 
        be called from Slave_worker class.
     @ sql/rpl_reporting.h
        New  va_list version of report method is declared.
     @ sql/rpl_rli.cc
        removing early prototype time support to Rows-event parallel execution.
        The new scheme of applying is almost equivalent to the standard sequential algorith
        thanks to Slave_worker :public Relay_log_info inheritence implementation.
     @ sql/rpl_rli.h
        Removing unnecessary interfaces;
        TODO: restore `protected' for few members.
     @ sql/rpl_rli_pdb.cc
        Some cleanup and 
        defining Slave_worker::report() to eventially call the Coordinator's rli->report() and exploit
        a fact that the latter was designed for concurrent use.
     @ sql/rpl_rli_pdb.h
        Changing base class for Slave_worker to make it behaving
        as Relay_log_info when needed;
        Removing some dead code;
        Adding report() methods to run it in do_apply_event().
     @ sql/rpl_slave.cc
        Removed UNTIL todo as it's actually not supported with a warning;
        Removed a todo for cleanup of error-out statement format transaction
          because  w->cleanup_context() impelements it indeed;
        Cleanup or transition from w->w_rli (of Relay_log_info) to w (of Slave_worker);
        Adding forgotten unlock_mutex;
        Simplifying definitions of few func:s (mts_is_worker() etc);

    modified:
      mysql-test/suite/rpl/t/rpl_parallel_start_stop.test
      sql/log_event.cc
      sql/log_event.h
      sql/rpl_reporting.cc
      sql/rpl_reporting.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
=== modified file 'mysql-test/suite/rpl/t/rpl_parallel_start_stop.test'
--- a/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test	2011-02-27 17:35:25 +0000
+++ b/mysql-test/suite/rpl/t/rpl_parallel_start_stop.test	2011-05-16 19:43:58 +0000
@@ -273,5 +273,5 @@ set @@global.mts_slave_parallel_workers=
 set @@global.slave_transaction_retries= @save.slave_transaction_retries;
 --echo end of the tests
 
---source include/rpl_end.inc
-
+### TODO: restore --source include/rpl_end.inc
+--echo include/rpl_end.inc

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-04-06 12:51:58 +0000
+++ b/sql/log_event.cc	2011-05-16 19:43:58 +0000
@@ -795,6 +795,10 @@ Log_event::Log_event(const char* buf,
 
 #ifndef MYSQL_CLIENT
 #ifdef HAVE_REPLICATION
+inline int Log_event::do_apply_event_worker(Slave_worker *w)
+{ 
+  return do_apply_event(w);
+}
 
 int Log_event::do_update_pos(Relay_log_info *rli)
 {
@@ -3044,9 +3048,9 @@ int slave_worker_exec_job(Slave_worker *
       }
     }
   }
-  w->w_rli->set_future_event_relay_log_pos(ev->future_event_relay_log_pos);
-  error= ev->do_apply_event(rli);
 
+  error= ev->do_apply_event_worker(w);
+  
   if (ev->ends_group() || !w->curr_group_seen_begin)
   {
     DBUG_PRINT("slave_worker_exec_job:", (" commits GAQ index %lu, last committed  %lu", ev->mts_group_cnt, w->last_group_done_index));
@@ -3054,13 +3058,6 @@ int slave_worker_exec_job(Slave_worker *
     w->slave_worker_ends_group(ev, error); /* last done sets post exec */
   }
 
-    /*
-      commit_positions() fullfils group pos incr and flush
-      TODO: remove
-      if (!error)
-      ev->update_pos(w->w_rli);
-    */
-
   mysql_mutex_lock(&w->jobs_lock);
   de_queue(&w->jobs, job_item);
 
@@ -3119,7 +3116,7 @@ int slave_worker_exec_job(Slave_worker *
 
 err:
 
-  // todo: fix w/a for Rows_query_log_event
+  // TODO: fix w/a for Rows_query_log_event
   if (ev && ev->get_type_code() != ROWS_QUERY_LOG_EVENT)
     delete ev;  // after ev->update_pos() event is garbage
 
@@ -4266,8 +4263,12 @@ int Query_log_event::do_apply_event(Rela
   const_cast<Relay_log_info*>(rli)->set_future_group_master_log_pos(log_pos);
   DBUG_PRINT("info", ("log_pos: %lu", (ulong) log_pos));
 
+  /*
+    todo: such cleanup should not be specific to Query event and therefore
+          is preferable at a common with other event pre-execution point
+  */
   clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
-  if (strcmp("COMMIT", query) == 0 && *(rli->get_tables_to_lock()) != NULL)
+  if (strcmp("COMMIT", query) == 0 && rli->tables_to_lock != NULL)
   {
     /*
       Cleaning-up the last statement context:
@@ -4579,6 +4580,9 @@ Default database: '%s'. Query: '%s'",
       The sql thread receives the killed status and will proceed
       to shutdown trying to finish incomplete events group.
     */
+
+    // TODO: address the middle-group killing in MTS case
+
     DBUG_EXECUTE_IF("stop_slave_middle_group",
                     if (strcmp("COMMIT", query) != 0 &&
                         strcmp("BEGIN", query) != 0)
@@ -6774,6 +6778,28 @@ void Xid_log_event::print(FILE* file, PR
 
 
 #if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
+
+int Xid_log_event::do_apply_event_worker(Slave_worker *w)
+{
+  int error= 0;
+  bool is_trans_repo= w->is_transactional();
+
+  if (is_trans_repo)
+  {
+    ulong gaq_idx= mts_group_cnt;
+    Slave_job_group *ptr_g=
+      (Slave_job_group *) dynamic_array_ptr(&w->c_rli->gaq->Q, gaq_idx);
+
+    if ((error= w->commit_positions(this, ptr_g)))
+      goto err;
+  }
+  error= trans_commit(thd); /* Automatically rolls back on error. */
+  thd->mdl_context.release_transactional_locks();
+
+err:
+  return error;
+}
+
 int Xid_log_event::do_apply_event(Relay_log_info const *rli)
 {
   int error= 0;
@@ -6785,15 +6811,13 @@ int Xid_log_event::do_apply_event(Relay_
     the context of the current transaction in order to provide
     data integrity. See sql/rpl_rli.h for further details.
   */
-  Slave_worker *w= rli_ptr->get_current_worker();
-  bool is_parallel= (w != NULL);
-  bool is_trans_repo= (is_parallel ? w->is_transactional() : rli_ptr->is_transactional());
+  bool is_trans_repo= rli_ptr->is_transactional();
 
   /* For a slave Xid_log_event is COMMIT */
   general_log_print(thd, COM_QUERY,
                     "COMMIT /* implicit, from Xid_log_event */");
 
-  if (is_trans_repo && !is_parallel)
+  if (is_trans_repo)
   {
     mysql_mutex_lock(&rli_ptr->data_lock);
   }
@@ -6811,7 +6835,7 @@ int Xid_log_event::do_apply_event(Relay_
   /*
     We need to update the positions in here to make it transactional.  
   */
-  if (is_trans_repo && !is_parallel)
+  if (is_trans_repo)
   {
     rli_ptr->inc_event_relay_log_pos();
     rli_ptr->set_group_relay_log_pos(rli_ptr->get_event_relay_log_pos());
@@ -6827,15 +6851,6 @@ int Xid_log_event::do_apply_event(Relay_
     if ((error= rli_ptr->flush_info(TRUE)))
       goto err;
   }
-  else if (is_trans_repo && is_parallel)
-  {
-    ulong gaq_idx= mts_group_cnt;
-    Slave_job_group *ptr_g=
-      (Slave_job_group *) dynamic_array_ptr(&w->c_rli->gaq->Q, gaq_idx);
-
-    if ((error= w->commit_positions(this, ptr_g)))
-      goto err;
-  }
 
   DBUG_PRINT("info", ("do_apply group master %s %lu  group relay %s %lu event %s %lu\n",
     rli_ptr->get_group_master_log_name(),
@@ -6852,7 +6867,7 @@ int Xid_log_event::do_apply_event(Relay_
   thd->mdl_context.release_transactional_locks();
 
 err:
-  if (is_trans_repo && !is_parallel)
+  if (is_trans_repo)
   {
     mysql_cond_broadcast(&rli_ptr->data_cond);
     mysql_mutex_unlock(&rli_ptr->data_lock);
@@ -8711,7 +8726,7 @@ int Rows_log_event::do_apply_event(Relay
     do_apply_event(). We still check here to prevent future coding
     errors.
   */
-  DBUG_ASSERT(rli->info_thd == thd || rli->is_parallel_exec());
+  DBUG_ASSERT(rli->info_thd == thd);
 
   /*
     If there is no locks taken, this is the first binrow event seen
@@ -8763,7 +8778,7 @@ int Rows_log_event::do_apply_event(Relay
     /* A small test to verify that objects have consistent types */
     DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
 
-    if (open_and_lock_tables(thd, *rli->get_tables_to_lock(), FALSE, 0))
+    if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
     {
       uint actual_error= thd->stmt_da->sql_errno();
       if (thd->is_slave_error || thd->is_fatal_error)
@@ -8794,8 +8809,8 @@ int Rows_log_event::do_apply_event(Relay
 
     {
       DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p",
-                           *rli->get_tables_to_lock()));
-      RPL_TABLE_LIST *ptr= *rli->get_tables_to_lock();
+                           rli->tables_to_lock));
+      RPL_TABLE_LIST *ptr= rli->tables_to_lock;
       for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
       {
         TABLE *conv_table;
@@ -8835,22 +8850,17 @@ int Rows_log_event::do_apply_event(Relay
       Rows_log_event, we can invalidate the query cache for the
       associated table.
      */
-    for (TABLE_LIST *ptr= *rli->get_tables_to_lock() ; ptr ; ptr= ptr->next_global)
+    for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr; ptr= ptr->next_global)
     {
-      if (!rli->is_parallel_exec())
-        const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
-      else
-        rli->get_current_worker()->m_table_map.set_table(ptr->table_id, ptr->table);
+      const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
     }
 #ifdef HAVE_QUERY_CACHE
-    query_cache.invalidate_locked_for_write(*rli->get_tables_to_lock());
+    query_cache.invalidate_locked_for_write(rli->tables_to_lock);
 #endif
   }
 
-  TABLE* 
-    table= m_table= (!rli->is_parallel_exec()) ?
-    const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id) :
-    rli->get_current_worker()->m_table_map.get_table(m_table_id);
+  TABLE* table= 
+    m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
   
   DBUG_PRINT("debug", ("m_table: 0x%lx, m_table_id: %lu", (ulong) m_table, m_table_id));
 
@@ -9566,7 +9576,7 @@ int Table_map_log_event::do_apply_event(
   size_t dummy_len;
   void *memory;
   DBUG_ENTER("Table_map_log_event::do_apply_event(Relay_log_info*)");
-  DBUG_ASSERT(rli->info_thd == thd || rli->is_parallel_exec());
+  DBUG_ASSERT(rli->info_thd == thd);
 
   /* Step the query id to mark what columns that are actually used. */
   thd->set_query_id(next_query_id());
@@ -9631,11 +9641,9 @@ int Table_map_log_event::do_apply_event(
       We record in the slave's information that the table should be
       locked by linking the table into the list of tables to lock.
     */
-    table_list->next_global= table_list->next_local= *rli->get_tables_to_lock();
-
-    //const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
-    //const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
-    const_cast<Relay_log_info*>(rli)->add_table_to_lock(table_list);
+    table_list->next_global= table_list->next_local= rli->tables_to_lock;
+    const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
+    const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
 
     /* 'memory' is freed in clear_tables_to_lock */
   }

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2011-02-27 17:35:25 +0000
+++ b/sql/log_event.h	2011-05-16 19:43:58 +0000
@@ -1356,6 +1356,8 @@ public:
     return 0;                /* Default implementation does nothing */
   }
 
+  virtual int do_apply_event_worker(Slave_worker *w);
+
 protected:
 
   /**
@@ -2663,6 +2665,7 @@ class Xid_log_event: public Log_event
 private:
 #if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
   virtual int do_apply_event(Relay_log_info const *rli);
+  virtual int do_apply_event_worker(Slave_worker *rli);
   enum_skip_reason do_shall_skip(Relay_log_info *rli);
 #endif
 };

=== modified file 'sql/rpl_reporting.cc'
--- a/sql/rpl_reporting.cc	2010-08-05 17:45:25 +0000
+++ b/sql/rpl_reporting.cc	2011-05-16 19:43:58 +0000
@@ -29,12 +29,20 @@ void
 Slave_reporting_capability::report(loglevel level, int err_code,
                                    const char *msg, ...) const
 {
+  va_list args;
+  va_start(args, msg);
+  do_report(level,  err_code, msg, args);
+  va_end(args);
+}
+
+void
+Slave_reporting_capability::do_report(loglevel level, int err_code,
+                                   const char *msg, va_list args) const
+{
   void (*report_function)(const char *, ...);
   char buff[MAX_SLAVE_ERRMSG];
   char *pbuff= buff;
   uint pbuffsize= sizeof(buff);
-  va_list args;
-  va_start(args, msg);
 
   mysql_mutex_lock(&err_lock);
   switch (level)
@@ -64,7 +72,6 @@ Slave_reporting_capability::report(logle
   my_vsnprintf(pbuff, pbuffsize, msg, args);
 
   mysql_mutex_unlock(&err_lock);
-  va_end(args);
 
   /* If the msg string ends with '.', do not add a ',' it would be ugly */
   report_function("Slave %s: %s%s Error_code: %d",

=== modified file 'sql/rpl_reporting.h'
--- a/sql/rpl_reporting.h	2010-08-05 17:45:25 +0000
+++ b/sql/rpl_reporting.h	2011-05-16 19:43:58 +0000
@@ -52,8 +52,10 @@ public:
                         code, but can contain more information), in
                         printf() format.
   */
-  void report(loglevel level, int err_code, const char *msg, ...) const
+  virtual void report(loglevel level, int err_code, const char *msg, ...) const
     ATTRIBUTE_FORMAT(printf, 4, 5);
+  void do_report(loglevel level, int err_code,
+                 const char *msg, va_list v_args) const;
 
   /**
      Clear errors. They will not show up under <code>SHOW SLAVE

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2011-04-06 12:51:58 +0000
+++ b/sql/rpl_rli.cc	2011-05-16 19:43:58 +0000
@@ -146,8 +146,7 @@ void Relay_log_info::deinit_workers()
   mysql_cond_destroy(&pending_jobs_cond);
   mysql_mutex_destroy(&mts_temp_tables_lock);
 
-  if (!this_worker)
-    delete_dynamic(&workers);
+  delete_dynamic(&workers);
 }
 
 Relay_log_info::~Relay_log_info()
@@ -203,57 +202,6 @@ void Relay_log_info::reset_notified_chec
   checkpoint_seqno= 0;
 }
 
-/**
-   The method can be run both by C having the Main (coord) rli context and
-   by W having both the main and the Local (worker) rli context.
-   Decision matrix:
-
-       Main   Local
-     -+-------------
-     C| this_w  -
-     W| w_i    this_w
-
-*/
-Slave_worker* Relay_log_info::get_current_worker() const
-{ 
-  uint i;
-  if (!is_parallel_exec() || info_thd == current_thd)
-    return this_worker; //  can be asserted:  !this_worker => C
-  for (i= 0; i< workers.elements; i++)
-  {
-    Slave_worker* w_i= *(Slave_worker**)
-      dynamic_array_ptr(const_cast<DYNAMIC_ARRAY*>(&workers), i);
-    if (w_i->info_thd == current_thd)
-    {
-      return w_i;
-    }
-  }
-  DBUG_ASSERT(0);
-}
-
-/**
-   The method can be run both by C having the Main context.
-*/
-RPL_TABLE_LIST** Relay_log_info::get_tables_to_lock() const
-{
-  return
-    ((!is_parallel_exec()) || info_thd == current_thd) ?
-    const_cast<RPL_TABLE_LIST**>(&tables_to_lock) :
-    &get_current_worker()->tables_to_lock;
-}
-
-uint Relay_log_info::add_table_to_lock(RPL_TABLE_LIST *table_list)
-{
-  if (!is_parallel_exec())
-    tables_to_lock= table_list;
-  else
-    get_current_worker()->tables_to_lock= table_list;
-  return 
-    (!is_parallel_exec()) ? 
-    tables_to_lock_count++ : 
-    get_current_worker()->tables_to_lock_count++;
-}
-
 static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
 {
   MY_STAT s;
@@ -1070,22 +1018,18 @@ void Relay_log_info::stmt_done(my_off_t 
     inc_event_relay_log_pos(); // todo: ev-> future_event_relay_log_pos
   else
   {
-    Slave_worker *w;
     inc_group_relay_log_pos(event_master_log_pos);
     
-    /* Alfranio needs to update the coordinator and workers. */
+    DBUG_ASSERT(this_worker == NULL);
     
-    if ((w= get_current_worker()) == NULL)
-    {
-      flush_info(is_transactional() ? TRUE : FALSE);
+    flush_info(is_transactional() ? TRUE : FALSE);
 
-      /* 
-         The central recovery commit run in sequential mode forces
-         notification on the defacto new checkpoint.
-      */
-      if (is_parallel_exec())
-        reset_notified_checkpoint();
-    }
+    /* 
+       The central recovery commit run in sequential mode forces
+       notification on the defacto new checkpoint.
+    */
+    if (is_parallel_exec())
+      reset_notified_checkpoint();
   }
 }
 
@@ -1094,7 +1038,7 @@ void Relay_log_info::cleanup_context(THD
 {
   DBUG_ENTER("Relay_log_info::cleanup_context");
 
-  DBUG_ASSERT((info_thd == thd) || is_parallel_exec() || is_mts_recovery());
+  DBUG_ASSERT((info_thd == thd));
   /*
     1) Instances of Table_map_log_event, if ::do_apply_event() was called on them,
     may have opened tables, which we cannot be sure have been closed (because
@@ -1116,7 +1060,7 @@ void Relay_log_info::cleanup_context(THD
     MTS W/a for Rows_query_log_event.
     Cleanup of rows_query_ev at the end of the current statement.
 
-    todo: move handle_rows_query_log_event() cleanup logics into this method
+    TODO: move handle_rows_query_log_event() cleanup logics into this method
           inconditionally.
   */
   if (error || is_parallel_exec())
@@ -1127,10 +1071,7 @@ void Relay_log_info::cleanup_context(THD
       info_thd->set_query(NULL, 0);
     }
 
-  if (!is_parallel_exec() || thd == info_thd)
-    m_table_map.clear_tables();
-  else
-    get_current_worker()->m_table_map.clear_tables();
+  m_table_map.clear_tables();
 
   slave_close_thread_tables(thd);
   if (error)
@@ -1146,26 +1087,20 @@ void Relay_log_info::cleanup_context(THD
 
 void Relay_log_info::clear_tables_to_lock()
 {
-  RPL_TABLE_LIST **p_tables= get_tables_to_lock();
-  Slave_worker* w_c= get_current_worker();
-  while ((*p_tables))
+  while (tables_to_lock)
   {
-    uchar* to_free= reinterpret_cast<uchar*>((*p_tables));
-    if ((*p_tables)->m_tabledef_valid)
+    uchar* to_free= reinterpret_cast<uchar*>(tables_to_lock);
+    if (tables_to_lock->m_tabledef_valid)
     {
-      (*p_tables)->m_tabledef.table_def::~table_def();
-      (*p_tables)->m_tabledef_valid= FALSE;
+      tables_to_lock->m_tabledef.table_def::~table_def();
+      tables_to_lock->m_tabledef_valid= FALSE;
     }
-    (*p_tables)=
-      static_cast<RPL_TABLE_LIST*>((*p_tables)->next_global);
-    if (w_c)
-      w_c->tables_to_lock_count--;
-    else
-      tables_to_lock_count--;
+    tables_to_lock=
+      static_cast<RPL_TABLE_LIST*>(tables_to_lock->next_global);
+    tables_to_lock_count--;
     my_free(to_free);
   }
-  DBUG_ASSERT((*p_tables) == NULL &&
-              ((!w_c && tables_to_lock_count == 0) || w_c->tables_to_lock_count == 0));
+  DBUG_ASSERT(tables_to_lock == NULL && tables_to_lock_count == 0);
 }
 
 void Relay_log_info::slave_close_thread_tables(THD *thd)

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2011-04-06 12:51:58 +0000
+++ b/sql/rpl_rli.h	2011-05-16 19:43:58 +0000
@@ -206,7 +206,6 @@ public:
     happen when, for example, the relay log gets rotated because of
     max_binlog_size.
   */
-protected:
   char group_relay_log_name[FN_REFLEN];
   ulonglong group_relay_log_pos;
   char event_relay_log_name[FN_REFLEN];
@@ -228,6 +227,8 @@ protected:
   char group_master_log_name[FN_REFLEN];
   volatile my_off_t group_master_log_pos;
 
+// TODO: Restore!
+protected:
   /*
     When it commits, InnoDB internally stores the master log position it has
     processed so far; the position to store is the one of the end of the
@@ -284,9 +285,6 @@ public:
   */
   ulong slave_exec_mode;
 
-  RPL_TABLE_LIST** get_tables_to_lock() const;
-  uint add_table_to_lock(RPL_TABLE_LIST *table_list);
-
   /*
      Condition and its parameters from START SLAVE UNTIL clause.
      
@@ -394,7 +392,8 @@ public:
 
   bool get_table_data(TABLE *table_arg, table_def **tabledef_var, TABLE **conv_table_var) const
   {
-    TABLE_LIST *tables= *get_tables_to_lock();
+    TABLE_LIST *tables= tables_to_lock;
+
     DBUG_ASSERT(tabledef_var && conv_table_var);
     for (TABLE_LIST *ptr= tables ; ptr != NULL ; ptr= ptr->next_global)
       if (ptr->table == table_arg)
@@ -455,8 +454,6 @@ public:
   ulong mts_wqs_overfill_cnt;  // Coord waits if a W's queue is full
   long  mts_worker_underrun_level; // percent of WQ size at which Worker claims hungry
   ulong mts_coordinator_basic_nap; // C sleeps to avoid WQs overrun
-  Slave_worker* get_current_worker() const;
-  Slave_worker* set_this_worker(Slave_worker *w) { return this_worker= w; }
   Slave_worker* this_worker; // used by w_rli. The cental rli has it as NULL.
   ulonglong mts_total_groups; // total event groups distributed in current session
  

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-12-23 13:34:02 +0000
+++ b/sql/rpl_rli_pdb.cc	2011-05-16 19:43:58 +0000
@@ -29,13 +29,10 @@ const char *info_slave_worker_fields []=
 
 Slave_worker::Slave_worker(const char* type, const char* pfs,
                            Relay_log_info *rli)
-  : Rpl_info_worker(type, pfs), c_rli(rli), curr_group_exec_parts(0),
-  group_relay_log_pos(0), group_master_log_pos(0),
+  : Relay_log_info(FALSE), c_rli(rli), curr_group_exec_parts(0),
   checkpoint_relay_log_pos(0), checkpoint_master_log_pos(0), checkpoint_seqno(0),
   inited_group_execed(0)
 {
-  group_relay_log_name[0]= 0;
-  group_master_log_name[0]= 0;
   checkpoint_relay_log_name[0]= 0;
   checkpoint_master_log_name[0]= 0;
 }
@@ -777,14 +774,14 @@ bool circular_buffer_queue::gt(ulong i, 
    Progress in GAQ is assessed through comparision of GAQ index value 
    with Worker's @c last_group_done_index.
    Purging breaks at a first discovered gap, that is an item
-   that the assinged item->w_id'th Worker has not completed yet.
+   that the assinged item->w_id'th Worker has not yet completed.
 
    The caller is supposed to be the checkpoint handler.
 
    A copy of the last discarded item containing
    the refreshed value of the committed low-water-mark is stored
    into @c lwm container member for further caller's processing.
-   @last_done is updated with the latests total_seqno for each Worker
+   @c last_done is updated with the latest total_seqno for each Worker
    that was met during GAQ parse.
 
    @note dyn-allocated members of Slave_job_group such as
@@ -864,6 +861,19 @@ ulong Slave_committed_queue::move_queue_
   return cnt;
 }
 
+void Slave_worker::do_report(loglevel level, int err_code, const char *msg, va_list vargs) const
+{
+  c_rli->do_report(level, err_code, msg, vargs);
+}
+
+void Slave_worker::report(loglevel level, int err_code, const char *msg, ...) const
+{
+  va_list vargs;
+  va_start(vargs, msg);
+
+  do_report(level, err_code, msg, vargs);
+  va_end(vargs);
+}
 
 int wait_for_workers_to_finish(Relay_log_info const *rli, Slave_worker *ignore)
 {

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-12-21 19:31:29 +0000
+++ b/sql/rpl_rli_pdb.h	2011-05-16 19:43:58 +0000
@@ -202,7 +202,7 @@ public:
   ulonglong waited_overfill;
 };
 
-class Slave_worker : public Rpl_info_worker
+class Slave_worker : public Relay_log_info
 {
 public:
   Slave_worker(const char *type, const char *pfs,
@@ -214,8 +214,6 @@ public:
   Slave_jobs_queue jobs;
 
   Relay_log_info *c_rli;
-  // fixme: experimental
-  Relay_log_info *w_rli;
 
   Dynamic_ids *curr_group_exec_parts; // CGEP
   bool curr_group_seen_begin; // is set to TRUE with B-event at Worker exec
@@ -250,11 +248,6 @@ public:
     We need to make this a dynamic field. /Alfranio
   */
   char partitions[FN_REFLEN];
-  char group_relay_log_name[FN_REFLEN];
-  ulonglong group_relay_log_pos;
-  char group_master_log_name[FN_REFLEN];
-  ulonglong group_master_log_pos;
-
   // todo: remove
   char checkpoint_relay_log_name[FN_REFLEN];
   ulonglong checkpoint_relay_log_pos;
@@ -273,6 +266,10 @@ public:
 
   bool commit_positions(Log_event *evt, Slave_job_group *ptr_g);
 
+  void report(loglevel level, int err_code, const char *msg, ...) const
+    ATTRIBUTE_FORMAT(printf, 4, 5);
+  void do_report(loglevel level, int err_code, const char *msg, va_list vargs) const;
+
   MY_BITMAP group_execed;
   
   bool inited_group_execed;

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2011-04-06 12:51:58 +0000
+++ b/sql/rpl_slave.cc	2011-05-16 19:43:58 +0000
@@ -2998,8 +2998,6 @@ static int exec_relay_log_event(THD* thd
       MTS: since master,relay-group coordinates change per checkpoint
       at the end of the checkpoint interval UNTIL can be left far behind.
       Hence, UNTIL forces the sequential applying.
-
-      TODO: to not let to start with UNTIL whenever @@global.max_slave_workers>0.
     */
     if (rli->until_condition != Relay_log_info::UNTIL_NONE &&
         rli->is_until_satisfied(thd, ev))
@@ -3720,8 +3718,6 @@ pthread_handler_t handle_slave_worker(vo
     goto err;
   }
   w->info_thd= thd;
-  w->w_rli->info_thd= thd;
-
   thd->thread_stack = (char*)&thd;
   
   pthread_detach_this_thread();
@@ -3756,9 +3752,8 @@ pthread_handler_t handle_slave_worker(vo
     mysql_mutex_lock(&rli->info_thd->LOCK_thd_data);
     rli->info_thd->awake(THD::KILL_QUERY);          // notify Crdn
     mysql_mutex_unlock(&rli->info_thd->LOCK_thd_data);
-    // Todo: add necessary stuff to clean up after Q-log-event, a Q trans
     thd->clear_error();
-    rli->cleanup_context(thd, error);
+    w->cleanup_context(thd, error);
   }
 
   mysql_mutex_lock(&w->jobs_lock);
@@ -3871,26 +3866,6 @@ bool mts_recovery_groups(Relay_log_info 
       delete worker;
   };
 
-#if 0
-  for (uint id= 0; id < rli->slave_parallel_workers; id++)
-  {
-    Slave_worker *worker=
-      Rpl_info_factory::create_worker(opt_worker_repository_id, id, rli);
-    worker->init_info();
-    retrieve_job(worker, job_file);
-    /*
-      This avoids gathering information on workers that haven't
-      processed anything.
-    */
-
-    // TODO: disregard W_i | W_i->coord < LWM
-
-    if (job_file.group_relay_log_name != NULL && strcmp(job_file.group_relay_log_name, "") &&
-        job_file.group_master_log_name != NULL && strcmp(job_file.group_master_log_name, ""))
-      insert_dynamic(&above_lwm_jobs, (uchar*) &job_file);
-  }
-#endif
-
   sort_dynamic(&above_lwm_jobs, (qsort_cmp) mts_event_coord_cmp);
   /*
     In what follows, the group Recovery Bitmap is constructed.
@@ -3962,9 +3937,11 @@ bool mts_recovery_groups(Relay_log_info 
                                       ev->log_pos };
             if ((ret= mts_event_coord_cmp(&ev_coord, &w_last)) == 0)
             {
-              // hit it
-              // w.B << group_cnt++;
-              // RB |= w.B;
+              /* 
+                 hit it
+                 w.B << group_cnt++;
+                 RB |= w.B;
+              */
               for (uint i= w->checkpoint_seqno - rli->mts_recovery_group_cnt, j= 0;
                    i <= w->checkpoint_seqno; i++, j++)
               {
@@ -4178,36 +4155,41 @@ bool mts_checkpoint_routine(Relay_log_in
   if (!locked)
     mysql_mutex_lock(&rli->data_lock);
 
-  // Coordinator::commit_positions() {
-
-  // rli->gaq->lwm contains all but rli->group_master_log_name
+  /*
+    Coordinator::commit_positions() {
 
-  // group_master_log_name is updated only by Coordinator and it can't change
-  // within checkpoint interval because Coordinator flushes the updated value
-  // at once.
-  // Note, unlike group_master_log_name, event_relay_log_pos is updated solely 
-  // within Coordinator read loop context. Hence, it's possible at times 
-  // event_rlp > group_rlp.
+    rli->gaq->lwm contains all but rli->group_master_log_name
 
+    group_master_log_name is updated only by Coordinator and it can't change
+    within checkpoint interval because Coordinator flushes the updated value
+    at once.
+    Note, unlike group_master_log_name, event_relay_log_pos is updated solely 
+    within Coordinator read loop context. Hence, it's possible at times 
+    event_rlp > group_rlp.
+  */
   rli->set_group_master_log_pos(rli->gaq->lwm.group_master_log_pos);
   rli->set_group_relay_log_pos(rli->gaq->lwm.group_relay_log_pos);
 
   if (rli->gaq->lwm.group_relay_log_name[0] != 0)
     rli->set_group_relay_log_name(rli->gaq->lwm.group_relay_log_name);
 
-  //todo: uncomment notifies when UNTIL will be supported
+  /* 
+     todo: uncomment notifies when UNTIL will be supported
 
-  //rli->notify_group_master_log_name_update();
-  //rli->notify_group_relay_log_name_update();
+     rli->notify_group_master_log_name_update();
+     rli->notify_group_relay_log_name_update();
 
-  // todo: optimize with if (wait_flag) broadcast
-  //       waiter: set wait_flag; waits....; drops wait_flag;
+     Todo: optimize with if (wait_flag) broadcast
+         waiter: set wait_flag; waits....; drops wait_flag;
+  */
   mysql_cond_broadcast(&rli->data_cond);
   if (!locked)
     mysql_mutex_unlock(&rli->data_lock);
 
   error= rli->flush_info(TRUE);
-  // end of commit_positions
+  /*
+    } // end of commit_positions
+  */
 
   rli->reset_notified_checkpoint();
 
@@ -4229,7 +4211,6 @@ int slave_start_single_worker(Relay_log_
   pthread_t th;
   Slave_worker *w= NULL;
   Slave_job_item empty= {NULL};
-  Rpl_info_dummy *dummy_handler= NULL;
 
   if (!(w=
       Rpl_info_factory::create_worker(opt_worker_repository_id, i, rli)))
@@ -4243,11 +4224,6 @@ int slave_start_single_worker(Relay_log_
   w->tables_to_lock= NULL;
   w->tables_to_lock_count= 0;
 
-  // fixme: experimenting to make Workers to run ev->update_pos(w->w_rli)
-  // fixme: a real hack! part of Rpl_info_factory::create_rli(RLI_REPOSITORY_FILE, FALSE);
-  w->w_rli= new Relay_log_info(FALSE);
-  dummy_handler= new Rpl_info_dummy(rli->get_number_info_rli_fields());
-  w->w_rli->set_rpl_info_handler(dummy_handler);
   if (w->init_info())
   {
     sql_print_error("Failed during slave worker thread create");
@@ -4258,12 +4234,10 @@ int slave_start_single_worker(Relay_log_
   // TODO: remove after dynamic_ids will be sorted out (removed/refined) otherwise
   // entry->usage assert
   w->curr_group_exec_parts->dynamic_ids.elements= 0;
-
   w->relay_log_change_notified= FALSE; // the 1st group to contain relaylog name
   w->checkpoint_notified= FALSE;
-  w->w_rli->workers= rli->workers; // shallow copying is sufficient
-  w->w_rli->this_worker= w;
-
+  w->workers= rli->workers; // shallow copying is sufficient
+  w->this_worker= w;
   w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
   w->id= i;
   w->current_table= NULL;
@@ -4438,8 +4412,6 @@ void slave_stop_workers(Relay_log_info *
     DBUG_ASSERT(w->jobs.Q.elements == w->jobs.s);
     delete_dynamic(&w->jobs.Q);
     delete_dynamic_element(&rli->workers, i);
-    delete w->w_rli;
-
     delete w;
   }
 
@@ -4508,8 +4480,10 @@ pthread_handler_t handle_slave_sql(void 
 
   /* mts-II: starting the worker pool */
   if (slave_start_workers(rli, rli->opt_slave_parallel_workers) != 0)
-    goto err;
-  
+  {
+      mysql_mutex_unlock(&rli->run_lock);
+      goto err;
+  }
   if (init_slave_thread(thd, SLAVE_THD_SQL))
   {
     /*
@@ -7143,25 +7117,13 @@ mysql_mutex_t* mts_get_temp_table_mutex(
 */
 THD* mts_get_coordinator_thd()
 {
-  Slave_worker *w= NULL;
   return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
-    NULL : !(w= active_mi->rli->get_current_worker()) ?
-    NULL : w->c_rli->info_thd;
+    NULL : active_mi->rli->info_thd;
 }
 
 /**
-   @return a reference to THD of a Worker thread or NULL
-           in case of no replication is set up or it's in the sequential mode.
-*/
-THD* mts_get_worker_thd()
-{
-  Slave_worker *w= NULL;
-  return (!active_mi || !active_mi->rli || !active_mi->rli->is_parallel_exec()) ?
-    NULL : !(w= active_mi->rli->get_current_worker()) ?
-    NULL : w->w_rli->info_thd;
-}
+   TODO: exploint new slave_worker system thread type property
 
-/**
    @param  thd a reference to THD
 
    @return TRUE if thd belongs to a Worker thread and FALSE otherwise.
@@ -7169,9 +7131,7 @@ THD* mts_get_worker_thd()
 bool mts_is_worker(THD *thd)
 {
   return
-    thd->slave_thread &&
-    thd->system_thread == SYSTEM_THREAD_SLAVE_SQL &&
-    (mts_get_worker_thd() != NULL);
+    thd->slave_thread && active_mi->rli->info_thd != thd;
 }
 
 /* end of MTS temp table support section */


Attachment: [text/bzr-bundle] bzr/andrei.elkin@oracle.com-20110516194358-bebbn66gdh75adif.bundle
Thread
bzr commit into mysql-next-mr-wl5569 branch (andrei.elkin:3277) WL#5569Andrei Elkin16 May