List:Commits« Previous MessageNext Message »
From:Frazer Clement Date:June 20 2011 10:41am
Subject:bzr commit into mysql-5.5-cluster branch (frazer.clement:3360)
View as plain text  
#At file:///home/frazer/bzr/mysql-5.5-cluster/ based on revid:magnus.blaudd@stripped

 3360 Frazer Clement	2011-06-20 [merge]
      Merge mysql-5.1-telco-7.1 -> mysql-5.5-cluster

    added:
      mysql-test/suite/ndb_rpl/ndb_master-slave_2ch_end.inc
    modified:
      mysql-test/suite/ndb/r/ndb_basic.result
      mysql-test/suite/ndb_rpl/ndb_master-slave_2ch.inc
      mysql-test/suite/ndb_rpl/r/ndb_rpl_circular_2ch.result
      mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max.result
      mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max_delete_win.result
      mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_old.result
      mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result
      mysql-test/suite/ndb_rpl/t/ndb_conflict_info.inc
      mysql-test/suite/ndb_rpl/t/ndb_conflict_info_init.inc
      mysql-test/suite/ndb_rpl/t/ndb_rpl_circular_2ch.cnf
      mysql-test/suite/ndb_rpl/t/ndb_rpl_circular_2ch.test
      mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_binlog.h
      sql/ndb_share.h
      sql/ndb_thd_ndb.h
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/ndbapi/NdbQueryBuilder.cpp
      storage/ndb/src/ndbapi/NdbQueryBuilder.hpp
      storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/test/include/HugoQueryBuilder.hpp
      storage/ndb/test/src/HugoQueryBuilder.cpp
=== modified file 'mysql-test/suite/ndb/r/ndb_basic.result'
--- a/mysql-test/suite/ndb/r/ndb_basic.result	2011-05-06 13:40:03 +0000
+++ b/mysql-test/suite/ndb/r/ndb_basic.result	2011-06-20 10:41:04 +0000
@@ -66,6 +66,7 @@ Ndb_cluster_node_id	#
 Ndb_config_from_host	#
 Ndb_config_from_port	#
 Ndb_conflict_fn_max	#
+Ndb_conflict_fn_max_del_win	#
 Ndb_conflict_fn_old	#
 Ndb_connect_count	#
 Ndb_execute_count	#

=== modified file 'mysql-test/suite/ndb_rpl/ndb_master-slave_2ch.inc'
--- a/mysql-test/suite/ndb_rpl/ndb_master-slave_2ch.inc	2011-05-18 12:56:24 +0000
+++ b/mysql-test/suite/ndb_rpl/ndb_master-slave_2ch.inc	2011-06-20 10:41:04 +0000
@@ -1,132 +1,83 @@
-#############################################################
-# Author:  Serge Kozlov <skozlov@stripped>
-# Date:    03/17/2008
-# Purpose: Set up circular cluster replication where each 
+# ==== Purpose ====
+#
+# Set up circular cluster replication where each 
 # cluster has two mysqlds and replication directions are 
 # following:
+#              1       2
 #          master ---> slave  
 #           /            \
 #     cluster A        cluster B
-#           \            /
+#           \  3       4 /
 #         master1 <--- slave1
-#############################################################
-
---source include/have_log_bin.inc
+#
+# ==== Usage ====
+#
+#   [--let $rpl_server_count= N]
+#   [--let $rpl_skip_check_server_ids= 1]
+#   [--let $rpl_skip_reset_master_and_slave= 1]
+#   [--let $rpl_skip_change_master= 1]
+#   [--let $rpl_skip_start_slave= 1]
+#   [--let $rpl_debug= 1]
+#   [--let $slave_timeout= NUMBER]
+#   --source include/ndb_master-slave_2ch.inc
+#
+# Parameters:
+#   $rpl_server_count, $rpl_skip_check_server_ids,
+#   $rpl_skip_reset_master_and_slave, $rpl_skip_change_master,
+#   $rpl_skip_start_slave, $rpl_debug, $slave_timeout
+#     See include/master-slave.inc
+
+#--let $rpl_debug= 1
+--let $rpl_topology= 1->2,4->3
+--let $rpl_skip_start_slave= 1
+--source include/rpl_init.inc
 
 # Make connections to mysqlds
 
-connect (master,127.0.0.1,root,,test,$MASTER_MYPORT,);
-connect (master1,127.0.0.1,root,,test,$MASTER_MYPORT1,);
-connect (slave,127.0.0.1,root,,test,$SLAVE_MYPORT,);
-connect (slave1,127.0.0.1,root,,test,$SLAVE_MYPORT1,);
-
-# Check that all mysqld compiled with ndb support
-
---connection master
---disable_query_log
---require r/true.require
-SELECT (support = 'YES' or support = 'DEFAULT') AS `TRUE` FROM information_schema.engines WHERE engine = 'ndbcluster';
---enable_query_log
-
---connection master1
---disable_query_log
---require r/true.require
-SELECT (support = 'YES' or support = 'DEFAULT') AS `TRUE` FROM information_schema.engines WHERE engine = 'ndbcluster';
---enable_query_log
+--let $rpl_connection_name= master
+--let $rpl_server_number= 1
+--source include/rpl_connect.inc
+
+--let $rpl_connection_name= master1
+--let $rpl_server_number= 3
+--source include/rpl_connect.inc
+
+--let $rpl_connection_name= slave
+--let $rpl_server_number= 2
+--source include/rpl_connect.inc
+
+--let $rpl_connection_name= slave1
+--let $rpl_server_number= 4
+--source include/rpl_connect.inc
 
---connection slave
+# Now add IGNORE_SERVER_IDS
 --disable_query_log
---require r/true.require
-SELECT (support = 'YES' or support = 'DEFAULT') AS `TRUE` FROM information_schema.engines WHERE engine = 'ndbcluster';
---enable_query_log
-
---connection slave1
---disable_query_log
---require r/true.require
-SELECT (support = 'YES' or support = 'DEFAULT') AS `TRUE` FROM information_schema.engines WHERE engine = 'ndbcluster';
---enable_query_log
-
-# Stop slaves
-
---connection master
---disable_warnings
-STOP SLAVE;
---wait_for_slave_to_stop
---enable_warnings
-
---connection master1
---disable_warnings
-STOP SLAVE;
---wait_for_slave_to_stop
---enable_warnings
-
---connection slave
---disable_warnings
-STOP SLAVE;
---wait_for_slave_to_stop
---enable_warnings
-
---connection slave1
---disable_warnings
-STOP SLAVE;
---wait_for_slave_to_stop
---enable_warnings
-
-# Reset masters
-
---connection master
---disable_warnings
---disable_query_log
-USE test;
---enable_query_log
-DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7,t8,t9;
---enable_warnings
-RESET MASTER;
-
---connection master1
---disable_warnings
---disable_query_log
-USE test;
---enable_query_log
-DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7,t8,t9;
---enable_warnings
-RESET MASTER;
-
---connection slave
---disable_warnings
---disable_query_log
-USE test;
---enable_query_log
-DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7,t8,t9;
---enable_warnings
-RESET MASTER;
-
---connection slave1
---disable_warnings
---disable_query_log
-USE test;
---enable_query_log
-DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7,t8,t9;
---enable_warnings
-RESET MASTER;
-
-# Start slaves
-
---connection slave
-RESET SLAVE;
---replace_result $MASTER_MYPORT MASTER_MYPORT
---eval CHANGE MASTER TO master_host='127.0.0.1',master_port=$MASTER_MYPORT,master_user='root'
-START SLAVE;
---source include/wait_for_slave_to_start.inc
-
---connection master1
-RESET SLAVE;
---replace_result $SLAVE_MYPORT1 SLAVE_MYPORT1
---eval CHANGE MASTER TO master_host='127.0.0.1',master_port=$SLAVE_MYPORT1,master_user='root'
-START SLAVE;
---source include/wait_for_slave_to_start.inc
+connection master;
+CHANGE MASTER TO IGNORE_SERVER_IDS= (1,3);
+connection master1;
+CHANGE MASTER TO IGNORE_SERVER_IDS= (1,3);
+connection slave;
+CHANGE MASTER TO IGNORE_SERVER_IDS= (2,4);
+connection slave1;
+CHANGE MASTER TO IGNORE_SERVER_IDS= (2,4);
+
+# Now start replication
+--source include/rpl_start_slaves.inc
+--enable_query_log
+
+# Check that all mysqld are compiled with ndb support
+--let $_rpl_server= 4
+while ($_rpl_server)
+{
+  --connection server_$_rpl_server
+  if (`SELECT COUNT(*) = 0 FROM INFORMATION_SCHEMA.ENGINES WHERE engine = 'ndbcluster' AND (support = 'YES' OR support = 'DEFAULT')`)
+  {
+    --skip Test requires NDB.
+  }
+  --source include/ndb_not_readonly.inc
+  --dec $_rpl_server
+}
 
 
 # Set the default connection to 'master' (cluster A)
 connection master;
-

=== added file 'mysql-test/suite/ndb_rpl/ndb_master-slave_2ch_end.inc'
--- a/mysql-test/suite/ndb_rpl/ndb_master-slave_2ch_end.inc	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/ndb_master-slave_2ch_end.inc	2011-06-16 12:36:28 +0000
@@ -0,0 +1,28 @@
+# ==== Purpose ====
+#
+# Clean up replication configuration after using a 2ch
+# setup.
+# We need to explicitly reset the IGNORE_SERVER_IDS parameters
+# on all Servers to avoid testcase check errors.
+#
+# ==== Usage ====
+#
+#   [--let $rpl_debug= 1]
+#   --source suite/ndb_rpl/ndb_master-slave_2ch_end.inc
+#
+# Parameters:
+#   $rpl_debug
+#     See include/master-slave.inc
+
+--source include/rpl_stop_slaves.inc
+--connection master
+CHANGE MASTER TO IGNORE_SERVER_IDS= ();
+--connection master1
+CHANGE MASTER TO IGNORE_SERVER_IDS= ();
+--connection slave
+CHANGE MASTER TO IGNORE_SERVER_IDS= ();
+--connection slave1
+CHANGE MASTER TO IGNORE_SERVER_IDS= ();
+--source include/rpl_start_slaves.inc
+
+--source include/rpl_end.inc
\ No newline at end of file

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_circular_2ch.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_circular_2ch.result	2011-05-18 12:56:24 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_circular_2ch.result	2011-06-20 10:41:04 +0000
@@ -1,21 +1,9 @@
-STOP SLAVE;
-STOP SLAVE;
-STOP SLAVE;
-STOP SLAVE;
-DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7,t8,t9;
-RESET MASTER;
-DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7,t8,t9;
-RESET MASTER;
-DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7,t8,t9;
-RESET MASTER;
-DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7,t8,t9;
-RESET MASTER;
-RESET SLAVE;
-CHANGE MASTER TO master_host='127.0.0.1',master_port=MASTER_MYPORT,master_user='root';
-START SLAVE;
-RESET SLAVE;
-CHANGE MASTER TO master_host='127.0.0.1',master_port=SLAVE_MYPORT1,master_user='root';
-START SLAVE;
+include/rpl_init.inc [topology=1->2,4->3]
+include/rpl_connect.inc [creating master]
+include/rpl_connect.inc [creating master1]
+include/rpl_connect.inc [creating slave]
+include/rpl_connect.inc [creating slave1]
+include/rpl_start_slaves.inc
 
 *** Check server_id of mysqld servers ***
 SHOW VARIABLES LIKE "server_id";
@@ -25,7 +13,7 @@ SET auto_increment_offset = 1;
 SET auto_increment_increment = 2;
 SHOW VARIABLES LIKE "server_id";
 Variable_name	Value
-server_id	1
+server_id	3
 SET auto_increment_offset = 1;
 SET auto_increment_increment = 2;
 SHOW VARIABLES LIKE "server_id";
@@ -35,7 +23,7 @@ SET auto_increment_offset = 2;
 SET auto_increment_increment = 2;
 SHOW VARIABLES LIKE "server_id";
 Variable_name	Value
-server_id	2
+server_id	4
 SET auto_increment_offset = 2;
 SET auto_increment_increment = 2;
 
@@ -45,20 +33,26 @@ CREATE TABLE t1 (a INT NOT NULL AUTO_INC
 *** Basic testing  ***
 Insert rows via all hosts
 Check data on both clusters 
-Comparing tables master:test.t1 and slave:test.t1
+include/diff_tables.inc [master:t1, slave:t1]
 *** Transaction testing ***
 BEGIN;
 BEGIN;
 COMMIT;
 COMMIT;
 Check data on both clusters 
-Comparing tables master:test.t1 and slave:test.t1
+include/diff_tables.inc [master:t1, slave:t1]
 BEGIN;
 BEGIN;
 ROLLBACK;
 ROLLBACK;
 Check data on both clusters 
-Comparing tables master:test.t1 and slave:test.t1
-DROP TABLE t1;
+include/diff_tables.inc [master:t1, slave:t1]
 DROP TABLE IF EXISTS t1;
 
+include/rpl_stop_slaves.inc
+CHANGE MASTER TO IGNORE_SERVER_IDS= ();
+CHANGE MASTER TO IGNORE_SERVER_IDS= ();
+CHANGE MASTER TO IGNORE_SERVER_IDS= ();
+CHANGE MASTER TO IGNORE_SERVER_IDS= ();
+include/rpl_start_slaves.inc
+include/rpl_end.inc

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max.result	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max.result	2011-06-16 14:34:56 +0000
@@ -41,6 +41,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -67,6 +70,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -87,6 +93,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -114,6 +123,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -162,10 +174,19 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check update some data that causes conflicts
@@ -198,10 +219,19 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check higer timestamp
@@ -259,6 +289,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -285,6 +318,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -305,6 +341,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -332,6 +371,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -380,10 +422,19 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check update some data that causes conflicts
@@ -416,10 +467,19 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check higer timestamp
@@ -473,10 +533,13 @@ a	b	c	d
 3	Master t2 c=2	2	123
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-0
+3
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-3
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -505,10 +568,13 @@ a	b	c	d
 3	Master t2 a=3 at c=3	3	123
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-0
+4
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-4
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -533,10 +599,13 @@ select * from t2 order by a, d;
 a	b	c	d
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-0
+4
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-4
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -572,6 +641,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -616,10 +688,13 @@ commit;
 *** slave - check conflict info, there should be some
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-0
+3
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-3
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	5	#	#
@@ -658,10 +733,13 @@ commit;
 *** slave - check conflict info, change depends on calling test
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-0
+6
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-6
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	5	#	#
@@ -733,6 +811,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -759,6 +840,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -779,6 +863,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -806,6 +893,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -850,15 +940,24 @@ commit;
 *** slave - check conflict info, there should be some
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+4
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-1
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
+2	1	#	4	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 2	1	#	#	4	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
@@ -889,15 +988,24 @@ commit;
 *** slave - check conflict info, change depends on calling test
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+4
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-1
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
+2	1	#	4	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 2	1	#	#	4	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max_delete_win.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max_delete_win.result	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max_delete_win.result	2011-06-16 14:34:56 +0000
@@ -41,6 +41,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -67,6 +70,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -87,6 +93,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -114,6 +123,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -158,14 +170,23 @@ commit;
 *** slave - check conflict info, there should be some
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check update some data that causes conflicts
@@ -194,14 +215,23 @@ commit;
 *** slave - check conflict info, change depends on calling test
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check higer timestamp
@@ -259,6 +289,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -285,6 +318,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -305,6 +341,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -332,6 +371,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -376,14 +418,23 @@ commit;
 *** slave - check conflict info, there should be some
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check update some data that causes conflicts
@@ -412,14 +463,23 @@ commit;
 *** slave - check conflict info, change depends on calling test
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check higer timestamp
@@ -476,6 +536,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
 3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
@@ -508,6 +571,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
 4
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
@@ -536,6 +602,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
 4
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
@@ -572,6 +641,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -619,6 +691,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
 3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
@@ -661,6 +736,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
 6
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
@@ -733,6 +811,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -759,6 +840,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -779,6 +863,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -806,6 +893,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -850,14 +940,23 @@ commit;
 *** slave - check conflict info, there should be some
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check update some data that causes conflicts
@@ -886,14 +985,23 @@ commit;
 *** slave - check conflict info, change depends on calling test
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check higer timestamp

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_old.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_old.result	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_old.result	2011-06-16 14:34:56 +0000
@@ -41,6 +41,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -67,6 +70,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -87,6 +93,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -114,6 +123,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -162,6 +174,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 3
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -204,6 +219,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 5
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -275,6 +293,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -301,6 +322,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -321,6 +345,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -348,6 +375,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -396,6 +426,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 3
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -438,6 +471,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 5
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -509,6 +545,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 3
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -541,6 +580,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 4
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -569,6 +611,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 4
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -604,6 +649,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -652,6 +700,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 3
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	5	#	#
@@ -694,6 +745,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 6
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	5	#	#
@@ -765,6 +819,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -791,6 +848,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -811,6 +871,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -838,6 +901,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -886,6 +952,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 4
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -931,6 +1000,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 6
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result	2011-05-18 12:56:24 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result	2011-06-20 10:41:04 +0000
@@ -2,29 +2,35 @@ include/master-slave.inc
 [connection master]
 create table t1 (a int key, X int) engine ndb;
 Warnings:
-Warning	1625	Bad schema for mysql.ndb_replication table. Message: Wrong number of primary key parts, expected 3
+Error	1626	Bad schema for mysql.ndb_replication table. Message: Wrong number of primary key parts, expected 3
 drop table t1;
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$X(X)");
 create table t1 (a int key, X int) engine ndb;
-Warnings:
-Warning	1626	Error in parsing conflict function. Message: NDB$X(X), unknown conflict resolution function at 'NDB$X(X)'
-drop table t1;
+ERROR HY000: Can't create table 'test.t1' (errno: 1627)
+show warnings;
+Level	Code	Message
+Error	1627	Error in parsing conflict function. Message: NDB$X(X), unknown conflict resolution function at 'NDB$X(X)'
+Error	1005	Can't create table 'test.t1' (errno: 1627)
 delete from mysql.ndb_replication;
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)");
 create table t1 (a int key, X int) engine ndb;
 Warnings:
