List:Commits« Previous MessageNext Message »
From:Frazer Clement Date:January 31 2012 11:20am
Subject:bzr push into mysql-5.5-cluster-7.2 branch (frazer.clement:3799 to 3800)
View as plain text  
 3800 Frazer Clement	2012-01-31 [merge]
      Merge 7.1->7.2

    added:
      sql/ndb_conflict.cc
      sql/ndb_conflict.h
    modified:
      mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result
      mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      sql/ha_ndbcluster_binlog.cc
      sql/ndb_share.cc
      sql/ndb_share.h
      sql/ndb_thd.cc
      storage/ndb/CMakeLists.txt
 3799 jonas oreland	2012-01-31 [merge]
      ndb - merge 71 to 72

    modified:
      storage/ndb/src/kernel/vm/mt.cpp
=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result	2012-01-23 15:47:46 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result	2012-01-31 10:01:22 +0000
@@ -274,6 +274,142 @@ relevant
 [note] ndb slave: table test.t1allsame using conflict_fn ndb$max on attribute x.
 [note] ndb slave: table test.t2_max using conflict_fn ndb$max on attribute x.
 drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX;
+delete from mysql.ndb_replication;
+Test exceptions table schema flexibility
+insert into mysql.ndb_replication values ("test", "t1", 0, 7, "NDB$MAX(X)");
+Test 'normal' mandatory column names + all table pks
+create table test.t1$EX(
+server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+a int not null,
+b int not null,
+c int not null,
+primary key (server_id, master_server_id, master_epoch, count)) engine=ndb;
+create table test.t1 (a int, b int, c int, d int, e int, X int unsigned,
+primary key(a,b,c)) engine=ndb;
+Generate a conflict on the slave
+insert into test.t1 values (1,1,1,1,1,1);
+update test.t1 set X=0 where a=1 and b=1 and c=1;
+Check that conflict has been recorded.
+select * from test.t1$EX;
+server_id	master_server_id	master_epoch	count	a	b	c
+2	1	<epoch_num>	1	1	1	1
+drop table test.t1;
+drop table test.t1$EX;
+Test 'normal' mandatory column names + all table pks +
+extra columns with same and different names to main table columns
+Also a defaulted extra column.
+create table test.t1$EX(
+server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+a int not null,
+b int not null,
+c int not null,
+d int,                     # Same name as main table, but user defined
+lilljeholmen varchar(50) default 'Slussen',
+# Separate, user defined
+primary key (server_id, master_server_id, master_epoch, count)) engine=ndb;
+create table test.t1 (a int, b int, c int, d int, e int, X int unsigned,
+primary key(a,b,c)) engine=ndb;
+Generate a conflict on the slave
+insert into test.t1 values (1,1,1,1,1,1);
+update test.t1 set X=0 where a=1 and b=1 and c=1;
+Check that conflict has been recorded.
+select * from test.t1$EX;
+server_id	master_server_id	master_epoch	count	a	b	c	d	lilljeholmen
+2	1	<epoch_num>	1	1	1	1	NULL	Slussen
+drop table test.t1;
+drop table test.t1$EX;
+Test unusual mandatory column names + all table pks +
+extra columns with same and different names to main table columns
+Also a defaulted extra column.
+create table test.t1$EX(
+monteverdi int unsigned,
+asparagi int unsigned,
+plenipotentiary bigint unsigned,
+mountebank int unsigned,
+a int not null,
+b int not null,
+c int not null,
+d int,                     # Same name as main table, but user defined
+lilljeholmen varchar(50) default 'Slussen',
+# Separate, user defined
+primary key (monteverdi, asparagi, plenipotentiary, mountebank)) engine=ndb;
+create table test.t1 (a int, b int, c int, d int, e int, X int unsigned,
+primary key(a,b,c)) engine=ndb;
+Generate a conflict on the slave
+insert into test.t1 values (1,1,1,1,1,1);
+update test.t1 set X=0 where a=1 and b=1 and c=1;
+Check that conflict has been recorded.
+select * from test.t1$EX;
+monteverdi	asparagi	plenipotentiary	mountebank	a	b	c	d	lilljeholmen
+2	1	<epoch_num>	1	1	1	1	NULL	Slussen
+drop table test.t1;
+drop table test.t1$EX;
+Test unusual mandatory column names + all table pks which are same
+as 'normal' exceptions table column names plus extra columns with
+same and different names to main table columns
+Also a defaulted extra column.
+create table test.t1$EX(
+monteverdi int unsigned,
+asparagi int unsigned,
+plenipotentiary bigint unsigned,
+mountebank int unsigned,
+server_id int unsigned not null,
+master_server_id int unsigned not null,
+master_epoch bigint unsigned not null,
+count int unsigned not null,
+d int,                     # Same name as main table, but user defined
+lilljeholmen varchar(50) default 'Slussen',
+# Separate, user defined
+primary key (monteverdi, asparagi, plenipotentiary, mountebank)) engine=ndb;
+create table test.t1 (server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+d int, e int, X int unsigned,
+primary key(server_id, master_server_id,
+master_epoch, count)) engine=ndb;
+Generate a conflict on the slave
+insert into test.t1 values (1,1,1,1,1,1,1);
+update test.t1 set X=0 where server_id=1 and master_server_id=1 and master_epoch=1 and count=1;
+Check that conflict has been recorded.
+select * from test.t1$EX;
+monteverdi	asparagi	plenipotentiary	mountebank	server_id	master_server_id	master_epoch	count	d	lilljeholmen
+2	1	<epoch_num>	1	1	1	1	1	NULL	Slussen
+drop table test.t1;
+drop table test.t1$EX;
+call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*");
+call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*");
+call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*");
+And some bad exceptions table schemata
+Keys in wrong positions
+create table test.t1$EX(
+a int not null,
+b int not null,
+c int not null,
+d int,                     # Same name as main table, but user defined
+lilljeholmen varchar(50) default 'Slussen',
+# Separate, user defined
+server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+primary key (server_id, master_server_id, master_epoch, count)) engine=ndb;
+create table test.t1 (a int, b int, c int, d int, e int, X int unsigned,
+primary key(a,b,c)) engine=ndb;
+show warnings;
+Level	Code	Message
+MySQLD error output for server 1.1 matching pattern %NDB Slave%
+relevant
+[note] ndb slave: table test.t1 using conflict_fn ndb$max on attribute x.
+[warning] ndb slave: exceptions table t1$ex has wrong definition (initial 4 columns)
+drop table test.t1;
+drop table test.t1$EX;
 "Cleanup"
 drop table mysql.ndb_replication;
 include/rpl_end.inc

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test	2012-01-23 15:47:46 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test	2012-01-31 10:01:22 +0000
@@ -373,6 +373,208 @@ show variables like 'server_id';
 --connection master
 drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX;
 
+delete from mysql.ndb_replication;
+
+--echo Test exceptions table schema flexibility
+#
+# An exceptions table should be able to have the mandatory columns with
+# different names, as long as the types match.
+# Also, not all main table primary key parts need be present
+# Finally, arbitrary extra columns should be allowed, as long as
+# they can be defaulted.
+#
+
+insert into mysql.ndb_replication values ("test", "t1", 0, 7, "NDB$MAX(X)");
+
+--echo Test 'normal' mandatory column names + all table pks
+
+create table test.t1$EX(
+   server_id int unsigned,
+   master_server_id int unsigned,
+   master_epoch bigint unsigned,
+   count int unsigned,
+   a int not null,
+   b int not null,
+   c int not null,
+   primary key (server_id, master_server_id, master_epoch, count)) engine=ndb;
+
+create table test.t1 (a int, b int, c int, d int, e int, X int unsigned,
+                      primary key(a,b,c)) engine=ndb;
+
+--echo Generate a conflict on the slave
+
+insert into test.t1 values (1,1,1,1,1,1);
+--sync_slave_with_master slave
+--connection master
+update test.t1 set X=0 where a=1 and b=1 and c=1;
+--sync_slave_with_master slave
+--connection slave
+
+--echo Check that conflict has been recorded.
+--replace_column 3 <epoch_num>
+select * from test.t1$EX;
+
+--connection master
+drop table test.t1;
+drop table test.t1$EX;
+
+--echo Test 'normal' mandatory column names + all table pks +
+--echo extra columns with same and different names to main table columns
+--echo Also a defaulted extra column.
+
+create table test.t1$EX(
+   server_id int unsigned,
+   master_server_id int unsigned,
+   master_epoch bigint unsigned,
+   count int unsigned,
+   a int not null,
+   b int not null,
+   c int not null,
+   d int,                     # Same name as main table, but user defined
+   lilljeholmen varchar(50) default 'Slussen',
+                              # Separate, user defined
+   primary key (server_id, master_server_id, master_epoch, count)) engine=ndb;
+
+create table test.t1 (a int, b int, c int, d int, e int, X int unsigned,
+                      primary key(a,b,c)) engine=ndb;
+
+--echo Generate a conflict on the slave
+
+insert into test.t1 values (1,1,1,1,1,1);
+--sync_slave_with_master slave
+--connection master
+update test.t1 set X=0 where a=1 and b=1 and c=1;
+--sync_slave_with_master slave
+--connection slave
+
+--echo Check that conflict has been recorded.
+--replace_column 3 <epoch_num>
+select * from test.t1$EX;
+
+--connection master
+drop table test.t1;
+drop table test.t1$EX;
+
+--echo Test unusual mandatory column names + all table pks +
+--echo extra columns with same and different names to main table columns
+--echo Also a defaulted extra column.
+
+create table test.t1$EX(
+   monteverdi int unsigned,
+   asparagi int unsigned,
+   plenipotentiary bigint unsigned,
+   mountebank int unsigned,
+   a int not null,
+   b int not null,
+   c int not null,
+   d int,                     # Same name as main table, but user defined
+   lilljeholmen varchar(50) default 'Slussen',
+                              # Separate, user defined
+   primary key (monteverdi, asparagi, plenipotentiary, mountebank)) engine=ndb;
+
+create table test.t1 (a int, b int, c int, d int, e int, X int unsigned,
+                      primary key(a,b,c)) engine=ndb;
+
+--echo Generate a conflict on the slave
+
+insert into test.t1 values (1,1,1,1,1,1);
+--sync_slave_with_master slave
+--connection master
+update test.t1 set X=0 where a=1 and b=1 and c=1;
+--sync_slave_with_master slave
+--connection slave
+
+--echo Check that conflict has been recorded.
+--replace_column 3 <epoch_num>
+select * from test.t1$EX;
+
+--connection master
+drop table test.t1;
+drop table test.t1$EX;
+
+--echo Test unusual mandatory column names + all table pks which are same
+--echo as 'normal' exceptions table column names plus extra columns with
+--echo same and different names to main table columns
+--echo Also a defaulted extra column.
+
+create table test.t1$EX(
+   monteverdi int unsigned,
+   asparagi int unsigned,
+   plenipotentiary bigint unsigned,
+   mountebank int unsigned,
+   server_id int unsigned not null,
+   master_server_id int unsigned not null,
+   master_epoch bigint unsigned not null,
+   count int unsigned not null,
+   d int,                     # Same name as main table, but user defined
+   lilljeholmen varchar(50) default 'Slussen',
+                              # Separate, user defined
+   primary key (monteverdi, asparagi, plenipotentiary, mountebank)) engine=ndb;
+
+create table test.t1 (server_id int unsigned,
+                      master_server_id int unsigned,
+                      master_epoch bigint unsigned,
+                      count int unsigned,
+                      d int, e int, X int unsigned,
+                      primary key(server_id, master_server_id,
+                                  master_epoch, count)) engine=ndb;
+
+--echo Generate a conflict on the slave
+
+insert into test.t1 values (1,1,1,1,1,1,1);
+--sync_slave_with_master slave
+--connection master
+update test.t1 set X=0 where server_id=1 and master_server_id=1 and master_epoch=1 and count=1;
+--sync_slave_with_master slave
+--connection slave
+
+--echo Check that conflict has been recorded.
+--replace_column 3 <epoch_num>
+select * from test.t1$EX;
+
+--connection master
+drop table test.t1;
+drop table test.t1$EX;
+
+--connection server1
+call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*");
+--connection server2
+call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*");
+--connection slave
+call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*");
+--connection master
+
+--echo And some bad exceptions table schemata
+--echo   Keys in wrong positions
+create table test.t1$EX(
+   a int not null,
+   b int not null,
+   c int not null,
+   d int,                     # Same name as main table, but user defined
+   lilljeholmen varchar(50) default 'Slussen',
+                              # Separate, user defined
+   server_id int unsigned,
+   master_server_id int unsigned,
+   master_epoch bigint unsigned,
+   count int unsigned,
+   primary key (server_id, master_server_id, master_epoch, count)) engine=ndb;
+
+create table test.t1 (a int, b int, c int, d int, e int, X int unsigned,
+                      primary key(a,b,c)) engine=ndb;
+show warnings;
+
+--let $server_num=1.1
+--let $pattern=%NDB Slave%
+--let $limit=2
+
+--source suite/ndb_rpl/t/show_mysqld_warnings.inc
+
+drop table test.t1;
+drop table test.t1$EX;
+
+
+
+
 ###############
 --echo "Cleanup"
 

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2012-01-25 17:50:29 +0000
+++ b/sql/ha_ndbcluster.cc	2012-01-31 11:11:20 +0000
@@ -50,7 +50,7 @@
 #include <mysql/plugin.h>
 #include <ndb_version.h>
 #include "ndb_mi.h"
