3469 Frazer Clement 2011-09-09 [merge]
Merge 7.1->5.5-cluster
added:
mysql-test/suite/ndb_binlog/r/ndb_binlog_log_transaction_id.result
mysql-test/suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id-master.opt
mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id.test
mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_epoch_trans.result
mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.cnf
mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.test
mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info.inc
mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc
mysql-test/suite/rpl/r/rpl_extra_row_data.result
mysql-test/suite/rpl/t/rpl_extra_row_data-master.opt
mysql-test/suite/rpl/t/rpl_extra_row_data-slave.opt
mysql-test/suite/rpl/t/rpl_extra_row_data.test
sql/ndb_conflict_trans.cc
sql/ndb_conflict_trans.h
storage/ndb/include/util/HashMap2.hpp
storage/ndb/include/util/LinkedStack.hpp
storage/ndb/src/common/util/HashMap2.cpp
storage/ndb/src/common/util/LinkedStack.cpp
modified:
mysql-test/suite/ndb/r/ndb_basic.result
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.h
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbcluster_binlog.h
sql/ha_ndbcluster_glue.h
sql/log_event.cc
sql/log_event.h
sql/ndb_mi.cc
sql/ndb_mi.h
sql/ndb_share.h
sql/rpl_constants.h
sql/slave.h
sql/sql_class.cc
sql/sql_class.h
storage/ndb/CMakeLists.txt
storage/ndb/include/ndbapi/Ndb.hpp
storage/ndb/include/ndbapi/ndb_cluster_connection.hpp
storage/ndb/src/common/portlib/NdbTCP.cpp
storage/ndb/src/common/util/CMakeLists.txt
storage/ndb/src/ndbapi/Ndb.cpp
storage/ndb/src/ndbapi/Ndbif.cpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
storage/ndb/src/ndbapi/trp_client.cpp
storage/ndb/src/ndbapi/trp_client.hpp
3468 Jonas Oreland 2011-09-07 [merge]
ndb - merge 7.1
modified:
storage/ndb/clusterj/clusterj-jpatest/src/main/java/com/mysql/clusterj/jpatest/TimestampAsSqlTimestampTest.java
=== modified file 'mysql-test/suite/ndb/r/ndb_basic.result'
--- a/mysql-test/suite/ndb/r/ndb_basic.result 2011-09-02 09:16:56 +0000
+++ b/mysql-test/suite/ndb/r/ndb_basic.result 2011-09-09 09:30:43 +0000
@@ -2,6 +2,15 @@ DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t
drop database if exists mysqltest;
SHOW GLOBAL STATUS LIKE 'ndb\_%';
Variable_name Value
+Ndb_api_adaptive_send_deferred_count #
+Ndb_api_adaptive_send_deferred_count_session #
+Ndb_api_adaptive_send_deferred_count_slave #
+Ndb_api_adaptive_send_forced_count #
+Ndb_api_adaptive_send_forced_count_session #
+Ndb_api_adaptive_send_forced_count_slave #
+Ndb_api_adaptive_send_unforced_count #
+Ndb_api_adaptive_send_unforced_count_session #
+Ndb_api_adaptive_send_unforced_count_slave #
Ndb_api_bytes_received_count #
Ndb_api_bytes_received_count_session #
Ndb_api_bytes_received_count_slave #
@@ -66,9 +75,15 @@ Ndb_cluster_node_id #
Ndb_config_from_host #
Ndb_config_from_port #
Ndb_conflict_fn_epoch #
+Ndb_conflict_fn_epoch_trans #
Ndb_conflict_fn_max #
Ndb_conflict_fn_max_del_win #
Ndb_conflict_fn_old #
+Ndb_conflict_trans_conflict_commit_count #
+Ndb_conflict_trans_detect_iter_count #
+Ndb_conflict_trans_reject_count #
+Ndb_conflict_trans_row_conflict_count #
+Ndb_conflict_trans_row_reject_count #
Ndb_connect_count #
Ndb_execute_count #
Ndb_index_stat_cache_clean #
@@ -108,6 +123,7 @@ ndb_log_bin #
ndb_log_binlog_index #
ndb_log_empty_epochs #
ndb_log_orig #
+ndb_log_transaction_id #
ndb_log_update_as_write #
ndb_log_updated_only #
ndb_mgmd_host #
=== added file 'mysql-test/suite/ndb_binlog/r/ndb_binlog_log_transaction_id.result'
--- a/mysql-test/suite/ndb_binlog/r/ndb_binlog_log_transaction_id.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_binlog/r/ndb_binlog_log_transaction_id.result 2011-09-07 22:50:01 +0000
@@ -0,0 +1,104 @@
+use test;
+create table t1 (a int primary key, b int) engine=ndb;
+create table t2 (a int primary key, b int) engine=ndb;
+Single row autocommit transactions
+Should have 1 transaction id
+insert into t1 values (1,1);
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Should have 1 transaction id
+insert into t1 values (2,2);
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Should have 1 transaction id
+update t1 set b=20 where a=2;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Multi row autocommit transaction
+Should have 1 transaction id
+delete from t1;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Multi row explicit transaction
+Should have 1 transaction id
+begin;
+insert into t1 values (3,3);
+insert into t1 values (4,4);
+insert into t1 values (5,5);
+commit;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Should have 1 transaction id
+begin;
+insert into t1 values (6,6);
+update t1 set b=40 where a=4;
+delete from t1 where a=5;
+commit;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Multi table explicit transaction
+Should have 1 transaction id
+begin;
+insert into t1 values (7,7);
+insert into t2 values (7,7);
+insert into t2 values (8,8);
+commit;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+1
+reset master;
+Multiple autocommit transactions
+Should have 2 transaction ids
+insert into t1 values (8,8);
+insert into t1 values (9,9);
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+2
+reset master;
+Multiple autocommit transactions on single row
+Should have 3 transaction ids
+insert into t1 values (10,10);
+update t1 set b=100 where a=10;
+delete from t1 where a=10;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+3
+reset master;
+Multiple explicit transactions
+Should have 2 transaction ids
+begin;
+insert into t1 values (11,11);
+delete from t1;
+commit;
+begin;
+insert into t2 values (11,11);
+delete from t2;
+commit;
+Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+number_of_transactions
+2
+reset master;
+drop table t1;
+drop table t2;
=== added file 'mysql-test/suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc'
--- a/mysql-test/suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc 2011-09-07 22:50:01 +0000
@@ -0,0 +1,30 @@
+#
+# Get the mysqlbinlog tool --verbose mode to dump the Binlog contents with
+# extra row data included, e.g.
+#
+#### Extra row data format: 0, len: 8 :0x0007200006000000
+#
+# Then process this input to get the number of distinct values, and hence
+# distinct transaction ids in the binlog
+#
+
+--disable_query_log
+let $MYSQLD_DATADIR= `select @@datadir;`;
+--exec $MYSQL_BINLOG --verbose $MYSQLD_DATADIR/mysqld-bin.000001 > $MYSQLTEST_VARDIR/tmp/ndb_binlog_mysqlbinlog.sql
+
+create table raw_binlog_rows (txt varchar(1000));
+create table diff_extra_data (txt varchar(1000));
+
+--eval load data local infile '$MYSQLTEST_VARDIR/tmp/ndb_binlog_mysqlbinlog.sql' into table raw_binlog_rows columns terminated by '\n';
+
+insert into diff_extra_data select distinct(txt) from raw_binlog_rows where txt like '### Extra row data %';
+--enable_query_log
+
+--echo Number of different transaction ids in Binlog
+select count(1) as number_of_transactions from diff_extra_data;
+
+--disable_query_log
+drop table diff_extra_data;
+drop table raw_binlog_rows;
+--enable_query_log
+
=== added file 'mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id-master.opt'
--- a/mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id-master.opt 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id-master.opt 2011-09-07 22:50:01 +0000
@@ -0,0 +1 @@
+--ndb-log-transaction-id
=== added file 'mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id.test'
--- a/mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_binlog/t/ndb_binlog_log_transaction_id.test 2011-09-07 22:50:01 +0000
@@ -0,0 +1,122 @@
+--source include/have_ndb.inc
+--source include/have_log_bin.inc
+--source include/have_debug.inc
+
+use test;
+
+create table t1 (a int primary key, b int) engine=ndb;
+create table t2 (a int primary key, b int) engine=ndb;
+
+--echo Single row autocommit transactions
+--echo Should have 1 transaction id
+insert into t1 values (1,1);
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Should have 1 transaction id
+insert into t1 values (2,2);
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Should have 1 transaction id
+update t1 set b=20 where a=2;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multi row autocommit transaction
+--echo Should have 1 transaction id
+delete from t1;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multi row explicit transaction
+--echo Should have 1 transaction id
+begin;
+insert into t1 values (3,3);
+insert into t1 values (4,4);
+insert into t1 values (5,5);
+commit;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Should have 1 transaction id
+begin;
+insert into t1 values (6,6);
+update t1 set b=40 where a=4;
+delete from t1 where a=5;
+commit;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multi table explicit transaction
+--echo Should have 1 transaction id
+begin;
+insert into t1 values (7,7);
+insert into t2 values (7,7);
+insert into t2 values (8,8);
+commit;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multiple autocommit transactions
+--echo Should have 2 transaction ids
+insert into t1 values (8,8);
+insert into t1 values (9,9);
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multiple autocommit transactions on single row
+--echo Should have 3 transaction ids
+insert into t1 values (10,10);
+update t1 set b=100 where a=10;
+delete from t1 where a=10;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+--echo Multiple explicit transactions
+--echo Should have 2 transaction ids
+begin;
+insert into t1 values (11,11);
+delete from t1;
+commit;
+
+begin;
+insert into t2 values (11,11);
+delete from t2;
+commit;
+
+let $wait_binlog_event= COMMIT;
+--source include/wait_for_binlog_event.inc
+--source suite/ndb_binlog/t/ndb_binlog_get_row_extra_data.inc
+reset master;
+
+
+drop table t1;
+drop table t2;
\ No newline at end of file
=== added file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_epoch_trans.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_epoch_trans.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict_epoch_trans.result 2011-09-09 09:30:43 +0000
@@ -0,0 +1,913 @@
+include/master-slave.inc
+[connection master]
+Setup circular replication
+RESET MASTER;
+select @slave_server_id:=(variable_value+0)
+from information_schema.global_variables
+where variable_name like 'server_id';
+@slave_server_id:=(variable_value+0)
+3
+CHANGE MASTER TO master_host="127.0.0.1",master_port=SLAVE_PORT,master_user="root";
+START SLAVE;
+select @master_server_id:=(variable_value+0)
+from information_schema.global_variables
+where variable_name like 'server_id';
+@master_server_id:=(variable_value+0)
+1
+Setup ndb_replication and t1 exceptions table
+Populate ndb_replication table as necessary
+replace into mysql.ndb_replication values
+("test", "t1", 3, 7, NULL),
+("test", "t1", 1, 7, "NDB$EPOCH_TRANS()");
+replace into mysql.ndb_replication values
+("test", "t2", 3, 7, NULL),
+("test", "t2", 1, 7, "NDB$EPOCH_TRANS()");
+create table test.t1 (
+a int primary key,
+b varchar(2000)) engine=ndb;
+create table test.t2 (
+a int primary key,
+b varchar(2000)) engine=ndb;
+Add some data
+insert into test.t1 values
+(1, "Initial data 1"),
+(2, "Initial data 2"),
+(3, "Initial data 3"),
+(4, "Initial data 4"),
+(5, "Initial data 5"),
+(6, "Initial data 6"),
+(7, "Initial data 7"),
+(8, "Initial data 8"),
+(9, "Initial data 9"),
+(10, "Initial data 10");
+Show basic row-level conflict detection
+---------------------------------------
+stop slave;
+update t1 set b="Primary first change 2" where a=2;
+select * from test.t1 order by a;
+a b
+1 Initial data 1
+2 Primary first change 2
+3 Initial data 3
+4 Initial data 4
+5 Initial data 5
+6 Initial data 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Initial data 10
+update t1 set b="Secondary first change 2" where a=2;
+select * from test.t1 order by a;
+a b
+1 Initial data 1
+2 Secondary first change 2
+3 Initial data 3
+4 Initial data 4
+5 Initial data 5
+6 Initial data 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Initial data 10
+Primary should have rejected change from Secondary, keeping its value
+select * from t1 order by a;
+a b
+1 Initial data 1
+2 Primary first change 2
+3 Initial data 3
+4 Initial data 4
+5 Initial data 5
+6 Initial data 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Initial data 10
+ndb_conflict_fn_epoch_trans
+1
+ndb_conflict_trans_row_conflict_count
+1
+ndb_conflict_trans_row_reject_count
+1
+ndb_conflict_trans_reject_count
+1
+ndb_conflict_trans_detect_iter_count
+1
+ndb_conflict_trans_conflict_commit_count
+1
+start slave;
+Secondary should have been realigned to Primary
+select * from t1 order by a;
+a b
+1 Initial data 1
+2 Primary first change 2
+3 Initial data 3
+4 Initial data 4
+5 Initial data 5
+6 Initial data 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Initial data 10
+Show rollback of whole secondary transaction
+--------------------------------------------
+stop slave;
+update t1 set b="Primary second change 4" where a=4;
+select * from test.t1 order by a;
+a b
+1 Initial data 1
+2 Primary first change 2
+3 Initial data 3
+4 Primary second change 4
+5 Initial data 5
+6 Initial data 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Initial data 10
+begin;
+update t1 set b="Secondary second change 4" where a=4;
+update t1 set b="Secondary second change 5" where a=5;
+commit;
+select * from test.t1 order by a;
+a b
+1 Initial data 1
+2 Primary first change 2
+3 Initial data 3
+4 Secondary second change 4
+5 Secondary second change 5
+6 Initial data 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Initial data 10
+Primary should have rejected secondary changes on both rows
+select * from test.t1 order by a;
+a b
+1 Initial data 1
+2 Primary first change 2
+3 Initial data 3
+4 Primary second change 4
+5 Initial data 5
+6 Initial data 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Initial data 10
+ndb_conflict_fn_epoch_trans
+1
+ndb_conflict_trans_row_conflict_count
+1
+ndb_conflict_trans_row_reject_count
+2
+ndb_conflict_trans_reject_count
+1
+ndb_conflict_trans_detect_iter_count
+1
+ndb_conflict_trans_conflict_commit_count
+1
+start slave;
+Secondary should have been realigned to Primary
+select * from test.t1 order by a;
+a b
+1 Initial data 1
+2 Primary first change 2
+3 Initial data 3
+4 Primary second change 4
+5 Initial data 5
+6 Initial data 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Initial data 10
+Show rollback of dependent transaction as well
+----------------------------------------------
+stop slave;
+update t1 set b="Primary third change 1" where a=1;
+select * from test.t1 order by a;
+a b
+1 Primary third change 1
+2 Primary first change 2
+3 Initial data 3
+4 Primary second change 4
+5 Initial data 5
+6 Initial data 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Initial data 10
+begin;
+update t1 set b="Secondary third change 3" where a=3;
+update t1 set b="Secondary third change 1" where a=1;
+commit;
+begin;
+update t1 set b="Secondary fourth change 3" where a=3;
+insert into t1 values (11,"Secondary fourth change 11");
+commit;
+select * from test.t1 order by a;
+a b
+1 Secondary third change 1
+2 Primary first change 2
+3 Secondary fourth change 3
+4 Primary second change 4
+5 Initial data 5
+6 Initial data 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Initial data 10
+11 Secondary fourth change 11
+Primary should have rejected all secondary changes
+select * from test.t1 order by a;
+a b
+1 Primary third change 1
+2 Primary first change 2
+3 Initial data 3
+4 Primary second change 4
+5 Initial data 5
+6 Initial data 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Initial data 10
+ndb_conflict_trans_row_reject_count
+4
+ndb_conflict_trans_reject_count
+2
+start slave;
+Secondary should have been realigned to Primary
+select * from test.t1 order by a;
+a b
+1 Primary third change 1
+2 Primary first change 2
+3 Initial data 3
+4 Primary second change 4
+5 Initial data 5
+6 Initial data 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Initial data 10
+Show rollback of dependent transaction across different tables
+--------------------------------------------------------------
+stop slave;
+update t1 set b="Primary fifth change 6" where a=6;
+select * from test.t1 order by a;
+a b
+1 Primary third change 1
+2 Primary first change 2
+3 Initial data 3
+4 Primary second change 4
+5 Initial data 5
+6 Primary fifth change 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Initial data 10
+begin;
+update t1 set b="Secondary fifth change 6" where a=6;
+insert into t2 values (1, "Secondary fifth change 1");
+insert into t2 values (2, "Secondary fifth change 2");
+commit;
+begin;
+update t2 set b="Secondary sixth change 1" where a=2;
+insert into t2 values (3, "Secondary sixth change 2");
+commit;
+select * from test.t1 order by a;
+a b
+1 Primary third change 1
+2 Primary first change 2
+3 Initial data 3
+4 Primary second change 4
+5 Initial data 5
+6 Secondary fifth change 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Initial data 10
+select * from test.t2 order by a;
+a b
+1 Secondary fifth change 1
+2 Secondary sixth change 1
+3 Secondary sixth change 2
+Primary should have rejected all secondary changes
+select * from test.t1 order by a;
+a b
+1 Primary third change 1
+2 Primary first change 2
+3 Initial data 3
+4 Primary second change 4
+5 Initial data 5
+6 Primary fifth change 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Initial data 10
+select * from test.t2 order by a;
+a b
+ndb_conflict_trans_row_reject_count
+5
+ndb_conflict_trans_reject_count
+2
+start slave;
+Secondary should have been realigned to primary
+select * from test.t1 order by a;
+a b
+1 Primary third change 1
+2 Primary first change 2
+3 Initial data 3
+4 Primary second change 4
+5 Initial data 5
+6 Primary fifth change 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Initial data 10
+select * from test.t2 order by a;
+a b
+Show that whole epoch is not rolled back
+----------------------------------------
+stop slave;
+update t1 set b="Primary is king" where a=10;
+begin;
+update t1 set b="Secondary is emperor" where a=10;
+insert into t1 values (11, "Secondary is pleni-potentiary");
+commit;
+begin;
+insert into t1 values (12, "Secondary ruled once");
+insert into t1 values (13, "This history will not be lost");
+insert into t1 values (14, "Look on my works ye mighty, and despair");
+commit;
+Primary should have rejected conflicting trans (emperor, pleni-potentiary)
+but accepted unrelated trans (history)
+select * from t1 order by a;
+a b
+1 Primary third change 1
+2 Primary first change 2
+3 Initial data 3
+4 Primary second change 4
+5 Initial data 5
+6 Primary fifth change 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Primary is king
+12 Secondary ruled once
+13 This history will not be lost
+14 Look on my works ye mighty, and despair
+ndb_conflict_fn_epoch_trans
+1
+ndb_conflict_trans_row_conflict_count
+1
+ndb_conflict_trans_row_reject_count
+2
+ndb_conflict_trans_reject_count
+1
+ndb_conflict_trans_detect_iter_count
+1
+ndb_conflict_trans_conflict_commit_count
+1
+start slave;
+Secondary should be aligned with Primary
+select * from t1 order by a;
+a b
+1 Primary third change 1
+2 Primary first change 2
+3 Initial data 3
+4 Primary second change 4
+5 Initial data 5
+6 Primary fifth change 6
+7 Initial data 7
+8 Initial data 8
+9 Initial data 9
+10 Primary is king
+12 Secondary ruled once
+13 This history will not be lost
+14 Look on my works ye mighty, and despair
+Show that non-conflicting ancestors are not implicated
+------------------------------------------------------
+stop slave;
+update t1 set b="7 : Primary is king" where a=7;
+Primary state
+select * from test.t1 order by a;
+a b
+1 Primary third change 1
+2 Primary first change 2
+3 Initial data 3
+4 Primary second change 4
+5 Initial data 5
+6 Primary fifth change 6
+7 7 : Primary is king
+8 Initial data 8
+9 Initial data 9
+10 Primary is king
+12 Secondary ruled once
+13 This history will not be lost
+14 Look on my works ye mighty, and despair
+begin;
+update t1 set b="8 : Secondary innocent" where a=8;
+update t1 set b="9 : Secondary innocent" where a=9;
+commit;
+Secondary with innocent
+select * from test.t1 order by a;
+a b
+1 Primary third change 1
+2 Primary first change 2
+3 Initial data 3
+4 Primary second change 4
+5 Initial data 5
+6 Primary fifth change 6
+7 Initial data 7
+8 8 : Secondary innocent
+9 9 : Secondary innocent
+10 Primary is king
+12 Secondary ruled once
+13 This history will not be lost
+14 Look on my works ye mighty, and despair
+begin;
+update t1 set b="9 : Secondary guilty" where a=9;
+update t1 set b="7 : Secondary guilty" where a=7;
+commit;
+Secondary with guilty overlaid
+select * from test.t1 order by a;
+a b
+1 Primary third change 1
+2 Primary first change 2
+3 Initial data 3
+4 Primary second change 4
+5 Initial data 5
+6 Primary fifth change 6
+7 7 : Secondary guilty
+8 8 : Secondary innocent
+9 9 : Secondary guilty
+10 Primary is king
+12 Secondary ruled once
+13 This history will not be lost
+14 Look on my works ye mighty, and despair
+Primary cluster should have rejected 'guilty' secondary transaction, but
+accepted 'innocent' secondary transaction.
+select * from test.t1 order by a;
+a b
+1 Primary third change 1
+2 Primary first change 2
+3 Initial data 3
+4 Primary second change 4
+5 Initial data 5
+6 Primary fifth change 6
+7 7 : Primary is king
+8 8 : Secondary innocent
+9 9 : Secondary innocent
+10 Primary is king
+12 Secondary ruled once
+13 This history will not be lost
+14 Look on my works ye mighty, and despair
+ndb_conflict_fn_epoch_trans
+1
+ndb_conflict_trans_row_conflict_count
+1
+ndb_conflict_trans_row_reject_count
+2
+ndb_conflict_trans_reject_count
+1
+ndb_conflict_trans_detect_iter_count
+1
+ndb_conflict_trans_conflict_commit_count
+1
+start slave;
+Secondary cluster should be realigned with Primary
+select * from test.t1 order by a;
+a b
+1 Primary third change 1
+2 Primary first change 2
+3 Initial data 3
+4 Primary second change 4
+5 Initial data 5
+6 Primary fifth change 6
+7 7 : Primary is king
+8 8 : Secondary innocent
+9 9 : Secondary innocent
+10 Primary is king
+12 Secondary ruled once
+13 This history will not be lost
+14 Look on my works ye mighty, and despair
+Classic banking example
+-----------------------
+replace into mysql.ndb_replication values
+("test", "balances", 3, 7, NULL),
+("test", "balances", 1, 7, "NDB$EPOCH_TRANS()");
+replace into mysql.ndb_replication values
+("test", "transactions", 3, 7, NULL),
+("test", "transactions", 1, 7, "NDB$EPOCH_TRANS()");
+create table test.balances
+(name varchar(100) primary key,
+balance int) engine=ndb;
+create table test.transactions$EX
+(server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+auto_key int not null,
+from_name varchar(100) not null,
+to_name varchar(100) not null,
+detail varchar(100) not null,
+primary key(server_id, master_server_id, master_epoch, count))
+engine=ndb;
+create table test.transactions
+(auto_key int auto_increment,
+from_name varchar(100),
+to_name varchar(100),
+detail varchar(100),
+amount int,
+primary key(auto_key, from_name, to_name, detail)) engine=ndb;
+Initialise balances across both bank sites
+insert into test.balances values
+("Larry", 100),
+("Employee-1", 0),
+("Employee-2", 0),
+("Yacht dealer", 0),
+("Newsagent", 0);
+FLUSH LOGS;
+Bank sites are disconnected
+stop slave;
+Larry buys a yacht using Primary bank site
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+values ("Larry", "Yacht dealer", "Yacht purchase", 50);
+update test.balances set balance = balance - 50 where name = "Larry";
+update test.balances set balance = balance + 50 where name = "Yacht dealer";
+commit;
+Show yacht transaction records
+select * from test.transactions order by auto_key;
+auto_key from_name to_name detail amount
+1 Larry Yacht dealer Yacht purchase 50
+select * from test.balances order by name;
+name balance
+Employee-1 0
+Employee-2 0
+Larry 50
+Newsagent 0
+Yacht dealer 50
+Larry pays employees using Secondary bank site
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+values ("Larry", "Employee-1", "Payment to Employee-1", 1);
+update test.balances set balance = balance - 1 where name = "Larry";
+update test.balances set balance = balance + 1 where name = "Employee-1";
+commit;
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+values ("Larry", "Employee-2", "Payment to Employee-2", 1);
+update test.balances set balance = balance - 1 where name = "Larry";
+update test.balances set balance = balance + 1 where name = "Employee-2";
+commit;
+Employee-2 buys yacht magazine using Secondary bank site
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+values ("Employee-2", "Newsagent", "Buy yacht magazine", 1);
+update test.balances set balance = balance - 1 where name = "Employee-2";
+update test.balances set balance = balance + 1 where name = "Newsagent";
+commit;
+Show employee transactions
+select * from test.transactions order by auto_key;
+auto_key from_name to_name detail amount
+1 Larry Employee-1 Payment to Employee-1 1
+2 Larry Employee-2 Payment to Employee-2 1
+3 Employee-2 Newsagent Buy yacht magazine 1
+select * from test.balances order by name;
+name balance
+Employee-1 1
+Employee-2 0
+Larry 98
+Newsagent 1
+Yacht dealer 0
+Bank sites re-connected
+start slave;
+Records at Primary bank site
+select * from test.transactions order by auto_key;
+auto_key from_name to_name detail amount
+1 Larry Yacht dealer Yacht purchase 50
+select * from test.balances order by name;
+name balance
+Employee-1 0
+Employee-2 0
+Larry 50
+Newsagent 0
+Yacht dealer 50
+Exceptions at Primary bank site
+select server_id, master_server_id, count, auto_key, from_name, to_name, detail
+from test.transactions$EX order by count;
+server_id master_server_id count auto_key from_name to_name detail
+1 3 1 1 Larry Employee-1 Payment to Employee-1
+1 3 2 2 Larry Employee-2 Payment to Employee-2
+1 3 3 3 Employee-2 Newsagent Buy yacht magazine
+Conflict handling activity at Primary bank site
+Expect :
+1 conflict from slave T1 on Larry's balance
+1 conflict from slave T2 on Larry's balance
+=2 row conflicts
+
+3 (user) transactions rejected
+9 rows rejected (3 per transaction)
+Variability : # epoch transactions, # row conflicts detected
+1-3 2-3
+# detect_iter_count
+1-3
+We only check stable values
+ndb_conflict_trans_row_reject_count
+9
+ndb_conflict_trans_reject_count
+3
+Records at Secondary bank site
+select * from test.transactions order by auto_key;
+auto_key from_name to_name detail amount
+1 Larry Yacht dealer Yacht purchase 50
+select * from test.balances order by name;
+name balance
+Employee-1 0
+Employee-2 0
+Larry 50
+Newsagent 0
+Yacht dealer 50
+drop table test.balances;
+drop table test.transactions;
+drop table test.transactions$EX;
+Test mixing transactional and non transactional
+-----------------------------------------------
+Remove old data from t1
+delete from test.t1;
+Define table with row-based epoch detection
+replace into mysql.ndb_replication values
+("test", "t3", 3, 7, NULL),
+("test", "t3", 1, 7, 'NDB$EPOCH()');
+create table t3 (a int primary key, b int) engine=ndb;
+create table t4 (a int primary key, b int) engine=ndb;
+create table t5 (a int primary key, b longtext) engine=ndb;
+Insert some data
+insert into test.t1 values
+(1,1),
+(2,2),
+(3,3),
+(4,4),
+(5,5),
+(6,6);
+insert into test.t3 values
+(11,11),
+(12,12),
+(13,13),
+(14,14),
+(15,15),
+(16,16);
+insert into test.t4 values
+(21,21),
+(22,22),
+(23,23),
+(24,24),
+(25,25),
+(26,26);
+insert into test.t5 values
+(1, REPEAT("B", 10000)),
+(2, REPEAT("E", 10000)),
+(3, REPEAT("A", 10000));
+Allow to propagate
+FLUSH LOGS;
+Case 1 : Transactional detection affects row - based entries in same trans
+stop slave;
+update test.t1 set b=100 where a=1;
+begin;
+update test.t3 set b=1100 where a=11;
+update test.t4 set b=2100 where a=21;
+update test.t1 set b=1000 where a=1;
+commit;
+Show slave transaction effect
+select * from test.t1 order by a;
+a b
+1 1000
+2 2
+3 3
+4 4
+5 5
+6 6
+select * from test.t3 order by a;
+a b
+11 1100
+12 12
+13 13
+14 14
+15 15
+16 16
+select * from test.t4 order by a;
+a b
+21 2100
+22 22
+23 23
+24 24
+25 25
+26 26
+Expect Primary to have rejected whole trans across 3 tables
+select * from test.t1 order by a;
+a b
+1 100
+2 2
+3 3
+4 4
+5 5
+6 6
+select * from test.t3 order by a;
+a b
+11 11
+12 12
+13 13
+14 14
+15 15
+16 16
+select * from test.t4 order by a;
+a b
+21 21
+22 22
+23 23
+24 24
+25 25
+26 26
+Expect 1 transaction rejected, 3 rows rejected
+1 conflict row, 1 epoch, 1 iteration
+ndb_conflict_fn_epoch_trans
+1
+ndb_conflict_trans_row_conflict_count
+1
+ndb_conflict_trans_row_reject_count
+3
+ndb_conflict_trans_reject_count
+1
+ndb_conflict_trans_detect_iter_count
+1
+ndb_conflict_trans_conflict_commit_count
+1
+Now restart rep to Secondary, and check realignment
+start slave;
+select * from test.t1 order by a;
+a b
+1 100
+2 2
+3 3
+4 4
+5 5
+6 6
+select * from test.t3 order by a;
+a b
+11 11
+12 12
+13 13
+14 14
+15 15
+16 16
+select * from test.t4 order by a;
+a b
+21 21
+22 22
+23 23
+24 24
+25 25
+26 26
+Case 2 : Row based detection does not affect other transaction entries
+stop slave;
+update test.t3 set b=1200 where a=12;
+begin;
+update test.t3 set b=1201 where a=12;
+update test.t4 set b=2200 where a=22;
+update test.t1 set b=2000 where a=2;
+commit;
+Show effect of transaction on Secondary
+select * from test.t1 order by a;
+a b
+1 100
+2 2000
+3 3
+4 4
+5 5
+6 6
+select * from test.t3 order by a;
+a b
+11 11
+12 1201
+13 13
+14 14
+15 15
+16 16
+select * from test.t4 order by a;
+a b
+21 21
+22 2200
+23 23
+24 24
+25 25
+26 26
+Show effect of transaction on Primary
+Only t3 should have been reverted
+select * from test.t1 order by a;
+a b
+1 100
+2 2000
+3 3
+4 4
+5 5
+6 6
+select * from test.t3 order by a;
+a b
+11 11
+12 1200
+13 13
+14 14
+15 15
+16 16
+select * from test.t4 order by a;
+a b
+21 21
+22 2200
+23 23
+24 24
+25 25
+26 26
+Expect all counters to be zero
+ndb_conflict_fn_epoch_trans
+0
+ndb_conflict_trans_row_conflict_count
+0
+ndb_conflict_trans_row_reject_count
+0
+ndb_conflict_trans_reject_count
+0
+ndb_conflict_trans_detect_iter_count
+0
+ndb_conflict_trans_conflict_commit_count
+0
+Show effect of transaction on Secondary
+start slave;
+select * from test.t1 order by a;
+a b
+1 100
+2 2000
+3 3
+4 4
+5 5
+6 6
+select * from test.t3 order by a;
+a b
+11 11
+12 1200
+13 13
+14 14
+15 15
+16 16
+select * from test.t4 order by a;
+a b
+21 21
+22 2200
+23 23
+24 24
+25 25
+26 26
+flush logs;
+Case 3 : Check behaviour where table with Blob is implicated
+in transactional conflict. Should result in Slave
+stopping with an error.
+STOP SLAVE;
+Setup warning suppression
+begin;
+update t1 set b= 11 where a=1;
+commit;
+begin;
+update t1 set b= 111 where a=1;
+update t1 set b= 222 where a=2;
+update t5 set b= REPEAT("T", 10000) where a=3;
+commit;
+Show effect of transaction on Secondary
+select * from test.t1 order by a;
+a b
+1 111
+2 222
+3 3
+4 4
+5 5
+6 6
+select left(b,1), length(b) from test.t5 order by a;
+left(b,1) length(b)
+B 10000
+E 10000
+T 10000
+Check that Primary Slave has stopped
+include/wait_for_slave_sql_error.inc [errno=1296]
+Restart Primary Slave
+set global sql_slave_skip_counter=1;
+START SLAVE;
+Restart Secondary Slave
+START SLAVE;
+flush logs;
+drop table test.t3;
+drop table test.t4;
+drop table test.t5;
+drop table mysql.ndb_replication;
+drop table test.t1;
+drop table test.t2;
+flush logs;
+stop slave;
+reset slave;
+include/rpl_end.inc
=== added file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.cnf'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.cnf 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.cnf 2011-09-09 09:30:43 +0000
@@ -0,0 +1,41 @@
+# Could be :
+# !include ndb_rpl_conflict_epoch.cnf
+# [mysqld]
+# server-id-bits=24
+# ndb-serverid-transid-bits=8
+
+!include ../my.cnf
+
+# 2 clusters, each with 2 MySQLDs
+# All MySQLDs log-slave-updates
+# Potential infinite loops are broken by both servers
+# on each cluster having the same server-id
+
+[mysqld]
+log-slave-updates
+ndb-log-apply-status
+ndb-log-transaction-id
+
+[mysqld.1.1]
+server-id= 1
+log-bin = pref-master-1
+
+[mysqld.2.1]
+server-id= 2
+log-bin = pref-master-2
+
+[mysqld.1.slave]
+server-id= 3
+log-bin = sec-master-1
+skip-slave-start
+
+[mysqld.2.slave]
+server-id= 4
+log-bin = sec-master-2
+skip-slave-start
+ndb_connectstring= @mysql_cluster.slave.ndb_connectstring
+
+[ENV]
+
+SLAVE_MYPORT1= @mysqld.2.slave.port
+SLAVE_MYSOCK1= @mysqld.2.slave.socket
=== added file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch_trans.test 2011-09-09 09:30:43 +0000
@@ -0,0 +1,746 @@
+#
+# Test engine native conflict resolution for ndb
+# NDB$EPOCH_TRANS() function
+#
+#
+--source include/have_ndb.inc
+--source include/have_binlog_format_mixed_or_row.inc
+--source suite/ndb_rpl/ndb_master-slave.inc
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--echo Setup circular replication
+
+--connection slave
+RESET MASTER;
+select @slave_server_id:=(variable_value+0)
+ from information_schema.global_variables
+ where variable_name like 'server_id';
+let $SLAVE_SERVER_ID= query_get_value('select @slave_server_id as v',v,1);
+
+--connection master
+--replace_result $SLAVE_MYPORT SLAVE_PORT
+--eval CHANGE MASTER TO master_host="127.0.0.1",master_port=$SLAVE_MYPORT,master_user="root"
+START SLAVE;
+select @master_server_id:=(variable_value+0)
+ from information_schema.global_variables
+ where variable_name like 'server_id';
+let $MASTER_SERVER_ID= query_get_value('select @master_server_id as v',v,1);
+
+--echo Setup ndb_replication and t1$EX exceptions table
+
+--disable_warnings
+--disable_query_log
+--connection master
+drop table if exists mysql.ndb_replication;
+CREATE TABLE mysql.ndb_replication
+ (db VARBINARY(63),
+ table_name VARBINARY(63),
+ server_id INT UNSIGNED,
+ binlog_type INT UNSIGNED,
+ conflict_fn VARBINARY(128),
+ PRIMARY KEY USING HASH (db,table_name,server_id))
+ ENGINE=NDB PARTITION BY KEY(db,table_name);
+--enable_warnings
+--enable_query_log
+
+--echo Populate ndb_replication table as necessary
+eval replace into mysql.ndb_replication values
+ ("test", "t1", $SLAVE_SERVER_ID, 7, NULL),
+ ("test", "t1", $MASTER_SERVER_ID, 7, "NDB\$EPOCH_TRANS()");
+eval replace into mysql.ndb_replication values
+ ("test", "t2", $SLAVE_SERVER_ID, 7, NULL),
+ ("test", "t2", $MASTER_SERVER_ID, 7, "NDB\$EPOCH_TRANS()");
+
+create table test.t1 (
+ a int primary key,
+ b varchar(2000)) engine=ndb;
+
+create table test.t2 (
+ a int primary key,
+ b varchar(2000)) engine=ndb;
+
+--sync_slave_with_master slave
+
+--connection master
+--echo Add some data
+insert into test.t1 values
+ (1, "Initial data 1"),
+ (2, "Initial data 2"),
+ (3, "Initial data 3"),
+ (4, "Initial data 4"),
+ (5, "Initial data 5"),
+ (6, "Initial data 6"),
+ (7, "Initial data 7"),
+ (8, "Initial data 8"),
+ (9, "Initial data 9"),
+ (10, "Initial data 10");
+
+--echo Show basic row-level conflict detection
+--echo ---------------------------------------
+--sync_slave_with_master slave
+--connection slave
+
+stop slave;
+
+--connection master
+
+update t1 set b="Primary first change 2" where a=2;
+select * from test.t1 order by a;
+
+--connection slave
+
+update t1 set b="Secondary first change 2" where a=2;
+select * from test.t1 order by a;
+
+--sync_slave_with_master master
+
+--echo Primary should have rejected change from Secondary, keeping its value
+
+select * from t1 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+
+start slave;
+
+--connection master
+
+--sync_slave_with_master slave
+
+--connection slave
+
+--echo Secondary should have been realigned to Primary
+
+select * from t1 order by a;
+
+--echo Show rollback of whole secondary transaction
+--echo --------------------------------------------
+
+--connection slave
+
+stop slave;
+
+--connection master
+update t1 set b="Primary second change 4" where a=4;
+
+select * from test.t1 order by a;
+
+--connection slave
+begin;
+update t1 set b="Secondary second change 4" where a=4;
+update t1 set b="Secondary second change 5" where a=5;
+commit;
+
+select * from test.t1 order by a;
+
+--sync_slave_with_master master
+
+--connection master
+
+--echo Primary should have rejected secondary changes on both rows
+select * from test.t1 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+start slave;
+
+--connection master
+--sync_slave_with_master slave
+
+--connection slave
+
+--echo Secondary should have been realigned to Primary
+select * from test.t1 order by a;
+
+--echo Show rollback of dependent transaction as well
+--echo ----------------------------------------------
+
+--connection slave
+stop slave;
+
+--connection master
+update t1 set b="Primary third change 1" where a=1;
+
+select * from test.t1 order by a;
+
+--connection slave
+
+begin;
+update t1 set b="Secondary third change 3" where a=3;
+update t1 set b="Secondary third change 1" where a=1; # Conflict here
+commit;
+begin;
+update t1 set b="Secondary fourth change 3" where a=3; # Dependency on conflict here
+insert into t1 values (11,"Secondary fourth change 11");
+commit;
+
+select * from test.t1 order by a;
+
+--sync_slave_with_master master
+--connection master
+
+--echo Primary should have rejected all secondary changes
+select * from test.t1 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+start slave;
+
+--connection master
+--sync_slave_with_master slave
+--connection slave
+
+--echo Secondary should have been realigned to Primary
+
+select * from test.t1 order by a;
+
+
+--echo Show rollback of dependent transaction across different tables
+--echo --------------------------------------------------------------
+
+--connection slave
+stop slave;
+
+--connection master
+
+update t1 set b="Primary fifth change 6" where a=6;
+
+select * from test.t1 order by a;
+
+--connection slave
+
+begin;
+update t1 set b="Secondary fifth change 6" where a=6; # Conflict row
+insert into t2 values (1, "Secondary fifth change 1");
+insert into t2 values (2, "Secondary fifth change 2");
+commit;
+begin;
+update t2 set b="Secondary sixth change 1" where a=2; # Dependent row
+insert into t2 values (3, "Secondary sixth change 2");
+commit;
+
+select * from test.t1 order by a;
+select * from test.t2 order by a;
+
+--sync_slave_with_master master
+--connection master
+
+--echo Primary should have rejected all secondary changes
+select * from test.t1 order by a;
+select * from test.t2 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+start slave;
+--connection master
+
+--sync_slave_with_master slave
+
+--echo Secondary should have been realigned to primary
+select * from test.t1 order by a;
+select * from test.t2 order by a;
+
+--echo Show that whole epoch is not rolled back
+--echo ----------------------------------------
+# Whole epoch is rolled back when --ndb-serverid-transid-bits is 0!
+
+--connection slave
+stop slave;
+
+--connection master
+update t1 set b="Primary is king" where a=10;
+
+--connection slave
+begin;
+update t1 set b="Secondary is emperor" where a=10;
+insert into t1 values (11, "Secondary is pleni-potentiary");
+commit;
+
+begin;
+insert into t1 values (12, "Secondary ruled once");
+insert into t1 values (13, "This history will not be lost");
+insert into t1 values (14, "Look on my works ye mighty, and despair");
+commit;
+
+--sync_slave_with_master master
+--connection master
+
+--echo Primary should have rejected conflicting trans (emperor, pleni-potentiary)
+--echo but accepted unrelated trans (history)
+
+select * from t1 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+start slave;
+
+--connection master
+--sync_slave_with_master slave
+--connection slave
+
+--echo Secondary should be aligned with Primary
+
+select * from t1 order by a;
+
+
+--echo Show that non-conflicting ancestors are not implicated
+--echo ------------------------------------------------------
+
+--connection slave
+stop slave;
+
+--connection master
+update t1 set b="7 : Primary is king" where a=7;
+
+--echo Primary state
+select * from test.t1 order by a;
+
+--connection slave
+
+# 'Innocent' secondary transaction
+begin;
+update t1 set b="8 : Secondary innocent" where a=8;
+update t1 set b="9 : Secondary innocent" where a=9;
+commit;
+
+--echo Secondary with innocent
+select * from test.t1 order by a;
+
+# 'Guilty secondary transaction, affecting one of the same rows as innocent
+begin;
+update t1 set b="9 : Secondary guilty" where a=9; # Dependency on innocent
+update t1 set b="7 : Secondary guilty" where a=7; # Conflict row
+commit;
+
+--echo Secondary with guilty overlaid
+select * from test.t1 order by a;
+
+--sync_slave_with_master master
+
+--connection master
+
+--echo Primary cluster should have rejected 'guilty' secondary transaction, but
+--echo accepted 'innocent' secondary transaction.
+
+select * from test.t1 order by a;
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+start slave;
+
+--connection master
+--sync_slave_with_master slave
+--connection slave
+
+--echo Secondary cluster should be realigned with Primary
+
+select * from test.t1 order by a;
+
+
+--connection master
+
+--echo Classic banking example
+--echo -----------------------
+
+eval replace into mysql.ndb_replication values
+ ("test", "balances", $SLAVE_SERVER_ID, 7, NULL),
+ ("test", "balances", $MASTER_SERVER_ID, 7, "NDB\$EPOCH_TRANS()");
+
+# Transactions table may not need conflict-detection?
+eval replace into mysql.ndb_replication values
+ ("test", "transactions", $SLAVE_SERVER_ID, 7, NULL),
+ ("test", "transactions", $MASTER_SERVER_ID, 7, "NDB\$EPOCH_TRANS()");
+
+create table test.balances
+(name varchar(100) primary key,
+ balance int) engine=ndb;
+
+create table test.transactions$EX
+ (server_id int unsigned,
+ master_server_id int unsigned,
+ master_epoch bigint unsigned,
+ count int unsigned,
+ auto_key int not null,
+ from_name varchar(100) not null,
+ to_name varchar(100) not null,
+ detail varchar(100) not null,
+ primary key(server_id, master_server_id, master_epoch, count))
+engine=ndb;
+
+
+create table test.transactions
+(auto_key int auto_increment,
+ from_name varchar(100),
+ to_name varchar(100),
+ detail varchar(100),
+ amount int,
+ primary key(auto_key, from_name, to_name, detail)) engine=ndb;
+
+--echo Initialise balances across both bank sites
+insert into test.balances values
+ ("Larry", 100),
+ ("Employee-1", 0),
+ ("Employee-2", 0),
+ ("Yacht dealer", 0),
+ ("Newsagent", 0);
+
+--sync_slave_with_master slave
+--connection slave
+# Sync back to master, to ensure that what follows on slave,
+# is in a separate epoch transaction.
+# This is needed to get stable counts, not for correctness
+#
+FLUSH LOGS; # To give a position to sync
+--sync_slave_with_master master
+
+
+--echo Bank sites are disconnected
+--connection slave
+stop slave;
+--connection master
+
+--echo Larry buys a yacht using Primary bank site
+
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+ values ("Larry", "Yacht dealer", "Yacht purchase", 50);
+update test.balances set balance = balance - 50 where name = "Larry";
+update test.balances set balance = balance + 50 where name = "Yacht dealer";
+commit;
+
+--echo Show yacht transaction records
+
+select * from test.transactions order by auto_key;
+select * from test.balances order by name;
+
+--connection slave
+--echo Larry pays employees using Secondary bank site
+
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+ values ("Larry", "Employee-1", "Payment to Employee-1", 1);
+update test.balances set balance = balance - 1 where name = "Larry";
+update test.balances set balance = balance + 1 where name = "Employee-1";
+commit;
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+ values ("Larry", "Employee-2", "Payment to Employee-2", 1);
+update test.balances set balance = balance - 1 where name = "Larry";
+update test.balances set balance = balance + 1 where name = "Employee-2";
+commit;
+
+--echo Employee-2 buys yacht magazine using Secondary bank site
+begin;
+insert into test.transactions (from_name, to_name, detail, amount)
+ values ("Employee-2", "Newsagent", "Buy yacht magazine", 1);
+update test.balances set balance = balance - 1 where name = "Employee-2";
+update test.balances set balance = balance + 1 where name = "Newsagent";
+commit;
+
+--echo Show employee transactions
+
+select * from test.transactions order by auto_key;
+select * from test.balances order by name;
+
+--sync_slave_with_master master
+
+--echo Bank sites re-connected
+--connection slave
+start slave;
+
+--connection master
+--sync_slave_with_master slave
+
+--connection master
+
+--echo Records at Primary bank site
+
+select * from test.transactions order by auto_key;
+select * from test.balances order by name;
+
+--echo Exceptions at Primary bank site
+
+select server_id, master_server_id, count, auto_key, from_name, to_name, detail
+ from test.transactions$EX order by count;
+
+--echo Conflict handling activity at Primary bank site
+--echo Expect :
+--echo 1 conflict from slave T1 on Larry's balance
+--echo 1 conflict from slave T2 on Larry's balance
+--echo =2 row conflicts
+--echo
+--echo 3 (user) transactions rejected
+--echo 9 rows rejected (3 per transaction)
+--echo Variability : # epoch transactions, # row conflicts detected
+--echo 1-3 2-3
+--echo # detect_iter_count
+--echo 1-3
+--echo We only check stable values
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--connection slave
+
+--echo Records at Secondary bank site
+
+select * from test.transactions order by auto_key;
+select * from test.balances order by name;
+
+--sync_slave_with_master master
+
+--connection master
+drop table test.balances;
+drop table test.transactions;
+drop table test.transactions$EX;
+
+--echo Test mixing transactional and non transactional
+--echo -----------------------------------------------
+--echo Remove old data from t1
+--connection master
+delete from test.t1;
+--sync_slave_with_master slave
+--connection master
+
+--echo Define table with row-based epoch detection
+eval replace into mysql.ndb_replication values
+ ("test", "t3", $SLAVE_SERVER_ID, 7, NULL),
+ ("test", "t3", $MASTER_SERVER_ID, 7, 'NDB\$EPOCH()');
+
+create table t3 (a int primary key, b int) engine=ndb;
+create table t4 (a int primary key, b int) engine=ndb;
+create table t5 (a int primary key, b longtext) engine=ndb;
+
+--echo Insert some data
+
+insert into test.t1 values
+ (1,1),
+ (2,2),
+ (3,3),
+ (4,4),
+ (5,5),
+ (6,6);
+
+insert into test.t3 values
+ (11,11),
+ (12,12),
+ (13,13),
+ (14,14),
+ (15,15),
+ (16,16);
+
+insert into test.t4 values
+ (21,21),
+ (22,22),
+ (23,23),
+ (24,24),
+ (25,25),
+ (26,26);
+
+insert into test.t5 values
+ (1, REPEAT("B", 10000)),
+ (2, REPEAT("E", 10000)),
+ (3, REPEAT("A", 10000));
+
+--echo Allow to propagate
+--sync_slave_with_master slave
+
+--connection slave
+FLUSH LOGS; # Ensure Inserts are in previous epoch trans to what follows
+--sync_slave_with_master master
+
+--echo Case 1 : Transactional detection affects row - based entries in same trans
+--connection slave
+stop slave;
+--connection master
+update test.t1 set b=100 where a=1;
+
+--connection slave
+# t3 is in a table without trans conflict detection (but with row based)
+# t4 is in a table without any detection
+# t1 is in a table with trans conflict detection
+begin;
+update test.t3 set b=1100 where a=11;
+update test.t4 set b=2100 where a=21;
+update test.t1 set b=1000 where a=1;
+commit;
+
+--echo Show slave transaction effect
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+
+--sync_slave_with_master master
+
+--connection master
+--echo Expect Primary to have rejected whole trans across 3 tables
+
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+--echo Expect 1 transaction rejected, 3 rows rejected
+--echo 1 conflict row, 1 epoch, 1 iteration
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--echo Now restart rep to Secondary, and check realignment
+--connection slave
+start slave;
+--connection master
+--sync_slave_with_master slave
+--connection slave
+
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+--echo Case 2 : Row based detection does not affect other transaction entries
+--connection slave
+stop slave;
+--connection master
+update test.t3 set b=1200 where a=12;
+
+--connection slave
+# Transaction conflicts with master, on table without transactional
+# conflict detection
+# Conflict will be detected on row, but no other transaction state
+# will be reverted
+#
+begin;
+update test.t3 set b=1201 where a=12;
+update test.t4 set b=2200 where a=22;
+update test.t1 set b=2000 where a=2;
+commit;
+
+--echo Show effect of transaction on Secondary
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+--sync_slave_with_master master
+
+--echo Show effect of transaction on Primary
+--echo Only t3 should have been reverted
+
+--connection master
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+--echo Expect all counters to be zero
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info.inc
+
+--source suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc
+
+--echo Show effect of transaction on Secondary
+--connection slave
+start slave;
+--connection master
+--sync_slave_with_master slave
+--connection slave
+
+select * from test.t1 order by a;
+select * from test.t3 order by a;
+select * from test.t4 order by a;
+
+flush logs;
+--sync_slave_with_master master
+
+--echo Case 3 : Check behaviour where table with Blob is implicated
+--echo in transactional conflict. Should result in Slave
+--echo stopping with an error.
+
+--connection slave
+STOP SLAVE;
+
+--connection master
+
+--echo Setup warning suppression
+--disable_query_log
+call mtr.add_suppression("Transaction conflict handling on table t5 failed as table has Blobs which cannot be refreshed");
+call mtr.add_suppression("NDBCLUSTER Error_code: 1296");
+--enable_query_log
+
+
+begin;
+update t1 set b= 11 where a=1;
+commit;
+
+--connection slave
+begin;
+update t1 set b= 111 where a=1; # Conflict
+update t1 set b= 222 where a=2; # Implicated row
+update t5 set b= REPEAT("T", 10000) where a=3; # ImplicatedBlob update
+commit;
+
+--echo Show effect of transaction on Secondary
+select * from test.t1 order by a;
+select left(b,1), length(b) from test.t5 order by a;
+
+--echo Check that Primary Slave has stopped
+--connection master
+
+--let $slave_sql_errno=1296
+--source include/wait_for_slave_sql_error.inc
+#SHOW SLAVE STATUS;
+
+--echo Restart Primary Slave
+set global sql_slave_skip_counter=1;
+
+START SLAVE;
+
+--connection slave
+
+--echo Restart Secondary Slave
+START SLAVE;
+
+flush logs;
+--sync_slave_with_master master
+
+--connection master
+drop table test.t3;
+drop table test.t4;
+drop table test.t5;
+
+# Cleanup
+--connection master
+drop table mysql.ndb_replication;
+drop table test.t1;
+drop table test.t2;
+
+--sync_slave_with_master slave
+
+--connection slave
+flush logs;
+--sync_slave_with_master master
+stop slave;
+reset slave;
+#change master to master_host='';
+--source include/rpl_end.inc
+
+# TODO
+# More complex dependencies
\ No newline at end of file
=== added file 'mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info.inc'
--- a/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info.inc 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info.inc 2011-09-07 22:50:01 +0000
@@ -0,0 +1,8 @@
+--disable_query_log
+SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_epoch_trans AS ndb_conflict_fn_epoch_trans FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_EPOCH_TRANS";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_row_conflict_count AS ndb_conflict_trans_row_conflict_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_CONFLICT_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_row_reject_count AS ndb_conflict_trans_row_reject_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_REJECT_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_reject_count AS ndb_conflict_trans_reject_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_REJECT_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_detect_iter_count AS ndb_conflict_trans_detect_iter_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_DETECT_ITER_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_conflict_commit_count AS ndb_conflict_trans_conflict_commit_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_CONFLICT_COMMIT_COUNT";
+--enable_query_log
\ No newline at end of file
=== added file 'mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc'
--- a/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_init.inc 2011-09-07 22:50:01 +0000
@@ -0,0 +1,10 @@
+--disable_query_log
+--disable_result_log
+SELECT @init_ndb_conflict_fn_epoch_trans:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_EPOCH_TRANS";
+SELECT @init_ndb_conflict_trans_row_conflict_count:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_CONFLICT_COUNT";
+SELECT @init_ndb_conflict_trans_row_reject_count:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_REJECT_COUNT";
+SELECT @init_ndb_conflict_trans_reject_count:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_REJECT_COUNT";
+SELECT @init_ndb_conflict_trans_detect_iter_count:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_DETECT_ITER_COUNT";
+SELECT @init_ndb_conflict_trans_conflict_commit_count:=(VARIABLE_VALUE+0) FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_CONFLICT_COMMIT_COUNT";
+--enable_query_log
+--enable_result_log
=== added file 'mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc'
--- a/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_trans_conflict_info_stable.inc 2011-09-07 22:50:01 +0000
@@ -0,0 +1,22 @@
+--disable_query_log
+# Where multiple user transactions from the Secondary site are
+# involved, the actual number of rows in-conflict, the number
+# of epoch transactions with conflicts, and the number of
+# iterations, vary depending on the distribution of the user
+# transactions across epochs. e.g. if 3 transactions are in
+# one epoch (and executed in one batch), then only actual
+# conflicts will be recorded, but if they are separated, then
+# implied rows may conflict as well. This can make these
+# counter values non-deterministic, so this .inc file is used
+# to check that the stable counters are correct. The stable
+# counters are the number of rejected rows, and the number
+# of rejected transactions, which must be the same, regardless
+# of how the epoch boundaries lie.
+#
+#SELECT VARIABLE_VALUE-@init_ndb_conflict_fn_epoch_trans AS ndb_conflict_fn_epoch_trans FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_FN_EPOCH_TRANS";
+#SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_row_conflict_count AS ndb_conflict_trans_row_conflict_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_CONFLICT_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_row_reject_count AS ndb_conflict_trans_row_reject_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_ROW_REJECT_COUNT";
+SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_reject_count AS ndb_conflict_trans_reject_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_REJECT_COUNT";
+#SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_detect_iter_count AS ndb_conflict_trans_detect_iter_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_DETECT_ITER_COUNT";
+#SELECT VARIABLE_VALUE-@init_ndb_conflict_trans_conflict_commit_count AS ndb_conflict_trans_conflict_commit_count FROM INFORMATION_SCHEMA.GLOBAL_STATUS WHERE VARIABLE_NAME LIKE "NDB_CONFLICT_TRANS_CONFLICT_COMMIT_COUNT";
+--enable_query_log
\ No newline at end of file
=== added file 'mysql-test/suite/rpl/r/rpl_extra_row_data.result'
--- a/mysql-test/suite/rpl/r/rpl_extra_row_data.result 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/r/rpl_extra_row_data.result 2011-09-07 22:50:01 +0000
@@ -0,0 +1,46 @@
+include/master-slave.inc
+[connection master]
+Basic insert, update, delete from Master->Slave
+DBUG code will set + check transfer of extra
+row data in RBR
+**** On Master ****
+CREATE TABLE t1 (a INT);
+Ten inserts in one transaction -> 1 epoch transaction
+BEGIN;
+INSERT INTO t1 VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
+COMMIT;
+Wait for Binlog-on-disk
+flush logs;
+Check that we have the expected extra row data in the Binlog
+create table raw_data(txt varchar(1000));
+select replace(txt, '\r', '') from raw_data where txt like '%### Extra row data format%' order by txt;
+replace(txt, '\r', '')
+### Extra row data format: 0, len: 0 :
+### Extra row data format: 1, len: 1 :0x01
+### Extra row data format: 2, len: 2 :0x0202
+### Extra row data format: 3, len: 3 :0x030303
+### Extra row data format: 4, len: 4 :0x04040404
+### Extra row data format: 5, len: 5 :0x0505050505
+### Extra row data format: 6, len: 6 :0x060606060606
+### Extra row data format: 7, len: 7 :0x07070707070707
+### Extra row data format: 8, len: 8 :0x0808080808080808
+### Extra row data format: 9, len: 9 :0x090909090909090909
+drop table raw_data;
+Generate some more insert, update, delete traffic
+INSERT INTO t1 SELECT a+10 FROM t1;
+INSERT INTO t1 SELECT a+20 FROM t1;
+INSERT INTO t1 SELECT a+40 FROM t1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+DELETE FROM t1 WHERE a > 390;
+**** On Slave ****
+Check row count and that slave is running ok
+SELECT count(*) from t1;
+count(*)
+80
+include/check_slave_is_running.inc
+DROP TABLE t1;
+include/rpl_end.inc
=== added file 'mysql-test/suite/rpl/t/rpl_extra_row_data-master.opt'
--- a/mysql-test/suite/rpl/t/rpl_extra_row_data-master.opt 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_extra_row_data-master.opt 2011-09-07 22:50:01 +0000
@@ -0,0 +1 @@
+--loose-debug=+d,extra_row_data_set
=== added file 'mysql-test/suite/rpl/t/rpl_extra_row_data-slave.opt'
--- a/mysql-test/suite/rpl/t/rpl_extra_row_data-slave.opt 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_extra_row_data-slave.opt 2011-09-07 22:50:01 +0000
@@ -0,0 +1 @@
+--loose-debug=+d,extra_row_data_check
=== added file 'mysql-test/suite/rpl/t/rpl_extra_row_data.test'
--- a/mysql-test/suite/rpl/t/rpl_extra_row_data.test 1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/rpl/t/rpl_extra_row_data.test 2011-09-07 22:50:01 +0000
@@ -0,0 +1,68 @@
+--source include/master-slave.inc
+--source include/have_debug.inc
+--source include/have_binlog_format_row.inc
+
+#
+# Test Binlog row extra data added as part of WL5353
+# Test depends on functionality added to server code within
+# #ifndef MCP_WL5353
+#
+#
+--echo Basic insert, update, delete from Master->Slave
+--echo DBUG code will set + check transfer of extra
+--echo row data in RBR
+--echo **** On Master ****
+CREATE TABLE t1 (a INT);
+
+--echo Ten inserts in one transaction -> 1 epoch transaction
+BEGIN;
+INSERT INTO t1 VALUES (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
+COMMIT;
+
+--echo Wait for Binlog-on-disk
+flush logs;
+
+--echo Check that we have the expected extra row data in the Binlog
+create table raw_data(txt varchar(1000));
+--disable_query_log
+let $MYSQLD_DATADIR= `select @@datadir;`;
+--exec $MYSQL_BINLOG --verbose $MYSQLD_DATADIR/master-bin.000001 > $MYSQLTEST_VARDIR/tmp/rpl_extra_row_data.out
+
+--eval load data local infile '$MYSQLTEST_VARDIR/tmp/rpl_extra_row_data.out' into table raw_data columns terminated by '\n';
+--enable_query_log
+
+select replace(txt, '\r', '') from raw_data where txt like '%### Extra row data format%' order by txt;
+drop table raw_data;
+
+--echo Generate some more insert, update, delete traffic
+INSERT INTO t1 SELECT a+10 FROM t1;
+INSERT INTO t1 SELECT a+20 FROM t1;
+INSERT INTO t1 SELECT a+40 FROM t1;
+# 80 rows, 80 inserts
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+UPDATE t1 SET a = a+1;
+# 5 Updates of 80 rows = 400 updates, enough to show all potential lengths
+# of Binlog extra row data including 0 + 255.
+
+# 10 deletes
+DELETE FROM t1 WHERE a > 390;
+
+#show binlog events;
+#let $MYSQLD_DATADIR= `select @@datadir;`;
+#--exec $MYSQL_BINLOG --verbose $MYSQLD_DATADIR/master-bin.000001
+
+--echo **** On Slave ****
+--sync_slave_with_master
+connection slave;
+
+--echo Check row count and that slave is running ok
+SELECT count(*) from t1;
+source include/check_slave_is_running.inc;
+
+connection master;
+DROP TABLE t1;
+--sync_slave_with_master
+--source include/rpl_end.inc
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc 2011-09-06 10:33:58 +0000
+++ b/sql/ha_ndbcluster.cc 2011-09-09 09:30:43 +0000
@@ -52,6 +52,7 @@
#include <mysql/plugin.h>
#include <ndb_version.h>
#include "ndb_mi.h"
+#include "ndb_conflict_trans.h"
// ndb interface initialization/cleanup
extern "C" void ndb_init_internal();
@@ -497,96 +498,6 @@ update_slave_api_stats(Ndb* ndb)
st_ndb_slave_state g_ndb_slave_state;
-st_ndb_slave_state::st_ndb_slave_state()
- : current_conflict_defined_op_count(0),
- current_master_server_epoch(0),
- current_max_rep_epoch(0),
- max_rep_epoch(0),
- sql_run_id(~Uint32(0))
-{
- memset(current_violation_count, 0, sizeof(current_violation_count));
- memset(total_violation_count, 0, sizeof(total_violation_count));
-};
-
-void
-st_ndb_slave_state::atTransactionAbort()
-{
- /* Reset current-transaction counters + state */
- memset(current_violation_count, 0, sizeof(current_violation_count));
- current_conflict_defined_op_count = 0;
- current_max_rep_epoch = 0;
-}
-
-void
-st_ndb_slave_state::atTransactionCommit()
-{
- /* Merge committed transaction counters into total state
- * Then reset current transaction counters
- */
- for (int i=0; i < CFT_NUMBER_OF_CFTS; i++)
- {
- total_violation_count[i]+= current_violation_count[i];
- current_violation_count[i] = 0;
- }
- current_conflict_defined_op_count = 0;
- if (current_max_rep_epoch > max_rep_epoch)
- {
- DBUG_PRINT("info", ("Max replicated epoch increases from %llu to %llu",
- max_rep_epoch,
- current_max_rep_epoch));
-
- max_rep_epoch = current_max_rep_epoch;
- }
- current_max_rep_epoch = 0;
-}
-
-void
-st_ndb_slave_state::atApplyStatusWrite(Uint32 master_server_id,
- Uint32 row_server_id,
- Uint64 row_epoch,
- bool is_row_server_id_local)
-{
- if (row_server_id == master_server_id)
- {
- /*
- WRITE_ROW to ndb_apply_status injected by MySQLD
- immediately upstream of us.
- Record epoch
- */
- current_master_server_epoch = row_epoch;
- assert(! is_row_server_id_local);
- }
- else if (is_row_server_id_local)
- {
- DBUG_PRINT("info", ("Recording application of local server %u epoch %llu "
- " which is %s.",
- row_server_id, row_epoch,
- (row_epoch > g_ndb_slave_state.current_max_rep_epoch)?
- " new highest." : " older than previously applied"));
- if (row_epoch > current_max_rep_epoch)
- {
- /*
- Store new highest epoch in thdvar. If we commit successfully
- then this can become the new global max
- */
- current_max_rep_epoch = row_epoch;
- }
- }
-}
-
-void
-st_ndb_slave_state::atResetSlave()
-{
- /* Reset the Maximum replicated epoch vars
- * on slave reset
- * No need to touch the sql_run_id as that
- * will increment if the slave is started
- * again.
- */
- current_max_rep_epoch = 0;
- max_rep_epoch = 0;
-}
-
static int check_slave_state(THD* thd)
{
DBUG_ENTER("check_slave_state");
@@ -605,6 +516,8 @@ static int check_slave_state(THD* thd)
g_ndb_slave_state.sql_run_id));
g_ndb_slave_state.sql_run_id = runId;
+ g_ndb_slave_state.atStartSlave();
+
/* Always try to load the Max Replicated Epoch info
* first.
* Could be made optional if it's a problem
@@ -819,6 +732,15 @@ static int update_status_variables(Thd_n
SHOW_LONGLONG}, \
{"api_trans_local_read_row_count" NAME_SUFFIX, \
(char*) ARRAY_LOCATION[ Ndb::TransLocalReadRowCount ], \
+ SHOW_LONGLONG}, \
+ {"api_adaptive_send_forced_count" NAME_SUFFIX, \
+ (char *) ARRAY_LOCATION[ Ndb::ForcedSendsCount ], \
+ SHOW_LONGLONG}, \
+ {"api_adaptive_send_unforced_count" NAME_SUFFIX, \
+ (char *) ARRAY_LOCATION[ Ndb::UnforcedSendsCount ], \
+ SHOW_LONGLONG}, \
+ {"api_adaptive_send_deferred_count" NAME_SUFFIX, \
+ (char *) ARRAY_LOCATION[ Ndb::DeferredSendsCount ], \
SHOW_LONGLONG}
SHOW_VAR ndb_status_variables_dynamic[]= {
@@ -851,6 +773,13 @@ SHOW_VAR ndb_status_conflict_variables[]
{"fn_old", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_OLD], SHOW_LONGLONG},
{"fn_max_del_win", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_MAX_DEL_WIN], SHOW_LONGLONG},
{"fn_epoch", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_EPOCH], SHOW_LONGLONG},
+ {"fn_epoch_trans", (char*) &g_ndb_slave_state.total_violation_count[CFT_NDB_EPOCH_TRANS], SHOW_LONGLONG},
+ {"trans_row_conflict_count", (char*) &g_ndb_slave_state.trans_row_conflict_count, SHOW_LONGLONG},
+ {"trans_row_reject_count", (char*) &g_ndb_slave_state.trans_row_reject_count, SHOW_LONGLONG},
+ {"trans_reject_count", (char*) &g_ndb_slave_state.trans_in_conflict_count, SHOW_LONGLONG},
+ {"trans_detect_iter_count", (char*) &g_ndb_slave_state.trans_detect_iter_count, SHOW_LONGLONG},
+ {"trans_conflict_commit_count",
+ (char*) &g_ndb_slave_state.trans_conflict_commit_count, SHOW_LONGLONG},
{NullS, NullS, SHOW_LONG}
};
@@ -961,66 +890,6 @@ static int ndb_to_mysql_error(const NdbE
#ifdef HAVE_NDB_BINLOG
-/* Write conflicting row to exceptions table. */
-static int write_conflict_row(NDB_SHARE *share,
- NdbTransaction *trans,
- const uchar *row,
- NdbError& err)
-{
- DBUG_ENTER("write_conflict_row");
-
- /* get exceptions table */
- NDB_CONFLICT_FN_SHARE *cfn_share= share->m_cfn_share;
- const NDBTAB *ex_tab= cfn_share->m_ex_tab;
- DBUG_ASSERT(ex_tab != NULL);
-
- /* get insert op */
- NdbOperation *ex_op= trans->getNdbOperation(ex_tab);
- if (ex_op == NULL)
- {
- err= trans->getNdbError();
- DBUG_RETURN(-1);
- }
- if (ex_op->insertTuple() == -1)
- {
- err= ex_op->getNdbError();
- DBUG_RETURN(-1);
- }
- {
- uint32 server_id= (uint32)::server_id;
- uint32 master_server_id= (uint32) ndb_mi_get_master_server_id();
- uint64 master_epoch= (uint64) g_ndb_slave_state.current_master_server_epoch;
- uint32 count= (uint32)++(cfn_share->m_count);
- if (ex_op->setValue((Uint32)0, (const char *)&(server_id)) ||
- ex_op->setValue((Uint32)1, (const char *)&(master_server_id)) ||
- ex_op->setValue((Uint32)2, (const char *)&(master_epoch)) ||
- ex_op->setValue((Uint32)3, (const char *)&(count)))
- {
- err= ex_op->getNdbError();
- DBUG_RETURN(-1);
- }
- }
- /* copy primary keys */
- {
- const int fixed_cols= 4;
- int nkey= cfn_share->m_pk_cols;
- int k;
- for (k= 0; k < nkey; k++)
- {
- DBUG_ASSERT(row != NULL);
- const uchar* data= row + cfn_share->m_offset[k];
- if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1)
- {
- err= ex_op->getNdbError();
- DBUG_RETURN(-1);
- }
- }
- }
- DBUG_RETURN(0);
-}
-#endif
-
-#ifdef HAVE_NDB_BINLOG
int
handle_conflict_op_error(Thd_ndb* thd_ndb,
NdbTransaction* trans,
@@ -1030,13 +899,14 @@ handle_conflict_op_error(Thd_ndb* thd_nd
int
handle_row_conflict(NDB_CONFLICT_FN_SHARE* cfn_share,
const char* tab_name,
+ bool table_has_blobs,
+ const char* handling_type,
const NdbRecord* key_rec,
const uchar* pk_row,
enum_conflicting_op_type op_type,
enum_conflict_cause conflict_cause,
const NdbError& conflict_error,
- NdbTransaction* conflict_trans,
- NdbError& err);
+ NdbTransaction* conflict_trans);
#endif
static const Uint32 error_op_after_refresh_op = 920;
@@ -4639,7 +4509,609 @@ thd_allow_batch(const THD* thd)
#endif
}
-#ifdef HAVE_NDB_BINLOG
+/**
+ st_ndb_slave_state constructor
+
+ Initialise Ndb Slave state object
+*/
+st_ndb_slave_state::st_ndb_slave_state()
+ : current_master_server_epoch(0),
+ current_max_rep_epoch(0),
+ conflict_flags(0),
+ retry_trans_count(0),
+ current_trans_row_conflict_count(0),
+ current_trans_row_reject_count(0),
+ current_trans_in_conflict_count(0),
+ max_rep_epoch(0),
+ sql_run_id(~Uint32(0)),
+ trans_row_conflict_count(0),
+ trans_row_reject_count(0),
+ trans_detect_iter_count(0),
+ trans_in_conflict_count(0),
+ trans_conflict_commit_count(0),
+ trans_conflict_apply_state(SAS_NORMAL),
+ trans_dependency_tracker(NULL)
+{
+ memset(current_violation_count, 0, sizeof(current_violation_count));
+ memset(total_violation_count, 0, sizeof(total_violation_count));
+
+ /* Init conflict handling state memroot */
+ const size_t CONFLICT_MEMROOT_BLOCK_SIZE = 32768;
+ init_alloc_root(&conflict_mem_root, CONFLICT_MEMROOT_BLOCK_SIZE, 0);
+};
+
+/**
+ resetPerAttemptCounters
+
+ Reset the per-epoch-transaction-application-attempt counters
+*/
+void
+st_ndb_slave_state::resetPerAttemptCounters()
+{
+ memset(current_violation_count, 0, sizeof(current_violation_count));
+ current_trans_row_conflict_count = 0;
+ current_trans_row_reject_count = 0;
+ current_trans_in_conflict_count = 0;
+
+ conflict_flags = 0;
+ current_max_rep_epoch = 0;
+}
+
+/**
+ atTransactionAbort()
+
+ Called by Slave SQL thread during transaction abort.
+*/
+void
+st_ndb_slave_state::atTransactionAbort()
+{
+ /* Reset current-transaction counters + state */
+ resetPerAttemptCounters();
+}
+
+/**
+ atTransactionCommit()
+
+ Called by Slave SQL thread after transaction commit
+*/
+void
+st_ndb_slave_state::atTransactionCommit()
+{
+ assert( ((trans_dependency_tracker == NULL) &&
+ (trans_conflict_apply_state == SAS_NORMAL)) ||
+ ((trans_dependency_tracker != NULL) &&
+ (trans_conflict_apply_state == SAS_TRACK_TRANS_DEPENDENCIES)) );
+ assert( trans_conflict_apply_state != SAS_APPLY_TRANS_DEPENDENCIES );
+
+ /* Merge committed transaction counters into total state
+ * Then reset current transaction counters
+ */
+ for (int i=0; i < CFT_NUMBER_OF_CFTS; i++)
+ {
+ total_violation_count[i]+= current_violation_count[i];
+ }
+ trans_row_conflict_count+= current_trans_row_conflict_count;
+ trans_row_reject_count+= current_trans_row_reject_count;
+ trans_in_conflict_count+= current_trans_in_conflict_count;
+
+ if (current_trans_in_conflict_count)
+ trans_conflict_commit_count++;
+
+ if (current_max_rep_epoch > max_rep_epoch)
+ {
+ DBUG_PRINT("info", ("Max replicated epoch increases from %llu to %llu",
+ max_rep_epoch,
+ current_max_rep_epoch));
+ max_rep_epoch = current_max_rep_epoch;
+ }
+
+ resetPerAttemptCounters();
+
+ /* Clear per-epoch-transaction retry_trans_count */
+ retry_trans_count = 0;
+}
+
+/**
+ atApplyStatusWrite
+
+ Called by Slave SQL thread when applying an event to the
+ ndb_apply_status table
+*/
+void
+st_ndb_slave_state::atApplyStatusWrite(Uint32 master_server_id,
+ Uint32 row_server_id,
+ Uint64 row_epoch,
+ bool is_row_server_id_local)
+{
+ if (row_server_id == master_server_id)
+ {
+ /*
+ WRITE_ROW to ndb_apply_status injected by MySQLD
+ immediately upstream of us.
+ Record epoch
+ */
+ current_master_server_epoch = row_epoch;
+ assert(! is_row_server_id_local);
+ }
+ else if (is_row_server_id_local)
+ {
+ DBUG_PRINT("info", ("Recording application of local server %u epoch %llu "
+ " which is %s.",
+ row_server_id, row_epoch,
+ (row_epoch > g_ndb_slave_state.current_max_rep_epoch)?
+ " new highest." : " older than previously applied"));
+ if (row_epoch > current_max_rep_epoch)
+ {
+ /*
+ Store new highest epoch in thdvar. If we commit successfully
+ then this can become the new global max
+ */
+ current_max_rep_epoch = row_epoch;
+ }
+ }
+}
+
+/**
+ atResetSlave()
+
+ Called when RESET SLAVE command issued - in context of command client.
+*/
+void
+st_ndb_slave_state::atResetSlave()
+{
+ /* Reset the Maximum replicated epoch vars
+ * on slave reset
+ * No need to touch the sql_run_id as that
+ * will increment if the slave is started
+ * again.
+ */
+ resetPerAttemptCounters();
+
+ retry_trans_count = 0;
+ max_rep_epoch = 0;
+}
+
+/**
+ atStartSlave()
+
+ Called by Slave SQL thread when first applying a row to Ndb after
+ a START SLAVE command.
+*/
+void
+st_ndb_slave_state::atStartSlave()
+{
+#ifdef HAVE_NDB_BINLOG
+ if (trans_conflict_apply_state != SAS_NORMAL)
+ {
+ /*
+ Remove conflict handling state on a SQL thread
+ restart
+ */
+ atEndTransConflictHandling();
+ trans_conflict_apply_state = SAS_NORMAL;
+ }
+#endif
+};
+
+#ifdef HAVE_NDB_BINLOG
+
+/**
+ atBeginTransConflictHandling()
+
+ Called by Slave SQL thread when it determines that Transactional
+ Conflict handling is required
+*/
+void
+st_ndb_slave_state::atBeginTransConflictHandling()
+{
+ DBUG_ENTER("atBeginTransConflictHandling");
+ /*
+ Allocate and initialise Transactional Conflict
+ Resolution Handling Structures
+ */
+ assert(trans_dependency_tracker == NULL);
+ trans_dependency_tracker = DependencyTracker::newDependencyTracker(&conflict_mem_root);
+ DBUG_VOID_RETURN;
+};
+
+/**
+ atPrepareConflictDetection
+
+ Called by Slave SQL thread prior to defining an operation on
+ a table with conflict detection defined.
+*/
+int
+st_ndb_slave_state::atPrepareConflictDetection(const NdbDictionary::Table* table,
+ const NdbRecord* key_rec,
+ const uchar* row_data,
+ Uint64 transaction_id,
+ bool& handle_conflict_now)
+{
+ DBUG_ENTER("atPrepareConflictDetection");
+ /*
+ Slave is preparing to apply an operation with conflict detection.
+ If we're performing Transactional Conflict Resolution, take
+ extra steps
+ */
+ switch( trans_conflict_apply_state )
+ {
+ case SAS_NORMAL:
+ DBUG_PRINT("info", ("SAS_NORMAL : No special handling"));
+ /* No special handling */
+ break;
+ case SAS_TRACK_TRANS_DEPENDENCIES:
+ {
+ DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Tracking operation"));
+ /*
+ Track this operation and its transaction id, to determine
+ inter-transaction dependencies by {table, primary key}
+ */
+ assert( trans_dependency_tracker );
+
+ int res = trans_dependency_tracker
+ ->track_operation(table,
+ key_rec,
+ row_data,
+ transaction_id);
+ if (res != 0)
+ {
+ sql_print_error("%s", trans_dependency_tracker->get_error_text());
+ DBUG_RETURN(res);
+ }
+ /* Proceed as normal */
+ break;
+ }
+ case SAS_APPLY_TRANS_DEPENDENCIES:
+ {
+ DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Deciding whether to apply"));
+ /*
+ Check if this operation's transaction id is marked in-conflict.
+ If it is, we tell the caller to perform conflict resolution now instead
+ of attempting to apply the operation.
+ */
+ assert( trans_dependency_tracker );
+
+ if (trans_dependency_tracker->in_conflict(transaction_id))
+ {
+ DBUG_PRINT("info", ("Event for transaction %llu is conflicting. Handling.",
+ transaction_id));
+ current_trans_row_reject_count++;
+ handle_conflict_now = true;
+ DBUG_RETURN(0);
+ }
+
+ /*
+ This transaction is not marked in-conflict, so continue with normal
+ processing.
+ Note that normal processing may subsequently detect a conflict which
+ didn't exist at the time of the previous TRACK_DEPENDENCIES pass.
+ In this case, we will rollback and repeat the TRACK_DEPENDENCIES
+ stage.
+ */
+ DBUG_PRINT("info", ("Event for transaction %llu is OK, applying",
+ transaction_id));
+ break;
+ }
+ }
+ DBUG_RETURN(0);
+}
+
+/**
+ atTransConflictDetected
+
+ Called by the Slave SQL thread when a conflict is detected on
+ an executed operation.
+*/
+int
+st_ndb_slave_state::atTransConflictDetected(Uint64 transaction_id)
+{
+ DBUG_ENTER("atTransConflictDetected");
+
+ /*
+ The Slave has detected a conflict on an operation applied
+ to a table with Transactional Conflict Resolution defined.
+ Handle according to current state.
+ */
+ conflict_flags |= SCS_TRANS_CONFLICT_DETECTED_THIS_PASS;
+ current_trans_row_conflict_count++;
+
+ switch (trans_conflict_apply_state)
+ {
+ case SAS_NORMAL:
+ {
+ DBUG_PRINT("info", ("SAS_NORMAL : Conflict on op on table with trans detection."
+ "Requires multi-pass resolution. Will transition to "
+ "SAS_TRACK_TRANS_DEPENDENCIES at Commit."));
+ /*
+ Conflict on table with transactional conflict resolution
+ defined.
+ This is the trigger that we will do transactional conflict
+ resolution.
+ Record that we need to do multiple passes to correctly
+ perform resolution.
+ TODO : Early exit from applying epoch?
+ */
+ break;
+ }
+ case SAS_TRACK_TRANS_DEPENDENCIES:
+ {
+ DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES : Operation in transaction %llu "
+ "had conflict",
+ transaction_id));
+ /*
+ Conflict on table with transactional conflict resolution
+ defined.
+ We will mark the operation's transaction_id as in-conflict,
+ so that any other operations on the transaction are also
+ considered in-conflict, and any dependent transactions are also
+ considered in-conflict.
+ */
+ assert(trans_dependency_tracker != NULL);
+ int res = trans_dependency_tracker
+ ->mark_conflict(transaction_id);
+
+ if (res != 0)
+ {
+ sql_print_error("%s", trans_dependency_tracker->get_error_text());
+ DBUG_RETURN(res);
+ }
+ break;
+ }
+ case SAS_APPLY_TRANS_DEPENDENCIES:
+ {
+ /*
+ This must be a new conflict, not noticed on the previous
+ pass.
+ */
+ DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES : Conflict detected. "
+ "Must be further conflict. Will return to "
+ "SAS_TRACK_TRANS_DEPENDENCIES state at commit."));
+ // TODO : Early exit from applying epoch
+ break;
+ }
+ default:
+ break;
+ }
+
+ DBUG_RETURN(0);
+}
+
+/**
+ atConflictPreCommit
+
+ Called by the Slave SQL thread prior to committing a Slave transaction.
+ This method can request that the Slave transaction is retried.
+
+
+ State transitions :
+
+ START SLAVE /
+ RESET SLAVE /
+ STARTUP
+ |
+ |
+ v
+ ****************
+ * SAS_NORMAL *
+ ****************
+ ^ |
+ No transactional | | Conflict on transactional table
+ conflicts | | (Rollback)
+ (Commit) | |
+ | v
+ **********************************
+ * SAS_TRACK_TRANS_DEPENDENCIES *
+ **********************************
+ ^ I ^
+ More I I Dependencies |
+ conflicts I I determined | No new conflicts
+ found I I (Rollback) | (Commit)
+ (Rollback) I I |
+ I v |
+ **********************************
+ * SAS_APPLY_TRANS_DEPENDENCIES *
+ **********************************
+
+
+ Operation
+ The initial state is SAS_NORMAL.
+
+ On detecting a conflict on a transactional conflict detetecing table,
+ SAS_TRACK_TRANS_DEPENDENCIES is entered, and the epoch transaction is
+ rolled back and reapplied.
+
+ In SAS_TRACK_TRANS_DEPENDENCIES state, transaction dependencies and
+ conflicts are tracked as the epoch transaction is applied.
+
+ Then the Slave transitions to SAS_APPLY_TRANS_DEPENDENCIES state, and
+ the epoch transaction is rolled back and reapplied.
+
+ In the SAS_APPLY_TRANS_DEPENDENCIES state, operations for transactions
+ marked as in-conflict are not applied.
+
+ If this results in no new conflicts, the epoch transaction is committed,
+ and the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered for processing
+ the next replicated epch transaction.
+ If it results in new conflicts, the epoch transactions is rolled back, and
+ the SAS_TRACK_TRANS_DEPENDENCIES state is re-entered again, to determine
+ the new set of dependencies.
+
+ If no conflicts are found in the SAS_TRACK_TRANS_DEPENDENCIES state, then
+ the epoch transaction is committed, and the Slave transitions to SAS_NORMAL
+ state.
+
+
+ Properties
+ 1) Normally, there is no transaction dependency tracking overhead paid by
+ the slave.
+
+ 2) On first detecting a transactional conflict, the epoch transaction must be
+ applied at least three times, with two rollbacks.
+
+ 3) Transactional conflicts detected in subsequent epochs require the epoch
+ transaction to be applied two times, with one rollback.
+
+ 4) A loop between states SAS_TRACK_TRANS_DEPENDENCIES and SAS_APPLY_TRANS_
+ DEPENDENCIES occurs when further transactional conflicts are discovered
+ in SAS_APPLY_TRANS_DEPENDENCIES state. This implies that the conflicts
+ discovered in the SAS_TRACK_TRANS_DEPENDENCIES state must not be complete,
+ so we revisit that state to get a more complete picture.
+
+ 5) The number of iterations of this loop is fixed to a hard coded limit, after
+ which the Slave will stop with an error. This should be an unlikely
+ occurrence, as it requires not just n conflicts, but at least 1 new conflict
+ appearing between the transactions in the epoch transaction and the
+ database between the two states, n times in a row.
+
+ 6) Where conflicts are occasional, as expected, the post-commit transition to
+ SAS_TRACK_TRANS_DEPENDENCIES rather than SAS_NORMAL results in one epoch
+ transaction having its transaction dependencies needlessly tracked.
+
+*/
+int
+st_ndb_slave_state::atConflictPreCommit(bool& retry_slave_trans)
+{
+ DBUG_ENTER("atConflictPreCommit");
+
+ /*
+ Prior to committing a Slave transaction, we check whether
+ Transactional conflicts have been detected which require
+ us to retry the slave transaction
+ */
+ retry_slave_trans = false;
+ switch(trans_conflict_apply_state)
+ {
+ case SAS_NORMAL:
+ {
+ DBUG_PRINT("info", ("SAS_NORMAL"));
+ /*
+ Normal case. Only if we defined conflict detection on a table
+ with transactional conflict detection, and saw conflicts (on any table)
+ do we go to another state
+ */
+ if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
+ {
+ DBUG_PRINT("info", ("Conflict(s) detected this pass, transitioning to "
+ "SAS_TRACK_TRANS_DEPENDENCIES."));
+ assert(conflict_flags & SCS_OPS_DEFINED);
+ /* Transactional conflict resolution required, switch state */
+ atBeginTransConflictHandling();
+ resetPerAttemptCounters();
+ trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
+ retry_slave_trans = true;
+ }
+ break;
+ }
+ case SAS_TRACK_TRANS_DEPENDENCIES:
+ {
+ DBUG_PRINT("info", ("SAS_TRACK_TRANS_DEPENDENCIES"));
+
+ if (conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS)
+ {
+ /*
+ Conflict on table with transactional detection
+ this pass, we have collected the details and
+ dependencies, now transition to
+ SAS_APPLY_TRANS_DEPENDENCIES and
+ reapply the epoch transaction without the
+ conflicting transactions.
+ */
+ assert(conflict_flags & SCS_OPS_DEFINED);
+ DBUG_PRINT("info", ("Transactional conflicts, transitioning to "
+ "SAS_APPLY_TRANS_DEPENDENCIES"));
+
+ trans_conflict_apply_state = SAS_APPLY_TRANS_DEPENDENCIES;
+ trans_detect_iter_count++;
+ retry_slave_trans = true;
+ break;
+ }
+ else
+ {
+ /*
+ No transactional conflicts detected this pass, lets
+ return to SAS_NORMAL state after commit for more efficient
+ application of epoch transactions
+ */
+ DBUG_PRINT("info", ("No transactional conflicts, transitioning to "
+ "SAS_NORMAL"));
+ atEndTransConflictHandling();
+ trans_conflict_apply_state = SAS_NORMAL;
+ break;
+ }
+ }
+ case SAS_APPLY_TRANS_DEPENDENCIES:
+ {
+ DBUG_PRINT("info", ("SAS_APPLY_TRANS_DEPENDENCIES"));
+ assert(conflict_flags & SCS_OPS_DEFINED);
+ /*
+ We've applied the Slave epoch transaction subject to the
+ conflict detection. If any further transactional
+ conflicts have been observed, then we must repeat the
+ process.
+ */
+ atEndTransConflictHandling();
+ atBeginTransConflictHandling();
+ trans_conflict_apply_state = SAS_TRACK_TRANS_DEPENDENCIES;
+
+ if (unlikely(conflict_flags & SCS_TRANS_CONFLICT_DETECTED_THIS_PASS))
+ {
+ DBUG_PRINT("info", ("Further conflict(s) detected, repeating the "
+ "TRACK_TRANS_DEPENDENCIES pass"));
+ /*
+ Further conflict observed when applying, need
+ to re-determine dependencies
+ */
+ resetPerAttemptCounters();
+ retry_slave_trans = true;
+ break;
+ }
+
+
+ DBUG_PRINT("info", ("No further conflicts detected, committing and "
+ "returning to SAS_TRACK_TRANS_DEPENDENCIES state"));
+ /*
+ With dependencies taken into account, no further
+ conflicts detected, can now proceed to commit
+ */
+ break;
+ }
+ }
+
+ /*
+ Clear conflict flags, to ensure that we detect any new conflicts
+ */
+ conflict_flags = 0;
+
+ if (retry_slave_trans)
+ {
+ DBUG_PRINT("info", ("Requesting transaction restart"));
+ DBUG_RETURN(1);
+ }
+
+ DBUG_PRINT("info", ("Allowing commit to proceed"));
+ DBUG_RETURN(0);
+}
+
+/**
+ atEndTransConflictHandling
+
+ Called when transactional conflict handling has completed.
+*/
+void
+st_ndb_slave_state::atEndTransConflictHandling()
+{
+ DBUG_ENTER("atEndTransConflictHandling");
+ /* Release any conflict handling state */
+ if (trans_dependency_tracker)
+ {
+ current_trans_in_conflict_count =
+ trans_dependency_tracker->get_conflict_count();
+ trans_dependency_tracker = NULL;
+ free_root(&conflict_mem_root, MY_MARK_BLOCKS_FREE);
+ }
+ DBUG_VOID_RETURN;
+};
+
/**
prepare_conflict_detection
@@ -4648,22 +5120,114 @@ thd_allow_batch(const THD* thd)
It is responsible for defining and adding any operation filtering
required, and for saving any operation definition state required
- for post-execute analysis
+ for post-execute analysis.
+
+ For transactional detection, this method may determine that the
+ operation being defined should not be executed, and conflict
+ handling should occur immediately. In this case, conflict_handled
+ is set to true.
*/
int
ha_ndbcluster::prepare_conflict_detection(enum_conflicting_op_type op_type,
const NdbRecord* key_rec,
const uchar* old_data,
const uchar* new_data,
+ NdbTransaction* trans,
NdbInterpretedCode* code,
- NdbOperation::OperationOptions* options)
+ NdbOperation::OperationOptions* options,
+ bool& conflict_handled)
{
DBUG_ENTER("prepare_conflict_detection");
-
+ THD* thd = table->in_use;
int res = 0;
+ assert(thd->slave_thread);
+
+ conflict_handled = false;
+
+ /*
+ Check transaction id first, as in transactional conflict detection,
+ the transaction id is what eventually dictates whether an operation
+ is applied or not.
+
+ Not that this applies even if the current operation's table does not
+ have a conflict function defined - if a transaction spans a 'transactional
+ conflict detection' table and a non transactional table, the non-transactional
+ table's data will also be reverted.
+ */
+ Uint64 transaction_id = Ndb_binlog_extra_row_info::InvalidTransactionId;
+ if (thd->binlog_row_event_extra_data)
+ {
+ Ndb_binlog_extra_row_info extra_row_info;
+ extra_row_info.loadFromBuffer(thd->binlog_row_event_extra_data);
+ if (extra_row_info.getFlags() &
+ Ndb_binlog_extra_row_info::NDB_ERIF_TRANSID)
+ transaction_id = extra_row_info.getTransactionId();
+ }
+
+ {
+ bool handle_conflict_now = false;
+ const uchar* row_data = (op_type == WRITE_ROW? new_data : old_data);
+ int res = g_ndb_slave_state.atPrepareConflictDetection(m_table,
+ key_rec,
+ row_data,
+ transaction_id,
+ handle_conflict_now);
+ if (res)
+ DBUG_RETURN(res);
+
+ if (handle_conflict_now)
+ {
+ DBUG_PRINT("info", ("Conflict handling for row occurring now"));
+ NdbError noRealConflictError;
+ const uchar* row_to_save = (op_type == DELETE_ROW)? old_data : new_data;
+
+ /*
+ Directly handle the conflict here - e.g refresh/ write to
+ exceptions table etc.
+ */
+ res = handle_row_conflict(m_share->m_cfn_share,
+ m_share->table_name,
+ m_share->flags & NSF_BLOB_FLAG,
+ "Transaction",
+ key_rec,
+ row_to_save,
+ op_type,
+ TRANS_IN_CONFLICT,
+ noRealConflictError,
+ trans);
+ if (unlikely(res))
+ DBUG_RETURN(res);
+
+ g_ndb_slave_state.conflict_flags |= SCS_OPS_DEFINED;
+
+ /*
+ Indicate that there (may be) some more operations to
+ execute before committing
+ */
+ m_thd_ndb->m_unsent_bytes+= 12;
+ conflict_handled = true;
+ DBUG_RETURN(0);
+ }
+ }
+
+ if (! (m_share->m_cfn_share &&
+ m_share->m_cfn_share->m_conflict_fn))
+ {
+ /* No conflict function definition required */
+ DBUG_RETURN(0);
+ }
+
const st_conflict_fn_def* conflict_fn = m_share->m_cfn_share->m_conflict_fn;
assert( conflict_fn != NULL );
+ if (unlikely((conflict_fn->flags & CF_TRANSACTIONAL) &&
+ (transaction_id == Ndb_binlog_extra_row_info::InvalidTransactionId)))
+ {
+ sql_print_warning("NDB Slave : Transactional conflict detection defined on table %s, but "
+ "events received without transaction ids. Check --ndb-log-transaction-id setting "
+ "on upstream Cluster.",
+ m_share->key);
+ }
/*
Prepare interpreted code for operation (update + delete only) according
@@ -4686,7 +5250,7 @@ ha_ndbcluster::prepare_conflict_detectio
}
} // if (op_type != WRITE_ROW)
- g_ndb_slave_state.current_conflict_defined_op_count++;
+ g_ndb_slave_state.conflict_flags |= SCS_OPS_DEFINED;
/* Now save data for potential insert to exceptions table... */
const uchar* row_to_save = (op_type == DELETE_ROW)? old_data : new_data;
@@ -4694,6 +5258,7 @@ ha_ndbcluster::prepare_conflict_detectio
ex_data.share= m_share;
ex_data.key_rec= key_rec;
ex_data.op_type= op_type;
+ ex_data.trans_id= transaction_id;
/*
We need to save the row data for possible conflict resolution after
execute().
@@ -4741,32 +5306,38 @@ handle_conflict_op_error(Thd_ndb* thd_nd
(err.classification == NdbError::NoDataFound))
{
DBUG_PRINT("info",
- ("err.code %s (int) error_conflict_fn_violation, "
- "err.classification %s",
- err.code == (int) error_conflict_fn_violation ? "==" : "!=",
- err.classification
- == NdbError::ConstraintViolation
- ? "== NdbError::ConstraintViolation"
- : (err.classification == NdbError::NoDataFound
- ? "== NdbError::NoDataFound" : "!=")));
+ ("err.code = %s, err.classification = %s",
+ ((err.code == (int) error_conflict_fn_violation)?
+ "error_conflict_fn_violation":
+ ((err.code == (int) error_op_after_refresh_op)?
+ "error_op_after_refresh_op" : "?")),
+ ((err.classification == NdbError::ConstraintViolation)?
+ "ConstraintViolation":
+ ((err.classification == NdbError::NoDataFound)?
+ "NoDataFound" : "?"))));
enum_conflict_cause conflict_cause;
+ /* Map cause onto our conflict description type */
if ((err.code == (int) error_conflict_fn_violation) ||
(err.code == (int) error_op_after_refresh_op))
{
+ DBUG_PRINT("info", ("ROW_IN_CONFLICT"));
conflict_cause= ROW_IN_CONFLICT;
}
else if (err.classification == NdbError::ConstraintViolation)
{
+ DBUG_PRINT("info", ("ROW_ALREADY_EXISTS"));
conflict_cause= ROW_ALREADY_EXISTS;
}
else
{
assert(err.classification == NdbError::NoDataFound);
+ DBUG_PRINT("info", ("ROW_DOES_NOT_EXIST"));
conflict_cause= ROW_DOES_NOT_EXIST;
}
+ /* Get exceptions data from operation */
const void* buffer=op->getCustomData();
assert(buffer);
Ndb_exceptions_data ex_data;
@@ -4774,88 +5345,65 @@ handle_conflict_op_error(Thd_ndb* thd_nd
NDB_SHARE *share= ex_data.share;
const NdbRecord* key_rec= ex_data.key_rec;
const uchar* row= ex_data.row;
- enum_conflicting_op_type op_type = ex_data.op_type;
- DBUG_ASSERT(share != NULL && row != NULL);
-
- NDB_CONFLICT_FN_SHARE* cfn_share= share->m_cfn_share;
- if (cfn_share)
- {
- enum_conflict_fn_type cft = cfn_share->m_conflict_fn->type;
- bool haveExTable = cfn_share->m_ex_tab != NULL;
+ enum_conflicting_op_type causing_op_type = ex_data.op_type;
- g_ndb_slave_state.current_violation_count[cft]++;
+ DBUG_PRINT("info", ("Conflict causing op type : %u",
+ causing_op_type));
+ if (causing_op_type == REFRESH_ROW)
+ {
+ /*
+ The failing op was a refresh row, we require that it
+ failed due to being a duplicate (e.g. a refresh
+ occurring on a refreshed row)
+ */
+ if (err.code == (int) error_op_after_refresh_op)
{
- NdbError handle_error;
- if (handle_row_conflict(cfn_share,
- share->table_name,
- key_rec,
- row,
- op_type,
- conflict_cause,
- err,
- trans,
- handle_error))
- {
- /* Error with handling of row conflict */
- char msg[FN_REFLEN];
- my_snprintf(msg, sizeof(msg), "Row conflict handling "
- "on table %s hit Ndb error %d '%s'",
- share->table_name,
- handle_error.code,
- handle_error.message);
-
- if (handle_error.status == NdbError::TemporaryError)
- {
- /* Slave will roll back and retry entire transaction. */
- ERR_RETURN(handle_error);
- }
- else
- {
- push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN,
- ER_EXCEPTIONS_WRITE_ERROR,
- ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
- /* Slave will stop replication. */
- DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
- }
- }
+ DBUG_PRINT("info", ("Operation after refresh - ignoring"));
+ DBUG_RETURN(0);
+ }
+ else
+ {
+ DBUG_PRINT("info", ("Refresh op hit real error %u", err.code));
+ /* Unexpected error, normal handling*/
+ DBUG_RETURN(err.code);
}
+ }
+ DBUG_ASSERT(share != NULL && row != NULL);
+ NDB_CONFLICT_FN_SHARE* cfn_share= share->m_cfn_share;
+ bool table_has_trans_conflict_detection =
+ cfn_share &&
+ cfn_share->m_conflict_fn &&
+ (cfn_share->m_conflict_fn->flags & CF_TRANSACTIONAL);
+
+ if (table_has_trans_conflict_detection)
+ {
+ /* Perform special transactional conflict-detected handling */
+ int res = g_ndb_slave_state.atTransConflictDetected(ex_data.trans_id);
+ if (res)
+ DBUG_RETURN(res);
+ }
- if (haveExTable)
- {
- NdbError ex_err;
- if (write_conflict_row(share, trans, row, ex_err))
- {
- char msg[FN_REFLEN];
- my_snprintf(msg, sizeof(msg), "table %s NDB error %d '%s'",
- cfn_share->m_ex_tab->getName(),
- ex_err.code, ex_err.message);
+ if (cfn_share)
+ {
+ /* Now handle the conflict on this row */
+ enum_conflict_fn_type cft = cfn_share->m_conflict_fn->type;
- NdbDictionary::Dictionary* dict= thd_ndb->ndb->getDictionary();
+ g_ndb_slave_state.current_violation_count[cft]++;
- if (ex_err.classification == NdbError::SchemaError)
- {
- dict->removeTableGlobal(*(cfn_share->m_ex_tab), false);
- cfn_share->m_ex_tab= NULL;
- }
- else if (ex_err.status == NdbError::TemporaryError)
- {
- /* Slave will roll back and retry entire transaction. */
- ERR_RETURN(ex_err);
- }
- else
- {
- push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN,
- ER_EXCEPTIONS_WRITE_ERROR,
- ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
- /* Slave will stop replication. */
- DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
- }
- }
- } // if (haveExTable)
+ int res = handle_row_conflict(cfn_share,
+ share->table_name,
+ false, /* table_has_blobs */
+ "Row",
+ key_rec,
+ row,
+ causing_op_type,
+ conflict_cause,
+ err,
+ trans);
- DBUG_RETURN(0);
+ DBUG_RETURN(res);
}
else
{
@@ -4872,10 +5420,7 @@ handle_conflict_op_error(Thd_ndb* thd_nd
DBUG_RETURN(0); // Reachable?
}
-#endif /* HAVE_NDB_BINLOG */
-
-#ifdef HAVE_NDB_BINLOG
/*
is_serverid_local
*/
@@ -5090,12 +5635,33 @@ int ha_ndbcluster::ndb_write_row(uchar *
MY_BITMAP tmpBitmap;
MY_BITMAP *user_cols_written_bitmap;
#ifdef HAVE_NDB_BINLOG
- bool haveConflictFunction =
- (thd->slave_thread &&
- m_share->m_cfn_share &&
- m_share->m_cfn_share->m_conflict_fn);
+ /* Conflict resolution in slave thread */
+ bool haveConflictFunction = false;
+ if (thd->slave_thread)
+ {
+ haveConflictFunction =
+ m_share->m_cfn_share &&
+ m_share->m_cfn_share->m_conflict_fn;
+ bool conflict_handled = false;
+
+ if (unlikely((error = prepare_conflict_detection(WRITE_ROW,
+ key_rec,
+ NULL, /* old_data */
+ record, /* new_data */
+ trans,
+ NULL, /* code */
+ &options,
+ conflict_handled))))
+ DBUG_RETURN(error);
+
+ if (unlikely(conflict_handled))
+ {
+ /* No need to continue with operation definition */
+ /* TODO : Ensure batch execution */
+ DBUG_RETURN(0);
+ }
+ };
#endif
-
if (m_use_write
#ifdef HAVE_NDB_BINLOG
/* Conflict detection must use normal Insert */
@@ -5138,19 +5704,6 @@ int ha_ndbcluster::ndb_write_row(uchar *
}
else
{
-#ifdef HAVE_NDB_BINLOG
- if (haveConflictFunction)
- {
- /* Conflict detection in slave thread */
- if (unlikely((error = prepare_conflict_detection(WRITE_ROW,
- key_rec,
- NULL, /* old_data */
- record, /* new_data */
- NULL, /* code */
- &options))))
- DBUG_RETURN(error);
- }
-#endif
uchar *mask;
/* Check whether Ndb table definition includes any default values. */
@@ -5297,20 +5850,34 @@ int ha_ndbcluster::primary_key_cmp(const
}
#ifdef HAVE_NDB_BINLOG
+
+static Ndb_exceptions_data StaticRefreshExceptionsData=
+{ NULL, NULL, NULL, REFRESH_ROW, 0 };
+
int
handle_row_conflict(NDB_CONFLICT_FN_SHARE* cfn_share,
const char* table_name,
+ bool table_has_blobs,
+ const char* handling_type,
const NdbRecord* key_rec,
const uchar* pk_row,
enum_conflicting_op_type op_type,
enum_conflict_cause conflict_cause,
const NdbError& conflict_error,
- NdbTransaction* conflict_trans,
- NdbError& err)
+ NdbTransaction* conflict_trans)
{
DBUG_ENTER("handle_row_conflict");
- if (cfn_share->m_flags & CFF_REFRESH_ROWS)
+ /*
+ We will refresh the row if the conflict function requires
+ it, or if we are handling a transactional conflict.
+ */
+ bool refresh_row =
+ (conflict_cause == TRANS_IN_CONFLICT) ||
+ (cfn_share &&
+ (cfn_share->m_flags & CFF_REFRESH_ROWS));
+
+ if (refresh_row)
{
/* A conflict has been detected between an applied replicated operation
* and the data in the DB.
@@ -5333,66 +5900,220 @@ handle_row_conflict(NDB_CONFLICT_FN_SHAR
assert(key_rec != NULL);
assert(pk_row != NULL);
- /* When the slave splits an epoch into batches, a conflict row detected
- * and refreshed in an early batch can be written to by operations in
- * a later batch. As the operations will not have applied, and the
- * row has already been refreshed, we need not attempt to refresh
- * it again
- */
- if ((conflict_cause == ROW_IN_CONFLICT) &&
- (conflict_error.code == (int) error_op_after_refresh_op))
+ do
{
- /* Attempt to apply an operation after the row was refreshed
- * Ignore the error
+ /* We cannot refresh a row which has Blobs, as we do not support
+ * Blob refresh yet.
+ * Rows implicated by a transactional conflict function may have
+ * Blobs.
+ * We will generate an error in this case
*/
- DBUG_PRINT("info", ("Operation after refresh error - ignoring"));
- DBUG_RETURN(0);
- }
+ if (table_has_blobs)
+ {
+ char msg[FN_REFLEN];
+ my_snprintf(msg, sizeof(msg), "%s conflict handling "
+ "on table %s failed as table has Blobs which cannot be refreshed.",
+ handling_type,
+ table_name);
- /* When a delete operation finds that the row does not exist, it indicates
- * a DELETE vs DELETE conflict. If we refresh the row then we can get
- * non deterministic behaviour depending on slave batching as follows :
- * Row is deleted
- *
- * Case 1
- * Slave applied DELETE, INSERT in 1 batch
- *
- * After first batch, the row is present (due to INSERT), it is
- * refreshed.
- *
- * Case 2
- * Slave applied DELETE in 1 batch, INSERT in 2nd batch
- *
- * After first batch, the row is not present, it is refreshed
- * INSERT is then rejected.
- *
- * The problem of not being able to 'record' a DELETE vs DELETE conflict
- * is known. We attempt at least to give consistent behaviour for
- * DELETE vs DELETE conflicts by :
- * NOT refreshing a row when a DELETE vs DELETE conflict is detected
- * This should map all batching scenarios onto Case1.
- */
- if ((op_type == DELETE_ROW) &&
- (conflict_cause == ROW_DOES_NOT_EXIST))
+ push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ ER_EXCEPTIONS_WRITE_ERROR,
+ ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+
+ DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
+ }
+
+ /* When the slave splits an epoch into batches, a conflict row detected
+ * and refreshed in an early batch can be written to by operations in
+ * a later batch. As the operations will not have applied, and the
+ * row has already been refreshed, we need not attempt to refresh
+ * it again
+ */
+ if ((conflict_cause == ROW_IN_CONFLICT) &&
+ (conflict_error.code == (int) error_op_after_refresh_op))
+ {
+ /* Attempt to apply an operation after the row was refreshed
+ * Ignore the error
+ */
+ DBUG_PRINT("info", ("Operation after refresh error - ignoring"));
+ break;
+ }
+
+ /* When a delete operation finds that the row does not exist, it indicates
+ * a DELETE vs DELETE conflict. If we refresh the row then we can get
+ * non deterministic behaviour depending on slave batching as follows :
+ * Row is deleted
+ *
+ * Case 1
+ * Slave applied DELETE, INSERT in 1 batch
+ *
+ * After first batch, the row is present (due to INSERT), it is
+ * refreshed.
+ *
+ * Case 2
+ * Slave applied DELETE in 1 batch, INSERT in 2nd batch
+ *
+ * After first batch, the row is not present, it is refreshed
+ * INSERT is then rejected.
+ *
+ * The problem of not being able to 'record' a DELETE vs DELETE conflict
+ * is known. We attempt at least to give consistent behaviour for
+ * DELETE vs DELETE conflicts by :
+ * NOT refreshing a row when a DELETE vs DELETE conflict is detected
+ * This should map all batching scenarios onto Case1.
+ */
+ if ((op_type == DELETE_ROW) &&
+ (conflict_cause == ROW_DOES_NOT_EXIST))
+ {
+ DBUG_PRINT("info", ("Delete vs Delete detected, NOT refreshing"));
+ break;
+ }
+
+ /*
+ We give the refresh operation some 'exceptions data', so that
+ it can be identified as part of conflict resolution when
+ handling operation errors.
+ Specifically we need to be able to handle duplicate row
+ refreshes.
+ As there is no unique exceptions data, we use a singleton.
+
+ We also need to 'force' the ANYVALUE of the row to 0 to
+ indicate that the refresh is locally-sourced.
+ Otherwise we can 'pickup' the ANYVALUE of a previous
+ update to the row.
+ If some previous update in this transaction came from a
+ Slave, then using its ANYVALUE can result in that Slave
+ ignoring this correction.
+ */
+ NdbOperation::OperationOptions options;
+ options.optionsPresent =
+ NdbOperation::OperationOptions::OO_CUSTOMDATA |
+ NdbOperation::OperationOptions::OO_ANYVALUE;
+ options.customData = &StaticRefreshExceptionsData;
+ options.anyValue = 0;
+
+ /* Create a refresh to operation to realign other clusters */
+ // TODO AnyValue
+ // TODO Do we ever get non-PK key?
+ // Keyless table?
+ // Unique index
+ const NdbOperation* refresh_op= conflict_trans->refreshTuple(key_rec,
+ (const char*) pk_row,
+ &options,
+ sizeof(options));
+ if (!refresh_op)
+ {
+ NdbError err = conflict_trans->getNdbError();
+
+ if (err.status == NdbError::TemporaryError)
+ {
+ /* Slave will roll back and retry entire transaction. */
+ ERR_RETURN(err);
+ }
+ else
+ {
+ char msg[FN_REFLEN];
+ my_snprintf(msg, sizeof(msg), "Row conflict handling "
+ "on table %s hit Ndb error %d '%s'",
+ table_name,
+ err.code,
+ err.message);
+ push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ ER_EXCEPTIONS_WRITE_ERROR,
+ ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+ /* Slave will stop replication. */
+ DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
+ }
+ }
+ } while(0); // End of 'refresh' block
+ }
+
+ if (cfn_share &&
+ cfn_share->m_ex_tab != NULL)
+ {
+ NdbError err;
+ assert(err.code == 0);
+ do
{
- DBUG_PRINT("info", ("Delete vs Delete detected, NOT refreshing"));
- DBUG_RETURN(0);
- }
+ /* Have exceptions table, add row to it */
+ const NDBTAB *ex_tab= cfn_share->m_ex_tab;
- /* Create a refresh to operation to realign other clusters */
- // TODO AnyValue
- // TODO Do we ever get non-PK key?
- // Keyless table?
- // Unique index
- const NdbOperation* refresh_op= conflict_trans->refreshTuple(key_rec,
- (const char*) pk_row);
+ /* get insert op */
+ NdbOperation *ex_op= conflict_trans->getNdbOperation(ex_tab);
+ if (ex_op == NULL)
+ {
+ err= conflict_trans->getNdbError();
+ break;
+ }
+ if (ex_op->insertTuple() == -1)
+ {
+ err= ex_op->getNdbError();
+ break;
+ }
+ {
+ uint32 server_id= (uint32)::server_id;
+ uint32 master_server_id= (uint32) ndb_mi_get_master_server_id();
+ uint64 master_epoch= (uint64) g_ndb_slave_state.current_master_server_epoch;
+ uint32 count= (uint32)++(cfn_share->m_count);
+ if (ex_op->setValue((Uint32)0, (const char *)&(server_id)) ||
+ ex_op->setValue((Uint32)1, (const char *)&(master_server_id)) ||
+ ex_op->setValue((Uint32)2, (const char *)&(master_epoch)) ||
+ ex_op->setValue((Uint32)3, (const char *)&(count)))
+ {
+ err= ex_op->getNdbError();
+ break;
+ }
+ }
+ /* copy primary keys */
+ {
+ const int fixed_cols= 4;
+ int nkey= cfn_share->m_pk_cols;
+ int k;
+ for (k= 0; k < nkey; k++)
+ {
+ DBUG_ASSERT(pk_row != NULL);
+ const uchar* data= pk_row + cfn_share->m_offset[k];
+ if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1)
+ {
+ err= ex_op->getNdbError();
+ break;
+ }
+ }
+ }
+ } while (0);
- if (!refresh_op)
+ if (err.code != 0)
{
- err= conflict_trans->getNdbError();
- DBUG_RETURN(1);
+ char msg[FN_REFLEN];
+ my_snprintf(msg, sizeof(msg), "%s conflict handling "
+ "on table %s hit Ndb error %d '%s'",
+ handling_type,
+ table_name,
+ err.code,
+ err.message);
+
+ if (err.classification == NdbError::SchemaError)
+ {
+ /* Something up with Exceptions table schema, forget it */
+ NdbDictionary::Dictionary* dict= conflict_trans->getNdb()->getDictionary();
+ dict->removeTableGlobal(*(cfn_share->m_ex_tab), false);
+ cfn_share->m_ex_tab= NULL;
+ }
+ else if (err.status == NdbError::TemporaryError)
+ {
+ /* Slave will roll back and retry entire transaction. */
+ ERR_RETURN(err);
+ }
+ else
+ {
+ push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ ER_EXCEPTIONS_WRITE_ERROR,
+ ER(ER_EXCEPTIONS_WRITE_ERROR), msg);
+ /* Slave will stop replication. */
+ DBUG_RETURN(ER_EXCEPTIONS_WRITE_ERROR);
+ }
}
- } /* if (cfn_share->m_flags & CFF_REFRESH_ROWS) */
+ } /* if (cfn_share->m_ex_tab != NULL) */
DBUG_RETURN(0);
};
@@ -5750,17 +6471,27 @@ int ha_ndbcluster::ndb_update_row(const
NdbInterpretedCode code(m_table, buffer,
sizeof(buffer)/sizeof(buffer[0]));
- if (thd->slave_thread && m_share->m_cfn_share &&
- m_share->m_cfn_share->m_conflict_fn)
+ if (thd->slave_thread)
{
- /* Conflict resolution in slave thread. */
+ bool conflict_handled = false;
+ /* Conflict resolution in slave thread. */
+
if (unlikely((error = prepare_conflict_detection(UPDATE_ROW,
key_rec,
old_data,
new_data,
+ trans,
&code,
- &options))))
+ &options,
+ conflict_handled))))
DBUG_RETURN(error);
+
+ if (unlikely(conflict_handled))
+ {
+ /* No need to continue with operation defintion */
+ /* TODO : Ensure batch execution */
+ DBUG_RETURN(0);
+ }
}
#endif /* HAVE_NDB_BINLOG */
if (options.optionsPresent !=0)
@@ -6029,17 +6760,27 @@ int ha_ndbcluster::ndb_delete_row(const
Uint32 buffer[ MAX_CONFLICT_INTERPRETED_PROG_SIZE ];
NdbInterpretedCode code(m_table, buffer,
sizeof(buffer)/sizeof(buffer[0]));
- if (thd->slave_thread && m_share->m_cfn_share &&
- m_share->m_cfn_share->m_conflict_fn)
+ if (thd->slave_thread)
{
+ bool conflict_handled = false;
+
/* Conflict resolution in slave thread. */
if (unlikely((error = prepare_conflict_detection(DELETE_ROW,
key_rec,
key_row, /* old_data */
NULL, /* new_data */
+ trans,
&code,
- &options))))
+ &options,
+ conflict_handled))))
DBUG_RETURN(error);
+
+ if (unlikely(conflict_handled))
+ {
+ /* No need to continue with operation definition */
+ /* TODO : Ensure batch execution */
+ DBUG_RETURN(0);
+ }
}
#endif /* HAVE_NDB_BINLOG */
if (options.optionsPresent != 0)
@@ -8053,6 +8794,8 @@ int ndbcluster_commit(handlerton *hton,
Thd_ndb *thd_ndb= get_thd_ndb(thd);
Ndb *ndb= thd_ndb->ndb;
NdbTransaction *trans= thd_ndb->trans;
+ bool retry_slave_trans = false;
+ (void) retry_slave_trans;
DBUG_ENTER("ndbcluster_commit");
DBUG_ASSERT(ndb);
@@ -8092,9 +8835,23 @@ int ndbcluster_commit(handlerton *hton,
if (thd->slave_thread)
{
- if (!g_ndb_slave_state.current_conflict_defined_op_count ||
- !thd_ndb->m_unsent_bytes ||
- !(res= execute_no_commit(thd_ndb, trans, TRUE)))
+#ifdef HAVE_NDB_BINLOG
+ /* If this slave transaction has included conflict detecting ops
+ * and some defined operations are not yet sent, then perform
+ * an execute(NoCommit) before committing, as conflict op handling
+ * is done by execute(NoCommit)
+ */
+ if (g_ndb_slave_state.conflict_flags & SCS_OPS_DEFINED)
+ {
+ if (thd_ndb->m_unsent_bytes)
+ res = execute_no_commit(thd_ndb, trans, TRUE);
+ }
+
+ if (likely(res == 0))
+ res = g_ndb_slave_state.atConflictPreCommit(retry_slave_trans);
+#endif /* HAVE_NDB_BINLOG */
+
+ if (likely(res == 0))
res= execute_commit(thd, thd_ndb, trans, 1, TRUE);
update_slave_api_stats(thd_ndb->ndb);
@@ -8123,11 +8880,49 @@ int ndbcluster_commit(handlerton *hton,
if (res != 0)
{
- const NdbError err= trans->getNdbError();
- const NdbOperation *error_op= trans->getNdbErrorOperation();
- res= ndb_to_mysql_error(&err);
- if (res != -1)
- ndbcluster_print_error(res, error_op);
+#ifdef HAVE_NDB_BINLOG
+ if (retry_slave_trans)
+ {
+ if (st_ndb_slave_state::MAX_RETRY_TRANS_COUNT >
+ g_ndb_slave_state.retry_trans_count++)
+ {
+ /*
+ Warning is necessary to cause retry from slave.cc
+ exec_relay_log_event()
+ */
+ push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+ ER_GET_TEMPORARY_ERRMSG,
+ SLAVE_SILENT_RETRY_MSG);
+ /*
+ Set retry count to zero to:
+ 1) Avoid consuming slave-temp-error retry attempts
+ 2) Ensure no inter-attempt sleep
+
+ Better fix : Save + restore retry count around transactional
+ conflict handling
+ */
+ ndb_mi_set_relay_log_trans_retries(0);
+ }
+ else
+ {
+ /*
+ Too many retries, print error and exit - normal
+ too many retries mechanism will cause exit
+ */
+ sql_print_error("Ndb slave retried transaction %u time(s) in vain. Giving up.",
+ st_ndb_slave_state::MAX_RETRY_TRANS_COUNT);
+ }
+ res= ER_GET_TEMPORARY_ERRMSG;
+ }
+ else
+#endif
+ {
+ const NdbError err= trans->getNdbError();
+ const NdbOperation *error_op= trans->getNdbErrorOperation();
+ res= ndb_to_mysql_error(&err);
+ if (res != -1)
+ ndbcluster_print_error(res, error_op);
+ }
}
else
{
@@ -9279,6 +10074,7 @@ int ha_ndbcluster::create(const char *na
switch(conflict_fn->type)
{
case CFT_NDB_EPOCH:
+ case CFT_NDB_EPOCH_TRANS:
{
/* Default 6 extra Gci bits allows 2^6 == 64
* epochs / saveGCP, a comfortable default
@@ -11813,6 +12609,12 @@ static int ndbcluster_init(void *p)
if (ndbcluster_inited)
DBUG_RETURN(FALSE);
+#ifdef HAVE_NDB_BINLOG
+ /* Check const alignment */
+ assert(DependencyTracker::InvalidTransactionId ==
+ Ndb_binlog_extra_row_info::InvalidTransactionId);
+#endif
+
pthread_mutex_init(&ndbcluster_mutex,MY_MUTEX_INIT_FAST);
pthread_mutex_init(&LOCK_ndb_util_thread, MY_MUTEX_INIT_FAST);
pthread_cond_init(&COND_ndb_util_thread, NULL);
@@ -17130,6 +17932,18 @@ static MYSQL_SYSVAR_BOOL(
);
+my_bool opt_ndb_log_transaction_id;
+static MYSQL_SYSVAR_BOOL(
+ log_transaction_id, /* name */
+ opt_ndb_log_transaction_id, /* var */
+ PLUGIN_VAR_OPCMDARG,
+ "Log Ndb transaction identities per row in the Binlog",
+ NULL, /* check func. */
+ NULL, /* update func. */
+ 0 /* default */
+);
+
+
static MYSQL_SYSVAR_STR(
connectstring, /* name */
opt_ndb_connectstring, /* var */
@@ -17237,6 +18051,7 @@ static struct st_mysql_sys_var* system_v
MYSQL_SYSVAR(log_binlog_index),
MYSQL_SYSVAR(log_empty_epochs),
MYSQL_SYSVAR(log_apply_status),
+ MYSQL_SYSVAR(log_transaction_id),
MYSQL_SYSVAR(connectstring),
MYSQL_SYSVAR(mgmd_host),
MYSQL_SYSVAR(nodeid),
=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h 2011-09-02 09:16:56 +0000
+++ b/sql/ha_ndbcluster.h 2011-09-09 09:30:43 +0000
@@ -177,6 +177,32 @@ inline void set_binlog_use_update(NDB_SH
inline my_bool get_binlog_use_update(NDB_SHARE *share)
{ return (share->flags & NSF_BINLOG_USE_UPDATE) != 0; }
+enum enum_slave_trans_conflict_apply_state
+{
+ /* Normal with optional row-level conflict detection */
+ SAS_NORMAL,
+
+ /*
+ SAS_TRACK_TRANS_DEPENDENCIES
+ Track inter-transaction dependencies
+ */
+ SAS_TRACK_TRANS_DEPENDENCIES,
+
+ /*
+ SAS_APPLY_TRANS_DEPENDENCIES
+ Apply only non conflicting transactions
+ */
+ SAS_APPLY_TRANS_DEPENDENCIES
+};
+
+enum enum_slave_conflict_flags
+{
+ /* Conflict detection Ops defined */
+ SCS_OPS_DEFINED = 1,
+ /* Conflict detected on table with transactional resolution */
+ SCS_TRANS_CONFLICT_DETECTED_THIS_PASS = 2
+};
+
/*
State associated with the Slave thread
(From the Ndb handler's point of view)
@@ -184,17 +210,52 @@ inline my_bool get_binlog_use_update(NDB
struct st_ndb_slave_state
{
/* Counter values for current slave transaction */
- Uint32 current_conflict_defined_op_count;
Uint32 current_violation_count[CFT_NUMBER_OF_CFTS];
Uint64 current_master_server_epoch;
Uint64 current_max_rep_epoch;
+ uint8 conflict_flags; /* enum_slave_conflict_flags */
+ /* Transactional conflict detection */
+ Uint32 retry_trans_count;
+ Uint32 current_trans_row_conflict_count;
+ Uint32 current_trans_row_reject_count;
+ Uint32 current_trans_in_conflict_count;
/* Cumulative counter values */
Uint64 total_violation_count[CFT_NUMBER_OF_CFTS];
Uint64 max_rep_epoch;
Uint32 sql_run_id;
+ /* Transactional conflict detection */
+ Uint64 trans_row_conflict_count;
+ Uint64 trans_row_reject_count;
+ Uint64 trans_detect_iter_count;
+ Uint64 trans_in_conflict_count;
+ Uint64 trans_conflict_commit_count;
+
+ static const Uint32 MAX_RETRY_TRANS_COUNT = 100;
+
+ /*
+ Slave Apply State
+
+ State of Binlog application from Ndb point of view.
+ */
+ enum_slave_trans_conflict_apply_state trans_conflict_apply_state;
+
+ MEM_ROOT conflict_mem_root;
+ class DependencyTracker* trans_dependency_tracker;
/* Methods */
+ void atStartSlave();
+ int atPrepareConflictDetection(const NdbDictionary::Table* table,
+ const NdbRecord* key_rec,
+ const uchar* row_data,
+ Uint64 transaction_id,
+ bool& handle_conflict_now);
+ int atTransConflictDetected(Uint64 transaction_id);
+ int atConflictPreCommit(bool& retry_slave_trans);
+
+ void atBeginTransConflictHandling();
+ void atEndTransConflictHandling();
+
void atTransactionCommit();
void atTransactionAbort();
void atResetSlave();
@@ -204,6 +265,8 @@ struct st_ndb_slave_state
Uint64 row_epoch,
bool is_row_server_id_local);
+ void resetPerAttemptCounters();
+
st_ndb_slave_state();
};
@@ -479,8 +542,10 @@ private:
const NdbRecord* key_rec,
const uchar* old_data,
const uchar* new_data,
+ NdbTransaction* trans,
NdbInterpretedCode* code,
- NdbOperation::OperationOptions* options);
+ NdbOperation::OperationOptions* options,
+ bool& conflict_handled);
#endif
void setup_key_ref_for_ndb_record(const NdbRecord **key_rec,
const uchar **key_row,
=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc 2011-09-01 12:36:37 +0000
+++ b/sql/ha_ndbcluster_binlog.cc 2011-09-09 09:30:43 +0000
@@ -45,6 +45,7 @@ extern my_bool opt_ndb_log_binlog_index;
extern my_bool opt_ndb_log_apply_status;
extern ulong opt_ndb_extra_logging;
extern st_ndb_slave_state g_ndb_slave_state;
+extern my_bool opt_ndb_log_transaction_id;
bool ndb_log_empty_epochs(void);
@@ -3981,13 +3982,15 @@ static const st_conflict_fn_arg_def epoc
static const st_conflict_fn_def conflict_fns[]=
{
{ "NDB$MAX_DELETE_WIN", CFT_NDB_MAX_DEL_WIN,
- &resolve_col_args[0], row_conflict_fn_max_del_win },
+ &resolve_col_args[0], row_conflict_fn_max_del_win, 0 },
{ "NDB$MAX", CFT_NDB_MAX,
- &resolve_col_args[0], row_conflict_fn_max },
+ &resolve_col_args[0], row_conflict_fn_max, 0 },
{ "NDB$OLD", CFT_NDB_OLD,
- &resolve_col_args[0], row_conflict_fn_old },
+ &resolve_col_args[0], row_conflict_fn_old, 0 },
+ { "NDB$EPOCH_TRANS", CFT_NDB_EPOCH_TRANS,
+ &epoch_fn_args[0], row_conflict_fn_epoch, CF_TRANSACTIONAL},
{ "NDB$EPOCH", CFT_NDB_EPOCH,
- &epoch_fn_args[0], row_conflict_fn_epoch }
+ &epoch_fn_args[0], row_conflict_fn_epoch, 0 }
};
static unsigned n_conflict_fns=
@@ -4242,6 +4245,7 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
break;
}
case CFT_NDB_EPOCH:
+ case CFT_NDB_EPOCH_TRANS:
{
if (num_args > 1)
{
@@ -4254,7 +4258,7 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
/* Check that table doesn't have Blobs as we don't support that */
if (share->flags & NSF_BLOB_FLAG)
{
- my_snprintf(msg, msg_len, "Table has Blob column(s), not suitable for NDB$EPOCH.");
+ my_snprintf(msg, msg_len, "Table has Blob column(s), not suitable for NDB$EPOCH[_TRANS].");
DBUG_PRINT("info", ("%s", msg));
DBUG_RETURN(-1);
}
@@ -4264,7 +4268,7 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
* represent SavePeriod/EpochPeriod
*/
if (ndbtab->getExtraRowGciBits() == 0)
- sql_print_information("Ndb Slave : CFT_NDB_EPOCH, low epoch resolution");
+ sql_print_information("Ndb Slave : CFT_NDB_EPOCH[_TRANS], low epoch resolution");
if (ndbtab->getExtraRowAuthorBits() == 0)
{
@@ -5853,8 +5857,9 @@ ndb_find_binlog_index_row(ndb_binlog_ind
return row;
}
+
static int
-ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
+ndb_binlog_thread_handle_data_event(THD* thd, Ndb *ndb, NdbEventOperation *pOp,
ndb_binlog_index_row **rows,
injector::transaction &trans,
unsigned &trans_row_count,
@@ -6001,6 +6006,17 @@ ndb_binlog_thread_handle_data_event(Ndb
uint32 logged_server_id= anyValue;
ndbcluster_anyvalue_set_serverid(logged_server_id, originating_server_id);
+ /*
+ Get NdbApi transaction id for this event to put into Binlog
+ */
+ Ndb_binlog_extra_row_info extra_row_info;
+ if (opt_ndb_log_transaction_id)
+ {
+ extra_row_info.setFlags(Ndb_binlog_extra_row_info::NDB_ERIF_TRANSID);
+ extra_row_info.setTransactionId(pOp->getTransId());
+ thd->binlog_row_event_extra_data = extra_row_info.generateBuffer();
+ }
+
DBUG_ASSERT(trans.good());
DBUG_ASSERT(table != 0);
@@ -6170,6 +6186,11 @@ ndb_binlog_thread_handle_data_event(Ndb
break;
}
+ if (opt_ndb_log_transaction_id)
+ {
+ thd->binlog_row_event_extra_data = NULL;
+ }
+
if (share->flags & NSF_BLOB_FLAG)
{
my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR));
@@ -7078,7 +7099,8 @@ restart_cluster_failure:
#endif
if ((unsigned) pOp->getEventType() <
(unsigned) NDBEVENT::TE_FIRST_NON_DATA_EVENT)
- ndb_binlog_thread_handle_data_event(i_ndb, pOp, &rows, trans, trans_row_count, trans_slave_row_count);
+ ndb_binlog_thread_handle_data_event(thd, i_ndb, pOp, &rows, trans,
+ trans_row_count, trans_slave_row_count);
else
{
ndb_binlog_thread_handle_non_data_event(thd, pOp, *rows);
@@ -7500,4 +7522,121 @@ ulong opt_server_id_mask = ~0;
#endif
+Ndb_binlog_extra_row_info::
+Ndb_binlog_extra_row_info()
+{
+ flags = 0;
+ transactionId = InvalidTransactionId;
+ /* Prepare buffer with extra row info buffer bytes */
+ buff[ EXTRA_ROW_INFO_LEN_OFFSET ] = 0;
+ buff[ EXTRA_ROW_INFO_FORMAT_OFFSET ] = ERIF_NDB;
+}
+
+void
+Ndb_binlog_extra_row_info::
+setFlags(Uint16 _flags)
+{
+ flags = _flags;
+}
+
+void
+Ndb_binlog_extra_row_info::
+setTransactionId(Uint64 _transactionId)
+{
+ assert(_transactionId != InvalidTransactionId);
+ transactionId = _transactionId;
+};
+
+int
+Ndb_binlog_extra_row_info::
+loadFromBuffer(const uchar* extra_row_info)
+{
+ assert(extra_row_info);
+
+ Uint8 length = extra_row_info[ EXTRA_ROW_INFO_LEN_OFFSET ];
+ assert(length >= EXTRA_ROW_INFO_HDR_BYTES);
+ Uint8 payload_length = length - EXTRA_ROW_INFO_HDR_BYTES;
+ Uint8 format = extra_row_info[ EXTRA_ROW_INFO_FORMAT_OFFSET ];
+
+ if (likely(format == ERIF_NDB))
+ {
+ if (likely(payload_length >= FLAGS_SIZE))
+ {
+ const uchar* data = &extra_row_info[ EXTRA_ROW_INFO_HDR_BYTES ];
+ Uint8 nextPos = 0;
+
+ /* Have flags at least */
+ Uint16 netFlags;
+ memcpy(&netFlags, &data[ nextPos ], FLAGS_SIZE);
+ nextPos += FLAGS_SIZE;
+ flags = uint2korr((const char*) &netFlags);
+
+ if (flags & NDB_ERIF_TRANSID)
+ {
+ if (likely((nextPos + TRANSID_SIZE) <= payload_length))
+ {
+ /*
+ Correct length, retrieve transaction id, converting from
+ little endian if necessary.
+ */
+ Uint64 netTransId;
+ memcpy(&netTransId,
+ &data[ nextPos ],
+ TRANSID_SIZE);
+ nextPos += TRANSID_SIZE;
+ transactionId = uint8korr((const char*) &netTransId);
+ }
+ else
+ {
+ /*
+ Error - supposed to have transaction id, but
+ buffer too short
+ */
+ return -1;
+ }
+ }
+ }
+ }
+
+ /* We currently ignore other formats of extra binlog info, and
+ * different lengths.
+ */
+
+ return 0;
+}
+
+uchar*
+Ndb_binlog_extra_row_info::generateBuffer()
+{
+ /*
+ Here we write out the buffer in network format,
+ based on the current member settings.
+ */
+ Uint8 nextPos = EXTRA_ROW_INFO_HDR_BYTES;
+
+ if (flags)
+ {
+ /* Write current flags into buff */
+ Uint16 netFlags = uint2korr((const char*) &flags);
+ memcpy(&buff[ nextPos ], &netFlags, FLAGS_SIZE);
+ nextPos += FLAGS_SIZE;
+
+ if (flags & NDB_ERIF_TRANSID)
+ {
+ Uint64 netTransactionId = uint8korr((const char*) &transactionId);
+ memcpy(&buff[ nextPos ], &netTransactionId, TRANSID_SIZE);
+ nextPos += TRANSID_SIZE;
+ }
+
+ assert( nextPos <= MaxLen );
+ /* Set length */
+ assert( buff[ EXTRA_ROW_INFO_FORMAT_OFFSET ] == ERIF_NDB );
+ buff[ EXTRA_ROW_INFO_LEN_OFFSET ] = nextPos;
+
+ return buff;
+ }
+ return 0;
+}
+
+// #ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
#endif
=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h 2011-09-01 12:36:37 +0000
+++ b/sql/ha_ndbcluster_binlog.h 2011-09-09 09:30:43 +0000
@@ -287,3 +287,49 @@ void ndbcluster_anyvalue_set_serverid(Ui
#ifndef DBUG_OFF
void dbug_ndbcluster_anyvalue_set_userbits(Uint32& anyValue);
#endif
+
+/*
+ Helper for reading/writing Binlog extra row info
+ in Ndb format.
+ It contains an internal buffer, which can be passed
+ in the thd variable when writing binlog entries if
+ the object stays in scope around the write.
+*/
+class Ndb_binlog_extra_row_info
+{
+public:
+ static const Uint32 FLAGS_SIZE = sizeof(Uint16);
+ static const Uint32 TRANSID_SIZE = sizeof(Uint64);
+ static const Uint32 MaxLen =
+ EXTRA_ROW_INFO_HDR_BYTES +
+ FLAGS_SIZE +
+ TRANSID_SIZE;
+
+ static const Uint64 InvalidTransactionId = ~Uint64(0);
+
+ enum Flags
+ {
+ NDB_ERIF_TRANSID = 0x1
+ };
+
+ Ndb_binlog_extra_row_info();
+
+ int loadFromBuffer(const uchar* extra_row_info_ptr);
+
+ Uint16 getFlags() const
+ {
+ return flags;
+ }
+ void setFlags(Uint16 _flags);
+ Uint64 getTransactionId() const
+ { return transactionId; };
+ void setTransactionId(Uint64 _transactionId);
+ uchar* getBuffPtr()
+ { return buff; };
+ uchar* generateBuffer();
+private:
+ uchar buff[MaxLen];
+ Uint16 flags;
+ Uint64 transactionId;
+};
+
=== modified file 'sql/ha_ndbcluster_glue.h'
--- a/sql/ha_ndbcluster_glue.h 2011-07-05 12:46:07 +0000
+++ b/sql/ha_ndbcluster_glue.h 2011-09-09 09:30:43 +0000
@@ -38,6 +38,8 @@
#include "transaction.h"
#include "sql_test.h" // print_where
#include "key.h" // key_restore
+#include "rpl_constants.h" // Transid in Binlog
+#include "slave.h" // Silent retry definition
#else
#include "mysql_priv.h"
#endif
=== modified file 'sql/log_event.cc'
--- a/sql/log_event.cc 2011-08-31 10:39:08 +0000
+++ b/sql/log_event.cc 2011-09-09 09:30:43 +0000
@@ -1965,6 +1965,32 @@ void Rows_log_event::print_verbose(IO_CA
const char *sql_command, *sql_clause1, *sql_clause2;
Log_event_type type_code= get_type_code();
+#ifndef MCP_WL5353
+ if (m_extra_row_data)
+ {
+ uint8 extra_data_len= m_extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET];
+ uint8 extra_payload_len= extra_data_len - EXTRA_ROW_INFO_HDR_BYTES;
+ assert(extra_data_len >= EXTRA_ROW_INFO_HDR_BYTES);
+
+ my_b_printf(file, "### Extra row data format: %u, len: %u :",
+ m_extra_row_data[EXTRA_ROW_INFO_FORMAT_OFFSET],
+ extra_payload_len);
+ if (extra_payload_len)
+ {
+ /*
+ Buffer for hex view of string, including '0x' prefix,
+ 2 hex chars / byte and trailing 0
+ */
+ const int buff_len= 2 + (256 * 2) + 1;
+ char buff[buff_len];
+ str_to_hex(buff, (const char*) &m_extra_row_data[EXTRA_ROW_INFO_HDR_BYTES],
+ extra_payload_len);
+ my_b_printf(file, "%s", buff);
+ }
+ my_b_printf(file, "\n");
+ }
+#endif
+
switch (type_code) {
case WRITE_ROWS_EVENT:
sql_command= "INSERT INTO";
@@ -3161,6 +3187,48 @@ int Query_log_event::do_apply_event(Rela
return do_apply_event(rli, query, q_len);
}
+#ifndef MCP_WL5353
+#ifdef HAVE_NDB_BINLOG
+
+static bool is_silent_error(THD* thd)
+{
+ DBUG_ENTER("is_silent_error");
+ List_iterator_fast<MYSQL_ERROR> it(thd->warning_info->warn_list());
+ MYSQL_ERROR *err;
+
+ while ((err= it++))
+ {
+ DBUG_PRINT("info", ("Error : %u : %s",
+ err->get_sql_errno(),
+ err->get_message_text()));
+
+ switch (err->get_sql_errno())
+ {
+ case ER_GET_TEMPORARY_ERRMSG:
+ {
+ /*
+ If we are rolling back due to an explicit request, do so
+ silently
+ */
+ if (strcmp(SLAVE_SILENT_RETRY_MSG, err->get_message_text()) == 0)
+ {
+ DBUG_PRINT("info", ("Silent retry"));
+ DBUG_RETURN(true);
+ }
+
+ break;
+ }
+ default:
+ break;
+ }
+ }
+
+ DBUG_RETURN(false);
+}
+
+// #ifdef HAVE_NDB_BINLOG
+#endif
+#endif // #ifndef MCP_WL5353
/**
@todo
@@ -3494,11 +3562,19 @@ Default database: '%s'. Query: '%s'",
*/
else if (thd->is_slave_error || thd->is_fatal_error)
{
- rli->report(ERROR_LEVEL, actual_error,
- "Error '%s' on query. Default database: '%s'. Query: '%s'",
+#ifndef MCP_WL5353
+#ifdef HAVE_NDB_BINLOG
+ bool be_silent= is_silent_error(thd);
+ if (!be_silent)
+#endif
+#endif // #ifndef MCP_WL5353
+ {
+ rli->report(ERROR_LEVEL, actual_error,
+ "Error '%s' on query. Default database: '%s'. Query: '%s'",
(actual_error ? thd->stmt_da->message() :
- "unexpected success or fatal error"),
- print_slave_db_safe(thd->db), query_arg);
+ "unexpected success or fatal error"),
+ print_slave_db_safe(thd->db), query_arg);
+ }
thd->is_slave_error= 1;
}
@@ -7287,6 +7363,63 @@ const char *sql_ex_info::init(const char
return buf;
}
+#ifndef MCP_WL5353
+#ifndef DBUG_OFF
+#ifndef MYSQL_CLIENT
+static uchar dbug_extra_row_data_val= 0;
+
+/**
+ set_extra_data
+
+ Called during self-test to generate various
+ self-consistent binlog row event extra
+ thread data structures which can be checked
+ when reading the binlog.
+
+ @param thd Current thd
+ @param arr Buffer to use
+*/
+void set_extra_data(THD* thd, uchar* arr)
+{
+ assert(thd->binlog_row_event_extra_data == NULL);
+ uchar val= (dbug_extra_row_data_val++) %
+ (EXTRA_ROW_INFO_MAX_PAYLOAD + 1); /* 0 .. MAX_PAYLOAD + 1 */
+ arr[EXTRA_ROW_INFO_LEN_OFFSET]= val + EXTRA_ROW_INFO_HDR_BYTES;
+ arr[EXTRA_ROW_INFO_FORMAT_OFFSET]= val;
+ for (uchar i=0; i<val; i++)
+ arr[EXTRA_ROW_INFO_HDR_BYTES+i]= val;
+
+ thd->binlog_row_event_extra_data= arr;
+}
+
+#endif // #ifndef MYSQL_CLIENT
+
+/**
+ check_extra_data
+
+ Called during self-test to check that
+ binlog row event extra data is self-
+ consistent as defined by the set_extra_data
+ function above.
+
+ Will assert(false) if not.
+
+ @param extra_row_data
+*/
+void check_extra_data(uchar* extra_row_data)
+{
+ assert(extra_row_data);
+ uint16 len= extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET];
+ uint8 val= len - EXTRA_ROW_INFO_HDR_BYTES;
+ assert(extra_row_data[EXTRA_ROW_INFO_FORMAT_OFFSET] == val);
+ for (uint16 i= 0; i < val; i++)
+ {
+ assert(extra_row_data[EXTRA_ROW_INFO_HDR_BYTES + i] == val);
+ }
+}
+
+#endif // #ifndef DBUG_OFF
+#endif // #ifndef MCP_WL5353
/**************************************************************************
Rows_log_event member functions
@@ -7300,7 +7433,10 @@ Rows_log_event::Rows_log_event(THD *thd_
m_table(tbl_arg),
m_table_id(tid),
m_width(tbl_arg ? tbl_arg->s->fields : 1),
- m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
+ m_rows_buf(0), m_rows_cur(0), m_rows_end(0), m_flags(0)
+#ifndef MCP_WL5353
+ ,m_extra_row_data(0)
+#endif
#ifdef HAVE_REPLICATION
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
#endif
@@ -7318,6 +7454,33 @@ Rows_log_event::Rows_log_event(THD *thd_
set_flags(NO_FOREIGN_KEY_CHECKS_F);
if (thd_arg->variables.option_bits & OPTION_RELAXED_UNIQUE_CHECKS)
set_flags(RELAXED_UNIQUE_CHECKS_F);
+#ifndef MCP_WL5353
+#ifndef DBUG_OFF
+ uchar extra_data[255];
+ DBUG_EXECUTE_IF("extra_row_data_set",
+ /* Set extra row data to a known value */
+ set_extra_data(thd_arg, extra_data););
+#endif
+ if (thd_arg->binlog_row_event_extra_data)
+ {
+ /* Copy Extra data from thd into new event */
+ uint16 extra_data_len= thd_arg->get_binlog_row_event_extra_data_len();
+ assert(extra_data_len >= EXTRA_ROW_INFO_HDR_BYTES);
+
+ m_extra_row_data= (uchar*) my_malloc(extra_data_len, MYF(MY_WME));
+
+ if (likely(m_extra_row_data != NULL))
+ {
+ memcpy(m_extra_row_data, thd_arg->binlog_row_event_extra_data,
+ extra_data_len);
+ set_flags(EXTRA_ROW_EV_DATA_F);
+ }
+ }
+
+ DBUG_EXECUTE_IF("extra_row_data_set",
+ thd_arg->binlog_row_event_extra_data = NULL;);
+#endif // #ifndef MCP_WL5353
+
/* if bitmap_init fails, caught in is_valid() */
if (likely(!bitmap_init(&m_cols,
m_width <= sizeof(m_bitbuf)*8 ? m_bitbuf : NULL,
@@ -7349,6 +7512,9 @@ Rows_log_event::Rows_log_event(const cha
m_table(NULL),
#endif
m_table_id(0), m_rows_buf(0), m_rows_cur(0), m_rows_end(0)
+#ifndef MCP_WL5353
+ ,m_extra_row_data(0)
+#endif
#if !defined(MYSQL_CLIENT) && defined(HAVE_REPLICATION)
, m_curr_row(NULL), m_curr_row_end(NULL), m_key(NULL)
#endif
@@ -7378,8 +7544,35 @@ Rows_log_event::Rows_log_event(const cha
m_flags= uint2korr(post_start);
+#ifndef MCP_WL5353
+ uint16 extra_data_len= 0;
+ if ((m_flags & EXTRA_ROW_EV_DATA_F))
+ {
+ const uchar* extra_data_start= (const uchar*) post_start + 2;
+ extra_data_len= extra_data_start[EXTRA_ROW_INFO_LEN_OFFSET];
+ assert(m_extra_row_data == 0);
+ assert(extra_data_len >= EXTRA_ROW_INFO_HDR_BYTES);
+ DBUG_PRINT("debug", ("extra_data_len = %u",
+ extra_data_len));
+
+ m_extra_row_data= (uchar*) my_malloc(extra_data_len,
+ MYF(MY_WME));
+ if (likely(m_extra_row_data != NULL))
+ {
+ memcpy(m_extra_row_data, extra_data_start, extra_data_len);
+ }
+ }
+ DBUG_EXECUTE_IF("extra_row_data_check",
+ /* Check extra data has expected value */
+ check_extra_data(m_extra_row_data););
+#endif // #ifndef MCP_WL5353
+
uchar const *const var_start=
- (const uchar *)buf + common_header_len + post_header_len;
+ (const uchar *)buf + common_header_len + post_header_len
+#ifndef MCP_WL5353
+ + extra_data_len
+#endif
+ ;
uchar const *const ptr_width= var_start;
uchar *ptr_after_width= (uchar*) ptr_width;
DBUG_PRINT("debug", ("Reading from %p", ptr_after_width));
@@ -7459,6 +7652,9 @@ Rows_log_event::~Rows_log_event()
m_cols.bitmap= 0; // so no my_free in bitmap_free
bitmap_free(&m_cols); // To pair with bitmap_init().
my_free(m_rows_buf);
+#ifndef MCP_WL5353
+ my_free((uchar*)m_extra_row_data);
+#endif
}
int Rows_log_event::get_data_size()
@@ -7475,6 +7671,11 @@ int Rows_log_event::get_data_size()
int data_size= ROWS_HEADER_LEN;
data_size+= no_bytes_in_map(&m_cols);
data_size+= (uint) (end - buf);
+#ifndef MCP_WL5353
+ data_size+= m_extra_row_data ?
+ m_extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET] :
+ 0;
+#endif
if (type_code == UPDATE_ROWS_EVENT)
data_size+= no_bytes_in_map(&m_cols_ai);
@@ -7626,6 +7827,14 @@ int Rows_log_event::do_apply_event(Relay
else
thd->variables.option_bits&= ~OPTION_ALLOW_BATCH;
#endif
+
+#ifndef MCP_WL5353
+ if (get_flags(EXTRA_ROW_EV_DATA_F))
+ thd->binlog_row_event_extra_data = m_extra_row_data;
+ else
+ thd->binlog_row_event_extra_data = NULL;
+#endif
+
/* A small test to verify that objects have consistent types */
DBUG_ASSERT(sizeof(thd->variables.option_bits) == sizeof(OPTION_RELAXED_UNIQUE_CHECKS));
@@ -7734,6 +7943,12 @@ int Rows_log_event::do_apply_event(Relay
So we call set_time(), like in SBR. Presently it changes nothing.
*/
thd->set_time((time_t)when);
+#ifndef MCP_WL5353
+ if (get_flags(EXTRA_ROW_EV_DATA_F))
+ thd->binlog_row_event_extra_data = m_extra_row_data;
+ else
+ thd->binlog_row_event_extra_data = NULL;
+#endif
/*
Now we are in a statement and will stay in a statement until we
@@ -8043,7 +8258,22 @@ bool Rows_log_event::write_data_header(I
});
int6store(buf + RW_MAPID_OFFSET, (ulonglong)m_table_id);
int2store(buf + RW_FLAGS_OFFSET, m_flags);
+#ifndef MCP_WL5353
+ int rc = my_b_safe_write(file, buf, ROWS_HEADER_LEN);
+
+ if ((rc == 0) &&
+ (m_flags & EXTRA_ROW_EV_DATA_F))
+ {
+ /* Write extra row data */
+ rc = my_b_safe_write(file, m_extra_row_data,
+ m_extra_row_data[EXTRA_ROW_INFO_LEN_OFFSET]);
+ }
+
+ /* Function returns bool, where false(0) is success :( */
+ return (rc != 0);
+#else
return (my_b_safe_write(file, buf, ROWS_HEADER_LEN));
+#endif
}
bool Rows_log_event::write_data_body(IO_CACHE*file)
=== modified file 'sql/log_event.h'
--- a/sql/log_event.h 2011-08-31 10:39:08 +0000
+++ b/sql/log_event.h 2011-09-09 09:30:43 +0000
@@ -3569,6 +3569,13 @@ public:
values for all columns of the table.
*/
COMPLETE_ROWS_F = (1U << 3)
+
+#ifndef MCP_WL5353
+ /**
+ Indicates that additional information was appended to the event.
+ */
+ ,EXTRA_ROW_EV_DATA_F = (1U << 4)
+#endif
};
typedef uint16 flag_set;
@@ -3632,6 +3639,10 @@ public:
uint m_row_count; /* The number of rows added to the event */
+#ifndef MCP_WL5353
+ const uchar* get_extra_row_data() const { return m_extra_row_data; }
+#endif
+
protected:
/*
The constructors are protected since you're supposed to inherit
@@ -3680,6 +3691,11 @@ protected:
flag_set m_flags; /* Flags for row-level events */
+#ifndef MCP_WL5353
+ uchar *m_extra_row_data; /* Pointer to extra row data if any */
+ /* If non null, first byte is length */
+#endif
+
/* helper functions */
#if defined(MYSQL_SERVER) && defined(HAVE_REPLICATION)
=== added file 'sql/ndb_conflict_trans.cc'
--- a/sql/ndb_conflict_trans.cc 1970-01-01 00:00:00 +0000
+++ b/sql/ndb_conflict_trans.cc 2011-09-07 22:50:01 +0000
@@ -0,0 +1,778 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include "ndb_conflict_trans.h"
+
+#ifdef HAVE_NDB_BINLOG
+#include "my_sys.h"
+#include "my_base.h"
+
+/* Whether to track all transactions, or just
+ * 'interesting' ones
+ */
+#define TRACK_ALL_TRANSACTIONS 0
+
+/* Whether to check the transaction graph for
+ * correctness at runtime
+ */
+#define CHECK_TRANS_GRAPH 0
+
+/* st_row_event_key_info implementation */
+
+st_row_event_key_info::
+st_row_event_key_info(const NdbDictionary::Table* _table,
+ const uchar* _key_buff,
+ Uint32 _key_buff_len,
+ Uint64 _transaction_id):
+ tableObj(_table),
+ packed_key(_key_buff),
+ packed_key_len(_key_buff_len),
+ transaction_id(_transaction_id),
+ hash_next(NULL)
+{
+}
+
+Uint64
+st_row_event_key_info::getTransactionId() const
+{
+ return transaction_id;
+}
+
+void
+st_row_event_key_info::updateRowTransactionId(Uint64 mostRecentTransId)
+{
+ transaction_id = mostRecentTransId;
+}
+
+Uint32
+st_row_event_key_info::hashValue() const
+{
+ /* Include Table Object Id + primary key */
+ Uint32 h = (17 * 37) + tableObj->getObjectId();
+ for (Uint32 i=0; i < packed_key_len; i++)
+ h = (37 * h) + packed_key[i];
+ return h;
+}
+
+bool
+st_row_event_key_info::equal(const st_row_event_key_info* other) const
+{
+ /* Check same table + same PK */
+ return
+ ((tableObj == other->tableObj) &&
+ (packed_key_len == other->packed_key_len) &&
+ (memcmp(packed_key, other->packed_key, packed_key_len) == 0));
+};
+
+st_row_event_key_info*
+st_row_event_key_info::getNext() const
+{
+ return hash_next;
+};
+
+void
+st_row_event_key_info::setNext(st_row_event_key_info* _next)
+{
+ hash_next = _next;
+};
+
+
+/* st_trans_dependency implementation */
+
+st_trans_dependency::
+st_trans_dependency(st_transaction* _target_transaction,
+ st_transaction* _dependent_transaction,
+ const st_trans_dependency* _next)
+ : target_transaction(_target_transaction),
+ dependent_transaction(_dependent_transaction),
+ next_entry(_next),
+ hash_next(NULL)
+{
+};
+
+st_transaction*
+st_trans_dependency::getTargetTransaction() const
+{
+ return target_transaction;
+}
+
+st_transaction*
+st_trans_dependency::getDependentTransaction() const
+{
+ return dependent_transaction;
+}
+
+const st_trans_dependency*
+st_trans_dependency::getNextDependency() const
+{
+ return next_entry;
+}
+
+Uint32
+st_trans_dependency::hashValue() const
+{
+ /* Hash the ptrs in a rather nasty way */
+ UintPtr p =
+ ((UintPtr) target_transaction) ^
+ ((UintPtr) dependent_transaction);;
+
+ if (sizeof(p) == 8)
+ {
+ /* Xor two words of 64 bit ptr */
+ p =
+ (p & 0xffffffff) ^
+ ((((Uint64) p) >> 32) & 0xffffffff);
+ }
+
+ return 17 + (37 * (Uint32)p);
+};
+
+bool
+st_trans_dependency::equal(const st_trans_dependency* other) const
+{
+ return ((target_transaction == other->target_transaction) &&
+ (dependent_transaction == other->dependent_transaction));
+};
+
+st_trans_dependency*
+st_trans_dependency::getNext() const
+{
+ return hash_next;
+};
+
+void
+st_trans_dependency::setNext(st_trans_dependency* _next)
+{
+ hash_next = _next;
+};
+
+
+/* st_transaction implementation */
+
+st_transaction::st_transaction(Uint64 _transaction_id)
+ : transaction_id(_transaction_id),
+ in_conflict(false),
+ dependency_list_head(NULL),
+ hash_next(NULL)
+{
+};
+
+Uint64
+st_transaction::getTransactionId() const
+{
+ return transaction_id;
+}
+
+bool
+st_transaction::getInConflict() const
+{
+ return in_conflict;
+}
+
+void
+st_transaction::setInConflict()
+{
+ in_conflict = true;
+}
+
+const st_trans_dependency*
+st_transaction::getDependencyListHead() const
+{
+ return dependency_list_head;
+}
+
+void
+st_transaction::setDependencyListHead(st_trans_dependency* head)
+{
+ dependency_list_head = head;
+}
+
+/* Hash Api */
+Uint32
+st_transaction::hashValue() const
+{
+ return 17 + (37 * ((transaction_id & 0xffffffff) ^
+ (transaction_id >> 32 & 0xffffffff)));
+};
+
+bool
+st_transaction::equal(const st_transaction* other) const
+{
+ return transaction_id == other->transaction_id;
+};
+
+st_transaction*
+st_transaction::getNext() const
+{
+ return hash_next;
+};
+
+void
+st_transaction::setNext(st_transaction* _next)
+{
+ hash_next = _next;
+};
+
+
+
+/*
+ Unique HashMap(Set) of st_row_event_key_info ptrs, with bucket storage
+ allocated from st_mem_root_allocator
+*/
+template class HashMap2<st_row_event_key_info, true, st_mem_root_allocator>;
+template class HashMap2<st_transaction, true, st_mem_root_allocator>;
+template class HashMap2<st_trans_dependency, true, st_mem_root_allocator>;
+template class LinkedStack<Uint64, st_mem_root_allocator>;
+
+
+/**
+ * pack_key_to_buffer
+ *
+ * Given a table, key_record and record, this method will
+ * determine how many significant bytes the key contains,
+ * and if a buffer is passed, will copy the bytes into the
+ * buffer.
+ */
+static
+int
+pack_key_to_buffer(const NdbDictionary::Table* table,
+ const NdbRecord* key_rec,
+ const uchar* record,
+ uchar* buffer,
+ Uint32& buff_len)
+{
+ /* Loop over attributes in key record, determining their actual
+ * size based on column type and contents of record
+ * If buffer supplied, copy them contiguously to buffer
+ * return total length
+ */
+ Uint32 attr_id;
+ Uint32 buff_offset = 0;
+ NdbDictionary::getFirstAttrId(key_rec, attr_id);
+
+ do
+ {
+ Uint32 from_offset = 0;
+ Uint32 byte_len = 0;
+ const NdbDictionary::Column* key_col = table->getColumn(attr_id);
+ NdbDictionary::getOffset(key_rec, attr_id, from_offset);
+ assert( ! NdbDictionary::isNull(key_rec, (const char*) record, attr_id));
+
+ switch(key_col->getArrayType())
+ {
+ case NDB_ARRAYTYPE_FIXED:
+ byte_len = key_col->getSizeInBytes();
+ break;
+ case NDB_ARRAYTYPE_SHORT_VAR:
+ byte_len = record[from_offset];
+ from_offset++;
+ break;
+ case NDB_ARRAYTYPE_MEDIUM_VAR:
+ byte_len = uint2korr(&record[from_offset]);
+ from_offset+= 2;
+ break;
+ };
+ assert( (buff_offset + byte_len) <= buff_len );
+
+ if (buffer)
+ memcpy(&buffer[buff_offset], &record[from_offset], byte_len);
+
+ buff_offset+= byte_len;
+ } while (NdbDictionary::getNextAttrId(key_rec, attr_id));
+
+ buff_len = buff_offset;
+ return 0;
+};
+
+static
+Uint32 determine_packed_key_size(const NdbDictionary::Table* table,
+ const NdbRecord* key_rec,
+ const uchar* record)
+{
+ Uint32 key_size = ~Uint32(0);
+ /* Use pack_key_to_buffer to calculate length required */
+ pack_key_to_buffer(table,
+ key_rec,
+ record,
+ NULL,
+ key_size);
+ return key_size;
+};
+
+/* st_mem_root_allocator implementation */
+void*
+st_mem_root_allocator::alloc(void* ctx, size_t bytes)
+{
+ st_mem_root_allocator* a = (st_mem_root_allocator*) ctx;
+ return alloc_root(a->mem_root, bytes);
+};
+
+void*
+st_mem_root_allocator::calloc(void* ctx, size_t nelem, size_t bytes)
+{
+ st_mem_root_allocator* a = (st_mem_root_allocator*) ctx;
+ return alloc_root(a->mem_root,
+ nelem * bytes);
+}
+
+void
+st_mem_root_allocator::free(void* ctx, void* mem)
+{
+ /* Do nothing, will be globally freed when arena (mem_root)
+ * released
+ */
+};
+
+st_mem_root_allocator::st_mem_root_allocator(MEM_ROOT* _mem_root)
+ : mem_root(_mem_root)
+{
+};
+
+
+/* DependencyTracker implementation */
+
+DependencyTracker*
+DependencyTracker::newDependencyTracker(MEM_ROOT* mem_root)
+{
+ DependencyTracker* dt = NULL;
+ void* mem = alloc_root(mem_root,
+ sizeof(DependencyTracker));
+ if (mem)
+ {
+ dt = new (mem) DependencyTracker(mem_root);
+ }
+
+ return dt;
+}
+
+
+DependencyTracker::
+DependencyTracker(MEM_ROOT* mem_root)
+ : mra(mem_root), key_hash(&mra), trans_hash(&mra),
+ dependency_hash(&mra),
+ iteratorTodo(ITERATOR_STACK_BLOCKSIZE, &mra),
+ conflicting_trans_count(0),
+ error_text(NULL)
+{
+ /* TODO Get sizes from somewhere */
+ key_hash.setSize(1024);
+ trans_hash.setSize(100);
+ dependency_hash.setSize(100);
+};
+
+
+int
+DependencyTracker::
+track_operation(const NdbDictionary::Table* table,
+ const NdbRecord* key_rec,
+ const uchar* row,
+ Uint64 transaction_id)
+{
+ DBUG_ENTER("track_operation");
+
+ Uint32 required_buff_size = determine_packed_key_size(table,
+ key_rec,
+ row);
+ DBUG_PRINT("info", ("Required length for key : %u", required_buff_size));
+
+ /* Alloc space for packed key and struct*/
+ uchar* packed_key_buff = (uchar*) alloc_root(mra.mem_root,
+ required_buff_size);
+ void* element_mem = alloc_root(mra.mem_root,
+ sizeof(st_row_event_key_info));
+
+ if (pack_key_to_buffer(table,
+ key_rec,
+ row,
+ packed_key_buff,
+ required_buff_size))
+ {
+ DBUG_RETURN(-1);
+ }
+
+ if (TRACK_ALL_TRANSACTIONS)
+ {
+ st_transaction* transEntry = get_or_create_transaction(transaction_id);
+ if (!transEntry)
+ {
+ error_text = "track_operation : Failed to get or create transaction";
+ DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+ }
+ }
+
+ st_row_event_key_info* key_info = new (element_mem)
+ st_row_event_key_info(table,
+ packed_key_buff,
+ required_buff_size,
+ transaction_id);
+
+ /* Now try to add element to hash */
+ if (! key_hash.add(key_info))
+ {
+ /*
+ Already an element in the keyhash with this primary key
+ If it's for the same transaction then ignore, otherwise
+ it's an inter-transaction dependency
+ */
+ st_row_event_key_info* existing = key_hash.get(key_info);
+
+ Uint64 existingTransIdOnRow = existing->getTransactionId();
+ Uint64 newTransIdOnRow = key_info->getTransactionId();
+
+ if (existingTransIdOnRow != newTransIdOnRow)
+ {
+ int res = add_dependency(existingTransIdOnRow,
+ newTransIdOnRow);
+ /*
+ Update stored transaction_id to be latest for key.
+ Further key operations on this row will depend on this
+ transaction, and transitively on the previous
+ transaction.
+ */
+ existing->updateRowTransactionId(newTransIdOnRow);
+
+ DBUG_RETURN(res);
+ }
+ else
+ {
+ /*
+ How can we have two updates to the same row with the
+ same transaction id? Only if the transaction id
+ is invalid (e.g. not set)
+ */
+ if (existingTransIdOnRow != InvalidTransactionId)
+ {
+ assert(false);
+ DBUG_RETURN(-1);
+ }
+ }
+ }
+
+ DBUG_RETURN(0);
+};
+
+int
+DependencyTracker::
+mark_conflict(Uint64 trans_id)
+{
+ DBUG_ENTER("mark_conflict");
+ DBUG_PRINT("info", ("trans_id : %llu", trans_id));
+
+ st_transaction* entry = get_or_create_transaction(trans_id);
+ if (!entry)
+ {
+ error_text = "mark_conflict : get_or_create_transaction() failure";
+ DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+ }
+
+ if (entry->getInConflict())
+ {
+ /* Nothing to do here */
+ DBUG_RETURN(0);
+ }
+
+ /* Have entry, mark it, and any dependents */
+ bool fetch_node_dependents;
+ st_transaction* dependent = entry;
+ reset_dependency_iterator();
+ do
+ {
+ DBUG_PRINT("info", ("Visiting transaction %llu, conflict : %u",
+ dependent->getTransactionId(),
+ dependent->getInConflict()));
+ /*
+ If marked already, don't fetch dependents, as
+ they will also be marked already
+ */
+ fetch_node_dependents = false;
+
+ if (!dependent->getInConflict())
+ {
+ dependent->setInConflict();
+ conflicting_trans_count++;
+ fetch_node_dependents = true;
+ }
+ } while ((dependent = get_next_dependency(dependent, fetch_node_dependents)));
+
+ assert( verify_graph() );
+
+ DBUG_RETURN(0);
+}
+
+bool
+DependencyTracker::in_conflict(Uint64 trans_id)
+{
+ DBUG_ENTER("in_conflict");
+ DBUG_PRINT("info", ("trans_id %llu", trans_id));
+ st_transaction key(trans_id);
+ const st_transaction* entry = NULL;
+
+ /*
+ If transaction hash entry exists, check it for
+ conflicts. If it doesn't exist, no conflict
+ */
+ if ((entry = trans_hash.get(&key)))
+ {
+ DBUG_PRINT("info", ("in_conflict : %u", entry->getInConflict()));
+ DBUG_RETURN(entry->getInConflict());
+ }
+ else
+ {
+ assert(! TRACK_ALL_TRANSACTIONS);
+ }
+ DBUG_RETURN(false);
+};
+
+st_transaction*
+DependencyTracker::
+get_or_create_transaction(Uint64 trans_id)
+{
+ DBUG_ENTER("get_or_create_transaction");
+ st_transaction transKey(trans_id);
+ st_transaction* transEntry = NULL;
+
+ if (! (transEntry = trans_hash.get(&transKey)))
+ {
+ /*
+ Transaction does not exist. Allocate it
+ and add to the hash
+ */
+ DBUG_PRINT("info", ("Creating new hash entry for transaction (%llu)",
+ trans_id));
+
+ transEntry = (st_transaction*)
+ st_mem_root_allocator::alloc(&mra, sizeof(st_transaction));
+
+ if (transEntry)
+ {
+ new (transEntry) st_transaction(trans_id);
+
+ if (!trans_hash.add(transEntry))
+ {
+ st_mem_root_allocator::free(&mra, transEntry); /* For show */
+ transEntry = NULL;
+ }
+ }
+ }
+
+ DBUG_RETURN(transEntry);
+}
+
+int
+DependencyTracker::
+add_dependency(Uint64 trans_id, Uint64 dependent_trans_id)
+{
+ DBUG_ENTER("add_dependency");
+ DBUG_PRINT("info", ("Recording dependency of %llu on %llu",
+ dependent_trans_id, trans_id));
+ st_transaction* targetEntry = get_or_create_transaction(trans_id);
+ if (!targetEntry)
+ {
+ error_text = "add_dependency : Failed get_or_create_transaction";
+ DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+ }
+
+ st_transaction* dependentEntry = get_or_create_transaction(dependent_trans_id);
+ if (!dependentEntry)
+ {
+ error_text = "add_dependency : Failed get_or_create_transaction";
+ DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+ }
+
+ /* Now lookup dependency. Add it if not already present */
+ st_trans_dependency depKey(targetEntry, dependentEntry, NULL);
+ st_trans_dependency* dep = NULL;
+ if (! (dep = dependency_hash.get(&depKey)))
+ {
+ DBUG_PRINT("info", ("Creating new dependency hash entry for "
+ "dependency of %llu on %llu.",
+ dependentEntry->getTransactionId(),
+ targetEntry->getTransactionId()));
+
+ dep = (st_trans_dependency*)
+ st_mem_root_allocator::alloc(&mra, sizeof(st_trans_dependency));
+
+ new (dep) st_trans_dependency(targetEntry, dependentEntry,
+ targetEntry->getDependencyListHead());
+
+ targetEntry->setDependencyListHead(dep);
+
+ /* New dependency, propagate in_conflict if necessary */
+ if (targetEntry->getInConflict())
+ {
+ DBUG_PRINT("info", ("Marking new dependent as in-conflict"));
+ DBUG_RETURN(mark_conflict(dependentEntry->getTransactionId()));
+ }
+ }
+
+ assert(verify_graph());
+
+ DBUG_RETURN(0);
+};
+
+void
+DependencyTracker::
+reset_dependency_iterator()
+{
+ iteratorTodo.reset();
+};
+
+st_transaction*
+DependencyTracker::
+get_next_dependency(const st_transaction* current,
+ bool include_dependents_of_current)
+{
+ DBUG_ENTER("get_next_dependency");
+ DBUG_PRINT("info", ("node : %llu", current->getTransactionId()));
+ /*
+ Depth first traverse, with option to ignore sub graphs.
+ */
+ if (include_dependents_of_current)
+ {
+ /* Add dependents to stack */
+ const st_trans_dependency* dependency = current->getDependencyListHead();
+
+ while (dependency)
+ {
+ assert(dependency->getTargetTransaction() == current);
+ DBUG_PRINT("info", ("Adding dependency %llu->%llu",
+ dependency->getDependentTransaction()->getTransactionId(),
+ dependency->getTargetTransaction()->getTransactionId()));
+
+ Uint64 dependentTransactionId =
+ dependency->getDependentTransaction()->getTransactionId();
+ iteratorTodo.push(dependentTransactionId);
+ dependency= dependency->getNextDependency();
+ }
+ }
+
+ Uint64 nextId;
+ if (iteratorTodo.pop(nextId))
+ {
+ DBUG_PRINT("info", ("Returning transaction id %llu",
+ nextId));
+ st_transaction key(nextId);
+ st_transaction* dependent = trans_hash.get(&key);
+ assert(dependent);
+ DBUG_RETURN(dependent);
+ }
+
+ assert(iteratorTodo.size() == 0);
+ DBUG_PRINT("info", ("No more dependencies to visit"));
+ DBUG_RETURN(NULL);
+};
+
+void
+DependencyTracker::
+dump_dependents(Uint64 trans_id)
+{
+ fprintf(stderr, "Dumping dependents of transid %llu : ", trans_id);
+
+ st_transaction key(trans_id);
+ const st_transaction* dependent = NULL;
+
+ if ((dependent = trans_hash.get(&key)))
+ {
+ reset_dependency_iterator();
+ const char* comma = ", ";
+ const char* sep = "";
+ do
+ {
+ {
+ fprintf(stderr, "%s%llu%s", sep, dependent->getTransactionId(),
+ (dependent->getInConflict()?"-C":""));
+ sep = comma;
+ }
+ } while ((dependent = get_next_dependency(dependent)));
+ fprintf(stderr, "\n");
+ }
+ else
+ {
+ fprintf(stderr, "None\n");
+ }
+};
+
+bool
+DependencyTracker::
+verify_graph()
+{
+ if (! CHECK_TRANS_GRAPH)
+ return true;
+
+ /*
+ Check the graph structure obeys its invariants
+
+ 1) There are no cycles in the graph such that
+ a transaction is a dependent of itself
+
+ 2) If a transaction is marked in_conflict, all
+ of its dependents (transitively), are also
+ marked in conflict
+
+ This is expensive to verify, so not always on
+ */
+ HashMap2<st_transaction, true, st_mem_root_allocator>::Iterator it(trans_hash);
+
+ st_transaction* root = NULL;
+
+ while ((root = it.next()))
+ {
+ bool in_conflict = root->getInConflict();
+
+ /* Now visit all dependents */
+ st_transaction* dependent = root;
+ reset_dependency_iterator();
+
+ while((dependent = get_next_dependency(dependent, true)))
+ {
+ if (dependent == root)
+ {
+ /* Must exit, or we'll be here forever */
+ fprintf(stderr, "Error : Cycle discovered in graph\n");
+ abort();
+ return false;
+ }
+
+ if (in_conflict &&
+ ! dependent->getInConflict())
+ {
+ fprintf(stderr, "Error : Dependent transaction not marked in-conflict\n");
+ abort();
+ return false;
+ }
+ }
+ }
+
+ return true;
+}
+
+
+const char*
+DependencyTracker::get_error_text() const
+{
+ return error_text;
+};
+
+Uint32
+DependencyTracker::get_conflict_count() const
+{
+ return conflicting_trans_count;
+}
+
+/* #ifdef HAVE_NDB_BINLOG */
+
+#endif
=== added file 'sql/ndb_conflict_trans.h'
--- a/sql/ndb_conflict_trans.h 1970-01-01 00:00:00 +0000
+++ b/sql/ndb_conflict_trans.h 2011-09-07 22:50:01 +0000
@@ -0,0 +1,337 @@
+/*
+ Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef NDB_CONFLICT_TRANS_H
+#define NDB_CONFLICT_TRANS_H
+
+#include <my_global.h>
+
+#ifdef HAVE_NDB_BINLOG
+#include <ndbapi/NdbApi.hpp>
+#include <util/HashMap2.hpp>
+#include <util/LinkedStack.hpp>
+
+/*
+ This file defines structures for detecting dependencies between
+ transactions based on the rows they update.
+ It is used when applying row updates as part of the MySQLD slave.
+*/
+
+/**
+ * st_row_event_key_info
+ *
+ * This struct describes a row event applied by the Slave, based
+ * on its table, key and transaction id.
+ * Instances of this struct are placed in a hash structure where
+ * the {table, key} are the key, and the transaction id is the
+ * 'data'.
+ * This hash is used to detect when different transactions in an
+ * epoch affect the same row, which implies a dependency between
+ * the transactions.
+ */
+class st_row_event_key_info
+{
+public:
+ /**
+ * User api
+ */
+ st_row_event_key_info(const NdbDictionary::Table* _table,
+ const uchar* _key_buff,
+ Uint32 _key_buff_len,
+ Uint64 _transaction_id);
+ Uint64 getTransactionId() const;
+ void updateRowTransactionId(Uint64 mostRecentTransId);
+
+ /**
+ * Hash Api
+ */
+ Uint32 hashValue() const;
+ bool equal(const st_row_event_key_info* other) const;
+ st_row_event_key_info* getNext() const;
+ void setNext(st_row_event_key_info* _next);
+
+private:
+ /* Key : Table and Primary Key */
+ const NdbDictionary::Table* tableObj;
+ const uchar* packed_key;
+ Uint32 packed_key_len;
+
+ /* Data : Transaction id */
+ Uint64 transaction_id;
+
+ /* Next ptr for hash */
+ st_row_event_key_info* hash_next;
+};
+
+
+class st_transaction;
+
+/**
+ st_trans_dependency
+ Entry in dependency hash.
+ Describes inter-transaction dependency, and comprises part
+ of list of other dependents of target_transaction
+*/
+class st_trans_dependency
+{
+public:
+ /* User Api */
+ st_trans_dependency(st_transaction* _target_transaction,
+ st_transaction* _dependent_transaction,
+ const st_trans_dependency* _next);
+
+ st_transaction* getTargetTransaction() const;
+ st_transaction* getDependentTransaction() const;
+ const st_trans_dependency* getNextDependency() const;
+
+
+ /* Hash Api */
+ Uint32 hashValue() const;
+ bool equal(const st_trans_dependency* other) const;
+ st_trans_dependency* getNext() const;
+ void setNext(st_trans_dependency* _next);
+
+private:
+ /* Key */
+ st_transaction* target_transaction;
+ st_transaction* dependent_transaction;
+
+ /* Rest of co-dependents of target_transaction */
+ const st_trans_dependency* next_entry;
+
+ st_trans_dependency* hash_next;
+};
+
+
+
+/**
+ st_transaction
+ Entry in transaction hash, indicates whether transaction
+ is in conflict, and has list of dependents
+*/
+class st_transaction
+{
+public:
+ /* User Api */
+ st_transaction(Uint64 _transaction_id);
+
+ Uint64 getTransactionId() const;
+ bool getInConflict() const;
+ void setInConflict();
+ const st_trans_dependency* getDependencyListHead() const;
+ void setDependencyListHead(st_trans_dependency* head);
+
+ /* Hash Api */
+ Uint32 hashValue() const;
+ bool equal(const st_transaction* other) const;
+ st_transaction* getNext() const;
+ void setNext(st_transaction* _next);
+
+private:
+ /* Key */
+ Uint64 transaction_id;
+
+ /* Data */
+ /* Is this transaction (and therefore its dependents) in conflict? */
+ bool in_conflict;
+ /* Head of list of dependencies */
+ st_trans_dependency* dependency_list_head;
+
+ /* Hash ptr */
+ st_transaction* hash_next;
+};
+
+typedef struct st_mem_root MEM_ROOT;
+
+/**
+ * Allocator type which internally uses a MySQLD mem_root
+ * Used as a template parameter for Ndb ADTs
+ */
+struct st_mem_root_allocator
+{
+ MEM_ROOT* mem_root;
+
+ static void* alloc(void* ctx, size_t bytes);
+ static void* calloc(void* ctx, size_t nelem, size_t bytes);
+ static void free(void* ctx, void* mem);
+ st_mem_root_allocator(MEM_ROOT* _mem_root);
+};
+
+
+class DependencyTracker
+{
+public:
+ static const Uint64 InvalidTransactionId = ~Uint64(0);
+
+ /**
+ newDependencyTracker
+
+ Factory method to get a DependencyTracker object, using
+ memory from the passed mem_root.
+ To discard dependency tracker, just free the passed mem_root.
+ */
+ static DependencyTracker* newDependencyTracker(MEM_ROOT* mem_root);
+
+ /**
+ track_operation
+
+ This method records the operation on the passed
+ table + primary key as belonging to the passed
+ transaction.
+
+ If there is already a recorded operation on the
+ passed table + primary key from a different transaction
+ then a transaction dependency is recorded.
+ */
+ int track_operation(const NdbDictionary::Table* table,
+ const NdbRecord* key_rec,
+ const uchar* row,
+ Uint64 transaction_id);
+
+ /**
+ mark_conflict
+
+ Record that a particular transaction is in conflict.
+ This will also mark any dependent transactions as in
+ conflict.
+ */
+ int mark_conflict(Uint64 trans_id);
+
+ /**
+ in_conflict
+
+ Returns true if the supplied transaction_id is marked as
+ in conflict
+ */
+ bool in_conflict(Uint64 trans_id);
+
+ /**
+ get_error_text
+
+ Returns string containing error description.
+ NULL if no error.
+ */
+ const char* get_error_text() const;
+
+ /**
+ get_conflict_count
+
+ Returns number of transactions marked as in-conflict
+ */
+ Uint32 get_conflict_count() const;
+
+private:
+ DependencyTracker(MEM_ROOT* mem_root);
+
+ /**
+ get_or_create_transaction
+
+ Get or create the transaction object for the
+ given transaction id.
+ Returns Null on allocation failure.
+ */
+ st_transaction* get_or_create_transaction(Uint64 trans_id);
+
+ /**
+ add_dependency
+
+ This method records a dependency between the two
+ passed transaction ids
+ */
+ int add_dependency(Uint64 trans_id, Uint64 dependent_trans_id);
+
+ /**
+ reset_dependency_iterator
+
+ Reset dependency iterator.
+ Required before using get_next_dependency()
+ */
+ void reset_dependency_iterator();
+
+ /**
+ get_next_dependency
+ Gets next dependency in dependency graph.
+ Performs breadth first search from start node.
+
+ include_dependents_of_current = false causes the traversal to skip
+ dependents of the current node.
+ */
+ st_transaction* get_next_dependency(const st_transaction* current,
+ bool include_dependents_of_current = true);
+
+ /**
+ dump_dependents
+
+ Debugging function
+ */
+ void dump_dependents(Uint64 trans_id);
+
+ /**
+ verify_graph
+
+ Internal invariant checking function.
+ */
+ bool verify_graph();
+
+ /* MemRoot allocator class instance */
+ st_mem_root_allocator mra;
+
+ /*
+ key_hash
+ Map of {Table, PK} -> TransID
+ Used to find inter-transaction dependencies
+ Attempt to add duplicate entry to the key_hash indicates
+ transaction dependency from existing entry to duplicate.
+ */
+ HashMap2<st_row_event_key_info, true, st_mem_root_allocator> key_hash;
+
+ /*
+ trans_hash
+ Map of {TransId} -> {in_conflict, List of dependents}
+ Used to record which transactions are in-conflict, and what
+ their dependencies are.
+ Transactions not marked in-conflict, and with no dependencies or
+ dependents, are not placed in this hash.
+ */
+ HashMap2<st_transaction, true, st_mem_root_allocator> trans_hash;
+
+ /*
+ dependency_hash
+ Map of {TransIdFrom, TransIdTo}
+ Used to ensure dependencies are added only once, for efficiency.
+ Elements are linked from the trans_hash entry for TransIdFrom.
+ */
+ HashMap2<st_trans_dependency, true, st_mem_root_allocator> dependency_hash;
+
+ /*
+ iteratorTodo
+ Stack of transaction Ids to be visited during breadth first search
+ when marking dependents as in conflict.
+ */
+ static const Uint32 ITERATOR_STACK_BLOCKSIZE = 10;
+ LinkedStack<Uint64, st_mem_root_allocator> iteratorTodo;
+
+ Uint32 conflicting_trans_count;
+
+ const char* error_text;
+};
+
+/* ifdef HAVE_NDB_BINLOG */
+#endif
+
+// #ifndef NDB_CONFLICT_TRANS_H
+#endif
=== modified file 'sql/ndb_mi.cc'
--- a/sql/ndb_mi.cc 2011-06-30 11:01:43 +0000
+++ b/sql/ndb_mi.cc 2011-09-09 09:30:43 +0000
@@ -82,6 +82,16 @@ bool ndb_mi_get_in_relay_log_statement(R
return (rli->get_flag(Relay_log_info::IN_STMT) != 0);
}
+ulong ndb_mi_get_relay_log_trans_retries()
+{
+ return active_mi->rli.trans_retries;
+}
+
+void ndb_mi_set_relay_log_trans_retries(ulong number)
+{
+ active_mi->rli.trans_retries = number;
+}
+
/* #ifdef HAVE_NDB_BINLOG */
#endif
=== modified file 'sql/ndb_mi.h'
--- a/sql/ndb_mi.h 2011-06-29 23:28:01 +0000
+++ b/sql/ndb_mi.h 2011-09-07 22:50:01 +0000
@@ -42,6 +42,8 @@ uint32 ndb_mi_get_slave_run_id();
Relay log info related functions
*/
bool ndb_mi_get_in_relay_log_statement(class Relay_log_info* rli);
+ulong ndb_mi_get_relay_log_trans_retries();
+void ndb_mi_set_relay_log_trans_retries(ulong number);
// #ifndef NDB_MI_H
#endif
=== modified file 'sql/ndb_share.h'
--- a/sql/ndb_share.h 2011-07-08 15:05:28 +0000
+++ b/sql/ndb_share.h 2011-09-09 09:30:43 +0000
@@ -39,6 +39,7 @@ enum enum_conflict_fn_type
,CFT_NDB_OLD
,CFT_NDB_MAX_DEL_WIN
,CFT_NDB_EPOCH
+ ,CFT_NDB_EPOCH_TRANS
,CFT_NUMBER_OF_CFTS /* End marker */
};
@@ -75,7 +76,8 @@ enum enum_conflicting_op_type
{ /* NdbApi */
WRITE_ROW, /* insert (!write) */
UPDATE_ROW, /* update */
- DELETE_ROW /* delete */
+ DELETE_ROW, /* delete */
+ REFRESH_ROW /* refresh */
};
/*
@@ -84,27 +86,34 @@ enum enum_conflicting_op_type
Type of function used to prepare for conflict detection on
an NdbApi operation
*/
-typedef int (* prepare_detect_func) (struct NDB_CONFLICT_FN_SHARE* cfn_share,
+typedef int (* prepare_detect_func) (struct st_ndbcluster_conflict_fn_share* cfn_share,
enum_conflicting_op_type op_type,
const uchar* old_data,
const uchar* new_data,
const MY_BITMAP* write_set,
class NdbInterpretedCode* code);
+enum enum_conflict_fn_flags
+{
+ CF_TRANSACTIONAL = 1
+};
+
struct st_conflict_fn_def
{
const char *name;
enum_conflict_fn_type type;
const st_conflict_fn_arg_def* arg_defs;
prepare_detect_func prep_func;
+ uint8 flags; /* enum_conflict_fn_flags */
};
/* What sort of conflict was found */
enum enum_conflict_cause
{
- ROW_ALREADY_EXISTS,
- ROW_DOES_NOT_EXIST,
- ROW_IN_CONFLICT
+ ROW_ALREADY_EXISTS, /* On insert */
+ ROW_DOES_NOT_EXIST, /* On Update, Delete */
+ ROW_IN_CONFLICT, /* On Update, Delete */
+ TRANS_IN_CONFLICT /* Any of above, or implied by transaction */
};
/* NdbOperation custom data which points out handler and record. */
@@ -113,15 +122,16 @@ struct Ndb_exceptions_data {
const NdbRecord* key_rec;
const uchar* row;
enum_conflicting_op_type op_type;
+ Uint64 trans_id;
};
-enum enum_conflict_fn_flags
+enum enum_conflict_fn_table_flags
{
- CFF_NONE = 0,
+ CFF_NONE = 0,
CFF_REFRESH_ROWS = 1
};
-struct NDB_CONFLICT_FN_SHARE {
+typedef struct st_ndbcluster_conflict_fn_share {
const st_conflict_fn_def* m_conflict_fn;
/* info about original table */
@@ -134,7 +144,7 @@ struct NDB_CONFLICT_FN_SHARE {
const NdbDictionary::Table *m_ex_tab;
uint32 m_count;
-};
+} NDB_CONFLICT_FN_SHARE;
#endif
=== modified file 'sql/rpl_constants.h'
--- a/sql/rpl_constants.h 2011-07-05 12:46:07 +0000
+++ b/sql/rpl_constants.h 2011-09-09 09:30:43 +0000
@@ -15,4 +15,46 @@ enum Incident {
INCIDENT_COUNT
};
+#ifndef MCP_WL5353
+/**
+ Enumeration of the reserved formats of Binlog extra row information
+*/
+enum ExtraRowInfoFormat {
+ /** Ndb format */
+ ERIF_NDB = 0,
+
+ /** Reserved formats 0 -> 63 inclusive */
+ ERIF_LASTRESERVED = 63,
+
+ /**
+ Available / uncontrolled formats
+ 64 -> 254 inclusive
+ */
+ ERIF_OPEN1 = 64,
+ ERIF_OPEN2 = 65,
+
+ ERIF_LASTOPEN = 254,
+
+ /**
+ Multi-payload format 255
+
+ Length is total length, payload is sequence of
+ sub-payloads with their own headers containing
+ length + format.
+ */
+ ERIF_MULTI = 255
+};
+
+/*
+ 1 byte length, 1 byte format
+ Length is total length in bytes, including 2 byte header
+ Length values 0 and 1 are currently invalid and reserved.
+*/
+#define EXTRA_ROW_INFO_LEN_OFFSET 0
+#define EXTRA_ROW_INFO_FORMAT_OFFSET 1
+#define EXTRA_ROW_INFO_HDR_BYTES 2
+#define EXTRA_ROW_INFO_MAX_PAYLOAD (255 - EXTRA_ROW_INFO_HDR_BYTES)
+
+#endif // #ifndef MCP_WL5353
+
#endif /* RPL_CONSTANTS_H */
=== modified file 'sql/slave.h'
--- a/sql/slave.h 2011-08-31 10:39:08 +0000
+++ b/sql/slave.h 2011-09-09 09:30:43 +0000
@@ -234,6 +234,12 @@ extern ulong opt_server_id_mask;
extern I_List<THD> threads;
+#ifndef MCP_WL5353
+#ifdef HAVE_NDB_BINLOG
+#define SLAVE_SILENT_RETRY_MSG "Slave transaction rollback requested"
+#endif
+#endif
+
#endif /* HAVE_REPLICATION */
/* masks for start/stop operations on io and sql slave threads */
=== modified file 'sql/sql_class.cc'
--- a/sql/sql_class.cc 2011-08-31 10:39:08 +0000
+++ b/sql/sql_class.cc 2011-09-09 09:30:43 +0000
@@ -746,6 +746,9 @@ THD::THD()
/* statement id */ 0),
rli_fake(0),
user_time(0), in_sub_stmt(0),
+#ifndef MCP_WL5353
+ binlog_row_event_extra_data(NULL),
+#endif
binlog_unsafe_warning_flags(0),
binlog_table_maps(0),
table_map_for_update(0),
@@ -1193,6 +1196,9 @@ void THD::init(void)
reset_current_stmt_binlog_format_row();
bzero((char *) &status_var, sizeof(status_var));
+#ifndef MCP_WL5353
+ binlog_row_event_extra_data = 0;
+#endif
if (variables.sql_log_bin)
variables.option_bits|= OPTION_BIN_LOG;
else
@@ -4467,7 +4473,13 @@ THD::binlog_prepare_pending_rows_event(T
pending->get_type_code() != type_code ||
pending->get_data_size() + needed > opt_binlog_rows_event_max_size ||
pending->get_width() != colcnt ||
- !bitmap_cmp(pending->get_cols(), cols))
+ !bitmap_cmp(pending->get_cols(), cols)
+#ifndef MCP_WL5353
+ ||
+ !binlog_row_event_extra_data_eq(pending->get_extra_row_data(),
+ binlog_row_event_extra_data)
+#endif
+ )
{
/* Create a new RowsEventT... */
Rows_log_event* const
@@ -4983,6 +4995,64 @@ int THD::binlog_query(THD::enum_binlog_q
DBUG_RETURN(0);
}
+#ifndef MCP_WL5353
+/**
+ get_binlog_row_event_extra_data_len
+
+ Returns the length in bytes of the current thread's
+ binlog row event extra data, if present.
+ The length is stored at some offset from the extra
+ data ptr.
+ Note that this length is the length of the whole extra
+ data structure, including the fixed length header
+ of size EXTRA_ROW_INFO_HDR_BYTES
+
+ @return
+ Length in bytes of the extra data.
+ Zero is valid. Maximum is 255
+*/
+uint8
+THD::get_binlog_row_event_extra_data_len() const
+{
+ return (binlog_row_event_extra_data?
+ binlog_row_event_extra_data[EXTRA_ROW_INFO_LEN_OFFSET]:
+ 0);
+};
+
+/**
+ binlog_row_event_extra_data_eq
+
+ Comparator for two binlog row event extra data
+ pointers.
+
+ It compares their significant bytes.
+
+ Null pointers are acceptable
+
+ @param a
+ first pointer
+
+ @param b
+ first pointer
+
+ @return
+ true if the referenced structures are equal
+*/
+bool
+THD::binlog_row_event_extra_data_eq(const uchar* a,
+ const uchar* b)
+{
+ return ((a == b) ||
+ ((a != NULL) &&
+ (b != NULL) &&
+ (a[EXTRA_ROW_INFO_LEN_OFFSET] ==
+ b[EXTRA_ROW_INFO_LEN_OFFSET]) &&
+ (memcmp(a, b,
+ a[EXTRA_ROW_INFO_LEN_OFFSET]) == 0)));
+}
+
+#endif // #ifndef MCP_WL5353
+
bool Discrete_intervals_list::append(ulonglong start, ulonglong val,
ulonglong incr)
{
=== modified file 'sql/sql_class.h'
--- a/sql/sql_class.h 2011-09-02 09:16:56 +0000
+++ b/sql/sql_class.h 2011-09-09 09:30:43 +0000
@@ -1590,6 +1590,18 @@ public:
/* container for handler's private per-connection data */
Ha_data ha_data[MAX_HA];
+#ifndef MCP_WL5353
+ /*
+ Ptr to row event extra data to be written to Binlog /
+ received from Binlog.
+
+ */
+ uchar* binlog_row_event_extra_data;
+ uint8 get_binlog_row_event_extra_data_len() const;
+ static bool binlog_row_event_extra_data_eq(const uchar* a,
+ const uchar* b);
+#endif
+
#ifndef MYSQL_CLIENT
int binlog_setup_trx_data();
=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt 2011-09-02 09:16:56 +0000
+++ b/storage/ndb/CMakeLists.txt 2011-09-09 09:30:43 +0000
@@ -199,6 +199,7 @@ SET(NDBCLUSTER_SOURCES
../../sql/ndb_thd_ndb.cc
../../sql/ndb_global_schema_lock.cc
../../sql/ndb_mi.cc
+ ../../sql/ndb_conflict_trans.cc
)
INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/storage/ndb/include)
=== modified file 'storage/ndb/include/ndbapi/Ndb.hpp'
--- a/storage/ndb/include/ndbapi/Ndb.hpp 2011-09-02 09:16:56 +0000
+++ b/storage/ndb/include/ndbapi/Ndb.hpp 2011-09-09 09:30:43 +0000
@@ -1801,7 +1801,12 @@ public:
NonDataEventsRecvdCount = 19, /* Number of non-data events received */
EventBytesRecvdCount = 20, /* Number of bytes of event data received */
- NumClientStatistics = 21 /* End marker */
+ /* Adaptive Send */
+ ForcedSendsCount = 21, /* Number of sends with force-send set */
+ UnforcedSendsCount = 22, /* Number of sends without force-send */
+ DeferredSendsCount = 23, /* Number of adaptive send calls not actually sent */
+
+ NumClientStatistics = 24 /* End marker */
};
Uint64 getClientStat(Uint32 id) const;
=== modified file 'storage/ndb/include/ndbapi/ndb_cluster_connection.hpp'
--- a/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp 2011-07-05 12:46:07 +0000
+++ b/storage/ndb/include/ndbapi/ndb_cluster_connection.hpp 2011-09-09 09:30:43 +0000
@@ -180,6 +180,15 @@ public:
*/
Uint32 collect_client_stats(Uint64* statsArr, Uint32 sz);
+ /**
+ * Get/set the minimum time in milliseconds that can lapse until the adaptive
+ * send mechanism forces all pending signals to be sent.
+ * The default value is 10, and the allowed range is from 1 to 10.
+ */
+ void set_max_adaptive_send_time(Uint32 milliseconds);
+ Uint32 get_max_adaptive_send_time();
+
+
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
int get_no_ready();
const char *get_connectstring(char *buf, int buf_sz) const;
=== added file 'storage/ndb/include/util/HashMap2.hpp'
--- a/storage/ndb/include/util/HashMap2.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/util/HashMap2.hpp 2011-09-07 22:50:01 +0000
@@ -0,0 +1,488 @@
+/* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+#ifndef NDB_HASHMAP2_HPP
+#define NDB_HASHMAP2_HPP
+
+#include <ndb_global.h>
+
+
+/* Basic HashTable implementation
+ * The HashTable stores elements of type KV.
+ * The storage for the elements is managed outside the
+ * HashTable implementation.
+ * The HashTable uses element chaining in each bucket to
+ * deal with collisions.
+ * The HashTable can optionally enforce uniqueness
+ * The HashTable can be resized when it is empty.
+ */
+
+/**
+ * KVOPStaticAdapter template
+ *
+ * Used with HashMap2
+ * Creates a class with static methods calling members of
+ * an object passed to them, for the case where the HashMap
+ * contains types ptrs with the relevant methods defined
+ * (objects).
+ *
+ * Implements the KVOP Api, and requires the following API
+ * from the KV type :
+ *
+ * Useful for supplying OP type.
+ * Required KV Api:
+ * Uint32 hashValue() const;
+ * bool equal(const KV* other) const;
+ * void setNext(KV* next);
+ * KV* getNext() const;
+ */
+template<typename KV>
+class KVOPStaticAdapter
+{
+public:
+ static Uint32 hashValue(const KV* obj)
+ {
+ return obj->hashValue();
+ };
+
+ static bool equal(const KV* objA, const KV* objB)
+ {
+ return objA->equal(objB);
+ };
+
+ static void setNext(KV* from, KV* to)
+ {
+ return from->setNext(to);
+ };
+
+ static KV* getNext(const KV* from)
+ {
+ return from->getNext();
+ }
+};
+
+// TODO :
+// Pass allocator context rather than allocator
+// Support re-alloc?
+// Use calloc?
+
+/**
+ * StandardAllocator - used in HashMap2 when no allocator supplied
+ * Uses standard malloc/free.
+ */
+struct StandardAllocator
+{
+ static void* alloc(void* ignore, size_t bytes)
+ {
+ return ::malloc(bytes);
+ };
+
+ static void* calloc(void* ignore, size_t nelem, size_t bytes)
+ {
+ return ::calloc(nelem, bytes);
+ }
+
+ static void free(void* ignore, void* mem)
+ {
+ ::free(mem);
+ };
+};
+
+/**
+ * Template parameters
+ *
+ * Classes with static methods are used to avoid the necessity
+ * of using OO wrapper objects for C data.
+ * Objects can be used by defining static methods which call
+ * normal methods.
+ * A default StaticWrapper class exists to 'automate' this if
+ * necessary.
+ *
+ * class KV - Key Value pair.
+ * The HashTable stores pointers to these. No interface is
+ * assumed - they are manipulated via KVOP below, so can be
+ * chunks of memory or C structs etc.
+ *
+ * bool unique
+ * True if all keys in a hash table instance must be
+ * unique.
+ * False otherwise.
+ *
+ * class A - Allocator
+ * Used for hash bucket allocation on setSize() call.
+ * NOT used for element allocation, which is the responsibility
+ * of the user.
+ *
+ * Must support static methods :
+ * - static void* calloc(void* context, size_t nelem, size_t bytes)
+ * - static void free(void* context, void*)
+ *
+ * class KVOP - Operations on Key Value pair
+ * KV instances are stored based on the hash returned
+ * by the KVOP::hashValue() method, with identity based on the
+ * KVOP::equal() method.
+ * KV instances must be linkable using KVOP::getNext() and
+ * KVOP::setNext() methods.
+ *
+ * KVOP allows the static methods on the KV pair to be separate
+ * from the data itself. If they are in the same class, use
+ * KVOP=KV. If the methods are not static, and are on the KV class,
+ * use KVOP=KVOPStaticAdapter<KV>, or equivalent.
+ *
+ * KVOP must support the following static methods :
+ * - static bool equal(const class KV* a, const class KV* b);
+ * Return true if two elements are equal.
+ *
+ * - static Uint32 hashValue(const class KV*) const;
+ * Return a 32-bit stable hashvalue for the KV.
+ * equal(a,b) implies hashValue(a) == hashValue(b)
+ *
+ * - static void setNext(KV* from, KV* to)
+ *
+ * - static KV* getNext(const KV* from) const
+ *
+ *
+ * TODO :
+ * - collision count?
+ * - release option?
+ */
+template<typename KV,
+ bool unique = true,
+ typename A = StandardAllocator,
+ typename KVOP = KVOPStaticAdapter<KV> >
+class HashMap2
+{
+public:
+ /**
+ * HashMap2 constructor
+ * Pass an Allocator pointer if the templated allocator
+ * requires some context info.
+ * setSize() must be called before the HashMap is used.
+ */
+ HashMap2(void* _allocatorContext = NULL)
+ : tableSize(0),
+ elementCount(0),
+ allocatorContext(_allocatorContext),
+ table(NULL)
+ {
+ };
+
+ ~HashMap2()
+ {
+ if (table)
+ A::free(allocatorContext, table);
+ }
+
+ /**
+ * setSize
+ *
+ * Set the number of buckets.
+ * Can only be set when the hash table is empty.
+ * The Allocator is used to allocate/release bucket
+ * storage.
+ */
+ bool
+ setSize(Uint32 hashBuckets)
+ {
+ if (elementCount)
+ {
+ /* Can't set size while we have contents */
+ return false;
+ }
+
+ if (hashBuckets == 0)
+ {
+ return false;
+ }
+
+ if (table)
+ {
+ A::free(allocatorContext, table);
+ table = NULL;
+ }
+
+ /* TODO : Consider using only power-of-2 + bitmask instead of mod */
+ tableSize = hashBuckets;
+
+ table = (KV**) A::calloc(allocatorContext, hashBuckets, sizeof(KV*));
+
+ if (!table)
+ {
+ return false;
+ }
+
+ for (Uint32 i=0; i < tableSize; i++)
+ table[i] = NULL;
+
+ return true;
+ };
+
+ /**
+ * add
+ *
+ * Add a KV element to the hash table
+ * The next value must be null
+ * If the hash table requires uniqueness, and the
+ * element is not unique, false will be returned
+ */
+ bool
+ add(KV* keyVal)
+ {
+ assert(table);
+
+ Uint32 hashVal = rehash(KVOP::hashValue(keyVal));
+ Uint32 bucketIdx = hashVal % tableSize;
+
+ KV* bucket = table[bucketIdx];
+
+ if (bucket != NULL)
+ {
+ if (unique)
+ {
+ /* Need to check element is not already there, in this
+ * chain
+ */
+ const KV* chainElement = bucket;
+ while (chainElement)
+ {
+ if (KVOP::equal(keyVal, chainElement))
+ {
+ /* Found duplicate */
+ return false;
+ }
+ chainElement= KVOP::getNext(chainElement);
+ }
+ }
+
+ /* Can insert at head of list, as either no uniqueness
+ * guarantee, or uniqueness checked.
+ */
+ assert(KVOP::getNext(keyVal) == NULL);
+ KVOP::setNext(keyVal, bucket);
+ table[bucketIdx] = keyVal;
+ }
+ else
+ {
+ /* First element in bucket */
+ assert(KVOP::getNext(keyVal) == NULL);
+ KVOP::setNext(keyVal, NULL);
+ table[bucketIdx] = keyVal;
+ }
+
+ elementCount++;
+ return true;
+ }
+
+ KV*
+ remove(KV* key)
+ {
+ assert(table);
+
+ Uint32 hashVal = rehash(KVOP::hashValue(key));
+ Uint32 bucketIdx = hashVal % tableSize;
+
+ KV* bucket = table[bucketIdx];
+
+ if (bucket != NULL)
+ {
+ KV* chainElement = bucket;
+ KV* prev = NULL;
+ while (chainElement)
+ {
+ if (KVOP::equal(key, chainElement))
+ {
+ /* Found, repair bucket chain
+ * Get next, might be NULL
+ */
+ KV* n = KVOP::getNext(chainElement);
+ if (prev)
+ {
+ /* Link prev to next */
+ KVOP::setNext(prev, n);
+ }
+ else
+ {
+ /* Put next as first in bucket */
+ table[bucketIdx] = n;
+ }
+
+ KVOP::setNext(chainElement, NULL);
+ elementCount--;
+
+ return chainElement;
+ }
+ prev = chainElement;
+ chainElement = KVOP::getNext(chainElement);
+ }
+ }
+
+ return NULL;
+ }
+
+ /**
+ * get
+ *
+ * Get a ptr to a KV element in the hash table
+ * with the same key as the element passed.
+ * Returns NULL if none exists.
+ *
+ * For non unique hash tables, a ptr to the
+ * first element with the given key is returned.
+ * Further elements must be found by iteration
+ * (and key comparison), until NULL is returned.
+ */
+ KV*
+ get(const KV* key) const
+ {
+ assert(table);
+
+ Uint32 hashVal = rehash(KVOP::hashValue(key));
+ Uint32 bucketIdx = hashVal % tableSize;
+
+ KV* chainElement = table[bucketIdx];
+
+ while(chainElement)
+ {
+ if (KVOP::equal(key, chainElement))
+ {
+ break;
+ }
+ chainElement = KVOP::getNext(chainElement);
+ }
+
+ return chainElement;
+ };
+
+ /**
+ * reset
+ *
+ * Resets the hash table to have no entries.
+ * KV elements added are not released in any
+ * way. This must be handled outside the
+ * HashTable implementation, perhaps by
+ * iterating and removing the elements?
+ * Storage for the hash table itself is
+ * preserved
+ */
+ void
+ reset()
+ {
+ /* Zap the hashtable ptrs, without freeing the 'elements'
+ * Keep the storage allocated for the ptrs
+ */
+ if (elementCount)
+ {
+ assert(table);
+ for (Uint32 i=0; i < tableSize; i++)
+ {
+ table[i] = NULL;
+ }
+
+ elementCount = 0;
+ }
+ }
+
+ /**
+ * getElementCount
+ *
+ * Returns the number of elements currently
+ * stored in this hash table.
+ */
+ Uint32
+ getElementCount() const
+ {
+ return elementCount;
+ }
+
+ /**
+ * getTableSize
+ *
+ * Returns the number of hashBuckets in this hash table
+ */
+ Uint32
+ getTableSize() const
+ {
+ return tableSize;
+ }
+
+ class Iterator
+ {
+ public:
+ Iterator(HashMap2& hashMap)
+ : m_hash(&hashMap),
+ curr_element(NULL),
+ curr_bucket(0)
+ {}
+
+ /**
+ Return the current element and reposition the iterator to the next
+ element.
+ */
+ KV* next()
+ {
+ while (curr_bucket < m_hash->tableSize)
+ {
+ if (curr_element == NULL)
+ {
+ /* First this bucket */
+ curr_element = m_hash->table[ curr_bucket ];
+ }
+ else
+ {
+ /* Next this bucket */
+ curr_element = KVOP::getNext(curr_element);
+ }
+
+ if (curr_element)
+ {
+ return curr_element;
+ }
+ curr_bucket++;
+ }
+
+ return NULL;
+ }
+ void reset()
+ {
+ curr_element = NULL;
+ curr_bucket = 0;
+ }
+ private:
+ HashMap2* m_hash;
+ KV* curr_element;
+ Uint32 curr_bucket;
+ };
+
+private:
+ static Uint32 rehash(Uint32 userHash)
+ {
+ /* We rehash the supplied hash value in case
+ * it's low quality, mixing some higher order
+ * bits in with the lower bits
+ */
+ userHash ^= ((userHash >> 20) ^ (userHash >> 12));
+ return userHash ^ (userHash >> 7) ^ (userHash >> 4);
+ }
+
+ Uint32 tableSize;
+ Uint32 elementCount;
+ void* allocatorContext;
+
+ KV** table;
+
+}; // class HashMap2()
+
+#endif
=== added file 'storage/ndb/include/util/LinkedStack.hpp'
--- a/storage/ndb/include/util/LinkedStack.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/util/LinkedStack.hpp 2011-09-07 22:50:01 +0000
@@ -0,0 +1,216 @@
+/* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+#ifndef NDB_LINKEDSTACK_HPP
+#define NDB_LINKEDSTACK_HPP
+
+#include <ndb_global.h>
+
+/**
+ * LinkedStack
+ *
+ * A templated class for a stack of elements E.
+ * Storage for the elements is allocated using the passed
+ * allocator.
+ * Push copies the supplied element into the stack
+ * Pop overwrites the supplied element with the contents
+ * of the top of the stack
+ * Internally, the class allocates 'blocks' of elements of
+ * the size passed, linking them together as necessary.
+ * As the stack shrinks, the blocks are not released.
+ * Blocks are returned to the allocator when release() is
+ * called.
+ * reset() empties the stack without releasing the allocated
+ * storage.
+ */
+template <typename E, typename A>
+class LinkedStack
+{
+private:
+ struct BlockHeader
+ {
+ BlockHeader* next;
+ BlockHeader* prev;
+ E* elements;
+ };
+
+ BlockHeader* allocBlock()
+ {
+ /* Alloc blockheader and element array */
+ BlockHeader* h = (BlockHeader*) A::alloc(allocatorContext,
+ sizeof(BlockHeader));
+ E* e = (E*) A::calloc(allocatorContext, blockElements, sizeof(E));
+
+ h->next = NULL;
+ h->prev = NULL;
+ h->elements = e;
+
+ return h;
+ }
+
+ bool valid()
+ {
+ if (stackTop)
+ {
+ assert(firstBlock != NULL);
+ assert(currBlock != NULL);
+ /* Check that currBlock is positioned on correct
+ * block, except for block boundary case
+ */
+ Uint32 blockNum = (stackTop - 1) / blockElements;
+ BlockHeader* bh = firstBlock;
+ while(blockNum--)
+ {
+ bh = bh->next;
+ }
+ assert(bh == currBlock);
+ }
+ else
+ {
+ assert(currBlock == NULL);
+ }
+ return true;
+ }
+
+ /* Note that stackTop is 'next insertion point' whereas
+ * currBlock points to block last inserted to.
+ * On block boundaries, they refer to different blocks
+ */
+ void* allocatorContext;
+ BlockHeader* firstBlock;
+ BlockHeader* currBlock;
+ Uint32 stackTop;
+ Uint32 blockElements;
+
+public:
+ LinkedStack(Uint32 _blockElements, void* _allocatorContext=NULL)
+ : allocatorContext(_allocatorContext),
+ firstBlock(NULL),
+ currBlock(NULL),
+ stackTop(0),
+ blockElements(_blockElements)
+ {
+ assert(blockElements > 0);
+ assert(valid());
+ }
+
+ ~LinkedStack()
+ {
+ assert(valid());
+ /* Release block storage if present */
+ release();
+ }
+
+ bool push(E& elem)
+ {
+ assert(valid());
+ Uint32 blockOffset = stackTop % blockElements;
+
+ if (blockOffset == 0)
+ {
+ /* On block boundary */
+ if (stackTop)
+ {
+ /* Some elements exist already */
+ if (!currBlock->next)
+ {
+ /* End of block list, alloc another */
+ BlockHeader* newBlock = allocBlock();
+ if (!newBlock)
+ return false;
+
+ currBlock->next = newBlock;
+ currBlock->next->prev = currBlock;
+ }
+ currBlock = currBlock->next;
+ }
+ else
+ {
+ /* First element */
+ if (!firstBlock)
+ {
+ BlockHeader* newBlock = allocBlock();
+ if (!newBlock)
+ return false;
+
+ firstBlock = currBlock = newBlock;
+ }
+ currBlock = firstBlock;
+ }
+ }
+
+ currBlock->elements[ blockOffset ] = elem;
+ stackTop++;
+
+ assert(valid());
+ return true;
+ }
+
+ bool pop(E& elem)
+ {
+ assert(valid());
+ if (stackTop)
+ {
+ stackTop--;
+ Uint32 blockOffset = stackTop % blockElements;
+ elem = currBlock->elements[ blockOffset ];
+
+ if (blockOffset == 0)
+ {
+ /* Block boundary, shift back to prev block. */
+ if (stackTop)
+ assert(currBlock->prev);
+
+ currBlock = currBlock->prev;
+ }
+
+ assert(valid());
+ return true;
+ }
+ return false;
+ }
+
+ Uint32 size() const
+ {
+ return stackTop;
+ }
+
+ void reset()
+ {
+ assert(valid());
+ stackTop = 0;
+ currBlock = NULL;
+ assert(valid());
+ };
+
+ void release()
+ {
+ assert(valid());
+ BlockHeader* h = firstBlock;
+ while (h)
+ {
+ BlockHeader* n = h->next;
+ A::free(allocatorContext, h->elements);
+ A::free(allocatorContext, h);
+ h = n;
+ };
+ stackTop = 0;
+ firstBlock = currBlock = NULL;
+ assert(valid());
+ }
+};
+
+#endif
=== modified file 'storage/ndb/src/common/portlib/NdbTCP.cpp'
--- a/storage/ndb/src/common/portlib/NdbTCP.cpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/common/portlib/NdbTCP.cpp 2011-09-07 16:53:24 +0000
@@ -25,6 +25,11 @@
#define INADDR_NONE -1 /* Error value from inet_addr */
#endif
+/* Return codes from getaddrinfo() */
+/* EAI_NODATA is obsolete and has been removed from some platforms */
+#ifndef EAI_NODATA
+#define EAI_NODATA EAI_NONAME
+#endif
extern "C"
int
=== modified file 'storage/ndb/src/common/util/CMakeLists.txt'
--- a/storage/ndb/src/common/util/CMakeLists.txt 2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/common/util/CMakeLists.txt 2011-09-09 09:30:43 +0000
@@ -55,6 +55,8 @@ ADD_CONVENIENCE_LIBRARY(ndbgeneral
require.c
Vector.cpp
NdbPack.cpp
+ HashMap2.cpp
+ LinkedStack.cpp
)
TARGET_LINK_LIBRARIES(ndbgeneral ndbtrace ${ZLIB_LIBRARY} mysys)
@@ -88,3 +90,12 @@ SET_TARGET_PROPERTIES(NdbPack-t
PROPERTIES COMPILE_FLAGS "-DTEST_NDB_PACK")
TARGET_LINK_LIBRARIES(NdbPack-t ndbgeneral ndbportlib)
+ADD_EXECUTABLE(HashMap2-t HashMap2.cpp)
+SET_TARGET_PROPERTIES(HashMap2-t
+ PROPERTIES COMPILE_FLAGS "-DTEST_HASHMAP2")
+TARGET_LINK_LIBRARIES(HashMap2-t ndbgeneral)
+
+ADD_EXECUTABLE(LinkedStack-t LinkedStack.cpp)
+SET_TARGET_PROPERTIES(LinkedStack-t
+ PROPERTIES COMPILE_FLAGS "-DTEST_LINKEDSTACK")
+TARGET_LINK_LIBRARIES(LinkedStack-t ndbgeneral)
\ No newline at end of file
=== added file 'storage/ndb/src/common/util/HashMap2.cpp'
--- a/storage/ndb/src/common/util/HashMap2.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/common/util/HashMap2.cpp 2011-09-07 22:50:01 +0000
@@ -0,0 +1,407 @@
+/* Copyright (C) 2009 Sun Microsystems Inc.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+#include <HashMap2.hpp>
+
+#ifdef TEST_HASHMAP2
+#include <NdbTap.hpp>
+
+struct TestHeapAllocator
+{
+ static const int DEBUG_ALLOC=0;
+ static void* alloc(void* ignore, size_t bytes)
+ {
+ void* p = ::malloc(bytes);
+ if (DEBUG_ALLOC)
+ {
+ printf("--Allocating %u bytes at %p\n",
+ (Uint32) bytes, p);
+ }
+ return p;
+ }
+
+ static void* calloc(void* ignore, size_t nelem, size_t bytes)
+ {
+ void* p = ::calloc(nelem, bytes);
+ if (DEBUG_ALLOC)
+ {
+ printf("--Allocating %u elements of %u bytes (%u total?) at %p\n",
+ (Uint32)nelem, (Uint32) bytes, (Uint32) (nelem*bytes), p);
+ }
+ return p;
+ }
+
+ static void free(void* ignore, void* mem)
+ {
+ if (DEBUG_ALLOC)
+ {
+ printf("--Freeing bytes at %p\n",mem);
+ }
+ ::free(mem);
+ }
+};
+
+struct IntIntKVPod
+{
+ int a;
+ int b;
+ IntIntKVPod* next;
+};
+
+struct IntIntKVStaticMethods
+{
+ static Uint32 hashValue(const IntIntKVPod* obj)
+ {
+ return obj->a*31;
+ }
+
+ static bool equal(const IntIntKVPod* objA, const IntIntKVPod* objB)
+ {
+ return objA->a == objB->a;
+ }
+
+ static void setNext(IntIntKVPod* from, IntIntKVPod* to)
+ {
+ from->next = to;
+ }
+
+ static IntIntKVPod* getNext(const IntIntKVPod* from)
+ {
+ return from->next;
+ }
+};
+
+struct IntIntKVObj
+{
+ int a;
+ int b;
+ IntIntKVObj* next;
+
+ Uint32 hashValue() const
+ {
+ return a*31;
+ }
+
+ bool equal(const IntIntKVObj* other) const
+ {
+ return a == other->a;
+ }
+
+ void setNext(IntIntKVObj* _next)
+ {
+ next = _next;
+ }
+
+ IntIntKVObj* getNext() const
+ {
+ return next;
+ }
+};
+
+TAPTEST(HashMap2)
+{
+ printf("int -> int (Static, unique) \n");
+ for (int j = 1; j < 150; j ++)
+ {
+ HashMap2<IntIntKVPod, true, TestHeapAllocator, IntIntKVStaticMethods> hash1;
+
+ hash1.setSize(j);
+
+ OK(hash1.getElementCount() == 0);
+
+ /* Create some values in a pool on the stack */
+ IntIntKVPod pool[101];
+
+ for (int i=0; i < 100; i++)
+ {
+ pool[i].a = i;
+ pool[i].b = 3 * i;
+ pool[i].next = NULL;
+ }
+
+ /* Add the pool elements to the hash table */
+ for (int i=0; i < 100; i++)
+ {
+ OK(hash1.add(&pool[i]));
+ }
+
+ /* Now attempt to add a duplicate */
+ pool[100].a = 0;
+ pool[100].b = 999;
+ pool[100].next = NULL;
+
+ OK(hash1.getElementCount() == 100);
+
+ OK(! hash1.add(&pool[100]));
+
+ for (int i=1; i < 100; i++)
+ {
+ OK(hash1.get(&pool[i]) == &pool[i]);
+ }
+
+ OK(hash1.get(&pool[0]) == &pool[0]);
+
+ /* Test iterator Api */
+ HashMap2<IntIntKVPod, true, TestHeapAllocator, IntIntKVStaticMethods>::Iterator it(hash1);
+
+ IntIntKVPod* j;
+ for (int i=0; i < 2; i++)
+ {
+ int count = 0;
+ while((j = it.next()))
+ {
+ OK( j->b == ((j->a * 3) - i) );
+ j->b--;
+ count++;
+ }
+ OK( count == 100 );
+ it.reset();
+ }
+
+ hash1.reset();
+ it.reset();
+ OK( it.next() == NULL );
+ }
+
+ printf("int -> int (Static, !unique) \n");
+ for (int j = 1; j < 150; j ++)
+ {
+ HashMap2<IntIntKVPod, false, TestHeapAllocator, IntIntKVStaticMethods> hash1;
+
+ hash1.setSize(j);
+
+ OK(hash1.getElementCount() == 0);
+
+ /* Create some values in a pool on the stack */
+ IntIntKVPod pool[101];
+
+ for (int i=0; i < 100; i++)
+ {
+ pool[i].a = i;
+ pool[i].b = 3 * i;
+ pool[i].next = NULL;
+ }
+
+ /* Add the pool elements to the hash table */
+ for (int i=0; i < 100; i++)
+ {
+ OK(hash1.add(&pool[i]));
+ }
+
+ /* Now attempt to add a duplicate */
+ pool[100].a = 0;
+ pool[100].b = 999;
+ pool[100].next = NULL;
+
+ OK(hash1.getElementCount() == 100);
+
+ OK( hash1.add(&pool[100]));
+
+ for (int i=1; i < 100; i++)
+ {
+ OK(hash1.get(&pool[i]) == &pool[i]);
+ }
+
+ OK((hash1.get(&pool[0]) == &pool[0]) ||
+ (hash1.get(&pool[0]) == &pool[100]));
+ }
+
+ printf("int -> int (!Static, defaults, (std alloc, unique)) \n");
+ for (int j = 1; j < 150; j ++)
+ {
+ HashMap2<IntIntKVObj> hash1;
+
+ hash1.setSize(j);
+
+ OK(hash1.getElementCount() == 0);
+
+ /* Create some values in a pool on the stack */
+ IntIntKVObj pool[101];
+
+ for (int i=0; i < 100; i++)
+ {
+ pool[i].a = i;
+ pool[i].b = 3 * i;
+ pool[i].next = NULL;
+ }
+
+ /* Add the pool elements to the hash table */
+ for (int i=0; i < 100; i++)
+ {
+ OK(hash1.add(&pool[i]));
+ }
+
+ /* Now attempt to add a duplicate */
+ pool[100].a = 0;
+ pool[100].b = 999;
+ pool[100].next = NULL;
+
+ OK(hash1.getElementCount() == 100);
+
+ OK(! hash1.add(&pool[100]));
+
+ for (int i=1; i < 100; i++)
+ {
+ OK(hash1.get(&pool[i]) == &pool[i]);
+ }
+
+ OK(hash1.get(&pool[0]) == &pool[0]);
+ }
+
+ printf("int -> int (Static, unique, realloc) \n");
+ {
+ HashMap2<IntIntKVPod, true, TestHeapAllocator, IntIntKVStaticMethods> hash1;
+ for (int j = 1; j < 150; j ++)
+ {
+ hash1.setSize(150 - j);
+
+ OK(hash1.getElementCount() == 0);
+
+ /* Create some values in a pool on the stack */
+ IntIntKVPod pool[101];
+
+ for (int i=0; i < 100; i++)
+ {
+ pool[i].a = i;
+ pool[i].b = 3 * i;
+ pool[i].next = NULL;
+ }
+
+ /* Add the pool elements to the hash table */
+ for (int i=0; i < 100; i++)
+ {
+ OK(hash1.add(&pool[i]));
+ }
+
+ /* Now attempt to add a duplicate */
+ pool[100].a = 0;
+ pool[100].b = 999;
+ pool[100].next = NULL;
+
+ OK(hash1.getElementCount() == 100);
+
+ OK(! hash1.add(&pool[100]));
+
+ for (int i=1; i < 100; i++)
+ {
+ OK(hash1.get(&pool[i]) == &pool[i]);
+ }
+
+ OK(hash1.get(&pool[0]) == &pool[0]);
+
+ OK(!hash1.setSize(j+1));
+
+ hash1.reset();
+ }
+ }
+
+ printf("int -> int (Static, unique, realloc, remove) \n");
+ {
+ HashMap2<IntIntKVPod, true, TestHeapAllocator, IntIntKVStaticMethods> hash1;
+ for (int j = 1; j < 150; j ++)
+ {
+// hash1.setSize(150 - j);
+ hash1.setSize(j);
+
+ OK(hash1.getElementCount() == 0);
+
+ /* Create some values in a pool on the stack */
+ IntIntKVPod pool[101];
+
+ for (int i=0; i < 100; i++)
+ {
+ pool[i].a = i;
+ pool[i].b = 3 * i;
+ pool[i].next = NULL;
+ }
+
+ /* Add the pool elements to the hash table */
+ for (int i=0; i < 100; i++)
+ {
+ OK(hash1.add(&pool[i]));
+ }
+
+ /* Now attempt to add a duplicate */
+ pool[100].a = 0;
+ pool[100].b = 999;
+ pool[100].next = NULL;
+
+ OK(hash1.getElementCount() == 100);
+
+ OK(!hash1.add(&pool[100]));
+
+ for (int i=1; i < 100; i++)
+ {
+ OK(hash1.get(&pool[i]) == &pool[i]);
+ }
+
+ OK((hash1.get(&pool[0]) == &pool[0]) ||
+ (hash1.get(&pool[0]) == &pool[100]));
+
+ OK(!hash1.setSize(j+1));
+
+ /* Now replace elements with different ones */
+ IntIntKVPod pool2[100];
+ for (int i=0; i < 100; i++)
+ {
+ pool2[i].a = i;
+ pool2[i].b = 4 * i;
+ pool2[i].next = NULL;
+ }
+
+ for (int k=0; k < 4; k++)
+ {
+ for (int i=0; i< 100; i++)
+ {
+ if ((i % 4) == k)
+ {
+ OK(hash1.remove(&pool[i]) == &pool[i]);
+ };
+ };
+
+ OK(hash1.getElementCount() == 75);
+
+ for (int i=0; i< 100; i++)
+ {
+ if ((i % 4) == k)
+ {
+ OK(!hash1.get(&pool[i]));
+ }
+ };
+
+ for (int i=0; i< 100; i++)
+ {
+ if ((i % 4) == k)
+ {
+ OK(hash1.add(&pool2[i]));
+ }
+ }
+ OK(hash1.getElementCount() == 100);
+ }
+
+ for (int i=0; i< 100; i++)
+ {
+ OK(hash1.get(&pool2[i]) == &pool2[i]);
+ };
+
+ hash1.reset();
+ }
+ }
+
+ return 1; // OK
+}
+
+#endif
=== added file 'storage/ndb/src/common/util/LinkedStack.cpp'
--- a/storage/ndb/src/common/util/LinkedStack.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/common/util/LinkedStack.cpp 2011-09-07 22:50:01 +0000
@@ -0,0 +1,124 @@
+/* Copyright (C) 2009 Sun Microsystems Inc.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+
+#include <LinkedStack.hpp>
+
+#ifdef TEST_LINKEDSTACK
+#include <NdbTap.hpp>
+
+struct TestHeapAllocator
+{
+ static const int DEBUG_ALLOC=0;
+ static void* alloc(void* ignore, size_t bytes)
+ {
+ void* p = ::malloc(bytes);
+ if (DEBUG_ALLOC)
+ {
+ printf("--Allocating %u bytes at %p\n",
+ (Uint32) bytes, p);
+ }
+ return p;
+ }
+
+ static void* calloc(void* ignore, size_t nelem, size_t bytes)
+ {
+ void* p = ::calloc(nelem, bytes);
+ if (DEBUG_ALLOC)
+ {
+ printf("--Allocating %u elements of %u bytes (%u bytes?) at %p\n",
+ (Uint32) nelem, (Uint32) bytes, (Uint32) (nelem * bytes), p);
+ }
+ return p;
+ }
+
+ static void free(void* ignore, void* mem)
+ {
+ if (DEBUG_ALLOC)
+ {
+ printf("--Freeing bytes at %p\n",mem);
+ }
+ ::free(mem);
+ }
+};
+
+TAPTEST(LinkedStack)
+{
+ Uint32 popped;
+ Uint32 blockSize = 1;
+
+ for (Uint32 b=0; b < 10; b++)
+ {
+ LinkedStack<Uint32, TestHeapAllocator> testStack(blockSize);
+
+ for (Uint32 p=0; p<4; p++)
+ {
+ /* Pass 0 == alloc, Pass 1 == re-use, Pass 3 = Reset, Pass 4 = Release */
+ printf("LinkedBlockStack size %u, pass %u\n", blockSize, p);
+ Uint32 stackSize = 2033 * (p+1);
+
+ OK(testStack.size() == 0);
+ printf(" Pushing %u elements\n", stackSize);
+ for (Uint32 i=0; i < stackSize; i++)
+ {
+ /* Push items onto the stack */
+ OK(testStack.push(i) == true);
+ OK(testStack.size() == i+1);
+ OK(testStack.pop(popped) == true);
+ OK(popped == i);
+ OK(testStack.size() == i);
+ OK(testStack.push(i) == true);
+ };
+
+ switch(p)
+ {
+ case 0:
+ case 1:
+ {
+ printf(" Popping %u elements\n", stackSize);
+ for (Uint32 i=0; i < stackSize; i++)
+ {
+ /* Pop items off the stack */
+ OK(testStack.size() == stackSize - i);
+ OK(testStack.pop(popped) == true);
+ OK(popped == stackSize - (i+1));
+ }
+ break;
+ }
+ case 2:
+ {
+ printf(" Releasing stack\n");
+ testStack.release();
+ break;
+ }
+ case 3:
+ {
+ printf(" Resetting stack\n");
+ testStack.reset();
+ break;
+ }
+ }
+
+ OK(testStack.size() == 0);
+ OK(testStack.pop(popped) == false);
+ }
+ printf(" Destructing stack\n");
+ blockSize = (blockSize * 2)+1;
+ }
+
+ return 1; // OK
+}
+
+#endif
=== modified file 'storage/ndb/src/ndbapi/Ndb.cpp'
--- a/storage/ndb/src/ndbapi/Ndb.cpp 2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/ndbapi/Ndb.cpp 2011-09-09 09:30:43 +0000
@@ -2290,7 +2290,10 @@ const char* ClientStatNames [] =
"TransLocalReadRowCount",
"DataEventsRecvdCount",
"NonDataEventsRecvdCount",
- "EventBytesRecvdCount"
+ "EventBytesRecvdCount",
+ "ForcedSendsCount",
+ "UnforcedSendsCount",
+ "DeferredSendsCount"
};
Uint64
=== modified file 'storage/ndb/src/ndbapi/Ndbif.cpp'
--- a/storage/ndb/src/ndbapi/Ndbif.cpp 2011-06-07 12:19:47 +0000
+++ b/storage/ndb/src/ndbapi/Ndbif.cpp 2011-09-09 09:30:43 +0000
@@ -1247,7 +1247,13 @@ Ndb::sendPrepTrans(int forceSend)
insert_completed_list(a_con);
}//for
theNoOfPreparedTransactions = 0;
- theImpl->do_forceSend(forceSend);
+ int did_send = theImpl->do_forceSend(forceSend);
+ if(forceSend) {
+ theImpl->incClientStat(Ndb::ForcedSendsCount, 1);
+ }
+ else {
+ theImpl->incClientStat(did_send ? Ndb::UnforcedSendsCount : Ndb::DeferredSendsCount, 1);
+ }
return;
}//Ndb::sendPrepTrans()
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp 2011-06-23 06:59:40 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp 2011-09-09 09:30:43 +0000
@@ -425,6 +425,19 @@ TransporterFacade::doStop(){
DBUG_VOID_RETURN;
}
+void TransporterFacade::setSendThreadInterval(Uint32 ms)
+{
+ if(ms > 0 && ms <= 10)
+ {
+ sendThreadWaitMillisec = ms;
+ }
+}
+
+Uint32 TransporterFacade::getSendThreadInterval(void)
+{
+ return sendThreadWaitMillisec;
+}
+
extern "C"
void*
runSendRequest_C(void * me)
@@ -444,7 +457,7 @@ void TransporterFacade::threadMainSend(v
m_socket_server.startServer();
while(!theStopReceive) {
- NdbSleep_MilliSleep(10);
+ NdbSleep_MilliSleep(sendThreadWaitMillisec);
NdbMutex_Lock(theMutexPtr);
if (sendPerformedLastInterval == 0) {
theTransporterRegistry->performSend();
@@ -537,7 +550,8 @@ TransporterFacade::TransporterFacade(Glo
theSendThread(NULL),
theReceiveThread(NULL),
m_fragmented_signal_id(0),
- m_globalDictCache(cache)
+ m_globalDictCache(cache),
+ sendThreadWaitMillisec(10)
{
DBUG_ENTER("TransporterFacade::TransporterFacade");
theMutexPtr = NdbMutex_CreateWithName("TTFM");
@@ -815,7 +829,7 @@ void TransporterFacade::forceSend(Uint32
//-------------------------------------------------
// Improving API performance
//-------------------------------------------------
-void
+int
TransporterFacade::checkForceSend(Uint32 block_number) {
m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
//-------------------------------------------------
@@ -828,14 +842,16 @@ TransporterFacade::checkForceSend(Uint32
// time to increase so therefore we have to keep track of
// how the users are performing adaptively.
//-------------------------------------------------
-
- if (theTransporterRegistry->forceSendCheck(currentSendLimit) == 1) {
+
+ int did_send = theTransporterRegistry->forceSendCheck(currentSendLimit);
+ if(did_send == 1) {
sendPerformedLastInterval = 1;
}
checkCounter--;
if (checkCounter < 0) {
calculateSendLimit();
}
+ return did_send;
}
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp 2011-06-23 06:59:40 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp 2011-09-09 09:30:43 +0000
@@ -71,6 +71,12 @@ public:
Uint32 get_active_ndb_objects() const;
+ /**
+ * Get/Set wait time in the send thread.
+ */
+ void setSendThreadInterval(Uint32 ms);
+ Uint32 getSendThreadInterval(void);
+
// Only sends to nodes which are alive
private:
int sendSignal(const NdbApiSignal * signal, NodeId nodeId);
@@ -126,7 +132,7 @@ public:
// Improving the API performance
void forceSend(Uint32 block_number);
- void checkForceSend(Uint32 block_number);
+ int checkForceSend(Uint32 block_number);
TransporterRegistry* get_registry() { return theTransporterRegistry;};
@@ -225,6 +231,7 @@ private:
// Declarations for the receive and send thread
int theStopReceive;
+ Uint32 sendThreadWaitMillisec;
void threadMainSend(void);
NdbThread* theSendThread;
=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2011-09-09 09:30:43 +0000
@@ -981,5 +981,16 @@ Ndb_cluster_connection::collect_client_s
return relevant;
}
-template class Vector<Ndb_cluster_connection_impl::Node>;
+void
+Ndb_cluster_connection::set_max_adaptive_send_time(Uint32 milliseconds)
+{
+ m_impl.m_transporter_facade->setSendThreadInterval(milliseconds);
+}
+Uint32
+Ndb_cluster_connection::get_max_adaptive_send_time()
+{
+ return m_impl.m_transporter_facade->getSendThreadInterval();
+}
+
+template class Vector<Ndb_cluster_connection_impl::Node>;
=== modified file 'storage/ndb/src/ndbapi/trp_client.cpp'
--- a/storage/ndb/src/ndbapi/trp_client.cpp 2011-02-24 22:07:05 +0000
+++ b/storage/ndb/src/ndbapi/trp_client.cpp 2011-09-08 06:22:07 +0000
@@ -101,17 +101,19 @@ trp_client::complete_poll()
m_facade->complete_poll(this);
}
-void
+int
trp_client::do_forceSend(int val)
{
+ int did_send = 1;
if (val == 0)
{
- m_facade->checkForceSend(m_blockNo);
+ did_send = m_facade->checkForceSend(m_blockNo);
}
else if (val == 1)
{
m_facade->forceSend(m_blockNo);
}
+ return did_send;
}
int
=== modified file 'storage/ndb/src/ndbapi/trp_client.hpp'
--- a/storage/ndb/src/ndbapi/trp_client.hpp 2011-02-24 22:07:05 +0000
+++ b/storage/ndb/src/ndbapi/trp_client.hpp 2011-09-08 06:22:07 +0000
@@ -44,7 +44,7 @@ public:
void complete_poll();
void wakeup();
- void do_forceSend(int val = 1);
+ int do_forceSend(int val = 1);
int raw_sendSignal(const NdbApiSignal*, Uint32 nodeId);
int raw_sendSignal(const NdbApiSignal*, Uint32 nodeId,
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster branch (frazer.clement:3468 to 3469) | Frazer Clement | 9 Sep |