List: Commits
From: Frazer Clement
Date: September 6 2011 10:25am
Subject: bzr push into mysql-5.1-telco-7.1 branch (frazer.clement:4251 to 4252)
 4252 Frazer Clement	2011-09-06
      WL5353 Club X.X test commit no.3

    added:
      mysql-test/suite/ndb_binlog/r/ndb_binlog_log_transaction_id.result
      mysql-test/suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
      mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id-master.opt
      mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id.test
      mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_epoch_trans.result
      mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.cnf
      mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.test
      mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info.inc
      mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
      mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc
      mysql-test/suite/rpl/r/rpl_extra_row_data.result
      mysql-test/suite/rpl/t/rpl_extra_row_data-master.opt
      mysql-test/suite/rpl/t/rpl_extra_row_data-slave.opt
      mysql-test/suite/rpl/t/rpl_extra_row_data.test
      sql/ndb_conflict_trans.cc
      sql/ndb_conflict_trans.h
      storage/ndb/include/util/HashMap2.hpp
      storage/ndb/include/util/LinkedStack.hpp
      storage/ndb/src/common/util/HashMap2.cpp
      storage/ndb/src/common/util/LinkedStack.cpp
    modified:
      libmysqld/Makefile.am
      mysql-test/suite/ndb/r/ndb_basic.result
      mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result
      mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test
      sql/Makefile.am
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_binlog.h
      sql/log_event.cc
      sql/log_event.h
      sql/ndb_mi.cc
      sql/ndb_mi.h
      sql/rpl_constants.h
      sql/slave.h
      sql/sql_class.cc
      sql/sql_class.h
      storage/ndb/CMakeLists.txt
      storage/ndb/src/common/util/CMakeLists.txt
      storage/ndb/src/common/util/Makefile.am
      storage/ndb/src/ndbapi/NdbTransaction.cpp
      storage/ndb/test/ndbapi/testBlobs.cpp
      storage/ndb/test/run-test/daily-basic-tests.txt
 4251 Frazer Clement	2011-07-11 [merge]
      Merge 7.0->7.1

    modified:
      mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.cnf
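A note before the diffs: the NDB$EPOCH_TRANS() transactional conflict detection tested by this commit is enabled per table through the mysql.ndb_replication system table. A minimal sketch of that setup, following the pattern used in the added ndb_rpl_conflict_epoch_trans test (server ids 1 and 3 stand in for the primary and secondary clusters, matching the test configuration):

    -- Secondary (server_id 3): no conflict function, just log changes.
    -- Primary (server_id 1): transactional conflict detection.
    replace into mysql.ndb_replication values
      ("test", "t1", 3, 7, NULL),
      ("test", "t1", 1, 7, "NDB$EPOCH_TRANS()");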
=== modified file 'libmysqld/Makefile.am'
--- a/libmysqld/Makefile.am	2011-07-04 16:30:34 +0000
+++ b/libmysqld/Makefile.am	2011-09-06 10:23:37 +0000
@@ -49,7 +49,7 @@ sqlsources = derror.cc field.cc field_co
 	     ha_ndbcluster.cc ha_ndbcluster_cond.cc \
 	ha_ndbcluster_connection.cc ha_ndbinfo.cc \
 	ha_ndb_index_stat.cc \
-	ha_ndbcluster_binlog.cc ha_partition.cc \
+	ha_ndbcluster_binlog.cc ndb_conflict_trans.cc ha_partition.cc \
 	handler.cc sql_handler.cc \
 	hostname.cc init.cc password.c \
 	item.cc item_buff.cc item_cmpfunc.cc item_create.cc \
@@ -127,6 +127,9 @@ ha_ndb_index_stat.o: ha_ndb_index_stat.c
 ha_ndbinfo.o: ha_ndbinfo.cc
 		$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
 
+ndb_conflict_trans.o: ndb_conflict_trans.cc
+		$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
+
 # Until we can remove dependency on ha_ndbcluster.h
 handler.o:	handler.cc
 		$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<

=== modified file 'mysql-test/suite/ndb/r/ndb_basic.result'
--- a/mysql-test/suite/ndb/r/ndb_basic.result	2011-07-07 14:48:06 +0000
+++ b/mysql-test/suite/ndb/r/ndb_basic.result	2011-09-06 10:23:37 +0000
@@ -66,9 +66,15 @@ Ndb_cluster_node_id	#
 Ndb_config_from_host	#
 Ndb_config_from_port	#
 Ndb_conflict_fn_epoch	#
+Ndb_conflict_fn_epoch_trans	#
 Ndb_conflict_fn_max	#
 Ndb_conflict_fn_max_del_win	#
 Ndb_conflict_fn_old	#
+Ndb_conflict_trans_conflict_commit_count	#
+Ndb_conflict_trans_detect_iter_count	#
+Ndb_conflict_trans_reject_count	#
+Ndb_conflict_trans_row_conflict_count	#
+Ndb_conflict_trans_row_reject_count	#
 Ndb_connect_count	#
 Ndb_execute_count	#
 Ndb_index_stat_cache_clean	#
@@ -101,6 +107,7 @@ ndb_log_bin	#
 ndb_log_binlog_index	#
 ndb_log_empty_epochs	#
 ndb_log_orig	#
+ndb_log_transaction_id	#
 ndb_log_update_as_write	#
 ndb_log_updated_only	#
 ndb_mgmd_host	#

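The Ndb_conflict_trans_* counters added above can be read at runtime with an ordinary status query, e.g.:

    show global status like 'Ndb_conflict_trans%';

The added tests below use these counters to verify how many row conflicts were detected and how many rows and transactions were rejected.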
=== added file 'mysql-test/suite/ndb_binlog/r/ndb_binlog_log_transaction_id.result'
--- a/mysql-test/suite/ndb_binlog/r/ndb_binlog_log_transaction_id.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_binlog/r/ndb_binlog_log_transaction_id.result	2011-09-06 10:23:37 +0000
@@ -0,0 +1,104 @@
+use test;
+create table t1 (a int primary key, b int) engine=ndb;
+create table t2 (a int primary key, b int) engine=ndb;
+Single row autocommit transactions
+Should have 1 transaction id
+insert into t1 values (1,1);
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Should have 1 transaction id
+insert into t1 values (2,2);
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Should have 1 transaction id
+update t1 set b=20 where a=2;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Multi row autocommit transaction
+Should have 1 transaction id
+delete from t1;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Multi row explicit transaction
+Should have 1 transaction id
+begin;
+insert into t1 values (3,3);
+insert into t1 values (4,4);
+insert into t1 values (5,5);
+commit;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Should have 1 transaction id
+begin;
+insert into t1 values (6,6);
+update t1 set b=40 where a=4;
+delete from t1 where a=5;
+commit;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Multi table explicit transaction
+Should have 1 transaction id
+begin;
+insert into t1 values (7,7);
+insert into t2 values (7,7);
+insert into t2 values (8,8);
+commit;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Multiple autocommit transactions
+Should have 2 transaction ids
+insert into t1 values (8,8);
+insert into t1 values (9,9);
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+2
+reset master;
+Multiple autocommit transactions on single row
+Should have 3 transaction ids
+insert into t1 values (10,10);
+update t1 set b=100 where a=10;
+delete from t1 where a=10;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+3
+reset master;
+Multiple explicit transactions
+Should have 2 transaction ids
+begin;
+insert into t1 values (11,11);
+delete from t1;
+commit;
+begin;
+insert into t2 values (11,11);
+delete from t2;
+commit;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+2
+reset master;
+drop table t1;
+drop table t2;

=== added file 'mysql-test/suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc'
--- a/mysql-test/suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc	2011-09-06 10:23:37 +0000
@@ -0,0 +1,30 @@
+#
+# Get the mysqlbinlog tool --verbose mode to dump the Binlog contents with
+# extra row data included, e.g.
+#
+#### Extra row data format: 0, len: 8 :0x0007200006000000
+#
+# Then process this input to get the number of distinct values, and hence
+# distinct transaction ids in the binlog
+#
+
+--disable_query_log
+let $MYSQLD_DATADIR= `select @@datadir;`;
+--exec $MYSQL_BINLOG --verbose $MYSQLD_DATADIR/mysqld-bin.000001 > $MYSQLTEST_VARDIR/tmp/ndb_binlog_mysqlbinlog.sql
+
+create table raw_binlog_rows (txt varchar(1000));
+create table diff_extra_data (txt varchar(1000));
+
+--eval load data local infile '$MYSQLTEST_VARDIR/tmp/ndb_binlog_mysqlbinlog.sql' into table raw_binlog_rows columns terminated by '\n';
+
+insert into diff_extra_data select distinct(txt) from raw_binlog_rows where txt like '### Extra row data %';
+--enable_query_log
+
+--echo Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+
+--disable_query_log
+drop table diff_extra_data;
+drop table raw_binlog_rows;
+--enable_query_log
+

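A rough equivalent of the helper above without the intermediate tables, as a sketch only (it assumes mysqltest's --exec runs through a POSIX shell with grep, sort and wc available, and that $MYSQLD_DATADIR is already set as in the helper):

    --exec $MYSQL_BINLOG --verbose $MYSQLD_DATADIR/mysqld-bin.000001 | grep '### Extra row data' | sort -u | wc -l

The helper keeps the counting in SQL instead, so the result file does not depend on shell tooling.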
=== added file 'mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id-master.opt'
--- a/mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id-master.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id-master.opt	2011-09-06 10:23:37 +0000
@@ -0,0 +1 @@
+--ndb-log-transaction-id

=== added file 'mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id.test'
--- a/mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id.test	2011-09-06 10:23:37 +0000
@@ -0,0 +1,122 @@
+--source include/have_ndb.inc
+--source include/have_log_bin.inc
+--source include/have_debug.inc
+
+use test;
+
+create table t1 (a int primary key, b int) engine=ndb;
+create table t2 (a int primary key, b int) engine=ndb;
+
+--echo Single row autocommit transactions
+--echo Should have 1 transaction id
+insert into t1 values (1,1);
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Should have 1 transaction id
+insert into t1 values (2,2);
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Should have 1 transaction id
+update t1 set b=20 where a=2;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multi row autocommit transaction
+--echo Should have 1 transaction id
+delete from t1;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multi row explicit transaction
+--echo Should have 1 transaction id
+begin;
+insert into t1 values (3,3);
+insert into t1 values (4,4);
+insert into t1 values (5,5);
+commit;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Should have 1 transaction id
+begin;
+insert into t1 values (6,6);
+update t1 set b=40 where a=4;
+delete from t1 where a=5;
+commit;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multi table explicit transaction
+--echo Should have 1 transaction id
+begin;
+insert into t1 values (7,7);
+insert into t2 values (7,7);
+insert into t2 values (8,8);
+commit;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multiple autocommit transactions
+--echo Should have 2 transaction ids
+insert into t1 values (8,8);
+insert into t1 values (9,9);
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multiple autocommit transactions on single row
+--echo Should have 3 transaction ids
+insert into t1 values (10,10);
+update t1 set b=100 where a=10;
+delete from t1 where a=10;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multiple explicit transactions
+--echo Should have 2 transaction ids
+begin;
+insert into t1 values (11,11);
+delete from t1;
+commit;
+
+begin;
+insert into t2 values (11,11);
+delete from t2;
+commit;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+
+drop table t1;
+drop table t2;
\ No newline at end of file

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result	2011-09-06 10:23:37 +0000
@@ -66,6 +66,169 @@ select * from t1_max_delete_win;
 a	b	X
 delete from t1_old;
 delete from t1_max;
+drop table t1_old, t1_max, t1_max_delete_win;
+delete from t1_old$EX;
+delete from t1_max$EX;
+delete from t1_max_delete_win$EX;
+delete from t1_old$EX;
+delete from t1_max$EX;
+delete from t1_max_delete_win$EX;
+create table t1_old (a int primary key, b longtext, X int unsigned) engine = ndb;
+create table t1_max (a int primary key, b longtext, X int unsigned) engine = ndb;
+create table t1_max_delete_win (a int primary key, b longtext, X int unsigned) engine = ndb;
+"Test 3"
+insert into t1_old values (1, repeat('Initial X=1',1000), 1);
+insert into t1_max values (1, repeat('Initial X=1',1000), 1);
+insert into t1_max_delete_win values (1, repeat('Initial X=1',1000), 1);
+update t1_old set X = 2, b=repeat('Slave X=2',1001);
+update t1_max set X = 2, b=repeat('Slave X=2',1001);
+update t1_max_delete_win set X = 2, b=repeat('Slave X=2',1001);
+update t1_old set X = 3, b=repeat('Master X=3',1002);
+update t1_max set X = 3, b=repeat('Master X=3',1002);
+update t1_max_delete_win set X = 3, b=repeat('Master X=3',1002);
+"Expect t1_old to contain slave row, and t1_max* to contain master row"
+select a, left(b, 20), length(b), X from t1_old;
+a	left(b, 20)	length(b)	X
+1	Slave X=2Slave X=2Sl	9009	2
+select a, left(b, 20), length(b), X from t1_max;
+a	left(b, 20)	length(b)	X
+1	Master X=3Master X=3	10020	3
+select a, left(b, 20), length(b), X from t1_max_delete_win;
+a	left(b, 20)	length(b)	X
+1	Master X=3Master X=3	10020	3
+Expect t1_old to have 1 entry, and t1_max* to have no entries
+select server_id, master_server_id, count, a from t1_old$EX order by count;
+server_id	master_server_id	count	a
+2	1	1	1
+select server_id, master_server_id, count, a from t1_max$EX order by count;
+server_id	master_server_id	count	a
+select server_id, master_server_id, count, a from t1_max_delete_win$EX order by count;
+server_id	master_server_id	count	a
+delete from t1_max$EX;
+delete from t1_max_delete_win$EX;
+delete from t1_old$EX;
+update t1_old set X = 3, b=repeat('Master X=3', 1002);
+"Test 4"
+update t1_old set X = 4, b=repeat('Slave X=4',2000);
+update t1_max set X = 4, b=repeat('Slave X=4',2000);
+update t1_max_delete_win set X = 4, b=repeat('Slave X=4',2000);
+delete from t1_old;
+delete from t1_max;
+delete from t1_max_delete_win;
+"Expect t1_old and t1_max to contain slave row, and t1_max_delete_win to be empty(as master)"
+select a, left(b, 20), length(b), X from t1_old;
+a	left(b, 20)	length(b)	X
+1	Slave X=4Slave X=4Sl	18000	4
+select a, left(b, 20), length(b), X from t1_max;
+a	left(b, 20)	length(b)	X
+1	Slave X=4Slave X=4Sl	18000	4
+select a, left(b, 20), length(b), X from t1_max_delete_win;
+a	left(b, 20)	length(b)	X
+Expect t1_old and t1_max to contain 1 entry, and t1_max_delete_win to be empty
+select server_id, master_server_id, count, a from t1_old$EX order by count;
+server_id	master_server_id	count	a
+2	1	2	1
+select server_id, master_server_id, count, a from t1_max$EX order by count;
+server_id	master_server_id	count	a
+2	1	1	1
+select server_id, master_server_id, count, a from t1_max_delete_win$EX order by count;
+server_id	master_server_id	count	a
+delete from t1_max$EX;
+delete from t1_max_delete_win$EX;
+delete from t1_old$EX;
+delete from t1_old;
+delete from t1_max;
+delete from t1_max_delete_win;
+delete from t1_old;
+delete from t1_max;
+delete from t1_max_delete_win;
+"Test 5"
+Test that Updates affecting Blobs are rejected
+correctly on the slave
+drop table t1_max;
+create table t1_max (a int primary key, b int, c longtext, d longtext, X int unsigned) engine = ndb;
+insert into t1_max values (1, 1, repeat("B", 10000), repeat("E", 10001), 1);
+insert into t1_max values (2, 2, repeat("A", 10002), repeat("T", 10003), 1);
+update t1_max set X=20;
+Initial values on Slave
+select a,b,SHA1(c),length(c), SHA1(d), length(d), X from t1_max order by a;
+a	b	SHA1(c)	length(c)	SHA1(d)	length(d)	X
+1	1	4a222e18b539cdefbf0960eaa7f4362a4976e1e0	10000	9641d473ab1bd921263190eee074397084933e2d	10001	20
+2	2	f833241322c062495632923d74314a6a5c23034d	10002	2dad269dfa115f6c7e53e91a73251e597aab8fe9	10003	20
+Originate update which will be rejected
+update t1_max set c=repeat("Z", 10006), d=repeat("I", 10005), X=2 where a=1;
+Check slave has rejected due to lower version
+select a,b,SHA1(c),length(c), SHA1(d), length(d), X from t1_max order by a;
+a	b	SHA1(c)	length(c)	SHA1(d)	length(d)	X
+1	1	4a222e18b539cdefbf0960eaa7f4362a4976e1e0	10000	9641d473ab1bd921263190eee074397084933e2d	10001	20
+2	2	f833241322c062495632923d74314a6a5c23034d	10002	2dad269dfa115f6c7e53e91a73251e597aab8fe9	10003	20
+Originate delete which will be rejected (due to NDB-OLD algorithm)
+delete from t1_max where a=1;
+Check slave has rejected due to before image mismatch
+select a,b,SHA1(c),length(c), SHA1(d), length(d), X from t1_max order by a;
+a	b	SHA1(c)	length(c)	SHA1(d)	length(d)	X
+1	1	4a222e18b539cdefbf0960eaa7f4362a4976e1e0	10000	9641d473ab1bd921263190eee074397084933e2d	10001	20
+2	2	f833241322c062495632923d74314a6a5c23034d	10002	2dad269dfa115f6c7e53e91a73251e597aab8fe9	10003	20
+Originate insert which will be rejected (as row exists)
+insert into t1_max values (1, 1, repeat("R", 10004), repeat("A", 10007), 1);
+Check slave has rejected due to row existing already
+select a,b,SHA1(c),length(c), SHA1(d), length(d), X from t1_max order by a;
+a	b	SHA1(c)	length(c)	SHA1(d)	length(d)	X
+1	1	4a222e18b539cdefbf0960eaa7f4362a4976e1e0	10000	9641d473ab1bd921263190eee074397084933e2d	10001	20
+2	2	f833241322c062495632923d74314a6a5c23034d	10002	2dad269dfa115f6c7e53e91a73251e597aab8fe9	10003	20
+Expect t1_max to have 3 entries
+select server_id, master_server_id, count, a from t1_old$EX order by count;
+server_id	master_server_id	count	a
+select server_id, master_server_id, count, a from t1_max$EX order by count;
+server_id	master_server_id	count	a
+2	1	1	1
+2	1	2	1
+2	1	3	1
+select server_id, master_server_id, count, a from t1_max_delete_win$EX order by count;
+server_id	master_server_id	count	a
+delete from t1_max$EX;
+delete from t1_max_delete_win$EX;
+delete from t1_old$EX;
+Test 6
+Check that non-Blob related operations in a batch with a Blob
+operation are still subject to conflict detection.
+
+insert into mysql.ndb_replication values ("test", "t2_max", 0, 7, "NDB$MAX(X)");
+create table `t2_max$EX`
+  (server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+a int not null,
+primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
+create table t2_max (a int primary key, b int, X bigint unsigned) engine=ndb;
+insert into t2_max values (1,1,10), (2,2,10), (3,3,10), (4,4,10), (5,5,10);
+Now issue a transaction with a successful Blob op, and unsuccessful
+non-Blob op.  Check that the Blob op succeeds, and the unsuccessful
+non-Blob op is handled as expected.
+begin;
+update t2_max set b=b+1, X=1 where a=3;
+update t1_max set c=repeat("R", 10008), d=repeat("A", 10009), X = 21 where a=1;
+commit;
+Contents on Slave
+Expect Blob data applied to t1_max, no update applied to t2_max
+select a,b,left(c,1), length(c), left(d,1), length(d), X from t1_max where a=1;
+a	b	left(c,1)	length(c)	left(d,1)	length(d)	X
+1	1	R	10008	A	10009	21
+select * from t2_max order by a;
+a	b	X
+1	1	10
+2	2	10
+3	3	10
+4	4	10
+5	5	10
+Expect No conflict in t1_max, 1 conflict in t2_max
+select server_id, master_server_id, count, a from t1_max$EX order by count;
+server_id	master_server_id	count	a
+select server_id, master_server_id, count, a from t2_max$EX order by count;
+server_id	master_server_id	count	a
+2	1	1	3
+drop table t2_max, t2_max$EX;
 "Cleanup"
 drop table mysql.ndb_replication;
 drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`;

=== added file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_epoch_trans.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_epoch_trans.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_epoch_trans.result	2011-09-06 10:23:37 +0000
@@ -0,0 +1,914 @@
+include/master-slave.inc
+[connection master]
+Setup circular replication
+RESET MASTER;
+select @slave_server_id:=(variable_value+0)
+from information_schema.global_variables
+where variable_name like 'server_id';
+@slave_server_id:=(variable_value+0)
+3
+CHANGE MASTER TO master_host="127.0.0.1",master_port=SLAVE_PORT,master_user="root";
+START SLAVE;
+select @master_server_id:=(variable_value+0)
+from information_schema.global_variables
+where variable_name like 'server_id';
+@master_server_id:=(variable_value+0)
+1
+Setup ndb_replication and t1 exceptions table
+Populate ndb_replication table as necessary
+replace into mysql.ndb_replication values
+("test", "t1", 3, 7, NULL),
+("test", "t1", 1, 7, "NDB$EPOCH_TRANS()");
+replace into mysql.ndb_replication values
+("test", "t2", 3, 7, NULL),
+("test", "t2", 1, 7, "NDB$EPOCH_TRANS()");
+create table test.t1 (
+a int primary key,
+b varchar(2000)) engine=ndb;
+create table test.t2 (
+a int primary key,
+b varchar(2000)) engine=ndb;
+Add some data
+insert into test.t1 values
+(1, "Initial data 1"),
+(2, "Initial data 2"),
+(3, "Initial data 3"),
+(4, "Initial data 4"),
+(5, "Initial data 5"),
+(6, "Initial data 6"),
+(7, "Initial data 7"),
+(8, "Initial data 8"),
+(9, "Initial data 9"),
+(10, "Initial data 10");
+Show basic row-level conflict detection
+---------------------------------------
+stop slave;
+update t1 set b="Primary first change 2" where a=2;
+select * from test.t1 order by a;
+a	b
+1	Initial data 1
+2	Primary first change 2
+3	Initial data 3
+4	Initial data 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+update t1 set b="Secondary first change 2" where a=2;
+select * from test.t1 order by a;
+a	b
+1	Initial data 1
+2	Secondary first change 2
+3	Initial data 3
+4	Initial data 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+Primary should have rejected change from Secondary, keeping its value
+select * from t1 order by a;
+a	b
+1	Initial data 1
+2	Primary first change 2
+3	Initial data 3
+4	Initial data 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+ndb_conflict_fn_epoch_trans
+1
+ndb_conflict_trans_row_conflict_count
+1
+ndb_conflict_trans_row_reject_count
+1
+ndb_conflict_trans_reject_count
+1
+ndb_conflict_trans_detect_iter_count
+1
+ndb_conflict_trans_conflict_commit_count
+1
+start slave;
+Secondary should have been realigned to Primary
+select * from t1 order by a;
+a	b
+1	Initial data 1
+2	Primary first change 2
+3	Initial data 3
+4	Initial data 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+Show rollback of whole secondary transaction
+--------------------------------------------
+stop slave;
+update t1 set b="Primary second change 4" where a=4;
+select * from test.t1 order by a;
+a	b
+1	Initial data 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+begin;
+update t1 set b="Secondary second change 4" where a=4;
+update t1 set b="Secondary second change 5" where a=5;
+commit;
+select * from test.t1 order by a;
+a	b
+1	Initial data 1
+2	Primary first change 2
+3	Initial data 3
+4	Secondary second change 4
+5	Secondary second change 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+Primary should have rejected secondary changes on both rows
+select * from test.t1 order by a;
+a	b
+1	Initial data 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+ndb_conflict_fn_epoch_trans
+1
+ndb_conflict_trans_row_conflict_count
+1
+ndb_conflict_trans_row_reject_count
+2
+ndb_conflict_trans_reject_count
+1
+ndb_conflict_trans_detect_iter_count
+1
+ndb_conflict_trans_conflict_commit_count
+1
+start slave;
+Secondary should have been realigned to Primary
+select * from test.t1 order by a;
+a	b
+1	Initial data 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+Show rollback of dependent transaction as well
+----------------------------------------------
+stop slave;
+update t1 set b="Primary third change 1" where a=1;
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+begin;
+update t1 set b="Secondary third change 3" where a=3;
+update t1 set b="Secondary third change 1" where a=1;
+commit;
+begin;
+update t1 set b="Secondary fourth change 3" where a=3;
+insert into t1 values (11,"Secondary fourth change 11");
+commit;
+select * from test.t1 order by a;
+a	b
+1	Secondary third change 1
+2	Primary first change 2
+3	Secondary fourth change 3
+4	Primary second change 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+11	Secondary fourth change 11
+Primary should have rejected all secondary changes
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+ndb_conflict_trans_row_reject_count
+4
+ndb_conflict_trans_reject_count
+2
+start slave;
+Secondary should have been realigned to Primary
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Initial data 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+Show rollback of dependent transaction across different tables
+--------------------------------------------------------------
+stop slave;
+update t1 set b="Primary fifth change 6" where a=6;
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+begin;
+update t1 set b="Secondary fifth change 6" where a=6;
+insert into t2 values (1, "Secondary fifth change 1");
+insert into t2 values (2, "Secondary fifth change 2");
+commit;
+begin;
+update t2 set b="Secondary sixth change 1" where a=2;
+insert into t2 values (3, "Secondary sixth change 2");
+commit;
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Secondary fifth change 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+select * from test.t2 order by a;
+a	b
+1	Secondary fifth change 1
+2	Secondary sixth change 1
+3	Secondary sixth change 2
+Primary should have rejected all secondary changes
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+select * from test.t2 order by a;
+a	b
+ndb_conflict_trans_row_reject_count
+5
+ndb_conflict_trans_reject_count
+2
+start slave;
+Secondary should have been realigned to primary
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Initial data 10
+select * from test.t2 order by a;
+a	b
+Show that whole epoch is not rolled back
+----------------------------------------
+stop slave;
+update t1 set b="Primary is king" where a=10;
+begin;
+update t1 set b="Secondary is emperor" where a=10;
+insert into t1 values (11, "Secondary is pleni-potentiary");
+commit;
+begin;
+insert into t1 values (12, "Secondary ruled once");
+insert into t1 values (13, "This history will not be lost");
+insert into t1 values (14, "Look on my works ye mighty, and despair");
+commit;
+Primary should have rejected conflicting trans (emperor, pleni-potentiary)
+but accepted unrelated trans (history)
+select * from t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Primary is king
+12	Secondary ruled once
+13	This history will not be lost
+14	Look on my works ye mighty, and despair
+ndb_conflict_fn_epoch_trans
+1
+ndb_conflict_trans_row_conflict_count
+1
+ndb_conflict_trans_row_reject_count
+2
+ndb_conflict_trans_reject_count
+1
+ndb_conflict_trans_detect_iter_count
+1
+ndb_conflict_trans_conflict_commit_count
+1
+start slave;
+Secondary should be aligned with Primary
+select * from t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	Initial data 7
+8	Initial data 8
+9	Initial data 9
+10	Primary is king
+12	Secondary ruled once
+13	This history will not be lost
+14	Look on my works ye mighty, and despair
+Show that non-conflicting ancestors are not implicated
+------------------------------------------------------
+stop slave;
+update t1 set b="7 : Primary is king" where a=7;
+Primary state
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	7 : Primary is king
+8	Initial data 8
+9	Initial data 9
+10	Primary is king
+12	Secondary ruled once
+13	This history will not be lost
+14	Look on my works ye mighty, and despair
+begin;
+update t1 set b="8 : Secondary innocent" where a=8;
+update t1 set b="9 : Secondary innocent" where a=9;
+commit;
+Secondary with innocent
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	Initial data 7
+8	8 : Secondary innocent
+9	9 : Secondary innocent
+10	Primary is king
+12	Secondary ruled once
+13	This history will not be lost
+14	Look on my works ye mighty, and despair
+begin;
+update t1 set b="9 : Secondary guilty" where a=9;
+update t1 set b="7 : Secondary guilty" where a=7;
+commit;
+Secondary with guilty overlaid
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	7 : Secondary guilty
+8	8 : Secondary innocent
+9	9 : Secondary guilty
+10	Primary is king
+12	Secondary ruled once
+13	This history will not be lost
+14	Look on my works ye mighty, and despair
+Primary cluster should have rejected 'guilty' secondary transaction, but
+accepted 'innocent' secondary transaction.
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	7 : Primary is king
+8	8 : Secondary innocent
+9	9 : Secondary innocent
+10	Primary is king
+12	Secondary ruled once
+13	This history will not be lost
+14	Look on my works ye mighty, and despair
+ndb_conflict_fn_epoch_trans
+1
+ndb_conflict_trans_row_conflict_count
+1
+ndb_conflict_trans_row_reject_count
+2
+ndb_conflict_trans_reject_count
+1
+ndb_conflict_trans_detect_iter_count
+1
+ndb_conflict_trans_conflict_commit_count
+1
+start slave;
+Secondary cluster should be realigned with Primary
+select * from test.t1 order by a;
+a	b
+1	Primary third change 1
+2	Primary first change 2
+3	Initial data 3
+4	Primary second change 4
+5	Initial data 5
+6	Primary fifth change 6
+7	7 : Primary is king
+8	8 : Secondary innocent
+9	9 : Secondary innocent
+10	Primary is king
+12	Secondary ruled once
+13	This history will not be lost
+14	Look on my works ye mighty, and despair
+Classic banking example
+-----------------------
+replace into mysql.ndb_replication values
+("test", "balances", 3, 7, NULL),
+("test", "balances", 1, 7, "NDB$EPOCH_TRANS()");
+replace into mysql.ndb_replication values
+("test", "transactions", 3, 7, NULL),
+("test", "transactions", 1, 7, "NDB$EPOCH_TRANS()");
+create table test.balances
+(name     varchar(100) primary key,
+balance  int) engine=ndb;
+create table test.transactions$EX
+(server_id             int unsigned,
+master_server_id      int unsigned,
+master_epoch          bigint unsigned,
+count                 int unsigned,
+auto_key              int not null,
+from_name             varchar(100) not null,
+to_name               varchar(100) not null,
+detail                varchar(100) not null,
+primary key(server_id, master_server_id, master_epoch, count))
+engine=ndb;
+create table test.transactions
+(auto_key      int auto_increment,
+from_name     varchar(100),
+to_name       varchar(100),
+detail        varchar(100),
+amount        int,
+primary key(auto_key, from_name, to_name, detail)) engine=ndb;
+Initialise balances across both bank sites
+insert into test.balances values
+("Larry", 100),
+("Employee-1", 0),
+("Employee-2", 0),
+("Yacht dealer", 0),
+("Newsagent", 0);
+FLUSH LOGS;
+Bank sites are disconnected
+stop slave;
+Larry buys a yacht using Primary bank site
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+values ("Larry", "Yacht dealer", "Yacht purchase", 50);
+update test.balances set balance = balance - 50 where name = "Larry";
+update test.balances set balance = balance + 50 where name = "Yacht dealer";
+commit;
+Show yacht transaction records
+select * from test.transactions order by auto_key;
+auto_key	from_name	to_name	detail	amount
+1	Larry	Yacht dealer	Yacht purchase	50
+select * from test.balances order by name;
+name	balance
+Employee-1	0
+Employee-2	0
+Larry	50
+Newsagent	0
+Yacht dealer	50
+Larry pays employees using Secondary bank site
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+values ("Larry", "Employee-1", "Payment to Employee-1", 1);
+update test.balances set balance = balance - 1 where name = "Larry";
+update test.balances set balance = balance + 1 where name = "Employee-1";
+commit;
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+values ("Larry", "Employee-2", "Payment to Employee-2", 1);
+update test.balances set balance = balance - 1 where name = "Larry";
+update test.balances set balance = balance + 1 where name = "Employee-2";
+commit;
+Employee-2 buys yacht magazine using Secondary bank site
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+values ("Employee-2", "Newsagent", "Buy yacht magazine", 1);
+update test.balances set balance = balance - 1 where name = "Employee-2";
+update test.balances set balance = balance + 1 where name = "Newsagent";
+commit;
+Show employee transactions
+select * from test.transactions order by auto_key;
+auto_key	from_name	to_name	detail	amount
+1	Larry	Employee-1	Payment to Employee-1	1
+2	Larry	Employee-2	Payment to Employee-2	1
+3	Employee-2	Newsagent	Buy yacht magazine	1
+select * from test.balances order by name;
+name	balance
+Employee-1	1
+Employee-2	0
+Larry	98
+Newsagent	1
+Yacht dealer	0
+Bank sites re-connected
+start slave;
+Records at Primary bank site
+select * from test.transactions order by auto_key;
+auto_key	from_name	to_name	detail	amount
+1	Larry	Yacht dealer	Yacht purchase	50
+select * from test.balances order by name;
+name	balance
+Employee-1	0
+Employee-2	0
+Larry	50
+Newsagent	0
+Yacht dealer	50
+Exceptions at Primary bank site
+select server_id, master_server_id, count, auto_key, from_name, to_name, detail
+from test.transactions$EX order by count;
+server_id	master_server_id	count	auto_key	from_name	to_name	detail
+1	3	1	1	Larry	Employee-1	Payment to Employee-1
+1	3	2	2	Larry	Employee-2	Payment to Employee-2
+1	3	3	3	Employee-2	Newsagent	Buy yacht magazine
+Conflict handling activity at Primary bank site
+Expect :
+1 conflict from slave T1 on Larry's balance
+1 conflict from slave T2 on Larry's balance
+=2 row conflicts
+
+3 (user) transactions rejected
+9 rows rejected (3 per transaction)
+Variability : # epoch transactions, # row conflicts detected
+1-3                   2-3
+# detect_iter_count
+1-3
+We only check stable values
+ndb_conflict_trans_row_reject_count
+9
+ndb_conflict_trans_reject_count
+3
+Records at Secondary bank site
+select * from test.transactions order by auto_key;
+auto_key	from_name	to_name	detail	amount
+1	Larry	Yacht dealer	Yacht purchase	50
+select * from test.balances order by name;
+name	balance
+Employee-1	0
+Employee-2	0
+Larry	50
+Newsagent	0
+Yacht dealer	50
+drop table test.balances;
+drop table test.transactions;
+drop table test.transactions$EX;
+Test mixing transactional and non transactional
+-----------------------------------------------
+Remove old data from t1
+delete from test.t1;
+Define table with row-based epoch detection
+replace into mysql.ndb_replication values
+("test", "t3", 3, 7, NULL),
+("test", "t3", 1, 7, 'NDB$EPOCH()');
+create table t3 (a int primary key, b int) engine=ndb;
+create table t4 (a int primary key, b int) engine=ndb;
+create table t5 (a int primary key, b longtext) engine=ndb;
+Insert some data
+insert into test.t1 values
+(1,1),
+(2,2),
+(3,3),
+(4,4),
+(5,5),
+(6,6);
+insert into test.t3 values
+(11,11),
+(12,12),
+(13,13),
+(14,14),
+(15,15),
+(16,16);
+insert into test.t4 values
+(21,21),
+(22,22),
+(23,23),
+(24,24),
+(25,25),
+(26,26);
+insert into test.t5 values
+(1, REPEAT("B", 10000)),
+(2, REPEAT("E", 10000)),
+(3, REPEAT("A", 10000));
+Allow to propagate
+FLUSH LOGS;
+Case 1 : Transactional detection affects row - based entries in same trans
+stop slave;
+update test.t1 set b=100 where a=1;
+begin;
+update test.t3 set b=1100 where a=11;
+update test.t4 set b=2100 where a=21;
+update test.t1 set b=1000 where a=1;
+commit;
+Show slave transaction effect
+select * from test.t1 order by a;
+a	b
+1	1000
+2	2
+3	3
+4	4
+5	5
+6	6
+select * from test.t3 order by a;
+a	b
+11	1100
+12	12
+13	13
+14	14
+15	15
+16	16
+select * from test.t4 order by a;
+a	b
+21	2100
+22	22
+23	23
+24	24
+25	25
+26	26
+Expect Primary to have rejected whole trans across 3 tables
+select * from test.t1 order by a;
+a	b
+1	100
+2	2
+3	3
+4	4
+5	5
+6	6
+select * from test.t3 order by a;
+a	b
+11	11
+12	12
+13	13
+14	14
+15	15
+16	16
+select * from test.t4 order by a;
+a	b
+21	21
+22	22
+23	23
+24	24
+25	25
+26	26
+Expect 1 transaction rejected, 3 rows rejected
+1 conflict row, 1 epoch, 1 iteration
+ndb_conflict_fn_epoch_trans
+1
+ndb_conflict_trans_row_conflict_count
+1
+ndb_conflict_trans_row_reject_count
+3
+ndb_conflict_trans_reject_count
+1
+ndb_conflict_trans_detect_iter_count
+1
+ndb_conflict_trans_conflict_commit_count
+1
+Now restart rep to Secondary, and check realignment
+start slave;
+select * from test.t1 order by a;
+a	b
+1	100
+2	2
+3	3
+4	4
+5	5
+6	6
+select * from test.t3 order by a;
+a	b
+11	11
+12	12
+13	13
+14	14
+15	15
+16	16
+select * from test.t4 order by a;
+a	b
+21	21
+22	22
+23	23
+24	24
+25	25
+26	26
+Case 2 : Row based detection does not affect other transaction entries
+stop slave;
+update test.t3 set b=1200 where a=12;
+begin;
+update test.t3 set b=1201 where a=12;
+update test.t4 set b=2200 where a=22;
+update test.t1 set b=2000 where a=2;
+commit;
+Show effect of transaction on Secondary
+select * from test.t1 order by a;
+a	b
+1	100
+2	2000
+3	3
+4	4
+5	5
+6	6
+select * from test.t3 order by a;
+a	b
+11	11
+12	1201
+13	13
+14	14
+15	15
+16	16
+select * from test.t4 order by a;
+a	b
+21	21
+22	2200
+23	23
+24	24
+25	25
+26	26
+Show effect of transaction on Primary
+Only t3 should have been reverted
+select * from test.t1 order by a;
+a	b
+1	100
+2	2000
+3	3
+4	4
+5	5
+6	6
+select * from test.t3 order by a;
+a	b
+11	11
+12	1200
+13	13
+14	14
+15	15
+16	16
+select * from test.t4 order by a;
+a	b
+21	21
+22	2200
+23	23
+24	24
+25	25
+26	26
+Expect all counters to be zero
+ndb_conflict_fn_epoch_trans
+0
+ndb_conflict_trans_row_conflict_count
+0
+ndb_conflict_trans_row_reject_count
+0
+ndb_conflict_trans_reject_count
+0
+ndb_conflict_trans_detect_iter_count
+0
+ndb_conflict_trans_conflict_commit_count
+0
+Show effect of transaction on Secondary
+start slave;
+select * from test.t1 order by a;
+a	b
+1	100
+2	2000
+3	3
+4	4
+5	5
+6	6
+select * from test.t3 order by a;
+a	b
+11	11
+12	1200
+13	13
+14	14
+15	15
+16	16
+select * from test.t4 order by a;
+a	b
+21	21
+22	2200
+23	23
+24	24
+25	25
+26	26
+flush logs;
+Case 3 : Check behaviour where table with Blob is implicated
+in transactional conflict.  Should result in Slave
+stopping with an error.
+STOP SLAVE;
+Setup warning suppression
+begin;
+update t1 set b= 11 where a=1;
+commit;
+begin;
+update t1 set b= 111 where a=1;
+update t1 set b= 222 where a=2;
+update t5 set b= REPEAT("T", 10000) where a=3;
+commit;
+Show effect of transaction on Secondary
+select * from test.t1 order by a;
+a	b
+1	111
+2	222
+3	3
+4	4
+5	5
+6	6
+select left(b,1), length(b) from test.t5 order by a;
+left(b,1)	length(b)
+B	10000
+E	10000
+T	10000
+Check that Primary Slave has stopped
+include/wait_for_slave_sql_error.inc [errno=1296]
+Restart Primary Slave
+set global sql_slave_skip_counter=1;
+START SLAVE;
+Restart Secondary Slave
+START SLAVE;
+flush logs;
+drop table test.t3;
+drop table test.t4;
+drop table test.t5;
+drop table mysql.ndb_replication;
+drop table test.t1;
+drop table test.t2;
+flush logs;
+stop slave;
+reset slave;
+change master to master_host='';
+include/rpl_end.inc

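A note on the $EX tables appearing throughout these results: each conflict-detected table <t> may have an exceptions table named <t>$EX, whose leading columns and primary key follow a fixed shape, followed by copies of <t>'s own primary key columns. A sketch for a hypothetical base table t with primary key a, matching the t2_max$EX and transactions$EX definitions in the diffs:

    create table `t$EX`
      (server_id        int unsigned,
       master_server_id int unsigned,
       master_epoch     bigint unsigned,
       count            int unsigned,
       a                int not null,  -- copy of t's primary key column
       primary key (server_id, master_server_id, master_epoch, count)) engine=ndb;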
=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test	2011-09-06 10:23:37 +0000
@@ -105,6 +105,209 @@ select * from t1_max_delete_win;
 delete from t1_old;
 delete from t1_max;
 
+--connection master
+
+# Now test with Blobs
+drop table t1_old, t1_max, t1_max_delete_win;
+delete from t1_old$EX;
+delete from t1_max$EX;
+delete from t1_max_delete_win$EX;
+
+--sync_slave_with_master
+--connection slave
+# Delete on slave, as $EX table ops don't replicate
+delete from t1_old$EX;
+delete from t1_max$EX;
+delete from t1_max_delete_win$EX;
+
+--connection master
+
+create table t1_old (a int primary key, b longtext, X int unsigned) engine = ndb;
+create table t1_max (a int primary key, b longtext, X int unsigned) engine = ndb;
+create table t1_max_delete_win (a int primary key, b longtext, X int unsigned) engine = ndb;
+
+--sync_slave_with_master
+
+###############
+--echo "Test 3"
+
+--connection master
+insert into t1_old values (1, repeat('Initial X=1',1000), 1);
+insert into t1_max values (1, repeat('Initial X=1',1000), 1);
+insert into t1_max_delete_win values (1, repeat('Initial X=1',1000), 1);
+--sync_slave_with_master
+
+--connection slave
+update t1_old set X = 2, b=repeat('Slave X=2',1001);
+update t1_max set X = 2, b=repeat('Slave X=2',1001);
+update t1_max_delete_win set X = 2, b=repeat('Slave X=2',1001);
+
+--connection master
+update t1_old set X = 3, b=repeat('Master X=3',1002);
+update t1_max set X = 3, b=repeat('Master X=3',1002);
+update t1_max_delete_win set X = 3, b=repeat('Master X=3',1002);
+--sync_slave_with_master
+
+--connection slave
+--echo "Expect t1_old to contain slave row, and t1_max* to contain master row"
+select a, left(b, 20), length(b), X from t1_old;
+select a, left(b, 20), length(b), X from t1_max;
+select a, left(b, 20), length(b), X from t1_max_delete_win;
+
+--echo Expect t1_old to have 1 entry, and t1_max* to have no entries
+select server_id, master_server_id, count, a from t1_old$EX order by count;
+select server_id, master_server_id, count, a from t1_max$EX order by count;
+select server_id, master_server_id, count, a from t1_max_delete_win$EX order by count;
+
+delete from t1_max$EX;
+delete from t1_max_delete_win$EX;
+delete from t1_old$EX;
+
+# synchronize
+update t1_old set X = 3, b=repeat('Master X=3', 1002);
+
+###############
+--echo "Test 4"
+
+--connection slave
+update t1_old set X = 4, b=repeat('Slave X=4',2000);
+update t1_max set X = 4, b=repeat('Slave X=4',2000);
+update t1_max_delete_win set X = 4, b=repeat('Slave X=4',2000);
+
+--connection master
+delete from t1_old;
+delete from t1_max;
+delete from t1_max_delete_win;
+--sync_slave_with_master
+
+--connection slave
+--echo "Expect t1_old and t1_max to contain slave row, and t1_max_delete_win to be empty(as master)"
+select a, left(b, 20), length(b), X from t1_old;
+select a, left(b, 20), length(b), X from t1_max;
+select a, left(b, 20), length(b), X from t1_max_delete_win;
+
+--echo Expect t1_old and t1_max to contain 1 entry, and t1_max_delete_win to be empty
+select server_id, master_server_id, count, a from t1_old$EX order by count;
+select server_id, master_server_id, count, a from t1_max$EX order by count;
+select server_id, master_server_id, count, a from t1_max_delete_win$EX order by count;
+
+delete from t1_max$EX;
+delete from t1_max_delete_win$EX;
+delete from t1_old$EX;
+
+delete from t1_old;
+delete from t1_max;
+delete from t1_max_delete_win;
+
+--connection master
+delete from t1_old;
+delete from t1_max;
+delete from t1_max_delete_win;
+
+#################
+--echo "Test 5"
+
+--echo Test that Updates affecting Blobs are rejected
+--echo correctly on the slave
+drop table t1_max;
+create table t1_max (a int primary key, b int, c longtext, d longtext, X int unsigned) engine = ndb;
+
+insert into t1_max values (1, 1, repeat("B", 10000), repeat("E", 10001), 1);
+insert into t1_max values (2, 2, repeat("A", 10002), repeat("T", 10003), 1);
+
+--sync_slave_with_master
+--connection slave
+
+# Bump up tuple versions
+update t1_max set X=20;
+
+--echo Initial values on Slave
+select a,b,SHA1(c),length(c), SHA1(d), length(d), X from t1_max order by a;
+
+--connection master
+--echo Originate update which will be rejected
+update t1_max set c=repeat("Z", 10006), d=repeat("I", 10005), X=2 where a=1;
+
+--sync_slave_with_master
+--connection slave
+--echo Check slave has rejected due to lower version
+select a,b,SHA1(c),length(c), SHA1(d), length(d), X from t1_max order by a;
+
+--connection master
+--echo Originate delete which will be rejected (due to NDB-OLD algorithm)
+delete from t1_max where a=1;
+
+--sync_slave_with_master
+--connection slave
+--echo Check slave has rejected due to before image mismatch
+select a,b,SHA1(c),length(c), SHA1(d), length(d), X from t1_max order by a;
+
+--connection master
+--echo Originate insert which will be rejected (as row exists)
+insert into t1_max values (1, 1, repeat("R", 10004), repeat("A", 10007), 1);
+
+--sync_slave_with_master
+--connection slave
+--echo Check slave has rejected due to row existing already
+select a,b,SHA1(c),length(c), SHA1(d), length(d), X from t1_max order by a;
+
+--echo Expect t1_max to have 3 entries
+select server_id, master_server_id, count, a from t1_old$EX order by count;
+select server_id, master_server_id, count, a from t1_max$EX order by count;
+select server_id, master_server_id, count, a from t1_max_delete_win$EX order by count;
+
+delete from t1_max$EX;
+delete from t1_max_delete_win$EX;
+delete from t1_old$EX;
+
+--connection master
+
+#######
+--echo Test 6
+--echo Check that non-Blob related operations in a batch with a Blob
+--echo operation are still subject to conflict detection.
+--echo
+insert into mysql.ndb_replication values ("test", "t2_max", 0, 7, "NDB$MAX(X)");
+
+create table `t2_max$EX`
+  (server_id int unsigned,
+   master_server_id int unsigned,
+   master_epoch bigint unsigned,
+   count int unsigned,
+   a int not null,
+   primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
+
+create table t2_max (a int primary key, b int, X bigint unsigned) engine=ndb;
+
+insert into t2_max values (1,1,10), (2,2,10), (3,3,10), (4,4,10), (5,5,10);
+
+--sync_slave_with_master
+
+--connection master
+--echo Now issue a transaction with a successful Blob op, and unsuccessful
+--echo non-Blob op.  Check that the Blob op succeeds, and the unsuccessful
+--echo non-Blob op is handled as expected.
+
+begin;
+update t2_max set b=b+1, X=1 where a=3; # conflicts
+update t1_max set c=repeat("R", 10008), d=repeat("A", 10009), X = 21 where a=1; # ok
+commit;
+
+--sync_slave_with_master
+
+--connection slave
+--echo Contents on Slave
+--echo Expect Blob data applied to t1_max, no update applied to t2_max
+select a,b,left(c,1), length(c), left(d,1), length(d), X from t1_max where a=1;
+select * from t2_max order by a;
+
+--echo Expect No conflict in t1_max, 1 conflict in t2_max$EX
+select server_id, master_server_id, count, a from t1_max$EX order by count;
+select server_id, master_server_id, count, a from t2_max$EX order by count;
+
+--connection master
+drop table t2_max, t2_max$EX;
+
 
 ###############
 --echo "Cleanup"

=== added file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.cnf'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.cnf	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.cnf	2011-09-06 10:23:37 +0000
@@ -0,0 +1,47 @@
+# Could be :
+#  !include ndb_rpl_conflict_epoch.cnf
+#  [mysqld]
+#  server-id-bits=24
+#  ndb-serverid-transid-bits=8
+
+!include ../my.cnf
+
+# 2 clusters, each with 2 MySQLDs
+# All MySQLDs log-slave-updates
+# Potential infinite loops are broken by both servers
+# on each cluster having the same server-id
+
+[mysqld]
+log-slave-updates
+ndb-log-apply-status
+ndb-log-transaction-id
+
+[mysqld.1.1]
+server-id= 1
+log-bin = pref-master-1
+
+[mysqld.2.1]
+server-id= 2
+log-bin = pref-master-2
+
+[mysqld.1.slave]
+server-id= 3
+log-bin = sec-master-1
+skip-slave-start
+
+[mysqld.2.slave]
+server-id= 4
+log-bin = sec-master-2
+master-host=		127.0.0.1
+master-port=		@mysqld.2.1.port
+master-password=	@mysqld.2.1.#password
+master-user=		@mysqld.2.1.#user
+master-connect-retry=	1
+init-rpl-role=		slave
+skip-slave-start
+ndb_connectstring=	@mysql_cluster.slave.ndb_connectstring
+
+[ENV]
+
+SLAVE_MYPORT1=		@mysqld.2.slave.port
+SLAVE_MYSOCK1=		@mysqld.2.slave.socket

=== added file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.test	2011-09-06 10:23:37 +0000
@@ -0,0 +1,746 @@
+#
+# Test engine native conflict resolution for ndb
+#   NDB$EPOCH_TRANS() function
+#
+#
+--source include/have_ndb.inc
+--source include/have_binlog_format_mixed_or_row.inc
+--source suite/ndb_rpl/ndb_master-slave.inc
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--echo Setup circular replication
+
+--connection slave
+RESET MASTER;
+select @slave_server_id:=(variable_value+0)
+       from information_schema.global_variables
+       where variable_name like 'server_id';
+let $SLAVE_SERVER_ID= query_get_value('select @slave_server_id as v',v,1);
+
+--connection master
+--replace_result $SLAVE_MYPORT SLAVE_PORT
+--eval CHANGE MASTER TO master_host="127.0.0.1",master_port=$SLAVE_MYPORT,master_user="root"
+START SLAVE;
+select @master_server_id:=(variable_value+0)
+       from information_schema.global_variables
+       where variable_name like 'server_id';
+let $MASTER_SERVER_ID= query_get_value('select @master_server_id as v',v,1);
+
+--echo Setup ndb_replication and t1$EX exceptions table
+
+--disable_warnings
+--disable_query_log
+--connection master
+drop table if exists mysql.ndb_replication;
+CREATE TABLE mysql.ndb_replication
+  (db VARBINARY(63),
+   table_name VARBINARY(63),
+   server_id INT UNSIGNED,
+   binlog_type INT UNSIGNED,
+   conflict_fn VARBINARY(128),
+   PRIMARY KEY USING HASH (db,table_name,server_id))
+  ENGINE=NDB PARTITION BY KEY(db,table_name);
+--enable_warnings
+--enable_query_log
+
+--echo Populate ndb_replication table as necessary
+eval replace into mysql.ndb_replication values
+  ("test", "t1", $SLAVE_SERVER_ID, 7, NULL),
+  ("test", "t1", $MASTER_SERVER_ID, 7, "NDB\$EPOCH_TRANS()");
+eval replace into mysql.ndb_replication values
+  ("test", "t2", $SLAVE_SERVER_ID, 7, NULL),
+  ("test", "t2", $MASTER_SERVER_ID, 7, "NDB\$EPOCH_TRANS()");
+
+create table test.t1 (
+  a int primary key,
+  b varchar(2000)) engine=ndb;
+
+create table test.t2 (
+  a int primary key,
+  b varchar(2000)) engine=ndb;
+
+--sync_slave_with_master slave
+
+--connection master
+--echo Add some data
+insert into test.t1 values
+ (1, "Initial data 1"),
+ (2, "Initial data 2"),
+ (3, "Initial data 3"),
+ (4, "Initial data 4"),
+ (5, "Initial data 5"),
+ (6, "Initial data 6"),
+ (7, "Initial data 7"),
+ (8, "Initial data 8"),
+ (9, "Initial data 9"),
+ (10, "Initial data 10");
+
+--echo Show basic row-level conflict detection
+--echo ---------------------------------------
+--sync_slave_with_master slave
+--connection slave
+
+stop slave;
+
+--connection master
+
+update t1 set b="Primary first change 2" where a=2;
+select * from test.t1 order by a;
+
+--connection slave
+
+update t1 set b="Secondary first change 2" where a=2;
+select * from test.t1 order by a;
+
+--sync_slave_with_master master
+
+--echo Primary should have rejected change from Secondary, keeping its value
+
+select * from t1 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+
+start slave;
+
+--connection master
+
+--sync_slave_with_master slave
+
+--connection slave
+
+--echo Secondary should have been realigned to Primary
+
+select * from t1 order by a;
+
+--echo Show rollback of whole secondary transaction
+--echo --------------------------------------------
+
+--connection slave
+
+stop slave;
+
+--connection master
+update t1 set b="Primary second change 4" where a=4;
+
+select * from test.t1 order by a;
+
+--connection slave
+begin;
+update t1 set b="Secondary second change 4" where a=4;
+update t1 set b="Secondary second change 5" where a=5;
+commit;
+
+select * from test.t1 order by a;
+
+--sync_slave_with_master master
+
+--connection master
+
+--echo Primary should have rejected secondary changes on both rows
+select * from test.t1 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+start slave;
+
+--connection master
+--sync_slave_with_master slave
+
+--connection slave
+
+--echo Secondary should have been realigned to Primary
+select * from test.t1 order by a;
+
+--echo Show rollback of dependent transaction as well
+--echo ----------------------------------------------
+
+--connection slave
+stop slave;
+
+--connection master
+update t1 set b="Primary third change 1" where a=1;
+
+select * from test.t1 order by a;
+
+--connection slave
+
+begin;
+update t1 set b="Secondary third change 3" where a=3;
+update t1 set b="Secondary third change 1" where a=1; # Conflict here
+commit;
+begin;
+update t1 set b="Secondary fourth change 3" where a=3; # Dependency on conflict here
+insert into t1 values (11,"Secondary fourth change 11");
+commit;
+
+select * from test.t1 order by a;
+
+--sync_slave_with_master master
+--connection master
+
+--echo Primary should have rejected all secondary changes
+select * from test.t1 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+start slave;
+
+--connection master
+--sync_slave_with_master slave
+--connection slave
+
+--echo Secondary should have been realigned to Primary
+
+select * from test.t1 order by a;
+
+
+--echo Show rollback of dependent transaction across different tables
+--echo --------------------------------------------------------------
+
+--connection slave
+stop slave;
+
+--connection master
+
+update t1 set b="Primary fifth change 6" where a=6;
+
+select * from test.t1 order by a;
+
+--connection slave
+
+begin;
+update t1 set b="Secondary fifth change 6" where a=6; # Conflict row
+insert into t2 values (1, "Secondary fifth change 1");
+insert into t2 values (2, "Secondary fifth change 2");
+commit;
+begin;
+update t2 set b="Secondary sixth change 1" where a=2; # Dependent row
+insert into t2 values (3, "Secondary sixth change 2");
+commit;
+
+select * from test.t1 order by a;
+select * from test.t2 order by a;
+
+--sync_slave_with_master master
+--connection master
+
+--echo Primary should have rejected all secondary changes
+select * from test.t1 order by a;
+select * from test.t2 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+start slave;
+--connection master
+
+--sync_slave_with_master slave
+
+--echo Secondary should have been realigned to primary
+select * from test.t1 order by a;
+select * from test.t2 order by a;
+
+--echo Show that whole epoch is not rolled back
+--echo ----------------------------------------
+# Whole epoch is rolled back when --ndb-serverid-transid-bits is 0!
+
+--connection slave
+stop slave;
+
+--connection master
+update t1 set b="Primary is king" where a=10;
+
+--connection slave
+begin;
+update t1 set b="Secondary is emperor" where a=10;
+insert into t1 values (11, "Secondary is pleni-potentiary");
+commit;
+
+begin;
+insert into t1 values (12, "Secondary ruled once");
+insert into t1 values (13, "This history will not be lost");
+insert into t1 values (14, "Look on my works ye mighty, and despair");
+commit;
+
+--sync_slave_with_master master
+--connection master
+
+--echo Primary should have rejected conflicting trans (emperor, pleni-potentiary)
+--echo but accepted unrelated trans (history)
+
+select * from t1 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+start slave;
+
+--connection master
+--sync_slave_with_master slave
+--connection slave
+
+--echo Secondary should be aligned with Primary
+
+select * from t1 order by a;
+
+
+--echo Show that non-conflicting ancestors are not implicated
+--echo ------------------------------------------------------
+
+--connection slave
+stop slave;
+
+--connection master
+update t1 set b="7 : Primary is king" where a=7;
+
+--echo Primary state
+select * from test.t1 order by a;
+
+--connection slave
+
+# 'Innocent' secondary transaction
+begin;
+update t1 set b="8 : Secondary innocent" where a=8;
+update t1 set b="9 : Secondary innocent" where a=9;
+commit;
+
+--echo Secondary with innocent
+select * from test.t1 order by a;
+
+# 'Guilty' secondary transaction, affecting one of the same rows as the innocent one
+begin;
+update t1 set b="9 : Secondary guilty" where a=9; # Dependency on innocent
+update t1 set b="7 : Secondary guilty" where a=7; # Conflict row
+commit;
+
+--echo Secondary with guilty overlaid
+select * from test.t1 order by a;
+
+--sync_slave_with_master master
+
+--connection master
+
+--echo Primary cluster should have rejected 'guilty' secondary transaction, but
+--echo accepted 'innocent' secondary transaction.
+
+select * from test.t1 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+start slave;
+
+--connection master
+--sync_slave_with_master slave
+--connection slave
+
+--echo Secondary cluster should be realigned with Primary
+
+select * from test.t1 order by a;
+
+
+--connection master
+
+--echo Classic banking example
+--echo -----------------------
+
+eval replace into mysql.ndb_replication values
+  ("test", "balances", $SLAVE_SERVER_ID, 7, NULL),
+  ("test", "balances", $MASTER_SERVER_ID, 7, "NDB\$EPOCH_TRANS()");
+
+# Transactions table may not need conflict-detection?
+eval replace into mysql.ndb_replication values
+  ("test", "transactions", $SLAVE_SERVER_ID, 7, NULL),
+  ("test", "transactions", $MASTER_SERVER_ID, 7, "NDB\$EPOCH_TRANS()");
+
+create table test.balances
+(name     varchar(100) primary key,
+ balance  int) engine=ndb;
+
+create table test.transactions$EX
+ (server_id             int unsigned,
+  master_server_id      int unsigned,
+  master_epoch          bigint unsigned,
+  count                 int unsigned,
+  auto_key              int not null,
+  from_name             varchar(100) not null,
+  to_name               varchar(100) not null,
+  detail                varchar(100) not null,
+  primary key(server_id, master_server_id, master_epoch, count))
+engine=ndb;
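+# The <table>$EX name and the leading (server_id, master_server_id,
+# master_epoch, count) columns follow the NDB exceptions-table convention;
+# the conflicting row's primary key columns (here the transactions table's
+# key) are appended after those four fixed columns.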
+
+
+create table test.transactions
+(auto_key      int auto_increment,
+ from_name     varchar(100),
+ to_name       varchar(100),
+ detail        varchar(100),
+ amount        int,
+ primary key(auto_key, from_name, to_name, detail)) engine=ndb;
+
+--echo Initialise balances across both bank sites
+insert into test.balances values
+ ("Larry", 100),
+ ("Employee-1", 0),
+ ("Employee-2", 0),
+ ("Yacht dealer", 0),
+ ("Newsagent", 0);
+
+--sync_slave_with_master slave
+--connection slave
+# Sync back to the master to ensure that what follows on the slave
+# is in a separate epoch transaction.
+# This is needed to get stable counts, not for correctness.
+#
+FLUSH LOGS; # To give a position to sync
+--sync_slave_with_master master
+
+
+--echo Bank sites are disconnected
+--connection slave
+stop slave;
+--connection master
+
+--echo Larry buys a yacht using Primary bank site
+
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+  values ("Larry", "Yacht dealer", "Yacht purchase", 50);
+update test.balances set balance = balance - 50 where name = "Larry";
+update test.balances set balance = balance + 50 where name = "Yacht dealer";
+commit;
+
+--echo Show yacht transaction records
+
+select * from test.transactions order by auto_key;
+select * from test.balances order by name;
+
+--connection slave
+--echo Larry pays employees using Secondary bank site
+
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+  values ("Larry", "Employee-1", "Payment to Employee-1", 1);
+update test.balances set balance = balance - 1 where name = "Larry";
+update test.balances set balance = balance + 1 where name = "Employee-1";
+commit;
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+  values ("Larry", "Employee-2", "Payment to Employee-2", 1);
+update test.balances set balance = balance - 1 where name = "Larry";
+update test.balances set balance = balance + 1 where name = "Employee-2";
+commit;
+
+--echo Employee-2 buys yacht magazine using Secondary bank site
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+  values ("Employee-2", "Newsagent", "Buy yacht magazine", 1);
+update test.balances set balance = balance - 1 where name = "Employee-2";
+update test.balances set balance = balance + 1 where name = "Newsagent";
+commit;
+
+--echo Show employee transactions
+
+select * from test.transactions order by auto_key;
+select * from test.balances order by name;
+
+--sync_slave_with_master master
+
+--echo Bank sites re-connected
+--connection slave
+start slave;
+
+--connection master
+--sync_slave_with_master slave
+
+--connection master
+
+--echo Records at Primary bank site
+
+select * from test.transactions order by auto_key;
+select * from test.balances order by name;
+
+--echo Exceptions at Primary bank site
+
+select server_id, master_server_id, count, auto_key, from_name, to_name, detail
+  from test.transactions$EX order by count;
+
+--echo Conflict handling activity at Primary bank site
+--echo Expect :
+--echo   1 conflict from slave T1 on Larry's balance
+--echo   1 conflict from slave T2 on Larry's balance
+--echo  =2 row conflicts
+--echo
+--echo 3 (user) transactions rejected
+--echo 9 rows rejected (3 per transaction)
+--echo Variability : # epoch transactions, # row conflicts detected
+--echo               1-3                   2-3
+--echo               # detect_iter_count
+--echo               1-3
+--echo We only check stable values
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+
+--echo Records at Secondary bank site
+
+select * from test.transactions order by auto_key;
+select * from test.balances order by name;
+
+--sync_slave_with_master master
+
+--connection master
+drop table test.balances;
+drop table test.transactions;
+drop table test.transactions$EX;
+
+--echo Test mixing transactional and non transactional
+--echo -----------------------------------------------
+--echo Remove old data from t1
+--connection master
+delete from test.t1;
+--sync_slave_with_master slave
+--connection master
+
+--echo Define table with row-based epoch detection
+eval replace into mysql.ndb_replication values
+           ("test", "t3", $SLAVE_SERVER_ID, 7, NULL),
+           ("test", "t3", $MASTER_SERVER_ID, 7, 'NDB\$EPOCH()');
+
+create table t3 (a int primary key, b int) engine=ndb;
+create table t4 (a int primary key, b int) engine=ndb;
+create table t5 (a int primary key, b longtext) engine=ndb;
+
+--echo Insert some data
+
+insert into test.t1 values
+  (1,1),
+  (2,2),
+  (3,3),
+  (4,4),
+  (5,5),
+  (6,6);
+
+insert into test.t3 values
+  (11,11),
+  (12,12),
+  (13,13),
+  (14,14),
+  (15,15),
+  (16,16);
+
+insert into test.t4 values
+  (21,21),
+  (22,22),
+  (23,23),
+  (24,24),
+  (25,25),
+  (26,26);
+
+insert into test.t5 values
+  (1, REPEAT("B", 10000)),
+  (2, REPEAT("E", 10000)),
+  (3, REPEAT("A", 10000));
+
+--echo Allow to propagate
+--sync_slave_with_master slave
+
+--connection slave
+FLUSH LOGS;  # Ensure Inserts are in previous epoch trans to what follows
+--sync_slave_with_master master
+
+--echo Case 1 : Transactional detection affects row - based entries in same trans
+--connection slave
+stop slave;
+--connection master
+update test.t1 set b=100 where a=1;
+
+--connection slave
+# t3 is a table with row-based, but not transactional, conflict detection
+# t4 is a table without any conflict detection
+# t1 is a table with transactional conflict detection
+begin;
+update test.t3 set b=1100 where a=11;
+update test.t4 set b=2100 where a=21;
+update test.t1 set b=1000 where a=1;
+commit;
+
+--echo Show slave transaction effect
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+
+--sync_slave_with_master master
+
+--connection master
+--echo Expect Primary to have rejected whole trans across 3 tables
+
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+--echo Expect 1 transaction rejected, 3 rows rejected
+--echo        1 conflict row, 1 epoch, 1 iteration
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--echo Now restart rep to Secondary, and check realignment
+--connection slave
+start slave;
+--connection master
+--sync_slave_with_master slave
+--connection slave
+
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+--echo Case 2 : Row based detection does not affect other transaction entries
+--connection slave
+stop slave;
+--connection master
+update test.t3 set b=1200 where a=12;
+
+--connection slave
+# Transaction conflicts with the master on a table without transactional
+# conflict detection.
+# The conflict will be detected on the row, but no other transaction
+# state will be reverted.
+#
+begin;
+update test.t3 set b=1201 where a=12;
+update test.t4 set b=2200 where a=22;
+update test.t1 set b=2000 where a=2;
+commit;
+
+--echo Show effect of transaction on Secondary
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+--sync_slave_with_master master
+
+--echo Show effect of transaction on Primary
+--echo Only t3 should have been reverted
+
+--connection master
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+--echo Expect all counters to be zero
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--echo Show effect of transaction on Secondary
+--connection slave
+start slave;
+--connection master
+--sync_slave_with_master slave
+--connection slave
+
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+flush logs;
+--sync_slave_with_master master
+
+--echo Case 3 : Check behaviour where table with Blob is implicated
+--echo          in transactional conflict.  Should result in Slave
+--echo          stopping with an error.
+
+--connection slave
+STOP SLAVE;
+
+--connection master
+
+--echo Setup warning suppression
+--disable_query_log
+call mtr.add_suppression("Transaction conflict handling on table t5 failed as table has Blobs which cannot be refreshed");
+call mtr.add_suppression("NDBCLUSTER Error_code: 1296");
+--enable_query_log
+
+
+begin;
+update t1 set b= 11 where a=1;
+commit;
+
+--connection slave
+begin;
+update t1 set b= 111 where a=1;                 # Conflict
+update t1 set b= 222 where a=2;                 # Implicated row
+update t5 set b= REPEAT("T", 10000) where a=3;  # Implicated Blob update
+commit;
+
+--echo Show effect of transaction on Secondary
+select * from test.t1 order by a;
+select left(b,1), length(b) from test.t5 order by a;
+
+--echo Check that Primary Slave has stopped
+--connection master
+
+--let $slave_sql_errno=1296
+--source include/wait_for_slave_sql_error.inc
+#SHOW SLAVE STATUS;
+
+--echo Restart Primary Slave
+set global sql_slave_skip_counter=1;
+
+START SLAVE;
+
+--connection slave
+
+--echo Restart Secondary Slave
+START SLAVE;
+
+flush logs;
+--sync_slave_with_master master
+
+--connection master
+drop table test.t3;
+drop table test.t4;
+drop table test.t5;
+
+# Cleanup
+--connection master
+drop table mysql.ndb_replication;
+drop table test.t1;
+drop table test.t2;
+
+--sync_slave_with_master slave
+
+--connection slave
+flush logs;
+--sync_slave_with_master master
+stop slave;
+reset slave;
+change master to master_host='';
+--source include/rpl_end.inc
+
+# TODO
+# More complex dependencies
\ No newline at end of file

=== added file 'mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info.inc'
--- a/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info.inc	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info.inc	2011-09-06 10:23:37 +0000
@@ -0,0 +1,8 @@
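+# Report the conflict counter deltas accumulated since
+# ndb_trans_conflict_info_init.inc was last sourced (the @init_* user
+# variables are set there).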
+--disable_query_log
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_epoch_trans AS ndb_conflict_fn_epoch_trans FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_EPOCH_TRANS";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_row_conflict_count AS ndb_conflict_trans_row_conflict_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_CONFLICT_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_row_reject_count AS ndb_conflict_trans_row_reject_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_REJECT_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_reject_count AS ndb_conflict_trans_reject_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_REJECT_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_detect_iter_count AS ndb_conflict_trans_detect_iter_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_DETECT_ITER_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_conflict_commit_count AS ndb_conflict_trans_conflict_commit_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_CONFLICT_COMMIT_COUNT";
+--enable_query_log
\ No newline at end of file

=== added file 'mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc'
--- a/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc	2011-09-06 10:23:37 +0000
@@ -0,0 +1,10 @@
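+# Snapshot the current conflict counters into @init_* user variables,
+# so that the ndb_trans_conflict_info*.inc files can report deltas.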
+--disable_query_log
+--disable_result_log
+SELECT @init_ndb_conflict_fn_epoch_trans:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_EPOCH_TRANS";
+SELECT @init_ndb_conflict_trans_row_conflict_count:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_CONFLICT_COUNT";
+SELECT @init_ndb_conflict_trans_row_reject_count:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_REJECT_COUNT";
+SELECT @init_ndb_conflict_trans_reject_count:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_REJECT_COUNT";
+SELECT @init_ndb_conflict_trans_detect_iter_count:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_DETECT_ITER_COUNT";
+SELECT @init_ndb_conflict_trans_conflict_commit_count:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_CONFLICT_COMMIT_COUNT";
+--enable_query_log
+--enable_result_log

=== added file 'mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc'
--- a/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc	2011-09-06 10:23:37 +0000
@@ -0,0 +1,22 @@
+--disable_query_log
+# Where multiple user transactions from the Secondary site are
+# involved, the actual number of rows in-conflict, the number
+# of epoch transactions with conflicts, and the number of
+# iterations, vary depending on the distribution of the user
+# transactions across epochs.  e.g. if 3 transactions are in
+# one epoch (and executed in one batch), then only actual
+# conflicts will be recorded, but if they are separated, then
+# implied rows may conflict as well.  This can make these
+# counter values non-deterministic, so this .inc file is used
+# to check that the stable counters are correct.  The stable
+# counters are the number of rejected rows, and the number
+# of rejected transactions, which must be the same regardless
+# of how the epoch boundaries lie.
+#
+#SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_epoch_trans AS ndb_conflict_fn_epoch_trans FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_EPOCH_TRANS";
+#SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_row_conflict_count AS ndb_conflict_trans_row_conflict_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_CONFLICT_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_row_reject_count AS ndb_conflict_trans_row_reject_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_REJECT_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_reject_count AS ndb_conflict_trans_reject_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_REJECT_COUNT";
+#SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_detect_iter_count AS ndb_conflict_trans_detect_iter_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_DETECT_ITER_COUNT";
+#SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_conflict_commit_count AS ndb_conflict_trans_conflict_commit_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_CONFLICT_COMMIT_COUNT";
+--enable_query_log
\ No newline at end of file

=== added file 'mysql-test/suite/rpl/r/rpl_extra_row_data.result'
--- a/mysql-test/suite/rpl/r/rpl_extra_row_data.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_extra_row_data.result	2011-09-06 10:23:37 +0000
@@ -0,0 +1,46 @@
+include/master-slave.inc
+[connection master]
+Basic insert, update, delete from Master->Slave
+DBUG code will set + check transfer of extra
+row data in RBR
+**** On Master ****
+CREATE TABLE t1 (a INT);
+Ten inserts in one transaction -> 1 epoch transaction
+BEGIN;
+INSERT INTO t1 VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
+COMMIT;
+Wait for Binlog-on-disk
+flush logs;
+Check that we have the expected extra row data in the Binlog
+create table raw_data(txt varchar(1000));
+select replace(txt, '\r', '') from raw_data where txt like '%### Extra row data format%' order by txt;
+replace(txt, '\r', '')
+### Extra row data format: 0, len: 0 :
+### Extra row data format: 1, len: 1 :0x01
+### Extra row data format: 2, len: 2 :0x0202
+### Extra row data format: 3, len: 3 :0x030303
+### Extra row data format: 4, len: 4 :0x04040404
+### Extra row data format: 5, len: 5 :0x0505050505
+### Extra row data format: 6, len: 6 :0x060606060606
+### Extra row data format: 7, len: 7 :0x07070707070707
+### Extra row data format: 8, len: 8 :0x0808080808080808
+### Extra row data format: 9, len: 9 :0x090909090909090909
+drop table raw_data;
+Generate some more insert, update, delete traffic
+INSERT INTO t1 SELECT a+10 FROM t1;
+INSERT INTO t1 SELECT a+20 FROM t1;
+INSERT INTO t1 SELECT a+40 FROM t1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+DELETE FROM t1 WHERE a > 390;
+**** On Slave ****
+Check row count and that slave is running ok
+SELECT count(*) from t1;
+count(*)
+80
+include/check_slave_is_running.inc
+DROP TABLE t1;
+include/rpl_end.inc

=== added file 'mysql-test/suite/rpl/t/rpl_extra_row_data-master.opt'
--- a/mysql-test/suite/rpl/t/rpl_extra_row_data-master.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_extra_row_data-master.opt	2011-09-06 10:23:37 +0000
@@ -0,0 +1 @@
+--loose-debug=+d,extra_row_data_set

=== added file 'mysql-test/suite/rpl/t/rpl_extra_row_data-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_extra_row_data-slave.opt	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_extra_row_data-slave.opt	2011-09-06 10:23:37 +0000
@@ -0,0 +1 @@
+--loose-debug=+d,extra_row_data_check

=== added file 'mysql-test/suite/rpl/t/rpl_extra_row_data.test'
--- a/mysql-test/suite/rpl/t/rpl_extra_row_data.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_extra_row_data.test	2011-09-06 10:23:37 +0000
@@ -0,0 +1,68 @@
+--source include/master-slave.inc
+--source include/have_debug.inc
+--source include/have_binlog_format_row.inc
+
+#
+# Test Binlog row extra data added as part of WL5353
+# Test depends on functionality added to server code within
+# #ifndef MCP_WL5353
+#
+#
+--echo Basic insert, update, delete from Master->Slave
+--echo DBUG code will set + check transfer of extra
+--echo row data in RBR
+--echo **** On Master ****
+CREATE TABLE t1 (a INT);
+
+--echo Ten inserts in one transaction -> 1 epoch transaction
+BEGIN;
+INSERT INTO t1 VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
+COMMIT;
+
+--echo Wait for Binlog-on-disk
+flush logs;
+
+--echo Check that we have the expected extra row data in the Binlog
+create table raw_data(txt varchar(1000));
+--disable_query_log
+let $MYSQLD_DATADIR= `select @@datadir;`;
+--exec $MYSQL_BINLOG --verbose $MYSQLD_DATADIR/master-bin.000001 > $MYSQLTEST_VARDIR/tmp/rpl_extra_row_data.out
+
+--eval load data local infile '$MYSQLTEST_VARDIR/tmp/rpl_extra_row_data.out' into table raw_data columns terminated by '\n';
+--enable_query_log
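+# The mysqlbinlog --verbose output is loaded line by line into raw_data,
+# then filtered for its '### Extra row data' annotations below.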
+
+select replace(txt, '\r', '') from raw_data where txt like '%### Extra row data format%' order by txt;
+drop table raw_data;
+
+--echo Generate some more insert, update, delete traffic
+INSERT INTO t1 SELECT a+10 FROM t1;
+INSERT INTO t1 SELECT a+20 FROM t1;
+INSERT INTO t1 SELECT a+40 FROM t1;
+# 80 rows, 80 inserts
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+# 5 Updates of 80 rows = 400 updates, enough to show all potential lengths
+# of Binlog extra row data, including the boundary lengths 0 and 255.
+
+# 10 deletes
+DELETE FROM t1 WHERE a > 390;
+
+#show binlog events;
+#let $MYSQLD_DATADIR= `select @@datadir;`;
+#--exec $MYSQL_BINLOG --verbose $MYSQLD_DATADIR/master-bin.000001
+
+--echo **** On Slave ****
+--sync_slave_with_master
+connection slave;
+
+--echo Check row count and that slave is running ok
+SELECT count(*) from t1;
+source include/check_slave_is_running.inc;
+
+connection master;
+DROP TABLE t1;
+--sync_slave_with_master
+--source include/rpl_end.inc

=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am	2011-07-04 16:30:34 +0000
+++ b/sql/Makefile.am	2011-09-06 10:23:37 +0000
@@ -63,6 +63,7 @@ noinst_HEADERS =	item.h item_func.h item
 			ha_ndbcluster_glue.h \
 			ha_ndb_index_stat.h \
                         ndb_mi.h \
+			ndb_conflict_trans.h \
 			ha_partition.h rpl_constants.h \
 			debug_sync.h \
 			opt_range.h protocol.h rpl_tblmap.h rpl_utility.h \
@@ -139,7 +140,8 @@ libndb_la_SOURCES=	ha_ndbcluster.cc \
 			ha_ndbcluster_cond.cc \
 			ha_ndb_index_stat.cc \
 			ha_ndbinfo.cc \
-			ndb_mi.cc
+			ndb_mi.cc \
+			ndb_conflict_trans.cc
 
 gen_lex_hash_SOURCES =	gen_lex_hash.cc
 gen_lex_hash_LDFLAGS =  @NOINST_LDFLAGS@

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2011-07-08 12:31:38 +0000
+++ b/sql/ha_ndbcluster.cc	2011-09-06 10:23:37 +0000
@@ -44,6 +44,7 @@
 #include <mysql/plugin.h>
 #include <ndb_version.h>
 #include "ndb_mi.h"
+#include "ndb_conflict_trans.h"
 
 #ifdef ndb_dynamite
 #undef assert
@@ -460,96 +461,6 @@ update_slave_api_stats(Ndb* ndb)
 
 st_ndb_slave_state g_ndb_slave_state;
 
-st_ndb_slave_state::st_ndb_slave_state()
-  : current_conflict_defined_op_count(0),
-    current_master_server_epoch(0),
-    current_max_rep_epoch(0),
-    max_rep_epoch(0),
-    sql_run_id(~Uint32(0))
-{
-  memset(current_violation_count, 0, sizeof(current_violation_count));
-  memset(total_violation_count, 0, sizeof(total_violation_count));
-};
-
-void
-st_ndb_slave_state::atTransactionAbort()
-{
-  /* Reset current-transaction counters + state */
-  memset(current_violation_count, 0, sizeof(current_violation_count));
-  current_conflict_defined_op_count = 0;
-  current_max_rep_epoch = 0;
-}
-
-void
-st_ndb_slave_state::atTransactionCommit()
-{
-  /* Merge committed transaction counters into total state
-   * Then reset current transaction counters
-   */
-  for (int i=0; i < CFT_NUMBER_OF_CFTS; i++)
-  {
-    total_violation_count[i]+= current_violation_count[i];
-    current_violation_count[i] = 0;
-  }
-  current_conflict_defined_op_count = 0;
-  if (current_max_rep_epoch > max_rep_epoch)
-  {
-    DBUG_PRINT("info", ("Max replicated epoch increases from %llu to %llu",
-                        max_rep_epoch,
-                        current_max_rep_epoch));
-
-    max_rep_epoch = current_max_rep_epoch;
-  }
-  current_max_rep_epoch = 0;
-}
-
-void
-st_ndb_slave_state::atApplyStatusWrite(Uint32 master_server_id,
-                                       Uint32 row_server_id,
-                                       Uint64 row_epoch,
-                                       bool is_row_server_id_local)
-{
-  if (row_server_id == master_server_id)
-  {
-    /*
-       WRITE_ROW to ndb_apply_status injected by MySQLD
-       immediately upstream of us.
-       Record epoch
-    */
-    current_master_server_epoch = row_epoch;
-    assert(! is_row_server_id_local);
-  }
-  else if (is_row_server_id_local)
-  {
-    DBUG_PRINT("info", ("Recording application of local server %u epoch %llu "
-                        " which is %s.",
-                        row_server_id, row_epoch,
-                        (row_epoch > g_ndb_slave_state.current_max_rep_epoch)?
-                        " new highest." : " older than previously applied"));
-    if (row_epoch > current_max_rep_epoch)
-    {
-      /*
-        Store new highest epoch in thdvar.  If we commit successfully
-        then this can become the new global max
-      */
-      current_max_rep_epoch = row_epoch;
-    }
-  }
-}
-
-void
-st_ndb_slave_state::atResetSlave()
-{
-  /* Reset the Maximum replicated epoch vars
-   * on slave reset
-   * No need to touch the sql_run_id as that
-   * will increment if the slave is started
-   * again.
-   */
-  current_max_rep_epoch = 0;
-  max_rep_epoch = 0;
-}
-
 static int check_slave_state(THD* thd)
 {
   DBUG_ENTER("check_slave_state");
@@ -568,6 +479,8 @@ static int check_slave_state(THD* thd)
                         g_ndb_slave_state.sql_run_id));
     g_ndb_slave_state.sql_run_id = runId;
 
+    g_ndb_slave_state.atStartSlave();
+
     /* Always try to load the Max Replicated Epoch info
      * first.
      * Could be made optional if it's a problem
@@ -799,6 +712,13 @@ SHOW_VAR ndb_status_conflict_variables[]
   {"fn_old",       (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_OLD], SHOW_LONGLONG},
   {"fn_max_del_win", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_MAX_DEL_WIN], SHOW_LONGLONG},
   {"fn_epoch",     (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_EPOCH], SHOW_LONGLONG},
+  {"fn_epoch_trans", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_EPOCH_TRANS], SHOW_LONGLONG},
+  {"trans_row_conflict_count", (char*) &g_ndb_slave_state.trans_row_conflict_count, SHOW_LONGLONG},
+  {"trans_row_reject_count",   (char*) &g_ndb_slave_state.trans_row_reject_count, SHOW_LONGLONG},
+  {"trans_reject_count",       (char*) &g_ndb_slave_state.trans_in_conflict_count, SHOW_LONGLONG},
+  {"trans_detect_iter_count",  (char*) &g_ndb_slave_state.trans_detect_iter_count, SHOW_LONGLONG},
+  {"trans_conflict_commit_count",
+                               (char*) &g_ndb_slave_state.trans_conflict_commit_count, SHOW_LONGLONG},
   {NullS, NullS, SHOW_LONG}
 };
 
@@ -905,66 +825,6 @@ static int ndb_to_mysql_error(const NdbE
 
 #ifdef HAVE_NDB_BINLOG
 
-/* Write conflicting row to exceptions table. */
-static int write_conflict_row(NDB_SHARE *share,
-                              NdbTransaction *trans,
-                              const uchar *row,
-                              NdbError& err)
-{
-  DBUG_ENTER("write_conflict_row");
-
-  /* get exceptions table */
-  NDB_CONFLICT_FN_SHARE *cfn_share= share->m_cfn_share;
-  const NDBTAB *ex_tab= cfn_share->m_ex_tab;
-  DBUG_ASSERT(ex_tab != NULL);
-
-  /* get insert op */
-  NdbOperation *ex_op= trans->getNdbOperation(ex_tab);
-  if (ex_op == NULL)
-  {
-    err= trans->getNdbError();
-    DBUG_RETURN(-1);
-  }
-  if (ex_op->insertTuple() == -1)
-  {
-    err= ex_op->getNdbError();
-    DBUG_RETURN(-1);
-  }
-  {
-    uint32 server_id= (uint32)::server_id;
-    uint32 master_server_id= (uint32) ndb_mi_get_master_server_id();
-    uint64 master_epoch= (uint64) g_ndb_slave_state.current_master_server_epoch;
-    uint32 count= (uint32)++(cfn_share->m_count);
-    if (ex_op->setValue((Uint32)0, (const char *)&(server_id)) ||
-        ex_op->setValue((Uint32)1, (const char *)&(master_server_id)) ||
-        ex_op->setValue((Uint32)2, (const char *)&(master_epoch)) ||
-        ex_op->setValue((Uint32)3, (const char *)&(count)))
-    {
-      err= ex_op->getNdbError();
-      DBUG_RETURN(-1);
-    }
-  }
-  /* copy primary keys */
-  {
-    const int fixed_cols= 4;
-    int nkey= cfn_share->m_pk_cols;
-    int k;
-    for (k= 0; k < nkey; k++)
-    {
-      DBUG_ASSERT(row != NULL);
-      const uchar* data= row + cfn_share->m_offset[k];
-      if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1)
-      {
-        err= ex_op->getNdbError();
-        DBUG_RETURN(-1);
-      }
-    }
-  }
-  DBUG_RETURN(0);
-}
-#endif
-
-#ifdef HAVE_NDB_BINLOG
 int
 handle_conflict_op_error(Thd_ndb* thd_ndb,
                          NdbTransaction* trans,
@@ -974,13 +834,14 @@ handle_conflict_op_error(Thd_ndb* thd_nd
 int
 handle_row_conflict(NDB_CONFLICT_FN_SHARE* cfn_share,
                     const char* tab_name,
+                    bool table_has_blobs,
+                    const char* handling_type,
                     const NdbRecord* key_rec,
                     const uchar* pk_row,
                     enum_conflicting_op_type op_type,
                     enum_conflict_cause conflict_cause,
                     const NdbError& conflict_error,
-                    NdbTransaction* conflict_trans,
-                    NdbError& err);
+                    NdbTransaction* conflict_trans);
 #endif
 
 static const Uint32 error_op_after_refresh_op = 920;
@@ -4169,7 +4030,609 @@ thd_allow_batch(const THD* thd)
 #endif
 }
 
+/**
+   st_ndb_slave_state constructor
+
+   Initialise Ndb Slave state object
+*/
+st_ndb_slave_state::st_ndb_slave_state()
+  : current_master_server_epoch(0),
+    current_max_rep_epoch(0),
+    conflict_flags(0),
+    retry_trans_count(0),
+    current_trans_row_conflict_count(0),
+    current_trans_row_reject_count(0),
+    current_trans_in_conflict_count(0),
+    max_rep_epoch(0),
+    sql_run_id(~Uint32(0)),
+    trans_row_conflict_count(0),
+    trans_row_reject_count(0),
+    trans_detect_iter_count(0),
+    trans_in_conflict_count(0),
+    trans_conflict_commit_count(0),
+    trans_conflict_apply_state(SAS_NORMAL),
+    trans_dependency_tracker(NULL)
+{
+  memset(current_violation_count, 0, sizeof(current_violation_count));
+  memset(total_violation_count, 0, sizeof(total_violation_count));
+
+  /* Init conflict handling state memroot */
+  const size_t CONFLICT_MEMROOT_BLOCK_SIZE = 32768;
+  init_alloc_root(&conflict_mem_root, CONFLICT_MEMROOT_BLOCK_SIZE, 0);
+};
+
+/**
+   resetPerAttemptCounters
+
+   Reset the per-epoch-transaction-application-attempt counters
+*/
+void
+st_ndb_slave_state::resetPerAttemptCounters()
+{
+  memset(current_violation_count, 0, sizeof(current_violation_count));
+  current_trans_row_conflict_count = 0;
+  current_trans_row_reject_count = 0;
+  current_trans_in_conflict_count = 0;
+
+  conflict_flags = 0;
+  current_max_rep_epoch = 0;
+}
+
+/**
+   atTransactionAbort()
+
+   Called by Slave SQL thread during transaction abort.
+*/
+void
+st_ndb_slave_state::atTransactionAbort()
+{
+  /* Reset current-transaction counters + state */
+  resetPerAttemptCounters();
+}
+
+/**
+   atTransactionCommit()
+
+   Called by Slave SQL thread after transaction commit
+*/
+void
+st_ndb_slave_state::atTransactionCommit()
+{
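+  /*
+    atConflictPreCommit() moves the apply state out of
+    SAS_APPLY_TRANS_DEPENDENCIES before any commit is attempted, so only
+    SAS_NORMAL (no dependency tracker) or SAS_TRACK_TRANS_DEPENDENCIES
+    (tracker allocated) can be seen here, as asserted below.
+  */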
+  assert( ((trans_dependency_tracker == NULL) &&
+           (trans_conflict_apply_state == SAS_NORMAL)) ||
+          ((trans_dependency_tracker != NULL) &&
+           (trans_conflict_apply_state == SAS_TRACK_TRANS_DEPENDENCIES)) );
+  assert( trans_conflict_apply_state != SAS_APPLY_TRANS_DEPENDENCIES );
+
+  /* Merge committed transaction counters into total state
+   * Then reset current transaction counters
+   */
+  for (int i=0; i < CFT_NUMBER_OF_CFTS; i++)
+  {
+    total_violation_count[i]+= current_violation_count[i];
+  }
+  trans_row_conflict_count+= current_trans_row_conflict_count;
+  trans_row_reject_count+= current_trans_row_reject_count;
+  trans_in_conflict_count+= current_trans_in_conflict_count;
+
+  if (current_trans_in_conflict_count)
+    trans_conflict_commit_count++;
+
+  if (current_max_rep_epoch > max_rep_epoch)
+  {
+    DBUG_PRINT("info", ("Max replicated epoch increases from %llu to %llu",
+                        max_rep_epoch,
+                        current_max_rep_epoch));
+    max_rep_epoch = current_max_rep_epoch;
+  }
+
+  resetPerAttemptCounters();
+
+  /* Clear per-epoch-transaction retry_trans_count */
+  retry_trans_count = 0;
+}
+
+/**
+   atApplyStatusWrite
+
+   Called by Slave SQL thread when applying an event to the
+   ndb_apply_status table
+*/
+void
+st_ndb_slave_state::atApplyStatusWrite(Uint32 master_server_id,
+                                       Uint32 row_server_id,
+                                       Uint64 row_epoch,
+                                       bool is_row_server_id_local)
+{
+  if (row_server_id == master_server_id)
+  {
+    /*
+       WRITE_ROW to ndb_apply_status injected by MySQLD
+       immediately upstream of us.
+       Record epoch
+    */
+    current_master_server_epoch = row_epoch;
+    assert(! is_row_server_id_local);
+  }
+  else if (is_row_server_id_local)
+  {
+    DBUG_PRINT("info", ("Recording application of local server %u epoch %llu "
+                        " which is %s.",
+                        row_server_id, row_epoch,
+                        (row_epoch > g_ndb_slave_state.current_max_rep_epoch)?
+                        " new highest." : " older than previously applied"));
+    if (row_epoch > current_max_rep_epoch)
+    {
+      /*
+        Store new highest epoch in thdvar.  If we commit successfully
+        then this can become the new global max
+      */
+      current_max_rep_epoch = row_epoch;
+    }
+  }
+}
+
+/**
+   atResetSlave()
+
+   Called when a RESET SLAVE command is issued, in the context of the
+   client connection executing the command.
+*/
+void
+st_ndb_slave_state::atResetSlave()
+{
+  /* Reset the Maximum replicated epoch vars
+   * on slave reset
+   * No need to touch the sql_run_id as that
+   * will increment if the slave is started
+   * again.
+   */
+  resetPerAttemptCounters();
+
+  retry_trans_count = 0;
+  max_rep_epoch = 0;
+}
+
+/**
+   atStartSlave()
+
+   Called by Slave SQL thread when first applying a row to Ndb after
+   a START SLAVE command.
+*/
+void
+st_ndb_slave_state::atStartSlave()
+{
 #ifdef HAVE_NDB_BINLOG
+  if (trans_conflict_apply_state != SAS_NORMAL)
+  {
+    /*
+      Remove conflict handling state on a SQL thread
+      restart
+    */
+    atEndTransConflictHandling();
+    trans_conflict_apply_state = SAS_NORMAL;
+  }
+#endif
+};
+
+#ifdef HAVE_NDB_BINLOG
+
+/**
+   atBeginTransConflictHandling()
+
+   Called by Slave SQL thread when it determines that Transactional
+   Conflict handling is required
+*/
+void
+st_ndb_slave_state::atBeginTransConflictHandling()
+{
+  DBUG_ENTER("atBeginTransConflictHandling");
+  /*
+     Allocate and initialise Transactional Conflict
+     Resolution Handling Structures
+  */
+  assert(trans_dependency_tracker == NULL);
+  trans_dependency_tracker = DependencyTracker::newDependencyTracker(&conflict_mem_root);
+  DBUG_VOID_RETURN;
+};
+
+/**
+   atPrepareConflictDetection
+
+   Called by Slave SQL thread prior to defining an operation on
+   a table with conflict detection defined.
+*/
+int
+st_ndb_slave_state::atPrepareConflictDetection(const NdbDictionary::Table* table,
+                                               const NdbRecord* key_rec,
+                                               const uchar* row_data,
+                                               Uint64 transaction_id,
+                                               bool& handle_conflict_now)
+{
+  DBUG_ENTER("atPrepareConflictDetection");
+  /*
+    Slave is preparing to apply an operation with conflict detection.
+    If we're performing Transactional Conflict Resolution, take
+    extra steps
+  */
+  switch( trans_conflict_apply_state )
+  {
+  case SAS_NORMAL:
+    DBUG_PRINT("info", ("SAS_NORMAL : No special handling"));
+    /* No special handling */
+    break;
+  case SAS_TRACK_TRANS_DEPENDENCIES:
+  {
+    DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Tracking operation"));
+    /*
+      Track this operation and its transaction id, to determine
+      inter-transaction dependencies by {table, primary key}
+    */
+    assert( trans_dependency_tracker );
+
+    int res = trans_dependency_tracker
+      ->track_operation(table,
+                        key_rec,
+                        row_data,
+                        transaction_id);
+    if (res != 0)
+    {
+      sql_print_error("%s", trans_dependency_tracker->get_error_text());
+      DBUG_RETURN(res);
+    }
+    /* Proceed as normal */
+    break;
+  }
+  case SAS_APPLY_TRANS_DEPENDENCIES:
+  {
+    DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Deciding whether to apply"));
+    /*
+       Check if this operation's transaction id is marked in-conflict.
+       If it is, we tell the caller to perform conflict resolution now instead
+       of attempting to apply the operation.
+    */
+    assert( trans_dependency_tracker );
+
+    if (trans_dependency_tracker->in_conflict(transaction_id))
+    {
+      DBUG_PRINT("info", ("Event for transaction %llu is conflicting.  Handling.",
+                          transaction_id));
+      current_trans_row_reject_count++;
+      handle_conflict_now = true;
+      DBUG_RETURN(0);
+    }
+
+    /*
+       This transaction is not marked in-conflict, so continue with normal
+       processing.
+       Note that normal processing may subsequently detect a conflict which
+       didn't exist at the time of the previous TRACK_DEPENDENCIES pass.
+       In this case, we will rollback and repeat the TRACK_DEPENDENCIES
+       stage.
+    */
+    DBUG_PRINT("info", ("Event for transaction %llu is OK, applying",
+                        transaction_id));
+    break;
+  }
+  }
+  DBUG_RETURN(0);
+}
+
+/**
+   atTransConflictDetected
+
+   Called by the Slave SQL thread when a conflict is detected on
+   an executed operation.
+*/
+int
+st_ndb_slave_state::atTransConflictDetected(Uint64 transaction_id)
+{
+  DBUG_ENTER("atTransConflictDetected");
+
+  /*
+     The Slave has detected a conflict on an operation applied
+     to a table with Transactional Conflict Resolution defined.
+     Handle according to current state.
+  */
+  conflict_flags |= SCS_TRANS_CONFLICT_DETECTED_THIS_PASS;
+  current_trans_row_conflict_count++;
+
+  switch (trans_conflict_apply_state)
+  {
+  case SAS_NORMAL:
+  {
+    DBUG_PRINT("info", ("SAS_NORMAL : Conflict on op on table with trans detection."
+                        "Requires multi-pass resolution.  Will transition to "
+                        "SAS_TRACK_TRANS_DEPENDENCIES at Commit."));
+    /*
+      Conflict on table with transactional conflict resolution
+      defined.
+      This is the trigger that we will do transactional conflict
+      resolution.
+      Record that we need to do multiple passes to correctly
+      perform resolution.
+      TODO : Early exit from applying epoch?
+    */
+    break;
+  }
+  case SAS_TRACK_TRANS_DEPENDENCIES:
+  {
+    DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Operation in transaction %llu "
+                        "had conflict",
+                        transaction_id));
+    /*
+       Conflict on table with transactional conflict resolution
+       defined.
+       We will mark the operation's transaction_id as in-conflict,
+       so that any other operations on the transaction are also
+       considered in-conflict, and any dependent transactions are also
+       considered in-conflict.
+    */
+    assert(trans_dependency_tracker != NULL);
+    int res = trans_dependency_tracker
+      ->mark_conflict(transaction_id);
+
+    if (res != 0)
+    {
+      sql_print_error("%s", trans_dependency_tracker->get_error_text());
+      DBUG_RETURN(res);
+    }
+    break;
+  }
+  case SAS_APPLY_TRANS_DEPENDENCIES:
+  {
+    /*
+       This must be a new conflict, not noticed on the previous
+       pass.
+    */
+    DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Conflict detected.  "
+                        "Must be further conflict.  Will return to "
+                        "SAS_TRACK_TRANS_DEPENDENCIES state at commit."));
+    // TODO : Early exit from applying epoch
+    break;
+  }
+  default:
+    break;
+  }
+
+  DBUG_RETURN(0);
+}
+
+/**
+   atConflictPreCommit
+
+   Called by the Slave SQL thread prior to committing a Slave transaction.
+   This method can request that the Slave transaction is retried.
+
+
+   State transitions :
+
+                       START SLAVE /
+                       RESET SLAVE /
+                        STARTUP
+                            |
+                            |
+                            v
+                    ****************
+                    *  SAS_NORMAL  *
+                    ****************
+                       ^       |
+    No transactional   |       | Conflict on transactional table
+       conflicts       |       | (Rollback)
+       (Commit)        |       |
+                       |       v
+            **********************************
+            *  SAS_TRACK_TRANS_DEPENDENCIES  *
+            **********************************
+               ^          I              ^
+     More      I          I Dependencies |
+    conflicts  I          I determined   | No new conflicts
+     found     I          I (Rollback)   | (Commit)
+    (Rollback) I          I              |
+               I          v              |
+           **********************************
+           *  SAS_APPLY_TRANS_DEPENDENCIES  *
+           **********************************
+
+
+   Operation
+     The initial state is SAS_NORMAL.
+
+     On detecting a conflict on a transactional conflict detecting table,
+     SAS_TRACK_TRANS_DEPENDENCIES is entered, and the epoch transaction is
+     rolled back and reapplied.
+
+     In SAS_TRACK_TRANS_DEPENDENCIES state, transaction dependencies and
+     conflicts are tracked as the epoch transaction is applied.
+
+     Then the Slave transitions to SAS_APPLY_TRANS_DEPENDENCIES state, and
+     the epoch transaction is rolled back and reapplied.
+
+     In the SAS_APPLY_TRANS_DEPENDENCIES state, operations for transactions
+     marked as in-conflict are not applied.
+
+     If this results in no new conflicts, the epoch transaction is committed,
+     and the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered for processing
+     the next replicated epoch transaction.
+     If it results in new conflicts, the epoch transaction is rolled back, and
+     the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered to determine
+     the new set of dependencies.
+
+     If no conflicts are found in the SAS_TRACK_TRANS_DEPENDENCIES state, then
+     the epoch transaction is committed, and the Slave transitions to SAS_NORMAL
+     state.
+
+
+   Properties
+     1) Normally, there is no transaction dependency tracking overhead paid by
+        the slave.
+
+     2) On first detecting a transactional conflict, the epoch transaction must be
+        applied at least three times, with two rollbacks.
+
+     3) Transactional conflicts detected in subsequent epochs require the epoch
+        transaction to be applied two times, with one rollback.
+
+     4) A loop between states SAS_TRACK_TRANS_DEPENDENCIES and SAS_APPLY_TRANS_
+        DEPENDENCIES occurs when further transactional conflicts are discovered
+        in SAS_APPLY_TRANS_DEPENDENCIES state.  This implies that the conflicts
+        discovered in the SAS_TRACK_TRANS_DEPENDENCIES state cannot have been
+        complete, so we revisit that state to get a more complete picture.
+
+     5) The number of iterations of this loop is capped at a hard-coded limit,
+        after which the Slave will stop with an error.  This should be an
+        unlikely occurrence, as it requires at least one new conflict to appear
+        between the epoch transaction's transactions and the database on each
+        of n consecutive passes.
+
+     6) Where conflicts are occasional, as expected, the post-commit transition to
+        SAS_TRACK_TRANS_DEPENDENCIES rather than SAS_NORMAL results in one epoch
+        transaction having its transaction dependencies needlessly tracked.
+
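+   Illustrative pass sequence for the first transactional conflict seen
+   (matching properties 2 and 3 above) :
+     pass 1 : SAS_NORMAL                   - conflict found  -> rollback
+     pass 2 : SAS_TRACK_TRANS_DEPENDENCIES - deps determined -> rollback
+     pass 3 : SAS_APPLY_TRANS_DEPENDENCIES - no new conflict -> commit,
+              continuing in SAS_TRACK_TRANS_DEPENDENCIES for the next epoch.
+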
+*/
+int
+st_ndb_slave_state::atConflictPreCommit(bool& retry_slave_trans)
+{
+  DBUG_ENTER("atConflictPreCommit");
+
+  /*
+    Prior to committing a Slave transaction, we check whether
+    Transactional conflicts have been detected which require
+    us to retry the slave transaction
+  */
+  retry_slave_trans = false;
+  switch(trans_conflict_apply_state)
+  {
+  case SAS_NORMAL:
+  {
+    DBUG_PRINT("info", ("SAS_NORMAL"));
+    /*
+       Normal case.  Only if we defined conflict detection on a table
+       with transactional conflict detection, and saw conflicts (on any table)
+       do we go to another state
+     */
+    if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
+    {
+      DBUG_PRINT("info", ("Conflict(s) detected this pass, transitioning to "
+                          "SAS_TRACK_TRANS_DEPENDENCIES."));
+      assert(conflict_flags & SCS_OPS_DEFINED);
+      /* Transactional conflict resolution required, switch state */
+      atBeginTransConflictHandling();
+      resetPerAttemptCounters();
+      trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
+      retry_slave_trans = true;
+    }
+    break;
+  }
+  case SAS_TRACK_TRANS_DEPENDENCIES:
+  {
+    DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES"));
+
+    if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
+    {
+      /*
+         Conflict on table with transactional detection
+         this pass, we have collected the details and
+         dependencies, now transition to
+         SAS_APPLY_TRANS_DEPENDENCIES and
+         reapply the epoch transaction without the
+         conflicting transactions.
+      */
+      assert(conflict_flags & SCS_OPS_DEFINED);
+      DBUG_PRINT("info", ("Transactional conflicts, transitioning to "
+                          "SAS_APPLY_TRANS_DEPENDENCIES"));
+
+      trans_conflict_apply_state = SAS_APPLY_TRANS_DEPENDENCIES;
+      trans_detect_iter_count++;
+      retry_slave_trans = true;
+      break;
+    }
+    else
+    {
+      /*
+         No transactional conflicts detected this pass, let's
+         return to SAS_NORMAL state after commit for more efficient
+         application of epoch transactions
+      */
+      DBUG_PRINT("info", ("No transactional conflicts, transitioning to "
+                          "SAS_NORMAL"));
+      atEndTransConflictHandling();
+      trans_conflict_apply_state = SAS_NORMAL;
+      break;
+    }
+  }
+  case SAS_APPLY_TRANS_DEPENDENCIES:
+  {
+    DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES"));
+    assert(conflict_flags & SCS_OPS_DEFINED);
+    /*
+       We've applied the Slave epoch transaction subject to the
+       conflict detection.  If any further transactional
+       conflicts have been observed, then we must repeat the
+       process.
+    */
+    atEndTransConflictHandling();
+    atBeginTransConflictHandling();
+    trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
+
+    if (unlikely(conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS))
+    {
+      DBUG_PRINT("info", ("Further conflict(s) detected, repeating the "
+                          "TRACK_TRANS_DEPENDENCIES pass"));
+      /*
+         Further conflict observed when applying, need
+         to re-determine dependencies
+      */
+      resetPerAttemptCounters();
+      retry_slave_trans = true;
+      break;
+    }
+
+
+    DBUG_PRINT("info", ("No further conflicts detected, committing and "
+                        "returning to SAS_TRACK_TRANS_DEPENDENCIES state"));
+    /*
+       With dependencies taken into account, no further
+       conflicts detected, can now proceed to commit
+    */
+    break;
+  }
+  }
+
+  /*
+    Clear conflict flags, to ensure that we detect any new conflicts
+  */
+  conflict_flags = 0;
+
+  if (retry_slave_trans)
+  {
+    DBUG_PRINT("info", ("Requesting transaction restart"));
+    DBUG_RETURN(1);
+  }
+
+  DBUG_PRINT("info", ("Allowing commit to proceed"));
+  DBUG_RETURN(0);
+}
+
+/**
+   atEndTransConflictHandling
+
+   Called when transactional conflict handling has completed.
+*/
+void
+st_ndb_slave_state::atEndTransConflictHandling()
+{
+  DBUG_ENTER("atEndTransConflictHandling");
+  /* Release any conflict handling state */
+  if (trans_dependency_tracker)
+  {
+    current_trans_in_conflict_count =
+      trans_dependency_tracker->get_conflict_count();
+    trans_dependency_tracker = NULL;
+    free_root(&conflict_mem_root, MY_MARK_BLOCKS_FREE);
+  }
+  DBUG_VOID_RETURN;
+};
+
 /**
    prepare_conflict_detection
 
@@ -4178,22 +4641,114 @@ thd_allow_batch(const THD* thd)
 
    It is responsible for defining and adding any operation filtering
    required, and for saving any operation definition state required
-   for post-execute analysis
+   for post-execute analysis.
+
+   For transactional detection, this method may determine that the
+   operation being defined should not be executed, and conflict
+   handling should occur immediately.  In this case, conflict_handled
+   is set to true.
 */
 int
 ha_ndbcluster::prepare_conflict_detection(enum_conflicting_op_type op_type,
                                           const NdbRecord* key_rec,
                                           const uchar* old_data,
                                           const uchar* new_data,
+                                          NdbTransaction* trans,
                                           NdbInterpretedCode* code,
-                                          NdbOperation::OperationOptions* options)
+                                          NdbOperation::OperationOptions* options,
+                                          bool& conflict_handled)
 {
   DBUG_ENTER("prepare_conflict_detection");
+  THD* thd = table->in_use;
+  int res = 0;
+  assert(thd->slave_thread);
+
+  conflict_handled = false;
+
+  /*
+     Check transaction id first, as in transactional conflict detection,
+     the transaction id is what eventually dictates whether an operation
+     is applied or not.
+
+     Note that this applies even if the current operation's table does not
+     have a conflict function defined - if a transaction spans a 'transactional
+     conflict detection' table and a non-transactional table, the
+     non-transactional table's data will also be reverted.
+  */
+  Uint64 transaction_id = Ndb_binlog_extra_row_info::InvalidTransactionId;
+  if (thd->binlog_row_event_extra_data)
+  {
+    Ndb_binlog_extra_row_info extra_row_info;
+    extra_row_info.loadFromBuffer(thd->binlog_row_event_extra_data);
+    if (extra_row_info.getFlags() &
+        Ndb_binlog_extra_row_info::NDB_ERIF_TRANSID)
+      transaction_id = extra_row_info.getTransactionId();
+  }
+
+  {
+    bool handle_conflict_now = false;
+    const uchar* row_data = (op_type == WRITE_ROW? new_data : old_data);
+    int res = g_ndb_slave_state.atPrepareConflictDetection(m_table,
+                                                           key_rec,
+                                                           row_data,
+                                                           transaction_id,
+                                                           handle_conflict_now);
+    if (res)
+      DBUG_RETURN(res);
+
+    if (handle_conflict_now)
+    {
+      DBUG_PRINT("info", ("Conflict handling for row occurring now"));
+      NdbError noRealConflictError;
+      const uchar* row_to_save = (op_type == DELETE_ROW)? old_data : new_data;
+
+      /*
+         Directly handle the conflict here - e.g refresh/ write to
+         exceptions table etc.
+      */
+      res = handle_row_conflict(m_share->m_cfn_share,
+                                m_share->table_name,
+                                m_share->flags & NSF_BLOB_FLAG,
+                                "Transaction",
+                                key_rec,
+                                row_to_save,
+                                op_type,
+                                TRANS_IN_CONFLICT,
+                                noRealConflictError,
+                                trans);
+      if (unlikely(res))
+        DBUG_RETURN(res);
+
+      g_ndb_slave_state.conflict_flags |= SCS_OPS_DEFINED;
+
+      /*
+        Indicate that there (may be) some more operations to
+        execute before committing
+      */
+      m_thd_ndb->m_unsent_bytes+= 12;
+      conflict_handled = true;
+      DBUG_RETURN(0);
+    }
+  }
+
+  if (! (m_share->m_cfn_share &&
+         m_share->m_cfn_share->m_conflict_fn))
+  {
+    /* No conflict function definition required */
+    DBUG_RETURN(0);
+  }
 
-  int res = 0;
   const st_conflict_fn_def* conflict_fn = m_share->m_cfn_share->m_conflict_fn;
   assert( conflict_fn != NULL );
 
+  if (unlikely((conflict_fn->flags & CF_TRANSACTIONAL) &&
+               (transaction_id == Ndb_binlog_extra_row_info::InvalidTransactionId)))
+  {
+    sql_print_warning("NDB Slave : Transactional conflict detection defined on table %s, but "
+                      "events received without transaction ids.  Check --ndb-log-transaction-id setting "
+                      "on upstream Cluster.",
+                      m_share->key);
+  }
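
  In outline, the checks above form a two-level scheme (a summary sketch,
  not the literal code) :

    /* prepare_conflict_detection(), in outline :
       1) Read the optional transaction id from the event's extra row
          data (stays InvalidTransactionId if --ndb-log-transaction-id
          was not set on the upstream Cluster).
       2) Ask g_ndb_slave_state.atPrepareConflictDetection() whether the
          row belongs to a transaction already known to be in conflict.
          If so, call handle_row_conflict(..., TRANS_IN_CONFLICT, ...)
          immediately and return with conflict_handled = true, so the
          caller skips defining the original operation.
       3) Otherwise fall through to the table's conflict function, with
          a warning if it is CF_TRANSACTIONAL but events are arriving
          without transaction ids.
    */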
 
   /*
      Prepare interpreted code for operation (update + delete only) according
@@ -4216,7 +4771,7 @@ ha_ndbcluster::prepare_conflict_detectio
     }
   } // if (op_type != WRITE_ROW)
 
-  g_ndb_slave_state.current_conflict_defined_op_count++;
+  g_ndb_slave_state.conflict_flags |= SCS_OPS_DEFINED;
 
   /* Now save data for potential insert to exceptions table... */
   const uchar* row_to_save = (op_type == DELETE_ROW)? old_data : new_data;
@@ -4224,6 +4779,7 @@ ha_ndbcluster::prepare_conflict_detectio
   ex_data.share= m_share;
   ex_data.key_rec= key_rec;
   ex_data.op_type= op_type;
+  ex_data.trans_id= transaction_id;
   /*
     We need to save the row data for possible conflict resolution after
     execute().
@@ -4271,32 +4827,38 @@ handle_conflict_op_error(Thd_ndb* thd_nd
       (err.classification == NdbError::NoDataFound))
   {
     DBUG_PRINT("info",
-               ("err.code %s (int) error_conflict_fn_violation, "
-                "err.classification %s",
-                err.code == (int) error_conflict_fn_violation ? "==" : "!=",
-                err.classification
-                == NdbError::ConstraintViolation
-                ? "== NdbError::ConstraintViolation"
-                : (err.classification == NdbError::NoDataFound
-                   ? "== NdbError::NoDataFound" : "!=")));
+               ("err.code = %s, err.classification = %s",
+               ((err.code == (int) error_conflict_fn_violation)?
+                "error_conflict_fn_violation":
+                ((err.code == (int) error_op_after_refresh_op)?
+                 "error_op_after_refresh_op" : "?")),
+               ((err.classification == NdbError::ConstraintViolation)?
+                "ConstraintViolation":
+                ((err.classification == NdbError::NoDataFound)?
+                 "NoDataFound" : "?"))));
 
     enum_conflict_cause conflict_cause;
 
+    /* Map cause onto our conflict description type */
     if ((err.code == (int) error_conflict_fn_violation) ||
         (err.code == (int) error_op_after_refresh_op))
     {
+      DBUG_PRINT("info", ("ROW_IN_CONFLICT"));
       conflict_cause= ROW_IN_CONFLICT;
     }
     else if (err.classification == NdbError::ConstraintViolation)
     {
+      DBUG_PRINT("info", ("ROW_ALREADY_EXISTS"));
       conflict_cause= ROW_ALREADY_EXISTS;
     }
     else
     {
       assert(err.classification == NdbError::NoDataFound);
+      DBUG_PRINT("info", ("ROW_DOES_NOT_EXIST"));
       conflict_cause= ROW_DOES_NOT_EXIST;
     }
 
+    /* Get exceptions data from operation */
     const void* buffer=op->getCustomData();
     assert(buffer);
     Ndb_exceptions_data ex_data;
@@ -4304,88 +4866,65 @@ handle_conflict_op_error(Thd_ndb* thd_nd
     NDB_SHARE *share= ex_data.share;
     const NdbRecord* key_rec= ex_data.key_rec;
     const uchar* row= ex_data.row;
-    enum_conflicting_op_type op_type = ex_data.op_type;
-    DBUG_ASSERT(share != NULL && row != NULL);
-
-    NDB_CONFLICT_FN_SHARE* cfn_share= share->m_cfn_share;
-    if (cfn_share)
-    {
-      enum_conflict_fn_type cft = cfn_share->m_conflict_fn->type;
-      bool haveExTable = cfn_share->m_ex_tab != NULL;
+    enum_conflicting_op_type causing_op_type = ex_data.op_type;
 
-      g_ndb_slave_state.current_violation_count[cft]++;
+    DBUG_PRINT("info", ("Conflict causing op type : %u",
+                        causing_op_type));
 
+    if (causing_op_type == REFRESH_ROW)
+    {
+      /*
+         The failing op was a refresh row, we require that it
+         failed due to being a duplicate (e.g. a refresh
+         occurring on a refreshed row)
+       */
+      if (err.code == (int) error_op_after_refresh_op)
       {
-        NdbError handle_error;
-        if (handle_row_conflict(cfn_share,
-                                share->table_name,
-                                key_rec,
-                                row,
-                                op_type,
-                                conflict_cause,
-                                err,
-                                trans,
-                                handle_error))
-        {
-          /* Error with handling of row conflict */
-          char msg[FN_REFLEN];
-          my_snprintf(msg, sizeof(msg), "Row conflict handling "
-                      "on table %s hit Ndb error %d '%s'",
-                      share->table_name,
-                      handle_error.code,
-                      handle_error.message);
-
-          if (handle_error.status == NdbError::TemporaryError)
-          {
-            /* Slave will roll back and retry entire transaction. */
-            ERR_RETURN(handle_error);
-          }
-          else
-          {
-            push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                                ER_EXCEPTIONS_WRITE_ERROR,
-                                ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
-            /* Slave will stop replication. */
-            DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
-          }
-        }
+        DBUG_PRINT("info", ("Operation after refresh - ignoring"));
+        DBUG_RETURN(0);
+      }
+      else
+      {
+        DBUG_PRINT("info", ("Refresh op hit real error %u", err.code));
+        /* Unexpected error, normal handling */
+        DBUG_RETURN(err.code);
       }
+    }
 
+    DBUG_ASSERT(share != NULL && row != NULL);
+    NDB_CONFLICT_FN_SHARE* cfn_share= share->m_cfn_share;
+    bool table_has_trans_conflict_detection =
+      cfn_share &&
+      cfn_share->m_conflict_fn &&
+      (cfn_share->m_conflict_fn->flags & CF_TRANSACTIONAL);
+
+    if (table_has_trans_conflict_detection)
+    {
+      /* Perform special transactional conflict-detected handling */
+      int res = g_ndb_slave_state.atTransConflictDetected(ex_data.trans_id);
+      if (res)
+        DBUG_RETURN(res);
+    }
 
-      if (haveExTable)
-      {
-        NdbError ex_err;
-        if (write_conflict_row(share, trans, row, ex_err))
-        {
-          char msg[FN_REFLEN];
-          my_snprintf(msg, sizeof(msg), "table %s NDB error %d '%s'",
-                      cfn_share->m_ex_tab->getName(),
-                      ex_err.code, ex_err.message);
+    if (cfn_share)
+    {
+      /* Now handle the conflict on this row */
+      enum_conflict_fn_type cft = cfn_share->m_conflict_fn->type;
 
-          NdbDictionary::Dictionary* dict= thd_ndb->ndb->getDictionary();
+      g_ndb_slave_state.current_violation_count[cft]++;
 
-          if (ex_err.classification == NdbError::SchemaError)
-          {
-            dict->removeTableGlobal(*(cfn_share->m_ex_tab), false);
-            cfn_share->m_ex_tab= NULL;
-          }
-          else if (ex_err.status == NdbError::TemporaryError)
-          {
-            /* Slave will roll back and retry entire transaction. */
-            ERR_RETURN(ex_err);
-          }
-          else
-          {
-            push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                                ER_EXCEPTIONS_WRITE_ERROR,
-                                ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
-            /* Slave will stop replication. */
-            DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
-          }
-        }
-      } // if (haveExTable)
+      int res = handle_row_conflict(cfn_share,
+                                    share->table_name,
+                                    false, /* table_has_blobs */
+                                    "Row",
+                                    key_rec,
+                                    row,
+                                    causing_op_type,
+                                    conflict_cause,
+                                    err,
+                                    trans);
 
-      DBUG_RETURN(0);
+      DBUG_RETURN(res);
     }
     else
     {
@@ -4402,10 +4941,7 @@ handle_conflict_op_error(Thd_ndb* thd_nd
 
   DBUG_RETURN(0); // Reachable?
 }
-#endif /* HAVE_NDB_BINLOG */
 
-
-#ifdef HAVE_NDB_BINLOG
 /*
   is_serverid_local
 */
@@ -4620,12 +5156,33 @@ int ha_ndbcluster::ndb_write_row(uchar *
   MY_BITMAP tmpBitmap;
   MY_BITMAP *user_cols_written_bitmap;
 #ifdef HAVE_NDB_BINLOG
-  bool haveConflictFunction =
-    (thd->slave_thread &&
-     m_share->m_cfn_share &&
-     m_share->m_cfn_share->m_conflict_fn);
+  /* Conflict resolution in slave thread */
+  bool haveConflictFunction = false;
+  if (thd->slave_thread)
+  {
+    haveConflictFunction =
+      m_share->m_cfn_share &&
+      m_share->m_cfn_share->m_conflict_fn;
+    bool conflict_handled = false;
+
+    if (unlikely((error = prepare_conflict_detection(WRITE_ROW,
+                                                     key_rec,
+                                                     NULL,    /* old_data */
+                                                     record,  /* new_data */
+                                                     trans,
+                                                     NULL,    /* code */
+                                                     &options,
+                                                     conflict_handled))))
+      DBUG_RETURN(error);
+
+    if (unlikely(conflict_handled))
+    {
+      /* No need to continue with operation definition */
+      /* TODO : Ensure batch execution */
+      DBUG_RETURN(0);
+    }
+  }
 #endif
-  
   if (m_use_write
 #ifdef HAVE_NDB_BINLOG
       /* Conflict detection must use normal Insert */
@@ -4668,19 +5225,6 @@ int ha_ndbcluster::ndb_write_row(uchar *
   }
   else
   {
-#ifdef HAVE_NDB_BINLOG
-    if (haveConflictFunction)
-    {
-      /* Conflict detection in slave thread */
-      if (unlikely((error = prepare_conflict_detection(WRITE_ROW,
-                                                       key_rec,
-                                                       NULL,    /* old_data */
-                                                       record,  /* new_data */
-                                                       NULL,    /* code */
-                                                       &options))))
-        DBUG_RETURN(error);
-    }
-#endif
     uchar *mask;
 
     /* Check whether Ndb table definition includes any default values. */
@@ -4827,20 +5371,34 @@ int ha_ndbcluster::primary_key_cmp(const
 }
 
 #ifdef HAVE_NDB_BINLOG
+
+static Ndb_exceptions_data StaticRefreshExceptionsData=
+{ NULL, NULL, NULL, REFRESH_ROW, 0 };
+
 int
 handle_row_conflict(NDB_CONFLICT_FN_SHARE* cfn_share,
                     const char* table_name,
+                    bool table_has_blobs,
+                    const char* handling_type,
                     const NdbRecord* key_rec,
                     const uchar* pk_row,
                     enum_conflicting_op_type op_type,
                     enum_conflict_cause conflict_cause,
                     const NdbError& conflict_error,
-                    NdbTransaction* conflict_trans,
-                    NdbError& err)
+                    NdbTransaction* conflict_trans)
 {
   DBUG_ENTER("handle_row_conflict");
 
-  if (cfn_share->m_flags & CFF_REFRESH_ROWS)
+  /*
+     We will refresh the row if the conflict function requires
+     it, or if we are handling a transactional conflict.
+  */
+  bool refresh_row =
+    (conflict_cause == TRANS_IN_CONFLICT) ||
+    (cfn_share &&
+     (cfn_share->m_flags & CFF_REFRESH_ROWS));
+
+  if (refresh_row)
   {
     /* A conflict has been detected between an applied replicated operation
      * and the data in the DB.
@@ -4863,66 +5421,220 @@ handle_row_conflict(NDB_CONFLICT_FN_SHAR
     assert(key_rec != NULL);
     assert(pk_row != NULL);
 
-    /* When the slave splits an epoch into batches, a conflict row detected
-     * and refreshed in an early batch can be written to by operations in
-     * a later batch.  As the operations will not have applied, and the
-     * row has already been refreshed, we need not attempt to refresh
-     * it again
-     */
-    if ((conflict_cause == ROW_IN_CONFLICT) &&
-        (conflict_error.code == (int) error_op_after_refresh_op))
+    do
     {
-      /* Attempt to apply an operation after the row was refreshed
-       * Ignore the error
+      /* We cannot refresh a row which has Blobs, as we do not support
+       * Blob refresh yet.
+       * Rows implicated by a transactional conflict function may have
+       * Blobs.
+       * We will generate an error in this case
        */
-      DBUG_PRINT("info", ("Operation after refresh error - ignoring"));
-      DBUG_RETURN(0);
-    }
+      if (table_has_blobs)
+      {
+        char msg[FN_REFLEN];
+        my_snprintf(msg, sizeof(msg), "%s conflict handling "
+                    "on table %s failed as table has Blobs which cannot be refreshed.",
+                    handling_type,
+                    table_name);
+
+        push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                            ER_EXCEPTIONS_WRITE_ERROR,
+                            ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+
+        DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
+      }
+
+      /* When the slave splits an epoch into batches, a conflict row detected
+       * and refreshed in an early batch can be written to by operations in
+       * a later batch.  As the operations will not have applied, and the
+       * row has already been refreshed, we need not attempt to refresh
+       * it again
+       */
+      if ((conflict_cause == ROW_IN_CONFLICT) &&
+          (conflict_error.code == (int) error_op_after_refresh_op))
+      {
+        /* Attempt to apply an operation after the row was refreshed
+         * Ignore the error
+         */
+        DBUG_PRINT("info", ("Operation after refresh error - ignoring"));
+        break;
+      }
 
-    /* When a delete operation finds that the row does not exist, it indicates
-     * a DELETE vs DELETE conflict.  If we refresh the row then we can get
-     * non deterministic behaviour depending on slave batching as follows :
-     *   Row is deleted
-     *
-     *     Case 1
-     *       Slave applied DELETE, INSERT in 1 batch
-     *
-     *         After first batch, the row is present (due to INSERT), it is
-     *         refreshed.
-     *
-     *     Case 2
-     *       Slave applied DELETE in 1 batch, INSERT in 2nd batch
-     *
-     *         After first batch, the row is not present, it is refreshed
-     *         INSERT is then rejected.
-     *
-     * The problem of not being able to 'record' a DELETE vs DELETE conflict
-     * is known.  We attempt at least to give consistent behaviour for
-     * DELETE vs DELETE conflicts by :
-     *   NOT refreshing a row when a DELETE vs DELETE conflict is detected
-     * This should map all batching scenarios onto Case1.
-     */
-    if ((op_type == DELETE_ROW) &&
-        (conflict_cause == ROW_DOES_NOT_EXIST))
+      /* When a delete operation finds that the row does not exist, it indicates
+       * a DELETE vs DELETE conflict.  If we refresh the row then we can get
+       * non deterministic behaviour depending on slave batching as follows :
+       *   Row is deleted
+       *
+       *     Case 1
+       *       Slave applied DELETE, INSERT in 1 batch
+       *
+       *         After first batch, the row is present (due to INSERT), it is
+       *         refreshed.
+       *
+       *     Case 2
+       *       Slave applied DELETE in 1 batch, INSERT in 2nd batch
+       *
+       *         After first batch, the row is not present, it is refreshed
+       *         INSERT is then rejected.
+       *
+       * The problem of not being able to 'record' a DELETE vs DELETE conflict
+       * is known.  We attempt at least to give consistent behaviour for
+       * DELETE vs DELETE conflicts by :
+       *   NOT refreshing a row when a DELETE vs DELETE conflict is detected
+       * This should map all batching scenarios onto Case1.
+       */
+      if ((op_type == DELETE_ROW) &&
+          (conflict_cause == ROW_DOES_NOT_EXIST))
+      {
+        DBUG_PRINT("info", ("Delete vs Delete detected, NOT refreshing"));
+        break;
+      }
+
+      /*
+        We give the refresh operation some 'exceptions data', so that
+        it can be identified as part of conflict resolution when
+        handling operation errors.
+        Specifically we need to be able to handle duplicate row
+        refreshes.
+        As there is no unique exceptions data, we use a singleton.
+
+        We also need to 'force' the ANYVALUE of the row to 0 to
+        indicate that the refresh is locally-sourced.
+        Otherwise we can 'pickup' the ANYVALUE of a previous
+        update to the row.
+        If some previous update in this transaction came from a
+        Slave, then using its ANYVALUE can result in that Slave
+        ignoring this correction.
+      */
+      NdbOperation::OperationOptions options;
+      options.optionsPresent =
+        NdbOperation::OperationOptions::OO_CUSTOMDATA |
+        NdbOperation::OperationOptions::OO_ANYVALUE;
+      options.customData = &StaticRefreshExceptionsData;
+      options.anyValue = 0;
+
+      /* Create a refresh operation to realign other clusters */
+      // TODO AnyValue
+      // TODO Do we ever get non-PK key?
+      //      Keyless table?
+      //      Unique index
+      const NdbOperation* refresh_op= conflict_trans->refreshTuple(key_rec,
+                                                                   (const char*) pk_row,
+                                                                   &options,
+                                                                   sizeof(options));
+      if (!refresh_op)
+      {
+        NdbError err = conflict_trans->getNdbError();
+
+        if (err.status == NdbError::TemporaryError)
+        {
+          /* Slave will roll back and retry entire transaction. */
+          ERR_RETURN(err);
+        }
+        else
+        {
+          char msg[FN_REFLEN];
+          my_snprintf(msg, sizeof(msg), "Row conflict handling "
+                      "on table %s hit Ndb error %d '%s'",
+                      table_name,
+                      err.code,
+                      err.message);
+          push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                              ER_EXCEPTIONS_WRITE_ERROR,
+                              ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+          /* Slave will stop replication. */
+          DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
+        }
+      }
+    } while(0); // End of 'refresh' block
+  }
+
+  if (cfn_share &&
+      cfn_share->m_ex_tab != NULL)
+  {
+    NdbError err;
+    assert(err.code == 0);
+    do
     {
-      DBUG_PRINT("info", ("Delete vs Delete detected, NOT refreshing"));
-      DBUG_RETURN(0);
-    }
+      /* Have exceptions table, add row to it */
+      const NDBTAB *ex_tab= cfn_share->m_ex_tab;
 
-    /* Create a refresh to operation to realign other clusters */
-    // TODO AnyValue
-    // TODO Do we ever get non-PK key?
-    //      Keyless table?
-    //      Unique index
-    const NdbOperation* refresh_op= conflict_trans->refreshTuple(key_rec,
-                                                                 (const char*) pk_row);
+      /* get insert op */
+      NdbOperation *ex_op= conflict_trans->getNdbOperation(ex_tab);
+      if (ex_op == NULL)
+      {
+        err= conflict_trans->getNdbError();
+        break;
+      }
+      if (ex_op->insertTuple() == -1)
+      {
+        err= ex_op->getNdbError();
+        break;
+      }
+      {
+        uint32 server_id= (uint32)::server_id;
+        uint32 master_server_id= (uint32) ndb_mi_get_master_server_id();
+        uint64 master_epoch= (uint64) g_ndb_slave_state.current_master_server_epoch;
+        uint32 count= (uint32)++(cfn_share->m_count);
+        if (ex_op->setValue((Uint32)0, (const char *)&(server_id)) ||
+            ex_op->setValue((Uint32)1, (const char *)&(master_server_id)) ||
+            ex_op->setValue((Uint32)2, (const char *)&(master_epoch)) ||
+            ex_op->setValue((Uint32)3, (const char *)&(count)))
+        {
+          err= ex_op->getNdbError();
+          break;
+        }
+      }
+      /* copy primary keys */
+      {
+        const int fixed_cols= 4;
+        int nkey= cfn_share->m_pk_cols;
+        int k;
+        for (k= 0; k < nkey; k++)
+        {
+          DBUG_ASSERT(pk_row != NULL);
+          const uchar* data= pk_row + cfn_share->m_offset[k];
+          if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1)
+          {
+            err= ex_op->getNdbError();
+            break;
+          }
+        }
+      }
+    } while (0);
 
-    if (!refresh_op)
+    if (err.code != 0)
     {
-      err= conflict_trans->getNdbError();
-      DBUG_RETURN(1);
+      char msg[FN_REFLEN];
+      my_snprintf(msg, sizeof(msg), "%s conflict handling "
+                  "on table %s hit Ndb error %d '%s'",
+                  handling_type,
+                  table_name,
+                  err.code,
+                  err.message);
+
+      if (err.classification == NdbError::SchemaError)
+      {
+        /* Something up with Exceptions table schema, forget it */
+        NdbDictionary::Dictionary* dict= conflict_trans->getNdb()->getDictionary();
+        dict->removeTableGlobal(*(cfn_share->m_ex_tab), false);
+        cfn_share->m_ex_tab= NULL;
+      }
+      else if (err.status == NdbError::TemporaryError)
+      {
+        /* Slave will roll back and retry entire transaction. */
+        ERR_RETURN(err);
+      }
+      else
+      {
+        push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                            ER_EXCEPTIONS_WRITE_ERROR,
+                            ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+        /* Slave will stop replication. */
+        DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
+      }
     }
-  } /* if (cfn_share->m_flags & CFF_REFRESH_ROWS) */
+  } /* if (cfn_share->m_ex_tab != NULL) */
 
   DBUG_RETURN(0);
 };
@@ -4983,11 +5695,15 @@ int ha_ndbcluster::exec_bulk_update(uint
       no_uncommitted_rows_execute_failure();
       DBUG_RETURN(ndb_err(trans));
     }
-    DBUG_PRINT("info", ("ignore_count: %u", ignore_count));
-    assert(m_rows_changed >= ignore_count);
-    assert(m_rows_updated >= ignore_count);
-    m_rows_changed-= ignore_count;
-    m_rows_updated-= ignore_count;
+    THD *thd= table->in_use;
+    if (!thd->slave_thread)
+    {
+      DBUG_PRINT("info", ("ignore_count: %u", ignore_count));
+      assert(m_rows_changed >= ignore_count);
+      assert(m_rows_updated >= ignore_count);
+      m_rows_changed-= ignore_count;
+      m_rows_updated-= ignore_count;
+    }
     DBUG_RETURN(0);
   }
 
@@ -5023,10 +5739,14 @@ int ha_ndbcluster::exec_bulk_update(uint
     no_uncommitted_rows_execute_failure();
     DBUG_RETURN(ndb_err(trans));
   }
-  assert(m_rows_changed >= ignore_count);
-  assert(m_rows_updated >= ignore_count);
-  m_rows_changed-= ignore_count;
-  m_rows_updated-= ignore_count;
+  THD *thd= table->in_use;
+  if (!thd->slave_thread)
+  {
+    assert(m_rows_changed >= ignore_count);
+    assert(m_rows_updated >= ignore_count);
+    m_rows_changed-= ignore_count;
+    m_rows_updated-= ignore_count;
+  }
   DBUG_RETURN(0);
 }
 
@@ -5280,17 +6000,27 @@ int ha_ndbcluster::ndb_update_row(const
     NdbInterpretedCode code(m_table, buffer,
                             sizeof(buffer)/sizeof(buffer[0]));
 
-    if (thd->slave_thread && m_share->m_cfn_share &&
-        m_share->m_cfn_share->m_conflict_fn)
+    if (thd->slave_thread)
     {
-       /* Conflict resolution in slave thread. */
+      bool conflict_handled = false;
+      /* Conflict resolution in slave thread. */
+
       if (unlikely((error = prepare_conflict_detection(UPDATE_ROW,
                                                        key_rec,
                                                        old_data,
                                                        new_data,
+                                                       trans,
                                                        &code,
-                                                       &options))))
+                                                       &options,
+                                                       conflict_handled))))
         DBUG_RETURN(error);
+
+      if (unlikely(conflict_handled))
+      {
+        /* No need to continue with operation definition */
+        /* TODO : Ensure batch execution */
+        DBUG_RETURN(0);
+      }
     }
 #endif /* HAVE_NDB_BINLOG */
     if (options.optionsPresent !=0)
@@ -5335,10 +6065,13 @@ int ha_ndbcluster::ndb_update_row(const
   m_rows_changed++;
   m_rows_updated++;
 
-  assert(m_rows_changed >= ignore_count);
-  assert(m_rows_updated >= ignore_count);
-  m_rows_changed-= ignore_count;
-  m_rows_updated-= ignore_count;
+  if (!thd->slave_thread)
+  {
+    assert(m_rows_changed >= ignore_count);
+    assert(m_rows_updated >= ignore_count);
+    m_rows_changed-= ignore_count;
+    m_rows_updated-= ignore_count;
+  }
 
   DBUG_RETURN(0);
 }
@@ -5392,9 +6125,13 @@ int ha_ndbcluster::end_bulk_delete()
       no_uncommitted_rows_execute_failure();
       DBUG_RETURN(ndb_err(trans));
     }
-    DBUG_PRINT("info", ("ignore_count: %u", ignore_count));
-    assert(m_rows_deleted >= ignore_count);
-    m_rows_deleted-= ignore_count;
+    THD *thd= table->in_use;
+    if (!thd->slave_thread)
+    {
+      DBUG_PRINT("info", ("ignore_count: %u", ignore_count));
+      assert(m_rows_deleted >= ignore_count);
+      m_rows_deleted-= ignore_count;
+    }
     DBUG_RETURN(0);
   }
 
@@ -5430,9 +6167,13 @@ int ha_ndbcluster::end_bulk_delete()
     DBUG_RETURN(ndb_err(trans));
   }
 
-  assert(m_rows_deleted >= ignore_count);
-  m_rows_deleted-= ignore_count;
-  no_uncommitted_rows_update(ignore_count);
+  THD *thd= table->in_use;
+  if (!thd->slave_thread)
+  {
+    assert(m_rows_deleted >= ignore_count);
+    m_rows_deleted-= ignore_count;
+    no_uncommitted_rows_update(ignore_count);
+  }
   DBUG_RETURN(0);
 }
 
@@ -5559,17 +6300,27 @@ int ha_ndbcluster::ndb_delete_row(const
     Uint32 buffer[ MAX_CONFLICT_INTERPRETED_PROG_SIZE ];
     NdbInterpretedCode code(m_table, buffer,
                             sizeof(buffer)/sizeof(buffer[0]));
-    if (thd->slave_thread && m_share->m_cfn_share &&
-        m_share->m_cfn_share->m_conflict_fn)
+    if (thd->slave_thread)
     {
+       bool conflict_handled = false;
+
       /* Conflict resolution in slave thread. */
       if (unlikely((error = prepare_conflict_detection(DELETE_ROW,
                                                        key_rec,
                                                        key_row, /* old_data */
                                                        NULL,    /* new_data */
+                                                       trans,
                                                        &code,
-                                                       &options))))
+                                                       &options,
+                                                       conflict_handled))))
         DBUG_RETURN(error);
+
+      if (unlikely(conflict_handled))
+      {
+        /* No need to continue with operation definition */
+        /* TODO : Ensure batch execution */
+        DBUG_RETURN(0);
+      }
     }
 #endif /* HAVE_NDB_BINLOG */
     if (options.optionsPresent != 0)
@@ -5627,9 +6378,12 @@ int ha_ndbcluster::ndb_delete_row(const
   }
   if (!primary_key_update)
   {
-    assert(m_rows_deleted >= ignore_count);
-    m_rows_deleted-= ignore_count;
-    no_uncommitted_rows_update(ignore_count);
+    if (!thd->slave_thread)
+    {
+      assert(m_rows_deleted >= ignore_count);
+      m_rows_deleted-= ignore_count;
+      no_uncommitted_rows_update(ignore_count);
+    }
   }
   DBUG_RETURN(0);
 }
@@ -7477,6 +8231,8 @@ int ndbcluster_commit(handlerton *hton,
   Thd_ndb *thd_ndb= get_thd_ndb(thd);
   Ndb *ndb= thd_ndb->ndb;
   NdbTransaction *trans= thd_ndb->trans;
+  bool retry_slave_trans = false;
+  (void) retry_slave_trans;
 
   DBUG_ENTER("ndbcluster_commit");
   DBUG_ASSERT(ndb);
@@ -7516,9 +8272,23 @@ int ndbcluster_commit(handlerton *hton,
 
   if (thd->slave_thread)
   {
-    if (!g_ndb_slave_state.current_conflict_defined_op_count ||
-        !thd_ndb->m_unsent_bytes ||
-        !(res= execute_no_commit(thd_ndb, trans, TRUE)))
+#ifdef HAVE_NDB_BINLOG
+    /* If this slave transaction has included conflict detecting ops
+     * and some defined operations are not yet sent, then perform
+     * an execute(NoCommit) before committing, as conflict op handling
+     * is done by execute(NoCommit)
+     */
+    if (g_ndb_slave_state.conflict_flags & SCS_OPS_DEFINED)
+    {
+      if (thd_ndb->m_unsent_bytes)
+        res = execute_no_commit(thd_ndb, trans, TRUE);
+    }
+
+    if (likely(res == 0))
+      res = g_ndb_slave_state.atConflictPreCommit(retry_slave_trans);
+#endif /* HAVE_NDB_BINLOG */
+
+    if (likely(res == 0))
       res= execute_commit(thd, thd_ndb, trans, 1, TRUE);
 
     update_slave_api_stats(thd_ndb->ndb);
@@ -7547,12 +8317,50 @@ int ndbcluster_commit(handlerton *hton,
 
   if (res != 0)
   {
-    const NdbError err= trans->getNdbError();
-    const NdbOperation *error_op= trans->getNdbErrorOperation();
-    set_ndb_err(thd, err);
-    res= ndb_to_mysql_error(&err);
-    if (res != -1)
-      ndbcluster_print_error(res, error_op);
+#ifdef HAVE_NDB_BINLOG
+    if (retry_slave_trans)
+    {
+      if (st_ndb_slave_state::MAX_RETRY_TRANS_COUNT >
+          g_ndb_slave_state.retry_trans_count++)
+      {
+        /*
+           Warning is necessary to cause retry from slave.cc
+           exec_relay_log_event()
+        */
+        push_warning(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                     ER_GET_TEMPORARY_ERRMSG,
+                     SLAVE_SILENT_RETRY_MSG);
+        /*
+          Set retry count to zero to:
+          1) Avoid consuming slave-temp-error retry attempts
+          2) Ensure no inter-attempt sleep
+
+          Better fix : Save + restore retry count around transactional
+          conflict handling
+        */
+        ndb_mi_set_relay_log_trans_retries(0);
+      }
+      else
+      {
+        /*
+           Too many retries, print error and exit - normal
+           too many retries mechanism will cause exit
+         */
+        sql_print_error("Ndb slave retried transaction %u time(s) in vain.  Giving up.",
+                        st_ndb_slave_state::MAX_RETRY_TRANS_COUNT);
+      }
+      res= ER_GET_TEMPORARY_ERRMSG;
+    }
+    else
+#endif
+    {
+      const NdbError err= trans->getNdbError();
+      const NdbOperation *error_op= trans->getNdbErrorOperation();
+      set_ndb_err(thd, err);
+      res= ndb_to_mysql_error(&err);
+      if (res != -1)
+        ndbcluster_print_error(res, error_op);
+    }
   }
   else
   {
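
  The retry signalling above is deliberately indirect; a summary of how
  the pieces cooperate across this file, slave.cc and log_event.cc below :

    /* Transactional conflict retry, in outline :

       ndbcluster_commit()                      (this file)
         atConflictPreCommit() requests retry_slave_trans
         -> push_warning(ER_GET_TEMPORARY_ERRMSG, SLAVE_SILENT_RETRY_MSG)
         -> ndb_mi_set_relay_log_trans_retries(0)   // 'free' attempt
         -> returns a temporary error to the applier

       exec_relay_log_event()                   (slave.cc)
         sees the temporary error, rolls back and re-applies the
         transaction, bounded by MAX_RETRY_TRANS_COUNT here

       Query_log_event::do_apply_event()        (log_event.cc)
         is_silent_error() matches SLAVE_SILENT_RETRY_MSG and suppresses
         the usual slave error report for these deliberate rollbacks
    */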
@@ -8748,6 +9556,7 @@ int ha_ndbcluster::create(const char *na
     switch(conflict_fn->type)
     {
     case CFT_NDB_EPOCH:
+    case CFT_NDB_EPOCH_TRANS:
     {
       /* Default 6 extra Gci bits allows 2^6 == 64
        * epochs / saveGCP, a comfortable default
@@ -11418,6 +12227,12 @@ static int ndbcluster_init(void *p)
   if (ndbcluster_inited)
     DBUG_RETURN(FALSE);
 
+#ifdef HAVE_NDB_BINLOG
+  /* Check const alignment */
+  assert(DependencyTracker::InvalidTransactionId ==
+         Ndb_binlog_extra_row_info::InvalidTransactionId);
+#endif
+
   pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
   pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
   pthread_cond_init(&COND_ndb_util_thread, NULL);
@@ -16268,6 +17083,18 @@ static MYSQL_SYSVAR_BOOL(
 );
 
 
+my_bool opt_ndb_log_transaction_id;
+static MYSQL_SYSVAR_BOOL(
+  log_transaction_id,               /* name */
+  opt_ndb_log_transaction_id,       /* var  */
+  PLUGIN_VAR_OPCMDARG,
+  "Log Ndb transaction identities per row in the Binlog",
+  NULL,                             /* check func. */
+  NULL,                             /* update func. */
+  0                                 /* default */
+);
+
+
 static MYSQL_SYSVAR_STR(
   connectstring,                    /* name */
   opt_ndb_connectstring,            /* var */
@@ -16375,6 +17202,7 @@ static struct st_mysql_sys_var* system_v
   MYSQL_SYSVAR(log_binlog_index),
   MYSQL_SYSVAR(log_empty_epochs),
   MYSQL_SYSVAR(log_apply_status),
+  MYSQL_SYSVAR(log_transaction_id),
   MYSQL_SYSVAR(connectstring),
   MYSQL_SYSVAR(mgmd_host),
   MYSQL_SYSVAR(nodeid),
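
  For reference, the new option defaults to off and is enabled on the
  mysqld(s) writing the Binlog on the upstream Cluster; a usage sketch :

    # my.cnf of the binlogging mysqld :
    [mysqld]
    ndb-log-transaction-id=1    # log NdbApi transaction ids per row,
                                # required for NDB$EPOCH_TRANS detection

    # or equivalently on the command line :
    #   mysqld ... --ndb-log-transaction-id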

=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2011-07-08 12:31:38 +0000
+++ b/sql/ha_ndbcluster.h	2011-09-06 10:23:37 +0000
@@ -127,6 +127,7 @@ enum enum_conflict_fn_type
   ,CFT_NDB_OLD
   ,CFT_NDB_MAX_DEL_WIN
   ,CFT_NDB_EPOCH
+  ,CFT_NDB_EPOCH_TRANS
   ,CFT_NUMBER_OF_CFTS /* End marker */
 };
 
@@ -163,7 +164,8 @@ enum enum_conflicting_op_type
 {                /* NdbApi          */
   WRITE_ROW,     /* insert (!write) */
   UPDATE_ROW,    /* update          */
-  DELETE_ROW     /* delete          */
+  DELETE_ROW,    /* delete          */
+  REFRESH_ROW    /* refresh         */
 };
 
 /*
@@ -179,20 +181,27 @@ typedef int (* prepare_detect_func) (str
                                      const MY_BITMAP* write_set,
                                      class NdbInterpretedCode* code);
 
+enum enum_conflict_fn_flags
+{
+  CF_TRANSACTIONAL = 1
+};
+
 struct st_conflict_fn_def
 {
   const char *name;
   enum_conflict_fn_type type;
   const st_conflict_fn_arg_def* arg_defs;
   prepare_detect_func prep_func;
+  uint8 flags; /* enum_conflict_fn_flags */
 };
 
 /* What sort of conflict was found */
 enum enum_conflict_cause
 {
-  ROW_ALREADY_EXISTS,
-  ROW_DOES_NOT_EXIST,
-  ROW_IN_CONFLICT
+  ROW_ALREADY_EXISTS,   /* On insert */
+  ROW_DOES_NOT_EXIST,   /* On Update, Delete */
+  ROW_IN_CONFLICT,      /* On Update, Delete */
+  TRANS_IN_CONFLICT     /* Any of above, or implied by transaction */
 };
 
 /* NdbOperation custom data which points out handler and record. */
@@ -201,6 +210,7 @@ struct Ndb_exceptions_data {
   const NdbRecord* key_rec;
   const uchar* row;
   enum_conflicting_op_type op_type;
+  Uint64 trans_id;
 };
 
 enum enum_conflict_fn_table_flags
@@ -343,6 +353,32 @@ inline void set_binlog_use_update(NDB_SH
 inline my_bool get_binlog_use_update(NDB_SHARE *share)
 { return (share->flags & NSF_BINLOG_USE_UPDATE) != 0; }
 
+enum enum_slave_trans_conflict_apply_state
+{
+  /* Normal with optional row-level conflict detection */
+  SAS_NORMAL,
+
+  /*
+    SAS_TRACK_TRANS_DEPENDENCIES
+    Track inter-transaction dependencies
+  */
+  SAS_TRACK_TRANS_DEPENDENCIES,
+
+  /*
+    SAS_APPLY_TRANS_DEPENDENCIES
+    Apply only non conflicting transactions
+  */
+  SAS_APPLY_TRANS_DEPENDENCIES
+};
+
+enum enum_slave_conflict_flags
+{
+  /* Conflict detection Ops defined */
+  SCS_OPS_DEFINED = 1,
+  /* Conflict detected on table with transactional resolution */
+  SCS_TRANS_CONFLICT_DETECTED_THIS_PASS = 2
+};
+
 /*
   State associated with the Slave thread
   (From the Ndb handler's point of view)
@@ -350,17 +386,52 @@ inline my_bool get_binlog_use_update(NDB
 struct st_ndb_slave_state
 {
   /* Counter values for current slave transaction */
-  Uint32 current_conflict_defined_op_count;
   Uint32 current_violation_count[CFT_NUMBER_OF_CFTS];
   Uint64 current_master_server_epoch;
   Uint64 current_max_rep_epoch;
+  uint8 conflict_flags; /* enum_slave_conflict_flags */
+  /* Transactional conflict detection */
+  Uint32 retry_trans_count;
+  Uint32 current_trans_row_conflict_count;
+  Uint32 current_trans_row_reject_count;
+  Uint32 current_trans_in_conflict_count;
 
   /* Cumulative counter values */
   Uint64 total_violation_count[CFT_NUMBER_OF_CFTS];
   Uint64 max_rep_epoch;
   Uint32 sql_run_id;
+  /* Transactional conflict detection */
+  Uint64 trans_row_conflict_count;
+  Uint64 trans_row_reject_count;
+  Uint64 trans_detect_iter_count;
+  Uint64 trans_in_conflict_count;
+  Uint64 trans_conflict_commit_count;
+
+  static const Uint32 MAX_RETRY_TRANS_COUNT = 100;
+
+  /*
+    Slave Apply State
+
+    State of Binlog application from Ndb point of view.
+  */
+  enum_slave_trans_conflict_apply_state trans_conflict_apply_state;
+
+  MEM_ROOT conflict_mem_root;
+  class DependencyTracker* trans_dependency_tracker;
 
   /* Methods */
+  void atStartSlave();
+  int  atPrepareConflictDetection(const NdbDictionary::Table* table,
+                                  const NdbRecord* key_rec,
+                                  const uchar* row_data,
+                                  Uint64 transaction_id,
+                                  bool& handle_conflict_now);
+  int  atTransConflictDetected(Uint64 transaction_id);
+  int  atConflictPreCommit(bool& retry_slave_trans);
+
+  void atBeginTransConflictHandling();
+  void atEndTransConflictHandling();
+
   void atTransactionCommit();
   void atTransactionAbort();
   void atResetSlave();
@@ -370,6 +441,8 @@ struct st_ndb_slave_state
                           Uint64 row_epoch,
                           bool is_row_server_id_local);
 
+  void resetPerAttemptCounters();
+
   st_ndb_slave_state();
 };
 
@@ -702,8 +775,10 @@ private:
                                  const NdbRecord* key_rec,
                                  const uchar* old_data,
                                  const uchar* new_data,
+                                 NdbTransaction* trans,
                                  NdbInterpretedCode* code,
-                                 NdbOperation::OperationOptions* options);
+                                 NdbOperation::OperationOptions* options,
+                                 bool& conflict_handled);
 #endif
   void setup_key_ref_for_ndb_record(const NdbRecord **key_rec,
                                     const uchar **key_row,
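
  The enum_slave_trans_conflict_apply_state values above are cycled per
  conflict handling round; a sketch of the intended progression, inferred
  from the enum comments (the actual transitions live in the
  st_ndb_slave_state method bodies) :

    /* Assumed progression (sketch, not literal code) :

         SAS_NORMAL
           |  transactional conflict detected in an epoch
           v
         SAS_TRACK_TRANS_DEPENDENCIES    re-apply, recording inter-
           |                             transaction dependencies
           |  conflicts (still) detected
           v
         SAS_APPLY_TRANS_DEPENDENCIES    re-apply, rejecting in-conflict
           |                             transactions and their dependents
           |  no further conflicts
           v
         SAS_NORMAL
    */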

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2011-07-08 12:31:38 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2011-09-06 10:23:37 +0000
@@ -47,6 +47,7 @@ extern my_bool opt_ndb_log_binlog_index;
 extern my_bool opt_ndb_log_apply_status;
 extern ulong opt_ndb_extra_logging;
 extern st_ndb_slave_state g_ndb_slave_state;
+extern my_bool opt_ndb_log_transaction_id;
 
 bool ndb_log_empty_epochs(void);
 
@@ -4247,13 +4248,15 @@ static const st_conflict_fn_arg_def epoc
 static const st_conflict_fn_def conflict_fns[]=
 {
   { "NDB$MAX_DELETE_WIN", CFT_NDB_MAX_DEL_WIN,
-    &resolve_col_args[0], row_conflict_fn_max_del_win },
+    &resolve_col_args[0], row_conflict_fn_max_del_win, 0 },
   { "NDB$MAX",            CFT_NDB_MAX,
-    &resolve_col_args[0], row_conflict_fn_max         },
+    &resolve_col_args[0], row_conflict_fn_max,         0 },
   { "NDB$OLD",            CFT_NDB_OLD,
-    &resolve_col_args[0], row_conflict_fn_old         },
+    &resolve_col_args[0], row_conflict_fn_old,         0 },
+  { "NDB$EPOCH_TRANS",    CFT_NDB_EPOCH_TRANS,
+    &epoch_fn_args[0],    row_conflict_fn_epoch,       CF_TRANSACTIONAL},
   { "NDB$EPOCH",          CFT_NDB_EPOCH,
-    &epoch_fn_args[0],    row_conflict_fn_epoch       }
+    &epoch_fn_args[0],    row_conflict_fn_epoch,       0 }
 };
 
 static unsigned n_conflict_fns=
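
  As with the existing functions, a conflict function is selected per
  table via the conflict_fn column of mysql.ndb_replication; a usage
  sketch (the ndb_replication schema itself is not part of this patch) :

    -- on the mysqld(s) of the Cluster applying replicated changes :
    INSERT INTO mysql.ndb_replication
      VALUES ('mydb', 'mytab', 0, NULL, 'NDB$EPOCH_TRANS()');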
@@ -4508,6 +4511,7 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
     break;
   }
   case CFT_NDB_EPOCH:
+  case CFT_NDB_EPOCH_TRANS:
   {
     if (num_args > 1)
     {
@@ -4520,7 +4524,7 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
     /* Check that table doesn't have Blobs as we don't support that */
     if (share->flags & NSF_BLOB_FLAG)
     {
-      my_snprintf(msg, msg_len, "Table has Blob column(s), not suitable for NDB$EPOCH.");
+      my_snprintf(msg, msg_len, "Table has Blob column(s), not suitable for NDB$EPOCH[_TRANS].");
       DBUG_PRINT("info", ("%s", msg));
       DBUG_RETURN(-1);
     }
@@ -4530,7 +4534,7 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
      * represent SavePeriod/EpochPeriod
      */
     if (ndbtab->getExtraRowGciBits() == 0)
-      sql_print_information("Ndb Slave : CFT_NDB_EPOCH, low epoch resolution");
+      sql_print_information("Ndb Slave : CFT_NDB_EPOCH[_TRANS], low epoch resolution");
 
     if (ndbtab->getExtraRowAuthorBits() == 0)
     {
@@ -6140,8 +6144,9 @@ ndb_find_binlog_index_row(ndb_binlog_ind
   return row;
 }
 
+
 static int
-ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
+ndb_binlog_thread_handle_data_event(THD* thd, Ndb *ndb, NdbEventOperation *pOp,
                                     ndb_binlog_index_row **rows,
                                     injector::transaction &trans,
                                     unsigned &trans_row_count,
@@ -6288,6 +6293,17 @@ ndb_binlog_thread_handle_data_event(Ndb
   uint32 logged_server_id= anyValue;
   ndbcluster_anyvalue_set_serverid(logged_server_id, originating_server_id);
 
+  /*
+     Get NdbApi transaction id for this event to put into Binlog
+  */
+  Ndb_binlog_extra_row_info extra_row_info;
+  if (opt_ndb_log_transaction_id)
+  {
+    extra_row_info.setFlags(Ndb_binlog_extra_row_info::NDB_ERIF_TRANSID);
+    extra_row_info.setTransactionId(pOp->getTransId());
+    thd->binlog_row_event_extra_data = extra_row_info.generateBuffer();
+  }
+
   DBUG_ASSERT(trans.good());
   DBUG_ASSERT(table != 0);
 
@@ -6457,6 +6473,11 @@ ndb_binlog_thread_handle_data_event(Ndb
     break;
   }
 
+  if (opt_ndb_log_transaction_id)
+  {
+    thd->binlog_row_event_extra_data = NULL;
+  }
+
   if (share->flags & NSF_BLOB_FLAG)
   {
     my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR));
@@ -7357,7 +7378,8 @@ restart_cluster_failure:
 #endif
           if ((unsigned) pOp->getEventType() <
               (unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
-            ndb_binlog_thread_handle_data_event(i_ndb, pOp, &rows, trans, trans_row_count, trans_slave_row_count);
+            ndb_binlog_thread_handle_data_event(thd, i_ndb, pOp, &rows, trans,
+                                                trans_row_count, trans_slave_row_count);
           else
           {
             ndb_binlog_thread_handle_non_data_event(thd, pOp, *rows);
@@ -7779,4 +7801,119 @@ ulong opt_server_id_mask = ~0;
 
 #endif
 
+Ndb_binlog_extra_row_info::
+Ndb_binlog_extra_row_info()
+{
+  flags = 0;
+  transactionId = InvalidTransactionId;
+  /* Prepare buffer with extra row info header bytes */
+  buff[ EXTRA_ROW_INFO_LEN_OFFSET ] = 0;
+  buff[ EXTRA_ROW_INFO_FORMAT_OFFSET ] = ERIF_NDB;
+}
+
+void
+Ndb_binlog_extra_row_info::
+setFlags(Uint16 _flags)
+{
+  flags = _flags;
+}
+
+void
+Ndb_binlog_extra_row_info::
+setTransactionId(Uint64 _transactionId)
+{
+  assert(_transactionId != InvalidTransactionId);
+  transactionId = _transactionId;
+}
+
+int
+Ndb_binlog_extra_row_info::
+loadFromBuffer(const uchar* extra_row_info)
+{
+  assert(extra_row_info);
+
+  Uint8 format = extra_row_info[ EXTRA_ROW_INFO_FORMAT_OFFSET ];
+  Uint8 length = extra_row_info[ EXTRA_ROW_INFO_LEN_OFFSET ];
+
+  if (likely(format == ERIF_NDB))
+  {
+    if (likely(length >= FLAGS_SIZE))
+    {
+      const uchar* data = &extra_row_info[ EXTRA_ROW_INFO_HDR_BYTES ];
+      Uint8 nextPos = 0;
+
+      /* Have flags at least */
+      Uint16 netFlags;
+      memcpy(&netFlags, &data[ nextPos ], FLAGS_SIZE);
+      nextPos += FLAGS_SIZE;
+      flags = uint2korr((const char*) &netFlags);
+
+      if (flags & NDB_ERIF_TRANSID)
+      {
+        if (likely((nextPos + TRANSID_SIZE) <= length))
+        {
+          /*
+            Correct length, retrieve transaction id, converting from
+            little endian if necessary.
+          */
+          Uint64 netTransId;
+          memcpy(&netTransId,
+                 &data[ nextPos ],
+                 TRANSID_SIZE);
+          nextPos += TRANSID_SIZE;
+          transactionId = uint8korr((const char*) &netTransId);
+        }
+        else
+        {
+          /*
+             Error - supposed to have transaction id, but
+             buffer too short
+          */
+          return -1;
+        }
+      }
+    }
+  }
+
+  /* We currently ignore other formats of extra binlog info, and
+   * different lengths.
+   */
+
+  return 0;
+}
+
+uchar*
+Ndb_binlog_extra_row_info::generateBuffer()
+{
+  /*
+    Here we write out the buffer in network format,
+    based on the current member settings.
+  */
+  Uint8 nextPos = EXTRA_ROW_INFO_HDR_BYTES;
+
+  if (flags)
+  {
+    /* Write current flags into buff */
+    Uint16 netFlags = uint2korr((const char*) &flags);
+    memcpy(&buff[ nextPos ], &netFlags, FLAGS_SIZE);
+    nextPos += FLAGS_SIZE;
+
+    if (flags & NDB_ERIF_TRANSID)
+    {
+      Uint64 netTransactionId = uint8korr((const char*) &transactionId);
+      memcpy(&buff[ nextPos ], &netTransactionId, TRANSID_SIZE);
+      nextPos += TRANSID_SIZE;
+    }
+
+    assert( nextPos <= MaxLen );
+    /* Set length */
+    assert( buff[ EXTRA_ROW_INFO_FORMAT_OFFSET ] == ERIF_NDB );
+    buff[ EXTRA_ROW_INFO_LEN_OFFSET ] = nextPos - EXTRA_ROW_INFO_HDR_BYTES;
+
+    return buff;
+  }
+  return 0;
+}
+
+// #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
 #endif
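
  The wire format implied by generateBuffer() / loadFromBuffer() above is
  one payload-length byte, one format byte (ERIF_NDB), then a
  little-endian Uint16 flags word and - when NDB_ERIF_TRANSID is set - a
  little-endian Uint64 transaction id.  A minimal standalone decoder
  sketch, assuming EXTRA_ROW_INFO_HDR_BYTES == 2 with length/format at
  offsets 0/1 (as the constructor suggests); the sketch_* names are
  illustrative only :

    #include <stdint.h>

    static const uint16_t SKETCH_ERIF_TRANSID = 0x1; /* NDB_ERIF_TRANSID */

    /* flags_out/transid_out must be non-NULL.
       Returns 0 on success, -1 if flags promise more than len holds. */
    int sketch_decode_extra_row_info(const unsigned char* buf,
                                     uint16_t* flags_out,
                                     uint64_t* transid_out)
    {
      const unsigned len = buf[0];       /* payload length, header excluded */
      /* buf[1] is the format byte; non-NDB formats are ignored here */
      if (len < 2)
        return 0;                        /* no flags word present */
      const unsigned char* p = buf + 2;  /* start of payload */
      const uint16_t flags = (uint16_t)(p[0] | (p[1] << 8)); /* LE */
      *flags_out = flags;
      if (flags & SKETCH_ERIF_TRANSID)
      {
        if (len < 2 + 8)
          return -1;                     /* buffer too short for transid */
        uint64_t id = 0;
        for (int i = 7; i >= 0; i--)     /* little-endian Uint64 */
          id = (id << 8) | p[2 + i];
        *transid_out = id;
      }
      return 0;
    }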

=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h	2011-07-04 16:30:34 +0000
+++ b/sql/ha_ndbcluster_binlog.h	2011-09-06 10:23:37 +0000
@@ -382,3 +382,49 @@ void ndbcluster_anyvalue_set_serverid(Ui
 #ifndef DBUG_OFF
 void dbug_ndbcluster_anyvalue_set_userbits(Uint32& anyValue);
 #endif
+
+/*
+   Helper for reading/writing Binlog extra row info
+   in Ndb format.
+   It contains an internal buffer, which can be passed
+   in the thd variable when writing binlog entries if
+   the object stays in scope around the write.
+*/
+class Ndb_binlog_extra_row_info
+{
+public:
+  static const Uint32 FLAGS_SIZE = sizeof(Uint16);
+  static const Uint32 TRANSID_SIZE = sizeof(Uint64);
+  static const Uint32 MaxLen =
+    EXTRA_ROW_INFO_HDR_BYTES +
+    FLAGS_SIZE +
+    TRANSID_SIZE;
+
+  static const Uint64 InvalidTransactionId = ~Uint64(0);
+
+  enum Flags
+  {
+    NDB_ERIF_TRANSID = 0x1
+  };
+
+  Ndb_binlog_extra_row_info();
+
+  int loadFromBuffer(const uchar* extra_row_info_ptr);
+
+  Uint16 getFlags() const
+  {
+    return flags;
+  }
+  void setFlags(Uint16 _flags);
+  Uint64 getTransactionId() const
+  { return transactionId; };
+  void setTransactionId(Uint64 _transactionId);
+  uchar* getBuffPtr()
+  { return buff; };
+  uchar* generateBuffer();
+private:
+  uchar buff[MaxLen];
+  Uint16 flags;
+  Uint64 transactionId;
+};
+

=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc	2011-06-30 15:59:25 +0000
+++ b/sql/log_event.cc	2011-09-06 10:23:37 +0000
@@ -1952,6 +1952,29 @@ void Rows_log_event::print_verbose(IO_CA
   const char *sql_command, *sql_clause1, *sql_clause2;
   Log_event_type type_code= get_type_code();
   
+#ifndef MCP_WL5353
+  if (m_extra_row_data)
+  {
+    uint8 extra_data_len= m_extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET];
+    my_b_printf(file, "### Extra row data format: %u, len: %u :",
+                m_extra_row_data[EXTRA_ROW_INFO_FORMAT_OFFSET],
+                extra_data_len);
+    if (extra_data_len)
+    {
+      /*
+         Buffer for hex view of string, including '0x' prefix,
+         2 hex chars / byte and trailing 0
+      */
+      const int buff_len= 2 + (256 * 2) + 1;
+      char buff[buff_len];
+      str_to_hex(buff, (const char*) &m_extra_row_data[EXTRA_ROW_INFO_HDR_BYTES],
+                 extra_data_len);
+      my_b_printf(file, "%s", buff);
+    }
+    my_b_printf(file, "\n");
+  }
+#endif
+
   switch (type_code) {
   case WRITE_ROWS_EVENT:
     sql_command= "INSERT INTO";
@@ -3091,6 +3114,46 @@ int Query_log_event::do_apply_event(Rela
   return do_apply_event(rli, query, q_len);
 }
 
+#ifndef MCP_WL5353
+#ifdef HAVE_NDB_BINLOG
+
+static bool is_silent_error(THD* thd)
+{
+  DBUG_ENTER("is_silent_error");
+  List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
+  MYSQL_ERROR *err;
+
+  while ((err= it++))
+  {
+    DBUG_PRINT("info", ("Error : %u : %s", err->code, err->msg));
+
+    switch (err->code)
+    {
+    case ER_GET_TEMPORARY_ERRMSG:
+    {
+      /*
+         If we are rolling back due to an explicit request, do so
+         silently
+      */
+      if (strcmp(SLAVE_SILENT_RETRY_MSG, err->msg) == 0)
+      {
+        DBUG_PRINT("info", ("Silent retry"));
+        DBUG_RETURN(true);
+      }
+
+      break;
+    }
+    default:
+      break;
+    }
+  }
+
+  DBUG_RETURN(false);
+}
+
+// #ifdef HAVE_NDB_BINLOG
+#endif
+#endif // #ifndef MCP_WL5353
 
 /**
   @todo
@@ -3411,11 +3474,19 @@ Default database: '%s'. Query: '%s'",
     */
     else if (thd->is_slave_error || thd->is_fatal_error)
     {
-      rli->report(ERROR_LEVEL, actual_error,
-                      "Error '%s' on query. Default database: '%s'. Query: '%s'",
-                      (actual_error ? thd->main_da.message() :
-                       "unexpected success or fatal error"),
-                      print_slave_db_safe(thd->db), query_arg);
+#ifndef MCP_WL5353
+#ifdef HAVE_NDB_BINLOG
+      bool be_silent= is_silent_error(thd);
+      if (!be_silent)
+#endif
+#endif // #ifndef MCP_WL5353
+      {
+        rli->report(ERROR_LEVEL, actual_error,
+                    "Error '%s' on query. Default database: '%s'. Query: '%s'",
+                    (actual_error ? thd->main_da.message() :
+                     "unexpected success or fatal error"),
+                    print_slave_db_safe(thd->db), query_arg);
+      }
       thd->is_slave_error= 1;
     }
 
@@ -7128,6 +7199,61 @@ const char *sql_ex_info::init(const char
   return buf;
 }
 
+#ifndef MCP_WL5353
+#ifndef DBUG_OFF
+#ifndef MYSQL_CLIENT
+static uchar dbug_extra_row_data_val= 0;
+
+/**
+   set_extra_data
+
+   Called during self-test to generate various
+   self-consistent binlog row event extra
+   data buffers, set in the thread, which can
+   be checked when reading the binlog.
+
+   @param thd  Current thd
+   @param arr  Buffer to use
+*/
+void set_extra_data(THD* thd, uchar* arr)
+{
+  assert(thd->binlog_row_event_extra_data == NULL);
+  uchar val= dbug_extra_row_data_val++;
+  arr[EXTRA_ROW_INFO_LEN_OFFSET]= val;
+  arr[EXTRA_ROW_INFO_FORMAT_OFFSET]= val;
+  for (uchar i=0; i<val; i++)
+    arr[EXTRA_ROW_INFO_HDR_BYTES+i]= val;
+
+  thd->binlog_row_event_extra_data= arr;
+}
+
+#endif // #ifndef MYSQL_CLIENT
+
+/**
+   check_extra_data
+
+   Called during self-test to check that
+   binlog row event extra data is self-
+   consistent as defined by the set_extra_data
+   function above.
+
+   Will assert(false) if not.
+
+   @param extra_row_data
+*/
+void check_extra_data(uchar* extra_row_data)
+{
+  assert(extra_row_data);
+  uint16 len= extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET];
+  assert(extra_row_data[EXTRA_ROW_INFO_FORMAT_OFFSET] == len);
+  for (uint16 i= 0; i < len; i++)
+  {
+    assert(extra_row_data[EXTRA_ROW_INFO_HDR_BYTES + i] == len);
+  }
+}
+
+#endif  // #ifndef DBUG_OFF
+#endif  // #ifndef MCP_WL5353
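
  These self-test helpers are driven from the DBUG_EXECUTE_IF points in
  the Rows_log_event constructors below; on a debug build a test would
  arm them via the dbug facility (a usage sketch; exact test wiring not
  shown here) :

    # debug build, e.g. from an mtr test :
    SET GLOBAL debug = '+d,extra_row_data_set';    # generating side
    SET GLOBAL debug = '+d,extra_row_data_check';  # reading side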
 
 /**************************************************************************
 	Rows_log_event member functions
@@ -7141,7 +7267,10 @@ Rows_log_event::Rows_log_event(THD *thd_
     m_table(tbl_arg),
     m_table_id(tid),
     m_width(tbl_arg ? tbl_arg->s->fields : 1),
-    m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0) 
+    m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
+#ifndef MCP_WL5353
+    ,m_extra_row_data(0)
+#endif
 #ifdef HAVE_REPLICATION
     , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
 #endif
@@ -7159,6 +7288,34 @@ Rows_log_event::Rows_log_event(THD *thd_
       set_flags(NO_FOREIGN_KEY_CHECKS_F);
   if (thd_arg->options & OPTION_RELAXED_UNIQUE_CHECKS)
       set_flags(RELAXED_UNIQUE_CHECKS_F);
+#ifndef MCP_WL5353
+#ifndef DBUG_OFF
+  uchar extra_data[257];
+  DBUG_EXECUTE_IF("extra_row_data_set",
+                  /* Set extra row data to a known value */
+                  set_extra_data(thd_arg, extra_data););
+#endif
+  if (thd_arg->binlog_row_event_extra_data)
+  {
+    /* Copy Extra data from thd into new event */
+    uint16 extra_data_len=
+      EXTRA_ROW_INFO_HDR_BYTES +
+      thd_arg->get_binlog_row_event_extra_data_len();
+
+    m_extra_row_data= (uchar*) my_malloc(extra_data_len, MYF(MY_WME));
+
+    if (likely(m_extra_row_data))
+    {
+      memcpy(m_extra_row_data, thd_arg->binlog_row_event_extra_data,
+             extra_data_len);
+      set_flags(EXTRA_ROW_EV_DATA_F);
+    }
+  }
+
+  DBUG_EXECUTE_IF("extra_row_data_set",
+                  thd_arg->binlog_row_event_extra_data = NULL;);
+#endif // #ifndef MCP_WL5353
+
   /* if bitmap_init fails, caught in is_valid() */
   if (likely(!bitmap_init(&m_cols,
                           m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
@@ -7190,6 +7347,9 @@ Rows_log_event::Rows_log_event(const cha
     m_table(NULL),
 #endif
     m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
+#ifndef MCP_WL5353
+    ,m_extra_row_data(0)
+#endif
 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
     , m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
 #endif
@@ -7219,8 +7379,35 @@ Rows_log_event::Rows_log_event(const cha
 
   m_flags= uint2korr(post_start);
 
+#ifndef MCP_WL5353
+  uint16 extra_data_len= 0;
+  if ((m_flags & EXTRA_ROW_EV_DATA_F))
+  {
+    const uchar* extra_data_start= (const uchar*) post_start + 2;
+    extra_data_len=  EXTRA_ROW_INFO_HDR_BYTES +
+      extra_data_start[EXTRA_ROW_INFO_LEN_OFFSET];
+    assert(m_extra_row_data == 0);
+    DBUG_PRINT("debug", ("extra_data_len = %u",
+                         extra_data_len));
+
+    m_extra_row_data= (uchar*) my_malloc(extra_data_len,
+                                         MYF(MY_WME));
+    if (likely(m_extra_row_data))
+    {
+      memcpy(m_extra_row_data, extra_data_start, extra_data_len);
+    }
+  }
+  DBUG_EXECUTE_IF("extra_row_data_check",
+                  /* Check extra data has expected value */
+                  check_extra_data(m_extra_row_data););
+#endif // #ifndef MCP_WL5353
+
   uchar const *const var_start=
-    (const uchar *)buf + common_header_len + post_header_len;
+    (const uchar *)buf + common_header_len + post_header_len
+#ifndef MCP_WL5353
+    + extra_data_len
+#endif
+    ;
   uchar const *const ptr_width= var_start;
   uchar *ptr_after_width= (uchar*) ptr_width;
   DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
@@ -7300,6 +7487,9 @@ Rows_log_event::~Rows_log_event()
     m_cols.bitmap= 0; // so no my_free in bitmap_free
   bitmap_free(&m_cols); // To pair with bitmap_init().
   my_free((uchar*)m_rows_buf, MYF(MY_ALLOW_ZERO_PTR));
+#ifndef MCP_WL5353
+  my_free((uchar*)m_extra_row_data, MYF(MY_ALLOW_ZERO_PTR));
+#endif
 }
 
 int Rows_log_event::get_data_size()
@@ -7316,6 +7506,12 @@ int Rows_log_event::get_data_size()
   int data_size= ROWS_HEADER_LEN;
   data_size+= no_bytes_in_map(&m_cols);
   data_size+= (uint) (end - buf);
+#ifndef MCP_WL5353
+  data_size+= m_extra_row_data ?
+    EXTRA_ROW_INFO_HDR_BYTES +
+    m_extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET] :
+    0;
+#endif
 
   if (type_code == UPDATE_ROWS_EVENT)
     data_size+= no_bytes_in_map(&m_cols_ai);
@@ -7466,6 +7662,14 @@ int Rows_log_event::do_apply_event(Relay
         thd->options|= OPTION_RELAXED_UNIQUE_CHECKS;
     else
         thd->options&= ~OPTION_RELAXED_UNIQUE_CHECKS;
+
+#ifndef MCP_WL5353
+    if (get_flags(EXTRA_ROW_EV_DATA_F))
+        thd->binlog_row_event_extra_data = m_extra_row_data;
+    else
+        thd->binlog_row_event_extra_data = NULL;
+#endif
+
     /* A small test to verify that objects have consistent types */
     DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
 
@@ -7596,6 +7800,12 @@ int Rows_log_event::do_apply_event(Relay
     else
       thd->options&= ~OPTION_ALLOW_BATCH;
 #endif
+#ifndef MCP_WL5353
+    if (get_flags(EXTRA_ROW_EV_DATA_F))
+        thd->binlog_row_event_extra_data = m_extra_row_data;
+    else
+        thd->binlog_row_event_extra_data = NULL;
+#endif
     /* A small test to verify that objects have consistent types */
     DBUG_ASSERT(sizeof(thd->options) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
 
@@ -7915,7 +8125,23 @@ bool Rows_log_event::write_data_header(I
                   });
   int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
   int2store(buf + RW_FLAGS_OFFSET, m_flags);
+#ifndef MCP_WL5353
+  int rc = my_b_safe_write(file, buf, ROWS_HEADER_LEN);
+
+  if ((rc == 0) &&
+      (m_flags & EXTRA_ROW_EV_DATA_F))
+  {
+    /* Write extra row data */
+    rc = my_b_safe_write(file, m_extra_row_data,
+                         EXTRA_ROW_INFO_HDR_BYTES +
+                         m_extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET]);
+  }
+
+  /* Function returns bool, where false(0) is success :( */
+  return (rc != 0);
+#else
   return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
+#endif
 }
 
 bool Rows_log_event::write_data_body(IO_CACHE*file)
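
  Putting the writer and reader above together, the event layout with the
  new flag set becomes (widths as used in this patch) :

    /* Rows_log_event layout (after the common event header) when
       EXTRA_ROW_EV_DATA_F is set :

       [ post header    : table id (6 bytes) | m_flags (2 bytes) ]
       [ extra row data : len (1) | format (1) | payload (len bytes) ]
       [ var part       : m_width | column bitmap(s) | row data ]

       get_data_size(), write_data_header() and the event-reading
       constructor all account for the EXTRA_ROW_INFO_HDR_BYTES + len
       extra bytes.
    */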

=== modified file 'sql/log_event.h'
--- a/sql/log_event.h	2011-06-30 15:59:25 +0000
+++ b/sql/log_event.h	2011-09-06 10:23:37 +0000
@@ -3509,6 +3509,13 @@ public:
       values for all columns of the table.
      */
     COMPLETE_ROWS_F = (1U << 3)
+
+#ifndef MCP_WL5353
+    /**
+       Indicates that additional information was appended to the event.
+    */
+    ,EXTRA_ROW_EV_DATA_F = (1U << 4)
+#endif
   };
 
   typedef uint16 flag_set;
@@ -3572,6 +3579,10 @@ public:
 
   uint     m_row_count;         /* The number of rows added to the event */
 
+#ifndef MCP_WL5353
+  const uchar* get_extra_row_data() const   { return m_extra_row_data; }
+#endif
+
 protected:
   /* 
      The constructors are protected since you're supposed to inherit
@@ -3620,6 +3631,11 @@ protected:
 
   flag_set m_flags;		/* Flags for row-level events */
 
+#ifndef MCP_WL5353
+  uchar    *m_extra_row_data;   /* Pointer to extra row data if any */
+                                /* If non null, first byte is length */
+#endif
+
   /* helper functions */
 
 #if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)

=== added file 'sql/ndb_conflict_trans.cc'
--- a/sql/ndb_conflict_trans.cc	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_conflict_trans.cc	2011-09-06 10:23:37 +0000
@@ -0,0 +1,778 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include "ndb_conflict_trans.h"
+
+#ifdef HAVE_NDB_BINLOG
+#include "my_sys.h"
+#include "my_base.h"
+
+/* Whether to track all transactions, or just
+ * 'interesting' ones
+ */
+#define TRACK_ALL_TRANSACTIONS 0
+
+/* Whether to check the transaction graph for
+ * correctness at runtime
+ */
+#define CHECK_TRANS_GRAPH 0
+
+/* st_row_event_key_info implementation */
+
+st_row_event_key_info::
+st_row_event_key_info(const NdbDictionary::Table* _table,
+                      const uchar* _key_buff,
+                      Uint32 _key_buff_len,
+                      Uint64 _transaction_id):
+  tableObj(_table),
+  packed_key(_key_buff),
+  packed_key_len(_key_buff_len),
+  transaction_id(_transaction_id),
+  hash_next(NULL)
+{
+}
+
+Uint64
+st_row_event_key_info::getTransactionId() const
+{
+  return transaction_id;
+}
+
+void
+st_row_event_key_info::updateRowTransactionId(Uint64 mostRecentTransId)
+{
+  transaction_id = mostRecentTransId;
+}
+
+Uint32
+st_row_event_key_info::hashValue() const
+{
+  /* Include Table Object Id + primary key */
+  Uint32 h = (17 * 37) + tableObj->getObjectId();
+  for (Uint32 i=0; i < packed_key_len; i++)
+    h = (37 * h) + packed_key[i];
+  return h;
+}
+
+bool
+st_row_event_key_info::equal(const st_row_event_key_info* other) const
+{
+  /* Check same table + same PK */
+  return
+    ((tableObj == other->tableObj) &&
+     (packed_key_len == other->packed_key_len) &&
+     (memcmp(packed_key, other->packed_key, packed_key_len) == 0));
+};
+
+st_row_event_key_info*
+st_row_event_key_info::getNext() const
+{
+  return hash_next;
+};
+
+void
+st_row_event_key_info::setNext(st_row_event_key_info* _next)
+{
+  hash_next = _next;
+};
+
+
+/* st_trans_dependency implementation */
+
+st_trans_dependency::
+st_trans_dependency(st_transaction* _target_transaction,
+                    st_transaction* _dependent_transaction,
+                    const st_trans_dependency* _next)
+  : target_transaction(_target_transaction),
+    dependent_transaction(_dependent_transaction),
+    next_entry(_next),
+    hash_next(NULL)
+{
+};
+
+st_transaction*
+st_trans_dependency::getTargetTransaction() const
+{
+  return target_transaction;
+}
+
+st_transaction*
+st_trans_dependency::getDependentTransaction() const
+{
+  return dependent_transaction;
+}
+
+const st_trans_dependency*
+st_trans_dependency::getNextDependency() const
+{
+  return next_entry;
+}
+
+Uint32
+st_trans_dependency::hashValue() const
+{
+  /* Hash the ptrs in a rather nasty way */
+  UintPtr p =
+    ((UintPtr) target_transaction) ^
+    ((UintPtr) dependent_transaction);
+
+  if (sizeof(p) == 8)
+  {
+    /* Xor two words of 64 bit ptr */
+    p =
+      (p & 0xffffffff) ^
+      ((((Uint64) p) >> 32) & 0xffffffff);
+  }
+
+  return 17 + (37 * (Uint32)p);
+};
+
+bool
+st_trans_dependency::equal(const st_trans_dependency* other) const
+{
+  return ((target_transaction == other->target_transaction) &&
+          (dependent_transaction == other->dependent_transaction));
+};
+
+st_trans_dependency*
+st_trans_dependency::getNext() const
+{
+  return hash_next;
+};
+
+void
+st_trans_dependency::setNext(st_trans_dependency* _next)
+{
+  hash_next = _next;
+};
+
+
+/* st_transaction implementation */
+
+st_transaction::st_transaction(Uint64 _transaction_id)
+  : transaction_id(_transaction_id),
+    in_conflict(false),
+    dependency_list_head(NULL),
+    hash_next(NULL)
+{
+};
+
+Uint64
+st_transaction::getTransactionId() const
+{
+  return transaction_id;
+}
+
+bool
+st_transaction::getInConflict() const
+{
+  return in_conflict;
+}
+
+void
+st_transaction::setInConflict()
+{
+  in_conflict = true;
+}
+
+const st_trans_dependency*
+st_transaction::getDependencyListHead() const
+{
+  return dependency_list_head;
+}
+
+void
+st_transaction::setDependencyListHead(st_trans_dependency* head)
+{
+  dependency_list_head = head;
+}
+
+/* Hash Api */
+Uint32
+st_transaction::hashValue() const
+{
+  return 17 + (37 * ((transaction_id & 0xffffffff) ^
+                     (transaction_id >> 32 & 0xffffffff)));
+};
+
+bool
+st_transaction::equal(const st_transaction* other) const
+{
+  return transaction_id == other->transaction_id;
+};
+
+st_transaction*
+st_transaction::getNext() const
+{
+    return hash_next;
+};
+
+void
+st_transaction::setNext(st_transaction* _next)
+{
+  hash_next = _next;
+};
+
+
+
+/*
+   Explicit instantiations of the unique hash maps and stack used
+   below, with bucket/block storage from st_mem_root_allocator
+*/
+template class HashMap2<st_row_event_key_info, true, st_mem_root_allocator>;
+template class HashMap2<st_transaction, true, st_mem_root_allocator>;
+template class HashMap2<st_trans_dependency, true, st_mem_root_allocator>;
+template class LinkedStack<Uint64, st_mem_root_allocator>;
+
+
+/**
+ * pack_key_to_buffer
+ *
+ * Given a table, key_record and record, this method will
+ * determine how many significant bytes the key contains,
+ * and if a buffer is passed, will copy the bytes into the
+ * buffer.
+ */
+static
+int
+pack_key_to_buffer(const NdbDictionary::Table* table,
+                   const NdbRecord* key_rec,
+                   const uchar* record,
+                   uchar* buffer,
+                   Uint32& buff_len)
+{
+  /* Loop over attributes in key record, determining their actual
+   * size based on column type and contents of record
+   * If buffer supplied, copy them contiguously to buffer
+   * return total length
+   */
+  Uint32 attr_id;
+  Uint32 buff_offset = 0;
+  NdbDictionary::getFirstAttrId(key_rec, attr_id);
+
+  do
+  {
+    Uint32 from_offset = 0;
+    Uint32 byte_len = 0;
+    const NdbDictionary::Column* key_col = table->getColumn(attr_id);
+    NdbDictionary::getOffset(key_rec, attr_id, from_offset);
+    assert( ! NdbDictionary::isNull(key_rec, (const char*) record, attr_id));
+
+    switch(key_col->getArrayType())
+    {
+    case NDB_ARRAYTYPE_FIXED:
+      byte_len = key_col->getSizeInBytes();
+      break;
+    case NDB_ARRAYTYPE_SHORT_VAR:
+      byte_len = record[from_offset];
+      from_offset++;
+      break;
+    case NDB_ARRAYTYPE_MEDIUM_VAR:
+      byte_len = uint2korr(&record[from_offset]);
+      from_offset+= 2;
+      break;
+    };
+    assert( (buff_offset + byte_len) <= buff_len );
+
+    if (buffer)
+      memcpy(&buffer[buff_offset], &record[from_offset], byte_len);
+
+    buff_offset+= byte_len;
+  } while (NdbDictionary::getNextAttrId(key_rec, attr_id));
+
+  buff_len = buff_offset;
+  return 0;
+};
+
+static
+Uint32 determine_packed_key_size(const NdbDictionary::Table* table,
+                                 const NdbRecord* key_rec,
+                                 const uchar* record)
+{
+  Uint32 key_size = ~Uint32(0);
+  /* Use pack_key_to_buffer to calculate length required */
+  pack_key_to_buffer(table,
+                     key_rec,
+                     record,
+                     NULL,
+                     key_size);
+  return key_size;
+};
+
+/* st_mem_root_allocator implementation */
+void*
+st_mem_root_allocator::alloc(void* ctx, size_t bytes)
+{
+  st_mem_root_allocator* a = (st_mem_root_allocator*) ctx;
+  return alloc_root(a->mem_root, bytes);
+};
+
+void*
+st_mem_root_allocator::calloc(void* ctx, size_t nelem, size_t bytes)
+{
+  st_mem_root_allocator* a = (st_mem_root_allocator*) ctx;
+  return alloc_root(a->mem_root,
+                    nelem * bytes);
+}
+
+void
+st_mem_root_allocator::free(void* ctx, void* mem)
+{
+  /* Do nothing, will be globally freed when arena (mem_root)
+   * released
+   */
+};
+
+st_mem_root_allocator::st_mem_root_allocator(MEM_ROOT* _mem_root)
+  : mem_root(_mem_root)
+{
+};
+
+
+/* DependencyTracker implementation */
+
+DependencyTracker*
+DependencyTracker::newDependencyTracker(MEM_ROOT* mem_root)
+{
+  DependencyTracker* dt = NULL;
+  void* mem = alloc_root(mem_root,
+                         sizeof(DependencyTracker));
+  if (mem)
+  {
+    dt = new (mem) DependencyTracker(mem_root);
+  }
+
+  return dt;
+}
+
+
+DependencyTracker::
+DependencyTracker(MEM_ROOT* mem_root)
+  : mra(mem_root), key_hash(&mra), trans_hash(&mra),
+    dependency_hash(&mra),
+    iteratorTodo(ITERATOR_STACK_BLOCKSIZE, &mra),
+    conflicting_trans_count(0),
+    error_text(NULL)
+{
+  /* TODO Get sizes from somewhere */
+  key_hash.setSize(1024);
+  trans_hash.setSize(100);
+  dependency_hash.setSize(100);
+};
+
+
+int
+DependencyTracker::
+track_operation(const NdbDictionary::Table* table,
+                const NdbRecord* key_rec,
+                const uchar* row,
+                Uint64 transaction_id)
+{
+  DBUG_ENTER("track_operation");
+
+  Uint32 required_buff_size = determine_packed_key_size(table,
+                                                        key_rec,
+                                                        row);
+  DBUG_PRINT("info", ("Required length for key : %u", required_buff_size));
+
+  /* Alloc space for packed key and struct*/
+  uchar* packed_key_buff = (uchar*) alloc_root(mra.mem_root,
+                                               required_buff_size);
+  void* element_mem = alloc_root(mra.mem_root,
+                                 sizeof(st_row_event_key_info));
+
+  if (pack_key_to_buffer(table,
+                         key_rec,
+                         row,
+                         packed_key_buff,
+                         required_buff_size))
+  {
+    DBUG_RETURN(-1);
+  }
+
+  if (TRACK_ALL_TRANSACTIONS)
+  {
+    st_transaction* transEntry = get_or_create_transaction(transaction_id);
+    if (!transEntry)
+    {
+      error_text = "track_operation : Failed to get or create transaction";
+      DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+    }
+  }
+
+  st_row_event_key_info* key_info = new (element_mem)
+    st_row_event_key_info(table,
+                          packed_key_buff,
+                          required_buff_size,
+                          transaction_id);
+
+  /* Now try to add element to hash */
+  if (! key_hash.add(key_info))
+  {
+    /*
+      Already an element in the keyhash with this primary key
+      If it's for the same transaction then ignore, otherwise
+      it's an inter-transaction dependency
+    */
+    st_row_event_key_info* existing = key_hash.get(key_info);
+
+    Uint64 existingTransIdOnRow = existing->getTransactionId();
+    Uint64 newTransIdOnRow = key_info->getTransactionId();
+
+    if (existingTransIdOnRow != newTransIdOnRow)
+    {
+      int res = add_dependency(existingTransIdOnRow,
+                               newTransIdOnRow);
+      /*
+        Update stored transaction_id to be latest for key.
+        Further key operations on this row will depend on this
+        transaction, and transitively on the previous
+        transaction.
+      */
+      existing->updateRowTransactionId(newTransIdOnRow);
+
+      DBUG_RETURN(res);
+    }
+    else
+    {
+      /*
+         How can we have two updates to the same row with the
+         same transaction id?  Only if the transaction id
+         is invalid (e.g. not set)
+      */
+      if (existingTransIdOnRow != InvalidTransactionId)
+      {
+        assert(false);
+        DBUG_RETURN(-1);
+      }
+    }
+  }
+
+  DBUG_RETURN(0);
+};
+
+int
+DependencyTracker::
+mark_conflict(Uint64 trans_id)
+{
+  DBUG_ENTER("mark_conflict");
+  DBUG_PRINT("info", ("trans_id : %llu", trans_id));
+
+  st_transaction* entry = get_or_create_transaction(trans_id);
+  if (!entry)
+  {
+    error_text = "mark_conflict : get_or_create_transaction() failure";
+    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+  }
+
+  if (entry->getInConflict())
+  {
+    /* Nothing to do here */
+    DBUG_RETURN(0);
+  }
+
+  /* Have entry, mark it, and any dependents */
+  bool fetch_node_dependents;
+  st_transaction* dependent = entry;
+  reset_dependency_iterator();
+  do
+  {
+    DBUG_PRINT("info", ("Visiting transaction %llu, conflict : %u",
+                        dependent->getTransactionId(),
+                        dependent->getInConflict()));
+    /*
+      If marked already, don't fetch dependents, as
+      they will also be marked already
+    */
+    fetch_node_dependents = false;
+
+    if (!dependent->getInConflict())
+    {
+      dependent->setInConflict();
+      conflicting_trans_count++;
+      fetch_node_dependents = true;
+    }
+  } while ((dependent = get_next_dependency(dependent, fetch_node_dependents)));
+
+  assert( verify_graph() );
+
+  DBUG_RETURN(0);
+}
+
+bool
+DependencyTracker::in_conflict(Uint64 trans_id)
+{
+  DBUG_ENTER("in_conflict");
+  DBUG_PRINT("info", ("trans_id %llu", trans_id));
+  st_transaction key(trans_id);
+  const st_transaction* entry = NULL;
+
+  /*
+    If transaction hash entry exists, check it for
+    conflicts.  If it doesn't exist, no conflict
+  */
+  if ((entry = trans_hash.get(&key)))
+  {
+    DBUG_PRINT("info", ("in_conflict : %u", entry->getInConflict()));
+    DBUG_RETURN(entry->getInConflict());
+  }
+  else
+  {
+    assert(! TRACK_ALL_TRANSACTIONS);
+  }
+  DBUG_RETURN(false);
+};
+
+st_transaction*
+DependencyTracker::
+get_or_create_transaction(Uint64 trans_id)
+{
+  DBUG_ENTER("get_or_create_transaction");
+  st_transaction transKey(trans_id);
+  st_transaction* transEntry = NULL;
+
+  if (! (transEntry = trans_hash.get(&transKey)))
+  {
+    /*
+       Transaction does not exist.  Allocate it
+       and add to the hash
+    */
+    DBUG_PRINT("info", ("Creating new hash entry for transaction (%llu)",
+                        trans_id));
+
+    transEntry = (st_transaction*)
+      st_mem_root_allocator::alloc(&mra, sizeof(st_transaction));
+
+    if (transEntry)
+    {
+      new (transEntry) st_transaction(trans_id);
+
+      if (!trans_hash.add(transEntry))
+      {
+        st_mem_root_allocator::free(&mra, transEntry); /* For show */
+        transEntry = NULL;
+      }
+    }
+  }
+
+  DBUG_RETURN(transEntry);
+}
+
+int
+DependencyTracker::
+add_dependency(Uint64 trans_id, Uint64 dependent_trans_id)
+{
+  DBUG_ENTER("add_dependency");
+  DBUG_PRINT("info", ("Recording dependency of %llu on %llu",
+                      dependent_trans_id, trans_id));
+  st_transaction* targetEntry = get_or_create_transaction(trans_id);
+  if (!targetEntry)
+  {
+    error_text = "add_dependency : Failed get_or_create_transaction";
+    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+  }
+
+  st_transaction* dependentEntry = get_or_create_transaction(dependent_trans_id);
+  if (!dependentEntry)
+  {
+    error_text = "add_dependency : Failed get_or_create_transaction";
+    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+  }
+
+  /* Now lookup dependency.  Add it if not already present */
+  st_trans_dependency depKey(targetEntry, dependentEntry, NULL);
+  st_trans_dependency* dep = NULL;
+  if (! (dep = dependency_hash.get(&depKey)))
+  {
+    DBUG_PRINT("info", ("Creating new dependency hash entry for "
+                        "dependency of %llu on %llu.",
+                        dependentEntry->getTransactionId(),
+                        targetEntry->getTransactionId()));
+
+    dep = (st_trans_dependency*)
+      st_mem_root_allocator::alloc(&mra, sizeof(st_trans_dependency));
+
+    new (dep) st_trans_dependency(targetEntry, dependentEntry,
+                                  targetEntry->getDependencyListHead());
+
+    targetEntry->setDependencyListHead(dep);
+
+    /* New dependency, propagate in_conflict if necessary */
+    if (targetEntry->getInConflict())
+    {
+      DBUG_PRINT("info", ("Marking new dependent as in-conflict"));
+      DBUG_RETURN(mark_conflict(dependentEntry->getTransactionId()));
+    }
+  }
+
+  assert(verify_graph());
+
+  DBUG_RETURN(0);
+};
+
+void
+DependencyTracker::
+reset_dependency_iterator()
+{
+  iteratorTodo.reset();
+};
+
+st_transaction*
+DependencyTracker::
+get_next_dependency(const st_transaction* current,
+                    bool include_dependents_of_current)
+{
+  DBUG_ENTER("get_next_dependency");
+  DBUG_PRINT("info", ("node : %llu", current->getTransactionId()));
+  /*
+    Depth first traverse, with option to ignore sub graphs.
+  */
+  if (include_dependents_of_current)
+  {
+    /* Add dependents to stack */
+    const st_trans_dependency* dependency = current->getDependencyListHead();
+
+    while (dependency)
+    {
+      assert(dependency->getTargetTransaction() == current);
+      DBUG_PRINT("info", ("Adding dependency %llu->%llu",
+                          dependency->getDependentTransaction()->getTransactionId(),
+                          dependency->getTargetTransaction()->getTransactionId()));
+
+      Uint64 dependentTransactionId =
+        dependency->getDependentTransaction()->getTransactionId();
+      iteratorTodo.push(dependentTransactionId);
+      dependency= dependency->getNextDependency();
+    }
+  }
+
+  Uint64 nextId;
+  if (iteratorTodo.pop(nextId))
+  {
+    DBUG_PRINT("info", ("Returning transaction id %llu",
+                        nextId));
+    st_transaction key(nextId);
+    st_transaction* dependent = trans_hash.get(&key);
+    assert(dependent);
+    DBUG_RETURN(dependent);
+  }
+
+  assert(iteratorTodo.size() == 0);
+  DBUG_PRINT("info", ("No more dependencies to visit"));
+  DBUG_RETURN(NULL);
+};
+
+void
+DependencyTracker::
+dump_dependents(Uint64 trans_id)
+{
+  fprintf(stderr, "Dumping dependents of transid %llu : ", trans_id);
+
+  st_transaction key(trans_id);
+  const st_transaction* dependent = NULL;
+
+  if ((dependent = trans_hash.get(&key)))
+  {
+    reset_dependency_iterator();
+    const char* comma = ", ";
+    const char* sep = "";
+    do
+    {
+      {
+        fprintf(stderr, "%s%llu%s", sep, dependent->getTransactionId(),
+                (dependent->getInConflict()?"-C":""));
+        sep = comma;
+      }
+    } while ((dependent = get_next_dependency(dependent)));
+    fprintf(stderr, "\n");
+  }
+  else
+  {
+    fprintf(stderr, "None\n");
+  }
+};
+
+bool
+DependencyTracker::
+verify_graph()
+{
+  if (! CHECK_TRANS_GRAPH)
+    return true;
+
+  /*
+     Check the graph structure obeys its invariants
+
+     1) There are no cycles in the graph such that
+        a transaction is a dependent of itself
+
+     2) If a transaction is marked in_conflict, all
+        of its dependents (transitively), are also
+        marked in conflict
+
+     This is expensive to verify, so not always on
+  */
+  HashMap2<st_transaction, true, st_mem_root_allocator>::Iterator it(trans_hash);
+
+  st_transaction* root = NULL;
+
+  while ((root = it.next()))
+  {
+    bool in_conflict = root->getInConflict();
+
+    /* Now visit all dependents */
+    st_transaction* dependent = root;
+    reset_dependency_iterator();
+
+    while((dependent = get_next_dependency(dependent, true)))
+    {
+      if (dependent == root)
+      {
+        /* Must exit, or we'll be here forever */
+        fprintf(stderr, "Error : Cycle discovered in graph\n");
+        abort();
+        return false;
+      }
+
+      if (in_conflict &&
+          ! dependent->getInConflict())
+      {
+        fprintf(stderr, "Error : Dependent transaction not marked in-conflict\n");
+        abort();
+        return false;
+      }
+    }
+  }
+
+  return true;
+}
+
+
+const char*
+DependencyTracker::get_error_text() const
+{
+  return error_text;
+};
+
+Uint32
+DependencyTracker::get_conflict_count() const
+{
+  return conflicting_trans_count;
+}
+
+/* #ifdef HAVE_NDB_BINLOG */
+
+#endif

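Note: a sketch of the intended DependencyTracker call sequence when applying
one epoch. Assumes the caller owns a MEM_ROOT and the Ndb table / NdbRecord /
row handles; all variable names here are illustrative, not from the patch.

  MEM_ROOT mem_root;
  init_alloc_root(&mem_root, 4096, 0);

  DependencyTracker* tracker=
    DependencyTracker::newDependencyTracker(&mem_root);

  /* For each row event in the epoch */
  if (tracker->track_operation(table, key_rec, row, trans_id))
    fprintf(stderr, "%s\n", tracker->get_error_text());

  /* When a row-level conflict is detected */
  tracker->mark_conflict(trans_id);

  /* Before commit, skip transactions marked (transitively) in-conflict */
  bool skip= tracker->in_conflict(trans_id);

  /* Freeing the arena discards the tracker, its hashes and the stack */
  free_root(&mem_root, MYF(0));
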
=== added file 'sql/ndb_conflict_trans.h'
--- a/sql/ndb_conflict_trans.h	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_conflict_trans.h	2011-09-06 10:23:37 +0000
@@ -0,0 +1,337 @@
+/*
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef NDB_CONFLICT_TRANS_H
+#define NDB_CONFLICT_TRANS_H
+
+#include <my_global.h>
+
+#ifdef HAVE_NDB_BINLOG
+#include <ndbapi/NdbApi.hpp>
+#include <util/HashMap2.hpp>
+#include <util/LinkedStack.hpp>
+
+/*
+   This file defines structures for detecting dependencies between
+   transactions based on the rows they update.
+   It is used when applying row updates as part of the MySQLD slave.
+*/
+
+/**
+ * st_row_event_key_info
+ *
+ * This struct describes a row event applied by the Slave, based
+ * on its table, key and transaction id.
+ * Instances of this struct are placed in a hash structure where
+ * the {table, key} are the key, and the transaction id is the
+ * 'data'.
+ * This hash is used to detect when different transactions in an
+ * epoch affect the same row, which implies a dependency between
+ * the transactions.
+ */
+class st_row_event_key_info
+{
+public:
+  /**
+   * User api
+   */
+  st_row_event_key_info(const NdbDictionary::Table* _table,
+                        const uchar* _key_buff,
+                        Uint32 _key_buff_len,
+                        Uint64 _transaction_id);
+  Uint64 getTransactionId() const;
+  void updateRowTransactionId(Uint64 mostRecentTransId);
+
+  /**
+   * Hash Api
+   */
+  Uint32 hashValue() const;
+  bool equal(const st_row_event_key_info* other) const;
+  st_row_event_key_info* getNext() const;
+  void setNext(st_row_event_key_info* _next);
+
+private:
+  /* Key : Table and Primary Key */
+  const NdbDictionary::Table* tableObj;
+  const uchar* packed_key;
+  Uint32 packed_key_len;
+
+  /* Data : Transaction id */
+  Uint64 transaction_id;
+
+  /* Next ptr for hash */
+  st_row_event_key_info* hash_next;
+};
+
+
+class st_transaction;
+
+/**
+   st_trans_dependency
+   Entry in dependency hash.
+   Describes inter-transaction dependency, and comprises part
+   of list of other dependents of target_transaction
+*/
+class st_trans_dependency
+{
+public:
+  /* User Api */
+  st_trans_dependency(st_transaction* _target_transaction,
+                      st_transaction* _dependent_transaction,
+                      const st_trans_dependency* _next);
+
+  st_transaction* getTargetTransaction() const;
+  st_transaction* getDependentTransaction() const;
+  const st_trans_dependency* getNextDependency() const;
+
+
+  /* Hash Api */
+  Uint32 hashValue() const;
+  bool equal(const st_trans_dependency* other) const;
+  st_trans_dependency* getNext() const;
+  void setNext(st_trans_dependency* _next);
+
+private:
+  /* Key */
+  st_transaction* target_transaction;
+  st_transaction* dependent_transaction;
+
+  /* Rest of co-dependents of target_transaction */
+  const st_trans_dependency* next_entry;
+
+  st_trans_dependency* hash_next;
+};
+
+
+
+/**
+   st_transaction
+   Entry in transaction hash, indicates whether transaction
+   is in conflict, and has list of dependents
+*/
+class st_transaction
+{
+public:
+  /* User Api */
+  st_transaction(Uint64 _transaction_id);
+
+  Uint64 getTransactionId() const;
+  bool getInConflict() const;
+  void setInConflict();
+  const st_trans_dependency* getDependencyListHead() const;
+  void setDependencyListHead(st_trans_dependency* head);
+
+  /* Hash Api */
+  Uint32 hashValue() const;
+  bool equal(const st_transaction* other) const;
+  st_transaction* getNext() const;
+  void setNext(st_transaction* _next);
+
+private:
+  /* Key */
+  Uint64 transaction_id;
+
+  /* Data */
+  /* Is this transaction (and therefore its dependents) in conflict? */
+  bool in_conflict;
+  /* Head of list of dependencies */
+  st_trans_dependency* dependency_list_head;
+
+  /* Hash ptr */
+  st_transaction* hash_next;
+};
+
+typedef struct st_mem_root MEM_ROOT;
+
+/**
+ * Allocator type which internally uses a MySQLD mem_root
+ * Used as a template parameter for Ndb ADTs
+ */
+struct st_mem_root_allocator
+{
+  MEM_ROOT* mem_root;
+
+  static void* alloc(void* ctx, size_t bytes);
+  static void* calloc(void* ctx, size_t nelem, size_t bytes);
+  static void free(void* ctx, void* mem);
+  st_mem_root_allocator(MEM_ROOT* _mem_root);
+};
+
+
+class DependencyTracker
+{
+public:
+  static const Uint64 InvalidTransactionId = ~Uint64(0);
+
+  /**
+     newDependencyTracker
+
+     Factory method to get a DependencyTracker object, using
+     memory from the passed mem_root.
+     To discard dependency tracker, just free the passed mem_root.
+  */
+  static DependencyTracker* newDependencyTracker(MEM_ROOT* mem_root);
+
+  /**
+     track_operation
+
+     This method records the operation on the passed
+     table + primary key as belonging to the passed
+     transaction.
+
+     If there is already a recorded operation on the
+     passed table + primary key from a different transaction
+     then a transaction dependency is recorded.
+  */
+  int track_operation(const NdbDictionary::Table* table,
+                      const NdbRecord* key_rec,
+                      const uchar* row,
+                      Uint64 transaction_id);
+
+  /**
+     mark_conflict
+
+     Record that a particular transaction is in conflict.
+     This will also mark any dependent transactions as in
+     conflict.
+  */
+  int mark_conflict(Uint64 trans_id);
+
+  /**
+     in_conflict
+
+     Returns true if the supplied transaction_id is marked as
+     in conflict
+  */
+  bool in_conflict(Uint64 trans_id);
+
+  /**
+     get_error_text
+
+     Returns string containing error description.
+     NULL if no error.
+  */
+  const char* get_error_text() const;
+
+  /**
+     get_conflict_count
+
+     Returns number of transactions marked as in-conflict
+  */
+  Uint32 get_conflict_count() const;
+
+private:
+  DependencyTracker(MEM_ROOT* mem_root);
+
+  /**
+     get_or_create_transaction
+
+     Get or create the transaction object for the
+     given transaction id.
+     Returns Null on allocation failure.
+  */
+  st_transaction* get_or_create_transaction(Uint64 trans_id);
+
+  /**
+     add_dependency
+
+     This method records a dependency between the two
+     passed transaction ids
+  */
+  int add_dependency(Uint64 trans_id, Uint64 dependent_trans_id);
+
+  /**
+     reset_dependency_iterator
+
+     Reset dependency iterator.
+     Required before using get_next_dependency()
+  */
+  void reset_dependency_iterator();
+
+  /**
+     get_next_dependency
+     Gets next dependency in dependency graph.
+     Performs breadth first search from start node.
+
+     include_dependents_of_current = false causes the traversal to skip
+     dependents of the current node.
+  */
+  st_transaction* get_next_dependency(const st_transaction* current,
+                                      bool include_dependents_of_current = true);
+
+  /**
+     dump_dependents
+
+     Debugging function
+  */
+  void dump_dependents(Uint64 trans_id);
+
+  /**
+     verify_graph
+
+     Internal invariant checking function.
+  */
+  bool verify_graph();
+
+  /* MemRoot allocator class instance */
+  st_mem_root_allocator mra;
+
+  /*
+     key_hash
+     Map of {Table, PK} -> TransID
+     Used to find inter-transaction dependencies
+     An attempt to add a duplicate entry to the key_hash indicates a
+     transaction dependency from the existing entry to the duplicate.
+  */
+  HashMap2<st_row_event_key_info, true, st_mem_root_allocator> key_hash;
+
+  /*
+     trans_hash
+     Map of {TransId} -> {in_conflict, List of dependents}
+     Used to record which transactions are in-conflict, and what
+     their dependencies are.
+     Transactions not marked in-conflict, and with no dependencies or
+     dependents, are not placed in this hash.
+   */
+  HashMap2<st_transaction, true, st_mem_root_allocator> trans_hash;
+
+  /*
+     dependency_hash
+     Map of {TransIdFrom, TransIdTo}
+     Used to ensure dependencies are added only once, for efficiency.
+     Elements are linked from the trans_hash entry for TransIdFrom.
+   */
+  HashMap2<st_trans_dependency, true, st_mem_root_allocator> dependency_hash;
+
+  /*
+     iteratorTodo
+     Stack of transaction ids to be visited during the depth-first
+     traversal used when marking dependents as in-conflict.
+  */
+  static const Uint32 ITERATOR_STACK_BLOCKSIZE = 10;
+  LinkedStack<Uint64, st_mem_root_allocator> iteratorTodo;
+
+  Uint32 conflicting_trans_count;
+
+  const char* error_text;
+};
+
+/* #ifdef HAVE_NDB_BINLOG */
+#endif
+
+// #ifndef NDB_CONFLICT_TRANS_H
+#endif

=== modified file 'sql/ndb_mi.cc'
--- a/sql/ndb_mi.cc	2011-06-29 23:28:01 +0000
+++ b/sql/ndb_mi.cc	2011-09-06 10:23:37 +0000
@@ -81,6 +81,16 @@ bool ndb_mi_get_in_relay_log_statement(R
   return (rli->get_flag(Relay_log_info::IN_STMT) != 0);
 }
 
+ulong ndb_mi_get_relay_log_trans_retries()
+{
+  return active_mi->rli.trans_retries;
+}
+
+void ndb_mi_set_relay_log_trans_retries(ulong number)
+{
+  active_mi->rli.trans_retries = number;
+}
+
 /* #ifdef HAVE_NDB_BINLOG */
 
 #endif

=== modified file 'sql/ndb_mi.h'
--- a/sql/ndb_mi.h	2011-06-29 23:28:01 +0000
+++ b/sql/ndb_mi.h	2011-09-06 10:23:37 +0000
@@ -42,6 +42,8 @@ uint32 ndb_mi_get_slave_run_id();
    Relay log info related functions
 */
 bool ndb_mi_get_in_relay_log_statement(class Relay_log_info* rli);
+ulong ndb_mi_get_relay_log_trans_retries();
+void ndb_mi_set_relay_log_trans_retries(ulong number);
 
 // #ifndef NDB_MI_H
 #endif

=== modified file 'sql/rpl_constants.h'
--- a/sql/rpl_constants.h	2011-06-30 15:37:13 +0000
+++ b/sql/rpl_constants.h	2011-09-06 10:23:37 +0000
@@ -31,4 +31,32 @@ enum Incident {
   INCIDENT_COUNT
 };
 
+#ifndef MCP_WL5353
+/**
+   Enumeration of the reserved formats of Binlog extra row information
+*/
+enum ExtraRowInfoFormat {
+  /** Ndb format */
+  ERIF_NDB          =   0,
+
+  /** Reserved formats  0 -> 63 inclusive */
+  ERIF_LASTRESERVED =  63,
+
+  /**
+      Available / uncontrolled formats
+      64 -> 255 inclusive
+  */
+  ERIF_OPEN1        =  64,
+  ERIF_OPEN2        =  65,
+
+  ERIF_LASTOPEN     =  255
+};
+
+/* 1 byte length, 1 byte format */
+#define EXTRA_ROW_INFO_LEN_OFFSET 0
+#define EXTRA_ROW_INFO_FORMAT_OFFSET 1
+#define EXTRA_ROW_INFO_HDR_BYTES 2
+
+#endif   // #ifndef MCP_WL5353
+
 #endif /* RPL_CONSTANTS_H */

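Note: the extra row info buffer is thus laid out as a 1 byte payload length,
a 1 byte format code, then the payload itself. A minimal sketch of building
such a buffer (the helper below is hypothetical, not part of the patch):

  /* buff must hold EXTRA_ROW_INFO_HDR_BYTES + payload_len bytes */
  uchar* make_extra_row_data(uchar* buff, uint8 format,
                             const uchar* payload, uint8 payload_len)
  {
    buff[EXTRA_ROW_INFO_LEN_OFFSET]=    payload_len; /* payload bytes only */
    buff[EXTRA_ROW_INFO_FORMAT_OFFSET]= format;      /* e.g. ERIF_NDB */
    memcpy(buff + EXTRA_ROW_INFO_HDR_BYTES, payload, payload_len);
    return buff;
  }
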
=== modified file 'sql/slave.h'
--- a/sql/slave.h	2011-06-30 15:59:25 +0000
+++ b/sql/slave.h	2011-09-06 10:23:37 +0000
@@ -231,6 +231,12 @@ extern ulong opt_server_id_mask;
 
 extern I_List<THD> threads;
 
+#ifndef MCP_WL5353
+#ifdef HAVE_NDB_BINLOG
+#define SLAVE_SILENT_RETRY_MSG "Slave transaction rollback requested"
+#endif
+#endif
+
 #endif /* HAVE_REPLICATION */
 
 /* masks for start/stop operations on io and sql slave threads */

=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc	2011-07-04 16:30:34 +0000
+++ b/sql/sql_class.cc	2011-09-06 10:23:37 +0000
@@ -606,6 +606,9 @@ THD::THD()
    lock_id(&main_lock_id),
    user_time(0), in_sub_stmt(0),
    sql_log_bin_toplevel(false),
+#ifndef MCP_WL5353
+   binlog_row_event_extra_data(NULL),
+#endif
    binlog_table_maps(0), binlog_flags(0UL),
    table_map_for_update(0),
    arg_of_last_insert_id_function(FALSE),
@@ -882,6 +885,9 @@ void THD::init(void)
   reset_current_stmt_binlog_row_based();
   bzero((char *) &status_var, sizeof(status_var));
   sql_log_bin_toplevel= options & OPTION_BIN_LOG;
+#ifndef MCP_WL5353
+  binlog_row_event_extra_data = 0;
+#endif
 
 #if defined(ENABLED_DEBUG_SYNC)
   /* Initialize the Debug Sync Facility. See debug_sync.cc. */
@@ -3546,7 +3552,13 @@ THD::binlog_prepare_pending_rows_event(T
       pending->get_type_code() != type_code || 
       pending->get_data_size() + needed > opt_binlog_rows_event_max_size || 
       pending->get_width() != colcnt ||
-      !bitmap_cmp(pending->get_cols(), cols)) 
+      !bitmap_cmp(pending->get_cols(), cols)
+#ifndef MCP_WL5353
+      ||
+      !binlog_row_event_extra_data_eq(pending->get_extra_row_data(),
+                                      binlog_row_event_extra_data)
+#endif
+      )
   {
     /* Create a new RowsEventT... */
     Rows_log_event* const
@@ -4065,6 +4077,65 @@ int THD::binlog_query(THD::enum_binlog_q
   DBUG_RETURN(0);
 }
 
+#ifndef MCP_WL5353
+/**
+   get_binlog_row_event_extra_data_len
+
+   Returns the length in bytes of the current thread's
+   binlog row event extra data, if present.
+   The length is stored at EXTRA_ROW_INFO_LEN_OFFSET within
+   the extra data buffer.
+   Note that this length is the length of the extra
+   data 'payload'.  There is also a fixed size extra data
+   header.
+
+   @return
+     Length in bytes of the payload of extra data.
+     Zero is valid.  Maximum is 255
+*/
+uint8
+THD::get_binlog_row_event_extra_data_len() const
+{
+  return (binlog_row_event_extra_data?
+          binlog_row_event_extra_data[EXTRA_ROW_INFO_LEN_OFFSET]:
+          0);
+};
+
+/**
+   binlog_row_event_extra_data_eq
+
+   Comparator for two binlog row event extra data
+   pointers.
+
+   It compares their significant bytes.
+
+   Null pointers are acceptable
+
+   @param a
+     first pointer
+
+   @param b
+     second pointer
+
+   @return
+     true if the referenced structures are equal
+*/
+bool
+THD::binlog_row_event_extra_data_eq(const uchar* a,
+                                    const uchar* b)
+{
+  return ((a == b) ||
+          ((a != NULL) &&
+           (b != NULL) &&
+           (a[EXTRA_ROW_INFO_LEN_OFFSET] ==
+            b[EXTRA_ROW_INFO_LEN_OFFSET]) &&
+           (memcmp(a, b,
+                   EXTRA_ROW_INFO_HDR_BYTES +
+                   a[EXTRA_ROW_INFO_LEN_OFFSET]) == 0)));
+}
+
+#endif  // #ifndef MCP_WL5353
+
 bool Discrete_intervals_list::append(ulonglong start, ulonglong val,
                                  ulonglong incr)
 {

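Note: on the producer side, an engine points thd->binlog_row_event_extra_data
at a buffer it owns before logging rows; binlog_prepare_pending_rows_event()
above then flushes the pending event whenever that data changes. A sketch,
reusing the hypothetical make_extra_row_data() helper from earlier:

  /* buff is owned by the engine and must outlive the pending rows event */
  thd->binlog_row_event_extra_data=
    make_extra_row_data(buff, ERIF_NDB, payload, payload_len);

  /* ... log row operations carrying this extra data ... */

  thd->binlog_row_event_extra_data= NULL; /* stop attaching extra data */
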
=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2011-07-04 16:30:34 +0000
+++ b/sql/sql_class.h	2011-09-06 10:23:37 +0000
@@ -1435,6 +1435,18 @@ public:
   /* container for handler's private per-connection data */
   Ha_data ha_data[MAX_HA];
 
+#ifndef MCP_WL5353
+  /*
+     Ptr to row event extra data to be written to Binlog /
+     received from Binlog.
+
+   */
+  uchar* binlog_row_event_extra_data;
+  uint8  get_binlog_row_event_extra_data_len() const;
+  static bool binlog_row_event_extra_data_eq(const uchar* a,
+                                             const uchar* b);
+#endif
+
 #ifndef MYSQL_CLIENT
   int binlog_setup_trx_data();
 

=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt	2011-07-04 16:30:34 +0000
+++ b/storage/ndb/CMakeLists.txt	2011-09-06 10:23:37 +0000
@@ -177,7 +177,8 @@ SET(NDBCLUSTER_SOURCES
   ../../sql/ha_ndbcluster_binlog.cc
   ../../sql/ha_ndb_index_stat.cc
   ../../sql/ha_ndbinfo.cc
-  ../../sql/ndb_mi.cc)
+  ../../sql/ndb_mi.cc
+  ../../sql/ndb_conflict_trans.cc)
 INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/storage/ndb/include)
 
 IF(EXISTS ${CMAKE_SOURCE_DIR}/storage/mysql_storage_engine.cmake)

=== added file 'storage/ndb/include/util/HashMap2.hpp'
--- a/storage/ndb/include/util/HashMap2.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/util/HashMap2.hpp	2011-09-06 10:23:37 +0000
@@ -0,0 +1,488 @@
+/* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+
+#ifndef NDB_HASHMAP2_HPP
+#define NDB_HASHMAP2_HPP
+
+#include <ndb_global.h>
+
+
+/* Basic HashTable implementation
+ * The HashTable stores elements of type KV.
+ * The storage for the elements is managed outside the
+ * HashTable implementation.
+ * The HashTable uses element chaining in each bucket to
+ * deal with collisions.
+ * The HashTable can optionally enforce uniqueness.
+ * The HashTable can be resized when it is empty.
+ */
+
+/**
+ * KVOPStaticAdapter template
+ *
+ * Used with HashMap2
+ * Creates a class with static methods calling members of
+ * an object passed to them, for the case where the HashMap
+ * contains ptrs to types with the relevant methods defined
+ * (objects).
+ *
+ * Implements the KVOP Api, and is useful for supplying the KVOP
+ * template parameter when the KV type provides the following
+ * (non-static) API :
+ *
+ * Required KV Api:
+ *   Uint32 hashValue() const;
+ *   bool equal(const KV* other) const;
+ *   void setNext(KV* next);
+ *   KV* getNext() const;
+ */
+template<typename KV>
+class KVOPStaticAdapter
+{
+public:
+  static Uint32 hashValue(const KV* obj)
+  {
+    return obj->hashValue();
+  };
+
+  static bool equal(const KV* objA, const KV* objB)
+  {
+    return objA->equal(objB);
+  };
+
+  static void setNext(KV* from, KV* to)
+  {
+    return from->setNext(to);
+  };
+
+  static KV* getNext(const KV* from)
+  {
+    return from->getNext();
+  }
+};
+
+// TODO :
+//    Pass allocator context rather than allocator
+//    Support re-alloc?
+//    Use calloc?
+
+/**
+ * StandardAllocator - used in HashMap2 when no allocator supplied
+ * Uses standard malloc/free.
+ */
+struct StandardAllocator
+{
+  static void* alloc(void* ignore, size_t bytes)
+  {
+    return ::malloc(bytes);
+  };
+
+  static void* calloc(void* ignore, size_t nelem, size_t bytes)
+  {
+    return ::calloc(nelem, bytes);
+  }
+
+  static void free(void* ignore, void* mem)
+  {
+    ::free(mem);
+  };
+};
+
+/**
+ * Template parameters
+ *
+ * Classes with static methods are used to avoid the necessity
+ * of using OO wrapper objects for C data.
+ * Objects can be used by defining static methods which call
+ * normal methods.
+ * A default StaticWrapper class exists to 'automate' this if
+ * necessary.
+ *
+ * class KV - Key Value pair.
+ *   The HashTable stores pointers to these.  No interface is
+ *   assumed - they are manipulated via KVOP below, so can be
+ *   chunks of memory or C structs etc.
+ *
+ * bool unique
+ *   True if all keys in a hash table instance must be
+ *   unique.
+ *   False otherwise.
+ *
+ * class A - Allocator
+ *   Used for hash bucket allocation on setSize() call.
+ *   NOT used for element allocation, which is the responsibility
+ *   of the user.
+ *
+ *   Must support static methods :
+ *   - static void* calloc(void* context, size_t nelem, size_t bytes)
+ *   - static void free(void* context, void*)
+ *
+ * class KVOP - Operations on Key Value pair
+ *   KV instances are stored based on the hash returned
+ *   by the KVOP::hashValue() method, with identity based on the
+ *   KVOP::equal() method.
+ *   KV instances must be linkable using KVOP::getNext() and
+ *   KVOP::setNext() methods.
+ *
+ *   KVOP allows the static methods on the KV pair to be separate
+ *   from the data itself.  If they are in the same class, use
+ *   KVOP=KV.  If the methods are not static, and are on the KV class,
+ *   use KVOP=KVOPStaticAdapter<KV>, or equivalent.
+ *
+ *   KVOP must support the following static methods :
+ *   - static bool equal(const class KV* a, const class KV* b);
+ *     Return true if two elements are equal.
+ *
+ *   - static Uint32 hashValue(const class KV*);
+ *     Return a 32-bit stable hashvalue for the KV.
+ *     equal(a,b) implies hashValue(a) == hashValue(b)
+ *
+ *   - static void setNext(KV* from, KV* to)
+ *
+ *   - static KV* getNext(const KV* from)
+ *
+ *
+ * TODO :
+ *   - collision count?
+ *   - release option?
+ */
+template<typename KV,
+         bool unique = true,
+         typename A = StandardAllocator,
+         typename KVOP = KVOPStaticAdapter<KV> >
+class HashMap2
+{
+public:
+  /**
+   * HashMap2 constructor
+   * Pass an Allocator pointer if the templated allocator
+   * requires some context info.
+   * setSize() must be called before the HashMap is used.
+   */
+  HashMap2(void* _allocatorContext = NULL)
+    : tableSize(0),
+      elementCount(0),
+      allocatorContext(_allocatorContext),
+      table(NULL)
+  {
+  };
+
+  ~HashMap2()
+  {
+    if (table)
+      A::free(allocatorContext, table);
+  }
+
+  /**
+   * setSize
+   *
+   * Set the number of buckets.
+   *  Can only be set when the hash table is empty.
+   *  The Allocator is used to allocate/release bucket
+   *  storage.
+   */
+  bool
+  setSize(Uint32 hashBuckets)
+  {
+    if (elementCount)
+    {
+      /* Can't set size while we have contents */
+      return false;
+    }
+
+    if (hashBuckets == 0)
+    {
+      return false;
+    }
+
+    if (table)
+    {
+      A::free(allocatorContext, table);
+      table = NULL;
+    }
+
+    /* TODO : Consider using only power-of-2 + bitmask instead of mod */
+    tableSize = hashBuckets;
+
+    table = (KV**) A::calloc(allocatorContext, hashBuckets, sizeof(KV*));
+
+    if (!table)
+    {
+      return false;
+    }
+
+    for (Uint32 i=0; i < tableSize; i++)
+      table[i] = NULL;
+
+    return true;
+  };
+
+  /**
+   * add
+   *
+   * Add a KV element to the hash table
+   * The next value must be null
+   * If the hash table requires uniqueness, and the
+   * element is not unique, false will be returned
+   */
+  bool
+  add(KV* keyVal)
+  {
+    assert(table);
+
+    Uint32 hashVal = rehash(KVOP::hashValue(keyVal));
+    Uint32 bucketIdx = hashVal % tableSize;
+
+    KV* bucket = table[bucketIdx];
+
+    if (bucket != NULL)
+    {
+      if (unique)
+      {
+        /* Need to check element is not already there, in this
+         * chain
+         */
+        const KV* chainElement = bucket;
+        while (chainElement)
+        {
+          if (KVOP::equal(keyVal, chainElement))
+          {
+            /* Found duplicate */
+            return false;
+          }
+          chainElement= KVOP::getNext(chainElement);
+        }
+      }
+
+      /* Can insert at head of list, as either no uniqueness
+       * guarantee, or uniqueness checked.
+       */
+      assert(KVOP::getNext(keyVal) == NULL);
+      KVOP::setNext(keyVal, bucket);
+      table[bucketIdx] = keyVal;
+    }
+    else
+    {
+      /* First element in bucket */
+      assert(KVOP::getNext(keyVal) == NULL);
+      KVOP::setNext(keyVal, NULL);
+      table[bucketIdx] = keyVal;
+    }
+
+    elementCount++;
+    return true;
+  }
+
+  KV*
+  remove(KV* key)
+  {
+    assert(table);
+
+    Uint32 hashVal = rehash(KVOP::hashValue(key));
+    Uint32 bucketIdx = hashVal % tableSize;
+
+    KV* bucket = table[bucketIdx];
+
+    if (bucket != NULL)
+    {
+      KV* chainElement = bucket;
+      KV* prev = NULL;
+      while (chainElement)
+      {
+        if (KVOP::equal(key, chainElement))
+        {
+          /* Found, repair bucket chain
+           * Get next, might be NULL
+           */
+          KV* n = KVOP::getNext(chainElement);
+          if (prev)
+          {
+            /* Link prev to next */
+            KVOP::setNext(prev, n);
+          }
+          else
+          {
+            /* Put next as first in bucket */
+            table[bucketIdx] = n;
+          }
+
+          KVOP::setNext(chainElement, NULL);
+          elementCount--;
+
+          return chainElement;
+        }
+        prev = chainElement;
+        chainElement = KVOP::getNext(chainElement);
+      }
+    }
+
+    return NULL;
+  }
+
+  /**
+   * get
+   *
+   * Get a ptr to a KV element in the hash table
+   * with the same key as the element passed.
+   * Returns NULL if none exists.
+   *
+   * For non unique hash tables, a ptr to the
+   * first element with the given key is returned.
+   * Further elements must be found by iteration
+   * (and key comparison), until NULL is returned.
+   */
+  KV*
+  get(const KV* key) const
+  {
+    assert(table);
+
+    Uint32 hashVal = rehash(KVOP::hashValue(key));
+    Uint32 bucketIdx = hashVal % tableSize;
+
+    KV* chainElement = table[bucketIdx];
+
+    while(chainElement)
+    {
+      if (KVOP::equal(key, chainElement))
+      {
+        break;
+      }
+      chainElement = KVOP::getNext(chainElement);
+    }
+
+    return chainElement;
+  };
+
+  /**
+   * reset
+   *
+   * Resets the hash table to have no entries.
+   * KV elements added are not released in any
+   * way.  This must be handled outside the
+   * HashTable implementation, perhaps by
+   * iterating and removing the elements?
+   * Storage for the hash table itself is
+   * preserved
+   */
+  void
+  reset()
+  {
+    /* Zap the hashtable ptrs, without freeing the 'elements'
+     * Keep the storage allocated for the ptrs
+     */
+    if (elementCount)
+    {
+      assert(table);
+      for (Uint32 i=0; i < tableSize; i++)
+      {
+        table[i] = NULL;
+      }
+
+      elementCount = 0;
+    }
+  }
+
+  /**
+   * getElementCount
+   *
+   * Returns the number of elements currently
+   * stored in this hash table.
+   */
+  Uint32
+  getElementCount() const
+  {
+      return elementCount;
+  }
+
+  /**
+   * getTableSize
+   *
+   * Returns the number of hashBuckets in this hash table
+   */
+  Uint32
+  getTableSize() const
+  {
+    return tableSize;
+  }
+
+  class Iterator
+  {
+  public:
+    Iterator(HashMap2& hashMap)
+      : m_hash(&hashMap),
+        curr_element(NULL),
+        curr_bucket(0)
+    {}
+
+    /**
+      Return the current element and reposition the iterator to the next
+      element.
+    */
+    KV* next()
+    {
+      while (curr_bucket < m_hash->tableSize)
+      {
+        if (curr_element == NULL)
+        {
+          /* First this bucket */
+          curr_element = m_hash->table[ curr_bucket ];
+        }
+        else
+        {
+          /* Next this bucket */
+          curr_element = KVOP::getNext(curr_element);
+        }
+
+        if (curr_element)
+        {
+          return curr_element;
+        }
+        curr_bucket++;
+      }
+
+      return NULL;
+    }
+    void reset()
+    {
+      curr_element = NULL;
+      curr_bucket = 0;
+    }
+  private:
+    HashMap2* m_hash;
+    KV* curr_element;
+    Uint32 curr_bucket;
+  };
+
+private:
+  static Uint32 rehash(Uint32 userHash)
+  {
+    /* We rehash the supplied hash value in case
+     * it's low quality, mixing some higher order
+     * bits in with the lower bits
+     */
+    userHash ^= ((userHash >> 20) ^ (userHash >> 12));
+    return userHash ^ (userHash >> 7) ^ (userHash >> 4);
+  }
+
+  Uint32 tableSize;
+  Uint32 elementCount;
+  void* allocatorContext;
+
+  KV** table;
+
+}; // class HashMap2()
+
+#endif

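Note: a sketch of minimal HashMap2 usage with an object-style KV type,
relying on the default KVOPStaticAdapter and StandardAllocator. The Node
type and values are illustrative only.

  struct Node
  {
    Uint32 key;
    Node* next;

    Uint32 hashValue() const        { return key; }
    bool equal(const Node* o) const { return key == o->key; }
    Node* getNext() const           { return next; }
    void setNext(Node* n)           { next= n; }
  };

  HashMap2<Node> map;            /* unique = true by default */
  map.setSize(64);               /* must be called before add()/get() */

  Node n1= { 7, NULL };
  map.add(&n1);                  /* element storage stays with the caller */

  Node probe= { 7, NULL };
  Node* found= map.get(&probe);  /* == &n1 */
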
=== added file 'storage/ndb/include/util/LinkedStack.hpp'
--- a/storage/ndb/include/util/LinkedStack.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/util/LinkedStack.hpp	2011-09-06 10:23:37 +0000
@@ -0,0 +1,216 @@
+/* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+
+#ifndef NDB_LINKEDSTACK_HPP
+#define NDB_LINKEDSTACK_HPP
+
+#include <ndb_global.h>
+
+/**
+ * LinkedStack
+ *
+ * A templated class for a stack of elements E.
+ * Storage for the elements is allocated using the passed
+ * allocator.
+ * Push copies the supplied element into the stack
+ * Pop overwrites the supplied element with the contents
+ * of the top of the stack
+ * Internally, the class allocates 'blocks' of elements of
+ * the size passed, linking them together as necessary.
+ * As the stack shrinks, the blocks are not released.
+ * Blocks are returned to the allocator when release() is
+ * called.
+ * reset() empties the stack without releasing the allocated
+ * storage.
+ */
+template <typename E, typename A>
+class LinkedStack
+{
+private:
+  struct BlockHeader
+  {
+    BlockHeader* next;
+    BlockHeader* prev;
+    E* elements;
+  };
+
+  BlockHeader* allocBlock()
+  {
+    /* Alloc blockheader and element array */
+    BlockHeader* h = (BlockHeader*) A::alloc(allocatorContext,
+                                             sizeof(BlockHeader));
+    E* e = (E*) A::calloc(allocatorContext, blockElements, sizeof(E));
+
+    h->next = NULL;
+    h->prev = NULL;
+    h->elements = e;
+
+    return h;
+  }
+
+  bool valid()
+  {
+    if (stackTop)
+    {
+      assert(firstBlock != NULL);
+      assert(currBlock != NULL);
+      /* Check that currBlock is positioned on correct
+       * block, except for block boundary case
+       */
+      Uint32 blockNum = (stackTop - 1) / blockElements;
+      BlockHeader* bh = firstBlock;
+      while(blockNum--)
+      {
+        bh = bh->next;
+      }
+      assert(bh == currBlock);
+    }
+    else
+    {
+      assert(currBlock == NULL);
+    }
+    return true;
+  }
+
+  /* Note that stackTop is 'next insertion point' whereas
+   * currBlock points to block last inserted to.
+   * On block boundaries, they refer to different blocks
+   */
+  void* allocatorContext;
+  BlockHeader* firstBlock;
+  BlockHeader* currBlock;
+  Uint32 stackTop;
+  Uint32 blockElements;
+
+public:
+  LinkedStack(Uint32 _blockElements, void* _allocatorContext=NULL)
+    : allocatorContext(_allocatorContext),
+      firstBlock(NULL),
+      currBlock(NULL),
+      stackTop(0),
+      blockElements(_blockElements)
+  {
+    assert(blockElements > 0);
+    assert(valid());
+  }
+
+  ~LinkedStack()
+  {
+    assert(valid());
+    /* Release block storage if present */
+    release();
+  }
+
+  bool push(E& elem)
+  {
+    assert(valid());
+    Uint32 blockOffset = stackTop % blockElements;
+
+    if (blockOffset == 0)
+    {
+      /* On block boundary */
+      if (stackTop)
+      {
+        /* Some elements exist already */
+        if (!currBlock->next)
+        {
+          /* End of block list, alloc another */
+          BlockHeader* newBlock = allocBlock();
+          if (!newBlock)
+            return false;
+
+          currBlock->next = newBlock;
+          currBlock->next->prev = currBlock;
+        }
+        currBlock = currBlock->next;
+      }
+      else
+      {
+        /* First element */
+        if (!firstBlock)
+        {
+          BlockHeader* newBlock = allocBlock();
+          if (!newBlock)
+            return false;
+
+          firstBlock = currBlock = newBlock;
+        }
+        currBlock = firstBlock;
+      }
+    }
+
+    currBlock->elements[ blockOffset ] = elem;
+    stackTop++;
+
+    assert(valid());
+    return true;
+  }
+
+  bool pop(E& elem)
+  {
+    assert(valid());
+    if (stackTop)
+    {
+      stackTop--;
+      Uint32 blockOffset = stackTop % blockElements;
+      elem = currBlock->elements[ blockOffset ];
+
+      if (blockOffset == 0)
+      {
+        /* Block boundary, shift back to prev block. */
+        if (stackTop)
+          assert(currBlock->prev);
+
+        currBlock = currBlock->prev;
+      }
+
+      assert(valid());
+      return true;
+    }
+    return false;
+  }
+
+  Uint32 size() const
+  {
+    return stackTop;
+  }
+
+  void reset()
+  {
+    assert(valid());
+    stackTop = 0;
+    currBlock = NULL;
+    assert(valid());
+  };
+
+  void release()
+  {
+    assert(valid());
+    BlockHeader* h = firstBlock;
+    while (h)
+    {
+      BlockHeader* n = h->next;
+      A::free(allocatorContext, h->elements);
+      A::free(allocatorContext, h);
+      h = n;
+    };
+    stackTop = 0;
+    firstBlock = currBlock = NULL;
+    assert(valid());
+  }
+};
+
+#endif

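Note: a corresponding LinkedStack sketch, using the StandardAllocator from
HashMap2.hpp. The block size of 16 is arbitrary.

  LinkedStack<Uint64, StandardAllocator> stack(16); /* 16 elements/block */

  Uint64 v= 7;
  stack.push(v);           /* copies the element into the stack */

  Uint64 out;
  while (stack.pop(out))   /* LIFO; overwrites 'out' */
  {
    /* ... use out ... */
  }

  stack.release();         /* returns block storage to the allocator */
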
=== modified file 'storage/ndb/src/common/util/CMakeLists.txt'
--- a/storage/ndb/src/common/util/CMakeLists.txt	2011-07-04 16:30:34 +0000
+++ b/storage/ndb/src/common/util/CMakeLists.txt	2011-09-06 10:23:37 +0000
@@ -55,6 +55,8 @@ ADD_CONVENIENCE_LIBRARY(ndbgeneral
             require.c
             Vector.cpp
             NdbPack.cpp
+            HashMap2.cpp
+            LinkedStack.cpp
 )
 TARGET_LINK_LIBRARIES(ndbgeneral ndbtrace ${ZLIB_LIBRARY} mysys)
 
@@ -88,3 +90,12 @@ SET_TARGET_PROPERTIES(NdbPack-t
                       PROPERTIES COMPILE_FLAGS "-DTEST_NDB_PACK")
 TARGET_LINK_LIBRARIES(NdbPack-t ndbgeneral ndbportlib)
 
+ADD_EXECUTABLE(HashMap2-t HashMap2.cpp)
+SET_TARGET_PROPERTIES(HashMap2-t
+                      PROPERTIES COMPILE_FLAGS "-DTEST_HASHMAP2")
+TARGET_LINK_LIBRARIES(HashMap2-t ndbgeneral)
+
+ADD_EXECUTABLE(LinkedStack-t LinkedStack.cpp)
+SET_TARGET_PROPERTIES(LinkedStack-t
+                      PROPERTIES COMPILE_FLAGS "-DTEST_LINKEDSTACK")
+TARGET_LINK_LIBRARIES(LinkedStack-t ndbgeneral)
\ No newline at end of file

=== added file 'storage/ndb/src/common/util/HashMap2.cpp'
--- a/storage/ndb/src/common/util/HashMap2.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/common/util/HashMap2.cpp	2011-09-06 10:23:37 +0000
@@ -0,0 +1,407 @@
+/* Copyright (C) 2009 Sun Microsystems Inc.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+
+#include <HashMap2.hpp>
+
+#ifdef TEST_HASHMAP2
+#include <NdbTap.hpp>
+
+struct TestHeapAllocator
+{
+  static const int DEBUG_ALLOC=0;
+  static void* alloc(void* ignore, size_t bytes)
+  {
+    void* p = ::malloc(bytes);
+    if (DEBUG_ALLOC)
+    {
+      printf("--Allocating %u bytes at %p\n",
+             (Uint32) bytes, p);
+    }
+    return p;
+  }
+
+  static void* calloc(void* ignore, size_t nelem, size_t bytes)
+  {
+    void* p = ::calloc(nelem, bytes);
+    if (DEBUG_ALLOC)
+    {
+      printf("--Allocating %u elements of %u bytes (%u total?) at %p\n",
+             (Uint32)nelem, (Uint32) bytes, (Uint32) (nelem*bytes), p);
+    }
+    return p;
+  }
+
+  static void free(void* ignore, void* mem)
+  {
+    if (DEBUG_ALLOC)
+    {
+      printf("--Freeing bytes at %p\n",mem);
+    }
+    ::free(mem);
+  }
+};
+
+struct IntIntKVPod
+{
+  int a;
+  int b;
+  IntIntKVPod* next;
+};
+
+struct IntIntKVStaticMethods
+{
+  static Uint32 hashValue(const IntIntKVPod* obj)
+  {
+    return obj->a*31;
+  }
+
+  static bool equal(const IntIntKVPod* objA, const IntIntKVPod* objB)
+  {
+    return objA->a == objB->a;
+  }
+
+  static void setNext(IntIntKVPod* from, IntIntKVPod* to)
+  {
+    from->next = to;
+  }
+
+  static IntIntKVPod* getNext(const IntIntKVPod* from)
+  {
+    return from->next;
+  }
+};
+
+struct IntIntKVObj
+{
+  int a;
+  int b;
+  IntIntKVObj* next;
+
+  Uint32 hashValue() const
+  {
+    return a*31;
+  }
+
+  bool equal(const IntIntKVObj* other) const
+  {
+    return a == other->a;
+  }
+
+  void setNext(IntIntKVObj* _next)
+  {
+    next = _next;
+  }
+
+  IntIntKVObj* getNext() const
+  {
+    return next;
+  }
+};
+
+TAPTEST(HashMap2)
+{
+  printf("int -> int (Static, unique) \n");
+  for (int j = 1; j < 150; j ++)
+  {
+    HashMap2<IntIntKVPod, true, TestHeapAllocator, IntIntKVStaticMethods> hash1;
+
+    hash1.setSize(j);
+
+    OK(hash1.getElementCount() == 0);
+
+    /* Create some values in a pool on the stack */
+    IntIntKVPod pool[101];
+
+    for (int i=0; i < 100; i++)
+    {
+      pool[i].a = i;
+      pool[i].b = 3 * i;
+      pool[i].next = NULL;
+    }
+
+    /* Add the pool elements to the hash table */
+    for (int i=0; i < 100; i++)
+    {
+      OK(hash1.add(&pool[i]));
+    }
+
+    /* Now attempt to add a duplicate */
+    pool[100].a = 0;
+    pool[100].b = 999;
+    pool[100].next = NULL;
+
+    OK(hash1.getElementCount() == 100);
+
+    OK(! hash1.add(&pool[100]));
+
+    for (int i=1; i < 100; i++)
+    {
+      OK(hash1.get(&pool[i]) == &pool[i]);
+    }
+
+    OK(hash1.get(&pool[0]) == &pool[0]);
+
+    /* Test iterator Api */
+    HashMap2<IntIntKVPod, true, TestHeapAllocator, IntIntKVStaticMethods>::Iterator it(hash1);
+
+    IntIntKVPod* kv;
+    for (int i=0; i < 2; i++)
+    {
+      int count = 0;
+      while ((kv = it.next()))
+      {
+        OK( kv->b == ((kv->a * 3) - i) );
+        kv->b--;
+        count++;
+      }
+      OK( count == 100 );
+      it.reset();
+    }
+
+    hash1.reset();
+    it.reset();
+    OK( it.next() == NULL );
+  }
+
+  printf("int -> int (Static, !unique) \n");
+  for (int j = 1; j < 150; j ++)
+  {
+    HashMap2<IntIntKVPod, false, TestHeapAllocator, IntIntKVStaticMethods> hash1;
+
+    hash1.setSize(j);
+
+    OK(hash1.getElementCount() == 0);
+
+    /* Create some values in a pool on the stack */
+    IntIntKVPod pool[101];
+
+    for (int i=0; i < 100; i++)
+    {
+      pool[i].a = i;
+      pool[i].b = 3 * i;
+      pool[i].next = NULL;
+    }
+
+    /* Add the pool elements to the hash table */
+    for (int i=0; i < 100; i++)
+    {
+      OK(hash1.add(&pool[i]));
+    }
+
+    /* Now attempt to add a duplicate */
+    pool[100].a = 0;
+    pool[100].b = 999;
+    pool[100].next = NULL;
+
+    OK(hash1.getElementCount() == 100);
+
+    OK(  hash1.add(&pool[100]));
+
+    for (int i=1; i < 100; i++)
+    {
+      OK(hash1.get(&pool[i]) == &pool[i]);
+    }
+
+    OK((hash1.get(&pool[0]) == &pool[0]) ||
+       (hash1.get(&pool[0]) == &pool[100]));
+  }
+
+  printf("int -> int (!Static, defaults, (std alloc, unique)) \n");
+  for (int j = 1; j < 150; j ++)
+  {
+    HashMap2<IntIntKVObj> hash1;
+
+    hash1.setSize(j);
+
+    OK(hash1.getElementCount() == 0);
+
+    /* Create some values in a pool on the stack */
+    IntIntKVObj pool[101];
+
+    for (int i=0; i < 100; i++)
+    {
+      pool[i].a = i;
+      pool[i].b = 3 * i;
+      pool[i].next = NULL;
+    }
+
+    /* Add the pool elements to the hash table */
+    for (int i=0; i < 100; i++)
+    {
+      OK(hash1.add(&pool[i]));
+    }
+
+    /* Now attempt to add a duplicate */
+    pool[100].a = 0;
+    pool[100].b = 999;
+    pool[100].next = NULL;
+
+    OK(hash1.getElementCount() == 100);
+
+    OK(! hash1.add(&pool[100]));
+
+    for (int i=1; i < 100; i++)
+    {
+      OK(hash1.get(&pool[i]) == &pool[i]);
+    }
+
+    OK(hash1.get(&pool[0]) == &pool[0]);
+  }
+
+  printf("int -> int (Static, unique, realloc) \n");
+  {
+    HashMap2<IntIntKVPod, true, TestHeapAllocator, IntIntKVStaticMethods> hash1;
+    for (int j = 1; j < 150; j ++)
+    {
+      hash1.setSize(150 - j);
+
+      OK(hash1.getElementCount() == 0);
+
+      /* Create some values in a pool on the stack */
+      IntIntKVPod pool[101];
+
+      for (int i=0; i < 100; i++)
+      {
+        pool[i].a = i;
+        pool[i].b = 3 * i;
+        pool[i].next = NULL;
+      }
+
+      /* Add the pool elements to the hash table */
+      for (int i=0; i < 100; i++)
+      {
+        OK(hash1.add(&pool[i]));
+      }
+
+      /* Now attempt to add a duplicate */
+      pool[100].a = 0;
+      pool[100].b = 999;
+      pool[100].next = NULL;
+
+      OK(hash1.getElementCount() == 100);
+
+      OK(! hash1.add(&pool[100]));
+
+      for (int i=1; i < 100; i++)
+      {
+        OK(hash1.get(&pool[i]) == &pool[i]);
+      }
+
+      OK(hash1.get(&pool[0]) == &pool[0]);
+
+      OK(!hash1.setSize(j+1));
+
+      hash1.reset();
+    }
+  }
+
+  printf("int -> int (Static, unique, realloc, remove) \n");
+  {
+    HashMap2<IntIntKVPod, true, TestHeapAllocator, IntIntKVStaticMethods> hash1;
+    for (int j = 1; j < 150; j ++)
+    {
+      hash1.setSize(j);
+
+      OK(hash1.getElementCount() == 0);
+
+      /* Create some values in a pool on the stack */
+      IntIntKVPod pool[101];
+
+      for (int i=0; i < 100; i++)
+      {
+        pool[i].a = i;
+        pool[i].b = 3 * i;
+        pool[i].next = NULL;
+      }
+
+      /* Add the pool elements to the hash table */
+      for (int i=0; i < 100; i++)
+      {
+        OK(hash1.add(&pool[i]));
+      }
+
+      /* Now attempt to add a duplicate */
+      pool[100].a = 0;
+      pool[100].b = 999;
+      pool[100].next = NULL;
+
+      OK(hash1.getElementCount() == 100);
+
+      OK(!hash1.add(&pool[100]));
+
+      for (int i=1; i < 100; i++)
+      {
+        OK(hash1.get(&pool[i]) == &pool[i]);
+      }
+
+      OK((hash1.get(&pool[0]) == &pool[0]) ||
+         (hash1.get(&pool[0]) == &pool[100]));
+
+      OK(!hash1.setSize(j+1));
+
+      /* Now replace elements with different ones */
+      IntIntKVPod pool2[100];
+      for (int i=0; i < 100; i++)
+      {
+        pool2[i].a = i;
+        pool2[i].b = 4 * i;
+        pool2[i].next = NULL;
+      }
+
+      for (int k=0; k < 4; k++)
+      {
+        for (int i=0; i< 100; i++)
+        {
+          if ((i % 4) == k)
+          {
+            OK(hash1.remove(&pool[i]) == &pool[i]);
+          }
+        }
+
+        OK(hash1.getElementCount() == 75);
+
+        for (int i=0; i< 100; i++)
+        {
+          if ((i % 4) == k)
+          {
+            OK(!hash1.get(&pool[i]));
+          }
+        }
+
+        for (int i=0; i< 100; i++)
+        {
+          if ((i % 4) == k)
+          {
+            OK(hash1.add(&pool2[i]));
+          }
+        }
+        OK(hash1.getElementCount() == 100);
+      }
+
+      for (int i=0; i< 100; i++)
+      {
+        OK(hash1.get(&pool2[i]) == &pool2[i]);
+      }
+
+      hash1.reset();
+    }
+  }
+
+  return 1; // OK
+}
+
+#endif
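
The hashValue/equal/setNext/getNext hooks exercised above reflect HashMap2's intrusive design: each element carries its own link pointer, so the map holds only a bucket array and performs no per-element allocation on add(). A self-contained sketch of that chaining idea (illustrative only, not the actual HashMap2 internals):

  #include <cassert>
  #include <cstdio>

  struct Node
  {
    int key;
    Node* next;                  /* intrusive link, like IntIntKVPod::next */
  };

  struct TinyIntrusiveMap
  {
    static const unsigned NBUCKETS = 8;
    Node* buckets[NBUCKETS];

    TinyIntrusiveMap()
    { for (unsigned i = 0; i < NBUCKETS; i++) buckets[i] = 0; }

    static unsigned hash(int k) { return (unsigned)(k * 31) % NBUCKETS; }

    bool add(Node* n)            /* unique-key add, as in the unique tests */
    {
      Node** head = &buckets[hash(n->key)];
      for (Node* p = *head; p; p = p->next)
        if (p->key == n->key)
          return false;          /* duplicate -> rejected */
      n->next = *head;           /* element supplies its own link storage */
      *head = n;
      return true;
    }

    Node* get(int k) const
    {
      for (Node* p = buckets[hash(k)]; p; p = p->next)
        if (p->key == k)
          return p;
      return 0;
    }
  };

  int main()
  {
    Node a = {1, 0};
    Node b = {9, 0};             /* 1 and 9 collide when NBUCKETS == 8 */
    TinyIntrusiveMap m;
    assert(m.add(&a) && m.add(&b));
    assert(m.get(1) == &a && m.get(9) == &b);
    assert(!m.add(&a));          /* duplicate add rejected */
    printf("ok\n");
    return 0;
  }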

=== added file 'storage/ndb/src/common/util/LinkedStack.cpp'
--- a/storage/ndb/src/common/util/LinkedStack.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/common/util/LinkedStack.cpp	2011-09-06 10:23:37 +0000
@@ -0,0 +1,124 @@
+/* Copyright (C) 2009 Sun Microsystems Inc.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+
+#include <LinkedStack.hpp>
+
+#ifdef TEST_LINKEDSTACK
+#include <NdbTap.hpp>
+
+struct TestHeapAllocator
+{
+  static const int DEBUG_ALLOC=0;
+  static void* alloc(void* ignore, size_t bytes)
+  {
+    void* p = ::malloc(bytes);
+    if (DEBUG_ALLOC)
+    {
+      printf("--Allocating %u bytes at %p\n",
+             (Uint32) bytes, p);
+    }
+    return p;
+  }
+
+  static void* calloc(void* ignore, size_t nelem, size_t bytes)
+  {
+    void* p = ::calloc(nelem, bytes);
+    if (DEBUG_ALLOC)
+    {
+      printf("--Allocating %u elements of %u bytes (%u bytes?) at %p\n",
+             (Uint32) nelem, (Uint32) bytes, (Uint32) (nelem * bytes), p);
+    }
+    return p;
+  }
+
+  static void free(void* ignore, void* mem)
+  {
+    if (DEBUG_ALLOC)
+    {
+      printf("--Freeing bytes at %p\n",mem);
+    }
+    ::free(mem);
+  }
+};
+
+TAPTEST(LinkedStack)
+{
+  Uint32 popped;
+  Uint32 blockSize = 1;
+
+  for (Uint32 b=0; b < 10; b++)
+  {
+    LinkedStack<Uint32, TestHeapAllocator> testStack(blockSize);
+
+    for (Uint32 p=0; p<4; p++)
+    {
+      /* Pass 0 == alloc, Pass 1 == re-use, Pass 2 == release, Pass 3 == reset */
+      printf("LinkedStack block size %u, pass %u\n", blockSize, p);
+      Uint32 stackSize = 2033 * (p+1);
+
+      OK(testStack.size() == 0);
+      printf("  Pushing %u elements\n", stackSize);
+      for (Uint32 i=0; i < stackSize; i++)
+      {
+        /* Push items onto the stack */
+        OK(testStack.push(i) == true);
+        OK(testStack.size() == i+1);
+        OK(testStack.pop(popped) == true);
+        OK(popped == i);
+        OK(testStack.size() == i);
+        OK(testStack.push(i) == true);
+      }
+
+      switch(p)
+      {
+      case 0:
+      case 1:
+      {
+        printf("  Popping %u elements\n", stackSize);
+        for (Uint32 i=0; i < stackSize; i++)
+        {
+          /* Pop items off the stack */
+          OK(testStack.size() == stackSize - i);
+          OK(testStack.pop(popped) == true);
+          OK(popped == stackSize - (i+1));
+        }
+        break;
+      }
+      case 2:
+      {
+        printf("  Releasing stack\n");
+        testStack.release();
+        break;
+      }
+      case 3:
+      {
+        printf("  Resetting stack\n");
+        testStack.reset();
+        break;
+      }
+      }
+
+      OK(testStack.size() == 0);
+      OK(testStack.pop(popped) == false);
+    }
+    printf("  Destructing stack\n");
+    blockSize = (blockSize * 2)+1;
+  }
+
+  return 1; // OK
+}
+
+#endif

=== modified file 'storage/ndb/src/common/util/Makefile.am'
--- a/storage/ndb/src/common/util/Makefile.am	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/common/util/Makefile.am	2011-09-06 10:23:37 +0000
@@ -35,7 +35,7 @@ INCLUDES_LOC = @ZLIB_INCLUDES@
 libndbazio_la_SOURCES = ndbzio.c
 libndbazio_la_LIBADD = @ZLIB_LIBS@
 
-noinst_PROGRAMS = BaseString-t HashMap-t Bitmask-t SparseBitmask-t ndb_version-t NdbPack-t
+noinst_PROGRAMS = BaseString-t HashMap-t Bitmask-t SparseBitmask-t ndb_version-t NdbPack-t HashMap2-t LinkedStack-t
 
 BaseString_t_SOURCES = BaseString.cpp
 BaseString_t_CXXFLAGS = -DTEST_BASE_STRING
@@ -83,5 +83,21 @@ NdbPack_t_LDADD = \
 	$(top_builddir)/strings/libmystrings.la \
 	$(top_builddir)/dbug/libdbug.la
 
+HashMap2_t_SOURCES = HashMap2.cpp
+HashMap2_t_CXXFLAGS = -DTEST_HASHMAP2
+HashMap2_t_LDADD = \
+	libgeneral.la \
+	$(top_builddir)/mysys/libmysyslt.la \
+	$(top_builddir)/dbug/libdbuglt.la \
+	$(top_builddir)/strings/libmystringslt.la
+
+LinkedStack_t_SOURCES = LinkedStack.cpp
+LinkedStack_t_CXXFLAGS = -DTEST_LINKEDSTACK
+LinkedStack_t_LDADD = \
+	libgeneral.la \
+	$(top_builddir)/mysys/libmysyslt.la \
+	$(top_builddir)/dbug/libdbuglt.la \
+	$(top_builddir)/strings/libmystringslt.la
+
 include $(top_srcdir)/storage/ndb/config/common.mk.am
 include $(top_srcdir)/storage/ndb/config/type_util.mk.am

=== modified file 'storage/ndb/src/ndbapi/NdbTransaction.cpp'
--- a/storage/ndb/src/ndbapi/NdbTransaction.cpp	2011-07-04 16:30:34 +0000
+++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp	2011-09-06 10:23:37 +0000
@@ -290,7 +290,8 @@ NdbTransaction::execute(ExecType aTypeOf
 			NdbOperation::AbortOption abortOption,
 			int forceSend)
 {
-  NdbError savedError= theError;
+  NdbError existingTransError = theError;
+  NdbError firstTransError;
   DBUG_ENTER("NdbTransaction::execute");
   DBUG_PRINT("enter", ("aTypeOfExec: %d, abortOption: %d", 
 		       aTypeOfExec, abortOption));
@@ -374,8 +375,8 @@ NdbTransaction::execute(ExecType aTypeOf
           if (tBlob->preExecute(tExecType, batch) == -1)
 	  {
             ret = -1;
-	    if(savedError.code==0)
-	      savedError= theError;
+	    if (firstTransError.code==0)
+	      firstTransError= theError;
 	  }
           tBlob = tBlob->theNext;
         }
@@ -413,8 +414,8 @@ NdbTransaction::execute(ExecType aTypeOf
             if (tBlob->preCommit() == -1)
 	    {
 	      ret = -1;
-	      if(savedError.code==0)
-		savedError= theError;
+	      if (firstTransError.code==0)
+		firstTransError= theError;
 	    }
             tBlob = tBlob->theNext;
           }
@@ -440,8 +441,6 @@ NdbTransaction::execute(ExecType aTypeOf
 		       NdbOperation::DefaultAbortOption,
 		       forceSend) == -1)
     {
-      if(savedError.code==0)
-	savedError= theError;
       /**
        * We abort the execute here. But we still need to put the split-off
        * operation list back into the transaction object, or we will get a
@@ -463,9 +462,13 @@ NdbTransaction::execute(ExecType aTypeOf
           theCompletedLastOp = tCompletedLastOp;
       }
 
+      /* executeNoBlobs will have set transaction error */
       DBUG_RETURN(-1);
     }
 
+    /* Capture any trans error left by the execute() in case it gets trampled */
+    if (firstTransError.code==0)
+      firstTransError= theError;
 
 #ifdef ndb_api_crash_on_complex_blob_abort
     assert(theFirstOpInList == NULL && theLastOpInList == NULL);
@@ -483,8 +486,8 @@ NdbTransaction::execute(ExecType aTypeOf
             if (tBlob->postExecute(tExecType) == -1)
 	    {
               ret = -1;
-	      if(savedError.code==0)
-		savedError= theError;
+	      if (firstTransError.code==0)
+		firstTransError= theError;
 	    }
             tBlob = tBlob->theNext;
           }
@@ -520,8 +523,37 @@ NdbTransaction::execute(ExecType aTypeOf
   }
 #endif
 
-  if(savedError.code!=0 && theError.code==4350) // Trans already aborted
-      theError= savedError;
+  /* Sometimes the original error is trampled by 'Trans already aborted';
+   * detect this case and attempt to restore the original error.
+   */
+  if (theError.code == 4350) // Trans already aborted
+  {
+    DBUG_PRINT("info", ("Trans already aborted, existingTransError.code %u, "
+                        "firstTransError.code %u",
+                        existingTransError.code,
+                        firstTransError.code));
+    if (existingTransError.code != 0)
+    {
+      theError = existingTransError;
+    }
+    else if (firstTransError.code != 0)
+    {
+      theError = firstTransError;
+    }
+  }
+
+  /* Generally return the first error encountered as the trans
+   * error.  The caller can traverse the op list to get the
+   * full picture.
+   */
+  if (firstTransError.code != 0)
+  {
+    DBUG_PRINT("info", ("Setting error to first error.  firstTransError.code = %u, "
+                        "theError.code = %u",
+                        firstTransError.code,
+                        theError.code));
+    theError = firstTransError;
+  }
 
   DBUG_RETURN(ret);
 }
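
From the API user's side, the point of preserving the first error is that callers executing batches with AO_IgnoreError can still read the interesting error code off the transaction afterwards, even when blob handling triggers further internal execute() rounds. A sketch of the caller pattern this enables (modelled on bugtest_12345 below; the surrounding conflict-detection logic is only alluded to here):

  /* failOp was defined with AO_IgnoreError and will hit error 626 */
  if (trans->execute(NdbTransaction::NoCommit) == 0)
  {
    /* execute() succeeded, but an op may have failed with an ignored error */
    if (trans->getNdbError().code == 626)   /* tuple did not exist */
    {
      /* e.g. MySQLD conflict detection relies on seeing 626 here,
         even when a blob operation shared the batch */
    }
  }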

=== modified file 'storage/ndb/test/ndbapi/testBlobs.cpp'
--- a/storage/ndb/test/ndbapi/testBlobs.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/test/ndbapi/testBlobs.cpp	2011-09-06 10:23:37 +0000
@@ -184,6 +184,7 @@ printusage()
     << "  -bug 27370  Potential inconsistent blob reads for ReadCommitted reads" << endl
     << "  -bug 36756  Handling execute(.., abortOption) and Blobs " << endl
     << "  -bug 45768  execute(Commit) after failing blob batch " << endl
+    << "  -bug 12345  Blob obscures ignored error codes in batch" << endl
     ;
 }
 
@@ -3860,6 +3861,124 @@ static int bugtest_48040()
 }
 
 
+static int bugtest_12345()
+{
+  /* Having a Blob operation in a batch with other operations
+   * causes the other operation's ignored error not to be
+   * set as the transaction error code after execution.
+   * This is used (e.g. in MySQLD) to check for conflicts.
+   */
+  DBG("bugtest_12345 : Error code from other ops in batch obscured");
+
+  /*
+     1) Setup table : 1 row exists, another doesnt
+     2) Start transaction
+     3) Define failing before op
+     4) Define Blob op with/without post-exec part
+     5) Define failing after op
+     6) Execute
+     7) Check results
+  */
+  calcTups(true);
+
+  /* Setup table */
+  Tup& tupExists = g_tups[0];
+  Tup& notExists = g_tups[1];
+  {
+    CHK((g_con= g_ndb->startTransaction()) != 0);
+    CHK((g_opr= g_con->getNdbOperation(g_opt.m_tname)) != 0);
+    CHK(g_opr->insertTuple() == 0);
+    CHK(g_opr->equal("PK1", tupExists.m_pk1) == 0);
+    if (g_opt.m_pk2chr.m_len != 0)
+    {
+      CHK(g_opr->equal("PK2", tupExists.m_pk2) == 0);
+      CHK(g_opr->equal("PK3", tupExists.m_pk3) == 0);
+    }
+    setUDpartId(tupExists, g_opr);
+    CHK(getBlobHandles(g_opr) == 0);
+
+    CHK(setBlobValue(tupExists) == 0);
+
+    CHK(g_con->execute(Commit) == 0);
+    g_con->close();
+  }
+
+  for (int scenario = 0; scenario < 4; scenario++)
+  {
+    DBG(" Scenario : " << scenario);
+    CHK((g_con= g_ndb->startTransaction()) != 0);
+    NdbOperation* failOp = NULL;
+    if ((scenario & 0x1) == 0)
+    {
+      DBG("  Fail op before");
+      /* Define failing op in batch before Blob op */
+      failOp= g_con->getNdbOperation(g_opt.m_tname);
+      CHK(failOp != 0);
+      CHK(failOp->readTuple() == 0);
+      CHK(failOp->equal("PK1", notExists.m_pk1) == 0);
+      if (g_opt.m_pk2chr.m_len != 0)
+      {
+        CHK(failOp->equal("PK2", notExists.m_pk2) == 0);
+        CHK(failOp->equal("PK3", notExists.m_pk3) == 0);
+      }
+      setUDpartId(notExists, failOp);
+      CHK(failOp->getValue("PK1") != 0);
+      CHK(failOp->setAbortOption(NdbOperation::AO_IgnoreError) == 0);
+    }
+
+    /* Now define successful Blob op */
+    CHK((g_opr= g_con->getNdbOperation(g_opt.m_tname)) != 0);
+    CHK(g_opr->readTuple() == 0);
+    CHK(g_opr->equal("PK1", tupExists.m_pk1) == 0);
+    if (g_opt.m_pk2chr.m_len != 0)
+    {
+      CHK(g_opr->equal("PK2", tupExists.m_pk2) == 0);
+      CHK(g_opr->equal("PK3", tupExists.m_pk3) == 0);
+    }
+    setUDpartId(tupExists, g_opr);
+    CHK(getBlobHandles(g_opr) == 0);
+
+    CHK(getBlobValue(tupExists) == 0);
+
+
+    /* Define failing batch op after Blob op if not defined before */
+    if (failOp == 0)
+    {
+      DBG("  Fail op after");
+      failOp= g_con->getNdbOperation(g_opt.m_tname);
+      CHK(failOp != 0);
+      CHK(failOp->readTuple() == 0);
+      CHK(failOp->equal("PK1", notExists.m_pk1) == 0);
+      if (g_opt.m_pk2chr.m_len != 0)
+      {
+        CHK(failOp->equal("PK2", notExists.m_pk2) == 0);
+        CHK(failOp->equal("PK3", notExists.m_pk3) == 0);
+      }
+      setUDpartId(notExists, failOp);
+      CHK(failOp->getValue("PK1") != 0);
+      CHK(failOp->setAbortOption(NdbOperation::AO_IgnoreError) == 0);
+    }
+
+    /* Now execute and check rc etc */
+    NdbTransaction::ExecType et = (scenario & 0x2) ?
+      NdbTransaction::NoCommit:
+      NdbTransaction::Commit;
+
+    DBG("  Executing with execType = " << ((et == NdbTransaction::NoCommit)?
+                                           "NoCommit":"Commit"));
+    int rc = g_con->execute(et);
+
+    CHK(rc == 0);
+    CHK(g_con->getNdbError().code == 626);
+    CHK(failOp->getNdbError().code == 626);
+    CHK(g_opr->getNdbError().code == 0);
+    DBG("  Error code on transaction as expected");
+
+    g_con->close();
+  }
+
+  return 0;
+}
 
 // main
 
@@ -4824,7 +4943,8 @@ static struct {
   { 36756, bugtest_36756 },
   { 45768, bugtest_45768 },
   { 48040, bugtest_48040 },
-  { 28116, bugtest_28116 }
+  { 28116, bugtest_28116 },
+  { 12345, bugtest_12345 }
 };
 
 NDB_COMMAND(testOdbcDriver, "testBlobs", "testBlobs", "testBlobs", 65535)
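
For clarity, the four scenarios that bugtest_12345 iterates decode as below (a small sketch of the bit tests used in the function above):

  for (int scenario = 0; scenario < 4; scenario++)
  {
    bool failBefore = ((scenario & 0x1) == 0); /* fail op before vs after blob op */
    bool noCommit   = ((scenario & 0x2) != 0); /* execute NoCommit vs Commit */
    printf("scenario %d : fail op %s, execType %s\n",
           scenario,
           failBefore ? "before" : "after",
           noCommit ? "NoCommit" : "Commit");
  }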

=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt	2011-06-28 08:47:18 +0000
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt	2011-09-06 10:23:37 +0000
@@ -1740,3 +1740,7 @@ max-time: 300
 cmd: testIndexStat
 args:
 
+max-time: 300
+cmd: testBlobs
+args: -bug 12345 -skip p
+

No bundle (reason: useless for push emails).