List:Commits« Previous MessageNext Message »
From:Pekka Nousiainen Date:June 16 2011 6:59pm
Subject:bzr push into mysql-5.1-telco-7.0-wl4124-new1 branch (pekka.nousiainen:4402
to 4403)
View as plain text  
 4403 Pekka Nousiainen	2011-06-16 [merge]
      merge main to wl4124-new1

    added:
      mysql-test/suite/ndb_rpl/ndb_master-slave_2ch_end.inc
    modified:
      mysql-test/suite/ndb/r/ndb_basic.result
      mysql-test/suite/ndb_rpl/my.cnf
      mysql-test/suite/ndb_rpl/ndb_master-slave_2ch.inc
      mysql-test/suite/ndb_rpl/r/ndb_rpl_circular_2ch.result
      mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max.result
      mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max_delete_win.result
      mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_old.result
      mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result
      mysql-test/suite/ndb_rpl/t/ndb_conflict_info.inc
      mysql-test/suite/ndb_rpl/t/ndb_conflict_info_init.inc
      mysql-test/suite/ndb_rpl/t/ndb_rpl_break_3_chain.cnf
      mysql-test/suite/ndb_rpl/t/ndb_rpl_circular_2ch.cnf
      mysql-test/suite/ndb_rpl/t/ndb_rpl_circular_2ch.test
      mysql-test/suite/ndb_rpl/t/ndb_rpl_multi_binlog_update.cnf
      mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test
      mysql-test/suite/ndb_team/my.cnf
      mysql-test/suite/rpl_ndb/my.cnf
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_binlog.h
      sql/sql_class.cc
      sql/sql_class.h
      storage/ndb/include/mgmapi/mgmapi_config_parameters.h
      storage/ndb/src/common/portlib/CMakeLists.txt
      storage/ndb/src/common/util/ndbzio.c
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
      storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp
      storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
      storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
      storage/ndb/src/kernel/vm/NdbSeqLock.hpp
      storage/ndb/src/mgmsrv/ConfigInfo.cpp
      storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
      storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
      storage/ndb/src/ndbapi/NdbQueryBuilder.cpp
      storage/ndb/src/ndbapi/NdbQueryBuilder.hpp
      storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/ndberror.c
      storage/ndb/test/include/HugoQueryBuilder.hpp
      storage/ndb/test/ndbapi/testSystemRestart.cpp
      storage/ndb/test/run-test/autotest-boot.sh
      storage/ndb/test/src/HugoQueryBuilder.cpp
      storage/ndb/test/src/NdbBackup.cpp
 4402 Pekka Nousiainen	2011-06-16
      wl#4124 x10_fix.diff
      build fixes

    modified:
      sql/ha_ndb_index_stat.cc
=== modified file 'mysql-test/suite/ndb/r/ndb_basic.result'
--- a/mysql-test/suite/ndb/r/ndb_basic.result	2011-06-15 10:37:56 +0000
+++ b/mysql-test/suite/ndb/r/ndb_basic.result	2011-06-16 18:16:01 +0000
@@ -66,6 +66,7 @@ Ndb_cluster_node_id	#
 Ndb_config_from_host	#
 Ndb_config_from_port	#
 Ndb_conflict_fn_max	#
+Ndb_conflict_fn_max_del_win	#
 Ndb_conflict_fn_old	#
 Ndb_connect_count	#
 Ndb_execute_count	#

=== modified file 'mysql-test/suite/ndb_rpl/my.cnf'
--- a/mysql-test/suite/ndb_rpl/my.cnf	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/my.cnf	2011-06-08 19:25:29 +0000
@@ -60,7 +60,6 @@ relay-log=                    slave-rela
 # Cluster only supports row format
 binlog-format=                 row
 
-init-rpl-role=                slave
 log-slave-updates
 master-retry-count=           10
 
@@ -83,8 +82,6 @@ skip-slave-start
 # test results will vary, thus a relative path is used.
 slave-load-tmpdir=            ../../../tmp
 
-rpl-recovery-rank=            @mysqld.1.slave.server-id
-
 [ENV]
 NDB_CONNECTSTRING=            @mysql_cluster.1.ndb_connectstring
 MASTER_MYPORT=                @mysqld.1.1.port

=== modified file 'mysql-test/suite/ndb_rpl/ndb_master-slave_2ch.inc'
--- a/mysql-test/suite/ndb_rpl/ndb_master-slave_2ch.inc	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/ndb_master-slave_2ch.inc	2011-06-16 12:36:28 +0000
@@ -3,10 +3,11 @@
 # Set up circular cluster replication where each 
 # cluster has two mysqlds and replication directions are 
 # following:
+#              1       2
 #          master ---> slave  
 #           /            \
 #     cluster A        cluster B
-#           \            /
+#           \  3       4 /
 #         master1 <--- slave1
 #
 # ==== Usage ====
@@ -26,8 +27,9 @@
 #   $rpl_skip_start_slave, $rpl_debug, $slave_timeout
 #     See include/master-slave.inc
 
+#--let $rpl_debug= 1
 --let $rpl_topology= 1->2,4->3
---let $rpl_skip_check_server_ids= 1
+--let $rpl_skip_start_slave= 1
 --source include/rpl_init.inc
 
 # Make connections to mysqlds
@@ -37,7 +39,7 @@
 --source include/rpl_connect.inc
 
 --let $rpl_connection_name= master1
---let $rpl_server_number= 1
+--let $rpl_server_number= 3
 --source include/rpl_connect.inc
 
 --let $rpl_connection_name= slave
@@ -45,9 +47,23 @@
 --source include/rpl_connect.inc
 
 --let $rpl_connection_name= slave1
---let $rpl_server_number= 2
+--let $rpl_server_number= 4
 --source include/rpl_connect.inc
 
+# Now add IGNORE_SERVER_IDS
+--disable_query_log
+connection master;
+CHANGE MASTER TO IGNORE_SERVER_IDS= (1,3);
+connection master1;
+CHANGE MASTER TO IGNORE_SERVER_IDS= (1,3);
+connection slave;
+CHANGE MASTER TO IGNORE_SERVER_IDS= (2,4);
+connection slave1;
+CHANGE MASTER TO IGNORE_SERVER_IDS= (2,4);
+
+# Now start replication
+--source include/rpl_start_slaves.inc
+--enable_query_log
 
 # Check that all mysqld are compiled with ndb support
 --let $_rpl_server= 4

=== added file 'mysql-test/suite/ndb_rpl/ndb_master-slave_2ch_end.inc'
--- a/mysql-test/suite/ndb_rpl/ndb_master-slave_2ch_end.inc	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/ndb_master-slave_2ch_end.inc	2011-06-16 12:36:28 +0000
@@ -0,0 +1,28 @@
+# ==== Purpose ====
+#
+# Clean up replication configuration after using a 2ch
+# setup.
+# We need to explicitly reset the IGNORE_SERVER_IDS parameters
+# on all Servers to avoid testcase check errors.
+#
+# ==== Usage ====
+#
+#   [--let $rpl_debug= 1]
+#   --source suite/ndb_rpl/ndb_master-slave_2ch_end.inc
+#
+# Parameters:
+#   $rpl_debug
+#     See include/master-slave.inc
+
+--source include/rpl_stop_slaves.inc
+--connection master
+CHANGE MASTER TO IGNORE_SERVER_IDS= ();
+--connection master1
+CHANGE MASTER TO IGNORE_SERVER_IDS= ();
+--connection slave
+CHANGE MASTER TO IGNORE_SERVER_IDS= ();
+--connection slave1
+CHANGE MASTER TO IGNORE_SERVER_IDS= ();
+--source include/rpl_start_slaves.inc
+
+--source include/rpl_end.inc
\ No newline at end of file

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_circular_2ch.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_circular_2ch.result	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_circular_2ch.result	2011-06-16 12:36:28 +0000
@@ -3,6 +3,7 @@ include/rpl_connect.inc [creating master
 include/rpl_connect.inc [creating master1]
 include/rpl_connect.inc [creating slave]
 include/rpl_connect.inc [creating slave1]
+include/rpl_start_slaves.inc
 
 *** Check server_id of mysqld servers ***
 SHOW VARIABLES LIKE "server_id";
@@ -12,7 +13,7 @@ SET auto_increment_offset = 1;
 SET auto_increment_increment = 2;
 SHOW VARIABLES LIKE "server_id";
 Variable_name	Value
-server_id	1
+server_id	3
 SET auto_increment_offset = 1;
 SET auto_increment_increment = 2;
 SHOW VARIABLES LIKE "server_id";
@@ -22,7 +23,7 @@ SET auto_increment_offset = 2;
 SET auto_increment_increment = 2;
 SHOW VARIABLES LIKE "server_id";
 Variable_name	Value
-server_id	2
+server_id	4
 SET auto_increment_offset = 2;
 SET auto_increment_increment = 2;
 
@@ -48,4 +49,10 @@ Check data on both clusters
 include/diff_tables.inc [master:t1, slave:t1]
 DROP TABLE IF EXISTS t1;
 
+include/rpl_stop_slaves.inc
+CHANGE MASTER TO IGNORE_SERVER_IDS= ();
+CHANGE MASTER TO IGNORE_SERVER_IDS= ();
+CHANGE MASTER TO IGNORE_SERVER_IDS= ();
+CHANGE MASTER TO IGNORE_SERVER_IDS= ();
+include/rpl_start_slaves.inc
 include/rpl_end.inc

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max.result	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max.result	2011-06-16 14:34:56 +0000
@@ -41,6 +41,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -67,6 +70,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -87,6 +93,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -114,6 +123,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -162,10 +174,19 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check update some data that causes conflicts
@@ -198,10 +219,19 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check higer timestamp
@@ -259,6 +289,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -285,6 +318,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -305,6 +341,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -332,6 +371,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -380,10 +422,19 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check update some data that causes conflicts
@@ -416,10 +467,19 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check higer timestamp
@@ -473,10 +533,13 @@ a	b	c	d
 3	Master t2 c=2	2	123
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-0
+3
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-3
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -505,10 +568,13 @@ a	b	c	d
 3	Master t2 a=3 at c=3	3	123
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-0
+4
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-4
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -533,10 +599,13 @@ select * from t2 order by a, d;
 a	b	c	d
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-0
+4
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-4
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -572,6 +641,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -616,10 +688,13 @@ commit;
 *** slave - check conflict info, there should be some
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-0
+3
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-3
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	5	#	#
@@ -658,10 +733,13 @@ commit;
 *** slave - check conflict info, change depends on calling test
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-0
+6
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-6
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	5	#	#
@@ -733,6 +811,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -759,6 +840,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -779,6 +863,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -806,6 +893,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -850,15 +940,24 @@ commit;
 *** slave - check conflict info, there should be some
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+4
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-1
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
+2	1	#	4	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 2	1	#	#	4	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
@@ -889,15 +988,24 @@ commit;
 *** slave - check conflict info, change depends on calling test
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+4
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-1
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
+2	1	#	4	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 2	1	#	#	4	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max_delete_win.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max_delete_win.result	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max_delete_win.result	2011-06-16 14:34:56 +0000
@@ -41,6 +41,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -67,6 +70,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -87,6 +93,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -114,6 +123,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -158,14 +170,23 @@ commit;
 *** slave - check conflict info, there should be some
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check update some data that causes conflicts
@@ -194,14 +215,23 @@ commit;
 *** slave - check conflict info, change depends on calling test
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check higer timestamp
@@ -259,6 +289,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -285,6 +318,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -305,6 +341,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -332,6 +371,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -376,14 +418,23 @@ commit;
 *** slave - check conflict info, there should be some
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check update some data that causes conflicts
@@ -412,14 +463,23 @@ commit;
 *** slave - check conflict info, change depends on calling test
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check higer timestamp
@@ -476,6 +536,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
 3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
@@ -508,6 +571,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
 4
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
@@ -536,6 +602,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
 4
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
@@ -572,6 +641,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -619,6 +691,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
 3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
@@ -661,6 +736,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
 6
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
@@ -733,6 +811,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -759,6 +840,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -779,6 +863,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -806,6 +893,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -850,14 +940,23 @@ commit;
 *** slave - check conflict info, there should be some
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check update some data that causes conflicts
@@ -886,14 +985,23 @@ commit;
 *** slave - check conflict info, change depends on calling test
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check higer timestamp

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_old.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_old.result	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_old.result	2011-06-16 14:34:56 +0000
@@ -41,6 +41,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -67,6 +70,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -87,6 +93,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -114,6 +123,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -162,6 +174,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 3
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -204,6 +219,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 5
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -275,6 +293,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -301,6 +322,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -321,6 +345,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -348,6 +375,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -396,6 +426,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 3
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -438,6 +471,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 5
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -509,6 +545,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 3
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -541,6 +580,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 4
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -569,6 +611,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 4
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -604,6 +649,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -652,6 +700,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 3
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	5	#	#
@@ -694,6 +745,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 6
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	5	#	#
@@ -765,6 +819,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -791,6 +848,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -811,6 +871,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -838,6 +901,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -886,6 +952,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 4
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -931,6 +1000,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 6
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result	2011-06-16 14:34:56 +0000
@@ -6,9 +6,11 @@ Error	1626	Bad schema for mysql.ndb_repl
 drop table t1;
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$X(X)");
 create table t1 (a int key, X int) engine ndb;
-Warnings:
+ERROR HY000: Can't create table 'test.t1' (errno: 1627)
+show warnings;
+Level	Code	Message
 Error	1627	Error in parsing conflict function. Message: NDB$X(X), unknown conflict resolution function at 'NDB$X(X)'
-drop table t1;
+Error	1005	Can't create table 'test.t1' (errno: 1627)
 delete from mysql.ndb_replication;
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)");
 create table t1 (a int key, X int) engine ndb;
@@ -18,13 +20,17 @@ drop table t1;
 delete from mysql.ndb_replication;
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()");
 create table t1 (a int key, X int) engine ndb;
-Warnings:
+ERROR HY000: Can't create table 'test.t1' (errno: 1627)
+show warnings;
+Level	Code	Message
 Error	1627	Error in parsing conflict function. Message: NDB$MAX(), missing function argument at ')'
-drop table t1;
+Error	1005	Can't create table 'test.t1' (errno: 1627)
 delete from mysql.ndb_replication;
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X Y)");
 create table t1 (a int key, X int) engine ndb;
-Warnings:
+ERROR HY000: Can't create table 'test.t1' (errno: 1627)
+show warnings;
+Level	Code	Message
 Error	1627	Error in parsing conflict function. Message: NDB$MAX(X Y), missing ')' at 'Y)'
-drop table t1;
+Error	1005	Can't create table 'test.t1' (errno: 1627)
 delete from mysql.ndb_replication;

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_conflict_info.inc'
--- a/mysql-test/suite/ndb_rpl/t/ndb_conflict_info.inc	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_conflict_info.inc	2011-06-16 14:34:56 +0000
@@ -1,5 +1,6 @@
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
 --replace_column 3 # 5 # 6 #
 --error 0,1146
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_conflict_info_init.inc'
--- a/mysql-test/suite/ndb_rpl/t/ndb_conflict_info_init.inc	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_conflict_info_init.inc	2011-06-16 14:34:56 +0000
@@ -2,6 +2,7 @@
 --disable_result_log
 SELECT @init_ndb_conflict_fn_max:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 SELECT @init_ndb_conflict_fn_old:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
+SELECT @init_ndb_conflict_fn_max_del_win:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
 --error 0,1146
 DELETE FROM `t1$EX`;
 --enable_query_log

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_break_3_chain.cnf'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_break_3_chain.cnf	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_break_3_chain.cnf	2011-06-08 19:25:29 +0000
@@ -66,8 +66,6 @@ default-storage-engine=myisam
 # test results will vary, thus a relative path is used.
 slave-load-tmpdir=            ../../../tmp
 
-rpl-recovery-rank=            @mysqld.1.cluster2.server-id
-
 [mysqld.1.cluster3]
 log-bin=                      cluster3-bin
 relay-log=                    cluster3-relay-bin
@@ -83,8 +81,6 @@ default-storage-engine=myisam
 # test results will vary, thus a relative path is used.
 slave-load-tmpdir=            ../../../tmp
 
-rpl-recovery-rank=            @mysqld.1.cluster3.server-id
-
 [ENV]
 SERVER_MYPORT_1=              @mysqld.1.cluster1.port
 SERVER_MYPORT_2=              @mysqld.1.cluster2.port

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_circular_2ch.cnf'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_circular_2ch.cnf	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_circular_2ch.cnf	2011-06-16 12:36:28 +0000
@@ -2,34 +2,39 @@
 
 # 2 clusters, each with 2 MySQLDs
 # All MySQLDs log-slave-updates
-# Potential infinite loops are broken by both servers 
-# on each cluster having the same server-id
-# To support > 2 clusters and/or different server-ids per
-# MySQLD server, we need some other loop breaking 
-# mechanism 
+# All MySQLDs log-apply-status
+# Infinite loops broken in the test using Ignore_server_ids mechanism
 
 [mysqld.1.1]
 server-id= 1
 log-bin
+ndb_connectstring=	@mysql_cluster.1.ndb_connectstring
 log-slave-updates
+skip-slave-start
 
 [mysqld.2.1]
-server-id= 1
+server-id= 3
 log-bin
+ndb_connectstring=	@mysql_cluster.1.ndb_connectstring
 log-slave-updates
+skip-slave-start
 
 [mysqld.1.slave]
 server-id= 2
 log-bin
+ndb_connectstring=	@mysql_cluster.slave.ndb_connectstring
 log-slave-updates
 skip-slave-start
 
 [mysqld.2.slave]
-server-id= 2
+server-id= 4
 log-bin
 ndb_connectstring=	@mysql_cluster.slave.ndb_connectstring
+log-slave-updates
+skip-slave-start
 
 [ENV]
-
-SLAVE_MYPORT1=		@mysqld.2.slave.port
-SLAVE_MYSOCK1=		@mysqld.2.slave.socket
+SERVER_MYPORT_1=        @mysqld.1.1.port
+SERVER_MYPORT_2=        @mysqld.1.slave.port
+SERVER_MYPORT_3=        @mysqld.2.1.port
+SERVER_MYPORT_4=        @mysqld.2.slave.port

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_circular_2ch.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_circular_2ch.test	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_circular_2ch.test	2011-06-16 12:36:28 +0000
@@ -163,5 +163,8 @@ DROP TABLE IF EXISTS t1;
 --source include/wait_for_query_to_fail.inc
 --echo
 
+--connection master
+
 # End of test 5.1
---source include/rpl_end.inc
+--source suite/ndb_rpl/ndb_master-slave_2ch_end.inc
+

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_multi_binlog_update.cnf'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_multi_binlog_update.cnf	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_multi_binlog_update.cnf	2011-06-08 19:25:29 +0000
@@ -58,7 +58,6 @@ binlog_format=row
 [mysqld.1.slave]
 # Note no binlog on this slave
 server-id= 4
-init-rpl-role= slave
 skip-slave-start
 loose-skip-innodb
 slave-load-tmpdir= ../../../tmp
@@ -69,7 +68,6 @@ ndb_connectstring= @mysql_cluster.slave.
 [mysqld.2.slave]
 # Note binlog on this slave, but not logging slave updates
 server-id= 5
-init-rpl-role= slave
 skip-slave-start
 loose-skip-innodb
 slave-load-tmpdir= ../../../tmp
@@ -82,7 +80,6 @@ binlog_format=row
 [mysqld.3.slave]
 # Note binlog on this slave, with slave updates logged
 server-id= 6
-init-rpl-role= slave
 skip-slave-start
 loose-skip-innodb
 slave-load-tmpdir= ../../../tmp

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test	2011-06-16 14:34:56 +0000
@@ -47,10 +47,11 @@ CREATE TABLE mysql.ndb_replication
 --enable_query_log
 
 # Non existant conflict_fn
-# gives warning when creating table
+# gives error when creating table
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$X(X)");
+--error 1005
 create table t1 (a int key, X int) engine ndb;
-drop table t1;
+show warnings;
 delete from mysql.ndb_replication;
 
 # Column type cannot be used for this function
@@ -61,17 +62,19 @@ drop table t1;
 delete from mysql.ndb_replication;
 
 # Too few arguments
-# gives warning when creating table
+# gives error when creating table
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()");
+--error 1005
 create table t1 (a int key, X int) engine ndb;
-drop table t1;
+show warnings;
 delete from mysql.ndb_replication;
 
 # Too many arguments
-# gives warning when creating table
+# gives error when creating table
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X Y)");
+--error 1005
 create table t1 (a int key, X int) engine ndb;
-drop table t1;
+show warnings;
 delete from mysql.ndb_replication;
 
 --disable_query_log

=== modified file 'mysql-test/suite/ndb_team/my.cnf'
--- a/mysql-test/suite/ndb_team/my.cnf	2011-04-15 09:31:03 +0000
+++ b/mysql-test/suite/ndb_team/my.cnf	2011-06-08 19:25:29 +0000
@@ -50,7 +50,6 @@ master-connect-retry=         1
 log-bin=                      slave-bin
 relay-log=                    slave-relay-bin
 
-init-rpl-role=                slave
 log-slave-updates
 master-retry-count=           10
 
@@ -68,9 +67,6 @@ skip-slave-start
 # test results will vary, thus a relative path is used.
 slave-load-tmpdir=            ../../../tmp
 
-rpl-recovery-rank=            @mysqld.1.slave.server-id
-
-
 [ENV]
 NDB_CONNECTSTRING=            @mysql_cluster.1.ndb_connectstring
 MASTER_MYPORT=                @mysqld.1.1.port