-#include "ndb_conflict_trans.h"
+#include "ndb_conflict.h"
 #include "ndb_anyvalue.h"
 #include "ndb_binlog_extra_row_info.h"
 #include "ndb_event_data.h"
@@ -386,11 +386,6 @@ ndbcluster_alter_table_flags(uint flags)
 
 #define NDB_AUTO_INCREMENT_RETRIES 100
 #define BATCH_FLUSH_SIZE (32768)
-/*
-  Room for 10 instruction words, two labels (@ 2words/label)
-  + 2 extra words for the case of resolve_size == 8
-*/
-#define MAX_CONFLICT_INTERPRETED_PROG_SIZE 16
 
 static int ndb_to_mysql_error(const NdbError *ndberr);
 
@@ -870,8 +865,7 @@ static int ndb_to_mysql_error(const NdbE
 #ifdef HAVE_NDB_BINLOG
 
 int
-handle_conflict_op_error(Thd_ndb* thd_ndb,
-                         NdbTransaction* trans,
+handle_conflict_op_error(NdbTransaction* trans,
                          const NdbError& err,
                          const NdbOperation* op);
 
@@ -937,8 +931,7 @@ check_completed_operations_pre_commit(Th
 
       if (err.classification != NdbError::NoError)
       {
-        int res = handle_conflict_op_error(thd_ndb,
-                                           trans,
+        int res = handle_conflict_op_error(trans,
                                            err,
                                            first);
         if (res != 0)
@@ -2128,11 +2121,6 @@ int ha_ndbcluster::get_metadata(THD *thd
 
   ndbtab_g.release();
 
-#ifdef HAVE_NDB_BINLOG
-  ndbcluster_read_binlog_replication(thd, ndb, m_share, m_table,
-                                     ::server_id, FALSE);
-#endif
-
   DBUG_RETURN(0);
 
 err:
@@ -4415,610 +4403,10 @@ thd_allow_batch(const THD* thd)
 #endif
 }
 
-/**
-   st_ndb_slave_state constructor
-
-   Initialise Ndb Slave state object
-*/
-st_ndb_slave_state::st_ndb_slave_state()
-  : current_master_server_epoch(0),
-    current_max_rep_epoch(0),
-    conflict_flags(0),
-    retry_trans_count(0),
-    current_trans_row_conflict_count(0),
-    current_trans_row_reject_count(0),
-    current_trans_in_conflict_count(0),
-    max_rep_epoch(0),
-    sql_run_id(~Uint32(0)),
-    trans_row_conflict_count(0),
-    trans_row_reject_count(0),
-    trans_detect_iter_count(0),
-    trans_in_conflict_count(0),
-    trans_conflict_commit_count(0),
-    trans_conflict_apply_state(SAS_NORMAL),
-    trans_dependency_tracker(NULL)
-{
-  memset(current_violation_count, 0, sizeof(current_violation_count));
-  memset(total_violation_count, 0, sizeof(total_violation_count));
-
-  /* Init conflict handling state memroot */
-  const size_t CONFLICT_MEMROOT_BLOCK_SIZE = 32768;
-  init_alloc_root(&conflict_mem_root, CONFLICT_MEMROOT_BLOCK_SIZE, 0);
-};
-
-/**
-   resetPerAttemptCounters
-
-   Reset the per-epoch-transaction-application-attempt counters
-*/
-void
-st_ndb_slave_state::resetPerAttemptCounters()
-{
-  memset(current_violation_count, 0, sizeof(current_violation_count));
-  current_trans_row_conflict_count = 0;
-  current_trans_row_reject_count = 0;
-  current_trans_in_conflict_count = 0;
-
-  conflict_flags = 0;
-  current_max_rep_epoch = 0;
-}
-
-/**
-   atTransactionAbort()
-
-   Called by Slave SQL thread during transaction abort.
-*/
-void
-st_ndb_slave_state::atTransactionAbort()
-{
-  /* Reset current-transaction counters + state */
-  resetPerAttemptCounters();
-}
-
-/**
-   atTransactionCommit()
-
-   Called by Slave SQL thread after transaction commit
-*/
-void
-st_ndb_slave_state::atTransactionCommit()
-{
-  assert( ((trans_dependency_tracker == NULL) &&
-           (trans_conflict_apply_state == SAS_NORMAL)) ||
-          ((trans_dependency_tracker != NULL) &&
-           (trans_conflict_apply_state == SAS_TRACK_TRANS_DEPENDENCIES)) );
-  assert( trans_conflict_apply_state != SAS_APPLY_TRANS_DEPENDENCIES );
-
-  /* Merge committed transaction counters into total state
-   * Then reset current transaction counters
-   */
-  for (int i=0; i < CFT_NUMBER_OF_CFTS; i++)
-  {
-    total_violation_count[i]+= current_violation_count[i];
-  }
-  trans_row_conflict_count+= current_trans_row_conflict_count;
-  trans_row_reject_count+= current_trans_row_reject_count;
-  trans_in_conflict_count+= current_trans_in_conflict_count;
-
-  if (current_trans_in_conflict_count)
-    trans_conflict_commit_count++;
-
-  if (current_max_rep_epoch > max_rep_epoch)
-  {
-    DBUG_PRINT("info", ("Max replicated epoch increases from %llu to %llu",
-                        max_rep_epoch,
-                        current_max_rep_epoch));
-    max_rep_epoch = current_max_rep_epoch;
-  }
-
-  resetPerAttemptCounters();
-
-  /* Clear per-epoch-transaction retry_trans_count */
-  retry_trans_count = 0;
-}
-
-/**
-   atApplyStatusWrite
-
-   Called by Slave SQL thread when applying an event to the
-   ndb_apply_status table
-*/
-void
-st_ndb_slave_state::atApplyStatusWrite(Uint32 master_server_id,
-                                       Uint32 row_server_id,
-                                       Uint64 row_epoch,
-                                       bool is_row_server_id_local)
-{
-  if (row_server_id == master_server_id)
-  {
-    /*
-       WRITE_ROW to ndb_apply_status injected by MySQLD
-       immediately upstream of us.
-       Record epoch
-    */
-    current_master_server_epoch = row_epoch;
-    assert(! is_row_server_id_local);
-  }
-  else if (is_row_server_id_local)
-  {
-    DBUG_PRINT("info", ("Recording application of local server %u epoch %llu "
-                        " which is %s.",
-                        row_server_id, row_epoch,
-                        (row_epoch > g_ndb_slave_state.current_max_rep_epoch)?
-                        " new highest." : " older than previously applied"));
-    if (row_epoch > current_max_rep_epoch)
-    {
-      /*
-        Store new highest epoch in thdvar.  If we commit successfully
-        then this can become the new global max
-      */
-      current_max_rep_epoch = row_epoch;
-    }
-  }
-}
-
-/**
-   atResetSlave()
-
-   Called when RESET SLAVE command issued - in context of command client.
-*/
-void
-st_ndb_slave_state::atResetSlave()
-{
-  /* Reset the Maximum replicated epoch vars
-   * on slave reset
-   * No need to touch the sql_run_id as that
-   * will increment if the slave is started
-   * again.
-   */
-  resetPerAttemptCounters();
-
-  retry_trans_count = 0;
-  max_rep_epoch = 0;
-}
-
-/**
-   atStartSlave()
-
-   Called by Slave SQL thread when first applying a row to Ndb after
-   a START SLAVE command.
-*/
-void
-st_ndb_slave_state::atStartSlave()
-{
-#ifdef HAVE_NDB_BINLOG
-  if (trans_conflict_apply_state != SAS_NORMAL)
-  {
-    /*
-      Remove conflict handling state on a SQL thread
-      restart
-    */
-    atEndTransConflictHandling();
-    trans_conflict_apply_state = SAS_NORMAL;
-  }
-#endif
-};
 
 #ifdef HAVE_NDB_BINLOG
 
 /**
-   atBeginTransConflictHandling()
-
-   Called by Slave SQL thread when it determines that Transactional
-   Conflict handling is required
-*/
-void
-st_ndb_slave_state::atBeginTransConflictHandling()
-{
-  DBUG_ENTER("atBeginTransConflictHandling");
-  /*
-     Allocate and initialise Transactional Conflict
-     Resolution Handling Structures
-  */
-  assert(trans_dependency_tracker == NULL);
-  trans_dependency_tracker = DependencyTracker::newDependencyTracker(&conflict_mem_root);
-  DBUG_VOID_RETURN;
-};
-
-/**
-   atPrepareConflictDetection
-
-   Called by Slave SQL thread prior to defining an operation on
-   a table with conflict detection defined.
-*/
-int
-st_ndb_slave_state::atPrepareConflictDetection(const NdbDictionary::Table* table,
-                                               const NdbRecord* key_rec,
-                                               const uchar* row_data,
-                                               Uint64 transaction_id,
-                                               bool& handle_conflict_now)
-{
-  DBUG_ENTER("atPrepareConflictDetection");
-  /*
-    Slave is preparing to apply an operation with conflict detection.
-    If we're performing Transactional Conflict Resolution, take
-    extra steps
-  */
-  switch( trans_conflict_apply_state )
-  {
-  case SAS_NORMAL:
-    DBUG_PRINT("info", ("SAS_NORMAL : No special handling"));
-    /* No special handling */
-    break;
-  case SAS_TRACK_TRANS_DEPENDENCIES:
-  {
-    DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Tracking operation"));
-    /*
-      Track this operation and its transaction id, to determine
-      inter-transaction dependencies by {table, primary key}
-    */
-    assert( trans_dependency_tracker );
-
-    int res = trans_dependency_tracker
-      ->track_operation(table,
-                        key_rec,
-                        row_data,
-                        transaction_id);
-    if (res != 0)
-    {
-      sql_print_error("%s", trans_dependency_tracker->get_error_text());
-      DBUG_RETURN(res);
-    }
-    /* Proceed as normal */
-    break;
-  }
-  case SAS_APPLY_TRANS_DEPENDENCIES:
-  {
-    DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Deciding whether to apply"));
-    /*
-       Check if this operation's transaction id is marked in-conflict.
-       If it is, we tell the caller to perform conflict resolution now instead
-       of attempting to apply the operation.
-    */
-    assert( trans_dependency_tracker );
-
-    if (trans_dependency_tracker->in_conflict(transaction_id))
-    {
-      DBUG_PRINT("info", ("Event for transaction %llu is conflicting.  Handling.",
-                          transaction_id));
-      current_trans_row_reject_count++;
-      handle_conflict_now = true;
-      DBUG_RETURN(0);
-    }
-
-    /*
-       This transaction is not marked in-conflict, so continue with normal
-       processing.
-       Note that normal processing may subsequently detect a conflict which
-       didn't exist at the time of the previous TRACK_DEPENDENCIES pass.
-       In this case, we will rollback and repeat the TRACK_DEPENDENCIES
-       stage.
-    */
-    DBUG_PRINT("info", ("Event for transaction %llu is OK, applying",
-                        transaction_id));
-    break;
-  }
-  }
-  DBUG_RETURN(0);
-}
-
-/**
-   atTransConflictDetected
-
-   Called by the Slave SQL thread when a conflict is detected on
-   an executed operation.
-*/
-int
-st_ndb_slave_state::atTransConflictDetected(Uint64 transaction_id)
-{
-  DBUG_ENTER("atTransConflictDetected");
-
-  /*
-     The Slave has detected a conflict on an operation applied
-     to a table with Transactional Conflict Resolution defined.
-     Handle according to current state.
-  */
-  conflict_flags |= SCS_TRANS_CONFLICT_DETECTED_THIS_PASS;
-  current_trans_row_conflict_count++;
-
-  switch (trans_conflict_apply_state)
-  {
-  case SAS_NORMAL:
-  {
-    DBUG_PRINT("info", ("SAS_NORMAL : Conflict on op on table with trans detection."
-                        "Requires multi-pass resolution.  Will transition to "
-                        "SAS_TRACK_TRANS_DEPENDENCIES at Commit."));
-    /*
-      Conflict on table with transactional conflict resolution
-      defined.
-      This is the trigger that we will do transactional conflict
-      resolution.
-      Record that we need to do multiple passes to correctly
-      perform resolution.
-      TODO : Early exit from applying epoch?
-    */
-    break;
-  }
-  case SAS_TRACK_TRANS_DEPENDENCIES:
-  {
-    DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Operation in transaction %llu "
-                        "had conflict",
-                        transaction_id));
-    /*
-       Conflict on table with transactional conflict resolution
-       defined.
-       We will mark the operation's transaction_id as in-conflict,
-       so that any other operations on the transaction are also
-       considered in-conflict, and any dependent transactions are also
-       considered in-conflict.
-    */
-    assert(trans_dependency_tracker != NULL);
-    int res = trans_dependency_tracker
-      ->mark_conflict(transaction_id);
-
-    if (res != 0)
-    {
-      sql_print_error("%s", trans_dependency_tracker->get_error_text());
-      DBUG_RETURN(res);
-    }
-    break;
-  }
-  case SAS_APPLY_TRANS_DEPENDENCIES:
-  {
-    /*
-       This must be a new conflict, not noticed on the previous
-       pass.
-    */
-    DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Conflict detected.  "
-                        "Must be further conflict.  Will return to "
-                        "SAS_TRACK_TRANS_DEPENDENCIES state at commit."));
-    // TODO : Early exit from applying epoch
-    break;
-  }
-  default:
-    break;
-  }
-
-  DBUG_RETURN(0);
-}
-
-/**
-   atConflictPreCommit
-
-   Called by the Slave SQL thread prior to committing a Slave transaction.
-   This method can request that the Slave transaction is retried.
-
-
-   State transitions :
-
-                       START SLAVE /
-                       RESET SLAVE /
-                        STARTUP
-                            |
-                            |
-                            v
-                    ****************
-                    *  SAS_NORMAL  *
-                    ****************
-                       ^       |
-    No transactional   |       | Conflict on transactional table
-       conflicts       |       | (Rollback)
-       (Commit)        |       |
-                       |       v
-            **********************************
-            *  SAS_TRACK_TRANS_DEPENDENCIES  *
-            **********************************
-               ^          I              ^
-     More      I          I Dependencies |
-    conflicts  I          I determined   | No new conflicts
-     found     I          I (Rollback)   | (Commit)
-    (Rollback) I          I              |
-               I          v              |
-           **********************************
-           *  SAS_APPLY_TRANS_DEPENDENCIES  *
-           **********************************
-
-
-   Operation
-     The initial state is SAS_NORMAL.
-
-     On detecting a conflict on a transactional conflict detetecing table,
-     SAS_TRACK_TRANS_DEPENDENCIES is entered, and the epoch transaction is
-     rolled back and reapplied.
-
-     In SAS_TRACK_TRANS_DEPENDENCIES state, transaction dependencies and
-     conflicts are tracked as the epoch transaction is applied.
-
-     Then the Slave transitions to SAS_APPLY_TRANS_DEPENDENCIES state, and
-     the epoch transaction is rolled back and reapplied.
-
-     In the SAS_APPLY_TRANS_DEPENDENCIES state, operations for transactions
-     marked as in-conflict are not applied.
-
-     If this results in no new conflicts, the epoch transaction is committed,
-     and the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered for processing
-     the next replicated epch transaction.
-     If it results in new conflicts, the epoch transactions is rolled back, and
-     the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered again, to determine
-     the new set of dependencies.
-
-     If no conflicts are found in the SAS_TRACK_TRANS_DEPENDENCIES state, then
-     the epoch transaction is committed, and the Slave transitions to SAS_NORMAL
-     state.
-
-
-   Properties
-     1) Normally, there is no transaction dependency tracking overhead paid by
-        the slave.
-
-     2) On first detecting a transactional conflict, the epoch transaction must be
-        applied at least three times, with two rollbacks.
-
-     3) Transactional conflicts detected in subsequent epochs require the epoch
-        transaction to be applied two times, with one rollback.
-
-     4) A loop between states SAS_TRACK_TRANS_DEPENDENCIES and SAS_APPLY_TRANS_
-        DEPENDENCIES occurs when further transactional conflicts are discovered
-        in SAS_APPLY_TRANS_DEPENDENCIES state.  This implies that the  conflicts
-        discovered in the SAS_TRACK_TRANS_DEPENDENCIES state must not be complete,
-        so we revisit that state to get a more complete picture.
-
-     5) The number of iterations of this loop is fixed to a hard coded limit, after
-        which the Slave will stop with an error.  This should be an unlikely
-        occurrence, as it requires not just n conflicts, but at least 1 new conflict
-        appearing between the transactions in the epoch transaction and the
-        database between the two states, n times in a row.
-
-     6) Where conflicts are occasional, as expected, the post-commit transition to
-        SAS_TRACK_TRANS_DEPENDENCIES rather than SAS_NORMAL results in one epoch
-        transaction having its transaction dependencies needlessly tracked.
-
-*/
-int
-st_ndb_slave_state::atConflictPreCommit(bool& retry_slave_trans)
-{
-  DBUG_ENTER("atConflictPreCommit");
-
-  /*
-    Prior to committing a Slave transaction, we check whether
-    Transactional conflicts have been detected which require
-    us to retry the slave transaction
-  */
-  retry_slave_trans = false;
-  switch(trans_conflict_apply_state)
-  {
-  case SAS_NORMAL:
-  {
-    DBUG_PRINT("info", ("SAS_NORMAL"));
-    /*
-       Normal case.  Only if we defined conflict detection on a table
-       with transactional conflict detection, and saw conflicts (on any table)
-       do we go to another state
-     */
-    if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
-    {
-      DBUG_PRINT("info", ("Conflict(s) detected this pass, transitioning to "
-                          "SAS_TRACK_TRANS_DEPENDENCIES."));
-      assert(conflict_flags & SCS_OPS_DEFINED);
-      /* Transactional conflict resolution required, switch state */
-      atBeginTransConflictHandling();
-      resetPerAttemptCounters();
-      trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
-      retry_slave_trans = true;
-    }
-    break;
-  }
-  case SAS_TRACK_TRANS_DEPENDENCIES:
-  {
-    DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES"));
-
-    if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
-    {
-      /*
-         Conflict on table with transactional detection
-         this pass, we have collected the details and
-         dependencies, now transition to
-         SAS_APPLY_TRANS_DEPENDENCIES and
-         reapply the epoch transaction without the
-         conflicting transactions.
-      */
-      assert(conflict_flags & SCS_OPS_DEFINED);
-      DBUG_PRINT("info", ("Transactional conflicts, transitioning to "
-                          "SAS_APPLY_TRANS_DEPENDENCIES"));
-
-      trans_conflict_apply_state = SAS_APPLY_TRANS_DEPENDENCIES;
-      trans_detect_iter_count++;
-      retry_slave_trans = true;
-      break;
-    }
-    else
-    {
-      /*
-         No transactional conflicts detected this pass, lets
-         return to SAS_NORMAL state after commit for more efficient
-         application of epoch transactions
-      */
-      DBUG_PRINT("info", ("No transactional conflicts, transitioning to "
-                          "SAS_NORMAL"));
-      atEndTransConflictHandling();
-      trans_conflict_apply_state = SAS_NORMAL;
-      break;
-    }
-  }
-  case SAS_APPLY_TRANS_DEPENDENCIES:
-  {
-    DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES"));
-    assert(conflict_flags & SCS_OPS_DEFINED);
-    /*
-       We've applied the Slave epoch transaction subject to the
-       conflict detection.  If any further transactional
-       conflicts have been observed, then we must repeat the
-       process.
-    */
-    atEndTransConflictHandling();
-    atBeginTransConflictHandling();
-    trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
-
-    if (unlikely(conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS))
-    {
-      DBUG_PRINT("info", ("Further conflict(s) detected, repeating the "
-                          "TRACK_TRANS_DEPENDENCIES pass"));
-      /*
-         Further conflict observed when applying, need
-         to re-determine dependencies
-      */
-      resetPerAttemptCounters();
-      retry_slave_trans = true;
-      break;
-    }
-
-
-    DBUG_PRINT("info", ("No further conflicts detected, committing and "
-                        "returning to SAS_TRACK_TRANS_DEPENDENCIES state"));
-    /*
-       With dependencies taken into account, no further
-       conflicts detected, can now proceed to commit
-    */
-    break;
-  }
-  }
-
-  /*
-    Clear conflict flags, to ensure that we detect any new conflicts
-  */
-  conflict_flags = 0;
-
-  if (retry_slave_trans)
-  {
-    DBUG_PRINT("info", ("Requesting transaction restart"));
-    DBUG_RETURN(1);
-  }
-
-  DBUG_PRINT("info", ("Allowing commit to proceed"));
-  DBUG_RETURN(0);
-}
-
-/**
-   atEndTransConflictHandling
-
-   Called when transactional conflict handling has completed.
-*/
-void
-st_ndb_slave_state::atEndTransConflictHandling()
-{
-  DBUG_ENTER("atEndTransConflictHandling");
-  /* Release any conflict handling state */
-  if (trans_dependency_tracker)
-  {
-    current_trans_in_conflict_count =
-      trans_dependency_tracker->get_conflict_count();
-    trans_dependency_tracker = NULL;
-    free_root(&conflict_mem_root, MY_MARK_BLOCKS_FREE);
-  }
-  DBUG_VOID_RETURN;
-};
-
-/**
    prepare_conflict_detection
 
    This method is called during operation definition by the slave,
@@ -5199,8 +4587,7 @@ ha_ndbcluster::prepare_conflict_detectio
 */
 
 int
-handle_conflict_op_error(Thd_ndb* thd_ndb,
-                         NdbTransaction* trans,
+handle_conflict_op_error(NdbTransaction* trans,
                          const NdbError& err,
                          const NdbOperation* op)
 {
@@ -5951,92 +5338,40 @@ handle_row_conflict(NDB_CONFLICT_FN_SHAR
   }
 
   if (cfn_share &&
-      cfn_share->m_ex_tab != NULL)
+      cfn_share->m_ex_tab_writer.hasTable())
   {
     NdbError err;
-    assert(err.code == 0);
-    do
+    if (cfn_share->m_ex_tab_writer.writeRow(conflict_trans,
+                                            key_rec,
+                                            ::server_id,
+                                            ndb_mi_get_master_server_id(),
+                                            g_ndb_slave_state.current_master_server_epoch,
+                                            pk_row,
+                                            err) != 0)
     {
-      /* Have exceptions table, add row to it */
-      const NDBTAB *ex_tab= cfn_share->m_ex_tab;
-
-      /* get insert op */
-      NdbOperation *ex_op= conflict_trans->getNdbOperation(ex_tab);
-      if (ex_op == NULL)
+      if (err.code != 0)
       {
-        err= conflict_trans->getNdbError();
-        break;
-      }
-      if (ex_op->insertTuple() == -1)
-      {
-        err= ex_op->getNdbError();
-        break;
-      }
-      {
-        uint32 server_id= (uint32)::server_id;
-        uint32 master_server_id= (uint32) ndb_mi_get_master_server_id();
-        uint64 master_epoch= (uint64) g_ndb_slave_state.current_master_server_epoch;
-        uint32 count= (uint32)++(cfn_share->m_count);
-        if (ex_op->setValue((Uint32)0, (const char *)&(server_id)) ||
-            ex_op->setValue((Uint32)1, (const char *)&(master_server_id)) ||
-            ex_op->setValue((Uint32)2, (const char *)&(master_epoch)) ||
-            ex_op->setValue((Uint32)3, (const char *)&(count)))
+        if (err.status == NdbError::TemporaryError)
         {
-          err= ex_op->getNdbError();
-          break;
+          /* Slave will roll back and retry entire transaction. */
+          ERR_RETURN(err);
         }
-      }
-      /* copy primary keys */
-      {
-        const int fixed_cols= 4;
-        int nkey= cfn_share->m_pk_cols;
-        int k;
-        for (k= 0; k < nkey; k++)
-        {
-          DBUG_ASSERT(pk_row != NULL);
-          const uchar* data=
-            (const uchar*) NdbDictionary::getValuePtr(key_rec,
-                                                      (const char*) pk_row,
-                                                      cfn_share->m_key_attrids[k]);
-          if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1)
-          {
-            err= ex_op->getNdbError();
-            break;
-          }
+        else
+        {
+          char msg[FN_REFLEN];
+          my_snprintf(msg, sizeof(msg), "%s conflict handling "
+                      "on table %s hit Ndb error %d '%s'",
+                      handling_type,
+                      table_name,
+                      err.code,
+                      err.message);
+          push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+                              ER_EXCEPTIONS_WRITE_ERROR,
+                              ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+          /* Slave will stop replication. */
+          DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
         }
       }
-    } while (0);
-
-    if (err.code != 0)
-    {
-      char msg[FN_REFLEN];
-      my_snprintf(msg, sizeof(msg), "%s conflict handling "
-                  "on table %s hit Ndb error %d '%s'",
-                  handling_type,
-                  table_name,
-                  err.code,
-                  err.message);
-
-      if (err.classification == NdbError::SchemaError)
-      {
-        /* Something up with Exceptions table schema, forget it */
-        NdbDictionary::Dictionary* dict= conflict_trans->getNdb()->getDictionary();
-        dict->removeTableGlobal(*(cfn_share->m_ex_tab), false);
-        cfn_share->m_ex_tab= NULL;
-      }
-      else if (err.status == NdbError::TemporaryError)
-      {
-        /* Slave will roll back and retry entire transaction. */
-        ERR_RETURN(err);
-      }
-      else
-      {
-        push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN,
-                            ER_EXCEPTIONS_WRITE_ERROR,
-                            ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
-        /* Slave will stop replication. */
-        DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
-      }
     }
   } /* if (cfn_share->m_ex_tab != NULL) */
 
@@ -8807,6 +8142,7 @@ int ndbcluster_commit(handlerton *hton,
      * an execute(NoCommit) before committing, as conflict op handling
      * is done by execute(NoCommit)
      */
+    /* TODO : Add as function */
     if (g_ndb_slave_state.conflict_flags & SCS_OPS_DEFINED)
     {
       if (thd_ndb->m_unsent_bytes)
@@ -10036,6 +9372,7 @@ int ha_ndbcluster::create(const char *na
   /* Reset database name */
   ndb->setDatabaseName(m_dbname);
 
+  /* TODO : Add as per conflict function 'virtual' */
   /* Use ndb_replication information as required */
   if (conflict_fn != NULL)
   {
@@ -12302,41 +11639,41 @@ ndbcluster_find_files(handlerton *hton,
 
   if (thd == injector_thd)
   {
-    /*
-      Don't delete anything when called from
-      the binlog thread. This is a kludge to avoid
-      that something is deleted when "Ndb schema dist"
-      uses find_files() to check for "local tables in db"
-    */
-  }
-  else
-  {
-    /*
-      Delete old files
-      (.frm files with corresponding .ndb + does not exists in NDB)
-    */
-    List_iterator_fast<char> it3(delete_list);
-    while ((file_name_str= it3++))
-    {
-      DBUG_PRINT("info", ("Deleting local files for table '%s.%s'",
-                          db, file_name_str));
-
-      // Delete the table and its related files from disk
-      Ndb_local_schema::Table local_table(thd, db, file_name_str);
-      local_table.remove_table();
-
-      // Flush the table out of ndbapi's dictionary cache
-      Ndb_table_guard ndbtab_g(ndb->getDictionary(), file_name_str);
-      ndbtab_g.invalidate();
-
-      // Flush the table from table def. cache.
-      TABLE_LIST table_list;
-      memset(&table_list, 0, sizeof(table_list));
-      table_list.db= (char*)db;
-      table_list.alias= table_list.table_name= file_name_str;
-      close_cached_tables(thd, &table_list, false, 0);
-
-      DBUG_ASSERT(!thd->is_error());
+    /*
+      Don't delete anything when called from
+      the binlog thread. This is a kludge to avoid
+      that something is deleted when "Ndb schema dist"
+      uses find_files() to check for "local tables in db"
+    */
+  }
+  else
+  {
+    /*
+      Delete old files
+      (.frm files with corresponding .ndb + does not exists in NDB)
+    */
+    List_iterator_fast<char> it3(delete_list);
+    while ((file_name_str= it3++))
+    {
+      DBUG_PRINT("info", ("Deleting local files for table '%s.%s'",
+                          db, file_name_str));
+
+      // Delete the table and its related files from disk
+      Ndb_local_schema::Table local_table(thd, db, file_name_str);
+      local_table.remove_table();
+
+      // Flush the table out of ndbapi's dictionary cache
+      Ndb_table_guard ndbtab_g(ndb->getDictionary(), file_name_str);
+      ndbtab_g.invalidate();
+
+      // Flush the table from table def. cache.
+      TABLE_LIST table_list;
+      memset(&table_list, 0, sizeof(table_list));
+      table_list.db= (char*)db;
+      table_list.alias= table_list.table_name= file_name_str;
+      close_cached_tables(thd, &table_list, false, 0);
+
+      DBUG_ASSERT(!thd->is_error());
     }
   }
 
@@ -13846,7 +13183,6 @@ NDB_SHARE *ndbcluster_get_share(const ch
   DBUG_RETURN(share);
 }
 
-
 void ndbcluster_real_free_share(NDB_SHARE **share)
 {
   DBUG_ENTER("ndbcluster_real_free_share");

=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2012-01-17 14:08:16 +0000
+++ b/sql/ha_ndbcluster.h	2012-01-31 11:11:20 +0000
@@ -27,6 +27,7 @@
 #include <ndbapi/NdbApi.hpp>
 #include <ndbapi/ndbapi_limits.h>
 #include <kernel/ndb_limits.h>
+#include "ndb_conflict.h"
 
 #define NDB_IGNORE_VALUE(x) (void)x
 
@@ -110,99 +111,6 @@ public:
 #include "ndb_ndbapi_util.h"
 #include "ndb_share.h"
 
-enum enum_slave_trans_conflict_apply_state
-{
-  /* Normal with optional row-level conflict detection */
-  SAS_NORMAL,
-
-  /*
-    SAS_TRACK_TRANS_DEPENDENCIES
-    Track inter-transaction dependencies
-  */
-  SAS_TRACK_TRANS_DEPENDENCIES,
-
-  /*
-    SAS_APPLY_TRANS_DEPENDENCIES
-    Apply only non conflicting transactions
-  */
-  SAS_APPLY_TRANS_DEPENDENCIES
-};
-
-enum enum_slave_conflict_flags
-{
-  /* Conflict detection Ops defined */
-  SCS_OPS_DEFINED = 1,
-  /* Conflict detected on table with transactional resolution */
-  SCS_TRANS_CONFLICT_DETECTED_THIS_PASS = 2
-};
-
-/*
-  State associated with the Slave thread
-  (From the Ndb handler's point of view)
-*/
-struct st_ndb_slave_state
-{
-  /* Counter values for current slave transaction */
-  Uint32 current_violation_count[CFT_NUMBER_OF_CFTS];
-  Uint64 current_master_server_epoch;
-  Uint64 current_max_rep_epoch;
-  uint8 conflict_flags; /* enum_slave_conflict_flags */
-    /* Transactional conflict detection */
-  Uint32 retry_trans_count;
-  Uint32 current_trans_row_conflict_count;
-  Uint32 current_trans_row_reject_count;
-  Uint32 current_trans_in_conflict_count;
-
-  /* Cumulative counter values */
-  Uint64 total_violation_count[CFT_NUMBER_OF_CFTS];
-  Uint64 max_rep_epoch;
-  Uint32 sql_run_id;
-  /* Transactional conflict detection */
-  Uint64 trans_row_conflict_count;
-  Uint64 trans_row_reject_count;
-  Uint64 trans_detect_iter_count;
-  Uint64 trans_in_conflict_count;
-  Uint64 trans_conflict_commit_count;
-
-  static const Uint32 MAX_RETRY_TRANS_COUNT = 100;
-
-  /*
-    Slave Apply State
-
-    State of Binlog application from Ndb point of view.
-  */
-  enum_slave_trans_conflict_apply_state trans_conflict_apply_state;
-
-  MEM_ROOT conflict_mem_root;
-  class DependencyTracker* trans_dependency_tracker;
-
-  /* Methods */
-  void atStartSlave();
-  int  atPrepareConflictDetection(const NdbDictionary::Table* table,
-                                  const NdbRecord* key_rec,
-                                  const uchar* row_data,
-                                  Uint64 transaction_id,
-                                  bool& handle_conflict_now);
-  int  atTransConflictDetected(Uint64 transaction_id);
-  int  atConflictPreCommit(bool& retry_slave_trans);
-
-  void atBeginTransConflictHandling();
-  void atEndTransConflictHandling();
-
-  void atTransactionCommit();
-  void atTransactionAbort();
-  void atResetSlave();
-
-  void atApplyStatusWrite(Uint32 master_server_id,
-                          Uint32 row_server_id,
-                          Uint64 row_epoch,
-                          bool is_row_server_id_local);
-
-  void resetPerAttemptCounters();
-
-  st_ndb_slave_state();
-};
-
 struct Ndb_local_table_statistics {
   int no_uncommitted_rows_count;
   ulong last_count;

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2012-01-25 17:50:29 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2012-01-31 11:11:20 +0000
@@ -4014,6 +4014,8 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
   cfn_share->m_resolve_column= field_index;
   cfn_share->m_flags = flags;
 
+  /* Init Exceptions Table Writer */
+  new (&cfn_share->m_ex_tab_writer) ExceptionsTableWriter();
   {
     /* get exceptions table */
     char ex_tab_name[FN_REFLEN];
@@ -4025,71 +4027,31 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
     const NDBTAB *ex_tab= ndbtab_g.get_table();
     if (ex_tab)
     {
-      const int fixed_cols= 4;
-      bool ok=
-        ex_tab->getNoOfColumns() >= fixed_cols &&
-        ex_tab->getNoOfPrimaryKeys() == 4 &&
-        /* server id */
-        ex_tab->getColumn(0)->getType() == NDBCOL::Unsigned &&
-        ex_tab->getColumn(0)->getPrimaryKey() &&
-        /* master_server_id */
-        ex_tab->getColumn(1)->getType() == NDBCOL::Unsigned &&
-        ex_tab->getColumn(1)->getPrimaryKey() &&
-        /* master_epoch */
-        ex_tab->getColumn(2)->getType() == NDBCOL::Bigunsigned &&
-        ex_tab->getColumn(2)->getPrimaryKey() &&
-        /* count */
-        ex_tab->getColumn(3)->getType() == NDBCOL::Unsigned &&
-        ex_tab->getColumn(3)->getPrimaryKey();
-      if (ok)
+      char msgBuf[ FN_REFLEN ];
+      const char* msg = NULL;
+      if (cfn_share->m_ex_tab_writer.init(ndbtab,
+                                          ex_tab,
+                                          msgBuf,
+                                          sizeof(msgBuf),
+                                          &msg) == 0)
       {
-        int ncol= ndbtab->getNoOfColumns();
-        int nkey= ndbtab->getNoOfPrimaryKeys();
-        int i, k;
-        for (i= k= 0; i < ncol && k < nkey; i++)
+        /* Ok */
+        /* Hold our table reference outside the table_guard scope */
+        ndbtab_g.release();
+        if (opt_ndb_extra_logging)
         {
-          const NdbDictionary::Column* col= ndbtab->getColumn(i);
-          if (col->getPrimaryKey())
-          {
-            const NdbDictionary::Column* ex_col=
-              ex_tab->getColumn(fixed_cols + k);
-            ok=
-              ex_col != NULL &&
-              col->getType() == ex_col->getType() &&
-              col->getLength() == ex_col->getLength() &&
-              col->getNullable() == ex_col->getNullable();
-            if (!ok)
-              break;
-            /*
-               Store mapping of Exception table key# to
-               orig table attrid
-            */
-            cfn_share->m_key_attrids[k]= i;
-            k++;
-          }
+          sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s",
+                                share->db,
+                                share->table_name,
+                                share->db,
+                                ex_tab_name);
         }
-        if (ok)
-        {
-          cfn_share->m_ex_tab= ex_tab;
-          cfn_share->m_pk_cols= nkey;
-          ndbtab_g.release();
-          if (opt_ndb_extra_logging)
-            sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s",
-                                  share->db,
-                                  share->table_name,
-                                  share->db,
-                                  ex_tab_name);
-        }
-        else
-          sql_print_warning("NDB Slave: exceptions table %s has wrong "
-                            "definition (column %d)",
-                            ex_tab_name, fixed_cols + k);
       }
       else
-        sql_print_warning("NDB Slave: exceptions table %s has wrong "
-                          "definition (initial %d columns)",
-                          ex_tab_name, fixed_cols);
-    }
+      {
+        sql_print_warning("%s", msg);
+      }
+    } /* if (ex_tab) */
   }
   DBUG_RETURN(0);
 }

=== added file 'sql/ndb_conflict.cc'
--- a/sql/ndb_conflict.cc	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_conflict.cc	2012-01-31 11:11:20 +0000
@@ -0,0 +1,805 @@
+/*
+   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+#include <my_global.h> /* For config defines */
+
+#ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
+/* distcheck does not compile from here... */
+
+#include "ha_ndbcluster_glue.h"
+#include "ndb_conflict.h"
+
+#ifdef HAVE_NDB_BINLOG
+
+#define NDBTAB NdbDictionary::Table
+#define NDBCOL NdbDictionary::Column
+
+int
+ExceptionsTableWriter::init(const NdbDictionary::Table* mainTable,
+                            const NdbDictionary::Table* exceptionsTable,
+                            char* msg_buf,
+                            uint msg_buf_len,
+                            const char** msg)
+{
+  const char* ex_tab_name = exceptionsTable->getName();
+  const int fixed_cols= 4;
+  bool ok=
+    exceptionsTable->getNoOfColumns() >= fixed_cols &&
+    exceptionsTable->getNoOfPrimaryKeys() == 4 &&
+    /* server id */
+    exceptionsTable->getColumn(0)->getType() == NDBCOL::Unsigned &&
+    exceptionsTable->getColumn(0)->getPrimaryKey() &&
+    /* master_server_id */
+    exceptionsTable->getColumn(1)->getType() == NDBCOL::Unsigned &&
+    exceptionsTable->getColumn(1)->getPrimaryKey() &&
+    /* master_epoch */
+    exceptionsTable->getColumn(2)->getType() == NDBCOL::Bigunsigned &&
+    exceptionsTable->getColumn(2)->getPrimaryKey() &&
+    /* count */
+    exceptionsTable->getColumn(3)->getType() == NDBCOL::Unsigned &&
+    exceptionsTable->getColumn(3)->getPrimaryKey();
+  if (ok)
+  {
+    int ncol= mainTable->getNoOfColumns();
+    int nkey= mainTable->getNoOfPrimaryKeys();
+    int i, k;
+    for (i= k= 0; i < ncol && k < nkey; i++)
+    {
+      const NdbDictionary::Column* col= mainTable->getColumn(i);
+      if (col->getPrimaryKey())
+      {
+        const NdbDictionary::Column* ex_col=
+          exceptionsTable->getColumn(fixed_cols + k);
+        ok=
+          ex_col != NULL &&
+          col->getType() == ex_col->getType() &&
+          col->getLength() == ex_col->getLength() &&
+          col->getNullable() == ex_col->getNullable();
+        if (!ok)
+          break;
+        /*
+          Store mapping of Exception table key# to
+          orig table attrid
+        */
+        m_key_attrids[k]= i;
+        k++;
+      }
+    }
+    if (ok)
+    {
+      m_ex_tab= exceptionsTable;
+      m_pk_cols= nkey;
+      return 0;
+    }
+    else
+      my_snprintf(msg_buf, msg_buf_len,
+                  "NDB Slave: exceptions table %s has wrong "
+                  "definition (column %d)",
+                  ex_tab_name, fixed_cols + k);
+  }
+  else
+    my_snprintf(msg_buf, msg_buf_len,
+                "NDB Slave: exceptions table %s has wrong "
+                "definition (initial %d columns)",
+                ex_tab_name, fixed_cols);
+
+  *msg = msg_buf;
+  return -1;
+}
+
+void
+ExceptionsTableWriter::free(Ndb* ndb)
+{
+  if (m_ex_tab)
+  {
+    NdbDictionary::Dictionary* dict = ndb->getDictionary();
+    dict->removeTableGlobal(*m_ex_tab, 0);
+    m_ex_tab= 0;
+  }
+}
+
+int
+ExceptionsTableWriter::writeRow(NdbTransaction* trans,
+                                const NdbRecord* keyRecord,
+                                uint32 server_id,
+                                uint32 master_server_id,
+                                uint64 master_epoch,
+                                const uchar* rowPtr,
+                                NdbError& err)
+{
+  DBUG_ENTER("ExceptionsTableWriter::writeRow");
+  assert(err.code == 0);
+  do
+  {
+    /* Have exceptions table, add row to it */
+    const NDBTAB *ex_tab= m_ex_tab;
+
+    /* get insert op */
+    NdbOperation *ex_op= trans->getNdbOperation(ex_tab);
+    if (ex_op == NULL)
+    {
+      err= trans->getNdbError();
+      break;
+    }
+    if (ex_op->insertTuple() == -1)
+    {
+      err= ex_op->getNdbError();
+      break;
+    }
+    {
+      uint32 count= (uint32)++m_count;
+      if (ex_op->setValue((Uint32)0, (const char *)&(server_id)) ||
+          ex_op->setValue((Uint32)1, (const char *)&(master_server_id)) ||
+          ex_op->setValue((Uint32)2, (const char *)&(master_epoch)) ||
+          ex_op->setValue((Uint32)3, (const char *)&(count)))
+      {
+        err= ex_op->getNdbError();
+        break;
+      }
+    }
+    /* copy primary keys */
+    {
+      const int fixed_cols= 4;
+      int nkey= m_pk_cols;
+      int k;
+      for (k= 0; k < nkey; k++)
+      {
+        DBUG_ASSERT(rowPtr != NULL);
+        const uchar* data=
+          (const uchar*) NdbDictionary::getValuePtr(keyRecord,
+                                                    (const char*) rowPtr,
+                                                    m_key_attrids[k]);
+        if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1)
+        {
+          err= ex_op->getNdbError();
+          break;
+        }
+      }
+    }
+  } while (0);
+
+  if (err.code != 0)
+  {
+    if (err.classification == NdbError::SchemaError)
+    {
+      /* Something up with Exceptions table schema, forget it.
+       * No further exceptions will be recorded.
+       * TODO : Log this somehow
+       */
+      NdbDictionary::Dictionary* dict= trans->getNdb()->getDictionary();
+      dict->removeTableGlobal(*m_ex_tab, false);
+      m_ex_tab= NULL;
+      DBUG_RETURN(0);
+    }
+    DBUG_RETURN(-1);
+  }
+  DBUG_RETURN(0);
+}
+
+/* HAVE_NDB_BINLOG */
+#endif
+
+/**
+   st_ndb_slave_state constructor
+
+   Initialise Ndb Slave state object
+*/
+st_ndb_slave_state::st_ndb_slave_state()
+  : current_master_server_epoch(0),
+    current_max_rep_epoch(0),
+    conflict_flags(0),
+    retry_trans_count(0),
+    current_trans_row_conflict_count(0),
+    current_trans_row_reject_count(0),
+    current_trans_in_conflict_count(0),
+    max_rep_epoch(0),
+    sql_run_id(~Uint32(0)),
+    trans_row_conflict_count(0),
+    trans_row_reject_count(0),
+    trans_detect_iter_count(0),
+    trans_in_conflict_count(0),
+    trans_conflict_commit_count(0),
+    trans_conflict_apply_state(SAS_NORMAL),
+    trans_dependency_tracker(NULL)
+{
+  memset(current_violation_count, 0, sizeof(current_violation_count));
+  memset(total_violation_count, 0, sizeof(total_violation_count));
+
+  /* Init conflict handling state memroot */
+  const size_t CONFLICT_MEMROOT_BLOCK_SIZE = 32768;
+  init_alloc_root(&conflict_mem_root, CONFLICT_MEMROOT_BLOCK_SIZE, 0);
+};
+
+/**
+   resetPerAttemptCounters
+
+   Reset the per-epoch-transaction-application-attempt counters
+*/
+void
+st_ndb_slave_state::resetPerAttemptCounters()
+{
+  memset(current_violation_count, 0, sizeof(current_violation_count));
+  current_trans_row_conflict_count = 0;
+  current_trans_row_reject_count = 0;
+  current_trans_in_conflict_count = 0;
+
+  conflict_flags = 0;
+  current_max_rep_epoch = 0;
+}
+
+/**
+   atTransactionAbort()
+
+   Called by Slave SQL thread during transaction abort.
+*/
+void
+st_ndb_slave_state::atTransactionAbort()
+{
+  /* Reset current-transaction counters + state */
+  resetPerAttemptCounters();
+}
+
+
+
+/**
+   atTransactionCommit()
+
+   Called by Slave SQL thread after transaction commit
+*/
+void
+st_ndb_slave_state::atTransactionCommit()
+{
+  assert( ((trans_dependency_tracker == NULL) &&
+           (trans_conflict_apply_state == SAS_NORMAL)) ||
+          ((trans_dependency_tracker != NULL) &&
+           (trans_conflict_apply_state == SAS_TRACK_TRANS_DEPENDENCIES)) );
+  assert( trans_conflict_apply_state != SAS_APPLY_TRANS_DEPENDENCIES );
+
+  /* Merge committed transaction counters into total state
+   * Then reset current transaction counters
+   */
+  for (int i=0; i < CFT_NUMBER_OF_CFTS; i++)
+  {
+    total_violation_count[i]+= current_violation_count[i];
+  }
+  trans_row_conflict_count+= current_trans_row_conflict_count;
+  trans_row_reject_count+= current_trans_row_reject_count;
+  trans_in_conflict_count+= current_trans_in_conflict_count;
+
+  if (current_trans_in_conflict_count)
+    trans_conflict_commit_count++;
+
+  if (current_max_rep_epoch > max_rep_epoch)
+  {
+    DBUG_PRINT("info", ("Max replicated epoch increases from %llu to %llu",
+                        max_rep_epoch,
+                        current_max_rep_epoch));
+    max_rep_epoch = current_max_rep_epoch;
+  }
+
+  resetPerAttemptCounters();
+
+  /* Clear per-epoch-transaction retry_trans_count */
+  retry_trans_count = 0;
+}
+
+/**
+   atApplyStatusWrite
+
+   Called by Slave SQL thread when applying an event to the
+   ndb_apply_status table
+*/
+void
+st_ndb_slave_state::atApplyStatusWrite(Uint32 master_server_id,
+                                       Uint32 row_server_id,
+                                       Uint64 row_epoch,
+                                       bool is_row_server_id_local)
+{
+  if (row_server_id == master_server_id)
+  {
+    /*
+       WRITE_ROW to ndb_apply_status injected by MySQLD
+       immediately upstream of us.
+       Record epoch
+    */
+    current_master_server_epoch = row_epoch;
+    assert(! is_row_server_id_local);
+  }
+  else if (is_row_server_id_local)
+  {
+    DBUG_PRINT("info", ("Recording application of local server %u epoch %llu "
+                        " which is %s.",
+                        row_server_id, row_epoch,
+                        (row_epoch > current_max_rep_epoch)?
+                        " new highest." : " older than previously applied"));
+    if (row_epoch > current_max_rep_epoch)
+    {
+      /*
+        Store new highest epoch in thdvar.  If we commit successfully
+        then this can become the new global max
+      */
+      current_max_rep_epoch = row_epoch;
+    }
+  }
+}
+
+/**
+   atResetSlave()
+
+   Called when RESET SLAVE command issued - in context of command client.
+*/
+void
+st_ndb_slave_state::atResetSlave()
+{
+  /* Reset the Maximum replicated epoch vars
+   * on slave reset
+   * No need to touch the sql_run_id as that
+   * will increment if the slave is started
+   * again.
+   */
+  resetPerAttemptCounters();
+
+  retry_trans_count = 0;
+  max_rep_epoch = 0;
+}
+
+
+/**
+   atStartSlave()
+
+   Called by Slave SQL thread when first applying a row to Ndb after
+   a START SLAVE command.
+*/
+void
+st_ndb_slave_state::atStartSlave()
+{
+#ifdef HAVE_NDB_BINLOG
+  if (trans_conflict_apply_state != SAS_NORMAL)
+  {
+    /*
+      Remove conflict handling state on a SQL thread
+      restart
+    */
+    atEndTransConflictHandling();
+    trans_conflict_apply_state = SAS_NORMAL;
+  }
+#endif
+};
+
+#ifdef HAVE_NDB_BINLOG
+
+/**
+   atEndTransConflictHandling
+
+   Called when transactional conflict handling has completed.
+*/
+void
+st_ndb_slave_state::atEndTransConflictHandling()
+{
+  DBUG_ENTER("atEndTransConflictHandling");
+  /* Release any conflict handling state */
+  if (trans_dependency_tracker)
+  {
+    current_trans_in_conflict_count =
+      trans_dependency_tracker->get_conflict_count();
+    trans_dependency_tracker = NULL;
+    free_root(&conflict_mem_root, MY_MARK_BLOCKS_FREE);
+  }
+  DBUG_VOID_RETURN;
+};
+
+/**
+   atBeginTransConflictHandling()
+
+   Called by Slave SQL thread when it determines that Transactional
+   Conflict handling is required
+*/
+void
+st_ndb_slave_state::atBeginTransConflictHandling()
+{
+  DBUG_ENTER("atBeginTransConflictHandling");
+  /*
+     Allocate and initialise Transactional Conflict
+     Resolution Handling Structures
+  */
+  assert(trans_dependency_tracker == NULL);
+  trans_dependency_tracker = DependencyTracker::newDependencyTracker(&conflict_mem_root);
+  DBUG_VOID_RETURN;
+};
+
+/**
+   atPrepareConflictDetection
+
+   Called by Slave SQL thread prior to defining an operation on
+   a table with conflict detection defined.
+*/
+int
+st_ndb_slave_state::atPrepareConflictDetection(const NdbDictionary::Table* table,
+                                               const NdbRecord* key_rec,
+                                               const uchar* row_data,
+                                               Uint64 transaction_id,
+                                               bool& handle_conflict_now)
+{
+  DBUG_ENTER("atPrepareConflictDetection");
+  /*
+    Slave is preparing to apply an operation with conflict detection.
+    If we're performing Transactional Conflict Resolution, take
+    extra steps
+  */
+  switch( trans_conflict_apply_state )
+  {
+  case SAS_NORMAL:
+    DBUG_PRINT("info", ("SAS_NORMAL : No special handling"));
+    /* No special handling */
+    break;
+  case SAS_TRACK_TRANS_DEPENDENCIES:
+  {
+    DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Tracking operation"));
+    /*
+      Track this operation and its transaction id, to determine
+      inter-transaction dependencies by {table, primary key}
+    */
+    assert( trans_dependency_tracker );
+
+    int res = trans_dependency_tracker
+      ->track_operation(table,
+                        key_rec,
+                        row_data,
+                        transaction_id);
+    if (res != 0)
+    {
+      sql_print_error("%s", trans_dependency_tracker->get_error_text());
+      DBUG_RETURN(res);
+    }
+    /* Proceed as normal */
+    break;
+  }
+  case SAS_APPLY_TRANS_DEPENDENCIES:
+  {
+    DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Deciding whether to apply"));
+    /*
+       Check if this operation's transaction id is marked in-conflict.
+       If it is, we tell the caller to perform conflict resolution now instead
+       of attempting to apply the operation.
+    */
+    assert( trans_dependency_tracker );
+
+    if (trans_dependency_tracker->in_conflict(transaction_id))
+    {
+      DBUG_PRINT("info", ("Event for transaction %llu is conflicting.  Handling.",
+                          transaction_id));
+      current_trans_row_reject_count++;
+      handle_conflict_now = true;
+      DBUG_RETURN(0);
+    }
+
+    /*
+       This transaction is not marked in-conflict, so continue with normal
+       processing.
+       Note that normal processing may subsequently detect a conflict which
+       didn't exist at the time of the previous TRACK_DEPENDENCIES pass.
+       In this case, we will rollback and repeat the TRACK_DEPENDENCIES
+       stage.
+    */
+    DBUG_PRINT("info", ("Event for transaction %llu is OK, applying",
+                        transaction_id));
+    break;
+  }
+  }
+  DBUG_RETURN(0);
+}
+
+/**
+   atTransConflictDetected
+
+   Called by the Slave SQL thread when a conflict is detected on
+   an executed operation.
+*/
+int
+st_ndb_slave_state::atTransConflictDetected(Uint64 transaction_id)
+{
+  DBUG_ENTER("atTransConflictDetected");
+
+  /*
+     The Slave has detected a conflict on an operation applied
+     to a table with Transactional Conflict Resolution defined.
+     Handle according to current state.
+  */
+  conflict_flags |= SCS_TRANS_CONFLICT_DETECTED_THIS_PASS;
+  current_trans_row_conflict_count++;
+
+  switch (trans_conflict_apply_state)
+  {
+  case SAS_NORMAL:
+  {
+    DBUG_PRINT("info", ("SAS_NORMAL : Conflict on op on table with trans detection."
+                        "Requires multi-pass resolution.  Will transition to "
+                        "SAS_TRACK_TRANS_DEPENDENCIES at Commit."));
+    /*
+      Conflict on table with transactional conflict resolution
+      defined.
+      This is the trigger that we will do transactional conflict
+      resolution.
+      Record that we need to do multiple passes to correctly
+      perform resolution.
+      TODO : Early exit from applying epoch?
+    */
+    break;
+  }
+  case SAS_TRACK_TRANS_DEPENDENCIES:
+  {
+    DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Operation in transaction %llu "
+                        "had conflict",
+                        transaction_id));
+    /*
+       Conflict on table with transactional conflict resolution
+       defined.
+       We will mark the operation's transaction_id as in-conflict,
+       so that any other operations on the transaction are also
+       considered in-conflict, and any dependent transactions are also
+       considered in-conflict.
+    */
+    assert(trans_dependency_tracker != NULL);
+    int res = trans_dependency_tracker
+      ->mark_conflict(transaction_id);
+
+    if (res != 0)
+    {
+      sql_print_error("%s", trans_dependency_tracker->get_error_text());
+      DBUG_RETURN(res);
+    }
+    break;
+  }
+  case SAS_APPLY_TRANS_DEPENDENCIES:
+  {
+    /*
+       This must be a new conflict, not noticed on the previous
+       pass.
+    */
+    DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Conflict detected.  "
+                        "Must be further conflict.  Will return to "
+                        "SAS_TRACK_TRANS_DEPENDENCIES state at commit."));
+    // TODO : Early exit from applying epoch
+    break;
+  }
+  default:
+    break;
+  }
+
+  DBUG_RETURN(0);
+}
+
+/**
+   atConflictPreCommit
+
+   Called by the Slave SQL thread prior to committing a Slave transaction.
+   This method can request that the Slave transaction is retried.
+
+
+   State transitions :
+
+                       START SLAVE /
+                       RESET SLAVE /
+                        STARTUP
+                            |
+                            |
+                            v
+                    ****************
+                    *  SAS_NORMAL  *
+                    ****************
+                       ^       |
+    No transactional   |       | Conflict on transactional table
+       conflicts       |       | (Rollback)
+       (Commit)        |       |
+                       |       v
+            **********************************
+            *  SAS_TRACK_TRANS_DEPENDENCIES  *
+            **********************************
+               ^          I              ^
+     More      I          I Dependencies |
+    conflicts  I          I determined   | No new conflicts
+     found     I          I (Rollback)   | (Commit)
+    (Rollback) I          I              |
+               I          v              |
+           **********************************
+           *  SAS_APPLY_TRANS_DEPENDENCIES  *
+           **********************************
+
+
+   Operation
+     The initial state is SAS_NORMAL.
+
+     On detecting a conflict on a transactional conflict detetecing table,
+     SAS_TRACK_TRANS_DEPENDENCIES is entered, and the epoch transaction is
+     rolled back and reapplied.
+
+     In SAS_TRACK_TRANS_DEPENDENCIES state, transaction dependencies and
+     conflicts are tracked as the epoch transaction is applied.
+
+     Then the Slave transitions to SAS_APPLY_TRANS_DEPENDENCIES state, and
+     the epoch transaction is rolled back and reapplied.
+
+     In the SAS_APPLY_TRANS_DEPENDENCIES state, operations for transactions
+     marked as in-conflict are not applied.
+
+     If this results in no new conflicts, the epoch transaction is committed,
+     and the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered for processing
+     the next replicated epch transaction.
+     If it results in new conflicts, the epoch transactions is rolled back, and
+     the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered again, to determine
+     the new set of dependencies.
+
+     If no conflicts are found in the SAS_TRACK_TRANS_DEPENDENCIES state, then
+     the epoch transaction is committed, and the Slave transitions to SAS_NORMAL
+     state.
+
+
+   Properties
+     1) Normally, there is no transaction dependency tracking overhead paid by
+        the slave.
+
+     2) On first detecting a transactional conflict, the epoch transaction must be
+        applied at least three times, with two rollbacks.
+
+     3) Transactional conflicts detected in subsequent epochs require the epoch
+        transaction to be applied two times, with one rollback.
+
+     4) A loop between states SAS_TRACK_TRANS_DEPENDENCIES and SAS_APPLY_TRANS_
+        DEPENDENCIES occurs when further transactional conflicts are discovered
+        in SAS_APPLY_TRANS_DEPENDENCIES state.  This implies that the  conflicts
+        discovered in the SAS_TRACK_TRANS_DEPENDENCIES state must not be complete,
+        so we revisit that state to get a more complete picture.
+
+     5) The number of iterations of this loop is fixed to a hard coded limit, after
+        which the Slave will stop with an error.  This should be an unlikely
+        occurrence, as it requires not just n conflicts, but at least 1 new conflict
+        appearing between the transactions in the epoch transaction and the
+        database between the two states, n times in a row.
+
+     6) Where conflicts are occasional, as expected, the post-commit transition to
+        SAS_TRACK_TRANS_DEPENDENCIES rather than SAS_NORMAL results in one epoch
+        transaction having its transaction dependencies needlessly tracked.
+
+*/
+int
+st_ndb_slave_state::atConflictPreCommit(bool& retry_slave_trans)
+{
+  DBUG_ENTER("atConflictPreCommit");
+
+  /*
+    Prior to committing a Slave transaction, we check whether
+    Transactional conflicts have been detected which require
+    us to retry the slave transaction
+  */
+  retry_slave_trans = false;
+  switch(trans_conflict_apply_state)
+  {
+  case SAS_NORMAL:
+  {
+    DBUG_PRINT("info", ("SAS_NORMAL"));
+    /*
+       Normal case.  Only if we defined conflict detection on a table
+       with transactional conflict detection, and saw conflicts (on any table)
+       do we go to another state
+     */
+    if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
+    {
+      DBUG_PRINT("info", ("Conflict(s) detected this pass, transitioning to "
+                          "SAS_TRACK_TRANS_DEPENDENCIES."));
+      assert(conflict_flags & SCS_OPS_DEFINED);
+      /* Transactional conflict resolution required, switch state */
+      atBeginTransConflictHandling();
+      resetPerAttemptCounters();
+      trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
+      retry_slave_trans = true;
+    }
+    break;
+  }
+  case SAS_TRACK_TRANS_DEPENDENCIES:
+  {
+    DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES"));
+
+    if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
+    {
+      /*
+         Conflict on table with transactional detection
+         this pass, we have collected the details and
+         dependencies, now transition to
+         SAS_APPLY_TRANS_DEPENDENCIES and
+         reapply the epoch transaction without the
+         conflicting transactions.
+      */
+      assert(conflict_flags & SCS_OPS_DEFINED);
+      DBUG_PRINT("info", ("Transactional conflicts, transitioning to "
+                          "SAS_APPLY_TRANS_DEPENDENCIES"));
+
+      trans_conflict_apply_state = SAS_APPLY_TRANS_DEPENDENCIES;
+      trans_detect_iter_count++;
+      retry_slave_trans = true;
+      break;
+    }
+    else
+    {
+      /*
+         No transactional conflicts detected this pass, lets
+         return to SAS_NORMAL state after commit for more efficient
+         application of epoch transactions
+      */
+      DBUG_PRINT("info", ("No transactional conflicts, transitioning to "
+                          "SAS_NORMAL"));
+      atEndTransConflictHandling();
+      trans_conflict_apply_state = SAS_NORMAL;
+      break;
+    }
+  }
+  case SAS_APPLY_TRANS_DEPENDENCIES:
+  {
+    DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES"));
+    assert(conflict_flags & SCS_OPS_DEFINED);
+    /*
+       We've applied the Slave epoch transaction subject to the
+       conflict detection.  If any further transactional
+       conflicts have been observed, then we must repeat the
+       process.
+    */
+    atEndTransConflictHandling();
+    atBeginTransConflictHandling();
+    trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
+
+    if (unlikely(conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS))
+    {
+      DBUG_PRINT("info", ("Further conflict(s) detected, repeating the "
+                          "TRACK_TRANS_DEPENDENCIES pass"));
+      /*
+         Further conflict observed when applying, need
+         to re-determine dependencies
+      */
+      resetPerAttemptCounters();
+      retry_slave_trans = true;
+      break;
+    }
+
+
+    DBUG_PRINT("info", ("No further conflicts detected, committing and "
+                        "returning to SAS_TRACK_TRANS_DEPENDENCIES state"));
+    /*
+       With dependencies taken into account, no further
+       conflicts detected, can now proceed to commit
+    */
+    break;
+  }
+  }
+
+  /*
+    Clear conflict flags, to ensure that we detect any new conflicts
+  */
+  conflict_flags = 0;
+
+  if (retry_slave_trans)
+  {
+    DBUG_PRINT("info", ("Requesting transaction restart"));
+    DBUG_RETURN(1);
+  }
+
+  DBUG_PRINT("info", ("Allowing commit to proceed"));
+  DBUG_RETURN(0);
+}
+
+/* HAVE_NDB_BINLOG */
+#endif
+
+/* WITH_NDBCLUSTER_STORAGE_ENGINE */
+#endif

=== added file 'sql/ndb_conflict.h'
--- a/sql/ndb_conflict.h	1970-01-01 00:00:00 +0000
+++ b/sql/ndb_conflict.h	2012-01-31 11:11:20 +0000
@@ -0,0 +1,315 @@
+/*
+   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef NDB_CONFLICT_H
+#define NDB_CONFLICT_H
+
+#include "ndb_conflict_trans.h"
+#include <ndbapi/NdbDictionary.hpp>
+#include <ndbapi/NdbTransaction.hpp>
+
+enum enum_conflict_fn_type
+{
+  CFT_NDB_UNDEF = 0
+  ,CFT_NDB_MAX
+  ,CFT_NDB_OLD
+  ,CFT_NDB_MAX_DEL_WIN
+  ,CFT_NDB_EPOCH
+  ,CFT_NDB_EPOCH_TRANS
+  ,CFT_NUMBER_OF_CFTS /* End marker */
+};
+
+#ifdef HAVE_NDB_BINLOG
+static const Uint32 MAX_CONFLICT_ARGS= 8;
+
+enum enum_conflict_fn_arg_type
+{
+  CFAT_END
+  ,CFAT_COLUMN_NAME
+  ,CFAT_EXTRA_GCI_BITS
+};
+
+struct st_conflict_fn_arg
+{
+  enum_conflict_fn_arg_type type;
+  union
+  {
+    char resolveColNameBuff[ NAME_CHAR_LEN + 1 ]; // CFAT_COLUMN_NAME
+    uint32 extraGciBits; // CFAT_EXTRA_GCI_BITS
+  };
+};
+
+struct st_conflict_fn_arg_def
+{
+  enum enum_conflict_fn_arg_type arg_type;
+  bool optional;
+};
+
+/* What type of operation was issued */
+enum enum_conflicting_op_type
+{                /* NdbApi          */
+  WRITE_ROW,     /* insert (!write) */
+  UPDATE_ROW,    /* update          */
+  DELETE_ROW,    /* delete          */
+  REFRESH_ROW    /* refresh         */
+};
+
+/*
+  Room for 10 instruction words, two labels (@ 2words/label)
+  + 2 extra words for the case of resolve_size == 8
+*/
+#define MAX_CONFLICT_INTERPRETED_PROG_SIZE 16
+
+/*
+  prepare_detect_func
+
+  Type of function used to prepare for conflict detection on
+  an NdbApi operation
+*/
+typedef int (* prepare_detect_func) (struct st_ndbcluster_conflict_fn_share* cfn_share,
+                                     enum_conflicting_op_type op_type,
+                                     const NdbRecord* data_record,
+                                     const uchar* old_data,
+                                     const uchar* new_data,
+                                     const MY_BITMAP* write_set,
+                                     class NdbInterpretedCode* code);
+
+enum enum_conflict_fn_flags
+{
+  CF_TRANSACTIONAL = 1
+};
+
+struct st_conflict_fn_def
+{
+  const char *name;
+  enum_conflict_fn_type type;
+  const st_conflict_fn_arg_def* arg_defs;
+  prepare_detect_func prep_func;
+  uint8 flags; /* enum_conflict_fn_flags */
+};
+
+/* What sort of conflict was found */
+enum enum_conflict_cause
+{
+  ROW_ALREADY_EXISTS,   /* On insert */
+  ROW_DOES_NOT_EXIST,   /* On Update, Delete */
+  ROW_IN_CONFLICT,      /* On Update, Delete */
+  TRANS_IN_CONFLICT     /* Any of above, or implied by transaction */
+};
+
+/* NdbOperation custom data which points out handler and record. */
+struct Ndb_exceptions_data {
+  struct NDB_SHARE* share;
+  const NdbRecord* key_rec;
+  const uchar* row;
+  enum_conflicting_op_type op_type;
+  Uint64 trans_id;
+};
+
+enum enum_conflict_fn_table_flags
+{
+  CFF_NONE         = 0,
+  CFF_REFRESH_ROWS = 1
+};
+
+/*
+   Maximum supported key parts (16)
+   (Ndb supports 32, but MySQL has a lower limit)
+*/
+static const int NDB_MAX_KEY_PARTS = MAX_REF_PARTS;
+
+/**
+   ExceptionsTableWriter
+
+   Helper class for inserting entries into an exceptions
+   table
+*/
+class ExceptionsTableWriter
+{
+public:
+  ExceptionsTableWriter()
+    :m_pk_cols(0), m_ex_tab(NULL), m_count(0)
+  {};
+
+  ~ExceptionsTableWriter()
+  {};
+
+  /**
+     hasTable
+
+     Returns true if there is an Exceptions table
+  */
+  bool hasTable() const
+  {
+    return m_ex_tab != NULL;
+  };
+
+  /**
+    init
+
+    Initialise ExceptionsTableWriter with main and exceptions
+    tables.
+
+    May set a warning message on success or error.
+  */
+  int init(const NdbDictionary::Table* mainTable,
+           const NdbDictionary::Table* exceptionsTable,
+           char* msg_buf,
+           uint msg_buf_len,
+           const char** msg);
+
+  /**
+     free
+
+     Release reference to exceptions table
+  */
+  void free(Ndb* ndb);
+
+  /**
+     writeRow
+
+     Write a row to the Exceptions Table for the given
+     key
+  */
+  int writeRow(NdbTransaction* trans,
+               const NdbRecord* keyRecord,
+               uint32 server_id,
+               uint32 master_server_id,
+               uint64 master_epoch,
+               const uchar* rowPtr,
+               NdbError& err);
+
+private:
+  /* info about original table */
+  uint8 m_pk_cols;
+  uint16 m_key_attrids[ NDB_MAX_KEY_PARTS ];
+
+  const NdbDictionary::Table *m_ex_tab;
+  uint32 m_count;
+};
+
+typedef struct st_ndbcluster_conflict_fn_share {
+  const st_conflict_fn_def* m_conflict_fn;
+
+  /* info about original table */
+  uint16 m_resolve_column;
+  uint8 m_resolve_size;
+  uint8 m_flags;
+
+  ExceptionsTableWriter m_ex_tab_writer;
+} NDB_CONFLICT_FN_SHARE;
+
+
+/* HAVE_NDB_BINLOG */
+#endif
+
+enum enum_slave_trans_conflict_apply_state
+{
+  /* Normal with optional row-level conflict detection */
+  SAS_NORMAL,
+
+  /*
+    SAS_TRACK_TRANS_DEPENDENCIES
+    Track inter-transaction dependencies
+  */
+  SAS_TRACK_TRANS_DEPENDENCIES,
+
+  /*
+    SAS_APPLY_TRANS_DEPENDENCIES
+    Apply only non conflicting transactions
+  */
+  SAS_APPLY_TRANS_DEPENDENCIES
+};
+
+enum enum_slave_conflict_flags
+{
+  /* Conflict detection Ops defined */
+  SCS_OPS_DEFINED = 1,
+  /* Conflict detected on table with transactional resolution */
+  SCS_TRANS_CONFLICT_DETECTED_THIS_PASS = 2
+};
+
+/*
+  State associated with the Slave thread
+  (From the Ndb handler's point of view)
+*/
+struct st_ndb_slave_state
+{
+  /* Counter values for current slave transaction */
+  Uint32 current_violation_count[CFT_NUMBER_OF_CFTS];
+  Uint64 current_master_server_epoch;
+  Uint64 current_max_rep_epoch;
+  uint8 conflict_flags; /* enum_slave_conflict_flags */
+    /* Transactional conflict detection */
+  Uint32 retry_trans_count;
+  Uint32 current_trans_row_conflict_count;
+  Uint32 current_trans_row_reject_count;
+  Uint32 current_trans_in_conflict_count;
+
+  /* Cumulative counter values */
+  Uint64 total_violation_count[CFT_NUMBER_OF_CFTS];
+  Uint64 max_rep_epoch;
+  Uint32 sql_run_id;
+  /* Transactional conflict detection */
+  Uint64 trans_row_conflict_count;
+  Uint64 trans_row_reject_count;
+  Uint64 trans_detect_iter_count;
+  Uint64 trans_in_conflict_count;
+  Uint64 trans_conflict_commit_count;
+
+  static const Uint32 MAX_RETRY_TRANS_COUNT = 100;
+
+  /*
+    Slave Apply State
+
+    State of Binlog application from Ndb point of view.
+  */
+  enum_slave_trans_conflict_apply_state trans_conflict_apply_state;
+
+  MEM_ROOT conflict_mem_root;
+  class DependencyTracker* trans_dependency_tracker;
+
+  /* Methods */
+  void atStartSlave();
+  int  atPrepareConflictDetection(const NdbDictionary::Table* table,
+                                  const NdbRecord* key_rec,
+                                  const uchar* row_data,
+                                  Uint64 transaction_id,
+                                  bool& handle_conflict_now);
+  int  atTransConflictDetected(Uint64 transaction_id);
+  int  atConflictPreCommit(bool& retry_slave_trans);
+
+  void atBeginTransConflictHandling();
+  void atEndTransConflictHandling();
+
+  void atTransactionCommit();
+  void atTransactionAbort();
+  void atResetSlave();
+
+  void atApplyStatusWrite(Uint32 master_server_id,
+                          Uint32 row_server_id,
+                          Uint64 row_epoch,
+                          bool is_row_server_id_local);
+
+  void resetPerAttemptCounters();
+
+  st_ndb_slave_state();
+};
+
+
+/* NDB_CONFLICT_H */
+#endif

