List:Commits« Previous MessageNext Message »
From:He Zhenxing Date:September 12 2008 1:43am
Subject:bzr push into mysql-6.0-sea branch (hezx:2682) Bug#11714 Bug#20129 Bug#21579
Bug#26180 Bug#26687 Bug#27526 Bug#29825 Bug#30129 Bug#30394 Bug#31048
B...
View as plain text  
 2682 He Zhenxing	2008-09-12 [merge]
      Auto merge
      Update plugin.h.pp for WL#4398
removed:
  include/mysql_h.ic
  mysql-test/r/concurrent_innodb.result
  mysql-test/r/innodb-autoinc.result
  mysql-test/r/innodb_bug34300.result
  mysql-test/r/innodb_bug35220.result
  mysql-test/std_data/ndb_backup51_data_be/BACKUP-1-0.1.Data
  mysql-test/std_data/ndb_backup51_data_le/BACKUP-1-0.2.Data
  mysql-test/suite/rpl/t/rpl_view-slave.opt
  mysql-test/t/concurrent_innodb-master.opt
  mysql-test/t/concurrent_innodb.test
  mysql-test/t/innodb-autoinc.test
  mysql-test/t/innodb_bug34300.test
  mysql-test/t/innodb_bug35220.test
added:
  include/mysql.h.pp
  include/mysql/plugin.h.pp
  mysql-test/include/have_case_insensitive_file_system.inc
  mysql-test/include/have_lowercase2.inc
  mysql-test/include/ps_ddl_1.inc
  mysql-test/r/backup_db_grants.result
  mysql-test/r/backup_default.result
  mysql-test/r/backup_timeout.result
  mysql-test/r/backup_view_on_view.result
  mysql-test/r/case_insensitive_file_system.require
  mysql-test/r/concurrent_innodb_safelog.result
  mysql-test/r/concurrent_innodb_unsafelog.result
  mysql-test/r/innodb-autoinc.result
  mysql-test/r/innodb_bug34300.result
  mysql-test/r/innodb_bug35220.result
  mysql-test/r/skip_log_bin.result
  mysql-test/std_data/corrupt_t1#P#p1.MYI
  mysql-test/std_data/corrupt_t1.MYI
  mysql-test/std_data/ndb_backup51_data_be/BACKUP-1-0.1.Data
  mysql-test/std_data/ndb_backup51_data_le/BACKUP-1-0.2.Data
  mysql-test/std_data/parts/t1_will_crash#P#p1_first_1024.MYD
  mysql-test/std_data/parts/t1_will_crash#P#p2.MYD
  mysql-test/std_data/parts/t1_will_crash#P#p2.MYI
  mysql-test/std_data/parts/t1_will_crash#P#p3.MYI
  mysql-test/std_data/parts/t1_will_crash#P#p4.MYI
  mysql-test/std_data/parts/t1_will_crash#P#p6.MYD
  mysql-test/std_data/parts/t1_will_crash#P#p6_2.MYD
  mysql-test/std_data/parts/t1_will_crash#P#p6_3.MYD
  mysql-test/suite/parts/inc/partition_mgm.inc
  mysql-test/suite/parts/r/partition_mgm_lc0_archive.result
  mysql-test/suite/parts/r/partition_mgm_lc0_innodb.result
  mysql-test/suite/parts/r/partition_mgm_lc0_memory.result
  mysql-test/suite/parts/r/partition_mgm_lc0_myisam.result
  mysql-test/suite/parts/r/partition_mgm_lc0_ndb.result
  mysql-test/suite/parts/r/partition_mgm_lc1_archive.result
  mysql-test/suite/parts/r/partition_mgm_lc1_innodb.result
  mysql-test/suite/parts/r/partition_mgm_lc1_memory.result
  mysql-test/suite/parts/r/partition_mgm_lc1_myisam.result
  mysql-test/suite/parts/r/partition_mgm_lc1_ndb.result
  mysql-test/suite/parts/r/partition_mgm_lc2_archive.result
  mysql-test/suite/parts/r/partition_mgm_lc2_innodb.result
  mysql-test/suite/parts/r/partition_mgm_lc2_memory.result
  mysql-test/suite/parts/r/partition_mgm_lc2_myisam.result
  mysql-test/suite/parts/r/partition_mgm_lc2_ndb.result
  mysql-test/suite/parts/r/partition_recover_myisam.result
  mysql-test/suite/parts/r/partition_repair_myisam.result
  mysql-test/suite/parts/t/partition_mgm_lc0_archive.test
  mysql-test/suite/parts/t/partition_mgm_lc0_innodb.test
  mysql-test/suite/parts/t/partition_mgm_lc0_memory.test
  mysql-test/suite/parts/t/partition_mgm_lc0_myisam.test
  mysql-test/suite/parts/t/partition_mgm_lc0_ndb.test
  mysql-test/suite/parts/t/partition_mgm_lc1_archive-master.opt
  mysql-test/suite/parts/t/partition_mgm_lc1_archive.test
  mysql-test/suite/parts/t/partition_mgm_lc1_innodb-master.opt
  mysql-test/suite/parts/t/partition_mgm_lc1_innodb.test
  mysql-test/suite/parts/t/partition_mgm_lc1_memory-master.opt
  mysql-test/suite/parts/t/partition_mgm_lc1_memory.test
  mysql-test/suite/parts/t/partition_mgm_lc1_myisam-master.opt
  mysql-test/suite/parts/t/partition_mgm_lc1_myisam.test
  mysql-test/suite/parts/t/partition_mgm_lc1_ndb-master.opt
  mysql-test/suite/parts/t/partition_mgm_lc1_ndb.test
  mysql-test/suite/parts/t/partition_mgm_lc2_archive-master.opt
  mysql-test/suite/parts/t/partition_mgm_lc2_archive.test
  mysql-test/suite/parts/t/partition_mgm_lc2_innodb-master.opt
  mysql-test/suite/parts/t/partition_mgm_lc2_innodb.test
  mysql-test/suite/parts/t/partition_mgm_lc2_memory-master.opt
  mysql-test/suite/parts/t/partition_mgm_lc2_memory.test
  mysql-test/suite/parts/t/partition_mgm_lc2_myisam-master.opt
  mysql-test/suite/parts/t/partition_mgm_lc2_myisam.test
  mysql-test/suite/parts/t/partition_mgm_lc2_ndb-master.opt
  mysql-test/suite/parts/t/partition_mgm_lc2_ndb.test
  mysql-test/suite/parts/t/partition_recover_myisam-master.opt
  mysql-test/suite/parts/t/partition_recover_myisam.test
  mysql-test/suite/parts/t/partition_repair_myisam.test
  mysql-test/suite/parts/t/partition_special_innodb-master.opt
  mysql-test/suite/rpl/r/rpl_plugin_load.result
  mysql-test/suite/rpl/t/rpl_plugin_load-master.opt
  mysql-test/suite/rpl/t/rpl_plugin_load-slave.opt
  mysql-test/suite/rpl/t/rpl_plugin_load.test
  mysql-test/suite/rpl/t/rpl_row_err_daisychain-master.opt
  mysql-test/suite/rpl/t/rpl_row_err_daisychain-slave.opt
  mysql-test/suite/rpl/t/rpl_truncate_7ndb_2-master.opt
  mysql-test/t/backup_db_grants.test
  mysql-test/t/backup_default.test
  mysql-test/t/backup_timeout.test
  mysql-test/t/backup_view_on_view.test
  mysql-test/t/concurrent_innodb_safelog-master.opt
  mysql-test/t/concurrent_innodb_safelog.test
  mysql-test/t/concurrent_innodb_unsafelog-master.opt
  mysql-test/t/concurrent_innodb_unsafelog.test
  mysql-test/t/innodb-autoinc.test
  mysql-test/t/innodb_bug34300.test
  mysql-test/t/innodb_bug35220.test
  mysql-test/t/skip_log_bin-master.opt
  mysql-test/t/skip_log_bin.test
  sql/mysql_priv.h.pp
  win/build-vs9.bat
  win/build-vs9_x64.bat
