Hi Sven
On 2008-06-24 Tue 18:24 +020, Sven Sandberg wrote:
> Hi Jason!
>
> Thank you for this big piece of work. Generally, the interface is very
> good. In long term, I think it will increase code robustness of
> replication. The code is of high quality and it follows object oriented
> principles. It is also quite well documented.
>
> I have a few suggestions for improvements. I have been quite detailed,
> so there are many, but most of them are small and easily fixable. Since
> I understand it is important to get this pushed soon, I marked more
> important comments with !!!. I think those have to be addressed before
> the first push, and if the schedule is tight, you can address the other
> comments later.
>
>
> First, some general notes:
>
> !!!==== Error handling ====
>
> The patch handles errors returned from plugin callback inconsistently.
> In the patch, all errors stop other hooks for the same call from being
> invoked. However, after that, some errors are ignored (e.g.,
> transaction_delegate.after_commit), some are reported to the user (e.g.,
> binlog_relay_io), and some affect the server (e.g., errors in
> binlog_transmit stops the dump thread in most cases).
>
> We should have a consistent design for this. Two questions we need to
> settle:
>
> - Should the plugin report errors to the user itself, or should it
> only pass an error code to the server and the server reports it to the
> user? If possible, it would be good if the plugin could report the error
> (since it can give a plugin-specific message), but I don't know the
> plugin system well enough to know if that is possible.
>
I think plugins should only report error to the server, then it is the
server's decision whether the error should be reported to user or not.
> - What should the server do after an error? I imagine plugins may want
> to report different levels of errors, depending on the severity. I can
> imagine the following severity levels:
> 0 - no error
> 1 - the server only reports an error and continues
> 2 - the server discards the currently processed event and does not
> invoke any more plugins for the currently processed event.
> 3 - the server stops the current thread and does not invoke any more
> plugins for the currently processed event (i.e., client/dump/io/sql thread).
> I suggest we create an enum value which describes the actions that the
> server can take. Then, each callback function returns that enumeration
> type instead of 0/1.
>
I agree with your suggestion, and that's why the return value is 'int'
instead of 'bool'. But since this interface if targeted for semi-sync
and as a start point for a universal replication interface, I tried to
keep the interface as simple as possible, so I'd like to keep the error
code as it is for now and will consider your suggestion for the
universal replication interface.
> ==== Tabs ====
>
> Please do not use tabs in source code. This patch contains a few tabs.
> Hint: use the `expand' program to convert to spaces, and add
> https://intranet.mysql.com/secure/paste/displaypaste.php?codeid=4565 to
> ~/.emacs
>
>
> More comments inline.
>
>
>
> He Zhenxing wrote:
> > Hi,
> >
> > Here is the patch for code review of the replication inteface of
> > WL#4398.
> >
> > Best regards!
> >
> >
> >
> > ------------------------------------------------------------------------
> >
> > Subject:
> > bzr commit into mysql-6.0-semi-sync-1.0 tree (hezx:2635) WL#4398
> > From:
> > He Zhenxing <hezx@stripped>
> > Date:
> > Tue, 17 Jun 2008 11:05:49 +0800
> > To:
> > commits@stripped
> >
> > To:
> > commits@stripped
> >
> >
> > #At file:///media/sda3/work/mysql/bzrwork/semisync/mysql-6.0-semi-sync-1.0/
> >
> > ------------------------------------------------------------
> > revno: 2635
> > revision-id: hezx@stripped
> > parent: sp1r-kostja@bodhi.(none)-20080420124250-11357
> > committer: He Zhenxing <hezx@stripped>
> > branch nick: mysql-6.0-semi-sync-1.0
> > timestamp: 二 2008-06-17 11:04:54 +0800
> > message:
> > WL#4398 Replication Interface for semi-synchronous replication
> >
> > Add replication interface and plugin support
> > added:
> > sql/replication.h
> replication.h-20080520140908-iazlmzffc1xcnz71-2
> > sql/rpl_handler.cc
> rpl_handler.cc-20080520140908-iazlmzffc1xcnz71-1
> > sql/rpl_handler.h
> rpl_handler.h-20080520140902-36338uea7oevomai-1
> > modified:
> > include/mysql/plugin.h
> sp1f-plugin.h-20051105112032-xacmvx22ghtcgtqhu6v56b4bneqtx7l5
> > libmysqld/Makefile.am
> sp1f-makefile.am-20010411110351-26htpk3ynkyh7pkfvnshztqrxx3few4g
> > sql/Makefile.am
> sp1f-makefile.am-19700101030959-xsjdiakci3nqcdd4xl4yomwdl5eo2f3q
> > sql/handler.cc
> sp1f-handler.cc-19700101030959-ta6zfrlbxzucylciyro3musjsdpocrdh
> > sql/log.cc
> sp1f-log.cc-19700101030959-r3hdfovek4kl6nd64ovoaknmirota6bq
> > sql/log.h
> sp1f-log.h-20051222053446-ggv6hdi5fnxggnjemezvv7n2bcbkx45e
> > sql/mysqld.cc
> sp1f-mysqld.cc-19700101030959-zpswdvekpvixxzxf7gdtofzel7nywtfj
> > sql/slave.cc
> sp1f-slave.cc-19700101030959-a636aj3mjxgu7fnznrg5kt77p3u2bvhh
> > sql/sql_parse.cc
> sp1f-sql_parse.cc-19700101030959-ehcre3rwhv5l3mlxqhaxg36ujenxnrcd
> > sql/sql_plugin.cc
> sp1f-sql_plugin.cc-20051105112032-hrm64p6xfjq33ud6zy3uivpo7azm75a2
> > sql/sql_repl.cc
> sp1f-sql_repl.cc-20001002032713-xqbns5ofqsaebhgi2ypcfn7nhz7nh5rp
> > per-file comments:
> > include/mysql/plugin.h
> > Add replication plugin support and some API functions
> > sql/Makefile.am
> > Add replication interface support
> > sql/handler.cc
> > Add replication interface support
> > sql/log.cc
> > Add replication interface support
> > sql/mysqld.cc
> > Add replication interface support
> > sql/replication.h
> > Add replication interface support
> > sql/rpl_handler.cc
> > Add replication interface support
> > sql/rpl_handler.h
> > Add replication interface support
> > sql/slave.cc
> > Add replication interface support
> > sql/sql_parse.cc
> > Add replication interface support
> > sql/sql_plugin.cc
> > Add replication plugin support
> > sql/sql_repl.cc
> > Add replication interface support
> >
> >
> > ------------------------------------------------------------------------
> >
> >
> >
> > ------------------------------------------------------------------------
> >
> > === modified file 'include/mysql/plugin.h'
> > --- a/include/mysql/plugin.h 2008-03-27 19:02:15 +0000
> > +++ b/include/mysql/plugin.h 2008-06-17 03:04:54 +0000
> > @@ -16,6 +16,11 @@
> > #ifndef _my_plugin_h
> > #define _my_plugin_h
> >
> > +/* size_t */
> > +#include <stdlib.h>
> > +
> > +typedef struct st_mysql MYSQL;
> > +
> > #ifdef __cplusplus
> > class THD;
> > class Item;
> > @@ -66,7 +71,8 @@
> > #define MYSQL_DAEMON_PLUGIN 3 /* The daemon/raw plugin type */
> > #define MYSQL_INFORMATION_SCHEMA_PLUGIN 4 /* The I_S plugin type */
> > #define MYSQL_AUDIT_PLUGIN 5 /* The Audit plugin type */
> > -#define MYSQL_MAX_PLUGIN_TYPE_NUM 6 /* The number of plugin types */
> > +#define MYSQL_REPLICATION_PLUGIN 6 /* The replication plugin type */
> > +#define MYSQL_MAX_PLUGIN_TYPE_NUM 7 /* The number of plugin types */
> >
> > /* We use the following strings to define licenses for plugins */
> > #define PLUGIN_LICENSE_PROPRIETARY 0
> > @@ -461,6 +467,19 @@
> >
> >
> > /*************************************************************************
> > + API for Replication plugin. (MYSQL_REPLICATION_PLUGIN)
> > +*/
> > +#define MYSQL_REPLICATION_INTERFACE_VERSION 0x0100
> > +
> > +/**
> > + Replication plugin descriptor
> > +*/
> > +struct st_mysql_replication {
> > + int interface_version;
> > +};
> > +
> > +
> > +/*************************************************************************
> > st_mysql_value struct for reading values from mysqld.
> > Used by server variables framework to parse user-provided values.
> > Will be used for arguments when implementing UDFs.
> > @@ -610,6 +629,103 @@
> > const char *key, unsigned int key_length,
> > int using_trx);
> >
> > +/**
> > + Read a packet from the thread connection.
>
> I think it would be more clear to say "the calling thread's connection"
>
or 'current' thread
> > +
> > + @param packet return the pointer to the packet read
> > + @param len return the length of packet read
>
> Please be more specific on who owns the memory `*packet' and `len' point
> to: the memory pointed to by `packet' belongs to the current THD object
> of the current thread and will be freed automatically by the thread
> handling system. The caller should make sure that `len' points to valid
> memory, and the function will overwrite that memory.
>
yes, good point
> > +
> > + @return 0 on success, 1 on failure.
> > +*/
> > +int thd_net_read(const unsigned char **packet, size_t *len);
> > +
> > +/**
> > + Write a packet to the thread connection.
> > +
> > + @param packet packet to write to the connection
> > + @param len length of the packet
> > +
> > + @return 0 on success, 1 on failure
> > +*/
> > +int thd_net_write(const unsigned char *packet, size_t len);
> > +
> > +/**
> > + Flush write buffer of thread connection.
> > +
> > + @return 0 on success, 1 on failure
> > +*/
> > +int thd_net_flush();
> > +
> > +/**
> > + Read a packet from the connection.
> > +
> > + @param mysql mysql client connection
> > + @param packet return the pointer to the packet read
> > + @param len return the length of packet read
>
> Same as for thd_net_read above.
>
> > + @return 0 on success, 1 on failure.
> > +*/
> > +int mysql_net_read(MYSQL *mysql, const unsigned char **packet, size_t *len);
> > +
> > +/**
> > + Write a packet to the connection.
> > +
> > + @param mysql mysql client connection
> > + @param packet packet to write to the connection
> > + @param len length of the packet
> > + @return 0 on success, 1 on failure
> > +*/
> > +int mysql_net_write(MYSQL *mysql, const unsigned char *packet, size_t len);
> > +
> > +/**
> > + Flush write buffer of connection.
> > +
> > + @param mysql mysql client connection
> > +
> > + @return 0 on success, 1 on failure
> > +*/
> > +int mysql_net_flush(MYSQL *mysql);
> > +
> > +/**
> > + Get the value of user variable as an integer.
> > +
> > + @param name user variable name
> > + @param value pointer to return the value
> > + @param null_value true if the value of variable is null, false if not
>
> For value and null_value, please state that the caller has to provide
> pointers to valid memory (the function will not allocate it). For
> null_value, please say something like "the function will set *null_value
> to true if ...", to clarify that it is not the caller that sets *null_value.
>
good
> Please specify what happens if the variable is not of integer type.
>
OK, I'll add some description to clarify this
> > +
> > + @return 0 success, 1 variable not found
> > +*/
> > +int get_user_var_int(const char *name,
> > + long long int *value, int *null_value);
> > +
> > +/**
> > + Get the value of user variable as a double precision float number.
> > +
> > + @param name user variable name
> > + @param value pointer to return the value
> > + @param null_value true if the value of variable is null, false if not
>
> Same as for get_user_var_int.
>
> > +
> > + @return 0 success, 1 variable not found
> > +*/
> > +int get_user_var_real(const char *name,
> > + double *value, int *null_value);
> > +
> > +/**
> > + Get the value of user variable as a string.
>
> Please state that if the variable holds a numeric value, it will be
> converted to a string representation.
>
OK, more description to clarify
> > +
> > + @param name user variable name
> > + @param value pointer to the value buffer
> > + @param len length of the value buffer
> > + @param precision precision of the value if it is a float number
> > + @param null_value true if the value of variable is null, false if not
>
> Same as for get_user_var_int.
>
> > +
> > + @return 0 success, 1 variable not found
> > +*/
> > +int get_user_var_str(const char *name,
> > + char *value, unsigned long len,
> > + unsigned int precision, int *null_value);
> > +
> > +
> > #ifdef __cplusplus
> > }
> > #endif
> >
> > === modified file 'libmysqld/Makefile.am'
> > --- a/libmysqld/Makefile.am 2008-03-27 22:47:31 +0000
> > +++ b/libmysqld/Makefile.am 2008-06-17 03:04:54 +0000
> > @@ -82,7 +82,8 @@
> > rpl_filter.cc sql_partition.cc sql_builtin.cc sql_plugin.cc \
> > sql_tablespace.cc \
> > rpl_injector.cc my_user.c partition_info.cc \
> > - sql_servers.cc ddl_blocker.cc si_objects.cc sql_audit.cc
> > + sql_servers.cc ddl_blocker.cc si_objects.cc sql_audit.cc \
> > + rpl_handler.cc
> >
> > libmysqld_int_a_SOURCES= $(libmysqld_sources)
> > nodist_libmysqld_int_a_SOURCES= $(libmysqlsources) $(sqlsources)
> >
> > === modified file 'sql/Makefile.am'
> > --- a/sql/Makefile.am 2008-03-27 22:47:31 +0000
> > +++ b/sql/Makefile.am 2008-06-17 03:04:54 +0000
> > @@ -88,7 +88,8 @@
> > event_data_objects.h event_scheduler.h \
> > sql_partition.h partition_info.h partition_element.h \
> > probes.h sql_audit.h \
> > - contributors.h sql_servers.h ddl_blocker.h si_objects.h
> > + contributors.h sql_servers.h ddl_blocker.h si_objects.h \
> > + rpl_handler.h
> >
> > mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
> > item.cc item_sum.cc item_buff.cc item_func.cc \
> > @@ -133,7 +134,7 @@
> > sql_plugin.cc sql_binlog.cc \
> > sql_builtin.cc sql_tablespace.cc partition_info.cc \
> > sql_servers.cc sql_audit.cc sha2.cc \
> > - ddl_blocker.cc si_objects.cc
> > + ddl_blocker.cc si_objects.cc rpl_handler.cc
> >
> > if HAVE_DTRACE
> > mysqld_SOURCES += probes.d
> >
> > === modified file 'sql/handler.cc'
> > --- a/sql/handler.cc 2008-04-07 15:32:14 +0000
> > +++ b/sql/handler.cc 2008-06-17 03:04:54 +0000
> > @@ -33,6 +33,8 @@
> > #include "ha_partition.h"
> > #endif
> >
> > +#include "rpl_handler.h"
> > +
> > /*
> > While we have legacy_db_type, we have this array to
> > check for dups and to find handlerton from legacy_db_type.
> > @@ -1139,6 +1141,7 @@
> > if (cookie)
> > tc_log->unlog(cookie, xid);
> > DBUG_EXECUTE_IF("crash_commit_after", abort(););
> > + RUN_HOOK(transaction, after_commit, thd, all);
>
> !!!The return value from the callback is not taken into account.
>
Actually, this is done on purpose, because the transaction has already
been committed, the return value of this callback should not affect the
execution of the server. I am not quit sure if this is a good choice or
not.
> > end:
> > if (is_real_trans)
> > start_waiting_global_read_lock(thd);
> > @@ -1269,6 +1272,7 @@
> > push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
> > ER_WARNING_NOT_COMPLETE_ROLLBACK,
> > ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
> > + RUN_HOOK(transaction, after_rollback, thd, all);
>
> !!!The return value from the callback is not taken into account.
>
same as above
> > DBUG_RETURN(error);
> > }
> >
> >
> > === modified file 'sql/log.cc'
> > --- a/sql/log.cc 2008-04-14 10:15:04 +0000
> > +++ b/sql/log.cc 2008-06-17 03:04:54 +0000
> > @@ -39,6 +39,7 @@
> > #endif
> >
> > #include <mysql/plugin.h>
> > +#include "rpl_handler.h"
> >
> > /* max size of the log message */
> > #define MAX_LOG_BUFFER_SIZE 1024
> > @@ -3520,16 +3521,18 @@
> > }
> >
> >
> > -bool MYSQL_BIN_LOG::flush_and_sync()
> > +bool MYSQL_BIN_LOG::flush_and_sync(bool *synced)
> > {
> > int err=0, fd=log_file.file;
> > safe_mutex_assert_owner(&LOCK_log);
> > if (flush_io_cache(&log_file))
> > return 1;
> > + *synced= 0;
>
> !!! I think *synced= 0 should be moved to above the `if', so that
> *synced gets the correct value even if flush_io_cache() fails.
>
right, good catch.
> Also, I would suggest to allow `synced' to be NULL, in which case it
> will not be updated. This will be useful since many calls to
> flush_and_sync() ignore the value of `synced'.
>
> > if (++sync_binlog_counter >= sync_binlog_period &&
> sync_binlog_period)
> > {
> > sync_binlog_counter= 0;
> > err=my_sync(fd, MYF(MY_WME));
> > + *synced= 1;
> > }
> > return err;
> > }
> > @@ -3794,7 +3797,8 @@
> >
> > if (file == &log_file)
> > {
> > - error= flush_and_sync();
> > + bool synced;
> > + error= flush_and_sync(&synced);
> > if (!error)
> > {
> > signal_update();
> > @@ -3981,14 +3985,21 @@
> > if (event_info->write(file))
> > goto err;
> >
> > + error=0;
> > if (file == &log_file) // we are writing to the real log (disk)
> > {
> > - if (flush_and_sync())
> > + bool synced;
> > + if (flush_and_sync(&synced))
> > goto err;
> > +
> > + if (RUN_HOOK(binlog_storage, after_flush,
> > + thd, log_file_name, file->pos_in_file, synced)) {
> > + goto err;
> > + }
> > +
> > signal_update();
> > rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
> > }
> > - error=0;
> >
> > err:
> > if (error)
> > @@ -4244,8 +4255,9 @@
> >
> > DBUG_ASSERT(carry == 0);
> >
> > + bool synced=0;
> > if (sync_log)
> > - flush_and_sync();
> > + flush_and_sync(&synced);
> >
> > return 0; // All OK
> > }
> > @@ -4331,7 +4343,9 @@
> >
> > if (commit_event && commit_event->write(&log_file))
> > goto err;
> > - if (flush_and_sync())
> > +
> > + bool synced;
> > + if (flush_and_sync(&synced))
> > goto err;
> > DBUG_EXECUTE_IF("half_binlogged_transaction", abort(););
> > if (cache->error) // Error on read
> > @@ -4340,6 +4354,11 @@
> > write_error=1; // Don't give more errors
> > goto err;
> > }
> > +
> > + if (RUN_HOOK(binlog_storage, after_flush,
> > + thd, log_file_name, log_file.pos_in_file, synced))
> > + goto err;
> > +
> > signal_update();
> > }
> >
> >
> > === modified file 'sql/log.h'
> > --- a/sql/log.h 2008-02-03 09:00:49 +0000
> > +++ b/sql/log.h 2008-06-17 03:04:54 +0000
> > @@ -364,7 +364,7 @@
> > bool is_active(const char* log_file_name);
> > int update_log_index(LOG_INFO* linfo, bool need_update_threads);
> > void rotate_and_purge(uint flags);
> > - bool flush_and_sync();
> > + bool flush_and_sync(bool *synced);
> > int purge_logs(const char *to_log, bool included,
> > bool need_mutex, bool need_update_threads,
> > ulonglong *decrease_log_space);
> >
> > === modified file 'sql/mysqld.cc'
> > --- a/sql/mysqld.cc 2008-04-20 08:06:51 +0000
> > +++ b/sql/mysqld.cc 2008-06-17 03:04:54 +0000
> > @@ -33,6 +33,8 @@
> >
> > #include "rpl_injector.h"
> >
> > +#include "rpl_handler.h"
> > +
> > #ifdef HAVE_SYS_PRCTL_H
> > #include <sys/prctl.h>
> > #endif
> > @@ -1322,6 +1324,7 @@
> > ha_end();
> > if (tc_log)
> > tc_log->close();
> > + delegates_destroy();
> > xid_cache_free();
> > delete_elements(&key_caches, (void (*)(const char*, uchar*))
> free_key_cache);
> > multi_keycache_free();
> > @@ -3827,6 +3830,13 @@
> > unireg_abort(1);
> > }
> >
> > + /* initialize delegates for extension observers */
> > + if (delegates_init())
> > + {
> > + sql_print_error("Initialize extension delegates failed");
> > + unireg_abort(1);
> > + }
> > +
> > /* need to configure logging before initializing storage engines */
> > if (opt_update_log)
> > {
> >
> > === added file 'sql/replication.h'
> > --- a/sql/replication.h 1970-01-01 00:00:00 +0000
> > +++ b/sql/replication.h 2008-06-17 03:04:54 +0000
> > @@ -0,0 +1,385 @@
> > +/* Copyright (C) 2008 MySQL AB
> > +
> > + This program is free software; you can redistribute it and/or modify
> > + it under the terms of the GNU General Public License as published by
> > + the Free Software Foundation; version 2 of the License.
> > +
> > + This program is distributed in the hope that it will be useful,
> > + but WITHOUT ANY WARRANTY; without even the implied warranty of
> > + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> > + GNU General Public License for more details.
> > +
> > + You should have received a copy of the GNU General Public License
> > + along with this program; if not, write to the Free Software
> > + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
> */
> > +
> > +#ifndef REPLICATION_H
> > +#define REPLICATION_H
> > +
> > +/**
> > + Transaction observer flags.
> > +*/
> > +enum Trans_flags {
> > + /** Transaction is a real transaction */
> > + TRANS_IS_REAL_TRANS = 1
> > +};
> > +
> > +/**
> > + Transaction observer parameter
> > +*/
> > +typedef struct Trans_param {
> > + uint32 server_id;
> > + uint32 flags;
> > +} Trans_param;
> > +
> > +/**
> > + Observes and extends transaction execution
> > +*/
> > +typedef struct Trans_observer {
> > + uint32 len;
> > +
> > + /**
> > + After transaction commit to storage engines
> > +
> > + This callback is called right after commit to storage engines for
> > + transactional tables.
>
> Note that the first line of a doxygen comment will be used as a brief
> description of the function, so it should look like a complete sentence
> rather than a header. I think it would be more clear to merge the above
> two sentences to something like "Callback that will be invoked after
> each transaction commits to storage engines" or something similar.
>
I think you're right, this and the brief descriptions of callback
functions below will be changed.
> > +
> > + For non-transactional tables, this is called at the end of the
> > + statement, before sending statement status, if the statement
> > + succeeded.
> > +
> > + @param param The parameter for transaction observers
> > +
> > + @return 0 on sucess, 1 on failure
>
> Please say what the semantics is for success and failure. I.e., how does
> the server behave on failure? Does it rollback? Does it print an error
> message?
>
yes, I think it good to state how the return value affects the flow of
the server.
> > + */
> > + int (*after_commit)(Trans_param *param);
>
> Can the plugin modify param? If not, please make it const.
>
Maybe in the future, e.g. we can add a void *data member to param, and
developer of plugins can use this to pass around some data structure of
their own.
> > +
> > + /**
> > + After transaction rollback to storage engines
> > +
> > + This callback is called right after rollback to storage engines
> > + for transactional tables.
>
> Same as for Trans_observer::after_commit: please merge these two
> paragraph so the first line becomes a full sentence.
>
OK
> > +
> > + For non-transactional tables, this is called at the end of the
> > + statement, before sending statement status, if the statement
> > + failed.
> > +
> > + @param param The parameter for transaction observers
> > +
> > + @return 0 on sucess, 1 on failure
>
> Same as for Trans_observer::after_commit: please specify server semantics.
>
> > + */
> > + int (*after_rollback)(Trans_param *param);
>
> !!!Can the plugin modify param? If not, please make it const.
>
see above.
> > +} Trans_observer;
> > +
> > +/**
> > + Binlog storage flags
> > +*/
> > +enum Binlog_storage_flags {
> > + /** Binary log was sync:ed */
> > + BINLOG_STORAGE_IS_SYNCED = 1
> > +};
> > +
> > +/**
> > + Binlog storage observer parameters
> > + */
> > +typedef struct Binlog_storage_param {
> > + uint32 server_id;
> > +} Binlog_storage_param;
> > +
> > +/**
> > + Observe binlog logging storage
> > +*/
> > +typedef struct Binlog_storage_observer {
> > + uint32 len;
> > +
> > + /**
> > + After binlog has been updated and flushed
>
> Please extend this to a full description, similar to above.
>
OK
> > +
> > + @param param Observer common parameter
> > + @param log_file Binlog file name been updated
> > + @param log_pos Binlog position after update
> > + @param flags flags for binlog storage
> > +
> > + @return 0 on success, 1 on failure
>
> Same as for Trans_observer::after_commit: please specify server semantics.
>
> > + */
> > + int (*after_flush)(Binlog_storage_param *param,
> > + const char *log_file, my_off_t log_pos,
> > + uint32 flags);
> > +} Binlog_storage_observer;
> > +
> > +/**
> > + Replication binlog tramsmitter (binlog dump) observer parameter.
>
> typo: transmitter with n
>
Yes
> > +*/
> > +typedef struct Binlog_transmit_param {
> > + uint32 server_id;
> > + uint32 flags;
> > +} Binlog_transmit_param;
> > +
> > +/**
> > + Observe and extends the binlog dumping thread.
> > +*/
> > +typedef struct Binlog_transmit_observer {
> > + uint32 len;
> > +
> > + /**
> > + Start binlog dumping thread
>
> Please extend to a full description, similar to above.
>
> > +
> > + @param param Observer common parameter
> > + @param log_file Binlog file name to transmit from
> > + @param log_pos Binlog position to transmit from
> > +
> > + @return 0 on success, 1 on failure
>
> Same as for Trans_observer::after_commit: please specify server semantics.
>
> > + */
> > + int (*transmit_start)(Binlog_transmit_param *param,
> > + const char *log_file, my_off_t log_pos);
>
> !!!Can the plugin modify param? If not, please make it const.
>
> > +
> > + /**
> > + Start binlog dumping thread
>
> Please extend to a full description, similar to above.
>
> > +
> > + @param param Observer common parameter
> > +
> > + @return 0 on success, 1 on failure
>
> Same as for Trans_observer::after_commit: please specify server semantics.
>
> > + */
> > + int (*transmit_stop)(Binlog_transmit_param *param);
>
> !!!Can the plugin modify param? If not, please make it const.
>
> > +
> > + /**
> > + Reserve bytes for header for binlog packet for transmision
>
> Please explain when this callback will be invoked and how it can be used
> by the plugin.
>
> Typo: transmission is with three s
>
right
> > +
> > + @param param Observer common parameter
> > + @param packet The packet buffer for transmision
>
> It should be specified who owns `packet', and whether it can be modified
> by the plugin. Please state that the server ensures that `packet' points
> to valid memory before invoking the callback, that the plugin can modify
> `packet' as it likes, and that the server will take care of freeing
> `packet' when needed.
>
good
> > + @param hlen Header length before and after reserve header for
> > + this observer
>
> !!!It is not clear what the purpose of `hlen' is. Isn't it equal to
> packet.length()? In that case, it is redundant and I think it should be
> dropped.
>
This interface has been changed, String is internal, should not be used
in interfaces, the type of 'packet' changed to 'unsigned char', and then
the length param is required.
> See also my comment in rpl_handler.cc below, inside the function
> "Binlog_transmit_delegate::reserve_header"
>
> > +
> > + @return 0 on success, 1 on failure
>
> Same as for Trans_observer::after_commit: please specify server semantics.
>
> > + */
> > + int (*reserve_header)(Binlog_transmit_param *param,
> > + String *packet, ulong *len);
>
> !!!Can the plugin modify `param' and `packet'? If not, please make it const.
>
> > +
> > + /**
> > + Before sending an event packet
>
> Please extend to a full description, similar to above.
>
> > +
> > + @param param Observer common parameter
> > + @param packet Binlog event packet to send
>
> It should be specified who owns `packet', and whether it can be modified
> by the plugin. Please state that the server ensures that `packet' points
> to valid memory before invoking the callback, that the plugin can modify
> `packet' as it likes, and that the server will take care of freeing
> `packet' when needed.
>
OK
> > + @param log_file Binlog file name of the event packet to send
> > + @param log_pos Binlog position of the event packet to send
> > +
> > + @return 0 on success, 1 on failure
>
> Same as for Trans_observer::after_commit: please specify server semantics.
>
> > + */
> > + int (*before_send_event)(Binlog_transmit_param *param,
> > + String *packet,
> > + const char *log_file, my_off_t log_pos );
>
> !!!Can the plugin modify `param'? If not, please make it const.
>
> > +
> > + /**
> > + After sending an event packet
> > +
> > + @param param Observer common parameter
> > + @param event_buf Binlog event packet buffer sent
> > +
> > + @return 0 on success, 1 on failure
>
> Same as for Trans_observer::after_commit: please specify server semantics.
>
> > + */
> > + int (*after_send_event)(Binlog_transmit_param *param,
> > + const char *event_buf);
>
> !!!Can the plugin modify param? If not, please make it const.
>
> > +
> > + /**
> > + Reset transmision (binlog dumping) status
> > +
> > + This is called when executing the command RESET MASTER, and is
> > + used to reset status variables added by observers.
> > +
> > + @param param Observer common parameter
> > +
> > + @return 0 on success, 1 on failure
>
> Same as for Trans_observer::after_commit: please specify server semantics.
>
> > + */
> > + int (*after_reset_master)(Binlog_transmit_param *param);
>
> Can the plugin modify param? If not, please make it const.
>
> > +} Binlog_transmit_observer;
> > +
> > +/**
> > + Binlog relay IO flags
> > +*/
> > +enum Binlog_relay_IO_flags {
> > + /** Binary relay log was sync:ed */
> > + BINLOG_RELAY_IS_SYNCED = 1
> > +};
> > +
> > +
> > +/**
> > + Replication binlog relay IO observer parameter
> > +*/
> > +typedef struct Binlog_relay_IO_param {
> > + uint32 server_id;
> > +
> > + /* Master host, user and port */
> > + char *host;
> > + char *user;
> > + uint port;
> > +
> > + char *master_log_name;
> > + my_off_t master_log_pos;
> > +
> > + MYSQL *mysql; /* the connection to master */
> > +} Binlog_relay_IO_param;
> > +
> > +/**
> > + Observes and extends the service of slave IO thread.
> > +*/
> > +typedef struct Binlog_relay_IO_observer {
> > + uint32 len;
> > +
> > + /**
> > + Start of slave IO thread
>
> Please make it a full sentence, like above.
>
> > +
> > + @param param Observer common parameter
> > +
> > + @return 0 on success, 1 on failure
>
> Same as for Trans_observer::after_commit: please specify server semantics.
>
> > + */
> > + int (*thread_start)(Binlog_relay_IO_param *param);
>
> !!!Can the plugin modify param? If not, please make it const.
>
> > +
> > + /**
> > + Stop of slave IO thread
>
> Please make it a full sentence, like above.
>
> > +
> > + @param param Observer common parameter
> > +
> > + @return 0 on success, 1 on failure
>
> Same as for Trans_observer::after_commit: please specify server semantics.
>
> > + */
> > + int (*thread_stop)(Binlog_relay_IO_param *param);
>
> !!!Can the plugin modify param? If not, please make it const.
>
> > +
> > + /**
> > + Before slave request binlog transmision from master
> > +
> > + This is called before slave issuing BINLOG_DUMP command to master
> > + to request binlog.
>
> Please merge these lines so that the first line becomes a full sentence,
> like above.
>
> > +
> > + @param param Observer common parameter
> > + @param flags binlog dump flags
> > +
> > + @return 0 on success, 1 on failure
>
> Same as for Trans_observer::after_commit: please specify server semantics.
>
> > + */
> > + int (*before_request_transmit)(Binlog_relay_IO_param *param, uint32 flags);
>
> !!!Can the plugin modify param? If not, please make it const.
>
> > +
> > + /**
> > + After read an event packet from master
>
> Please extend to a full sentence, and explain what the plugin can use
> the callback for.
>
> > +
> > + @param param Observer common parameter
> > + @param packet The event packet read from master
> > + @param len Length of the event packet read from master
> > + @param event_buf The event packet return after process
> > + @param event_len The length of event packet return after process
>
> !!!This is not symmetric with `reserve_header'. `reserve_header' can
> modify the packet as it likes, whereas `after_read_event' can only
> return a substring of the packet. I would suggest to make
> `after_read_event' take a single `String *packet' as parameter, instead
> of `packet, len, event_buf, event_len'.
>
As stated in previous comment, String should not be used in the
interfaces, and I think it's not a good idea to allow a plugin to modify
the packet as it likes, so I changed the 'reserve_header' interface, it
now can only return the bytes it want to insert into the header, it can
not modify the thd->packet buffer directly.
> I understand that it is more efficient to take the substring by pointer
> arithmetic than by copying the string. However, that is an
> implementation detail that should not be reflected in the interface. If
> we need a more efficient plugin, we can later add functionality to the
> String class that allows the start of the text to be different from the
> beginning of the buffer.
>
> See also my comment in slave.cc below, in the diff chunk
> "@@ -2283,7 +2294,19 @@"
>
> > +
> > + @return 0 on success, 1 on failure
>
> Same as for Trans_observer::after_commit: please specify server semantics.
>
> > + */
> > + int (*after_read_event)(Binlog_relay_IO_param *param,
> > + const char *packet, ulong len,
> > + const char **event_buf, ulong *event_len);
>
> !!!Can the plugin modify param? If not, please make it const.
>
> > +
> > + /**
> > + After written event to relay log
>
> Please extend to a full sentence.
>
> > +
> > + @param param Observer common parameter
> > + @param event_buf Event packet written to relay log
> > + @param event_len Length of the event packet written to relay log
> > + @param flags flags for relay log
> > +
> > + @return 0 on success, 1 on failure
>
> Same as for Trans_observer::after_commit: please specify server semantics.
>
> > + */
> > + int (*after_queue_event)(Binlog_relay_IO_param *param,
> > + const char *event_buf, ulong event_len,
> > + uint32 flags);
>
> !!!If param cannot be modified by the plugin, please make it const.
>
> > +
> > + /**
> > + Reset slave relay log IO status
> > +
> > + @param param Observer common parameter
> > +
> > + @return 0 on success, 1 on failure
>
> Same as for Trans_observer::after_commit: please specify server semantics.
>
> > + */
> > + int (*after_reset_slave)(Binlog_relay_IO_param *param);
>
> !!!If param cannot be modified by the plugin, please make it const.
>
> > +} Binlog_relay_IO_observer;
> > +
> > +#ifdef __cplusplus
> > +extern "C" {
> > +#endif
> > +
> > +/**
> > + Register a transaction observer
> > +
> > + @param observer The transaction observer to register
> > + @param p pointer to the internal plugin structure
> > +
> > + @return 0 if success, 1 if the observer already exists
> > +*/
> > +int register_trans_observer(Trans_observer *observer, void *p);
> > +
> > +/**
> > + Unregister a transaction observer
> > +
> > + @param observer The transaction observer to unregister
> > + @param p pointer to the internal plugin structure
> > +
> > + @return 0 if success, 1 if the observer not exists
> > +*/
> > +int unregister_trans_observer(Trans_observer *observer, void *p);
> > +
> > +/**
> > + Register a binlog storage observer
> > +
> > + @param observer The binlog storage observer to register
> > + @param p pointer to the internal plugin structure
> > +
> > + @return 0 if success, 1 if the observer already exists
> > +*/
> > +int register_binlog_storage_observer(Binlog_storage_observer *observer, void
> *p);
> > +
> > +/**
> > + Unregister a binlog storage observer
> > +
> > + @param observer The binlog storage observer to unregister
> > + @param p pointer to the internal plugin structure
> > +
> > + @return 0 if success, 1 if the observer not exists
> > +*/
> > +int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void
> *p);
> > +
> > +/**
> > + Register a binlog transmit observer
> > +
> > + @param observer The binlog transmit observer to register
> > + @param p pointer to the internal plugin structure
> > +
> > + @return 0 if success, 1 if the observer already exists
> > +*/
> > +int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void
> *p);
> > +
> > +/**
> > + Unregister a binlog transmit observer
> > +
> > + @param observer The binlog transmit observer to unregister
> > + @param p pointer to the internal plugin structure
> > +
> > + @return 0 if success, 1 if the observer not exists
> > +*/
> > +int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer,
> void *p);
> > +
> > +/**
> > + Register a binlog relay IO (slave IO thread) observer
> > +
> > + @param observer The binlog relay IO observer to register
> > + @param p pointer to the internal plugin structure
> > +
> > + @return 0 if success, 1 if the observer already exists
> > +*/
> > +int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
> *p);
> > +
> > +/**
> > + Unregister a binlog relay IO (slave IO thread) observer
> > +
> > + @param observer The binlog relay IO observer to unregister
> > + @param p pointer to the internal plugin structure
> > +
> > + @return 0 if success, 1 if the observer not exists
> > +*/
> > +int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer,
> void *p);
> > +
> > +#ifdef __cplusplus
> > +}
> > +#endif
> > +#endif /* REPLICATION_H */
> >
> > === added file 'sql/rpl_handler.cc'
> > --- a/sql/rpl_handler.cc 1970-01-01 00:00:00 +0000
> > +++ b/sql/rpl_handler.cc 2008-06-17 03:04:54 +0000
> > @@ -0,0 +1,401 @@
> > +/* Copyright (C) 2008 MySQL AB
> > +
> > + This program is free software; you can redistribute it and/or modify
> > + it under the terms of the GNU General Public License as published by
> > + the Free Software Foundation; version 2 of the License.
> > +
> > + This program is distributed in the hope that it will be useful,
> > + but WITHOUT ANY WARRANTY; without even the implied warranty of
> > + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> > + GNU General Public License for more details.
> > +
> > + You should have received a copy of the GNU General Public License
> > + along with this program; if not, write to the Free Software
> > + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
> */
> > +
> > +#include "mysql_priv.h"
> > +
> > +#include "rpl_mi.h"
> > +#include "sql_repl.h"
> > +#include "log_event.h"
> > +#include "rpl_filter.h"
> > +#include <my_dir.h>
> > +#include "rpl_handler.h"
> > +
> > +Trans_delegate transaction_delegate;
> > +Binlog_storage_delegate binlog_storage_delegate;
> > +#ifdef HAVE_REPLICATION
> > +Binlog_transmit_delegate binlog_transmit_delegate;
> > +Binlog_relay_IO_delegate binlog_relay_io_delegate;
> > +#endif /* HAVE_REPLICATION */
> > +
> > +int thd_net_read(const unsigned char **packet, size_t *len)
> > +{
> > + THD *thd= current_thd;
> > + ulong ret= my_net_read(&thd->net);
> > + if (ret == packet_error)
> > + return 1;
> > + *len= ret;
> > + *packet= thd->net.read_pos;
> > + return 0;
> > +}
> > +
> > +int thd_net_write(const unsigned char *packet, size_t len)
> > +{
> > + return my_net_write(¤t_thd->net, packet, len);
> > +}
> > +
> > +int thd_net_flush()
> > +{
> > + return net_flush(¤t_thd->net);
> > +}
> > +
> > +int mysql_net_read(MYSQL *mysql, const unsigned char **packet, size_t *len)
> > +{
> > + ulong ret= my_net_read(&mysql->net);
> > + if (ret == packet_error)
> > + return 1;
> > + *len= ret;
> > + *packet= mysql->net.read_pos;
> > + return 0;
> > +}
> > +
> > +int mysql_net_write(MYSQL *mysql, const unsigned char *packet, size_t len)
> > +{
> > + return my_net_write(&mysql->net, packet, len);
> > +}
> > +
> > +int mysql_net_flush(MYSQL *mysql)
> > +{
> > + return net_flush(&mysql->net);
> > +}
> > +
> > +int get_user_var_int(const char *name,
> > + long long int *value, int *null_value)
> > +{
> > + my_bool null_val;
> > + user_var_entry *entry=
> > + (user_var_entry*) hash_search(¤t_thd->user_vars,
> > + (uchar*) name, strlen(name));
> > + if (!entry)
> > + return 1;
> > + *value= entry->val_int(&null_val);
> > + if (null_value)
> > + *null_value= null_val;
> > + return 0;
> > +}
> > +
> > +int get_user_var_real(const char *name,
> > + double *value, int *null_value)
> > +{
> > + my_bool null_val;
> > + user_var_entry *entry=
> > + (user_var_entry*) hash_search(¤t_thd->user_vars,
> > + (uchar*) name, strlen(name));
> > + if (!entry)
> > + return 1;
> > + *value= entry->val_real(&null_val);
> > + if (null_value)
> > + *null_value= null_val;
> > + return 0;
> > +}
> > +
> > +int get_user_var_str(const char *name, char *value,
> > + size_t len, unsigned int precision, int *null_value)
> > +{
> > + String str;
> > + my_bool null_val;
> > + user_var_entry *entry=
> > + (user_var_entry*) hash_search(¤t_thd->user_vars,
> > + (uchar*) name, strlen(name));
> > + if (!entry)
> > + return 1;
> > + entry->val_str(&null_val, &str, precision);
> > + strncpy(value, str.c_ptr(), len);
> > + if (null_value)
> > + *null_value= null_val;
> > + return 0;
> > +}
> > +
> > +int delegates_init()
> > +{
> > + if (transaction_delegate.init()
> > + || binlog_storage_delegate.init()
> > +#ifdef HAVE_REPLICATION
> > + || binlog_transmit_delegate.init()
> > + || binlog_relay_io_delegate.init()
> > +#endif /* HAVE_REPLICATION */
> > + )
> > + return 1;
> > + return 0;
> > +}
> > +
> > +void delegates_destroy()
> > +{
> > + transaction_delegate.destroy();
> > + binlog_storage_delegate.destroy();
> > +#ifdef HAVE_REPLICATION
> > + binlog_transmit_delegate.destroy();
> > + binlog_relay_io_delegate.destroy();
> > +#endif /* HAVE_REPLICATION */
> > +}
> > +
> > +/*
> > + NOTE2ME: I (hezx) am not sure if it's correct to add observer
> > + plugins to the thd->lex list. Because afer each statement, all
> > + plugins add to thd->lex will be automatically unlocked.
> > + */
>
> Please add a comment explaining what the macro does.
>
OK, I'll try to clarify this
> > +#define FOREACH_OBSERVER(f, thd, args...) \
>
> !!!This syntax for variable-length argument lists is a gcc extension.
> See http://gcc.gnu.org/onlinedocs/gcc/Variadic-Macros.html .
> I'd suggest to replace this by a single argument, and let the caller
> provide the parentheses (like in DBUG_PRINT). So the caller will do
>
> FOREACH_OBSERVER(after_commit, thd, (¶m));
>
> or
>
> FOREACH_OBSERVER(after_flush, thd, (¶m, log_file, log_pos, flags));
>
right, Mats pointed out the same, already fixed in my new patch
> > + param.server_id= thd->server_id; \
> > + read_lock(); \
> > + Observer_info_iterator iter= observer_info_iter(); \
> > + Observer_info *info= iter++; \
> > + for (; info; info= iter++) \
> > + { \
> > + plugin_ref plugin= \
> > + my_plugin_lock(thd, &info->plugin); \
> > + if (!plugin) \
> > + { \
> > + unlock(); \
> > + return 1; \
> > + } \
> > + if (((Observer *)info->observer)->f \
> > + && ((Observer *)info->observer)->f(args)) \
>
> !!!If you follow my suggestion above regarding the variable-length
> argument list, here you have to replace "f(args)" by "f args"
>
> > + { \
> > + unlock(); \
> > + return 1; \
> > + } \
> > + } \
> > + unlock(); \
> > + return 0
>
> In the macro definition above, please use spaces instead of tabs.
>
OK
> > +
> > +
> > +int Trans_delegate::after_commit(THD *thd, bool all)
> > +{
> > + Trans_param param;
> > + if (all || thd->transaction.all.ha_list == 0)
> > + param.flags |= TRANS_IS_REAL_TRANS;
> > +
> > + FOREACH_OBSERVER(after_commit, thd, ¶m);
> > +}
> > +
> > +int Trans_delegate::after_rollback(THD *thd, bool all)
> > +{
> > + Trans_param param;
> > + if (all || thd->transaction.all.ha_list == 0)
> > + param.flags |= TRANS_IS_REAL_TRANS;
> > +
> > + FOREACH_OBSERVER(after_rollback, thd, ¶m);
> > +}
> > +
> > +int Binlog_storage_delegate::after_flush(THD *thd,
> > + const char *log_file,
> > + my_off_t log_pos,
> > + bool synced)
> > +{
> > + Binlog_storage_param param;
> > + uint32 flags=0;
> > + if (synced)
> > + flags |= BINLOG_STORAGE_IS_SYNCED;
> > +
> > + FOREACH_OBSERVER(after_flush, thd, ¶m, log_file, log_pos, flags);
> > +}
> > +
> > +#ifdef HAVE_REPLICATION
> > +int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
> > + const char *log_file,
> > + my_off_t log_pos)
> > +{
> > + Binlog_transmit_param param;
> > + param.flags= flags;
> > +
> > + FOREACH_OBSERVER(transmit_start, thd, ¶m, log_file, log_pos);
> > +}
> > +
> > +int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
> > +{
> > + Binlog_transmit_param param;
> > + param.flags= flags;
> > +
> > + FOREACH_OBSERVER(transmit_stop, thd, ¶m);
> > +}
> > +
> > +int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
> > + String *packet, ulong *len)
> > +{
> > + Binlog_transmit_param param;
> > + param.flags= flags;
> > +
> > + /* reserve and set default header */
> > + packet->length(0);
> > + packet->set("\0", 1, &my_charset_bin);
> > + *len= 1;
>
> OK, I think it's a good idea to refactor so that `packet' is reset in
> only one place in the code. However, I think the delegates should only
> be responsible for plugin-related things, not for resetting the packet.
> Can you do it in a separate function, so that:
>
> - instead of calling RUN_HOOK(reserve_header), you call
> dumpthread_begin_transmit_event()
>
> - dumpthread_begin_transmit_event() resets packet and calls
> RUN_HOOK(reserve_header)
>
Good, but I'd like to name the function 'reset_transmit_packet()'
> !!!When you have done this, you can add the line `*len=
> packet->length()' to dumpthread_begin_transmit_event(), and remove `len'
> from the reserve_handler prototype.
>
> > +
> > + FOREACH_OBSERVER(reserve_header, thd, ¶m, packet, len);
> > +}
> > +
> > +int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
> > + String *packet,
> > + const char *log_file,
> > + my_off_t log_pos)
> > +{
> > + Binlog_transmit_param param;
> > + param.flags= flags;
> > +
> > + FOREACH_OBSERVER(before_send_event, thd, ¶m, packet, log_file,
> log_pos);
> > +}
> > +
> > +int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
> > + const char *event_buf)
> > +{
> > + Binlog_transmit_param param;
> > + param.flags= flags;
> > +
> > + FOREACH_OBSERVER(after_send_event, thd, ¶m, event_buf);
> > +}
> > +
> > +int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
> > +
> > +{
> > + Binlog_transmit_param param;
> > + param.flags= flags;
> > +
> > + FOREACH_OBSERVER(after_reset_master, thd, ¶m);
> > +}
> > +
> > +int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
> > +{
> > + Binlog_relay_IO_param param;
> > + param.mysql= mi->mysql;
> > + param.user= mi->user;
> > + param.host= mi->host;
> > + param.port= mi->port;
> > + param.master_log_name= mi->master_log_name;
> > + param.master_log_pos= mi->master_log_pos;
> > +
> > + FOREACH_OBSERVER(thread_start, thd, ¶m);
> > +}
> > +
> > +int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
> > +{
> > +
> > + Binlog_relay_IO_param param;
> > + param.mysql= mi->mysql;
> > + param.user= mi->user;
> > + param.host= mi->host;
> > + param.port= mi->port;
> > + param.master_log_name= mi->master_log_name;
> > + param.master_log_pos= mi->master_log_pos;
> > +
> > + FOREACH_OBSERVER(thread_stop, thd, ¶m);
> > +}
> > +
> > +int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
> > + Master_info *mi,
> > + ushort flags)
> > +{
> > + Binlog_relay_IO_param param;
> > + param.mysql= mi->mysql;
> > + param.user= mi->user;
> > + param.host= mi->host;
> > + param.port= mi->port;
> > + param.master_log_name= mi->master_log_name;
> > + param.master_log_pos= mi->master_log_pos;
> > +
> > + FOREACH_OBSERVER(before_request_transmit, thd, ¶m, (uint32)flags);
> > +}
> > +
> > +int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
> > + const char *packet, ulong len,
> > + const char **event_buf,
> > + ulong *event_len)
> > +{
> > + Binlog_relay_IO_param param;
> > + param.mysql= mi->mysql;
> > + param.user= mi->user;
> > + param.host= mi->host;
> > + param.port= mi->port;
> > + param.master_log_name= mi->master_log_name;
> > + param.master_log_pos= mi->master_log_pos;
> > +
> > + FOREACH_OBSERVER(after_read_event, thd,
> > + ¶m, packet, len, event_buf, event_len);
> > +}
> > +
> > +int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
> > + const char *event_buf,
> > + ulong event_len,
> > + bool synced)
> > +{
> > + Binlog_relay_IO_param param;
> > + param.mysql= mi->mysql;
> > + param.user= mi->user;
> > + param.host= mi->host;
> > + param.port= mi->port;
> > + param.master_log_name= mi->master_log_name;
> > + param.master_log_pos= mi->master_log_pos;
> > +
> > + uint32 flags=0;
> > + if (synced)
> > + flags |= BINLOG_STORAGE_IS_SYNCED;
> > +
> > + FOREACH_OBSERVER(after_queue_event, thd,
> > + ¶m, event_buf, event_len, flags);
> > +}
> > +
> > +int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
> > +
> > +{
> > + Binlog_relay_IO_param param;
> > + param.mysql= mi->mysql;
> > + param.user= mi->user;
> > + param.host= mi->host;
> > + param.port= mi->port;
> > + param.master_log_name= mi->master_log_name;
> > + param.master_log_pos= mi->master_log_pos;
> > +
> > + FOREACH_OBSERVER(after_reset_slave, thd, ¶m);
> > +}
> > +#endif /* HAVE_REPLICATION */
> > +
> > +int register_trans_observer(Trans_observer *observer, void *p)
> > +{
> > + return transaction_delegate.add_observer(observer, (st_plugin_int *)p);
> > +}
> > +
> > +int unregister_trans_observer(Trans_observer *observer, void *p)
> > +{
> > + return transaction_delegate.remove_observer(observer, (st_plugin_int *)p);
> > +}
> > +
> > +int register_binlog_storage_observer(Binlog_storage_observer *observer, void
> *p)
> > +{
> > + return binlog_storage_delegate.add_observer(observer, (st_plugin_int *)p);
> > +}
> > +
> > +int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void
> *p)
> > +{
> > + return binlog_storage_delegate.remove_observer(observer, (st_plugin_int
> *)p);
> > +}
> > +
> > +#ifdef HAVE_REPLICATION
> > +int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void
> *p)
> > +{
> > + return binlog_transmit_delegate.add_observer(observer, (st_plugin_int *)p);
> > +}
> > +
> > +int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer,
> void *p)
> > +{
> > + return binlog_transmit_delegate.remove_observer(observer, (st_plugin_int
> *)p);
> > +}
> > +
> > +int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void
> *p)
> > +{
> > + return binlog_relay_io_delegate.add_observer(observer, (st_plugin_int *)p);
> > +}
> > +
> > +int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer,
> void *p)
> > +{
> > + return binlog_relay_io_delegate.remove_observer(observer, (st_plugin_int
> *)p);
> > +}
> > +#endif /* HAVE_REPLICATION */
> >
> > === added file 'sql/rpl_handler.h'
> > --- a/sql/rpl_handler.h 1970-01-01 00:00:00 +0000
> > +++ b/sql/rpl_handler.h 2008-06-17 03:04:54 +0000
> > @@ -0,0 +1,195 @@
> > +/* Copyright (C) 2008 MySQL AB
> > +
> > + This program is free software; you can redistribute it and/or modify
> > + it under the terms of the GNU General Public License as published by
> > + the Free Software Foundation; version 2 of the License.
> > +
> > + This program is distributed in the hope that it will be useful,
> > + but WITHOUT ANY WARRANTY; without even the implied warranty of
> > + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> > + GNU General Public License for more details.
> > +
> > + You should have received a copy of the GNU General Public License
> > + along with this program; if not, write to the Free Software
> > + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
> */
> > +
> > +#ifndef RPL_HANDLER_H
> > +#define RPL_HANDLER_H
> > +
> > +#include "mysql_priv.h"
> > +#include "rpl_mi.h"
> > +#include "rpl_rli.h"
> > +#include "sql_plugin.h"
> > +#include "replication.h"
> > +
> > +class Observer_info {
> > +public:
> > + void *observer;
> > + st_plugin_int *plugin_int;
> > + plugin_ref plugin;
> > +
> > + Observer_info(void *ob, st_plugin_int *p)
> > + :observer(ob), plugin_int(p)
> > + {
> > +#ifdef DBUG_OFF
> > + plugin= plugin_int;
> > +#else
> > + plugin= &plugin_int;
> > +#endif
>
> Yuck :-) OK, I read the source and figured out why you need this. It's
> not your fault, but it breaks an important OO principle when the user of
> the type plugin_ref relies on the implementation details of plugin_ref.
> To avoid that, I suggest adding something like this to sql_plugin.h
> (near 'typedef struct st_plugin_int...'):
>
> #ifdef DBUG_OFF
> #define plugin_ref_from_pointer(ptr) (ptr)
> #else
> #define plugin_ref_from_pointer(ptr) (&(ptr))
> #endif
>
> And replace the preprocessor conditional above by
>
> plugin= plugin_ref_from_pointer(ptr);
>
There are already such macros defined in sql_plugin.cc, I could move
them to sql_plugin.h and then use them, the reason I did not do this is
because I only need it once, but now I think I should do it to reduce
confusion for others to read my code.
> > + }
> > +};
> > +
> > +class Delegate {
> > +public:
> > + typedef List<Observer_info> Observer_info_list;
> > + typedef List_iterator<Observer_info> Observer_info_iterator;
> > +
> > + int add_observer(void *observer, st_plugin_int *plugin)
> > + {
> > + int ret= FALSE;
> > + write_lock();
> > + Observer_info_iterator iter(observer_info_list);
> > + Observer_info *info= iter++;
> > + while (info && info->observer != observer)
> > + info= iter++;
> > + if (!info)
> > + {
> > + info= new Observer_info(observer, plugin);
>
> !!!Please check that info is not NULL (ouf of memory).
>
> > + observer_info_list.push_back(info, &memroot);
>
> !!!Please check the return value of push_back (1=out of memory)
>
OK
> > + }
> > + else
> > + ret= TRUE;
> > + unlock();
> > + return ret;
> > + }
> > +
> > + int remove_observer(void *observer, st_plugin_int *plugin)
> > + {
> > + int ret= FALSE;
> > + write_lock();
> > + Observer_info_iterator iter(observer_info_list);
> > + Observer_info *info= iter++;
> > + while (info && info->observer != observer)
> > + info= iter++;
> > + if (info)
> > + iter.remove();
> > + else
> > + ret= TRUE;
> > + unlock();
> > + return ret;
> > + }
> > +
> > + inline Observer_info_iterator observer_info_iter()
> > + {
> > + return Observer_info_iterator(observer_info_list);
> > + }
> > +
> > + inline bool is_empty()
> > + {
> > + return observer_info_list.is_empty();
> > + }
> > +
> > + inline int read_lock()
> > + {
> > + return rw_rdlock(&lock);
> > + }
> > +
> > + inline int write_lock()
> > + {
> > + return rw_wrlock(&lock);
> > + }
> > +
> > + inline int unlock()
> > + {
> > + return rw_unlock(&lock);
> > + }
> > +
> > + int init()
> > + {
> > + if (my_rwlock_init(&lock, NULL))
> > + return 1;
> > + init_sql_alloc(&memroot, 1024, 0);
> > + return 0;
> > + }
> > +
> > + void destroy()
> > + {
> > + rwlock_destroy(&lock);
> > + free_root(&memroot, MYF(0));
> > + }
> > +
> > + Delegate(){}
> > + ~Delegate() {}
> > +
> > +private:
> > + Observer_info_list observer_info_list;
> > + rw_lock_t lock;
> > + MEM_ROOT memroot;
> > +};
> > +
> > +class Trans_delegate
> > + :public Delegate {
> > +public:
> > + typedef Trans_observer Observer;
> > + int before_commit(THD *thd, bool all);
> > + int before_rollback(THD *thd, bool all);
> > + int after_commit(THD *thd, bool all);
> > + int after_rollback(THD *thd, bool all);
> > +};
> > +
> > +class Binlog_storage_delegate
> > + :public Delegate {
> > +public:
> > + typedef Binlog_storage_observer Observer;
> > + int after_flush(THD *thd, const char *log_file,
> > + my_off_t log_pos, bool synced);
> > +};
> > +
> > +#ifdef HAVE_REPLICATION
> > +class Binlog_transmit_delegate
> > + :public Delegate {
> > +public:
> > + typedef Binlog_transmit_observer Observer;
> > + int transmit_start(THD *thd, ushort flags,
> > + const char *log_file, my_off_t log_pos);
> > + int transmit_stop(THD *thd, ushort flags);
> > + int reserve_header(THD *thd, ushort flags, String *packet, ulong *len);
> > + int before_send_event(THD *thd, ushort flags,
> > + String *packet, const
> > + char *log_file, my_off_t log_pos );
> > + int after_send_event(THD *thd, ushort flags,
> > + const char *event_buf);
> > + int after_reset_master(THD *thd, ushort flags);
> > +};
> > +
> > +class Binlog_relay_IO_delegate
> > + :public Delegate {
> > +public:
> > + typedef Binlog_relay_IO_observer Observer;
> > + int thread_start(THD *thd, Master_info *mi);
> > + int thread_stop(THD *thd, Master_info *mi);
> > + int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
> > + int after_read_event(THD *thd, Master_info *mi,
> > + const char *packet, ulong len,
> > + const char **event_buf, ulong *event_len);
> > + int after_queue_event(THD *thd, Master_info *mi,
> > + const char *event_buf, ulong event_len,
> > + bool synced);
> > + int after_reset_slave(THD *thd, Master_info *mi);
> > +};
> > +#endif /* HAVE_REPLICATION */
> > +
> > +int delegates_init();
> > +void delegates_destroy();
> > +
> > +extern Trans_delegate transaction_delegate;
> > +extern Binlog_storage_delegate binlog_storage_delegate;
> > +#ifdef HAVE_REPLICATION
> > +extern Binlog_transmit_delegate binlog_transmit_delegate;
> > +extern Binlog_relay_IO_delegate binlog_relay_io_delegate;
> > +#endif /* HAVE_REPLICATION */
> > +
> > +#define RUN_HOOK(group, hook, args...) \
> > + group ##_delegate.hook(args)
> > +
> > +#endif /* RPL_HANDLER_H */
> >
> > === modified file 'sql/slave.cc'
> > --- a/sql/slave.cc 2008-04-20 12:25:54 +0000
> > +++ b/sql/slave.cc 2008-06-17 03:04:54 +0000
> > @@ -39,6 +39,7 @@
> > #include <sql_common.h>
> > #include <errmsg.h>
> > #include <mysys_err.h>
> > +#include "rpl_handler.h"
> >
> > #ifdef HAVE_REPLICATION
> >
> > @@ -1499,17 +1500,22 @@
> > }
> >
> >
> > -static int request_dump(MYSQL* mysql, Master_info* mi,
> > - bool *suppress_warnings)
> > +static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
> > + bool *suppress_warnings)
> > {
> > uchar buf[FN_REFLEN + 10];
> > int len;
> > - int binlog_flags = 0; // for now
> > + ushort binlog_flags = 0; // for now
> > char* logname = mi->master_log_name;
> > DBUG_ENTER("request_dump");
> >
> > *suppress_warnings= FALSE;
> >
> > + if (RUN_HOOK(binlog_relay_io,
> > + before_request_transmit,
> > + thd, mi, binlog_flags))
> > + DBUG_RETURN(1);
> > +
> > // TODO if big log files: Change next to int8store()
> > int4store(buf, (ulong) mi->master_log_pos);
> > int2store(buf + 4, binlog_flags);
> > @@ -2134,6 +2140,10 @@
> > mi->master_log_name,
> > llstr(mi->master_log_pos,llbuff)));
> >
> > +
> > + if (RUN_HOOK(binlog_relay_io, thread_start, thd, mi))
> > + goto err;
> > +
> > if (!(mi->mysql = mysql = mysql_init(NULL)))
> > {
> > mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
> > @@ -2208,7 +2218,7 @@
> > while (!io_slave_killed(thd,mi))
> > {
> > thd_proc_info(thd, "Requesting binlog dump");
> > - if (request_dump(mysql, mi, &suppress_warnings))
> > + if (request_dump(thd, mysql, mi, &suppress_warnings))
> > {
> > sql_print_error("Failed on request_dump()");
> > if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
> > @@ -2231,6 +2241,7 @@
> >
> > while (!io_slave_killed(thd,mi))
> > {
> > + const char* event_buf;
> > ulong event_len;
> > /*
> > We say "waiting" because read_event() will wait if there's nothing to
> > @@ -2283,7 +2294,19 @@
> >
> > retry_count=0; // ok event, reset retry counter
> > thd_proc_info(thd, "Queueing master event to the relay log");
> > - if (queue_event(mi,(const char*)mysql->net.read_pos + 1, event_len))
> > + event_buf= (const char*)mysql->net.read_pos + 1;
> > + if (RUN_HOOK(binlog_relay_io, after_read_event,
> > + thd, mi,(const char*)mysql->net.read_pos + 1,
> > + event_len, &event_buf, &event_len))
>
> OK, so if you follow my suggestion above to use a single `String *'
> instead of `packet, len, event_buf, event_len', then you'll have to
> initialize the String here, so the four lines above become:
>
> ev_string.set(mysql->net.read_pos + 1,
> strlen(mysql->net.read_pos + 1), &my_charset_bin);
> if (RUN_HOOK(binlog_relay_io, after_read_event, thd, mi, &ev_string))
>
> and then you have to declare `String ev_string' at the beginning of the
> function and use `ev_string' instead of `event_buf' below.
>
Since String should be avoided in interfaces, so this will not be
changed.
> > + {
> > + sql_print_error("Failed to run 'after_read_event' hook");
> > + goto err;
> > + }
> > +
> > + /* XXX: 'synced' should be updated by queue_event to indicate
> > + whether event has been synced to disk */
> > + bool synced= 0;
> > + if (queue_event(mi, event_buf, event_len))
> > {
> > goto err;
> > }
> > @@ -2292,6 +2315,11 @@
> > sql_print_error("Failed to flush master info file");
> > goto err;
> > }
> > +
> > + if (RUN_HOOK(binlog_relay_io, after_queue_event,
> > + thd, mi, event_buf, event_len, synced))
> > + goto err;
> > +
> > /*
> > See if the relay logs take too much space.
> > We don't lock mi->rli.log_space_lock here; this dirty read saves
> time
> > @@ -2333,6 +2361,7 @@
> > sql_print_information("Slave I/O thread exiting, read up to log '%s',
> position %s",
> > IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
> > VOID(pthread_mutex_lock(&LOCK_thread_count));
> > + RUN_HOOK(binlog_relay_io, thread_stop, thd, mi);
>
> !!!The return value from the callback is not taken into account.
>
This is on purpose, the return value should not stop the slave IO thread
to exist.
> > thd->query = thd->db = 0; // extra safety
> > thd->query_length= thd->db_length= 0;
> > VOID(pthread_mutex_unlock(&LOCK_thread_count));
> >
> > === modified file 'sql/sql_parse.cc'
> > --- a/sql/sql_parse.cc 2008-04-16 07:53:20 +0000
> > +++ b/sql/sql_parse.cc 2008-06-17 03:04:54 +0000
> > @@ -21,6 +21,7 @@
> > #include <m_ctype.h>
> > #include <myisam.h>
> > #include <my_dir.h>
> > +#include "rpl_handler.h"
> >
> > #include "sp_head.h"
> > #include "sp.h"
> > @@ -1395,7 +1396,17 @@
> >
> > /* If commit fails, we should be able to reset the OK status. */
> > thd->main_da.can_overwrite_status= TRUE;
> > - ha_autocommit_or_rollback(thd, thd->is_error());
> > + if (thd->transaction.stmt.ha_list)
> > + {
> > + ha_autocommit_or_rollback(thd, thd->is_error());
> > + }
> > + else
> > + {
> > + if (!thd->is_error())
> > + RUN_HOOK(transaction, after_commit, thd, 0);
>
> !!!The return value from the callback is not taken into account.
>
> > + else
> > + RUN_HOOK(transaction, after_rollback, thd, 0);
>
> !!!The return value from the callback is not taken into account.
>
See above.
> > + }
> > thd->main_da.can_overwrite_status= FALSE;
> >
> > thd->transaction.stmt.reset();
> >
> > === modified file 'sql/sql_plugin.cc'
> > --- a/sql/sql_plugin.cc 2008-04-14 10:15:04 +0000
> > +++ b/sql/sql_plugin.cc 2008-06-17 03:04:54 +0000
> > @@ -44,7 +44,8 @@
> > { C_STRING_WITH_LEN("FTPARSER") },
> > { C_STRING_WITH_LEN("DAEMON") },
> > { C_STRING_WITH_LEN("INFORMATION SCHEMA") },
> > - { C_STRING_WITH_LEN("AUDIT") }
> > + { C_STRING_WITH_LEN("AUDIT") },
> > + { C_STRING_WITH_LEN("REPLICATION") }
> > };
> >
> > extern int initialize_schema_table(st_plugin_int *plugin);
> > @@ -89,7 +90,8 @@
> > MYSQL_FTPARSER_INTERFACE_VERSION,
> > MYSQL_DAEMON_INTERFACE_VERSION,
> > MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
> > - MYSQL_AUDIT_INTERFACE_VERSION
> > + MYSQL_AUDIT_INTERFACE_VERSION,
> > + MYSQL_REPLICATION_INTERFACE_VERSION
> > };
> > static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
> > {
> > @@ -98,7 +100,8 @@
> > MYSQL_FTPARSER_INTERFACE_VERSION,
> > MYSQL_DAEMON_INTERFACE_VERSION,
> > MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
> > - MYSQL_AUDIT_INTERFACE_VERSION
> > + MYSQL_AUDIT_INTERFACE_VERSION,
> > + MYSQL_REPLICATION_INTERFACE_VERSION
> > };
> >
> > static bool initialized= 0;
> >
> > === modified file 'sql/sql_repl.cc'
> > --- a/sql/sql_repl.cc 2008-03-08 10:14:47 +0000
> > +++ b/sql/sql_repl.cc 2008-06-17 03:04:54 +0000
> > @@ -21,6 +21,7 @@
> > #include "log_event.h"
> > #include "rpl_filter.h"
> > #include <my_dir.h>
> > +#include "rpl_handler.h"
> >
> > int max_binlog_dump_events = 0; // unlimited
> > my_bool opt_sporadic_binlog_dump_fail = 0;
> > @@ -392,6 +393,9 @@
> > LOG_INFO linfo;
> > char *log_file_name = linfo.log_file_name;
> > char search_file_name[FN_REFLEN], *name;
> > +
> > + ulong ev_offset;
> > +
> > IO_CACHE log;
> > File file = -1;
> > String* packet = &thd->packet;
> > @@ -407,6 +411,10 @@
> > DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
> >
> > bzero((char*) &log,sizeof(log));
> > + sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
> > + thd->server_id, log_ident, (ulong)pos);
> > +
> > + RUN_HOOK(binlog_transmit, transmit_start, thd, flags, log_ident, pos);
>
> !!!The return value from the callback is not taken into account.
>
This should be fixed.
> > /*
> > heartbeat_period from @master_heartbeat_period user variable
> > */
> > @@ -477,11 +485,12 @@
> > goto err;
> > }
> >
> > - /*
> > - We need to start a packet with something other than 255
> > - to distinguish it from error
> > - */
> > - packet->set("\0", 1, &my_charset_bin); /* This is the start of a new
> packet */
> > + if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet,
> &ev_offset))
> > + {
> > + errmsg= "Failed to run hook 'reserve_header'";
> > + my_errno= ER_UNKNOWN_ERROR;
>
> !!!I'd suggest to define an ER_PLUGIN_ERROR in sql/share/errmsg.txt and
> use that instead of ER_UNKNOWN_ERROR.
>
Agreed that we should have one or even more specific error codes for
callback errors, but I'd like to delay this issue to the universal
replication interface, and leave this as a temporary solution of
semi-sync interface.
> > + goto err;
> > + }
> >
> > /*
> > Tell the client about the log name with a fake Rotate event;
> > @@ -521,7 +530,13 @@
> > my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
> > goto err;
> > }
> > - packet->set("\0", 1, &my_charset_bin);
> > + if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet,
> &ev_offset))
> > + {
> > + errmsg= "Failed to run hook 'reserve_header'";
> > + my_errno= ER_UNKNOWN_ERROR;
>
> !!!As above, please define and use ER_PLUGIN_ERROR.
>
> > + goto err;
> > + }
> > +
> > /*
> > Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
> > this larger than the corresponding packet (query) sent
> > @@ -545,28 +560,28 @@
> > {
> > /*
> > The packet has offsets equal to the normal offsets in a binlog
> > - event +1 (the first character is \0).
> > + event + ev_offset (the first character is \0).
> > */
> > DBUG_PRINT("info",
> > ("Looked for a Format_description_log_event, found event type
> %d",
> > - (*packet)[EVENT_TYPE_OFFSET+1]));
> > - if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
> > + (*packet)[EVENT_TYPE_OFFSET+ev_offset]));
> > + if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
> > {
> > - binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
> > + binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
> > LOG_EVENT_BINLOG_IN_USE_F);
> > - (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
> > + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
> > /*
> > mark that this event with "log_pos=0", so the slave
> > should not increment master's binlog position
> > (rli->group_master_log_pos)
> > */
> > - int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
> > + int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
> > /*
> > if reconnect master sends FD event with `created' as 0
> > to avoid destroying temp tables.
> > */
> > int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
> > - ST_CREATED_OFFSET+1, (ulong) 0);
> > + ST_CREATED_OFFSET+ev_offset, (ulong) 0);
> > /* send it */
> > if (my_net_write(net, (uchar*) packet->ptr(),
> packet->length()))
> > {
> > @@ -593,7 +608,12 @@
> > */
> > }
> > /* reset the packet as we wrote to it in any case */
> > - packet->set("\0", 1, &my_charset_bin);
> > + if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet,
> &ev_offset))
> > + {
> > + errmsg= "Failed to run hook 'reserve_header'";
> > + my_errno= ER_UNKNOWN_ERROR;
>
> !!!As above, please define and use ER_PLUGIN_ERROR.
>
> > + goto err;
> > + }
> > } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
> > else
> > {
> > @@ -605,6 +625,8 @@
> >
> > while (!net->error && net->vio != 0 &&
> !thd->killed)
> > {
> > + Log_event_type event_type= UNKNOWN_EVENT;
> > +
> > while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
> > {
> > #ifndef DBUG_OFF
> > @@ -620,17 +642,25 @@
> > log's filename does not change while it's active
> > */
> > if (coord)
> > - coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
> > + coord->pos= uint4korr(packet->ptr() + ev_offset +
> LOG_POS_OFFSET);
> >
> > - if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
> > + event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
> > + if (event_type == FORMAT_DESCRIPTION_EVENT)
> > {
> > - binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
> > + binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
> > LOG_EVENT_BINLOG_IN_USE_F);
> > - (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
> > + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
> > }
> > - else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
> > + else if (event_type == STOP_EVENT)
> > binlog_can_be_corrupted= FALSE;
> >
> > + pos = my_b_tell(&log);
> > + if (RUN_HOOK(binlog_transmit, before_send_event,
> > + thd, flags, packet, log_file_name, pos))
> > + {
> > + errmsg= "run 'before_send_event' hook failed";
>
> !!!Please set my_errno
>
right
> > + goto err;
> > + }
> > if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
> > {
> > errmsg = "Failed on my_net_write()";
> > @@ -638,9 +668,8 @@
> > goto err;
> > }
> >
> > - DBUG_PRINT("info", ("log event code %d",
> > - (*packet)[LOG_EVENT_OFFSET+1] ));
> > - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
> > + DBUG_PRINT("info", ("log event code %d", event_type));
> > + if (event_type == LOAD_EVENT)
> > {
> > if (send_file(thd))
> > {
> > @@ -649,7 +678,20 @@
> > goto err;
> > }
> > }
> > - packet->set("\0", 1, &my_charset_bin);
> > +
> > + if (RUN_HOOK(binlog_transmit, after_send_event, thd, flags,
> packet->ptr()))
> > + {
> > + errmsg= "Failed to run hook 'after_send_event'";
> > + my_errno= ER_UNKNOWN_ERROR;
>
> !!!As above, please define and use ER_PLUGIN_ERROR.
>
> > + goto err;
> > + }
> > +
> > + if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet,
> &ev_offset))
> > + {
> > + errmsg= "Failed to run hook 'reserve_header'";
> > + my_errno= ER_UNKNOWN_ERROR;
>
> !!!As above, please define and use ER_PLUGIN_ERROR.
>
> > + goto err;
> > + }
> > }
> >
> > /*
> > @@ -779,7 +821,15 @@
> >
> > if (read_packet)
> > {
> > - thd_proc_info(thd, "Sending binlog event to slave");
> > + thd_proc_info(thd, "Sending binlog event to slave");
> > + pos = my_b_tell(&log);
> > + if (RUN_HOOK(binlog_transmit, before_send_event,
> > + thd, flags, packet, log_file_name, pos))
> > + {
> > + errmsg= "run 'before_send_event' hook failed";
>
> !!!Please set my_errno
>
OK
> > + goto err;
> > + }
> > +
> > if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
> > {
> > errmsg = "Failed on my_net_write()";
> > @@ -787,7 +837,7 @@
> > goto err;
> > }
> >
> > - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
> > + if (event_type == LOAD_EVENT)
> > {
> > if (send_file(thd))
> > {
> > @@ -796,11 +846,18 @@
> > goto err;
> > }
> > }
> > - packet->set("\0", 1, &my_charset_bin);
> > - /*
> > - No need to net_flush because we will get to flush later when
> > - we hit EOF pretty quick
> > - */
> > +
> > + if (RUN_HOOK(binlog_transmit, after_send_event, thd, flags,
> packet->ptr()))
> > + {
> > + errmsg= "Failed to run hook 'after_send_event'";
>
> !!!Please set my_errno
>
OK
>
> > + goto err;
> > + }
> > +
> > + if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet,
> &ev_offset))
> > + {
> > + errmsg= "Failed to run hook 'reserve_header'";
>
> !!!Please set my_errno
>
OK
> > + goto err;
> > + }
> > }
> >
> > if (fatal_error)
> > @@ -853,8 +910,12 @@
> > goto err;
> > }
> >
> > - packet->length(0);
> > - packet->append('\0');
> > + if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet,
> &ev_offset))
> > + {
> > + errmsg= "Failed to run hook 'reserve_header'";
> > + my_errno= ER_UNKNOWN_ERROR;
>
> !!!As above, please define and use ER_PLUGIN_ERROR.
>
> > + goto err;
> > + }
> > if (coord)
> > coord->file_name= log_file_name; // reset to the next
> > }
> > @@ -864,6 +925,7 @@
> > end_io_cache(&log);
> > (void)my_close(file, MYF(MY_WME));
> >
> > + RUN_HOOK(binlog_transmit, transmit_stop, thd, flags);
>
> !!!The return value from the callback is not taken into account.
>
> > my_eof(thd);
> > thd_proc_info(thd, "Waiting to finalize termination");
> > pthread_mutex_lock(&LOCK_thread_count);
> > @@ -874,6 +936,7 @@
> > err:
> > thd_proc_info(thd, "Waiting to finalize termination");
> > end_io_cache(&log);
> > + RUN_HOOK(binlog_transmit, transmit_stop, thd, flags);
>
> !!!The return value from the callback is not taken into account.
>
> > /*
> > Exclude iteration through thread list
> > this is needed for purge_logs() - it will iterate through
> > @@ -1134,6 +1197,7 @@
> > goto err;
> > }
> >
> > + RUN_HOOK(binlog_relay_io, after_reset_slave, thd, mi);
>
> !!!The return value from the callback is not taken into account.
>
> > err:
> > unlock_slave_threads(mi);
> > if (error)
> > @@ -1416,7 +1480,11 @@
> > ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
> > return 1;
> > }
> > - return mysql_bin_log.reset_logs(thd);
> > +
> > + if (mysql_bin_log.reset_logs(thd))
> > + return 1;
> > + RUN_HOOK(binlog_transmit, after_reset_master, thd, 0 /* flags */);
>
> !!!The return value from the callback is not taken into account.
>
The above four ignorance of return value are on purpose.
> > + return 0;
> > }
> >
> > int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
> > @@ -1923,5 +1991,3 @@
> > }
> >
> > #endif /* HAVE_REPLICATION */
> > -
> > -
> >
> >
> >
>