-Warning	1626	Error in parsing conflict function. Message: column 'X' has wrong datatype
+Error	1627	Error in parsing conflict function. Message: column 'X' has wrong datatype
 drop table t1;
 delete from mysql.ndb_replication;
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()");
 create table t1 (a int key, X int) engine ndb;
-Warnings:
-Warning	1626	Error in parsing conflict function. Message: NDB$MAX(), missing function argument at ')'
-drop table t1;
+ERROR HY000: Can't create table 'test.t1' (errno: 1627)
+show warnings;
+Level	Code	Message
+Error	1627	Error in parsing conflict function. Message: NDB$MAX(), missing function argument at ')'
+Error	1005	Can't create table 'test.t1' (errno: 1627)
 delete from mysql.ndb_replication;
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X Y)");
 create table t1 (a int key, X int) engine ndb;
-Warnings:
-Warning	1626	Error in parsing conflict function. Message: NDB$MAX(X Y), missing ')' at 'Y)'
-drop table t1;
+ERROR HY000: Can't create table 'test.t1' (errno: 1627)
+show warnings;
+Level	Code	Message
+Error	1627	Error in parsing conflict function. Message: NDB$MAX(X Y), missing ')' at 'Y)'
+Error	1005	Can't create table 'test.t1' (errno: 1627)
 delete from mysql.ndb_replication;

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_conflict_info.inc'
--- a/mysql-test/suite/ndb_rpl/t/ndb_conflict_info.inc	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_conflict_info.inc	2011-06-16 14:34:56 +0000
@@ -1,5 +1,6 @@
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
 --replace_column 3 # 5 # 6 #
 --error 0,1146
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_conflict_info_init.inc'
--- a/mysql-test/suite/ndb_rpl/t/ndb_conflict_info_init.inc	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_conflict_info_init.inc	2011-06-16 14:34:56 +0000
@@ -2,6 +2,7 @@
 --disable_result_log
 SELECT @init_ndb_conflict_fn_max:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 SELECT @init_ndb_conflict_fn_old:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
+SELECT @init_ndb_conflict_fn_max_del_win:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
 --error 0,1146
 DELETE FROM `t1$EX`;
 --enable_query_log

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_circular_2ch.cnf'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_circular_2ch.cnf	2011-05-18 12:56:24 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_circular_2ch.cnf	2011-06-20 10:41:04 +0000
@@ -2,34 +2,39 @@
 
 # 2 clusters, each with 2 MySQLDs
 # All MySQLDs log-slave-updates
-# Potential infinite loops are broken by both servers 
-# on each cluster having the same server-id
-# To support > 2 clusters and/or different server-ids per
-# MySQLD server, we need some other loop breaking 
-# mechanism 
+# All MySQLDs log-apply-status
+# Infinite loops broken in the test using Ignore_server_ids mechanism
 
 [mysqld.1.1]
 server-id= 1
 log-bin
+ndb_connectstring=	@mysql_cluster.1.ndb_connectstring
 log-slave-updates
+skip-slave-start
 
 [mysqld.2.1]
-server-id= 1
+server-id= 3
 log-bin
+ndb_connectstring=	@mysql_cluster.1.ndb_connectstring
 log-slave-updates
+skip-slave-start
 
 [mysqld.1.slave]
 server-id= 2
 log-bin
+ndb_connectstring=	@mysql_cluster.slave.ndb_connectstring
 log-slave-updates
 skip-slave-start
 
 [mysqld.2.slave]
-server-id= 2
+server-id= 4
 log-bin
 ndb_connectstring=	@mysql_cluster.slave.ndb_connectstring
+log-slave-updates
+skip-slave-start
 
 [ENV]
-
-SLAVE_MYPORT1=		@mysqld.2.slave.port
-SLAVE_MYSOCK1=		@mysqld.2.slave.socket
+SERVER_MYPORT_1=        @mysqld.1.1.port
+SERVER_MYPORT_2=        @mysqld.1.slave.port
+SERVER_MYPORT_3=        @mysqld.2.1.port
+SERVER_MYPORT_4=        @mysqld.2.slave.port

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_circular_2ch.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_circular_2ch.test	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_circular_2ch.test	2011-06-16 12:36:28 +0000
@@ -163,5 +163,8 @@ DROP TABLE IF EXISTS t1;
 --source include/wait_for_query_to_fail.inc
 --echo
 
+--connection master
+
 # End of test 5.1
---source include/rpl_end.inc
+--source suite/ndb_rpl/ndb_master-slave_2ch_end.inc
+

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test	2011-06-16 14:34:56 +0000
@@ -47,10 +47,11 @@ CREATE TABLE mysql.ndb_replication
 --enable_query_log
 
 # Non existant conflict_fn
-# gives warning when creating table
+# gives error when creating table
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$X(X)");
+--error 1005
 create table t1 (a int key, X int) engine ndb;
-drop table t1;
+show warnings;
 delete from mysql.ndb_replication;
 
 # Column type cannot be used for this function
@@ -61,17 +62,19 @@ drop table t1;
 delete from mysql.ndb_replication;
 
 # Too few arguments
-# gives warning when creating table
+# gives error when creating table
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()");
+--error 1005
 create table t1 (a int key, X int) engine ndb;
-drop table t1;
+show warnings;
 delete from mysql.ndb_replication;
 
 # Too many arguments
-# gives warning when creating table
+# gives error when creating table
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X Y)");
+--error 1005
 create table t1 (a int key, X int) engine ndb;
-drop table t1;
+show warnings;
 delete from mysql.ndb_replication;
 
 --disable_query_log

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2011-06-15 14:21:47 +0000
+++ b/sql/ha_ndbcluster.cc	2011-06-20 10:41:04 +0000
@@ -335,6 +335,11 @@ ndbcluster_alter_table_flags(uint flags)
 
 #define NDB_AUTO_INCREMENT_RETRIES 100
 #define BATCH_FLUSH_SIZE (32768)
+/*
+  Room for 10 instruction words, two labels (@ 2words/label)
+  + 2 extra words for the case of resolve_size == 8
+*/
+#define MAX_CONFLICT_INTERPRETED_PROG_SIZE 16
 
 #define ERR_PRINT(err) \
   DBUG_PRINT("error", ("%d  message: %s", err.code, err.message))
@@ -418,8 +423,6 @@ struct st_ndb_status {
 };
 
 static struct st_ndb_status g_ndb_status;
-static long g_ndb_status_conflict_fn_max= 0;
-static long g_ndb_status_conflict_fn_old= 0;
 
 long long g_event_data_count = 0;
 long long g_event_nondata_count = 0;
@@ -436,6 +439,37 @@ update_slave_api_stats(Ndb* ndb)
     g_slave_api_client_stats[i] = ndb->getClientStat(i);
 }
 
+st_ndb_slave_state g_ndb_slave_state;
+
+st_ndb_slave_state::st_ndb_slave_state()
+  : current_conflict_defined_op_count(0)
+{
+  memset(current_violation_count, 0, sizeof(current_violation_count));
+  memset(total_violation_count, 0, sizeof(total_violation_count));
+};
+
+void
+st_ndb_slave_state::atTransactionAbort()
+{
+  /* Reset current-transaction counters + state */
+  memset(current_violation_count, 0, sizeof(current_violation_count));
+  current_conflict_defined_op_count = 0;
+}
+
+void
+st_ndb_slave_state::atTransactionCommit()
+{
+  /* Merge committed transaction counters into total state
+   * Then reset current transaction counters
+   */
+  for (int i=0; i < CFT_NUMBER_OF_CFTS; i++)
+  {
+    total_violation_count[i]+= current_violation_count[i];
+    current_violation_count[i] = 0;
+  }
+  current_conflict_defined_op_count = 0;
+}
+
 static int update_status_variables(Thd_ndb *thd_ndb,
                                    st_ndb_status *ns,
                                    Ndb_cluster_connection *c)
@@ -552,8 +586,9 @@ SHOW_VAR ndb_status_variables_dynamic[]=
 };
 
 SHOW_VAR ndb_status_conflict_variables[]= {
-  {"fn_max",     (char*) &g_ndb_status_conflict_fn_max, SHOW_LONG},
-  {"fn_old",     (char*) &g_ndb_status_conflict_fn_old, SHOW_LONG},
+  {"fn_max",       (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_MAX], SHOW_LONG},
+  {"fn_old",       (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_OLD], SHOW_LONG},
+  {"fn_max_del_win", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_MAX_DEL_WIN], SHOW_LONG},
   {NullS, NullS, SHOW_LONG}
 };
 
@@ -713,6 +748,24 @@ static int write_conflict_row(NDB_SHARE
 }
 #endif
 