modified:
  .bzr-mysql/default.conf
  BUILD/check-cpu
  Makefile.am
  configure.in
  include/Makefile.am
  include/my_pthread.h
  include/my_sys.h
  include/myisam.h
  libmysql/CMakeLists.txt
  libmysql/dll.c
  libmysqld/lib_sql.cc
  mysql-test/Makefile.am
  mysql-test/extra/rpl_tests/rpl_ddl.test
  mysql-test/extra/rpl_tests/rpl_log.test
  mysql-test/include/concurrent.inc
  mysql-test/include/have_falcon.inc
  mysql-test/include/have_lowercase0.inc
  mysql-test/include/wait_for_slave_sql_error_and_skip.inc
  mysql-test/lib/mtr_report.pl
  mysql-test/r/backup_ddl_blocker.result
  mysql-test/r/backup_errors.result
  mysql-test/r/backup_security.result
  mysql-test/r/backup_views.result
  mysql-test/r/constraints.result
  mysql-test/r/csv.result
  mysql-test/r/federated.result
  mysql-test/r/func_regexp.result
  mysql-test/r/group_min_max.result
  mysql-test/r/handler_innodb.result
  mysql-test/r/have_utf8.require
  mysql-test/r/innodb-autoinc-optimize.result
  mysql-test/r/innodb.result
  mysql-test/r/innodb_mysql.result
  mysql-test/r/join.result
  mysql-test/r/log_tables.result
  mysql-test/r/lowercase0.require
  mysql-test/r/maria3.result
  mysql-test/r/metadata.result
  mysql-test/r/myisam.result
  mysql-test/r/partition.result
  mysql-test/r/partition_innodb.result
  mysql-test/r/partition_symlink.result
  mysql-test/r/ps_ddl.result
  mysql-test/r/show_check.result
  mysql-test/r/sp.result
  mysql-test/r/subselect.result
  mysql-test/r/subselect3.result
  mysql-test/r/subselect_no_mat.result
  mysql-test/r/subselect_no_opts.result
  mysql-test/r/subselect_no_semijoin.result
  mysql-test/r/symlink.result
  mysql-test/r/trigger-trans.result
  mysql-test/r/type_bit.result
  mysql-test/r/type_newdecimal.result
  mysql-test/suite/falcon/r/falcon_bug_22165.result
  mysql-test/suite/falcon/t/disabled.def
  mysql-test/suite/falcon/t/falcon_bug_22165.test
  mysql-test/suite/funcs_1/datadict/processlist_priv.inc
  mysql-test/suite/funcs_1/datadict/processlist_val.inc
  mysql-test/suite/funcs_1/r/processlist_priv_no_prot.result
  mysql-test/suite/funcs_1/r/processlist_priv_ps.result
  mysql-test/suite/funcs_1/r/processlist_val_no_prot.result
  mysql-test/suite/funcs_1/r/processlist_val_ps.result
  mysql-test/suite/jp/std_data/jisx0208_sjis2.dat
  mysql-test/suite/jp/t/disabled.def
  mysql-test/suite/ndb/r/ndb_partition_key.result
  mysql-test/suite/ndb/t/ndb_partition_key.test
  mysql-test/suite/parts/inc/partition_alter4.inc
  mysql-test/suite/parts/inc/partition_key_32col.inc
  mysql-test/suite/parts/r/partition_alter1_1_2_innodb.result
  mysql-test/suite/parts/r/partition_alter1_1_2_myisam.result
  mysql-test/suite/parts/r/partition_alter1_1_2_ndb.result
  mysql-test/suite/parts/r/partition_alter1_1_innodb.result
  mysql-test/suite/parts/r/partition_alter1_1_myisam.result
  mysql-test/suite/parts/r/partition_alter1_1_ndb.result
  mysql-test/suite/parts/r/partition_alter1_2_innodb.result
  mysql-test/suite/parts/r/partition_alter1_2_myisam.result
  mysql-test/suite/parts/r/partition_alter1_2_ndb.result
  mysql-test/suite/parts/r/partition_alter2_innodb.result
  mysql-test/suite/parts/r/partition_alter2_myisam.result
  mysql-test/suite/parts/r/partition_alter4_innodb.result
  mysql-test/suite/parts/r/partition_alter4_myisam.result
  mysql-test/suite/parts/r/partition_basic_innodb.result
  mysql-test/suite/parts/r/partition_basic_myisam.result
  mysql-test/suite/parts/r/partition_basic_symlink_myisam.result
  mysql-test/suite/parts/r/partition_engine_innodb.result
  mysql-test/suite/parts/r/partition_engine_myisam.result
  mysql-test/suite/parts/r/partition_special_innodb.result
  mysql-test/suite/parts/t/disabled.def
  mysql-test/suite/parts/t/partition_special_innodb.test
  mysql-test/suite/rpl/r/rpl_ddl.result
  mysql-test/suite/rpl/r/rpl_failed_optimize.result
  mysql-test/suite/rpl/r/rpl_innodb_mixed_dml.result
  mysql-test/suite/rpl/r/rpl_row_log.result
  mysql-test/suite/rpl/r/rpl_row_log_innodb.result
  mysql-test/suite/rpl/r/rpl_stm_log.result
  mysql-test/suite/rpl/t/disabled.def
  mysql-test/suite/rpl/t/rpl_stm_log-slave.opt
  mysql-test/t/backup_errors.test
  mysql-test/t/backup_views.test
  mysql-test/t/constraints.test
  mysql-test/t/csv.test
  mysql-test/t/disabled.def
  mysql-test/t/events_grant.test
  mysql-test/t/federated.test
  mysql-test/t/func_regexp.test
  mysql-test/t/group_min_max.test
  mysql-test/t/join.test
  mysql-test/t/log_tables.test
  mysql-test/t/lowercase_table3.test
  mysql-test/t/myisam.test
  mysql-test/t/partition.test
  mysql-test/t/partition_innodb.test
  mysql-test/t/partition_not_windows.test
  mysql-test/t/partition_symlink.test
  mysql-test/t/ps_ddl.test
  mysql-test/t/show_check.test
  mysql-test/t/sp.test
  mysql-test/t/subselect.test
  mysql-test/t/subselect3.test
  mysql-test/t/symlink.test
  mysql-test/t/type_bit.test
  mysql-test/t/type_newdecimal.test
  mysys/mf_pack.c
  mysys/my_static.c
  mysys/my_symlink.c
  mysys/safemalloc.c
  netware/BUILD/compile-linux-tools
  netware/BUILD/nwbootstrap
  netware/Makefile.am
  netware/mysql_install_db.c
  scripts/mysql_install_db.sh
  sql/backup/backup_aux.h
  sql/backup/backup_info.cc
  sql/backup/backup_info.h
  sql/backup/backup_test.cc
  sql/backup/be_default.cc
  sql/backup/image_info.cc
  sql/backup/image_info.h
  sql/backup/kernel.cc
  sql/backup/logger.cc
  sql/ddl_blocker.cc
  sql/field.h
  sql/ha_ndbcluster.cc
  sql/ha_partition.cc
  sql/ha_partition.h
  sql/handler.cc
  sql/handler.h
  sql/item.cc
  sql/item.h
  sql/item_cmpfunc.cc
  sql/mysql_priv.h
  sql/mysqld.cc
  sql/opt_range.cc
  sql/protocol.cc
  sql/set_var.cc
  sql/set_var.h
  sql/share/errmsg.txt
  sql/si_objects.cc
  sql/si_objects.h
  sql/sp_head.cc
  sql/sp_rcontext.cc
  sql/sp_rcontext.h
  sql/sql_class.cc
  sql/sql_class.h
  sql/sql_insert.cc
  sql/sql_parse.cc
  sql/sql_partition.cc
  sql/sql_plugin.cc
  sql/sql_select.cc
  sql/sql_show.cc
  sql/sql_table.cc
  sql/sql_yacc.yy
  sql/table.h
  storage/csv/ha_tina.cc
  storage/falcon/BackLog.cpp
  storage/falcon/CMakeLists.txt
  storage/falcon/Configuration.cpp
  storage/falcon/IO.cpp
  storage/falcon/Index.cpp
  storage/falcon/Index2RootPage.cpp
  storage/falcon/IndexRootPage.cpp
  storage/falcon/Interlock.h
  storage/falcon/MemMgr.cpp
  storage/falcon/RepositoryVolume.cpp
  storage/falcon/Statement.cpp
  storage/falcon/StorageDatabase.cpp
  storage/falcon/StorageDatabase.h
  storage/falcon/StorageTable.cpp
  storage/falcon/StorageTable.h
  storage/falcon/StorageTableShare.cpp
  storage/falcon/StorageTableShare.h
  storage/falcon/Table.cpp
  storage/falcon/Table.h
  storage/falcon/TableSpace.cpp
  storage/falcon/Transaction.cpp
  storage/falcon/Transaction.h
  storage/falcon/TransactionManager.cpp
  storage/falcon/ha_falcon.cpp
  storage/falcon/ha_falcon.h
  storage/falcon/plug.in
  storage/federated/ha_federated.cc
  storage/federated/ha_federated.h
  storage/innobase/ha/ha0ha.c
  storage/innobase/handler/ha_innodb.cc
  storage/innobase/lock/lock0lock.c
  storage/maria/CMakeLists.txt
  storage/myisam/mi_check.c
  storage/myisam/mi_create.c
  storage/myisam/mi_open.c
  storage/myisam/mi_static.c
  storage/myisam/myisamchk.c
  storage/myisam/myisamdef.h
  strings/ctype-simple.c
  strings/ctype-sjis.c
  strings/decimal.c
  tests/mysql_client_test.c

=== modified file 'include/mysql/plugin.h'
--- a/include/mysql/plugin.h	2008-06-28 11:00:59 +0000
+++ b/include/mysql/plugin.h	2008-09-11 14:36:17 +0000
@@ -16,6 +16,11 @@
 #ifndef _my_plugin_h
 #define _my_plugin_h
 
+/* size_t */
+#include <stdlib.h>
+
+typedef struct st_mysql MYSQL;
+
 #ifdef __cplusplus
 class THD;
 class Item;
@@ -66,7 +71,8 @@ typedef struct st_mysql_xid MYSQL_XID;
 #define MYSQL_DAEMON_PLUGIN          3  /* The daemon/raw plugin type */
 #define MYSQL_INFORMATION_SCHEMA_PLUGIN  4  /* The I_S plugin type */
 #define MYSQL_AUDIT_PLUGIN           5  /* The Audit plugin type        */
-#define MYSQL_MAX_PLUGIN_TYPE_NUM    6  /* The number of plugin types   */
+#define MYSQL_REPLICATION_PLUGIN     6	/* The replication plugin type */
+#define MYSQL_MAX_PLUGIN_TYPE_NUM    7  /* The number of plugin types   */
 
 /* We use the following strings to define licenses for plugins */
 #define PLUGIN_LICENSE_PROPRIETARY 0
@@ -461,6 +467,19 @@ struct handlerton;
 
 
 /*************************************************************************
+  API for Replication plugin. (MYSQL_REPLICATION_PLUGIN)
+*/
+#define MYSQL_REPLICATION_INTERFACE_VERSION 0x0100
+
+/**
+   Replication plugin descriptor
+*/
+struct Mysql_replication {
+  int interface_version;
+};
+
+
+/*************************************************************************
   st_mysql_value struct for reading values from mysqld.
   Used by server variables framework to parse user-provided values.
   Will be used for arguments when implementing UDFs.
@@ -613,6 +632,64 @@ void mysql_query_cache_invalidate4(MYSQL
                                    const char *key, unsigned int key_length,
                                    int using_trx);
 
+/**
+   Get the value of user variable as an integer.
+
+   This function will return the value of variable @a name as an
+   integer. If the original value of the variable is not an integer,
+   the value will be converted into an integer.
+
+   @param name     user variable name
+   @param value    pointer to return the value
+   @param null_value if not NULL, the function will set it to true if
+   the value of variable is null, set to false if not
+
+   @retval 0 Success
+   @retval 1 Variable not found
+*/
+int get_user_var_int(const char *name,
+                     long long int *value, int *null_value);
+
+/**
+   Get the value of user variable as a double precision float number.
+
+   This function will return the value of variable @a name as real
+   number. If the original value of the variable is not a real number,
+   the value will be converted into a real number.
+
+   @param name     user variable name
+   @param value    pointer to return the value
+   @param null_value if not NULL, the function will set it to true if
+   the value of variable is null, set to false if not
+
+   @retval 0 Success
+   @retval 1 Variable not found
+*/
+int get_user_var_real(const char *name,
+                      double *value, int *null_value);
+
+/**
+   Get the value of user variable as a string.
+
+   This function will return the value of variable @a name as
+   string. If the original value of the variable is not a string,
+   the value will be converted into a string.
+
+   @param name     user variable name
+   @param value    pointer to the value buffer
+   @param len      length of the value buffer
+   @param precision precision of the value if it is a float number
+   @param null_value if not NULL, the function will set it to true if
+   the value of variable is null, set to false if not
+
+   @retval 0 Success
+   @retval 1 Variable not found
+*/
+int get_user_var_str(const char *name,
+                     char *value, unsigned long len,
+                     unsigned int precision, int *null_value);
+
+  
 #ifdef __cplusplus
 }
 #endif

