=== modified file 'include/mysql/plugin.h'
--- a/include/mysql/plugin.h	2008-03-27 19:02:15 +0000
+++ b/include/mysql/plugin.h	2008-06-05 09:05:38 +0000
@@ -16,6 +16,8 @@
 #ifndef _my_plugin_h
 #define _my_plugin_h
 
+#include <my_global.h>
+
 #ifdef __cplusplus
 class THD;
 class Item;
@@ -66,7 +68,8 @@
 #define MYSQL_DAEMON_PLUGIN          3  /* The daemon/raw plugin type */
 #define MYSQL_INFORMATION_SCHEMA_PLUGIN  4  /* The I_S plugin type */
 #define MYSQL_AUDIT_PLUGIN           5  /* The Audit plugin type        */
-#define MYSQL_MAX_PLUGIN_TYPE_NUM    6  /* The number of plugin types   */
+#define MYSQL_REPLICATION_PLUGIN     6	/* The replication plugin type */
+#define MYSQL_MAX_PLUGIN_TYPE_NUM    7  /* The number of plugin types   */
 
 /* We use the following strings to define licenses for plugins */
 #define PLUGIN_LICENSE_PROPRIETARY 0
@@ -461,6 +464,19 @@
 
 
 /*************************************************************************
+  API for Replication plugin. (MYSQL_REPLICATION_PLUGIN)
+*/
+#define MYSQL_REPLICATION_INTERFACE_VERSION 0x0100
+
+/**
+   Replication plugin descriptor
+*/
+struct st_mysql_replication {
+  int interface_version;
+};
+
+
+/*************************************************************************
   st_mysql_value struct for reading values from mysqld.
   Used by server variables framework to parse user-provided values.
   Will be used for arguments when implementing UDFs.
@@ -610,6 +626,87 @@
                                    const char *key, unsigned int key_length,
                                    int using_trx);
 
+/**
+   Get the server id of thread connection.
+
+   @param thd       user thread connection handle
+
+   @return server id of current thread
+*/
+uint32 thd_server_id(MYSQL_THD thd);
+
+/**
+   Read a packet from the thread connection.
+
+   @param thd      user thread connection handle
+   @param packet   return the pointer to the packet read
+   @param len      return the length of packet read
+
+   @return 0 on success, 1 on failure.
+*/
+int thd_net_read(MYSQL_THD thd, const uchar **packet, size_t *len);
+
+/**
+   Write a packet to the thread connection.
+
+   @param thd      user thread connection handle
+
+   @return 0 on success, 1 on failure
+*/
+int thd_net_write(MYSQL_THD thd, const uchar *packet, size_t len);
+
+/**
+   Flush write buffer of thread connection.
+
+   @param thd      user thread connection handle
+
+   @return 0 on success, 1 on failure
+*/
+int thd_net_flush(MYSQL_THD thd);
+
+/**
+   Get the value of user variable as an integer.
+
+   @param thd      user thread connection handle
+   @param name     user variable name
+   @param value    pointer to return the value
+   @param is_null  if this is not NULL, set to TRUE if the variable is not defined
+
+   @return always 0 now.
+*/
+int get_user_var_int(MYSQL_THD thd, const char *name,
+		     longlong *value, my_bool *is_null);
+
+/**
+   Get the value of user variable as a double precision float number.
+
+   @param thd      user thread connection handle
+   @param name     user variable name
+   @param value    pointer to return the value
+   @param is_null  if this is not NULL, set to TRUE if the variable is not defined
+
+   @return always 0 now.
+*/
+int get_user_var_real(MYSQL_THD thd, const char *name,
+		      double *value, my_bool *is_null);
+
+/**
+   Get the value of user variable as a string.
+
+   @param thd      user thread connection handle
+   @param name     user variable name
+   @param value    pointer to the value buffer
+   @param len      length of the value buffer
+   @param precision precision of the value if it is a float number
+   @param is_null  if this is not NULL, set to TRUE if the variable is not defined
+
+   @return always 0 now.
+*/
+int get_user_var_str(MYSQL_THD thd, const char *name,
+		     char *value, ulong len,
+		     uint precision, my_bool *is_null);
+
+  
 #ifdef __cplusplus
 }
 #endif

=== modified file 'libmysqld/Makefile.am'
--- a/libmysqld/Makefile.am	2008-03-27 22:47:31 +0000
+++ b/libmysqld/Makefile.am	2008-06-05 09:05:38 +0000
@@ -82,7 +82,8 @@
 	rpl_filter.cc sql_partition.cc sql_builtin.cc sql_plugin.cc \
 	sql_tablespace.cc \
 	rpl_injector.cc my_user.c partition_info.cc \
-	sql_servers.cc ddl_blocker.cc si_objects.cc sql_audit.cc
+	sql_servers.cc ddl_blocker.cc si_objects.cc sql_audit.cc \
+	rpl_handler.cc
 
 libmysqld_int_a_SOURCES= $(libmysqld_sources)
 nodist_libmysqld_int_a_SOURCES= $(libmysqlsources) $(sqlsources)

=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am	2008-03-27 22:47:31 +0000
+++ b/sql/Makefile.am	2008-06-05 09:05:38 +0000
@@ -88,7 +88,8 @@
 			event_data_objects.h event_scheduler.h \
 			sql_partition.h partition_info.h partition_element.h \
 			probes.h sql_audit.h \
-			contributors.h sql_servers.h ddl_blocker.h si_objects.h
+			contributors.h sql_servers.h ddl_blocker.h si_objects.h \
+			rpl_handler.h
 
 mysqld_SOURCES =	sql_lex.cc sql_handler.cc sql_partition.cc \
 			item.cc item_sum.cc item_buff.cc item_func.cc \
@@ -133,7 +134,7 @@
 			sql_plugin.cc sql_binlog.cc \
 			sql_builtin.cc sql_tablespace.cc partition_info.cc \
 			sql_servers.cc sql_audit.cc sha2.cc \
-			ddl_blocker.cc si_objects.cc
+			ddl_blocker.cc si_objects.cc rpl_handler.cc
 
 if HAVE_DTRACE
   mysqld_SOURCES += probes.d

=== modified file 'sql/handler.cc'
--- a/sql/handler.cc	2008-04-07 15:32:14 +0000
+++ b/sql/handler.cc	2008-06-05 09:05:38 +0000
@@ -33,6 +33,8 @@
 #include "ha_partition.h"
 #endif
 