+#ifdef HAVE_NDB_BINLOG
+int
+handle_conflict_op_error(Thd_ndb* thd_ndb,
+                         NdbTransaction* trans,
+                         const NdbError& err,
+                         const NdbOperation* op);
+
+int
+handle_row_conflict(NDB_CONFLICT_FN_SHARE* cfn_share,
+                    const NdbRecord* key_rec,
+                    const uchar* pk_row,
+                    enum_conflicting_op_type op_type,
+                    enum_conflict_cause conflict_cause,
+                    const NdbError& conflict_error,
+                    NdbTransaction* conflict_trans,
+                    NdbError& err);
+#endif
+
 inline int
 check_completed_operations_pre_commit(Thd_ndb *thd_ndb, NdbTransaction *trans,
                                       const NdbOperation *first,
@@ -733,102 +786,42 @@ check_completed_operations_pre_commit(Th
     or exceptions to report
   */
 #ifdef HAVE_NDB_BINLOG
-  uint conflict_rows_written= 0;
+  const NdbOperation* lastUserOp = trans->getLastDefinedOperation();
 #endif
   while (true)
   {
     const NdbError &err= first->getNdbError();
-    if (err.classification != NdbError::NoError
-#ifndef HAVE_NDB_BINLOG
-        && err.classification != NdbError::ConstraintViolation
-        && err.classification != NdbError::NoDataFound
-#endif
-        )
+    const bool op_has_conflict_detection = (first->getCustomData() != NULL);
+    if (!op_has_conflict_detection)
     {
-#ifdef HAVE_NDB_BINLOG
-      DBUG_PRINT("info", ("ndb error: %d", err.code));
-      if (err.code == (int) error_conflict_fn_max_violation)
-      {
-        DBUG_PRINT("info", ("err.code == (int) error_conflict_fn_max_violation"));
-        thd_ndb->m_max_violation_count++;
-      }
-      else if (err.code == (int) error_conflict_fn_old_violation ||
-               err.classification == NdbError::ConstraintViolation ||
-               err.classification == NdbError::NoDataFound)
-      {
-        DBUG_PRINT("info",
-                   ("err.code %s (int) error_conflict_fn_old_violation, "
-                    "err.classification %s",
-                    err.code == (int) error_conflict_fn_old_violation ? "==" : "!=",
-                    err.classification
-                    == NdbError::ConstraintViolation
-                    ? "== NdbError::ConstraintViolation"
-                    : (err.classification == NdbError::NoDataFound
-                       ? "== NdbError::NoDataFound" : "!=")));
-        thd_ndb->m_old_violation_count++;
-        const void* buffer= first->getCustomData();
-        if (buffer != NULL)
-        {
-          Ndb_exceptions_data ex_data;
-          memcpy(&ex_data, buffer, sizeof(ex_data));
-          NDB_SHARE *share= ex_data.share;
-          const uchar* row= ex_data.row;
-          DBUG_ASSERT(share != NULL && row != NULL);
-
-          NDB_CONFLICT_FN_SHARE* cfn_share= share->m_cfn_share;
-          if (cfn_share && cfn_share->m_ex_tab)
-          {
-            NdbError ex_err;
-            if (write_conflict_row(share, trans, row, ex_err))
-            {
-              char msg[FN_REFLEN];
-              my_snprintf(msg, sizeof(msg), "table %s NDB error %d '%s'",
-                          cfn_share->m_ex_tab->getName(),
-                          ex_err.code, ex_err.message);
-
-              NdbDictionary::Dictionary* dict= thd_ndb->ndb->getDictionary();
-
-              if (ex_err.classification == NdbError::SchemaError)
-              {
-                dict->removeTableGlobal(*(cfn_share->m_ex_tab), false);
-                cfn_share->m_ex_tab= NULL;
-              }
-              else if (ex_err.status == NdbError::TemporaryError)
-              {
-                /* Slave will roll back and retry entire transaction. */
-                ERR_RETURN(ex_err);
-              }
-              else
-              {
-                push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN,
-                                    ER_EXCEPTIONS_WRITE_ERROR,
-                                    ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
-                /* Slave will stop replication. */
-                DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
-              }
-            }
-            else
-            {
-              conflict_rows_written++;
-            }
-          }
-          else
-          {
-            DBUG_PRINT("info", ("missing %s", !cfn_share ? "cfn_share" : "ex_tab"));
-          }
-        }
-        else
-        {
-          DBUG_PRINT("info", ("missing custom data"));
-        }
-      }
-      else
-#endif
+      /* 'Normal path' - ignore key (not) present, others are errors */
+      if (err.classification != NdbError::NoError &&
+          err.classification != NdbError::ConstraintViolation &&
+          err.classification != NdbError::NoDataFound)
       {
+        /* Non ignored error, report it */
         DBUG_PRINT("info", ("err.code == %u", err.code));
         DBUG_RETURN(err.code);
       }
     }
+#ifdef HAVE_NDB_BINLOG
+    else
+    {
+      /*
+         Op with conflict detection, use special error handling method
+       */
+
+      if (err.classification != NdbError::NoError)
+      {
+        int res = handle_conflict_op_error(thd_ndb,
+                                           trans,
+                                           err,
+                                           first);
+        if (res != 0)
+          DBUG_RETURN(res);
+      }
+    } // if (!op_has_conflict_detection)
+#endif
     if (err.classification != NdbError::NoError)
       ignores++;
 
@@ -840,7 +833,11 @@ check_completed_operations_pre_commit(Th
   if (ignore_count)
     *ignore_count= ignores;
 #ifdef HAVE_NDB_BINLOG
-  if (conflict_rows_written)
+  /*
+     Conflict detection related error handling above may have defined
+     new operations on the transaction.  If so, execute them now
+  */
+  if (trans->getLastDefinedOperation() != lastUserOp)
   {
     if (trans->execute(NdbTransaction::NoCommit,
                        NdbOperation::AO_IgnoreError,
@@ -944,10 +941,10 @@ int execute_no_commit(Thd_ndb *thd_ndb,
                                                     ignore_count));
 }
 
-int execute_commit(Thd_ndb *thd_ndb, NdbTransaction *trans,
+int execute_commit(THD* thd, Thd_ndb *thd_ndb, NdbTransaction *trans,
                    int force_send, int ignore_error, uint *ignore_count= 0);
 inline
-int execute_commit(Thd_ndb *thd_ndb, NdbTransaction *trans,
+int execute_commit(THD* thd, Thd_ndb *thd_ndb, NdbTransaction *trans,
                    int force_send, int ignore_error, uint *ignore_count)
 {
   DBUG_ENTER("execute_commit");
@@ -964,19 +961,19 @@ int execute_commit(Thd_ndb *thd_ndb, Ndb
   const NdbOperation *first= trans->getFirstDefinedOperation();
   const NdbOperation *last= trans->getLastDefinedOperation();
   thd_ndb->m_execute_count++;
-  thd_ndb->m_conflict_fn_usage_count= 0;
   thd_ndb->m_unsent_bytes= 0;
   DBUG_PRINT("info", ("execute_count: %u", thd_ndb->m_execute_count));
   if (trans->execute(NdbTransaction::Commit, ao, force_send))
   {
-    thd_ndb->m_max_violation_count= 0;
-    thd_ndb->m_old_violation_count= 0;
+    if (thd->slave_thread)
+      g_ndb_slave_state.atTransactionAbort();
     DBUG_RETURN(-1);
   }
-  g_ndb_status_conflict_fn_max+= thd_ndb->m_max_violation_count;
-  g_ndb_status_conflict_fn_old+= thd_ndb->m_old_violation_count;
-  thd_ndb->m_max_violation_count= 0;
-  thd_ndb->m_old_violation_count= 0;
+  /* Success of some sort */
+  if (thd->slave_thread)
+  {
+    g_ndb_slave_state.atTransactionCommit();
+  }
   if (!ignore_error || trans->getNdbError().code == 0)
     DBUG_RETURN(trans->getNdbError().code);
   DBUG_RETURN(check_completed_operations(thd_ndb, trans, first, last,
@@ -1033,9 +1030,6 @@ Thd_ndb::Thd_ndb(THD* thd) :
   m_execute_count= 0;
   m_scan_count= 0;
   m_pruned_scan_count= 0;
-  m_max_violation_count= 0;
-  m_old_violation_count= 0;
-  m_conflict_fn_usage_count= 0;
   bzero(m_transaction_no_hint_count, sizeof(m_transaction_no_hint_count));
   bzero(m_transaction_hint_count, sizeof(m_transaction_hint_count));
   global_schema_lock_trans= NULL;
@@ -2977,9 +2971,8 @@ int ha_ndbcluster::ndb_pk_update_row(THD
     DBUG_PRINT("info", ("insert failed"));
     if (trans->commitStatus() == NdbConnection::Started)
     {
-      m_thd_ndb->m_max_violation_count= 0;
-      m_thd_ndb->m_old_violation_count= 0;
-      m_thd_ndb->m_conflict_fn_usage_count= 0;
+      if (thd->slave_thread)
+        g_ndb_slave_state.atTransactionAbort();
       m_thd_ndb->m_unsent_bytes= 0;
       m_thd_ndb->m_execute_count++;
       DBUG_PRINT("info", ("execute_count: %u", m_thd_ndb->m_execute_count));
@@ -3912,6 +3905,239 @@ thd_allow_batch(const THD* thd)
 #endif
 }
 
+#ifdef HAVE_NDB_BINLOG
+/**
+   prepare_conflict_detection
+
+   This method is called during operation definition by the slave,
+   when writing to a table with conflict detection defined.
+
+   It is responsible for defining and adding any operation filtering
+   required, and for saving any operation definition state required
+   for post-execute analysis
+*/
+int
+ha_ndbcluster::prepare_conflict_detection(enum_conflicting_op_type op_type,
+                                          const NdbRecord* key_rec,
+                                          const uchar* old_data,
+                                          const uchar* new_data,
+                                          NdbInterpretedCode* code,
+                                          NdbOperation::OperationOptions* options)
+{
+  DBUG_ENTER("prepare_conflict_detection");
+
+  int res = 0;
+  const st_conflict_fn_def* conflict_fn = m_share->m_cfn_share->m_conflict_fn;
+  assert( conflict_fn != NULL );
+
+
+  /*
+     Prepare interpreted code for operation (update + delete only) according
+     to algorithm used
+  */
+  if (op_type != WRITE_ROW)
+  {
+    res = conflict_fn->prep_func(m_share->m_cfn_share,
+                                 op_type,
+                                 old_data,
+                                 new_data,
+                                 table->write_set,
+                                 code);
+
+    if (!res)
+    {
+      /* Attach conflict detecting filter program to operation */
+      options->optionsPresent|=NdbOperation::OperationOptions::OO_INTERPRETED;
+      options->interpretedCode= code;
+    }
+  } // if (op_type != WRITE_ROW)
+
+  g_ndb_slave_state.current_conflict_defined_op_count++;
+
+  /* Now save data for potential insert to exceptions table... */
+  const uchar* row_to_save = (op_type == DELETE_ROW)? old_data : new_data;
+  Ndb_exceptions_data ex_data;
+  ex_data.share= m_share;
+  ex_data.key_rec= key_rec;
+  ex_data.op_type= op_type;
+  /*
+    We need to save the row data for possible conflict resolution after
+    execute().
+  */
+  ex_data.row= copy_row_to_buffer(m_thd_ndb, row_to_save);
+  uchar* ex_data_buffer= get_buffer(m_thd_ndb, sizeof(ex_data));
+  if (ex_data.row == NULL || ex_data_buffer == NULL)
+  {
+    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+  }
+  memcpy(ex_data_buffer, &ex_data, sizeof(ex_data));
+
+  /* Store ptr to exceptions data in operation 'customdata' ptr */
+  options->optionsPresent|= NdbOperation::OperationOptions::OO_CUSTOMDATA;
+  options->customData= (void*)ex_data_buffer;
+
+  DBUG_RETURN(0);
+}
+
+/**
+   handle_conflict_op_error
+
+   This method is called when an error is detected after executing an
+   operation with conflict detection active.
+
+   If the operation error is related to conflict detection, handling
+   starts.
+
+   Handling involves incrementing the relevant counter, and optionally
+   refreshing the row and inserting an entry into the exceptions table
+*/
+
+int
+handle_conflict_op_error(Thd_ndb* thd_ndb,
+                         NdbTransaction* trans,
+                         const NdbError& err,
+                         const NdbOperation* op)
+{
+  DBUG_ENTER("handle_conflict_op_error");
+  DBUG_PRINT("info", ("ndb error: %d", err.code));
+
+  if ((err.code == (int) error_conflict_fn_violation) ||
+      (err.classification == NdbError::ConstraintViolation) ||
+      (err.classification == NdbError::NoDataFound))
+  {
+    DBUG_PRINT("info",
+               ("err.code %s (int) error_conflict_fn_violation, "
+                "err.classification %s",
+                err.code == (int) error_conflict_fn_violation ? "==" : "!=",
+                err.classification
+                == NdbError::ConstraintViolation
+                ? "== NdbError::ConstraintViolation"
+                : (err.classification == NdbError::NoDataFound
+                   ? "== NdbError::NoDataFound" : "!=")));
+
+    enum_conflict_cause conflict_cause;
+
+    if (err.code == (int) error_conflict_fn_violation)
+    {
+      conflict_cause= ROW_IN_CONFLICT;
+    }
+    else if (err.classification == NdbError::ConstraintViolation)
+    {
+      conflict_cause= ROW_ALREADY_EXISTS;
+    }
+    else
+    {
+      assert(err.classification == NdbError::NoDataFound);
+      conflict_cause= ROW_DOES_NOT_EXIST;
+    }
+
+    const void* buffer=op->getCustomData();
+    assert(buffer);
+    Ndb_exceptions_data ex_data;
+    memcpy(&ex_data, buffer, sizeof(ex_data));
+    NDB_SHARE *share= ex_data.share;
+    const NdbRecord* key_rec= ex_data.key_rec;
+    const uchar* row= ex_data.row;
+    enum_conflicting_op_type op_type = ex_data.op_type;
+    DBUG_ASSERT(share != NULL && row != NULL);
+
+    NDB_CONFLICT_FN_SHARE* cfn_share= share->m_cfn_share;
+    if (cfn_share)
+    {
+      enum_conflict_fn_type cft = cfn_share->m_conflict_fn->type;
+      bool haveExTable = cfn_share->m_ex_tab != NULL;
+
+      g_ndb_slave_state.current_violation_count[cft]++;
+
+      {
+        NdbError handle_error;
+        if (handle_row_conflict(cfn_share,
+                                key_rec,
+                                row,
+                                op_type,
+                                conflict_cause,
+                                err,
+                                trans,
+                                handle_error))
+        {
+          /* Error with handling of row conflict */
+          char msg[FN_REFLEN];
+          my_snprintf(msg, sizeof(msg), "Row conflict handling "
+                      "on table %s hit Ndb error %d '%s'",
+                      share->table_name,
+                      handle_error.code,
+                      handle_error.message);
+
+          if (handle_error.status == NdbError::TemporaryError)
+          {
+            /* Slave will roll back and retry entire transaction. */
+            ERR_RETURN(handle_error);
+          }
+          else
+          {
+            push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                                ER_EXCEPTIONS_WRITE_ERROR,
+                                ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+            /* Slave will stop replication. */
+            DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
+          }
+        }
+      }
+
+
+      if (haveExTable)
+      {
+        NdbError ex_err;
+        if (write_conflict_row(share, trans, row, ex_err))
+        {
+          char msg[FN_REFLEN];
+          my_snprintf(msg, sizeof(msg), "table %s NDB error %d '%s'",
+                      cfn_share->m_ex_tab->getName(),
+                      ex_err.code, ex_err.message);
+
+          NdbDictionary::Dictionary* dict= thd_ndb->ndb->getDictionary();
+
+          if (ex_err.classification == NdbError::SchemaError)
+          {
+            dict->removeTableGlobal(*(cfn_share->m_ex_tab), false);
+            cfn_share->m_ex_tab= NULL;
+          }
+          else if (ex_err.status == NdbError::TemporaryError)
+          {
+            /* Slave will roll back and retry entire transaction. */
+            ERR_RETURN(ex_err);
+          }
+          else
+          {
+            push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                                ER_EXCEPTIONS_WRITE_ERROR,
+                                ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+            /* Slave will stop replication. */
+            DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
+          }
+        }
+      } // if (haveExTable)
+
+      DBUG_RETURN(0);
+    }
+    else
+    {
+      DBUG_PRINT("info", ("missing cfn_share"));
+      DBUG_RETURN(0); // TODO : Correct?
+    }
+  }
+  else
+  {
+    /* Non conflict related error */
+    DBUG_PRINT("info", ("err.code == %u", err.code));
+    DBUG_RETURN(err.code);
+  }
+
+  DBUG_RETURN(0); // Reachable?
+}
+#endif /* HAVE_NDB_BINLOG */
+
+
 int ha_ndbcluster::write_row(uchar *record)
 {
   DBUG_ENTER("ha_ndbcluster::write_row");
@@ -4096,14 +4322,16 @@ int ha_ndbcluster::ndb_write_row(uchar *
   MY_BITMAP tmpBitmap;
   MY_BITMAP *user_cols_written_bitmap;
 #ifdef HAVE_NDB_BINLOG
-  uchar* ex_data_buffer= NULL;
+  bool haveConflictFunction =
+    (thd->slave_thread &&
+     m_share->m_cfn_share &&
+     m_share->m_cfn_share->m_conflict_fn);
 #endif
   
   if (m_use_write
 #ifdef HAVE_NDB_BINLOG
-      && !(thd->slave_thread &&
-           m_share->m_cfn_share &&
-           m_share->m_cfn_share->m_resolve_cft != CFT_NDB_UNDEF)
+      /* Conflict detection must use normal Insert */
+      && !haveConflictFunction
 #endif
       )
   {
@@ -4135,52 +4363,7 @@ int ha_ndbcluster::ndb_write_row(uchar *
       user_cols_written_bitmap= NULL;
       mask= NULL;
     }
-
-#if 0 /* NOT YET, interpeted function not supported for write */
-#ifdef HAVE_NDB_BINLOG
-    /*
-      Room for 10 instruction words, two labels (@ 2words/label)
-      + 2 extra words for the case of resolve_size == 8
-    */
-    Uint32 buffer[16];
-    NdbInterpretedCode code(m_table, buffer,
-                            sizeof(buffer)/sizeof(buffer[0]));
-    if (useWriteSet)
-    {
-      /* Conflict resolution in slave thread. */
-      enum_conflict_fn_type cft= m_share->m_cfn_share ? m_share->m_cfn_share->m_resolve_cft : CFT_NDB_UNDEF;
-      if (cft != CFT_NDB_UNDEF)
-      {
-        if (!write_row_conflict_fn(cft, record, &code))
-        {
-          options.optionsPresent|=NdbOperation::OperationOptions::OO_INTERPRETED;
-          options.interpretedCode= &code;
-          thd_ndb->m_conflict_fn_usage_count++;
-        }
-        
-        Ndb_exceptions_data ex_data;
-        ex_data.share= m_share;
-        /*
-          We need to save the row data for possible conflict resolution after
-          execute().
-        */
-        ex_data.row= copy_row_to_buffer(thd_ndb, record);
-        ex_data_buffer= get_buffer(thd_ndb, sizeof(ex_data));
-        if (ex_data.row == NULL || ex_data_buffer == NULL)
-        {
-          DBUG_RETURN(HA_ERR_OUT_OF_MEM);
-        }
-        memcpy(ex_data_buffer, &ex_data, sizeof(ex_data));
-
-        options.optionsPresent|= NdbOperation::OperationOptions::OO_CUSTOMDATA;
-        options.customData= (void*)ex_data_buffer;
-        if (options.optionsPresent != 0)
-          poptions=&options;
-      }
-    }
-#endif  /* HAVE_NDB_BINLOG */
-#endif  /* NOT YET */
-
+    /* TODO : Add conflict detection etc when interpreted write supported */
     op= trans->writeTuple(key_rec, (const char *)key_row, m_ndb_record,
                           (char *)record, mask,
                           poptions, sizeof(NdbOperation::OperationOptions));
@@ -4188,27 +4371,16 @@ int ha_ndbcluster::ndb_write_row(uchar *
   else
   {
 #ifdef HAVE_NDB_BINLOG
-    if (m_use_write)
+    if (haveConflictFunction)
     {
-      thd_ndb->m_conflict_fn_usage_count++;
-
-      Ndb_exceptions_data ex_data;
-      ex_data.share= m_share;
-      /*
-        We need to save the row data for possible conflict resolution after
-        execute().
-      */
-      ex_data.row= copy_row_to_buffer(thd_ndb, record);
-      ex_data_buffer= get_buffer(thd_ndb, sizeof(ex_data));
-      if (ex_data.row == NULL || ex_data_buffer == NULL)
-      {
-        DBUG_RETURN(HA_ERR_OUT_OF_MEM);
-      }
-      memcpy(ex_data_buffer, &ex_data, sizeof(ex_data));
-
-      options.optionsPresent|= NdbOperation::OperationOptions::OO_CUSTOMDATA;
-      options.customData= (void*)ex_data_buffer;
-      poptions=&options;
+      /* Conflict detection in slave thread */
+      if (unlikely((error = prepare_conflict_detection(WRITE_ROW,
+                                                       key_rec,
+                                                       NULL,    /* old_data */
+                                                       record,  /* new_data */
+                                                       NULL,    /* code */
+                                                       &options))))
+        DBUG_RETURN(error);
     }
 #endif
     uchar *mask;
@@ -4358,248 +4530,20 @@ int ha_ndbcluster::primary_key_cmp(const
 
 #ifdef HAVE_NDB_BINLOG
 
-/**
-  CFT_NDB_NEW
-
-  To perform conflict resolution, an interpreted program is used to read
-  the timestamp stored locally and compare to what is going to be applied.
-  If timestamp is lower, an error for this operation (9999) will be raised,
-  and new row will not be applied. The error codes for the operations will
-  be checked on return.  For this to work is is vital that the operation
-  is run with ignore error option.
-*/
-
 int
-ha_ndbcluster::row_conflict_fn_max(const uchar *new_data,
-                                   NdbInterpretedCode *code)
+handle_row_conflict(NDB_CONFLICT_FN_SHARE* cfn_share,
+                    const NdbRecord* key_rec,
+                    const uchar* pk_row,
+                    enum_conflicting_op_type op_type,
+                    enum_conflict_cause conflict_cause,
+                    const NdbError& conflict_error,
+                    NdbTransaction* conflict_trans,
+                    NdbError& err)
 {
-  uint32 resolve_column= m_share->m_cfn_share->m_resolve_column;
-  uint32 resolve_size= m_share->m_cfn_share->m_resolve_size;
-
-  DBUG_PRINT("info", ("interpreted update pre equal on column %u",
-                      resolve_column));
-
-  DBUG_ASSERT(resolve_size == 4 || resolve_size == 8);
-
-  if (!bitmap_is_set(table->write_set, resolve_column))
-  {
-    sql_print_information("NDB Slave: missing data for NDB_MAX");
-    return 1;
-  }
-
-  const uint label_0= 0;
-  const Uint32 RegNewValue= 1, RegCurrentValue= 2;
-  int r;
-  Field *field= table->field[resolve_column];
-  DBUG_PRINT("info", ("interpreted update post equal"));
-  /*
-   * read new value from record
-   */
-  union {
-    uint32 new_value_32;
-    uint64 new_value_64;
-  };
-  {
-    const uchar *field_ptr= field->ptr + (new_data - table->record[0]);
-    if (resolve_size == 4)
-    {
-      memcpy(&new_value_32, field_ptr, resolve_size);
-      DBUG_PRINT("info", ("new_value_32: %u", new_value_32));
-    }
-    else
-    {
-      memcpy(&new_value_64, field_ptr, resolve_size);
-      DBUG_PRINT("info", ("new_value_64: %llu",
-                          (unsigned long long) new_value_64));
-    }
-  }
-  /*
-   * Load registers RegNewValue and RegCurrentValue
-   */
-  if (resolve_size == 4)
-    r= code->load_const_u32(RegNewValue, new_value_32);
-  else
-    r= code->load_const_u64(RegNewValue, new_value_64);
-  DBUG_ASSERT(r == 0);
-  r= code->read_attr(RegCurrentValue, resolve_column);
-  DBUG_ASSERT(r == 0);
-  /*
-   * if RegNewValue > RegCurrentValue goto label_0
-   * else raise error for this row
-   */
-  r= code->branch_gt(RegNewValue, RegCurrentValue, label_0);
-  DBUG_ASSERT(r == 0);
-  r= code->interpret_exit_nok(error_conflict_fn_max_violation);
-  DBUG_ASSERT(r == 0);
-  r= code->def_label(label_0);
-  DBUG_ASSERT(r == 0);
-  r= code->interpret_exit_ok();
-  DBUG_ASSERT(r == 0);
-  r= code->finalise();
-  DBUG_ASSERT(r == 0);
-  return r;
-}
-
-/**
-  CFT_NDB_OLD
-
-  To perform conflict detection, an interpreted program is used to read
-  the timestamp stored locally and compare to what was on the master.
-  If timestamp is not equal, an error for this operation (9998) will be raised,
-  and new row will not be applied. The error codes for the operations will
-  be checked on return.  For this to work is is vital that the operation
-  is run with ignore error option.
-
-  As an independent feature, phase 2 also saves the
-  conflicts into the table's exceptions table.
-*/
-
-int
-ha_ndbcluster::row_conflict_fn_old(const uchar *old_data,
-                                   NdbInterpretedCode *code)
-{
-  uint32 resolve_column= m_share->m_cfn_share->m_resolve_column;
-  uint32 resolve_size= m_share->m_cfn_share->m_resolve_size;
-
-  DBUG_PRINT("info", ("interpreted update pre equal on column %u",
-                      resolve_column));
-
-  DBUG_ASSERT(resolve_size == 4 || resolve_size == 8);
-
-  if (!bitmap_is_set(table->write_set, resolve_column))
-  {
-    sql_print_information("NDB Slave: missing data for NDB_OLD");
-    return -1;
-  }
-
-  const uint label_0= 0;
-  const Uint32 RegOldValue= 1, RegCurrentValue= 2;
-  int r;
-  Field *field= table->field[resolve_column];
-  DBUG_PRINT("info", ("interpreted update post equal"));
-  /*
-   * read old value from record
-   */
-  union {
-    uint32 old_value_32;
-    uint64 old_value_64;
-  };
-  {
-    const uchar *field_ptr= field->ptr + (old_data - table->record[0]);
-    if (resolve_size == 4)
-    {
-      memcpy(&old_value_32, field_ptr, resolve_size);
-      DBUG_PRINT("info", ("old_value_32: %u", old_value_32));
-    }
-    else
-    {
-      memcpy(&old_value_64, field_ptr, resolve_size);
-      DBUG_PRINT("info", ("old_value_64: %llu",
-                          (unsigned long long) old_value_64));
-    }
-  }
-  /*
-   * Load registers RegOldValue and RegCurrentValue
-   */
-  if (resolve_size == 4)
-    r= code->load_const_u32(RegOldValue, old_value_32);
-  else
-    r= code->load_const_u64(RegOldValue, old_value_64);
-  DBUG_ASSERT(r == 0);
-  r= code->read_attr(RegCurrentValue, resolve_column);
-  DBUG_ASSERT(r == 0);
-  /*
-   * if RegOldValue == RegCurrentValue goto label_0
-   * else raise error for this row
-   */
-  r= code->branch_eq(RegOldValue, RegCurrentValue, label_0);
-  DBUG_ASSERT(r == 0);
-  r= code->interpret_exit_nok(error_conflict_fn_old_violation);
-  DBUG_ASSERT(r == 0);
-  r= code->def_label(label_0);
-  DBUG_ASSERT(r == 0);
-  r= code->interpret_exit_ok();
-  DBUG_ASSERT(r == 0);
-  r= code->finalise();
-  DBUG_ASSERT(r == 0);
-  return r;
-}
-
-int
-ha_ndbcluster::update_row_conflict_fn(enum_conflict_fn_type cft,
-                                      const uchar *old_data,
-                                      uchar *new_data,
-                                      NdbInterpretedCode *code)
-{
-  switch (cft) {
-  case CFT_NDB_MAX:
-  case CFT_NDB_MAX_DEL_WIN:
-    return row_conflict_fn_max(new_data, code);
-  case CFT_NDB_OLD:
-    return row_conflict_fn_old(old_data, code);
-  case CFT_NDB_UNDEF:
-    abort();
-  }
-  DBUG_ASSERT(false);
-  return 1;
-}
-
-#if 0 /* NOT YET, interpeted function not supported for write */
-int
-ha_ndbcluster::write_row_conflict_fn(enum_conflict_fn_type cft,
-                                     uchar *data,
-                                     NdbInterpretedCode *code)
-{
-  switch (cft) {
-  case CFT_NDB_MAX:
-  case CFT_NDB_MAX_DEL_WIN:
-    return row_conflict_fn_max(data, code);
-  case CFT_NDB_OLD:
-    /*
-      No conflict function here, instead detect if tuple
-      already exists
-     */
-    return 1;
-  case CFT_NDB_UNDEF:
-    abort();
-  }
-  DBUG_ASSERT(false);
-  return 1;
-}
-#endif
+  DBUG_ENTER("handle_row_conflict");
 
-int
-ha_ndbcluster::delete_row_conflict_fn(enum_conflict_fn_type cft,
-                                      const uchar *old_data,
-                                      NdbInterpretedCode *code)
-{
-  switch (cft) {
-  case CFT_NDB_MAX:
-    /*
-      As we do not have a timestamp for the actual delete,
-      the best we can do is to detect a possible conflict
-      on the old data.
-    */
-    return row_conflict_fn_old(old_data, code);
-  case CFT_NDB_MAX_DEL_WIN:
-  {
-    /**
-     * let delete always win
-     */
-    int r = code->interpret_exit_ok();
-    DBUG_ASSERT(r == 0);
-    r = code->finalise();
-    DBUG_ASSERT(r == 0);
-    return r;
-  }
-  case CFT_NDB_OLD:
-    return row_conflict_fn_old(old_data, code);
-  case CFT_NDB_UNDEF:
-    abort();
-  }
-  DBUG_ASSERT(false);
-  return 1;
-}
+  DBUG_RETURN(0);
+};
 #endif /* HAVE_NDB_BINLOG */
 
 /**
@@ -4650,7 +4594,7 @@ int ha_ndbcluster::exec_bulk_update(uint
     DBUG_PRINT("info", ("committing auto-commit+rbwr early"));
     uint ignore_count= 0;
     const int ignore_error= 1;
-    if (execute_commit(m_thd_ndb, trans,
+    if (execute_commit(table->in_use, m_thd_ndb, trans,
                        m_thd_ndb->m_force_send, ignore_error,
                        &ignore_count) != 0)
     {
@@ -4929,42 +4873,21 @@ int ha_ndbcluster::ndb_update_row(const
 				 m_read_before_write_removal_used);
 
 #ifdef HAVE_NDB_BINLOG
-    uchar* ex_data_buffer= NULL;
-    /*
-      Room for 10 instruction words, two labels (@ 2words/label)
-      + 2 extra words for the case of resolve_size == 8
-    */
-    Uint32 buffer[16];
+    Uint32 buffer[ MAX_CONFLICT_INTERPRETED_PROG_SIZE ];
     NdbInterpretedCode code(m_table, buffer,
                             sizeof(buffer)/sizeof(buffer[0]));
+
     if (thd->slave_thread && m_share->m_cfn_share &&
-        (m_share->m_cfn_share->m_resolve_cft != CFT_NDB_UNDEF))
+        m_share->m_cfn_share->m_conflict_fn)
     {
-      /* Conflict resolution in slave thread. */
-      enum_conflict_fn_type cft= m_share->m_cfn_share->m_resolve_cft;
-      if (!update_row_conflict_fn(cft, old_data, new_data, &code))
-      {
-        options.optionsPresent|=NdbOperation::OperationOptions::OO_INTERPRETED;
-        options.interpretedCode= &code;
-        thd_ndb->m_conflict_fn_usage_count++;
-      }
-
-      Ndb_exceptions_data ex_data;
-      ex_data.share= m_share;
-      /*
-        We need to save the row data for possible conflict resolution after
-        execute().
-      */
-      ex_data.row= copy_row_to_buffer(thd_ndb, new_data);
-      ex_data_buffer= get_buffer(thd_ndb, sizeof(ex_data));
-      if (ex_data.row == NULL || ex_data_buffer == NULL)
-      {
-        DBUG_RETURN(HA_ERR_OUT_OF_MEM);
-      }
-      memcpy(ex_data_buffer, &ex_data, sizeof(ex_data));
-
-      options.optionsPresent|= NdbOperation::OperationOptions::OO_CUSTOMDATA;
-      options.customData= (void*)ex_data_buffer;
+       /* Conflict resolution in slave thread. */
+      if (unlikely((error = prepare_conflict_detection(UPDATE_ROW,
+                                                       key_rec,
+                                                       old_data,
+                                                       new_data,
+                                                       &code,
+                                                       &options))))
+        DBUG_RETURN(error);
     }
 #endif /* HAVE_NDB_BINLOG */
     if (options.optionsPresent !=0)
@@ -5059,7 +4982,7 @@ int ha_ndbcluster::end_bulk_delete()
     DBUG_PRINT("info", ("committing auto-commit+rbwr early"));
     uint ignore_count= 0;
     const int ignore_error= 1;
-    if (execute_commit(m_thd_ndb, trans,
+    if (execute_commit(table->in_use, m_thd_ndb, trans,
                        m_thd_ndb->m_force_send, ignore_error,
                        &ignore_count) != 0)
     {
@@ -5225,42 +5148,20 @@ int ha_ndbcluster::ndb_delete_row(const
 				 m_read_before_write_removal_used);
 
 #ifdef HAVE_NDB_BINLOG
-    uchar* ex_data_buffer= NULL;
-    /*
-      Room for 10 instruction words, two labels (@ 2words/label)
-      + 2 extra words for the case of resolve_size == 8
-    */
-    Uint32 buffer[16];
+    Uint32 buffer[ MAX_CONFLICT_INTERPRETED_PROG_SIZE ];
     NdbInterpretedCode code(m_table, buffer,
                             sizeof(buffer)/sizeof(buffer[0]));
     if (thd->slave_thread && m_share->m_cfn_share &&
-        (m_share->m_cfn_share->m_resolve_cft != CFT_NDB_UNDEF))
+        m_share->m_cfn_share->m_conflict_fn)
     {
       /* Conflict resolution in slave thread. */
-      enum_conflict_fn_type cft= m_share->m_cfn_share->m_resolve_cft;
-      if (!delete_row_conflict_fn(cft, record, &code))
-      {
-        options.optionsPresent|=NdbOperation::OperationOptions::OO_INTERPRETED;
-        options.interpretedCode= &code;
-        thd_ndb->m_conflict_fn_usage_count++;
-      }
-
-      Ndb_exceptions_data ex_data;
-      ex_data.share= m_share;
-      /*
-        We need to save the row data for possible conflict resolution after
-        execute().
-      */
-      ex_data.row= copy_row_to_buffer(thd_ndb, record);
-      ex_data_buffer= get_buffer(thd_ndb, sizeof(ex_data));
-      if (ex_data.row == NULL || ex_data_buffer == NULL)
-      {
-        DBUG_RETURN(HA_ERR_OUT_OF_MEM);
-      }
-      memcpy(ex_data_buffer, &ex_data, sizeof(ex_data));
-
-      options.optionsPresent|= NdbOperation::OperationOptions::OO_CUSTOMDATA;
-      options.customData= (void*)ex_data_buffer;
+      if (unlikely((error = prepare_conflict_detection(DELETE_ROW,
+                                                       key_rec,
+                                                       key_row, /* old_data */
+                                                       NULL,    /* new_data */
+                                                       &code,
+                                                       &options))))
+        DBUG_RETURN(error);
     }
 #endif /* HAVE_NDB_BINLOG */
     if (options.optionsPresent != 0)
@@ -6413,7 +6314,7 @@ ha_ndbcluster::flush_bulk_insert(bool al
     THD *thd= table->in_use;
     thd->transaction.all.modified_non_trans_table=
       thd->transaction.stmt.modified_non_trans_table= TRUE;
-    if (execute_commit(m_thd_ndb, trans, m_thd_ndb->m_force_send,
+    if (execute_commit(thd, m_thd_ndb, trans, m_thd_ndb->m_force_send,
                        m_ignore_no_key) != 0)
     {
       no_uncommitted_rows_execute_failure();
@@ -7178,9 +7079,10 @@ int ndbcluster_commit(handlerton *hton,
 
   if (thd->slave_thread)
   {
-    if (!thd_ndb->m_conflict_fn_usage_count || !thd_ndb->m_unsent_bytes ||
+    if (!g_ndb_slave_state.current_conflict_defined_op_count ||
+        !thd_ndb->m_unsent_bytes ||
         !(res= execute_no_commit(thd_ndb, trans, TRUE)))
-      res= execute_commit(thd_ndb, trans, 1, TRUE);
+      res= execute_commit(thd, thd_ndb, trans, 1, TRUE);
 
     update_slave_api_stats(thd_ndb->ndb);
   }
@@ -7203,7 +7105,7 @@ int ndbcluster_commit(handlerton *hton,
       }
     }
     else
-      res= execute_commit(thd_ndb, trans, THDVAR(thd, force_send), FALSE);
+      res= execute_commit(thd, thd_ndb, trans, THDVAR(thd, force_send), FALSE);
   }
 
   if (res != 0)
@@ -7276,9 +7178,8 @@ static int ndbcluster_rollback(handlerto
     DBUG_RETURN(0);
   }
   thd_ndb->save_point_count= 0;
-  thd_ndb->m_max_violation_count= 0;
-  thd_ndb->m_old_violation_count= 0;
-  thd_ndb->m_conflict_fn_usage_count= 0;
+  if (thd->slave_thread)
+    g_ndb_slave_state.atTransactionAbort();
   thd_ndb->m_unsent_bytes= 0;
   thd_ndb->m_execute_count++;
   DBUG_PRINT("info", ("execute_count: %u", thd_ndb->m_execute_count));
@@ -8314,6 +8215,34 @@ int ha_ndbcluster::create(const char *na
                         create_info->comment.length);
   const NDB_Modifier * mod_nologging = table_modifiers.get("NOLOGGING");
 
+#ifdef HAVE_NDB_BINLOG
+  /* Read ndb_replication entry for this table, if any */
+  Uint32 binlog_flags;
+  const st_conflict_fn_def* conflict_fn= NULL;
+  st_conflict_fn_arg args[MAX_CONFLICT_ARGS];
+  Uint32 num_args = MAX_CONFLICT_ARGS;
+
+  int rep_read_rc= ndbcluster_get_binlog_replication_info(thd,
+                                                          ndb,
+                                                          m_dbname,
+                                                          m_tabname,
+                                                          ::server_id,
+                                                          form,
+                                                          &binlog_flags,
+                                                          &conflict_fn,
+                                                          args,
+                                                          &num_args);
+  if (rep_read_rc != 0)
+  {
+    DBUG_RETURN(rep_read_rc);
+  }
+
+  /* Reset database name */
+  ndb->setDatabaseName(m_dbname);
+
+  /* Use ndb_replication information as required */
+#endif
+
   if ((dict->beginSchemaTrans() == -1))
   {
     DBUG_PRINT("info", ("Failed to start schema transaction"));
@@ -8739,8 +8668,18 @@ cleanup_failed:
     {
 #ifdef HAVE_NDB_BINLOG
       if (share)
-        ndbcluster_read_binlog_replication(thd, ndb, share, m_table,
-                                           ::server_id, form, TRUE);
+      {
+        /* Set the Binlogging information we retrieved above */
+        ndbcluster_apply_binlog_replication_info(thd,
+                                                 share,
+                                                 m_table,
+                                                 form,
+                                                 conflict_fn,
+                                                 args,
+                                                 num_args,
+                                                 TRUE, /* Do set binlog flags */
+                                                 binlog_flags);
+      }
 #endif
       String event_name(INJECTOR_EVENT_LEN);
       ndb_rep_event_name(&event_name, m_dbname, m_tabname,

=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2011-06-15 14:21:47 +0000
+++ b/sql/ha_ndbcluster.h	2011-06-20 10:41:04 +0000
@@ -172,6 +172,25 @@ inline void set_binlog_use_update(NDB_SH
 inline my_bool get_binlog_use_update(NDB_SHARE *share)
 { return (share->flags & NSF_BINLOG_USE_UPDATE) != 0; }
 
+/*
+  State associated with the Slave thread
+  (From the Ndb handler's point of view)
+*/
+struct st_ndb_slave_state
+{
+  /* Counter values for current slave transaction */
+  Uint32 current_conflict_defined_op_count;
+  Uint32 current_violation_count[CFT_NUMBER_OF_CFTS];
+
+  /* Cumulative counter values */
+  Uint64 total_violation_count[CFT_NUMBER_OF_CFTS];
+
+  /* Methods */
+  void atTransactionCommit();
+  void atTransactionAbort();
+
+  st_ndb_slave_state();
+};
 
 struct Ndb_local_table_statistics {
   int no_uncommitted_rows_count;
@@ -393,20 +412,12 @@ static void set_tabname(const char *path
 
 private:
 #ifdef HAVE_NDB_BINLOG
-  int delete_row_conflict_fn(enum_conflict_fn_type cft,
-                             const uchar *old_data,
-                             NdbInterpretedCode *);
-  int write_row_conflict_fn(enum_conflict_fn_type cft,
-                            uchar *data,
-                            NdbInterpretedCode *);
-  int update_row_conflict_fn(enum_conflict_fn_type cft,
-                             const uchar *old_data,
-                             uchar *new_data,
-                             NdbInterpretedCode *);
-  int row_conflict_fn_max(const uchar *new_data,
-                          NdbInterpretedCode *);
-  int row_conflict_fn_old(const uchar *old_data,
-                          NdbInterpretedCode *);
+  int prepare_conflict_detection(enum_conflicting_op_type op_type,
+                                 const NdbRecord* key_rec,
+                                 const uchar* old_data,
+                                 const uchar* new_data,
+                                 NdbInterpretedCode* code,
+                                 NdbOperation::OperationOptions* options);
 #endif
   void setup_key_ref_for_ndb_record(const NdbRecord **key_rec,
                                     const uchar **key_row,

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2011-04-06 13:51:46 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2011-06-20 10:41:04 +0000
@@ -3351,18 +3351,13 @@ inline void slave_reset_conflict_fn(NDB_
   }
 }
 
-static int
-slave_set_resolve_fn(THD *thd, NDB_SHARE *share,
-                     const NDBTAB *ndbtab, uint field_index,
-                     enum_conflict_fn_type type, TABLE *table)
+static uint
+slave_check_resolve_col_type(const NDBTAB *ndbtab,
+                             uint field_index)
 {
-  DBUG_ENTER("slave_set_resolve_fn");
-
-  Thd_ndb *thd_ndb= get_thd_ndb(thd);
-  Ndb *ndb= thd_ndb->ndb;
-  NDBDICT *dict= ndb->getDictionary();
+  DBUG_ENTER("slave_check_resolve_col_type");
   const NDBCOL *c= ndbtab->getColumn(field_index);
-  uint sz;
+  uint sz= 0;
   switch (c->getType())
   {
   case  NDBCOL::Unsigned:
@@ -3378,11 +3373,24 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
   default:
     DBUG_PRINT("info", ("resolve column %u has wrong type",
                         field_index));
-    slave_reset_conflict_fn(share);
-    DBUG_RETURN(-1);
     break;
   }
+  DBUG_RETURN(sz);
+}
 
+static int
+slave_set_resolve_fn(THD *thd, NDB_SHARE *share,
+                     const NDBTAB *ndbtab, uint field_index,
+                     uint resolve_col_sz,
+                     const st_conflict_fn_def* conflict_fn,
+                     TABLE *table,
+                     uint8 flags)
+{
+  DBUG_ENTER("slave_set_resolve_fn");
+
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+  Ndb *ndb= thd_ndb->ndb;
+  NDBDICT *dict= ndb->getDictionary();
   NDB_CONFLICT_FN_SHARE *cfn_share= share->m_cfn_share;
   if (cfn_share == NULL)
   {
@@ -3390,9 +3398,14 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
       alloc_root(&share->mem_root, sizeof(NDB_CONFLICT_FN_SHARE));
     slave_reset_conflict_fn(share);
   }
-  cfn_share->m_resolve_size= sz;
+  cfn_share->m_conflict_fn= conflict_fn;
+
+  /* Calculate resolve col stuff (if relevant) */
+  cfn_share->m_resolve_size= resolve_col_sz;
   cfn_share->m_resolve_column= field_index;
-  cfn_share->m_resolve_cft= type;
+  cfn_share->m_resolve_offset= (uint16)(table->field[field_index]->ptr -
+                                        table->record[0]);
+  cfn_share->m_flags = flags;
 
   {
     /* get exceptions table */
@@ -3451,7 +3464,10 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
           cfn_share->m_pk_cols= nkey;
           ndbtab_g.release();
           if (opt_ndb_extra_logging)
-            sql_print_information("NDB Slave: log exceptions to %s",
+            sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s",
+                                  table->s->db.str,
+                                  table->s->table_name.str,
+                                  table->s->db.str,
                                   ex_tab_name);
         }
         else
@@ -3468,76 +3484,307 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
   DBUG_RETURN(0);
 }
 
-enum enum_conflict_fn_arg_type
+/**
+  CFT_NDB_OLD
+
+  To perform conflict detection, an interpreted program is used to read
+  the timestamp stored locally and compare to what was on the master.
+  If timestamp is not equal, an error for this operation (9998) will be raised,
+  and new row will not be applied. The error codes for the operations will
+  be checked on return.  For this to work is is vital that the operation
+  is run with ignore error option.
+
+  As an independent feature, phase 2 also saves the
+  conflicts into the table's exceptions table.
+*/
+int
+row_conflict_fn_old(NDB_CONFLICT_FN_SHARE* cfn_share,
+                    enum_conflicting_op_type op_type,
+                    const uchar* old_data,
+                    const uchar* new_data,
+                    const MY_BITMAP* write_set,
+                    NdbInterpretedCode* code)
 {
-  CFAT_END
-  ,CFAT_COLUMN_NAME
-};
-struct st_conflict_fn_arg
+  DBUG_ENTER("row_conflict_fn_old");
+  uint32 resolve_column= cfn_share->m_resolve_column;
+  uint32 resolve_size= cfn_share->m_resolve_size;
+  const uchar* field_ptr = old_data + cfn_share->m_resolve_offset;
+
+  assert((resolve_size == 4) || (resolve_size == 8));
+
+  if (unlikely(!bitmap_is_set(write_set, resolve_column)))
+  {
+    sql_print_information("NDB Slave: missing data for %s",
+                          cfn_share->m_conflict_fn->name);
+    DBUG_RETURN(1);
+  }
+
+  const uint label_0= 0;
+  const Uint32 RegOldValue= 1, RegCurrentValue= 2;
+  int r;
+
+  DBUG_PRINT("info",
+             ("Adding interpreted filter, existing value must eq event old value"));
+  /*
+   * read old value from record
+   */
+  union {
+    uint32 old_value_32;
+    uint64 old_value_64;
+  };
+  {
+    if (resolve_size == 4)
+    {
+      memcpy(&old_value_32, field_ptr, resolve_size);
+      DBUG_PRINT("info", ("  old_value_32: %u", old_value_32));
+    }
+    else
+    {
+      memcpy(&old_value_64, field_ptr, resolve_size);
+      DBUG_PRINT("info", ("  old_value_64: %llu",
+                          (unsigned long long) old_value_64));
+    }
+  }
+
+  /*
+   * Load registers RegOldValue and RegCurrentValue
+   */
+  if (resolve_size == 4)
+    r= code->load_const_u32(RegOldValue, old_value_32);
+  else
+    r= code->load_const_u64(RegOldValue, old_value_64);
+  DBUG_ASSERT(r == 0);
+  r= code->read_attr(RegCurrentValue, resolve_column);
+  DBUG_ASSERT(r == 0);
+  /*
+   * if RegOldValue == RegCurrentValue goto label_0
+   * else raise error for this row
+   */
+  r= code->branch_eq(RegOldValue, RegCurrentValue, label_0);
+  DBUG_ASSERT(r == 0);
+  r= code->interpret_exit_nok(error_conflict_fn_violation);
+  DBUG_ASSERT(r == 0);
+  r= code->def_label(label_0);
+  DBUG_ASSERT(r == 0);
+  r= code->interpret_exit_ok();
+  DBUG_ASSERT(r == 0);
+  r= code->finalise();
+  DBUG_ASSERT(r == 0);
+  DBUG_RETURN(r);
+}
+
+int
+row_conflict_fn_max_update_only(NDB_CONFLICT_FN_SHARE* cfn_share,
+                                enum_conflicting_op_type op_type,
+                                const uchar* old_data,
+                                const uchar* new_data,
+                                const MY_BITMAP* write_set,
+                                NdbInterpretedCode* code)
 {
-  enum_conflict_fn_arg_type type;
-  const char *ptr;
-  uint32 len;
-  uint32 fieldno; // CFAT_COLUMN_NAME
+  DBUG_ENTER("row_conflict_fn_max_update_only");
+  uint32 resolve_column= cfn_share->m_resolve_column;
+  uint32 resolve_size= cfn_share->m_resolve_size;
+  const uchar* field_ptr = new_data + cfn_share->m_resolve_offset;
+
+  assert((resolve_size == 4) || (resolve_size == 8));
+
+  if (unlikely(!bitmap_is_set(write_set, resolve_column)))
+  {
+    sql_print_information("NDB Slave: missing data for %s",
+                          cfn_share->m_conflict_fn->name);
+    DBUG_RETURN(1);
+  }
+
+  const uint label_0= 0;
+  const Uint32 RegNewValue= 1, RegCurrentValue= 2;
+  int r;
+
+  DBUG_PRINT("info",
+             ("Adding interpreted filter, existing value must be lt event new"));
+  /*
+   * read new value from record
+   */
+  union {
+    uint32 new_value_32;
+    uint64 new_value_64;
+  };
+  {
+    if (resolve_size == 4)
+    {
+      memcpy(&new_value_32, field_ptr, resolve_size);
+      DBUG_PRINT("info", ("  new_value_32: %u", new_value_32));
+    }
+    else
+    {
+      memcpy(&new_value_64, field_ptr, resolve_size);
+      DBUG_PRINT("info", ("  new_value_64: %llu",
+                          (unsigned long long) new_value_64));
+    }
+  }
+  /*
+   * Load registers RegNewValue and RegCurrentValue
+   */
+  if (resolve_size == 4)
+    r= code->load_const_u32(RegNewValue, new_value_32);
+  else
+    r= code->load_const_u64(RegNewValue, new_value_64);
+  DBUG_ASSERT(r == 0);
+  r= code->read_attr(RegCurrentValue, resolve_column);
+  DBUG_ASSERT(r == 0);
+  /*
+   * if RegNewValue > RegCurrentValue goto label_0
+   * else raise error for this row
+   */
+  r= code->branch_gt(RegNewValue, RegCurrentValue, label_0);
+  DBUG_ASSERT(r == 0);
+  r= code->interpret_exit_nok(error_conflict_fn_violation);
+  DBUG_ASSERT(r == 0);
+  r= code->def_label(label_0);
+  DBUG_ASSERT(r == 0);
+  r= code->interpret_exit_ok();
+  DBUG_ASSERT(r == 0);
+  r= code->finalise();
+  DBUG_ASSERT(r == 0);
+  DBUG_RETURN(r);
+}
+
+/**
+  CFT_NDB_MAX
+
+  To perform conflict resolution, an interpreted program is used to read
+  the timestamp stored locally and compare to what is going to be applied.
+  If timestamp is lower, an error for this operation (9999) will be raised,
+  and new row will not be applied. The error codes for the operations will
+  be checked on return.  For this to work is is vital that the operation
+  is run with ignore error option.
+
+  Note that for delete, this algorithm reverts to the OLD algorithm.
+*/
+int
+row_conflict_fn_max(NDB_CONFLICT_FN_SHARE* cfn_share,
+                    enum_conflicting_op_type op_type,
+                    const uchar* old_data,
+                    const uchar* new_data,
+                    const MY_BITMAP* write_set,
+                    NdbInterpretedCode* code)
+{
+  switch(op_type)
+  {
+  case WRITE_ROW:
+    abort();
+    return 1;
+  case UPDATE_ROW:
+    return row_conflict_fn_max_update_only(cfn_share,
+                                           op_type,
+                                           old_data,
+                                           new_data,
+                                           write_set,
+                                           code);
+  case DELETE_ROW:
+    /* Can't use max of new image, as there's no new image
+     * for DELETE
+     * Use OLD instead
+     */
+    return row_conflict_fn_old(cfn_share,
+                               op_type,
+                               old_data,
+                               new_data,
+                               write_set,
+                               code);
+  default:
+    abort();
+    return 1;
+  }
+}
+
+
+/**
+  CFT_NDB_MAX_DEL_WIN
+
+  To perform conflict resolution, an interpreted program is used to read
+  the timestamp stored locally and compare to what is going to be applied.
+  If timestamp is lower, an error for this operation (9999) will be raised,
+  and new row will not be applied. The error codes for the operations will
+  be checked on return.  For this to work is is vital that the operation
+  is run with ignore error option.
+
+  In this variant, replicated DELETEs alway succeed - no filter is added
+  to them.
+*/
+
+int
+row_conflict_fn_max_del_win(NDB_CONFLICT_FN_SHARE* cfn_share,
+                            enum_conflicting_op_type op_type,
+                            const uchar* old_data,
+                            const uchar* new_data,
+                            const MY_BITMAP* write_set,
+                            NdbInterpretedCode* code)
+{
+  switch(op_type)
+  {
+  case WRITE_ROW:
+    abort();
+    return 1;
+  case UPDATE_ROW:
+    return row_conflict_fn_max_update_only(cfn_share,
+                                           op_type,
+                                           old_data,
+                                           new_data,
+                                           write_set,
+                                           code);
+  case DELETE_ROW:
+    /* This variant always lets a received DELETE_ROW
+     * succeed.
+     */
+    return 1;
+  default:
+    abort();
+    return 1;
+  }
 };
-struct st_conflict_fn_def
+
+static const st_conflict_fn_arg_def resolve_col_args[]=
 {
-  const char *name;
-  enum_conflict_fn_type type;
-  enum enum_conflict_fn_arg_type arg_type;
+  /* Arg type              Optional */
+  { CFAT_COLUMN_NAME,      false },
+  { CFAT_END,              false }
 };
-static struct st_conflict_fn_def conflict_fns[]=
+
+static const st_conflict_fn_def conflict_fns[]=
 {
-   { "NDB$MAX_DELETE_WIN", CFT_NDB_MAX_DEL_WIN, CFAT_COLUMN_NAME }
-  ,{ NULL,                 CFT_NDB_MAX_DEL_WIN, CFAT_END }
-  ,{ "NDB$MAX", CFT_NDB_MAX,   CFAT_COLUMN_NAME }
-  ,{ NULL,      CFT_NDB_MAX,   CFAT_END }
-  ,{ "NDB$OLD", CFT_NDB_OLD,   CFAT_COLUMN_NAME }
-  ,{ NULL,      CFT_NDB_OLD,   CFAT_END }
+  { "NDB$MAX_DELETE_WIN", CFT_NDB_MAX_DEL_WIN,
+    &resolve_col_args[0], row_conflict_fn_max_del_win },
+  { "NDB$MAX",            CFT_NDB_MAX,
+    &resolve_col_args[0], row_conflict_fn_max         },
+  { "NDB$OLD",            CFT_NDB_OLD,
+    &resolve_col_args[0], row_conflict_fn_old         },
 };
+
 static unsigned n_conflict_fns=
-sizeof(conflict_fns) / sizeof(struct st_conflict_fn_def);
+  sizeof(conflict_fns) / sizeof(struct st_conflict_fn_def);
 
-static int
-set_conflict_fn(THD *thd, NDB_SHARE *share,
-                const NDBTAB *ndbtab,
-                const NDBCOL *conflict_col,
-                char *conflict_fn,
-                char *msg, uint msg_len,
-                TABLE *table)
-{
-  DBUG_ENTER("set_conflict_fn");
-  uint len= 0;
-  switch (conflict_col->getArrayType())
-  {
-  case NDBCOL::ArrayTypeShortVar:
-    len= *(uchar*)conflict_fn;
-    conflict_fn++;
-    break;
-  case NDBCOL::ArrayTypeMediumVar:
-    len= uint2korr(conflict_fn);
-    conflict_fn+= 2;
-    break;
-  default:
-    break;
-  }
-  conflict_fn[len]= '\0';
-  const char *ptr= conflict_fn;
+
+int
+parse_conflict_fn_spec(const char* conflict_fn_spec,
+                       const st_conflict_fn_def** conflict_fn,
+                       st_conflict_fn_arg* args,
+                       Uint32* max_args,
+                       const TABLE* table,
+                       char *msg, uint msg_len)
+{
+  DBUG_ENTER("parse_conflict_fn_spec");
+
+  Uint32 no_args = 0;
+  const char *ptr= conflict_fn_spec;
   const char *error_str= "unknown conflict resolution function";
   /* remove whitespace */
   while (*ptr == ' ' && *ptr != '\0') ptr++;
 
-  const unsigned MAX_ARGS= 8;
-  unsigned no_args= 0;
-  struct st_conflict_fn_arg args[MAX_ARGS];
-
-  DBUG_PRINT("info", ("parsing %s", conflict_fn));
+  DBUG_PRINT("info", ("parsing %s", conflict_fn_spec));
 
   for (unsigned i= 0; i < n_conflict_fns; i++)
   {
-    struct st_conflict_fn_def &fn= conflict_fns[i];
-    if (fn.name == NULL)
-      continue;
+    const st_conflict_fn_def &fn= conflict_fns[i];
 
     uint len= strlen(fn.name);
     if (strncmp(ptr, fn.name, len))
@@ -3563,9 +3810,16 @@ set_conflict_fn(THD *thd, NDB_SHARE *sha
     /* find all arguments */
     for (;;)
     {
+      if (no_args >= *max_args)
+      {
+        error_str= "too many arguments";
+        DBUG_PRINT("info", ("parse error %s", error_str));
+        break;
+      }
+
       /* expected type */
       enum enum_conflict_fn_arg_type type=
-        conflict_fns[i+no_args].arg_type;
+        conflict_fns[i].arg_defs[no_args].arg_type;
 
       /* remove whitespace */
       while (*ptr == ' ' && *ptr != '\0') ptr++;
@@ -3578,16 +3832,30 @@ set_conflict_fn(THD *thd, NDB_SHARE *sha
       }
 
       /* arg */
+      /* Todo : Should support comma as an arg separator? */
       const char *start_arg= ptr;
       while (*ptr != ')' && *ptr != ' ' && *ptr != '\0') ptr++;
       const char *end_arg= ptr;
 
+      bool optional_arg = conflict_fns[i].arg_defs[no_args].optional;
       /* any arg given? */
       if (start_arg == end_arg)
       {
-        error_str= "missing function argument";
-        DBUG_PRINT("info", ("parse error %s", error_str));
-        break;
+        if (!optional_arg)
+        {
+          error_str= "missing function argument";
+          DBUG_PRINT("info", ("parse error %s", error_str));
+          break;
+        }
+        else
+        {
+          /* Arg was optional, and not present
+           * Must be at end of args, finish parsing
+           */
+          args[no_args].type= CFAT_END;
+          error_str= NULL;
+          break;
+        }
       }
 
       uint len= end_arg - start_arg;
@@ -3598,6 +3866,7 @@ set_conflict_fn(THD *thd, NDB_SHARE *sha
  
       DBUG_PRINT("info", ("found argument %s %u", start_arg, len));
 
+      bool arg_processing_error = false;
       switch (type)
       {
       case CFAT_COLUMN_NAME:
@@ -3622,6 +3891,8 @@ set_conflict_fn(THD *thd, NDB_SHARE *sha
         abort();
       }
 
+      if (arg_processing_error)
+        break;
       no_args++;
     }
 
@@ -3649,43 +3920,86 @@ set_conflict_fn(THD *thd, NDB_SHARE *sha
       break;
     }
 
-    /* setup the function */
-    switch (fn.type)
-    {
-    case CFT_NDB_MAX:
-    case CFT_NDB_OLD:
-    case CFT_NDB_MAX_DEL_WIN:
-      if (args[0].fieldno == (uint32)-1)
-        break;
-      if (slave_set_resolve_fn(thd, share, ndbtab, args[0].fieldno,
-                               fn.type, table))
-      {
-        /* wrong data type */
-        my_snprintf(msg, msg_len,
-                 "column '%s' has wrong datatype",
-                 table->s->field[args[0].fieldno]->field_name);
-        DBUG_PRINT("info", ("%s", msg));
-        DBUG_RETURN(-1);
-      }
-      if (opt_ndb_extra_logging)
-      {
-        sql_print_information("NDB Slave: conflict_fn %s on attribute %s",
-                              fn.name,
-                              table->s->field[args[0].fieldno]->field_name);
-      }
-      break;
-    case CFT_NDB_UNDEF:
-      abort();
-    }
+    /* Update ptrs to conflict fn + # of args */
+    *conflict_fn = &conflict_fns[i];
+    *max_args = no_args;
+
     DBUG_RETURN(0);
   }
   /* parse error */
   my_snprintf(msg, msg_len, "%s, %s at '%s'",
-           conflict_fn, error_str, ptr);
+           conflict_fn_spec, error_str, ptr);
   DBUG_PRINT("info", ("%s", msg));
   DBUG_RETURN(-1);
 }
 
+static int
+setup_conflict_fn(THD *thd, NDB_SHARE *share,
+                  const NDBTAB *ndbtab,
+                  char *msg, uint msg_len,
+                  TABLE *table,
+                  const st_conflict_fn_def* conflict_fn,
+                  const st_conflict_fn_arg* args,
+                  const Uint32 num_args)
+{
+  DBUG_ENTER("setup_conflict_fn");
+
+  /* setup the function */
+  switch (conflict_fn->type)
+  {
+  case CFT_NDB_MAX:
+  case CFT_NDB_OLD:
+  case CFT_NDB_MAX_DEL_WIN:
+  {
+    if (num_args != 1)
+    {
+      my_snprintf(msg, msg_len,
+                  "Incorrect arguments to conflict function");
+      DBUG_PRINT("info", ("%s", msg));
+      DBUG_RETURN(-1);
+    }
+
+    uint resolve_col_sz= 0;
+
+    if (0 == (resolve_col_sz =
+              slave_check_resolve_col_type(ndbtab, args[0].fieldno)))
+    {
+      /* wrong data type */
+      slave_reset_conflict_fn(share);
+      my_snprintf(msg, msg_len,
+                  "column '%s' has wrong datatype",
+                  table->s->field[args[0].fieldno]->field_name);
+      DBUG_PRINT("info", ("%s", msg));
+      DBUG_RETURN(-1);
+    }
+
+    if (slave_set_resolve_fn(thd, share, ndbtab,
+                             args[0].fieldno, resolve_col_sz,
+                             conflict_fn, table, CFF_NONE))
+    {
+      my_snprintf(msg, msg_len,
+                  "unable to setup conflict resolution using column '%s'",
+                  table->s->field[args[0].fieldno]->field_name);
+      DBUG_PRINT("info", ("%s", msg));
+      DBUG_RETURN(-1);
+    }
+    if (opt_ndb_extra_logging)
+    {
+       sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.",
+                             table->s->db.str,
+                             table->s->table_name.str,
+                             conflict_fn->name,
+                             table->s->field[args[0].fieldno]->field_name);
+    }
+    break;
+  }
+  case CFT_NUMBER_OF_CFTS:
+  case CFT_NDB_UNDEF:
+    abort();
+  }
+  DBUG_RETURN(0);
+}
+
 static const char *ndb_rep_db= NDB_REP_DB;
 static const char *ndb_replication_table= NDB_REPLICATION_TABLE;
 static const char *nrt_db= "db";
@@ -3693,45 +4007,30 @@ static const char *nrt_table_name= "tabl
 static const char *nrt_server_id= "server_id";
 static const char *nrt_binlog_type= "binlog_type";
 static const char *nrt_conflict_fn= "conflict_fn";
+
+/*
+   ndbcluster_read_replication_table
+
+   This function reads the information for the supplied table from
+   the mysql.ndb_replication table.
+   Where there is no information (or no table), defaults are
+   returned.
+*/
 int
-ndbcluster_read_binlog_replication(THD *thd, Ndb *ndb,
-                                   NDB_SHARE *share,
-                                   const NDBTAB *ndbtab,
-                                   uint server_id,
-                                   TABLE *table,
-                                   bool do_set_binlog_flags)
+ndbcluster_read_replication_table(THD *thd, Ndb *ndb,
+                                  const char* db,
+                                  const char* table_name,
+                                  uint server_id,
+                                  Uint32* binlog_flags,
+                                  char** conflict_fn_spec,
+                                  char* conflict_fn_buffer,
+                                  Uint32 conflict_fn_buffer_len)
 {
-  DBUG_ENTER("ndbcluster_read_binlog_replication");
-  const char *db= share->db;
-  const char *table_name= share->table_name;
+  DBUG_ENTER("ndbcluster_read_replication_table");
   NdbError ndberror;
   int error= 0;
   const char *error_str= "<none>";
 
-  /* Override for ndb_apply_status when logging */
-  if (opt_ndb_log_apply_status &&
-      do_set_binlog_flags)
-  {
-    if (strcmp(db, NDB_REP_DB) == 0 &&
-        strcmp(table_name, NDB_APPLY_TABLE) == 0)
-    {
-      /*
-        Ensure that we get all columns from ndb_apply_status updates
-        by forcing FULL event type
-        Also, ensure that ndb_apply_status events are always logged as 
-        WRITES.
-        This is needed so that received ndb_apply_status WRITES can be
-        cleanly propagated.
-      */
-      DBUG_PRINT("info", ("ndb_apply_status defaulting to FULL, USE_WRITE"));
-      sql_print_information("NDB : ndb-log-apply-status forcing "
-                            "%s.%s to FULL USE_WRITE",
-                            NDB_REP_DB, NDB_APPLY_TABLE);
-      set_binlog_flags(share, NBT_FULL);
-      DBUG_RETURN(0);
-    }
-  }
-
   ndb->setDatabaseName(ndb_rep_db);
   NDBDICT *dict= ndb->getDictionary();
   Ndb_table_guard ndbtab_g(dict, ndb_replication_table);
@@ -3741,8 +4040,8 @@ ndbcluster_read_binlog_replication(THD *
        dict->getNdbError().code == 4009))
   {
     DBUG_PRINT("info", ("No %s.%s table", ndb_rep_db, ndb_replication_table));
-    if (do_set_binlog_flags)
-      set_binlog_flags(share, NBT_DEFAULT);
+    *binlog_flags= NBT_DEFAULT;
+    *conflict_fn_spec= NULL;
     DBUG_RETURN(0);
   }
   const NDBCOL
@@ -3808,6 +4107,10 @@ ndbcluster_read_binlog_replication(THD *
     char *ndb_conflict_fn[2]= {ndb_conflict_fn_buf, ndb_conflict_fn_buf+sz};
     NdbOperation *op[2];
     uint32 i, id= 0;
+    /* Read generic row (server_id==0) and specific row (server_id == our id)
+     * from ndb_replication.
+     * Specific overrides generic, if present
+     */
     for (i= 0; i < 2; i++)
     {
       NdbOperation *_op;
@@ -3867,39 +4170,70 @@ ndbcluster_read_binlog_replication(THD *
     if (col_binlog_type_rec_attr[1] == NULL ||
         col_binlog_type_rec_attr[1]->isNULL())
     {
+      /* No specific value, use generic */
       col_binlog_type_rec_attr[1]= col_binlog_type_rec_attr[0];
       ndb_binlog_type[1]= ndb_binlog_type[0];
     }
     if (col_conflict_fn_rec_attr[1] == NULL ||
         col_conflict_fn_rec_attr[1]->isNULL())
     {
+      /* No specific value, use generic */
       col_conflict_fn_rec_attr[1]= col_conflict_fn_rec_attr[0];
       ndb_conflict_fn[1]= ndb_conflict_fn[0];
     }
 
-    if (do_set_binlog_flags)
+    if (col_binlog_type_rec_attr[1] == NULL ||
+        col_binlog_type_rec_attr[1]->isNULL())
     {
-      if (col_binlog_type_rec_attr[1] == NULL ||
-          col_binlog_type_rec_attr[1]->isNULL())
-        set_binlog_flags(share, NBT_DEFAULT);
-      else
-        set_binlog_flags(share, (enum Ndb_binlog_type) ndb_binlog_type[1]);
+      DBUG_PRINT("info", ("No binlog flag value, using default"));
+      /* No value */
+      *binlog_flags= NBT_DEFAULT;
     }
-    if (table)
+    else
+    {
+      DBUG_PRINT("info", ("Taking binlog flag value from the table"));
+      *binlog_flags= (enum Ndb_binlog_type) ndb_binlog_type[1];
+    }
+
+    if (col_conflict_fn_rec_attr[1] == NULL ||
+        col_conflict_fn_rec_attr[1]->isNULL())
+    {
+      /* No conflict function */
+      *conflict_fn_spec = NULL;
+    }
+    else
     {
-      if (col_conflict_fn_rec_attr[1] == NULL ||
-          col_conflict_fn_rec_attr[1]->isNULL())
-        slave_reset_conflict_fn(share); /* no conflict_fn */
-      else if (set_conflict_fn(thd, share, ndbtab,
-                               col_conflict_fn, ndb_conflict_fn[1],
-                               tmp_buf, sizeof(tmp_buf), table))
+      const char* conflict_fn = ndb_conflict_fn[1];
+      uint len= 0;
+      switch (col_conflict_fn->getArrayType())
+      {
+      case NDBCOL::ArrayTypeShortVar:
+        len= *(uchar*)conflict_fn;
+        conflict_fn++;
+        break;
+      case NDBCOL::ArrayTypeMediumVar:
+        len= uint2korr(conflict_fn);
+        conflict_fn+= 2;
+        break;
+      default:
+        abort();
+      }
+      if ((len + 1) > conflict_fn_buffer_len)
       {
-        error_str= tmp_buf;
-        error= 1;
         ndb->closeTransaction(trans);
+        error= -2;
+        error_str= "Conflict function specification too long.";
         goto err;
       }
+      memcpy(conflict_fn_buffer, conflict_fn, len);
+      conflict_fn_buffer[len] = '\0';
+      *conflict_fn_spec = conflict_fn_buffer;
     }
+
+    DBUG_PRINT("info", ("Retrieved Binlog flags : %u and function spec : %s",
+                        *binlog_flags, (*conflict_fn_spec != NULL ?*conflict_fn_spec:
+                                       "NULL")));
+
     ndb->closeTransaction(trans);
 
     DBUG_RETURN(0);
@@ -3908,14 +4242,7 @@ ndbcluster_read_binlog_replication(THD *
 err:
   DBUG_PRINT("info", ("error %d, error_str %s, ndberror.code %u",
                       error, error_str, ndberror.code));
-  if (error > 0)
-  {
-    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
-                        ER_CONFLICT_FN_PARSE_ERROR,
-                        ER(ER_CONFLICT_FN_PARSE_ERROR),
-                        error_str);
-  }
-  else if (error < 0)
+  if (error < 0)
   {
     char msg[FN_REFLEN];
     switch (error)
@@ -3947,12 +4274,192 @@ err:
                         ER(ER_ILLEGAL_HA_CREATE_OPTION),
                         ndbcluster_hton_name, msg);  
   }
-  if (do_set_binlog_flags)
-    set_binlog_flags(share, NBT_DEFAULT);
+  *binlog_flags= NBT_DEFAULT;
+  *conflict_fn_spec= NULL;
+
   if (ndberror.code && opt_ndb_extra_logging)
     print_warning_list("NDB", thd_warn_list(thd));
   DBUG_RETURN(ndberror.code);
 }
+
+/*
+  ndbcluster_get_binlog_replication_info
+
+  This function retrieves the data for the given table
+  from the ndb_replication table.
+
+  If the table is not found, or the table does not exist,
+  then defaults are returned.
+*/
+int
+ndbcluster_get_binlog_replication_info(THD *thd, Ndb *ndb,
+                                       const char* db,
+                                       const char* table_name,
+                                       uint server_id,
+                                       const TABLE *table,
+                                       Uint32* binlog_flags,
+                                       const st_conflict_fn_def** conflict_fn,
+                                       st_conflict_fn_arg* args,
+                                       Uint32* num_args)
+{
+  DBUG_ENTER("ndbcluster_get_binlog_replication_info");
+
+  /* Override for ndb_apply_status when logging */
+  if (opt_ndb_log_apply_status)
+  {
+    if (strcmp(db, NDB_REP_DB) == 0 &&
+        strcmp(table_name, NDB_APPLY_TABLE) == 0)
+    {
+      /*
+        Ensure that we get all columns from ndb_apply_status updates
+        by forcing FULL event type
+        Also, ensure that ndb_apply_status events are always logged as
+        WRITES.
+      */
+      DBUG_PRINT("info", ("ndb_apply_status defaulting to FULL, USE_WRITE"));
+      sql_print_information("NDB : ndb-log-apply-status forcing "
+                            "%s.%s to FULL USE_WRITE",
+                            NDB_REP_DB, NDB_APPLY_TABLE);
+      *binlog_flags = NBT_FULL;
+      *conflict_fn = NULL;
+      *num_args = 0;
+      DBUG_RETURN(0);
+    }
+  }
+
+  const Uint32 MAX_CONFLICT_FN_SPEC_LEN = 256;
+  char conflict_fn_buffer[MAX_CONFLICT_FN_SPEC_LEN];
+  char* conflict_fn_spec;
+
+  if (ndbcluster_read_replication_table(thd,
+                                        ndb,
+                                        db,
+                                        table_name,
+                                        server_id,
+                                        binlog_flags,
+                                        &conflict_fn_spec,
+                                        conflict_fn_buffer,
+                                        MAX_CONFLICT_FN_SPEC_LEN) != 0)
+  {
+    DBUG_RETURN(ER_NDB_REPLICATION_SCHEMA_ERROR);
+  }
+
+  if (table != NULL)
+  {
+    if (conflict_fn_spec != NULL)
+    {
+      char tmp_buf[FN_REFLEN];
+
+      if (parse_conflict_fn_spec(conflict_fn_spec,
+                                 conflict_fn,
+                                 args,
+                                 num_args,
+                                 table,
+                                 tmp_buf,
+                                 sizeof(tmp_buf)) != 0)
+      {
+        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                            ER_CONFLICT_FN_PARSE_ERROR,
+                            ER(ER_CONFLICT_FN_PARSE_ERROR),
+                            tmp_buf);
+        DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR);
+      }
+    }
+    else
+    {
+      /* No conflict function specified */
+      conflict_fn= NULL;
+      num_args= 0;
+    }
+  }
+
+  DBUG_RETURN(0);
+}
+
+int
+ndbcluster_apply_binlog_replication_info(THD *thd,
+                                         NDB_SHARE *share,
+                                         const NDBTAB* ndbtab,
+                                         TABLE* table,
+                                         const st_conflict_fn_def* conflict_fn,
+                                         const st_conflict_fn_arg* args,
+                                         Uint32 num_args,
+                                         bool do_set_binlog_flags,
+                                         Uint32 binlog_flags)
+{
+  DBUG_ENTER("ndbcluster_apply_binlog_replication_info");
+  char tmp_buf[FN_REFLEN];
+
+  if (do_set_binlog_flags)
+  {
+    DBUG_PRINT("info", ("Setting binlog flags to %u", binlog_flags));
+    set_binlog_flags(share, (enum Ndb_binlog_type)binlog_flags);
+  }
+
+  if (conflict_fn != NULL)
+  {
+    if (setup_conflict_fn(thd, share,
+                          ndbtab,
+                          tmp_buf, sizeof(tmp_buf),
+                          table,
+                          conflict_fn,
+                          args,
+                          num_args) != 0)
+    {
+      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                          ER_CONFLICT_FN_PARSE_ERROR,
+                          ER(ER_CONFLICT_FN_PARSE_ERROR),
+                          tmp_buf);
+      DBUG_RETURN(-1);
+    }
+  }
+  else
+  {
+    /* No conflict function specified */
+    slave_reset_conflict_fn(share);
+  }
+
+  DBUG_RETURN(0);
+}
+
+int
+ndbcluster_read_binlog_replication(THD *thd, Ndb *ndb,
+                                   NDB_SHARE *share,
+                                   const NDBTAB *ndbtab,
+                                   uint server_id,
+                                   TABLE *table,
+                                   bool do_set_binlog_flags)
+{
+  DBUG_ENTER("ndbcluster_read_binlog_replication");
+  Uint32 binlog_flags;
+  const st_conflict_fn_def* conflict_fn= NULL;
+  st_conflict_fn_arg args[MAX_CONFLICT_ARGS];
+  Uint32 num_args = MAX_CONFLICT_ARGS;
+
+  if ((ndbcluster_get_binlog_replication_info(thd, ndb,
+                                              share->db,
+                                              share->table_name,
+                                              server_id,
+                                              table,
+                                              &binlog_flags,
+                                              &conflict_fn,
+                                              args,
+                                              &num_args) != 0) ||
+      (ndbcluster_apply_binlog_replication_info(thd,
+                                                share,
+                                                ndbtab,
+                                                table,
+                                                conflict_fn,
+                                                args,
+                                                num_args,
+                                                do_set_binlog_flags,
+                                                binlog_flags) != 0))
+  {
+    DBUG_RETURN(-1);
+  }
+
+  DBUG_RETURN(0);
+}
 #endif /* HAVE_NDB_BINLOG */
 
 bool
@@ -5574,6 +6081,13 @@ ndb_binlog_thread_func(void *arg)
   uint incident_id= 0;
   Binlog_thread_state do_ndbcluster_binlog_close_connection;
 
+  /**
+   * If we get error after having reported incident
+   *   but before binlog started...we do "Restarting Cluster Binlog"
+   *   in that case, don't report incident again
+   */
+  bool do_incident = true;
+
 #ifdef RUN_NDB_BINLOG_TIMER
   Timer main_timer;
 #endif
@@ -5717,7 +6231,7 @@ restart_cluster_failure:
   /*
     Main NDB Injector loop
   */
-  while (ndb_binlog_running)
+  while (do_incident && ndb_binlog_running)
   {
     /*
       check if it is the first log, if so we do not insert a GAP event
@@ -5749,6 +6263,7 @@ restart_cluster_failure:
     int ret = inj->record_incident(thd, INCIDENT_LOST_EVENTS,
                                    msg[incident_id]);
     assert(ret == 0);
+    do_incident = false; // Don't report incident again, unless we get started
     break;
   }
   incident_id= 1;
@@ -5874,6 +6389,7 @@ restart_cluster_failure:
     static char db[]= "";
     thd->db= db;
   }
+  do_incident = true; // If we get disconnected again...do incident report
   do_ndbcluster_binlog_close_connection= BCCC_running;
   for ( ; !((ndbcluster_binlog_terminating ||
              do_ndbcluster_binlog_close_connection) &&

=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h	2011-03-08 22:31:14 +0000
+++ b/sql/ha_ndbcluster_binlog.h	2011-06-20 10:41:04 +0000
@@ -93,8 +93,7 @@ static const char *ha_ndb_ext=".ndb";
 #define NDB_EXCEPTIONS_TABLE_SUFFIX "$EX"
 #define NDB_EXCEPTIONS_TABLE_SUFFIX_LOWER "$ex"
 
-const uint error_conflict_fn_old_violation= 9998;
-const uint error_conflict_fn_max_violation= 9999;
+const uint error_conflict_fn_violation= 9999;
 #endif /* HAVE_NDB_BINLOG */
 
 
@@ -163,6 +162,26 @@ void ndb_rep_event_name(String *event_na
                         const char *db, const char *tbl, my_bool full);
 #ifdef HAVE_NDB_BINLOG
 int
+ndbcluster_get_binlog_replication_info(THD *thd, Ndb *ndb,
+                                       const char* db,
+                                       const char* table_name,
+                                       uint server_id,
+                                       const TABLE *table,
+                                       Uint32* binlog_flags,
+                                       const st_conflict_fn_def** conflict_fn,
+                                       st_conflict_fn_arg* args,
+                                       Uint32* num_args);
+int
+ndbcluster_apply_binlog_replication_info(THD *thd,
+                                         NDB_SHARE *share,
+                                         const NDBTAB* ndbtab,
+                                         TABLE* table,
+                                         const st_conflict_fn_def* conflict_fn,
+                                         const st_conflict_fn_arg* args,
+                                         Uint32 num_args,
+                                         bool do_set_binlog_flags,
+                                         Uint32 binlog_flags);
+int
 ndbcluster_read_binlog_replication(THD *thd, Ndb *ndb,
                                    NDB_SHARE *share,
                                    const NDBTAB *ndbtab,

=== modified file 'sql/ndb_share.h'
--- a/sql/ndb_share.h	2011-03-08 09:13:39 +0000
+++ b/sql/ndb_share.h	2011-06-20 10:41:04 +0000
@@ -25,7 +25,6 @@
 
 #include <ndbapi/Ndb.hpp>    // Ndb::TupleIdRange
 
-
 enum NDB_SHARE_STATE {
   NSS_INITIAL= 0,
   NSS_DROPPED,
@@ -33,32 +32,98 @@ enum NDB_SHARE_STATE {
 };
 
 
-#ifdef HAVE_NDB_BINLOG
 enum enum_conflict_fn_type
 {
   CFT_NDB_UNDEF = 0
   ,CFT_NDB_MAX
   ,CFT_NDB_OLD
   ,CFT_NDB_MAX_DEL_WIN
+  ,CFT_NUMBER_OF_CFTS /* End marker */
+};
+
+#ifdef HAVE_NDB_BINLOG
+static const Uint32 MAX_CONFLICT_ARGS= 8;
+
+enum enum_conflict_fn_arg_type
+{
+  CFAT_END
+  ,CFAT_COLUMN_NAME
+};
+
+struct st_conflict_fn_arg
+{
+  enum_conflict_fn_arg_type type;
+  const char *ptr;
+  uint32 len;
+  uint32 fieldno; // CFAT_COLUMN_NAME
+};
+
+struct st_conflict_fn_arg_def
+{
+  enum enum_conflict_fn_arg_type arg_type;
+  bool optional;
+};
+
+/* What type of operation was issued */
+enum enum_conflicting_op_type
+{                /* NdbApi          */
+  WRITE_ROW,     /* insert (!write) */
+  UPDATE_ROW,    /* update          */
+  DELETE_ROW     /* delete          */
+};
+
+/*
+  prepare_detect_func
+
+  Type of function used to prepare for conflict detection on
+  an NdbApi operation
+*/
+typedef int (* prepare_detect_func) (struct NDB_CONFLICT_FN_SHARE* cfn_share,
+                                     enum_conflicting_op_type op_type,
+                                     const uchar* old_data,
+                                     const uchar* new_data,
+                                     const MY_BITMAP* write_set,
+                                     struct NdbInterpretedCode* code);
+
+struct st_conflict_fn_def
+{
+  const char *name;
+  enum_conflict_fn_type type;
+  const st_conflict_fn_arg_def* arg_defs;
+  prepare_detect_func prep_func;
 };
 
+/* What sort of conflict was found */
+enum enum_conflict_cause
+{
+  ROW_ALREADY_EXISTS,
+  ROW_DOES_NOT_EXIST,
+  ROW_IN_CONFLICT
+};
 
 /* NdbOperation custom data which points out handler and record. */
 struct Ndb_exceptions_data {
   struct NDB_SHARE* share;
+  const NdbRecord* key_rec;
   const uchar* row;
+  enum_conflicting_op_type op_type;
 };
 
+enum enum_conflict_fn_flags
+{
+  CFF_NONE = 0
+};
 
 struct NDB_CONFLICT_FN_SHARE {
-  enum_conflict_fn_type m_resolve_cft;
+  const st_conflict_fn_def* m_conflict_fn;
 
   /* info about original table */
   uint8 m_pk_cols;
   uint8 m_resolve_column;
   uint8 m_resolve_size;
-  uint8 unused;
+  uint8 m_flags;
   uint16 m_offset[16];
+  uint16 m_resolve_offset;
 
   const NdbDictionary::Table *m_ex_tab;
   uint32 m_count;

=== modified file 'sql/ndb_thd_ndb.h'
--- a/sql/ndb_thd_ndb.h	2011-03-09 10:18:50 +0000
+++ b/sql/ndb_thd_ndb.h	2011-06-20 10:41:04 +0000
@@ -96,9 +96,6 @@ public:
   uint m_batch_size;
 
   uint m_execute_count;
-  uint m_max_violation_count;
-  uint m_old_violation_count;
-  uint m_conflict_fn_usage_count;
 
   uint m_scan_count;
   uint m_pruned_scan_count;

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-06-07 12:08:29 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-06-16 08:37:10 +0000
@@ -4418,7 +4418,7 @@ Dbspj::scanIndex_build(Build_context& ct
     if (requestPtr.p->m_bits & Request::RT_REPEAT_SCAN_RESULT &&
        !treeNodePtr.p->m_ancestors.contains(ctx.m_scans))
     {
-      nodePtr.p->m_bits |= TreeNode::T_SCAN_REPEATABLE;
+      treeNodePtr.p->m_bits |= TreeNode::T_SCAN_REPEATABLE;
     }
 
     ctx.m_scan_cnt++;
@@ -5025,6 +5025,8 @@ Dbspj::scanIndex_parent_batch_repeat(Sig
 
   DEBUG("scanIndex_parent_batch_repeat(), m_node_no: " << treeNodePtr.p->m_node_no
         << ", m_batch_chunks: " << data.m_batch_chunks);
+  
+  ndbassert(treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE);
 
   /**
    * Register index-scans to be restarted if we didn't get all

=== modified file 'storage/ndb/src/ndbapi/NdbQueryBuilder.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp	2011-05-17 12:47:21 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp	2011-06-16 09:32:43 +0000
@@ -23,7 +23,6 @@
 #include <Vector.hpp>
 #include "signaldata/QueryTree.hpp"
 
-#include <Ndb.hpp>
 #include "NdbDictionaryImpl.hpp"
 #include <NdbRecord.hpp>
 #include "AttributeHeader.hpp"
@@ -575,16 +574,15 @@ NdbQueryIndexScanOperationDef::~NdbQuery
 
 NdbQueryOperationDefImpl::~NdbQueryOperationDefImpl()
 {
+  // Unlink any parent and child refering this object
   if (m_parent != NULL)
   {
     m_parent->removeChild(this);
   }
-  // Delete children recursively also.
   for (Uint32 i = 0; i<m_children.size(); i++)
   {
     assert(m_children[i]->m_parent == this);
     m_children[i]->m_parent = NULL;
-    delete m_children[i];
   }
 }
 
@@ -686,9 +684,9 @@ NdbQueryOperationDef::getIndex() const
  * Implementation of NdbQueryBuilder factory
  ******************************************/
 // Static method.
-NdbQueryBuilder* NdbQueryBuilder::create(Ndb& ndb)
+NdbQueryBuilder* NdbQueryBuilder::create()
 {
-  NdbQueryBuilderImpl* const impl = new NdbQueryBuilderImpl(ndb);
+  NdbQueryBuilderImpl* const impl = new NdbQueryBuilderImpl();
   if (likely (impl != NULL))
   {
     if (likely(impl->getNdbError().code == 0))
@@ -1045,9 +1043,9 @@ NdbQueryBuilder::prepare()
 // The (hidden) Impl of NdbQueryBuilder
 ////////////////////////////////////////
 
-NdbQueryBuilderImpl::NdbQueryBuilderImpl(Ndb& ndb)
+NdbQueryBuilderImpl::NdbQueryBuilderImpl()
 : m_interface(*this),
-  m_ndb(ndb), m_error(),
+  m_error(),
   m_operations(),
   m_operands(),
   m_paramCnt(0),
@@ -1063,12 +1061,13 @@ NdbQueryBuilderImpl::NdbQueryBuilderImpl
 NdbQueryBuilderImpl::~NdbQueryBuilderImpl()
 {
   // Delete all operand and operator in Vector's
-  if (m_operations.size() > 0)
+  for (Uint32 i=0; i<m_operations.size(); ++i)
   {
-    delete m_operations[0];
+    delete m_operations[i];
   }
   for (Uint32 i=0; i<m_operands.size(); ++i)
-  { delete m_operands[i];
+  {
+    delete m_operands[i];
   }
 }
 
@@ -1231,12 +1230,13 @@ NdbQueryDefImpl(const Vector<NdbQueryOpe
 NdbQueryDefImpl::~NdbQueryDefImpl()
 {
   // Release all NdbQueryOperations
-  if (m_operations.size() > 0)
+  for (Uint32 i=0; i<m_operations.size(); ++i)
   {
-    delete m_operations[0];
+    delete m_operations[i];
   }
   for (Uint32 i=0; i<m_operands.size(); ++i)
-  { delete m_operands[i];
+  {
+    delete m_operands[i];
   }
 }
 
@@ -2799,15 +2799,14 @@ main(int argc, const char** argv)
   assert (sizeof(NdbParamOperand) == sizeof(NdbQueryOperandImpl*));
   assert (sizeof(NdbLinkedOperand) == sizeof(NdbQueryOperandImpl*));
 
-  Ndb *myNdb = 0;
-  NdbQueryBuilder myBuilder(*myNdb);
+  NdbQueryBuilder* const myBuilder= NdbQueryBuilder::create();
 
   const NdbDictionary::Table *manager = (NdbDictionary::Table*)0xDEADBEAF;
 //  const NdbDictionary::Index *ix = (NdbDictionary::Index*)0x11223344;
 
-  NdbQueryDef* q1 = 0;
+  const NdbQueryDef* q1 = 0;
   {
-    NdbQueryBuilder* qb = &myBuilder; //myDict->getQueryBuilder();
+    NdbQueryBuilder* qb = myBuilder;
 
     const NdbQueryOperand* managerKey[] =  // Manager is indexed om {"dept_no", "emp_no"}
     {  qb->constValue("d005"),             // dept_no = "d005"

=== modified file 'storage/ndb/src/ndbapi/NdbQueryBuilder.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryBuilder.hpp	2011-04-06 14:16:13 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryBuilder.hpp	2011-06-16 09:32:43 +0000
@@ -24,7 +24,6 @@
 // skip includes...and require them to be included first
 // BUH!
 
-class Ndb;
 class NdbQueryDef;
 class NdbQueryDefImpl;
 class NdbQueryBuilderImpl;
@@ -358,9 +357,8 @@ private:
  *   build phase.
  *
  * - The NdbQueryDef produced by the ::prepare() method has a lifetime 
- *   determined by the Ndb object, or until it is explicit released by
- *   NdbQueryDef::release()
- *  
+ *   until it is explicit released by NdbQueryDef::release()
+ *
  */
 class NdbQueryBuilder 
 {
@@ -380,7 +378,7 @@ public:
    * Allocate an instance.
    * @return New instance, or NULL if allocation failed.
    */
-  static NdbQueryBuilder* create(Ndb& ndb);
+  static NdbQueryBuilder* create();
 
   /**
    * Release this object and any resources held by it.
@@ -478,8 +476,7 @@ private:
  * NdbQueryDef represents a ::prepare()'d object from NdbQueryBuilder.
  *
  * The NdbQueryDef is reusable in the sense that it may be executed multiple
- * times. Its lifetime is defined by the Ndb object which it was created with,
- * or it may be explicitely released() when no longer required.
+ * times. It is valid until it is explicitely released().
  *
  * The NdbQueryDef *must* be keept alive until the last thread
  * which executing a query based on this NdbQueryDef has completed execution 

=== modified file 'storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp	2011-05-05 11:06:08 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryBuilderImpl.hpp	2011-06-16 09:32:43 +0000
@@ -631,7 +631,7 @@ class NdbQueryBuilderImpl
 
 public:
   ~NdbQueryBuilderImpl();
-  explicit NdbQueryBuilderImpl(Ndb& ndb);
+  explicit NdbQueryBuilderImpl();
 
   const NdbQueryDefImpl* prepare();
 
@@ -665,7 +665,6 @@ private:
   bool contains(const NdbQueryOperationDefImpl*);
 
   NdbQueryBuilder m_interface;
-  Ndb& m_ndb;
   NdbError m_error;
 
   Vector<NdbQueryOperationDefImpl*> m_operations;

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-05-26 14:44:59 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-06-16 09:32:43 +0000
@@ -2895,7 +2895,6 @@ NdbQueryImpl::closeTcCursor(bool forceSe
   assert (m_queryDef.isScanQuery());
 
   NdbImpl* const ndb = m_transaction.getNdb()->theImpl;
-  NdbImpl* const impl = ndb;
   const Uint32 timeout  = ndb->get_waitfor_timeout();
   const Uint32 nodeId   = m_transaction.getConnectedNodeId();
   const Uint32 seq      = m_transaction.theNodeSequence;
@@ -2905,7 +2904,7 @@ NdbQueryImpl::closeTcCursor(bool forceSe
    */
   PollGuard poll_guard(*ndb);
 
-  if (unlikely(impl->getNodeSequence(nodeId) != seq))
+  if (unlikely(ndb->getNodeSequence(nodeId) != seq))
   {
     setErrorCode(Err_NodeFailCausedAbort);
     return -1;  // Transporter disconnected and reconnected, no need to close
@@ -2917,7 +2916,7 @@ NdbQueryImpl::closeTcCursor(bool forceSe
     const FetchResult result = static_cast<FetchResult>
         (poll_guard.wait_scan(3*timeout, nodeId, forceSend));
 
-    if (unlikely(impl->getNodeSequence(nodeId) != seq))
+    if (unlikely(ndb->getNodeSequence(nodeId) != seq))
       setFetchTerminated(Err_NodeFailCausedAbort,false);
     else if (unlikely(result != FetchResult_ok))
     {
@@ -2952,7 +2951,7 @@ NdbQueryImpl::closeTcCursor(bool forceSe
       const FetchResult result = static_cast<FetchResult>
           (poll_guard.wait_scan(3*timeout, nodeId, forceSend));
 
-      if (unlikely(impl->getNodeSequence(nodeId) != seq))
+      if (unlikely(ndb->getNodeSequence(nodeId) != seq))
         setFetchTerminated(Err_NodeFailCausedAbort,false);
       else if (unlikely(result != FetchResult_ok))
       {

=== modified file 'storage/ndb/test/include/HugoQueryBuilder.hpp'
--- a/storage/ndb/test/include/HugoQueryBuilder.hpp	2011-04-06 14:16:13 +0000
+++ b/storage/ndb/test/include/HugoQueryBuilder.hpp	2011-06-16 09:32:43 +0000
@@ -1,6 +1,5 @@
 /*
-   Copyright (C) 2003 MySQL AB
-    All rights reserved. Use is subject to license terms.
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -105,7 +104,7 @@ public:
   OptionMask getOptionMask() const { return m_options;}
   void setOptionMask(OptionMask om) { m_options = om;}
 
-  const NdbQueryDef * createQuery(Ndb*, bool takeOwnership = false);
+  const NdbQueryDef * createQuery(bool takeOwnership = false);
 
 private:
   struct TableDef

=== modified file 'storage/ndb/test/src/HugoQueryBuilder.cpp'
--- a/storage/ndb/test/src/HugoQueryBuilder.cpp	2011-04-06 14:16:13 +0000
+++ b/storage/ndb/test/src/HugoQueryBuilder.cpp	2011-06-16 09:32:43 +0000
@@ -1,6 +1,5 @@
 /*
-   Copyright (C) 2003 MySQL AB
-    All rights reserved. Use is subject to license terms.
+   Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -563,9 +562,9 @@ loop:
 }
 
 const NdbQueryDef *
-HugoQueryBuilder::createQuery(Ndb* pNdb, bool takeOwnership)
+HugoQueryBuilder::createQuery(bool takeOwnership)
 {
-  NdbQueryBuilder* const builder = NdbQueryBuilder::create(*pNdb);
+  NdbQueryBuilder* const builder = NdbQueryBuilder::create();
   if (builder == NULL)
   {
     ndbout << "Failed to create NdbQueryBuilder." << endl;

No bundle (reason: revision is a merge (you can force generation of a bundle with env var BZR_FORCE_BUNDLE=1)).
Thread
bzr commit into mysql-5.5-cluster branch (frazer.clement:3360) Frazer Clement20 Jun