List:Commits« Previous MessageNext Message »
From:Alfranio Correia Date:November 10 2010 1:24am
Subject:bzr commit into mysql-next-mr.crash-safe branch (alfranio.correia:3207)
WL#5599
View as plain text  
#At file:///home/acorreia/workspace.sun/repository.mysql.new/bzrwork/wl-5569/mysql-next-mr-wl5569/ based on revid:alfranio.correia@stripped

 3207 Alfranio Correia	2010-11-10
      Refactory to start working on WL#5599.

    modified:
      scripts/mysql_system_tables.sql
      sql/log_event.cc
      sql/log_event.h
      sql/rpl_info.cc
      sql/rpl_info.h
      sql/rpl_info_factory.cc
      sql/rpl_info_factory.h
      sql/rpl_mi.cc
      sql/rpl_mi.h
      sql/rpl_rli.cc
      sql/rpl_rli.h
      sql/rpl_rli_pdb.cc
      sql/rpl_rli_pdb.h
      sql/rpl_slave.cc
      sql/table.cc
=== modified file 'scripts/mysql_system_tables.sql'
--- a/scripts/mysql_system_tables.sql	2010-11-02 18:28:20 +0000
+++ b/scripts/mysql_system_tables.sql	2010-11-10 01:24:48 +0000
@@ -104,6 +104,8 @@ CREATE TABLE IF NOT EXISTS slave_relay_l
 
 CREATE TABLE IF NOT EXISTS slave_master_info (Master_id INTEGER UNSIGNED NOT NULL, Number_of_lines INTEGER UNSIGNED NOT NULL, Master_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Master_log_pos BIGINT UNSIGNED NOT NULL, Host TEXT CHARACTER SET utf8 COLLATE utf8_bin, User_name TEXT CHARACTER SET utf8 COLLATE utf8_bin, User_password TEXT CHARACTER SET utf8 COLLATE utf8_bin, Port INTEGER UNSIGNED NOT NULL, Connect_retry INTEGER UNSIGNED NOT NULL, Enabled_ssl BOOLEAN NOT NULL, Ssl_ca TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_capath TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_cert TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_cipher TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_key TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_verify_servert_cert BOOLEAN NOT NULL, Heartbeat FLOAT NOT NULL, Bind TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ignored_server_ids TEXT CHARACTER SET utf8 COLLATE utf8_bin, Uuid TEXT CHARACTER SET utf8 COLLATE utf8_bin, Retry_count BIGIN!
 T UNSIGNED NOT NULL, PRIMARY KEY(Master_id)) ENGINE=MYISAM DEFAULT CHARSET=utf8 COMMENT 'Master Information';
 