=== modified file 'include/mysql/plugin.h.pp'
--- a/include/mysql/plugin.h.pp	2008-07-11 12:09:05 +0000
+++ b/include/mysql/plugin.h.pp	2008-09-12 01:35:24 +0000
@@ -1,3 +1,5 @@
+#include <stdlib.h>
+typedef struct st_mysql MYSQL;
 struct st_mysql_lex_string
 {
   char *str;
@@ -106,6 +108,9 @@ struct st_mysql_storage_engine
   int interface_version;
 };
 struct handlerton;
+struct Mysql_replication {
+  int interface_version;
+};
 struct st_mysql_value
 {
   int (*value_type)(struct st_mysql_value *);
@@ -139,3 +144,10 @@ void thd_get_xid(const void* thd, MYSQL_
 void mysql_query_cache_invalidate4(void* thd,
                                    const char *key, unsigned int key_length,
                                    int using_trx);
+int get_user_var_int(const char *name,
+                     long long int *value, int *null_value);
+int get_user_var_real(const char *name,
+                      double *value, int *null_value);
+int get_user_var_str(const char *name,
+                     char *value, unsigned long len,
+                     unsigned int precision, int *null_value);

=== modified file 'libmysqld/CMakeLists.txt'
--- a/libmysqld/CMakeLists.txt	2008-07-09 07:12:43 +0000
+++ b/libmysqld/CMakeLists.txt	2008-09-11 14:36:17 +0000
@@ -203,6 +203,7 @@ SET(LIBMYSQLD_SOURCES emb_qcache.cc libm
            ../sql/scheduler.cc ../sql/sql_audit.cc
            ../sql/ddl_blocker.cc ../sql/si_objects.cc
            ../sql/event_parse_data.cc ../sql/mdl.cc
+           ../sql/rpl_handler.cc
            ${GEN_SOURCES}
            ${LIB_SOURCES})
 

=== modified file 'libmysqld/Makefile.am'
--- a/libmysqld/Makefile.am	2008-07-09 07:12:43 +0000
+++ b/libmysqld/Makefile.am	2008-09-11 14:28:48 +0000
@@ -82,7 +82,8 @@ sqlsources = derror.cc field.cc field_co
 	sql_tablespace.cc \
 	rpl_injector.cc my_user.c partition_info.cc \
 	sql_servers.cc ddl_blocker.cc si_objects.cc sql_audit.cc \
-        event_parse_data.cc mdl.cc
+        event_parse_data.cc mdl.cc \
+        rpl_handler.cc
 
 libmysqld_int_a_SOURCES= $(libmysqld_sources)
 nodist_libmysqld_int_a_SOURCES= $(libmysqlsources) $(sqlsources)

=== modified file 'sql/CMakeLists.txt'
--- a/sql/CMakeLists.txt	2008-07-09 07:12:43 +0000
+++ b/sql/CMakeLists.txt	2008-09-11 14:28:48 +0000
@@ -78,6 +78,7 @@ ADD_EXECUTABLE(mysqld
                sql_connect.cc scheduler.cc 
                ddl_blocker.cc si_objects.cc
                sql_profile.cc event_parse_data.cc mdl.cc
+               rpl_handler.cc
                ${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc
                ${PROJECT_SOURCE_DIR}/sql/sql_yacc.h
                ${PROJECT_SOURCE_DIR}/include/mysqld_error.h

=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am	2008-07-09 07:12:43 +0000
+++ b/sql/Makefile.am	2008-09-11 14:36:17 +0000
@@ -89,7 +89,8 @@ noinst_HEADERS =	item.h item_func.h item
 			sql_partition.h partition_info.h partition_element.h \
 			probes.h sql_audit.h \
 			contributors.h sql_servers.h ddl_blocker.h \
-			si_objects.h sql_plist.h mdl.h
+			si_objects.h sql_plist.h mdl.h \
+			rpl_handler.h replication.h
 
 mysqld_SOURCES =	sql_lex.cc sql_handler.cc sql_partition.cc \
 			item.cc item_sum.cc item_buff.cc item_func.cc \
@@ -136,7 +137,8 @@ mysqld_SOURCES =	sql_lex.cc sql_handler.
 			sql_builtin.cc sql_tablespace.cc partition_info.cc \
 			sql_servers.cc sql_audit.cc sha2.cc \
 			ddl_blocker.cc si_objects.cc event_parse_data.cc \
-			mdl.cc
+			mdl.cc \
+			rpl_handler.cc
 
 if HAVE_DTRACE
   mysqld_SOURCES += probes.d

=== modified file 'sql/handler.cc'
--- a/sql/handler.cc	2008-09-03 14:40:19 +0000
+++ b/sql/handler.cc	2008-09-12 01:35:24 +0000
@@ -32,6 +32,8 @@
 #include "ha_partition.h"
 #endif
 
+#include "rpl_handler.h"
+
 /*
   While we have legacy_db_type, we have this array to
   check for dups and to find handlerton from legacy_db_type.
@@ -1144,6 +1146,7 @@ int ha_commit_trans(THD *thd, bool all)
     if (cookie)
       tc_log->unlog(cookie, xid);
     DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT(););
+    RUN_HOOK(transaction, after_commit, (thd, all));
 end:
     if (rw_trans)
       start_waiting_global_read_lock(thd);
@@ -1274,6 +1277,7 @@ int ha_rollback_trans(THD *thd, bool all
     push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                  ER_WARNING_NOT_COMPLETE_ROLLBACK,
                  ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
+  RUN_HOOK(transaction, after_rollback, (thd, all));
   DBUG_RETURN(error);
 }
 
@@ -1308,7 +1312,13 @@ int ha_autocommit_or_rollback(THD *thd, 
 
     thd->variables.tx_isolation=thd->session_tx_isolation;
   }
-
+  else
+  {
+    if (!thd->is_error())
+      RUN_HOOK(transaction, after_commit, (thd, 0));
+    else
+      RUN_HOOK(transaction, after_rollback, (thd, 0));
+  }
   DBUG_RETURN(error);
 }
 

=== modified file 'sql/log.cc'
--- a/sql/log.cc	2008-07-25 17:21:55 +0000
+++ b/sql/log.cc	2008-09-11 14:28:48 +0000
@@ -39,6 +39,7 @@
 #endif
 
 #include <mysql/plugin.h>
+#include "rpl_handler.h"
 
 /* max size of the log message */
 #define MAX_LOG_BUFFER_SIZE 1024
@@ -3588,9 +3589,11 @@ err:
 }
 
 
-bool MYSQL_BIN_LOG::flush_and_sync()
+bool MYSQL_BIN_LOG::flush_and_sync(bool *synced)
 {
   int err=0, fd=log_file.file;
+  if (synced)
+    *synced= 0;
   safe_mutex_assert_owner(&LOCK_log);
   if (flush_io_cache(&log_file))
     return 1;
@@ -3598,6 +3601,8 @@ bool MYSQL_BIN_LOG::flush_and_sync()
   {
     sync_binlog_counter= 0;
     err=my_sync(fd, MYF(MY_WME));
+    if (synced)
+      *synced= 1;
   }
   return err;
 }
@@ -3862,7 +3867,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_row
 
     if (file == &log_file)
     {
-      error= flush_and_sync();
+      error= flush_and_sync(0);
       if (!error)
       {
         signal_update();
@@ -4048,14 +4053,21 @@ bool MYSQL_BIN_LOG::write(Log_event *eve
     if (event_info->write(file))
       goto err;
 
+    error=0;
     if (file == &log_file) // we are writing to the real log (disk)
     {
-      if (flush_and_sync())
+      bool synced;
+      if (flush_and_sync(&synced))
 	goto err;
+
+      if (RUN_HOOK(binlog_storage, after_flush,
+                   (thd, log_file_name, file->pos_in_file, synced))) {
+        goto err;
+      }
+
       signal_update();
       rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
     }
-    error=0;
 
 err:
     if (error)
@@ -4312,7 +4324,7 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE 
   DBUG_ASSERT(carry == 0);
 
   if (sync_log)
-    flush_and_sync();
+    flush_and_sync(0);
 
   return 0;                                     // All OK
 }
@@ -4398,7 +4410,9 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_C
 
       if (commit_event && commit_event->write(&log_file))
         goto err;
-      if (flush_and_sync())
+      
+      bool synced;
+      if (flush_and_sync(&synced))
         goto err;
       DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_ABORT(););
       if (cache->error)				// Error on read
@@ -4407,6 +4421,11 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_C
         write_error=1;				// Don't give more errors
         goto err;
       }
+
+      if (RUN_HOOK(binlog_storage, after_flush,
+                   (thd, log_file_name, log_file.pos_in_file, synced)))
+        goto err;
+
       signal_update();
     }
 

=== modified file 'sql/log.h'
--- a/sql/log.h	2008-06-27 20:56:54 +0000
+++ b/sql/log.h	2008-09-11 14:28:48 +0000
@@ -364,7 +364,21 @@ public:
   bool is_active(const char* log_file_name);
   int update_log_index(LOG_INFO* linfo, bool need_update_threads);
   void rotate_and_purge(uint flags);
-  bool flush_and_sync();
+
+  /**
+     Flush binlog cache and synchronize to disk.
+
+     This function flushes events in binlog cache to binary log file,
+     it will do synchronizing according to the setting of system
+     variable 'sync_binlog'. If file is synchronized, @c synced will
+     be set to 1, otherwise 0.
+
+     @param[out] synced if not NULL, set to 1 if file is synchronized, otherwise 0
+
+     @retval 0 Success
+     @retval other Failure
+  */
+  bool flush_and_sync(bool *synced);
   int purge_logs(const char *to_log, bool included,
                  bool need_mutex, bool need_update_threads,
                  ulonglong *decrease_log_space);

=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc	2008-08-26 15:01:13 +0000
+++ b/sql/mysqld.cc	2008-09-12 01:35:24 +0000
@@ -33,6 +33,8 @@
 
 #include "rpl_injector.h"
 
+#include "rpl_handler.h"
+
 #ifdef HAVE_SYS_PRCTL_H
 #include <sys/prctl.h>
 #endif
@@ -1348,6 +1350,7 @@ void clean_up(bool print_message)
   ha_end();
   if (tc_log)
     tc_log->close();
+  delegates_destroy();
   xid_cache_free();
   delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache);
   multi_keycache_free();
@@ -3905,6 +3908,13 @@ static int init_server_components()
     unireg_abort(1);
   }
 
