3857 Ole John Aske 2012-03-23 [merge]
Merge mysql-5.5-cluster-7.2 ==> mysql-5.5-cluster-7.2-spj
added:
storage/ndb/src/kernel/vm/mt-lock.hpp
storage/ndb/src/kernel/vm/mt-send-t.cpp
modified:
mysql-test/suite/ndb/r/ndb_add_partition.result
mysql-test/suite/ndb/t/ndb_add_partition.test
mysql-test/suite/ndb/t/ndb_addnode.test
sql/ha_ndbcluster.cc
sql/handler.h
sql/sql_table.cc
storage/ndb/include/util/InputStream.hpp
storage/ndb/memcache/include/NdbInstance.h
storage/ndb/memcache/include/QueryPlan.h
storage/ndb/memcache/include/Queue.h
storage/ndb/memcache/scripts/pmpstack.awk
storage/ndb/memcache/src/NdbInstance.cc
storage/ndb/memcache/src/ndb_worker.cc
storage/ndb/memcache/src/schedulers/S_sched.cc
storage/ndb/src/common/transporter/Transporter.cpp
storage/ndb/src/kernel/vm/CMakeLists.txt
storage/ndb/src/kernel/vm/mt.cpp
storage/ndb/test/run-test/conf-blade08.cnf
storage/ndb/test/run-test/conf-ndb07.cnf
storage/ndb/test/run-test/conf-tyr13.cnf
storage/ndb/test/run-test/daily-basic-tests.txt
3856 Ole John Aske 2012-03-22
SPJ: Implement native support of pushed queries being sorted-scan-scan query.
Until now we didn't allow a scan-scan query which required 'sorted' result
set to be pushed to the SPJ block. Instead we supressed the 'sorted' request
and instead dumped the resultset to a temporary file and 'filesorted' it.
(Explained as 'Using tempfile; Using filesort')
This has been commented as 'questionable as best' by Evgeny which is
doing the review if WL5940: 'Integrating pushed join with optimizer & handler interface'
So we want to change it.....
This fix implement native support of sorted scan-scan by setting the
parent-scan batchsize to '1' when a sorted-scan-scan request is
about to be sent to the SPJ block.
(Dependant child scan/lookups are retreived with 'normal' batchsize)
SPJ API has to NEXTREQ more result until all related child-scan result for
the single parent row has been retrieved - This will guarantee sorted results
as all child rows will be retrieved together with its related parent.
(at the cost of some overhead though).
modified:
mysql-test/suite/ndb/r/ndb_join_pushdown_default.result
sql/abstract_query_plan.cc
sql/abstract_query_plan.h
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.h
sql/ha_ndbcluster_push.cc
sql/ha_ndbcluster_push.h
sql/handler.h
sql/sql_select.cc
storage/ndb/src/ndbapi/NdbQueryBuilder.cpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
storage/ndb/test/ndbapi/testSpj.cpp
storage/ndb/test/tools/spj_sanity_test.cpp
=== modified file 'mysql-test/suite/ndb/r/ndb_add_partition.result'
--- a/mysql-test/suite/ndb/r/ndb_add_partition.result 2011-05-12 11:31:21 +0000
+++ b/mysql-test/suite/ndb/r/ndb_add_partition.result 2012-03-21 15:42:47 +0000
@@ -24,6 +24,19 @@ ENGINE = NDB
STORAGE DISK
TABLESPACE ts1
partition by key(a);
+CREATE TABLE t3 (a int unsigned not null,
+b int unsigned not null,
+c int unsigned not null,
+primary key(a,b),
+unique (b))
+MAX_ROWS=50000000
+ENGINE = NDB;
+CREATE TABLE t4 (a int unsigned not null,
+b int unsigned not null,
+c int unsigned not null,
+primary key(a,b),
+unique (b))
+ENGINE = NDB;
INSERT INTO t1 VALUES
(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10),
@@ -46,12 +59,20 @@ INSERT INTO t1 VALUES
(91,91,91),(92,92,92),(93,93,93),(94,94,94),(95,95,95),
(96,96,96),(97,97,97),(98,98,98),(99,99,99),(100,100,100);
insert into t2 select * from t1;
+insert into t3 select * from t1;
+insert into t4 select * from t1;
select count(*) from t1;
count(*)
100
select count(*) from t2;
count(*)
100
+select count(*) from t3;
+count(*)
+100
+select count(*) from t4;
+count(*)
+100
select * from t1 where a < 20;
a b c
1 1 1
@@ -142,24 +163,146 @@ a b c
select * from t2 where b = 50;
a b c
50 50 50
+select * from t3 where a < 20;
+a b c
+1 1 1
+10 10 10
+11 11 11
+12 12 12
+13 13 13
+14 14 14
+15 15 15
+16 16 16
+17 17 17
+18 18 18
+19 19 19
+2 2 2
+3 3 3
+4 4 4
+5 5 5
+6 6 6
+7 7 7
+8 8 8
+9 9 9
+select * from t3 where a = 20;
+a b c
+20 20 20
+select * from t3 where a = 30;
+a b c
+30 30 30
+select * from t3 where a = 40;
+a b c
+40 40 40
+select * from t3 where a = 50;
+a b c
+50 50 50
+select * from t3 where b = 20;
+a b c
+20 20 20
+select * from t3 where b = 30;
+a b c
+30 30 30
+select * from t3 where b = 40;
+a b c
+40 40 40
+select * from t3 where b = 50;
+a b c
+50 50 50
+select * from t4 where a < 20;
+a b c
+1 1 1
+10 10 10
+11 11 11
+12 12 12
+13 13 13
+14 14 14
+15 15 15
+16 16 16
+17 17 17
+18 18 18
+19 19 19
+2 2 2
+3 3 3
+4 4 4
+5 5 5
+6 6 6
+7 7 7
+8 8 8
+9 9 9
+select * from t4 where a = 20;
+a b c
+20 20 20
+select * from t4 where a = 30;
+a b c
+30 30 30
+select * from t4 where a = 40;
+a b c
+40 40 40
+select * from t4 where a = 50;
+a b c
+50 50 50
+select * from t4 where b = 20;
+a b c
+20 20 20
+select * from t4 where b = 30;
+a b c
+30 30 30
+select * from t4 where b = 40;
+a b c
+40 40 40
+select * from t4 where b = 50;
+a b c
+50 50 50
alter online table t1 reorganize partition;
alter online table t2 reorganize partition;
+Cannot use normal reorganize partition on t3 as it has explicit MAX_ROWS set
+alter online table t3 reorganize partition;
+ERROR 42000: This version of MySQL doesn't yet support 'alter online table t3 reorganize partition'
+show warnings;
+Level Code Message
+Warning 1105 Cannot online REORGANIZE a table with Max_Rows set. Use ALTER TABLE ... MAX_ROWS=<new_val> or offline REORGANIZE to redistribute this table.
+Error 1235 This version of MySQL doesn't yet support 'alter online table t3 reorganize partition'
+alter online table t3 max_rows=50000000;
+alter online table t4 reorganize partition;
+Check partitions added, expect 0 in all cases
partitions added to t1
t1_added
0
partitions added to t2
t2_added
0
+partitions added to t3
+t3_added
+0
+partitions added to t4
+t4_added
+0
alter online table t1 add partition partitions 1;
alter online table t2 add partition partitions 4;
-partitions added to t1
+alter online table t3 max_rows=100000000;
+alter online table t4 max_rows=100000000;
+ERROR 42000: This version of MySQL doesn't yet support 'alter online table t4 max_rows=100000000'
+partitions added to t1 (expect 1)
t1_added
1
-partitions added to t2
+partitions added to t2 (expect 4)
t2_added
4
+partitions added to t3 (expect 2)
+t3_added
+2
+partitions added to t4 (expect 0)
+t4_added
+0
alter online table t1 reorganize partition;
ERROR HY000: REORGANIZE PARTITION without parameters can only be used on auto-partitioned tables using HASH PARTITIONs
+alter online table t3 reorganize partition;
+ERROR 42000: This version of MySQL doesn't yet support 'alter online table t3 reorganize partition'
+show warnings;
+Level Code Message
+Warning 1105 Cannot online REORGANIZE a table with Max_Rows set. Use ALTER TABLE ... MAX_ROWS=<new_val> or offline REORGANIZE to redistribute this table.
+Error 1235 This version of MySQL doesn't yet support 'alter online table t3 reorganize partition'
+alter online table t4 reorganize partition;
select count(*) from t1;
count(*)
100
@@ -256,14 +399,109 @@ a b c
select * from t2 where b = 50;
a b c
50 50 50
+select * from t3 where a < 20;
+a b c
+1 1 1
+10 10 10
+11 11 11
+12 12 12
+13 13 13
+14 14 14
+15 15 15
+16 16 16
+17 17 17
+18 18 18
+19 19 19
+2 2 2
+3 3 3
+4 4 4
+5 5 5
+6 6 6
+7 7 7
+8 8 8
+9 9 9
+select * from t3 where a = 20;
+a b c
+20 20 20
+select * from t3 where a = 30;
+a b c
+30 30 30
+select * from t3 where a = 40;
+a b c
+40 40 40
+select * from t3 where a = 50;
+a b c
+50 50 50
+select * from t3 where b = 20;
+a b c
+20 20 20
+select * from t3 where b = 30;
+a b c
+30 30 30
+select * from t3 where b = 40;
+a b c
+40 40 40
+select * from t3 where b = 50;
+a b c
+50 50 50
+select * from t4 where a < 20;
+a b c
+1 1 1
+10 10 10
+11 11 11
+12 12 12
+13 13 13
+14 14 14
+15 15 15
+16 16 16
+17 17 17
+18 18 18
+19 19 19
+2 2 2
+3 3 3
+4 4 4
+5 5 5
+6 6 6
+7 7 7
+8 8 8
+9 9 9
+select * from t4 where a = 20;
+a b c
+20 20 20
+select * from t4 where a = 30;
+a b c
+30 30 30
+select * from t4 where a = 40;
+a b c
+40 40 40
+select * from t4 where a = 50;
+a b c
+50 50 50
+select * from t4 where b = 20;
+a b c
+20 20 20
+select * from t4 where b = 30;
+a b c
+30 30 30
+select * from t4 where b = 40;
+a b c
+40 40 40
+select * from t4 where b = 50;
+a b c
+50 50 50
+drop table t4;
alter online table t1 add partition partitions 2;
alter online table t2 add partition partitions 1;
-partitions added to t1
+alter online table t3 max_rows=150000000;
+partitions added to t1 (expect 3)
t1_added
3
-partitions added to t2
+partitions added to t2 (expect 5)
t2_added
5
+partitions added to t3 (expect 4)
+t3_added
+4
select count(*) from t1;
count(*)
100
@@ -360,7 +598,52 @@ a b c
select * from t2 where b = 50;
a b c
50 50 50
-drop table t1,t2;
+select * from t3 where a < 20;
+a b c
+1 1 1
+10 10 10
+11 11 11
+12 12 12
+13 13 13
+14 14 14
+15 15 15
+16 16 16
+17 17 17
+18 18 18
+19 19 19
+2 2 2
+3 3 3
+4 4 4
+5 5 5
+6 6 6
+7 7 7
+8 8 8
+9 9 9
+select * from t3 where a = 20;
+a b c
+20 20 20
+select * from t3 where a = 30;
+a b c
+30 30 30
+select * from t3 where a = 40;
+a b c
+40 40 40
+select * from t3 where a = 50;
+a b c
+50 50 50
+select * from t3 where b = 20;
+a b c
+20 20 20
+select * from t3 where b = 30;
+a b c
+30 30 30
+select * from t3 where b = 40;
+a b c
+40 40 40
+select * from t3 where b = 50;
+a b c
+50 50 50
+drop table t1,t2,t3;
alter tablespace ts1 drop datafile 'datafile.dat' engine = ndb;
drop tablespace ts1 engine = ndb;
drop logfile group lg1 engine = ndb;
=== modified file 'mysql-test/suite/ndb/t/ndb_add_partition.test'
--- a/mysql-test/suite/ndb/t/ndb_add_partition.test 2011-05-12 14:34:22 +0000
+++ b/mysql-test/suite/ndb/t/ndb_add_partition.test 2012-03-21 17:28:20 +0000
@@ -34,10 +34,29 @@ STORAGE DISK
TABLESPACE ts1
partition by key(a);
+CREATE TABLE t3 (a int unsigned not null,
+ b int unsigned not null,
+ c int unsigned not null,
+ primary key(a,b),
+ unique (b))
+MAX_ROWS=50000000
+ENGINE = NDB;
+
+CREATE TABLE t4 (a int unsigned not null,
+ b int unsigned not null,
+ c int unsigned not null,
+ primary key(a,b),
+ unique (b))
+ENGINE = NDB;
+
let $t1_part_count_start = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't1', Value, 1);
let $t2_part_count_start = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't2', Value, 1);
+let $t3_part_count_start = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't3', Value, 1);
+
+let $t4_part_count_start = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't4', Value, 1);
+
INSERT INTO t1 VALUES
(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10),
@@ -61,8 +80,12 @@ INSERT INTO t1 VALUES
(96,96,96),(97,97,97),(98,98,98),(99,99,99),(100,100,100);
insert into t2 select * from t1;
+insert into t3 select * from t1;
+insert into t4 select * from t1;
select count(*) from t1;
select count(*) from t2;
+select count(*) from t3;
+select count(*) from t4;
--sorted_result
select * from t1 where a < 20;
--sorted_result
@@ -93,32 +116,89 @@ select * from t2 where b = 30;
select * from t2 where b = 40;
select * from t2 where b = 50;
+--sorted_result
+select * from t3 where a < 20;
+--sorted_result
+select * from t3 where a = 20;
+--sorted_result
+select * from t3 where a = 30;
+--sorted_result
+select * from t3 where a = 40;
+--sorted_result
+select * from t3 where a = 50;
+select * from t3 where b = 20;
+select * from t3 where b = 30;
+select * from t3 where b = 40;
+select * from t3 where b = 50;
+
+--sorted_result
+select * from t4 where a < 20;
+--sorted_result
+select * from t4 where a = 20;
+--sorted_result
+select * from t4 where a = 30;
+--sorted_result
+select * from t4 where a = 40;
+--sorted_result
+select * from t4 where a = 50;
+select * from t4 where b = 20;
+select * from t4 where b = 30;
+select * from t4 where b = 40;
+select * from t4 where b = 50;
+
alter online table t1 reorganize partition;
alter online table t2 reorganize partition;
+--echo Cannot use normal reorganize partition on t3 as it has explicit MAX_ROWS set
+--error 1235
+alter online table t3 reorganize partition;
+show warnings;
+
+alter online table t3 max_rows=50000000;
+alter online table t4 reorganize partition;
let $t1_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't1', Value, 1);
let $t2_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't2', Value, 1);
+let $t3_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't3', Value, 1);
+
+let $t4_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't4', Value, 1);
+
--disable_query_log
+--echo Check partitions added, expect 0 in all cases
--echo partitions added to t1
eval select $t1_part_count_now - $t1_part_count_start as t1_added;
--echo partitions added to t2
eval select $t2_part_count_now - $t2_part_count_start as t2_added;
+--echo partitions added to t3
+eval select $t3_part_count_now - $t3_part_count_start as t3_added;
+--echo partitions added to t4
+eval select $t4_part_count_now - $t4_part_count_start as t4_added;
--enable_query_log
alter online table t1 add partition partitions 1;
alter online table t2 add partition partitions 4;
+alter online table t3 max_rows=100000000; # Expansion of max rows
+--error 1235
+alter online table t4 max_rows=100000000; # Attempted introduction of max rows - fails
let $t1_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't1', Value, 1);
let $t2_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't2', Value, 1);
+let $t3_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't3', Value, 1);
+
+let $t4_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't4', Value, 1);
+
--disable_query_log
---echo partitions added to t1
+--echo partitions added to t1 (expect 1)
eval select $t1_part_count_now - $t1_part_count_start as t1_added;
---echo partitions added to t2
+--echo partitions added to t2 (expect 4)
eval select $t2_part_count_now - $t2_part_count_start as t2_added;
+--echo partitions added to t3 (expect 2)
+eval select $t3_part_count_now - $t3_part_count_start as t3_added;
+--echo partitions added to t4 (expect 0)
+eval select $t4_part_count_now - $t4_part_count_start as t4_added;
--enable_query_log
# reorganize partition not support if not default partitioning
@@ -126,6 +206,14 @@ eval select $t2_part_count_now - $t2_par
--error ER_REORG_NO_PARAM_ERROR
alter online table t1 reorganize partition;
+# Following will fail as t3 has an explicit MAX_ROWS set
+--error 1235
+alter online table t3 reorganize partition;
+show warnings;
+
+# t4 reorg will succeed as t4 has no explicit MAX_ROWS
+alter online table t4 reorganize partition;
+
select count(*) from t1;
select count(*) from t2;
--sorted_result
@@ -158,18 +246,55 @@ select * from t2 where b = 30;
select * from t2 where b = 40;
select * from t2 where b = 50;
+--sorted_result
+select * from t3 where a < 20;
+--sorted_result
+select * from t3 where a = 20;
+--sorted_result
+select * from t3 where a = 30;
+--sorted_result
+select * from t3 where a = 40;
+--sorted_result
+select * from t3 where a = 50;
+select * from t3 where b = 20;
+select * from t3 where b = 30;
+select * from t3 where b = 40;
+select * from t3 where b = 50;
+
+--sorted_result
+select * from t4 where a < 20;
+--sorted_result
+select * from t4 where a = 20;
+--sorted_result
+select * from t4 where a = 30;
+--sorted_result
+select * from t4 where a = 40;
+--sorted_result
+select * from t4 where a = 50;
+select * from t4 where b = 20;
+select * from t4 where b = 30;
+select * from t4 where b = 40;
+select * from t4 where b = 50;
+
+drop table t4;
+
alter online table t1 add partition partitions 2;
alter online table t2 add partition partitions 1;
+alter online table t3 max_rows=150000000;
let $t1_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't1', Value, 1);
let $t2_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't2', Value, 1);
+let $t3_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't3', Value, 1);
+
--disable_query_log
---echo partitions added to t1
+--echo partitions added to t1 (expect 3)
eval select $t1_part_count_now - $t1_part_count_start as t1_added;
---echo partitions added to t2
+--echo partitions added to t2 (expect 5)
eval select $t2_part_count_now - $t2_part_count_start as t2_added;
+--echo partitions added to t3 (expect 4)
+eval select $t3_part_count_now - $t3_part_count_start as t3_added;
--enable_query_log
select count(*) from t1;
@@ -204,7 +329,22 @@ select * from t2 where b = 30;
select * from t2 where b = 40;
select * from t2 where b = 50;
-drop table t1,t2;
+--sorted_result
+select * from t3 where a < 20;
+--sorted_result
+select * from t3 where a = 20;
+--sorted_result
+select * from t3 where a = 30;
+--sorted_result
+select * from t3 where a = 40;
+--sorted_result
+select * from t3 where a = 50;
+select * from t3 where b = 20;
+select * from t3 where b = 30;
+select * from t3 where b = 40;
+select * from t3 where b = 50;
+
+drop table t1,t2,t3;
alter tablespace ts1 drop datafile 'datafile.dat' engine = ndb;
drop tablespace ts1 engine = ndb;
drop logfile group lg1 engine = ndb;
=== modified file 'mysql-test/suite/ndb/t/ndb_addnode.test'
--- a/mysql-test/suite/ndb/t/ndb_addnode.test 2011-05-09 08:49:19 +0000
+++ b/mysql-test/suite/ndb/t/ndb_addnode.test 2012-03-21 17:28:20 +0000
@@ -20,9 +20,19 @@ CREATE TABLESPACE ts_1
create table t1(id int NOT NULL PRIMARY KEY, data char(8)) engine=ndb;
create table t2(id int NOT NULL PRIMARY KEY, data char(8))
TABLESPACE ts_1 STORAGE DISK engine=ndb;
+# BUG#13714648
+create table t5(id int NOT NULL PRIMARY KEY, data char(8)) max_rows=50000000 engine=ndb;
load data local infile 'suite/ndb/data/table_data10000.dat' into table t1 fields terminated by ' ' lines terminated by '\n';
load data local infile 'suite/ndb/data/table_data10000.dat' into table t2 fields terminated by ' ' lines terminated by '\n';
+load data local infile 'suite/ndb/data/table_data10000.dat' into table t5 fields terminated by ' ' lines terminated by '\n';
+
+select count(1) as t1_part_count from information_schema.partitions where table_schema='test' and table_name='t1';
+select count(1) as t2_part_count from information_schema.partitions where table_schema='test' and table_name='t2';
+select @init_t5_part_count:= count(1) as t5_part_count from information_schema.partitions where table_schema='test' and table_name='t5';
+
+## Check details of t5 partitioning
+--exec $NDB_DESC -dtest -p -n t5
## Create nodegroup for "new" nodes
--exec $NDB_MGM -e "create nodegroup 3,4"
@@ -48,6 +58,23 @@ insert into t4(id, data) VALUES
alter online table t1 reorganize partition;
alter online table t2 reorganize partition;
+alter online table t5 max_rows=100000000;
+
+select count(1) as t1_part_count from information_schema.partitions where table_schema='test' and table_name='t1';
+select count(1) as t2_part_count from information_schema.partitions where table_schema='test' and table_name='t2';
+select count(1) as t3_part_count from information_schema.partitions where table_schema='test' and table_name='t3';
+select count(1) as t4_part_count from information_schema.partitions where table_schema='test' and table_name='t4';
+select @reorg_t5_part_count:= count(1) as t5_part_count from information_schema.partitions where table_schema='test' and table_name='t5';
+
+## Check details of t5 partitioning
+--exec $NDB_DESC -dtest -p -n t5
+
+--let $t5_part_diff=query_get_value('select @reorg_t5_part_count-@init_t5_part_count as Value',Value,1)
+
+if (!$t5_part_diff)
+{
+ --die Table t5 was not reorganised
+}
## Drop nodegroup with "new" nodes is not allowed with data one those nodes
# NOTE: --error=0 is due to return codes doesnt work on windoze
@@ -57,7 +84,7 @@ alter online table t2 reorganize partiti
## Nodegroup with "new" nodes still exist after dropping it as shown:
--exec $NDB_MGM -e show
-drop table t1,t2,t3,t4;
+drop table t1,t2,t3,t4,t5;
## Drop nodegroup with "new" nodes
--exec $NDB_MGM -e "drop nodegroup 1"
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc 2012-03-22 14:18:01 +0000
+++ b/sql/ha_ndbcluster.cc 2012-03-23 07:44:14 +0000
@@ -15580,7 +15580,9 @@ HA_ALTER_FLAGS supported_alter_operation
HA_COLUMN_FORMAT |
HA_ADD_PARTITION |
HA_ALTER_TABLE_REORG |
- HA_CHANGE_AUTOINCREMENT_VALUE;
+ HA_CHANGE_AUTOINCREMENT_VALUE |
+ HA_ALTER_MAX_ROWS
+;
}
int ha_ndbcluster::check_if_supported_alter(TABLE *altered_table,
@@ -15651,7 +15653,8 @@ int ha_ndbcluster::check_if_supported_al
if (alter_flags->is_set(HA_ADD_COLUMN) ||
alter_flags->is_set(HA_ADD_PARTITION) ||
- alter_flags->is_set(HA_ALTER_TABLE_REORG))
+ alter_flags->is_set(HA_ALTER_TABLE_REORG) ||
+ alter_flags->is_set(HA_ALTER_MAX_ROWS))
{
Ndb *ndb= get_ndb(thd);
NDBDICT *dict= ndb->getDictionary();
@@ -15711,6 +15714,19 @@ int ha_ndbcluster::check_if_supported_al
if (alter_flags->is_set(HA_ALTER_TABLE_REORG))
{
+ /*
+ Refuse if Max_rows has been used before...
+ Workaround is to use ALTER ONLINE TABLE <t> MAX_ROWS=<bigger>;
+ */
+ if (old_tab->getMaxRows() != 0)
+ {
+ push_warning(current_thd,
+ MYSQL_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
+ "Cannot online REORGANIZE a table with Max_Rows set. "
+ "Use ALTER TABLE ... MAX_ROWS=<new_val> or offline REORGANIZE "
+ "to redistribute this table.");
+ DBUG_RETURN(HA_ALTER_NOT_SUPPORTED);
+ }
new_tab.setFragmentCount(0);
new_tab.setFragmentData(0, 0);
}
@@ -15719,6 +15735,27 @@ int ha_ndbcluster::check_if_supported_al
DBUG_PRINT("info", ("Adding partition (%u)", part_info->num_parts));
new_tab.setFragmentCount(part_info->num_parts);
}
+ if (alter_flags->is_set(HA_ALTER_MAX_ROWS))
+ {
+ ulonglong rows= create_info->max_rows;
+ uint no_fragments= get_no_fragments(rows);
+ uint reported_frags= no_fragments;
+ if (adjusted_frag_count(ndb, no_fragments, reported_frags))
+ {
+ push_warning(current_thd,
+ MYSQL_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
+ "Ndb might have problems storing the max amount "
+ "of rows specified");
+ }
+ if (reported_frags < old_tab->getFragmentCount())
+ {
+ DBUG_PRINT("info", ("Online reduction in number of fragments not supported"));
+ DBUG_RETURN(HA_ALTER_NOT_SUPPORTED);
+ }
+ new_tab.setFragmentCount(reported_frags);
+ new_tab.setDefaultNoPartitionsFlag(false);
+ new_tab.setFragmentData(0, 0);
+ }
NDB_Modifiers table_modifiers(ndb_table_modifiers);
table_modifiers.parse(thd, "NDB_TABLE=", create_info->comment.str,
@@ -16037,7 +16074,9 @@ int ha_ndbcluster::alter_table_phase1(TH
}
}
- if (alter_flags->is_set(HA_ALTER_TABLE_REORG) || alter_flags->is_set(HA_ADD_PARTITION))
+ if (alter_flags->is_set(HA_ALTER_TABLE_REORG) ||
+ alter_flags->is_set(HA_ADD_PARTITION) ||
+ alter_flags->is_set(HA_ALTER_MAX_ROWS))
{
if (alter_flags->is_set(HA_ALTER_TABLE_REORG))
{
@@ -16049,7 +16088,29 @@ int ha_ndbcluster::alter_table_phase1(TH
partition_info *part_info= altered_table->part_info;
new_tab->setFragmentCount(part_info->num_parts);
}
-
+ else if (alter_flags->is_set(HA_ALTER_MAX_ROWS))
+ {
+ ulonglong rows= create_info->max_rows;
+ uint no_fragments= get_no_fragments(rows);
+ uint reported_frags= no_fragments;
+ if (adjusted_frag_count(ndb, no_fragments, reported_frags))
+ {
+ DBUG_ASSERT(false); /* Checked above */
+ }
+ if (reported_frags < old_tab->getFragmentCount())
+ {
+ DBUG_ASSERT(false);
+ DBUG_RETURN(HA_ALTER_NOT_SUPPORTED);
+ }
+ /* Note we don't set the ndb table's max_rows param, as that
+ * is considered a 'real' change
+ */
+ //new_tab->setMaxRows(create_info->max_rows);
+ new_tab->setFragmentCount(reported_frags);
+ new_tab->setDefaultNoPartitionsFlag(false);
+ new_tab->setFragmentData(0, 0);
+ }
+
int res= dict->prepareHashMap(*old_tab, *new_tab);
if (res == -1)
{
=== modified file 'sql/handler.h'
--- a/sql/handler.h 2012-03-22 14:18:01 +0000
+++ b/sql/handler.h 2012-03-23 07:44:14 +0000
@@ -58,7 +58,8 @@ class Alter_info;
/* Bits to show what an alter table will do */
#include <sql_bitmap.h>
-#define HA_MAX_ALTER_FLAGS 40
+#define HA_MAX_ALTER_FLAGS 41
+
typedef Bitmap<HA_MAX_ALTER_FLAGS> HA_ALTER_FLAGS;
#define HA_ADD_INDEX (0)
@@ -101,6 +102,7 @@ typedef Bitmap<HA_MAX_ALTER_FLAGS> HA_AL
#define HA_ALTER_STORAGE_ENGINE (37)
#define HA_RECREATE (38)
#define HA_ALTER_TABLE_REORG (39)
+#define HA_ALTER_MAX_ROWS (40)
/* Remember to increase HA_MAX_ALTER_FLAGS when adding more flags! */
/* Return values for check_if_supported_alter */
=== modified file 'sql/sql_table.cc'
--- a/sql/sql_table.cc 2012-02-14 08:00:53 +0000
+++ b/sql/sql_table.cc 2012-03-21 17:28:20 +0000
@@ -5079,6 +5079,8 @@ compare_tables(THD *thd,
*alter_flags|= HA_SET_DEFAULT_CHARACTER_SET;
if (alter_info->flags & ALTER_RECREATE)
*alter_flags|= HA_RECREATE;
+ if (create_info->used_fields & HA_CREATE_USED_MAX_ROWS)
+ *alter_flags|= HA_ALTER_MAX_ROWS;
/* TODO check for ADD/DROP FOREIGN KEY */
if (alter_info->flags & ALTER_FOREIGN_KEY)
*alter_flags|= HA_ALTER_FOREIGN_KEY;
=== modified file 'storage/ndb/include/util/InputStream.hpp'
--- a/storage/ndb/include/util/InputStream.hpp 2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/util/InputStream.hpp 2012-03-21 15:31:06 +0000
@@ -57,7 +57,7 @@ class SocketInputStream : public InputSt
bool m_startover;
bool m_timedout;
public:
- SocketInputStream(NDB_SOCKET_TYPE socket, unsigned read_timeout_ms = 60000);
+ SocketInputStream(NDB_SOCKET_TYPE socket, unsigned read_timeout_ms = 3000);
virtual ~SocketInputStream() {}
char* gets(char * buf, int bufLen);
bool timedout() { return m_timedout; }
=== modified file 'storage/ndb/memcache/include/NdbInstance.h'
--- a/storage/ndb/memcache/include/NdbInstance.h 2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/include/NdbInstance.h 2012-03-22 22:18:19 +0000
@@ -42,6 +42,7 @@ public:
/* Public Methods */
NdbInstance(Ndb_cluster_connection *, int);
~NdbInstance();
+ void non_blocking_close(NdbTransaction *);
void link_workitem(workitem *);
void unlink_workitem(workitem *);
=== modified file 'storage/ndb/memcache/include/QueryPlan.h'
--- a/storage/ndb/memcache/include/QueryPlan.h 2011-12-11 07:31:26 +0000
+++ b/storage/ndb/memcache/include/QueryPlan.h 2012-03-22 22:18:19 +0000
@@ -52,7 +52,7 @@ class QueryPlan {
~QueryPlan();
bool canHaveExternalValue() const;
bool shouldExternalizeValue(size_t length) const;
- bool canUseSimpleRead() const;
+ bool canUseCommittedRead() const;
Uint64 getAutoIncrement() const;
void debug_dump() const;
bool hasDataOnDisk() const;
@@ -103,7 +103,7 @@ inline bool QueryPlan::hasDataOnDisk() c
return has_disk_storage;
}
-inline bool QueryPlan::canUseSimpleRead() const {
+inline bool QueryPlan::canUseCommittedRead() const {
return(pk_access && (! extern_store) && (! spec->exp_column));
}
=== modified file 'storage/ndb/memcache/include/Queue.h'
--- a/storage/ndb/memcache/include/Queue.h 2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/include/Queue.h 2012-03-22 02:21:34 +0000
@@ -55,6 +55,7 @@ public: /* public instance variable */
public: /* public interface methods */
Queue(int maxnodes) {
/* Initialize the node allocation pool */
+ maxnodes += 1; // add one for a dummy node at the head.
assert(sizeof(Node) < CACHE_LINE_SIZE);
nodepool = (char *) calloc(maxnodes, CACHE_LINE_SIZE);
nodelist = 0;
=== modified file 'storage/ndb/memcache/scripts/pmpstack.awk'
--- a/storage/ndb/memcache/scripts/pmpstack.awk 2011-11-07 21:54:07 +0000
+++ b/storage/ndb/memcache/scripts/pmpstack.awk 2012-03-22 22:18:19 +0000
@@ -41,23 +41,35 @@ function label(name) {
### What is the thread doing?
### Patterns higher up in this file take precedence over lower ones
-$2 ~ /^epoll_wait,TransporterRegistry::po/ { label("epoll_wait_transporter_recv"); next }
-
-$2 ~ /^epoll_wait/ { label("epoll_wait"); next }
+/epoll_wait,TransporterRegistry::po/ { label("epoll_wait_transporter_recv"); next }
+
+/writev,TCP_Transporter::doSend/ { label("sending_to_ndb"); next }
+
+/recv,TCP_Transporter/ { label("tcp_recv_from_ndb"); next }
-$2 ~ /^writev,TCP_Transporter::doSend/ { label("sending_to_ndb"); next }
+/Ndb::closeTransaction/ { label("ndb_transaction_close"); next }
-$2 ~ /^recv,TCP_Transporter/ { label("tcp_recv_from_ndb"); next }
+/sendmsg,conn_mwrite/ { label("writing_to_client"); next }
-$2 ~ /^sendmsg,conn_mwrite/ { label("sending_to_client"); next }
+/recv,conn_read,event_handler/ { label("reading_from_client"); next }
+
+/poll_dispatch/ { label("poll_dispatch"); next }
/pthread_mutex_unlock/ { label("releasing_locks"); next }
+/_lock,pthread_cond_/ { label("getting_lock_for_condition_var"); next }
+
/pthread_mutex_lock,Ndb::sendPrepared/ { label("lock_Ndb_impl"); next }
/_mutex_lock/ && /TransporterFacade/ { label("lock_transporter_facade_mutex"); next }
/pthread_cond_timedwait/ && /ollNdb/ { label("wait_poll_ndb"); next }
+
+/::schedule/ && /pthread_cond_signal/ { label("Scheduler_signaling_cond_var") ; next }
+
+/pthread_rwlock_rdlock/ { label("acquiring_rwlock") ; next }
+
+/pthread_cond_[a-z]*wait/ { label("condition_variable_wait"); next }
/workqueue_consumer_wait/ { label("workqueue_idle_wait"); next }
@@ -67,6 +79,12 @@ $2 ~ /^sendmsg,conn_mwrite/
/Ndb::computeHash/ { label("ndb_compute_hash"); next }
+/workitem__initialize/ { label("workitem_initialize"); next }
+
+/worker_prepare_operation/ { label("worker_prepare_operation"); next }
+
+/^epoll_wait/ { label("epoll_wait"); next }
+
/sleep/ { label("sleep"); next }
@@ -74,26 +92,26 @@ $2 ~ /^sendmsg,conn_mwrite/
END {
for(i in event) if (i != "total")
- printf("%s\t%.2f%%\t%s\n",
+ printf("%s\t%.2f%% \t%s\n",
"Event", (event[i] / event["total"]) * 100, i)
printf("\n");
for(i in commit) if(i != "total")
- printf("%s\t%.2f%%\t%s\n",
+ printf("%s\t%.2f%% \t%s\n",
"Commit", (commit[i] / commit["total"]) * 100, i)
if(commit["total"]) printf("\n");
for(i in send) if(i != "total")
- printf("%s\t%.2f%%\t%s\n",
+ printf("%s\t%.2f%% \t%s\n",
"Send", (send[i] / send["total"]) * 100, i)
if(send["total"]) printf("\n");
for(i in poll) if(i != "total")
- printf("%s\t%.2f%%\t%s\n",
+ printf("%s\t%.2f%% \t%s\n",
"Poll", (poll[i] / poll["total"]) * 100, i)
if(poll["total"]) printf("\n");
for(i in x) if(i != "total")
- printf("%s\t%.2f%%\t%s\n",
+ printf("%s\t%.2f%% \t%s\n",
"Unidentified", (x[i] / x["total"]) * 100, i)
}
=== modified file 'storage/ndb/memcache/src/NdbInstance.cc'
--- a/storage/ndb/memcache/src/NdbInstance.cc 2011-09-30 17:04:30 +0000
+++ b/storage/ndb/memcache/src/NdbInstance.cc 2012-03-22 22:18:19 +0000
@@ -48,3 +48,13 @@ NdbInstance::~NdbInstance() {
}
+void NdbInstance::non_blocking_close(NdbTransaction *tx) {
+ Uint64 nwaits_pre, nwaits_post;
+ nwaits_pre = db->getClientStat(Ndb::WaitExecCompleteCount);
+
+ tx->close();
+
+ nwaits_post = db->getClientStat(Ndb::WaitExecCompleteCount);
+ assert(nwaits_pre == nwaits_post);
+}
+
=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc 2012-03-15 03:21:12 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc 2012-03-22 22:18:19 +0000
@@ -460,8 +460,8 @@ op_status_t WorkerStep1::do_read() {
NdbOperation::LockMode lockmode;
NdbTransaction::ExecType commitflag;
- if(plan->canUseSimpleRead()) {
- lockmode = NdbOperation::LM_SimpleRead;
+ if(plan->canUseCommittedRead()) {
+ lockmode = NdbOperation::LM_CommittedRead;
commitflag = NdbTransaction::Commit;
}
else {
=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc'
--- a/storage/ndb/memcache/src/schedulers/S_sched.cc 2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.cc 2012-03-22 19:24:32 +0000
@@ -538,7 +538,7 @@ S::WorkerConnection::WorkerConnection(Sc
/* Build the freelist */
freelist = 0;
- int my_ndb_inst = conn->nInst / global->options.n_worker_threads;
+ int my_ndb_inst = conn->nInst / conn->n_workers;
for(int j = 0 ; j < my_ndb_inst ; j++ ) {
NdbInstance *inst = new NdbInstance(conn->conn, 2);
inst->id = ((id.thd + 1) * 10000) + j + 1;
=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp 2011-12-10 19:02:03 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.cpp 2012-03-21 17:28:20 +0000
@@ -74,7 +74,7 @@ Transporter::Transporter(TransporterRegi
checksumUsed = _checksum;
signalIdUsed = _signalId;
- m_timeOutMillis = 30000;
+ m_timeOutMillis = 3000;
m_connect_address.s_addr= 0;
if(s_port<0)
=== modified file 'storage/ndb/src/kernel/vm/CMakeLists.txt'
--- a/storage/ndb/src/kernel/vm/CMakeLists.txt 2012-02-23 15:41:31 +0000
+++ b/storage/ndb/src/kernel/vm/CMakeLists.txt 2012-03-21 12:31:02 +0000
@@ -78,3 +78,9 @@ FOREACH(testprog CountingPool DynArr256)
TARGET_LINK_LIBRARIES(${testprog}-t ndbtest ndbkernel ndbsched ndberror
ndbtransport ndbmgmcommon ndbmgmapi ndbportlib ndbgeneral)
ENDFOREACH(testprog)
+
+IF(NDB_BUILD_NDBMTD)
+ ADD_EXECUTABLE(mt-send-t mt-send-t.cpp)
+ TARGET_LINK_LIBRARIES(mt-send-t ndbtest ndbkernel ndbsched ndberror
+ ndbtransport ndbmgmcommon ndbmgmapi ndbportlib ndbgeneral)
+ENDIF()
=== added file 'storage/ndb/src/kernel/vm/mt-lock.hpp'
--- a/storage/ndb/src/kernel/vm/mt-lock.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/mt-lock.hpp 2012-03-20 13:23:29 +0000
@@ -0,0 +1,160 @@
+/* Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef MT_LOCK_HPP
+#define MT_LOCK_HPP
+
+#include <ndb_global.h>
+#include "mt-asm.h"
+#include <NdbMutex.h>
+
+struct mt_lock_stat
+{
+ const void * m_ptr;
+ char * m_name;
+ Uint32 m_contended_count;
+ Uint32 m_spin_count;
+};
+
+static void register_lock(const void * ptr, const char * name);
+static mt_lock_stat * lookup_lock(const void * ptr);
+
+#ifdef NDB_HAVE_XCNG
+template <unsigned SZ>
+struct thr_spin_lock
+{
+ thr_spin_lock(const char * name = 0)
+ {
+ m_lock = 0;
+ register_lock(this, name);
+ }
+
+ union {
+ volatile Uint32 m_lock;
+ char pad[SZ];
+ };
+};
+
+static
+ATTRIBUTE_NOINLINE
+void
+lock_slow(void * sl, volatile unsigned * val)
+{
+ mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock
+
+loop:
+ Uint32 spins = 0;
+ do {
+ spins++;
+ cpu_pause();
+ } while (* val == 1);
+
+ if (unlikely(xcng(val, 1) != 0))
+ goto loop;
+
+ if (s)
+ {
+ s->m_spin_count += spins;
+ Uint32 count = ++s->m_contended_count;
+ Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));
+
+ if ((count % freq) == 0)
+ printf("%s waiting for lock, contentions: %u spins: %u\n",
+ s->m_name, count, s->m_spin_count);
+ }
+}
+
+template <unsigned SZ>
+static
+inline
+void
+lock(struct thr_spin_lock<SZ>* sl)
+{
+ volatile unsigned* val = &sl->m_lock;
+ if (likely(xcng(val, 1) == 0))
+ return;
+
+ lock_slow(sl, val);
+}
+
+template <unsigned SZ>
+static
+inline
+void
+unlock(struct thr_spin_lock<SZ>* sl)
+{
+ /**
+ * Memory barrier here, to make sure all of our stores are visible before
+ * the lock release is.
+ */
+ mb();
+ sl->m_lock = 0;
+}
+
+template <unsigned SZ>
+static
+inline
+int
+trylock(struct thr_spin_lock<SZ>* sl)
+{
+ volatile unsigned* val = &sl->m_lock;
+ return xcng(val, 1);
+}
+#else
+#define thr_spin_lock thr_mutex
+#endif
+
+template <unsigned SZ>
+struct thr_mutex
+{
+ thr_mutex(const char * name = 0) {
+ NdbMutex_Init(&m_mutex);
+ register_lock(this, name);
+ }
+
+ union {
+ NdbMutex m_mutex;
+ char pad[SZ];
+ };
+};
+
+template <unsigned SZ>
+static
+inline
+void
+lock(struct thr_mutex<SZ>* sl)
+{
+ NdbMutex_Lock(&sl->m_mutex);
+}
+
+template <unsigned SZ>
+static
+inline
+void
+unlock(struct thr_mutex<SZ>* sl)
+{
+ NdbMutex_Unlock(&sl->m_mutex);
+}
+
+template <unsigned SZ>
+static
+inline
+int
+trylock(struct thr_mutex<SZ> * sl)
+{
+ return NdbMutex_Trylock(&sl->m_mutex);
+}
+
+#endif
=== added file 'storage/ndb/src/kernel/vm/mt-send-t.cpp'
--- a/storage/ndb/src/kernel/vm/mt-send-t.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/mt-send-t.cpp 2012-03-21 14:07:51 +0000
@@ -0,0 +1,554 @@
+#include "mt-asm.h"
+#include "mt-lock.hpp"
+#include <NdbTick.h>
+#include <NdbMutex.h>
+#include <NdbThread.h>
+#include <NdbCondition.h>
+#include <NdbTap.hpp>
+#include <Bitmask.hpp>
+#include <ndb_rand.h>
+
+#define BUGGY_VERSION 0
+
+/**
+ * DO_SYSCALL inside critical section
+ * (the equivalent of writev(socket)
+ */
+#define DO_SYSCALL 1
+
+/**
+ * This is a unit test of the send code for mt.cpp
+ * specifically the code that manages which thread will send
+ * (write gathering)
+ *
+ * Each thread is a producer of Signals
+ * Each signal has a destination remote node (transporter)
+ * Each thread will after having produced a set of signals
+ * check if it should send them on socket.
+ * If it decides that it should, it consumes all the signals
+ * produced by all threads.
+ *
+ * In this unit test, we don't send signals, but the producing part
+ * will only be to increment a counter.
+ *
+ ******************************************************************
+ *
+ * To use this program seriously...
+ *
+ * you should set BUGGY_VERSION to 1
+ * and experiment with values on cnt_*
+ * until you find a variant which crashes (abort)
+ *
+ * The values compiled-in makes it crash on a single socket
+ * Intel(R) Core(TM) i5-2400 CPU @ 3.10GHz release compiled!
+ * (i never managed to get it debug compiled)
+ */
+#define MAX_THREADS 256
+#define MAX_TRANSPORTERS 256
+
+/**
+ * global variables
+ */
+static unsigned cnt_threads = 64;
+static unsigned cnt_transporters = 8;
+
+/**
+ * outer loops
+ * start/stop threads
+ */
+static unsigned cnt_seconds = 180;
+
+/**
+ * no of signals produced before calling consume
+ */
+static unsigned cnt_signals_before_consume = 4;
+
+/**
+ * no of signals produced in one inner loop
+ */
+static unsigned cnt_signals_per_inner_loop = 4;
+
+/**
+ * no inner loops per outer loop
+ *
+ * after each inner loop
+ * threads will be stalled and result verified
+ */
+static unsigned cnt_inner_loops = 5000;
+
+/**
+ * pct of do_send that are using forceSend()
+ */
+static unsigned pct_force = 15;
+
+typedef Bitmask<(MAX_TRANSPORTERS+31)/32> TransporterMask;
+
+struct Producer
+{
+ Producer() {
+ bzero(val, sizeof(val));
+ pendingcount = 0;
+ }
+
+ void init() {}
+
+ /**
+ * values produced...
+ */
+ unsigned val[MAX_TRANSPORTERS];
+
+ /**
+ * mask/array to keep track of which transporter we have produce values to
+ */
+ TransporterMask pendingmask;
+ unsigned pendingcount;
+ unsigned char pendinglist[MAX_TRANSPORTERS];
+
+ /**
+ * produce a value
+ *
+ * This is the equivalent of mt_send_remote()
+ */
+ void produce(unsigned D);
+
+ /**
+ * consume values (from all threads)
+ * for transporters that we have produced a value to
+ *
+ * This is the equivalent to do_send and if force=true
+ * this is the equivalent of forceSend()
+ */
+ void consume(bool force);
+};
+
+struct Thread
+{
+ Thread() { thread= 0; }
+
+ void init() { p.init(); }
+
+ NdbThread * thread;
+ Producer p;
+};
+
+/**
+ * This is the consumer of values for *one* transporter
+ */
+struct Consumer
+{
+ Consumer() {
+ m_force_send = 0; bzero(val, sizeof(val));
+ }
+
+ void init() {}
+
+ struct thr_spin_lock<8> m_send_lock;
+ unsigned m_force_send;
+ unsigned val[MAX_THREADS];
+
+ /**
+ * consume values from all threads to this transporter
+ */
+ void consume(unsigned D);
+
+ /**
+ * force_consume
+ */
+ void forceConsume(unsigned D);
+};
+
+struct Consumer_pad
+{
+ Consumer c;
+ char pad[NDB_CL_PADSZ(sizeof(Consumer))];
+};
+
+struct Thread_pad
+{
+ Thread t;
+ char pad[NDB_CL_PADSZ(sizeof(Thread))];
+};
+
+/**
+ * Thread repository
+ * and an instance of it
+ */
+static
+struct Rep
+{
+ Thread_pad t[MAX_THREADS];
+ Consumer_pad c[MAX_TRANSPORTERS];
+
+ /**
+ * This menthod is called when all threads are stalled
+ * so it's safe to read values without locks
+ */
+ void validate() {
+ for (unsigned ic = 0; ic < cnt_transporters; ic++)
+ {
+ for (unsigned it = 0; it < cnt_threads; it++)
+ {
+ if (c[ic].c.val[it] != t[it].t.p.val[ic])
+ {
+ printf("Detected bug!!!\n");
+ printf("ic: %u it: %u c[ic].c.val[it]: %u t[it].t.p.val[ic]: %u\n",
+ ic, it, c[ic].c.val[it], t[it].t.p.val[ic]);
+ abort();
+ }
+ }
+ }
+ }
+
+ void init() {
+ for (unsigned i = 0; i < cnt_threads; i++)
+ t[i].t.init();
+
+ for (unsigned i = 0; i < cnt_transporters; i++)
+ c[i].c.init();
+ }
+} rep;
+
+static
+struct Test
+{
+ Test() {
+ waiting_start = 0;
+ waiting_stop = 0;
+ mutex = 0;
+ cond = 0;
+ }
+
+ void init() {
+ mutex = NdbMutex_Create();
+ cond = NdbCondition_Create();
+ }
+
+ unsigned waiting_start;
+ unsigned waiting_stop;
+ NdbMutex* mutex;
+ NdbCondition* cond;
+
+ void wait_started();
+ void wait_completed();
+} test;
+
+void*
+thread_main(void * _t)
+{
+ unsigned seed =
+ (unsigned)NdbTick_CurrentNanosecond() +
+ (unsigned)(unsigned long long)_t;
+
+ Thread * self = (Thread*) _t;
+ for (unsigned i = 0; i < cnt_inner_loops; i++)
+ {
+ test.wait_started();
+ for (unsigned j = 0; j < cnt_signals_per_inner_loop;)
+ {
+ for (unsigned k = 0; k < cnt_signals_before_consume; k++)
+ {
+ /**
+ * Produce a signal to destination D
+ */
+ unsigned D = unsigned(ndb_rand_r(&seed)) % cnt_transporters;
+ self->p.produce(D);
+ }
+
+ j += cnt_signals_before_consume;
+
+ /**
+ * This is the equivalent of do_send()
+ */
+ bool force = unsigned(ndb_rand_r(&seed) % 100) < pct_force;
+ self->p.consume(force);
+ }
+ test.wait_completed();
+ }
+ return 0;
+}
+
+static
+bool
+match(const char * arg, const char * val, unsigned * valptr)
+{
+ if (strncmp(arg, val, strlen(val)) == 0)
+ {
+ * valptr = atoi(arg + strlen(val));
+ return true;
+ }
+ return false;
+}
+
+int
+main(int argc, char ** argv)
+{
+ plan(1);
+ ndb_init();
+ test.init();
+ rep.init();
+
+ if (argc == 1)
+ {
+ printf("No arguments supplied...\n"
+ "assuming we're being run from MTR or similar.\n"
+ "decreasing loop counts to ridiculously small values...\n");
+ cnt_seconds = 10;
+ cnt_inner_loops = 3000;
+ cnt_threads = 4;
+ }
+ else
+ {
+ printf("Arguments supplied...\n");
+ for (int i = 1; i < argc; i++)
+ {
+ if (match(argv[i], "cnt_seconds=", &cnt_seconds))
+ continue;
+ else if (match(argv[i], "cnt_threads=", &cnt_threads))
+ continue;
+ else if (match(argv[i], "cnt_transporters=", &cnt_transporters))
+ continue;
+ else if (match(argv[i], "cnt_inner_loops=", &cnt_inner_loops))
+ continue;
+ else if (match(argv[i], "cnt_signals_before_consume=",
+ &cnt_signals_before_consume))
+ continue;
+ else if (match(argv[i], "cnt_signals_per_inner_loop=",
+ &cnt_signals_per_inner_loop))
+ continue;
+ else if (match(argv[i], "pct_force=",
+ &pct_force))
+ continue;
+ else
+ {
+ printf("ignoreing unknown argument: %s\n", argv[i]);
+ }
+ }
+ }
+
+ printf("%s"
+ " cnt_seconds=%u"
+ " cnt_threads=%u"
+ " cnt_transporters=%u"
+ " cnt_inner_loops=%u"
+ " cnt_signals_before_consume=%u"
+ " cnt_signals_per_inner_loop=%u"
+ " pct_force=%u"
+ "\n",
+ argv[0],
+ cnt_seconds,
+ cnt_threads,
+ cnt_transporters,
+ cnt_inner_loops,
+ cnt_signals_before_consume,
+ cnt_signals_per_inner_loop,
+ pct_force);
+
+ Uint32 loop = 0;
+ Uint64 start = NdbTick_CurrentMillisecond() / 1000;
+ while (start + cnt_seconds > (NdbTick_CurrentMillisecond() / 1000))
+ {
+ printf("%u ", loop++); fflush(stdout);
+ if ((loop < 100 && (loop % 25) == 0) ||
+ (loop >= 100 && (loop % 20) == 0))
+ printf("\n");
+
+ for (unsigned t = 0; t < cnt_threads; t++)
+ {
+ rep.t[t].t.thread = NdbThread_Create(thread_main,
+ (void**)&rep.t[t].t,
+ 1024*1024,
+ "execute thread",
+ NDB_THREAD_PRIO_MEAN);
+ }
+
+ for (unsigned t = 0; t < cnt_threads; t++)
+ {
+ void * ret;
+ NdbThread_WaitFor(rep.t[t].t.thread, &ret);
+ }
+ }
+ printf("\n"); fflush(stdout);
+
+ ok(true, "ok");
+ return 0;
+}
+
+inline
+void
+Producer::produce(unsigned D)
+{
+ if (!pendingmask.get(D))
+ {
+ pendingmask.set(D);
+ pendinglist[pendingcount] = D;
+ pendingcount++;
+ }
+ val[D]++;
+}
+
+inline
+void
+Producer::consume(bool force)
+{
+ unsigned count = pendingcount;
+ pendingmask.clear();
+ pendingcount = 0;
+
+ for (unsigned i = 0; i < count; i++)
+ {
+ unsigned D = pendinglist[i];
+ if (force)
+ rep.c[D].c.forceConsume(D);
+ else
+ rep.c[D].c.consume(D);
+ }
+}
+
+inline
+void
+Consumer::consume(unsigned D)
+{
+ /**
+ * This is the equivalent of do_send(must_send = 1)
+ */
+ m_force_send = 1;
+
+ do
+ {
+ if (trylock(&m_send_lock) != 0)
+ {
+ /* Other thread will send for us as we set m_force_send. */
+ return;
+ }
+
+ /**
+ * Now clear the flag, and start sending all data available to this node.
+ *
+ * Put a memory barrier here, so that if another thread tries to grab
+ * the send lock but fails due to us holding it here, we either
+ * 1) Will see m_force_send[nodeId] set to 1 at the end of the loop, or
+ * 2) We clear here the flag just set by the other thread, but then we
+ * will (thanks to mb()) be able to see and send all of the data already
+ * in the first send iteration.
+ */
+ m_force_send = 0;
+ mb();
+
+ /**
+ * This is the equivalent of link_thread_send_buffers
+ */
+ for (unsigned i = 0; i < cnt_threads; i++)
+ {
+ val[i] = rep.t[i].t.p.val[D];
+ }
+
+ /**
+ * Do a syscall...which could have affect on barriers...etc
+ */
+ if (DO_SYSCALL)
+ {
+ NdbTick_CurrentMillisecond();
+ }
+
+ unlock(&m_send_lock);
+
+#if BUGGY_VERSION
+#else
+ mb();
+#endif
+ }
+ while (m_force_send != 0);
+}
+
+inline
+void
+Consumer::forceConsume(unsigned D)
+{
+ /**
+ * This is the equivalent of forceSend()
+ */
+
+ do
+ {
+ /**
+ * NOTE: since we unconditionally lock m_send_lock
+ * we don't need a mb() after the clearing of m_force_send here.
+ */
+ m_force_send = 0;
+
+ lock(&m_send_lock);
+
+ /**
+ * This is the equivalent of link_thread_send_buffers
+ */
+ for (unsigned i = 0; i < cnt_threads; i++)
+ {
+ val[i] = rep.t[i].t.p.val[D];
+ }
+
+ /**
+ * Do a syscall...which could have affect on barriers...etc
+ */
+ if (DO_SYSCALL)
+ {
+ NdbTick_CurrentMillisecond();
+ }
+
+ unlock(&m_send_lock);
+
+#if BUGGY_VERSION
+#else
+ mb();
+#endif
+ }
+ while (m_force_send != 0);
+}
+
+void
+Test::wait_started()
+{
+ NdbMutex_Lock(mutex);
+ if (waiting_start + 1 == cnt_threads)
+ {
+ waiting_stop = 0;
+ }
+ waiting_start++;
+ assert(waiting_start <= cnt_threads);
+ while (waiting_start < cnt_threads)
+ NdbCondition_Wait(cond, mutex);
+
+ NdbCondition_Broadcast(cond);
+ NdbMutex_Unlock(mutex);
+}
+
+void
+Test::wait_completed()
+{
+ NdbMutex_Lock(mutex);
+ if (waiting_stop + 1 == cnt_threads)
+ {
+ rep.validate();
+ waiting_start = 0;
+ }
+ waiting_stop++;
+ assert(waiting_stop <= cnt_threads);
+ while (waiting_stop < cnt_threads)
+ NdbCondition_Wait(cond, mutex);
+
+ NdbCondition_Broadcast(cond);
+ NdbMutex_Unlock(mutex);
+}
+
+static
+void
+register_lock(const void * ptr, const char * name)
+{
+ return;
+}
+
+static
+mt_lock_stat *
+lookup_lock(const void * ptr)
+{
+ return 0;
+}
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2012-02-23 15:41:31 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2012-03-21 12:31:02 +0000
@@ -36,6 +36,7 @@
#include <portlib/ndb_prefetch.h>
#include "mt-asm.h"
+#include "mt-lock.hpp"
inline
SimulatedBlock*
@@ -96,16 +97,6 @@ static Uint32 first_receiver_thread_no =
/* max signal is 32 words, 7 for signal header and 25 datawords */
#define MIN_SIGNALS_PER_PAGE (thr_job_buffer::SIZE / 32)
-struct mt_lock_stat
-{
- const void * m_ptr;
- char * m_name;
- Uint32 m_contended_count;
- Uint32 m_spin_count;
-};
-static void register_lock(const void * ptr, const char * name);
-static mt_lock_stat * lookup_lock(const void * ptr);
-
#if defined(HAVE_LINUX_FUTEX) && defined(NDB_HAVE_XCNG)
#define USE_FUTEX
#endif
@@ -274,132 +265,6 @@ wakeup(struct thr_wait* wait)
#endif
-#ifdef NDB_HAVE_XCNG
-template <unsigned SZ>
-struct thr_spin_lock
-{
- thr_spin_lock(const char * name = 0)
- {
- m_lock = 0;
- register_lock(this, name);
- }
-
- union {
- volatile Uint32 m_lock;
- char pad[SZ];
- };
-};
-
-static
-ATTRIBUTE_NOINLINE
-void
-lock_slow(void * sl, volatile unsigned * val)
-{
- mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock
-
-loop:
- Uint32 spins = 0;
- do {
- spins++;
- cpu_pause();
- } while (* val == 1);
-
- if (unlikely(xcng(val, 1) != 0))
- goto loop;
-
- if (s)
- {
- s->m_spin_count += spins;
- Uint32 count = ++s->m_contended_count;
- Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));
-
- if ((count % freq) == 0)
- printf("%s waiting for lock, contentions: %u spins: %u\n",
- s->m_name, count, s->m_spin_count);
- }
-}
-
-template <unsigned SZ>
-static
-inline
-void
-lock(struct thr_spin_lock<SZ>* sl)
-{
- volatile unsigned* val = &sl->m_lock;
- if (likely(xcng(val, 1) == 0))
- return;
-
- lock_slow(sl, val);
-}
-
-template <unsigned SZ>
-static
-inline
-void
-unlock(struct thr_spin_lock<SZ>* sl)
-{
- /**
- * Memory barrier here, to make sure all of our stores are visible before
- * the lock release is.
- */
- mb();
- sl->m_lock = 0;
-}
-
-template <unsigned SZ>
-static
-inline
-int
-trylock(struct thr_spin_lock<SZ>* sl)
-{
- volatile unsigned* val = &sl->m_lock;
- return xcng(val, 1);
-}
-#else
-#define thr_spin_lock thr_mutex
-#endif
-
-template <unsigned SZ>
-struct thr_mutex
-{
- thr_mutex(const char * name = 0) {
- NdbMutex_Init(&m_mutex);
- register_lock(this, name);
- }
-
- union {
- NdbMutex m_mutex;
- char pad[SZ];
- };
-};
-
-template <unsigned SZ>
-static
-inline
-void
-lock(struct thr_mutex<SZ>* sl)
-{
- NdbMutex_Lock(&sl->m_mutex);
-}
-
-template <unsigned SZ>
-static
-inline
-void
-unlock(struct thr_mutex<SZ>* sl)
-{
- NdbMutex_Unlock(&sl->m_mutex);
-}
-
-template <unsigned SZ>
-static
-inline
-int
-trylock(struct thr_mutex<SZ> * sl)
-{
- return NdbMutex_Trylock(&sl->m_mutex);
-}
-
/**
* thr_safe_pool
*/
@@ -2719,6 +2584,13 @@ pack_send_buffer(thr_data *selfptr, Uint
* After having locked/unlock m_send_lock
* "protocol" dictates that we must check the m_force_send
*/
+
+ /**
+ * We need a memory barrier here to prevent race between clearing lock
+ * and reading of m_force_send.
+ * CPU can reorder the load to before the clear of the lock
+ */
+ mb();
if (sb->m_force_send)
{
try_send(selfptr, node);
@@ -2787,7 +2659,13 @@ mt_send_handle::forceSend(NodeId nodeId)
do
{
+ /**
+ * NOTE: we don't need a memory barrier after clearing
+ * m_force_send here as we unconditionally lock m_send_lock
+ * hence there is no way that our data can be "unsent"
+ */
sb->m_force_send = 0;
+
lock(&sb->m_send_lock);
sb->m_send_thread = selfptr->m_thr_no;
globalTransporterRegistry.performSend(nodeId);
@@ -2799,6 +2677,12 @@ mt_send_handle::forceSend(NodeId nodeId)
*/
selfptr->m_send_buffer_pool.release_global(rep->m_mm,
RG_TRANSPORTER_BUFFERS);
+ /**
+ * We need a memory barrier here to prevent race between clearing lock
+ * and reading of m_force_send.
+ * CPU can reorder the load to before the clear of the lock
+ */
+ mb();
} while (sb->m_force_send);
return true;
@@ -2821,6 +2705,16 @@ try_send(thr_data * selfptr, Uint32 node
return;
}
+ /**
+ * Now clear the flag, and start sending all data available to this node.
+ *
+ * Put a memory barrier here, so that if another thread tries to grab
+ * the send lock but fails due to us holding it here, we either
+ * 1) Will see m_force_send[nodeId] set to 1 at the end of the loop, or
+ * 2) We clear here the flag just set by the other thread, but then we
+ * will (thanks to mb()) be able to see and send all of the data already
+ * in the first send iteration.
+ */
sb->m_force_send = 0;
mb();
@@ -2834,6 +2728,13 @@ try_send(thr_data * selfptr, Uint32 node
*/
selfptr->m_send_buffer_pool.release_global(rep->m_mm,
RG_TRANSPORTER_BUFFERS);
+
+ /**
+ * We need a memory barrier here to prevent race between clearing lock
+ * and reading of m_force_send.
+ * CPU can reorder the load to before the clear of the lock
+ */
+ mb();
} while (sb->m_force_send);
}
@@ -2966,6 +2867,13 @@ do_send(struct thr_data* selfptr, bool m
{
register_pending_send(selfptr, node);
}
+
+ /**
+ * We need a memory barrier here to prevent race between clearing lock
+ * and reading of m_force_send.
+ * CPU can reorder the load to before the clear of the lock
+ */
+ mb();
if (sb->m_force_send)
{
/**
=== modified file 'storage/ndb/test/run-test/conf-blade08.cnf'
--- a/storage/ndb/test/run-test/conf-blade08.cnf 2011-05-19 17:47:28 +0000
+++ b/storage/ndb/test/run-test/conf-blade08.cnf 2012-03-21 14:43:07 +0000
@@ -26,6 +26,7 @@ FragmentLogFileSize = 64M
CompressedLCP=1
CompressedBackup=1
ODirect=1
+MaxNoOfAttributes=2000
SharedGlobalMemory=256M
InitialLogfileGroup=undo_buffer_size=64M;undofile01.dat:256M;undofile02.dat:128M
=== modified file 'storage/ndb/test/run-test/conf-ndb07.cnf'
--- a/storage/ndb/test/run-test/conf-ndb07.cnf 2011-05-19 18:19:47 +0000
+++ b/storage/ndb/test/run-test/conf-ndb07.cnf 2012-03-21 14:43:07 +0000
@@ -31,6 +31,7 @@ MaxNoOfSavedMessages= 5
NoOfFragmentLogFiles = 8
FragmentLogFileSize = 64M
ODirect=1
+MaxNoOfAttributes=2000
SharedGlobalMemory=256M
DiskPageBufferMemory=256M
=== modified file 'storage/ndb/test/run-test/conf-tyr13.cnf'
--- a/storage/ndb/test/run-test/conf-tyr13.cnf 2011-12-09 15:11:21 +0000
+++ b/storage/ndb/test/run-test/conf-tyr13.cnf 2012-03-21 14:43:07 +0000
@@ -34,6 +34,7 @@ FragmentLogFileSize = 64M
ODirect=1
MaxNoOfExecutionThreads=8
SendBufferMemory=4M
+MaxNoOfAttributes=2000
SharedGlobalMemory=256M
DiskPageBufferMemory=256M
=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt 2012-02-23 15:41:31 +0000
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt 2012-03-21 17:28:20 +0000
@@ -946,7 +946,7 @@ max-time: 500
cmd: testNdbApi
args: -n Bug44065
-max-time: 500
+max-time: 1000
cmd: testNdbApi
args: -n Bug44065_org
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2-spj branch (ole.john.aske:3856 to 3857) | Ole John Aske | 23 Mar |