List: Commits
From:Ole John Aske Date:March 23 2012 7:44am
Subject:bzr push into mysql-5.5-cluster-7.2-spj branch (ole.john.aske:3856 to 3857)
 3857 Ole John Aske	2012-03-23 [merge]
      Merge mysql-5.5-cluster-7.2 ==> mysql-5.5-cluster-7.2-spj

    added:
      storage/ndb/src/kernel/vm/mt-lock.hpp
      storage/ndb/src/kernel/vm/mt-send-t.cpp
    modified:
      mysql-test/suite/ndb/r/ndb_add_partition.result
      mysql-test/suite/ndb/t/ndb_add_partition.test
      mysql-test/suite/ndb/t/ndb_addnode.test
      sql/ha_ndbcluster.cc
      sql/handler.h
      sql/sql_table.cc
      storage/ndb/include/util/InputStream.hpp
      storage/ndb/memcache/include/NdbInstance.h
      storage/ndb/memcache/include/QueryPlan.h
      storage/ndb/memcache/include/Queue.h
      storage/ndb/memcache/scripts/pmpstack.awk
      storage/ndb/memcache/src/NdbInstance.cc
      storage/ndb/memcache/src/ndb_worker.cc
      storage/ndb/memcache/src/schedulers/S_sched.cc
      storage/ndb/src/common/transporter/Transporter.cpp
      storage/ndb/src/kernel/vm/CMakeLists.txt
      storage/ndb/src/kernel/vm/mt.cpp
      storage/ndb/test/run-test/conf-blade08.cnf
      storage/ndb/test/run-test/conf-ndb07.cnf
      storage/ndb/test/run-test/conf-tyr13.cnf
      storage/ndb/test/run-test/daily-basic-tests.txt
 3856 Ole John Aske	2012-03-22
      SPJ: Implement native support for pushing sorted scan-scan queries.
      
      Until now we did not allow a scan-scan query which required a 'sorted'
      result set to be pushed to the SPJ block. Instead we suppressed the
      'sorted' request, dumped the result set to a temporary file, and
      'filesorted' it. (Explained as 'Using temporary; Using filesort')
      
      This has been commented on as 'questionable at best' by Evgeny, who is
      doing the review of WL#5940: 'Integrating pushed join with optimizer &
      handler interface'. So we want to change it.
      
      This fix implements native support for sorted scan-scan by setting the
      parent-scan batch size to '1' when a sorted scan-scan request is
      about to be sent to the SPJ block.
      (Dependent child scans/lookups are retrieved with the 'normal' batch size.)
      
      The SPJ API has to NEXTREQ more results until all related child-scan
      results for the single parent row have been retrieved. This guarantees
      sorted results, as all child rows are retrieved together with their
      related parent (at the cost of some overhead, though).
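      
      To illustrate the idea (a stand-alone sketch, not part of the patch;
      the keys and child values are made up): with a parent batch size of
      one, every batch delivered to the API holds a single parent row plus
      all of its dependent child rows, so emitting batches in arrival order
      preserves the parent sort order.
      
          // Stand-alone C++ simulation of the batching scheme (not NDB code).
          #include <cstdio>
          #include <vector>
          
          struct Child { int parent_key; int val; };
          
          int main()
          {
            // Pretend the parent scan delivers keys 1..3 in sorted order.
            const std::vector<int> parent_rows = {1, 2, 3};
          
            for (int parent : parent_rows)   // parent batch size == 1
            {
              // All children of this one parent are fetched (possibly via
              // several NEXTREQ round trips) before the next parent row.
              const std::vector<Child> children = {{parent, 10}, {parent, 20}};
              for (const Child& c : children)
                std::printf("parent=%d child=%d\n", parent, c.val);
            }
            return 0;
          }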

    modified:
      mysql-test/suite/ndb/r/ndb_join_pushdown_default.result
      sql/abstract_query_plan.cc
      sql/abstract_query_plan.h
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      sql/ha_ndbcluster_push.cc
      sql/ha_ndbcluster_push.h
      sql/handler.h
      sql/sql_select.cc
      storage/ndb/src/ndbapi/NdbQueryBuilder.cpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/test/ndbapi/testSpj.cpp
      storage/ndb/test/tools/spj_sanity_test.cpp
=== modified file 'mysql-test/suite/ndb/r/ndb_add_partition.result'
--- a/mysql-test/suite/ndb/r/ndb_add_partition.result	2011-05-12 11:31:21 +0000
+++ b/mysql-test/suite/ndb/r/ndb_add_partition.result	2012-03-21 15:42:47 +0000
@@ -24,6 +24,19 @@ ENGINE = NDB
 STORAGE DISK
 TABLESPACE ts1
 partition by key(a);
+CREATE TABLE t3 (a int unsigned not null,
+b int unsigned not null,
+c int unsigned not null,
+primary key(a,b),
+unique (b))
+MAX_ROWS=50000000
+ENGINE = NDB;
+CREATE TABLE t4 (a int unsigned not null,
+b int unsigned not null,
+c int unsigned not null,
+primary key(a,b),
+unique (b))
+ENGINE = NDB;
 INSERT INTO t1 VALUES
 (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
 (6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10),
@@ -46,12 +59,20 @@ INSERT INTO t1 VALUES
 (91,91,91),(92,92,92),(93,93,93),(94,94,94),(95,95,95),
 (96,96,96),(97,97,97),(98,98,98),(99,99,99),(100,100,100);
 insert into t2 select * from t1;
+insert into t3 select * from t1;
+insert into t4 select * from t1;
 select count(*) from t1;
 count(*)
 100
 select count(*) from t2;
 count(*)
 100
+select count(*) from t3;
+count(*)
+100
+select count(*) from t4;
+count(*)
+100
 select * from t1 where a < 20;
 a	b	c
 1	1	1
@@ -142,24 +163,146 @@ a	b	c
 select * from t2 where b = 50;
 a	b	c
 50	50	50
+select * from t3 where a < 20;
+a	b	c
+1	1	1
+10	10	10
+11	11	11
+12	12	12
+13	13	13
+14	14	14
+15	15	15
+16	16	16
+17	17	17
+18	18	18
+19	19	19
+2	2	2
+3	3	3
+4	4	4
+5	5	5
+6	6	6
+7	7	7
+8	8	8
+9	9	9
+select * from t3 where a = 20;
+a	b	c
+20	20	20
+select * from t3 where a = 30;
+a	b	c
+30	30	30
+select * from t3 where a = 40;
+a	b	c
+40	40	40
+select * from t3 where a = 50;
+a	b	c
+50	50	50
+select * from t3 where b = 20;
+a	b	c
+20	20	20
+select * from t3 where b = 30;
+a	b	c
+30	30	30
+select * from t3 where b = 40;
+a	b	c
+40	40	40
+select * from t3 where b = 50;
+a	b	c
+50	50	50
+select * from t4 where a < 20;
+a	b	c
+1	1	1
+10	10	10
+11	11	11
+12	12	12
+13	13	13
+14	14	14
+15	15	15
+16	16	16
+17	17	17
+18	18	18
+19	19	19
+2	2	2
+3	3	3
+4	4	4
+5	5	5
+6	6	6
+7	7	7
+8	8	8
+9	9	9
+select * from t4 where a = 20;
+a	b	c
+20	20	20
+select * from t4 where a = 30;
+a	b	c
+30	30	30
+select * from t4 where a = 40;
+a	b	c
+40	40	40
+select * from t4 where a = 50;
+a	b	c
+50	50	50
+select * from t4 where b = 20;
+a	b	c
+20	20	20
+select * from t4 where b = 30;
+a	b	c
+30	30	30
+select * from t4 where b = 40;
+a	b	c
+40	40	40
+select * from t4 where b = 50;
+a	b	c
+50	50	50
 alter online table t1 reorganize partition;
 alter online table t2 reorganize partition;
+Cannot use normal reorganize partition on t3 as it has explicit MAX_ROWS set
+alter online table t3 reorganize partition;
+ERROR 42000: This version of MySQL doesn't yet support 'alter online table t3 reorganize partition'
+show warnings;
+Level	Code	Message
+Warning	1105	Cannot online REORGANIZE a table with Max_Rows set.  Use ALTER TABLE ... MAX_ROWS=<new_val> or offline REORGANIZE to redistribute this table.
+Error	1235	This version of MySQL doesn't yet support 'alter online table t3 reorganize partition'
+alter online table t3 max_rows=50000000;
+alter online table t4 reorganize partition;
+Check partitions added, expect 0 in all cases
 partitions added to t1
 t1_added
 0
 partitions added to t2
 t2_added
 0
+partitions added to t3
+t3_added
+0
+partitions added to t4
+t4_added
+0
 alter online table t1 add partition partitions 1;
 alter online table t2 add partition partitions 4;
-partitions added to t1
+alter online table t3 max_rows=100000000;
+alter online table t4 max_rows=100000000;
+ERROR 42000: This version of MySQL doesn't yet support 'alter online table t4 max_rows=100000000'
+partitions added to t1 (expect 1)
 t1_added
 1
-partitions added to t2
+partitions added to t2 (expect 4)
 t2_added
 4
+partitions added to t3 (expect 2)
+t3_added
+2
+partitions added to t4 (expect 0)
+t4_added
+0
 alter online table t1 reorganize partition;
 ERROR HY000: REORGANIZE PARTITION without parameters can only be used on auto-partitioned tables using HASH PARTITIONs
+alter online table t3 reorganize partition;
+ERROR 42000: This version of MySQL doesn't yet support 'alter online table t3 reorganize partition'
+show warnings;
+Level	Code	Message
+Warning	1105	Cannot online REORGANIZE a table with Max_Rows set.  Use ALTER TABLE ... MAX_ROWS=<new_val> or offline REORGANIZE to redistribute this table.
+Error	1235	This version of MySQL doesn't yet support 'alter online table t3 reorganize partition'
+alter online table t4 reorganize partition;
 select count(*) from t1;
 count(*)
 100
@@ -256,14 +399,109 @@ a	b	c
 select * from t2 where b = 50;
 a	b	c
 50	50	50
+select * from t3 where a < 20;
+a	b	c
+1	1	1
+10	10	10
+11	11	11
+12	12	12
+13	13	13
+14	14	14
+15	15	15
+16	16	16
+17	17	17
+18	18	18
+19	19	19
+2	2	2
+3	3	3
+4	4	4
+5	5	5
+6	6	6
+7	7	7
+8	8	8
+9	9	9
+select * from t3 where a = 20;
+a	b	c
+20	20	20
+select * from t3 where a = 30;
+a	b	c
+30	30	30
+select * from t3 where a = 40;
+a	b	c
+40	40	40
+select * from t3 where a = 50;
+a	b	c
+50	50	50
+select * from t3 where b = 20;
+a	b	c
+20	20	20
+select * from t3 where b = 30;
+a	b	c
+30	30	30
+select * from t3 where b = 40;
+a	b	c
+40	40	40
+select * from t3 where b = 50;
+a	b	c
+50	50	50
+select * from t4 where a < 20;
+a	b	c
+1	1	1
+10	10	10
+11	11	11
+12	12	12
+13	13	13
+14	14	14
+15	15	15
+16	16	16
+17	17	17
+18	18	18
+19	19	19
+2	2	2
+3	3	3
+4	4	4
+5	5	5
+6	6	6
+7	7	7
+8	8	8
+9	9	9
+select * from t4 where a = 20;
+a	b	c
+20	20	20
+select * from t4 where a = 30;
+a	b	c
+30	30	30
+select * from t4 where a = 40;
+a	b	c
+40	40	40
+select * from t4 where a = 50;
+a	b	c
+50	50	50
+select * from t4 where b = 20;
+a	b	c
+20	20	20
+select * from t4 where b = 30;
+a	b	c
+30	30	30
+select * from t4 where b = 40;
+a	b	c
+40	40	40
+select * from t4 where b = 50;
+a	b	c
+50	50	50
+drop table t4;
 alter online table t1 add partition partitions 2;
 alter online table t2 add partition partitions 1;
-partitions added to t1
+alter online table t3 max_rows=150000000;
+partitions added to t1 (expect 3)
 t1_added
 3
-partitions added to t2
+partitions added to t2 (expect 5)
 t2_added
 5
+partitions added to t3 (expect 4)
+t3_added
+4
 select count(*) from t1;
 count(*)
 100
@@ -360,7 +598,52 @@ a	b	c
 select * from t2 where b = 50;
 a	b	c
 50	50	50
-drop table t1,t2;
+select * from t3 where a < 20;
+a	b	c
+1	1	1
+10	10	10
+11	11	11
+12	12	12
+13	13	13
+14	14	14
+15	15	15
+16	16	16
+17	17	17
+18	18	18
+19	19	19
+2	2	2
+3	3	3
+4	4	4
+5	5	5
+6	6	6
+7	7	7
+8	8	8
+9	9	9
+select * from t3 where a = 20;
+a	b	c
+20	20	20
+select * from t3 where a = 30;
+a	b	c
+30	30	30
+select * from t3 where a = 40;
+a	b	c
+40	40	40
+select * from t3 where a = 50;
+a	b	c
+50	50	50
+select * from t3 where b = 20;
+a	b	c
+20	20	20
+select * from t3 where b = 30;
+a	b	c
+30	30	30
+select * from t3 where b = 40;
+a	b	c
+40	40	40
+select * from t3 where b = 50;
+a	b	c
+50	50	50
+drop table t1,t2,t3;
 alter tablespace ts1 drop datafile 'datafile.dat' engine = ndb;
 drop tablespace ts1 engine = ndb;
 drop logfile group lg1 engine = ndb;

=== modified file 'mysql-test/suite/ndb/t/ndb_add_partition.test'
--- a/mysql-test/suite/ndb/t/ndb_add_partition.test	2011-05-12 14:34:22 +0000
+++ b/mysql-test/suite/ndb/t/ndb_add_partition.test	2012-03-21 17:28:20 +0000
@@ -34,10 +34,29 @@ STORAGE DISK
 TABLESPACE ts1
 partition by key(a);
 
+CREATE TABLE t3 (a int unsigned not null,
+                 b int unsigned not null,
+                 c int unsigned not null,
+                 primary key(a,b),
+                 unique (b))
+MAX_ROWS=50000000
+ENGINE = NDB;
+
+CREATE TABLE t4 (a int unsigned not null,
+                 b int unsigned not null,
+                 c int unsigned not null,
+                 primary key(a,b),
+                 unique (b))
+ENGINE = NDB;
+
 let $t1_part_count_start = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't1', Value, 1);
 
 let $t2_part_count_start = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't2', Value, 1);
 
+let $t3_part_count_start = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't3', Value, 1);
+
+let $t4_part_count_start = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't4', Value, 1);
+
 INSERT INTO t1 VALUES
 (1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
 (6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10),
@@ -61,8 +80,12 @@ INSERT INTO t1 VALUES
 (96,96,96),(97,97,97),(98,98,98),(99,99,99),(100,100,100);
 
 insert into t2 select * from t1;
+insert into t3 select * from t1;
+insert into t4 select * from t1;
 select count(*) from t1;
 select count(*) from t2;
+select count(*) from t3;
+select count(*) from t4;
 --sorted_result
 select * from t1 where a < 20;
 --sorted_result
@@ -93,32 +116,89 @@ select * from t2 where b = 30;
 select * from t2 where b = 40;
 select * from t2 where b = 50;
 
+--sorted_result
+select * from t3 where a < 20;
+--sorted_result
+select * from t3 where a = 20;
+--sorted_result
+select * from t3 where a = 30;
+--sorted_result
+select * from t3 where a = 40;
+--sorted_result
+select * from t3 where a = 50;
+select * from t3 where b = 20;
+select * from t3 where b = 30;
+select * from t3 where b = 40;
+select * from t3 where b = 50;
+
+--sorted_result
+select * from t4 where a < 20;
+--sorted_result
+select * from t4 where a = 20;
+--sorted_result
+select * from t4 where a = 30;
+--sorted_result
+select * from t4 where a = 40;
+--sorted_result
+select * from t4 where a = 50;
+select * from t4 where b = 20;
+select * from t4 where b = 30;
+select * from t4 where b = 40;
+select * from t4 where b = 50;
+
 alter online table t1 reorganize partition;
 alter online table t2 reorganize partition;
+--echo Cannot use normal reorganize partition on t3 as it has explicit MAX_ROWS set
+--error 1235
+alter online table t3 reorganize partition;
+show warnings;
+
+alter online table t3 max_rows=50000000;
+alter online table t4 reorganize partition;
 
 let $t1_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't1', Value, 1);
 
 let $t2_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't2', Value, 1);
 
+let $t3_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't3', Value, 1);
+
+let $t4_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't4', Value, 1);
+
 --disable_query_log
+--echo Check partitions added, expect 0 in all cases
 --echo partitions added to t1
 eval select $t1_part_count_now - $t1_part_count_start as t1_added;
 --echo partitions added to t2
 eval select $t2_part_count_now - $t2_part_count_start as t2_added;
+--echo partitions added to t3
+eval select $t3_part_count_now - $t3_part_count_start as t3_added;
+--echo partitions added to t4
+eval select $t4_part_count_now - $t4_part_count_start as t4_added;
 --enable_query_log
 
 alter online table t1 add partition partitions 1;
 alter online table t2 add partition partitions 4;
+alter online table t3 max_rows=100000000; # Expansion of max rows
+--error 1235
+alter online table t4 max_rows=100000000; # Attempted introduction of max rows - fails
 
 let $t1_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't1', Value, 1);
 
 let $t2_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't2', Value, 1);
 
+let $t3_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't3', Value, 1);
+
+let $t4_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't4', Value, 1);
+
 --disable_query_log
---echo partitions added to t1
+--echo partitions added to t1 (expect 1)
 eval select $t1_part_count_now - $t1_part_count_start as t1_added;
---echo partitions added to t2
+--echo partitions added to t2 (expect 4)
 eval select $t2_part_count_now - $t2_part_count_start as t2_added;
+--echo partitions added to t3 (expect 2)
+eval select $t3_part_count_now - $t3_part_count_start as t3_added;
+--echo partitions added to t4 (expect 0)
+eval select $t4_part_count_now - $t4_part_count_start as t4_added;
 --enable_query_log
 
 # reorganize partition not support if not default partitioning
@@ -126,6 +206,14 @@ eval select $t2_part_count_now - $t2_par
 --error ER_REORG_NO_PARAM_ERROR
 alter online table t1 reorganize partition;
 
+# Following will fail as t3 has an explicit MAX_ROWS set 
+--error 1235 
+alter online table t3 reorganize partition;
+show warnings;
+
+# t4 reorg will succeed as t4 has no explicit MAX_ROWS
+alter online table t4 reorganize partition;
+
 select count(*) from t1;
 select count(*) from t2;
 --sorted_result
@@ -158,18 +246,55 @@ select * from t2 where b = 30;
 select * from t2 where b = 40;
 select * from t2 where b = 50;
 
+--sorted_result
+select * from t3 where a < 20;
+--sorted_result
+select * from t3 where a = 20;
+--sorted_result
+select * from t3 where a = 30;
+--sorted_result
+select * from t3 where a = 40;
+--sorted_result
+select * from t3 where a = 50;
+select * from t3 where b = 20;
+select * from t3 where b = 30;
+select * from t3 where b = 40;
+select * from t3 where b = 50;
+
+--sorted_result
+select * from t4 where a < 20;
+--sorted_result
+select * from t4 where a = 20;
+--sorted_result
+select * from t4 where a = 30;
+--sorted_result
+select * from t4 where a = 40;
+--sorted_result
+select * from t4 where a = 50;
+select * from t4 where b = 20;
+select * from t4 where b = 30;
+select * from t4 where b = 40;
+select * from t4 where b = 50;
+
+drop table t4;
+
 alter online table t1 add partition partitions 2;
 alter online table t2 add partition partitions 1;
+alter online table t3 max_rows=150000000;
 
 let $t1_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't1', Value, 1);
 
 let $t2_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't2', Value, 1);
 
+let $t3_part_count_now = query_get_value(select count(*) as Value from information_schema.partitions where table_schema = 'test' and table_name = 't3', Value, 1);
+
 --disable_query_log
---echo partitions added to t1
+--echo partitions added to t1 (expect 3)
 eval select $t1_part_count_now - $t1_part_count_start as t1_added;
---echo partitions added to t2
+--echo partitions added to t2 (expect 5)
 eval select $t2_part_count_now - $t2_part_count_start as t2_added;
+--echo partitions added to t3 (expect 4)
+eval select $t3_part_count_now - $t3_part_count_start as t3_added;
 --enable_query_log
 
 select count(*) from t1;
@@ -204,7 +329,22 @@ select * from t2 where b = 30;
 select * from t2 where b = 40;
 select * from t2 where b = 50;
 
-drop table t1,t2;
+--sorted_result
+select * from t3 where a < 20;
+--sorted_result
+select * from t3 where a = 20;
+--sorted_result
+select * from t3 where a = 30;
+--sorted_result
+select * from t3 where a = 40;
+--sorted_result
+select * from t3 where a = 50;
+select * from t3 where b = 20;
+select * from t3 where b = 30;
+select * from t3 where b = 40;
+select * from t3 where b = 50;
+
+drop table t1,t2,t3;
 alter tablespace ts1 drop datafile 'datafile.dat' engine = ndb;
 drop tablespace ts1 engine = ndb;
 drop logfile group lg1 engine = ndb;

=== modified file 'mysql-test/suite/ndb/t/ndb_addnode.test'
--- a/mysql-test/suite/ndb/t/ndb_addnode.test	2011-05-09 08:49:19 +0000
+++ b/mysql-test/suite/ndb/t/ndb_addnode.test	2012-03-21 17:28:20 +0000
@@ -20,9 +20,19 @@ CREATE TABLESPACE ts_1
 create table t1(id int NOT NULL PRIMARY KEY, data char(8)) engine=ndb;
 create table t2(id int NOT NULL PRIMARY KEY, data char(8))
 TABLESPACE ts_1 STORAGE DISK engine=ndb;
+# BUG#13714648
+create table t5(id int NOT NULL PRIMARY KEY, data char(8)) max_rows=50000000 engine=ndb;
 
 load data local infile 'suite/ndb/data/table_data10000.dat' into table t1 fields terminated by ' ' lines terminated by '\n';
 load data local infile 'suite/ndb/data/table_data10000.dat' into table t2 fields terminated by ' ' lines terminated by '\n';
+load data local infile 'suite/ndb/data/table_data10000.dat' into table t5 fields terminated by ' ' lines terminated by '\n';
+
+select count(1) as t1_part_count from information_schema.partitions where table_schema='test' and table_name='t1';
+select count(1) as t2_part_count from information_schema.partitions where table_schema='test' and table_name='t2';
+select @init_t5_part_count:= count(1) as t5_part_count from information_schema.partitions where table_schema='test' and table_name='t5';
+
+## Check details of t5 partitioning
+--exec $NDB_DESC -dtest -p -n t5
 
 ## Create nodegroup for "new" nodes
 --exec $NDB_MGM -e "create nodegroup 3,4"
@@ -48,6 +58,23 @@ insert into t4(id, data) VALUES
 
 alter online table t1 reorganize partition;
 alter online table t2 reorganize partition;
+alter online table t5 max_rows=100000000;
+
+select count(1) as t1_part_count from information_schema.partitions where table_schema='test' and table_name='t1';
+select count(1) as t2_part_count from information_schema.partitions where table_schema='test' and table_name='t2';
+select count(1) as t3_part_count from information_schema.partitions where table_schema='test' and table_name='t3';
+select count(1) as t4_part_count from information_schema.partitions where table_schema='test' and table_name='t4';
+select @reorg_t5_part_count:= count(1) as t5_part_count from information_schema.partitions where table_schema='test' and table_name='t5';
+
+## Check details of t5 partitioning
+--exec $NDB_DESC -dtest -p -n t5
+
+--let $t5_part_diff=query_get_value('select @reorg_t5_part_count-@init_t5_part_count as Value',Value,1)
+
+if (!$t5_part_diff)
+{
+  --die Table t5 was not reorganised
+}
 
 ## Drop nodegroup with "new" nodes is not allowed with data one those nodes
 # NOTE: --error=0 is due to return codes doesnt work on windoze
@@ -57,7 +84,7 @@ alter online table t2 reorganize partiti
 ## Nodegroup with "new" nodes still exist after dropping it as shown:
 --exec $NDB_MGM -e show
 
-drop table t1,t2,t3,t4;
+drop table t1,t2,t3,t4,t5;
 
 ## Drop nodegroup with "new" nodes
 --exec $NDB_MGM -e "drop nodegroup 1"

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2012-03-22 14:18:01 +0000
+++ b/sql/ha_ndbcluster.cc	2012-03-23 07:44:14 +0000
@@ -15580,7 +15580,9 @@ HA_ALTER_FLAGS supported_alter_operation
     HA_COLUMN_FORMAT |
     HA_ADD_PARTITION |
     HA_ALTER_TABLE_REORG |
-    HA_CHANGE_AUTOINCREMENT_VALUE;
+    HA_CHANGE_AUTOINCREMENT_VALUE | 
+    HA_ALTER_MAX_ROWS
+;
 }
 
 int ha_ndbcluster::check_if_supported_alter(TABLE *altered_table,
@@ -15651,7 +15653,8 @@ int ha_ndbcluster::check_if_supported_al
 
   if (alter_flags->is_set(HA_ADD_COLUMN) ||
       alter_flags->is_set(HA_ADD_PARTITION) ||
-      alter_flags->is_set(HA_ALTER_TABLE_REORG))
+      alter_flags->is_set(HA_ALTER_TABLE_REORG) ||
+      alter_flags->is_set(HA_ALTER_MAX_ROWS))
   {
      Ndb *ndb= get_ndb(thd);
      NDBDICT *dict= ndb->getDictionary();
@@ -15711,6 +15714,19 @@ int ha_ndbcluster::check_if_supported_al
 
      if (alter_flags->is_set(HA_ALTER_TABLE_REORG))
      {
+       /* 
+          Refuse if Max_rows has been used before...
+          Workaround is to use ALTER ONLINE TABLE <t> MAX_ROWS=<bigger>;
+       */
+       if (old_tab->getMaxRows() != 0)
+       {
+         push_warning(current_thd,
+                      MYSQL_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
+                      "Cannot online REORGANIZE a table with Max_Rows set.  "
+                      "Use ALTER TABLE ... MAX_ROWS=<new_val> or offline REORGANIZE "
+                      "to redistribute this table.");
+         DBUG_RETURN(HA_ALTER_NOT_SUPPORTED);
+       }
        new_tab.setFragmentCount(0);
        new_tab.setFragmentData(0, 0);
      }
@@ -15719,6 +15735,27 @@ int ha_ndbcluster::check_if_supported_al
        DBUG_PRINT("info", ("Adding partition (%u)", part_info->num_parts));
        new_tab.setFragmentCount(part_info->num_parts);
      }
+     if (alter_flags->is_set(HA_ALTER_MAX_ROWS))
+     {
+       ulonglong rows= create_info->max_rows;
+       uint no_fragments= get_no_fragments(rows);
+       uint reported_frags= no_fragments;
+       if (adjusted_frag_count(ndb, no_fragments, reported_frags))
+       {
+         push_warning(current_thd,
+                      MYSQL_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
+                      "Ndb might have problems storing the max amount "
+                      "of rows specified");
+       }
+       if (reported_frags < old_tab->getFragmentCount())
+       {
+         DBUG_PRINT("info", ("Online reduction in number of fragments not supported"));
+         DBUG_RETURN(HA_ALTER_NOT_SUPPORTED);
+       }
+       new_tab.setFragmentCount(reported_frags);
+       new_tab.setDefaultNoPartitionsFlag(false);
+       new_tab.setFragmentData(0, 0);
+     }
 
      NDB_Modifiers table_modifiers(ndb_table_modifiers);
      table_modifiers.parse(thd, "NDB_TABLE=", create_info->comment.str,
@@ -16037,7 +16074,9 @@ int ha_ndbcluster::alter_table_phase1(TH
      }
   }
 
-  if (alter_flags->is_set(HA_ALTER_TABLE_REORG) || alter_flags->is_set(HA_ADD_PARTITION))
+  if (alter_flags->is_set(HA_ALTER_TABLE_REORG) || 
+      alter_flags->is_set(HA_ADD_PARTITION) ||
+      alter_flags->is_set(HA_ALTER_MAX_ROWS))
   {
     if (alter_flags->is_set(HA_ALTER_TABLE_REORG))
     {
@@ -16049,7 +16088,29 @@ int ha_ndbcluster::alter_table_phase1(TH
       partition_info *part_info= altered_table->part_info;
       new_tab->setFragmentCount(part_info->num_parts);
     }
-
+    else if (alter_flags->is_set(HA_ALTER_MAX_ROWS))
+    {
+      ulonglong rows= create_info->max_rows;
+      uint no_fragments= get_no_fragments(rows);
+      uint reported_frags= no_fragments;
+      if (adjusted_frag_count(ndb, no_fragments, reported_frags))
+      {
+        DBUG_ASSERT(false); /* Checked above */
+      }
+      if (reported_frags < old_tab->getFragmentCount())
+      {
+        DBUG_ASSERT(false);
+        DBUG_RETURN(HA_ALTER_NOT_SUPPORTED);
+      }
+      /* Note we don't set the ndb table's max_rows param, as that 
+       * is considered a 'real' change
+       */
+      //new_tab->setMaxRows(create_info->max_rows);
+      new_tab->setFragmentCount(reported_frags);
+      new_tab->setDefaultNoPartitionsFlag(false);
+      new_tab->setFragmentData(0, 0);
+    }
+    
     int res= dict->prepareHashMap(*old_tab, *new_tab);
     if (res == -1)
     {

=== modified file 'sql/handler.h'
--- a/sql/handler.h	2012-03-22 14:18:01 +0000
+++ b/sql/handler.h	2012-03-23 07:44:14 +0000
@@ -58,7 +58,8 @@ class Alter_info;
 /* Bits to show what an alter table will do */
 #include <sql_bitmap.h>
 
-#define HA_MAX_ALTER_FLAGS 40
+#define HA_MAX_ALTER_FLAGS 41
+
 typedef Bitmap<HA_MAX_ALTER_FLAGS> HA_ALTER_FLAGS;
 
 #define HA_ADD_INDEX                  (0)
@@ -101,6 +102,7 @@ typedef Bitmap<HA_MAX_ALTER_FLAGS> HA_AL
 #define HA_ALTER_STORAGE_ENGINE       (37)
 #define HA_RECREATE                   (38)
 #define HA_ALTER_TABLE_REORG          (39)
+#define HA_ALTER_MAX_ROWS             (40)
 /* Remember to increase HA_MAX_ALTER_FLAGS when adding more flags! */
 
 /* Return values for check_if_supported_alter */

=== modified file 'sql/sql_table.cc'
--- a/sql/sql_table.cc	2012-02-14 08:00:53 +0000
+++ b/sql/sql_table.cc	2012-03-21 17:28:20 +0000
@@ -5079,6 +5079,8 @@ compare_tables(THD *thd,
       *alter_flags|= HA_SET_DEFAULT_CHARACTER_SET;
     if (alter_info->flags & ALTER_RECREATE)
       *alter_flags|= HA_RECREATE;
+    if (create_info->used_fields & HA_CREATE_USED_MAX_ROWS)
+      *alter_flags|= HA_ALTER_MAX_ROWS;
     /* TODO check for ADD/DROP FOREIGN KEY */
     if (alter_info->flags & ALTER_FOREIGN_KEY)
       *alter_flags|=  HA_ALTER_FOREIGN_KEY;

=== modified file 'storage/ndb/include/util/InputStream.hpp'
--- a/storage/ndb/include/util/InputStream.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/util/InputStream.hpp	2012-03-21 15:31:06 +0000
@@ -57,7 +57,7 @@ class SocketInputStream : public InputSt
   bool m_startover;
   bool m_timedout;
 public:
-  SocketInputStream(NDB_SOCKET_TYPE socket, unsigned read_timeout_ms = 60000);
+  SocketInputStream(NDB_SOCKET_TYPE socket, unsigned read_timeout_ms = 3000);
   virtual ~SocketInputStream() {}
   char* gets(char * buf, int bufLen);
   bool timedout() { return m_timedout; }

=== modified file 'storage/ndb/memcache/include/NdbInstance.h'
--- a/storage/ndb/memcache/include/NdbInstance.h	2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/include/NdbInstance.h	2012-03-22 22:18:19 +0000
@@ -42,6 +42,7 @@ public:
   /* Public Methods */
   NdbInstance(Ndb_cluster_connection *, int);
   ~NdbInstance();
+  void non_blocking_close(NdbTransaction *);
   void link_workitem(workitem *);
   void unlink_workitem(workitem *);
 

=== modified file 'storage/ndb/memcache/include/QueryPlan.h'
--- a/storage/ndb/memcache/include/QueryPlan.h	2011-12-11 07:31:26 +0000
+++ b/storage/ndb/memcache/include/QueryPlan.h	2012-03-22 22:18:19 +0000
@@ -52,7 +52,7 @@ class QueryPlan {
   ~QueryPlan();
   bool canHaveExternalValue() const;
   bool shouldExternalizeValue(size_t length) const;
-  bool canUseSimpleRead() const;
+  bool canUseCommittedRead() const;
   Uint64 getAutoIncrement() const;
   void debug_dump() const;
   bool hasDataOnDisk() const;
@@ -103,7 +103,7 @@ inline bool QueryPlan::hasDataOnDisk() c
   return has_disk_storage;
 }
 
-inline bool QueryPlan::canUseSimpleRead() const {
+inline bool QueryPlan::canUseCommittedRead() const {
   return(pk_access && (! extern_store) && (! spec->exp_column));
 }
 

=== modified file 'storage/ndb/memcache/include/Queue.h'
--- a/storage/ndb/memcache/include/Queue.h	2011-09-12 10:05:07 +0000
+++ b/storage/ndb/memcache/include/Queue.h	2012-03-22 02:21:34 +0000
@@ -55,6 +55,7 @@ public:   /* public instance variable */
 public:   /* public interface methods */
   Queue(int maxnodes) {
     /* Initialize the node allocation pool */
+    maxnodes += 1;  // add one for a dummy node at the head.
     assert(sizeof(Node) < CACHE_LINE_SIZE);
     nodepool = (char *) calloc(maxnodes, CACHE_LINE_SIZE);
     nodelist = 0;

=== modified file 'storage/ndb/memcache/scripts/pmpstack.awk'
--- a/storage/ndb/memcache/scripts/pmpstack.awk	2011-11-07 21:54:07 +0000
+++ b/storage/ndb/memcache/scripts/pmpstack.awk	2012-03-22 22:18:19 +0000
@@ -41,23 +41,35 @@ function label(name)  {
 ### What is the thread doing? 
 ### Patterns higher up in this file take precedence over lower ones
 
-$2 ~ /^epoll_wait,TransporterRegistry::po/ { label("epoll_wait_transporter_recv"); next }
-                                             
-$2 ~ /^epoll_wait/                         { label("epoll_wait"); next }
+/epoll_wait,TransporterRegistry::po/       { label("epoll_wait_transporter_recv"); next }
+
+/writev,TCP_Transporter::doSend/           { label("sending_to_ndb"); next }
+
+/recv,TCP_Transporter/                     { label("tcp_recv_from_ndb"); next }
 
-$2 ~ /^writev,TCP_Transporter::doSend/     { label("sending_to_ndb"); next }
+/Ndb::closeTransaction/                    { label("ndb_transaction_close"); next } 
 
-$2 ~ /^recv,TCP_Transporter/               { label("tcp_recv_from_ndb"); next }
+/sendmsg,conn_mwrite/                      { label("writing_to_client"); next }
 
-$2 ~ /^sendmsg,conn_mwrite/                { label("sending_to_client"); next }
+/recv,conn_read,event_handler/             { label("reading_from_client"); next }
+
+/poll_dispatch/                            { label("poll_dispatch"); next } 
 
 /pthread_mutex_unlock/                     { label("releasing_locks"); next }
 
+/_lock,pthread_cond_/                      { label("getting_lock_for_condition_var"); next }
+
 /pthread_mutex_lock,Ndb::sendPrepared/     { label("lock_Ndb_impl"); next }
 
 /_mutex_lock/ && /TransporterFacade/       { label("lock_transporter_facade_mutex"); next }
 
 /pthread_cond_timedwait/ && /ollNdb/       { label("wait_poll_ndb"); next }
+
+/::schedule/ && /pthread_cond_signal/      { label("Scheduler_signaling_cond_var") ; next }
+
+/pthread_rwlock_rdlock/                    { label("acquiring_rwlock") ; next } 
+
+/pthread_cond_[a-z]*wait/                  { label("condition_variable_wait"); next }
                                         
 /workqueue_consumer_wait/                  { label("workqueue_idle_wait"); next }
 
@@ -67,6 +79,12 @@ $2 ~ /^sendmsg,conn_mwrite/             
 
 /Ndb::computeHash/                         { label("ndb_compute_hash"); next }
 
+/workitem__initialize/                     { label("workitem_initialize"); next } 
+
+/worker_prepare_operation/                 { label("worker_prepare_operation"); next } 
+                                             
+/^epoll_wait/                              { label("epoll_wait"); next }
+
 /sleep/                                    { label("sleep"); next }
 
 
@@ -74,26 +92,26 @@ $2 ~ /^sendmsg,conn_mwrite/             
 
 END { 
       for(i in event) if (i != "total")
-       printf("%s\t%.2f%%\t%s\n", 
+       printf("%s\t%.2f%% \t%s\n", 
               "Event", (event[i] / event["total"]) * 100, i)
       printf("\n");
 
       for(i in commit) if(i != "total")
-       printf("%s\t%.2f%%\t%s\n", 
+       printf("%s\t%.2f%% \t%s\n", 
               "Commit", (commit[i] / commit["total"]) * 100, i)
       if(commit["total"]) printf("\n");
 
       for(i in send) if(i != "total")
-       printf("%s\t%.2f%%\t%s\n", 
+       printf("%s\t%.2f%% \t%s\n", 
               "Send", (send[i] / send["total"]) * 100, i)
       if(send["total"]) printf("\n");
 
       for(i in poll) if(i != "total")
-       printf("%s\t%.2f%%\t%s\n", 
+       printf("%s\t%.2f%% \t%s\n", 
               "Poll", (poll[i] / poll["total"]) * 100, i)
       if(poll["total"]) printf("\n");
 
       for(i in x) if(i != "total")
-       printf("%s\t%.2f%%\t%s\n", 
+       printf("%s\t%.2f%% \t%s\n", 
               "Unidentified", (x[i] / x["total"]) * 100, i)
     }

=== modified file 'storage/ndb/memcache/src/NdbInstance.cc'
--- a/storage/ndb/memcache/src/NdbInstance.cc	2011-09-30 17:04:30 +0000
+++ b/storage/ndb/memcache/src/NdbInstance.cc	2012-03-22 22:18:19 +0000
@@ -48,3 +48,13 @@ NdbInstance::~NdbInstance() {
 }
 
 
+void NdbInstance::non_blocking_close(NdbTransaction *tx) {
+  Uint64 nwaits_pre, nwaits_post;
+  nwaits_pre = db->getClientStat(Ndb::WaitExecCompleteCount);
+
+  tx->close();
+
+  nwaits_post = db->getClientStat(Ndb::WaitExecCompleteCount);  
+  assert(nwaits_pre == nwaits_post);
+}
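
For context, the helper's contract: it brackets tx->close() with reads of
the Ndb::WaitExecCompleteCount client statistic and asserts they are equal,
i.e. closing the transaction must never turn into a blocking wait on the
data nodes. A hypothetical call site (finish_request() and its arguments
are made up):

    // Hypothetical: tx must already be fully executed, so that close()
    // cannot need to wait; otherwise the assert in non_blocking_close()
    // fires in debug builds.
    void finish_request(NdbInstance * inst, NdbTransaction * tx)
    {
      inst->non_blocking_close(tx);
    }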
+

=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc	2012-03-15 03:21:12 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc	2012-03-22 22:18:19 +0000
@@ -460,8 +460,8 @@ op_status_t WorkerStep1::do_read() {
   
   NdbOperation::LockMode lockmode;
   NdbTransaction::ExecType commitflag;
-  if(plan->canUseSimpleRead()) {
-    lockmode = NdbOperation::LM_SimpleRead;
+  if(plan->canUseCommittedRead()) {
+    lockmode = NdbOperation::LM_CommittedRead;
     commitflag = NdbTransaction::Commit;
   }
   else {

=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc'
--- a/storage/ndb/memcache/src/schedulers/S_sched.cc	2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.cc	2012-03-22 19:24:32 +0000
@@ -538,7 +538,7 @@ S::WorkerConnection::WorkerConnection(Sc
 
   /* Build the freelist */
   freelist = 0;
-  int my_ndb_inst = conn->nInst / global->options.n_worker_threads;
+  int my_ndb_inst = conn->nInst / conn->n_workers;
   for(int j = 0 ; j < my_ndb_inst ; j++ ) {
     NdbInstance *inst = new NdbInstance(conn->conn, 2);
     inst->id = ((id.thd + 1) * 10000) + j + 1; 

=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp	2011-12-10 19:02:03 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.cpp	2012-03-21 17:28:20 +0000
@@ -74,7 +74,7 @@ Transporter::Transporter(TransporterRegi
   checksumUsed    = _checksum;
   signalIdUsed    = _signalId;
 
-  m_timeOutMillis = 30000;
+  m_timeOutMillis = 3000;
 
   m_connect_address.s_addr= 0;
   if(s_port<0)

=== modified file 'storage/ndb/src/kernel/vm/CMakeLists.txt'
--- a/storage/ndb/src/kernel/vm/CMakeLists.txt	2012-02-23 15:41:31 +0000
+++ b/storage/ndb/src/kernel/vm/CMakeLists.txt	2012-03-21 12:31:02 +0000
@@ -78,3 +78,9 @@ FOREACH(testprog CountingPool DynArr256)
   TARGET_LINK_LIBRARIES(${testprog}-t ndbtest ndbkernel ndbsched ndberror
                 ndbtransport ndbmgmcommon ndbmgmapi ndbportlib ndbgeneral)
 ENDFOREACH(testprog)
+
+IF(NDB_BUILD_NDBMTD)
+  ADD_EXECUTABLE(mt-send-t mt-send-t.cpp)
+  TARGET_LINK_LIBRARIES(mt-send-t ndbtest ndbkernel ndbsched ndberror
+                 ndbtransport ndbmgmcommon ndbmgmapi ndbportlib ndbgeneral)
+ENDIF()

=== added file 'storage/ndb/src/kernel/vm/mt-lock.hpp'
--- a/storage/ndb/src/kernel/vm/mt-lock.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/mt-lock.hpp	2012-03-20 13:23:29 +0000
@@ -0,0 +1,160 @@
+/* Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
+
+#ifndef MT_LOCK_HPP
+#define MT_LOCK_HPP
+
+#include <ndb_global.h>
+#include "mt-asm.h"
+#include <NdbMutex.h>
+
+struct mt_lock_stat
+{
+  const void * m_ptr;
+  char * m_name;
+  Uint32 m_contended_count;
+  Uint32 m_spin_count;
+};
+
+static void register_lock(const void * ptr, const char * name);
+static mt_lock_stat * lookup_lock(const void * ptr);
+
+#ifdef NDB_HAVE_XCNG
+template <unsigned SZ>
+struct thr_spin_lock
+{
+  thr_spin_lock(const char * name = 0)
+  {
+    m_lock = 0;
+    register_lock(this, name);
+  }
+
+  union {
+    volatile Uint32 m_lock;
+    char pad[SZ];
+  };
+};
+
+static
+ATTRIBUTE_NOINLINE
+void
+lock_slow(void * sl, volatile unsigned * val)
+{
+  mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock
+
+loop:
+  Uint32 spins = 0;
+  do {
+    spins++;
+    cpu_pause();
+  } while (* val == 1);
+
+  if (unlikely(xcng(val, 1) != 0))
+    goto loop;
+
+  if (s)
+  {
+    s->m_spin_count += spins;
+    Uint32 count = ++s->m_contended_count;
+    Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));
+
+    if ((count % freq) == 0)
+      printf("%s waiting for lock, contentions: %u spins: %u\n",
+             s->m_name, count, s->m_spin_count);
+  }
+}
+
+template <unsigned SZ>
+static
+inline
+void
+lock(struct thr_spin_lock<SZ>* sl)
+{
+  volatile unsigned* val = &sl->m_lock;
+  if (likely(xcng(val, 1) == 0))
+    return;
+
+  lock_slow(sl, val);
+}
+
+template <unsigned SZ>
+static
+inline
+void
+unlock(struct thr_spin_lock<SZ>* sl)
+{
+  /**
+   * Memory barrier here, to make sure all of our stores are visible before
+   * the lock release is.
+   */
+  mb();
+  sl->m_lock = 0;
+}
+
+template <unsigned SZ>
+static
+inline
+int
+trylock(struct thr_spin_lock<SZ>* sl)
+{
+  volatile unsigned* val = &sl->m_lock;
+  return xcng(val, 1);
+}
+#else
+#define thr_spin_lock thr_mutex
+#endif
+
+template <unsigned SZ>
+struct thr_mutex
+{
+  thr_mutex(const char * name = 0) {
+    NdbMutex_Init(&m_mutex);
+    register_lock(this, name);
+  }
+
+  union {
+    NdbMutex m_mutex;
+    char pad[SZ];
+  };
+};
+
+template <unsigned SZ>
+static
+inline
+void
+lock(struct thr_mutex<SZ>* sl)
+{
+  NdbMutex_Lock(&sl->m_mutex);
+}
+
+template <unsigned SZ>
+static
+inline
+void
+unlock(struct thr_mutex<SZ>* sl)
+{
+  NdbMutex_Unlock(&sl->m_mutex);
+}
+
+template <unsigned SZ>
+static
+inline
+int
+trylock(struct thr_mutex<SZ> * sl)
+{
+  return NdbMutex_Trylock(&sl->m_mutex);
+}
+
+#endif
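
For reference, a minimal usage sketch of the new header (illustrative only;
the pad size 64, the lock name, and the counter are made up). As declared in
the header, the including translation unit must supply register_lock() and
lookup_lock() definitions (mt.cpp and mt-send-t.cpp below both do); no-op
stubs suffice when lock statistics are not wanted:

    // Stand-alone sketch: guard shared state with the mt-lock.hpp primitives.
    #include "mt-lock.hpp"

    // Required by mt-lock.hpp; no-op stubs disable the lock statistics.
    static void register_lock(const void * ptr, const char * name) { return; }
    static mt_lock_stat * lookup_lock(const void * ptr) { return 0; }

    static struct thr_spin_lock<64> g_lock("example-lock"); // pad size: example
    static unsigned g_counter = 0;

    void bump()
    {
      lock(&g_lock);     // spins (xcng + cpu_pause) until acquired
      g_counter++;
      unlock(&g_lock);   // mb(), then clear the lock word
    }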

=== added file 'storage/ndb/src/kernel/vm/mt-send-t.cpp'
--- a/storage/ndb/src/kernel/vm/mt-send-t.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/mt-send-t.cpp	2012-03-21 14:07:51 +0000
@@ -0,0 +1,554 @@
+#include "mt-asm.h"
+#include "mt-lock.hpp"
+#include <NdbTick.h>
+#include <NdbMutex.h>
+#include <NdbThread.h>
+#include <NdbCondition.h>
+#include <NdbTap.hpp>
+#include <Bitmask.hpp>
+#include <ndb_rand.h>
+
+#define BUGGY_VERSION 0
+
+/**
+ * DO_SYSCALL inside critical section
+ *  (the equivalent of writev(socket)
+ */
+#define DO_SYSCALL 1
+
+/**
+ * This is a unit test of the send code for mt.cpp
+ *   specifically the code that manages which thread will send
+ *   (write gathering)
+ *
+ * Each thread is a producer of Signals
+ * Each signal has a destination remote node (transporter)
+ * Each thread will after having produced a set of signals
+ *   check if it should send them on socket.
+ *   If it decides that it should, it consumes all the signals
+ *   produced by all threads.
+ *
+ * In this unit test, we don't send signals, but the producing part
+ *   will only be to increment a counter.
+ *
+ ******************************************************************
+ *
+ * To use this program seriously...
+ *
+ *   you should set BUGGY_VERSION to 1
+ *   and experiment with values on cnt_*
+ *   until you find a variant which crashes (abort)
+ *
+ * The values compiled-in makes it crash on a single socket
+ *   Intel(R) Core(TM) i5-2400 CPU @ 3.10GHz release compiled!
+ *   (i never managed to get it debug compiled)
+ */
+#define MAX_THREADS 256
+#define MAX_TRANSPORTERS 256
+
+/**
+ * global variables
+ */
+static unsigned cnt_threads = 64;
+static unsigned cnt_transporters = 8;
+
+/**
+ * outer loops
+ *   start/stop threads
+ */
+static unsigned cnt_seconds = 180;
+
+/**
+ * no of signals produced before calling consume
+ */
+static unsigned cnt_signals_before_consume = 4;
+
+/**
+ * no of signals produced in one inner loop
+ */
+static unsigned cnt_signals_per_inner_loop = 4;
+
+/**
+ * no inner loops per outer loop
+ *
+ *   after each inner loop
+ *   threads will be stalled and result verified
+ */
+static unsigned cnt_inner_loops = 5000;
+
+/**
+ * pct of do_send that are using forceSend()
+ */
+static unsigned pct_force = 15;
+
+typedef Bitmask<(MAX_TRANSPORTERS+31)/32> TransporterMask;
+
+struct Producer
+{
+  Producer() {
+    bzero(val, sizeof(val));
+    pendingcount = 0;
+  }
+
+  void init() {}
+
+  /**
+   * values produced...
+   */
+  unsigned val[MAX_TRANSPORTERS];
+
+  /**
+   * mask/array to keep track of which transporter we have produce values to
+   */
+  TransporterMask pendingmask;
+  unsigned pendingcount;
+  unsigned char pendinglist[MAX_TRANSPORTERS];
+
+  /**
+   * produce a value
+   *
+   * This is the equivalent of mt_send_remote()
+   */
+  void produce(unsigned D);
+
+  /**
+   * consume values (from all threads)
+   *   for transporters that we have produced a value to
+   *
+   * This is the equivalent to do_send and if force=true
+   *   this is the equivalent of forceSend()
+   */
+  void consume(bool force);
+};
+
+struct Thread
+{
+  Thread() { thread= 0; }
+
+  void init() { p.init(); }
+
+  NdbThread * thread;
+  Producer p;
+};
+
+/**
+ * This is the consumer of values for *one* transporter
+ */
+struct Consumer
+{
+  Consumer() {
+    m_force_send = 0; bzero(val, sizeof(val));
+  }
+
+  void init() {}
+
+  struct thr_spin_lock<8> m_send_lock;
+  unsigned m_force_send;
+  unsigned val[MAX_THREADS];
+
+  /**
+   * consume values from all threads to this transporter
+   */
+  void consume(unsigned D);
+
+  /**
+   * force_consume
+   */
+  void forceConsume(unsigned D);
+};
+
+struct Consumer_pad
+{
+  Consumer c;
+  char pad[NDB_CL_PADSZ(sizeof(Consumer))];
+};
+
+struct Thread_pad
+{
+  Thread t;
+  char pad[NDB_CL_PADSZ(sizeof(Thread))];
+};
+
+/**
+ * Thread repository
+ *   and an instance of it
+ */
+static
+struct Rep
+{
+  Thread_pad t[MAX_THREADS];
+  Consumer_pad c[MAX_TRANSPORTERS];
+
+  /**
+   * This menthod is called when all threads are stalled
+   *   so it's safe to read values without locks
+   */
+  void validate() {
+    for (unsigned ic = 0; ic < cnt_transporters; ic++)
+    {
+      for (unsigned it = 0; it < cnt_threads; it++)
+      {
+        if (c[ic].c.val[it] != t[it].t.p.val[ic])
+        {
+          printf("Detected bug!!!\n");
+          printf("ic: %u it: %u c[ic].c.val[it]: %u t[it].t.p.val[ic]: %u\n",
+                 ic, it, c[ic].c.val[it], t[it].t.p.val[ic]);
+          abort();
+        }
+      }
+    }
+  }
+
+  void init() {
+    for (unsigned i = 0; i < cnt_threads; i++)
+      t[i].t.init();
+
+    for (unsigned i = 0; i < cnt_transporters; i++)
+      c[i].c.init();
+  }
+} rep;
+
+static
+struct Test
+{
+  Test() {
+    waiting_start = 0;
+    waiting_stop = 0;
+    mutex = 0;
+    cond = 0;
+  }
+
+  void init() {
+    mutex = NdbMutex_Create();
+    cond = NdbCondition_Create();
+  }
+
+  unsigned waiting_start;
+  unsigned waiting_stop;
+  NdbMutex* mutex;
+  NdbCondition* cond;
+
+  void wait_started();
+  void wait_completed();
+} test;
+
+void*
+thread_main(void * _t)
+{
+  unsigned seed =
+    (unsigned)NdbTick_CurrentNanosecond() +
+    (unsigned)(unsigned long long)_t;
+
+  Thread * self = (Thread*) _t;
+  for (unsigned i = 0; i < cnt_inner_loops; i++)
+  {
+    test.wait_started();
+    for (unsigned j = 0; j < cnt_signals_per_inner_loop;)
+    {
+      for (unsigned k = 0; k < cnt_signals_before_consume; k++)
+      {
+        /**
+         * Produce a signal to destination D
+         */
+        unsigned D = unsigned(ndb_rand_r(&seed)) % cnt_transporters;
+        self->p.produce(D);
+      }
+
+      j += cnt_signals_before_consume;
+
+      /**
+       * This is the equivalent of do_send()
+       */
+      bool force = unsigned(ndb_rand_r(&seed) % 100) < pct_force;
+      self->p.consume(force);
+    }
+    test.wait_completed();
+  }
+  return 0;
+}
+
+static
+bool
+match(const char * arg, const char * val, unsigned * valptr)
+{
+  if (strncmp(arg, val, strlen(val)) == 0)
+  {
+    * valptr = atoi(arg + strlen(val));
+    return true;
+  }
+  return false;
+}
+
+int
+main(int argc, char ** argv)
+{
+  plan(1);
+  ndb_init();
+  test.init();
+  rep.init();
+
+  if (argc == 1)
+  {
+    printf("No arguments supplied...\n"
+           "assuming we're being run from MTR or similar.\n"
+           "decreasing loop counts to ridiculously small values...\n");
+    cnt_seconds = 10;
+    cnt_inner_loops = 3000;
+    cnt_threads = 4;
+  }
+  else
+  {
+    printf("Arguments supplied...\n");
+    for (int i = 1; i < argc; i++)
+    {
+      if (match(argv[i], "cnt_seconds=", &cnt_seconds))
+        continue;
+      else if (match(argv[i], "cnt_threads=", &cnt_threads))
+        continue;
+      else if (match(argv[i], "cnt_transporters=", &cnt_transporters))
+        continue;
+      else if (match(argv[i], "cnt_inner_loops=", &cnt_inner_loops))
+        continue;
+      else if (match(argv[i], "cnt_signals_before_consume=",
+                     &cnt_signals_before_consume))
+        continue;
+      else if (match(argv[i], "cnt_signals_per_inner_loop=",
+                     &cnt_signals_per_inner_loop))
+        continue;
+      else if (match(argv[i], "pct_force=",
+                     &pct_force))
+        continue;
+      else
+      {
+        printf("ignoreing unknown argument: %s\n", argv[i]);
+      }
+    }
+  }
+
+  printf("%s"
+         " cnt_seconds=%u"
+         " cnt_threads=%u"
+         " cnt_transporters=%u"
+         " cnt_inner_loops=%u"
+         " cnt_signals_before_consume=%u"
+         " cnt_signals_per_inner_loop=%u"
+         " pct_force=%u"
+         "\n",
+         argv[0],
+         cnt_seconds,
+         cnt_threads,
+         cnt_transporters,
+         cnt_inner_loops,
+         cnt_signals_before_consume,
+         cnt_signals_per_inner_loop,
+         pct_force);
+
+  Uint32 loop = 0;
+  Uint64 start = NdbTick_CurrentMillisecond() / 1000;
+  while (start + cnt_seconds > (NdbTick_CurrentMillisecond() / 1000))
+  {
+    printf("%u ", loop++); fflush(stdout);
+    if ((loop < 100 && (loop % 25) == 0) ||
+        (loop >= 100 && (loop % 20) == 0))
+      printf("\n");
+
+    for (unsigned t = 0; t < cnt_threads; t++)
+    {
+      rep.t[t].t.thread = NdbThread_Create(thread_main,
+                                           (void**)&rep.t[t].t,
+                                           1024*1024,
+                                           "execute thread",
+                                           NDB_THREAD_PRIO_MEAN);
+    }
+
+    for (unsigned t = 0; t < cnt_threads; t++)
+    {
+      void * ret;
+      NdbThread_WaitFor(rep.t[t].t.thread, &ret);
+    }
+  }
+  printf("\n"); fflush(stdout);
+
+  ok(true, "ok");
+  return 0;
+}
+
+inline
+void
+Producer::produce(unsigned D)
+{
+  if (!pendingmask.get(D))
+  {
+    pendingmask.set(D);
+    pendinglist[pendingcount] = D;
+    pendingcount++;
+  }
+  val[D]++;
+}
+
+inline
+void
+Producer::consume(bool force)
+{
+  unsigned count = pendingcount;
+  pendingmask.clear();
+  pendingcount = 0;
+
+  for (unsigned i = 0; i < count; i++)
+  {
+    unsigned D = pendinglist[i];
+    if (force)
+      rep.c[D].c.forceConsume(D);
+    else
+      rep.c[D].c.consume(D);
+  }
+}
+
+inline
+void
+Consumer::consume(unsigned D)
+{
+  /**
+   * This is the equivalent of do_send(must_send = 1)
+   */
+  m_force_send = 1;
+
+  do
+  {
+    if (trylock(&m_send_lock) != 0)
+    {
+      /* Other thread will send for us as we set m_force_send. */
+      return;
+    }
+
+    /**
+     * Now clear the flag, and start sending all data available to this node.
+     *
+     * Put a memory barrier here, so that if another thread tries to grab
+     * the send lock but fails due to us holding it here, we either
+     * 1) Will see m_force_send[nodeId] set to 1 at the end of the loop, or
+     * 2) We clear here the flag just set by the other thread, but then we
+     * will (thanks to mb()) be able to see and send all of the data already
+     * in the first send iteration.
+     */
+    m_force_send = 0;
+    mb();
+
+    /**
+     * This is the equivalent of link_thread_send_buffers
+     */
+    for (unsigned i = 0; i < cnt_threads; i++)
+    {
+      val[i] = rep.t[i].t.p.val[D];
+    }
+
+    /**
+     * Do a syscall...which could have affect on barriers...etc
+     */
+    if (DO_SYSCALL)
+    {
+      NdbTick_CurrentMillisecond();
+    }
+
+    unlock(&m_send_lock);
+
+#if BUGGY_VERSION
+#else
+    mb();
+#endif
+  }
+  while (m_force_send != 0);
+}
+
+inline
+void
+Consumer::forceConsume(unsigned D)
+{
+  /**
+   * This is the equivalent of forceSend()
+   */
+
+  do
+  {
+    /**
+     * NOTE: since we unconditionally lock m_send_lock
+     *   we don't need a mb() after the clearing of m_force_send here.
+     */
+    m_force_send = 0;
+
+    lock(&m_send_lock);
+
+    /**
+     * This is the equivalent of link_thread_send_buffers
+     */
+    for (unsigned i = 0; i < cnt_threads; i++)
+    {
+      val[i] = rep.t[i].t.p.val[D];
+    }
+
+    /**
+     * Do a syscall...which could have affect on barriers...etc
+     */
+    if (DO_SYSCALL)
+    {
+      NdbTick_CurrentMillisecond();
+    }
+
+    unlock(&m_send_lock);
+
+#if BUGGY_VERSION
+#else
+    mb();
+#endif
+  }
+  while (m_force_send != 0);
+}
+
+void
+Test::wait_started()
+{
+  NdbMutex_Lock(mutex);
+  if (waiting_start + 1 == cnt_threads)
+  {
+    waiting_stop = 0;
+  }
+  waiting_start++;
+  assert(waiting_start <= cnt_threads);
+  while (waiting_start < cnt_threads)
+    NdbCondition_Wait(cond, mutex);
+
+  NdbCondition_Broadcast(cond);
+  NdbMutex_Unlock(mutex);
+}
+
+void
+Test::wait_completed()
+{
+  NdbMutex_Lock(mutex);
+  if (waiting_stop + 1 == cnt_threads)
+  {
+    rep.validate();
+    waiting_start = 0;
+  }
+  waiting_stop++;
+  assert(waiting_stop <= cnt_threads);
+  while (waiting_stop < cnt_threads)
+    NdbCondition_Wait(cond, mutex);
+
+  NdbCondition_Broadcast(cond);
+  NdbMutex_Unlock(mutex);
+}
+
+static
+void
+register_lock(const void * ptr, const char * name)
+{
+  return;
+}
+
+static
+mt_lock_stat *
+lookup_lock(const void * ptr)
+{
+  return 0;
+}

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	2012-02-23 15:41:31 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp	2012-03-21 12:31:02 +0000
@@ -36,6 +36,7 @@
 #include <portlib/ndb_prefetch.h>
 
 #include "mt-asm.h"
+#include "mt-lock.hpp"
 
 inline
 SimulatedBlock*
@@ -96,16 +97,6 @@ static Uint32 first_receiver_thread_no =
 /* max signal is 32 words, 7 for signal header and 25 datawords */
 #define MIN_SIGNALS_PER_PAGE (thr_job_buffer::SIZE / 32)
 
-struct mt_lock_stat
-{
-  const void * m_ptr;
-  char * m_name;
-  Uint32 m_contended_count;
-  Uint32 m_spin_count;
-};
-static void register_lock(const void * ptr, const char * name);
-static mt_lock_stat * lookup_lock(const void * ptr);
-
 #if defined(HAVE_LINUX_FUTEX) && defined(NDB_HAVE_XCNG)
 #define USE_FUTEX
 #endif
@@ -274,132 +265,6 @@ wakeup(struct thr_wait* wait)
 
 #endif
 
-#ifdef NDB_HAVE_XCNG
-template <unsigned SZ>
-struct thr_spin_lock
-{
-  thr_spin_lock(const char * name = 0)
-  {
-    m_lock = 0;
-    register_lock(this, name);
-  }
-
-  union {
-    volatile Uint32 m_lock;
-    char pad[SZ];
-  };
-};
-
-static
-ATTRIBUTE_NOINLINE
-void
-lock_slow(void * sl, volatile unsigned * val)
-{
-  mt_lock_stat* s = lookup_lock(sl); // lookup before owning lock
-
-loop:
-  Uint32 spins = 0;
-  do {
-    spins++;
-    cpu_pause();
-  } while (* val == 1);
-
-  if (unlikely(xcng(val, 1) != 0))
-    goto loop;
-
-  if (s)
-  {
-    s->m_spin_count += spins;
-    Uint32 count = ++s->m_contended_count;
-    Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));
-
-    if ((count % freq) == 0)
-      printf("%s waiting for lock, contentions: %u spins: %u\n",
-             s->m_name, count, s->m_spin_count);
-  }
-}
-
-template <unsigned SZ>
-static
-inline
-void
-lock(struct thr_spin_lock<SZ>* sl)
-{
-  volatile unsigned* val = &sl->m_lock;
-  if (likely(xcng(val, 1) == 0))
-    return;
-
-  lock_slow(sl, val);
-}
-
-template <unsigned SZ>
-static
-inline
-void
-unlock(struct thr_spin_lock<SZ>* sl)
-{
-  /**
-   * Memory barrier here, to make sure all of our stores are visible before
-   * the lock release is.
-   */
-  mb();
-  sl->m_lock = 0;
-}
-
-template <unsigned SZ>
-static
-inline
-int
-trylock(struct thr_spin_lock<SZ>* sl)
-{
-  volatile unsigned* val = &sl->m_lock;
-  return xcng(val, 1);
-}
-#else
-#define thr_spin_lock thr_mutex
-#endif
-
-template <unsigned SZ>
-struct thr_mutex
-{
-  thr_mutex(const char * name = 0) {
-    NdbMutex_Init(&m_mutex);
-    register_lock(this, name);
-  }
-
-  union {
-    NdbMutex m_mutex;
-    char pad[SZ];
-  };
-};
-
-template <unsigned SZ>
-static
-inline
-void
-lock(struct thr_mutex<SZ>* sl)
-{
-  NdbMutex_Lock(&sl->m_mutex);
-}
-
-template <unsigned SZ>
-static
-inline
-void
-unlock(struct thr_mutex<SZ>* sl)
-{
-  NdbMutex_Unlock(&sl->m_mutex);
-}
-
-template <unsigned SZ>
-static
-inline
-int
-trylock(struct thr_mutex<SZ> * sl)
-{
-  return NdbMutex_Trylock(&sl->m_mutex);
-}
-
 /**
  * thr_safe_pool
  */
@@ -2719,6 +2584,13 @@ pack_send_buffer(thr_data *selfptr, Uint
    * After having locked/unlock m_send_lock
    *   "protocol" dictates that we must check the m_force_send
    */
+
+  /**
+   * We need a memory barrier here to prevent race between clearing lock
+   *   and reading of m_force_send.
+   *   CPU can reorder the load to before the clear of the lock
+   */
+  mb();
   if (sb->m_force_send)
   {
     try_send(selfptr, node);
@@ -2787,7 +2659,13 @@ mt_send_handle::forceSend(NodeId nodeId)
 
   do
   {
+    /**
+     * NOTE: we don't need a memory barrier after clearing
+     *       m_force_send here as we unconditionally lock m_send_lock
+     *       hence there is no way that our data can be "unsent"
+     */
     sb->m_force_send = 0;
+
     lock(&sb->m_send_lock);
     sb->m_send_thread = selfptr->m_thr_no;
     globalTransporterRegistry.performSend(nodeId);
@@ -2799,6 +2677,12 @@ mt_send_handle::forceSend(NodeId nodeId)
      */
     selfptr->m_send_buffer_pool.release_global(rep->m_mm,
                                                RG_TRANSPORTER_BUFFERS);
+    /**
+     * We need a memory barrier here to prevent race between clearing lock
+     *   and reading of m_force_send.
+     *   CPU can reorder the load to before the clear of the lock
+     */
+    mb();
   } while (sb->m_force_send);
 
   return true;
@@ -2821,6 +2705,16 @@ try_send(thr_data * selfptr, Uint32 node
       return;
     }
 
+    /**
+     * Now clear the flag, and start sending all data available to this node.
+     *
+     * Put a memory barrier here, so that if another thread tries to grab
+     * the send lock but fails due to us holding it here, we either
+     * 1) Will see m_force_send[nodeId] set to 1 at the end of the loop, or
+     * 2) We clear here the flag just set by the other thread, but then we
+     * will (thanks to mb()) be able to see and send all of the data already
+     * in the first send iteration.
+     */
     sb->m_force_send = 0;
     mb();
 
@@ -2834,6 +2728,13 @@ try_send(thr_data * selfptr, Uint32 node
      */
     selfptr->m_send_buffer_pool.release_global(rep->m_mm,
                                                RG_TRANSPORTER_BUFFERS);
+
+    /**
+     * We need a memory barrier here to prevent race between clearing lock
+     *   and reading of m_force_send.
+     *   CPU can reorder the load to before the clear of the lock
+     */
+    mb();
   } while (sb->m_force_send);
 }
 
@@ -2966,6 +2867,13 @@ do_send(struct thr_data* selfptr, bool m
       {
         register_pending_send(selfptr, node);
       }
+
+      /**
+       * We need a memory barrier here to prevent race between clearing lock
+       *   and reading of m_force_send.
+       *   CPU can reorder the load to before the clear of the lock
+       */
+      mb();
       if (sb->m_force_send)
       {
         /**
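
The protocol these barriers complete, condensed into a stand-alone sketch
(modeled on Consumer::consume() in mt-send-t.cpp above; the SendBuffer
struct and send_pending_data() are illustrative stand-ins for the real
thr_repository send buffers): a thread that loses trylock() sets
m_force_send and relies on the lock holder to re-check that flag after
releasing the lock, which is only safe if an mb() keeps the flag load from
being reordered ahead of the lock-releasing store.

    // Assumes the mb()/trylock()/unlock() primitives from mt-asm.h/mt-lock.hpp
    // and a send_pending_data() helper; the names follow the code above.
    struct SendBuffer
    {
      struct thr_spin_lock<8> m_send_lock;
      volatile unsigned m_force_send;
    };

    static void do_send_sketch(SendBuffer * sb)
    {
      sb->m_force_send = 1;        // guarantee that *someone* sends this data
      do
      {
        if (trylock(&sb->m_send_lock) != 0)
          return;                  // lock holder will re-check m_force_send

        sb->m_force_send = 0;
        mb();                      // see all data queued before the flag was set

        send_pending_data(sb);     // the writev(socket) equivalent

        unlock(&sb->m_send_lock);
        mb();                      // order the lock release before the re-read;
                                   // without it the load below can be hoisted
                                   // above the release and a setter is missed
      } while (sb->m_force_send);
    }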

=== modified file 'storage/ndb/test/run-test/conf-blade08.cnf'
--- a/storage/ndb/test/run-test/conf-blade08.cnf	2011-05-19 17:47:28 +0000
+++ b/storage/ndb/test/run-test/conf-blade08.cnf	2012-03-21 14:43:07 +0000
@@ -26,6 +26,7 @@ FragmentLogFileSize = 64M
 CompressedLCP=1
 CompressedBackup=1
 ODirect=1
+MaxNoOfAttributes=2000
 
 SharedGlobalMemory=256M
 InitialLogfileGroup=undo_buffer_size=64M;undofile01.dat:256M;undofile02.dat:128M

=== modified file 'storage/ndb/test/run-test/conf-ndb07.cnf'
--- a/storage/ndb/test/run-test/conf-ndb07.cnf	2011-05-19 18:19:47 +0000
+++ b/storage/ndb/test/run-test/conf-ndb07.cnf	2012-03-21 14:43:07 +0000
@@ -31,6 +31,7 @@ MaxNoOfSavedMessages= 5
 NoOfFragmentLogFiles = 8
 FragmentLogFileSize = 64M
 ODirect=1
+MaxNoOfAttributes=2000
 
 SharedGlobalMemory=256M
 DiskPageBufferMemory=256M

=== modified file 'storage/ndb/test/run-test/conf-tyr13.cnf'
--- a/storage/ndb/test/run-test/conf-tyr13.cnf	2011-12-09 15:11:21 +0000
+++ b/storage/ndb/test/run-test/conf-tyr13.cnf	2012-03-21 14:43:07 +0000
@@ -34,6 +34,7 @@ FragmentLogFileSize = 64M
 ODirect=1
 MaxNoOfExecutionThreads=8
 SendBufferMemory=4M
+MaxNoOfAttributes=2000
 
 SharedGlobalMemory=256M
 DiskPageBufferMemory=256M

=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt	2012-02-23 15:41:31 +0000
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt	2012-03-21 17:28:20 +0000
@@ -946,7 +946,7 @@ max-time: 500
 cmd: testNdbApi
 args: -n Bug44065
 
-max-time: 500
+max-time: 1000
 cmd: testNdbApi
 args: -n Bug44065_org
 

No bundle (reason: useless for push emails).