+  /* initialize delegates for extension observers */
+  if (delegates_init())
+  {
+    sql_print_error("Initialize extension delegates failed");
+    unireg_abort(1);
+  }
+
   /* need to configure logging before initializing storage engines */
   if (opt_update_log)
   {

=== added file 'sql/replication.h'
--- a/sql/replication.h	1970-01-01 00:00:00 +0000
+++ b/sql/replication.h	2008-09-11 14:36:17 +0000
@@ -0,0 +1,461 @@
+/* Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef REPLICATION_H
+#define REPLICATION_H
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+   Transaction observer flags.
+*/
+enum Trans_flags {
+  /** Transaction is a real transaction */
+  TRANS_IS_REAL_TRANS = 1
+};
+
+/**
+   Transaction observer parameter
+*/
+typedef struct Trans_param {
+  uint32 server_id;
+  uint32 flags;
+
+  /*
+    The latest binary log file name and position written by current
+    transaction, if binary log is disabled or no log event has been
+    written into binary log file by current transaction (events
+    written into transaction log cache are not counted), these two
+    member will be zero.
+  */
+  const char *log_file;
+  my_off_t log_pos;
+} Trans_param;
+
+/**
+   Observes and extends transaction execution
+*/
+typedef struct Trans_observer {
+  uint32 len;
+
+  /**
+     This callback is called after transaction commit
+     
+     This callback is called right after commit to storage engines for
+     transactional tables.
+
+     For non-transactional tables, this is called at the end of the
+     statement, before sending statement status, if the statement
+     succeeded.
+
+     @note The return value is currently ignored by the server.
+
+     @param param The parameter for transaction observers
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*after_commit)(Trans_param *param);
+
+  /**
+     This callback is called after transaction rollback
+
+     This callback is called right after rollback to storage engines
+     for transactional tables.
+
+     For non-transactional tables, this is called at the end of the
+     statement, before sending statement status, if the statement
+     failed.
+
+     @note The return value is currently ignored by the server.
+
+     @param param The parameter for transaction observers
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*after_rollback)(Trans_param *param);
+} Trans_observer;
+
+/**
+   Binlog storage flags
+*/
+enum Binlog_storage_flags {
+  /** Binary log was sync:ed */
+  BINLOG_STORAGE_IS_SYNCED = 1
+};
+
+/**
+   Binlog storage observer parameters
+ */
+typedef struct Binlog_storage_param {
+  uint32 server_id;
+} Binlog_storage_param;
+
+/**
+   Observe binlog logging storage
+*/
+typedef struct Binlog_storage_observer {
+  uint32 len;
+
+  /**
+     This callback is called after binlog has been flushed
+
+     This callback is called after cached events have been flushed to
+     binary log file. Whether the binary log file is synchronized to
+     disk is indicated by the bit BINLOG_STORAGE_IS_SYNCED in @a flags.
+
+     @param param Observer common parameter
+     @param log_file Binlog file name been updated
+     @param log_pos Binlog position after update
+     @param flags flags for binlog storage
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*after_flush)(Binlog_storage_param *param,
+                     const char *log_file, my_off_t log_pos,
+                     uint32 flags);
+} Binlog_storage_observer;
+
+/**
+   Replication binlog transmitter (binlog dump) observer parameter.
+*/
+typedef struct Binlog_transmit_param {
+  uint32 server_id;
+  uint32 flags;
+} Binlog_transmit_param;
+
+/**
+   Observe and extends the binlog dumping thread.
+*/
+typedef struct Binlog_transmit_observer {
+  uint32 len;
+  
+  /**
+     This callback is called when binlog dumping starts
+
+
+     @param param Observer common parameter
+     @param log_file Binlog file name to transmit from
+     @param log_pos Binlog position to transmit from
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*transmit_start)(Binlog_transmit_param *param,
+                        const char *log_file, my_off_t log_pos);
+
+  /**
+     This callback is called when binlog dumping stops
+
+     @param param Observer common parameter
+     
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*transmit_stop)(Binlog_transmit_param *param);
+
+  /**
+     This callback is called to reserve bytes in packet header for event transmission
+
+     This callback is called when resetting transmit packet header to
+     reserve bytes for this observer in packet header.
+
+     The @a header buffer is allocated by the server code, and @a size
+     is the size of the header buffer. Each observer can only reserve
+     a maximum size of @a size in the header.
+
+     @param param Observer common parameter
+     @param header Pointer of the header buffer
+     @param size Size of the header buffer
+     @param len Header length reserved by this observer
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*reserve_header)(Binlog_transmit_param *param,
+                        unsigned char *header,
+                        unsigned long size,
+                        unsigned long *len);
+
+  /**
+     This callback is called before sending an event packet to slave
+
+     @param param Observer common parameter
+     @param packet Binlog event packet to send
+     @param len Length of the event packet
+     @param log_file Binlog file name of the event packet to send
+     @param log_pos Binlog position of the event packet to send
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*before_send_event)(Binlog_transmit_param *param,
+                           unsigned char *packet, unsigned long len,
+                           const char *log_file, my_off_t log_pos );
+
+  /**
+     This callback is called after sending an event packet to slave
+
+     @param param Observer common parameter
+     @param event_buf Binlog event packet buffer sent
+     @param len length of the event packet buffer
+
+     @retval 0 Sucess
+     @retval 1 Failure
+   */
+  int (*after_send_event)(Binlog_transmit_param *param,
+                          const char *event_buf, unsigned long len);
+
+  /**
+     This callback is called after resetting master status
+
+     This is called when executing the command RESET MASTER, and is
+     used to reset status variables added by observers.
+
+     @param param Observer common parameter
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*after_reset_master)(Binlog_transmit_param *param);
+} Binlog_transmit_observer;
+
+/**
+   Binlog relay IO flags
+*/
+enum Binlog_relay_IO_flags {
+  /** Binary relay log was sync:ed */
+  BINLOG_RELAY_IS_SYNCED = 1
+};
+
+
+/**
+  Replication binlog relay IO observer parameter
+*/
+typedef struct Binlog_relay_IO_param {
+  uint32 server_id;
+
+  /* Master host, user and port */
+  char *host;
+  char *user;
+  unsigned int port;
+
+  char *master_log_name;
+  my_off_t master_log_pos;
+
+  MYSQL *mysql;                        /* the connection to master */
+} Binlog_relay_IO_param;
+
+/**
+   Observes and extends the service of slave IO thread.
+*/
+typedef struct Binlog_relay_IO_observer {
+  uint32 len;
+
+  /**
+     This callback is called when slave IO thread starts
+
+     @param param Observer common parameter
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*thread_start)(Binlog_relay_IO_param *param);
+
+  /**
+     This callback is called when slave IO thread stops
+
+     @param param Observer common parameter
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*thread_stop)(Binlog_relay_IO_param *param);
+
+  /**
+     This callback is called before slave requesting binlog transmission from master
+
+     This is called before slave issuing BINLOG_DUMP command to master
+     to request binlog.
+
+     @param param Observer common parameter
+     @param flags binlog dump flags
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*before_request_transmit)(Binlog_relay_IO_param *param, uint32 flags);
+
+  /**
+     This callback is called after read an event packet from master
+
+     @param param Observer common parameter
+     @param packet The event packet read from master
+     @param len Length of the event packet read from master
+     @param event_buf The event packet return after process
+     @param event_len The length of event packet return after process
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*after_read_event)(Binlog_relay_IO_param *param,
+                          const char *packet, unsigned long len,
+                          const char **event_buf, unsigned long *event_len);
+
+  /**
+     This callback is called after written an event packet to relay log
+
+     @param param Observer common parameter
+     @param event_buf Event packet written to relay log
+     @param event_len Length of the event packet written to relay log
+     @param flags flags for relay log
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*after_queue_event)(Binlog_relay_IO_param *param,
+                           const char *event_buf, unsigned long event_len,
+                           uint32 flags);
+
+  /**
+     This callback is called after reset slave relay log IO status
+     
+     @param param Observer common parameter
+
+     @retval 0 Sucess
+     @retval 1 Failure
+  */
+  int (*after_reset_slave)(Binlog_relay_IO_param *param);
+} Binlog_relay_IO_observer;
+
+
+/**
+   Register a transaction observer
+
+   @param observer The transaction observer to register
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Sucess
+   @retval 1 Observer already exists
+*/
+int register_trans_observer(Trans_observer *observer, void *p);
+
+/**
+   Unregister a transaction observer
+
+   @param observer The transaction observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Sucess
+   @retval 1 Observer not exists
+*/
+int unregister_trans_observer(Trans_observer *observer, void *p);
+
+/**
+   Register a binlog storage observer
+
+   @param observer The binlog storage observer to register
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Sucess
+   @retval 1 Observer already exists
+*/
+int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
+
+/**
+   Unregister a binlog storage observer
+
+   @param observer The binlog storage observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Sucess
+   @retval 1 Observer not exists
+*/
+int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p);
+
+/**
+   Register a binlog transmit observer
+
+   @param observer The binlog transmit observer to register
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Sucess
+   @retval 1 Observer already exists
+*/
+int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
+
+/**
+   Unregister a binlog transmit observer
+
+   @param observer The binlog transmit observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Sucess
+   @retval 1 Observer not exists
+*/
+int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p);
+
+/**
+   Register a binlog relay IO (slave IO thread) observer
+
+   @param observer The binlog relay IO observer to register
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Sucess
+   @retval 1 Observer already exists
+*/
+int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
+
+/**
+   Unregister a binlog relay IO (slave IO thread) observer
+
+   @param observer The binlog relay IO observer to unregister
+   @param p pointer to the internal plugin structure
+
+   @retval 0 Sucess
+   @retval 1 Observer not exists
+*/
+int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p);
+
+/**
+   Connect to master
+
+   This function can only used in the slave I/O thread context, and
+   will use the same master information to do the connection.
+
+   @code
+   MYSQL *mysql = mysql_init(NULL);
+   if (rpl_connect_master(mysql))
+   {
+     // do stuff with the connection
+   }
+   mysql_close(mysql); // close the connection
+   @endcode
+   
+   @param mysql address of MYSQL structure to use, pass NULL will
+   create a new one
+
+   @return address of MYSQL structure on success, NULL on failure
+*/
+MYSQL *rpl_connect_master(MYSQL *mysql);
+
+#ifdef __cplusplus
+}
+#endif
+#endif /* REPLICATION_H */

