From: Alfranio Correia
Date: January 11 2011 7:47pm
Subject: Re: bzr commit into mysql-next-mr branch (luis.soares:3204) WL#5597
Hi Luis,

Really great work!!!

However, the patch still needs to be improved.


STATUS
------

Not approved.

REQUESTS
--------

1 - I don't agree with the following decision:
  "Setting Slave_rows_search_algorithm to INDEX_SCAN only is disallowed and
   trying to do it ends up in error (ER_WRONG_VALUE_FOR_VAR)."
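To make the point concrete, the rule as implemented behaves like this toy
model (the names and the bitmask encoding are made up for illustration; the
real check lives in sys_vars.cc): every non-empty combination of the three
algorithms is accepted except INDEX_SCAN by itself, and that is the one
exception I think we should drop.

```cpp
#include <cassert>

// Toy model of the slave_rows_search_algorithms SET value; the encoding
// here is hypothetical, not the actual sys_vars.cc implementation.
enum { INDEX_SCAN = 1, TABLE_SCAN = 2, HASH_SCAN = 4 };

// Current patch behaviour: reject the empty set and INDEX_SCAN alone.
static bool is_valid_search_algorithm_set(unsigned set)
{
  if (set == 0)
    return false;                 // empty set: nothing to search with
  if (set == INDEX_SCAN)
    return false;                 // the restriction under discussion
  return set <= (INDEX_SCAN | TABLE_SCAN | HASH_SCAN);
}
```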

Find further comments in-line.

2 - Why do you allocate the buffer for key searches without first checking
whether an INDEX_SCAN will actually be used? See this example from the
Delete_rows_log_event:


int
Delete_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
{
  if (m_table->s->keys > 0)
  {
    // Allocate buffer for key searches
    m_key= (uchar*)my_malloc(MAX_KEY_LENGTH, MYF(MY_WME));
    if (!m_key)
      return HA_ERR_OUT_OF_MEM;
  }

  /* will we be using a hash to lookup rows? If so, initialize it. */
  m_rows_lookup_algorithm= decide_row_lookup_algorithm(m_table, &m_cols,
                                                       get_type_code());
  if (m_rows_lookup_algorithm == ROW_LOOKUP_HASH_SCAN)
    m_hash.init();

  return 0;
}

int
Delete_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
                                               int error)
{
  /*error= ToDo:find out what this should really be, this triggers
    close_scan in nbd, returning error?*/
  m_table->file->ha_index_or_rnd_end();
  my_free(m_key);
  m_key= NULL;

  /* we don't need the hash anymore, free it */
  if (m_rows_lookup_algorithm == ROW_LOOKUP_HASH_SCAN)
    m_hash.deinit();
  m_rows_lookup_algorithm= ROW_LOOKUP_UNDEFINED;

  return error;
}
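What I would expect is something along these lines: decide the lookup
algorithm first, then allocate only what the decided algorithm needs. A
standalone sketch with simplified stand-in types (RowsEvent, the buffer
size and the error value are made up, not the actual log_event.cc code):

```cpp
#include <cassert>
#include <cstdlib>

enum RowLookup { ROW_LOOKUP_UNDEFINED, ROW_LOOKUP_INDEX_SCAN,
                 ROW_LOOKUP_TABLE_SCAN, ROW_LOOKUP_HASH_SCAN };

// Simplified stand-in for the event class: only the fields that matter here.
struct RowsEvent
{
  RowLookup m_rows_lookup_algorithm= ROW_LOOKUP_UNDEFINED;
  unsigned char *m_key= nullptr;
  bool m_hash_inited= false;

  int do_before_row_operations(RowLookup decided)
  {
    // Decide first (stands in for decide_row_lookup_algorithm())...
    m_rows_lookup_algorithm= decided;

    // ...then allocate only what the decided algorithm needs.
    if (m_rows_lookup_algorithm == ROW_LOOKUP_INDEX_SCAN)
    {
      m_key= static_cast<unsigned char *>(malloc(1024)); // MAX_KEY_LENGTH
      if (!m_key)
        return 1;                                        // HA_ERR_OUT_OF_MEM
    }
    else if (m_rows_lookup_algorithm == ROW_LOOKUP_HASH_SCAN)
      m_hash_inited= true;                               // m_hash.init()
    return 0;
  }
};
```

This way a pure HASH_SCAN or TABLE_SCAN apply never pays for a key buffer
it is not going to use.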

3 - In Rows_log_event::do_index_scan_and_update, you check for every row
which index should be used. In my opinion, this check should be moved into
do_before_row_operations as an initialization routine.
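I.e., choose the index once per event, cache it, and let the per-row code
just use the cached choice. A minimal model of that shape (all names here
are hypothetical, not the real Rows_log_event API):

```cpp
#include <cassert>

// Minimal model of hoisting the index choice out of the per-row loop.
struct IndexScanApplier
{
  int chosen_index= -1;
  int choose_calls= 0;

  int choose_best_index()            // stands in for scanning m_table->key_info
  {
    ++choose_calls;
    return 0;
  }

  void do_before_row_operations()    // initialization: pick the index once
  {
    chosen_index= choose_best_index();
  }

  void apply_row()                   // per-row work: only the cached index
  {
    assert(chosen_index >= 0);
  }
};
```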

4 - Can you rewrite this assertion? It reads strangely:

DBUG_ASSERT((m_hash.is_empty()) ? (error == 0) : (!m_hash.is_empty()));
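The false branch asserts !m_hash.is_empty(), which is trivially true on that
branch, so the whole condition reduces to "if the hash is empty, error must
be 0". A tiny standalone check of the equivalence (predicates extracted by
hand from the assertion):

```cpp
#include <cassert>

// The assertion's condition as written: empty ? (error == 0) : (!empty).
static bool original_form(bool empty, int error)
{
  return empty ? (error == 0) : !empty;   // false branch is always true here
}

// What it actually means: if the hash is empty, error must be 0.
static bool simplified_form(bool empty, int error)
{
  return !empty || error == 0;
}
```

So DBUG_ASSERT(!m_hash.is_empty() || error == 0), or an if/else with a
comment, would say the same thing more clearly.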

5 - Address Jashon's requests too.

6 - See further comments in-line.

Cheers.


On 11/23/2010 01:01 AM, Luis Soares wrote:
> #At file:///home/lsoares/Workspace/bzr/work/features/wl5597/mysql-next-mr-wl5597-commit/
> based on revid:alexander.nozdrin@stripped
> 
>  3204 Luis Soares	2010-11-23 [merge]
>       WL#5597: Using batch operations when there is no index in RBR
>       
>       When applying large delete_rows or update_rows log events and the
>       destination table does not have any index, the operation can take
>       a very long time. This is due to the fact that several table
>       scans are actually performed instead of just one.
>       
>       We fix this by adding a new search and update procedure to the
>       slave. Consequently, when a delete_rows or update_rows event is
>       about to be processed and there is no PK or INDEX on the
>       associated table, the slave is able to create a temporary
>       in-memory hash table and store the rows to be updated in
>       it. Then, for each row in the storage engine table, it checks if
>       the row exists in the hash table. If there is a match, it does
>       the operation. This is done in a one time table scan instead of
>       several.
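For the record, here is my reading of the procedure described above, as a
standalone sketch with deliberately simplified types (std::string rows and
a delete-only event; not the actual log_event.cc code):

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

// Sketch of HASH_SCAN for a Delete_rows event: stage the rows to delete in
// an in-memory hash, then do ONE scan of the table, removing each row that
// still has a match in the hash.
static std::vector<std::string>
hash_scan_delete(const std::vector<std::string> &table,
                 const std::vector<std::string> &rows_to_delete)
{
  std::unordered_multimap<std::string, bool> hash;     // m_hash, roughly
  for (const std::string &r : rows_to_delete)
    hash.emplace(r, true);

  std::vector<std::string> remaining;
  for (const std::string &row : table)                 // single table scan
  {
    auto it= hash.find(row);
    if (it != hash.end())
      hash.erase(it);                                  // match: delete once
    else
      remaining.push_back(row);                        // no match: keep row
  }
  return remaining;
}
```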
> 
>     added:
>       mysql-test/extra/rpl_tests/rpl_row_idempotency.test
>       mysql-test/include/rpl_hash_scan_assertion.inc
>       mysql-test/suite/rpl/r/rpl_row_hash_scan.result
>       mysql-test/suite/rpl/r/rpl_row_hash_scan_sanity.result
>       mysql-test/suite/rpl/t/rpl_row_hash_scan.test
>       mysql-test/suite/rpl/t/rpl_row_hash_scan_sanity.test
>       mysql-test/suite/sys_vars/r/slave_rows_search_algorithms_basic.result
>       mysql-test/suite/sys_vars/t/slave_rows_search_algorithms_basic.test
>     modified:
>       mysql-test/r/mysqld--help-notwin.result
>       mysql-test/r/mysqld--help-win.result
>       mysql-test/suite/rpl/r/rpl_row_idempotency.result
>       mysql-test/suite/rpl/t/rpl_row_idempotency.test
>       mysql-test/suite/sys_vars/r/all_vars.result
>       sql/log_event.cc
>       sql/log_event.h
>       sql/mysqld.cc
>       sql/mysqld.h
>       sql/rpl_utility.cc
>       sql/rpl_utility.h
>       sql/sql_class.h
>       sql/sys_vars.cc
> === added file 'mysql-test/extra/rpl_tests/rpl_row_idempotency.test'
> --- a/mysql-test/extra/rpl_tests/rpl_row_idempotency.test	1970-01-01 00:00:00 +0000
> +++ b/mysql-test/extra/rpl_tests/rpl_row_idempotency.test	2010-11-23 00:08:01 +0000
> @@ -0,0 +1,313 @@
> +
> +# bug#31609 Not all RBR slave errors reported as errors
> +# bug#31552 Replication breaks when deleting rows from out-of-sync table
> +#           without PK
> +
> +# The default for slave-exec-mode option and server
> +# variable slave_exec_mode  is 'STRICT'.
> +# When 'STRICT' mode is set, the slave SQL thread will stop whenever
> +# the row to change is not found. In 'IDEMPOTENT' mode, the SQL thread
> +# will continue running and apply the row - replace if it's Write_rows event -
> +# or skip to the next event.
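Restating the comment above as a toy model, to be explicit about what the
two modes mean for a missing row (the function and its return convention
are made up; 1032 is the "can't find record" error seen later in this test):

```cpp
#include <cassert>

// Toy model of slave_exec_mode handling of a row that cannot be found;
// hypothetical helper, for illustration only.
enum SlaveExecMode { STRICT, IDEMPOTENT };

// Returns 0 if the SQL thread keeps running, or the error it stops with.
static int handle_row_not_found(SlaveExecMode mode, int error)
{
  if (mode == IDEMPOTENT)
    return 0;     // replace (Write_rows) or skip to the next event
  return error;   // STRICT: the slave SQL thread stops with this error
}
```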
> +
> +# the previous part of the tests was with IDEMPOTENT slave's mode.
> +
> +
> +#
> +# Other than above idempotent errors dealing with foreign keys constraint
> +#
> +connection slave;
> +
> +set @old_slave_exec_mode= @@global.slave_exec_mode;
> +set @@global.slave_exec_mode= IDEMPOTENT;
> +
> +connection master;
> +
> +create table ti1 (b int primary key) engine = innodb;
> +create table ti2 (a int primary key, b int, foreign key (b) references ti1(b))
> +     engine = innodb;
> +set foreign_key_checks=1 /* ensure the check */;
> +
> +insert into ti1 values (1),(2),(3);
> +insert into ti2 set a=2, b=2;
> +
> +sync_slave_with_master;
> +
> +#connection slave;
> +select * from ti1 order by b /* must be (1),(2),(3) */;
> +insert into ti2 set a=1, b=1;
> +select * from ti2 order by b /* must be (1,1) (2,2) */;
> +
> +connection master;
> +
> +# from now on checking rbr specific idempotent errors
> +set @save_binlog_format= @@session.binlog_format;
> +set @@session.binlog_format= row;
> +delete from ti1 where b=1;
> +
> +select * from ti1 order by b /* must be (2),(3) */;
> +
> +# slave must catch up (expect some warnings in error.log)
> +sync_slave_with_master;
> +
> +#connection slave;
> +select * from ti1 order by b /* must stays as were on master (1),(2),(3) */;
> +
> +delete from ti1 where b=3;
> +
> +connection master;
> +insert into ti2 set a=3, b=3;
> +
> +# slave must catch up (expect some warnings in error.log)
> +sync_slave_with_master;
> +
> +#connection slave;
> +select * from ti2 order by b /* must be (1,1),(2,2) - not inserted */;
> +
> +
> +#
> +# Checking the new global sys variable
> +#
> +
> +connection slave;
> +
> +set global slave_exec_mode='IDEMPOTENT';
> +set global slave_exec_mode='STRICT';
> +
> +# checking mutual exclusion for the options
> +--error ER_WRONG_VALUE_FOR_VAR
> +set global slave_exec_mode='IDEMPOTENT,STRICT';
> +
> +select @@global.slave_exec_mode /* must be STRICT */;
> +
> +#
> +# Checking stops.
> +# In the following sections strict slave sql thread is going to
> +# stop when faces an idempotent error. In order to proceed
> +# the mode is temporarily switched to indempotent.
> +#
> +
> +#
> +--echo *** foreign keys errors as above now forces to stop
> +#
> +
> +connection master;
> +
> +set foreign_key_checks=0;
> +drop table ti2, ti1;
> +
> +create table ti1 (b int primary key) engine = innodb;
> +create table ti2 (a int primary key, b int, foreign key (b) references ti1(b))
> +     engine = innodb;
> +set foreign_key_checks=1 /* ensure the check */;
> +
> +insert into ti1 values (1),(2),(3);
> +insert into ti2 set a=2, b=2;
> +
> +sync_slave_with_master;
> +
> +#connection slave;
> +select * from ti1 order by b /* must be (1),(2),(3) */;
> +--echo *** conspire future problem
> +insert into ti2 set a=1, b=1;
> +select * from ti2 order by b /* must be (1,1) (2,2) */;
> +
> +connection master;
> +
> +delete from ti1 where b=1 /* offending delete event */;
> +select * from ti1 order by b /* must be (2),(3) */;
> +
> +# foreign key: row is referenced
> +
> +--echo *** slave must stop (Trying to delete a referenced foreing key)
> +connection slave;
> +source include/wait_for_slave_sql_to_stop.inc;
> +
> +let $last_error = query_get_value("SHOW SLAVE STATUS", Last_SQL_Errno, 1);
> +disable_query_log;
> +eval SELECT "$last_error" AS Last_SQL_Error;
> +enable_query_log;
> +
> +select * from ti1 order by b /* must be (1),(2),(3) - not deleted */;
> +set foreign_key_checks= 0;
> +delete from ti2 where b=1;
> +set foreign_key_checks= 1;
> +set global slave_exec_mode='IDEMPOTENT';
> +start slave sql_thread;
> +connection master;
> +sync_slave_with_master;
> +#connection slave;
> +set global slave_exec_mode='STRICT';
> +
> +connection master;
> +
> +sync_slave_with_master;
> +
> +#connection slave;
> +--echo *** conspire the following insert failure
> +# foreign key: no referenced row
> +
> +--echo *** conspire future problem
> +delete from ti1 where b=3;
> +
> +connection master;
> +insert into ti2 set a=3, b=3 /* offending write event */;
> +
> +--echo *** slave must stop (Trying to insert an invalid foreign key)
> +connection slave;
> +source include/wait_for_slave_sql_to_stop.inc;
> +
> +let $last_error = query_get_value("SHOW SLAVE STATUS", Last_SQL_Errno, 1);
> +disable_query_log;
> +eval SELECT "$last_error" AS Last_SQL_Error;
> +enable_query_log;
> +
> +select * from ti2 order by b /* must be (2,2) */;
> +set foreign_key_checks= 0;
> +insert into ti1 set b=3;
> +set foreign_key_checks= 1;
> +set global slave_exec_mode='IDEMPOTENT';
> +start slave sql_thread;
> +connection master;
> +sync_slave_with_master;
> +#connection slave;
> +set global slave_exec_mode='STRICT';
> +
> +connection master;
> +
> +sync_slave_with_master;
> +
> +select * from ti2 order by b /* must be (2,2),(3,3) */;
> +
> +# 
> +--echo *** other errors
> +# 
> +
> +# dup key insert
> +
> +#connection slave;
> +--echo *** conspiring query
> +insert into ti1 set b=1;
> +
> +connection master;
> +insert into ti1 set b=1 /* offending write event */;
> +
> +--echo *** slave must stop (Trying to insert a dupliacte key)
> +connection slave;
> +source include/wait_for_slave_sql_to_stop.inc;
> +
> +let $last_error = query_get_value("SHOW SLAVE STATUS", Last_SQL_Errno, 1);
> +disable_query_log;
> +eval SELECT "$last_error" AS Last_SQL_Error;
> +enable_query_log;
> +
> +set foreign_key_checks= 0;
> +delete from ti1 where b=1;
> +set foreign_key_checks= 1;
> +set global slave_exec_mode='IDEMPOTENT';
> +start slave sql_thread;
> +connection master;
> +sync_slave_with_master;
> +#connection slave;
> +set global slave_exec_mode='STRICT';
> +
> +# key not found
> +
> +connection master;
> +
> +CREATE TABLE t1 (a INT PRIMARY KEY);
> +CREATE TABLE t2 (a INT);
> +INSERT INTO t1 VALUES (-1),(-2),(-3);
> +INSERT INTO t2 VALUES (-1),(-2),(-3);
> +sync_slave_with_master;
> +
> +#connection slave;
> +DELETE FROM t1 WHERE a = -2;
> +DELETE FROM t2 WHERE a = -2;
> +connection master;
> +DELETE FROM t1 WHERE a = -2;
> +
> +--echo *** slave must stop (Key was not found)
> +connection slave;
> +source include/wait_for_slave_sql_to_stop.inc;
> +
> +let $last_error = query_get_value("SHOW SLAVE STATUS", Last_SQL_Errno, 1);
> +disable_query_log;
> +eval SELECT "$last_error" AS Last_SQL_Error;
> +enable_query_log;
> +
> +set global slave_exec_mode='IDEMPOTENT';
> +start slave sql_thread;
> +connection master;
> +sync_slave_with_master;
> +#connection slave;
> +set global slave_exec_mode='STRICT';
> +
> +connection master;
> +DELETE FROM t2 WHERE a = -2; 
> +--echo *** slave must stop (Key was not found)
> +connection slave;
> +source include/wait_for_slave_sql_to_stop.inc;
> +
> +let $last_error = query_get_value("SHOW SLAVE STATUS", Last_SQL_Errno, 1);
> +disable_query_log;
> +eval SELECT "$last_error" AS Last_SQL_Error;
> +enable_query_log;
> +
> +set global slave_exec_mode='IDEMPOTENT';
> +start slave sql_thread;
> +connection master;
> +sync_slave_with_master;
> +#connection slave;
> +set global slave_exec_mode='STRICT';
> +
> +UPDATE t1 SET a = 1 WHERE a = -1;
> +UPDATE t2 SET a = 1 WHERE a = -1;
> +
> +connection master;
> +UPDATE t1 SET a = 1 WHERE a = -1;
> +
> +--echo *** slave must stop (Key was not found)
> +connection slave;
> +source include/wait_for_slave_sql_to_stop.inc;
> +
> +let $last_error = query_get_value("SHOW SLAVE STATUS", Last_SQL_Errno, 1);
> +disable_query_log;
> +eval SELECT "$last_error" AS Last_SQL_Error;
> +enable_query_log;
> +
> +set global slave_exec_mode='IDEMPOTENT';
> +start slave sql_thread;
> +connection master;
> +sync_slave_with_master;
> +#connection slave;
> +set global slave_exec_mode='STRICT';
> +
> +
> +connection master;
> +UPDATE t2 SET a = 1 WHERE a = -1;
> +
> +--echo *** slave must stop (Key was not found)
> +connection slave;
> +source include/wait_for_slave_sql_to_stop.inc;
> +
> +let $last_error = query_get_value("SHOW SLAVE STATUS", Last_SQL_Errno, 1);
> +disable_query_log;
> +eval SELECT "$last_error" AS Last_SQL_Error;
> +enable_query_log;
> +
> +set global slave_exec_mode='IDEMPOTENT';
> +start slave sql_thread;
> +connection master;
> +sync_slave_with_master;
> +#connection slave;
> +SET @@global.slave_exec_mode= @old_slave_exec_mode;
> +
> +# cleanup for bug#31609 tests
> +
> +connection master;
> +
> +drop table t1,t2,ti2,ti1;
> +sync_slave_with_master;
> +set @@global.slave_exec_mode= @old_slave_exec_mode;
> +
> 
> === added file 'mysql-test/include/rpl_hash_scan_assertion.inc'
> --- a/mysql-test/include/rpl_hash_scan_assertion.inc	1970-01-01 00:00:00 +0000
> +++ b/mysql-test/include/rpl_hash_scan_assertion.inc	2010-11-23 00:53:54 +0000
> @@ -0,0 +1,9 @@
> +-- sync_slave_with_master
> +-- let $scan_alg= query_get_value(SHOW STATUS LIKE "Slave_rows_last_search_algorithm_used", Value, 1)
> +if (`SELECT '$scan_alg' <> $expected_alg`)
> +{
> +    -- source include/show_rpl_debug_info.inc
> +    -- echo Unexcepted search algorithm at the slave: got $scan_alg, expected: $expected_alg
> +    -- die 
> +}
> +-- connection master
> 
> === modified file 'mysql-test/r/mysqld--help-notwin.result'
> --- a/mysql-test/r/mysqld--help-notwin.result	2010-10-08 14:35:24 +0000
> +++ b/mysql-test/r/mysqld--help-notwin.result	2010-11-22 21:10:41 +0000
> @@ -641,6 +641,14 @@ The following options may be given as th
>   --slave-net-timeout=# 
>   Number of seconds to wait for more data from a
>   master/slave connection before aborting the read
> + --slave-rows-search-algorithms=name 
> + Set of searching algorithms that the slave will use while
> + searching for records from the storage engine to either
> + updated or deleted them. Possible values are: INDEX_SCAN,
> + TABLE_SCAN and HASH_SCAN. Any combination is allowed, and
> + the slave will always pick the most suitable algorithm
> + for any given scenario. (Default: INDEX_SCAN,
> + TABLE_SCAN).
>   --slave-skip-errors=name 
>   Tells the slave thread to continue replication when a
>   query event returns an error from the provided list
> @@ -930,6 +938,7 @@ skip-slave-start FALSE
>  slave-compressed-protocol FALSE
>  slave-exec-mode STRICT
>  slave-net-timeout 3600
> +slave-rows-search-algorithms TABLE_SCAN,INDEX_SCAN
>  slave-skip-errors (No default value)
>  slave-transaction-retries 10
>  slave-type-conversions 
> 
> === modified file 'mysql-test/r/mysqld--help-win.result'
> --- a/mysql-test/r/mysqld--help-win.result	2010-10-08 14:35:24 +0000
> +++ b/mysql-test/r/mysqld--help-win.result	2010-11-22 21:10:41 +0000
> @@ -644,6 +644,14 @@ The following options may be given as th
>   --slave-net-timeout=# 
>   Number of seconds to wait for more data from a
>   master/slave connection before aborting the read
> + --slave-rows-search-algorithms=name 
> + Set of searching algorithms that the slave will use while
> + searching for records from the storage engine to either
> + updated or deleted them. Possible values are: INDEX_SCAN,
> + TABLE_SCAN and HASH_SCAN. Any combination is allowed, and
> + the slave will always pick the most suitable algorithm
> + for any given scenario. (Default: INDEX_SCAN,
> + TABLE_SCAN).
>   --slave-skip-errors=name 
>   Tells the slave thread to continue replication when a
>   query event returns an error from the provided list
> 
> === added file 'mysql-test/suite/rpl/r/rpl_row_hash_scan.result'
> --- a/mysql-test/suite/rpl/r/rpl_row_hash_scan.result	1970-01-01 00:00:00 +0000
> +++ b/mysql-test/suite/rpl/r/rpl_row_hash_scan.result	2010-11-23 00:08:01 +0000
> @@ -0,0 +1,105 @@
> +stop slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +reset master;
> +reset slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +start slave;
> +call mtr.add_suppression(" Slave: Can't find record in 't1' Error_code: 1032");
> +SET @saved_slave_rows_search_algorithms= @@global.slave_rows_search_algorithms;
> +SET GLOBAL slave_rows_search_algorithms= 'INDEX_SCAN,HASH_SCAN';
> +CREATE TABLE t1 (a INT);
> +INSERT INTO t1 VALUES (1), (1), (2), (3);
> +DELETE FROM t1;
> +INSERT INTO t1 VALUES (2), (1), (3), (1);
> +UPDATE t1 SET a=1000 WHERE a=1;
> +Comparing tables master:test.t1 and master:test.t1
> +DELETE FROM t1 WHERE a=1000;
> +DELETE FROM t1 WHERE a=2 OR a=3;
> +Comparing tables master:test.t1 and slave:test.t1
> +stop slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +reset master;
> +reset slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +start slave;
> +SET SQL_LOG_BIN=0;
> +CREATE TABLE t1 (a INT, b INT);
> +SET SQL_LOG_BIN=1;
> +CREATE TABLE t1 (a INT);
> +INSERT INTO t1 VALUES (1,1), (1,2), (2,1), (2,2);
> +UPDATE t1 SET a=1000 WHERE a=1;
> +SELECT * FROM t1;
> +a	b
> +1000	1
> +1000	2
> +2	1
> +2	2
> +SELECT * FROM t1;
> +a
> +1000
> +1000
> +2
> +2
> +DELETE FROM t1 WHERE a=1000;
> +DELETE FROM t1 WHERE a=2;
> +SELECT * FROM t1;
> +a	b
> +SELECT * FROM t1;
> +a
> +stop slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +reset master;
> +reset slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +start slave;
> +CREATE TABLE t1 (a INT);
> +INSERT INTO t1 VALUES (1), (1), (2), (3);
> +DELETE FROM t1 WHERE a=1;
> +DELETE FROM t1 WHERE a=2;
> +UPDATE t1 SET a=1000 WHERE a=1;
> +--source include/wait_for_slave_sql_error_and_skip.inc
> +SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1;
> +include/start_slave.inc
> +DELETE FROM t1 WHERE a=2;
> +--source include/wait_for_slave_sql_error_and_skip.inc
> +SET GLOBAL SQL_SLAVE_SKIP_COUNTER=1;
> +include/start_slave.inc
> +DROP TABLE t1;
> +stop slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +reset master;
> +reset slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +start slave;
> +CREATE TABLE t1 (a INT, b TINYBLOB);
> +INSERT INTO t1 VALUES (1,'a'), (1, 'b'), (2,'aa'), (2, 'aa');
> +UPDATE t1 SET b='c' WHERE a=1;
> +Comparing tables master:test.t1 and slave:test.t1
> +UPDATE t1 SET a=10000 WHERE b='aa';
> +Comparing tables master:test.t1 and slave:test.t1
> +UPDATE t1 SET b='c' WHERE b='aa';
> +Comparing tables master:test.t1 and slave:test.t1
> +DELETE FROM t1 WHERE b='c';
> +Comparing tables master:test.t1 and slave:test.t1
> +stop slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +reset master;
> +reset slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +start slave;
> +CREATE TABLE t1 (a TINYBLOB, b TINYBLOB);
> +INSERT INTO t1 VALUES ('a','a'), ('b', 'b'), ('a','aa'), ('a', 'aa');
> +UPDATE t1 SET b='c' WHERE b='aa';
> +Comparing tables master:test.t1 and slave:test.t1
> +DELETE FROM t1;
> +Comparing tables master:test.t1 and slave:test.t1
> +INSERT INTO t1 VALUES (NULL,NULL), (NULL, NULL);
> +DELETE FROM t1;
> +Comparing tables master:test.t1 and slave:test.t1
> +stop slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +reset master;
> +reset slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +start slave;
> +SET @@global.slave_rows_search_algorithms= @saved_slave_rows_search_algorithms;
> 
> === added file 'mysql-test/suite/rpl/r/rpl_row_hash_scan_sanity.result'
> --- a/mysql-test/suite/rpl/r/rpl_row_hash_scan_sanity.result	1970-01-01 00:00:00 +0000
> +++ b/mysql-test/suite/rpl/r/rpl_row_hash_scan_sanity.result	2010-11-23 00:53:54 +0000
> @@ -0,0 +1,52 @@
> +stop slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +reset master;
> +reset slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +start slave;
> +CREATE TABLE t1 (c1 INT);
> +CREATE TABLE t2 (c1 INT PRIMARY KEY);
> +CREATE TABLE t3 (c1 INT UNIQUE KEY);
> +CREATE TABLE t4 (c1 INT KEY);
> +INSERT INTO t1 VALUES (1);
> +INSERT INTO t2 VALUES (1);
> +INSERT INTO t3 VALUES (1);
> +INSERT INTO t4 VALUES (1);
> +SET @saved_slave_rows_search_algorithms= @@global.slave_rows_search_algorithms;
> +SET @@global.slave_rows_search_algorithms= 'TABLE_SCAN';
> +UPDATE t1 SET c1= 2;
> +UPDATE t2 SET c1= 2;
> +UPDATE t3 SET c1= 2;
> +UPDATE t4 SET c1= 2;
> +SET @@global.slave_rows_search_algorithms= 'TABLE_SCAN,INDEX_SCAN';
> +UPDATE t1 SET c1= 3;
> +UPDATE t2 SET c1= 3;
> +UPDATE t3 SET c1= 3;
> +UPDATE t4 SET c1= 3;
> +SET @@global.slave_rows_search_algorithms= 'TABLE_SCAN,HASH_SCAN';
> +UPDATE t1 SET c1= 4;
> +UPDATE t2 SET c1= 4;
> +UPDATE t3 SET c1= 4;
> +UPDATE t4 SET c1= 4;
> +SET @@global.slave_rows_search_algorithms= 'HASH_SCAN';
> +UPDATE t1 SET c1= 5;
> +UPDATE t2 SET c1= 5;
> +UPDATE t3 SET c1= 5;
> +UPDATE t4 SET c1= 5;
> +SET @@global.slave_rows_search_algorithms= 'HASH_SCAN,INDEX_SCAN';
> +UPDATE t1 SET c1= 6;
> +UPDATE t2 SET c1= 6;
> +UPDATE t3 SET c1= 6;
> +UPDATE t4 SET c1= 6;
> +SET @@global.slave_rows_search_algorithms= 'HASH_SCAN,INDEX_SCAN,TABLE_SCAN';
> +UPDATE t1 SET c1= 7;
> +UPDATE t2 SET c1= 7;
> +UPDATE t3 SET c1= 7;
> +UPDATE t4 SET c1= 7;
> +SET @@global.slave_rows_search_algorithms= @saved_slave_rows_search_algorithms;
> +stop slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +reset master;
> +reset slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +start slave;
> 
> === modified file 'mysql-test/suite/rpl/r/rpl_row_idempotency.result'
> --- a/mysql-test/suite/rpl/r/rpl_row_idempotency.result	2010-01-12 17:52:02 +0000
> +++ b/mysql-test/suite/rpl/r/rpl_row_idempotency.result	2010-11-23 00:08:01 +0000
> @@ -7,6 +7,8 @@ start slave;
>  call mtr.add_suppression("Slave: Can't find record in 't.' Error_code: 1032");
>  call mtr.add_suppression("Slave: Cannot delete or update a parent row: a foreign key constraint fails .* Error_code: 1451");
>  call mtr.add_suppression("Slave: Cannot add or update a child row: a foreign key constraint fails .* Error_code: 1452");
> +set @saved_slave_rows_search_algorithms= @@global.slave_rows_search_algorithms;
> +SET GLOBAL slave_rows_search_algorithms= 'INDEX_SCAN,TABLE_SCAN';
>  set @old_slave_exec_mode= @@global.slave_exec_mode;
>  set @@global.slave_exec_mode= IDEMPOTENT;
>  create table ti1 (b int primary key) engine = innodb;
> @@ -160,4 +162,165 @@ start slave sql_thread;
>  SET @@global.slave_exec_mode= @old_slave_exec_mode;
>  drop table t1,t2,ti2,ti1;
>  set @@global.slave_exec_mode= @old_slave_exec_mode;
> +stop slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +reset master;
> +reset slave;
> +drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
> +start slave;
> +SET GLOBAL slave_rows_search_algorithms= 'INDEX_SCAN,HASH_SCAN';
> +set @old_slave_exec_mode= @@global.slave_exec_mode;
> +set @@global.slave_exec_mode= IDEMPOTENT;
> +create table ti1 (b int primary key) engine = innodb;
> +create table ti2 (a int primary key, b int, foreign key (b) references ti1(b))
> +engine = innodb;
> +set foreign_key_checks=1 /* ensure the check */;
> +insert into ti1 values (1),(2),(3);
> +insert into ti2 set a=2, b=2;
> +select * from ti1 order by b /* must be (1),(2),(3) */;
> +b
> +1
> +2
> +3
> +insert into ti2 set a=1, b=1;
> +select * from ti2 order by b /* must be (1,1) (2,2) */;
> +a	b
> +1	1
> +2	2
> +set @save_binlog_format= @@session.binlog_format;
> +set @@session.binlog_format= row;
> +delete from ti1 where b=1;
> +select * from ti1 order by b /* must be (2),(3) */;
> +b
> +2
> +3
> +select * from ti1 order by b /* must stays as were on master (1),(2),(3) */;
> +b
> +1
> +2
> +3
> +delete from ti1 where b=3;
> +insert into ti2 set a=3, b=3;
> +select * from ti2 order by b /* must be (1,1),(2,2) - not inserted */;
> +a	b
> +1	1
> +2	2
> +set global slave_exec_mode='IDEMPOTENT';
> +set global slave_exec_mode='STRICT';
> +set global slave_exec_mode='IDEMPOTENT,STRICT';
> +ERROR 42000: Variable 'slave_exec_mode' can't be set to the value of 'IDEMPOTENT,STRICT'
> +select @@global.slave_exec_mode /* must be STRICT */;
> +@@global.slave_exec_mode
> +STRICT
> +*** foreign keys errors as above now forces to stop
> +set foreign_key_checks=0;
> +drop table ti2, ti1;
> +create table ti1 (b int primary key) engine = innodb;
> +create table ti2 (a int primary key, b int, foreign key (b) references ti1(b))
> +engine = innodb;
> +set foreign_key_checks=1 /* ensure the check */;
> +insert into ti1 values (1),(2),(3);
> +insert into ti2 set a=2, b=2;
> +select * from ti1 order by b /* must be (1),(2),(3) */;
> +b
> +1
> +2
> +3
> +*** conspire future problem
> +insert into ti2 set a=1, b=1;
> +select * from ti2 order by b /* must be (1,1) (2,2) */;
> +a	b
> +1	1
> +2	2
> +delete from ti1 where b=1 /* offending delete event */;
> +select * from ti1 order by b /* must be (2),(3) */;
> +b
> +2
> +3
> +*** slave must stop (Trying to delete a referenced foreing key)
> +Last_SQL_Error
> +1451
> +select * from ti1 order by b /* must be (1),(2),(3) - not deleted */;
> +b
> +1
> +2
> +3
> +set foreign_key_checks= 0;
> +delete from ti2 where b=1;
> +set foreign_key_checks= 1;
> +set global slave_exec_mode='IDEMPOTENT';
> +start slave sql_thread;
> +set global slave_exec_mode='STRICT';
> +*** conspire the following insert failure
> +*** conspire future problem
> +delete from ti1 where b=3;
> +insert into ti2 set a=3, b=3 /* offending write event */;
> +*** slave must stop (Trying to insert an invalid foreign key)
> +Last_SQL_Error
> +1452
> +select * from ti2 order by b /* must be (2,2) */;
> +a	b
> +2	2
> +set foreign_key_checks= 0;
> +insert into ti1 set b=3;
> +set foreign_key_checks= 1;
> +set global slave_exec_mode='IDEMPOTENT';
> +start slave sql_thread;
> +set global slave_exec_mode='STRICT';
> +select * from ti2 order by b /* must be (2,2),(3,3) */;
> +a	b
> +2	2
> +3	3
> +*** other errors
> +*** conspiring query
> +insert into ti1 set b=1;
> +insert into ti1 set b=1 /* offending write event */;
> +*** slave must stop (Trying to insert a dupliacte key)
> +Last_SQL_Error
> +1062
> +set foreign_key_checks= 0;
> +delete from ti1 where b=1;
> +set foreign_key_checks= 1;
> +set global slave_exec_mode='IDEMPOTENT';
> +start slave sql_thread;
> +set global slave_exec_mode='STRICT';
> +CREATE TABLE t1 (a INT PRIMARY KEY);
> +CREATE TABLE t2 (a INT);
> +INSERT INTO t1 VALUES (-1),(-2),(-3);
> +INSERT INTO t2 VALUES (-1),(-2),(-3);
> +DELETE FROM t1 WHERE a = -2;
> +DELETE FROM t2 WHERE a = -2;
> +DELETE FROM t1 WHERE a = -2;
> +*** slave must stop (Key was not found)
> +Last_SQL_Error
> +1032
> +set global slave_exec_mode='IDEMPOTENT';
> +start slave sql_thread;
> +set global slave_exec_mode='STRICT';
> +DELETE FROM t2 WHERE a = -2;
> +*** slave must stop (Key was not found)
> +Last_SQL_Error
> +1032
> +set global slave_exec_mode='IDEMPOTENT';
> +start slave sql_thread;
> +set global slave_exec_mode='STRICT';
> +UPDATE t1 SET a = 1 WHERE a = -1;
> +UPDATE t2 SET a = 1 WHERE a = -1;
> +UPDATE t1 SET a = 1 WHERE a = -1;
> +*** slave must stop (Key was not found)
> +Last_SQL_Error
> +1032
> +set global slave_exec_mode='IDEMPOTENT';
> +start slave sql_thread;
> +set global slave_exec_mode='STRICT';
> +UPDATE t2 SET a = 1 WHERE a = -1;
> +*** slave must stop (Key was not found)
> +Last_SQL_Error
> +1032
> +set global slave_exec_mode='IDEMPOTENT';
> +start slave sql_thread;
> +SET @@global.slave_exec_mode= @old_slave_exec_mode;
> +drop table t1,t2,ti2,ti1;
> +set @@global.slave_exec_mode= @old_slave_exec_mode;
> +set @@global.slave_rows_search_algorithms= @saved_slave_rows_search_algorithms;
>  *** end of tests
> 
> === added file 'mysql-test/suite/rpl/t/rpl_row_hash_scan.test'
> --- a/mysql-test/suite/rpl/t/rpl_row_hash_scan.test	1970-01-01 00:00:00 +0000
> +++ b/mysql-test/suite/rpl/t/rpl_row_hash_scan.test	2010-11-23 00:08:01 +0000
> @@ -0,0 +1,218 @@
> +-- source include/master-slave.inc
> +-- source include/have_binlog_format_row.inc
> +
> +#
> +# Test cases for WL#5597
> +#
> +# In this file, we only test for the following cases:
> +
> +#
> +# CASE #1: update/delete multiple records from a table that share the
> +#          same hashtable key (in slave HASH_SCAN algorithm).
> +
> +#
> +# CASE #2: same as CASE #1, but the reason is that the master has more
> +#          columns than the slave, thence duplicate keys in slave's
> +#          hashtable are a side effect, but should not be a problem.
> +
> +#
> +# CASE #3: the slave stops gracefully when it is updating a row that
> +#          does not exist on its table.
> +
> +#
> +# CASE #4: update/delete multiple records with blobs. Given that blobs
> +#          are not included in hashing, some records keys will collide.
> +
> +#
> +# CASE #5: update/delete tables with only blob columns.
> +# 
> +
> +-- connection slave
> +call mtr.add_suppression(" Slave: Can't find record in 't1' Error_code: 1032");
> +
> +SET @saved_slave_rows_search_algorithms= @@global.slave_rows_search_algorithms;
> +SET GLOBAL slave_rows_search_algorithms= 'INDEX_SCAN,HASH_SCAN';
> +
> +#
> +# CASE #1: entries that generating the same key for the slave internal
> +#          hash table.
> +#
> +# ASSERTS that no updates are lost due to having multiple entries for
> +#         the same hashtable key in the slave HASH_SCAN.
> +#
> +
> +-- connection master
> +CREATE TABLE t1 (a INT);
> +INSERT INTO t1 VALUES (1), (1), (2), (3);
> +-- sync_slave_with_master
> +DELETE FROM t1;
> +
> +# try to change the order of the rows in the engine.
> +INSERT INTO t1 VALUES (2), (1), (3), (1);
> +
> +-- connection master
> +UPDATE t1 SET a=1000 WHERE a=1;
> +-- sync_slave_with_master
> +
> +-- let $diff_table_1=master:test.t1
> +-- let $diff_table_2=master:test.t1
> +-- source include/diff_tables.inc
> +
> +-- connection master
> +DELETE FROM t1 WHERE a=1000;
> +DELETE FROM t1 WHERE a=2 OR a=3;
> +-- sync_slave_with_master
> +
> +-- let $diff_table_1=master:test.t1
> +-- let $diff_table_2=slave:test.t1
> +-- source include/diff_tables.inc
> +
> +-- source include/master-slave-reset.inc
> +
> +# CASE #2: entries generating the same key for the slave internal
> +#          hashtable because master table has more columns than the
> +#          slave's.
> +#
> +# ASSERTS that no updates are lost due to having multiple entries for
> +#         the same hashtable key in the slave HASH_SCAN when master
> +#         has more tables than the slave.
> +
> +-- connection master
> +
> +SET SQL_LOG_BIN=0;
> +CREATE TABLE t1 (a INT, b INT);
> +SET SQL_LOG_BIN=1;
> +-- connection slave
> +CREATE TABLE t1 (a INT);
> +-- connection master
> +INSERT INTO t1 VALUES (1,1), (1,2), (2,1), (2,2);
> +UPDATE t1 SET a=1000 WHERE a=1;
> +
> +SELECT * FROM t1;
> +-- sync_slave_with_master
> +SELECT * FROM t1;
> +
> +-- connection master
> +DELETE FROM t1 WHERE a=1000;
> +DELETE FROM t1 WHERE a=2;
> +SELECT * FROM t1;
> +-- sync_slave_with_master
> +SELECT * FROM t1;
> +
> +-- source include/master-slave-reset.inc
> +
> +#
> +# CASE #3: The master updates and deletes some row that the slave does
> +#          not have.
> +#
> +# ASSERTS that the slave shall fail gracefully when the row is not found.
> +#
> +
> +-- connection master
> +
> +CREATE TABLE t1 (a INT);
> +INSERT INTO t1 VALUES (1), (1), (2), (3);
> +-- sync_slave_with_master
> +DELETE FROM t1 WHERE a=1;
> +DELETE FROM t1 WHERE a=2;
> +
> +-- connection master
> +UPDATE t1 SET a=1000 WHERE a=1;
> +
> +-- let $slave_sql_errno= 1032
> +-- source include/wait_for_slave_sql_error_and_skip.inc
> +
> +-- connection master
> +DELETE FROM t1 WHERE a=2;
> +-- let $slave_sql_errno= 1032
> +-- source include/wait_for_slave_sql_error_and_skip.inc
> +DROP TABLE t1;
> +-- sync_slave_with_master
> +
> +-- source include/master-slave-reset.inc
> +
> +#
> +# CASE #4: covers the case of tables that have blobs in them.
> +#
> +# ASSERTS that there are no lost updates
> +
> +-- connection master
> +
> +CREATE TABLE t1 (a INT, b TINYBLOB);
> +INSERT INTO t1 VALUES (1,'a'), (1, 'b'), (2,'aa'), (2, 'aa');
> +
> +UPDATE t1 SET b='c' WHERE a=1;
> +-- sync_slave_with_master
> +-- let $diff_table_1=master:test.t1
> +-- let $diff_table_2=slave:test.t1
> +-- source include/diff_tables.inc
> +
> +-- connection master
> +
> +UPDATE t1 SET a=10000 WHERE b='aa';
> +-- sync_slave_with_master
> +
> +-- let $diff_table_1=master:test.t1
> +-- let $diff_table_2=slave:test.t1
> +-- source include/diff_tables.inc
> +
> +-- connection master
> +
> +UPDATE t1 SET b='c' WHERE b='aa';
> +-- sync_slave_with_master
> +
> +-- let $diff_table_1=master:test.t1
> +-- let $diff_table_2=slave:test.t1
> +-- source include/diff_tables.inc
> +
> +-- connection master
> +
> +DELETE FROM t1 WHERE b='c';
> +-- sync_slave_with_master
> +
> +-- let $diff_table_1=master:test.t1
> +-- let $diff_table_2=slave:test.t1
> +-- source include/diff_tables.inc
> +
> +-- source include/master-slave-reset.inc
> +
> +#
> +# CASE #5: covers the case in which the table has only blobs in it.
> +#
> +# ASSERTS that there are no issues even if blobs are skipped from the
> +#         hashing. Tables on master and slave will not go out-of-sync.
> +#
> +
> +-- connection master
> +
> +CREATE TABLE t1 (a TINYBLOB, b TINYBLOB);
> +INSERT INTO t1 VALUES ('a','a'), ('b', 'b'), ('a','aa'), ('a', 'aa');
> +
> +UPDATE t1 SET b='c' WHERE b='aa';
> +-- sync_slave_with_master
> +
> +-- let $diff_table_1=master:test.t1
> +-- let $diff_table_2=slave:test.t1
> +-- source include/diff_tables.inc
> +
> +-- connection master
> +
> +DELETE FROM t1;
> +-- sync_slave_with_master
> +
> +-- let $diff_table_1=master:test.t1
> +-- let $diff_table_2=slave:test.t1
> +-- source include/diff_tables.inc
> +
> +-- connection master
> +INSERT INTO t1 VALUES (NULL,NULL), (NULL, NULL);
> +DELETE FROM t1;
> +
> +-- sync_slave_with_master
> +-- let $diff_table_1=master:test.t1
> +-- let $diff_table_2=slave:test.t1
> +-- source include/diff_tables.inc
> +
> +-- source include/master-slave-reset.inc
> +
> +SET @@global.slave_rows_search_algorithms= @saved_slave_rows_search_algorithms;
> \ No newline at end of file
> 
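[Reviewer side note: CASE #2 above relies on distinct master rows hashing to the same slave key. A minimal C++ sketch of that collision, under my reading of the patch; `hash_key` and `worst_collision` are illustrative names of mine, not the server code:]

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

// Illustrative only: a master row is keyed by the columns it shares
// with the slave's table definition.
typedef std::vector<int> Row;

// Build a hash key from the first n_shared columns of a master row.
std::string hash_key(const Row &row, std::size_t n_shared)
{
  std::string key;
  for (std::size_t i= 0; i < n_shared && i < row.size(); i++)
    key+= std::to_string(row[i]) + ",";
  return key;
}

// Largest number of master rows that collapse onto a single slave key.
std::size_t worst_collision(const std::vector<Row> &rows,
                            std::size_t n_shared)
{
  std::unordered_multimap<std::string, Row> h;
  for (const Row &r : rows)
    h.insert({hash_key(r, n_shared), r});
  std::size_t worst= 0;
  for (const auto &e : h)
    worst= std::max(worst, h.count(e.first));
  return worst;
}
```

[With the CASE #2 data (1,1),(1,2),(2,1),(2,2) and one shared column, two entries land under each key; that is why the applier needs a multimap-style bucket and must pop exactly one matching entry per row event instead of treating the key as unique.]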
> === added file 'mysql-test/suite/rpl/t/rpl_row_hash_scan_sanity.test'
> --- a/mysql-test/suite/rpl/t/rpl_row_hash_scan_sanity.test	1970-01-01 00:00:00 +0000
> +++ b/mysql-test/suite/rpl/t/rpl_row_hash_scan_sanity.test	2010-11-23 00:53:54 +0000
> @@ -0,0 +1,172 @@
> +-- source include/master-slave.inc
> +-- source include/have_binlog_format_row.inc
> +-- source include/have_debug.inc
> +
> +#
> +# WL#5597 tests
> +#
> +# These tests check whether the correct algorithm for searching the
> +# rows was chosen, depending on the setting of
> +# @@global.slave_rows_search_algorithms and the table definition.
> +#
> +# We test all combinations, but leave out the disallowed ones:
> +# - @@global.slave_rows_search_algorithms= ''
> +# - @@global.slave_rows_search_algorithms= 'INDEX_SCAN'
> +#
> +# We do not allow setting only INDEX_SCAN or the empty value.
> +#
> +
> +-- connection master
> +
> +CREATE TABLE t1 (c1 INT);
> +CREATE TABLE t2 (c1 INT PRIMARY KEY);
> +CREATE TABLE t3 (c1 INT UNIQUE KEY);
> +CREATE TABLE t4 (c1 INT KEY);
> +
> +INSERT INTO t1 VALUES (1);
> +INSERT INTO t2 VALUES (1);
> +INSERT INTO t3 VALUES (1);
> +INSERT INTO t4 VALUES (1);
> +
> +-- sync_slave_with_master
> +SET @saved_slave_rows_search_algorithms= @@global.slave_rows_search_algorithms;
> +
> +###################### TABLE_SCAN assertions
> +
> +-- connection slave
> +SET @@global.slave_rows_search_algorithms= 'TABLE_SCAN';
> +-- connection master
> +
> +UPDATE t1 SET c1= 2;
> +-- let $expected_alg= 'TABLE_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t2 SET c1= 2;
> +-- let $expected_alg= 'TABLE_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t3 SET c1= 2;
> +-- let $expected_alg= 'TABLE_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t4 SET c1= 2;
> +-- let $expected_alg= 'TABLE_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +###################### TABLE_SCAN,INDEX_SCAN
> +
> +-- connection slave
> +SET @@global.slave_rows_search_algorithms= 'TABLE_SCAN,INDEX_SCAN';
> +-- connection master
> +
> +UPDATE t1 SET c1= 3;
> +-- let $expected_alg= 'TABLE_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t2 SET c1= 3;
> +-- let $expected_alg= 'INDEX_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t3 SET c1= 3;
> +-- let $expected_alg= 'INDEX_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t4 SET c1= 3;
> +-- let $expected_alg= 'INDEX_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +###################### TABLE_SCAN,HASH_SCAN
> +
> +-- connection slave
> +SET @@global.slave_rows_search_algorithms= 'TABLE_SCAN,HASH_SCAN';
> +-- connection master
> +
> +UPDATE t1 SET c1= 4;
> +-- let $expected_alg= 'HASH_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t2 SET c1= 4;
> +-- let $expected_alg= 'HASH_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t3 SET c1= 4;
> +-- let $expected_alg= 'HASH_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t4 SET c1= 4;
> +-- let $expected_alg= 'HASH_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +###################### HASH_SCAN
> +
> +-- connection slave
> +SET @@global.slave_rows_search_algorithms= 'HASH_SCAN';
> +-- connection master
> +
> +UPDATE t1 SET c1= 5;
> +-- let $expected_alg= 'HASH_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t2 SET c1= 5;
> +-- let $expected_alg= 'HASH_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t3 SET c1= 5;
> +-- let $expected_alg= 'HASH_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t4 SET c1= 5;
> +-- let $expected_alg= 'HASH_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +###################### HASH_SCAN,INDEX_SCAN
> +
> +-- connection slave
> +SET @@global.slave_rows_search_algorithms= 'HASH_SCAN,INDEX_SCAN';
> +-- connection master
> +
> +UPDATE t1 SET c1= 6;
> +-- let $expected_alg= 'HASH_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t2 SET c1= 6;
> +-- let $expected_alg= 'INDEX_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t3 SET c1= 6;
> +-- let $expected_alg= 'INDEX_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t4 SET c1= 6;
> +-- let $expected_alg= 'INDEX_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +###################### HASH_SCAN,INDEX_SCAN,TABLE_SCAN
> +
> +-- connection slave
> +SET @@global.slave_rows_search_algorithms= 'HASH_SCAN,INDEX_SCAN,TABLE_SCAN';
> +-- connection master
> +
> +UPDATE t1 SET c1= 7;
> +-- let $expected_alg= 'HASH_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t2 SET c1= 7;
> +-- let $expected_alg= 'INDEX_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t3 SET c1= 7;
> +-- let $expected_alg= 'INDEX_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +UPDATE t4 SET c1= 7;
> +-- let $expected_alg= 'INDEX_SCAN'
> +-- source include/rpl_hash_scan_assertion.inc
> +
> +-- connection slave
> +
> +SET @@global.slave_rows_search_algorithms= @saved_slave_rows_search_algorithms;
> +-- source include/master-slave-reset.inc
> +
> +-- source include/master-slave-end.inc
> +
> 
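[Reviewer side note: the sanity test above fully pins down the expected choice. A sketch of that selection as I read it; the mask/has_key shape is illustrative, not the actual signature of decide_row_lookup_algorithm:]

```cpp
#include <cassert>

// Illustrative sketch of the algorithm choice the assertions above
// exercise; names mirror the patch but the logic is only my reading.
enum row_lookup_alg { TABLE_SCAN= 1, INDEX_SCAN= 2, HASH_SCAN= 4 };

// allowed: mask from @@slave_rows_search_algorithms.
// has_key: the slave table has some key (primary, unique or plain).
row_lookup_alg decide_row_lookup_algorithm(unsigned allowed, bool has_key)
{
  if (has_key && (allowed & INDEX_SCAN))
    return INDEX_SCAN;                 // a usable key beats scanning
  if (allowed & HASH_SCAN)
    return HASH_SCAN;                  // even keyed tables hash-scan here
  return TABLE_SCAN;                   // always possible as a last resort
}
```

[If this reading is right, the choice depends only on the table definition and the variable, which is also why the check per row in do_index_scan_and_update (request 3) can move to do_before_row_operations.]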
> === modified file 'mysql-test/suite/rpl/t/rpl_row_idempotency.test'
> --- a/mysql-test/suite/rpl/t/rpl_row_idempotency.test	2010-01-12 17:52:02 +0000
> +++ b/mysql-test/suite/rpl/t/rpl_row_idempotency.test	2010-11-23 00:08:01 +0000
> @@ -13,319 +13,17 @@ call mtr.add_suppression("Slave: Can't f
>  call mtr.add_suppression("Slave: Cannot delete or update a parent row: a foreign key constraint fails .* Error_code: 1451");
>  call mtr.add_suppression("Slave: Cannot add or update a child row: a foreign key constraint fails .* Error_code: 1452");
>  
> +set @saved_slave_rows_search_algorithms= @@global.slave_rows_search_algorithms;
>  
> -# bug#31609 Not all RBR slave errors reported as errors
> -# bug#31552 Replication breaks when deleting rows from out-of-sync table
> -#           without PK
> +SET GLOBAL slave_rows_search_algorithms= 'INDEX_SCAN,TABLE_SCAN';
> +-- source extra/rpl_tests/rpl_row_idempotency.test
>  
> -# The default for slave-exec-mode option and server
> -# variable slave_exec_mode  is 'STRICT'.
> -# When 'STRICT' mode is set, the slave SQL thread will stop whenever
> -# the row to change is not found. In 'IDEMPOTENT' mode, the SQL thread
> -# will continue running and apply the row - replace if it's Write_rows event -
> -# or skip to the next event.
> +-- source include/master-slave-reset.inc
>  
> -# the previous part of the tests was with IDEMPOTENT slave's mode.
> -
> -
> -#
> -# Other than above idempotent errors dealing with foreign keys constraint
> -#
> -connection slave;
> -
> -set @old_slave_exec_mode= @@global.slave_exec_mode;
> -set @@global.slave_exec_mode= IDEMPOTENT;
> -
> -connection master;
> -
> -create table ti1 (b int primary key) engine = innodb;
> -create table ti2 (a int primary key, b int, foreign key (b) references ti1(b))
> -     engine = innodb;
> -set foreign_key_checks=1 /* ensure the check */;
> -
> -insert into ti1 values (1),(2),(3);
> -insert into ti2 set a=2, b=2;
> -
> -sync_slave_with_master;
> -
> -#connection slave;
> -select * from ti1 order by b /* must be (1),(2),(3) */;
> -insert into ti2 set a=1, b=1;
> -select * from ti2 order by b /* must be (1,1) (2,2) */;
> -
> -connection master;
> -
> -# from now on checking rbr specific idempotent errors
> -set @save_binlog_format= @@session.binlog_format;
> -set @@session.binlog_format= row;
> -delete from ti1 where b=1;
> -
> -select * from ti1 order by b /* must be (2),(3) */;
> -
> -# slave must catch up (expect some warnings in error.log)
> -sync_slave_with_master;
> -
> -#connection slave;
> -select * from ti1 order by b /* must stays as were on master (1),(2),(3) */;
> -
> -delete from ti1 where b=3;
> -
> -connection master;
> -insert into ti2 set a=3, b=3;
> -
> -# slave must catch up (expect some warnings in error.log)
> -sync_slave_with_master;
> -
> -#connection slave;
> -select * from ti2 order by b /* must be (1,1),(2,2) - not inserted */;
> -
> -
> -#
> -# Checking the new global sys variable
> -#
> -
> -connection slave;
> -
> -set global slave_exec_mode='IDEMPOTENT';
> -set global slave_exec_mode='STRICT';
> -
> -# checking mutual exclusion for the options
> ---error ER_WRONG_VALUE_FOR_VAR
> -set global slave_exec_mode='IDEMPOTENT,STRICT';
> -
> -select @@global.slave_exec_mode /* must be STRICT */;
> -
> -#
> -# Checking stops.
> -# In the following sections strict slave sql thread is going to
> -# stop when faces an idempotent error. In order to proceed
> -# the mode is temporarily switched to indempotent.
> -#
> -
> -#
> ---echo *** foreign keys errors as above now forces to stop
> -#
> -
> -connection master;
> -
> -set foreign_key_checks=0;
> -drop table ti2, ti1;
> -
> -create table ti1 (b int primary key) engine = innodb;
> -create table ti2 (a int primary key, b int, foreign key (b) references ti1(b))
> -     engine = innodb;
> -set foreign_key_checks=1 /* ensure the check */;
> -
> -insert into ti1 values (1),(2),(3);
> -insert into ti2 set a=2, b=2;
> -
> -sync_slave_with_master;
> -
> -#connection slave;
> -select * from ti1 order by b /* must be (1),(2),(3) */;
> ---echo *** conspire future problem
> -insert into ti2 set a=1, b=1;
> -select * from ti2 order by b /* must be (1,1) (2,2) */;
> -
> -connection master;
> -
> -delete from ti1 where b=1 /* offending delete event */;
> -select * from ti1 order by b /* must be (2),(3) */;
> -
> -# foreign key: row is referenced
> -
> ---echo *** slave must stop (Trying to delete a referenced foreing key)
> -connection slave;
> -source include/wait_for_slave_sql_to_stop.inc;
> -
> -let $last_error = query_get_value("SHOW SLAVE STATUS", Last_SQL_Errno, 1);
> -disable_query_log;
> -eval SELECT "$last_error" AS Last_SQL_Error;
> -enable_query_log;
> -
> -select * from ti1 order by b /* must be (1),(2),(3) - not deleted */;
> -set foreign_key_checks= 0;
> -delete from ti2 where b=1;
> -set foreign_key_checks= 1;
> -set global slave_exec_mode='IDEMPOTENT';
> -start slave sql_thread;
> -connection master;
> -sync_slave_with_master;
> -#connection slave;
> -set global slave_exec_mode='STRICT';
> -
> -connection master;
> -
> -sync_slave_with_master;
> -
> -#connection slave;
> ---echo *** conspire the following insert failure
> -# foreign key: no referenced row
> -
> ---echo *** conspire future problem
> -delete from ti1 where b=3;
> -
> -connection master;
> -insert into ti2 set a=3, b=3 /* offending write event */;
> -
> ---echo *** slave must stop (Trying to insert an invalid foreign key)
> -connection slave;
> -source include/wait_for_slave_sql_to_stop.inc;
> -
> -let $last_error = query_get_value("SHOW SLAVE STATUS", Last_SQL_Errno, 1);
> -disable_query_log;
> -eval SELECT "$last_error" AS Last_SQL_Error;
> -enable_query_log;
> -
> -select * from ti2 order by b /* must be (2,2) */;
> -set foreign_key_checks= 0;
> -insert into ti1 set b=3;
> -set foreign_key_checks= 1;
> -set global slave_exec_mode='IDEMPOTENT';
> -start slave sql_thread;
> -connection master;
> -sync_slave_with_master;
> -#connection slave;
> -set global slave_exec_mode='STRICT';
> -
> -connection master;
> -
> -sync_slave_with_master;
> -
> -select * from ti2 order by b /* must be (2,2),(3,3) */;
> -
> -# 
> ---echo *** other errors
> -# 
> -
> -# dup key insert
> -
> -#connection slave;
> ---echo *** conspiring query
> -insert into ti1 set b=1;
> -
> -connection master;
> -insert into ti1 set b=1 /* offending write event */;
> -
> ---echo *** slave must stop (Trying to insert a dupliacte key)
> -connection slave;
> -source include/wait_for_slave_sql_to_stop.inc;
> -
> -let $last_error = query_get_value("SHOW SLAVE STATUS", Last_SQL_Errno, 1);
> -disable_query_log;
> -eval SELECT "$last_error" AS Last_SQL_Error;
> -enable_query_log;
> -
> -set foreign_key_checks= 0;
> -delete from ti1 where b=1;
> -set foreign_key_checks= 1;
> -set global slave_exec_mode='IDEMPOTENT';
> -start slave sql_thread;
> -connection master;
> -sync_slave_with_master;
> -#connection slave;
> -set global slave_exec_mode='STRICT';
> -
> -# key not found
> -
> -connection master;
> -
> -CREATE TABLE t1 (a INT PRIMARY KEY);
> -CREATE TABLE t2 (a INT);
> -INSERT INTO t1 VALUES (-1),(-2),(-3);
> -INSERT INTO t2 VALUES (-1),(-2),(-3);
> -sync_slave_with_master;
> -
> -#connection slave;
> -DELETE FROM t1 WHERE a = -2;
> -DELETE FROM t2 WHERE a = -2;
> -connection master;
> -DELETE FROM t1 WHERE a = -2;
> -
> ---echo *** slave must stop (Key was not found)
> -connection slave;
> -source include/wait_for_slave_sql_to_stop.inc;
> -
> -let $last_error = query_get_value("SHOW SLAVE STATUS", Last_SQL_Errno, 1);
> -disable_query_log;
> -eval SELECT "$last_error" AS Last_SQL_Error;
> -enable_query_log;
> -
> -set global slave_exec_mode='IDEMPOTENT';
> -start slave sql_thread;
> -connection master;
> -sync_slave_with_master;
> -#connection slave;
> -set global slave_exec_mode='STRICT';
> -
> -connection master;
> -DELETE FROM t2 WHERE a = -2; 
> ---echo *** slave must stop (Key was not found)
> -connection slave;
> -source include/wait_for_slave_sql_to_stop.inc;
> -
> -let $last_error = query_get_value("SHOW SLAVE STATUS", Last_SQL_Errno, 1);
> -disable_query_log;
> -eval SELECT "$last_error" AS Last_SQL_Error;
> -enable_query_log;
> -
> -set global slave_exec_mode='IDEMPOTENT';
> -start slave sql_thread;
> -connection master;
> -sync_slave_with_master;
> -#connection slave;
> -set global slave_exec_mode='STRICT';
> -
> -UPDATE t1 SET a = 1 WHERE a = -1;
> -UPDATE t2 SET a = 1 WHERE a = -1;
> -
> -connection master;
> -UPDATE t1 SET a = 1 WHERE a = -1;
> -
> ---echo *** slave must stop (Key was not found)
> -connection slave;
> -source include/wait_for_slave_sql_to_stop.inc;
> -
> -let $last_error = query_get_value("SHOW SLAVE STATUS", Last_SQL_Errno, 1);
> -disable_query_log;
> -eval SELECT "$last_error" AS Last_SQL_Error;
> -enable_query_log;
> -
> -set global slave_exec_mode='IDEMPOTENT';
> -start slave sql_thread;
> -connection master;
> -sync_slave_with_master;
> -#connection slave;
> -set global slave_exec_mode='STRICT';
> -
> -
> -connection master;
> -UPDATE t2 SET a = 1 WHERE a = -1;
> -
> ---echo *** slave must stop (Key was not found)
> -connection slave;
> -source include/wait_for_slave_sql_to_stop.inc;
> -
> -let $last_error = query_get_value("SHOW SLAVE STATUS", Last_SQL_Errno, 1);
> -disable_query_log;
> -eval SELECT "$last_error" AS Last_SQL_Error;
> -enable_query_log;
> -
> -set global slave_exec_mode='IDEMPOTENT';
> -start slave sql_thread;
> -connection master;
> -sync_slave_with_master;
> -#connection slave;
> -SET @@global.slave_exec_mode= @old_slave_exec_mode;
> -
> -# cleanup for bug#31609 tests
> -
> -connection master;
> -
> -drop table t1,t2,ti2,ti1;
> -sync_slave_with_master;
> -set @@global.slave_exec_mode= @old_slave_exec_mode;
> +SET GLOBAL slave_rows_search_algorithms= 'INDEX_SCAN,HASH_SCAN';
> +-- source extra/rpl_tests/rpl_row_idempotency.test
>  
> +set @@global.slave_rows_search_algorithms= @saved_slave_rows_search_algorithms;
>  --source include/master-slave-end.inc
>  
>  --echo *** end of tests
> 
> === modified file 'mysql-test/suite/sys_vars/r/all_vars.result'
> --- a/mysql-test/suite/sys_vars/r/all_vars.result	2010-10-11 18:03:03 +0000
> +++ b/mysql-test/suite/sys_vars/r/all_vars.result	2010-11-22 21:10:41 +0000
> @@ -10,11 +10,13 @@ There should be *no* long test name list
>  select variable_name as `There should be *no* variables listed below:` from t2
>  left join t1 on variable_name=test_name where test_name is null;
>  There should be *no* variables listed below:
> +SLAVE_ROWS_SEARCH_ALGORITHMS
>  INNODB_MONITOR_COUNTER_RESET
>  INNODB_MONITOR_COUNTER_RESET_ALL
>  INNODB_MONITOR_COUNTER_ON
>  INNODB_MONITOR_COUNTER_OFF
>  INNODB_FILE_FORMAT_MAX
> +SLAVE_ROWS_SEARCH_ALGORITHMS
>  INNODB_MONITOR_COUNTER_RESET
>  INNODB_MONITOR_COUNTER_RESET_ALL
>  INNODB_MONITOR_COUNTER_ON
> 
> === added file 'mysql-test/suite/sys_vars/r/slave_rows_search_algorithms_basic.result'
> --- a/mysql-test/suite/sys_vars/r/slave_rows_search_algorithms_basic.result	1970-01-01 00:00:00 +0000
> +++ b/mysql-test/suite/sys_vars/r/slave_rows_search_algorithms_basic.result	2010-11-23 00:08:01 +0000
> @@ -0,0 +1,51 @@
> +set @saved_slave_rows_search_algorithms = @@global.slave_rows_search_algorithms;
> +SELECT @@global.slave_rows_search_algorithms;
> +@@global.slave_rows_search_algorithms
> +TABLE_SCAN,INDEX_SCAN
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='TABLE_SCAN';
> +SELECT @@global.slave_rows_search_algorithms;
> +@@global.slave_rows_search_algorithms
> +TABLE_SCAN
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='HASH_SCAN';
> +SELECT @@global.slave_rows_search_algorithms;
> +@@global.slave_rows_search_algorithms
> +HASH_SCAN
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='INDEX_SCAN';
> +SELECT @@global.slave_rows_search_algorithms;
> +@@global.slave_rows_search_algorithms
> +INDEX_SCAN
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='TABLE_SCAN,HASH_SCAN';
> +SELECT @@global.slave_rows_search_algorithms;
> +@@global.slave_rows_search_algorithms
> +TABLE_SCAN,HASH_SCAN
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='TABLE_SCAN,HASH_SCAN,INDEX_SCAN';
> +SELECT @@global.slave_rows_search_algorithms;
> +@@global.slave_rows_search_algorithms
> +TABLE_SCAN,INDEX_SCAN,HASH_SCAN
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='TABLE_SCAN,INDEX_SCAN';
> +SELECT @@global.slave_rows_search_algorithms;
> +@@global.slave_rows_search_algorithms
> +TABLE_SCAN,INDEX_SCAN
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='HASH_SCAN,INDEX_SCAN';
> +SELECT @@global.slave_rows_search_algorithms;
> +@@global.slave_rows_search_algorithms
> +INDEX_SCAN,HASH_SCAN
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='TABLE_5CAN';
> +ERROR 42000: Variable 'slave_rows_search_algorithms' can't be set to the value of 'TABLE_5CAN'
> +SELECT @@global.slave_rows_search_algorithms;
> +@@global.slave_rows_search_algorithms
> +INDEX_SCAN,HASH_SCAN
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='';
> +ERROR 42000: Variable 'slave_rows_search_algorithms' can't be set to the value of ''
> +SELECT @@global.slave_rows_search_algorithms;
> +@@global.slave_rows_search_algorithms
> +INDEX_SCAN,HASH_SCAN
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='1';
> +ERROR 42000: Variable 'slave_rows_search_algorithms' can't be set to the value of '1'
> +SELECT @@global.slave_rows_search_algorithms;
> +@@global.slave_rows_search_algorithms
> +INDEX_SCAN,HASH_SCAN
> +set global slave_rows_search_algorithms = @saved_slave_rows_search_algorithms;
> +SELECT @@global.slave_rows_search_algorithms;
> +@@global.slave_rows_search_algorithms
> +TABLE_SCAN,INDEX_SCAN
> 
> === added file 'mysql-test/suite/sys_vars/t/slave_rows_search_algorithms_basic.test'
> --- a/mysql-test/suite/sys_vars/t/slave_rows_search_algorithms_basic.test	1970-01-01 00:00:00 +0000
> +++ b/mysql-test/suite/sys_vars/t/slave_rows_search_algorithms_basic.test	2010-11-23 00:08:01 +0000
> @@ -0,0 +1,44 @@
> +--source include/not_embedded.inc
> +
> +set @saved_slave_rows_search_algorithms = @@global.slave_rows_search_algorithms;
> +
> +
> +SELECT @@global.slave_rows_search_algorithms;
> +
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='TABLE_SCAN';
> +SELECT @@global.slave_rows_search_algorithms;
> +
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='HASH_SCAN';
> +SELECT @@global.slave_rows_search_algorithms;
> +
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='INDEX_SCAN';
> +SELECT @@global.slave_rows_search_algorithms;
> +
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='TABLE_SCAN,HASH_SCAN';
> +SELECT @@global.slave_rows_search_algorithms;
> +
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='TABLE_SCAN,HASH_SCAN,INDEX_SCAN';
> +SELECT @@global.slave_rows_search_algorithms;
> +
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='TABLE_SCAN,INDEX_SCAN';
> +SELECT @@global.slave_rows_search_algorithms;
> +
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='HASH_SCAN,INDEX_SCAN';
> +SELECT @@global.slave_rows_search_algorithms;
> +
> +
> +# checking that setting variable to a non existing value raises error
> +--error ER_WRONG_VALUE_FOR_VAR
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='TABLE_5CAN';
> +SELECT @@global.slave_rows_search_algorithms;
> +
> +--error ER_WRONG_VALUE_FOR_VAR
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='';
> +SELECT @@global.slave_rows_search_algorithms;
> +
> +--error ER_WRONG_VALUE_FOR_VAR
> +SET GLOBAL SLAVE_ROWS_SEARCH_ALGORITHMS='1';
> +SELECT @@global.slave_rows_search_algorithms;
> +
> +set global slave_rows_search_algorithms = @saved_slave_rows_search_algorithms;
> +SELECT @@global.slave_rows_search_algorithms;
> 
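[Reviewer side note: the basic test above fixes the validation behaviour ('', '1' and 'TABLE_5CAN' all raise ER_WRONG_VALUE_FOR_VAR). A hedged sketch of that check; `parse_search_algorithms` and the bit values are illustrative only, not the sys_vars implementation:]

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Illustrative SET-type validation mirroring the basic test above:
// the empty string, '1' and misspelled names fail; valid names OR
// together into a bitmask.
bool parse_search_algorithms(const std::string &value, unsigned *mask)
{
  *mask= 0;
  if (value.empty())
    return false;                     // '' => ER_WRONG_VALUE_FOR_VAR
  std::istringstream in(value);
  std::string item;
  while (std::getline(in, item, ','))
  {
    if (item == "TABLE_SCAN")      *mask|= 1;
    else if (item == "INDEX_SCAN") *mask|= 2;
    else if (item == "HASH_SCAN")  *mask|= 4;
    else
      return false;                   // e.g. 'TABLE_5CAN' or '1'
  }
  return true;
}
```

[Note this sketch accepts 'INDEX_SCAN' alone, as the basic result file above does; any extra restriction on that combination would sit on top of this parsing step.]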
> === modified file 'sql/log_event.cc'
> --- a/sql/log_event.cc	2010-10-17 23:27:40 +0000
> +++ b/sql/log_event.cc	2010-11-23 00:08:01 +0000
> @@ -7556,2165 +7556,2487 @@ int Rows_log_event::do_add_row_data(ucha
>  #endif
>  
>  #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> -int Rows_log_event::do_apply_event(Relay_log_info const *rli)
> -{
> -  DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)");
> -  int error= 0;
> -  /*
> -    If m_table_id == ~0UL, then we have a dummy event that does not
> -    contain any data.  In that case, we just remove all tables in the
> -    tables_to_lock list, close the thread tables, and return with
> -    success.
> -   */
> -  if (m_table_id == ~0UL)
> -  {
> -    /*
> -       This one is supposed to be set: just an extra check so that
> -       nothing strange has happened.
> -     */
> -    DBUG_ASSERT(get_flags(STMT_END_F));
>  
> -    const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
> -    thd->clear_error();
> -    DBUG_RETURN(0);
> -  }
>  
> -  /*
> -    'thd' has been set by exec_relay_log_event(), just before calling
> -    do_apply_event(). We still check here to prevent future coding
> -    errors.
> -  */
> -  DBUG_ASSERT(rli->info_thd == thd);
> +/**
> +  Checks if any of the columns in the given table is
> +  signaled in the bitmap.
>  
> -  /*
> -    If there is no locks taken, this is the first binrow event seen
> -    after the table map events.  We should then lock all the tables
> -    used in the transaction and proceed with execution of the actual
> -    event.
> -  */
> -  if (!thd->lock)
> -  {
> -    /*
> -      Lock_tables() reads the contents of thd->lex, so they must be
> -      initialized.
> +  For each column in the given table checks if it is
> +  signaled in the bitmap. This is most useful when deciding
> +  whether a before image (BI) can be used or not for
> +  searching a row. If no column is signaled, then the
> +  image cannot be used for searching a record (regardless
> +  of using position(), index scan or table scan). Here is
> +  an example:
>  
> -      We also call the mysql_reset_thd_for_next_command(), since this
> -      is the logical start of the next "statement". Note that this
> -      call might reset the value of current_stmt_binlog_format, so
> -      we need to do any changes to that value after this function.
> -    */
> -    lex_start(thd);
> -    mysql_reset_thd_for_next_command(thd);
> -    /*
> -      The current statement is just about to begin and 
> -      has not yet modified anything. Note, all.modified is reset
> -      by mysql_reset_thd_for_next_command.
> -    */
> -    thd->transaction.stmt.modified_non_trans_table= FALSE;
> -    /*
> -      This is a row injection, so we flag the "statement" as
> -      such. Note that this code is called both when the slave does row
> -      injections and when the BINLOG statement is used to do row
> -      injections.
> -    */
> -    thd->lex->set_stmt_row_injection();
> +  MASTER> SET @@binlog_row_image='MINIMAL';
> +  MASTER> CREATE TABLE t1 (a int, b int, c int, primary key(c));
> +  SLAVE> CREATE TABLE t1 (a int, b int);
> +  MASTER> INSERT INTO t1 VALUES (1,2,3);
> +  MASTER> UPDATE t1 SET a=2 WHERE b=2;
>  
> -    /*
> -      There are a few flags that are replicated with each row event.
> -      Make sure to set/clear them before executing the main body of
> -      the event.
> -    */
> -    if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
> -        thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
> -    else
> -        thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
> +  For the update statement only the PK (column c) is
> +  logged in the before image (BI). As such, given that
> +  the slave has no column c, it will not be able to
> +  find the row, because BI has no values for the columns
> +  the slave knows about (columns a and b).
>  
> -    if (get_flags(RELAXED_UNIQUE_CHECKS_F))
> -        thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
> -    else
> -        thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
> -    /* A small test to verify that objects have consistent types */
> -    DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
> +  @param table   the table reference on the slave.
> +  @param cols the bitmap signaling columns available in
> +                 the BI.
>  
> -    if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
> -    {
> -      uint actual_error= thd->stmt_da->sql_errno();
> -      if (thd->is_slave_error || thd->is_fatal_error)
> -      {
> -        /*
> -          Error reporting borrowed from Query_log_event with many excessive
> -          simplifications. 
> -          We should not honour --slave-skip-errors at this point as we are
> -          having severe errors which should not be skiped.
> -        */
> -        rli->report(ERROR_LEVEL, actual_error,
> -                    "Error executing row event: '%s'",
> -                    (actual_error ? thd->stmt_da->message() :
> -                     "unexpected success or fatal error"));
> -        thd->is_slave_error= 1;
> -      }
> -      const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
> -      DBUG_RETURN(actual_error);
> -    }
> +  @return TRUE if BI contains usable columns for searching,
> +          FALSE otherwise.
> +*/
> +static
> +my_bool is_any_column_signaled_for_table(TABLE *table, MY_BITMAP *cols)
> +{
> +  DBUG_ENTER("is_any_column_signaled_for_table");
>  
> -    /*
> -      When the open and locking succeeded, we check all tables to
> -      ensure that they still have the correct type.
> +  int nfields_set= 0;
> +  for (Field **ptr=table->field ;
> +       *ptr && ((*ptr)->field_index < cols->n_bits);
> +       ptr++)
> +  {
> +    if (bitmap_is_set(cols, (*ptr)->field_index))
> +      nfields_set++;
> +  }
>  
> -      We can use a down cast here since we know that every table added
> -      to the tables_to_lock is a RPL_TABLE_LIST.
> -    */
> +  DBUG_RETURN (nfields_set != 0);
> +}
>  
> -    {
> -      DBUG_PRINT("debug", ("Checking compability of tables to lock - tables_to_lock: %p",
> -                           rli->tables_to_lock));
> -      RPL_TABLE_LIST *ptr= rli->tables_to_lock;
> -      for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
> -      {
> -        TABLE *conv_table;
> -        if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
> -                                             ptr->table, &conv_table))
> -        {
> -          DBUG_PRINT("debug", ("Table: %s.%s is not compatible with master",
> -                               ptr->table->s->db.str,
> -                               ptr->table->s->table_name.str));
> -          /*
> -            We should not honour --slave-skip-errors at this point as we are
> -            having severe errors which should not be skiped.
> -          */
> -          thd->is_slave_error= 1;
> -          const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
> -          DBUG_RETURN(ERR_BAD_TABLE_DEF);
> -        }
> -        DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
> -                             " - conv_table: %p",
> -                             ptr->table->s->db.str,
> -                             ptr->table->s->table_name.str, conv_table));
> -        ptr->m_conv_table= conv_table;
> -      }
> -    }
> +/**
> +  Checks if the fields in the given key are signaled in
> +  the bitmap.
>  
> -    /*
> -      ... and then we add all the tables to the table map and but keep
> -      them in the tables to lock list.
> +  Validates whether the before image is usable for the
> +  given key. It can be the case that the before image
> +  does not contain values for the key (eg, master was
> +  using 'minimal' option for image logging and slave has
> +  different index structure on the table). Here is an
> +  example:
>  
> -      We also invalidate the query cache for all the tables, since
> -      they will now be changed.
> +  MASTER> SET @@binlog_row_image='MINIMAL';
> +  MASTER> CREATE TABLE t1 (a int, b int, c int, primary key(c));
> +  SLAVE> CREATE TABLE t1 (a int, b int, c int, key(a,c));
> +  MASTER> INSERT INTO t1 VALUES (1,2,3);
> +  MASTER> UPDATE t1 SET a=2 WHERE b=2;
>  
> -      TODO [/Matz]: Maybe the query cache should not be invalidated
> -      here? It might be that a table is not changed, even though it
> -      was locked for the statement.  We do know that each
> -      Rows_log_event contain at least one row, so after processing one
> -      Rows_log_event, we can invalidate the query cache for the
> -      associated table.
> -     */
> -    for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
> -    {
> -      const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
> -    }
> -#ifdef HAVE_QUERY_CACHE
> -    query_cache.invalidate_locked_for_write(rli->tables_to_lock);
> -#endif
> -  }
> +  When finding the row on the slave, one cannot use the
> +  index (a,c) to search for the row, because there is only
> +  data in the before image for column c. This function
> +  checks the fields needed for a given key and searches
> +  the bitmap to see if all the fields required are
> +  signaled.
>  
> -  TABLE* 
> -    table= 
> -    m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
> +  @param keyinfo  reference to key.
> +  @param cols     the bitmap signaling which columns
> +                  have available data.
>  
> -  DBUG_PRINT("debug", ("m_table: 0x%lx, m_table_id: %lu", (ulong) m_table, m_table_id));
> +  @return TRUE if all fields are signaled in the bitmap
> +          for the given key, FALSE otherwise.
> +*/
> +static
> +my_bool are_all_columns_signaled_for_key(KEY *keyinfo, MY_BITMAP *cols)
> +{
> +  DBUG_ENTER("are_all_columns_signaled_for_key");
>  
> -  if (table)
> +  for (uint i=0 ; i < keyinfo->key_parts ;i++)
>    {
> -    bool transactional_table= table->file->has_transactions();
> -    /*
> -      table == NULL means that this table should not be replicated
> -      (this was set up by Table_map_log_event::do_apply_event()
> -      which tested replicate-* rules).
> -    */
> +    uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
> +    if (fieldnr >= cols->n_bits ||
> +        !bitmap_is_set(cols, fieldnr))
> +      DBUG_RETURN(FALSE);
> +  }
>  
> -    /*
> -      It's not needed to set_time() but
> -      1) it continues the property that "Time" in SHOW PROCESSLIST shows how
> -      much slave is behind
> -      2) it will be needed when we allow replication from a table with no
> -      TIMESTAMP column to a table with one.
> -      So we call set_time(), like in SBR. Presently it changes nothing.
> -    */
> -    thd->set_time((time_t)when);
> +  DBUG_RETURN(TRUE);
> +}
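Just to make the intent of are_all_columns_signaled_for_key() explicit, the check reduces to the following stand-alone sketch. STL containers stand in for KEY and MY_BITMAP here, so the names and types are mine, not the patch's:

```cpp
#include <cassert>
#include <vector>

// A key is usable for searching only if every field it covers is
// present (signaled) in the before image. key_fieldnrs holds the
// 0-based field numbers of the key parts; cols models the bitmap.
static bool all_key_columns_signaled(const std::vector<unsigned> &key_fieldnrs,
                                     const std::vector<bool> &cols)
{
  for (unsigned fieldnr : key_fieldnrs)
    if (fieldnr >= cols.size() || !cols[fieldnr])
      return false;             // a key part is missing from the image
  return true;
}
```

With the example from the comment above (slave key (a,c), before image containing only c), a key covering fields {0, 2} is rejected because field 0 is not signaled.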
>  
> -    /*
> -      Now we are in a statement and will stay in a statement until we
> -      see a STMT_END_F.
> +/**
> +  Searches the table for a given key that can be used
> +  according to the existing values, ie, columns set
> +  in the bitmap.
>  
> -      We set this flag here, before actually applying any rows, in
> -      case the SQL thread is stopped and we need to detect that we're
> -      inside a statement and halting abruptly might cause problems
> -      when restarting.
> -     */
> -    const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
> +  The caller can specify which type of key to find by
> +  setting the following flags in the key_type parameter:
>  
> -     if ( m_width == table->s->fields && bitmap_is_set_all(&m_cols))
> -      set_flags(COMPLETE_ROWS_F);
> +    - PRI_KEY_FLAG
> +      Returns the primary key.
>  
> -    /* 
> -      Set tables write and read sets.
> -      
> -      Read_set contains all slave columns (in case we are going to fetch
> -      a complete record from slave)
> -      
> -      Write_set equals the m_cols bitmap sent from master but it can be 
> -      longer if slave has extra columns. 
> -     */ 
> +    - UNIQUE_KEY_FLAG
> +      Returns a unique key (flagged with HA_NOSAME)
>  
> -    DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols);
> -    
> -    bitmap_set_all(table->read_set);
> -    if (get_type_code() == DELETE_ROWS_EVENT)
> -        bitmap_intersect(table->read_set,&m_cols);
> -
> -    bitmap_set_all(table->write_set);
> -    if (!get_flags(COMPLETE_ROWS_F))
> -    {
> -      if (get_type_code() == UPDATE_ROWS_EVENT)
> -        bitmap_intersect(table->write_set,&m_cols_ai);
> -      else /* WRITE ROWS EVENTS store the bitmap in m_cols instead of m_cols_ai */
> -        bitmap_intersect(table->write_set,&m_cols);
> -    }
> -
> -    this->slave_exec_mode= slave_exec_mode_options; // fix the mode
> +    - MULTIPLE_KEY_FLAG
> +      Returns a key that is neither unique (ie, flagged with
> +      HA_NOSAME and without HA_NULL_PART_KEY) nor the PK.
>  
> -    // Do event specific preparations 
> -    error= do_before_row_operations(rli);
> +  The above flags can be used together, in which case, the
> +  search is conducted in the above listed order. Eg, the
> +  following flag:
>  
> -    // row processing loop
> +    (PRI_KEY_FLAG | UNIQUE_KEY_FLAG | MULTIPLE_KEY_FLAG)
>  
> -    while (error == 0)
> -    {
> -      /* in_use can have been set to NULL in close_tables_for_reopen */
> -      THD* old_thd= table->in_use;
> -      if (!table->in_use)
> -        table->in_use= thd;
> +  means that a primary key is returned if it is suitable. If
> +  not, then the unique keys are searched. If no unique key is
> +  suitable, then the remaining (non-unique) keys are searched.
> +  Finally, if no key is suitable, MAX_KEY is returned.
>  
> -      error= do_exec_row(rli);
> +  @param table    reference to the table.
> +  @param bi_cols  a bitmap that filters out columns that should
> +                  not be considered while searching the key.
> +                  Columns that should be considered are set.
> +  @param key_type the type of key to search for.
>  
> -      DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
> -      DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
> +  @return MAX_KEY if no key, according to the key_type specified
> +          is suitable. Returns the key otherwise.
>  
> -      table->in_use = old_thd;
> +*/
> +static
> +uint
> +search_key_in_table(TABLE *table, MY_BITMAP *bi_cols, uint key_type)
> +{
> +  DBUG_ENTER("search_key_in_table");
>  
> -      if (error)
> -      {
> -        int actual_error= convert_handler_error(error, thd, table);
> -        bool idempotent_error= (idempotent_error_code(error) &&
> -                               (slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT));
> -        bool ignored_error= (idempotent_error == 0 ?
> -                             ignored_error_code(actual_error) : 0);
> +  KEY *keyinfo;
> +  uint res= MAX_KEY;
> +  uint key;
>  
> -        if (idempotent_error || ignored_error)
> -        {
> -          if (global_system_variables.log_warnings)
> -            slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
> -                                    get_type_str(),
> -                                    const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
> -                                    (ulong) log_pos);
> -          clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
> -          error= 0;
> -          if (idempotent_error == 0)
> -            break;
> -        }
> -      }
> +  if (key_type & PRI_KEY_FLAG && (table->s->primary_key < MAX_KEY))
> +  {
> +    keyinfo= table->s->key_info + (uint) table->s->primary_key;
> +    if (are_all_columns_signaled_for_key(keyinfo, bi_cols))
> +      DBUG_RETURN(table->s->primary_key);
> +  }
>  
> +  if (key_type & UNIQUE_KEY_FLAG && table->s->uniques)
> +  {
> +    for (key=0,keyinfo= table->key_info ;
> +         (key < table->s->keys) && (res == MAX_KEY);
> +         key++,keyinfo++)
> +    {
>        /*
> -       If m_curr_row_end  was not set during event execution (e.g., because
> -       of errors) we can't proceed to the next row. If the error is transient
> -       (i.e., error==0 at this point) we must call unpack_current_row() to set 
> -       m_curr_row_end.
> -      */ 
> -   
> -      DBUG_PRINT("info", ("curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu",
> -                          (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
> -
> -      if (!m_curr_row_end && !error)
> -        error= unpack_current_row(rli, &m_cols);
> -  
> -      // at this moment m_curr_row_end should be set
> -      DBUG_ASSERT(error || m_curr_row_end != NULL); 
> -      DBUG_ASSERT(error || m_curr_row <= m_curr_row_end);
> -      DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
> -  
> -      m_curr_row= m_curr_row_end;
> - 
> -      if (error == 0 && !transactional_table)
> -        thd->transaction.all.modified_non_trans_table=
> -          thd->transaction.stmt.modified_non_trans_table= TRUE;
> -
> -      if (m_curr_row == m_rows_end)
> -        break;
> -    } // row processing loop
> +        - Unique keys cannot be disabled, hence we skip that check.
> +        - Skip unique keys with nullable parts
> +        - Skip primary keys
> +      */
> +      if (!((keyinfo->flags & (HA_NOSAME | HA_NULL_PART_KEY)) != HA_NOSAME) ||
> +          (key == table->s->primary_key))
> +        continue;
> +      res= are_all_columns_signaled_for_key(keyinfo, bi_cols) ?
> +           key : MAX_KEY;
>  
> -    {/**
> -         The following failure injection works in cooperation with tests
> -         setting @@global.debug= 'd,stop_slave_middle_group'.
> -         The sql thread receives the killed status and will proceed 
> -         to shutdown trying to finish incomplete events group.
> -     */
> -      DBUG_EXECUTE_IF("stop_slave_middle_group",
> -                      if (thd->transaction.all.modified_non_trans_table)
> -                        const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
> +      if (res < MAX_KEY)
> +        DBUG_RETURN(res);
>      }
> +  }
>  
> -    if ((error= do_after_row_operations(rli, error)) &&
> -        ignored_error_code(convert_handler_error(error, thd, table)))
> +  if (key_type & MULTIPLE_KEY_FLAG && table->s->keys)
> +  {
> +    for (key=0,keyinfo= table->key_info ;
> +         (key < table->s->keys) && (res == MAX_KEY);
> +         key++,keyinfo++)
>      {
> +      /*
> +        - Skip inactive keys
> +        - Skip unique keys without nullable parts
> +        - Skip primary keys
> +      */
> +      if (!(table->s->keys_in_use.is_set(key)) ||
> +          ((keyinfo->flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME) ||
> +          (key == table->s->primary_key))
> +        continue;
>  
> -      if (global_system_variables.log_warnings)
> -        slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
> -                                get_type_str(),
> -                                const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
> -                                (ulong) log_pos);
> -      clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
> -      error= 0;
> -    }
> -  } // if (table)
> -
> -  
> -  if (error)
> -  {
> -    slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table,
> -                             get_type_str(),
> -                             const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
> -                             (ulong) log_pos);
> -    /*
> -      @todo We should probably not call
> -      reset_current_stmt_binlog_format_row() from here.
> +      res= are_all_columns_signaled_for_key(keyinfo, bi_cols) ?
> +           key : MAX_KEY;
>  
> -      Note: this applies to log_event_old.cc too.
> -      /Sven
> -    */
> -    thd->reset_current_stmt_binlog_format_row();
> -    const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
> -    thd->is_slave_error= 1;
> -    DBUG_RETURN(error);
> +      if (res < MAX_KEY)
> +        DBUG_RETURN(res);
> +    }
>    }
>  
> -  if (get_flags(STMT_END_F))
> -    if ((error= rows_event_stmt_cleanup(rli, thd)))
> -      rli->report(ERROR_LEVEL, error,
> -                  "Error in %s event: commit of row events failed, "
> -                  "table `%s`.`%s`",
> -                  get_type_str(), m_table->s->db.str,
> -                  m_table->s->table_name.str);
> -
> -  DBUG_RETURN(error);
> +  DBUG_RETURN(res);
>  }
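For the record, the PK -> unique -> non-unique search order can be flattened into the sketch below; the flag values, MAX_KEY_SIM and the KeyInfo struct are local stand-ins for the server's definitions, and "usable" folds in the bitmap check from are_all_columns_signaled_for_key():

```cpp
#include <cassert>
#include <vector>

enum { PRI_KEY_FLAG = 1, UNIQUE_KEY_FLAG = 2, MULTIPLE_KEY_FLAG = 4 };
static const unsigned MAX_KEY_SIM = 64;   // stand-in for MAX_KEY

struct KeyInfo
{
  bool is_primary;          // this key is the PK
  bool is_unique_not_null;  // HA_NOSAME set, HA_NULL_PART_KEY not set
  bool usable;              // all key columns signaled in the BI bitmap
};

// Return the first usable key in PK -> unique -> non-unique order,
// or MAX_KEY_SIM if none qualifies.
static unsigned search_key(const std::vector<KeyInfo> &keys, unsigned key_type)
{
  if (key_type & PRI_KEY_FLAG)
    for (unsigned k = 0; k < keys.size(); k++)
      if (keys[k].is_primary && keys[k].usable)
        return k;
  if (key_type & UNIQUE_KEY_FLAG)
    for (unsigned k = 0; k < keys.size(); k++)
      if (!keys[k].is_primary && keys[k].is_unique_not_null && keys[k].usable)
        return k;
  if (key_type & MULTIPLE_KEY_FLAG)
    for (unsigned k = 0; k < keys.size(); k++)
      if (!keys[k].is_primary && !keys[k].is_unique_not_null && keys[k].usable)
        return k;
  return MAX_KEY_SIM;
}
```

E.g. with an unusable PK (key 0) and a usable unique key (key 1), all three flags together pick key 1.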
>  
> -Log_event::enum_skip_reason
> -Rows_log_event::do_shall_skip(Relay_log_info *rli)
> +static uint decide_row_lookup_algorithm(TABLE* table, MY_BITMAP *cols, uint event_type)
>  {
> -  /*
> -    If the slave skip counter is 1 and this event does not end a
> -    statement, then we should not start executing on the next event.
> -    Otherwise, we defer the decision to the normal skipping logic.
> -  */
> -  if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
> -    return Log_event::EVENT_SKIP_IGNORE;
> -  else
> -    return Log_event::do_shall_skip(rli);
> -}
> -
> -/**
> -   The function is called at Rows_log_event statement commit time,
> -   normally from Rows_log_event::do_update_pos() and possibly from
> -   Query_log_event::do_apply_event() of the COMMIT.
> -   The function commits the last statement for engines, binlog and
> -   releases resources that have been allocated for the statement.
> -  
> -   @retval  0         Ok.
> -   @retval  non-zero  Error at the commit.
> - */
> +  DBUG_ENTER("decide_row_lookup_algorithm");
>  
> -static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
> -{
> -  int error;
> -  {
> -    /*
> -      This is the end of a statement or transaction, so close (and
> -      unlock) the tables we opened when processing the
> -      Table_map_log_event starting the statement.
> +  uint res= Rows_log_event::ROW_LOOKUP_NOT_NEEDED;
> +  uint key_index;
> +  if (event_type == WRITE_ROWS_EVENT)
> +    DBUG_RETURN(res);
>  
> -      OBSERVER.  This will clear *all* mappings, not only those that
> -      are open for the table. There is not good handle for on-close
> -      actions for tables.
> +  key_index= search_key_in_table(table, cols, (PRI_KEY_FLAG | 
> +                                               UNIQUE_KEY_FLAG | 
> +                                               MULTIPLE_KEY_FLAG));
>  
> -      NOTE. Even if we have no table ('table' == 0) we still need to be
> -      here, so that we increase the group relay log position. If we didn't, we
> -      could have a group relay log position which lags behind "forever"
> -      (assume the last master's transaction is ignored by the slave because of
> -      replicate-ignore rules).
> -    */
> -    error= thd->binlog_flush_pending_rows_event(TRUE);
> +  if (((key_index != MAX_KEY) && (key_index < table->s->keys)) &&
> +      (slave_rows_search_algorithms_options & SLAVE_ROWS_INDEX_SCAN))
> +    res= Rows_log_event::ROW_LOOKUP_INDEX_SCAN;
> +  else
> +  {
> +    /**
> +       Blackhole does not use hash scan.  (NOTE: This is a hackish
> +       implementation and I know it).
> +
> +       TODO: remove this DB_TYPE_BLACKHOLE_DB dependency.
> +    */


Please remove the comment "(NOTE: This is a hackish implementation and I know it)"
and explain exactly why we cannot use HASH_SCAN with the BLACKHOLE engine.

IMHO, there is no property we could use to remove this hack. Any ideas?

> +    if ((slave_rows_search_algorithms_options & SLAVE_ROWS_HASH_SCAN) &&
> +        (table->s->db_type()->db_type != DB_TYPE_BLACKHOLE_DB))
> +      res=  Rows_log_event::ROW_LOOKUP_HASH_SCAN;
> +    else
> +    {
> +      DBUG_ASSERT((table->s->db_type()->db_type == DB_TYPE_BLACKHOLE_DB) ||
> +                  slave_rows_search_algorithms_options & SLAVE_ROWS_TABLE_SCAN);
> +      res= Rows_log_event::ROW_LOOKUP_TABLE_SCAN;
> +    }
> +  }
>  
> -    /*
> -      If this event is not in a transaction, the call below will, if some
> -      transactional storage engines are involved, commit the statement into
> -      them and flush the pending event to binlog.
> -      If this event is in a transaction, the call will do nothing, but a
> -      Xid_log_event will come next which will, if some transactional engines
> -      are involved, commit the transaction and flush the pending event to the
> -      binlog.
> -    */
> -    error|= (error ? trans_rollback_stmt(thd) : trans_commit_stmt(thd));
> +#ifndef DBUG_OFF
> +  const char* s= ((res == Rows_log_event::ROW_LOOKUP_TABLE_SCAN) ? "TABLE_SCAN" :
> +                  ((res == Rows_log_event::ROW_LOOKUP_HASH_SCAN) ? "HASH_SCAN" : 
> +                   "INDEX_SCAN"));
> +
> +  // only for testing purposes
> +  slave_rows_last_search_algorithm_used= res;
> +  DBUG_PRINT("debug", ("Row lookup method: %s", s));
> +#endif
>  
> -    /*
> -      Now what if this is not a transactional engine? we still need to
> -      flush the pending event to the binlog; we did it with
> -      thd->binlog_flush_pending_rows_event(). Note that we imitate
> -      what is done for real queries: a call to
> -      ha_autocommit_or_rollback() (sometimes only if involves a
> -      transactional engine), and a call to be sure to have the pending
> -      event flushed.
> -    */
> +  
> +  DBUG_RETURN(res);
> +}
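The decision tree in decide_row_lookup_algorithm(), flattened, amounts to the precedence below (enum names and the boolean inputs are mine; they mirror but do not reproduce the server's types):

```cpp
#include <cassert>

// Local stand-ins for ROW_LOOKUP_* and the option bits of
// slave_rows_search_algorithms.
enum Lookup { NOT_NEEDED, INDEX_SCAN, HASH_SCAN, TABLE_SCAN };
enum { OPT_INDEX_SCAN = 1, OPT_HASH_SCAN = 2, OPT_TABLE_SCAN = 4 };

// WRITE events need no lookup; a usable key plus the INDEX_SCAN
// option wins next; then HASH_SCAN (except for blackhole tables);
// TABLE_SCAN is the fallback.
static Lookup decide_lookup(bool is_write_event, bool has_usable_key,
                            bool is_blackhole, unsigned opts)
{
  if (is_write_event)
    return NOT_NEEDED;
  if (has_usable_key && (opts & OPT_INDEX_SCAN))
    return INDEX_SCAN;
  if ((opts & OPT_HASH_SCAN) && !is_blackhole)
    return HASH_SCAN;
  return TABLE_SCAN;
}
```

Note how this makes the blackhole special case visible: with HASH_SCAN enabled but no TABLE_SCAN bit set, a blackhole table still falls through to TABLE_SCAN, which is what the DBUG_ASSERT above is guarding.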
>  
> -    /*
> -      @todo We should probably not call
> -      reset_current_stmt_binlog_format_row() from here.
> +/*
> +  Compares table->record[0] and table->record[1]
>  
> -      Note: this applies to log_event_old.cc too
> +  Returns TRUE if different.
> +*/
> +static bool record_compare(TABLE *table, MY_BITMAP *cols)
> +{
> +  DBUG_ENTER("record_compare");
>  
> -      Btw, the previous comment about transactional engines does not
> -      seem related to anything that happens here.
> -      /Sven
> -    */
> -    thd->reset_current_stmt_binlog_format_row();
> +  /*
> +    Need to set the X bit and the filler bits in both records since
> +    there are engines that do not set it correctly.
>  
> -    const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
> -  }
> -  return error;
> -}
> +    In addition, since MyISAM checks that one hasn't tampered with the
> +    record, it is necessary to restore the old bytes into the record
> +    after doing the comparison.
>  
> -/**
> -   The method either increments the relay log position or
> -   commits the current statement and increments the master group 
> -   position if the event is STMT_END_F flagged and
> -   the statement corresponds to the autocommit query (i.e replicated
> -   without wrapping in BEGIN/COMMIT)
> +    TODO[record format ndb]: Remove it once NDB returns correct
> +    records. Check that the other engines also return correct records.
> +   */
>  
> -   @retval 0         Success
> -   @retval non-zero  Error in the statement commit
> - */
> -int
> -Rows_log_event::do_update_pos(Relay_log_info *rli)
> -{
> -  DBUG_ENTER("Rows_log_event::do_update_pos");
> -  int error= 0;
> +  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> +  DBUG_DUMP("record[1]", table->record[1], table->s->reclength);
>  
> -  DBUG_PRINT("info", ("flags: %s",
> -                      get_flags(STMT_END_F) ? "STMT_END_F " : ""));
> +  bool result= FALSE;
> +  uchar saved_x[2]= {0, 0}, saved_filler[2]= {0, 0};
>  
> -  if (get_flags(STMT_END_F))
> +  if (table->s->null_bytes > 0)
>    {
> -    /*
> -      Indicate that a statement is finished.
> -      Step the group log position if we are not in a transaction,
> -      otherwise increase the event log position.
> -    */
> -    rli->stmt_done(log_pos);
> -    /*
> -      Clear any errors in thd->net.last_err*. It is not known if this is
> -      needed or not. It is believed that any errors that may exist in
> -      thd->net.last_err* are allowed. Examples of errors are "key not
> -      found", which is produced in the test case rpl_row_conflicts.test
> -    */
> -    thd->clear_error();
> +    for (int i = 0 ; i < 2 ; ++i)
> +    {
> +      /*
> +        If we have an X bit then we need to take care of it.
> +      */
> +      if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
> +      {
> +        saved_x[i]= table->record[i][0];
> +        table->record[i][0]|= 1U;
> +      }
> +
> +      /*
> +         If (last_null_bit_pos == 0 && null_bytes > 1), then:
> +
> +         X bit (if any) + N nullable fields + M Field_bit fields = 8 bits
> +
> +         Ie, the entire byte is used.
> +      */
> +      if (table->s->last_null_bit_pos > 0)
> +      {
> +        saved_filler[i]= table->record[i][table->s->null_bytes - 1];
> +        table->record[i][table->s->null_bytes - 1]|=
> +          256U - (1U << table->s->last_null_bit_pos);
> +      }
> +    }
>    }
> -  else
> +
> +  if (table->s->blob_fields + table->s->varchar_fields == 0 &&
> +      bitmap_is_set_all(cols))
>    {
> -    rli->inc_event_relay_log_pos();
> +    result= cmp_record(table,record[1]);
> +    goto record_compare_exit;
>    }
>  
> -  DBUG_RETURN(error);
> -}
> +  /* Compare null bits */
> +  if (bitmap_is_set_all(cols) &&
> +      memcmp(table->null_flags,
> +       table->null_flags+table->s->rec_buff_length,
> +       table->s->null_bytes))
> +  {
> +    result= TRUE;       // Diff in NULL value
> +    goto record_compare_exit;
> +  }
>  
> -#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
> +  /* Compare updated fields */
> +  for (Field **ptr=table->field ;
> +       *ptr && ((*ptr)->field_index < cols->n_bits);
> +       ptr++)
> +  {
> +    if (bitmap_is_set(cols, (*ptr)->field_index))
> +    {
> +      if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
> +      {
> +        result= TRUE;
> +        goto record_compare_exit;
> +      }
> +    }
> +  }
>  
> -#ifndef MYSQL_CLIENT
> -bool Rows_log_event::write_data_header(IO_CACHE *file)
> -{
> -  uchar buf[ROWS_HEADER_LEN];	// No need to init the buffer
> -  DBUG_ASSERT(m_table_id != ~0UL);
> -  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
> -                  {
> -                    int4store(buf + 0, m_table_id);
> -                    int2store(buf + 4, m_flags);
> -                    return (my_b_safe_write(file, buf, 6));
> -                  });
> -  int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
> -  int2store(buf + RW_FLAGS_OFFSET, m_flags);
> -  return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
> +record_compare_exit:
> +  /*
> +    Restore the saved bytes.
> +
> +    TODO[record format ndb]: Remove this code once NDB returns the
> +    correct record format.
> +  */
> +  if (table->s->null_bytes > 0)
> +  {
> +    for (int i = 0 ; i < 2 ; ++i)
> +    {
> +      if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
> +        table->record[i][0]= saved_x[i];
> +
> +      if (table->s->last_null_bit_pos)
> +        table->record[i][table->s->null_bytes - 1]= saved_filler[i];
> +    }
> +  }
> +
> +  DBUG_RETURN(result);
>  }
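A small note on the filler expression in record_compare(): 256U - (1U << last_null_bit_pos) sets every bit from last_null_bit_pos upward, so OR-ing it into the last null byte forces the unused filler bits equal in both records while leaving the used null bits untouched. Isolated (the helper name is mine):

```cpp
#include <cassert>

// Mask with all bits >= last_null_bit_pos set; assumes
// last_null_bit_pos > 0, matching the guard in the patch.
static unsigned filler_mask(unsigned last_null_bit_pos)
{
  return 256U - (1U << last_null_bit_pos);
}
```

For last_null_bit_pos == 3 the mask is 0xF8 (binary 11111000): bits 0-2 carry real null flags and stay as-is, bits 3-7 are filler and get normalized.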
>  
> -bool Rows_log_event::write_data_body(IO_CACHE*file)
> +void Rows_log_event::do_post_row_operations(Relay_log_info const *rli, int error)
>  {
> +
>    /*
> -     Note that this should be the number of *bits*, not the number of
> -     bytes.
> +    If m_curr_row_end  was not set during event execution (e.g., because
> +    of errors) we can't proceed to the next row. If the error is transient
> +    (i.e., error==0 at this point) we must call unpack_current_row() to set
> +    m_curr_row_end.
>    */
> -  uchar sbuf[sizeof(m_width) + 1];
> -  my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
> -  bool res= false;
> -  uchar *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
> -  DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
> +  
> +  DBUG_PRINT("info", ("curr_row: 0x%lu; curr_row_end: 0x%lu; rows_end: 0x%lu",
> +                      (ulong) m_curr_row, (ulong) m_curr_row_end, (ulong) m_rows_end));
> +  
> +  if (!m_curr_row_end && !error)
> +  {
> +    error= unpack_current_row(rli, &m_cols);
> +  }
>  
> -  DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
> -  res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
> +  // at this moment m_curr_row_end should be set
> +  DBUG_ASSERT(error || m_curr_row_end != NULL);
> +  DBUG_ASSERT(error || m_curr_row <= m_curr_row_end);
> +  DBUG_ASSERT(error || m_curr_row_end <= m_rows_end);
> +  
> +  m_curr_row= m_curr_row_end;
> +  
> +  if (error == 0 && !m_table->file->has_transactions())
> +    thd->transaction.all.modified_non_trans_table=
> +      thd->transaction.stmt.modified_non_trans_table= TRUE;
> +  
> +}
>  
> -  DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
> -  res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap,
> -                              no_bytes_in_map(&m_cols));
> -  /*
> -    TODO[refactor write]: Remove the "down cast" here (and elsewhere).
> -   */
> -  if (get_type_code() == UPDATE_ROWS_EVENT)
> +int Rows_log_event::handle_idempotent_errors(Relay_log_info const *rli, int *err)
> +{
> +  int error= *err;
> +  if (error)
>    {
> -    DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap,
> -              no_bytes_in_map(&m_cols_ai));
> -    res= res || my_b_safe_write(file, (uchar*) m_cols_ai.bitmap,
> -                                no_bytes_in_map(&m_cols_ai));
> -  }
> -  DBUG_DUMP("rows", m_rows_buf, data_size);
> -  res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
> +    int actual_error= convert_handler_error(error, thd, m_table);
> +    bool idempotent_error= (idempotent_error_code(error) &&
> +                           (slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT));
> +    bool ignored_error= (idempotent_error == 0 ?
> +                         ignored_error_code(actual_error) : 0);
>  
> -  return res;
> +    if (idempotent_error || ignored_error)
> +    {
> +      if (global_system_variables.log_warnings)
> +        slave_rows_error_report(WARNING_LEVEL, error, rli, thd, m_table,
> +                                get_type_str(),
> +                                const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
> +                                (ulong) log_pos);
> +      clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
> +      *err= 0;
> +      if (idempotent_error == 0)
> +        return ignored_error;
> +    }
> +  }
>  
> +  return *err;
>  }
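To double-check my reading of handle_idempotent_errors(): in IDEMPOTENT mode a known-benign handler error is downgraded to a warning and cleared, otherwise only errors on the slave-skip-errors list are cleared. A simplified model, with booleans standing in for the idempotent_error_code()/ignored_error_code() checks:

```cpp
#include <cassert>

// Returns 0 when the error is absorbed (warning reported, error
// cleared), or the original error when it must propagate.
static int handle_row_error(int error, bool idempotent_mode,
                            bool code_is_idempotent, bool code_is_ignored)
{
  if (error == 0)
    return 0;
  const bool idempotent_error = idempotent_mode && code_is_idempotent;
  const bool ignored_error = !idempotent_error && code_is_ignored;
  if (idempotent_error || ignored_error)
    return 0;                   // absorbed
  return error;                 // fatal: propagate to the caller
}
```

The interesting case is the third one below: an idempotent error code outside IDEMPOTENT mode is not absorbed unless it is also on the ignore list.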
> -#endif
>  
> -#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
> -void Rows_log_event::pack_info(Protocol *protocol)
> +int Rows_log_event::do_apply_row(Relay_log_info const *rli)
>  {
> -  char buf[256];
> -  char const *const flagstr=
> -    get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
> -  size_t bytes= my_snprintf(buf, sizeof(buf),
> -                               "table_id: %lu%s", m_table_id, flagstr);
> -  protocol->store(buf, bytes, &my_charset_bin);
> +  DBUG_ENTER("Rows_log_event::do_apply_row");
> +
> +  int error= 0;
> +  
> +  /* in_use can have been set to NULL in close_tables_for_reopen */
> +  THD* old_thd= m_table->in_use;
> +  if (!m_table->in_use)
> +    m_table->in_use= thd;
> +  
> +  error= do_exec_row(rli);
> +  
> +  DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
> +  DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
> +  
> +  m_table->in_use = old_thd;
> +
> +  DBUG_RETURN(error);
>  }
> -#endif
>  
> -#ifdef MYSQL_CLIENT
> -void Rows_log_event::print_helper(FILE *file,
> -                                  PRINT_EVENT_INFO *print_event_info,
> -                                  char const *const name)
> +
> +int Rows_log_event::do_index_scan_and_update(Relay_log_info const *rli)
>  {
> -  IO_CACHE *const head= &print_event_info->head_cache;
> -  IO_CACHE *const body= &print_event_info->body_cache;
> -  if (!print_event_info->short_form)
> -  {
> -    bool const last_stmt_event= get_flags(STMT_END_F);
> -    print_header(head, print_event_info, !last_stmt_event);
> -    my_b_printf(head, "\t%s: table id %lu%s\n",
> -                name, m_table_id,
> -                last_stmt_event ? " flags: STMT_END_F" : "");
> -    print_base64(body, print_event_info, !last_stmt_event);
> -  }
> +  DBUG_ENTER("Rows_log_event::do_index_scan_and_update");
> +  DBUG_ASSERT(m_table && m_table->in_use != NULL);
>  
> -  if (get_flags(STMT_END_F))
> +  TABLE *table= m_table;
> +  int error= 0;
> +  KEY *keyinfo;
> +  uint key;
> +  const uchar *saved_m_curr_row= m_curr_row;
> +
> +  /*
> +    rpl_row_tabledefs.test specifies that an extra field on the
> +    slave without a default value is okay for Delete and Update
> +    events.
> +    Todo: fix the WL#3228 HLD, which requires defaults for all types of events
> +  */
> +
> +  prepare_record(table, &m_cols, FALSE);
> +  if ((error= unpack_current_row(rli, &m_cols)))
> +    goto end;
> +
> +  saved_m_curr_row= m_curr_row;
> +
> +  // Temporary fix to find out why it fails [/Matz]
> +  memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
> +
> +  if (!is_any_column_signaled_for_table(table, &m_cols))
>    {
> -    copy_event_cache_to_file_and_reinit(head, file);
> -    copy_event_cache_to_file_and_reinit(body, file);
> +    error= HA_ERR_END_OF_FILE;
> +    goto end;
>    }
> -}
> +
> +#ifndef DBUG_OFF
> +  DBUG_PRINT("info",("looking for the following record"));
> +  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
>  #endif
>  
> -/**************************************************************************
> -	Table_map_log_event member functions and support functions
> -**************************************************************************/
> +  if ((key= search_key_in_table(table, &m_cols, PRI_KEY_FLAG)) >= MAX_KEY)
> +    /* we don't have a PK, or the PK is not usable with the BI values */
> +    goto INDEX_SCAN;
>  
> -/**
> -  @page How replication of field metadata works.
> -  
> -  When a table map is created, the master first calls 
> -  Table_map_log_event::save_field_metadata() which calculates how many 
> -  values will be in the field metadata. Only those fields that require the 
> -  extra data are added. The method also loops through all of the fields in 
> -  the table calling the method Field::save_field_metadata() which returns the
> -  values for the field that will be saved in the metadata and replicated to
> -  the slave. Once all fields have been processed, the table map is written to
> -  the binlog adding the size of the field metadata and the field metadata to
> -  the end of the body of the table map.
> +  if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION))
> +  {
> +    /*
> +      Use a more efficient method to fetch the record given by
> +      table->record[0] if the engine allows it.  We first compute a
> +      row reference using the position() member function (it will be
> +      stored in table->file->ref) and then use rnd_pos() to position
> +      the "cursor" (i.e., record[0] in this case) at the correct row.
>  
> -  When a table map is read on the slave, the field metadata is read from the 
> -  table map and passed to the table_def class constructor which saves the 
> -  field metadata from the table map into an array based on the type of the 
> -  field. Field metadata values not present (those fields that do not use extra 
> -  data) in the table map are initialized as zero (0). The array size is the 
> -  same as the columns for the table on the slave.
> +      TODO: Add a check that the correct record has been fetched by
> +      comparing with the original record. Take into account that the
> +      record on the master and slave can be of different
> +      length. Something along these lines should work:
>  
> -  Additionally, values saved for field metadata on the master are saved as a 
> -  string of bytes (uchar) in the binlog. A field may require 1 or more bytes
> -  to store the information. In cases where values require multiple bytes 
> -  (e.g. values > 255), the endian-safe methods are used to properly encode 
> -  the values on the master and decode them on the slave. When the field
> -  metadata values are captured on the slave, they are stored in an array of
> -  type uint16. This allows the least number of casts to prevent casting bugs
> -  when the field metadata is used in comparisons of field attributes. When
> -  the field metadata is used for calculating addresses in pointer math, the
> -  type used is uint32. 
> -*/
> +      ADD>>>  store_record(table,record[1]);
> +              int error= table->file->rnd_pos(table->record[0], table->file->ref);
> +      ADD>>>  DBUG_ASSERT(memcmp(table->record[1], table->record[0],
> +                                 table->s->reclength) == 0);
>  
> -#if !defined(MYSQL_CLIENT)
> -/**
> -  Save the field metadata based on the real_type of the field.
> -  The metadata saved depends on the type of the field. Some fields
> -  store a single byte for pack_length() while others store two bytes
> -  for field_length (max length).
> -  
> -  @retval  0  Ok.
> +    */
>  
> -  @todo
> -  We may want to consider changing the encoding of the information.
> -  Currently, the code attempts to minimize the number of bytes written to 
> -  the tablemap. There are at least two other alternatives; 1) using 
> -  net_store_length() to store the data allowing it to choose the number of
> -  bytes that are appropriate thereby making the code much easier to 
> -  maintain (only 1 place to change the encoding), or 2) use a fixed number
> -  of bytes for each field. The problem with option 1 is that net_store_length()
> -  will use one byte if the value < 251, but 3 bytes if it is > 250. Thus,
> -  for fields like CHAR which can be no larger than 255 characters, the method
> -  will use 3 bytes when the value is > 250. Further, every value that is
> -  encoded using 2 parts (e.g., pack_length, field_length) will be numerically
> -  > 250 therefore will use 3 bytes for each value. The problem with option 2
> -  is less wasteful for space but does waste 1 byte for every field that does
> -  not encode 2 parts. 
> -*/
> -int Table_map_log_event::save_field_metadata()
> -{
> -  DBUG_ENTER("Table_map_log_event::save_field_metadata");
> -  int index= 0;
> -  for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
> -  {
> -    DBUG_PRINT("debug", ("field_type: %d", m_coltype[i]));
> -    index+=
> m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
> +    DBUG_PRINT("info",("locating record using primary key (position)"));
> +    if (table->file->inited && (error= table->file->ha_index_end()))
> +      goto end;
> +
> +    if ((error= table->file->ha_rnd_init(FALSE)))
> +      goto end;
> +
> +    error= table->file->rnd_pos_by_record(table->record[0]);
> +
> +    table->file->ha_rnd_end();
> +    if (error)
> +    {
> +      DBUG_PRINT("info",("rnd_pos returns error %d",error));
> +      if (error == HA_ERR_RECORD_DELETED)
> +        error= HA_ERR_KEY_NOT_FOUND;
> +    }
> +    
> +    goto end;
>    }
> -  DBUG_RETURN(index);
> -}
> -#endif /* !defined(MYSQL_CLIENT) */
>  
> -/*
> -  Constructor used to build an event for writing to the binary log.
> -  Mats says tbl->s lives longer than this event so it's ok to copy pointers
> -  (tbl->s->db etc) and not pointer content.
> - */
> -#if !defined(MYSQL_CLIENT)
> -Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
> -                                         bool is_transactional)
> -  : Log_event(thd, 0, is_transactional),
> -    m_table(tbl),
> -    m_dbnam(tbl->s->db.str),
> -    m_dblen(m_dbnam ? tbl->s->db.length : 0),
> -    m_tblnam(tbl->s->table_name.str),
> -    m_tbllen(tbl->s->table_name.length),
> -    m_colcnt(tbl->s->fields),
> -    m_memory(NULL),
> -    m_table_id(tid),
> -    m_flags(TM_BIT_LEN_EXACT_F),
> -    m_data_size(0),
> -    m_field_metadata(0),
> -    m_field_metadata_size(0),
> -    m_null_bits(0),
> -    m_meta_memory(NULL)
> -{
> -  uchar cbuf[sizeof(m_colcnt) + 1];
> -  uchar *cbuf_end;
> -  DBUG_ASSERT(m_table_id != ~0UL);
> -  /*
> -    In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment in
> -    table.cc / alloc_table_share():
> -      Use the fact the key is db/0/table_name/0
> -    As we rely on this let's assert it.
> -  */
> -  DBUG_ASSERT((tbl->s->db.str == 0) ||
> -              (tbl->s->db.str[tbl->s->db.length] == 0));
> -  DBUG_ASSERT(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
> +  // We can't use position() - try other methods.
>  
> +INDEX_SCAN:
>  
> -  m_data_size=  TABLE_MAP_HEADER_LEN;
> -  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", m_data_size= 6;);
> -  m_data_size+= m_dblen + 2;	// Include length and terminating \0
> -  m_data_size+= m_tbllen + 2;	// Include length and terminating \0
> -  cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
> -  DBUG_ASSERT(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));
> -  m_data_size+= (cbuf_end - cbuf) + m_colcnt;	// COLCNT and column types
> +  /*
> +    Save copy of the record in table->record[1]. It might be needed
> +    later if linear search is used to find exact match.
> +   */
> +  store_record(table,record[1]);
>  
> -  /* If malloc fails, caught in is_valid() */
> -  if ((m_memory= (uchar*) my_malloc(m_colcnt, MYF(MY_WME))))
> +  if ((key= search_key_in_table(table, &m_cols,
> +                                (PRI_KEY_FLAG | UNIQUE_KEY_FLAG | MULTIPLE_KEY_FLAG)))
> +       >= MAX_KEY)
> +    /* we don't have a key, or no key is suitable for the BI values */
>    {
> -    m_coltype= reinterpret_cast<uchar*>(m_memory);
> -    for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
> -      m_coltype[i]= m_table->field[i]->type();
> +    error= HA_ERR_KEY_NOT_FOUND;
> +    goto end;
>    }
>  
> -  /*
> -    Calculate a bitmap for the results of maybe_null() for all columns.
> -    The bitmap is used to determine when there is a column from the master
> -    that is not on the slave and is null and thus not in the row data during
> -    replication.
> -  */
> -  uint num_null_bytes= (m_table->s->fields + 7) / 8;
> -  m_data_size+= num_null_bytes;
> -  m_meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
> -                                 &m_null_bits, num_null_bytes,
> -                                 &m_field_metadata, (m_colcnt * 2),
> -                                 NULL);
> -
> -  bzero(m_field_metadata, (m_colcnt * 2));
> +  {
> +    keyinfo= table->key_info + key;
>  
> -  /*
> -    Create an array for the field metadata and store it.
> -  */
> -  m_field_metadata_size= save_field_metadata();
> -  DBUG_ASSERT(m_field_metadata_size <= (m_colcnt * 2));
>  
> -  /*
> -    Now set the size of the data to the size of the field metadata array
> -    plus one or three bytes (see pack.c:net_store_length) for number of 
> -    elements in the field metadata array.
> -  */
> -  if (m_field_metadata_size < 251)
> -    m_data_size+= m_field_metadata_size + 1; 
> -  else
> -    m_data_size+= m_field_metadata_size + 3; 
> +    DBUG_PRINT("info",("locating record using primary key (index_read)"));
>  
> -  bzero(m_null_bits, num_null_bytes);
> -  for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
> -    if (m_table->field[i]->maybe_null())
> -      m_null_bits[(i / 8)]+= 1 << (i % 8);
> +    /* The key'th key is active and usable: search the table using the index */
> +    if (!table->file->inited && (error= table->file->ha_index_init(key, FALSE)))
> +    {
> +      DBUG_PRINT("info",("ha_index_init returns error %d",error));
> +      goto end;
> +    }
>  
> -}
> -#endif /* !defined(MYSQL_CLIENT) */
> +    /* Fill key data for the row */
>  
> -/*
> -  Constructor used by slave to read the event from the binary log.
> - */
> -#if defined(HAVE_REPLICATION)
> -Table_map_log_event::Table_map_log_event(const char *buf, uint event_len,
> -                                         const Format_description_log_event
> -                                         *description_event)
> +    DBUG_ASSERT(m_key);
> +    key_copy(m_key, table->record[0], keyinfo, 0);
>  
> -  : Log_event(buf, description_event),
> -#ifndef MYSQL_CLIENT
> -    m_table(NULL),
> +    /*
> +      Don't print debug messages when running valgrind since they can
> +      trigger false warnings.
> +     */
> +#ifndef HAVE_purify
> +    DBUG_DUMP("key data", m_key, keyinfo->key_length);
>  #endif
> -    m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
> -    m_colcnt(0), m_coltype(0),
> -    m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0),
> -    m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
> -    m_null_bits(0), m_meta_memory(NULL)
> -{
> -  unsigned int bytes_read= 0;
> -  DBUG_ENTER("Table_map_log_event::Table_map_log_event(const char*,uint,...)");
>  
> -  uint8 common_header_len= description_event->common_header_len;
> -  uint8 post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
> -  DBUG_PRINT("info",("event_len: %u  common_header_len: %d  post_header_len: %d",
> -                     event_len, common_header_len, post_header_len));
> +    /*
> +      We need to set the null bytes to ensure that the filler bits are
> +      all set when returning.  There are storage engines that just set
> +      the necessary bits on the bytes and don't set the filler bits
> +      correctly.
> +    */
> +    if (table->s->null_bytes > 0)
> +      table->record[0][table->s->null_bytes - 1]|=
> +        256U - (1U << table->s->last_null_bit_pos);
> +
> +    if ((error= table->file->ha_index_read_map(table->record[0], m_key,
> +                                               HA_WHOLE_KEY,
> +                                               HA_READ_KEY_EXACT)))
> +    {
> +      DBUG_PRINT("info",("no record matching the key found in the table"));
> +      if (error == HA_ERR_RECORD_DELETED)
> +        error= HA_ERR_KEY_NOT_FOUND;
> +      goto end;
> +    }
>  
>    /*
>      Don't print debug messages when running valgrind since they can
>      trigger false warnings.
>     */
>  #ifndef HAVE_purify
> -  DBUG_DUMP("event buffer", (uchar*) buf, event_len);
> +    DBUG_PRINT("info",("found first matching record"));
> +    DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
>  #endif
> +    /*
> +      Below is a minor "optimization".  If the key (i.e., key number
> +      0) has the HA_NOSAME flag set, we know that we have found the
> +      correct record (since there can be no duplicates); otherwise, we
> +      have to compare the record with the one found to see if it is
> +      the correct one.
>  
> -  /* Read the post-header */
> -  const char *post_start= buf + common_header_len;
> -
> -  post_start+= TM_MAPID_OFFSET;
> -  if (post_header_len == 6)
> -  {
> -    /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
> -    m_table_id= uint4korr(post_start);
> -    post_start+= 4;
> -  }
> -  else
> -  {
> -    DBUG_ASSERT(post_header_len == TABLE_MAP_HEADER_LEN);
> -    m_table_id= (ulong) uint6korr(post_start);
> -    post_start+= TM_FLAGS_OFFSET;
> -  }
> +      CAVEAT! This behaviour is essential for the replication of,
> +      e.g., the mysql.proc table since the correct record *shall* be
> +      found using the primary key *only*.  There shall be no
> +      comparison of non-PK columns to decide if the correct record is
> +      found.  I can see no scenario where it would be incorrect to
> +      choose the row to change only using a PK or an UNNI.
> +    */
> +    if (keyinfo->flags & HA_NOSAME || key == table->s->primary_key)
> +    {
> +      /* Unique key does not have a nullable part */
> +      if (!(table->key_info->flags & (HA_NULL_PART_KEY)))      
> +        goto end;  // record found
> +      else
> +      {
> +        KEY *keyinfo= table->key_info;
> +        /*
> +          Unique has nullable part. We need to check if there is any field in the
> +          BI image that is null and part of UNNI.
> +        */
> +        bool null_found= FALSE;
> +        for (uint i=0; i < keyinfo->key_parts && !null_found; i++)
> +        {
> +          uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
> +          Field **f= table->field+fieldnr;
> +          null_found= (*f)->is_null();
> +        }
>  
> -  DBUG_ASSERT(m_table_id != ~0UL);
> +        if (!null_found)
> +          goto end;           // record found
>  
> -  m_flags= uint2korr(post_start);
> +        /* else fall through to index scan */
> +      }
> +    }
>  
> -  /* Read the variable part of the event */
> -  const char *const vpart= buf + common_header_len + post_header_len;
> +    /*
> +      In case key is not unique, we still have to iterate over records found
> +      and find the one which is identical to the row given. A copy of the
> +      record we are looking for is stored in record[1].
> +     */
> +    DBUG_PRINT("info",("non-unique index, scanning it to find matching record"));
>  
> -  /* Extract the length of the various parts from the buffer */
> -  uchar const *const ptr_dblen= (uchar const*)vpart + 0;
> -  m_dblen= *(uchar*) ptr_dblen;
> +    while (record_compare(table, &m_cols))
> +    {
> +      /*
> +        We need to set the null bytes to ensure that the filler bits
> +        are all set when returning.  There are storage engines that
> +        just set the necessary bits on the bytes and don't set the
> +        filler bits correctly.
>  
> -  /* Length of database name + counter + terminating null */
> -  uchar const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
> -  m_tbllen= *(uchar*) ptr_tbllen;
> +        TODO[record format ndb]: Remove this code once NDB returns the
> +        correct record format.
> +      */
> +      if (table->s->null_bytes > 0)
> +      {
> +        table->record[0][table->s->null_bytes - 1]|=
> +          256U - (1U << table->s->last_null_bit_pos);
> +      }
>  
> -  /* Length of table name + counter + terminating null */
> -  uchar const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
> -  uchar *ptr_after_colcnt= (uchar*) ptr_colcnt;
> -  m_colcnt= net_field_length(&ptr_after_colcnt);
> +      while ((error= table->file->ha_index_next(table->record[0])))
> +      {
> +        /* We just skip records that have already been deleted */
> +        if (error == HA_ERR_RECORD_DELETED)
> +          continue;
> +        DBUG_PRINT("info",("no record matching the given row found"));
> +        goto end;
> +      }
> +    }
> +  }
>  
> -  DBUG_PRINT("info",("m_dblen: %lu  off: %ld  m_tbllen: %lu  off: %ld  m_colcnt: %lu  off: %ld",
> -                     (ulong) m_dblen, (long) (ptr_dblen-(const uchar*)vpart), 
> -                     (ulong) m_tbllen, (long) (ptr_tbllen-(const uchar*)vpart),
> -                     m_colcnt, (long) (ptr_colcnt-(const uchar*)vpart)));
> +end:
>  
> -  /* Allocate mem for all fields in one go. If fails, caught in is_valid() */
> -  m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
> -                                     &m_dbnam, (uint) m_dblen + 1,
> -                                     &m_tblnam, (uint) m_tbllen + 1,
> -                                     &m_coltype, (uint) m_colcnt,
> -                                     NullS);
> +  DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
>  
> -  if (m_memory)
> -  {
> -    /* Copy the different parts into their memory */
> -    strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen  + 1, m_dblen + 1);
> -    strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
> -    memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
> +  if (error && error != HA_ERR_RECORD_DELETED)
> +    table->file->print_error(error, MYF(0));
> +  else
> +    error= do_apply_row(rli);
>  
> -    ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
> -    bytes_read= (uint) (ptr_after_colcnt - (uchar *)buf);
> -    DBUG_PRINT("info", ("Bytes read: %d.\n", bytes_read));
> -    if (bytes_read < event_len)
> -    {
> -      m_field_metadata_size= net_field_length(&ptr_after_colcnt);
> -      DBUG_ASSERT(m_field_metadata_size <= (m_colcnt * 2));
> -      uint num_null_bytes= (m_colcnt + 7) / 8;
> -      m_meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
> -                                     &m_null_bits, num_null_bytes,
> -                                     &m_field_metadata, m_field_metadata_size,
> -                                     NULL);
> -      memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
> -      ptr_after_colcnt= (uchar*)ptr_after_colcnt + m_field_metadata_size;
> -      memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
> -    }
> +  if (table->file->inited)
> +    table->file->ha_index_end();
> +
> +  if ((get_type_code() == UPDATE_ROWS_EVENT) && 
> +      (saved_m_curr_row == m_curr_row)) 
> +  {
> +    /* we need to unpack the AI so that positions get updated */
> +    m_curr_row= m_curr_row_end;
> +    unpack_current_row(rli, &m_cols);
>    }
>  
> -  DBUG_VOID_RETURN;
> -}
> -#endif
> +  table->default_column_bitmaps();
> +  DBUG_RETURN(error);
>  
> -Table_map_log_event::~Table_map_log_event()
> -{
> -  my_free(m_meta_memory);
> -  my_free(m_memory);
>  }
>  
> -/*
> -  Return value is an error code, one of:
> -
> -      -1     Failure to open table   [from open_tables()]
> -       0     Success
> -       1     No room for more tables [from set_table()]
> -       2     Out of memory           [from set_table()]
> -       3     Wrong table definition
> -       4     Daisy-chaining RBR with SBR not possible
> - */
> -
> -#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> -int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
> +int Rows_log_event::do_hash_scan_and_update(Relay_log_info const *rli)
>  {
> -  RPL_TABLE_LIST *table_list;
> -  char *db_mem, *tname_mem;
> -  size_t dummy_len;
> -  void *memory;
> -  DBUG_ENTER("Table_map_log_event::do_apply_event(Relay_log_info*)");
> -  DBUG_ASSERT(rli->info_thd == thd);
> +  int error= 0;
> +  const uchar *saved_last_m_curr_row= NULL;
> +  const uchar *bi_start= NULL;
> +  const uchar *bi_ends= NULL;
> +  const uchar *ai_start= NULL;
> +  const uchar *ai_ends= NULL;
> +  HASH_ROW_POS_ENTRY* entry;
>  
> -  /* Step the query id to mark what columns that are actually used. */
> -  thd->set_query_id(next_query_id());
> +  DBUG_ENTER("Rows_log_event::do_hash_scan_and_update");
>  
> -  if (!(memory= my_multi_malloc(MYF(MY_WME),
> -                                &table_list, (uint) sizeof(RPL_TABLE_LIST),
> -                                &db_mem, (uint) NAME_LEN + 1,
> -                                &tname_mem, (uint) NAME_LEN + 1,
> -                                NullS)))
> -    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
> +  bi_start= m_curr_row;
> +  if ((error= unpack_current_row(rli, &m_cols, 0)))
> +    goto err;
> +  bi_ends= m_curr_row_end;
>  
> -  strmov(db_mem, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
> -  strmov(tname_mem, m_tblnam);
> +  store_record(m_table, record[1]);
>  
> -  table_list->init_one_table(db_mem, strlen(db_mem),
> -                             tname_mem, strlen(tname_mem),
> -                             tname_mem, TL_WRITE);
> +  if (get_type_code() == UPDATE_ROWS_EVENT)
> +  {
> +    /*
> +      This is the situation after hashing the BI:
> +      
> +      ===|=== before image ====|=== after image ===|===
> +         ^                     ^
> +         m_curr_row            m_curr_row_end
> +      
> +      We need to skip the AI as well, before moving on to the
> +      next row.
> +    */
> +    ai_start= m_curr_row= m_curr_row_end;
> +    error= unpack_current_row(rli, &m_cols_ai);
> +    ai_ends= m_curr_row_end;
> +  }
>  
> -  table_list->table_id= m_table_id;
> -  table_list->updating= 1;
> +  /* move BI to index 0 */
> +  memcpy(m_table->record[0], m_table->record[1], m_table->s->reclength);
>  
> -  int error= 0;
> +  /* create an entry to add to the hash table */
> +  entry= m_hash.make_entry(bi_start, bi_ends, ai_start, ai_ends);
>  
> -  if (rli->info_thd->slave_thread /* filtering is for slave only */ &&
> -      (!rpl_filter->db_ok(table_list->db) ||
> -       (rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list))))
> -  {
> -    my_free(memory);
> -  }
> -  else
> +  /* add it to the hash table */
> +  m_hash.put(m_table, &m_cols, entry);
> +            
> +  /**
> +    Last row hashed. We are handling the last (pair of) row(s).  So
> +    now we do the table scan and match against the entries in the hash
> +    table.
> +   */
> +  if (m_curr_row_end == m_rows_end)
>    {
> -    DBUG_ASSERT(thd->lex->query_tables != table_list);
> +    saved_last_m_curr_row=m_curr_row;
>  
> -    /*
> -      Use placement new to construct the table_def instance in the
> -      memory allocated for it inside table_list.
> +    DBUG_PRINT("info",("Hash was populated with %d records!", m_hash.size()));
> +    TABLE* table= m_table;
>  
> -      The memory allocated by the table_def structure (i.e., not the
> -      memory allocated *for* the table_def structure) is released
> -      inside Relay_log_info::clear_tables_to_lock() by calling the
> -      table_def destructor explicitly.
> -    */
> -    new (&table_list->m_tabledef)
> -      table_def(m_coltype, m_colcnt,
> -                m_field_metadata, m_field_metadata_size,
> -                m_null_bits, m_flags);
> -    table_list->m_tabledef_valid= TRUE;
> +    if ((error= table->file->ha_rnd_init(1)))
> +    {
> +      DBUG_PRINT("info",("error initializing table scan"
> +          " (ha_rnd_init returns %d)",error));
> +      table->file->print_error(error, MYF(0));
> +      goto err;
> +    }
>  
> -    /*
> -      We record in the slave's information that the table should be
> -      locked by linking the table into the list of tables to lock.
> -    */
> -    table_list->next_global= table_list->next_local= rli->tables_to_lock;
> -    const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
> -    const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
> -    /* 'memory' is freed in clear_tables_to_lock */
> -  }
> +    /* 
> +       Scan the table only once and compare against entries in hash.
> +       When a match is found, apply the changes.
> +     */
> +    do
> +    {
> +      /* get the first record from the table */
> +      error= table->file->ha_rnd_next(table->record[0]);
>  
> -  DBUG_RETURN(error);
> -}
> +      DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
> +      switch (error) {
> +        case 0:
> +        {
> +          entry= NULL;
> +          m_hash.get(table, &m_cols, &entry);
> +          store_record(table, record[1]);
> +
> +          /**
> +             If there are collisions we need to be sure that this is
> +             indeed the record we want.  Loop through all records for
> +             the given key and explicitly compare them against the
> +             record we got from the storage engine.
> +           */
> +          while(entry)
> +          {
> +            m_curr_row= entry->bi_start;
> +            m_curr_row_end= entry->bi_ends;
> +
> +            if ((error= unpack_current_row(rli, &m_cols)))
> +              goto close_table;
> +            
> +            if (record_compare(m_table, &m_cols))
> +              m_hash.next(&entry);
> +            else
> +              break;   // we found a match
> +          }
> +
> +          /**
> +             We found the entry we needed, just apply the changes.
> +           */
> +          if (entry)
> +          {
> +            // just to be safe, copy the record from the SE to table->record[0]
> +            memcpy(table->record[0], table->record[1], table->s->reclength);
> +
> +            /**
> +               At this point, both table->record[0] and
> +               table->record[1] have the SE row that matched the one
> +               in the hash table.
> +               
> +               Hence, if this is a DELETE we wouldn't need to mess
> +               around with positions anymore, but since this can be an
> +               update, we need to provide positions so that AI is
> +               unpacked correctly to table->record[0] in UPDATE
> +               implementation of do_exec_row().
> +            */
> +            m_curr_row= entry->bi_start;
> +            m_curr_row_end= entry->bi_ends;
> +
> +            /* we don't need this entry anymore, just delete it */
> +            m_hash.del(entry);
> +            
> +            if ((error= do_apply_row(rli)))
> +            {
> +              if (handle_idempotent_errors(rli, &error) || error)
> +                goto close_table;
> +
> +              do_post_row_operations(rli, error);
> +            }
> +          }
> +        }
> +        break;
>  
> -Log_event::enum_skip_reason
> -Table_map_log_event::do_shall_skip(Relay_log_info *rli)
> -{
> -  /*
> -    If the slave skip counter is 1, then we should not start executing
> -    on the next event.
> -  */
> -  return continue_group(rli);
> -}
> +        case HA_ERR_RECORD_DELETED:
> +          // get next
> +          continue;
>  
> -int Table_map_log_event::do_update_pos(Relay_log_info *rli)
> -{
> -  rli->inc_event_relay_log_pos();
> -  return 0;
> -}
> +        case HA_ERR_END_OF_FILE:
> +        default:
> +          // exception (hash is not empty and we have reached EOF or
> +          // other error happened)
> +          goto close_table;
> +      }
> +    }
>  
> -#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
> +    while ((!m_hash.is_empty()) && 
> +           (!error || (error == HA_ERR_RECORD_DELETED)));
>  
> -#ifndef MYSQL_CLIENT
> -bool Table_map_log_event::write_data_header(IO_CACHE *file)
> -{
> -  DBUG_ASSERT(m_table_id != ~0UL);
> -  uchar buf[TABLE_MAP_HEADER_LEN];
> -  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
> -                  {
> -                    int4store(buf + 0, m_table_id);
> -                    int2store(buf + 4, m_flags);
> -                    return (my_b_safe_write(file, buf, 6));
> -                  });
> -  int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id);
> -  int2store(buf + TM_FLAGS_OFFSET, m_flags);
> -  return (my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
> +close_table:
> +    if (error)
> +    {
> +      m_table->file->print_error(error, MYF(0));
> +      DBUG_PRINT("info", ("Failed to get next record"
> +                          " (ha_rnd_next returns %d)",error));
> +    }
> +    m_table->file->ha_rnd_end();
> +  }
> +
> +err:
> +  if (m_hash.is_empty() && !error)
> +  {
> +    /**
> +       Reset the last positions, because the positions are lost while
> +       handling entries in the hash.
> +     */
> +    m_curr_row= saved_last_m_curr_row;
> +    m_curr_row_end= m_rows_end;
> +  }
> +  DBUG_ASSERT((m_hash.is_empty()) ? (error == 0) : (!m_hash.is_empty()));
> +  DBUG_RETURN(error);  
>  }
>  
> -bool Table_map_log_event::write_data_body(IO_CACHE *file)
> +int Rows_log_event::do_table_scan_and_update(Relay_log_info const *rli)
>  {
> -  DBUG_ASSERT(m_dbnam != NULL);
> -  DBUG_ASSERT(m_tblnam != NULL);
> -  /* We use only one byte per length for storage in event: */
> -  DBUG_ASSERT(m_dblen < 128);
> -  DBUG_ASSERT(m_tbllen < 128);
> -
> -  uchar const dbuf[]= { (uchar) m_dblen };
> -  uchar const tbuf[]= { (uchar) m_tbllen };
> -
> -  uchar cbuf[sizeof(m_colcnt) + 1];
> -  uchar *const cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
> -  DBUG_ASSERT(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));
> +  int error= 0;
> +  const uchar* saved_m_curr_row= m_curr_row;
> +  TABLE* table= m_table;
>  
> -  /*
> -    Store the size of the field metadata.
> -  */
> -  uchar mbuf[sizeof(m_field_metadata_size)];
> -  uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
> +  DBUG_ENTER("Rows_log_event::do_table_scan_and_update");
> +  DBUG_ASSERT(m_curr_row != m_rows_end);
> +  DBUG_PRINT("info",("locating record using table scan (ha_rnd_next)"));
>  
> -  return (my_b_safe_write(file, dbuf,      sizeof(dbuf)) ||
> -          my_b_safe_write(file, (const uchar*)m_dbnam,   m_dblen+1) ||
> -          my_b_safe_write(file, tbuf,      sizeof(tbuf)) ||
> -          my_b_safe_write(file, (const uchar*)m_tblnam,  m_tbllen+1) ||
> -          my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
> -          my_b_safe_write(file, m_coltype, m_colcnt) ||
> -          my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
> -          my_b_safe_write(file, m_field_metadata, m_field_metadata_size),
> -          my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
> - }
> -#endif
> +  saved_m_curr_row= m_curr_row;
>  
> -#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
> +  /** unpack the before image */
> +  prepare_record(table, &m_cols, FALSE);
> +  if (!(error= unpack_current_row(rli, &m_cols, 0)))
> +  {
> +    // Temporary fix to find out why it fails [/Matz]
> +    memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
>  
> -/*
> -  Print some useful information for the SHOW BINARY LOG information
> -  field.
> - */
> +    /** save a copy so that we can compare against it later */
> +    store_record(m_table, record[1]);
>  
> -#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
> -void Table_map_log_event::pack_info(Protocol *protocol)
> -{
> -    char buf[256];
> -    size_t bytes= my_snprintf(buf, sizeof(buf),
> -                                 "table_id: %lu (%s.%s)",
> -                              m_table_id, m_dbnam, m_tblnam);
> -    protocol->store(buf, bytes, &my_charset_bin);
> -}
> -#endif
> +    int restart_count= 0; // Number of times scanning has restarted from top
> +    
> +    if ((error= m_table->file->ha_rnd_init(1)))
> +    {
> +      DBUG_PRINT("info",("error initializing table scan"
> +                         " (ha_rnd_init returns %d)",error));
> +      goto end;
> +    }
> +    
> +    /* Continue until we find the right record or have made a full loop */
> +    do
> +    {
> +      error= m_table->file->ha_rnd_next(m_table->record[0]);
> +      
> +      DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
> +      switch (error) {
> +      case HA_ERR_END_OF_FILE:
> +        // restart scan from top
> +        if (++restart_count < 2)
> +          error= m_table->file->ha_rnd_init(1);
> +        break;
>  
> +      case HA_ERR_RECORD_DELETED:
> +        // fetch next
> +      case 0:
> +        // we're good, check if record matches
> +        break;
>  
> -#endif
> +      default:
> +        // exception
> +        goto end;
> +      }
> +    }
> +    while ((error == HA_ERR_END_OF_FILE && restart_count < 2) ||
> +           (error == HA_ERR_RECORD_DELETED) ||
> +           (!error && record_compare(m_table, &m_cols)));
> +  }
>  
> +end:
>  
> -#ifdef MYSQL_CLIENT
> -void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
> -{
> -  if (!print_event_info->short_form)
> +  DBUG_ASSERT(error != HA_ERR_RECORD_DELETED);
> +  
> +  /* either we report error or apply the changes */
> +  if (error && error != HA_ERR_RECORD_DELETED)
>    {
> -    print_header(&print_event_info->head_cache, print_event_info, TRUE);
> -    my_b_printf(&print_event_info->head_cache,
> -                "\tTable_map: `%s`.`%s` mapped to number %lu\n",
> -                m_dbnam, m_tblnam, m_table_id);
> -    print_base64(&print_event_info->body_cache, print_event_info, TRUE);
> +    DBUG_PRINT("info", ("Failed to get next record"
> +                        " (ha_rnd_next returns %d)",error));
> +    m_table->file->print_error(error, MYF(0));
>    }
> -}
> -#endif
> +  else 
> +    error= do_apply_row(rli);
>  
> -/**************************************************************************
> -	Write_rows_log_event member functions
> -**************************************************************************/
> +  /* close the index */
> +  if (table->file->inited)
> +    table->file->ha_rnd_end();
>  
> -/*
> -  Constructor used to build an event for writing to the binary log.
> - */
> -#if !defined(MYSQL_CLIENT)
> -Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
> -                                           ulong tid_arg,
> -                                           bool is_transactional)
> -  : Rows_log_event(thd_arg, tbl_arg, tid_arg, tbl_arg->write_set, is_transactional)
> -{
> -}
> -#endif
> +  if ((get_type_code() == UPDATE_ROWS_EVENT) && 
> +      (saved_m_curr_row == m_curr_row)) // we need to unpack the AI
> +  {
> +    m_curr_row= m_curr_row_end;
> +    unpack_current_row(rli, &m_cols);
> +  }
>  
> -/*
> -  Constructor used by slave to read the event from the binary log.
> - */
> -#ifdef HAVE_REPLICATION
> -Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len,
> -                                           const Format_description_log_event
> -                                           *description_event)
> -: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event)
> -{
> +  table->default_column_bitmaps();
> +  DBUG_RETURN(error);
>  }
> -#endif
>  
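Side note on the scan loop at the top of this hunk: the three-clause do/while condition is dense enough that it is worth spelling out. A minimal standalone sketch of the predicate (the names and error-code values below are placeholders, not the server's real `HA_ERR_*` constants):

```cpp
#include <cassert>

// Illustrative stand-ins for the handler error codes; the real values
// live in my_base.h and differ from these.
static const int SCAN_OK= 0;
static const int SCAN_END_OF_FILE= 1;
static const int SCAN_RECORD_DELETED= 2;

// The do/while condition, pulled out as a predicate: keep fetching rows
// while (a) we hit end-of-file but may still restart the scan, (b) the
// current record was deleted, or (c) the fetch succeeded but the row
// does not match the before image (record_compare signalled a mismatch).
static bool keep_scanning(int error, int restart_count, bool row_differs)
{
  return (error == SCAN_END_OF_FILE && restart_count < 2) ||
         (error == SCAN_RECORD_DELETED) ||
         (error == SCAN_OK && row_differs);
}
```

Reading it this way also makes the restart_count < 2 bound easy to audit: the scan may wrap around at most once.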
> -#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> -int 
> -Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
> +int Rows_log_event::do_apply_event(Relay_log_info const *rli)
>  {
> +  DBUG_ENTER("Rows_log_event::do_apply_event(Relay_log_info*)");
>    int error= 0;
> +  /*
> +    If m_table_id == ~0UL, then we have a dummy event that does not
> +    contain any data.  In that case, we just remove all tables in the
> +    tables_to_lock list, close the thread tables, and return with
> +    success.
> +   */
> +  if (m_table_id == ~0UL)
> +  {
> +    /*
> +       This one is supposed to be set: just an extra check so that
> +       nothing strange has happened.
> +     */
> +    DBUG_ASSERT(get_flags(STMT_END_F));
>  
> -  /**
> -     todo: to introduce a property for the event (handler?) which forces
> -     applying the event in the replace (idempotent) fashion.
> +    const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
> +    thd->clear_error();
> +    DBUG_RETURN(0);
> +  }
> +
> +  /*
> +    'thd' has been set by exec_relay_log_event(), just before calling
> +    do_apply_event(). We still check here to prevent future coding
> +    errors.
>    */
> -  if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) ||
> -      (m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER))
> +  DBUG_ASSERT(rli->info_thd == thd);
> +
> +  /*
> +    If no locks have been taken, this is the first row event seen
> +    after the table map events.  We should then lock all the tables
> +    used in the transaction and proceed with execution of the actual
> +    event.
> +  */
> +  if (!thd->lock)
>    {
>      /*
> -      We are using REPLACE semantics and not INSERT IGNORE semantics
> -      when writing rows, that is: new rows replace old rows.  We need to
> -      inform the storage engine that it should use this behaviour.
> +      Lock_tables() reads the contents of thd->lex, so they must be
> +      initialized.
> +
> +      We also call the mysql_reset_thd_for_next_command(), since this
> +      is the logical start of the next "statement". Note that this
> +      call might reset the value of current_stmt_binlog_format, so
> +      we need to do any changes to that value after this function.
>      */
> -    
> -    /* Tell the storage engine that we are using REPLACE semantics. */
> -    thd->lex->duplicates= DUP_REPLACE;
> -    
> +    lex_start(thd);
> +    mysql_reset_thd_for_next_command(thd);
>      /*
> -      Pretend we're executing a REPLACE command: this is needed for
> -      InnoDB and NDB Cluster since they are not (properly) checking the
> -      lex->duplicates flag.
> -    */
> -    thd->lex->sql_command= SQLCOM_REPLACE;
> -    /* 
> -       Do not raise the error flag in case of hitting to an unique attribute
> -    */
> -    m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
> -    /* 
> -       NDB specific: update from ndb master wrapped as Write_rows
> -       so that the event should be applied to replace slave's row
> +      The current statement is just about to begin and 
> +      has not yet modified anything. Note, all.modified is reset
> +      by mysql_reset_thd_for_next_command.
>      */
> -    m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
> -    /* 
> -       NDB specific: if update from ndb master wrapped as Write_rows
> -       does not find the row it's assumed idempotent binlog applying
> -       is taking place; don't raise the error.
> +    thd->transaction.stmt.modified_non_trans_table= FALSE;
> +    /*
> +      This is a row injection, so we flag the "statement" as
> +      such. Note that this code is called both when the slave does row
> +      injections and when the BINLOG statement is used to do row
> +      injections.
>      */
> -    m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
> +    thd->lex->set_stmt_row_injection();
> +
>      /*
> -      TODO: the cluster team (Tomas?) says that it's better if the engine knows
> -      how many rows are going to be inserted, then it can allocate needed memory
> -      from the start.
> +      There are a few flags that are replicated with each row event.
> +      Make sure to set/clear them before executing the main body of
> +      the event.
>      */
> -  }
> +    if (get_flags(NO_FOREIGN_KEY_CHECKS_F))
> +        thd->variables.option_bits|= OPTION_NO_FOREIGN_KEY_CHECKS;
> +    else
> +        thd->variables.option_bits&= ~OPTION_NO_FOREIGN_KEY_CHECKS;
>  
> -  /*
> -    We need TIMESTAMP_NO_AUTO_SET otherwise ha_write_row() will not use fill
> -    any TIMESTAMP column with data from the row but instead will use
> -    the event's current time.
> -    As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
> -    columns, we know that all TIMESTAMP columns on slave will receive explicit
> -    data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
> -    When we allow a table without TIMESTAMP to be replicated to a table having
> -    more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
> -    column to be replicated into a BIGINT column and the slave's table has a
> -    TIMESTAMP column, then the slave's TIMESTAMP column will take its value
> -    from set_time() which we called earlier (consistent with SBR). And then in
> -    some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
> -    analyze if explicit data is provided for slave's TIMESTAMP columns).
> -  */
> -  m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
> -  
> -  /* Honor next number column if present */
> -  m_table->next_number_field= m_table->found_next_number_field;
> -  /*
> -   * Fixed Bug#45999, In RBR, Store engine of Slave auto-generates new
> -   * sequence numbers for auto_increment fields if the values of them are 0.
> -   * If generateing a sequence number is decided by the values of
> -   * table->auto_increment_field_not_null and SQL_MODE(if includes
> -   * MODE_NO_AUTO_VALUE_ON_ZERO) in update_auto_increment function.
> -   * SQL_MODE of slave sql thread is always consistency with master's.
> -   * In RBR, auto_increment fields never are NULL.
> -   */
> -  m_table->auto_increment_field_not_null= TRUE;
> -  return error;
> -}
> +    if (get_flags(RELAXED_UNIQUE_CHECKS_F))
> +        thd->variables.option_bits|= OPTION_RELAXED_UNIQUE_CHECKS;
> +    else
> +        thd->variables.option_bits&= ~OPTION_RELAXED_UNIQUE_CHECKS;
> +    /* A small test to verify that objects have consistent types */
> +    DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
> +
> +    if (open_and_lock_tables(thd, rli->tables_to_lock, FALSE, 0))
> +    {
> +      uint actual_error= thd->stmt_da->sql_errno();
> +      if (thd->is_slave_error || thd->is_fatal_error)
> +      {
> +        /*
> +          Error reporting borrowed from Query_log_event with many excessive
> +          simplifications. 
> +          We should not honour --slave-skip-errors at this point as we are
> +          having severe errors which should not be skipped.
> +        */
> +        rli->report(ERROR_LEVEL, actual_error,
> +                    "Error executing row event: '%s'",
> +                    (actual_error ? thd->stmt_da->message() :
> +                     "unexpected success or fatal error"));
> +        thd->is_slave_error= 1;
> +      }
> +      const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
> +      DBUG_RETURN(actual_error);
> +    }
>  
> -int 
> -Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
> -                                              int error)
> -{
> -  int local_error= 0;
> -  m_table->next_number_field=0;
> -  m_table->auto_increment_field_not_null= FALSE;
> -  if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) ||
> -      m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER)
> -  {
> -    m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
> -    m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
>      /*
> -      resetting the extra with 
> -      table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY); 
> -      fires bug#27077
> -      explanation: file->reset() performs this duty
> -      ultimately. Still todo: fix
> +      When the open and locking succeeded, we check all tables to
> +      ensure that they still have the correct type.
> +
> +      We can use a down cast here since we know that every table added
> +      to the tables_to_lock is a RPL_TABLE_LIST.
>      */
> -  }
> -  if ((local_error= m_table->file->ha_end_bulk_insert()))
> -  {
> -    m_table->file->print_error(local_error, MYF(0));
> -  }
> -  return error? error : local_error;
> -}
>  
> -#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> -
> -/*
> -  Check if there are more UNIQUE keys after the given key.
> -*/
> -static int
> -last_uniq_key(TABLE *table, uint keyno)
> -{
> -  while (++keyno < table->s->keys)
> -    if (table->key_info[keyno].flags & HA_NOSAME)
> -      return 0;
> -  return 1;
> -}
> -
> -/**
> -   Check if an error is a duplicate key error.
> +    {
> +      DBUG_PRINT("debug", ("Checking compatibility of tables to lock - tables_to_lock: %p",
> +                           rli->tables_to_lock));
> +      RPL_TABLE_LIST *ptr= rli->tables_to_lock;
> +      for ( ; ptr ; ptr= static_cast<RPL_TABLE_LIST*>(ptr->next_global))
> +      {
> +        TABLE *conv_table;
> +        if (!ptr->m_tabledef.compatible_with(thd, const_cast<Relay_log_info*>(rli),
> +                                             ptr->table, &conv_table))
> +        {
> +          DBUG_PRINT("debug", ("Table: %s.%s is not compatible with master",
> +                               ptr->table->s->db.str,
> +                               ptr->table->s->table_name.str));
> +          /*
> +            We should not honour --slave-skip-errors at this point as we are
> +            having severe errors which should not be skipped.
> +          */
> +          thd->is_slave_error= 1;
> +          const_cast<Relay_log_info*>(rli)->slave_close_thread_tables(thd);
> +          DBUG_RETURN(ERR_BAD_TABLE_DEF);
> +        }
> +        DBUG_PRINT("debug", ("Table: %s.%s is compatible with master"
> +                             " - conv_table: %p",
> +                             ptr->table->s->db.str,
> +                             ptr->table->s->table_name.str, conv_table));
> +        ptr->m_conv_table= conv_table;
> +      }
> +    }
>  
> -   This function is used to check if an error code is one of the
> -   duplicate key error, i.e., and error code for which it is sensible
> -   to do a <code>get_dup_key()</code> to retrieve the duplicate key.
> +    /*
> +      ... and then we add all the tables to the table map and but keep
> +      them in the tables to lock list.
>  
> -   @param errcode The error code to check.
> +      We also invalidate the query cache for all the tables, since
> +      they will now be changed.
>  
> -   @return <code>true</code> if the error code is such that
> -   <code>get_dup_key()</code> will return true, <code>false</code>
> -   otherwise.
> - */
> -bool
> -is_duplicate_key_error(int errcode)
> -{
> -  switch (errcode)
> -  {
> -  case HA_ERR_FOUND_DUPP_KEY:
> -  case HA_ERR_FOUND_DUPP_UNIQUE:
> -    return true;
> +      TODO [/Matz]: Maybe the query cache should not be invalidated
> +      here? It might be that a table is not changed, even though it
> +      was locked for the statement.  We do know that each
> +      Rows_log_event contain at least one row, so after processing one
> +      Rows_log_event, we can invalidate the query cache for the
> +      associated table.
> +     */
> +    for (TABLE_LIST *ptr= rli->tables_to_lock ; ptr ; ptr= ptr->next_global)
> +    {
> +      const_cast<Relay_log_info*>(rli)->m_table_map.set_table(ptr->table_id, ptr->table);
> +    }
> +#ifdef HAVE_QUERY_CACHE
> +    query_cache.invalidate_locked_for_write(rli->tables_to_lock);
> +#endif
>    }
> -  return false;
> -}
> -
> -/**
> -  Write the current row into event's table.
> -
> -  The row is located in the row buffer, pointed by @c m_curr_row member.
> -  Number of columns of the row is stored in @c m_width member (it can be 
> -  different from the number of columns in the table to which we insert). 
> -  Bitmap @c m_cols indicates which columns are present in the row. It is assumed 
> -  that event's table is already open and pointed by @c m_table.
>  
> -  If the same record already exists in the table it can be either overwritten 
> -  or an error is reported depending on the value of @c overwrite flag 
> -  (error reporting not yet implemented). Note that the matching record can be
> -  different from the row we insert if we use primary keys to identify records in
> -  the table.
> +  TABLE* 
> +    table= 
> +    m_table= const_cast<Relay_log_info*>(rli)->m_table_map.get_table(m_table_id);
>  
> -  The row to be inserted can contain values only for selected columns. The 
> -  missing columns are filled with default values using @c prepare_record() 
> -  function. If a matching record is found in the table and @c overwritte is
> -  true, the missing columns are taken from it.
> +  DBUG_PRINT("debug", ("m_table: 0x%lx, m_table_id: %lu", (ulong) m_table, m_table_id));
>  
> -  @param  rli   Relay log info (needed for row unpacking).
> -  @param  overwrite  
> -                Shall we overwrite if the row already exists or signal 
> -                error (currently ignored).
> +  if (table)
> +  {
> +    /*
> +      table == NULL means that this table should not be replicated
> +      (this was set up by Table_map_log_event::do_apply_event()
> +      which tested replicate-* rules).
> +    */
>  
> -  @returns Error code on failure, 0 on success.
> +    /*
> +      There is no need to call set_time(), but
> +      1) it continues the property that "Time" in SHOW PROCESSLIST shows how
> +      far the slave is behind
> +      2) it will be needed when we allow replication from a table with no
> +      TIMESTAMP column to a table with one.
> +      So we call set_time(), like in SBR. Presently it changes nothing.
> +    */
> +    thd->set_time((time_t)when);
>  
> -  This method, if successful, sets @c m_curr_row_end pointer to point at the
> -  next row in the rows buffer. This is done when unpacking the row to be 
> -  inserted.
> +    /*
> +      Now we are in a statement and will stay in a statement until we
> +      see a STMT_END_F.
>  
> -  @note If a matching record is found, it is either updated using 
> -  @c ha_update_row() or first deleted and then new record written.
> -*/ 
> +      We set this flag here, before actually applying any rows, in
> +      case the SQL thread is stopped and we need to detect that we're
> +      inside a statement and halting abruptly might cause problems
> +      when restarting.
> +     */
> +    const_cast<Relay_log_info*>(rli)->set_flag(Relay_log_info::IN_STMT);
>  
> -int
> -Rows_log_event::write_row(const Relay_log_info *const rli,
> -                          const bool overwrite)
> -{
> -  DBUG_ENTER("write_row");
> -  DBUG_ASSERT(m_table != NULL && thd != NULL);
> +    if (m_width == table->s->fields && bitmap_is_set_all(&m_cols))
> +      set_flags(COMPLETE_ROWS_F);
>  
> -  TABLE *table= m_table;  // pointer to event's table
> -  int error;
> -  int UNINIT_VAR(keynum);
> -  auto_afree_ptr<char> key(NULL);
> +    /* 
> +      Set tables write and read sets.
> +      
> +      Read_set contains all slave columns (in case we are going to fetch
> +      a complete record from slave)
> +      
> +      Write_set equals the m_cols bitmap sent from master but it can be 
> +      longer if slave has extra columns. 
> +     */ 
>  
> -  /* fill table->record[0] with default values */
> -  bool abort_on_warnings= (rli->info_thd->variables.sql_mode &
> -                           (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES));
> -  if ((error= prepare_record(table, &m_cols,
> -                             table->file->ht->db_type != DB_TYPE_NDBCLUSTER,
> -                             abort_on_warnings, m_curr_row == m_rows_buf)))
> -    DBUG_RETURN(error);
> -  
> -  /* unpack row into table->record[0] */
> -  if ((error= unpack_current_row(rli, &m_cols, abort_on_warnings)))
> -    DBUG_RETURN(error);
> +    DBUG_PRINT_BITSET("debug", "Setting table's write_set from: %s", &m_cols);
> +    
> +    bitmap_set_all(table->read_set);
> +    if (get_type_code() == DELETE_ROWS_EVENT)
> +        bitmap_intersect(table->read_set,&m_cols);
>  
> -  // Temporary fix to find out why it fails [/Matz]
> -  memcpy(m_table->write_set->bitmap, m_cols.bitmap, (m_table->write_set->n_bits + 7) / 8);
> +    bitmap_set_all(table->write_set);
> +    if (!get_flags(COMPLETE_ROWS_F))
> +    {
> +      if (get_type_code() == UPDATE_ROWS_EVENT)
> +        bitmap_intersect(table->write_set,&m_cols_ai);
> +      else /* WRITE ROWS EVENTS store the bitmap in m_cols instead of m_cols_ai */
> +        bitmap_intersect(table->write_set,&m_cols);
> +    }
>  
> -  if (m_curr_row == m_rows_buf)
> -  {
> -    /* this is the first row to be inserted, we estimate the rows with
> -       the size of the first row and use that value to initialize
> -       storage engine for bulk insertion */
> -    DBUG_ASSERT(!(m_curr_row > m_curr_row_end));
> -    ulong estimated_rows= 0;
> -    if (m_curr_row < m_curr_row_end)
> -      estimated_rows= (m_rows_end - m_curr_row) / (m_curr_row_end - m_curr_row);
> -    else if (m_curr_row == m_curr_row_end)
> -      estimated_rows= 1;
> +    this->slave_exec_mode= slave_exec_mode_options; // fix the mode
> +    // Do event specific preparations
>  
> -    m_table->file->ha_start_bulk_insert(estimated_rows);
> -  }
> -  
> -  
> -#ifndef DBUG_OFF
> -  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> -  DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
> -  DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
> -#endif
> +    error= do_before_row_operations(rli);
>  
> -  /* 
> -    Try to write record. If a corresponding record already exists in the table,
> -    we try to change it using ha_update_row() if possible. Otherwise we delete
> -    it and repeat the whole process again. 
> +    int (Rows_log_event::*do_apply_row_ptr)(Relay_log_info const *)= NULL;
> +    switch (m_rows_lookup_algorithm)
> +    {
> +      case ROW_LOOKUP_HASH_SCAN:
> +        do_apply_row_ptr= &Rows_log_event::do_hash_scan_and_update;
> +        break;
>  
> -    TODO: Add safety measures against infinite looping. 
> -   */
> +      case ROW_LOOKUP_INDEX_SCAN:
> +        do_apply_row_ptr= &Rows_log_event::do_index_scan_and_update;
> +        break;
>  
> -  m_table->mark_columns_per_binlog_row_image();
> +      case ROW_LOOKUP_TABLE_SCAN:
> +        do_apply_row_ptr= &Rows_log_event::do_table_scan_and_update;
> +        break;
> +      
> +      case ROW_LOOKUP_NOT_NEEDED:
> +        DBUG_ASSERT(get_type_code() == WRITE_ROWS_EVENT);
> +      
> +        /* No need to scan for rows, just apply it */
> +        do_apply_row_ptr= &Rows_log_event::do_apply_row;
> +        break;
>  
> -  while ((error= table->file->ha_write_row(table->record[0])))
> -  {
> -    if (error == HA_ERR_LOCK_DEADLOCK ||
> -        error == HA_ERR_LOCK_WAIT_TIMEOUT ||
> -        (keynum= table->file->get_dup_key(error)) < 0 ||
> -        !overwrite)
> -    {
> -      DBUG_PRINT("info",("get_dup_key returns %d)", keynum));
> -      /*
> -        Deadlock, waiting for lock or just an error from the handler
> -        such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
> -        Retrieval of the duplicate key number may fail
> -        - either because the error was not "duplicate key" error
> -        - or because the information which key is not available
> -      */
> -      table->file->print_error(error, MYF(0));
> -      goto error;
> +      default:
> +        DBUG_ASSERT(0);
> +        error= 1;
> +        goto AFTER_MAIN_EXEC_ROW_LOOP;
> +        break;
>      }
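For readers less used to pointers to member functions, the dispatch pattern above can be illustrated in isolation. This is a stripped-down sketch, not the server's code; the struct, method names, and return values are invented for the illustration:

```cpp
#include <cassert>
#include <cstddef>

// A stripped-down sketch of the dispatch pattern: the event selects one
// row-lookup routine per event, and the per-row loop then calls it
// through a pointer to member function, avoiding a switch per row.
struct RowsEventSketch
{
  int do_hash_scan_and_update()  { return 10; }
  int do_index_scan_and_update() { return 20; }
  int do_table_scan_and_update() { return 30; }
  int do_apply_row()             { return 40; }

  enum Lookup { HASH_SCAN, INDEX_SCAN, TABLE_SCAN, NOT_NEEDED };

  int apply_one_row(Lookup algorithm)
  {
    int (RowsEventSketch::*apply_ptr)()= NULL;
    switch (algorithm)
    {
    case HASH_SCAN:  apply_ptr= &RowsEventSketch::do_hash_scan_and_update;  break;
    case INDEX_SCAN: apply_ptr= &RowsEventSketch::do_index_scan_and_update; break;
    case TABLE_SCAN: apply_ptr= &RowsEventSketch::do_table_scan_and_update; break;
    case NOT_NEEDED: apply_ptr= &RowsEventSketch::do_apply_row;             break;
    }
    // Same call shape as (this->*do_apply_row_ptr)(rli) in the patch.
    return (this->*apply_ptr)();
  }
};
```

The key point is that the selection happens once per event while the call site inside the row loop stays branch-free.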
> -    /*
> -       We need to retrieve the old row into record[1] to be able to
> -       either update or delete the offending record.  We either:
> -
> -       - use ha_rnd_pos() with a row-id (available as dupp_row) to the
> -         offending row, if that is possible (MyISAM and Blackhole), or else
> -
> -       - use ha_index_read_idx_map() with the key that is duplicated, to
> -         retrieve the offending row.
> +    
> +    /**
> +       Skip update rows events that don't have data for this slave's
> +       table.
>       */
> -    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
> +    if ((get_type_code() == UPDATE_ROWS_EVENT) &&
> +        !is_any_column_signaled_for_table(table, &m_cols_ai))
> +      goto AFTER_MAIN_EXEC_ROW_LOOP;
> +
> +    /**
> +       If there are no columns marked in the read_set for this table,
> +       that means that we cannot look up any row using the before image
> +       (BI) available in the binary log. Hence, we immediately raise an error:
> +       HA_ERR_END_OF_FILE.
> +     */
> +    if ((m_rows_lookup_algorithm != ROW_LOOKUP_NOT_NEEDED) && 
> +        !is_any_column_signaled_for_table(table, &m_cols))
>      {
> -      DBUG_PRINT("info",("Locating offending record using ha_rnd_pos()"));
> +      error= HA_ERR_END_OF_FILE;
> +      goto AFTER_MAIN_EXEC_ROW_LOOP;
> +    }
> +        
> +    // row processing loop
>  
> -      if (table->file->inited && (error= table->file->ha_index_end()))
> -        DBUG_RETURN(error);
> -      if ((error= table->file->ha_rnd_init(FALSE)))
> -        DBUG_RETURN(error);
> +    do {
> +      
> +      error= (this->*do_apply_row_ptr)(rli);
> +      
> +      if (handle_idempotent_errors(rli, &error))
> +        break;
> +      
> +      /* this advances m_curr_row */
> +      do_post_row_operations(rli, error);
> +      
> +    } while (!error && (m_curr_row != m_rows_end));
>  
> -      error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
> +AFTER_MAIN_EXEC_ROW_LOOP:
>  
> -      table->file->ha_rnd_end();
> -      if (error)
> -      {
> -        DBUG_PRINT("info",("ha_rnd_pos() returns error %d",error));
> -        if (error == HA_ERR_RECORD_DELETED)
> -          error= HA_ERR_KEY_NOT_FOUND;
> -        table->file->print_error(error, MYF(0));
> -        goto error;
> -      }
> +    {
> +      /**
> +         The following failure injection works in cooperation with tests
> +         setting @@global.debug= 'd,stop_slave_middle_group'.
> +         The sql thread receives the killed status and will proceed 
> +         to shutdown trying to finish incomplete events group.
> +     */
> +      DBUG_EXECUTE_IF("stop_slave_middle_group",
> +                      if (thd->transaction.all.modified_non_trans_table)
> +                        const_cast<Relay_log_info*>(rli)->abort_slave= 1;);
>      }
> -    else
> +
> +    if ((error= do_after_row_operations(rli, error)) &&
> +        ignored_error_code(convert_handler_error(error, thd, table)))
>      {
> -      DBUG_PRINT("info",("Locating offending record using index_read_idx()"));
>  
> -      if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
> -      {
> -        DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
> -        error= my_errno;
> -        goto error;
> -      }
> +      if (global_system_variables.log_warnings)
> +        slave_rows_error_report(WARNING_LEVEL, error, rli, thd, table,
> +                                get_type_str(),
> +                                const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
> +                                (ulong) log_pos);
> +      clear_all_errors(thd, const_cast<Relay_log_info*>(rli));
> +      error= 0;
> +    }
> +  } // if (table)
>  
> -      if (key.get() == NULL)
> -      {
> -        key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
> -        if (key.get() == NULL)
> -        {
> -          DBUG_PRINT("info",("Can't allocate key buffer"));
> -          error= ENOMEM;
> -          goto error;
> -        }
> -      }
> +  
> +  if (error)
> +  {
> +    slave_rows_error_report(ERROR_LEVEL, error, rli, thd, table,
> +                             get_type_str(),
> +                             const_cast<Relay_log_info*>(rli)->get_rpl_log_name(),
> +                             (ulong) log_pos);
> +    /*
> +      @todo We should probably not call
> +      reset_current_stmt_binlog_format_row() from here.
>  
> -      key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
> -               0);
> -      error= table->file->ha_index_read_idx_map(table->record[1], keynum,
> -                                                (const uchar*)key.get(),
> -                                                HA_WHOLE_KEY,
> -                                                HA_READ_KEY_EXACT);
> -      if (error)
> -      {
> -        DBUG_PRINT("info",("ha_index_read_idx_map() returns %s", HA_ERR(error)));
> -        if (error == HA_ERR_RECORD_DELETED)
> -          error= HA_ERR_KEY_NOT_FOUND;
> -        table->file->print_error(error, MYF(0));
> -        goto error;
> -      }
> -    }
> +      Note: this applies to log_event_old.cc too.
> +      /Sven
> +    */
> +    thd->reset_current_stmt_binlog_format_row();
> +    const_cast<Relay_log_info*>(rli)->cleanup_context(thd, error);
> +    thd->is_slave_error= 1;
> +    DBUG_RETURN(error);
> +  }
> +
> +  if (get_flags(STMT_END_F))
> +    if ((error= rows_event_stmt_cleanup(rli, thd)))
> +      rli->report(ERROR_LEVEL, error,
> +                  "Error in %s event: commit of row events failed, "
> +                  "table `%s`.`%s`",
> +                  get_type_str(), m_table->s->db.str,
> +                  m_table->s->table_name.str);
> +
> +  DBUG_RETURN(error);
> +}
> +
> +Log_event::enum_skip_reason
> +Rows_log_event::do_shall_skip(Relay_log_info *rli)
> +{
> +  /*
> +    If the slave skip counter is 1 and this event does not end a
> +    statement, then we should not start executing on the next event.
> +    Otherwise, we defer the decision to the normal skipping logic.
> +  */
> +  if (rli->slave_skip_counter == 1 && !get_flags(STMT_END_F))
> +    return Log_event::EVENT_SKIP_IGNORE;
> +  else
> +    return Log_event::do_shall_skip(rli);
> +}
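The interplay between the skip counter and STMT_END_F above is subtle; a small sketch of the decision (the enum and the fallback branch are simplifications of my own, not the base class code):

```cpp
#include <cassert>

enum SkipDecision { SKIP_NOT, SKIP_IGNORE, SKIP_COUNT };

// Sketch of do_shall_skip: with a skip counter of 1, a rows event that
// does not carry STMT_END_F must not decrement the counter, or the
// slave would resume executing mid-statement; IGNORE skips the event
// while leaving the counter alone. The fallback models the base class
// behaviour in simplified form (count down while the counter is set).
static SkipDecision shall_skip(unsigned skip_counter, bool stmt_end)
{
  if (skip_counter == 1 && !stmt_end)
    return SKIP_IGNORE;
  return skip_counter > 0 ? SKIP_COUNT : SKIP_NOT;
}
```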
> +
> +/**
> +   The function is called at Rows_log_event statement commit time,
> +   normally from Rows_log_event::do_update_pos() and possibly from
> +   Query_log_event::do_apply_event() of the COMMIT.
> +   The function commits the last statement for engines and the binlog, and
> +   releases resources that have been allocated for the statement.
> +  
> +   @retval  0         Ok.
> +   @retval  non-zero  Error at the commit.
> + */
>  
> +static int rows_event_stmt_cleanup(Relay_log_info const *rli, THD * thd)
> +{
> +  int error;
> +  {
>      /*
> -       Now, record[1] should contain the offending row.  That
> -       will enable us to update it or, alternatively, delete it (so
> -       that we can insert the new row afterwards).
> -     */
> +      This is the end of a statement or transaction, so close (and
> +      unlock) the tables we opened when processing the
> +      Table_map_log_event starting the statement.
> +
> +      OBSERVER.  This will clear *all* mappings, not only those that
> +      are open for the table. There is no good handle for on-close
> +      actions for tables.
> +
> +      NOTE. Even if we have no table ('table' == 0) we still need to be
> +      here, so that we increase the group relay log position. If we didn't, we
> +      could have a group relay log position which lags behind "forever"
> +      (assume the last master's transaction is ignored by the slave because of
> +      replicate-ignore rules).
> +    */
> +    error= thd->binlog_flush_pending_rows_event(TRUE);
>  
>      /*
> -      If row is incomplete we will use the record found to fill 
> -      missing columns.  
> +      If this event is not in a transaction, the call below will, if some
> +      transactional storage engines are involved, commit the statement into
> +      them and flush the pending event to binlog.
> +      If this event is in a transaction, the call will do nothing, but a
> +      Xid_log_event will come next which will, if some transactional engines
> +      are involved, commit the transaction and flush the pending event to the
> +      binlog.
>      */
> -    if (!get_flags(COMPLETE_ROWS_F))
> -    {
> -      restore_record(table,record[1]);
> -      error= unpack_current_row(rli, &m_cols);
> -    }
> +    error|= (error ? trans_rollback_stmt(thd) : trans_commit_stmt(thd));
>  
> -#ifndef DBUG_OFF
> -    DBUG_PRINT("debug",("preparing for update: before and after image"));
> -    DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
> -    DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);
> -#endif
> +    /*
> +      Now, what if this is not a transactional engine? We still need to
> +      flush the pending event to the binlog; we did it with
> +      thd->binlog_flush_pending_rows_event(). Note that we imitate
> +      what is done for real queries: a call to
> +      ha_autocommit_or_rollback() (sometimes only if involves a
> +      transactional engine), and a call to be sure to have the pending
> +      event flushed.
> +    */
>  
>      /*
> -       REPLACE is defined as either INSERT or DELETE + INSERT.  If
> -       possible, we can replace it with an UPDATE, but that will not
> -       work on InnoDB if FOREIGN KEY checks are necessary.
> +      @todo We should probably not call
> +      reset_current_stmt_binlog_format_row() from here.
>  
> -       I (Matz) am not sure of the reason for the last_uniq_key()
> -       check as, but I'm guessing that it's something along the
> -       following lines.
> +      Note: this applies to log_event_old.cc too
>  
> -       Suppose that we got the duplicate key to be a key that is not
> -       the last unique key for the table and we perform an update:
> -       then there might be another key for which the unique check will
> -       fail, so we're better off just deleting the row and inserting
> -       the correct row.
> -     */
> -    if (last_uniq_key(table, keynum) &&
> -        !table->file->referenced_by_foreign_key())
> -    {
> -      DBUG_PRINT("info",("Updating row using ha_update_row()"));
> -      error=table->file->ha_update_row(table->record[1],
> -                                       table->record[0]);
> -      switch (error) {
> -                
> -      case HA_ERR_RECORD_IS_THE_SAME:
> -        DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
> -                           " ha_update_row()"));
> -        error= 0;
> -      
> -      case 0:
> -        break;
> -        
> -      default:    
> -        DBUG_PRINT("info",("ha_update_row() returns error %d",error));
> -        table->file->print_error(error, MYF(0));
> -      }
> -      
> -      goto error;
> -    }
> -    else
> -    {
> -      DBUG_PRINT("info",("Deleting offending row and trying to write new one again"));
> -      if ((error= table->file->ha_delete_row(table->record[1])))
> -      {
> -        DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
> -        table->file->print_error(error, MYF(0));
> -        goto error;
> -      }
> -      /* Will retry ha_write_row() with the offending row removed. */
> +      Btw, the previous comment about transactional engines does not
> +      seem related to anything that happens here.
> +      /Sven
> +    */
> +    thd->reset_current_stmt_binlog_format_row();
> +
> +    const_cast<Relay_log_info*>(rli)->cleanup_context(thd, 0);
> +  }
> +  return error;
> +}
> +
> +/**
> +   The method either increments the relay log position or, if the event is
> +   STMT_END_F flagged and the statement corresponds to an autocommit query
> +   (i.e., replicated without being wrapped in BEGIN/COMMIT), commits the
> +   current statement and increments the master group position.
> +
> +   @retval 0         Success
> +   @retval non-zero  Error in the statement commit
> + */
> +int
> +Rows_log_event::do_update_pos(Relay_log_info *rli)
> +{
> +  DBUG_ENTER("Rows_log_event::do_update_pos");
> +  int error= 0;
> +
> +  DBUG_PRINT("info", ("flags: %s",
> +                      get_flags(STMT_END_F) ? "STMT_END_F " : ""));
> +
> +  if (get_flags(STMT_END_F))
> +  {
> +    /*
> +      Indicate that a statement is finished.
> +      Step the group log position if we are not in a transaction,
> +      otherwise increase the event log position.
> +    */
> +    rli->stmt_done(log_pos);
> +    /*
> +      Clear any errors in thd->net.last_err*. It is not known if this is
> +      needed or not. It is believed that any errors that may exist in
> +      thd->net.last_err* are allowed. Examples of errors are "key not
> +      found", which is produced in the test case rpl_row_conflicts.test
> +    */
> +    thd->clear_error();
> +  }
> +  else
> +  {
> +    rli->inc_event_relay_log_pos();
> +  }
> +
> +  DBUG_RETURN(error);
> +}
> +
> +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
> +
> +#ifndef MYSQL_CLIENT
> +bool Rows_log_event::write_data_header(IO_CACHE *file)
> +{
> +  uchar buf[ROWS_HEADER_LEN];	// No need to init the buffer
> +  DBUG_ASSERT(m_table_id != ~0UL);
> +  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
> +                  {
> +                    int4store(buf + 0, m_table_id);
> +                    int2store(buf + 4, m_flags);
> +                    return (my_b_safe_write(file, buf, 6));
> +                  });
> +  int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
> +  int2store(buf + RW_FLAGS_OFFSET, m_flags);
> +  return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
> +}
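Side note on write_data_header(): to convince myself the post-header layout is right (a 6-byte little-endian table id at RW_MAPID_OFFSET followed by a 2-byte flags word at RW_FLAGS_OFFSET), I wrote a minimal standalone sketch. The helper names are mine, not the server's int6store/int2store macros:

```cpp
#include <cstdint>
#include <cstddef>

// Store the low 6 bytes of v little-endian, as the server's int6store does.
static void store6(unsigned char *buf, unsigned long long v) {
  for (int i = 0; i < 6; i++)
    buf[i] = (unsigned char)(v >> (8 * i));
}

// Store a 2-byte value little-endian, as int2store does.
static void store2(unsigned char *buf, unsigned v) {
  buf[0] = (unsigned char)v;
  buf[1] = (unsigned char)(v >> 8);
}

// Build the 8-byte Rows_log_event post-header: 6-byte table id + 2-byte flags.
static void write_rows_header(unsigned char *buf,
                              unsigned long long table_id, unsigned flags) {
  store6(buf + 0, table_id);  // RW_MAPID_OFFSET == 0
  store2(buf + 6, flags);     // RW_FLAGS_OFFSET == 6
}
```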
> +
> +bool Rows_log_event::write_data_body(IO_CACHE*file)
> +{
> +  /*
> +     Note that this should be the number of *bits*, not the number of
> +     bytes.
> +  */
> +  uchar sbuf[sizeof(m_width) + 1];
> +  my_ptrdiff_t const data_size= m_rows_cur - m_rows_buf;
> +  bool res= false;
> +  uchar *const sbuf_end= net_store_length(sbuf, (size_t) m_width);
> +  DBUG_ASSERT(static_cast<size_t>(sbuf_end - sbuf) <= sizeof(sbuf));
> +
> +  DBUG_DUMP("m_width", sbuf, (size_t) (sbuf_end - sbuf));
> +  res= res || my_b_safe_write(file, sbuf, (size_t) (sbuf_end - sbuf));
> +
> +  DBUG_DUMP("m_cols", (uchar*) m_cols.bitmap, no_bytes_in_map(&m_cols));
> +  res= res || my_b_safe_write(file, (uchar*) m_cols.bitmap,
> +                              no_bytes_in_map(&m_cols));
> +  /*
> +    TODO[refactor write]: Remove the "down cast" here (and elsewhere).
> +   */
> +  if (get_type_code() == UPDATE_ROWS_EVENT)
> +  {
> +    DBUG_DUMP("m_cols_ai", (uchar*) m_cols_ai.bitmap,
> +              no_bytes_in_map(&m_cols_ai));
> +    res= res || my_b_safe_write(file, (uchar*) m_cols_ai.bitmap,
> +                                no_bytes_in_map(&m_cols_ai));
> +  }
> +  DBUG_DUMP("rows", m_rows_buf, data_size);
> +  res= res || my_b_safe_write(file, m_rows_buf, (size_t) data_size);
> +
> +  return res;
> +
> +}
> +#endif
> +
> +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
> +void Rows_log_event::pack_info(Protocol *protocol)
> +{
> +  char buf[256];
> +  char const *const flagstr=
> +    get_flags(STMT_END_F) ? " flags: STMT_END_F" : "";
> +  size_t bytes= my_snprintf(buf, sizeof(buf),
> +                               "table_id: %lu%s", m_table_id, flagstr);
> +  protocol->store(buf, bytes, &my_charset_bin);
> +}
> +#endif
> +
> +#ifdef MYSQL_CLIENT
> +void Rows_log_event::print_helper(FILE *file,
> +                                  PRINT_EVENT_INFO *print_event_info,
> +                                  char const *const name)
> +{
> +  IO_CACHE *const head= &print_event_info->head_cache;
> +  IO_CACHE *const body= &print_event_info->body_cache;
> +  if (!print_event_info->short_form)
> +  {
> +    bool const last_stmt_event= get_flags(STMT_END_F);
> +    print_header(head, print_event_info, !last_stmt_event);
> +    my_b_printf(head, "\t%s: table id %lu%s\n",
> +                name, m_table_id,
> +                last_stmt_event ? " flags: STMT_END_F" : "");
> +    print_base64(body, print_event_info, !last_stmt_event);
> +  }
> +
> +  if (get_flags(STMT_END_F))
> +  {
> +    copy_event_cache_to_file_and_reinit(head, file);
> +    copy_event_cache_to_file_and_reinit(body, file);
> +  }
> +}
> +#endif
> +
> +/**************************************************************************
> +	Table_map_log_event member functions and support functions
> +**************************************************************************/
> +
> +/**
> +  @page How replication of field metadata works.
> +  
> +  When a table map is created, the master first calls 
> +  Table_map_log_event::save_field_metadata() which calculates how many 
> +  values will be in the field metadata. Only those fields that require the 
> +  extra data are added. The method also loops through all of the fields in 
> +  the table calling the method Field::save_field_metadata() which returns the
> +  values for the field that will be saved in the metadata and replicated to
> +  the slave. Once all fields have been processed, the table map is written to
> +  the binlog adding the size of the field metadata and the field metadata to
> +  the end of the body of the table map.
> +
> +  When a table map is read on the slave, the field metadata is read from the 
> +  table map and passed to the table_def class constructor which saves the 
> +  field metadata from the table map into an array based on the type of the 
> +  field. Field metadata values that are not present in the table map (fields
> +  that do not use extra data) are initialized to zero (0). The array size is
> +  the same as the number of columns for the table on the slave.
> +
> +  Additionally, values for field metadata on the master are stored as a
> +  string of bytes (uchar) in the binlog. A field may require 1 or more bytes
> +  to store the information. In cases where values require multiple bytes
> +  (e.g. values > 255), the endian-safe methods are used to properly encode
> +  the values on the master and decode them on the slave. When the field
> +  metadata values are captured on the slave, they are stored in an array of
> +  type uint16. This minimizes the number of casts needed, which prevents
> +  casting bugs when the field metadata is used in comparisons of field
> +  attributes. When the field metadata is used for calculating addresses in
> +  pointer math, the type used is uint32.
> +*/
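A small illustration of the endian-safe round trip the comment above describes, for the multi-byte case. This is only a sketch with names of my own (save_meta/read_meta); the real work happens in Field::save_field_metadata() on the master and the table_def constructor on the slave:

```cpp
#include <cstdint>
#include <vector>

// Master side: append a metadata value as 1 or 2 little-endian bytes,
// roughly the way a Field::save_field_metadata() implementation might.
static void save_meta(std::vector<unsigned char> &out, uint16_t v, int nbytes) {
  out.push_back((unsigned char)(v & 0xFF));
  if (nbytes == 2)
    out.push_back((unsigned char)(v >> 8));
}

// Slave side: read the bytes back into the uint16 array table_def keeps.
static uint16_t read_meta(const unsigned char *p, int nbytes) {
  uint16_t v = p[0];
  if (nbytes == 2)
    v |= (uint16_t)((uint16_t)p[1] << 8);
  return v;
}
```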
> +
> +#if !defined(MYSQL_CLIENT)
> +/**
> +  Save the field metadata based on the real_type of the field.
> +  The metadata saved depends on the type of the field. Some fields
> +  store a single byte for pack_length() while others store two bytes
> +  for field_length (max length).
> +  
> +  @retval  0  Ok.
> +
> +  @todo
> +  We may want to consider changing the encoding of the information.
> +  Currently, the code attempts to minimize the number of bytes written to 
> +  the tablemap. There are at least two other alternatives; 1) using 
> +  net_store_length() to store the data allowing it to choose the number of
> +  bytes that are appropriate thereby making the code much easier to 
> +  maintain (only 1 place to change the encoding), or 2) use a fixed number
> +  of bytes for each field. The problem with option 1 is that net_store_length()
> +  will use one byte if the value < 251, but 3 bytes if it is > 250. Thus,
> +  for fields like CHAR which can be no larger than 255 characters, the method
> +  will use 3 bytes when the value is > 250. Further, every value that is
> +  encoded using 2 parts (e.g., pack_length, field_length) will be numerically
> +  > 250 and therefore will use 3 bytes for each value. Option 2 is less
> +  wasteful of space but does waste 1 byte for every field that does not
> +  encode 2 parts.
> +*/
> +int Table_map_log_event::save_field_metadata()
> +{
> +  DBUG_ENTER("Table_map_log_event::save_field_metadata");
> +  int index= 0;
> +  for (unsigned int i= 0 ; i < m_table->s->fields ; i++)
> +  {
> +    DBUG_PRINT("debug", ("field_type: %d", m_coltype[i]));
> +    index+= m_table->s->field[i]->save_field_metadata(&m_field_metadata[index]);
> +  }
> +  DBUG_RETURN(index);
> +}
> +#endif /* !defined(MYSQL_CLIENT) */
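Regarding the @todo on the encoding: the "1 byte if < 251, 3 bytes otherwise" behaviour of net_store_length() for values that fit in 16 bits can be sketched as below. store_packed is my name, and I only cover the range relevant for metadata sizes here; this is also why the constructor later adds +1 or +3 to m_data_size:

```cpp
#include <cstdint>
#include <cstddef>

// Sketch of the packed-integer encoding net_store_length() uses for the
// range relevant here (metadata sizes fit comfortably in 16 bits):
// values < 251 take one byte, larger ones take a 0xFC marker plus two
// little-endian bytes -- hence the "+1 or +3" size accounting.
static size_t store_packed(unsigned char *buf, uint32_t v) {
  if (v < 251) {
    buf[0] = (unsigned char)v;
    return 1;
  }
  buf[0] = 0xFC;                  // 2-byte-length marker
  buf[1] = (unsigned char)v;
  buf[2] = (unsigned char)(v >> 8);
  return 3;
}
```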
> +
> +/*
> +  Constructor used to build an event for writing to the binary log.
> +  Mats says tbl->s lives longer than this event so it's ok to copy pointers
> +  (tbl->s->db etc) and not pointer content.
> + */
> +#if !defined(MYSQL_CLIENT)
> +Table_map_log_event::Table_map_log_event(THD *thd, TABLE *tbl, ulong tid,
> +                                         bool is_transactional)
> +  : Log_event(thd, 0, is_transactional),
> +    m_table(tbl),
> +    m_dbnam(tbl->s->db.str),
> +    m_dblen(m_dbnam ? tbl->s->db.length : 0),
> +    m_tblnam(tbl->s->table_name.str),
> +    m_tbllen(tbl->s->table_name.length),
> +    m_colcnt(tbl->s->fields),
> +    m_memory(NULL),
> +    m_table_id(tid),
> +    m_flags(TM_BIT_LEN_EXACT_F),
> +    m_data_size(0),
> +    m_field_metadata(0),
> +    m_field_metadata_size(0),
> +    m_null_bits(0),
> +    m_meta_memory(NULL)
> +{
> +  uchar cbuf[sizeof(m_colcnt) + 1];
> +  uchar *cbuf_end;
> +  DBUG_ASSERT(m_table_id != ~0UL);
> +  /*
> +    In TABLE_SHARE, "db" and "table_name" are 0-terminated (see this comment
> +    in table.cc / alloc_table_share(): "Use the fact the key is
> +    db/0/table_name/0"). As we rely on this, let's assert it.
> +  */
> +  DBUG_ASSERT((tbl->s->db.str == 0) ||
> +              (tbl->s->db.str[tbl->s->db.length] == 0));
> +  DBUG_ASSERT(tbl->s->table_name.str[tbl->s->table_name.length] == 0);
> +
> +
> +  m_data_size=  TABLE_MAP_HEADER_LEN;
> +  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master", m_data_size= 6;);
> +  m_data_size+= m_dblen + 2;	// Include length and terminating \0
> +  m_data_size+= m_tbllen + 2;	// Include length and terminating \0
> +  cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
> +  DBUG_ASSERT(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));
> +  m_data_size+= (cbuf_end - cbuf) + m_colcnt;	// COLCNT and column types
> +
> +  /* If malloc fails, caught in is_valid() */
> +  if ((m_memory= (uchar*) my_malloc(m_colcnt, MYF(MY_WME))))
> +  {
> +    m_coltype= reinterpret_cast<uchar*>(m_memory);
> +    for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
> +      m_coltype[i]= m_table->field[i]->type();
> +  }
> +
> +  /*
> +    Calculate a bitmap for the results of maybe_null() for all columns.
> +    The bitmap is used to determine when there is a column from the master
> +    that is not on the slave and is null and thus not in the row data during
> +    replication.
> +  */
> +  uint num_null_bytes= (m_table->s->fields + 7) / 8;
> +  m_data_size+= num_null_bytes;
> +  m_meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
> +                                 &m_null_bits, num_null_bytes,
> +                                 &m_field_metadata, (m_colcnt * 2),
> +                                 NULL);
> +
> +  bzero(m_field_metadata, (m_colcnt * 2));
> +
> +  /*
> +    Create an array for the field metadata and store it.
> +  */
> +  m_field_metadata_size= save_field_metadata();
> +  DBUG_ASSERT(m_field_metadata_size <= (m_colcnt * 2));
> +
> +  /*
> +    Now set the size of the data to the size of the field metadata array
> +    plus one or three bytes (see pack.c:net_store_length) for number of 
> +    elements in the field metadata array.
> +  */
> +  if (m_field_metadata_size < 251)
> +    m_data_size+= m_field_metadata_size + 1; 
> +  else
> +    m_data_size+= m_field_metadata_size + 3; 
> +
> +  bzero(m_null_bits, num_null_bytes);
> +  for (unsigned int i= 0 ; i < m_table->s->fields ; ++i)
> +    if (m_table->field[i]->maybe_null())
> +      m_null_bits[(i / 8)]+= 1 << (i % 8);
> +
> +}
> +#endif /* !defined(MYSQL_CLIENT) */
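The null-bits construction at the end of the constructor is easy to get wrong by one, so here is a standalone sketch of the same bit layout: (fields + 7) / 8 bytes, with bit i % 8 of byte i / 8 set when column i is nullable. make_null_bits is my name, not the server's:

```cpp
#include <cstdint>
#include <cstddef>
#include <vector>

// Build the null-bits bitmap the same way the constructor does:
// one bit per column; byte i/8, bit i%8, set when the column is nullable.
static std::vector<unsigned char>
make_null_bits(const std::vector<bool> &maybe_null) {
  std::vector<unsigned char> bits((maybe_null.size() + 7) / 8, 0);
  for (size_t i = 0; i < maybe_null.size(); ++i)
    if (maybe_null[i])
      bits[i / 8] |= (unsigned char)(1u << (i % 8));
  return bits;
}
```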
> +
> +/*
> +  Constructor used by slave to read the event from the binary log.
> + */
> +#if defined(HAVE_REPLICATION)
> +Table_map_log_event::Table_map_log_event(const char *buf, uint event_len,
> +                                         const Format_description_log_event
> +                                         *description_event)
> +
> +  : Log_event(buf, description_event),
> +#ifndef MYSQL_CLIENT
> +    m_table(NULL),
> +#endif
> +    m_dbnam(NULL), m_dblen(0), m_tblnam(NULL), m_tbllen(0),
> +    m_colcnt(0), m_coltype(0),
> +    m_memory(NULL), m_table_id(ULONG_MAX), m_flags(0),
> +    m_data_size(0), m_field_metadata(0), m_field_metadata_size(0),
> +    m_null_bits(0), m_meta_memory(NULL)
> +{
> +  unsigned int bytes_read= 0;
> +  DBUG_ENTER("Table_map_log_event::Table_map_log_event(const char*,uint,...)");
> +
> +  uint8 common_header_len= description_event->common_header_len;
> +  uint8 post_header_len= description_event->post_header_len[TABLE_MAP_EVENT-1];
> +  DBUG_PRINT("info",("event_len: %u  common_header_len: %d  post_header_len: %d",
> +                     event_len, common_header_len, post_header_len));
> +
> +  /*
> +    Don't print debug messages when running valgrind since they can
> +    trigger false warnings.
> +   */
> +#ifndef HAVE_purify
> +  DBUG_DUMP("event buffer", (uchar*) buf, event_len);
> +#endif
> +
> +  /* Read the post-header */
> +  const char *post_start= buf + common_header_len;
> +
> +  post_start+= TM_MAPID_OFFSET;
> +  if (post_header_len == 6)
> +  {
> +    /* Master is of an intermediate source tree before 5.1.4. Id is 4 bytes */
> +    m_table_id= uint4korr(post_start);
> +    post_start+= 4;
> +  }
> +  else
> +  {
> +    DBUG_ASSERT(post_header_len == TABLE_MAP_HEADER_LEN);
> +    m_table_id= (ulong) uint6korr(post_start);
> +    post_start+= TM_FLAGS_OFFSET;
> +  }
> +
> +  DBUG_ASSERT(m_table_id != ~0UL);
> +
> +  m_flags= uint2korr(post_start);
> +
> +  /* Read the variable part of the event */
> +  const char *const vpart= buf + common_header_len + post_header_len;
> +
> +  /* Extract the length of the various parts from the buffer */
> +  uchar const *const ptr_dblen= (uchar const*)vpart + 0;
> +  m_dblen= *(uchar*) ptr_dblen;
> +
> +  /* Length of database name + counter + terminating null */
> +  uchar const *const ptr_tbllen= ptr_dblen + m_dblen + 2;
> +  m_tbllen= *(uchar*) ptr_tbllen;
> +
> +  /* Length of table name + counter + terminating null */
> +  uchar const *const ptr_colcnt= ptr_tbllen + m_tbllen + 2;
> +  uchar *ptr_after_colcnt= (uchar*) ptr_colcnt;
> +  m_colcnt= net_field_length(&ptr_after_colcnt);
> +
> +  DBUG_PRINT("info",("m_dblen: %lu  off: %ld  m_tbllen: %lu  off: %ld  m_colcnt: %lu  off: %ld",
> +                     (ulong) m_dblen, (long) (ptr_dblen-(const uchar*)vpart), 
> +                     (ulong) m_tbllen, (long) (ptr_tbllen-(const uchar*)vpart),
> +                     m_colcnt, (long) (ptr_colcnt-(const uchar*)vpart)));
> +
> +  /* Allocate mem for all fields in one go. If fails, caught in is_valid() */
> +  m_memory= (uchar*) my_multi_malloc(MYF(MY_WME),
> +                                     &m_dbnam, (uint) m_dblen + 1,
> +                                     &m_tblnam, (uint) m_tbllen + 1,
> +                                     &m_coltype, (uint) m_colcnt,
> +                                     NullS);
> +
> +  if (m_memory)
> +  {
> +    /* Copy the different parts into their memory */
> +    strncpy(const_cast<char*>(m_dbnam), (const char*)ptr_dblen + 1, m_dblen + 1);
> +    strncpy(const_cast<char*>(m_tblnam), (const char*)ptr_tbllen + 1, m_tbllen + 1);
> +    memcpy(m_coltype, ptr_after_colcnt, m_colcnt);
> +
> +    ptr_after_colcnt= ptr_after_colcnt + m_colcnt;
> +    bytes_read= (uint) (ptr_after_colcnt - (uchar *)buf);
> +    DBUG_PRINT("info", ("Bytes read: %d.\n", bytes_read));
> +    if (bytes_read < event_len)
> +    {
> +      m_field_metadata_size= net_field_length(&ptr_after_colcnt);
> +      DBUG_ASSERT(m_field_metadata_size <= (m_colcnt * 2));
> +      uint num_null_bytes= (m_colcnt + 7) / 8;
> +      m_meta_memory= (uchar *)my_multi_malloc(MYF(MY_WME),
> +                                     &m_null_bits, num_null_bytes,
> +                                     &m_field_metadata, m_field_metadata_size,
> +                                     NULL);
> +      memcpy(m_field_metadata, ptr_after_colcnt, m_field_metadata_size);
> +      ptr_after_colcnt= (uchar*)ptr_after_colcnt + m_field_metadata_size;
> +      memcpy(m_null_bits, ptr_after_colcnt, num_null_bytes);
>      }
>    }
>  
> -error:
> -  m_table->default_column_bitmaps();
> -  DBUG_RETURN(error);
> +  DBUG_VOID_RETURN;
>  }
> -
>  #endif
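For the variable-part parsing in the slave-side constructor above, a simplified sketch of the wire layout helps: 1-byte db length, db name + '\0', 1-byte table length, table name + '\0', then the packed column count. All names here are mine, and I only handle the 1-byte and 0xFC-prefixed forms of the packed count:

```cpp
#include <cstdint>
#include <cstddef>
#include <string>

// Parsed variable part of a table map event (sketch, my own struct).
struct TableMapBody {
  std::string db, table;
  uint32_t colcnt;
};

static TableMapBody parse_vpart(const unsigned char *p) {
  TableMapBody out;
  size_t dblen = p[0];
  out.db.assign((const char *)p + 1, dblen);   // skip the length byte
  p += dblen + 2;                              // length + name + '\0'
  size_t tbllen = p[0];
  out.table.assign((const char *)p + 1, tbllen);
  p += tbllen + 2;
  out.colcnt = (p[0] < 251)
      ? p[0]                                       // one-byte form
      : (uint32_t)p[1] | ((uint32_t)p[2] << 8);    // assumes 0xFC two-byte form
  return out;
}
```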
>  
> -int
> -Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
> +Table_map_log_event::~Table_map_log_event()
>  {
> -  DBUG_ASSERT(m_table != NULL);
> -  int error= write_row(rli, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
> -
> -  if (error && !thd->is_error())
> -  {
> -    DBUG_ASSERT(0);
> -    my_error(ER_UNKNOWN_ERROR, MYF(0));
> -  }
> -
> -  return error;
> +  my_free(m_meta_memory);
> +  my_free(m_memory);
>  }
>  
> -#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
> -
> -#ifdef MYSQL_CLIENT
> -void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
> -{
> -  Rows_log_event::print_helper(file, print_event_info, "Write_rows");
> -}
> -#endif
> +/*
> +  Return value is an error code, one of:
>  
> -/**************************************************************************
> -	Delete_rows_log_event member functions
> -**************************************************************************/
> +      -1     Failure to open table   [from open_tables()]
> +       0     Success
> +       1     No room for more tables [from set_table()]
> +       2     Out of memory           [from set_table()]
> +       3     Wrong table definition
> +       4     Daisy-chaining RBR with SBR not possible
> + */
>  
>  #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> -/*
> -  Compares table->record[0] and table->record[1]
> -
> -  Returns TRUE if different.
> -*/
> -static bool record_compare(TABLE *table, MY_BITMAP *cols)
> +int Table_map_log_event::do_apply_event(Relay_log_info const *rli)
>  {
> -  /*
> -    Need to set the X bit and the filler bits in both records since
> -    there are engines that do not set it correctly.
> -
> -    In addition, since MyISAM checks that one hasn't tampered with the
> -    record, it is necessary to restore the old bytes into the record
> -    after doing the comparison.
> -
> -    TODO[record format ndb]: Remove it once NDB returns correct
> -    records. Check that the other engines also return correct records.
> -   */
> +  RPL_TABLE_LIST *table_list;
> +  char *db_mem, *tname_mem;
> +  size_t dummy_len;
> +  void *memory;
> +  DBUG_ENTER("Table_map_log_event::do_apply_event(Relay_log_info*)");
> +  DBUG_ASSERT(rli->info_thd == thd);
>  
> -  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> -  DBUG_DUMP("record[1]", table->record[1], table->s->reclength);
> +  /* Step the query id to mark which columns are actually used. */
> +  thd->set_query_id(next_query_id());
>  
> -  bool result= FALSE;
> -  uchar saved_x[2]= {0, 0}, saved_filler[2]= {0, 0};
> +  if (!(memory= my_multi_malloc(MYF(MY_WME),
> +                                &table_list, (uint) sizeof(RPL_TABLE_LIST),
> +                                &db_mem, (uint) NAME_LEN + 1,
> +                                &tname_mem, (uint) NAME_LEN + 1,
> +                                NullS)))
> +    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
>  
> -  if (table->s->null_bytes > 0)
> -  {
> -    for (int i = 0 ; i < 2 ; ++i)
> -    {
> -      /* 
> -        If we have an X bit then we need to take care of it.
> -      */
> -      if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
> -      {
> -        saved_x[i]= table->record[i][0];
> -        table->record[i][0]|= 1U;
> -      }
> +  strmov(db_mem, rpl_filter->get_rewrite_db(m_dbnam, &dummy_len));
> +  strmov(tname_mem, m_tblnam);
>  
> -      /*
> -         If (last_null_bit_pos == 0 && null_bytes > 1), then:
> +  table_list->init_one_table(db_mem, strlen(db_mem),
> +                             tname_mem, strlen(tname_mem),
> +                             tname_mem, TL_WRITE);
>  
> -         X bit (if any) + N nullable fields + M Field_bit fields = 8 bits 
> +  table_list->table_id= m_table_id;
> +  table_list->updating= 1;
>  
> -         Ie, the entire byte is used.
> -      */
> -      if (table->s->last_null_bit_pos > 0)
> -      {
> -        saved_filler[i]= table->record[i][table->s->null_bytes - 1];
> -        table->record[i][table->s->null_bytes - 1]|=
> -          256U - (1U << table->s->last_null_bit_pos);
> -      }
> -    }
> -  }
> +  int error= 0;
>  
> -  if (table->s->blob_fields + table->s->varchar_fields == 0 &&
> -      bitmap_is_set_all(cols))
> +  if (rli->info_thd->slave_thread /* filtering is for slave only */ &&
> +      (!rpl_filter->db_ok(table_list->db) ||
> +       (rpl_filter->is_on() && !rpl_filter->tables_ok("", table_list))))
>    {
> -    result= cmp_record(table,record[1]);
> -    goto record_compare_exit;
> +    my_free(memory);
>    }
> -
> -  /* Compare null bits */
> -  if (bitmap_is_set_all(cols) &&
> -      memcmp(table->null_flags,
> -	     table->null_flags+table->s->rec_buff_length,
> -	     table->s->null_bytes))
> +  else
>    {
> -    result= TRUE;				// Diff in NULL value
> -    goto record_compare_exit;
> -  }
> +    DBUG_ASSERT(thd->lex->query_tables != table_list);
>  
> -  /* Compare updated fields */
> -  for (Field **ptr=table->field ; 
> -       *ptr && ((*ptr)->field_index < cols->n_bits);
> -       ptr++)
> -  {
> -    if (bitmap_is_set(cols, (*ptr)->field_index))
> -    {
> -      if ((*ptr)->cmp_binary_offset(table->s->rec_buff_length))
> -      {
> -        result= TRUE;
> -        goto record_compare_exit;
> -      }
> -    }
> +    /*
> +      Use placement new to construct the table_def instance in the
> +      memory allocated for it inside table_list.
> +
> +      The memory allocated by the table_def structure (i.e., not the
> +      memory allocated *for* the table_def structure) is released
> +      inside Relay_log_info::clear_tables_to_lock() by calling the
> +      table_def destructor explicitly.
> +    */
> +    new (&table_list->m_tabledef)
> +      table_def(m_coltype, m_colcnt,
> +                m_field_metadata, m_field_metadata_size,
> +                m_null_bits, m_flags);
> +    table_list->m_tabledef_valid= TRUE;
> +
> +    /*
> +      We record in the slave's information that the table should be
> +      locked by linking the table into the list of tables to lock.
> +    */
> +    table_list->next_global= table_list->next_local= rli->tables_to_lock;
> +    const_cast<Relay_log_info*>(rli)->tables_to_lock= table_list;
> +    const_cast<Relay_log_info*>(rli)->tables_to_lock_count++;
> +    /* 'memory' is freed in clear_tables_to_lock */
>    }
>  
> -record_compare_exit:
> -  /*
> -    Restore the saved bytes.
> +  DBUG_RETURN(error);
> +}
>  
> -    TODO[record format ndb]: Remove this code once NDB returns the
> -    correct record format.
> +Log_event::enum_skip_reason
> +Table_map_log_event::do_shall_skip(Relay_log_info *rli)
> +{
> +  /*
> +    If the slave skip counter is 1, then we should not start executing
> +    on the next event.
>    */
> -  if (table->s->null_bytes > 0)
> -  {
> -    for (int i = 0 ; i < 2 ; ++i)
> -    {
> -      if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
> -        table->record[i][0]= saved_x[i];
> -
> -      if (table->s->last_null_bit_pos)
> -        table->record[i][table->s->null_bytes - 1]= saved_filler[i];
> -    }
> -  }
> +  return continue_group(rli);
> +}
>  
> -  return result;
> +int Table_map_log_event::do_update_pos(Relay_log_info *rli)
> +{
> +  rli->inc_event_relay_log_pos();
> +  return 0;
>  }
>  
> +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
>  
> -/**
> -  Checks if any of the columns in the given table is
> -  signaled in the bitmap.
> +#ifndef MYSQL_CLIENT
> +bool Table_map_log_event::write_data_header(IO_CACHE *file)
> +{
> +  DBUG_ASSERT(m_table_id != ~0UL);
> +  uchar buf[TABLE_MAP_HEADER_LEN];
> +  DBUG_EXECUTE_IF("old_row_based_repl_4_byte_map_id_master",
> +                  {
> +                    int4store(buf + 0, m_table_id);
> +                    int2store(buf + 4, m_flags);
> +                    return (my_b_safe_write(file, buf, 6));
> +                  });
> +  int6store(buf + TM_MAPID_OFFSET, (ulonglong)m_table_id);
> +  int2store(buf + TM_FLAGS_OFFSET, m_flags);
> +  return (my_b_safe_write(file, buf, TABLE_MAP_HEADER_LEN));
> +}
>  
> -  For each column in the given table checks if it is
> -  signaled in the bitmap. This is most useful when deciding
> -  whether a before image (BI) can be used or not for 
> -  searching a row. If no column is signaled, then the 
> -  image cannot be used for searching a record (regardless 
> -  of using position(), index scan or table scan). Here is 
> -  an example:
> +bool Table_map_log_event::write_data_body(IO_CACHE *file)
> +{
> +  DBUG_ASSERT(m_dbnam != NULL);
> +  DBUG_ASSERT(m_tblnam != NULL);
> +  /* We use only one byte per length for storage in event: */
> +  DBUG_ASSERT(m_dblen < 128);
> +  DBUG_ASSERT(m_tbllen < 128);
>  
> -  MASTER> SET @@binlog_row_image='MINIMAL';
> -  MASTER> CREATE TABLE t1 (a int, b int, c int, primary key(c));
> -  SLAVE> CREATE TABLE t1 (a int, b int);
> -  MASTER> INSERT INTO t1 VALUES (1,2,3);
> -  MASTER> UPDATE t1 SET a=2 WHERE b=2;
> +  uchar const dbuf[]= { (uchar) m_dblen };
> +  uchar const tbuf[]= { (uchar) m_tbllen };
>  
> -  For the update statement only the PK (column c) is 
> -  logged in the before image (BI). As such, given that 
> -  the slave has no column c, it will not be able to 
> -  find the row, because BI has no values for the columns
> -  the slave knows about (column a and b).
> +  uchar cbuf[sizeof(m_colcnt) + 1];
> +  uchar *const cbuf_end= net_store_length(cbuf, (size_t) m_colcnt);
> +  DBUG_ASSERT(static_cast<size_t>(cbuf_end - cbuf) <= sizeof(cbuf));
>  
> -  @param table   the table reference on the slave.
> -  @param cols the bitmap signaling columns available in 
> -                 the BI.
> +  /*
> +    Store the size of the field metadata.
> +  */
> +  uchar mbuf[sizeof(m_field_metadata_size)];
> +  uchar *const mbuf_end= net_store_length(mbuf, m_field_metadata_size);
> +
> +  return (my_b_safe_write(file, dbuf,      sizeof(dbuf)) ||
> +          my_b_safe_write(file, (const uchar*)m_dbnam,   m_dblen+1) ||
> +          my_b_safe_write(file, tbuf,      sizeof(tbuf)) ||
> +          my_b_safe_write(file, (const uchar*)m_tblnam,  m_tbllen+1) ||
> +          my_b_safe_write(file, cbuf, (size_t) (cbuf_end - cbuf)) ||
> +          my_b_safe_write(file, m_coltype, m_colcnt) ||
> +          my_b_safe_write(file, mbuf, (size_t) (mbuf_end - mbuf)) ||
> +          my_b_safe_write(file, m_field_metadata, m_field_metadata_size) ||
> +          my_b_safe_write(file, m_null_bits, (m_colcnt + 7) / 8));
> +}
> +#endif
>  
> -  @return TRUE if BI contains usable colums for searching, 
> -          FALSE otherwise.
> -*/
> -static
> -my_bool is_any_column_signaled_for_table(TABLE *table, MY_BITMAP *cols)
> -{
> +#if defined(HAVE_REPLICATION) && !defined(MYSQL_CLIENT)
>  
> -  int nfields_set= 0;
> -  for (Field **ptr=table->field ; 
> -       *ptr && ((*ptr)->field_index < cols->n_bits);
> -       ptr++)
> -  {
> -    if (bitmap_is_set(cols, (*ptr)->field_index))
> -      nfields_set++;
> -  }
> +/*
> +  Print some useful information for the Info column of SHOW BINLOG EVENTS.
> + */
>  
> -  return (nfields_set != 0);
> +void Table_map_log_event::pack_info(Protocol *protocol)
> +{
> +  char buf[256];
> +  size_t bytes= my_snprintf(buf, sizeof(buf),
> +                            "table_id: %lu (%s.%s)",
> +                            m_table_id, m_dbnam, m_tblnam);
> +  protocol->store(buf, bytes, &my_charset_bin);
> +}
>  
> -/**
> -  Checks if the fields in the given key are signaled in
> -  the bitmap.
> -
> -  Validates whether the before image is usable for the
> -  given key. It can be the case that the before image
> -  does not contain values for the key (eg, master was
> -  using 'minimal' option for image logging and slave has
> -  different index structure on the table). Here is an
> -  example:
>  
> -  MASTER> SET @@binlog_row_image='MINIMAL';
> -  MASTER> CREATE TABLE t1 (a int, b int, c int, primary key(c));
> -  SLAVE> CREATE TABLE t1 (a int, b int, c int, key(a,c));
> -  MASTER> INSERT INTO t1 VALUES (1,2,3);
> -  MASTER> UPDATE t1 SET a=2 WHERE b=2;
> +#endif
>  
> -  When finding the row on the slave, one cannot use the
> -  index (a,c) to search for the row, because there is only
> -  data in the before image for column c. This function
> -  checks the fields needed for a given key and searches
> -  the bitmap to see if all the fields required are 
> -  signaled.
> -  
> -  @param keyinfo  reference to key.
> -  @param cols     the bitmap signaling which columns 
> -                  have available data.
>  
> -  @return TRUE if all fields are signaled in the bitmap 
> -          for the given key, FALSE otherwise.
> -*/
> -static
> -my_bool are_all_columns_signaled_for_key(KEY *keyinfo, MY_BITMAP *cols)
> +#ifdef MYSQL_CLIENT
> +void Table_map_log_event::print(FILE *file, PRINT_EVENT_INFO *print_event_info)
>  {
> -  for (uint i=0 ; i < keyinfo->key_parts ;i++)
> +  if (!print_event_info->short_form)
>    {
> -    uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
> -    if (fieldnr >= cols->n_bits || 
> -        !bitmap_is_set(cols, fieldnr))
> -      return FALSE;
> +    print_header(&print_event_info->head_cache, print_event_info, TRUE);
> +    my_b_printf(&print_event_info->head_cache,
> +                "\tTable_map: `%s`.`%s` mapped to number %lu\n",
> +                m_dbnam, m_tblnam, m_table_id);
> +    print_base64(&print_event_info->body_cache, print_event_info, TRUE);
>    }
> - 
> -  return TRUE;
>  }
> +#endif
>  
> -/**
> -  Searches the table for a given key that can be used
> -  according to the existing values, ie, columns set
> -  in the bitmap.
> -
> -  The caller can specify which type of key to find by
> -  setting the following flags in the key_type parameter:
> -
> -    - PRI_KEY_FLAG
> -      Returns the primary key.
> -
> -    - UNIQUE_KEY_FLAG
> -      Returns a unique key (flagged with HA_NOSAME)
> -
> -    - MULTIPLE_KEY_FLAG
> -      Returns a key that is not unique (flagged with HA_NOSAME 
> -      and without HA_NULL_PART_KEY) nor PK.
> -
> -  The above flags can be used together, in which case, the
> -  search is conducted in the above listed order. Eg, the 
> -  following flag:
> -
> -    (PRI_KEY_FLAG | UNIQUE_KEY_FLAG | MULTIPLE_KEY_FLAG)
> -
> -  means that a primary key is returned if it is suitable. If
> -  not then the unique keys are searched. If no unique key is
> -  suitable, then the keys are searched. Finally, if no key
> -  is suitable, MAX_KEY is returned.
> +/**************************************************************************
> +	Write_rows_log_event member functions
> +**************************************************************************/
>  
> -  @param table    reference to the table.
> -  @param bi_cols  a bitmap that filters out columns that should
> -                  not be considered while searching the key. 
> -                  Columns that should be considered are set.
> -  @param key_type the type of key to search for.
> +/*
> +  Constructor used to build an event for writing to the binary log.
> + */
> +#if !defined(MYSQL_CLIENT)
> +Write_rows_log_event::Write_rows_log_event(THD *thd_arg, TABLE *tbl_arg,
> +                                           ulong tid_arg,
> +                                           bool is_transactional)
> +  : Rows_log_event(thd_arg, tbl_arg, tid_arg, tbl_arg->write_set,
> +                   is_transactional)
> +{
> +}
> +#endif
>  
> -  @return MAX_KEY if no key, according to the key_type specified
> -          is suitable. Returns the key otherwise.
> +/*
> +  Constructor used by slave to read the event from the binary log.
> + */
> +#ifdef HAVE_REPLICATION
> +Write_rows_log_event::Write_rows_log_event(const char *buf, uint event_len,
> +                                           const Format_description_log_event
> +                                           *description_event)
> +: Rows_log_event(buf, event_len, WRITE_ROWS_EVENT, description_event)
> +{
> +}
> +#endif
>  
> -*/
> -static
> -uint
> -search_key_in_table(TABLE *table, MY_BITMAP *bi_cols, uint key_type)
> +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> +int
> +Write_rows_log_event::do_before_row_operations(const Slave_reporting_capability *const)
>  {
> -  KEY *keyinfo;
> -  uint res= MAX_KEY;
> -  uint key;
> +  int error= 0;
>  
> -  if (key_type & PRI_KEY_FLAG && (table->s->primary_key < MAX_KEY))
> +  /**
> +     todo: introduce a property for the event (handler?) which forces
> +     applying the event in the replace (idempotent) fashion.
> +  */
> +  if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) ||
> +      (m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER))
>    {
> -    keyinfo= table->s->key_info + (uint) table->s->primary_key;
> -    if (are_all_columns_signaled_for_key(keyinfo, bi_cols)) 
> -      return table->s->primary_key;
> +    /*
> +      We are using REPLACE semantics and not INSERT IGNORE semantics
> +      when writing rows, that is: new rows replace old rows.  We need to
> +      inform the storage engine that it should use this behaviour.
> +    */
> +    
> +    /* Tell the storage engine that we are using REPLACE semantics. */
> +    thd->lex->duplicates= DUP_REPLACE;
> +    
> +    /*
> +      Pretend we're executing a REPLACE command: this is needed for
> +      InnoDB and NDB Cluster since they are not (properly) checking the
> +      lex->duplicates flag.
> +    */
> +    thd->lex->sql_command= SQLCOM_REPLACE;
> +    /*
> +       Do not raise the error flag in case of hitting a unique constraint.
> +    */
> +    m_table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
> +    /*
> +       NDB specific: an update from an NDB master is wrapped as Write_rows,
> +       so the event should be applied as a replace of the slave's row.
> +    */
> +    m_table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
> +    /*
> +       NDB specific: if an update from an NDB master wrapped as Write_rows
> +       does not find the row, idempotent binlog applying is assumed to be
> +       taking place; don't raise the error.
> +    */
> +    m_table->file->extra(HA_EXTRA_IGNORE_NO_KEY);
> +    /*
> +      TODO: the cluster team (Tomas?) says that it's better if the engine knows
> +      how many rows are going to be inserted, then it can allocate needed memory
> +      from the start.
> +    */
>    }
>  
> -  if (key_type & UNIQUE_KEY_FLAG && table->s->uniques)
> -  {
> -    for (key=0,keyinfo= table->key_info ; 
> -         (key < table->s->keys) && (res == MAX_KEY);
> -         key++,keyinfo++)
> -    {
> -      /*
> -        - Unique keys cannot be disabled, thence we skip the check.
> -        - Skip unique keys with nullable parts
> -        - Skip primary keys
> -      */
> -      if (!((keyinfo->flags & (HA_NOSAME | HA_NULL_PART_KEY)) != HA_NOSAME) ||
> -          (key == table->s->primary_key))
> -        continue;
> -      res= are_all_columns_signaled_for_key(keyinfo, bi_cols) ? 
> -           key : MAX_KEY;
> +  /*
> +    We need TIMESTAMP_NO_AUTO_SET, otherwise ha_write_row() will not fill
> +    any TIMESTAMP column with data from the row but instead will use
> +    the event's current time.
> +    As we replicate from TIMESTAMP to TIMESTAMP and slave has no extra
> +    columns, we know that all TIMESTAMP columns on slave will receive explicit
> +    data from the row, so TIMESTAMP_NO_AUTO_SET is ok.
> +    When we allow a table without TIMESTAMP to be replicated to a table having
> +    more columns including a TIMESTAMP column, or when we allow a TIMESTAMP
> +    column to be replicated into a BIGINT column and the slave's table has a
> +    TIMESTAMP column, then the slave's TIMESTAMP column will take its value
> +    from set_time() which we called earlier (consistent with SBR). And then in
> +    some cases we won't want TIMESTAMP_NO_AUTO_SET (will require some code to
> +    analyze if explicit data is provided for slave's TIMESTAMP columns).
> +  */
> +  m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
> +  
> +  /* Honor next number column if present */
> +  m_table->next_number_field= m_table->found_next_number_field;
> +  /*
> +   * Fixed Bug#45999: in RBR, the slave's storage engine auto-generates new
> +   * sequence numbers for auto_increment fields if their values are 0.
> +   * Whether a sequence number is generated is decided by the value of
> +   * table->auto_increment_field_not_null and by SQL_MODE (whether it
> +   * includes MODE_NO_AUTO_VALUE_ON_ZERO) in the update_auto_increment
> +   * function. The SQL_MODE of the slave SQL thread is always consistent
> +   * with the master's. In RBR, auto_increment fields are never NULL.
> +   */
> +  m_table->auto_increment_field_not_null= TRUE;
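By the way, the NO_AUTO_VALUE_ON_ZERO interaction that the Bug#45999 comment describes can be sketched as below. This is only an illustrative model of the decision made in update_auto_increment; the function name and boolean parameters are stand-ins, not the server's actual interface:

```cpp
#include <cassert>

// Illustrative sketch (not the server's code): a new auto_increment
// sequence number is generated for a stored 0 unless the field was
// explicitly assigned (auto_increment_field_not_null) AND
// NO_AUTO_VALUE_ON_ZERO is in SQL_MODE. Since the event sets
// auto_increment_field_not_null= TRUE, a replicated 0 survives when
// the mode matches the master's.
static bool generates_new_sequence_number(long long value,
                                          bool field_not_null,
                                          bool no_auto_value_on_zero)
{
  if (value != 0)
    return false;  // an explicit non-zero value is always kept
  // A zero is kept only when explicitly set under NO_AUTO_VALUE_ON_ZERO;
  // otherwise a new sequence number is generated.
  return !(field_not_null && no_auto_value_on_zero);
}
```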
>  
> -      if (res < MAX_KEY)
> -        return res;
> -    }
> -  }
> +  /**
> +     For a Write_rows event no row lookup is needed, so the algorithm
> +     decided here must be ROW_LOOKUP_NOT_NEEDED (asserted below).
> +   */
> +  m_rows_lookup_algorithm= decide_row_lookup_algorithm(m_table, &m_cols,
> +                                                       get_type_code());
> +  DBUG_ASSERT(m_rows_lookup_algorithm==ROW_LOOKUP_NOT_NEEDED);
> +  return error;
> +}
>  
> -  if (key_type & MULTIPLE_KEY_FLAG && table->s->keys)
> +int 
> +Write_rows_log_event::do_after_row_operations(const Slave_reporting_capability *const,
> +                                              int error)
> +{
> +  int local_error= 0;
> +  m_table->next_number_field=0;
> +  m_table->auto_increment_field_not_null= FALSE;
> +  if ((slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT) ||
> +      m_table->s->db_type()->db_type == DB_TYPE_NDBCLUSTER)
>    {
> -    for (key=0,keyinfo= table->key_info ; 
> -         (key < table->s->keys) && (res == MAX_KEY);
> -         key++,keyinfo++)
> -    {
> -      /*
> -        - Skip innactive keys
> -        - Skip unique keys without nullable parts
> -        - Skip primary keys
> -      */
> -      if (!(table->s->keys_in_use.is_set(key)) ||
> -          ((keyinfo->flags & (HA_NOSAME | HA_NULL_PART_KEY)) == HA_NOSAME) ||
> -          (key == table->s->primary_key))
> -        continue;
> -
> -      res= are_all_columns_signaled_for_key(keyinfo, bi_cols) ? 
> -           key : MAX_KEY;
> -
> -      if (res < MAX_KEY)
> -        return res;
> -    }
> +    m_table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
> +    m_table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
> +    /*
> +      resetting the extra with 
> +      table->file->extra(HA_EXTRA_NO_IGNORE_NO_KEY); 
> +      fires bug#27077
> +      explanation: file->reset() performs this duty
> +      ultimately. Still todo: fix
> +    */
> +  }
> +  if ((local_error= m_table->file->ha_end_bulk_insert()))
> +  {
> +    m_table->file->print_error(local_error, MYF(0));
>    }
>  
> -  return res;
> +  m_rows_lookup_algorithm= ROW_LOOKUP_UNDEFINED;
> +
> +  return error? error : local_error;
>  }
>  
> +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> +
> +/*
> +  Check if there are more UNIQUE keys after the given key.
> +*/
> +static int
> +last_uniq_key(TABLE *table, uint keyno)
> +{
> +  while (++keyno < table->s->keys)
> +    if (table->key_info[keyno].flags & HA_NOSAME)
> +      return 0;
> +  return 1;
> +}
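Inline note on last_uniq_key(): the scan simply checks whether any key after keyno still carries HA_NOSAME. A minimal stand-in model (struct names and the flag constant are mine, not the server's) behaves like this:

```cpp
#include <cassert>
#include <vector>

// Stand-in for the HA_NOSAME handler flag from my_base.h; the real
// numeric value differs.
static const unsigned HA_NOSAME_FLAG = 1;

struct KeyInfo { unsigned flags; };
struct TableSketch { std::vector<KeyInfo> keys; };

// Mirrors last_uniq_key(): true when no UNIQUE key follows keyno,
// i.e., an in-place update cannot trip another unique check.
static bool is_last_unique_key(const TableSketch &t, unsigned keyno)
{
  while (++keyno < t.keys.size())
    if (t.keys[keyno].flags & HA_NOSAME_FLAG)
      return false;
  return true;
}
```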
>  
>  /**
> -  Locate the current row in event's table.
> +   Check if an error is a duplicate key error.
>  
> -  The current row is pointed by @c m_curr_row. Member @c m_width tells how many 
> -  columns are there in the row (this can be differnet from the number of columns 
> -  in the table). It is assumed that event's table is already open and pointed 
> -  by @c m_table.
> -
> -  If a corresponding record is found in the table it is stored in 
> -  @c m_table->record[0]. Note that when record is located based on a primary 
> -  key, it is possible that the record found differs from the row being located.
> -
> -  If no key is specified or table does not have keys, a table scan is used to 
> -  find the row. In that case the row should be complete and contain values for
> -  all columns. However, it can still be shorter than the table, i.e. the table 
> -  can contain extra columns not present in the row. It is also possible that 
> -  the table has fewer columns than the row being located. 
> +   This function is used to check if an error code is one of the
> +   duplicate key errors, i.e., an error code for which it is sensible
> +   to do a <code>get_dup_key()</code> to retrieve the duplicate key.
>  
> -  @returns Error code on failure, 0 on success. 
> -  
> -  @post In case of success @c m_table->record[0] contains the record found. 
> -  Also, the internal "cursor" of the table is positioned at the record found.
> +   @param errcode The error code to check.
>  
> -  @note If the engine allows random access of the records, a combination of
> -  @c position() and @c rnd_pos() will be used. 
> +   @return <code>true</code> if the error code is such that
> +   <code>get_dup_key()</code> will return true, <code>false</code>
> +   otherwise.
>   */
> -
> -
> -int Rows_log_event::find_row(const Relay_log_info *rli)
> +bool
> +is_duplicate_key_error(int errcode)
>  {
> -  DBUG_ENTER("Rows_log_event::find_row");
> -
> -  DBUG_ASSERT(m_table && m_table->in_use != NULL);
> -
> -  TABLE *table= m_table;
> -  int error= 0;
> -  KEY *keyinfo;
> -  uint key;
> +  switch (errcode)
> +  {
> +  case HA_ERR_FOUND_DUPP_KEY:
> +  case HA_ERR_FOUND_DUPP_UNIQUE:
> +    return true;
> +  }
> +  return false;
> +}
>  
> -  /*
> -    rpl_row_tabledefs.test specifies that
> -    if the extra field on the slave does not have a default value
> -    and this is okay with Delete or Update events.
> -    Todo: fix wl3228 hld that requires defauls for all types of events
> -  */
> -  
> -  prepare_record(table, &m_cols, FALSE);
> -  error= unpack_current_row(rli, &m_cols);
> +/**
> +  Write the current row into event's table.
>  
> -  // Temporary fix to find out why it fails [/Matz]
> -  memcpy(m_table->read_set->bitmap, m_cols.bitmap, (m_table->read_set->n_bits + 7) / 8);
> +  The row is located in the row buffer, pointed by @c m_curr_row member.
> +  Number of columns of the row is stored in @c m_width member (it can be 
> +  different from the number of columns in the table to which we insert). 
> +  Bitmap @c m_cols indicates which columns are present in the row. It is assumed 
> +  that event's table is already open and pointed by @c m_table.
>  
> -  if (!is_any_column_signaled_for_table(table, &m_cols))
> -  {
> -    error= HA_ERR_END_OF_FILE;
> -    goto err;
> -  }
> +  If the same record already exists in the table it can be either overwritten 
> +  or an error is reported depending on the value of @c overwrite flag 
> +  (error reporting not yet implemented). Note that the matching record can be
> +  different from the row we insert if we use primary keys to identify records in
> +  the table.
>  
> -#ifndef DBUG_OFF
> -  DBUG_PRINT("info",("looking for the following record"));
> -  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> -#endif
> +  The row to be inserted can contain values only for selected columns. The 
> +  missing columns are filled with default values using @c prepare_record() 
> +  function. If a matching record is found in the table and @c overwrite is
> +  true, the missing columns are taken from it.
>  
> -  if ((key= search_key_in_table(table, &m_cols, PRI_KEY_FLAG)) >= MAX_KEY)
> -    /* we dont have a PK, or PK is not usable with BI values */
> -    goto INDEX_SCAN;
> +  @param  rli   Relay log info (needed for row unpacking).
> +  @param  overwrite  
> +                Shall we overwrite if the row already exists or signal 
> +                error (currently ignored).
>  
> -  if ((table->file->ha_table_flags() & HA_PRIMARY_KEY_REQUIRED_FOR_POSITION))
> -  {
> -    /*
> -      Use a more efficient method to fetch the record given by
> -      table->record[0] if the engine allows it.  We first compute a
> -      row reference using the position() member function (it will be
> -      stored in table->file->ref) and the use rnd_pos() to position
> -      the "cursor" (i.e., record[0] in this case) at the correct row.
> +  @returns Error code on failure, 0 on success.
>  
> -      TODO: Add a check that the correct record has been fetched by
> -      comparing with the original record. Take into account that the
> -      record on the master and slave can be of different
> -      length. Something along these lines should work:
> +  This method, if successful, sets @c m_curr_row_end pointer to point at the
> +  next row in the rows buffer. This is done when unpacking the row to be 
> +  inserted.
>  
> -      ADD>>>  store_record(table,record[1]);
> -              int error= table->file->rnd_pos(table->record[0], table->file->ref);
> -      ADD>>>  DBUG_ASSERT(memcmp(table->record[1], table->record[0],
> -                                 table->s->reclength) == 0);
> +  @note If a matching record is found, it is either updated using 
> +  @c ha_update_row() or first deleted and then new record written.
> +*/ 
>  
> -    */
> -    DBUG_PRINT("info",("locating record using primary key (position)"));
> -    int error;
> -    if (table->file->inited && (error= table->file->ha_index_end()))
> -      DBUG_RETURN(error);
> -    if ((error= table->file->ha_rnd_init(FALSE)))
> -      DBUG_RETURN(error);
> +int
> +Write_rows_log_event::write_row(const Relay_log_info *const rli,
> +                                const bool overwrite)
> +{
> +  DBUG_ENTER("write_row");
> +  DBUG_ASSERT(m_table != NULL && thd != NULL);
>  
> -    error= table->file->rnd_pos_by_record(table->record[0]);
> +  TABLE *table= m_table;  // pointer to event's table
> +  int error;
> +  int UNINIT_VAR(keynum);
> +  auto_afree_ptr<char> key(NULL);
>  
> -    table->file->ha_rnd_end();
> -    if (error)
> -    {
> -      DBUG_PRINT("info",("rnd_pos returns error %d",error));
> -      if (error == HA_ERR_RECORD_DELETED)
> -        error= HA_ERR_KEY_NOT_FOUND;
> -      table->file->print_error(error, MYF(0));
> -    }
> +  /* fill table->record[0] with default values */
> +  bool abort_on_warnings= (rli->info_thd->variables.sql_mode &
> +                           (MODE_STRICT_TRANS_TABLES | MODE_STRICT_ALL_TABLES));
> +  if ((error= prepare_record(table, &m_cols,
> +                             table->file->ht->db_type != DB_TYPE_NDBCLUSTER,
> +                             abort_on_warnings, m_curr_row == m_rows_buf)))
>      DBUG_RETURN(error);
> -  }
> -
> -  // We can't use position() - try other methods.
>    
> -INDEX_SCAN:
> -
> -  /*
> -    Save copy of the record in table->record[1]. It might be needed 
> -    later if linear search is used to find exact match.
> -   */ 
> -  store_record(table,record[1]);    
> +  /* unpack row into table->record[0] */
> +  if ((error= unpack_current_row(rli, &m_cols, abort_on_warnings)))
> +    DBUG_RETURN(error);
>  
> -  if ((key= search_key_in_table(table, &m_cols, 
> -                                (PRI_KEY_FLAG | UNIQUE_KEY_FLAG | MULTIPLE_KEY_FLAG)))
> -       >= MAX_KEY)
> -    /* we dont have a key, or no key is suitable for the BI values */
> -    goto TABLE_SCAN; 
> +  // Temporary fix to find out why it fails [/Matz]
> +  memcpy(m_table->write_set->bitmap, m_cols.bitmap, (m_table->write_set->n_bits + 7) / 8);
>  
> +  if (m_curr_row == m_rows_buf)
>    {
> -    keyinfo= table->key_info + key;
> -
> -
> -    DBUG_PRINT("info",("locating record using primary key (index_read)"));
> -
> -    /* The key'th key is active and usable: search the table using the index */
> -    if (!table->file->inited && (error= table->file->ha_index_init(key, FALSE)))
> -    {
> -      DBUG_PRINT("info",("ha_index_init returns error %d",error));
> -      table->file->print_error(error, MYF(0));
> -      goto err;
> -    }
> +    /* this is the first row to be inserted; we estimate the number of
> +       rows from the size of the first row and use that value to
> +       initialize the storage engine for bulk insertion */
> +    DBUG_ASSERT(!(m_curr_row > m_curr_row_end));
> +    ulong estimated_rows= 0;
> +    if (m_curr_row < m_curr_row_end)
> +      estimated_rows= (m_rows_end - m_curr_row) / (m_curr_row_end - m_curr_row);
> +    else if (m_curr_row == m_curr_row_end)
> +      estimated_rows= 1;
>  
> -    /* Fill key data for the row */
> +    m_table->file->ha_start_bulk_insert(estimated_rows);
> +  }
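Inline note on the estimate above: the remaining buffer is divided by the size of the first row. A self-contained sketch of that arithmetic (parameter names are local stand-ins, not the event members):

```cpp
#include <cassert>

// Sketch of the bulk-insert row estimate: guess the number of rows
// left by dividing the unread part of the rows buffer by the size of
// the first (current) row. A zero-length row counts as one row.
static unsigned long estimate_rows(const unsigned char *curr_row,
                                   const unsigned char *curr_row_end,
                                   const unsigned char *rows_end)
{
  unsigned long estimated = 0;
  if (curr_row < curr_row_end)
    estimated = (rows_end - curr_row) / (curr_row_end - curr_row);
  else if (curr_row == curr_row_end)
    estimated = 1;  // zero-length row: at least this one
  return estimated;
}
```

Note the estimate is only a hint to ha_start_bulk_insert(); rows of varying width make it inexact.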
> +  
> +  
> +#ifndef DBUG_OFF
> +  DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> +  DBUG_PRINT_BITSET("debug", "write_set = %s", table->write_set);
> +  DBUG_PRINT_BITSET("debug", "read_set = %s", table->read_set);
> +#endif
>  
> -    DBUG_ASSERT(m_key);
> -    key_copy(m_key, table->record[0], keyinfo, 0);
> +  /* 
> +    Try to write record. If a corresponding record already exists in the table,
> +    we try to change it using ha_update_row() if possible. Otherwise we delete
> +    it and repeat the whole process again. 
>  
> -    /*
> -      Don't print debug messages when running valgrind since they can
> -      trigger false warnings.
> -     */
> -#ifndef HAVE_purify
> -    DBUG_DUMP("key data", m_key, keyinfo->key_length);
> -#endif
> +    TODO: Add safety measures against infinite looping. 
> +   */
>  
> -    /*
> -      We need to set the null bytes to ensure that the filler bit are
> -      all set when returning.  There are storage engines that just set
> -      the necessary bits on the bytes and don't set the filler bits
> -      correctly.
> -    */
> -    if (table->s->null_bytes > 0)
> -      table->record[0][table->s->null_bytes - 1]|=
> -        256U - (1U << table->s->last_null_bit_pos);
> +  m_table->mark_columns_per_binlog_row_image();
>  
> -    if ((error= table->file->ha_index_read_map(table->record[0], m_key,
> -                                               HA_WHOLE_KEY,
> -                                               HA_READ_KEY_EXACT)))
> +  while ((error= table->file->ha_write_row(table->record[0])))
> +  {
> +    if (error == HA_ERR_LOCK_DEADLOCK ||
> +        error == HA_ERR_LOCK_WAIT_TIMEOUT ||
> +        (keynum= table->file->get_dup_key(error)) < 0 ||
> +        !overwrite)
>      {
> -      DBUG_PRINT("info",("no record matching the key found in the table"));
> -      if (error == HA_ERR_RECORD_DELETED)
> -        error= HA_ERR_KEY_NOT_FOUND;
> +      DBUG_PRINT("info",("get_dup_key returns %d", keynum));
> +      /*
> +        Deadlock, waiting for lock or just an error from the handler
> +        such as HA_ERR_FOUND_DUPP_KEY when overwrite is false.
> +        Retrieval of the duplicate key number may fail
> +        - either because the error was not a "duplicate key" error
> +        - or because the information about which key failed is not available
> +      */
>        table->file->print_error(error, MYF(0));
> -      table->file->ha_index_end();
> -      goto err;
> -    }
> -
> -  /*
> -    Don't print debug messages when running valgrind since they can
> -    trigger false warnings.
> -   */
> -#ifndef HAVE_purify
> -    DBUG_PRINT("info",("found first matching record")); 
> -    DBUG_DUMP("record[0]", table->record[0], table->s->reclength);
> -#endif
> -    /*
> -      Below is a minor "optimization".  If the key (i.e., key number
> -      0) has the HA_NOSAME flag set, we know that we have found the
> -      correct record (since there can be no duplicates); otherwise, we
> -      have to compare the record with the one found to see if it is
> -      the correct one.
> +      goto error;
> +    }
> +    /*
> +       We need to retrieve the old row into record[1] to be able to
> +       either update or delete the offending record.  We either:
>  
> -      CAVEAT! This behaviour is essential for the replication of,
> -      e.g., the mysql.proc table since the correct record *shall* be
> -      found using the primary key *only*.  There shall be no
> -      comparison of non-PK columns to decide if the correct record is
> -      found.  I can see no scenario where it would be incorrect to
> -      chose the row to change only using a PK or an UNNI.
> -    */
> -    if (keyinfo->flags & HA_NOSAME || key == table->s->primary_key)
> +       - use ha_rnd_pos() with a row-id (available as dupp_row) to the
> +         offending row, if that is possible (MyISAM and Blackhole), or else
> +
> +       - use ha_index_read_idx_map() with the key that is duplicated, to
> +         retrieve the offending row.
> +     */
> +    if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
>      {
> -      /* Unique does not have non nullable part */
> -      if (!(table->key_info->flags & (HA_NULL_PART_KEY)))
> -      {
> -        table->file->ha_index_end();
> -        goto ok;
> -      }
> -      else
> -      {
> -        KEY *keyinfo= table->key_info;
> -        /*
> -          Unique has nullable part. We need to check if there is any field in the
> -          BI image that is null and part of UNNI.
> -        */
> -        bool null_found= FALSE;
> -        for (uint i=0; i < keyinfo->key_parts && !null_found; i++)
> -        {
> -          uint fieldnr= keyinfo->key_part[i].fieldnr - 1;
> -          Field **f= table->field+fieldnr;
> -          null_found= (*f)->is_null();
> -        }
> +      DBUG_PRINT("info",("Locating offending record using ha_rnd_pos()"));
>  
> -        if (!null_found)
> -        {
> -          table->file->ha_index_end();
> -          goto ok;
> -        }
> +      if (table->file->inited && (error= table->file->ha_index_end()))
> +        DBUG_RETURN(error);
> +      if ((error= table->file->ha_rnd_init(FALSE)))
> +        DBUG_RETURN(error);
>  
> -        /* else fall through to index scan */
> +      error= table->file->ha_rnd_pos(table->record[1], table->file->dup_ref);
> +
> +      table->file->ha_rnd_end();
> +      if (error)
> +      {
> +        DBUG_PRINT("info",("ha_rnd_pos() returns error %d",error));
> +        if (error == HA_ERR_RECORD_DELETED)
> +          error= HA_ERR_KEY_NOT_FOUND;
> +        table->file->print_error(error, MYF(0));
> +        goto error;
>        }
>      }
> -
> -    /*
> -      In case key is not unique, we still have to iterate over records found
> -      and find the one which is identical to the row given. A copy of the 
> -      record we are looking for is stored in record[1].
> -     */ 
> -    DBUG_PRINT("info",("non-unique index, scanning it to find matching record")); 
> -
> -    while (record_compare(table, &m_cols))
> +    else
>      {
> -      /*
> -        We need to set the null bytes to ensure that the filler bit
> -        are all set when returning.  There are storage engines that
> -        just set the necessary bits on the bytes and don't set the
> -        filler bits correctly.
> +      DBUG_PRINT("info",("Locating offending record using index_read_idx()"));
>  
> -        TODO[record format ndb]: Remove this code once NDB returns the
> -        correct record format.
> -      */
> -      if (table->s->null_bytes > 0)
> +      if (table->file->extra(HA_EXTRA_FLUSH_CACHE))
>        {
> -        table->record[0][table->s->null_bytes - 1]|=
> -          256U - (1U << table->s->last_null_bit_pos);
> +        DBUG_PRINT("info",("Error when setting HA_EXTRA_FLUSH_CACHE"));
> +        error= my_errno;
> +        goto error;
>        }
>  
> -      while ((error= table->file->ha_index_next(table->record[0])))
> +      if (key.get() == NULL)
>        {
> -        /* We just skip records that has already been deleted */
> +        key.assign(static_cast<char*>(my_alloca(table->s->max_unique_length)));
> +        if (key.get() == NULL)
> +        {
> +          DBUG_PRINT("info",("Can't allocate key buffer"));
> +          error= ENOMEM;
> +          goto error;
> +        }
> +      }
> +
> +      key_copy((uchar*)key.get(), table->record[0], table->key_info + keynum,
> +               0);
> +      error= table->file->ha_index_read_idx_map(table->record[1], keynum,
> +                                                (const uchar*)key.get(),
> +                                                HA_WHOLE_KEY,
> +                                                HA_READ_KEY_EXACT);
> +      if (error)
> +      {
> +        DBUG_PRINT("info",("ha_index_read_idx_map() returns %s", HA_ERR(error)));
>          if (error == HA_ERR_RECORD_DELETED)
> -          continue;
> -        DBUG_PRINT("info",("no record matching the given row found"));
> +          error= HA_ERR_KEY_NOT_FOUND;
>          table->file->print_error(error, MYF(0));
> -        table->file->ha_index_end();
> -        goto err;
> +        goto error;
>        }
>      }
>  
>      /*
> -      Have to restart the scan to be able to fetch the next row.
> -    */
> -    table->file->ha_index_end();
> -    goto ok;
> -  }
> +       Now, record[1] should contain the offending row.  That
> +       will enable us to update it or, alternatively, delete it (so
> +       that we can insert the new row afterwards).
> +     */
>  
> -TABLE_SCAN:
> +    /*
> +      If row is incomplete we will use the record found to fill 
> +      missing columns.  
> +    */
> +    if (!get_flags(COMPLETE_ROWS_F))
> +    {
> +      restore_record(table,record[1]);
> +      error= unpack_current_row(rli, &m_cols);
> +    }
>  
> -  /* All that we can do now is rely on a table scan */
> -  {
> -    DBUG_PRINT("info",("locating record using table scan (ha_rnd_next)"));
> +#ifndef DBUG_OFF
> +    DBUG_PRINT("debug",("preparing for update: before and after image"));
> +    DBUG_DUMP("record[1] (before)", table->record[1], table->s->reclength);
> +    DBUG_DUMP("record[0] (after)", table->record[0], table->s->reclength);
> +#endif
>  
> -    int restart_count= 0; // Number of times scanning has restarted from top
> +    /*
> +       REPLACE is defined as either INSERT or DELETE + INSERT.  If
> +       possible, we can replace it with an UPDATE, but that will not
> +       work on InnoDB if FOREIGN KEY checks are necessary.
>  
> -    /* We don't have a key: search the table using ha_rnd_next() */
> -    if ((error= table->file->ha_rnd_init(1)))
> -    {
> -      DBUG_PRINT("info",("error initializing table scan"
> -                         " (ha_rnd_init returns %d)",error));
> -      table->file->print_error(error, MYF(0));
> -      goto err;
> -    }
> +       I (Matz) am not sure of the reason for the last_uniq_key()
> +       check, but I'm guessing that it's something along the
> +       following lines.
>  
> -    /* Continue until we find the right record or have made a full loop */
> -    do
> +       Suppose that we got the duplicate key to be a key that is not
> +       the last unique key for the table and we perform an update:
> +       then there might be another key for which the unique check will
> +       fail, so we're better off just deleting the row and inserting
> +       the correct row.
> +     */
> +    if (last_uniq_key(table, keynum) &&
> +        !table->file->referenced_by_foreign_key())
>      {
> -  restart_ha_rnd_next:
> -      error= table->file->ha_rnd_next(table->record[0]);
> -
> -      DBUG_PRINT("info", ("error: %s", HA_ERR(error)));
> +      DBUG_PRINT("info",("Updating row using ha_update_row()"));
> +      error=table->file->ha_update_row(table->record[1],
> +                                       table->record[0]);
>        switch (error) {
> -
> +                
> +      case HA_ERR_RECORD_IS_THE_SAME:
> +        DBUG_PRINT("info",("ignoring HA_ERR_RECORD_IS_THE_SAME error from"
> +                           " ha_update_row()"));
> +        error= 0;
> +      
>        case 0:
>          break;
>  
> -      /*
> -        If the record was deleted, we pick the next one without doing
> -        any comparisons.
> -      */
> -      case HA_ERR_RECORD_DELETED:
> -        goto restart_ha_rnd_next;
> -
> -      case HA_ERR_END_OF_FILE:
> -        if (++restart_count < 2)
> -          table->file->ha_rnd_init(1);
> -        break;
> -
>        default:
> -        DBUG_PRINT("info", ("Failed to get next record"
> -                            " (ha_rnd_next returns %d)",error));
> +        DBUG_PRINT("info",("ha_update_row() returns error %d",error));
>          table->file->print_error(error, MYF(0));
> -        table->file->ha_rnd_end();
> -        goto err;
>        }
> -    }
> -    while (restart_count < 2 && record_compare(table, &m_cols));
> -    
> -    /* 
> -      Note: above record_compare will take into accout all record fields 
> -      which might be incorrect in case a partial row was given in the event
> -     */
>  
> -    /*
> -      Have to restart the scan to be able to fetch the next row.
> -    */
> -    if (restart_count == 2)
> -      DBUG_PRINT("info", ("Record not found"));
> +      goto error;
> +    }
>      else
> -      DBUG_DUMP("record found", table->record[0], table->s->reclength);
> -    table->file->ha_rnd_end();
> -
> -    DBUG_ASSERT(error == HA_ERR_END_OF_FILE || error == 0);
> -    goto err;
> +    {
> +      DBUG_PRINT("info",("Deleting offending row and trying to write new one again"));
> +      if ((error= table->file->ha_delete_row(table->record[1])))
> +      {
> +        DBUG_PRINT("info",("ha_delete_row() returns error %d",error));
> +        table->file->print_error(error, MYF(0));
> +        goto error;
> +      }
> +      /* Will retry ha_write_row() with the offending row removed. */
> +    }
>    }
> -ok:
> -  table->default_column_bitmaps();
> -  DBUG_RETURN(0);
>  
> -err:
> -  table->default_column_bitmaps();
> +error:
> +  m_table->default_column_bitmaps();
>    DBUG_RETURN(error);
>  }
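Stepping back from write_row() as a whole: the loop implements REPLACE semantics by trying a plain insert and, on a duplicate key, either updating the offending row in place (the last_uniq_key() + no-foreign-key fast path) or deleting it and retrying. A toy model of that conflict resolution, using a std::map keyed by a single unique key (all names here are illustrative, not the server's):

```cpp
#include <cassert>
#include <map>
#include <string>

// Toy table with one unique key; payload stands in for the row image.
struct ToyTable {
  std::map<std::string, std::string> rows;  // key -> payload
};

// Mirrors the REPLACE behaviour of write_row(): try a plain insert
// first; on a duplicate, update the offending row in place (safe here
// because there is exactly one unique key and no foreign keys).
// Returns the payload finally stored under `key`.
static const std::string &replace_row(ToyTable &t, const std::string &key,
                                      const std::string &payload)
{
  auto res = t.rows.insert({key, payload});  // attempt the insert
  if (!res.second)
    res.first->second = payload;             // duplicate: update in place
  return res.first->second;
}
```

With several unique keys the in-place update could violate another one, which is why the real code falls back to delete-then-reinsert.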
>  
>  #endif
>  
> +int
> +Write_rows_log_event::do_exec_row(const Relay_log_info *const rli)
> +{
> +  DBUG_ASSERT(m_table != NULL);
> +  int error= write_row(rli, slave_exec_mode == SLAVE_EXEC_MODE_IDEMPOTENT);
> +
> +  if (error && !thd->is_error())
> +  {
> +    DBUG_ASSERT(0);
> +    my_error(ER_UNKNOWN_ERROR, MYF(0));
> +  }
> +
> +  return error;
> +}
> +
> +#endif /* !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION) */
> +
> +#ifdef MYSQL_CLIENT
> +void Write_rows_log_event::print(FILE *file, PRINT_EVENT_INFO* print_event_info)
> +{
> +  Rows_log_event::print_helper(file, print_event_info, "Write_rows");
> +}
> +#endif
> +
> +/**************************************************************************
> +	Delete_rows_log_event member functions
> +**************************************************************************/
> +
> +#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
> +
> +#endif
> +
>  /*
>    Constructor used to build an event for writing to the binary log.
>   */
> @@ -9753,6 +10075,11 @@ Delete_rows_log_event::do_before_row_ope
>        return HA_ERR_OUT_OF_MEM;
>    }
>  
> +  /* will we be using a hash to lookup rows? If so, initialize it. */
> +  m_rows_lookup_algorithm= decide_row_lookup_algorithm(m_table, &m_cols,
> +                                                       get_type_code());
> +  if (m_rows_lookup_algorithm == ROW_LOOKUP_HASH_SCAN)
> +    m_hash.init();
> +
>    return 0;
>  }
>  
> @@ -9765,6 +10092,11 @@ Delete_rows_log_event::do_after_row_oper
>    my_free(m_key);
>    m_key= NULL;
>  
> +  /* we don't need the hash anymore, free it */
> +  if (m_rows_lookup_algorithm == ROW_LOOKUP_HASH_SCAN)
> +    m_hash.deinit();
> +  m_rows_lookup_algorithm= ROW_LOOKUP_UNDEFINED;
> +
>    return error;
>  }
>  
> @@ -9773,16 +10105,11 @@ int Delete_rows_log_event::do_exec_row(c
>    int error;
>    DBUG_ASSERT(m_table != NULL);
>  
> -  if (!(error= find_row(rli))) 
> -  { 
> +  /* m_table->record[0] contains the BI */
> +  m_table->mark_columns_per_binlog_row_image();
> +  error= m_table->file->ha_delete_row(m_table->record[0]);
> +  m_table->default_column_bitmaps();
>  
> -    m_table->mark_columns_per_binlog_row_image();
> -    /*
> -      Delete the record found, located in record[0]
> -    */
> -    error= m_table->file->ha_delete_row(m_table->record[0]);
> -    m_table->default_column_bitmaps();
> -  }
>    return error;
>  }
>  
> @@ -9868,6 +10195,10 @@ Update_rows_log_event::do_before_row_ope
>  
>    m_table->timestamp_field_type= TIMESTAMP_NO_AUTO_SET;
>  
> +  /* will we be using a hash to lookup rows? If so, initialize it. */
> +  m_rows_lookup_algorithm= decide_row_lookup_algorithm(m_table, &m_cols,
> +                                                       get_type_code());
> +  if (m_rows_lookup_algorithm == ROW_LOOKUP_HASH_SCAN)
> +    m_hash.init();
>    return 0;
>  }
>  
> @@ -9880,6 +10211,11 @@ Update_rows_log_event::do_after_row_oper
>    my_free(m_key); // Free for multi_malloc
>    m_key= NULL;
>  
> +  /* we don't need the hash anymore, free it */
> +  if (m_rows_lookup_algorithm == ROW_LOOKUP_HASH_SCAN)  
> +    m_hash.deinit();
> +  m_rows_lookup_algorithm= ROW_LOOKUP_UNDEFINED;
> +
>    return error;
>  }
>  
> @@ -9889,53 +10225,6 @@ Update_rows_log_event::do_exec_row(const
>    DBUG_ASSERT(m_table != NULL);
>    int error= 0;
>  
> -  /**
> -     Check if update contains only values in AI for columns that do 
> -     not exist on the slave. If it does, we can just unpack the rows 
> -     and return (do nothing on the local table).
> -
> -     NOTE: We do the following optimization and check only if there 
> -     are usable values on the AI and disregard the fact that there 
> -     might be usable values in the BI. In practice this means that 
> -     the slave will not go through find_row (since we have nothing
> -     on the record to update, why go looking for it?).
> -
> -     If we wanted find_row to run anyway, we could move this
> -     check after find_row, but then we would have to face the fact
> -     that the slave might stop without finding the proper record 
> -     (because it might have incomplete BI), even though there were
> -     no values in AI.
> -
> -     On the other hand, if AI has usable values but BI has not,
> -     then find_row will return an error (and the error is then
> -     propagated as it was already).
> -   */
> -  if (!is_any_column_signaled_for_table(m_table, &m_cols_ai))
> -  {
> -    /* 
> -      Read and discard images, because:
> -      1. AI does not contain any useful values to replay;
> -      2. BI is irrelevant if there is nothing useful in AI.
> -    */
> -    error = unpack_current_row(rli, &m_cols);
> -    m_curr_row= m_curr_row_end;
> -    error = error | unpack_current_row(rli, &m_cols_ai);
> -
> -    return error;
> -  }
> -
> -  error= find_row(rli); 
> -  if (error)
> -  {
> -    /*
> -      We need to read the second image in the event of error to be
> -      able to skip to the next pair of updates
> -    */
> -    m_curr_row= m_curr_row_end;
> -    unpack_current_row(rli, &m_cols_ai);
> -    return error;
> -  }
> -
>    /*
>      This is the situation after locating BI:
>  
> 
> === modified file 'sql/log_event.h'
> --- a/sql/log_event.h	2010-10-08 14:35:24 +0000
> +++ b/sql/log_event.h	2010-11-22 21:36:08 +0000
> @@ -47,6 +47,7 @@
>  #include "rpl_record.h"
>  #include "rpl_reporting.h"
>  #include "sql_class.h"                          /* THD */
> +#include "rpl_utility.h"                        /* Hash_slave_rows */
>  #endif
>  
>  /* Forward declarations */
> @@ -3528,6 +3529,14 @@ private:
>  class Rows_log_event : public Log_event
>  {
>  public:
> +  enum row_lookup_mode {
> +       ROW_LOOKUP_NOT_NEEDED= 0,
> +       ROW_LOOKUP_INDEX_SCAN= 1,
> +       ROW_LOOKUP_TABLE_SCAN= 2,
> +       ROW_LOOKUP_HASH_SCAN= 3,
> +       ROW_LOOKUP_UNDEFINED= 4,
> +  };
> +
>    /**
>       Enumeration of the errors that can be returned.
>     */
> @@ -3703,6 +3712,19 @@ protected:
>    ulong       m_table_id;	/* Table ID */
>    MY_BITMAP   m_cols;		/* Bitmap denoting columns available */
>    ulong       m_width;          /* The width of the columns bitmap */
> +#ifndef MYSQL_CLIENT
> +  /**
> +     Hash table that will hold the entries while using the HASH_SCAN
> +     algorithm to search and update/delete rows.
> +   */
> +  Hash_slave_rows m_hash;
> +
> +  /**
> +     The algorithm to use while searching for rows using the before
> +     image.
> +  */
> +  uint            m_rows_lookup_algorithm;  
> +#endif
>    /*
>      Bitmap for columns available in the after image, if present. These
>      fields are only available for Update_rows events. Observe that the
> @@ -3732,7 +3754,6 @@ protected:
>    uchar    *m_key;      /* Buffer to keep key value during searches */
>  
>    int find_row(const Relay_log_info *const);
> -  int write_row(const Relay_log_info *const, const bool);
>  
>    // Unpack the current row into m_table->record[0]
>    int unpack_current_row(const Relay_log_info *const rli,
> @@ -3810,6 +3831,55 @@ private:
>        
>    */
>    virtual int do_exec_row(const Relay_log_info *const rli) = 0;
> +
> +  /**
> +    Private member function called while handling idempotent errors.
> +
> +    @param err[IN/OUT] the error to handle. If it is listed as
> +                       idempotent related error, then it is cleared.
> +    @returns true if the slave should stop executing rows.
> +   */
> +  int handle_idempotent_errors(Relay_log_info const *rli, int *err);
> +
> +  /**
> +     Private member function called after updating/deleting a row. It
> +     performs some assertions and more importantly, it updates
> +     m_curr_row so that the next row is processed during the row
> +     execution main loop (@c Rows_log_event::do_apply_event()).
> +
> +     @param err[IN] the current error code.
> +   */
> +  void do_post_row_operations(Relay_log_info const *rli, int err);
> +
> +  /**
> +     Commodity wrapper around do_exec_row(), that deals with resetting
> +     the thd reference in the table.
> +   */
> +  int do_apply_row(Relay_log_info const *rli);
> +
> +  /**
> +     Implementation of the index scan and update algorithm. It uses
> +     PK, UK or regular Key to search for the record to update. When
> +     found it updates it.
> +   */
> +  int do_index_scan_and_update(Relay_log_info const *rli);
> +  
> +  /**
> +     Implementation of the hash_scan and update algorithm. It collects
> +     rows positions in a hashtable until the last row is
> +     unpacked. Then it scans the table to update and when a record in
> +     the table matches the one in the hashtable, the update/delete is
> +     performed.
> +   */
> +  int do_hash_scan_and_update(Relay_log_info const *rli);
> +
> +  /**
> +     Implementation of the legacy table_scan and update algorithm. For
> +     each unpacked row it scans the storage engine table for a
> +     match. When a match is found, the update/delete operations are
> +     performed.
> +   */
> +  int do_table_scan_and_update(Relay_log_info const *rli);
>  #endif /* defined(MYSQL_SERVER) && defined(HAVE_REPLICATION) */
>  
>    friend class Old_rows_log_event;
> @@ -3853,6 +3923,9 @@ public:
>    }
>  #endif
>  
> +protected:
> +  int write_row(const Relay_log_info *const, const bool);
> +
>  private:
>    virtual Log_event_type get_type_code() { return (Log_event_type)TYPE_CODE; }
>  
> 
> === modified file 'sql/mysqld.cc'
> --- a/sql/mysqld.cc	2010-10-08 14:35:24 +0000
> +++ b/sql/mysqld.cc	2010-11-23 00:08:01 +0000
> @@ -460,6 +460,10 @@ ulong slave_trans_retries;
>  uint  slave_net_timeout;
>  ulong slave_exec_mode_options;
>  ulonglong slave_type_conversions_options;
> +ulonglong slave_rows_search_algorithms_options;
> +#ifndef DBUG_OFF
> +uint slave_rows_last_search_algorithm_used;
> +#endif
>  ulong thread_cache_size=0;
>  ulong binlog_cache_size=0;
>  ulonglong  max_binlog_cache_size=0;
> @@ -6276,6 +6280,21 @@ static int show_heartbeat_period(THD *th
>    return 0;
>  }
>  
> +#ifndef DBUG_OFF
> +static int show_slave_rows_last_search_algorithm_used(THD *thd, SHOW_VAR *var, char *buff)
> +{
> +  uint res= slave_rows_last_search_algorithm_used;
> +  const char* s= ((res == Rows_log_event::ROW_LOOKUP_TABLE_SCAN) ? "TABLE_SCAN" :
> +                  ((res == Rows_log_event::ROW_LOOKUP_HASH_SCAN) ? "HASH_SCAN" : 
> +                   "INDEX_SCAN"));
> +
> +  var->type= SHOW_CHAR;
> +  var->value= buff;
> +  sprintf(buff, "%s", s);
> +
> +  return 0;
> +}
> +#endif
>  
>  #endif /* HAVE_REPLICATION */
>  
> @@ -6642,6 +6661,9 @@ SHOW_VAR status_vars[]= {
>    {"Slave_retried_transactions",(char*) &show_slave_retried_trans, SHOW_FUNC},
>    {"Slave_heartbeat_period",   (char*) &show_heartbeat_period, SHOW_FUNC},
>    {"Slave_received_heartbeats",(char*) &show_slave_received_heartbeats, SHOW_FUNC},
> +#ifndef DBUG_OFF
> +  {"Slave_rows_last_search_algorithm_used",(char*) &show_slave_rows_last_search_algorithm_used, SHOW_FUNC},
> +#endif
>    {"Slave_running",            (char*) &show_slave_running,     SHOW_FUNC},
>  #endif
>    {"Slow_launch_threads",      (char*) &slow_launch_threads,    SHOW_LONG},
> 
> === modified file 'sql/mysqld.h'
> --- a/sql/mysqld.h	2010-09-01 13:06:14 +0000
> +++ b/sql/mysqld.h	2010-11-23 00:08:01 +0000
> @@ -107,6 +107,10 @@ extern my_bool opt_safe_show_db, opt_loc
>  extern my_bool opt_slave_compressed_protocol, use_temp_pool;
>  extern ulong slave_exec_mode_options;
>  extern ulonglong slave_type_conversions_options;
> +extern ulonglong slave_rows_search_algorithms_options;
> +#ifndef DBUG_OFF
> +extern uint slave_rows_last_search_algorithm_used;
> +#endif
>  extern my_bool opt_readonly, lower_case_file_system;
>  extern my_bool opt_enable_named_pipe, opt_sync_frm, opt_allow_suspicious_udfs;
>  extern my_bool opt_secure_auth;
> 
> === modified file 'sql/rpl_utility.cc'
> --- a/sql/rpl_utility.cc	2010-07-16 21:00:50 +0000
> +++ b/sql/rpl_utility.cc	2010-11-22 21:10:41 +0000
> @@ -1056,3 +1056,363 @@ table_def::~table_def()
>  #endif
>  }
>  
> +#ifndef MYSQL_CLIENT
> +
> +/**
> +  Utility methods for handling row based operations.
> + */ 
> +
> +/**
> +
> +   Internal structure that acts as a preamble for HASH_ROW_POS_ENTRY
> +   in memory structure. Allocation in make_entry is done as follows:
> +
> +   preamble_ptr= malloc (sizeof(preamble)+sizeof(entry));
> +   entry_ptr= preamble_ptr+1;
> +   
> +   preamble_ptr  -----> |-HASH_ROW_POS_PREAMBLE--|
> +                        | - key                   |
> +                        | - length                |
> +                        | - hash_value            |
> +                        | - is_search_state_inited|
> +                        | - search_state          |
> +                        |                         |
> +   entry_ptr     -----> |-HASH_ROW_POS_ENTRY------|
> +                        | - bi_start              |
> +                        | - bi_ends               |
> +                        | - ai_start              |
> +                        | - ai_ends               |
> +                        |                         |
> +                        |-------------------------|
> +
> +                     
> +   return entry_ptr;
> +
> +   When iterating over an entry with multiple records, we can just use
> +   pointer arithmetic to retrieve the preamble pointer. This way we
> +   hide from the hash table user the gory details of key bookeeping
> +   for the hash (including collision handling).


See some comments in what follows.

> +
> +*/
> +typedef struct hash_row_pos_preamble
> +{
> +  
> +  /**
> +     The pointer to the hash table key.
> +  */
> +  uchar* key;
> +  
> +  /**
> +     Length of the key.
> +  */
> +  uint length;
> +  
> +  /**
> +     The actual key.
> +  */
> +  my_hash_value_type hash_value;
> +  
> +  /**
> +     The search state used to iterate over multiple entries for a
> +     given key.
> +  */
> +  HASH_SEARCH_STATE search_state;
> +
> +  /**
> +     Whether this search_state is usable or not.
> +   */
> +  bool is_search_state_inited;
> +  
> +} HASH_ROW_POS_PREAMBLE;

ok.

> +
> +
> +static uchar* 
> +hash_slave_rows_get_key(const uchar *record, 
> +                        size_t *length,
> +                        my_bool not_used __attribute__((unused)))
> +{
> +  DBUG_ENTER("get_key");
> +
> +  HASH_ROW_POS_PREAMBLE *preamble=(HASH_ROW_POS_PREAMBLE *) record;
> +  *length= preamble->length;
> +
> +  DBUG_RETURN((uchar*) preamble->key);
> +}
> +
> +static void 
> +hash_slave_rows_free_entry(HASH_ROW_POS_PREAMBLE *preamble)
> +{
> +  DBUG_ENTER("free_entry");
> +  if (preamble)
> +    my_free(preamble);
> +  DBUG_VOID_RETURN;
> +}

ok.


> +
> +bool Hash_slave_rows::is_empty(void)
> +{
> +  return (m_hash.records == 0);
> +}
> +
> +/**
> +   Hashing commodity structures and functions.
> + */ 
> +
> +bool Hash_slave_rows::init(void)
> +{
> +  my_hash_init(&m_hash,
> +               &my_charset_bin,                /* the character set information */
> +               16 /* TODO */,                  /* growth size */
> +               0,                              /* key offset */
> +               0,                              /* key length */
> +               hash_slave_rows_get_key,        /* get function pointer */
> +               (my_hash_free_key) hash_slave_rows_free_entry,  /* free function pointer */
> +               MYF(0));                        /* flags */
> +
> +  return 0;
> +}


Please check the return value of my_hash_init().
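A minimal sketch of the suggested fix, using a hypothetical stand-in for my_hash_init() (the real call's failure semantics are assumed, not verified here):

```c
#include <stdbool.h>

/* Hypothetical stand-in for my_hash_init(): non-zero means failure. */
static int my_hash_init_stub(int should_fail)
{
  return should_fail;
}

/* init() should propagate the initializer's result instead of
   unconditionally returning 0 (false). */
static bool hash_slave_rows_init(int should_fail)
{
  if (my_hash_init_stub(should_fail))
    return true;   /* report failure to the caller */
  return false;
}
```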

> +
> +bool Hash_slave_rows::deinit(void)
> +{
> +  if (my_hash_inited(&m_hash))
> +    my_hash_free(&m_hash);
> +
> +  return 0;
> +}

ok.

> +
> +int Hash_slave_rows::size()
> +{
> +  return m_hash.records;
> +}

ok.

> +
> +HASH_ROW_POS_ENTRY* Hash_slave_rows::make_entry(const uchar* bi_start, const uchar* bi_ends,
> +                                                const uchar* ai_start, const uchar* ai_ends)
> +{
> +  DBUG_ENTER("Hash_slave_rows::make_entry");
> +  
> +  size_t size= sizeof(struct hash_row_pos_preamble) + 
> +               sizeof(struct hash_row_pos_entry);
> +  
> +  HASH_ROW_POS_PREAMBLE *preamble= 
> +    (HASH_ROW_POS_PREAMBLE*) my_malloc(size, MYF(0));
> +
> +  if (!preamble)
> +    DBUG_RETURN(NULL);
> +
> +  HASH_ROW_POS_ENTRY* entry= (HASH_ROW_POS_ENTRY*) (preamble+1);

Is there any possibility of an alignment problem here?
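One way to sidestep the question entirely (a sketch with simplified stand-in types, not the patch's real definitions): embed the entry inside the preamble so the compiler computes padding and alignment, and recover the preamble with offsetof() instead of pointer arithmetic on preamble+1:

```c
#include <stddef.h>
#include <stdlib.h>

/* Simplified stand-ins for HASH_ROW_POS_ENTRY / HASH_ROW_POS_PREAMBLE. */
typedef struct entry
{
  const unsigned char *bi_start, *bi_ends, *ai_start, *ai_ends;
} ENTRY;

typedef struct preamble
{
  unsigned char *key;
  unsigned int length;
  ENTRY entry;          /* the compiler inserts any padding needed */
} PREAMBLE;

/* One allocation; the embedded entry is correctly aligned by construction. */
static ENTRY *make_entry_aligned(void)
{
  PREAMBLE *p= (PREAMBLE*) calloc(1, sizeof(PREAMBLE));
  return p ? &p->entry : NULL;
}

/* Recover the enclosing preamble from an entry pointer. */
static PREAMBLE *preamble_of(ENTRY *e)
{
  return (PREAMBLE*) ((char*) e - offsetof(PREAMBLE, entry));
}
```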

> +  
> +  /**
> +     Filling in the preamble.
> +   */
> +  preamble->key= (uchar*)&preamble->hash_value;
> +  preamble->length= sizeof(my_hash_value_type);
> +  preamble->search_state= -1;
> +  preamble->hash_value= -1;
> +  preamble->is_search_state_inited= false;

I think you can improve this by casting &preamble->hash_value wherever
necessary, thus avoiding the preamble->key member.
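Something like the following sketch (with a stand-in preamble type, not the real one) would let the get-key callback compute the key directly and drop the key member:

```c
#include <stddef.h>

typedef unsigned long hash_value_t;   /* stand-in for my_hash_value_type */

/* Stand-in preamble without the redundant `key` member. */
typedef struct preamble
{
  hash_value_t hash_value;
  size_t length;
} PREAMBLE;

/* get-key callback: the key is always &hash_value, so cast on demand. */
static unsigned char *get_key(const unsigned char *record, size_t *length)
{
  PREAMBLE *preamble= (PREAMBLE*) record;
  *length= sizeof(preamble->hash_value);
  return (unsigned char*) &preamble->hash_value;
}
```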

> +    
> +  /**
> +     Filling in the values.
> +   */
> +  entry->bi_start= (const uchar *) bi_start;
> +  entry->bi_ends= (const uchar *) bi_ends;
> +  entry->ai_start= (const uchar *) ai_start;
> +  entry->ai_ends= (const uchar *) ai_ends;
> +
> +  DBUG_PRINT("debug", ("Added record to hash with key=%u", preamble->hash_value));
> +  
> +  /**
> +     Return the pointer to the entry. The caller should not
> +     be exposed to the internal preamble.
> +   */
> +  DBUG_RETURN(entry);
> +}
> +
> +bool 
> +Hash_slave_rows::put(TABLE *table, 
> +                     MY_BITMAP *cols,
> +                     HASH_ROW_POS_ENTRY* entry)
> +{
> +  DBUG_ENTER("Hash_slave_rows::put");
> +
> +  HASH_ROW_POS_PREAMBLE* preamble= ((HASH_ROW_POS_PREAMBLE*)entry)-1;
> +
> +  /**
> +     Skip blobs from key calculation.
> +     Handle X bits.
> +     Handle nulled fields.
> +     Handled fields not signaled.
> +  */  
> +  make_hash_key(table, cols, &preamble->hash_value);
> +  my_hash_insert(&m_hash, (uchar *) preamble);
> +  DBUG_PRINT("debug", ("Added record to hash with key=%u", preamble->hash_value));
> +  DBUG_RETURN(false);
> +}
> +
> +bool
> +Hash_slave_rows::get(TABLE *table,
> +                     MY_BITMAP *cols,
> +                     HASH_ROW_POS_ENTRY** entry)
> +{
> +  DBUG_ENTER("Hash_slave_rows::get");
> +  HASH_SEARCH_STATE state;
> +  HASH_ROW_POS_PREAMBLE* preamble;
> +  my_hash_value_type key;
> +          
> +  make_hash_key(table, cols, &key);
> +
> +  DBUG_PRINT("debug", ("Looking for record with key=%u in the hash.", key));
> +
> +  preamble= (HASH_ROW_POS_PREAMBLE*) my_hash_first(&m_hash, 
> +                                                   (const uchar*) &key, 
> +                                                   sizeof(my_hash_value_type), 
> +                                                   &state);
> +  if (preamble)
> +  {
> +    DBUG_PRINT("debug", ("Found record with key=%u in the hash.", key));
> +
> +    /**
> +       Save the search state in case we need to go through entries for
> +       the given key.
> +    */
> +    preamble->search_state= state;
> +    preamble->is_search_state_inited= true;
> +    
> +    *entry= (HASH_ROW_POS_ENTRY*) (preamble+1);
> +  }
> +  else
> +    *entry= NULL;
> +
> +  DBUG_RETURN(false);
> +}
> +
> +bool Hash_slave_rows::next(HASH_ROW_POS_ENTRY** entry)
> +{
> +  DBUG_ENTER("Hash_slave_rows::next");
> +
> +  if (*entry)
> +  {
> +    HASH_ROW_POS_PREAMBLE* preamble= 
> +      ((HASH_ROW_POS_PREAMBLE*) *entry) - 1;
> +
> +    if (preamble->is_search_state_inited)
> +    {
> +      my_hash_value_type key= preamble->hash_value;
> +      HASH_SEARCH_STATE state= preamble->search_state;
> +      preamble->search_state= -1;
> +      preamble->is_search_state_inited= false;
> +
> +      DBUG_PRINT("debug", ("Looking for record with key=%u in the hash (next).", key));
> +      
> +      preamble= (HASH_ROW_POS_PREAMBLE*) my_hash_next(&m_hash, 
> +                                                      (const uchar*) &key, 
> +                                                      sizeof(my_hash_value_type),
> +                                                      &state);
> +      if (preamble)
> +      {
> +        DBUG_PRINT("debug", ("Found record with key=%u in the hash (next).", key));
> +        preamble->search_state= state;
> +        preamble->is_search_state_inited= true;
> +        *entry= (HASH_ROW_POS_ENTRY*) (preamble+1);
> +      }
> +      else
> +        *entry= NULL;
> +    }
> +    else
> +      DBUG_RETURN(true);
> +  }
> +  else
> +    DBUG_RETURN(true);
> +
> +  DBUG_RETURN(false);
> +}

I also think you should assert here that entry is not NULL
and remove the "if".

Keep the "if" if you want to.


Please replace the bare -1 with a named constant, e.g. NO_RECORD, in
preamble->search_state= -1. There are other occurrences.


> +
> +bool
> +Hash_slave_rows::del(HASH_ROW_POS_ENTRY* entry)
> +{
> +  DBUG_ENTER("Hash_slave_rows::del");
> +  if (entry)
> +  {
> +    HASH_ROW_POS_PREAMBLE* preamble= 
> +      ((HASH_ROW_POS_PREAMBLE*)entry)-1;
> +    my_hash_delete(&m_hash, (uchar *) preamble);
> +  }
> +  else
> +    DBUG_RETURN(true);
> +  DBUG_RETURN(false);
> +}


Please check the return value of my_hash_delete().
I also think you should assert here that entry is not NULL
and remove the "else".

Keep the "else" if you want to.

> +
> +bool
> +Hash_slave_rows::make_hash_key(TABLE *table, 
> +                               MY_BITMAP *cols, 
> +                               my_hash_value_type *key)
> +{ 
> +  DBUG_ENTER("Hash_slave_rows::make_hash_key");
> +  ha_checksum crc= 0L;
> +
> +  uchar *record= table->record[0];
> +  uchar saved_x= 0, saved_filler= 0;
> +
> +  if (table->s->null_bytes > 0)
> +  {
> +    /*
> +      If we have an X bit then we need to take care of it.
> +    */
> +    if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
> +    {
> +      saved_x= record[0];
> +      record[0]|= 1U;
> +    }
> +
> +    /*
> +      If (last_null_bit_pos == 0 && null_bytes > 1), then:
> +      
> +      X bit (if any) + N nullable fields + M Field_bit fields = 8 bits
> +      
> +      Ie, the entire byte is used.
> +    */
> +    if (table->s->last_null_bit_pos > 0)
> +    {
> +      saved_filler= record[table->s->null_bytes - 1];
> +      record[table->s->null_bytes - 1]|=
> +        256U - (1U << table->s->last_null_bit_pos);
> +    }
> +  }
> +
> +  /* initialize crc */
> +  crc= my_checksum(crc, table->null_flags, table->s->null_bytes);
> +
> +  for (Field **ptr=table->field ;
> +       *ptr && ((*ptr)->field_index < cols->n_bits);
> +       ptr++)
> +  {
> +    Field *f= (*ptr);
> +
> +    /* field is set in the read_set and is not a blob */
> +    if (bitmap_is_set(cols, f->field_index) && 
> +        (f->type() != MYSQL_TYPE_BLOB))
> +      crc= my_checksum(crc, f->ptr, f->data_length());
> +  }
> +  
> +  /*
> +    Restore the saved bytes.
> +
> +    TODO[record format ndb]: Remove this code once NDB returns the
> +    correct record format.
> +  */
> +  if (table->s->null_bytes > 0)
> +  {
> +    if (!(table->s->db_options_in_use & HA_OPTION_PACK_RECORD))
> +      record[0]= saved_x;
> +
> +    if (table->s->last_null_bit_pos)
> +      record[table->s->null_bytes - 1]= saved_filler;
> +  }
> +
> +  DBUG_PRINT("debug", ("Created key=%u", crc));
> +
> +  DBUG_ASSERT(crc > 0);
> +  *key= crc;
> +  DBUG_RETURN(false);
> +}
> +

Using a checksum here is a great idea.
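For reference, the idea boils down to the following sketch (a toy rolling checksum stands in for my_checksum, and the field layout is simplified): the key is a checksum over the null flags plus every signaled, non-blob field.

```c
#include <stddef.h>

/* Toy rolling checksum standing in for my_checksum() (hypothetical). */
static unsigned long checksum(unsigned long crc, const unsigned char *p, size_t len)
{
  while (len--)
    crc= crc * 33 + *p++;
  return crc;
}

/* Simplified field descriptor. */
struct field
{
  const unsigned char *ptr;
  size_t length;
  int is_blob;
  int signaled;     /* set in the column bitmap */
};

/* Key = checksum of null flags folded with each signaled, non-blob field. */
static unsigned long make_row_key(const unsigned char *null_flags, size_t null_bytes,
                                  const struct field *fields, size_t nfields)
{
  unsigned long crc= 0;
  size_t i;
  crc= checksum(crc, null_flags, null_bytes);
  for (i= 0; i < nfields; i++)
    if (fields[i].signaled && !fields[i].is_blob)
      crc= checksum(crc, fields[i].ptr, fields[i].length);
  return crc;
}
```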


> +
> +#endif
> 
> === modified file 'sql/rpl_utility.h'
> --- a/sql/rpl_utility.h	2010-07-02 18:15:21 +0000
> +++ b/sql/rpl_utility.h	2010-11-22 02:11:25 +0000
> @@ -26,9 +26,159 @@
>  #include "table.h"                              /* TABLE_LIST */
>  #endif
>  #include "mysql_com.h"
> +#include <hash.h>
> +
>  
>  class Relay_log_info;
>  
> +#ifndef MYSQL_CLIENT
> +
> +/**
> +   Hash table used when applying row events on the slave and there is
> +   no index on the slave's table.
> + */
> +
> +typedef struct hash_row_pos_entry
> +{
> +  /** 
> +      Points at the position where the row starts in the
> +      event buffer (ie, area in memory before unpacking takes
> +      place).
> +  */
> +  const uchar *bi_start;
> +  const uchar *bi_ends;
> +
> +  const uchar *ai_start;
> +  const uchar *ai_ends;
> +
> +} HASH_ROW_POS_ENTRY;
> +
> +
> +class Hash_slave_rows 
> +{
> +public:
> +
> +  /**
> +     This member function allocates an entry to be added to the hash
> +     table. It should be called before calling member function add.
> +     
> +     @param bi_start the position to where in the rows buffer the
> +                     before image begins.
> +     @param bi_ends  the position to where in the rows buffer the
> +                     before image ends.
> +     @param ai_start the position to where in the rows buffer the 
> +                     after image starts (if any).
> +     @param ai_ends  the position to where in the rows buffer the
> +                     after image ends (if any).
> +     @returns NULL if a problem occurred, a valid pointer otherwise.
> +   */
> +  HASH_ROW_POS_ENTRY* make_entry(const uchar *bi_start, const uchar *bi_ends,
> +                                 const uchar *ai_start, const uchar *ai_ends);
> +
> +  /**
> +     Member function that puts data into the hash table.
> +
> +     @param table   The table holding the buffer used to calculate the
> +                    key, ie, table->record[0].
> +     @param cols    The read_set bitmap signaling which columns are used.
> +     @param entry   The entry with the values to store.
> +
> +     @returns true if something went wrong, false otherwise.
> +   */
> +  bool put(TABLE* table, MY_BITMAP *cols, HASH_ROW_POS_ENTRY* entry);
> +
> +  /**
> +     This member function gets the entry, from the hash table, that
> +     matches the data in table->record[0] and signaled using cols.
> +     
> +     @param table   The table holding the buffer containing data used to
> +                    make the entry lookup.
> +     @param cols    Bitmap signaling which columns, from
> +                    table->record[0], should be used.
> +     @param entry   Pointer that will hold a reference to the entry
> +                    fetched. If the entry is not found, then NULL
> +                    shall be returned.
> +     @returns true if something went wrong, false otherwise.
> +   */
> +  bool get(TABLE *table, MY_BITMAP *cols, HASH_ROW_POS_ENTRY** entry);


You need to update the comment: mark @param entry as [OUT].


> +
> +  /**
> +     This member function gets the entry that stands next to the one
> +     pointed to by *entry. Before calling this member function, the
> +     entry that one uses as parameter must have: 1. been obtained
> +     through get() or next() invocations; and 2. must have not been
> +     used before in a next() operation.
> +
> +     @param entry[IN/OUT] contains a pointer to an entry that we can
> +                          use to search for another adjacent entry
> +                          (ie, that shares the same key).
> +
> +     @returns true if something went wrong, false otherwise. In the
> +              case that this entry was already used in a next()
> +              operation this member function returns true and does not
> +              update the pointer.
> +   */
> +  bool next(HASH_ROW_POS_ENTRY** entry);
> +
> +  /**
> +     Deletes the entry pointed by entry. This is the only
> +     safe way to free memory allocated for the structure
> +     pointed to by entry.
> +
> +     @param entry  Pointer to the entry to be deleted.
> +     @returns true if something went wrong, false otherwise.
> +   */
> +  bool del(HASH_ROW_POS_ENTRY* entry);

It is not clear here whether the entry is removed from the hash table
and its memory deallocated. Please rewrite this comment.

> +
> +  /**
> +     Initializes the hash table.
> +
> +     @returns true if something went wrong, false otherwise.
> +   */
> +  bool init(void);
> +
> +  /**
> +     De-initializes the hash table.
> +
> +     @returns true if something went wrong, false otherwise.
> +   */
> +  bool deinit(void);
> +
> +  /**
> +     Checks if the hash table is empty or not.
> +
> +     @returns true if the hash table has zero entries, false otherwise.
> +   */
> +  bool is_empty(void);
> +
> +  /**
> +     Returns the number of entries in the hash table.
> +
> +     @returns the number of entries in the hash table.
> +   */
> +  int size();
> +  
> +private:
> +
> +  /**
> +     The hashtable itself.
> +   */
> +  HASH m_hash;
> +
> +  /**
> +     Auxiliary internal method used to create a hash key, based on
> +     the data in table->record[0] buffer and signaled as used in cols.
> +
> +     @param table  The table that is being scanned
> +     @param cols   The read_set bitmap signaling which columns are used.
> +     @param key    Output parameter where the key will be stored.
> +
> +     @returns true if something went wrong, false otherwise.
> +   */
> +  bool make_hash_key(TABLE *table, MY_BITMAP* cols, my_hash_value_type *key);
> +};
> +
> +#endif
>  
>  /**
>    A table definition from the master.
> @@ -275,3 +425,4 @@ CPP_UNNAMED_NS_END
>    } while (0)
>  
>  #endif /* RPL_UTILITY_H */
> +

There is no need to start with "This" in several places.
Also, please s/member function// throughout these comments.


> 
> === modified file 'sql/sql_class.h'
> --- a/sql/sql_class.h	2010-10-08 16:11:32 +0000
> +++ b/sql/sql_class.h	2010-11-22 21:10:41 +0000
> @@ -65,6 +65,10 @@ enum enum_slave_exec_mode { SLAVE_EXEC_M
>                              SLAVE_EXEC_MODE_LAST_BIT};
>  enum enum_slave_type_conversions { SLAVE_TYPE_CONVERSIONS_ALL_LOSSY,
>                                     SLAVE_TYPE_CONVERSIONS_ALL_NON_LOSSY};
> +enum enum_slave_rows_search_algorithms { SLAVE_ROWS_TABLE_SCAN = (1U << 0),
> +                                         SLAVE_ROWS_INDEX_SCAN = (1U << 1),
> +                                         SLAVE_ROWS_HASH_SCAN  = (1U << 2)};
> +

ok.

>  enum enum_mark_columns
>  { MARK_COLUMNS_NONE, MARK_COLUMNS_READ, MARK_COLUMNS_WRITE};
>  enum enum_filetype { FILETYPE_CSV, FILETYPE_XML };
> 
> === modified file 'sql/sys_vars.cc'
> --- a/sql/sys_vars.cc	2010-10-08 16:11:32 +0000
> +++ b/sql/sys_vars.cc	2010-11-23 00:51:55 +0000
> @@ -1875,6 +1875,51 @@ static Sys_var_set Slave_type_conversion
>         GLOBAL_VAR(slave_type_conversions_options), CMD_LINE(REQUIRED_ARG),
>         slave_type_conversions_name,
>         DEFAULT(0));
> +
> +
> +static bool slave_rows_search_algorithms_check(sys_var *self, THD *thd, set_var *var)
> +{
> +  String str, *res;
> +
> +  /**
> +     'DEFAULT' values are not allowed.
> +   */
> +  if(!var->value)
> +    return true;

Why not? You define a default value when you create the option, but
then disallow its use. This does not make sense.

> +
> +  /**
> +     NULL is not allowed.
> +   */
> +  if (check_not_null(self, thd, var))
> +    return true;
> +

I don't think this case is possible.


> +  /** empty value ('') is not allowed */
> +  res= var->value->val_str(&str);
> +  if (res->is_empty())
> +    return true;

ok.

> +
> +  /** We don't allow only INDEX_SCAN to be set. */
> +  if((res->length()==strlen("INDEX_SCAN")) && 
> +     !strncasecmp(res->c_ptr_safe(), "index_scan", res->length()))
> +    return true;

Why? It is up to the user to decide what he/she wants.
I think you should remove this restriction and update the WL.

> +
> +  return false;
> +}
> +
> +static const char *slave_rows_search_algorithms_names[]= {"TABLE_SCAN", "INDEX_SCAN", "HASH_SCAN", 0};
> +static Sys_var_set Slave_rows_search_algorithms(
> +       "slave_rows_search_algorithms", 
> +       "Set of searching algorithms that the slave will use while "
> +       "searching for records from the storage engine to either "
> +       "update or delete them. Possible values are: INDEX_SCAN, "
> +       "TABLE_SCAN and HASH_SCAN. Any combination is allowed, and "
> +       "the slave will always pick the most suitable algorithm for "
> +       "any given scenario. "
> +       "(Default: INDEX_SCAN, TABLE_SCAN).",
> +       GLOBAL_VAR(slave_rows_search_algorithms_options), CMD_LINE(REQUIRED_ARG),
> +       slave_rows_search_algorithms_names,
> +       DEFAULT(SLAVE_ROWS_INDEX_SCAN | SLAVE_ROWS_TABLE_SCAN),  NO_MUTEX_GUARD,
> +       NOT_IN_BINLOG, ON_CHECK(slave_rows_search_algorithms_check),
> +       ON_UPDATE(NULL));
>  #endif
>  
>  
> 
> No bundle (reason: revision is a merge).
> 
