From: Frazer Clement   Date: September 7 2011 10:59pm
Subject:bzr push into mysql-5.1-telco-7.0 branch (frazer.clement:4507 to 4508)
 4508 Frazer Clement	2011-09-07
      WL5353 Primary Cluster conflict resolution - transactional
      
       - A new --ndb-log-transaction-id option is added to the MySQL Server
       - A new NDB$EPOCH_TRANS() conflict detection algorithm is added
       - Five new status variables are added for tracking the behaviour of
         transactional conflict detection.
      
      The NDB$EPOCH_TRANS() function is similar to NDB$EPOCH, except that 
      detected conflicts result in rejection and realignment of not just the
      conflicting rows, but their transactions, and any transactions 
      transitively depending on them.  This gives eventually consistent 
      asynchronous replication with cross-row transaction consistency.
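
      For illustration, a minimal sketch of the transitive marking described
      above.  All names here are hypothetical; the real logic lives in the
      new sql/ndb_conflict_trans.cc, presumably built on the HashMap2 and
      LinkedStack utilities added here, and each pass to the fixpoint
      corresponds roughly to one Ndb_conflict_trans_detect_iter_count
      increment.

        // Sketch only: extend the set of in-conflict transactions with
        // every transaction that touched a row last written by an
        // in-conflict transaction, iterating to a fixpoint.
        #include <cstdint>
        #include <map>
        #include <set>
        #include <tuple>
        #include <vector>

        typedef uint64_t TransId;

        struct RowKey {
          uint32_t tableId;
          uint64_t pk;
          bool operator<(const RowKey& o) const {
            return std::tie(tableId, pk) < std::tie(o.tableId, o.pk);
          }
        };

        struct RowOp { TransId trans; RowKey row; };

        std::set<TransId> markDependents(const std::vector<RowOp>& ops,
                                         std::set<TransId> inConflict) {
          bool changed = true;
          while (changed) {                // one pass per detect iteration
            changed = false;
            std::map<RowKey, TransId> lastWriter;
            for (const RowOp& op : ops) {
              auto it = lastWriter.find(op.row);
              if (it != lastWriter.end() && inConflict.count(it->second) &&
                  inConflict.insert(op.trans).second)
                changed = true;            // new dependent; scan again
              lastWriter[op.row] = op.trans;
            }
          }
          return inConflict;               // reject and realign all of these
        }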
      
       

    added:
      mysql-test/suite/ndb_binlog/r/ndb_binlog_log_transaction_id.result
      mysql-test/suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
      mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id-master.opt
      mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id.test
      mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_epoch_trans.result
      mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.cnf
      mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.test
      mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info.inc
      mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
      mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc
      mysql-test/suite/rpl/r/rpl_extra_row_data.result
      mysql-test/suite/rpl/t/rpl_extra_row_data-master.opt
      mysql-test/suite/rpl/t/rpl_extra_row_data-slave.opt
      mysql-test/suite/rpl/t/rpl_extra_row_data.test
      sql/ndb_conflict_trans.cc
      sql/ndb_conflict_trans.h
      storage/ndb/include/util/HashMap2.hpp
      storage/ndb/include/util/LinkedStack.hpp
      storage/ndb/src/common/util/HashMap2.cpp
      storage/ndb/src/common/util/LinkedStack.cpp
    modified:
      libmysqld/Makefile.am
      mysql-test/suite/ndb/r/ndb_basic.result
      sql/Makefile.am
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_binlog.h
      sql/log_event.cc
      sql/log_event.h
      sql/ndb_mi.cc
      sql/ndb_mi.h
      sql/rpl_constants.h
      sql/slave.h
      sql/sql_class.cc
      sql/sql_class.h
      storage/ndb/CMakeLists.txt
      storage/ndb/src/common/util/CMakeLists.txt
      storage/ndb/src/common/util/Makefile.am
 4507 jonas oreland	2011-09-07
      ndb - update result file :-(

    modified:
      mysql-test/suite/ndb/r/ndb_basic.result
=== modified file 'libmysqld/Makefile.am'
--- a/libmysqld/Makefile.am	2011-07-04 13:37:56 +0000
+++ b/libmysqld/Makefile.am	2011-09-07 22:50:01 +0000
@@ -49,7 +49,7 @@ sqlsources = derror.cc field.cc field_co
 	     ha_ndbcluster.cc ha_ndbcluster_cond.cc \
 	ha_ndbcluster_connection.cc ha_ndbinfo.cc \
 	ha_ndb_index_stat.cc \
-	ha_ndbcluster_binlog.cc ha_partition.cc \
+	ha_ndbcluster_binlog.cc ndb_conflict_trans.cc ha_partition.cc \
 	handler.cc sql_handler.cc \
 	hostname.cc init.cc password.c \
 	item.cc item_buff.cc item_cmpfunc.cc item_create.cc \
@@ -127,6 +127,9 @@ ha_ndb_index_stat.o: ha_ndb_index_stat.c
 ha_ndbinfo.o: ha_ndbinfo.cc
 		$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
 
+ndb_conflict_trans.o: ndb_conflict_trans.cc
+		$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
+
 # Until we can remove dependency on ha_ndbcluster.h
 handler.o:	handler.cc
 		$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<

=== modified file 'mysql-test/suite/ndb/r/ndb_basic.result'
--- a/mysql-test/suite/ndb/r/ndb_basic.result	2011-09-07 19:45:53 +0000
+++ b/mysql-test/suite/ndb/r/ndb_basic.result	2011-09-07 22:50:01 +0000
@@ -75,9 +75,15 @@ Ndb_cluster_node_id	#
 Ndb_config_from_host	#
 Ndb_config_from_port	#
 Ndb_conflict_fn_epoch	#
+Ndb_conflict_fn_epoch_trans	#
 Ndb_conflict_fn_max	#
 Ndb_conflict_fn_max_del_win	#
 Ndb_conflict_fn_old	#
+Ndb_conflict_trans_conflict_commit_count	#
+Ndb_conflict_trans_detect_iter_count	#
+Ndb_conflict_trans_reject_count	#
+Ndb_conflict_trans_row_conflict_count	#
+Ndb_conflict_trans_row_reject_count	#
 Ndb_connect_count	#
 Ndb_execute_count	#
 Ndb_index_stat_cache_clean	#
@@ -110,6 +116,7 @@ ndb_log_bin	#
 ndb_log_binlog_index	#
 ndb_log_empty_epochs	#
 ndb_log_orig	#
+ndb_log_transaction_id	#
 ndb_log_update_as_write	#
 ndb_log_updated_only	#
 ndb_mgmd_host	#

=== added file 'mysql-test/suite/ndb_binlog/r/ndb_binlog_log_transaction_id.result'
--- a/mysql-test/suite/ndb_binlog/r/ndb_binlog_log_transaction_id.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_binlog/r/ndb_binlog_log_transaction_id.result	2011-09-07 22:50:01 +0000
@@ -0,0 +1,104 @@
+use test;
+create table t1 (a int primary key, b int) engine=ndb;
+create table t2 (a int primary key, b int) engine=ndb;
+Single row autocommit transactions
+Should have 1 transaction id
+insert into t1 values (1,1);
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Should have 1 transaction id
+insert into t1 values (2,2);
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Should have 1 transaction id
+update t1 set b=20 where a=2;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Multi row autocommit transaction
+Should have 1 transaction id
+delete from t1;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Multi row explicit transaction
+Should have 1 transaction id
+begin;
+insert into t1 values (3,3);
+insert into t1 values (4,4);
+insert into t1 values (5,5);
+commit;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Should have 1 transaction id
+begin;
+insert into t1 values (6,6);
+update t1 set b=40 where a=4;
+delete from t1 where a=5;
+commit;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Multi table explicit transaction
+Should have 1 transaction id
+begin;
+insert into t1 values (7,7);
+insert into t2 values (7,7);
+insert into t2 values (8,8);
+commit;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Multiple autocommit transactions
+Should have 2 transaction ids
+insert into t1 values (8,8);
+insert into t1 values (9,9);
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+2
+reset master;
+Multiple autocommit transactions on single row
+Should have 3 transaction ids
+insert into t1 values (10,10);
+update t1 set b=100 where a=10;
+delete from t1 where a=10;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+3
+reset master;
+Multiple explicit transactions
+Should have 2 transaction ids
+begin;
+insert into t1 values (11,11);
+delete from t1;
+commit;
+begin;
+insert into t2 values (11,11);
+delete from t2;
+commit;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+2
+reset master;
+drop table t1;
+drop table t2;

=== added file 'mysql-test/suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc'
--- a/mysql-test/suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc	2011-09-07 22:50:01 +0000
@@ -0,0 +1,30 @@
+#
+# Get the mysqlbinlog tool --verbose mode to dump the Binlog contents with
+# extra row data included, e.g.
+#
+#### Extra row data format: 0, len: 8 :0x0007200006000000
+#
+# Then process this input to get the number of distinct values, and hence
+# distinct transaction ids in the binlog
+#
+
+--disable_query_log
+let $MYSQLD_DATADIR= `select @@datadir;`;
+--exec $MYSQL_BINLOG --verbose $MYSQLD_DATADIR/mysqld-bin.000001 > $MYSQLTEST_VARDIR/tmp/ndb_binlog_mysqlbinlog.sql
+
+create table raw_binlog_rows (txt varchar(1000));
+create table diff_extra_data (txt varchar(1000));
+
+--eval load data local infile '$MYSQLTEST_VARDIR/tmp/ndb_binlog_mysqlbinlog.sql' into table raw_binlog_rows columns terminated by '\n';
+
+insert into diff_extra_data select distinct(txt) from raw_binlog_rows where txt like '### Extra row data %';
+--enable_query_log
+
+--echo Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+
+--disable_query_log
+drop table diff_extra_data;
+drop table raw_binlog_rows;
+--enable_query_log
+
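
For reference, a standalone sketch of the same counting the .inc above does
with LOAD DATA plus SELECT DISTINCT: read mysqlbinlog --verbose output from
stdin and count distinct extra-row-data lines, i.e. distinct transaction
ids.  Illustrative only, not part of this change:

    #include <iostream>
    #include <set>
    #include <string>

    // Each distinct "### Extra row data" line corresponds to one
    // distinct transaction id logged in the Binlog.
    int main() {
      const std::string prefix("### Extra row data");
      std::set<std::string> distinct;
      std::string line;
      while (std::getline(std::cin, line))
        if (line.compare(0, prefix.size(), prefix) == 0)
          distinct.insert(line);
      std::cout << distinct.size() << std::endl;
      return 0;
    }

Usage would be along the lines of
mysqlbinlog --verbose mysqld-bin.000001 | ./count_trans_ids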

=== added file 'mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id-master.opt'
--- a/mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id-master.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id-master.opt	2011-09-07 22:50:01 +0000
@@ -0,0 +1 @@
+--ndb-log-transaction-id

=== added file 'mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id.test'
--- a/mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id.test	2011-09-07 22:50:01 +0000
@@ -0,0 +1,122 @@
+--source include/have_ndb.inc
+--source include/have_log_bin.inc
+--source include/have_debug.inc
+
+use test;
+
+create table t1 (a int primary key, b int) engine=ndb;
+create table t2 (a int primary key, b int) engine=ndb;
+
+--echo Single row autocommit transactions
+--echo Should have 1 transaction id
+insert into t1 values (1,1);
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Should have 1 transaction id
+insert into t1 values (2,2);
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Should have 1 transaction id
+update t1 set b=20 where a=2;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multi row autocommit transaction
+--echo Should have 1 transaction id
+delete from t1;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multi row explicit transaction
+--echo Should have 1 transaction id
+begin;
+insert into t1 values (3,3);
+insert into t1 values (4,4);
+insert into t1 values (5,5);
+commit;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Should have 1 transaction id
+begin;
+insert into t1 values (6,6);
+update t1 set b=40 where a=4;
+delete from t1 where a=5;
+commit;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multi table explicit transaction
+--echo Should have 1 transaction id
+begin;
+insert into t1 values (7,7);
+insert into t2 values (7,7);
+insert into t2 values (8,8);
+commit;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multiple autocommit transactions
+--echo Should have 2 transaction ids
+insert into t1 values (8,8);
+insert into t1 values (9,9);
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multiple autocommit transactions on single row
+--echo Should have 3 transaction ids
+insert into t1 values (10,10);
+update t1 set b=100 where a=10;
+delete from t1 where a=10;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multiple explicit transactions
+--echo Should have 2 transaction ids
+begin;
+insert into t1 values (11,11);
+delete from t1;
+commit;
+
+begin;
+insert into t2 values (11,11);
+delete from t2;
+commit;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+
+drop table t1;
+drop table t2;
\ No newline at end of file

=== added file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_epoch_trans.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_epoch_trans.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_epoch_trans.result	2011-09-07 22:50:01 +0000
@@ -0,0 +1,914 @@
+include/master-slave.inc
+[connection master]
+Setup circular replication
+RESET MASTER;
+select @slave_server_id:=(variable_value+0)
+from information_schema.global_variables
+where variable_name like 'server_id';
+@slave_server_id:=(variable_value+0)
+3
+CHANGE MASTER TO master_host="127.0.0.1",master_port=SLAVE_PORT,master_user="root";
+START SLAVE;
+select @master_server_id:=(variable_value+0)
+from information_schema.global_variables
+where variable_name like 'server_id';
+@master_server_id:=(variable_value+0)
+1
+Setup ndb_replication and t1 exceptions table
+Populate ndb_replication table as necessary
+replace into mysql.ndb_replication values
+("test", "t1", 3, 7, NULL),
+("test", "t1", 1, 7, "NDB$EPOCH_TRANS()");
+replace into mysql.ndb_replication values
+("test", "t2", 3, 7, NULL),
+("test", "t2", 1, 7, "NDB$EPOCH_TRANS()");
+create table test.t1 (
+a int primary key,
+b varchar(2000)) engine=ndb;
+create table test.t2 (
+a int primary key,
+b varchar(2000)) engine=ndb;
+Add some data
+insert into test.t1 values
+(1, "Initial data 1"),
+(2, "Initial data 2"),
+(3, "Initial data 3"),
+(4, "Initial data 4"),
+(5, "Initial data 5"),
+(6, "Initial data 6"),
+(7, "Initial data 7"),
+(8, "Initial data 8"),
+(9, "Initial data 9"),
+(10, "Initial data 10");
+Show basic row-level conflict detection
+---------------------------------------
+stop slave;
+update t1 set b="Primary first change 2" where a=2;
+select * from test.t1 order by a;
+a	b
+1	Initial data 1
+2	Primary first change 2
+3	Initial data 3
+4	Initial data 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+update t1 set b="Secondary first change 2" where a=2;
+select * from test.t1 order by a;
+a	b
+1	Initial data 1
+2	Secondary first change 2
+3	Initial data 3
+4	Initial data 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+Primary should have rejected change from Secondary, keeping its value
+select * from t1 order by a;
+a	b
+1	Initial data 1
+2	Primary first change 2
+3	Initial data 3
+4	Initial data 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+ndb_conflict_fn_epoch_trans
+1
+ndb_conflict_trans_row_conflict_count
+1
+ndb_conflict_trans_row_reject_count
+1
+ndb_conflict_trans_reject_count
+1
+ndb_conflict_trans_detect_iter_count
+1
+ndb_conflict_trans_conflict_commit_count
+1
+start slave;
+Secondary should have been realigned to Primary
+select * from t1 order by a;
+a	b
+1	Initial data 1
+2	Primary first change 2
+3	Initial data 3
+4	Initial data 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+Show rollback of whole secondary transaction
+--------------------------------------------
+stop slave;
+update t1 set b="Primary second change 4" where a=4;
+select * from test.t1 order by a;
+a	b
+1	Initial data 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+begin;
+update t1 set b="Secondary second change 4" where a=4;
+update t1 set b="Secondary second change 5" where a=5;
+commit;
+select * from test.t1 order by a;
+a	b
+1	Initial data 1
+2	Primary first change 2
+3	Initial data 3
+4	Secondary second change 4
+5	Secondary second change 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+Primary should have rejected secondary changes on both rows
+select * from test.t1 order by a;
+a	b
+1	Initial data 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+ndb_conflict_fn_epoch_trans
+1
+ndb_conflict_trans_row_conflict_count
+1
+ndb_conflict_trans_row_reject_count
+2
+ndb_conflict_trans_reject_count
+1
+ndb_conflict_trans_detect_iter_count
+1
+ndb_conflict_trans_conflict_commit_count
+1
+start slave;
+Secondary should have been realigned to Primary
+select * from test.t1 order by a;
+a	b
+1	Initial data 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+Show rollback of dependent transaction as well
+----------------------------------------------
+stop slave;
+update t1 set b="Primary third change 1" where a=1;
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+begin;
+update t1 set b="Secondary third change 3" where a=3;
+update t1 set b="Secondary third change 1" where a=1;
+commit;
+begin;
+update t1 set b="Secondary fourth change 3" where a=3;
+insert into t1 values (11,"Secondary fourth change 11");
+commit;
+select * from test.t1 order by a;
+a	b
+1	Secondary third change 1
+2	Primary first change 2
+3	Secondary fourth change 3
+4	Primary second change 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+11	Secondary fourth change 11
+Primary should have rejected all secondary changes
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+ndb_conflict_trans_row_reject_count
+4
+ndb_conflict_trans_reject_count
+2
+start slave;
+Secondary should have been realigned to Primary
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+Show rollback of dependent transaction across different tables
+--------------------------------------------------------------
+stop slave;
+update t1 set b="Primary fifth change 6" where a=6;
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+begin;
+update t1 set b="Secondary fifth change 6" where a=6;
+insert into t2 values (1, "Secondary fifth change 1");
+insert into t2 values (2, "Secondary fifth change 2");
+commit;
+begin;
+update t2 set b="Secondary sixth change 1" where a=2;
+insert into t2 values (3, "Secondary sixth change 2");
+commit;
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Secondary fifth change 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+select * from test.t2 order by a;
+a	b
+1	Secondary fifth change 1
+2	Secondary sixth change 1
+3	Secondary sixth change 2
+Primary should have rejected all secondary changes
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+select * from test.t2 order by a;
+a	b
+ndb_conflict_trans_row_reject_count
+5
+ndb_conflict_trans_reject_count
+2
+start slave;
+Secondary should have been realigned to primary
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+select * from test.t2 order by a;
+a	b
+Show that whole epoch is not rolled back
+----------------------------------------
+stop slave;
+update t1 set b="Primary is king" where a=10;
+begin;
+update t1 set b="Secondary is emperor" where a=10;
+insert into t1 values (11, "Secondary is pleni-potentiary");
+commit;
+begin;
+insert into t1 values (12, "Secondary ruled once");
+insert into t1 values (13, "This history will not be lost");
+insert into t1 values (14, "Look on my works ye mighty, and despair");
+commit;
+Primary should have rejected conflicting trans (emperor, pleni-potentiary)
+but accepted unrelated trans (history)
+select * from t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Primary is king
+12	Secondary ruled once
+13	This history will not be lost
+14	Look on my works ye mighty, and despair
+ndb_conflict_fn_epoch_trans
+1
+ndb_conflict_trans_row_conflict_count
+1
+ndb_conflict_trans_row_reject_count
+2
+ndb_conflict_trans_reject_count
+1
+ndb_conflict_trans_detect_iter_count
+1
+ndb_conflict_trans_conflict_commit_count
+1
+start slave;
+Secondary should be aligned with Primary
+select * from t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Primary is king
+12	Secondary ruled once
+13	This history will not be lost
+14	Look on my works ye mighty, and despair
+Show that non-conflicting ancestors are not implicated
+------------------------------------------------------
+stop slave;
+update t1 set b="7 : Primary is king" where a=7;
+Primary state
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	7 : Primary is king
+8	Initial data 8
+9	Initial data 9
+10	Primary is king
+12	Secondary ruled once
+13	This history will not be lost
+14	Look on my works ye mighty, and despair
+begin;
+update t1 set b="8 : Secondary innocent" where a=8;
+update t1 set b="9 : Secondary innocent" where a=9;
+commit;
+Secondary with innocent
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	Initial data 7
+8	8 : Secondary innocent
+9	9 : Secondary innocent
+10	Primary is king
+12	Secondary ruled once
+13	This history will not be lost
+14	Look on my works ye mighty, and despair
+begin;
+update t1 set b="9 : Secondary guilty" where a=9;
+update t1 set b="7 : Secondary guilty" where a=7;
+commit;
+Secondary with guilty overlaid
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	7 : Secondary guilty
+8	8 : Secondary innocent
+9	9 : Secondary guilty
+10	Primary is king
+12	Secondary ruled once
+13	This history will not be lost
+14	Look on my works ye mighty, and despair
+Primary cluster should have rejected 'guilty' secondary transaction, but
+accepted 'innocent' secondary transaction.
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	7 : Primary is king
+8	8 : Secondary innocent
+9	9 : Secondary innocent
+10	Primary is king
+12	Secondary ruled once
+13	This history will not be lost
+14	Look on my works ye mighty, and despair
+ndb_conflict_fn_epoch_trans
+1
+ndb_conflict_trans_row_conflict_count
+1
+ndb_conflict_trans_row_reject_count
+2
+ndb_conflict_trans_reject_count
+1
+ndb_conflict_trans_detect_iter_count
+1
+ndb_conflict_trans_conflict_commit_count
+1
+start slave;
+Secondary cluster should be realigned with Primary
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	7 : Primary is king
+8	8 : Secondary innocent
+9	9 : Secondary innocent
+10	Primary is king
+12	Secondary ruled once
+13	This history will not be lost
+14	Look on my works ye mighty, and despair
+Classic banking example
+-----------------------
+replace into mysql.ndb_replication values
+("test", "balances", 3, 7, NULL),
+("test", "balances", 1, 7, "NDB$EPOCH_TRANS()");
+replace into mysql.ndb_replication values
+("test", "transactions", 3, 7, NULL),
+("test", "transactions", 1, 7, "NDB$EPOCH_TRANS()");
+create table test.balances
+(name     varchar(100) primary key,
+balance  int) engine=ndb;
+create table test.transactions$EX
+(server_id             int unsigned,
+master_server_id      int unsigned,
+master_epoch          bigint unsigned,
+count                 int unsigned,
+auto_key              int not null,
+from_name             varchar(100) not null,
+to_name               varchar(100) not null,
+detail                varchar(100) not null,
+primary key(server_id, master_server_id, master_epoch, count))
+engine=ndb;
+create table test.transactions
+(auto_key      int auto_increment,
+from_name     varchar(100),
+to_name       varchar(100),
+detail        varchar(100),
+amount        int,
+primary key(auto_key, from_name, to_name, detail)) engine=ndb;
+Initialise balances across both bank sites
+insert into test.balances values
+("Larry", 100),
+("Employee-1", 0),
+("Employee-2", 0),
+("Yacht dealer", 0),
+("Newsagent", 0);
+FLUSH LOGS;
+Bank sites are disconnected
+stop slave;
+Larry buys a yacht using Primary bank site
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+values ("Larry", "Yacht dealer", "Yacht purchase", 50);
+update test.balances set balance = balance - 50 where name = "Larry";
+update test.balances set balance = balance + 50 where name = "Yacht dealer";
+commit;
+Show yacht transaction records
+select * from test.transactions order by auto_key;
+auto_key	from_name	to_name	detail	amount
+1	Larry	Yacht dealer	Yacht purchase	50
+select * from test.balances order by name;
+name	balance
+Employee-1	0
+Employee-2	0
+Larry	50
+Newsagent	0
+Yacht dealer	50
+Larry pays employees using Secondary bank site
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+values ("Larry", "Employee-1", "Payment to Employee-1", 1);
+update test.balances set balance = balance - 1 where name = "Larry";
+update test.balances set balance = balance + 1 where name = "Employee-1";
+commit;
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+values ("Larry", "Employee-2", "Payment to Employee-2", 1);
+update test.balances set balance = balance - 1 where name = "Larry";
+update test.balances set balance = balance + 1 where name = "Employee-2";
+commit;
+Employee-2 buys yacht magazine using Secondary bank site
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+values ("Employee-2", "Newsagent", "Buy yacht magazine", 1);
+update test.balances set balance = balance - 1 where name = "Employee-2";
+update test.balances set balance = balance + 1 where name = "Newsagent";
+commit;
+Show employee transactions
+select * from test.transactions order by auto_key;
+auto_key	from_name	to_name	detail	amount
+1	Larry	Employee-1	Payment to Employee-1	1
+2	Larry	Employee-2	Payment to Employee-2	1
+3	Employee-2	Newsagent	Buy yacht magazine	1
+select * from test.balances order by name;
+name	balance
+Employee-1	1
+Employee-2	0
+Larry	98
+Newsagent	1
+Yacht dealer	0
+Bank sites re-connected
+start slave;
+Records at Primary bank site
+select * from test.transactions order by auto_key;
+auto_key	from_name	to_name	detail	amount
+1	Larry	Yacht dealer	Yacht purchase	50
+select * from test.balances order by name;
+name	balance
+Employee-1	0
+Employee-2	0
+Larry	50
+Newsagent	0
+Yacht dealer	50
+Exceptions at Primary bank site
+select server_id, master_server_id, count, auto_key, from_name, to_name, detail
+from test.transactions$EX order by count;
+server_id	master_server_id	count	auto_key	from_name	to_name	detail
+1	3	1	1	Larry	Employee-1	Payment to Employee-1
+1	3	2	2	Larry	Employee-2	Payment to Employee-2
+1	3	3	3	Employee-2	Newsagent	Buy yacht magazine
+Conflict handling activity at Primary bank site
+Expect :
+1 conflict from slave T1 on Larry's balance
+1 conflict from slave T2 on Larry's balance
+=2 row conflicts
+
+3 (user) transactions rejected
+9 rows rejected (3 per transaction)
+Variability : # epoch transactions, # row conflicts detected
+1-3                   2-3
+# detect_iter_count
+1-3
+We only check stable values
+ndb_conflict_trans_row_reject_count
+9
+ndb_conflict_trans_reject_count
+3
+Records at Secondary bank site
+select * from test.transactions order by auto_key;
+auto_key	from_name	to_name	detail	amount
+1	Larry	Yacht dealer	Yacht purchase	50
+select * from test.balances order by name;
+name	balance
+Employee-1	0
+Employee-2	0
+Larry	50
+Newsagent	0
+Yacht dealer	50
+drop table test.balances;
+drop table test.transactions;
+drop table test.transactions$EX;
+Test mixing transactional and non-transactional
+-----------------------------------------------
+Remove old data from t1
+delete from test.t1;
+Define table with row-based epoch detection
+replace into mysql.ndb_replication values
+("test", "t3", 3, 7, NULL),
+("test", "t3", 1, 7, 'NDB$EPOCH()');
+create table t3 (a int primary key, b int) engine=ndb;
+create table t4 (a int primary key, b int) engine=ndb;
+create table t5 (a int primary key, b longtext) engine=ndb;
+Insert some data
+insert into test.t1 values
+(1,1),
+(2,2),
+(3,3),
+(4,4),
+(5,5),
+(6,6);
+insert into test.t3 values
+(11,11),
+(12,12),
+(13,13),
+(14,14),
+(15,15),
+(16,16);
+insert into test.t4 values
+(21,21),
+(22,22),
+(23,23),
+(24,24),
+(25,25),
+(26,26);
+insert into test.t5 values
+(1, REPEAT("B", 10000)),
+(2, REPEAT("E", 10000)),
+(3, REPEAT("A", 10000));
+Allow inserts to propagate
+FLUSH LOGS;
+Case 1 : Transactional detection affects row-based entries in same trans
+stop slave;
+update test.t1 set b=100 where a=1;
+begin;
+update test.t3 set b=1100 where a=11;
+update test.t4 set b=2100 where a=21;
+update test.t1 set b=1000 where a=1;
+commit;
+Show slave transaction effect
+select * from test.t1 order by a;
+a	b
+1	1000
+2	2
+3	3
+4	4
+5	5
+6	6
+select * from test.t3 order by a;
+a	b
+11	1100
+12	12
+13	13
+14	14
+15	15
+16	16
+select * from test.t4 order by a;
+a	b
+21	2100
+22	22
+23	23
+24	24
+25	25
+26	26
+Expect Primary to have rejected whole trans across 3 tables
+select * from test.t1 order by a;
+a	b
+1	100
+2	2
+3	3
+4	4
+5	5
+6	6
+select * from test.t3 order by a;
+a	b
+11	11
+12	12
+13	13
+14	14
+15	15
+16	16
+select * from test.t4 order by a;
+a	b
+21	21
+22	22
+23	23
+24	24
+25	25
+26	26
+Expect 1 transaction rejected, 3 rows rejected
+1 conflict row, 1 epoch, 1 iteration
+ndb_conflict_fn_epoch_trans
+1
+ndb_conflict_trans_row_conflict_count
+1
+ndb_conflict_trans_row_reject_count
+3
+ndb_conflict_trans_reject_count
+1
+ndb_conflict_trans_detect_iter_count
+1
+ndb_conflict_trans_conflict_commit_count
+1
+Now restart rep to Secondary, and check realignment
+start slave;
+select * from test.t1 order by a;
+a	b
+1	100
+2	2
+3	3
+4	4
+5	5
+6	6
+select * from test.t3 order by a;
+a	b
+11	11
+12	12
+13	13
+14	14
+15	15
+16	16
+select * from test.t4 order by a;
+a	b
+21	21
+22	22
+23	23
+24	24
+25	25
+26	26
+Case 2 : Row-based detection does not affect other transaction entries
+stop slave;
+update test.t3 set b=1200 where a=12;
+begin;
+update test.t3 set b=1201 where a=12;
+update test.t4 set b=2200 where a=22;
+update test.t1 set b=2000 where a=2;
+commit;
+Show effect of transaction on Secondary
+select * from test.t1 order by a;
+a	b
+1	100
+2	2000
+3	3
+4	4
+5	5
+6	6
+select * from test.t3 order by a;
+a	b
+11	11
+12	1201
+13	13
+14	14
+15	15
+16	16
+select * from test.t4 order by a;
+a	b
+21	21
+22	2200
+23	23
+24	24
+25	25
+26	26
+Show effect of transaction on Primary
+Only t3 should have been reverted
+select * from test.t1 order by a;
+a	b
+1	100
+2	2000
+3	3
+4	4
+5	5
+6	6
+select * from test.t3 order by a;
+a	b
+11	11
+12	1200
+13	13
+14	14
+15	15
+16	16
+select * from test.t4 order by a;
+a	b
+21	21
+22	2200
+23	23
+24	24
+25	25
+26	26
+Expect all counters to be zero
+ndb_conflict_fn_epoch_trans
+0
+ndb_conflict_trans_row_conflict_count
+0
+ndb_conflict_trans_row_reject_count
+0
+ndb_conflict_trans_reject_count
+0
+ndb_conflict_trans_detect_iter_count
+0
+ndb_conflict_trans_conflict_commit_count
+0
+Show effect of transaction on Secondary
+start slave;
+select * from test.t1 order by a;
+a	b
+1	100
+2	2000
+3	3
+4	4
+5	5
+6	6
+select * from test.t3 order by a;
+a	b
+11	11
+12	1200
+13	13
+14	14
+15	15
+16	16
+select * from test.t4 order by a;
+a	b
+21	21
+22	2200
+23	23
+24	24
+25	25
+26	26
+flush logs;
+Case 3 : Check behaviour where a table with a Blob is implicated
+in transactional conflict.  Should result in Slave
+stopping with an error.
+STOP SLAVE;
+Setup warning suppression
+begin;
+update t1 set b= 11 where a=1;
+commit;
+begin;
+update t1 set b= 111 where a=1;
+update t1 set b= 222 where a=2;
+update t5 set b= REPEAT("T", 10000) where a=3;
+commit;
+Show effect of transaction on Secondary
+select * from test.t1 order by a;
+a	b
+1	111
+2	222
+3	3
+4	4
+5	5
+6	6
+select left(b,1), length(b) from test.t5 order by a;
+left(b,1)	length(b)
+B	10000
+E	10000
+T	10000
+Check that Primary Slave has stopped
+include/wait_for_slave_sql_error.inc [errno=1296]
+Restart Primary Slave
+set global sql_slave_skip_counter=1;
+START SLAVE;
+Restart Secondary Slave
+START SLAVE;
+flush logs;
+drop table test.t3;
+drop table test.t4;
+drop table test.t5;
+drop table mysql.ndb_replication;
+drop table test.t1;
+drop table test.t2;
+flush logs;
+stop slave;
+reset slave;
+change master to master_host='';
+include/rpl_end.inc

=== added file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.cnf'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.cnf	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.cnf	2011-09-07 22:50:01 +0000
@@ -0,0 +1,47 @@
+# Could be :
+#  !include ndb_rpl_conflict_epoch.cnf
+#  [mysqld]
+#  server-id-bits=24
+#  ndb-serverid-transid-bits=8
+
+!include ../my.cnf
+
+# 2 clusters, each with 2 MySQLDs
+# All MySQLDs log-slave-updates
+# Potential infinite loops are broken by both servers
+# on each cluster having the same server-id
+
+[mysqld]
+log-slave-updates
+ndb-log-apply-status
+ndb-log-transaction-id
+
+[mysqld.1.1]
+server-id= 1
+log-bin = pref-master-1
+
+[mysqld.2.1]
+server-id= 2
+log-bin = pref-master-2
+
+[mysqld.1.slave]
+server-id= 3
+log-bin = sec-master-1
+skip-slave-start
+
+[mysqld.2.slave]
+server-id= 4
+log-bin = sec-master-2
+master-host=		127.0.0.1
+master-port=		@mysqld.2.1.port
+master-password=	@mysqld.2.1.#password
+master-user=		@mysqld.2.1.#user
+master-connect-retry=	1
+init-rpl-role=		slave
+skip-slave-start
+ndb_connectstring=	@mysql_cluster.slave.ndb_connectstring
+
+[ENV]
+
+SLAVE_MYPORT1=		@mysqld.2.slave.port
+SLAVE_MYSOCK1=		@mysqld.2.slave.socket

=== added file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.test	2011-09-07 22:50:01 +0000
@@ -0,0 +1,746 @@
+#
+# Test engine native conflict resolution for ndb
+#   NDB$EPOCH_TRANS() function
+#
+#
+--source include/have_ndb.inc
+--source include/have_binlog_format_mixed_or_row.inc
+--source suite/ndb_rpl/ndb_master-slave.inc
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--echo Setup circular replication
+
+--connection slave
+RESET MASTER;
+select @slave_server_id:=(variable_value+0)
+       from information_schema.global_variables
+       where variable_name like 'server_id';
+let $SLAVE_SERVER_ID= query_get_value('select @slave_server_id as v',v,1);
+
+--connection master
+--replace_result $SLAVE_MYPORT SLAVE_PORT
+--eval CHANGE MASTER TO master_host="127.0.0.1",master_port=$SLAVE_MYPORT,master_user="root"
+START SLAVE;
+select @master_server_id:=(variable_value+0)
+       from information_schema.global_variables
+       where variable_name like 'server_id';
+let $MASTER_SERVER_ID= query_get_value('select @master_server_id as v',v,1);
+
+--echo Setup ndb_replication and t1$EX exceptions table
+
+--disable_warnings
+--disable_query_log
+--connection master
+drop table if exists mysql.ndb_replication;
+CREATE TABLE mysql.ndb_replication
+  (db VARBINARY(63),
+   table_name VARBINARY(63),
+   server_id INT UNSIGNED,
+   binlog_type INT UNSIGNED,
+   conflict_fn VARBINARY(128),
+   PRIMARY KEY USING HASH (db,table_name,server_id))
+  ENGINE=NDB PARTITION BY KEY(db,table_name);
+--enable_warnings
+--enable_query_log
+
+--echo Populate ndb_replication table as necessary
+eval replace into mysql.ndb_replication values
+  ("test", "t1", $SLAVE_SERVER_ID, 7, NULL),
+  ("test", "t1", $MASTER_SERVER_ID, 7, "NDB\$EPOCH_TRANS()");
+eval replace into mysql.ndb_replication values
+  ("test", "t2", $SLAVE_SERVER_ID, 7, NULL),
+  ("test", "t2", $MASTER_SERVER_ID, 7, "NDB\$EPOCH_TRANS()");
+
+create table test.t1 (
+  a int primary key,
+  b varchar(2000)) engine=ndb;
+
+create table test.t2 (
+  a int primary key,
+  b varchar(2000)) engine=ndb;
+
+--sync_slave_with_master slave
+
+--connection master
+--echo Add some data
+insert into test.t1 values
+ (1, "Initial data 1"),
+ (2, "Initial data 2"),
+ (3, "Initial data 3"),
+ (4, "Initial data 4"),
+ (5, "Initial data 5"),
+ (6, "Initial data 6"),
+ (7, "Initial data 7"),
+ (8, "Initial data 8"),
+ (9, "Initial data 9"),
+ (10, "Initial data 10");
+
+--echo Show basic row-level conflict detection
+--echo ---------------------------------------
+--sync_slave_with_master slave
+--connection slave
+
+stop slave;
+
+--connection master
+
+update t1 set b="Primary first change 2" where a=2;
+select * from test.t1 order by a;
+
+--connection slave
+
+update t1 set b="Secondary first change 2" where a=2;
+select * from test.t1 order by a;
+
+--sync_slave_with_master master
+
+--echo Primary should have rejected change from Secondary, keeping its value
+
+select * from t1 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+
+start slave;
+
+--connection master
+
+--sync_slave_with_master slave
+
+--connection slave
+
+--echo Secondary should have been realigned to Primary
+
+select * from t1 order by a;
+
+--echo Show rollback of whole secondary transaction
+--echo --------------------------------------------
+
+--connection slave
+
+stop slave;
+
+--connection master
+update t1 set b="Primary second change 4" where a=4;
+
+select * from test.t1 order by a;
+
+--connection slave
+begin;
+update t1 set b="Secondary second change 4" where a=4;
+update t1 set b="Secondary second change 5" where a=5;
+commit;
+
+select * from test.t1 order by a;
+
+--sync_slave_with_master master
+
+--connection master
+
+--echo Primary should have rejected secondary changes on both rows
+select * from test.t1 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+start slave;
+
+--connection master
+--sync_slave_with_master slave
+
+--connection slave
+
+--echo Secondary should have been realigned to Primary
+select * from test.t1 order by a;
+
+--echo Show rollback of dependent transaction as well
+--echo ----------------------------------------------
+
+--connection slave
+stop slave;
+
+--connection master
+update t1 set b="Primary third change 1" where a=1;
+
+select * from test.t1 order by a;
+
+--connection slave
+
+begin;
+update t1 set b="Secondary third change 3" where a=3;
+update t1 set b="Secondary third change 1" where a=1; # Conflict here
+commit;
+begin;
+update t1 set b="Secondary fourth change 3" where a=3; # Dependency on conflict here
+insert into t1 values (11,"Secondary fourth change 11");
+commit;
+
+select * from test.t1 order by a;
+
+--sync_slave_with_master master
+--connection master
+
+--echo Primary should have rejected all secondary changes
+select * from test.t1 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+start slave;
+
+--connection master
+--sync_slave_with_master slave
+--connection slave
+
+--echo Secondary should have been realigned to Primary
+
+select * from test.t1 order by a;
+
+
+--echo Show rollback of dependent transaction across different tables
+--echo --------------------------------------------------------------
+
+--connection slave
+stop slave;
+
+--connection master
+
+update t1 set b="Primary fifth change 6" where a=6;
+
+select * from test.t1 order by a;
+
+--connection slave
+
+begin;
+update t1 set b="Secondary fifth change 6" where a=6; # Conflict row
+insert into t2 values (1, "Secondary fifth change 1");
+insert into t2 values (2, "Secondary fifth change 2");
+commit;
+begin;
+update t2 set b="Secondary sixth change 1" where a=2; # Dependent row
+insert into t2 values (3, "Secondary sixth change 2");
+commit;
+
+select * from test.t1 order by a;
+select * from test.t2 order by a;
+
+--sync_slave_with_master master
+--connection master
+
+--echo Primary should have rejected all secondary changes
+select * from test.t1 order by a;
+select * from test.t2 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+start slave;
+--connection master
+
+--sync_slave_with_master slave
+
+--echo Secondary should have been realigned to primary
+select * from test.t1 order by a;
+select * from test.t2 order by a;
+
+--echo Show that whole epoch is not rolled back
+--echo ----------------------------------------
+# Whole epoch is rolled back when --ndb-serverid-transid-bits is 0!
+
+--connection slave
+stop slave;
+
+--connection master
+update t1 set b="Primary is king" where a=10;
+
+--connection slave
+begin;
+update t1 set b="Secondary is emperor" where a=10;
+insert into t1 values (11, "Secondary is pleni-potentiary");
+commit;
+
+begin;
+insert into t1 values (12, "Secondary ruled once");
+insert into t1 values (13, "This history will not be lost");
+insert into t1 values (14, "Look on my works ye mighty, and despair");
+commit;
+
+--sync_slave_with_master master
+--connection master
+
+--echo Primary should have rejected conflicting trans (emperor, pleni-potentiary)
+--echo but accepted unrelated trans (history)
+
+select * from t1 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+start slave;
+
+--connection master
+--sync_slave_with_master slave
+--connection slave
+
+--echo Secondary should be aligned with Primary
+
+select * from t1 order by a;
+
+
+--echo Show that non-conflicting ancestors are not implicated
+--echo ------------------------------------------------------
+
+--connection slave
+stop slave;
+
+--connection master
+update t1 set b="7 : Primary is king" where a=7;
+
+--echo Primary state
+select * from test.t1 order by a;
+
+--connection slave
+
+# 'Innocent' secondary transaction
+begin;
+update t1 set b="8 : Secondary innocent" where a=8;
+update t1 set b="9 : Secondary innocent" where a=9;
+commit;
+
+--echo Secondary with innocent
+select * from test.t1 order by a;
+
+# 'Guilty' secondary transaction, affecting one of the same rows as innocent
+begin;
+update t1 set b="9 : Secondary guilty" where a=9; # Dependency on innocent
+update t1 set b="7 : Secondary guilty" where a=7; # Conflict row
+commit;
+
+--echo Secondary with guilty overlaid
+select * from test.t1 order by a;
+
+--sync_slave_with_master master
+
+--connection master
+
+--echo Primary cluster should have rejected 'guilty' secondary transaction, but
+--echo accepted 'innocent' secondary transaction.
+
+select * from test.t1 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+start slave;
+
+--connection master
+--sync_slave_with_master slave
+--connection slave
+
+--echo Secondary cluster should be realigned with Primary
+
+select * from test.t1 order by a;
+
+
+--connection master
+
+--echo Classic banking example
+--echo -----------------------
+
+eval replace into mysql.ndb_replication values
+  ("test", "balances", $SLAVE_SERVER_ID, 7, NULL),
+  ("test", "balances", $MASTER_SERVER_ID, 7, "NDB\$EPOCH_TRANS()");
+
+# Transactions table may not need conflict-detection?
+eval replace into mysql.ndb_replication values
+  ("test", "transactions", $SLAVE_SERVER_ID, 7, NULL),
+  ("test", "transactions", $MASTER_SERVER_ID, 7, "NDB\$EPOCH_TRANS()");
+
+create table test.balances
+(name     varchar(100) primary key,
+ balance  int) engine=ndb;
+
+create table test.transactions$EX
+ (server_id             int unsigned,
+  master_server_id      int unsigned,
+  master_epoch          bigint unsigned,
+  count                 int unsigned,
+  auto_key              int not null,
+  from_name             varchar(100) not null,
+  to_name               varchar(100) not null,
+  detail                varchar(100) not null,
+  primary key(server_id, master_server_id, master_epoch, count))
+engine=ndb;
+
+
+create table test.transactions
+(auto_key      int auto_increment,
+ from_name     varchar(100),
+ to_name       varchar(100),
+ detail        varchar(100),
+ amount        int,
+ primary key(auto_key, from_name, to_name, detail)) engine=ndb;
+
+--echo Initialise balances across both bank sites
+insert into test.balances values
+ ("Larry", 100),
+ ("Employee-1", 0),
+ ("Employee-2", 0),
+ ("Yacht dealer", 0),
+ ("Newsagent", 0);
+
+--sync_slave_with_master slave
+--connection slave
+# Sync back to master, to ensure that what follows on slave,
+# is in a separate epoch transaction.
+# This is needed to get stable counts, not for correctness
+#
+FLUSH LOGS; # To give a position to sync
+--sync_slave_with_master master
+
+
+--echo Bank sites are disconnected
+--connection slave
+stop slave;
+--connection master
+
+--echo Larry buys a yacht using Primary bank site
+
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+  values ("Larry", "Yacht dealer", "Yacht purchase", 50);
+update test.balances set balance = balance - 50 where name = "Larry";
+update test.balances set balance = balance + 50 where name = "Yacht dealer";
+commit;
+
+--echo Show yacht transaction records
+
+select * from test.transactions order by auto_key;
+select * from test.balances order by name;
+
+--connection slave
+--echo Larry pays employees using Secondary bank site
+
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+  values ("Larry", "Employee-1", "Payment to Employee-1", 1);
+update test.balances set balance = balance - 1 where name = "Larry";
+update test.balances set balance = balance + 1 where name = "Employee-1";
+commit;
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+  values ("Larry", "Employee-2", "Payment to Employee-2", 1);
+update test.balances set balance = balance - 1 where name = "Larry";
+update test.balances set balance = balance + 1 where name = "Employee-2";
+commit;
+
+--echo Employee-2 buys yacht magazine using Secondary bank site
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+  values ("Employee-2", "Newsagent", "Buy yacht magazine", 1);
+update test.balances set balance = balance - 1 where name = "Employee-2";
+update test.balances set balance = balance + 1 where name = "Newsagent";
+commit;
+
+--echo Show employee transactions
+
+select * from test.transactions order by auto_key;
+select * from test.balances order by name;
+
+--sync_slave_with_master master
+
+--echo Bank sites re-connected
+--connection slave
+start slave;
+
+--connection master
+--sync_slave_with_master slave
+
+--connection master
+
+--echo Records at Primary bank site
+
+select * from test.transactions order by auto_key;
+select * from test.balances order by name;
+
+--echo Exceptions at Primary bank site
+
+select server_id, master_server_id, count, auto_key, from_name, to_name, detail
+  from test.transactions$EX order by count;
+
+--echo Conflict handling activity at Primary bank site
+--echo Expect :
+--echo   1 conflict from slave T1 on Larry's balance
+--echo   1 conflict from slave T2 on Larry's balance
+--echo  =2 row conflicts
+--echo
+--echo 3 (user) transactions rejected
+--echo 9 rows rejected (3 per transaction)
+--echo Variability : # epoch transactions, # row conflicts detected
+--echo               1-3                   2-3
+--echo               # detect_iter_count
+--echo               1-3
+--echo We only check stable values
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+
+--echo Records at Secondary bank site
+
+select * from test.transactions order by auto_key;
+select * from test.balances order by name;
+
+--sync_slave_with_master master
+
+--connection master
+drop table test.balances;
+drop table test.transactions;
+drop table test.transactions$EX;
+
+--echo Test mixing transactional and non-transactional
+--echo -----------------------------------------------
+--echo Remove old data from t1
+--connection master
+delete from test.t1;
+--sync_slave_with_master slave
+--connection master
+
+--echo Define table with row-based epoch detection
+eval replace into mysql.ndb_replication values
+           ("test", "t3", $SLAVE_SERVER_ID, 7, NULL),
+           ("test", "t3", $MASTER_SERVER_ID, 7, 'NDB\$EPOCH()');
+
+create table t3 (a int primary key, b int) engine=ndb;
+create table t4 (a int primary key, b int) engine=ndb;
+create table t5 (a int primary key, b longtext) engine=ndb;
+
+--echo Insert some data
+
+insert into test.t1 values
+  (1,1),
+  (2,2),
+  (3,3),
+  (4,4),
+  (5,5),
+  (6,6);
+
+insert into test.t3 values
+  (11,11),
+  (12,12),
+  (13,13),
+  (14,14),
+  (15,15),
+  (16,16);
+
+insert into test.t4 values
+  (21,21),
+  (22,22),
+  (23,23),
+  (24,24),
+  (25,25),
+  (26,26);
+
+insert into test.t5 values
+  (1, REPEAT("B", 10000)),
+  (2, REPEAT("E", 10000)),
+  (3, REPEAT("A", 10000));
+
+--echo Allow inserts to propagate
+--sync_slave_with_master slave
+
+--connection slave
+FLUSH LOGS;  # Ensure Inserts are in previous epoch trans to what follows
+--sync_slave_with_master master
+
+--echo Case 1 : Transactional detection affects row-based entries in same trans
+--connection slave
+stop slave;
+--connection master
+update test.t1 set b=100 where a=1;
+
+--connection slave
+# t3 is a table without trans conflict detection (but with row-based)
+# t4 is a table without any conflict detection
+# t1 is a table with trans conflict detection
+begin;
+update test.t3 set b=1100 where a=11;
+update test.t4 set b=2100 where a=21;
+update test.t1 set b=1000 where a=1;
+commit;
+
+--echo Show slave transaction effect
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+
+--sync_slave_with_master master
+
+--connection master
+--echo Expect Primary to have rejected whole trans across 3 tables
+
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+--echo Expect 1 transaction rejected, 3 rows rejected
+--echo        1 conflict row, 1 epoch, 1 iteration
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--echo Now restart rep to Secondary, and check realignment
+--connection slave
+start slave;
+--connection master
+--sync_slave_with_master slave
+--connection slave
+
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+--echo Case 2 : Row-based detection does not affect other transaction entries
+--connection slave
+stop slave;
+--connection master
+update test.t3 set b=1200 where a=12;
+
+--connection slave
+# Transaction conflicts with master, on table without transactional
+# conflict detection
+# Conflict will be detected on row, but no other transaction state
+# will be reverted
+#
+begin;
+update test.t3 set b=1201 where a=12;
+update test.t4 set b=2200 where a=22;
+update test.t1 set b=2000 where a=2;
+commit;
+
+--echo Show effect of transaction on Secondary
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+--sync_slave_with_master master
+
+--echo Show effect of transaction on Primary
+--echo Only t3 should have been reverted
+
+--connection master
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+--echo Expect all counters to be zero
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--echo Show effect of transaction on Secondary
+--connection slave
+start slave;
+--connection master
+--sync_slave_with_master slave
+--connection slave
+
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+flush logs;
+--sync_slave_with_master master
+
+--echo Case 3 : Check behaviour where a table with a Blob is implicated
+--echo          in transactional conflict.  Should result in Slave
+--echo          stopping with an error.
+
+--connection slave
+STOP SLAVE;
+
+--connection master
+
+--echo Setup warning suppression
+--disable_query_log
+call mtr.add_suppression("Transaction conflict handling on table t5 failed as table has Blobs which cannot be refreshed");
+call mtr.add_suppression("NDBCLUSTER Error_code: 1296");
+--enable_query_log
+
+
+begin;
+update t1 set b= 11 where a=1;
+commit;
+
+--connection slave
+begin;
+update t1 set b= 111 where a=1;                 # Conflict
+update t1 set b= 222 where a=2;                 # Implicated row
+update t5 set b= REPEAT("T", 10000) where a=3;  # Implicated Blob update
+commit;
+
+--echo Show effect of transaction on Secondary
+select * from test.t1 order by a;
+select left(b,1), length(b) from test.t5 order by a;
+
+--echo Check that Primary Slave has stopped
+--connection master
+
+--let $slave_sql_errno=1296
+--source include/wait_for_slave_sql_error.inc
+#SHOW SLAVE STATUS;
+
+--echo Restart Primary Slave
+set global sql_slave_skip_counter=1;
+
+START SLAVE;
+
+--connection slave
+
+--echo Restart Secondary Slave
+START SLAVE;
+
+flush logs;
+--sync_slave_with_master master
+
+--connection master
+drop table test.t3;
+drop table test.t4;
+drop table test.t5;
+
+# Cleanup
+--connection master
+drop table mysql.ndb_replication;
+drop table test.t1;
+drop table test.t2;
+
+--sync_slave_with_master slave
+
+--connection slave
+flush logs;
+--sync_slave_with_master master
+stop slave;
+reset slave;
+change master to master_host='';
+--source include/rpl_end.inc
+
+# TODO
+# More complex dependencies
\ No newline at end of file

=== added file 'mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info.inc'
--- a/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info.inc	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info.inc	2011-09-07 22:50:01 +0000
@@ -0,0 +1,8 @@
+--disable_query_log
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_epoch_trans AS ndb_conflict_fn_epoch_trans FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_EPOCH_TRANS";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_row_conflict_count AS ndb_conflict_trans_row_conflict_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_CONFLICT_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_row_reject_count AS ndb_conflict_trans_row_reject_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_REJECT_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_reject_count AS ndb_conflict_trans_reject_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_REJECT_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_detect_iter_count AS ndb_conflict_trans_detect_iter_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_DETECT_ITER_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_conflict_commit_count AS ndb_conflict_trans_conflict_commit_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_CONFLICT_COMMIT_COUNT";
+--enable_query_log
\ No newline at end of file

=== added file 'mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc'
--- a/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc	2011-09-07 22:50:01 +0000
@@ -0,0 +1,10 @@
+--disable_query_log
+--disable_result_log
+SELECT @init_ndb_conflict_fn_epoch_trans:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_EPOCH_TRANS";
+SELECT @init_ndb_conflict_trans_row_conflict_count:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_CONFLICT_COUNT";
+SELECT @init_ndb_conflict_trans_row_reject_count:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_REJECT_COUNT";
+SELECT @init_ndb_conflict_trans_reject_count:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_REJECT_COUNT";
+SELECT @init_ndb_conflict_trans_detect_iter_count:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_DETECT_ITER_COUNT";
+SELECT @init_ndb_conflict_trans_conflict_commit_count:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_CONFLICT_COMMIT_COUNT";
+--enable_query_log
+--enable_result_log

=== added file 'mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc'
--- a/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc	2011-09-07 22:50:01 +0000
@@ -0,0 +1,22 @@
+--disable_query_log
+# Where multiple user transactions from the Secondary site are
+# involved, the actual number of rows in-conflict, the number
+# of epoch transactions with conflicts, and the number of
+# iterations, vary depending on the distribution of the user
+# transactions across epochs.  e.g. if 3 transactions are in
+# one epoch (and executed in one batch), then only actual
+# conflicts will be recorded, but if they are separated, then
+# implied rows may conflict as well.  This can make these
+# counter values non-deterministic, so this .inc file is used
+# to check that the stable counters are correct.  The stable
+# counters are the number of rejected rows, and the number
+# of rejected transactions, which must be the same, regardless
+# of how the epoch boundaries lie.
+#
+#SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_epoch_trans AS ndb_conflict_fn_epoch_trans FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_EPOCH_TRANS";
+#SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_row_conflict_count AS ndb_conflict_trans_row_conflict_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_CONFLICT_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_row_reject_count AS ndb_conflict_trans_row_reject_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_REJECT_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_reject_count AS ndb_conflict_trans_reject_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_REJECT_COUNT";
+#SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_detect_iter_count AS ndb_conflict_trans_detect_iter_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_DETECT_ITER_COUNT";
+#SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_conflict_commit_count AS ndb_conflict_trans_conflict_commit_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_CONFLICT_COMMIT_COUNT";
+--enable_query_log
\ No newline at end of file

=== added file 'mysql-test/suite/rpl/r/rpl_extra_row_data.result'
--- a/mysql-test/suite/rpl/r/rpl_extra_row_data.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_extra_row_data.result	2011-09-07 22:50:01 +0000
@@ -0,0 +1,46 @@
+include/master-slave.inc
+[connection master]
+Basic insert, update, delete from Master->Slave
+DBUG code will set + check transfer of extra
+row data in RBR
+**** On Master ****
+CREATE TABLE t1 (a INT);
+Ten inserts in one transaction -> 1 epoch transaction
+BEGIN;
+INSERT INTO t1 VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
+COMMIT;
+Wait for Binlog-on-disk
+flush logs;
+Check that we have the expected extra row data in the Binlog
+create table raw_data(txt varchar(1000));
+select replace(txt, '\r', '') from raw_data where txt like '%### Extra row data format%' order by txt;
+replace(txt, '\r', '')
+### Extra row data format: 0, len: 0 :
+### Extra row data format: 1, len: 1 :0x01
+### Extra row data format: 2, len: 2 :0x0202
+### Extra row data format: 3, len: 3 :0x030303
+### Extra row data format: 4, len: 4 :0x04040404
+### Extra row data format: 5, len: 5 :0x0505050505
+### Extra row data format: 6, len: 6 :0x060606060606
+### Extra row data format: 7, len: 7 :0x07070707070707
+### Extra row data format: 8, len: 8 :0x0808080808080808
+### Extra row data format: 9, len: 9 :0x090909090909090909
+drop table raw_data;
+Generate some more insert, update, delete traffic
+INSERT INTO t1 SELECT a+10 FROM t1;
+INSERT INTO t1 SELECT a+20 FROM t1;
+INSERT INTO t1 SELECT a+40 FROM t1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+DELETE FROM t1 WHERE a > 390;
+**** On Slave ****
+Check row count and that slave is running ok
+SELECT count(*) from t1;
+count(*)
+80
+include/check_slave_is_running.inc
+DROP TABLE t1;
+include/rpl_end.inc

=== added file 'mysql-test/suite/rpl/t/rpl_extra_row_data-master.opt'
--- a/mysql-test/suite/rpl/t/rpl_extra_row_data-master.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_extra_row_data-master.opt	2011-09-07 22:50:01 +0000
@@ -0,0 +1 @@
+--loose-debug=+d,extra_row_data_set

=== added file 'mysql-test/suite/rpl/t/rpl_extra_row_data-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_extra_row_data-slave.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_extra_row_data-slave.opt	2011-09-07 22:50:01 +0000
@@ -0,0 +1 @@
+--loose-debug=+d,extra_row_data_check

=== added file 'mysql-test/suite/rpl/t/rpl_extra_row_data.test'
--- a/mysql-test/suite/rpl/t/rpl_extra_row_data.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_extra_row_data.test	2011-09-07 22:50:01 +0000
@@ -0,0 +1,68 @@
+--source include/master-slave.inc
+--source include/have_debug.inc
+--source include/have_binlog_format_row.inc
+
+#
+# Test Binlog row extra data added as part of WL5353
+# Test depends on functionality added to server code within
+# #ifndef MCP_WL5353
+#
+#
+--echo Basic insert, update, delete from Master->Slave
+--echo DBUG code will set + check transfer of extra
+--echo row data in RBR
+--echo **** On Master ****
+CREATE TABLE t1 (a INT);
+
+--echo Ten inserts in one transaction -> 1 epoch transaction
+BEGIN;
+INSERT INTO t1 VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
+COMMIT;
+
+--echo Wait for Binlog-on-disk
+flush logs;
+
+--echo Check that we have the expected extra row data in the Binlog
+create table raw_data(txt varchar(1000));
+--disable_query_log
+let $MYSQLD_DATADIR= `select @@datadir;`;
+--exec $MYSQL_BINLOG --verbose $MYSQLD_DATADIR/master-bin.000001 > $MYSQLTEST_VARDIR/tmp/rpl_extra_row_data.out
+
+--eval load data local infile '$MYSQLTEST_VARDIR/tmp/rpl_extra_row_data.out' into table raw_data columns terminated by '\n';
+--enable_query_log
+
+select replace(txt, '\r', '') from raw_data where txt like '%### Extra row data format%' order by txt;
+drop table raw_data;
+
+--echo Generate some more insert, update, delete traffic
+INSERT INTO t1 SELECT a+10 FROM t1;
+INSERT INTO t1 SELECT a+20 FROM t1;
+INSERT INTO t1 SELECT a+40 FROM t1;
+# 80 rows, 80 inserts
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+# 5 Updates of 80 rows = 400 updates, enough to show all potential lengths
+# of Binlog extra row data including 0 + 255.
+
+# 10 deletes
+DELETE FROM t1 WHERE a > 390;
+
+#show binlog events;
+#let $MYSQLD_DATADIR= `select @@datadir;`;
+#--exec $MYSQL_BINLOG --verbose $MYSQLD_DATADIR/master-bin.000001
+
+--echo **** On Slave ****
+--sync_slave_with_master
+connection slave;
+
+--echo Check row count and that slave is running ok
+SELECT count(*) from t1;
+source include/check_slave_is_running.inc;
+
+connection master;
+DROP TABLE t1;
+--sync_slave_with_master
+--source include/rpl_end.inc
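
Illustration, not part of the patch : the expected result above shows the
pattern injected by the 'extra_row_data_set' DBUG hook - format byte i
carries i payload bytes, each of value i.  A minimal standalone C++ sketch,
assuming only that pattern, which regenerates the mysqlbinlog lines checked
by this test (the real binlog encoding is implemented in sql/log_event.cc) :

  #include <cstdio>
  #include <vector>

  int main()
  {
    for (unsigned format = 0; format < 10; format++)
    {
      // format byte i carries i payload bytes, each of value i
      std::vector<unsigned char> payload(format, (unsigned char)format);
      printf("### Extra row data format: %u, len: %zu :",
             format, payload.size());
      if (!payload.empty())
      {
        printf("0x");
        for (unsigned char b : payload)
          printf("%02x", b);
      }
      printf("\n");
    }
    return 0;
  }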

=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am	2011-07-04 13:37:56 +0000
+++ b/sql/Makefile.am	2011-09-07 22:50:01 +0000
@@ -63,6 +63,7 @@ noinst_HEADERS =	item.h item_func.h item
 			ha_ndbcluster_glue.h \
 			ha_ndb_index_stat.h \
                         ndb_mi.h \
+			ndb_conflict_trans.h \
 			ha_partition.h rpl_constants.h \
 			debug_sync.h \
 			opt_range.h protocol.h rpl_tblmap.h rpl_utility.h \
@@ -139,7 +140,8 @@ libndb_la_SOURCES=	ha_ndbcluster.cc \
 			ha_ndbcluster_cond.cc \
 			ha_ndb_index_stat.cc \
 			ha_ndbinfo.cc \
-			ndb_mi.cc
+			ndb_mi.cc \
+			ndb_conflict_trans.cc
 
 gen_lex_hash_SOURCES =	gen_lex_hash.cc
 gen_lex_hash_LDFLAGS =  @NOINST_LDFLAGS@

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2011-09-07 17:12:12 +0000
+++ b/sql/ha_ndbcluster.cc	2011-09-07 22:50:01 +0000
@@ -45,6 +45,7 @@
 #include <mysql/plugin.h>
 #include <ndb_version.h>
 #include "ndb_mi.h"
+#include "ndb_conflict_trans.h"
 
 #ifdef ndb_dynamite
 #undef assert
@@ -465,96 +466,6 @@ update_slave_api_stats(Ndb* ndb)
 
 st_ndb_slave_state g_ndb_slave_state;
 
-st_ndb_slave_state::st_ndb_slave_state()
-  : current_conflict_defined_op_count(0),
-    current_master_server_epoch(0),
-    current_max_rep_epoch(0),
-    max_rep_epoch(0),
-    sql_run_id(~Uint32(0))
-{
-  memset(current_violation_count, 0, sizeof(current_violation_count));
-  memset(total_violation_count, 0, sizeof(total_violation_count));
-};
-
-void
-st_ndb_slave_state::atTransactionAbort()
-{
-  /* Reset current-transaction counters + state */
-  memset(current_violation_count, 0, sizeof(current_violation_count));
-  current_conflict_defined_op_count = 0;
-  current_max_rep_epoch = 0;
-}
-
-void
-st_ndb_slave_state::atTransactionCommit()
-{
-  /* Merge committed transaction counters into total state
-   * Then reset current transaction counters
-   */
-  for (int i=0; i < CFT_NUMBER_OF_CFTS; i++)
-  {
-    total_violation_count[i]+= current_violation_count[i];
-    current_violation_count[i] = 0;
-  }
-  current_conflict_defined_op_count = 0;
-  if (current_max_rep_epoch > max_rep_epoch)
-  {
-    DBUG_PRINT("info", ("Max replicated epoch increases from %llu to %llu",
-                        max_rep_epoch,
-                        current_max_rep_epoch));
-
-    max_rep_epoch = current_max_rep_epoch;
-  }
-  current_max_rep_epoch = 0;
-}
-
-void
-st_ndb_slave_state::atApplyStatusWrite(Uint32 master_server_id,
-                                       Uint32 row_server_id,
-                                       Uint64 row_epoch,
-                                       bool is_row_server_id_local)
-{
-  if (row_server_id == master_server_id)
-  {
-    /*
-       WRITE_ROW to ndb_apply_status injected by MySQLD
-       immediately upstream of us.
-       Record epoch
-    */
-    current_master_server_epoch = row_epoch;
-    assert(! is_row_server_id_local);
-  }
-  else if (is_row_server_id_local)
-  {
-    DBUG_PRINT("info", ("Recording application of local server %u epoch %llu "
-                        " which is %s.",
-                        row_server_id, row_epoch,
-                        (row_epoch > g_ndb_slave_state.current_max_rep_epoch)?
-                        " new highest." : " older than previously applied"));
-    if (row_epoch > current_max_rep_epoch)
-    {
-      /*
-        Store new highest epoch in thdvar.  If we commit successfully
-        then this can become the new global max
-      */
-      current_max_rep_epoch = row_epoch;
-    }
-  }
-}
-
-void
-st_ndb_slave_state::atResetSlave()
-{
-  /* Reset the Maximum replicated epoch vars
-   * on slave reset
-   * No need to touch the sql_run_id as that
-   * will increment if the slave is started
-   * again.
-   */
-  current_max_rep_epoch = 0;
-  max_rep_epoch = 0;
-}
-
 static int check_slave_state(THD* thd)
 {
   DBUG_ENTER("check_slave_state");
@@ -573,6 +484,8 @@ static int check_slave_state(THD* thd)
                         g_ndb_slave_state.sql_run_id));
     g_ndb_slave_state.sql_run_id = runId;
 
+    g_ndb_slave_state.atStartSlave();
+
     /* Always try to load the Max Replicated Epoch info
      * first.
      * Could be made optional if it's a problem
@@ -813,6 +726,13 @@ SHOW_VAR ndb_status_conflict_variables[]
   {"fn_old",       (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_OLD], SHOW_LONGLONG},
   {"fn_max_del_win", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_MAX_DEL_WIN], SHOW_LONGLONG},
   {"fn_epoch",     (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_EPOCH], SHOW_LONGLONG},
+  {"fn_epoch_trans", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_EPOCH_TRANS], SHOW_LONGLONG},
+  {"trans_row_conflict_count", (char*) &g_ndb_slave_state.trans_row_conflict_count, SHOW_LONGLONG},
+  {"trans_row_reject_count",   (char*) &g_ndb_slave_state.trans_row_reject_count, SHOW_LONGLONG},
+  {"trans_reject_count",       (char*) &g_ndb_slave_state.trans_in_conflict_count, SHOW_LONGLONG},
+  {"trans_detect_iter_count",  (char*) &g_ndb_slave_state.trans_detect_iter_count, SHOW_LONGLONG},
+  {"trans_conflict_commit_count",
+                               (char*) &g_ndb_slave_state.trans_conflict_commit_count, SHOW_LONGLONG},
   {NullS, NullS, SHOW_LONG}
 };
 
@@ -919,66 +839,6 @@ static int ndb_to_mysql_error(const NdbE
 
 #ifdef HAVE_NDB_BINLOG
 
-/* Write conflicting row to exceptions table. */
-static int write_conflict_row(NDB_SHARE *share,
-                              NdbTransaction *trans,
-                              const uchar *row,
-                              NdbError& err)
-{
-  DBUG_ENTER("write_conflict_row");
-
-  /* get exceptions table */
-  NDB_CONFLICT_FN_SHARE *cfn_share= share->m_cfn_share;
-  const NDBTAB *ex_tab= cfn_share->m_ex_tab;
-  DBUG_ASSERT(ex_tab != NULL);
-
-  /* get insert op */
-  NdbOperation *ex_op= trans->getNdbOperation(ex_tab);
-  if (ex_op == NULL)
-  {
-    err= trans->getNdbError();
-    DBUG_RETURN(-1);
-  }
-  if (ex_op->insertTuple() == -1)
-  {
-    err= ex_op->getNdbError();
-    DBUG_RETURN(-1);
-  }
-  {
-    uint32 server_id= (uint32)::server_id;
-    uint32 master_server_id= (uint32) ndb_mi_get_master_server_id();
-    uint64 master_epoch= (uint64) g_ndb_slave_state.current_master_server_epoch;
-    uint32 count= (uint32)++(cfn_share->m_count);
-    if (ex_op->setValue((Uint32)0, (const char *)&(server_id)) ||
-        ex_op->setValue((Uint32)1, (const char *)&(master_server_id)) ||
-        ex_op->setValue((Uint32)2, (const char *)&(master_epoch)) ||
-        ex_op->setValue((Uint32)3, (const char *)&(count)))
-    {
-      err= ex_op->getNdbError();
-      DBUG_RETURN(-1);
-    }
-  }
-  /* copy primary keys */
-  {
-    const int fixed_cols= 4;
-    int nkey= cfn_share->m_pk_cols;
-    int k;
-    for (k= 0; k < nkey; k++)
-    {
-      DBUG_ASSERT(row != NULL);
-      const uchar* data= row + cfn_share->m_offset[k];
-      if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1)
-      {
-        err= ex_op->getNdbError();
-        DBUG_RETURN(-1);
-      }
-    }
-  }
-  DBUG_RETURN(0);
-}
-#endif
-
-#ifdef HAVE_NDB_BINLOG
 int
 handle_conflict_op_error(Thd_ndb* thd_ndb,
                          NdbTransaction* trans,
@@ -988,13 +848,14 @@ handle_conflict_op_error(Thd_ndb* thd_nd
 int
 handle_row_conflict(NDB_CONFLICT_FN_SHARE* cfn_share,
                     const char* tab_name,
+                    bool table_has_blobs,
+                    const char* handling_type,
                     const NdbRecord* key_rec,
                     const uchar* pk_row,
                     enum_conflicting_op_type op_type,
                     enum_conflict_cause conflict_cause,
                     const NdbError& conflict_error,
-                    NdbTransaction* conflict_trans,
-                    NdbError& err);
+                    NdbTransaction* conflict_trans);
 #endif
 
 static const Uint32 error_op_after_refresh_op = 920;
@@ -4223,17 +4084,619 @@ bool ha_ndbcluster::isManualBinlogExec(T
 
 }
 
-static inline bool
-thd_allow_batch(const THD* thd)
+static inline bool
+thd_allow_batch(const THD* thd)
+{
+#ifndef OPTION_ALLOW_BATCH
+  return false;
+#else
+  return (thd_options(thd) & OPTION_ALLOW_BATCH);
+#endif
+}
+
+/**
+   st_ndb_slave_state constructor
+
+   Initialise Ndb Slave state object
+*/
+st_ndb_slave_state::st_ndb_slave_state()
+  : current_master_server_epoch(0),
+    current_max_rep_epoch(0),
+    conflict_flags(0),
+    retry_trans_count(0),
+    current_trans_row_conflict_count(0),
+    current_trans_row_reject_count(0),
+    current_trans_in_conflict_count(0),
+    max_rep_epoch(0),
+    sql_run_id(~Uint32(0)),
+    trans_row_conflict_count(0),
+    trans_row_reject_count(0),
+    trans_detect_iter_count(0),
+    trans_in_conflict_count(0),
+    trans_conflict_commit_count(0),
+    trans_conflict_apply_state(SAS_NORMAL),
+    trans_dependency_tracker(NULL)
+{
+  memset(current_violation_count, 0, sizeof(current_violation_count));
+  memset(total_violation_count, 0, sizeof(total_violation_count));
+
+  /* Init conflict handling state mem_root */
+  const size_t CONFLICT_MEMROOT_BLOCK_SIZE = 32768;
+  init_alloc_root(&conflict_mem_root, CONFLICT_MEMROOT_BLOCK_SIZE, 0);
+};
+
+/**
+   resetPerAttemptCounters
+
+   Reset the per-epoch-transaction-application-attempt counters
+*/
+void
+st_ndb_slave_state::resetPerAttemptCounters()
+{
+  memset(current_violation_count, 0, sizeof(current_violation_count));
+  current_trans_row_conflict_count = 0;
+  current_trans_row_reject_count = 0;
+  current_trans_in_conflict_count = 0;
+
+  conflict_flags = 0;
+  current_max_rep_epoch = 0;
+}
+
+/**
+   atTransactionAbort()
+
+   Called by Slave SQL thread during transaction abort.
+*/
+void
+st_ndb_slave_state::atTransactionAbort()
+{
+  /* Reset current-transaction counters + state */
+  resetPerAttemptCounters();
+}
+
+/**
+   atTransactionCommit()
+
+   Called by Slave SQL thread after transaction commit
+*/
+void
+st_ndb_slave_state::atTransactionCommit()
+{
+  assert( ((trans_dependency_tracker == NULL) &&
+           (trans_conflict_apply_state == SAS_NORMAL)) ||
+          ((trans_dependency_tracker != NULL) &&
+           (trans_conflict_apply_state == SAS_TRACK_TRANS_DEPENDENCIES)) );
+  assert( trans_conflict_apply_state != SAS_APPLY_TRANS_DEPENDENCIES );
+
+  /* Merge committed transaction counters into total state
+   * Then reset current transaction counters
+   */
+  for (int i=0; i < CFT_NUMBER_OF_CFTS; i++)
+  {
+    total_violation_count[i]+= current_violation_count[i];
+  }
+  trans_row_conflict_count+= current_trans_row_conflict_count;
+  trans_row_reject_count+= current_trans_row_reject_count;
+  trans_in_conflict_count+= current_trans_in_conflict_count;
+
+  if (current_trans_in_conflict_count)
+    trans_conflict_commit_count++;
+
+  if (current_max_rep_epoch > max_rep_epoch)
+  {
+    DBUG_PRINT("info", ("Max replicated epoch increases from %llu to %llu",
+                        max_rep_epoch,
+                        current_max_rep_epoch));
+    max_rep_epoch = current_max_rep_epoch;
+  }
+
+  resetPerAttemptCounters();
+
+  /* Clear per-epoch-transaction retry_trans_count */
+  retry_trans_count = 0;
+}
+
+/**
+   atApplyStatusWrite
+
+   Called by Slave SQL thread when applying an event to the
+   ndb_apply_status table
+*/
+void
+st_ndb_slave_state::atApplyStatusWrite(Uint32 master_server_id,
+                                       Uint32 row_server_id,
+                                       Uint64 row_epoch,
+                                       bool is_row_server_id_local)
+{
+  if (row_server_id == master_server_id)
+  {
+    /*
+       WRITE_ROW to ndb_apply_status injected by MySQLD
+       immediately upstream of us.
+       Record epoch
+    */
+    current_master_server_epoch = row_epoch;
+    assert(! is_row_server_id_local);
+  }
+  else if (is_row_server_id_local)
+  {
+    DBUG_PRINT("info", ("Recording application of local server %u epoch %llu "
+                        " which is %s.",
+                        row_server_id, row_epoch,
+                        (row_epoch > g_ndb_slave_state.current_max_rep_epoch)?
+                        " new highest." : " older than previously applied"));
+    if (row_epoch > current_max_rep_epoch)
+    {
+      /*
+        Store new highest epoch in thdvar.  If we commit successfully
+        then this can become the new global max
+      */
+      current_max_rep_epoch = row_epoch;
+    }
+  }
+}
+
+/**
+   atResetSlave()
+
+   Called when RESET SLAVE command issued - in context of command client.
+*/
+void
+st_ndb_slave_state::atResetSlave()
+{
+  /* Reset the Maximum replicated epoch vars
+   * on slave reset
+   * No need to touch the sql_run_id as that
+   * will increment if the slave is started
+   * again.
+   */
+  resetPerAttemptCounters();
+
+  retry_trans_count = 0;
+  max_rep_epoch = 0;
+}
+
+/**
+   atStartSlave()
+
+   Called by Slave SQL thread when first applying a row to Ndb after
+   a START SLAVE command.
+*/
+void
+st_ndb_slave_state::atStartSlave()
+{
+#ifdef HAVE_NDB_BINLOG
+  if (trans_conflict_apply_state != SAS_NORMAL)
+  {
+    /*
+      Remove conflict handling state on a SQL thread
+      restart
+    */
+    atEndTransConflictHandling();
+    trans_conflict_apply_state = SAS_NORMAL;
+  }
+#endif
+};
+
+#ifdef HAVE_NDB_BINLOG
+
+/**
+   atBeginTransConflictHandling()
+
+   Called by Slave SQL thread when it determines that Transactional
+   Conflict handling is required
+*/
+void
+st_ndb_slave_state::atBeginTransConflictHandling()
+{
+  DBUG_ENTER("atBeginTransConflictHandling");
+  /*
+     Allocate and initialise Transactional Conflict
+     Resolution Handling Structures
+  */
+  assert(trans_dependency_tracker == NULL);
+  trans_dependency_tracker = DependencyTracker::newDependencyTracker(&conflict_mem_root);
+  DBUG_VOID_RETURN;
+};
+
+/**
+   atPrepareConflictDetection
+
+   Called by Slave SQL thread prior to defining an operation on
+   a table with conflict detection defined.
+*/
+int
+st_ndb_slave_state::atPrepareConflictDetection(const NdbDictionary::Table* table,
+                                               const NdbRecord* key_rec,
+                                               const uchar* row_data,
+                                               Uint64 transaction_id,
+                                               bool& handle_conflict_now)
+{
+  DBUG_ENTER("atPrepareConflictDetection");
+  /*
+    Slave is preparing to apply an operation with conflict detection.
+    If we're performing Transactional Conflict Resolution, take
+    extra steps
+  */
+  switch( trans_conflict_apply_state )
+  {
+  case SAS_NORMAL:
+    DBUG_PRINT("info", ("SAS_NORMAL : No special handling"));
+    /* No special handling */
+    break;
+  case SAS_TRACK_TRANS_DEPENDENCIES:
+  {
+    DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Tracking operation"));
+    /*
+      Track this operation and its transaction id, to determine
+      inter-transaction dependencies by {table, primary key}
+    */
+    assert( trans_dependency_tracker );
+
+    int res = trans_dependency_tracker
+      ->track_operation(table,
+                        key_rec,
+                        row_data,
+                        transaction_id);
+    if (res != 0)
+    {
+      sql_print_error("%s", trans_dependency_tracker->get_error_text());
+      DBUG_RETURN(res);
+    }
+    /* Proceed as normal */
+    break;
+  }
+  case SAS_APPLY_TRANS_DEPENDENCIES:
+  {
+    DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Deciding whether to apply"));
+    /*
+       Check if this operation's transaction id is marked in-conflict.
+       If it is, we tell the caller to perform conflict resolution now instead
+       of attempting to apply the operation.
+    */
+    assert( trans_dependency_tracker );
+
+    if (trans_dependency_tracker->in_conflict(transaction_id))
+    {
+      DBUG_PRINT("info", ("Event for transaction %llu is conflicting.  Handling.",
+                          transaction_id));
+      current_trans_row_reject_count++;
+      handle_conflict_now = true;
+      DBUG_RETURN(0);
+    }
+
+    /*
+       This transaction is not marked in-conflict, so continue with normal
+       processing.
+       Note that normal processing may subsequently detect a conflict which
+       didn't exist at the time of the previous TRACK_DEPENDENCIES pass.
+       In this case, we will rollback and repeat the TRACK_DEPENDENCIES
+       stage.
+    */
+    DBUG_PRINT("info", ("Event for transaction %llu is OK, applying",
+                        transaction_id));
+    break;
+  }
+  }
+  DBUG_RETURN(0);
+}
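
Orientation note, not part of the patch : the calls made above and in
atTransConflictDetected() / atEndTransConflictHandling() imply roughly the
following DependencyTracker interface.  The real declarations live in the
newly added sql/ndb_conflict_trans.h (not shown in this hunk); the forward
declarations below are assumptions, for illustration only.

  #include <stdint.h>

  struct MEM_ROOT;                        // server memory-root allocator
  class NdbDictionary { public: class Table; };
  struct NdbRecord;
  typedef unsigned char uchar;

  class DependencyTracker
  {
  public:
    // Factory : state is allocated from the supplied mem_root, so it can
    // be released wholesale with free_root() when conflict handling ends.
    static DependencyTracker* newDependencyTracker(MEM_ROOT* mem_root);

    // SAS_TRACK_TRANS_DEPENDENCIES : record {table, primary key} against
    // the transaction id, to derive inter-transaction dependencies.
    int track_operation(const NdbDictionary::Table* table,
                        const NdbRecord* key_rec,
                        const uchar* row_data,
                        uint64_t transaction_id);

    // Mark a transaction (and, transitively, its dependents) in-conflict.
    int mark_conflict(uint64_t transaction_id);

    // SAS_APPLY_TRANS_DEPENDENCIES : should this transaction's operations
    // be rejected and handled as conflicts rather than applied?
    bool in_conflict(uint64_t transaction_id);

    const char* get_error_text() const;
    uint32_t    get_conflict_count() const; // -> current_trans_in_conflict_count
  };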
+
+/**
+   atTransConflictDetected
+
+   Called by the Slave SQL thread when a conflict is detected on
+   an executed operation.
+*/
+int
+st_ndb_slave_state::atTransConflictDetected(Uint64 transaction_id)
+{
+  DBUG_ENTER("atTransConflictDetected");
+
+  /*
+     The Slave has detected a conflict on an operation applied
+     to a table with Transactional Conflict Resolution defined.
+     Handle according to current state.
+  */
+  conflict_flags |= SCS_TRANS_CONFLICT_DETECTED_THIS_PASS;
+  current_trans_row_conflict_count++;
+
+  switch (trans_conflict_apply_state)
+  {
+  case SAS_NORMAL:
+  {
+    DBUG_PRINT("info", ("SAS_NORMAL : Conflict on op on table with trans detection."
+                        "Requires multi-pass resolution.  Will transition to "
+                        "SAS_TRACK_TRANS_DEPENDENCIES at Commit."));
+    /*
+      Conflict on table with transactional conflict resolution
+      defined.
+      This is the trigger that we will do transactional conflict
+      resolution.
+      Record that we need to do multiple passes to correctly
+      perform resolution.
+      TODO : Early exit from applying epoch?
+    */
+    break;
+  }
+  case SAS_TRACK_TRANS_DEPENDENCIES:
+  {
+    DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Operation in transaction %llu "
+                        "had conflict",
+                        transaction_id));
+    /*
+       Conflict on table with transactional conflict resolution
+       defined.
+       We will mark the operation's transaction_id as in-conflict,
+       so that any other operations on the transaction are also
+       considered in-conflict, and any dependent transactions are also
+       considered in-conflict.
+    */
+    assert(trans_dependency_tracker != NULL);
+    int res = trans_dependency_tracker
+      ->mark_conflict(transaction_id);
+
+    if (res != 0)
+    {
+      sql_print_error("%s", trans_dependency_tracker->get_error_text());
+      DBUG_RETURN(res);
+    }
+    break;
+  }
+  case SAS_APPLY_TRANS_DEPENDENCIES:
+  {
+    /*
+       This must be a new conflict, not noticed on the previous
+       pass.
+    */
+    DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Conflict detected.  "
+                        "Must be further conflict.  Will return to "
+                        "SAS_TRACK_TRANS_DEPENDENCIES state at commit."));
+    // TODO : Early exit from applying epoch
+    break;
+  }
+  default:
+    break;
+  }
+
+  DBUG_RETURN(0);
+}
+
+/**
+   atConflictPreCommit
+
+   Called by the Slave SQL thread prior to committing a Slave transaction.
+   This method can request that the Slave transaction is retried.
+
+
+   State transitions :
+
+                       START SLAVE /
+                       RESET SLAVE /
+                        STARTUP
+                            |
+                            |
+                            v
+                    ****************
+                    *  SAS_NORMAL  *
+                    ****************
+                       ^       |
+    No transactional   |       | Conflict on transactional table
+       conflicts       |       | (Rollback)
+       (Commit)        |       |
+                       |       v
+            **********************************
+            *  SAS_TRACK_TRANS_DEPENDENCIES  *
+            **********************************
+               ^          I              ^
+     More      I          I Dependencies |
+    conflicts  I          I determined   | No new conflicts
+     found     I          I (Rollback)   | (Commit)
+    (Rollback) I          I              |
+               I          v              |
+           **********************************
+           *  SAS_APPLY_TRANS_DEPENDENCIES  *
+           **********************************
+
+
+   Operation
+     The initial state is SAS_NORMAL.
+
+     On detecting a conflict on a transactional conflict detecting table,
+     SAS_TRACK_TRANS_DEPENDENCIES is entered, and the epoch transaction is
+     rolled back and reapplied.
+
+     In SAS_TRACK_TRANS_DEPENDENCIES state, transaction dependencies and
+     conflicts are tracked as the epoch transaction is applied.
+
+     Then the Slave transitions to SAS_APPLY_TRANS_DEPENDENCIES state, and
+     the epoch transaction is rolled back and reapplied.
+
+     In the SAS_APPLY_TRANS_DEPENDENCIES state, operations for transactions
+     marked as in-conflict are not applied.
+
+     If this results in no new conflicts, the epoch transaction is committed,
+     and the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered for processing
+     the next replicated epoch transaction.
+     If it results in new conflicts, the epoch transaction is rolled back, and
+     the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered again, to determine
+     the new set of dependencies.
+
+     If no conflicts are found in the SAS_TRACK_TRANS_DEPENDENCIES state, then
+     the epoch transaction is committed, and the Slave transitions to SAS_NORMAL
+     state.
+
+
+   Properties
+     1) Normally, there is no transaction dependency tracking overhead paid by
+        the slave.
+
+     2) On first detecting a transactional conflict, the epoch transaction must be
+        applied at least three times, with two rollbacks.
+
+     3) Transactional conflicts detected in subsequent epochs require the epoch
+        transaction to be applied two times, with one rollback.
+
+     4) A loop between states SAS_TRACK_TRANS_DEPENDENCIES and SAS_APPLY_TRANS_
+        DEPENDENCIES occurs when further transactional conflicts are discovered
+        in SAS_APPLY_TRANS_DEPENDENCIES state.  This implies that the conflicts
+        discovered in the SAS_TRACK_TRANS_DEPENDENCIES state cannot have been complete,
+        so we revisit that state to get a more complete picture.
+
+     5) The number of iterations of this loop is fixed to a hard coded limit, after
+        which the Slave will stop with an error.  This should be an unlikely
+        occurrence, as it requires not just n conflicts, but at least 1 new conflict
+        appearing between the transactions in the epoch transaction and the
+        database between the two states, n times in a row.
+
+     6) Where conflicts are occasional, as expected, the post-commit transition to
+        SAS_TRACK_TRANS_DEPENDENCIES rather than SAS_NORMAL results in one epoch
+        transaction having its transaction dependencies needlessly tracked.
+
+*/
+int
+st_ndb_slave_state::atConflictPreCommit(bool& retry_slave_trans)
+{
+  DBUG_ENTER("atConflictPreCommit");
+
+  /*
+    Prior to committing a Slave transaction, we check whether
+    Transactional conflicts have been detected which require
+    us to retry the slave transaction
+  */
+  retry_slave_trans = false;
+  switch(trans_conflict_apply_state)
+  {
+  case SAS_NORMAL:
+  {
+    DBUG_PRINT("info", ("SAS_NORMAL"));
+    /*
+       Normal case.  Only if we defined conflict detection on a table
+       with transactional conflict detection, and saw conflicts (on any table)
+       do we go to another state
+     */
+    if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
+    {
+      DBUG_PRINT("info", ("Conflict(s) detected this pass, transitioning to "
+                          "SAS_TRACK_TRANS_DEPENDENCIES."));
+      assert(conflict_flags & SCS_OPS_DEFINED);
+      /* Transactional conflict resolution required, switch state */
+      atBeginTransConflictHandling();
+      resetPerAttemptCounters();
+      trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
+      retry_slave_trans = true;
+    }
+    break;
+  }
+  case SAS_TRACK_TRANS_DEPENDENCIES:
+  {
+    DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES"));
+
+    if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
+    {
+      /*
+         Conflict on table with transactional detection
+         this pass, we have collected the details and
+         dependencies, now transition to
+         SAS_APPLY_TRANS_DEPENDENCIES and
+         reapply the epoch transaction without the
+         conflicting transactions.
+      */
+      assert(conflict_flags & SCS_OPS_DEFINED);
+      DBUG_PRINT("info", ("Transactional conflicts, transitioning to "
+                          "SAS_APPLY_TRANS_DEPENDENCIES"));
+
+      trans_conflict_apply_state = SAS_APPLY_TRANS_DEPENDENCIES;
+      trans_detect_iter_count++;
+      retry_slave_trans = true;
+      break;
+    }
+    else
+    {
+      /*
+         No transactional conflicts detected this pass, lets
+         return to SAS_NORMAL state after commit for more efficient
+         application of epoch transactions
+      */
+      DBUG_PRINT("info", ("No transactional conflicts, transitioning to "
+                          "SAS_NORMAL"));
+      atEndTransConflictHandling();
+      trans_conflict_apply_state = SAS_NORMAL;
+      break;
+    }
+  }
+  case SAS_APPLY_TRANS_DEPENDENCIES:
+  {
+    DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES"));
+    assert(conflict_flags & SCS_OPS_DEFINED);
+    /*
+       We've applied the Slave epoch transaction subject to the
+       conflict detection.  If any further transactional
+       conflicts have been observed, then we must repeat the
+       process.
+    */
+    atEndTransConflictHandling();
+    atBeginTransConflictHandling();
+    trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
+
+    if (unlikely(conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS))
+    {
+      DBUG_PRINT("info", ("Further conflict(s) detected, repeating the "
+                          "TRACK_TRANS_DEPENDENCIES pass"));
+      /*
+         Further conflict observed when applying, need
+         to re-determine dependencies
+      */
+      resetPerAttemptCounters();
+      retry_slave_trans = true;
+      break;
+    }
+
+
+    DBUG_PRINT("info", ("No further conflicts detected, committing and "
+                        "returning to SAS_TRACK_TRANS_DEPENDENCIES state"));
+    /*
+       With dependencies taken into account, no further
+       conflicts detected, can now proceed to commit
+    */
+    break;
+  }
+  }
+
+  /*
+    Clear conflict flags, to ensure that we detect any new conflicts
+  */
+  conflict_flags = 0;
+
+  if (retry_slave_trans)
+  {
+    DBUG_PRINT("info", ("Requesting transaction restart"));
+    DBUG_RETURN(1);
+  }
+
+  DBUG_PRINT("info", ("Allowing commit to proceed"));
+  DBUG_RETURN(0);
+}
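
Illustration, not part of the patch : the state diagram above can be
summarised as a small compilable sketch (the enum names mirror those used
by this patch; the real transitions are implemented in atConflictPreCommit()
above) :

  #include <cstdio>

  enum SlaveApplyState {
    SAS_NORMAL,
    SAS_TRACK_TRANS_DEPENDENCIES,
    SAS_APPLY_TRANS_DEPENDENCIES
  };

  // Given the state at pre-commit time and whether this pass detected a
  // transactional conflict, return the next state and set retry when the
  // epoch transaction must be rolled back and reapplied.
  static SlaveApplyState
  next_state(SlaveApplyState s, bool conflict_this_pass, bool& retry)
  {
    retry = false;
    switch (s)
    {
    case SAS_NORMAL:
      if (conflict_this_pass)
      {
        retry = true;                         // rollback, start tracking
        return SAS_TRACK_TRANS_DEPENDENCIES;
      }
      return SAS_NORMAL;                      // commit
    case SAS_TRACK_TRANS_DEPENDENCIES:
      if (conflict_this_pass)
      {
        retry = true;                         // rollback, reapply w/o conflicts
        return SAS_APPLY_TRANS_DEPENDENCIES;
      }
      return SAS_NORMAL;                      // commit, drop tracking state
    case SAS_APPLY_TRANS_DEPENDENCIES:
      retry = conflict_this_pass;             // new conflict => re-track
      return SAS_TRACK_TRANS_DEPENDENCIES;    // commit or rollback
    }
    return s;
  }

  int main()
  {
    // First conflicting epoch transaction : three passes, two rollbacks
    // (property 2 above).
    bool retry;
    SlaveApplyState s = SAS_NORMAL;
    s = next_state(s, true,  retry);  // -> TRACK_TRANS_DEPENDENCIES, retry
    s = next_state(s, true,  retry);  // -> APPLY_TRANS_DEPENDENCIES, retry
    s = next_state(s, false, retry);  // -> TRACK_TRANS_DEPENDENCIES, commit
    printf("final state %d, retry %d\n", (int)s, (int)retry);
    return 0;
  }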
+
+/**
+   atEndTransConflictHandling
+
+   Called when transactional conflict handling has completed.
+*/
+void
+st_ndb_slave_state::atEndTransConflictHandling()
 {
-#ifndef OPTION_ALLOW_BATCH
-  return false;
-#else
-  return (thd_options(thd) & OPTION_ALLOW_BATCH);
-#endif
-}
+  DBUG_ENTER("atEndTransConflictHandling");
+  /* Release any conflict handling state */
+  if (trans_dependency_tracker)
+  {
+    current_trans_in_conflict_count =
+      trans_dependency_tracker->get_conflict_count();
+    trans_dependency_tracker = NULL;
+    free_root(&conflict_mem_root, MY_MARK_BLOCKS_FREE);
+  }
+  DBUG_VOID_RETURN;
+};
 
-#ifdef HAVE_NDB_BINLOG
 /**
    prepare_conflict_detection
 
@@ -4242,22 +4705,114 @@ thd_allow_batch(const THD* thd)
 
    It is responsible for defining and adding any operation filtering
    required, and for saving any operation definition state required
-   for post-execute analysis
+   for post-execute analysis.
+
+   For transactional detection, this method may determine that the
+   operation being defined should not be executed, and conflict
+   handling should occur immediately.  In this case, conflict_handled
+   is set to true.
 */
 int
 ha_ndbcluster::prepare_conflict_detection(enum_conflicting_op_type op_type,
                                           const NdbRecord* key_rec,
                                           const uchar* old_data,
                                           const uchar* new_data,
+                                          NdbTransaction* trans,
                                           NdbInterpretedCode* code,
-                                          NdbOperation::OperationOptions* options)
+                                          NdbOperation::OperationOptions* options,
+                                          bool& conflict_handled)
 {
   DBUG_ENTER("prepare_conflict_detection");
-
+  THD* thd = table->in_use;
   int res = 0;
+  assert(thd->slave_thread);
+
+  conflict_handled = false;
+
+  /*
+     Check transaction id first, as in transactional conflict detection,
+     the transaction id is what eventually dictates whether an operation
+     is applied or not.
+
+     Not that this applies even if the current operation's table does not
+     have a conflict function defined - if a transaction spans a 'transactional
+     conflict detection' table and a non transactional table, the non-transactional
+     table's data will also be reverted.
+  */
+  Uint64 transaction_id = Ndb_binlog_extra_row_info::InvalidTransactionId;
+  if (thd->binlog_row_event_extra_data)
+  {
+    Ndb_binlog_extra_row_info extra_row_info;
+    extra_row_info.loadFromBuffer(thd->binlog_row_event_extra_data);
+    if (extra_row_info.getFlags() &
+        Ndb_binlog_extra_row_info::NDB_ERIF_TRANSID)
+      transaction_id = extra_row_info.getTransactionId();
+  }
+
+  {
+    bool handle_conflict_now = false;
+    const uchar* row_data = (op_type == WRITE_ROW? new_data : old_data);
+    int res = g_ndb_slave_state.atPrepareConflictDetection(m_table,
+                                                           key_rec,
+                                                           row_data,
+                                                           transaction_id,
+                                                           handle_conflict_now);
+    if (res)
+      DBUG_RETURN(res);
+
+    if (handle_conflict_now)
+    {
+      DBUG_PRINT("info", ("Conflict handling for row occurring now"));
+      NdbError noRealConflictError;
+      const uchar* row_to_save = (op_type == DELETE_ROW)? old_data : new_data;
+
+      /*
+         Directly handle the conflict here - e.g refresh/ write to
+         exceptions table etc.
+      */
+      res = handle_row_conflict(m_share->m_cfn_share,
+                                m_share->table_name,
+                                m_share->flags & NSF_BLOB_FLAG,
+                                "Transaction",
+                                key_rec,
+                                row_to_save,
+                                op_type,
+                                TRANS_IN_CONFLICT,
+                                noRealConflictError,
+                                trans);
+      if (unlikely(res))
+        DBUG_RETURN(res);
+
+      g_ndb_slave_state.conflict_flags |= SCS_OPS_DEFINED;
+
+      /*
+        Indicate that there (may be) some more operations to
+        execute before committing
+      */
+      m_thd_ndb->m_unsent_bytes+= 12;
+      conflict_handled = true;
+      DBUG_RETURN(0);
+    }
+  }
+
+  if (! (m_share->m_cfn_share &&
+         m_share->m_cfn_share->m_conflict_fn))
+  {
+    /* No conflict function definition required */
+    DBUG_RETURN(0);
+  }
+
   const st_conflict_fn_def* conflict_fn = m_share->m_cfn_share->m_conflict_fn;
   assert( conflict_fn != NULL );
 
+  if (unlikely((conflict_fn->flags & CF_TRANSACTIONAL) &&
+               (transaction_id == Ndb_binlog_extra_row_info::InvalidTransactionId)))
+  {
+    sql_print_warning("NDB Slave : Transactional conflict detection defined on table %s, but "
+                      "events received without transaction ids.  Check --ndb-log-transaction-id setting "
+                      "on upstream Cluster.",
+                      m_share->key);
+  }
 
   /*
      Prepare interpreted code for operation (update + delete only) according
@@ -4280,7 +4835,7 @@ ha_ndbcluster::prepare_conflict_detectio
     }
   } // if (op_type != WRITE_ROW)
 
-  g_ndb_slave_state.current_conflict_defined_op_count++;
+  g_ndb_slave_state.conflict_flags |= SCS_OPS_DEFINED;
 
   /* Now save data for potential insert to exceptions table... */
   const uchar* row_to_save = (op_type == DELETE_ROW)? old_data : new_data;
@@ -4288,6 +4843,7 @@ ha_ndbcluster::prepare_conflict_detectio
   ex_data.share= m_share;
   ex_data.key_rec= key_rec;
   ex_data.op_type= op_type;
+  ex_data.trans_id= transaction_id;
   /*
     We need to save the row data for possible conflict resolution after
     execute().
@@ -4335,32 +4891,38 @@ handle_conflict_op_error(Thd_ndb* thd_nd
       (err.classification == NdbError::NoDataFound))
   {
     DBUG_PRINT("info",
-               ("err.code %s (int) error_conflict_fn_violation, "
-                "err.classification %s",
-                err.code == (int) error_conflict_fn_violation ? "==" : "!=",
-                err.classification
-                == NdbError::ConstraintViolation
-                ? "== NdbError::ConstraintViolation"
-                : (err.classification == NdbError::NoDataFound
-                   ? "== NdbError::NoDataFound" : "!=")));
+               ("err.code = %s, err.classification = %s",
+               ((err.code == (int) error_conflict_fn_violation)?
+                "error_conflict_fn_violation":
+                ((err.code == (int) error_op_after_refresh_op)?
+                 "error_op_after_refresh_op" : "?")),
+               ((err.classification == NdbError::ConstraintViolation)?
+                "ConstraintViolation":
+                ((err.classification == NdbError::NoDataFound)?
+                 "NoDataFound" : "?"))));
 
     enum_conflict_cause conflict_cause;
 
+    /* Map cause onto our conflict description type */
     if ((err.code == (int) error_conflict_fn_violation) ||
         (err.code == (int) error_op_after_refresh_op))
     {
+      DBUG_PRINT("info", ("ROW_IN_CONFLICT"));
       conflict_cause= ROW_IN_CONFLICT;
     }
     else if (err.classification == NdbError::ConstraintViolation)
     {
+      DBUG_PRINT("info", ("ROW_ALREADY_EXISTS"));
       conflict_cause= ROW_ALREADY_EXISTS;
     }
     else
     {
       assert(err.classification == NdbError::NoDataFound);
+      DBUG_PRINT("info", ("ROW_DOES_NOT_EXIST"));
       conflict_cause= ROW_DOES_NOT_EXIST;
     }
 
+    /* Get exceptions data from operation */
     const void* buffer=op->getCustomData();
     assert(buffer);
     Ndb_exceptions_data ex_data;
@@ -4368,88 +4930,65 @@ handle_conflict_op_error(Thd_ndb* thd_nd
     NDB_SHARE *share= ex_data.share;
     const NdbRecord* key_rec= ex_data.key_rec;
     const uchar* row= ex_data.row;
-    enum_conflicting_op_type op_type = ex_data.op_type;
-    DBUG_ASSERT(share != NULL && row != NULL);
+    enum_conflicting_op_type causing_op_type = ex_data.op_type;
 
-    NDB_CONFLICT_FN_SHARE* cfn_share= share->m_cfn_share;
-    if (cfn_share)
-    {
-      enum_conflict_fn_type cft = cfn_share->m_conflict_fn->type;
-      bool haveExTable = cfn_share->m_ex_tab != NULL;
-
-      g_ndb_slave_state.current_violation_count[cft]++;
+    DBUG_PRINT("info", ("Conflict causing op type : %u",
+                        causing_op_type));
 
+    if (causing_op_type == REFRESH_ROW)
+    {
+      /*
+         The failing op was a refresh row, we require that it
+         failed due to being a duplicate (e.g. a refresh
+         occurring on a refreshed row)
+       */
+      if (err.code == (int) error_op_after_refresh_op)
       {
-        NdbError handle_error;
-        if (handle_row_conflict(cfn_share,
-                                share->table_name,
-                                key_rec,
-                                row,
-                                op_type,
-                                conflict_cause,
-                                err,
-                                trans,
-                                handle_error))
-        {
-          /* Error with handling of row conflict */
-          char msg[FN_REFLEN];
-          my_snprintf(msg, sizeof(msg), "Row conflict handling "
-                      "on table %s hit Ndb error %d '%s'",
-                      share->table_name,
-                      handle_error.code,
-                      handle_error.message);
-
-          if (handle_error.status == NdbError::TemporaryError)
-          {
-            /* Slave will roll back and retry entire transaction. */
-            ERR_RETURN(handle_error);
-          }
-          else
-          {
-            push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                                ER_EXCEPTIONS_WRITE_ERROR,
-                                ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
-            /* Slave will stop replication. */
-            DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
-          }
-        }
+        DBUG_PRINT("info", ("Operation after refresh - ignoring"));
+        DBUG_RETURN(0);
+      }
+      else
+      {
+        DBUG_PRINT("info", ("Refresh op hit real error %u", err.code));
+        /* Unexpected error, normal handling */
+        DBUG_RETURN(err.code);
       }
+    }
 
+    DBUG_ASSERT(share != NULL && row != NULL);
+    NDB_CONFLICT_FN_SHARE* cfn_share= share->m_cfn_share;
+    bool table_has_trans_conflict_detection =
+      cfn_share &&
+      cfn_share->m_conflict_fn &&
+      (cfn_share->m_conflict_fn->flags & CF_TRANSACTIONAL);
+
+    if (table_has_trans_conflict_detection)
+    {
+      /* Perform special transactional conflict-detected handling */
+      int res = g_ndb_slave_state.atTransConflictDetected(ex_data.trans_id);
+      if (res)
+        DBUG_RETURN(res);
+    }
 
-      if (haveExTable)
-      {
-        NdbError ex_err;
-        if (write_conflict_row(share, trans, row, ex_err))
-        {
-          char msg[FN_REFLEN];
-          my_snprintf(msg, sizeof(msg), "table %s NDB error %d '%s'",
-                      cfn_share->m_ex_tab->getName(),
-                      ex_err.code, ex_err.message);
+    if (cfn_share)
+    {
+      /* Now handle the conflict on this row */
+      enum_conflict_fn_type cft = cfn_share->m_conflict_fn->type;
 
-          NdbDictionary::Dictionary* dict= thd_ndb->ndb->getDictionary();
+      g_ndb_slave_state.current_violation_count[cft]++;
 
-          if (ex_err.classification == NdbError::SchemaError)
-          {
-            dict->removeTableGlobal(*(cfn_share->m_ex_tab), false);
-            cfn_share->m_ex_tab= NULL;
-          }
-          else if (ex_err.status == NdbError::TemporaryError)
-          {
-            /* Slave will roll back and retry entire transaction. */
-            ERR_RETURN(ex_err);
-          }
-          else
-          {
-            push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                                ER_EXCEPTIONS_WRITE_ERROR,
-                                ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
-            /* Slave will stop replication. */
-            DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
-          }
-        }
-      } // if (haveExTable)
+      int res = handle_row_conflict(cfn_share,
+                                    share->table_name,
+                                    false, /* table_has_blobs */
+                                    "Row",
+                                    key_rec,
+                                    row,
+                                    causing_op_type,
+                                    conflict_cause,
+                                    err,
+                                    trans);
 
-      DBUG_RETURN(0);
+      DBUG_RETURN(res);
     }
     else
     {
@@ -4466,10 +5005,7 @@ handle_conflict_op_error(Thd_ndb* thd_nd
 
   DBUG_RETURN(0); // Reachable?
 }
-#endif /* HAVE_NDB_BINLOG */
-
 
-#ifdef HAVE_NDB_BINLOG
 /*
   is_serverid_local
 */
@@ -4684,12 +5220,33 @@ int ha_ndbcluster::ndb_write_row(uchar *
   MY_BITMAP tmpBitmap;
   MY_BITMAP *user_cols_written_bitmap;
 #ifdef HAVE_NDB_BINLOG
-  bool haveConflictFunction =
-    (thd->slave_thread &&
-     m_share->m_cfn_share &&
-     m_share->m_cfn_share->m_conflict_fn);
+  /* Conflict resolution in slave thread */
+  bool haveConflictFunction = false;
+  if (thd->slave_thread)
+  {
+    haveConflictFunction =
+      m_share->m_cfn_share &&
+      m_share->m_cfn_share->m_conflict_fn;
+    bool conflict_handled = false;
+
+    if (unlikely((error = prepare_conflict_detection(WRITE_ROW,
+                                                     key_rec,
+                                                     NULL,    /* old_data */
+                                                     record,  /* new_data */
+                                                     trans,
+                                                     NULL,    /* code */
+                                                     &options,
+                                                     conflict_handled))))
+      DBUG_RETURN(error);
+
+    if (unlikely(conflict_handled))
+    {
+      /* No need to continue with operation definition */
+      /* TODO : Ensure batch execution */
+      DBUG_RETURN(0);
+    }
+  };
 #endif
-  
   if (m_use_write
 #ifdef HAVE_NDB_BINLOG
       /* Conflict detection must use normal Insert */
@@ -4732,19 +5289,6 @@ int ha_ndbcluster::ndb_write_row(uchar *
   }
   else
   {
-#ifdef HAVE_NDB_BINLOG
-    if (haveConflictFunction)
-    {
-      /* Conflict detection in slave thread */
-      if (unlikely((error = prepare_conflict_detection(WRITE_ROW,
-                                                       key_rec,
-                                                       NULL,    /* old_data */
-                                                       record,  /* new_data */
-                                                       NULL,    /* code */
-                                                       &options))))
-        DBUG_RETURN(error);
-    }
-#endif
     uchar *mask;
 
     /* Check whether Ndb table definition includes any default values. */
@@ -4891,20 +5435,34 @@ int ha_ndbcluster::primary_key_cmp(const
 }
 
 #ifdef HAVE_NDB_BINLOG
+
+static Ndb_exceptions_data StaticRefreshExceptionsData=
+{ NULL, NULL, NULL, REFRESH_ROW, 0 };
+
 int
 handle_row_conflict(NDB_CONFLICT_FN_SHARE* cfn_share,
                     const char* table_name,
+                    bool table_has_blobs,
+                    const char* handling_type,
                     const NdbRecord* key_rec,
                     const uchar* pk_row,
                     enum_conflicting_op_type op_type,
                     enum_conflict_cause conflict_cause,
                     const NdbError& conflict_error,
-                    NdbTransaction* conflict_trans,
-                    NdbError& err)
+                    NdbTransaction* conflict_trans)
 {
   DBUG_ENTER("handle_row_conflict");
 
-  if (cfn_share->m_flags & CFF_REFRESH_ROWS)
+  /*
+     We will refresh the row if the conflict function requires
+     it, or if we are handling a transactional conflict.
+  */
+  bool refresh_row =
+    (conflict_cause == TRANS_IN_CONFLICT) ||
+    (cfn_share &&
+     (cfn_share->m_flags & CFF_REFRESH_ROWS));
+
+  if (refresh_row)
   {
     /* A conflict has been detected between an applied replicated operation
      * and the data in the DB.
@@ -4927,66 +5485,220 @@ handle_row_conflict(NDB_CONFLICT_FN_SHAR
     assert(key_rec != NULL);
     assert(pk_row != NULL);
 
-    /* When the slave splits an epoch into batches, a conflict row detected
-     * and refreshed in an early batch can be written to by operations in
-     * a later batch.  As the operations will not have applied, and the
-     * row has already been refreshed, we need not attempt to refresh
-     * it again
-     */
-    if ((conflict_cause == ROW_IN_CONFLICT) &&
-        (conflict_error.code == (int) error_op_after_refresh_op))
+    do
     {
-      /* Attempt to apply an operation after the row was refreshed
-       * Ignore the error
+      /* We cannot refresh a row which has Blobs, as we do not support
+       * Blob refresh yet.
+       * Rows implicated by a transactional conflict function may have
+       * Blobs.
+       * We will generate an error in this case
        */
-      DBUG_PRINT("info", ("Operation after refresh error - ignoring"));
-      DBUG_RETURN(0);
-    }
+      if (table_has_blobs)
+      {
+        char msg[FN_REFLEN];
+        my_snprintf(msg, sizeof(msg), "%s conflict handling "
+                    "on table %s failed as table has Blobs which cannot be refreshed.",
+                    handling_type,
+                    table_name);
+
+        push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                            ER_EXCEPTIONS_WRITE_ERROR,
+                            ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+
+        DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
+      }
+
+      /* When the slave splits an epoch into batches, a conflict row detected
+       * and refreshed in an early batch can be written to by operations in
+       * a later batch.  As the operations will not have applied, and the
+       * row has already been refreshed, we need not attempt to refresh
+       * it again
+       */
+      if ((conflict_cause == ROW_IN_CONFLICT) &&
+          (conflict_error.code == (int) error_op_after_refresh_op))
+      {
+        /* Attempt to apply an operation after the row was refreshed
+         * Ignore the error
+         */
+        DBUG_PRINT("info", ("Operation after refresh error - ignoring"));
+        break;
+      }
 
-    /* When a delete operation finds that the row does not exist, it indicates
-     * a DELETE vs DELETE conflict.  If we refresh the row then we can get
-     * non deterministic behaviour depending on slave batching as follows :
-     *   Row is deleted
-     *
-     *     Case 1
-     *       Slave applied DELETE, INSERT in 1 batch
-     *
-     *         After first batch, the row is present (due to INSERT), it is
-     *         refreshed.
-     *
-     *     Case 2
-     *       Slave applied DELETE in 1 batch, INSERT in 2nd batch
-     *
-     *         After first batch, the row is not present, it is refreshed
-     *         INSERT is then rejected.
-     *
-     * The problem of not being able to 'record' a DELETE vs DELETE conflict
-     * is known.  We attempt at least to give consistent behaviour for
-     * DELETE vs DELETE conflicts by :
-     *   NOT refreshing a row when a DELETE vs DELETE conflict is detected
-     * This should map all batching scenarios onto Case1.
-     */
-    if ((op_type == DELETE_ROW) &&
-        (conflict_cause == ROW_DOES_NOT_EXIST))
+      /* When a delete operation finds that the row does not exist, it indicates
+       * a DELETE vs DELETE conflict.  If we refresh the row then we can get
+       * non deterministic behaviour depending on slave batching as follows :
+       *   Row is deleted
+       *
+       *     Case 1
+       *       Slave applied DELETE, INSERT in 1 batch
+       *
+       *         After first batch, the row is present (due to INSERT), it is
+       *         refreshed.
+       *
+       *     Case 2
+       *       Slave applied DELETE in 1 batch, INSERT in 2nd batch
+       *
+       *         After first batch, the row is not present, it is refreshed
+       *         INSERT is then rejected.
+       *
+       * The problem of not being able to 'record' a DELETE vs DELETE conflict
+       * is known.  We attempt at least to give consistent behaviour for
+       * DELETE vs DELETE conflicts by :
+       *   NOT refreshing a row when a DELETE vs DELETE conflict is detected
+       * This should map all batching scenarios onto Case 1.
+       */
+      if ((op_type == DELETE_ROW) &&
+          (conflict_cause == ROW_DOES_NOT_EXIST))
+      {
+        DBUG_PRINT("info", ("Delete vs Delete detected, NOT refreshing"));
+        break;
+      }
+
+      /*
+        We give the refresh operation some 'exceptions data', so that
+        it can be identified as part of conflict resolution when
+        handling operation errors.
+        Specifically we need to be able to handle duplicate row
+        refreshes.
+        As there is no unique exceptions data, we use a singleton.
+
+        We also need to 'force' the ANYVALUE of the row to 0 to
+        indicate that the refresh is locally-sourced.
+        Otherwise we can 'pick up' the ANYVALUE of a previous
+        update to the row.
+        If some previous update in this transaction came from a
+        Slave, then using its ANYVALUE can result in that Slave
+        ignoring this correction.
+      */
+      NdbOperation::OperationOptions options;
+      options.optionsPresent =
+        NdbOperation::OperationOptions::OO_CUSTOMDATA |
+        NdbOperation::OperationOptions::OO_ANYVALUE;
+      options.customData = &StaticRefreshExceptionsData;
+      options.anyValue = 0;
+
+      /* Create a refresh operation to realign other clusters */
+      // TODO AnyValue
+      // TODO Do we ever get non-PK key?
+      //      Keyless table?
+      //      Unique index
+      const NdbOperation* refresh_op= conflict_trans->refreshTuple(key_rec,
+                                                                   (const char*) pk_row,
+                                                                   &options,
+                                                                   sizeof(options));
+      if (!refresh_op)
+      {
+        NdbError err = conflict_trans->getNdbError();
+
+        if (err.status == NdbError::TemporaryError)
+        {
+          /* Slave will roll back and retry entire transaction. */
+          ERR_RETURN(err);
+        }
+        else
+        {
+          char msg[FN_REFLEN];
+          my_snprintf(msg, sizeof(msg), "Row conflict handling "
+                      "on table %s hit Ndb error %d '%s'",
+                      table_name,
+                      err.code,
+                      err.message);
+          push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                              ER_EXCEPTIONS_WRITE_ERROR,
+                              ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+          /* Slave will stop replication. */
+          DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
+        }
+      }
+    } while(0); // End of 'refresh' block
+  }
+
+  if (cfn_share &&
+      cfn_share->m_ex_tab != NULL)
+  {
+    NdbError err;
+    assert(err.code == 0);
+    do
     {
-      DBUG_PRINT("info", ("Delete vs Delete detected, NOT refreshing"));
-      DBUG_RETURN(0);
-    }
+      /* Have exceptions table, add row to it */
+      const NDBTAB *ex_tab= cfn_share->m_ex_tab;
 
-    /* Create a refresh to operation to realign other clusters */
-    // TODO AnyValue
-    // TODO Do we ever get non-PK key?
-    //      Keyless table?
-    //      Unique index
-    const NdbOperation* refresh_op= conflict_trans->refreshTuple(key_rec,
-                                                                 (const char*) pk_row);
+      /* get insert op */
+      NdbOperation *ex_op= conflict_trans->getNdbOperation(ex_tab);
+      if (ex_op == NULL)
+      {
+        err= conflict_trans->getNdbError();
+        break;
+      }
+      if (ex_op->insertTuple() == -1)
+      {
+        err= ex_op->getNdbError();
+        break;
+      }
+      {
+        uint32 server_id= (uint32)::server_id;
+        uint32 master_server_id= (uint32) ndb_mi_get_master_server_id();
+        uint64 master_epoch= (uint64) g_ndb_slave_state.current_master_server_epoch;
+        uint32 count= (uint32)++(cfn_share->m_count);
+        if (ex_op->setValue((Uint32)0, (const char *)&(server_id)) ||
+            ex_op->setValue((Uint32)1, (const char *)&(master_server_id)) ||
+            ex_op->setValue((Uint32)2, (const char *)&(master_epoch)) ||
+            ex_op->setValue((Uint32)3, (const char *)&(count)))
+        {
+          err= ex_op->getNdbError();
+          break;
+        }
+      }
+      /* copy primary keys */
+      {
+        const int fixed_cols= 4;
+        int nkey= cfn_share->m_pk_cols;
+        int k;
+        for (k= 0; k < nkey; k++)
+        {
+          DBUG_ASSERT(pk_row != NULL);
+          const uchar* data= pk_row + cfn_share->m_offset[k];
+          if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1)
+          {
+            err= ex_op->getNdbError();
+            break;
+          }
+        }
+      }
+    } while (0);
 
-    if (!refresh_op)
+    if (err.code != 0)
     {
-      err= conflict_trans->getNdbError();
-      DBUG_RETURN(1);
+      char msg[FN_REFLEN];
+      my_snprintf(msg, sizeof(msg), "%s conflict handling "
+                  "on table %s hit Ndb error %d '%s'",
+                  handling_type,
+                  table_name,
+                  err.code,
+                  err.message);
+
+      if (err.classification == NdbError::SchemaError)
+      {
+        /* Something wrong with the Exceptions table schema; stop using it */
+        NdbDictionary::Dictionary* dict= conflict_trans->getNdb()->getDictionary();
+        dict->removeTableGlobal(*(cfn_share->m_ex_tab), false);
+        cfn_share->m_ex_tab= NULL;
+      }
+      else if (err.status == NdbError::TemporaryError)
+      {
+        /* Slave will roll back and retry entire transaction. */
+        ERR_RETURN(err);
+      }
+      else
+      {
+        push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                            ER_EXCEPTIONS_WRITE_ERROR,
+                            ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+        /* Slave will stop replication. */
+        DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
+      }
     }
-  } /* if (cfn_share->m_flags & CFF_REFRESH_ROWS) */
+  } /* if (cfn_share->m_ex_tab != NULL) */
 
   DBUG_RETURN(0);
 };
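
For reference, the exceptions-table row populated above has a fixed
four-column header followed by the conflicting row's primary key
columns.  A minimal standalone sketch of that layout (the column
order is taken from the setValue() calls above; the struct, types
and printing are purely illustrative):

  #include <cstdint>
  #include <cstdio>

  struct ExceptionsRowHeader
  {
    uint32_t server_id;        /* column 0 : this slave's server id */
    uint32_t master_server_id; /* column 1 : originating master's server id */
    uint64_t master_epoch;     /* column 2 : master epoch of the conflict */
    uint32_t count;            /* column 3 : per-share monotonic counter */
    /* columns 4.. : the conflicting row's primary key columns */
  };

  int main()
  {
    ExceptionsRowHeader hdr = { 3, 1, 0x500000000ULL, 42 };
    std::printf("server_id=%u master=%u epoch=%llu count=%u\n",
                (unsigned) hdr.server_id, (unsigned) hdr.master_server_id,
                (unsigned long long) hdr.master_epoch, (unsigned) hdr.count);
    return 0;
  }
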
@@ -5344,17 +6056,27 @@ int ha_ndbcluster::ndb_update_row(const
     NdbInterpretedCode code(m_table, buffer,
                             sizeof(buffer)/sizeof(buffer[0]));
 
-    if (thd->slave_thread && m_share->m_cfn_share &&
-        m_share->m_cfn_share->m_conflict_fn)
+    if (thd->slave_thread)
     {
-       /* Conflict resolution in slave thread. */
+      bool conflict_handled = false;
+      /* Conflict resolution in slave thread. */
+
       if (unlikely((error = prepare_conflict_detection(UPDATE_ROW,
                                                        key_rec,
                                                        old_data,
                                                        new_data,
+                                                       trans,
                                                        &code,
-                                                       &options))))
+                                                       &options,
+                                                       conflict_handled))))
         DBUG_RETURN(error);
+
+      if (unlikely(conflict_handled))
+      {
+        /* No need to continue with operation definition */
+        /* TODO : Ensure batch execution */
+        DBUG_RETURN(0);
+      }
     }
 #endif /* HAVE_NDB_BINLOG */
     if (options.optionsPresent !=0)
@@ -5623,17 +6345,27 @@ int ha_ndbcluster::ndb_delete_row(const
     Uint32 buffer[ MAX_CONFLICT_INTERPRETED_PROG_SIZE ];
     NdbInterpretedCode code(m_table, buffer,
                             sizeof(buffer)/sizeof(buffer[0]));
-    if (thd->slave_thread && m_share->m_cfn_share &&
-        m_share->m_cfn_share->m_conflict_fn)
+    if (thd->slave_thread)
     {
+       bool conflict_handled = false;
+
       /* Conflict resolution in slave thread. */
       if (unlikely((error = prepare_conflict_detection(DELETE_ROW,
                                                        key_rec,
                                                        key_row, /* old_data */
                                                        NULL,    /* new_data */
+                                                       trans,
                                                        &code,
-                                                       &options))))
+                                                       &options,
+                                                       conflict_handled))))
         DBUG_RETURN(error);
+
+      if (unlikely(conflict_handled))
+      {
+        /* No need to continue with operation definition */
+        /* TODO : Ensure batch execution */
+        DBUG_RETURN(0);
+      }
     }
 #endif /* HAVE_NDB_BINLOG */
     if (options.optionsPresent != 0)
@@ -7541,6 +8273,8 @@ int ndbcluster_commit(handlerton *hton,
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
   Ndb *ndb= thd_ndb->ndb;
   NdbTransaction *trans= thd_ndb->trans;
+  bool retry_slave_trans = false;
+  (void) retry_slave_trans; /* silence unused-variable warning when !HAVE_NDB_BINLOG */
 
   DBUG_ENTER("ndbcluster_commit");
   DBUG_ASSERT(ndb);
@@ -7580,9 +8314,23 @@ int ndbcluster_commit(handlerton *hton,
 
   if (thd->slave_thread)
   {
-    if (!g_ndb_slave_state.current_conflict_defined_op_count ||
-        !thd_ndb->m_unsent_bytes ||
-        !(res= execute_no_commit(thd_ndb, trans, TRUE)))
+#ifdef HAVE_NDB_BINLOG
+    /* If this slave transaction has included conflict detecting ops
+     * and some defined operations are not yet sent, then perform
+     * an execute(NoCommit) before committing, as conflict op handling
+     * is done by execute(NoCommit)
+     */
+    if (g_ndb_slave_state.conflict_flags & SCS_OPS_DEFINED)
+    {
+      if (thd_ndb->m_unsent_bytes)
+        res = execute_no_commit(thd_ndb, trans, TRUE);
+    }
+
+    if (likely(res == 0))
+      res = g_ndb_slave_state.atConflictPreCommit(retry_slave_trans);
+#endif /* HAVE_NDB_BINLOG */
+
+    if (likely(res == 0))
       res= execute_commit(thd, thd_ndb, trans, 1, TRUE);
 
     update_slave_api_stats(thd_ndb->ndb);
@@ -7611,12 +8359,50 @@ int ndbcluster_commit(handlerton *hton,
 
   if (res != 0)
   {
-    const NdbError err= trans->getNdbError();
-    const NdbOperation *error_op= trans->getNdbErrorOperation();
-    set_ndb_err(thd, err);
-    res= ndb_to_mysql_error(&err);
-    if (res != -1)
-      ndbcluster_print_error(res, error_op);
+#ifdef HAVE_NDB_BINLOG
+    if (retry_slave_trans)
+    {
+      if (st_ndb_slave_state::MAX_RETRY_TRANS_COUNT >
+          g_ndb_slave_state.retry_trans_count++)
+      {
+        /*
+           Warning is necessary to cause retry from slave.cc
+           exec_relay_log_event()
+        */
+        push_warning(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                     ER_GET_TEMPORARY_ERRMSG,
+                     SLAVE_SILENT_RETRY_MSG);
+        /*
+          Set retry count to zero to:
+          1) Avoid consuming slave-temp-error retry attempts
+          2) Ensure no inter-attempt sleep
+
+          Better fix : Save + restore retry count around transactional
+          conflict handling
+        */
+        ndb_mi_set_relay_log_trans_retries(0);
+      }
+      else
+      {
+        /*
+           Too many retries, print error and exit - normal
+           too many retries mechanism will cause exit
+         */
+        sql_print_error("Ndb slave retried transaction %u time(s) in vain.  Giving up.",
+                        st_ndb_slave_state::MAX_RETRY_TRANS_COUNT);
+      }
+      res= ER_GET_TEMPORARY_ERRMSG;
+    }
+    else
+#endif
+    {
+      const NdbError err= trans->getNdbError();
+      const NdbOperation *error_op= trans->getNdbErrorOperation();
+      set_ndb_err(thd, err);
+      res= ndb_to_mysql_error(&err);
+      if (res != -1)
+        ndbcluster_print_error(res, error_op);
+    }
   }
   else
   {
@@ -8812,6 +9598,7 @@ int ha_ndbcluster::create(const char *na
     switch(conflict_fn->type)
     {
     case CFT_NDB_EPOCH:
+    case CFT_NDB_EPOCH_TRANS:
     {
       /* Default 6 extra Gci bits allows 2^6 == 64
        * epochs / saveGCP, a comfortable default
@@ -11482,6 +12269,12 @@ static int ndbcluster_init(void *p)
   if (ndbcluster_inited)
     DBUG_RETURN(FALSE);
 
+#ifdef HAVE_NDB_BINLOG
+  /* Check const alignment */
+  assert(DependencyTracker::InvalidTransactionId ==
+         Ndb_binlog_extra_row_info::InvalidTransactionId);
+#endif
+
   pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
   pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
   pthread_cond_init(&COND_ndb_util_thread, NULL);
@@ -16342,6 +17135,18 @@ static MYSQL_SYSVAR_BOOL(
 );
 
 
+my_bool opt_ndb_log_transaction_id;
+static MYSQL_SYSVAR_BOOL(
+  log_transaction_id,               /* name */
+  opt_ndb_log_transaction_id,       /* var  */
+  PLUGIN_VAR_OPCMDARG,
+  "Log Ndb transaction identities per row in the Binlog",
+  NULL,                             /* check func. */
+  NULL,                             /* update func. */
+  0                                 /* default */
+);
+
+
 static MYSQL_SYSVAR_STR(
   connectstring,                    /* name */
   opt_ndb_connectstring,            /* var */
@@ -16449,6 +17254,7 @@ static struct st_mysql_sys_var* system_v
   MYSQL_SYSVAR(log_binlog_index),
   MYSQL_SYSVAR(log_empty_epochs),
   MYSQL_SYSVAR(log_apply_status),
+  MYSQL_SYSVAR(log_transaction_id),
   MYSQL_SYSVAR(connectstring),
   MYSQL_SYSVAR(mgmd_host),
   MYSQL_SYSVAR(nodeid),

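A note on the retry path above: on a transactional conflict the
commit pushes ER_GET_TEMPORARY_ERRMSG with SLAVE_SILENT_RETRY_MSG so
that exec_relay_log_event() retries, and zeroes the relay-log retry
counter to avoid the inter-attempt sleep.  A standalone sketch of
the resulting control flow (apply_transaction_once() is a
hypothetical stand-in for one execute/commit attempt):

  #include <cstdio>

  static const unsigned MAX_RETRY_TRANS_COUNT = 100; /* as in st_ndb_slave_state */

  /* Stand-in for one attempt at applying the slave transaction;
   * returns false on a detected transactional conflict.  Here we
   * pretend the first two attempts conflict. */
  static bool apply_transaction_once(unsigned attempt)
  {
    return attempt >= 2;
  }

  int main()
  {
    for (unsigned attempt = 0; ; attempt++)
    {
      if (apply_transaction_once(attempt))
      {
        std::printf("committed after %u retries\n", attempt);
        return 0;
      }
      if (attempt >= MAX_RETRY_TRANS_COUNT)
      {
        /* mirrors the sql_print_error() give-up path above */
        std::fprintf(stderr, "retried %u time(s) in vain, giving up\n", attempt);
        return 1;
      }
      /* real code: push the warning for a silent retry and zero the
       * relay-log retry counter so there is no inter-attempt sleep */
    }
  }
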
=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2011-08-31 10:53:27 +0000
+++ b/sql/ha_ndbcluster.h	2011-09-07 22:50:01 +0000
@@ -127,6 +127,7 @@ enum enum_conflict_fn_type
   ,CFT_NDB_OLD
   ,CFT_NDB_MAX_DEL_WIN
   ,CFT_NDB_EPOCH
+  ,CFT_NDB_EPOCH_TRANS
   ,CFT_NUMBER_OF_CFTS /* End marker */
 };
 
@@ -163,7 +164,8 @@ enum enum_conflicting_op_type
 {                /* NdbApi          */
   WRITE_ROW,     /* insert (!write) */
   UPDATE_ROW,    /* update          */
-  DELETE_ROW     /* delete          */
+  DELETE_ROW,    /* delete          */
+  REFRESH_ROW    /* refresh         */
 };
 
 /*
@@ -179,20 +181,27 @@ typedef int (* prepare_detect_func) (str
                                      const MY_BITMAP* write_set,
                                      class NdbInterpretedCode* code);
 
+enum enum_conflict_fn_flags
+{
+  CF_TRANSACTIONAL = 1
+};
+
 struct st_conflict_fn_def
 {
   const char *name;
   enum_conflict_fn_type type;
   const st_conflict_fn_arg_def* arg_defs;
   prepare_detect_func prep_func;
+  uint8 flags; /* enum_conflict_fn_flags */
 };
 
 /* What sort of conflict was found */
 enum enum_conflict_cause
 {
-  ROW_ALREADY_EXISTS,
-  ROW_DOES_NOT_EXIST,
-  ROW_IN_CONFLICT
+  ROW_ALREADY_EXISTS,   /* On insert */
+  ROW_DOES_NOT_EXIST,   /* On Update, Delete */
+  ROW_IN_CONFLICT,      /* On Update, Delete */
+  TRANS_IN_CONFLICT     /* Any of above, or implied by transaction */
 };
 
 /* NdbOperation custom data which points out handler and record. */
@@ -201,6 +210,7 @@ struct Ndb_exceptions_data {
   const NdbRecord* key_rec;
   const uchar* row;
   enum_conflicting_op_type op_type;
+  Uint64 trans_id;
 };
 
 enum enum_conflict_fn_table_flags
@@ -343,6 +353,32 @@ inline void set_binlog_use_update(NDB_SH
 inline my_bool get_binlog_use_update(NDB_SHARE *share)
 { return (share->flags & NSF_BINLOG_USE_UPDATE) != 0; }
 
+enum enum_slave_trans_conflict_apply_state
+{
+  /* Normal with optional row-level conflict detection */
+  SAS_NORMAL,
+
+  /*
+    SAS_TRACK_TRANS_DEPENDENCIES
+    Track inter-transaction dependencies
+  */
+  SAS_TRACK_TRANS_DEPENDENCIES,
+
+  /*
+    SAS_APPLY_TRANS_DEPENDENCIES
+    Apply only non conflicting transactions
+  */
+  SAS_APPLY_TRANS_DEPENDENCIES
+};
+
+enum enum_slave_conflict_flags
+{
+  /* Conflict detection Ops defined */
+  SCS_OPS_DEFINED = 1,
+  /* Conflict detected on table with transactional resolution */
+  SCS_TRANS_CONFLICT_DETECTED_THIS_PASS = 2
+};
+
 /*
   State associated with the Slave thread
   (From the Ndb handler's point of view)
@@ -350,17 +386,52 @@ inline my_bool get_binlog_use_update(NDB
 struct st_ndb_slave_state
 {
   /* Counter values for current slave transaction */
-  Uint32 current_conflict_defined_op_count;
   Uint32 current_violation_count[CFT_NUMBER_OF_CFTS];
   Uint64 current_master_server_epoch;
   Uint64 current_max_rep_epoch;
+  uint8 conflict_flags; /* enum_slave_conflict_flags */
+    /* Transactional conflict detection */
+  Uint32 retry_trans_count;
+  Uint32 current_trans_row_conflict_count;
+  Uint32 current_trans_row_reject_count;
+  Uint32 current_trans_in_conflict_count;
 
   /* Cumulative counter values */
   Uint64 total_violation_count[CFT_NUMBER_OF_CFTS];
   Uint64 max_rep_epoch;
   Uint32 sql_run_id;
+  /* Transactional conflict detection */
+  Uint64 trans_row_conflict_count;
+  Uint64 trans_row_reject_count;
+  Uint64 trans_detect_iter_count;
+  Uint64 trans_in_conflict_count;
+  Uint64 trans_conflict_commit_count;
+
+  static const Uint32 MAX_RETRY_TRANS_COUNT = 100;
+
+  /*
+    Slave Apply State
+
+    State of Binlog application from Ndb point of view.
+  */
+  enum_slave_trans_conflict_apply_state trans_conflict_apply_state;
+
+  MEM_ROOT conflict_mem_root;
+  class DependencyTracker* trans_dependency_tracker;
 
   /* Methods */
+  void atStartSlave();
+  int  atPrepareConflictDetection(const NdbDictionary::Table* table,
+                                  const NdbRecord* key_rec,
+                                  const uchar* row_data,
+                                  Uint64 transaction_id,
+                                  bool& handle_conflict_now);
+  int  atTransConflictDetected(Uint64 transaction_id);
+  int  atConflictPreCommit(bool& retry_slave_trans);
+
+  void atBeginTransConflictHandling();
+  void atEndTransConflictHandling();
+
   void atTransactionCommit();
   void atTransactionAbort();
   void atResetSlave();
@@ -370,6 +441,8 @@ struct st_ndb_slave_state
                           Uint64 row_epoch,
                           bool is_row_server_id_local);
 
+  void resetPerAttemptCounters();
+
   st_ndb_slave_state();
 };
 
@@ -702,8 +775,10 @@ private:
                                  const NdbRecord* key_rec,
                                  const uchar* old_data,
                                  const uchar* new_data,
+                                 NdbTransaction* trans,
                                  NdbInterpretedCode* code,
-                                 NdbOperation::OperationOptions* options);
+                                 NdbOperation::OperationOptions* options,
+                                 bool& conflict_handled);
 #endif
   void setup_key_ref_for_ndb_record(const NdbRecord **key_rec,
                                     const uchar **key_row,

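The three SAS_* states above form a small state machine driven from
atConflictPreCommit() (not shown in this hunk).  One plausible
reading, sketched standalone; the transitions are an assumption, not
the definitive table:

  #include <cstdio>

  enum ApplyState
  {
    SAS_NORMAL,
    SAS_TRACK_TRANS_DEPENDENCIES,
    SAS_APPLY_TRANS_DEPENDENCIES
  };

  /* Assumed transitions: stay NORMAL until a transactional conflict
   * appears, track dependencies on the retry, then apply only
   * non-conflicting transactions, falling back to NORMAL once a
   * pass completes without conflicts. */
  ApplyState next_state(ApplyState s, bool conflict_this_pass)
  {
    switch (s)
    {
    case SAS_NORMAL:
      return conflict_this_pass ? SAS_TRACK_TRANS_DEPENDENCIES : SAS_NORMAL;
    case SAS_TRACK_TRANS_DEPENDENCIES:
      return conflict_this_pass ? SAS_APPLY_TRANS_DEPENDENCIES : SAS_NORMAL;
    case SAS_APPLY_TRANS_DEPENDENCIES:
      return SAS_TRACK_TRANS_DEPENDENCIES; /* re-check after realignment */
    }
    return SAS_NORMAL;
  }

  int main()
  {
    ApplyState s = next_state(SAS_NORMAL, true);
    std::printf("state = %d\n", (int) s); /* 1 == SAS_TRACK_TRANS_DEPENDENCIES */
    return 0;
  }
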
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2011-08-27 12:12:27 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2011-09-07 22:50:01 +0000
@@ -47,6 +47,7 @@ extern my_bool opt_ndb_log_binlog_index;
 extern my_bool opt_ndb_log_apply_status;
 extern ulong opt_ndb_extra_logging;
 extern st_ndb_slave_state g_ndb_slave_state;
+extern my_bool opt_ndb_log_transaction_id;
 
 bool ndb_log_empty_epochs(void);
 
@@ -4247,13 +4248,15 @@ static const st_conflict_fn_arg_def epoc
 static const st_conflict_fn_def conflict_fns[]=
 {
   { "NDB$MAX_DELETE_WIN", CFT_NDB_MAX_DEL_WIN,
-    &resolve_col_args[0], row_conflict_fn_max_del_win },
+    &resolve_col_args[0], row_conflict_fn_max_del_win, 0 },
   { "NDB$MAX",            CFT_NDB_MAX,
-    &resolve_col_args[0], row_conflict_fn_max         },
+    &resolve_col_args[0], row_conflict_fn_max,         0 },
   { "NDB$OLD",            CFT_NDB_OLD,
-    &resolve_col_args[0], row_conflict_fn_old         },
+    &resolve_col_args[0], row_conflict_fn_old,         0 },
+  { "NDB$EPOCH_TRANS",    CFT_NDB_EPOCH_TRANS,
+    &epoch_fn_args[0],    row_conflict_fn_epoch,       CF_TRANSACTIONAL},
   { "NDB$EPOCH",          CFT_NDB_EPOCH,
-    &epoch_fn_args[0],    row_conflict_fn_epoch       }
+    &epoch_fn_args[0],    row_conflict_fn_epoch,       0 }
 };
 
 static unsigned n_conflict_fns=
@@ -4508,6 +4511,7 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
     break;
   }
   case CFT_NDB_EPOCH:
+  case CFT_NDB_EPOCH_TRANS:
   {
     if (num_args > 1)
     {
@@ -4520,7 +4524,7 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
     /* Check that table doesn't have Blobs as we don't support that */
     if (share->flags & NSF_BLOB_FLAG)
     {
-      my_snprintf(msg, msg_len, "Table has Blob column(s), not suitable for NDB$EPOCH.");
+      my_snprintf(msg, msg_len, "Table has Blob column(s), not suitable for NDB$EPOCH[_TRANS].");
       DBUG_PRINT("info", ("%s", msg));
       DBUG_RETURN(-1);
     }
@@ -4530,7 +4534,7 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
      * represent SavePeriod/EpochPeriod
      */
     if (ndbtab->getExtraRowGciBits() == 0)
-      sql_print_information("Ndb Slave : CFT_NDB_EPOCH, low epoch resolution");
+      sql_print_information("Ndb Slave : CFT_NDB_EPOCH[_TRANS], low epoch resolution");
 
     if (ndbtab->getExtraRowAuthorBits() == 0)
     {
@@ -6140,8 +6144,9 @@ ndb_find_binlog_index_row(ndb_binlog_ind
   return row;
 }
 
+
 static int
-ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
+ndb_binlog_thread_handle_data_event(THD* thd, Ndb *ndb, NdbEventOperation *pOp,
                                     ndb_binlog_index_row **rows,
                                     injector::transaction &trans,
                                     unsigned &trans_row_count,
@@ -6288,6 +6293,17 @@ ndb_binlog_thread_handle_data_event(Ndb
   uint32 logged_server_id= anyValue;
   ndbcluster_anyvalue_set_serverid(logged_server_id, originating_server_id);
 
+  /*
+     Get NdbApi transaction id for this event to put into Binlog
+  */
+  Ndb_binlog_extra_row_info extra_row_info;
+  if (opt_ndb_log_transaction_id)
+  {
+    extra_row_info.setFlags(Ndb_binlog_extra_row_info::NDB_ERIF_TRANSID);
+    extra_row_info.setTransactionId(pOp->getTransId());
+    thd->binlog_row_event_extra_data = extra_row_info.generateBuffer();
+  }
+
   DBUG_ASSERT(trans.good());
   DBUG_ASSERT(table != 0);
 
@@ -6457,6 +6473,11 @@ ndb_binlog_thread_handle_data_event(Ndb
     break;
   }
 
+  if (opt_ndb_log_transaction_id)
+  {
+    thd->binlog_row_event_extra_data = NULL;
+  }
+
   if (share->flags & NSF_BLOB_FLAG)
   {
     my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR));
@@ -7362,7 +7383,8 @@ restart_cluster_failure:
 #endif
           if ((unsigned) pOp->getEventType() <
               (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
-            ndb_binlog_thread_handle_data_event(i_ndb, pOp, &rows, trans, trans_row_count, trans_slave_row_count);
+            ndb_binlog_thread_handle_data_event(thd, i_ndb, pOp, &rows, trans,
+                                                trans_row_count, trans_slave_row_count);
           else
           {
             ndb_binlog_thread_handle_non_data_event(thd, pOp, *rows);
@@ -7784,4 +7806,121 @@ ulong opt_server_id_mask = ~0;
 
 #endif
 
+Ndb_binlog_extra_row_info::
+Ndb_binlog_extra_row_info()
+{
+  flags = 0;
+  transactionId = InvalidTransactionId;
+  /* Prepare buffer with extra row info buffer bytes */
+  buff[ EXTRA_ROW_INFO_LEN_OFFSET ] = 0;
+  buff[ EXTRA_ROW_INFO_FORMAT_OFFSET ] = ERIF_NDB;
+}
+
+void
+Ndb_binlog_extra_row_info::
+setFlags(Uint16 _flags)
+{
+  flags = _flags;
+}
+
+void
+Ndb_binlog_extra_row_info::
+setTransactionId(Uint64 _transactionId)
+{
+  assert(_transactionId != InvalidTransactionId);
+  transactionId = _transactionId;
+};
+
+int
+Ndb_binlog_extra_row_info::
+loadFromBuffer(const uchar* extra_row_info)
+{
+  assert(extra_row_info);
+
+  Uint8 length = extra_row_info[ EXTRA_ROW_INFO_LEN_OFFSET ];
+  assert(length >= EXTRA_ROW_INFO_HDR_BYTES);
+  Uint8 payload_length = length - EXTRA_ROW_INFO_HDR_BYTES;
+  Uint8 format = extra_row_info[ EXTRA_ROW_INFO_FORMAT_OFFSET ];
+
+  if (likely(format == ERIF_NDB))
+  {
+    if (likely(payload_length >= FLAGS_SIZE))
+    {
+      const uchar* data = &extra_row_info[ EXTRA_ROW_INFO_HDR_BYTES ];
+      Uint8 nextPos = 0;
+
+      /* Have flags at least */
+      Uint16 netFlags;
+      memcpy(&netFlags, &data[ nextPos ], FLAGS_SIZE);
+      nextPos += FLAGS_SIZE;
+      flags = uint2korr((const char*) &netFlags);
+
+      if (flags & NDB_ERIF_TRANSID)
+      {
+        if (likely((nextPos + TRANSID_SIZE) <= payload_length))
+        {
+          /*
+            Correct length, retrieve transaction id, converting from
+            little endian if necessary.
+          */
+          Uint64 netTransId;
+          memcpy(&netTransId,
+                 &data[ nextPos ],
+                 TRANSID_SIZE);
+          nextPos += TRANSID_SIZE;
+          transactionId = uint8korr((const char*) &netTransId);
+        }
+        else
+        {
+          /*
+             Error - supposed to have transaction id, but
+             buffer too short
+          */
+          return -1;
+        }
+      }
+    }
+  }
+
+  /* We currently ignore other formats of extra binlog info, and
+   * different lengths.
+   */
+
+  return 0;
+}
+
+uchar*
+Ndb_binlog_extra_row_info::generateBuffer()
+{
+  /*
+    Here we write out the buffer in network format,
+    based on the current member settings.
+  */
+  Uint8 nextPos = EXTRA_ROW_INFO_HDR_BYTES;
+
+  if (flags)
+  {
+    /* Write current flags into buff */
+    Uint16 netFlags = uint2korr((const char*) &flags);
+    memcpy(&buff[ nextPos ], &netFlags, FLAGS_SIZE);
+    nextPos += FLAGS_SIZE;
+
+    if (flags & NDB_ERIF_TRANSID)
+    {
+      Uint64 netTransactionId = uint8korr((const char*) &transactionId);
+      memcpy(&buff[ nextPos ], &netTransactionId, TRANSID_SIZE);
+      nextPos += TRANSID_SIZE;
+    }
+
+    assert( nextPos <= MaxLen );
+    /* Set length */
+    assert( buff[ EXTRA_ROW_INFO_FORMAT_OFFSET ] == ERIF_NDB );
+    buff[ EXTRA_ROW_INFO_LEN_OFFSET ] = nextPos;
+
+    return buff;
+  }
+  return 0;
+}
+
+// #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
 #endif

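The buffer generated/parsed above uses a simple self-describing
format: byte 0 is the total length, byte 1 the format tag, then a
little-endian 16-bit flags word, then a little-endian 64-bit
transaction id when NDB_ERIF_TRANSID is set.  A standalone sketch of
that wire format (it assumes a little-endian host where the real
code uses uint2korr/uint8korr, and the ERIF_NDB tag value here is an
assumption):

  #include <cassert>
  #include <cstdint>
  #include <cstdio>
  #include <cstring>

  static const int LEN_OFFSET = 0, FORMAT_OFFSET = 1, HDR_BYTES = 2;
  static const uint8_t ERIF_NDB = 0;      /* assumed tag value */
  static const uint16_t F_TRANSID = 0x1;  /* NDB_ERIF_TRANSID */

  static unsigned encode(uint8_t* buf, uint16_t flags, uint64_t transid)
  {
    unsigned pos = HDR_BYTES;
    buf[FORMAT_OFFSET] = ERIF_NDB;
    memcpy(&buf[pos], &flags, 2); pos += 2;
    if (flags & F_TRANSID) { memcpy(&buf[pos], &transid, 8); pos += 8; }
    buf[LEN_OFFSET] = (uint8_t) pos;      /* length written last */
    return pos;
  }

  int main()
  {
    uint8_t buf[12];
    unsigned len = encode(buf, F_TRANSID, 0xABCDEF0123ULL);
    assert(len == 12 && buf[LEN_OFFSET] == 12);
    uint64_t tid;
    memcpy(&tid, &buf[HDR_BYTES + 2], 8); /* decode on the reader side */
    std::printf("decoded transid = %llx\n", (unsigned long long) tid);
    return 0;
  }
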
=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h	2011-07-04 13:37:56 +0000
+++ b/sql/ha_ndbcluster_binlog.h	2011-09-07 22:50:01 +0000
@@ -382,3 +382,49 @@ void ndbcluster_anyvalue_set_serverid(Ui
 #ifndef DBUG_OFF
 void dbug_ndbcluster_anyvalue_set_userbits(Uint32& anyValue);
 #endif
+
+/*
+   Helper for reading/writing Binlog extra row info
+   in Ndb format.
+   It contains an internal buffer, which can be passed
+   via the thd variable when writing binlog entries,
+   provided the object stays in scope around the write.
+*/
+class Ndb_binlog_extra_row_info
+{
+public:
+  static const Uint32 FLAGS_SIZE = sizeof(Uint16);
+  static const Uint32 TRANSID_SIZE = sizeof(Uint64);
+  static const Uint32 MaxLen =
+    EXTRA_ROW_INFO_HDR_BYTES +
+    FLAGS_SIZE +
+    TRANSID_SIZE;
+
+  static const Uint64 InvalidTransactionId = ~Uint64(0);
+
+  enum Flags
+  {
+    NDB_ERIF_TRANSID = 0x1
+  };
+
+  Ndb_binlog_extra_row_info();
+
+  int loadFromBuffer(const uchar* extra_row_info_ptr);
+
+  Uint16 getFlags() const
+  {
+    return flags;
+  }
+  void setFlags(Uint16 _flags);
+  Uint64 getTransactionId() const
+  { return transactionId; };
+  void setTransactionId(Uint64 _transactionId);
+  uchar* getBuffPtr()
+  { return buff; };
+  uchar* generateBuffer();
+private:
+  uchar buff[MaxLen];
+  Uint16 flags;
+  Uint64 transactionId;
+};
+

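A hedged usage sketch for the class declared above (assumes the NDB
headers and types are in scope; it mirrors the injector/apply paths
but is not lifted from them):

  Uint64 round_trip(Uint64 trans_id)
  {
    Ndb_binlog_extra_row_info out;
    out.setFlags(Ndb_binlog_extra_row_info::NDB_ERIF_TRANSID);
    out.setTransactionId(trans_id);
    uchar* buff = out.generateBuffer();  /* returns 0 when no flags set */
    assert(buff != 0);

    Ndb_binlog_extra_row_info in;
    if (in.loadFromBuffer(buff) != 0)
      return Ndb_binlog_extra_row_info::InvalidTransactionId;
    assert(in.getFlags() & Ndb_binlog_extra_row_info::NDB_ERIF_TRANSID);
    return in.getTransactionId();
  }
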
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-06-30 15:59:25 +0000
+++ b/sql/log_event.cc	2011-09-07 22:50:01 +0000
@@ -1952,6 +1952,32 @@ void Rows_log_event::print_verbose(IO_CA
   const char *sql_command, *sql_clause1, *sql_clause2;
   Log_event_type type_code= get_type_code();
   
+#ifndef MCP_WL5353
+  if (m_extra_row_data)
+  {
+    uint8 extra_data_len= m_extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET];
+    uint8 extra_payload_len= extra_data_len - EXTRA_ROW_INFO_HDR_BYTES;
+    assert(extra_data_len >= EXTRA_ROW_INFO_HDR_BYTES);
+
+    my_b_printf(file, "### Extra row data format: %u, len: %u :",
+                m_extra_row_data[EXTRA_ROW_INFO_FORMAT_OFFSET],
+                extra_payload_len);
+    if (extra_payload_len)
+    {
+      /*
+         Buffer for hex view of string, including '0x' prefix,
+         2 hex chars / byte and trailing 0
+      */
+      const int buff_len= 2 + (256 * 2) + 1;
+      char buff[buff_len];
+      str_to_hex(buff, (const char*) &m_extra_row_data[EXTRA_ROW_INFO_HDR_BYTES],
+                 extra_payload_len);
+      my_b_printf(file, "%s", buff);
+    }
+    my_b_printf(file, "\n");
+  }
+#endif
+
   switch (type_code) {
   case WRITE_ROWS_EVENT:
     sql_command= "INSERT INTO";
@@ -3091,6 +3117,46 @@ int Query_log_event::do_apply_event(Rela
   return do_apply_event(rli, query, q_len);
 }
 
+#ifndef MCP_WL5353
+#ifdef HAVE_NDB_BINLOG
+
+static bool is_silent_error(THD* thd)
+{
+  DBUG_ENTER("is_silent_error");
+  List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
+  MYSQL_ERROR *err;
+
+  while ((err= it++))
+  {
+    DBUG_PRINT("info", ("Error : %u : %s", err->code, err->msg));
+
+    switch (err->code)
+    {
+    case ER_GET_TEMPORARY_ERRMSG:
+    {
+      /*
+         If we are rolling back due to an explicit request, do so
+         silently
+      */
+      if (strcmp(SLAVE_SILENT_RETRY_MSG, err->msg) == 0)
+      {
+        DBUG_PRINT("info", ("Silent retry"));
+        DBUG_RETURN(true);
+      }
+
+      break;
+    }
+    default:
+      break;
+    }
+  }
+
+  DBUG_RETURN(false);
+}
+
+// #ifdef HAVE_NDB_BINLOG
+#endif
+#endif // #ifndef MCP_WL5353
 
 /**
   @todo
@@ -3411,11 +3477,19 @@ Default database: '%s'. Query: '%s'",
     */
     else if (thd->is_slave_error || thd->is_fatal_error)
     {
-      rli->report(ERROR_LEVEL, actual_error,
-                      "Error '%s' on query. Default database: '%s'. Query: '%s'",
-                      (actual_error ? thd->main_da.message() :
-                       "unexpected success or fatal error"),
-                      print_slave_db_safe(thd->db), query_arg);
+#ifndef MCP_WL5353
+#ifdef HAVE_NDB_BINLOG
+      bool be_silent= is_silent_error(thd);
+      if (!be_silent)
+#endif
+#endif // #ifndef MCP_WL5353
+      {
+        rli->report(ERROR_LEVEL, actual_error,
+                    "Error '%s' on query. Default database: '%s'. Query: '%s'",
+                    (actual_error ? thd->main_da.message() :
+                     "unexpected success or fatal error"),
+                    print_slave_db_safe(thd->db), query_arg);
+      }
       thd->is_slave_error= 1;
     }
 
@@ -7128,6 +7202,63 @@ const char *sql_ex_info::init(const char
   return buf;
 }
 
+#ifndef MCP_WL5353
+#ifndef DBUG_OFF
+#ifndef MYSQL_CLIENT
+static uchar dbug_extra_row_data_val= 0;
+
+/**
+   set_extra_data
+
+   Called during self-test to generate various
+   self-consistent binlog row event extra
+   thread data structures which can be checked
+   when reading the binlog.
+
+   @param thd  Current thd
+   @param arr  Buffer to use
+*/
+void set_extra_data(THD* thd, uchar* arr)
+{
+  assert(thd->binlog_row_event_extra_data == NULL);
+  uchar val= (dbug_extra_row_data_val++) %
+    (EXTRA_ROW_INFO_MAX_PAYLOAD + 1); /* 0 .. MAX_PAYLOAD */
+  arr[EXTRA_ROW_INFO_LEN_OFFSET]= val + EXTRA_ROW_INFO_HDR_BYTES;
+  arr[EXTRA_ROW_INFO_FORMAT_OFFSET]= val;
+  for (uchar i=0; i<val; i++)
+    arr[EXTRA_ROW_INFO_HDR_BYTES+i]= val;
+
+  thd->binlog_row_event_extra_data= arr;
+}
+
+#endif // #ifndef MYSQL_CLIENT
+
+/**
+   check_extra_data
+
+   Called during self-test to check that
+   binlog row event extra data is self-
+   consistent as defined by the set_extra_data
+   function above.
+
+   Will assert(false) if not.
+
+   @param extra_row_data
+*/
+void check_extra_data(uchar* extra_row_data)
+{
+  assert(extra_row_data);
+  uint16 len= extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET];
+  uint8 val= len - EXTRA_ROW_INFO_HDR_BYTES;
+  assert(extra_row_data[EXTRA_ROW_INFO_FORMAT_OFFSET] == val);
+  for (uint16 i= 0; i < val; i++)
+  {
+    assert(extra_row_data[EXTRA_ROW_INFO_HDR_BYTES + i] == val);
+  }
+}
+
+#endif  // #ifndef DBUG_OFF
+#endif  // #ifndef MCP_WL5353
 
 /**************************************************************************
 	Rows_log_event member functions
@@ -7141,7 +7272,10 @@ Rows_log_event::Rows_log_event(THD *thd_
     m_table(tbl_arg),
     m_table_id(tid),
     m_width(tbl_arg ? tbl_arg->s->fields : 1),
-    m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0) 
+    m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
+#ifndef MCP_WL5353
+    ,m_extra_row_data(0)
+#endif
 #ifdef HAVE_REPLICATION
     , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
 #endif
@@ -7159,6 +7293,33 @@ Rows_log_event::Rows_log_event(THD *thd_
       set_flags(NO_FOREIGN_KEY_CHECKS_F);
   if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
       set_flags(RELAXED_UNIQUE_CHECKS_F);
+#ifndef MCP_WL5353
+#ifndef DBUG_OFF
+  uchar extra_data[255];
+  DBUG_EXECUTE_IF("extra_row_data_set",
+                  /* Set extra row data to a known value */
+                  set_extra_data(thd_arg, extra_data););
+#endif
+  if (thd_arg->binlog_row_event_extra_data)
+  {
+    /* Copy Extra data from thd into new event */
+    uint16 extra_data_len= thd_arg->get_binlog_row_event_extra_data_len();
+    assert(extra_data_len >= EXTRA_ROW_INFO_HDR_BYTES);
+
+    m_extra_row_data= (uchar*) my_malloc(extra_data_len, MYF(MY_WME));
+
+    if (likely(m_extra_row_data))
+    {
+      memcpy(m_extra_row_data, thd_arg->binlog_row_event_extra_data,
+             extra_data_len);
+      set_flags(EXTRA_ROW_EV_DATA_F);
+    }
+  }
+
+  DBUG_EXECUTE_IF("extra_row_data_set",
+                  thd_arg->binlog_row_event_extra_data = NULL;);
+#endif // #ifndef MCP_WL5353
+
   /* if bitmap_init fails, caught in is_valid() */
   if (likely(!bitmap_init(&m_cols,
                           m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
@@ -7190,6 +7351,9 @@ Rows_log_event::Rows_log_event(const cha
     m_table(NULL),
 #endif
     m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
+#ifndef MCP_WL5353
+    ,m_extra_row_data(0)
+#endif
 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
     , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
 #endif
@@ -7219,8 +7383,35 @@ Rows_log_event::Rows_log_event(const cha
 
   m_flags= uint2korr(post_start);
 
+#ifndef MCP_WL5353
+  uint16 extra_data_len= 0;
+  if ((m_flags & EXTRA_ROW_EV_DATA_F))
+  {
+    const uchar* extra_data_start= (const uchar*) post_start + 2;
+    extra_data_len= extra_data_start[EXTRA_ROW_INFO_LEN_OFFSET];
+    assert(m_extra_row_data == 0);
+    assert(extra_data_len >= EXTRA_ROW_INFO_HDR_BYTES);
+    DBUG_PRINT("debug", ("extra_data_len = %u",
+                         extra_data_len));
+
+    m_extra_row_data= (uchar*) my_malloc(extra_data_len,
+                                         MYF(MY_WME));
+    if (likely(m_extra_row_data))
+    {
+      memcpy(m_extra_row_data, extra_data_start, extra_data_len);
+    }
+  }
+  DBUG_EXECUTE_IF("extra_row_data_check",
+                  /* Check extra data has expected value */
+                  check_extra_data(m_extra_row_data););
+#endif // #ifndef MCP_WL5353
+
   uchar const *const var_start=
-    (const uchar *)buf + common_header_len + post_header_len;
+    (const uchar *)buf + common_header_len + post_header_len
+#ifndef MCP_WL5353
+    + extra_data_len
+#endif
+    ;
   uchar const *const ptr_width= var_start;
   uchar *ptr_after_width= (uchar*) ptr_width;
   DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
@@ -7300,6 +7491,9 @@ Rows_log_event::~Rows_log_event()
     m_cols.bitmap= 0; // so no my_free in bitmap_free
   bitmap_free(&m_cols); // To pair with bitmap_init().
   my_free((uchar*)m_rows_buf, MYF(MY_ALLOW_ZERO_PTR));
+#ifndef MCP_WL5353
+  my_free((uchar*)m_extra_row_data, MYF(MY_ALLOW_ZERO_PTR));
+#endif
 }
 
 int Rows_log_event::get_data_size()
@@ -7316,6 +7510,11 @@ int Rows_log_event::get_data_size()
   int data_size= ROWS_HEADER_LEN;
   data_size+= no_bytes_in_map(&m_cols);
   data_size+= (uint) (end - buf);
+#ifndef MCP_WL5353
+  data_size+= m_extra_row_data ?
+    m_extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET] :
+    0;
+#endif
 
   if (type_code == UPDATE_ROWS_EVENT)
     data_size+= no_bytes_in_map(&m_cols_ai);
@@ -7466,6 +7665,14 @@ int Rows_log_event::do_apply_event(Relay
         thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
     else
         thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
+
+#ifndef MCP_WL5353
+    if (get_flags(EXTRA_ROW_EV_DATA_F))
+        thd->binlog_row_event_extra_data = m_extra_row_data;
+    else
+        thd->binlog_row_event_extra_data = NULL;
+#endif
+
     /* A small test to verify that objects have consistent types */
     DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
 
@@ -7596,6 +7803,12 @@ int Rows_log_event::do_apply_event(Relay
     else
       thd->options&= ~OPTION_ALLOW_BATCH;
 #endif
+#ifndef MCP_WL5353
+    if (get_flags(EXTRA_ROW_EV_DATA_F))
+        thd->binlog_row_event_extra_data = m_extra_row_data;
+    else
+        thd->binlog_row_event_extra_data = NULL;
+#endif
     /* A small test to verify that objects have consistent types */
     DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
 
@@ -7915,7 +8128,22 @@ bool Rows_log_event::write_data_header(I
                   });
   int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
   int2store(buf + RW_FLAGS_OFFSET, m_flags);
+#ifndef MCP_WL5353
+  int rc = my_b_safe_write(file, buf, ROWS_HEADER_LEN);
+
+  if ((rc == 0) &&
+      (m_flags & EXTRA_ROW_EV_DATA_F))
+  {
+    /* Write extra row data */
+    rc = my_b_safe_write(file, m_extra_row_data,
+                         m_extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET]);
+  }
+
+  /* Function returns bool, where false(0) is success :( */
+  return (rc != 0);
+#else
   return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
+#endif
 }
 
 bool Rows_log_event::write_data_body(IO_CACHE*file)

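The header/trailer layout used by write_data_header() and
get_data_size() above reduces to: a fixed post-header, then, when
EXTRA_ROW_EV_DATA_F is set, a blob whose first byte is its own total
length.  A minimal standalone sketch of that size accounting:

  #include <cstdint>
  #include <cstdio>

  static const uint16_t EXTRA_ROW_EV_DATA_F = 1u << 4;

  static unsigned event_header_size(uint16_t flags,
                                    const uint8_t* extra_row_data,
                                    unsigned rows_header_len)
  {
    unsigned sz = rows_header_len;
    if ((flags & EXTRA_ROW_EV_DATA_F) && extra_row_data)
      sz += extra_row_data[0]; /* first byte of the blob is its length */
    return sz;
  }

  int main()
  {
    uint8_t extra[4] = { 4, 0, 0xAB, 0xCD }; /* len=4, format=0, 2 payload bytes */
    std::printf("header size = %u\n",
                event_header_size(EXTRA_ROW_EV_DATA_F, extra, 8));
    return 0;
  }
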
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2011-06-30 15:59:25 +0000
+++ b/sql/log_event.h	2011-09-07 22:50:01 +0000
@@ -3509,6 +3509,13 @@ public:
       values for all columns of the table.
      */
     COMPLETE_ROWS_F = (1U << 3)
+
+#ifndef MCP_WL5353
+    /**
+       Indicates that additional information was appended to the event.
+    */
+    ,EXTRA_ROW_EV_DATA_F = (1U << 4)
+#endif
   };
 
   typedef uint16 flag_set;
@@ -3572,6 +3579,10 @@ public:
 
   uint     m_row_count;         /* The number of rows added to the event */
 
+#ifndef MCP_WL5353
+  const uchar* get_extra_row_data() const   { return m_extra_row_data; }
+#endif
+
 protected:
   /* 
      The constructors are protected since you're supposed to inherit
@@ -3620,6 +3631,11 @@ protected:
 
   flag_set m_flags;		/* Flags for row-level events */
 
+#ifndef MCP_WL5353
+  uchar    *m_extra_row_data;   /* Pointer to extra row data if any */
+                                /* If non null, first byte is length */
+#endif
+
   /* helper functions */
 
 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)

=== added file 'sql/ndb_conflict_trans.cc'
--- a/sql/ndb_conflict_trans.cc	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_conflict_trans.cc	2011-09-07 22:50:01 +0000
@@ -0,0 +1,778 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include "ndb_conflict_trans.h"
+
+#ifdef HAVE_NDB_BINLOG
+#include "my_sys.h"
+#include "my_base.h"
+
+/* Whether to track all transactions, or just
+ * 'interesting' ones
+ */
+#define TRACK_ALL_TRANSACTIONS 0
+
+/* Whether to check the transaction graph for
+ * correctness at runtime
+ */
+#define CHECK_TRANS_GRAPH 0
+
+/* st_row_event_key_info implementation */
+
+st_row_event_key_info::
+st_row_event_key_info(const NdbDictionary::Table* _table,
+                      const uchar* _key_buff,
+                      Uint32 _key_buff_len,
+                      Uint64 _transaction_id):
+  tableObj(_table),
+  packed_key(_key_buff),
+  packed_key_len(_key_buff_len),
+  transaction_id(_transaction_id),
+  hash_next(NULL)
+{
+}
+
+Uint64
+st_row_event_key_info::getTransactionId() const
+{
+  return transaction_id;
+}
+
+void
+st_row_event_key_info::updateRowTransactionId(Uint64 mostRecentTransId)
+{
+  transaction_id = mostRecentTransId;
+}
+
+Uint32
+st_row_event_key_info::hashValue() const
+{
+  /* Include Table Object Id + primary key */
+  Uint32 h = (17 * 37) + tableObj->getObjectId();
+  for (Uint32 i=0; i < packed_key_len; i++)
+    h = (37 * h) + packed_key[i];
+  return h;
+}
+
+bool
+st_row_event_key_info::equal(const st_row_event_key_info* other) const
+{
+  /* Check same table + same PK */
+  return
+    ((tableObj == other->tableObj) &&
+     (packed_key_len == other->packed_key_len) &&
+     (memcmp(packed_key, other->packed_key, packed_key_len) == 0));
+};
+
+st_row_event_key_info*
+st_row_event_key_info::getNext() const
+{
+  return hash_next;
+};
+
+void
+st_row_event_key_info::setNext(st_row_event_key_info* _next)
+{
+  hash_next = _next;
+};
+
+
+/* st_trans_dependency implementation */
+
+st_trans_dependency::
+st_trans_dependency(st_transaction* _target_transaction,
+                    st_transaction* _dependent_transaction,
+                    const st_trans_dependency* _next)
+  : target_transaction(_target_transaction),
+    dependent_transaction(_dependent_transaction),
+    next_entry(_next),
+    hash_next(NULL)
+{
+};
+
+st_transaction*
+st_trans_dependency::getTargetTransaction() const
+{
+  return target_transaction;
+}
+
+st_transaction*
+st_trans_dependency::getDependentTransaction() const
+{
+  return dependent_transaction;
+}
+
+const st_trans_dependency*
+st_trans_dependency::getNextDependency() const
+{
+  return next_entry;
+}
+
+Uint32
+st_trans_dependency::hashValue() const
+{
+  /* Hash the ptrs in a rather nasty way */
+  UintPtr p =
+    ((UintPtr) target_transaction) ^
+    ((UintPtr) dependent_transaction);
+
+  if (sizeof(p) == 8)
+  {
+    /* Xor two words of 64 bit ptr */
+    p =
+      (p & 0xffffffff) ^
+      ((((Uint64) p) >> 32) & 0xffffffff);
+  }
+
+  return 17 + (37 * (Uint32)p);
+};
+
+bool
+st_trans_dependency::equal(const st_trans_dependency* other) const
+{
+  return ((target_transaction == other->target_transaction) &&
+          (dependent_transaction == other->dependent_transaction));
+};
+
+st_trans_dependency*
+st_trans_dependency::getNext() const
+{
+  return hash_next;
+};
+
+void
+st_trans_dependency::setNext(st_trans_dependency* _next)
+{
+  hash_next = _next;
+};
+
+
+/* st_transaction implementation */
+
+st_transaction::st_transaction(Uint64 _transaction_id)
+  : transaction_id(_transaction_id),
+    in_conflict(false),
+    dependency_list_head(NULL),
+    hash_next(NULL)
+{
+};
+
+Uint64
+st_transaction::getTransactionId() const
+{
+  return transaction_id;
+}
+
+bool
+st_transaction::getInConflict() const
+{
+  return in_conflict;
+}
+
+void
+st_transaction::setInConflict()
+{
+  in_conflict = true;
+}
+
+const st_trans_dependency*
+st_transaction::getDependencyListHead() const
+{
+  return dependency_list_head;
+}
+
+void
+st_transaction::setDependencyListHead(st_trans_dependency* head)
+{
+  dependency_list_head = head;
+}
+
+/* Hash Api */
+Uint32
+st_transaction::hashValue() const
+{
+  return 17 + (37 * ((transaction_id & 0xffffffff) ^
+                     (transaction_id >> 32 & 0xffffffff)));
+};
+
+bool
+st_transaction::equal(const st_transaction* other) const
+{
+  return transaction_id == other->transaction_id;
+};
+
+st_transaction*
+st_transaction::getNext() const
+{
+    return hash_next;
+};
+
+void
+st_transaction::setNext(st_transaction* _next)
+{
+  hash_next = _next;
+};
+
+
+
+/*
+   Unique HashMap(Set) of st_row_event_key_info ptrs, with bucket storage
+   allocated from st_mem_root_allocator
+*/
+template class HashMap2<st_row_event_key_info, true, st_mem_root_allocator>;
+template class HashMap2<st_transaction, true, st_mem_root_allocator>;
+template class HashMap2<st_trans_dependency, true, st_mem_root_allocator>;
+template class LinkedStack<Uint64, st_mem_root_allocator>;
+
+
+/**
+ * pack_key_to_buffer
+ *
+ * Given a table, key_record and record, this method will
+ * determine how many significant bytes the key contains,
+ * and if a buffer is passed, will copy the bytes into the
+ * buffer.
+ */
+static
+int
+pack_key_to_buffer(const NdbDictionary::Table* table,
+                   const NdbRecord* key_rec,
+                   const uchar* record,
+                   uchar* buffer,
+                   Uint32& buff_len)
+{
+  /* Loop over attributes in key record, determining their actual
+   * size based on column type and contents of record
+   * If buffer supplied, copy them contiguously to buffer
+   * return total length
+   */
+  Uint32 attr_id;
+  Uint32 buff_offset = 0;
+  NdbDictionary::getFirstAttrId(key_rec, attr_id);
+
+  do
+  {
+    Uint32 from_offset = 0;
+    Uint32 byte_len = 0;
+    const NdbDictionary::Column* key_col = table->getColumn(attr_id);
+    NdbDictionary::getOffset(key_rec, attr_id, from_offset);
+    assert( ! NdbDictionary::isNull(key_rec, (const char*) record, attr_id));
+
+    switch(key_col->getArrayType())
+    {
+    case NDB_ARRAYTYPE_FIXED:
+      byte_len = key_col->getSizeInBytes();
+      break;
+    case NDB_ARRAYTYPE_SHORT_VAR:
+      byte_len = record[from_offset];
+      from_offset++;
+      break;
+    case NDB_ARRAYTYPE_MEDIUM_VAR:
+      byte_len = uint2korr(&record[from_offset]);
+      from_offset+= 2;
+      break;
+    };
+    assert( (buff_offset + byte_len) <= buff_len );
+
+    if (buffer)
+      memcpy(&buffer[buff_offset], &record[from_offset], byte_len);
+
+    buff_offset+= byte_len;
+  } while (NdbDictionary::getNextAttrId(key_rec, attr_id));
+
+  buff_len = buff_offset;
+  return 0;
+};
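
pack_key_to_buffer() is written for a two-pass pattern: called once
with a NULL buffer to learn the packed size (see
determine_packed_key_size() below), then again to copy.  A
standalone sketch of the pattern, with NDB's fixed/var-length column
decoding elided:

  #include <cstdint>
  #include <cstdio>
  #include <cstring>

  struct Col { const uint8_t* data; uint32_t len; };

  /* Sizing pass when buf is NULL, copying pass otherwise;
   * returns the total packed length either way. */
  static uint32_t pack_cols(const Col* cols, int ncols, uint8_t* buf)
  {
    uint32_t off = 0;
    for (int i = 0; i < ncols; i++)
    {
      if (buf)
        memcpy(buf + off, cols[i].data, cols[i].len);
      off += cols[i].len;
    }
    return off;
  }

  int main()
  {
    const uint8_t a[] = {1,2,3,4}, b[] = {9};
    Col cols[2] = { {a, 4}, {b, 1} };
    uint32_t need = pack_cols(cols, 2, 0);  /* sizing pass */
    uint8_t buf[16];
    pack_cols(cols, 2, buf);                /* copying pass */
    std::printf("packed %u bytes\n", (unsigned) need);
    return 0;
  }
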
+
+static
+Uint32 determine_packed_key_size(const NdbDictionary::Table* table,
+                                 const NdbRecord* key_rec,
+                                 const uchar* record)
+{
+  Uint32 key_size = ~Uint32(0);
+  /* Use pack_key_to_buffer to calculate length required */
+  pack_key_to_buffer(table,
+                     key_rec,
+                     record,
+                     NULL,
+                     key_size);
+  return key_size;
+};
+
+/* st_mem_root_allocator implementation */
+void*
+st_mem_root_allocator::alloc(void* ctx, size_t bytes)
+{
+  st_mem_root_allocator* a = (st_mem_root_allocator*) ctx;
+  return alloc_root(a->mem_root, bytes);
+};
+
+void*
+st_mem_root_allocator::calloc(void* ctx, size_t nelem, size_t bytes)
+{
+  st_mem_root_allocator* a = (st_mem_root_allocator*) ctx;
+  return alloc_root(a->mem_root,
+                    nelem * bytes);
+}
+
+void
+st_mem_root_allocator::free(void* ctx, void* mem)
+{
+  /* Do nothing; memory will be freed globally when the arena
+   * (mem_root) is released
+   */
+};
+
+st_mem_root_allocator::st_mem_root_allocator(MEM_ROOT* _mem_root)
+  : mem_root(_mem_root)
+{
+};
+
+
+/* DependencyTracker implementation */
+
+DependencyTracker*
+DependencyTracker::newDependencyTracker(MEM_ROOT* mem_root)
+{
+  DependencyTracker* dt = NULL;
+  void* mem = alloc_root(mem_root,
+                         sizeof(DependencyTracker));
+  if (mem)
+  {
+    dt = new (mem) DependencyTracker(mem_root);
+  }
+
+  return dt;
+}
+
+
+DependencyTracker::
+DependencyTracker(MEM_ROOT* mem_root)
+  : mra(mem_root), key_hash(&mra), trans_hash(&mra),
+    dependency_hash(&mra),
+    iteratorTodo(ITERATOR_STACK_BLOCKSIZE, &mra),
+    conflicting_trans_count(0),
+    error_text(NULL)
+{
+  /* TODO Get sizes from somewhere */
+  key_hash.setSize(1024);
+  trans_hash.setSize(100);
+  dependency_hash.setSize(100);
+};
+
+
+int
+DependencyTracker::
+track_operation(const NdbDictionary::Table* table,
+                const NdbRecord* key_rec,
+                const uchar* row,
+                Uint64 transaction_id)
+{
+  DBUG_ENTER("track_operation");
+
+  Uint32 required_buff_size = determine_packed_key_size(table,
+                                                        key_rec,
+                                                        row);
+  DBUG_PRINT("info", ("Required length for key : %u", required_buff_size));
+
+  /* Alloc space for packed key and struct*/
+  uchar* packed_key_buff = (uchar*) alloc_root(mra.mem_root,
+                                               required_buff_size);
+  void* element_mem = alloc_root(mra.mem_root,
+                                 sizeof(st_row_event_key_info));
+
+  if (pack_key_to_buffer(table,
+                         key_rec,
+                         row,
+                         packed_key_buff,
+                         required_buff_size))
+  {
+    DBUG_RETURN(-1);
+  }
+
+  if (TRACK_ALL_TRANSACTIONS)
+  {
+    st_transaction* transEntry = get_or_create_transaction(transaction_id);
+    if (!transEntry)
+    {
+      error_text = "track_operation : Failed to get or create transaction";
+      DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+    }
+  }
+
+  st_row_event_key_info* key_info = new (element_mem)
+    st_row_event_key_info(table,
+                          packed_key_buff,
+                          required_buff_size,
+                          transaction_id);
+
+  /* Now try to add element to hash */
+  if (! key_hash.add(key_info))
+  {
+    /*
+      Already an element in the keyhash with this primary key
+      If it's for the same transaction then ignore, otherwise
+      it's an inter-transaction dependency
+    */
+    st_row_event_key_info* existing = key_hash.get(key_info);
+
+    Uint64 existingTransIdOnRow = existing->getTransactionId();
+    Uint64 newTransIdOnRow = key_info->getTransactionId();
+
+    if (existingTransIdOnRow != newTransIdOnRow)
+    {
+      int res = add_dependency(existingTransIdOnRow,
+                               newTransIdOnRow);
+      /*
+        Update stored transaction_id to be latest for key.
+        Further key operations on this row will depend on this
+        transaction, and transitively on the previous
+        transaction.
+      */
+      existing->updateRowTransactionId(newTransIdOnRow);
+
+      DBUG_RETURN(res);
+    }
+    else
+    {
+      /*
+         How can we have two updates to the same row with the
+         same transaction id?  Only if the transaction id
+         is invalid (e.g. not set)
+      */
+      if (existingTransIdOnRow != InvalidTransactionId)
+      {
+        assert(false);
+        DBUG_RETURN(-1);
+      }
+    }
+  }
+
+  DBUG_RETURN(0);
+};
+
+int
+DependencyTracker::
+mark_conflict(Uint64 trans_id)
+{
+  DBUG_ENTER("mark_conflict");
+  DBUG_PRINT("info", ("trans_id : %llu", trans_id));
+
+  st_transaction* entry = get_or_create_transaction(trans_id);
+  if (!entry)
+  {
+    error_text = "mark_conflict : get_or_create_transaction() failure";
+    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+  }
+
+  if (entry->getInConflict())
+  {
+    /* Nothing to do here */
+    DBUG_RETURN(0);
+  }
+
+  /* Have entry, mark it, and any dependents */
+  bool fetch_node_dependents;
+  st_transaction* dependent = entry;
+  reset_dependency_iterator();
+  do
+  {
+    DBUG_PRINT("info", ("Visiting transaction %llu, conflict : %u",
+                        dependent->getTransactionId(),
+                        dependent->getInConflict()));
+    /*
+      If marked already, don't fetch dependents, as
+      they will also be marked already
+    */
+    fetch_node_dependents = false;
+
+    if (!dependent->getInConflict())
+    {
+      dependent->setInConflict();
+      conflicting_trans_count++;
+      fetch_node_dependents = true;
+    }
+  } while ((dependent = get_next_dependency(dependent, fetch_node_dependents)));
+
+  assert( verify_graph() );
+
+  DBUG_RETURN(0);
+}
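
mark_conflict() above is a depth-first walk over the dependency
graph: mark the root, then every transitively dependent transaction,
skipping nodes already marked (their dependents are guaranteed to be
marked already).  A standalone sketch of that propagation, using STL
containers in place of the custom hashes:

  #include <cstdio>
  #include <map>
  #include <set>
  #include <stack>
  #include <vector>

  typedef unsigned long long TransId;
  typedef std::map<TransId, std::vector<TransId> > DependentsMap;

  static void mark_conflict(TransId root,
                            const DependentsMap& dependents,
                            std::set<TransId>& in_conflict)
  {
    std::stack<TransId> todo;
    todo.push(root);
    while (!todo.empty())
    {
      TransId t = todo.top(); todo.pop();
      if (!in_conflict.insert(t).second)
        continue;                    /* already marked: subtree done */
      DependentsMap::const_iterator it = dependents.find(t);
      if (it != dependents.end())
        for (size_t i = 0; i < it->second.size(); i++)
          todo.push(it->second[i]);
    }
  }

  int main()
  {
    DependentsMap deps;
    deps[1].push_back(2);  /* trans 2 depends on trans 1 */
    deps[2].push_back(3);  /* trans 3 depends on trans 2 */
    std::set<TransId> bad;
    mark_conflict(1, deps, bad);
    std::printf("%u transactions in conflict\n", (unsigned) bad.size());
    return 0;
  }
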
+
+bool
+DependencyTracker::in_conflict(Uint64 trans_id)
+{
+  DBUG_ENTER("in_conflict");
+  DBUG_PRINT("info", ("trans_id %llu", trans_id));
+  st_transaction key(trans_id);
+  const st_transaction* entry = NULL;
+
+  /*
+    If transaction hash entry exists, check it for
+    conflicts.  If it doesn't exist, no conflict
+  */
+  if ((entry = trans_hash.get(&key)))
+  {
+    DBUG_PRINT("info", ("in_conflict : %u", entry->getInConflict()));
+    DBUG_RETURN(entry->getInConflict());
+  }
+  else
+  {
+    assert(! TRACK_ALL_TRANSACTIONS);
+  }
+  DBUG_RETURN(false);
+};
+
+st_transaction*
+DependencyTracker::
+get_or_create_transaction(Uint64 trans_id)
+{
+  DBUG_ENTER("get_or_create_transaction");
+  st_transaction transKey(trans_id);
+  st_transaction* transEntry = NULL;
+
+  if (! (transEntry = trans_hash.get(&transKey)))
+  {
+    /*
+       Transaction does not exist.  Allocate it
+       and add to the hash
+    */
+    DBUG_PRINT("info", ("Creating new hash entry for transaction (%llu)",
+                        trans_id));
+
+    transEntry = (st_transaction*)
+      st_mem_root_allocator::alloc(&mra, sizeof(st_transaction));
+
+    if (transEntry)
+    {
+      new (transEntry) st_transaction(trans_id);
+
+      if (!trans_hash.add(transEntry))
+      {
+        st_mem_root_allocator::free(&mra, transEntry); /* For show */
+        transEntry = NULL;
+      }
+    }
+  }
+
+  DBUG_RETURN(transEntry);
+}
+
+int
+DependencyTracker::
+add_dependency(Uint64 trans_id, Uint64 dependent_trans_id)
+{
+  DBUG_ENTER("add_dependency");
+  DBUG_PRINT("info", ("Recording dependency of %llu on %llu",
+                      dependent_trans_id, trans_id));
+  st_transaction* targetEntry = get_or_create_transaction(trans_id);
+  if (!targetEntry)
+  {
+    error_text = "add_dependency : Failed get_or_create_transaction";
+    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+  }
+
+  st_transaction* dependentEntry = get_or_create_transaction(dependent_trans_id);
+  if (!dependentEntry)
+  {
+    error_text = "add_dependency : Failed get_or_create_transaction";
+    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+  }
+
+  /* Now lookup dependency.  Add it if not already present */
+  st_trans_dependency depKey(targetEntry, dependentEntry, NULL);
+  st_trans_dependency* dep = NULL;
+  if (! (dep = dependency_hash.get(&depKey)))
+  {
+    DBUG_PRINT("info", ("Creating new dependency hash entry for "
+                        "dependency of %llu on %llu.",
+                        dependentEntry->getTransactionId(),
+                        targetEntry->getTransactionId()));
+
+    dep = (st_trans_dependency*)
+      st_mem_root_allocator::alloc(&mra, sizeof(st_trans_dependency));
+
+    new (dep) st_trans_dependency(targetEntry, dependentEntry,
+                                  targetEntry->getDependencyListHead());
+
+    targetEntry->setDependencyListHead(dep);
+
+    /* New dependency, propagate in_conflict if necessary */
+    if (targetEntry->getInConflict())
+    {
+      DBUG_PRINT("info", ("Marking new dependent as in-conflict"));
+      DBUG_RETURN(mark_conflict(dependentEntry->getTransactionId()));
+    }
+  }
+
+  assert(verify_graph());
+
+  DBUG_RETURN(0);
+}
+
+void
+DependencyTracker::
+reset_dependency_iterator()
+{
+  iteratorTodo.reset();
+}
+
+st_transaction*
+DependencyTracker::
+get_next_dependency(const st_transaction* current,
+                    bool include_dependents_of_current)
+{
+  DBUG_ENTER("get_next_dependency");
+  DBUG_PRINT("info", ("node : %llu", current->getTransactionId()));
+  /*
+    Depth first traversal, with the option to ignore sub-graphs.
+  */
+  if (include_dependents_of_current)
+  {
+    /* Add dependents to stack */
+    const st_trans_dependency* dependency = current->getDependencyListHead();
+
+    while (dependency)
+    {
+      assert(dependency->getTargetTransaction() == current);
+      DBUG_PRINT("info", ("Adding dependency %llu->%llu",
+                          dependency->getDependentTransaction()->getTransactionId(),
+                          dependency->getTargetTransaction()->getTransactionId()));
+
+      Uint64 dependentTransactionId =
+        dependency->getDependentTransaction()->getTransactionId();
+      iteratorTodo.push(dependentTransactionId);
+      dependency= dependency->getNextDependency();
+    }
+  }
+
+  Uint64 nextId;
+  if (iteratorTodo.pop(nextId))
+  {
+    DBUG_PRINT("info", ("Returning transaction id %llu",
+                        nextId));
+    st_transaction key(nextId);
+    st_transaction* dependent = trans_hash.get(&key);
+    assert(dependent);
+    DBUG_RETURN(dependent);
+  }
+
+  assert(iteratorTodo.size() == 0);
+  DBUG_PRINT("info", ("No more dependencies to visit"));
+  DBUG_RETURN(NULL);
+}
+
+void
+DependencyTracker::
+dump_dependents(Uint64 trans_id)
+{
+  fprintf(stderr, "Dumping dependents of transid %llu : ", trans_id);
+
+  st_transaction key(trans_id);
+  const st_transaction* dependent = NULL;
+
+  if ((dependent = trans_hash.get(&key)))
+  {
+    reset_dependency_iterator();
+    const char* comma = ", ";
+    const char* sep = "";
+    do
+    {
+      fprintf(stderr, "%s%llu%s", sep, dependent->getTransactionId(),
+              (dependent->getInConflict()?"-C":""));
+      sep = comma;
+    } while ((dependent = get_next_dependency(dependent)));
+    fprintf(stderr, "\n");
+  }
+  else
+  {
+    fprintf(stderr, "None\n");
+  }
+}
+
+bool
+DependencyTracker::
+verify_graph()
+{
+  if (! CHECK_TRANS_GRAPH)
+    return true;
+
+  /*
+     Check the graph structure obeys its invariants
+
+     1) There are no cycles in the graph such that
+        a transaction is a dependent of itself
+
+     2) If a transaction is marked in_conflict, all
+        of its dependents (transitively), are also
+        marked in conflict
+
+     This is expensive to verify, so not always on
+  */
+  HashMap2<st_transaction, true, st_mem_root_allocator>::Iterator it(trans_hash);
+
+  st_transaction* root = NULL;
+
+  while ((root = it.next()))
+  {
+    bool in_conflict = root->getInConflict();
+
+    /* Now visit all dependents */
+    st_transaction* dependent = root;
+    reset_dependency_iterator();
+
+    while((dependent = get_next_dependency(dependent, true)))
+    {
+      if (dependent == root)
+      {
+        /* Must exit, or we'll be here forever */
+        fprintf(stderr, "Error : Cycle discovered in graph\n");
+        abort();
+        return false;
+      }
+
+      if (in_conflict &&
+          ! dependent->getInConflict())
+      {
+        fprintf(stderr, "Error : Dependent transaction not marked in-conflict\n");
+        abort();
+        return false;
+      }
+    }
+  }
+
+  return true;
+}
+
+
+const char*
+DependencyTracker::get_error_text() const
+{
+  return error_text;
+}
+
+Uint32
+DependencyTracker::get_conflict_count() const
+{
+  return conflicting_trans_count;
+}
+
+/* #ifdef HAVE_NDB_BINLOG */
+
+#endif

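The mark_conflict() path above, together with get_next_dependency() and the iteratorTodo stack, is effectively a transitive closure over the dependents graph. A minimal standalone sketch of the same idea, using STL containers in place of the NDB HashMap2/LinkedStack types (illustrative only, not code from this tree):

  #include <cassert>
  #include <cstdio>
  #include <map>
  #include <stack>
  #include <vector>

  /* Hypothetical stand-in for a trans_hash entry */
  struct Txn
  {
    Txn() : in_conflict(false) {}
    bool in_conflict;
    std::vector<unsigned long long> dependents;
  };

  /* Mark trans_id and, transitively, everything depending on it.
     Already-marked nodes are not expanded again, so shared sub-graphs
     are visited once and a cycle cannot loop forever. */
  static unsigned mark_conflict(std::map<unsigned long long, Txn>& graph,
                                unsigned long long trans_id)
  {
    unsigned marked = 0;
    std::stack<unsigned long long> todo;  /* plays the iteratorTodo role */
    todo.push(trans_id);
    while (!todo.empty())
    {
      unsigned long long id = todo.top();
      todo.pop();
      Txn& t = graph[id];
      if (t.in_conflict)
        continue;
      t.in_conflict = true;
      marked++;
      for (size_t i = 0; i < t.dependents.size(); i++)
        todo.push(t.dependents[i]);
    }
    return marked;
  }

  int main()
  {
    std::map<unsigned long long, Txn> g;
    g[1].dependents.push_back(2);  /* txn 2 depends on txn 1 */
    g[2].dependents.push_back(3);  /* txn 3 depends on txn 2 */
    assert(mark_conflict(g, 1) == 3);
    assert(g[3].in_conflict);
    printf("all three transactions marked in-conflict\n");
    return 0;
  }
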
=== added file 'sql/ndb_conflict_trans.h'
--- a/sql/ndb_conflict_trans.h	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_conflict_trans.h	2011-09-07 22:50:01 +0000
@@ -0,0 +1,337 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef NDB_CONFLICT_TRANS_H
+#define NDB_CONFLICT_TRANS_H
+
+#include <my_global.h>
+
+#ifdef HAVE_NDB_BINLOG
+#include <ndbapi/NdbApi.hpp>
+#include <util/HashMap2.hpp>
+#include <util/LinkedStack.hpp>
+
+/*
+   This file defines structures for detecting dependencies between
+   transactions based on the rows they update.
+   It is used when applying row updates as part of the MySQLD slave.
+*/
+
+/**
+ * st_row_event_key_info
+ *
+ * This struct describes a row event applied by the Slave, based
+ * on its table, key and transaction id.
+ * Instances of this struct are placed in a hash structure where
+ * the {table, key} are the key, and the transaction id is the
+ * 'data'.
+ * This hash is used to detect when different transactions in an
+ * epoch affect the same row, which implies a dependency between
+ * the transactions.
+ */
+class st_row_event_key_info
+{
+public:
+  /**
+   * User api
+   */
+  st_row_event_key_info(const NdbDictionary::Table* _table,
+                        const uchar* _key_buff,
+                        Uint32 _key_buff_len,
+                        Uint64 _transaction_id);
+  Uint64 getTransactionId() const;
+  void updateRowTransactionId(Uint64 mostRecentTransId);
+
+  /**
+   * Hash Api
+   */
+  Uint32 hashValue() const;
+  bool equal(const st_row_event_key_info* other) const;
+  st_row_event_key_info* getNext() const;
+  void setNext(st_row_event_key_info* _next);
+
+private:
+  /* Key : Table and Primary Key */
+  const NdbDictionary::Table* tableObj;
+  const uchar* packed_key;
+  Uint32 packed_key_len;
+
+  /* Data : Transaction id */
+  Uint64 transaction_id;
+
+  /* Next ptr for hash */
+  st_row_event_key_info* hash_next;
+};
+
+
+class st_transaction;
+
+/**
+   st_trans_dependency
+   Entry in dependency hash.
+   Describes inter-transaction dependency, and comprises part
+   of list of other dependents of target_transaction
+*/
+class st_trans_dependency
+{
+public:
+  /* User Api */
+  st_trans_dependency(st_transaction* _target_transaction,
+                      st_transaction* _dependent_transaction,
+                      const st_trans_dependency* _next);
+
+  st_transaction* getTargetTransaction() const;
+  st_transaction* getDependentTransaction() const;
+  const st_trans_dependency* getNextDependency() const;
+
+
+  /* Hash Api */
+  Uint32 hashValue() const;
+  bool equal(const st_trans_dependency* other) const;
+  st_trans_dependency* getNext() const;
+  void setNext(st_trans_dependency* _next);
+
+private:
+  /* Key */
+  st_transaction* target_transaction;
+  st_transaction* dependent_transaction;
+
+  /* Rest of co-dependents of target_transaction */
+  const st_trans_dependency* next_entry;
+
+  st_trans_dependency* hash_next;
+};
+
+
+
+/**
+   st_transaction
+   Entry in transaction hash, indicates whether transaction
+   is in conflict, and has list of dependents
+*/
+class st_transaction
+{
+public:
+  /* User Api */
+  st_transaction(Uint64 _transaction_id);
+
+  Uint64 getTransactionId() const;
+  bool getInConflict() const;
+  void setInConflict();
+  const st_trans_dependency* getDependencyListHead() const;
+  void setDependencyListHead(st_trans_dependency* head);
+
+  /* Hash Api */
+  Uint32 hashValue() const;
+  bool equal(const st_transaction* other) const;
+  st_transaction* getNext() const;
+  void setNext(st_transaction* _next);
+
+private:
+  /* Key */
+  Uint64 transaction_id;
+
+  /* Data */
+  /* Is this transaction (and therefore its dependents) in conflict? */
+  bool in_conflict;
+  /* Head of list of dependencies */
+  st_trans_dependency* dependency_list_head;
+
+  /* Hash ptr */
+  st_transaction* hash_next;
+};
+
+typedef struct st_mem_root MEM_ROOT;
+
+/**
+ * Allocator type which internally uses a MySQLD mem_root
+ * Used as a template parameter for Ndb ADTs
+ */
+struct st_mem_root_allocator
+{
+  MEM_ROOT* mem_root;
+
+  static void* alloc(void* ctx, size_t bytes);
+  static void* calloc(void* ctx, size_t nelem, size_t bytes);
+  static void free(void* ctx, void* mem);
+  st_mem_root_allocator(MEM_ROOT* _mem_root);
+};
+
+
+class DependencyTracker
+{
+public:
+  static const Uint64 InvalidTransactionId = ~Uint64(0);
+
+  /**
+     newDependencyTracker
+
+     Factory method to get a DependencyTracker object, using
+     memory from the passed mem_root.
+     To discard dependency tracker, just free the passed mem_root.
+  */
+  static DependencyTracker* newDependencyTracker(MEM_ROOT* mem_root);
+
+  /**
+     track_operation
+
+     This method records the operation on the passed
+     table + primary key as belonging to the passed
+     transaction.
+
+     If there is already a recorded operation on the
+     passed table + primary key from a different transaction
+     then a transaction dependency is recorded.
+  */
+  int track_operation(const NdbDictionary::Table* table,
+                      const NdbRecord* key_rec,
+                      const uchar* row,
+                      Uint64 transaction_id);
+
+  /**
+     mark_conflict
+
+     Record that a particular transaction is in conflict.
+     This will also mark any dependent transactions as in
+     conflict.
+  */
+  int mark_conflict(Uint64 trans_id);
+
+  /**
+     in_conflict
+
+     Returns true if the supplied transaction_id is marked as
+     in conflict
+  */
+  bool in_conflict(Uint64 trans_id);
+
+  /**
+     get_error_text
+
+     Returns string containing error description.
+     NULL if no error.
+  */
+  const char* get_error_text() const;
+
+  /**
+     get_conflict_count
+
+     Returns number of transactions marked as in-conflict
+  */
+  Uint32 get_conflict_count() const;
+
+private:
+  DependencyTracker(MEM_ROOT* mem_root);
+
+  /**
+     get_or_create_transaction
+
+     Get or create the transaction object for the
+     given transaction id.
+     Returns NULL on allocation failure.
+  */
+  st_transaction* get_or_create_transaction(Uint64 trans_id);
+
+  /**
+     add_dependency
+
+     This method records a dependency between the two
+     passed transaction ids
+  */
+  int add_dependency(Uint64 trans_id, Uint64 dependent_trans_id);
+
+  /**
+     reset_dependency_iterator
+
+     Reset dependency iterator.
+     Required before using get_next_dependency()
+  */
+  void reset_dependency_iterator();
+
+  /**
+     get_next_dependency
+     Gets next dependency in dependency graph.
+     Performs a depth first traversal from the start node.
+
+     include_dependents_of_current = false causes the traversal to skip
+     dependents of the current node.
+  */
+  st_transaction* get_next_dependency(const st_transaction* current,
+                                      bool include_dependents_of_current = true);
+
+  /**
+     dump_dependents
+
+     Debugging function
+  */
+  void dump_dependents(Uint64 trans_id);
+
+  /**
+     verify_graph
+
+     Internal invariant checking function.
+  */
+  bool verify_graph();
+
+  /* MemRoot allocator class instance */
+  st_mem_root_allocator mra;
+
+  /*
+     key_hash
+     Map of {Table, PK} -> TransID
+     Used to find inter-transaction dependencies
+     Attempt to add duplicate entry to the key_hash indicates
+     transaction dependency from existing entry to duplicate.
+  */
+  HashMap2<st_row_event_key_info, true, st_mem_root_allocator> key_hash;
+
+  /*
+     trans_hash
+     Map of {TransId} -> {in_conflict, List of dependents}
+     Used to record which transactions are in-conflict, and what
+     their dependencies are.
+     Transactions not marked in-conflict, and with no dependencies or
+     dependents, are not placed in this hash.
+   */
+  HashMap2<st_transaction, true, st_mem_root_allocator> trans_hash;
+
+  /*
+     dependency_hash
+     Map of {TransIdFrom, TransIdTo}
+     Used to ensure dependencies are added only once, for efficiency.
+     Elements are linked from the trans_hash entry for TransIdFrom.
+   */
+  HashMap2<st_trans_dependency, true, st_mem_root_allocator> dependency_hash;
+
+  /*
+     iteratorTodo
+     Stack of transaction Ids to be visited during the depth first
+     traversal when marking dependents as in conflict.
+  */
+  static const Uint32 ITERATOR_STACK_BLOCKSIZE = 10;
+  LinkedStack<Uint64, st_mem_root_allocator> iteratorTodo;
+
+  Uint32 conflicting_trans_count;
+
+  const char* error_text;
+};
+
+/* #ifdef HAVE_NDB_BINLOG */
+#endif
+
+// #ifndef NDB_CONFLICT_TRANS_H
+#endif

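The intended driving sequence for the DependencyTracker API above is roughly as follows. This is an illustrative sketch only, not compilable standalone: the real call sites are in the ha_ndbcluster.cc changes in this push, mem_root is assumed to be an initialised MEM_ROOT, and table, key_rec, row and transaction_id are assumed to come from the applier context.

  DependencyTracker* tracker =
    DependencyTracker::newDependencyTracker(&mem_root);

  /* 1) Record every row operation applied in the epoch.  A second
        operation on the same {table, PK} from a different transaction
        records an inter-transaction dependency. */
  tracker->track_operation(table, key_rec, row, transaction_id);

  /* 2) When conflict detection flags a row, mark its transaction.
        Dependent transactions are marked transitively. */
  tracker->mark_conflict(transaction_id);

  /* 3) Before committing, reject and realign in-conflict transactions.
        get_conflict_count() feeds the new status variables. */
  if (tracker->in_conflict(transaction_id))
  {
    /* reject / realign this transaction */
  }

  /* 4) To discard the tracker, simply free the mem_root. */
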
=== modified file 'sql/ndb_mi.cc'
--- a/sql/ndb_mi.cc	2011-06-29 23:28:01 +0000
+++ b/sql/ndb_mi.cc	2011-09-07 22:50:01 +0000
@@ -81,6 +81,16 @@ bool ndb_mi_get_in_relay_log_statement(R
   return (rli->get_flag(Relay_log_info::IN_STMT) != 0);
 }
 
+ulong ndb_mi_get_relay_log_trans_retries()
+{
+  return active_mi->rli.trans_retries;
+}
+
+void ndb_mi_set_relay_log_trans_retries(ulong number)
+{
+  active_mi->rli.trans_retries = number;
+}
+
 /* #ifdef HAVE_NDB_BINLOG */
 
 #endif

=== modified file 'sql/ndb_mi.h'
--- a/sql/ndb_mi.h	2011-06-29 23:28:01 +0000
+++ b/sql/ndb_mi.h	2011-09-07 22:50:01 +0000
@@ -42,6 +42,8 @@ uint32 ndb_mi_get_slave_run_id();
    Relay log info related functions
 */
 bool ndb_mi_get_in_relay_log_statement(class Relay_log_info* rli);
+ulong ndb_mi_get_relay_log_trans_retries();
+void ndb_mi_set_relay_log_trans_retries(ulong number);
 
 // #ifndef NDB_MI_H
 #endif

=== modified file 'sql/rpl_constants.h'
--- a/sql/rpl_constants.h	2011-06-30 15:37:13 +0000
+++ b/sql/rpl_constants.h	2011-09-07 22:50:01 +0000
@@ -31,4 +31,46 @@ enum Incident {
   INCIDENT_COUNT
 };
 
+#ifndef MCP_WL5353
+/**
+   Enumeration of the reserved formats of Binlog extra row information
+*/
+enum ExtraRowInfoFormat {
+  /** Ndb format */
+  ERIF_NDB          =   0,
+
+  /** Reserved formats  0 -> 63 inclusive */
+  ERIF_LASTRESERVED =  63,
+
+  /**
+      Available / uncontrolled formats
+      64 -> 254 inclusive
+  */
+  ERIF_OPEN1        =  64,
+  ERIF_OPEN2        =  65,
+
+  ERIF_LASTOPEN     =  254,
+
+  /**
+      Multi-payload format 255
+
+      Length is total length, payload is sequence of
+      sub-payloads with their own headers containing
+      length + format.
+  */
+  ERIF_MULTI        =  255
+};
+
+/*
+   1 byte length, 1 byte format
+   Length is total length in bytes, including 2 byte header
+   Length values 0 and 1 are currently invalid and reserved.
+*/
+#define EXTRA_ROW_INFO_LEN_OFFSET 0
+#define EXTRA_ROW_INFO_FORMAT_OFFSET 1
+#define EXTRA_ROW_INFO_HDR_BYTES 2
+#define EXTRA_ROW_INFO_MAX_PAYLOAD (255 - EXTRA_ROW_INFO_HDR_BYTES)
+
+#endif   // #ifndef MCP_WL5353
+
 #endif /* RPL_CONSTANTS_H */

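The reserved two-byte header is simple to decode. A minimal standalone sketch, with the constants duplicated locally so it builds outside the tree:

  #include <cassert>
  #include <cstdio>

  /* Duplicated from sql/rpl_constants.h for a standalone build */
  #define EXTRA_ROW_INFO_LEN_OFFSET 0
  #define EXTRA_ROW_INFO_FORMAT_OFFSET 1
  #define EXTRA_ROW_INFO_HDR_BYTES 2

  int main()
  {
    /* Two header bytes plus three payload bytes; format 0 is ERIF_NDB */
    unsigned char extra[] = { 5, 0, 0xAA, 0xBB, 0xCC };

    unsigned total_len = extra[EXTRA_ROW_INFO_LEN_OFFSET];
    unsigned format    = extra[EXTRA_ROW_INFO_FORMAT_OFFSET];

    assert(total_len >= EXTRA_ROW_INFO_HDR_BYTES);  /* 0 and 1 reserved */
    printf("format %u, %u payload byte(s)\n",
           format, total_len - EXTRA_ROW_INFO_HDR_BYTES);
    return 0;
  }
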
=== modified file 'sql/slave.h'
--- a/sql/slave.h	2011-06-30 15:59:25 +0000
+++ b/sql/slave.h	2011-09-07 22:50:01 +0000
@@ -231,6 +231,12 @@ extern ulong opt_server_id_mask;
 
 extern I_List<THD> threads;
 
+#ifndef MCP_WL5353
+#ifdef HAVE_NDB_BINLOG
+#define SLAVE_SILENT_RETRY_MSG "Slave transaction rollback requested"
+#endif
+#endif
+
 #endif /* HAVE_REPLICATION */
 
 /* masks for start/stop operations on io and sql slave threads */

=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc	2011-07-04 13:37:56 +0000
+++ b/sql/sql_class.cc	2011-09-07 22:50:01 +0000
@@ -606,6 +606,9 @@ THD::THD()
    lock_id(&main_lock_id),
    user_time(0), in_sub_stmt(0),
    sql_log_bin_toplevel(false),
+#ifndef MCP_WL5353
+   binlog_row_event_extra_data(NULL),
+#endif
    binlog_table_maps(0), binlog_flags(0UL),
    table_map_for_update(0),
    arg_of_last_insert_id_function(FALSE),
@@ -882,6 +885,9 @@ void THD::init(void)
   reset_current_stmt_binlog_row_based();
   bzero((char *) &status_var, sizeof(status_var));
   sql_log_bin_toplevel= options & OPTION_BIN_LOG;
+#ifndef MCP_WL5353
+  binlog_row_event_extra_data = 0;
+#endif
 
 #if defined(ENABLED_DEBUG_SYNC)
   /* Initialize the Debug Sync Facility. See debug_sync.cc. */
@@ -3546,7 +3552,13 @@ THD::binlog_prepare_pending_rows_event(T
       pending->get_type_code() != type_code || 
       pending->get_data_size() + needed > opt_binlog_rows_event_max_size || 
       pending->get_width() != colcnt ||
-      !bitmap_cmp(pending->get_cols(), cols)) 
+      !bitmap_cmp(pending->get_cols(), cols)
+#ifndef MCP_WL5353
+      ||
+      !binlog_row_event_extra_data_eq(pending->get_extra_row_data(),
+                                      binlog_row_event_extra_data)
+#endif
+      )
   {
     /* Create a new RowsEventT... */
     Rows_log_event* const
@@ -4065,6 +4077,64 @@ int THD::binlog_query(THD::enum_binlog_q
   DBUG_RETURN(0);
 }
 
+#ifndef MCP_WL5353
+/**
+   get_binlog_row_event_extra_data_len
+
+   Returns the length in bytes of the current thread's
+   binlog row event extra data, if present.
+   The length is stored in the first byte of the extra
+   data (at EXTRA_ROW_INFO_LEN_OFFSET).
+   Note that this length is the length of the whole extra
+   data structure, including the fixed length header
+   of size EXTRA_ROW_INFO_HDR_BYTES
+
+   @return
+     Length in bytes of the extra data.
+     Zero means no extra data is present.  Maximum is 255.
+*/
+uint8
+THD::get_binlog_row_event_extra_data_len() const
+{
+  return (binlog_row_event_extra_data?
+          binlog_row_event_extra_data[EXTRA_ROW_INFO_LEN_OFFSET]:
+          0);
+}
+
+/**
+   binlog_row_event_extra_data_eq
+
+   Comparator for two binlog row event extra data
+   pointers.
+
+   It compares their significant bytes.
+
+   Null pointers are acceptable
+
+   @param a
+     first pointer
+
+   @param b
+     second pointer
+
+   @return
+     true if the referenced structures are equal
+*/
+bool
+THD::binlog_row_event_extra_data_eq(const uchar* a,
+                                    const uchar* b)
+{
+  return ((a == b) ||
+          ((a != NULL) &&
+           (b != NULL) &&
+           (a[EXTRA_ROW_INFO_LEN_OFFSET] ==
+            b[EXTRA_ROW_INFO_LEN_OFFSET]) &&
+           (memcmp(a, b,
+                   a[EXTRA_ROW_INFO_LEN_OFFSET]) == 0)));
+}
+
+#endif  // #ifndef MCP_WL5353
+
 bool Discrete_intervals_list::append(ulonglong start, ulonglong val,
                                  ulonglong incr)
 {

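The comparator semantics above (NULL-tolerant, length-prefixed byte comparison) can be checked in isolation. A small standalone sketch reusing the same logic as THD::binlog_row_event_extra_data_eq():

  #include <cassert>
  #include <cstring>

  #define EXTRA_ROW_INFO_LEN_OFFSET 0

  static bool extra_data_eq(const unsigned char* a, const unsigned char* b)
  {
    return ((a == b) ||
            ((a != NULL) && (b != NULL) &&
             (a[EXTRA_ROW_INFO_LEN_OFFSET] == b[EXTRA_ROW_INFO_LEN_OFFSET]) &&
             (memcmp(a, b, a[EXTRA_ROW_INFO_LEN_OFFSET]) == 0)));
  }

  int main()
  {
    unsigned char x[] = { 3, 0, 7 };
    unsigned char y[] = { 3, 0, 7 };
    unsigned char z[] = { 3, 0, 8 };
    assert(extra_data_eq(NULL, NULL));  /* both absent : equal */
    assert(!extra_data_eq(x, NULL));    /* one absent : not equal */
    assert(extra_data_eq(x, y));        /* same bytes : equal */
    assert(!extra_data_eq(x, z));       /* payload differs */
    return 0;
  }
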
=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2011-07-04 13:37:56 +0000
+++ b/sql/sql_class.h	2011-09-07 22:50:01 +0000
@@ -1435,6 +1435,17 @@ public:
   /* container for handler's private per-connection data */
   Ha_data ha_data[MAX_HA];
 
+#ifndef MCP_WL5353
+  /*
+     Ptr to row event extra data to be written to the Binlog /
+     received from the Binlog.
+  */
+  uchar* binlog_row_event_extra_data;
+  uint8  get_binlog_row_event_extra_data_len() const;
+  static bool binlog_row_event_extra_data_eq(const uchar* a,
+                                             const uchar* b);
+#endif
+
 #ifndef MYSQL_CLIENT
   int binlog_setup_trx_data();
 

=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/CMakeLists.txt	2011-09-07 22:50:01 +0000
@@ -148,7 +148,8 @@ SET(NDBCLUSTER_SOURCES
   ../../sql/ha_ndbcluster_binlog.cc
   ../../sql/ha_ndb_index_stat.cc
   ../../sql/ha_ndbinfo.cc
-  ../../sql/ndb_mi.cc)
+  ../../sql/ndb_mi.cc
+  ../../sql/ndb_conflict_trans.cc)
 INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/storage/ndb/include)
 
 IF(EXISTS ${CMAKE_SOURCE_DIR}/storage/mysql_storage_engine.cmake)

=== added file 'storage/ndb/include/util/HashMap2.hpp'
--- a/storage/ndb/include/util/HashMap2.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/util/HashMap2.hpp	2011-09-07 22:50:01 +0000
@@ -0,0 +1,487 @@
+/* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+
+#ifndef NDB_HASHMAP2_HPP
+#define NDB_HASHMAP2_HPP
+
+#include <ndb_global.h>
+
+
+/* Basic HashTable implementation
+ * The HashTable stores elements of type KV.
+ * The storage for the elements is managed outside the
+ * HashTable implementation.
+ * The HashTable uses element chaining in each bucket to
+ * deal with collisions.
+ * The HashTable can optionally enforce uniqueness
+ * The HashTable can be resized when it is empty.
+ */
+
+/**
+ * KVOPStaticAdapter template
+ *
+ * Used with HashMap2.
+ * Creates a class with static methods that call member functions
+ * on the objects passed to them, for the case where the HashMap
+ * contains pointers to types that define the relevant methods
+ * themselves (objects).
+ *
+ * Implements the KVOP Api, so it is useful for supplying the KVOP
+ * template parameter.
+ *
+ * Required KV Api:
+ *   Uint32 hashValue() const;
+ *   bool equal(const KV* other) const;
+ *   void setNext(KV* next);
+ *   KV* getNext() const;
+ */
+template<typename KV>
+class KVOPStaticAdapter
+{
+public:
+  static Uint32 hashValue(const KV* obj)
+  {
+    return obj->hashValue();
+  };
+
+  static bool equal(const KV* objA, const KV* objB)
+  {
+    return objA->equal(objB);
+  };
+
+  static void setNext(KV* from, KV* to)
+  {
+    return from->setNext(to);
+  };
+
+  static KV* getNext(const KV* from)
+  {
+    return from->getNext();
+  }
+};
+
+// TODO :
+//    Pass allocator context rather than allocator
+//    Support re-alloc?
+//    Use calloc?
+
+/**
+ * StandardAllocator - used in HashMap2 when no allocator supplied
+ * Uses standard malloc/free.
+ */
+struct StandardAllocator
+{
+  static void* alloc(void* ignore, size_t bytes)
+  {
+    return ::malloc(bytes);
+  };
+
+  static void* calloc(void* ignore, size_t nelem, size_t bytes)
+  {
+    return ::calloc(nelem, bytes);
+  }
+
+  static void free(void* ignore, void* mem)
+  {
+    ::free(mem);
+  };
+};
+
+/**
+ * Template parameters
+ *
+ * Classes with static methods are used to avoid the necessity
+ * of using OO wrapper objects for C data.
+ * Objects can be used by defining static methods which call
+ * normal methods.
+ * A default StaticWrapper class exists to 'automate' this if
+ * necessary.
+ *
+ * class KV - Key Value pair.
+ *   The HashTable stores pointers to these.  No interface is
+ *   assumed - they are manipulated via KVOP below, so can be
+ *   chunks of memory or C structs etc.
+ *
+ * bool unique
+ *   True if all keys in a hash table instance must be
+ *   unique.
+ *   False otherwise.
+ *
+ * class A - Allocator
+ *   Used for hash bucket allocation on setSize() call.
+ *   NOT used for element allocation, which is the responsibility
+ *   of the user.
+ *
+ *   Must support static methods :
+ *   - static void* calloc(void* context, size_t nelem, size_t bytes)
+ *   - static void free(void* context, void*)
+ *
+ * class KVOP - Operations on Key Value pair
+ *   KV instances are stored based on the hash returned
+ *   by the KVOP::hashValue() method, with identity based on the
+ *   KVOP::equal() method.
+ *   KV instances must be linkable using KVOP::getNext() and
+ *   KVOP::setNext() methods.
+ *
+ *   KVOP allows the static methods on the KV pair to be separate
+ *   from the data itself.  If they are in the same class, use
+ *   KVOP=KV.  If the methods are not static, and are on the KV class,
+ *   use KVOP=KVOPStaticAdapter<KV>, or equivalent.
+ *
+ *   KVOP must support the following static methods :
+ *   - static bool equal(const class KV* a, const class KV* b);
+ *     Return true if two elements are equal.
+ *
+ *   - static Uint32 hashValue(const class KV*);
+ *     Return a 32-bit stable hash value for the KV.
+ *     equal(a,b) implies hashValue(a) == hashValue(b)
+ *
+ *   - static void setNext(KV* from, KV* to)
+ *
+ *   - static KV* getNext(const KV* from)
+ *
+ *
+ * TODO :
+ *   - collision count?
+ *   - release option?
+ */
+template<typename KV,
+         bool unique = true,
+         typename A = StandardAllocator,
+         typename KVOP = KVOPStaticAdapter<KV> >
+class HashMap2
+{
+public:
+  /**
+   * HashMap2 constructor
+   * Pass an Allocator pointer if the templated allocator
+   * requires some context info.
+   * setSize() must be called before the HashMap is used.
+   */
+  HashMap2(void* _allocatorContext = NULL)
+    : tableSize(0),
+      elementCount(0),
+      allocatorContext(_allocatorContext),
+      table(NULL)
+  {
+  };
+
+  ~HashMap2()
+  {
+    if (table)
+      A::free(allocatorContext, table);
+  }
+
+  /**
+   * setSize
+   *
+   * Set the number of buckets.
+   *  Can only be set when the hash table is empty.
+   *  The Allocator is used to allocate/release bucket
+   *  storage.
+   */
+  bool
+  setSize(Uint32 hashBuckets)
+  {
+    if (elementCount)
+    {
+      /* Can't set size while we have contents */
+      return false;
+    }
+
+    if (hashBuckets == 0)
+    {
+      return false;
+    }
+
+    if (table)
+    {
+      A::free(allocatorContext, table);
+      table = NULL;
+    }
+
+    /* TODO : Consider using only power-of-2 + bitmask instead of mod */
+    tableSize = hashBuckets;
+
+    table = (KV**) A::calloc(allocatorContext, hashBuckets, sizeof(KV*));
+
+    if (!table)
+    {
+      return false;
+    }
+
+    for (Uint32 i=0; i < tableSize; i++)
+      table[i] = NULL;
+
+    return true;
+  };
+
+  /**
+   * add
+   *
+   * Add a KV element to the hash table
+   * The next value must be null
+   * If the hash table requires uniqueness, and the
+   * element is not unique, false will be returned
+   */
+  bool
+  add(KV* keyVal)
+  {
+    assert(table);
+
+    Uint32 hashVal = rehash(KVOP::hashValue(keyVal));
+    Uint32 bucketIdx = hashVal % tableSize;
+
+    KV* bucket = table[bucketIdx];
+
+    if (bucket != NULL)
+    {
+      if (unique)
+      {
+        /* Need to check element is not already there, in this
+         * chain
+         */
+        const KV* chainElement = bucket;
+        while (chainElement)
+        {
+          if (KVOP::equal(keyVal, chainElement))
+          {
+            /* Found duplicate */
+            return false;
+          }
+          chainElement= KVOP::getNext(chainElement);
+        }
+      }
+
+      /* Can insert at head of list, as either no uniqueness
+       * guarantee, or uniqueness checked.
+       */
+      assert(KVOP::getNext(keyVal) == NULL);
+      KVOP::setNext(keyVal, bucket);
+      table[bucketIdx] = keyVal;
+    }
+    else
+    {
+      /* First element in bucket */
+      assert(KVOP::getNext(keyVal) == NULL);
+      KVOP::setNext(keyVal, NULL);
+      table[bucketIdx] = keyVal;
+    }
+
+    elementCount++;
+    return true;
+  }
+
+  KV*
+  remove(KV* key)
+  {
+    assert(table);
+
+    Uint32 hashVal = rehash(KVOP::hashValue(key));
+    Uint32 bucketIdx = hashVal % tableSize;
+
+    KV* bucket = table[bucketIdx];
+
+    if (bucket != NULL)
+    {
+      KV* chainElement = bucket;
+      KV* prev = NULL;
+      while (chainElement)
+      {
+        if (KVOP::equal(key, chainElement))
+        {
+          /* Found, repair bucket chain
+           * Get next, might be NULL
+           */
+          KV* n = KVOP::getNext(chainElement);
+          if (prev)
+          {
+            /* Link prev to next */
+            KVOP::setNext(prev, n);
+          }
+          else
+          {
+            /* Put next as first in bucket */
+            table[bucketIdx] = n;
+          }
+
+          KVOP::setNext(chainElement, NULL);
+          elementCount--;
+
+          return chainElement;
+        }
+        prev = chainElement;
+        chainElement = KVOP::getNext(chainElement);
+      }
+    }
+
+    return NULL;
+  }
+
+  /**
+   * get
+   *
+   * Get a ptr to a KV element in the hash table
+   * with the same key as the element passed.
+   * Returns NULL if none exists.
+   *
+   * For non unique hash tables, a ptr to the
+   * first element with the given key is returned.
+   * Further elements must be found by iteration
+   * (and key comparison), until NULL is returned.
+   */
+  KV*
+  get(const KV* key) const
+  {
+    assert(table);
+
+    Uint32 hashVal = rehash(KVOP::hashValue(key));
+    Uint32 bucketIdx = hashVal % tableSize;
+
+    KV* chainElement = table[bucketIdx];
+
+    while(chainElement)
+    {
+      if (KVOP::equal(key, chainElement))
+      {
+        break;
+      }
+      chainElement = KVOP::getNext(chainElement);
+    }
+
+    return chainElement;
+  };
+
+  /**
+   * reset
+   *
+   * Resets the hash table to have no entries.
+   * KV elements added are not released in any
+   * way.  This must be handled outside the
+   * HashTable implementation, perhaps by
+   * iterating and removing the elements?
+   * Storage for the hash table itself is
+   * preserved
+   */
+  void
+  reset()
+  {
+    /* Zap the hashtable ptrs, without freeing the 'elements'
+     * Keep the storage allocated for the ptrs
+     */
+    if (elementCount)
+    {
+      assert(table);
+      for (Uint32 i=0; i < tableSize; i++)
+      {
+        table[i] = NULL;
+      }
+
+      elementCount = 0;
+    }
+  }
+
+  /**
+   * getElementCount
+   *
+   * Returns the number of elements currently
+   * stored in this hash table.
+   */
+  Uint32
+  getElementCount() const
+  {
+      return elementCount;
+  }
+
+  /**
+   * getTableSize
+   *
+   * Returns the number of hashBuckets in this hash table
+   */
+  Uint32
+  getTableSize() const
+  {
+    return tableSize;
+  }
+
+  class Iterator
+  {
+  public:
+    Iterator(HashMap2& hashMap)
+      : m_hash(&hashMap),
+        curr_element(NULL),
+        curr_bucket(0)
+    {}
+
+    /**
+      Return the current element and reposition the iterator to the next
+      element.
+    */
+    KV* next()
+    {
+      while (curr_bucket < m_hash->tableSize)
+      {
+        if (curr_element == NULL)
+        {
+          /* First this bucket */
+          curr_element = m_hash->table[ curr_bucket ];
+        }
+        else
+        {
+          /* Next this bucket */
+          curr_element = KVOP::getNext(curr_element);
+        }
+
+        if (curr_element)
+        {
+          return curr_element;
+        }
+        curr_bucket++;
+      }
+
+      return NULL;
+    }
+    void reset()
+    {
+      curr_element = NULL;
+      curr_bucket = 0;
+    }
+  private:
+    HashMap2* m_hash;
+    KV* curr_element;
+    Uint32 curr_bucket;
+  };
+
+private:
+  static Uint32 rehash(Uint32 userHash)
+  {
+    /* We rehash the supplied hash value in case
+     * it's low quality, mixing some higher order
+     * bits in with the lower bits
+     */
+    userHash ^= ((userHash >> 20) ^ (userHash >> 12));
+    return userHash ^ (userHash >> 7) ^ (userHash >> 4);
+  }
+
+  Uint32 tableSize;
+  Uint32 elementCount;
+  void* allocatorContext;
+
+  KV** table;
+
+}; // class HashMap2
+
+#endif

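A compact usage sketch of the defaults (unique keys, StandardAllocator, KVOPStaticAdapter); the TAP test added in storage/ndb/src/common/util/HashMap2.cpp below exercises the template far more thoroughly:

  #include <HashMap2.hpp>
  #include <cassert>

  /* KV type exposing the member API that KVOPStaticAdapter expects */
  struct Entry
  {
    Uint32 key;
    Uint32 value;
    Entry* next;

    Uint32 hashValue() const { return key; }
    bool equal(const Entry* other) const { return key == other->key; }
    Entry* getNext() const { return next; }
    void setNext(Entry* n) { next = n; }
  };

  int main()
  {
    HashMap2<Entry> map;        /* defaults : unique, StandardAllocator */
    map.setSize(32);            /* bucket count, required before use */

    Entry e = { 1, 42, NULL };  /* element storage is owned by the caller */
    assert(map.add(&e));

    Entry probe = { 1, 0, NULL };
    assert(map.get(&probe) == &e);

    map.reset();                /* unlinks entries, frees no elements */
    return 0;
  }
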
=== added file 'storage/ndb/include/util/LinkedStack.hpp'
--- a/storage/ndb/include/util/LinkedStack.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/util/LinkedStack.hpp	2011-09-07 22:50:01 +0000
@@ -0,0 +1,224 @@
+/* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+
+#ifndef NDB_LINKEDSTACK_HPP
+#define NDB_LINKEDSTACK_HPP
+
+#include <ndb_global.h>
+
+/**
+ * LinkedStack
+ *
+ * A templated class for a stack of elements E.
+ * Storage for the elements is allocated using the passed
+ * allocator.
+ * Push copies the supplied element into the stack.
+ * Pop overwrites the supplied element with the contents
+ * of the top of the stack.
+ * Internally, the class allocates 'blocks' of elements of
+ * the size passed, linking them together as necessary.
+ * As the stack shrinks, the blocks are not released.
+ * Blocks are returned to the allocator when release() is
+ * called.
+ * reset() empties the stack without releasing the allocated
+ * storage.
+ */
+template <typename E, typename A>
+class LinkedStack
+{
+private:
+  struct BlockHeader
+  {
+    BlockHeader* next;
+    BlockHeader* prev;
+    E* elements;
+  };
+
+  BlockHeader* allocBlock()
+  {
+    /* Alloc blockheader and element array */
+    BlockHeader* h = (BlockHeader*) A::alloc(allocatorContext,
+                                             sizeof(BlockHeader));
+    if (!h)
+      return NULL;
+
+    E* e = (E*) A::calloc(allocatorContext, blockElements, sizeof(E));
+    if (!e)
+    {
+      A::free(allocatorContext, h);
+      return NULL;
+    }
+
+    h->next = NULL;
+    h->prev = NULL;
+    h->elements = e;
+
+    return h;
+  }
+
+  bool valid()
+  {
+    if (stackTop)
+    {
+      assert(firstBlock != NULL);
+      assert(currBlock != NULL);
+      /* Check that currBlock is positioned on correct
+       * block, except for block boundary case
+       */
+      Uint32 blockNum = (stackTop - 1) / blockElements;
+      BlockHeader* bh = firstBlock;
+      while(blockNum--)
+      {
+        bh = bh->next;
+      }
+      assert(bh == currBlock);
+    }
+    else
+    {
+      assert(currBlock == NULL);
+    }
+    return true;
+  }
+
+  /* Note that stackTop is 'next insertion point' whereas
+   * currBlock points to block last inserted to.
+   * On block boundaries, they refer to different blocks
+   */
+  void* allocatorContext;
+  BlockHeader* firstBlock;
+  BlockHeader* currBlock;
+  Uint32 stackTop;
+  Uint32 blockElements;
+
+public:
+  LinkedStack(Uint32 _blockElements, void* _allocatorContext=NULL)
+    : allocatorContext(_allocatorContext),
+      firstBlock(NULL),
+      currBlock(NULL),
+      stackTop(0),
+      blockElements(_blockElements)
+  {
+    assert(blockElements > 0);
+    assert(valid());
+  }
+
+  ~LinkedStack()
+  {
+    assert(valid());
+    /* Release block storage if present */
+    release();
+  }
+
+  bool push(E& elem)
+  {
+    assert(valid());
+    Uint32 blockOffset = stackTop % blockElements;
+
+    if (blockOffset == 0)
+    {
+      /* On block boundary */
+      if (stackTop)
+      {
+        /* Some elements exist already */
+        if (!currBlock->next)
+        {
+          /* End of block list, alloc another */
+          BlockHeader* newBlock = allocBlock();
+          if (!newBlock)
+            return false;
+
+          currBlock->next = newBlock;
+          currBlock->next->prev = currBlock;
+        }
+        currBlock = currBlock->next;
+      }
+      else
+      {
+        /* First element */
+        if (!firstBlock)
+        {
+          BlockHeader* newBlock = allocBlock();
+          if (!newBlock)
+            return false;
+
+          firstBlock = currBlock = newBlock;
+        }
+        currBlock = firstBlock;
+      }
+    }
+
+    currBlock->elements[ blockOffset ] = elem;
+    stackTop++;
+
+    assert(valid());
+    return true;
+  }
+
+  bool pop(E& elem)
+  {
+    assert(valid());
+    if (stackTop)
+    {
+      stackTop--;
+      Uint32 blockOffset = stackTop % blockElements;
+      elem = currBlock->elements[ blockOffset ];
+
+      if (blockOffset == 0)
+      {
+        /* Block boundary, shift back to prev block. */
+        if (stackTop)
+          assert(currBlock->prev);
+
+        currBlock = currBlock->prev;
+      }
+
+      assert(valid());
+      return true;
+    }
+    return false;
+  }
+
+  Uint32 size() const
+  {
+    return stackTop;
+  }
+
+  void reset()
+  {
+    assert(valid());
+    stackTop = 0;
+    currBlock = NULL;
+    assert(valid());
+  };
+
+  void release()
+  {
+    assert(valid());
+    BlockHeader* h = firstBlock;
+    while (h)
+    {
+      BlockHeader* n = h->next;
+      A::free(allocatorContext, h->elements);
+      A::free(allocatorContext, h);
+      h = n;
+    };
+    stackTop = 0;
+    firstBlock = currBlock = NULL;
+    assert(valid());
+  }
+};
+
+#endif

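A compact usage sketch, assuming StandardAllocator from HashMap2.hpp is an acceptable allocator; the TAP test added in storage/ndb/src/common/util/LinkedStack.cpp below covers the block-boundary cases:

  #include <LinkedStack.hpp>
  #include <HashMap2.hpp>  /* for StandardAllocator */
  #include <cassert>

  int main()
  {
    /* Four elements per internal block; emptied blocks are kept for
       re-use by later pushes and only freed by release() */
    LinkedStack<Uint32, StandardAllocator> stack(4);

    for (Uint32 i = 0; i < 10; i++)
      assert(stack.push(i));  /* push copies the element in */

    Uint32 top;
    while (stack.pop(top))    /* pop copies the top element out */
      ;                       /* yields 9, 8, ... 0 */

    assert(stack.size() == 0);
    stack.release();          /* return all blocks to the allocator */
    return 0;
  }
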
=== modified file 'storage/ndb/src/common/util/CMakeLists.txt'
--- a/storage/ndb/src/common/util/CMakeLists.txt	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/common/util/CMakeLists.txt	2011-09-07 22:50:01 +0000
@@ -55,6 +55,8 @@ ADD_CONVENIENCE_LIBRARY(ndbgeneral
             require.c
             Vector.cpp
             NdbPack.cpp
+            HashMap2.cpp
+            LinkedStack.cpp
 )
 TARGET_LINK_LIBRARIES(ndbgeneral ndbtrace ${ZLIB_LIBRARY} mysys)
 
@@ -88,3 +90,12 @@ SET_TARGET_PROPERTIES(NdbPack-t
                       PROPERTIES COMPILE_FLAGS "-DTEST_NDB_PACK")
 TARGET_LINK_LIBRARIES(NdbPack-t ndbgeneral ndbportlib)
 
+ADD_EXECUTABLE(HashMap2-t HashMap2.cpp)
+SET_TARGET_PROPERTIES(HashMap2-t
+                      PROPERTIES COMPILE_FLAGS "-DTEST_HASHMAP2")
+TARGET_LINK_LIBRARIES(HashMap2-t ndbgeneral)
+
+ADD_EXECUTABLE(LinkedStack-t LinkedStack.cpp)
+SET_TARGET_PROPERTIES(LinkedStack-t
+                      PROPERTIES COMPILE_FLAGS "-DTEST_LINKEDSTACK")
+TARGET_LINK_LIBRARIES(LinkedStack-t ndbgeneral)
\ No newline at end of file

=== added file 'storage/ndb/src/common/util/HashMap2.cpp'
--- a/storage/ndb/src/common/util/HashMap2.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/common/util/HashMap2.cpp	2011-09-07 22:50:01 +0000
@@ -0,0 +1,405 @@
+/* Copyright (C) 2009 Sun Microsystems Inc.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+
+#include <HashMap2.hpp>
+
+#ifdef TEST_HASHMAP2
+#include <NdbTap.hpp>
+
+struct TestHeapAllocator
+{
+  static const int DEBUG_ALLOC=0;
+  static void* alloc(void* ignore, size_t bytes)
+  {
+    void* p = ::malloc(bytes);
+    if (DEBUG_ALLOC)
+    {
+      printf("--Allocating %u bytes at %p\n",
+             (Uint32) bytes, p);
+    }
+    return p;
+  }
+
+  static void* calloc(void* ignore, size_t nelem, size_t bytes)
+  {
+    void* p = ::calloc(nelem, bytes);
+    if (DEBUG_ALLOC)
+    {
+      printf("--Allocating %u elements of %u bytes (%u total?) at %p\n",
+             (Uint32)nelem, (Uint32) bytes, (Uint32) (nelem*bytes), p);
+    }
+    return p;
+  }
+
+  static void free(void* ignore, void* mem)
+  {
+    if (DEBUG_ALLOC)
+    {
+      printf("--Freeing bytes at %p\n",mem);
+    }
+    ::free(mem);
+  }
+};
+
+struct IntIntKVPod
+{
+  int a;
+  int b;
+  IntIntKVPod* next;
+};
+
+struct IntIntKVStaticMethods
+{
+  static Uint32 hashValue(const IntIntKVPod* obj)
+  {
+    return obj->a*31;
+  }
+
+  static bool equal(const IntIntKVPod* objA, const IntIntKVPod* objB)
+  {
+    return objA->a == objB->a;
+  }
+
+  static void setNext(IntIntKVPod* from, IntIntKVPod* to)
+  {
+    from->next = to;
+  }
+
+  static IntIntKVPod* getNext(const IntIntKVPod* from)
+  {
+    return from->next;
+  }
+};
+
+struct IntIntKVObj
+{
+  int a;
+  int b;
+  IntIntKVObj* next;
+
+  Uint32 hashValue() const
+  {
+    return a*31;
+  }
+
+  bool equal(const IntIntKVObj* other) const
+  {
+    return a == other->a;
+  }
+
+  void setNext(IntIntKVObj* _next)
+  {
+    next = _next;
+  }
+
+  IntIntKVObj* getNext() const
+  {
+    return next;
+  }
+};
+
+TAPTEST(HashMap2)
+{
+  printf("int -> int (Static, unique) \n");
+  for (int j = 1; j < 150; j ++)
+  {
+    HashMap2<IntIntKVPod, true, TestHeapAllocator, IntIntKVStaticMethods> hash1;
+
+    hash1.setSize(j);
+
+    OK(hash1.getElementCount() == 0);
+
+    /* Create some values in a pool on the stack */
+    IntIntKVPod pool[101];
+
+    for (int i=0; i < 100; i++)
+    {
+      pool[i].a = i;
+      pool[i].b = 3 * i;
+      pool[i].next = NULL;
+    }
+
+    /* Add the pool elements to the hash table */
+    for (int i=0; i < 100; i++)
+    {
+      OK(hash1.add(&pool[i]));
+    }
+
+    /* Now attempt to add a duplicate */
+    pool[100].a = 0;
+    pool[100].b = 999;
+    pool[100].next = NULL;
+
+    OK(hash1.getElementCount() == 100);
+
+    OK(! hash1.add(&pool[100]));
+
+    for (int i=1; i < 100; i++)
+    {
+      OK(hash1.get(&pool[i]) == &pool[i]);
+    }
+
+    OK(hash1.get(&pool[0]) == &pool[0]);
+
+    /* Test iterator Api */
+    HashMap2<IntIntKVPod, true, TestHeapAllocator, IntIntKVStaticMethods>::Iterator it(hash1);
+
+    IntIntKVPod* elem;
+    for (int i=0; i < 2; i++)
+    {
+      int count = 0;
+      while((elem = it.next()))
+      {
+        OK( elem->b == ((elem->a * 3) - i) );
+        elem->b--;
+        count++;
+      }
+      OK( count == 100 );
+      it.reset();
+    }
+
+    hash1.reset();
+    it.reset();
+    OK( it.next() == NULL );
+  }
+
+  printf("int -> int (Static, !unique) \n");
+  for (int j = 1; j < 150; j ++)
+  {
+    HashMap2<IntIntKVPod, false, TestHeapAllocator, IntIntKVStaticMethods> hash1;
+
+    hash1.setSize(j);
+
+    OK(hash1.getElementCount() == 0);
+
+    /* Create some values in a pool on the stack */
+    IntIntKVPod pool[101];
+
+    for (int i=0; i < 100; i++)
+    {
+      pool[i].a = i;
+      pool[i].b = 3 * i;
+      pool[i].next = NULL;
+    }
+
+    /* Add the pool elements to the hash table */
+    for (int i=0; i < 100; i++)
+    {
+      OK(hash1.add(&pool[i]));
+    }
+
+    /* Now attempt to add a duplicate */
+    pool[100].a = 0;
+    pool[100].b = 999;
+    pool[100].next = NULL;
+
+    OK(hash1.getElementCount() == 100);
+
+    OK(  hash1.add(&pool[100]));
+
+    for (int i=1; i < 100; i++)
+    {
+      OK(hash1.get(&pool[i]) == &pool[i]);
+    }
+
+    OK((hash1.get(&pool[0]) == &pool[0]) ||
+       (hash1.get(&pool[0]) == &pool[100]));
+  }
+
+  printf("int -> int (!Static, defaults, (std alloc, unique)) \n");
+  for (int j = 1; j < 150; j ++)
+  {
+    HashMap2<IntIntKVObj> hash1;
+
+    hash1.setSize(j);
+
+    OK(hash1.getElementCount() == 0);
+
+    /* Create some values in a pool on the stack */
+    IntIntKVObj pool[101];
+
+    for (int i=0; i < 100; i++)
+    {
+      pool[i].a = i;
+      pool[i].b = 3 * i;
+      pool[i].next = NULL;
+    }
+
+    /* Add the pool elements to the hash table */
+    for (int i=0; i < 100; i++)
+    {
+      OK(hash1.add(&pool[i]));
+    }
+
+    /* Now attempt to add a duplicate */
+    pool[100].a = 0;
+    pool[100].b = 999;
+    pool[100].next = NULL;
+
+    OK(hash1.getElementCount() == 100);
+
+    OK(! hash1.add(&pool[100]));
+
+    for (int i=1; i < 100; i++)
+    {
+      OK(hash1.get(&pool[i]) == &pool[i]);
+    }
+
+    OK(hash1.get(&pool[0]) == &pool[0]);
+  }
+
+  printf("int -> int (Static, unique, realloc) \n");
+  {
+    HashMap2<IntIntKVPod, true, TestHeapAllocator, IntIntKVStaticMethods> hash1;
+    for (int j = 1; j < 150; j ++)
+    {
+      hash1.setSize(150 - j);
+
+      OK(hash1.getElementCount() == 0);
+
+      /* Create some values in a pool on the stack */
+      IntIntKVPod pool[101];
+
+      for (int i=0; i < 100; i++)
+      {
+        pool[i].a = i;
+        pool[i].b = 3 * i;
+        pool[i].next = NULL;
+      }
+
+      /* Add the pool elements to the hash table */
+      for (int i=0; i < 100; i++)
+      {
+        OK(hash1.add(&pool[i]));
+      }
+
+      /* Now attempt to add a duplicate */
+      pool[100].a = 0;
+      pool[100].b = 999;
+      pool[100].next = NULL;
+
+      OK(hash1.getElementCount() == 100);
+
+      OK(! hash1.add(&pool[100]));
+
+      for (int i=1; i < 100; i++)
+      {
+        OK(hash1.get(&pool[i]) == &pool[i]);
+      }
+
+      OK(hash1.get(&pool[0]) == &pool[0]);
+
+      OK(!hash1.setSize(j+1));
+
+      hash1.reset();
+    }
+  }
+
+  printf("int -> int (Static, unique, realloc, remove) \n");
+  {
+    HashMap2<IntIntKVPod, true, TestHeapAllocator, IntIntKVStaticMethods> hash1;
+    for (int j = 1; j < 150; j ++)
+    {
+      hash1.setSize(j);
+
+      OK(hash1.getElementCount() == 0);
+
+      /* Create some values in a pool on the stack */
+      IntIntKVPod pool[101];
+
+      for (int i=0; i < 100; i++)
+      {
+        pool[i].a = i;
+        pool[i].b = 3 * i;
+        pool[i].next = NULL;
+      }
+
+      /* Add the pool elements to the hash table */
+      for (int i=0; i < 100; i++)
+      {
+        OK(hash1.add(&pool[i]));
+      }
+
+      /* Now attempt to add a duplicate */
+      pool[100].a = 0;
+      pool[100].b = 999;
+      pool[100].next = NULL;
+
+      OK(hash1.getElementCount() == 100);
+
+      OK(!hash1.add(&pool[100]));
+
+      for (int i=1; i < 100; i++)
+      {
+        OK(hash1.get(&pool[i]) == &pool[i]);
+      }
+
+      OK(hash1.get(&pool[0]) == &pool[0]);
+
+      OK(!hash1.setSize(j+1));
+
+      /* Now replace elements with different ones */
+      IntIntKVPod pool2[100];
+      for (int i=0; i < 100; i++)
+      {
+        pool2[i].a = i;
+        pool2[i].b = 4 * i;
+        pool2[i].next = NULL;
+      }
+
+      for (int k=0; k < 4; k++)
+      {
+        for (int i=0; i< 100; i++)
+        {
+          if ((i % 4) == k)
+          {
+            OK(hash1.remove(&pool[i]) == &pool[i]);
+          };
+        };
+
+        OK(hash1.getElementCount() == 75);
+
+        for (int i=0; i< 100; i++)
+        {
+          if ((i % 4) == k)
+          {
+            OK(!hash1.get(&pool[i]));
+          }
+        };
+
+        for (int i=0; i< 100; i++)
+        {
+          if ((i % 4) == k)
+          {
+            OK(hash1.add(&pool2[i]));
+          }
+        }
+        OK(hash1.getElementCount() == 100);
+      }
+
+      for (int i=0; i< 100; i++)
+      {
+        OK(hash1.get(&pool2[i]) == &pool2[i]);
+      };
+
+      hash1.reset();
+    }
+  }
+
+  return 1; // OK
+}
+
+#endif

=== added file 'storage/ndb/src/common/util/LinkedStack.cpp'
--- a/storage/ndb/src/common/util/LinkedStack.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/common/util/LinkedStack.cpp	2011-09-07 22:50:01 +0000
@@ -0,0 +1,124 @@
+/* Copyright (C) 2009 Sun Microsystems Inc.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+
+#include <LinkedStack.hpp>
+
+#ifdef TEST_LINKEDSTACK
+#include <NdbTap.hpp>
+
+struct TestHeapAllocator
+{
+  static const int DEBUG_ALLOC=0;
+  static void* alloc(void* ignore, size_t bytes)
+  {
+    void* p = ::malloc(bytes);
+    if (DEBUG_ALLOC)
+    {
+      printf("--Allocating %u bytes at %p\n",
+             (Uint32) bytes, p);
+    }
+    return p;
+  }
+
+  static void* calloc(void* ignore, size_t nelem, size_t bytes)
+  {
+    void* p = ::calloc(nelem, bytes);
+    if (DEBUG_ALLOC)
+    {
+      printf("--Allocating %u elements of %u bytes (%u bytes?) at %p\n",
+             (Uint32) nelem, (Uint32) bytes, (Uint32) (nelem * bytes), p);
+    }
+    return p;
+  }
+
+  static void free(void* ignore, void* mem)
+  {
+    if (DEBUG_ALLOC)
+    {
+      printf("--Freeing bytes at %p\n",mem);
+    }
+    ::free(mem);
+  }
+};
+
+TAPTEST(LinkedStack)
+{
+  Uint32 popped;
+  Uint32 blockSize = 1;
+
+  for (Uint32 b=0; b < 10; b++)
+  {
+    LinkedStack<Uint32, TestHeapAllocator> testStack(blockSize);
+
+    for (Uint32 p=0; p<4; p++)
+    {
+      /* Pass 0 == alloc, Pass 1 == re-use, Pass 2 == release, Pass 3 == reset */
+      printf("LinkedBlockStack size %u, pass %u\n", blockSize, p);
+      Uint32 stackSize = 2033 * (p+1);
+
+      OK(testStack.size() == 0);
+      printf("  Pushing %u elements\n", stackSize);
+      for (Uint32 i=0; i < stackSize; i++)
+      {
+        /* Push items onto the stack */
+        OK(testStack.push(i) == true);
+        OK(testStack.size() == i+1);
+        OK(testStack.pop(popped) == true);
+        OK(popped == i);
+        OK(testStack.size() == i);
+        OK(testStack.push(i) == true);
+      };
+
+      switch(p)
+      {
+      case 0:
+      case 1:
+      {
+        printf("  Popping %u elements\n", stackSize);
+        for (Uint32 i=0; i < stackSize; i++)
+        {
+          /* Pop items off the stack */
+          OK(testStack.size() == stackSize - i);
+          OK(testStack.pop(popped) == true);
+          OK(popped == stackSize - (i+1));
+        }
+        break;
+      }
+      case 2:
+      {
+        printf("  Releasing stack\n");
+        testStack.release();
+        break;
+      }
+      case 3:
+      {
+        printf("  Resetting stack\n");
+        testStack.reset();
+        break;
+      }
+      }
+
+      OK(testStack.size() == 0);
+      OK(testStack.pop(popped) == false);
+    }
+    printf("  Destructing stack\n");
+    blockSize = (blockSize * 2)+1;
+  }
+
+  return 1; // OK
+}
+
+#endif

=== modified file 'storage/ndb/src/common/util/Makefile.am'
--- a/storage/ndb/src/common/util/Makefile.am	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/common/util/Makefile.am	2011-09-07 22:50:01 +0000
@@ -35,7 +35,7 @@ INCLUDES_LOC = @ZLIB_INCLUDES@
 libndbazio_la_SOURCES = ndbzio.c
 libndbazio_la_LIBADD = @ZLIB_LIBS@
 
-noinst_PROGRAMS = BaseString-t HashMap-t Bitmask-t SparseBitmask-t ndb_version-t NdbPack-t
+noinst_PROGRAMS = BaseString-t HashMap-t Bitmask-t SparseBitmask-t ndb_version-t NdbPack-t HashMap2-t LinkedStack-t
 
 BaseString_t_SOURCES = BaseString.cpp
 BaseString_t_CXXFLAGS = -DTEST_BASE_STRING
@@ -83,5 +83,21 @@ NdbPack_t_LDADD = \
 	$(top_builddir)/strings/libmystrings.la \
 	$(top_builddir)/dbug/libdbug.la
 
+HashMap2_t_SOURCES = HashMap2.cpp
+HashMap2_t_CXXFLAGS = -DTEST_HASHMAP2
+HashMap2_t_LDADD = \
+	libgeneral.la \
+	$(top_builddir)/mysys/libmysyslt.la \
+	$(top_builddir)/dbug/libdbuglt.la \
+	$(top_builddir)/strings/libmystringslt.la
+
+LinkedStack_t_SOURCES = LinkedStack.cpp
+LinkedStack_t_CXXFLAGS = -DTEST_LINKEDSTACK
+LinkedStack_t_LDADD = \
+	libgeneral.la \
+	$(top_builddir)/mysys/libmysyslt.la \
+	$(top_builddir)/dbug/libdbuglt.la \
+	$(top_builddir)/strings/libmystringslt.la
+
 include $(top_srcdir)/storage/ndb/config/common.mk.am
 include $(top_srcdir)/storage/ndb/config/type_util.mk.am

No bundle (reason: useless for push emails).