=== added file 'sql/rpl_handler.cc'
--- a/sql/rpl_handler.cc	1970-01-01 00:00:00 +0000
+++ b/sql/rpl_handler.cc	2008-09-11 14:36:17 +0000
@@ -0,0 +1,485 @@
+/* Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#include "mysql_priv.h"
+
+#include "rpl_mi.h"
+#include "sql_repl.h"
+#include "log_event.h"
+#include "rpl_filter.h"
+#include <my_dir.h>
+#include "rpl_handler.h"
+
+Trans_delegate *transaction_delegate;
+Binlog_storage_delegate *binlog_storage_delegate;
+#ifdef HAVE_REPLICATION
+Binlog_transmit_delegate *binlog_transmit_delegate;
+Binlog_relay_IO_delegate *binlog_relay_io_delegate;
+#endif /* HAVE_REPLICATION */
+
+/*
+  structure to save transaction log filename and position
+*/
+typedef struct Trans_binlog_info {
+  my_off_t log_pos;
+  char log_file[FN_REFLEN];
+} Trans_binlog_info;
+
+static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+int get_user_var_int(const char *name,
+                     long long int *value, int *null_value)
+{
+  my_bool null_val;
+  user_var_entry *entry= 
+    (user_var_entry*) hash_search(&current_thd->user_vars,
+                                  (uchar*) name, strlen(name));
+  if (!entry)
+    return 1;
+  *value= entry->val_int(&null_val);
+  if (null_value)
+    *null_value= null_val;
+  return 0;
+}
+
+int get_user_var_real(const char *name,
+                      double *value, int *null_value)
+{
+  my_bool null_val;
+  user_var_entry *entry= 
+    (user_var_entry*) hash_search(&current_thd->user_vars,
+                                  (uchar*) name, strlen(name));
+  if (!entry)
+    return 1;
+  *value= entry->val_real(&null_val);
+  if (null_value)
+    *null_value= null_val;
+  return 0;
+}
+
+int get_user_var_str(const char *name, char *value,
+                     size_t len, unsigned int precision, int *null_value)
+{
+  String str;
+  my_bool null_val;
+  user_var_entry *entry= 
+    (user_var_entry*) hash_search(&current_thd->user_vars,
+                                  (uchar*) name, strlen(name));
+  if (!entry)
+    return 1;
+  entry->val_str(&null_val, &str, precision);
+  strncpy(value, str.c_ptr(), len);
+  if (null_value)
+    *null_value= null_val;
+  return 0;
+}
+
+int delegates_init()
+{
+  static unsigned char trans_mem[sizeof(Trans_delegate)];
+  static unsigned char storage_mem[sizeof(Binlog_storage_delegate)];
+#ifdef HAVE_REPLICATION
+  static unsigned char transmit_mem[sizeof(Binlog_transmit_delegate)];
+  static unsigned char relay_io_mem[sizeof(Binlog_relay_IO_delegate)];
+#endif
+  
+  if (!(transaction_delegate= new (trans_mem) Trans_delegate)
+      || (!transaction_delegate->is_inited())
+      || !(binlog_storage_delegate= new (storage_mem) Binlog_storage_delegate)
+      || (!binlog_storage_delegate->is_inited())
+#ifdef HAVE_REPLICATION
+      || !(binlog_transmit_delegate= new (transmit_mem) Binlog_transmit_delegate)
+      || (!binlog_transmit_delegate->is_inited())
+      || !(binlog_relay_io_delegate= new (relay_io_mem) Binlog_relay_IO_delegate)
+      || (!binlog_relay_io_delegate->is_inited())
+#endif /* HAVE_REPLICATION */
+      )
+    return 1;
+
+  if (pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL))
+    return 1;
+  return 0;
+}
+
+void delegates_destroy()
+{
+  transaction_delegate->~Trans_delegate();
+  binlog_storage_delegate->~Binlog_storage_delegate();
+#ifdef HAVE_REPLICATION
+  binlog_transmit_delegate->~Binlog_transmit_delegate();
+  binlog_relay_io_delegate->~Binlog_relay_IO_delegate();
+#endif /* HAVE_REPLICATION */
+}
+
+/*
+  This macro is used by almost all the Delegate methods to iterate
+  over all the observers running given callback function of the
+  delegate .
+  
+  Add observer plugins to the thd->lex list, after each statement, all
+  plugins add to thd->lex will be automatically unlocked.
+ */
+#define FOREACH_OBSERVER(r, f, thd, args)                               \
+  param.server_id= thd->server_id;                                      \
+  read_lock();                                                          \
+  Observer_info_iterator iter= observer_info_iter();                    \
+  Observer_info *info= iter++;                                          \
+  for (; info; info= iter++)                                            \
+  {                                                                     \
+    plugin_ref plugin=                                                  \
+      my_plugin_lock(thd, &info->plugin);                               \
+    if (!plugin)                                                        \
+    {                                                                   \
+      r= 1;                                                             \
+      break;                                                            \
+    }                                                                   \
+    if (((Observer *)info->observer)->f                                 \
+        && ((Observer *)info->observer)->f args)                        \
+    {                                                                   \
+      r= 1;                                                             \
+      break;                                                            \
+    }                                                                   \
+  }                                                                     \
+  unlock()
+
+
+int Trans_delegate::after_commit(THD *thd, bool all)
+{
+  Trans_param param;
+  bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
+  if (is_real_trans)
+    param.flags |= TRANS_IS_REAL_TRANS;
+
+  Trans_binlog_info *log_info=
+    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+
+  param.log_file= log_info ? log_info->log_file : 0;
+  param.log_pos= log_info ? log_info->log_pos : 0;
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_commit, thd, (&param));
+
+  /*
+    This is the end of a real transaction or autocommit statement, we
+    can free the memory allocated for binlog file and position.
+  */
+  if (is_real_trans && log_info)
+  {
+    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
+    my_free(log_info, MYF(0));
+  }
+  return ret;
+}
+
+int Trans_delegate::after_rollback(THD *thd, bool all)
+{
+  Trans_param param;
+  bool is_real_trans= (all || thd->transaction.all.ha_list == 0);
+  if (is_real_trans)
+    param.flags |= TRANS_IS_REAL_TRANS;
+
+  Trans_binlog_info *log_info=
+    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+    
+  param.log_file= log_info ? log_info->log_file : 0;
+  param.log_pos= log_info ? log_info->log_pos : 0;
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_commit, thd, (&param));
+
+  /*
+    This is the end of a real transaction or autocommit statement, we
+    can free the memory allocated for binlog file and position.
+  */
+  if (is_real_trans && log_info)
+  {
+    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL);
+    my_free(log_info, MYF(0));
+  }
+  return ret;
+}
+
+int Binlog_storage_delegate::after_flush(THD *thd,
+                                         const char *log_file,
+                                         my_off_t log_pos,
+                                         bool synced)
+{
+  Binlog_storage_param param;
+  uint32 flags=0;
+  if (synced)
+    flags |= BINLOG_STORAGE_IS_SYNCED;
+
+  Trans_binlog_info *log_info=
+    my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO);
+    
+  if (!log_info)
+  {
+    if(!(log_info=
+         (Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0))))
+      return 1;
+    my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info);
+  }
+    
+  strcpy(log_info->log_file, log_file+dirname_length(log_file));
+  log_info->log_pos = log_pos;
+  
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_flush, thd,
+                   (&param, log_info->log_file, log_info->log_pos, flags));
+  return ret;
+}
+
+#ifdef HAVE_REPLICATION
+int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags,
+                                             const char *log_file,
+                                             my_off_t log_pos)
+{
+  Binlog_transmit_param param;
+  param.flags= flags;
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, transmit_start, thd, (&param, log_file, log_pos));
+  return ret;
+}
+
+int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags)
+{
+  Binlog_transmit_param param;
+  param.flags= flags;
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, transmit_stop, thd, (&param));
+  return ret;
+}
+
+int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags,
+                                             String *packet)
+{
+  /* NOTE2ME: Maximum extra header size for each observer, I hope 32
+     bytes should be enough for each Observer to reserve their extra
+     header. If later found this is not enough, we can increase this
+     /HEZX
+  */
+#define RESERVE_HEADER_SIZE 32
+  unsigned char header[RESERVE_HEADER_SIZE];
+  ulong hlen;
+  Binlog_transmit_param param;
+  param.flags= flags;
+  param.server_id= thd->server_id;
+
+  int ret= 0;
+  read_lock();
+  Observer_info_iterator iter= observer_info_iter();
+  Observer_info *info= iter++;
+  for (; info; info= iter++)
+  {
+    plugin_ref plugin=
+      my_plugin_lock(thd, &info->plugin);
+    if (!plugin)
+    {
+      ret= 1;
+      break;
+    }
+    hlen= 0;
+    if (((Observer *)info->observer)->reserve_header
+        && ((Observer *)info->observer)->reserve_header(&param,
+                                                        header,
+                                                        RESERVE_HEADER_SIZE,
+                                                        &hlen))
+    {
+      ret= 1;
+      break;
+    }
+    if (hlen == 0)
+      continue;
+    if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen))
+    {
+      ret= 1;
+      break;
+    }
+  }
+  unlock();
+  return ret;
+}
+
+int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags,
+                                                String *packet,
+                                                const char *log_file,
+                                                my_off_t log_pos)
+{
+  Binlog_transmit_param param;
+  param.flags= flags;
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, before_send_event, thd,
+                   (&param, (uchar *)packet->c_ptr(),
+                    packet->length(),
+                    log_file+dirname_length(log_file), log_pos));
+  return ret;
+}
+
+int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags,
+                                               String *packet)
+{
+  Binlog_transmit_param param;
+  param.flags= flags;
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_send_event, thd,
+                   (&param, packet->c_ptr(), packet->length()));
+  return ret;
+}
+
+int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags)
+
+{
+  Binlog_transmit_param param;
+  param.flags= flags;
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_reset_master, thd, (&param));
+  return ret;
+}
+
+void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param,
+                                          Master_info *mi)
+{
+  param->mysql= mi->mysql;
+  param->user= mi->user;
+  param->host= mi->host;
+  param->port= mi->port;
+  param->master_log_name= mi->master_log_name;
+  param->master_log_pos= mi->master_log_pos;
+}
+
+int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi)
+{
+  Binlog_relay_IO_param param;
+  init_param(&param, mi);
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, thread_start, thd, (&param));
+  return ret;
+}
+
+
+int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi)
+{
+
+  Binlog_relay_IO_param param;
+  init_param(&param, mi);
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, thread_stop, thd, (&param));
+  return ret;
+}
+
+int Binlog_relay_IO_delegate::before_request_transmit(THD *thd,
+                                                      Master_info *mi,
+                                                      ushort flags)
+{
+  Binlog_relay_IO_param param;
+  init_param(&param, mi);
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, before_request_transmit, thd, (&param, (uint32)flags));
+  return ret;
+}
+
+int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi,
+                                               const char *packet, ulong len,
+                                               const char **event_buf,
+                                               ulong *event_len)
+{
+  Binlog_relay_IO_param param;
+  init_param(&param, mi);
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_read_event, thd,
+                   (&param, packet, len, event_buf, event_len));
+  return ret;
+}
+
+int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi,
+                                                const char *event_buf,
+                                                ulong event_len,
+                                                bool synced)
+{
+  Binlog_relay_IO_param param;
+  init_param(&param, mi);
+
+  uint32 flags=0;
+  if (synced)
+    flags |= BINLOG_STORAGE_IS_SYNCED;
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_queue_event, thd,
+                   (&param, event_buf, event_len, flags));
+  return ret;
+}
+
+int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi)
+
+{
+  Binlog_relay_IO_param param;
+  init_param(&param, mi);
+
+  int ret= 0;
+  FOREACH_OBSERVER(ret, after_reset_slave, thd, (&param));
+  return ret;
+}
+#endif /* HAVE_REPLICATION */
+
+int register_trans_observer(Trans_observer *observer, void *p)
+{
+  return transaction_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_trans_observer(Trans_observer *observer, void *p)
+{
+  return transaction_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+
+int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
+{
+  return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p)
+{
+  return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+
+#ifdef HAVE_REPLICATION
+int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
+{
+  return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p)
+{
+  return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+
+int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
+{
+  return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p);
+}
+
+int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p)
+{
+  return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p);
+}
+#endif /* HAVE_REPLICATION */

=== added file 'sql/rpl_handler.h'
--- a/sql/rpl_handler.h	1970-01-01 00:00:00 +0000
+++ b/sql/rpl_handler.h	2008-09-11 14:36:17 +0000
@@ -0,0 +1,213 @@
+/* Copyright (C) 2008 MySQL AB
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef RPL_HANDLER_H
+#define RPL_HANDLER_H
+
+#include "mysql_priv.h"
+#include "rpl_mi.h"
+#include "rpl_rli.h"
+#include "sql_plugin.h"
+#include "replication.h"
+
+class Observer_info {
+public:
+  void *observer;
+  st_plugin_int *plugin_int;
+  plugin_ref plugin;
+
+  Observer_info(void *ob, st_plugin_int *p)
+    :observer(ob), plugin_int(p)
+  {
+    plugin= plugin_int_to_ref(plugin_int);
+  }
+};
+
+class Delegate {
+public:
+  typedef List<Observer_info> Observer_info_list;
+  typedef List_iterator<Observer_info> Observer_info_iterator;
+  
+  int add_observer(void *observer, st_plugin_int *plugin)
+  {
+    int ret= FALSE;
+    if (!inited)
+      return TRUE;
+    write_lock();
+    Observer_info_iterator iter(observer_info_list);
+    Observer_info *info= iter++;
+    while (info && info->observer != observer)
+      info= iter++;
+    if (!info)
+    {
+      info= new Observer_info(observer, plugin);
+      if (!info || observer_info_list.push_back(info, &memroot))
+        ret= TRUE;
+    }
+    else
+      ret= TRUE;
+    unlock();
+    return ret;
+  }
+  
+  int remove_observer(void *observer, st_plugin_int *plugin)
+  {
+    int ret= FALSE;
+    if (!inited)
+      return TRUE;
+    write_lock();
+    Observer_info_iterator iter(observer_info_list);
+    Observer_info *info= iter++;
+    while (info && info->observer != observer)
+      info= iter++;
+    if (info)
+      iter.remove();
+    else
+      ret= TRUE;
+    unlock();
+    return ret;
+  }
+
+  inline Observer_info_iterator observer_info_iter()
+  {
+    return Observer_info_iterator(observer_info_list);
+  }
+
+  inline bool is_empty()
+  {
+    return observer_info_list.is_empty();
+  }
+
+  inline int read_lock()
+  {
+    if (!inited)
+      return TRUE;
+    return rw_rdlock(&lock);
+  }
+
+  inline int write_lock()
+  {
+    if (!inited)
+      return TRUE;
+    return rw_wrlock(&lock);
+  }
+
+  inline int unlock()
+  {
+    if (!inited)
+      return TRUE;
+    return rw_unlock(&lock);
+  }
+
+  inline bool is_inited()
+  {
+    return inited;
+  }
+  
+  Delegate()
+  {
+    inited= FALSE;
+    if (my_rwlock_init(&lock, NULL))
+      return;
+    init_sql_alloc(&memroot, 1024, 0);
+    inited= TRUE;
+  }
+  ~Delegate()
+  {
+    inited= FALSE;
+    rwlock_destroy(&lock);
+    free_root(&memroot, MYF(0));
+  }
+
+private:
+  Observer_info_list observer_info_list;
+  rw_lock_t lock;
+  MEM_ROOT memroot;
+  bool inited;
+};
+
+class Trans_delegate
+  :public Delegate {
+public:
+  typedef Trans_observer Observer;
+  int before_commit(THD *thd, bool all);
+  int before_rollback(THD *thd, bool all);
+  int after_commit(THD *thd, bool all);
+  int after_rollback(THD *thd, bool all);
+};
+
+class Binlog_storage_delegate
+  :public Delegate {
+public:
+  typedef Binlog_storage_observer Observer;
+  int after_flush(THD *thd, const char *log_file,
+                  my_off_t log_pos, bool synced);
+};
+
+#ifdef HAVE_REPLICATION
+class Binlog_transmit_delegate
+  :public Delegate {
+public:
+  typedef Binlog_transmit_observer Observer;
+  int transmit_start(THD *thd, ushort flags,
+                     const char *log_file, my_off_t log_pos);
+  int transmit_stop(THD *thd, ushort flags);
+  int reserve_header(THD *thd, ushort flags, String *packet);
+  int before_send_event(THD *thd, ushort flags,
+                        String *packet, const
+                        char *log_file, my_off_t log_pos );
+  int after_send_event(THD *thd, ushort flags,
+                       String *packet);
+  int after_reset_master(THD *thd, ushort flags);
+};
+
+class Binlog_relay_IO_delegate
+  :public Delegate {
+public:
+  typedef Binlog_relay_IO_observer Observer;
+  int thread_start(THD *thd, Master_info *mi);
+  int thread_stop(THD *thd, Master_info *mi);
+  int before_request_transmit(THD *thd, Master_info *mi, ushort flags);
+  int after_read_event(THD *thd, Master_info *mi,
+                       const char *packet, ulong len,
+                       const char **event_buf, ulong *event_len);
+  int after_queue_event(THD *thd, Master_info *mi,
+                        const char *event_buf, ulong event_len,
+                        bool synced);
+  int after_reset_slave(THD *thd, Master_info *mi);
+private:
+  void init_param(Binlog_relay_IO_param *param, Master_info *mi);
+};
+#endif /* HAVE_REPLICATION */
+
+int delegates_init();
+void delegates_destroy();
+
+extern Trans_delegate *transaction_delegate;
+extern Binlog_storage_delegate *binlog_storage_delegate;
+#ifdef HAVE_REPLICATION
+extern Binlog_transmit_delegate *binlog_transmit_delegate;
+extern Binlog_relay_IO_delegate *binlog_relay_io_delegate;
+#endif /* HAVE_REPLICATION */
+
+/*
+  if there is no observers in the delegate, we can return 0
+  immediately.
+*/
+#define RUN_HOOK(group, hook, args)             \
+  (group ##_delegate->is_empty() ?              \
+   0 : group ##_delegate->hook args)
+
+#endif /* RPL_HANDLER_H */

=== modified file 'sql/slave.cc'
--- a/sql/slave.cc	2008-07-21 03:55:09 +0000
+++ b/sql/slave.cc	2008-09-11 14:36:17 +0000
@@ -39,6 +39,7 @@
 #include <sql_common.h>
 #include <errmsg.h>
 #include <mysys_err.h>
+#include "rpl_handler.h"
 
 #ifdef HAVE_REPLICATION
 
@@ -67,6 +68,8 @@ ulonglong relay_log_space_limit = 0;
 int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
 int events_till_abort = -1;
 
+static pthread_key(Master_info*, RPL_MASTER_INFO);
+
 enum enum_slave_reconnect_actions
 {
   SLAVE_RECON_ACT_REG= 0,
@@ -1499,17 +1502,22 @@ static int safe_sleep(THD* thd, int sec,
 }
 
 
-static int request_dump(MYSQL* mysql, Master_info* mi,
-                        bool *suppress_warnings)
+static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
+			bool *suppress_warnings)
 {
   uchar buf[FN_REFLEN + 10];
   int len;
-  int binlog_flags = 0; // for now
+  ushort binlog_flags = 0; // for now
   char* logname = mi->master_log_name;
   DBUG_ENTER("request_dump");
   
   *suppress_warnings= FALSE;
 
+  if (RUN_HOOK(binlog_relay_io,
+               before_request_transmit,
+               (thd, mi, binlog_flags)))
+    DBUG_RETURN(1);
+  
   // TODO if big log files: Change next to int8store()
   int4store(buf, (ulong) mi->master_log_pos);
   int2store(buf + 4, binlog_flags);
@@ -2134,6 +2142,12 @@ pthread_handler_t handle_slave_io(void *
                             mi->master_log_name,
                             llstr(mi->master_log_pos,llbuff)));
 
+  /* This must be called before run any binlog_relay_io hooks */
+  my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi);
+
+  if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
+    goto err;
+
   if (!(mi->mysql = mysql = mysql_init(NULL)))
   {
     mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
@@ -2208,7 +2222,7 @@ connected:
   while (!io_slave_killed(thd,mi))
   {
     thd_proc_info(thd, "Requesting binlog dump");
-    if (request_dump(mysql, mi, &suppress_warnings))
+    if (request_dump(thd, mysql, mi, &suppress_warnings))
     {
       sql_print_error("Failed on request_dump()");
       if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
@@ -2228,6 +2242,7 @@ requesting master dump") ||
           goto err;
         goto connected;
       });
+    const char *event_buf;
 
     while (!io_slave_killed(thd,mi))
     {
@@ -2283,10 +2298,27 @@ Stopping slave I/O thread due to out-of-
 
       retry_count=0;                    // ok event, reset retry counter
       thd_proc_info(thd, "Queueing master event to the relay log");
-      if (queue_event(mi,(const char*)mysql->net.read_pos + 1, event_len))
+      event_buf= (const char*)mysql->net.read_pos + 1;
+      if (RUN_HOOK(binlog_relay_io, after_read_event,
+                   (thd, mi,(const char*)mysql->net.read_pos + 1,
+                    event_len, &event_buf, &event_len)))
       {
+        sql_print_error("Failed to run 'after_read_event' hook");
         goto err;
       }
+
+      /* XXX: 'synced' should be updated by queue_event to indicate
+         whether event has been synced to disk */
+      bool synced= 0;
+      if (queue_event(mi, event_buf, event_len))
+      {
+        goto err;
+      }
+
+      if (RUN_HOOK(binlog_relay_io, after_queue_event,
+                   (thd, mi, event_buf, event_len, synced)))
+        goto err;
+
       if (flush_master_info(mi, 1))
       {
         sql_print_error("Failed to flush master info file");
@@ -2304,6 +2336,7 @@ Stopping slave I/O thread due to out-of-
         for no reason, but this function will do a clean read, notice the clean
         value and exit immediately.
       */
+
 #ifndef DBUG_OFF
       {
         char llbuf1[22], llbuf2[22];
@@ -2333,6 +2366,7 @@ err:
   sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
                   IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
   pthread_mutex_lock(&LOCK_thread_count);
+  RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
   thd->query = thd->db = 0; // extra safety
   thd->query_length= thd->db_length= 0;
   pthread_mutex_unlock(&LOCK_thread_count);
@@ -3453,6 +3487,64 @@ static int safe_reconnect(THD* thd, MYSQ
 }
 
 
+MYSQL *rpl_connect_master(MYSQL *mysql)
+{
+  THD *thd= current_thd;
+  Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO);
+  if (!mi)
+  {
+    sql_print_error("'rpl_connect_master' must be called in slave I/O thread context.");
+    return NULL;
+  }
+
+  bool allocated= false;
+  
+  if (!mysql)
+  {
+    if(!(mysql= mysql_init(NULL)))
+      return NULL;
+    allocated= true;
+  }
+
+  /*
+    XXX: copied from connect_to_master, this function should not
+    change the slave status, so we cannot use connect_to_master
+    directly
+    
+    TODO: make this part a seperate function to eliminate duplication
+  */
+  mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
+  mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
+
+#ifdef HAVE_OPENSSL
+  if (mi->ssl)
+  {
+    mysql_ssl_set(mysql,
+                  mi->ssl_key[0]?mi->ssl_key:0,
+                  mi->ssl_cert[0]?mi->ssl_cert:0,
+                  mi->ssl_ca[0]?mi->ssl_ca:0,
+                  mi->ssl_capath[0]?mi->ssl_capath:0,
+                  mi->ssl_cipher[0]?mi->ssl_cipher:0);
+    mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
+                  &mi->ssl_verify_server_cert);
+  }
+#endif
+
+  mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
+  /* This one is not strictly needed but we have it here for completeness */
+  mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
+
+  if (io_slave_killed(thd, mi)
+      || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
+                             mi->port, 0, 0))
+  {
+    if (allocated)
+      mysql_close(mysql);                       // this will free the object
+    return NULL;
+  }
+  return mysql;
+}
+
 /*
   Store the file and position where the execute-slave thread are in the
   relay log.

=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc	2008-09-04 13:46:04 +0000
+++ b/sql/sql_parse.cc	2008-09-12 01:35:24 +0000
@@ -21,6 +21,7 @@
 #include <m_ctype.h>
 #include <myisam.h>
 #include <my_dir.h>
+#include "rpl_handler.h"
 
 #include "sp_head.h"
 #include "sp.h"

=== modified file 'sql/sql_plugin.cc'
--- a/sql/sql_plugin.cc	2008-08-19 15:58:48 +0000
+++ b/sql/sql_plugin.cc	2008-09-12 01:35:24 +0000
@@ -20,14 +20,6 @@
 #define REPORT_TO_LOG  1
 #define REPORT_TO_USER 2
 
-#ifdef DBUG_OFF
-#define plugin_ref_to_int(A) A
-#define plugin_int_to_ref(A) A
-#else
-#define plugin_ref_to_int(A) (A ? A[0] : NULL)
-#define plugin_int_to_ref(A) &(A)
-#endif
-
 extern struct st_mysql_plugin *mysqld_builtins[];
 
 char *opt_plugin_load= NULL;
@@ -44,7 +36,8 @@ const LEX_STRING plugin_type_names[MYSQL
   { C_STRING_WITH_LEN("FTPARSER") },
   { C_STRING_WITH_LEN("DAEMON") },
   { C_STRING_WITH_LEN("INFORMATION SCHEMA") },
-  { C_STRING_WITH_LEN("AUDIT") }
+  { C_STRING_WITH_LEN("AUDIT") },
+  { C_STRING_WITH_LEN("REPLICATION") },
 };
 
 extern int initialize_schema_table(st_plugin_int *plugin);
@@ -89,7 +82,8 @@ static int min_plugin_info_interface_ver
   MYSQL_FTPARSER_INTERFACE_VERSION,
   MYSQL_DAEMON_INTERFACE_VERSION,
   MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
-  MYSQL_AUDIT_INTERFACE_VERSION
+  MYSQL_AUDIT_INTERFACE_VERSION,
+  MYSQL_REPLICATION_INTERFACE_VERSION
 };
 static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
 {
@@ -98,7 +92,8 @@ static int cur_plugin_info_interface_ver
   MYSQL_FTPARSER_INTERFACE_VERSION,
   MYSQL_DAEMON_INTERFACE_VERSION,
   MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
-  MYSQL_AUDIT_INTERFACE_VERSION
+  MYSQL_AUDIT_INTERFACE_VERSION,
+  MYSQL_REPLICATION_INTERFACE_VERSION
 };
 
 static bool initialized= 0;

=== modified file 'sql/sql_plugin.h'
--- a/sql/sql_plugin.h	2008-06-27 20:56:54 +0000
+++ b/sql/sql_plugin.h	2008-09-11 14:28:48 +0000
@@ -18,6 +18,14 @@
 
 class sys_var;
 
+#ifdef DBUG_OFF
+#define plugin_ref_to_int(A) A
+#define plugin_int_to_ref(A) A
+#else
+#define plugin_ref_to_int(A) (A ? A[0] : NULL)
+#define plugin_int_to_ref(A) &(A)
+#endif
+
 /*
   the following flags are valid for plugin_init()
 */

=== modified file 'sql/sql_repl.cc'
--- a/sql/sql_repl.cc	2008-08-22 06:48:55 +0000
+++ b/sql/sql_repl.cc	2008-09-11 14:28:48 +0000
@@ -21,6 +21,7 @@
 #include "log_event.h"
 #include "rpl_filter.h"
 #include <my_dir.h>
+#include "rpl_handler.h"
 
 int max_binlog_dump_events = 0; // unlimited
 my_bool opt_sporadic_binlog_dump_fail = 0;
@@ -378,11 +379,36 @@ static int send_heartbeat_event(NET* net
   {
     DBUG_RETURN(-1);
   }
-  packet->set("\0", 1, &my_charset_bin);
   DBUG_RETURN(0);
 }
 
 /*
+  Reset thread transmit packet buffer for event sending
+
+  This function allocates header bytes for event transmission, and
+  should be called before store the event data to the packet buffer.
+*/
+static int reset_transmit_packet(THD *thd, ushort flags,
+                                 ulong *ev_offset, const char **errmsg)
+{
+  int ret= 0;
+  String *packet= &thd->packet;
+
+  /* reserve and set default header */
+  packet->length(0);
+  packet->set("\0", 1, &my_charset_bin);
+
+  if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet)))
+  {
+    *errmsg= "Failed to run hook 'reserve_header'";
+    my_errno= ER_UNKNOWN_ERROR;
+    ret= 1;
+  }
+  *ev_offset= packet->length();
+  return ret;
+}
+
+/*
   TODO: Clean up loop to only have one call to send_file()
 */
 
