Hi Jason!
Thank you for this big piece of work. Generally, the interface is very
good. In long term, I think it will increase code robustness of
replication. The code is of high quality and it follows object oriented
principles. It is also quite well documented.
I have a few suggestions for improvements. I have been quite detailed,
so there are many, but most of them are small and easily fixable. Since
I understand it is important to get this pushed soon, I marked more
important comments with !!!. I think those have to be addressed before
the first push, and if the schedule is tight, you can address the other
comments later.
First, some general notes:
!!!==== Error handling ====
The patch handles errors returned from plugin callback inconsistently.
In the patch, all errors stop other hooks for the same call from being
invoked. However, after that, some errors are ignored (e.g.,
transaction_delegate.after_commit), some are reported to the user (e.g.,
binlog_relay_io), and some affect the server (e.g., errors in
binlog_transmit stops the dump thread in most cases).
We should have a consistent design for this. Two questions we need to
settle:
- Should the plugin report errors to the user itself, or should it
only pass an error code to the server and the server reports it to the
user? If possible, it would be good if the plugin could report the error
(since it can give a plugin-specific message), but I don't know the
plugin system well enough to know if that is possible.
- What should the server do after an error? I imagine plugins may want
to report different levels of errors, depending on the severity. I can
imagine the following severity levels:
0 - no error
1 - the server only reports an error and continues
2 - the server discards the currently processed event and does not
invoke any more plugins for the currently processed event.
3 - the server stops the current thread and does not invoke any more
plugins for the currently processed event (i.e., client/dump/io/sql thread).
I suggest we create an enum value which describes the actions that the
server can take. Then, each callback function returns that enumeration
type instead of 0/1.
==== Tabs ====
Please do not use tabs in source code. This patch contains a few tabs.
Hint: use the `expand' program to convert to spaces, and add
https://intranet.mysql.com/secure/paste/displaypaste.php?codeid=4565 to
~/.emacs
More comments inline.
He Zhenxing wrote:
> Hi,
>
> Here is the patch for code review of the replication inteface of
> WL#4398.
>
> Best regards!
>
>
>
> ------------------------------------------------------------------------
>
> Subject:
> bzr commit into mysql-6.0-semi-sync-1.0 tree (hezx:2635) WL#4398
> From:
> He Zhenxing <hezx@stripped>
> Date:
> Tue, 17 Jun 2008 11:05:49 +0800
> To:
> commits@stripped
>
> To:
> commits@stripped
>
>
> #At file:///media/sda3/work/mysql/bzrwork/semisync/mysql-6.0-semi-sync-1.0/
>
> ------------------------------------------------------------
> revno: 2635
> revision-id: hezx@stripped
> parent: sp1r-kostja@bodhi.(none)-20080420124250-11357
> committer: He Zhenxing <hezx@stripped>
> branch nick: mysql-6.0-semi-sync-1.0
> timestamp: 二 2008-06-17 11:04:54 +0800
> message:
> WL#4398 Replication Interface for semi-synchronous replication
>
> Add replication interface and plugin support
> added:
> sql/replication.h replication.h-20080520140908-iazlmzffc1xcnz71-2
> sql/rpl_handler.cc rpl_handler.cc-20080520140908-iazlmzffc1xcnz71-1
> sql/rpl_handler.h rpl_handler.h-20080520140902-36338uea7oevomai-1
> modified:
> include/mysql/plugin.h
> sp1f-plugin.h-20051105112032-xacmvx22ghtcgtqhu6v56b4bneqtx7l5
> libmysqld/Makefile.am
> sp1f-makefile.am-20010411110351-26htpk3ynkyh7pkfvnshztqrxx3few4g
> sql/Makefile.am
> sp1f-makefile.am-19700101030959-xsjdiakci3nqcdd4xl4yomwdl5eo2f3q
> sql/handler.cc
> sp1f-handler.cc-19700101030959-ta6zfrlbxzucylciyro3musjsdpocrdh
> sql/log.cc
> sp1f-log.cc-19700101030959-r3hdfovek4kl6nd64ovoaknmirota6bq
> sql/log.h
> sp1f-log.h-20051222053446-ggv6hdi5fnxggnjemezvv7n2bcbkx45e
> sql/mysqld.cc
> sp1f-mysqld.cc-19700101030959-zpswdvekpvixxzxf7gdtofzel7nywtfj
> sql/slave.cc
> sp1f-slave.cc-19700101030959-a636aj3mjxgu7fnznrg5kt77p3u2bvhh
> sql/sql_parse.cc
> sp1f-sql_parse.cc-19700101030959-ehcre3rwhv5l3mlxqhaxg36ujenxnrcd
> sql/sql_plugin.cc
> sp1f-sql_plugin.cc-20051105112032-hrm64p6xfjq33ud6zy3uivpo7azm75a2
> sql/sql_repl.cc
> sp1f-sql_repl.cc-20001002032713-xqbns5ofqsaebhgi2ypcfn7nhz7nh5rp
> per-file comments:
> include/mysql/plugin.h
> Add replication plugin support and some API functions
> sql/Makefile.am
> Add replication interface support
> sql/handler.cc
> Add replication interface support
> sql/log.cc
> Add replication interface support
> sql/mysqld.cc
> Add replication interface support
> sql/replication.h
> Add replication interface support
> sql/rpl_handler.cc
> Add replication interface support
> sql/rpl_handler.h
> Add replication interface support
> sql/slave.cc
> Add replication interface support
> sql/sql_parse.cc
> Add replication interface support
> sql/sql_plugin.cc
> Add replication plugin support
> sql/sql_repl.cc
> Add replication interface support
>
>
> ------------------------------------------------------------------------
>
>
>
> ------------------------------------------------------------------------
>
> === modified file 'include/mysql/plugin.h'
> --- a/include/mysql/plugin.h 2008-03-27 19:02:15 +0000
> +++ b/include/mysql/plugin.h 2008-06-17 03:04:54 +0000
> @@ -16,6 +16,11 @@
> #ifndef _my_plugin_h
> #define _my_plugin_h
>
> +/* size_t */
> +#include <stdlib.h>
> +
> +typedef struct st_mysql MYSQL;
> +
> #ifdef __cplusplus
> class THD;
> class Item;
> @@ -66,7 +71,8 @@
> #define MYSQL_DAEMON_PLUGIN 3 /* The daemon/raw plugin type */
> #define MYSQL_INFORMATION_SCHEMA_PLUGIN 4 /* The I_S plugin type */
> #define MYSQL_AUDIT_PLUGIN 5 /* The Audit plugin type */
> -#define MYSQL_MAX_PLUGIN_TYPE_NUM 6 /* The number of plugin types */
> +#define MYSQL_REPLICATION_PLUGIN 6 /* The replication plugin type */
> +#define MYSQL_MAX_PLUGIN_TYPE_NUM 7 /* The number of plugin types */
>
> /* We use the following strings to define licenses for plugins */
> #define PLUGIN_LICENSE_PROPRIETARY 0
> @@ -461,6 +467,19 @@
>
>
> /*************************************************************************
> + API for Replication plugin. (MYSQL_REPLICATION_PLUGIN)
> +*/
> +#define MYSQL_REPLICATION_INTERFACE_VERSION 0x0100
> +
> +/**
> + Replication plugin descriptor
> +*/
> +struct st_mysql_replication {
> + int interface_version;
> +};
> +
> +
> +/*************************************************************************
> st_mysql_value struct for reading values from mysqld.
> Used by server variables framework to parse user-provided values.
> Will be used for arguments when implementing UDFs.
> @@ -610,6 +629,103 @@
> const char *key, unsigned int key_length,
> int using_trx);
>
> +/**
> + Read a packet from the thread connection.
I think it would be more clear to say "the calling thread's connection"
> +
> + @param packet return the pointer to the packet read
> + @param len return the length of packet read
Please be more specific on who owns the memory `*packet' and `len' point
to: the memory pointed to by `packet' belongs to the current THD object
of the current thread and will be freed automatically by the thread
handling system. The caller should make sure that `len' points to valid
memory, and the function will overwrite that memory.
> +
> + @return 0 on success, 1 on failure.
> +*/
> +int thd_net_read(const unsigned char **packet, size_t *len);
> +
> +/**
> + Write a packet to the thread connection.
> +
> + @param packet packet to write to the connection
> + @param len length of the packet
> +
> + @return 0 on success, 1 on failure
> +*/
> +int thd_net_write(const unsigned char *packet, size_t len);
> +
> +/**
> + Flush write buffer of thread connection.
> +
> + @return 0 on success, 1 on failure
> +*/
> +int thd_net_flush();
> +
> +/**
> + Read a packet from the connection.
> +
> + @param mysql mysql client connection
> + @param packet return the pointer to the packet read
> + @param len return the length of packet read
Same as for thd_net_read above.
> + @return 0 on success, 1 on failure.
> +*/
> +int mysql_net_read(MYSQL *mysql, const unsigned char **packet, size_t *len);
> +
> +/**
> + Write a packet to the connection.
> +
> + @param mysql mysql client connection
> + @param packet packet to write to the connection
> + @param len length of the packet
> + @return 0 on success, 1 on failure
> +*/
> +int mysql_net_write(MYSQL *mysql, const unsigned char *packet, size_t len);
> +
> +/**
> + Flush write buffer of connection.
> +
> + @param mysql mysql client connection
> +
> + @return 0 on success, 1 on failure
> +*/
> +int mysql_net_flush(MYSQL *mysql);
> +
> +/**
> + Get the value of user variable as an integer.
> +
> + @param name user variable name
> + @param value pointer to return the value
> + @param null_value true if the value of variable is null, false if not
For value and null_value, please state that the caller has to provide
pointers to valid memory (the function will not allocate it). For
null_value, please say something like "the function will set *null_value
to true if ...", to clarify that it is not the caller that sets *null_value.
Please specify what happens if the variable is not of integer type.
> +
> + @return 0 success, 1 variable not found
> +*/
> +int get_user_var_int(const char *name,
> + long long int *value, int *null_value);
> +
> +/**
> + Get the value of user variable as a double precision float number.
> +
> + @param name user variable name
> + @param value pointer to return the value
> + @param null_value true if the value of variable is null, false if not
Same as for get_user_var_int.
> +
> + @return 0 success, 1 variable not found
> +*/
> +int get_user_var_real(const char *name,
> + double *value, int *null_value);
> +
> +/**
> + Get the value of user variable as a string.
Please state that if the variable holds a numeric value, it will be
converted to a string representation.
> +
> + @param name user variable name
> + @param value pointer to the value buffer
> + @param len length of the value buffer
> + @param precision precision of the value if it is a float number
> + @param null_value true if the value of variable is null, false if not
Same as for get_user_var_int.
> +
> + @return 0 success, 1 variable not found
> +*/
> +int get_user_var_str(const char *name,
> + char *value, unsigned long len,
> + unsigned int precision, int *null_value);
> +
> +
> #ifdef __cplusplus
> }
> #endif
>
> === modified file 'libmysqld/Makefile.am'
> --- a/libmysqld/Makefile.am 2008-03-27 22:47:31 +0000
> +++ b/libmysqld/Makefile.am 2008-06-17 03:04:54 +0000
> @@ -82,7 +82,8 @@
> rpl_filter.cc sql_partition.cc sql_builtin.cc sql_plugin.cc \
> sql_tablespace.cc \
> rpl_injector.cc my_user.c partition_info.cc \
> - sql_servers.cc ddl_blocker.cc si_objects.cc sql_audit.cc
> + sql_servers.cc ddl_blocker.cc si_objects.cc sql_audit.cc \
> + rpl_handler.cc
>
> libmysqld_int_a_SOURCES= $(libmysqld_sources)
> nodist_libmysqld_int_a_SOURCES= $(libmysqlsources) $(sqlsources)
>
> === modified file 'sql/Makefile.am'
> --- a/sql/Makefile.am 2008-03-27 22:47:31 +0000
> +++ b/sql/Makefile.am 2008-06-17 03:04:54 +0000
> @@ -88,7 +88,8 @@
> event_data_objects.h event_scheduler.h \
> sql_partition.h partition_info.h partition_element.h \
> probes.h sql_audit.h \
> - contributors.h sql_servers.h ddl_blocker.h si_objects.h
> + contributors.h sql_servers.h ddl_blocker.h si_objects.h \
> + rpl_handler.h
>
> mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
> item.cc item_sum.cc item_buff.cc item_func.cc \
> @@ -133,7 +134,7 @@
> sql_plugin.cc sql_binlog.cc \
> sql_builtin.cc sql_tablespace.cc partition_info.cc \
> sql_servers.cc sql_audit.cc sha2.cc \
> - ddl_blocker.cc si_objects.cc
> + ddl_blocker.cc si_objects.cc rpl_handler.cc
>
> if HAVE_DTRACE
> mysqld_SOURCES += probes.d
>
> === modified file 'sql/handler.cc'
> --- a/sql/handler.cc 2008-04-07 15:32:14 +0000
> +++ b/sql/handler.cc 2008-06-17 03:04:54 +0000
> @@ -33,6 +33,8 @@
> #include "ha_partition.h"
> #endif
>
> +#include "rpl_handler.h"
> +
> /*
> While we have legacy_db_type, we have this array to
> check for dups and to find handlerton from legacy_db_type.
> @@ -1139,6 +1141,7 @@
> if (cookie)
> tc_log->unlog(cookie, xid);
> DBUG_EXECUTE_IF("crash_commit_after", abort(););
> + RUN_HOOK(transaction, after_commit, thd, all);
!!!The return value from the callback is not taken into account.
> end:
> if (is_real_trans)
> start_waiting_global_read_lock(thd);
> @@ -1269,6 +1272,7 @@
> push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
> ER_WARNING_NOT_COMPLETE_ROLLBACK,
> ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
> + RUN_HOOK(transaction, after_rollback, thd, all);
!!!The return value from the callback is not taken into account.
> DBUG_RETURN(error);
> }
>
>
> === modified file 'sql/log.cc'
> --- a/sql/log.cc 2008-04-14 10:15:04 +0000
> +++ b/sql/log.cc 2008-06-17 03:04:54 +0000
> @@ -39,6 +39,7 @@
> #endif
>
> #include <mysql/plugin.h>
> +#include "rpl_handler.h"
>
> /* max size of the log message */
> #define MAX_LOG_BUFFER_SIZE 1024
> @@ -3520,16 +3521,18 @@
> }
>
>
> -bool MYSQL_BIN_LOG::flush_and_sync()
> +bool MYSQL_BIN_LOG::flush_and_sync(bool *synced)
> {
> int err=0, fd=log_file.file;
> safe_mutex_assert_owner(&LOCK_log);
> if (flush_io_cache(&log_file))
> return 1;
> + *synced= 0;
!!! I think *synced= 0 should be moved to above the `if', so that
*synced gets the correct value even if flush_io_cache() fails.
Also, I would suggest to allow `synced' to be NULL, in which case it
will not be updated. This will be useful since many calls to
flush_and_sync() ignore the value of `synced'.
> if (++sync_binlog_counter >= sync_binlog_period && sync_binlog_period)
> {
> sync_binlog_counter= 0;
> err=my_sync(fd, MYF(MY_WME));
> + *synced= 1;
> }
> return err;
> }
> @@ -3794,7 +3797,8 @@
>
> if (file == &log_file)
> {
> - error= flush_and_sync();
> + bool synced;
> + error= flush_and_sync(&synced);
> if (!error)
> {
> signal_update();
> @@ -3981,14 +3985,21 @@
> if (event_info->write(file))
> goto err;
>
> + error=0;
> if (file == &log_file) // we are writing to the real log (disk)
> {
> - if (flush_and_sync())
> + bool synced;
> + if (flush_and_sync(&synced))
> goto err;
> +
> + if (RUN_HOOK(binlog_storage, after_flush,
> + thd, log_file_name, file->pos_in_file, synced)) {
> + goto err;
> + }
> +
> signal_update();
> rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
> }
> - error=0;
>
> err:
> if (error)
> @@ -4244,8 +4255,9 @@
>
> DBUG_ASSERT(carry == 0);
>
> + bool synced=0;
> if (sync_log)
> - flush_and_sync();
> + flush_and_sync(&synced);
>
> return 0; // All OK
> }
> @@ -4331,7 +4343,9 @@
>
> if (commit_event && commit_event->write(&log_file))
> goto err;
> - if (flush_and_sync())
> +
> + bool synced;
> + if (flush_and_sync(&synced))
> goto err;
> DBUG_EXECUTE_IF("half_binlogged_transaction", abort(););
> if (cache->error) // Error on read
> @@ -4340,6 +4354,11 @@
> write_error=1; // Don't give more errors
> goto err;
> }
> +
> + if (RUN_HOOK(binlog_storage, after_flush,
> + thd, log_file_name, log_file.pos_in_file, synced))
> + goto err;
> +
> signal_update();
> }
>
>
> === modified file 'sql/log.h'
> --- a/sql/log.h 2008-02-03 09:00:49 +0000
> +++ b/sql/log.h 2008-06-17 03:04:54 +0000
> @@ -364,7 +364,7 @@
> bool is_active(const char* log_file_name);
> int update_log_index(LOG_INFO* linfo, bool need_update_threads);
> void rotate_and_purge(uint flags);
> - bool flush_and_sync();
> + bool flush_and_sync(bool *synced);
> int purge_logs(const char *to_log, bool included,
> bool need_mutex, bool need_update_threads,
> ulonglong *decrease_log_space);
>
> === modified file 'sql/mysqld.cc'
> --- a/sql/mysqld.cc 2008-04-20 08:06:51 +0000
> +++ b/sql/mysqld.cc 2008-06-17 03:04:54 +0000
> @@ -33,6 +33,8 @@
>
> #include "rpl_injector.h"
>
> +#include "rpl_handler.h"
> +
> #ifdef HAVE_SYS_PRCTL_H
> #include <sys/prctl.h>
> #endif
> @@ -1322,6 +1324,7 @@
> ha_end();
> if (tc_log)
> tc_log->close();
> + delegates_destroy();
> xid_cache_free();
> delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache);
> multi_keycache_free();
> @@ -3827,6 +3830,13 @@
> unireg_abort(1);
> }
>
> + /* initialize delegates for extension observers */
> + if (delegates_init())
> + {
> + sql_print_error("Initialize extension delegates failed");
> + unireg_abort(1);
> + }
> +
> /* need to configure logging before initializing storage engines */
> if (opt_update_log)
> {
>
> === added file 'sql/replication.h'
> --- a/sql/replication.h 1970-01-01 00:00:00 +0000
> +++ b/sql/replication.h 2008-06-17 03:04:54 +0000
> @@ -0,0 +1,385 @@
> +/* Copyright (C) 2008 MySQL AB
> +
> + This program is free software; you can redistribute it and/or modify
> + it under the terms of the GNU General Public License as published by
> + the Free Software Foundation; version 2 of the License.
> +
> + This program is distributed in the hope that it will be useful,
> + but WITHOUT ANY WARRANTY; without even the implied warranty of
> + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + GNU General Public License for more details.
> +
> + You should have received a copy of the GNU General Public License
> + along with this program; if not, write to the Free Software
> + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
> +
> +#ifndef REPLICATION_H
> +#define REPLICATION_H
> +
> +/**
> + Transaction observer flags.
> +*/
> +enum Trans_flags {
> + /** Transaction is a real transaction */
> + TRANS_IS_REAL_TRANS = 1
> +};
> +
> +/**
> + Transaction observer parameter
> +*/
> +typedef struct Trans_param {
> + uint32 server_id;
> + uint32 flags;
> +} Trans_param;
> +
> +/**
> + Observes and extends transaction execution
> +*/
> +typedef struct Trans_observer {
> + uint32 len;
> +
> + /**
> + After transaction commit to storage engines
> +
> + This callback is called right after commit to storage engines for
> + transactional tables.
Note that the first line of a doxygen comment will be used as a brief
description of the function, so it should look like a complete sentence
rather than a header. I think it would be more clear to merge the above
two sentences to something like "Callback that will be invoked after
each transaction commits to storage engines" or something similar.
> +
> + For non-transactional tables, this is called at the end of the
> + statement, before sending statement status, if the statement
> + succeeded.
> +
> + @param param The parameter for transaction observers
> +
> + @return 0 on sucess, 1 on failure
Please say what the semantics is for success and failure. I.e., how does
the server behave on failure? Does it rollback? Does it print an error
message?
> + */
> + int (*after_commit)(Trans_param *param);
Can the plugin modify param? If not, please make it const.
> +
> + /**
> + After transaction rollback to storage engines
> +
> + This callback is called right after rollback to storage engines
> + for transactional tables.
Same as for Trans_observer::after_commit: please merge these two
paragraph so the first line becomes a full sentence.
> +
> + For non-transactional tables, this is called at the end of the
> + statement, before sending statement status, if the statement
> + failed.
> +
> + @param param The parameter for transaction observers
> +
> + @return 0 on sucess, 1 on failure
Same as for Trans_observer::after_commit: please specify server semantics.
> + */
> + int (*after_rollback)(Trans_param *param);
!!!Can the plugin modify param? If not, please make it const.
> +} Trans_observer;
> +
> +/**
> + Binlog storage flags
> +*/
> +enum Binlog_storage_flags {
> + /** Binary log was sync:ed */
> + BINLOG_STORAGE_IS_SYNCED = 1
> +};
> +
> +/**
> + Binlog storage observer parameters
> + */
> +typedef struct Binlog_storage_param {
> + uint32 server_id;
> +} Binlog_storage_param;
> +
> +/**
> + Observe binlog logging storage
> +*/
> +typedef struct Binlog_storage_observer {
> + uint32 len;
> +
> + /**
> + After binlog has been updated and flushed
Please extend this to a full description, similar to above.
> +
> + @param param Observer common parameter
> + @param log_file Binlog file name been updated
> + @param log_pos Binlog position after update
> + @param flags flags for binlog storage
> +
> + @return 0 on success, 1 on failure
Same as for Trans_observer::after_commit: please specify server semantics.
> + */
> + int (*after_flush)(Binlog_storage_param *param,
> + const char *log_file, my_off_t log_pos,
> + uint32 flags);
> +} Binlog_storage_observer;
> +
> +/**
> + Replication binlog tramsmitter (binlog dump) observer parameter.
typo: transmitter with n
> +*/
> +typedef struct Binlog_transmit_param {
> + uint32 server_id;
> + uint32 flags;
> +} Binlog_transmit_param;
> +
> +/**
> + Observe and extends the binlog dumping thread.
> +*/
> +typedef struct Binlog_transmit_observer {
> + uint32 len;
> +
> + /**
> + Start binlog dumping thread
Please extend to a full description, similar to above.
> +
> + @param param Observer common parameter
> + @param log_file Binlog file name to transmit from
> + @param log_pos Binlog position to transmit from
> +
> + @return 0 on success, 1 on failure
Same as for Trans_observer::after_commit: please specify server semantics.
> + */
> + int (*transmit_start)(Binlog_transmit_param *param,
> + const char *log_file, my_off_t log_pos);
!!!Can the plugin modify param? If not, please make it const.
> +
> + /**
> + Start binlog dumping thread
Please extend to a full description, similar to above.
> +
> + @param param Observer common parameter
> +
> + @return 0 on success, 1 on failure
Same as for Trans_observer::after_commit: please specify server semantics.
> + */
> + int (*transmit_stop)(Binlog_transmit_param *param);
!!!Can the plugin modify param? If not, please make it const.
> +
> + /**
> + Reserve bytes for header for binlog packet for transmision
Please explain when this callback will be invoked and how it can be used
by the plugin.
Typo: transmission is with three s
> +
> + @param param Observer common parameter
> + @param packet The packet buffer for transmision
It should be specified who owns `packet', and whether it can be modified
by the plugin. Please state that the server ensures that `packet' points
to valid memory before invoking the callback, that the plugin can modify
`packet' as it likes, and that the server will take care of freeing
`packet' when needed.
> + @param hlen Header length before and after reserve header for
> + this observer
!!!It is not clear what the purpose of `hlen' is. Isn't it equal to
packet.length()? In that case, it is redundant and I think it should be
dropped.
See also my comment in rpl_handler.cc below, inside the function
"Binlog_transmit_delegate::reserve_header"
> +
> + @return 0 on success, 1 on failure
Same as for Trans_observer::after_commit: please specify server semantics.
> + */
> + int (*reserve_header)(Binlog_transmit_param *param,
> + String *packet, ulong *len);
!!!Can the plugin modify `param' and `packet'? If not, please make it const.
> +
> + /**
> + Before sending an event packet
Please extend to a full description, similar to above.
> +
> + @param param Observer common parameter
> + @param packet Binlog event packet to send
It should be specified who owns `packet', and whether it can be modified
by the plugin. Please state that the server ensures that `packet' points
to valid memory before invoking the callback, that the plugin can modify
`packet' as it likes, and that the server will take care of freeing
`packet' when needed.
> + @param log_file Binlog file name of the event packet to send
> + @param log_pos Binlog position of the event packet to send
> +
> + @return 0 on success, 1 on failure
Same as for Trans_observer::after_commit: please specify server semantics.
> + */
> + int (*before_send_event)(Binlog_transmit_param *param,
> + String *packet,
> + const char *log_file, my_off_t log_pos );
!!!Can the plugin modify `param'? If not, please make it const.
> +
> + /**
> + After sending an event packet
> +
> + @param param Observer common parameter
> + @param event_buf Binlog event packet buffer sent
> +
> + @return 0 on success, 1 on failure
Same as for Trans_observer::after_commit: please specify server semantics.
> + */
> + int (*after_send_event)(Binlog_transmit_param *param,
> + const char *event_buf);
!!!Can the plugin modify param? If not, please make it const.
> +
> + /**
> + Reset transmision (binlog dumping) status
> +
> + This is called when executing the command RESET MASTER, and is
> + used to reset status variables added by observers.
> +
> + @param param Observer common parameter
> +
> + @return 0 on success, 1 on failure
Same as for Trans_observer::after_commit: please specify server semantics.
> + */
> + int (*after_reset_master)(Binlog_transmit_param *param);
Can the plugin modify param? If not, please make it const.
> +} Binlog_transmit_observer;
> +
> +/**
> + Binlog relay IO flags
> +*/
> +enum Binlog_relay_IO_flags {
> + /** Binary relay log was sync:ed */
> + BINLOG_RELAY_IS_SYNCED = 1
> +};
> +
> +
> +/**
> + Replication binlog relay IO observer parameter
> +*/
> +typedef struct Binlog_relay_IO_param {
> + uint32 server_id;
> +
> + /* Master host, user and port */
> + char *host;
> + char *user;
> + uint port;
> +
> + char *master_log_name;
> + my_off_t master_log_pos;
> +
> + MYSQL *mysql; /* the connection to master */
> +} Binlog_relay_IO_param;
> +
> +/**
> + Observes and extends the service of slave IO thread.
> +*/
> +typedef struct Binlog_relay_IO_observer {
> + uint32 len;
> +
> + /**
> + Start of slave IO thread
Please make it a full sentence, like above.
> +
> + @param param Observer common parameter
> +
> + @return 0 on success, 1 on failure
Same as for Trans_observer::after_commit: please specify server semantics.
> + */
> + int (*thread_start)(Binlog_relay_IO_param *param);
!!!Can the plugin modify param? If not, please make it const.
> +
> + /**
> + Stop of slave IO thread
Please make it a full sentence, like above.
> +
> + @param param Observer common parameter
> +
> + @return 0 on success, 1 on failure
Same as for Trans_observer::after_commit: please specify server semantics.
> + */
> + int (*thread_stop)(Binlog_relay_IO_param *param);
!!!Can the plugin modify param? If not, please make it const.
> +
> + /**
> + Before slave request binlog transmision from master
> +
> + This is called before slave issuing BINLOG_DUMP command to master
> + to request binlog.
Please merge these lines so that the first line becomes a full sentence,
like above.
> +
> + @param param Observer common parameter
> + @param flags binlog dump flags
> +
> + @return 0 on success, 1 on failure
Same as for Trans_observer::after_commit: please specify server semantics.
> + */
> + int (*before_request_transmit)(Binlog_relay_IO_param *param, uint32 flags);
!!!Can the plugin modify param? If not, please make it const.
> +
> + /**
> + After read an event packet from master
Please extend to a full sentence, and explain what the plugin can use
the callback for.
> +
> + @param param Observer common parameter
> + @param packet The event packet read from master
> + @param len Length of the event packet read from master
> + @param event_buf The event packet return after process
> + @param event_len The length of event packet return after process
!!!This is not symmetric with `reserve_header'. `reserve_header' can
modify the packet as it likes, whereas `after_read_event' can only
return a substring of the packet. I would suggest to make
`after_read_event' take a single `String *packet' as parameter, instead
of `packet, len, event_buf, event_len'.
I understand that it is more efficient to take the substring by pointer
arithmetic than by copying the string. However, that is an
implementation detail that should not be reflected in the interface. If
we need a more efficient plugin, we can later add functionality to the
String class that allows the start of the text to be different from the
beginning of the buffer.
See also my comment in slave.cc below, in the diff chunk
"@@ -2283,7 +2294,19 @@"
> +
> + @return 0 on success, 1 on failure
Same as for Trans_observer::after_commit: please specify server semantics.
> + */
> + int (*after_read_event)(Binlog_relay_IO_param *param,
> + const char *packet, ulong len,
> + const char **event_buf, ulong *event_len);
!!!Can the plugin modify param? If not, please make it const.
> +
> + /**
> + After written event to relay log
Please extend to a full sentence.
> +
> + @param param Observer common parameter
> + @param event_buf Event packet written to relay log
> + @param event_len Length of the event packet written to relay log
> + @param flags flags for relay log
> +
> + @return 0 on success, 1 on failure
Same as for Trans_observer::after_commit: please specify server semantics.
> + */
> + int (*after_queue_event)(Binlog_relay_IO_param *param,
> + const char *event_buf, ulong event_len,
> + uint32 flags);
!!!If param cannot be modified by the plugin, please make it const.
> +
> + /**
> + Reset slave relay log IO status
> +
> + @param param Observer common parameter
> +
> + @return 0 on success, 1 on failure
Same as for Trans_observer::after_commit: please specify server semantics.
> + */
> + int (*after_reset_slave)(Binlog_relay_IO_param *param);
!!!If param cannot be modified by the plugin, please make it const.
> +} Binlog_relay_IO_observer;
> +
> +#ifdef __cplusplus
> +extern "C" {
> +#endif
> +
> +/**
> + Register a transaction observer
> +
> + @param observer The transaction observer to register
> + @param p pointer to the internal plugin structure
> +
> + @return 0 if success, 1 if the observer already exists
> +*/
> +int register_trans_observer(Trans_observer *observer, void *p);
> +
> +/**
> + Unregister a transaction observer
> +
> + @param observer The transaction observer to unregister
> + @param p pointer to the internal plugin structure
> +
> + @return 0 if success, 1 if the observer not exists
> +*/
> +int unregister_trans_observer(Trans_observer *observer, void *p);
> +
> +/**
> + Register a binlog storage observer
> +
> + @param observer The binlog storage observer to register
> + @param p pointer to the internal plugin structure
> +
> + @return 0 if success, 1 if the observer already exists
> +*/
> +int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
> +
> +/**
> + Unregister a binlog storage observer
> +
> + @param observer The binlog storage observer to unregister
> + @param p pointer to the internal plugin structure
> +
> + @return 0 if success, 1 if the observer not exists
> +*/
> +int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
> +
> +/**
> + Register a binlog transmit observer
> +
> + @param observer The binlog transmit observer to register
> + @param p pointer to the internal plugin structure
> +
> + @return 0 if success, 1 if the observer already exists
> +*/
> +int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
> +
> +/**
> + Unregister a binlog transmit observer
> +
> + @param observer The binlog transmit observer to unregister
> + @param p pointer to the internal plugin structure
> +
> + @return 0 if success, 1 if the observer not exists
> +*/
> +int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void
> *p);
> +
> +/**
> + Register a binlog relay IO (slave IO thread) observer
> +
> + @param observer The binlog relay IO observer to register
> + @param p pointer to the internal plugin structure
> +
> + @return 0 if success, 1 if the observer already exists
> +*/
> +int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
> +
> +/**
> + Unregister a binlog relay IO (slave IO thread) observer
> +
> + @param observer The binlog relay IO observer to unregister
> + @param p pointer to the internal plugin structure
> +
> + @return 0 if success, 1 if the observer not exists
> +*/
> +int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
> *p);
> +
> +#ifdef __cplusplus
> +}
> +#endif
> +#endif /* REPLICATION_H */
>
> === added file 'sql/rpl_handler.cc'
> --- a/sql/rpl_handler.cc 1970-01-01 00:00:00 +0000
> +++ b/sql/rpl_handler.cc 2008-06-17 03:04:54 +0000
> @@ -0,0 +1,401 @@
> +/* Copyright (C) 2008 MySQL AB
> +
> + This program is free software; you can redistribute it and/or modify
> + it under the terms of the GNU General Public License as published by
> + the Free Software Foundation; version 2 of the License.
> +
> + This program is distributed in the hope that it will be useful,
> + but WITHOUT ANY WARRANTY; without even the implied warranty of
> + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + GNU General Public License for more details.
> +
> + You should have received a copy of the GNU General Public License
> + along with this program; if not, write to the Free Software
> + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
> +
> +#include "mysql_priv.h"
> +
> +#include "rpl_mi.h"
> +#include "sql_repl.h"
> +#include "log_event.h"
> +#include "rpl_filter.h"
> +#include <my_dir.h>
> +#include "rpl_handler.h"
> +
> +Trans_delegate transaction_delegate;
> +Binlog_storage_delegate binlog_storage_delegate;
> +#ifdef HAVE_REPLICATION
> +Binlog_transmit_delegate binlog_transmit_delegate;
> +Binlog_relay_IO_delegate binlog_relay_io_delegate;
> +#endif /* HAVE_REPLICATION */
> +
> +int thd_net_read(const unsigned char **packet, size_t *len)
> +{
> + THD *thd= current_thd;
> + ulong ret= my_net_read(&thd->net);
> + if (ret == packet_error)
> + return 1;
> + *len= ret;
> + *packet= thd->net.read_pos;
> + return 0;
> +}
> +
> +int thd_net_write(const unsigned char *packet, size_t len)
> +{
> + return my_net_write(¤t_thd->net, packet, len);
> +}
> +
> +int thd_net_flush()
> +{
> + return net_flush(¤t_thd->net);
> +}
> +
> +int mysql_net_read(MYSQL *mysql, const unsigned char **packet, size_t *len)
> +{
> + ulong ret= my_net_read(&mysql->net);
> + if (ret == packet_error)
> + return 1;
> + *len= ret;
> + *packet= mysql->net.read_pos;
> + return 0;
> +}
> +
> +int mysql_net_write(MYSQL *mysql, const unsigned char *packet, size_t len)
> +{
> + return my_net_write(&mysql->net, packet, len);
> +}
> +
> +int mysql_net_flush(MYSQL *mysql)
> +{
> + return net_flush(&mysql->net);
> +}
> +
> +int get_user_var_int(const char *name,
> + long long int *value, int *null_value)
> +{
> + my_bool null_val;
> + user_var_entry *entry=
> + (user_var_entry*) hash_search(¤t_thd->user_vars,
> + (uchar*) name, strlen(name));
> + if (!entry)
> + return 1;
> + *value= entry->val_int(&null_val);
> + if (null_value)
> + *null_value= null_val;
> + return 0;
> +}
> +
> +int get_user_var_real(const char *name,
> + double *value, int *null_value)
> +{
> + my_bool null_val;
> + user_var_entry *entry=
> + (user_var_entry*) hash_search(¤t_thd->user_vars,
> + (uchar*) name, strlen(name));
> + if (!entry)
> + return 1;
> + *value= entry->val_real(&null_val);
> + if (null_value)
> + *null_value= null_val;
> + return 0;
> +}
> +
> +int get_user_var_str(const char *name, char *value,
> + size_t len, unsigned int precision, int *null_value)
> +{
> + String str;
> + my_bool null_val;
> + user_var_entry *entry=
> + (user_var_entry*) hash_search(¤t_thd->user_vars,
> + (uchar*) name, strlen(name));
> + if (!entry)
> + return 1;
> + entry->val_str(&null_val, &str, precision);
> + strncpy(value, str.c_ptr(), len);
> + if (null_value)
> + *null_value= null_val;
> + return 0;
> +}
> +
> +int delegates_init()
> +{
> + if (transaction_delegate.init()
> + || binlog_storage_delegate.init()
> +#ifdef HAVE_REPLICATION
> + || binlog_transmit_delegate.init()
> + || binlog_relay_io_delegate.init()
> +#endif /* HAVE_REPLICATION */
> + )
> + return 1;
> + return 0;
> +}
> +
> +void delegates_destroy()
> +{
> + transaction_delegate.destroy();
> + binlog_storage_delegate.destroy();
> +#ifdef HAVE_REPLICATION
> + binlog_transmit_delegate.destroy();
> + binlog_relay_io_delegate.destroy();
> +#endif /* HAVE_REPLICATION */
> +}
> +
> +/*
> + NOTE2ME: I (hezx) am not sure if it's correct to add observer
> + plugins to the thd->lex list. Because afer each statement, all
> + plugins add to thd->lex will be automatically unlocked.
> + */
Please add a comment explaining what the macro does.
> +#define FOREACH_OBSERVER(f, thd, args...) \
!!!This syntax for variable-length argument lists is a gcc extension.
See http://gcc.gnu.org/onlinedocs/gcc/Variadic-Macros.html .
I'd suggest to replace this by a single argument, and let the caller
provide the parentheses (like in DBUG_PRINT). So the caller will do
FOREACH_OBSERVER(after_commit, thd, (¶m));
or
FOREACH_OBSERVER(after_flush, thd, (¶m, log_file, log_pos, flags));
> + param.server_id= thd->server_id; \
> + read_lock(); \
> + Observer_info_iterator iter= observer_info_iter(); \
> + Observer_info *info= iter++; \
> + for (; info; info= iter++) \
> + { \
> + plugin_ref plugin= \
> + my_plugin_lock(thd, &info->plugin); \
> + if (!plugin) \
> + { \
> + unlock(); \
> + return 1; \
> + } \
> + if (((Observer *)info->observer)->f \
> + && ((Observer *)info->observer)->f(args)) \
!!!If you follow my suggestion above regarding the variable-length
argument list, here you have to replace "f(args)" by "f args"
> + { \
> + unlock(); \
> + return 1; \
> + } \
> + } \
> + unlock(); \
> + return 0
In the macro definition above, please use spaces instead of tabs.
> +
> +
> +int Trans_delegate::after_commit(THD *thd, bool all)
> +{
> + Trans_param param;
> + if (all || thd->transaction.all.ha_list == 0)
> + param.flags |= TRANS_IS_REAL_TRANS;
> +
> + FOREACH_OBSERVER(after_commit, thd, ¶m);
> +}
> +
> +int Trans_delegate::after_rollback(THD *thd, bool all)
> +{
> + Trans_param param;
> + if (all || thd->transaction.all.ha_list == 0)
> + param.flags |= TRANS_IS_REAL_TRANS;
> +
> + FOREACH_OBSERVER(after_rollback, thd, ¶m);
> +}
> +
> +int Binlog_storage_delegate::after_flush(THD *thd,
> + const char *log_file,
> + my_off_t log_pos,
> + bool synced)
> +{
> + Binlog_storage_param param;
> + uint32 flags=0;
> + if (synced)
> + flags |= BINLOG_STORAGE_IS_SYNCED;
> +
> + FOREACH_OBSERVER(after_flush, thd, ¶m, log_file, log_pos, flags);
> +}
> +
> +#ifdef HAVE_REPLICATION
> +int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
> + const char *log_file,
> + my_off_t log_pos)
> +{
> + Binlog_transmit_param param;
> + param.flags= flags;
> +
> + FOREACH_OBSERVER(transmit_start, thd, ¶m, log_file, log_pos);
> +}
> +
> +int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
> +{
> + Binlog_transmit_param param;
> + param.flags= flags;
> +
> + FOREACH_OBSERVER(transmit_stop, thd, ¶m);
> +}
> +
> +int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
> + String *packet, ulong *len)
> +{
> + Binlog_transmit_param param;
> + param.flags= flags;
> +
> + /* reserve and set default header */
> + packet->length(0);
> + packet->set("\0", 1, &my_charset_bin);
> + *len= 1;
OK, I think it's a good idea to refactor so that `packet' is reset in
only one place in the code. However, I think the delegates should only
be responsible for plugin-related things, not for resetting the packet.
Can you do it in a separate function, so that:
- instead of calling RUN_HOOK(reserve_header), you call
dumpthread_begin_transmit_event()
- dumpthread_begin_transmit_event() resets packet and calls
RUN_HOOK(reserve_header)
!!!When you have done this, you can add the line `*len=
packet->length()' to dumpthread_begin_transmit_event(), and remove `len'
from the reserve_handler prototype.
> +
> + FOREACH_OBSERVER(reserve_header, thd, ¶m, packet, len);
> +}
> +
> +int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
> + String *packet,
> + const char *log_file,
> + my_off_t log_pos)
> +{
> + Binlog_transmit_param param;
> + param.flags= flags;
> +
> + FOREACH_OBSERVER(before_send_event, thd, ¶m, packet, log_file, log_pos);
> +}
> +
> +int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
> + const char *event_buf)
> +{
> + Binlog_transmit_param param;
> + param.flags= flags;
> +
> + FOREACH_OBSERVER(after_send_event, thd, ¶m, event_buf);
> +}
> +
> +int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
> +
> +{
> + Binlog_transmit_param param;
> + param.flags= flags;
> +
> + FOREACH_OBSERVER(after_reset_master, thd, ¶m);
> +}
> +
> +int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
> +{
> + Binlog_relay_IO_param param;
> + param.mysql= mi->mysql;
> + param.user= mi->user;
> + param.host= mi->host;
> + param.port= mi->port;
> + param.master_log_name= mi->master_log_name;
> + param.master_log_pos= mi->master_log_pos;
> +
> + FOREACH_OBSERVER(thread_start, thd, ¶m);
> +}
> +
> +int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
> +{
> +
> + Binlog_relay_IO_param param;
> + param.mysql= mi->mysql;
> + param.user= mi->user;
> + param.host= mi->host;
> + param.port= mi->port;
> + param.master_log_name= mi->master_log_name;
> + param.master_log_pos= mi->master_log_pos;
> +
> + FOREACH_OBSERVER(thread_stop, thd, ¶m);
> +}
> +
> +int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
> + Master_info *mi,
> + ushort flags)
> +{
> + Binlog_relay_IO_param param;
> + param.mysql= mi->mysql;
> + param.user= mi->user;
> + param.host= mi->host;
> + param.port= mi->port;
> + param.master_log_name= mi->master_log_name;
> + param.master_log_pos= mi->master_log_pos;
> +
> + FOREACH_OBSERVER(before_request_transmit, thd, ¶m, (uint32)flags);
> +}
> +
> +int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
> + const char *packet, ulong len,
> + const char **event_buf,
> + ulong *event_len)
> +{
> + Binlog_relay_IO_param param;
> + param.mysql= mi->mysql;
> + param.user= mi->user;
> + param.host= mi->host;
> + param.port= mi->port;
> + param.master_log_name= mi->master_log_name;
> + param.master_log_pos= mi->master_log_pos;
> +
> + FOREACH_OBSERVER(after_read_event, thd,
> + ¶m, packet, len, event_buf, event_len);
> +}
> +
> +int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
> + const char *event_buf,
> + ulong event_len,
> + bool synced)
> +{
> + Binlog_relay_IO_param param;
> + param.mysql= mi->mysql;
> + param.user= mi->user;
> + param.host= mi->host;
> + param.port= mi->port;
> + param.master_log_name= mi->master_log_name;
> + param.master_log_pos= mi->master_log_pos;
> +
> + uint32 flags=0;
> + if (synced)
> + flags |= BINLOG_STORAGE_IS_SYNCED;
> +
> + FOREACH_OBSERVER(after_queue_event, thd,
> + ¶m, event_buf, event_len, flags);
> +}
> +
> +int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
> +
> +{
> + Binlog_relay_IO_param param;
> + param.mysql= mi->mysql;
> + param.user= mi->user;
> + param.host= mi->host;
> + param.port= mi->port;
> + param.master_log_name= mi->master_log_name;
> + param.master_log_pos= mi->master_log_pos;
> +
> + FOREACH_OBSERVER(after_reset_slave, thd, ¶m);
> +}
> +#endif /* HAVE_REPLICATION */
> +
> +int register_trans_observer(Trans_observer *observer, void *p)
> +{
> + return transaction_delegate.add_observer(observer, (st_plugin_int *)p);
> +}
> +
> +int unregister_trans_observer(Trans_observer *observer, void *p)
> +{
> + return transaction_delegate.remove_observer(observer, (st_plugin_int *)p);
> +}
> +
> +int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
> +{
> + return binlog_storage_delegate.add_observer(observer, (st_plugin_int *)p);
> +}
> +
> +int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
> +{
> + return binlog_storage_delegate.remove_observer(observer, (st_plugin_int *)p);
> +}
> +
> +#ifdef HAVE_REPLICATION
> +int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
> +{
> + return binlog_transmit_delegate.add_observer(observer, (st_plugin_int *)p);
> +}
> +
> +int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void
> *p)
> +{
> + return binlog_transmit_delegate.remove_observer(observer, (st_plugin_int *)p);
> +}
> +
> +int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
> +{
> + return binlog_relay_io_delegate.add_observer(observer, (st_plugin_int *)p);
> +}
> +
> +int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
> *p)
> +{
> + return binlog_relay_io_delegate.remove_observer(observer, (st_plugin_int *)p);
> +}
> +#endif /* HAVE_REPLICATION */
>
> === added file 'sql/rpl_handler.h'
> --- a/sql/rpl_handler.h 1970-01-01 00:00:00 +0000
> +++ b/sql/rpl_handler.h 2008-06-17 03:04:54 +0000
> @@ -0,0 +1,195 @@
> +/* Copyright (C) 2008 MySQL AB
> +
> + This program is free software; you can redistribute it and/or modify
> + it under the terms of the GNU General Public License as published by
> + the Free Software Foundation; version 2 of the License.
> +
> + This program is distributed in the hope that it will be useful,
> + but WITHOUT ANY WARRANTY; without even the implied warranty of
> + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + GNU General Public License for more details.
> +
> + You should have received a copy of the GNU General Public License
> + along with this program; if not, write to the Free Software
> + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
> +
> +#ifndef RPL_HANDLER_H
> +#define RPL_HANDLER_H
> +
> +#include "mysql_priv.h"
> +#include "rpl_mi.h"
> +#include "rpl_rli.h"
> +#include "sql_plugin.h"
> +#include "replication.h"
> +
> +class Observer_info {
> +public:
> + void *observer;
> + st_plugin_int *plugin_int;
> + plugin_ref plugin;
> +
> + Observer_info(void *ob, st_plugin_int *p)
> + :observer(ob), plugin_int(p)
> + {
> +#ifdef DBUG_OFF
> + plugin= plugin_int;
> +#else
> + plugin= &plugin_int;
> +#endif
Yuck :-) OK, I read the source and figured out why you need this. It's
not your fault, but it breaks an important OO principle when the user of
the type plugin_ref relies on the implementation details of plugin_ref.
To avoid that, I suggest adding something like this to sql_plugin.h
(near 'typedef struct st_plugin_int...'):
#ifdef DBUG_OFF
#define plugin_ref_from_pointer(ptr) (ptr)
#else
#define plugin_ref_from_pointer(ptr) (&(ptr))
#endif
And replace the preprocessor conditional above by
plugin= plugin_ref_from_pointer(ptr);
> + }
> +};
> +
> +class Delegate {
> +public:
> + typedef List<Observer_info> Observer_info_list;
> + typedef List_iterator<Observer_info> Observer_info_iterator;
> +
> + int add_observer(void *observer, st_plugin_int *plugin)
> + {
> + int ret= FALSE;
> + write_lock();
> + Observer_info_iterator iter(observer_info_list);
> + Observer_info *info= iter++;
> + while (info && info->observer != observer)
> + info= iter++;
> + if (!info)
> + {
> + info= new Observer_info(observer, plugin);
!!!Please check that info is not NULL (ouf of memory).
> + observer_info_list.push_back(info, &memroot);
!!!Please check the return value of push_back (1=out of memory)
> + }
> + else
> + ret= TRUE;
> + unlock();
> + return ret;
> + }
> +
> + int remove_observer(void *observer, st_plugin_int *plugin)
> + {
> + int ret= FALSE;
> + write_lock();
> + Observer_info_iterator iter(observer_info_list);
> + Observer_info *info= iter++;
> + while (info && info->observer != observer)
> + info= iter++;
> + if (info)
> + iter.remove();
> + else
> + ret= TRUE;
> + unlock();
> + return ret;
> + }
> +
> + inline Observer_info_iterator observer_info_iter()
> + {
> + return Observer_info_iterator(observer_info_list);
> + }
> +
> + inline bool is_empty()
> + {
> + return observer_info_list.is_empty();
> + }
> +
> + inline int read_lock()
> + {
> + return rw_rdlock(&lock);
> + }
> +
> + inline int write_lock()
> + {
> + return rw_wrlock(&lock);
> + }
> +
> + inline int unlock()
> + {
> + return rw_unlock(&lock);
> + }
> +
> + int init()
> + {
> + if (my_rwlock_init(&lock, NULL))
> + return 1;
> + init_sql_alloc(&memroot, 1024, 0);
> + return 0;
> + }
> +
> + void destroy()
> + {
> + rwlock_destroy(&lock);
> + free_root(&memroot, MYF(0));
> + }
> +
> + Delegate(){}
> + ~Delegate() {}
> +
> +private:
> + Observer_info_list observer_info_list;
> + rw_lock_t lock;
> + MEM_ROOT memroot;
> +};
> +
> +class Trans_delegate
> + :public Delegate {
> +public:
> + typedef Trans_observer Observer;
> + int before_commit(THD *thd, bool all);
> + int before_rollback(THD *thd, bool all);
> + int after_commit(THD *thd, bool all);
> + int after_rollback(THD *thd, bool all);
> +};
> +
> +class Binlog_storage_delegate
> + :public Delegate {
> +public:
> + typedef Binlog_storage_observer Observer;
> + int after_flush(THD *thd, const char *log_file,
> + my_off_t log_pos, bool synced);
> +};
> +
> +#ifdef HAVE_REPLICATION
> +class Binlog_transmit_delegate
> + :public Delegate {
> +public:
> + typedef Binlog_transmit_observer Observer;
> + int transmit_start(THD *thd, ushort flags,
> + const char *log_file, my_off_t log_pos);
> + int transmit_stop(THD *thd, ushort flags);
> + int reserve_header(THD *thd, ushort flags, String *packet, ulong *len);
> + int before_send_event(THD *thd, ushort flags,
> + String *packet, const
> + char *log_file, my_off_t log_pos );
> + int after_send_event(THD *thd, ushort flags,
> + const char *event_buf);
> + int after_reset_master(THD *thd, ushort flags);
> +};
> +
> +class Binlog_relay_IO_delegate
> + :public Delegate {
> +public:
> + typedef Binlog_relay_IO_observer Observer;
> + int thread_start(THD *thd, Master_info *mi);
> + int thread_stop(THD *thd, Master_info *mi);
> + int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
> + int after_read_event(THD *thd, Master_info *mi,
> + const char *packet, ulong len,
> + const char **event_buf, ulong *event_len);
> + int after_queue_event(THD *thd, Master_info *mi,
> + const char *event_buf, ulong event_len,
> + bool synced);
> + int after_reset_slave(THD *thd, Master_info *mi);
> +};
> +#endif /* HAVE_REPLICATION */
> +
> +int delegates_init();
> +void delegates_destroy();
> +
> +extern Trans_delegate transaction_delegate;
> +extern Binlog_storage_delegate binlog_storage_delegate;
> +#ifdef HAVE_REPLICATION
> +extern Binlog_transmit_delegate binlog_transmit_delegate;
> +extern Binlog_relay_IO_delegate binlog_relay_io_delegate;
> +#endif /* HAVE_REPLICATION */
> +
> +#define RUN_HOOK(group, hook, args...) \
> + group ##_delegate.hook(args)
> +
> +#endif /* RPL_HANDLER_H */
>
> === modified file 'sql/slave.cc'
> --- a/sql/slave.cc 2008-04-20 12:25:54 +0000
> +++ b/sql/slave.cc 2008-06-17 03:04:54 +0000
> @@ -39,6 +39,7 @@
> #include <sql_common.h>
> #include <errmsg.h>
> #include <mysys_err.h>
> +#include "rpl_handler.h"
>
> #ifdef HAVE_REPLICATION
>
> @@ -1499,17 +1500,22 @@
> }
>
>
> -static int request_dump(MYSQL* mysql, Master_info* mi,
> - bool *suppress_warnings)
> +static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
> + bool *suppress_warnings)
> {
> uchar buf[FN_REFLEN + 10];
> int len;
> - int binlog_flags = 0; // for now
> + ushort binlog_flags = 0; // for now
> char* logname = mi->master_log_name;
> DBUG_ENTER("request_dump");
>
> *suppress_warnings= FALSE;
>
> + if (RUN_HOOK(binlog_relay_io,
> + before_request_transmit,
> + thd, mi, binlog_flags))
> + DBUG_RETURN(1);
> +
> // TODO if big log files: Change next to int8store()
> int4store(buf, (ulong) mi->master_log_pos);
> int2store(buf + 4, binlog_flags);
> @@ -2134,6 +2140,10 @@
> mi->master_log_name,
> llstr(mi->master_log_pos,llbuff)));
>
> +
> + if (RUN_HOOK(binlog_relay_io, thread_start, thd, mi))
> + goto err;
> +
> if (!(mi->mysql = mysql = mysql_init(NULL)))
> {
> mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
> @@ -2208,7 +2218,7 @@
> while (!io_slave_killed(thd,mi))
> {
> thd_proc_info(thd, "Requesting binlog dump");
> - if (request_dump(mysql, mi, &suppress_warnings))
> + if (request_dump(thd, mysql, mi, &suppress_warnings))
> {
> sql_print_error("Failed on request_dump()");
> if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
> @@ -2231,6 +2241,7 @@
>
> while (!io_slave_killed(thd,mi))
> {
> + const char* event_buf;
> ulong event_len;
> /*
> We say "waiting" because read_event() will wait if there's nothing to
> @@ -2283,7 +2294,19 @@
>
> retry_count=0; // ok event, reset retry counter
> thd_proc_info(thd, "Queueing master event to the relay log");
> - if (queue_event(mi,(const char*)mysql->net.read_pos + 1, event_len))
> + event_buf= (const char*)mysql->net.read_pos + 1;
> + if (RUN_HOOK(binlog_relay_io, after_read_event,
> + thd, mi,(const char*)mysql->net.read_pos + 1,
> + event_len, &event_buf, &event_len))
OK, so if you follow my suggestion above to use a single `String *'
instead of `packet, len, event_buf, event_len', then you'll have to
initialize the String here, so the four lines above become:
ev_string.set(mysql->net.read_pos + 1,
strlen(mysql->net.read_pos + 1), &my_charset_bin);
if (RUN_HOOK(binlog_relay_io, after_read_event, thd, mi, &ev_string))
and then you have to declare `String ev_string' at the beginning of the
function and use `ev_string' instead of `event_buf' below.
> + {
> + sql_print_error("Failed to run 'after_read_event' hook");
> + goto err;
> + }
> +
> + /* XXX: 'synced' should be updated by queue_event to indicate
> + whether event has been synced to disk */
> + bool synced= 0;
> + if (queue_event(mi, event_buf, event_len))
> {
> goto err;
> }
> @@ -2292,6 +2315,11 @@
> sql_print_error("Failed to flush master info file");
> goto err;
> }
> +
> + if (RUN_HOOK(binlog_relay_io, after_queue_event,
> + thd, mi, event_buf, event_len, synced))
> + goto err;
> +
> /*
> See if the relay logs take too much space.
> We don't lock mi->rli.log_space_lock here; this dirty read saves time
> @@ -2333,6 +2361,7 @@
> sql_print_information("Slave I/O thread exiting, read up to log '%s', position
> %s",
> IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
> VOID(pthread_mutex_lock(&LOCK_thread_count));
> + RUN_HOOK(binlog_relay_io, thread_stop, thd, mi);
!!!The return value from the callback is not taken into account.
> thd->query = thd->db = 0; // extra safety
> thd->query_length= thd->db_length= 0;
> VOID(pthread_mutex_unlock(&LOCK_thread_count));
>
> === modified file 'sql/sql_parse.cc'
> --- a/sql/sql_parse.cc 2008-04-16 07:53:20 +0000
> +++ b/sql/sql_parse.cc 2008-06-17 03:04:54 +0000
> @@ -21,6 +21,7 @@
> #include <m_ctype.h>
> #include <myisam.h>
> #include <my_dir.h>
> +#include "rpl_handler.h"
>
> #include "sp_head.h"
> #include "sp.h"
> @@ -1395,7 +1396,17 @@
>
> /* If commit fails, we should be able to reset the OK status. */
> thd->main_da.can_overwrite_status= TRUE;
> - ha_autocommit_or_rollback(thd, thd->is_error());
> + if (thd->transaction.stmt.ha_list)
> + {
> + ha_autocommit_or_rollback(thd, thd->is_error());
> + }
> + else
> + {
> + if (!thd->is_error())
> + RUN_HOOK(transaction, after_commit, thd, 0);
!!!The return value from the callback is not taken into account.
> + else
> + RUN_HOOK(transaction, after_rollback, thd, 0);
!!!The return value from the callback is not taken into account.
> + }
> thd->main_da.can_overwrite_status= FALSE;
>
> thd->transaction.stmt.reset();
>
> === modified file 'sql/sql_plugin.cc'
> --- a/sql/sql_plugin.cc 2008-04-14 10:15:04 +0000
> +++ b/sql/sql_plugin.cc 2008-06-17 03:04:54 +0000
> @@ -44,7 +44,8 @@
> { C_STRING_WITH_LEN("FTPARSER") },
> { C_STRING_WITH_LEN("DAEMON") },
> { C_STRING_WITH_LEN("INFORMATION SCHEMA") },
> - { C_STRING_WITH_LEN("AUDIT") }
> + { C_STRING_WITH_LEN("AUDIT") },
> + { C_STRING_WITH_LEN("REPLICATION") }
> };
>
> extern int initialize_schema_table(st_plugin_int *plugin);
> @@ -89,7 +90,8 @@
> MYSQL_FTPARSER_INTERFACE_VERSION,
> MYSQL_DAEMON_INTERFACE_VERSION,
> MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
> - MYSQL_AUDIT_INTERFACE_VERSION
> + MYSQL_AUDIT_INTERFACE_VERSION,
> + MYSQL_REPLICATION_INTERFACE_VERSION
> };
> static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
> {
> @@ -98,7 +100,8 @@
> MYSQL_FTPARSER_INTERFACE_VERSION,
> MYSQL_DAEMON_INTERFACE_VERSION,
> MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
> - MYSQL_AUDIT_INTERFACE_VERSION
> + MYSQL_AUDIT_INTERFACE_VERSION,
> + MYSQL_REPLICATION_INTERFACE_VERSION
> };
>
> static bool initialized= 0;
>
> === modified file 'sql/sql_repl.cc'
> --- a/sql/sql_repl.cc 2008-03-08 10:14:47 +0000
> +++ b/sql/sql_repl.cc 2008-06-17 03:04:54 +0000
> @@ -21,6 +21,7 @@
> #include "log_event.h"
> #include "rpl_filter.h"
> #include <my_dir.h>
> +#include "rpl_handler.h"
>
> int max_binlog_dump_events = 0; // unlimited
> my_bool opt_sporadic_binlog_dump_fail = 0;
> @@ -392,6 +393,9 @@
> LOG_INFO linfo;
> char *log_file_name = linfo.log_file_name;
> char search_file_name[FN_REFLEN], *name;
> +
> + ulong ev_offset;
> +
> IO_CACHE log;
> File file = -1;
> String* packet = &thd->packet;
> @@ -407,6 +411,10 @@
> DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
>
> bzero((char*) &log,sizeof(log));
> + sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
> + thd->server_id, log_ident, (ulong)pos);
> +
> + RUN_HOOK(binlog_transmit, transmit_start, thd, flags, log_ident, pos);
!!!The return value from the callback is not taken into account.
> /*
> heartbeat_period from @master_heartbeat_period user variable
> */
> @@ -477,11 +485,12 @@
> goto err;
> }
>
> - /*
> - We need to start a packet with something other than 255
> - to distinguish it from error
> - */
> - packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet
> */
> + if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet,
> &ev_offset))
> + {
> + errmsg= "Failed to run hook 'reserve_header'";
> + my_errno= ER_UNKNOWN_ERROR;
!!!I'd suggest to define an ER_PLUGIN_ERROR in sql/share/errmsg.txt and
use that instead of ER_UNKNOWN_ERROR.
> + goto err;
> + }
>
> /*
> Tell the client about the log name with a fake Rotate event;
> @@ -521,7 +530,13 @@
> my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
> goto err;
> }
> - packet->set("\0", 1, &my_charset_bin);
> + if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet,
> &ev_offset))
> + {
> + errmsg= "Failed to run hook 'reserve_header'";
> + my_errno= ER_UNKNOWN_ERROR;
!!!As above, please define and use ER_PLUGIN_ERROR.
> + goto err;
> + }
> +
> /*
> Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
> this larger than the corresponding packet (query) sent
> @@ -545,28 +560,28 @@
> {
> /*
> The packet has offsets equal to the normal offsets in a binlog
> - event +1 (the first character is \0).
> + event + ev_offset (the first character is \0).
> */
> DBUG_PRINT("info",
> ("Looked for a Format_description_log_event, found event type
> %d",
> - (*packet)[EVENT_TYPE_OFFSET+1]));
> - if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
> + (*packet)[EVENT_TYPE_OFFSET+ev_offset]));
> + if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
> {
> - binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
> + binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
> LOG_EVENT_BINLOG_IN_USE_F);
> - (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
> + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
> /*
> mark that this event with "log_pos=0", so the slave
> should not increment master's binlog position
> (rli->group_master_log_pos)
> */
> - int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
> + int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
> /*
> if reconnect master sends FD event with `created' as 0
> to avoid destroying temp tables.
> */
> int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
> - ST_CREATED_OFFSET+1, (ulong) 0);
> + ST_CREATED_OFFSET+ev_offset, (ulong) 0);
> /* send it */
> if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
> {
> @@ -593,7 +608,12 @@
> */
> }
> /* reset the packet as we wrote to it in any case */
> - packet->set("\0", 1, &my_charset_bin);
> + if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet,
> &ev_offset))
> + {
> + errmsg= "Failed to run hook 'reserve_header'";
> + my_errno= ER_UNKNOWN_ERROR;
!!!As above, please define and use ER_PLUGIN_ERROR.
> + goto err;
> + }
> } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
> else
> {
> @@ -605,6 +625,8 @@
>
> while (!net->error && net->vio != 0 && !thd->killed)
> {
> + Log_event_type event_type= UNKNOWN_EVENT;
> +
> while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
> {
> #ifndef DBUG_OFF
> @@ -620,17 +642,25 @@
> log's filename does not change while it's active
> */
> if (coord)
> - coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
> + coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
>
> - if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
> + event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
> + if (event_type == FORMAT_DESCRIPTION_EVENT)
> {
> - binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
> + binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
> LOG_EVENT_BINLOG_IN_USE_F);
> - (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
> + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
> }
> - else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
> + else if (event_type == STOP_EVENT)
> binlog_can_be_corrupted= FALSE;
>
> + pos = my_b_tell(&log);
> + if (RUN_HOOK(binlog_transmit, before_send_event,
> + thd, flags, packet, log_file_name, pos))
> + {
> + errmsg= "run 'before_send_event' hook failed";
!!!Please set my_errno
> + goto err;
> + }
> if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
> {
> errmsg = "Failed on my_net_write()";
> @@ -638,9 +668,8 @@
> goto err;
> }
>
> - DBUG_PRINT("info", ("log event code %d",
> - (*packet)[LOG_EVENT_OFFSET+1] ));
> - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
> + DBUG_PRINT("info", ("log event code %d", event_type));
> + if (event_type == LOAD_EVENT)
> {
> if (send_file(thd))
> {
> @@ -649,7 +678,20 @@
> goto err;
> }
> }
> - packet->set("\0", 1, &my_charset_bin);
> +
> + if (RUN_HOOK(binlog_transmit, after_send_event, thd, flags,
> packet->ptr()))
> + {
> + errmsg= "Failed to run hook 'after_send_event'";
> + my_errno= ER_UNKNOWN_ERROR;
!!!As above, please define and use ER_PLUGIN_ERROR.
> + goto err;
> + }
> +
> + if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet,
> &ev_offset))
> + {
> + errmsg= "Failed to run hook 'reserve_header'";
> + my_errno= ER_UNKNOWN_ERROR;
!!!As above, please define and use ER_PLUGIN_ERROR.
> + goto err;
> + }
> }
>
> /*
> @@ -779,7 +821,15 @@
>
> if (read_packet)
> {
> - thd_proc_info(thd, "Sending binlog event to slave");
> + thd_proc_info(thd, "Sending binlog event to slave");
> + pos = my_b_tell(&log);
> + if (RUN_HOOK(binlog_transmit, before_send_event,
> + thd, flags, packet, log_file_name, pos))
> + {
> + errmsg= "run 'before_send_event' hook failed";
!!!Please set my_errno
> + goto err;
> + }
> +
> if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
> {
> errmsg = "Failed on my_net_write()";
> @@ -787,7 +837,7 @@
> goto err;
> }
>
> - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
> + if (event_type == LOAD_EVENT)
> {
> if (send_file(thd))
> {
> @@ -796,11 +846,18 @@
> goto err;
> }
> }
> - packet->set("\0", 1, &my_charset_bin);
> - /*
> - No need to net_flush because we will get to flush later when
> - we hit EOF pretty quick
> - */
> +
> + if (RUN_HOOK(binlog_transmit, after_send_event, thd, flags, packet->ptr()))
> + {
> + errmsg= "Failed to run hook 'after_send_event'";
!!!Please set my_errno
> + goto err;
> + }
> +
> + if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet,
> &ev_offset))
> + {
> + errmsg= "Failed to run hook 'reserve_header'";
!!!Please set my_errno
> + goto err;
> + }
> }
>
> if (fatal_error)
> @@ -853,8 +910,12 @@
> goto err;
> }
>
> - packet->length(0);
> - packet->append('\0');
> + if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet,
> &ev_offset))
> + {
> + errmsg= "Failed to run hook 'reserve_header'";
> + my_errno= ER_UNKNOWN_ERROR;
!!!As above, please define and use ER_PLUGIN_ERROR.
> + goto err;
> + }
> if (coord)
> coord->file_name= log_file_name; // reset to the next
> }
> @@ -864,6 +925,7 @@
> end_io_cache(&log);
> (void)my_close(file, MYF(MY_WME));
>
> + RUN_HOOK(binlog_transmit, transmit_stop, thd, flags);
!!!The return value from the callback is not taken into account.
> my_eof(thd);
> thd_proc_info(thd, "Waiting to finalize termination");
> pthread_mutex_lock(&LOCK_thread_count);
> @@ -874,6 +936,7 @@
> err:
> thd_proc_info(thd, "Waiting to finalize termination");
> end_io_cache(&log);
> + RUN_HOOK(binlog_transmit, transmit_stop, thd, flags);
!!!The return value from the callback is not taken into account.
> /*
> Exclude iteration through thread list
> this is needed for purge_logs() - it will iterate through
> @@ -1134,6 +1197,7 @@
> goto err;
> }
>
> + RUN_HOOK(binlog_relay_io, after_reset_slave, thd, mi);
!!!The return value from the callback is not taken into account.
> err:
> unlock_slave_threads(mi);
> if (error)
> @@ -1416,7 +1480,11 @@
> ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
> return 1;
> }
> - return mysql_bin_log.reset_logs(thd);
> +
> + if (mysql_bin_log.reset_logs(thd))
> + return 1;
> + RUN_HOOK(binlog_transmit, after_reset_master, thd, 0 /* flags */);
!!!The return value from the callback is not taken into account.
> + return 0;
> }
>
> int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
> @@ -1923,5 +1991,3 @@
> }
>
> #endif /* HAVE_REPLICATION */
> -
> -
>
>
>
--
Sven Sandberg, Software Engineer
MySQL AB, www.mysql.com