+#include "rpl_handler.h"
+
 /*
   While we have legacy_db_type, we have this array to
   check for dups and to find handlerton from legacy_db_type.
@@ -1134,11 +1136,21 @@
       }
       DBUG_EXECUTE_IF("crash_commit_after_log", abort(););
     }
+    if (RUN_HOOK(transaction, before_commit, thd, all))
+    {
+      /*
+	Transaction must be committed after log_xid, so we can not
+	rollback here, abort and let the recover recommit this
+	transaction.
+      */
+      abort();
+    }
     error=ha_commit_one_phase(thd, all) ? (cookie ? 2 : 1) : 0;
     DBUG_EXECUTE_IF("crash_commit_before_unlog", abort(););
     if (cookie)
       tc_log->unlog(cookie, xid);
     DBUG_EXECUTE_IF("crash_commit_after", abort(););
+    RUN_HOOK(transaction, after_commit, thd, all);
 end:
     if (is_real_trans)
       start_waiting_global_read_lock(thd);
@@ -1224,6 +1236,8 @@
 #ifdef USING_TRANSACTIONS
   if (ha_info)
   {
+    RUN_HOOK(transaction, before_rollback, thd, all);
+
     /* Close all cursors that can not survive ROLLBACK */
     if (is_real_trans)                          /* not a statement commit */
       thd->stmt_map.close_transient_cursors();
@@ -1269,6 +1283,7 @@
     push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                  ER_WARNING_NOT_COMPLETE_ROLLBACK,
                  ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
+  RUN_HOOK(transaction, after_rollback, thd, all);
   DBUG_RETURN(error);
 }
 

=== modified file 'sql/log.cc'
--- a/sql/log.cc	2008-04-14 10:15:04 +0000
+++ b/sql/log.cc	2008-06-05 09:05:38 +0000
@@ -39,6 +39,7 @@
 #endif
 
 #include <mysql/plugin.h>
+#include "rpl_handler.h"
 
 /* max size of the log message */
 #define MAX_LOG_BUFFER_SIZE 1024
@@ -3981,14 +3982,20 @@
     if (event_info->write(file))
       goto err;
 
+    error=0;
     if (file == &log_file) // we are writing to the real log (disk)
     {
       if (flush_and_sync())
 	goto err;
+
+      if (RUN_HOOK(binlog_storage, after_flush,
+		   thd, log_file_name, file->pos_in_file)) {
+        goto err;
+      }
+
       signal_update();
       rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
     }
-    error=0;
 
 err:
     if (error)
@@ -4340,6 +4347,11 @@
         write_error=1;				// Don't give more errors
         goto err;
       }
+
+      if (RUN_HOOK(binlog_storage, after_flush,
+		   thd, log_file_name, log_file.pos_in_file))
+        goto err;
+
       signal_update();
     }
 

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2008-04-20 08:06:51 +0000
+++ b/sql/mysqld.cc	2008-06-05 09:05:38 +0000
@@ -33,6 +33,8 @@
 
 #include "rpl_injector.h"
 
+#include "rpl_handler.h"
+
 #ifdef HAVE_SYS_PRCTL_H
 #include <sys/prctl.h>
 #endif
@@ -1322,6 +1324,7 @@
   ha_end();
   if (tc_log)
     tc_log->close();
+  delegates_destroy();
   xid_cache_free();
   delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache);
   multi_keycache_free();
@@ -3827,6 +3830,13 @@
     unireg_abort(1);
   }
 