+CREATE TABLE IF NOT EXISTS slave_worker_info (Master_id INTEGER UNSIGNED NOT NULL, Worker_id INTEGER UNSIGNED NOT NULL, Number_of_lines INTEGER UNSIGNED NOT NULL, Partition TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Relay_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Relay_log_pos BIGINT UNSIGNED NOT NULL, Master_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Master_log_pos BIGINT UNSIGNED NOT NULL, PRIMARY KEY(Master_id, Worker_id)) ENGINE=MYISAM DEFAULT CHARSET=utf8 COMMENT 'Worker Information';
+
 --
 -- PERFORMANCE SCHEMA INSTALLATION
 -- Note that this script is also reused by mysql_upgrade,

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2010-11-09 13:04:14 +0000
+++ b/sql/log_event.cc	2010-11-10 01:24:48 +0000
@@ -2291,7 +2291,7 @@ void append_item_to_jobs(slave_job_item 
   mysql_mutex_unlock(&rli->pending_jobs_lock);
 
   mysql_mutex_lock(&w->jobs_lock);
-  if (!w->thd->is_slave_error)  // error check exists at stmt commit as well
+  if (!w->info_thd->is_slave_error)  // error check exists at stmt commit as well
   {
     int ret;
 
@@ -2349,7 +2349,7 @@ int Log_event::apply_event(Relay_log_inf
 */
 struct slave_job_item* pop_jobs_item(Slave_worker *w, Slave_job_item *job_item)
 {
-  THD *thd= w->thd;
+  THD *thd= w->info_thd;
   mysql_mutex_lock(&w->jobs_lock);
   while (!job_item->data && !thd->killed)
   {
@@ -2371,7 +2371,7 @@ struct slave_job_item* pop_jobs_item(Sla
     w->curr_jobs--;
   mysql_mutex_unlock(&w->jobs_lock);
 
-  thd_proc_info(w->thd, "Executing event");
+  thd_proc_info(w->info_thd, "Executing event");
   return job_item;
 }
 
@@ -2387,7 +2387,7 @@ int slave_worker_exec_job(Slave_worker *
 {
   int error= 0;
   struct slave_job_item item= {NULL}, *job_item= &item;
-  THD *thd= w->thd;
+  THD *thd= w->info_thd;
   Log_event *ev= NULL;
 
   DBUG_ENTER("slave_worker_exec_job");

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2010-11-09 13:04:14 +0000
+++ b/sql/log_event.h	2010-11-10 01:24:48 +0000
@@ -644,7 +644,7 @@ class THD;
 
 class Format_description_log_event;
 class Relay_log_info;
-struct Slave_worker;
+class Slave_worker;
 
 #ifdef MYSQL_CLIENT
 enum enum_base64_output_mode {

=== modified file 'sql/rpl_info.cc'
--- a/sql/rpl_info.cc	2010-07-27 19:26:54 +0000
+++ b/sql/rpl_info.cc	2010-11-10 01:24:48 +0000
@@ -17,21 +17,38 @@
 #include <sql_priv.h>
 #include "rpl_info.h"
 
-Rpl_info::Rpl_info(const char* type,
+Rpl_info::Rpl_info(const char* type)
+  : Slave_reporting_capability(type),
+  info_thd(0), inited(0), abort_slave(0),
+  slave_running(0), slave_run_id(0),
+  handler(0)
+{
+
+}
+
+Rpl_info::~Rpl_info()
+{
+  if (handler)
+    delete handler;
+}
+
+void Rpl_info::set_rpl_info_handler(Rpl_info_handler * param_handler)
+{
+  handler= param_handler;
+}
+
+Rpl_info_coordinator::Rpl_info_coordinator(const char* type,
                    PSI_mutex_key *param_key_info_run_lock,
                    PSI_mutex_key *param_key_info_data_lock,
                    PSI_mutex_key *param_key_info_data_cond,
                    PSI_mutex_key *param_key_info_start_cond,
                    PSI_mutex_key *param_key_info_stop_cond)
-  :Slave_reporting_capability(type),
+  : Rpl_info(type),
   key_info_run_lock(param_key_info_run_lock),
   key_info_data_lock(param_key_info_data_lock),
   key_info_data_cond(param_key_info_data_cond),
   key_info_start_cond(param_key_info_start_cond),
-  key_info_stop_cond(param_key_info_stop_cond),
-  info_thd(0), inited(0), abort_slave(0),
-  slave_running(0), slave_run_id(0),
-  handler(0)
+  key_info_stop_cond(param_key_info_stop_cond)
 {
   mysql_mutex_init(*key_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
   mysql_mutex_init(*key_info_data_lock,
@@ -41,9 +58,9 @@ Rpl_info::Rpl_info(const char* type,
   mysql_cond_init(*key_info_stop_cond, &stop_cond, NULL);
 }
 
-Rpl_info::~Rpl_info()
+Rpl_info_coordinator::~Rpl_info_coordinator()
 {
-  DBUG_ENTER("Rpl_info::~Rpl_info");
+  DBUG_ENTER("Rpl_info_coordinator::~Rpl_info_coordinator");
 
   mysql_mutex_destroy(&run_lock);
   mysql_mutex_destroy(&data_lock);
@@ -51,13 +68,15 @@ Rpl_info::~Rpl_info()
   mysql_cond_destroy(&start_cond);
   mysql_cond_destroy(&stop_cond);
 
-  if (handler)
-    delete handler;
-
   DBUG_VOID_RETURN;
 }
 
-void Rpl_info::set_rpl_info_handler(Rpl_info_handler * param_handler)
+Rpl_info_worker::Rpl_info_worker(const char* type)
+  : Rpl_info(type)
 {
-  handler= param_handler;
 }
+
+Rpl_info_worker::~Rpl_info_worker()
+{
+}
+

=== modified file 'sql/rpl_info.h'
--- a/sql/rpl_info.h	2010-10-25 10:39:01 +0000
+++ b/sql/rpl_info.h	2010-11-10 01:24:48 +0000
@@ -24,22 +24,6 @@
 class Rpl_info : public Slave_reporting_capability
 {
 public:
-  /*
-    standard lock acquisition order to avoid deadlocks:
-    run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
-  */
-  mysql_mutex_t data_lock,run_lock;
-  /*
-    start_cond is broadcast when SQL thread is started
-    stop_cond - when stopped
-    data_cond - when data protected by data_lock changes
-  */
-  mysql_cond_t data_cond,start_cond,stop_cond;
-
-  PSI_mutex_key *key_info_run_lock, *key_info_data_lock;
-
-  PSI_mutex_key *key_info_data_cond, *key_info_start_cond, *key_info_stop_cond;
-
   THD *info_thd;
 
   bool inited;
@@ -47,16 +31,7 @@ public:
   volatile uint slave_running;
   volatile ulong slave_run_id;
 
-#ifndef DBUG_OFF
-  int events_until_exit;
-#endif
-
-  Rpl_info(const char* type,
-           PSI_mutex_key *param_key_info_run_lock,
-           PSI_mutex_key *param_key_info_data_lock,
-           PSI_mutex_key *param_key_info_data_cond,
-           PSI_mutex_key *param_key_info_start_cond,
-           PSI_mutex_key *param_key_info_stop_cond);
+  Rpl_info(const char *type);
   virtual ~Rpl_info();
 
   int check_info()
@@ -104,4 +79,52 @@ private:
   Rpl_info& operator=(const Rpl_info& info);
   Rpl_info(const Rpl_info& info);
 };
+
+class Rpl_info_coordinator: public Rpl_info
+{
+public:
+  /*
+    standard lock acquisition order to avoid deadlocks:
+    run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
+  */
+  mysql_mutex_t data_lock,run_lock;
+  /*
+    start_cond is broadcast when SQL thread is started
+    stop_cond - when stopped
+    data_cond - when data protected by data_lock changes
+  */
+  mysql_cond_t data_cond, start_cond, stop_cond;
+
+  PSI_mutex_key *key_info_run_lock, *key_info_data_lock;
+
+  PSI_mutex_key *key_info_data_cond, *key_info_start_cond, *key_info_stop_cond;
+
+#ifndef DBUG_OFF
+  int events_until_exit;
+#endif
+
+  Rpl_info_coordinator(const char* type,
+           PSI_mutex_key *param_key_info_run_lock,
+           PSI_mutex_key *param_key_info_data_lock,
+           PSI_mutex_key *param_key_info_data_cond,
+           PSI_mutex_key *param_key_info_start_cond,
+           PSI_mutex_key *param_key_info_stop_cond);
+  virtual ~Rpl_info_coordinator();
+
+private:
+  Rpl_info_coordinator& operator=(const Rpl_info_coordinator& info);
+  Rpl_info_coordinator(const Rpl_info_coordinator& info);
+};
+
+class Rpl_info_worker: public Rpl_info
+{
+public:
+
+  Rpl_info_worker(const char* type);
+  virtual ~Rpl_info_worker();
+
+private:
+  Rpl_info_worker& operator=(const Rpl_info_worker& info);
+  Rpl_info_worker(const Rpl_info_worker& info);
+};
 #endif /* RPL_INFO_H */

=== modified file 'sql/rpl_info_factory.cc'
--- a/sql/rpl_info_factory.cc	2010-10-29 18:16:48 +0000
+++ b/sql/rpl_info_factory.cc	2010-11-10 01:24:48 +0000
@@ -39,10 +39,10 @@
   @retval FALSE No error
   @retval TRUE  Failure
 */ 
-bool Rpl_info_factory::create(uint mi_option, Master_info **mi,
-                              uint rli_option, Relay_log_info **rli)
+bool Rpl_info_factory::create_coordinators(uint mi_option, Master_info **mi,
+                                           uint rli_option, Relay_log_info **rli)
 {
-  DBUG_ENTER("Rpl_info_factory::Rpl_info_factory");
+  DBUG_ENTER("Rpl_info_factory::create_coordinators");
 
   if (!((*mi)= Rpl_info_factory::create_mi(mi_option)))
     DBUG_RETURN(TRUE);
@@ -85,7 +85,7 @@ Master_info *Rpl_info_factory::create_mi
   const char *msg= "Failed to allocate memory for the master info "
                    "structure";
 
-  DBUG_ENTER("Rpl_info_factory::Rpl_info_factory");
+  DBUG_ENTER("Rpl_info_factory::create_mi");
 
   if (!(mi= new Master_info(&key_master_info_run_lock,
                             &key_master_info_data_lock,
@@ -347,3 +347,69 @@ bool Rpl_info_factory::change_engine(Rpl
 
   DBUG_RETURN(FALSE);
 }
+
+Slave_worker *Rpl_info_factory::create_worker(uint rli_option, Relay_log_info *rli, uint worker_id)
+{
+  DBUG_ENTER("Rpl_info_factory::create_worker");
+
+  char info_fname[FN_REFLEN];
+  char info_name[FN_REFLEN];
+  Rpl_info_file*  w_file= NULL;
+  Rpl_info_table* w_table= NULL;
+  Slave_worker *worker= NULL;
+  
+  const char *msg= "Failed to allocate memory for the Slave worker info "
+                   "structure";
+  /*
+    Defining the name of the worker and its repository.
+  */
+  char *pos= strmov(info_fname, relay_log_info_file);
+  sprintf(pos, ".%u", worker_id);
+  pos= strmov(info_name, "worker");
+  sprintf(pos, ".%u", worker_id);
+
+  if (!(worker= new Slave_worker(info_name)))
+    goto err;
+
+  /*
+    Now we instantiate all info repos and later decide which one to take,
+    but not without first checking if there is already existing data for
+    a repo different from the one that is being requested.
+  */
+  if (!(w_file= new Rpl_info_file(worker->get_number_worker_fields(),
+                                  info_fname)))
+    goto err;
+
+  if (!(w_table= new Rpl_info_table(worker->get_number_worker_fields() + 1,
+                                    WORKER_FIELD_ID, WORKER_SCHEMA, WORKER_TABLE)))
+    goto err;
+
+  DBUG_ASSERT(rli_option == RLI_REPOSITORY_FILE ||
+              rli_option == RLI_REPOSITORY_TABLE);
+
+  if (decide_repository(worker, &w_table, &w_file,
+                        rli_option == MI_REPOSITORY_TABLE, &msg))
+    goto err;
+
+  if ((rli_option == RLI_REPOSITORY_TABLE) &&
+       change_engine(static_cast<Rpl_info_table *>(w_table),
+                     relay_log_info_engine, &msg))
+    goto err;
+
+  DBUG_RETURN(worker);
+
+err:
+  if (w_file) delete w_file;
+  if (w_table) delete w_table;
+  if (worker)
+  {
+    /*
+      The handler was previously deleted so we need to remove
+      any reference to it.  
+    */
+    worker->set_rpl_info_handler(NULL);
+    delete worker;
+  }
+  sql_print_error("%s", msg);
+  DBUG_RETURN(NULL);
+}

=== modified file 'sql/rpl_info_factory.h'
--- a/sql/rpl_info_factory.h	2010-10-25 10:39:01 +0000
+++ b/sql/rpl_info_factory.h	2010-11-10 01:24:48 +0000
@@ -19,6 +19,7 @@
 #include "rpl_info.h"
 #include "rpl_mi.h"
 #include "rpl_rli.h"
+#include "rpl_rli_pdb.h"
 #include "rpl_info_file.h"
 #include "rpl_info_table.h"
 #include "rpl_info_handler.h"
@@ -47,12 +48,19 @@ extern ulong opt_rli_repository_id;
 #define RLI_SCHEMA "mysql"
 #define RLI_TABLE  "slave_relay_log_info"
 
+#define WORKER_FIELD_ID 0
+
+#define WORKER_SCHEMA "mysql"
+#define WORKER_TABLE  "slave_worker_info"
+
 class Rpl_info_factory
 {
   public:
 
-  static bool create(uint mi_option, Master_info **mi,
-                     uint rli_option, Relay_log_info **rli);
+  static bool create_coordinators(uint mi_option, Master_info **mi,
+                                  uint rli_option, Relay_log_info **rli);
+  static Slave_worker *create_worker(uint rli_option, Relay_log_info *rli,
+                                     uint worker_id);
   static Master_info *create_mi(uint rli_option);
   static Relay_log_info *create_rli(uint rli_option, bool is_slave_recovery);
   static bool decide_repository(Rpl_info *info, Rpl_info_table **table,

=== modified file 'sql/rpl_mi.cc'
--- a/sql/rpl_mi.cc	2010-08-24 08:39:26 +0000
+++ b/sql/rpl_mi.cc	2010-11-10 01:24:48 +0000
@@ -81,7 +81,8 @@ Master_info::Master_info(PSI_mutex_key *
                          PSI_mutex_key *param_key_info_data_cond,
                          PSI_mutex_key *param_key_info_start_cond,
                          PSI_mutex_key *param_key_info_stop_cond)
-   :Rpl_info("I/O", param_key_info_run_lock, param_key_info_data_lock,
+   :Rpl_info_coordinator("I/O",
+             param_key_info_run_lock, param_key_info_data_lock,
              param_key_info_data_cond, param_key_info_start_cond,
              param_key_info_stop_cond),
    ssl(0), ssl_verify_server_cert(0),

=== modified file 'sql/rpl_mi.h'
--- a/sql/rpl_mi.h	2010-08-05 17:45:25 +0000
+++ b/sql/rpl_mi.h	2010-11-10 01:24:48 +0000
@@ -60,7 +60,7 @@ typedef struct st_mysql MYSQL;
 
 *****************************************************************************/
 
-class Master_info : public Rpl_info
+class Master_info : public Rpl_info_coordinator
 {
  public:
   Master_info(PSI_mutex_key *param_key_info_run_lock,

=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc	2010-11-09 13:04:14 +0000
+++ b/sql/rpl_rli.cc	2010-11-10 01:24:48 +0000
@@ -61,7 +61,8 @@ Relay_log_info::Relay_log_info(bool is_s
                                PSI_mutex_key *param_key_info_data_cond,
                                PSI_mutex_key *param_key_info_start_cond,
                                PSI_mutex_key *param_key_info_stop_cond)
-   :Rpl_info("SQL", param_key_info_run_lock, param_key_info_data_lock,
+   :Rpl_info_coordinator("SQL",
+             param_key_info_run_lock, param_key_info_data_lock,
              param_key_info_data_cond, param_key_info_start_cond,
              param_key_info_stop_cond),
    replicate_same_server_id(::replicate_same_server_id),
@@ -159,7 +160,7 @@ Slave_worker* Relay_log_info::get_curren
   {
     // convert to use hashing
     get_dynamic(const_cast<DYNAMIC_ARRAY*>(&workers), (uchar*) &w_i, i);
-    if (w_i->thd == current_thd)
+    if (w_i->info_thd == current_thd)
       return w_i;
   }
   DBUG_ASSERT(0);

=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h	2010-11-09 13:04:14 +0000
+++ b/sql/rpl_rli.h	2010-11-10 01:24:48 +0000
@@ -30,57 +30,6 @@ struct RPL_TABLE_LIST;
 class Master_info;
 extern uint sql_slave_skip_counter;
 
-
-#define SLAVE_WORKER_QUEUE_SIZE 8096
-
-typedef struct slave_job_item
-{
-  void *data;
-} Slave_job_item;
-
-typedef struct slave_jobs_queue
-{
-  DYNAMIC_ARRAY Q;
-  ulong s;
-  ulong a;
-  ulong e;
-  ulong len;
-} Slave_jobs_queue;
-
-typedef struct Slave_worker
-{
-  // operational
-  THD *thd;
-  mysql_mutex_t jobs_lock;
-  mysql_cond_t  jobs_cond;
-
-  //List<struct slave_job_item> jobs;
-  Slave_jobs_queue jobs;
-
-  List<Log_event> data_in_use; // events are still in use by SQL thread
-  ulong id;
-  TABLE *current_table;
-
-  // rbr
-  RPL_TABLE_LIST *tables_to_lock;           /* RBR: Tables to lock  */
-  uint tables_to_lock_count;        /* RBR: Count of tables to lock */
-  table_mapping m_table_map;      /* RBR: Mapping table-id to table */
-
-  // statictics
-  ulong wait_jobs;  // to gather statistics how many times got idle
-  ulong stmt_jobs;  // how many jobs per stmt
-  ulong trans_jobs;  // how many jobs per trns
-  volatile int curr_jobs; // the current assignments
-  ulong usage_partition; // number of different partitions handled by this worker
-} Slave_worker;
-
-
-extern PSI_mutex_key *key_mutex_slave_parallel_worker;
-extern PSI_mutex_key key_mutex_slave_parallel_pend_jobs;
-
-extern PSI_cond_key *key_cond_slave_parallel_worker;
-extern PSI_cond_key key_cond_slave_parallel_pend_jobs;
-
 /*******************************************************************************
 Replication SQL Thread
 
@@ -153,7 +102,7 @@ transactional or non-transactional is us
 To correctly recovery from failures, one should combine transactional system
 tables along with the --relay-log-recovery.
 *******************************************************************************/
-class Relay_log_info : public Rpl_info
+class Relay_log_info : public Rpl_info_coordinator
 {
 public:
   /**

=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc	2010-09-16 19:29:26 +0000
+++ b/sql/rpl_rli_pdb.cc	2010-11-10 01:24:48 +0000
@@ -5,6 +5,164 @@
 #include "sql_string.h"
 #include <hash.h>
 
+enum {
+  LINES_IN_SLAVE_WORKER_INFO= 6
+};
+
+/*
+  Please every time you add a new field to the worker slave info, update
+  what follows. For now, this is just used to get the number of fields.
+*/
+const char *info_slave_worker_fields []=
+{
+  "number_of_lines",
+  "partition",
+  "relay_log_name",
+  "relay_log_pos",
+  "master_log_name",
+  "master_log_pos"
+};
+
+Slave_worker::Slave_worker(const char* type): Rpl_info_worker(type)
+{ 
+
+}
+
+Slave_worker::~Slave_worker() 
+{
+
+}
+
+int Slave_worker::init_info()
+{
+  DBUG_ENTER("Slave_worker::init_info");
+
+  if (inited)
+    DBUG_RETURN(0);
+
+  /*
+    The init_info() is used to either create or read information
+    from the repository, in order to initialize the Slave_worker.
+  */
+  int necessary_to_configure= check_info();
+
+  if (handler->init_info())
+    goto err;
+
+  if (!necessary_to_configure && read_info(handler))
+    goto err;
+
+  if (flush_info(TRUE))
+    goto err;
+
+  inited= 1;
+  DBUG_RETURN(0);
+
+err:
+  sql_print_error("Error reading slave worker configuration");
+  DBUG_RETURN(1);
+}
+
+void Slave_worker::end_info()
+{
+  DBUG_ENTER("Slave_worker::end_info");
+
+  if (!inited)
+    DBUG_VOID_RETURN;
+
+  handler->end_info();
+
+  inited = 0;
+
+  DBUG_VOID_RETURN;
+}
+
+int Slave_worker::flush_info(bool force)
+{
+  DBUG_ENTER("Slave_worker::flush_info");
+
+  /*
+    We update the sync_period at this point because only here we
+    now that we are handling a Slave_worker. This needs to be
+    update every time we call flush because the option may be
+    dinamically set.
+  */
+  handler->set_sync_period(sync_relayloginfo_period);
+
+  if (write_info(handler, force))
+    goto err;
+
+  DBUG_RETURN(0);
+
+err:
+  sql_print_error("Error writing slave worker configuration");
+  DBUG_RETURN(1);
+}
+
+bool Slave_worker::read_info(Rpl_info_handler *from)
+{
+  DBUG_ENTER("Slave_worker::read_info");
+
+  ulong nline= 0;
+  ulong temp_group_relay_log_pos= 0;
+  ulong temp_group_master_log_pos= 0;
+
+  if (from->prepare_info_for_read())
+    DBUG_RETURN(TRUE);
+
+  if (from->get_info((ulong *) &nline, (ulong) 0) ||
+      nline != LINES_IN_SLAVE_WORKER_INFO)
+    DBUG_RETURN(TRUE);
+
+  if (from->get_info(partition,
+                     sizeof(partition), "") ||
+      from->get_info(group_relay_log_name,
+                     sizeof(group_relay_log_name), "") ||
+      from->get_info((ulong *) &temp_group_relay_log_pos,
+                     (ulong) 0) ||
+      from->get_info(group_master_log_name,
+                     sizeof(group_master_log_name), "") ||
+      from->get_info((ulong *) &temp_group_master_log_pos,
+                     (ulong) 0))
+    DBUG_RETURN(TRUE);
+
+  group_relay_log_pos=  temp_group_relay_log_pos;
+  group_master_log_pos= temp_group_master_log_pos;
+
+  DBUG_RETURN(FALSE);
+}
+
+bool Slave_worker::write_info(Rpl_info_handler *to, bool force)
+{
+  DBUG_ENTER("Master_info::write_info");
+
+  /*
+     In certain cases this code may create master.info files that seems
+     corrupted, because of extra lines filled with garbage in the end
+     file (this happens if new contents take less space than previous
+     contents of file). But because of number of lines in the first line
+     of file we don't care about this garbage.
+  */
+
+  if (to->prepare_info_for_write() ||
+      to->set_info((int) LINES_IN_SLAVE_WORKER_INFO) ||
+      to->set_info(group_relay_log_name) ||
+      to->set_info((ulong)group_relay_log_pos) ||
+      to->set_info(group_master_log_name) ||
+      to->set_info((ulong)group_master_log_pos))
+    DBUG_RETURN(TRUE);
+
+  if (to->flush_info(force))
+    DBUG_RETURN(TRUE);
+
+  DBUG_RETURN(FALSE);
+}
+
+size_t Slave_worker::get_number_worker_fields()
+{
+  return sizeof(info_slave_worker_fields)/sizeof(info_slave_worker_fields[0]);
+}
+
 static HASH mapping_db_to_worker;
 static bool inited_hash_workers= FALSE;
 

=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h	2010-09-16 19:29:26 +0000
+++ b/sql/rpl_rli_pdb.h	2010-11-10 01:24:48 +0000
@@ -1,17 +1,89 @@
 #ifndef RPL_RLI_PDB_H
-  #include "sql_string.h"
-  #include "rpl_rli.h"
-  #include <my_sys.h>
-
-  struct db_worker
-  {
-    const char *db;
-    Slave_worker *worker;
-  } typedef db_worker;
-
-  bool init_hash_workers(ulong slave_parallel_workers);
-  void destroy_hash_workers();
-  Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers);
-  Slave_worker *get_free_worker(DYNAMIC_ARRAY workers);
 
+#define RPL_RLI_PDB_H
+
+#include "sql_string.h"
+#include "rpl_rli.h"
+#include <my_sys.h>
+
+struct db_worker
+{
+  const char *db;
+  Slave_worker *worker;
+} typedef db_worker;
+
+bool init_hash_workers(ulong slave_parallel_workers);
+void destroy_hash_workers();
+Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers);
+Slave_worker *get_free_worker(DYNAMIC_ARRAY workers);
+
+#define SLAVE_WORKER_QUEUE_SIZE 8096
+
+typedef struct slave_job_item
+{
+  void *data;
+} Slave_job_item;
+
+typedef struct slave_jobs_queue
+{
+  DYNAMIC_ARRAY Q;
+  ulong s;
+  ulong a;
+  ulong e;
+  ulong len;
+} Slave_jobs_queue;
+
+class Slave_worker : public Rpl_info_worker
+{
+public:
+  Slave_worker(const char *type);
+  virtual ~Slave_worker();
+
+  mysql_mutex_t jobs_lock;
+  mysql_cond_t  jobs_cond;
+
+  //List<struct slave_job_item> jobs;
+  Slave_jobs_queue jobs;
+
+  List<Log_event> data_in_use; // events are still in use by SQL thread
+  ulong id;
+  TABLE *current_table;
+
+  // rbr
+  RPL_TABLE_LIST *tables_to_lock;           /* RBR: Tables to lock  */
+  uint tables_to_lock_count;        /* RBR: Count of tables to lock */
+  table_mapping m_table_map;      /* RBR: Mapping table-id to table */
+
+  // statictics
+  ulong wait_jobs;  // to gather statistics how many times got idle
+  ulong stmt_jobs;  // how many jobs per stmt
+  ulong trans_jobs;  // how many jobs per trns
+  volatile int curr_jobs; // the current assignments
+  ulong usage_partition; // number of different partitions handled by this worker
+
+  char *partition;
+  char group_relay_log_name[FN_REFLEN];
+  ulonglong group_relay_log_pos;
+  char group_master_log_name[FN_REFLEN];
+  ulonglong group_master_log_pos;
+
+  int init_info();
+  void end_info();
+  int flush_info(bool force= FALSE);
+
+  size_t get_number_worker_fields();
+
+private:
+  bool read_info(Rpl_info_handler *from);
+  bool write_info(Rpl_info_handler *to, bool force);
+
+  Slave_worker& operator=(const Slave_worker& info);
+  Slave_worker(const Slave_worker& info);
+};
+
+extern PSI_mutex_key *key_mutex_slave_parallel_worker;
+extern PSI_mutex_key key_mutex_slave_parallel_pend_jobs;
+
+extern PSI_cond_key *key_cond_slave_parallel_worker;
+extern PSI_cond_key key_cond_slave_parallel_pend_jobs;
 #endif

=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc	2010-11-09 13:04:14 +0000
+++ b/sql/rpl_slave.cc	2010-11-10 01:24:48 +0000
@@ -277,8 +277,8 @@ int init_slave()
   if (pthread_key_create(&RPL_MASTER_INFO, NULL))
     DBUG_RETURN(1);
 
-  if ((error= Rpl_info_factory::create(opt_mi_repository_id, &active_mi,
-                                       opt_rli_repository_id, &rli)))
+  if ((error= Rpl_info_factory::create_coordinators(opt_mi_repository_id, &active_mi,
+                                                    opt_rli_repository_id, &rli)))
   {
     error= 1;
     goto err;
@@ -3467,7 +3467,7 @@ pthread_handler_t handle_slave_worker(vo
   mysql_mutex_lock(&rli->pending_jobs_lock);
   get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, rli->pending_jobs - 1);
   rli->pending_jobs= 0;
-  w->thd= thd;
+  w->info_thd= thd;
   w->tables_to_lock= NULL;
   w->tables_to_lock_count= 0;
   thd->thread_stack = (char*)&thd;
@@ -3526,7 +3526,8 @@ int slave_start_workers(Relay_log_info *
   for (i= 0; i < n; i++)
   {
     uint k;
-    Slave_worker *w= new Slave_worker;
+    Slave_worker *w=
+      Rpl_info_factory::create_worker(opt_rli_repository_id, rli, i);
     Slave_job_item empty= {NULL};
     w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
     w->id= i;
@@ -3598,9 +3599,9 @@ void slave_stop_workers(Relay_log_info *
   {
     Slave_worker *w;
     get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, i);
-    mysql_mutex_lock(&w->thd->LOCK_thd_data);
-    w->thd->awake(THD::KILL_QUERY);
-    mysql_mutex_unlock(&w->thd->LOCK_thd_data);
+    mysql_mutex_lock(&w->info_thd->LOCK_thd_data);
+    w->info_thd->awake(THD::KILL_QUERY);
+    mysql_mutex_unlock(&w->info_thd->LOCK_thd_data);
   }
   
   while (rli->pending_jobs > 0)

=== modified file 'sql/table.cc'
--- a/sql/table.cc	2010-10-25 10:39:01 +0000
+++ b/sql/table.cc	2010-11-10 01:24:48 +0000
@@ -57,6 +57,9 @@ LEX_STRING RLI_INFO_NAME= {C_STRING_WITH
 /* MI_INFO name */
 LEX_STRING MI_INFO_NAME= {C_STRING_WITH_LEN("slave_master_info")};
 
+/* WORKER_INFO name */
+LEX_STRING WORKER_INFO_NAME= {C_STRING_WITH_LEN("slave_worker_info")};
+
 	/* Functions defined in this file */
 
 void open_table_error(TABLE_SHARE *share, int error, int db_errno,
@@ -278,6 +281,12 @@ TABLE_CATEGORY get_table_category(const 
                       MI_INFO_NAME.str,
                       name->str) == 0))
       return TABLE_CATEGORY_RPL_INFO;
+
+    if ((name->length == WORKER_INFO_NAME.length) &&
+        (my_strcasecmp(system_charset_info,
+                      WORKER_INFO_NAME.str,
+                      name->str) == 0))
+      return TABLE_CATEGORY_RPL_INFO;
   }
 
   return TABLE_CATEGORY_USER;


Attachment: [text/bzr-bundle] bzr/alfranio.correia@oracle.com-20101110012448-84ifl7teq2e3iq57.bundle
Thread
bzr commit into mysql-next-mr.crash-safe branch (alfranio.correia:3207)WL#5599Alfranio Correia10 Nov