#At file:///media/sda3/work/mysql/bzr/w5779/trunk/ based on revid:john.embretsen@stripped
3518 He Zhenxing 2011-01-31
WL#5779 Replication master and slave interface
Define interface for replication master and slave so that we can
extend replication without touching the server code.
Defined class Rpl_master and Rpl_slave as the interfaces for replication
master and slave handling respectively.
added:
sql/rpl.h
modified:
sql/field.cc
sql/field.h
sql/mysqld.cc
sql/mysqld.h
sql/rpl_filter.cc
sql/rpl_filter.h
sql/rpl_master.cc
sql/rpl_master.h
sql/rpl_slave.cc
sql/rpl_slave.h
sql/rpl_utility.cc
sql/sql_parse.cc
sql/sql_reload.cc
=== modified file 'sql/field.cc'
--- a/sql/field.cc 2010-12-29 00:38:59 +0000
+++ b/sql/field.cc 2011-01-31 07:48:25 +0000
@@ -27,8 +27,6 @@
#include "sql_priv.h"
#include "sql_select.h"
-#include "rpl_rli.h" // Pull in Relay_log_info
-#include "rpl_slave.h" // Pull in rpl_master_has_bug()
#include "strfunc.h" // find_type2, find_set
#include "sql_time.h" // str_to_datetime_with_warn,
// str_to_time_with_warn,
@@ -1398,7 +1396,6 @@ bool Field::send_binary(Protocol *protoc
master's field size, @c false otherwise.
*/
bool Field::compatible_field_size(uint field_metadata,
- Relay_log_info *rli_arg __attribute__((unused)),
uint16 mflags __attribute__((unused)),
int *order_var)
{
@@ -2896,7 +2893,6 @@ uint Field_new_decimal::pack_length_from
@return @c true
*/
bool Field_new_decimal::compatible_field_size(uint field_metadata,
- Relay_log_info * __attribute__((unused)),
uint16 mflags __attribute__((unused)),
int *order_var)
{
@@ -6491,37 +6487,12 @@ my_decimal *Field_string::val_decimal(my
}
-struct Check_field_param {
- Field *field;
-};
-
-#ifdef HAVE_REPLICATION
-static bool
-check_field_for_37426(const void *param_arg)
-{
- Check_field_param *param= (Check_field_param*) param_arg;
- DBUG_ASSERT(param->field->real_type() == MYSQL_TYPE_STRING);
- DBUG_PRINT("debug", ("Field %s - type: %d, size: %d",
- param->field->field_name,
- param->field->real_type(),
- param->field->row_pack_length()));
- return param->field->row_pack_length() > 255;
-}
-#endif
-
bool
Field_string::compatible_field_size(uint field_metadata,
- Relay_log_info *rli_arg,
uint16 mflags __attribute__((unused)),
int *order_var)
{
-#ifdef HAVE_REPLICATION
- const Check_field_param check_param = { this };
- if (rpl_master_has_bug(rli_arg, 37426, TRUE,
- check_field_for_37426, &check_param))
- return FALSE; // Not compatible field sizes
-#endif
- return Field::compatible_field_size(field_metadata, rli_arg, mflags, order_var);
+ return Field::compatible_field_size(field_metadata, mflags, order_var);
}
@@ -8891,7 +8862,6 @@ uint Field_bit::pack_length_from_metadat
*/
bool
Field_bit::compatible_field_size(uint field_metadata,
- Relay_log_info * __attribute__((unused)),
uint16 mflags,
int *order_var)
{
=== modified file 'sql/field.h'
--- a/sql/field.h 2010-11-19 19:27:31 +0000
+++ b/sql/field.h 2011-01-31 07:48:25 +0000
@@ -36,7 +36,6 @@
class Send_field;
class Protocol;
class Create_field;
-class Relay_log_info;
class Field;
enum enum_check_fields
@@ -202,7 +201,7 @@ public:
table, which is located on disk).
*/
virtual uint32 pack_length_in_rec() const { return pack_length(); }
- virtual bool compatible_field_size(uint metadata, Relay_log_info *rli,
+ virtual bool compatible_field_size(uint metadata,
uint16 mflags, int *order);
virtual uint pack_length_from_metadata(uint field_metadata)
{
@@ -926,7 +925,7 @@ public:
uint32 pack_length() const { return (uint32) bin_size; }
uint pack_length_from_metadata(uint field_metadata);
uint row_pack_length() { return pack_length(); }
- bool compatible_field_size(uint field_metadata, Relay_log_info *rli,
+ bool compatible_field_size(uint field_metadata,
uint16 mflags, int *order_var);
uint is_equal(Create_field *new_field);
virtual const uchar *unpack(uchar* to, const uchar *from,
@@ -1621,7 +1620,7 @@ public:
return row_pack_length();
return (((field_metadata >> 4) & 0x300) ^ 0x300) + (field_metadata & 0x00ff);
}
- bool compatible_field_size(uint field_metadata, Relay_log_info *rli,
+ bool compatible_field_size(uint field_metadata,
uint16 mflags, int *order_var);
uint row_pack_length() { return field_length; }
int pack_cmp(const uchar *a,const uchar *b,uint key_length,
@@ -2087,7 +2086,7 @@ public:
uint pack_length_from_metadata(uint field_metadata);
uint row_pack_length()
{ return (bytes_in_rec + ((bit_len > 0) ? 1 : 0)); }
- bool compatible_field_size(uint metadata, Relay_log_info *rli,
+ bool compatible_field_size(uint metadata,
uint16 mflags, int *order_var);
void sql_type(String &str) const;
virtual uchar *pack(uchar *to, const uchar *from,
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2011-01-14 13:42:41 +0000
+++ b/sql/mysqld.cc 2011-01-31 07:48:25 +0000
@@ -53,7 +53,6 @@
#include <m_ctype.h>
#include <my_dir.h>
#include <my_bit.h>
-#include "rpl_slave.h"
#include "rpl_master.h"
#include "rpl_mi.h"
#include "rpl_filter.h"
@@ -1111,7 +1110,7 @@ static void close_connections(void)
mysql_mutex_unlock(&LOCK_thread_count); // For unlink from list
Events::deinit();
- end_slave();
+ rpl_slave.end();
if (thread_count)
sleep(2); // Give threads time to die
@@ -1473,7 +1472,7 @@ void clean_up(bool print_message)
bitmap_free(&temp_pool);
free_max_user_conn();
#ifdef HAVE_REPLICATION
- end_slave_list();
+ rpl_master.end();
#endif
delete binlog_filter;
delete rpl_filter;
@@ -3535,18 +3534,11 @@ You should consider changing lower_case_
&my_charset_utf8_tolower_ci :
&my_charset_bin);
- /*
- Build do_table and ignore_table rules to hush
- after the resetting of table_alias_charset
- */
- if (rpl_filter->build_do_table_hash() ||
- rpl_filter->build_ignore_table_hash())
- {
- sql_print_error("An error occurred while building do_table"
- "and ignore_table rules to hush.");
+#ifdef HAVE_REPLICATION
+ if (rpl_filter->table_alias_charset_changed())
return 1;
- }
-
+#endif
+
return 0;
}
@@ -3914,7 +3906,8 @@ static int init_server_components()
setup_fpu();
init_thr_lock();
#ifdef HAVE_REPLICATION
- init_slave_list();
+ if (rpl_master.init())
+ unireg_abort(1);
#endif
/* Setup logs */
@@ -4752,14 +4745,9 @@ int mysqld_main(int argc, char **argv)
binlog_unsafe_map_init();
/*
init_slave() must be called after the thread keys are created.
- Some parts of the code (e.g. SHOW STATUS LIKE 'slave_running' and other
- places) assume that active_mi != 0, so let's fail if it's 0 (out of
- memory); a message has already been printed.
*/
- if (init_slave() && !active_mi)
- {
+ if (rpl_slave.init())
unireg_abort(1);
- }
#ifdef WITH_PERFSCHEMA_STORAGE_ENGINE
initialize_performance_schema_acl(opt_bootstrap);
=== modified file 'sql/mysqld.h'
--- a/sql/mysqld.h 2011-01-14 13:42:41 +0000
+++ b/sql/mysqld.h 2011-01-31 07:48:25 +0000
@@ -150,6 +150,7 @@ extern char pidfile_name[FN_REFLEN], sys
extern char default_logfile_name[FN_REFLEN];
extern char log_error_file[FN_REFLEN], *opt_tc_log_file;
/*Move UUID_LENGTH from item_strfunc.h*/
+#define HAVE_SERVER_UUID 1
#define UUID_LENGTH (8+1+4+1+4+1+4+1+12)
extern char server_uuid[UUID_LENGTH+1];
extern const char *server_uuid_ptr;
=== added file 'sql/rpl.h'
--- a/sql/rpl.h 1970-01-01 00:00:00 +0000
+++ b/sql/rpl.h 2011-01-31 07:48:25 +0000
@@ -0,0 +1,49 @@
+#ifndef RPL_H_INCLUDED
+/* Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software Foundation,
+ 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
+
+#define RPL_H_INCLUDED
+
+#ifdef HAVE_REPLICATION
+
+class Rpl_master {
+public:
+ int init();
+ void end();
+ int register_slave(THD* thd, char* packet, uint packet_length);
+ int show_slave_hosts(THD* thd);
+ int binlog_dump(THD* thd, char* packet, uint packet_length);
+ int show_master_status(THD* thd);
+ int reset_master(THD* thd);
+};
+
+class Rpl_slave {
+public:
+ int init();
+ void end();
+ int change_master(THD* thd);
+ int start_slave(THD* thd);
+ int stop_slave(THD* thd);
+ int reset_slave(THD* thd);
+ int show_slave_status(THD* thd);
+ int flush_relay_log(THD* thd);
+ int show_relay_log_events(THD* thd);
+};
+
+extern Rpl_master rpl_master;
+extern Rpl_slave rpl_slave;
+
+#endif /*HAVE_REPLICATION */
+#endif /* RPL_H_INCLUDED */
=== modified file 'sql/rpl_filter.cc'
--- a/sql/rpl_filter.cc 2010-11-05 22:14:29 +0000
+++ b/sql/rpl_filter.cc 2011-01-31 07:48:25 +0000
@@ -18,6 +18,7 @@
#include "rpl_filter.h"
#include "hash.h" // my_hash_free
#include "table.h" // TABLE_LIST
+#include "log.h" // sql_print_error
#define TABLE_RULE_HASH_SIZE 16
#define TABLE_RULE_ARR_SIZE 16
@@ -687,3 +688,19 @@ Rpl_filter::get_ignore_db()
{
return &ignore_db;
}
+
+int Rpl_filter::table_alias_charset_changed()
+{
+ /*
+ Build do_table and ignore_table rules to hush
+ after the resetting of table_alias_charset
+ */
+ if (rpl_filter->build_do_table_hash() ||
+ rpl_filter->build_ignore_table_hash())
+ {
+ sql_print_error("An error occurred while building do_table"
+ "and ignore_table rules to hush.");
+ return 1;
+ }
+ return 0;
+}
=== modified file 'sql/rpl_filter.h'
--- a/sql/rpl_filter.h 2010-10-13 23:16:09 +0000
+++ b/sql/rpl_filter.h 2011-01-31 07:48:25 +0000
@@ -58,7 +58,11 @@ public:
int build_do_table_hash();
int build_ignore_table_hash();
+ int add_do_table(const char* table_spec)
+ { return add_do_table_array(table_spec); }
int add_do_table_array(const char* table_spec);
+ int add_ignore_table(const char* table_spec)
+ { return add_ignore_table_array(table_spec); }
int add_ignore_table_array(const char* table_spec);
int add_wild_do_table(const char* table_spec);
@@ -82,6 +86,8 @@ public:
I_List<i_string>* get_do_db();
I_List<i_string>* get_ignore_db();
+ int table_alias_charset_changed();
+
private:
bool table_rules_on;
=== modified file 'sql/rpl_master.cc'
--- a/sql/rpl_master.cc 2010-12-17 10:07:30 +0000
+++ b/sql/rpl_master.cc 2011-01-31 07:48:25 +0000
@@ -16,16 +16,40 @@
#include "sql_priv.h"
#include "unireg.h"
-#include "sql_parse.h" // check_access
#ifdef HAVE_REPLICATION
-#include "sql_acl.h" // SUPER_ACL
#include "log_event.h"
+#include "binlog.h"
#include "rpl_filter.h"
#include <my_dir.h>
#include "rpl_handler.h"
#include "rpl_master.h"
+typedef struct st_slave_info
+{
+ uint32 server_id;
+ uint32 rpl_recovery_rank, master_id;
+ char host[HOSTNAME_LENGTH+1];
+ char user[USERNAME_LENGTH+1];
+ char password[MAX_PASSWORD_LENGTH+1];
+ uint16 port;
+ THD* thd;
+} SLAVE_INFO;
+
+static void init_slave_list();
+static void end_slave_list();
+static void unregister_slave(THD* thd, bool only_mine, bool need_mutex);
+
+#if HAVE_SERVER_UUID
+static String *get_slave_uuid(THD *thd, String *value);
+static void kill_zombie_dump_threads(String *slave_uuid);
+#else
+static void kill_zombie_dump_threads(uint32 slave_server_id);
+#endif
+
+static void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
+
+
int max_binlog_dump_events = 0; // unlimited
my_bool opt_sporadic_binlog_dump_fail = 0;
@@ -85,7 +109,7 @@ static void init_all_slave_list_mutexes(
}
#endif /* HAVE_PSI_INTERFACE */
-void init_slave_list()
+static void init_slave_list()
{
#ifdef HAVE_PSI_INTERFACE
init_all_slave_list_mutexes();
@@ -97,7 +121,7 @@ void init_slave_list()
mysql_mutex_init(key_LOCK_slave_list, &LOCK_slave_list, MY_MUTEX_INIT_FAST);
}
-void end_slave_list()
+static void end_slave_list()
{
/* No protection by a mutex needed as we are only called at shutdown */
if (my_hash_inited(&slave_list))
@@ -116,15 +140,13 @@ void end_slave_list()
1 Error. Error message sent to client
*/
-int register_slave(THD* thd, uchar* packet, uint packet_length)
+int Rpl_master::register_slave(THD* thd, char* packet, uint packet_length)
{
int res;
SLAVE_INFO *si;
- uchar *p= packet, *p_end= packet + packet_length;
+ char *p= packet, *p_end= packet + packet_length;
const char *errmsg= "Wrong parameters to function register_slave";
- if (check_access(thd, REPL_SLAVE_ACL, any_db, NULL, NULL, 0, 0))
- return 1;
if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
goto err2;
@@ -161,7 +183,7 @@ err2:
return 1;
}
-void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
+static void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
{
if (thd->server_id)
{
@@ -189,7 +211,7 @@ void unregister_slave(THD* thd, bool onl
@retval FALSE success
@retval TRUE failure
*/
-bool show_slave_hosts(THD* thd)
+int Rpl_master::show_slave_hosts(THD* thd)
{
List<Item> field_list;
Protocol *protocol= thd->protocol;
@@ -206,8 +228,9 @@ bool show_slave_hosts(THD* thd)
field_list.push_back(new Item_return_int("Port", 7, MYSQL_TYPE_LONG));
field_list.push_back(new Item_return_int("Master_id", 10,
MYSQL_TYPE_LONG));
+#if HAVE_SERVER_UUID
field_list.push_back(new Item_empty_string("Slave_UUID", UUID_LENGTH));
-
+#endif
if (protocol->send_result_set_metadata(&field_list,
Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
DBUG_RETURN(TRUE);
@@ -228,10 +251,12 @@ bool show_slave_hosts(THD* thd)
protocol->store((uint32) si->port);
protocol->store((uint32) si->master_id);
+#if HAVE_SERVER_UUID
/* get slave's UUID */
String slave_uuid;
if (get_slave_uuid(si->thd, &slave_uuid))
protocol->store(slave_uuid.c_ptr_safe(), &my_charset_bin);
+#endif
if (protocol->write())
{
mysql_mutex_unlock(&LOCK_slave_list);
@@ -614,11 +639,12 @@ static int send_heartbeat_event(NET* net
DBUG_RETURN(0);
}
+
/*
TODO: Clean up loop to only have one call to send_file()
*/
-void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
+static void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
ushort flags)
{
LOG_INFO linfo;
@@ -1303,7 +1329,7 @@ String *get_slave_uuid(THD *thd, String
*/
-
+#if HAVE_SERVER_UUID
void kill_zombie_dump_threads(String *slave_uuid)
{
if (slave_uuid->length() == 0)
@@ -1340,6 +1366,36 @@ void kill_zombie_dump_threads(String *sl
}
}
+#else /* HAVE_SERVER_UUID */
+
+void kill_zombie_dump_threads(uint32 slave_server_id)
+{
+ mysql_mutex_lock(&LOCK_thread_count);
+ I_List_iterator<THD> it(threads);
+ THD *tmp;
+
+ while ((tmp=it++))
+ {
+ if (tmp->command == COM_BINLOG_DUMP &&
+ tmp->server_id == slave_server_id)
+ {
+ mysql_mutex_lock(&tmp->LOCK_thd_data); // Lock from delete
+ break;
+ }
+ }
+ mysql_mutex_unlock(&LOCK_thread_count);
+ if (tmp)
+ {
+ /*
+ Here we do not call kill_one_thread() as
+ it will be slow because it will iterate through the list
+ again. We just to do kill the thread ourselves.
+ */
+ tmp->awake(THD::KILL_QUERY);
+ mysql_mutex_unlock(&tmp->LOCK_thd_data);
+ }
+}
+#endif /* HAVE_SERVER_UUID */
/**
Execute a RESET MASTER statement.
@@ -1350,7 +1406,7 @@ void kill_zombie_dump_threads(String *sl
@retval 0 success
@retval 1 error
*/
-int reset_master(THD* thd)
+int Rpl_master::reset_master(THD* thd)
{
if (!mysql_bin_log.is_open())
{
@@ -1365,23 +1421,6 @@ int reset_master(THD* thd)
return 0;
}
-int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
- const char* log_file_name2, ulonglong log_pos2)
-{
- int res;
- size_t log_file_name1_len= strlen(log_file_name1);
- size_t log_file_name2_len= strlen(log_file_name2);
-
- // We assume that both log names match up to '.'
- if (log_file_name1_len == log_file_name2_len)
- {
- if ((res= strcmp(log_file_name1, log_file_name2)))
- return res;
- return (log_pos1 < log_pos2) ? -1 : (log_pos1 == log_pos2) ? 0 : 1;
- }
- return ((log_file_name1_len < log_file_name2_len) ? -1 : 1);
-}
-
/**
Execute a SHOW MASTER STATUS statement.
@@ -1508,4 +1547,55 @@ err:
DBUG_RETURN(TRUE);
}
+int Rpl_master::init()
+{
+ init_slave_list();
+ return 0;
+}
+
+void Rpl_master::end()
+{
+ end_slave_list();
+}
+
+int Rpl_master::binlog_dump(THD* thd, char* packet, uint packet_length)
+{
+ ulong pos;
+ ushort flags;
+#if HAVE_SERVER_UUID
+ String slave_uuid;
+#else
+ uint32 slave_server_id;
+#endif
+
+ /* TODO: The following has to be changed to an 8 byte integer */
+ pos = uint4korr(packet);
+ flags = uint2korr(packet + 4);
+
+#if HAVE_SERVER_UUID
+ thd->server_id= uint4korr(packet+6);
+ get_slave_uuid(thd, &slave_uuid);
+ kill_zombie_dump_threads(&slave_uuid);
+#else
+ thd->server_id=0; /* avoid suicide */
+ if ((slave_server_id= uint4korr(packet+6))) // mysqlbinlog.server_id==0
+ kill_zombie_dump_threads(slave_server_id);
+ thd->server_id = slave_server_id;
+#endif
+
+ general_log_print(thd, COM_BINLOG_DUMP, "Log: '%s' Pos: %ld", packet+10,
+ (long) pos);
+ mysql_binlog_send(thd, thd->strdup(packet + 10), (my_off_t) pos, flags);
+ unregister_slave(thd,1,1);
+ /* fake COM_QUIT -- if we get here, the thread needs to terminate */
+ return 1;
+}
+
+int Rpl_master::show_master_status(THD* thd)
+{
+ return show_binlog_info(thd);
+}
+
+Rpl_master rpl_master;
+
#endif /* HAVE_REPLICATION */
=== modified file 'sql/rpl_master.h'
--- a/sql/rpl_master.h 2010-07-19 16:09:51 +0000
+++ b/sql/rpl_master.h 2011-01-31 07:48:25 +0000
@@ -16,32 +16,15 @@
#define RPL_MASTER_H_INCLUDED
+
+#include "rpl.h"
+
extern bool server_id_supplied;
extern int max_binlog_dump_events;
extern my_bool opt_sporadic_binlog_dump_fail;
extern my_bool opt_show_slave_auth_info;
-typedef struct st_slave_info
-{
- uint32 server_id;
- uint32 rpl_recovery_rank, master_id;
- char host[HOSTNAME_LENGTH+1];
- char user[USERNAME_LENGTH+1];
- char password[MAX_PASSWORD_LENGTH+1];
- uint16 port;
- THD* thd;
-} SLAVE_INFO;
-
-void init_slave_list();
-void end_slave_list();
-int register_slave(THD* thd, uchar* packet, uint packet_length);
-void unregister_slave(THD* thd, bool only_mine, bool need_mutex);
-bool show_slave_hosts(THD* thd);
-
-String *get_slave_uuid(THD *thd, String *value);
bool mysql_show_binlog_events(THD* thd);
bool show_binlogs(THD* thd);
-void kill_zombie_dump_threads(String *slave_uuid);
-void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
-int reset_master(THD* thd);
+
#endif /* RPL_MASTER_H_INCLUDED */
=== modified file 'sql/rpl_slave.cc'
--- a/sql/rpl_slave.cc 2010-12-10 16:55:50 +0000
+++ b/sql/rpl_slave.cc 2011-01-31 07:48:25 +0000
@@ -158,7 +158,9 @@ static int connect_to_master(THD* thd, M
static int safe_sleep(THD* thd, int sec, CHECK_KILLED_FUNC thread_killed,
void* thread_killed_arg);
static int get_master_version_and_clock(MYSQL* mysql, Master_info* mi);
+#if HAVE_SERVER_UUID
static int get_master_uuid(MYSQL *mysql, Master_info *mi);
+#endif
int io_thread_init_commands(MYSQL *mysql, Master_info *mi);
static Log_event* next_event(Relay_log_info* rli);
static int queue_event(Master_info* mi,const char* buf,ulong event_len);
@@ -1154,6 +1156,7 @@ bool is_network_error(uint errorno)
return FALSE;
}
+#if HAVE_SERVER_UUID
/**
Set user variables after connecting to the master.
@@ -1279,6 +1282,7 @@ static int get_master_uuid(MYSQL *mysql,
mysql_free_result(master_res);
return ret;
}
+#endif /* HAVE_SERVER_UUID */
/*
Note that we rely on the master's version (3.23, 4.0.14 etc) instead of
@@ -2057,7 +2061,9 @@ bool show_master_info(THD* thd, Master_i
FN_REFLEN));
field_list.push_back(new Item_return_int("Master_Server_Id", sizeof(ulong),
MYSQL_TYPE_LONG));
+#if HAVE_SERVER_UUID
field_list.push_back(new Item_empty_string("Master_UUID", UUID_LENGTH));
+#endif
field_list.push_back(new Item_empty_string("Master_Info_File",
2 * FN_REFLEN));
field_list.push_back(new Item_return_int("SQL_Delay", 10, MYSQL_TYPE_LONG));
@@ -2235,7 +2241,9 @@ bool show_master_info(THD* thd, Master_i
}
// Master_Server_id
protocol->store((uint32) mi->master_id);
+#if HAVE_SERVER_UUID
protocol->store(mi->master_uuid, &my_charset_bin);
+#endif
// Master_Info_File
protocol->store(mi->get_description_info(), &my_charset_bin);
// SQL_Delay
@@ -3268,10 +3276,12 @@ connected:
thd->slave_net = &mysql->net;
thd_proc_info(thd, "Checking master version");
ret= get_master_version_and_clock(mysql, mi);
+#if HAVE_SERVER_UUID
if (!ret)
ret= get_master_uuid(mysql, mi);
if (!ret)
io_thread_init_commands(mysql, mi);
+#endif
if (ret == 1)
/* Fatal error */
@@ -5580,8 +5590,6 @@ int start_slave(THD* thd , Master_info*
int thread_mask;
DBUG_ENTER("start_slave");
- if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
- DBUG_RETURN(1);
lock_slave_threads(mi); // this allows us to cleanly read slave_running
// Get a mask of _stopped_ threads
init_thread_mask(&thread_mask,mi,1 /* inverse */);
@@ -5718,8 +5726,6 @@ int stop_slave(THD* thd, Master_info* mi
if (!thd)
thd = current_thd;
- if (check_access(thd, SUPER_ACL, any_db, NULL, NULL, 0, 0))
- DBUG_RETURN(1);
thd_proc_info(thd, "Killing slave");
int thread_mask;
lock_slave_threads(mi);
@@ -5898,7 +5904,9 @@ bool change_master(THD* thd, Master_info
if ((lex_mi->host && strcmp(lex_mi->host, mi->host)) ||
(lex_mi->port && lex_mi->port != mi->port))
{
+#if HAVE_SERVER_UUID
mi->master_uuid[0]= 0;
+#endif
mi->master_id= 0;
}
@@ -6130,6 +6138,135 @@ err:
DBUG_RETURN(ret);
}
+int Rpl_slave::change_master(THD* thd)
+{
+ int res= 0;
+ mysql_mutex_lock(&LOCK_active_mi);
+ res = ::change_master(thd,active_mi);
+ mysql_mutex_unlock(&LOCK_active_mi);
+ return res;
+}
+
+int Rpl_slave::start_slave(THD*thd)
+{
+ mysql_mutex_lock(&LOCK_active_mi);
+ ::start_slave(thd,active_mi,1 /* net report*/);
+ mysql_mutex_unlock(&LOCK_active_mi);
+ return 0;
+}
+
+int Rpl_slave::stop_slave(THD* thd)
+{
+ /*
+ If the client thread has locked tables, a deadlock is possible.
+ Assume that
+ - the client thread does LOCK TABLE t READ.
+ - then the master updates t.
+ - then the SQL slave thread wants to update t,
+ so it waits for the client thread because t is locked by it.
+ - then the client thread does SLAVE STOP.
+ SLAVE STOP waits for the SQL slave thread to terminate its
+ update t, which waits for the client thread because t is locked by it.
+ To prevent that, refuse SLAVE STOP if the
+ client thread has locked tables
+ */
+ if (thd->locked_tables_mode ||
+ thd->in_active_multi_stmt_transaction() || thd->global_read_lock.is_acquired())
+ {
+ my_message(ER_LOCK_OR_ACTIVE_TRANSACTION,
+ ER(ER_LOCK_OR_ACTIVE_TRANSACTION), MYF(0));
+ return 1;
+ }
+ mysql_mutex_lock(&LOCK_active_mi);
+ int res= ::stop_slave(thd,active_mi,1/* net report*/);
+ mysql_mutex_unlock(&LOCK_active_mi);
+ return res;
+}
+
+int Rpl_slave::show_slave_status(THD* thd)
+{
+ int res= 0;
+ mysql_mutex_lock(&LOCK_active_mi);
+ if (active_mi != NULL)
+ {
+ res = show_master_info(thd, active_mi);
+ }
+ else
+ {
+ push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ WARN_NO_MASTER_INFO, ER(WARN_NO_MASTER_INFO));
+ my_ok(thd);
+ }
+ mysql_mutex_unlock(&LOCK_active_mi);
+ return res;
+}
+
+int Rpl_slave::flush_relay_log(THD* thd)
+{
+ int res= 0;
+ mysql_mutex_lock(&LOCK_active_mi);
+ rotate_relay_log(active_mi);
+ mysql_mutex_unlock(&LOCK_active_mi);
+ return res;
+}
+
+int Rpl_slave::reset_slave(THD* thd)
+{
+ int res= 0;
+ mysql_mutex_lock(&LOCK_active_mi);
+ res= ::reset_slave(thd, active_mi);
+ mysql_mutex_unlock(&LOCK_active_mi);
+ return res;
+}
+
+/**
+ Execute a SHOW RELAYLOG EVENTS statement.
+
+ @param thd Pointer to THD object for the client thread executing the
+ statement.
+
+ @retval FALSE success
+ @retval TRUE failure
+*/
+int Rpl_slave::show_relay_log_events(THD* thd)
+{
+ Protocol *protocol= thd->protocol;
+ List<Item> field_list;
+ DBUG_ENTER("mysql_show_relaylog_events");
+
+ DBUG_ASSERT(thd->lex->sql_command == SQLCOM_SHOW_RELAYLOG_EVENTS);
+
+ Log_event::init_show_field_list(&field_list);
+ if (protocol->send_result_set_metadata(&field_list,
+ Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
+ DBUG_RETURN(TRUE);
+
+ if (!active_mi)
+ DBUG_RETURN(TRUE);
+
+ DBUG_RETURN(show_binlog_events(thd, &active_mi->rli->relay_log));
+}
+
+int Rpl_slave::init()
+{
+ /*
+ init_slave() must be called after the thread keys are created.
+ Some parts of the code (e.g. SHOW STATUS LIKE 'slave_running' and other
+ places) assume that active_mi != 0, so let's fail if it's 0 (out of
+ memory); a message has already been printed.
+ */
+ if (init_slave() && !active_mi)
+ return 1;
+ return 0;
+}
+
+void Rpl_slave::end()
+{
+ end_slave();
+}
+
+Rpl_slave rpl_slave;
+
/**
@} (end of group Replication)
*/
=== modified file 'sql/rpl_slave.h'
--- a/sql/rpl_slave.h 2010-12-10 16:55:50 +0000
+++ b/sql/rpl_slave.h 2011-01-31 07:48:25 +0000
@@ -48,6 +48,7 @@
#include "my_list.h"
#include "rpl_filter.h"
#include "rpl_tblmap.h"
+#include "rpl.h"
#define SLAVE_NET_TIMEOUT 3600
@@ -125,7 +126,6 @@ extern bool use_slave_mask;
extern char *slave_load_tmpdir;
extern char *master_info_file, *relay_log_info_file;
extern char *opt_relay_logname, *opt_relaylog_index_name;
-extern char *opt_binlog_index_name;
extern my_bool opt_skip_slave_start, opt_reckless_slave;
extern my_bool opt_log_slave_updates;
extern char *opt_slave_skip_errors;
@@ -158,8 +158,6 @@ extern const char *relay_log_basename;
int start_slave(THD* thd, Master_info* mi, bool net_report);
int stop_slave(THD* thd, Master_info* mi, bool net_report);
bool change_master(THD* thd, Master_info* mi);
-int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
- const char* log_file_name2, ulonglong log_pos2);
int reset_slave(THD *thd, Master_info* mi);
int init_slave();
int init_recovery(Master_info* mi, const char** errmsg);
=== modified file 'sql/rpl_utility.cc'
--- a/sql/rpl_utility.cc 2010-11-16 12:14:06 +0000
+++ b/sql/rpl_utility.cc 2011-01-31 07:48:25 +0000
@@ -18,6 +18,7 @@
#ifndef MYSQL_CLIENT
#include "unireg.h" // REQUIRED by later includes
#include "rpl_rli.h"
+#include "rpl_slave.h"
#include "sql_select.h"
/**
@@ -543,6 +544,24 @@ bool is_conversion_ok(int order, Relay_l
DBUG_RETURN(true);
}
+struct Check_field_param {
+ Field *field;
+};
+
+#ifdef HAVE_REPLICATION
+static bool
+check_field_for_37426(const void *param_arg)
+{
+ Check_field_param *param= (Check_field_param*) param_arg;
+ DBUG_ASSERT(param->field->real_type() == MYSQL_TYPE_STRING);
+ DBUG_PRINT("debug", ("Field %s - type: %d, size: %d",
+ param->field->field_name,
+ param->field->real_type(),
+ param->field->row_pack_length()));
+ return param->field->row_pack_length() > 255;
+}
+#endif
+
/**
Can a type potentially be converted to another type?
@@ -608,8 +627,18 @@ can_convert_field_to(Field *field,
DBUG_RETURN(true);
}
+#ifdef HAVE_REPLICATION
+ if(field->real_type() == MYSQL_TYPE_STRING)
+ {
+ const Check_field_param check_param = { field };
+ if (rpl_master_has_bug(rli, 37426, TRUE,
+ check_field_for_37426, &check_param))
+ DBUG_RETURN(false); // Not compatible field sizes
+ }
+#endif
+
DBUG_PRINT("debug", ("Base types are identical, doing field size comparison"));
- if (field->compatible_field_size(metadata, rli, mflags, order_var))
+ if (field->compatible_field_size(metadata, mflags, order_var))
DBUG_RETURN(is_conversion_ok(*order_var, rli));
else
DBUG_RETURN(false);
=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc 2011-01-15 05:56:24 +0000
+++ b/sql/sql_parse.cc 2011-01-31 07:48:25 +0000
@@ -973,7 +973,9 @@ bool dispatch_command(enum enum_server_c
#ifdef HAVE_REPLICATION
case COM_REGISTER_SLAVE:
{
- if (!register_slave(thd, (uchar*)packet, packet_length))
+ if (check_global_access(thd, REPL_SLAVE_ACL))
+ break;
+ if (!rpl_master.register_slave(thd, packet, packet_length))
my_ok(thd);
break;
}
@@ -1233,29 +1235,11 @@ bool dispatch_command(enum enum_server_c
#ifndef EMBEDDED_LIBRARY
case COM_BINLOG_DUMP:
{
- ulong pos;
- ushort flags;
- String slave_uuid;
-
status_var_increment(thd->status_var.com_other);
thd->enable_slow_log= opt_log_slow_admin_statements;
if (check_global_access(thd, REPL_SLAVE_ACL))
break;
-
- /* TODO: The following has to be changed to an 8 byte integer */
- pos = uint4korr(packet);
- flags = uint2korr(packet + 4);
- thd->server_id= uint4korr(packet+6);
-
- get_slave_uuid(thd, &slave_uuid);
- kill_zombie_dump_threads(&slave_uuid);
-
- general_log_print(thd, command, "Log: '%s' Pos: %ld", packet+10,
- (long) pos);
- mysql_binlog_send(thd, thd->strdup(packet + 10), (my_off_t) pos, flags);
- unregister_slave(thd,1,1);
- /* fake COM_QUIT -- if we get here, the thread needs to terminate */
- error = TRUE;
+ error = rpl_master.binlog_dump(thd, packet, packet_length);
break;
}
#endif
@@ -2202,14 +2186,14 @@ case SQLCOM_PREPARE:
{
if (check_global_access(thd, REPL_SLAVE_ACL))
goto error;
- res = show_slave_hosts(thd);
+ res = rpl_master.show_slave_hosts(thd);
break;
}
case SQLCOM_SHOW_RELAYLOG_EVENTS:
{
if (check_global_access(thd, REPL_SLAVE_ACL))
goto error;
- res = mysql_show_relaylog_events(thd);
+ res = rpl_slave.show_relay_log_events(thd);
break;
}
case SQLCOM_SHOW_BINLOG_EVENTS:
@@ -2248,9 +2232,7 @@ case SQLCOM_PREPARE:
{
if (check_global_access(thd, SUPER_ACL))
goto error;
- mysql_mutex_lock(&LOCK_active_mi);
- res = change_master(thd,active_mi);
- mysql_mutex_unlock(&LOCK_active_mi);
+ res = rpl_slave.change_master(thd);
break;
}
case SQLCOM_SHOW_SLAVE_STAT:
@@ -2258,18 +2240,7 @@ case SQLCOM_PREPARE:
/* Accept one of two privileges */
if (check_global_access(thd, SUPER_ACL | REPL_CLIENT_ACL))
goto error;
- mysql_mutex_lock(&LOCK_active_mi);
- if (active_mi != NULL)
- {
- res = show_master_info(thd, active_mi);
- }
- else
- {
- push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
- WARN_NO_MASTER_INFO, ER(WARN_NO_MASTER_INFO));
- my_ok(thd);
- }
- mysql_mutex_unlock(&LOCK_active_mi);
+ res = rpl_slave.show_slave_status(thd);
break;
}
case SQLCOM_SHOW_MASTER_STAT:
@@ -2277,7 +2248,7 @@ case SQLCOM_PREPARE:
/* Accept one of two privileges */
if (check_global_access(thd, SUPER_ACL | REPL_CLIENT_ACL))
goto error;
- res = show_binlog_info(thd);
+ res = rpl_master.show_master_status(thd);
break;
}
@@ -2547,36 +2518,18 @@ end_with_restore_list:
#ifdef HAVE_REPLICATION
case SQLCOM_SLAVE_START:
{
- mysql_mutex_lock(&LOCK_active_mi);
- start_slave(thd,active_mi,1 /* net report*/);
- mysql_mutex_unlock(&LOCK_active_mi);
+ if (check_global_access(thd, SUPER_ACL))
+ goto error;
+ if (rpl_slave.start_slave(thd))
+ goto error;
break;
}
case SQLCOM_SLAVE_STOP:
- /*
- If the client thread has locked tables, a deadlock is possible.
- Assume that
- - the client thread does LOCK TABLE t READ.
- - then the master updates t.
- - then the SQL slave thread wants to update t,
- so it waits for the client thread because t is locked by it.
- - then the client thread does SLAVE STOP.
- SLAVE STOP waits for the SQL slave thread to terminate its
- update t, which waits for the client thread because t is locked by it.
- To prevent that, refuse SLAVE STOP if the
- client thread has locked tables
- */
- if (thd->locked_tables_mode ||
- thd->in_active_multi_stmt_transaction() || thd->global_read_lock.is_acquired())
- {
- my_message(ER_LOCK_OR_ACTIVE_TRANSACTION,
- ER(ER_LOCK_OR_ACTIVE_TRANSACTION), MYF(0));
- goto error;
- }
{
- mysql_mutex_lock(&LOCK_active_mi);
- stop_slave(thd,active_mi,1/* net report*/);
- mysql_mutex_unlock(&LOCK_active_mi);
+ if (check_global_access(thd, SUPER_ACL))
+ goto error;
+ if (rpl_slave.stop_slave(thd))
+ goto error;
break;
}
#endif /* HAVE_REPLICATION */
=== modified file 'sql/sql_reload.cc'
--- a/sql/sql_reload.cc 2010-12-10 16:55:50 +0000
+++ b/sql/sql_reload.cc 2011-01-31 07:48:25 +0000
@@ -149,10 +149,8 @@ bool reload_acl_and_cache(THD *thd, unsi
if (options & REFRESH_RELAY_LOG)
{
#ifdef HAVE_REPLICATION
- mysql_mutex_lock(&LOCK_active_mi);
- if (rotate_relay_log(active_mi))
+ if (rpl_slave.flush_relay_log(thd))
*write_to_binlog= -1;
- mysql_mutex_unlock(&LOCK_active_mi);
#endif
}
#ifdef HAVE_QUERY_CACHE
@@ -258,7 +256,7 @@ bool reload_acl_and_cache(THD *thd, unsi
{
DBUG_ASSERT(thd);
tmp_write_to_binlog= 0;
- if (reset_master(thd))
+ if (rpl_master.reset_master(thd))
{
result=1;
}
@@ -275,10 +273,8 @@ bool reload_acl_and_cache(THD *thd, unsi
if (options & REFRESH_SLAVE)
{
tmp_write_to_binlog= 0;
- mysql_mutex_lock(&LOCK_active_mi);
- if (reset_slave(thd, active_mi))
+ if (rpl_slave.reset_slave(thd))
result=1;
- mysql_mutex_unlock(&LOCK_active_mi);
}
#endif
if (options & REFRESH_USER_RESOURCES)
Attachment: [text/bzr-bundle] bzr/zhenxing.he@sun.com-20110131074825-xizilfwbh4v96ygx.bundle
| Thread |
|---|
| • bzr commit into mysql-trunk branch (zhenxing.he:3518) WL#5779 | He Zhenxing | 31 Jan |