+  /* initialize delegates for extension observers */
+  if (delegates_init())
+  {
+    sql_print_error("Initialize extension delegates failed");
+    unireg_abort(1);
+  }
+
   /* need to configure logging before initializing storage engines */
   if (opt_update_log)
   {

=== added file 'sql/replication.h'
--- a/sql/replication.h	1970-01-01 00:00:00 +0000
+++ b/sql/replication.h	2008-06-05 09:05:38 +0000
@@ -0,0 +1,417 @@
+/* Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef REPLICATION_H
+#define REPLICATION_H
+
+#include <rpl_mi.h>
+#include <rpl_rli.h>
+
+/**
+   Transaction observer parameter
+*/
+typedef struct Trans_param {
+  MYSQL_THD thd;
+  bool is_real_trans;
+} Trans_param;
+
+/**
+   Observes and extends transaction execution
+*/
+typedef struct Trans_observer {
+  uint32 len;
+
+  /**
+     Before transaction commit to storage engines
+
+     This callback is called right before commit to storage engines
+     for transactional tables.
+
+     This is not called for non-transactional tables.
+     
+     @param param The parameter for transaction observers
+
+     @return 0 on sucess, 1 on failure
+   */
+  int (*before_commit)(Trans_param *param);
+
+  /**
+     Before transaction rollback to storage engines
+
+     This callback is called right before rollback to storage engines
+     for transactional tables.
+
+     This is not called for non-transactional tables.
+     
+     @param param The parameter for transaction observers
+
+     @return 0 on sucess, 1 on failure
+   */
+
+  int (*before_rollback)(Trans_param *param);
+  
+  /**
+     After transaction commit to storage engines
+
+     This callback is called right after commit to storage engines for
+     transactional tables.
+
+     For non-transactional tables, this is called at the end of the
+     statement, before sending statement status, if the statement
+     succeeded.
+
+     @param param The parameter for transaction observers
+
+     @return 0 on sucess, 1 on failure
+  */
+  int (*after_commit)(Trans_param *param);
+
+  /**
+     After transaction rollback to storage engines
+
+     This callback is called right after rollback to storage engines
+     for transactional tables.
+
+     For non-transactional tables, this is called at the end of the
+     statement, before sending statement status, if the statement
+     failed.
+
+     @param param The parameter for transaction observers
+
+     @return 0 on sucess, 1 on failure
+  */
+  int (*after_rollback)(Trans_param *param);
+} Trans_observer;
+
+/**
+   Binlog storage observer parameters
+ */
+typedef struct Binlog_storage_param {
+  MYSQL_THD thd;
+} Binlog_storage_param;
+
+/**
+   Observe binlog logging storage
+*/
+typedef struct Binlog_storage_observer {
+  uint32 len;
+
+  /**
+     After binlog has been updated and flushed
+
+     @param param Observer common parameter
+     @param log_file Binlog file name been updated
+     @param log_pos Binlog position after update
+
+     @return 0 on success, 1 on failure
+  */
+  int (*after_flush)(Binlog_storage_param *param,
+		      const char *log_file, my_off_t log_pos);
+} Binlog_storage_observer;
+
+/**
+   Replication binlog tramsmitter (binlog dump) observer parameter.
+*/
+typedef struct Binlog_transmit_param {
+  MYSQL_THD thd;
+  ushort flags;
+} Binlog_transmit_param;
+
+/**
+   Observe and extends the binlog dumping thread.
+*/
+typedef struct Binlog_transmit_observer {
+  uint32 len;
+  
+  /**
+     Start binlog dumping thread
+
+     @param param Observer common parameter
+     @param log_file Binlog file name to transmit from
+     @param log_pos Binlog position to transmit from
+
+     @return 0 on success, 1 on failure
+  */
+  int (*transmit_start)(Binlog_transmit_param *param,
+			 const char *log_file, my_off_t log_pos);
+
+  /**
+     Start binlog dumping thread
+
+     @param param Observer common parameter
+     
+     @return 0 on success, 1 on failure
+  */
+  int (*transmit_stop)(Binlog_transmit_param *param);
+
+  /**
+     Reserve bytes for header for binlog packet for transmision
+
+     @param param Observer common parameter
+     @param packet The packet buffer for transmision
+     @param hlen Header length before and after reserve header for
+     this observer
+
+     @return 0 on success, 1 on failure
+  */
+  int (*reserve_header)(Binlog_transmit_param *param,
+			 String *packet, ulong *len);
+
+  /**
+     Before sending an event packet
+
+     @param param Observer common parameter
+     @param packet Binlog event packet to send
+     @param log_file Binlog file name of the event packet to send
+     @param log_pos Binlog position of the event packet to send
+
+     @return 0 on success, 1 on failure
+  */
+  int (*before_send_event)(Binlog_transmit_param *param,
+			    String *packet,
+			    const char *log_file, my_off_t log_pos );
+
+  /**
+     After sending an event packet
+
+     @param param Observer common parameter
+     @param event_buf Binlog event packet buffer sent
+
+     @return 0 on success, 1 on failure
+   */
+  int (*after_send_event)(Binlog_transmit_param *param,
+			   const char *event_buf);
+
+  /**
+     Reset transmision (binlog dumping) status
+
+     This is called when executing the command RESET MASTER, and is
+     used to reset status variables added by observers.
+
+     @param param Observer common parameter
+
+     @return 0 on success, 1 on failure
+  */
+  int (*after_reset_master)(Binlog_transmit_param *param);
+} Binlog_transmit_observer;
+
+/**
+  Replication binlog relay IO observer parameter
+*/
+typedef struct Binlog_relay_IO_param {
+  MYSQL_THD thd;
+  Master_info *mi;
+} Binlog_relay_IO_param;
+
+/**
+   Observes and extends the service of slave IO thread.
+*/
+typedef struct Binlog_relay_IO_observer {
+  uint32 len;
+
+  /**
+     Start of slave IO thread
+
+     @param param Observer common parameter
+
+     @return 0 on success, 1 on failure
+  */
+  int (*thread_start)(Binlog_relay_IO_param *param);
+
+  /**
+     Stop of slave IO thread
+
+     @param param Observer common parameter
+
+     @return 0 on success, 1 on failure
+  */
+  int (*thread_stop)(Binlog_relay_IO_param *param);
+
+  /**
+     Before slave request binlog transmision from master
+
+     This is called before slave issuing BINLOG_DUMP command to master
+     to request binlog.
+
+     @param param Observer common parameter
+     @param flags point to binlog dump flags, observer can set special
+     flag bits
+
+     @return 0 on success, 1 on failure
+  */
+  int (*before_request_transmit)(Binlog_relay_IO_param *param, ushort *flags);
+
+  /**
+     After read an event packet from master
+
+     @param param Observer common parameter
+     @param packet The event packet read from master
+     @param len Length of the event packet read from master
+     @param event_buf The event packet return after process
+     @param event_len The length of event packet return after process
+
+     @return 0 on success, 1 on failure
+  */
+  int (*after_read_event)(Binlog_relay_IO_param *param,
+			  const char *packet, ulong len,
+			  const char **event_buf, ulong *event_len);
+
+  /**
+     After written event to relay log
+
+     @param param Observer common parameter
+     @param event_buf Event packet written to relay log
+     @param event_len Length of the event packet written to relay log
+
+     @return 0 on success, 1 on failure
+  */
+  int (*after_queue_event)(Binlog_relay_IO_param *param,
+			   const char *event_buf, ulong event_len);
+
+  /**
+     Reset slave relay log IO status
+     
+     @param param Observer common parameter
+
+     @return 0 on success, 1 on failure
+  */
+  int (*after_reset_slave)(Binlog_relay_IO_param *param);
+} Binlog_relay_IO_observer;
+
+/**
+   Replicator structure for extension of replication.
+
+   This is a collection of observers for extension of replication for
+   convenience.
+ */
+typedef struct Replicator {
+  uint32 len;
+  
+  Trans_observer *transaction;
+  Binlog_storage_observer *binlog_storage;
+  Binlog_transmit_observer *binlog_transmit;
+  Binlog_relay_IO_observer *binlog_relay_io;
+} Replicator;
+
+/**
+   Register a replicator
+
+   A replicator contains pointers to transaction, binlog storage,
+   binlog transmit, relay IO and relay SQl observers.
+
+   This function is a convenient function to register these observers
+   if the corresponding pointers are not NULL.
+
+   @param replicator    the replicator to register
+   @param p pointer to the internal plugin structure
+
+   @return 0 if success, 1 if the replicator already exists
+ */
+int register_replicator(Replicator *replicator, void *p);
+
+/**
+   Unregister a replicator
+
+   This function is a convenient function to unregister all the
+   observers that are not NULL in the Replicator structure.
+
+   @param replicator    the replicator to unregister
+   @param p pointer to the internal plugin structure
+
+   @return 0 if success, 1 if the replicator not exists
+ */
+int unregister_replicator(Replicator *replicator, void *p);
+
+/**
+   Register a transaction observer
+
+   @param observer The transaction observer to register
+   @param p pointer to the internal plugin structure
+
+   @return 0 if success, 1 if the observer already exists
+*/
+int register_trans_observer(Trans_observer *observer, void *p);
+
+/**
+   Unregister a transaction observer
+
+   @param observer The transaction observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @return 0 if success, 1 if the observer not exists
+*/
+int unregister_trans_observer(Trans_observer *observer, void *p);
+
+/**
+   Register a binlog storage observer
+
+   @param observer The binlog storage observer to register
+   @param p pointer to the internal plugin structure
+
+   @return 0 if success, 1 if the observer already exists
+*/
+int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
+
+/**
+   Unregister a binlog storage observer
+
+   @param observer The binlog storage observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @return 0 if success, 1 if the observer not exists
+*/
+int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
+
+/**
+   Register a binlog transmit observer
+
+   @param observer The binlog transmit observer to register
+   @param p pointer to the internal plugin structure
+
+   @return 0 if success, 1 if the observer already exists
+*/
+int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
+
+/**
+   Unregister a binlog transmit observer
+
+   @param observer The binlog transmit observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @return 0 if success, 1 if the observer not exists
+*/
+int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
+
+/**
+   Register a binlog relay IO (slave IO thread) observer
+
+   @param observer The binlog relay IO observer to register
+   @param p pointer to the internal plugin structure
+
+   @return 0 if success, 1 if the observer already exists
+*/
+int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
+
+/**
+   Unregister a binlog relay IO (slave IO thread) observer
+
+   @param observer The binlog relay IO observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @return 0 if success, 1 if the observer not exists
+*/
+int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
+
+#endif /* REPLICATION_H */

=== added file 'sql/rpl_handler.cc'
--- a/sql/rpl_handler.cc	1970-01-01 00:00:00 +0000
+++ b/sql/rpl_handler.cc	2008-06-05 09:05:38 +0000
@@ -0,0 +1,423 @@
+/* Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include "mysql_priv.h"
+
+#include "rpl_mi.h"
+#include "sql_repl.h"
+#include "log_event.h"
+#include "rpl_filter.h"
+#include <my_dir.h>
+#include "rpl_handler.h"
+
+Trans_delegate transaction_delegate;
+Binlog_storage_delegate binlog_storage_delegate;
+#ifdef HAVE_REPLICATION
+Binlog_transmit_delegate binlog_transmit_delegate;
+Binlog_relay_IO_delegate binlog_relay_io_delegate;
+#endif /* HAVE_REPLICATION */
+
+inline MYSQL_THD thd_to_mysql_thd(THD *thd)
+{
+  return (MYSQL_THD)thd;
+}
+
+inline THD *mysql_thd_to_thd(MYSQL_THD thd)
+{
+  return (THD*)thd;
+}
+
+uint32 thd_server_id(MYSQL_THD thd)
+{
+  return thd_to_mysql_thd(thd)->server_id;
+}
+
+int thd_net_read(MYSQL_THD mysql_thd, const uchar **packet, size_t *len)
+{
+  THD *thd= mysql_thd_to_thd(mysql_thd);
+  ulong ret= my_net_read(&thd->net);
+  if (ret == packet_error)
+    return 1;
+  *len= ret;
+  *packet= thd->net.read_pos;
+  return 0;
+}
+
+int thd_net_write(MYSQL_THD thd, const uchar *packet, size_t len)
+{
+  return my_net_write(&mysql_thd_to_thd(thd)->net, packet, len);
+}
+
+int thd_net_flush(MYSQL_THD thd)
+{
+  return net_flush(&mysql_thd_to_thd(thd)->net);
+}
+
+int get_user_var_int(MYSQL_THD thd, const char *name,
+		     longlong *value, my_bool *null_value)
+{
+  my_bool null_value_;
+  if (!null_value)
+    null_value= &null_value_;
+  user_var_entry *entry= 
+    (user_var_entry*) hash_search(&mysql_thd_to_thd(thd)->user_vars, (uchar*) name,
+                                  strlen(name));
+  *value= entry? entry->val_int(null_value) : 0;
+  return 0;
+}
+
+int get_user_var_real(MYSQL_THD thd, const char *name,
+		     double *value, my_bool *null_value)
+{
+  my_bool null_value_;
+  if (!null_value)
+    null_value= &null_value_;
+  user_var_entry *entry= 
+    (user_var_entry*) hash_search(&mysql_thd_to_thd(thd)->user_vars, (uchar*) name,
+                                  strlen(name));
+  *value= entry? entry->val_real(null_value) : 0;
+  return 0;
+}
+
+int get_user_var_str(MYSQL_THD thd, const char *name,
+		     char *value, size_t len, uint precision, my_bool *null_value)
+{
+  String str;
+  my_bool null_value_;
+  if (!null_value)
+    null_value= &null_value_;
+  user_var_entry *entry= 
+    (user_var_entry*) hash_search(&mysql_thd_to_thd(thd)->user_vars, (uchar*) name,
+                                  strlen(name));
+  if (entry)
+  {
+    entry->val_str(null_value, &str, precision);
+    strncpy(value, str.c_ptr(), len);
+  }
+  else
+  {
+    *value= 0;
+  }
+  return 0;
+}
+
+int delegates_init()
+{
+  if (transaction_delegate.init()
+      || binlog_storage_delegate.init()
+#ifdef HAVE_REPLICATION
+      || binlog_transmit_delegate.init()
+      || binlog_relay_io_delegate.init()
+#endif /* HAVE_REPLICATION */
+      )
+    return 1;
+  return 0;
+}
+
+void delegates_destroy()
+{
+  transaction_delegate.destroy();
+  binlog_storage_delegate.destroy();
+#ifdef HAVE_REPLICATION
+  binlog_transmit_delegate.destroy();
+  binlog_relay_io_delegate.destroy();
+#endif /* HAVE_REPLICATION */
+}
+
+/*
+  NOTE2ME: I (hezx) am not sure if it's correct to add observer
+  plugins to the thd->lex list. Because afer each statement, all
+  plugins add to thd->lex will be automatically unlocked.
+ */
+#define FOREACH_OBSERVER(f, thd, args...)				\
+  read_lock();								\
+  Observer_info_iterator iter= observer_info_iter();			\
+  Observer_info *info= iter++;						\
+  for (; info; info= iter++)						\
+  {									\
+    plugin_ref plugin=							\
+      my_plugin_lock(thd, &info->plugin);				\
+    if (!plugin)							\
+    {									\
+      unlock();								\
+      return 1;								\
+    }									\
+    if (info->observer->f && info->observer->f(args))			\
+    {									\
+      unlock();								\
+      return 1;								\
+    }									\
+  }									\
+  unlock();								\
+  return 0
+
+
+int Trans_delegate::before_commit(THD *thd, bool all)
+{
+  Trans_param param;
+  param.thd= (MYSQL_THD) thd;
+  param.is_real_trans= all || thd->transaction.all.ha_list == 0;
+
+  FOREACH_OBSERVER(before_commit, thd, &param);
+}
+
+int Trans_delegate::before_rollback(THD *thd, bool all)
+{
+  Trans_param param;
+  param.thd= (MYSQL_THD) thd;
+  param.is_real_trans= all || thd->transaction.all.ha_list == 0;
+
+  FOREACH_OBSERVER(before_rollback, thd, &param);
+}
+
+int Trans_delegate::after_commit(THD *thd, bool all)
+{
+  Trans_param param;
+  param.thd= (MYSQL_THD) thd;
+  param.is_real_trans= all || thd->transaction.all.ha_list == 0;
+
+  FOREACH_OBSERVER(after_commit, thd, &param);
+}
+
+int Trans_delegate::after_rollback(THD *thd, bool all)
+{
+  Trans_param param;
+  param.thd= (MYSQL_THD) thd;
+  param.is_real_trans= all || thd->transaction.all.ha_list == 0;
+
+  FOREACH_OBSERVER(after_rollback, thd, &param);
+}
+
+int Binlog_storage_delegate::after_flush(THD *thd,
+					  const char *log_file,
+					  my_off_t log_pos)
+{
+  Binlog_storage_param param;
+  param.thd= (MYSQL_THD) thd;
+
+  FOREACH_OBSERVER(after_flush, thd, &param, log_file, log_pos);
+}
+
+#ifdef HAVE_REPLICATION
+int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
+					      const char *log_file,
+					      my_off_t log_pos)
+{
+  Binlog_transmit_param param;
+  param.thd= (MYSQL_THD) thd;
+  param.flags= flags;
+
+  FOREACH_OBSERVER(transmit_start, thd, &param, log_file, log_pos);
+}
+
+int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
+{
+  Binlog_transmit_param param;
+  param.thd= (MYSQL_THD) thd;
+  param.flags= flags;
+
+  FOREACH_OBSERVER(transmit_stop, thd, &param);
+}
+
+int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
+					      String *packet, ulong *len)
+{
+  Binlog_transmit_param param;
+  param.thd= (MYSQL_THD) thd;
+  param.flags= flags;
+
+  /* reserve and set default header */
+  packet->length(0);
+  packet->append("\0", 1);
+  *len= 1;
+
+  FOREACH_OBSERVER(reserve_header, thd, &param, packet, len);
+}
+
+int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
+						 String *packet,
+						 const char *log_file,
+						 my_off_t log_pos)
+{
+  Binlog_transmit_param param;
+  param.thd= (MYSQL_THD) thd;
+  param.flags= flags;
+
+  FOREACH_OBSERVER(before_send_event, thd, &param, packet, log_file, log_pos);
+}
+
+int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
+						const char *event_buf)
+{
+  Binlog_transmit_param param;
+  param.thd= (MYSQL_THD) thd;
+  param.flags= flags;
+
+  FOREACH_OBSERVER(after_send_event, thd, &param, event_buf);
+}
+
+int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
+
+{
+  Binlog_transmit_param param;
+  param.thd= (MYSQL_THD) thd;
+  param.flags= flags;
+
+  FOREACH_OBSERVER(after_reset_master, thd, &param);
+}
+
+int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
+{
+  Binlog_relay_IO_param param;
+  param.thd= (MYSQL_THD) thd;
+  param.mi= mi;
+
+  FOREACH_OBSERVER(thread_start, thd, &param);
+}
+
+int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
+{
+
+  Binlog_relay_IO_param param;
+  param.thd= (MYSQL_THD) thd;
+  param.mi= mi;
+
+  FOREACH_OBSERVER(thread_stop, thd, &param);
+}
+
+int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
+						       Master_info *mi,
+						       ushort *flags)
+{
+  Binlog_relay_IO_param param;
+  param.thd= (MYSQL_THD) thd;
+  param.mi= mi;
+
+  FOREACH_OBSERVER(before_request_transmit, thd, &param, flags);
+}
+
+int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
+						const char *packet, ulong len,
+						const char **event_buf,
+						ulong *event_len)
+{
+  Binlog_relay_IO_param param;
+  param.thd= (MYSQL_THD) thd;
+  param.mi= mi;
+
+  FOREACH_OBSERVER(after_read_event, thd, &param, packet, len, event_buf, event_len);
+}
+
+int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
+						const char *event_buf,
+						ulong event_len)
+{
+  Binlog_relay_IO_param param;
+  param.thd= (MYSQL_THD) thd;
+  param.mi= mi;
+
+  FOREACH_OBSERVER(after_queue_event, thd, &param, event_buf, event_len);
+}
+
+int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
+
+{
+  Binlog_relay_IO_param param;
+  param.thd= (MYSQL_THD) thd;
+  param.mi= mi;
+
+  FOREACH_OBSERVER(after_reset_slave, thd, &param);
+}
+#endif /* HAVE_REPLICATION */
+
+int register_trans_observer(Trans_observer *observer, void *p)
+{
+  return transaction_delegate.add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_trans_observer(Trans_observer *observer, void *p)
+{
+  return transaction_delegate.remove_observer(observer, (st_plugin_int *)p);
+}
+
+int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
+{
+  return binlog_storage_delegate.add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
+{
+  return binlog_storage_delegate.remove_observer(observer, (st_plugin_int *)p);
+}
+
+#ifdef HAVE_REPLICATION
+int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
+{
+  return binlog_transmit_delegate.add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
+{
+  return binlog_transmit_delegate.remove_observer(observer, (st_plugin_int *)p);
+}
+
+int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
+{
+  return binlog_relay_io_delegate.add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
+{
+  return binlog_relay_io_delegate.remove_observer(observer, (st_plugin_int *)p);
+}
+#endif /* HAVE_REPLICATION */
+
+int register_replicator(Replicator *replicator, void *p)
+{
+  if (replicator->transaction)
+    if (register_trans_observer(replicator->transaction, p))
+      return 1;
+  if (replicator->binlog_storage)
+    if (register_binlog_storage_observer(replicator->binlog_storage, p))
+      return 1;
+#ifdef HAVE_REPLICATION
+  if (replicator->binlog_transmit)
+    if (register_binlog_transmit_observer(replicator->binlog_transmit, p))
+      return 1;
+  if (replicator->binlog_relay_io)
+    if (register_binlog_relay_io_observer(replicator->binlog_relay_io, p))
+      return 1;
+#endif /* HAVE_REPLICATION */
+  return 0;
+}
+
+int unregister_replicator(Replicator *replicator, void *p)
+{
+  if (replicator->transaction)
+    if (unregister_trans_observer(replicator->transaction, p))
+      return 1;
+  if (replicator->binlog_storage)
+    if (unregister_binlog_storage_observer(replicator->binlog_storage, p))
+      return 1;
+#ifdef HAVE_REPLICATION
+  if (replicator->binlog_transmit)
+    if (unregister_binlog_transmit_observer(replicator->binlog_transmit, p))
+      return 1;
+  if (replicator->binlog_relay_io)
+    if (unregister_binlog_relay_io_observer(replicator->binlog_relay_io, p))
+      return 1;
+#endif /* HAVE_REPLICATION */
+  return 0;
+}

=== added file 'sql/rpl_handler.h'
--- a/sql/rpl_handler.h	1970-01-01 00:00:00 +0000
+++ b/sql/rpl_handler.h	2008-06-05 09:05:38 +0000
@@ -0,0 +1,193 @@
+/* Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef RPL_HANDLER_H
+#define RPL_HANDLER_H
+
+#include "mysql_priv.h"
+#include "rpl_mi.h"
+#include "rpl_rli.h"
+#include "sql_plugin.h"
+#include "replication.h"
+
+template <typename T>
+class Observer_info_ {
+public:
+  T *observer;
+  st_plugin_int *plugin_int;
+  plugin_ref plugin;
+
+  Observer_info_(T *ob,st_plugin_int *p)
+    :observer(ob), plugin_int(p)
+    {
+#ifdef DBUG_OFF
+      plugin= plugin_int;
+#else
+      plugin= &plugin_int;
+#endif
+    }
+};
+
+template <typename T>
+class Delegate {
+public:
+  typedef T Observer;
+  typedef Observer_info_<Observer> Observer_info;
+  typedef List<Observer_info> Observer_info_list;
+  typedef List_iterator<Observer_info> Observer_info_iterator;
+  
+  int add_observer(Observer *observer, st_plugin_int *plugin)
+  {
+    int ret= FALSE;
+    write_lock();
+    Observer_info_iterator iter(observer_info_list);
+    Observer_info *info= iter++;
+    while (info && info->observer != observer)
+      info= iter++;
+    if (!info)
+    {
+      info= new Observer_info(observer, plugin);
+      observer_info_list.push_back(info, &memroot);
+    }
+    else
+      ret= TRUE;
+    unlock();
+    return ret;
+  }
+  
+  int remove_observer(Observer *observer, st_plugin_int *plugin)
+  {
+    int ret= FALSE;
+    write_lock();
+    Observer_info_iterator iter(observer_info_list);
+    Observer_info *info= iter++;
+    while (info && info->observer != observer)
+      info= iter++;
+    if (info)
+      iter.remove();
+    else
+      ret= TRUE;
+    unlock();
+    return ret;
+  }
+
+  inline Observer_info_iterator observer_info_iter()
+  {
+    return Observer_info_iterator(observer_info_list);
+  }
+
+  inline bool is_empty()
+  {
+    return observer_info_list.is_empty();
+  }
+
+  inline int read_lock()
+  {
+    return rw_rdlock(&lock);
+  }
+
+  inline int write_lock()
+  {
+    return rw_wrlock(&lock);
+  }
+
+  inline int unlock()
+  {
+    return rw_unlock(&lock);
+  }
+  
+  int init()
+  {
+    if (my_rwlock_init(&lock, NULL))
+      return 1;
+    init_sql_alloc(&memroot, 1024, 0);
+    return 0;
+  }
+
+  void destroy()
+  {
+    rwlock_destroy(&lock);
+    free_root(&memroot, MYF(0));
+  }
+
+  Delegate(){}
+  ~Delegate() {}
+
+private:
+  Observer_info_list observer_info_list;
+  rw_lock_t lock;
+  MEM_ROOT memroot;
+};
+
+class Trans_delegate
+  :public Delegate<Trans_observer> {
+public:
+  int before_commit(THD *thd, bool all);
+  int before_rollback(THD *thd, bool all);
+  int after_commit(THD *thd, bool all);
+  int after_rollback(THD *thd, bool all);
+};
+
+class Binlog_storage_delegate
+  :public Delegate<Binlog_storage_observer> {
+public:
+  int after_flush(THD *thd, const char *log_file, my_off_t log_pos);
+};
+
+#ifdef HAVE_REPLICATION
+class Binlog_transmit_delegate
+  :public Delegate<Binlog_transmit_observer> {
+public:
+  int transmit_start(THD *thd, ushort flags,
+	     const char *log_file, my_off_t log_pos);
+  int transmit_stop(THD *thd, ushort flags);
+  int reserve_header(THD *thd, ushort flags, String *packet, ulong *len);
+  int before_send_event(THD *thd, ushort flags,
+			 String *packet, const
+			 char *log_file, my_off_t log_pos );
+  int after_send_event(THD *thd, ushort flags,
+			const char *event_buf);
+  int after_reset_master(THD *thd, ushort flags);
+};
+
+class Binlog_relay_IO_delegate
+  :public Delegate<Binlog_relay_IO_observer> {
+public:
+  int thread_start(THD *thd, Master_info *mi);
+  int thread_stop(THD *thd, Master_info *mi);
+  int before_request_transmit(THD *thd, Master_info *mi, ushort *flags);
+  int after_read_event(THD *thd, Master_info *mi,
+			const char *packet, ulong len,
+			const char **event_buf, ulong *event_len);
+  int after_queue_event(THD *thd, Master_info *mi,
+			 const char *event_buf, ulong event_len);
+  int after_reset_slave(THD *thd, Master_info *mi);
+};
+#endif /* HAVE_REPLICATION */
+
+int delegates_init();
+void delegates_destroy();
+
+extern Trans_delegate transaction_delegate;
+extern Binlog_storage_delegate binlog_storage_delegate;
+#ifdef HAVE_REPLICATION
+extern Binlog_transmit_delegate binlog_transmit_delegate;
+extern Binlog_relay_IO_delegate binlog_relay_io_delegate;
+#endif /* HAVE_REPLICATION */
+
+#define RUN_HOOK(group, hook, args...)		\
+  group ##_delegate.hook(args)
+
+#endif /* RPL_HANDLER_H */

=== modified file 'sql/slave.cc'
--- a/sql/slave.cc	2008-04-20 12:25:54 +0000
+++ b/sql/slave.cc	2008-06-05 09:05:38 +0000
@@ -39,6 +39,7 @@
 #include <sql_common.h>
 #include <errmsg.h>
 #include <mysys_err.h>
+#include "rpl_handler.h"
 
 #ifdef HAVE_REPLICATION
 
@@ -1499,17 +1500,22 @@
 }
 
 
-static int request_dump(MYSQL* mysql, Master_info* mi,
-                        bool *suppress_warnings)
+static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
+			bool *suppress_warnings)
 {
   uchar buf[FN_REFLEN + 10];
   int len;
-  int binlog_flags = 0; // for now
+  ushort binlog_flags = 0; // for now
   char* logname = mi->master_log_name;
   DBUG_ENTER("request_dump");
   
   *suppress_warnings= FALSE;
 
+  if (RUN_HOOK(binlog_relay_io,
+	       before_request_transmit,
+	       thd, mi, &binlog_flags))
+    DBUG_RETURN(1);
+  
   // TODO if big log files: Change next to int8store()
   int4store(buf, (ulong) mi->master_log_pos);
   int2store(buf + 4, binlog_flags);
@@ -2134,6 +2140,10 @@
                             mi->master_log_name,
                             llstr(mi->master_log_pos,llbuff)));
 
+
+  if (RUN_HOOK(binlog_relay_io, thread_start, thd, mi))
+    goto err;
+
   if (!(mi->mysql = mysql = mysql_init(NULL)))
   {
     mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
@@ -2208,7 +2218,7 @@
   while (!io_slave_killed(thd,mi))
   {
     thd_proc_info(thd, "Requesting binlog dump");
-    if (request_dump(mysql, mi, &suppress_warnings))
+    if (request_dump(thd, mysql, mi, &suppress_warnings))
     {
       sql_print_error("Failed on request_dump()");
       if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
@@ -2231,6 +2241,7 @@
 
     while (!io_slave_killed(thd,mi))
     {
+      const char* event_buf;
       ulong event_len;
       /*
          We say "waiting" because read_event() will wait if there's nothing to
@@ -2283,7 +2294,15 @@
 
       retry_count=0;                    // ok event, reset retry counter
       thd_proc_info(thd, "Queueing master event to the relay log");
-      if (queue_event(mi,(const char*)mysql->net.read_pos + 1, event_len))
+      event_buf= (const char*)mysql->net.read_pos + 1;
+      if (RUN_HOOK(binlog_relay_io, after_read_event,
+		   thd, mi,(const char*)mysql->net.read_pos + 1,
+		   event_len, &event_buf, &event_len))
+      {
+        sql_print_error("Failed to run 'after_read_event' hook");
+	goto err;
+      }
+      if (queue_event(mi, event_buf, event_len))
       {
         goto err;
       }
@@ -2292,6 +2311,11 @@
         sql_print_error("Failed to flush master info file");
         goto err;
       }
+
+      if (RUN_HOOK(binlog_relay_io, after_queue_event,
+		   thd, mi, event_buf, event_len))
+        goto err;
+
       /*
         See if the relay logs take too much space.
         We don't lock mi->rli.log_space_lock here; this dirty read saves time
@@ -2333,6 +2357,7 @@
   sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
                   IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
   VOID(pthread_mutex_lock(&LOCK_thread_count));
+  RUN_HOOK(binlog_relay_io, thread_stop, thd, mi);
   thd->query = thd->db = 0; // extra safety
   thd->query_length= thd->db_length= 0;
   VOID(pthread_mutex_unlock(&LOCK_thread_count));

=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc	2008-04-16 07:53:20 +0000
+++ b/sql/sql_parse.cc	2008-06-05 09:05:38 +0000
@@ -21,6 +21,7 @@
 #include <m_ctype.h>
 #include <myisam.h>
 #include <my_dir.h>
+#include "rpl_handler.h"
 
 #include "sp_head.h"
 #include "sp.h"
@@ -1395,7 +1396,21 @@
 
   /* If commit fails, we should be able to reset the OK status. */
   thd->main_da.can_overwrite_status= TRUE;
-  ha_autocommit_or_rollback(thd, thd->is_error());
+  if (thd->transaction.stmt.ha_list)
+  {
+    ha_autocommit_or_rollback(thd, thd->is_error());
+  }
+  else
+  {
+    if (!thd->is_error())
+    {
+      RUN_HOOK(transaction, after_commit, thd, 0);
+    }
+    else
+    {
+      RUN_HOOK(transaction, after_rollback, thd, 0);
+    }
+  }
   thd->main_da.can_overwrite_status= FALSE;
 
   thd->transaction.stmt.reset();

=== modified file 'sql/sql_plugin.cc'
--- a/sql/sql_plugin.cc	2008-04-14 10:15:04 +0000
+++ b/sql/sql_plugin.cc	2008-06-05 09:05:38 +0000
@@ -44,7 +44,8 @@
   { C_STRING_WITH_LEN("FTPARSER") },
   { C_STRING_WITH_LEN("DAEMON") },
   { C_STRING_WITH_LEN("INFORMATION SCHEMA") },
-  { C_STRING_WITH_LEN("AUDIT") }
+  { C_STRING_WITH_LEN("AUDIT") },
+  { C_STRING_WITH_LEN("REPLICATION") }
 };
 
 extern int initialize_schema_table(st_plugin_int *plugin);
@@ -89,7 +90,8 @@
   MYSQL_FTPARSER_INTERFACE_VERSION,
   MYSQL_DAEMON_INTERFACE_VERSION,
   MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
-  MYSQL_AUDIT_INTERFACE_VERSION
+  MYSQL_AUDIT_INTERFACE_VERSION,
+  MYSQL_REPLICATION_INTERFACE_VERSION
 };
 static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
 {
@@ -98,7 +100,8 @@
   MYSQL_FTPARSER_INTERFACE_VERSION,
   MYSQL_DAEMON_INTERFACE_VERSION,
   MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
-  MYSQL_AUDIT_INTERFACE_VERSION
+  MYSQL_AUDIT_INTERFACE_VERSION,
+  MYSQL_REPLICATION_INTERFACE_VERSION
 };
 
 static bool initialized= 0;

=== modified file 'sql/sql_repl.cc'
--- a/sql/sql_repl.cc	2008-03-08 10:14:47 +0000
+++ b/sql/sql_repl.cc	2008-06-05 09:05:38 +0000
@@ -21,6 +21,7 @@
 #include "log_event.h"
 #include "rpl_filter.h"
 #include <my_dir.h>
+#include "rpl_handler.h"
 
 int max_binlog_dump_events = 0; // unlimited
 my_bool opt_sporadic_binlog_dump_fail = 0;
@@ -392,6 +393,9 @@
   LOG_INFO linfo;
   char *log_file_name = linfo.log_file_name;
   char search_file_name[FN_REFLEN], *name;
+
+  ulong ev_offset;
+
   IO_CACHE log;
   File file = -1;
   String* packet = &thd->packet;
@@ -407,6 +411,10 @@
   DBUG_PRINT("enter",("log_ident: '%s'  pos: %ld", log_ident, (long) pos));
 
   bzero((char*) &log,sizeof(log));
+  sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
+                        thd->server_id, log_ident, (ulong)pos);
+
+  RUN_HOOK(binlog_transmit, transmit_start, thd, flags, log_ident, pos);
   /* 
      heartbeat_period from @master_heartbeat_period user variable
   */
@@ -477,11 +485,12 @@
     goto err;
   }
 
-  /*
-    We need to start a packet with something other than 255
-    to distinguish it from error
-  */
-  packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
+  if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet, &ev_offset))
+  {
+    errmsg= "Failed to run hook 'reserve_header'";
+    my_errno= ER_UNKNOWN_ERROR;
+    goto err;
+  }
 
   /*
     Tell the client about the log name with a fake Rotate event;
@@ -521,7 +530,13 @@
     my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
     goto err;
   }
-  packet->set("\0", 1, &my_charset_bin);
+  if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet, &ev_offset))
+  {
+    errmsg= "Failed to run hook 'reserve_header'";
+    my_errno= ER_UNKNOWN_ERROR;
+    goto err;
+  }
+
   /*
     Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
     this larger than the corresponding packet (query) sent 
@@ -545,28 +560,28 @@
      {
        /*
          The packet has offsets equal to the normal offsets in a binlog
-         event +1 (the first character is \0).
+         event + ev_offset (the first character is \0).
        */
        DBUG_PRINT("info",
                   ("Looked for a Format_description_log_event, found event type %d",
-                   (*packet)[EVENT_TYPE_OFFSET+1]));
-       if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+                   (*packet)[EVENT_TYPE_OFFSET+ev_offset]));
+       if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
        {
-         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
                                        LOG_EVENT_BINLOG_IN_USE_F);
