Sven Sandberg wrote:
> Hi Zhenxing,
>
> Thanks for this. I think you refactored this completely right! Great
> that you also changed some functions to static.
>
> I will only ask the same changes as in the previous patch: please use
> #ifndef HAVE_SERVER_UUID, and please don't include headers from headers.
>
Fixed
> So if you can fix these things, I'm happy to approve this patch.
Thank you!
> /Sven
>
>
> He Zhenxing wrote:
> > #At file:///home/hezx/work/mysql/bzr/w5789/trunk/ based on
> revid:tor.didriksen@stripped
> >
> > 3710 He Zhenxing 2011-03-02
> > WL#5789 Replication master interface
> >
> > In this worklog, we define an interface for the replication master. We
> only create
> > a class that groups together the functions that the core server needs in
> order to
> > execute master-related commands.
> >
> > added:
> > sql/rpl.h
> > modified:
> > sql/mysqld.cc
> > sql/mysqld.h
> > sql/rpl_master.cc
> > sql/rpl_master.h
> > sql/sql_parse.cc
> > === modified file 'sql/mysqld.cc'
> > --- a/sql/mysqld.cc 2011-02-22 02:58:40 +0000
> > +++ b/sql/mysqld.cc 2011-03-02 06:53:30 +0000
> > @@ -1485,7 +1485,7 @@ void clean_up(bool print_message)
> > bitmap_free(&temp_pool);
> > free_max_user_conn();
> > #ifdef HAVE_REPLICATION
> > - end_slave_list();
> > + rpl_master.end();
> > #endif
> > delete binlog_filter;
> > delete rpl_filter;
> > @@ -3945,7 +3945,8 @@ static int init_server_components()
> > setup_fpu();
> > init_thr_lock();
> > #ifdef HAVE_REPLICATION
> > - init_slave_list();
> > + if (rpl_master.init())
> > + unireg_abort(1);
> > #endif
> >
> > /* Setup logs */
> >
> > === modified file 'sql/mysqld.h'
> > --- a/sql/mysqld.h 2011-02-18 11:39:05 +0000
> > +++ b/sql/mysqld.h 2011-03-02 06:53:30 +0000
> > @@ -154,6 +154,7 @@ extern char pidfile_name[FN_REFLEN], sys
> > extern char default_logfile_name[FN_REFLEN];
> > extern char log_error_file[FN_REFLEN], *opt_tc_log_file;
> > /*Move UUID_LENGTH from item_strfunc.h*/
> > +#define HAVE_SERVER_UUID 1
> > #define UUID_LENGTH (8+1+4+1+4+1+4+1+12)
> > extern char server_uuid[UUID_LENGTH+1];
> > extern const char *server_uuid_ptr;
> >
> > === added file 'sql/rpl.h'
> > --- a/sql/rpl.h 1970-01-01 00:00:00 +0000
> > +++ b/sql/rpl.h 2011-03-02 06:53:30 +0000
> > @@ -0,0 +1,33 @@
> > +#ifndef RPL_H_INCLUDED
> > +/* Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
> > +
> > + This program is free software; you can redistribute it and/or modify
> > + it under the terms of the GNU General Public License as published by
> > + the Free Software Foundation; version 2 of the License.
> > +
> > + This program is distributed in the hope that it will be useful,
> > + but WITHOUT ANY WARRANTY; without even the implied warranty of
> > + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> > + GNU General Public License for more details.
> > +
> > + You should have received a copy of the GNU General Public License
> > + along with this program; if not, write to the Free Software Foundation,
> > + 51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA */
> > +
> > +#define RPL_H_INCLUDED
> > +
> > +#ifdef HAVE_REPLICATION
> > +
> > +class Rpl_master {
> > +public:
> > + int init();
> > + void end();
> > + int register_slave(THD* thd, char* packet, uint packet_length);
> > + int show_slave_hosts(THD* thd);
> > + int binlog_dump(THD* thd, char* packet, uint packet_length);
> > +};
> > +
> > +extern Rpl_master rpl_master;
> > +
> > +#endif /*HAVE_REPLICATION */
> > +#endif /* RPL_H_INCLUDED */
> >
> > === modified file 'sql/rpl_master.cc'
> > --- a/sql/rpl_master.cc 2011-02-16 17:13:30 +0000
> > +++ b/sql/rpl_master.cc 2011-03-02 06:53:30 +0000
> > @@ -16,17 +16,41 @@
> >
> > #include "sql_priv.h"
> > #include "unireg.h"
> > -#include "sql_parse.h" // check_access
> > #ifdef HAVE_REPLICATION
> >
> > -#include "sql_acl.h" // SUPER_ACL
> > #include "log_event.h"
> > +#include "binlog.h"
> > #include "rpl_filter.h"
> > #include <my_dir.h>
> > #include "rpl_handler.h"
> > #include "rpl_master.h"
> > #include "debug_sync.h"
> >
> > +typedef struct st_slave_info
> > +{
> > + uint32 server_id;
> > + uint32 rpl_recovery_rank, master_id;
> > + char host[HOSTNAME_LENGTH+1];
> > + char user[USERNAME_LENGTH+1];
> > + char password[MAX_PASSWORD_LENGTH+1];
> > + uint16 port;
> > + THD* thd;
> > +} SLAVE_INFO;
> > +
> > +static void init_slave_list();
> > +static void end_slave_list();
> > +static void unregister_slave(THD* thd, bool only_mine, bool need_mutex);
> > +
> > +#if HAVE_SERVER_UUID
> > +static String *get_slave_uuid(THD *thd, String *value);
> > +static void kill_zombie_dump_threads(String *slave_uuid);
> > +#else
> > +static void kill_zombie_dump_threads(uint32 slave_server_id);
> > +#endif
> > +
> > +static void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort
> flags);
> > +
> > +
> > int max_binlog_dump_events = 0; // unlimited
> > my_bool opt_sporadic_binlog_dump_fail = 0;
> >
> > @@ -86,7 +110,7 @@ static void init_all_slave_list_mutexes(
> > }
> > #endif /* HAVE_PSI_INTERFACE */
> >
> > -void init_slave_list()
> > +static void init_slave_list()
> > {
> > #ifdef HAVE_PSI_INTERFACE
> > init_all_slave_list_mutexes();
> > @@ -98,7 +122,7 @@ void init_slave_list()
> > mysql_mutex_init(key_LOCK_slave_list, &LOCK_slave_list,
> MY_MUTEX_INIT_FAST);
> > }
> >
> > -void end_slave_list()
> > +static void end_slave_list()
> > {
> > /* No protection by a mutex needed as we are only called at shutdown */
> > if (my_hash_inited(&slave_list))
> > @@ -117,15 +141,13 @@ void end_slave_list()
> > 1 Error. Error message sent to client
> > */
> >
> > -int register_slave(THD* thd, uchar* packet, uint packet_length)
> > +int Rpl_master::register_slave(THD* thd, char* packet, uint packet_length)
> > {
> > int res;
> > SLAVE_INFO *si;
> > - uchar *p= packet, *p_end= packet + packet_length;
> > + char *p= packet, *p_end= packet + packet_length;
> > const char *errmsg= "Wrong parameters to function register_slave";
> >
> > - if (check_access(thd, REPL_SLAVE_ACL, any_db, NULL, NULL, 0, 0))
> > - return 1;
> > if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
> > goto err2;
> >
> > @@ -162,7 +184,7 @@ err2:
> > return 1;
> > }
> >
> > -void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
> > +static void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
> > {
> > if (thd->server_id)
> > {
> > @@ -190,7 +212,7 @@ void unregister_slave(THD* thd, bool onl
> > @retval FALSE success
> > @retval TRUE failure
> > */
> > -bool show_slave_hosts(THD* thd)
> > +int Rpl_master::show_slave_hosts(THD* thd)
> > {
> > List<Item> field_list;
> > Protocol *protocol= thd->protocol;
> > @@ -207,8 +229,9 @@ bool show_slave_hosts(THD* thd)
> > field_list.push_back(new Item_return_int("Port", 7, MYSQL_TYPE_LONG));
> > field_list.push_back(new Item_return_int("Master_id", 10,
> > MYSQL_TYPE_LONG));
> > +#if HAVE_SERVER_UUID
> > field_list.push_back(new Item_empty_string("Slave_UUID", UUID_LENGTH));
> > -
> > +#endif
> > if (protocol->send_result_set_metadata(&field_list,
> > Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))
> > DBUG_RETURN(TRUE);
> > @@ -229,10 +252,12 @@ bool show_slave_hosts(THD* thd)
> > protocol->store((uint32) si->port);
> > protocol->store((uint32) si->master_id);
> >
> > +#if HAVE_SERVER_UUID
> > /* get slave's UUID */
> > String slave_uuid;
> > if (get_slave_uuid(si->thd, &slave_uuid))
> > protocol->store(slave_uuid.c_ptr_safe(), &my_charset_bin);
> > +#endif
> > if (protocol->write())
> > {
> > mysql_mutex_unlock(&LOCK_slave_list);
> > @@ -615,11 +640,12 @@ static int send_heartbeat_event(NET* net
> > DBUG_RETURN(0);
> > }
> >
> > +
> > /*
> > TODO: Clean up loop to only have one call to send_file()
> > */
> >
> > -void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
> > +static void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos,
> > ushort flags)
> > {
> > LOG_INFO linfo;
> > @@ -1333,7 +1359,7 @@ String *get_slave_uuid(THD *thd, String
> >
> > */
> >
> > -
> > +#if HAVE_SERVER_UUID
> > void kill_zombie_dump_threads(String *slave_uuid)
> > {
> > if (slave_uuid->length() == 0)
> > @@ -1370,6 +1396,36 @@ void kill_zombie_dump_threads(String *sl
> > }
> > }
> >
> > +#else /* HAVE_SERVER_UUID */
> > +
> > +void kill_zombie_dump_threads(uint32 slave_server_id)
> > +{
> > + mysql_mutex_lock(&LOCK_thread_count);
> > + I_List_iterator<THD> it(threads);
> > + THD *tmp;
> > +
> > + while ((tmp=it++))
> > + {
> > + if (tmp->command == COM_BINLOG_DUMP &&
> > + tmp->server_id == slave_server_id)
> > + {
> > + mysql_mutex_lock(&tmp->LOCK_thd_data); // Lock from delete
> > + break;
> > + }
> > + }
> > + mysql_mutex_unlock(&LOCK_thread_count);
> > + if (tmp)
> > + {
> > + /*
> > + Here we do not call kill_one_thread() as
> > + it will be slow because it will iterate through the list
> > + again. We just to do kill the thread ourselves.
> > + */
> > + tmp->awake(THD::KILL_QUERY);
> > + mysql_mutex_unlock(&tmp->LOCK_thd_data);
> > + }
> > +}
> > +#endif /* HAVE_SERVER_UUID */
> >
> > /**
> > Execute a RESET MASTER statement.
> > @@ -1521,4 +1577,50 @@ err:
> > DBUG_RETURN(TRUE);
> > }
> >
> > +int Rpl_master::init()
> > +{
> > + init_slave_list();
> > + return 0;
> > +}
> > +
> > +void Rpl_master::end()
> > +{
> > + end_slave_list();
> > +}
> > +
> > +int Rpl_master::binlog_dump(THD* thd, char* packet, uint packet_length)
> > +{
> > + ulong pos;
> > + ushort flags;
> > +#if HAVE_SERVER_UUID
> > + String slave_uuid;
> > +#else
> > + uint32 slave_server_id;
> > +#endif
> > +
> > + /* TODO: The following has to be changed to an 8 byte integer */
> > + pos = uint4korr(packet);
> > + flags = uint2korr(packet + 4);
> > +
> > +#if HAVE_SERVER_UUID
> > + thd->server_id= uint4korr(packet+6);
> > + get_slave_uuid(thd, &slave_uuid);
> > + kill_zombie_dump_threads(&slave_uuid);
> > +#else
> > + thd->server_id=0; /* avoid suicide */
> > + if ((slave_server_id= uint4korr(packet+6))) // mysqlbinlog.server_id==0
> > + kill_zombie_dump_threads(slave_server_id);
> > + thd->server_id = slave_server_id;
> > +#endif
> > +
> > + general_log_print(thd, COM_BINLOG_DUMP, "Log: '%s' Pos: %ld", packet+10,
> > + (long) pos);
> > + mysql_binlog_send(thd, thd->strdup(packet + 10), (my_off_t) pos, flags);
> > + unregister_slave(thd,1,1);
> > + /* fake COM_QUIT -- if we get here, the thread needs to terminate */
> > + return 1;
> > +}
> > +
> > +Rpl_master rpl_master;
> > +
> > #endif /* HAVE_REPLICATION */
> >
> > === modified file 'sql/rpl_master.h'
> > --- a/sql/rpl_master.h 2010-07-19 16:09:51 +0000
> > +++ b/sql/rpl_master.h 2011-03-02 06:53:30 +0000
> > @@ -16,32 +16,16 @@
> >
> >
> > #define RPL_MASTER_H_INCLUDED
> > +
> > +#include "rpl.h"
> > +
>
> Please, don't include headers from headers unless necessary.
>
> > extern bool server_id_supplied;
> > extern int max_binlog_dump_events;
> > extern my_bool opt_sporadic_binlog_dump_fail;
> > extern my_bool opt_show_slave_auth_info;
> >
> > -typedef struct st_slave_info
> > -{
> > - uint32 server_id;
> > - uint32 rpl_recovery_rank, master_id;
> > - char host[HOSTNAME_LENGTH+1];
> > - char user[USERNAME_LENGTH+1];
> > - char password[MAX_PASSWORD_LENGTH+1];
> > - uint16 port;
> > - THD* thd;
> > -} SLAVE_INFO;
> > -
> > -void init_slave_list();
> > -void end_slave_list();
> > -int register_slave(THD* thd, uchar* packet, uint packet_length);
> > -void unregister_slave(THD* thd, bool only_mine, bool need_mutex);
> > -bool show_slave_hosts(THD* thd);
> > -
> > -String *get_slave_uuid(THD *thd, String *value);
> > bool mysql_show_binlog_events(THD* thd);
> > bool show_binlogs(THD* thd);
> > -void kill_zombie_dump_threads(String *slave_uuid);
> > -void mysql_binlog_send(THD* thd, char* log_ident, my_off_t pos, ushort flags);
> > int reset_master(THD* thd);
> > +
> > #endif /* RPL_MASTER_H_INCLUDED */
> >
> > === modified file 'sql/sql_parse.cc'
> > --- a/sql/sql_parse.cc 2011-02-21 11:34:14 +0000
> > +++ b/sql/sql_parse.cc 2011-03-02 06:53:30 +0000
> > @@ -972,7 +972,9 @@ bool dispatch_command(enum enum_server_c
> > #ifdef HAVE_REPLICATION
> > case COM_REGISTER_SLAVE:
> > {
> > - if (!register_slave(thd, (uchar*)packet, packet_length))
> > + if (check_global_access(thd, REPL_SLAVE_ACL))
> > + break;
> > + if (!rpl_master.register_slave(thd, packet, packet_length))
> > my_ok(thd);
> > break;
> > }
> > @@ -1232,29 +1234,11 @@ bool dispatch_command(enum enum_server_c
> > #ifndef EMBEDDED_LIBRARY
> > case COM_BINLOG_DUMP:
> > {
> > - ulong pos;
> > - ushort flags;
> > - String slave_uuid;
> > -
> > status_var_increment(thd->status_var.com_other);
> > thd->enable_slow_log= opt_log_slow_admin_statements;
> > if (check_global_access(thd, REPL_SLAVE_ACL))
> > break;
> > -
> > - /* TODO: The following has to be changed to an 8 byte integer */
> > - pos = uint4korr(packet);
> > - flags = uint2korr(packet + 4);
> > - thd->server_id= uint4korr(packet+6);
> > -
> > - get_slave_uuid(thd, &slave_uuid);
> > - kill_zombie_dump_threads(&slave_uuid);
> > -
> > - general_log_print(thd, command, "Log: '%s' Pos: %ld", packet+10,
> > - (long) pos);
> > - mysql_binlog_send(thd, thd->strdup(packet + 10), (my_off_t) pos,
> flags);
> > - unregister_slave(thd,1,1);
> > - /* fake COM_QUIT -- if we get here, the thread needs to terminate */
> > - error = TRUE;
> > + error = rpl_master.binlog_dump(thd, packet, packet_length);
> > break;
> > }
> > #endif
> > @@ -2192,7 +2176,7 @@ case SQLCOM_PREPARE:
> > {
> > if (check_global_access(thd, REPL_SLAVE_ACL))
> > goto error;
> > - res = show_slave_hosts(thd);
> > + res = rpl_master.show_slave_hosts(thd);
> > break;
> > }
> > case SQLCOM_SHOW_RELAYLOG_EVENTS:
> >
> >
> >
> > ------------------------------------------------------------------------
> >
> >
--
北京万里开源件有限公司
地址:北京市朝阳区东三环中路39号建外SOHO七号楼2302
邮编:100022
邮件:hezx@stripped
电话:(010) 58699616转318
传真:(010) 58699926
网站:http://www.greatopensource.com