@@ -392,6 +418,9 @@ void mysql_binlog_send(THD* thd, char* l
   LOG_INFO linfo;
   char *log_file_name = linfo.log_file_name;
   char search_file_name[FN_REFLEN], *name;
+
+  ulong ev_offset;
+
   IO_CACHE log;
   File file = -1;
   String* packet = &thd->packet;
@@ -407,6 +436,9 @@ void mysql_binlog_send(THD* thd, char* l
   DBUG_PRINT("enter",("log_ident: '%s'  pos: %ld", log_ident, (long) pos));
 
   bzero((char*) &log,sizeof(log));
+  sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
+                        thd->server_id, log_ident, (ulong)pos);
+
   /* 
      heartbeat_period from @master_heartbeat_period user variable
   */
@@ -423,6 +455,13 @@ void mysql_binlog_send(THD* thd, char* l
     coord->file_name= log_file_name; // initialization basing on what slave remembers
     coord->pos= pos;
   }
+  if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
+  {
+    errmsg= "Failed to run hook 'transmit_start'";
+    my_errno= ER_UNKNOWN_ERROR;
+    goto err;
+  }
+  
 #ifndef DBUG_OFF
   if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
   {
@@ -477,11 +516,9 @@ impossible position";
     goto err;
   }
 
-  /*
-    We need to start a packet with something other than 255
-    to distinguish it from error
-  */
-  packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
+  /* reset transmit packet for the fake rotate event below */
+  if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+    goto err;
 
   /*
     Tell the client about the log name with a fake Rotate event;
@@ -521,7 +558,7 @@ impossible position";
     my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
     goto err;
   }
-  packet->set("\0", 1, &my_charset_bin);
+
   /*
     Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
     this larger than the corresponding packet (query) sent 
@@ -537,6 +574,11 @@ impossible position";
   log_lock = mysql_bin_log.get_log_lock();
   if (pos > BIN_LOG_HEADER_SIZE)
   {
+    /* reset transmit packet for the event read from binary log
+       file */
+    if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+      goto err;
+
      /*
        Try to find a Format_description_log_event at the beginning of
        the binlog
@@ -545,28 +587,28 @@ impossible position";
      {
        /*
          The packet has offsets equal to the normal offsets in a binlog
-         event +1 (the first character is \0).
+         event + ev_offset (the first character is \0).
        */
        DBUG_PRINT("info",
                   ("Looked for a Format_description_log_event, found event type %d",
-                   (*packet)[EVENT_TYPE_OFFSET+1]));
-       if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+                   (*packet)[EVENT_TYPE_OFFSET+ev_offset]));
+       if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
        {
-         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+         binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
                                        LOG_EVENT_BINLOG_IN_USE_F);
