Hi Jason!
Here are my comments on the code.
Best wishes,
Mats Kindahl
He Zhenxing wrote:
> #At file:///media/sda3/work/mysql/bzrwork/semisync/mysql-6.0-semi-sync-1.0/
>
> 2637 He Zhenxing 2008-07-10
> WL#4398 Replication interface for semi-synchronous replication
>
> Removed thd_net_/read/write/flush and mysql_net/read/write/flush
> functions from the interface.
>
> Add rpl_connect_master function
>
> Record the binary log file and position after every flush of binlog file
> of current transaction and pass these to Trans_observer in the Trans_param
> structure. This is required to remove the thread model dependency of
> semi-sync components.
> modified:
> include/mysql/plugin.h
> sql/replication.h
> sql/rpl_handler.cc
> sql/rpl_handler.h
> sql/slave.cc
>
> per-file messages:
> include/mysql/plugin.h
> Removed thd_net_/read/write/flush and mysql_net/read/write/flush
> functions from the interface.
> sql/replication.h
> Add log_file and log_pos to Trans_param
> Add rpl_connect_master function
> sql/rpl_handler.cc
> Removed thd_net_/read/write/flush and mysql_net/read/write/flush
> functions from the interface.
> Record and pass the latest binlog file and position written by current
> transaction to Trans_observer
> sql/rpl_handler.h
> Add init_param to Binlog_relay_IO_param to initialize parameters.
> sql/slave.cc
> Add rpl_connect_master function
> === modified file 'include/mysql/plugin.h'
> --- a/include/mysql/plugin.h 2008-06-26 14:22:39 +0000
> +++ b/include/mysql/plugin.h 2008-07-10 11:24:34 +0000
> @@ -630,86 +630,6 @@ void mysql_query_cache_invalidate4(MYSQL
> int using_trx);
>
> /**
> - Read a packet from the current thread connection.
> -
> - @note The packet buffer will be allocated and freed automatically,
> - the memory pointed to by @a packet will be overwritten or freed by
> - the next read or write using current thread connection.
> -
> - @param packet return the pointer to the packet read
> - @param len return the length of packet read
> -
> - @retval 0 Success
> - @retval 1 Failure
> -*/
> -int thd_net_read(const unsigned char **packet, size_t *len);
> -
> -/**
> - Write a packet to the current thread connection.
> -
> - @note The packet buffer will be allocated and freed automatically,
> - the memory pointed to by @a packet will be overwritten or freed by
> - the next read or write using current thread connection.
> -
> - @param packet packet to write to the connection
> - @param len length of the packet
> -
> - @retval 0 Success
> - @retval 1 Failure
> -*/
> -int thd_net_write(const unsigned char *packet, size_t len);
> -
> -/**
> - Flush write buffer of current thread connection.
> -
> - @retval 0 Success
> - @retval 1 Failure
> -*/
> -int thd_net_flush();
> -
> -/**
> - Read a packet from the connection.
> -
> - @note The packet buffer will be allocated and freed automatically,
> - the memory pointed to by @a packet will be overwritten or freed by
> - the next read or write using the same @a mysql connection.
> -
> - @param mysql mysql client connection
> - @param packet return the pointer to the packet read
> - @param len return the length of packet read
> -
> - @retval 0 Success
> - @retval 1 Failure
> -*/
> -int mysql_net_read(MYSQL *mysql, const unsigned char **packet, size_t *len);
> -
> -/**
> - Write a packet to the connection.
> -
> - @note The packet buffer will be allocated and freed automatically,
> - the memory pointed to by @a packet will be overwritten or freed by
> - the next read or write using the same @a mysql connection.
> -
> - @param mysql mysql client connection
> - @param packet packet to write to the connection
> - @param len length of the packet
> -
> - @retval 0 Success
> - @retval 1 Failure
> -*/
> -int mysql_net_write(MYSQL *mysql, const unsigned char *packet, size_t len);
> -
> -/**
> - Flush write buffer of connection.
> -
> - @param mysql mysql client connection
> -
> - @retval 0 Success
> - @retval 1 Failure
> -*/
> -int mysql_net_flush(MYSQL *mysql);
> -
> -/**
> Get the value of user variable as an integer.
>
> This function will return the value of variable @a name as an
>
> === modified file 'sql/replication.h'
> --- a/sql/replication.h 2008-06-26 14:22:39 +0000
> +++ b/sql/replication.h 2008-07-10 11:24:34 +0000
> @@ -34,6 +34,16 @@ enum Trans_flags {
> typedef struct Trans_param {
> uint32 server_id;
> uint32 flags;
> +
> + /*
> + The latest binary log file name and position written by current
> + transaction, if binary log is disabled or no log event has been
> + written into binary log file by current transaction (events
> + written into transaction log cache are not counted), these two
> + member will be zero.
> + */
> + const char *log_file;
> + my_off_t log_pos;
> } Trans_param;
>
> /**
> @@ -244,7 +254,7 @@ typedef struct Binlog_relay_IO_param {
> /* Master host, user and port */
> char *host;
> char *user;
> - uint port;
> + unsigned int port;
>
> char *master_log_name;
> my_off_t master_log_pos;
> @@ -423,6 +433,19 @@ int register_binlog_relay_io_observer(Bi
> */
> int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
> *p);
>
> +/**
> + Connect to master
> +
> + This function can only used in the slave I/O thread context, and
> + will use the same master information to do the connection.
What happens if it is not used in a slave I/O context? You can leave it
undefined (in the same sense as in the standard), but then add a text
for that so that nobody misses it.
> +
> + @param mysql address of MYSQL structure to use, pass NULL will
> + create a new one
> +
> + @return address of MYSQL structure on success, NULL on failure
> +*/
> +MYSQL *rpl_connect_master(MYSQL *mysql);
> +
> #ifdef __cplusplus
> }
> #endif
>
> === modified file 'sql/rpl_handler.cc'
> --- a/sql/rpl_handler.cc 2008-06-26 14:22:39 +0000
> +++ b/sql/rpl_handler.cc 2008-07-10 11:24:34 +0000
> @@ -29,46 +29,15 @@ Binlog_transmit_delegate *binlog_transmi
> Binlog_relay_IO_delegate *binlog_relay_io_delegate;
> #endif /* HAVE_REPLICATION */
>
> -int thd_net_read(const unsigned char **packet, size_t *len)
> -{
> - THD *thd= current_thd;
> - ulong ret= my_net_read(&thd->net);
> - if (ret == packet_error)
> - return 1;
> - *len= ret;
> - *packet= thd->net.read_pos;
> - return 0;
> -}
> -
> -int thd_net_write(const unsigned char *packet, size_t len)
> -{
> - return my_net_write(¤t_thd->net, packet, len);
> -}
> -
> -int thd_net_flush()
> -{
> - return net_flush(¤t_thd->net);
> -}
> -
> -int mysql_net_read(MYSQL *mysql, const unsigned char **packet, size_t *len)
> -{
> - ulong ret= my_net_read(&mysql->net);
> - if (ret == packet_error)
> - return 1;
> - *len= ret;
> - *packet= mysql->net.read_pos;
> - return 0;
> -}
> -
> -int mysql_net_write(MYSQL *mysql, const unsigned char *packet, size_t len)
> -{
> - return my_net_write(&mysql->net, packet, len);
> -}
> +/*
> + structure to save transaction log filename and position
> +*/
> +typedef struct Trans_binlog_info {
> + my_off_t log_pos;
> + char log_file[FN_REFLEN];
> +} Trans_binlog_info;
>
> -int mysql_net_flush(MYSQL *mysql)
> -{
> - return net_flush(&mysql->net);
> -}
> +static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
>
> int get_user_var_int(const char *name,
> long long int *value, int *null_value)
> @@ -138,6 +107,9 @@ int delegates_init()
> #endif /* HAVE_REPLICATION */
> )
> return 1;
> +
> + if (pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL))
> + return 1;
> return 0;
> }
>
> @@ -170,36 +142,73 @@ void delegates_destroy()
> my_plugin_lock(thd, &info->plugin); \
> if (!plugin) \
> { \
> - unlock(); \
> - return 1; \
> + ret= 1; \
> + break; \
> } \
> if (((Observer *)info->observer)->f \
> && ((Observer *)info->observer)->f args)
> \
> { \
> - unlock(); \
> - return 1; \
> + ret= 1; \
> + break; \
> } \
> } \
> - unlock(); \
> - return 0
> + unlock()
Might be a better idea to pass the 'ret' variable as a parameter to the
macro. That will make it clear that the variable is needed.
>
>
> int Trans_delegate::after_commit(THD *thd, bool all)
> {
> Trans_param param;
> - if (all || thd->transaction.all.ha_list == 0)
> + bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
> + if (is_real_trans)
> param.flags |= TRANS_IS_REAL_TRANS;
>
> + Trans_binlog_info *log_info=
> + my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
> +
> + param.log_file= log_info ? log_info->log_file : 0;
> + param.log_pos= log_info ? log_info->log_pos : 0;
> +
> + int ret= 0;
> FOREACH_OBSERVER(after_commit, thd, (¶m));
> +
> + /*
> + This is the end of a real transaction or autocommit statement, we
> + can free the memory allocated for binlog file and position.
> + */
> + if (is_real_trans && log_info)
> + {
> + my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
> + my_free(log_info, MYF(0));
> + }
> + return ret;
> }
>
> int Trans_delegate::after_rollback(THD *thd, bool all)
> {
> Trans_param param;
> - if (all || thd->transaction.all.ha_list == 0)
> + bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
> + if (is_real_trans)
> param.flags |= TRANS_IS_REAL_TRANS;
>
> - FOREACH_OBSERVER(after_rollback, thd, (¶m));
> + Trans_binlog_info *log_info=
> + my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
> +
> + param.log_file= log_info ? log_info->log_file : 0;
> + param.log_pos= log_info ? log_info->log_pos : 0;
> +
> + int ret= 0;
> + FOREACH_OBSERVER(after_commit, thd, (¶m));
> +
> + /*
> + This is the end of a real transaction or autocommit statement, we
> + can free the memory allocated for binlog file and position.
> + */
> + if (is_real_trans && log_info)
> + {
> + my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
> + my_free(log_info, MYF(0));
> + }
> + return ret;
> }
>
> int Binlog_storage_delegate::after_flush(THD *thd,
> @@ -212,7 +221,24 @@ int Binlog_storage_delegate::after_flush
> if (synced)
> flags |= BINLOG_STORAGE_IS_SYNCED;
>
> - FOREACH_OBSERVER(after_flush, thd, (¶m, log_file, log_pos, flags));
> + Trans_binlog_info *log_info=
> + my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
> +
> + if (!log_info)
> + {
> + if(!(log_info=
> + (Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0))))
> + return 1;
> + my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info);
> + }
> +
> + strcpy(log_info->log_file, log_file+dirname_length(log_file));
> + log_info->log_pos = log_pos;
> +
> + int ret= 0;
> + FOREACH_OBSERVER(after_flush, thd,
> + (¶m, log_info->log_file, log_info->log_pos,
> flags));
> + return ret;
> }
>
> #ifdef HAVE_REPLICATION
> @@ -223,7 +249,9 @@ int Binlog_transmit_delegate::transmit_s
> Binlog_transmit_param param;
> param.flags= flags;
>
> + int ret= 0;
> FOREACH_OBSERVER(transmit_start, thd, (¶m, log_file, log_pos));
> + return ret;
> }
>
> int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
> @@ -231,7 +259,9 @@ int Binlog_transmit_delegate::transmit_s
> Binlog_transmit_param param;
> param.flags= flags;
>
> + int ret= 0;
> FOREACH_OBSERVER(transmit_stop, thd, (¶m));
> + return ret;
> }
>
> int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
> @@ -248,7 +278,8 @@ int Binlog_transmit_delegate::reserve_he
> Binlog_transmit_param param;
> param.flags= flags;
> param.server_id= thd->server_id;
> -
> +
> + int ret= 0;
> read_lock();
> Observer_info_iterator iter= observer_info_iter();
> Observer_info *info= iter++;
> @@ -258,8 +289,8 @@ int Binlog_transmit_delegate::reserve_he
> my_plugin_lock(thd, &info->plugin);
> if (!plugin)
> {
> - unlock();
> - return 1;
> + ret= 1;
> + break;
> }
> hlen= 0;
> if (((Observer *)info->observer)->reserve_header
> @@ -268,19 +299,19 @@ int Binlog_transmit_delegate::reserve_he
> RESERVE_HEADER_SIZE,
> &hlen))
> {
> - unlock();
> - return 1;
> + ret= 1;
> + break;
> }
> if (hlen == 0)
> continue;
> if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
> {
> - unlock();
> - return 1;
> + ret= 1;
> + break;
> }
> }
> unlock();
> - return 0;
> + return ret;
> }
>
> int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
> @@ -291,9 +322,12 @@ int Binlog_transmit_delegate::before_sen
> Binlog_transmit_param param;
> param.flags= flags;
>
> + int ret= 0;
> FOREACH_OBSERVER(before_send_event, thd,
> (¶m, (uchar *)packet->c_ptr(),
> - packet->length(), log_file, log_pos));
> + packet->length(),
> + log_file+dirname_length(log_file), log_pos));
> + return ret;
> }
>
> int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
> @@ -302,8 +336,10 @@ int Binlog_transmit_delegate::after_send
> Binlog_transmit_param param;
> param.flags= flags;
>
> + int ret= 0;
> FOREACH_OBSERVER(after_send_event, thd,
> (¶m, packet->c_ptr(), packet->length()));
> + return ret;
> }
>
> int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
> @@ -312,34 +348,42 @@ int Binlog_transmit_delegate::after_rese
> Binlog_transmit_param param;
> param.flags= flags;
>
> + int ret= 0;
> FOREACH_OBSERVER(after_reset_master, thd, (¶m));
> + return ret;
> +}
> +
> +void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
> + Master_info *mi)
> +{
> + param->mysql= mi->mysql;
> + param->user= mi->user;
> + param->host= mi->host;
> + param->port= mi->port;
> + param->master_log_name= mi->master_log_name;
> + param->master_log_pos= mi->master_log_pos;
> }
>
> int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
> {
> Binlog_relay_IO_param param;
> - param.mysql= mi->mysql;
> - param.user= mi->user;
> - param.host= mi->host;
> - param.port= mi->port;
> - param.master_log_name= mi->master_log_name;
> - param.master_log_pos= mi->master_log_pos;
> + init_param(¶m, mi);
>
> + int ret= 0;
> FOREACH_OBSERVER(thread_start, thd, (¶m));
> + return ret;
> }
>
> +
> int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
> {
>
> Binlog_relay_IO_param param;
> - param.mysql= mi->mysql;
> - param.user= mi->user;
> - param.host= mi->host;
> - param.port= mi->port;
> - param.master_log_name= mi->master_log_name;
> - param.master_log_pos= mi->master_log_pos;
> + init_param(¶m, mi);
>
> + int ret= 0;
> FOREACH_OBSERVER(thread_stop, thd, (¶m));
> + return ret;
> }
>
> int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
> @@ -347,14 +391,11 @@ int Binlog_relay_IO_delegate::before_req
> ushort flags)
> lo{
> Binlog_relay_IO_param param;
> - param.mysql= mi->mysql;
> - param.user= mi->user;
> - param.host= mi->host;
> - param.port= mi->port;
> - param.master_log_name= mi->master_log_name;
> - param.master_log_pos= mi->master_log_pos;
> + init_param(¶m, mi);
>
> + int ret= 0;
> FOREACH_OBSERVER(before_request_transmit, thd, (¶m, (uint32)flags));
> + return ret;
> }
>
> int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
> @@ -363,15 +404,12 @@ int Binlog_relay_IO_delegate::after_read
> ulong *event_len)
> {
> Binlog_relay_IO_param param;
> - param.mysql= mi->mysql;
> - param.user= mi->user;
> - param.host= mi->host;
> - param.port= mi->port;
> - param.master_log_name= mi->master_log_name;
> - param.master_log_pos= mi->master_log_pos;
> + init_param(¶m, mi);
>
> + int ret= 0;
> FOREACH_OBSERVER(after_read_event, thd,
> (¶m, packet, len, event_buf, event_len));
> + return ret;
> }
>
> int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
> @@ -380,33 +418,27 @@ int Binlog_relay_IO_delegate::after_queu
> bool synced)
> {
> Binlog_relay_IO_param param;
> - param.mysql= mi->mysql;
> - param.user= mi->user;
> - param.host= mi->host;
> - param.port= mi->port;
> - param.master_log_name= mi->master_log_name;
> - param.master_log_pos= mi->master_log_pos;
> + init_param(¶m, mi);
>
> uint32 flags=0;
> if (synced)
> flags |= BINLOG_STORAGE_IS_SYNCED;
>
> + int ret= 0;
> FOREACH_OBSERVER(after_queue_event, thd,
> (¶m, event_buf, event_len, flags));
> + return ret;
> }
>
> int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
>
> {
> Binlog_relay_IO_param param;
> - param.mysql= mi->mysql;
> - param.user= mi->user;
> - param.host= mi->host;
> - param.port= mi->port;
> - param.master_log_name= mi->master_log_name;
> - param.master_log_pos= mi->master_log_pos;
> + init_param(¶m, mi);
>
> + int ret= 0;
> FOREACH_OBSERVER(after_reset_slave, thd, (¶m));
> + return ret;
> }
> #endif /* HAVE_REPLICATION */
>
>
> === modified file 'sql/rpl_handler.h'
> --- a/sql/rpl_handler.h 2008-06-26 14:22:39 +0000
> +++ b/sql/rpl_handler.h 2008-07-10 11:24:34 +0000
> @@ -187,6 +187,8 @@ public:
> const char *event_buf, ulong event_len,
> bool synced);
> int after_reset_slave(THD *thd, Master_info *mi);
> +private:
> + void init_param(Binlog_relay_IO_param *param, Master_info *mi);
> };
> #endif /* HAVE_REPLICATION */
>
> @@ -200,7 +202,12 @@ extern Binlog_transmit_delegate *binlog_
> extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
> #endif /* HAVE_REPLICATION */
>
> +/*
> + if there is no observers in the delegate, we can return 0
> + immediately.
> +*/
> #define RUN_HOOK(group, hook, args) \
> - group ##_delegate->hook args
> + (group ##_delegate->is_empty() ? \
> + 0 : group ##_delegate->hook args)
Very nice.
>
> #endif /* RPL_HANDLER_H */
>
> === modified file 'sql/slave.cc'
> --- a/sql/slave.cc 2008-06-26 14:22:39 +0000
> +++ b/sql/slave.cc 2008-07-10 11:24:34 +0000
> @@ -68,6 +68,8 @@ ulonglong relay_log_space_limit = 0;
> int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
> int events_till_abort = -1;
>
> +static pthread_key(Master_info*, RPL_MASTER_INFO);
> +
> enum enum_slave_reconnect_actions
> {
> SLAVE_RECON_ACT_REG= 0,
> @@ -2140,6 +2142,8 @@ pthread_handler_t handle_slave_io(void *
> mi->master_log_name,
> llstr(mi->master_log_pos,llbuff)));
>
> + /* This must be called before run any binlog_relay_io hooks */
> + my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi);
>
> if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
> goto err;
> @@ -3482,6 +3486,62 @@ static int safe_reconnect(THD* thd, MYSQ
> }
>
>
> +MYSQL *rpl_connect_master(MYSQL *mysql)
> +{
> + THD *thd= current_thd;
> + Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO);
> + if (!mi)
> + {
> + sql_print_error("'rpl_connect_master' must be called in slave I/O thread
> context.");
> + return NULL;
> + }
You can replace all this code
> +
> + bool allocated= false;
> +
> + if (!mysql)
> + {
> + if(!(mysql= mysql_init(NULL)))
> + return NULL;
> + allocated= true;
> + }
with just:
if (!(mysql= mysql_init(mysql))
return NULL;
the C API keeps track of whether memory was allocated or not internally,
so there is no need to do that. Only difference is that now
rpl_connect_master() does a mysql_init() as well, but that is quite
natural and considering the greatly simplified code, that is a minor
difference.
> +
> + /*
> + XXX: copied from connect_to_master, this function should not
> + change the slave status, so we cannot use connect_to_master
> + directly
> + */
> + mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
> + mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
Maybe factor out the common parts into a separate function?
> +
> +#ifdef HAVE_OPENSSL
> + if (mi->ssl)
> + {
> + mysql_ssl_set(mysql,
> + mi->ssl_key[0]?mi->ssl_key:0,
> + mi->ssl_cert[0]?mi->ssl_cert:0,
> + mi->ssl_ca[0]?mi->ssl_ca:0,
> + mi->ssl_capath[0]?mi->ssl_capath:0,
> + mi->ssl_cipher[0]?mi->ssl_cipher:0);
> + mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
> + &mi->ssl_verify_server_cert);
> + }
> +#endif
> +
> + mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
> + /* This one is not strictly needed but we have it here for completeness */
> + mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
> +
> + if (io_slave_killed(thd, mi)
> + || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
> + mi->port, 0, 0))
> + {
> + if (allocated)
> + mysql_close(mysql); // this will free the object
No need to keep track of whether it was allocated or not, that is done
internally. Just write:
mysql_close(mysql);
> + return NULL;
> + }
> + return mysql;
> +}
> +
> /*
> Store the file and position where the execute-slave thread are in the
> relay log.
>
>
--
Mats Kindahl
Lead Software Developer
Replication Team
MySQL AB, www.mysql.com