4450 Frazer Clement 2012-01-31 [merge]
Merge 7.0->7.1
added:
sql/ndb_conflict.cc
sql/ndb_conflict.h
modified:
libmysqld/Makefile.am
mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result
mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test
sql/Makefile.am
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.h
sql/ha_ndbcluster_binlog.cc
storage/ndb/CMakeLists.txt
4449 jonas oreland 2012-01-31 [merge]
ndb - merge 70 to 71
modified:
storage/ndb/src/kernel/vm/mt.cpp
=== modified file 'libmysqld/Makefile.am'
--- a/libmysqld/Makefile.am 2011-11-10 14:23:53 +0000
+++ b/libmysqld/Makefile.am 2012-01-31 10:24:30 +0000
@@ -50,6 +50,7 @@ sqlsources = derror.cc field.cc field_co
ha_ndbcluster_connection.cc ha_ndbinfo.cc \
ha_ndb_index_stat.cc \
ha_ndbcluster_binlog.cc ndb_conflict_trans.cc ndb_component.cc \
+ ndb_conflict.cc \
ha_partition.cc \
handler.cc sql_handler.cc \
hostname.cc init.cc password.c \
@@ -131,6 +132,9 @@ ha_ndbinfo.o: ha_ndbinfo.cc
ndb_conflict_trans.o: ndb_conflict_trans.cc
$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
+ndb_conflict.o: ndb_conflict.cc
+ $(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
+
# Until we can remove dependency on ha_ndbcluster.h
handler.o: handler.cc
$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result 2012-01-23 15:47:46 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result 2012-01-31 10:01:22 +0000
@@ -274,6 +274,142 @@ relevant
[note] ndb slave: table test.t1allsame using conflict_fn ndb$max on attribute x.
[note] ndb slave: table test.t2_max using conflict_fn ndb$max on attribute x.
drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX;
+delete from mysql.ndb_replication;
+Test exceptions table schema flexibility
+insert into mysql.ndb_replication values ("test", "t1", 0, 7, "NDB$MAX(X)");
+Test 'normal' mandatory column names + all table pks
+create table test.t1$EX(
+server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+a int not null,
+b int not null,
+c int not null,
+primary key (server_id, master_server_id, master_epoch, count)) engine=ndb;
+create table test.t1 (a int, b int, c int, d int, e int, X int unsigned,
+primary key(a,b,c)) engine=ndb;
+Generate a conflict on the slave
+insert into test.t1 values (1,1,1,1,1,1);
+update test.t1 set X=0 where a=1 and b=1 and c=1;
+Check that conflict has been recorded.
+select * from test.t1$EX;
+server_id master_server_id master_epoch count a b c
+2 1 <epoch_num> 1 1 1 1
+drop table test.t1;
+drop table test.t1$EX;
+Test 'normal' mandatory column names + all table pks +
+extra columns with same and different names to main table columns
+Also a defaulted extra column.
+create table test.t1$EX(
+server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+a int not null,
+b int not null,
+c int not null,
+d int, # Same name as main table, but user defined
+lilljeholmen varchar(50) default 'Slussen',
+# Separate, user defined
+primary key (server_id, master_server_id, master_epoch, count)) engine=ndb;
+create table test.t1 (a int, b int, c int, d int, e int, X int unsigned,
+primary key(a,b,c)) engine=ndb;
+Generate a conflict on the slave
+insert into test.t1 values (1,1,1,1,1,1);
+update test.t1 set X=0 where a=1 and b=1 and c=1;
+Check that conflict has been recorded.
+select * from test.t1$EX;
+server_id master_server_id master_epoch count a b c d lilljeholmen
+2 1 <epoch_num> 1 1 1 1 NULL Slussen
+drop table test.t1;
+drop table test.t1$EX;
+Test unusual mandatory column names + all table pks +
+extra columns with same and different names to main table columns
+Also a defaulted extra column.
+create table test.t1$EX(
+monteverdi int unsigned,
+asparagi int unsigned,
+plenipotentiary bigint unsigned,
+mountebank int unsigned,
+a int not null,
+b int not null,
+c int not null,
+d int, # Same name as main table, but user defined
+lilljeholmen varchar(50) default 'Slussen',
+# Separate, user defined
+primary key (monteverdi, asparagi, plenipotentiary, mountebank)) engine=ndb;
+create table test.t1 (a int, b int, c int, d int, e int, X int unsigned,
+primary key(a,b,c)) engine=ndb;
+Generate a conflict on the slave
+insert into test.t1 values (1,1,1,1,1,1);
+update test.t1 set X=0 where a=1 and b=1 and c=1;
+Check that conflict has been recorded.
+select * from test.t1$EX;
+monteverdi asparagi plenipotentiary mountebank a b c d lilljeholmen
+2 1 <epoch_num> 1 1 1 1 NULL Slussen
+drop table test.t1;
+drop table test.t1$EX;
+Test unusual mandatory column names + all table pks which are same
+as 'normal' exceptions table column names plus extra columns with
+same and different names to main table columns
+Also a defaulted extra column.
+create table test.t1$EX(
+monteverdi int unsigned,
+asparagi int unsigned,
+plenipotentiary bigint unsigned,
+mountebank int unsigned,
+server_id int unsigned not null,
+master_server_id int unsigned not null,
+master_epoch bigint unsigned not null,
+count int unsigned not null,
+d int, # Same name as main table, but user defined
+lilljeholmen varchar(50) default 'Slussen',
+# Separate, user defined
+primary key (monteverdi, asparagi, plenipotentiary, mountebank)) engine=ndb;
+create table test.t1 (server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+d int, e int, X int unsigned,
+primary key(server_id, master_server_id,
+master_epoch, count)) engine=ndb;
+Generate a conflict on the slave
+insert into test.t1 values (1,1,1,1,1,1,1);
+update test.t1 set X=0 where server_id=1 and master_server_id=1 and master_epoch=1 and count=1;
+Check that conflict has been recorded.
+select * from test.t1$EX;
+monteverdi asparagi plenipotentiary mountebank server_id master_server_id master_epoch count d lilljeholmen
+2 1 <epoch_num> 1 1 1 1 1 NULL Slussen
+drop table test.t1;
+drop table test.t1$EX;
+call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*");
+call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*");
+call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*");
+And some bad exceptions table schemata
+Keys in wrong positions
+create table test.t1$EX(
+a int not null,
+b int not null,
+c int not null,
+d int, # Same name as main table, but user defined
+lilljeholmen varchar(50) default 'Slussen',
+# Separate, user defined
+server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+primary key (server_id, master_server_id, master_epoch, count)) engine=ndb;
+create table test.t1 (a int, b int, c int, d int, e int, X int unsigned,
+primary key(a,b,c)) engine=ndb;
+show warnings;
+Level Code Message
+MySQLD error output for server 1.1 matching pattern %NDB Slave%
+relevant
+[note] ndb slave: table test.t1 using conflict_fn ndb$max on attribute x.
+[warning] ndb slave: exceptions table t1$ex has wrong definition (initial 4 columns)
+drop table test.t1;
+drop table test.t1$EX;
"Cleanup"
drop table mysql.ndb_replication;
include/rpl_end.inc
=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test 2012-01-23 15:47:46 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test 2012-01-31 10:01:22 +0000
@@ -373,6 +373,208 @@ show variables like 'server_id';
--connection master
drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX;
+delete from mysql.ndb_replication;
+
+--echo Test exceptions table schema flexibility
+#
+# An exceptions table should be able to have the mandatory columns with
+# different names, as long as the types match.
+# Also, not all main table primary key parts need be present
+# Finally, arbitrary extra columns should be allowed, as long as
+# they can be defaulted.
+#
+
+insert into mysql.ndb_replication values ("test", "t1", 0, 7, "NDB$MAX(X)");
+
+--echo Test 'normal' mandatory column names + all table pks
+
+create table test.t1$EX(
+ server_id int unsigned,
+ master_server_id int unsigned,
+ master_epoch bigint unsigned,
+ count int unsigned,
+ a int not null,
+ b int not null,
+ c int not null,
+ primary key (server_id, master_server_id, master_epoch, count)) engine=ndb;
+
+create table test.t1 (a int, b int, c int, d int, e int, X int unsigned,
+ primary key(a,b,c)) engine=ndb;
+
+--echo Generate a conflict on the slave
+
+insert into test.t1 values (1,1,1,1,1,1);
+--sync_slave_with_master slave
+--connection master
+update test.t1 set X=0 where a=1 and b=1 and c=1;
+--sync_slave_with_master slave
+--connection slave
+
+--echo Check that conflict has been recorded.
+--replace_column 3 <epoch_num>
+select * from test.t1$EX;
+
+--connection master
+drop table test.t1;
+drop table test.t1$EX;
+
+--echo Test 'normal' mandatory column names + all table pks +
+--echo extra columns with same and different names to main table columns
+--echo Also a defaulted extra column.
+
+create table test.t1$EX(
+ server_id int unsigned,
+ master_server_id int unsigned,
+ master_epoch bigint unsigned,
+ count int unsigned,
+ a int not null,
+ b int not null,
+ c int not null,
+ d int, # Same name as main table, but user defined
+ lilljeholmen varchar(50) default 'Slussen',
+ # Separate, user defined
+ primary key (server_id, master_server_id, master_epoch, count)) engine=ndb;
+
+create table test.t1 (a int, b int, c int, d int, e int, X int unsigned,
+ primary key(a,b,c)) engine=ndb;
+
+--echo Generate a conflict on the slave
+
+insert into test.t1 values (1,1,1,1,1,1);
+--sync_slave_with_master slave
+--connection master
+update test.t1 set X=0 where a=1 and b=1 and c=1;
+--sync_slave_with_master slave
+--connection slave
+
+--echo Check that conflict has been recorded.
+--replace_column 3 <epoch_num>
+select * from test.t1$EX;
+
+--connection master
+drop table test.t1;
+drop table test.t1$EX;
+
+--echo Test unusual mandatory column names + all table pks +
+--echo extra columns with same and different names to main table columns
+--echo Also a defaulted extra column.
+
+create table test.t1$EX(
+ monteverdi int unsigned,
+ asparagi int unsigned,
+ plenipotentiary bigint unsigned,
+ mountebank int unsigned,
+ a int not null,
+ b int not null,
+ c int not null,
+ d int, # Same name as main table, but user defined
+ lilljeholmen varchar(50) default 'Slussen',
+ # Separate, user defined
+ primary key (monteverdi, asparagi, plenipotentiary, mountebank)) engine=ndb;
+
+create table test.t1 (a int, b int, c int, d int, e int, X int unsigned,
+ primary key(a,b,c)) engine=ndb;
+
+--echo Generate a conflict on the slave
+
+insert into test.t1 values (1,1,1,1,1,1);
+--sync_slave_with_master slave
+--connection master
+update test.t1 set X=0 where a=1 and b=1 and c=1;
+--sync_slave_with_master slave
+--connection slave
+
+--echo Check that conflict has been recorded.
+--replace_column 3 <epoch_num>
+select * from test.t1$EX;
+
+--connection master
+drop table test.t1;
+drop table test.t1$EX;
+
+--echo Test unusual mandatory column names + all table pks which are same
+--echo as 'normal' exceptions table column names plus extra columns with
+--echo same and different names to main table columns
+--echo Also a defaulted extra column.
+
+create table test.t1$EX(
+ monteverdi int unsigned,
+ asparagi int unsigned,
+ plenipotentiary bigint unsigned,
+ mountebank int unsigned,
+ server_id int unsigned not null,
+ master_server_id int unsigned not null,
+ master_epoch bigint unsigned not null,
+ count int unsigned not null,
+ d int, # Same name as main table, but user defined
+ lilljeholmen varchar(50) default 'Slussen',
+ # Separate, user defined
+ primary key (monteverdi, asparagi, plenipotentiary, mountebank)) engine=ndb;
+
+create table test.t1 (server_id int unsigned,
+ master_server_id int unsigned,
+ master_epoch bigint unsigned,
+ count int unsigned,
+ d int, e int, X int unsigned,
+ primary key(server_id, master_server_id,
+ master_epoch, count)) engine=ndb;
+
+--echo Generate a conflict on the slave
+
+insert into test.t1 values (1,1,1,1,1,1,1);
+--sync_slave_with_master slave
+--connection master
+update test.t1 set X=0 where server_id=1 and master_server_id=1 and master_epoch=1 and count=1;
+--sync_slave_with_master slave
+--connection slave
+
+--echo Check that conflict has been recorded.
+--replace_column 3 <epoch_num>
+select * from test.t1$EX;
+
+--connection master
+drop table test.t1;
+drop table test.t1$EX;
+
+--connection server1
+call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*");
+--connection server2
+call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*");
+--connection slave
+call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*");
+--connection master
+
+--echo And some bad exceptions table schemata
+--echo Keys in wrong positions
+create table test.t1$EX(
+ a int not null,
+ b int not null,
+ c int not null,
+ d int, # Same name as main table, but user defined
+ lilljeholmen varchar(50) default 'Slussen',
+ # Separate, user defined
+ server_id int unsigned,
+ master_server_id int unsigned,
+ master_epoch bigint unsigned,
+ count int unsigned,
+ primary key (server_id, master_server_id, master_epoch, count)) engine=ndb;
+
+create table test.t1 (a int, b int, c int, d int, e int, X int unsigned,
+ primary key(a,b,c)) engine=ndb;
+show warnings;
+
+--let $server_num=1.1
+--let $pattern=%NDB Slave%
+--let $limit=2
+
+--source suite/ndb_rpl/t/show_mysqld_warnings.inc
+
+drop table test.t1;
+drop table test.t1$EX;
+
+
+
+
###############
--echo "Cleanup"
=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am 2012-01-25 17:15:41 +0000
+++ b/sql/Makefile.am 2012-01-31 10:24:30 +0000
@@ -68,6 +68,7 @@ noinst_HEADERS = item.h item_func.h item
ndb_util_thread.h \
ndb_table_guard.h \
ndb_repl_tab.h \
+ ndb_conflict.h \
ha_partition.h rpl_constants.h \
debug_sync.h \
opt_range.h protocol.h rpl_tblmap.h rpl_utility.h \
@@ -147,7 +148,8 @@ libndb_la_SOURCES= ha_ndbcluster.cc \
ndb_mi.cc \
ndb_conflict_trans.cc \
ndb_component.cc \
- ndb_repl_tab.cc
+ ndb_repl_tab.cc \
+ ndb_conflict.cc
gen_lex_hash_SOURCES = gen_lex_hash.cc
gen_lex_hash_LDFLAGS = @NOINST_LDFLAGS@
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc 2012-01-25 17:15:41 +0000
+++ b/sql/ha_ndbcluster.cc 2012-01-31 10:24:30 +0000
@@ -45,7 +45,7 @@
#include <mysql/plugin.h>
#include <ndb_version.h>
#include "ndb_mi.h"
-#include "ndb_conflict_trans.h"
+#include "ndb_conflict.h"
#include "ndb_component.h"
#include "ndb_util_thread.h"
#include "ndb_table_guard.h"
@@ -356,11 +356,6 @@ ndbcluster_alter_table_flags(uint flags)
#define NDB_AUTO_INCREMENT_RETRIES 100
#define BATCH_FLUSH_SIZE (32768)
-/*
- Room for 10 instruction words, two labels (@ 2words/label)
- + 2 extra words for the case of resolve_size == 8
-*/
-#define MAX_CONFLICT_INTERPRETED_PROG_SIZE 16
static void set_ndb_err(THD *thd, const NdbError &err);
@@ -823,8 +818,7 @@ static int ndb_to_mysql_error(const NdbE
#ifdef HAVE_NDB_BINLOG
int
-handle_conflict_op_error(Thd_ndb* thd_ndb,
- NdbTransaction* trans,
+handle_conflict_op_error(NdbTransaction* trans,
const NdbError& err,
const NdbOperation* op);
@@ -890,8 +884,7 @@ check_completed_operations_pre_commit(Th
if (err.classification != NdbError::NoError)
{
- int res = handle_conflict_op_error(thd_ndb,
- trans,
+ int res = handle_conflict_op_error(trans,
err,
first);
if (res != 0)
@@ -2198,11 +2191,6 @@ int ha_ndbcluster::get_metadata(THD *thd
ndbtab_g.release();
-#ifdef HAVE_NDB_BINLOG
- ndbcluster_read_binlog_replication(thd, ndb, m_share, m_table,
- ::server_id, FALSE);
-#endif
-
DBUG_RETURN(0);
err:
@@ -4078,610 +4066,10 @@ thd_allow_batch(const THD* thd)
#endif
}
-/**
- st_ndb_slave_state constructor
-
- Initialise Ndb Slave state object
-*/
-st_ndb_slave_state::st_ndb_slave_state()
- : current_master_server_epoch(0),
- current_max_rep_epoch(0),
- conflict_flags(0),
- retry_trans_count(0),
- current_trans_row_conflict_count(0),
- current_trans_row_reject_count(0),
- current_trans_in_conflict_count(0),
- max_rep_epoch(0),
- sql_run_id(~Uint32(0)),
- trans_row_conflict_count(0),
- trans_row_reject_count(0),
- trans_detect_iter_count(0),
- trans_in_conflict_count(0),
- trans_conflict_commit_count(0),
- trans_conflict_apply_state(SAS_NORMAL),
- trans_dependency_tracker(NULL)
-{
- memset(current_violation_count, 0, sizeof(current_violation_count));
- memset(total_violation_count, 0, sizeof(total_violation_count));
-
- /* Init conflict handling state memroot */
- const size_t CONFLICT_MEMROOT_BLOCK_SIZE = 32768;
- init_alloc_root(&conflict_mem_root, CONFLICT_MEMROOT_BLOCK_SIZE, 0);
-};
-
-/**
- resetPerAttemptCounters
-
- Reset the per-epoch-transaction-application-attempt counters
-*/
-void
-st_ndb_slave_state::resetPerAttemptCounters()
-{
- memset(current_violation_count, 0, sizeof(current_violation_count));
- current_trans_row_conflict_count = 0;
- current_trans_row_reject_count = 0;
- current_trans_in_conflict_count = 0;
-
- conflict_flags = 0;
- current_max_rep_epoch = 0;
-}
-
-/**
- atTransactionAbort()
-
- Called by Slave SQL thread during transaction abort.
-*/
-void
-st_ndb_slave_state::atTransactionAbort()
-{
- /* Reset current-transaction counters + state */
- resetPerAttemptCounters();
-}
-
-/**
- atTransactionCommit()
-
- Called by Slave SQL thread after transaction commit
-*/
-void
-st_ndb_slave_state::atTransactionCommit()
-{
- assert( ((trans_dependency_tracker == NULL) &&
- (trans_conflict_apply_state == SAS_NORMAL)) ||
- ((trans_dependency_tracker != NULL) &&
- (trans_conflict_apply_state == SAS_TRACK_TRANS_DEPENDENCIES)) );
- assert( trans_conflict_apply_state != SAS_APPLY_TRANS_DEPENDENCIES );
-
- /* Merge committed transaction counters into total state
- * Then reset current transaction counters
- */
- for (int i=0; i < CFT_NUMBER_OF_CFTS; i++)
- {
- total_violation_count[i]+= current_violation_count[i];
- }
- trans_row_conflict_count+= current_trans_row_conflict_count;
- trans_row_reject_count+= current_trans_row_reject_count;
- trans_in_conflict_count+= current_trans_in_conflict_count;
-
- if (current_trans_in_conflict_count)
- trans_conflict_commit_count++;
-
- if (current_max_rep_epoch > max_rep_epoch)
- {
- DBUG_PRINT("info", ("Max replicated epoch increases from %llu to %llu",
- max_rep_epoch,
- current_max_rep_epoch));
- max_rep_epoch = current_max_rep_epoch;
- }
-
- resetPerAttemptCounters();
-
- /* Clear per-epoch-transaction retry_trans_count */
- retry_trans_count = 0;
-}
-
-/**
- atApplyStatusWrite
-
- Called by Slave SQL thread when applying an event to the
- ndb_apply_status table
-*/
-void
-st_ndb_slave_state::atApplyStatusWrite(Uint32 master_server_id,
- Uint32 row_server_id,
- Uint64 row_epoch,
- bool is_row_server_id_local)
-{
- if (row_server_id == master_server_id)
- {
- /*
- WRITE_ROW to ndb_apply_status injected by MySQLD
- immediately upstream of us.
- Record epoch
- */
- current_master_server_epoch = row_epoch;
- assert(! is_row_server_id_local);
- }
- else if (is_row_server_id_local)
- {
- DBUG_PRINT("info", ("Recording application of local server %u epoch %llu "
- " which is %s.",
- row_server_id, row_epoch,
- (row_epoch > g_ndb_slave_state.current_max_rep_epoch)?
- " new highest." : " older than previously applied"));
- if (row_epoch > current_max_rep_epoch)
- {
- /*
- Store new highest epoch in thdvar. If we commit successfully
- then this can become the new global max
- */
- current_max_rep_epoch = row_epoch;
- }
- }
-}
-
-/**
- atResetSlave()
-
- Called when RESET SLAVE command issued - in context of command client.
-*/
-void
-st_ndb_slave_state::atResetSlave()
-{
- /* Reset the Maximum replicated epoch vars
- * on slave reset
- * No need to touch the sql_run_id as that
- * will increment if the slave is started
- * again.
- */
- resetPerAttemptCounters();
-
- retry_trans_count = 0;
- max_rep_epoch = 0;
-}
-
-/**
- atStartSlave()
-
- Called by Slave SQL thread when first applying a row to Ndb after
- a START SLAVE command.
-*/
-void
-st_ndb_slave_state::atStartSlave()
-{
-#ifdef HAVE_NDB_BINLOG
- if (trans_conflict_apply_state != SAS_NORMAL)
- {
- /*
- Remove conflict handling state on a SQL thread
- restart
- */
- atEndTransConflictHandling();
- trans_conflict_apply_state = SAS_NORMAL;
- }
-#endif
-};
#ifdef HAVE_NDB_BINLOG
/**
- atBeginTransConflictHandling()
-
- Called by Slave SQL thread when it determines that Transactional
- Conflict handling is required
-*/
-void
-st_ndb_slave_state::atBeginTransConflictHandling()
-{
- DBUG_ENTER("atBeginTransConflictHandling");
- /*
- Allocate and initialise Transactional Conflict
- Resolution Handling Structures
- */
- assert(trans_dependency_tracker == NULL);
- trans_dependency_tracker = DependencyTracker::newDependencyTracker(&conflict_mem_root);
- DBUG_VOID_RETURN;
-};
-
-/**
- atPrepareConflictDetection
-
- Called by Slave SQL thread prior to defining an operation on
- a table with conflict detection defined.
-*/
-int
-st_ndb_slave_state::atPrepareConflictDetection(const NdbDictionary::Table* table,
- const NdbRecord* key_rec,
- const uchar* row_data,
- Uint64 transaction_id,
- bool& handle_conflict_now)
-{
- DBUG_ENTER("atPrepareConflictDetection");
- /*
- Slave is preparing to apply an operation with conflict detection.
- If we're performing Transactional Conflict Resolution, take
- extra steps
- */
- switch( trans_conflict_apply_state )
- {
- case SAS_NORMAL:
- DBUG_PRINT("info", ("SAS_NORMAL : No special handling"));
- /* No special handling */
- break;
- case SAS_TRACK_TRANS_DEPENDENCIES:
- {
- DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Tracking operation"));
- /*
- Track this operation and its transaction id, to determine
- inter-transaction dependencies by {table, primary key}
- */
- assert( trans_dependency_tracker );
-
- int res = trans_dependency_tracker
- ->track_operation(table,
- key_rec,
- row_data,
- transaction_id);
- if (res != 0)
- {
- sql_print_error("%s", trans_dependency_tracker->get_error_text());
- DBUG_RETURN(res);
- }
- /* Proceed as normal */
- break;
- }
- case SAS_APPLY_TRANS_DEPENDENCIES:
- {
- DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Deciding whether to apply"));
- /*
- Check if this operation's transaction id is marked in-conflict.
- If it is, we tell the caller to perform conflict resolution now instead
- of attempting to apply the operation.
- */
- assert( trans_dependency_tracker );
-
- if (trans_dependency_tracker->in_conflict(transaction_id))
- {
- DBUG_PRINT("info", ("Event for transaction %llu is conflicting. Handling.",
- transaction_id));
- current_trans_row_reject_count++;
- handle_conflict_now = true;
- DBUG_RETURN(0);
- }
-
- /*
- This transaction is not marked in-conflict, so continue with normal
- processing.
- Note that normal processing may subsequently detect a conflict which
- didn't exist at the time of the previous TRACK_DEPENDENCIES pass.
- In this case, we will rollback and repeat the TRACK_DEPENDENCIES
- stage.
- */
- DBUG_PRINT("info", ("Event for transaction %llu is OK, applying",
- transaction_id));
- break;
- }
- }
- DBUG_RETURN(0);
-}
-
-/**
- atTransConflictDetected
-
- Called by the Slave SQL thread when a conflict is detected on
- an executed operation.
-*/
-int
-st_ndb_slave_state::atTransConflictDetected(Uint64 transaction_id)
-{
- DBUG_ENTER("atTransConflictDetected");
-
- /*
- The Slave has detected a conflict on an operation applied
- to a table with Transactional Conflict Resolution defined.
- Handle according to current state.
- */
- conflict_flags |= SCS_TRANS_CONFLICT_DETECTED_THIS_PASS;
- current_trans_row_conflict_count++;
-
- switch (trans_conflict_apply_state)
- {
- case SAS_NORMAL:
- {
- DBUG_PRINT("info", ("SAS_NORMAL : Conflict on op on table with trans detection."
- "Requires multi-pass resolution. Will transition to "
- "SAS_TRACK_TRANS_DEPENDENCIES at Commit."));
- /*
- Conflict on table with transactional conflict resolution
- defined.
- This is the trigger that we will do transactional conflict
- resolution.
- Record that we need to do multiple passes to correctly
- perform resolution.
- TODO : Early exit from applying epoch?
- */
- break;
- }
- case SAS_TRACK_TRANS_DEPENDENCIES:
- {
- DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Operation in transaction %llu "
- "had conflict",
- transaction_id));
- /*
- Conflict on table with transactional conflict resolution
- defined.
- We will mark the operation's transaction_id as in-conflict,
- so that any other operations on the transaction are also
- considered in-conflict, and any dependent transactions are also
- considered in-conflict.
- */
- assert(trans_dependency_tracker != NULL);
- int res = trans_dependency_tracker
- ->mark_conflict(transaction_id);
-
- if (res != 0)
- {
- sql_print_error("%s", trans_dependency_tracker->get_error_text());
- DBUG_RETURN(res);
- }
- break;
- }
- case SAS_APPLY_TRANS_DEPENDENCIES:
- {
- /*
- This must be a new conflict, not noticed on the previous
- pass.
- */
- DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Conflict detected. "
- "Must be further conflict. Will return to "
- "SAS_TRACK_TRANS_DEPENDENCIES state at commit."));
- // TODO : Early exit from applying epoch
- break;
- }
- default:
- break;
- }
-
- DBUG_RETURN(0);
-}
-
-/**
- atConflictPreCommit
-
- Called by the Slave SQL thread prior to committing a Slave transaction.
- This method can request that the Slave transaction is retried.
-
-
- State transitions :
-
- START SLAVE /
- RESET SLAVE /
- STARTUP
- |
- |
- v
- ****************
- * SAS_NORMAL *
- ****************
- ^ |
- No transactional | | Conflict on transactional table
- conflicts | | (Rollback)
- (Commit) | |
- | v
- **********************************
- * SAS_TRACK_TRANS_DEPENDENCIES *
- **********************************
- ^ I ^
- More I I Dependencies |
- conflicts I I determined | No new conflicts
- found I I (Rollback) | (Commit)
- (Rollback) I I |
- I v |
- **********************************
- * SAS_APPLY_TRANS_DEPENDENCIES *
- **********************************
-
-
- Operation
- The initial state is SAS_NORMAL.
-
- On detecting a conflict on a transactional conflict detetecing table,
- SAS_TRACK_TRANS_DEPENDENCIES is entered, and the epoch transaction is
- rolled back and reapplied.
-
- In SAS_TRACK_TRANS_DEPENDENCIES state, transaction dependencies and
- conflicts are tracked as the epoch transaction is applied.
-
- Then the Slave transitions to SAS_APPLY_TRANS_DEPENDENCIES state, and
- the epoch transaction is rolled back and reapplied.
-
- In the SAS_APPLY_TRANS_DEPENDENCIES state, operations for transactions
- marked as in-conflict are not applied.
-
- If this results in no new conflicts, the epoch transaction is committed,
- and the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered for processing
- the next replicated epch transaction.
- If it results in new conflicts, the epoch transactions is rolled back, and
- the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered again, to determine
- the new set of dependencies.
-
- If no conflicts are found in the SAS_TRACK_TRANS_DEPENDENCIES state, then
- the epoch transaction is committed, and the Slave transitions to SAS_NORMAL
- state.
-
-
- Properties
- 1) Normally, there is no transaction dependency tracking overhead paid by
- the slave.
-
- 2) On first detecting a transactional conflict, the epoch transaction must be
- applied at least three times, with two rollbacks.
-
- 3) Transactional conflicts detected in subsequent epochs require the epoch
- transaction to be applied two times, with one rollback.
-
- 4) A loop between states SAS_TRACK_TRANS_DEPENDENCIES and SAS_APPLY_TRANS_
- DEPENDENCIES occurs when further transactional conflicts are discovered
- in SAS_APPLY_TRANS_DEPENDENCIES state. This implies that the conflicts
- discovered in the SAS_TRACK_TRANS_DEPENDENCIES state must not be complete,
- so we revisit that state to get a more complete picture.
-
- 5) The number of iterations of this loop is fixed to a hard coded limit, after
- which the Slave will stop with an error. This should be an unlikely
- occurrence, as it requires not just n conflicts, but at least 1 new conflict
- appearing between the transactions in the epoch transaction and the
- database between the two states, n times in a row.
-
- 6) Where conflicts are occasional, as expected, the post-commit transition to
- SAS_TRACK_TRANS_DEPENDENCIES rather than SAS_NORMAL results in one epoch
- transaction having its transaction dependencies needlessly tracked.
-
-*/
-int
-st_ndb_slave_state::atConflictPreCommit(bool& retry_slave_trans)
-{
- DBUG_ENTER("atConflictPreCommit");
-
- /*
- Prior to committing a Slave transaction, we check whether
- Transactional conflicts have been detected which require
- us to retry the slave transaction
- */
- retry_slave_trans = false;
- switch(trans_conflict_apply_state)
- {
- case SAS_NORMAL:
- {
- DBUG_PRINT("info", ("SAS_NORMAL"));
- /*
- Normal case. Only if we defined conflict detection on a table
- with transactional conflict detection, and saw conflicts (on any table)
- do we go to another state
- */
- if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
- {
- DBUG_PRINT("info", ("Conflict(s) detected this pass, transitioning to "
- "SAS_TRACK_TRANS_DEPENDENCIES."));
- assert(conflict_flags & SCS_OPS_DEFINED);
- /* Transactional conflict resolution required, switch state */
- atBeginTransConflictHandling();
- resetPerAttemptCounters();
- trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
- retry_slave_trans = true;
- }
- break;
- }
- case SAS_TRACK_TRANS_DEPENDENCIES:
- {
- DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES"));
-
- if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
- {
- /*
- Conflict on table with transactional detection
- this pass, we have collected the details and
- dependencies, now transition to
- SAS_APPLY_TRANS_DEPENDENCIES and
- reapply the epoch transaction without the
- conflicting transactions.
- */
- assert(conflict_flags & SCS_OPS_DEFINED);
- DBUG_PRINT("info", ("Transactional conflicts, transitioning to "
- "SAS_APPLY_TRANS_DEPENDENCIES"));
-
- trans_conflict_apply_state = SAS_APPLY_TRANS_DEPENDENCIES;
- trans_detect_iter_count++;
- retry_slave_trans = true;
- break;
- }
- else
- {
- /*
- No transactional conflicts detected this pass, lets
- return to SAS_NORMAL state after commit for more efficient
- application of epoch transactions
- */
- DBUG_PRINT("info", ("No transactional conflicts, transitioning to "
- "SAS_NORMAL"));
- atEndTransConflictHandling();
- trans_conflict_apply_state = SAS_NORMAL;
- break;
- }
- }
- case SAS_APPLY_TRANS_DEPENDENCIES:
- {
- DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES"));
- assert(conflict_flags & SCS_OPS_DEFINED);
- /*
- We've applied the Slave epoch transaction subject to the
- conflict detection. If any further transactional
- conflicts have been observed, then we must repeat the
- process.
- */
- atEndTransConflictHandling();
- atBeginTransConflictHandling();
- trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
-
- if (unlikely(conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS))
- {
- DBUG_PRINT("info", ("Further conflict(s) detected, repeating the "
- "TRACK_TRANS_DEPENDENCIES pass"));
- /*
- Further conflict observed when applying, need
- to re-determine dependencies
- */
- resetPerAttemptCounters();
- retry_slave_trans = true;
- break;
- }
-
-
- DBUG_PRINT("info", ("No further conflicts detected, committing and "
- "returning to SAS_TRACK_TRANS_DEPENDENCIES state"));
- /*
- With dependencies taken into account, no further
- conflicts detected, can now proceed to commit
- */
- break;
- }
- }
-
- /*
- Clear conflict flags, to ensure that we detect any new conflicts
- */
- conflict_flags = 0;
-
- if (retry_slave_trans)
- {
- DBUG_PRINT("info", ("Requesting transaction restart"));
- DBUG_RETURN(1);
- }
-
- DBUG_PRINT("info", ("Allowing commit to proceed"));
- DBUG_RETURN(0);
-}
-
-/**
- atEndTransConflictHandling
-
- Called when transactional conflict handling has completed.
-*/
-void
-st_ndb_slave_state::atEndTransConflictHandling()
-{
- DBUG_ENTER("atEndTransConflictHandling");
- /* Release any conflict handling state */
- if (trans_dependency_tracker)
- {
- current_trans_in_conflict_count =
- trans_dependency_tracker->get_conflict_count();
- trans_dependency_tracker = NULL;
- free_root(&conflict_mem_root, MY_MARK_BLOCKS_FREE);
- }
- DBUG_VOID_RETURN;
-};
-
-/**
prepare_conflict_detection
This method is called during operation definition by the slave,
@@ -4862,8 +4250,7 @@ ha_ndbcluster::prepare_conflict_detectio
*/
int
-handle_conflict_op_error(Thd_ndb* thd_ndb,
- NdbTransaction* trans,
+handle_conflict_op_error(NdbTransaction* trans,
const NdbError& err,
const NdbOperation* op)
{
@@ -5614,92 +5001,40 @@ handle_row_conflict(NDB_CONFLICT_FN_SHAR
}
if (cfn_share &&
- cfn_share->m_ex_tab != NULL)
+ cfn_share->m_ex_tab_writer.hasTable())
{
NdbError err;
- assert(err.code == 0);
- do
+ if (cfn_share->m_ex_tab_writer.writeRow(conflict_trans,
+ key_rec,
+ ::server_id,
+ ndb_mi_get_master_server_id(),
+ g_ndb_slave_state.current_master_server_epoch,
+ pk_row,
+ err) != 0)
{
- /* Have exceptions table, add row to it */
- const NDBTAB *ex_tab= cfn_share->m_ex_tab;
-
- /* get insert op */
- NdbOperation *ex_op= conflict_trans->getNdbOperation(ex_tab);
- if (ex_op == NULL)
+ if (err.code != 0)
{
- err= conflict_trans->getNdbError();
- break;
- }
- if (ex_op->insertTuple() == -1)
- {
- err= ex_op->getNdbError();
- break;
- }
- {
- uint32 server_id= (uint32)::server_id;
- uint32 master_server_id= (uint32) ndb_mi_get_master_server_id();
- uint64 master_epoch= (uint64) g_ndb_slave_state.current_master_server_epoch;
- uint32 count= (uint32)++(cfn_share->m_count);
- if (ex_op->setValue((Uint32)0, (const char *)&(server_id)) ||
- ex_op->setValue((Uint32)1, (const char *)&(master_server_id)) ||
- ex_op->setValue((Uint32)2, (const char *)&(master_epoch)) ||
- ex_op->setValue((Uint32)3, (const char *)&(count)))
+ if (err.status == NdbError::TemporaryError)
{
- err= ex_op->getNdbError();
- break;
+ /* Slave will roll back and retry entire transaction. */
+ ERR_RETURN(err);
}
- }
- /* copy primary keys */
- {
- const int fixed_cols= 4;
- int nkey= cfn_share->m_pk_cols;
- int k;
- for (k= 0; k < nkey; k++)
- {
- DBUG_ASSERT(pk_row != NULL);
- const uchar* data=
- (const uchar*) NdbDictionary::getValuePtr(key_rec,
- (const char*) pk_row,
- cfn_share->m_key_attrids[k]);
- if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1)
- {
- err= ex_op->getNdbError();
- break;
- }
+ else
+ {
+ char msg[FN_REFLEN];
+ my_snprintf(msg, sizeof(msg), "%s conflict handling "
+ "on table %s hit Ndb error %d '%s'",
+ handling_type,
+ table_name,
+ err.code,
+ err.message);
+ push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+ ER_EXCEPTIONS_WRITE_ERROR,
+ ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+ /* Slave will stop replication. */
+ DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
}
}
- } while (0);
-
- if (err.code != 0)
- {
- char msg[FN_REFLEN];
- my_snprintf(msg, sizeof(msg), "%s conflict handling "
- "on table %s hit Ndb error %d '%s'",
- handling_type,
- table_name,
- err.code,
- err.message);
-
- if (err.classification == NdbError::SchemaError)
- {
- /* Something up with Exceptions table schema, forget it */
- NdbDictionary::Dictionary* dict= conflict_trans->getNdb()->getDictionary();
- dict->removeTableGlobal(*(cfn_share->m_ex_tab), false);
- cfn_share->m_ex_tab= NULL;
- }
- else if (err.status == NdbError::TemporaryError)
- {
- /* Slave will roll back and retry entire transaction. */
- ERR_RETURN(err);
- }
- else
- {
- push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
- ER_EXCEPTIONS_WRITE_ERROR,
- ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
- /* Slave will stop replication. */
- DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
- }
}
} /* if (cfn_share->m_ex_tab != NULL) */
@@ -8345,6 +7680,7 @@ int ndbcluster_commit(handlerton *hton,
* an execute(NoCommit) before committing, as conflict op handling
* is done by execute(NoCommit)
*/
+ /* TODO : Add as function */
if (g_ndb_slave_state.conflict_flags & SCS_OPS_DEFINED)
{
if (thd_ndb->m_unsent_bytes)
@@ -9616,6 +8952,7 @@ int ha_ndbcluster::create(const char *na
/* Reset database name */
ndb->setDatabaseName(m_dbname);
+ /* TODO : Add as per conflict function 'virtual' */
/* Use ndb_replication information as required */
if (conflict_fn != NULL)
{
@@ -13636,7 +12973,6 @@ NDB_SHARE *ndbcluster_get_share(const ch
DBUG_RETURN(share);
}
-
void ndbcluster_real_free_share(NDB_SHARE **share)
{
DBUG_ENTER("ndbcluster_real_free_share");
@@ -13652,11 +12988,11 @@ void ndbcluster_real_free_share(NDB_SHAR
pthread_mutex_destroy(&(*share)->mutex);
#ifdef HAVE_NDB_BINLOG
- if ((*share)->m_cfn_share && (*share)->m_cfn_share->m_ex_tab && g_ndb)
+ if ((*share)->m_cfn_share &&
+ (*share)->m_cfn_share->m_ex_tab_writer.hasTable() &&
+ g_ndb)
{
- NDBDICT *dict= g_ndb->getDictionary();
- dict->removeTableGlobal(*(*share)->m_cfn_share->m_ex_tab, 0);
- (*share)->m_cfn_share->m_ex_tab= 0;
+ (*share)->m_cfn_share->m_ex_tab_writer.free(g_ndb);
}
#endif
(*share)->new_op= 0;
=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h 2012-01-17 13:57:01 +0000
+++ b/sql/ha_ndbcluster.h 2012-01-31 10:24:30 +0000
@@ -32,6 +32,7 @@
#include <ndbapi/NdbApi.hpp>
#include <ndbapi/ndbapi_limits.h>
#include <kernel/ndb_limits.h>
+#include "ndb_conflict.h"
#define NDB_IGNORE_VALUE(x) (void)x
@@ -120,125 +121,6 @@ typedef enum {
NSS_ALTERED
} NDB_SHARE_STATE;
-enum enum_conflict_fn_type
-{
- CFT_NDB_UNDEF = 0
- ,CFT_NDB_MAX
- ,CFT_NDB_OLD
- ,CFT_NDB_MAX_DEL_WIN
- ,CFT_NDB_EPOCH
- ,CFT_NDB_EPOCH_TRANS
- ,CFT_NUMBER_OF_CFTS /* End marker */
-};
-
-#ifdef HAVE_NDB_BINLOG
-static const Uint32 MAX_CONFLICT_ARGS= 8;
-
-enum enum_conflict_fn_arg_type
-{
- CFAT_END
- ,CFAT_COLUMN_NAME
- ,CFAT_EXTRA_GCI_BITS
-};
-
-struct st_conflict_fn_arg
-{
- enum_conflict_fn_arg_type type;
- union
- {
- char resolveColNameBuff[ NAME_CHAR_LEN + 1 ]; // CFAT_COLUMN_NAME
- uint32 extraGciBits; // CFAT_EXTRA_GCI_BITS
- };
-};
-
-struct st_conflict_fn_arg_def
-{
- enum enum_conflict_fn_arg_type arg_type;
- bool optional;
-};
-
-/* What type of operation was issued */
-enum enum_conflicting_op_type
-{ /* NdbApi */
- WRITE_ROW, /* insert (!write) */
- UPDATE_ROW, /* update */
- DELETE_ROW, /* delete */
- REFRESH_ROW /* refresh */
-};
-
-/*
- prepare_detect_func
-
- Type of function used to prepare for conflict detection on
- an NdbApi operation
-*/
-typedef int (* prepare_detect_func) (struct st_ndbcluster_conflict_fn_share* cfn_share,
- enum_conflicting_op_type op_type,
- const NdbRecord* data_record,
- const uchar* old_data,
- const uchar* new_data,
- const MY_BITMAP* write_set,
- class NdbInterpretedCode* code);
-
-enum enum_conflict_fn_flags
-{
- CF_TRANSACTIONAL = 1
-};
-
-struct st_conflict_fn_def
-{
- const char *name;
- enum_conflict_fn_type type;
- const st_conflict_fn_arg_def* arg_defs;
- prepare_detect_func prep_func;
- uint8 flags; /* enum_conflict_fn_flags */
-};
-
-/* What sort of conflict was found */
-enum enum_conflict_cause
-{
- ROW_ALREADY_EXISTS, /* On insert */
- ROW_DOES_NOT_EXIST, /* On Update, Delete */
- ROW_IN_CONFLICT, /* On Update, Delete */
- TRANS_IN_CONFLICT /* Any of above, or implied by transaction */
-};
-
-/* NdbOperation custom data which points out handler and record. */
-struct Ndb_exceptions_data {
- struct NDB_SHARE* share;
- const NdbRecord* key_rec;
- const uchar* row;
- enum_conflicting_op_type op_type;
- Uint64 trans_id;
-};
-
-enum enum_conflict_fn_table_flags
-{
- CFF_NONE = 0,
- CFF_REFRESH_ROWS = 1
-};
-
-/*
- Maximum supported key parts (16)
- (Ndb supports 32, but MySQL has a lower limit)
-*/
-static const int NDB_MAX_KEY_PARTS = MAX_REF_PARTS;
-
-typedef struct st_ndbcluster_conflict_fn_share {
- const st_conflict_fn_def* m_conflict_fn;
-
- /* info about original table */
- uint8 m_pk_cols;
- uint16 m_resolve_column;
- uint8 m_resolve_size;
- uint8 m_flags;
- uint16 m_key_attrids[ NDB_MAX_KEY_PARTS ];
-
- const NdbDictionary::Table *m_ex_tab;
- uint32 m_count;
-} NDB_CONFLICT_FN_SHARE;
-#endif
-
/*
Stats that can be retrieved from ndb
*/
@@ -357,99 +239,6 @@ inline void set_binlog_use_update(NDB_SH
inline my_bool get_binlog_use_update(NDB_SHARE *share)
{ return (share->flags & NSF_BINLOG_USE_UPDATE) != 0; }
-enum enum_slave_trans_conflict_apply_state
-{
- /* Normal with optional row-level conflict detection */
- SAS_NORMAL,
-
- /*
- SAS_TRACK_TRANS_DEPENDENCIES
- Track inter-transaction dependencies
- */
- SAS_TRACK_TRANS_DEPENDENCIES,
-
- /*
- SAS_APPLY_TRANS_DEPENDENCIES
- Apply only non conflicting transactions
- */
- SAS_APPLY_TRANS_DEPENDENCIES
-};
-
-enum enum_slave_conflict_flags
-{
- /* Conflict detection Ops defined */
- SCS_OPS_DEFINED = 1,
- /* Conflict detected on table with transactional resolution */
- SCS_TRANS_CONFLICT_DETECTED_THIS_PASS = 2
-};
-
-/*
- State associated with the Slave thread
- (From the Ndb handler's point of view)
-*/
-struct st_ndb_slave_state
-{
- /* Counter values for current slave transaction */
- Uint32 current_violation_count[CFT_NUMBER_OF_CFTS];
- Uint64 current_master_server_epoch;
- Uint64 current_max_rep_epoch;
- uint8 conflict_flags; /* enum_slave_conflict_flags */
- /* Transactional conflict detection */
- Uint32 retry_trans_count;
- Uint32 current_trans_row_conflict_count;
- Uint32 current_trans_row_reject_count;
- Uint32 current_trans_in_conflict_count;
-
- /* Cumulative counter values */
- Uint64 total_violation_count[CFT_NUMBER_OF_CFTS];
- Uint64 max_rep_epoch;
- Uint32 sql_run_id;
- /* Transactional conflict detection */
- Uint64 trans_row_conflict_count;
- Uint64 trans_row_reject_count;
- Uint64 trans_detect_iter_count;
- Uint64 trans_in_conflict_count;
- Uint64 trans_conflict_commit_count;
-
- static const Uint32 MAX_RETRY_TRANS_COUNT = 100;
-
- /*
- Slave Apply State
-
- State of Binlog application from Ndb point of view.
- */
- enum_slave_trans_conflict_apply_state trans_conflict_apply_state;
-
- MEM_ROOT conflict_mem_root;
- class DependencyTracker* trans_dependency_tracker;
-
- /* Methods */
- void atStartSlave();
- int atPrepareConflictDetection(const NdbDictionary::Table* table,
- const NdbRecord* key_rec,
- const uchar* row_data,
- Uint64 transaction_id,
- bool& handle_conflict_now);
- int atTransConflictDetected(Uint64 transaction_id);
- int atConflictPreCommit(bool& retry_slave_trans);
-
- void atBeginTransConflictHandling();
- void atEndTransConflictHandling();
-
- void atTransactionCommit();
- void atTransactionAbort();
- void atResetSlave();
-
- void atApplyStatusWrite(Uint32 master_server_id,
- Uint32 row_server_id,
- Uint64 row_epoch,
- bool is_row_server_id_local);
-
- void resetPerAttemptCounters();
-
- st_ndb_slave_state();
-};
-
/*
Place holder for ha_ndbcluster thread specific data
*/
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc 2012-01-25 17:15:41 +0000
+++ b/sql/ha_ndbcluster_binlog.cc 2012-01-31 10:24:30 +0000
@@ -3871,6 +3871,8 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
cfn_share->m_resolve_column= field_index;
cfn_share->m_flags = flags;
+ /* Init Exceptions Table Writer */
+ new (&cfn_share->m_ex_tab_writer) ExceptionsTableWriter();
{
/* get exceptions table */
char ex_tab_name[FN_REFLEN];
@@ -3882,71 +3884,31 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
const NDBTAB *ex_tab= ndbtab_g.get_table();
if (ex_tab)
{
- const int fixed_cols= 4;
- bool ok=
- ex_tab->getNoOfColumns() >= fixed_cols &&
- ex_tab->getNoOfPrimaryKeys() == 4 &&
- /* server id */
- ex_tab->getColumn(0)->getType() == NDBCOL::Unsigned &&
- ex_tab->getColumn(0)->getPrimaryKey() &&
- /* master_server_id */
- ex_tab->getColumn(1)->getType() == NDBCOL::Unsigned &&
- ex_tab->getColumn(1)->getPrimaryKey() &&
- /* master_epoch */
- ex_tab->getColumn(2)->getType() == NDBCOL::Bigunsigned &&
- ex_tab->getColumn(2)->getPrimaryKey() &&
- /* count */
- ex_tab->getColumn(3)->getType() == NDBCOL::Unsigned &&
- ex_tab->getColumn(3)->getPrimaryKey();
- if (ok)
+ char msgBuf[ FN_REFLEN ];
+ const char* msg = NULL;
+ if (cfn_share->m_ex_tab_writer.init(ndbtab,
+ ex_tab,
+ msgBuf,
+ sizeof(msgBuf),
+ &msg) == 0)
{
- int ncol= ndbtab->getNoOfColumns();
- int nkey= ndbtab->getNoOfPrimaryKeys();
- int i, k;
- for (i= k= 0; i < ncol && k < nkey; i++)
+ /* Ok */
+ /* Hold our table reference outside the table_guard scope */
+ ndbtab_g.release();
+ if (opt_ndb_extra_logging)
{
- const NdbDictionary::Column* col= ndbtab->getColumn(i);
- if (col->getPrimaryKey())
- {
- const NdbDictionary::Column* ex_col=
- ex_tab->getColumn(fixed_cols + k);
- ok=
- ex_col != NULL &&
- col->getType() == ex_col->getType() &&
- col->getLength() == ex_col->getLength() &&
- col->getNullable() == ex_col->getNullable();
- if (!ok)
- break;
- /*
- Store mapping of Exception table key# to
- orig table attrid
- */
- cfn_share->m_key_attrids[k]= i;
- k++;
- }
+ sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s",
+ share->db,
+ share->table_name,
+ share->db,
+ ex_tab_name);
}
- if (ok)
- {
- cfn_share->m_ex_tab= ex_tab;
- cfn_share->m_pk_cols= nkey;
- ndbtab_g.release();
- if (opt_ndb_extra_logging)
- sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s",
- share->db,
- share->table_name,
- share->db,
- ex_tab_name);
- }
- else
- sql_print_warning("NDB Slave: exceptions table %s has wrong "
- "definition (column %d)",
- ex_tab_name, fixed_cols + k);
}
else
- sql_print_warning("NDB Slave: exceptions table %s has wrong "
- "definition (initial %d columns)",
- ex_tab_name, fixed_cols);
- }
+ {
+ sql_print_warning("%s", msg);
+ }
+ } /* if (ex_tab) */
}
DBUG_RETURN(0);
}
=== added file 'sql/ndb_conflict.cc'
--- a/sql/ndb_conflict.cc 1970-01-01 00:00:00 +0000
+++ b/sql/ndb_conflict.cc 2012-01-31 10:01:22 +0000
@@ -0,0 +1,804 @@
+/*
+ Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+#include <my_global.h> /* For config defines */
+
+#ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
+/* distcheck does not compile from here... */
+
+#include "ndb_conflict.h"
+
+#ifdef HAVE_NDB_BINLOG
+
+#define NDBTAB NdbDictionary::Table
+#define NDBCOL NdbDictionary::Column
+
+int
+ExceptionsTableWriter::init(const NdbDictionary::Table* mainTable,
+ const NdbDictionary::Table* exceptionsTable,
+ char* msg_buf,
+ uint msg_buf_len,
+ const char** msg)
+{
+ const char* ex_tab_name = exceptionsTable->getName();
+ const int fixed_cols= 4;
+ bool ok=
+ exceptionsTable->getNoOfColumns() >= fixed_cols &&
+ exceptionsTable->getNoOfPrimaryKeys() == 4 &&
+ /* server id */
+ exceptionsTable->getColumn(0)->getType() == NDBCOL::Unsigned &&
+ exceptionsTable->getColumn(0)->getPrimaryKey() &&
+ /* master_server_id */
+ exceptionsTable->getColumn(1)->getType() == NDBCOL::Unsigned &&
+ exceptionsTable->getColumn(1)->getPrimaryKey() &&
+ /* master_epoch */
+ exceptionsTable->getColumn(2)->getType() == NDBCOL::Bigunsigned &&
+ exceptionsTable->getColumn(2)->getPrimaryKey() &&
+ /* count */
+ exceptionsTable->getColumn(3)->getType() == NDBCOL::Unsigned &&
+ exceptionsTable->getColumn(3)->getPrimaryKey();
+ if (ok)
+ {
+ int ncol= mainTable->getNoOfColumns();
+ int nkey= mainTable->getNoOfPrimaryKeys();
+ int i, k;
+ for (i= k= 0; i < ncol && k < nkey; i++)
+ {
+ const NdbDictionary::Column* col= mainTable->getColumn(i);
+ if (col->getPrimaryKey())
+ {
+ const NdbDictionary::Column* ex_col=
+ exceptionsTable->getColumn(fixed_cols + k);
+ ok=
+ ex_col != NULL &&
+ col->getType() == ex_col->getType() &&
+ col->getLength() == ex_col->getLength() &&
+ col->getNullable() == ex_col->getNullable();
+ if (!ok)
+ break;
+ /*
+ Store mapping of Exception table key# to
+ orig table attrid
+ */
+ m_key_attrids[k]= i;
+ k++;
+ }
+ }
+ if (ok)
+ {
+ m_ex_tab= exceptionsTable;
+ m_pk_cols= nkey;
+ return 0;
+ }
+ else
+ my_snprintf(msg_buf, msg_buf_len,
+ "NDB Slave: exceptions table %s has wrong "
+ "definition (column %d)",
+ ex_tab_name, fixed_cols + k);
+ }
+ else
+ my_snprintf(msg_buf, msg_buf_len,
+ "NDB Slave: exceptions table %s has wrong "
+ "definition (initial %d columns)",
+ ex_tab_name, fixed_cols);
+
+ *msg = msg_buf;
+ return -1;
+}
+
+void
+ExceptionsTableWriter::free(Ndb* ndb)
+{
+ if (m_ex_tab)
+ {
+ NdbDictionary::Dictionary* dict = ndb->getDictionary();
+ dict->removeTableGlobal(*m_ex_tab, 0);
+ m_ex_tab= 0;
+ }
+}
+
+int
+ExceptionsTableWriter::writeRow(NdbTransaction* trans,
+ const NdbRecord* keyRecord,
+ uint32 server_id,
+ uint32 master_server_id,
+ uint64 master_epoch,
+ const uchar* rowPtr,
+ NdbError& err)
+{
+ DBUG_ENTER("ExceptionsTableWriter::writeRow");
+ assert(err.code == 0);
+ do
+ {
+ /* Have exceptions table, add row to it */
+ const NDBTAB *ex_tab= m_ex_tab;
+
+ /* get insert op */
+ NdbOperation *ex_op= trans->getNdbOperation(ex_tab);
+ if (ex_op == NULL)
+ {
+ err= trans->getNdbError();
+ break;
+ }
+ if (ex_op->insertTuple() == -1)
+ {
+ err= ex_op->getNdbError();
+ break;
+ }
+ {
+ uint32 count= (uint32)++m_count;
+ if (ex_op->setValue((Uint32)0, (const char *)&(server_id)) ||
+ ex_op->setValue((Uint32)1, (const char *)&(master_server_id)) ||
+ ex_op->setValue((Uint32)2, (const char *)&(master_epoch)) ||
+ ex_op->setValue((Uint32)3, (const char *)&(count)))
+ {
+ err= ex_op->getNdbError();
+ break;
+ }
+ }
+ /* copy primary keys */
+ {
+ const int fixed_cols= 4;
+ int nkey= m_pk_cols;
+ int k;
+ for (k= 0; k < nkey; k++)
+ {
+ DBUG_ASSERT(rowPtr != NULL);
+ const uchar* data=
+ (const uchar*) NdbDictionary::getValuePtr(keyRecord,
+ (const char*) rowPtr,
+ m_key_attrids[k]);
+ if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1)
+ {
+ err= ex_op->getNdbError();
+ break;
+ }
+ }
+ }
+ } while (0);
+
+ if (err.code != 0)
+ {
+ if (err.classification == NdbError::SchemaError)
+ {
+ /* Something up with Exceptions table schema, forget it.
+ * No further exceptions will be recorded.
+ * TODO : Log this somehow
+ */
+ NdbDictionary::Dictionary* dict= trans->getNdb()->getDictionary();
+ dict->removeTableGlobal(*m_ex_tab, false);
+ m_ex_tab= NULL;
+ DBUG_RETURN(0);
+ }
+ DBUG_RETURN(-1);
+ }
+ DBUG_RETURN(0);
+}
+
+/* HAVE_NDB_BINLOG */
+#endif
+
+/**
+ st_ndb_slave_state constructor
+
+ Initialise Ndb Slave state object
+*/
+st_ndb_slave_state::st_ndb_slave_state()
+ : current_master_server_epoch(0),
+ current_max_rep_epoch(0),
+ conflict_flags(0),
+ retry_trans_count(0),
+ current_trans_row_conflict_count(0),
+ current_trans_row_reject_count(0),
+ current_trans_in_conflict_count(0),
+ max_rep_epoch(0),
+ sql_run_id(~Uint32(0)),
+ trans_row_conflict_count(0),
+ trans_row_reject_count(0),
+ trans_detect_iter_count(0),
+ trans_in_conflict_count(0),
+ trans_conflict_commit_count(0),
+ trans_conflict_apply_state(SAS_NORMAL),
+ trans_dependency_tracker(NULL)
+{
+ memset(current_violation_count, 0, sizeof(current_violation_count));
+ memset(total_violation_count, 0, sizeof(total_violation_count));
+
+ /* Init conflict handling state memroot */
+ const size_t CONFLICT_MEMROOT_BLOCK_SIZE = 32768;
+ init_alloc_root(&conflict_mem_root, CONFLICT_MEMROOT_BLOCK_SIZE, 0);
+};
+
+/**
+ resetPerAttemptCounters
+
+ Reset the per-epoch-transaction-application-attempt counters
+*/
+void
+st_ndb_slave_state::resetPerAttemptCounters()
+{
+ memset(current_violation_count, 0, sizeof(current_violation_count));
+ current_trans_row_conflict_count = 0;
+ current_trans_row_reject_count = 0;
+ current_trans_in_conflict_count = 0;
+
+ conflict_flags = 0;
+ current_max_rep_epoch = 0;
+}
+
+/**
+ atTransactionAbort()
+
+ Called by Slave SQL thread during transaction abort.
+*/
+void
+st_ndb_slave_state::atTransactionAbort()
+{
+ /* Reset current-transaction counters + state */
+ resetPerAttemptCounters();
+}
+
+
+
+/**
+ atTransactionCommit()
+
+ Called by Slave SQL thread after transaction commit
+*/
+void
+st_ndb_slave_state::atTransactionCommit()
+{
+ assert( ((trans_dependency_tracker == NULL) &&
+ (trans_conflict_apply_state == SAS_NORMAL)) ||
+ ((trans_dependency_tracker != NULL) &&
+ (trans_conflict_apply_state == SAS_TRACK_TRANS_DEPENDENCIES)) );
+ assert( trans_conflict_apply_state != SAS_APPLY_TRANS_DEPENDENCIES );
+
+ /* Merge committed transaction counters into total state
+ * Then reset current transaction counters
+ */
+ for (int i=0; i < CFT_NUMBER_OF_CFTS; i++)
+ {
+ total_violation_count[i]+= current_violation_count[i];
+ }
+ trans_row_conflict_count+= current_trans_row_conflict_count;
+ trans_row_reject_count+= current_trans_row_reject_count;
+ trans_in_conflict_count+= current_trans_in_conflict_count;
+
+ if (current_trans_in_conflict_count)
+ trans_conflict_commit_count++;
+
+ if (current_max_rep_epoch > max_rep_epoch)
+ {
+ DBUG_PRINT("info", ("Max replicated epoch increases from %llu to %llu",
+ max_rep_epoch,
+ current_max_rep_epoch));
+ max_rep_epoch = current_max_rep_epoch;
+ }
+
+ resetPerAttemptCounters();
+
+ /* Clear per-epoch-transaction retry_trans_count */
+ retry_trans_count = 0;
+}
+
+/**
+ atApplyStatusWrite
+
+ Called by Slave SQL thread when applying an event to the
+ ndb_apply_status table
+*/
+void
+st_ndb_slave_state::atApplyStatusWrite(Uint32 master_server_id,
+ Uint32 row_server_id,
+ Uint64 row_epoch,
+ bool is_row_server_id_local)
+{
+ if (row_server_id == master_server_id)
+ {
+ /*
+ WRITE_ROW to ndb_apply_status injected by MySQLD
+ immediately upstream of us.
+ Record epoch
+ */
+ current_master_server_epoch = row_epoch;
+ assert(! is_row_server_id_local);
+ }
+ else if (is_row_server_id_local)
+ {
+ DBUG_PRINT("info", ("Recording application of local server %u epoch %llu "
+ " which is %s.",
+ row_server_id, row_epoch,
+ (row_epoch > current_max_rep_epoch)?
+ " new highest." : " older than previously applied"));
+ if (row_epoch > current_max_rep_epoch)
+ {
+ /*
+ Store new highest epoch in thdvar. If we commit successfully
+ then this can become the new global max
+ */
+ current_max_rep_epoch = row_epoch;
+ }
+ }
+}
+
+/**
+ atResetSlave()
+
+ Called when RESET SLAVE command issued - in context of command client.
+*/
+void
+st_ndb_slave_state::atResetSlave()
+{
+ /* Reset the Maximum replicated epoch vars
+ * on slave reset
+ * No need to touch the sql_run_id as that
+ * will increment if the slave is started
+ * again.
+ */
+ resetPerAttemptCounters();
+
+ retry_trans_count = 0;
+ max_rep_epoch = 0;
+}
+
+
+/**
+ atStartSlave()
+
+ Called by Slave SQL thread when first applying a row to Ndb after
+ a START SLAVE command.
+*/
+void
+st_ndb_slave_state::atStartSlave()
+{
+#ifdef HAVE_NDB_BINLOG
+ if (trans_conflict_apply_state != SAS_NORMAL)
+ {
+ /*
+ Remove conflict handling state on a SQL thread
+ restart
+ */
+ atEndTransConflictHandling();
+ trans_conflict_apply_state = SAS_NORMAL;
+ }
+#endif
+};
+
+#ifdef HAVE_NDB_BINLOG
+
+/**
+ atEndTransConflictHandling
+
+ Called when transactional conflict handling has completed.
+*/
+void
+st_ndb_slave_state::atEndTransConflictHandling()
+{
+ DBUG_ENTER("atEndTransConflictHandling");
+ /* Release any conflict handling state */
+ if (trans_dependency_tracker)
+ {
+ current_trans_in_conflict_count =
+ trans_dependency_tracker->get_conflict_count();
+ trans_dependency_tracker = NULL;
+ free_root(&conflict_mem_root, MY_MARK_BLOCKS_FREE);
+ }
+ DBUG_VOID_RETURN;
+};
+
+/**
+ atBeginTransConflictHandling()
+
+ Called by Slave SQL thread when it determines that Transactional
+ Conflict handling is required
+*/
+void
+st_ndb_slave_state::atBeginTransConflictHandling()
+{
+ DBUG_ENTER("atBeginTransConflictHandling");
+ /*
+ Allocate and initialise Transactional Conflict
+ Resolution Handling Structures
+ */
+ assert(trans_dependency_tracker == NULL);
+ trans_dependency_tracker = DependencyTracker::newDependencyTracker(&conflict_mem_root);
+ DBUG_VOID_RETURN;
+};
+
+/**
+ atPrepareConflictDetection
+
+ Called by Slave SQL thread prior to defining an operation on
+ a table with conflict detection defined.
+*/
+int
+st_ndb_slave_state::atPrepareConflictDetection(const NdbDictionary::Table* table,
+ const NdbRecord* key_rec,
+ const uchar* row_data,
+ Uint64 transaction_id,
+ bool& handle_conflict_now)
+{
+ DBUG_ENTER("atPrepareConflictDetection");
+ /*
+ Slave is preparing to apply an operation with conflict detection.
+ If we're performing Transactional Conflict Resolution, take
+ extra steps
+ */
+ switch( trans_conflict_apply_state )
+ {
+ case SAS_NORMAL:
+ DBUG_PRINT("info", ("SAS_NORMAL : No special handling"));
+ /* No special handling */
+ break;
+ case SAS_TRACK_TRANS_DEPENDENCIES:
+ {
+ DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Tracking operation"));
+ /*
+ Track this operation and its transaction id, to determine
+ inter-transaction dependencies by {table, primary key}
+ */
+ assert( trans_dependency_tracker );
+
+ int res = trans_dependency_tracker
+ ->track_operation(table,
+ key_rec,
+ row_data,
+ transaction_id);
+ if (res != 0)
+ {
+ sql_print_error("%s", trans_dependency_tracker->get_error_text());
+ DBUG_RETURN(res);
+ }
+ /* Proceed as normal */
+ break;
+ }
+ case SAS_APPLY_TRANS_DEPENDENCIES:
+ {
+ DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Deciding whether to apply"));
+ /*
+ Check if this operation's transaction id is marked in-conflict.
+ If it is, we tell the caller to perform conflict resolution now instead
+ of attempting to apply the operation.
+ */
+ assert( trans_dependency_tracker );
+
+ if (trans_dependency_tracker->in_conflict(transaction_id))
+ {
+ DBUG_PRINT("info", ("Event for transaction %llu is conflicting. Handling.",
+ transaction_id));
+ current_trans_row_reject_count++;
+ handle_conflict_now = true;
+ DBUG_RETURN(0);
+ }
+
+ /*
+ This transaction is not marked in-conflict, so continue with normal
+ processing.
+ Note that normal processing may subsequently detect a conflict which
+ didn't exist at the time of the previous TRACK_DEPENDENCIES pass.
+ In this case, we will rollback and repeat the TRACK_DEPENDENCIES
+ stage.
+ */
+ DBUG_PRINT("info", ("Event for transaction %llu is OK, applying",
+ transaction_id));
+ break;
+ }
+ }
+ DBUG_RETURN(0);
+}
+
+/**
+ atTransConflictDetected
+
+ Called by the Slave SQL thread when a conflict is detected on
+ an executed operation.
+*/
+int
+st_ndb_slave_state::atTransConflictDetected(Uint64 transaction_id)
+{
+ DBUG_ENTER("atTransConflictDetected");
+
+ /*
+ The Slave has detected a conflict on an operation applied
+ to a table with Transactional Conflict Resolution defined.
+ Handle according to current state.
+ */
+ conflict_flags |= SCS_TRANS_CONFLICT_DETECTED_THIS_PASS;
+ current_trans_row_conflict_count++;
+
+ switch (trans_conflict_apply_state)
+ {
+ case SAS_NORMAL:
+ {
+ DBUG_PRINT("info", ("SAS_NORMAL : Conflict on op on table with trans detection."
+ "Requires multi-pass resolution. Will transition to "
+ "SAS_TRACK_TRANS_DEPENDENCIES at Commit."));
+ /*
+ Conflict on table with transactional conflict resolution
+ defined.
+ This is the trigger that we will do transactional conflict
+ resolution.
+ Record that we need to do multiple passes to correctly
+ perform resolution.
+ TODO : Early exit from applying epoch?
+ */
+ break;
+ }
+ case SAS_TRACK_TRANS_DEPENDENCIES:
+ {
+ DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Operation in transaction %llu "
+ "had conflict",
+ transaction_id));
+ /*
+ Conflict on table with transactional conflict resolution
+ defined.
+ We will mark the operation's transaction_id as in-conflict,
+ so that any other operations on the transaction are also
+ considered in-conflict, and any dependent transactions are also
+ considered in-conflict.
+ */
+ assert(trans_dependency_tracker != NULL);
+ int res = trans_dependency_tracker
+ ->mark_conflict(transaction_id);
+
+ if (res != 0)
+ {
+ sql_print_error("%s", trans_dependency_tracker->get_error_text());
+ DBUG_RETURN(res);
+ }
+ break;
+ }
+ case SAS_APPLY_TRANS_DEPENDENCIES:
+ {
+ /*
+ This must be a new conflict, not noticed on the previous
+ pass.
+ */
+ DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Conflict detected. "
+ "Must be further conflict. Will return to "
+ "SAS_TRACK_TRANS_DEPENDENCIES state at commit."));
+ // TODO : Early exit from applying epoch
+ break;
+ }
+ default:
+ break;
+ }
+
+ DBUG_RETURN(0);
+}
+
+/**
+ atConflictPreCommit
+
+ Called by the Slave SQL thread prior to committing a Slave transaction.
+ This method can request that the Slave transaction is retried.
+
+
+ State transitions :
+
+ START SLAVE /
+ RESET SLAVE /
+ STARTUP
+ |
+ |
+ v
+ ****************
+ * SAS_NORMAL *
+ ****************
+ ^ |
+ No transactional | | Conflict on transactional table
+ conflicts | | (Rollback)
+ (Commit) | |
+ | v
+ **********************************
+ * SAS_TRACK_TRANS_DEPENDENCIES *
+ **********************************
+ ^ I ^
+ More I I Dependencies |
+ conflicts I I determined | No new conflicts
+ found I I (Rollback) | (Commit)
+ (Rollback) I I |
+ I v |
+ **********************************
+ * SAS_APPLY_TRANS_DEPENDENCIES *
+ **********************************
+
+
+ Operation
+ The initial state is SAS_NORMAL.
+
+ On detecting a conflict on a transactional conflict detetecing table,
+ SAS_TRACK_TRANS_DEPENDENCIES is entered, and the epoch transaction is
+ rolled back and reapplied.
+
+ In SAS_TRACK_TRANS_DEPENDENCIES state, transaction dependencies and
+ conflicts are tracked as the epoch transaction is applied.
+
+ Then the Slave transitions to SAS_APPLY_TRANS_DEPENDENCIES state, and
+ the epoch transaction is rolled back and reapplied.
+
+ In the SAS_APPLY_TRANS_DEPENDENCIES state, operations for transactions
+ marked as in-conflict are not applied.
+
+ If this results in no new conflicts, the epoch transaction is committed,
+ and the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered for processing
+ the next replicated epch transaction.
+ If it results in new conflicts, the epoch transactions is rolled back, and
+ the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered again, to determine
+ the new set of dependencies.
+
+ If no conflicts are found in the SAS_TRACK_TRANS_DEPENDENCIES state, then
+ the epoch transaction is committed, and the Slave transitions to SAS_NORMAL
+ state.
+
+
+ Properties
+ 1) Normally, there is no transaction dependency tracking overhead paid by
+ the slave.
+
+ 2) On first detecting a transactional conflict, the epoch transaction must be
+ applied at least three times, with two rollbacks.
+
+ 3) Transactional conflicts detected in subsequent epochs require the epoch
+ transaction to be applied two times, with one rollback.
+
+ 4) A loop between states SAS_TRACK_TRANS_DEPENDENCIES and SAS_APPLY_TRANS_
+ DEPENDENCIES occurs when further transactional conflicts are discovered
+ in SAS_APPLY_TRANS_DEPENDENCIES state. This implies that the conflicts
+ discovered in the SAS_TRACK_TRANS_DEPENDENCIES state must not be complete,
+ so we revisit that state to get a more complete picture.
+
+ 5) The number of iterations of this loop is fixed to a hard coded limit, after
+ which the Slave will stop with an error. This should be an unlikely
+ occurrence, as it requires not just n conflicts, but at least 1 new conflict
+ appearing between the transactions in the epoch transaction and the
+ database between the two states, n times in a row.
+
+ 6) Where conflicts are occasional, as expected, the post-commit transition to
+ SAS_TRACK_TRANS_DEPENDENCIES rather than SAS_NORMAL results in one epoch
+ transaction having its transaction dependencies needlessly tracked.
+
+*/
+int
+st_ndb_slave_state::atConflictPreCommit(bool& retry_slave_trans)
+{
+ DBUG_ENTER("atConflictPreCommit");
+
+ /*
+ Prior to committing a Slave transaction, we check whether
+ Transactional conflicts have been detected which require
+ us to retry the slave transaction
+ */
+ retry_slave_trans = false;
+ switch(trans_conflict_apply_state)
+ {
+ case SAS_NORMAL:
+ {
+ DBUG_PRINT("info", ("SAS_NORMAL"));
+ /*
+ Normal case. Only if we defined conflict detection on a table
+ with transactional conflict detection, and saw conflicts (on any table)
+ do we go to another state
+ */
+ if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
+ {
+ DBUG_PRINT("info", ("Conflict(s) detected this pass, transitioning to "
+ "SAS_TRACK_TRANS_DEPENDENCIES."));
+ assert(conflict_flags & SCS_OPS_DEFINED);
+ /* Transactional conflict resolution required, switch state */
+ atBeginTransConflictHandling();
+ resetPerAttemptCounters();
+ trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
+ retry_slave_trans = true;
+ }
+ break;
+ }
+ case SAS_TRACK_TRANS_DEPENDENCIES:
+ {
+ DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES"));
+
+ if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
+ {
+ /*
+ Conflict on table with transactional detection
+ this pass, we have collected the details and
+ dependencies, now transition to
+ SAS_APPLY_TRANS_DEPENDENCIES and
+ reapply the epoch transaction without the
+ conflicting transactions.
+ */
+ assert(conflict_flags & SCS_OPS_DEFINED);
+ DBUG_PRINT("info", ("Transactional conflicts, transitioning to "
+ "SAS_APPLY_TRANS_DEPENDENCIES"));
+
+ trans_conflict_apply_state = SAS_APPLY_TRANS_DEPENDENCIES;
+ trans_detect_iter_count++;
+ retry_slave_trans = true;
+ break;
+ }
+ else
+ {
+ /*
+ No transactional conflicts detected this pass, lets
+ return to SAS_NORMAL state after commit for more efficient
+ application of epoch transactions
+ */
+ DBUG_PRINT("info", ("No transactional conflicts, transitioning to "
+ "SAS_NORMAL"));
+ atEndTransConflictHandling();
+ trans_conflict_apply_state = SAS_NORMAL;
+ break;
+ }
+ }
+ case SAS_APPLY_TRANS_DEPENDENCIES:
+ {
+ DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES"));
+ assert(conflict_flags & SCS_OPS_DEFINED);
+ /*
+ We've applied the Slave epoch transaction subject to the
+ conflict detection. If any further transactional
+ conflicts have been observed, then we must repeat the
+ process.
+ */
+ atEndTransConflictHandling();
+ atBeginTransConflictHandling();
+ trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
+
+ if (unlikely(conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS))
+ {
+ DBUG_PRINT("info", ("Further conflict(s) detected, repeating the "
+ "TRACK_TRANS_DEPENDENCIES pass"));
+ /*
+ Further conflict observed when applying, need
+ to re-determine dependencies
+ */
+ resetPerAttemptCounters();
+ retry_slave_trans = true;
+ break;
+ }
+
+
+ DBUG_PRINT("info", ("No further conflicts detected, committing and "
+ "returning to SAS_TRACK_TRANS_DEPENDENCIES state"));
+ /*
+ With dependencies taken into account, no further
+ conflicts detected, can now proceed to commit
+ */
+ break;
+ }
+ }
+
+ /*
+ Clear conflict flags, to ensure that we detect any new conflicts
+ */
+ conflict_flags = 0;
+
+ if (retry_slave_trans)
+ {
+ DBUG_PRINT("info", ("Requesting transaction restart"));
+ DBUG_RETURN(1);
+ }
+
+ DBUG_PRINT("info", ("Allowing commit to proceed"));
+ DBUG_RETURN(0);
+}
+
+/* HAVE_NDB_BINLOG */
+#endif
+
+/* WITH_NDBCLUSTER_STORAGE_ENGINE */
+#endif
=== added file 'sql/ndb_conflict.h'
--- a/sql/ndb_conflict.h 1970-01-01 00:00:00 +0000
+++ b/sql/ndb_conflict.h 2012-01-31 10:01:22 +0000
@@ -0,0 +1,316 @@
+/*
+ Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef NDB_CONFLICT_H
+#define NDB_CONFLICT_H
+
+#include "ha_ndbcluster_glue.h"
+#include "ndb_conflict_trans.h"
+#include <ndbapi/NdbDictionary.hpp>
+#include <ndbapi/NdbTransaction.hpp>
+
+enum enum_conflict_fn_type
+{
+ CFT_NDB_UNDEF = 0
+ ,CFT_NDB_MAX
+ ,CFT_NDB_OLD
+ ,CFT_NDB_MAX_DEL_WIN
+ ,CFT_NDB_EPOCH
+ ,CFT_NDB_EPOCH_TRANS
+ ,CFT_NUMBER_OF_CFTS /* End marker */
+};
+
+#ifdef HAVE_NDB_BINLOG
+static const Uint32 MAX_CONFLICT_ARGS= 8;
+
+enum enum_conflict_fn_arg_type
+{
+ CFAT_END
+ ,CFAT_COLUMN_NAME
+ ,CFAT_EXTRA_GCI_BITS
+};
+
+struct st_conflict_fn_arg
+{
+ enum_conflict_fn_arg_type type;
+ union
+ {
+ char resolveColNameBuff[ NAME_CHAR_LEN + 1 ]; // CFAT_COLUMN_NAME
+ uint32 extraGciBits; // CFAT_EXTRA_GCI_BITS
+ };
+};
+
+struct st_conflict_fn_arg_def
+{
+ enum enum_conflict_fn_arg_type arg_type;
+ bool optional;
+};
+
+/* What type of operation was issued */
+enum enum_conflicting_op_type
+{ /* NdbApi */
+ WRITE_ROW, /* insert (!write) */
+ UPDATE_ROW, /* update */
+ DELETE_ROW, /* delete */
+ REFRESH_ROW /* refresh */
+};
+
+/*
+ Room for 10 instruction words, two labels (@ 2words/label)
+ + 2 extra words for the case of resolve_size == 8
+*/
+#define MAX_CONFLICT_INTERPRETED_PROG_SIZE 16
+
+/*
+ prepare_detect_func
+
+ Type of function used to prepare for conflict detection on
+ an NdbApi operation
+*/
+typedef int (* prepare_detect_func) (struct st_ndbcluster_conflict_fn_share* cfn_share,
+ enum_conflicting_op_type op_type,
+ const NdbRecord* data_record,
+ const uchar* old_data,
+ const uchar* new_data,
+ const MY_BITMAP* write_set,
+ class NdbInterpretedCode* code);
+
+enum enum_conflict_fn_flags
+{
+ CF_TRANSACTIONAL = 1
+};
+
+struct st_conflict_fn_def
+{
+ const char *name;
+ enum_conflict_fn_type type;
+ const st_conflict_fn_arg_def* arg_defs;
+ prepare_detect_func prep_func;
+ uint8 flags; /* enum_conflict_fn_flags */
+};
+
+/* What sort of conflict was found */
+enum enum_conflict_cause
+{
+ ROW_ALREADY_EXISTS, /* On insert */
+ ROW_DOES_NOT_EXIST, /* On Update, Delete */
+ ROW_IN_CONFLICT, /* On Update, Delete */
+ TRANS_IN_CONFLICT /* Any of above, or implied by transaction */
+};
+
+/* NdbOperation custom data which points out handler and record. */
+struct Ndb_exceptions_data {
+ struct NDB_SHARE* share;
+ const NdbRecord* key_rec;
+ const uchar* row;
+ enum_conflicting_op_type op_type;
+ Uint64 trans_id;
+};
+
+enum enum_conflict_fn_table_flags
+{
+ CFF_NONE = 0,
+ CFF_REFRESH_ROWS = 1
+};
+
+/*
+ Maximum supported key parts (16)
+ (Ndb supports 32, but MySQL has a lower limit)
+*/
+static const int NDB_MAX_KEY_PARTS = MAX_REF_PARTS;
+
+/**
+ ExceptionsTableWriter
+
+ Helper class for inserting entries into an exceptions
+ table
+*/
+class ExceptionsTableWriter
+{
+public:
+ ExceptionsTableWriter()
+ :m_pk_cols(0), m_ex_tab(NULL), m_count(0)
+ {};
+
+ ~ExceptionsTableWriter()
+ {};
+
+ /**
+ hasTable
+
+ Returns true if there is an Exceptions table
+ */
+ bool hasTable() const
+ {
+ return m_ex_tab != NULL;
+ };
+
+ /**
+ init
+
+ Initialise ExceptionsTableWriter with main and exceptions
+ tables.
+
+ May set a warning message on success or error.
+ */
+ int init(const NdbDictionary::Table* mainTable,
+ const NdbDictionary::Table* exceptionsTable,
+ char* msg_buf,
+ uint msg_buf_len,
+ const char** msg);
+
+ /**
+ free
+
+ Release reference to exceptions table
+ */
+ void free(Ndb* ndb);
+
+ /**
+ writeRow
+
+ Write a row to the Exceptions Table for the given
+ key
+ */
+ int writeRow(NdbTransaction* trans,
+ const NdbRecord* keyRecord,
+ uint32 server_id,
+ uint32 master_server_id,
+ uint64 master_epoch,
+ const uchar* rowPtr,
+ NdbError& err);
+
+private:
+ /* info about original table */
+ uint8 m_pk_cols;
+ uint16 m_key_attrids[ NDB_MAX_KEY_PARTS ];
+
+ const NdbDictionary::Table *m_ex_tab;
+ uint32 m_count;
+};
+
+typedef struct st_ndbcluster_conflict_fn_share {
+ const st_conflict_fn_def* m_conflict_fn;
+
+ /* info about original table */
+ uint16 m_resolve_column;
+ uint8 m_resolve_size;
+ uint8 m_flags;
+
+ ExceptionsTableWriter m_ex_tab_writer;
+} NDB_CONFLICT_FN_SHARE;
+
+
+/* HAVE_NDB_BINLOG */
+#endif
+
+enum enum_slave_trans_conflict_apply_state
+{
+ /* Normal with optional row-level conflict detection */
+ SAS_NORMAL,
+
+ /*
+ SAS_TRACK_TRANS_DEPENDENCIES
+ Track inter-transaction dependencies
+ */
+ SAS_TRACK_TRANS_DEPENDENCIES,
+
+ /*
+ SAS_APPLY_TRANS_DEPENDENCIES
+ Apply only non conflicting transactions
+ */
+ SAS_APPLY_TRANS_DEPENDENCIES
+};
+
+enum enum_slave_conflict_flags
+{
+ /* Conflict detection Ops defined */
+ SCS_OPS_DEFINED = 1,
+ /* Conflict detected on table with transactional resolution */
+ SCS_TRANS_CONFLICT_DETECTED_THIS_PASS = 2
+};
+
+/*
+ State associated with the Slave thread
+ (From the Ndb handler's point of view)
+*/
+struct st_ndb_slave_state
+{
+ /* Counter values for current slave transaction */
+ Uint32 current_violation_count[CFT_NUMBER_OF_CFTS];
+ Uint64 current_master_server_epoch;
+ Uint64 current_max_rep_epoch;
+ uint8 conflict_flags; /* enum_slave_conflict_flags */
+ /* Transactional conflict detection */
+ Uint32 retry_trans_count;
+ Uint32 current_trans_row_conflict_count;
+ Uint32 current_trans_row_reject_count;
+ Uint32 current_trans_in_conflict_count;
+
+ /* Cumulative counter values */
+ Uint64 total_violation_count[CFT_NUMBER_OF_CFTS];
+ Uint64 max_rep_epoch;
+ Uint32 sql_run_id;
+ /* Transactional conflict detection */
+ Uint64 trans_row_conflict_count;
+ Uint64 trans_row_reject_count;
+ Uint64 trans_detect_iter_count;
+ Uint64 trans_in_conflict_count;
+ Uint64 trans_conflict_commit_count;
+
+ static const Uint32 MAX_RETRY_TRANS_COUNT = 100;
+
+ /*
+ Slave Apply State
+
+ State of Binlog application from Ndb point of view.
+ */
+ enum_slave_trans_conflict_apply_state trans_conflict_apply_state;
+
+ MEM_ROOT conflict_mem_root;
+ class DependencyTracker* trans_dependency_tracker;
+
+ /* Methods */
+ void atStartSlave();
+ int atPrepareConflictDetection(const NdbDictionary::Table* table,
+ const NdbRecord* key_rec,
+ const uchar* row_data,
+ Uint64 transaction_id,
+ bool& handle_conflict_now);
+ int atTransConflictDetected(Uint64 transaction_id);
+ int atConflictPreCommit(bool& retry_slave_trans);
+
+ void atBeginTransConflictHandling();
+ void atEndTransConflictHandling();
+
+ void atTransactionCommit();
+ void atTransactionAbort();
+ void atResetSlave();
+
+ void atApplyStatusWrite(Uint32 master_server_id,
+ Uint32 row_server_id,
+ Uint64 row_epoch,
+ bool is_row_server_id_local);
+
+ void resetPerAttemptCounters();
+
+ st_ndb_slave_state();
+};
+
+
+/* NDB_CONFLICT_H */
+#endif
=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt 2012-01-25 17:15:41 +0000
+++ b/storage/ndb/CMakeLists.txt 2012-01-31 10:24:30 +0000
@@ -180,7 +180,8 @@ SET(NDBCLUSTER_SOURCES
../../sql/ndb_mi.cc
../../sql/ndb_conflict_trans.cc
../../sql/ndb_component.cc
- ../../sql/ndb_repl_tab.cc)
+ ../../sql/ndb_repl_tab.cc
+ ../../sql/ndb_conflict.cc)
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/storage/ndb/include)
IF(EXISTS ${CMAKE_SOURCE_DIR}/storage/mysql_storage_engine.cmake)
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.1 branch (frazer.clement:4449 to 4450) | Frazer Clement | 31 Jan |