#At file:///home/lsoares/Workspace/mysql-server/features/checksum/6.0-rpl/ based on revid:zhenxing.he@stripped
2820 Luis Soares 2009-04-27
WL#2540: Replication event checksums
BIG FAT WARNING
===============
This is a *WORK IN PROGRESS* checkpoint commit and a sketch for log
event checksum implementation.
DESCRIPTION
===========
Events are written to the binary log as they are executed. They are
then sent to the slave on an event-by-event basis. On its journey to
the slave, an event is written/read to/from disk and to/from socket
(perhaps even several times). During this, an event may get corrupted
and there is no procedure at the application level (mysql server /
mysqlbinlog) to detect these kind of faults.
This patch addresses this problem by adding log event checksums. On
every event serialization a CRC32 number is appended at the end and
the length of the event is updated. Conversely, whenever an event is
read the CRC32 number is recomputed using the current buffer and
compared against the CRC32 got from the buffer. Should they differ,
then corruption is found and an error is returned. This check is
performed at two locations:
i) queue_event - (check for network I/O corruption);
ii) read_log_event - (check for disk I/O corruption, both on master
and slave while reading from the binlog by dump and I/O thread
respectively);
This feature augments the server with yet another option
(--binlog-crc) which is not activated by default.
DISCLAIMER
==========
This commit is *EXPERIMENTAL STATUS* and stands for *WORK IN
PROGRESS*. Its purpose is to report the SotA on the work done and to
get feedback from mysql replication team.
TODO
====
1. MTR is 94% reports successul with checksums enabled. Most of the
failures are due to result files with invalid positions (because
of extra bytes for CRC). However, I need to check all failures.
2. Put some effort on code structuring.
3. Correct the scope of some functions and variables in Log_event
(eg, wrapper_my_b_safe_write could probably be private instead of
public scope, or even not an object method but rather a static
function on log_event.cc).
4. Support for CRC64 (perhaps?).
5. Implement multiple slave version awareness (perhaps?).
@ client/Makefile.am
Added checksum.c to Makefile.
@ mysql-test/include/show_binlog_events.inc
'SHOW BINLOG EVENTS FROM $pos' now needs to take into account whether
checksums are used or not, when computing the default init '$pos'.
@ mysql-test/include/show_binlog_events2.inc
'SHOW BINLOG EVENTS FROM $pos' now needs to take into account whether
checksums are used or not, when computing the default init '$pos'.
@ sql/log.cc
Changed write_cache so that it fixes the checksum when relative
positions are set to absolute.
@ sql/log_event.cc
Changed log events write procedure and added code relative to checksum
checking. Major changes include:
i) write_header turns on CRC flags and changes length
conditionally, whether the binlog crc flag is on or not;
ii) ::write(...) is implemented as write_header && write_body &&
write_footer. write_header and write_body functions, compute
aggregated checksums. The write_footer function appends the
computed checksum to the file.
iii) Calls to my_b_safe_write are intercepted
(Log_event::wrapper_my_b_safe_write) and before writing with
my_b_safe_write, crc computation takes place (if binlog crc
option is set);
iv) static write_str function gets an IN/OUT parameter (ha_checksum
*crc), so that it computes the CRC inside;
v) Changed write_data so that it takes an IN/OUT parameter
(ha_checksum *crc), so that it hands the aggregated CRC to
write_str inside;
vi) Added conditional checksum check to
Log_event::read_log_event. The check only takes place if event
being read is marked with checksum flag.
vii) Log_event::read_log_event truncates event length to
event_length-BINLOG_CRC_LEN before instanciating new events;
viii) crc field of log event read is set to the value read from the
buffer. This is useful for mysqlbinlog printing the CRC#.
iv) Added printing of CRC# for mysqlbinlog.
@ sql/log_event.h
Changes related to log events and CRCs. These include:
i) Added binlog crc length macro (BINLOG_CRC_LEN);
ii) Added event header CRC flag (LOG_EVENT_CRC_F);
iii) Added field crc to log events;
iv) Added declaration of Log_event::write_footer and
Log_event::wrapper_my_b_safe_write;
v) Changed implementation of Log_event::write so that it includes
the write_footer.
vi) Exported the function event_checksum_test so that it is used on
another module, slave.cc (queue_event).
@ sql/mysql_priv.h
Added declaration of external variable (option) for activating binlog
crc.
@ sql/mysqld.cc
Added binlog-crc to server options.
@ sql/share/errmsg.txt
Added two errors/error messages:
i) ER_NETWORK_READ_EVENT_CHECKSUM_FAILURE
ii) ER_BINLOG_READ_EVENT_CHECKSUM_FAILURE
@ sql/slave.cc
Added checksum verification to queue_event. This provides means to
detect network checksum failures. Log_event::read_log_event will catch
disk I/O failures.
Strip CRC length (if event has CRC flag on) while creating an instance
from the network received buffer for the Rotate_log_event.
@ sql/sql_repl.cc
Three changes in this file:
i) Checksum fixing on mysql_binlog_send because, some flags in the
header file for some events are update on the fly here;
ii) Added calculation of CRC for fake rotate events
(fake_rotate_event).
iii) Added sys_opt_binlog_crc.
modified:
client/Makefile.am
mysql-test/include/show_binlog_events.inc
mysql-test/include/show_binlog_events2.inc
sql/log.cc
sql/log_event.cc
sql/log_event.h
sql/mysql_priv.h
sql/mysqld.cc
sql/share/errmsg.txt
sql/slave.cc
sql/sql_repl.cc
=== modified file 'client/Makefile.am'
--- a/client/Makefile.am 2009-02-11 12:11:20 +0000
+++ b/client/Makefile.am 2009-04-27 10:34:54 +0000
@@ -63,7 +63,8 @@ mysqlbinlog_SOURCES = mysqlbinlog.cc \
$(top_srcdir)/mysys/my_bit.c \
$(top_srcdir)/mysys/my_bitmap.c \
$(top_srcdir)/mysys/my_vle.c \
- $(top_srcdir)/mysys/base64.c
+ $(top_srcdir)/mysys/base64.c \
+ $(top_srcdir)/mysys/checksum.c
mysqlbinlog_LDADD = $(LDADD) $(CXXLDFLAGS)
mysqldump_SOURCES= mysqldump.c \
=== modified file 'mysql-test/include/show_binlog_events.inc'
--- a/mysql-test/include/show_binlog_events.inc 2008-09-06 07:22:50 +0000
+++ b/mysql-test/include/show_binlog_events.inc 2009-04-27 10:34:54 +0000
@@ -2,8 +2,19 @@
if (!$binlog_start)
{
- let $binlog_start=107;
+ let $binlog_crc= `select variable_value='ON' from information_schema.GLOBAL_VARIABLES where variable_name='binlog_crc';`;
+
+ if($binlog_crc)
+ {
+ let $binlog_start=111;
+ }
+
+ if(!$binlog_crc)
+ {
+ let $binlog_start=107;
+ }
}
+
--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR $binlog_start <binlog_start>
--replace_column 2 # 4 # 5 #
--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/ /file_id=[0-9]+/file_id=#/ /block_len=[0-9]+/block_len=#/
=== modified file 'mysql-test/include/show_binlog_events2.inc'
--- a/mysql-test/include/show_binlog_events2.inc 2008-02-05 14:30:44 +0000
+++ b/mysql-test/include/show_binlog_events2.inc 2009-04-27 10:34:54 +0000
@@ -1,4 +1,18 @@
---let $binlog_start=107
+if (!$binlog_start)
+{
+ let $binlog_crc= `select variable_value='ON' from information_schema.GLOBAL_VARIABLES where variable_name='binlog_crc';`;
+
+ if($binlog_crc)
+ {
+ let $binlog_start=111;
+ }
+
+ if(!$binlog_crc)
+ {
+ let $binlog_start=107;
+ }
+}
+
--replace_result $binlog_start <binlog_start>
--replace_column 2 # 5 #
--replace_regex /\/\* xid=.* \*\//\/* XID *\// /table_id: [0-9]+/table_id: #/
=== modified file 'sql/log.cc'
--- a/sql/log.cc 2009-02-27 13:20:11 +0000
+++ b/sql/log.cc 2009-04-27 10:34:54 +0000
@@ -6034,6 +6034,20 @@ uint MYSQL_BIN_LOG::next_file_id()
return res;
}
+static void fix_log_event_crc(IO_CACHE *cache, uint hdr_offs)
+{
+ uint16 flags= uint2korr(cache->read_pos + FLAGS_OFFSET);
+
+ if ((flags & LOG_EVENT_CRC_F) == LOG_EVENT_CRC_F)
+ {
+ uint event_length= uint4korr(cache->read_pos + hdr_offs + EVENT_LEN_OFFSET);
+ uchar *event_begin= cache->read_pos + hdr_offs;
+ uchar *event_crc_pos= (event_begin + event_length) - BINLOG_CRC_LEN;
+ ha_checksum crc= my_checksum(0L, NULL, 0);
+ crc= my_checksum(crc, event_begin, (event_length - BINLOG_CRC_LEN));
+ int4store(event_crc_pos, crc);
+ }
+}
/*
Write the contents of a cache to the binary log.
@@ -6094,6 +6108,10 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE
val= uint4korr(&header[LOG_POS_OFFSET]) + group;
int4store(&header[LOG_POS_OFFSET], val);
+ /* fix crc */
+ if (opt_binlog_crc)
+ fix_log_event_crc(cache, hdr_offs);
+
/* write the first half of the split header */
if (my_b_write(&log_file, header, carry))
return ER_ERROR_ON_WRITE;
@@ -6144,6 +6162,10 @@ int MYSQL_BIN_LOG::write_cache(IO_CACHE
val= uint4korr(log_pos) + group;
int4store(log_pos, val);
+ /* fix CRC */
+ if (opt_binlog_crc)
+ fix_log_event_crc(cache, hdr_offs);
+
/* next event header at ... */
log_pos= (uchar *)cache->read_pos + hdr_offs + EVENT_LEN_OFFSET;
hdr_offs += uint4korr(log_pos);
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2009-02-16 21:18:45 +0000
+++ b/sql/log_event.cc 2009-04-27 10:34:54 +0000
@@ -418,10 +418,21 @@ static void cleanup_load_tmpdir()
write_str()
*/
-static bool write_str(IO_CACHE *file, const char *str, uint length)
+static bool write_str(IO_CACHE *file, const char *str, uint length, ha_checksum* crc)
{
uchar tmp[1];
tmp[0]= (uchar) length;
+
+#ifndef MYSQL_CLIENT
+
+ /* update the crc */
+ if (opt_binlog_crc)
+ {
+ *crc= my_checksum(*crc, (uchar*)tmp, 1);
+ *crc= my_checksum(*crc, (uchar*)str, length);
+ }
+#endif
+
return (my_b_safe_write(file, tmp, sizeof(tmp)) ||
my_b_safe_write(file, (uchar*) str, length));
}
@@ -515,6 +526,31 @@ static void print_set_option(IO_CACHE* f
}
#endif
+bool event_checksum_test(uchar *buf, ulong event_len)
+{
+ uint16 flags= uint2korr(buf + FLAGS_OFFSET);
+ bool res= FALSE;
+
+ if ((flags & LOG_EVENT_CRC_F) == LOG_EVENT_CRC_F)
+ {
+ ha_checksum incomming= uint4korr(buf + (event_len - BINLOG_CRC_LEN));
+ ha_checksum computed= my_checksum(0L, NULL, 0);
+
+ /* checksum the event contents without the CRC part */
+ computed= my_checksum(computed, (const uchar*) buf, (event_len - BINLOG_CRC_LEN));
+
+ res= !(computed == incomming);
+ }
+#ifndef MYSQL_CLIENT
+ else
+ {
+ if (opt_binlog_crc)
+ sql_print_warning("Event of type %d without CRC flags set!", buf[EVENT_TYPE_OFFSET]);
+ }
+#endif
+
+ return res;
+}
/**************************************************************************
Log_event methods (= the parent class of all events)
**************************************************************************/
@@ -569,7 +605,7 @@ const char* Log_event::get_type_str()
#ifndef MYSQL_CLIENT
Log_event::Log_event(THD* thd_arg, uint16 flags_arg, bool using_trans)
- :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), thd(thd_arg)
+ :log_pos(0), temp_buf(0), exec_time(0), flags(flags_arg), crc(0), thd(thd_arg)
{
server_id= thd->server_id;
when= thd->start_time;
@@ -585,7 +621,7 @@ Log_event::Log_event(THD* thd_arg, uint1
*/
Log_event::Log_event()
- :temp_buf(0), exec_time(0), flags(0), cache_stmt(0),
+ :temp_buf(0), exec_time(0), flags(0), cache_stmt(0), crc(0),
thd(0)
{
server_id= ::server_id;
@@ -605,7 +641,7 @@ Log_event::Log_event()
Log_event::Log_event(const char* buf,
const Format_description_log_event* description_event)
- :temp_buf(0), cache_stmt(0)
+ :temp_buf(0), cache_stmt(0), crc(0)
{
#ifndef MYSQL_CLIENT
thd = 0;
@@ -791,6 +827,27 @@ void Log_event::init_show_field_list(Lis
field_list->push_back(new Item_empty_string("Info", 20));
}
+bool Log_event::wrapper_my_b_safe_write(IO_CACHE* file, const uchar* buf, ulong size)
+{
+ if (opt_binlog_crc)
+ crc= my_checksum(crc, buf, size);
+
+ return my_b_safe_write(file, buf, size);
+}
+
+bool Log_event::write_footer(IO_CACHE* file)
+{
+ /* footer contains: i) CRC */
+ if (opt_binlog_crc)
+ {
+ uchar buf[BINLOG_CRC_LEN];
+ int4store(buf, crc);
+
+ return my_b_safe_write(file, (uchar*) buf, sizeof(buf));
+ }
+
+ return 0;
+}
/*
Log_event::write()
@@ -805,6 +862,13 @@ bool Log_event::write_header(IO_CACHE* f
/* Store number of bytes that will be written by this event */
data_written= event_data_length + sizeof(header);
+ if (opt_binlog_crc)
+ {
+ crc= my_checksum(0L, NULL, 0);
+ data_written += BINLOG_CRC_LEN;
+ flags = flags | LOG_EVENT_CRC_F;
+ }
+
/*
log_pos != 0 if this is relay-log event. In this case we should not
change the position
@@ -863,9 +927,11 @@ bool Log_event::write_header(IO_CACHE* f
int4store(header+ SERVER_ID_OFFSET, server_id);
int4store(header+ EVENT_LEN_OFFSET, data_written);
int4store(header+ LOG_POS_OFFSET, log_pos);
- int2store(header+ FLAGS_OFFSET, flags);
+ int2store(header+ FLAGS_OFFSET, flags);
+
+ /* writing to the binlog, we must crc it */
+ DBUG_RETURN(wrapper_my_b_safe_write(file, header, sizeof(header)) != 0);
- DBUG_RETURN(my_b_safe_write(file, header, sizeof(header)) != 0);
}
@@ -1066,6 +1132,7 @@ Log_event* Log_event::read_log_event(con
const Format_description_log_event *description_event)
{
Log_event* ev;
+ uint16 flags= uint2korr(buf + FLAGS_OFFSET);
DBUG_ENTER("Log_event::read_log_event(char*,...)");
DBUG_ASSERT(description_event != 0);
DBUG_PRINT("info", ("binlog_version: %d", description_event->binlog_version));
@@ -1081,6 +1148,28 @@ Log_event* Log_event::read_log_event(con
}
uint event_type= buf[EVENT_TYPE_OFFSET];
+
+ /*
+ CRC verification.
+ */
+ if (event_checksum_test((uchar *)buf, event_len))
+ {
+#ifdef MYSQL_CLIENT
+ *error= "Event crc check failed! Most likely there is event corruption.";
+ if (force_opt)
+ {
+ ev= new Unknown_log_event(buf, description_event);
+ DBUG_RETURN(ev);
+ }
+ else
+ DBUG_RETURN(NULL);
+#else
+ *error= ER(ER_BINLOG_READ_EVENT_CHECKSUM_FAILURE);
+ sql_print_error("%s", ER(ER_BINLOG_READ_EVENT_CHECKSUM_FAILURE));
+ DBUG_RETURN(NULL);
+#endif
+ }
+
if (event_type > description_event->number_of_event_types &&
event_type != FORMAT_DESCRIPTION_EVENT)
{
@@ -1119,6 +1208,9 @@ Log_event* Log_event::read_log_event(con
event_type= description_event->event_type_permutation[event_type];
}
+ if ((flags & LOG_EVENT_CRC_F) == LOG_EVENT_CRC_F)
+ event_len= event_len - BINLOG_CRC_LEN;
+
switch(event_type) {
case QUERY_EVENT:
ev = new Query_log_event(buf, event_len, description_event, QUERY_EVENT);
@@ -1210,6 +1302,9 @@ Log_event* Log_event::read_log_event(con
}
}
+ if ((flags & LOG_EVENT_CRC_F) == LOG_EVENT_CRC_F)
+ ev->crc= uint4korr(buf + (event_len));
+
DBUG_PRINT("read_event", ("%s(type_code: %d; event_len: %d)",
ev ? ev->get_type_str() : "<unknown>",
buf[EVENT_TYPE_OFFSET],
@@ -1262,6 +1357,11 @@ void Log_event::print_header(IO_CACHE* f
my_b_printf(file, " server id %d end_log_pos %s ", server_id,
llstr(log_pos,llbuff));
+ /* print the checksum */
+ if (uint2korr(temp_buf+FLAGS_OFFSET) & LOG_EVENT_CRC_F)
+ my_b_printf(file, " CRC# %lu ", crc);
+
+
/* mysqlbinlog --hexdump */
if (print_event_info->hexdump_from)
{
@@ -2232,12 +2332,13 @@ bool Query_log_event::write(IO_CACHE* fi
event_length= (uint) (start-buf) + get_post_header_size_for_derived() + db_len + 1 + q_len;
return (write_header(file, event_length) ||
- my_b_safe_write(file, (uchar*) buf, QUERY_HEADER_LEN) ||
+ wrapper_my_b_safe_write(file, (uchar*) buf, QUERY_HEADER_LEN) ||
write_post_header_for_derived(file) ||
- my_b_safe_write(file, (uchar*) start_of_status,
+ wrapper_my_b_safe_write(file, (uchar*) start_of_status,
(uint) (start-start_of_status)) ||
- my_b_safe_write(file, (db) ? (uchar*) db : (uchar*)"", db_len + 1) ||
- my_b_safe_write(file, (uchar*) query, q_len)) ? 1 : 0;
+ wrapper_my_b_safe_write(file, (db) ? (uchar*) db : (uchar*)"", db_len + 1) ||
+ wrapper_my_b_safe_write(file, (uchar*) query, q_len) ||
+ write_footer(file)) ? 1 : 0;
}
/**
@@ -3358,7 +3459,8 @@ bool Start_log_event_v3::write(IO_CACHE*
created= when= get_time();
int4store(buff + ST_CREATED_OFFSET,created);
return (write_header(file, sizeof(buff)) ||
- my_b_safe_write(file, (uchar*) buff, sizeof(buff)));
+ wrapper_my_b_safe_write(file, (uchar*) buff, sizeof(buff)) ||
+ write_footer(file));
}
#endif
@@ -3761,7 +3863,8 @@ bool Format_description_log_event::write
memcpy((char*) buff+ST_COMMON_HEADER_LEN_OFFSET+1, (uchar*) post_header_len,
LOG_EVENT_TYPES);
return (write_header(file, sizeof(buff)) ||
- my_b_safe_write(file, buff, sizeof(buff)));
+ wrapper_my_b_safe_write(file, buff, sizeof(buff)) ||
+ write_footer(file));
}
#endif
@@ -4036,7 +4139,7 @@ bool Load_log_event::write_data_header(I
buf[L_TBL_LEN_OFFSET] = (char)table_name_len;
buf[L_DB_LEN_OFFSET] = (char)db_len;
int4store(buf + L_NUM_FIELDS_OFFSET, num_fields);
- return my_b_safe_write(file, (uchar*)buf, LOAD_HEADER_LEN) != 0;
+ return wrapper_my_b_safe_write(file, (uchar*)buf, LOAD_HEADER_LEN) != 0;
}
@@ -4046,17 +4149,17 @@ bool Load_log_event::write_data_header(I
bool Load_log_event::write_data_body(IO_CACHE* file)
{
- if (sql_ex.write_data(file))
+ if (sql_ex.write_data(file, &crc))
return 1;
if (num_fields && fields && field_lens)
{
- if (my_b_safe_write(file, (uchar*)field_lens, num_fields) ||
- my_b_safe_write(file, (uchar*)fields, field_block_len))
+ if (wrapper_my_b_safe_write(file, (uchar*)field_lens, num_fields) ||
+ wrapper_my_b_safe_write(file, (uchar*)fields, field_block_len))
return 1;
}
- return (my_b_safe_write(file, (uchar*)table_name, table_name_len + 1) ||
- my_b_safe_write(file, (uchar*)db, db_len + 1) ||
- my_b_safe_write(file, (uchar*)fname, fname_len));
+ return (wrapper_my_b_safe_write(file, (uchar*)table_name, table_name_len + 1) ||
+ wrapper_my_b_safe_write(file, (uchar*)db, db_len + 1) ||
+ wrapper_my_b_safe_write(file, (uchar*)fname, fname_len));
}
@@ -4752,8 +4855,9 @@ bool Rotate_log_event::write(IO_CACHE* f
char buf[ROTATE_HEADER_LEN];
int8store(buf + R_POS_OFFSET, pos);
return (write_header(file, ROTATE_HEADER_LEN + ident_len) ||
- my_b_safe_write(file, (uchar*)buf, ROTATE_HEADER_LEN) ||
- my_b_safe_write(file, (uchar*)new_log_ident, (uint) ident_len));
+ wrapper_my_b_safe_write(file, (uchar*)buf, ROTATE_HEADER_LEN) ||
+ wrapper_my_b_safe_write(file, (uchar*)new_log_ident, (uint) ident_len)||
+ write_footer(file));
}
#endif
@@ -4922,7 +5026,8 @@ bool Intvar_log_event::write(IO_CACHE* f
buf[I_TYPE_OFFSET]= (uchar) type;
int8store(buf + I_VAL_OFFSET, val);
return (write_header(file, sizeof(buf)) ||
- my_b_safe_write(file, buf, sizeof(buf)));
+ wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
+ write_footer(file));
}
#endif
@@ -5050,7 +5155,8 @@ bool Rand_log_event::write(IO_CACHE* fil
int8store(buf + RAND_SEED1_OFFSET, seed1);
int8store(buf + RAND_SEED2_OFFSET, seed2);
return (write_header(file, sizeof(buf)) ||
- my_b_safe_write(file, buf, sizeof(buf)));
+ wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
+ write_footer(file));
}
#endif
@@ -5152,8 +5258,9 @@ Xid_log_event(const char* buf,
bool Xid_log_event::write(IO_CACHE* file)
{
DBUG_EXECUTE_IF("do_not_write_xid", return 0;);
- return write_header(file, sizeof(xid)) ||
- my_b_safe_write(file, (uchar*) &xid, sizeof(xid));
+ return (write_header(file, sizeof(xid)) ||
+ wrapper_my_b_safe_write(file, (uchar*) &xid, sizeof(xid)) ||
+ write_footer(file));
}
#endif
@@ -5372,10 +5479,11 @@ bool User_var_log_event::write(IO_CACHE*
event_length= sizeof(buf)+ name_len + buf1_length + val_len;
return (write_header(file, event_length) ||
- my_b_safe_write(file, (uchar*) buf, sizeof(buf)) ||
- my_b_safe_write(file, (uchar*) name, name_len) ||
- my_b_safe_write(file, (uchar*) buf1, buf1_length) ||
- my_b_safe_write(file, pos, val_len));
+ wrapper_my_b_safe_write(file, (uchar*) buf, sizeof(buf)) ||
+ wrapper_my_b_safe_write(file, (uchar*) name, name_len) ||
+ wrapper_my_b_safe_write(file, (uchar*) buf1, buf1_length) ||
+ wrapper_my_b_safe_write(file, pos, val_len) ||
+ write_footer(file));
}
#endif
@@ -5704,7 +5812,8 @@ bool Slave_log_event::write(IO_CACHE* fi
// log and host are already there
return (write_header(file, event_length) ||
- my_b_safe_write(file, (uchar*) mem_pool, event_length));
+ wrapper_my_b_safe_write(file, (uchar*) mem_pool, event_length) ||
+ write_footer(file));
}
#endif
@@ -5844,8 +5953,8 @@ bool Create_file_log_event::write_data_b
bool res;
if ((res= Load_log_event::write_data_body(file)) || fake_base)
return res;
- return (my_b_safe_write(file, (uchar*) "", 1) ||
- my_b_safe_write(file, (uchar*) block, block_len));
+ return (wrapper_my_b_safe_write(file, (uchar*) "", 1) ||
+ wrapper_my_b_safe_write(file, (uchar*) block, block_len));
}
@@ -5860,7 +5969,7 @@ bool Create_file_log_event::write_data_h
if ((res= Load_log_event::write_data_header(file)) || fake_base)
return res;
int4store(buf + CF_FILE_ID_OFFSET, file_id);
- return my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0;
+ return wrapper_my_b_safe_write(file, buf, CREATE_FILE_HEADER_LEN) != 0;
}
@@ -6129,8 +6238,9 @@ bool Append_block_log_event::write(IO_CA
uchar buf[APPEND_BLOCK_HEADER_LEN];
int4store(buf + AB_FILE_ID_OFFSET, file_id);
return (write_header(file, APPEND_BLOCK_HEADER_LEN + block_len) ||
- my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
- my_b_safe_write(file, (uchar*) block, block_len));
+ wrapper_my_b_safe_write(file, buf, APPEND_BLOCK_HEADER_LEN) ||
+ wrapper_my_b_safe_write(file, (uchar*) block, block_len) ||
+ write_footer(file));
}
#endif
@@ -6274,7 +6384,8 @@ bool Delete_file_log_event::write(IO_CAC
uchar buf[DELETE_FILE_HEADER_LEN];
int4store(buf + DF_FILE_ID_OFFSET, file_id);
return (write_header(file, sizeof(buf)) ||
- my_b_safe_write(file, buf, sizeof(buf)));
+ wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
+ write_footer(file));
}
#endif
@@ -6371,7 +6482,8 @@ bool Execute_load_log_event::write(IO_CA
uchar buf[EXEC_LOAD_HEADER_LEN];
int4store(buf + EL_FILE_ID_OFFSET, file_id);
return (write_header(file, sizeof(buf)) ||
- my_b_safe_write(file, buf, sizeof(buf)));
+ wrapper_my_b_safe_write(file, buf, sizeof(buf)) ||
+ write_footer(file));
}
#endif
@@ -6604,7 +6716,7 @@ Execute_load_query_log_event::write_post
int4store(buf + 4, fn_pos_start);
int4store(buf + 4 + 4, fn_pos_end);
*(buf + 4 + 4 + 4)= (uchar) dup_handling;
- return my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
+ return wrapper_my_b_safe_write(file, buf, EXECUTE_LOAD_QUERY_EXTRA_HEADER_LEN);
}
#endif
@@ -6745,16 +6857,23 @@ Execute_load_query_log_event::do_apply_e
sql_ex_info::write_data()
*/
-bool sql_ex_info::write_data(IO_CACHE* file)
+bool sql_ex_info::write_data(IO_CACHE* file, ha_checksum* crc)
{
+
+ bool res= FALSE;
if (new_format())
{
- return (write_str(file, field_term, (uint) field_term_len) ||
- write_str(file, enclosed, (uint) enclosed_len) ||
- write_str(file, line_term, (uint) line_term_len) ||
- write_str(file, line_start, (uint) line_start_len) ||
- write_str(file, escaped, (uint) escaped_len) ||
- my_b_safe_write(file,(uchar*) &opt_flags,1));
+ res = res || (write_str(file, field_term, (uint) field_term_len, crc) ||
+ write_str(file, enclosed, (uint) enclosed_len, crc) ||
+ write_str(file, line_term, (uint) line_term_len, crc) ||
+ write_str(file, line_start, (uint) line_start_len, crc) ||
+ write_str(file, escaped, (uint) escaped_len, crc));
+#ifndef MYSQL_CLIENT
+ if (opt_binlog_crc)
+ *crc= my_checksum(*crc, (uchar*) &opt_flags, 1);
+#endif
+
+ return (res || my_b_safe_write(file,(uchar*) &opt_flags, 1));
}
else
{
@@ -6770,7 +6889,13 @@ bool sql_ex_info::write_data(IO_CACHE* f
old_ex.escaped= *escaped;
old_ex.opt_flags= opt_flags;
old_ex.empty_flags=empty_flags;
- return my_b_safe_write(file, (uchar*) &old_ex, sizeof(old_ex)) != 0;
+
+#ifndef MYSQL_CLIENT
+ if (opt_binlog_crc)
+ *crc= my_checksum(*crc, (uchar*) &opt_flags, 1);
+#endif
+
+ return (res || (my_b_safe_write(file, (uchar*)&old_ex, sizeof(old_ex)) != 0));
}
}
@@ -7610,11 +7735,11 @@ bool Rows_log_event::write_data_header(I
{
int4store(buf + 0, m_table_id);
int2store(buf + 4, m_flags);
- return (my_b_safe_write(file, buf, 6));
+ return (wrapper_my_b_safe_write(file, buf, 6));
});
int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
int2store(buf + RW_FLAGS_OFFSET, m_flags);
- return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
+ return (wrapper_my_b_safe_write(file, buf, ROWS_HEADER_LEN));
}
bool Rows_log_event::write_data_body(IO_CACHE*file)
@@ -7630,10 +7755,10 @@ bool Rows_log_event::write_data_body(IO_
DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
- res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
+ res= res || wrapper_my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
DBUG_PRINT_BITSET("debug", "writing cols: %s", &m_cols);
- res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap,
+ res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols.bitmap,
no_bytes_in_map(&m_cols));
/*
TODO[refactor write]: Remove the "down cast" here (and elsewhere).
@@ -7642,11 +7767,11 @@ bool Rows_log_event::write_data_body(IO_
{
DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap,
no_bytes_in_map(&m_cols_ai));
- res= res || my_b_safe_write(file, (uchar*) m_cols_ai.bitmap,
+ res= res || wrapper_my_b_safe_write(file, (uchar*) m_cols_ai.bitmap,
no_bytes_in_map(&m_cols_ai));
}
DBUG_DUMP("rows", m_rows_buf, data_size);
- res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
+ res= res || wrapper_my_b_safe_write(file, m_rows_buf, (size_t) data_size);
return res;
@@ -8081,11 +8206,11 @@ bool Table_map_log_event::write_data_hea
{
int4store(buf + 0, m_table_id);
int2store(buf + 4, m_flags);
- return (my_b_safe_write(file, buf, 6));
+ return (wrapper_my_b_safe_write(file, buf, 6));
});
int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id);
int2store(buf + TM_FLAGS_OFFSET, m_flags);
- return (my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
+ return (wrapper_my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
}
bool Table_map_log_event::write_data_body(IO_CACHE *file)
@@ -8109,15 +8234,15 @@ bool Table_map_log_event::write_data_bod
uchar mbuf[sizeof(m_field_metadata_size)];
uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
- return (my_b_safe_write(file, dbuf, sizeof(dbuf)) ||
- my_b_safe_write(file, (const uchar*)m_dbnam, m_dblen+1) ||
- my_b_safe_write(file, tbuf, sizeof(tbuf)) ||
- my_b_safe_write(file, (const uchar*)m_tblnam, m_tbllen+1) ||
- my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
- my_b_safe_write(file, m_coltype, m_colcnt) ||
- my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
- my_b_safe_write(file, m_field_metadata, m_field_metadata_size),
- my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
+ return (wrapper_my_b_safe_write(file, dbuf, sizeof(dbuf)) ||
+ wrapper_my_b_safe_write(file, (const uchar*)m_dbnam, m_dblen+1) ||
+ wrapper_my_b_safe_write(file, tbuf, sizeof(tbuf)) ||
+ wrapper_my_b_safe_write(file, (const uchar*)m_tblnam, m_tbllen+1) ||
+ wrapper_my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
+ wrapper_my_b_safe_write(file, m_coltype, m_colcnt) ||
+ wrapper_my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
+ wrapper_my_b_safe_write(file, m_field_metadata, m_field_metadata_size),
+ wrapper_my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
}
#endif
@@ -9335,14 +9460,18 @@ Incident_log_event::write_data_header(IO
DBUG_PRINT("enter", ("m_incident: %d", m_incident));
uchar buf[sizeof(int16)];
int2store(buf, (int16) m_incident);
+#ifndef MYSQL_CLIENT
+ DBUG_RETURN(wrapper_my_b_safe_write(file, buf, sizeof(buf)));
+#else
DBUG_RETURN(my_b_safe_write(file, buf, sizeof(buf)));
+#endif
}
bool
Incident_log_event::write_data_body(IO_CACHE *file)
{
DBUG_ENTER("Incident_log_event::write_data_body");
- DBUG_RETURN(write_str(file, m_message.str, m_message.length));
+ DBUG_RETURN(write_str(file, m_message.str, m_message.length, &crc));
}
#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
@@ -9387,3 +9516,4 @@ st_print_event_info::st_print_event_info
open_cached_file(&body_cache, NULL, NULL, 0, flags);
}
#endif
+
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2009-02-02 15:58:48 +0000
+++ b/sql/log_event.h 2009-04-27 10:34:54 +0000
@@ -178,7 +178,7 @@ struct sql_ex_info
field_term_len + enclosed_len + line_term_len +
line_start_len + escaped_len + 6 : 7);
}
- bool write_data(IO_CACHE* file);
+ bool write_data(IO_CACHE* file, ha_checksum* crc);
const char* init(const char* buf, const char* buf_end, bool use_new_format);
bool new_format()
{
@@ -270,6 +270,10 @@ struct sql_ex_info
MAX_SIZE_LOG_EVENT_STATUS + /* status */ \
NAME_LEN + 1)
+/*
+ CRC field length.
+ */
+#define BINLOG_CRC_LEN 4
/*
Event header offsets;
these point to places inside the fixed header.
@@ -481,6 +485,8 @@ struct sql_ex_info
*/
#define LOG_EVENT_RELAY_LOG_F 0x40
+#define LOG_EVENT_CRC_F 0x80
+
/**
@def OPTIONS_WRITTEN_TO_BIN_LOG
@@ -926,6 +932,11 @@ public:
*/
ulong slave_exec_mode;
+ /*
+ Place holder for event checksum when server is configured to
+ use checksums while writing to binlog.
+ */
+ ha_checksum crc;
#ifndef MYSQL_CLIENT
THD* thd;
@@ -998,13 +1009,17 @@ public:
static void *operator new(size_t, void* ptr) { return ptr; }
static void operator delete(void*, void*) { }
+
#ifndef MYSQL_CLIENT
+ bool wrapper_my_b_safe_write(IO_CACHE* file, const uchar* buf, ulong data_length);
bool write_header(IO_CACHE* file, ulong data_length);
+ bool write_footer(IO_CACHE* file);
virtual bool write(IO_CACHE* file)
{
- return (write_header(file, get_data_size()) ||
- write_data_header(file) ||
- write_data_body(file));
+ return(write_header(file, get_data_size()) ||
+ write_data_header(file) ||
+ write_data_body(file) ||
+ write_footer(file));
}
virtual bool write_data_header(IO_CACHE* file)
{ return 0; }
@@ -3963,4 +3978,6 @@ private:
@} (end of group Replication)
*/
+bool
+event_checksum_test(uchar *buf, ulong event_len);
#endif /* _log_event_h */
=== modified file 'sql/mysql_priv.h'
--- a/sql/mysql_priv.h 2009-02-27 13:20:11 +0000
+++ b/sql/mysql_priv.h 2009-04-27 10:34:54 +0000
@@ -1958,6 +1958,7 @@ extern ulong max_prepared_stmt_count, pr
extern ulong binlog_cache_size, max_binlog_cache_size, open_files_limit;
extern ulong max_binlog_size, max_relay_log_size;
extern ulong opt_binlog_rows_event_max_size;
+extern my_bool opt_binlog_crc;
extern ulong rpl_recovery_rank, thread_cache_size, thread_pool_size;
extern ulong back_log;
#endif /* MYSQL_SERVER */
=== modified file 'sql/mysqld.cc'
--- a/sql/mysqld.cc 2009-02-27 13:20:11 +0000
+++ b/sql/mysqld.cc 2009-04-27 10:34:54 +0000
@@ -526,6 +526,7 @@ my_bool sp_automatic_privileges= 1;
my_bool disable_slaves= 0;
ulong opt_binlog_rows_event_max_size;
+my_bool opt_binlog_crc;
const char *binlog_format_names[]= {"MIXED", "STATEMENT", "ROW", NullS};
TYPELIB binlog_format_typelib=
{ array_elements(binlog_format_names) - 1, "",
@@ -5777,6 +5778,7 @@ enum options_mysqld
OPT_BINLOG_SHOW_XID,
#endif
OPT_BINLOG_ROWS_EVENT_MAX_SIZE,
+ OPT_BINLOG_CRC,
OPT_WANT_CORE, OPT_CONCURRENT_INSERT,
OPT_MEMLOCK, OPT_MYISAM_RECOVER,
OPT_REPLICATE_REWRITE_DB, OPT_SERVER_ID,
@@ -6025,6 +6027,11 @@ struct my_option my_long_options[] =
/* sub_size */ 0, /* block_size */ 256,
/* app_type */ 0
},
+ {"binlog-crc", OPT_BINLOG_CRC,
+ "CRC32 for binlog events."
+ "Use 0 (default) to disable.",
+ (uchar**) &opt_binlog_crc, (uchar**) &opt_binlog_crc, 0, GET_BOOL,
+ NO_ARG, 0, 0, 0, 0, 0, 0},
#ifndef DISABLE_GRANT_OPTIONS
{"bootstrap", OPT_BOOTSTRAP, "Used by mysql installation scripts.", 0, 0, 0,
GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
=== modified file 'sql/share/errmsg.txt'
--- a/sql/share/errmsg.txt 2009-02-12 17:56:03 +0000
+++ b/sql/share/errmsg.txt 2009-04-27 10:34:54 +0000
@@ -6463,4 +6463,7 @@ ER_OPERATION_ABORTED
eng "Operation aborted"
ER_OPERATION_ABORTED_CORRUPTED
eng "Operation aborted - data might be corrupted"
-
+ER_NETWORK_READ_EVENT_CHECKSUM_FAILURE
+ eng "Log event CRC check failed while reading from network."
+ER_BINLOG_READ_EVENT_CHECKSUM_FAILURE
+ eng "Log event CRC check failed while reading from binlog file."
=== modified file 'sql/slave.cc'
--- a/sql/slave.cc 2009-02-27 13:20:11 +0000
+++ b/sql/slave.cc 2009-04-27 10:34:54 +0000
@@ -3405,8 +3405,17 @@ static int queue_event(Master_info* mi,c
Relay_log_info *rli= &mi->rli;
pthread_mutex_t *log_lock= rli->relay_log.get_log_lock();
ulong s_id;
+ bool unlock_data_lock= TRUE;
+ uint16 flags= uint2korr(buf + FLAGS_OFFSET);
DBUG_ENTER("queue_event");
+ if (event_checksum_test((uchar *)buf, event_len))
+ {
+ error= ER_NETWORK_READ_EVENT_CHECKSUM_FAILURE;
+ unlock_data_lock= FALSE;
+ goto err;
+ }
+
LINT_INIT(inc_pos);
if (mi->rli.relay_log.description_event_for_queue->binlog_version<4 &&
@@ -3433,7 +3442,12 @@ static int queue_event(Master_info* mi,c
goto err;
case ROTATE_EVENT:
{
- Rotate_log_event rev(buf,event_len,mi->rli.relay_log.description_event_for_queue);
+ Rotate_log_event rev(buf,
+ (((flags & LOG_EVENT_CRC_F) == LOG_EVENT_CRC_F) ?
+ event_len - BINLOG_CRC_LEN :
+ event_len),
+ mi->rli.relay_log.description_event_for_queue);
+
if (unlikely(process_io_rotate(mi,&rev)))
{
error= ER_SLAVE_RELAY_LOG_WRITE_FAILURE;
@@ -3617,7 +3631,8 @@ static int queue_event(Master_info* mi,c
skip_relay_logging:
err:
- pthread_mutex_unlock(&mi->data_lock);
+ if (unlock_data_lock)
+ pthread_mutex_unlock(&mi->data_lock);
DBUG_PRINT("info", ("error: %d", error));
if (error)
mi->report(ERROR_LEVEL, error, ER(error),
=== modified file 'sql/sql_repl.cc'
--- a/sql/sql_repl.cc 2009-02-27 13:20:11 +0000
+++ b/sql/sql_repl.cc 2009-04-27 10:34:54 +0000
@@ -31,6 +31,20 @@ static int binlog_dump_count = 0;
#endif
extern my_bool disable_slaves;
+static void fix_checksum(String *packet, ulong ev_offset)
+{
+ uint16 flags= uint2korr(packet->ptr() + ev_offset + FLAGS_OFFSET);
+
+ /* recalculate the crc for this event */
+ if (opt_binlog_crc && ((flags & LOG_EVENT_CRC_F) == LOG_EVENT_CRC_F))
+ {
+ uint data_len = uint4korr(packet->ptr() + ev_offset + EVENT_LEN_OFFSET);
+ ha_checksum crc= my_checksum(0L, NULL, 0);
+ crc= my_checksum(crc, (uchar *)packet->ptr() + ev_offset, data_len - BINLOG_CRC_LEN);
+ int4store(packet->ptr() + ev_offset + data_len - BINLOG_CRC_LEN, crc);
+ }
+}
+
/*
fake_rotate_event() builds a fake (=which does not exist physically in any
binlog) Rotate event, which contains the name of the binlog we are going to
@@ -59,22 +73,36 @@ static int fake_rotate_event(NET* net, S
real and fake Rotate events (if necessary)
*/
memset(header, 0, 4);
- header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
+ header[EVENT_TYPE_OFFSET]= ROTATE_EVENT;
char* p = log_file_name+dirname_length(log_file_name);
uint ident_len = (uint) strlen(p);
- ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN;
+ ulong event_len = ident_len + LOG_EVENT_HEADER_LEN + ROTATE_HEADER_LEN + (opt_binlog_crc ? BINLOG_CRC_LEN : 0);
+ uint16 flags= LOG_EVENT_ARTIFICIAL_F | (opt_binlog_crc ? LOG_EVENT_CRC_F : 0);
+
int4store(header + SERVER_ID_OFFSET, server_id);
int4store(header + EVENT_LEN_OFFSET, event_len);
- int2store(header + FLAGS_OFFSET, LOG_EVENT_ARTIFICIAL_F);
+ int2store(header + FLAGS_OFFSET, flags);
// TODO: check what problems this may cause and fix them
int4store(header + LOG_POS_OFFSET, 0);
packet->append(header, sizeof(header));
- int8store(buf+R_POS_OFFSET,position);
+ int8store(buf+R_POS_OFFSET, position);
packet->append(buf, ROTATE_HEADER_LEN);
- packet->append(p,ident_len);
+ packet->append(p, ident_len);
+
+ if (opt_binlog_crc)
+ {
+ char b[BINLOG_CRC_LEN];
+ ha_checksum crc= my_checksum(0L, NULL, 0);
+ crc= my_checksum(crc, (uchar*)header, sizeof(header));
+ crc= my_checksum(crc, (uchar*)buf, ROTATE_HEADER_LEN);
+ crc= my_checksum(crc, (uchar*)p, ident_len);
+ int4store(b, crc);
+ packet->append(b, sizeof(b));
+ }
+
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
*errmsg = "failed on my_net_write()";
@@ -645,6 +673,10 @@ impossible position";
*/
int4store((char*) packet->ptr()+LOG_EVENT_MINIMAL_HEADER_LEN+
ST_CREATED_OFFSET+ev_offset, (ulong) 0);
+
+ /* fix the checksum due to latest changes in header */
+ fix_checksum(packet, ev_offset);
+
/* send it */
if (my_net_write(net, (uchar*) packet->ptr(), packet->length()))
{
@@ -710,6 +742,9 @@ impossible position";
binlog_can_be_corrupted= test((*packet)[FLAGS_OFFSET+ev_offset] &
LOG_EVENT_BINLOG_IN_USE_F);
(*packet)[FLAGS_OFFSET+ev_offset] &= ~LOG_EVENT_BINLOG_IN_USE_F;
+
+ /* fix the checksum due to latest changes in header */
+ fix_checksum(packet, ev_offset);
}
else if (event_type == STOP_EVENT)
binlog_can_be_corrupted= FALSE;
@@ -2079,7 +2114,9 @@ static void fix_slave_net_timeout(THD *t
}
static sys_var_chain vars = { NULL, NULL };
-
+static sys_var_const sys_opt_binlog_crc(&vars, "binlog_crc",
+ OPT_GLOBAL, SHOW_MY_BOOL,
+ (uchar*) &opt_binlog_crc);
static sys_var_const sys_log_slave_updates(&vars, "log_slave_updates",
OPT_GLOBAL, SHOW_MY_BOOL,
(uchar*) &opt_log_slave_updates);
Attachment: [text/bzr-bundle] bzr/luis.soares@sun.com-20090427103454-iw9afkzfa0rpd3h3.bundle