From: Frazer Clement Date: January 31 2012 11:20am Subject: bzr push into mysql-5.5-cluster-7.2 branch (frazer.clement:3799 to 3800) List-Archive: http://lists.mysql.com/commits/142667 Message-Id: <201201311120.q0VBK7Zf022346@acsmt356.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3800 Frazer Clement 2012-01-31 [merge] Merge 7.1->7.2 added: sql/ndb_conflict.cc sql/ndb_conflict.h modified: mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test sql/ha_ndbcluster.cc sql/ha_ndbcluster.h sql/ha_ndbcluster_binlog.cc sql/ndb_share.cc sql/ndb_share.h sql/ndb_thd.cc storage/ndb/CMakeLists.txt 3799 jonas oreland 2012-01-31 [merge] ndb - merge 71 to 72 modified: storage/ndb/src/kernel/vm/mt.cpp === modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result' --- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result 2012-01-23 15:47:46 +0000 +++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_basic.result 2012-01-31 10:01:22 +0000 @@ -274,6 +274,142 @@ relevant [note] ndb slave: table test.t1allsame using conflict_fn ndb$max on attribute x. [note] ndb slave: table test.t2_max using conflict_fn ndb$max on attribute x. drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX; +delete from mysql.ndb_replication; +Test exceptions table schema flexibility +insert into mysql.ndb_replication values ("test", "t1", 0, 7, "NDB$MAX(X)"); +Test 'normal' mandatory column names + all table pks +create table test.t1$EX( +server_id int unsigned, +master_server_id int unsigned, +master_epoch bigint unsigned, +count int unsigned, +a int not null, +b int not null, +c int not null, +primary key (server_id, master_server_id, master_epoch, count)) engine=ndb; +create table test.t1 (a int, b int, c int, d int, e int, X int unsigned, +primary key(a,b,c)) engine=ndb; +Generate a conflict on the slave +insert into test.t1 values (1,1,1,1,1,1); +update test.t1 set X=0 where a=1 and b=1 and c=1; +Check that conflict has been recorded. +select * from test.t1$EX; +server_id master_server_id master_epoch count a b c +2 1 1 1 1 1 +drop table test.t1; +drop table test.t1$EX; +Test 'normal' mandatory column names + all table pks + +extra columns with same and different names to main table columns +Also a defaulted extra column. +create table test.t1$EX( +server_id int unsigned, +master_server_id int unsigned, +master_epoch bigint unsigned, +count int unsigned, +a int not null, +b int not null, +c int not null, +d int, # Same name as main table, but user defined +lilljeholmen varchar(50) default 'Slussen', +# Separate, user defined +primary key (server_id, master_server_id, master_epoch, count)) engine=ndb; +create table test.t1 (a int, b int, c int, d int, e int, X int unsigned, +primary key(a,b,c)) engine=ndb; +Generate a conflict on the slave +insert into test.t1 values (1,1,1,1,1,1); +update test.t1 set X=0 where a=1 and b=1 and c=1; +Check that conflict has been recorded. +select * from test.t1$EX; +server_id master_server_id master_epoch count a b c d lilljeholmen +2 1 1 1 1 1 NULL Slussen +drop table test.t1; +drop table test.t1$EX; +Test unusual mandatory column names + all table pks + +extra columns with same and different names to main table columns +Also a defaulted extra column. +create table test.t1$EX( +monteverdi int unsigned, +asparagi int unsigned, +plenipotentiary bigint unsigned, +mountebank int unsigned, +a int not null, +b int not null, +c int not null, +d int, # Same name as main table, but user defined +lilljeholmen varchar(50) default 'Slussen', +# Separate, user defined +primary key (monteverdi, asparagi, plenipotentiary, mountebank)) engine=ndb; +create table test.t1 (a int, b int, c int, d int, e int, X int unsigned, +primary key(a,b,c)) engine=ndb; +Generate a conflict on the slave +insert into test.t1 values (1,1,1,1,1,1); +update test.t1 set X=0 where a=1 and b=1 and c=1; +Check that conflict has been recorded. +select * from test.t1$EX; +monteverdi asparagi plenipotentiary mountebank a b c d lilljeholmen +2 1 1 1 1 1 NULL Slussen +drop table test.t1; +drop table test.t1$EX; +Test unusual mandatory column names + all table pks which are same +as 'normal' exceptions table column names plus extra columns with +same and different names to main table columns +Also a defaulted extra column. +create table test.t1$EX( +monteverdi int unsigned, +asparagi int unsigned, +plenipotentiary bigint unsigned, +mountebank int unsigned, +server_id int unsigned not null, +master_server_id int unsigned not null, +master_epoch bigint unsigned not null, +count int unsigned not null, +d int, # Same name as main table, but user defined +lilljeholmen varchar(50) default 'Slussen', +# Separate, user defined +primary key (monteverdi, asparagi, plenipotentiary, mountebank)) engine=ndb; +create table test.t1 (server_id int unsigned, +master_server_id int unsigned, +master_epoch bigint unsigned, +count int unsigned, +d int, e int, X int unsigned, +primary key(server_id, master_server_id, +master_epoch, count)) engine=ndb; +Generate a conflict on the slave +insert into test.t1 values (1,1,1,1,1,1,1); +update test.t1 set X=0 where server_id=1 and master_server_id=1 and master_epoch=1 and count=1; +Check that conflict has been recorded. +select * from test.t1$EX; +monteverdi asparagi plenipotentiary mountebank server_id master_server_id master_epoch count d lilljeholmen +2 1 1 1 1 1 1 NULL Slussen +drop table test.t1; +drop table test.t1$EX; +call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*"); +call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*"); +call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*"); +And some bad exceptions table schemata +Keys in wrong positions +create table test.t1$EX( +a int not null, +b int not null, +c int not null, +d int, # Same name as main table, but user defined +lilljeholmen varchar(50) default 'Slussen', +# Separate, user defined +server_id int unsigned, +master_server_id int unsigned, +master_epoch bigint unsigned, +count int unsigned, +primary key (server_id, master_server_id, master_epoch, count)) engine=ndb; +create table test.t1 (a int, b int, c int, d int, e int, X int unsigned, +primary key(a,b,c)) engine=ndb; +show warnings; +Level Code Message +MySQLD error output for server 1.1 matching pattern %NDB Slave% +relevant +[note] ndb slave: table test.t1 using conflict_fn ndb$max on attribute x. +[warning] ndb slave: exceptions table t1$ex has wrong definition (initial 4 columns) +drop table test.t1; +drop table test.t1$EX; "Cleanup" drop table mysql.ndb_replication; include/rpl_end.inc === modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test' --- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test 2012-01-23 15:47:46 +0000 +++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_basic.test 2012-01-31 10:01:22 +0000 @@ -373,6 +373,208 @@ show variables like 'server_id'; --connection master drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX; +delete from mysql.ndb_replication; + +--echo Test exceptions table schema flexibility +# +# An exceptions table should be able to have the mandatory columns with +# different names, as long as the types match. +# Also, not all main table primary key parts need be present +# Finally, arbitrary extra columns should be allowed, as long as +# they can be defaulted. +# + +insert into mysql.ndb_replication values ("test", "t1", 0, 7, "NDB$MAX(X)"); + +--echo Test 'normal' mandatory column names + all table pks + +create table test.t1$EX( + server_id int unsigned, + master_server_id int unsigned, + master_epoch bigint unsigned, + count int unsigned, + a int not null, + b int not null, + c int not null, + primary key (server_id, master_server_id, master_epoch, count)) engine=ndb; + +create table test.t1 (a int, b int, c int, d int, e int, X int unsigned, + primary key(a,b,c)) engine=ndb; + +--echo Generate a conflict on the slave + +insert into test.t1 values (1,1,1,1,1,1); +--sync_slave_with_master slave +--connection master +update test.t1 set X=0 where a=1 and b=1 and c=1; +--sync_slave_with_master slave +--connection slave + +--echo Check that conflict has been recorded. +--replace_column 3 +select * from test.t1$EX; + +--connection master +drop table test.t1; +drop table test.t1$EX; + +--echo Test 'normal' mandatory column names + all table pks + +--echo extra columns with same and different names to main table columns +--echo Also a defaulted extra column. + +create table test.t1$EX( + server_id int unsigned, + master_server_id int unsigned, + master_epoch bigint unsigned, + count int unsigned, + a int not null, + b int not null, + c int not null, + d int, # Same name as main table, but user defined + lilljeholmen varchar(50) default 'Slussen', + # Separate, user defined + primary key (server_id, master_server_id, master_epoch, count)) engine=ndb; + +create table test.t1 (a int, b int, c int, d int, e int, X int unsigned, + primary key(a,b,c)) engine=ndb; + +--echo Generate a conflict on the slave + +insert into test.t1 values (1,1,1,1,1,1); +--sync_slave_with_master slave +--connection master +update test.t1 set X=0 where a=1 and b=1 and c=1; +--sync_slave_with_master slave +--connection slave + +--echo Check that conflict has been recorded. +--replace_column 3 +select * from test.t1$EX; + +--connection master +drop table test.t1; +drop table test.t1$EX; + +--echo Test unusual mandatory column names + all table pks + +--echo extra columns with same and different names to main table columns +--echo Also a defaulted extra column. + +create table test.t1$EX( + monteverdi int unsigned, + asparagi int unsigned, + plenipotentiary bigint unsigned, + mountebank int unsigned, + a int not null, + b int not null, + c int not null, + d int, # Same name as main table, but user defined + lilljeholmen varchar(50) default 'Slussen', + # Separate, user defined + primary key (monteverdi, asparagi, plenipotentiary, mountebank)) engine=ndb; + +create table test.t1 (a int, b int, c int, d int, e int, X int unsigned, + primary key(a,b,c)) engine=ndb; + +--echo Generate a conflict on the slave + +insert into test.t1 values (1,1,1,1,1,1); +--sync_slave_with_master slave +--connection master +update test.t1 set X=0 where a=1 and b=1 and c=1; +--sync_slave_with_master slave +--connection slave + +--echo Check that conflict has been recorded. +--replace_column 3 +select * from test.t1$EX; + +--connection master +drop table test.t1; +drop table test.t1$EX; + +--echo Test unusual mandatory column names + all table pks which are same +--echo as 'normal' exceptions table column names plus extra columns with +--echo same and different names to main table columns +--echo Also a defaulted extra column. + +create table test.t1$EX( + monteverdi int unsigned, + asparagi int unsigned, + plenipotentiary bigint unsigned, + mountebank int unsigned, + server_id int unsigned not null, + master_server_id int unsigned not null, + master_epoch bigint unsigned not null, + count int unsigned not null, + d int, # Same name as main table, but user defined + lilljeholmen varchar(50) default 'Slussen', + # Separate, user defined + primary key (monteverdi, asparagi, plenipotentiary, mountebank)) engine=ndb; + +create table test.t1 (server_id int unsigned, + master_server_id int unsigned, + master_epoch bigint unsigned, + count int unsigned, + d int, e int, X int unsigned, + primary key(server_id, master_server_id, + master_epoch, count)) engine=ndb; + +--echo Generate a conflict on the slave + +insert into test.t1 values (1,1,1,1,1,1,1); +--sync_slave_with_master slave +--connection master +update test.t1 set X=0 where server_id=1 and master_server_id=1 and master_epoch=1 and count=1; +--sync_slave_with_master slave +--connection slave + +--echo Check that conflict has been recorded. +--replace_column 3 +select * from test.t1$EX; + +--connection master +drop table test.t1; +drop table test.t1$EX; + +--connection server1 +call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*"); +--connection server2 +call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*"); +--connection slave +call mtr.add_suppression("NDB Slave: exceptions table .* has wrong definition .*"); +--connection master + +--echo And some bad exceptions table schemata +--echo Keys in wrong positions +create table test.t1$EX( + a int not null, + b int not null, + c int not null, + d int, # Same name as main table, but user defined + lilljeholmen varchar(50) default 'Slussen', + # Separate, user defined + server_id int unsigned, + master_server_id int unsigned, + master_epoch bigint unsigned, + count int unsigned, + primary key (server_id, master_server_id, master_epoch, count)) engine=ndb; + +create table test.t1 (a int, b int, c int, d int, e int, X int unsigned, + primary key(a,b,c)) engine=ndb; +show warnings; + +--let $server_num=1.1 +--let $pattern=%NDB Slave% +--let $limit=2 + +--source suite/ndb_rpl/t/show_mysqld_warnings.inc + +drop table test.t1; +drop table test.t1$EX; + + + + ############### --echo "Cleanup" === modified file 'sql/ha_ndbcluster.cc' --- a/sql/ha_ndbcluster.cc 2012-01-25 17:50:29 +0000 +++ b/sql/ha_ndbcluster.cc 2012-01-31 11:11:20 +0000 @@ -50,7 +50,7 @@ #include #include #include "ndb_mi.h" -#include "ndb_conflict_trans.h" +#include "ndb_conflict.h" #include "ndb_anyvalue.h" #include "ndb_binlog_extra_row_info.h" #include "ndb_event_data.h" @@ -386,11 +386,6 @@ ndbcluster_alter_table_flags(uint flags) #define NDB_AUTO_INCREMENT_RETRIES 100 #define BATCH_FLUSH_SIZE (32768) -/* - Room for 10 instruction words, two labels (@ 2words/label) - + 2 extra words for the case of resolve_size == 8 -*/ -#define MAX_CONFLICT_INTERPRETED_PROG_SIZE 16 static int ndb_to_mysql_error(const NdbError *ndberr); @@ -870,8 +865,7 @@ static int ndb_to_mysql_error(const NdbE #ifdef HAVE_NDB_BINLOG int -handle_conflict_op_error(Thd_ndb* thd_ndb, - NdbTransaction* trans, +handle_conflict_op_error(NdbTransaction* trans, const NdbError& err, const NdbOperation* op); @@ -937,8 +931,7 @@ check_completed_operations_pre_commit(Th if (err.classification != NdbError::NoError) { - int res = handle_conflict_op_error(thd_ndb, - trans, + int res = handle_conflict_op_error(trans, err, first); if (res != 0) @@ -2128,11 +2121,6 @@ int ha_ndbcluster::get_metadata(THD *thd ndbtab_g.release(); -#ifdef HAVE_NDB_BINLOG - ndbcluster_read_binlog_replication(thd, ndb, m_share, m_table, - ::server_id, FALSE); -#endif - DBUG_RETURN(0); err: @@ -4415,610 +4403,10 @@ thd_allow_batch(const THD* thd) #endif } -/** - st_ndb_slave_state constructor - - Initialise Ndb Slave state object -*/ -st_ndb_slave_state::st_ndb_slave_state() - : current_master_server_epoch(0), - current_max_rep_epoch(0), - conflict_flags(0), - retry_trans_count(0), - current_trans_row_conflict_count(0), - current_trans_row_reject_count(0), - current_trans_in_conflict_count(0), - max_rep_epoch(0), - sql_run_id(~Uint32(0)), - trans_row_conflict_count(0), - trans_row_reject_count(0), - trans_detect_iter_count(0), - trans_in_conflict_count(0), - trans_conflict_commit_count(0), - trans_conflict_apply_state(SAS_NORMAL), - trans_dependency_tracker(NULL) -{ - memset(current_violation_count, 0, sizeof(current_violation_count)); - memset(total_violation_count, 0, sizeof(total_violation_count)); - - /* Init conflict handling state memroot */ - const size_t CONFLICT_MEMROOT_BLOCK_SIZE = 32768; - init_alloc_root(&conflict_mem_root, CONFLICT_MEMROOT_BLOCK_SIZE, 0); -}; - -/** - resetPerAttemptCounters - - Reset the per-epoch-transaction-application-attempt counters -*/ -void -st_ndb_slave_state::resetPerAttemptCounters() -{ - memset(current_violation_count, 0, sizeof(current_violation_count)); - current_trans_row_conflict_count = 0; - current_trans_row_reject_count = 0; - current_trans_in_conflict_count = 0; - - conflict_flags = 0; - current_max_rep_epoch = 0; -} - -/** - atTransactionAbort() - - Called by Slave SQL thread during transaction abort. -*/ -void -st_ndb_slave_state::atTransactionAbort() -{ - /* Reset current-transaction counters + state */ - resetPerAttemptCounters(); -} - -/** - atTransactionCommit() - - Called by Slave SQL thread after transaction commit -*/ -void -st_ndb_slave_state::atTransactionCommit() -{ - assert( ((trans_dependency_tracker == NULL) && - (trans_conflict_apply_state == SAS_NORMAL)) || - ((trans_dependency_tracker != NULL) && - (trans_conflict_apply_state == SAS_TRACK_TRANS_DEPENDENCIES)) ); - assert( trans_conflict_apply_state != SAS_APPLY_TRANS_DEPENDENCIES ); - - /* Merge committed transaction counters into total state - * Then reset current transaction counters - */ - for (int i=0; i < CFT_NUMBER_OF_CFTS; i++) - { - total_violation_count[i]+= current_violation_count[i]; - } - trans_row_conflict_count+= current_trans_row_conflict_count; - trans_row_reject_count+= current_trans_row_reject_count; - trans_in_conflict_count+= current_trans_in_conflict_count; - - if (current_trans_in_conflict_count) - trans_conflict_commit_count++; - - if (current_max_rep_epoch > max_rep_epoch) - { - DBUG_PRINT("info", ("Max replicated epoch increases from %llu to %llu", - max_rep_epoch, - current_max_rep_epoch)); - max_rep_epoch = current_max_rep_epoch; - } - - resetPerAttemptCounters(); - - /* Clear per-epoch-transaction retry_trans_count */ - retry_trans_count = 0; -} - -/** - atApplyStatusWrite - - Called by Slave SQL thread when applying an event to the - ndb_apply_status table -*/ -void -st_ndb_slave_state::atApplyStatusWrite(Uint32 master_server_id, - Uint32 row_server_id, - Uint64 row_epoch, - bool is_row_server_id_local) -{ - if (row_server_id == master_server_id) - { - /* - WRITE_ROW to ndb_apply_status injected by MySQLD - immediately upstream of us. - Record epoch - */ - current_master_server_epoch = row_epoch; - assert(! is_row_server_id_local); - } - else if (is_row_server_id_local) - { - DBUG_PRINT("info", ("Recording application of local server %u epoch %llu " - " which is %s.", - row_server_id, row_epoch, - (row_epoch > g_ndb_slave_state.current_max_rep_epoch)? - " new highest." : " older than previously applied")); - if (row_epoch > current_max_rep_epoch) - { - /* - Store new highest epoch in thdvar. If we commit successfully - then this can become the new global max - */ - current_max_rep_epoch = row_epoch; - } - } -} - -/** - atResetSlave() - - Called when RESET SLAVE command issued - in context of command client. -*/ -void -st_ndb_slave_state::atResetSlave() -{ - /* Reset the Maximum replicated epoch vars - * on slave reset - * No need to touch the sql_run_id as that - * will increment if the slave is started - * again. - */ - resetPerAttemptCounters(); - - retry_trans_count = 0; - max_rep_epoch = 0; -} - -/** - atStartSlave() - - Called by Slave SQL thread when first applying a row to Ndb after - a START SLAVE command. -*/ -void -st_ndb_slave_state::atStartSlave() -{ -#ifdef HAVE_NDB_BINLOG - if (trans_conflict_apply_state != SAS_NORMAL) - { - /* - Remove conflict handling state on a SQL thread - restart - */ - atEndTransConflictHandling(); - trans_conflict_apply_state = SAS_NORMAL; - } -#endif -}; #ifdef HAVE_NDB_BINLOG /** - atBeginTransConflictHandling() - - Called by Slave SQL thread when it determines that Transactional - Conflict handling is required -*/ -void -st_ndb_slave_state::atBeginTransConflictHandling() -{ - DBUG_ENTER("atBeginTransConflictHandling"); - /* - Allocate and initialise Transactional Conflict - Resolution Handling Structures - */ - assert(trans_dependency_tracker == NULL); - trans_dependency_tracker = DependencyTracker::newDependencyTracker(&conflict_mem_root); - DBUG_VOID_RETURN; -}; - -/** - atPrepareConflictDetection - - Called by Slave SQL thread prior to defining an operation on - a table with conflict detection defined. -*/ -int -st_ndb_slave_state::atPrepareConflictDetection(const NdbDictionary::Table* table, - const NdbRecord* key_rec, - const uchar* row_data, - Uint64 transaction_id, - bool& handle_conflict_now) -{ - DBUG_ENTER("atPrepareConflictDetection"); - /* - Slave is preparing to apply an operation with conflict detection. - If we're performing Transactional Conflict Resolution, take - extra steps - */ - switch( trans_conflict_apply_state ) - { - case SAS_NORMAL: - DBUG_PRINT("info", ("SAS_NORMAL : No special handling")); - /* No special handling */ - break; - case SAS_TRACK_TRANS_DEPENDENCIES: - { - DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Tracking operation")); - /* - Track this operation and its transaction id, to determine - inter-transaction dependencies by {table, primary key} - */ - assert( trans_dependency_tracker ); - - int res = trans_dependency_tracker - ->track_operation(table, - key_rec, - row_data, - transaction_id); - if (res != 0) - { - sql_print_error("%s", trans_dependency_tracker->get_error_text()); - DBUG_RETURN(res); - } - /* Proceed as normal */ - break; - } - case SAS_APPLY_TRANS_DEPENDENCIES: - { - DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Deciding whether to apply")); - /* - Check if this operation's transaction id is marked in-conflict. - If it is, we tell the caller to perform conflict resolution now instead - of attempting to apply the operation. - */ - assert( trans_dependency_tracker ); - - if (trans_dependency_tracker->in_conflict(transaction_id)) - { - DBUG_PRINT("info", ("Event for transaction %llu is conflicting. Handling.", - transaction_id)); - current_trans_row_reject_count++; - handle_conflict_now = true; - DBUG_RETURN(0); - } - - /* - This transaction is not marked in-conflict, so continue with normal - processing. - Note that normal processing may subsequently detect a conflict which - didn't exist at the time of the previous TRACK_DEPENDENCIES pass. - In this case, we will rollback and repeat the TRACK_DEPENDENCIES - stage. - */ - DBUG_PRINT("info", ("Event for transaction %llu is OK, applying", - transaction_id)); - break; - } - } - DBUG_RETURN(0); -} - -/** - atTransConflictDetected - - Called by the Slave SQL thread when a conflict is detected on - an executed operation. -*/ -int -st_ndb_slave_state::atTransConflictDetected(Uint64 transaction_id) -{ - DBUG_ENTER("atTransConflictDetected"); - - /* - The Slave has detected a conflict on an operation applied - to a table with Transactional Conflict Resolution defined. - Handle according to current state. - */ - conflict_flags |= SCS_TRANS_CONFLICT_DETECTED_THIS_PASS; - current_trans_row_conflict_count++; - - switch (trans_conflict_apply_state) - { - case SAS_NORMAL: - { - DBUG_PRINT("info", ("SAS_NORMAL : Conflict on op on table with trans detection." - "Requires multi-pass resolution. Will transition to " - "SAS_TRACK_TRANS_DEPENDENCIES at Commit.")); - /* - Conflict on table with transactional conflict resolution - defined. - This is the trigger that we will do transactional conflict - resolution. - Record that we need to do multiple passes to correctly - perform resolution. - TODO : Early exit from applying epoch? - */ - break; - } - case SAS_TRACK_TRANS_DEPENDENCIES: - { - DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Operation in transaction %llu " - "had conflict", - transaction_id)); - /* - Conflict on table with transactional conflict resolution - defined. - We will mark the operation's transaction_id as in-conflict, - so that any other operations on the transaction are also - considered in-conflict, and any dependent transactions are also - considered in-conflict. - */ - assert(trans_dependency_tracker != NULL); - int res = trans_dependency_tracker - ->mark_conflict(transaction_id); - - if (res != 0) - { - sql_print_error("%s", trans_dependency_tracker->get_error_text()); - DBUG_RETURN(res); - } - break; - } - case SAS_APPLY_TRANS_DEPENDENCIES: - { - /* - This must be a new conflict, not noticed on the previous - pass. - */ - DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Conflict detected. " - "Must be further conflict. Will return to " - "SAS_TRACK_TRANS_DEPENDENCIES state at commit.")); - // TODO : Early exit from applying epoch - break; - } - default: - break; - } - - DBUG_RETURN(0); -} - -/** - atConflictPreCommit - - Called by the Slave SQL thread prior to committing a Slave transaction. - This method can request that the Slave transaction is retried. - - - State transitions : - - START SLAVE / - RESET SLAVE / - STARTUP - | - | - v - **************** - * SAS_NORMAL * - **************** - ^ | - No transactional | | Conflict on transactional table - conflicts | | (Rollback) - (Commit) | | - | v - ********************************** - * SAS_TRACK_TRANS_DEPENDENCIES * - ********************************** - ^ I ^ - More I I Dependencies | - conflicts I I determined | No new conflicts - found I I (Rollback) | (Commit) - (Rollback) I I | - I v | - ********************************** - * SAS_APPLY_TRANS_DEPENDENCIES * - ********************************** - - - Operation - The initial state is SAS_NORMAL. - - On detecting a conflict on a transactional conflict detetecing table, - SAS_TRACK_TRANS_DEPENDENCIES is entered, and the epoch transaction is - rolled back and reapplied. - - In SAS_TRACK_TRANS_DEPENDENCIES state, transaction dependencies and - conflicts are tracked as the epoch transaction is applied. - - Then the Slave transitions to SAS_APPLY_TRANS_DEPENDENCIES state, and - the epoch transaction is rolled back and reapplied. - - In the SAS_APPLY_TRANS_DEPENDENCIES state, operations for transactions - marked as in-conflict are not applied. - - If this results in no new conflicts, the epoch transaction is committed, - and the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered for processing - the next replicated epch transaction. - If it results in new conflicts, the epoch transactions is rolled back, and - the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered again, to determine - the new set of dependencies. - - If no conflicts are found in the SAS_TRACK_TRANS_DEPENDENCIES state, then - the epoch transaction is committed, and the Slave transitions to SAS_NORMAL - state. - - - Properties - 1) Normally, there is no transaction dependency tracking overhead paid by - the slave. - - 2) On first detecting a transactional conflict, the epoch transaction must be - applied at least three times, with two rollbacks. - - 3) Transactional conflicts detected in subsequent epochs require the epoch - transaction to be applied two times, with one rollback. - - 4) A loop between states SAS_TRACK_TRANS_DEPENDENCIES and SAS_APPLY_TRANS_ - DEPENDENCIES occurs when further transactional conflicts are discovered - in SAS_APPLY_TRANS_DEPENDENCIES state. This implies that the conflicts - discovered in the SAS_TRACK_TRANS_DEPENDENCIES state must not be complete, - so we revisit that state to get a more complete picture. - - 5) The number of iterations of this loop is fixed to a hard coded limit, after - which the Slave will stop with an error. This should be an unlikely - occurrence, as it requires not just n conflicts, but at least 1 new conflict - appearing between the transactions in the epoch transaction and the - database between the two states, n times in a row. - - 6) Where conflicts are occasional, as expected, the post-commit transition to - SAS_TRACK_TRANS_DEPENDENCIES rather than SAS_NORMAL results in one epoch - transaction having its transaction dependencies needlessly tracked. - -*/ -int -st_ndb_slave_state::atConflictPreCommit(bool& retry_slave_trans) -{ - DBUG_ENTER("atConflictPreCommit"); - - /* - Prior to committing a Slave transaction, we check whether - Transactional conflicts have been detected which require - us to retry the slave transaction - */ - retry_slave_trans = false; - switch(trans_conflict_apply_state) - { - case SAS_NORMAL: - { - DBUG_PRINT("info", ("SAS_NORMAL")); - /* - Normal case. Only if we defined conflict detection on a table - with transactional conflict detection, and saw conflicts (on any table) - do we go to another state - */ - if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS) - { - DBUG_PRINT("info", ("Conflict(s) detected this pass, transitioning to " - "SAS_TRACK_TRANS_DEPENDENCIES.")); - assert(conflict_flags & SCS_OPS_DEFINED); - /* Transactional conflict resolution required, switch state */ - atBeginTransConflictHandling(); - resetPerAttemptCounters(); - trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES; - retry_slave_trans = true; - } - break; - } - case SAS_TRACK_TRANS_DEPENDENCIES: - { - DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES")); - - if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS) - { - /* - Conflict on table with transactional detection - this pass, we have collected the details and - dependencies, now transition to - SAS_APPLY_TRANS_DEPENDENCIES and - reapply the epoch transaction without the - conflicting transactions. - */ - assert(conflict_flags & SCS_OPS_DEFINED); - DBUG_PRINT("info", ("Transactional conflicts, transitioning to " - "SAS_APPLY_TRANS_DEPENDENCIES")); - - trans_conflict_apply_state = SAS_APPLY_TRANS_DEPENDENCIES; - trans_detect_iter_count++; - retry_slave_trans = true; - break; - } - else - { - /* - No transactional conflicts detected this pass, lets - return to SAS_NORMAL state after commit for more efficient - application of epoch transactions - */ - DBUG_PRINT("info", ("No transactional conflicts, transitioning to " - "SAS_NORMAL")); - atEndTransConflictHandling(); - trans_conflict_apply_state = SAS_NORMAL; - break; - } - } - case SAS_APPLY_TRANS_DEPENDENCIES: - { - DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES")); - assert(conflict_flags & SCS_OPS_DEFINED); - /* - We've applied the Slave epoch transaction subject to the - conflict detection. If any further transactional - conflicts have been observed, then we must repeat the - process. - */ - atEndTransConflictHandling(); - atBeginTransConflictHandling(); - trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES; - - if (unlikely(conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)) - { - DBUG_PRINT("info", ("Further conflict(s) detected, repeating the " - "TRACK_TRANS_DEPENDENCIES pass")); - /* - Further conflict observed when applying, need - to re-determine dependencies - */ - resetPerAttemptCounters(); - retry_slave_trans = true; - break; - } - - - DBUG_PRINT("info", ("No further conflicts detected, committing and " - "returning to SAS_TRACK_TRANS_DEPENDENCIES state")); - /* - With dependencies taken into account, no further - conflicts detected, can now proceed to commit - */ - break; - } - } - - /* - Clear conflict flags, to ensure that we detect any new conflicts - */ - conflict_flags = 0; - - if (retry_slave_trans) - { - DBUG_PRINT("info", ("Requesting transaction restart")); - DBUG_RETURN(1); - } - - DBUG_PRINT("info", ("Allowing commit to proceed")); - DBUG_RETURN(0); -} - -/** - atEndTransConflictHandling - - Called when transactional conflict handling has completed. -*/ -void -st_ndb_slave_state::atEndTransConflictHandling() -{ - DBUG_ENTER("atEndTransConflictHandling"); - /* Release any conflict handling state */ - if (trans_dependency_tracker) - { - current_trans_in_conflict_count = - trans_dependency_tracker->get_conflict_count(); - trans_dependency_tracker = NULL; - free_root(&conflict_mem_root, MY_MARK_BLOCKS_FREE); - } - DBUG_VOID_RETURN; -}; - -/** prepare_conflict_detection This method is called during operation definition by the slave, @@ -5199,8 +4587,7 @@ ha_ndbcluster::prepare_conflict_detectio */ int -handle_conflict_op_error(Thd_ndb* thd_ndb, - NdbTransaction* trans, +handle_conflict_op_error(NdbTransaction* trans, const NdbError& err, const NdbOperation* op) { @@ -5951,92 +5338,40 @@ handle_row_conflict(NDB_CONFLICT_FN_SHAR } if (cfn_share && - cfn_share->m_ex_tab != NULL) + cfn_share->m_ex_tab_writer.hasTable()) { NdbError err; - assert(err.code == 0); - do + if (cfn_share->m_ex_tab_writer.writeRow(conflict_trans, + key_rec, + ::server_id, + ndb_mi_get_master_server_id(), + g_ndb_slave_state.current_master_server_epoch, + pk_row, + err) != 0) { - /* Have exceptions table, add row to it */ - const NDBTAB *ex_tab= cfn_share->m_ex_tab; - - /* get insert op */ - NdbOperation *ex_op= conflict_trans->getNdbOperation(ex_tab); - if (ex_op == NULL) + if (err.code != 0) { - err= conflict_trans->getNdbError(); - break; - } - if (ex_op->insertTuple() == -1) - { - err= ex_op->getNdbError(); - break; - } - { - uint32 server_id= (uint32)::server_id; - uint32 master_server_id= (uint32) ndb_mi_get_master_server_id(); - uint64 master_epoch= (uint64) g_ndb_slave_state.current_master_server_epoch; - uint32 count= (uint32)++(cfn_share->m_count); - if (ex_op->setValue((Uint32)0, (const char *)&(server_id)) || - ex_op->setValue((Uint32)1, (const char *)&(master_server_id)) || - ex_op->setValue((Uint32)2, (const char *)&(master_epoch)) || - ex_op->setValue((Uint32)3, (const char *)&(count))) + if (err.status == NdbError::TemporaryError) { - err= ex_op->getNdbError(); - break; + /* Slave will roll back and retry entire transaction. */ + ERR_RETURN(err); } - } - /* copy primary keys */ - { - const int fixed_cols= 4; - int nkey= cfn_share->m_pk_cols; - int k; - for (k= 0; k < nkey; k++) - { - DBUG_ASSERT(pk_row != NULL); - const uchar* data= - (const uchar*) NdbDictionary::getValuePtr(key_rec, - (const char*) pk_row, - cfn_share->m_key_attrids[k]); - if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1) - { - err= ex_op->getNdbError(); - break; - } + else + { + char msg[FN_REFLEN]; + my_snprintf(msg, sizeof(msg), "%s conflict handling " + "on table %s hit Ndb error %d '%s'", + handling_type, + table_name, + err.code, + err.message); + push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN, + ER_EXCEPTIONS_WRITE_ERROR, + ER(ER_EXCEPTIONS_WRITE_ERROR), msg); + /* Slave will stop replication. */ + DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR); } } - } while (0); - - if (err.code != 0) - { - char msg[FN_REFLEN]; - my_snprintf(msg, sizeof(msg), "%s conflict handling " - "on table %s hit Ndb error %d '%s'", - handling_type, - table_name, - err.code, - err.message); - - if (err.classification == NdbError::SchemaError) - { - /* Something up with Exceptions table schema, forget it */ - NdbDictionary::Dictionary* dict= conflict_trans->getNdb()->getDictionary(); - dict->removeTableGlobal(*(cfn_share->m_ex_tab), false); - cfn_share->m_ex_tab= NULL; - } - else if (err.status == NdbError::TemporaryError) - { - /* Slave will roll back and retry entire transaction. */ - ERR_RETURN(err); - } - else - { - push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN, - ER_EXCEPTIONS_WRITE_ERROR, - ER(ER_EXCEPTIONS_WRITE_ERROR), msg); - /* Slave will stop replication. */ - DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR); - } } } /* if (cfn_share->m_ex_tab != NULL) */ @@ -8807,6 +8142,7 @@ int ndbcluster_commit(handlerton *hton, * an execute(NoCommit) before committing, as conflict op handling * is done by execute(NoCommit) */ + /* TODO : Add as function */ if (g_ndb_slave_state.conflict_flags & SCS_OPS_DEFINED) { if (thd_ndb->m_unsent_bytes) @@ -10036,6 +9372,7 @@ int ha_ndbcluster::create(const char *na /* Reset database name */ ndb->setDatabaseName(m_dbname); + /* TODO : Add as per conflict function 'virtual' */ /* Use ndb_replication information as required */ if (conflict_fn != NULL) { @@ -12302,41 +11639,41 @@ ndbcluster_find_files(handlerton *hton, if (thd == injector_thd) { - /* - Don't delete anything when called from - the binlog thread. This is a kludge to avoid - that something is deleted when "Ndb schema dist" - uses find_files() to check for "local tables in db" - */ - } - else - { - /* - Delete old files - (.frm files with corresponding .ndb + does not exists in NDB) - */ - List_iterator_fast it3(delete_list); - while ((file_name_str= it3++)) - { - DBUG_PRINT("info", ("Deleting local files for table '%s.%s'", - db, file_name_str)); - - // Delete the table and its related files from disk - Ndb_local_schema::Table local_table(thd, db, file_name_str); - local_table.remove_table(); - - // Flush the table out of ndbapi's dictionary cache - Ndb_table_guard ndbtab_g(ndb->getDictionary(), file_name_str); - ndbtab_g.invalidate(); - - // Flush the table from table def. cache. - TABLE_LIST table_list; - memset(&table_list, 0, sizeof(table_list)); - table_list.db= (char*)db; - table_list.alias= table_list.table_name= file_name_str; - close_cached_tables(thd, &table_list, false, 0); - - DBUG_ASSERT(!thd->is_error()); + /* + Don't delete anything when called from + the binlog thread. This is a kludge to avoid + that something is deleted when "Ndb schema dist" + uses find_files() to check for "local tables in db" + */ + } + else + { + /* + Delete old files + (.frm files with corresponding .ndb + does not exists in NDB) + */ + List_iterator_fast it3(delete_list); + while ((file_name_str= it3++)) + { + DBUG_PRINT("info", ("Deleting local files for table '%s.%s'", + db, file_name_str)); + + // Delete the table and its related files from disk + Ndb_local_schema::Table local_table(thd, db, file_name_str); + local_table.remove_table(); + + // Flush the table out of ndbapi's dictionary cache + Ndb_table_guard ndbtab_g(ndb->getDictionary(), file_name_str); + ndbtab_g.invalidate(); + + // Flush the table from table def. cache. + TABLE_LIST table_list; + memset(&table_list, 0, sizeof(table_list)); + table_list.db= (char*)db; + table_list.alias= table_list.table_name= file_name_str; + close_cached_tables(thd, &table_list, false, 0); + + DBUG_ASSERT(!thd->is_error()); } } @@ -13846,7 +13183,6 @@ NDB_SHARE *ndbcluster_get_share(const ch DBUG_RETURN(share); } - void ndbcluster_real_free_share(NDB_SHARE **share) { DBUG_ENTER("ndbcluster_real_free_share"); === modified file 'sql/ha_ndbcluster.h' --- a/sql/ha_ndbcluster.h 2012-01-17 14:08:16 +0000 +++ b/sql/ha_ndbcluster.h 2012-01-31 11:11:20 +0000 @@ -27,6 +27,7 @@ #include #include #include +#include "ndb_conflict.h" #define NDB_IGNORE_VALUE(x) (void)x @@ -110,99 +111,6 @@ public: #include "ndb_ndbapi_util.h" #include "ndb_share.h" -enum enum_slave_trans_conflict_apply_state -{ - /* Normal with optional row-level conflict detection */ - SAS_NORMAL, - - /* - SAS_TRACK_TRANS_DEPENDENCIES - Track inter-transaction dependencies - */ - SAS_TRACK_TRANS_DEPENDENCIES, - - /* - SAS_APPLY_TRANS_DEPENDENCIES - Apply only non conflicting transactions - */ - SAS_APPLY_TRANS_DEPENDENCIES -}; - -enum enum_slave_conflict_flags -{ - /* Conflict detection Ops defined */ - SCS_OPS_DEFINED = 1, - /* Conflict detected on table with transactional resolution */ - SCS_TRANS_CONFLICT_DETECTED_THIS_PASS = 2 -}; - -/* - State associated with the Slave thread - (From the Ndb handler's point of view) -*/ -struct st_ndb_slave_state -{ - /* Counter values for current slave transaction */ - Uint32 current_violation_count[CFT_NUMBER_OF_CFTS]; - Uint64 current_master_server_epoch; - Uint64 current_max_rep_epoch; - uint8 conflict_flags; /* enum_slave_conflict_flags */ - /* Transactional conflict detection */ - Uint32 retry_trans_count; - Uint32 current_trans_row_conflict_count; - Uint32 current_trans_row_reject_count; - Uint32 current_trans_in_conflict_count; - - /* Cumulative counter values */ - Uint64 total_violation_count[CFT_NUMBER_OF_CFTS]; - Uint64 max_rep_epoch; - Uint32 sql_run_id; - /* Transactional conflict detection */ - Uint64 trans_row_conflict_count; - Uint64 trans_row_reject_count; - Uint64 trans_detect_iter_count; - Uint64 trans_in_conflict_count; - Uint64 trans_conflict_commit_count; - - static const Uint32 MAX_RETRY_TRANS_COUNT = 100; - - /* - Slave Apply State - - State of Binlog application from Ndb point of view. - */ - enum_slave_trans_conflict_apply_state trans_conflict_apply_state; - - MEM_ROOT conflict_mem_root; - class DependencyTracker* trans_dependency_tracker; - - /* Methods */ - void atStartSlave(); - int atPrepareConflictDetection(const NdbDictionary::Table* table, - const NdbRecord* key_rec, - const uchar* row_data, - Uint64 transaction_id, - bool& handle_conflict_now); - int atTransConflictDetected(Uint64 transaction_id); - int atConflictPreCommit(bool& retry_slave_trans); - - void atBeginTransConflictHandling(); - void atEndTransConflictHandling(); - - void atTransactionCommit(); - void atTransactionAbort(); - void atResetSlave(); - - void atApplyStatusWrite(Uint32 master_server_id, - Uint32 row_server_id, - Uint64 row_epoch, - bool is_row_server_id_local); - - void resetPerAttemptCounters(); - - st_ndb_slave_state(); -}; - struct Ndb_local_table_statistics { int no_uncommitted_rows_count; ulong last_count; === modified file 'sql/ha_ndbcluster_binlog.cc' --- a/sql/ha_ndbcluster_binlog.cc 2012-01-25 17:50:29 +0000 +++ b/sql/ha_ndbcluster_binlog.cc 2012-01-31 11:11:20 +0000 @@ -4014,6 +4014,8 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE cfn_share->m_resolve_column= field_index; cfn_share->m_flags = flags; + /* Init Exceptions Table Writer */ + new (&cfn_share->m_ex_tab_writer) ExceptionsTableWriter(); { /* get exceptions table */ char ex_tab_name[FN_REFLEN]; @@ -4025,71 +4027,31 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE const NDBTAB *ex_tab= ndbtab_g.get_table(); if (ex_tab) { - const int fixed_cols= 4; - bool ok= - ex_tab->getNoOfColumns() >= fixed_cols && - ex_tab->getNoOfPrimaryKeys() == 4 && - /* server id */ - ex_tab->getColumn(0)->getType() == NDBCOL::Unsigned && - ex_tab->getColumn(0)->getPrimaryKey() && - /* master_server_id */ - ex_tab->getColumn(1)->getType() == NDBCOL::Unsigned && - ex_tab->getColumn(1)->getPrimaryKey() && - /* master_epoch */ - ex_tab->getColumn(2)->getType() == NDBCOL::Bigunsigned && - ex_tab->getColumn(2)->getPrimaryKey() && - /* count */ - ex_tab->getColumn(3)->getType() == NDBCOL::Unsigned && - ex_tab->getColumn(3)->getPrimaryKey(); - if (ok) + char msgBuf[ FN_REFLEN ]; + const char* msg = NULL; + if (cfn_share->m_ex_tab_writer.init(ndbtab, + ex_tab, + msgBuf, + sizeof(msgBuf), + &msg) == 0) { - int ncol= ndbtab->getNoOfColumns(); - int nkey= ndbtab->getNoOfPrimaryKeys(); - int i, k; - for (i= k= 0; i < ncol && k < nkey; i++) + /* Ok */ + /* Hold our table reference outside the table_guard scope */ + ndbtab_g.release(); + if (opt_ndb_extra_logging) { - const NdbDictionary::Column* col= ndbtab->getColumn(i); - if (col->getPrimaryKey()) - { - const NdbDictionary::Column* ex_col= - ex_tab->getColumn(fixed_cols + k); - ok= - ex_col != NULL && - col->getType() == ex_col->getType() && - col->getLength() == ex_col->getLength() && - col->getNullable() == ex_col->getNullable(); - if (!ok) - break; - /* - Store mapping of Exception table key# to - orig table attrid - */ - cfn_share->m_key_attrids[k]= i; - k++; - } + sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s", + share->db, + share->table_name, + share->db, + ex_tab_name); } - if (ok) - { - cfn_share->m_ex_tab= ex_tab; - cfn_share->m_pk_cols= nkey; - ndbtab_g.release(); - if (opt_ndb_extra_logging) - sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s", - share->db, - share->table_name, - share->db, - ex_tab_name); - } - else - sql_print_warning("NDB Slave: exceptions table %s has wrong " - "definition (column %d)", - ex_tab_name, fixed_cols + k); } else - sql_print_warning("NDB Slave: exceptions table %s has wrong " - "definition (initial %d columns)", - ex_tab_name, fixed_cols); - } + { + sql_print_warning("%s", msg); + } + } /* if (ex_tab) */ } DBUG_RETURN(0); } === added file 'sql/ndb_conflict.cc' --- a/sql/ndb_conflict.cc 1970-01-01 00:00:00 +0000 +++ b/sql/ndb_conflict.cc 2012-01-31 11:11:20 +0000 @@ -0,0 +1,805 @@ +/* + Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ +#include /* For config defines */ + +#ifdef WITH_NDBCLUSTER_STORAGE_ENGINE +/* distcheck does not compile from here... */ + +#include "ha_ndbcluster_glue.h" +#include "ndb_conflict.h" + +#ifdef HAVE_NDB_BINLOG + +#define NDBTAB NdbDictionary::Table +#define NDBCOL NdbDictionary::Column + +int +ExceptionsTableWriter::init(const NdbDictionary::Table* mainTable, + const NdbDictionary::Table* exceptionsTable, + char* msg_buf, + uint msg_buf_len, + const char** msg) +{ + const char* ex_tab_name = exceptionsTable->getName(); + const int fixed_cols= 4; + bool ok= + exceptionsTable->getNoOfColumns() >= fixed_cols && + exceptionsTable->getNoOfPrimaryKeys() == 4 && + /* server id */ + exceptionsTable->getColumn(0)->getType() == NDBCOL::Unsigned && + exceptionsTable->getColumn(0)->getPrimaryKey() && + /* master_server_id */ + exceptionsTable->getColumn(1)->getType() == NDBCOL::Unsigned && + exceptionsTable->getColumn(1)->getPrimaryKey() && + /* master_epoch */ + exceptionsTable->getColumn(2)->getType() == NDBCOL::Bigunsigned && + exceptionsTable->getColumn(2)->getPrimaryKey() && + /* count */ + exceptionsTable->getColumn(3)->getType() == NDBCOL::Unsigned && + exceptionsTable->getColumn(3)->getPrimaryKey(); + if (ok) + { + int ncol= mainTable->getNoOfColumns(); + int nkey= mainTable->getNoOfPrimaryKeys(); + int i, k; + for (i= k= 0; i < ncol && k < nkey; i++) + { + const NdbDictionary::Column* col= mainTable->getColumn(i); + if (col->getPrimaryKey()) + { + const NdbDictionary::Column* ex_col= + exceptionsTable->getColumn(fixed_cols + k); + ok= + ex_col != NULL && + col->getType() == ex_col->getType() && + col->getLength() == ex_col->getLength() && + col->getNullable() == ex_col->getNullable(); + if (!ok) + break; + /* + Store mapping of Exception table key# to + orig table attrid + */ + m_key_attrids[k]= i; + k++; + } + } + if (ok) + { + m_ex_tab= exceptionsTable; + m_pk_cols= nkey; + return 0; + } + else + my_snprintf(msg_buf, msg_buf_len, + "NDB Slave: exceptions table %s has wrong " + "definition (column %d)", + ex_tab_name, fixed_cols + k); + } + else + my_snprintf(msg_buf, msg_buf_len, + "NDB Slave: exceptions table %s has wrong " + "definition (initial %d columns)", + ex_tab_name, fixed_cols); + + *msg = msg_buf; + return -1; +} + +void +ExceptionsTableWriter::free(Ndb* ndb) +{ + if (m_ex_tab) + { + NdbDictionary::Dictionary* dict = ndb->getDictionary(); + dict->removeTableGlobal(*m_ex_tab, 0); + m_ex_tab= 0; + } +} + +int +ExceptionsTableWriter::writeRow(NdbTransaction* trans, + const NdbRecord* keyRecord, + uint32 server_id, + uint32 master_server_id, + uint64 master_epoch, + const uchar* rowPtr, + NdbError& err) +{ + DBUG_ENTER("ExceptionsTableWriter::writeRow"); + assert(err.code == 0); + do + { + /* Have exceptions table, add row to it */ + const NDBTAB *ex_tab= m_ex_tab; + + /* get insert op */ + NdbOperation *ex_op= trans->getNdbOperation(ex_tab); + if (ex_op == NULL) + { + err= trans->getNdbError(); + break; + } + if (ex_op->insertTuple() == -1) + { + err= ex_op->getNdbError(); + break; + } + { + uint32 count= (uint32)++m_count; + if (ex_op->setValue((Uint32)0, (const char *)&(server_id)) || + ex_op->setValue((Uint32)1, (const char *)&(master_server_id)) || + ex_op->setValue((Uint32)2, (const char *)&(master_epoch)) || + ex_op->setValue((Uint32)3, (const char *)&(count))) + { + err= ex_op->getNdbError(); + break; + } + } + /* copy primary keys */ + { + const int fixed_cols= 4; + int nkey= m_pk_cols; + int k; + for (k= 0; k < nkey; k++) + { + DBUG_ASSERT(rowPtr != NULL); + const uchar* data= + (const uchar*) NdbDictionary::getValuePtr(keyRecord, + (const char*) rowPtr, + m_key_attrids[k]); + if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1) + { + err= ex_op->getNdbError(); + break; + } + } + } + } while (0); + + if (err.code != 0) + { + if (err.classification == NdbError::SchemaError) + { + /* Something up with Exceptions table schema, forget it. + * No further exceptions will be recorded. + * TODO : Log this somehow + */ + NdbDictionary::Dictionary* dict= trans->getNdb()->getDictionary(); + dict->removeTableGlobal(*m_ex_tab, false); + m_ex_tab= NULL; + DBUG_RETURN(0); + } + DBUG_RETURN(-1); + } + DBUG_RETURN(0); +} + +/* HAVE_NDB_BINLOG */ +#endif + +/** + st_ndb_slave_state constructor + + Initialise Ndb Slave state object +*/ +st_ndb_slave_state::st_ndb_slave_state() + : current_master_server_epoch(0), + current_max_rep_epoch(0), + conflict_flags(0), + retry_trans_count(0), + current_trans_row_conflict_count(0), + current_trans_row_reject_count(0), + current_trans_in_conflict_count(0), + max_rep_epoch(0), + sql_run_id(~Uint32(0)), + trans_row_conflict_count(0), + trans_row_reject_count(0), + trans_detect_iter_count(0), + trans_in_conflict_count(0), + trans_conflict_commit_count(0), + trans_conflict_apply_state(SAS_NORMAL), + trans_dependency_tracker(NULL) +{ + memset(current_violation_count, 0, sizeof(current_violation_count)); + memset(total_violation_count, 0, sizeof(total_violation_count)); + + /* Init conflict handling state memroot */ + const size_t CONFLICT_MEMROOT_BLOCK_SIZE = 32768; + init_alloc_root(&conflict_mem_root, CONFLICT_MEMROOT_BLOCK_SIZE, 0); +}; + +/** + resetPerAttemptCounters + + Reset the per-epoch-transaction-application-attempt counters +*/ +void +st_ndb_slave_state::resetPerAttemptCounters() +{ + memset(current_violation_count, 0, sizeof(current_violation_count)); + current_trans_row_conflict_count = 0; + current_trans_row_reject_count = 0; + current_trans_in_conflict_count = 0; + + conflict_flags = 0; + current_max_rep_epoch = 0; +} + +/** + atTransactionAbort() + + Called by Slave SQL thread during transaction abort. +*/ +void +st_ndb_slave_state::atTransactionAbort() +{ + /* Reset current-transaction counters + state */ + resetPerAttemptCounters(); +} + + + +/** + atTransactionCommit() + + Called by Slave SQL thread after transaction commit +*/ +void +st_ndb_slave_state::atTransactionCommit() +{ + assert( ((trans_dependency_tracker == NULL) && + (trans_conflict_apply_state == SAS_NORMAL)) || + ((trans_dependency_tracker != NULL) && + (trans_conflict_apply_state == SAS_TRACK_TRANS_DEPENDENCIES)) ); + assert( trans_conflict_apply_state != SAS_APPLY_TRANS_DEPENDENCIES ); + + /* Merge committed transaction counters into total state + * Then reset current transaction counters + */ + for (int i=0; i < CFT_NUMBER_OF_CFTS; i++) + { + total_violation_count[i]+= current_violation_count[i]; + } + trans_row_conflict_count+= current_trans_row_conflict_count; + trans_row_reject_count+= current_trans_row_reject_count; + trans_in_conflict_count+= current_trans_in_conflict_count; + + if (current_trans_in_conflict_count) + trans_conflict_commit_count++; + + if (current_max_rep_epoch > max_rep_epoch) + { + DBUG_PRINT("info", ("Max replicated epoch increases from %llu to %llu", + max_rep_epoch, + current_max_rep_epoch)); + max_rep_epoch = current_max_rep_epoch; + } + + resetPerAttemptCounters(); + + /* Clear per-epoch-transaction retry_trans_count */ + retry_trans_count = 0; +} + +/** + atApplyStatusWrite + + Called by Slave SQL thread when applying an event to the + ndb_apply_status table +*/ +void +st_ndb_slave_state::atApplyStatusWrite(Uint32 master_server_id, + Uint32 row_server_id, + Uint64 row_epoch, + bool is_row_server_id_local) +{ + if (row_server_id == master_server_id) + { + /* + WRITE_ROW to ndb_apply_status injected by MySQLD + immediately upstream of us. + Record epoch + */ + current_master_server_epoch = row_epoch; + assert(! is_row_server_id_local); + } + else if (is_row_server_id_local) + { + DBUG_PRINT("info", ("Recording application of local server %u epoch %llu " + " which is %s.", + row_server_id, row_epoch, + (row_epoch > current_max_rep_epoch)? + " new highest." : " older than previously applied")); + if (row_epoch > current_max_rep_epoch) + { + /* + Store new highest epoch in thdvar. If we commit successfully + then this can become the new global max + */ + current_max_rep_epoch = row_epoch; + } + } +} + +/** + atResetSlave() + + Called when RESET SLAVE command issued - in context of command client. +*/ +void +st_ndb_slave_state::atResetSlave() +{ + /* Reset the Maximum replicated epoch vars + * on slave reset + * No need to touch the sql_run_id as that + * will increment if the slave is started + * again. + */ + resetPerAttemptCounters(); + + retry_trans_count = 0; + max_rep_epoch = 0; +} + + +/** + atStartSlave() + + Called by Slave SQL thread when first applying a row to Ndb after + a START SLAVE command. +*/ +void +st_ndb_slave_state::atStartSlave() +{ +#ifdef HAVE_NDB_BINLOG + if (trans_conflict_apply_state != SAS_NORMAL) + { + /* + Remove conflict handling state on a SQL thread + restart + */ + atEndTransConflictHandling(); + trans_conflict_apply_state = SAS_NORMAL; + } +#endif +}; + +#ifdef HAVE_NDB_BINLOG + +/** + atEndTransConflictHandling + + Called when transactional conflict handling has completed. +*/ +void +st_ndb_slave_state::atEndTransConflictHandling() +{ + DBUG_ENTER("atEndTransConflictHandling"); + /* Release any conflict handling state */ + if (trans_dependency_tracker) + { + current_trans_in_conflict_count = + trans_dependency_tracker->get_conflict_count(); + trans_dependency_tracker = NULL; + free_root(&conflict_mem_root, MY_MARK_BLOCKS_FREE); + } + DBUG_VOID_RETURN; +}; + +/** + atBeginTransConflictHandling() + + Called by Slave SQL thread when it determines that Transactional + Conflict handling is required +*/ +void +st_ndb_slave_state::atBeginTransConflictHandling() +{ + DBUG_ENTER("atBeginTransConflictHandling"); + /* + Allocate and initialise Transactional Conflict + Resolution Handling Structures + */ + assert(trans_dependency_tracker == NULL); + trans_dependency_tracker = DependencyTracker::newDependencyTracker(&conflict_mem_root); + DBUG_VOID_RETURN; +}; + +/** + atPrepareConflictDetection + + Called by Slave SQL thread prior to defining an operation on + a table with conflict detection defined. +*/ +int +st_ndb_slave_state::atPrepareConflictDetection(const NdbDictionary::Table* table, + const NdbRecord* key_rec, + const uchar* row_data, + Uint64 transaction_id, + bool& handle_conflict_now) +{ + DBUG_ENTER("atPrepareConflictDetection"); + /* + Slave is preparing to apply an operation with conflict detection. + If we're performing Transactional Conflict Resolution, take + extra steps + */ + switch( trans_conflict_apply_state ) + { + case SAS_NORMAL: + DBUG_PRINT("info", ("SAS_NORMAL : No special handling")); + /* No special handling */ + break; + case SAS_TRACK_TRANS_DEPENDENCIES: + { + DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Tracking operation")); + /* + Track this operation and its transaction id, to determine + inter-transaction dependencies by {table, primary key} + */ + assert( trans_dependency_tracker ); + + int res = trans_dependency_tracker + ->track_operation(table, + key_rec, + row_data, + transaction_id); + if (res != 0) + { + sql_print_error("%s", trans_dependency_tracker->get_error_text()); + DBUG_RETURN(res); + } + /* Proceed as normal */ + break; + } + case SAS_APPLY_TRANS_DEPENDENCIES: + { + DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Deciding whether to apply")); + /* + Check if this operation's transaction id is marked in-conflict. + If it is, we tell the caller to perform conflict resolution now instead + of attempting to apply the operation. + */ + assert( trans_dependency_tracker ); + + if (trans_dependency_tracker->in_conflict(transaction_id)) + { + DBUG_PRINT("info", ("Event for transaction %llu is conflicting. Handling.", + transaction_id)); + current_trans_row_reject_count++; + handle_conflict_now = true; + DBUG_RETURN(0); + } + + /* + This transaction is not marked in-conflict, so continue with normal + processing. + Note that normal processing may subsequently detect a conflict which + didn't exist at the time of the previous TRACK_DEPENDENCIES pass. + In this case, we will rollback and repeat the TRACK_DEPENDENCIES + stage. + */ + DBUG_PRINT("info", ("Event for transaction %llu is OK, applying", + transaction_id)); + break; + } + } + DBUG_RETURN(0); +} + +/** + atTransConflictDetected + + Called by the Slave SQL thread when a conflict is detected on + an executed operation. +*/ +int +st_ndb_slave_state::atTransConflictDetected(Uint64 transaction_id) +{ + DBUG_ENTER("atTransConflictDetected"); + + /* + The Slave has detected a conflict on an operation applied + to a table with Transactional Conflict Resolution defined. + Handle according to current state. + */ + conflict_flags |= SCS_TRANS_CONFLICT_DETECTED_THIS_PASS; + current_trans_row_conflict_count++; + + switch (trans_conflict_apply_state) + { + case SAS_NORMAL: + { + DBUG_PRINT("info", ("SAS_NORMAL : Conflict on op on table with trans detection." + "Requires multi-pass resolution. Will transition to " + "SAS_TRACK_TRANS_DEPENDENCIES at Commit.")); + /* + Conflict on table with transactional conflict resolution + defined. + This is the trigger that we will do transactional conflict + resolution. + Record that we need to do multiple passes to correctly + perform resolution. + TODO : Early exit from applying epoch? + */ + break; + } + case SAS_TRACK_TRANS_DEPENDENCIES: + { + DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Operation in transaction %llu " + "had conflict", + transaction_id)); + /* + Conflict on table with transactional conflict resolution + defined. + We will mark the operation's transaction_id as in-conflict, + so that any other operations on the transaction are also + considered in-conflict, and any dependent transactions are also + considered in-conflict. + */ + assert(trans_dependency_tracker != NULL); + int res = trans_dependency_tracker + ->mark_conflict(transaction_id); + + if (res != 0) + { + sql_print_error("%s", trans_dependency_tracker->get_error_text()); + DBUG_RETURN(res); + } + break; + } + case SAS_APPLY_TRANS_DEPENDENCIES: + { + /* + This must be a new conflict, not noticed on the previous + pass. + */ + DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Conflict detected. " + "Must be further conflict. Will return to " + "SAS_TRACK_TRANS_DEPENDENCIES state at commit.")); + // TODO : Early exit from applying epoch + break; + } + default: + break; + } + + DBUG_RETURN(0); +} + +/** + atConflictPreCommit + + Called by the Slave SQL thread prior to committing a Slave transaction. + This method can request that the Slave transaction is retried. + + + State transitions : + + START SLAVE / + RESET SLAVE / + STARTUP + | + | + v + **************** + * SAS_NORMAL * + **************** + ^ | + No transactional | | Conflict on transactional table + conflicts | | (Rollback) + (Commit) | | + | v + ********************************** + * SAS_TRACK_TRANS_DEPENDENCIES * + ********************************** + ^ I ^ + More I I Dependencies | + conflicts I I determined | No new conflicts + found I I (Rollback) | (Commit) + (Rollback) I I | + I v | + ********************************** + * SAS_APPLY_TRANS_DEPENDENCIES * + ********************************** + + + Operation + The initial state is SAS_NORMAL. + + On detecting a conflict on a transactional conflict detetecing table, + SAS_TRACK_TRANS_DEPENDENCIES is entered, and the epoch transaction is + rolled back and reapplied. + + In SAS_TRACK_TRANS_DEPENDENCIES state, transaction dependencies and + conflicts are tracked as the epoch transaction is applied. + + Then the Slave transitions to SAS_APPLY_TRANS_DEPENDENCIES state, and + the epoch transaction is rolled back and reapplied. + + In the SAS_APPLY_TRANS_DEPENDENCIES state, operations for transactions + marked as in-conflict are not applied. + + If this results in no new conflicts, the epoch transaction is committed, + and the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered for processing + the next replicated epch transaction. + If it results in new conflicts, the epoch transactions is rolled back, and + the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered again, to determine + the new set of dependencies. + + If no conflicts are found in the SAS_TRACK_TRANS_DEPENDENCIES state, then + the epoch transaction is committed, and the Slave transitions to SAS_NORMAL + state. + + + Properties + 1) Normally, there is no transaction dependency tracking overhead paid by + the slave. + + 2) On first detecting a transactional conflict, the epoch transaction must be + applied at least three times, with two rollbacks. + + 3) Transactional conflicts detected in subsequent epochs require the epoch + transaction to be applied two times, with one rollback. + + 4) A loop between states SAS_TRACK_TRANS_DEPENDENCIES and SAS_APPLY_TRANS_ + DEPENDENCIES occurs when further transactional conflicts are discovered + in SAS_APPLY_TRANS_DEPENDENCIES state. This implies that the conflicts + discovered in the SAS_TRACK_TRANS_DEPENDENCIES state must not be complete, + so we revisit that state to get a more complete picture. + + 5) The number of iterations of this loop is fixed to a hard coded limit, after + which the Slave will stop with an error. This should be an unlikely + occurrence, as it requires not just n conflicts, but at least 1 new conflict + appearing between the transactions in the epoch transaction and the + database between the two states, n times in a row. + + 6) Where conflicts are occasional, as expected, the post-commit transition to + SAS_TRACK_TRANS_DEPENDENCIES rather than SAS_NORMAL results in one epoch + transaction having its transaction dependencies needlessly tracked. + +*/ +int +st_ndb_slave_state::atConflictPreCommit(bool& retry_slave_trans) +{ + DBUG_ENTER("atConflictPreCommit"); + + /* + Prior to committing a Slave transaction, we check whether + Transactional conflicts have been detected which require + us to retry the slave transaction + */ + retry_slave_trans = false; + switch(trans_conflict_apply_state) + { + case SAS_NORMAL: + { + DBUG_PRINT("info", ("SAS_NORMAL")); + /* + Normal case. Only if we defined conflict detection on a table + with transactional conflict detection, and saw conflicts (on any table) + do we go to another state + */ + if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS) + { + DBUG_PRINT("info", ("Conflict(s) detected this pass, transitioning to " + "SAS_TRACK_TRANS_DEPENDENCIES.")); + assert(conflict_flags & SCS_OPS_DEFINED); + /* Transactional conflict resolution required, switch state */ + atBeginTransConflictHandling(); + resetPerAttemptCounters(); + trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES; + retry_slave_trans = true; + } + break; + } + case SAS_TRACK_TRANS_DEPENDENCIES: + { + DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES")); + + if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS) + { + /* + Conflict on table with transactional detection + this pass, we have collected the details and + dependencies, now transition to + SAS_APPLY_TRANS_DEPENDENCIES and + reapply the epoch transaction without the + conflicting transactions. + */ + assert(conflict_flags & SCS_OPS_DEFINED); + DBUG_PRINT("info", ("Transactional conflicts, transitioning to " + "SAS_APPLY_TRANS_DEPENDENCIES")); + + trans_conflict_apply_state = SAS_APPLY_TRANS_DEPENDENCIES; + trans_detect_iter_count++; + retry_slave_trans = true; + break; + } + else + { + /* + No transactional conflicts detected this pass, lets + return to SAS_NORMAL state after commit for more efficient + application of epoch transactions + */ + DBUG_PRINT("info", ("No transactional conflicts, transitioning to " + "SAS_NORMAL")); + atEndTransConflictHandling(); + trans_conflict_apply_state = SAS_NORMAL; + break; + } + } + case SAS_APPLY_TRANS_DEPENDENCIES: + { + DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES")); + assert(conflict_flags & SCS_OPS_DEFINED); + /* + We've applied the Slave epoch transaction subject to the + conflict detection. If any further transactional + conflicts have been observed, then we must repeat the + process. + */ + atEndTransConflictHandling(); + atBeginTransConflictHandling(); + trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES; + + if (unlikely(conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)) + { + DBUG_PRINT("info", ("Further conflict(s) detected, repeating the " + "TRACK_TRANS_DEPENDENCIES pass")); + /* + Further conflict observed when applying, need + to re-determine dependencies + */ + resetPerAttemptCounters(); + retry_slave_trans = true; + break; + } + + + DBUG_PRINT("info", ("No further conflicts detected, committing and " + "returning to SAS_TRACK_TRANS_DEPENDENCIES state")); + /* + With dependencies taken into account, no further + conflicts detected, can now proceed to commit + */ + break; + } + } + + /* + Clear conflict flags, to ensure that we detect any new conflicts + */ + conflict_flags = 0; + + if (retry_slave_trans) + { + DBUG_PRINT("info", ("Requesting transaction restart")); + DBUG_RETURN(1); + } + + DBUG_PRINT("info", ("Allowing commit to proceed")); + DBUG_RETURN(0); +} + +/* HAVE_NDB_BINLOG */ +#endif + +/* WITH_NDBCLUSTER_STORAGE_ENGINE */ +#endif === added file 'sql/ndb_conflict.h' --- a/sql/ndb_conflict.h 1970-01-01 00:00:00 +0000 +++ b/sql/ndb_conflict.h 2012-01-31 11:11:20 +0000 @@ -0,0 +1,315 @@ +/* + Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#ifndef NDB_CONFLICT_H +#define NDB_CONFLICT_H + +#include "ndb_conflict_trans.h" +#include +#include + +enum enum_conflict_fn_type +{ + CFT_NDB_UNDEF = 0 + ,CFT_NDB_MAX + ,CFT_NDB_OLD + ,CFT_NDB_MAX_DEL_WIN + ,CFT_NDB_EPOCH + ,CFT_NDB_EPOCH_TRANS + ,CFT_NUMBER_OF_CFTS /* End marker */ +}; + +#ifdef HAVE_NDB_BINLOG +static const Uint32 MAX_CONFLICT_ARGS= 8; + +enum enum_conflict_fn_arg_type +{ + CFAT_END + ,CFAT_COLUMN_NAME + ,CFAT_EXTRA_GCI_BITS +}; + +struct st_conflict_fn_arg +{ + enum_conflict_fn_arg_type type; + union + { + char resolveColNameBuff[ NAME_CHAR_LEN + 1 ]; // CFAT_COLUMN_NAME + uint32 extraGciBits; // CFAT_EXTRA_GCI_BITS + }; +}; + +struct st_conflict_fn_arg_def +{ + enum enum_conflict_fn_arg_type arg_type; + bool optional; +}; + +/* What type of operation was issued */ +enum enum_conflicting_op_type +{ /* NdbApi */ + WRITE_ROW, /* insert (!write) */ + UPDATE_ROW, /* update */ + DELETE_ROW, /* delete */ + REFRESH_ROW /* refresh */ +}; + +/* + Room for 10 instruction words, two labels (@ 2words/label) + + 2 extra words for the case of resolve_size == 8 +*/ +#define MAX_CONFLICT_INTERPRETED_PROG_SIZE 16 + +/* + prepare_detect_func + + Type of function used to prepare for conflict detection on + an NdbApi operation +*/ +typedef int (* prepare_detect_func) (struct st_ndbcluster_conflict_fn_share* cfn_share, + enum_conflicting_op_type op_type, + const NdbRecord* data_record, + const uchar* old_data, + const uchar* new_data, + const MY_BITMAP* write_set, + class NdbInterpretedCode* code); + +enum enum_conflict_fn_flags +{ + CF_TRANSACTIONAL = 1 +}; + +struct st_conflict_fn_def +{ + const char *name; + enum_conflict_fn_type type; + const st_conflict_fn_arg_def* arg_defs; + prepare_detect_func prep_func; + uint8 flags; /* enum_conflict_fn_flags */ +}; + +/* What sort of conflict was found */ +enum enum_conflict_cause +{ + ROW_ALREADY_EXISTS, /* On insert */ + ROW_DOES_NOT_EXIST, /* On Update, Delete */ + ROW_IN_CONFLICT, /* On Update, Delete */ + TRANS_IN_CONFLICT /* Any of above, or implied by transaction */ +}; + +/* NdbOperation custom data which points out handler and record. */ +struct Ndb_exceptions_data { + struct NDB_SHARE* share; + const NdbRecord* key_rec; + const uchar* row; + enum_conflicting_op_type op_type; + Uint64 trans_id; +}; + +enum enum_conflict_fn_table_flags +{ + CFF_NONE = 0, + CFF_REFRESH_ROWS = 1 +}; + +/* + Maximum supported key parts (16) + (Ndb supports 32, but MySQL has a lower limit) +*/ +static const int NDB_MAX_KEY_PARTS = MAX_REF_PARTS; + +/** + ExceptionsTableWriter + + Helper class for inserting entries into an exceptions + table +*/ +class ExceptionsTableWriter +{ +public: + ExceptionsTableWriter() + :m_pk_cols(0), m_ex_tab(NULL), m_count(0) + {}; + + ~ExceptionsTableWriter() + {}; + + /** + hasTable + + Returns true if there is an Exceptions table + */ + bool hasTable() const + { + return m_ex_tab != NULL; + }; + + /** + init + + Initialise ExceptionsTableWriter with main and exceptions + tables. + + May set a warning message on success or error. + */ + int init(const NdbDictionary::Table* mainTable, + const NdbDictionary::Table* exceptionsTable, + char* msg_buf, + uint msg_buf_len, + const char** msg); + + /** + free + + Release reference to exceptions table + */ + void free(Ndb* ndb); + + /** + writeRow + + Write a row to the Exceptions Table for the given + key + */ + int writeRow(NdbTransaction* trans, + const NdbRecord* keyRecord, + uint32 server_id, + uint32 master_server_id, + uint64 master_epoch, + const uchar* rowPtr, + NdbError& err); + +private: + /* info about original table */ + uint8 m_pk_cols; + uint16 m_key_attrids[ NDB_MAX_KEY_PARTS ]; + + const NdbDictionary::Table *m_ex_tab; + uint32 m_count; +}; + +typedef struct st_ndbcluster_conflict_fn_share { + const st_conflict_fn_def* m_conflict_fn; + + /* info about original table */ + uint16 m_resolve_column; + uint8 m_resolve_size; + uint8 m_flags; + + ExceptionsTableWriter m_ex_tab_writer; +} NDB_CONFLICT_FN_SHARE; + + +/* HAVE_NDB_BINLOG */ +#endif + +enum enum_slave_trans_conflict_apply_state +{ + /* Normal with optional row-level conflict detection */ + SAS_NORMAL, + + /* + SAS_TRACK_TRANS_DEPENDENCIES + Track inter-transaction dependencies + */ + SAS_TRACK_TRANS_DEPENDENCIES, + + /* + SAS_APPLY_TRANS_DEPENDENCIES + Apply only non conflicting transactions + */ + SAS_APPLY_TRANS_DEPENDENCIES +}; + +enum enum_slave_conflict_flags +{ + /* Conflict detection Ops defined */ + SCS_OPS_DEFINED = 1, + /* Conflict detected on table with transactional resolution */ + SCS_TRANS_CONFLICT_DETECTED_THIS_PASS = 2 +}; + +/* + State associated with the Slave thread + (From the Ndb handler's point of view) +*/ +struct st_ndb_slave_state +{ + /* Counter values for current slave transaction */ + Uint32 current_violation_count[CFT_NUMBER_OF_CFTS]; + Uint64 current_master_server_epoch; + Uint64 current_max_rep_epoch; + uint8 conflict_flags; /* enum_slave_conflict_flags */ + /* Transactional conflict detection */ + Uint32 retry_trans_count; + Uint32 current_trans_row_conflict_count; + Uint32 current_trans_row_reject_count; + Uint32 current_trans_in_conflict_count; + + /* Cumulative counter values */ + Uint64 total_violation_count[CFT_NUMBER_OF_CFTS]; + Uint64 max_rep_epoch; + Uint32 sql_run_id; + /* Transactional conflict detection */ + Uint64 trans_row_conflict_count; + Uint64 trans_row_reject_count; + Uint64 trans_detect_iter_count; + Uint64 trans_in_conflict_count; + Uint64 trans_conflict_commit_count; + + static const Uint32 MAX_RETRY_TRANS_COUNT = 100; + + /* + Slave Apply State + + State of Binlog application from Ndb point of view. + */ + enum_slave_trans_conflict_apply_state trans_conflict_apply_state; + + MEM_ROOT conflict_mem_root; + class DependencyTracker* trans_dependency_tracker; + + /* Methods */ + void atStartSlave(); + int atPrepareConflictDetection(const NdbDictionary::Table* table, + const NdbRecord* key_rec, + const uchar* row_data, + Uint64 transaction_id, + bool& handle_conflict_now); + int atTransConflictDetected(Uint64 transaction_id); + int atConflictPreCommit(bool& retry_slave_trans); + + void atBeginTransConflictHandling(); + void atEndTransConflictHandling(); + + void atTransactionCommit(); + void atTransactionAbort(); + void atResetSlave(); + + void atApplyStatusWrite(Uint32 master_server_id, + Uint32 row_server_id, + Uint64 row_epoch, + bool is_row_server_id_local); + + void resetPerAttemptCounters(); + + st_ndb_slave_state(); +}; + + +/* NDB_CONFLICT_H */ +#endif === modified file 'sql/ndb_share.cc' --- a/sql/ndb_share.cc 2011-11-10 08:21:36 +0000 +++ b/sql/ndb_share.cc 2012-01-31 11:11:20 +0000 @@ -33,11 +33,11 @@ NDB_SHARE::destroy(NDB_SHARE* share) pthread_mutex_destroy(&share->mutex); #ifdef HAVE_NDB_BINLOG - if (share->m_cfn_share && share->m_cfn_share->m_ex_tab && g_ndb) + if (share->m_cfn_share && + share->m_cfn_share->m_ex_tab_writer.hasTable() && + g_ndb) { - NdbDictionary::Dictionary *dict= g_ndb->getDictionary(); - dict->removeTableGlobal(*(share->m_cfn_share->m_ex_tab), 0); - share->m_cfn_share->m_ex_tab= 0; + share->m_cfn_share->m_ex_tab_writer.free(g_ndb); } #endif share->new_op= 0; === modified file 'sql/ndb_share.h' --- a/sql/ndb_share.h 2012-01-25 17:50:29 +0000 +++ b/sql/ndb_share.h 2012-01-31 11:11:20 +0000 @@ -26,6 +26,7 @@ #include // MAX_REF_PARTS #include // Ndb::TupleIdRange +#include "ndb_conflict.h" enum NDB_SHARE_STATE { NSS_INITIAL= 0, @@ -33,18 +34,6 @@ enum NDB_SHARE_STATE { NSS_ALTERED }; - -enum enum_conflict_fn_type -{ - CFT_NDB_UNDEF = 0 - ,CFT_NDB_MAX - ,CFT_NDB_OLD - ,CFT_NDB_MAX_DEL_WIN - ,CFT_NDB_EPOCH - ,CFT_NDB_EPOCH_TRANS - ,CFT_NUMBER_OF_CFTS /* End marker */ -}; - #ifdef HAVE_NDB_BINLOG enum Ndb_binlog_type { @@ -56,112 +45,6 @@ enum Ndb_binlog_type ,NBT_UPDATED_ONLY_USE_UPDATE = NBT_UPDATED_ONLY | NBT_USE_UPDATE ,NBT_FULL_USE_UPDATE = NBT_FULL | NBT_USE_UPDATE }; - -static const Uint32 MAX_CONFLICT_ARGS= 8; - -enum enum_conflict_fn_arg_type -{ - CFAT_END - ,CFAT_COLUMN_NAME - ,CFAT_EXTRA_GCI_BITS -}; - -struct st_conflict_fn_arg -{ - enum_conflict_fn_arg_type type; - union - { - char resolveColNameBuff[ NAME_CHAR_LEN + 1 ]; // CFAT_COLUMN_NAME - uint32 extraGciBits; // CFAT_EXTRA_GCI_BITS - }; -}; - -struct st_conflict_fn_arg_def -{ - enum enum_conflict_fn_arg_type arg_type; - bool optional; -}; - -/* What type of operation was issued */ -enum enum_conflicting_op_type -{ /* NdbApi */ - WRITE_ROW, /* insert (!write) */ - UPDATE_ROW, /* update */ - DELETE_ROW, /* delete */ - REFRESH_ROW /* refresh */ -}; - -/* - prepare_detect_func - - Type of function used to prepare for conflict detection on - an NdbApi operation -*/ -typedef int (* prepare_detect_func) (struct st_ndbcluster_conflict_fn_share* cfn_share, - enum_conflicting_op_type op_type, - const NdbRecord* data_record, - const uchar* old_data, - const uchar* new_data, - const MY_BITMAP* write_set, - class NdbInterpretedCode* code); - -enum enum_conflict_fn_flags -{ - CF_TRANSACTIONAL = 1 -}; - -struct st_conflict_fn_def -{ - const char *name; - enum_conflict_fn_type type; - const st_conflict_fn_arg_def* arg_defs; - prepare_detect_func prep_func; - uint8 flags; /* enum_conflict_fn_flags */ -}; - -/* What sort of conflict was found */ -enum enum_conflict_cause -{ - ROW_ALREADY_EXISTS, /* On insert */ - ROW_DOES_NOT_EXIST, /* On Update, Delete */ - ROW_IN_CONFLICT, /* On Update, Delete */ - TRANS_IN_CONFLICT /* Any of above, or implied by transaction */ -}; - -/* NdbOperation custom data which points out handler and record. */ -struct Ndb_exceptions_data { - struct NDB_SHARE* share; - const NdbRecord* key_rec; - const uchar* row; - enum_conflicting_op_type op_type; - Uint64 trans_id; -}; - -enum enum_conflict_fn_table_flags -{ - CFF_NONE = 0, - CFF_REFRESH_ROWS = 1 -}; - -/* - Maximum supported key parts (16) - (Ndb supports 32, but MySQL has a lower limit) -*/ -static const int NDB_MAX_KEY_PARTS = MAX_REF_PARTS; - -typedef struct st_ndbcluster_conflict_fn_share { - const st_conflict_fn_def* m_conflict_fn; - - /* info about original table */ - uint8 m_pk_cols; - uint16 m_resolve_column; - uint8 m_resolve_size; - uint8 m_flags; - uint16 m_key_attrids[ NDB_MAX_KEY_PARTS ]; - - const NdbDictionary::Table *m_ex_tab; - uint32 m_count; -} NDB_CONFLICT_FN_SHARE; #endif === modified file 'sql/ndb_thd.cc' --- a/sql/ndb_thd.cc 2012-01-17 13:59:49 +0000 +++ b/sql/ndb_thd.cc 2012-01-31 11:11:20 +0000 @@ -19,6 +19,7 @@ #define MYSQL_SERVER #endif +#include "ha_ndbcluster_glue.h" #include "ndb_thd.h" #include "ndb_thd_ndb.h" === modified file 'storage/ndb/CMakeLists.txt' --- a/storage/ndb/CMakeLists.txt 2012-01-25 17:50:29 +0000 +++ b/storage/ndb/CMakeLists.txt 2012-01-31 11:11:20 +0000 @@ -89,6 +89,7 @@ SET(NDBCLUSTER_SOURCES ../../sql/ndb_component.cc ../../sql/ndb_local_schema.cc ../../sql/ndb_repl_tab.cc + ../../sql/ndb_conflict.cc ) # Include directories used when building ha_ndbcluster No bundle (reason: useless for push emails).