List:Commits« Previous MessageNext Message »
From:Frazer Clement Date:June 16 2011 2:39pm
Subject:bzr commit into mysql-5.1-telco-7.1 branch (frazer.clement:4246)
View as plain text  
#At file:///home/frazer/bzr/mysql-5.1-telco-7.1/ based on revid:frazer.clement@stripped

 4246 Frazer Clement	2011-06-16 [merge]
      Merge 7.0->7.1

    modified:
      mysql-test/suite/ndb/r/ndb_basic.result
      mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max.result
      mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max_delete_win.result
      mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_old.result
      mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result
      mysql-test/suite/ndb_rpl/t/ndb_conflict_info.inc
      mysql-test/suite/ndb_rpl/t/ndb_conflict_info_init.inc
      mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_binlog.h
=== modified file 'mysql-test/suite/ndb/r/ndb_basic.result'
--- a/mysql-test/suite/ndb/r/ndb_basic.result	2011-04-28 07:47:53 +0000
+++ b/mysql-test/suite/ndb/r/ndb_basic.result	2011-06-16 14:34:56 +0000
@@ -66,6 +66,7 @@ Ndb_cluster_node_id	#
 Ndb_config_from_host	#
 Ndb_config_from_port	#
 Ndb_conflict_fn_max	#
+Ndb_conflict_fn_max_del_win	#
 Ndb_conflict_fn_old	#
 Ndb_connect_count	#
 Ndb_execute_count	#

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max.result	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max.result	2011-06-16 14:34:56 +0000
@@ -41,6 +41,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -67,6 +70,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -87,6 +93,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -114,6 +123,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -162,10 +174,19 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check update some data that causes conflicts
@@ -198,10 +219,19 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check higer timestamp
@@ -259,6 +289,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -285,6 +318,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -305,6 +341,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -332,6 +371,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -380,10 +422,19 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check update some data that causes conflicts
@@ -416,10 +467,19 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check higer timestamp
@@ -473,10 +533,13 @@ a	b	c	d
 3	Master t2 c=2	2	123
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-0
+3
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-3
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -505,10 +568,13 @@ a	b	c	d
 3	Master t2 a=3 at c=3	3	123
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-0
+4
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-4
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -533,10 +599,13 @@ select * from t2 order by a, d;
 a	b	c	d
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-0
+4
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-4
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -572,6 +641,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -616,10 +688,13 @@ commit;
 *** slave - check conflict info, there should be some
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-0
+3
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-3
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	5	#	#
@@ -658,10 +733,13 @@ commit;
 *** slave - check conflict info, change depends on calling test
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-0
+6
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-6
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	5	#	#
@@ -733,6 +811,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -759,6 +840,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -779,6 +863,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -806,6 +893,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -850,15 +940,24 @@ commit;
 *** slave - check conflict info, there should be some
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+4
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-1
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
+2	1	#	4	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 2	1	#	#	4	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
@@ -889,15 +988,24 @@ commit;
 *** slave - check conflict info, change depends on calling test
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+4
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
-1
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
+2	1	#	4	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 2	1	#	#	4	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max_delete_win.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max_delete_win.result	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_max_delete_win.result	2011-06-16 14:34:56 +0000
@@ -41,6 +41,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -67,6 +70,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -87,6 +93,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -114,6 +123,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -158,14 +170,23 @@ commit;
 *** slave - check conflict info, there should be some
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check update some data that causes conflicts
@@ -194,14 +215,23 @@ commit;
 *** slave - check conflict info, change depends on calling test
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check higer timestamp
@@ -259,6 +289,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -285,6 +318,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -305,6 +341,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -332,6 +371,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -376,14 +418,23 @@ commit;
 *** slave - check conflict info, there should be some
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check update some data that causes conflicts
@@ -412,14 +463,23 @@ commit;
 *** slave - check conflict info, change depends on calling test
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check higer timestamp
@@ -476,6 +536,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
 3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
@@ -508,6 +571,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
 4
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
@@ -536,6 +602,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
 4
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
@@ -572,6 +641,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -619,6 +691,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
 3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
@@ -661,6 +736,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
+0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
 6
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
@@ -733,6 +811,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -759,6 +840,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -779,6 +863,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -806,6 +893,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -850,14 +940,23 @@ commit;
 *** slave - check conflict info, there should be some
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check update some data that causes conflicts
@@ -886,14 +985,23 @@ commit;
 *** slave - check conflict info, change depends on calling test
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 VARIABLE_VALUE-@init_ndb_conflict_fn_max
-3
+0
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+3
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	1	#	#
+2	1	#	2	#	#
+2	1	#	3	#	#
 SELECT * FROM `t1$EX` ORDER BY a, d;
 server_id	master_server_id	master_epoch	count	a	d
