#At file:///home/acorreia/workspace.sun/repository.mysql.new/bzrwork/wl-5569/mysql-next-mr-wl5569/ based on revid:alfranio.correia@stripped
3207 Alfranio Correia 2010-11-10
Refactory to start working on WL#5599.
modified:
scripts/mysql_system_tables.sql
sql/log_event.cc
sql/log_event.h
sql/rpl_info.cc
sql/rpl_info.h
sql/rpl_info_factory.cc
sql/rpl_info_factory.h
sql/rpl_mi.cc
sql/rpl_mi.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/rpl_rli_pdb.cc
sql/rpl_rli_pdb.h
sql/rpl_slave.cc
sql/table.cc
=== modified file 'scripts/mysql_system_tables.sql'
--- a/scripts/mysql_system_tables.sql 2010-11-02 18:28:20 +0000
+++ b/scripts/mysql_system_tables.sql 2010-11-10 01:24:48 +0000
@@ -104,6 +104,8 @@ CREATE TABLE IF NOT EXISTS slave_relay_l
CREATE TABLE IF NOT EXISTS slave_master_info (Master_id INTEGER UNSIGNED NOT NULL, Number_of_lines INTEGER UNSIGNED NOT NULL, Master_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Master_log_pos BIGINT UNSIGNED NOT NULL, Host TEXT CHARACTER SET utf8 COLLATE utf8_bin, User_name TEXT CHARACTER SET utf8 COLLATE utf8_bin, User_password TEXT CHARACTER SET utf8 COLLATE utf8_bin, Port INTEGER UNSIGNED NOT NULL, Connect_retry INTEGER UNSIGNED NOT NULL, Enabled_ssl BOOLEAN NOT NULL, Ssl_ca TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_capath TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_cert TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_cipher TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_key TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ssl_verify_servert_cert BOOLEAN NOT NULL, Heartbeat FLOAT NOT NULL, Bind TEXT CHARACTER SET utf8 COLLATE utf8_bin, Ignored_server_ids TEXT CHARACTER SET utf8 COLLATE utf8_bin, Uuid TEXT CHARACTER SET utf8 COLLATE utf8_bin, Retry_count BIGIN!
T UNSIGNED NOT NULL, PRIMARY KEY(Master_id)) ENGINE=MYISAM DEFAULT CHARSET=utf8 COMMENT 'Master Information';
+CREATE TABLE IF NOT EXISTS slave_worker_info (Master_id INTEGER UNSIGNED NOT NULL, Worker_id INTEGER UNSIGNED NOT NULL, Number_of_lines INTEGER UNSIGNED NOT NULL, Partition TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Relay_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Relay_log_pos BIGINT UNSIGNED NOT NULL, Master_log_name TEXT CHARACTER SET utf8 COLLATE utf8_bin NOT NULL, Master_log_pos BIGINT UNSIGNED NOT NULL, PRIMARY KEY(Master_id, Worker_id)) ENGINE=MYISAM DEFAULT CHARSET=utf8 COMMENT 'Worker Information';
+
--
-- PERFORMANCE SCHEMA INSTALLATION
-- Note that this script is also reused by mysql_upgrade,
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2010-11-09 13:04:14 +0000
+++ b/sql/log_event.cc 2010-11-10 01:24:48 +0000
@@ -2291,7 +2291,7 @@ void append_item_to_jobs(slave_job_item
mysql_mutex_unlock(&rli->pending_jobs_lock);
mysql_mutex_lock(&w->jobs_lock);
- if (!w->thd->is_slave_error) // error check exists at stmt commit as well
+ if (!w->info_thd->is_slave_error) // error check exists at stmt commit as well
{
int ret;
@@ -2349,7 +2349,7 @@ int Log_event::apply_event(Relay_log_inf
*/
struct slave_job_item* pop_jobs_item(Slave_worker *w, Slave_job_item *job_item)
{
- THD *thd= w->thd;
+ THD *thd= w->info_thd;
mysql_mutex_lock(&w->jobs_lock);
while (!job_item->data && !thd->killed)
{
@@ -2371,7 +2371,7 @@ struct slave_job_item* pop_jobs_item(Sla
w->curr_jobs--;
mysql_mutex_unlock(&w->jobs_lock);
- thd_proc_info(w->thd, "Executing event");
+ thd_proc_info(w->info_thd, "Executing event");
return job_item;
}
@@ -2387,7 +2387,7 @@ int slave_worker_exec_job(Slave_worker *
{
int error= 0;
struct slave_job_item item= {NULL}, *job_item= &item;
- THD *thd= w->thd;
+ THD *thd= w->info_thd;
Log_event *ev= NULL;
DBUG_ENTER("slave_worker_exec_job");
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2010-11-09 13:04:14 +0000
+++ b/sql/log_event.h 2010-11-10 01:24:48 +0000
@@ -644,7 +644,7 @@ class THD;
class Format_description_log_event;
class Relay_log_info;
-struct Slave_worker;
+class Slave_worker;
#ifdef MYSQL_CLIENT
enum enum_base64_output_mode {
=== modified file 'sql/rpl_info.cc'
--- a/sql/rpl_info.cc 2010-07-27 19:26:54 +0000
+++ b/sql/rpl_info.cc 2010-11-10 01:24:48 +0000
@@ -17,21 +17,38 @@
#include <sql_priv.h>
#include "rpl_info.h"
-Rpl_info::Rpl_info(const char* type,
+Rpl_info::Rpl_info(const char* type)
+ : Slave_reporting_capability(type),
+ info_thd(0), inited(0), abort_slave(0),
+ slave_running(0), slave_run_id(0),
+ handler(0)
+{
+
+}
+
+Rpl_info::~Rpl_info()
+{
+ if (handler)
+ delete handler;
+}
+
+void Rpl_info::set_rpl_info_handler(Rpl_info_handler * param_handler)
+{
+ handler= param_handler;
+}
+
+Rpl_info_coordinator::Rpl_info_coordinator(const char* type,
PSI_mutex_key *param_key_info_run_lock,
PSI_mutex_key *param_key_info_data_lock,
PSI_mutex_key *param_key_info_data_cond,
PSI_mutex_key *param_key_info_start_cond,
PSI_mutex_key *param_key_info_stop_cond)
- :Slave_reporting_capability(type),
+ : Rpl_info(type),
key_info_run_lock(param_key_info_run_lock),
key_info_data_lock(param_key_info_data_lock),
key_info_data_cond(param_key_info_data_cond),
key_info_start_cond(param_key_info_start_cond),
- key_info_stop_cond(param_key_info_stop_cond),
- info_thd(0), inited(0), abort_slave(0),
- slave_running(0), slave_run_id(0),
- handler(0)
+ key_info_stop_cond(param_key_info_stop_cond)
{
mysql_mutex_init(*key_info_run_lock, &run_lock, MY_MUTEX_INIT_FAST);
mysql_mutex_init(*key_info_data_lock,
@@ -41,9 +58,9 @@ Rpl_info::Rpl_info(const char* type,
mysql_cond_init(*key_info_stop_cond, &stop_cond, NULL);
}
-Rpl_info::~Rpl_info()
+Rpl_info_coordinator::~Rpl_info_coordinator()
{
- DBUG_ENTER("Rpl_info::~Rpl_info");
+ DBUG_ENTER("Rpl_info_coordinator::~Rpl_info_coordinator");
mysql_mutex_destroy(&run_lock);
mysql_mutex_destroy(&data_lock);
@@ -51,13 +68,15 @@ Rpl_info::~Rpl_info()
mysql_cond_destroy(&start_cond);
mysql_cond_destroy(&stop_cond);
- if (handler)
- delete handler;
-
DBUG_VOID_RETURN;
}
-void Rpl_info::set_rpl_info_handler(Rpl_info_handler * param_handler)
+Rpl_info_worker::Rpl_info_worker(const char* type)
+ : Rpl_info(type)
{
- handler= param_handler;
}
+
+Rpl_info_worker::~Rpl_info_worker()
+{
+}
+
=== modified file 'sql/rpl_info.h'
--- a/sql/rpl_info.h 2010-10-25 10:39:01 +0000
+++ b/sql/rpl_info.h 2010-11-10 01:24:48 +0000
@@ -24,22 +24,6 @@
class Rpl_info : public Slave_reporting_capability
{
public:
- /*
- standard lock acquisition order to avoid deadlocks:
- run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
- */
- mysql_mutex_t data_lock,run_lock;
- /*
- start_cond is broadcast when SQL thread is started
- stop_cond - when stopped
- data_cond - when data protected by data_lock changes
- */
- mysql_cond_t data_cond,start_cond,stop_cond;
-
- PSI_mutex_key *key_info_run_lock, *key_info_data_lock;
-
- PSI_mutex_key *key_info_data_cond, *key_info_start_cond, *key_info_stop_cond;
-
THD *info_thd;
bool inited;
@@ -47,16 +31,7 @@ public:
volatile uint slave_running;
volatile ulong slave_run_id;
-#ifndef DBUG_OFF
- int events_until_exit;
-#endif
-
- Rpl_info(const char* type,
- PSI_mutex_key *param_key_info_run_lock,
- PSI_mutex_key *param_key_info_data_lock,
- PSI_mutex_key *param_key_info_data_cond,
- PSI_mutex_key *param_key_info_start_cond,
- PSI_mutex_key *param_key_info_stop_cond);
+ Rpl_info(const char *type);
virtual ~Rpl_info();
int check_info()
@@ -104,4 +79,52 @@ private:
Rpl_info& operator=(const Rpl_info& info);
Rpl_info(const Rpl_info& info);
};
+
+class Rpl_info_coordinator: public Rpl_info
+{
+public:
+ /*
+ standard lock acquisition order to avoid deadlocks:
+ run_lock, data_lock, relay_log.LOCK_log, relay_log.LOCK_index
+ */
+ mysql_mutex_t data_lock,run_lock;
+ /*
+ start_cond is broadcast when SQL thread is started
+ stop_cond - when stopped
+ data_cond - when data protected by data_lock changes
+ */
+ mysql_cond_t data_cond, start_cond, stop_cond;
+
+ PSI_mutex_key *key_info_run_lock, *key_info_data_lock;
+
+ PSI_mutex_key *key_info_data_cond, *key_info_start_cond, *key_info_stop_cond;
+
+#ifndef DBUG_OFF
+ int events_until_exit;
+#endif
+
+ Rpl_info_coordinator(const char* type,
+ PSI_mutex_key *param_key_info_run_lock,
+ PSI_mutex_key *param_key_info_data_lock,
+ PSI_mutex_key *param_key_info_data_cond,
+ PSI_mutex_key *param_key_info_start_cond,
+ PSI_mutex_key *param_key_info_stop_cond);
+ virtual ~Rpl_info_coordinator();
+
+private:
+ Rpl_info_coordinator& operator=(const Rpl_info_coordinator& info);
+ Rpl_info_coordinator(const Rpl_info_coordinator& info);
+};
+
+class Rpl_info_worker: public Rpl_info
+{
+public:
+
+ Rpl_info_worker(const char* type);
+ virtual ~Rpl_info_worker();
+
+private:
+ Rpl_info_worker& operator=(const Rpl_info_worker& info);
+ Rpl_info_worker(const Rpl_info_worker& info);
+};
#endif /* RPL_INFO_H */
=== modified file 'sql/rpl_info_factory.cc'
--- a/sql/rpl_info_factory.cc 2010-10-29 18:16:48 +0000
+++ b/sql/rpl_info_factory.cc 2010-11-10 01:24:48 +0000
@@ -39,10 +39,10 @@
@retval FALSE No error
@retval TRUE Failure
*/
-bool Rpl_info_factory::create(uint mi_option, Master_info **mi,
- uint rli_option, Relay_log_info **rli)
+bool Rpl_info_factory::create_coordinators(uint mi_option, Master_info **mi,
+ uint rli_option, Relay_log_info **rli)
{
- DBUG_ENTER("Rpl_info_factory::Rpl_info_factory");
+ DBUG_ENTER("Rpl_info_factory::create_coordinators");
if (!((*mi)= Rpl_info_factory::create_mi(mi_option)))
DBUG_RETURN(TRUE);
@@ -85,7 +85,7 @@ Master_info *Rpl_info_factory::create_mi
const char *msg= "Failed to allocate memory for the master info "
"structure";
- DBUG_ENTER("Rpl_info_factory::Rpl_info_factory");
+ DBUG_ENTER("Rpl_info_factory::create_mi");
if (!(mi= new Master_info(&key_master_info_run_lock,
&key_master_info_data_lock,
@@ -347,3 +347,69 @@ bool Rpl_info_factory::change_engine(Rpl
DBUG_RETURN(FALSE);
}
+
+Slave_worker *Rpl_info_factory::create_worker(uint rli_option, Relay_log_info *rli, uint worker_id)
+{
+ DBUG_ENTER("Rpl_info_factory::create_worker");
+
+ char info_fname[FN_REFLEN];
+ char info_name[FN_REFLEN];
+ Rpl_info_file* w_file= NULL;
+ Rpl_info_table* w_table= NULL;
+ Slave_worker *worker= NULL;
+
+ const char *msg= "Failed to allocate memory for the Slave worker info "
+ "structure";
+ /*
+ Defining the name of the worker and its repository.
+ */
+ char *pos= strmov(info_fname, relay_log_info_file);
+ sprintf(pos, ".%u", worker_id);
+ pos= strmov(info_name, "worker");
+ sprintf(pos, ".%u", worker_id);
+
+ if (!(worker= new Slave_worker(info_name)))
+ goto err;
+
+ /*
+ Now we instantiate all info repos and later decide which one to take,
+ but not without first checking if there is already existing data for
+ a repo different from the one that is being requested.
+ */
+ if (!(w_file= new Rpl_info_file(worker->get_number_worker_fields(),
+ info_fname)))
+ goto err;
+
+ if (!(w_table= new Rpl_info_table(worker->get_number_worker_fields() + 1,
+ WORKER_FIELD_ID, WORKER_SCHEMA, WORKER_TABLE)))
+ goto err;
+
+ DBUG_ASSERT(rli_option == RLI_REPOSITORY_FILE ||
+ rli_option == RLI_REPOSITORY_TABLE);
+
+ if (decide_repository(worker, &w_table, &w_file,
+ rli_option == MI_REPOSITORY_TABLE, &msg))
+ goto err;
+
+ if ((rli_option == RLI_REPOSITORY_TABLE) &&
+ change_engine(static_cast<Rpl_info_table *>(w_table),
+ relay_log_info_engine, &msg))
+ goto err;
+
+ DBUG_RETURN(worker);
+
+err:
+ if (w_file) delete w_file;
+ if (w_table) delete w_table;
+ if (worker)
+ {
+ /*
+ The handler was previously deleted so we need to remove
+ any reference to it.
+ */
+ worker->set_rpl_info_handler(NULL);
+ delete worker;
+ }
+ sql_print_error("%s", msg);
+ DBUG_RETURN(NULL);
+}
=== modified file 'sql/rpl_info_factory.h'
--- a/sql/rpl_info_factory.h 2010-10-25 10:39:01 +0000
+++ b/sql/rpl_info_factory.h 2010-11-10 01:24:48 +0000
@@ -19,6 +19,7 @@
#include "rpl_info.h"
#include "rpl_mi.h"
#include "rpl_rli.h"
+#include "rpl_rli_pdb.h"
#include "rpl_info_file.h"
#include "rpl_info_table.h"
#include "rpl_info_handler.h"
@@ -47,12 +48,19 @@ extern ulong opt_rli_repository_id;
#define RLI_SCHEMA "mysql"
#define RLI_TABLE "slave_relay_log_info"
+#define WORKER_FIELD_ID 0
+
+#define WORKER_SCHEMA "mysql"
+#define WORKER_TABLE "slave_worker_info"
+
class Rpl_info_factory
{
public:
- static bool create(uint mi_option, Master_info **mi,
- uint rli_option, Relay_log_info **rli);
+ static bool create_coordinators(uint mi_option, Master_info **mi,
+ uint rli_option, Relay_log_info **rli);
+ static Slave_worker *create_worker(uint rli_option, Relay_log_info *rli,
+ uint worker_id);
static Master_info *create_mi(uint rli_option);
static Relay_log_info *create_rli(uint rli_option, bool is_slave_recovery);
static bool decide_repository(Rpl_info *info, Rpl_info_table **table,
=== modified file 'sql/rpl_mi.cc'
--- a/sql/rpl_mi.cc 2010-08-24 08:39:26 +0000
+++ b/sql/rpl_mi.cc 2010-11-10 01:24:48 +0000
@@ -81,7 +81,8 @@ Master_info::Master_info(PSI_mutex_key *
PSI_mutex_key *param_key_info_data_cond,
PSI_mutex_key *param_key_info_start_cond,
PSI_mutex_key *param_key_info_stop_cond)
- :Rpl_info("I/O", param_key_info_run_lock, param_key_info_data_lock,
+ :Rpl_info_coordinator("I/O",
+ param_key_info_run_lock, param_key_info_data_lock,
param_key_info_data_cond, param_key_info_start_cond,
param_key_info_stop_cond),
ssl(0), ssl_verify_server_cert(0),
=== modified file 'sql/rpl_mi.h'
--- a/sql/rpl_mi.h 2010-08-05 17:45:25 +0000
+++ b/sql/rpl_mi.h 2010-11-10 01:24:48 +0000
@@ -60,7 +60,7 @@ typedef struct st_mysql MYSQL;
*****************************************************************************/
-class Master_info : public Rpl_info
+class Master_info : public Rpl_info_coordinator
{
public:
Master_info(PSI_mutex_key *param_key_info_run_lock,
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2010-11-09 13:04:14 +0000
+++ b/sql/rpl_rli.cc 2010-11-10 01:24:48 +0000
@@ -61,7 +61,8 @@ Relay_log_info::Relay_log_info(bool is_s
PSI_mutex_key *param_key_info_data_cond,
PSI_mutex_key *param_key_info_start_cond,
PSI_mutex_key *param_key_info_stop_cond)
- :Rpl_info("SQL", param_key_info_run_lock, param_key_info_data_lock,
+ :Rpl_info_coordinator("SQL",
+ param_key_info_run_lock, param_key_info_data_lock,
param_key_info_data_cond, param_key_info_start_cond,
param_key_info_stop_cond),
replicate_same_server_id(::replicate_same_server_id),
@@ -159,7 +160,7 @@ Slave_worker* Relay_log_info::get_curren
{
// convert to use hashing
get_dynamic(const_cast<DYNAMIC_ARRAY*>(&workers), (uchar*) &w_i, i);
- if (w_i->thd == current_thd)
+ if (w_i->info_thd == current_thd)
return w_i;
}
DBUG_ASSERT(0);
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2010-11-09 13:04:14 +0000
+++ b/sql/rpl_rli.h 2010-11-10 01:24:48 +0000
@@ -30,57 +30,6 @@ struct RPL_TABLE_LIST;
class Master_info;
extern uint sql_slave_skip_counter;
-
-#define SLAVE_WORKER_QUEUE_SIZE 8096
-
-typedef struct slave_job_item
-{
- void *data;
-} Slave_job_item;
-
-typedef struct slave_jobs_queue
-{
- DYNAMIC_ARRAY Q;
- ulong s;
- ulong a;
- ulong e;
- ulong len;
-} Slave_jobs_queue;
-
-typedef struct Slave_worker
-{
- // operational
- THD *thd;
- mysql_mutex_t jobs_lock;
- mysql_cond_t jobs_cond;
-
- //List<struct slave_job_item> jobs;
- Slave_jobs_queue jobs;
-
- List<Log_event> data_in_use; // events are still in use by SQL thread
- ulong id;
- TABLE *current_table;
-
- // rbr
- RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */
- uint tables_to_lock_count; /* RBR: Count of tables to lock */
- table_mapping m_table_map; /* RBR: Mapping table-id to table */
-
- // statictics
- ulong wait_jobs; // to gather statistics how many times got idle
- ulong stmt_jobs; // how many jobs per stmt
- ulong trans_jobs; // how many jobs per trns
- volatile int curr_jobs; // the current assignments
- ulong usage_partition; // number of different partitions handled by this worker
-} Slave_worker;
-
-
-extern PSI_mutex_key *key_mutex_slave_parallel_worker;
-extern PSI_mutex_key key_mutex_slave_parallel_pend_jobs;
-
-extern PSI_cond_key *key_cond_slave_parallel_worker;
-extern PSI_cond_key key_cond_slave_parallel_pend_jobs;
-
/*******************************************************************************
Replication SQL Thread
@@ -153,7 +102,7 @@ transactional or non-transactional is us
To correctly recovery from failures, one should combine transactional system
tables along with the --relay-log-recovery.
*******************************************************************************/
-class Relay_log_info : public Rpl_info
+class Relay_log_info : public Rpl_info_coordinator
{
public:
/**
=== modified file 'sql/rpl_rli_pdb.cc'
--- a/sql/rpl_rli_pdb.cc 2010-09-16 19:29:26 +0000
+++ b/sql/rpl_rli_pdb.cc 2010-11-10 01:24:48 +0000
@@ -5,6 +5,164 @@
#include "sql_string.h"
#include <hash.h>
+enum {
+ LINES_IN_SLAVE_WORKER_INFO= 6
+};
+
+/*
+ Please every time you add a new field to the worker slave info, update
+ what follows. For now, this is just used to get the number of fields.
+*/
+const char *info_slave_worker_fields []=
+{
+ "number_of_lines",
+ "partition",
+ "relay_log_name",
+ "relay_log_pos",
+ "master_log_name",
+ "master_log_pos"
+};
+
+Slave_worker::Slave_worker(const char* type): Rpl_info_worker(type)
+{
+
+}
+
+Slave_worker::~Slave_worker()
+{
+
+}
+
+int Slave_worker::init_info()
+{
+ DBUG_ENTER("Slave_worker::init_info");
+
+ if (inited)
+ DBUG_RETURN(0);
+
+ /*
+ The init_info() is used to either create or read information
+ from the repository, in order to initialize the Slave_worker.
+ */
+ int necessary_to_configure= check_info();
+
+ if (handler->init_info())
+ goto err;
+
+ if (!necessary_to_configure && read_info(handler))
+ goto err;
+
+ if (flush_info(TRUE))
+ goto err;
+
+ inited= 1;
+ DBUG_RETURN(0);
+
+err:
+ sql_print_error("Error reading slave worker configuration");
+ DBUG_RETURN(1);
+}
+
+void Slave_worker::end_info()
+{
+ DBUG_ENTER("Slave_worker::end_info");
+
+ if (!inited)
+ DBUG_VOID_RETURN;
+
+ handler->end_info();
+
+ inited = 0;
+
+ DBUG_VOID_RETURN;
+}
+
+int Slave_worker::flush_info(bool force)
+{
+ DBUG_ENTER("Slave_worker::flush_info");
+
+ /*
+ We update the sync_period at this point because only here we
+ now that we are handling a Slave_worker. This needs to be
+ update every time we call flush because the option may be
+ dinamically set.
+ */
+ handler->set_sync_period(sync_relayloginfo_period);
+
+ if (write_info(handler, force))
+ goto err;
+
+ DBUG_RETURN(0);
+
+err:
+ sql_print_error("Error writing slave worker configuration");
+ DBUG_RETURN(1);
+}
+
+bool Slave_worker::read_info(Rpl_info_handler *from)
+{
+ DBUG_ENTER("Slave_worker::read_info");
+
+ ulong nline= 0;
+ ulong temp_group_relay_log_pos= 0;
+ ulong temp_group_master_log_pos= 0;
+
+ if (from->prepare_info_for_read())
+ DBUG_RETURN(TRUE);
+
+ if (from->get_info((ulong *) &nline, (ulong) 0) ||
+ nline != LINES_IN_SLAVE_WORKER_INFO)
+ DBUG_RETURN(TRUE);
+
+ if (from->get_info(partition,
+ sizeof(partition), "") ||
+ from->get_info(group_relay_log_name,
+ sizeof(group_relay_log_name), "") ||
+ from->get_info((ulong *) &temp_group_relay_log_pos,
+ (ulong) 0) ||
+ from->get_info(group_master_log_name,
+ sizeof(group_master_log_name), "") ||
+ from->get_info((ulong *) &temp_group_master_log_pos,
+ (ulong) 0))
+ DBUG_RETURN(TRUE);
+
+ group_relay_log_pos= temp_group_relay_log_pos;
+ group_master_log_pos= temp_group_master_log_pos;
+
+ DBUG_RETURN(FALSE);
+}
+
+bool Slave_worker::write_info(Rpl_info_handler *to, bool force)
+{
+ DBUG_ENTER("Master_info::write_info");
+
+ /*
+ In certain cases this code may create master.info files that seems
+ corrupted, because of extra lines filled with garbage in the end
+ file (this happens if new contents take less space than previous
+ contents of file). But because of number of lines in the first line
+ of file we don't care about this garbage.
+ */
+
+ if (to->prepare_info_for_write() ||
+ to->set_info((int) LINES_IN_SLAVE_WORKER_INFO) ||
+ to->set_info(group_relay_log_name) ||
+ to->set_info((ulong)group_relay_log_pos) ||
+ to->set_info(group_master_log_name) ||
+ to->set_info((ulong)group_master_log_pos))
+ DBUG_RETURN(TRUE);
+
+ if (to->flush_info(force))
+ DBUG_RETURN(TRUE);
+
+ DBUG_RETURN(FALSE);
+}
+
+size_t Slave_worker::get_number_worker_fields()
+{
+ return sizeof(info_slave_worker_fields)/sizeof(info_slave_worker_fields[0]);
+}
+
static HASH mapping_db_to_worker;
static bool inited_hash_workers= FALSE;
=== modified file 'sql/rpl_rli_pdb.h'
--- a/sql/rpl_rli_pdb.h 2010-09-16 19:29:26 +0000
+++ b/sql/rpl_rli_pdb.h 2010-11-10 01:24:48 +0000
@@ -1,17 +1,89 @@
#ifndef RPL_RLI_PDB_H
- #include "sql_string.h"
- #include "rpl_rli.h"
- #include <my_sys.h>
-
- struct db_worker
- {
- const char *db;
- Slave_worker *worker;
- } typedef db_worker;
-
- bool init_hash_workers(ulong slave_parallel_workers);
- void destroy_hash_workers();
- Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers);
- Slave_worker *get_free_worker(DYNAMIC_ARRAY workers);
+#define RPL_RLI_PDB_H
+
+#include "sql_string.h"
+#include "rpl_rli.h"
+#include <my_sys.h>
+
+struct db_worker
+{
+ const char *db;
+ Slave_worker *worker;
+} typedef db_worker;
+
+bool init_hash_workers(ulong slave_parallel_workers);
+void destroy_hash_workers();
+Slave_worker *get_slave_worker(const char *dbname, DYNAMIC_ARRAY workers);
+Slave_worker *get_free_worker(DYNAMIC_ARRAY workers);
+
+#define SLAVE_WORKER_QUEUE_SIZE 8096
+
+typedef struct slave_job_item
+{
+ void *data;
+} Slave_job_item;
+
+typedef struct slave_jobs_queue
+{
+ DYNAMIC_ARRAY Q;
+ ulong s;
+ ulong a;
+ ulong e;
+ ulong len;
+} Slave_jobs_queue;
+
+class Slave_worker : public Rpl_info_worker
+{
+public:
+ Slave_worker(const char *type);
+ virtual ~Slave_worker();
+
+ mysql_mutex_t jobs_lock;
+ mysql_cond_t jobs_cond;
+
+ //List<struct slave_job_item> jobs;
+ Slave_jobs_queue jobs;
+
+ List<Log_event> data_in_use; // events are still in use by SQL thread
+ ulong id;
+ TABLE *current_table;
+
+ // rbr
+ RPL_TABLE_LIST *tables_to_lock; /* RBR: Tables to lock */
+ uint tables_to_lock_count; /* RBR: Count of tables to lock */
+ table_mapping m_table_map; /* RBR: Mapping table-id to table */
+
+ // statictics
+ ulong wait_jobs; // to gather statistics how many times got idle
+ ulong stmt_jobs; // how many jobs per stmt
+ ulong trans_jobs; // how many jobs per trns
+ volatile int curr_jobs; // the current assignments
+ ulong usage_partition; // number of different partitions handled by this worker
+
+ char *partition;
+ char group_relay_log_name[FN_REFLEN];
+ ulonglong group_relay_log_pos;
+ char group_master_log_name[FN_REFLEN];
+ ulonglong group_master_log_pos;
+
+ int init_info();
+ void end_info();
+ int flush_info(bool force= FALSE);
+
+ size_t get_number_worker_fields();
+
+private:
+ bool read_info(Rpl_info_handler *from);
+ bool write_info(Rpl_info_handler *to, bool force);
+
+ Slave_worker& operator=(const Slave_worker& info);
+ Slave_worker(const Slave_worker& info);
+};
+
+extern PSI_mutex_key *key_mutex_slave_parallel_worker;
+extern PSI_mutex_key key_mutex_slave_parallel_pend_jobs;
+
+extern PSI_cond_key *key_cond_slave_parallel_worker;
+extern PSI_cond_key key_cond_slave_parallel_pend_jobs;
#endif
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-11-09 13:04:14 +0000
+++ b/sql/rpl_slave.cc 2010-11-10 01:24:48 +0000
@@ -277,8 +277,8 @@ int init_slave()
if (pthread_key_create(&RPL_MASTER_INFO, NULL))
DBUG_RETURN(1);
- if ((error= Rpl_info_factory::create(opt_mi_repository_id, &active_mi,
- opt_rli_repository_id, &rli)))
+ if ((error= Rpl_info_factory::create_coordinators(opt_mi_repository_id, &active_mi,
+ opt_rli_repository_id, &rli)))
{
error= 1;
goto err;
@@ -3467,7 +3467,7 @@ pthread_handler_t handle_slave_worker(vo
mysql_mutex_lock(&rli->pending_jobs_lock);
get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, rli->pending_jobs - 1);
rli->pending_jobs= 0;
- w->thd= thd;
+ w->info_thd= thd;
w->tables_to_lock= NULL;
w->tables_to_lock_count= 0;
thd->thread_stack = (char*)&thd;
@@ -3526,7 +3526,8 @@ int slave_start_workers(Relay_log_info *
for (i= 0; i < n; i++)
{
uint k;
- Slave_worker *w= new Slave_worker;
+ Slave_worker *w=
+ Rpl_info_factory::create_worker(opt_rli_repository_id, rli, i);
Slave_job_item empty= {NULL};
w->wait_jobs= w->trans_jobs= w->stmt_jobs= w->curr_jobs= 0;
w->id= i;
@@ -3598,9 +3599,9 @@ void slave_stop_workers(Relay_log_info *
{
Slave_worker *w;
get_dynamic((DYNAMIC_ARRAY*)&rli->workers, (uchar*) &w, i);
- mysql_mutex_lock(&w->thd->LOCK_thd_data);
- w->thd->awake(THD::KILL_QUERY);
- mysql_mutex_unlock(&w->thd->LOCK_thd_data);
+ mysql_mutex_lock(&w->info_thd->LOCK_thd_data);
+ w->info_thd->awake(THD::KILL_QUERY);
+ mysql_mutex_unlock(&w->info_thd->LOCK_thd_data);
}
while (rli->pending_jobs > 0)
=== modified file 'sql/table.cc'
--- a/sql/table.cc 2010-10-25 10:39:01 +0000
+++ b/sql/table.cc 2010-11-10 01:24:48 +0000
@@ -57,6 +57,9 @@ LEX_STRING RLI_INFO_NAME= {C_STRING_WITH
/* MI_INFO name */
LEX_STRING MI_INFO_NAME= {C_STRING_WITH_LEN("slave_master_info")};
+/* WORKER_INFO name */
+LEX_STRING WORKER_INFO_NAME= {C_STRING_WITH_LEN("slave_worker_info")};
+
/* Functions defined in this file */
void open_table_error(TABLE_SHARE *share, int error, int db_errno,
@@ -278,6 +281,12 @@ TABLE_CATEGORY get_table_category(const
MI_INFO_NAME.str,
name->str) == 0))
return TABLE_CATEGORY_RPL_INFO;
+
+ if ((name->length == WORKER_INFO_NAME.length) &&
+ (my_strcasecmp(system_charset_info,
+ WORKER_INFO_NAME.str,
+ name->str) == 0))
+ return TABLE_CATEGORY_RPL_INFO;
}
return TABLE_CATEGORY_USER;
Attachment: [text/bzr-bundle] bzr/alfranio.correia@oracle.com-20101110012448-84ifl7teq2e3iq57.bundle
Thread |
---|
• bzr commit into mysql-next-mr.crash-safe branch (alfranio.correia:3207)WL#5599 | Alfranio Correia | 10 Nov |