-         (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+         (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
          /*
            mark that this event with "log_pos=0", so the slave
            should not increment master's binlog position
            (rli->group_master_log_pos)
          */
-         int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
+         int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
          /*
            if reconnect master sends FD event with `created' as 0
            to avoid destroying temp tables.
           */
          int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
-                   ST_CREATED_OFFSET+1, (ulong) 0);
+                   ST_CREATED_OFFSET+ev_offset, (ulong) 0);
          /* send it */
          if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
          {
@@ -593,7 +608,12 @@
        */
      }
      /* reset the packet as we wrote to it in any case */
-     packet->set("\0", 1, &my_charset_bin);
+     if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet, &ev_offset))
+     {
+       errmsg= "Failed to run hook 'reserve_header'";
+       my_errno= ER_UNKNOWN_ERROR;
+       goto err;
+     }
   } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
   else
   {
@@ -605,6 +625,8 @@
 
   while (!net->error && net->vio != 0 && !thd->killed)
   {
+    Log_event_type event_type= UNKNOWN_EVENT;
+
     while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
     {
 #ifndef DBUG_OFF
@@ -620,17 +642,25 @@
         log's filename does not change while it's active
       */
       if (coord)
-        coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
+        coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
 
-      if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+      event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
+      if (event_type == FORMAT_DESCRIPTION_EVENT)
       {
-        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
                                       LOG_EVENT_BINLOG_IN_USE_F);
-        (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+        (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
       }
-      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
+      else if (event_type == STOP_EVENT)
         binlog_can_be_corrupted= FALSE;
 
+      pos = my_b_tell(&log);
+      if (RUN_HOOK(binlog_transmit, before_send_event,
+		   thd, flags, packet, log_file_name, pos))
+      {
+        errmsg= "run 'before_send_event' hook failed";
+	goto err;
+      }
       if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
       {
 	errmsg = "Failed on my_net_write()";
@@ -638,9 +668,8 @@
 	goto err;
       }
 
-      DBUG_PRINT("info", ("log event code %d",
-			  (*packet)[LOG_EVENT_OFFSET+1] ));
-      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+      DBUG_PRINT("info", ("log event code %d", event_type));
+      if (event_type == LOAD_EVENT)
       {
 	if (send_file(thd))
 	{
@@ -649,7 +678,20 @@
 	  goto err;
 	}
       }
-      packet->set("\0", 1, &my_charset_bin);
+
+      if (RUN_HOOK(binlog_transmit, after_send_event, thd, flags, packet->ptr()))
+      {
+	errmsg= "Failed to run hook 'after_send_event'";
+	my_errno= ER_UNKNOWN_ERROR;
+        goto err;
+      }
+
+      if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet, &ev_offset))
+      {
+	errmsg= "Failed to run hook 'reserve_header'";
+	my_errno= ER_UNKNOWN_ERROR;
+	goto err;
+      }
     }
 
     /*
@@ -779,7 +821,15 @@
 
 	if (read_packet)
 	{
-	  thd_proc_info(thd, "Sending binlog event to slave");
+          thd_proc_info(thd, "Sending binlog event to slave");
+          pos = my_b_tell(&log);
+	  if (RUN_HOOK(binlog_transmit, before_send_event,
+		       thd, flags, packet, log_file_name, pos))
+	  {
+            errmsg= "run 'before_send_event' hook failed";
+            goto err;
+	  }
+
 	  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
 	  {
 	    errmsg = "Failed on my_net_write()";
@@ -787,7 +837,7 @@
 	    goto err;
 	  }
 
-	  if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+	  if (event_type == LOAD_EVENT)
 	  {
 	    if (send_file(thd))
 	    {
@@ -796,11 +846,18 @@
 	      goto err;
 	    }
 	  }
-	  packet->set("\0", 1, &my_charset_bin);
-	  /*
-	    No need to net_flush because we will get to flush later when
-	    we hit EOF pretty quick
-	  */
+
+	  if (RUN_HOOK(binlog_transmit, after_send_event, thd, flags, packet->ptr()))
+	  {
+            errmsg= "Failed to run hook 'after_send_event'";
+            goto err;
+	  }
+
+	  if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet, &ev_offset))
+	  {
+	    errmsg= "Failed to run hook 'reserve_header'";
+	    goto err;
+	  }
 	}
 
 	if (fatal_error)