+2	1	#	#	1	111
+2	1	#	#	2	111222
+2	1	#	#	3	111222333
 SELECT * FROM `t2$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 SELECT * FROM `t2$EX` ORDER BY a, d;
 *** slave - check higer timestamp

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_old.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_old.result	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_old.result	2011-06-16 14:34:56 +0000
@@ -41,6 +41,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -67,6 +70,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -87,6 +93,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -114,6 +123,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -162,6 +174,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 3
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -204,6 +219,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 5
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -275,6 +293,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -301,6 +322,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -321,6 +345,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -348,6 +375,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -396,6 +426,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 3
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -438,6 +471,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 5
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -509,6 +545,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 3
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -541,6 +580,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 4
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -569,6 +611,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 4
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -604,6 +649,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -652,6 +700,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 3
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	5	#	#
@@ -694,6 +745,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 6
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	5	#	#
@@ -765,6 +819,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -791,6 +848,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -811,6 +871,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -838,6 +901,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 0
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 SELECT * FROM `t1$EX` ORDER BY a, d;
@@ -886,6 +952,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 4
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#
@@ -931,6 +1000,9 @@ VARIABLE_VALUE-@init_ndb_conflict_fn_max
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
 VARIABLE_VALUE-@init_ndb_conflict_fn_old
 6
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
+VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win
+0
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;
 server_id	master_server_id	master_epoch	count	a	d
 2	1	#	1	#	#

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result	2011-06-16 14:34:56 +0000
@@ -6,9 +6,11 @@ Error	1626	Bad schema for mysql.ndb_repl
 drop table t1;
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$X(X)");
 create table t1 (a int key, X int) engine ndb;
-Warnings:
+ERROR HY000: Can't create table 'test.t1' (errno: 1627)
+show warnings;
+Level	Code	Message
 Error	1627	Error in parsing conflict function. Message: NDB$X(X), unknown conflict resolution function at 'NDB$X(X)'
-drop table t1;
+Error	1005	Can't create table 'test.t1' (errno: 1627)
 delete from mysql.ndb_replication;
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)");
 create table t1 (a int key, X int) engine ndb;
@@ -18,13 +20,17 @@ drop table t1;
 delete from mysql.ndb_replication;
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()");
 create table t1 (a int key, X int) engine ndb;
-Warnings:
+ERROR HY000: Can't create table 'test.t1' (errno: 1627)
+show warnings;
+Level	Code	Message
 Error	1627	Error in parsing conflict function. Message: NDB$MAX(), missing function argument at ')'
-drop table t1;
+Error	1005	Can't create table 'test.t1' (errno: 1627)
 delete from mysql.ndb_replication;
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X Y)");
 create table t1 (a int key, X int) engine ndb;
-Warnings:
+ERROR HY000: Can't create table 'test.t1' (errno: 1627)
+show warnings;
+Level	Code	Message
 Error	1627	Error in parsing conflict function. Message: NDB$MAX(X Y), missing ')' at 'Y)'
-drop table t1;
+Error	1005	Can't create table 'test.t1' (errno: 1627)
 delete from mysql.ndb_replication;

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_conflict_info.inc'
--- a/mysql-test/suite/ndb_rpl/t/ndb_conflict_info.inc	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_conflict_info.inc	2011-06-16 14:34:56 +0000
@@ -1,5 +1,6 @@
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_old FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_max_del_win FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
 --replace_column 3 # 5 # 6 #
 --error 0,1146
 SELECT server_id, master_server_id, master_epoch, count, a, d FROM `t1$EX` ORDER BY server_id, master_server_id, master_epoch, count;

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_conflict_info_init.inc'
--- a/mysql-test/suite/ndb_rpl/t/ndb_conflict_info_init.inc	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_conflict_info_init.inc	2011-06-16 14:34:56 +0000
@@ -2,6 +2,7 @@
 --disable_result_log
 SELECT @init_ndb_conflict_fn_max:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX";
 SELECT @init_ndb_conflict_fn_old:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_OLD";
+SELECT @init_ndb_conflict_fn_max_del_win:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_MAX_DEL_WIN";
 --error 0,1146
 DELETE FROM `t1$EX`;
 --enable_query_log

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test	2011-05-13 07:40:50 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test	2011-06-16 14:34:56 +0000
@@ -47,10 +47,11 @@ CREATE TABLE mysql.ndb_replication
 --enable_query_log
 
 # Non existant conflict_fn
-# gives warning when creating table
+# gives error when creating table
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$X(X)");
+--error 1005
 create table t1 (a int key, X int) engine ndb;
-drop table t1;
+show warnings;
 delete from mysql.ndb_replication;
 
 # Column type cannot be used for this function
@@ -61,17 +62,19 @@ drop table t1;
 delete from mysql.ndb_replication;
 
 # Too few arguments
-# gives warning when creating table
+# gives error when creating table
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()");
+--error 1005
 create table t1 (a int key, X int) engine ndb;
-drop table t1;
+show warnings;
 delete from mysql.ndb_replication;
 
 # Too many arguments
-# gives warning when creating table
+# gives error when creating table
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X Y)");
+--error 1005
 create table t1 (a int key, X int) engine ndb;
-drop table t1;
+show warnings;
 delete from mysql.ndb_replication;
 
 --disable_query_log

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2011-06-13 06:14:56 +0000
+++ b/sql/ha_ndbcluster.cc	2011-06-16 14:39:44 +0000
@@ -337,6 +337,11 @@ ndbcluster_alter_table_flags(uint flags)
 
 #define NDB_AUTO_INCREMENT_RETRIES 100
 #define BATCH_FLUSH_SIZE (32768)
+/*
+  Room for 10 instruction words, two labels (@ 2words/label)
+  + 2 extra words for the case of resolve_size == 8
+*/
+#define MAX_CONFLICT_INTERPRETED_PROG_SIZE 16
 
 static void set_ndb_err(THD *thd, const NdbError &err);
 
@@ -424,8 +429,6 @@ struct st_ndb_status {
 };
 
 static struct st_ndb_status g_ndb_status;
-static long g_ndb_status_conflict_fn_max= 0;
-static long g_ndb_status_conflict_fn_old= 0;
 
 long long g_event_data_count = 0;
 long long g_event_nondata_count = 0;
@@ -442,6 +445,37 @@ update_slave_api_stats(Ndb* ndb)
     g_slave_api_client_stats[i] = ndb->getClientStat(i);
 }
 
+st_ndb_slave_state g_ndb_slave_state;
+
+st_ndb_slave_state::st_ndb_slave_state()
+  : current_conflict_defined_op_count(0)
+{
+  memset(current_violation_count, 0, sizeof(current_violation_count));
+  memset(total_violation_count, 0, sizeof(total_violation_count));
+};
+
+void
+st_ndb_slave_state::atTransactionAbort()
+{
+  /* Reset current-transaction counters + state */
+  memset(current_violation_count, 0, sizeof(current_violation_count));
+  current_conflict_defined_op_count = 0;
+}
+
+void
+st_ndb_slave_state::atTransactionCommit()
+{
+  /* Merge committed transaction counters into total state
+   * Then reset current transaction counters
+   */
+  for (int i=0; i < CFT_NUMBER_OF_CFTS; i++)
+  {
+    total_violation_count[i]+= current_violation_count[i];
+    current_violation_count[i] = 0;
+  }
+  current_conflict_defined_op_count = 0;
+}
+
 static int update_status_variables(Thd_ndb *thd_ndb,
                                    st_ndb_status *ns,
                                    Ndb_cluster_connection *c)
@@ -556,8 +590,9 @@ SHOW_VAR ndb_status_variables_dynamic[]=
 };
 
 SHOW_VAR ndb_status_conflict_variables[]= {
-  {"fn_max",     (char*) &g_ndb_status_conflict_fn_max, SHOW_LONG},
-  {"fn_old",     (char*) &g_ndb_status_conflict_fn_old, SHOW_LONG},
+  {"fn_max",       (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_MAX], SHOW_LONG},
+  {"fn_old",       (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_OLD], SHOW_LONG},
+  {"fn_max_del_win", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_MAX_DEL_WIN], SHOW_LONG},
   {NullS, NullS, SHOW_LONG}
 };
 
@@ -717,6 +752,24 @@ static int write_conflict_row(NDB_SHARE
 }
 #endif
 
+#ifdef HAVE_NDB_BINLOG
+int
+handle_conflict_op_error(Thd_ndb* thd_ndb,
+                         NdbTransaction* trans,
+                         const NdbError& err,
+                         const NdbOperation* op);
+
+int
+handle_row_conflict(NDB_CONFLICT_FN_SHARE* cfn_share,
+                    const NdbRecord* key_rec,
+                    const uchar* pk_row,
+                    enum_conflicting_op_type op_type,
+                    enum_conflict_cause conflict_cause,
+                    const NdbError& conflict_error,
+                    NdbTransaction* conflict_trans,
+                    NdbError& err);
+#endif
+
 inline int
 check_completed_operations_pre_commit(Thd_ndb *thd_ndb, NdbTransaction *trans,
                                       const NdbOperation *first,
@@ -737,102 +790,42 @@ check_completed_operations_pre_commit(Th
     or exceptions to report
   */
 #ifdef HAVE_NDB_BINLOG
-  uint conflict_rows_written= 0;
+  const NdbOperation* lastUserOp = trans->getLastDefinedOperation();
 #endif
   while (true)
   {
     const NdbError &err= first->getNdbError();
-    if (err.classification != NdbError::NoError
-#ifndef HAVE_NDB_BINLOG
-        && err.classification != NdbError::ConstraintViolation
-        && err.classification != NdbError::NoDataFound
-#endif
-        )
+    const bool op_has_conflict_detection = (first->getCustomData() != NULL);
+    if (!op_has_conflict_detection)
     {
-#ifdef HAVE_NDB_BINLOG
-      DBUG_PRINT("info", ("ndb error: %d", err.code));
-      if (err.code == (int) error_conflict_fn_max_violation)
-      {
-        DBUG_PRINT("info", ("err.code == (int) error_conflict_fn_max_violation"));
-        thd_ndb->m_max_violation_count++;
-      }
-      else if (err.code == (int) error_conflict_fn_old_violation ||
-               err.classification == NdbError::ConstraintViolation ||
-               err.classification == NdbError::NoDataFound)
-      {
-        DBUG_PRINT("info",
-                   ("err.code %s (int) error_conflict_fn_old_violation, "
-                    "err.classification %s",
-                    err.code == (int) error_conflict_fn_old_violation ? "==" : "!=",
-                    err.classification
-                    == NdbError::ConstraintViolation
-                    ? "== NdbError::ConstraintViolation"
-                    : (err.classification == NdbError::NoDataFound
-                       ? "== NdbError::NoDataFound" : "!=")));
-        thd_ndb->m_old_violation_count++;
-        const void* buffer= first->getCustomData();
-        if (buffer != NULL)
-        {
-          Ndb_exceptions_data ex_data;
-          memcpy(&ex_data, buffer, sizeof(ex_data));
-          NDB_SHARE *share= ex_data.share;
-          const uchar* row= ex_data.row;
-          DBUG_ASSERT(share != NULL && row != NULL);
-
-          NDB_CONFLICT_FN_SHARE* cfn_share= share->m_cfn_share;
-          if (cfn_share && cfn_share->m_ex_tab)
-          {
-            NdbError ex_err;
-            if (write_conflict_row(share, trans, row, ex_err))
-            {
-              char msg[FN_REFLEN];
-              my_snprintf(msg, sizeof(msg), "table %s NDB error %d '%s'",
-                          cfn_share->m_ex_tab->getName(),
-                          ex_err.code, ex_err.message);
-
-              NdbDictionary::Dictionary* dict= thd_ndb->ndb->getDictionary();
-
-              if (ex_err.classification == NdbError::SchemaError)
-              {
-                dict->removeTableGlobal(*(cfn_share->m_ex_tab), false);
-                cfn_share->m_ex_tab= NULL;
-              }
-              else if (ex_err.status == NdbError::TemporaryError)
-              {
-                /* Slave will roll back and retry entire transaction. */
-                ERR_RETURN(ex_err);
-              }
-              else
-              {
-                push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                                    ER_EXCEPTIONS_WRITE_ERROR,
-                                    ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
-                /* Slave will stop replication. */
-                DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
-              }
-            }
-            else
-            {
-              conflict_rows_written++;
-            }
-          }
-          else
-          {
-            DBUG_PRINT("info", ("missing %s", !cfn_share ? "cfn_share" : "ex_tab"));
-          }
-        }
-        else
-        {
-          DBUG_PRINT("info", ("missing custom data"));
-        }
-      }
-      else
-#endif
+      /* 'Normal path' - ignore key (not) present, others are errors */
+      if (err.classification != NdbError::NoError &&
+          err.classification != NdbError::ConstraintViolation &&
+          err.classification != NdbError::NoDataFound)
       {
+        /* Non ignored error, report it */
         DBUG_PRINT("info", ("err.code == %u", err.code));
         DBUG_RETURN(err.code);
       }
     }
+#ifdef HAVE_NDB_BINLOG
+    else
+    {
+      /*
+         Op with conflict detection, use special error handling method
+       */
+
+      if (err.classification != NdbError::NoError)
+      {
+        int res = handle_conflict_op_error(thd_ndb,
+                                           trans,
+                                           err,
+                                           first);
+        if (res != 0)
+          DBUG_RETURN(res);
+      }
+    } // if (!op_has_conflict_detection)
+#endif
     if (err.classification != NdbError::NoError)
       ignores++;
 
@@ -844,7 +837,11 @@ check_completed_operations_pre_commit(Th
   if (ignore_count)
     *ignore_count= ignores;
 #ifdef HAVE_NDB_BINLOG
-  if (conflict_rows_written)
+  /*
+     Conflict detection related error handling above may have defined
+     new operations on the transaction.  If so, execute them now
+  */
+  if (trans->getLastDefinedOperation() != lastUserOp)
   {
     if (trans->execute(NdbTransaction::NoCommit,
                        NdbOperation::AO_IgnoreError,
@@ -948,10 +945,10 @@ int execute_no_commit(Thd_ndb *thd_ndb,
                                                     ignore_count));
 }
 
-int execute_commit(Thd_ndb *thd_ndb, NdbTransaction *trans,
+int execute_commit(THD* thd, Thd_ndb *thd_ndb, NdbTransaction *trans,
                    int force_send, int ignore_error, uint *ignore_count= 0);
 inline
-int execute_commit(Thd_ndb *thd_ndb, NdbTransaction *trans,
+int execute_commit(THD* thd, Thd_ndb *thd_ndb, NdbTransaction *trans,
                    int force_send, int ignore_error, uint *ignore_count)
 {
   DBUG_ENTER("execute_commit");
@@ -968,19 +965,19 @@ int execute_commit(Thd_ndb *thd_ndb, Ndb
   const NdbOperation *first= trans->getFirstDefinedOperation();
   const NdbOperation *last= trans->getLastDefinedOperation();
   thd_ndb->m_execute_count++;
-  thd_ndb->m_conflict_fn_usage_count= 0;
   thd_ndb->m_unsent_bytes= 0;
   DBUG_PRINT("info", ("execute_count: %u", thd_ndb->m_execute_count));
   if (trans->execute(NdbTransaction::Commit, ao, force_send))
   {
-    thd_ndb->m_max_violation_count= 0;
-    thd_ndb->m_old_violation_count= 0;
+    if (thd->slave_thread)
+      g_ndb_slave_state.atTransactionAbort();
     DBUG_RETURN(-1);
   }
-  g_ndb_status_conflict_fn_max+= thd_ndb->m_max_violation_count;
-  g_ndb_status_conflict_fn_old+= thd_ndb->m_old_violation_count;
-  thd_ndb->m_max_violation_count= 0;
-  thd_ndb->m_old_violation_count= 0;
+  /* Success of some sort */
+  if (thd->slave_thread)
+  {
+    g_ndb_slave_state.atTransactionCommit();
+  }
   if (!ignore_error || trans->getNdbError().code == 0)
     DBUG_RETURN(trans->getNdbError().code);
   DBUG_RETURN(check_completed_operations(thd_ndb, trans, first, last,
@@ -1036,9 +1033,6 @@ Thd_ndb::Thd_ndb()
   m_execute_count= 0;
   m_scan_count= 0;
   m_pruned_scan_count= 0;
-  m_max_violation_count= 0;
-  m_old_violation_count= 0;
-  m_conflict_fn_usage_count= 0;
   bzero(m_transaction_no_hint_count, sizeof(m_transaction_no_hint_count));
   bzero(m_transaction_hint_count, sizeof(m_transaction_hint_count));
   global_schema_lock_trans= NULL;
@@ -2999,9 +2993,8 @@ int ha_ndbcluster::ndb_pk_update_row(THD
     DBUG_PRINT("info", ("insert failed"));
     if (trans->commitStatus() == NdbConnection::Started)
     {
-      m_thd_ndb->m_max_violation_count= 0;
-      m_thd_ndb->m_old_violation_count= 0;
-      m_thd_ndb->m_conflict_fn_usage_count= 0;
+      if (thd->slave_thread)
+        g_ndb_slave_state.atTransactionAbort();
       m_thd_ndb->m_unsent_bytes= 0;
       m_thd_ndb->m_execute_count++;
       DBUG_PRINT("info", ("execute_count: %u", m_thd_ndb->m_execute_count));
@@ -3934,6 +3927,239 @@ thd_allow_batch(const THD* thd)
 #endif
 }
 
+#ifdef HAVE_NDB_BINLOG
+/**
+   prepare_conflict_detection
+
+   This method is called during operation definition by the slave,
+   when writing to a table with conflict detection defined.
+
+   It is responsible for defining and adding any operation filtering
+   required, and for saving any operation definition state required
+   for post-execute analysis
+*/
+int
+ha_ndbcluster::prepare_conflict_detection(enum_conflicting_op_type op_type,
+                                          const NdbRecord* key_rec,
+                                          const uchar* old_data,
+                                          const uchar* new_data,
+                                          NdbInterpretedCode* code,
+                                          NdbOperation::OperationOptions* options)
+{
+  DBUG_ENTER("prepare_conflict_detection");
+
+  int res = 0;
+  const st_conflict_fn_def* conflict_fn = m_share->m_cfn_share->m_conflict_fn;
+  assert( conflict_fn != NULL );
+
+
+  /*
+     Prepare interpreted code for operation (update + delete only) according
+     to algorithm used
+  */
+  if (op_type != WRITE_ROW)
+  {
+    res = conflict_fn->prep_func(m_share->m_cfn_share,
+                                 op_type,
+                                 old_data,
+                                 new_data,
+                                 table->write_set,
+                                 code);
+
+    if (!res)
+    {
+      /* Attach conflict detecting filter program to operation */
+      options->optionsPresent|=NdbOperation::OperationOptions::OO_INTERPRETED;
+      options->interpretedCode= code;
+    }
+  } // if (op_type != WRITE_ROW)
+
+  g_ndb_slave_state.current_conflict_defined_op_count++;
+
+  /* Now save data for potential insert to exceptions table... */
+  const uchar* row_to_save = (op_type == DELETE_ROW)? old_data : new_data;
+  Ndb_exceptions_data ex_data;
+  ex_data.share= m_share;
+  ex_data.key_rec= key_rec;
+  ex_data.op_type= op_type;
+  /*
+    We need to save the row data for possible conflict resolution after
+    execute().
+  */
+  ex_data.row= copy_row_to_buffer(m_thd_ndb, row_to_save);
+  uchar* ex_data_buffer= get_buffer(m_thd_ndb, sizeof(ex_data));
+  if (ex_data.row == NULL || ex_data_buffer == NULL)
+  {
+    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+  }
+  memcpy(ex_data_buffer, &ex_data, sizeof(ex_data));
+
+  /* Store ptr to exceptions data in operation 'customdata' ptr */
+  options->optionsPresent|= NdbOperation::OperationOptions::OO_CUSTOMDATA;
+  options->customData= (void*)ex_data_buffer;
+
+  DBUG_RETURN(0);
+}
+
+/**
+   handle_conflict_op_error
+
+   This method is called when an error is detected after executing an
+   operation with conflict detection active.
+
+   If the operation error is related to conflict detection, handling
+   starts.
+
+   Handling involves incrementing the relevant counter, and optionally
+   refreshing the row and inserting an entry into the exceptions table
+*/
+
+int
+handle_conflict_op_error(Thd_ndb* thd_ndb,
+                         NdbTransaction* trans,
+                         const NdbError& err,
+                         const NdbOperation* op)
+{
+  DBUG_ENTER("handle_conflict_op_error");
+  DBUG_PRINT("info", ("ndb error: %d", err.code));
+
+  if ((err.code == (int) error_conflict_fn_violation) ||
+      (err.classification == NdbError::ConstraintViolation) ||
+      (err.classification == NdbError::NoDataFound))
+  {
+    DBUG_PRINT("info",
+               ("err.code %s (int) error_conflict_fn_violation, "
+                "err.classification %s",
+                err.code == (int) error_conflict_fn_violation ? "==" : "!=",
+                err.classification
+                == NdbError::ConstraintViolation
+                ? "== NdbError::ConstraintViolation"
+                : (err.classification == NdbError::NoDataFound
+                   ? "== NdbError::NoDataFound" : "!=")));
+
+    enum_conflict_cause conflict_cause;
+
+    if (err.code == (int) error_conflict_fn_violation)
+    {
+      conflict_cause= ROW_IN_CONFLICT;
+    }
+    else if (err.classification == NdbError::ConstraintViolation)
+    {
+      conflict_cause= ROW_ALREADY_EXISTS;
+    }
+    else
+    {
+      assert(err.classification == NdbError::NoDataFound);
+      conflict_cause= ROW_DOES_NOT_EXIST;
+    }
+
+    const void* buffer=op->getCustomData();
+    assert(buffer);
+    Ndb_exceptions_data ex_data;
+    memcpy(&ex_data, buffer, sizeof(ex_data));
+    NDB_SHARE *share= ex_data.share;
+    const NdbRecord* key_rec= ex_data.key_rec;
+    const uchar* row= ex_data.row;
+    enum_conflicting_op_type op_type = ex_data.op_type;
+    DBUG_ASSERT(share != NULL && row != NULL);
+
+    NDB_CONFLICT_FN_SHARE* cfn_share= share->m_cfn_share;
+    if (cfn_share)
+    {
+      enum_conflict_fn_type cft = cfn_share->m_conflict_fn->type;
+      bool haveExTable = cfn_share->m_ex_tab != NULL;
+
+      g_ndb_slave_state.current_violation_count[cft]++;
+
+      {
+        NdbError handle_error;
+        if (handle_row_conflict(cfn_share,
+                                key_rec,
+                                row,
+                                op_type,
+                                conflict_cause,
+                                err,
+                                trans,
+                                handle_error))
+        {
+          /* Error with handling of row conflict */
+          char msg[FN_REFLEN];
+          my_snprintf(msg, sizeof(msg), "Row conflict handling "
+                      "on table %s hit Ndb error %d '%s'",
+                      share->table_name,
+                      handle_error.code,
+                      handle_error.message);
+
+          if (handle_error.status == NdbError::TemporaryError)
+          {
+            /* Slave will roll back and retry entire transaction. */
+            ERR_RETURN(handle_error);
+          }
+          else
+          {
+            push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                                ER_EXCEPTIONS_WRITE_ERROR,
+                                ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+            /* Slave will stop replication. */
+            DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
+          }
+        }
+      }
+
+
+      if (haveExTable)
+      {
+        NdbError ex_err;
+        if (write_conflict_row(share, trans, row, ex_err))
+        {
+          char msg[FN_REFLEN];
+          my_snprintf(msg, sizeof(msg), "table %s NDB error %d '%s'",
+                      cfn_share->m_ex_tab->getName(),
+                      ex_err.code, ex_err.message);
+
+          NdbDictionary::Dictionary* dict= thd_ndb->ndb->getDictionary();
+
+          if (ex_err.classification == NdbError::SchemaError)
+          {
+            dict->removeTableGlobal(*(cfn_share->m_ex_tab), false);
+            cfn_share->m_ex_tab= NULL;
+          }
+          else if (ex_err.status == NdbError::TemporaryError)
+          {
+            /* Slave will roll back and retry entire transaction. */
+            ERR_RETURN(ex_err);
+          }
+          else
+          {
+            push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                                ER_EXCEPTIONS_WRITE_ERROR,
+                                ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+            /* Slave will stop replication. */
+            DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
+          }
+        }
+      } // if (haveExTable)
+
+      DBUG_RETURN(0);
+    }
+    else
+    {
+      DBUG_PRINT("info", ("missing cfn_share"));
+      DBUG_RETURN(0); // TODO : Correct?
+    }
+  }
+  else
+  {
+    /* Non conflict related error */
+    DBUG_PRINT("info", ("err.code == %u", err.code));
+    DBUG_RETURN(err.code);
+  }
+
+  DBUG_RETURN(0); // Reachable?
+}
+#endif /* HAVE_NDB_BINLOG */
+
+
 int ha_ndbcluster::write_row(uchar *record)
 {
   DBUG_ENTER("ha_ndbcluster::write_row");
@@ -4118,14 +4344,16 @@ int ha_ndbcluster::ndb_write_row(uchar *
   MY_BITMAP tmpBitmap;
   MY_BITMAP *user_cols_written_bitmap;
 #ifdef HAVE_NDB_BINLOG
-  uchar* ex_data_buffer= NULL;
+  bool haveConflictFunction =
+    (thd->slave_thread &&
+     m_share->m_cfn_share &&
+     m_share->m_cfn_share->m_conflict_fn);
 #endif
   
   if (m_use_write
 #ifdef HAVE_NDB_BINLOG
-      && !(thd->slave_thread &&
-           m_share->m_cfn_share &&
-           m_share->m_cfn_share->m_resolve_cft != CFT_NDB_UNDEF)
+      /* Conflict detection must use normal Insert */
+      && !haveConflictFunction
 #endif
       )
   {
@@ -4157,52 +4385,7 @@ int ha_ndbcluster::ndb_write_row(uchar *
       user_cols_written_bitmap= NULL;
       mask= NULL;
     }
-
-#if 0 /* NOT YET, interpeted function not supported for write */
-#ifdef HAVE_NDB_BINLOG
-    /*
-      Room for 10 instruction words, two labels (@ 2words/label)
-      + 2 extra words for the case of resolve_size == 8
-    */
-    Uint32 buffer[16];
-    NdbInterpretedCode code(m_table, buffer,
-                            sizeof(buffer)/sizeof(buffer[0]));
-    if (useWriteSet)
-    {
-      /* Conflict resolution in slave thread. */
-      enum_conflict_fn_type cft= m_share->m_cfn_share ? m_share->m_cfn_share->m_resolve_cft : CFT_NDB_UNDEF;
-      if (cft != CFT_NDB_UNDEF)
-      {
-        if (!write_row_conflict_fn(cft, record, &code))
-        {
-          options.optionsPresent|=NdbOperation::OperationOptions::OO_INTERPRETED;
-          options.interpretedCode= &code;
-          thd_ndb->m_conflict_fn_usage_count++;
-        }
-        
-        Ndb_exceptions_data ex_data;
-        ex_data.share= m_share;
-        /*
-          We need to save the row data for possible conflict resolution after
-          execute().
-        */
-        ex_data.row= copy_row_to_buffer(thd_ndb, record);
-        ex_data_buffer= get_buffer(thd_ndb, sizeof(ex_data));
-        if (ex_data.row == NULL || ex_data_buffer == NULL)
-        {
-          DBUG_RETURN(HA_ERR_OUT_OF_MEM);
-        }
-        memcpy(ex_data_buffer, &ex_data, sizeof(ex_data));
-
-        options.optionsPresent|= NdbOperation::OperationOptions::OO_CUSTOMDATA;
-        options.customData= (void*)ex_data_buffer;
-        if (options.optionsPresent != 0)
-          poptions=&options;
-      }
-    }
-#endif  /* HAVE_NDB_BINLOG */
-#endif  /* NOT YET */
-
+    /* TODO : Add conflict detection etc when interpreted write supported */
     op= trans->writeTuple(key_rec, (const char *)key_row, m_ndb_record,
                           (char *)record, mask,
                           poptions, sizeof(NdbOperation::OperationOptions));
@@ -4210,27 +4393,16 @@ int ha_ndbcluster::ndb_write_row(uchar *
   else
   {
 #ifdef HAVE_NDB_BINLOG
-    if (m_use_write)
+    if (haveConflictFunction)
     {
-      thd_ndb->m_conflict_fn_usage_count++;
-
-      Ndb_exceptions_data ex_data;
-      ex_data.share= m_share;
-      /*
-        We need to save the row data for possible conflict resolution after
-        execute().
-      */
-      ex_data.row= copy_row_to_buffer(thd_ndb, record);
-      ex_data_buffer= get_buffer(thd_ndb, sizeof(ex_data));
-      if (ex_data.row == NULL || ex_data_buffer == NULL)
-      {
-        DBUG_RETURN(HA_ERR_OUT_OF_MEM);
-      }
-      memcpy(ex_data_buffer, &ex_data, sizeof(ex_data));
-
-      options.optionsPresent|= NdbOperation::OperationOptions::OO_CUSTOMDATA;
-      options.customData= (void*)ex_data_buffer;
-      poptions=&options;
+      /* Conflict detection in slave thread */
+      if (unlikely((error = prepare_conflict_detection(WRITE_ROW,
+                                                       key_rec,
+                                                       NULL,    /* old_data */
+                                                       record,  /* new_data */
+                                                       NULL,    /* code */
+                                                       &options))))
+        DBUG_RETURN(error);
     }
 #endif
     uchar *mask;
@@ -4380,248 +4552,20 @@ int ha_ndbcluster::primary_key_cmp(const
 
 #ifdef HAVE_NDB_BINLOG
 
-/**
-  CFT_NDB_NEW
-
-  To perform conflict resolution, an interpreted program is used to read
-  the timestamp stored locally and compare to what is going to be applied.
-  If timestamp is lower, an error for this operation (9999) will be raised,
-  and new row will not be applied. The error codes for the operations will
-  be checked on return.  For this to work is is vital that the operation
-  is run with ignore error option.
-*/
-
-int
-ha_ndbcluster::row_conflict_fn_max(const uchar *new_data,
-                                   NdbInterpretedCode *code)
-{
-  uint32 resolve_column= m_share->m_cfn_share->m_resolve_column;
-  uint32 resolve_size= m_share->m_cfn_share->m_resolve_size;
-
-  DBUG_PRINT("info", ("interpreted update pre equal on column %u",
-                      resolve_column));
-
-  DBUG_ASSERT(resolve_size == 4 || resolve_size == 8);
-
-  if (!bitmap_is_set(table->write_set, resolve_column))
-  {
-    sql_print_information("NDB Slave: missing data for NDB_MAX");
-    return 1;
-  }
-
-  const uint label_0= 0;
-  const Uint32 RegNewValue= 1, RegCurrentValue= 2;
-  int r;
-  Field *field= table->field[resolve_column];
-  DBUG_PRINT("info", ("interpreted update post equal"));
-  /*
-   * read new value from record
-   */
-  union {
-    uint32 new_value_32;
-    uint64 new_value_64;
-  };
-  {
-    const uchar *field_ptr= field->ptr + (new_data - table->record[0]);
-    if (resolve_size == 4)
-    {
-      memcpy(&new_value_32, field_ptr, resolve_size);
-      DBUG_PRINT("info", ("new_value_32: %u", new_value_32));
-    }
-    else
-    {
-      memcpy(&new_value_64, field_ptr, resolve_size);
-      DBUG_PRINT("info", ("new_value_64: %llu",
-                          (unsigned long long) new_value_64));
-    }
-  }
-  /*
-   * Load registers RegNewValue and RegCurrentValue
-   */
-  if (resolve_size == 4)
-    r= code->load_const_u32(RegNewValue, new_value_32);
-  else
-    r= code->load_const_u64(RegNewValue, new_value_64);
-  DBUG_ASSERT(r == 0);
-  r= code->read_attr(RegCurrentValue, resolve_column);
-  DBUG_ASSERT(r == 0);
-  /*
-   * if RegNewValue > RegCurrentValue goto label_0
-   * else raise error for this row
-   */
-  r= code->branch_gt(RegNewValue, RegCurrentValue, label_0);
-  DBUG_ASSERT(r == 0);
-  r= code->interpret_exit_nok(error_conflict_fn_max_violation);
-  DBUG_ASSERT(r == 0);
-  r= code->def_label(label_0);
-  DBUG_ASSERT(r == 0);
-  r= code->interpret_exit_ok();
-  DBUG_ASSERT(r == 0);
-  r= code->finalise();
-  DBUG_ASSERT(r == 0);
-  return r;
-}
-
-/**
-  CFT_NDB_OLD
-
-  To perform conflict detection, an interpreted program is used to read
-  the timestamp stored locally and compare to what was on the master.
-  If timestamp is not equal, an error for this operation (9998) will be raised,
-  and new row will not be applied. The error codes for the operations will
-  be checked on return.  For this to work is is vital that the operation
-  is run with ignore error option.
-
-  As an independent feature, phase 2 also saves the
-  conflicts into the table's exceptions table.
-*/
-
 int
-ha_ndbcluster::row_conflict_fn_old(const uchar *old_data,
-                                   NdbInterpretedCode *code)
+handle_row_conflict(NDB_CONFLICT_FN_SHARE* cfn_share,
+                    const NdbRecord* key_rec,
+                    const uchar* pk_row,
+                    enum_conflicting_op_type op_type,
+                    enum_conflict_cause conflict_cause,
+                    const NdbError& conflict_error,
+                    NdbTransaction* conflict_trans,
+                    NdbError& err)
 {
-  uint32 resolve_column= m_share->m_cfn_share->m_resolve_column;
-  uint32 resolve_size= m_share->m_cfn_share->m_resolve_size;
-
-  DBUG_PRINT("info", ("interpreted update pre equal on column %u",
-                      resolve_column));
-
-  DBUG_ASSERT(resolve_size == 4 || resolve_size == 8);
-
-  if (!bitmap_is_set(table->write_set, resolve_column))
-  {
-    sql_print_information("NDB Slave: missing data for NDB_OLD");
-    return -1;
-  }
+  DBUG_ENTER("handle_row_conflict");
 
-  const uint label_0= 0;
-  const Uint32 RegOldValue= 1, RegCurrentValue= 2;
-  int r;
-  Field *field= table->field[resolve_column];
-  DBUG_PRINT("info", ("interpreted update post equal"));
-  /*
-   * read old value from record
-   */
-  union {
-    uint32 old_value_32;
-    uint64 old_value_64;
-  };
-  {
-    const uchar *field_ptr= field->ptr + (old_data - table->record[0]);
-    if (resolve_size == 4)
-    {
-      memcpy(&old_value_32, field_ptr, resolve_size);
-      DBUG_PRINT("info", ("old_value_32: %u", old_value_32));
-    }
-    else
-    {
-      memcpy(&old_value_64, field_ptr, resolve_size);
-      DBUG_PRINT("info", ("old_value_64: %llu",
-                          (unsigned long long) old_value_64));
-    }
-  }
-  /*
-   * Load registers RegOldValue and RegCurrentValue
-   */
-  if (resolve_size == 4)
-    r= code->load_const_u32(RegOldValue, old_value_32);
-  else
-    r= code->load_const_u64(RegOldValue, old_value_64);
-  DBUG_ASSERT(r == 0);
-  r= code->read_attr(RegCurrentValue, resolve_column);
-  DBUG_ASSERT(r == 0);
-  /*
-   * if RegOldValue == RegCurrentValue goto label_0
-   * else raise error for this row
-   */
-  r= code->branch_eq(RegOldValue, RegCurrentValue, label_0);
-  DBUG_ASSERT(r == 0);
-  r= code->interpret_exit_nok(error_conflict_fn_old_violation);
-  DBUG_ASSERT(r == 0);
-  r= code->def_label(label_0);
-  DBUG_ASSERT(r == 0);
-  r= code->interpret_exit_ok();
-  DBUG_ASSERT(r == 0);
-  r= code->finalise();
-  DBUG_ASSERT(r == 0);
-  return r;
-}
-
-int
-ha_ndbcluster::update_row_conflict_fn(enum_conflict_fn_type cft,
-                                      const uchar *old_data,
-                                      uchar *new_data,
-                                      NdbInterpretedCode *code)
-{
-  switch (cft) {
-  case CFT_NDB_MAX:
-  case CFT_NDB_MAX_DEL_WIN:
-    return row_conflict_fn_max(new_data, code);
-  case CFT_NDB_OLD:
-    return row_conflict_fn_old(old_data, code);
-  case CFT_NDB_UNDEF:
-    abort();
-  }
-  DBUG_ASSERT(false);
-  return 1;
-}
-
-#if 0 /* NOT YET, interpeted function not supported for write */
-int
-ha_ndbcluster::write_row_conflict_fn(enum_conflict_fn_type cft,
-                                     uchar *data,
-                                     NdbInterpretedCode *code)
-{
-  switch (cft) {
-  case CFT_NDB_MAX:
-  case CFT_NDB_MAX_DEL_WIN:
-    return row_conflict_fn_max(data, code);
-  case CFT_NDB_OLD:
-    /*
-      No conflict function here, instead detect if tuple
-      already exists
-     */
-    return 1;
-  case CFT_NDB_UNDEF:
-    abort();
-  }
-  DBUG_ASSERT(false);
-  return 1;
-}
-#endif
-
-int
-ha_ndbcluster::delete_row_conflict_fn(enum_conflict_fn_type cft,
-                                      const uchar *old_data,
-                                      NdbInterpretedCode *code)
-{
-  switch (cft) {
-  case CFT_NDB_MAX:
-    /*
-      As we do not have a timestamp for the actual delete,
-      the best we can do is to detect a possible conflict
-      on the old data.
-    */
-    return row_conflict_fn_old(old_data, code);
-  case CFT_NDB_MAX_DEL_WIN:
-  {
-    /**
-     * let delete always win
-     */
-    int r = code->interpret_exit_ok();
-    DBUG_ASSERT(r == 0);
-    r = code->finalise();
-    DBUG_ASSERT(r == 0);
-    return r;
-  }
-  case CFT_NDB_OLD:
-    return row_conflict_fn_old(old_data, code);
-  case CFT_NDB_UNDEF:
-    abort();
-  }
-  DBUG_ASSERT(false);
-  return 1;
-}
+  DBUG_RETURN(0);
+};
 #endif /* HAVE_NDB_BINLOG */
 
 /**
@@ -4672,7 +4616,7 @@ int ha_ndbcluster::exec_bulk_update(uint
     DBUG_PRINT("info", ("committing auto-commit+rbwr early"));
     uint ignore_count= 0;
     const int ignore_error= 1;
-    if (execute_commit(m_thd_ndb, trans,
+    if (execute_commit(table->in_use, m_thd_ndb, trans,
                        m_thd_ndb->m_force_send, ignore_error,
                        &ignore_count) != 0)
     {
@@ -4951,42 +4895,21 @@ int ha_ndbcluster::ndb_update_row(const
 				 m_read_before_write_removal_used);
 
 #ifdef HAVE_NDB_BINLOG
-    uchar* ex_data_buffer= NULL;
-    /*
-      Room for 10 instruction words, two labels (@ 2words/label)
-      + 2 extra words for the case of resolve_size == 8
-    */
-    Uint32 buffer[16];
+    Uint32 buffer[ MAX_CONFLICT_INTERPRETED_PROG_SIZE ];
     NdbInterpretedCode code(m_table, buffer,
                             sizeof(buffer)/sizeof(buffer[0]));
+
     if (thd->slave_thread && m_share->m_cfn_share &&
-        (m_share->m_cfn_share->m_resolve_cft != CFT_NDB_UNDEF))
+        m_share->m_cfn_share->m_conflict_fn)
     {
-      /* Conflict resolution in slave thread. */
-      enum_conflict_fn_type cft= m_share->m_cfn_share->m_resolve_cft;
-      if (!update_row_conflict_fn(cft, old_data, new_data, &code))
-      {
-        options.optionsPresent|=NdbOperation::OperationOptions::OO_INTERPRETED;
-        options.interpretedCode= &code;
-        thd_ndb->m_conflict_fn_usage_count++;
-      }
-
-      Ndb_exceptions_data ex_data;
-      ex_data.share= m_share;
-      /*
-        We need to save the row data for possible conflict resolution after
-        execute().
-      */
-      ex_data.row= copy_row_to_buffer(thd_ndb, new_data);
-      ex_data_buffer= get_buffer(thd_ndb, sizeof(ex_data));
-      if (ex_data.row == NULL || ex_data_buffer == NULL)
-      {
-        DBUG_RETURN(HA_ERR_OUT_OF_MEM);
-      }
-      memcpy(ex_data_buffer, &ex_data, sizeof(ex_data));
-
-      options.optionsPresent|= NdbOperation::OperationOptions::OO_CUSTOMDATA;
-      options.customData= (void*)ex_data_buffer;
+       /* Conflict resolution in slave thread. */
+      if (unlikely((error = prepare_conflict_detection(UPDATE_ROW,
+                                                       key_rec,
+                                                       old_data,
+                                                       new_data,
+                                                       &code,
+                                                       &options))))
+        DBUG_RETURN(error);
     }
 #endif /* HAVE_NDB_BINLOG */
     if (options.optionsPresent !=0)
@@ -5081,7 +5004,7 @@ int ha_ndbcluster::end_bulk_delete()
     DBUG_PRINT("info", ("committing auto-commit+rbwr early"));
     uint ignore_count= 0;
     const int ignore_error= 1;
-    if (execute_commit(m_thd_ndb, trans,
+    if (execute_commit(table->in_use, m_thd_ndb, trans,
                        m_thd_ndb->m_force_send, ignore_error,
                        &ignore_count) != 0)
     {
@@ -5247,42 +5170,20 @@ int ha_ndbcluster::ndb_delete_row(const
 				 m_read_before_write_removal_used);
 
 #ifdef HAVE_NDB_BINLOG
-    uchar* ex_data_buffer= NULL;
-    /*
-      Room for 10 instruction words, two labels (@ 2words/label)
-      + 2 extra words for the case of resolve_size == 8
-    */
-    Uint32 buffer[16];
+    Uint32 buffer[ MAX_CONFLICT_INTERPRETED_PROG_SIZE ];
     NdbInterpretedCode code(m_table, buffer,
                             sizeof(buffer)/sizeof(buffer[0]));
     if (thd->slave_thread && m_share->m_cfn_share &&
-        (m_share->m_cfn_share->m_resolve_cft != CFT_NDB_UNDEF))
+        m_share->m_cfn_share->m_conflict_fn)
     {
       /* Conflict resolution in slave thread. */
-      enum_conflict_fn_type cft= m_share->m_cfn_share->m_resolve_cft;
-      if (!delete_row_conflict_fn(cft, record, &code))
-      {
-        options.optionsPresent|=NdbOperation::OperationOptions::OO_INTERPRETED;
-        options.interpretedCode= &code;
-        thd_ndb->m_conflict_fn_usage_count++;
-      }
-
-      Ndb_exceptions_data ex_data;
-      ex_data.share= m_share;
-      /*
-        We need to save the row data for possible conflict resolution after
-        execute().
-      */
-      ex_data.row= copy_row_to_buffer(thd_ndb, record);
-      ex_data_buffer= get_buffer(thd_ndb, sizeof(ex_data));
-      if (ex_data.row == NULL || ex_data_buffer == NULL)
-      {
-        DBUG_RETURN(HA_ERR_OUT_OF_MEM);
-      }
-      memcpy(ex_data_buffer, &ex_data, sizeof(ex_data));
-
-      options.optionsPresent|= NdbOperation::OperationOptions::OO_CUSTOMDATA;
-      options.customData= (void*)ex_data_buffer;
+      if (unlikely((error = prepare_conflict_detection(DELETE_ROW,
+                                                       key_rec,
+                                                       key_row, /* old_data */
+                                                       NULL,    /* new_data */
+                                                       &code,
+                                                       &options))))
+        DBUG_RETURN(error);
     }
 #endif /* HAVE_NDB_BINLOG */
     if (options.optionsPresent != 0)
@@ -6425,7 +6326,7 @@ ha_ndbcluster::flush_bulk_insert(bool al
     THD *thd= table->in_use;
     thd->transaction.all.modified_non_trans_table=
       thd->transaction.stmt.modified_non_trans_table= TRUE;
-    if (execute_commit(m_thd_ndb, trans, m_thd_ndb->m_force_send,
+    if (execute_commit(thd, m_thd_ndb, trans, m_thd_ndb->m_force_send,
                        m_ignore_no_key) != 0)
     {
       no_uncommitted_rows_execute_failure();
@@ -7190,9 +7091,10 @@ int ndbcluster_commit(handlerton *hton,
 
   if (thd->slave_thread)
   {
-    if (!thd_ndb->m_conflict_fn_usage_count || !thd_ndb->m_unsent_bytes ||
+    if (!g_ndb_slave_state.current_conflict_defined_op_count ||
+        !thd_ndb->m_unsent_bytes ||
         !(res= execute_no_commit(thd_ndb, trans, TRUE)))
-      res= execute_commit(thd_ndb, trans, 1, TRUE);
+      res= execute_commit(thd, thd_ndb, trans, 1, TRUE);
 
     update_slave_api_stats(thd_ndb->ndb);
   }
@@ -7215,7 +7117,7 @@ int ndbcluster_commit(handlerton *hton,
       }
     }
     else
-      res= execute_commit(thd_ndb, trans, THDVAR(thd, force_send), FALSE);
+      res= execute_commit(thd, thd_ndb, trans, THDVAR(thd, force_send), FALSE);
   }
 
   if (res != 0)
@@ -7289,9 +7191,8 @@ static int ndbcluster_rollback(handlerto
     DBUG_RETURN(0);
   }
   thd_ndb->save_point_count= 0;
-  thd_ndb->m_max_violation_count= 0;
-  thd_ndb->m_old_violation_count= 0;
-  thd_ndb->m_conflict_fn_usage_count= 0;
+  if (thd->slave_thread)
+    g_ndb_slave_state.atTransactionAbort();
   thd_ndb->m_unsent_bytes= 0;
   thd_ndb->m_execute_count++;
   DBUG_PRINT("info", ("execute_count: %u", thd_ndb->m_execute_count));
@@ -8372,6 +8273,34 @@ int ha_ndbcluster::create(const char *na
                         create_info->comment.length);
   const NDB_Modifier * mod_nologging = table_modifiers.get("NOLOGGING");
 
+#ifdef HAVE_NDB_BINLOG
+  /* Read ndb_replication entry for this table, if any */
+  Uint32 binlog_flags;
+  const st_conflict_fn_def* conflict_fn= NULL;
+  st_conflict_fn_arg args[MAX_CONFLICT_ARGS];
+  Uint32 num_args = MAX_CONFLICT_ARGS;
+
+  int rep_read_rc= ndbcluster_get_binlog_replication_info(thd,
+                                                          ndb,
+                                                          m_dbname,
+                                                          m_tabname,
+                                                          ::server_id,
+                                                          form,
+                                                          &binlog_flags,
+                                                          &conflict_fn,
+                                                          args,
+                                                          &num_args);
+  if (rep_read_rc != 0)
+  {
+    DBUG_RETURN(rep_read_rc);
+  }
+
+  /* Reset database name */
+  ndb->setDatabaseName(m_dbname);
+
+  /* Use ndb_replication information as required */
+#endif
+
   if ((dict->beginSchemaTrans() == -1))
   {
     DBUG_PRINT("info", ("Failed to start schema transaction"));
@@ -8800,8 +8729,18 @@ cleanup_failed:
     {
 #ifdef HAVE_NDB_BINLOG
       if (share)
-        ndbcluster_read_binlog_replication(thd, ndb, share, m_table,
-                                           ::server_id, form, TRUE);
+      {
+        /* Set the Binlogging information we retrieved above */
+        ndbcluster_apply_binlog_replication_info(thd,
+                                                 share,
+                                                 m_table,
+                                                 form,
+                                                 conflict_fn,
+                                                 args,
+                                                 num_args,
+                                                 TRUE, /* Do set binlog flags */
+                                                 binlog_flags);
+      }
 #endif
       String event_name(INJECTOR_EVENT_LEN);
       ndb_rep_event_name(&event_name, m_dbname, m_tabname,

=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2011-06-07 12:49:08 +0000
+++ b/sql/ha_ndbcluster.h	2011-06-16 14:39:44 +0000
@@ -126,30 +126,98 @@ typedef enum {
   NSS_ALTERED 
 } NDB_SHARE_STATE;
 
-#ifdef HAVE_NDB_BINLOG
 enum enum_conflict_fn_type
 {
   CFT_NDB_UNDEF = 0
   ,CFT_NDB_MAX
   ,CFT_NDB_OLD
   ,CFT_NDB_MAX_DEL_WIN
+  ,CFT_NUMBER_OF_CFTS /* End marker */
+};
+
+#ifdef HAVE_NDB_BINLOG
+static const Uint32 MAX_CONFLICT_ARGS= 8;
+
+enum enum_conflict_fn_arg_type
+{
+  CFAT_END
+  ,CFAT_COLUMN_NAME
+};
+
+struct st_conflict_fn_arg
+{
+  enum_conflict_fn_arg_type type;
+  const char *ptr;
+  uint32 len;
+  uint32 fieldno; // CFAT_COLUMN_NAME
+};
+
+struct st_conflict_fn_arg_def
+{
+  enum enum_conflict_fn_arg_type arg_type;
+  bool optional;
+};
+
+/* What type of operation was issued */
+enum enum_conflicting_op_type
+{                /* NdbApi          */
+  WRITE_ROW,     /* insert (!write) */
+  UPDATE_ROW,    /* update          */
+  DELETE_ROW     /* delete          */
+};
+
+/*
+  prepare_detect_func
+
+  Type of function used to prepare for conflict detection on
+  an NdbApi operation
+*/
+typedef int (* prepare_detect_func) (struct st_ndbcluster_conflict_fn_share* cfn_share,
+                                     enum_conflicting_op_type op_type,
+                                     const uchar* old_data,
+                                     const uchar* new_data,
+                                     const MY_BITMAP* write_set,
+                                     NdbInterpretedCode* code);
+
+struct st_conflict_fn_def
+{
+  const char *name;
+  enum_conflict_fn_type type;
+  const st_conflict_fn_arg_def* arg_defs;
+  prepare_detect_func prep_func;
+};
+
+/* What sort of conflict was found */
+enum enum_conflict_cause
+{
+  ROW_ALREADY_EXISTS,
+  ROW_DOES_NOT_EXIST,
+  ROW_IN_CONFLICT
 };
 
 /* NdbOperation custom data which points out handler and record. */
 struct Ndb_exceptions_data {
   struct st_ndbcluster_share *share;
+  const NdbRecord* key_rec;
   const uchar* row;
+  enum_conflicting_op_type op_type;
+};
+
+enum enum_conflict_fn_flags
+{
+  CFF_NONE = 0
 };
 
 typedef struct st_ndbcluster_conflict_fn_share {
-  enum_conflict_fn_type m_resolve_cft;
+  const st_conflict_fn_def* m_conflict_fn;
 
   /* info about original table */
   uint8 m_pk_cols;
   uint8 m_resolve_column;
   uint8 m_resolve_size;
-  uint8 unused;
+  uint8 m_flags;
   uint16 m_offset[16];
+  uint16 m_resolve_offset;
 
   const NdbDictionary::Table *m_ex_tab;
   uint32 m_count;
@@ -274,6 +342,26 @@ inline my_bool get_binlog_use_update(NDB
 { return (share->flags & NSF_BINLOG_USE_UPDATE) != 0; }
 
 /*
+  State associated with the Slave thread
+  (From the Ndb handler's point of view)
+*/
+struct st_ndb_slave_state
+{
+  /* Counter values for current slave transaction */
+  Uint32 current_conflict_defined_op_count;
+  Uint32 current_violation_count[CFT_NUMBER_OF_CFTS];
+
+  /* Cumulative counter values */
+  Uint64 total_violation_count[CFT_NUMBER_OF_CFTS];
+
+  /* Methods */
+  void atTransactionCommit();
+  void atTransactionAbort();
+
+  st_ndb_slave_state();
+};
+
+/*
   Place holder for ha_ndbcluster thread specific data
 */
 
@@ -347,9 +435,6 @@ class Thd_ndb
   uint m_batch_size;
 
   uint m_execute_count;
-  uint m_max_violation_count;
-  uint m_old_violation_count;
-  uint m_conflict_fn_usage_count;
 
   uint m_scan_count;
   uint m_pruned_scan_count;
@@ -583,20 +668,12 @@ static void set_tabname(const char *path
 
 private:
 #ifdef HAVE_NDB_BINLOG
-  int delete_row_conflict_fn(enum_conflict_fn_type cft,
-                             const uchar *old_data,
-                             NdbInterpretedCode *);
-  int write_row_conflict_fn(enum_conflict_fn_type cft,
-                            uchar *data,
-                            NdbInterpretedCode *);
-  int update_row_conflict_fn(enum_conflict_fn_type cft,
-                             const uchar *old_data,
-                             uchar *new_data,
-                             NdbInterpretedCode *);
-  int row_conflict_fn_max(const uchar *new_data,
-                          NdbInterpretedCode *);
-  int row_conflict_fn_old(const uchar *old_data,
-                          NdbInterpretedCode *);
+  int prepare_conflict_detection(enum_conflicting_op_type op_type,
+                                 const NdbRecord* key_rec,
+                                 const uchar* old_data,
+                                 const uchar* new_data,
+                                 NdbInterpretedCode* code,
+                                 NdbOperation::OperationOptions* options);
 #endif
   void setup_key_ref_for_ndb_record(const NdbRecord **key_rec,
                                     const uchar **key_row,

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2011-06-16 13:19:31 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2011-06-16 14:34:56 +0000
@@ -3741,18 +3741,13 @@ inline void slave_reset_conflict_fn(NDB_
   }
 }
 
-static int
-slave_set_resolve_fn(THD *thd, NDB_SHARE *share,
-                     const NDBTAB *ndbtab, uint field_index,
-                     enum_conflict_fn_type type, TABLE *table)
+static uint
+slave_check_resolve_col_type(const NDBTAB *ndbtab,
+                             uint field_index)
 {
-  DBUG_ENTER("slave_set_resolve_fn");
-
-  Thd_ndb *thd_ndb= get_thd_ndb(thd);
-  Ndb *ndb= thd_ndb->ndb;
-  NDBDICT *dict= ndb->getDictionary();
+  DBUG_ENTER("slave_check_resolve_col_type");
   const NDBCOL *c= ndbtab->getColumn(field_index);
-  uint sz;
+  uint sz= 0;
   switch (c->getType())
   {
   case  NDBCOL::Unsigned:
@@ -3768,11 +3763,24 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
   default:
     DBUG_PRINT("info", ("resolve column %u has wrong type",
                         field_index));
-    slave_reset_conflict_fn(share);
-    DBUG_RETURN(-1);
     break;
   }
+  DBUG_RETURN(sz);
+}
+
+static int
+slave_set_resolve_fn(THD *thd, NDB_SHARE *share,
+                     const NDBTAB *ndbtab, uint field_index,
+                     uint resolve_col_sz,
+                     const st_conflict_fn_def* conflict_fn,
+                     TABLE *table,
+                     uint8 flags)
+{
+  DBUG_ENTER("slave_set_resolve_fn");
 
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+  Ndb *ndb= thd_ndb->ndb;
+  NDBDICT *dict= ndb->getDictionary();
   NDB_CONFLICT_FN_SHARE *cfn_share= share->m_cfn_share;
   if (cfn_share == NULL)
   {
@@ -3780,9 +3788,14 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
       alloc_root(&share->mem_root, sizeof(NDB_CONFLICT_FN_SHARE));
     slave_reset_conflict_fn(share);
   }
-  cfn_share->m_resolve_size= sz;
+  cfn_share->m_conflict_fn= conflict_fn;
+
+  /* Calculate resolve col stuff (if relevant) */
+  cfn_share->m_resolve_size= resolve_col_sz;
   cfn_share->m_resolve_column= field_index;
-  cfn_share->m_resolve_cft= type;
+  cfn_share->m_resolve_offset= (uint16)(table->field[field_index]->ptr -
+                                        table->record[0]);
+  cfn_share->m_flags = flags;
 
   {
     /* get exceptions table */
@@ -3841,7 +3854,10 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
           cfn_share->m_pk_cols= nkey;
           ndbtab_g.release();
           if (opt_ndb_extra_logging)
-            sql_print_information("NDB Slave: log exceptions to %s",
+            sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s",
+                                  table->s->db.str,
+                                  table->s->table_name.str,
+                                  table->s->db.str,
                                   ex_tab_name);
         }
         else
@@ -3858,76 +3874,307 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
   DBUG_RETURN(0);
 }
 
-enum enum_conflict_fn_arg_type
+/**
+  CFT_NDB_OLD
+
+  To perform conflict detection, an interpreted program is used to read
+  the timestamp stored locally and compare to what was on the master.
+  If timestamp is not equal, an error for this operation (9998) will be raised,
+  and new row will not be applied. The error codes for the operations will
+  be checked on return.  For this to work is is vital that the operation
+  is run with ignore error option.
+
+  As an independent feature, phase 2 also saves the
+  conflicts into the table's exceptions table.
+*/
+int
+row_conflict_fn_old(st_ndbcluster_conflict_fn_share* cfn_share,
+                    enum_conflicting_op_type op_type,
+                    const uchar* old_data,
+                    const uchar* new_data,
+                    const MY_BITMAP* write_set,
+                    NdbInterpretedCode* code)
 {
-  CFAT_END
-  ,CFAT_COLUMN_NAME
-};
-struct st_conflict_fn_arg
+  DBUG_ENTER("row_conflict_fn_old");
+  uint32 resolve_column= cfn_share->m_resolve_column;
+  uint32 resolve_size= cfn_share->m_resolve_size;
+  const uchar* field_ptr = old_data + cfn_share->m_resolve_offset;
+
+  assert((resolve_size == 4) || (resolve_size == 8));
+
+  if (unlikely(!bitmap_is_set(write_set, resolve_column)))
+  {
+    sql_print_information("NDB Slave: missing data for %s",
+                          cfn_share->m_conflict_fn->name);
+    DBUG_RETURN(1);
+  }
+
+  const uint label_0= 0;
+  const Uint32 RegOldValue= 1, RegCurrentValue= 2;
+  int r;
+
+  DBUG_PRINT("info",
+             ("Adding interpreted filter, existing value must eq event old value"));
+  /*
+   * read old value from record
+   */
+  union {
+    uint32 old_value_32;
+    uint64 old_value_64;
+  };
+  {
+    if (resolve_size == 4)
+    {
+      memcpy(&old_value_32, field_ptr, resolve_size);
+      DBUG_PRINT("info", ("  old_value_32: %u", old_value_32));
+    }
+    else
+    {
+      memcpy(&old_value_64, field_ptr, resolve_size);
+      DBUG_PRINT("info", ("  old_value_64: %llu",
+                          (unsigned long long) old_value_64));
+    }
+  }
+
+  /*
+   * Load registers RegOldValue and RegCurrentValue
+   */
+  if (resolve_size == 4)
+    r= code->load_const_u32(RegOldValue, old_value_32);
+  else
+    r= code->load_const_u64(RegOldValue, old_value_64);
+  DBUG_ASSERT(r == 0);
+  r= code->read_attr(RegCurrentValue, resolve_column);
+  DBUG_ASSERT(r == 0);
+  /*
+   * if RegOldValue == RegCurrentValue goto label_0
+   * else raise error for this row
+   */
+  r= code->branch_eq(RegOldValue, RegCurrentValue, label_0);
+  DBUG_ASSERT(r == 0);
+  r= code->interpret_exit_nok(error_conflict_fn_violation);
+  DBUG_ASSERT(r == 0);
+  r= code->def_label(label_0);
+  DBUG_ASSERT(r == 0);
+  r= code->interpret_exit_ok();
+  DBUG_ASSERT(r == 0);
+  r= code->finalise();
+  DBUG_ASSERT(r == 0);
+  DBUG_RETURN(r);
+}
+
+int
+row_conflict_fn_max_update_only(st_ndbcluster_conflict_fn_share* cfn_share,
+                                enum_conflicting_op_type op_type,
+                                const uchar* old_data,
+                                const uchar* new_data,
+                                const MY_BITMAP* write_set,
+                                NdbInterpretedCode* code)
+{
+  DBUG_ENTER("row_conflict_fn_max_update_only");
+  uint32 resolve_column= cfn_share->m_resolve_column;
+  uint32 resolve_size= cfn_share->m_resolve_size;
+  const uchar* field_ptr = new_data + cfn_share->m_resolve_offset;
+
+  assert((resolve_size == 4) || (resolve_size == 8));
+
+  if (unlikely(!bitmap_is_set(write_set, resolve_column)))
+  {
+    sql_print_information("NDB Slave: missing data for %s",
+                          cfn_share->m_conflict_fn->name);
+    DBUG_RETURN(1);
+  }
+
+  const uint label_0= 0;
+  const Uint32 RegNewValue= 1, RegCurrentValue= 2;
+  int r;
+
+  DBUG_PRINT("info",
+             ("Adding interpreted filter, existing value must be lt event new"));
+  /*
+   * read new value from record
+   */
+  union {
+    uint32 new_value_32;
+    uint64 new_value_64;
+  };
+  {
+    if (resolve_size == 4)
+    {
+      memcpy(&new_value_32, field_ptr, resolve_size);
+      DBUG_PRINT("info", ("  new_value_32: %u", new_value_32));
+    }
+    else
+    {
+      memcpy(&new_value_64, field_ptr, resolve_size);
+      DBUG_PRINT("info", ("  new_value_64: %llu",
+                          (unsigned long long) new_value_64));
+    }
+  }
+  /*
+   * Load registers RegNewValue and RegCurrentValue
+   */
+  if (resolve_size == 4)
+    r= code->load_const_u32(RegNewValue, new_value_32);
+  else
+    r= code->load_const_u64(RegNewValue, new_value_64);
+  DBUG_ASSERT(r == 0);
+  r= code->read_attr(RegCurrentValue, resolve_column);
+  DBUG_ASSERT(r == 0);
+  /*
+   * if RegNewValue > RegCurrentValue goto label_0
+   * else raise error for this row
+   */
+  r= code->branch_gt(RegNewValue, RegCurrentValue, label_0);
+  DBUG_ASSERT(r == 0);
+  r= code->interpret_exit_nok(error_conflict_fn_violation);
+  DBUG_ASSERT(r == 0);
+  r= code->def_label(label_0);
+  DBUG_ASSERT(r == 0);
+  r= code->interpret_exit_ok();
+  DBUG_ASSERT(r == 0);
+  r= code->finalise();
+  DBUG_ASSERT(r == 0);
+  DBUG_RETURN(r);
+}
+
+/**
+  CFT_NDB_MAX
+
+  To perform conflict resolution, an interpreted program is used to read
+  the timestamp stored locally and compare to what is going to be applied.
+  If timestamp is lower, an error for this operation (9999) will be raised,
+  and new row will not be applied. The error codes for the operations will
+  be checked on return.  For this to work is is vital that the operation
+  is run with ignore error option.
+
+  Note that for delete, this algorithm reverts to the OLD algorithm.
+*/
+int
+row_conflict_fn_max(st_ndbcluster_conflict_fn_share* cfn_share,
+                    enum_conflicting_op_type op_type,
+                    const uchar* old_data,
+                    const uchar* new_data,
+                    const MY_BITMAP* write_set,
+                    NdbInterpretedCode* code)
 {
-  enum_conflict_fn_arg_type type;
-  const char *ptr;
-  uint32 len;
-  uint32 fieldno; // CFAT_COLUMN_NAME
+  switch(op_type)
+  {
+  case WRITE_ROW:
+    abort();
+    return 1;
+  case UPDATE_ROW:
+    return row_conflict_fn_max_update_only(cfn_share,
+                                           op_type,
+                                           old_data,
+                                           new_data,
+                                           write_set,
+                                           code);
+  case DELETE_ROW:
+    /* Can't use max of new image, as there's no new image
+     * for DELETE
+     * Use OLD instead
+     */
+    return row_conflict_fn_old(cfn_share,
+                               op_type,
+                               old_data,
+                               new_data,
+                               write_set,
+                               code);
+  default:
+    abort();
+    return 1;
+  }
+}
+
+
+/**
+  CFT_NDB_MAX_DEL_WIN
+
+  To perform conflict resolution, an interpreted program is used to read
+  the timestamp stored locally and compare to what is going to be applied.
+  If timestamp is lower, an error for this operation (9999) will be raised,
+  and new row will not be applied. The error codes for the operations will
+  be checked on return.  For this to work is is vital that the operation
+  is run with ignore error option.
+
+  In this variant, replicated DELETEs alway succeed - no filter is added
+  to them.
+*/
+
+int
+row_conflict_fn_max_del_win(st_ndbcluster_conflict_fn_share* cfn_share,
+                            enum_conflicting_op_type op_type,
+                            const uchar* old_data,
+                            const uchar* new_data,
+                            const MY_BITMAP* write_set,
+                            NdbInterpretedCode* code)
+{
+  switch(op_type)
+  {
+  case WRITE_ROW:
+    abort();
+    return 1;
+  case UPDATE_ROW:
+    return row_conflict_fn_max_update_only(cfn_share,
+                                           op_type,
+                                           old_data,
+                                           new_data,
+                                           write_set,
+                                           code);
+  case DELETE_ROW:
+    /* This variant always lets a received DELETE_ROW
+     * succeed.
+     */
+    return 1;
+  default:
+    abort();
+    return 1;
+  }
 };
-struct st_conflict_fn_def
+
+static const st_conflict_fn_arg_def resolve_col_args[]=
 {
-  const char *name;
-  enum_conflict_fn_type type;
-  enum enum_conflict_fn_arg_type arg_type;
+  /* Arg type              Optional */
+  { CFAT_COLUMN_NAME,      false },
+  { CFAT_END,              false }
 };
-static struct st_conflict_fn_def conflict_fns[]=
+
+static const st_conflict_fn_def conflict_fns[]=
 {
-   { "NDB$MAX_DELETE_WIN", CFT_NDB_MAX_DEL_WIN, CFAT_COLUMN_NAME }
-  ,{ NULL,                 CFT_NDB_MAX_DEL_WIN, CFAT_END }
-  ,{ "NDB$MAX", CFT_NDB_MAX,   CFAT_COLUMN_NAME }
-  ,{ NULL,      CFT_NDB_MAX,   CFAT_END }
-  ,{ "NDB$OLD", CFT_NDB_OLD,   CFAT_COLUMN_NAME }
-  ,{ NULL,      CFT_NDB_OLD,   CFAT_END }
+  { "NDB$MAX_DELETE_WIN", CFT_NDB_MAX_DEL_WIN,
+    &resolve_col_args[0], row_conflict_fn_max_del_win },
+  { "NDB$MAX",            CFT_NDB_MAX,
+    &resolve_col_args[0], row_conflict_fn_max         },
+  { "NDB$OLD",            CFT_NDB_OLD,
+    &resolve_col_args[0], row_conflict_fn_old         },
 };
+
 static unsigned n_conflict_fns=
-sizeof(conflict_fns) / sizeof(struct st_conflict_fn_def);
+  sizeof(conflict_fns) / sizeof(struct st_conflict_fn_def);
 
-static int
-set_conflict_fn(THD *thd, NDB_SHARE *share,
-                const NDBTAB *ndbtab,
-                const NDBCOL *conflict_col,
-                char *conflict_fn,
-                char *msg, uint msg_len,
-                TABLE *table)
-{
-  DBUG_ENTER("set_conflict_fn");
-  uint len= 0;
-  switch (conflict_col->getArrayType())
-  {
-  case NDBCOL::ArrayTypeShortVar:
-    len= *(uchar*)conflict_fn;
-    conflict_fn++;
-    break;
-  case NDBCOL::ArrayTypeMediumVar:
-    len= uint2korr(conflict_fn);
-    conflict_fn+= 2;
-    break;
-  default:
-    break;
-  }
-  conflict_fn[len]= '\0';
-  const char *ptr= conflict_fn;
+
+int
+parse_conflict_fn_spec(const char* conflict_fn_spec,
+                       const st_conflict_fn_def** conflict_fn,
+                       st_conflict_fn_arg* args,
+                       Uint32* max_args,
+                       const TABLE* table,
+                       char *msg, uint msg_len)
+{
+  DBUG_ENTER("parse_conflict_fn_spec");
+
+  Uint32 no_args = 0;
+  const char *ptr= conflict_fn_spec;
   const char *error_str= "unknown conflict resolution function";
   /* remove whitespace */
   while (*ptr == ' ' && *ptr != '\0') ptr++;
 
-  const unsigned MAX_ARGS= 8;
-  unsigned no_args= 0;
-  struct st_conflict_fn_arg args[MAX_ARGS];
-
-  DBUG_PRINT("info", ("parsing %s", conflict_fn));
+  DBUG_PRINT("info", ("parsing %s", conflict_fn_spec));
 
   for (unsigned i= 0; i < n_conflict_fns; i++)
   {
-    struct st_conflict_fn_def &fn= conflict_fns[i];
-    if (fn.name == NULL)
-      continue;
+    const st_conflict_fn_def &fn= conflict_fns[i];
 
     uint len= strlen(fn.name);
     if (strncmp(ptr, fn.name, len))
@@ -3953,9 +4200,16 @@ set_conflict_fn(THD *thd, NDB_SHARE *sha
     /* find all arguments */
     for (;;)
     {
+      if (no_args >= *max_args)
+      {
+        error_str= "too many arguments";
+        DBUG_PRINT("info", ("parse error %s", error_str));
+        break;
+      }
+
       /* expected type */
       enum enum_conflict_fn_arg_type type=
-        conflict_fns[i+no_args].arg_type;
+        conflict_fns[i].arg_defs[no_args].arg_type;
 
       /* remove whitespace */
       while (*ptr == ' ' && *ptr != '\0') ptr++;
@@ -3968,16 +4222,30 @@ set_conflict_fn(THD *thd, NDB_SHARE *sha
       }
 
       /* arg */
+      /* Todo : Should support comma as an arg separator? */
       const char *start_arg= ptr;
       while (*ptr != ')' && *ptr != ' ' && *ptr != '\0') ptr++;
       const char *end_arg= ptr;
 
+      bool optional_arg = conflict_fns[i].arg_defs[no_args].optional;
       /* any arg given? */
       if (start_arg == end_arg)
       {
-        error_str= "missing function argument";
-        DBUG_PRINT("info", ("parse error %s", error_str));
-        break;
+        if (!optional_arg)
+        {
+          error_str= "missing function argument";
+          DBUG_PRINT("info", ("parse error %s", error_str));
+          break;
+        }
+        else
+        {
+          /* Arg was optional, and not present
+           * Must be at end of args, finish parsing
+           */
+          args[no_args].type= CFAT_END;
+          error_str= NULL;
+          break;
+        }
       }
 
       uint len= end_arg - start_arg;
@@ -3988,6 +4256,7 @@ set_conflict_fn(THD *thd, NDB_SHARE *sha
  
       DBUG_PRINT("info", ("found argument %s %u", start_arg, len));
 
+      bool arg_processing_error = false;
       switch (type)
       {
       case CFAT_COLUMN_NAME:
@@ -4012,6 +4281,8 @@ set_conflict_fn(THD *thd, NDB_SHARE *sha
         abort();
       }
 
+      if (arg_processing_error)
+        break;
       no_args++;
     }
 
@@ -4039,43 +4310,86 @@ set_conflict_fn(THD *thd, NDB_SHARE *sha
       break;
     }
 
-    /* setup the function */
-    switch (fn.type)
-    {
-    case CFT_NDB_MAX:
-    case CFT_NDB_OLD:
-    case CFT_NDB_MAX_DEL_WIN:
-      if (args[0].fieldno == (uint32)-1)
-        break;
-      if (slave_set_resolve_fn(thd, share, ndbtab, args[0].fieldno,
-                               fn.type, table))
-      {
-        /* wrong data type */
-        my_snprintf(msg, msg_len,
-                 "column '%s' has wrong datatype",
-                 table->s->field[args[0].fieldno]->field_name);
-        DBUG_PRINT("info", ("%s", msg));
-        DBUG_RETURN(-1);
-      }
-      if (opt_ndb_extra_logging)
-      {
-        sql_print_information("NDB Slave: conflict_fn %s on attribute %s",
-                              fn.name,
-                              table->s->field[args[0].fieldno]->field_name);
-      }
-      break;
-    case CFT_NDB_UNDEF:
-      abort();
-    }
+    /* Update ptrs to conflict fn + # of args */
+    *conflict_fn = &conflict_fns[i];
+    *max_args = no_args;
+
     DBUG_RETURN(0);
   }
   /* parse error */
   my_snprintf(msg, msg_len, "%s, %s at '%s'",
-           conflict_fn, error_str, ptr);
+           conflict_fn_spec, error_str, ptr);
   DBUG_PRINT("info", ("%s", msg));
   DBUG_RETURN(-1);
 }
 
+static int
+setup_conflict_fn(THD *thd, NDB_SHARE *share,
+                  const NDBTAB *ndbtab,
+                  char *msg, uint msg_len,
+                  TABLE *table,
+                  const st_conflict_fn_def* conflict_fn,
+                  const st_conflict_fn_arg* args,
+                  const Uint32 num_args)
+{
+  DBUG_ENTER("setup_conflict_fn");
+
+  /* setup the function */
+  switch (conflict_fn->type)
+  {
+  case CFT_NDB_MAX:
+  case CFT_NDB_OLD:
+  case CFT_NDB_MAX_DEL_WIN:
+  {
+    if (num_args != 1)
+    {
+      my_snprintf(msg, msg_len,
+                  "Incorrect arguments to conflict function");
+      DBUG_PRINT("info", ("%s", msg));
+      DBUG_RETURN(-1);
+    }
+
+    uint resolve_col_sz= 0;
+
+    if (0 == (resolve_col_sz =
+              slave_check_resolve_col_type(ndbtab, args[0].fieldno)))
+    {
+      /* wrong data type */
+      slave_reset_conflict_fn(share);
+      my_snprintf(msg, msg_len,
+                  "column '%s' has wrong datatype",
+                  table->s->field[args[0].fieldno]->field_name);
+      DBUG_PRINT("info", ("%s", msg));
+      DBUG_RETURN(-1);
+    }
+
+    if (slave_set_resolve_fn(thd, share, ndbtab,
+                             args[0].fieldno, resolve_col_sz,
+                             conflict_fn, table, CFF_NONE))
+    {
+      my_snprintf(msg, msg_len,
+                  "unable to setup conflict resolution using column '%s'",
+                  table->s->field[args[0].fieldno]->field_name);
+      DBUG_PRINT("info", ("%s", msg));
+      DBUG_RETURN(-1);
+    }
+    if (opt_ndb_extra_logging)
+    {
+       sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.",
+                             table->s->db.str,
+                             table->s->table_name.str,
+                             conflict_fn->name,
+                             table->s->field[args[0].fieldno]->field_name);
+    }
+    break;
+  }
+  case CFT_NUMBER_OF_CFTS:
+  case CFT_NDB_UNDEF:
+    abort();
+  }
+  DBUG_RETURN(0);
+}
+
 static const char *ndb_rep_db= NDB_REP_DB;
 static const char *ndb_replication_table= NDB_REPLICATION_TABLE;
 static const char *nrt_db= "db";
@@ -4083,45 +4397,30 @@ static const char *nrt_table_name= "tabl
 static const char *nrt_server_id= "server_id";
 static const char *nrt_binlog_type= "binlog_type";
 static const char *nrt_conflict_fn= "conflict_fn";
+
+/*
+   ndbcluster_read_replication_table
+
+   This function reads the information for the supplied table from
+   the mysql.ndb_replication table.
+   Where there is no information (or no table), defaults are
+   returned.
+*/
 int
-ndbcluster_read_binlog_replication(THD *thd, Ndb *ndb,
-                                   NDB_SHARE *share,
-                                   const NDBTAB *ndbtab,
-                                   uint server_id,
-                                   TABLE *table,
-                                   bool do_set_binlog_flags)
+ndbcluster_read_replication_table(THD *thd, Ndb *ndb,
+                                  const char* db,
+                                  const char* table_name,
+                                  uint server_id,
+                                  Uint32* binlog_flags,
+                                  char** conflict_fn_spec,
+                                  char* conflict_fn_buffer,
+                                  Uint32 conflict_fn_buffer_len)
 {
-  DBUG_ENTER("ndbcluster_read_binlog_replication");
-  const char *db= share->db;
-  const char *table_name= share->table_name;
+  DBUG_ENTER("ndbcluster_read_replication_table");
   NdbError ndberror;
   int error= 0;
   const char *error_str= "<none>";
 
-  /* Override for ndb_apply_status when logging */
-  if (opt_ndb_log_apply_status &&
-      do_set_binlog_flags)
-  {
-    if (strcmp(db, NDB_REP_DB) == 0 &&
-        strcmp(table_name, NDB_APPLY_TABLE) == 0)
-    {
-      /*
-        Ensure that we get all columns from ndb_apply_status updates
-        by forcing FULL event type
-        Also, ensure that ndb_apply_status events are always logged as 
-        WRITES.
-        This is needed so that received ndb_apply_status WRITES can be
-        cleanly propagated.
-      */
-      DBUG_PRINT("info", ("ndb_apply_status defaulting to FULL, USE_WRITE"));
-      sql_print_information("NDB : ndb-log-apply-status forcing "
-                            "%s.%s to FULL USE_WRITE",
-                            NDB_REP_DB, NDB_APPLY_TABLE);
-      set_binlog_flags(share, NBT_FULL);
-      DBUG_RETURN(0);
-    }
-  }
-
   ndb->setDatabaseName(ndb_rep_db);
   NDBDICT *dict= ndb->getDictionary();
   Ndb_table_guard ndbtab_g(dict, ndb_replication_table);
@@ -4131,8 +4430,8 @@ ndbcluster_read_binlog_replication(THD *
        dict->getNdbError().code == 4009))
   {
     DBUG_PRINT("info", ("No %s.%s table", ndb_rep_db, ndb_replication_table));
-    if (do_set_binlog_flags)
-      set_binlog_flags(share, NBT_DEFAULT);
+    *binlog_flags= NBT_DEFAULT;
+    *conflict_fn_spec= NULL;
     DBUG_RETURN(0);
   }
   const NDBCOL
@@ -4198,6 +4497,10 @@ ndbcluster_read_binlog_replication(THD *
     char *ndb_conflict_fn[2]= {ndb_conflict_fn_buf, ndb_conflict_fn_buf+sz};
     NdbOperation *op[2];
     uint32 i, id= 0;
+    /* Read generic row (server_id==0) and specific row (server_id == our id)
+     * from ndb_replication.
+     * Specific overrides generic, if present
+     */
     for (i= 0; i < 2; i++)
     {
       NdbOperation *_op;
@@ -4257,39 +4560,70 @@ ndbcluster_read_binlog_replication(THD *
     if (col_binlog_type_rec_attr[1] == NULL ||
         col_binlog_type_rec_attr[1]->isNULL())
     {
+      /* No specific value, use generic */
       col_binlog_type_rec_attr[1]= col_binlog_type_rec_attr[0];
       ndb_binlog_type[1]= ndb_binlog_type[0];
     }
     if (col_conflict_fn_rec_attr[1] == NULL ||
         col_conflict_fn_rec_attr[1]->isNULL())
     {
+      /* No specific value, use generic */
       col_conflict_fn_rec_attr[1]= col_conflict_fn_rec_attr[0];
       ndb_conflict_fn[1]= ndb_conflict_fn[0];
     }
 
-    if (do_set_binlog_flags)
+    if (col_binlog_type_rec_attr[1] == NULL ||
+        col_binlog_type_rec_attr[1]->isNULL())
     {
-      if (col_binlog_type_rec_attr[1] == NULL ||
-          col_binlog_type_rec_attr[1]->isNULL())
-        set_binlog_flags(share, NBT_DEFAULT);
-      else
-        set_binlog_flags(share, (enum Ndb_binlog_type) ndb_binlog_type[1]);
+      DBUG_PRINT("info", ("No binlog flag value, using default"));
+      /* No value */
+      *binlog_flags= NBT_DEFAULT;
     }
-    if (table)
+    else
     {
-      if (col_conflict_fn_rec_attr[1] == NULL ||
-          col_conflict_fn_rec_attr[1]->isNULL())
-        slave_reset_conflict_fn(share); /* no conflict_fn */
-      else if (set_conflict_fn(thd, share, ndbtab,
-                               col_conflict_fn, ndb_conflict_fn[1],
-                               tmp_buf, sizeof(tmp_buf), table))
+      DBUG_PRINT("info", ("Taking binlog flag value from the table"));
+      *binlog_flags= (enum Ndb_binlog_type) ndb_binlog_type[1];
+    }
+
+    if (col_conflict_fn_rec_attr[1] == NULL ||
+        col_conflict_fn_rec_attr[1]->isNULL())
+    {
+      /* No conflict function */
+      *conflict_fn_spec = NULL;
+    }
+    else
+    {
+      const char* conflict_fn = ndb_conflict_fn[1];
+      uint len= 0;
+      switch (col_conflict_fn->getArrayType())
+      {
+      case NDBCOL::ArrayTypeShortVar:
+        len= *(uchar*)conflict_fn;
+        conflict_fn++;
+        break;
+      case NDBCOL::ArrayTypeMediumVar:
+        len= uint2korr(conflict_fn);
+        conflict_fn+= 2;
+        break;
+      default:
+        abort();
+      }
+      if ((len + 1) > conflict_fn_buffer_len)
       {
-        error_str= tmp_buf;
-        error= 1;
         ndb->closeTransaction(trans);
+        error= -2;
+        error_str= "Conflict function specification too long.";
         goto err;
       }
+      memcpy(conflict_fn_buffer, conflict_fn, len);
+      conflict_fn_buffer[len] = '\0';
+      *conflict_fn_spec = conflict_fn_buffer;
     }
+
+    DBUG_PRINT("info", ("Retrieved Binlog flags : %u and function spec : %s",
+                        *binlog_flags, (*conflict_fn_spec != NULL ?*conflict_fn_spec:
+                                       "NULL")));
+
     ndb->closeTransaction(trans);
 
     DBUG_RETURN(0);
@@ -4298,14 +4632,7 @@ ndbcluster_read_binlog_replication(THD *
 err:
   DBUG_PRINT("info", ("error %d, error_str %s, ndberror.code %u",
                       error, error_str, ndberror.code));
-  if (error > 0)
-  {
-    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
-                        ER_CONFLICT_FN_PARSE_ERROR,
-                        ER(ER_CONFLICT_FN_PARSE_ERROR),
-                        error_str);
-  }
-  else if (error < 0)
+  if (error < 0)
   {
     char msg[FN_REFLEN];
     switch (error)
@@ -4337,12 +4664,192 @@ err:
                         ER(ER_ILLEGAL_HA_CREATE_OPTION),
                         ndbcluster_hton_name, msg);  
   }
-  if (do_set_binlog_flags)
-    set_binlog_flags(share, NBT_DEFAULT);
+  *binlog_flags= NBT_DEFAULT;
+  *conflict_fn_spec= NULL;
+
   if (ndberror.code && opt_ndb_extra_logging)
     print_warning_list("NDB", thd_warn_list(thd));
   DBUG_RETURN(ndberror.code);
 }
+
+/*
+  ndbcluster_get_binlog_replication_info
+
+  This function retrieves the data for the given table
+  from the ndb_replication table.
+
+  If the table is not found, or the table does not exist,
+  then defaults are returned.
+*/
+int
+ndbcluster_get_binlog_replication_info(THD *thd, Ndb *ndb,
+                                       const char* db,
+                                       const char* table_name,
+                                       uint server_id,
+                                       const TABLE *table,
+                                       Uint32* binlog_flags,
+                                       const st_conflict_fn_def** conflict_fn,
+                                       st_conflict_fn_arg* args,
+                                       Uint32* num_args)
+{
+  DBUG_ENTER("ndbcluster_get_binlog_replication_info");
+
+  /* Override for ndb_apply_status when logging */
+  if (opt_ndb_log_apply_status)
+  {
+    if (strcmp(db, NDB_REP_DB) == 0 &&
+        strcmp(table_name, NDB_APPLY_TABLE) == 0)
+    {
+      /*
+        Ensure that we get all columns from ndb_apply_status updates
+        by forcing FULL event type
+        Also, ensure that ndb_apply_status events are always logged as
+        WRITES.
+      */
+      DBUG_PRINT("info", ("ndb_apply_status defaulting to FULL, USE_WRITE"));
+      sql_print_information("NDB : ndb-log-apply-status forcing "
+                            "%s.%s to FULL USE_WRITE",
+                            NDB_REP_DB, NDB_APPLY_TABLE);
+      *binlog_flags = NBT_FULL;
+      *conflict_fn = NULL;
+      *num_args = 0;
+      DBUG_RETURN(0);
+    }
+  }
+
+  const Uint32 MAX_CONFLICT_FN_SPEC_LEN = 256;
+  char conflict_fn_buffer[MAX_CONFLICT_FN_SPEC_LEN];
+  char* conflict_fn_spec;
+
+  if (ndbcluster_read_replication_table(thd,
+                                        ndb,
+                                        db,
+                                        table_name,
+                                        server_id,
+                                        binlog_flags,
+                                        &conflict_fn_spec,
+                                        conflict_fn_buffer,
+                                        MAX_CONFLICT_FN_SPEC_LEN) != 0)
+  {
+    DBUG_RETURN(ER_NDB_REPLICATION_SCHEMA_ERROR);
+  }
+
+  if (table != NULL)
+  {
+    if (conflict_fn_spec != NULL)
+    {
+      char tmp_buf[FN_REFLEN];
+
+      if (parse_conflict_fn_spec(conflict_fn_spec,
+                                 conflict_fn,
+                                 args,
+                                 num_args,
+                                 table,
+                                 tmp_buf,
+                                 sizeof(tmp_buf)) != 0)
+      {
+        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                            ER_CONFLICT_FN_PARSE_ERROR,
+                            ER(ER_CONFLICT_FN_PARSE_ERROR),
+                            tmp_buf);
+        DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR);
+      }
+    }
+    else
+    {
+      /* No conflict function specified */
+      conflict_fn= NULL;
+      num_args= 0;
+    }
+  }
+
+  DBUG_RETURN(0);
+}
+
+int
+ndbcluster_apply_binlog_replication_info(THD *thd,
+                                         NDB_SHARE *share,
+                                         const NDBTAB* ndbtab,
+                                         TABLE* table,
+                                         const st_conflict_fn_def* conflict_fn,
+                                         const st_conflict_fn_arg* args,
+                                         Uint32 num_args,
+                                         bool do_set_binlog_flags,
+                                         Uint32 binlog_flags)
+{
+  DBUG_ENTER("ndbcluster_apply_binlog_replication_info");
+  char tmp_buf[FN_REFLEN];
+
+  if (do_set_binlog_flags)
+  {
+    DBUG_PRINT("info", ("Setting binlog flags to %u", binlog_flags));
+    set_binlog_flags(share, (enum Ndb_binlog_type)binlog_flags);
+  }
+
+  if (conflict_fn != NULL)
+  {
+    if (setup_conflict_fn(thd, share,
+                          ndbtab,
+                          tmp_buf, sizeof(tmp_buf),
+                          table,
+                          conflict_fn,
+                          args,
+                          num_args) != 0)
+    {
+      push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                          ER_CONFLICT_FN_PARSE_ERROR,
+                          ER(ER_CONFLICT_FN_PARSE_ERROR),
+                          tmp_buf);
+      DBUG_RETURN(-1);
+    }
+  }
+  else
+  {
+    /* No conflict function specified */
+    slave_reset_conflict_fn(share);
+  }
+
+  DBUG_RETURN(0);
+}
+
+int
+ndbcluster_read_binlog_replication(THD *thd, Ndb *ndb,
+                                   NDB_SHARE *share,
+                                   const NDBTAB *ndbtab,
+                                   uint server_id,
+                                   TABLE *table,
+                                   bool do_set_binlog_flags)
+{
+  DBUG_ENTER("ndbcluster_read_binlog_replication");
+  Uint32 binlog_flags;
+  const st_conflict_fn_def* conflict_fn= NULL;
+  st_conflict_fn_arg args[MAX_CONFLICT_ARGS];
+  Uint32 num_args = MAX_CONFLICT_ARGS;
+
+  if ((ndbcluster_get_binlog_replication_info(thd, ndb,
+                                              share->db,
+                                              share->table_name,
+                                              server_id,
+                                              table,
+                                              &binlog_flags,
+                                              &conflict_fn,
+                                              args,
+                                              &num_args) != 0) ||
+      (ndbcluster_apply_binlog_replication_info(thd,
+                                                share,
+                                                ndbtab,
+                                                table,
+                                                conflict_fn,
+                                                args,
+                                                num_args,
+                                                do_set_binlog_flags,
+                                                binlog_flags) != 0))
+  {
+    DBUG_RETURN(-1);
+  }
+
+  DBUG_RETURN(0);
+}
 #endif /* HAVE_NDB_BINLOG */
 
 bool

=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h	2011-02-18 13:55:27 +0000
+++ b/sql/ha_ndbcluster_binlog.h	2011-06-16 14:34:56 +0000
@@ -95,8 +95,7 @@ static const char *ha_ndb_ext=".ndb";
 #define NDB_EXCEPTIONS_TABLE_SUFFIX "$EX"
 #define NDB_EXCEPTIONS_TABLE_SUFFIX_LOWER "$ex"
 
-const uint error_conflict_fn_old_violation= 9998;
-const uint error_conflict_fn_max_violation= 9999;
+const uint error_conflict_fn_violation= 9999;
 #endif /* HAVE_NDB_BINLOG */
 
 
@@ -237,6 +236,26 @@ void ndb_rep_event_name(String *event_na
                         const char *db, const char *tbl, my_bool full);
 #ifdef HAVE_NDB_BINLOG
 int
+ndbcluster_get_binlog_replication_info(THD *thd, Ndb *ndb,
+                                       const char* db,
+                                       const char* table_name,
+                                       uint server_id,
+                                       const TABLE *table,
+                                       Uint32* binlog_flags,
+                                       const st_conflict_fn_def** conflict_fn,
+                                       st_conflict_fn_arg* args,
+                                       Uint32* num_args);
+int
+ndbcluster_apply_binlog_replication_info(THD *thd,
+                                         NDB_SHARE *share,
+                                         const NDBTAB* ndbtab,
+                                         TABLE* table,
+                                         const st_conflict_fn_def* conflict_fn,
+                                         const st_conflict_fn_arg* args,
+                                         Uint32 num_args,
+                                         bool do_set_binlog_flags,
+                                         Uint32 binlog_flags);
+int
 ndbcluster_read_binlog_replication(THD *thd, Ndb *ndb,
                                    NDB_SHARE *share,
                                    const NDBTAB *ndbtab,

No bundle (reason: revision is a merge (you can force generation of a bundle with env var BZR_FORCE_BUNDLE=1)).
Thread
bzr commit into mysql-5.1-telco-7.1 branch (frazer.clement:4246) Frazer Clement16 Jun