=== modified file 'mysql-test/suite/rpl_ndb/my.cnf'
--- a/mysql-test/suite/rpl_ndb/my.cnf	2011-04-26 09:28:41 +0000
+++ b/mysql-test/suite/rpl_ndb/my.cnf	2011-06-08 19:25:29 +0000
@@ -60,7 +60,6 @@ relay-log=                    slave-rela
 # Cluster only supports row format
 binlog-format=                 row
 
-init-rpl-role=                slave
 log-slave-updates
 master-retry-count=           10
 
@@ -83,8 +82,6 @@ skip-slave-start
 # test results will vary, thus a relative path is used.
 slave-load-tmpdir=            ../../../tmp
 
-rpl-recovery-rank=            @mysqld.1.slave.server-id
-
 [ENV]
 NDB_CONNECTSTRING=            @mysql_cluster.1.ndb_connectstring
 MASTER_MYPORT=                @mysqld.1.1.port

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2011-06-15 10:37:56 +0000
+++ b/sql/ha_ndbcluster.cc	2011-06-16 18:16:01 +0000
@@ -347,6 +347,11 @@ ndbcluster_alter_table_flags(uint flags)
 
 #define NDB_AUTO_INCREMENT_RETRIES 100
 #define BATCH_FLUSH_SIZE (32768)
+/*
+  Room for 10 instruction words, two labels (@ 2words/label)
+  + 2 extra words for the case of resolve_size == 8
+*/
+#define MAX_CONFLICT_INTERPRETED_PROG_SIZE 16
 
 static void set_ndb_err(THD *thd, const NdbError &err);
 
@@ -432,8 +437,6 @@ extern void ndb_index_stat_end();
 /* Status variables shown with 'show status like 'Ndb%' */
 
 struct st_ndb_status g_ndb_status;
-static long g_ndb_status_conflict_fn_max= 0;
-static long g_ndb_status_conflict_fn_old= 0;
 
 long g_ndb_status_index_stat_cache_query = 0;
 long g_ndb_status_index_stat_cache_clean = 0;
@@ -453,6 +456,37 @@ update_slave_api_stats(Ndb* ndb)
     g_slave_api_client_stats[i] = ndb->getClientStat(i);
 }
 
+st_ndb_slave_state g_ndb_slave_state;
+
+st_ndb_slave_state::st_ndb_slave_state()
+  : current_conflict_defined_op_count(0)
+{
+  memset(current_violation_count, 0, sizeof(current_violation_count));
+  memset(total_violation_count, 0, sizeof(total_violation_count));
+};
+
+void
+st_ndb_slave_state::atTransactionAbort()
+{
+  /* Reset current-transaction counters + state */
+  memset(current_violation_count, 0, sizeof(current_violation_count));
+  current_conflict_defined_op_count = 0;
+}
+
+void
+st_ndb_slave_state::atTransactionCommit()
+{
+  /* Merge committed transaction counters into total state
+   * Then reset current transaction counters
+   */
+  for (int i=0; i < CFT_NUMBER_OF_CFTS; i++)
+  {
+    total_violation_count[i]+= current_violation_count[i];
+    current_violation_count[i] = 0;
+  }
+  current_conflict_defined_op_count = 0;
+}
+
 static int update_status_variables(Thd_ndb *thd_ndb,
                                    st_ndb_status *ns,
                                    Ndb_cluster_connection *c)
@@ -567,8 +601,9 @@ SHOW_VAR ndb_status_variables_dynamic[]=
 };
 
 SHOW_VAR ndb_status_conflict_variables[]= {
-  {"fn_max",     (char*) &g_ndb_status_conflict_fn_max, SHOW_LONG},
-  {"fn_old",     (char*) &g_ndb_status_conflict_fn_old, SHOW_LONG},
+  {"fn_max",       (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_MAX], SHOW_LONG},
+  {"fn_old",       (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_OLD], SHOW_LONG},
+  {"fn_max_del_win", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_MAX_DEL_WIN], SHOW_LONG},
   {NullS, NullS, SHOW_LONG}
 };
 
@@ -733,6 +768,24 @@ static int write_conflict_row(NDB_SHARE
 }
 #endif
 
