From: Ole John Aske Date: March 23 2012 7:44am Subject: bzr push into mysql-5.5-cluster-7.2-spj branch (ole.john.aske:3856 to 3857) List-Archive: http://lists.mysql.com/commits/143293 Message-Id: <20120323074453.88D92244@fimafeng09.norway.sun.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3857 Ole John Aske 2012-03-23 [merge] Merge mysql-5.5-cluster-7.2 ==> mysql-5.5-cluster-7.2-spj added: storage/ndb/src/kernel/vm/mt-lock.hpp storage/ndb/src/kernel/vm/mt-send-t.cpp modified: mysql-test/suite/ndb/r/ndb_add_partition.result mysql-test/suite/ndb/t/ndb_add_partition.test mysql-test/suite/ndb/t/ndb_addnode.test sql/ha_ndbcluster.cc sql/handler.h sql/sql_table.cc storage/ndb/include/util/InputStream.hpp storage/ndb/memcache/include/NdbInstance.h storage/ndb/memcache/include/QueryPlan.h storage/ndb/memcache/include/Queue.h storage/ndb/memcache/scripts/pmpstack.awk storage/ndb/memcache/src/NdbInstance.cc storage/ndb/memcache/src/ndb_worker.cc storage/ndb/memcache/src/schedulers/S_sched.cc storage/ndb/src/common/transporter/Transporter.cpp storage/ndb/src/kernel/vm/CMakeLists.txt storage/ndb/src/kernel/vm/mt.cpp storage/ndb/test/run-test/conf-blade08.cnf storage/ndb/test/run-test/conf-ndb07.cnf storage/ndb/test/run-test/conf-tyr13.cnf storage/ndb/test/run-test/daily-basic-tests.txt 3856 Ole John Aske 2012-03-22 SPJ: Implement native support of pushed queries being sorted-scan-scan query. Until now we didn't allow a scan-scan query which required 'sorted' result set to be pushed to the SPJ block. Instead we suppressed the 'sorted' request and dumped the result set to a temporary file and 'filesorted' it. (Explained as 'Using tempfile; Using filesort') This has been commented as 'questionable at best' by Evgeny, who is doing the review of WL5940: 'Integrating pushed join with optimizer & handler interface' So we want to change it..... 
This fix implements native support of sorted scan-scan by setting the parent-scan batchsize to '1' when a sorted-scan-scan request is about to be sent to the SPJ block. (Dependent child scans/lookups are retrieved with 'normal' batchsize) SPJ API has to NEXTREQ more results until all related child-scan results for the single parent row have been retrieved - This will guarantee sorted results as all child rows will be retrieved together with their related parent. (at the cost of some overhead though). modified: mysql-test/suite/ndb/r/ndb_join_pushdown_default.result sql/abstract_query_plan.cc sql/abstract_query_plan.h sql/ha_ndbcluster.cc sql/ha_ndbcluster.h sql/ha_ndbcluster_push.cc sql/ha_ndbcluster_push.h sql/handler.h sql/sql_select.cc storage/ndb/src/ndbapi/NdbQueryBuilder.cpp storage/ndb/src/ndbapi/NdbQueryOperation.cpp storage/ndb/test/ndbapi/testSpj.cpp storage/ndb/test/tools/spj_sanity_test.cpp === modified file 'mysql-test/suite/ndb/r/ndb_add_partition.result' --- a/mysql-test/suite/ndb/r/ndb_add_partition.result 2011-05-12 11:31:21 +0000 +++ b/mysql-test/suite/ndb/r/ndb_add_partition.result 2012-03-21 15:42:47 +0000 @@ -24,6 +24,19 @@ ENGINE = NDB STORAGE DISK TABLESPACE ts1 partition by key(a); +CREATE TABLE t3 (a int unsigned not null, +b int unsigned not null, +c int unsigned not null, +primary key(a,b), +unique (b)) +MAX_ROWS=50000000 +ENGINE = NDB; +CREATE TABLE t4 (a int unsigned not null, +b int unsigned not null, +c int unsigned not null, +primary key(a,b), +unique (b)) +ENGINE = NDB; INSERT INTO t1 VALUES (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5), (6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10), @@ -46,12 +59,20 @@ INSERT INTO t1 VALUES (91,91,91),(92,92,92),(93,93,93),(94,94,94),(95,95,95), (96,96,96),(97,97,97),(98,98,98),(99,99,99),(100,100,100); insert into t2 select * from t1; +insert into t3 select * from t1; +insert into t4 select * from t1; select count(*) from t1; count(*) 100 select count(*) from t2; count(*) 100 +select count(*) from t3; +count(*) 
+100 +select count(*) from t4; +count(*) +100 select * from t1 where a < 20; a b c 1 1 1 @@ -142,24 +163,146 @@ a b c select * from t2 where b = 50; a b c 50 50 50 +select * from t3 where a < 20; +a b c +1 1 1 +10 10 10 +11 11 11 +12 12 12 +13 13 13 +14 14 14 +15 15 15 +16 16 16 +17 17 17 +18 18 18 +19 19 19 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 +9 9 9 +select * from t3 where a = 20; +a b c +20 20 20 +select * from t3 where a = 30; +a b c +30 30 30 +select * from t3 where a = 40; +a b c +40 40 40 +select * from t3 where a = 50; +a b c +50 50 50 +select * from t3 where b = 20; +a b c +20 20 20 +select * from t3 where b = 30; +a b c +30 30 30 +select * from t3 where b = 40; +a b c +40 40 40 +select * from t3 where b = 50; +a b c +50 50 50 +select * from t4 where a < 20; +a b c +1 1 1 +10 10 10 +11 11 11 +12 12 12 +13 13 13 +14 14 14 +15 15 15 +16 16 16 +17 17 17 +18 18 18 +19 19 19 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 +9 9 9 +select * from t4 where a = 20; +a b c +20 20 20 +select * from t4 where a = 30; +a b c +30 30 30 +select * from t4 where a = 40; +a b c +40 40 40 +select * from t4 where a = 50; +a b c +50 50 50 +select * from t4 where b = 20; +a b c +20 20 20 +select * from t4 where b = 30; +a b c +30 30 30 +select * from t4 where b = 40; +a b c +40 40 40 +select * from t4 where b = 50; +a b c +50 50 50 alter online table t1 reorganize partition; alter online table t2 reorganize partition; +Cannot use normal reorganize partition on t3 as it has explicit MAX_ROWS set +alter online table t3 reorganize partition; +ERROR 42000: This version of MySQL doesn't yet support 'alter online table t3 reorganize partition' +show warnings; +Level Code Message +Warning 1105 Cannot online REORGANIZE a table with Max_Rows set. Use ALTER TABLE ... MAX_ROWS= or offline REORGANIZE to redistribute this table. 
+Error 1235 This version of MySQL doesn't yet support 'alter online table t3 reorganize partition' +alter online table t3 max_rows=50000000; +alter online table t4 reorganize partition; +Check partitions added, expect 0 in all cases partitions added to t1 t1_added 0 partitions added to t2 t2_added 0 +partitions added to t3 +t3_added +0 +partitions added to t4 +t4_added +0 alter online table t1 add partition partitions 1; alter online table t2 add partition partitions 4; -partitions added to t1 +alter online table t3 max_rows=100000000; +alter online table t4 max_rows=100000000; +ERROR 42000: This version of MySQL doesn't yet support 'alter online table t4 max_rows=100000000' +partitions added to t1 (expect 1) t1_added 1 -partitions added to t2 +partitions added to t2 (expect 4) t2_added 4 +partitions added to t3 (expect 2) +t3_added +2 +partitions added to t4 (expect 0) +t4_added +0 alter online table t1 reorganize partition; ERROR HY000: REORGANIZE PARTITION without parameters can only be used on auto-partitioned tables using HASH PARTITIONs +alter online table t3 reorganize partition; +ERROR 42000: This version of MySQL doesn't yet support 'alter online table t3 reorganize partition' +show warnings; +Level Code Message +Warning 1105 Cannot online REORGANIZE a table with Max_Rows set. Use ALTER TABLE ... MAX_ROWS= or offline REORGANIZE to redistribute this table. 
+Error 1235 This version of MySQL doesn't yet support 'alter online table t3 reorganize partition' +alter online table t4 reorganize partition; select count(*) from t1; count(*) 100 @@ -256,14 +399,109 @@ a b c select * from t2 where b = 50; a b c 50 50 50 +select * from t3 where a < 20; +a b c +1 1 1 +10 10 10 +11 11 11 +12 12 12 +13 13 13 +14 14 14 +15 15 15 +16 16 16 +17 17 17 +18 18 18 +19 19 19 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 +9 9 9 +select * from t3 where a = 20; +a b c +20 20 20 +select * from t3 where a = 30; +a b c +30 30 30 +select * from t3 where a = 40; +a b c +40 40 40 +select * from t3 where a = 50; +a b c +50 50 50 +select * from t3 where b = 20; +a b c +20 20 20 +select * from t3 where b = 30; +a b c +30 30 30 +select * from t3 where b = 40; +a b c +40 40 40 +select * from t3 where b = 50; +a b c +50 50 50 +select * from t4 where a < 20; +a b c +1 1 1 +10 10 10 +11 11 11 +12 12 12 +13 13 13 +14 14 14 +15 15 15 +16 16 16 +17 17 17 +18 18 18 +19 19 19 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 +9 9 9 +select * from t4 where a = 20; +a b c +20 20 20 +select * from t4 where a = 30; +a b c +30 30 30 +select * from t4 where a = 40; +a b c +40 40 40 +select * from t4 where a = 50; +a b c +50 50 50 +select * from t4 where b = 20; +a b c +20 20 20 +select * from t4 where b = 30; +a b c +30 30 30 +select * from t4 where b = 40; +a b c +40 40 40 +select * from t4 where b = 50; +a b c +50 50 50 +drop table t4; alter online table t1 add partition partitions 2; alter online table t2 add partition partitions 1; -partitions added to t1 +alter online table t3 max_rows=150000000; +partitions added to t1 (expect 3) t1_added 3 -partitions added to t2 +partitions added to t2 (expect 5) t2_added 5 +partitions added to t3 (expect 4) +t3_added +4 select count(*) from t1; count(*) 100 @@ -360,7 +598,52 @@ a b c select * from t2 where b = 50; a b c 50 50 50 -drop table t1,t2; +select * from t3 where a < 20; +a b c +1 1 1 +10 10 10 +11 11 11 +12 12 12 
+13 13 13 +14 14 14 +15 15 15 +16 16 16 +17 17 17 +18 18 18 +19 19 19 +2 2 2 +3 3 3 +4 4 4 +5 5 5 +6 6 6 +7 7 7 +8 8 8 +9 9 9 +select * from t3 where a = 20; +a b c +20 20 20 +select * from t3 where a = 30; +a b c +30 30 30 +select * from t3 where a = 40; +a b c +40 40 40 +select * from t3 where a = 50; +a b c +50 50 50 +select * from t3 where b = 20; +a b c +20 20 20 +select * from t3 where b = 30; +a b c +30 30 30 +select * from t3 where b = 40; +a b c +40 40 40 +select * from t3 where b = 50; +a b c +50 50 50 +drop table t1,t2,t3; alter tablespace ts1 drop datafile 'datafile.dat' engine = ndb; drop tablespace ts1 engine = ndb; drop logfile group lg1 engine = ndb; === modified file 'mysql-test/suite/ndb/t/ndb_add_partition.test' --- a/mysql-test/suite/ndb/t/ndb_add_partition.test 2011-05-12 14:34:22 +0000 +++ b/mysql-test/suite/ndb/t/ndb_add_partition.test 2012-03-21 17:28:20 +0000 @@ -34,10 +34,29 @@ STORAGE DISK TABLESPACE ts1 partition by key(a); +CREATE TABLE t3 (a int unsigned not null, + b int unsigned not null, + c int unsigned not null, + primary key(a,b), + unique (b)) +MAX_ROWS=50000000 +ENGINE = NDB; + +CREATE TABLE t4 (a int unsigned not null, + b int unsigned not null, + c int unsigned not null, + primary key(a,b), + unique (b)) +ENGINE = NDB; + let $t1_part_count_start = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't1', Value, 1); let $t2_part_count_start = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't2', Value, 1); +let $t3_part_count_start = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't3', Value, 1); + +let $t4_part_count_start = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't4', Value, 1); + INSERT INTO t1 VALUES 
(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5), (6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10), @@ -61,8 +80,12 @@ INSERT INTO t1 VALUES (96,96,96),(97,97,97),(98,98,98),(99,99,99),(100,100,100); insert into t2 select * from t1; +insert into t3 select * from t1; +insert into t4 select * from t1; select count(*) from t1; select count(*) from t2; +select count(*) from t3; +select count(*) from t4; --sorted_result select * from t1 where a < 20; --sorted_result @@ -93,32 +116,89 @@ select * from t2 where b = 30; select * from t2 where b = 40; select * from t2 where b = 50; +--sorted_result +select * from t3 where a < 20; +--sorted_result +select * from t3 where a = 20; +--sorted_result +select * from t3 where a = 30; +--sorted_result +select * from t3 where a = 40; +--sorted_result +select * from t3 where a = 50; +select * from t3 where b = 20; +select * from t3 where b = 30; +select * from t3 where b = 40; +select * from t3 where b = 50; + +--sorted_result +select * from t4 where a < 20; +--sorted_result +select * from t4 where a = 20; +--sorted_result +select * from t4 where a = 30; +--sorted_result +select * from t4 where a = 40; +--sorted_result +select * from t4 where a = 50; +select * from t4 where b = 20; +select * from t4 where b = 30; +select * from t4 where b = 40; +select * from t4 where b = 50; + alter online table t1 reorganize partition; alter online table t2 reorganize partition; +--echo Cannot use normal reorganize partition on t3 as it has explicit MAX_ROWS set +--error 1235 +alter online table t3 reorganize partition; +show warnings; + +alter online table t3 max_rows=50000000; +alter online table t4 reorganize partition; let $t1_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't1', Value, 1); let $t2_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't2', Value, 1); +let 
$t3_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't3', Value, 1); + +let $t4_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't4', Value, 1); + --disable_query_log +--echo Check partitions added, expect 0 in all cases --echo partitions added to t1 eval select $t1_part_count_now - $t1_part_count_start as t1_added; --echo partitions added to t2 eval select $t2_part_count_now - $t2_part_count_start as t2_added; +--echo partitions added to t3 +eval select $t3_part_count_now - $t3_part_count_start as t3_added; +--echo partitions added to t4 +eval select $t4_part_count_now - $t4_part_count_start as t4_added; --enable_query_log alter online table t1 add partition partitions 1; alter online table t2 add partition partitions 4; +alter online table t3 max_rows=100000000; # Expansion of max rows +--error 1235 +alter online table t4 max_rows=100000000; # Attempted introduction of max rows - fails let $t1_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't1', Value, 1); let $t2_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't2', Value, 1); +let $t3_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't3', Value, 1); + +let $t4_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't4', Value, 1); + --disable_query_log ---echo partitions added to t1 +--echo partitions added to t1 (expect 1) eval select $t1_part_count_now - $t1_part_count_start as t1_added; ---echo partitions added to t2 +--echo partitions added to t2 (expect 4) eval select 
$t2_part_count_now - $t2_part_count_start as t2_added; +--echo partitions added to t3 (expect 2) +eval select $t3_part_count_now - $t3_part_count_start as t3_added; +--echo partitions added to t4 (expect 0) +eval select $t4_part_count_now - $t4_part_count_start as t4_added; --enable_query_log # reorganize partition not support if not default partitioning @@ -126,6 +206,14 @@ eval select $t2_part_count_now - $t2_par --error ER_REORG_NO_PARAM_ERROR alter online table t1 reorganize partition; +# Following will fail as t3 has an explicit MAX_ROWS set +--error 1235 +alter online table t3 reorganize partition; +show warnings; + +# t4 reorg will succeed as t4 has no explicit MAX_ROWS +alter online table t4 reorganize partition; + select count(*) from t1; select count(*) from t2; --sorted_result @@ -158,18 +246,55 @@ select * from t2 where b = 30; select * from t2 where b = 40; select * from t2 where b = 50; +--sorted_result +select * from t3 where a < 20; +--sorted_result +select * from t3 where a = 20; +--sorted_result +select * from t3 where a = 30; +--sorted_result +select * from t3 where a = 40; +--sorted_result +select * from t3 where a = 50; +select * from t3 where b = 20; +select * from t3 where b = 30; +select * from t3 where b = 40; +select * from t3 where b = 50; + +--sorted_result +select * from t4 where a < 20; +--sorted_result +select * from t4 where a = 20; +--sorted_result +select * from t4 where a = 30; +--sorted_result +select * from t4 where a = 40; +--sorted_result +select * from t4 where a = 50; +select * from t4 where b = 20; +select * from t4 where b = 30; +select * from t4 where b = 40; +select * from t4 where b = 50; + +drop table t4; + alter online table t1 add partition partitions 2; alter online table t2 add partition partitions 1; +alter online table t3 max_rows=150000000; let $t1_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't1', Value, 1); let 
$t2_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't2', Value, 1); +let $t3_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't3', Value, 1); + --disable_query_log ---echo partitions added to t1 +--echo partitions added to t1 (expect 3) eval select $t1_part_count_now - $t1_part_count_start as t1_added; ---echo partitions added to t2 +--echo partitions added to t2 (expect 5) eval select $t2_part_count_now - $t2_part_count_start as t2_added; +--echo partitions added to t3 (expect 4) +eval select $t3_part_count_now - $t3_part_count_start as t3_added; --enable_query_log select count(*) from t1; @@ -204,7 +329,22 @@ select * from t2 where b = 30; select * from t2 where b = 40; select * from t2 where b = 50; -drop table t1,t2; +--sorted_result +select * from t3 where a < 20; +--sorted_result +select * from t3 where a = 20; +--sorted_result +select * from t3 where a = 30; +--sorted_result +select * from t3 where a = 40; +--sorted_result +select * from t3 where a = 50; +select * from t3 where b = 20; +select * from t3 where b = 30; +select * from t3 where b = 40; +select * from t3 where b = 50; + +drop table t1,t2,t3; alter tablespace ts1 drop datafile 'datafile.dat' engine = ndb; drop tablespace ts1 engine = ndb; drop logfile group lg1 engine = ndb; === modified file 'mysql-test/suite/ndb/t/ndb_addnode.test' --- a/mysql-test/suite/ndb/t/ndb_addnode.test 2011-05-09 08:49:19 +0000 +++ b/mysql-test/suite/ndb/t/ndb_addnode.test 2012-03-21 17:28:20 +0000 @@ -20,9 +20,19 @@ CREATE TABLESPACE ts_1 create table t1(id int NOT NULL PRIMARY KEY, data char(8)) engine=ndb; create table t2(id int NOT NULL PRIMARY KEY, data char(8)) TABLESPACE ts_1 STORAGE DISK engine=ndb; +# BUG#13714648 +create table t5(id int NOT NULL PRIMARY KEY, data char(8)) max_rows=50000000 engine=ndb; load data local infile 
'suite/ndb/data/table_data10000.dat' into table t1 fields terminated by ' ' lines terminated by '\n'; load data local infile 'suite/ndb/data/table_data10000.dat' into table t2 fields terminated by ' ' lines terminated by '\n'; +load data local infile 'suite/ndb/data/table_data10000.dat' into table t5 fields terminated by ' ' lines terminated by '\n'; + +select count(1) as t1_part_count from information_schema.partitions where table_schema='test' and table_name='t1'; +select count(1) as t2_part_count from information_schema.partitions where table_schema='test' and table_name='t2'; +select @init_t5_part_count:= count(1) as t5_part_count from information_schema.partitions where table_schema='test' and table_name='t5'; + +## Check details of t5 partitioning +--exec $NDB_DESC -dtest -p -n t5 ## Create nodegroup for "new" nodes --exec $NDB_MGM -e "create nodegroup 3,4" @@ -48,6 +58,23 @@ insert into t4(id, data) VALUES alter online table t1 reorganize partition; alter online table t2 reorganize partition; +alter online table t5 max_rows=100000000; + +select count(1) as t1_part_count from information_schema.partitions where table_schema='test' and table_name='t1'; +select count(1) as t2_part_count from information_schema.partitions where table_schema='test' and table_name='t2'; +select count(1) as t3_part_count from information_schema.partitions where table_schema='test' and table_name='t3'; +select count(1) as t4_part_count from information_schema.partitions where table_schema='test' and table_name='t4'; +select @reorg_t5_part_count:= count(1) as t5_part_count from information_schema.partitions where table_schema='test' and table_name='t5'; + +## Check details of t5 partitioning +--exec $NDB_DESC -dtest -p -n t5 + +--let $t5_part_diff=query_get_value('select @reorg_t5_part_count-@init_t5_part_count as Value',Value,1) + +if (!$t5_part_diff) +{ + --die Table t5 was not reorganised +} ## Drop nodegroup with "new" nodes is not allowed with data one those nodes # NOTE: 
--error=0 is due to return codes doesnt work on windoze @@ -57,7 +84,7 @@ alter online table t2 reorganize partiti ## Nodegroup with "new" nodes still exist after dropping it as shown: --exec $NDB_MGM -e show -drop table t1,t2,t3,t4; +drop table t1,t2,t3,t4,t5; ## Drop nodegroup with "new" nodes --exec $NDB_MGM -e "drop nodegroup 1" === modified file 'sql/ha_ndbcluster.cc' --- a/sql/ha_ndbcluster.cc 2012-03-22 14:18:01 +0000 +++ b/sql/ha_ndbcluster.cc 2012-03-23 07:44:14 +0000 @@ -15580,7 +15580,9 @@ HA_ALTER_FLAGS supported_alter_operation HA_COLUMN_FORMAT | HA_ADD_PARTITION | HA_ALTER_TABLE_REORG | - HA_CHANGE_AUTOINCREMENT_VALUE; + HA_CHANGE_AUTOINCREMENT_VALUE | + HA_ALTER_MAX_ROWS +; } int ha_ndbcluster::check_if_supported_alter(TABLE *altered_table, @@ -15651,7 +15653,8 @@ int ha_ndbcluster::check_if_supported_al if (alter_flags->is_set(HA_ADD_COLUMN) || alter_flags->is_set(HA_ADD_PARTITION) || - alter_flags->is_set(HA_ALTER_TABLE_REORG)) + alter_flags->is_set(HA_ALTER_TABLE_REORG) || + alter_flags->is_set(HA_ALTER_MAX_ROWS)) { Ndb *ndb= get_ndb(thd); NDBDICT *dict= ndb->getDictionary(); @@ -15711,6 +15714,19 @@ int ha_ndbcluster::check_if_supported_al if (alter_flags->is_set(HA_ALTER_TABLE_REORG)) { + /* + Refuse if Max_rows has been used before... + Workaround is to use ALTER ONLINE TABLE MAX_ROWS=; + */ + if (old_tab->getMaxRows() != 0) + { + push_warning(current_thd, + MYSQL_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR, + "Cannot online REORGANIZE a table with Max_Rows set. " + "Use ALTER TABLE ... 
MAX_ROWS= or offline REORGANIZE " + "to redistribute this table."); + DBUG_RETURN(HA_ALTER_NOT_SUPPORTED); + } new_tab.setFragmentCount(0); new_tab.setFragmentData(0, 0); } @@ -15719,6 +15735,27 @@ int ha_ndbcluster::check_if_supported_al DBUG_PRINT("info", ("Adding partition (%u)", part_info->num_parts)); new_tab.setFragmentCount(part_info->num_parts); } + if (alter_flags->is_set(HA_ALTER_MAX_ROWS)) + { + ulonglong rows= create_info->max_rows; + uint no_fragments= get_no_fragments(rows); + uint reported_frags= no_fragments; + if (adjusted_frag_count(ndb, no_fragments, reported_frags)) + { + push_warning(current_thd, + MYSQL_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR, + "Ndb might have problems storing the max amount " + "of rows specified"); + } + if (reported_frags < old_tab->getFragmentCount()) + { + DBUG_PRINT("info", ("Online reduction in number of fragments not supported")); + DBUG_RETURN(HA_ALTER_NOT_SUPPORTED); + } + new_tab.setFragmentCount(reported_frags); + new_tab.setDefaultNoPartitionsFlag(false); + new_tab.setFragmentData(0, 0); + } NDB_Modifiers table_modifiers(ndb_table_modifiers); table_modifiers.parse(thd, "NDB_TABLE=", create_info->comment.str, @@ -16037,7 +16074,9 @@ int ha_ndbcluster::alter_table_phase1(TH } } - if (alter_flags->is_set(HA_ALTER_TABLE_REORG) || alter_flags->is_set(HA_ADD_PARTITION)) + if (alter_flags->is_set(HA_ALTER_TABLE_REORG) || + alter_flags->is_set(HA_ADD_PARTITION) || + alter_flags->is_set(HA_ALTER_MAX_ROWS)) { if (alter_flags->is_set(HA_ALTER_TABLE_REORG)) { @@ -16049,7 +16088,29 @@ int ha_ndbcluster::alter_table_phase1(TH partition_info *part_info= altered_table->part_info; new_tab->setFragmentCount(part_info->num_parts); } - + else if (alter_flags->is_set(HA_ALTER_MAX_ROWS)) + { + ulonglong rows= create_info->max_rows; + uint no_fragments= get_no_fragments(rows); + uint reported_frags= no_fragments; + if (adjusted_frag_count(ndb, no_fragments, reported_frags)) + { + DBUG_ASSERT(false); /* Checked above */ + } + if 
(reported_frags < old_tab->getFragmentCount()) + { + DBUG_ASSERT(false); + DBUG_RETURN(HA_ALTER_NOT_SUPPORTED); + } + /* Note we don't set the ndb table's max_rows param, as that + * is considered a 'real' change + */ + //new_tab->setMaxRows(create_info->max_rows); + new_tab->setFragmentCount(reported_frags); + new_tab->setDefaultNoPartitionsFlag(false); + new_tab->setFragmentData(0, 0); + } + int res= dict->prepareHashMap(*old_tab, *new_tab); if (res == -1) { === modified file 'sql/handler.h' --- a/sql/handler.h 2012-03-22 14:18:01 +0000 +++ b/sql/handler.h 2012-03-23 07:44:14 +0000 @@ -58,7 +58,8 @@ class Alter_info; /* Bits to show what an alter table will do */ #include -#define HA_MAX_ALTER_FLAGS 40 +#define HA_MAX_ALTER_FLAGS 41 + typedef Bitmap HA_ALTER_FLAGS; #define HA_ADD_INDEX (0) @@ -101,6 +102,7 @@ typedef Bitmap HA_AL #define HA_ALTER_STORAGE_ENGINE (37) #define HA_RECREATE (38) #define HA_ALTER_TABLE_REORG (39) +#define HA_ALTER_MAX_ROWS (40) /* Remember to increase HA_MAX_ALTER_FLAGS when adding more flags! 
*/ /* Return values for check_if_supported_alter */ === modified file 'sql/sql_table.cc' --- a/sql/sql_table.cc 2012-02-14 08:00:53 +0000 +++ b/sql/sql_table.cc 2012-03-21 17:28:20 +0000 @@ -5079,6 +5079,8 @@ compare_tables(THD *thd, *alter_flags|= HA_SET_DEFAULT_CHARACTER_SET; if (alter_info->flags & ALTER_RECREATE) *alter_flags|= HA_RECREATE; + if (create_info->used_fields & HA_CREATE_USED_MAX_ROWS) + *alter_flags|= HA_ALTER_MAX_ROWS; /* TODO check for ADD/DROP FOREIGN KEY */ if (alter_info->flags & ALTER_FOREIGN_KEY) *alter_flags|= HA_ALTER_FOREIGN_KEY; === modified file 'storage/ndb/include/util/InputStream.hpp' --- a/storage/ndb/include/util/InputStream.hpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/include/util/InputStream.hpp 2012-03-21 15:31:06 +0000 @@ -57,7 +57,7 @@ class SocketInputStream : public InputSt bool m_startover; bool m_timedout; public: - SocketInputStream(NDB_SOCKET_TYPE socket, unsigned read_timeout_ms = 60000); + SocketInputStream(NDB_SOCKET_TYPE socket, unsigned read_timeout_ms = 3000); virtual ~SocketInputStream() {} char* gets(char * buf, int bufLen); bool timedout() { return m_timedout; } === modified file 'storage/ndb/memcache/include/NdbInstance.h' --- a/storage/ndb/memcache/include/NdbInstance.h 2012-03-07 01:22:53 +0000 +++ b/storage/ndb/memcache/include/NdbInstance.h 2012-03-22 22:18:19 +0000 @@ -42,6 +42,7 @@ public: /* Public Methods */ NdbInstance(Ndb_cluster_connection *, int); ~NdbInstance(); + void non_blocking_close(NdbTransaction *); void link_workitem(workitem *); void unlink_workitem(workitem *); === modified file 'storage/ndb/memcache/include/QueryPlan.h' --- a/storage/ndb/memcache/include/QueryPlan.h 2011-12-11 07:31:26 +0000 +++ b/storage/ndb/memcache/include/QueryPlan.h 2012-03-22 22:18:19 +0000 @@ -52,7 +52,7 @@ class QueryPlan { ~QueryPlan(); bool canHaveExternalValue() const; bool shouldExternalizeValue(size_t length) const; - bool canUseSimpleRead() const; + bool canUseCommittedRead() const; Uint64 
getAutoIncrement() const; void debug_dump() const; bool hasDataOnDisk() const; @@ -103,7 +103,7 @@ inline bool QueryPlan::hasDataOnDisk() c return has_disk_storage; } -inline bool QueryPlan::canUseSimpleRead() const { +inline bool QueryPlan::canUseCommittedRead() const { return(pk_access && (! extern_store) && (! spec->exp_column)); } === modified file 'storage/ndb/memcache/include/Queue.h' --- a/storage/ndb/memcache/include/Queue.h 2011-09-12 10:05:07 +0000 +++ b/storage/ndb/memcache/include/Queue.h 2012-03-22 02:21:34 +0000 @@ -55,6 +55,7 @@ public: /* public instance variable */ public: /* public interface methods */ Queue(int maxnodes) { /* Initialize the node allocation pool */ + maxnodes += 1; // add one for a dummy node at the head. assert(sizeof(Node) < CACHE_LINE_SIZE); nodepool = (char *) calloc(maxnodes, CACHE_LINE_SIZE); nodelist = 0; === modified file 'storage/ndb/memcache/scripts/pmpstack.awk' --- a/storage/ndb/memcache/scripts/pmpstack.awk 2011-11-07 21:54:07 +0000 +++ b/storage/ndb/memcache/scripts/pmpstack.awk 2012-03-22 22:18:19 +0000 @@ -41,23 +41,35 @@ function label(name) { ### What is the thread doing? 
### Patterns higher up in this file take precedence over lower ones -$2 ~ /^epoll_wait,TransporterRegistry::po/ { label("epoll_wait_transporter_recv"); next } - -$2 ~ /^epoll_wait/ { label("epoll_wait"); next } +/epoll_wait,TransporterRegistry::po/ { label("epoll_wait_transporter_recv"); next } + +/writev,TCP_Transporter::doSend/ { label("sending_to_ndb"); next } + +/recv,TCP_Transporter/ { label("tcp_recv_from_ndb"); next } -$2 ~ /^writev,TCP_Transporter::doSend/ { label("sending_to_ndb"); next } +/Ndb::closeTransaction/ { label("ndb_transaction_close"); next } -$2 ~ /^recv,TCP_Transporter/ { label("tcp_recv_from_ndb"); next } +/sendmsg,conn_mwrite/ { label("writing_to_client"); next } -$2 ~ /^sendmsg,conn_mwrite/ { label("sending_to_client"); next } +/recv,conn_read,event_handler/ { label("reading_from_client"); next } + +/poll_dispatch/ { label("poll_dispatch"); next } /pthread_mutex_unlock/ { label("releasing_locks"); next } +/_lock,pthread_cond_/ { label("getting_lock_for_condition_var"); next } + /pthread_mutex_lock,Ndb::sendPrepared/ { label("lock_Ndb_impl"); next } /_mutex_lock/ && /TransporterFacade/ { label("lock_transporter_facade_mutex"); next } /pthread_cond_timedwait/ && /ollNdb/ { label("wait_poll_ndb"); next } + +/::schedule/ && /pthread_cond_signal/ { label("Scheduler_signaling_cond_var") ; next } + +/pthread_rwlock_rdlock/ { label("acquiring_rwlock") ; next } + +/pthread_cond_[a-z]*wait/ { label("condition_variable_wait"); next } /workqueue_consumer_wait/ { label("workqueue_idle_wait"); next } @@ -67,6 +79,12 @@ $2 ~ /^sendmsg,conn_mwrite/ /Ndb::computeHash/ { label("ndb_compute_hash"); next } +/workitem__initialize/ { label("workitem_initialize"); next } + +/worker_prepare_operation/ { label("worker_prepare_operation"); next } + +/^epoll_wait/ { label("epoll_wait"); next } + /sleep/ { label("sleep"); next } @@ -74,26 +92,26 @@ $2 ~ /^sendmsg,conn_mwrite/ END { for(i in event) if (i != "total") - printf("%s\t%.2f%%\t%s\n", + printf("%s\t%.2f%% 
\t%s\n", "Event", (event[i] / event["total"]) * 100, i) printf("\n"); for(i in commit) if(i != "total") - printf("%s\t%.2f%%\t%s\n", + printf("%s\t%.2f%% \t%s\n", "Commit", (commit[i] / commit["total"]) * 100, i) if(commit["total"]) printf("\n"); for(i in send) if(i != "total") - printf("%s\t%.2f%%\t%s\n", + printf("%s\t%.2f%% \t%s\n", "Send", (send[i] / send["total"]) * 100, i) if(send["total"]) printf("\n"); for(i in poll) if(i != "total") - printf("%s\t%.2f%%\t%s\n", + printf("%s\t%.2f%% \t%s\n", "Poll", (poll[i] / poll["total"]) * 100, i) if(poll["total"]) printf("\n"); for(i in x) if(i != "total") - printf("%s\t%.2f%%\t%s\n", + printf("%s\t%.2f%% \t%s\n", "Unidentified", (x[i] / x["total"]) * 100, i) } === modified file 'storage/ndb/memcache/src/NdbInstance.cc' --- a/storage/ndb/memcache/src/NdbInstance.cc 2011-09-30 17:04:30 +0000 +++ b/storage/ndb/memcache/src/NdbInstance.cc 2012-03-22 22:18:19 +0000 @@ -48,3 +48,13 @@ NdbInstance::~NdbInstance() { } +void NdbInstance::non_blocking_close(NdbTransaction *tx) { + Uint64 nwaits_pre, nwaits_post; + nwaits_pre = db->getClientStat(Ndb::WaitExecCompleteCount); + + tx->close(); + + nwaits_post = db->getClientStat(Ndb::WaitExecCompleteCount); + assert(nwaits_pre == nwaits_post); +} + === modified file 'storage/ndb/memcache/src/ndb_worker.cc' --- a/storage/ndb/memcache/src/ndb_worker.cc 2012-03-15 03:21:12 +0000 +++ b/storage/ndb/memcache/src/ndb_worker.cc 2012-03-22 22:18:19 +0000 @@ -460,8 +460,8 @@ op_status_t WorkerStep1::do_read() { NdbOperation::LockMode lockmode; NdbTransaction::ExecType commitflag; - if(plan->canUseSimpleRead()) { - lockmode = NdbOperation::LM_SimpleRead; + if(plan->canUseCommittedRead()) { + lockmode = NdbOperation::LM_CommittedRead; commitflag = NdbTransaction::Commit; } else { === modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc' --- a/storage/ndb/memcache/src/schedulers/S_sched.cc 2012-03-07 01:22:53 +0000 +++ b/storage/ndb/memcache/src/schedulers/S_sched.cc 2012-03-22 
19:24:32 +0000 @@ -538,7 +538,7 @@ S::WorkerConnection::WorkerConnection(Sc /* Build the freelist */ freelist = 0; - int my_ndb_inst = conn->nInst / global->options.n_worker_threads; + int my_ndb_inst = conn->nInst / conn->n_workers; for(int j = 0 ; j < my_ndb_inst ; j++ ) { NdbInstance *inst = new NdbInstance(conn->conn, 2); inst->id = ((id.thd + 1) * 10000) + j + 1; === modified file 'storage/ndb/src/common/transporter/Transporter.cpp' --- a/storage/ndb/src/common/transporter/Transporter.cpp 2011-12-10 19:02:03 +0000 +++ b/storage/ndb/src/common/transporter/Transporter.cpp 2012-03-21 17:28:20 +0000 @@ -74,7 +74,7 @@ Transporter::Transporter(TransporterRegi checksumUsed = _checksum; signalIdUsed = _signalId; - m_timeOutMillis = 30000; + m_timeOutMillis = 3000; m_connect_address.s_addr= 0; if(s_port<0) === modified file 'storage/ndb/src/kernel/vm/CMakeLists.txt' --- a/storage/ndb/src/kernel/vm/CMakeLists.txt 2012-02-23 15:41:31 +0000 +++ b/storage/ndb/src/kernel/vm/CMakeLists.txt 2012-03-21 12:31:02 +0000 @@ -78,3 +78,9 @@ FOREACH(testprog CountingPool DynArr256) TARGET_LINK_LIBRARIES(${testprog}-t ndbtest ndbkernel ndbsched ndberror ndbtransport ndbmgmcommon ndbmgmapi ndbportlib ndbgeneral) ENDFOREACH(testprog) + +IF(NDB_BUILD_NDBMTD) + ADD_EXECUTABLE(mt-send-t mt-send-t.cpp) + TARGET_LINK_LIBRARIES(mt-send-t ndbtest ndbkernel ndbsched ndberror + ndbtransport ndbmgmcommon ndbmgmapi ndbportlib ndbgeneral) +ENDIF() === added file 'storage/ndb/src/kernel/vm/mt-lock.hpp' --- a/storage/ndb/src/kernel/vm/mt-lock.hpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/vm/mt-lock.hpp 2012-03-20 13:23:29 +0000 @@ -0,0 +1,160 @@ +/* Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. 
+ + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef MT_LOCK_HPP +#define MT_LOCK_HPP + +#include +#include "mt-asm.h" +#include + +struct mt_lock_stat +{ + const void * m_ptr; + char * m_name; + Uint32 m_contended_count; + Uint32 m_spin_count; +}; + +static void register_lock(const void * ptr, const char * name); +static mt_lock_stat * lookup_lock(const void * ptr); + +#ifdef NDB_HAVE_XCNG +template +struct thr_spin_lock +{ + thr_spin_lock(const char * name = 0) + { + m_lock = 0; + register_lock(this, name); + } + + union { + volatile Uint32 m_lock; + char pad[SZ]; + }; +}; + +static +ATTRIBUTE_NOINLINE +void +lock_slow(void * sl, volatile unsigned * val) +{ + mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock + +loop: + Uint32 spins = 0; + do { + spins++; + cpu_pause(); + } while (* val == 1); + + if (unlikely(xcng(val, 1) != 0)) + goto loop; + + if (s) + { + s->m_spin_count += spins; + Uint32 count = ++s->m_contended_count; + Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1)); + + if ((count % freq) == 0) + printf("%s waiting for lock, contentions: %u spins: %u\n", + s->m_name, count, s->m_spin_count); + } +} + +template +static +inline +void +lock(struct thr_spin_lock* sl) +{ + volatile unsigned* val = &sl->m_lock; + if (likely(xcng(val, 1) == 0)) + return; + + lock_slow(sl, val); +} + +template +static +inline +void +unlock(struct thr_spin_lock* sl) +{ + /** + * Memory barrier here, to make sure all of our stores are visible before + * the lock release is. 
+ */ + mb(); + sl->m_lock = 0; +} + +template +static +inline +int +trylock(struct thr_spin_lock* sl) +{ + volatile unsigned* val = &sl->m_lock; + return xcng(val, 1); +} +#else +#define thr_spin_lock thr_mutex +#endif + +template +struct thr_mutex +{ + thr_mutex(const char * name = 0) { + NdbMutex_Init(&m_mutex); + register_lock(this, name); + } + + union { + NdbMutex m_mutex; + char pad[SZ]; + }; +}; + +template +static +inline +void +lock(struct thr_mutex* sl) +{ + NdbMutex_Lock(&sl->m_mutex); +} + +template +static +inline +void +unlock(struct thr_mutex* sl) +{ + NdbMutex_Unlock(&sl->m_mutex); +} + +template +static +inline +int +trylock(struct thr_mutex * sl) +{ + return NdbMutex_Trylock(&sl->m_mutex); +} + +#endif === added file 'storage/ndb/src/kernel/vm/mt-send-t.cpp' --- a/storage/ndb/src/kernel/vm/mt-send-t.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/vm/mt-send-t.cpp 2012-03-21 14:07:51 +0000 @@ -0,0 +1,554 @@ +#include "mt-asm.h" +#include "mt-lock.hpp" +#include +#include +#include +#include +#include +#include +#include + +#define BUGGY_VERSION 0 + +/** + * DO_SYSCALL inside critical section + * (the equivalent of writev(socket) + */ +#define DO_SYSCALL 1 + +/** + * This is a unit test of the send code for mt.cpp + * specifically the code that manages which thread will send + * (write gathering) + * + * Each thread is a producer of Signals + * Each signal has a destination remote node (transporter) + * Each thread will after having produced a set of signals + * check if it should send them on socket. + * If it decides that it should, it consumes all the signals + * produced by all threads. + * + * In this unit test, we don't send signals, but the producing part + * will only be to increment a counter. + * + ****************************************************************** + * + * To use this program seriously... 
+ * + * you should set BUGGY_VERSION to 1 + * and experiment with values on cnt_* + * until you find a variant which crashes (abort) + * + * The values compiled-in makes it crash on a single socket + * Intel(R) Core(TM) i5-2400 CPU @ 3.10GHz release compiled! + * (i never managed to get it debug compiled) + */ +#define MAX_THREADS 256 +#define MAX_TRANSPORTERS 256 + +/** + * global variables + */ +static unsigned cnt_threads = 64; +static unsigned cnt_transporters = 8; + +/** + * outer loops + * start/stop threads + */ +static unsigned cnt_seconds = 180; + +/** + * no of signals produced before calling consume + */ +static unsigned cnt_signals_before_consume = 4; + +/** + * no of signals produced in one inner loop + */ +static unsigned cnt_signals_per_inner_loop = 4; + +/** + * no inner loops per outer loop + * + * after each inner loop + * threads will be stalled and result verified + */ +static unsigned cnt_inner_loops = 5000; + +/** + * pct of do_send that are using forceSend() + */ +static unsigned pct_force = 15; + +typedef Bitmask<(MAX_TRANSPORTERS+31)/32> TransporterMask; + +struct Producer +{ + Producer() { + bzero(val, sizeof(val)); + pendingcount = 0; + } + + void init() {} + + /** + * values produced... 
+ */ + unsigned val[MAX_TRANSPORTERS]; + + /** + * mask/array to keep track of which transporter we have produce values to + */ + TransporterMask pendingmask; + unsigned pendingcount; + unsigned char pendinglist[MAX_TRANSPORTERS]; + + /** + * produce a value + * + * This is the equivalent of mt_send_remote() + */ + void produce(unsigned D); + + /** + * consume values (from all threads) + * for transporters that we have produced a value to + * + * This is the equivalent to do_send and if force=true + * this is the equivalent of forceSend() + */ + void consume(bool force); +}; + +struct Thread +{ + Thread() { thread= 0; } + + void init() { p.init(); } + + NdbThread * thread; + Producer p; +}; + +/** + * This is the consumer of values for *one* transporter + */ +struct Consumer +{ + Consumer() { + m_force_send = 0; bzero(val, sizeof(val)); + } + + void init() {} + + struct thr_spin_lock<8> m_send_lock; + unsigned m_force_send; + unsigned val[MAX_THREADS]; + + /** + * consume values from all threads to this transporter + */ + void consume(unsigned D); + + /** + * force_consume + */ + void forceConsume(unsigned D); +}; + +struct Consumer_pad +{ + Consumer c; + char pad[NDB_CL_PADSZ(sizeof(Consumer))]; +}; + +struct Thread_pad +{ + Thread t; + char pad[NDB_CL_PADSZ(sizeof(Thread))]; +}; + +/** + * Thread repository + * and an instance of it + */ +static +struct Rep +{ + Thread_pad t[MAX_THREADS]; + Consumer_pad c[MAX_TRANSPORTERS]; + + /** + * This menthod is called when all threads are stalled + * so it's safe to read values without locks + */ + void validate() { + for (unsigned ic = 0; ic < cnt_transporters; ic++) + { + for (unsigned it = 0; it < cnt_threads; it++) + { + if (c[ic].c.val[it] != t[it].t.p.val[ic]) + { + printf("Detected bug!!!\n"); + printf("ic: %u it: %u c[ic].c.val[it]: %u t[it].t.p.val[ic]: %u\n", + ic, it, c[ic].c.val[it], t[it].t.p.val[ic]); + abort(); + } + } + } + } + + void init() { + for (unsigned i = 0; i < cnt_threads; i++) + t[i].t.init(); + 
+ for (unsigned i = 0; i < cnt_transporters; i++) + c[i].c.init(); + } +} rep; + +static +struct Test +{ + Test() { + waiting_start = 0; + waiting_stop = 0; + mutex = 0; + cond = 0; + } + + void init() { + mutex = NdbMutex_Create(); + cond = NdbCondition_Create(); + } + + unsigned waiting_start; + unsigned waiting_stop; + NdbMutex* mutex; + NdbCondition* cond; + + void wait_started(); + void wait_completed(); +} test; + +void* +thread_main(void * _t) +{ + unsigned seed = + (unsigned)NdbTick_CurrentNanosecond() + + (unsigned)(unsigned long long)_t; + + Thread * self = (Thread*) _t; + for (unsigned i = 0; i < cnt_inner_loops; i++) + { + test.wait_started(); + for (unsigned j = 0; j < cnt_signals_per_inner_loop;) + { + for (unsigned k = 0; k < cnt_signals_before_consume; k++) + { + /** + * Produce a signal to destination D + */ + unsigned D = unsigned(ndb_rand_r(&seed)) % cnt_transporters; + self->p.produce(D); + } + + j += cnt_signals_before_consume; + + /** + * This is the equivalent of do_send() + */ + bool force = unsigned(ndb_rand_r(&seed) % 100) < pct_force; + self->p.consume(force); + } + test.wait_completed(); + } + return 0; +} + +static +bool +match(const char * arg, const char * val, unsigned * valptr) +{ + if (strncmp(arg, val, strlen(val)) == 0) + { + * valptr = atoi(arg + strlen(val)); + return true; + } + return false; +} + +int +main(int argc, char ** argv) +{ + plan(1); + ndb_init(); + test.init(); + rep.init(); + + if (argc == 1) + { + printf("No arguments supplied...\n" + "assuming we're being run from MTR or similar.\n" + "decreasing loop counts to ridiculously small values...\n"); + cnt_seconds = 10; + cnt_inner_loops = 3000; + cnt_threads = 4; + } + else + { + printf("Arguments supplied...\n"); + for (int i = 1; i < argc; i++) + { + if (match(argv[i], "cnt_seconds=", &cnt_seconds)) + continue; + else if (match(argv[i], "cnt_threads=", &cnt_threads)) + continue; + else if (match(argv[i], "cnt_transporters=", &cnt_transporters)) + continue; + else 
if (match(argv[i], "cnt_inner_loops=", &cnt_inner_loops)) + continue; + else if (match(argv[i], "cnt_signals_before_consume=", + &cnt_signals_before_consume)) + continue; + else if (match(argv[i], "cnt_signals_per_inner_loop=", + &cnt_signals_per_inner_loop)) + continue; + else if (match(argv[i], "pct_force=", + &pct_force)) + continue; + else + { + printf("ignoreing unknown argument: %s\n", argv[i]); + } + } + } + + printf("%s" + " cnt_seconds=%u" + " cnt_threads=%u" + " cnt_transporters=%u" + " cnt_inner_loops=%u" + " cnt_signals_before_consume=%u" + " cnt_signals_per_inner_loop=%u" + " pct_force=%u" + "\n", + argv[0], + cnt_seconds, + cnt_threads, + cnt_transporters, + cnt_inner_loops, + cnt_signals_before_consume, + cnt_signals_per_inner_loop, + pct_force); + + Uint32 loop = 0; + Uint64 start = NdbTick_CurrentMillisecond() / 1000; + while (start + cnt_seconds > (NdbTick_CurrentMillisecond() / 1000)) + { + printf("%u ", loop++); fflush(stdout); + if ((loop < 100 && (loop % 25) == 0) || + (loop >= 100 && (loop % 20) == 0)) + printf("\n"); + + for (unsigned t = 0; t < cnt_threads; t++) + { + rep.t[t].t.thread = NdbThread_Create(thread_main, + (void**)&rep.t[t].t, + 1024*1024, + "execute thread", + NDB_THREAD_PRIO_MEAN); + } + + for (unsigned t = 0; t < cnt_threads; t++) + { + void * ret; + NdbThread_WaitFor(rep.t[t].t.thread, &ret); + } + } + printf("\n"); fflush(stdout); + + ok(true, "ok"); + return 0; +} + +inline +void +Producer::produce(unsigned D) +{ + if (!pendingmask.get(D)) + { + pendingmask.set(D); + pendinglist[pendingcount] = D; + pendingcount++; + } + val[D]++; +} + +inline +void +Producer::consume(bool force) +{ + unsigned count = pendingcount; + pendingmask.clear(); + pendingcount = 0; + + for (unsigned i = 0; i < count; i++) + { + unsigned D = pendinglist[i]; + if (force) + rep.c[D].c.forceConsume(D); + else + rep.c[D].c.consume(D); + } +} + +inline +void +Consumer::consume(unsigned D) +{ + /** + * This is the equivalent of do_send(must_send = 1) + 
*/ + m_force_send = 1; + + do + { + if (trylock(&m_send_lock) != 0) + { + /* Other thread will send for us as we set m_force_send. */ + return; + } + + /** + * Now clear the flag, and start sending all data available to this node. + * + * Put a memory barrier here, so that if another thread tries to grab + * the send lock but fails due to us holding it here, we either + * 1) Will see m_force_send[nodeId] set to 1 at the end of the loop, or + * 2) We clear here the flag just set by the other thread, but then we + * will (thanks to mb()) be able to see and send all of the data already + * in the first send iteration. + */ + m_force_send = 0; + mb(); + + /** + * This is the equivalent of link_thread_send_buffers + */ + for (unsigned i = 0; i < cnt_threads; i++) + { + val[i] = rep.t[i].t.p.val[D]; + } + + /** + * Do a syscall...which could have affect on barriers...etc + */ + if (DO_SYSCALL) + { + NdbTick_CurrentMillisecond(); + } + + unlock(&m_send_lock); + +#if BUGGY_VERSION +#else + mb(); +#endif + } + while (m_force_send != 0); +} + +inline +void +Consumer::forceConsume(unsigned D) +{ + /** + * This is the equivalent of forceSend() + */ + + do + { + /** + * NOTE: since we unconditionally lock m_send_lock + * we don't need a mb() after the clearing of m_force_send here. 
+ */ + m_force_send = 0; + + lock(&m_send_lock); + + /** + * This is the equivalent of link_thread_send_buffers + */ + for (unsigned i = 0; i < cnt_threads; i++) + { + val[i] = rep.t[i].t.p.val[D]; + } + + /** + * Do a syscall...which could have affect on barriers...etc + */ + if (DO_SYSCALL) + { + NdbTick_CurrentMillisecond(); + } + + unlock(&m_send_lock); + +#if BUGGY_VERSION +#else + mb(); +#endif + } + while (m_force_send != 0); +} + +void +Test::wait_started() +{ + NdbMutex_Lock(mutex); + if (waiting_start + 1 == cnt_threads) + { + waiting_stop = 0; + } + waiting_start++; + assert(waiting_start <= cnt_threads); + while (waiting_start < cnt_threads) + NdbCondition_Wait(cond, mutex); + + NdbCondition_Broadcast(cond); + NdbMutex_Unlock(mutex); +} + +void +Test::wait_completed() +{ + NdbMutex_Lock(mutex); + if (waiting_stop + 1 == cnt_threads) + { + rep.validate(); + waiting_start = 0; + } + waiting_stop++; + assert(waiting_stop <= cnt_threads); + while (waiting_stop < cnt_threads) + NdbCondition_Wait(cond, mutex); + + NdbCondition_Broadcast(cond); + NdbMutex_Unlock(mutex); +} + +static +void +register_lock(const void * ptr, const char * name) +{ + return; +} + +static +mt_lock_stat * +lookup_lock(const void * ptr) +{ + return 0; +} === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp 2012-02-23 15:41:31 +0000 +++ b/storage/ndb/src/kernel/vm/mt.cpp 2012-03-21 12:31:02 +0000 @@ -36,6 +36,7 @@ #include #include "mt-asm.h" +#include "mt-lock.hpp" inline SimulatedBlock* @@ -96,16 +97,6 @@ static Uint32 first_receiver_thread_no = /* max signal is 32 words, 7 for signal header and 25 datawords */ #define MIN_SIGNALS_PER_PAGE (thr_job_buffer::SIZE / 32) -struct mt_lock_stat -{ - const void * m_ptr; - char * m_name; - Uint32 m_contended_count; - Uint32 m_spin_count; -}; -static void register_lock(const void * ptr, const char * name); -static mt_lock_stat * lookup_lock(const void * ptr); - #if defined(HAVE_LINUX_FUTEX) && 
defined(NDB_HAVE_XCNG) #define USE_FUTEX #endif @@ -274,132 +265,6 @@ wakeup(struct thr_wait* wait) #endif -#ifdef NDB_HAVE_XCNG -template -struct thr_spin_lock -{ - thr_spin_lock(const char * name = 0) - { - m_lock = 0; - register_lock(this, name); - } - - union { - volatile Uint32 m_lock; - char pad[SZ]; - }; -}; - -static -ATTRIBUTE_NOINLINE -void -lock_slow(void * sl, volatile unsigned * val) -{ - mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock - -loop: - Uint32 spins = 0; - do { - spins++; - cpu_pause(); - } while (* val == 1); - - if (unlikely(xcng(val, 1) != 0)) - goto loop; - - if (s) - { - s->m_spin_count += spins; - Uint32 count = ++s->m_contended_count; - Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1)); - - if ((count % freq) == 0) - printf("%s waiting for lock, contentions: %u spins: %u\n", - s->m_name, count, s->m_spin_count); - } -} - -template -static -inline -void -lock(struct thr_spin_lock* sl) -{ - volatile unsigned* val = &sl->m_lock; - if (likely(xcng(val, 1) == 0)) - return; - - lock_slow(sl, val); -} - -template -static -inline -void -unlock(struct thr_spin_lock* sl) -{ - /** - * Memory barrier here, to make sure all of our stores are visible before - * the lock release is. 
- */ - mb(); - sl->m_lock = 0; -} - -template -static -inline -int -trylock(struct thr_spin_lock* sl) -{ - volatile unsigned* val = &sl->m_lock; - return xcng(val, 1); -} -#else -#define thr_spin_lock thr_mutex -#endif - -template -struct thr_mutex -{ - thr_mutex(const char * name = 0) { - NdbMutex_Init(&m_mutex); - register_lock(this, name); - } - - union { - NdbMutex m_mutex; - char pad[SZ]; - }; -}; - -template -static -inline -void -lock(struct thr_mutex* sl) -{ - NdbMutex_Lock(&sl->m_mutex); -} - -template -static -inline -void -unlock(struct thr_mutex* sl) -{ - NdbMutex_Unlock(&sl->m_mutex); -} - -template -static -inline -int -trylock(struct thr_mutex * sl) -{ - return NdbMutex_Trylock(&sl->m_mutex); -} - /** * thr_safe_pool */ @@ -2719,6 +2584,13 @@ pack_send_buffer(thr_data *selfptr, Uint * After having locked/unlock m_send_lock * "protocol" dictates that we must check the m_force_send */ + + /** + * We need a memory barrier here to prevent race between clearing lock + * and reading of m_force_send. + * CPU can reorder the load to before the clear of the lock + */ + mb(); if (sb->m_force_send) { try_send(selfptr, node); @@ -2787,7 +2659,13 @@ mt_send_handle::forceSend(NodeId nodeId) do { + /** + * NOTE: we don't need a memory barrier after clearing + * m_force_send here as we unconditionally lock m_send_lock + * hence there is no way that our data can be "unsent" + */ sb->m_force_send = 0; + lock(&sb->m_send_lock); sb->m_send_thread = selfptr->m_thr_no; globalTransporterRegistry.performSend(nodeId); @@ -2799,6 +2677,12 @@ mt_send_handle::forceSend(NodeId nodeId) */ selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS); + /** + * We need a memory barrier here to prevent race between clearing lock + * and reading of m_force_send. 
+ * CPU can reorder the load to before the clear of the lock + */ + mb(); } while (sb->m_force_send); return true; @@ -2821,6 +2705,16 @@ try_send(thr_data * selfptr, Uint32 node return; } + /** + * Now clear the flag, and start sending all data available to this node. + * + * Put a memory barrier here, so that if another thread tries to grab + * the send lock but fails due to us holding it here, we either + * 1) Will see m_force_send[nodeId] set to 1 at the end of the loop, or + * 2) We clear here the flag just set by the other thread, but then we + * will (thanks to mb()) be able to see and send all of the data already + * in the first send iteration. + */ sb->m_force_send = 0; mb(); @@ -2834,6 +2728,13 @@ try_send(thr_data * selfptr, Uint32 node */ selfptr->m_send_buffer_pool.release_global(rep->m_mm, RG_TRANSPORTER_BUFFERS); + + /** + * We need a memory barrier here to prevent race between clearing lock + * and reading of m_force_send. + * CPU can reorder the load to before the clear of the lock + */ + mb(); } while (sb->m_force_send); } @@ -2966,6 +2867,13 @@ do_send(struct thr_data* selfptr, bool m { register_pending_send(selfptr, node); } + + /** + * We need a memory barrier here to prevent race between clearing lock + * and reading of m_force_send. 
+ * CPU can reorder the load to before the clear of the lock + */ + mb(); if (sb->m_force_send) { /** === modified file 'storage/ndb/test/run-test/conf-blade08.cnf' --- a/storage/ndb/test/run-test/conf-blade08.cnf 2011-05-19 17:47:28 +0000 +++ b/storage/ndb/test/run-test/conf-blade08.cnf 2012-03-21 14:43:07 +0000 @@ -26,6 +26,7 @@ FragmentLogFileSize = 64M CompressedLCP=1 CompressedBackup=1 ODirect=1 +MaxNoOfAttributes=2000 SharedGlobalMemory=256M InitialLogfileGroup=undo_buffer_size=64M;undofile01.dat:256M;undofile02.dat:128M === modified file 'storage/ndb/test/run-test/conf-ndb07.cnf' --- a/storage/ndb/test/run-test/conf-ndb07.cnf 2011-05-19 18:19:47 +0000 +++ b/storage/ndb/test/run-test/conf-ndb07.cnf 2012-03-21 14:43:07 +0000 @@ -31,6 +31,7 @@ MaxNoOfSavedMessages= 5 NoOfFragmentLogFiles = 8 FragmentLogFileSize = 64M ODirect=1 +MaxNoOfAttributes=2000 SharedGlobalMemory=256M DiskPageBufferMemory=256M === modified file 'storage/ndb/test/run-test/conf-tyr13.cnf' --- a/storage/ndb/test/run-test/conf-tyr13.cnf 2011-12-09 15:11:21 +0000 +++ b/storage/ndb/test/run-test/conf-tyr13.cnf 2012-03-21 14:43:07 +0000 @@ -34,6 +34,7 @@ FragmentLogFileSize = 64M ODirect=1 MaxNoOfExecutionThreads=8 SendBufferMemory=4M +MaxNoOfAttributes=2000 SharedGlobalMemory=256M DiskPageBufferMemory=256M === modified file 'storage/ndb/test/run-test/daily-basic-tests.txt' --- a/storage/ndb/test/run-test/daily-basic-tests.txt 2012-02-23 15:41:31 +0000 +++ b/storage/ndb/test/run-test/daily-basic-tests.txt 2012-03-21 17:28:20 +0000 @@ -946,7 +946,7 @@ max-time: 500 cmd: testNdbApi args: -n Bug44065 -max-time: 500 +max-time: 1000 cmd: testNdbApi args: -n Bug44065_org No bundle (reason: useless for push emails).