@@ -853,8 +910,12 @@
 	goto err;
       }
 
-      packet->length(0);
-      packet->append('\0');
+      if (RUN_HOOK(binlog_transmit, reserve_header, thd, flags, packet, &ev_offset))
+      {
+	errmsg= "Failed to run hook 'reserve_header'";
+	my_errno= ER_UNKNOWN_ERROR;
+	goto err;
+      }
       if (coord)
         coord->file_name= log_file_name; // reset to the next
     }
@@ -864,6 +925,7 @@
   end_io_cache(&log);
   (void)my_close(file, MYF(MY_WME));
 
+  RUN_HOOK(binlog_transmit, transmit_stop, thd, flags);
   my_eof(thd);
   thd_proc_info(thd, "Waiting to finalize termination");
   pthread_mutex_lock(&LOCK_thread_count);
@@ -874,6 +936,7 @@
 err:
   thd_proc_info(thd, "Waiting to finalize termination");
   end_io_cache(&log);
+  RUN_HOOK(binlog_transmit, transmit_stop, thd, flags);
   /*
     Exclude  iteration through thread list
     this is needed for purge_logs() - it will iterate through
@@ -1134,6 +1197,7 @@
     goto err;
   }
 
+  RUN_HOOK(binlog_relay_io, after_reset_slave, thd, mi);
 err:
   unlock_slave_threads(mi);
   if (error)
@@ -1416,7 +1480,11 @@
                ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
     return 1;
   }
-  return mysql_bin_log.reset_logs(thd);
+
+  if (mysql_bin_log.reset_logs(thd))
+    return 1;
+  RUN_HOOK(binlog_transmit, after_reset_master, thd, 0 /* flags */);
+  return 0;
 }
 
 int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
@@ -1923,5 +1991,3 @@
 }
 
 #endif /* HAVE_REPLICATION */
-
-