=== modified file 'sql/ndb_share.cc'
--- a/sql/ndb_share.cc	2011-11-10 08:21:36 +0000
+++ b/sql/ndb_share.cc	2012-01-31 11:11:20 +0000
@@ -33,11 +33,11 @@ NDB_SHARE::destroy(NDB_SHARE* share)
   pthread_mutex_destroy(&share->mutex);
 
 #ifdef HAVE_NDB_BINLOG
-  if (share->m_cfn_share && share->m_cfn_share->m_ex_tab && g_ndb)
+  if (share->m_cfn_share && 
+      share->m_cfn_share->m_ex_tab_writer.hasTable() && 
+      g_ndb)
   {
-    NdbDictionary::Dictionary *dict= g_ndb->getDictionary();
-    dict->removeTableGlobal(*(share->m_cfn_share->m_ex_tab), 0);
-    share->m_cfn_share->m_ex_tab= 0;
+    share->m_cfn_share->m_ex_tab_writer.free(g_ndb);
   }
 #endif
   share->new_op= 0;

=== modified file 'sql/ndb_share.h'
--- a/sql/ndb_share.h	2012-01-25 17:50:29 +0000
+++ b/sql/ndb_share.h	2012-01-31 11:11:20 +0000
@@ -26,6 +26,7 @@
 #include <sql_const.h>       // MAX_REF_PARTS
 
 #include <ndbapi/Ndb.hpp>    // Ndb::TupleIdRange