-         (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+         (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
          /*
            mark that this event with "log_pos=0", so the slave
            should not increment master's binlog position
            (rli->group_master_log_pos)
          */
-         int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
+         int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
          /*
            if reconnect master sends FD event with `created' as 0
            to avoid destroying temp tables.
           */
          int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
-                   ST_CREATED_OFFSET+1, (ulong) 0);
+                   ST_CREATED_OFFSET+ev_offset, (ulong) 0);
          /* send it */
          if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
          {
@@ -592,8 +634,6 @@ impossible position";
          Format_description_log_event will be found naturally if it is written.
        */
      }
-     /* reset the packet as we wrote to it in any case */
-     packet->set("\0", 1, &my_charset_bin);
   } /* end of if (pos > BIN_LOG_HEADER_SIZE); */
   else
   {
@@ -605,6 +645,12 @@ impossible position";
 
   while (!net->error && net->vio != 0 && !thd->killed)
   {
+    Log_event_type event_type= UNKNOWN_EVENT;
+
+    /* reset the transmit packet for the event read from binary log
+       file */
+    if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+      goto err;
     while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
     {
 #ifndef DBUG_OFF
@@ -620,17 +666,26 @@ impossible position";
         log's filename does not change while it's active
       */
       if (coord)
-        coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
+        coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
 
-      if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+      event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
+      if (event_type == FORMAT_DESCRIPTION_EVENT)
       {
-        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+        binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
                                       LOG_EVENT_BINLOG_IN_USE_F);
-        (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+        (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
       }
-      else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
+      else if (event_type == STOP_EVENT)
         binlog_can_be_corrupted= FALSE;
 
+      pos = my_b_tell(&log);
+      if (RUN_HOOK(binlog_transmit, before_send_event,
+                   (thd, flags, packet, log_file_name, pos)))
+      {
+        my_errno= ER_UNKNOWN_ERROR;
+        errmsg= "run 'before_send_event' hook failed";
+        goto err;
+      }
       if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
       {
 	errmsg = "Failed on my_net_write()";
@@ -638,9 +693,8 @@ impossible position";
 	goto err;
       }
 
-      DBUG_PRINT("info", ("log event code %d",
-			  (*packet)[LOG_EVENT_OFFSET+1] ));
-      if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+      DBUG_PRINT("info", ("log event code %d", event_type));
+      if (event_type == LOAD_EVENT)
       {
 	if (send_file(thd))
 	{
@@ -649,7 +703,17 @@ impossible position";
 	  goto err;
 	}
       }
-      packet->set("\0", 1, &my_charset_bin);
+
+      if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+      {
+        errmsg= "Failed to run hook 'after_send_event'";
+        my_errno= ER_UNKNOWN_ERROR;
+        goto err;
+      }
+
+      /* reset transmit packet for next loop */
+      if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+        goto err;
     }
 
     /*
@@ -700,6 +764,11 @@ impossible position";
 	}
 #endif
 
+        /* reset the transmit packet for the event read from binary log
+           file */
+        if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+          goto err;
+        
 	/*
 	  No one will update the log while we are reading
 	  now, but we'll be quick and just read one record
@@ -753,6 +822,9 @@ impossible position";
                   sql_print_information("the rest of heartbeat info skipped ...");
               }
 #endif
+              /* reset transmit packet for the heartbeat event */
+              if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+                goto err;
               if (send_heartbeat_event(net, packet, coord))
               {
                 errmsg = "Failed on my_net_write()";
@@ -778,8 +850,17 @@ impossible position";
 	}
 
 	if (read_packet)
-	{
-	  thd_proc_info(thd, "Sending binlog event to slave");
+        {
+          thd_proc_info(thd, "Sending binlog event to slave");
+          pos = my_b_tell(&log);
+          if (RUN_HOOK(binlog_transmit, before_send_event,
+                       (thd, flags, packet, log_file_name, pos)))
+          {
+            my_errno= ER_UNKNOWN_ERROR;
+            errmsg= "run 'before_send_event' hook failed";
+            goto err;
+          }
+	  
 	  if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
 	  {
 	    errmsg = "Failed on my_net_write()";
@@ -787,7 +868,7 @@ impossible position";
 	    goto err;
 	  }
 
-	  if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+	  if (event_type == LOAD_EVENT)
 	  {
 	    if (send_file(thd))
 	    {
@@ -796,11 +877,13 @@ impossible position";
 	      goto err;
 	    }
 	  }
-	  packet->set("\0", 1, &my_charset_bin);
-	  /*
-	    No need to net_flush because we will get to flush later when
-	    we hit EOF pretty quick
-	  */
+
+          if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+          {
+            my_errno= ER_UNKNOWN_ERROR;
+            errmsg= "Failed to run hook 'after_send_event'";
+            goto err;
+          }
 	}
 
 	if (fatal_error)
@@ -836,6 +919,10 @@ impossible position";
       end_io_cache(&log);
       (void) my_close(file, MYF(MY_WME));
 
+      /* reset transmit packet for the possible fake rotate event */
+      if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+        goto err;
+      
       /*
         Call fake_rotate_event() in case the previous log (the one which
         we have just finished reading) did not contain a Rotate event
@@ -853,8 +940,6 @@ impossible position";
 	goto err;
       }
 
-      packet->length(0);
-      packet->append('\0');
       if (coord)
         coord->file_name= log_file_name; // reset to the next
     }
@@ -864,6 +949,7 @@ end:
   end_io_cache(&log);
   (void)my_close(file, MYF(MY_WME));
 
+  RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
   my_eof(thd);
   thd_proc_info(thd, "Waiting to finalize termination");
   pthread_mutex_lock(&LOCK_thread_count);
@@ -874,6 +960,7 @@ end:
 err:
   thd_proc_info(thd, "Waiting to finalize termination");
   end_io_cache(&log);
+  RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
   /*
     Exclude  iteration through thread list
     this is needed for purge_logs() - it will iterate through
@@ -1134,6 +1221,7 @@ int reset_slave(THD *thd, Master_info* m
     goto err;
   }
 
+  RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
 err:
   unlock_slave_threads(mi);
   if (error)
@@ -1416,7 +1504,11 @@ int reset_master(THD* thd)
                ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
     return 1;
   }
-  return mysql_bin_log.reset_logs(thd);
+
+  if (mysql_bin_log.reset_logs(thd))
+    return 1;
+  RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
+  return 0;
 }
 
 int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
@@ -1926,5 +2018,3 @@ int init_replication_sys_vars()
 }
 
 #endif /* HAVE_REPLICATION */
-
-

Thread
bzr push into mysql-6.0-sea branch (hezx:2682) Bug#11714 Bug#20129 Bug#21579Bug#26180 Bug#26687 Bug#27526 Bug#29825 Bug#30129 Bug#30394 Bug#31048Bug#3...He Zhenxing12 Sep