#At file:///media/sda3/work/mysql/bzrwork/semisync/6.0-rpl-merge-new/
2688 He Zhenxing 2008-09-20
Apply patch of WL#4398 to mysql-6.0-rpl-merge
modified:
include/mysql/plugin.h
include/mysql/plugin.h.pp
libmysqld/CMakeLists.txt
libmysqld/Makefile.am
sql/CMakeLists.txt
sql/Makefile.am
sql/handler.cc
sql/log.cc
sql/log.h
sql/mysqld.cc
sql/slave.cc
sql/sql_parse.cc
sql/sql_plugin.cc
sql/sql_plugin.h
sql/sql_repl.cc
sql/transaction.cc
=== modified file 'include/mysql/plugin.h'
--- a/include/mysql/plugin.h 2008-06-28 11:00:59 +0000
+++ b/include/mysql/plugin.h 2008-09-20 09:17:36 +0000
@@ -16,6 +16,11 @@
#ifndef _my_plugin_h
#define _my_plugin_h
+/* size_t */
+#include <stdlib.h>
+
+typedef struct st_mysql MYSQL;
+
#ifdef __cplusplus
class THD;
class Item;
@@ -66,7 +71,8 @@ typedef struct st_mysql_xid MYSQL_XID;
#define MYSQL_DAEMON_PLUGIN 3 /* The daemon/raw plugin type */
#define MYSQL_INFORMATION_SCHEMA_PLUGIN 4 /* The I_S plugin type */
#define MYSQL_AUDIT_PLUGIN 5 /* The Audit plugin type */
-#define MYSQL_MAX_PLUGIN_TYPE_NUM 6 /* The number of plugin types */
+#define MYSQL_REPLICATION_PLUGIN 6 /* The replication plugin type */
+#define MYSQL_MAX_PLUGIN_TYPE_NUM 7 /* The number of plugin types */
/* We use the following strings to define licenses for plugins */
#define PLUGIN_LICENSE_PROPRIETARY 0
@@ -461,6 +467,19 @@ struct handlerton;
/*************************************************************************
+ API for Replication plugin. (MYSQL_REPLICATION_PLUGIN)
+*/
+#define MYSQL_REPLICATION_INTERFACE_VERSION 0x0100
+
+/**
+ Replication plugin descriptor
+*/
+struct Mysql_replication {
+ int interface_version;
+};
+
+
+/*************************************************************************
st_mysql_value struct for reading values from mysqld.
Used by server variables framework to parse user-provided values.
Will be used for arguments when implementing UDFs.
@@ -613,6 +632,64 @@ void mysql_query_cache_invalidate4(MYSQL
const char *key, unsigned int key_length,
int using_trx);
+/**
+ Get the value of user variable as an integer.
+
+ This function will return the value of variable @a name as an
+ integer. If the original value of the variable is not an integer,
+ the value will be converted into an integer.
+
+ @param name user variable name
+ @param value pointer to return the value
+ @param null_value if not NULL, the function will set it to true if
+ the value of variable is null, set to false if not
+
+ @retval 0 Success
+ @retval 1 Variable not found
+*/
+int get_user_var_int(const char *name,
+ long long int *value, int *null_value);
+
+/**
+ Get the value of user variable as a double precision float number.
+
+ This function will return the value of variable @a name as real
+ number. If the original value of the variable is not a real number,
+ the value will be converted into a real number.
+
+ @param name user variable name
+ @param value pointer to return the value
+ @param null_value if not NULL, the function will set it to true if
+ the value of variable is null, set to false if not
+
+ @retval 0 Success
+ @retval 1 Variable not found
+*/
+int get_user_var_real(const char *name,
+ double *value, int *null_value);
+
+/**
+ Get the value of user variable as a string.
+
+ This function will return the value of variable @a name as
+ string. If the original value of the variable is not a string,
+ the value will be converted into a string.
+
+ @param name user variable name
+ @param value pointer to the value buffer
+ @param len length of the value buffer
+ @param precision precision of the value if it is a float number
+ @param null_value if not NULL, the function will set it to true if
+ the value of variable is null, set to false if not
+
+ @retval 0 Success
+ @retval 1 Variable not found
+*/
+int get_user_var_str(const char *name,
+ char *value, unsigned long len,
+ unsigned int precision, int *null_value);
+
+
#ifdef __cplusplus
}
#endif
=== modified file 'include/mysql/plugin.h.pp'
--- a/include/mysql/plugin.h.pp 2008-07-11 12:09:05 +0000
+++ b/include/mysql/plugin.h.pp 2008-09-20 09:17:36 +0000
@@ -1,3 +1,5 @@
+#include <stdlib.h>
+typedef struct st_mysql MYSQL;
struct st_mysql_lex_string
{
char *str;
@@ -106,6 +108,9 @@ struct st_mysql_storage_engine
int interface_version;
};
struct handlerton;
+struct Mysql_replication {
+ int interface_version;
+};
struct st_mysql_value
{
int (*value_type)(struct st_mysql_value *);
@@ -139,3 +144,10 @@ void thd_get_xid(const void* thd, MYSQL_
void mysql_query_cache_invalidate4(void* thd,
const char *key, unsigned int key_length,
int using_trx);
+int get_user_var_int(const char *name,
+ long long int *value, int *null_value);
+int get_user_var_real(const char *name,
+ double *value, int *null_value);
+int get_user_var_str(const char *name,
+ char *value, unsigned long len,
+ unsigned int precision, int *null_value);
=== modified file 'libmysqld/CMakeLists.txt'
--- a/libmysqld/CMakeLists.txt 2008-08-08 01:33:43 +0000
+++ b/libmysqld/CMakeLists.txt 2008-09-20 09:17:36 +0000
@@ -204,6 +204,7 @@ SET(LIBMYSQLD_SOURCES emb_qcache.cc libm
../sql/ddl_blocker.cc ../sql/si_objects.cc
../sql/event_parse_data.cc ../sql/mdl.cc
../sql/transaction.cc
+ ../sql/rpl_handler.cc
${GEN_SOURCES}
${LIB_SOURCES})
=== modified file 'libmysqld/Makefile.am'
--- a/libmysqld/Makefile.am 2008-08-08 01:33:43 +0000
+++ b/libmysqld/Makefile.am 2008-09-20 09:17:36 +0000
@@ -81,7 +81,8 @@ sqlsources = derror.cc field.cc field_co
debug_sync.cc sql_tablespace.cc transaction.cc \
rpl_injector.cc my_user.c partition_info.cc \
sql_servers.cc ddl_blocker.cc si_objects.cc sql_audit.cc \
- event_parse_data.cc mdl.cc
+ event_parse_data.cc mdl.cc \
+ rpl_handler.cc
libmysqld_int_a_SOURCES= $(libmysqld_sources)
nodist_libmysqld_int_a_SOURCES= $(libmysqlsources) $(sqlsources)
=== modified file 'sql/CMakeLists.txt'
--- a/sql/CMakeLists.txt 2008-08-08 01:33:43 +0000
+++ b/sql/CMakeLists.txt 2008-09-20 09:17:36 +0000
@@ -78,6 +78,7 @@ ADD_EXECUTABLE(mysqld
sql_connect.cc scheduler.cc transaction.cc
ddl_blocker.cc si_objects.cc
sql_profile.cc event_parse_data.cc mdl.cc
+ rpl_handler.cc
${PROJECT_SOURCE_DIR}/sql/sql_yacc.cc
${PROJECT_SOURCE_DIR}/sql/sql_yacc.h
${PROJECT_SOURCE_DIR}/include/mysqld_error.h
=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am 2008-08-11 12:40:09 +0000
+++ b/sql/Makefile.am 2008-09-20 09:17:36 +0000
@@ -89,7 +89,8 @@ noinst_HEADERS = item.h item_func.h item
sql_partition.h partition_info.h partition_element.h \
probes.h sql_audit.h transaction.h \
contributors.h sql_servers.h ddl_blocker.h \
- si_objects.h sql_plist.h mdl.h records.h
+ si_objects.h sql_plist.h mdl.h records.h \
+ rpl_handler.h replication.h
mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
item.cc item_sum.cc item_buff.cc item_func.cc \
@@ -136,7 +137,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.
sql_builtin.cc sql_tablespace.cc partition_info.cc \
sql_servers.cc sql_audit.cc sha2.cc \
ddl_blocker.cc si_objects.cc event_parse_data.cc \
- mdl.cc transaction.cc
+ mdl.cc transaction.cc \
+ rpl_handler.cc
if HAVE_DTRACE
mysqld_SOURCES += probes.d
=== modified file 'sql/handler.cc'
--- a/sql/handler.cc 2008-09-04 18:30:34 +0000
+++ b/sql/handler.cc 2008-09-20 09:17:36 +0000
@@ -33,6 +33,8 @@
#include "ha_partition.h"
#endif
+#include "rpl_handler.h"
+
/*
While we have legacy_db_type, we have this array to
check for dups and to find handlerton from legacy_db_type.
@@ -1145,6 +1147,7 @@ int ha_commit_trans(THD *thd, bool all)
if (cookie)
tc_log->unlog(cookie, xid);
DBUG_EXECUTE_IF("crash_commit_after", DBUG_ABORT(););
+ RUN_HOOK(transaction, after_commit, (thd, all));
end:
if (rw_trans)
start_waiting_global_read_lock(thd);
@@ -1275,6 +1278,7 @@ int ha_rollback_trans(THD *thd, bool all
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
ER_WARNING_NOT_COMPLETE_ROLLBACK,
ER(ER_WARNING_NOT_COMPLETE_ROLLBACK));
+ RUN_HOOK(transaction, after_rollback, (thd, all));
DBUG_RETURN(error);
}
=== modified file 'sql/log.cc'
--- a/sql/log.cc 2008-07-25 17:21:55 +0000
+++ b/sql/log.cc 2008-09-20 09:17:36 +0000
@@ -39,6 +39,7 @@
#endif
#include <mysql/plugin.h>
+#include "rpl_handler.h"
/* max size of the log message */
#define MAX_LOG_BUFFER_SIZE 1024
@@ -3588,9 +3589,11 @@ err:
}
-bool MYSQL_BIN_LOG::flush_and_sync()
+bool MYSQL_BIN_LOG::flush_and_sync(bool *synced)
{
int err=0, fd=log_file.file;
+ if (synced)
+ *synced= 0;
safe_mutex_assert_owner(&LOCK_log);
if (flush_io_cache(&log_file))
return 1;
@@ -3598,6 +3601,8 @@ bool MYSQL_BIN_LOG::flush_and_sync()
{
sync_binlog_counter= 0;
err=my_sync(fd, MYF(MY_WME));
+ if (synced)
+ *synced= 1;
}
return err;
}
@@ -3862,7 +3867,7 @@ MYSQL_BIN_LOG::flush_and_set_pending_row
if (file == &log_file)
{
- error= flush_and_sync();
+ error= flush_and_sync(0);
if (!error)
{
signal_update();
@@ -4048,14 +4053,21 @@ bool MYSQL_BIN_LOG::write(Log_event *eve
if (event_info->write(file))
goto err;
+ error=0;
if (file == &log_file) // we are writing to the real log (disk)
{
- if (flush_and_sync())
+ bool synced;
+ if (flush_and_sync(&synced))
goto err;
+
+ if (RUN_HOOK(binlog_storage, after_flush,
+ (thd, log_file_name, file->pos_in_file, synced))) {
+ goto err;
+ }
+
signal_update();
rotate_and_purge(RP_LOCK_LOG_IS_ALREADY_LOCKED);
}
- error=0;
err:
if (error)
@@ -4312,7 +4324,7 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE
DBUG_ASSERT(carry == 0);
if (sync_log)
- flush_and_sync();
+ flush_and_sync(0);
return 0; // All OK
}
@@ -4398,7 +4410,9 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_C
if (commit_event && commit_event->write(&log_file))
goto err;
- if (flush_and_sync())
+
+ bool synced;
+ if (flush_and_sync(&synced))
goto err;
DBUG_EXECUTE_IF("half_binlogged_transaction", DBUG_ABORT(););
if (cache->error) // Error on read
@@ -4407,6 +4421,11 @@ bool MYSQL_BIN_LOG::write(THD *thd, IO_C
write_error=1; // Don't give more errors
goto err;
}
+
+ if (RUN_HOOK(binlog_storage, after_flush,
+ (thd, log_file_name, log_file.pos_in_file, synced)))
+ goto err;
+
signal_update();
}
=== modified file 'sql/log.h'
--- a/sql/log.h 2008-06-27 20:56:54 +0000
+++ b/sql/log.h 2008-09-20 09:17:36 +0000
@@ -364,7 +364,21 @@ public:
bool is_active(const char* log_file_name);
int update_log_index(LOG_INFO* linfo, bool need_update_threads);
void rotate_and_purge(uint flags);
- bool flush_and_sync();
+
+ /**
+ Flush binlog cache and synchronize to disk.
+
+ This function flushes events in binlog cache to binary log file,
+ it will do synchronizing according to the setting of system
+ variable 'sync_binlog'. If file is synchronized, @c synced will
+ be set to 1, otherwise 0.
+
+ @param[out] synced if not NULL, set to 1 if file is synchronized, otherwise 0
+
+ @retval 0 Success
+ @retval other Failure
+ */
+ bool flush_and_sync(bool *synced);
int purge_logs(const char *to_log, bool included,
bool need_mutex, bool need_update_threads,
ulonglong *decrease_log_space);
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2008-08-26 15:01:13 +0000
+++ b/sql/mysqld.cc 2008-09-20 09:17:36 +0000
@@ -33,6 +33,8 @@
#include "rpl_injector.h"
+#include "rpl_handler.h"
+
#ifdef HAVE_SYS_PRCTL_H
#include <sys/prctl.h>
#endif
@@ -1348,6 +1350,7 @@ void clean_up(bool print_message)
ha_end();
if (tc_log)
tc_log->close();
+ delegates_destroy();
xid_cache_free();
delete_elements(&key_caches, (void (*)(const char*, uchar*)) free_key_cache);
multi_keycache_free();
@@ -3905,6 +3908,13 @@ static int init_server_components()
unireg_abort(1);
}
+ /* initialize delegates for extension observers */
+ if (delegates_init())
+ {
+ sql_print_error("Initialize extension delegates failed");
+ unireg_abort(1);
+ }
+
/* need to configure logging before initializing storage engines */
if (opt_update_log)
{
=== modified file 'sql/slave.cc'
--- a/sql/slave.cc 2008-08-08 01:39:23 +0000
+++ b/sql/slave.cc 2008-09-20 09:17:36 +0000
@@ -40,6 +40,7 @@
#include <sql_common.h>
#include <errmsg.h>
#include <mysys_err.h>
+#include "rpl_handler.h"
#ifdef HAVE_REPLICATION
@@ -68,6 +69,8 @@ ulonglong relay_log_space_limit = 0;
int disconnect_slave_event_count = 0, abort_slave_event_count = 0;
int events_till_abort = -1;
+static pthread_key(Master_info*, RPL_MASTER_INFO);
+
enum enum_slave_reconnect_actions
{
SLAVE_RECON_ACT_REG= 0,
@@ -1500,17 +1503,22 @@ static int safe_sleep(THD* thd, int sec,
}
-static int request_dump(MYSQL* mysql, Master_info* mi,
- bool *suppress_warnings)
+static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
+ bool *suppress_warnings)
{
uchar buf[FN_REFLEN + 10];
int len;
- int binlog_flags = 0; // for now
+ ushort binlog_flags = 0; // for now
char* logname = mi->master_log_name;
DBUG_ENTER("request_dump");
*suppress_warnings= FALSE;
+ if (RUN_HOOK(binlog_relay_io,
+ before_request_transmit,
+ (thd, mi, binlog_flags)))
+ DBUG_RETURN(1);
+
// TODO if big log files: Change next to int8store()
int4store(buf, (ulong) mi->master_log_pos);
int2store(buf + 4, binlog_flags);
@@ -2135,6 +2143,12 @@ pthread_handler_t handle_slave_io(void *
mi->master_log_name,
llstr(mi->master_log_pos,llbuff)));
+ /* This must be called before run any binlog_relay_io hooks */
+ my_pthread_setspecific_ptr(RPL_MASTER_INFO, mi);
+
+ if (RUN_HOOK(binlog_relay_io, thread_start, (thd, mi)))
+ goto err;
+
if (!(mi->mysql = mysql = mysql_init(NULL)))
{
mi->report(ERROR_LEVEL, ER_SLAVE_FATAL_ERROR,
@@ -2209,7 +2223,7 @@ connected:
while (!io_slave_killed(thd,mi))
{
thd_proc_info(thd, "Requesting binlog dump");
- if (request_dump(mysql, mi, &suppress_warnings))
+ if (request_dump(thd, mysql, mi, &suppress_warnings))
{
sql_print_error("Failed on request_dump()");
if (check_io_slave_killed(thd, mi, "Slave I/O thread killed while \
@@ -2229,6 +2243,7 @@ requesting master dump") ||
goto err;
goto connected;
});
+ const char *event_buf;
while (!io_slave_killed(thd,mi))
{
@@ -2284,10 +2299,27 @@ Stopping slave I/O thread due to out-of-
retry_count=0; // ok event, reset retry counter
thd_proc_info(thd, "Queueing master event to the relay log");
- if (queue_event(mi,(const char*)mysql->net.read_pos + 1, event_len))
+ event_buf= (const char*)mysql->net.read_pos + 1;
+ if (RUN_HOOK(binlog_relay_io, after_read_event,
+ (thd, mi,(const char*)mysql->net.read_pos + 1,
+ event_len, &event_buf, &event_len)))
{
+ sql_print_error("Failed to run 'after_read_event' hook");
goto err;
}
+
+ /* XXX: 'synced' should be updated by queue_event to indicate
+ whether event has been synced to disk */
+ bool synced= 0;
+ if (queue_event(mi, event_buf, event_len))
+ {
+ goto err;
+ }
+
+ if (RUN_HOOK(binlog_relay_io, after_queue_event,
+ (thd, mi, event_buf, event_len, synced)))
+ goto err;
+
if (flush_master_info(mi, 1))
{
sql_print_error("Failed to flush master info file");
@@ -2305,6 +2337,7 @@ Stopping slave I/O thread due to out-of-
for no reason, but this function will do a clean read, notice the clean
value and exit immediately.
*/
+
#ifndef DBUG_OFF
{
char llbuf1[22], llbuf2[22];
@@ -2334,6 +2367,7 @@ err:
sql_print_information("Slave I/O thread exiting, read up to log '%s', position %s",
IO_RPL_LOG_NAME, llstr(mi->master_log_pos,llbuff));
pthread_mutex_lock(&LOCK_thread_count);
+ RUN_HOOK(binlog_relay_io, thread_stop, (thd, mi));
thd->query = thd->db = 0; // extra safety
thd->query_length= thd->db_length= 0;
pthread_mutex_unlock(&LOCK_thread_count);
@@ -3454,6 +3488,64 @@ static int safe_reconnect(THD* thd, MYSQ
}
+MYSQL *rpl_connect_master(MYSQL *mysql)
+{
+ THD *thd= current_thd;
+ Master_info *mi= my_pthread_getspecific_ptr(Master_info*, RPL_MASTER_INFO);
+ if (!mi)
+ {
+ sql_print_error("'rpl_connect_master' must be called in slave I/O thread context.");
+ return NULL;
+ }
+
+ bool allocated= false;
+
+ if (!mysql)
+ {
+ if(!(mysql= mysql_init(NULL)))
+ return NULL;
+ allocated= true;
+ }
+
+ /*
+ XXX: copied from connect_to_master, this function should not
+ change the slave status, so we cannot use connect_to_master
+ directly
+
+ TODO: make this part a seperate function to eliminate duplication
+ */
+ mysql_options(mysql, MYSQL_OPT_CONNECT_TIMEOUT, (char *) &slave_net_timeout);
+ mysql_options(mysql, MYSQL_OPT_READ_TIMEOUT, (char *) &slave_net_timeout);
+
+#ifdef HAVE_OPENSSL
+ if (mi->ssl)
+ {
+ mysql_ssl_set(mysql,
+ mi->ssl_key[0]?mi->ssl_key:0,
+ mi->ssl_cert[0]?mi->ssl_cert:0,
+ mi->ssl_ca[0]?mi->ssl_ca:0,
+ mi->ssl_capath[0]?mi->ssl_capath:0,
+ mi->ssl_cipher[0]?mi->ssl_cipher:0);
+ mysql_options(mysql, MYSQL_OPT_SSL_VERIFY_SERVER_CERT,
+ &mi->ssl_verify_server_cert);
+ }
+#endif
+
+ mysql_options(mysql, MYSQL_SET_CHARSET_NAME, default_charset_info->csname);
+ /* This one is not strictly needed but we have it here for completeness */
+ mysql_options(mysql, MYSQL_SET_CHARSET_DIR, (char *) charsets_dir);
+
+ if (io_slave_killed(thd, mi)
+ || !mysql_real_connect(mysql, mi->host, mi->user, mi->password, 0,
+ mi->port, 0, 0))
+ {
+ if (allocated)
+ mysql_close(mysql); // this will free the object
+ return NULL;
+ }
+ return mysql;
+}
+
/*
Store the file and position where the execute-slave thread are in the
relay log.
=== modified file 'sql/sql_parse.cc'
--- a/sql/sql_parse.cc 2008-09-05 14:16:07 +0000
+++ b/sql/sql_parse.cc 2008-09-20 09:17:36 +0000
@@ -21,6 +21,7 @@
#include <m_ctype.h>
#include <myisam.h>
#include <my_dir.h>
+#include "rpl_handler.h"
#include "sp_head.h"
#include "sp.h"
=== modified file 'sql/sql_plugin.cc'
--- a/sql/sql_plugin.cc 2008-08-19 15:58:48 +0000
+++ b/sql/sql_plugin.cc 2008-09-20 09:17:36 +0000
@@ -20,14 +20,6 @@
#define REPORT_TO_LOG 1
#define REPORT_TO_USER 2
-#ifdef DBUG_OFF
-#define plugin_ref_to_int(A) A
-#define plugin_int_to_ref(A) A
-#else
-#define plugin_ref_to_int(A) (A ? A[0] : NULL)
-#define plugin_int_to_ref(A) &(A)
-#endif
-
extern struct st_mysql_plugin *mysqld_builtins[];
char *opt_plugin_load= NULL;
@@ -44,7 +36,8 @@ const LEX_STRING plugin_type_names[MYSQL
{ C_STRING_WITH_LEN("FTPARSER") },
{ C_STRING_WITH_LEN("DAEMON") },
{ C_STRING_WITH_LEN("INFORMATION SCHEMA") },
- { C_STRING_WITH_LEN("AUDIT") }
+ { C_STRING_WITH_LEN("AUDIT") },
+ { C_STRING_WITH_LEN("REPLICATION") },
};
extern int initialize_schema_table(st_plugin_int *plugin);
@@ -89,7 +82,8 @@ static int min_plugin_info_interface_ver
MYSQL_FTPARSER_INTERFACE_VERSION,
MYSQL_DAEMON_INTERFACE_VERSION,
MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
- MYSQL_AUDIT_INTERFACE_VERSION
+ MYSQL_AUDIT_INTERFACE_VERSION,
+ MYSQL_REPLICATION_INTERFACE_VERSION
};
static int cur_plugin_info_interface_version[MYSQL_MAX_PLUGIN_TYPE_NUM]=
{
@@ -98,7 +92,8 @@ static int cur_plugin_info_interface_ver
MYSQL_FTPARSER_INTERFACE_VERSION,
MYSQL_DAEMON_INTERFACE_VERSION,
MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION,
- MYSQL_AUDIT_INTERFACE_VERSION
+ MYSQL_AUDIT_INTERFACE_VERSION,
+ MYSQL_REPLICATION_INTERFACE_VERSION
};
static bool initialized= 0;
=== modified file 'sql/sql_plugin.h'
--- a/sql/sql_plugin.h 2008-06-27 20:56:54 +0000
+++ b/sql/sql_plugin.h 2008-09-20 09:17:36 +0000
@@ -18,6 +18,14 @@
class sys_var;
+#ifdef DBUG_OFF
+#define plugin_ref_to_int(A) A
+#define plugin_int_to_ref(A) A
+#else
+#define plugin_ref_to_int(A) (A ? A[0] : NULL)
+#define plugin_int_to_ref(A) &(A)
+#endif
+
/*
the following flags are valid for plugin_init()
*/
=== modified file 'sql/sql_repl.cc'
--- a/sql/sql_repl.cc 2008-09-04 18:30:34 +0000
+++ b/sql/sql_repl.cc 2008-09-20 09:17:36 +0000
@@ -21,6 +21,7 @@
#include "log_event.h"
#include "rpl_filter.h"
#include <my_dir.h>
+#include "rpl_handler.h"
int max_binlog_dump_events = 0; // unlimited
my_bool opt_sporadic_binlog_dump_fail = 0;
@@ -378,11 +379,36 @@ static int send_heartbeat_event(NET* net
{
DBUG_RETURN(-1);
}
- packet->set("\0", 1, &my_charset_bin);
DBUG_RETURN(0);
}
/*
+ Reset thread transmit packet buffer for event sending
+
+ This function allocates header bytes for event transmission, and
+ should be called before store the event data to the packet buffer.
+*/
+static int reset_transmit_packet(THD *thd, ushort flags,
+ ulong *ev_offset, const char **errmsg)
+{
+ int ret= 0;
+ String *packet= &thd->packet;
+
+ /* reserve and set default header */
+ packet->length(0);
+ packet->set("\0", 1, &my_charset_bin);
+
+ if (RUN_HOOK(binlog_transmit, reserve_header, (thd, flags, packet)))
+ {
+ *errmsg= "Failed to run hook 'reserve_header'";
+ my_errno= ER_UNKNOWN_ERROR;
+ ret= 1;
+ }
+ *ev_offset= packet->length();
+ return ret;
+}
+
+/*
TODO: Clean up loop to only have one call to send_file()
*/
@@ -392,6 +418,9 @@ void mysql_binlog_send(THD* thd, char* l
LOG_INFO linfo;
char *log_file_name = linfo.log_file_name;
char search_file_name[FN_REFLEN], *name;
+
+ ulong ev_offset;
+
IO_CACHE log;
File file = -1;
String* packet = &thd->packet;
@@ -407,6 +436,9 @@ void mysql_binlog_send(THD* thd, char* l
DBUG_PRINT("enter",("log_ident: '%s' pos: %ld", log_ident, (long) pos));
bzero((char*) &log,sizeof(log));
+ sql_print_information("Start binlog_dump to slave_server(%d), pos(%s, %lu)",
+ thd->server_id, log_ident, (ulong)pos);
+
/*
heartbeat_period from @master_heartbeat_period user variable
*/
@@ -423,6 +455,13 @@ void mysql_binlog_send(THD* thd, char* l
coord->file_name= log_file_name; // initialization basing on what slave remembers
coord->pos= pos;
}
+ if (RUN_HOOK(binlog_transmit, transmit_start, (thd, flags, log_ident, pos)))
+ {
+ errmsg= "Failed to run hook 'transmit_start'";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+
#ifndef DBUG_OFF
if (opt_sporadic_binlog_dump_fail && (binlog_dump_count++ % 2))
{
@@ -477,11 +516,9 @@ impossible position";
goto err;
}
- /*
- We need to start a packet with something other than 255
- to distinguish it from error
- */
- packet->set("\0", 1, &my_charset_bin); /* This is the start of a new packet */
+ /* reset transmit packet for the fake rotate event below */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
/*
Tell the client about the log name with a fake Rotate event;
@@ -521,7 +558,7 @@ impossible position";
my_errno= ER_MASTER_FATAL_ERROR_READING_BINLOG;
goto err;
}
- packet->set("\0", 1, &my_charset_bin);
+
/*
Adding MAX_LOG_EVENT_HEADER_LEN, since a binlog event can become
this larger than the corresponding packet (query) sent
@@ -537,6 +574,11 @@ impossible position";
log_lock = mysql_bin_log.get_log_lock();
if (pos > BIN_LOG_HEADER_SIZE)
{
+ /* reset transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
Try to find a Format_description_log_event at the beginning of
the binlog
@@ -545,28 +587,28 @@ impossible position";
{
/*
The packet has offsets equal to the normal offsets in a binlog
- event +1 (the first character is \0).
+ event + ev_offset (the first character is \0).
*/
DBUG_PRINT("info",
("Looked for a Format_description_log_event, found event type %d",
- (*packet)[EVENT_TYPE_OFFSET+1]));
- if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+ (*packet)[EVENT_TYPE_OFFSET+ev_offset]));
+ if ((*packet)[EVENT_TYPE_OFFSET+ev_offset] == FORMAT_DESCRIPTION_EVENT)
{
- binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+ binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
LOG_EVENT_BINLOG_IN_USE_F);
- (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+ (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
/*
mark that this event with "log_pos=0", so the slave
should not increment master's binlog position
(rli->group_master_log_pos)
*/
- int4store((char*) packet->ptr()+LOG_POS_OFFSET+1, 0);
+ int4store((char*) packet->ptr()+LOG_POS_OFFSET+ev_offset, 0);
/*
if reconnect master sends FD event with `created' as 0
to avoid destroying temp tables.
*/
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
- ST_CREATED_OFFSET+1, (ulong) 0);
+ ST_CREATED_OFFSET+ev_offset, (ulong) 0);
/* send it */
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
@@ -592,8 +634,6 @@ impossible position";
Format_description_log_event will be found naturally if it is written.
*/
}
- /* reset the packet as we wrote to it in any case */
- packet->set("\0", 1, &my_charset_bin);
} /* end of if (pos > BIN_LOG_HEADER_SIZE); */
else
{
@@ -605,6 +645,12 @@ impossible position";
while (!net->error && net->vio != 0 && !thd->killed)
{
+ Log_event_type event_type= UNKNOWN_EVENT;
+
+ /* reset the transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
{
#ifndef DBUG_OFF
@@ -620,17 +666,26 @@ impossible position";
log's filename does not change while it's active
*/
if (coord)
- coord->pos= uint4korr(packet->ptr() + 1 + LOG_POS_OFFSET);
+ coord->pos= uint4korr(packet->ptr() + ev_offset + LOG_POS_OFFSET);
- if ((*packet)[EVENT_TYPE_OFFSET+1] == FORMAT_DESCRIPTION_EVENT)
+ event_type= (Log_event_type)((*packet)[LOG_EVENT_OFFSET+ev_offset]);
+ if (event_type == FORMAT_DESCRIPTION_EVENT)
{
- binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+1] &
+ binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
LOG_EVENT_BINLOG_IN_USE_F);
- (*packet)[FLAGS_OFFSET+1] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+ (*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
}
- else if ((*packet)[EVENT_TYPE_OFFSET+1] == STOP_EVENT)
+ else if (event_type == STOP_EVENT)
binlog_can_be_corrupted= FALSE;
+ pos = my_b_tell(&log);
+ if (RUN_HOOK(binlog_transmit, before_send_event,
+ (thd, flags, packet, log_file_name, pos)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "run 'before_send_event' hook failed";
+ goto err;
+ }
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
errmsg = "Failed on my_net_write()";
@@ -638,9 +693,8 @@ impossible position";
goto err;
}
- DBUG_PRINT("info", ("log event code %d",
- (*packet)[LOG_EVENT_OFFSET+1] ));
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ DBUG_PRINT("info", ("log event code %d", event_type));
+ if (event_type == LOAD_EVENT)
{
if (send_file(thd))
{
@@ -649,7 +703,17 @@ impossible position";
goto err;
}
}
- packet->set("\0", 1, &my_charset_bin);
+
+ if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+ {
+ errmsg= "Failed to run hook 'after_send_event'";
+ my_errno= ER_UNKNOWN_ERROR;
+ goto err;
+ }
+
+ /* reset transmit packet for next loop */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
}
/*
@@ -700,6 +764,11 @@ impossible position";
}
#endif
+ /* reset the transmit packet for the event read from binary log
+ file */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
No one will update the log while we are reading
now, but we'll be quick and just read one record
@@ -753,6 +822,9 @@ impossible position";
sql_print_information("the rest of heartbeat info skipped ...");
}
#endif
+ /* reset transmit packet for the heartbeat event */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
if (send_heartbeat_event(net, packet, coord))
{
errmsg = "Failed on my_net_write()";
@@ -778,8 +850,17 @@ impossible position";
}
if (read_packet)
- {
- thd_proc_info(thd, "Sending binlog event to slave");
+ {
+ thd_proc_info(thd, "Sending binlog event to slave");
+ pos = my_b_tell(&log);
+ if (RUN_HOOK(binlog_transmit, before_send_event,
+ (thd, flags, packet, log_file_name, pos)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "run 'before_send_event' hook failed";
+ goto err;
+ }
+
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()) )
{
errmsg = "Failed on my_net_write()";
@@ -787,7 +868,7 @@ impossible position";
goto err;
}
- if ((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
+ if (event_type == LOAD_EVENT)
{
if (send_file(thd))
{
@@ -796,11 +877,13 @@ impossible position";
goto err;
}
}
- packet->set("\0", 1, &my_charset_bin);
- /*
- No need to net_flush because we will get to flush later when
- we hit EOF pretty quick
- */
+
+ if (RUN_HOOK(binlog_transmit, after_send_event, (thd, flags, packet)))
+ {
+ my_errno= ER_UNKNOWN_ERROR;
+ errmsg= "Failed to run hook 'after_send_event'";
+ goto err;
+ }
}
if (fatal_error)
@@ -836,6 +919,10 @@ impossible position";
end_io_cache(&log);
(void) my_close(file, MYF(MY_WME));
+ /* reset transmit packet for the possible fake rotate event */
+ if (reset_transmit_packet(thd, flags, &ev_offset, &errmsg))
+ goto err;
+
/*
Call fake_rotate_event() in case the previous log (the one which
we have just finished reading) did not contain a Rotate event
@@ -853,8 +940,6 @@ impossible position";
goto err;
}
- packet->length(0);
- packet->append('\0');
if (coord)
coord->file_name= log_file_name; // reset to the next
}
@@ -864,6 +949,7 @@ end:
end_io_cache(&log);
(void)my_close(file, MYF(MY_WME));
+ RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
my_eof(thd);
thd_proc_info(thd, "Waiting to finalize termination");
pthread_mutex_lock(&LOCK_thread_count);
@@ -874,6 +960,7 @@ end:
err:
thd_proc_info(thd, "Waiting to finalize termination");
end_io_cache(&log);
+ RUN_HOOK(binlog_transmit, transmit_stop, (thd, flags));
/*
Exclude iteration through thread list
this is needed for purge_logs() - it will iterate through
@@ -1134,6 +1221,7 @@ int reset_slave(THD *thd, Master_info* m
goto err;
}
+ RUN_HOOK(binlog_relay_io, after_reset_slave, (thd, mi));
err:
unlock_slave_threads(mi);
if (error)
@@ -1416,7 +1504,11 @@ int reset_master(THD* thd)
ER(ER_FLUSH_MASTER_BINLOG_CLOSED), MYF(ME_BELL+ME_WAITTANG));
return 1;
}
- return mysql_bin_log.reset_logs(thd);
+
+ if (mysql_bin_log.reset_logs(thd))
+ return 1;
+ RUN_HOOK(binlog_transmit, after_reset_master, (thd, 0 /* flags */));
+ return 0;
}
int cmp_master_pos(const char* log_file_name1, ulonglong log_pos1,
@@ -1926,5 +2018,3 @@ int init_replication_sys_vars()
}
#endif /* HAVE_REPLICATION */
-
-
=== modified file 'sql/transaction.cc'
--- a/sql/transaction.cc 2008-08-12 22:30:55 +0000
+++ b/sql/transaction.cc 2008-09-20 09:17:36 +0000
@@ -20,6 +20,7 @@
#include "transaction.h"
#include "mysql_priv.h"
+#include "rpl_handler.h"
#ifdef WITH_MARIA_STORAGE_ENGINE
#include "../storage/maria/ha_maria.h"
@@ -192,6 +193,13 @@ bool trans_commit_stmt(THD *thd)
int res= FALSE;
if (thd->transaction.stmt.ha_list)
res= ha_commit_trans(thd, FALSE);
+ else
+ {
+ /* call hooks for statement commit of non-transactional tables,
+ for transactional tables, the hooks will be called in
+ ha_commit_trans */
+ RUN_HOOK(transaction, after_commit, (thd, FALSE));
+ }
DBUG_RETURN(test(res));
}
@@ -215,6 +223,13 @@ bool trans_rollback_stmt(THD *thd)
if (thd->transaction_rollback_request && !thd->in_sub_stmt)
ha_rollback_trans(thd, TRUE);
}
+ else
+ {
+ /* call hooks for statement rollback of non-transactional tables,
+ for transactional tables, the hooks will be called in
+ ha_rollback_trans */
+ RUN_HOOK(transaction, after_rollback, (thd, FALSE));
+ }
DBUG_RETURN(FALSE);
}