From: He Zhenxing Date: September 12 2008 1:43am Subject: bzr push into mysql-6.0-sea branch (hezx:2682) Bug#11714 Bug#20129 Bug#21579 Bug#26180 Bug#26687 Bug#27526 Bug#29825 Bug#30129 Bug#30394 Bug#31048 Bug#31315 Bug#32167 Bug#32426 Bug#32653 Bug#32654 Bug#33031 Bug#33300 Bug#33414 Bug#33702 Bug#33907 Bug#34604 Bug#34779 Bug#35117 Bug#35161 Bug#35578 Bug#35602 Bug#35616 Bug#35765 Bug#35807 Bug#36135 Bug#36270 Bug#36399 Bug#36510 Bug#36579 Bug#36597 Bug#36600 Bug#36638 Bug#36639 Bug#36793 Bug#36941 Bug#36942 Bug#37051 Bug#37226 Bug#37277 Bug#37301 Bug#37310 Bug#37337 Bug#37402 Bug#37428 Bug#37531 Bug#37537 Bug#37548 Bug#37799 Bug#37853 Bug#38039 Bug#38041 Bug#38043 Bug#38044 Bug#38120 Bug#38185 Bug#38195 Bug#38270 Bug#38272 Bug#38291 Bug#38296 Bug#38377 Bug#38560 Bug#38624 Bug#38696 Bug#38766 Bug#38798 Bug#38824 Bug#38843 Bug#38891 Bug#38933 Bug#38947 Bug#39017 Bug#39025 Bug#39059 WL#1213 WL#4048 WL#4073 WL#4380 WL#4398 WL#4448 WL#4454 List-Archive: http://lists.mysql.com/commits/53909 X-Bug: 11714, 20129, 21579, 26180, 26687, 27526, 29825, 30129, 30394, 31048, 31315, 32167, 32426, 32653, 32654, 33031, 33300, 33414, 33702, 33907, 34604, 34779, 35117, 35161, 35578, 35602, 35616, 35765, 35807, 36135, 36270, 36399, 36510, 36579, 36597, 36600, 36638, 36639, 36793, 36941, 36942, 37051, 37226, 37277, 37301, 37310, 37337, 37402, 37428, 37531, 37537, 37548, 37799, 37853, 38039, 38041, 38043, 38044, 38120, 38185, 38195, 38270, 38272, 38291, 38296, 38377, 38560, 38624, 38696, 38766, 38798, 38824, 38843, 38891, 38933, 38947, 39017, 39025, 39059 Message-Id: <200809120143.m8C1hVj6009086@mail.hezx.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 2682 He Zhenxing 2008-09-12 [merge] Auto merge Update plugin.h.pp for WL#4398 removed: include/mysql_h.ic mysql-test/r/concurrent_innodb.result mysql-test/r/innodb-autoinc.result mysql-test/r/innodb_bug34300.result mysql-test/r/innodb_bug35220.result mysql-test/std_data/ndb_backup51_data_be/BACKUP-1-0.1.Data mysql-test/std_data/ndb_backup51_data_le/BACKUP-1-0.2.Data mysql-test/suite/rpl/t/rpl_view-slave.opt mysql-test/t/concurrent_innodb-master.opt mysql-test/t/concurrent_innodb.test mysql-test/t/innodb-autoinc.test mysql-test/t/innodb_bug34300.test mysql-test/t/innodb_bug35220.test added: include/mysql.h.pp include/mysql/plugin.h.pp mysql-test/include/have_case_insensitive_file_system.inc mysql-test/include/have_lowercase2.inc mysql-test/include/ps_ddl_1.inc mysql-test/r/backup_db_grants.result mysql-test/r/backup_default.result mysql-test/r/backup_timeout.result mysql-test/r/backup_view_on_view.result mysql-test/r/case_insensitive_file_system.require mysql-test/r/concurrent_innodb_safelog.result mysql-test/r/concurrent_innodb_unsafelog.result mysql-test/r/innodb-autoinc.result mysql-test/r/innodb_bug34300.result mysql-test/r/innodb_bug35220.result mysql-test/r/skip_log_bin.result mysql-test/std_data/corrupt_t1#P#p1.MYI mysql-test/std_data/corrupt_t1.MYI mysql-test/std_data/ndb_backup51_data_be/BACKUP-1-0.1.Data mysql-test/std_data/ndb_backup51_data_le/BACKUP-1-0.2.Data mysql-test/std_data/parts/t1_will_crash#P#p1_first_1024.MYD mysql-test/std_data/parts/t1_will_crash#P#p2.MYD mysql-test/std_data/parts/t1_will_crash#P#p2.MYI mysql-test/std_data/parts/t1_will_crash#P#p3.MYI mysql-test/std_data/parts/t1_will_crash#P#p4.MYI mysql-test/std_data/parts/t1_will_crash#P#p6.MYD mysql-test/std_data/parts/t1_will_crash#P#p6_2.MYD mysql-test/std_data/parts/t1_will_crash#P#p6_3.MYD mysql-test/suite/parts/inc/partition_mgm.inc mysql-test/suite/parts/r/partition_mgm_lc0_archive.result mysql-test/suite/parts/r/partition_mgm_lc0_innodb.result mysql-test/suite/parts/r/partition_mgm_lc0_memory.result mysql-test/suite/parts/r/partition_mgm_lc0_myisam.result mysql-test/suite/parts/r/partition_mgm_lc0_ndb.result mysql-test/suite/parts/r/partition_mgm_lc1_archive.result mysql-test/suite/parts/r/partition_mgm_lc1_innodb.result mysql-test/suite/parts/r/partition_mgm_lc1_memory.result mysql-test/suite/parts/r/partition_mgm_lc1_myisam.result mysql-test/suite/parts/r/partition_mgm_lc1_ndb.result mysql-test/suite/parts/r/partition_mgm_lc2_archive.result mysql-test/suite/parts/r/partition_mgm_lc2_innodb.result mysql-test/suite/parts/r/partition_mgm_lc2_memory.result mysql-test/suite/parts/r/partition_mgm_lc2_myisam.result mysql-test/suite/parts/r/partition_mgm_lc2_ndb.result mysql-test/suite/parts/r/partition_recover_myisam.result mysql-test/suite/parts/r/partition_repair_myisam.result mysql-test/suite/parts/t/partition_mgm_lc0_archive.test mysql-test/suite/parts/t/partition_mgm_lc0_innodb.test mysql-test/suite/parts/t/partition_mgm_lc0_memory.test mysql-test/suite/parts/t/partition_mgm_lc0_myisam.test mysql-test/suite/parts/t/partition_mgm_lc0_ndb.test mysql-test/suite/parts/t/partition_mgm_lc1_archive-master.opt mysql-test/suite/parts/t/partition_mgm_lc1_archive.test mysql-test/suite/parts/t/partition_mgm_lc1_innodb-master.opt mysql-test/suite/parts/t/partition_mgm_lc1_innodb.test mysql-test/suite/parts/t/partition_mgm_lc1_memory-master.opt mysql-test/suite/parts/t/partition_mgm_lc1_memory.test mysql-test/suite/parts/t/partition_mgm_lc1_myisam-master.opt mysql-test/suite/parts/t/partition_mgm_lc1_myisam.test mysql-test/suite/parts/t/partition_mgm_lc1_ndb-master.opt mysql-test/suite/parts/t/partition_mgm_lc1_ndb.test mysql-test/suite/parts/t/partition_mgm_lc2_archive-master.opt mysql-test/suite/parts/t/partition_mgm_lc2_archive.test mysql-test/suite/parts/t/partition_mgm_lc2_innodb-master.opt mysql-test/suite/parts/t/partition_mgm_lc2_innodb.test mysql-test/suite/parts/t/partition_mgm_lc2_memory-master.opt mysql-test/suite/parts/t/partition_mgm_lc2_memory.test mysql-test/suite/parts/t/partition_mgm_lc2_myisam-master.opt mysql-test/suite/parts/t/partition_mgm_lc2_myisam.test mysql-test/suite/parts/t/partition_mgm_lc2_ndb-master.opt mysql-test/suite/parts/t/partition_mgm_lc2_ndb.test mysql-test/suite/parts/t/partition_recover_myisam-master.opt mysql-test/suite/parts/t/partition_recover_myisam.test mysql-test/suite/parts/t/partition_repair_myisam.test mysql-test/suite/parts/t/partition_special_innodb-master.opt mysql-test/suite/rpl/r/rpl_plugin_load.result mysql-test/suite/rpl/t/rpl_plugin_load-master.opt mysql-test/suite/rpl/t/rpl_plugin_load-slave.opt mysql-test/suite/rpl/t/rpl_plugin_load.test mysql-test/suite/rpl/t/rpl_row_err_daisychain-master.opt mysql-test/suite/rpl/t/rpl_row_err_daisychain-slave.opt mysql-test/suite/rpl/t/rpl_truncate_7ndb_2-master.opt mysql-test/t/backup_db_grants.test mysql-test/t/backup_default.test mysql-test/t/backup_timeout.test mysql-test/t/backup_view_on_view.test mysql-test/t/concurrent_innodb_safelog-master.opt mysql-test/t/concurrent_innodb_safelog.test mysql-test/t/concurrent_innodb_unsafelog-master.opt mysql-test/t/concurrent_innodb_unsafelog.test mysql-test/t/innodb-autoinc.test mysql-test/t/innodb_bug34300.test mysql-test/t/innodb_bug35220.test mysql-test/t/skip_log_bin-master.opt mysql-test/t/skip_log_bin.test sql/mysql_priv.h.pp win/build-vs9.bat win/build-vs9_x64.bat modified: .bzr-mysql/default.conf BUILD/check-cpu Makefile.am configure.in include/Makefile.am include/my_pthread.h include/my_sys.h include/myisam.h libmysql/CMakeLists.txt libmysql/dll.c libmysqld/lib_sql.cc mysql-test/Makefile.am mysql-test/extra/rpl_tests/rpl_ddl.test mysql-test/extra/rpl_tests/rpl_log.test mysql-test/include/concurrent.inc mysql-test/include/have_falcon.inc mysql-test/include/have_lowercase0.inc mysql-test/include/wait_for_slave_sql_error_and_skip.inc mysql-test/lib/mtr_report.pl mysql-test/r/backup_ddl_blocker.result mysql-test/r/backup_errors.result mysql-test/r/backup_security.result mysql-test/r/backup_views.result mysql-test/r/constraints.result mysql-test/r/csv.result mysql-test/r/federated.result mysql-test/r/func_regexp.result mysql-test/r/group_min_max.result mysql-test/r/handler_innodb.result mysql-test/r/have_utf8.require mysql-test/r/innodb-autoinc-optimize.result mysql-test/r/innodb.result mysql-test/r/innodb_mysql.result mysql-test/r/join.result mysql-test/r/log_tables.result mysql-test/r/lowercase0.require mysql-test/r/maria3.result mysql-test/r/metadata.result mysql-test/r/myisam.result mysql-test/r/partition.result mysql-test/r/partition_innodb.result mysql-test/r/partition_symlink.result mysql-test/r/ps_ddl.result mysql-test/r/show_check.result mysql-test/r/sp.result mysql-test/r/subselect.result mysql-test/r/subselect3.result mysql-test/r/subselect_no_mat.result mysql-test/r/subselect_no_opts.result mysql-test/r/subselect_no_semijoin.result mysql-test/r/symlink.result mysql-test/r/trigger-trans.result mysql-test/r/type_bit.result mysql-test/r/type_newdecimal.result mysql-test/suite/falcon/r/falcon_bug_22165.result mysql-test/suite/falcon/t/disabled.def mysql-test/suite/falcon/t/falcon_bug_22165.test mysql-test/suite/funcs_1/datadict/processlist_priv.inc mysql-test/suite/funcs_1/datadict/processlist_val.inc mysql-test/suite/funcs_1/r/processlist_priv_no_prot.result mysql-test/suite/funcs_1/r/processlist_priv_ps.result mysql-test/suite/funcs_1/r/processlist_val_no_prot.result mysql-test/suite/funcs_1/r/processlist_val_ps.result mysql-test/suite/jp/std_data/jisx0208_sjis2.dat mysql-test/suite/jp/t/disabled.def mysql-test/suite/ndb/r/ndb_partition_key.result mysql-test/suite/ndb/t/ndb_partition_key.test mysql-test/suite/parts/inc/partition_alter4.inc mysql-test/suite/parts/inc/partition_key_32col.inc mysql-test/suite/parts/r/partition_alter1_1_2_innodb.result mysql-test/suite/parts/r/partition_alter1_1_2_myisam.result mysql-test/suite/parts/r/partition_alter1_1_2_ndb.result mysql-test/suite/parts/r/partition_alter1_1_innodb.result mysql-test/suite/parts/r/partition_alter1_1_myisam.result mysql-test/suite/parts/r/partition_alter1_1_ndb.result mysql-test/suite/parts/r/partition_alter1_2_innodb.result mysql-test/suite/parts/r/partition_alter1_2_myisam.result mysql-test/suite/parts/r/partition_alter1_2_ndb.result mysql-test/suite/parts/r/partition_alter2_innodb.result mysql-test/suite/parts/r/partition_alter2_myisam.result mysql-test/suite/parts/r/partition_alter4_innodb.result mysql-test/suite/parts/r/partition_alter4_myisam.result mysql-test/suite/parts/r/partition_basic_innodb.result mysql-test/suite/parts/r/partition_basic_myisam.result mysql-test/suite/parts/r/partition_basic_symlink_myisam.result mysql-test/suite/parts/r/partition_engine_innodb.result mysql-test/suite/parts/r/partition_engine_myisam.result mysql-test/suite/parts/r/partition_special_innodb.result mysql-test/suite/parts/t/disabled.def mysql-test/suite/parts/t/partition_special_innodb.test mysql-test/suite/rpl/r/rpl_ddl.result mysql-test/suite/rpl/r/rpl_failed_optimize.result mysql-test/suite/rpl/r/rpl_innodb_mixed_dml.result mysql-test/suite/rpl/r/rpl_row_log.result mysql-test/suite/rpl/r/rpl_row_log_innodb.result mysql-test/suite/rpl/r/rpl_stm_log.result mysql-test/suite/rpl/t/disabled.def mysql-test/suite/rpl/t/rpl_stm_log-slave.opt mysql-test/t/backup_errors.test mysql-test/t/backup_views.test mysql-test/t/constraints.test mysql-test/t/csv.test mysql-test/t/disabled.def mysql-test/t/events_grant.test mysql-test/t/federated.test mysql-test/t/func_regexp.test mysql-test/t/group_min_max.test mysql-test/t/join.test mysql-test/t/log_tables.test mysql-test/t/lowercase_table3.test mysql-test/t/myisam.test mysql-test/t/partition.test mysql-test/t/partition_innodb.test mysql-test/t/partition_not_windows.test mysql-test/t/partition_symlink.test mysql-test/t/ps_ddl.test mysql-test/t/show_check.test mysql-test/t/sp.test mysql-test/t/subselect.test mysql-test/t/subselect3.test mysql-test/t/symlink.test mysql-test/t/type_bit.test mysql-test/t/type_newdecimal.test mysys/mf_pack.c mysys/my_static.c mysys/my_symlink.c mysys/safemalloc.c netware/BUILD/compile-linux-tools netware/BUILD/nwbootstrap netware/Makefile.am netware/mysql_install_db.c scripts/mysql_install_db.sh sql/backup/backup_aux.h sql/backup/backup_info.cc sql/backup/backup_info.h sql/backup/backup_test.cc sql/backup/be_default.cc sql/backup/image_info.cc sql/backup/image_info.h sql/backup/kernel.cc sql/backup/logger.cc sql/ddl_blocker.cc sql/field.h sql/ha_ndbcluster.cc sql/ha_partition.cc sql/ha_partition.h sql/handler.cc sql/handler.h sql/item.cc sql/item.h sql/item_cmpfunc.cc sql/mysql_priv.h sql/mysqld.cc sql/opt_range.cc sql/protocol.cc sql/set_var.cc sql/set_var.h sql/share/errmsg.txt sql/si_objects.cc sql/si_objects.h sql/sp_head.cc sql/sp_rcontext.cc sql/sp_rcontext.h sql/sql_class.cc sql/sql_class.h sql/sql_insert.cc sql/sql_parse.cc sql/sql_partition.cc sql/sql_plugin.cc sql/sql_select.cc sql/sql_show.cc sql/sql_table.cc sql/sql_yacc.yy sql/table.h storage/csv/ha_tina.cc storage/falcon/BackLog.cpp storage/falcon/CMakeLists.txt storage/falcon/Configuration.cpp storage/falcon/IO.cpp storage/falcon/Index.cpp storage/falcon/Index2RootPage.cpp storage/falcon/IndexRootPage.cpp storage/falcon/Interlock.h storage/falcon/MemMgr.cpp storage/falcon/RepositoryVolume.cpp storage/falcon/Statement.cpp storage/falcon/StorageDatabase.cpp storage/falcon/StorageDatabase.h storage/falcon/StorageTable.cpp storage/falcon/StorageTable.h storage/falcon/StorageTableShare.cpp storage/falcon/StorageTableShare.h storage/falcon/Table.cpp storage/falcon/Table.h storage/falcon/TableSpace.cpp storage/falcon/Transaction.cpp storage/falcon/Transaction.h storage/falcon/TransactionManager.cpp storage/falcon/ha_falcon.cpp storage/falcon/ha_falcon.h storage/falcon/plug.in storage/federated/ha_federated.cc storage/federated/ha_federated.h storage/innobase/ha/ha0ha.c storage/innobase/handler/ha_innodb.cc storage/innobase/lock/lock0lock.c storage/maria/CMakeLists.txt storage/myisam/mi_check.c storage/myisam/mi_create.c storage/myisam/mi_open.c storage/myisam/mi_static.c storage/myisam/myisamchk.c storage/myisam/myisamdef.h strings/ctype-simple.c strings/ctype-sjis.c strings/decimal.c tests/mysql_client_test.c === modified file 'include/mysql/plugin.h' --- a/include/mysql/plugin.h 2008-06-28 11:00:59 +0000 +++ b/include/mysql/plugin.h 2008-09-11 14:36:17 +0000 @@ -16,6 +16,11 @@ #ifndef _my_plugin_h #define _my_plugin_h +/* size_t */ +#include + +typedef struct st_mysql MYSQL; + #ifdef __cplusplus class THD; class Item; @@ -66,7 +71,8 @@ typedef struct st_mysql_xid MYSQL_XID; #define MYSQL_DAEMON_PLUGIN 3 /* The daemon/raw plugin type */ #define MYSQL_INFORMATION_SCHEMA_PLUGIN 4 /* The I_S plugin type */ #define MYSQL_AUDIT_PLUGIN 5 /* The Audit plugin type */ -#define MYSQL_MAX_PLUGIN_TYPE_NUM 6 /* The number of plugin types */ +#define MYSQL_REPLICATION_PLUGIN 6 /* The replication plugin type */ +#define MYSQL_MAX_PLUGIN_TYPE_NUM 7 /* The number of plugin types */ /* We use the following strings to define licenses for plugins */ #define PLUGIN_LICENSE_PROPRIETARY 0 @@ -461,6 +467,19 @@ struct handlerton; /************************************************************************* + API for Replication plugin. (MYSQL_REPLICATION_PLUGIN) +*/ +#define MYSQL_REPLICATION_INTERFACE_VERSION 0x0100 + +/** + Replication plugin descriptor +*/ +struct Mysql_replication { + int interface_version; +}; + + +/************************************************************************* st_mysql_value struct for reading values from mysqld. Used by server variables framework to parse user-provided values. Will be used for arguments when implementing UDFs. @@ -613,6 +632,64 @@ void mysql_query_cache_invalidate4(MYSQL const char *key, unsigned int key_length, int using_trx); +/** + Get the value of user variable as an integer. + + This function will return the value of variable @a name as an + integer. If the original value of the variable is not an integer, + the value will be converted into an integer. + + @param name user variable name + @param value pointer to return the value + @param null_value if not NULL, the function will set it to true if + the value of variable is null, set to false if not + + @retval 0 Success + @retval 1 Variable not found +*/ +int get_user_var_int(const char *name, + long long int *value, int *null_value); + +/** + Get the value of user variable as a double precision float number. + + This function will return the value of variable @a name as real + number. If the original value of the variable is not a real number, + the value will be converted into a real number. + + @param name user variable name + @param value pointer to return the value + @param null_value if not NULL, the function will set it to true if + the value of variable is null, set to false if not + + @retval 0 Success + @retval 1 Variable not found +*/ +int get_user_var_real(const char *name, + double *value, int *null_value); + +/** + Get the value of user variable as a string. + + This function will return the value of variable @a name as + string. If the original value of the variable is not a string, + the value will be converted into a string. + + @param name user variable name + @param value pointer to the value buffer + @param len length of the value buffer + @param precision precision of the value if it is a float number + @param null_value if not NULL, the function will set it to true if + the value of variable is null, set to false if not + + @retval 0 Success + @retval 1 Variable not found +*/ +int get_user_var_str(const char *name, + char *value, unsigned long len, + unsigned int precision, int *null_value); + + #ifdef __cplusplus } #endif === modified file 'include/mysql/plugin.h.pp' --- a/include/mysql/plugin.h.pp 2008-07-11 12:09:05 +0000 +++ b/include/mysql/plugin.h.pp 2008-09-12 01:35:24 +0000 @@ -1,3 +1,5 @@ +#include +typedef struct st_mysql MYSQL; struct st_mysql_lex_string { char *str; @@ -106,6 +108,9 @@ struct st_mysql_storage_engine int interface_version; }; struct handlerton; +struct Mysql_replication { + int interface_version; +}; struct st_mysql_value { int (*value_type)(struct st_mysql_value *); @@ -139,3 +144,10 @@ void thd_get_xid(const void* thd, MYSQL_ void mysql_query_cache_invalidate4(void* thd, const char *key, unsigned int key_length, int using_trx); +int get_user_var_int(const char *name, + long long int *value, int *null_value); +int get_user_var_real(const char *name, + double *value, int *null_value); +int get_user_var_str(const char *name, + char *value, unsigned long len, + unsigned int precision, int *null_value); === modified file 'libmysqld/CMakeLists.txt' --- a/libmysqld/CMakeLists.txt 2008-07-09 07:12:43 +0000 +++ b/libmysqld/CMakeLists.txt 2008-09-11 14:36:17 +0000 @@ -203,6 +203,7 @@ SET(LIBMYSQLD_SOURCES emb_qcache.cc libm ../sql/scheduler.cc ../sql/sql_audit.cc ../sql/ddl_blocker.cc ../sql/si_objects.cc ../sql/event_parse_data.cc ../sql/mdl.cc + ../sql/rpl_handler.cc ${GEN_SOURCES} ${LIB_SOURCES}) === modified file 'libmysqld/Makefile.am' --- a/libmysqld/Makefile.am 2008-07-09 07:12:43 +0000 +++ b/libmysqld/Makefile.am 2008-09-11 14:28:48 +0000 @@ -82,7 +82,8 @@ sqlsources = derror.cc field.cc field_co sql_tablespace.cc \ rpl_injector.cc my_user.c partition_info.cc \ sql_servers.cc ddl_blocker.cc si_objects.cc sql_audit.cc \ - event_parse_data.cc mdl.cc + event_parse_data.cc mdl.cc \ + rpl_handler.cc libmysqld_int_a_SOURCES= $(libmysqld_sources) nodist_libmysqld_int_a_SOURCES= $(libmysqlsources) $(sqlsources) === modified file 'sql/CMakeLists.txt' --- a/sql/CMakeLists.txt 2008-07-09 07:12:43 +0000 +++ b/sql/CMakeLists.txt 2008-09-11 14:28:48 +0000 @@ -78,6 +78,7 @@ ADD_EXECUTABLE(mysqld sql_connect.cc scheduler.cc ddl_blocker.cc si_objects.cc sql_profile.cc event_parse_data.cc mdl.cc + rpl_handler.cc ${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc ${PROJECT_SOURCE_DIR}/sql/sql_yacc.h ${PROJECT_SOURCE_DIR}/include/mysqld_error.h === modified file 'sql/Makefile.am' --- a/sql/Makefile.am 2008-07-09 07:12:43 +0000 +++ b/sql/Makefile.am 2008-09-11 14:36:17 +0000 @@ -89,7 +89,8 @@ noinst_HEADERS = item.h item_func.h item sql_partition.h partition_info.h partition_element.h \ probes.h sql_audit.h \ contributors.h sql_servers.h ddl_blocker.h \ - si_objects.h sql_plist.h mdl.h + si_objects.h sql_plist.h mdl.h \ + rpl_handler.h replication.h mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ item.cc item_sum.cc item_buff.cc item_func.cc \ @@ -136,7 +137,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler. sql_builtin.cc sql_tablespace.cc partition_info.cc \ sql_servers.cc sql_audit.cc sha2.cc \ ddl_blocker.cc si_objects.cc event_parse_data.cc \ - mdl.cc + mdl.cc \ + rpl_handler.cc if HAVE_DTRACE mysqld_SOURCES += probes.d === modified file 'sql/handler.cc' --- a/sql/handler.cc 2008-09-03 14:40:19 +0000 +++ b/sql/handler.cc 2008-09-12 01:35:24 +0000 @@ -32,6 +32,8 @@ #include "ha_partition.h" #endif +#include "rpl_handler.h" + /* While we have legacy_db_type, we have this array to check for dups and to find handlerton from legacy_db_type. @@ -1144,6 +1146,7 @@ int ha_commit_trans(THD *thd, bool all) if (cookie) tc_log->unlog(cookie, xid); DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT();); + RUN_HOOK(transaction, after_commit, (thd, all)); end: if (rw_trans) start_waiting_global_read_lock(thd); @@ -1274,6 +1277,7 @@ int ha_rollback_trans(THD *thd, bool all push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN, ER_WARNING_NOT_COMPLETE_ROLLBACK, ER(ER_WARNING_NOT_COMPLETE_ROLLBACK)); + RUN_HOOK(transaction, after_rollback, (thd, all)); DBUG_RETURN(error); } @@ -1308,7 +1312,13 @@ int ha_autocommit_or_rollback(THD *thd, thd->variables.tx_isolation=thd->session_tx_isolation; } - + else + { + if (!thd->is_error()) + RUN_HOOK(transaction, after_commit, (thd, 0)); + else + RUN_HOOK(transaction, after_rollback, (thd, 0)); + } DBUG_RETURN(error); } === modified file 'sql/log.cc' --- a/sql/log.cc 2008-07-25 17:21:55 +0000 +++ b/sql/log.cc 2008-09-11 14:28:48 +0000 @@ -39,6 +39,7 @@ #endif #include +#include "rpl_handler.h" /* max size of the log message */ #define MAX_LOG_BUFFER_SIZE 1024 @@ -3588,9 +3589,11 @@ err: } -bool MYSQL_BIN_LOG::flush_and_sync() +bool MYSQL_BIN_LOG::flush_and_sync(bool *synced) { int err=0, fd=log_file.file; + if (synced) + *synced= 0; safe_mutex_assert_owner(&LOCK_log); if (flush_io_cache(&log_file)) return 1; @@ -3598,6 +3601,8 @@ bool MYSQL_BIN_LOG::flush_and_sync() { sync_binlog_counter= 0; err=my_sync(fd, MYF(MY_WME)); + if (synced) + *synced= 1; } return err; } @@ -3862,7 +3867,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_row if (file == &log_file) { - error= flush_and_sync(); + error= flush_and_sync(0); if (!error) { signal_update(); @@ -4048,14 +4053,21 @@ bool MYSQL_BIN_LOG::write(Log_event *eve if (event_info->write(file)) goto err; + error=0; if (file == &log_file) // we are writing to the real log (disk) { - if (flush_and_sync()) + bool synced; + if (flush_and_sync(&synced)) goto err; + + if (RUN_HOOK(binlog_storage, after_flush, + (thd, log_file_name, file->pos_in_file, synced))) { + goto err; + } + signal_update(); rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED); } - error=0; err: if (error) @@ -4312,7 +4324,7 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE DBUG_ASSERT(carry == 0); if (sync_log) - flush_and_sync(); + flush_and_sync(0); return 0; // All OK } @@ -4398,7 +4410,9 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_C if (commit_event && commit_event->write(&log_file)) goto err; - if (flush_and_sync()) + + bool synced; + if (flush_and_sync(&synced)) goto err; DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_ABORT();); if (cache->error) // Error on read @@ -4407,6 +4421,11 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_C write_error=1; // Don't give more errors goto err; } + + if (RUN_HOOK(binlog_storage, after_flush, + (thd, log_file_name, log_file.pos_in_file, synced))) + goto err; + signal_update(); } === modified file 'sql/log.h' --- a/sql/log.h 2008-06-27 20:56:54 +0000 +++ b/sql/log.h 2008-09-11 14:28:48 +0000 @@ -364,7 +364,21 @@ public: bool is_active(const char* log_file_name); int update_log_index(LOG_INFO* linfo, bool need_update_threads); void rotate_and_purge(uint flags); - bool flush_and_sync(); + + /** + Flush binlog cache and synchronize to disk. + + This function flushes events in binlog cache to binary log file, + it will do synchronizing according to the setting of system + variable 'sync_binlog'. If file is synchronized, @c synced will + be set to 1, otherwise 0. + + @param[out] synced if not NULL, set to 1 if file is synchronized, otherwise 0 + + @retval 0 Success + @retval other Failure + */ + bool flush_and_sync(bool *synced); int purge_logs(const char *to_log, bool included, bool need_mutex, bool need_update_threads, ulonglong *decrease_log_space); === modified file 'sql/mysqld.cc' --- a/sql/mysqld.cc 2008-08-26 15:01:13 +0000 +++ b/sql/mysqld.cc 2008-09-12 01:35:24 +0000 @@ -33,6 +33,8 @@ #include "rpl_injector.h" +#include "rpl_handler.h" + #ifdef HAVE_SYS_PRCTL_H #include #endif @@ -1348,6 +1350,7 @@ void clean_up(bool print_message) ha_end(); if (tc_log) tc_log->close(); + delegates_destroy(); xid_cache_free(); delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache); multi_keycache_free(); @@ -3905,6 +3908,13 @@ static int init_server_components() unireg_abort(1); } + /* initialize delegates for extension observers */ + if (delegates_init()) + { + sql_print_error("Initialize extension delegates failed"); + unireg_abort(1); + } + /* need to configure logging before initializing storage engines */ if (opt_update_log) { === added file 'sql/replication.h' --- a/sql/replication.h 1970-01-01 00:00:00 +0000 +++ b/sql/replication.h 2008-09-11 14:36:17 +0000 @@ -0,0 +1,461 @@ +/* Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef REPLICATION_H +#define REPLICATION_H + +#ifdef __cplusplus +extern "C" { +#endif + +/** + Transaction observer flags. +*/ +enum Trans_flags { + /** Transaction is a real transaction */ + TRANS_IS_REAL_TRANS = 1 +}; + +/** + Transaction observer parameter +*/ +typedef struct Trans_param { + uint32 server_id; + uint32 flags; + + /* + The latest binary log file name and position written by current + transaction, if binary log is disabled or no log event has been + written into binary log file by current transaction (events + written into transaction log cache are not counted), these two + member will be zero. + */ + const char *log_file; + my_off_t log_pos; +} Trans_param; + +/** + Observes and extends transaction execution +*/ +typedef struct Trans_observer { + uint32 len; + + /** + This callback is called after transaction commit + + This callback is called right after commit to storage engines for + transactional tables. + + For non-transactional tables, this is called at the end of the + statement, before sending statement status, if the statement + succeeded. + + @note The return value is currently ignored by the server. + + @param param The parameter for transaction observers + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_commit)(Trans_param *param); + + /** + This callback is called after transaction rollback + + This callback is called right after rollback to storage engines + for transactional tables. + + For non-transactional tables, this is called at the end of the + statement, before sending statement status, if the statement + failed. + + @note The return value is currently ignored by the server. + + @param param The parameter for transaction observers + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_rollback)(Trans_param *param); +} Trans_observer; + +/** + Binlog storage flags +*/ +enum Binlog_storage_flags { + /** Binary log was sync:ed */ + BINLOG_STORAGE_IS_SYNCED = 1 +}; + +/** + Binlog storage observer parameters + */ +typedef struct Binlog_storage_param { + uint32 server_id; +} Binlog_storage_param; + +/** + Observe binlog logging storage +*/ +typedef struct Binlog_storage_observer { + uint32 len; + + /** + This callback is called after binlog has been flushed + + This callback is called after cached events have been flushed to + binary log file. Whether the binary log file is synchronized to + disk is indicated by the bit BINLOG_STORAGE_IS_SYNCED in @a flags. + + @param param Observer common parameter + @param log_file Binlog file name been updated + @param log_pos Binlog position after update + @param flags flags for binlog storage + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_flush)(Binlog_storage_param *param, + const char *log_file, my_off_t log_pos, + uint32 flags); +} Binlog_storage_observer; + +/** + Replication binlog transmitter (binlog dump) observer parameter. +*/ +typedef struct Binlog_transmit_param { + uint32 server_id; + uint32 flags; +} Binlog_transmit_param; + +/** + Observe and extends the binlog dumping thread. +*/ +typedef struct Binlog_transmit_observer { + uint32 len; + + /** + This callback is called when binlog dumping starts + + + @param param Observer common parameter + @param log_file Binlog file name to transmit from + @param log_pos Binlog position to transmit from + + @retval 0 Sucess + @retval 1 Failure + */ + int (*transmit_start)(Binlog_transmit_param *param, + const char *log_file, my_off_t log_pos); + + /** + This callback is called when binlog dumping stops + + @param param Observer common parameter + + @retval 0 Sucess + @retval 1 Failure + */ + int (*transmit_stop)(Binlog_transmit_param *param); + + /** + This callback is called to reserve bytes in packet header for event transmission + + This callback is called when resetting transmit packet header to + reserve bytes for this observer in packet header. + + The @a header buffer is allocated by the server code, and @a size + is the size of the header buffer. Each observer can only reserve + a maximum size of @a size in the header. + + @param param Observer common parameter + @param header Pointer of the header buffer + @param size Size of the header buffer + @param len Header length reserved by this observer + + @retval 0 Sucess + @retval 1 Failure + */ + int (*reserve_header)(Binlog_transmit_param *param, + unsigned char *header, + unsigned long size, + unsigned long *len); + + /** + This callback is called before sending an event packet to slave + + @param param Observer common parameter + @param packet Binlog event packet to send + @param len Length of the event packet + @param log_file Binlog file name of the event packet to send + @param log_pos Binlog position of the event packet to send + + @retval 0 Sucess + @retval 1 Failure + */ + int (*before_send_event)(Binlog_transmit_param *param, + unsigned char *packet, unsigned long len, + const char *log_file, my_off_t log_pos ); + + /** + This callback is called after sending an event packet to slave + + @param param Observer common parameter + @param event_buf Binlog event packet buffer sent + @param len length of the event packet buffer + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_send_event)(Binlog_transmit_param *param, + const char *event_buf, unsigned long len); + + /** + This callback is called after resetting master status + + This is called when executing the command RESET MASTER, and is + used to reset status variables added by observers. + + @param param Observer common parameter + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_reset_master)(Binlog_transmit_param *param); +} Binlog_transmit_observer; + +/** + Binlog relay IO flags +*/ +enum Binlog_relay_IO_flags { + /** Binary relay log was sync:ed */ + BINLOG_RELAY_IS_SYNCED = 1 +}; + + +/** + Replication binlog relay IO observer parameter +*/ +typedef struct Binlog_relay_IO_param { + uint32 server_id; + + /* Master host, user and port */ + char *host; + char *user; + unsigned int port; + + char *master_log_name; + my_off_t master_log_pos; + + MYSQL *mysql; /* the connection to master */ +} Binlog_relay_IO_param; + +/** + Observes and extends the service of slave IO thread. +*/ +typedef struct Binlog_relay_IO_observer { + uint32 len; + + /** + This callback is called when slave IO thread starts + + @param param Observer common parameter + + @retval 0 Sucess + @retval 1 Failure + */ + int (*thread_start)(Binlog_relay_IO_param *param); + + /** + This callback is called when slave IO thread stops + + @param param Observer common parameter + + @retval 0 Sucess + @retval 1 Failure + */ + int (*thread_stop)(Binlog_relay_IO_param *param); + + /** + This callback is called before slave requesting binlog transmission from master + + This is called before slave issuing BINLOG_DUMP command to master + to request binlog. + + @param param Observer common parameter + @param flags binlog dump flags + + @retval 0 Sucess + @retval 1 Failure + */ + int (*before_request_transmit)(Binlog_relay_IO_param *param, uint32 flags); + + /** + This callback is called after read an event packet from master + + @param param Observer common parameter + @param packet The event packet read from master + @param len Length of the event packet read from master + @param event_buf The event packet return after process + @param event_len The length of event packet return after process + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_read_event)(Binlog_relay_IO_param *param, + const char *packet, unsigned long len, + const char **event_buf, unsigned long *event_len); + + /** + This callback is called after written an event packet to relay log + + @param param Observer common parameter + @param event_buf Event packet written to relay log + @param event_len Length of the event packet written to relay log + @param flags flags for relay log + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_queue_event)(Binlog_relay_IO_param *param, + const char *event_buf, unsigned long event_len, + uint32 flags); + + /** + This callback is called after reset slave relay log IO status + + @param param Observer common parameter + + @retval 0 Sucess + @retval 1 Failure + */ + int (*after_reset_slave)(Binlog_relay_IO_param *param); +} Binlog_relay_IO_observer; + + +/** + Register a transaction observer + + @param observer The transaction observer to register + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer already exists +*/ +int register_trans_observer(Trans_observer *observer, void *p); + +/** + Unregister a transaction observer + + @param observer The transaction observer to unregister + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer not exists +*/ +int unregister_trans_observer(Trans_observer *observer, void *p); + +/** + Register a binlog storage observer + + @param observer The binlog storage observer to register + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer already exists +*/ +int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p); + +/** + Unregister a binlog storage observer + + @param observer The binlog storage observer to unregister + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer not exists +*/ +int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p); + +/** + Register a binlog transmit observer + + @param observer The binlog transmit observer to register + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer already exists +*/ +int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p); + +/** + Unregister a binlog transmit observer + + @param observer The binlog transmit observer to unregister + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer not exists +*/ +int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p); + +/** + Register a binlog relay IO (slave IO thread) observer + + @param observer The binlog relay IO observer to register + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer already exists +*/ +int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p); + +/** + Unregister a binlog relay IO (slave IO thread) observer + + @param observer The binlog relay IO observer to unregister + @param p pointer to the internal plugin structure + + @retval 0 Sucess + @retval 1 Observer not exists +*/ +int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p); + +/** + Connect to master + + This function can only used in the slave I/O thread context, and + will use the same master information to do the connection. + + @code + MYSQL *mysql = mysql_init(NULL); + if (rpl_connect_master(mysql)) + { + // do stuff with the connection + } + mysql_close(mysql); // close the connection + @endcode + + @param mysql address of MYSQL structure to use, pass NULL will + create a new one + + @return address of MYSQL structure on success, NULL on failure +*/ +MYSQL *rpl_connect_master(MYSQL *mysql); + +#ifdef __cplusplus +} +#endif +#endif /* REPLICATION_H */ === added file 'sql/rpl_handler.cc' --- a/sql/rpl_handler.cc 1970-01-01 00:00:00 +0000 +++ b/sql/rpl_handler.cc 2008-09-11 14:36:17 +0000 @@ -0,0 +1,485 @@ +/* Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "mysql_priv.h" + +#include "rpl_mi.h" +#include "sql_repl.h" +#include "log_event.h" +#include "rpl_filter.h" +#include +#include "rpl_handler.h" + +Trans_delegate *transaction_delegate; +Binlog_storage_delegate *binlog_storage_delegate; +#ifdef HAVE_REPLICATION +Binlog_transmit_delegate *binlog_transmit_delegate; +Binlog_relay_IO_delegate *binlog_relay_io_delegate; +#endif /* HAVE_REPLICATION */ + +/* + structure to save transaction log filename and position +*/ +typedef struct Trans_binlog_info { + my_off_t log_pos; + char log_file[FN_REFLEN]; +} Trans_binlog_info; + +static pthread_key(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); + +int get_user_var_int(const char *name, + long long int *value, int *null_value) +{ + my_bool null_val; + user_var_entry *entry= + (user_var_entry*) hash_search(¤t_thd->user_vars, + (uchar*) name, strlen(name)); + if (!entry) + return 1; + *value= entry->val_int(&null_val); + if (null_value) + *null_value= null_val; + return 0; +} + +int get_user_var_real(const char *name, + double *value, int *null_value) +{ + my_bool null_val; + user_var_entry *entry= + (user_var_entry*) hash_search(¤t_thd->user_vars, + (uchar*) name, strlen(name)); + if (!entry) + return 1; + *value= entry->val_real(&null_val); + if (null_value) + *null_value= null_val; + return 0; +} + +int get_user_var_str(const char *name, char *value, + size_t len, unsigned int precision, int *null_value) +{ + String str; + my_bool null_val; + user_var_entry *entry= + (user_var_entry*) hash_search(¤t_thd->user_vars, + (uchar*) name, strlen(name)); + if (!entry) + return 1; + entry->val_str(&null_val, &str, precision); + strncpy(value, str.c_ptr(), len); + if (null_value) + *null_value= null_val; + return 0; +} + +int delegates_init() +{ + static unsigned char trans_mem[sizeof(Trans_delegate)]; + static unsigned char storage_mem[sizeof(Binlog_storage_delegate)]; +#ifdef HAVE_REPLICATION + static unsigned char transmit_mem[sizeof(Binlog_transmit_delegate)]; + static unsigned char relay_io_mem[sizeof(Binlog_relay_IO_delegate)]; +#endif + + if (!(transaction_delegate= new (trans_mem) Trans_delegate) + || (!transaction_delegate->is_inited()) + || !(binlog_storage_delegate= new (storage_mem) Binlog_storage_delegate) + || (!binlog_storage_delegate->is_inited()) +#ifdef HAVE_REPLICATION + || !(binlog_transmit_delegate= new (transmit_mem) Binlog_transmit_delegate) + || (!binlog_transmit_delegate->is_inited()) + || !(binlog_relay_io_delegate= new (relay_io_mem) Binlog_relay_IO_delegate) + || (!binlog_relay_io_delegate->is_inited()) +#endif /* HAVE_REPLICATION */ + ) + return 1; + + if (pthread_key_create(&RPL_TRANS_BINLOG_INFO, NULL)) + return 1; + return 0; +} + +void delegates_destroy() +{ + transaction_delegate->~Trans_delegate(); + binlog_storage_delegate->~Binlog_storage_delegate(); +#ifdef HAVE_REPLICATION + binlog_transmit_delegate->~Binlog_transmit_delegate(); + binlog_relay_io_delegate->~Binlog_relay_IO_delegate(); +#endif /* HAVE_REPLICATION */ +} + +/* + This macro is used by almost all the Delegate methods to iterate + over all the observers running given callback function of the + delegate . + + Add observer plugins to the thd->lex list, after each statement, all + plugins add to thd->lex will be automatically unlocked. + */ +#define FOREACH_OBSERVER(r, f, thd, args) \ + param.server_id= thd->server_id; \ + read_lock(); \ + Observer_info_iterator iter= observer_info_iter(); \ + Observer_info *info= iter++; \ + for (; info; info= iter++) \ + { \ + plugin_ref plugin= \ + my_plugin_lock(thd, &info->plugin); \ + if (!plugin) \ + { \ + r= 1; \ + break; \ + } \ + if (((Observer *)info->observer)->f \ + && ((Observer *)info->observer)->f args) \ + { \ + r= 1; \ + break; \ + } \ + } \ + unlock() + + +int Trans_delegate::after_commit(THD *thd, bool all) +{ + Trans_param param; + bool is_real_trans= (all || thd->transaction.all.ha_list == 0); + if (is_real_trans) + param.flags |= TRANS_IS_REAL_TRANS; + + Trans_binlog_info *log_info= + my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); + + param.log_file= log_info ? log_info->log_file : 0; + param.log_pos= log_info ? log_info->log_pos : 0; + + int ret= 0; + FOREACH_OBSERVER(ret, after_commit, thd, (¶m)); + + /* + This is the end of a real transaction or autocommit statement, we + can free the memory allocated for binlog file and position. + */ + if (is_real_trans && log_info) + { + my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL); + my_free(log_info, MYF(0)); + } + return ret; +} + +int Trans_delegate::after_rollback(THD *thd, bool all) +{ + Trans_param param; + bool is_real_trans= (all || thd->transaction.all.ha_list == 0); + if (is_real_trans) + param.flags |= TRANS_IS_REAL_TRANS; + + Trans_binlog_info *log_info= + my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); + + param.log_file= log_info ? log_info->log_file : 0; + param.log_pos= log_info ? log_info->log_pos : 0; + + int ret= 0; + FOREACH_OBSERVER(ret, after_commit, thd, (¶m)); + + /* + This is the end of a real transaction or autocommit statement, we + can free the memory allocated for binlog file and position. + */ + if (is_real_trans && log_info) + { + my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, NULL); + my_free(log_info, MYF(0)); + } + return ret; +} + +int Binlog_storage_delegate::after_flush(THD *thd, + const char *log_file, + my_off_t log_pos, + bool synced) +{ + Binlog_storage_param param; + uint32 flags=0; + if (synced) + flags |= BINLOG_STORAGE_IS_SYNCED; + + Trans_binlog_info *log_info= + my_pthread_getspecific_ptr(Trans_binlog_info*, RPL_TRANS_BINLOG_INFO); + + if (!log_info) + { + if(!(log_info= + (Trans_binlog_info *)my_malloc(sizeof(Trans_binlog_info), MYF(0)))) + return 1; + my_pthread_setspecific_ptr(RPL_TRANS_BINLOG_INFO, log_info); + } + + strcpy(log_info->log_file, log_file+dirname_length(log_file)); + log_info->log_pos = log_pos; + + int ret= 0; + FOREACH_OBSERVER(ret, after_flush, thd, + (¶m, log_info->log_file, log_info->log_pos, flags)); + return ret; +} + +#ifdef HAVE_REPLICATION +int Binlog_transmit_delegate::transmit_start(THD *thd, ushort flags, + const char *log_file, + my_off_t log_pos) +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, transmit_start, thd, (¶m, log_file, log_pos)); + return ret; +} + +int Binlog_transmit_delegate::transmit_stop(THD *thd, ushort flags) +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, transmit_stop, thd, (¶m)); + return ret; +} + +int Binlog_transmit_delegate::reserve_header(THD *thd, ushort flags, + String *packet) +{ + /* NOTE2ME: Maximum extra header size for each observer, I hope 32 + bytes should be enough for each Observer to reserve their extra + header. If later found this is not enough, we can increase this + /HEZX + */ +#define RESERVE_HEADER_SIZE 32 + unsigned char header[RESERVE_HEADER_SIZE]; + ulong hlen; + Binlog_transmit_param param; + param.flags= flags; + param.server_id= thd->server_id; + + int ret= 0; + read_lock(); + Observer_info_iterator iter= observer_info_iter(); + Observer_info *info= iter++; + for (; info; info= iter++) + { + plugin_ref plugin= + my_plugin_lock(thd, &info->plugin); + if (!plugin) + { + ret= 1; + break; + } + hlen= 0; + if (((Observer *)info->observer)->reserve_header + && ((Observer *)info->observer)->reserve_header(¶m, + header, + RESERVE_HEADER_SIZE, + &hlen)) + { + ret= 1; + break; + } + if (hlen == 0) + continue; + if (hlen > RESERVE_HEADER_SIZE || packet->append((char *)header, hlen)) + { + ret= 1; + break; + } + } + unlock(); + return ret; +} + +int Binlog_transmit_delegate::before_send_event(THD *thd, ushort flags, + String *packet, + const char *log_file, + my_off_t log_pos) +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, before_send_event, thd, + (¶m, (uchar *)packet->c_ptr(), + packet->length(), + log_file+dirname_length(log_file), log_pos)); + return ret; +} + +int Binlog_transmit_delegate::after_send_event(THD *thd, ushort flags, + String *packet) +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, after_send_event, thd, + (¶m, packet->c_ptr(), packet->length())); + return ret; +} + +int Binlog_transmit_delegate::after_reset_master(THD *thd, ushort flags) + +{ + Binlog_transmit_param param; + param.flags= flags; + + int ret= 0; + FOREACH_OBSERVER(ret, after_reset_master, thd, (¶m)); + return ret; +} + +void Binlog_relay_IO_delegate::init_param(Binlog_relay_IO_param *param, + Master_info *mi) +{ + param->mysql= mi->mysql; + param->user= mi->user; + param->host= mi->host; + param->port= mi->port; + param->master_log_name= mi->master_log_name; + param->master_log_pos= mi->master_log_pos; +} + +int Binlog_relay_IO_delegate::thread_start(THD *thd, Master_info *mi) +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, thread_start, thd, (¶m)); + return ret; +} + + +int Binlog_relay_IO_delegate::thread_stop(THD *thd, Master_info *mi) +{ + + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, thread_stop, thd, (¶m)); + return ret; +} + +int Binlog_relay_IO_delegate::before_request_transmit(THD *thd, + Master_info *mi, + ushort flags) +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, before_request_transmit, thd, (¶m, (uint32)flags)); + return ret; +} + +int Binlog_relay_IO_delegate::after_read_event(THD *thd, Master_info *mi, + const char *packet, ulong len, + const char **event_buf, + ulong *event_len) +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, after_read_event, thd, + (¶m, packet, len, event_buf, event_len)); + return ret; +} + +int Binlog_relay_IO_delegate::after_queue_event(THD *thd, Master_info *mi, + const char *event_buf, + ulong event_len, + bool synced) +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + uint32 flags=0; + if (synced) + flags |= BINLOG_STORAGE_IS_SYNCED; + + int ret= 0; + FOREACH_OBSERVER(ret, after_queue_event, thd, + (¶m, event_buf, event_len, flags)); + return ret; +} + +int Binlog_relay_IO_delegate::after_reset_slave(THD *thd, Master_info *mi) + +{ + Binlog_relay_IO_param param; + init_param(¶m, mi); + + int ret= 0; + FOREACH_OBSERVER(ret, after_reset_slave, thd, (¶m)); + return ret; +} +#endif /* HAVE_REPLICATION */ + +int register_trans_observer(Trans_observer *observer, void *p) +{ + return transaction_delegate->add_observer(observer, (st_plugin_int *)p); +} + +int unregister_trans_observer(Trans_observer *observer, void *p) +{ + return transaction_delegate->remove_observer(observer, (st_plugin_int *)p); +} + +int register_binlog_storage_observer(Binlog_storage_observer *observer, void *p) +{ + return binlog_storage_delegate->add_observer(observer, (st_plugin_int *)p); +} + +int unregister_binlog_storage_observer(Binlog_storage_observer *observer, void *p) +{ + return binlog_storage_delegate->remove_observer(observer, (st_plugin_int *)p); +} + +#ifdef HAVE_REPLICATION +int register_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) +{ + return binlog_transmit_delegate->add_observer(observer, (st_plugin_int *)p); +} + +int unregister_binlog_transmit_observer(Binlog_transmit_observer *observer, void *p) +{ + return binlog_transmit_delegate->remove_observer(observer, (st_plugin_int *)p); +} + +int register_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) +{ + return binlog_relay_io_delegate->add_observer(observer, (st_plugin_int *)p); +} + +int unregister_binlog_relay_io_observer(Binlog_relay_IO_observer *observer, void *p) +{ + return binlog_relay_io_delegate->remove_observer(observer, (st_plugin_int *)p); +} +#endif /* HAVE_REPLICATION */ === added file 'sql/rpl_handler.h' --- a/sql/rpl_handler.h 1970-01-01 00:00:00 +0000 +++ b/sql/rpl_handler.h 2008-09-11 14:36:17 +0000 @@ -0,0 +1,213 @@ +/* Copyright (C) 2008 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef RPL_HANDLER_H +#define RPL_HANDLER_H + +#include "mysql_priv.h" +#include "rpl_mi.h" +#include "rpl_rli.h" +#include "sql_plugin.h" +#include "replication.h" + +class Observer_info { +public: + void *observer; + st_plugin_int *plugin_int; + plugin_ref plugin; + + Observer_info(void *ob, st_plugin_int *p) + :observer(ob), plugin_int(p) + { + plugin= plugin_int_to_ref(plugin_int); + } +}; + +class Delegate { +public: + typedef List Observer_info_list; + typedef List_iterator Observer_info_iterator; + + int add_observer(void *observer, st_plugin_int *plugin) + { + int ret= FALSE; + if (!inited) + return TRUE; + write_lock(); + Observer_info_iterator iter(observer_info_list); + Observer_info *info= iter++; + while (info && info->observer != observer) + info= iter++; + if (!info) + { + info= new Observer_info(observer, plugin); + if (!info || observer_info_list.push_back(info, &memroot)) + ret= TRUE; + } + else + ret= TRUE; + unlock(); + return ret; + } + + int remove_observer(void *observer, st_plugin_int *plugin) + { + int ret= FALSE; + if (!inited) + return TRUE; + write_lock(); + Observer_info_iterator iter(observer_info_list); + Observer_info *info= iter++; + while (info && info->observer != observer) + info= iter++; + if (info) + iter.remove(); + else + ret= TRUE; + unlock(); + return ret; + } + + inline Observer_info_iterator observer_info_iter() + { + return Observer_info_iterator(observer_info_list); + } + + inline bool is_empty() + { + return observer_info_list.is_empty(); + } + + inline int read_lock() + { + if (!inited) + return TRUE; + return rw_rdlock(&lock); + } + + inline int write_lock() + { + if (!inited) + return TRUE; + return rw_wrlock(&lock); + } + + inline int unlock() + { + if (!inited) + return TRUE; + return rw_unlock(&lock); + } + + inline bool is_inited() + { + return inited; + } + + Delegate() + { + inited= FALSE; + if (my_rwlock_init(&lock, NULL)) + return; + init_sql_alloc(&memroot, 1024, 0); + inited= TRUE; + } + ~Delegate() + { + inited= FALSE; + rwlock_destroy(&lock); + free_root(&memroot, MYF(0)); + } + +private: + Observer_info_list observer_info_list; + rw_lock_t lock; + MEM_ROOT memroot; + bool inited; +}; + +class Trans_delegate + :public Delegate { +public: + typedef Trans_observer Observer; + int before_commit(THD *thd, bool all); + int before_rollback(THD *thd, bool all); + int after_commit(THD *thd, bool all); + int after_rollback(THD *thd, bool all); +}; + +class Binlog_storage_delegate + :public Delegate { +public: + typedef Binlog_storage_observer Observer; + int after_flush(THD *thd, const char *log_file, + my_off_t log_pos, bool synced); +}; + +#ifdef HAVE_REPLICATION +class Binlog_transmit_delegate + :public Delegate { +public: + typedef Binlog_transmit_observer Observer; + int transmit_start(THD *thd, ushort flags, + const char *log_file, my_off_t log_pos); + int transmit_stop(THD *thd, ushort flags); + int reserve_header(THD *thd, ushort flags, String *packet); + int before_send_event(THD *thd, ushort flags, + String *packet, const + char *log_file, my_off_t log_pos ); + int after_send_event(THD *thd, ushort flags, + String *packet); + int after_reset_master(THD *thd, ushort flags); +}; + +class Binlog_relay_IO_delegate + :public Delegate { +public: + typedef Binlog_relay_IO_observer Observer; + int thread_start(THD *thd, Master_info *mi); + int thread_stop(THD *thd, Master_info *mi); + int before_request_transmit(THD *thd, Master_info *mi, ushort flags); + int after_read_event(THD *thd, Master_info *mi, + const char *packet, ulong len, + const char **event_buf, ulong *event_len); + int after_queue_event(THD *thd, Master_info *mi, + const char *event_buf, ulong event_len, + bool synced); + int after_reset_slave(THD *thd, Master_info *mi); +private: + void init_param(Binlog_relay_IO_param *param, Master_info *mi); +}; +#endif /* HAVE_REPLICATION */ + +int delegates_init(); +void delegates_destroy(); + +extern Trans_delegate *transaction_delegate; +extern Binlog_storage_delegate *binlog_storage_delegate; +#ifdef HAVE_REPLICATION +extern Binlog_transmit_delegate *binlog_transmit_delegate; +extern Binlog_relay_IO_delegate *binlog_relay_io_delegate; +#endif /* HAVE_REPLICATION */ + +/* + if there is no observers in the delegate, we can return 0 + immediately. +*/ +#define RUN_HOOK(group, hook, args) \ + (group ##_delegate->is_empty() ? \ + 0 : group ##_delegate->hook args) + +#endif /* RPL_HANDLER_H */ === modified file 'sql/slave.cc' --- a/sql/slave.cc 2008-07-21 03:55:09 +0000 +++ b/sql/slave.cc 2008-09-11 14:36:17 +0000 @@ -39,6 +39,7 @@ #include #include #include +#include "rpl_handler.h" #ifdef HAVE_REPLICATION @@ -67,6 +68,8 @@ ulonglong relay_log_space_limit = 0; int disconnect_slave_event_count = 0, abort_slave_event_count = 0; int events_till_abort = -1; +static pthread_key(Master_info*, RPL_MASTER_INFO); + enum enum_slave_reconnect_actions { SLAVE_RECON_ACT_REG= 0, @@ -1499,17 +1502,22 @@ static int safe_sleep(THD* thd, int sec, } -static int request_dump(MYSQL* mysql, Master_info* mi, - bool *suppress_warnings) +static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, + bool *suppress_warnings) { uchar buf[FN_REFLEN + 10]; int len; - int binlog_flags = 0; // for now + ushort binlog_flags = 0; // for now char* logname = mi->master_log_name; DBUG_ENTER("request_dump"); *suppress_warnings= FALSE; + if (RUN_HOOK(binlog_relay_io, + before_request_transmit, + (thd, mi, binlog_flags))) + DBUG_RETURN(1); + // TODO if big log files: Change next to int8store() int4store(buf, (ulong) mi->master_log_pos); int2store(buf + 4, binlog_flags); @@ -2134,6 +2142,12 @@ pthread_handler_t handle_slave_io(void * mi->master_log_name, llstr(mi->master_log_pos,llbuff))); + /* This must be called before run any binlog_relay_io hooks */ + my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi); + + if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi))) + goto err; + if (!(mi->mysql = mysql = mysql_init(NULL))) { mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR, @@ -2208,7 +2222,7 @@ connected: while (!io_slave_killed(thd,mi)) { thd_proc_info(thd, "Requesting binlog dump"); - if (request_dump(mysql, mi, &suppress_warnings)) + if (request_dump(thd, mysql, mi, &suppress_warnings)) { sql_print_error("Failed on request_dump()"); if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \ @@ -2228,6 +2242,7 @@ requesting master dump") || goto err; goto connected; }); + const char *event_buf; while (!io_slave_killed(thd,mi)) { @@ -2283,10 +2298,27 @@ Stopping slave I/O thread due to out-of- retry_count=0; // ok event, reset retry counter thd_proc_info(thd, "Queueing master event to the relay log"); - if (queue_event(mi,(const char*)mysql->net.read_pos + 1, event_len)) + event_buf= (const char*)mysql->net.read_pos + 1; + if (RUN_HOOK(binlog_relay_io, after_read_event, + (thd, mi,(const char*)mysql->net.read_pos + 1, + event_len, &event_buf, &event_len))) { + sql_print_error("Failed to run 'after_read_event' hook"); goto err; } + + /* XXX: 'synced' should be updated by queue_event to indicate + whether event has been synced to disk */ + bool synced= 0; + if (queue_event(mi, event_buf, event_len)) + { + goto err; + } + + if (RUN_HOOK(binlog_relay_io, after_queue_event, + (thd, mi, event_buf, event_len, synced))) + goto err; + if (flush_master_info(mi, 1)) { sql_print_error("Failed to flush master info file"); @@ -2304,6 +2336,7 @@ Stopping slave I/O thread due to out-of- for no reason, but this function will do a clean read, notice the clean value and exit immediately. */ + #ifndef DBUG_OFF { char llbuf1[22], llbuf2[22]; @@ -2333,6 +2366,7 @@ err: sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s", IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff)); pthread_mutex_lock(&LOCK_thread_count); + RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi)); thd->query = thd->db = 0; // extra safety thd->query_length= thd->db_length= 0; pthread_mutex_unlock(&LOCK_thread_count); @@ -3453,6 +3487,64 @@ static int safe_reconnect(THD* thd, MYSQ } +MYSQL *rpl_connect_master(MYSQL *mysql) +{ + THD *thd= current_thd; + Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO); + if (!mi) + { + sql_print_error("'rpl_connect_master' must be called in slave I/O thread context."); + return NULL; + } + + bool allocated= false; + + if (!mysql) + { + if(!(mysql= mysql_init(NULL))) + return NULL; + allocated= true; + } + + /* + XXX: copied from connect_to_master, this function should not + change the slave status, so we cannot use connect_to_master + directly + + TODO: make this part a seperate function to eliminate duplication + */ + mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout); + mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout); + +#ifdef HAVE_OPENSSL + if (mi->ssl) + { + mysql_ssl_set(mysql, + mi->ssl_key[0]?mi->ssl_key:0, + mi->ssl_cert[0]?mi->ssl_cert:0, + mi->ssl_ca[0]?mi->ssl_ca:0, + mi->ssl_capath[0]?mi->ssl_capath:0, + mi->ssl_cipher[0]?mi->ssl_cipher:0); + mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT, + &mi->ssl_verify_server_cert); + } +#endif + + mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname); + /* This one is not strictly needed but we have it here for completeness */ + mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir); + + if (io_slave_killed(thd, mi) + || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0, + mi->port, 0, 0)) + { + if (allocated) + mysql_close(mysql); // this will free the object + return NULL; + } + return mysql; +} + /* Store the file and position where the execute-slave thread are in the relay log. === modified file 'sql/sql_parse.cc' --- a/sql/sql_parse.cc 2008-09-04 13:46:04 +0000 +++ b/sql/sql_parse.cc 2008-09-12 01:35:24 +0000 @@ -21,6 +21,7 @@ #include #include #include +#include "rpl_handler.h" #include "sp_head.h" #include "sp.h" === modified file 'sql/sql_plugin.cc' --- a/sql/sql_plugin.cc 2008-08-19 15:58:48 +0000 +++ b/sql/sql_plugin.cc 2008-09-12 01:35:24 +0000 @@ -20,14 +20,6 @@ #define REPORT_TO_LOG 1 #define REPORT_TO_USER 2 -#ifdef DBUG_OFF -#define plugin_ref_to_int(A) A -#define plugin_int_to_ref(A) A -#else -#define plugin_ref_to_int(A) (A ? A[0] : NULL) -#define plugin_int_to_ref(A) &(A) -#endif - extern struct st_mysql_plugin *mysqld_builtins[]; char *opt_plugin_load= NULL; @@ -44,7 +36,8 @@ const LEX_STRING plugin_type_names[MYSQL { C_STRING_WITH_LEN("FTPARSER") }, { C_STRING_WITH_LEN("DAEMON") }, { C_STRING_WITH_LEN("INFORMATION SCHEMA") }, - { C_STRING_WITH_LEN("AUDIT") } + { C_STRING_WITH_LEN("AUDIT") }, + { C_STRING_WITH_LEN("REPLICATION") }, }; extern int initialize_schema_table(st_plugin_int *plugin); @@ -89,7 +82,8 @@ static int min_plugin_info_interface_ver MYSQL_FTPARSER_INTERFACE_VERSION, MYSQL_DAEMON_INTERFACE_VERSION, MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION, - MYSQL_AUDIT_INTERFACE_VERSION + MYSQL_AUDIT_INTERFACE_VERSION, + MYSQL_REPLICATION_INTERFACE_VERSION }; static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]= { @@ -98,7 +92,8 @@ static int cur_plugin_info_interface_ver MYSQL_FTPARSER_INTERFACE_VERSION, MYSQL_DAEMON_INTERFACE_VERSION, MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION, - MYSQL_AUDIT_INTERFACE_VERSION + MYSQL_AUDIT_INTERFACE_VERSION, + MYSQL_REPLICATION_INTERFACE_VERSION }; static bool initialized= 0; === modified file 'sql/sql_plugin.h' --- a/sql/sql_plugin.h 2008-06-27 20:56:54 +0000 +++ b/sql/sql_plugin.h 2008-09-11 14:28:48 +0000 @@ -18,6 +18,14 @@ class sys_var; +#ifdef DBUG_OFF +#define plugin_ref_to_int(A) A +#define plugin_int_to_ref(A) A +#else +#define plugin_ref_to_int(A) (A ? A[0] : NULL) +#define plugin_int_to_ref(A) &(A) +#endif + /* the following flags are valid for plugin_init() */ === modified file 'sql/sql_repl.cc' --- a/sql/sql_repl.cc 2008-08-22 06:48:55 +0000 +++ b/sql/sql_repl.cc 2008-09-11 14:28:48 +0000 @@ -21,6 +21,7 @@ #include "log_event.h" #include "rpl_filter.h" #include +#include "rpl_handler.h" int max_binlog_dump_events = 0; // unlimited my_bool opt_sporadic_binlog_dump_fail = 0; @@ -378,11 +379,36 @@ static int send_heartbeat_event(NET* net { DBUG_RETURN(-1); } - packet->set("\0", 1, &my_charset_bin); DBUG_RETURN(0); } /* + Reset thread transmit packet buffer for event sending + + This function allocates header bytes for event transmission, and + should be called before store the event data to the packet buffer. +*/ +static int reset_transmit_packet(THD *thd, ushort flags, + ulong *ev_offset, const char **errmsg) +{ + int ret= 0; + String *packet= &thd->packet; + + /* reserve and set default header */ + packet->length(0); + packet->set("\0", 1, &my_charset_bin); + + if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet))) + { + *errmsg= "Failed to run hook 'reserve_header'"; + my_errno= ER_UNKNOWN_ERROR; + ret= 1; + } + *ev_offset= packet->length(); + return ret; +} + +/* TODO: Clean up loop to only have one call to send_file() */ @@ -392,6 +418,9 @@ void mysql_binlog_send(THD* thd, char* l LOG_INFO linfo; char *log_file_name = linfo.log_file_name; char search_file_name[FN_REFLEN], *name; + + ulong ev_offset; + IO_CACHE log; File file = -1; String* packet = &thd->packet; @@ -407,6 +436,9 @@ void mysql_binlog_send(THD* thd, char* l DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos)); bzero((char*) &log,sizeof(log)); + sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)", + thd->server_id, log_ident, (ulong)pos); + /* heartbeat_period from @master_heartbeat_period user variable */ @@ -423,6 +455,13 @@ void mysql_binlog_send(THD* thd, char* l coord->file_name= log_file_name; // initialization basing on what slave remembers coord->pos= pos; } + if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos))) + { + errmsg= "Failed to run hook 'transmit_start'"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + #ifndef DBUG_OFF if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2)) { @@ -477,11 +516,9 @@ impossible position"; goto err; } - /* - We need to start a packet with something other than 255 - to distinguish it from error - */ - packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */ + /* reset transmit packet for the fake rotate event below */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; /* Tell the client about the log name with a fake Rotate event; @@ -521,7 +558,7 @@ impossible position"; my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG; goto err; } - packet->set("\0", 1, &my_charset_bin); + /* Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become this larger than the corresponding packet (query) sent @@ -537,6 +574,11 @@ impossible position"; log_lock = mysql_bin_log.get_log_lock(); if (pos > BIN_LOG_HEADER_SIZE) { + /* reset transmit packet for the event read from binary log + file */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; + /* Try to find a Format_description_log_event at the beginning of the binlog @@ -545,28 +587,28 @@ impossible position"; { /* The packet has offsets equal to the normal offsets in a binlog - event +1 (the first character is \0). + event + ev_offset (the first character is \0). */ DBUG_PRINT("info", ("Looked for a Format_description_log_event, found event type %d", - (*packet)[EVENT_TYPE_OFFSET+1])); - if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) + (*packet)[EVENT_TYPE_OFFSET+ev_offset])); + if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT) { - binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & + binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] & LOG_EVENT_BINLOG_IN_USE_F); - (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F; + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; /* mark that this event with "log_pos=0", so the slave should not increment master's binlog position (rli->group_master_log_pos) */ - int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0); + int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0); /* if reconnect master sends FD event with `created' as 0 to avoid destroying temp tables. */ int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+ - ST_CREATED_OFFSET+1, (ulong) 0); + ST_CREATED_OFFSET+ev_offset, (ulong) 0); /* send it */ if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) { @@ -592,8 +634,6 @@ impossible position"; Format_description_log_event will be found naturally if it is written. */ } - /* reset the packet as we wrote to it in any case */ - packet->set("\0", 1, &my_charset_bin); } /* end of if (pos > BIN_LOG_HEADER_SIZE); */ else { @@ -605,6 +645,12 @@ impossible position"; while (!net->error && net->vio != 0 && !thd->killed) { + Log_event_type event_type= UNKNOWN_EVENT; + + /* reset the transmit packet for the event read from binary log + file */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; while (!(error = Log_event::read_log_event(&log, packet, log_lock))) { #ifndef DBUG_OFF @@ -620,17 +666,26 @@ impossible position"; log's filename does not change while it's active */ if (coord) - coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET); + coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET); - if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT) + event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]); + if (event_type == FORMAT_DESCRIPTION_EVENT) { - binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] & + binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] & LOG_EVENT_BINLOG_IN_USE_F); - (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F; + (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F; } - else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT) + else if (event_type == STOP_EVENT) binlog_can_be_corrupted= FALSE; + pos = my_b_tell(&log); + if (RUN_HOOK(binlog_transmit, before_send_event, + (thd, flags, packet, log_file_name, pos))) + { + my_errno= ER_UNKNOWN_ERROR; + errmsg= "run 'before_send_event' hook failed"; + goto err; + } if (my_net_write(net, (uchar*) packet->ptr(), packet->length())) { errmsg = "Failed on my_net_write()"; @@ -638,9 +693,8 @@ impossible position"; goto err; } - DBUG_PRINT("info", ("log event code %d", - (*packet)[LOG_EVENT_OFFSET+1] )); - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) + DBUG_PRINT("info", ("log event code %d", event_type)); + if (event_type == LOAD_EVENT) { if (send_file(thd)) { @@ -649,7 +703,17 @@ impossible position"; goto err; } } - packet->set("\0", 1, &my_charset_bin); + + if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet))) + { + errmsg= "Failed to run hook 'after_send_event'"; + my_errno= ER_UNKNOWN_ERROR; + goto err; + } + + /* reset transmit packet for next loop */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; } /* @@ -700,6 +764,11 @@ impossible position"; } #endif + /* reset the transmit packet for the event read from binary log + file */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; + /* No one will update the log while we are reading now, but we'll be quick and just read one record @@ -753,6 +822,9 @@ impossible position"; sql_print_information("the rest of heartbeat info skipped ..."); } #endif + /* reset transmit packet for the heartbeat event */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; if (send_heartbeat_event(net, packet, coord)) { errmsg = "Failed on my_net_write()"; @@ -778,8 +850,17 @@ impossible position"; } if (read_packet) - { - thd_proc_info(thd, "Sending binlog event to slave"); + { + thd_proc_info(thd, "Sending binlog event to slave"); + pos = my_b_tell(&log); + if (RUN_HOOK(binlog_transmit, before_send_event, + (thd, flags, packet, log_file_name, pos))) + { + my_errno= ER_UNKNOWN_ERROR; + errmsg= "run 'before_send_event' hook failed"; + goto err; + } + if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) ) { errmsg = "Failed on my_net_write()"; @@ -787,7 +868,7 @@ impossible position"; goto err; } - if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) + if (event_type == LOAD_EVENT) { if (send_file(thd)) { @@ -796,11 +877,13 @@ impossible position"; goto err; } } - packet->set("\0", 1, &my_charset_bin); - /* - No need to net_flush because we will get to flush later when - we hit EOF pretty quick - */ + + if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet))) + { + my_errno= ER_UNKNOWN_ERROR; + errmsg= "Failed to run hook 'after_send_event'"; + goto err; + } } if (fatal_error) @@ -836,6 +919,10 @@ impossible position"; end_io_cache(&log); (void) my_close(file, MYF(MY_WME)); + /* reset transmit packet for the possible fake rotate event */ + if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg)) + goto err; + /* Call fake_rotate_event() in case the previous log (the one which we have just finished reading) did not contain a Rotate event @@ -853,8 +940,6 @@ impossible position"; goto err; } - packet->length(0); - packet->append('\0'); if (coord) coord->file_name= log_file_name; // reset to the next } @@ -864,6 +949,7 @@ end: end_io_cache(&log); (void)my_close(file, MYF(MY_WME)); + RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); my_eof(thd); thd_proc_info(thd, "Waiting to finalize termination"); pthread_mutex_lock(&LOCK_thread_count); @@ -874,6 +960,7 @@ end: err: thd_proc_info(thd, "Waiting to finalize termination"); end_io_cache(&log); + RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags)); /* Exclude iteration through thread list this is needed for purge_logs() - it will iterate through @@ -1134,6 +1221,7 @@ int reset_slave(THD *thd, Master_info* m goto err; } + RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi)); err: unlock_slave_threads(mi); if (error) @@ -1416,7 +1504,11 @@ int reset_master(THD* thd) ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG)); return 1; } - return mysql_bin_log.reset_logs(thd); + + if (mysql_bin_log.reset_logs(thd)) + return 1; + RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */)); + return 0; } int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1, @@ -1926,5 +2018,3 @@ int init_replication_sys_vars() } #endif /* HAVE_REPLICATION */ - -