+#ifdef HAVE_NDB_BINLOG
+int
+handle_conflict_op_error(Thd_ndb* thd_ndb,
+                         NdbTransaction* trans,
+                         const NdbError& err,
+                         const NdbOperation* op);
+
+int
+handle_row_conflict(NDB_CONFLICT_FN_SHARE* cfn_share,
+                    const NdbRecord* key_rec,
+                    const uchar* pk_row,
+                    enum_conflicting_op_type op_type,
+                    enum_conflict_cause conflict_cause,
+                    const NdbError& conflict_error,
+                    NdbTransaction* conflict_trans,
+                    NdbError& err);
+#endif
+
 inline int
 check_completed_operations_pre_commit(Thd_ndb *thd_ndb, NdbTransaction *trans,
                                       const NdbOperation *first,
@@ -753,102 +806,42 @@ check_completed_operations_pre_commit(Th
     or exceptions to report
   */
 #ifdef HAVE_NDB_BINLOG
-  uint conflict_rows_written= 0;
+  const NdbOperation* lastUserOp = trans->getLastDefinedOperation();
 #endif
   while (true)
   {
     const NdbError &err= first->getNdbError();
-    if (err.classification != NdbError::NoError
-#ifndef HAVE_NDB_BINLOG
-        && err.classification != NdbError::ConstraintViolation
-        && err.classification != NdbError::NoDataFound
-#endif
-        )
+    const bool op_has_conflict_detection = (first->getCustomData() != NULL);
+    if (!op_has_conflict_detection)
     {
-#ifdef HAVE_NDB_BINLOG
-      DBUG_PRINT("info", ("ndb error: %d", err.code));
-      if (err.code == (int) error_conflict_fn_max_violation)
-      {
-        DBUG_PRINT("info", ("err.code == (int) error_conflict_fn_max_violation"));
-        thd_ndb->m_max_violation_count++;
-      }
-      else if (err.code == (int) error_conflict_fn_old_violation ||
-               err.classification == NdbError::ConstraintViolation ||
-               err.classification == NdbError::NoDataFound)
-      {
-        DBUG_PRINT("info",
-                   ("err.code %s (int) error_conflict_fn_old_violation, "
-                    "err.classification %s",
-                    err.code == (int) error_conflict_fn_old_violation ? "==" : "!=",
-                    err.classification
-                    == NdbError::ConstraintViolation
-                    ? "== NdbError::ConstraintViolation"
-                    : (err.classification == NdbError::NoDataFound
-                       ? "== NdbError::NoDataFound" : "!=")));
-        thd_ndb->m_old_violation_count++;
-        const void* buffer= first->getCustomData();
-        if (buffer != NULL)
-        {
-          Ndb_exceptions_data ex_data;
-          memcpy(&ex_data, buffer, sizeof(ex_data));
-          NDB_SHARE *share= ex_data.share;
-          const uchar* row= ex_data.row;
-          DBUG_ASSERT(share != NULL && row != NULL);
-
-          NDB_CONFLICT_FN_SHARE* cfn_share= share->m_cfn_share;
-          if (cfn_share && cfn_share->m_ex_tab)
-          {
-            NdbError ex_err;
-            if (write_conflict_row(share, trans, row, ex_err))
-            {
-              char msg[FN_REFLEN];
-              my_snprintf(msg, sizeof(msg), "table %s NDB error %d '%s'",
-                          cfn_share->m_ex_tab->getName(),
-                          ex_err.code, ex_err.message);
-
-              NdbDictionary::Dictionary* dict= thd_ndb->ndb->getDictionary();
-
-              if (ex_err.classification == NdbError::SchemaError)
-              {
-                dict->removeTableGlobal(*(cfn_share->m_ex_tab), false);
-                cfn_share->m_ex_tab= NULL;
-              }
-              else if (ex_err.status == NdbError::TemporaryError)
-              {
-                /* Slave will roll back and retry entire transaction. */
-                ERR_RETURN(ex_err);
-              }
-              else
-              {
-                push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                                    ER_EXCEPTIONS_WRITE_ERROR,
-                                    ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
-                /* Slave will stop replication. */
-                DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
-              }
-            }
-            else
-            {
-              conflict_rows_written++;
-            }
-          }
-          else
-          {
-            DBUG_PRINT("info", ("missing %s", !cfn_share ? "cfn_share" : "ex_tab"));
-          }
-        }
-        else
-        {
-          DBUG_PRINT("info", ("missing custom data"));
-        }
-      }
-      else
-#endif
+      /* 'Normal path' - ignore key (not) present, others are errors */
+      if (err.classification != NdbError::NoError &&
+          err.classification != NdbError::ConstraintViolation &&
+          err.classification != NdbError::NoDataFound)
       {
+        /* Non ignored error, report it */
         DBUG_PRINT("info", ("err.code == %u", err.code));
         DBUG_RETURN(err.code);
       }
     }
+#ifdef HAVE_NDB_BINLOG
+    else
+    {
+      /*
+         Op with conflict detection, use special error handling method
+       */
+
+      if (err.classification != NdbError::NoError)
+      {
+        int res = handle_conflict_op_error(thd_ndb,
+                                           trans,
+                                           err,
+                                           first);
+        if (res != 0)
+          DBUG_RETURN(res);
+      }
+    } // if (!op_has_conflict_detection)
+#endif
     if (err.classification != NdbError::NoError)
       ignores++;
 
@@ -860,7 +853,11 @@ check_completed_operations_pre_commit(Th
   if (ignore_count)
     *ignore_count= ignores;
 #ifdef HAVE_NDB_BINLOG
-  if (conflict_rows_written)
+  /*
+     Conflict detection related error handling above may have defined
+     new operations on the transaction.  If so, execute them now
+  */
+  if (trans->getLastDefinedOperation() != lastUserOp)
   {
     if (trans->execute(NdbTransaction::NoCommit,
                        NdbOperation::AO_IgnoreError,
@@ -964,10 +961,10 @@ int execute_no_commit(Thd_ndb *thd_ndb,
                                                     ignore_count));
 }
 
-int execute_commit(Thd_ndb *thd_ndb, NdbTransaction *trans,
+int execute_commit(THD* thd, Thd_ndb *thd_ndb, NdbTransaction *trans,
                    int force_send, int ignore_error, uint *ignore_count= 0);
 inline
-int execute_commit(Thd_ndb *thd_ndb, NdbTransaction *trans,
+int execute_commit(THD* thd, Thd_ndb *thd_ndb, NdbTransaction *trans,
                    int force_send, int ignore_error, uint *ignore_count)
 {
   DBUG_ENTER("execute_commit");
@@ -984,19 +981,19 @@ int execute_commit(Thd_ndb *thd_ndb, Ndb
   const NdbOperation *first= trans->getFirstDefinedOperation();
   const NdbOperation *last= trans->getLastDefinedOperation();
   thd_ndb->m_execute_count++;
-  thd_ndb->m_conflict_fn_usage_count= 0;
   thd_ndb->m_unsent_bytes= 0;
   DBUG_PRINT("info", ("execute_count: %u", thd_ndb->m_execute_count));
   if (trans->execute(NdbTransaction::Commit, ao, force_send))
   {
-    thd_ndb->m_max_violation_count= 0;
-    thd_ndb->m_old_violation_count= 0;
+    if (thd->slave_thread)
+      g_ndb_slave_state.atTransactionAbort();
     DBUG_RETURN(-1);
   }
-  g_ndb_status_conflict_fn_max+= thd_ndb->m_max_violation_count;
-  g_ndb_status_conflict_fn_old+= thd_ndb->m_old_violation_count;
-  thd_ndb->m_max_violation_count= 0;
-  thd_ndb->m_old_violation_count= 0;
+  /* Success of some sort */
+  if (thd->slave_thread)
+  {
+    g_ndb_slave_state.atTransactionCommit();
+  }
   if (!ignore_error || trans->getNdbError().code == 0)
     DBUG_RETURN(trans->getNdbError().code);
   DBUG_RETURN(check_completed_operations(thd_ndb, trans, first, last,
@@ -1052,9 +1049,6 @@ Thd_ndb::Thd_ndb()
   m_execute_count= 0;
   m_scan_count= 0;
   m_pruned_scan_count= 0;
-  m_max_violation_count= 0;
-  m_old_violation_count= 0;
-  m_conflict_fn_usage_count= 0;
   bzero(m_transaction_no_hint_count, sizeof(m_transaction_no_hint_count));
   bzero(m_transaction_hint_count, sizeof(m_transaction_hint_count));
   global_schema_lock_trans= NULL;
@@ -3000,9 +2994,8 @@ int ha_ndbcluster::ndb_pk_update_row(THD
     DBUG_PRINT("info", ("insert failed"));
     if (trans->commitStatus() == NdbConnection::Started)
     {
-      m_thd_ndb->m_max_violation_count= 0;
-      m_thd_ndb->m_old_violation_count= 0;
-      m_thd_ndb->m_conflict_fn_usage_count= 0;
+      if (thd->slave_thread)
+        g_ndb_slave_state.atTransactionAbort();
       m_thd_ndb->m_unsent_bytes= 0;
       m_thd_ndb->m_execute_count++;
       DBUG_PRINT("info", ("execute_count: %u", m_thd_ndb->m_execute_count));
@@ -3940,6 +3933,239 @@ thd_allow_batch(const THD* thd)
 #endif
 }
 
+#ifdef HAVE_NDB_BINLOG
+/**
+   prepare_conflict_detection
+
+   This method is called during operation definition by the slave,
+   when writing to a table with conflict detection defined.
+
+   It is responsible for defining and adding any operation filtering
+   required, and for saving any operation definition state required
+   for post-execute analysis
+*/
+int
+ha_ndbcluster::prepare_conflict_detection(enum_conflicting_op_type op_type,
+                                          const NdbRecord* key_rec,
+                                          const uchar* old_data,
+                                          const uchar* new_data,
+                                          NdbInterpretedCode* code,
+                                          NdbOperation::OperationOptions* options)
+{
+  DBUG_ENTER("prepare_conflict_detection");
+
+  int res = 0;
+  const st_conflict_fn_def* conflict_fn = m_share->m_cfn_share->m_conflict_fn;
+  assert( conflict_fn != NULL );
+
+
+  /*
+     Prepare interpreted code for operation (update + delete only) according
+     to algorithm used
+  */
+  if (op_type != WRITE_ROW)
+  {
+    res = conflict_fn->prep_func(m_share->m_cfn_share,
+                                 op_type,
+                                 old_data,
+                                 new_data,
+                                 table->write_set,
+                                 code);
+
+    if (!res)
+    {
+      /* Attach conflict detecting filter program to operation */
+      options->optionsPresent|=NdbOperation::OperationOptions::OO_INTERPRETED;
+      options->interpretedCode= code;
+    }
+  } // if (op_type != WRITE_ROW)
+
+  g_ndb_slave_state.current_conflict_defined_op_count++;
+
+  /* Now save data for potential insert to exceptions table... */
+  const uchar* row_to_save = (op_type == DELETE_ROW)? old_data : new_data;
+  Ndb_exceptions_data ex_data;
+  ex_data.share= m_share;
+  ex_data.key_rec= key_rec;
+  ex_data.op_type= op_type;
+  /*
+    We need to save the row data for possible conflict resolution after
+    execute().
+  */
+  ex_data.row= copy_row_to_buffer(m_thd_ndb, row_to_save);
+  uchar* ex_data_buffer= get_buffer(m_thd_ndb, sizeof(ex_data));
+  if (ex_data.row == NULL || ex_data_buffer == NULL)
+  {
+    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+  }
+  memcpy(ex_data_buffer, &ex_data, sizeof(ex_data));
+
+  /* Store ptr to exceptions data in operation 'customdata' ptr */
+  options->optionsPresent|= NdbOperation::OperationOptions::OO_CUSTOMDATA;
+  options->customData= (void*)ex_data_buffer;
+
+  DBUG_RETURN(0);
+}
+
+/**
+   handle_conflict_op_error
+
+   This method is called when an error is detected after executing an
+   operation with conflict detection active.
+
+   If the operation error is related to conflict detection, handling
+   starts.
+
+   Handling involves incrementing the relevant counter, and optionally
+   refreshing the row and inserting an entry into the exceptions table
+*/
+
+int
+handle_conflict_op_error(Thd_ndb* thd_ndb,
+                         NdbTransaction* trans,
+                         const NdbError& err,
+                         const NdbOperation* op)
+{
+  DBUG_ENTER("handle_conflict_op_error");
+  DBUG_PRINT("info", ("ndb error: %d", err.code));
+
+  if ((err.code == (int) error_conflict_fn_violation) ||
+      (err.classification == NdbError::ConstraintViolation) ||
+      (err.classification == NdbError::NoDataFound))
+  {
+    DBUG_PRINT("info",
+               ("err.code %s (int) error_conflict_fn_violation, "
+                "err.classification %s",
+                err.code == (int) error_conflict_fn_violation ? "==" : "!=",
+                err.classification
+                == NdbError::ConstraintViolation
+                ? "== NdbError::ConstraintViolation"
+                : (err.classification == NdbError::NoDataFound
+                   ? "== NdbError::NoDataFound" : "!=")));
+
+    enum_conflict_cause conflict_cause;
+
+    if (err.code == (int) error_conflict_fn_violation)
+    {
+      conflict_cause= ROW_IN_CONFLICT;
+    }
+    else if (err.classification == NdbError::ConstraintViolation)
+    {
+      conflict_cause= ROW_ALREADY_EXISTS;
+    }
+    else
+    {
+      assert(err.classification == NdbError::NoDataFound);
+      conflict_cause= ROW_DOES_NOT_EXIST;
+    }
+
+    const void* buffer=op->getCustomData();
+    assert(buffer);
+    Ndb_exceptions_data ex_data;
+    memcpy(&ex_data, buffer, sizeof(ex_data));
+    NDB_SHARE *share= ex_data.share;
+    const NdbRecord* key_rec= ex_data.key_rec;
+    const uchar* row= ex_data.row;
+    enum_conflicting_op_type op_type = ex_data.op_type;
+    DBUG_ASSERT(share != NULL && row != NULL);
+
+    NDB_CONFLICT_FN_SHARE* cfn_share= share->m_cfn_share;
+    if (cfn_share)
+    {
+      enum_conflict_fn_type cft = cfn_share->m_conflict_fn->type;
+      bool haveExTable = cfn_share->m_ex_tab != NULL;
+
+      g_ndb_slave_state.current_violation_count[cft]++;
+
+      {
+        NdbError handle_error;
+        if (handle_row_conflict(cfn_share,
+                                key_rec,
+                                row,
+                                op_type,
+                                conflict_cause,
+                                err,
+                                trans,
+                                handle_error))
+        {
+          /* Error with handling of row conflict */
+          char msg[FN_REFLEN];
+          my_snprintf(msg, sizeof(msg), "Row conflict handling "
+                      "on table %s hit Ndb error %d '%s'",
+                      share->table_name,
+                      handle_error.code,
+                      handle_error.message);
+
+          if (handle_error.status == NdbError::TemporaryError)
+          {
+            /* Slave will roll back and retry entire transaction. */
+            ERR_RETURN(handle_error);
+          }
+          else
+          {
+            push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                                ER_EXCEPTIONS_WRITE_ERROR,
+                                ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+            /* Slave will stop replication. */
+            DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
+          }
+        }
+      }
+
+
+      if (haveExTable)
+      {
+        NdbError ex_err;
+        if (write_conflict_row(share, trans, row, ex_err))
+        {
+          char msg[FN_REFLEN];
+          my_snprintf(msg, sizeof(msg), "table %s NDB error %d '%s'",
+                      cfn_share->m_ex_tab->getName(),
+                      ex_err.code, ex_err.message);
+
+          NdbDictionary::Dictionary* dict= thd_ndb->ndb->getDictionary();
+
+          if (ex_err.classification == NdbError::SchemaError)
+          {
+            dict->removeTableGlobal(*(cfn_share->m_ex_tab), false);
+            cfn_share->m_ex_tab= NULL;
+          }
+          else if (ex_err.status == NdbError::TemporaryError)
+          {
+            /* Slave will roll back and retry entire transaction. */
+            ERR_RETURN(ex_err);
+          }
+          else
+          {
+            push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                                ER_EXCEPTIONS_WRITE_ERROR,
+                                ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+            /* Slave will stop replication. */
+            DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
+          }
+        }
+      } // if (haveExTable)
+
+      DBUG_RETURN(0);
+    }
+    else
+    {
+      DBUG_PRINT("info", ("missing cfn_share"));
+      DBUG_RETURN(0); // TODO : Correct?
+    }
+  }
+  else
+  {
+    /* Non conflict related error */
+    DBUG_PRINT("info", ("err.code == %u", err.code));
+    DBUG_RETURN(err.code);
+  }
+
+  DBUG_RETURN(0); // Reachable?
+}
+#endif /* HAVE_NDB_BINLOG */
+
+
 int ha_ndbcluster::write_row(uchar *record)
 {
   DBUG_ENTER("ha_ndbcluster::write_row");
@@ -4124,14 +4350,16 @@ int ha_ndbcluster::ndb_write_row(uchar *
   MY_BITMAP tmpBitmap;
   MY_BITMAP *user_cols_written_bitmap;
 #ifdef HAVE_NDB_BINLOG
-  uchar* ex_data_buffer= NULL;
+  bool haveConflictFunction =
+    (thd->slave_thread &&
+     m_share->m_cfn_share &&
+     m_share->m_cfn_share->m_conflict_fn);
 #endif
   
   if (m_use_write
 #ifdef HAVE_NDB_BINLOG
-      && !(thd->slave_thread &&
-           m_share->m_cfn_share &&
-           m_share->m_cfn_share->m_resolve_cft != CFT_NDB_UNDEF)
+      /* Conflict detection must use normal Insert */
+      && !haveConflictFunction
 #endif
       )
   {
@@ -4163,52 +4391,7 @@ int ha_ndbcluster::ndb_write_row(uchar *
       user_cols_written_bitmap= NULL;
       mask= NULL;
     }
-
-#if 0 /* NOT YET, interpeted function not supported for write */
-#ifdef HAVE_NDB_BINLOG
-    /*
-      Room for 10 instruction words, two labels (@ 2words/label)
-      + 2 extra words for the case of resolve_size == 8
-    */
-    Uint32 buffer[16];
-    NdbInterpretedCode code(m_table, buffer,
-                            sizeof(buffer)/sizeof(buffer[0]));
-    if (useWriteSet)
-    {
-      /* Conflict resolution in slave thread. */
-      enum_conflict_fn_type cft= m_share->m_cfn_share ? m_share->m_cfn_share->m_resolve_cft : CFT_NDB_UNDEF;
-      if (cft != CFT_NDB_UNDEF)
-      {
-        if (!write_row_conflict_fn(cft, record, &code))
-        {
-          options.optionsPresent|=NdbOperation::OperationOptions::OO_INTERPRETED;
-          options.interpretedCode= &code;
-          thd_ndb->m_conflict_fn_usage_count++;
-        }
-        
-        Ndb_exceptions_data ex_data;
-        ex_data.share= m_share;
-        /*
-          We need to save the row data for possible conflict resolution after
-          execute().
-        */
-        ex_data.row= copy_row_to_buffer(thd_ndb, record);
-        ex_data_buffer= get_buffer(thd_ndb, sizeof(ex_data));
-        if (ex_data.row == NULL || ex_data_buffer == NULL)
-        {
-          DBUG_RETURN(HA_ERR_OUT_OF_MEM);
-        }
-        memcpy(ex_data_buffer, &ex_data, sizeof(ex_data));
-
-        options.optionsPresent|= NdbOperation::OperationOptions::OO_CUSTOMDATA;
-        options.customData= (void*)ex_data_buffer;
-        if (options.optionsPresent != 0)
-          poptions=&options;
-      }
-    }
-#endif  /* HAVE_NDB_BINLOG */
-#endif  /* NOT YET */
-
+    /* TODO : Add conflict detection etc when interpreted write supported */
     op= trans->writeTuple(key_rec, (const char *)key_row, m_ndb_record,
                           (char *)record, mask,
                           poptions, sizeof(NdbOperation::OperationOptions));
@@ -4216,27 +4399,16 @@ int ha_ndbcluster::ndb_write_row(uchar *
   else
   {
 #ifdef HAVE_NDB_BINLOG
-    if (m_use_write)
+    if (haveConflictFunction)
     {
-      thd_ndb->m_conflict_fn_usage_count++;
-
-      Ndb_exceptions_data ex_data;
-      ex_data.share= m_share;
-      /*
-        We need to save the row data for possible conflict resolution after
-        execute().
-      */
-      ex_data.row= copy_row_to_buffer(thd_ndb, record);
-      ex_data_buffer= get_buffer(thd_ndb, sizeof(ex_data));
-      if (ex_data.row == NULL || ex_data_buffer == NULL)
-      {
-        DBUG_RETURN(HA_ERR_OUT_OF_MEM);
-      }
-      memcpy(ex_data_buffer, &ex_data, sizeof(ex_data));
-
-      options.optionsPresent|= NdbOperation::OperationOptions::OO_CUSTOMDATA;
-      options.customData= (void*)ex_data_buffer;
-      poptions=&options;
+      /* Conflict detection in slave thread */
+      if (unlikely((error = prepare_conflict_detection(WRITE_ROW,
+                                                       key_rec,
+                                                       NULL,    /* old_data */
+                                                       record,  /* new_data */
+                                                       NULL,    /* code */
+                                                       &options))))
+        DBUG_RETURN(error);
     }
 #endif
     uchar *mask;
@@ -4386,248 +4558,20 @@ int ha_ndbcluster::primary_key_cmp(const
 
 #ifdef HAVE_NDB_BINLOG
 
-/**
-  CFT_NDB_NEW
-
-  To perform conflict resolution, an interpreted program is used to read
-  the timestamp stored locally and compare to what is going to be applied.
-  If timestamp is lower, an error for this operation (9999) will be raised,
-  and new row will not be applied. The error codes for the operations will
-  be checked on return.  For this to work is is vital that the operation
-  is run with ignore error option.
-*/
-
-int
-ha_ndbcluster::row_conflict_fn_max(const uchar *new_data,
-                                   NdbInterpretedCode *code)
-{
-  uint32 resolve_column= m_share->m_cfn_share->m_resolve_column;
-  uint32 resolve_size= m_share->m_cfn_share->m_resolve_size;
-
-  DBUG_PRINT("info", ("interpreted update pre equal on column %u",
-                      resolve_column));
-
-  DBUG_ASSERT(resolve_size == 4 || resolve_size == 8);
-
-  if (!bitmap_is_set(table->write_set, resolve_column))
-  {
-    sql_print_information("NDB Slave: missing data for NDB_MAX");
-    return 1;
-  }
-
-  const uint label_0= 0;
-  const Uint32 RegNewValue= 1, RegCurrentValue= 2;
-  int r;
-  Field *field= table->field[resolve_column];
-  DBUG_PRINT("info", ("interpreted update post equal"));
-  /*
-   * read new value from record
-   */
-  union {
-    uint32 new_value_32;
-    uint64 new_value_64;
-  };
-  {
-    const uchar *field_ptr= field->ptr + (new_data - table->record[0]);
-    if (resolve_size == 4)
-    {
-      memcpy(&new_value_32, field_ptr, resolve_size);
-      DBUG_PRINT("info", ("new_value_32: %u", new_value_32));
-    }
-    else
-    {
-      memcpy(&new_value_64, field_ptr, resolve_size);
-      DBUG_PRINT("info", ("new_value_64: %llu",
-                          (unsigned long long) new_value_64));
-    }
-  }
-  /*
-   * Load registers RegNewValue and RegCurrentValue
-   */
-  if (resolve_size == 4)
-    r= code->load_const_u32(RegNewValue, new_value_32);
-  else
-    r= code->load_const_u64(RegNewValue, new_value_64);
-  DBUG_ASSERT(r == 0);
-  r= code->read_attr(RegCurrentValue, resolve_column);
-  DBUG_ASSERT(r == 0);
-  /*
-   * if RegNewValue > RegCurrentValue goto label_0
-   * else raise error for this row
-   */
-  r= code->branch_gt(RegNewValue, RegCurrentValue, label_0);
-  DBUG_ASSERT(r == 0);
-  r= code->interpret_exit_nok(error_conflict_fn_max_violation);
-  DBUG_ASSERT(r == 0);
-  r= code->def_label(label_0);
-  DBUG_ASSERT(r == 0);
-  r= code->interpret_exit_ok();
-  DBUG_ASSERT(r == 0);
-  r= code->finalise();
-  DBUG_ASSERT(r == 0);
-  return r;
-}
-
-/**
-  CFT_NDB_OLD
-
-  To perform conflict detection, an interpreted program is used to read
-  the timestamp stored locally and compare to what was on the master.
-  If timestamp is not equal, an error for this operation (9998) will be raised,
-  and new row will not be applied. The error codes for the operations will
-  be checked on return.  For this to work is is vital that the operation
-  is run with ignore error option.
-
-  As an independent feature, phase 2 also saves the
-  conflicts into the table's exceptions table.
-*/
-
 int
-ha_ndbcluster::row_conflict_fn_old(const uchar *old_data,
-                                   NdbInterpretedCode *code)
+handle_row_conflict(NDB_CONFLICT_FN_SHARE* cfn_share,
+                    const NdbRecord* key_rec,
+                    const uchar* pk_row,
+                    enum_conflicting_op_type op_type,
+                    enum_conflict_cause conflict_cause,
+                    const NdbError& conflict_error,
+                    NdbTransaction* conflict_trans,
+                    NdbError& err)
 {
-  uint32 resolve_column= m_share->m_cfn_share->m_resolve_column;
-  uint32 resolve_size= m_share->m_cfn_share->m_resolve_size;
-
-  DBUG_PRINT("info", ("interpreted update pre equal on column %u",
-                      resolve_column));
-
-  DBUG_ASSERT(resolve_size == 4 || resolve_size == 8);
-
-  if (!bitmap_is_set(table->write_set, resolve_column))
-  {
-    sql_print_information("NDB Slave: missing data for NDB_OLD");
-    return -1;
-  }
-
-  const uint label_0= 0;
-  const Uint32 RegOldValue= 1, RegCurrentValue= 2;
-  int r;
-  Field *field= table->field[resolve_column];
-  DBUG_PRINT("info", ("interpreted update post equal"));
-  /*
-   * read old value from record
-   */
-  union {
-    uint32 old_value_32;
-    uint64 old_value_64;
-  };
-  {
-    const uchar *field_ptr= field->ptr + (old_data - table->record[0]);
-    if (resolve_size == 4)
-    {
-      memcpy(&old_value_32, field_ptr, resolve_size);
-      DBUG_PRINT("info", ("old_value_32: %u", old_value_32));
-    }
-    else
-    {
-      memcpy(&old_value_64, field_ptr, resolve_size);
-      DBUG_PRINT("info", ("old_value_64: %llu",
-                          (unsigned long long) old_value_64));
-    }
-  }
-  /*
-   * Load registers RegOldValue and RegCurrentValue
-   */
-  if (resolve_size == 4)
-    r= code->load_const_u32(RegOldValue, old_value_32);
-  else
-    r= code->load_const_u64(RegOldValue, old_value_64);
-  DBUG_ASSERT(r == 0);
-  r= code->read_attr(RegCurrentValue, resolve_column);
-  DBUG_ASSERT(r == 0);
-  /*
-   * if RegOldValue == RegCurrentValue goto label_0
-   * else raise error for this row
-   */
-  r= code->branch_eq(RegOldValue, RegCurrentValue, label_0);
-  DBUG_ASSERT(r == 0);
-  r= code->interpret_exit_nok(error_conflict_fn_old_violation);
-  DBUG_ASSERT(r == 0);
-  r= code->def_label(label_0);
-  DBUG_ASSERT(r == 0);
-  r= code->interpret_exit_ok();
-  DBUG_ASSERT(r == 0);
-  r= code->finalise();
-  DBUG_ASSERT(r == 0);
-  return r;
-}
-
-int
-ha_ndbcluster::update_row_conflict_fn(enum_conflict_fn_type cft,
-                                      const uchar *old_data,
-                                      uchar *new_data,
-                                      NdbInterpretedCode *code)
-{
-  switch (cft) {
-  case CFT_NDB_MAX:
-  case CFT_NDB_MAX_DEL_WIN:
-    return row_conflict_fn_max(new_data, code);
-  case CFT_NDB_OLD:
-    return row_conflict_fn_old(old_data, code);
-  case CFT_NDB_UNDEF:
-    abort();
-  }
-  DBUG_ASSERT(false);
-  return 1;
-}
+  DBUG_ENTER("handle_row_conflict");
 
-#if 0 /* NOT YET, interpeted function not supported for write */
-int
-ha_ndbcluster::write_row_conflict_fn(enum_conflict_fn_type cft,
-                                     uchar *data,
-                                     NdbInterpretedCode *code)
-{
-  switch (cft) {
-  case CFT_NDB_MAX:
-  case CFT_NDB_MAX_DEL_WIN:
-    return row_conflict_fn_max(data, code);
-  case CFT_NDB_OLD:
-    /*
-      No conflict function here, instead detect if tuple
-      already exists
-     */
-    return 1;
-  case CFT_NDB_UNDEF:
-    abort();
-  }
-  DBUG_ASSERT(false);
-  return 1;
-}
-#endif
-
-int
-ha_ndbcluster::delete_row_conflict_fn(enum_conflict_fn_type cft,
-                                      const uchar *old_data,
-                                      NdbInterpretedCode *code)
-{
-  switch (cft) {
-  case CFT_NDB_MAX:
-    /*
-      As we do not have a timestamp for the actual delete,
-      the best we can do is to detect a possible conflict
-      on the old data.
-    */
-    return row_conflict_fn_old(old_data, code);
-  case CFT_NDB_MAX_DEL_WIN:
-  {
-    /**
-     * let delete always win
-     */
-    int r = code->interpret_exit_ok();
-    DBUG_ASSERT(r == 0);
-    r = code->finalise();
-    DBUG_ASSERT(r == 0);
-    return r;
-  }
-  case CFT_NDB_OLD:
-    return row_conflict_fn_old(old_data, code);
-  case CFT_NDB_UNDEF:
-    abort();
-  }
-  DBUG_ASSERT(false);
-  return 1;
-}
+  DBUG_RETURN(0);
+};
 #endif /* HAVE_NDB_BINLOG */
 
 /**
@@ -4656,26 +4600,79 @@ int ha_ndbcluster::bulk_update_row(const
 
 int ha_ndbcluster::exec_bulk_update(uint *dup_key_found)
 {
+  NdbTransaction* trans= m_thd_ndb->trans;
   DBUG_ENTER("ha_ndbcluster::exec_bulk_update");
   *dup_key_found= 0;
-  if (m_thd_ndb->m_unsent_bytes &&
-      !thd_allow_batch(table->in_use) &&
-      (!m_thd_ndb->m_handler ||
-       m_blobs_pending))
+
+  // m_handler must be NULL or point to _this_ handler instance
+  assert(m_thd_ndb->m_handler == NULL || m_thd_ndb->m_handler == this);
+
+  if (m_thd_ndb->m_handler &&
+      m_read_before_write_removal_possible)
   {
+    /*
+      This is an autocommit involving only one table and rbwr is on
+
+      Commit the autocommit transaction early(before the usual place
+      in ndbcluster_commit) in order to:
+      1) save one round trip, "no-commit+commit" converted to "commit"
+      2) return the correct number of updated and affected rows
+         to the update loop(which will ask handler in rbwr mode)
+    */
+    DBUG_PRINT("info", ("committing auto-commit+rbwr early"));
     uint ignore_count= 0;
-    if (execute_no_commit(m_thd_ndb, m_thd_ndb->trans,
-                          m_ignore_no_key || m_read_before_write_removal_used,
-                          &ignore_count) != 0)
+    const int ignore_error= 1;
+    if (execute_commit(table->in_use, m_thd_ndb, trans,
+                       m_thd_ndb->m_force_send, ignore_error,
+                       &ignore_count) != 0)
     {
       no_uncommitted_rows_execute_failure();
-      DBUG_RETURN(ndb_err(m_thd_ndb->trans));
+      DBUG_RETURN(ndb_err(trans));
     }
+    DBUG_PRINT("info", ("ignore_count: %u", ignore_count));
     assert(m_rows_changed >= ignore_count);
     assert(m_rows_updated >= ignore_count);
     m_rows_changed-= ignore_count;
     m_rows_updated-= ignore_count;
+    DBUG_RETURN(0);
+  }
+
+  if (m_thd_ndb->m_unsent_bytes == 0)
+  {
+    DBUG_PRINT("exit", ("skip execute - no unsent bytes"));
+    DBUG_RETURN(0);
+  }
+
+  if (thd_allow_batch(table->in_use))
+  {
+    /*
+      Turned on by @@transaction_allow_batching=ON
+      or implicitly by slave exec thread
+    */
+    DBUG_PRINT("exit", ("skip execute - transaction_allow_batching is ON"));
+    DBUG_RETURN(0);
+  }
+
+  if (m_thd_ndb->m_handler &&
+      !m_blobs_pending)
+  {
+    // Execute at commit time(in 'ndbcluster_commit') to save a round trip
+    DBUG_PRINT("exit", ("skip execute - simple autocommit"));
+    DBUG_RETURN(0);
+  }
+
+  uint ignore_count= 0;
+  if (execute_no_commit(m_thd_ndb, trans,
+                        m_ignore_no_key || m_read_before_write_removal_used,
+                        &ignore_count) != 0)
+  {
+    no_uncommitted_rows_execute_failure();
+    DBUG_RETURN(ndb_err(trans));
   }
+  assert(m_rows_changed >= ignore_count);
+  assert(m_rows_updated >= ignore_count);
+  m_rows_changed-= ignore_count;
+  m_rows_updated-= ignore_count;
   DBUG_RETURN(0);
 }
 
@@ -4904,42 +4901,21 @@ int ha_ndbcluster::ndb_update_row(const
 				 m_read_before_write_removal_used);
 
 #ifdef HAVE_NDB_BINLOG
-    uchar* ex_data_buffer= NULL;
-    /*
-      Room for 10 instruction words, two labels (@ 2words/label)
-      + 2 extra words for the case of resolve_size == 8
-    */
-    Uint32 buffer[16];
+    Uint32 buffer[ MAX_CONFLICT_INTERPRETED_PROG_SIZE ];
     NdbInterpretedCode code(m_table, buffer,
                             sizeof(buffer)/sizeof(buffer[0]));
+
     if (thd->slave_thread && m_share->m_cfn_share &&
-        (m_share->m_cfn_share->m_resolve_cft != CFT_NDB_UNDEF))
+        m_share->m_cfn_share->m_conflict_fn)
     {
-      /* Conflict resolution in slave thread. */
-      enum_conflict_fn_type cft= m_share->m_cfn_share->m_resolve_cft;
-      if (!update_row_conflict_fn(cft, old_data, new_data, &code))
-      {
-        options.optionsPresent|=NdbOperation::OperationOptions::OO_INTERPRETED;
-        options.interpretedCode= &code;
-        thd_ndb->m_conflict_fn_usage_count++;
-      }
-
-      Ndb_exceptions_data ex_data;
-      ex_data.share= m_share;
-      /*
-        We need to save the row data for possible conflict resolution after
-        execute().
-      */
-      ex_data.row= copy_row_to_buffer(thd_ndb, new_data);
-      ex_data_buffer= get_buffer(thd_ndb, sizeof(ex_data));
-      if (ex_data.row == NULL || ex_data_buffer == NULL)
-      {
-        DBUG_RETURN(HA_ERR_OUT_OF_MEM);
-      }
-      memcpy(ex_data_buffer, &ex_data, sizeof(ex_data));
-
-      options.optionsPresent|= NdbOperation::OperationOptions::OO_CUSTOMDATA;
-      options.customData= (void*)ex_data_buffer;
+       /* Conflict resolution in slave thread. */
+      if (unlikely((error = prepare_conflict_detection(UPDATE_ROW,
+                                                       key_rec,
+                                                       old_data,
+                                                       new_data,
+                                                       &code,
+                                                       &options))))
+        DBUG_RETURN(error);
     }
 #endif /* HAVE_NDB_BINLOG */
     if (options.optionsPresent !=0)
@@ -5011,25 +4987,76 @@ bool ha_ndbcluster::start_bulk_delete()
 
 int ha_ndbcluster::end_bulk_delete()
 {
+  NdbTransaction* trans= m_thd_ndb->trans;
   DBUG_ENTER("end_bulk_delete");
   assert(m_is_bulk_delete); // Don't allow end() without start()
-  if (m_thd_ndb->m_unsent_bytes &&
-      !thd_allow_batch(table->in_use) &&
-      !m_thd_ndb->m_handler)
+  m_is_bulk_delete = false;
+
+  // m_handler must be NULL or point to _this_ handler instance
+  assert(m_thd_ndb->m_handler == NULL || m_thd_ndb->m_handler == this);
+
+  if (m_thd_ndb->m_handler &&
+      m_read_before_write_removal_possible)
   {
+    /*
+      This is an autocommit involving only one table and rbwr is on
+
+      Commit the autocommit transaction early(before the usual place
+      in ndbcluster_commit) in order to:
+      1) save one round trip, "no-commit+commit" converted to "commit"
+      2) return the correct number of updated and affected rows
+         to the delete loop(which will ask handler in rbwr mode)
+    */
+    DBUG_PRINT("info", ("committing auto-commit+rbwr early"));
     uint ignore_count= 0;
-    if (execute_no_commit(m_thd_ndb, m_thd_ndb->trans,
-                          m_ignore_no_key || m_read_before_write_removal_used,
-                          &ignore_count) != 0)
+    const int ignore_error= 1;
+    if (execute_commit(table->in_use, m_thd_ndb, trans,
+                       m_thd_ndb->m_force_send, ignore_error,
+                       &ignore_count) != 0)
     {
       no_uncommitted_rows_execute_failure();
-      m_is_bulk_delete = false;
-      DBUG_RETURN(ndb_err(m_thd_ndb->trans));
+      DBUG_RETURN(ndb_err(trans));
     }
+    DBUG_PRINT("info", ("ignore_count: %u", ignore_count));
     assert(m_rows_deleted >= ignore_count);
     m_rows_deleted-= ignore_count;
+    DBUG_RETURN(0);
   }
-  m_is_bulk_delete = false;
+
+  if (m_thd_ndb->m_unsent_bytes == 0)
+  {
+    DBUG_PRINT("exit", ("skip execute - no unsent bytes"));
+    DBUG_RETURN(0);
+  }
+
+  if (thd_allow_batch(table->in_use))
+  {
+    /*
+      Turned on by @@transaction_allow_batching=ON
+      or implicitly by slave exec thread
+    */
+    DBUG_PRINT("exit", ("skip execute - transaction_allow_batching is ON"));
+    DBUG_RETURN(0);
+  }
+
+  if (m_thd_ndb->m_handler)
+  {
+    // Execute at commit time(in 'ndbcluster_commit') to save a round trip
+    DBUG_PRINT("exit", ("skip execute - simple autocommit"));
+    DBUG_RETURN(0);
+  }
+
+  uint ignore_count= 0;
+  if (execute_no_commit(m_thd_ndb, trans,
+                        m_ignore_no_key || m_read_before_write_removal_used,
+                        &ignore_count) != 0)
+  {
+    no_uncommitted_rows_execute_failure();
+    DBUG_RETURN(ndb_err(trans));
+  }
+
+  assert(m_rows_deleted >= ignore_count);
+  m_rows_deleted-= ignore_count;
   DBUG_RETURN(0);
 }
 
@@ -5149,42 +5176,20 @@ int ha_ndbcluster::ndb_delete_row(const
 				 m_read_before_write_removal_used);
 
 #ifdef HAVE_NDB_BINLOG
-    uchar* ex_data_buffer= NULL;
-    /*
-      Room for 10 instruction words, two labels (@ 2words/label)
-      + 2 extra words for the case of resolve_size == 8
-    */
-    Uint32 buffer[16];
+    Uint32 buffer[ MAX_CONFLICT_INTERPRETED_PROG_SIZE ];
     NdbInterpretedCode code(m_table, buffer,
                             sizeof(buffer)/sizeof(buffer[0]));
     if (thd->slave_thread && m_share->m_cfn_share &&
-        (m_share->m_cfn_share->m_resolve_cft != CFT_NDB_UNDEF))
+        m_share->m_cfn_share->m_conflict_fn)
     {
       /* Conflict resolution in slave thread. */
-      enum_conflict_fn_type cft= m_share->m_cfn_share->m_resolve_cft;
-      if (!delete_row_conflict_fn(cft, record, &code))
-      {
-        options.optionsPresent|=NdbOperation::OperationOptions::OO_INTERPRETED;
-        options.interpretedCode= &code;
-        thd_ndb->m_conflict_fn_usage_count++;
-      }
-
-      Ndb_exceptions_data ex_data;
-      ex_data.share= m_share;
-      /*
-        We need to save the row data for possible conflict resolution after
-        execute().
-      */
-      ex_data.row= copy_row_to_buffer(thd_ndb, record);
-      ex_data_buffer= get_buffer(thd_ndb, sizeof(ex_data));
-      if (ex_data.row == NULL || ex_data_buffer == NULL)
-      {
-        DBUG_RETURN(HA_ERR_OUT_OF_MEM);
-      }
-      memcpy(ex_data_buffer, &ex_data, sizeof(ex_data));
-
-      options.optionsPresent|= NdbOperation::OperationOptions::OO_CUSTOMDATA;
-      options.customData= (void*)ex_data_buffer;
+      if (unlikely((error = prepare_conflict_detection(DELETE_ROW,
+                                                       key_rec,
+                                                       key_row, /* old_data */
+                                                       NULL,    /* new_data */
+                                                       &code,
+                                                       &options))))
+        DBUG_RETURN(error);
     }
 #endif /* HAVE_NDB_BINLOG */
     if (options.optionsPresent != 0)
@@ -6329,7 +6334,7 @@ ha_ndbcluster::flush_bulk_insert(bool al
     THD *thd= table->in_use;
     thd->transaction.all.modified_non_trans_table=
       thd->transaction.stmt.modified_non_trans_table= TRUE;
-    if (execute_commit(m_thd_ndb, trans, m_thd_ndb->m_force_send,
+    if (execute_commit(thd, m_thd_ndb, trans, m_thd_ndb->m_force_send,
                        m_ignore_no_key) != 0)
     {
       no_uncommitted_rows_execute_failure();
@@ -7094,9 +7099,10 @@ int ndbcluster_commit(handlerton *hton,
 
   if (thd->slave_thread)
   {
-    if (!thd_ndb->m_conflict_fn_usage_count || !thd_ndb->m_unsent_bytes ||
+    if (!g_ndb_slave_state.current_conflict_defined_op_count ||
+        !thd_ndb->m_unsent_bytes ||
         !(res= execute_no_commit(thd_ndb, trans, TRUE)))
-      res= execute_commit(thd_ndb, trans, 1, TRUE);
+      res= execute_commit(thd, thd_ndb, trans, 1, TRUE);
 
     update_slave_api_stats(thd_ndb->ndb);
   }
@@ -7105,61 +7111,21 @@ int ndbcluster_commit(handlerton *hton,
     if (thd_ndb->m_handler &&
         thd_ndb->m_handler->m_read_before_write_removal_possible)
     {
-#ifndef NDB_WITHOUT_READ_BEFORE_WRITE_REMOVAL
-      /* Autocommit with read-before-write removal
-       * Some operations in this autocommitted statement have not
-       * yet been executed
-       * They will be executed here as part of commit, and the results
-       * (rowcount, message) sent back to the client will then be modified 
-       * according to how the execution went.
-       * This saves a single roundtrip in the autocommit case
-       */
-      uint ignore_count= 0;
-      res= execute_commit(thd_ndb, trans, THDVAR(thd, force_send),
-                          TRUE, &ignore_count);
-      if (!res && ignore_count)
-      {
-        DBUG_PRINT("info", ("AutoCommit + RBW removal, ignore_count=%u",
-                            ignore_count));
-        /* We have some rows to ignore, modify recorded results,
-         * regenerate result message as required.
-         */
-        thd->row_count_func-= ignore_count;
-
-        ha_rows affected= 0;
-        char buff[ STRING_BUFFER_USUAL_SIZE ];
-        const char* msg= NULL;
-        if (thd->lex->sql_command == SQLCOM_DELETE)
-        {
-          assert(thd_ndb->m_handler->m_rows_deleted >= ignore_count);
-          affected= (thd_ndb->m_handler->m_rows_deleted-= ignore_count);
-        }
-        else
-        {
-          DBUG_PRINT("info", ("Update : message was %s", 
-                              thd->main_da.message()));
-          assert(thd_ndb->m_handler->m_rows_updated >= ignore_count);
-          affected= (thd_ndb->m_handler->m_rows_updated-= ignore_count);
-          /* For update in this scenario, we set found and changed to be 
-           * the same as affected
-           * Regenerate the update message
-           */
-          sprintf(buff, ER(ER_UPDATE_INFO), (ulong)affected, (ulong)affected,
-                  (ulong) thd->cuted_fields);
-          msg= buff;
-          DBUG_PRINT("info", ("Update : message changed to %s",
-                              msg));
-        }
-
-        /* Modify execution result + optionally message */
-        thd->main_da.modify_affected_rows(affected, msg);
+      /*
+        This is an autocommit involving only one table and
+        rbwr is on, thus the transaction has already been
+        committed in exec_bulk_update() or end_bulk_delete()
+      */
+      DBUG_PRINT("info", ("autocommit+rbwr, transaction already comitted"));
+      if (trans->commitStatus() != NdbTransaction::Committed)
+      {
+        sql_print_error("found uncomitted autocommit+rbwr transaction, "
+                        "commit status: %d", trans->commitStatus());
+        abort();
       }
-#else
-      abort(); // Should never come here without rbwr support
-#endif
     }
     else
-      res= execute_commit(thd_ndb, trans, THDVAR(thd, force_send), FALSE);
+      res= execute_commit(thd, thd_ndb, trans, THDVAR(thd, force_send), FALSE);
   }
 
   if (res != 0)
@@ -7233,9 +7199,8 @@ static int ndbcluster_rollback(handlerto
     DBUG_RETURN(0);
   }
   thd_ndb->save_point_count= 0;
-  thd_ndb->m_max_violation_count= 0;
-  thd_ndb->m_old_violation_count= 0;
-  thd_ndb->m_conflict_fn_usage_count= 0;
+  if (thd->slave_thread)
+    g_ndb_slave_state.atTransactionAbort();
   thd_ndb->m_unsent_bytes= 0;
   thd_ndb->m_execute_count++;
   DBUG_PRINT("info", ("execute_count: %u", thd_ndb->m_execute_count));
@@ -8316,6 +8281,34 @@ int ha_ndbcluster::create(const char *na
                         create_info->comment.length);
   const NDB_Modifier * mod_nologging = table_modifiers.get("NOLOGGING");
 
+#ifdef HAVE_NDB_BINLOG
+  /* Read ndb_replication entry for this table, if any */
+  Uint32 binlog_flags;
+  const st_conflict_fn_def* conflict_fn= NULL;
+  st_conflict_fn_arg args[MAX_CONFLICT_ARGS];
+  Uint32 num_args = MAX_CONFLICT_ARGS;
+
+  int rep_read_rc= ndbcluster_get_binlog_replication_info(thd,
+                                                          ndb,
+                                                          m_dbname,
+                                                          m_tabname,
+                                                          ::server_id,
+                                                          form,
+                                                          &binlog_flags,
+                                                          &conflict_fn,
+                                                          args,
+                                                          &num_args);
+  if (rep_read_rc != 0)
+  {
+    DBUG_RETURN(rep_read_rc);
+  }
+
+  /* Reset database name */
+  ndb->setDatabaseName(m_dbname);
+
+  /* Use ndb_replication information as required */
+#endif
+
   if ((dict->beginSchemaTrans() == -1))
   {
     DBUG_PRINT("info", ("Failed to start schema transaction"));
@@ -8445,6 +8438,20 @@ int ha_ndbcluster::create(const char *na
     goto abort_return;
   }
 
+  // Save the table level storage media setting
+  switch(create_info->storage_media)
+  {
+    case HA_SM_DISK:
+      tab.setStorageType(NdbDictionary::Column::StorageTypeDisk);
+      break;
+    case HA_SM_DEFAULT:
+      tab.setStorageType(NdbDictionary::Column::StorageTypeDefault);
+      break;
+    case HA_SM_MEMORY:
+      tab.setStorageType(NdbDictionary::Column::StorageTypeMemory);
+      break;
+  }
+
   DBUG_PRINT("info", ("Table %s is %s stored with tablespace %s",
                       m_tabname,
                       (use_disk) ? "disk" : "memory",
@@ -8730,8 +8737,18 @@ cleanup_failed:
     {
 #ifdef HAVE_NDB_BINLOG
       if (share)
-        ndbcluster_read_binlog_replication(thd, ndb, share, m_table,
-                                           ::server_id, form, TRUE);
+      {
+        /* Set the Binlogging information we retrieved above */
+        ndbcluster_apply_binlog_replication_info(thd,
+                                                 share,
+                                                 m_table,
+                                                 form,
+                                                 conflict_fn,
+                                                 args,
+                                                 num_args,
+                                                 TRUE, /* Do set binlog flags */
+                                                 binlog_flags);
+      }
 #endif
       String event_name(INJECTOR_EVENT_LEN);
       ndb_rep_event_name(&event_name, m_dbname, m_tabname,
@@ -10585,10 +10602,11 @@ int ndbcluster_find_all_files(THD *thd)
   DBUG_RETURN(-(skipped + unhandled));
 }
 
-int ndbcluster_find_files(handlerton *hton, THD *thd,
-                          const char *db,
-                          const char *path,
-                          const char *wild, bool dir, List<LEX_STRING> *files)
+
+static int
+ndbcluster_find_files(handlerton *hton, THD *thd,
+                      const char *db, const char *path,
+                      const char *wild, bool dir, List<LEX_STRING> *files)
 {
   DBUG_ENTER("ndbcluster_find_files");
   DBUG_PRINT("enter", ("db: %s", db));
@@ -10962,7 +10980,7 @@ static int ndbcluster_init(void *p)
     ndbcluster_binlog_init_handlerton();
     h->flags=            HTON_CAN_RECREATE | HTON_TEMPORARY_NOT_SUPPORTED;
     h->discover=         ndbcluster_discover;
-    h->find_files= ndbcluster_find_files;
+    h->find_files=       ndbcluster_find_files;
     h->table_exists_in_engine= ndbcluster_table_exists_in_engine;
   }
 

=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2011-06-15 10:37:56 +0000
+++ b/sql/ha_ndbcluster.h	2011-06-16 18:16:01 +0000
@@ -120,30 +120,98 @@ typedef enum {
   NSS_ALTERED 
 } NDB_SHARE_STATE;
 
-#ifdef HAVE_NDB_BINLOG
 enum enum_conflict_fn_type
 {
   CFT_NDB_UNDEF = 0
   ,CFT_NDB_MAX
   ,CFT_NDB_OLD
   ,CFT_NDB_MAX_DEL_WIN
+  ,CFT_NUMBER_OF_CFTS /* End marker */
+};
+
+#ifdef HAVE_NDB_BINLOG
+static const Uint32 MAX_CONFLICT_ARGS= 8;
+
+enum enum_conflict_fn_arg_type
+{
+  CFAT_END
+  ,CFAT_COLUMN_NAME
+};
+
+struct st_conflict_fn_arg
+{
+  enum_conflict_fn_arg_type type;
+  const char *ptr;
+  uint32 len;
+  uint32 fieldno; // CFAT_COLUMN_NAME
+};
+
+struct st_conflict_fn_arg_def
+{
+  enum enum_conflict_fn_arg_type arg_type;
+  bool optional;
+};
+
+/* What type of operation was issued */
+enum enum_conflicting_op_type
+{                /* NdbApi          */
+  WRITE_ROW,     /* insert (!write) */
+  UPDATE_ROW,    /* update          */
+  DELETE_ROW     /* delete          */
+};
+
+/*
+  prepare_detect_func
+
+  Type of function used to prepare for conflict detection on
+  an NdbApi operation
+*/
+typedef int (* prepare_detect_func) (struct st_ndbcluster_conflict_fn_share* cfn_share,
+                                     enum_conflicting_op_type op_type,
+                                     const uchar* old_data,
+                                     const uchar* new_data,
+                                     const MY_BITMAP* write_set,
+                                     NdbInterpretedCode* code);
+
+struct st_conflict_fn_def
+{
+  const char *name;
+  enum_conflict_fn_type type;
+  const st_conflict_fn_arg_def* arg_defs;
+  prepare_detect_func prep_func;
+};
+
+/* What sort of conflict was found */
+enum enum_conflict_cause
+{
+  ROW_ALREADY_EXISTS,
+  ROW_DOES_NOT_EXIST,
+  ROW_IN_CONFLICT
 };
 
 /* NdbOperation custom data which points out handler and record. */
 struct Ndb_exceptions_data {
   struct st_ndbcluster_share *share;
+  const NdbRecord* key_rec;
   const uchar* row;
+  enum_conflicting_op_type op_type;
+};
+
+enum enum_conflict_fn_flags
+{
+  CFF_NONE = 0
 };
 
 typedef struct st_ndbcluster_conflict_fn_share {
-  enum_conflict_fn_type m_resolve_cft;
+  const st_conflict_fn_def* m_conflict_fn;
 
   /* info about original table */
   uint8 m_pk_cols;
   uint8 m_resolve_column;
   uint8 m_resolve_size;
-  uint8 unused;
+  uint8 m_flags;
   uint16 m_offset[16];
+  uint16 m_resolve_offset;
 
   const NdbDictionary::Table *m_ex_tab;
   uint32 m_count;
@@ -269,6 +337,26 @@ inline my_bool get_binlog_use_update(NDB
 { return (share->flags & NSF_BINLOG_USE_UPDATE) != 0; }
 
 /*
+  State associated with the Slave thread
+  (From the Ndb handler's point of view)
+*/
+struct st_ndb_slave_state
+{
+  /* Counter values for current slave transaction */
+  Uint32 current_conflict_defined_op_count;
+  Uint32 current_violation_count[CFT_NUMBER_OF_CFTS];
+
+  /* Cumulative counter values */
+  Uint64 total_violation_count[CFT_NUMBER_OF_CFTS];
+
+  /* Methods */
+  void atTransactionCommit();
+  void atTransactionAbort();
+
+  st_ndb_slave_state();
+};
+
+/*
   Place holder for ha_ndbcluster thread specific data
 */
 
@@ -342,9 +430,6 @@ class Thd_ndb
   uint m_batch_size;
 
   uint m_execute_count;
-  uint m_max_violation_count;
-  uint m_old_violation_count;
-  uint m_conflict_fn_usage_count;
 
   uint m_scan_count;
   uint m_pruned_scan_count;
@@ -596,20 +681,12 @@ static void set_tabname(const char *path
 
 private:
 #ifdef HAVE_NDB_BINLOG
-  int delete_row_conflict_fn(enum_conflict_fn_type cft,
-                             const uchar *old_data,
-                             NdbInterpretedCode *);
-  int write_row_conflict_fn(enum_conflict_fn_type cft,
-                            uchar *data,
-                            NdbInterpretedCode *);
-  int update_row_conflict_fn(enum_conflict_fn_type cft,
-                             const uchar *old_data,
-                             uchar *new_data,
-                             NdbInterpretedCode *);
-  int row_conflict_fn_max(const uchar *new_data,
-                          NdbInterpretedCode *);
-  int row_conflict_fn_old(const uchar *old_data,
-                          NdbInterpretedCode *);
+  int prepare_conflict_detection(enum_conflicting_op_type op_type,
+                                 const NdbRecord* key_rec,
+                                 const uchar* old_data,
+                                 const uchar* new_data,
+                                 NdbInterpretedCode* code,
+                                 NdbOperation::OperationOptions* options);
 #endif
   void setup_key_ref_for_ndb_record(const NdbRecord **key_rec,
                                     const uchar **key_row,
@@ -913,8 +990,6 @@ private:
 
 int ndbcluster_discover(THD* thd, const char* dbname, const char* name,
                         const void** frmblob, uint* frmlen);
-int ndbcluster_find_files(THD *thd,const char *db,const char *path,
-                          const char *wild, bool dir, List<LEX_STRING> *files);
 int ndbcluster_table_exists_in_engine(THD* thd,
                                       const char *db, const char *name);
 void ndbcluster_print_error(int error, const NdbOperation *error_op);

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2011-06-15 10:37:56 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2011-06-16 18:16:01 +0000
@@ -3759,18 +3759,13 @@ inline void slave_reset_conflict_fn(NDB_
   }
 }
 
-static int
-slave_set_resolve_fn(THD *thd, NDB_SHARE *share,
-                     const NDBTAB *ndbtab, uint field_index,
-                     enum_conflict_fn_type type, TABLE *table)
+static uint
+slave_check_resolve_col_type(const NDBTAB *ndbtab,
+                             uint field_index)
 {
-  DBUG_ENTER("slave_set_resolve_fn");
-
-  Thd_ndb *thd_ndb= get_thd_ndb(thd);
-  Ndb *ndb= thd_ndb->ndb;
-  NDBDICT *dict= ndb->getDictionary();
+  DBUG_ENTER("slave_check_resolve_col_type");
   const NDBCOL *c= ndbtab->getColumn(field_index);
-  uint sz;
+  uint sz= 0;
   switch (c->getType())
   {
   case  NDBCOL::Unsigned:
@@ -3786,11 +3781,24 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
   default:
     DBUG_PRINT("info", ("resolve column %u has wrong type",
                         field_index));
-    slave_reset_conflict_fn(share);
-    DBUG_RETURN(-1);
     break;
   }
+  DBUG_RETURN(sz);
+}
 
+static int
+slave_set_resolve_fn(THD *thd, NDB_SHARE *share,
+                     const NDBTAB *ndbtab, uint field_index,
+                     uint resolve_col_sz,
+                     const st_conflict_fn_def* conflict_fn,
+                     TABLE *table,
+                     uint8 flags)
+{
+  DBUG_ENTER("slave_set_resolve_fn");
+
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+  Ndb *ndb= thd_ndb->ndb;
+  NDBDICT *dict= ndb->getDictionary();
   NDB_CONFLICT_FN_SHARE *cfn_share= share->m_cfn_share;
   if (cfn_share == NULL)
   {
@@ -3798,9 +3806,14 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
       alloc_root(&share->mem_root, sizeof(NDB_CONFLICT_FN_SHARE));
     slave_reset_conflict_fn(share);
   }
-  cfn_share->m_resolve_size= sz;
+  cfn_share->m_conflict_fn= conflict_fn;
+
+  /* Calculate resolve col stuff (if relevant) */
+  cfn_share->m_resolve_size= resolve_col_sz;
   cfn_share->m_resolve_column= field_index;
-  cfn_share->m_resolve_cft= type;
+  cfn_share->m_resolve_offset= (uint16)(table->field[field_index]->ptr -
+                                        table->record[0]);
+  cfn_share->m_flags = flags;
 
   {
     /* get exceptions table */
@@ -3859,7 +3872,10 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
           cfn_share->m_pk_cols= nkey;
           ndbtab_g.release();
           if (opt_ndb_extra_logging)
-            sql_print_information("NDB Slave: log exceptions to %s",
+            sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s",
+                                  table->s->db.str,
+                                  table->s->table_name.str,
+                                  table->s->db.str,
                                   ex_tab_name);
         }
         else
@@ -3876,76 +3892,307 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
   DBUG_RETURN(0);
 }
 
-enum enum_conflict_fn_arg_type
+/**
+  CFT_NDB_OLD
+
+  To perform conflict detection, an interpreted program is used to read
+  the timestamp stored locally and compare to what was on the master.
+  If timestamp is not equal, an error for this operation (9998) will be raised,
+  and new row will not be applied. The error codes for the operations will
+  be checked on return.  For this to work is is vital that the operation
+  is run with ignore error option.
+
+  As an independent feature, phase 2 also saves the
+  conflicts into the table's exceptions table.
+*/
+int
+row_conflict_fn_old(st_ndbcluster_conflict_fn_share* cfn_share,
+                    enum_conflicting_op_type op_type,
+                    const uchar* old_data,
+                    const uchar* new_data,
+                    const MY_BITMAP* write_set,
+                    NdbInterpretedCode* code)
 {
-  CFAT_END
-  ,CFAT_COLUMN_NAME
-};
-struct st_conflict_fn_arg
+  DBUG_ENTER("row_conflict_fn_old");
+  uint32 resolve_column= cfn_share->m_resolve_column;
+  uint32 resolve_size= cfn_share->m_resolve_size;
+  const uchar* field_ptr = old_data + cfn_share->m_resolve_offset;
+
+  assert((resolve_size == 4) || (resolve_size == 8));
+
+  if (unlikely(!bitmap_is_set(write_set, resolve_column)))
+  {
+    sql_print_information("NDB Slave: missing data for %s",
+                          cfn_share->m_conflict_fn->name);
+    DBUG_RETURN(1);
+  }
+
+  const uint label_0= 0;
+  const Uint32 RegOldValue= 1, RegCurrentValue= 2;
+  int r;
+
+  DBUG_PRINT("info",
+             ("Adding interpreted filter, existing value must eq event old value"));
+  /*
+   * read old value from record
+   */
+  union {
+    uint32 old_value_32;
+    uint64 old_value_64;
+  };
+  {
+    if (resolve_size == 4)
+    {
+      memcpy(&old_value_32, field_ptr, resolve_size);
+      DBUG_PRINT("info", ("  old_value_32: %u", old_value_32));
+    }
+    else
+    {
+      memcpy(&old_value_64, field_ptr, resolve_size);
+      DBUG_PRINT("info", ("  old_value_64: %llu",
+                          (unsigned long long) old_value_64));
+    }
+  }
+
+  /*
+   * Load registers RegOldValue and RegCurrentValue
+   */
+  if (resolve_size == 4)
+    r= code->load_const_u32(RegOldValue, old_value_32);
+  else
+    r= code->load_const_u64(RegOldValue, old_value_64);
+  DBUG_ASSERT(r == 0);
+  r= code->read_attr(RegCurrentValue, resolve_column);
+  DBUG_ASSERT(r == 0);
+  /*
+   * if RegOldValue == RegCurrentValue goto label_0
+   * else raise error for this row
+   */
+  r= code->branch_eq(RegOldValue, RegCurrentValue, label_0);
+  DBUG_ASSERT(r == 0);
+  r= code->interpret_exit_nok(error_conflict_fn_violation);
+  DBUG_ASSERT(r == 0);
+  r= code->def_label(label_0);
+  DBUG_ASSERT(r == 0);
+  r= code->interpret_exit_ok();
+  DBUG_ASSERT(r == 0);
+  r= code->finalise();
+  DBUG_ASSERT(r == 0);
+  DBUG_RETURN(r);
+}
+
+int
+row_conflict_fn_max_update_only(st_ndbcluster_conflict_fn_share* cfn_share,
+                                enum_conflicting_op_type op_type,
+                                const uchar* old_data,
+                                const uchar* new_data,
+                                const MY_BITMAP* write_set,
+                                NdbInterpretedCode* code)
 {
-  enum_conflict_fn_arg_type type;
-  const char *ptr;
-  uint32 len;
-  uint32 fieldno; // CFAT_COLUMN_NAME
+  DBUG_ENTER("row_conflict_fn_max_update_only");
+  uint32 resolve_column= cfn_share->m_resolve_column;
+  uint32 resolve_size= cfn_share->m_resolve_size;
+  const uchar* field_ptr = new_data + cfn_share->m_resolve_offset;
+
+  assert((resolve_size == 4) || (resolve_size == 8));
+
+  if (unlikely(!bitmap_is_set(write_set, resolve_column)))
+  {
+    sql_print_information("NDB Slave: missing data for %s",
+                          cfn_share->m_conflict_fn->name);
+    DBUG_RETURN(1);
+  }
+
+  const uint label_0= 0;
+  const Uint32 RegNewValue= 1, RegCurrentValue= 2;
+  int r;
+
+  DBUG_PRINT("info",
+             ("Adding interpreted filter, existing value must be lt event new"));
+  /*
+   * read new value from record
+   */
+  union {
+    uint32 new_value_32;
+    uint64 new_value_64;
+  };
+  {
+    if (resolve_size == 4)
+    {
+      memcpy(&new_value_32, field_ptr, resolve_size);
+      DBUG_PRINT("info", ("  new_value_32: %u", new_value_32));
+    }
+    else
+    {
+      memcpy(&new_value_64, field_ptr, resolve_size);
+      DBUG_PRINT("info", ("  new_value_64: %llu",
+                          (unsigned long long) new_value_64));
+    }
+  }
+  /*
+   * Load registers RegNewValue and RegCurrentValue
+   */
+  if (resolve_size == 4)
+    r= code->load_const_u32(RegNewValue, new_value_32);
+  else
+    r= code->load_const_u64(RegNewValue, new_value_64);
+  DBUG_ASSERT(r == 0);
+  r= code->read_attr(RegCurrentValue, resolve_column);
+  DBUG_ASSERT(r == 0);
+  /*
+   * if RegNewValue > RegCurrentValue goto label_0
+   * else raise error for this row
+   */
+  r= code->branch_gt(RegNewValue, RegCurrentValue, label_0);
+  DBUG_ASSERT(r == 0);
+  r= code->interpret_exit_nok(error_conflict_fn_violation);
+  DBUG_ASSERT(r == 0);
+  r= code->def_label(label_0);
+  DBUG_ASSERT(r == 0);
+  r= code->interpret_exit_ok();
+  DBUG_ASSERT(r == 0);
+  r= code->finalise();
+  DBUG_ASSERT(r == 0);
+  DBUG_RETURN(r);
+}
+
+/**
+  CFT_NDB_MAX
+
+  To perform conflict resolution, an interpreted program is used to read
+  the timestamp stored locally and compare to what is going to be applied.
+  If timestamp is lower, an error for this operation (9999) will be raised,
+  and new row will not be applied. The error codes for the operations will
+  be checked on return.  For this to work is is vital that the operation
+  is run with ignore error option.
+
+  Note that for delete, this algorithm reverts to the OLD algorithm.
+*/
+int
+row_conflict_fn_max(st_ndbcluster_conflict_fn_share* cfn_share,
+                    enum_conflicting_op_type op_type,
+                    const uchar* old_data,
+                    const uchar* new_data,
+                    const MY_BITMAP* write_set,
+                    NdbInterpretedCode* code)
+{
+  switch(op_type)
+  {
+  case WRITE_ROW:
+    abort();
+    return 1;
+  case UPDATE_ROW:
+    return row_conflict_fn_max_update_only(cfn_share,
+                                           op_type,
+                                           old_data,
+                                           new_data,
+                                           write_set,
+                                           code);
+  case DELETE_ROW:
+    /* Can't use max of new image, as there's no new image
+     * for DELETE
+     * Use OLD instead
+     */
+    return row_conflict_fn_old(cfn_share,
+                               op_type,
+                               old_data,
+                               new_data,
+                               write_set,
+                               code);
+  default:
+    abort();
+    return 1;
+  }
+}
+
+
+/**
+  CFT_NDB_MAX_DEL_WIN
+
+  To perform conflict resolution, an interpreted program is used to read
+  the timestamp stored locally and compare to what is going to be applied.
+  If timestamp is lower, an error for this operation (9999) will be raised,
+  and new row will not be applied. The error codes for the operations will
+  be checked on return.  For this to work is is vital that the operation
+  is run with ignore error option.
+
+  In this variant, replicated DELETEs alway succeed - no filter is added
+  to them.
+*/
+
+int
+row_conflict_fn_max_del_win(st_ndbcluster_conflict_fn_share* cfn_share,
+                            enum_conflicting_op_type op_type,
+                            const uchar* old_data,
+                            const uchar* new_data,
+                            const MY_BITMAP* write_set,
+                            NdbInterpretedCode* code)
+{
+  switch(op_type)
+  {
+  case WRITE_ROW:
+    abort();
+    return 1;
+  case UPDATE_ROW:
+    return row_conflict_fn_max_update_only(cfn_share,
+                                           op_type,
+                                           old_data,
+                                           new_data,
+                                           write_set,
+                                           code);
+  case DELETE_ROW:
+    /* This variant always lets a received DELETE_ROW
+     * succeed.
+     */
+    return 1;
+  default:
+    abort();
+    return 1;
+  }
 };
-struct st_conflict_fn_def
+
+static const st_conflict_fn_arg_def resolve_col_args[]=
 {
-  const char *name;
-  enum_conflict_fn_type type;
-  enum enum_conflict_fn_arg_type arg_type;
+  /* Arg type              Optional */
+  { CFAT_COLUMN_NAME,      false },
+  { CFAT_END,              false }
 };
-static struct st_conflict_fn_def conflict_fns[]=
+
+static const st_conflict_fn_def conflict_fns[]=
 {
-   { "NDB$MAX_DELETE_WIN", CFT_NDB_MAX_DEL_WIN, CFAT_COLUMN_NAME }
-  ,{ NULL,                 CFT_NDB_MAX_DEL_WIN, CFAT_END }
-  ,{ "NDB$MAX", CFT_NDB_MAX,   CFAT_COLUMN_NAME }
-  ,{ NULL,      CFT_NDB_MAX,   CFAT_END }
-  ,{ "NDB$OLD", CFT_NDB_OLD,   CFAT_COLUMN_NAME }
-  ,{ NULL,      CFT_NDB_OLD,   CFAT_END }
+  { "NDB$MAX_DELETE_WIN", CFT_NDB_MAX_DEL_WIN,
+    &resolve_col_args[0], row_conflict_fn_max_del_win },
+  { "NDB$MAX",            CFT_NDB_MAX,
+    &resolve_col_args[0], row_conflict_fn_max         },
+  { "NDB$OLD",            CFT_NDB_OLD,
+    &resolve_col_args[0], row_conflict_fn_old         },
 };
+
 static unsigned n_conflict_fns=
-sizeof(conflict_fns) / sizeof(struct st_conflict_fn_def);
+  sizeof(conflict_fns) / sizeof(struct st_conflict_fn_def);
 
-static int
-set_conflict_fn(THD *thd, NDB_SHARE *share,
-                const NDBTAB *ndbtab,
-                const NDBCOL *conflict_col,
-                char *conflict_fn,
-                char *msg, uint msg_len,
-                TABLE *table)
-{
-  DBUG_ENTER("set_conflict_fn");
-  uint len= 0;
-  switch (conflict_col->getArrayType())
-  {
-  case NDBCOL::ArrayTypeShortVar:
-    len= *(uchar*)conflict_fn;
-    conflict_fn++;
-    break;
-  case NDBCOL::ArrayTypeMediumVar:
-    len= uint2korr(conflict_fn);
-    conflict_fn+= 2;
-    break;
-  default:
-    break;
-  }
-  conflict_fn[len]= '\0';
-  const char *ptr= conflict_fn;
+
+int
+parse_conflict_fn_spec(const char* conflict_fn_spec,
+                       const st_conflict_fn_def** conflict_fn,
+                       st_conflict_fn_arg* args,
+                       Uint32* max_args,
+                       const TABLE* table,
+                       char *msg, uint msg_len)
+{
+  DBUG_ENTER("parse_conflict_fn_spec");
+
+  Uint32 no_args = 0;
+  const char *ptr= conflict_fn_spec;
   const char *error_str= "unknown conflict resolution function";
   /* remove whitespace */
   while (*ptr == ' ' && *ptr != '\0') ptr++;
 
-  const unsigned MAX_ARGS= 8;
-  unsigned no_args= 0;
-  struct st_conflict_fn_arg args[MAX_ARGS];
-
-  DBUG_PRINT("info", ("parsing %s", conflict_fn));
+  DBUG_PRINT("info", ("parsing %s", conflict_fn_spec));
 
   for (unsigned i= 0; i < n_conflict_fns; i++)
   {
-    struct st_conflict_fn_def &fn= conflict_fns[i];
-    if (fn.name == NULL)
-      continue;
+    const st_conflict_fn_def &fn= conflict_fns[i];
 
     uint len= strlen(fn.name);
     if (strncmp(ptr, fn.name, len))
@@ -3971,9 +4218,16 @@ set_conflict_fn(THD *thd, NDB_SHARE *sha
     /* find all arguments */
     for (;;)
     {
+      if (no_args >= *max_args)
+      {
+        error_str= "too many arguments";
+        DBUG_PRINT("info", ("parse error %s", error_str));
+        break;
+      }
+
       /* expected type */
       enum enum_conflict_fn_arg_type type=
-        conflict_fns[i+no_args].arg_type;
+        conflict_fns[i].arg_defs[no_args].arg_type;
 
       /* remove whitespace */
       while (*ptr == ' ' && *ptr != '\0') ptr++;
@@ -3986,16 +4240,30 @@ set_conflict_fn(THD *thd, NDB_SHARE *sha
       }
 
       /* arg */
+      /* Todo : Should support comma as an arg separator? */
       const char *start_arg= ptr;
       while (*ptr != ')' && *ptr != ' ' && *ptr != '\0') ptr++;
       const char *end_arg= ptr;
 
+      bool optional_arg = conflict_fns[i].arg_defs[no_args].optional;
       /* any arg given? */
       if (start_arg == end_arg)
       {
-        error_str= "missing function argument";
-        DBUG_PRINT("info", ("parse error %s", error_str));
-        break;
+        if (!optional_arg)
+        {
+          error_str= "missing function argument";
+          DBUG_PRINT("info", ("parse error %s", error_str));
+          break;
+        }
+        else
+        {
+          /* Arg was optional, and not present
+           * Must be at end of args, finish parsing
+           */
+          args[no_args].type= CFAT_END;
+          error_str= NULL;
+          break;
+        }
       }
 
       uint len= end_arg - start_arg;
@@ -4006,6 +4274,7 @@ set_conflict_fn(THD *thd, NDB_SHARE *sha
  
       DBUG_PRINT("info", ("found argument %s %u", start_arg, len));
 
+      bool arg_processing_error = false;
       switch (type)
       {
       case CFAT_COLUMN_NAME:
@@ -4030,6 +4299,8 @@ set_conflict_fn(THD *thd, NDB_SHARE *sha
         abort();
       }
 
+      if (arg_processing_error)
+        break;
       no_args++;
     }
 
@@ -4057,43 +4328,86 @@ set_conflict_fn(THD *thd, NDB_SHARE *sha
       break;
     }
 
-    /* setup the function */
-    switch (fn.type)
-    {
-    case CFT_NDB_MAX:
-    case CFT_NDB_OLD:
-    case CFT_NDB_MAX_DEL_WIN:
-      if (args[0].fieldno == (uint32)-1)
-        break;
-      if (slave_set_resolve_fn(thd, share, ndbtab, args[0].fieldno,
-                               fn.type, table))
-      {
-        /* wrong data type */
-        my_snprintf(msg, msg_len,
-                 "column '%s' has wrong datatype",
-                 table->s->field[args[0].fieldno]->field_name);
-        DBUG_PRINT("info", ("%s", msg));
-        DBUG_RETURN(-1);
-      }
-      if (opt_ndb_extra_logging)
-      {
-        sql_print_information("NDB Slave: conflict_fn %s on attribute %s",
-                              fn.name,
-                              table->s->field[args[0].fieldno]->field_name);
-      }
-      break;
-    case CFT_NDB_UNDEF:
-      abort();
-    }
+    /* Update ptrs to conflict fn + # of args */
+    *conflict_fn = &conflict_fns[i];
+    *max_args = no_args;
+
     DBUG_RETURN(0);
   }
   /* parse error */
   my_snprintf(msg, msg_len, "%s, %s at '%s'",
-           conflict_fn, error_str, ptr);
+           conflict_fn_spec, error_str, ptr);
   DBUG_PRINT("info", ("%s", msg));
   DBUG_RETURN(-1);
 }
 
+static int
+setup_conflict_fn(THD *thd, NDB_SHARE *share,
+                  const NDBTAB *ndbtab,
+                  char *msg, uint msg_len,
+                  TABLE *table,
+                  const st_conflict_fn_def* conflict_fn,
+                  const st_conflict_fn_arg* args,
+                  const Uint32 num_args)
+{
+  DBUG_ENTER("setup_conflict_fn");
+
+  /* setup the function */
+  switch (conflict_fn->type)
+  {
+  case CFT_NDB_MAX:
+  case CFT_NDB_OLD:
+  case CFT_NDB_MAX_DEL_WIN:
+  {
+    if (num_args != 1)
+    {
+      my_snprintf(msg, msg_len,
+                  "Incorrect arguments to conflict function");
+      DBUG_PRINT("info", ("%s", msg));
+      DBUG_RETURN(-1);
+    }
+
+    uint resolve_col_sz= 0;
+
+    if (0 == (resolve_col_sz =
+              slave_check_resolve_col_type(ndbtab, args[0].fieldno)))
+    {
+      /* wrong data type */
+      slave_reset_conflict_fn(share);
+      my_snprintf(msg, msg_len,
+                  "column '%s' has wrong datatype",
+                  table->s->field[args[0].fieldno]->field_name);
+      DBUG_PRINT("info", ("%s", msg));
+      DBUG_RETURN(-1);
+    }
+
+    if (slave_set_resolve_fn(thd, share, ndbtab,
+                             args[0].fieldno, resolve_col_sz,
+                             conflict_fn, table, CFF_NONE))
+    {
+      my_snprintf(msg, msg_len,
+                  "unable to setup conflict resolution using column '%s'",
+                  table->s->field[args[0].fieldno]->field_name);
+      DBUG_PRINT("info", ("%s", msg));
+      DBUG_RETURN(-1);
+    }
+    if (opt_ndb_extra_logging)
+    {
+       sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.",
+                             table->s->db.str,
+                             table->s->table_name.str,
+                             conflict_fn->name,
+                             table->s->field[args[0].fieldno]->field_name);
+    }
+    break;
+  }
+  case CFT_NUMBER_OF_CFTS:
+  case CFT_NDB_UNDEF:
+    abort();
+  }
+  DBUG_RETURN(0);
+}
+
 static const char *ndb_rep_db= NDB_REP_DB;
 static const char *ndb_replication_table= NDB_REPLICATION_TABLE;
 static const char *nrt_db= "db";
@@ -4101,45 +4415,30 @@ static const char *nrt_table_name= "tabl
 static const char *nrt_server_id= "server_id";
 static const char *nrt_binlog_type= "binlog_type";
 static const char *nrt_conflict_fn= "conflict_fn";
+
+/*
+   ndbcluster_read_replication_table
+
+   This function reads the information for the supplied table from
+   the mysql.ndb_replication table.
+   Where there is no information (or no table), defaults are
+   returned.
+*/
 int
-ndbcluster_read_binlog_replication(THD *thd, Ndb *ndb,
-                                   NDB_SHARE *share,
-                                   const NDBTAB *ndbtab,
-                                   uint server_id,
-                                   TABLE *table,
-                                   bool do_set_binlog_flags)
+ndbcluster_read_replication_table(THD *thd, Ndb *ndb,
+                                  const char* db,
+                                  const char* table_name,
+                                  uint server_id,
+                                  Uint32* binlog_flags,
+                                  char** conflict_fn_spec,
+                                  char* conflict_fn_buffer,
+                                  Uint32 conflict_fn_buffer_len)
 {
-  DBUG_ENTER("ndbcluster_read_binlog_replication");
-  const char *db= share->db;
-  const char *table_name= share->table_name;
+  DBUG_ENTER("ndbcluster_read_replication_table");
   NdbError ndberror;
   int error= 0;
   const char *error_str= "<none>";
 
-  /* Override for ndb_apply_status when logging */
-  if (opt_ndb_log_apply_status &&
-      do_set_binlog_flags)
-  {
-    if (strcmp(db, NDB_REP_DB) == 0 &&
-        strcmp(table_name, NDB_APPLY_TABLE) == 0)
-    {
-      /*
-        Ensure that we get all columns from ndb_apply_status updates
-        by forcing FULL event type
-        Also, ensure that ndb_apply_status events are always logged as 
-        WRITES.
-        This is needed so that received ndb_apply_status WRITES can be
-        cleanly propagated.
-      */
-      DBUG_PRINT("info", ("ndb_apply_status defaulting to FULL, USE_WRITE"));
-      sql_print_information("NDB : ndb-log-apply-status forcing "
-                            "%s.%s to FULL USE_WRITE",
-                            NDB_REP_DB, NDB_APPLY_TABLE);
-      set_binlog_flags(share, NBT_FULL);
-      DBUG_RETURN(0);
-    }
-  }
-
   ndb->setDatabaseName(ndb_rep_db);
   NDBDICT *dict= ndb->getDictionary();
   Ndb_table_guard ndbtab_g(dict, ndb_replication_table);
@@ -4149,8 +4448,8 @@ ndbcluster_read_binlog_replication(THD *
        dict->getNdbError().code == 4009))
   {
     DBUG_PRINT("info", ("No %s.%s table", ndb_rep_db, ndb_replication_table));
-    if (do_set_binlog_flags)
-      set_binlog_flags(share, NBT_DEFAULT);
+    *binlog_flags= NBT_DEFAULT;
+    *conflict_fn_spec= NULL;
     DBUG_RETURN(0);
   }
   const NDBCOL
@@ -4216,6 +4515,10 @@ ndbcluster_read_binlog_replication(THD *
     char *ndb_conflict_fn[2]= {ndb_conflict_fn_buf, ndb_conflict_fn_buf+sz};
     NdbOperation *op[2];
     uint32 i, id= 0;
+    /* Read generic row (server_id==0) and specific row (server_id == our id)
+     * from ndb_replication.
+     * Specific overrides generic, if present
+     */
     for (i= 0; i < 2; i++)
     {
       NdbOperation *_op;
@@ -4275,39 +4578,70 @@ ndbcluster_read_binlog_replication(THD *
     if (col_binlog_type_rec_attr[1] == NULL ||
         col_binlog_type_rec_attr[1]->isNULL())
     {
+      /* No specific value, use generic */
       col_binlog_type_rec_attr[1]= col_binlog_type_rec_attr[0];
       ndb_binlog_type[1]= ndb_binlog_type[0];
     }
     if (col_conflict_fn_rec_attr[1] == NULL ||
         col_conflict_fn_rec_attr[1]->isNULL())
     {
+      /* No specific value, use generic */
       col_conflict_fn_rec_attr[1]= col_conflict_fn_rec_attr[0];
       ndb_conflict_fn[1]= ndb_conflict_fn[0];
     }
 
-    if (do_set_binlog_flags)
+    if (col_binlog_type_rec_attr[1] == NULL ||
+        col_binlog_type_rec_attr[1]->isNULL())
     {
-      if (col_binlog_type_rec_attr[1] == NULL ||
-          col_binlog_type_rec_attr[1]->isNULL())
-        set_binlog_flags(share, NBT_DEFAULT);
-      else
-        set_binlog_flags(share, (enum Ndb_binlog_type) ndb_binlog_type[1]);
+      DBUG_PRINT("info", ("No binlog flag value, using default"));
+      /* No value */
+      *binlog_flags= NBT_DEFAULT;
+    }
+    else
+    {
+      DBUG_PRINT("info", ("Taking binlog flag value from the table"));
+      *binlog_flags= (enum Ndb_binlog_type) ndb_binlog_type[1];
     }
-    if (table)
+
+    if (col_conflict_fn_rec_attr[1] == NULL ||
+        col_conflict_fn_rec_attr[1]->isNULL())
     {
-      if (col_conflict_fn_rec_attr[1] == NULL ||
-          col_conflict_fn_rec_attr[1]->isNULL())
-        slave_reset_conflict_fn(share); /* no conflict_fn */
-      else if (set_conflict_fn(thd, share, ndbtab,
-                               col_conflict_fn, ndb_conflict_fn[1],
-                               tmp_buf, sizeof(tmp_buf), table))
+      /* No conflict function */
+      *conflict_fn_spec = NULL;
+    }
+    else
+    {
+      const char* conflict_fn = ndb_conflict_fn[1];
+      uint len= 0;
+      switch (col_conflict_fn->getArrayType())
+      {
+      case NDBCOL::ArrayTypeShortVar:
+        len= *(uchar*)conflict_fn;
+        conflict_fn++;
+        break;
+      case NDBCOL::ArrayTypeMediumVar:
+        len= uint2korr(conflict_fn);
+        conflict_fn+= 2;
+        break;
+      default:
+        abort();
+      }
+      if ((len + 1) > conflict_fn_buffer_len)
       {
-        error_str= tmp_buf;
-        error= 1;
         ndb->closeTransaction(trans);
+        error= -2;
+        error_str= "Conflict function specification too long.";
         goto err;
       }
+      memcpy(conflict_fn_buffer, conflict_fn, len);
+      conflict_fn_buffer[len] = '\0';
+      *conflict_fn_spec = conflict_fn_buffer;
     }
+
+    DBUG_PRINT("info", ("Retrieved Binlog flags : %u and function spec : %s",
+                        *binlog_flags, (*conflict_fn_spec != NULL ?*conflict_fn_spec:
+                                       "NULL")));
+
     ndb->closeTransaction(trans);
 
     DBUG_RETURN(0);
@@ -4316,14 +4650,7 @@ ndbcluster_read_binlog_replication(THD *
 err:
   DBUG_PRINT("info", ("error %d, error_str %s, ndberror.code %u",
                       error, error_str, ndberror.code));
-  if (error > 0)
-  {
-    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                        ER_CONFLICT_FN_PARSE_ERROR,
-                        ER(ER_CONFLICT_FN_PARSE_ERROR),
-                        error_str);
-  }
-  else if (error < 0)
+  if (error < 0)
   {
     char msg[FN_REFLEN];
     switch (error)
@@ -4355,12 +4682,192 @@ err:
                         ER(ER_ILLEGAL_HA_CREATE_OPTION),
                         ndbcluster_hton_name, msg);  
   }
-  if (do_set_binlog_flags)
-    set_binlog_flags(share, NBT_DEFAULT);
+  *binlog_flags= NBT_DEFAULT;
+  *conflict_fn_spec= NULL;
+
   if (ndberror.code && opt_ndb_extra_logging)
     print_warning_list("NDB", thd_warn_list(thd));
   DBUG_RETURN(ndberror.code);
 }
+
+/*
+  ndbcluster_get_binlog_replication_info
+
+  This function retrieves the data for the given table
+  from the ndb_replication table.
+
+  If the table is not found, or the table does not exist,
+  then defaults are returned.
+*/
+int
+ndbcluster_get_binlog_replication_info(THD *thd, Ndb *ndb,
+                                       const char* db,
+                                       const char* table_name,
+                                       uint server_id,
+                                       const TABLE *table,
+                                       Uint32* binlog_flags,
+                                       const st_conflict_fn_def** conflict_fn,
+                                       st_conflict_fn_arg* args,
+                                       Uint32* num_args)
+{
+  DBUG_ENTER("ndbcluster_get_binlog_replication_info");
+
+  /* Override for ndb_apply_status when logging */
+  if (opt_ndb_log_apply_status)
+  {
+    if (strcmp(db, NDB_REP_DB) == 0 &&
+        strcmp(table_name, NDB_APPLY_TABLE) == 0)
+    {
+      /*
+        Ensure that we get all columns from ndb_apply_status updates
+        by forcing FULL event type
+        Also, ensure that ndb_apply_status events are always logged as
+        WRITES.
+      */
+      DBUG_PRINT("info", ("ndb_apply_status defaulting to FULL, USE_WRITE"));
+      sql_print_information("NDB : ndb-log-apply-status forcing "
+                            "%s.%s to FULL USE_WRITE",
+                            NDB_REP_DB, NDB_APPLY_TABLE);
+      *binlog_flags = NBT_FULL;
+      *conflict_fn = NULL;
+      *num_args = 0;
+      DBUG_RETURN(0);
+    }
+  }
+
+  const Uint32 MAX_CONFLICT_FN_SPEC_LEN = 256;
+  char conflict_fn_buffer[MAX_CONFLICT_FN_SPEC_LEN];
+  char* conflict_fn_spec;
+
+  if (ndbcluster_read_replication_table(thd,
+                                        ndb,
+                                        db,
+                                        table_name,
+                                        server_id,
+                                        binlog_flags,
+                                        &conflict_fn_spec,
+                                        conflict_fn_buffer,
+                                        MAX_CONFLICT_FN_SPEC_LEN) != 0)
+  {
+    DBUG_RETURN(ER_NDB_REPLICATION_SCHEMA_ERROR);
+  }
+
+  if (table != NULL)
+  {
+    if (conflict_fn_spec != NULL)
+    {
+      char tmp_buf[FN_REFLEN];
+
+      if (parse_conflict_fn_spec(conflict_fn_spec,
+                                 conflict_fn,
+                                 args,
+                                 num_args,
+                                 table,
+                                 tmp_buf,
+                                 sizeof(tmp_buf)) != 0)
+      {
+        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                            ER_CONFLICT_FN_PARSE_ERROR,
+                            ER(ER_CONFLICT_FN_PARSE_ERROR),
+                            tmp_buf);
+        DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR);
+      }
+    }
+    else
+    {
+      /* No conflict function specified */
+      conflict_fn= NULL;
+      num_args= 0;
+    }
+  }
+
+  DBUG_RETURN(0);
+}
+
+int
+ndbcluster_apply_binlog_replication_info(THD *thd,
+                                         NDB_SHARE *share,
+                                         const NDBTAB* ndbtab,
+                                         TABLE* table,
+                                         const st_conflict_fn_def* conflict_fn,
+                                         const st_conflict_fn_arg* args,
+                                         Uint32 num_args,
+                                         bool do_set_binlog_flags,
+                                         Uint32 binlog_flags)
+{
+  DBUG_ENTER("ndbcluster_apply_binlog_replication_info");
+  char tmp_buf[FN_REFLEN];
+
+  if (do_set_binlog_flags)
+  {
+    DBUG_PRINT("info", ("Setting binlog flags to %u", binlog_flags));
+    set_binlog_flags(share, (enum Ndb_binlog_type)binlog_flags);
+  }
+
+  if (conflict_fn != NULL)
+  {
+    if (setup_conflict_fn(thd, share,
+                          ndbtab,
+                          tmp_buf, sizeof(tmp_buf),
+                          table,
+                          conflict_fn,
+                          args,
+                          num_args) != 0)
+    {
+      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                          ER_CONFLICT_FN_PARSE_ERROR,
+                          ER(ER_CONFLICT_FN_PARSE_ERROR),
+                          tmp_buf);
+      DBUG_RETURN(-1);
+    }
+  }
+  else
+  {
+    /* No conflict function specified */
+    slave_reset_conflict_fn(share);
+  }
+
+  DBUG_RETURN(0);
+}
+
+int
+ndbcluster_read_binlog_replication(THD *thd, Ndb *ndb,
+                                   NDB_SHARE *share,
+                                   const NDBTAB *ndbtab,
+                                   uint server_id,
+                                   TABLE *table,
+                                   bool do_set_binlog_flags)
+{
+  DBUG_ENTER("ndbcluster_read_binlog_replication");
+  Uint32 binlog_flags;
+  const st_conflict_fn_def* conflict_fn= NULL;
+  st_conflict_fn_arg args[MAX_CONFLICT_ARGS];
+  Uint32 num_args = MAX_CONFLICT_ARGS;
+
+  if ((ndbcluster_get_binlog_replication_info(thd, ndb,
+                                              share->db,
+                                              share->table_name,
+                                              server_id,
+                                              table,
+                                              &binlog_flags,
+                                              &conflict_fn,
+                                              args,
+                                              &num_args) != 0) ||
+      (ndbcluster_apply_binlog_replication_info(thd,
+                                                share,
+                                                ndbtab,
+                                                table,
+                                                conflict_fn,
+                                                args,
+                                                num_args,
+                                                do_set_binlog_flags,
+                                                binlog_flags) != 0))
+  {
+    DBUG_RETURN(-1);
+  }
+
+  DBUG_RETURN(0);
+}
 #endif /* HAVE_NDB_BINLOG */
 
 bool
@@ -5984,6 +6491,13 @@ ndb_binlog_thread_func(void *arg)
   uint incident_id= 0;
   Binlog_thread_state do_ndbcluster_binlog_close_connection;
 
+  /**
+   * If we get error after having reported incident
+   *   but before binlog started...we do "Restarting Cluster Binlog"
+   *   in that case, don't report incident again
+   */
+  bool do_incident = true;
+
 #ifdef RUN_NDB_BINLOG_TIMER
   Timer main_timer;
 #endif
@@ -6124,7 +6638,7 @@ restart_cluster_failure:
   /*
     Main NDB Injector loop
   */
-  while (ndb_binlog_running)
+  while (do_incident && ndb_binlog_running)
   {
     /*
       check if it is the first log, if so we do not insert a GAP event
@@ -6156,6 +6670,7 @@ restart_cluster_failure:
     int ret = inj->record_incident(thd, INCIDENT_LOST_EVENTS,
                                    msg[incident_id]);
     assert(ret == 0);
+    do_incident = false; // Don't report incident again, unless we get started
     break;
   }
   incident_id= 1;
@@ -6281,6 +6796,7 @@ restart_cluster_failure:
     static char db[]= "";
     thd->db= db;
   }
+  do_incident = true; // If we get disconnected again...do incident report
   do_ndbcluster_binlog_close_connection= BCCC_running;
   for ( ; !((ndbcluster_binlog_terminating ||
              do_ndbcluster_binlog_close_connection) &&

=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h	2011-06-15 10:37:56 +0000
+++ b/sql/ha_ndbcluster_binlog.h	2011-06-16 18:16:01 +0000
@@ -95,8 +95,7 @@ static const char *ha_ndb_ext=".ndb";
 #define NDB_EXCEPTIONS_TABLE_SUFFIX "$EX"
 #define NDB_EXCEPTIONS_TABLE_SUFFIX_LOWER "$ex"
 
-const uint error_conflict_fn_old_violation= 9998;
-const uint error_conflict_fn_max_violation= 9999;
+const uint error_conflict_fn_violation= 9999;
 #endif /* HAVE_NDB_BINLOG */
 
 
@@ -239,6 +238,26 @@ void ndb_rep_event_name(String *event_na
                         const char *db, const char *tbl, my_bool full);
 #ifdef HAVE_NDB_BINLOG
 int
+ndbcluster_get_binlog_replication_info(THD *thd, Ndb *ndb,
+                                       const char* db,
+                                       const char* table_name,
+                                       uint server_id,
+                                       const TABLE *table,
+                                       Uint32* binlog_flags,
+                                       const st_conflict_fn_def** conflict_fn,
+                                       st_conflict_fn_arg* args,
+                                       Uint32* num_args);
+int
+ndbcluster_apply_binlog_replication_info(THD *thd,
+                                         NDB_SHARE *share,
+                                         const NDBTAB* ndbtab,
+                                         TABLE* table,
+                                         const st_conflict_fn_def* conflict_fn,
+                                         const st_conflict_fn_arg* args,
+                                         Uint32 num_args,
+                                         bool do_set_binlog_flags,
+                                         Uint32 binlog_flags);
+int
 ndbcluster_read_binlog_replication(THD *thd, Ndb *ndb,
                                    NDB_SHARE *share,
                                    const NDBTAB *ndbtab,

=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc	2011-04-08 11:06:53 +0000
+++ b/sql/sql_class.cc	2011-06-09 09:20:47 +0000
@@ -583,24 +583,6 @@ Diagnostics_area::set_error_status(THD *
   m_status= DA_ERROR;
 }
 
-/**
- * modify_affected_rows
- * Modify the number of affected rows, and optionally the 
- * message in the Diagnostics area
- */
-void
-Diagnostics_area::modify_affected_rows(ha_rows new_affected_rows,
-                                       const char* new_message)
-{
-  DBUG_ASSERT(is_set());
-  DBUG_ASSERT(m_status == DA_OK);
-  DBUG_ASSERT(can_overwrite_status);
-
-  m_affected_rows= new_affected_rows;
-  if (new_message)
-    strmake(m_message, new_message, sizeof(m_message) - 1);
-}
-
 
 /**
   Mark the diagnostics area as 'DISABLED'.

=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h	2011-04-08 13:59:44 +0000
+++ b/sql/sql_class.h	2011-06-09 09:20:47 +0000
@@ -1163,9 +1163,6 @@ public:
   void set_eof_status(THD *thd);
   void set_error_status(THD *thd, uint sql_errno_arg, const char *message_arg);
 
-  /* Modify affected rows count and optionally message */
-  void modify_affected_rows(ha_rows new_affected_rows, const char *new_message= 0);
-
   void disable_status();
 
   void reset_diagnostics_area();

=== modified file 'storage/ndb/include/mgmapi/mgmapi_config_parameters.h'
--- a/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2011-05-31 08:28:58 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi_config_parameters.h	2011-06-15 10:55:06 +0000
@@ -194,6 +194,8 @@
 #define CFG_DB_INDEX_STAT_TRIGGER_SCALE  625
 #define CFG_DB_INDEX_STAT_UPDATE_DELAY   626
 
+#define CFG_DB_MAX_DML_OPERATIONS_PER_TRANSACTION 627
+
 #define CFG_NODE_ARBIT_RANK           200
 #define CFG_NODE_ARBIT_DELAY          201
 #define CFG_RESERVED_SEND_BUFFER_MEMORY 202

=== modified file 'storage/ndb/src/common/portlib/CMakeLists.txt'
--- a/storage/ndb/src/common/portlib/CMakeLists.txt	2011-05-11 12:23:24 +0000
+++ b/storage/ndb/src/common/portlib/CMakeLists.txt	2011-05-25 06:52:33 +0000
@@ -40,5 +40,7 @@ TARGET_LINK_LIBRARIES(NdbDir-t ndbportli
 ADD_EXECUTABLE(NdbGetInAddr-t NdbTCP.cpp)
 SET_TARGET_PROPERTIES(NdbGetInAddr-t
                       PROPERTIES COMPILE_FLAGS "-DTEST_NDBGETINADDR")
+TARGET_LINK_LIBRARIES(NdbGetInAddr-t ${LIBSOCKET} ${LIBNSL})
+
 
 

=== modified file 'storage/ndb/src/common/util/ndbzio.c'
--- a/storage/ndb/src/common/util/ndbzio.c	2011-04-18 14:15:23 +0000
+++ b/storage/ndb/src/common/util/ndbzio.c	2011-05-24 14:34:41 +0000
@@ -167,9 +167,11 @@ void ndbz_free(voidpf opaque, voidpf add
 }
 
 #ifdef _WIN32
-/* Windows doesn't define ENOTSUP, define it same as Solaris */
+#ifndef ENOTSUP
+/* If Windows doesn't define ENOTSUP, define it same as Solaris */
 #define ENOTSUP 48
 #endif
+#endif
 
 #ifndef HAVE_POSIX_MEMALIGN
 static inline int posix_memalign(void **memptr, size_t alignment, size_t size)

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2011-05-31 08:28:58 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2011-06-13 12:27:32 +0000
@@ -14238,6 +14238,15 @@ retry:
       sltLogPartPtr.p->logTailMbyte = 
         sltLogFilePtr.p->logLastPrepRef[tsltMbyte] & 65535;
 
+      if (DEBUG_REDO)
+      {
+        ndbout_c("part: %u setLogTail(gci: %u): file: %u mb: %u",
+                 sltLogPartPtr.p->logPartNo, 
+                 keepGci,
+                 sltLogPartPtr.p->logTailFileNo,
+                 sltLogPartPtr.p->logTailMbyte);
+      }
+
       bool tailmoved = !(ToldTailFileNo == sltLogPartPtr.p->logTailFileNo &&
                          ToldTailMByte == sltLogPartPtr.p->logTailMbyte);
 
@@ -16169,11 +16178,28 @@ void Dblqh::writeFileDescriptor(Signal*
   arrGuard(logFilePtr.p->currentMbyte, clogFileSize);
   if (DEBUG_REDO)
   {
-    ndbout_c("part: %u file: %u setting logMaxGciCompleted[%u] = %u",
-             logPartPtr.p->logPartNo,
-             logFilePtr.p->fileNo,
-             logFilePtr.p->currentMbyte,
-             logPartPtr.p->logPartNewestCompletedGCI);
+    printf("part: %u file: %u setting logMaxGciCompleted[%u] = %u logMaxGciStarted[%u]: %u lastPrepRef[%u]: ",
+           logPartPtr.p->logPartNo,
+           logFilePtr.p->fileNo,
+           logFilePtr.p->currentMbyte,
+           logPartPtr.p->logPartNewestCompletedGCI,
+           logFilePtr.p->currentMbyte,
+           cnewestGci,
+           logFilePtr.p->currentMbyte);
+    if (logPartPtr.p->firstLogTcrec == RNIL)
+    {
+      ndbout_c("file: %u mb: %u (RNIL)",
+               logFilePtr.p->fileNo,
+               logFilePtr.p->currentMbyte);
+    }
+    else
+    {
+      wfdTcConnectptr.i = logPartPtr.p->firstLogTcrec;
+      ptrCheckGuard(wfdTcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
+      ndbout_c("file: %u mb: %u",
+               wfdTcConnectptr.p->logStartFileNo,
+               wfdTcConnectptr.p->logStartPageNo >> ZTWOLOG_NO_PAGES_IN_MBYTE);
+    }
   }
   logFilePtr.p->logMaxGciCompleted[logFilePtr.p->currentMbyte] = 
     logPartPtr.p->logPartNewestCompletedGCI;
@@ -16346,10 +16372,11 @@ void Dblqh::writeSinglePage(Signal* sign
 
   if (DEBUG_REDO)
   {
-    ndbout_c("writeSingle 1 page at part: %u file: %u pos: %u",
+    ndbout_c("writeSingle 1 page at part: %u file: %u page: %u (mb: %u)",
              logPartPtr.p->logPartNo,
              logFilePtr.p->fileNo,
-             pageNo);
+             pageNo,
+             pageNo >> ZTWOLOG_NO_PAGES_IN_MBYTE);
   }
 }//Dblqh::writeSinglePage()
 
@@ -16452,8 +16479,10 @@ void Dblqh::readSrLastMbyteLab(Signal* s
       logPartPtr.p->lastMbyte = logFilePtr.p->currentMbyte - 1;
       if (DEBUG_REDO)
       {
-        ndbout_c("readSrLastMbyteLab part: %u lastMbyte: %u",
-                 logPartPtr.p->logPartNo, logPartPtr.p->lastMbyte);
+        ndbout_c("readSrLastMbyteLab part: %u file: %u lastMbyte: %u",
+                 logPartPtr.p->logPartNo, 
+                 logFilePtr.p->fileNo,
+                 logPartPtr.p->lastMbyte);
       }
     }//if
   }//if
@@ -17503,6 +17532,17 @@ void Dblqh::execSrCompletedLab(Signal* s
         systemErrorLab(signal, __LINE__);
         return;
       }//if
+
+      if (DEBUG_REDO)
+      {
+        ndbout_c("part: %u srLogLimits SR_FOURTH_PHASE %u-%u (file: %u mb: %u)",
+                 logPartPtr.p->logPartNo, 
+                 logPartPtr.p->logStartGci,
+                 logPartPtr.p->logLastGci,
+                 logPartPtr.p->lastLogfile,
+                 logPartPtr.p->lastMbyte);
+      }
+
       signal->theData[0] = ZSR_LOG_LIMITS;
       signal->theData[1] = logPartPtr.i;
       signal->theData[2] = logPartPtr.p->lastLogfile;
@@ -17689,6 +17729,15 @@ void Dblqh::srGciLimits(Signal* signal)
     jam();
     ptrAss(logPartPtr, logPartRecord);
     logPartPtr.p->logExecState = LogPartRecord::LES_SEARCH_STOP;
+    if (DEBUG_REDO)
+    {
+      ndbout_c("part: %u srLogLimits (srGciLimits) %u-%u (file: %u mb: %u)",
+               logPartPtr.p->logPartNo, 
+               logPartPtr.p->logStartGci,
+               logPartPtr.p->logLastGci,
+               logPartPtr.p->lastLogfile,
+               logPartPtr.p->lastMbyte);
+    }
     signal->theData[0] = ZSR_LOG_LIMITS;
     signal->theData[1] = logPartPtr.i;
     signal->theData[2] = logPartPtr.p->lastLogfile;
@@ -17720,22 +17769,34 @@ void Dblqh::srLogLimits(Signal* signal)
    * ----------------------------------------------------------------------- */
   while(true) {
     ndbrequire(tmbyte < clogFileSize);
-    if (logPartPtr.p->logExecState == LogPartRecord::LES_SEARCH_STOP) {
-      if (logFilePtr.p->logMaxGciCompleted[tmbyte] <= logPartPtr.p->logLastGci) {
+    if (logPartPtr.p->logExecState == LogPartRecord::LES_SEARCH_STOP)
+    {
+      if (logFilePtr.p->logMaxGciCompleted[tmbyte] <= logPartPtr.p->logLastGci)
+      {
         jam();
-      /* --------------------------------------------------------------------
-       *  WE ARE STEPPING BACKWARDS FROM MBYTE TO MBYTE. THIS IS THE FIRST 
-       *  MBYTE WHICH IS TO BE INCLUDED IN THE LOG EXECUTION. THE STOP GCI 
-       *  HAS NOT BEEN COMPLETED BEFORE THIS MBYTE. THUS THIS MBYTE HAVE 
-       *  TO BE EXECUTED.
-       * ------------------------------------------------------------------- */
+        /* --------------------------------------------------------------------
+         *  WE ARE STEPPING BACKWARDS FROM MBYTE TO MBYTE. THIS IS THE FIRST 
+         *  MBYTE WHICH IS TO BE INCLUDED IN THE LOG EXECUTION. THE STOP GCI 
+         *  HAS NOT BEEN COMPLETED BEFORE THIS MBYTE. THUS THIS MBYTE HAVE 
+         *  TO BE EXECUTED.
+         * ------------------------------------------------------------------ */
         logPartPtr.p->stopLogfile = logFilePtr.i;
         logPartPtr.p->stopMbyte = tmbyte;
         logPartPtr.p->logExecState = LogPartRecord::LES_SEARCH_START;
+        if (DEBUG_REDO)
+        {
+          ndbout_c("part: %u srLogLimits found stop pos file: %u mb: %u logMaxGciCompleted[tmbyte]: %u (lastGci: %u)",
+                   logPartPtr.p->logPartNo,
+                   logFilePtr.p->fileNo,
+                   tmbyte,
+                   logFilePtr.p->logMaxGciCompleted[tmbyte],
+                   logPartPtr.p->logLastGci);
+        }
       }//if
       else if (DEBUG_REDO)
       {
-        ndbout_c("SKIP part: %u file: %u mb: %u logMaxGciCompleted: %u >= %u",
+        ndbout_c("SEARCH STOP SKIP part: %u file: %u mb: %u "
+                 "logMaxGciCompleted: %u > %u",
                  logPartPtr.p->logPartNo,
                  logFilePtr.p->fileNo,
                  tmbyte,
@@ -17743,28 +17804,53 @@ void Dblqh::srLogLimits(Signal* signal)
                  logPartPtr.p->logLastGci);
       }
     }//if
-  /* ------------------------------------------------------------------------
-   *  WHEN WE HAVEN'T FOUND THE STOP MBYTE IT IS NOT NECESSARY TO LOOK FOR THE
-   *  START MBYTE. THE REASON IS THE FOLLOWING LOGIC CHAIN: 
-   *    MAX_GCI_STARTED >= MAX_GCI_COMPLETED >= LAST_GCI >= START_GCI
-   *  THUS MAX_GCI_STARTED >= START_GCI. THUS MAX_GCI_STARTED < START_GCI CAN
-   *  NOT BE TRUE AS WE WILL CHECK OTHERWISE.
-   * ----------------------------------------------------------------------- */
-    if (logPartPtr.p->logExecState == LogPartRecord::LES_SEARCH_START) {
-      if (logFilePtr.p->logMaxGciStarted[tmbyte] < logPartPtr.p->logStartGci) {
+    /* ------------------------------------------------------------------------
+     *  WHEN WE HAVEN'T FOUND THE STOP MBYTE IT IS NOT NECESSARY TO LOOK FOR THE
+     *  START MBYTE. THE REASON IS THE FOLLOWING LOGIC CHAIN: 
+     *    MAX_GCI_STARTED >= MAX_GCI_COMPLETED >= LAST_GCI >= START_GCI
+     *  THUS MAX_GCI_STARTED >= START_GCI. THUS MAX_GCI_STARTED < START_GCI CAN
+     *  NOT BE TRUE AS WE WILL CHECK OTHERWISE.
+     * ---------------------------------------------------------------------- */
+    if (logPartPtr.p->logExecState == LogPartRecord::LES_SEARCH_START)
+    {
+      if (logFilePtr.p->logMaxGciStarted[tmbyte] < logPartPtr.p->logStartGci)
+      {
         jam();
-      /* --------------------------------------------------------------------
-       *  WE HAVE NOW FOUND THE START OF THE EXECUTION OF THE LOG. 
-       *  WE STILL HAVE TO MOVE IT BACKWARDS TO ALSO INCLUDE THE 
-       *  PREPARE RECORDS WHICH WERE STARTED IN A PREVIOUS MBYTE.
-       * ------------------------------------------------------------------- */
+        /* --------------------------------------------------------------------
+         *  WE HAVE NOW FOUND THE START OF THE EXECUTION OF THE LOG. 
+         *  WE STILL HAVE TO MOVE IT BACKWARDS TO ALSO INCLUDE THE 
+         *  PREPARE RECORDS WHICH WERE STARTED IN A PREVIOUS MBYTE.
+         * ------------------------------------------------------------------ */
+        if (DEBUG_REDO)
+        {
+          ndbout_c("part: %u srLogLimits found start pos file: %u mb: %u logMaxGciStarted[tmbyte]: %u (startGci: %u)",
+                   logPartPtr.p->logPartNo,
+                   logFilePtr.p->fileNo,
+                   tmbyte,
+                   logFilePtr.p->logMaxGciCompleted[tmbyte],
+                   logPartPtr.p->logStartGci);
+          ndbout_c("part: %u srLogLimits lastPrepRef => file: %u mb: %u",
+                   logPartPtr.p->logPartNo, 
+                   logFilePtr.p->logLastPrepRef[tmbyte] >> 16,
+                   logFilePtr.p->logLastPrepRef[tmbyte] & 65535);
+        }
         tlastPrepRef = logFilePtr.p->logLastPrepRef[tmbyte];
         logPartPtr.p->startMbyte = tlastPrepRef & 65535;
         LogFileRecordPtr locLogFilePtr;
         findLogfile(signal, tlastPrepRef >> 16, logPartPtr, &locLogFilePtr);
         logPartPtr.p->startLogfile = locLogFilePtr.i;
         logPartPtr.p->logExecState = LogPartRecord::LES_EXEC_LOG;
-      }//if
+      }
+      else if (DEBUG_REDO)
+      {
+        ndbout_c("SEARCH START SKIP part: %u file: %u mb: %u "
+                 "logMaxGciCompleted: %u >= %u",
+                 logPartPtr.p->logPartNo,
+                 logFilePtr.p->fileNo,
+                 tmbyte,
+                 logFilePtr.p->logMaxGciStarted[tmbyte],
+                 logPartPtr.p->logStartGci);
+      }
     }//if
     if (logPartPtr.p->logExecState != LogPartRecord::LES_EXEC_LOG) {
       if (tmbyte == 0) {
@@ -18250,11 +18336,12 @@ void Dblqh::execSr(Signal* signal)
       logWord = readLogword(signal);
       if (DEBUG_REDO)
       {
-        ndbout_c("found gci: %u part: %u file: %u page: %u",
+        ndbout_c("found gci: %u part: %u file: %u page: %u (mb: %u)",
                  logWord,
                  logPartPtr.p->logPartNo,
                  logFilePtr.p->fileNo,
-                 logFilePtr.p->currentFilepage);
+                 logFilePtr.p->currentFilepage,
+                 logFilePtr.p->currentFilepage >> ZTWOLOG_NO_PAGES_IN_MBYTE);
       }
       if (logWord == logPartPtr.p->logLastGci)
       {
@@ -18868,6 +18955,30 @@ stepNext_2:
     {
       jam();
       logPartPtr.p->invalidatePageNo = logPartPtr.p->headPageNo;
+
+      if (! ((cstartType == NodeState::ST_INITIAL_START) ||
+             (cstartType == NodeState::ST_INITIAL_NODE_RESTART)))
+      {
+        jam();
+        if (logFilePtr.i == logPartPtr.p->lastLogfile)
+        {
+          jam();
+          Uint32 lastMbytePageNo =
+            logPartPtr.p->lastMbyte << ZTWOLOG_NO_PAGES_IN_MBYTE;
+          if (logPartPtr.p->invalidatePageNo < lastMbytePageNo)
+          {
+            jam();
+            if (DEBUG_REDO)
+            {
+              ndbout_c("readFileInInvalidate part: %u step: %u moving invalidatePageNo from %u to %u (lastMbyte)",
+                       logPartPtr.p->logPartNo, stepNext,
+                       logPartPtr.p->invalidatePageNo,
+                       lastMbytePageNo);
+            }
+            logPartPtr.p->invalidatePageNo = lastMbytePageNo;
+          }
+        }
+      }
       readFileInInvalidate(signal, 1);
       return;
     }
@@ -19879,11 +19990,12 @@ void Dblqh::completedLogPage(Signal* sig
 
   if (DEBUG_REDO)
   {
-    ndbout_c("writing %d pages at part: %u file: %u pos: %u",
+    ndbout_c("writing %d pages at part: %u file: %u page: %u (mb: %u)",
              twlpNoPages,
              logPartPtr.p->logPartNo,
              logFilePtr.p->fileNo,
-             logFilePtr.p->filePosition);
+             logFilePtr.p->filePosition,
+             logFilePtr.p->filePosition >> ZTWOLOG_NO_PAGES_IN_MBYTE);
   }
 
   if (twlpType == ZNORMAL) {
@@ -21089,11 +21201,12 @@ void Dblqh::readExecLog(Signal* signal)
 
   if (DEBUG_REDO)
   {
-    ndbout_c("readExecLog %u page at part: %u file: %u pos: %u",
+    ndbout_c("readExecLog %u page at part: %u file: %u page: %u (mb: %u)",
              lfoPtr.p->noPagesRw,
              logPartPtr.p->logPartNo,
              logFilePtr.p->fileNo,
-             logPartPtr.p->execSrStartPageNo);
+             logPartPtr.p->execSrStartPageNo,
+             logPartPtr.p->execSrStartPageNo >> ZTWOLOG_NO_PAGES_IN_MBYTE);
   }
 }//Dblqh::readExecLog()
 
@@ -21160,11 +21273,12 @@ void Dblqh::readExecSr(Signal* signal)
 
   if (DEBUG_REDO)
   {
-    ndbout_c("readExecSr %u page at part: %u file: %u pos: %u",
+    ndbout_c("readExecSr %u page at part: %u file: %u page: %u (mb: %u)",
              8,
              logPartPtr.p->logPartNo,
              logFilePtr.p->fileNo,
-             tresPageid);
+             tresPageid,
+             tresPageid >> ZTWOLOG_NO_PAGES_IN_MBYTE);
   }
 }//Dblqh::readExecSr()
 
@@ -21322,10 +21436,11 @@ void Dblqh::readSinglePage(Signal* signa
 
   if (DEBUG_REDO)
   {
-    ndbout_c("readSinglePage 1 page at part: %u file: %u pos: %u",
+    ndbout_c("readSinglePage 1 page at part: %u file: %u page: %u (mb: %u)",
              logPartPtr.p->logPartNo,
              logFilePtr.p->fileNo,
-             pageNo);
+             pageNo,
+             pageNo >> ZTWOLOG_NO_PAGES_IN_MBYTE);
   }
 }//Dblqh::readSinglePage()
 
@@ -21854,11 +21969,12 @@ void Dblqh::writeCompletedGciLog(Signal*
 
   if (DEBUG_REDO)
   {
-    ndbout_c("writeCompletedGciLog gci: %u part: %u file: %u page: %u",
+    ndbout_c("writeCompletedGciLog gci: %u part: %u file: %u page: %u (mb: %u)",
              cnewestCompletedGci,
              logPartPtr.p->logPartNo,
              logFilePtr.p->fileNo,
-             logFilePtr.p->currentFilepage);
+             logFilePtr.p->currentFilepage,
+             logFilePtr.p->currentFilepage >> ZTWOLOG_NO_PAGES_IN_MBYTE);
   }
 
   writeLogWord(signal, ZCOMPLETED_GCI_TYPE);
@@ -21904,10 +22020,11 @@ void Dblqh::writeDirty(Signal* signal, U
 
   if (DEBUG_REDO)
   {
-    ndbout_c("writeDirty 1 page at part: %u file: %u pos: %u",
+    ndbout_c("writeDirty 1 page at part: %u file: %u page: %u (mb: %u)",
              logPartPtr.p->logPartNo,
              logFilePtr.p->fileNo,
-             logPartPtr.p->prevFilepage);
+             logPartPtr.p->prevFilepage,
+             logPartPtr.p->prevFilepage >> ZTWOLOG_NO_PAGES_IN_MBYTE);
   }
 }//Dblqh::writeDirty()
 

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-05-25 09:31:27 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-06-16 08:37:10 +0000
@@ -4418,7 +4418,7 @@ Dbspj::scanIndex_build(Build_context& ct
     if (requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT &&
        !treeNodePtr.p->m_ancestors.contains(ctx.m_scans))
     {
-      nodePtr.p->m_bits |= TreeNode::T_SCAN_REPEATABLE;
+      treeNodePtr.p->m_bits |= TreeNode::T_SCAN_REPEATABLE;
     }
 
     ctx.m_scan_cnt++;
@@ -4879,12 +4879,6 @@ Dbspj::scanIndex_parent_row(Signal* sign
     }
 
     Uint32 ptrI = fragPtr.p->m_rangePtrI;
-    if (ptrI == RNIL)
-    {
-      jam();
-      data.m_frags_not_complete++;
-    }
-
     bool hasNull;
     if (treeNodePtr.p->m_bits & TreeNode::T_KEYINFO_CONSTRUCTED)
     {
@@ -4972,6 +4966,39 @@ Dbspj::scanIndex_parent_batch_complete(S
   data.m_rows_received = 0;
   data.m_rows_expecting = 0;
   ndbassert(data.m_frags_outstanding == 0);
+  ndbassert(data.m_frags_not_complete == 0);
+
+  Ptr<ScanFragHandle> fragPtr;
+  {
+    Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
+    list.first(fragPtr);
+
+    if ((treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN) == 0)
+    {
+      if (fragPtr.p->m_rangePtrI != RNIL)
+      {
+        // No pruning, so we must scan all fragments.
+        jam();
+        data.m_frags_not_complete = data.m_fragCount;
+      }
+    }
+    else
+    {
+      while(!fragPtr.isNull())
+      {
+        if (fragPtr.p->m_rangePtrI != RNIL)
+        {
+          jam();
+          /**
+           * This is a pruned scan, so we must scan those fragments that
+           * some distribution key hashed to.
+           */
+          data.m_frags_not_complete++;
+        }
+        list.next(fragPtr);
+      }
+    }
+  }
 
   if (data.m_frags_not_complete == 0)
   {
@@ -4981,11 +5008,6 @@ Dbspj::scanIndex_parent_batch_complete(S
      */
     return;
   }
-  else if ((treeNodePtr.p->m_bits & TreeNode::T_PRUNE_PATTERN) == 0)
-  {
-    jam();
-    data.m_frags_not_complete = data.m_fragCount;
-  }
 
   /**
    * When parent's batch is complete, we send our batch
@@ -5003,6 +5025,8 @@ Dbspj::scanIndex_parent_batch_repeat(Sig
 
   DEBUG("scanIndex_parent_batch_repeat(), m_node_no: " << treeNodePtr.p->m_node_no
         << ", m_batch_chunks: " << data.m_batch_chunks);
+  
+  ndbassert(treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE);
 
   /**
    * Register index-scans to be restarted if we didn't get all
@@ -5014,7 +5038,6 @@ Dbspj::scanIndex_parent_batch_repeat(Sig
     DEBUG("Register TreeNode for restart, m_node_no: " << treeNodePtr.p->m_node_no);
     ndbrequire(treeNodePtr.p->m_state != TreeNode::TN_ACTIVE);
     registerActiveCursor(requestPtr, treeNodePtr);
-    data.m_frags_not_complete = 1;
     data.m_batch_chunks = 0;
   }
 }

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2011-05-26 11:52:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2011-06-15 10:55:06 +0000
@@ -140,6 +140,7 @@
 #define ZUNLOCKED_IVAL_TOO_HIGH 294
 #define ZUNLOCKED_OP_HAS_BAD_STATE 295 
 #define ZBAD_DIST_KEY 298
+#define ZTRANS_TOO_BIG 261
 #endif
 
 class Dbtc: public SimulatedBlock {
@@ -722,6 +723,7 @@ public:
     };
 
     Uint32 no_commit_ack_markers;
+    Uint32 m_write_count;
     ReturnSignal returnsignal;
     AbortState abortState;
 
@@ -2102,6 +2104,7 @@ private:
   Uint32 c_lastFailedApi;
 #endif
   Uint32 m_deferred_enabled;
+  Uint32 m_max_writes_per_trans;
 };
 
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp	2011-04-28 07:47:53 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp	2011-06-15 10:55:06 +0000
@@ -338,6 +338,7 @@ Dbtc::Dbtc(Block_context& ctx, Uint32 in
   c_apiConTimer_line = 0;
   csystemStart = SSS_FALSE;
   m_deferred_enabled = ~Uint32(0);
+  m_max_writes_per_trans = ~Uint32(0);
 }//Dbtc::Dbtc()
 
 Dbtc::~Dbtc() 

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2011-05-31 12:28:59 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2011-06-15 10:55:06 +0000
@@ -685,6 +685,10 @@ void Dbtc::execREAD_CONFIG_REQ(Signal* s
   //ndb_mgm_get_int_parameter(p, CFG_DB_PARALLEL_TRANSACTION_TAKEOVER, &val);
   set_no_parallel_takeover(val);
 
+  val = ~(Uint32)0;
+  ndb_mgm_get_int_parameter(p, CFG_DB_MAX_DML_OPERATIONS_PER_TRANSACTION, &val);
+  m_max_writes_per_trans = val;
+
   ctimeOutCheckDelay = 50; // 500ms
 }//Dbtc::execSIZEALT_REP()
 
@@ -1857,6 +1861,13 @@ start_failure:
     abortErrorLab(signal);
     return;
   }
+  case 65:
+  {
+    jam();
+    terrorCode = ZTRANS_TOO_BIG;
+    abortErrorLab(signal);
+    return;
+  }
   default:
     jam();
     systemErrorLab(signal, __LINE__);
@@ -2398,6 +2409,8 @@ void Dbtc::initApiConnectRec(Signal* sig
 #ifdef ERROR_INSERT
   regApiPtr->continueBCount = 0;
 #endif
+
+  regApiPtr->m_write_count = 0;
 }//Dbtc::initApiConnectRec()
 
 int
@@ -3068,6 +3081,11 @@ void Dbtc::execTCKEYREQ(Signal* signal)
     case ZWRITE:
     case ZREFRESH:
       jam();
+      if (unlikely((++ regApiPtr->m_write_count) > m_max_writes_per_trans))
+      {
+        TCKEY_abort(signal, 65);
+        return;
+      }
       break;
     default:
       TCKEY_abort(signal, 9);

=== modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	2011-05-31 12:28:59 +0000
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	2011-06-13 06:14:32 +0000
@@ -3909,7 +3909,7 @@ void Qmgr::failReportLab(Signal* signal,
       msg = "Start timeout";
       break;
     case FailRep::ZHEARTBEAT_FAILURE:
-      msg = "Hearbeat failure";
+      msg = "Heartbeat failure";
       break;
     case FailRep::ZLINK_FAILURE:
       msg = "Connection failure";

=== modified file 'storage/ndb/src/kernel/vm/NdbSeqLock.hpp'
--- a/storage/ndb/src/kernel/vm/NdbSeqLock.hpp	2011-05-17 07:06:30 +0000
+++ b/storage/ndb/src/kernel/vm/NdbSeqLock.hpp	2011-06-10 12:17:51 +0000
@@ -86,7 +86,7 @@ struct NdbSeqLock
   void write_lock() {}
   void write_unlock() {}
 
-  Uint32 read_lock() {}
+  Uint32 read_lock() { return 0; }
   bool read_unlock(Uint32 val) const { return true;}
 };
 

=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2011-05-19 09:16:32 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp	2011-06-15 10:55:06 +0000
@@ -36,6 +36,7 @@
 
 #define KEY_INTERNAL 0
 #define MAX_INT_RNIL 0xfffffeff
+#define MAX_INT32 0xffffffff
 #define MAX_PORT_NO 65535
 
 #define _STR_VALUE(x) #x
@@ -771,6 +772,19 @@ const ConfigInfo::ParamInfo ConfigInfo::
     STR_VALUE(MAX_INT_RNIL) },
 
   {
+    CFG_DB_MAX_DML_OPERATIONS_PER_TRANSACTION,
+    "MaxDMLOperationsPerTransaction",
+    DB_TOKEN,
+    "Max DML-operations in one transaction (0 == no limit)",
+    ConfigInfo::CI_USED,
+    false,
+    ConfigInfo::CI_INT,
+    STR_VALUE(MAX_INT32),
+    "32",
+    STR_VALUE(MAX_INT32)
+  },
+
+  {
     CFG_DB_NO_LOCAL_OPS,
     "MaxNoOfLocalOperations",
     DB_TOKEN,

=== modified file 'storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2011-05-25 14:31:47 +0000
+++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp	2011-06-13 10:38:49 +0000
@@ -596,6 +596,7 @@ NdbEventOperationImpl::execute_nolock()
     {
       switch(myDict->getNdbError().code){
       case 711:
+      case 763:
         // ignore;
         break;
       default:

=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	2011-06-14 10:42:04 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	2011-06-16 18:16:01 +0000
@@ -2263,7 +2263,7 @@ NdbIndexStatImpl::MemDefault::mem_alloc(
   {
     size += 4 - size % 4;
   }
-  Item* item = (Item*)my_malloc(sizeof(Item) + size, MYF(0));
+  Item* item = (Item*)malloc(sizeof(Item) + size);
   if (item != 0)
   {
     item->m_magic = MemMagic;
@@ -2284,7 +2284,7 @@ NdbIndexStatImpl::MemDefault::mem_free(v
     assert(item->m_magic == MemMagic);
     size_t size = item->m_size;
     item->m_magic = 0;
-    my_free(item, MYF(0));
+    free(item);
     assert(m_used >= size);
     m_used -= size;
   }

=== modified file 'storage/ndb/src/ndbapi/NdbQueryBuilder.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp	2011-05-17 12:47:21 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp	2011-06-16 09:32:43 +0000
@@ -23,7 +23,6 @@
 #include <Vector.hpp>
 #include "signaldata/QueryTree.hpp"
 
-#include <Ndb.hpp>
 #include "NdbDictionaryImpl.hpp"
 #include <NdbRecord.hpp>
 #include "AttributeHeader.hpp"
@@ -575,16 +574,15 @@ NdbQueryIndexScanOperationDef::~NdbQuery
 
 NdbQueryOperationDefImpl::~NdbQueryOperationDefImpl()
 {
+  // Unlink any parent and child refering this object
   if (m_parent != NULL)
   {
     m_parent->removeChild(this);
   }
-  // Delete children recursively also.
   for (Uint32 i = 0; i<m_children.size(); i++)
   {
     assert(m_children[i]->m_parent == this);
     m_children[i]->m_parent = NULL;
-    delete m_children[i];
   }
 }
 
@@ -686,9 +684,9 @@ NdbQueryOperationDef::getIndex() const
  * Implementation of NdbQueryBuilder factory
  ******************************************/
 // Static method.
-NdbQueryBuilder* NdbQueryBuilder::create(Ndb& ndb)
+NdbQueryBuilder* NdbQueryBuilder::create()
 {
-  NdbQueryBuilderImpl* const impl = new NdbQueryBuilderImpl(ndb);
+  NdbQueryBuilderImpl* const impl = new NdbQueryBuilderImpl();
   if (likely (impl != NULL))
   {
     if (likely(impl->getNdbError().code == 0))
@@ -1045,9 +1043,9 @@ NdbQueryBuilder::prepare()
 // The (hidden) Impl of NdbQueryBuilder
 ////////////////////////////////////////
 
-NdbQueryBuilderImpl::NdbQueryBuilderImpl(Ndb& ndb)
+NdbQueryBuilderImpl::NdbQueryBuilderImpl()
 : m_interface(*this),
-  m_ndb(ndb), m_error(),
+  m_error(),
   m_operations(),
   m_operands(),
   m_paramCnt(0),
@@ -1063,12 +1061,13 @@ NdbQueryBuilderImpl::NdbQueryBuilderImpl
 NdbQueryBuilderImpl::~NdbQueryBuilderImpl()
 {
   // Delete all operand and operator in Vector's
-  if (m_operations.size() > 0)
+  for (Uint32 i=0; i<m_operations.size(); ++i)
   {
-    delete m_operations[0];
+    delete m_operations[i];
   }
   for (Uint32 i=0; i<m_operands.size(); ++i)
-  { delete m_operands[i];
+  {
+    delete m_operands[i];
   }
 }
 
@@ -1231,12 +1230,13 @@ NdbQueryDefImpl(const Vector<NdbQueryOpe
 NdbQueryDefImpl::~NdbQueryDefImpl()
 {
   // Release all NdbQueryOperations
-  if (m_operations.size() > 0)
+  for (Uint32 i=0; i<m_operations.size(); ++i)
   {
-    delete m_operations[0];
+    delete m_operations[i];
   }
   for (Uint32 i=0; i<m_operands.size(); ++i)
-  { delete m_operands[i];
+  {
+    delete m_operands[i];
   }
 }
 
@@ -2799,15 +2799,14 @@ main(int argc, const char** argv)
   assert (sizeof(NdbParamOperand) == sizeof(NdbQueryOperandImpl*));
   assert (sizeof(NdbLinkedOperand) == sizeof(NdbQueryOperandImpl*));
 
-  Ndb *myNdb = 0;
-  NdbQueryBuilder myBuilder(*myNdb);
+  NdbQueryBuilder* const myBuilder= NdbQueryBuilder::create();
 
   const NdbDictionary::Table *manager = (NdbDictionary::Table*)0xDEADBEAF;
 //  const NdbDictionary::Index *ix = (NdbDictionary::Index*)0x11223344;
 
-  NdbQueryDef* q1 = 0;
+  const NdbQueryDef* q1 = 0;
   {
-    NdbQueryBuilder* qb = &myBuilder; //myDict->getQueryBuilder();
+    NdbQueryBuilder* qb = myBuilder;
 
     const NdbQueryOperand* managerKey[] =  // Manager is indexed om {"dept_no", "emp_no"}
     {  qb->constValue("d005"),             // dept_no = "d005"

=== modified file 'storage/ndb/src/ndbapi/NdbQueryBuilder.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryBuilder.hpp	2011-04-06 14:16:13 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryBuilder.hpp	2011-06-16 09:32:43 +0000
@@ -24,7 +24,6 @@
 // skip includes...and require them to be included first
 // BUH!
 
-class Ndb;
 class NdbQueryDef;
 class NdbQueryDefImpl;
 class NdbQueryBuilderImpl;
@@ -358,9 +357,8 @@ private:
  *   build phase.
  *
  * - The NdbQueryDef produced by the ::prepare() method has a lifetime 
- *   determined by the Ndb object, or until it is explicit released by
- *   NdbQueryDef::release()
- *  
+ *   until it is explicit released by NdbQueryDef::release()
+ *
  */
 class NdbQueryBuilder 
 {
@@ -380,7 +378,7 @@ public:
    * Allocate an instance.
    * @return New instance, or NULL if allocation failed.
    */
-  static NdbQueryBuilder* create(Ndb& ndb);
+  static NdbQueryBuilder* create();
 
   /**
    * Release this object and any resources held by it.
@@ -478,8 +476,7 @@ private:
  * NdbQueryDef represents a ::prepare()'d object from NdbQueryBuilder.
  *
  * The NdbQueryDef is reusable in the sense that it may be executed multiple
- * times. Its lifetime is defined by the Ndb object which it was created with,
- * or it may be explicitely released() when no longer required.
+ * times. It is valid until it is explicitely released().
  *
  * The NdbQueryDef *must* be keept alive until the last thread
  * which executing a query based on this NdbQueryDef has completed execution 

=== modified file 'storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp	2011-05-05 11:06:08 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp	2011-06-16 09:32:43 +0000
@@ -631,7 +631,7 @@ class NdbQueryBuilderImpl
 
 public:
   ~NdbQueryBuilderImpl();
-  explicit NdbQueryBuilderImpl(Ndb& ndb);
+  explicit NdbQueryBuilderImpl();
 
   const NdbQueryDefImpl* prepare();
 
@@ -665,7 +665,6 @@ private:
   bool contains(const NdbQueryOperationDefImpl*);
 
   NdbQueryBuilder m_interface;
-  Ndb& m_ndb;
   NdbError m_error;
 
   Vector<NdbQueryOperationDefImpl*> m_operations;

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-05-26 14:44:59 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-06-16 09:32:43 +0000
@@ -2895,7 +2895,6 @@ NdbQueryImpl::closeTcCursor(bool forceSe
   assert (m_queryDef.isScanQuery());
 
   NdbImpl* const ndb = m_transaction.getNdb()->theImpl;
-  NdbImpl* const impl = ndb;
   const Uint32 timeout  = ndb->get_waitfor_timeout();
   const Uint32 nodeId   = m_transaction.getConnectedNodeId();
   const Uint32 seq      = m_transaction.theNodeSequence;
@@ -2905,7 +2904,7 @@ NdbQueryImpl::closeTcCursor(bool forceSe
    */
   PollGuard poll_guard(*ndb);
 
-  if (unlikely(impl->getNodeSequence(nodeId) != seq))
+  if (unlikely(ndb->getNodeSequence(nodeId) != seq))
   {
     setErrorCode(Err_NodeFailCausedAbort);
     return -1;  // Transporter disconnected and reconnected, no need to close
@@ -2917,7 +2916,7 @@ NdbQueryImpl::closeTcCursor(bool forceSe
     const FetchResult result = static_cast<FetchResult>
         (poll_guard.wait_scan(3*timeout, nodeId, forceSend));
 
-    if (unlikely(impl->getNodeSequence(nodeId) != seq))
+    if (unlikely(ndb->getNodeSequence(nodeId) != seq))
       setFetchTerminated(Err_NodeFailCausedAbort,false);
     else if (unlikely(result != FetchResult_ok))
     {
@@ -2952,7 +2951,7 @@ NdbQueryImpl::closeTcCursor(bool forceSe
       const FetchResult result = static_cast<FetchResult>
           (poll_guard.wait_scan(3*timeout, nodeId, forceSend));
 
-      if (unlikely(impl->getNodeSequence(nodeId) != seq))
+      if (unlikely(ndb->getNodeSequence(nodeId) != seq))
         setFetchTerminated(Err_NodeFailCausedAbort,false);
       else if (unlikely(result != FetchResult_ok))
       {

=== modified file 'storage/ndb/src/ndbapi/ndberror.c'
--- a/storage/ndb/src/ndbapi/ndberror.c	2011-06-06 12:18:27 +0000
+++ b/storage/ndb/src/ndbapi/ndberror.c	2011-06-15 10:55:06 +0000
@@ -317,7 +317,9 @@ ErrorBundle ErrorCodes[] = {
    */
   { 281,  HA_ERR_NO_CONNECTION, AE, "Operation not allowed due to cluster shutdown in progress" },
   { 299,  DMEC, AE, "Operation not allowed or aborted due to single user mode" },
-  { 763,  DMEC, AE, "Alter table requires cluster nodes to have exact same version" },
+  { 261,  DMEC, AE,
+    "DML count in transaction exceeds config parameter MaxDMLOperationsPerTransaction" },
+  { 763,  DMEC, AE, "DDL is not supported with mixed data-node versions" },
   { 823,  DMEC, AE, "Too much attrinfo from application in tuple manager" },
   { 829,  DMEC, AE, "Corrupt data received for insert/update" },
   { 831,  DMEC, AE, "Too many nullable/bitfields in table definition" },

=== modified file 'storage/ndb/test/include/HugoQueryBuilder.hpp'
--- a/storage/ndb/test/include/HugoQueryBuilder.hpp	2011-04-06 14:16:13 +0000
+++ b/storage/ndb/test/include/HugoQueryBuilder.hpp	2011-06-16 09:32:43 +0000
@@ -1,6 +1,5 @@
 /*
-   Copyright (C) 2003 MySQL AB
-    All rights reserved. Use is subject to license terms.
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -105,7 +104,7 @@ public:
   OptionMask getOptionMask() const { return m_options;}
   void setOptionMask(OptionMask om) { m_options = om;}
 
-  const NdbQueryDef * createQuery(Ndb*, bool takeOwnership = false);
+  const NdbQueryDef * createQuery(bool takeOwnership = false);
 
 private:
   struct TableDef

=== modified file 'storage/ndb/test/ndbapi/testSystemRestart.cpp'
--- a/storage/ndb/test/ndbapi/testSystemRestart.cpp	2011-02-18 18:40:25 +0000
+++ b/storage/ndb/test/ndbapi/testSystemRestart.cpp	2011-06-10 12:50:28 +0000
@@ -35,6 +35,14 @@ int runLoadTable(NDBT_Context* ctx, NDBT
   return NDBT_OK;
 }
 
+int
+clearOldBackups(NDBT_Context* ctx, NDBT_Step* step)
+{
+  NdbBackup backup(GETNDB(step)->getNodeId());
+  backup.clearOldBackups();
+  return NDBT_OK;
+}
+
 #define CHECK(b) if (!(b)) { \
   g_err << "ERR: "<< step->getName() \
          << " failed on line " << __LINE__ << endl; \
@@ -2594,6 +2602,7 @@ TESTCASE("SR_DD_1", "")
 {
   TC_PROPERTY("ALL", 1);
   INITIALIZER(runWaitStarted);
+  INITIALIZER(clearOldBackups);
   STEP(runStopper);
   STEP(runSR_DD_1);
   FINALIZER(runClearTable);
@@ -2601,6 +2610,7 @@ TESTCASE("SR_DD_1", "")
 TESTCASE("SR_DD_1b", "")
 {
   INITIALIZER(runWaitStarted);
+  INITIALIZER(clearOldBackups);
   STEP(runSR_DD_1);
   FINALIZER(runClearTable);
 }
@@ -2609,6 +2619,7 @@ TESTCASE("SR_DD_1_LCP", "")
   TC_PROPERTY("ALL", 1);
   TC_PROPERTY("LCP", 1);
   INITIALIZER(runWaitStarted);
+  INITIALIZER(clearOldBackups);
   STEP(runStopper);
   STEP(runSR_DD_1);
   FINALIZER(runClearTable);
@@ -2617,6 +2628,7 @@ TESTCASE("SR_DD_1b_LCP", "")
 {
   TC_PROPERTY("LCP", 1);
   INITIALIZER(runWaitStarted);
+  INITIALIZER(clearOldBackups);
   STEP(runSR_DD_1);
   FINALIZER(runClearTable);
 }
@@ -2624,6 +2636,7 @@ TESTCASE("SR_DD_2", "")
 {
   TC_PROPERTY("ALL", 1);
   INITIALIZER(runWaitStarted);
+  INITIALIZER(clearOldBackups);
   STEP(runStopper);
   STEP(runSR_DD_2);
   FINALIZER(runClearTable);
@@ -2631,6 +2644,7 @@ TESTCASE("SR_DD_2", "")
 TESTCASE("SR_DD_2b", "")
 {
   INITIALIZER(runWaitStarted);
+  INITIALIZER(clearOldBackups);
   STEP(runSR_DD_2);
   FINALIZER(runClearTable);
 }
@@ -2639,6 +2653,7 @@ TESTCASE("SR_DD_2_LCP", "")
   TC_PROPERTY("ALL", 1);
   TC_PROPERTY("LCP", 1);
   INITIALIZER(runWaitStarted);
+  INITIALIZER(clearOldBackups);
   STEP(runStopper);
   STEP(runSR_DD_2);
   FINALIZER(runClearTable);
@@ -2647,6 +2662,7 @@ TESTCASE("SR_DD_2b_LCP", "")
 {
   TC_PROPERTY("LCP", 1);
   INITIALIZER(runWaitStarted);
+  INITIALIZER(clearOldBackups);
   STEP(runSR_DD_2);
   FINALIZER(runClearTable);
 }
@@ -2654,6 +2670,7 @@ TESTCASE("SR_DD_3", "")
 {
   TC_PROPERTY("ALL", 1);
   INITIALIZER(runWaitStarted);
+  INITIALIZER(clearOldBackups);
   STEP(runStopper);
   STEP(runSR_DD_3);
   FINALIZER(runClearTable);
@@ -2661,6 +2678,7 @@ TESTCASE("SR_DD_3", "")
 TESTCASE("SR_DD_3b", "")
 {
   INITIALIZER(runWaitStarted);
+  INITIALIZER(clearOldBackups);
   STEP(runSR_DD_3);
   FINALIZER(runClearTable);
 }
@@ -2669,6 +2687,7 @@ TESTCASE("SR_DD_3_LCP", "")
   TC_PROPERTY("ALL", 1);
   TC_PROPERTY("LCP", 1);
   INITIALIZER(runWaitStarted);
+  INITIALIZER(clearOldBackups);
   STEP(runStopper);
   STEP(runSR_DD_3);
   FINALIZER(runClearTable);
@@ -2677,6 +2696,7 @@ TESTCASE("SR_DD_3b_LCP", "")
 {
   TC_PROPERTY("LCP", 1);
   INITIALIZER(runWaitStarted);
+  INITIALIZER(clearOldBackups);
   STEP(runSR_DD_3);
   FINALIZER(runClearTable);
 }

=== modified file 'storage/ndb/test/run-test/autotest-boot.sh'
--- a/storage/ndb/test/run-test/autotest-boot.sh	2011-06-01 08:55:03 +0000
+++ b/storage/ndb/test/run-test/autotest-boot.sh	2011-06-10 15:38:50 +0000
@@ -243,9 +243,9 @@ fi
 if [ "$build" ]
 then
     rm -rf $install_dir
-    
-	if [ -z "$clone1" ]
-	then
+    p=`pwd`
+    if [ -z "$clone1" ]
+    then
         cd $dst_place0
         if [ `uname | grep -ic cygwin || true` -ne 0 ]
         then
@@ -255,18 +255,19 @@ then
             cmd /c devenv.com MySql.sln /Build RelWithDebInfo
             cmd /c devenv.com MySql.sln /Project INSTALL /Build
         else
-	        BUILD/compile-ndb-autotest --prefix=$install_dir0
-	        make install
-        fi
-	else
-	    cd $dst_place0
 	    BUILD/compile-ndb-autotest --prefix=$install_dir0
 	    make install
-	    
-	    cd $dst_place1
-	    BUILD/compile-ndb-autotest --prefix=$install_dir1
-	    make install
-	fi
+        fi
+    else
+	cd $dst_place0
+	BUILD/compile-ndb-autotest --prefix=$install_dir0
+	make install
+	
+	cd $dst_place1
+	BUILD/compile-ndb-autotest --prefix=$install_dir1
+	make install
+    fi
+    cd $p
 fi
 
 

=== modified file 'storage/ndb/test/src/HugoQueryBuilder.cpp'
--- a/storage/ndb/test/src/HugoQueryBuilder.cpp	2011-04-06 14:16:13 +0000
+++ b/storage/ndb/test/src/HugoQueryBuilder.cpp	2011-06-16 09:32:43 +0000
@@ -1,6 +1,5 @@
 /*
-   Copyright (C) 2003 MySQL AB
-    All rights reserved. Use is subject to license terms.
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -563,9 +562,9 @@ loop:
 }
 
 const NdbQueryDef *
-HugoQueryBuilder::createQuery(Ndb* pNdb, bool takeOwnership)
+HugoQueryBuilder::createQuery(bool takeOwnership)
 {
-  NdbQueryBuilder* const builder = NdbQueryBuilder::create(*pNdb);
+  NdbQueryBuilder* const builder = NdbQueryBuilder::create();
   if (builder == NULL)
   {
     ndbout << "Failed to create NdbQueryBuilder." << endl;

=== modified file 'storage/ndb/test/src/NdbBackup.cpp'
--- a/storage/ndb/test/src/NdbBackup.cpp	2011-02-02 00:40:07 +0000
+++ b/storage/ndb/test/src/NdbBackup.cpp	2011-06-10 12:50:28 +0000
@@ -64,7 +64,7 @@ NdbBackup::clearOldBackups()
      * Clear old backup files
      */ 
     BaseString tmp;
-    tmp.assfmt("ssh -v %s rm -rf %s/BACKUP", host, path);
+    tmp.assfmt("ssh %s rm -rf %s/BACKUP", host, path);
   
     ndbout << "buf: "<< tmp.c_str() <<endl;
     int res = system(tmp.c_str());  
@@ -107,6 +107,7 @@ loop:
     {
       NdbSleep_SecSleep(3);
       _backup_id += 100;
+      user_backup_id += 100;
       goto loop;
     }
     

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0-wl4124-new1 branch (pekka.nousiainen:4402to 4403) Pekka Nousiainen16 Jun