List: Commits
From: Jonas Oreland
Date: August 17 2011 10:46am
Subject: bzr push into mysql-5.5-cluster branch (jonas.oreland:3435 to 3436)
 3436 Jonas Oreland	2011-08-17 [merge]
      ndb - merge 71 into 5.5-cluster

    removed:
      mysql-test/suite/ndb/r/ndb_statistics.result
      mysql-test/suite/ndb/t/ndb_statistics.test
    added:
      mysql-test/suite/ndb/r/ndb_index_stat.result
      mysql-test/suite/ndb/r/ndb_statistics0.result
      mysql-test/suite/ndb/r/ndb_statistics1.result
      mysql-test/suite/ndb/t/ndb_index_stat.test
      mysql-test/suite/ndb/t/ndb_index_stat_enable.inc
      mysql-test/suite/ndb/t/ndb_statistics.inc
      mysql-test/suite/ndb/t/ndb_statistics0.test
      mysql-test/suite/ndb/t/ndb_statistics1.test
    modified:
      mysql-test/r/group_by.result
      mysql-test/suite/ndb/r/ndb_restore_misc.result
      mysql-test/suite/ndb/t/ndb_restore_misc.test
      mysql-test/t/group_by.test
      sql/ha_ndb_index_stat.cc
      sql/ha_ndb_index_stat.h
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      sql/sql_select.cc
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionFactoryImpl.java
      storage/ndb/clusterj/clusterj-jdbc/src/main/antlr3/com/mysql/clusterj/jdbc/antlr/MySQL51Lexer.g
      storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPABrokerFactory.java
      storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPAConfigurationImpl.java
      storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainFieldHandlerImpl.java
      storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java
      storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPAStoreManager.java
      storage/ndb/include/ndb_constants.h
      storage/ndb/include/ndbapi/NdbIndexStat.hpp
      storage/ndb/include/util/NdbPack.hpp
      storage/ndb/src/common/util/NdbPack.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
      storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp
      storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp
      storage/ndb/src/ndbapi/NdbIndexStat.cpp
      storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp
      storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
      storage/ndb/test/ndbapi/testIndexStat.cpp
      storage/ndb/tools/ndb_index_stat.cpp
 3435 jonas oreland	2011-07-31
      ndb - fix out-of-source-build for java stuff (jtie/clusterj)

    modified:
      storage/ndb/clusterj/CMakeLists.txt
      storage/ndb/clusterj/clusterj-api/CMakeLists.txt
      storage/ndb/clusterj/clusterj-core/CMakeLists.txt
      storage/ndb/clusterj/clusterj-jpatest/CMakeLists.txt
      storage/ndb/clusterj/clusterj-openjpa/CMakeLists.txt
      storage/ndb/clusterj/clusterj-tie/CMakeLists.txt
      storage/ndb/src/ndbjtie/jtie/test/myjapi/CMakeLists.txt
      storage/ndb/src/ndbjtie/jtie/test/unload/CMakeLists.txt
      storage/ndb/src/ndbjtie/test/CMakeLists.txt
=== modified file 'mysql-test/r/group_by.result'
--- a/mysql-test/r/group_by.result	2011-02-18 10:55:24 +0000
+++ b/mysql-test/r/group_by.result	2011-08-17 10:36:01 +0000
@@ -1857,6 +1857,21 @@ COUNT(*)
 2
 DROP TABLE t1;
 #
+# Bug#12798270: ASSERTION `!TAB->SORTED' FAILED IN JOIN_READ_KEY2
+#
+CREATE TABLE t1 (i int);
+INSERT INTO t1 VALUES (1);
+CREATE TABLE t2 (pk int PRIMARY KEY);
+INSERT INTO t2 VALUES (10);
+CREATE VIEW v1 AS SELECT t2.pk FROM t2;
+SELECT v1.pk
+FROM t1 LEFT JOIN v1 ON t1.i = v1.pk 
+GROUP BY v1.pk;
+pk
+DROP VIEW v1;
+DROP TABLE t1,t2;
+# End of Bug#12798270
+#
 # Bug#59839: Aggregation followed by subquery yields wrong result
 #
 CREATE TABLE t1 (

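For reference, the new test case above distilled from the recorded output into a plain script (statements exactly as in the result file):

  CREATE TABLE t1 (i int);
  INSERT INTO t1 VALUES (1);
  CREATE TABLE t2 (pk int PRIMARY KEY);
  INSERT INTO t2 VALUES (10);
  CREATE VIEW v1 AS SELECT t2.pk FROM t2;
  SELECT v1.pk
  FROM t1 LEFT JOIN v1 ON t1.i = v1.pk
  GROUP BY v1.pk;
  DROP VIEW v1;
  DROP TABLE t1,t2;

On a debug build the SELECT used to hit the assertion `!tab->sorted' in join_read_key2 (Bug#12798270); after the fix it completes normally, returning no rows as the recorded output above shows.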
=== added file 'mysql-test/suite/ndb/r/ndb_index_stat.result'
--- a/mysql-test/suite/ndb/r/ndb_index_stat.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_index_stat.result	2011-08-17 10:36:01 +0000
@@ -0,0 +1,498 @@
+DROP TABLE IF EXISTS t1, t2;
+set @is_enable_default = @@global.ndb_index_stat_enable;
+set @is_enable = 1;
+set @is_enable = NULL;
+# is_enable_on=1 is_enable_off=0
+# ndb_index_stat_enable - before
+show global variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	OFF
+show local variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	OFF
+set @@global.ndb_index_stat_enable = 1;
+set @@local.ndb_index_stat_enable = 1;
+# ndb_index_stat_enable - after
+show global variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	ON
+show local variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	ON
+show global variables like 'ndb_index_stat_option';
+Variable_name	Value
+ndb_index_stat_option	loop_checkon=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=32,check_delay=1m,delete_batch=8,clean_delay=0,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90
+set @save_option = @@global.ndb_index_stat_option;
+set @@global.ndb_index_stat_option = 'loop_idle=3333,cache_limit=44M';
+set @@global.ndb_index_stat_option = 'cache_lowpct=85,evict_delay=55';
+set @@global.ndb_index_stat_option = 'check_delay=234s';
+show global variables like 'ndb_index_stat_option';
+Variable_name	Value
+ndb_index_stat_option	loop_checkon=1000ms,loop_idle=3333ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=32,check_delay=234s,delete_batch=8,clean_delay=0,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=55s,cache_limit=44M,cache_lowpct=85
+set @@global.ndb_index_stat_option = @save_option;
+show global variables like 'ndb_index_stat_option';
+Variable_name	Value
+ndb_index_stat_option	loop_checkon=1000ms,loop_idle=1000ms,loop_busy=100ms,update_batch=1,read_batch=4,idle_batch=32,check_batch=32,check_delay=1m,delete_batch=8,clean_delay=0,error_batch=4,error_delay=1m,evict_batch=8,evict_delay=1m,cache_limit=32M,cache_lowpct=90
+create table t1 (
+a1 int unsigned not null,
+b1 int unsigned not null,
+c1 int unsigned not null,
+primary key (a1),
+index b1x (b1),
+index c1x (c1)
+) engine=ndb;
+create table t2 (
+a2 int unsigned not null,
+b2 int unsigned not null,
+c2 int unsigned not null,
+primary key (a2),
+index b2x (b2),
+index c2x (c2)
+) engine=ndb;
+analyze table t1, t2;
+Table	Op	Msg_type	Msg_text
+test.t1	analyze	status	OK
+test.t2	analyze	status	OK
+# must use b1x
+explain select * from t1
+where b1 = 5 and c1 = 5;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t1	ref	b1x,c1x	b1x	4	const	#	Using where with pushed condition
+# must use c2x
+explain select * from t2
+where b2 = 5 and c2 = 5;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t2	ref	b2x,c2x	c2x	4	const	#	Using where with pushed condition
+# must use b1x, c2x
+explain select * from t1, t2
+where c1 = c2 and b1 = 5 and b2 = 5;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t1	ref	b1x,c1x	b1x	4	const	#	Parent of 2 pushed join@1
+1	SIMPLE	t2	ref	b2x,c2x	c2x	4	test.t1.c1	#	Child of 't1' in pushed join@1; Using where with pushed condition
+# must use c2x, b1x
+explain select * from t1, t2
+where b1 = b2 and c1 = 5 and c2 = 5;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t2	ref	b2x,c2x	c2x	4	const	#	Parent of 2 pushed join@1
+1	SIMPLE	t1	ref	b1x,c1x	b1x	4	test.t2.b2	#	Child of 't2' in pushed join@1; Using where with pushed condition
+# must use t1, c2x
+explain select * from t1, t2
+where c1 = c2;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t1	ALL	c1x	NULL	NULL	NULL	#	Parent of 2 pushed join@1
+1	SIMPLE	t2	ref	c2x	c2x	4	test.t1.c1	#	Child of 't1' in pushed join@1
+# must use t2, b1x
+explain select * from t1, t2
+where b1 = b2;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t2	ALL	b2x	NULL	NULL	NULL	#	Parent of 2 pushed join@1
+1	SIMPLE	t1	ref	b1x	b1x	4	test.t2.b2	#	Child of 't2' in pushed join@1
+# should NOT say: Using index for group-by
+explain select distinct (a1) from t1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t1	ALL	NULL	NULL	NULL	NULL	#	
+# must say: Using index for group by
+explain select distinct (b1) from t1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t1	index	NULL	b1x	4	NULL	#	
+# must say: Using index for group by
+explain select distinct (c1) from t1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t1	index	NULL	c1x	4	NULL	#	
+drop table t1, t2;
+create table t1 (a int, b int, c varchar(10) not null,
+primary key using hash (a), index(b,c)) engine=ndb;
+insert into t1 values
+(1,10,'aaa'),(2,10,'bbb'),(3,10,'ccc'),
+(4,20,'aaa'),(5,20,'bbb'),(6,20,'ccc'),
+(7,30,'aaa'),(8,30,'bbb'),(9,30,'ccc');
+analyze table t1;
+Table	Op	Msg_type	Msg_text
+test.t1	analyze	status	OK
+set @@local.ndb_index_stat_enable = 0;
+select count(*) from t1 where b < 10;
+count(*)
+0
+select count(*) from t1 where b >= 10 and c >= 'bbb';
+count(*)
+6
+select count(*) from t1 where b > 10;
+count(*)
+6
+select count(*) from t1 where b <= 20 and c < 'ccc';
+count(*)
+4
+select count(*) from t1 where b = 20 and c = 'ccc';
+count(*)
+1
+select count(*) from t1 where b > 20;
+count(*)
+3
+select count(*) from t1 where b = 30 and c > 'aaa';
+count(*)
+2
+select count(*) from t1 where b <= 20;
+count(*)
+6
+select count(*) from t1 where b >= 20 and c > 'aaa';
+count(*)
+4
+set @@local.ndb_index_stat_enable = 1;
+select count(*) from t1 where b < 10;
+count(*)
+0
+select count(*) from t1 where b >= 10 and c >= 'bbb';
+count(*)
+6
+select count(*) from t1 where b > 10;
+count(*)
+6
+select count(*) from t1 where b <= 20 and c < 'ccc';
+count(*)
+4
+select count(*) from t1 where b = 20 and c = 'ccc';
+count(*)
+1
+select count(*) from t1 where b > 20;
+count(*)
+3
+select count(*) from t1 where b = 30 and c > 'aaa';
+count(*)
+2
+select count(*) from t1 where b <= 20;
+count(*)
+6
+select count(*) from t1 where b >= 20 and c > 'aaa';
+count(*)
+4
+set @@local.ndb_index_stat_enable = 0;
+select count(*) from t1 where b < 10;
+count(*)
+0
+select count(*) from t1 where b >= 10 and c >= 'bbb';
+count(*)
+6
+select count(*) from t1 where b > 10;
+count(*)
+6
+select count(*) from t1 where b <= 20 and c < 'ccc';
+count(*)
+4
+select count(*) from t1 where b = 20 and c = 'ccc';
+count(*)
+1
+select count(*) from t1 where b > 20;
+count(*)
+3
+select count(*) from t1 where b = 30 and c > 'aaa';
+count(*)
+2
+select count(*) from t1 where b <= 20;
+count(*)
+6
+select count(*) from t1 where b >= 20 and c > 'aaa';
+count(*)
+4
+set @@local.ndb_index_stat_enable = 1;
+select count(*) from t1 where b < 10;
+count(*)
+0
+select count(*) from t1 where b >= 10 and c >= 'bbb';
+count(*)
+6
+select count(*) from t1 where b > 10;
+count(*)
+6
+select count(*) from t1 where b <= 20 and c < 'ccc';
+count(*)
+4
+select count(*) from t1 where b = 20 and c = 'ccc';
+count(*)
+1
+select count(*) from t1 where b > 20;
+count(*)
+3
+select count(*) from t1 where b = 30 and c > 'aaa';
+count(*)
+2
+select count(*) from t1 where b <= 20;
+count(*)
+6
+select count(*) from t1 where b >= 20 and c > 'aaa';
+count(*)
+4
+drop table t1;
+create table t1 (a int, b int, primary key using hash (a), index x1 (b))
+engine=ndb;
+insert into t1 values (1,11),(2,22),(3,33);
+select * from t1 order by a;
+a	b
+1	11
+2	22
+3	33
+select * from t1 order by a;
+a	b
+1	11
+2	22
+3	33
+analyze table t1;
+Table	Op	Msg_type	Msg_text
+test.t1	analyze	status	OK
+drop table t1;
+create table t1 (
+pk int not null,
+a tinyint not null,
+b tinyint unsigned not null,
+c smallint not null,
+d smallint unsigned not null,
+e mediumint not null,
+f mediumint unsigned not null,
+g int not null,
+h int unsigned not null,
+i bigint not null,
+j bigint unsigned not null,
+k float not null,
+l double not null,
+m decimal not null,
+n decimal unsigned not null,
+primary key using hash (pk),
+index (a),
+index (b),
+index (c),
+index (d),
+index (e),
+index (f),
+index (g),
+index (h),
+index (i),
+index (j),
+index (k),
+index (l),
+index (m),
+index (n)
+) engine=ndb;
+insert into t1 values
+(1,11,11,11,11,11,11,11,11,11,11,11,11,11,11),
+(2,22,22,22,22,22,22,22,22,22,22,22,22,22,22),
+(3,33,33,33,33,33,33,33,33,33,33,33,33,33,33);
+select count(*) from t1 where a > 22;
+count(*)
+1
+select count(*) from t1 where b > 22;
+count(*)
+1
+select count(*) from t1 where c > 22;
+count(*)
+1
+select count(*) from t1 where d > 22;
+count(*)
+1
+select count(*) from t1 where e > 22;
+count(*)
+1
+select count(*) from t1 where f > 22;
+count(*)
+1
+select count(*) from t1 where g > 22;
+count(*)
+1
+select count(*) from t1 where h > 22;
+count(*)
+1
+select count(*) from t1 where i > 22;
+count(*)
+1
+select count(*) from t1 where j > 22;
+count(*)
+1
+select count(*) from t1 where k > 22;
+count(*)
+1
+select count(*) from t1 where l > 22;
+count(*)
+1
+select count(*) from t1 where m > 22;
+count(*)
+1
+select count(*) from t1 where n > 22;
+count(*)
+1
+analyze table t1;
+Table	Op	Msg_type	Msg_text
+test.t1	analyze	status	OK
+select count(*) from t1 where a > 22;
+count(*)
+1
+select count(*) from t1 where b > 22;
+count(*)
+1
+select count(*) from t1 where c > 22;
+count(*)
+1
+select count(*) from t1 where d > 22;
+count(*)
+1
+select count(*) from t1 where e > 22;
+count(*)
+1
+select count(*) from t1 where f > 22;
+count(*)
+1
+select count(*) from t1 where g > 22;
+count(*)
+1
+select count(*) from t1 where h > 22;
+count(*)
+1
+select count(*) from t1 where i > 22;
+count(*)
+1
+select count(*) from t1 where j > 22;
+count(*)
+1
+select count(*) from t1 where k > 22;
+count(*)
+1
+select count(*) from t1 where l > 22;
+count(*)
+1
+select count(*) from t1 where m > 22;
+count(*)
+1
+select count(*) from t1 where n > 22;
+count(*)
+1
+drop table t1;
+create table t1 (
+pk int not null,
+a datetime not null,
+b date not null,
+c year not null,
+d time not null,
+e timestamp not null,
+primary key using hash (pk),
+index (a),
+index (b),
+index (c),
+index (d),
+index (e)
+) engine=ndb;
+insert into t1 values
+(1,'1971-01-01 01:01:01','1971-01-01','1971','01:01:01','1971-01-01 01:01:01'),
+(2,'1972-02-02 02:02:02','1972-02-02','1972','02:02:02','1972-02-02 02:02:02'),
+(3,'1973-03-03 03:03:03','1973-03-03','1973','03:03:03','1973-03-03 03:03:03');
+select count(*) from t1 where a > '1972-02-02 02:02:02';
+count(*)
+1
+select count(*) from t1 where b > '1972-02-02';
+count(*)
+1
+select count(*) from t1 where c > '1972';
+count(*)
+1
+select count(*) from t1 where d > '02:02:02';
+count(*)
+1
+select count(*) from t1 where e > '1972-02-02 02:02:02';
+count(*)
+1
+analyze table t1;
+Table	Op	Msg_type	Msg_text
+test.t1	analyze	status	OK
+select count(*) from t1 where a > '1972-02-02 02:02:02';
+count(*)
+1
+select count(*) from t1 where b > '1972-02-02';
+count(*)
+1
+select count(*) from t1 where c > '1972';
+count(*)
+1
+select count(*) from t1 where d > '02:02:02';
+count(*)
+1
+select count(*) from t1 where e > '1972-02-02 02:02:02';
+count(*)
+1
+drop table t1;
+create table t1 (
+pk int not null,
+a char(10) not null,
+b varchar(10) not null,
+c varchar(1000) not null,
+d binary(10) not null,
+e varbinary(10) not null,
+f varbinary(1000) not null,
+primary key using hash (pk),
+index (a),
+index (b),
+index (c),
+index (d),
+index (e),
+index (f)
+) engine=ndb;
+insert into t1 values
+('1','111','111','111','111','111','111'),
+('2','222','222','222','222','222','222'),
+('3','333','333','333','333','333','333');
+select count(*) from t1 where a > '222';
+count(*)
+1
+select count(*) from t1 where b > '222';
+count(*)
+1
+select count(*) from t1 where c > '222';
+count(*)
+1
+select count(*) from t1 where d > '222';
+count(*)
+2
+select count(*) from t1 where e > '222';
+count(*)
+1
+select count(*) from t1 where f > '222';
+count(*)
+1
+analyze table t1;
+Table	Op	Msg_type	Msg_text
+test.t1	analyze	status	OK
+select count(*) from t1 where a > '222';
+count(*)
+1
+select count(*) from t1 where b > '222';
+count(*)
+1
+select count(*) from t1 where c > '222';
+count(*)
+1
+select count(*) from t1 where d > '222';
+count(*)
+2
+select count(*) from t1 where e > '222';
+count(*)
+1
+select count(*) from t1 where f > '222';
+count(*)
+1
+drop table t1;
+set @is_enable = @is_enable_default;
+set @is_enable = NULL;
+# is_enable_on=0 is_enable_off=1
+# ndb_index_stat_enable - before
+show global variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	ON
+show local variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	ON
+set @@local.ndb_index_stat_enable = 0;
+set @@global.ndb_index_stat_enable = 0;
+drop table mysql.ndb_index_stat_sample;
+drop table mysql.ndb_index_stat_head;
+# ndb_index_stat_enable - after
+show global variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	OFF
+show local variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	OFF

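One detail visible in the output above: ndb_index_stat_option is a packed list of suboptions, and each SET merges the named suboptions into the current value rather than resetting the rest. That is why three separate SET statements accumulate into one string carrying loop_idle=3333ms, cache_lowpct=85, evict_delay=55s, check_delay=234s and cache_limit=44M at once, and why bare numbers pick up default units (3333 -> 3333ms, 55 -> 55s). The save/restore pattern the test uses:

  set @save_option = @@global.ndb_index_stat_option;
  set @@global.ndb_index_stat_option = 'loop_idle=3333,cache_limit=44M';
  set @@global.ndb_index_stat_option = 'check_delay=234s';
  # loop_idle and cache_limit keep the values set above
  set @@global.ndb_index_stat_option = @save_option;
  # a single assignment restores every suboption at once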
=== modified file 'mysql-test/suite/ndb/r/ndb_restore_misc.result'
--- a/mysql-test/suite/ndb/r/ndb_restore_misc.result	2011-06-07 13:47:21 +0000
+++ b/mysql-test/suite/ndb/r/ndb_restore_misc.result	2011-08-17 10:36:01 +0000
@@ -623,8 +623,12 @@ name VARCHAR(255)
 
 ndb_show_tables completed.....
 
+select id into @tmp1 from ndb_show_tables_results
+where name like '%ndb_index_stat_sample_x1%';
 select * from ndb_show_tables_results
 where type like '%Index%'
+and name not like '%ndb_index_stat_sample_x1%'
+and name not like concat('%NDB$INDEX_',@tmp1,'_CUSTOM%')
 order by 1,2,3,4,5,6,7;
 id	type	state	logging	_database	_schema	name
 drop table ndb_show_tables_results;

=== removed file 'mysql-test/suite/ndb/r/ndb_statistics.result'
--- a/mysql-test/suite/ndb/r/ndb_statistics.result	2011-03-17 13:54:30 +0000
+++ b/mysql-test/suite/ndb/r/ndb_statistics.result	1970-01-01 00:00:00 +0000
@@ -1,161 +0,0 @@
-drop table if exists t1, t2, t3, t4;
-CREATE TABLE t10(
-K INT NOT NULL AUTO_INCREMENT,
-I INT, J INT,
-PRIMARY KEY(K),
-KEY(I,J),
-UNIQUE KEY(J,K)
-) ENGINE=ndbcluster;
-INSERT INTO t10(I,J) VALUES (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(0,0);
-CREATE TABLE t100 LIKE t10;
-INSERT INTO t100(I,J)
-SELECT X.J, X.J+(10*Y.J) FROM t10 AS X,t10 AS Y;
-CREATE TABLE t10000 LIKE t10;
-INSERT INTO t10000(I,J)
-SELECT X.J, X.J+(100*Y.J) FROM t100 AS X,t100 AS Y
-WHERE X.J<50;
-INSERT INTO t10000(I,J)
-SELECT X.J, X.J+(100*Y.J) FROM t100 AS X,t100 AS Y
-WHERE X.J>=50;
-ANALYZE TABLE t10,t100,t10000;
-Table	Op	Msg_type	Msg_text
-test.t10	analyze	status	OK
-test.t100	analyze	status	OK
-test.t10000	analyze	status	OK
-SELECT COUNT(*) FROM t10;
-COUNT(*)
-10
-SELECT COUNT(*) FROM t100;
-COUNT(*)
-100
-SELECT COUNT(*) FROM t10000;
-COUNT(*)
-10000
-EXPLAIN
-SELECT * FROM t10000 WHERE k = 42;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	const	PRIMARY	PRIMARY	4	const	1	
-EXPLAIN
-SELECT * FROM t10000 WHERE k >= 42 and k < 10000;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	500	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE k BETWEEN 42 AND 10000;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	500	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE k < 42;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	1000	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE k > 42;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	1000	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 AS X JOIN t10000 AS Y
-ON Y.I=X.I AND Y.J = X.I;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	X	ALL	I	NULL	NULL	NULL	10000	Parent of 2 pushed join@1
-1	SIMPLE	Y	ref	J,I	I	10	test.X.I,test.X.I	11	Child of 'X' in pushed join@1; Using where
-EXPLAIN
-SELECT * FROM t100 WHERE k < 42;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t100	range	PRIMARY	PRIMARY	4	NULL	10	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t100 WHERE k > 42;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t100	range	PRIMARY	PRIMARY	4	NULL	10	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE k < 42;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	1000	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE k > 42;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	1000	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t100 WHERE k BETWEEN 42 AND 10000;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t100	range	PRIMARY	PRIMARY	4	NULL	5	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE k BETWEEN 42 AND 10000;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	500	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE I = 0;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	ref	I	I	5	const	200	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE J = 0;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	ref	J	J	5	const	100	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE I = 0 AND J = 0;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	ref	J,I	I	10	const,const	4	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE I = 0;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	ref	I	I	5	const	200	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE I = 0 AND J > 1;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	J,I	I	10	NULL	100	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE I = 0 AND J < 1;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	J,I	I	10	NULL	50	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE I = 0 AND J BETWEEN 1 AND 10;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	J,I	I	10	NULL	50	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE I = 0 AND J = 1;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	ref	J,I	I	10	const,const	4	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE J = 0;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	ref	J	J	5	const	100	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE J = 0 AND K > 1;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	PRIMARY,J	J	9	NULL	50	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE J = 0 AND K < 1;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	PRIMARY,J	J	9	NULL	50	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE J = 0 AND K BETWEEN 1 AND 10;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	PRIMARY,J	J	9	NULL	25	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE J = 0 AND K = 1;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	const	PRIMARY,J	PRIMARY	4	const	1	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE I = 0 AND J <> 1;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	J,I	I	10	NULL	150	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE I <> 0 AND J = 1;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	ref	J,I	J	5	const	100	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE I <> 0 AND J <> 1;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	J,I	J	5	NULL	1500	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE J <> 1 AND I = 0;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	J,I	I	10	NULL	150	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE J = 1 AND I <> 0;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	ref	J,I	J	5	const	100	Using where with pushed condition
-EXPLAIN
-SELECT * FROM t10000 WHERE J <> 1 AND I <> 0;
-id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
-1	SIMPLE	t10000	range	J,I	J	5	NULL	1500	Using where with pushed condition
-DROP TABLE t10,t100,t10000;
-End of 5.1 tests

=== added file 'mysql-test/suite/ndb/r/ndb_statistics0.result'
--- a/mysql-test/suite/ndb/r/ndb_statistics0.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_statistics0.result	2011-08-17 10:36:01 +0000
@@ -0,0 +1,197 @@
+set @is_enable_default = @@global.ndb_index_stat_enable;
+set @is_enable = 0;
+set @is_enable = NULL;
+# is_enable_on=0 is_enable_off=0
+# ndb_index_stat_enable - before
+show global variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	OFF
+show local variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	OFF
+# ndb_index_stat_enable - after
+show global variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	OFF
+show local variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	OFF
+drop table if exists t1, t2, t3, t4;
+CREATE TABLE t10(
+K INT NOT NULL AUTO_INCREMENT,
+I INT, J INT,
+PRIMARY KEY(K),
+KEY(I,J),
+UNIQUE KEY(J,K)
+) ENGINE=ndbcluster
+partition by key (K) partitions 1;
+INSERT INTO t10(I,J) VALUES (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(0,0);
+CREATE TABLE t100 LIKE t10;
+INSERT INTO t100(I,J)
+SELECT X.J, X.J+(10*Y.J) FROM t10 AS X,t10 AS Y;
+CREATE TABLE t10000 LIKE t10;
+INSERT INTO t10000(I,J)
+SELECT X.J, X.J+(100*Y.J) FROM t100 AS X,t100 AS Y
+WHERE X.J<50;
+INSERT INTO t10000(I,J)
+SELECT X.J, X.J+(100*Y.J) FROM t100 AS X,t100 AS Y
+WHERE X.J>=50;
+ANALYZE TABLE t10,t100,t10000;
+Table	Op	Msg_type	Msg_text
+test.t10	analyze	status	OK
+test.t100	analyze	status	OK
+test.t10000	analyze	status	OK
+SELECT COUNT(*) FROM t10;
+COUNT(*)
+10
+SELECT COUNT(*) FROM t100;
+COUNT(*)
+100
+SELECT COUNT(*) FROM t10000;
+COUNT(*)
+10000
+EXPLAIN
+SELECT * FROM t10000 WHERE k = 42;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	const	PRIMARY	PRIMARY	4	const	1	
+EXPLAIN
+SELECT * FROM t10000 WHERE k >= 42 and k < 10000;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	500	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE k BETWEEN 42 AND 10000;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	500	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE k < 42;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	1000	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE k > 42;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	1000	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 AS X JOIN t10000 AS Y
+ON Y.I=X.I AND Y.J = X.I;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	X	ALL	I	NULL	NULL	NULL	10000	Parent of 2 pushed join@1
+1	SIMPLE	Y	ref	J,I	I	10	test.X.I,test.X.I	11	Child of 'X' in pushed join@1; Using where
+EXPLAIN
+SELECT * FROM t100 WHERE k < 42;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t100	range	PRIMARY	PRIMARY	4	NULL	10	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t100 WHERE k > 42;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t100	range	PRIMARY	PRIMARY	4	NULL	10	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE k < 42;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	1000	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE k > 42;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	1000	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t100 WHERE k BETWEEN 42 AND 10000;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t100	range	PRIMARY	PRIMARY	4	NULL	5	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE k BETWEEN 42 AND 10000;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	500	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	ref	I	I	5	const	200	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	ref	J	J	5	const	100	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J = 0;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	ref	J,I	I	10	const,const	4	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	ref	I	I	5	const	200	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J > 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	J,I	I	10	NULL	100	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J < 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	J,I	I	10	NULL	50	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J BETWEEN 1 AND 10;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	J,I	I	10	NULL	50	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J = 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	ref	J,I	I	10	const,const	4	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	ref	J	J	5	const	100	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0 AND K > 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY,J	J	9	NULL	50	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0 AND K < 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY,J	J	9	NULL	50	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0 AND K BETWEEN 1 AND 10;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY,J	J	9	NULL	25	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0 AND K = 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	const	PRIMARY,J	PRIMARY	4	const	1	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J <> 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	J,I	I	10	NULL	150	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I <> 0 AND J = 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	ref	J,I	J	5	const	100	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I <> 0 AND J <> 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	J,I	J	5	NULL	1500	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J <> 1 AND I = 0;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	J,I	I	10	NULL	150	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 1 AND I <> 0;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	ref	J,I	J	5	const	100	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J <> 1 AND I <> 0;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	J,I	J	5	NULL	1500	Using where with pushed condition
+DROP TABLE t10,t100,t10000;
+End of 5.1 tests
+set @is_enable = @is_enable_default;
+set @is_enable = NULL;
+# is_enable_on=0 is_enable_off=0
+# ndb_index_stat_enable - before
+show global variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	OFF
+show local variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	OFF
+# ndb_index_stat_enable - after
+show global variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	OFF
+show local variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	OFF

=== added file 'mysql-test/suite/ndb/r/ndb_statistics1.result'
--- a/mysql-test/suite/ndb/r/ndb_statistics1.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_statistics1.result	2011-08-17 10:36:01 +0000
@@ -0,0 +1,203 @@
+set @is_enable_default = @@global.ndb_index_stat_enable;
+set @is_enable = 1;
+set @is_enable = NULL;
+# is_enable_on=1 is_enable_off=0
+# ndb_index_stat_enable - before
+show global variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	OFF
+show local variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	OFF
+set @@global.ndb_index_stat_enable = 1;
+set @@local.ndb_index_stat_enable = 1;
+# ndb_index_stat_enable - after
+show global variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	ON
+show local variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	ON
+drop table if exists t1, t2, t3, t4;
+CREATE TABLE t10(
+K INT NOT NULL AUTO_INCREMENT,
+I INT, J INT,
+PRIMARY KEY(K),
+KEY(I,J),
+UNIQUE KEY(J,K)
+) ENGINE=ndbcluster
+partition by key (K) partitions 1;
+INSERT INTO t10(I,J) VALUES (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(0,0);
+CREATE TABLE t100 LIKE t10;
+INSERT INTO t100(I,J)
+SELECT X.J, X.J+(10*Y.J) FROM t10 AS X,t10 AS Y;
+CREATE TABLE t10000 LIKE t10;
+INSERT INTO t10000(I,J)
+SELECT X.J, X.J+(100*Y.J) FROM t100 AS X,t100 AS Y
+WHERE X.J<50;
+INSERT INTO t10000(I,J)
+SELECT X.J, X.J+(100*Y.J) FROM t100 AS X,t100 AS Y
+WHERE X.J>=50;
+ANALYZE TABLE t10,t100,t10000;
+Table	Op	Msg_type	Msg_text
+test.t10	analyze	status	OK
+test.t100	analyze	status	OK
+test.t10000	analyze	status	OK
+SELECT COUNT(*) FROM t10;
+COUNT(*)
+10
+SELECT COUNT(*) FROM t100;
+COUNT(*)
+100
+SELECT COUNT(*) FROM t10000;
+COUNT(*)
+10000
+EXPLAIN
+SELECT * FROM t10000 WHERE k = 42;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	const	PRIMARY	PRIMARY	4	const	1	
+EXPLAIN
+SELECT * FROM t10000 WHERE k >= 42 and k < 10000;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	2	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE k BETWEEN 42 AND 10000;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	2	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE k < 42;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	42	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE k > 42;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	9958	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 AS X JOIN t10000 AS Y
+ON Y.I=X.I AND Y.J = X.I;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	X	ALL	I	NULL	NULL	NULL	10000	Parent of 2 pushed join@1
+1	SIMPLE	Y	ref	J,I	I	10	test.X.I,test.X.I	1	Child of 'X' in pushed join@1; Using where
+EXPLAIN
+SELECT * FROM t100 WHERE k < 42;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t100	range	PRIMARY	PRIMARY	4	NULL	42	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t100 WHERE k > 42;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t100	range	PRIMARY	PRIMARY	4	NULL	58	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE k < 42;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	42	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE k > 42;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	9958	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t100 WHERE k BETWEEN 42 AND 10000;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t100	range	PRIMARY	PRIMARY	4	NULL	2	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE k BETWEEN 42 AND 10000;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY	PRIMARY	4	NULL	2	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	ref	I	I	5	const	100	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	ref	J	J	5	const	2	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J = 0;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	ref	J,I	J	5	const	2	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	ref	I	I	5	const	100	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J > 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	J,I	I	10	NULL	99	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J < 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	J,I	J	5	NULL	2	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J BETWEEN 1 AND 10;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	J,I	J	5	NULL	2	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J = 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	ref	J,I	J	5	const	2	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	ref	J	J	5	const	2	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0 AND K > 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY,J	J	9	NULL	2	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0 AND K < 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY,J	PRIMARY	4	NULL	2	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0 AND K BETWEEN 1 AND 10;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	PRIMARY,J	PRIMARY	4	NULL	2	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0 AND K = 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	const	PRIMARY,J	PRIMARY	4	const	1	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J <> 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	J,I	I	10	NULL	101	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I <> 0 AND J = 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	ref	J,I	J	5	const	2	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE I <> 0 AND J <> 1;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	J,I	I	5	NULL	9902	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J <> 1 AND I = 0;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	J,I	I	10	NULL	101	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 1 AND I <> 0;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	ref	J,I	J	5	const	2	Using where with pushed condition
+EXPLAIN
+SELECT * FROM t10000 WHERE J <> 1 AND I <> 0;
+id	select_type	table	type	possible_keys	key	key_len	ref	rows	Extra
+1	SIMPLE	t10000	range	J,I	I	5	NULL	9902	Using where with pushed condition
+DROP TABLE t10,t100,t10000;
+End of 5.1 tests
+set @is_enable = @is_enable_default;
+set @is_enable = NULL;
+# is_enable_on=0 is_enable_off=1
+# ndb_index_stat_enable - before
+show global variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	ON
+show local variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	ON
+set @@local.ndb_index_stat_enable = 0;
+set @@global.ndb_index_stat_enable = 0;
+drop table mysql.ndb_index_stat_sample;
+drop table mysql.ndb_index_stat_head;
+# ndb_index_stat_enable - after
+show global variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	OFF
+show local variables like 'ndb_index_stat_enable';
+Variable_name	Value
+ndb_index_stat_enable	OFF

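Set against ndb_statistics0.result (same schema and data, stats disabled), this file shows what the statistics buy. Assuming the auto-increment key runs 1..10000, the open-range estimates for t10000 now track the data instead of the flat 10% heuristic:

                           stats off   stats on   actual
  t10000 WHERE k < 42        1000         42        41
  t10000 WHERE k > 42        1000       9958      9958

The closed-range estimates also change (k BETWEEN 42 AND 10000: 500 -> 2), and the 'Y' side of the self-join drops from 11 to 1 estimated row per key.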
=== added file 'mysql-test/suite/ndb/t/ndb_index_stat.test'
--- a/mysql-test/suite/ndb/t/ndb_index_stat.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_index_stat.test	2011-07-23 14:35:37 +0000
@@ -0,0 +1,313 @@
+-- source include/have_ndb.inc
+
+# Notes on index stats in *.test.
+#
+# Most tables here have few rows.  Index stats are not very
+# useful in such cases but the optimizer seems to use them anyway.
+# One reason may be that nested-loop join is the only join method.
+#
+# In real production systems, index stats are computed daily or weekly.
+# But tests here must compute them at once if "explain" is used.
+# Thus: insert (or other dml) - analyze table - explain (see the sketch after this diff).
+#
+# Index stats are approximate since only one replica is scanned
+# and values are interpolated from samples.  MTR output however must be
+# deterministic.  If it is not, use --replace_column 9 # (rows).
+
+--disable_warnings
+DROP TABLE IF EXISTS t1, t2;
+--enable_warnings
+
+set @is_enable_default = @@global.ndb_index_stat_enable;
+
+set @is_enable = 1;
+source ndb_index_stat_enable.inc;
+
+# test changing suboptions
+show global variables like 'ndb_index_stat_option';
+set @save_option = @@global.ndb_index_stat_option;
+# some options
+set @@global.ndb_index_stat_option = 'loop_idle=3333,cache_limit=44M';
+set @@global.ndb_index_stat_option = 'cache_lowpct=85,evict_delay=55';
+set @@global.ndb_index_stat_option = 'check_delay=234s';
+show global variables like 'ndb_index_stat_option';
+set @@global.ndb_index_stat_option = @save_option;
+show global variables like 'ndb_index_stat_option';
+
+# TEST: main
+create table t1 (
+  a1 int unsigned not null,
+  b1 int unsigned not null,
+  c1 int unsigned not null,
+  primary key (a1),
+  index b1x (b1),
+  index c1x (c1)
+) engine=ndb;
+
+create table t2 (
+  a2 int unsigned not null,
+  b2 int unsigned not null,
+  c2 int unsigned not null,
+  primary key (a2),
+  index b2x (b2),
+  index c2x (c2)
+) engine=ndb;
+
+# enough rows to make index stats more approximate
+
+--disable_query_log
+let $i = 1000;
+while ($i)
+{
+  dec $i;
+  eval insert into t1 values ($i, $i % 100, $i % 10);
+}
+let $i = 1000;
+while ($i)
+{
+  dec $i;
+  eval insert into t2 values ($i, $i % 10, $i % 100);
+}
+--enable_query_log
+analyze table t1, t2;
+
+# TEST: key equal constant
+
+--echo # must use b1x
+--replace_column 9 #
+explain select * from t1
+  where b1 = 5 and c1 = 5;
+
+--echo # must use c2x
+--replace_column 9 #
+explain select * from t2
+  where b2 = 5 and c2 = 5;
+
+# TEST: keys equal constant in join
+
+--echo # must use b1x, c2x
+--replace_column 9 #
+explain select * from t1, t2
+  where c1 = c2 and b1 = 5 and b2 = 5;
+
+--echo # must use c2x, b1x
+--replace_column 9 #
+explain select * from t1, t2
+  where b1 = b2 and c1 = 5 and c2 = 5;
+
+# TEST: join via keys of different selectivity
+
+--echo # must use t1, c2x
+--replace_column 9 #
+explain select * from t1, t2
+  where c1 = c2;
+--echo # must use t2, b1x
+--replace_column 9 #
+explain select * from t1, t2
+  where b1 = b2;
+
+# TEST: bug#44760 quick distinct
+# QUICK_GROUP_MIN_MAX_SELECT says "Using index for group-by".
+# This should happen only for a low-cardinality index.
+# wl4124_todo: result is wrong until HA_KEYREAD_ONLY is set
+
+--echo # should NOT say: Using index for group-by
+--replace_column 9 #
+explain select distinct (a1) from t1;
+
+--echo # must say: Using index for group by
+--replace_column 9 #
+explain select distinct (b1) from t1;
+
+--echo # must say: Using index for group by
+--replace_column 9 #
+explain select distinct (c1) from t1;
+
+# TEST: end
+drop table t1, t2;
+
+# turn index stats OFF in the client (falls back to other methods)
+# code snippet moved from ndb_index_ordered.test
+
+create table t1 (a int, b int, c varchar(10) not null,
+  primary key using hash (a), index(b,c)) engine=ndb;
+insert into t1 values
+  (1,10,'aaa'),(2,10,'bbb'),(3,10,'ccc'),
+  (4,20,'aaa'),(5,20,'bbb'),(6,20,'ccc'),
+  (7,30,'aaa'),(8,30,'bbb'),(9,30,'ccc');
+analyze table t1;
+let $is_loop = 4;
+while ($is_loop)
+{
+  # 4-OFF 3-ON 2-OFF 1-ON
+  let $is_enable = `select ($is_loop=3 or $is_loop=1)`;
+  dec $is_loop;
+  eval set @@local.ndb_index_stat_enable = $is_enable;
+
+select count(*) from t1 where b < 10;
+select count(*) from t1 where b >= 10 and c >= 'bbb';
+select count(*) from t1 where b > 10;
+select count(*) from t1 where b <= 20 and c < 'ccc';
+select count(*) from t1 where b = 20 and c = 'ccc';
+select count(*) from t1 where b > 20;
+select count(*) from t1 where b = 30 and c > 'aaa';
+select count(*) from t1 where b <= 20;
+select count(*) from t1 where b >= 20 and c > 'aaa';
+}
+drop table t1;
+
+# bug#XXXXX
+# autocreate=false,enable=1 is now acceptable
+# the following gives a warning while "no stats" is counted as an error
+create table t1 (a int, b int, primary key using hash (a), index x1 (b))
+engine=ndb;
+insert into t1 values (1,11),(2,22),(3,33);
+# make_join_statistics() -> info() -> ndb_index_stat_set_rpk()
+# error 4715 - no stats
+select * from t1 order by a;
+# error 9003 suppressed - a recent previous error exists
+select * from t1 order by a;
+# analyze clears previous error at once
+analyze table t1;
+drop table t1;
+
+# bug#XXXXX
+# wrong byte size from some types to NdbPack
+# before the error fixes this caused stats to be ignored silently (error 4716)
+# best seen with a debug build and export NDB_PACK_ABORT_ON_ERROR=1
+# affected types: mediumint datetime date time timestamp
+
+create table t1 (
+  pk int not null,
+  a tinyint not null,
+  b tinyint unsigned not null,
+  c smallint not null,
+  d smallint unsigned not null,
+  e mediumint not null,
+  f mediumint unsigned not null,
+  g int not null,
+  h int unsigned not null,
+  i bigint not null,
+  j bigint unsigned not null,
+  k float not null,
+  l double not null,
+  m decimal not null,
+  n decimal unsigned not null,
+  primary key using hash (pk),
+  index (a),
+  index (b),
+  index (c),
+  index (d),
+  index (e),
+  index (f),
+  index (g),
+  index (h),
+  index (i),
+  index (j),
+  index (k),
+  index (l),
+  index (m),
+  index (n)
+) engine=ndb;
+insert into t1 values
+(1,11,11,11,11,11,11,11,11,11,11,11,11,11,11),
+(2,22,22,22,22,22,22,22,22,22,22,22,22,22,22),
+(3,33,33,33,33,33,33,33,33,33,33,33,33,33,33);
+let $i = 2;
+while ($i)
+{
+  dec $i;
+  if (!$i)
+  {
+    eval analyze table t1;
+  }
+  eval select count(*) from t1 where a > 22;
+  eval select count(*) from t1 where b > 22;
+  eval select count(*) from t1 where c > 22;
+  eval select count(*) from t1 where d > 22;
+  eval select count(*) from t1 where e > 22;
+  eval select count(*) from t1 where f > 22;
+  eval select count(*) from t1 where g > 22;
+  eval select count(*) from t1 where h > 22;
+  eval select count(*) from t1 where i > 22;
+  eval select count(*) from t1 where j > 22;
+  eval select count(*) from t1 where k > 22;
+  eval select count(*) from t1 where l > 22;
+  eval select count(*) from t1 where m > 22;
+  eval select count(*) from t1 where n > 22;
+}
+drop table t1;
+
+create table t1 (
+  pk int not null,
+  a datetime not null,
+  b date not null,
+  c year not null,
+  d time not null,
+  e timestamp not null,
+  primary key using hash (pk),
+  index (a),
+  index (b),
+  index (c),
+  index (d),
+  index (e)
+) engine=ndb;
+insert into t1 values
+(1,'1971-01-01 01:01:01','1971-01-01','1971','01:01:01','1971-01-01 01:01:01'),
+(2,'1972-02-02 02:02:02','1972-02-02','1972','02:02:02','1972-02-02 02:02:02'),
+(3,'1973-03-03 03:03:03','1973-03-03','1973','03:03:03','1973-03-03 03:03:03');
+let $i = 2;
+while ($i)
+{
+  dec $i;
+  if (!$i)
+  {
+    eval analyze table t1;
+  }
+  eval select count(*) from t1 where a > '1972-02-02 02:02:02';
+  eval select count(*) from t1 where b > '1972-02-02';
+  eval select count(*) from t1 where c > '1972';
+  eval select count(*) from t1 where d > '02:02:02';
+  eval select count(*) from t1 where e > '1972-02-02 02:02:02';
+}
+drop table t1;
+
+create table t1 (
+  pk int not null,
+  a char(10) not null,
+  b varchar(10) not null,
+  c varchar(1000) not null,
+  d binary(10) not null,
+  e varbinary(10) not null,
+  f varbinary(1000) not null,
+  primary key using hash (pk),
+  index (a),
+  index (b),
+  index (c),
+  index (d),
+  index (e),
+  index (f)
+) engine=ndb;
+insert into t1 values
+('1','111','111','111','111','111','111'),
+('2','222','222','222','222','222','222'),
+('3','333','333','333','333','333','333');
+let $i = 2;
+while ($i)
+{
+  dec $i;
+  if (!$i)
+  {
+    eval analyze table t1;
+  }
+  eval select count(*) from t1 where a > '222';
+  eval select count(*) from t1 where b > '222';
+  eval select count(*) from t1 where c > '222';
+  eval select count(*) from t1 where d > '222';
+  eval select count(*) from t1 where e > '222';
+  eval select count(*) from t1 where f > '222';
+}
+drop table t1;
+
+set @is_enable = @is_enable_default;
+source ndb_index_stat_enable.inc;

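The insert - analyze - explain pattern from the notes at the top of this test, as a minimal self-contained sketch (table and data here are illustrative, not part of the patch):

  create table t (a int, b int, primary key (a), index bx (b)) engine=ndb;
  insert into t values (1,10),(2,10),(3,20);
  # compute index stats now rather than on a daily/weekly schedule
  analyze table t;
  # mask column 9 (rows): interpolated estimates need not be exact
  --replace_column 9 #
  explain select * from t where b > 10;
  drop table t;

Note that --replace_column applies only to the next statement, which is why the test above repeats it before every explain.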
=== added file 'mysql-test/suite/ndb/t/ndb_index_stat_enable.inc'
--- a/mysql-test/suite/ndb/t/ndb_index_stat_enable.inc	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_index_stat_enable.inc	2011-07-02 07:05:32 +0000
@@ -0,0 +1,39 @@
+# turn ndb_index_stat_enable ON or OFF
+# the caller sets @is_enable to 0/1
+# the decision is based on the global variable; the local setting follows the global
+# do nothing if the value is already correct
+# setting OFF drops the stats tables to avoid an MTR diff
+
+let is_enable_on = `select @is_enable and not @@global.ndb_index_stat_enable`;
+let is_enable_off = `select not @is_enable and @@global.ndb_index_stat_enable`;
+set @is_enable = NULL;
+
+--echo # is_enable_on=$is_enable_on is_enable_off=$is_enable_off
+
+--echo # ndb_index_stat_enable - before
+show global variables like 'ndb_index_stat_enable';
+show local variables like 'ndb_index_stat_enable';
+
+if ($is_enable_on)
+{
+  # first global
+  eval set @@global.ndb_index_stat_enable = 1;
+  eval set @@local.ndb_index_stat_enable = 1;
+
+  # stats thread creates stats tables
+}
+
+if ($is_enable_off)
+{
+  # first local
+  eval set @@local.ndb_index_stat_enable = 0;
+  eval set @@global.ndb_index_stat_enable = 0;
+
+  # stats thread does not (and must not) drop stats tables
+  eval drop table mysql.ndb_index_stat_sample;
+  eval drop table mysql.ndb_index_stat_head;
+}
+
+--echo # ndb_index_stat_enable - after
+show global variables like 'ndb_index_stat_enable';
+show local variables like 'ndb_index_stat_enable';

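Callers drive this include through the @is_enable user variable, as ndb_index_stat.test does above; the include consumes the variable (setting it back to NULL), so each invocation must set it anew:

  set @is_enable_default = @@global.ndb_index_stat_enable;
  set @is_enable = 1;
  source ndb_index_stat_enable.inc;
  # ... test body runs with index stats enabled ...
  set @is_enable = @is_enable_default;
  source ndb_index_stat_enable.inc;

Note the ordering inside the include: enabling sets the global first (the stats thread then creates the stats tables), while disabling sets the local first and drops mysql.ndb_index_stat_sample and mysql.ndb_index_stat_head itself, since the stats thread does not (and must not) drop them.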
=== modified file 'mysql-test/suite/ndb/t/ndb_restore_misc.test'
--- a/mysql-test/suite/ndb/t/ndb_restore_misc.test	2011-05-31 08:28:58 +0000
+++ b/mysql-test/suite/ndb/t/ndb_restore_misc.test	2011-07-19 10:54:29 +0000
@@ -546,8 +546,15 @@ CREATE TEMPORARY TABLE IF NOT EXISTS ndb
   name VARCHAR(255)
 );
 --source ndb_show_tables_result.inc
+# the db fields include single quotes...
+--disable_warnings
+select id into @tmp1 from ndb_show_tables_results
+where name like '%ndb_index_stat_sample_x1%';
+--enable_warnings
 select * from ndb_show_tables_results
 where type like '%Index%'
+and name not like '%ndb_index_stat_sample_x1%'
+and name not like concat('%NDB$INDEX_',@tmp1,'_CUSTOM%')
 order by 1,2,3,4,5,6,7;
 drop table ndb_show_tables_results;
 

=== added file 'mysql-test/suite/ndb/t/ndb_statistics.inc'
--- a/mysql-test/suite/ndb/t/ndb_statistics.inc	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_statistics.inc	2011-07-02 07:05:32 +0000
@@ -0,0 +1,145 @@
+-- source include/have_ndb.inc
+
+--disable_warnings
+drop table if exists t1, t2, t3, t4;
+--enable_warnings
+
+CREATE TABLE t10(
+  K INT NOT NULL AUTO_INCREMENT,
+  I INT, J INT,
+  PRIMARY KEY(K),
+  KEY(I,J),
+  UNIQUE KEY(J,K)
+) ENGINE=ndbcluster
+  partition by key (K) partitions 1;
+
+INSERT INTO t10(I,J) VALUES (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(0,0);
+
+CREATE TABLE t100 LIKE t10;
+INSERT INTO t100(I,J)
+  SELECT X.J, X.J+(10*Y.J) FROM t10 AS X,t10 AS Y;
+
+CREATE TABLE t10000 LIKE t10;
+
+# Insert into t10000 in two chunks to not
+#  exhaust MaxNoOfConcurrentOperations
+INSERT INTO t10000(I,J)
+  SELECT X.J, X.J+(100*Y.J) FROM t100 AS X,t100 AS Y
+  WHERE X.J<50;
+INSERT INTO t10000(I,J)
+  SELECT X.J, X.J+(100*Y.J) FROM t100 AS X,t100 AS Y
+  WHERE X.J>=50;
+
+ANALYZE TABLE t10,t100,t10000;
+
+SELECT COUNT(*) FROM t10;
+SELECT COUNT(*) FROM t100;
+SELECT COUNT(*) FROM t10000;
+
+#
+# Bug #59517: Incorrect detection of single row access in
+#             ha_ndbcluster::records_in_range()
+
+# Expect a single row (or const) when the PK is exactly specified
+EXPLAIN
+SELECT * FROM t10000 WHERE k = 42;
+
+# All queries below should *not* return a single row
+EXPLAIN
+SELECT * FROM t10000 WHERE k >= 42 and k < 10000;
+EXPLAIN
+SELECT * FROM t10000 WHERE k BETWEEN 42 AND 10000;
+EXPLAIN
+SELECT * FROM t10000 WHERE k < 42;
+EXPLAIN
+SELECT * FROM t10000 WHERE k > 42;
+
+#
+# Bug #59519 ::set_rec_per_key() assumes ORDER_INDEX to be unique
+#
+
+# 'REF' join of 'Y' should match more than one row
+EXPLAIN
+SELECT * FROM t10000 AS X JOIN t10000 AS Y
+  ON Y.I=X.I AND Y.J = X.I;
+
+#
+# Bug #11804277: INCORRECT INDEX MAY BE SELECTED DUE TO INSUFFICIENT 
+#                STATISTICS FROM CLUSTER
+#
+
+# Open bounded range should return 10% of #rows in table
+EXPLAIN
+SELECT * FROM t100 WHERE k < 42;
+EXPLAIN
+SELECT * FROM t100 WHERE k > 42;
+EXPLAIN
+SELECT * FROM t10000 WHERE k < 42;
+EXPLAIN
+SELECT * FROM t10000 WHERE k > 42;
+
+# Closed bounded range should return 5% of #rows in table
+EXPLAIN
+SELECT * FROM t100 WHERE k BETWEEN 42 AND 10000;
+EXPLAIN
+SELECT * FROM t10000 WHERE k BETWEEN 42 AND 10000;
+
+# EQ-range selectivity depends on
+#  - key length specified
+#  - #rows in table
+#  - unique/non-unique index
+#  - min 2% selectivity
+#
+#  Possibly combined with open/closed ranges as
+#  above, which further improves selectivity
+#  (worked numbers for these heuristics follow this diff)
+#
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0;
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0;
+
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J = 0;
+
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0;
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J > 1;
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J < 1;
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J BETWEEN 1 AND 10;
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J = 1;
+
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0;
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0 AND K > 1;
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0 AND K < 1;
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0 AND K BETWEEN 1 AND 10;
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 0 AND K = 1;
+
+## Verify selection of 'best' index
+## (whichever of the indexes on I or J has the equality predicate)
+EXPLAIN
+SELECT * FROM t10000 WHERE I = 0 AND J <> 1;
+EXPLAIN
+SELECT * FROM t10000 WHERE I <> 0 AND J = 1;
+EXPLAIN
+SELECT * FROM t10000 WHERE I <> 0 AND J <> 1;
+
+EXPLAIN
+SELECT * FROM t10000 WHERE J <> 1 AND I = 0;
+EXPLAIN
+SELECT * FROM t10000 WHERE J = 1 AND I <> 0;
+EXPLAIN
+SELECT * FROM t10000 WHERE J <> 1 AND I <> 0;
+
+
+DROP TABLE t10,t100,t10000;
+
+--echo End of 5.1 tests
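
The percentages asserted above are the fallback heuristics ha_ndbcluster
uses when index statistics cannot give a better answer. A minimal sketch
of that estimate, assuming only the 10%/5% fractions stated in the
comments and the never-a-single-row rule from Bug #59517 (the function
name is illustrative, not the real one):

  typedef unsigned long long ha_rows;

  /* Open bounded ranges are costed at 10% of the table, closed ranges
     at 5%, and a range is never reported as fewer than 2 rows so it
     cannot be mistaken for a const (single-row) access. */
  static ha_rows estimate_range_rows(ha_rows table_rows,
                                     bool lower_bounded, bool upper_bounded)
  {
    const double fraction= (lower_bounded && upper_bounded) ? 0.05 : 0.10;
    ha_rows rows= (ha_rows)(fraction * (double)table_rows);
    return rows < 2 ? 2 : rows;
  }

With table_rows = 10000 this gives 1000 for "k > 42" and 500 for
"k BETWEEN 42 AND 10000", which is what the EXPLAIN output above is
checked against.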

=== removed file 'mysql-test/suite/ndb/t/ndb_statistics.test'
--- a/mysql-test/suite/ndb/t/ndb_statistics.test	2011-02-28 10:42:04 +0000
+++ b/mysql-test/suite/ndb/t/ndb_statistics.test	1970-01-01 00:00:00 +0000
@@ -1,144 +0,0 @@
--- source include/have_ndb.inc
-
---disable_warnings
-drop table if exists t1, t2, t3, t4;
---enable_warnings
-
-CREATE TABLE t10(
-  K INT NOT NULL AUTO_INCREMENT,
-  I INT, J INT,
-  PRIMARY KEY(K),
-  KEY(I,J),
-  UNIQUE KEY(J,K)
-) ENGINE=ndbcluster;
-
-INSERT INTO t10(I,J) VALUES (1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(0,0);
-
-CREATE TABLE t100 LIKE t10;
-INSERT INTO t100(I,J)
-  SELECT X.J, X.J+(10*Y.J) FROM t10 AS X,t10 AS Y;
-
-CREATE TABLE t10000 LIKE t10;
-
-# Insert into t10000 in two chunks to not
-#  exhaust MaxNoOfConcurrentOperations
-INSERT INTO t10000(I,J)
-  SELECT X.J, X.J+(100*Y.J) FROM t100 AS X,t100 AS Y
-  WHERE X.J<50;
-INSERT INTO t10000(I,J)
-  SELECT X.J, X.J+(100*Y.J) FROM t100 AS X,t100 AS Y
-  WHERE X.J>=50;
-
-ANALYZE TABLE t10,t100,t10000;
-
-SELECT COUNT(*) FROM t10;
-SELECT COUNT(*) FROM t100;
-SELECT COUNT(*) FROM t10000;
-
-#
-# Bug #59517: Incorrect detection of single row access in
-#             ha_ndbcluster::records_in_range()
-
-# Expect a single row (or const) when PK is excact specified
-EXPLAIN
-SELECT * FROM t10000 WHERE k = 42;
-
-# All queries below should *not* return a single row
-EXPLAIN
-SELECT * FROM t10000 WHERE k >= 42 and k < 10000;
-EXPLAIN
-SELECT * FROM t10000 WHERE k BETWEEN 42 AND 10000;
-EXPLAIN
-SELECT * FROM t10000 WHERE k < 42;
-EXPLAIN
-SELECT * FROM t10000 WHERE k > 42;
-
-#
-# Bug #59519 ::set_rec_per_key() assumes ORDER_INDEX to be unique
-#
-
-# 'REF' join of 'Y' should match >1 rows
-EXPLAIN
-SELECT * FROM t10000 AS X JOIN t10000 AS Y
-  ON Y.I=X.I AND Y.J = X.I;
-
-#
-# Bug #11804277: INCORRECT INDEX MAY BE SELECTED DUE TO INSUFFICIENT 
-#                STATISTICS FROM CLUSTER
-#
-
-# Open bounded range should return 10% of #rows in table
-EXPLAIN
-SELECT * FROM t100 WHERE k < 42;
-EXPLAIN
-SELECT * FROM t100 WHERE k > 42;
-EXPLAIN
-SELECT * FROM t10000 WHERE k < 42;
-EXPLAIN
-SELECT * FROM t10000 WHERE k > 42;
-
-#Closed bounded range should return 5% of #rows in table
-EXPLAIN
-SELECT * FROM t100 WHERE k BETWEEN 42 AND 10000;
-EXPLAIN
-SELECT * FROM t10000 WHERE k BETWEEN 42 AND 10000;
-
-#EQ-range selectivity depends on
-#  - key length specified
-#  - #rows in table.
-#  - unique/non-unique index
-#  - min 2% selectivity
-#
-#  Possibly combined with open/closed ranges as
-#  above which further improves selectivity
-#
-EXPLAIN
-SELECT * FROM t10000 WHERE I = 0;
-EXPLAIN
-SELECT * FROM t10000 WHERE J = 0;
-
-EXPLAIN
-SELECT * FROM t10000 WHERE I = 0 AND J = 0;
-
-EXPLAIN
-SELECT * FROM t10000 WHERE I = 0;
-EXPLAIN
-SELECT * FROM t10000 WHERE I = 0 AND J > 1;
-EXPLAIN
-SELECT * FROM t10000 WHERE I = 0 AND J < 1;
-EXPLAIN
-SELECT * FROM t10000 WHERE I = 0 AND J BETWEEN 1 AND 10;
-EXPLAIN
-SELECT * FROM t10000 WHERE I = 0 AND J = 1;
-
-EXPLAIN
-SELECT * FROM t10000 WHERE J = 0;
-EXPLAIN
-SELECT * FROM t10000 WHERE J = 0 AND K > 1;
-EXPLAIN
-SELECT * FROM t10000 WHERE J = 0 AND K < 1;
-EXPLAIN
-SELECT * FROM t10000 WHERE J = 0 AND K BETWEEN 1 AND 10;
-EXPLAIN
-SELECT * FROM t10000 WHERE J = 0 AND K = 1;
-
-## Verify selection of 'best' index
-## (The one of index I/J being EQ)
-EXPLAIN
-SELECT * FROM t10000 WHERE I = 0 AND J <> 1;
-EXPLAIN
-SELECT * FROM t10000 WHERE I <> 0 AND J = 1;
-EXPLAIN
-SELECT * FROM t10000 WHERE I <> 0 AND J <> 1;
-
-EXPLAIN
-SELECT * FROM t10000 WHERE J <> 1 AND I = 0;
-EXPLAIN
-SELECT * FROM t10000 WHERE J = 1 AND I <> 0;
-EXPLAIN
-SELECT * FROM t10000 WHERE J <> 1 AND I <> 0;
-
-
-DROP TABLE t10,t100,t10000;
-
---echo End of 5.1 tests

=== added file 'mysql-test/suite/ndb/t/ndb_statistics0.test'
--- a/mysql-test/suite/ndb/t/ndb_statistics0.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_statistics0.test	2011-07-02 07:05:32 +0000
@@ -0,0 +1,11 @@
+# index stats OFF
+
+set @is_enable_default = @@global.ndb_index_stat_enable;
+
+set @is_enable = 0;
+source ndb_index_stat_enable.inc;
+
+--source ndb_statistics.inc
+
+set @is_enable = @is_enable_default;
+source ndb_index_stat_enable.inc;

=== added file 'mysql-test/suite/ndb/t/ndb_statistics1.test'
--- a/mysql-test/suite/ndb/t/ndb_statistics1.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_statistics1.test	2011-07-02 07:05:32 +0000
@@ -0,0 +1,11 @@
+# index stats ON
+
+set @is_enable_default = @@global.ndb_index_stat_enable;
+
+set @is_enable = 1;
+source ndb_index_stat_enable.inc;
+
+--source ndb_statistics.inc
+
+set @is_enable = @is_enable_default;
+source ndb_index_stat_enable.inc;

=== modified file 'mysql-test/t/group_by.test'
--- a/mysql-test/t/group_by.test	2011-02-18 10:55:24 +0000
+++ b/mysql-test/t/group_by.test	2011-08-17 10:36:01 +0000
@@ -1248,6 +1248,27 @@ ON 1 WHERE t2.f1 > 1 GROUP BY t2.f1;
 
 DROP TABLE t1;
 
+
+--echo #
+--echo # Bug#12798270: ASSERTION `!TAB->SORTED' FAILED IN JOIN_READ_KEY2
+--echo #
+
+CREATE TABLE t1 (i int);
+INSERT INTO t1 VALUES (1);
+
+CREATE TABLE t2 (pk int PRIMARY KEY);
+INSERT INTO t2 VALUES (10);
+
+CREATE VIEW v1 AS SELECT t2.pk FROM t2;
+
+SELECT v1.pk
+FROM t1 LEFT JOIN v1 ON t1.i = v1.pk 
+GROUP BY v1.pk;
+
+DROP VIEW v1;
+DROP TABLE t1,t2;
+
+--echo # End of Bug#12798270
 --echo #
 --echo # Bug#59839: Aggregation followed by subquery yields wrong result
 --echo #

=== modified file 'sql/ha_ndb_index_stat.cc'
--- a/sql/ha_ndb_index_stat.cc	2011-07-01 10:35:04 +0000
+++ b/sql/ha_ndb_index_stat.cc	2011-08-17 10:36:01 +0000
@@ -62,6 +62,7 @@ struct Ndb_index_stat {
   time_t check_time;    /* when checked for updated stats (>= read_time) */
   bool cache_clean;     /* old caches have been deleted */
   uint force_update;    /* one-time force update from analyze table */
+  bool no_stats;        /* have detected that no stats exist */
   NdbIndexStat::Error error;
   time_t error_time;
   int error_count;
@@ -498,40 +499,33 @@ struct Ndb_index_stat_glob {
   uint total_count;
   uint force_update;
   uint wait_update;
+  uint no_stats;
   uint cache_query_bytes; /* In use */
   uint cache_clean_bytes; /* Obsolete versions not yet removed */
-  bool is_locked;
   Ndb_index_stat_glob() :
     total_count(0),
     force_update(0),
     wait_update(0),
+    no_stats(0),
     cache_query_bytes(0),
-    cache_clean_bytes(0),
-    is_locked(false)
+    cache_clean_bytes(0)
   {
   }
   void set_list_count()
   {
+    total_count= 0;
     int lt;
     for (lt= 0; lt < Ndb_index_stat::LT_Count; lt++)
     {
       const Ndb_index_stat_list &list= ndb_index_stat_list[lt];
       list_count[lt]= list.count;
+      total_count++;
     }
   }
-  void lock()
+  void set_status_variables()
   {
-    pthread_mutex_lock(&ndb_index_stat_glob_mutex);
-    assert(!is_locked);
-    is_locked= true;
-  }
-  void unlock()
-  {
-    assert(is_locked);
     g_ndb_status_index_stat_cache_query= cache_query_bytes;
     g_ndb_status_index_stat_cache_clean= cache_clean_bytes;
-    is_locked= false;
-    pthread_mutex_unlock(&ndb_index_stat_glob_mutex);
   }
 };
 
@@ -554,6 +548,7 @@ Ndb_index_stat::Ndb_index_stat()
   check_time= 0;
   cache_clean= false;
   force_update= 0;
+  no_stats= false;
   error_time= 0;
   error_count= 0;
   share_next= 0;
@@ -567,7 +562,6 @@ Ndb_index_stat::Ndb_index_stat()
 void
 ndb_index_stat_error(Ndb_index_stat *st, const char* place, int line)
 {
-  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
   time_t now= ndb_index_stat_time();
   NdbIndexStat::Error error= st->is->getNdbError();
   if (error.code == 0)
@@ -581,13 +575,18 @@ ndb_index_stat_error(Ndb_index_stat *st,
   st->error= error;
   st->error_time= now;
   st->error_count++;
-  pthread_cond_broadcast(&ndb_index_stat_stat_cond);
-  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
 
   DBUG_PRINT("index_stat", ("%s line %d: error %d line %d extra %d",
                             place, line, error.code, error.line, error.extra));
 }
 
+void
+ndb_index_stat_clear_error(Ndb_index_stat *st)
+{
+  st->error.code= 0;
+  st->error.status= NdbError::Success;
+}
+
 /* Lists across shares */
 
 Ndb_index_stat_list::Ndb_index_stat_list(int the_lt, const char* the_name)
@@ -611,9 +610,8 @@ Ndb_index_stat_list ndb_index_stat_list[
 };
 
 void
-ndb_index_stat_list_add(Ndb_index_stat* st, int lt, int place= +1)
+ndb_index_stat_list_add(Ndb_index_stat* st, int lt)
 {
-  Ndb_index_stat_glob &glob= ndb_index_stat_glob;
   assert(st != 0 && st->lt == 0);
   assert(st->list_next == 0 && st->list_prev == 0);
   assert(1 <= lt && lt < Ndb_index_stat::LT_Count);
@@ -627,13 +625,6 @@ ndb_index_stat_list_add(Ndb_index_stat*
     list.head= st;
     list.tail= st;
   }
-  else if (place < 0)
-  {
-    assert(list.head != 0 && list.head->list_prev == 0);
-    st->list_next= list.head;
-    list.head->list_prev= st;
-    list.head= st;
-  }
   else
   {
     assert(list.tail != 0 && list.tail->list_next == 0);
@@ -642,9 +633,6 @@ ndb_index_stat_list_add(Ndb_index_stat*
     list.tail= st;
   }
   list.count++;
-  glob.lock();
-  glob.total_count++;
-  glob.unlock();
 
   st->lt= lt;
 }
@@ -652,7 +640,6 @@ ndb_index_stat_list_add(Ndb_index_stat*
 void
 ndb_index_stat_list_remove(Ndb_index_stat* st)
 {
-  Ndb_index_stat_glob &glob= ndb_index_stat_glob;
   assert(st != 0);
   int lt= st->lt;
   assert(1 <= lt && lt < Ndb_index_stat::LT_Count);
@@ -669,10 +656,6 @@ ndb_index_stat_list_remove(Ndb_index_sta
     list.tail= prev;
   assert(list.count != 0);
   list.count--;
-  glob.lock();
-  assert(glob.total_count != 0);
-  glob.total_count--;
-  glob.unlock();
 
   if (next != 0)
     next->list_prev= prev;
@@ -686,66 +669,85 @@ ndb_index_stat_list_remove(Ndb_index_sta
 }
 
 void
-ndb_index_stat_list_move(Ndb_index_stat *st, int lt, int place= +1)
+ndb_index_stat_list_move(Ndb_index_stat *st, int lt)
 {
   assert(st != 0);
   ndb_index_stat_list_remove(st);
-  ndb_index_stat_list_add(st, lt, place);
+  ndb_index_stat_list_add(st, lt);
 }
 
-/* Move entry in / out error list */
+/* Stats entry changes (must hold stat_mutex) */
 
 void
-ndb_index_stat_list_to_error(Ndb_index_stat *st)
+ndb_index_stat_force_update(Ndb_index_stat *st, bool onoff)
 {
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-
-  assert(st != 0);
-  const int lt= st->lt; NDB_IGNORE_VALUE(lt);
-  assert(1 <= lt && lt < Ndb_index_stat::LT_Count);
-  assert(lt != Ndb_index_stat::LT_Error);
-
-  if (st->force_update != 0)
+  if (onoff)
   {
-    glob.lock();
+    /* One more request */
+    glob.force_update++;
+    st->force_update++;
+  }
+  else
+  {
+    /* All done */
     assert(glob.force_update >= st->force_update);
     glob.force_update-= st->force_update;
-    glob.unlock();
     st->force_update= 0;
   }
-
-  time_t now= ndb_index_stat_time();
-  st->error_time= now;
-  ndb_index_stat_list_move(st, Ndb_index_stat::LT_Error);
 }
 
 void
-ndb_index_stat_list_from_error(Ndb_index_stat *st)
+ndb_index_stat_no_stats(Ndb_index_stat *st, bool flag)
 {
-  assert(st != 0);
-  assert(st->lt == Ndb_index_stat::LT_Error);
-  if (st->force_update)
-    ndb_index_stat_list_move(st, Ndb_index_stat::LT_Update);
-  else
-    ndb_index_stat_list_move(st, Ndb_index_stat::LT_Read);
-  st->error.code= 0;
-  st->error.status= NdbError::Success;
+  Ndb_index_stat_glob &glob= ndb_index_stat_glob;
+  if (st->no_stats != flag)
+  {
+    if (flag)
+    {
+      glob.no_stats++;
+      st->no_stats= true;
+    }
+    else
+    {
+      assert(glob.no_stats >= 1);
+      glob.no_stats-= 1;
+      st->no_stats= false;
+    }
+  }
 }
 
 /* Find or add entry under the share */
 
 Ndb_index_stat*
-ndb_index_stat_alloc()
+ndb_index_stat_alloc(const NDBINDEX *index,
+                     const NDBTAB *table,
+                     int &err_out)
 {
+  err_out= 0;
   Ndb_index_stat *st= new Ndb_index_stat;
   NdbIndexStat *is= new NdbIndexStat;
   if (st != 0 && is != 0)
   {
     st->is= is;
-    return st;
+    st->index_id= index->getObjectId();
+    st->index_version= index->getObjectVersion();
+#ifndef DBUG_OFF
+    my_snprintf(st->id, sizeof(st->id), "%d.%d", st->index_id, st->index_version);
+#endif
+    if (is->set_index(*index, *table) == 0)
+      return st;
+    ndb_index_stat_error(st, "set_index", __LINE__);
+    err_out= st->error.code;
+  }
+  else
+  {
+    err_out= NdbIndexStat::NoMemError;
   }
-  delete is;
-  delete st;
+  if (is != 0)
+    delete is;
+  if (st != 0)
+    delete st;
   return 0;
 }
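
The two helpers above keep a per-entry field and the matching counter in
Ndb_index_stat_glob moving in lockstep, which is why they carry the
"must hold stat_mutex" contract instead of taking the removed glob lock
themselves. A sketch of the caller-side pattern (whether the broadcast
is needed depends on the call site: proc_read below broadcasts, while
get_share does not):

  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
  ndb_index_stat_force_update(st, true);  /* entry and glob counters move together */
  pthread_cond_broadcast(&ndb_index_stat_stat_cond);  /* only where waiters exist */
  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);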
 
@@ -773,70 +775,65 @@ ndb_index_stat_find_share(NDB_SHARE *sha
 }
 
 /* Subroutine, have lock */
-Ndb_index_stat*
+void
 ndb_index_stat_add_share(NDB_SHARE *share,
-                         const NDBINDEX *index,
-                         const NDBTAB *table,
+                         Ndb_index_stat *st,
                          Ndb_index_stat *st_last)
 {
-  struct Ndb_index_stat *st= ndb_index_stat_alloc();
-  if (st != 0)
-  {
-    st->share= share;
-    if (st_last == 0)
-      share->index_stat_list= st;
-    else
-      st_last->share_next= st;
-    st->index_id= index->getObjectId();
-    st->index_version= index->getObjectVersion();
-#ifndef DBUG_OFF
-    my_snprintf(st->id, sizeof(st->id), "%d.%d", st->index_id, st->index_version);
-#endif
-    if (st->is->set_index(*index, *table) == -1)
-    {
-      ndb_index_stat_error(st, "set_index", __LINE__);
-      /* Caller assigns list */
-    }
-  }
-  return st;
+  st->share= share;
+  if (st_last == 0)
+    share->index_stat_list= st;
+  else
+    st_last->share_next= st;
 }
 
 Ndb_index_stat*
 ndb_index_stat_get_share(NDB_SHARE *share,
                          const NDBINDEX *index,
                          const NDBTAB *table,
+                         int &err_out,
                          bool allow_add,
                          bool force_update)
 {
   pthread_mutex_lock(&share->mutex);
   pthread_mutex_lock(&ndb_index_stat_list_mutex);
+  pthread_mutex_lock(&ndb_index_stat_stat_mutex);
   time_t now= ndb_index_stat_time();
+  err_out= 0;
 
   struct Ndb_index_stat *st= 0;
   struct Ndb_index_stat *st_last= 0;
-  if (ndb_index_stat_allow())
+  do
   {
-    st= ndb_index_stat_find_share(share, index, st_last);
-    if (st == 0 && allow_add)
+    if (unlikely(!ndb_index_stat_allow()))
     {
-      st= ndb_index_stat_add_share(share, index, table, st_last);
-      if (st != 0)
-        ndb_index_stat_list_add(st, Ndb_index_stat::LT_New);
+      err_out= Ndb_index_stat_error_NOT_ALLOW;
+      break;
     }
-    if (st != 0)
+    st= ndb_index_stat_find_share(share, index, st_last);
+    if (st == 0)
     {
-      if (force_update != 0)
+      if (!allow_add)
+      {
+        err_out= Ndb_index_stat_error_NOT_FOUND;
+        break;
+      }
+      st= ndb_index_stat_alloc(index, table, err_out);
+      if (st == 0)
       {
-        st->force_update++;
-        Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-        glob.lock();
-        glob.force_update++;
-        glob.unlock();
+        assert(err_out != 0);
+        break;
       }
-      st->access_time= now;
+      ndb_index_stat_add_share(share, st, st_last);
+      ndb_index_stat_list_add(st, Ndb_index_stat::LT_New);
     }
+    if (force_update)
+      ndb_index_stat_force_update(st, true);
+    st->access_time= now;
   }
+  while (0);
 
+  pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
   pthread_mutex_unlock(&ndb_index_stat_list_mutex);
   pthread_mutex_unlock(&share->mutex);
   return st;
@@ -893,6 +890,39 @@ ndb_index_stat_free(NDB_SHARE *share)
   pthread_mutex_unlock(&ndb_index_stat_list_mutex);
 }
 
+/* Find entry across shares */
+/* wl4124_todo mutex overkill, hash table, can we find table share */
+Ndb_index_stat*
+ndb_index_stat_find_entry(int index_id, int index_version, int table_id)
+{
+  DBUG_ENTER("ndb_index_stat_find_entry");
+  pthread_mutex_lock(&ndbcluster_mutex);
+  pthread_mutex_lock(&ndb_index_stat_list_mutex);
+  DBUG_PRINT("index_stat", ("find index:%d version:%d table:%d",
+                            index_id, index_version, table_id));
+
+  int lt;
+  for (lt=1; lt < Ndb_index_stat::LT_Count; lt++)
+  {
+    Ndb_index_stat *st=ndb_index_stat_list[lt].head;
+    while (st != 0)
+    {
+      if (st->index_id == index_id &&
+          st->index_version == index_version)
+      {
+        pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+        pthread_mutex_unlock(&ndbcluster_mutex);
+        DBUG_RETURN(st);
+      }
+      st= st->list_next;
+    }
+  }
+
+  pthread_mutex_unlock(&ndb_index_stat_list_mutex);
+  pthread_mutex_unlock(&ndbcluster_mutex);
+  DBUG_RETURN(0);
+}
+
 /* Statistics thread sub-routines */
 
 void
@@ -909,12 +939,11 @@ ndb_index_stat_cache_move(Ndb_index_stat
   DBUG_PRINT("index_stat", ("st %s cache move: query:%u clean:%u",
                             st->id, new_query_bytes, old_query_bytes));
   st->is->move_cache();
-  glob.lock();
   assert(glob.cache_query_bytes >= old_query_bytes);
   glob.cache_query_bytes-= old_query_bytes;
   glob.cache_query_bytes+= new_query_bytes;
   glob.cache_clean_bytes+= old_query_bytes;
-  glob.unlock();
+  glob.set_status_variables();
 }
 
 void
@@ -928,20 +957,21 @@ ndb_index_stat_cache_clean(Ndb_index_sta
   DBUG_PRINT("index_stat", ("st %s cache clean: clean:%u",
                             st->id, old_clean_bytes));
   st->is->clean_cache();
-  glob.lock();
   assert(glob.cache_clean_bytes >= old_clean_bytes);
   glob.cache_clean_bytes-= old_clean_bytes;
-  glob.unlock();
+  glob.set_status_variables();
 }
 
 /* Misc in/out parameters for process steps */
 struct Ndb_index_stat_proc {
+  NdbIndexStat* is_util; // For metadata and polling
   Ndb *ndb;
   time_t now;
   int lt;
   bool busy;
   bool end;
   Ndb_index_stat_proc() :
+    is_util(0),
     ndb(0),
     now(0),
     lt(0),
@@ -1021,8 +1051,25 @@ ndb_index_stat_proc_read(Ndb_index_stat_
   NdbIndexStat::Head head;
   if (st->is->read_stat(pr.ndb) == -1)
   {
+    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
     ndb_index_stat_error(st, "read_stat", __LINE__);
-    pr.lt= Ndb_index_stat::LT_Error;
+    const uint force_update= st->force_update;
+    ndb_index_stat_force_update(st, false);
+
+    /* missing stats is not an unexpected error, unless an analyze was done */
+    if (st->is->getNdbError().code == NdbIndexStat::NoIndexStats &&
+        force_update == 0)
+    {
+      ndb_index_stat_no_stats(st, true);
+      pr.lt= Ndb_index_stat::LT_Idle;
+    }
+    else
+    {
+      pr.lt= Ndb_index_stat::LT_Error;
+    }
+
+    pthread_cond_broadcast(&ndb_index_stat_stat_cond);
+    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
     return;
   }
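
The branch above encodes a small policy worth spelling out: NoIndexStats
is the expected outcome for an index that has never been analyzed, but
when the read was forced by an explicit ANALYZE TABLE the stats should
exist, so the same error becomes a real failure. Condensed into a
standalone sketch (a hypothetical helper; LT_Idle/LT_Error are the list
tags used throughout this file):

  static int classify_read_failure(int ndb_error_code, uint force_update)
  {
    if (ndb_error_code == NdbIndexStat::NoIndexStats && force_update == 0)
      return Ndb_index_stat::LT_Idle;   /* stats simply not built yet */
    return Ndb_index_stat::LT_Error;    /* unexpected, or ANALYZE expected stats */
  }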
 
@@ -1033,15 +1080,8 @@ ndb_index_stat_proc_read(Ndb_index_stat_
   st->read_time= pr.now;
   st->sample_version= head.m_sampleVersion;
 
-  if (st->force_update != 0)
-  {
-    Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-    glob.lock();
-    assert(glob.force_update >= st->force_update);
-    glob.force_update-= st->force_update;
-    glob.unlock();
-    st->force_update= 0;
-  }
+  ndb_index_stat_force_update(st, false);
+  ndb_index_stat_no_stats(st, false);
 
   ndb_index_stat_cache_move(st);
   st->cache_clean= false;
@@ -1073,6 +1113,7 @@ ndb_index_stat_proc_read(Ndb_index_stat_
     pr.busy= true;
 }
 
+// wl4124_todo detect force_update faster
 void
 ndb_index_stat_proc_idle(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
 {
@@ -1124,11 +1165,9 @@ ndb_index_stat_proc_idle(Ndb_index_stat_
     st_loop= st_loop->list_next;
     DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
     ndb_index_stat_proc_idle(pr, st);
-    if (pr.lt != lt)
-    {
-      ndb_index_stat_list_move(st, pr.lt);
-      cnt++;
-    }
+    // rotates list if entry remains LT_Idle
+    ndb_index_stat_list_move(st, pr.lt);
+    cnt++;
   }
   if (cnt == batch)
     pr.busy= true;
@@ -1143,7 +1182,16 @@ ndb_index_stat_proc_check(Ndb_index_stat
   if (st->is->read_head(pr.ndb) == -1)
   {
     ndb_index_stat_error(st, "read_head", __LINE__);
-    pr.lt= Ndb_index_stat::LT_Error;
+    /* missing stats is not an unexpected error */
+    if (st->is->getNdbError().code == NdbIndexStat::NoIndexStats)
+    {
+      ndb_index_stat_no_stats(st, true);
+      pr.lt= Ndb_index_stat::LT_Idle;
+    }
+    else
+    {
+      pr.lt= Ndb_index_stat::LT_Error;
+    }
     return;
   }
   st->is->get_head(head);
@@ -1213,9 +1261,7 @@ ndb_index_stat_proc_evict()
 {
   const Ndb_index_stat_opt &opt= ndb_index_stat_opt;
   Ndb_index_stat_glob &glob= ndb_index_stat_glob;
-  glob.lock();
   uint curr_size= glob.cache_query_bytes + glob.cache_clean_bytes;
-  glob.unlock();
   const uint cache_lowpct= opt.get(Ndb_index_stat_opt::Icache_lowpct);
   const uint cache_limit= opt.get(Ndb_index_stat_opt::Icache_limit);
   if (100 * curr_size <= cache_lowpct * cache_limit)
@@ -1330,11 +1376,15 @@ ndb_index_stat_proc_error(Ndb_index_stat
   const int error_delay= opt.get(Ndb_index_stat_opt::Ierror_delay);
   const time_t error_wait= st->error_time + error_delay - pr.now;
 
-  if (error_wait <= 0)
-  {
-    ndb_index_stat_list_from_error(st);
-    DBUG_PRINT("index_stat", ("st %s error wait:%ds error count:%u",
-                              st->id, (int)error_wait, st->error_count));
+  if (error_wait <= 0 ||
+      /* Analyze issued after previous error */
+      st->force_update)
+  {
+    DBUG_PRINT("index_stat", ("st %s error wait:%ds error count:%u"
+                              " force update:%u",
+                              st->id, (int)error_wait, st->error_count,
+                              st->force_update));
+    ndb_index_stat_clear_error(st);
     if (st->force_update)
       pr.lt= Ndb_index_stat::LT_Update;
     else
@@ -1361,16 +1411,82 @@ ndb_index_stat_proc_error(Ndb_index_stat
     st_loop= st_loop->list_next;
     DBUG_PRINT("index_stat", ("st %s proc %s", st->id, list.name));
     ndb_index_stat_proc_error(pr, st);
-    if (pr.lt != lt)
-    {
-      ndb_index_stat_list_move(st, pr.lt);
-      cnt++;
-    }
+    ndb_index_stat_list_move(st, pr.lt);
+    cnt++;
   }
   if (cnt == batch)
     pr.busy= true;
 }
 
+void
+ndb_index_stat_proc_event(Ndb_index_stat_proc &pr, Ndb_index_stat *st)
+{
+  /*
+    Put on Check list if idle.
+    We get event also for our own analyze but this should not matter.
+   */
+  pr.lt= st->lt;
+  if (st->lt == Ndb_index_stat::LT_Idle ||
+      st->lt == Ndb_index_stat::LT_Error)
+    pr.lt= Ndb_index_stat::LT_Check;
+}
+
+void
+ndb_index_stat_proc_event(Ndb_index_stat_proc &pr)
+{
+  NdbIndexStat *is= pr.is_util;
+  Ndb *ndb= pr.ndb;
+  int ret;
+  ret= is->poll_listener(ndb, 0);
+  DBUG_PRINT("index_stat", ("poll_listener ret: %d", ret));
+  if (ret == -1)
+  {
+    // wl4124_todo report error
+    DBUG_ASSERT(false);
+    return;
+  }
+  if (ret == 0)
+    return;
+
+  while (1)
+  {
+    ret= is->next_listener(ndb);
+    DBUG_PRINT("index_stat", ("next_listener ret: %d", ret));
+    if (ret == -1)
+    {
+      // wl4124_todo report error
+      DBUG_ASSERT(false);
+      return;
+    }
+    if (ret == 0)
+      break;
+
+    NdbIndexStat::Head head;
+    is->get_head(head);
+    DBUG_PRINT("index_stat", ("next_listener eventType: %d indexId: %u",
+                              head.m_eventType, head.m_indexId));
+
+    Ndb_index_stat *st= ndb_index_stat_find_entry(head.m_indexId,
+                                                  head.m_indexVersion,
+                                                  head.m_tableId);
+    /*
+      Another process can update stats for an index which is not found
+      in this mysqld.  Ignore it.
+     */
+    if (st != 0)
+    {
+      DBUG_PRINT("index_stat", ("st %s proc %s", st->id, "Event"));
+      ndb_index_stat_proc_event(pr, st);
+      if (pr.lt != st->lt)
+        ndb_index_stat_list_move(st, pr.lt);
+    }
+    else
+    {
+      DBUG_PRINT("index_stat", ("entry not found in this mysqld"));
+    }
+  }
+}
+
 #ifndef DBUG_OFF
 void
 ndb_index_stat_report(const Ndb_index_stat_glob& old_glob)
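
For orientation, the listener protocol driven above reduces to the loop
below. This is a sketch using only the NdbIndexStat calls visible in
this hunk, with the return conventions the surrounding checks imply
(-1 error, 0 nothing more, positive to continue):

  static void drain_index_stat_events(NdbIndexStat *is, Ndb *ndb)
  {
    if (is->poll_listener(ndb, 0) <= 0)   /* -1 error, 0 no event pending */
      return;
    NdbIndexStat::Head head;
    while (is->next_listener(ndb) > 0)    /* one decoded event per call */
    {
      is->get_head(head);
      /* head.m_indexId / m_indexVersion identify the entry to re-check */
    }
  }
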
@@ -1427,15 +1543,11 @@ ndb_index_stat_report(const Ndb_index_st
 
   /* Updates waited for and forced updates */
   {
-    pthread_mutex_lock(&ndb_index_stat_list_mutex);
     uint wait_update= new_glob.wait_update;
     uint force_update= new_glob.force_update;
-    pthread_mutex_unlock(&ndb_index_stat_list_mutex);
-    if (wait_update != 0 || force_update != 0)
-    {
-      DBUG_PRINT("index_stat", ("wait update:%u force update:%u",
-                                wait_update, force_update));
-    }
+    uint no_stats= new_glob.no_stats;
+    DBUG_PRINT("index_stat", ("wait update:%u force update:%u no stats:%u",
+                              wait_update, force_update, no_stats));
   }
 }
 #endif
@@ -1458,6 +1570,7 @@ ndb_index_stat_proc(Ndb_index_stat_proc
   ndb_index_stat_proc_evict(pr);
   ndb_index_stat_proc_delete(pr);
   ndb_index_stat_proc_error(pr);
+  ndb_index_stat_proc_event(pr);
 
 #ifndef DBUG_OFF
   ndb_index_stat_report(old_glob);
@@ -1503,11 +1616,14 @@ ndb_index_stat_end()
 
 /* Index stats thread */
 
-static int
-ndb_index_stat_check_or_create_systables(NdbIndexStat* is, Ndb* ndb)
+int
+ndb_index_stat_check_or_create_systables(Ndb_index_stat_proc &pr)
 {
   DBUG_ENTER("ndb_index_stat_check_or_create_systables");
 
+  NdbIndexStat *is= pr.is_util;
+  Ndb *ndb= pr.ndb;
+
   if (is->check_systables(ndb) == 0)
   {
     DBUG_PRINT("index_stat", ("using existing index stats tables"));
@@ -1520,7 +1636,8 @@ ndb_index_stat_check_or_create_systables
     DBUG_RETURN(0);
   }
 
-  if (is->getNdbError().code == 721)
+  if (is->getNdbError().code == 721 ||
+      is->getNdbError().code == 4244)
   {
     // race between mysqlds, maybe
     DBUG_PRINT("index_stat", ("create index stats tables failed: error %d line %d",
@@ -1533,6 +1650,82 @@ ndb_index_stat_check_or_create_systables
   DBUG_RETURN(-1);
 }
 
+int
+ndb_index_stat_check_or_create_sysevents(Ndb_index_stat_proc &pr)
+{
+  DBUG_ENTER("ndb_index_stat_check_or_create_sysevents");
+
+  NdbIndexStat *is= pr.is_util;
+  Ndb *ndb= pr.ndb;
+
+  if (is->check_sysevents(ndb) == 0)
+  {
+    DBUG_PRINT("index_stat", ("using existing index stats events"));
+    DBUG_RETURN(0);
+  }
+
+  if (is->create_sysevents(ndb) == 0)
+  {
+    DBUG_PRINT("index_stat", ("created index stats events"));
+    DBUG_RETURN(0);
+  }
+
+  if (is->getNdbError().code == 746)
+  {
+    // race between mysqlds, maybe
+    DBUG_PRINT("index_stat", ("create index stats events failed: error %d line %d",
+                              is->getNdbError().code, is->getNdbError().line));
+    DBUG_RETURN(-1);
+  }
+
+  sql_print_warning("create index stats events failed: error %d line %d",
+                    is->getNdbError().code, is->getNdbError().line);
+  DBUG_RETURN(-1);
+}
+
+int
+ndb_index_stat_start_listener(Ndb_index_stat_proc &pr)
+{
+  DBUG_ENTER("ndb_index_stat_start_listener");
+
+  NdbIndexStat *is= pr.is_util;
+  Ndb *ndb= pr.ndb;
+
+  if (is->create_listener(ndb) == -1)
+  {
+    sql_print_warning("create index stats listener failed: error %d line %d",
+                      is->getNdbError().code, is->getNdbError().line);
+    DBUG_RETURN(-1);
+  }
+
+  if (is->execute_listener(ndb) == -1)
+  {
+    sql_print_warning("execute index stats listener failed: error %d line %d",
+                      is->getNdbError().code, is->getNdbError().line);
+    DBUG_RETURN(-1);
+  }
+
+  DBUG_RETURN(0);
+}
+
+int
+ndb_index_stat_stop_listener(Ndb_index_stat_proc &pr)
+{
+  DBUG_ENTER("ndb_index_stat_stop_listener");
+
+  NdbIndexStat *is= pr.is_util;
+  Ndb *ndb= pr.ndb;
+
+  if (is->drop_listener(ndb) == -1)
+  {
+    sql_print_warning("drop index stats listener failed: error %d line %d",
+                      is->getNdbError().code, is->getNdbError().line);
+    DBUG_RETURN(-1);
+  }
+
+  DBUG_RETURN(0);
+}
+
 pthread_handler_t
 ndb_index_stat_thread_func(void *arg __attribute__((unused)))
 {
@@ -1543,6 +1736,11 @@ ndb_index_stat_thread_func(void *arg __a
   my_thread_init();
   DBUG_ENTER("ndb_index_stat_thread_func");
 
+  Ndb_index_stat_proc pr;
+
+  bool have_listener;
+  have_listener= false;
+
   // wl4124_todo remove useless stuff copied from utility thread
  
   pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
@@ -1613,6 +1811,14 @@ ndb_index_stat_thread_func(void *arg __a
   }
   pthread_mutex_unlock(&LOCK_ndb_index_stat_thread);
 
+  /* Get instance used for sys objects check and create */
+  if (!(pr.is_util= new NdbIndexStat))
+  {
+    sql_print_error("Could not allocate NdbIndexStat is_util object");
+    pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
+    goto ndb_index_stat_thread_end;
+  }
+
   /* Get thd_ndb for this thread */
   if (!(thd_ndb= Thd_ndb::seize(thd)))
   {
@@ -1629,6 +1835,7 @@ ndb_index_stat_thread_func(void *arg __a
     pthread_mutex_lock(&LOCK_ndb_index_stat_thread);
     goto ndb_index_stat_thread_end;
   }
+  pr.ndb= thd_ndb->ndb;
 
   ndb_index_stat_allow(1);
   bool enable_ok;
@@ -1653,9 +1860,6 @@ ndb_index_stat_thread_func(void *arg __a
     /* const bool enable_ok_new= THDVAR(NULL, index_stat_enable); */
     const bool enable_ok_new= ndb_index_stat_get_enable(NULL);
 
-    Ndb_index_stat_proc pr;
-    pr.ndb= thd_ndb->ndb;
-
     do
     {
       if (enable_ok != enable_ok_new)
@@ -1665,13 +1869,24 @@ ndb_index_stat_thread_func(void *arg __a
 
         if (enable_ok_new)
         {
-          // at enable check or create stats tables
-          NdbIndexStat is;
-          if (ndb_index_stat_check_or_create_systables(&is, thd_ndb->ndb) == -1)
+          // at enable check or create stats tables and events
+          if (ndb_index_stat_check_or_create_systables(pr) == -1 ||
+              ndb_index_stat_check_or_create_sysevents(pr) == -1 ||
+              ndb_index_stat_start_listener(pr) == -1)
           {
             // try again in next loop
             break;
           }
+          have_listener= true;
+        }
+        else
+        {
+          // not a normal use-case
+          if (have_listener)
+          {
+            if (ndb_index_stat_stop_listener(pr) == 0)
+              have_listener= false;
+          }
         }
         enable_ok= enable_ok_new;
       }
@@ -1702,6 +1917,16 @@ ndb_index_stat_thread_end:
   net_end(&thd->net);
 
 ndb_index_stat_thread_fail:
+  if (have_listener)
+  {
+    if (ndb_index_stat_stop_listener(pr) == 0)
+      have_listener= false;
+  }
+  if (pr.is_util)
+  {
+    delete pr.is_util;
+    pr.is_util= 0;
+  }
   if (thd_ndb)
   {
     Thd_ndb::release(thd_ndb);
@@ -1738,22 +1963,37 @@ ndb_index_stat_round(double x)
 }
 
 int
-ha_ndbcluster::ndb_index_stat_wait(Ndb_index_stat *st,
-                                   uint sample_version)
+ndb_index_stat_wait(Ndb_index_stat *st,
+                    uint sample_version,
+                    bool from_analyze)
 {
-  DBUG_ENTER("ha_ndbcluster::ndb_index_stat_wait");
+  DBUG_ENTER("ndb_index_stat_wait");
 
   pthread_mutex_lock(&ndb_index_stat_stat_mutex);
   int err= 0;
   uint count= 0;
-  (void)count; // USED
   struct timespec abstime;
-  while (true) {
+  while (true)
+  {
     int ret= 0;
-    if (st->error.code != 0 &&
-        (st->error.code != NdbIndexStat::NoIndexStats ||
-         st->force_update == 0))
+    if (count == 0)
+    {
+      if (st->lt == Ndb_index_stat::LT_Error && !from_analyze)
+      {
+        err= Ndb_index_stat_error_HAS_ERROR;
+        break;
+      }
+      ndb_index_stat_clear_error(st);
+    }
+    if (st->no_stats && !from_analyze)
     {
+      /* Missing stats has been detected, now or earlier */
+      err= NdbIndexStat::NoIndexStats;
+      break;
+    }
+    if (st->error.code != 0)
+    {
+      /* A new error has occurred */
       err= st->error.code;
       break;
     }
@@ -1775,7 +2015,8 @@ ha_ndbcluster::ndb_index_stat_wait(Ndb_i
     }
   }
   pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
-  if (err != 0) {
+  if (err != 0)
+  {
     DBUG_PRINT("index_stat", ("st %s wait error: %d",
                                st->id, err));
     DBUG_RETURN(err);
@@ -1806,14 +2047,12 @@ ha_ndbcluster::ndb_index_stat_query(uint
   ib.range_no= 0;
 
   Ndb_index_stat *st=
-    ndb_index_stat_get_share(m_share, index, m_table, true, false);
+    ndb_index_stat_get_share(m_share, index, m_table, err, true, false);
   if (st == 0)
-  {
-    DBUG_PRINT("index_stat", ("failed to add index stat share"));
-    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
-  }
+    DBUG_RETURN(err);
 
-  err= ndb_index_stat_wait(st, 0);
+  /* Pass old version 0 so any existing stats terminate the wait at once */
+  err= ndb_index_stat_wait(st, 0, false);
   if (err != 0)
     DBUG_RETURN(err);
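
The comment above is the whole contract: ndb_index_stat_wait() compares
the sample version the caller already has against the entry's current
one, so passing 0 returns as soon as any sample exists at all. Assuming
that contract, the core of the wait loop looks like this (a simplified
sketch; the real function also handles the no_stats flag, error state
and a timed wait):

  static int wait_for_newer_sample(Ndb_index_stat *st, uint old_version)
  {
    int err= 0;
    pthread_mutex_lock(&ndb_index_stat_stat_mutex);
    while (st->sample_version <= old_version && (err= st->error.code) == 0)
      pthread_cond_wait(&ndb_index_stat_stat_cond, &ndb_index_stat_stat_mutex);
    pthread_mutex_unlock(&ndb_index_stat_stat_mutex);
    return err;
  }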
 
@@ -1927,10 +2166,9 @@ ha_ndbcluster::ndb_index_stat_analyze(Nd
     DBUG_PRINT("index_stat", ("force update: %s", index->getName()));
 
     Ndb_index_stat *st=
-      ndb_index_stat_get_share(m_share, index, m_table, true, true);
-
+      ndb_index_stat_get_share(m_share, index, m_table, err, true, true);
     if (st == 0)
-      DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+      DBUG_RETURN(err);
 
     old[i].sample_version= st->sample_version;
     old[i].error_count= st->error_count;
@@ -1945,11 +2183,11 @@ ha_ndbcluster::ndb_index_stat_analyze(Nd
     DBUG_PRINT("index_stat", ("wait for update: %s", index->getName()));
 
     Ndb_index_stat *st=
-      ndb_index_stat_get_share(m_share, index, m_table, false, false);
+      ndb_index_stat_get_share(m_share, index, m_table, err, false, false);
     if (st == 0)
-      DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
+      DBUG_RETURN(err);
 
-    err= ndb_index_stat_wait(st, old[i].sample_version);
+    err= ndb_index_stat_wait(st, old[i].sample_version, true);
     if (err != 0)
       DBUG_RETURN(err);
   }

=== modified file 'sql/ha_ndb_index_stat.h'
--- a/sql/ha_ndb_index_stat.h	2011-06-15 10:37:56 +0000
+++ b/sql/ha_ndb_index_stat.h	2011-08-11 17:11:30 +0000
@@ -19,16 +19,21 @@
 
 extern struct st_ndb_status g_ndb_status;
 
+extern pthread_mutex_t ndbcluster_mutex;
+
 extern pthread_t ndb_index_stat_thread;
 extern pthread_cond_t COND_ndb_index_stat_thread;
 extern pthread_mutex_t LOCK_ndb_index_stat_thread;
-extern pthread_mutex_t ndb_index_stat_glob_mutex;
+
+/* protect entry lists where needed */
 extern pthread_mutex_t ndb_index_stat_list_mutex;
+
+/* protect and signal changes in stats entries */
 extern pthread_mutex_t ndb_index_stat_stat_mutex;
 extern pthread_cond_t ndb_index_stat_stat_cond;
 
+/* these have to live in ha_ndbcluster.cc */
 extern bool ndb_index_stat_get_enable(THD *thd);
-
 extern long g_ndb_status_index_stat_cache_query;
 extern long g_ndb_status_index_stat_cache_clean;
 
@@ -36,3 +41,14 @@ void
 compute_index_bounds(NdbIndexScanOperation::IndexBound & bound,
                      const KEY *key_info,
                      const key_range *start_key, const key_range *end_key);
+
+/* error codes local to ha_ndb */
+
+/* stats thread is not open for requests (should not happen) */
+#define Ndb_index_stat_error_NOT_ALLOW          9001
+
+/* stats entry for existing index not found (should not happen) */
+#define Ndb_index_stat_error_NOT_FOUND          9002
+
+/* request on stats entry with recent error was ignored */
+#define Ndb_index_stat_error_HAS_ERROR          9003

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2011-07-08 15:05:28 +0000
+++ b/sql/ha_ndbcluster.cc	2011-08-17 10:36:01 +0000
@@ -41,6 +41,7 @@
 #include "ndb_global_schema_lock.h"
 #include "ndb_global_schema_lock_guard.h"
 #include "abstract_query_plan.h"
+#include "ha_ndb_index_stat.h"
 
 #include <mysql/plugin.h>
 #include <ndb_version.h>
@@ -453,7 +454,6 @@ int ndb_index_stat_thread_running= 0;
 pthread_mutex_t LOCK_ndb_index_stat_thread;
 pthread_cond_t COND_ndb_index_stat_thread;
 pthread_cond_t COND_ndb_index_stat_ready;
-pthread_mutex_t ndb_index_stat_glob_mutex;
 pthread_mutex_t ndb_index_stat_list_mutex;
 pthread_mutex_t ndb_index_stat_stat_mutex;
 pthread_cond_t ndb_index_stat_stat_cond;
@@ -1397,6 +1397,7 @@ void ha_ndbcluster::set_rec_per_key()
   */
   for (uint i=0 ; i < table_share->keys ; i++)
   {
+    KEY* key_info= table->key_info + i;
     switch (get_index_type(i))
     {
     case UNIQUE_ORDERED_INDEX:
@@ -1406,7 +1407,6 @@ void ha_ndbcluster::set_rec_per_key()
     {
       // Index is unique when all 'key_parts' are specified,
       // else distribution is unknown and not specified here.
-      KEY* key_info= table->key_info + i;
       key_info->rec_per_key[key_info->key_parts-1]= 1;
       break;
     }
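
Hoisting key_info makes the rec_per_key contract easier to see: slot k
of the array is the optimizer's estimate of matching rows when the
first k+1 key parts are bound, and for a unique index only the
fully-bound case is known to be exactly 1. As a sketch (hypothetical
helper name):

  static void set_unique_index_rpk(KEY *key_info)
  {
    /* all key parts bound => exactly one row */
    key_info->rec_per_key[key_info->key_parts - 1]= 1;
    /* shorter prefixes stay 0 ("unknown") unless index stats fill them */
  }
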
@@ -1420,8 +1420,18 @@ void ha_ndbcluster::set_rec_per_key()
       if (index_stat_enable)
       {
         int err= ndb_index_stat_set_rpk(i);
-        if (err == 0)
-          break;
+        if (err != 0 &&
+            /* missing stats is not an unexpected error */
+            err != NdbIndexStat::NoIndexStats &&
+            /* a warning was printed at the first error */
+            err != Ndb_index_stat_error_HAS_ERROR)
+        {
+          push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+                              ER_CANT_GET_STAT, /* pun? */
+                              "index stats (RPK) for key %s:"
+                              " unexpected error %d",
+                              key_info->name, err);
+        }
       }
       // no fallback method...
       break;
@@ -11743,7 +11753,6 @@ static int ndbcluster_init(void *p)
   pthread_mutex_init(&LOCK_ndb_index_stat_thread, MY_MUTEX_INIT_FAST);
   pthread_cond_init(&COND_ndb_index_stat_thread, NULL);
   pthread_cond_init(&COND_ndb_index_stat_ready, NULL);
-  pthread_mutex_init(&ndb_index_stat_glob_mutex, MY_MUTEX_INIT_FAST);
   pthread_mutex_init(&ndb_index_stat_list_mutex, MY_MUTEX_INIT_FAST);
   pthread_mutex_init(&ndb_index_stat_stat_mutex, MY_MUTEX_INIT_FAST);
   pthread_cond_init(&ndb_index_stat_stat_cond, NULL);
@@ -11858,7 +11867,6 @@ static int ndbcluster_init(void *p)
     pthread_mutex_destroy(&LOCK_ndb_index_stat_thread);
     pthread_cond_destroy(&COND_ndb_index_stat_thread);
     pthread_cond_destroy(&COND_ndb_index_stat_ready);
-    pthread_mutex_destroy(&ndb_index_stat_glob_mutex);
     pthread_mutex_destroy(&ndb_index_stat_list_mutex);
     pthread_mutex_destroy(&ndb_index_stat_stat_mutex);
     pthread_cond_destroy(&ndb_index_stat_stat_cond);
@@ -11879,7 +11887,6 @@ static int ndbcluster_init(void *p)
     pthread_mutex_destroy(&LOCK_ndb_index_stat_thread);
     pthread_cond_destroy(&COND_ndb_index_stat_thread);
     pthread_cond_destroy(&COND_ndb_index_stat_ready);
-    pthread_mutex_destroy(&ndb_index_stat_glob_mutex);
     pthread_mutex_destroy(&ndb_index_stat_list_mutex);
     pthread_mutex_destroy(&ndb_index_stat_stat_mutex);
     pthread_cond_destroy(&ndb_index_stat_stat_cond);
@@ -12145,6 +12152,18 @@ ha_ndbcluster::records_in_range(uint inx
           rows = 2;
         DBUG_RETURN(rows);
       }
+      if (err != 0 &&
+          /* missing stats is not an unexpected error */
+          err != NdbIndexStat::NoIndexStats &&
+          /* a warning was printed at the first error */
+          err != Ndb_index_stat_error_HAS_ERROR)
+      {
+        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+                            ER_CANT_GET_STAT, /* pun? */
+                            "index stats (RIR) for key %s:"
+                            " unexpected error %d",
+                            key_info->name, err);
+      }
       /*fall through*/
     }
 

=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2011-07-08 15:05:28 +0000
+++ b/sql/ha_ndbcluster.h	2011-08-17 10:36:01 +0000
@@ -640,17 +640,15 @@ private:
   void no_uncommitted_rows_reset(THD *);
 
   /* Ordered index statistics v4 */
+  int ndb_index_stat_query(uint inx,
+                           const key_range *min_key,
+                           const key_range *max_key,
+                           NdbIndexStat::Stat& stat);
   int ndb_index_stat_get_rir(uint inx,
                              key_range *min_key,
                              key_range *max_key,
                              ha_rows *rows_out);
   int ndb_index_stat_set_rpk(uint inx);
-  int ndb_index_stat_wait(struct Ndb_index_stat *st,
-                          uint sample_version);
-  int ndb_index_stat_query(uint inx,
-                           const key_range *min_key,
-                           const key_range *max_key,
-                           NdbIndexStat::Stat& stat);
   int ndb_index_stat_analyze(Ndb *ndb,
                              uint *inx_list,
                              uint inx_count);

=== modified file 'sql/sql_select.cc'
--- a/sql/sql_select.cc	2011-07-05 12:46:07 +0000
+++ b/sql/sql_select.cc	2011-08-17 10:36:01 +0000
@@ -7070,7 +7070,15 @@ make_join_readinfo(JOIN *join, ulonglong
          (join->sort_by_table == (TABLE *) 1 && i != join->const_tables)))
       ordered_set= 1;
 
+#ifdef MCP_BUG12798270
     tab->sorted= sorted;
+#else
+    /*
+      For eq_ref there is at most one join match for each row from
+      previous tables so ordering is not useful.
+    */
+    tab->sorted= (tab->type != JT_EQ_REF) ? sorted : false;
+#endif
     sorted= 0;                                  // only first must be sorted
     table->status=STATUS_NO_RECORD;
     pick_table_access_method (tab);
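
The one-line change deserves the argument spelled out: JT_EQ_REF means
the whole unique key is bound by rows from preceding tables, so each
probe returns at most one row, and a one-row stream is trivially
ordered; requesting a sorted index scan there is what tripped the
"!tab->sorted" assertion in join_read_key2(). As a sketch of the
predicate in isolation (hypothetical helper):

  static bool needs_sorted_scan(enum join_type type, bool sorted_wanted)
  {
    if (type == JT_EQ_REF)
      return false;        /* at most one match per outer row */
    return sorted_wanted;  /* ref/range access may still need index order */
  }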

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionFactoryImpl.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionFactoryImpl.java	2011-03-08 00:44:56 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionFactoryImpl.java	2011-08-03 01:00:56 +0000
@@ -313,14 +313,13 @@ public class SessionFactoryImpl implemen
      * @param cls the Class for which to get domain type handler
      * @return the DomainTypeHandler or null if not available
      */
-    @SuppressWarnings( "unchecked" )
     public static <T> DomainTypeHandler<T> getDomainTypeHandler(Class<T> cls) {
-        DomainTypeHandler<T> domainTypeHandler;
         // synchronize here because the map is not synchronized
         synchronized(typeToHandlerMap) {
-            domainTypeHandler = (DomainTypeHandler<T>) typeToHandlerMap.get(cls);
+            @SuppressWarnings( "unchecked" )
+            DomainTypeHandler<T> domainTypeHandler = (DomainTypeHandler<T>) typeToHandlerMap.get(cls);
+            return domainTypeHandler;
         }
-        return domainTypeHandler;
     }
 
     /** Create or get the DomainTypeHandler for a class.
@@ -329,13 +328,13 @@ public class SessionFactoryImpl implemen
      * @param dictionary the dictionary to validate against
      * @return the type handler
      */
-    @SuppressWarnings( "unchecked" )
+    
     public <T> DomainTypeHandler<T> getDomainTypeHandler(Class<T> cls,
             Dictionary dictionary) {
-        DomainTypeHandler<T> domainTypeHandler;
         // synchronize here because the map is not synchronized
         synchronized(typeToHandlerMap) {
-            domainTypeHandler = (DomainTypeHandler<T>) typeToHandlerMap.get(cls);
+            @SuppressWarnings("unchecked")
+            DomainTypeHandler<T> domainTypeHandler = (DomainTypeHandler<T>) typeToHandlerMap.get(cls);
             if (logger.isDetailEnabled()) logger.detail("DomainTypeToHandler for "
                     + cls.getName() + "(" + cls
                     + ") returned " + domainTypeHandler);
@@ -351,8 +350,8 @@ public class SessionFactoryImpl implemen
                     proxyClassToDomainClass.put(proxyClass, cls);
                 }
             }
+            return domainTypeHandler;
         }
-        return domainTypeHandler;
     }
 
     /** Create or get the DomainTypeHandler for an instance.

=== modified file 'storage/ndb/clusterj/clusterj-jdbc/src/main/antlr3/com/mysql/clusterj/jdbc/antlr/MySQL51Lexer.g'
--- a/storage/ndb/clusterj/clusterj-jdbc/src/main/antlr3/com/mysql/clusterj/jdbc/antlr/MySQL51Lexer.g	2011-02-21 11:53:51 +0000
+++ b/storage/ndb/clusterj/clusterj-jdbc/src/main/antlr3/com/mysql/clusterj/jdbc/antlr/MySQL51Lexer.g	2011-08-16 23:47:38 +0000
@@ -117,6 +117,7 @@ IF	:	'IF';
 IGNORE	:	'IGNORE';
 IN	:	'IN';
 INDEX	:	'INDEX';
+INDEX_SYM  :    'INDEX_SYM';
 INFILE	:	'INFILE';
 INNER	:	'INNER';
 INNODB  : 'INNODB';
@@ -819,13 +820,13 @@ STRING
 	:	'N'?			// "introducer" for the national character set (UTF8 for MySQL 4.1 and up). must immediately precede the first quote character.
 		(	'"' 
 			(	('""')=> '""'
-			|	(ESCAPE_SEQUENCE)=> ESCAPE_SEQUENCE
+//			|	(ESCAPE_SEQUENCE)=> ESCAPE_SEQUENCE
 			|	~('"'|'\\')
 			)*
 			'"'	// TODO: collapse two consecutive internal double quotes into one
 		|	'\''
 			(	('\'\'')=> '\'\''
-			|	(ESCAPE_SEQUENCE)=> ESCAPE_SEQUENCE
+//			|	(ESCAPE_SEQUENCE)=> ESCAPE_SEQUENCE
 			|	~('\''|'\\')
 			)*
 			'\''	// TODO: same as above with single quotes
@@ -869,23 +870,25 @@ REAL_ID
 
 // TODO: these are case sensitive -> specifying them as lowercase in the grammar causes them to never be matched (because ANTLR doesn't know
 // we are only serving uppercase letters. Add trueCaseLA predicates here (but beware of hoisting)
-fragment
-ESCAPE_SEQUENCE
-	:	'\\'
-		(	'0'
-		|	'\''
-		|	'"'
-		|	'b'
-		|	'n'		// TODO currently this clashes with \N == NULL. add predicate!
-		|	'r'
-		|	't'
-		|	'Z'		// this is UPPERCASE! -> use ANTLRNoCaseStringStream.trueCaseLA() in predicate to resolve
-		|	'\\'
-		|	'%'
-		|	'_'
-		|	character=.	// TODO: collapse into just $char
-		)
-	;
+// TODO: this rule is broken; it was written to parse Java source files, not compiled strings.
+// The entire rule should be removed...
+//fragment
+//ESCAPE_SEQUENCE
+//	:	'\\'
+//		(	'0'
+//		|	'\''
+//		|	'"'
+//		|	'b'
+//		|	'n'		// TODO currently this clashes with \N == NULL. add predicate!
+//		|	'r'
+//		|	't'
+//		|	'Z'		// this is UPPERCASE! -> use ANTLRNoCaseStringStream.trueCaseLA() in predicate to resolve
+//		|	'\\'
+//		|	'%'
+//		|	'_'
+//              |    character=.     // TODO: collapse into just $char; this might be an error
+//		)
+//	;
 		
 fragment
 DIGIT

=== modified file 'storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPABrokerFactory.java'
--- a/storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPABrokerFactory.java	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPABrokerFactory.java	2011-08-17 10:36:01 +0000
@@ -59,7 +59,7 @@ public class NdbOpenJPABrokerFactory ext
      * Invoked from {@link Bootstrap#getBrokerFactory}.
      */
     public static NdbOpenJPABrokerFactory getInstance(ConfigurationProvider cp) {
-        Map<?, ?> props = cp.getProperties();
+        Map<String, Object> props = cp.getProperties();
         Object key = toPoolKey(props);
         NdbOpenJPABrokerFactory factory = (NdbOpenJPABrokerFactory)
             getPooledFactoryForKey(key);

=== modified file 'storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPAConfigurationImpl.java'
--- a/storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPAConfigurationImpl.java	2011-02-02 09:52:33 +0000
+++ b/storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPAConfigurationImpl.java	2011-08-03 01:02:19 +0000
@@ -30,7 +30,8 @@ import com.mysql.clusterj.core.util.I18N
 import com.mysql.clusterj.core.util.Logger;
 import com.mysql.clusterj.core.util.LoggerFactoryService;
 
-import java.util.Arrays;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -46,8 +47,11 @@ import org.apache.openjpa.lib.conf.Strin
  * Default implementation of the {@link NdbOpenJPAConfiguration} interface.
  * This implementation extends the JDBCConfiguration so both access
  * to MySQLd via JDBC and access to Ndb via ClusterJ are supported.
- *
+ * Type safety: The return type Map for getCacheMarshallerInstances()
+ * from the type OpenJPAConfigurationImpl needs unchecked conversion
+ * to conform to Map<String,CacheMarshaller> from the type OpenJPAConfiguration
  */
+@SuppressWarnings("unchecked")
 public class NdbOpenJPAConfigurationImpl extends JDBCConfigurationImpl
     implements NdbOpenJPAConfiguration, com.mysql.clusterj.Constants, DomainTypeHandlerFactory {
 
@@ -73,6 +77,52 @@ public class NdbOpenJPAConfigurationImpl
             new HashMap<Class<?>, NdbOpenJPADomainTypeHandlerImpl<?>>();
 
     /**
+     * These are to bridge an incompatibility between OpenJPA 1.x and 2.x in the handling of configuration
+     * values. In 1.x, the IntValue.get() method returns int and in 2.x it returns Integer.
+     * Similarly, in 1.x BooleanValue.get() returns boolean and in 2.x it returns Boolean.
+     */
+    static private Method intValueMethod;
+    static private Method booleanValueMethod;
+    static {
+        try {
+            intValueMethod = IntValue.class.getMethod("get", (Class[])null);
+            booleanValueMethod = BooleanValue.class.getMethod("get", (Class[])null);
+        } catch (SecurityException e) {
+            throw new ClusterJFatalInternalException(e);
+        } catch (NoSuchMethodException e) {
+            throw new ClusterJFatalInternalException(e);
+        }
+    }
+
+    /** Return the int value from a configuration IntValue.
+     * 
+     */
+    int getIntValue(IntValue value) {
+        try {
+            return (Integer)intValueMethod.invoke(value);
+        } catch (IllegalArgumentException e) {
+            throw new ClusterJFatalInternalException(e);
+        } catch (IllegalAccessException e) {
+            throw new ClusterJFatalInternalException(e);
+        } catch (InvocationTargetException e) {
+            throw new ClusterJFatalInternalException(e);
+        }
+    }
+    /** Return the boolean value from a configuration BooleanValue.
+     * 
+     */
+    boolean getBooleanValue(BooleanValue value) {
+        try {
+            return (Boolean) booleanValueMethod.invoke(value);
+        } catch (IllegalArgumentException e) {
+            throw new ClusterJFatalInternalException(e);
+        } catch (IllegalAccessException e) {
+            throw new ClusterJFatalInternalException(e);
+        } catch (InvocationTargetException e) {
+            throw new ClusterJFatalInternalException(e);
+        }
+    }
+    /**
      * Default constructor. Attempts to load default properties.
      */
     public NdbOpenJPAConfigurationImpl() {
@@ -143,7 +193,7 @@ public class NdbOpenJPAConfigurationImpl
     }
 
     public int getConnectRetries() {
-        return connectRetries.get();
+        return getIntValue(connectRetries);
     }
 
     public void setConnectRetries(int value) {
@@ -151,7 +201,7 @@ public class NdbOpenJPAConfigurationImpl
     }
 
     public int getConnectDelay() {
-        return connectDelay.get();
+        return getIntValue(connectDelay);
     }
 
     public void setConnectDelay(int value) {
@@ -159,7 +209,7 @@ public class NdbOpenJPAConfigurationImpl
     }
 
     public int getConnectVerbose() {
-        return connectVerbose.get();
+        return getIntValue(connectVerbose);
     }
 
     public void setConnectVerbose(int value) {
@@ -167,7 +217,7 @@ public class NdbOpenJPAConfigurationImpl
     }
 
     public int getConnectTimeoutBefore() {
-        return connectTimeoutBefore.get();
+        return getIntValue(connectTimeoutBefore);
     }
 
     public void setConnectTimeoutBefore(int value) {
@@ -175,7 +225,7 @@ public class NdbOpenJPAConfigurationImpl
     }
 
     public int getConnectTimeoutAfter() {
-        return connectTimeoutAfter.get();
+        return getIntValue(connectTimeoutAfter);
     }
 
     public void setConnectTimeoutAfter(int value) {
@@ -207,7 +257,7 @@ public class NdbOpenJPAConfigurationImpl
     }
 
     public int getMaxTransactions() {
-        return maxTransactions.get();
+        return getIntValue(maxTransactions);
     }
 
     public void setMaxTransactions(int value) {
@@ -215,7 +265,7 @@ public class NdbOpenJPAConfigurationImpl
     }
 
     public boolean getFailOnJDBCPath() {
-        return failOnJDBCPath.get();
+        return getBooleanValue(failOnJDBCPath);
     }
 
     public void setFailOnJDBCPath(boolean value) {
@@ -327,7 +377,6 @@ public class NdbOpenJPAConfigurationImpl
      * handler when performing a clusterj query for an openjpa entity. The class
      * must have already been registered via the openjpa clusterj path.
      */
-    @SuppressWarnings("unchecked")
     public <T> DomainTypeHandler<T> createDomainTypeHandler(
             Class<T> domainClass, Dictionary dictionary) {
         DomainTypeHandler<T> result = (DomainTypeHandler<T>) domainTypeHandlerMap.get(domainClass);

=== modified file 'storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainFieldHandlerImpl.java'
--- a/storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainFieldHandlerImpl.java	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainFieldHandlerImpl.java	2011-08-17 10:36:01 +0000
@@ -66,6 +66,7 @@ import org.apache.openjpa.kernel.OpenJPA
 import org.apache.openjpa.meta.JavaTypes;
 import org.apache.openjpa.util.IntId;
 import org.apache.openjpa.util.LongId;
+import org.apache.openjpa.util.ObjectId;
 import org.apache.openjpa.util.OpenJPAId;
 import org.apache.openjpa.util.StringId;
 
@@ -717,7 +718,11 @@ public class NdbOpenJPADomainFieldHandle
     }
 
     protected Object getKeyValue(Object keys) {
-        return getKeyValue(oidField, keys);
+        Object key = keys;
+        if (keys instanceof ObjectId) {
+            key = ((ObjectId)keys).getId();
+        }
+        return getKeyValue(oidField, key);
     }
 
     protected static Object getKeyValue(Field field, Object keys) {
@@ -733,9 +738,12 @@ public class NdbOpenJPADomainFieldHandle
             if (logger.isDetailEnabled()) logger.detail("For field " + fieldName + " keys: " + keys + " value returned is " + result);
             return result;
         } catch (IllegalArgumentException ex) {
-            throw new ClusterJUserException("keys: " + keys);
+            String message = "IllegalArgumentException, field " + field.getDeclaringClass().getName() + ":" + field.getName() + " keys: " + keys;
+            logger.error(message);
+            throw new ClusterJUserException(message, ex);
         } catch (IllegalAccessException ex) {
-            throw new ClusterJUserException("keys: " + keys);
+            String message = "IllegalAccessException, field " + field.getDeclaringClass().getName() + ":" + field.getName() + " keys: " + keys;
+            throw new ClusterJUserException(message, ex);
         }
     }
 

=== modified file 'storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java'
--- a/storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java	2011-02-02 09:52:33 +0000
+++ b/storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java	2011-08-03 01:02:19 +0000
@@ -110,7 +110,7 @@ public class NdbOpenJPADomainTypeHandler
         this.classMapping = classMapping;
         classMapping.resolve(0xffffffff);
         oidClass = classMapping.getObjectIdType();
-        this.describedType = classMapping.getDescribedType();
+        this.describedType = (Class<T>)classMapping.getDescribedType();
         if (classMapping.getPCSuperclass() != null) {
             // persistent subclasses are not supported
             message = local.message("ERR_Subclass", this.describedType);
@@ -370,19 +370,24 @@ public class NdbOpenJPADomainTypeHandler
                 classMapping.getPrimaryKeyFieldMappings();
         int numberOfFields = classMapping.getFieldMappings().length;
         Object[] keyValues = new Object[numberOfFields];
+        boolean nullKeyValue = false;
         if (primaryKeyFieldMappings.length != 1) {
         // for each key field, use the field value accessor to get the
         // value from the Oid and put it into the proper place in keyValues
             for (NdbOpenJPADomainFieldHandlerImpl fmd: primaryKeyFields) {
                 // store the key value from the oid into the keyValues array
                 // this can be improved with a smarter KeyValueHandlerImpl
-                keyValues[fmd.getFieldNumber()] = fmd.getKeyValue(keys);
+                Object keyValue = fmd.getKeyValue(keys);
+                keyValues[fmd.getFieldNumber()] = keyValue;
+                if (keyValue == null) {
+                    nullKeyValue = true;
+                }
             }
         } else {
             keyValues[primaryKeyFieldMappings[0].getIndex()] = keys;
         }
         KeyValueHandlerImpl keyHandler = new KeyValueHandlerImpl(keyValues);
-        return keyHandler;
+        return nullKeyValue?null:keyHandler;
     }
 
     public ValueHandler getValueHandler(Object instance) {

=== modified file 'storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPAStoreManager.java'
--- a/storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPAStoreManager.java	2011-02-02 09:52:33 +0000
+++ b/storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPAStoreManager.java	2011-08-03 01:02:19 +0000
@@ -132,7 +132,14 @@ public class NdbOpenJPAStoreManager exte
         JDBCFetchConfiguration fetch) {
         if (logger.isDebugEnabled()) {
             logger.debug("NdbStoreManager.find(Object oid, ValueMapping vm, "
-                    + "JDBCFetchConfiguration fetch) delegated to super.");
+                    + "JDBCFetchConfiguration fetch) delegated to super with oid " + oid + ".");
+        }
+        // return null if the oid is null (this will be the case if a foreign key element is null)
+        ClassMapping cls = vm.getDeclaredTypeMapping();
+        NdbOpenJPADomainTypeHandlerImpl<?> domainTypeHandler = getDomainTypeHandler(cls);
+        Object handler = domainTypeHandler.createKeyValueHandler(oid);
+        if (handler == null) {
+            return null;
         }
         return super.find(oid, vm, fetch);
     }
@@ -244,8 +251,9 @@ public class NdbOpenJPAStoreManager exte
 //                    domainTypeHandler.createKeyValueHandler(id.getIdObject()));
                 // initialize via OpenJPA protocol
                 // select all columns from table
+                ValueHandler keyValueHandler = domainTypeHandler.createKeyValueHandler(id.getIdObject());
                 ResultData resultData = session.selectUnique(domainTypeHandler,
-                        domainTypeHandler.createKeyValueHandler(id.getIdObject()),
+                        keyValueHandler,
                         null);
                 // create an OpenJPA Result from the ndb result data
                 NdbOpenJPAResult result = new NdbOpenJPAResult(resultData, domainTypeHandler, null);

=== modified file 'storage/ndb/include/ndb_constants.h'
--- a/storage/ndb/include/ndb_constants.h	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/include/ndb_constants.h	2011-08-17 10:36:01 +0000
@@ -121,6 +121,7 @@
 #define NDB_INDEX_STAT_HEAD_TABLE    "ndb_index_stat_head"
 #define NDB_INDEX_STAT_SAMPLE_TABLE  "ndb_index_stat_sample"
 #define NDB_INDEX_STAT_SAMPLE_INDEX1 "ndb_index_stat_sample_x1"
+#define NDB_INDEX_STAT_HEAD_EVENT    "ndb_index_stat_head_event"
 
 #define NDB_INDEX_STAT_PREFIX        "ndb_index_stat"
 

=== modified file 'storage/ndb/include/ndbapi/NdbIndexStat.hpp'
--- a/storage/ndb/include/ndbapi/NdbIndexStat.hpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/include/ndbapi/NdbIndexStat.hpp	2011-08-17 10:36:01 +0000
@@ -101,7 +101,10 @@ public:
     InvalidCache = 4718,
     InternalError = 4719,
     BadSysTables = 4720,  // sys tables partly missing or invalid
-    HaveSysTables = 4244  // create error if all sys tables exist
+    HaveSysTables = 4244, // create error if all sys tables exist
+    NoSysEvents = 4710,
+    BadSysEvents = BadSysTables,
+    HaveSysEvents = 746
   };
 
   /*
@@ -192,6 +195,7 @@ public:
    */
   struct Head {
     Int32 m_found;        // -1 no read done, 0 = no record, 1 = exists
+    Int32 m_eventType;    // if polling, NdbDictionary::Event::TE_INSERT etc
     Uint32 m_indexId;
     Uint32 m_indexVersion;
     Uint32 m_tableId;
@@ -328,15 +332,51 @@ public:
   static void get_rule(const Stat& stat, char* buffer);
 
   /*
-   * Memory allocator for the stats caches.  By default each instance
-   * uses its own malloc-based implementation.
+   * Events (there is 1) for polling.  These are dictionary objects.
+   * Correct sys tables must exist.  Drop ignores non-existing events.
+   */
+  int create_sysevents(Ndb* ndb);
+  int drop_sysevents(Ndb* ndb);
+  int check_sysevents(Ndb* ndb);
+
+  /*
+   * Create listener for stats updates.  Only 1 is allowed.
+   */
+  int create_listener(Ndb* ndb);
+
+  /*
+   * Start listening for events (call NdbEventOperation::execute).
+   */
+  int execute_listener(Ndb* ndb);
+
+  /*
+   * Poll the listener (call Ndb::pollEvents).  Returns 1 if there are
+   * events available and 0 otherwise, or -1 on failure as usual.
+   */
+  int poll_listener(Ndb* ndb, int max_wait_ms);
+
+  /*
+   * Get next available event.  Returns 1 if a new event was returned
+   * and 0 otherwise, or -1 on failure as usual.  Use get_head() to
+   * retrieve event type and data.
+   */
+  int next_listener(Ndb* ndb);
+
+  /*
+   * Drop the listener.
+   */
+  int drop_listener(Ndb* ndb);
+
+  /*
+   * Memory allocator for stats cache data (key and value byte arrays).
+   * The default implementation uses malloc/free.  The memory in use is the
+   * sum of CacheInfo::m_totalBytes from all cache types.
    */
   struct Mem {
     Mem();
     virtual ~Mem();
     virtual void* mem_alloc(UintPtr size) = 0;
     virtual void mem_free(void* ptr) = 0;
-    virtual UintPtr mem_used() const = 0;
   };
 
   /*

=== modified file 'storage/ndb/include/util/NdbPack.hpp'
--- a/storage/ndb/include/util/NdbPack.hpp	2011-05-04 09:44:18 +0000
+++ b/storage/ndb/include/util/NdbPack.hpp	2011-08-09 15:37:45 +0000
@@ -117,7 +117,7 @@ public:
   class Endian {
   public:
     enum Value {
-      Native = 0,
+      Native = 0, // replaced by actual value
       Little = 1,
       Big = 2
     };
@@ -300,6 +300,10 @@ public:
    * Instance of an array of data values.  The values are packed into
    * a byte buffer.  The buffer is also maintained as a single varbinary
    * value if non-zero var bytes (length bytes) is specified.
+   *
+   * Data instances can be received from another source (such as a table
+   * in the database) and may not be native-endian.  Such instances must
+   * first be completed with desc_all() and convert().
    */
   class Data : public DataC {
   public:
@@ -324,7 +328,7 @@ public:
     // convert endian
     int convert(Endian::Value to_endian);
     // create complete instance from buffer contents
-    int desc_all(Uint32 cnt);
+    int desc_all(Uint32 cnt, Endian::Value from_endian);
     // getters
     Uint32 get_max_len() const;
     Uint32 get_max_len4() const;
@@ -351,7 +355,7 @@ public:
     const Uint32 m_varBytes;
     Uint8* m_buf;
     Uint32 m_bufMaxLen;
-    Endian::Value m_endian; // Native until finalize()
+    Endian::Value m_endian;
     // iterator on items added
     Iter m_iter;
   };
@@ -685,7 +689,7 @@ NdbPack::Data::Data(const Spec& spec, bo
 {
   m_buf = 0;
   m_bufMaxLen = 0;
-  m_endian = Endian::Native;
+  m_endian = Endian::get_endian();
 }
 
 inline void
@@ -704,7 +708,7 @@ NdbPack::Data::reset()
   m_cnt = 0;  // in DataC
   const Uint32 bytes = m_varBytes + m_spec.get_nullmask_len(m_allNullable);
   memset(m_buf, 0, bytes);
-  m_endian = Endian::Native;
+  m_endian = Endian::get_endian();
   m_iter.reset();
 }
 
@@ -713,17 +717,14 @@ NdbPack::Data::finalize()
 {
   if (m_varBytes == 0 ||
       finalize_impl() == 0)
-  {
-    m_endian = Endian::get_endian();
     return 0;
-  }
   return -1;
 }
 
 inline int
 NdbPack::Data::convert(Endian::Value to_endian)
 {
-  if (unlikely(to_endian == Endian::Native))
+  if (to_endian == Endian::Native)
     to_endian = Endian::get_endian();
   if (m_endian == to_endian)
     return 0;

=== modified file 'storage/ndb/src/common/util/NdbPack.cpp'
--- a/storage/ndb/src/common/util/NdbPack.cpp	2011-05-09 15:35:25 +0000
+++ b/storage/ndb/src/common/util/NdbPack.cpp	2011-08-09 15:37:45 +0000
@@ -560,8 +560,11 @@ NdbPack::Data::finalize_impl()
 }
 
 int
-NdbPack::Data::desc_all(Uint32 cnt)
+NdbPack::Data::desc_all(Uint32 cnt, Endian::Value from_endian)
 {
+  if (from_endian == NdbPack::Endian::Native)
+    from_endian = NdbPack::Endian::get_endian();
+  m_endian = from_endian;
   assert(m_cnt == 0); // reset() would destroy nullmask
   for (Uint32 i = 0; i < cnt; i++)
   {
@@ -757,7 +760,7 @@ NdbPack::Spec::print(char* buf, Uint32 b
 
 // print DataC
 
-bool g_ndb_pack_print_hex_always = false;
+bool g_ndb_pack_print_hex_always = true;
 
 NdbOut&
 operator<<(NdbOut& out, const NdbPack::DataC& a)
@@ -1836,7 +1839,7 @@ testdesc(const Tdata& tdata)
   Uint8 buf_new[Tspec::MaxBuf];
   data_new.set_buf(buf_new, sizeof(buf_new));
   memcpy(buf_new, buf_old, fullLen);
-  chk2(data_new.desc_all(cnt) == 0, data_new);
+  chk2(data_new.desc_all(cnt, NdbPack::Endian::Native) == 0, data_new);
   chk1(memcmp(buf_new, data.get_full_buf(), data.get_full_len()) == 0);
   chk1(data_new.get_data_len() == data.get_data_len());
   chk1(data_new.get_cnt() == data.get_cnt());
@@ -1903,28 +1906,17 @@ testconvert(const Tdata& tdata)
   require(data.get_cnt() == data_new.get_cnt());
   const Uint32 cnt = tdata.m_cnt;
   Uint32 num_eq;
-  switch (NdbPack::Endian::get_endian()) {
-  case NdbPack::Endian::Little:
-    chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
-    num_eq = ~(Uint32)0;
-    chk1(data.cmp(data_new, cnt, num_eq) == 0);
-    chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
-    chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
-    num_eq = ~(Uint32)0;
-    chk1(data.cmp(data_new, cnt, num_eq) == 0);
-    break;
-  case NdbPack::Endian::Big:
-    chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
-    num_eq = ~(Uint32)0;
-    chk1(data.cmp(data_new, cnt, num_eq) == 0);
-    chk2(data_new.convert(NdbPack::Endian::Little) == 0, data_new);
-    chk2(data_new.convert(NdbPack::Endian::Big) == 0, data_new);
-    num_eq = ~(Uint32)0;
-    chk1(data.cmp(data_new, cnt, num_eq) == 0);
-    break;
-  default:
-    require(false);
-    break;
+  int i;
+  for (i = 0; i < 10; i++) {
+    int k = getrandom(3); // assumes Endian::Value 0,1,2
+    NdbPack::Endian::Value v = (NdbPack::Endian::Value)k;
+    chk2(data_new.convert(v) == 0, data_new);
+    if (v == NdbPack::Endian::Native ||
+        v == NdbPack::Endian::get_endian()) {
+      num_eq = ~(Uint32)0;
+      chk1(data.cmp(data_new, cnt, num_eq) == 0);
+      require(num_eq == cnt);
+    }
   }
 }
 

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp	2011-08-17 10:36:01 +0000
@@ -155,19 +155,26 @@ Dbtux::setNodePref(TuxCtx & ctx, NodeHan
 {
   const Frag& frag = node.m_frag;
   const Index& index = *c_indexPool.getPtr(frag.m_indexId);
-  KeyData prefKey(index.m_keySpec, false, 0);
-  prefKey.set_buf(node.getPref(), index.m_prefBytes);
+  /*
+   * bug#12873640
+   * The node prefix exists if it has a non-zero number of attributes.  It
+   * is then a partial instance of KeyData.  If the prefix does not exist
+   * then set_buf() could overwrite m_pageId1 in the first entry, causing
+   * a random crash in TUP via readKeyAttrs().
+   */
   if (index.m_prefAttrs > 0) {
+    KeyData prefKey(index.m_keySpec, false, 0);
+    prefKey.set_buf(node.getPref(), index.m_prefBytes);
     jam();
     readKeyAttrs(ctx, frag, node.getEnt(0), prefKey, index.m_prefAttrs);
-  }
 #ifdef VM_TRACE
-  if (debugFlags & DebugMaint) {
-    debugOut << "setNodePref: " << node;
-    debugOut << " " << prefKey.print(ctx.c_debugBuffer, DebugBufferBytes);
-    debugOut << endl;
-  }
+    if (debugFlags & DebugMaint) {
+      debugOut << "setNodePref: " << node;
+      debugOut << " " << prefKey.print(ctx.c_debugBuffer, DebugBufferBytes);
+      debugOut << endl;
+    }
 #endif
+  }
 }
 
 // node operations

=== modified file 'storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp'
--- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp	2011-08-16 08:27:14 +0000
@@ -39,6 +39,16 @@ static const char * f_method = "MSms";
 #endif
 #define MAX_CHUNKS 10
 
+#ifdef VM_TRACE
+#ifndef NDBD_RANDOM_START_PAGE
+#define NDBD_RANDOM_START_PAGE
+#endif
+#endif
+
+#ifdef NDBD_RANDOM_START_PAGE
+static Uint32 g_random_start_page_id = 0;
+#endif
+
 /*
  * For multi-threaded ndbd, these calls are used for locking around
  * memory allocation operations.
@@ -224,6 +234,16 @@ Ndbd_mem_manager::Ndbd_mem_manager()
   mt_mem_manager_init();
 }
 
+void*
+Ndbd_mem_manager::get_memroot() const
+{
+#ifdef NDBD_RANDOM_START_PAGE
+  return (void*)(m_base_page - g_random_start_page_id);
+#else
+  return (void*)m_base_page;
+#endif
+}
+
 /**
  *
  * resource 0 has following semantics:
@@ -359,6 +379,29 @@ Ndbd_mem_manager::init(Uint32 *watchCoun
   }
 #endif
 
+#ifdef NDBD_RANDOM_START_PAGE
+  /**
+   * In order to find bad users of page-ids,
+   *   we add a random offset to the page-ids returned.
+   *   However, due to ZONE_LO that offset cannot be very big
+   *   (since at get_page we do not know whether it is a HI or LO page).
+   */
+  Uint32 max_rand_start = ZONE_LO_BOUND - 1;
+  if (max_rand_start > pages)
+  {
+    max_rand_start -= pages;
+    if (max_rand_start > 0x10000)
+      g_random_start_page_id = 0x10000 + (rand() % (max_rand_start - 0x10000));
+    else if (max_rand_start)
+      g_random_start_page_id = rand() % max_rand_start;
+
+    assert(Uint64(pages) + Uint64(g_random_start_page_id) <= 0xFFFFFFFF);
+
+    ndbout_c("using g_random_start_page_id: %u (%.8x)",
+             g_random_start_page_id, g_random_start_page_id);
+  }
+#endif
+
   /**
    * Do malloc
    */
@@ -670,7 +713,7 @@ Ndbd_mem_manager::alloc(AllocZone zone,
       return;
     * pages = save;
   }
-  
+
   alloc_impl(ZONE_LO, ret, pages, min);
 }
 
@@ -870,7 +913,12 @@ Ndbd_mem_manager::alloc_page(Uint32 type
 
       check_resource_limits(m_resource_limit);
       mt_mem_manager_unlock();
+#ifdef NDBD_RANDOM_START_PAGE
+      *i += g_random_start_page_id;
+      return m_base_page + *i - g_random_start_page_id;
+#else
       return m_base_page + *i;
+#endif
     }
   }
   mt_mem_manager_unlock();
@@ -885,7 +933,11 @@ Ndbd_mem_manager::release_page(Uint32 ty
   mt_mem_manager_lock();
   Resource_limit tot = m_resource_limit[0];
   Resource_limit rl = m_resource_limit[idx];
-  
+
+#ifdef NDBD_RANDOM_START_PAGE
+  i -= g_random_start_page_id;
+#endif
+
   Uint32 sub = (rl.m_curr <= rl.m_min) ? 1 : 0; // Over min ?
   release(i, 1);
   m_resource_limit[0].m_curr = tot.m_curr - 1;
@@ -954,10 +1006,16 @@ Ndbd_mem_manager::alloc_pages(Uint32 typ
     m_resource_limit[idx].m_curr = rl.m_curr + req;
     check_resource_limits(m_resource_limit);
     mt_mem_manager_unlock();
+#ifdef NDBD_RANDOM_START_PAGE
+    *i += g_random_start_page_id;
+#endif
     return ;
   }
   mt_mem_manager_unlock();
   * cnt = req;
+#ifdef NDBD_RANDOM_START_PAGE
+  *i += g_random_start_page_id;
+#endif
   return;
 }
 
@@ -969,7 +1027,11 @@ Ndbd_mem_manager::release_pages(Uint32 t
   mt_mem_manager_lock();
   Resource_limit tot = m_resource_limit[0];
   Resource_limit rl = m_resource_limit[idx];
-  
+
+#ifdef NDBD_RANDOM_START_PAGE
+  i -= g_random_start_page_id;
+#endif
+
   release(i, cnt);
 
   Uint32 currnew = rl.m_curr - cnt;

=== modified file 'storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp'
--- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp	2011-08-17 10:36:01 +0000
@@ -68,7 +68,7 @@ public:
 
   bool init(Uint32 *watchCounter, bool allow_alloc_less_than_requested = true);
   void map(Uint32 * watchCounter, bool memlock = false, Uint32 resources[] = 0);
-  void* get_memroot() const { return (void*)m_base_page;}
+  void* get_memroot() const;
   
   void dump() const ;
   

=== modified file 'storage/ndb/src/ndbapi/NdbIndexStat.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStat.cpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStat.cpp	2011-08-17 10:36:01 +0000
@@ -656,6 +656,82 @@ NdbIndexStat::get_rule(const Stat& stat_
   DBUG_VOID_RETURN;
 }
 
+// events and polling
+
+int
+NdbIndexStat::create_sysevents(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::create_sysevents");
+  if (m_impl.create_sysevents(ndb) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::drop_sysevents(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::drop_sysevents");
+  if (m_impl.drop_sysevents(ndb) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::check_sysevents(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::check_sysevents");
+  if (m_impl.check_sysevents(ndb) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::create_listener(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::create_listener");
+  if (m_impl.create_listener(ndb) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::execute_listener(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::execute_listener");
+  if (m_impl.execute_listener(ndb) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
+int
+NdbIndexStat::poll_listener(Ndb* ndb, int max_wait_ms)
+{
+  DBUG_ENTER("NdbIndexStat::poll_listener");
+  int ret = m_impl.poll_listener(ndb, max_wait_ms);
+  if (ret == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(ret);
+}
+
+int
+NdbIndexStat::next_listener(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::next_listener");
+  int ret = m_impl.next_listener(ndb);
+  if (ret == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(ret);
+}
+
+int
+NdbIndexStat::drop_listener(Ndb* ndb)
+{
+  DBUG_ENTER("NdbIndexStat::drop_listener");
+  if (m_impl.drop_listener(ndb) == -1)
+    DBUG_RETURN(-1);
+  DBUG_RETURN(0);
+}
+
 // mem
 
 NdbIndexStat::Mem::Mem()

=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp	2011-08-17 10:36:01 +0000
@@ -22,6 +22,7 @@
 #include <Bitmask.hpp>
 #include <NdbSqlUtil.hpp>
 #include <NdbRecord.hpp>
+#include <NdbEventOperation.hpp>
 #include "NdbIndexStatImpl.hpp"
 
 #undef min
@@ -33,8 +34,8 @@ static const char* const g_headtable_nam
 static const char* const g_sampletable_name = NDB_INDEX_STAT_SAMPLE_TABLE;
 static const char* const g_sampleindex1_name = NDB_INDEX_STAT_SAMPLE_INDEX1;
 
-const int ERR_NoSuchObject[] = { 709, 723, 4243, 0 };
-const int ERR_TupleNotFound[] = { 626, 0 };
+static const int ERR_NoSuchObject[] = { 709, 723, 4243, 0 };
+static const int ERR_TupleNotFound[] = { 626, 0 };
 
 NdbIndexStatImpl::NdbIndexStatImpl(NdbIndexStat& facade) :
   NdbIndexStat(*this),
@@ -45,7 +46,8 @@ NdbIndexStatImpl::NdbIndexStatImpl(NdbIn
   init();
   m_query_mutex = NdbMutex_Create();
   assert(m_query_mutex != 0);
-  m_mem_handler = &g_mem_default_handler;
+  m_eventOp = 0;
+  m_mem_handler = &c_mem_default_handler;
 }
 
 void
@@ -544,10 +546,8 @@ NdbIndexStatImpl::drop_systables(Ndb* nd
 }
 
 int
-NdbIndexStatImpl::check_systables(Ndb* ndb)
+NdbIndexStatImpl::check_systables(Sys& sys)
 {
-  Sys sys(this, ndb);
-
   if (get_systables(sys) == -1)
     return -1;
 
@@ -566,6 +566,17 @@ NdbIndexStatImpl::check_systables(Ndb* n
   return 0;
 }
 
+int
+NdbIndexStatImpl::check_systables(Ndb* ndb)
+{
+  Sys sys(this, ndb);
+  
+  if (check_systables(sys) == -1)
+    return -1;
+
+  return 0;
+}
+
 // operation context
 
 NdbIndexStatImpl::Con::Con(NdbIndexStatImpl* impl, Head& head, Ndb* ndb) :
@@ -725,7 +736,7 @@ NdbIndexStatImpl::set_index(const NdbDic
       }
       NdbPack::Type type (
         icol->getType(),
-        icol->getArrayType() + icol->getSize() * icol->getLength(),
+        icol->getSizeInBytes(),
         icol->getNullable(),
         icol->getCharset() != 0 ? icol->getCharset()->number : 0
       );
@@ -742,7 +753,7 @@ NdbIndexStatImpl::set_index(const NdbDic
     // rir + rpk
     if (m_valueSpec.add(type, m_valueAttrs) == -1)
     {
-      setError(InternalError, __LINE__);
+      setError(InternalError, __LINE__, m_valueSpec.get_error_code());
       return -1;
     }
   }
@@ -781,6 +792,7 @@ void
 NdbIndexStatImpl::init_head(Head& head)
 {
   head.m_found = -1;
+  head.m_eventType = -1;
   head.m_indexId = 0;
   head.m_indexVersion = 0;
   head.m_tableId = 0;
@@ -1153,15 +1165,32 @@ NdbIndexStatImpl::read_next(Con& con)
       setError(con, __LINE__);
     return ret;
   }
-  // create consistent NdbPack::Data instances
-  if (m_keyData.desc_all(m_keyAttrs) == -1)
+
+  /*
+   * Key and value are raw data and little-endian.  Create the complete
+   * NdbPack::Data instance and convert it to native-endian.
+   */
+  const NdbPack::Endian::Value from_endian = NdbPack::Endian::Little;
+  const NdbPack::Endian::Value to_endian = NdbPack::Endian::Native;
+
+  if (m_keyData.desc_all(m_keyAttrs, from_endian) == -1)
   {
-    setError(InternalError, __LINE__);
+    setError(InternalError, __LINE__, m_keyData.get_error_code());
     return -1;
   }
-  if (m_valueData.desc_all(m_valueAttrs) == -1)
+  if (m_keyData.convert(to_endian) == -1)
   {
-    setError(InternalError, __LINE__);
+    setError(InternalError, __LINE__, m_keyData.get_error_code());
+    return -1;
+  }
+  if (m_valueData.desc_all(m_valueAttrs, from_endian) == -1)
+  {
+    setError(InternalError, __LINE__, m_valueData.get_error_code());
+    return -1;
+  }
+  if (m_valueData.convert(to_endian) == -1)
+  {
+    setError(InternalError, __LINE__, m_valueData.get_error_code());
     return -1;
   }
   return 0;
@@ -1183,13 +1212,12 @@ NdbIndexStatImpl::read_commit(Con& con)
 int
 NdbIndexStatImpl::save_start(Con& con)
 {
-  Mem* mem = m_mem_handler;
   if (m_cacheBuild != 0)
   {
     free_cache(m_cacheBuild);
     m_cacheBuild = 0;
   }
-  con.m_cacheBuild = (Cache*)mem->mem_alloc(sizeof(Cache));
+  con.m_cacheBuild = new Cache;
   if (con.m_cacheBuild == 0)
   {
     setError(NoMemError, __LINE__);
@@ -1725,7 +1753,7 @@ NdbIndexStatImpl::free_cache(Cache* c)
   mem->mem_free(c->m_addrArray);
   mem->mem_free(c->m_keyArray);
   mem->mem_free(c->m_valueArray);
-  mem->mem_free(c);
+  delete c;
 }
 
 void
@@ -1838,7 +1866,9 @@ NdbIndexStatImpl::convert_range(Range& r
     Uint32 len_out;
     for (uint i = 0; i < key_count; i++)
     {
-      const NdbRecord::Attr& attr = key_record->columns[i];
+      const uint i2 = key_record->key_indexes[i];
+      require(i2 < key_record->noOfColumns);
+      const NdbRecord::Attr& attr = key_record->columns[i2];
       if (!attr.is_null(key))
       {
         const char* data = key + attr.offset;
@@ -1855,7 +1885,7 @@ NdbIndexStatImpl::convert_range(Range& r
         }
         if (bound.m_data.add(data, &len_out) == -1)
         {
-          setError(InternalError, __LINE__);
+          setError(InternalError, __LINE__, bound.m_data.get_error_code());
           return -1;
         }
       }
@@ -1863,7 +1893,7 @@ NdbIndexStatImpl::convert_range(Range& r
       {
         if (bound.m_data.add_null(&len_out) == -1)
         {
-          setError(InternalError, __LINE__);
+          setError(InternalError, __LINE__, bound.m_data.get_error_code());
           return -1;
         }
       }
@@ -2241,59 +2271,206 @@ NdbIndexStatImpl::query_keycmp(const Cac
   return res;
 }
 
-// mem alloc - default impl
+// events and polling
 
-NdbIndexStatImpl::MemDefault
-NdbIndexStatImpl::g_mem_default_handler;
+int
+NdbIndexStatImpl::create_sysevents(Ndb* ndb)
+{
+  Sys sys(this, ndb);
+  NdbDictionary::Dictionary* const dic = ndb->getDictionary();
 
-NdbIndexStatImpl::MemDefault::MemDefault()
+  if (check_systables(sys) == -1)
+    return -1;
+  const NdbDictionary::Table* tab = sys.m_headtable;
+  require(tab != 0);
+
+  const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
+  NdbDictionary::Event ev(evname, *tab);
+  ev.addTableEvent(NdbDictionary::Event::TE_INSERT);
+  ev.addTableEvent(NdbDictionary::Event::TE_DELETE);
+  ev.addTableEvent(NdbDictionary::Event::TE_UPDATE);
+  for (int i = 0; i < tab->getNoOfColumns(); i++)
+    ev.addEventColumn(i);
+  ev.setReport(NdbDictionary::Event::ER_UPDATED);
+
+  if (dic->createEvent(ev) == -1)
+  {
+    setError(dic->getNdbError().code, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::drop_sysevents(Ndb* ndb)
 {
-  m_used = 0;
+  Sys sys(this, ndb);
+  NdbDictionary::Dictionary* const dic = ndb->getDictionary();
+
+  if (check_systables(sys) == -1)
+    return -1;
+
+  const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
+  if (dic->dropEvent(evname) == -1)
+  {
+    int code = dic->getNdbError().code;
+    if (code != 4710)
+    {
+      setError(dic->getNdbError().code, __LINE__);
+      return -1;
+    }
+  }
+  return 0;
 }
 
-NdbIndexStatImpl::MemDefault::~MemDefault()
+int
+NdbIndexStatImpl::check_sysevents(Ndb* ndb)
 {
-  assert(m_used == 0);
+  Sys sys(this, ndb);
+  NdbDictionary::Dictionary* const dic = ndb->getDictionary();
+
+  if (check_systables(sys) == -1)
+    return -1;
+
+  const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
+  const NdbDictionary::Event* ev = dic->getEvent(evname);
+  if (ev == 0)
+  {
+    setError(dic->getNdbError().code, __LINE__);
+    return -1;
+  }
+  delete ev; // getEvent() creates new instance
+  return 0;
 }
 
-void*
-NdbIndexStatImpl::MemDefault::mem_alloc(UintPtr size)
+int
+NdbIndexStatImpl::create_listener(Ndb* ndb)
 {
-  if (size == 0 || size % 4 != 0)
+  if (m_eventOp != 0)
   {
-    size += 4 - size % 4;
+    setError(UsageError, __LINE__);
+    return -1;
   }
-  Item* item = (Item*)malloc(sizeof(Item) + size);
-  if (item != 0)
+  const char* const evname = NDB_INDEX_STAT_HEAD_EVENT;
+  m_eventOp = ndb->createEventOperation(evname);
+  if (m_eventOp == 0)
   {
-    item->m_magic = MemMagic;
-    item->m_size = size;
-    void* ptr = &item[1];
-    m_used += size;
-    return ptr;
+    setError(ndb->getNdbError().code, __LINE__);
+    return -1;
+  }
+
+  // all columns are non-nullable
+  Head& head = m_facadeHead;
+  if (m_eventOp->getValue("index_id", (char*)&head.m_indexId) == 0 ||
+      m_eventOp->getValue("index_version", (char*)&head.m_indexVersion) == 0 ||
+      m_eventOp->getValue("table_id", (char*)&head.m_tableId) == 0 ||
+      m_eventOp->getValue("frag_count", (char*)&head.m_fragCount) == 0 ||
+      m_eventOp->getValue("value_format", (char*)&head.m_valueFormat) == 0 ||
+      m_eventOp->getValue("sample_version", (char*)&head.m_sampleVersion) == 0 ||
+      m_eventOp->getValue("load_time", (char*)&head.m_loadTime) == 0 ||
+      m_eventOp->getValue("sample_count", (char*)&head.m_sampleCount) == 0 ||
+      m_eventOp->getValue("key_bytes", (char*)&head.m_keyBytes) == 0)
+  {
+    setError(m_eventOp->getNdbError().code, __LINE__);
+    return -1;
+  }
+  // wl4124_todo why this
+  static Head xxx;
+  if (m_eventOp->getPreValue("index_id", (char*)&xxx.m_indexId) == 0 ||
+      m_eventOp->getPreValue("index_version", (char*)&xxx.m_indexVersion) == 0 ||
+      m_eventOp->getPreValue("table_id", (char*)&xxx.m_tableId) == 0 ||
+      m_eventOp->getPreValue("frag_count", (char*)&xxx.m_fragCount) == 0 ||
+      m_eventOp->getPreValue("value_format", (char*)&xxx.m_valueFormat) == 0 ||
+      m_eventOp->getPreValue("sample_version", (char*)&xxx.m_sampleVersion) == 0 ||
+      m_eventOp->getPreValue("load_time", (char*)&xxx.m_loadTime) == 0 ||
+      m_eventOp->getPreValue("sample_count", (char*)&xxx.m_sampleCount) == 0 ||
+      m_eventOp->getPreValue("key_bytes", (char*)&xxx.m_keyBytes) == 0)
+  {
+    setError(m_eventOp->getNdbError().code, __LINE__);
+    return -1;
   }
   return 0;
 }
 
-void
-NdbIndexStatImpl::MemDefault::mem_free(void* ptr)
+int
+NdbIndexStatImpl::execute_listener(Ndb* ndb)
 {
-  if (ptr != 0)
+  if (m_eventOp == 0)
+  {
+    setError(UsageError, __LINE__);
+    return -1;
+  }
+  if (m_eventOp->execute() == -1)
+  {
+    setError(m_eventOp->getNdbError().code, __LINE__);
+    return -1;
+  }
+  return 0;
+}
+
+int
+NdbIndexStatImpl::poll_listener(Ndb* ndb, int max_wait_ms)
+{
+  int ret;
+  if ((ret = ndb->pollEvents(max_wait_ms)) < 0)
+  {
+    setError(ndb->getNdbError().code, __LINE__);
+    return -1;
+  }
+  return (ret == 0 ? 0 : 1);
+}
+
+int
+NdbIndexStatImpl::next_listener(Ndb* ndb)
+{
+  NdbEventOperation* op = ndb->nextEvent();
+  if (op == 0)
+    return 0;
+
+  Head& head = m_facadeHead;
+  head.m_eventType = (int)op->getEventType();
+  return 1;
+}
+
+int
+NdbIndexStatImpl::drop_listener(Ndb* ndb)
+{
+  if (m_eventOp == 0)
   {
-    Item* item = (Item*)ptr - 1;
-    assert(item->m_magic == MemMagic);
-    size_t size = item->m_size;
-    item->m_magic = 0;
-    free(item);
-    assert(m_used >= size);
-    m_used -= size;
+    setError(UsageError, __LINE__);
+    return -1;
+  }
+  if (ndb->dropEventOperation(m_eventOp) != 0)
+  {
+    setError(ndb->getNdbError().code, __LINE__);
+    return -1;
   }
+  m_eventOp = 0;
+  return 0;
 }
 
-UintPtr
-NdbIndexStatImpl::MemDefault::mem_used() const
+// mem alloc - default impl
+
+NdbIndexStatImpl::MemDefault::MemDefault()
+{
+}
+
+NdbIndexStatImpl::MemDefault::~MemDefault()
+{
+}
+
+void*
+NdbIndexStatImpl::MemDefault::mem_alloc(UintPtr size)
+{
+  void* ptr = malloc(size);
+  return ptr;
+}
+
+void
+NdbIndexStatImpl::MemDefault::mem_free(void* ptr)
 {
-  return m_used;
+  if (ptr != 0)
+    free(ptr);
 }
 
 // error

=== modified file 'storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp	2011-08-17 10:36:01 +0000
@@ -29,6 +29,7 @@ class NdbTransaction;
 class NdbIndexScanOperation;
 class NdbRecAttr;
 class NdbOperation;
+class NdbEventOperation;
 
 extern const uint g_ndb_index_stat_head_frm_len;
 extern const uint8 g_ndb_index_stat_head_frm_data[];
@@ -71,6 +72,7 @@ public:
   Cache* m_cacheClean;
   // mutex for query cache switch, memory barrier would do
   NdbMutex* m_query_mutex;
+  NdbEventOperation* m_eventOp;
   Mem* m_mem_handler;
   NdbIndexStat::Error m_error;
 
@@ -98,6 +100,7 @@ public:
   int get_systables(Sys& sys);
   int create_systables(Ndb* ndb);
   int drop_systables(Ndb* ndb);
+  int check_systables(Sys& sys);
   int check_systables(Ndb* ndb);
 
   // operation context
@@ -279,22 +282,25 @@ public:
   void query_search(const Cache&, const Bound&, StatBound&);
   int query_keycmp(const Cache&, const Bound&, uint pos, Uint32& numEq);
 
+  // events and polling
+  int create_sysevents(Ndb* ndb);
+  int drop_sysevents(Ndb* ndb);
+  int check_sysevents(Ndb* ndb);
+  //
+  int create_listener(Ndb* ndb);
+  int execute_listener(Ndb* ndb);
+  int poll_listener(Ndb* ndb, int max_wait_ms);
+  int next_listener(Ndb* ndb);
+  int drop_listener(Ndb* ndb);
+
   // default memory allocator
   struct MemDefault : public Mem {
     virtual void* mem_alloc(UintPtr bytes);
-    virtual void mem_free(void* p);
-    virtual UintPtr mem_used() const;
+    virtual void mem_free(void* ptr);
     MemDefault();
     virtual ~MemDefault();
-  private:
-    enum { MemMagic = 0xf1f2f3f4 };
-    struct Item {
-      Uint32 m_magic;
-      size_t m_size;
-    };
-    size_t m_used;
   };
-  static MemDefault g_mem_default_handler;
+  MemDefault c_mem_default_handler;
 
   // error
   const NdbIndexStat::Error& getNdbError() const;

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-07-04 08:38:03 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-17 10:36:01 +0000
@@ -87,6 +87,32 @@ const bool traceSignals = false;
  * children. That way, results for child operations can be updated correctly
  * when the application iterates over the results of the root scan operation.
  */
+class TupleCorrelation
+{
+public:
+  static const Uint32 wordCount = 1;
+
+  explicit TupleCorrelation()
+  : m_correlation((tupleNotFound<<16) | tupleNotFound)
+  {}
+
+  /** Conversion to/from Uint32 to store/fetch from buffers */
+  explicit TupleCorrelation(Uint32 val)
+  : m_correlation(val)
+  {}
+  Uint32 toUint32() const 
+  { return m_correlation; }
+
+  Uint16 getTupleId() const
+  { return m_correlation & 0xffff;}
+
+  Uint16 getParentTupleId() const
+  { return m_correlation >> 16;}
+
+private:
+  Uint32 m_correlation;
+}; // class TupleCorrelation
+
 class CorrelationData
 {
 public:
@@ -99,18 +125,15 @@ public:
     assert(AttributeHeader(m_corrPart[0]).getAttributeId() 
            == AttributeHeader::CORR_FACTOR64);
     assert(AttributeHeader(m_corrPart[0]).getByteSize() == 2*sizeof(Uint32));
-    assert(getTupleId()<tupleNotFound);
-    assert(getParentTupleId()<tupleNotFound);
+    assert(getTupleCorrelation().getTupleId()<tupleNotFound);
+    assert(getTupleCorrelation().getParentTupleId()<tupleNotFound);
   }
 
   Uint32 getRootReceiverId() const
   { return m_corrPart[2];}
 
-  Uint16 getTupleId() const
-  { return m_corrPart[1] & 0xffff;}
-
-  Uint16 getParentTupleId() const
-  { return m_corrPart[1] >> 16;}
+  const TupleCorrelation getTupleCorrelation() const
+  { return TupleCorrelation(m_corrPart[1]); }
 
 private:
   const Uint32* const m_corrPart;
@@ -148,6 +171,8 @@ public:
 
   explicit NdbRootFragment();
 
+  ~NdbRootFragment();
+
   /**
    * Initialize object.
    * @param query Enclosing query.
@@ -164,6 +189,11 @@ public:
    */
   void reset();
 
+  /**
+   * Prepare for reading another batch of results.
+   */
+  void prepareResultSet();
+
   void incrOutstandingResults(Int32 delta)
   {
     m_outstandingResults += delta;
@@ -174,7 +204,7 @@ public:
     m_outstandingResults = 0;
   }
 
-  void setConfReceived();
+  void setConfReceived(Uint32 tcPtrI);
 
   /** 
    * The root operation will read from a number of fragments of a table.
@@ -198,9 +228,14 @@ public:
    * @param operationNo The id of the operation.
    * @return The result stream for this root fragment.
    */
-  NdbResultStream& getResultStream(Uint32 operationNo) const
-  { return m_query->getQueryOperation(operationNo).getResultStream(m_fragNo); }
-  
+  NdbResultStream& getResultStream(Uint32 operationNo) const;
+
+  NdbResultStream& getResultStream(const NdbQueryOperationImpl& op) const
+  { return getResultStream(op.getQueryOperationDef().getQueryOperationIx()); }
+
+  Uint32 getReceiverId() const;
+  Uint32 getReceiverTcPtrI() const;
+
   /**
    * @return True if there are no more batches to be received for this fragment.
    */
@@ -212,6 +247,19 @@ public:
    */
   bool isEmpty() const;
 
+  /** 
+   * This method is used for marking which streams belonging to this
+   * NdbRootFragment have remaining batches for a sub scan
+   * instantiated from the current batch of its parent operation.
+   */
+  void setRemainingSubScans(Uint32 nodeMask)
+  { 
+    m_remainingScans = nodeMask;
+  }
+
+  /** Release resources after last row has been returned */
+  void postFetchRelease();
+
 private:
   STATIC_CONST( voidFragNo = 0xffffffff);
 
@@ -221,6 +269,9 @@ private:
   /** Number of the root operation fragment.*/
   Uint32 m_fragNo;
 
+  /** For processing results originating from this root fragment (Array of).*/
+  NdbResultStream* m_resultStreams;
+
   /**
    * The number of outstanding TCKEYREF or TRANSID_AI 
    * messages for the fragment. This includes both messages related to the
@@ -239,6 +290,12 @@ private:
    * TCKEYCONF message has been received */
   bool m_confReceived;
 
+  /**
+   * A bitmask of operation id's for which we will receive more
+   * ResultSets in a NEXTREQ.
+   */
+  Uint32 m_remainingScans;
+
   /** 
    * Used for implementing a hash map from root receiver ids to a 
    * NdbRootFragment instance. m_idMapHead is the index of the first
@@ -272,26 +329,19 @@ public:
    * @param operation The operation for which we will receive results.
    * @param rootFragNo 0..n-1 when the root operation reads from n fragments.
    */
-  explicit NdbResultStream(NdbQueryOperationImpl& operation, Uint32 rootFragNo);
+  explicit NdbResultStream(NdbQueryOperationImpl& operation,
+                           NdbRootFragment& rootFrag);
 
   ~NdbResultStream();
 
   /** 
    * Prepare for receiving first results. 
-   * @return possible error code. 
    */
-  int prepare();
+  void prepare();
 
   /** Prepare for receiving next batch of scan results. */
   void reset();
     
-  /**
-   * 0..n-1 if the root operation reads from n fragments. This stream holds data
-   * derived from one of those fragments.
-   */
-  Uint32 getRootFragNo() const
-  { return m_rootFragNo; }
-
   NdbReceiver& getReceiver()
   { return m_receiver; }
 
@@ -301,6 +351,9 @@ public:
   Uint32 getRowCount() const
   { return m_rowCount; }
 
+  char* getRow(Uint32 tupleNo) const
+  { return (m_buffer + (tupleNo*m_rowSize)); }
+
   /**
   * Process an incoming tuple for this stream. Extract parent and own tuple 
    * ids and pass it on to m_receiver.
@@ -308,12 +361,15 @@ public:
    * @param ptr buffer holding tuple.
    * @param len buffer length.
    */
-  void execTRANSID_AI(const Uint32 *ptr, Uint32 len);
+  void execTRANSID_AI(const Uint32 *ptr, Uint32 len,
+                      TupleCorrelation correlation);
 
-  /** A complete batch has been received for a fragment on this NdbResultStream,
-   *  Update whatever required before the appl. are allowed to navigate the result.
+  /**
+   * A complete batch has been received for a fragment on this NdbResultStream.
+   * Update whatever is required before the application is allowed to navigate the result.
+   * @return true if node and all its siblings have returned all rows.
    */ 
-  void handleBatchComplete();
+  bool prepareResultSet(Uint32 remainingScans);
 
   /**
    * Navigate within the current result batch to resp. first and next row.
@@ -333,36 +389,33 @@ public:
   { return m_iterState == Iter_finished; }
 
   /** 
-   * This method is
-   * used for marking a stream as holding the last batch of a sub scan. 
-   * This means that it is the last batch of the scan that was instantiated 
-   * from the current batch of its parent operation.
-   */
-  void setSubScanCompletion(bool complete)
-  { 
-    // Lookups should always be 'complete'
-    assert(complete || m_operation.getQueryOperationDef().isScanOperation());
-    m_subScanComplete = complete; 
-  }
-
-  /** 
    * This method 
-   * returns true if this result stream holds the last batch of a sub scan
+   * returns true if this result stream holds the last batch of a sub scan.
    * This means that it is the last batch of the scan that was instantiated 
    * from the current batch of its parent operation.
    */
-  bool isSubScanComplete() const
+  bool isSubScanComplete(Uint32 remainingScans) const
   { 
-    // Lookups should always be 'complete'
-    assert(m_subScanComplete || m_operation.getQueryOperationDef().isScanOperation());
-    return m_subScanComplete; 
+    /**
+     * Find the node number seen by the SPJ block. Since a unique index
+     * operation will have two distincts nodes in the tree used by the
+     * SPJ block, this number may be different from 'opNo'.
+     */
+    const Uint32 internalOpNo = m_operation.getQueryOperationDef().getQueryOperationId();
+
+    const bool complete = !((remainingScans >> internalOpNo) & 1);
+    assert(complete || isScanResult());    // Lookups should always be 'complete'
+    return complete; 
   }
 
-  /** Variant of isSubScanComplete() above which checks that this resultstream
-   * and all its descendants have consumed all batches of rows instantiated 
-   * from their parent operation(s).
-   */
-  bool isAllSubScansComplete() const;
+  bool isScanQuery() const
+  { return (m_properties & Is_Scan_Query); }
+
+  bool isScanResult() const
+  { return (m_properties & Is_Scan_Result); }
+
+  bool isInnerJoin() const
+  { return (m_properties & Is_Inner_Join); }
 
   /** For debugging.*/
   friend NdbOut& operator<<(NdbOut& out, const NdbResultStream&);
@@ -405,7 +458,7 @@ public:
      * that had no matching children.*/
     Bitmask<(NDB_SPJ_MAX_TREE_NODES+31)/32> m_hasMatchingChild;
 
-    explicit TupleSet()
+    explicit TupleSet() : m_hash_head(tupleNotFound)
     {}
 
   private:
@@ -415,22 +468,39 @@ public:
   };
 
 private:
-  /** This stream handles results derived from the m_rootFragNo'th 
-   * fragment of the root operation.*/
-  const Uint32 m_rootFragNo;
+  /**
+   * This stream handles results derived from specified 
+   * m_rootFrag of the root operation.
+   */
+  const NdbRootFragment& m_rootFrag;
+ 
+  /** Operation to which this resultStream belong.*/
+  NdbQueryOperationImpl& m_operation;
+
+  /** ResultStream for my parent operation, or NULL if I am root */
+  NdbResultStream* const m_parent;
+
+  const enum properties
+  {
+    Is_Scan_Query = 0x01,
+    Is_Scan_Result = 0x02,
+    Is_Inner_Join = 0x10
+  } m_properties;
 
   /** The receiver object that unpacks transid_AI messages.*/
   NdbReceiver m_receiver;
 
-  /** Max #rows which this stream may recieve in its buffer structures */
-  Uint32 m_maxRows;
+  /** The buffers which we receive the results into */
+  char* m_buffer;
+
+  /** Used for checking if buffer overrun occurred. */
+  Uint32* m_batchOverflowCheck;
+
+  Uint32 m_rowSize;
 
   /** The number of transid_AI messages received.*/
   Uint32 m_rowCount;
 
-  /** Operation to which this resultStream belong.*/
-  NdbQueryOperationImpl& m_operation;
-
   /** This is the state of the iterator used by firstResult(), nextResult().*/
   enum
   {
@@ -442,24 +512,19 @@ private:
     Iter_finished
   } m_iterState;
 
-  /** Tuple id of the current tuple, or 'tupleNotFound' if Iter_notStarted or Iter_finished. */
+  /**
+   * Tuple id of the current tuple, or 'tupleNotFound'
+   * if Iter_notStarted or Iter_finished. 
+   */
   Uint16 m_currentRow;
   
-  /** 
-   * This field is only used for result streams of scan operations. If set,
-   * it indicates that the stream is holding the last batch of a sub scan. 
-   * This means that it is the last batch of the scan that was instantiated 
-   * from the current batch of its parent operation.
-   */
-  bool m_subScanComplete;
+  /** Max #rows which this stream may receive in its TupleSet structures */
+  Uint32 m_maxRows;
 
+  /** TupleSet contains the correlation between parent/childs */
   TupleSet* m_tupleSet;
 
-  void clearTupleSet();
-
-  void setParentChildMap(Uint16 parentId,
-                         Uint16 tupleId, 
-                         Uint16 tupleNo);
+  void buildResultCorrelations();
 
   Uint16 getTupleId(Uint16 tupleNo) const
   { return (m_tupleSet) ? m_tupleSet[tupleNo].m_tupleId : 0; }
@@ -525,15 +590,30 @@ void* NdbBulkAllocator::allocObjMem(Uint
 /////////  NdbResultStream methods ///////////
 //////////////////////////////////////////////
 
-NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation, Uint32 rootFragNo):
-  m_rootFragNo(rootFragNo),
+NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation,
+                                 NdbRootFragment& rootFrag)
+:
+  m_rootFrag(rootFrag),
+  m_operation(operation),
+  m_parent(operation.getParentOperation()
+        ? &rootFrag.getResultStream(*operation.getParentOperation())
+        : NULL),
+  m_properties(
+    (enum properties)
+     ((operation.getQueryDef().isScanQuery()
+       ? Is_Scan_Query : 0)
+     | (operation.getQueryOperationDef().isScanOperation()
+       ? Is_Scan_Result : 0)
+     | (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll
+       ? Is_Inner_Join : 0))),
   m_receiver(operation.getQuery().getNdbTransaction().getNdb()),
-  m_maxRows(0),
+  m_buffer(NULL),
+  m_batchOverflowCheck(NULL),
+  m_rowSize(0),
   m_rowCount(0),
-  m_operation(operation),
   m_iterState(Iter_notStarted),
   m_currentRow(tupleNotFound),
-  m_subScanComplete(true),
+  m_maxRows(0),
   m_tupleSet(NULL)
 {};
 
@@ -545,41 +625,58 @@ NdbResultStream::~NdbResultStream()
   }
 }
 
-int  // Return 0 if ok, else errorcode
+void
 NdbResultStream::prepare()
 {
+  const Uint32 rowSize = m_operation.getRowSize();
+  NdbQueryImpl &query = m_operation.getQuery();
+
+  m_rowSize = rowSize;
+
   /* Parent / child correlation is only relevant for scan type queries
    * Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups!
    * Neither is these structures required for operations not having respective
    * child or parent operations.
    */
-  if (m_operation.getQueryDef().isScanQuery())
+  if (isScanQuery())
   {
     m_maxRows  = m_operation.getMaxBatchRows();
     m_tupleSet = 
-      new (m_operation.getQuery().getTupleSetAlloc().allocObjMem(m_maxRows)) 
+      new (query.getTupleSetAlloc().allocObjMem(m_maxRows)) 
       TupleSet[m_maxRows];
-
-    clearTupleSet();
   }
   else
     m_maxRows = 1;
 
-  return 0;
-} //NdbResultStream::prepare
+  const int bufferSize = rowSize * m_maxRows;
+  NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
+  m_buffer = reinterpret_cast<char*>(bufferAlloc.allocObjMem(bufferSize));
+
+  // So that we can test for buffer overrun.
+  m_batchOverflowCheck = 
+    reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
+  *m_batchOverflowCheck = 0xacbd1234;
 
+  m_receiver.init(NdbReceiver::NDB_QUERY_OPERATION, false, &m_operation);
+  m_receiver.do_setup_ndbrecord(
+                          m_operation.getNdbRecord(),
+                          m_maxRows, 
+                          0 /*key_size*/, 
+                          0 /*read_range_no*/, 
+                          rowSize,
+                          m_buffer);
+} //NdbResultStream::prepare
 
 void
 NdbResultStream::reset()
 {
-  assert (m_operation.getQueryDef().isScanQuery());
+  assert (isScanQuery());
 
   // Root scan-operation need a ScanTabConf to complete
   m_rowCount = 0;
   m_iterState = Iter_notStarted;
   m_currentRow = tupleNotFound;
 
-  clearTupleSet();
   m_receiver.prepareSend();
   /**
    * If this stream will get new rows in the next batch, then so will
@@ -589,88 +686,14 @@ NdbResultStream::reset()
        childNo++)
   {
     NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
-    child.getResultStream(getRootFragNo()).reset();
+    m_rootFrag.getResultStream(child).reset();
   }
 } //NdbResultStream::reset
 
-void
-NdbResultStream::clearTupleSet()
-{
-  assert (m_operation.getQueryDef().isScanQuery());
-  for (Uint32 i=0; i<m_maxRows; i++)
-  {
-    m_tupleSet[i].m_parentId = tupleNotFound;
-    m_tupleSet[i].m_tupleId  = tupleNotFound;
-    m_tupleSet[i].m_hash_head = tupleNotFound;
-    m_tupleSet[i].m_skip = false;
-    m_tupleSet[i].m_hasMatchingChild.clear();
-  }
-}
-
-bool
-NdbResultStream::isAllSubScansComplete() const
-{ 
-  // Lookups should always be 'complete'
-  assert(m_subScanComplete || m_operation.getQueryOperationDef().isScanOperation());
-
-  if (!m_subScanComplete)
-    return false;
-
-  for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations(); 
-       childNo++)
-  {
-    const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
-    const NdbResultStream& childStream = child.getResultStream(getRootFragNo());
-    if (!childStream.isAllSubScansComplete())
-      return false;
-  }
-  return true;
-} //NdbResultStream::isAllSubScansComplete
-
-
-void
-NdbResultStream::setParentChildMap(Uint16 parentId,
-                                   Uint16 tupleId, 
-                                   Uint16 tupleNo)
-{
-  assert (m_operation.getQueryDef().isScanQuery());
-  assert (tupleNo < m_maxRows);
-  assert (tupleId != tupleNotFound);
-
-  for (Uint32 i = 0; i < tupleNo; i++)
-  {
-    // Check that tuple id is unique.
-    assert (m_tupleSet[i].m_tupleId != tupleId); 
-  }
-  m_tupleSet[tupleNo].m_parentId = parentId;
-  m_tupleSet[tupleNo].m_tupleId  = tupleId;
-
-  const Uint16 hash = (parentId % m_maxRows);
-  if (parentId == tupleNotFound)
-  {
-    /* Root stream: Insert sequentially in hash_next to make it
-     * possible to use ::findTupleWithParentId() and ::findNextTuple()
-     * to navigate even the root operation.
-     */
-    assert (m_operation.getParentOperation()==NULL);
-    /* Link into m_hash_next in order to let ::findNextTuple() navigate correctly */
-    if (tupleNo==0)
-      m_tupleSet[hash].m_hash_head  = 0;
-    else
-      m_tupleSet[tupleNo-1].m_hash_next  = tupleNo;
-    m_tupleSet[tupleNo].m_hash_next  = tupleNotFound;
-  }
-  else
-  {
-    /* Insert parentId in HashMap */
-    m_tupleSet[tupleNo].m_hash_next = m_tupleSet[hash].m_hash_head;
-    m_tupleSet[hash].m_hash_head  = tupleNo;
-  }
-}
-
 /** Locate, and return 'tupleNo', of first tuple with specified parentId.
 *  parentId == tupleNotFound is used as a special value for iterating results
- *  from the root operation in the order which they was inserted by ::setParentChildMap()
+ *  from the root operation in the order in which they were inserted by 
+ *  ::buildResultCorrelations()
  *
  *  Position of 'currentRow' is *not* updated and should be modified by callee
 *  if it wants to keep the new position.
@@ -678,7 +701,7 @@ NdbResultStream::setParentChildMap(Uint1
 Uint16
 NdbResultStream::findTupleWithParentId(Uint16 parentId) const
 {
-  assert ((parentId==tupleNotFound) == (m_operation.getParentOperation()==NULL));
+  assert ((parentId==tupleNotFound) == (m_parent==NULL));
 
   if (likely(m_rowCount>0))
   {
@@ -736,14 +759,10 @@ NdbResultStream::findNextTuple(Uint16 tu
 Uint16
 NdbResultStream::firstResult()
 {
-  NdbQueryOperationImpl* parent = m_operation.getParentOperation();
-
   Uint16 parentId = tupleNotFound;
-  if (parent!=NULL)
+  if (m_parent!=NULL)
   {
-    const NdbResultStream& parentStream = parent->getResultStream(m_rootFragNo);
-    parentId = parentStream.getCurrentTupleId();
-
+    parentId = m_parent->getCurrentTupleId();
     if (parentId == tupleNotFound)
     {
       m_currentRow = tupleNotFound;
@@ -780,104 +799,195 @@ NdbResultStream::nextResult()
 } //NdbResultStream::nextResult()
 
 
+/**
+ * Callback when a TRANSID_AI signal (receive row) is processed.
+ */
 void
-NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len)
+NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len,
+                                TupleCorrelation correlation)
 {
-  assert(m_iterState == Iter_notStarted);
-  if (m_operation.getQueryDef().isScanQuery())
-  {
-    const CorrelationData correlData(ptr, len);
-
-    assert(m_operation.getRoot().getResultStream(m_rootFragNo)
-           .m_receiver.getId() == correlData.getRootReceiverId());
-
-    m_receiver.execTRANSID_AI(ptr, len - CorrelationData::wordCount);
+  m_receiver.execTRANSID_AI(ptr, len);
+  m_rowCount++;
 
+  if (isScanQuery())
+  {
     /**
-     * Keep correlation data between parent and child tuples.
-     * Since tuples may arrive in any order, we cannot match
-     * parent and child until all tuples (for this batch and 
-     * root fragment) have arrived.
+     * Store TupleCorrelation as a hidden value immediately after the received row
+     * (NdbQueryOperationImpl::getRowSize() has reserved space for it)
      */
-    setParentChildMap(m_operation.getParentOperation()==NULL
-                      ? tupleNotFound
-                      : correlData.getParentTupleId(),
-                      correlData.getTupleId(),
-                      m_rowCount);
+    Uint32* row_recv = reinterpret_cast<Uint32*>(m_receiver.m_record.m_row);
+    row_recv[-1] = correlation.toUint32();
   }
-  else
-  {
-    // Lookup query.
-    m_receiver.execTRANSID_AI(ptr, len);
-  }
-  m_rowCount++;
-  /* Set correct #rows received in the NdbReceiver.
-   */
-  getReceiver().m_result_rows = getRowCount();
 } // NdbResultStream::execTRANSID_AI()
 
-
 /**
- * A fresh batch of results has arrived for this ResultStream (and all its parent / childs)
- * Filter away any result rows which should not be visible (yet) - Either due to incomplete
- * child batches, or the join being an 'inner join'.
- * Set result itterator state to 'before first' resultrow.
+ * Make preparations for another batch of results to be read:
+ *  - Fill in parent/child result correlations in m_tupleSet[]
+ *  - ... or reset m_tupleSet[] if we reuse the previous.
+ *  - Apply inner/outer join filtering to remove non-qualifying 
+ *    rows.
  */
-void 
-NdbResultStream::handleBatchComplete()
+bool 
+NdbResultStream::prepareResultSet(Uint32 remainingScans)
 {
-  for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+  bool isComplete = isSubScanComplete(remainingScans); //Children with more rows
+  assert(isComplete || isScanResult());                //Lookups always 'complete'
+
+  // Set correct #rows received in the NdbReceiver.
+  getReceiver().m_result_rows = getRowCount();
+
+  /**
+   * Prepare NdbResultStream for reading - either the next batch received
+   * from the data nodes, or reuse of the current one.
+   */
+  if (m_tupleSet!=NULL)
   {
-    m_tupleSet[tupleNo].m_skip = false;
+    const bool newResults = (m_iterState!=Iter_finished);
+    if (newResults)
+    {
+      buildResultCorrelations();
+    }
+    else
+    {
+      // Makes all rows in 'TupleSet' available (clear 'm_skip' flag)
+      for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+      {
+        m_tupleSet[tupleNo].m_skip = false;
+      }
+    }
   }
 
+  /**
+   * Recursively iterate all child results depth first. 
+   * Filter away any result rows which should not be visible (yet) - 
+   * either due to incomplete child batches, or the join being an 'inner join'.
+   * Set result iterator state to 'before first' result row.
+   */
   for (Uint32 childNo=0; childNo < m_operation.getNoOfChildOperations(); childNo++)
   {
     const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
-    NdbResultStream& childStream = child.getResultStream(m_rootFragNo);
-    childStream.handleBatchComplete();
+    NdbResultStream& childStream = m_rootFrag.getResultStream(child);
+    const bool allSubScansComplete = childStream.prepareResultSet(remainingScans);
+
+    Uint32 childId = child.getQueryOperationDef().getQueryOperationIx();
 
-    const bool isInnerJoin = child.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll;
-    const bool allSubScansComplete = childStream.isAllSubScansComplete();
+    /* Condition 1) & 2) calc'ed outside loop, see comments further below: */
+    const bool skipNonMatches = !allSubScansComplete ||      // 1)
+                                childStream.isInnerJoin();   // 2)
 
-    for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+    if (m_tupleSet!=NULL)
     {
-      if (!m_tupleSet[tupleNo].m_skip)
+      for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
       {
-        Uint16 tupleId = getTupleId(tupleNo);
-        if (childStream.findTupleWithParentId(tupleId)!=tupleNotFound)
-          m_tupleSet[tupleNo].m_hasMatchingChild.set(childNo);
-
-        /////////////////////////////////
-        //  No child matched for this row. Making parent row visible
-        //  will cause a NULL (outer join) row to be produced.
-        //  Skip NULL row production when:
-        //    1) Some child batches are not complete; they may contain later matches.
-        //    2) A match was found in a previous batch.
-        //    3) Join type is 'inner join', skip as no child are matching.
-        //
-        else if (!allSubScansComplete                                 // 1)
-             ||  m_tupleSet[tupleNo].m_hasMatchingChild.get(childNo)  // 2)
-             ||  isInnerJoin)                                         // 3)
-          m_tupleSet[tupleNo].m_skip = true;
+        if (!m_tupleSet[tupleNo].m_skip)
+        {
+          Uint16 tupleId = getTupleId(tupleNo);
+          if (childStream.findTupleWithParentId(tupleId)!=tupleNotFound)
+            m_tupleSet[tupleNo].m_hasMatchingChild.set(childId);
+
+          /////////////////////////////////
+          //  No child matched for this row. Making parent row visible
+          //  will cause a NULL (outer join) row to be produced.
+          //  Skip NULL row production when:
+          //    1) Some child batches are not complete; they may contain later matches.
+          //    2) Join type is 'inner join', skip as no child are matching.
+          //    3) A match was found in a previous batch.
+          //  Condition 1) & 2) above is precalculated in 'bool skipNonMatches'
+          //
+          else if (skipNonMatches                                       // 1 & 2)
+               ||  m_tupleSet[tupleNo].m_hasMatchingChild.get(childId)) // 3)
+            m_tupleSet[tupleNo].m_skip = true;
+        }
       }
     }
+    isComplete &= allSubScansComplete;
   }
-  m_currentRow = tupleNotFound;
+
+  // Set current position 'before first'
   m_iterState = Iter_notStarted;
-} // NdbResultStream::handleBatchComplete()
+  m_currentRow = tupleNotFound;
+
+  return isComplete; 
+} // NdbResultStream::prepareResultSet()
+
+
+/**
+ * Fill m_tupleSet[] with correlation data between parent 
+ * and child tuples. The 'TupleCorrelation' is stored as
+ * an extra Uint32 after each row received
+ * by execTRANSID_AI().
+ *
+ * NOTE: In order to reduce work done when holding the 
+ * transporter mutex, the 'TupleCorrelation' is only stored
+ * in the buffer when it arrives. Later (here) we build the
+ * correlation hashMap immediately before we prepare to 
+ * read the NdbResultStream.
+ */
+void 
+NdbResultStream::buildResultCorrelations()
+{
+  // Buffer overrun check.
+  assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck==0xacbd1234);
+
+  // Only called when a TupleSet exists (see ::prepareResultSet())
+  {
+    /* Clear the hashmap structures */
+    for (Uint32 i=0; i<m_maxRows; i++)
+    {
+      m_tupleSet[i].m_hash_head = tupleNotFound;
+    }
+
+    /* Rebuild correlation & hashmap from received buffers */
+    for (Uint32 tupleNo=0; tupleNo<m_rowCount; tupleNo++)
+    {
+      const Uint32* row = (Uint32*)getRow(tupleNo+1);
+      const TupleCorrelation correlation(row[-1]);
+
+      const Uint16 tupleId  = correlation.getTupleId();
+      const Uint16 parentId = (m_parent!=NULL) 
+                                ? correlation.getParentTupleId()
+                                : tupleNotFound;
+
+      m_tupleSet[tupleNo].m_skip     = false;
+      m_tupleSet[tupleNo].m_parentId = parentId;
+      m_tupleSet[tupleNo].m_tupleId  = tupleId;
+      m_tupleSet[tupleNo].m_hasMatchingChild.clear();
+
+      /* Insert into parentId-hashmap */
+      const Uint16 hash = (parentId % m_maxRows);
+      if (m_parent==NULL)
+      {
+        /* Root stream: Link tuples sequentially through m_hash_next so
+         * that ::findTupleWithParentId() and ::findNextTuple() can be
+         * used to navigate even the root operation.
+         */
+        if (tupleNo==0)
+          m_tupleSet[hash].m_hash_head  = tupleNo;
+        else
+          m_tupleSet[tupleNo-1].m_hash_next  = tupleNo;
+        m_tupleSet[tupleNo].m_hash_next  = tupleNotFound;
+      }
+      else
+      {
+        /* Insert parentId in HashMap */
+        m_tupleSet[tupleNo].m_hash_next = m_tupleSet[hash].m_hash_head;
+        m_tupleSet[hash].m_hash_head  = tupleNo;
+      }
+    }
+  }
+} // NdbResultStream::buildResultCorrelations
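
The m_hash_head/m_hash_next linking above is an intrusive chained hash over a
fixed-size tuple array. The following is a minimal standalone sketch of the
same scheme (hypothetical names, not part of this patch); it models only the
child-stream case where tuples are hashed on parentId:

#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

typedef uint16_t TupleIndex;
static const TupleIndex notFound = 0xffff;

struct Tuple
{
  uint16_t   parentId;
  TupleIndex hashHead;  // head of the chain for bucket 'i'
  TupleIndex hashNext;  // next tuple within the same bucket
};

static void buildIndex(std::vector<Tuple>& tuples)
{
  const size_t maxRows = tuples.size();
  for (size_t i = 0; i < maxRows; i++)
    tuples[i].hashHead = notFound;

  for (size_t no = 0; no < maxRows; no++)
  {
    const size_t bucket = tuples[no].parentId % maxRows;
    tuples[no].hashNext = tuples[bucket].hashHead;  // push-front into chain
    tuples[bucket].hashHead = (TupleIndex)no;
  }
}

static TupleIndex findFirstWithParent(const std::vector<Tuple>& tuples,
                                      uint16_t parentId)
{
  TupleIndex cur = tuples[parentId % tuples.size()].hashHead;
  while (cur != notFound && tuples[cur].parentId != parentId)
    cur = tuples[cur].hashNext;
  return cur;
}

int main()
{
  std::vector<Tuple> t(4);
  t[0].parentId = 7; t[1].parentId = 3; t[2].parentId = 7; t[3].parentId = 0;
  buildIndex(t);
  assert(findFirstWithParent(t, 7) != notFound);
  assert(findFirstWithParent(t, 5) == notFound);
  return 0;
}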
 
 
 ///////////////////////////////////////////
-/////////  NdbRootFragment methods ///////////
+////////  NdbRootFragment methods /////////
 ///////////////////////////////////////////
 void NdbRootFragment::buildReciverIdMap(NdbRootFragment* frags, 
                                         Uint32 noOfFrags)
 {
   for(Uint32 fragNo = 0; fragNo < noOfFrags; fragNo++)
   {
-    const Uint32 receiverId = 
-      frags[fragNo].getResultStream(0).getReceiver().getId();
+    const Uint32 receiverId = frags[fragNo].getReceiverId();
     /** 
     * For reasons unknown, NdbObjectIdMap shifts ids two bits to the left,
      * so we must do the opposite to get a good hash distribution.
@@ -890,6 +1000,7 @@ void NdbRootFragment::buildReciverIdMap(
   } 
 }
 
+//static
 NdbRootFragment* 
 NdbRootFragment::receiverIdLookup(NdbRootFragment* frags, 
                                   Uint32 noOfFrags, 
@@ -903,9 +1014,7 @@ NdbRootFragment::receiverIdLookup(NdbRoo
   const int hash = (receiverId >> 2) % noOfFrags;
   int current = frags[hash].m_idMapHead;
   assert(current < static_cast<int>(noOfFrags));
-  while (current >= 0 && 
-         frags[current].getResultStream(0).getReceiver().getId() 
-         != receiverId)
+  while (current >= 0 && frags[current].getReceiverId() != receiverId)
   {
     current = frags[current].m_idMapNext;
     assert(current < static_cast<int>(noOfFrags));
@@ -924,18 +1033,65 @@ NdbRootFragment::receiverIdLookup(NdbRoo
 NdbRootFragment::NdbRootFragment():
   m_query(NULL),
   m_fragNo(voidFragNo),
+  m_resultStreams(NULL),
   m_outstandingResults(0),
   m_confReceived(false),
+  m_remainingScans(0),
   m_idMapHead(-1),
   m_idMapNext(-1)
 {
 }
 
+NdbRootFragment::~NdbRootFragment()
+{
+  assert(m_resultStreams==NULL);
+}
+
 void NdbRootFragment::init(NdbQueryImpl& query, Uint32 fragNo)
 {
   assert(m_fragNo==voidFragNo);
   m_query = &query;
   m_fragNo = fragNo;
+
+  m_resultStreams = reinterpret_cast<NdbResultStream*>
+     (query.getResultStreamAlloc().allocObjMem(query.getNoOfOperations()));
+  assert(m_resultStreams!=NULL);
+
+  for (unsigned opNo=0; opNo<query.getNoOfOperations(); opNo++) 
+  {
+    NdbQueryOperationImpl& op = query.getQueryOperation(opNo);
+    new (&m_resultStreams[opNo]) NdbResultStream(op,*this);
+    m_resultStreams[opNo].prepare();
+  }
+}
+
+/**
+ * Release what we won't need anymore after the last available row has been
+ * returned from the datanodes.
+ */ 
+void
+NdbRootFragment::postFetchRelease()
+{
+  if (m_resultStreams != NULL)
+  { 
+    for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++) 
+    {
+      m_resultStreams[opNo].~NdbResultStream();
+    }
+  }
+  /**
+   * Don't 'delete' the object as it was in-place constructed from
+   * ResultStreamAlloc'ed memory. Memory is released by
+   * ResultStreamAlloc::reset().
+   */
+  m_resultStreams = NULL;
+}
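
init() and postFetchRelease() above follow the placement-new pattern: the
NdbResultStream objects live in memory owned by a bulk allocator, so teardown
runs the destructors explicitly and never calls 'delete'. A minimal sketch of
that pattern, assuming a hypothetical Arena in place of NdbBulkAllocator:

#include <new>
#include <cstdlib>

struct Stream
{
  explicit Stream(unsigned no) : m_no(no) {}
  ~Stream() {}
  unsigned m_no;
};

struct Arena                       // stand-in for the bulk allocator
{
  void* m_mem;
  explicit Arena(unsigned bytes) : m_mem(std::malloc(bytes)) {}
  void reset() { std::free(m_mem); m_mem = 0; }  // frees all objects at once
};

int main()
{
  const unsigned cnt = 4;
  Arena arena(cnt * sizeof(Stream));
  Stream* streams = static_cast<Stream*>(arena.m_mem);

  for (unsigned i = 0; i < cnt; i++)
    new (&streams[i]) Stream(i);   // in-place construction

  for (unsigned i = 0; i < cnt; i++)
    streams[i].~Stream();          // explicit destruction, no 'delete'

  arena.reset();                   // memory returned by the allocator
  return 0;
}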
+
+NdbResultStream&
+NdbRootFragment::getResultStream(Uint32 operationNo) const
+{
+  assert(m_resultStreams);
+  return m_resultStreams[operationNo];
 }
 
 void NdbRootFragment::reset()
@@ -943,29 +1099,69 @@ void NdbRootFragment::reset()
   assert(m_fragNo!=voidFragNo);
   assert(m_outstandingResults == 0);
   assert(m_confReceived);
+
+  for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++) 
+  {
+    if (!m_resultStreams[opNo].isSubScanComplete(m_remainingScans))
+    {
+      /**
+       * Reset m_resultStreams[] and all its descendants, since all these
+       * streams will get a new set of rows in the next batch.
+       */ 
+      m_resultStreams[opNo].reset();
+    }
+  }
   m_confReceived = false;
 }
 
-void NdbRootFragment::setConfReceived()
+void NdbRootFragment::prepareResultSet()
+{
+  NdbResultStream& rootStream = getResultStream(0);
+  rootStream.prepareResultSet(m_remainingScans);  
+
+  /* Position at the first (sorted?) row available from this fragment.
+   */
+  rootStream.firstResult();
+}
+
+void NdbRootFragment::setConfReceived(Uint32 tcPtrI)
 { 
   /* For a query with a lookup root, there may be more than one TCKEYCONF
      message. For a scan, there should only be one SCAN_TABCONF per root
      fragment. 
   */
-  assert(!m_query->getQueryDef().isScanQuery() || !m_confReceived);
+  assert(!getResultStream(0).isScanQuery() || !m_confReceived);
+  getResultStream(0).getReceiver().m_tcPtrI = tcPtrI;
   m_confReceived = true; 
 }
 
 bool NdbRootFragment::finalBatchReceived() const
 {
-  return getResultStream(0).getReceiver().m_tcPtrI==RNIL;
+  return getReceiverTcPtrI()==RNIL;
 }
 
-bool  NdbRootFragment::isEmpty() const
+bool NdbRootFragment::isEmpty() const
 { 
   return getResultStream(0).isEmpty();
 }
 
+/**
+ * SPJ requests are identified by the receiver-id of the
+ * *root* ResultStream for each RootFragment. Furthermore,
+ * a NEXTREQ uses the tcPtrI saved in this ResultStream to
+ * identify the 'cursor' to restart.
+ *
+ * We provide some convenient accessors for fetching this info.
+ */
+Uint32 NdbRootFragment::getReceiverId() const
+{
+  return getResultStream(0).getReceiver().getId();
+}
+
+Uint32 NdbRootFragment::getReceiverTcPtrI() const
+{
+  return getResultStream(0).getReceiver().m_tcPtrI;
+}
 
 ///////////////////////////////////////////
 /////////  NdbQuery API methods ///////////
@@ -1475,7 +1671,14 @@ NdbQueryImpl::~NdbQueryImpl()
 void
 NdbQueryImpl::postFetchRelease()
 {
-  if (m_operations != NULL) {
+  if (m_rootFrags != NULL)
+  {
+    for (unsigned i=0; i<m_rootFragCount; i++)
+    { m_rootFrags[i].postFetchRelease();
+    }
+  }
+  if (m_operations != NULL)
+  {
     for (unsigned i=0; i<m_countOperations; i++)
     { m_operations[i].postFetchRelease();
     }
@@ -1926,15 +2129,12 @@ NdbQueryImpl::nextRootResult(bool fetchA
      */
     if (fetchAllowed)
     {
-      // Ask for a new batch if we emptied one.
-      NdbRootFragment* emptyFrag = m_applFrags.getEmpty();
-      while (emptyFrag != NULL)
+      // Ask for a new batch if we emptied some.
+      NdbRootFragment** frags;
+      const Uint32 cnt = m_applFrags.getFetchMore(frags);
+      if (cnt > 0 && sendFetchMore(frags, cnt, forceSend) != 0)
       {
-        if (sendFetchMore(*emptyFrag, forceSend) != 0)
-        {
-          return NdbQuery::NextResult_error;
-        }        
-        emptyFrag = m_applFrags.getEmpty();
+        return NdbQuery::NextResult_error;
       }
     }
 
@@ -1986,6 +2186,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
         NdbRootFragment* frag;
         while ((frag=m_fullFrags.pop()) != NULL)
         {
+          frag->prepareResultSet();
           m_applFrags.add(*frag);
         }
         if (m_applFrags.getCurrent() != NULL)
@@ -2040,6 +2241,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
     NdbRootFragment* frag;
     if ((frag=m_fullFrags.pop()) != NULL)
     {
+      frag->prepareResultSet();
       m_applFrags.add(*frag);
     }
     assert(m_fullFrags.pop()==NULL); // Only one stream for lookups.
@@ -2070,15 +2272,15 @@ NdbQueryImpl::awaitMoreResults(bool forc
   returns: 'true' when application thread should be resumed.
 */
 bool 
-NdbQueryImpl::handleBatchComplete(Uint32 fragNo)
+NdbQueryImpl::handleBatchComplete(NdbRootFragment& rootFrag)
 {
   if (traceSignals) {
-    ndbout << "NdbQueryImpl::handleBatchComplete, fragNo=" << fragNo
+    ndbout << "NdbQueryImpl::handleBatchComplete"
+           << ", fragNo=" << rootFrag.getFragNo()
            << ", pendingFrags=" << (m_pendingFrags-1)
            << ", finalBatchFrags=" << m_finalBatchFrags
            <<  endl;
   }
-  bool resume = false;
 
  /* May receive fragment data after a SCANREF() (timeout?)
    * terminated the scan.  We are about to close this query, 
@@ -2086,8 +2288,6 @@ NdbQueryImpl::handleBatchComplete(Uint32
    */
   if (likely(m_fullFrags.m_errorCode == 0))
   {
-    NdbQueryOperationImpl& root = getRoot();
-    NdbRootFragment& rootFrag = m_rootFrags[fragNo];
     assert(rootFrag.isFragBatchComplete());
 
     assert(m_pendingFrags > 0);                // Check against underflow.
@@ -2100,34 +2300,14 @@ NdbQueryImpl::handleBatchComplete(Uint32
       assert(m_finalBatchFrags <= m_rootFragCount);
     }
 
-    if (getQueryDef().isScanQuery())
-    {
-      // Only required for scans
-      root.getResultStream(fragNo).handleBatchComplete();  
-
-      // Only ordered scans has to wait until all pending completed
-      resume = (m_pendingFrags==0) ||
-               (root.m_ordering==NdbQueryOptions::ScanOrdering_unordered);
-    }
-    else
-    {
-      assert(root.m_resultStreams[fragNo]->getReceiver().m_tcPtrI==RNIL);
-      assert(m_finalBatchFrags==1);
-      assert(m_pendingFrags==0);  // Lookup query should be complete now.
-      resume = true;   
-    }
-
-    /* Position at the first (sorted?) row available from this fragments.
-     */
-    root.m_resultStreams[fragNo]->firstResult();
-
     /* When application thread ::awaitMoreResults() it will later be moved
      * from m_fullFrags to m_applFrags under mutex protection.
      */
     m_fullFrags.push(rootFrag);
+    return true;
   }
 
-  return resume;
+  return false;
 } // NdbQueryImpl::handleBatchComplete
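
Note that prepareResultSet() is no longer called from here: the receiver
thread only queues the completed fragment, and the application thread does
the heavier correlation work after popping it, outside the transporter mutex.
A simplified sketch of that hand-off, with hypothetical names and std::mutex
standing in for the NDB mutex:

#include <mutex>
#include <queue>

struct Frag { void prepareResultSet() { /* build correlations etc. */ } };

static std::mutex g_mutex;              // stands in for the transporter mutex
static std::queue<Frag*> g_fullFrags;   // stands in for m_fullFrags

void onFragBatchComplete(Frag* frag)    // receiver (signal) thread
{
  std::lock_guard<std::mutex> guard(g_mutex);
  g_fullFrags.push(frag);               // cheap: just enqueue
}

Frag* awaitMoreResults()                // application thread
{
  Frag* frag = 0;
  {
    std::lock_guard<std::mutex> guard(g_mutex);
    if (!g_fullFrags.empty())
    {
      frag = g_fullFrags.front();
      g_fullFrags.pop();
    }
  }
  if (frag != 0)
    frag->prepareResultSet();           // heavy work done without the mutex
  return frag;
}

int main()
{
  Frag f;
  onFragBatchComplete(&f);
  return awaitMoreResults() == &f ? 0 : 1;
}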
 
 int
@@ -2272,22 +2452,22 @@ NdbQueryImpl::execTCKEYCONF()
     ndbout << "NdbQueryImpl::execTCKEYCONF()" << endl;
   }
   assert(!getQueryDef().isScanQuery());
+  NdbRootFragment& rootFrag = m_rootFrags[0];
 
   // We will get 1 + #leaf-nodes TCKEYCONF for a lookup...
-  m_rootFrags[0].setConfReceived();
-  m_rootFrags[0].incrOutstandingResults(-1);
+  rootFrag.setConfReceived(RNIL);
+  rootFrag.incrOutstandingResults(-1);
 
   bool ret = false;
-  if (m_rootFrags[0].isFragBatchComplete())
+  if (rootFrag.isFragBatchComplete())
   { 
-    ret = handleBatchComplete(0);
+    ret = handleBatchComplete(rootFrag);
   }
 
   if (traceSignals) {
     ndbout << "NdbQueryImpl::execTCKEYCONF(): returns:" << ret
            << ", m_pendingFrags=" << m_pendingFrags
-           << ", *getRoot().m_resultStreams[0]=" 
-           << *getRoot().m_resultStreams[0]
+           << ", rootStream= {" << rootFrag.getResultStream(0) << "}"
            << endl;
   }
   return ret;
@@ -2364,9 +2544,7 @@ NdbQueryImpl::prepareSend()
   // Allocate space for ptrs to NdbResultStream and NdbRootFragment objects.
   error = m_pointerAlloc.init(m_rootFragCount * 
                               (SharedFragStack::pointersPerFragment +
-                               OrderedFragSet::pointersPerFragment +
-                               // Pointers to NdbResultStream objects.
-                               getNoOfOperations()));
+                               OrderedFragSet::pointersPerFragment));
   if (error != 0)
   {
     setErrorCode(error);
@@ -2377,21 +2555,22 @@ NdbQueryImpl::prepareSend()
   getRoot().calculateBatchedRows(NULL);
   getRoot().setBatchedRows(1);
 
-  /** Calculate total amount of row buffer space for all operations and
-   * fragments.*/
+  /**
+   * Calculate total amount of row buffer space for all operations and
+   * fragments.
+   */
   Uint32 totalBuffSize = 0;
   for (Uint32 opNo = 0; opNo < getNoOfOperations(); opNo++)
   {
-    NdbQueryOperationImpl& op = getQueryOperation(opNo);
-
-    op.m_bufferSize = op.getRowSize() * op.getMaxBatchRows();
-    totalBuffSize += op.m_bufferSize;
+    const NdbQueryOperationImpl& op = getQueryOperation(opNo);
+    totalBuffSize += (op.getRowSize() * op.getMaxBatchRows());
   }
-  /** Add one word per operation for buffer overrun check. We add a word
+  /**
+   * Add one word per ResultStream for buffer overrun check. We add a word
    * rather than a byte to avoid possible alignment problems.
    */
-  m_rowBufferAlloc.init(totalBuffSize * m_rootFragCount + 
-                        sizeof(Uint32) * getNoOfOperations());
+  m_rowBufferAlloc.init(m_rootFragCount * 
+                       (totalBuffSize + (sizeof(Uint32) * getNoOfOperations())) );
   if (getQueryDef().isScanQuery())
   {
     Uint32 totalRows = 0;
@@ -2406,15 +2585,28 @@ NdbQueryImpl::prepareSend()
       return -1;
     }
   }
-  // 1. Build receiver structures for each QueryOperation.
-  // 2. Fill in parameters (into ATTRINFO) for QueryTree.
-  //    (Has to complete *after* ::prepareReceiver() as QueryTree params
-  //     refer receiver id's.)
-  //
+
+  /**
+   * Allocate and initialize fragment state variables.
+   * Will also cause a ResultStream object containing a 
+   * NdbReceiver to be constructed for each operation in QueryTree
+   */
+  m_rootFrags = new NdbRootFragment[m_rootFragCount];
+  if (m_rootFrags == NULL)
+  {
+    setErrorCode(Err_MemoryAlloc);
+    return -1;
+  }
+  for (Uint32 i = 0; i<m_rootFragCount; i++)
+  {
+    m_rootFrags[i].init(*this, i); // Set fragment number.
+  }
+
+  // Fill in parameters (into ATTRINFO) for QueryTree.
   for (Uint32 i = 0; i < m_countOperations; i++) {
-    int error;
-    if (unlikely((error = m_operations[i].prepareReceiver()) != 0)
-              || (error = m_operations[i].prepareAttrInfo(m_attrInfo)) != 0) {
+    const int error = m_operations[i].prepareAttrInfo(m_attrInfo);
+    if (unlikely(error))
+    {
       setErrorCode(error);
       return -1;
     }
@@ -2450,23 +2642,6 @@ NdbQueryImpl::prepareSend()
     return -1;
   }
 
-  /**
-   * Allocate and initialize fragment state variables.
-   */
-  m_rootFrags = new NdbRootFragment[m_rootFragCount];
-  if(m_rootFrags == NULL)
-  {
-    setErrorCode(Err_MemoryAlloc);
-    return -1;
-  }
-  else
-  {
-    for(Uint32 i = 0; i<m_rootFragCount; i++)
-    {
-      m_rootFrags[i].init(*this, i); // Set fragment number.
-    }
-  }
-
   if (getQueryDef().isScanQuery())
   {
     NdbRootFragment::buildReciverIdMap(m_rootFrags, m_rootFragCount);
@@ -2494,11 +2669,13 @@ class InitialReceiverIdIterator: public
 {
 public:
   
-  InitialReceiverIdIterator(const NdbQueryImpl& query)
-    :m_query(query),
+  InitialReceiverIdIterator(NdbRootFragment rootFrags[],
+                            Uint32 cnt)
+    :m_rootFrags(rootFrags),
+     m_fragCount(cnt),
      m_currFragNo(0)
   {}
-  
+
   virtual ~InitialReceiverIdIterator() {};
   
   /**
@@ -2519,8 +2696,11 @@ private:
    * improving efficiency.
    */
   static const Uint32 bufSize = 16;
-  /** The query with the scan root operation that we list receiver ids for.*/
-  const NdbQueryImpl& m_query;
+
+  /** Set of root fragments which we want to iterate receiver ids for.*/
+  NdbRootFragment* m_rootFrags;
+  const Uint32 m_fragCount;
+
   /** The next fragment number to be processed. (Range from 0 to number of
    * fragments.)*/
   Uint32 m_currFragNo;
@@ -2530,29 +2710,97 @@ private:
 
 const Uint32* InitialReceiverIdIterator::getNextWords(Uint32& sz)
 {
-  sz = 0;
   /**
    * For the initial batch, we want to retrieve one batch for each fragment
    * whether it is a sorted scan or not.
    */
-  if (m_currFragNo >= m_query.getRootFragCount())
+  if (m_currFragNo >= m_fragCount)
   {
+    sz = 0;
     return NULL;
   }
   else
   {
-    const NdbQueryOperationImpl& root = m_query.getQueryOperation(0U);
-    while (sz < bufSize && 
-           m_currFragNo < m_query.getRootFragCount())
+    Uint32 cnt = 0;
+    while (cnt < bufSize && m_currFragNo < m_fragCount)
     {
-      m_receiverIds[sz] = root.getReceiver(m_currFragNo).getId();
-      sz++;
+      m_receiverIds[cnt] = m_rootFrags[m_currFragNo].getReceiverId();
+      cnt++;
       m_currFragNo++;
     }
+    sz = cnt;
     return m_receiverIds;
   }
 }
   
+/** This iterator is used for inserting a sequence of 'TcPtrI' values
+ * for a NEXTREQ to one or more fragments via a GenericSectionPtr.*/
+class FetchMoreTcIdIterator: public GenericSectionIterator
+{
+public:
+  FetchMoreTcIdIterator(NdbRootFragment* rootFrags[],
+                        Uint32 cnt)
+    :m_rootFrags(rootFrags),
+     m_fragCount(cnt),
+     m_currFragNo(0)
+  {}
+
+  virtual ~FetchMoreTcIdIterator() {};
+  
+  /**
+   * Get next batch of receiver ids. 
+   * @param sz This will be set to the number of receiver ids that have been
+   * put in the buffer (0 if end has been reached.)
+   * @return Array of receiver ids (or NULL if end has been reached).
+   */
+  virtual const Uint32* getNextWords(Uint32& sz);
+
+  virtual void reset()
+  { m_currFragNo = 0;};
+  
+private:
+  /** 
+   * Size of internal receiver id buffer. This value is arbitrary, but 
+   * a larger buffer would mean fewer calls to getNextWords(), possibly
+   * improving efficiency.
+   */
+  static const Uint32 bufSize = 16;
+
+  /** Set of root fragments which we want to iterate TcPtrI ids for.*/
+  NdbRootFragment** m_rootFrags;
+  const Uint32 m_fragCount;
+
+  /** The next fragment number to be processed. (Range from 0 to number of
+   * fragments.)*/
+  Uint32 m_currFragNo;
+  /** Buffer for storing one batch of receiver ids.*/
+  Uint32 m_receiverIds[bufSize];
+};
+
+const Uint32* FetchMoreTcIdIterator::getNextWords(Uint32& sz)
+{
+  /**
+   * For a NEXTREQ, we return the saved TcPtrI 'cursor' of each root
+   * fragment from which another batch should be fetched.
+   */
+  if (m_currFragNo >= m_fragCount)
+  {
+    sz = 0;
+    return NULL;
+  }
+  else
+  {
+    Uint32 cnt = 0;
+    while (cnt < bufSize && m_currFragNo < m_fragCount)
+    {
+      m_receiverIds[cnt] = m_rootFrags[m_currFragNo]->getReceiverTcPtrI();
+      cnt++;
+      m_currFragNo++;
+    }
+    sz = cnt;
+    return m_receiverIds;
+  }
+}
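
Iterators like the two above deliver their words in chunks of at most
bufSize, and a sender simply calls getNextWords() until it returns NULL.
A simplified, self-contained stand-in (hypothetical names) showing how such
an iterator is drained:

#include <cassert>

typedef unsigned Uint32;

class ArrayIterator        // simplified stand-in for GenericSectionIterator
{
public:
  ArrayIterator(const Uint32* words, Uint32 cnt)
    : m_words(words), m_cnt(cnt), m_pos(0) {}

  const Uint32* getNextWords(Uint32& sz)
  {
    static const Uint32 bufSize = 16;
    if (m_pos >= m_cnt) { sz = 0; return 0; }
    sz = (m_cnt - m_pos < bufSize) ? (m_cnt - m_pos) : bufSize;
    const Uint32* chunk = m_words + m_pos;
    m_pos += sz;
    return chunk;
  }

private:
  const Uint32* m_words;
  const Uint32 m_cnt;
  Uint32 m_pos;
};

int main()
{
  Uint32 ids[40];
  for (Uint32 i = 0; i < 40; i++)
    ids[i] = i;

  ArrayIterator it(ids, 40);
  Uint32 total = 0, sz = 0;
  while (it.getNextWords(sz) != 0)  // delivered as 16 + 16 + 8 words
    total += sz;
  assert(total == 40);
  return 0;
}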
 
 /******************************************************************************
 int doSend()    Send serialized queryTree and parameters encapsulated in 
@@ -2690,7 +2938,7 @@ NdbQueryImpl::doSend(int nodeId, bool la
      * Section 2 : Optional KEYINFO section
      */
     GenericSectionPtr secs[3];
-    InitialReceiverIdIterator receiverIdIter(*this);
+    InitialReceiverIdIterator receiverIdIter(m_rootFrags, m_rootFragCount);
     LinearSectionIterator attrInfoIter(m_attrInfo.addr(), m_attrInfo.getSize());
     LinearSectionIterator keyInfoIter(m_keyInfo.addr(), m_keyInfo.getSize());
  
@@ -2829,28 +3077,18 @@ Parameters:     emptyFrag: Root frgament
 Remark:
 ******************************************************************************/
 int
-NdbQueryImpl::sendFetchMore(NdbRootFragment& emptyFrag, bool forceSend)
+NdbQueryImpl::sendFetchMore(NdbRootFragment* rootFrags[],
+                            Uint32 cnt,
+                            bool forceSend)
 {
-  assert(getRoot().m_resultStreams!=NULL);
-  assert(!emptyFrag.finalBatchReceived());
   assert(getQueryDef().isScanQuery());
 
-  const Uint32 fragNo = emptyFrag.getFragNo();
-  emptyFrag.reset();
-
-  for (unsigned opNo=0; opNo<m_countOperations; opNo++) 
+  for (Uint32 i=0; i<cnt; i++)
   {
-    NdbResultStream& resultStream = 
-       getQueryOperation(opNo).getResultStream(fragNo);
-
-    if (!resultStream.isSubScanComplete())
-    {
-      /**
-       * Reset resultstream and all its descendants, since all these
-       * streams will get a new set of rows in the next batch.
-       */ 
-      resultStream.reset();
-    }
+    NdbRootFragment* rootFrag = rootFrags[i];
+    assert(rootFrag->isFragBatchComplete());
+    assert(!rootFrag->finalBatchReceived());
+    rootFrag->reset();
   }
 
   Ndb& ndb = *getNdbTransaction().getNdb();
@@ -2868,13 +3106,11 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
   scanNextReq->transId2 = (Uint32) (transId >> 32);
   tSignal.setLength(ScanNextReq::SignalLength);
 
-  const uint32 receiverId = 
-    emptyFrag.getResultStream(0).getReceiver().m_tcPtrI;
-  LinearSectionIterator receiverIdIter(&receiverId ,1);
+  FetchMoreTcIdIterator receiverIdIter(rootFrags, cnt);
 
   GenericSectionPtr secs[1];
   secs[ScanNextReq::ReceiverIdsSectionNum].sectionIter = &receiverIdIter;
-  secs[ScanNextReq::ReceiverIdsSectionNum].sz = 1;
+  secs[ScanNextReq::ReceiverIdsSectionNum].sz = cnt;
   
   NdbImpl * impl = ndb.theImpl;
   Uint32 nodeId = m_transaction.getConnectedNodeId();
@@ -2887,7 +3123,7 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
 
   if (unlikely(hasReceivedError()))
   {
-    // Errors arrived inbetween ::await released mutex, and fetchMore grabbed it
+    // Errors arrived in between ::await releasing the mutex and sendFetchMore grabbing it
     return -1;
   }
   if (impl->getNodeSequence(nodeId) != seq ||
@@ -2898,8 +3134,9 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
   }
   impl->do_forceSend(forceSend);
 
-  m_pendingFrags++;
+  m_pendingFrags += cnt;
   assert(m_pendingFrags <= getRootFragCount());
+
   return 0;
 } // NdbQueryImpl::sendFetchMore()
 
@@ -3116,7 +3353,7 @@ NdbQueryImpl::OrderedFragSet::prepare(Nd
   m_keyRecord = keyRecord;
   m_resultRecord = resultRecord;
   return 0;
-}
+} // OrderedFragSet::prepare()
 
 
 /**
@@ -3152,8 +3389,7 @@ NdbQueryImpl::OrderedFragSet::getCurrent
     assert(!m_activeFrags[m_activeFragCount-1]->isEmpty());
     return m_activeFrags[m_activeFragCount-1];
   }
-}
-
+} // OrderedFragSet::getCurrent()
 
 /**
  *  Keep the FragSet ordered, both with respect to specified ScanOrdering, and
@@ -3165,42 +3401,45 @@ NdbQueryImpl::OrderedFragSet::getCurrent
 void
 NdbQueryImpl::OrderedFragSet::reorganize()
 {
+  assert(m_activeFragCount > 0);
+  NdbRootFragment* const frag = m_activeFrags[m_activeFragCount-1];
+
   // Remove the current fragment if the batch has been emptied.
-  if (m_activeFragCount>0 && m_activeFrags[m_activeFragCount-1]->isEmpty())
+  if (frag->isEmpty())
   {
-    if (m_activeFrags[m_activeFragCount-1]->finalBatchReceived())
+    if (frag->finalBatchReceived())
     {
       m_finalFragCount++;
     }
     else
     {
-      m_emptiedFrags[m_emptiedFragCount++] = m_activeFrags[m_activeFragCount-1];
+      m_emptiedFrags[m_emptiedFragCount++] = frag;
     }
     m_activeFragCount--;
-    assert(m_activeFragCount==0 || 
-           !m_activeFrags[m_activeFragCount-1]->isEmpty());
     assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount 
            <= m_capacity);
+
+    return;  // Remaining m_activeFrags[] are sorted
   }
 
   // Reorder fragments if this is a sorted scan.
-  if (m_ordering!=NdbQueryOptions::ScanOrdering_unordered && 
-      m_activeFragCount+m_finalFragCount == m_capacity)
+  if (m_ordering!=NdbQueryOptions::ScanOrdering_unordered)
   {
     /** 
      * This is a sorted scan. There are more data to be read from 
      * m_activeFrags[m_activeFragCount-1]. Move it to its proper place.
+     *
+     * Use binary search to find the largest record that is smaller than or
+     * equal to m_activeFrags[m_activeFragCount-1].
      */
     int first = 0;
     int last = m_activeFragCount-1;
-    /* Use binary search to find the largest record that is smaller than or
-     * equal to m_activeFrags[m_activeFragCount-1] */
     int middle = (first+last)/2;
-    while(first<last)
+
+    while (first<last)
     {
       assert(middle<m_activeFragCount);
-      const int cmpRes = compare(*m_activeFrags[m_activeFragCount-1], 
-                                 *m_activeFrags[middle]);
+      const int cmpRes = compare(*frag, *m_activeFrags[middle]);
       if (cmpRes < 0)
       {
         first = middle + 1;
@@ -3216,67 +3455,29 @@ NdbQueryImpl::OrderedFragSet::reorganize
       middle = (first+last)/2;
     }
 
-    assert(m_activeFragCount == 0 ||
-           compare(*m_activeFrags[m_activeFragCount-1], 
-                   *m_activeFrags[middle]) >= 0);
-
-    if(middle < m_activeFragCount-1)
+    // Move into correct sorted position
+    if (middle < m_activeFragCount-1)
     {
-      NdbRootFragment* const oldTop = m_activeFrags[m_activeFragCount-1];
+      assert(compare(*frag, *m_activeFrags[middle]) >= 0);
       memmove(m_activeFrags+middle+1, 
               m_activeFrags+middle, 
               (m_activeFragCount - middle - 1) * sizeof(NdbRootFragment*));
-      m_activeFrags[middle] = oldTop;
+      m_activeFrags[middle] = frag;
     }
     assert(verifySortOrder());
   }
-}
+  assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount 
+         <= m_capacity);
+} // OrderedFragSet::reorganize()
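
The trick in reorganize() above is that only the fragment last read from can
be out of order, so a single binary search plus one memmove restores the sort
order. A standalone sketch of the same technique, with plain ints in place of
compare() and the array kept descending so the next element to consume sits
at the end:

#include <cassert>
#include <cstring>

static void reposition(int* a, int n)   // a[0..n-2] is sorted descending
{
  const int top = a[n - 1];             // possibly out-of-order element
  int first = 0, last = n - 1;
  int middle = (first + last) / 2;
  while (first < last)                  // binary search for top's slot
  {
    if (a[middle] > top)                // 'top' sorts after a[middle]
      first = middle + 1;
    else
      last = middle;
    middle = (first + last) / 2;
  }
  if (middle < n - 1)                   // shift tail and drop 'top' in place
  {
    memmove(a + middle + 1, a + middle, (n - 1 - middle) * sizeof(int));
    a[middle] = top;
  }
}

int main()
{
  int a[] = { 9, 7, 2, 6 };             // last element just changed to 6
  reposition(a, 4);
  assert(a[0] == 9 && a[1] == 7 && a[2] == 6 && a[3] == 2);
  return 0;
}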
 
 void 
 NdbQueryImpl::OrderedFragSet::add(NdbRootFragment& frag)
 {
-  assert(&frag!=NULL);
+  assert(m_activeFragCount+m_finalFragCount < m_capacity);
 
-  if (frag.isEmpty())
-  {
-    if (frag.finalBatchReceived())
-    {
-      m_finalFragCount++;
-    }
-    else
-    {
-      m_emptiedFrags[m_emptiedFragCount++] = &frag;
-    }
-  }
-  else
-  {
-    assert(m_activeFragCount+m_finalFragCount < m_capacity);
-    if(m_ordering==NdbQueryOptions::ScanOrdering_unordered)
-    {
-      m_activeFrags[m_activeFragCount++] = &frag;
-    }
-    else
-    {
-      int current = 0;
-      // Insert the new frag such that the array remains sorted.
-      while(current<m_activeFragCount && 
-            compare(frag, *m_activeFrags[current]) < 0)
-      {
-        current++;
-      }
-      memmove(m_activeFrags+current+1,
-              m_activeFrags+current,
-              (m_activeFragCount - current) * sizeof(NdbRootFragment*));
-      m_activeFrags[current] = &frag;
-      m_activeFragCount++;
-      assert(verifySortOrder());
-    }
-  }
-  assert(m_activeFragCount==0 || 
-         !m_activeFrags[m_activeFragCount-1]->isEmpty());
-  assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount 
-         <= m_capacity);
-}
+  m_activeFrags[m_activeFragCount++] = &frag;  // Add avail fragment
+  reorganize();                                // Move into position
+} // OrderedFragSet::add()
 
 void NdbQueryImpl::OrderedFragSet::clear() 
 { 
@@ -3285,26 +3486,21 @@ void NdbQueryImpl::OrderedFragSet::clear
   m_finalFragCount = 0;
 }
 
-NdbRootFragment* 
-NdbQueryImpl::OrderedFragSet::getEmpty()
+Uint32 
+NdbQueryImpl::OrderedFragSet::getFetchMore(NdbRootFragment** &frags)
 {
-  if (m_emptiedFragCount > 0)
-  {
-    assert(m_emptiedFrags[m_emptiedFragCount-1]->isEmpty());
-    return m_emptiedFrags[--m_emptiedFragCount];
-  }
-  else
-  {
-    return NULL;
-  }
+  const Uint32 cnt = m_emptiedFragCount;
+  frags = m_emptiedFrags;
+  m_emptiedFragCount = 0;
+  return cnt;
 }
 
 bool 
 NdbQueryImpl::OrderedFragSet::verifySortOrder() const
 {
-  for(int i = 0; i<m_activeFragCount-2; i++)
+  for (int i = 0; i<m_activeFragCount-1; i++)
   {
-    if(compare(*m_activeFrags[i], *m_activeFrags[i+1]) < 0)
+    if (compare(*m_activeFrags[i], *m_activeFrags[i+1]) < 0)
     {
       assert(false);
       return false;
@@ -3313,7 +3509,6 @@ NdbQueryImpl::OrderedFragSet::verifySort
   return true;
 }
 
-
 /**
  * Compare frags such that f1<f2 if f1 is empty but f2 is not.
  * - Otherwise compare record contents.
@@ -3364,10 +3559,7 @@ NdbQueryOperationImpl::NdbQueryOperation
   m_parent(NULL),
   m_children(def.getNoOfChildOperations()),
   m_maxBatchRows(0),   // >0: User specified prefered value, ==0: Use default CFG values
-  m_resultStreams(NULL),
   m_params(),
-  m_bufferSize(0),
-  m_batchOverflowCheck(NULL),
   m_resultBuffer(NULL),
   m_resultRef(NULL),
   m_isRowNull(true),
@@ -3416,10 +3608,10 @@ NdbQueryOperationImpl::NdbQueryOperation
 
 NdbQueryOperationImpl::~NdbQueryOperationImpl()
 {
-  // We expect ::postFetchRelease to have deleted fetch related structures when fetch completed.
-  // Either by fetching through last row, or calling ::close() which forcefully terminates fetch
-  assert (m_batchOverflowCheck == NULL);
-  assert (m_resultStreams == NULL);
+  /**
+   * We expect ::postFetchRelease to have deleted fetch-related structures when the
+   * fetch completed, either by fetching through the last row, or by calling
+   * ::close(), which forcefully terminates the fetch.
+   */
   assert (m_firstRecAttr == NULL);
   assert (m_interpretedCode == NULL);
 } //NdbQueryOperationImpl::~NdbQueryOperationImpl()
@@ -3431,22 +3623,6 @@ NdbQueryOperationImpl::~NdbQueryOperatio
 void
 NdbQueryOperationImpl::postFetchRelease()
 {
-  // Buffer overrun check.
-  assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck ==  0xacbd1234);
-  m_batchOverflowCheck = NULL;
-  
-  if (m_resultStreams != NULL)
-  { 
-    for (int i = static_cast<int>(getQuery().getRootFragCount())-1; i >= 0; i--)
-    {
-      if (m_resultStreams[i] != NULL)
-      {
-        m_resultStreams[i]->~NdbResultStream();
-      }
-    }
-  }
-  m_resultStreams = NULL;
-
   Ndb* const ndb = m_queryImpl.getNdbTransaction().getNdb();
   NdbRecAttr* recAttr = m_firstRecAttr;
   while (recAttr != NULL) {
@@ -3680,7 +3856,7 @@ NdbQueryOperationImpl::firstResult()
 
   if (rootFrag != NULL)
   {
-    NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()];
+    NdbResultStream& resultStream = rootFrag->getResultStream(*this);
     if (resultStream.firstResult() != tupleNotFound)
     {
       fetchRow(resultStream);
@@ -3720,7 +3896,7 @@ NdbQueryOperationImpl::nextResult(bool f
     const NdbRootFragment* rootFrag = m_queryImpl.m_applFrags.getCurrent();
     if (rootFrag!=NULL)
     {
-      NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()];
+      NdbResultStream& resultStream = rootFrag->getResultStream(*this);
       if (resultStream.nextResult() != tupleNotFound)
       {
         fetchRow(resultStream);
@@ -3736,7 +3912,7 @@ NdbQueryOperationImpl::nextResult(bool f
 void 
 NdbQueryOperationImpl::fetchRow(NdbResultStream& resultStream)
 {
-  const char* buff = resultStream.getReceiver().get_row();
+  const char* buff = resultStream.getReceiver().peek_row();
   assert(buff!=NULL || (m_firstRecAttr==NULL && m_ndbRecord==NULL));
 
   m_isRowNull = false;
@@ -4051,55 +4227,9 @@ NdbQueryOperationImpl::setBatchedRows(Ui
   }
 }
 
-
-int 
-NdbQueryOperationImpl::prepareReceiver()
-{
-  // Construct receiver streams and prepare them for receiving scan result
-  assert(m_resultStreams==NULL);
-  assert(m_queryImpl.getRootFragCount() > 0);
-
-  m_resultStreams = reinterpret_cast<NdbResultStream**>
-    (getQuery().getPointerAlloc().allocObjMem(m_queryImpl.getRootFragCount())); 
-
-  for(Uint32 i = 0; i<m_queryImpl.getRootFragCount(); i++) {
-    m_resultStreams[i] = NULL;  // Init to legal contents for d'tor
-  }
-  for(Uint32 i = 0; i<m_queryImpl.getRootFragCount(); i++) {
-    m_resultStreams[i] = new (getQuery().m_resultStreamAlloc.allocObjMem(1))
-      NdbResultStream(*this, i);
-    const int error = m_resultStreams[i]->prepare();
-    if (unlikely(error)) {
-      return error;
-    }
-
-    m_resultStreams[i]->getReceiver().init(NdbReceiver::NDB_QUERY_OPERATION, 
-                                        false, this);
-    char* const rowBuf = reinterpret_cast<char*>(getQuery().getRowBufferAlloc()
-                                                 .allocObjMem(m_bufferSize));
-    m_resultStreams[i]->getReceiver()
-      .do_setup_ndbrecord(m_ndbRecord,
-                          getMaxBatchRows(), 
-                          0 /*key_size*/, 
-                          0 /*read_range_no*/, 
-                          getRowSize(),
-                          rowBuf);
-    m_resultStreams[i]->getReceiver().prepareSend();
-  }
-  // So that we can test for for buffer overrun.
-  m_batchOverflowCheck = 
-    reinterpret_cast<Uint32*>(getQuery().getRowBufferAlloc()
-                              .allocObjMem(sizeof(Uint32)));
-  *m_batchOverflowCheck = 0xacbd1234;
-  return 0;
-}//NdbQueryOperationImpl::prepareReceiver
-
 int 
 NdbQueryOperationImpl::prepareAttrInfo(Uint32Buffer& attrInfo)
 {
-  // ::prepareReceiver() need to complete first:
-  assert (m_resultStreams != NULL);
-
   const NdbQueryOperationDefImpl& def = getQueryOperationDef();
 
   /**
@@ -4553,10 +4683,12 @@ NdbQueryOperationImpl::prepareLookupKeyI
 bool 
 NdbQueryOperationImpl::execTRANSID_AI(const Uint32* ptr, Uint32 len)
 {
+  TupleCorrelation tupleCorrelation;
   NdbRootFragment* rootFrag = m_queryImpl.m_rootFrags;
-  Uint32 rootFragNo = 0;
+
   if (getQueryDef().isScanQuery())
   {
+    const CorrelationData correlData(ptr, len);
     const Uint32 receiverId = CorrelationData(ptr, len).getRootReceiverId();
     
     /** receiverId holds the Id of the receiver of the corresponding stream
@@ -4572,24 +4704,27 @@ NdbQueryOperationImpl::execTRANSID_AI(co
       assert(false);
       return false;
     }
-    rootFragNo = rootFrag->getFragNo();
+
+    // Extract tuple correlation.
+    tupleCorrelation = correlData.getTupleCorrelation();
+    len -= CorrelationData::wordCount;
   }
+
   if (traceSignals) {
     ndbout << "NdbQueryOperationImpl::execTRANSID_AI()" 
            << ", operation no: " << getQueryOperationDef().getQueryOperationIx()
-           << ", fragment no: " << rootFragNo
+           << ", fragment no: " << rootFrag->getFragNo()
            << endl;
   }
 
   // Process result values.
-  m_resultStreams[rootFragNo]->execTRANSID_AI(ptr, len);
-
+  rootFrag->getResultStream(*this).execTRANSID_AI(ptr, len, tupleCorrelation);
   rootFrag->incrOutstandingResults(-1);
 
   bool ret = false;
   if (rootFrag->isFragBatchComplete())
   {
-    ret = m_queryImpl.handleBatchComplete(rootFragNo);
+    ret = m_queryImpl.handleBatchComplete(*rootFrag);
   }
 
   if (traceSignals) {
@@ -4631,7 +4766,6 @@ NdbQueryOperationImpl::execTCKEYREF(cons
     }
   }
 
-  Uint32 rootFragNo = 0;
   NdbRootFragment& rootFrag = getQuery().m_rootFrags[0];
 
   if (ref->errorCode != DbspjErr::NodeFailure)
@@ -4656,13 +4790,12 @@ NdbQueryOperationImpl::execTCKEYREF(cons
   bool ret = false;
   if (rootFrag.isFragBatchComplete())
   { 
-    ret = m_queryImpl.handleBatchComplete(rootFragNo);
+    ret = m_queryImpl.handleBatchComplete(rootFrag);
   } 
 
   if (traceSignals) {
     ndbout << "NdbQueryOperationImpl::execTCKEYREF(): returns:" << ret
-           << ", *getRoot().m_resultStreams[0] {" 
-           << *getRoot().m_resultStreams[0] << "}"
+           << ", resultStream= {" << rootFrag.getResultStream(*this) << "}"
            << ", *this=" << *this <<  endl;
   }
   return ret;
@@ -4694,48 +4827,22 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
     assert(false);
     return false;
   }
-  rootFrag->setConfReceived();
+  // Prepare for SCAN_NEXTREQ, tcPtrI==RNIL, nodeMask==0 -> EOF
+  rootFrag->setConfReceived(tcPtrI);
+  rootFrag->setRemainingSubScans(nodeMask);
   rootFrag->incrOutstandingResults(rowCount);
 
-  // Handle for SCAN_NEXTREQ, RNIL -> EOF
-  NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()];
-  resultStream.getReceiver().m_tcPtrI = tcPtrI;  
-
   if(traceSignals){
-    ndbout << "  resultStream(root) {" << resultStream << "} fragNo" 
-           << rootFrag->getFragNo() << endl;
+    ndbout << "  resultStream {" << rootFrag->getResultStream(*this)
+           << "} fragNo" << rootFrag->getFragNo()
+           << endl;
   }
 
-  const NdbQueryDefImpl& queryDef = m_queryImpl.getQueryDef();
-  /* Mark each scan node to indicate if the current batch is the last in the
-   * current sub-scan or not.
-   */
-  for (Uint32 opNo = 0; opNo < queryDef.getNoOfOperations(); opNo++)
-  {
-    const NdbQueryOperationImpl& op = m_queryImpl.getQueryOperation(opNo);
-    /**
-     * Find the node number seen by the SPJ block. Since a unique index
-     * operation will have two distincts nodes in the tree used by the
-     * SPJ block, this number may be different from 'opNo'.
-     */
-    const Uint32 internalOpNo = op.getQueryOperationDef().getQueryOperationId();
-    assert(internalOpNo >= opNo);
-    const bool complete = ((nodeMask >> internalOpNo) & 1) == 0;
-
-    // Lookups should always be 'complete'
-    assert(complete ||  op.getQueryOperationDef().isScanOperation());
-    rootFrag->getResultStream(opNo).setSubScanCompletion(complete);
-  }
-  // Check that nodeMask does not have more bits than we have operations. 
-  assert(nodeMask >> 
-         (1+queryDef.getQueryOperation(queryDef.getNoOfOperations() - 1)
-          .getQueryOperationId()) == 0);
-
   bool ret = false;
   if (rootFrag->isFragBatchComplete())
   {
     /* This fragment is now complete */
-    ret = m_queryImpl.handleBatchComplete(rootFrag->getFragNo());
+    ret = m_queryImpl.handleBatchComplete(*rootFrag);
   }
   if (traceSignals) {
     ndbout << "NdbQueryOperationImpl::execSCAN_TABCONF():, returns:" << ret
@@ -4871,13 +4978,6 @@ int NdbQueryOperationImpl::setBatchSize(
   return 0;
 }
 
-NdbResultStream& 
-NdbQueryOperationImpl::getResultStream(Uint32 rootFragNo) const
-{
-  assert(rootFragNo < getQuery().getRootFragCount());
-  return *m_resultStreams[rootFragNo];
-}
-
 bool
 NdbQueryOperationImpl::hasInterpretedCode() const
 {
@@ -4916,15 +5016,8 @@ NdbQueryOperationImpl::prepareInterprete
 
 Uint32 
 NdbQueryOperationImpl::getIdOfReceiver() const {
-  return m_resultStreams[0]->getReceiver().getId();
-}
-
-
-const NdbReceiver& 
-NdbQueryOperationImpl::getReceiver(Uint32 recNo) const {
-  assert(recNo<getQuery().getRootFragCount());
-  assert(m_resultStreams!=NULL);
-  return m_resultStreams[recNo]->getReceiver();
+  NdbRootFragment& rootFrag = m_queryImpl.m_rootFrags[0];
+  return rootFrag.getResultStream(*this).getReceiver().getId();
 }
 
 Uint32 NdbQueryOperationImpl::getRowSize() const
@@ -4934,6 +5027,12 @@ Uint32 NdbQueryOperationImpl::getRowSize
   {
     m_rowSize = 
       NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr, 0, false);
+
+    const bool withCorrelation = getRoot().getQueryDef().isScanQuery();
+    if (withCorrelation)
+    {
+      m_rowSize += TupleCorrelation::wordCount*sizeof(Uint32);
+    }
   }
   return m_rowSize;
 }
@@ -4953,7 +5052,8 @@ NdbOut& operator<<(NdbOut& out, const Nd
   out << "  m_queryImpl: " << &op.m_queryImpl;
   out << "  m_operationDef: " << &op.m_operationDef;
   for(Uint32 i = 0; i<op.m_queryImpl.getRootFragCount(); i++){
-    out << "  m_resultStream[" << i << "]{" << *op.m_resultStreams[i] << "}";
+    NdbRootFragment& rootFrag = op.m_queryImpl.m_rootFrags[i];
+    out << "  m_resultStream[" << i << "]{" << rootFrag.getResultStream(op) << "}";
   }
   out << " m_isRowNull " << op.m_isRowNull;
   out << " ]";

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-06-20 13:25:48 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-08-17 08:10:27 +0000
@@ -247,9 +247,15 @@ public:
   Uint32 getRootFragCount() const
   { return m_rootFragCount; }
 
+  NdbBulkAllocator& getResultStreamAlloc()
+  { return m_resultStreamAlloc; }
+
   NdbBulkAllocator& getTupleSetAlloc()
   { return m_tupleSetAlloc; }
 
+  NdbBulkAllocator& getRowBufferAlloc()
+  { return m_rowBufferAlloc; }
+
 private:
   /** Possible return values from NdbQueryImpl::awaitMoreResults. 
    * A subset of the integer values also matches those returned
@@ -275,7 +281,7 @@ private:
   class SharedFragStack{
   public:
     // For calculating need for dynamically allocated memory.
-    static const Uint32 pointersPerFragment = 2;
+    static const Uint32 pointersPerFragment = 1;
 
     explicit SharedFragStack();
 
@@ -320,7 +326,7 @@ private:
   class OrderedFragSet{
   public:
     // For calculating need for dynamically allocated memory.
-    static const Uint32 pointersPerFragment = 1;
+    static const Uint32 pointersPerFragment = 2;
 
     explicit OrderedFragSet();
 
@@ -342,9 +348,11 @@ private:
     /** Get the root fragment from which to read the next row.*/
     NdbRootFragment* getCurrent() const;
 
-    /** Re-organize the fragments after a row has been consumed. This is 
-     * needed to remove fragements that has been needed, and to re-sort 
-     * fragments if doing a sorted scan.*/
+    /**
+     * Re-organize the fragments after a row has been consumed. This is 
+     * needed to remove fragments that have been emptied, and to re-sort
+     * fragments if doing a sorted scan.
+     */
     void reorganize();
 
     /** Add a complete fragment that has been received.*/
@@ -353,12 +361,14 @@ private:
     /** Reset object to an empty state.*/
     void clear();
 
-    /** Get a fragment where all rows have been consumed. (This method is 
-     * not idempotent - the fragment is removed from the set. 
-     * @return Emptied fragment (or NULL if there are no more emptied 
-     * fragments).
+    /**
+     * Get all fragments where more rows may be (pre-)fetched.
+     * (This method is not idempotent - the fragments are removed
+     * from the set.)
+     * @return Number of fragments (returned in 'rootFrags') from which
+     * more results should be requested.
      */
-    NdbRootFragment* getEmpty();
+    Uint32 getFetchMore(NdbRootFragment** &rootFrags);
 
   private:
 
@@ -543,7 +553,8 @@ private:
   /** Send SCAN_NEXTREQ signal to fetch another batch from a scan query
    * @return 0 if send succeeded, -1 otherwise.
    */
-  int sendFetchMore(NdbRootFragment& emptyFrag, bool forceSend);
+  int sendFetchMore(NdbRootFragment* rootFrags[], Uint32 cnt,
+                    bool forceSend);
 
   /** Wait for more scan results which have already been REQuested to arrive.
    * @return 0 if some rows did arrive, a negative value if there are errors (in m_error.code),
@@ -577,13 +588,10 @@ private:
    *  the result.
    *  @return: 'true' if it's time to resume appl. threads
    */ 
-  bool handleBatchComplete(Uint32 rootFragNo);
+  bool handleBatchComplete(NdbRootFragment& rootFrag);
 
   NdbBulkAllocator& getPointerAlloc()
   { return m_pointerAlloc; }
-  
-  NdbBulkAllocator& getRowBufferAlloc()
-  { return m_rowBufferAlloc; }
 
 }; // class NdbQueryImpl
 
@@ -705,10 +713,6 @@ public:
   int setInterpretedCode(const NdbInterpretedCode& code);
   bool hasInterpretedCode() const;
 
-  NdbResultStream& getResultStream(Uint32 rootFragNo) const;
-
-  const NdbReceiver& getReceiver(Uint32 rootFragNo) const;
-
   /** Verify magic number.*/
   bool checkMagicNumber() const
   { return m_magic == MAGIC; }
@@ -719,6 +723,12 @@ public:
   Uint32 getMaxBatchRows() const
   { return m_maxBatchRows; }
 
+  /** Get size of row as required to buffer it. */  
+  Uint32 getRowSize() const;
+
+  const NdbRecord* getNdbRecord() const
+  { return m_ndbRecord; }
+
 private:
 
   STATIC_CONST (MAGIC = 0xfade1234);
@@ -742,16 +752,9 @@ private:
   /** Max rows (per resultStream) in a scan batch.*/
   Uint32 m_maxBatchRows;
 
-  /** For processing results from this operation (Array of).*/
-  NdbResultStream** m_resultStreams;
   /** Buffer for parameters in serialized format */
   Uint32Buffer m_params;
 
-  /** Buffer size allocated for *each* ResultStream/Receiver when 
-   *  fetching results.*/
-  Uint32 m_bufferSize;
-  /** Used for checking if buffer overrun occurred. */
-  Uint32* m_batchOverflowCheck;
   /** User specified buffer for final storage of result.*/
   char* m_resultBuffer;
   /** User specified pointer to application pointer that should be 
@@ -819,9 +822,6 @@ private:
   Uint32 calculateBatchedRows(const NdbQueryOperationImpl* closestScan);
   void setBatchedRows(Uint32 batchedRows);
 
-  /** Construct and prepare receiver streams for result processing. */
-  int prepareReceiver();
-
   /** Prepare ATTRINFO for execution. (Add execution params++)
    *  @return possible error code.*/
   int prepareAttrInfo(Uint32Buffer& attrInfo);
@@ -863,7 +863,6 @@ private:
   bool diskInUserProjection() const
   { return m_diskInUserProjection; }
 
-  Uint32 getRowSize() const;
 }; // class NdbQueryOperationImpl
 
 

=== modified file 'storage/ndb/test/ndbapi/testIndexStat.cpp'
--- a/storage/ndb/test/ndbapi/testIndexStat.cpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/test/ndbapi/testIndexStat.cpp	2011-08-17 10:36:01 +0000
@@ -110,7 +110,8 @@ static const NdbRecord* g_ind_rec = 0;
 
 struct my_record
 {
-  Uint32 m_null_bm;
+  Uint8 m_null_bm;
+  Uint8 fill[3];
   Uint32 m_a;
   Uint32 m_b;
   char m_c[1+g_charlen];
@@ -133,6 +134,7 @@ static NdbIndexScanOperation* g_rangesca
 
 static NdbIndexStat* g_is = 0;
 static bool g_has_created_stat_tables = false;
+static bool g_has_created_stat_events = false;
 
 static uint
 urandom()
@@ -212,7 +214,7 @@ errdb()
       ll0(++any << " rangescan_op: error " << e);
   }
   if (g_is != 0) {
-    const NdbError& e = g_is->getNdbError();
+    const NdbIndexStat::Error& e = g_is->getNdbError();
     if (e.code != 0)
       ll0(++any << " stat: error " << e);
   }
@@ -376,6 +378,7 @@ struct Val {
   void copy(const Val& val2);
   void make(uint numattrs, const Lim& lim);
   int cmp(const Val& val2, uint numattrs = g_numattrs, uint* num_eq = 0) const;
+  void fromib(const NdbIndexScanOperation::IndexBound& ib, uint j);
 
 private:
   Val& operator=(const Val&);
@@ -556,6 +559,40 @@ Val::cmp(const Val& val2, uint numattrs,
   return k;
 }
 
+void
+Val::fromib(const NdbIndexScanOperation::IndexBound& ib, uint j)
+{
+  const char* key = (j == 0 ? ib.low_key : ib.high_key);
+  const uint numattrs = (j == 0 ? ib.low_key_count : ib.high_key_count);
+  const Uint8 nullbits = *(const Uint8*)key;
+  require(numattrs <= g_numattrs);
+  if (numattrs >= 1) {
+    if (nullbits & (1 << g_ndbrec_b_nb_offset))
+      b_null = 1;
+    else {
+      memcpy(&b, &key[g_ndbrec_b_offset], sizeof(b));
+      b_null = 0;
+    }
+  }
+  if (numattrs >= 2) {
+    if (nullbits & (1 << g_ndbrec_c_nb_offset))
+      c_null = 1;
+    else {
+      memcpy(c, &key[g_ndbrec_c_offset], sizeof(c));
+      c_null = 0;
+    }
+  }
+  if (numattrs >= 3) {
+    if (nullbits & (1 << g_ndbrec_d_nb_offset))
+      d_null = 1;
+    else {
+      memcpy(&d, &key[g_ndbrec_d_offset], sizeof(d));
+      d_null = 0;
+    }
+  }
+  m_numattrs = numattrs;
+}
+
 // index keys
 
 struct Key {
@@ -844,7 +881,9 @@ struct Bnd {
   Bnd& make(uint minattrs);
   Bnd& make(uint minattrs, const Val& theval);
   int cmp(const Key& key) const;
+  int cmp(const Bnd& bnd2);
   int type(uint colno) const; // for setBound
+  void fromib(const NdbIndexScanOperation::IndexBound& ib, uint j);
 
 private:
   Bnd& operator=(const Bnd&);
@@ -937,6 +976,52 @@ Bnd::cmp(const Key& key) const
 }
 
 int
+Bnd::cmp(const Bnd& bnd2)
+{
+  int place; // debug
+  int ret;
+  const Bnd& bnd1 = *this;
+  const Val& val1 = bnd1.m_val;
+  const Val& val2 = bnd2.m_val;
+  const uint numattrs1 = val1.m_numattrs;
+  const uint numattrs2 = val2.m_numattrs;
+  const uint n = (numattrs1 < numattrs2 ? numattrs1 : numattrs2);
+  do {
+    int k = val1.cmp(val2, n);
+    if (k != 0) {
+      place = 1;
+      ret = k;
+      break;
+    }
+    if (numattrs1 < numattrs2) {
+      place = 2;
+      ret = (+1) * bnd1.m_side;
+      break;
+    }
+    if (numattrs1 > numattrs2) {
+      place = 3;
+      ret = (-1) * bnd1.m_side;
+      break;
+    }
+    if (bnd1.m_side < bnd2.m_side) {
+      place = 4;
+      ret = -1;
+      break;
+    }
+    if (bnd1.m_side > bnd2.m_side) {
+      place = 5;
+      ret = +1;
+      break;
+    }
+    place = 6;
+    ret = 0;
+  } while (0);
+  ll3("bnd: " << *this << " cmp bnd: " << bnd2
+      << " ret: " << ret << " place: " << place);
+  return ret;
+}
+
+int
 Bnd::type(uint colno) const
 {
   int t;
@@ -960,6 +1045,21 @@ Bnd::type(uint colno) const
   return t;
 }
 
+void
+Bnd::fromib(const NdbIndexScanOperation::IndexBound& ib, uint j)
+{
+  Val& val = m_val;
+  val.fromib(ib, j);
+  const uint numattrs = (j == 0 ? ib.low_key_count : ib.high_key_count);
+  const bool inclusive = (j == 0 ? ib.low_inclusive : ib.high_inclusive);
+  if (numattrs == 0) {
+    m_side = 0;
+  } else {
+    m_side = (j == 0 ? (inclusive ? -1 : +1) : (inclusive ? +1 : -1));
+  }
+  m_lohi = j;
+}
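
A worked check of the m_side convention used by Bnd::fromib() above: j==0 is
the low bound, j==1 the high bound, and the sign orders an exclusive bound
before or after the corresponding inclusive one (comments show the intended
key predicate):

#include <cassert>

static int side(unsigned j, bool inclusive, unsigned numattrs)
{
  if (numattrs == 0)
    return 0;                                  // unbounded side
  return (j == 0) ? (inclusive ? -1 : +1)      // low bound
                  : (inclusive ? +1 : -1);     // high bound
}

int main()
{
  assert(side(0, true,  1) == -1);   // low  inclusive:  a >= x
  assert(side(0, false, 1) == +1);   // low  exclusive:  a >  x
  assert(side(1, true,  1) == +1);   // high inclusive:  a <= x
  assert(side(1, false, 1) == -1);   // high exclusive:  a <  x
  assert(side(0, true,  0) == 0);    // no bound on this side
  return 0;
}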
+
 // stats values
 
 struct Stval {
@@ -1016,6 +1116,7 @@ struct Rng {
   void copy(const Rng& rng2);
   int cmp(const Key& key) const; // -1,0,+1 = key is before,in,after range
   uint rowcount() const;
+  void fromib(const NdbIndexScanOperation::IndexBound& ib);
 
 private:
   Rng& operator=(const Rng&);
@@ -1164,6 +1265,15 @@ Rng::rowcount() const
   return count;
 }
 
+void
+Rng::fromib(const NdbIndexScanOperation::IndexBound& ib)
+{
+  for (uint j = 0; j <= 1; j++) {
+    Bnd& bnd = m_bnd[j];
+    bnd.fromib(ib, j);
+  }
+}
+
 static Rng* g_rnglist = 0;
 
 static void
@@ -1409,6 +1519,40 @@ readstat()
   return 0;
 }
 
+// test polling after updatestat
+
+static int
+startlistener()
+{
+  ll1("startlistener");
+  chkdb(g_is->create_listener(g_ndb_sys) == 0);
+  chkdb(g_is->execute_listener(g_ndb_sys) == 0);
+  return 0;
+}
+
+static int
+runlistener()
+{
+  ll1("runlistener");
+  int ret;
+  chkdb((ret = g_is->poll_listener(g_ndb_sys, 10000)) != -1);
+  chkrc(ret == 1);
+  // one event is expected
+  chkdb((ret = g_is->next_listener(g_ndb_sys)) != -1);
+  chkrc(ret == 1);
+  chkdb((ret = g_is->next_listener(g_ndb_sys)) != -1);
+  chkrc(ret == 0);
+  return 0;
+}
+
+static int
+stoplistener()
+{
+  ll1("stoplistener");
+  chkdb(g_is->drop_listener(g_ndb_sys) != -1);
+  return 0;
+}
+
 // stats queries
 
 // exact stats from scan results
@@ -1475,7 +1619,8 @@ queryscan(Rng& rng)
  */
 static int
 initialiseIndexBound(const Rng& rng, 
-                     NdbIndexScanOperation::IndexBound& ib)
+                     NdbIndexScanOperation::IndexBound& ib,
+                     my_record* low_key, my_record* high_key)
 {
   ll3("initialiseIndexBound: " << rng);
   uint i;
@@ -1483,9 +1628,13 @@ initialiseIndexBound(const Rng& rng,
   Uint32 colsInBound[2]= {0, 0};
   bool boundInclusive[2]= {false, false};
 
+  memset(&ib, 0xf1, sizeof(ib));
+  memset(low_key, 0xf2, sizeof(*low_key));
+  memset(high_key, 0xf3, sizeof(*high_key));
+
   // Clear nullbit storage
-  *((char *)ib.low_key) = 
-    *((char *)ib.high_key) = 0;
+  low_key->m_null_bm = 0;
+  high_key->m_null_bm = 0;
 
   for (i = 0; i < g_numattrs; i++) {
     const Uint32 no = i; // index attribute number
@@ -1499,7 +1648,7 @@ initialiseIndexBound(const Rng& rng,
     }
     for (j = 0; j <= 1; j++) {
       /* Get ptr to key storage space for this bound */
-      my_record* keyBuf= (my_record *)( (j==0) ? ib.low_key : ib.high_key);
+      my_record* keyBuf= (j==0) ? low_key : high_key;
       int t = type[j];
       if (t == -1)
         continue;
@@ -1542,8 +1691,10 @@ initialiseIndexBound(const Rng& rng,
   }
 
   /* Now have everything we need to initialise the IndexBound */
+  ib.low_key = (char*)low_key;
   ib.low_key_count= colsInBound[0];
   ib.low_inclusive= boundInclusive[0];
+  ib.high_key = (char*)high_key;
   ib.high_key_count= colsInBound[1];
   ib.high_inclusive= boundInclusive[1];
   ib.range_no= 0;
@@ -1554,11 +1705,18 @@ initialiseIndexBound(const Rng& rng,
       " high_inc=" << ib.high_inclusive);
   ll3(" low bound b=" << *((Uint32*) &ib.low_key[g_ndbrec_b_offset]) <<
       " d=" << *((Uint16*) &ib.low_key[g_ndbrec_d_offset]) <<
-      " first byte=%xu" << ib.low_key[0]);
+      " first byte=" << ib.low_key[0]);
   ll3(" high bound b=" << *((Uint32*) &ib.high_key[g_ndbrec_b_offset]) <<
       " d=" << *((Uint16*) &ib.high_key[g_ndbrec_d_offset]) <<
-      " first byte=%xu" << ib.high_key[0]);  
+      " first byte=" << ib.high_key[0]);  
 
+  // verify by reverse
+  {
+    Rng rng;
+    rng.fromib(ib);
+    require(rng.m_bnd[0].cmp(bnd[0]) == 0);
+    require(rng.m_bnd[1].cmp(bnd[1]) == 0);
+  }
   return 0;
 }
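
The 'verify by reverse' block above is a round-trip property check: encode
the range into an IndexBound, decode it back via fromib(), and require the
two to compare equal. The same pattern in a generic, hypothetical form:

#include <cassert>

struct Range
{
  int lo, hi;
  bool operator==(const Range& o) const { return lo == o.lo && hi == o.hi; }
};

struct Bound { int lo, hi; };

static Bound encode(const Range& r) { Bound b = { r.lo, r.hi }; return b; }
static Range decode(const Bound& b) { Range r = { b.lo, b.hi }; return r; }

int main()
{
  Range r = { 3, 8 };
  assert(decode(encode(r)) == r);   // round-trip must be lossless
  return 0;
}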
 
@@ -1568,13 +1726,12 @@ querystat_v2(Rng& rng)
   ll3("querystat_v2");
   
   /* Create IndexBound and key storage space */
-  char keySpace[2][g_ndbrecord_bytes];
   NdbIndexScanOperation::IndexBound ib;
-  ib.low_key= keySpace[0];
-  ib.high_key= keySpace[1];
+  my_record low_key;
+  my_record high_key;
 
   chkdb((g_con = g_ndb->startTransaction()) != 0);
-  chkrc(initialiseIndexBound(rng, ib) == 0);
+  chkrc(initialiseIndexBound(rng, ib, &low_key, &high_key) == 0);
 
   Uint64 count = ~(Uint64)0;
   chkdb(g_is->records_in_range(g_ind, 
@@ -1610,10 +1767,9 @@ querystat(Rng& rng)
 
   // convert to IndexBound (like in mysqld)
   NdbIndexScanOperation::IndexBound ib;
-  char keySpace[2][g_ndbrecord_bytes];
-  ib.low_key = keySpace[0];
-  ib.high_key = keySpace[1];
-  chkrc(initialiseIndexBound(rng, ib) == 0);
+  my_record low_key;
+  my_record high_key;
+  chkrc(initialiseIndexBound(rng, ib, &low_key, &high_key) == 0);
   chkrc(g_is->convert_range(range, g_ind_rec, &ib) == 0);
 
   // index stat query
@@ -1960,6 +2116,7 @@ runtest()
   chkrc(createindex() == 0);
   chkrc(createNdbRecords() == 0);
   chkrc(definestat() == 0);
+  chkrc(startlistener() == 0);
 
   for (g_loop = 0; g_opts.loops == 0 || g_loop < g_opts.loops; g_loop++) {
     ll0("=== loop " << g_loop << " ===");
@@ -1973,6 +2130,7 @@ runtest()
     makeranges();
     chkrc(scanranges() == 0);
     chkrc(updatestat() == 0);
+    chkrc(runlistener() == 0);
     chkrc(readstat() == 0);
     chkrc(queryranges() == 0);
     loopstats();
@@ -1980,6 +2138,7 @@ runtest()
   }
   finalstats();
 
+  chkrc(stoplistener() == 0);
   if (!g_opts.keeptable)
     chkrc(droptable() == 0);
   freeranges();
@@ -2113,22 +2272,66 @@ docreate_stat_tables()
 {
   if (g_is->check_systables(g_ndb_sys) == 0)
     return 0;
+  ll1("check_systables: " << g_is->getNdbError());
 
-  if (g_is->create_systables(g_ndb_sys) == 0)
-  {
-    g_has_created_stat_tables = true;
-    return 0;
-  }
-  return -1;
+  ll0("create stat tables");
+  chkdb(g_is->create_systables(g_ndb_sys) == 0);
+  g_has_created_stat_tables = true;
+  return 0;
 }
 
 static
-void
+int
 dodrop_stat_tables()
 {
   if (g_has_created_stat_tables == false)
-    return;
-  g_is->drop_systables(g_ndb_sys);
+    return 0;
+
+  ll0("drop stat tables");
+  chkdb(g_is->drop_systables(g_ndb_sys) == 0);
+  return 0;
+}
+
+static int
+docreate_stat_events()
+{
+  if (g_is->check_sysevents(g_ndb_sys) == 0)
+    return 0;
+  ll1("check_sysevents: " << g_is->getNdbError());
+
+  ll0("create stat events");
+  chkdb(g_is->create_sysevents(g_ndb_sys) == 0);
+  g_has_created_stat_events = true;
+  return 0;
+}
+
+static int
+dodrop_stat_events()
+{
+  if (g_has_created_stat_events == false)
+    return 0;
+
+  ll0("drop stat events");
+  chkdb(g_is->drop_sysevents(g_ndb_sys) == 0);
+  return 0;
+}
+
+static int
+docreate_sys_objects()
+{
+  require(g_is != 0 && g_ndb_sys != 0);
+  chkrc(docreate_stat_tables() == 0);
+  chkrc(docreate_stat_events() == 0);
+  return 0;
+}
+
+static int
+dodrop_sys_objects()
+{
+  require(g_is != 0 && g_ndb_sys != 0);
+  chkrc(dodrop_stat_events() == 0);
+  chkrc(dodrop_stat_tables() == 0);
+  return 0;
 }
 
 int
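The new docreate_sys_objects()/dodrop_sys_objects() pair keeps setup
and teardown symmetric: creation goes tables first, then events, and
the drop runs in the reverse order (a safe default if the events are
defined on the tables). dodrop_stat_tables() also changes from void to
int so chkdb()/chkrc() can propagate failures. A standalone sketch of
the pattern, all names illustrative:

    // Create in dependency order, drop in reverse; the flags remember
    // what this process created so teardown is otherwise a no-op.
    static bool g_created_tables = false;
    static bool g_created_events = false;

    static int create_tables() { g_created_tables = true;  return 0; }
    static int create_events() { g_created_events = true;  return 0; }
    static int drop_events()   { g_created_events = false; return 0; }
    static int drop_tables()   { g_created_tables = false; return 0; }

    static int create_sys_objects() {
      if (create_tables() != 0) return -1;
      if (create_events() != 0) return -1;
      return 0;
    }

    static int drop_sys_objects() {
      if (g_created_events && drop_events() != 0) return -1; // reverse order
      if (g_created_tables && drop_tables() != 0) return -1;
      return 0;
    }

    int main() {
      if (create_sys_objects() != 0) return 1;
      return drop_sys_objects() == 0 ? 0 : 1;
    }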
@@ -2156,17 +2359,22 @@ main(int argc, char** argv)
     ll0("connect failed");
     return NDBT_ProgramExit(NDBT_FAILED);
   }
-  if (docreate_stat_tables() == -1){
-    ll0("failed to create stat tables");
-    return NDBT_ProgramExit(NDBT_FAILED);
+  if (docreate_sys_objects() == -1) {
+    ll0("failed to check or create stat tables and events");
+    goto failed;
   }
   if (runtest() == -1) {
     ll0("test failed");
-    dodrop_stat_tables();
-    dodisconnect();
-    return NDBT_ProgramExit(NDBT_FAILED);
+    goto failed;
+  }
+  if (dodrop_sys_objects() == -1) {
+    ll0("failed to drop created stat tables or events");
+    goto failed;
   }
-  dodrop_stat_tables();
   dodisconnect();
   return NDBT_ProgramExit(NDBT_OK);
+failed:
+  (void)dodrop_sys_objects();
+  dodisconnect();
+  return NDBT_ProgramExit(NDBT_FAILED);
 }
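main() now funnels every failure through a single failed: label, which
drops any created sys objects best-effort (the (void) cast deliberately
ignores the result), disconnects, and returns NDBT_FAILED once. The
shape in isolation, with fake helpers:

    #include <cstdio>

    static int setup()          { return 0;  }
    static int work()           { return -1; }  // simulate a failing test
    static int cleanup()        { std::puts("cleanup"); return 0; }
    static void do_disconnect() { std::puts("disconnect"); }

    int main() {
      if (setup() == -1) goto failed;
      if (work() == -1)  goto failed;
      if (cleanup() == -1) goto failed;
      do_disconnect();
      return 0;
    failed:
      (void)cleanup();   // best effort; a failure here is ignored
      do_disconnect();
      return 1;
    }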

=== modified file 'storage/ndb/tools/ndb_index_stat.cpp'
--- a/storage/ndb/tools/ndb_index_stat.cpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/tools/ndb_index_stat.cpp	2011-08-17 10:36:01 +0000
@@ -35,6 +35,8 @@ static my_bool _sys_create = false;
 static my_bool _sys_create_if_not_exist = false;
 static my_bool _sys_create_if_not_valid = false;
 static my_bool _sys_check = false;
+static my_bool _sys_skip_tables = false;
+static my_bool _sys_skip_events = false;
 static int _sys_any = 0;
 // other
 static my_bool _verbose = false;
@@ -88,11 +90,13 @@ doconnect()
     CHK2(g_ncc->connect(6, 5) == 0, getNdbError(g_ncc));
     CHK2(g_ncc->wait_until_ready(30, 10) == 0, getNdbError(g_ncc));
 
-    g_ndb = new Ndb(g_ncc, _dbname);
-    CHK2(g_ndb->init() == 0, g_ndb->getNdbError());
-    CHK2(g_ndb->waitUntilReady(30) == 0, g_ndb->getNdbError());
-
-    g_dic = g_ndb->getDictionary();
+    if (!_sys_any)
+    {
+      g_ndb = new Ndb(g_ncc, _dbname);
+      CHK2(g_ndb->init() == 0, g_ndb->getNdbError());
+      CHK2(g_ndb->waitUntilReady(30) == 0, g_ndb->getNdbError());
+      g_dic = g_ndb->getDictionary();
+    }
 
     g_ndb_sys = new Ndb(g_ncc, NDB_INDEX_STAT_DB);
     CHK2(g_ndb_sys->init() == 0, g_ndb_sys->getNdbError());
@@ -112,7 +116,7 @@ dodisconnect()
   delete g_ndb_sys;
   delete g_ndb;
   delete g_ncc;
-    g_info << "disconnected" << endl;
+  g_info << "disconnected" << endl;
 }
 
 static const char*
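doconnect() in the tool now creates the per-database Ndb handle (g_ndb)
only when the invocation is not sys-only; a run that merely manages the
stat tables and events needs just g_ndb_sys. A sketch of the same
conditional acquisition, with a stand-in for the Ndb class:

    #include <cstdio>
    #include <memory>

    struct ndb_handle { const char* db; };   // stand-in for class Ndb

    static int connect_shape(bool sys_only) {
      std::unique_ptr<ndb_handle> ndb;       // like g_ndb
      if (!sys_only)
        ndb.reset(new ndb_handle{"some_db"});
      // The sys handle is created unconditionally, like g_ndb_sys.
      std::unique_ptr<ndb_handle> ndb_sys(new ndb_handle{"sys_db"});
      std::printf("ndb=%s ndb_sys=%s\n",
                  ndb ? ndb->db : "(none)", ndb_sys->db);
      return 0;
    }

    int main() { return connect_shape(/*sys_only=*/true); }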
@@ -380,69 +384,146 @@ dosys()
   {
     if (_sys_drop)
     {
-      g_info << "dropping any sys tables" << endl;
-      CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError());
-      CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success");
-      CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
-           "unexpected error: " << g_is->getNdbError());
+      if (!_sys_skip_events)
+      {
+        g_info << "dropping sys events" << endl;
+        CHK2(g_is->drop_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+        CHK2(g_is->check_sysevents(g_ndb_sys) == -1, "unexpected success");
+        CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysEvents,
+             "unexpected error: " << g_is->getNdbError());
+      }
+
+      if (!_sys_skip_tables)
+      {
+        g_info << "dropping all sys tables" << endl;
+        CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError());
+        CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success");
+        CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
+             "unexpected error: " << g_is->getNdbError());
+      }
       g_info << "drop done" << endl;
     }
 
     if (_sys_create)
     {
-      g_info << "creating all sys tables" << endl;
-      CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
-      CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
-      g_info << "create done" << endl;
-    }
-
-    if (_sys_create_if_not_exist)
-    {
-      if (g_is->check_systables(g_ndb_sys) == -1)
+      if (!_sys_skip_tables)
       {
-        CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
-             g_is->getNdbError());
         g_info << "creating all sys tables" << endl;
         CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
         CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
+      }
+
+      if (!_sys_skip_events)
+      {
+        g_info << "creating sys events" << endl;
+        CHK2(g_is->create_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+        CHK2(g_is->check_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
         g_info << "create done" << endl;
       }
-      else
+    }
+
+    if (_sys_create_if_not_exist)
+    {
+      if (!_sys_skip_tables)
       {
-        g_info << "using existing sys tables" << endl;
+        if (g_is->check_systables(g_ndb_sys) == -1)
+        {
+          CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
+               g_is->getNdbError());
+          g_info << "creating all sys tables" << endl;
+          CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
+          CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
+          g_info << "create done" << endl;
+        }
+        else
+        {
+          g_info << "using existing sys tables" << endl;
+        }
+      }
+
+      if (!_sys_skip_events)
+      {
+        if (g_is->check_sysevents(g_ndb_sys) == -1)
+        {
+          CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysEvents,
+               g_is->getNdbError());
+          g_info << "creating sys events" << endl;
+          CHK2(g_is->create_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+          g_info << "create done" << endl;
+        }
+        else
+        {
+          g_info << "using existing sys events" << endl;
+        }
       }
     }
 
     if (_sys_create_if_not_valid)
     {
-      if (g_is->check_systables(g_ndb_sys) == -1)
+      if (!_sys_skip_tables)
       {
-        if (g_is->getNdbError().code != NdbIndexStat::NoSysTables)
+        if (g_is->check_systables(g_ndb_sys) == -1)
         {
-          CHK2(g_is->getNdbError().code == NdbIndexStat::BadSysTables,
-               g_is->getNdbError());
-          g_info << "dropping invalid sys tables" << endl;
-          CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError());
-          CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success");
-          CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
-               "unexpected error: " << g_is->getNdbError());
-          g_info << "drop done" << endl;
+          if (g_is->getNdbError().code != NdbIndexStat::NoSysTables)
+          {
+            CHK2(g_is->getNdbError().code == NdbIndexStat::BadSysTables,
+                 g_is->getNdbError());
+            g_info << "dropping invalid sys tables" << endl;
+            CHK2(g_is->drop_systables(g_ndb_sys) == 0, g_is->getNdbError());
+            CHK2(g_is->check_systables(g_ndb_sys) == -1, "unexpected success");
+            CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysTables,
+                 "unexpected error: " << g_is->getNdbError());
+            g_info << "drop done" << endl;
+          }
+          g_info << "creating all sys tables" << endl;
+          CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
+          CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
+          g_info << "create done" << endl;
+        }
+        else
+        {
+          g_info << "using existing sys tables" << endl;
         }
-        g_info << "creating all sys tables" << endl;
-        CHK2(g_is->create_systables(g_ndb_sys) == 0, g_is->getNdbError());
-        CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
-        g_info << "create done" << endl;
       }
-      else
+      if (!_sys_skip_events)
       {
-        g_info << "using existing sys tables" << endl;
+        if (g_is->check_sysevents(g_ndb_sys) == -1)
+        {
+          if (g_is->getNdbError().code != NdbIndexStat::NoSysEvents)
+          {
+            CHK2(g_is->getNdbError().code == NdbIndexStat::BadSysEvents,
+                 g_is->getNdbError());
+            g_info << "dropping invalid sys events" << endl;
+            CHK2(g_is->drop_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+            CHK2(g_is->check_sysevents(g_ndb_sys) == -1, "unexpected success");
+            CHK2(g_is->getNdbError().code == NdbIndexStat::NoSysEvents,
+                 "unexpected error: " << g_is->getNdbError());
+            g_info << "drop done" << endl;
+          }
+          g_info << "creating sys events" << endl;
+          CHK2(g_is->create_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+          CHK2(g_is->check_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+          g_info << "create done" << endl;
+        }
+        else
+        {
+          g_info << "using existing sys events" << endl;
+        }
       }
     }
 
     if (_sys_check)
     {
-      CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
-      g_info << "sys tables ok" << endl;
+      if (!_sys_skip_tables)
+      {
+        CHK2(g_is->check_systables(g_ndb_sys) == 0, g_is->getNdbError());
+        g_info << "sys tables ok" << endl;
+      }
+      if (!_sys_skip_events)
+      {
+        CHK2(g_is->check_sysevents(g_ndb_sys) == 0, g_is->getNdbError());
+        g_info << "sys events ok" << endl;
+      }
     }
   }
   while (0);
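Every branch of dosys() is now gated twice: by the selected verb
(--sys-drop, --sys-create, and so on) and by the new skip flags, so
tables and events can be managed independently. The control flow keeps
the existing CHK2 / do-while(0) idiom, where a failed check logs and
breaks out of the surrounding block. A condensed imitation
(CHECK_OR_BREAK is made up; the real CHK2 lives elsewhere in the NDB
tool sources):

    #include <cstdio>

    // On failure: record, log, and break out of the enclosing
    // do { ... } while (0).
    #define CHECK_OR_BREAK(cond, msg) \
      { if (!(cond)) { std::fprintf(stderr, "error: %s\n", msg); \
                       ret = -1; break; } }

    static int dosys_shape(bool skip_tables, bool skip_events,
                           bool tables_ok, bool events_ok) {
      int ret = 0;
      do {
        if (!skip_tables)
          CHECK_OR_BREAK(tables_ok, "check_systables failed");
        if (!skip_events)
          CHECK_OR_BREAK(events_ok, "check_sysevents failed");
      } while (0);
      return ret;
    }

    int main() { return dosys_shape(false, false, true, true); }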
@@ -512,25 +593,33 @@ my_long_options[] =
     GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
   // sys options
   { "sys-drop", ++oi,
-    "Drop any stats tables in NDB kernel (all stats is lost)",
+    "Drop any stats tables and events in NDB kernel (all stats is lost)",
     (uchar **)&_sys_drop, (uchar **)&_sys_drop, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
   { "sys-create", ++oi,
-    "Create stats tables in NDB kernel (must not exist)",
+    "Create stats tables and events in NDB kernel (must not exist)",
     (uchar **)&_sys_create, (uchar **)&_sys_create, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
   { "sys-create-if-not-exist", ++oi,
-    "Like --sys-create but do nothing if correct stats tables exist",
+    "Like --sys-create but do nothing if correct objects exist",
     (uchar **)&_sys_create_if_not_exist, (uchar **)&_sys_create_if_not_exist, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
   { "sys-create-if-not-valid", ++oi,
-    "Like --sys-create-if-not-exist but first drop any invalid tables",
+    "Like --sys-create-if-not-exist but first drop any invalid objects",
     (uchar **)&_sys_create_if_not_valid, (uchar **)&_sys_create_if_not_valid, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
   { "sys-check", ++oi,
-    "Check that correct stats tables exist in NDB kernel",
+    "Check that correct stats tables and events exist in NDB kernel",
     (uchar **)&_sys_check, (uchar **)&_sys_check, 0,
     GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "sys-skip-tables", ++oi,
+    "Do not apply sys options to tables",
+    (uchar **)&_sys_skip_tables, (uchar **)&_sys_skip_tables, 0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+  { "sys-skip-events", ++oi,
+    "Do not apply sys options to events",
+    (uchar **)&_sys_skip_events, (uchar **)&_sys_skip_events, 0,
+    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
   // other
   { "verbose", 'v',
     "Verbose messages",
@@ -579,7 +668,9 @@ checkopts(int argc, char** argv)
       (_sys_create_if_not_exist != 0) +
       (_sys_create_if_not_valid != 0) +
       (_sys_drop != 0) +
-      ( _sys_check != 0);
+      (_sys_check != 0) +
+      (_sys_skip_tables != 0) +
+      (_sys_skip_events != 0);
     if (!_sys_any)
     {
       if (_dbname == 0)
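checkopts() folds the two skip flags into _sys_any, the count of
sys-mode options given; any nonzero count switches the tool into sys
mode (which is also why doconnect() above may skip creating g_ndb).
The counting trick simply sums boolean tests as integers:

    #include <cstdio>

    int main() {
      bool sys_drop = false, sys_create = true, sys_check = true;
      bool sys_skip_tables = false, sys_skip_events = true;
      // Each (flag != 0) contributes 0 or 1 to the sum.
      int sys_any = (sys_drop != 0) + (sys_create != 0) +
                    (sys_check != 0) + (sys_skip_tables != 0) +
                    (sys_skip_events != 0);
      std::printf("sys options set: %d\n", sys_any);  // prints 3
      return sys_any != 0 ? 0 : 1;
    }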

No bundle (reason: useless for push emails).