#At file:///home/acorreia/workspace.sun/repository.mysql/bzrwork/wl-2775/mysql-6.0-rpl/ based on revid:luis.soares@stripped
2818 Alfranio Correia 2009-03-02
Refactored relay log information.
added:
sql/rpl_rli_file.cc
sql/rpl_rli_file.h
modified:
sql/Makefile.am
sql/ha_ndbcluster.cc
sql/item_func.cc
sql/log.cc
sql/log_event.cc
sql/mysql_priv.h
sql/mysqld.cc
sql/rpl_mi.cc
sql/rpl_mi.h
sql/rpl_rli.cc
sql/rpl_rli.h
sql/set_var.cc
sql/slave.cc
sql/slave.h
sql/sql_insert.cc
sql/sql_repl.cc
=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am 2009-02-02 12:28:30 +0000
+++ b/sql/Makefile.am 2009-03-02 21:53:42 +0000
@@ -83,7 +83,7 @@ noinst_HEADERS = item.h item_func.h item
ha_partition.h rpl_constants.h \
opt_range.h protocol.h rpl_tblmap.h rpl_utility.h \
rpl_reporting.h \
- log.h sql_show.h rpl_rli.h rpl_mi.h \
+ log.h sql_show.h rpl_rli.h rpl_rli_file.h rpl_mi.h \
sql_select.h structs.h table.h sql_udf.h hash_filo.h \
lex.h lex_symbol.h sql_acl.h sql_crypt.h \
sql_repl.h slave.h rpl_filter.h rpl_injector.h \
@@ -134,7 +134,7 @@ mysqld_SOURCES = sql_lex.cc sql_handler.
sql_load.cc mf_iocache.cc field_conv.cc sql_show.cc \
sql_udf.cc sql_analyse.cc sql_analyse.h sql_cache.cc \
slave.cc sql_repl.cc rpl_filter.cc rpl_tblmap.cc \
- rpl_utility.cc rpl_injector.cc rpl_rli.cc rpl_mi.cc \
+ rpl_utility.cc rpl_injector.cc rpl_rli.cc rpl_rli_file.cc rpl_mi.cc \
rpl_reporting.cc \
sql_union.cc sql_derived.cc \
sql_client.cc \
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc 2009-02-09 13:34:12 +0000
+++ b/sql/ha_ndbcluster.cc 2009-03-02 21:53:42 +0000
@@ -4496,17 +4496,17 @@ static int ndbcluster_update_apply_statu
// log_name
char tmp_buf[FN_REFLEN];
ndb_pack_varchar(ndbtab->getColumn(2u), tmp_buf,
- active_mi->rli.group_master_log_name,
- strlen(active_mi->rli.group_master_log_name));
+ active_mi->rli->group_master_log_name,
+ strlen(active_mi->rli->group_master_log_name));
r|= op->setValue(2u, tmp_buf);
DBUG_ASSERT(r == 0);
// start_pos
- r|= op->setValue(3u, (Uint64)active_mi->rli.group_master_log_pos);
+ r|= op->setValue(3u, (Uint64)active_mi->rli->group_master_log_pos);
DBUG_ASSERT(r == 0);
// end_pos
- r|= op->setValue(4u, (Uint64)active_mi->rli.group_master_log_pos +
- ((Uint64)active_mi->rli.future_event_relay_log_pos -
- (Uint64)active_mi->rli.group_relay_log_pos));
+ r|= op->setValue(4u, (Uint64)active_mi->rli->group_master_log_pos +
+ ((Uint64)active_mi->rli->future_event_relay_log_pos -
+ (Uint64)active_mi->rli->group_relay_log_pos));
DBUG_ASSERT(r == 0);
return 0;
}
=== modified file 'sql/item_func.cc'
--- a/sql/item_func.cc 2009-02-05 12:49:39 +0000
+++ b/sql/item_func.cc 2009-03-02 21:53:42 +0000
@@ -3399,7 +3399,7 @@ longlong Item_master_pos_wait::val_int()
#ifdef HAVE_REPLICATION
longlong pos = (ulong)args[1]->val_int();
longlong timeout = (arg_count==3) ? args[2]->val_int() : 0 ;
- if ((event_count = active_mi->rli.wait_for_pos(thd, log_name, pos, timeout)) == -2)
+ if ((event_count = active_mi->rli->wait_for_pos(thd, log_name, pos, timeout)) == -2)
{
null_value = 1;
event_count=0;
=== modified file 'sql/log.cc'
--- a/sql/log.cc 2009-02-19 08:59:10 +0000
+++ b/sql/log.cc 2009-03-02 21:53:42 +0000
@@ -4794,7 +4794,7 @@ int MYSQL_BIN_LOG::purge_first_log(Relay
}
/* Store where we are in the new file for the execution thread */
- flush_relay_log_info(rli);
+ rli->flush_relay_log_info();
DBUG_EXECUTE_IF("crash_before_purge_logs", abort(););
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2009-02-02 15:58:48 +0000
+++ b/sql/log_event.cc 2009-03-02 21:53:42 +0000
@@ -4786,7 +4786,7 @@ int Rotate_log_event::do_update_pos(Rela
rli->group_master_log_name,
(ulong) rli->group_master_log_pos));
pthread_mutex_unlock(&rli->data_lock);
- flush_relay_log_info(rli);
+ rli->flush_relay_log_info();
/*
Reset thd->options and sql_mode etc, because this could be the signal of
@@ -5768,7 +5768,7 @@ int Stop_log_event::do_update_pos(Relay_
else
{
rli->inc_group_relay_log_pos(0);
- flush_relay_log_info(rli);
+ rli->flush_relay_log_info();
}
return 0;
}
=== modified file 'sql/mysql_priv.h'
--- a/sql/mysql_priv.h 2009-02-17 23:18:08 +0000
+++ b/sql/mysql_priv.h 2009-03-02 21:53:42 +0000
@@ -2033,6 +2033,7 @@ extern char *opt_backup_history_logname,
*opt_backup_settings_name;
extern const char *og_output_str;
extern const char *log_backup_output_str;
+extern my_bool transact_replication;
extern MYSQL_BIN_LOG mysql_bin_log;
extern LOGGER logger;
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2009-02-17 23:18:08 +0000
+++ b/sql/mysqld.cc 2009-03-02 21:53:42 +0000
@@ -738,6 +738,7 @@ char *opt_relay_logname = 0, *opt_relayl
char *opt_logname, *opt_slow_logname;
char *opt_backup_history_logname, *opt_backup_progress_logname,
*opt_backup_settings_name;
+my_bool transact_replication= 0;
/* Static variables */
@@ -5899,6 +5900,7 @@ enum options_mysqld
OPT_PLUGIN_DIR,
OPT_LOG_OUTPUT,
OPT_LOG_BACKUP_OUTPUT,
+ OPT_TRANSACT_REP,
OPT_PORT_OPEN_TIMEOUT,
OPT_PROFILING,
OPT_KEEP_FILES_ON_CREATE,
@@ -6203,6 +6205,12 @@ Disable with --skip-large-pages.",
(uchar**) &opt_backup_history_logname,
(uchar**) &opt_backup_history_logname, 0, GET_STR,
REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
+ {"transact_replication", OPT_TRANSACT_REP,
+ "Enables transactional replication which means that "
+ "relay log info and master info are stored into tables.",
+ (uchar**) &transact_replication,
+ (uchar**) &transact_replication, 0, GET_BOOL, NO_ARG,
+ 0, 0, 1, 0, 1, 0},
{"backup_progress_log_file", OPT_BACKUP_PROGRESS_LOG_FILE,
"Log backup progress to a given file.",
(uchar**) &opt_backup_progress_logname,
@@ -7387,7 +7395,7 @@ static int show_slave_running(THD *thd,
pthread_mutex_lock(&LOCK_active_mi);
var->value= buff;
*((my_bool *)buff)= (my_bool) (active_mi && active_mi->slave_running &&
- active_mi->rli.slave_running);
+ active_mi->rli->slave_running);
pthread_mutex_unlock(&LOCK_active_mi);
return 0;
}
@@ -7403,9 +7411,9 @@ static int show_slave_retried_trans(THD
{
var->type= SHOW_LONG;
var->value= buff;
- pthread_mutex_lock(&active_mi->rli.data_lock);
- *((long *)buff)= (long)active_mi->rli.retried_trans;
- pthread_mutex_unlock(&active_mi->rli.data_lock);
+ pthread_mutex_lock(&active_mi->rli->data_lock);
+ *((long *)buff)= (long)active_mi->rli->retried_trans;
+ pthread_mutex_unlock(&active_mi->rli->data_lock);
}
else
var->type= SHOW_UNDEF;
@@ -7420,9 +7428,9 @@ static int show_slave_received_heartbeat
{
var->type= SHOW_LONGLONG;
var->value= buff;
- pthread_mutex_lock(&active_mi->rli.data_lock);
+ pthread_mutex_lock(&active_mi->rli->data_lock);
*((longlong *)buff)= active_mi->received_heartbeats;
- pthread_mutex_unlock(&active_mi->rli.data_lock);
+ pthread_mutex_unlock(&active_mi->rli->data_lock);
}
else
var->type= SHOW_UNDEF;
=== modified file 'sql/rpl_mi.cc'
--- a/sql/rpl_mi.cc 2009-02-19 08:54:07 +0000
+++ b/sql/rpl_mi.cc 2009-03-02 21:53:42 +0000
@@ -30,15 +30,34 @@ int init_strvar_from_file(char *var, int
int init_floatvar_from_file(float* var, IO_CACHE* f, float default_val);
int init_dynarray_intvar_from_file(DYNAMIC_ARRAY* arr, IO_CACHE* f);
-Master_info::Master_info(bool is_slave_recovery)
+int init_master_file_info(Master_info *mi,
+ const char* master_info_fname,
+ bool abort_if_no_master_info_file,
+ int thread_mask);
+void close_master_file_info(Master_info *mi);
+int flush_master_file_info(Master_info *mi, bool flush_relay_log_cache);
+int reset_master_file_info(Master_info *mi);
+char static *master_file = 0;
+
+
+Master_info::Master_info(Relay_log_info *param_rli)
:Slave_reporting_capability("I/O"),
ssl(0), ssl_verify_server_cert(0), fd(-1), io_thd(0),
- rli(is_slave_recovery), port(MYSQL_PORT),
+ rli(param_rli), port(MYSQL_PORT),
connect_retry(DEFAULT_CONNECT_RETRY), heartbeat_period(0),
received_heartbeats(0), inited(0), master_id(0),
abort_slave(0), slave_running(0), slave_run_id(0),
sync_counter(0)
{
+ create_master_info= (transact_replication ? init_master_file_info :\
+ init_master_file_info),
+ save_master_info= (transact_replication ? flush_master_file_info :\
+ flush_master_file_info),
+ close_master_info= (transact_replication ? close_master_file_info :\
+ close_master_file_info);
+ erase_master_info= (transact_replication ? reset_master_file_info :\
+ reset_master_file_info);
+
host[0] = 0; user[0] = 0; password[0] = 0;
ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0;
ssl_cipher[0]= 0; ssl_key[0]= 0;
@@ -152,14 +171,14 @@ enum {
};
-int init_master_info(Master_info* mi, const char* master_info_fname,
- const char* slave_info_fname,
+int init_master_file_info(Master_info* mi,
+ const char* master_info_fname,
bool abort_if_no_master_info_file,
int thread_mask)
{
int fd,error;
char fname[FN_REFLEN+128];
- DBUG_ENTER("init_master_info");
+ DBUG_ENTER("init_master_file_info");
if (mi->inited)
{
@@ -177,7 +196,7 @@ int init_master_info(Master_info* mi, co
*/
if (thread_mask & SLAVE_SQL)
{
- my_b_seek(mi->rli.cur_log, (my_off_t) 0);
+ my_b_seek(mi->rli->cur_log, (my_off_t) 0);
}
DBUG_RETURN(0);
}
@@ -185,6 +204,7 @@ int init_master_info(Master_info* mi, co
mi->mysql=0;
mi->file_id=1;
fn_format(fname, master_info_fname, mysql_data_home, "", 4+32);
+ master_file = (char *) master_info_fname;
/*
We need a mutex while we are changing master info parameters to
@@ -375,14 +395,14 @@ file '%s')", fname);
mi->master_log_name,
(ulong) mi->master_log_pos));
- mi->rli.mi = mi;
- if (init_relay_log_info(&mi->rli, slave_info_fname))
+ mi->rli->mi = mi;
+ if (mi->rli->init_relay_log_info())
goto err;
mi->inited = 1;
// now change cache READ -> WRITE - must do this before flush_master_info
reinit_io_cache(&mi->file, WRITE_CACHE, 0L, 0, 1);
- if ((error=test(flush_master_info(mi, 1))))
+ if ((error=test(mi->flush_master_info(1))))
sql_print_error("Failed to flush master info file");
pthread_mutex_unlock(&mi->data_lock);
DBUG_RETURN(error);
@@ -408,13 +428,13 @@ err:
1 - flush master info failed
0 - all ok
*/
-int flush_master_info(Master_info* mi, bool flush_relay_log_cache)
+int flush_master_file_info(Master_info* mi, bool flush_relay_log_cache)
{
IO_CACHE* file = &mi->file;
char lbuf[22];
int err= 0;
- DBUG_ENTER("flush_master_info");
+ DBUG_ENTER("flush_master_file_info");
DBUG_PRINT("enter",("master_pos: %ld", (long) mi->master_log_pos));
/*
@@ -431,7 +451,7 @@ int flush_master_info(Master_info* mi, b
*/
if (flush_relay_log_cache)
{
- IO_CACHE *log_file= mi->rli.relay_log.get_log_file();
+ IO_CACHE *log_file= mi->rli->relay_log.get_log_file();
if (flush_io_cache(log_file))
DBUG_RETURN(2);
}
@@ -498,14 +518,13 @@ int flush_master_info(Master_info* mi, b
DBUG_RETURN(-err);
}
-
-void end_master_info(Master_info* mi)
+void close_master_file_info(Master_info* mi)
{
- DBUG_ENTER("end_master_info");
+ DBUG_ENTER("end_master_file_info");
if (!mi->inited)
DBUG_VOID_RETURN;
- end_relay_log_info(&mi->rli);
+ mi->rli->end_relay_log_info();
if (mi->fd >= 0)
{
end_io_cache(&mi->file);
@@ -517,5 +536,18 @@ void end_master_info(Master_info* mi)
DBUG_VOID_RETURN;
}
+int reset_master_file_info(Master_info* mi)
+{
+ MY_STAT stat_area;
+ char fname[FN_REFLEN];
+ int error= 0;
+
+ fn_format(fname, master_file, mysql_data_home, "", 4+32);
+ if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
+ error= 1;
+
+ return (error);
+}
+
#endif /* HAVE_REPLICATION */
=== modified file 'sql/rpl_mi.h'
--- a/sql/rpl_mi.h 2009-02-17 23:06:00 +0000
+++ b/sql/rpl_mi.h 2009-03-02 21:53:42 +0000
@@ -59,10 +59,17 @@
class Master_info : public Slave_reporting_capability
{
public:
- Master_info(bool is_slave_recovery);
+ Master_info(Relay_log_info *rli);
~Master_info();
bool shall_ignore_server_id(ulong s_id);
+ int (*create_master_info)(Master_info *mi, const char* master_info_fname,
+ bool abort_if_no_master_info_file,
+ int thread_mask);
+ void (*close_master_info)(Master_info *mi);
+ int (*save_master_info)(Master_info *mi, bool flush_relay_log_cache);
+ int (*erase_master_info)(Master_info *mi);
+
/* the variables below are needed because we can change masters on the fly */
char master_log_name[FN_REFLEN];
char host[HOSTNAME_LENGTH+1];
@@ -82,7 +89,7 @@ class Master_info : public Slave_reporti
THD *io_thd;
MYSQL* mysql;
uint32 file_id; /* for 3.23 load data infile */
- Relay_log_info rli;
+ Relay_log_info *rli;
uint port;
uint connect_retry;
float heartbeat_period; // interface with CHANGE MASTER or master.info
@@ -113,15 +120,34 @@ class Master_info : public Slave_reporti
* events should happen before fsyncing.
*/
uint sync_counter;
+
+ int init_master_info(const char* master_info_fname,
+ bool abort_if_no_master_info_file,
+ int thread_mask)
+ {
+ return (*create_master_info)(this, master_info_fname,
+ abort_if_no_master_info_file,
+ thread_mask);
+ }
+
+ void end_master_info()
+ {
+ (*close_master_info)(this);
+ }
+
+ int flush_master_info(bool flush_relay_log_cache)
+ {
+ return (*save_master_info)(this, flush_relay_log_cache);
+ }
+
+ int reset_master_info()
+ {
+ return (*erase_master_info)(this);
+ }
};
void init_master_log_pos(Master_info* mi);
-int init_master_info(Master_info* mi, const char* master_info_fname,
- const char* slave_info_fname,
- bool abort_if_no_master_info_file,
- int thread_mask);
-void end_master_info(Master_info* mi);
-int flush_master_info(Master_info* mi, bool flush_relay_log_cache);
+
int server_id_cmp(ulong *id1, ulong *id2);
#endif /* HAVE_REPLICATION */
=== modified file 'sql/rpl_rli.cc'
--- a/sql/rpl_rli.cc 2009-02-17 23:06:00 +0000
+++ b/sql/rpl_rli.cc 2009-03-02 21:53:42 +0000
@@ -22,19 +22,11 @@
#include "rpl_utility.h"
#include "transaction.h"
-static int count_relay_log_space(Relay_log_info* rli);
-
-// Defined in slave.cc
-int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
-int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
- const char *default_val);
-
-
Relay_log_info::Relay_log_info(bool is_slave_recovery)
:Slave_reporting_capability("SQL"),
no_storage(FALSE), replicate_same_server_id(::replicate_same_server_id),
- info_fd(-1), cur_log_fd(-1),
- relay_log(&sync_relaylog_period), sync_counter(0),
+ cur_log_fd(-1),
+ relay_log(&sync_relaylog_period),
is_relay_log_recovery(is_slave_recovery),
save_temporary_tables(0),
cur_log_old_open_count(0), group_relay_log_pos(0), event_relay_log_pos(0),
@@ -52,9 +44,8 @@ Relay_log_info::Relay_log_info(bool is_s
DBUG_ENTER("Relay_log_info::Relay_log_info");
group_relay_log_name[0]= event_relay_log_name[0]=
- group_master_log_name[0]= 0;
+ group_master_log_name[0]= 0;
until_log_name[0]= ign_master_log_name_end[0]= 0;
- bzero((char*) &info_file, sizeof(info_file));
bzero((char*) &cache_buf, sizeof(cache_buf));
cached_charset_invalidate();
pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
@@ -65,10 +56,10 @@ Relay_log_info::Relay_log_info(bool is_s
pthread_cond_init(&stop_cond, NULL);
pthread_cond_init(&log_space_cond, NULL);
relay_log.init_pthread_objects();
+
DBUG_VOID_RETURN;
}
-
Relay_log_info::~Relay_log_info()
{
DBUG_ENTER("Relay_log_info::~Relay_log_info");
@@ -84,227 +75,6 @@ Relay_log_info::~Relay_log_info()
DBUG_VOID_RETURN;
}
-
-int init_relay_log_info(Relay_log_info* rli,
- const char* info_fname)
-{
- char fname[FN_REFLEN+128];
- int info_fd;
- const char* msg = 0;
- int error = 0;
- DBUG_ENTER("init_relay_log_info");
- DBUG_ASSERT(!rli->no_storage); // Don't init if there is no storage
-
- if (rli->inited) // Set if this function called
- DBUG_RETURN(0);
- fn_format(fname, info_fname, mysql_data_home, "", 4+32);
- pthread_mutex_lock(&rli->data_lock);
- info_fd = rli->info_fd;
- rli->cur_log_fd = -1;
- rli->slave_skip_counter=0;
- rli->abort_pos_wait=0;
- rli->log_space_limit= relay_log_space_limit;
- rli->log_space_total= 0;
- rli->tables_to_lock= 0;
- rli->tables_to_lock_count= 0;
-
- /*
- The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
- Note that the I/O thread flushes it to disk after writing every
- event, in flush_master_info(mi, 1).
- */
-
- /*
- For the maximum log size, we choose max_relay_log_size if it is
- non-zero, max_binlog_size otherwise. If later the user does SET
- GLOBAL on one of these variables, fix_max_binlog_size and
- fix_max_relay_log_size will reconsider the choice (for example
- if the user changes max_relay_log_size to zero, we have to
- switch to using max_binlog_size for the relay log) and update
- rli->relay_log.max_size (and mysql_bin_log.max_size).
- */
- {
- char buf[FN_REFLEN];
- const char *ln;
- static bool name_warning_sent= 0;
- ln= rli->relay_log.generate_name(opt_relay_logname, "-relay-bin",
- 1, buf);
- /* We send the warning only at startup, not after every RESET SLAVE */
- if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
- {
- /*
- User didn't give us info to name the relay log index file.
- Picking `hostname`-relay-bin.index like we do, causes replication to
- fail if this slave's hostname is changed later. So, we would like to
- instead require a name. But as we don't want to break many existing
- setups, we only give warning, not error.
- */
- sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
- " so replication "
- "may break when this MySQL server acts as a "
- "slave and has his hostname changed!! Please "
- "use '--relay-log=%s' to avoid this problem.", ln);
- name_warning_sent= 1;
- }
- /*
- note, that if open() fails, we'll still have index file open
- but a destructor will take care of that
- */
- if (rli->relay_log.open_index_file(opt_relaylog_index_name, ln) ||
- rli->relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0,
- (max_relay_log_size ? max_relay_log_size :
- max_binlog_size), 1))
- {
- pthread_mutex_unlock(&rli->data_lock);
- sql_print_error("Failed in open_log() called from init_relay_log_info()");
- DBUG_RETURN(1);
- }
- rli->relay_log.is_relay_log= TRUE;
- }
-
- /* if file does not exist */
- if (access(fname,F_OK))
- {
- /*
- If someone removed the file from underneath our feet, just close
- the old descriptor and re-create the old file
- */
- if (info_fd >= 0)
- my_close(info_fd, MYF(MY_WME));
- if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
- {
- sql_print_error("Failed to create a new relay log info file (\
-file '%s', errno %d)", fname, my_errno);
- msg= current_thd->stmt_da->message();
- goto err;
- }
- if (init_io_cache(&rli->info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
- MYF(MY_WME)))
- {
- sql_print_error("Failed to create a cache on relay log info file '%s'",
- fname);
- msg= current_thd->stmt_da->message();
- goto err;
- }
-
- /* Init relay log with first entry in the relay index file */
- if (init_relay_log_pos(rli,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
- &msg, 0))
- {
- sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
- goto err;
- }
- rli->group_master_log_name[0]= 0;
- rli->group_master_log_pos= 0;
- rli->info_fd= info_fd;
- }
- else // file exists
- {
- if (info_fd >= 0)
- reinit_io_cache(&rli->info_file, READ_CACHE, 0L,0,0);
- else
- {
- int error=0;
- if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
- {
- sql_print_error("\
-Failed to open the existing relay log info file '%s' (errno %d)",
- fname, my_errno);
- error= 1;
- }
- else if (init_io_cache(&rli->info_file, info_fd,
- IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
- {
- sql_print_error("Failed to create a cache on relay log info file '%s'",
- fname);
- error= 1;
- }
- if (error)
- {
- if (info_fd >= 0)
- my_close(info_fd, MYF(0));
- rli->info_fd= -1;
- rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
- pthread_mutex_unlock(&rli->data_lock);
- DBUG_RETURN(1);
- }
- }
-
- rli->info_fd = info_fd;
- int relay_log_pos, master_log_pos;
- if (init_strvar_from_file(rli->group_relay_log_name,
- sizeof(rli->group_relay_log_name),
- &rli->info_file, "") ||
- init_intvar_from_file(&relay_log_pos,
- &rli->info_file, BIN_LOG_HEADER_SIZE) ||
- init_strvar_from_file(rli->group_master_log_name,
- sizeof(rli->group_master_log_name),
- &rli->info_file, "") ||
- init_intvar_from_file(&master_log_pos, &rli->info_file, 0))
- {
- msg="Error reading slave log configuration";
- goto err;
- }
- strmake(rli->event_relay_log_name,rli->group_relay_log_name,
- sizeof(rli->event_relay_log_name)-1);
- rli->group_relay_log_pos= rli->event_relay_log_pos= relay_log_pos;
- rli->group_master_log_pos= master_log_pos;
-
- if (!rli->is_relay_log_recovery &&
- init_relay_log_pos(rli,
- rli->group_relay_log_name,
- rli->group_relay_log_pos,
- 0 /* no data lock*/,
- &msg, 0))
- {
- char llbuf[22];
- sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s)",
- rli->group_relay_log_name,
- llstr(rli->group_relay_log_pos, llbuf));
- goto err;
- }
- }
-
-#ifndef DBUG_OFF
- if (!(rli->is_relay_log_recovery))
- {
- char llbuf1[22], llbuf2[22];
- DBUG_PRINT("info", ("my_b_tell(rli->cur_log)=%s rli->event_relay_log_pos=%s",
- llstr(my_b_tell(rli->cur_log),llbuf1),
- llstr(rli->event_relay_log_pos,llbuf2)));
- DBUG_ASSERT(rli->event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
- DBUG_ASSERT((my_b_tell(rli->cur_log) == rli->event_relay_log_pos));
- }
-#endif
-
- /*
- Now change the cache from READ to WRITE - must do this
- before flush_relay_log_info
- */
- reinit_io_cache(&rli->info_file, WRITE_CACHE,0L,0,1);
- if ((error= flush_relay_log_info(rli)))
- sql_print_error("Failed to flush relay log info file");
- if (count_relay_log_space(rli))
- {
- msg="Error counting relay log space";
- goto err;
- }
- rli->inited= 1;
- pthread_mutex_unlock(&rli->data_lock);
- DBUG_RETURN(error);
-
-err:
- sql_print_error(msg);
- end_io_cache(&rli->info_file);
- if (info_fd >= 0)
- my_close(info_fd, MYF(0));
- rli->info_fd= -1;
- rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
- pthread_mutex_unlock(&rli->data_lock);
- DBUG_RETURN(1);
-}
-
-
static inline int add_relay_log(Relay_log_info* rli,LOG_INFO* linfo)
{
MY_STAT s;
@@ -324,31 +94,30 @@ static inline int add_relay_log(Relay_lo
}
-static int count_relay_log_space(Relay_log_info* rli)
+int Relay_log_info::count_relay_log_space()
{
LOG_INFO linfo;
DBUG_ENTER("count_relay_log_space");
- rli->log_space_total= 0;
- if (rli->relay_log.find_log_pos(&linfo, NullS, 1))
+ log_space_total= 0;
+ if (relay_log.find_log_pos(&linfo, NullS, 1))
{
sql_print_error("Could not find first log while counting relay log space");
DBUG_RETURN(1);
}
do
{
- if (add_relay_log(rli,&linfo))
+ if (add_relay_log(this, &linfo))
DBUG_RETURN(1);
- } while (!rli->relay_log.find_next_log(&linfo, 1));
+ } while (!relay_log.find_next_log(&linfo, 1));
/*
As we have counted everything, including what may have written in a
preceding write, we must reset bytes_written, or we may count some space
twice.
*/
- rli->relay_log.reset_bytes_written();
+ relay_log.reset_bytes_written();
DBUG_RETURN(0);
}
-
/*
Reset UNTIL condition for Relay_log_info
@@ -938,7 +707,7 @@ int purge_relay_logs(Relay_log_info* rli
strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
sizeof(rli->event_relay_log_name)-1);
rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
- if (count_relay_log_space(rli))
+ if (rli->count_relay_log_space())
{
*errmsg= "Error counting relay log space";
goto err;
@@ -1131,7 +900,7 @@ void Relay_log_info::stmt_done(my_off_t
else
{
inc_group_relay_log_pos(event_master_log_pos);
- flush_relay_log_info(this);
+ flush_relay_log_info();
/*
Note that Rotate_log_event::do_apply_event() does not call this
function, so there is no chance that a fake rotate event resets
=== modified file 'sql/rpl_rli.h'
--- a/sql/rpl_rli.h 2009-02-17 23:06:00 +0000
+++ b/sql/rpl_rli.h 2009-03-02 21:53:42 +0000
@@ -80,14 +80,10 @@ public:
bool replicate_same_server_id;
/*** The following variables can only be read when protect by data lock ****/
-
/*
- info_fd - file descriptor of the info file. set only during
- initialization or clean up - safe to read anytime
cur_log_fd - file descriptor of the current read relay log
*/
- File info_fd,cur_log_fd;
-
+ File cur_log_fd;
/*
Protected with internal locks.
Must get data_lock when resetting the logs.
@@ -97,13 +93,6 @@ public:
IO_CACHE cache_buf,*cur_log;
/*
- * Keeps track of the number of transactions that commits
- * before fsyncing. The option --sync-relay-log-info determines
- * how many transactions should commit before fsyncing.
- */
- uint sync_counter;
-
- /*
* Identifies when the recovery process is going on.
* See sql/slave.cc:init_recovery for further details.
*/
@@ -111,9 +100,6 @@ public:
/* The following variables are safe to read any time */
- /* IO_CACHE of the info file - set only during init or end */
- IO_CACHE info_file;
-
/*
When we restart slave thread we need to have access to the previously
created temporary tables. Modified only on init/end and by the SQL
@@ -274,7 +260,7 @@ public:
ulonglong ign_master_log_pos_end;
Relay_log_info(bool is_slave_recovery);
- ~Relay_log_info();
+ virtual ~Relay_log_info();
/*
Invalidate cached until_log_name and group_relay_log_name comparison
@@ -420,13 +406,35 @@ public:
(m_flags & (1UL << IN_STMT));
}
-private:
- uint32 m_flags;
-};
+ int count_relay_log_space();
+ int init_relay_log_info()
+ {
+ return create_relay_info();
+ }
-// Defined in rpl_rli.cc
-int init_relay_log_info(Relay_log_info* rli, const char* info_fname);
+ bool flush_relay_log_info()
+ {
+ return save_relay_info();
+ }
+
+ void end_relay_log_info()
+ {
+ close_relay_info();
+ }
+ int reset_relay_log_info()
+ {
+ return erase_relay_info();
+ }
+
+ virtual int create_relay_info() { return (1); };
+ virtual bool save_relay_info() { return (FALSE); };
+ virtual void close_relay_info() { };
+ virtual int erase_relay_info() { return (1); };
+
+private:
+ uint32 m_flags;
+};
#endif /* RPL_RLI_H */
=== added file 'sql/rpl_rli_file.cc'
--- a/sql/rpl_rli_file.cc 1970-01-01 00:00:00 +0000
+++ b/sql/rpl_rli_file.cc 2009-03-02 21:53:42 +0000
@@ -0,0 +1,362 @@
+/* Copyright (C) 2000-2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "mysql_priv.h"
+
+#include "rpl_mi.h"
+#include "rpl_rli.h"
+#include "rpl_rli_file.h"
+#include <my_dir.h> // For MY_STAT
+#include "sql_repl.h" // For check_binlog_magic
+#include "rpl_utility.h"
+#include "transaction.h"
+
+// Defined in slave.cc
+int init_intvar_from_file(int* var, IO_CACHE* f, int default_val);
+int init_strvar_from_file(char *var, int max_size, IO_CACHE *f,
+ const char *default_val);
+
+Relay_log_file_info::Relay_log_file_info(bool is_slave_recovery, const char* param_info_fname)
+ :Relay_log_info(is_slave_recovery),
+ info_fname(param_info_fname), info_fd(-1), sync_counter(0)
+{
+ DBUG_ENTER("Relay_log_file_info::Relay_log_info");
+
+ bzero((char*) &info_file, sizeof(info_file));
+
+ DBUG_VOID_RETURN;
+}
+
+int Relay_log_file_info::create_relay_info()
+{
+ char fname[FN_REFLEN+128];
+ const char* msg = 0;
+ int error = 0;
+ DBUG_ENTER("Relay_log_file_info::create_relay_info");
+ DBUG_ASSERT(!no_storage); // Don't init if there is no storage
+
+ if (inited) // Set if this function called
+ DBUG_RETURN(0);
+ fn_format(fname, info_fname, mysql_data_home, "", 4+32);
+ pthread_mutex_lock(&data_lock);
+ cur_log_fd = -1;
+ slave_skip_counter=0;
+ abort_pos_wait=0;
+ log_space_limit= relay_log_space_limit;
+ log_space_total= 0;
+ tables_to_lock= 0;
+ tables_to_lock_count= 0;
+
+ /*
+ The relay log will now be opened, as a SEQ_READ_APPEND IO_CACHE.
+ Note that the I/O thread flushes it to disk after writing every
+ event, in flush_master_info(mi, 1).
+ */
+
+ /*
+ For the maximum log size, we choose max_relay_log_size if it is
+ non-zero, max_binlog_size otherwise. If later the user does SET
+ GLOBAL on one of these variables, fix_max_binlog_size and
+ fix_max_relay_log_size will reconsider the choice (for example
+ if the user changes max_relay_log_size to zero, we have to
+ switch to using max_binlog_size for the relay log) and update
+ relay_log.max_size (and mysql_bin_log.max_size).
+ */
+ {
+ char buf[FN_REFLEN];
+ const char *ln;
+ static bool name_warning_sent= 0;
+ ln= relay_log.generate_name(opt_relay_logname, "-relay-bin",
+ 1, buf);
+ /* We send the warning only at startup, not after every RESET SLAVE */
+ if (!opt_relay_logname && !opt_relaylog_index_name && !name_warning_sent)
+ {
+ /*
+ User didn't give us info to name the relay log index file.
+ Picking `hostname`-relay-bin.index like we do, causes replication to
+ fail if this slave's hostname is changed later. So, we would like to
+ instead require a name. But as we don't want to break many existing
+ setups, we only give warning, not error.
+ */
+ sql_print_warning("Neither --relay-log nor --relay-log-index were used;"
+ " so replication "
+ "may break when this MySQL server acts as a "
+ "slave and has his hostname changed!! Please "
+ "use '--relay-log=%s' to avoid this problem.", ln);
+ name_warning_sent= 1;
+ }
+ /*
+ note, that if open() fails, we'll still have index file open
+ but a destructor will take care of that
+ */
+ if (relay_log.open_index_file(opt_relaylog_index_name, ln) ||
+ relay_log.open(ln, LOG_BIN, 0, SEQ_READ_APPEND, 0,
+ (max_relay_log_size ? max_relay_log_size :
+ max_binlog_size), 1))
+ {
+ pthread_mutex_unlock(&data_lock);
+ sql_print_error("Failed in open_log() called from init_relay_log_info()");
+ DBUG_RETURN(1);
+ }
+ relay_log.is_relay_log= TRUE;
+ }
+
+ /* if file does not exist */
+ if (access(fname,F_OK))
+ {
+ /*
+ If someone removed the file from underneath our feet, just close
+ the old descriptor and re-create the old file
+ */
+ if (info_fd >= 0)
+ my_close(info_fd, MYF(MY_WME));
+ if ((info_fd = my_open(fname, O_CREAT|O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
+ {
+ sql_print_error("Failed to create a new relay log info file (\
+file '%s', errno %d)", fname, my_errno);
+ msg= current_thd->stmt_da->message();
+ goto err;
+ }
+ if (init_io_cache(&info_file, info_fd, IO_SIZE*2, READ_CACHE, 0L,0,
+ MYF(MY_WME)))
+ {
+ sql_print_error("Failed to create a cache on relay log info file '%s'",
+ fname);
+ msg= current_thd->stmt_da->message();
+ goto err;
+ }
+
+ /* Init relay log with first entry in the relay index file */
+ if (init_relay_log_pos(this,NullS,BIN_LOG_HEADER_SIZE,0 /* no data lock */,
+ &msg, 0))
+ {
+ sql_print_error("Failed to open the relay log 'FIRST' (relay_log_pos 4)");
+ goto err;
+ }
+ group_master_log_name[0]= 0;
+ group_master_log_pos= 0;
+ }
+ else // file exists
+ {
+ if (info_fd >= 0)
+ reinit_io_cache(&info_file, READ_CACHE, 0L,0,0);
+ else
+ {
+ int error=0;
+ if ((info_fd = my_open(fname, O_RDWR|O_BINARY, MYF(MY_WME))) < 0)
+ {
+ sql_print_error("\
+Failed to open the existing relay log info file '%s' (errno %d)",
+ fname, my_errno);
+ error= 1;
+ }
+ else if (init_io_cache(&info_file, info_fd,
+ IO_SIZE*2, READ_CACHE, 0L, 0, MYF(MY_WME)))
+ {
+ sql_print_error("Failed to create a cache on relay log info file '%s'",
+ fname);
+ error= 1;
+ }
+ if (error)
+ {
+ if (info_fd >= 0)
+ my_close(info_fd, MYF(0));
+ info_fd= -1;
+ relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
+ pthread_mutex_unlock(&data_lock);
+ DBUG_RETURN(1);
+ }
+ }
+
+ int relay_log_pos, master_log_pos;
+ if (init_strvar_from_file(group_relay_log_name,
+ sizeof(group_relay_log_name),
+ &info_file, "") ||
+ init_intvar_from_file(&relay_log_pos,
+ &info_file, BIN_LOG_HEADER_SIZE) ||
+ init_strvar_from_file(group_master_log_name,
+ sizeof(group_master_log_name),
+ &info_file, "") ||
+ init_intvar_from_file(&master_log_pos, &info_file, 0))
+ {
+ msg="Error reading slave log configuration";
+ goto err;
+ }
+ strmake(event_relay_log_name,group_relay_log_name,
+ sizeof(event_relay_log_name)-1);
+ group_relay_log_pos= event_relay_log_pos= relay_log_pos;
+ group_master_log_pos= master_log_pos;
+
+ if (!is_relay_log_recovery &&
+ init_relay_log_pos(this,
+ group_relay_log_name,
+ group_relay_log_pos,
+ 0 /* no data lock*/,
+ &msg, 0))
+ {
+ char llbuf[22];
+ sql_print_error("Failed to open the relay log '%s' (relay_log_pos %s)",
+ group_relay_log_name,
+ llstr(group_relay_log_pos, llbuf));
+ goto err;
+ }
+ }
+
+#ifndef DBUG_OFF
+ if (!(is_relay_log_recovery))
+ {
+ char llbuf1[22], llbuf2[22];
+ DBUG_PRINT("info", ("my_b_tell(cur_log)=%s event_relay_log_pos=%s",
+ llstr(my_b_tell(cur_log),llbuf1),
+ llstr(event_relay_log_pos,llbuf2)));
+ DBUG_ASSERT(event_relay_log_pos >= BIN_LOG_HEADER_SIZE);
+ DBUG_ASSERT((my_b_tell(cur_log) == event_relay_log_pos));
+ }
+#endif
+
+ /*
+ Now change the cache from READ to WRITE - must do this
+ before flush_relay_log_info
+ */
+ reinit_io_cache(&info_file, WRITE_CACHE,0L,0,1);
+ if ((error= flush_relay_log_info()))
+ sql_print_error("Failed to flush relay log info file");
+ if (count_relay_log_space())
+ {
+ msg="Error counting relay log space";
+ goto err;
+ }
+ inited= 1;
+ pthread_mutex_unlock(&data_lock);
+ DBUG_RETURN(error);
+
+err:
+ sql_print_error(msg);
+ end_io_cache(&info_file);
+ if (info_fd >= 0)
+ my_close(info_fd, MYF(0));
+ info_fd= -1;
+ relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
+ pthread_mutex_unlock(&data_lock);
+ DBUG_RETURN(1);
+}
+
+void Relay_log_file_info::close_relay_info()
+{
+ DBUG_ENTER("Relay_log_file_info::close_relay_info");
+
+ if (!inited)
+ DBUG_VOID_RETURN;
+ if (info_fd >= 0)
+ {
+ end_io_cache(&info_file);
+ (void) my_close(info_fd, MYF(MY_WME));
+ info_fd = -1;
+ }
+ if (cur_log_fd >= 0)
+ {
+ end_io_cache(&cache_buf);
+ (void)my_close(cur_log_fd, MYF(MY_WME));
+ cur_log_fd = -1;
+ }
+ inited = 0;
+ relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
+ relay_log.harvest_bytes_written(&log_space_total);
+ /*
+ Delete the slave's temporary tables from memory.
+ In the future there will be other actions than this, to ensure persistance
+ of slave's temp tables after shutdown.
+ */
+ close_temporary_tables();
+ DBUG_VOID_RETURN;
+}
+
+/*
+ Store the file and position where the execute-slave thread are in the
+ relay log.
+
+ SYNOPSIS
+ flush_relay_log_info()
+ rli Relay log information
+
+ NOTES
+ - As this is only called by the slave thread, we don't need to
+ have a lock on this.
+ - If there is an active transaction, then we don't update the position
+ in the relay log. This is to ensure that we re-execute statements
+ if we die in the middle of an transaction that was rolled back.
+ - As a transaction never spans binary logs, we don't have to handle the
+ case where we do a relay-log-rotation in the middle of the transaction.
+ If this would not be the case, we would have to ensure that we
+ don't delete the relay log file where the transaction started when
+ we switch to a new relay log file.
+
+ TODO
+ - Change the log file information to a binary format to avoid calling
+ longlong2str.
+
+ RETURN VALUES
+ 0 ok
+ 1 write error
+*/
+
+bool Relay_log_file_info::save_relay_info()
+{
+ bool error=0;
+ DBUG_ENTER("Relay_log_file_info::flush_relay_log_info");
+
+ if (unlikely(no_storage))
+ DBUG_RETURN(0);
+
+ IO_CACHE *file = &info_file;
+ char buff[FN_REFLEN*2+22*2+4], *pos;
+
+ my_b_seek(file, 0L);
+ pos=strmov(buff, group_relay_log_name);
+ *pos++='\n';
+ pos=longlong2str(group_relay_log_pos, pos, 10);
+ *pos++='\n';
+ pos=strmov(pos, group_master_log_name);
+ *pos++='\n';
+ pos=longlong2str(group_master_log_pos, pos, 10);
+ *pos='\n';
+ if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
+ error=1;
+ if (flush_io_cache(file))
+ error=1;
+ if (sync_relayloginfo_period &&
+ !error &&
+ ++(sync_counter) >= sync_relayloginfo_period)
+ {
+ if (my_sync(info_fd, MYF(MY_WME)))
+ error=0;
+ sync_counter= 0;
+ }
+ /* Flushing the relay log is done by the slave I/O thread */
+ DBUG_RETURN(error);
+}
+
+int Relay_log_file_info::erase_relay_info()
+{
+ MY_STAT stat_area;
+ char fname[FN_REFLEN];
+ int error= 0;
+
+ fn_format(fname, info_fname, mysql_data_home, "", 4+32);
+ if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
+ error= 1;
+
+ return (error);
+}
=== added file 'sql/rpl_rli_file.h'
--- a/sql/rpl_rli_file.h 1970-01-01 00:00:00 +0000
+++ b/sql/rpl_rli_file.h 2009-03-02 21:53:42 +0000
@@ -0,0 +1,47 @@
+/* Copyright (C) 2005 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef RPL_RLI_FILE_H
+#define RPL_RLI_FILE_H
+
+#include "rpl_rli.h"
+
+class Relay_log_file_info : public Relay_log_info
+{
+ public:
+
+ const char* info_fname;
+ /*
+ info_fd - file descriptor of the info file. set only during
+ initialization or clean up - safe to read anytime
+ */
+ File info_fd;
+ /* IO_CACHE of the info file - set only during init or end */
+ IO_CACHE info_file;
+ /*
+ Keeps track of the number of transactions that commits
+ before fsyncing. The option --sync-relay-log-info determines
+ how many transactions should commit before fsyncing.
+ */
+ uint sync_counter;
+
+ Relay_log_file_info(bool is_slave_recovery, const char* info_name);
+ int create_relay_info();
+ bool save_relay_info();
+ void close_relay_info();
+ int erase_relay_info();
+};
+
+#endif /* RPL_RLI_FILE_H */
=== modified file 'sql/set_var.cc'
--- a/sql/set_var.cc 2009-02-05 12:49:39 +0000
+++ b/sql/set_var.cc 2009-03-02 21:53:42 +0000
@@ -1404,7 +1404,7 @@ static void fix_max_binlog_size(THD *thd
mysql_bin_log.set_max_size(max_binlog_size);
#ifdef HAVE_REPLICATION
if (!max_relay_log_size)
- active_mi->rli.relay_log.set_max_size(max_binlog_size);
+ active_mi->rli->relay_log.set_max_size(max_binlog_size);
#endif
DBUG_VOID_RETURN;
}
@@ -1415,7 +1415,7 @@ static void fix_max_relay_log_size(THD *
DBUG_PRINT("info",("max_binlog_size=%lu max_relay_log_size=%lu",
max_binlog_size, max_relay_log_size));
#ifdef HAVE_REPLICATION
- active_mi->rli.relay_log.set_max_size(max_relay_log_size ?
+ active_mi->rli->relay_log.set_max_size(max_relay_log_size ?
max_relay_log_size: max_binlog_size);
#endif
DBUG_VOID_RETURN;
=== modified file 'sql/slave.cc'
--- a/sql/slave.cc 2009-02-17 23:18:08 +0000
+++ b/sql/slave.cc 2009-03-02 21:53:42 +0000
@@ -31,6 +31,7 @@
#include "slave.h"
#include "rpl_mi.h"
#include "rpl_rli.h"
+#include "rpl_rli_file.h"
#include "sql_repl.h"
#include "rpl_filter.h"
#include "repl_failsafe.h"
@@ -170,7 +171,7 @@ static bool check_io_slave_killed(THD *t
void init_thread_mask(int* mask,Master_info* mi,bool inverse)
{
- bool set_io = mi->slave_running, set_sql = mi->rli.slave_running;
+ bool set_io = mi->slave_running, set_sql = mi->rli->slave_running;
register int tmp_mask=0;
DBUG_ENTER("init_thread_mask");
@@ -195,7 +196,7 @@ void lock_slave_threads(Master_info* mi)
//TODO: see if we can do this without dual mutex
pthread_mutex_lock(&mi->run_lock);
- pthread_mutex_lock(&mi->rli.run_lock);
+ pthread_mutex_lock(&mi->rli->run_lock);
DBUG_VOID_RETURN;
}
@@ -209,7 +210,7 @@ void unlock_slave_threads(Master_info* m
DBUG_ENTER("unlock_slave_threads");
//TODO: see if we can do this without dual mutex
- pthread_mutex_unlock(&mi->rli.run_lock);
+ pthread_mutex_unlock(&mi->rli->run_lock);
pthread_mutex_unlock(&mi->run_lock);
DBUG_VOID_RETURN;
}
@@ -236,10 +237,16 @@ int init_slave()
if (pthread_key_create(&RPL_MASTER_INFO, NULL))
{
error= 1;
- goto err;
+ DBUG_RETURN(error);
}
- active_mi= new Master_info(relay_log_recovery);
+ Relay_log_info *rli= (transact_replication ?
+ new Relay_log_file_info(relay_log_recovery,
+ relay_log_info_file)
+ : new Relay_log_file_info(relay_log_recovery,
+ relay_log_info_file));
+
+ active_mi= new Master_info(rli);
/*
If master_host is not specified, try to read it from the master_info file.
@@ -253,7 +260,7 @@ int init_slave()
goto err;
}
- if (init_master_info(active_mi,master_info_file,relay_log_info_file,
+ if (active_mi->init_master_info(master_info_file,
1, (SLAVE_IO | SLAVE_SQL)))
{
sql_print_error("Failed to initialize the master info structure");
@@ -261,7 +268,7 @@ int init_slave()
goto err;
}
- if (active_mi->rli.is_relay_log_recovery && init_recovery(active_mi))
+ if (active_mi->rli->is_relay_log_recovery && init_recovery(active_mi))
{
error= 1;
goto err;
@@ -285,7 +292,9 @@ int init_slave()
}
err:
- active_mi->rli.is_relay_log_recovery= FALSE;
+ if (active_mi) {
+ active_mi->rli->is_relay_log_recovery= FALSE;
+ }
pthread_mutex_unlock(&LOCK_active_mi);
DBUG_RETURN(error);
}
@@ -322,7 +331,7 @@ static int init_recovery(Master_info* mi
const char *errmsg= 0;
DBUG_ENTER("init_recovery");
- Relay_log_info *rli= &mi->rli;
+ Relay_log_info *rli= mi->rli;
if (rli->group_master_log_name[0])
{
mi->master_log_pos= max(BIN_LOG_HEADER_SIZE,
@@ -336,7 +345,7 @@ static int init_recovery(Master_info* mi
strmake(rli->group_relay_log_name, rli->relay_log.get_log_fname(),
sizeof(rli->group_relay_log_name)-1);
strmake(rli->event_relay_log_name, rli->relay_log.get_log_fname(),
- sizeof(mi->rli.event_relay_log_name)-1);
+ sizeof(mi->rli->event_relay_log_name)-1);
rli->group_relay_log_pos= rli->event_relay_log_pos= BIN_LOG_HEADER_SIZE;
@@ -347,12 +356,12 @@ static int init_recovery(Master_info* mi
&errmsg, 0))
DBUG_RETURN(1);
- if (flush_master_info(mi, 0))
+ if (mi->flush_master_info(0))
{
sql_print_error("Failed to flush master info file");
DBUG_RETURN(1);
}
- if (flush_relay_log_info(rli))
+ if (rli->flush_relay_log_info())
{
sql_print_error("Failed to flush relay info file");
DBUG_RETURN(1);
@@ -472,7 +481,7 @@ int terminate_slave_threads(Master_info*
if (!mi->inited)
DBUG_RETURN(0); /* successfully do nothing */
int error,force_all = (thread_mask & SLAVE_FORCE_ALL);
- pthread_mutex_t *sql_lock = &mi->rli.run_lock, *io_lock = &mi->run_lock;
+ pthread_mutex_t *sql_lock = &mi->rli->run_lock, *io_lock = &mi->run_lock;
if ((thread_mask & (SLAVE_IO|SLAVE_FORCE_ALL)))
{
@@ -488,10 +497,10 @@ int terminate_slave_threads(Master_info*
if ((thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)))
{
DBUG_PRINT("info",("Terminating SQL thread"));
- mi->rli.abort_slave=1;
- if ((error=terminate_slave_thread(mi->rli.sql_thd,sql_lock,
- &mi->rli.stop_cond,
- &mi->rli.slave_running,
+ mi->rli->abort_slave=1;
+ if ((error=terminate_slave_thread(mi->rli->sql_thd,sql_lock,
+ &mi->rli->stop_cond,
+ &mi->rli->slave_running,
skip_lock)) &&
!force_all)
DBUG_RETURN(error);
@@ -677,14 +686,14 @@ int start_slave_threads(bool need_slave_
if (need_slave_mutex)
{
lock_io = &mi->run_lock;
- lock_sql = &mi->rli.run_lock;
+ lock_sql = &mi->rli->run_lock;
}
if (wait_for_start)
{
cond_io = &mi->start_cond;
- cond_sql = &mi->rli.start_cond;
+ cond_sql = &mi->rli->start_cond;
lock_cond_io = &mi->run_lock;
- lock_cond_sql = &mi->rli.run_lock;
+ lock_cond_sql = &mi->rli->run_lock;
}
if (thread_mask & SLAVE_IO)
@@ -696,7 +705,7 @@ int start_slave_threads(bool need_slave_
{
error=start_slave_thread(handle_slave_sql,lock_sql,lock_cond_sql,
cond_sql,
- &mi->rli.slave_running, &mi->rli.slave_run_id,
+ &mi->rli->slave_running, &mi->rli->slave_run_id,
mi);
if (error)
terminate_slave_threads(mi, thread_mask & SLAVE_IO, 0);
@@ -710,7 +719,7 @@ static int end_slave_on_walk(Master_info
{
DBUG_ENTER("end_slave_on_walk");
- end_master_info(mi);
+ mi->end_master_info();
DBUG_RETURN(0);
}
#endif
@@ -743,7 +752,7 @@ void end_slave()
once multi-master code is ready.
*/
terminate_slave_threads(active_mi,SLAVE_FORCE_ALL);
- end_master_info(active_mi);
+ active_mi->end_master_info();
delete active_mi;
active_mi= 0;
}
@@ -1039,8 +1048,8 @@ static int get_master_version_and_clock(
Free old description_event_for_queue (that is needed if we are in
a reconnection).
*/
- delete mi->rli.relay_log.description_event_for_queue;
- mi->rli.relay_log.description_event_for_queue= 0;
+ delete mi->rli->relay_log.description_event_for_queue;
+ mi->rli->relay_log.description_event_for_queue= 0;
if (!my_isdigit(&my_charset_bin,*mysql->server_version))
{
@@ -1065,11 +1074,11 @@ static int get_master_version_and_clock(
err_msg.append(err_buff);
break;
case '3':
- mi->rli.relay_log.description_event_for_queue= new
+ mi->rli->relay_log.description_event_for_queue= new
Format_description_log_event(1, mysql->server_version);
break;
case '4':
- mi->rli.relay_log.description_event_for_queue= new
+ mi->rli->relay_log.description_event_for_queue= new
Format_description_log_event(3, mysql->server_version);
break;
default:
@@ -1081,7 +1090,7 @@ static int get_master_version_and_clock(
(it has the format of the *slave*); it's only good to help know if the
master is 3.23, 4.0, etc.
*/
- mi->rli.relay_log.description_event_for_queue= new
+ mi->rli->relay_log.description_event_for_queue= new
Format_description_log_event(4, mysql->server_version);
break;
}
@@ -1098,7 +1107,7 @@ static int get_master_version_and_clock(
goto err;
/* as we are here, we tried to allocate the event */
- if (!mi->rli.relay_log.description_event_for_queue)
+ if (!mi->rli->relay_log.description_event_for_queue)
{
errmsg= "default Format_description_log_event";
err_code= ER_SLAVE_CREATE_EVENT_FAILURE;
@@ -1146,7 +1155,7 @@ static int get_master_version_and_clock(
{
if ((master_row= mysql_fetch_row(master_res)) &&
(::server_id == (mi->master_id= strtoul(master_row[1], 0, 10))) &&
- !mi->rli.replicate_same_server_id)
+ !mi->rli->replicate_same_server_id)
{
errmsg=
"The slave I/O thread stops because master and slave have equal"
@@ -1327,7 +1336,7 @@ Waiting for the slave SQL thread to free
*/
static void write_ignored_events_info_to_relay_log(THD *thd, Master_info *mi)
{
- Relay_log_info *rli= &mi->rli;
+ Relay_log_info *rli= mi->rli;
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
DBUG_ENTER("write_ignored_events_info_to_relay_log");
@@ -1352,7 +1361,7 @@ static void write_ignored_events_info_to
" to the relay log, SHOW SLAVE STATUS may be"
" inaccurate");
rli->relay_log.harvest_bytes_written(&rli->log_space_total);
- if (flush_master_info(mi, 1))
+ if (mi->flush_master_info(1))
sql_print_error("Failed to flush master info file");
delete ev;
}
@@ -1519,21 +1528,21 @@ bool show_master_info(THD* thd, Master_i
pthread_mutex_unlock(&mi->run_lock);
pthread_mutex_lock(&mi->data_lock);
- pthread_mutex_lock(&mi->rli.data_lock);
+ pthread_mutex_lock(&mi->rli->data_lock);
protocol->store(mi->host, &my_charset_bin);
protocol->store(mi->user, &my_charset_bin);
protocol->store((uint32) mi->port);
protocol->store((uint32) mi->connect_retry);
protocol->store(mi->master_log_name, &my_charset_bin);
protocol->store((ulonglong) mi->master_log_pos);
- protocol->store(mi->rli.group_relay_log_name +
- dirname_length(mi->rli.group_relay_log_name),
+ protocol->store(mi->rli->group_relay_log_name +
+ dirname_length(mi->rli->group_relay_log_name),
&my_charset_bin);
- protocol->store((ulonglong) mi->rli.group_relay_log_pos);
- protocol->store(mi->rli.group_master_log_name, &my_charset_bin);
+ protocol->store((ulonglong) mi->rli->group_relay_log_pos);
+ protocol->store(mi->rli->group_master_log_name, &my_charset_bin);
protocol->store(mi->slave_running == MYSQL_SLAVE_RUN_CONNECT ?
"Yes" : "No", &my_charset_bin);
- protocol->store(mi->rli.slave_running ? "Yes":"No", &my_charset_bin);
+ protocol->store(mi->rli->slave_running ? "Yes":"No", &my_charset_bin);
protocol->store(rpl_filter->get_do_db());
protocol->store(rpl_filter->get_ignore_db());
@@ -1548,18 +1557,18 @@ bool show_master_info(THD* thd, Master_i
rpl_filter->get_wild_ignore_table(&tmp);
protocol->store(&tmp);
- protocol->store(mi->rli.last_error().number);
- protocol->store(mi->rli.last_error().message, &my_charset_bin);
- protocol->store((uint32) mi->rli.slave_skip_counter);
- protocol->store((ulonglong) mi->rli.group_master_log_pos);
- protocol->store((ulonglong) mi->rli.log_space_total);
+ protocol->store(mi->rli->last_error().number);
+ protocol->store(mi->rli->last_error().message, &my_charset_bin);
+ protocol->store((uint32) mi->rli->slave_skip_counter);
+ protocol->store((ulonglong) mi->rli->group_master_log_pos);
+ protocol->store((ulonglong) mi->rli->log_space_total);
protocol->store(
- mi->rli.until_condition==Relay_log_info::UNTIL_NONE ? "None":
- ( mi->rli.until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
+ mi->rli->until_condition==Relay_log_info::UNTIL_NONE ? "None":
+ ( mi->rli->until_condition==Relay_log_info::UNTIL_MASTER_POS? "Master":
"Relay"), &my_charset_bin);
- protocol->store(mi->rli.until_log_name, &my_charset_bin);
- protocol->store((ulonglong) mi->rli.until_log_pos);
+ protocol->store(mi->rli->until_log_name, &my_charset_bin);
+ protocol->store((ulonglong) mi->rli->until_log_pos);
#ifdef HAVE_OPENSSL
protocol->store(mi->ssl? "Yes":"No", &my_charset_bin);
@@ -1577,9 +1586,9 @@ bool show_master_info(THD* thd, Master_i
connected, we can compute it otherwise show NULL (i.e. unknown).
*/
if ((mi->slave_running == MYSQL_SLAVE_RUN_CONNECT) &&
- mi->rli.slave_running)
+ mi->rli->slave_running)
{
- long time_diff= ((long)(time(0) - mi->rli.last_master_timestamp)
+ long time_diff= ((long)(time(0) - mi->rli->last_master_timestamp)
- mi->clock_diff_with_master);
/*
Apparently on some systems time_diff can be <0. Here are possible
@@ -1601,7 +1610,7 @@ bool show_master_info(THD* thd, Master_i
last_master_timestamp == 0 (an "impossible" timestamp 1970) is a
special marker to say "consider we have caught up".
*/
- protocol->store((longlong)(mi->rli.last_master_timestamp ?
+ protocol->store((longlong)(mi->rli->last_master_timestamp ?
max(0, time_diff) : 0));
}
else
@@ -1615,9 +1624,9 @@ bool show_master_info(THD* thd, Master_i
// Last_IO_Error
protocol->store(mi->last_error().message, &my_charset_bin);
// Last_SQL_Errno
- protocol->store(mi->rli.last_error().number);
+ protocol->store(mi->rli->last_error().number);
// Last_SQL_Error
- protocol->store(mi->rli.last_error().message, &my_charset_bin);
+ protocol->store(mi->rli->last_error().message, &my_charset_bin);
// Replicate_Ignore_Server_Ids
{
char buff[FN_REFLEN];
@@ -1645,7 +1654,7 @@ bool show_master_info(THD* thd, Master_i
// Master_Server_id
protocol->store((uint32) mi->master_id);
- pthread_mutex_unlock(&mi->rli.data_lock);
+ pthread_mutex_unlock(&mi->rli->data_lock);
pthread_mutex_unlock(&mi->data_lock);
if (my_net_write(&thd->net, (uchar*) thd->packet.ptr(), packet->length()))
@@ -2233,7 +2242,7 @@ static int exec_relay_log_event(THD* thd
*/
if (rli->trans_retries < slave_trans_retries)
{
- if (init_master_info(rli->mi, 0, 0, 0, SLAVE_SQL))
+ if (rli->mi->init_master_info(0, 0, SLAVE_SQL))
sql_print_error("Failed to initialize the master info structure");
else if (init_relay_log_pos(rli,
rli->group_relay_log_name,
@@ -2383,7 +2392,7 @@ pthread_handler_t handle_slave_io(void *
THD *thd; // needs to be first for thread_stack
MYSQL *mysql;
Master_info *mi = (Master_info*)arg;
- Relay_log_info *rli= &mi->rli;
+ Relay_log_info *rli= mi->rli;
char llbuff[22];
uint retry_count;
bool suppress_warnings;
@@ -2476,7 +2485,7 @@ connected:
if (get_master_version_and_clock(mysql, mi))
goto err;
- if (mi->rli.relay_log.description_event_for_queue->binlog_version > 1)
+ if (mi->rli->relay_log.description_event_for_queue->binlog_version > 1)
{
/*
Register ourselves with the master.
@@ -2609,19 +2618,19 @@ Stopping slave I/O thread due to out-of-
(thd, mi, event_buf, event_len, synced)))
goto err;
- if (flush_master_info(mi, 1))
+ if (mi->flush_master_info(1))
{
sql_print_error("Failed to flush master info file");
goto err;
}
/*
See if the relay logs take too much space.
- We don't lock mi->rli.log_space_lock here; this dirty read saves time
+ We don't lock mi->rli->log_space_lock here; this dirty read saves time
and does not introduce any problem:
- - if mi->rli.ignore_log_space_limit is 1 but becomes 0 just after (so
+ - if mi->rli->ignore_log_space_limit is 1 but becomes 0 just after (so
the clean value is 0), then we are reading only one more event as we
should, and we'll block only at the next event. No big deal.
- - if mi->rli.ignore_log_space_limit is 0 but becomes 1 just after (so
+ - if mi->rli->ignore_log_space_limit is 0 but becomes 1 just after (so
the clean value is 1), then we are going into wait_for_relay_log_space()
for no reason, but this function will do a clean read, notice the clean
value and exit immediately.
@@ -2679,8 +2688,8 @@ err:
pthread_mutex_lock(&mi->run_lock);
/* Forget the relay log's format */
- delete mi->rli.relay_log.description_event_for_queue;
- mi->rli.relay_log.description_event_for_queue= 0;
+ delete mi->rli->relay_log.description_event_for_queue;
+ mi->rli->relay_log.description_event_for_queue= 0;
// TODO: make rpl_status part of Master_info
change_rpl_status(RPL_ACTIVE_SLAVE,RPL_IDLE_SLAVE);
DBUG_ASSERT(thd->net.buff != 0);
@@ -2719,7 +2728,7 @@ pthread_handler_t handle_slave_sql(void
THD *thd; /* needs to be first for thread_stack */
char llbuff[22],llbuff1[22];
- Relay_log_info* rli = &((Master_info*)arg)->rli;
+ Relay_log_info* rli = ((Master_info*)arg)->rli;
const char *errmsg;
// needs to call my_thread_init(), otherwise we get a coredump in DBUG_ stuff
@@ -3070,21 +3079,21 @@ static int process_io_create_file(Master
break;
Execute_load_log_event xev(thd,0,0);
xev.log_pos = cev->log_pos;
- if (unlikely(mi->rli.relay_log.append(&xev)))
+ if (unlikely(mi->rli->relay_log.append(&xev)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"error writing Exec_load event to relay log");
goto err;
}
- mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
+ mi->rli->relay_log.harvest_bytes_written(&mi->rli->log_space_total);
break;
}
if (unlikely(cev_not_written))
{
cev->block = net->read_pos;
cev->block_len = num_bytes;
- if (unlikely(mi->rli.relay_log.append(cev)))
+ if (unlikely(mi->rli->relay_log.append(cev)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
@@ -3092,21 +3101,21 @@ static int process_io_create_file(Master
goto err;
}
cev_not_written=0;
- mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total);
+ mi->rli->relay_log.harvest_bytes_written(&mi->rli->log_space_total);
}
else
{
aev.block = net->read_pos;
aev.block_len = num_bytes;
aev.log_pos = cev->log_pos;
- if (unlikely(mi->rli.relay_log.append(&aev)))
+ if (unlikely(mi->rli->relay_log.append(&aev)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_RELAY_LOG_WRITE_FAILURE,
ER(ER_SLAVE_RELAY_LOG_WRITE_FAILURE),
"error writing Append_block event to relay log");
goto err;
}
- mi->rli.relay_log.harvest_bytes_written(&mi->rli.log_space_total) ;
+ mi->rli->relay_log.harvest_bytes_written(&mi->rli->log_space_total) ;
}
}
}
@@ -3167,11 +3176,11 @@ static int process_io_rotate(Master_info
no need to reset description_event_for_queue now. And if it's nothing (same
master version as before), no need (still using the slave's format).
*/
- if (mi->rli.relay_log.description_event_for_queue->binlog_version >= 4)
+ if (mi->rli->relay_log.description_event_for_queue->binlog_version >= 4)
{
- delete mi->rli.relay_log.description_event_for_queue;
+ delete mi->rli->relay_log.description_event_for_queue;
/* start from format 3 (MySQL 4.0) again */
- mi->rli.relay_log.description_event_for_queue= new
+ mi->rli->relay_log.description_event_for_queue= new
Format_description_log_event(3);
}
/*
@@ -3193,7 +3202,7 @@ static int queue_binlog_ver_1_event(Mast
ulong inc_pos;
bool ignore_event= 0;
char *tmp_buf = 0;
- Relay_log_info *rli= &mi->rli;
+ Relay_log_info *rli= mi->rli;
DBUG_ENTER("queue_binlog_ver_1_event");
/*
@@ -3227,7 +3236,7 @@ static int queue_binlog_ver_1_event(Mast
connected to the master).
*/
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
- mi->rli.relay_log.description_event_for_queue);
+ mi->rli->relay_log.description_event_for_queue);
if (unlikely(!ev))
{
sql_print_error("Read invalid event from master: '%s',\
@@ -3310,12 +3319,12 @@ static int queue_binlog_ver_3_event(Mast
const char *errmsg = 0;
ulong inc_pos;
char *tmp_buf = 0;
- Relay_log_info *rli= &mi->rli;
+ Relay_log_info *rli= mi->rli;
DBUG_ENTER("queue_binlog_ver_3_event");
/* read_log_event() will adjust log_pos to be end_log_pos */
Log_event *ev = Log_event::read_log_event(buf,event_len, &errmsg,
- mi->rli.relay_log.description_event_for_queue);
+ mi->rli->relay_log.description_event_for_queue);
if (unlikely(!ev))
{
sql_print_error("Read invalid event from master: '%s',\
@@ -3373,7 +3382,7 @@ static int queue_old_event(Master_info *
{
DBUG_ENTER("queue_old_event");
- switch (mi->rli.relay_log.description_event_for_queue->binlog_version)
+ switch (mi->rli->relay_log.description_event_for_queue->binlog_version)
{
case 1:
DBUG_RETURN(queue_binlog_ver_1_event(mi,buf,event_len));
@@ -3381,7 +3390,7 @@ static int queue_old_event(Master_info *
DBUG_RETURN(queue_binlog_ver_3_event(mi,buf,event_len));
default: /* unsupported format; eg version 2 */
DBUG_PRINT("info",("unsupported binlog format %d in queue_old_event()",
- mi->rli.relay_log.description_event_for_queue->binlog_version));
+ mi->rli->relay_log.description_event_for_queue->binlog_version));
DBUG_RETURN(1);
}
}
@@ -3401,14 +3410,14 @@ static int queue_event(Master_info* mi,c
int error= 0;
String error_msg;
ulong inc_pos;
- Relay_log_info *rli= &mi->rli;
+ Relay_log_info *rli= mi->rli;
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
ulong s_id;
DBUG_ENTER("queue_event");
LINT_INIT(inc_pos);
- if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
+ if (mi->rli->relay_log.description_event_for_queue->binlog_version<4 &&
buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT /* a way to escape */)
DBUG_RETURN(queue_old_event(mi,buf,event_len));
@@ -3432,7 +3441,7 @@ static int queue_event(Master_info* mi,c
goto err;
case ROTATE_EVENT:
{
- Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
+ Rotate_log_event rev(buf,event_len,mi->rli->relay_log.description_event_for_queue);
if (unlikely(process_io_rotate(mi,&rev)))
{
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
@@ -3460,13 +3469,13 @@ static int queue_event(Master_info* mi,c
const char* errmsg;
if (!(tmp= (Format_description_log_event*)
Log_event::read_log_event(buf, event_len, &errmsg,
- mi->rli.relay_log.description_event_for_queue)))
+ mi->rli->relay_log.description_event_for_queue)))
{
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
goto err;
}
- delete mi->rli.relay_log.description_event_for_queue;
- mi->rli.relay_log.description_event_for_queue= tmp;
+ delete mi->rli->relay_log.description_event_for_queue;
+ mi->rli->relay_log.description_event_for_queue= tmp;
/*
Though this does some conversion to the slave's format, this will
preserve the master's binlog format version, and number of event types.
@@ -3477,7 +3486,7 @@ static int queue_event(Master_info* mi,c
*/
inc_pos= uint4korr(buf+LOG_POS_OFFSET) ? event_len : 0;
DBUG_PRINT("info",("binlog format is now %d",
- mi->rli.relay_log.description_event_for_queue->binlog_version));
+ mi->rli->relay_log.description_event_for_queue->binlog_version));
}
break;
@@ -3488,7 +3497,7 @@ static int queue_event(Master_info* mi,c
HB (heartbeat) cannot come before RL (Relay)
*/
char llbuf[22];
- Heartbeat_log_event hb(buf, event_len, mi->rli.relay_log.description_event_for_queue);
+ Heartbeat_log_event hb(buf, event_len, mi->rli->relay_log.description_event_for_queue);
if (!hb.is_valid())
{
error= ER_SLAVE_HEARTBEAT_FAILURE;
@@ -3551,7 +3560,7 @@ static int queue_event(Master_info* mi,c
pthread_mutex_lock(log_lock);
s_id= uint4korr(buf + SERVER_ID_OFFSET);
- if ((s_id == ::server_id && !mi->rli.replicate_same_server_id) ||
+ if ((s_id == ::server_id && !mi->rli->replicate_same_server_id) ||
/*
the following conjunction deals with IGNORE_SERVER_IDS, if set
If the master is on the ignore list, execution of
@@ -3582,7 +3591,7 @@ static int queue_event(Master_info* mi,c
IGNORE_SERVER_IDS it increments mi->master_log_pos
as well as rli->group_relay_log_pos.
*/
- if (!(s_id == ::server_id && !mi->rli.replicate_same_server_id) ||
+ if (!(s_id == ::server_id && !mi->rli->replicate_same_server_id) ||
buf[EVENT_TYPE_OFFSET] != FORMAT_DESCRIPTION_EVENT &&
buf[EVENT_TYPE_OFFSET] != ROTATE_EVENT &&
buf[EVENT_TYPE_OFFSET] != STOP_EVENT)
@@ -3627,36 +3636,6 @@ err:
}
-void end_relay_log_info(Relay_log_info* rli)
-{
- DBUG_ENTER("end_relay_log_info");
-
- if (!rli->inited)
- DBUG_VOID_RETURN;
- if (rli->info_fd >= 0)
- {
- end_io_cache(&rli->info_file);
- (void) my_close(rli->info_fd, MYF(MY_WME));
- rli->info_fd = -1;
- }
- if (rli->cur_log_fd >= 0)
- {
- end_io_cache(&rli->cache_buf);
- (void)my_close(rli->cur_log_fd, MYF(MY_WME));
- rli->cur_log_fd = -1;
- }
- rli->inited = 0;
- rli->relay_log.close(LOG_CLOSE_INDEX | LOG_CLOSE_STOP_EVENT);
- rli->relay_log.harvest_bytes_written(&rli->log_space_total);
- /*
- Delete the slave's temporary tables from memory.
- In the future there will be other actions than this, to ensure persistance
- of slave's temp tables after shutdown.
- */
- rli->close_temporary_tables();
- DBUG_VOID_RETURN;
-}
-
/*
Try to connect until successful or slave killed
@@ -3862,72 +3841,6 @@ MYSQL *rpl_connect_master(MYSQL *mysql)
}
/*
- Store the file and position where the execute-slave thread are in the
- relay log.
-
- SYNOPSIS
- flush_relay_log_info()
- rli Relay log information
-
- NOTES
- - As this is only called by the slave thread, we don't need to
- have a lock on this.
- - If there is an active transaction, then we don't update the position
- in the relay log. This is to ensure that we re-execute statements
- if we die in the middle of an transaction that was rolled back.
- - As a transaction never spans binary logs, we don't have to handle the
- case where we do a relay-log-rotation in the middle of the transaction.
- If this would not be the case, we would have to ensure that we
- don't delete the relay log file where the transaction started when
- we switch to a new relay log file.
-
- TODO
- - Change the log file information to a binary format to avoid calling
- longlong2str.
-
- RETURN VALUES
- 0 ok
- 1 write error
-*/
-
-bool flush_relay_log_info(Relay_log_info* rli)
-{
- bool error=0;
- DBUG_ENTER("flush_relay_log_info");
-
- if (unlikely(rli->no_storage))
- DBUG_RETURN(0);
-
- IO_CACHE *file = &rli->info_file;
- char buff[FN_REFLEN*2+22*2+4], *pos;
-
- my_b_seek(file, 0L);
- pos=strmov(buff, rli->group_relay_log_name);
- *pos++='\n';
- pos=longlong2str(rli->group_relay_log_pos, pos, 10);
- *pos++='\n';
- pos=strmov(pos, rli->group_master_log_name);
- *pos++='\n';
- pos=longlong2str(rli->group_master_log_pos, pos, 10);
- *pos='\n';
- if (my_b_write(file, (uchar*) buff, (size_t) (pos-buff)+1))
- error=1;
- if (flush_io_cache(file))
- error=1;
- if (sync_relayloginfo_period &&
- !error &&
- ++(rli->sync_counter) >= sync_relayloginfo_period)
- {
- if (my_sync(rli->info_fd, MYF(MY_WME)))
- error=1;
- rli->sync_counter= 0;
- }
- /* Flushing the relay log is done by the slave I/O thread */
- DBUG_RETURN(error);
-}
-
-
-/*
Called when we notice that the current "hot" log got rotated under our feet.
*/
@@ -4226,7 +4139,7 @@ static Log_event* next_event(Relay_log_i
rli->event_relay_log_pos = BIN_LOG_HEADER_SIZE;
strmake(rli->event_relay_log_name,rli->linfo.log_file_name,
sizeof(rli->event_relay_log_name)-1);
- flush_relay_log_info(rli);
+ rli->flush_relay_log_info();
}
/*
@@ -4326,7 +4239,7 @@ err:
void rotate_relay_log(Master_info* mi)
{
DBUG_ENTER("rotate_relay_log");
- Relay_log_info* rli= &mi->rli;
+ Relay_log_info* rli= mi->rli;
DBUG_EXECUTE_IF("crash_before_rotate_relaylog", exit(1););
@@ -4457,9 +4370,9 @@ bool rpl_master_has_bug(const Relay_log_
*/
bool rpl_master_erroneous_autoinc(THD *thd)
{
- if (active_mi && active_mi->rli.sql_thd == thd)
+ if (active_mi && active_mi->rli->sql_thd == thd)
{
- Relay_log_info *rli= &active_mi->rli;
+ Relay_log_info *rli= active_mi->rli;
DBUG_EXECUTE_IF("simulate_bug33029", return TRUE;);
return rpl_master_has_bug(rli, 33029, FALSE, NULL, NULL);
}
=== modified file 'sql/slave.h'
--- a/sql/slave.h 2008-12-24 10:48:24 +0000
+++ b/sql/slave.h 2009-03-02 21:53:42 +0000
@@ -145,7 +145,6 @@ extern ulonglong relay_log_space_limit;
int init_slave();
void init_slave_skip_errors(const char* arg);
-bool flush_relay_log_info(Relay_log_info* rli);
int register_slave_on_master(MYSQL* mysql);
int terminate_slave_threads(Master_info* mi, int thread_mask,
bool skip_lock = 0);
@@ -186,7 +185,6 @@ void skip_load_data_infile(NET* net);
void end_slave(); /* clean up */
void clear_until_condition(Relay_log_info* rli);
void clear_slave_error(Relay_log_info* rli);
-void end_relay_log_info(Relay_log_info* rli);
void lock_slave_threads(Master_info* mi);
void unlock_slave_threads(Master_info* mi);
void init_thread_mask(int* mask,Master_info* mi,bool inverse);
=== modified file 'sql/sql_insert.cc'
--- a/sql/sql_insert.cc 2009-01-18 23:21:43 +0000
+++ b/sql/sql_insert.cc 2009-03-02 21:53:42 +0000
@@ -708,7 +708,7 @@ bool mysql_insert(THD *thd,TABLE_LIST *t
if (thd->slave_thread &&
(info.handle_duplicates == DUP_UPDATE) &&
(table->next_number_field != NULL) &&
- rpl_master_has_bug(&active_mi->rli, 24432, TRUE, NULL, NULL))
+ rpl_master_has_bug(active_mi->rli, 24432, TRUE, NULL, NULL))
goto abort;
#endif
@@ -3062,7 +3062,7 @@ select_insert::prepare(List<Item> &value
if (thd->slave_thread &&
(info.handle_duplicates == DUP_UPDATE) &&
(table->next_number_field != NULL) &&
- rpl_master_has_bug(&active_mi->rli, 24432, TRUE, NULL, NULL))
+ rpl_master_has_bug(active_mi->rli, 24432, TRUE, NULL, NULL))
DBUG_RETURN(1);
#endif
=== modified file 'sql/sql_repl.cc'
--- a/sql/sql_repl.cc 2009-02-17 23:18:08 +0000
+++ b/sql/sql_repl.cc 2009-03-02 21:53:42 +0000
@@ -1094,7 +1094,7 @@ int start_slave(THD* thd , Master_info*
thread_mask&= thd->lex->slave_thd_opt;
if (thread_mask) //some threads are stopped, start them
{
- if (init_master_info(mi,master_info_file,relay_log_info_file, 0,
+ if (mi->init_master_info(master_info_file, 0,
thread_mask))
slave_errno=ER_MASTER_INFO;
else if (server_id_supplied && *mi->host)
@@ -1106,38 +1106,38 @@ int start_slave(THD* thd , Master_info*
*/
if (thread_mask & SLAVE_SQL)
{
- pthread_mutex_lock(&mi->rli.data_lock);
+ pthread_mutex_lock(&mi->rli->data_lock);
if (thd->lex->mi.pos)
{
- mi->rli.until_condition= Relay_log_info::UNTIL_MASTER_POS;
- mi->rli.until_log_pos= thd->lex->mi.pos;
+ mi->rli->until_condition= Relay_log_info::UNTIL_MASTER_POS;
+ mi->rli->until_log_pos= thd->lex->mi.pos;
/*
We don't check thd->lex->mi.log_file_name for NULL here
since it is checked in sql_yacc.yy
*/
- strmake(mi->rli.until_log_name, thd->lex->mi.log_file_name,
- sizeof(mi->rli.until_log_name)-1);
+ strmake(mi->rli->until_log_name, thd->lex->mi.log_file_name,
+ sizeof(mi->rli->until_log_name)-1);
}
else if (thd->lex->mi.relay_log_pos)
{
- mi->rli.until_condition= Relay_log_info::UNTIL_RELAY_POS;
- mi->rli.until_log_pos= thd->lex->mi.relay_log_pos;
- strmake(mi->rli.until_log_name, thd->lex->mi.relay_log_name,
- sizeof(mi->rli.until_log_name)-1);
+ mi->rli->until_condition= Relay_log_info::UNTIL_RELAY_POS;
+ mi->rli->until_log_pos= thd->lex->mi.relay_log_pos;
+ strmake(mi->rli->until_log_name, thd->lex->mi.relay_log_name,
+ sizeof(mi->rli->until_log_name)-1);
}
else
- mi->rli.clear_until_condition();
+ mi->rli->clear_until_condition();
- if (mi->rli.until_condition != Relay_log_info::UNTIL_NONE)
+ if (mi->rli->until_condition != Relay_log_info::UNTIL_NONE)
{
/* Preparing members for effective until condition checking */
- const char *p= fn_ext(mi->rli.until_log_name);
+ const char *p= fn_ext(mi->rli->until_log_name);
char *p_end;
if (*p)
{
//p points to '.'
- mi->rli.until_log_name_extension= strtoul(++p,&p_end, 10);
+ mi->rli->until_log_name_extension= strtoul(++p,&p_end, 10);
/*
p_end points to the first invalid character. If it equals
to p, no digits were found, error. If it contains '\0' it
@@ -1150,7 +1150,7 @@ int start_slave(THD* thd , Master_info*
slave_errno=ER_BAD_SLAVE_UNTIL_COND;
/* mark the cached result of the UNTIL comparison as "undefined" */
- mi->rli.until_log_names_cmp_result=
+ mi->rli->until_log_names_cmp_result=
Relay_log_info::UNTIL_LOG_NAMES_CMP_UNKNOWN;
/* Issuing warning then started without --skip-slave-start */
@@ -1160,7 +1160,7 @@ int start_slave(THD* thd , Master_info*
ER(ER_MISSING_SKIP_SLAVE));
}
- pthread_mutex_unlock(&mi->rli.data_lock);
+ pthread_mutex_unlock(&mi->rli->data_lock);
}
else if (thd->lex->mi.pos || thd->lex->mi.relay_log_pos)
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_NOTE, ER_UNTIL_COND_IGNORED,
@@ -1276,8 +1276,6 @@ int stop_slave(THD* thd, Master_info* mi
*/
int reset_slave(THD *thd, Master_info* mi)
{
- MY_STAT stat_area;
- char fname[FN_REFLEN];
int thread_mask= 0, error= 0;
uint sql_errno=0;
const char* errmsg=0;
@@ -1296,35 +1294,26 @@ int reset_slave(THD *thd, Master_info* m
goto err;
// delete relay logs, clear relay log coordinates
- if ((error= purge_relay_logs(&mi->rli, thd,
+ if ((error= purge_relay_logs(mi->rli, thd,
1 /* just reset */,
&errmsg)))
goto err;
/* Clear master's log coordinates */
init_master_log_pos(mi);
+
/*
Reset errors (the idea is that we forget about the
old master).
*/
mi->clear_error();
- mi->rli.clear_error();
- mi->rli.clear_until_condition();
+ mi->rli->clear_error();
+ mi->rli->clear_until_condition();
+ mi->end_master_info();
- // close master_info_file, relay_log_info_file, set mi->inited=rli->inited=0
- end_master_info(mi);
- // and delete these two files
- fn_format(fname, master_info_file, mysql_data_home, "", 4+32);
- if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
+ if (mi->reset_master_info() || mi->rli->reset_relay_log_info())
{
- error=1;
- goto err;
- }
- // delete relay_log_info_file
- fn_format(fname, relay_log_info_file, mysql_data_home, "", 4+32);
- if (my_stat(fname, &stat_area, MYF(0)) && my_delete(fname, MYF(MY_WME)))
- {
- error=1;
+ error= 1;
goto err;
}
@@ -1430,7 +1419,7 @@ bool change_master(THD* thd, Master_info
}
// TODO: see if needs re-write
- if (init_master_info(mi, master_info_file, relay_log_info_file, 0,
+ if (mi->init_master_info(master_info_file, 0,
thread_mask))
{
my_message(ER_MASTER_INFO, ER(ER_MASTER_INFO), MYF(0));
@@ -1536,16 +1525,16 @@ bool change_master(THD* thd, Master_info
if (lex_mi->relay_log_name)
{
need_relay_log_purge= 0;
- strmake(mi->rli.group_relay_log_name,lex_mi->relay_log_name,
- sizeof(mi->rli.group_relay_log_name)-1);
- strmake(mi->rli.event_relay_log_name,lex_mi->relay_log_name,
- sizeof(mi->rli.event_relay_log_name)-1);
+ strmake(mi->rli->group_relay_log_name,lex_mi->relay_log_name,
+ sizeof(mi->rli->group_relay_log_name)-1);
+ strmake(mi->rli->event_relay_log_name,lex_mi->relay_log_name,
+ sizeof(mi->rli->event_relay_log_name)-1);
}
if (lex_mi->relay_log_pos)
{
need_relay_log_purge= 0;
- mi->rli.group_relay_log_pos= mi->rli.event_relay_log_pos= lex_mi->relay_log_pos;
+ mi->rli->group_relay_log_pos= mi->rli->event_relay_log_pos= lex_mi->relay_log_pos;
}
/*
@@ -1567,22 +1556,22 @@ bool change_master(THD* thd, Master_info
need_relay_log_purge)
{
/*
- Sometimes mi->rli.master_log_pos == 0 (it happens when the SQL thread is
+ Sometimes mi->rli->master_log_pos == 0 (it happens when the SQL thread is
not initialized), so we use a max().
- What happens to mi->rli.master_log_pos during the initialization stages
+ What happens to mi->rli->master_log_pos during the initialization stages
of replication is not 100% clear, so we guard against problems using
max().
*/
mi->master_log_pos = max(BIN_LOG_HEADER_SIZE,
- mi->rli.group_master_log_pos);
- strmake(mi->master_log_name, mi->rli.group_master_log_name,
+ mi->rli->group_master_log_pos);
+ strmake(mi->master_log_name, mi->rli->group_master_log_name,
sizeof(mi->master_log_name)-1);
}
/*
Relay log's IO_CACHE may not be inited, if rli->inited==0 (server was never
a slave before).
*/
- if (flush_master_info(mi, 0))
+ if (mi->flush_master_info(0))
{
my_error(ER_RELAY_LOG_INIT, MYF(0), "Failed to flush master info file");
ret= TRUE;
@@ -1592,7 +1581,7 @@ bool change_master(THD* thd, Master_info
{
relay_log_purge= 1;
thd_proc_info(thd, "Purging old relay logs");
- if (purge_relay_logs(&mi->rli, thd,
+ if (purge_relay_logs(mi->rli, thd,
0 /* not only reset, but also reinit */,
&errmsg))
{
@@ -1606,9 +1595,9 @@ bool change_master(THD* thd, Master_info
const char* msg;
relay_log_purge= 0;
/* Relay log is already initialized */
- if (init_relay_log_pos(&mi->rli,
- mi->rli.group_relay_log_name,
- mi->rli.group_relay_log_pos,
+ if (init_relay_log_pos(mi->rli,
+ mi->rli->group_relay_log_name,
+ mi->rli->group_relay_log_pos,
0 /*no data lock*/,
&msg, 0))
{
@@ -1627,19 +1616,19 @@ bool change_master(THD* thd, Master_info
''/0: we have lost all copies of the original good coordinates.
That's why we always save good coords in rli.
*/
- mi->rli.group_master_log_pos= mi->master_log_pos;
+ mi->rli->group_master_log_pos= mi->master_log_pos;
DBUG_PRINT("info", ("master_log_pos: %lu", (ulong) mi->master_log_pos));
- strmake(mi->rli.group_master_log_name,mi->master_log_name,
- sizeof(mi->rli.group_master_log_name)-1);
+ strmake(mi->rli->group_master_log_name,mi->master_log_name,
+ sizeof(mi->rli->group_master_log_name)-1);
- if (!mi->rli.group_master_log_name[0]) // uninitialized case
- mi->rli.group_master_log_pos=0;
+ if (!mi->rli->group_master_log_name[0]) // uninitialized case
+ mi->rli->group_master_log_pos=0;
- pthread_mutex_lock(&mi->rli.data_lock);
- mi->rli.abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
+ pthread_mutex_lock(&mi->rli->data_lock);
+ mi->rli->abort_pos_wait++; /* for MASTER_POS_WAIT() to abort */
/* Clear the errors, for a clean start */
- mi->rli.clear_error();
- mi->rli.clear_until_condition();
+ mi->rli->clear_error();
+ mi->rli->clear_until_condition();
/*
If we don't write new coordinates to disk now, then old will remain in
relay-log.info until START SLAVE is issued; but if mysqld is shutdown
@@ -1647,9 +1636,9 @@ bool change_master(THD* thd, Master_info
in-memory value at restart (thus causing errors, as the old relay log does
not exist anymore).
*/
- flush_relay_log_info(&mi->rli);
+ mi->rli->flush_relay_log_info();
pthread_cond_broadcast(&mi->data_cond);
- pthread_mutex_unlock(&mi->rli.data_lock);
+ pthread_mutex_unlock(&mi->rli->data_lock);
err:
unlock_slave_threads(mi);
@@ -2121,19 +2110,21 @@ static sys_var_const sys_slave_skip_e
static sys_var_long_ptr sys_slave_trans_retries(&vars, "slave_transaction_retries",
&slave_trans_retries);
static sys_var_slave_skip_counter sys_slave_skip_counter(&vars, "sql_slave_skip_counter");
+static sys_var_bool_ptr sys_transact_replication(&vars, "transact_replication",
+ &transact_replication);
bool sys_var_slave_skip_counter::check(THD *thd, set_var *var)
{
int result= 0;
pthread_mutex_lock(&LOCK_active_mi);
- pthread_mutex_lock(&active_mi->rli.run_lock);
- if (active_mi->rli.slave_running)
+ pthread_mutex_lock(&active_mi->rli->run_lock);
+ if (active_mi->rli->slave_running)
{
my_message(ER_SLAVE_MUST_STOP, ER(ER_SLAVE_MUST_STOP), MYF(0));
result=1;
}
- pthread_mutex_unlock(&active_mi->rli.run_lock);
+ pthread_mutex_unlock(&active_mi->rli->run_lock);
pthread_mutex_unlock(&LOCK_active_mi);
var->save_result.ulong_value= (ulong) var->value->val_int();
return result;
@@ -2143,19 +2134,19 @@ bool sys_var_slave_skip_counter::check(T
bool sys_var_slave_skip_counter::update(THD *thd, set_var *var)
{
pthread_mutex_lock(&LOCK_active_mi);
- pthread_mutex_lock(&active_mi->rli.run_lock);
+ pthread_mutex_lock(&active_mi->rli->run_lock);
/*
The following test should normally never be true as we test this
in the check function; To be safe against multiple
SQL_SLAVE_SKIP_COUNTER request, we do the check anyway
*/
- if (!active_mi->rli.slave_running)
+ if (!active_mi->rli->slave_running)
{
- pthread_mutex_lock(&active_mi->rli.data_lock);
- active_mi->rli.slave_skip_counter= var->save_result.ulong_value;
- pthread_mutex_unlock(&active_mi->rli.data_lock);
+ pthread_mutex_lock(&active_mi->rli->data_lock);
+ active_mi->rli->slave_skip_counter= var->save_result.ulong_value;
+ pthread_mutex_unlock(&active_mi->rli->data_lock);
}
- pthread_mutex_unlock(&active_mi->rli.run_lock);
+ pthread_mutex_unlock(&active_mi->rli->run_lock);
pthread_mutex_unlock(&LOCK_active_mi);
return 0;
}
| Thread |
|---|
| • bzr commit into mysql-6.0-rpl branch (alfranio.correia:2818) | Alfranio Correia | 2 Mar |