+#include "ndb_conflict.h"
 
 enum NDB_SHARE_STATE {
   NSS_INITIAL= 0,
@@ -33,18 +34,6 @@ enum NDB_SHARE_STATE {
   NSS_ALTERED 
 };
 
-
-enum enum_conflict_fn_type
-{
-  CFT_NDB_UNDEF = 0
-  ,CFT_NDB_MAX
-  ,CFT_NDB_OLD
-  ,CFT_NDB_MAX_DEL_WIN
-  ,CFT_NDB_EPOCH
-  ,CFT_NDB_EPOCH_TRANS
-  ,CFT_NUMBER_OF_CFTS /* End marker */
-};
-
 #ifdef HAVE_NDB_BINLOG
 enum Ndb_binlog_type
 {
@@ -56,112 +45,6 @@ enum Ndb_binlog_type
   ,NBT_UPDATED_ONLY_USE_UPDATE  = NBT_UPDATED_ONLY | NBT_USE_UPDATE
   ,NBT_FULL_USE_UPDATE          = NBT_FULL         | NBT_USE_UPDATE
 };
-
-static const Uint32 MAX_CONFLICT_ARGS= 8;
-
-enum enum_conflict_fn_arg_type
-{
-  CFAT_END
-  ,CFAT_COLUMN_NAME
-  ,CFAT_EXTRA_GCI_BITS
-};
-
-struct st_conflict_fn_arg
-{
-  enum_conflict_fn_arg_type type;
-  union
-  {
-    char resolveColNameBuff[ NAME_CHAR_LEN + 1 ]; // CFAT_COLUMN_NAME
-    uint32 extraGciBits; // CFAT_EXTRA_GCI_BITS
-  };
-};
-
-struct st_conflict_fn_arg_def
-{
-  enum enum_conflict_fn_arg_type arg_type;
-  bool optional;
-};
-
-/* What type of operation was issued */
-enum enum_conflicting_op_type
-{                /* NdbApi          */
-  WRITE_ROW,     /* insert (!write) */
-  UPDATE_ROW,    /* update          */
-  DELETE_ROW,    /* delete          */
-  REFRESH_ROW    /* refresh         */
-};
-
-/*
-  prepare_detect_func
-
-  Type of function used to prepare for conflict detection on
-  an NdbApi operation
-*/
-typedef int (* prepare_detect_func) (struct st_ndbcluster_conflict_fn_share* cfn_share,
-                                     enum_conflicting_op_type op_type,
-                                     const NdbRecord* data_record,
-                                     const uchar* old_data,
-                                     const uchar* new_data,
-                                     const MY_BITMAP* write_set,
-                                     class NdbInterpretedCode* code);
-
-enum enum_conflict_fn_flags
-{
-  CF_TRANSACTIONAL = 1
-};
-
-struct st_conflict_fn_def
-{
-  const char *name;
-  enum_conflict_fn_type type;
-  const st_conflict_fn_arg_def* arg_defs;
-  prepare_detect_func prep_func;
-  uint8 flags; /* enum_conflict_fn_flags */
-};
-
-/* What sort of conflict was found */
-enum enum_conflict_cause
-{
-  ROW_ALREADY_EXISTS,   /* On insert */
-  ROW_DOES_NOT_EXIST,   /* On Update, Delete */
-  ROW_IN_CONFLICT,      /* On Update, Delete */
-  TRANS_IN_CONFLICT     /* Any of above, or implied by transaction */
-};
-
-/* NdbOperation custom data which points out handler and record. */
-struct Ndb_exceptions_data {
-  struct NDB_SHARE* share;
-  const NdbRecord* key_rec;
-  const uchar* row;
-  enum_conflicting_op_type op_type;
-  Uint64 trans_id;
-};
-
-enum enum_conflict_fn_table_flags
-{
-  CFF_NONE         = 0,
-  CFF_REFRESH_ROWS = 1
-};
-
-/*
-   Maximum supported key parts (16)
-   (Ndb supports 32, but MySQL has a lower limit)
-*/
-static const int NDB_MAX_KEY_PARTS = MAX_REF_PARTS;
-
-typedef struct st_ndbcluster_conflict_fn_share {
-  const st_conflict_fn_def* m_conflict_fn;
-
-  /* info about original table */
-  uint8 m_pk_cols;
-  uint16 m_resolve_column;
-  uint8 m_resolve_size;
-  uint8 m_flags;
-  uint16 m_key_attrids[ NDB_MAX_KEY_PARTS ];
-
-  const NdbDictionary::Table *m_ex_tab;
-  uint32 m_count;
-} NDB_CONFLICT_FN_SHARE;
 #endif
 
 

=== modified file 'sql/ndb_thd.cc'
--- a/sql/ndb_thd.cc	2012-01-17 13:59:49 +0000
+++ b/sql/ndb_thd.cc	2012-01-31 11:11:20 +0000
@@ -19,6 +19,7 @@
 #define MYSQL_SERVER
 #endif
 
+#include "ha_ndbcluster_glue.h"
 #include "ndb_thd.h"
 #include "ndb_thd_ndb.h"
 

=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt	2012-01-25 17:50:29 +0000
+++ b/storage/ndb/CMakeLists.txt	2012-01-31 11:11:20 +0000
@@ -89,6 +89,7 @@ SET(NDBCLUSTER_SOURCES
   ../../sql/ndb_component.cc
   ../../sql/ndb_local_schema.cc
   ../../sql/ndb_repl_tab.cc
+  ../../sql/ndb_conflict.cc
 )
 
 # Include directories used when building ha_ndbcluster

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (frazer.clement:3799 to 3800) Frazer Clement31 Jan