List:Commits« Previous MessageNext Message »
From:Magnus Blåudd Date:November 8 2009 1:53pm
Subject:bzr commit into mysql-5.1-telco-7.0 branch (magnus.blaudd:3199)
View as plain text  
#At file:///home/msvensson/mysql/7.0/ based on revid:frazer@strippedlvb6c

 3199 Magnus Blåudd	2009-11-08 [merge]
      Merge ndbinfo backport

    removed:
      storage/ndb/include/ndbinfo.h
      storage/ndb/src/common/util/ndbinfo.c
      storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_helpers.cpp
      storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_tableids.h
      storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_tables.h
      storage/ndb/src/mgmsrv/mgm_ndbinfo.cpp
    added:
      mysql-test/suite/ndb/r/ndbinfo.result
      mysql-test/suite/ndb/t/ndbinfo.test
      sql/ha_ndbinfo.cc
      sql/ha_ndbinfo.h
      storage/ndb/src/kernel/vm/Ndbinfo.cpp
      storage/ndb/src/kernel/vm/Ndbinfo.hpp
      storage/ndb/src/kernel/vm/NdbinfoTables.cpp
      storage/ndb/src/ndbapi/NdbInfo.cpp
      storage/ndb/src/ndbapi/NdbInfo.hpp
      storage/ndb/src/ndbapi/NdbInfoRecAttr.hpp
      storage/ndb/src/ndbapi/NdbInfoScanOperation.cpp
      storage/ndb/src/ndbapi/NdbInfoScanOperation.hpp
      storage/ndb/test/ndbapi/testNdbinfo.cpp
      storage/ndb/tools/ndbinfo_sql.cpp
    modified:
      libmysqld/Makefile.am
      mysql-test/suite/ndb/r/ndb_basic.result
      sql/Makefile.am
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_binlog.h
      storage/ndb/CMakeLists.txt
      storage/ndb/include/Makefile.am
      storage/ndb/include/kernel/signaldata/DbinfoScan.hpp
      storage/ndb/include/kernel/signaldata/SignalData.hpp
      storage/ndb/include/mgmapi/mgmapi.h
      storage/ndb/include/util/BaseString.hpp
      storage/ndb/include/util/HashMap.hpp
      storage/ndb/include/util/Vector.hpp
      storage/ndb/src/common/util/CMakeLists.txt
      storage/ndb/src/common/util/Makefile.am
      storage/ndb/src/kernel/blocks/LocalProxy.cpp
      storage/ndb/src/kernel/blocks/LocalProxy.hpp
      storage/ndb/src/kernel/blocks/Makefile.am
      storage/ndb/src/kernel/blocks/backup/Backup.cpp
      storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp
      storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp
      storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
      storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
      storage/ndb/src/kernel/blocks/dbinfo/CMakeLists.txt
      storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp
      storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.hpp
      storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
      storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupDebug.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupPageMap.cpp
      storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp
      storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp
      storage/ndb/src/kernel/blocks/suma/Suma.cpp
      storage/ndb/src/kernel/blocks/trix/Trix.cpp
      storage/ndb/src/kernel/vm/CMakeLists.txt
      storage/ndb/src/kernel/vm/Makefile.am
      storage/ndb/src/kernel/vm/SimulatedBlock.cpp
      storage/ndb/src/kernel/vm/SimulatedBlock.hpp
      storage/ndb/src/mgmapi/mgmapi.cpp
      storage/ndb/src/mgmclient/CommandInterpreter.cpp
      storage/ndb/src/mgmsrv/Makefile.am
      storage/ndb/src/mgmsrv/MgmtSrvr.cpp
      storage/ndb/src/mgmsrv/MgmtSrvr.hpp
      storage/ndb/src/mgmsrv/Services.cpp
      storage/ndb/src/ndbapi/CMakeLists.txt
      storage/ndb/src/ndbapi/Makefile.am
      storage/ndb/test/ndbapi/CMakeLists.txt
      storage/ndb/test/ndbapi/Makefile.am
      storage/ndb/test/ndbapi/testMgm.cpp
      storage/ndb/tools/CMakeLists.txt
      storage/ndb/tools/Makefile.am
=== modified file 'libmysqld/Makefile.am'
--- a/libmysqld/Makefile.am	2009-09-30 18:51:17 +0000
+++ b/libmysqld/Makefile.am	2009-11-08 12:52:27 +0000
@@ -48,7 +48,7 @@ noinst_HEADERS =	embedded_priv.h emb_qca
 
 sqlsources = derror.cc field.cc field_conv.cc strfunc.cc filesort.cc \
 	     ha_ndbcluster.cc ha_ndbcluster_cond.cc \
-	ha_ndbcluster_connection.cc \
+	ha_ndbcluster_connection.cc ha_ndbinfo.cc \
 	ha_ndbcluster_binlog.cc ha_partition.cc \
 	handler.cc sql_handler.cc \
 	hostname.cc init.cc password.c \
@@ -120,6 +120,9 @@ ha_ndbcluster_binlog.o: ha_ndbcluster_bi
 ha_ndbcluster_connection.o: ha_ndbcluster_connection.cc
 		$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
 
+ha_ndbinfo.o: ha_ndbinfo.cc
+		$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
+
 # Until we can remove dependency on ha_ndbcluster.h
 handler.o:	handler.cc
 		$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<

=== modified file 'mysql-test/suite/ndb/r/ndb_basic.result'
--- a/mysql-test/suite/ndb/r/ndb_basic.result	2009-04-08 18:10:37 +0000
+++ b/mysql-test/suite/ndb/r/ndb_basic.result	2009-11-08 12:52:27 +0000
@@ -44,6 +44,11 @@ ndb_table_temporary	#
 ndb_use_copying_alter_table	#
 ndb_use_exact_count	#
 ndb_use_transactions	#
+ndbinfo_database	#
+ndbinfo_max_bytes	#
+ndbinfo_max_rows	#
+ndbinfo_show_hidden	#
+ndbinfo_table_prefix	#
 CREATE TABLE t1 (
 pk1 INT NOT NULL PRIMARY KEY,
 attr1 INT NOT NULL,

=== added file 'mysql-test/suite/ndb/r/ndbinfo.result'
--- a/mysql-test/suite/ndb/r/ndbinfo.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndbinfo.result	2009-11-08 12:52:27 +0000
@@ -0,0 +1,159 @@
+result_format: 2
+
+
+SELECT * FROM information_schema.plugins WHERE PLUGIN_NAME = 'ndbinfo';
+PLUGIN_NAME	PLUGIN_VERSION	PLUGIN_STATUS	PLUGIN_TYPE	PLUGIN_TYPE_VERSION	PLUGIN_LIBRARY	PLUGIN_LIBRARY_VERSION	PLUGIN_AUTHOR	PLUGIN_DESCRIPTION	PLUGIN_LICENSE
+ndbinfo	0.1	ACTIVE	STORAGE ENGINE	50139.0	NULL	NULL	Sun Microsystems Inc.	MySQL Cluster system information storage engine	GPL
+
+## Creation of temporary tables should not be supported by NDBINFO engine
+CREATE TEMPORARY TABLE `t1` (
+  `dummy` INT UNSIGNED
+) ENGINE=NDBINFO;
+ERROR HY000: Table storage engine 'ndbinfo' does not support the create option 'TEMPORARY'
+
+@have_ndbinfo:= COUNT(*)
+1
+USE ndbinfo;
+
+SHOW CREATE TABLE ndb$tables;
+Table	Create Table
+ndb$tables	CREATE TABLE `ndb$tables` (
+  `table_id` int(10) unsigned DEFAULT NULL,
+  `table_name` varchar(512) DEFAULT NULL,
+  `comment` varchar(512) DEFAULT NULL
+) ENGINE=NDBINFO DEFAULT CHARSET=latin1
+
+SELECT * FROM ndb$tables;
+table_id	table_name	comment
+0	tables	
+1	columns	
+2	memusage	
+3	logdestination	
+4	backup_records	
+5	backup_parameters	
+6	pools	
+7	test	
+8	trp_status	
+SELECT COUNT(*) FROM ndb$tables;
+COUNT(*)
+9
+SELECT * FROM ndb$tables WHERE table_id = 2;
+table_id	table_name	comment
+2	memusage	
+SELECT * FROM ndb$tables WHERE table_id > 5;
+table_id	table_name	comment
+6	pools	
+7	test	
+8	trp_status	
+SELECT * FROM ndb$tables WHERE table_name = 'LOGDESTINATION';
+table_id	table_name	comment
+3	logdestination	
+SELECT COUNT(*) FROM ndb$tables t1, ndb$tables t2 WHERE t1.table_id = t1.table_id;
+COUNT(*)
+81
+
+SELECT table_id, table_name, comment from ndb$tables
+  WHERE table_id > 2 AND table_id <= 5 ORDER BY table_id;
+table_id	table_name	comment
+3	logdestination	
+4	backup_records	
+5	backup_parameters	
+SELECT table_id FROM ndb$tables  WHERE table_id = 2 ORDER BY table_name;
+table_id
+2
+SELECT table_id, table_name FROM ndb$tables ORDER BY table_name;
+table_id	table_name
+5	backup_parameters
+4	backup_records
+1	columns
+3	logdestination
+2	memusage
+6	pools
+0	tables
+7	test
+8	trp_status
+
+SELECT table_id, column_id, column_name FROM ndb$columns LIMIT 7;
+table_id	column_id	column_name
+0	0	table_id
+0	1	table_name
+0	2	comment
+1	0	table_id
+1	1	column_id
+1	2	column_name
+1	3	column_type
+
+UPDATE ndb$tables SET table_id=2 WHERE table_id=3;
+ERROR HY000: Table 'ndb$tables' is read only
+
+UPDATE ndb$tables SET table_id=9 WHERE 1=0;
+ERROR HY000: Table 'ndb$tables' is read only
+
+UPDATE ndb$tables SET table_id=9 WHERE table_id > 1;
+ERROR HY000: Table 'ndb$tables' is read only
+
+DELETE FROM ndb$tables WHERE table_id=3;
+ERROR HY000: Table 'ndb$tables' is read only
+
+DELETE FROM ndb$tables WHERE 1=0;
+ERROR HY000: Table 'ndb$tables' is read only
+
+DELETE FROM ndb$tables WHERE table_id > 1;
+ERROR HY000: Table 'ndb$tables' is read only
+
+FLUSH TABLES;
+SELECT table_id FROM ndb$tables;
+table_id
+0
+1
+2
+3
+4
+5
+6
+7
+8
+
+TRUNCATE ndb$tables;
+ERROR HY000: Table 'ndb$tables' is read only
+
+## Variables and status
+SHOW GLOBAL STATUS LIKE 'ndbinfo\_%';
+Variable_name	Value
+SHOW GLOBAL VARIABLES LIKE 'ndbinfo\_%';
+Variable_name	Value
+ndbinfo_database	ndbinfo
+ndbinfo_max_bytes	0
+ndbinfo_max_rows	10
+ndbinfo_show_hidden	OFF
+ndbinfo_table_prefix	ndb$
+
+SELECT counter, HEX(counter2) FROM ndb$test LIMIT 10;
+counter	HEX(counter2)
+0	0
+1	100000000
+2	200000000
+3	300000000
+4	400000000
+5	500000000
+6	600000000
+7	700000000
+8	800000000
+9	900000000
+
+SHOW TABLES LIKE 'ndb$te%';
+Tables_in_ndbinfo (ndb$te%)
+set @@ndbinfo_show_hidden=TRUE;
+SHOW TABLES LIKE 'ndb$te%';
+Tables_in_ndbinfo (ndb$te%)
+ndb$test
+set @@ndbinfo_show_hidden=default;
+
+set @@ndbinfo_table_prefix="somethingelse";
+ERROR HY000: Variable 'ndbinfo_table_prefix' is a read only variable
+
+set @@ndbinfo_database="somethingelse";
+ERROR HY000: Variable 'ndbinfo_database' is a read only variable
+
+## Cleanup
+DROP DATABASE ndbinfo;

=== added file 'mysql-test/suite/ndb/t/ndbinfo.test'
--- a/mysql-test/suite/ndb/t/ndbinfo.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndbinfo.test	2009-11-08 12:52:27 +0000
@@ -0,0 +1,80 @@
+--result_format 2
+--source include/have_ndb.inc
+--source have_ndbinfo.inc
+
+SELECT * FROM information_schema.plugins WHERE PLUGIN_NAME = 'ndbinfo';
+
+## Creation of temporary tables should not be supported by NDBINFO engine
+--error ER_ILLEGAL_HA_CREATE_OPTION
+CREATE TEMPORARY TABLE `t1` (
+  `dummy` INT UNSIGNED
+) ENGINE=NDBINFO;
+
+# Run the ndbinfo.sql script that creates ndbinfo database, tables and views
+--exec $MYSQL < $NDBINFO_SQL 2>&1
+
+USE ndbinfo;
+
+SHOW CREATE TABLE ndb$tables;
+
+SELECT * FROM ndb$tables;
+SELECT COUNT(*) FROM ndb$tables;
+SELECT * FROM ndb$tables WHERE table_id = 2;
+SELECT * FROM ndb$tables WHERE table_id > 5;
+SELECT * FROM ndb$tables WHERE table_name = 'LOGDESTINATION';
+SELECT COUNT(*) FROM ndb$tables t1, ndb$tables t2 WHERE t1.table_id = t1.table_id;
+
+SELECT table_id, table_name, comment from ndb$tables
+  WHERE table_id > 2 AND table_id <= 5 ORDER BY table_id;
+SELECT table_id FROM ndb$tables  WHERE table_id = 2 ORDER BY table_name;
+SELECT table_id, table_name FROM ndb$tables ORDER BY table_name;
+
+SELECT table_id, column_id, column_name FROM ndb$columns LIMIT 7;
+
+--error ER_OPEN_AS_READONLY
+UPDATE ndb$tables SET table_id=2 WHERE table_id=3;
+
+--error ER_OPEN_AS_READONLY
+UPDATE ndb$tables SET table_id=9 WHERE 1=0;
+
+--error ER_OPEN_AS_READONLY
+UPDATE ndb$tables SET table_id=9 WHERE table_id > 1;
+
+--error ER_OPEN_AS_READONLY
+DELETE FROM ndb$tables WHERE table_id=3;
+
+--error ER_OPEN_AS_READONLY
+DELETE FROM ndb$tables WHERE 1=0;
+
+--error ER_OPEN_AS_READONLY
+DELETE FROM ndb$tables WHERE table_id > 1;
+
+FLUSH TABLES;
+SELECT table_id FROM ndb$tables;
+
+--error ER_OPEN_AS_READONLY
+TRUNCATE ndb$tables;
+
+## Variables and status
+SHOW GLOBAL STATUS LIKE 'ndbinfo\_%';
+SHOW GLOBAL VARIABLES LIKE 'ndbinfo\_%';
+
+SELECT counter, HEX(counter2) FROM ndb$test LIMIT 10;
+
+# All tables that contain data are hidden by default
+# and becomes visible with ndbinfo_show_hidden
+SHOW TABLES LIKE 'ndb$te%';
+set @@ndbinfo_show_hidden=TRUE;
+SHOW TABLES LIKE 'ndb$te%';
+set @@ndbinfo_show_hidden=default;
+
+# Check that ndbinfo_table_prefix is readonly
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+set @@ndbinfo_table_prefix="somethingelse";
+
+# Check that ndbinfo_database is readonly
+--error ER_INCORRECT_GLOBAL_LOCAL_VAR
+set @@ndbinfo_database="somethingelse";
+
+## Cleanup
+DROP DATABASE ndbinfo;

=== modified file 'sql/Makefile.am'
--- a/sql/Makefile.am	2009-10-01 05:56:05 +0000
+++ b/sql/Makefile.am	2009-11-08 12:52:27 +0000
@@ -61,7 +61,7 @@ noinst_HEADERS =	item.h item_func.h item
 			ha_ndbcluster.h ha_ndbcluster_cond.h \
 			ha_ndbcluster_binlog.h ha_ndbcluster_tables.h \
 			ha_ndbcluster_connection.h ha_ndbcluster_connection.h \
-			ha_ndbcluster_lock_ext.h \
+			ha_ndbcluster_lock_ext.h ha_ndbinfo.h \
 			ha_partition.h rpl_constants.h \
 			opt_range.h protocol.h rpl_tblmap.h rpl_utility.h \
 			rpl_reporting.h \
@@ -133,7 +133,8 @@ libndb_la_CPPFLAGS=	@ndbcluster_includes
 libndb_la_SOURCES=	ha_ndbcluster.cc \
 			ha_ndbcluster_binlog.cc \
 			ha_ndbcluster_connection.cc \
-			ha_ndbcluster_cond.cc
+			ha_ndbcluster_cond.cc \
+			ha_ndbinfo.cc
 
 gen_lex_hash_SOURCES =	gen_lex_hash.cc
 gen_lex_hash_LDFLAGS =  @NOINST_LDFLAGS@

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2009-10-23 19:20:27 +0000
+++ b/sql/ha_ndbcluster.cc	2009-11-08 12:52:27 +0000
@@ -55,14 +55,10 @@
 extern ulong opt_ndb_cache_check_time;
 
 // ndb interface initialization/cleanup
-#ifdef  __cplusplus
-extern "C" {
-#endif
-extern void ndb_init_internal();
-extern void ndb_end_internal();
-#ifdef  __cplusplus
-}
-#endif
+extern "C" void ndb_init_internal();
+extern "C" void ndb_end_internal();
+
+static const ha_rows DEFAULT_AUTO_PREFETCH= 32;
 
 const char *ndb_distribution_names[]= {"KEYHASH", "LINHASH", NullS};
 TYPELIB ndb_distribution_typelib= { array_elements(ndb_distribution_names)-1,
@@ -70,13 +66,6 @@ TYPELIB ndb_distribution_typelib= { arra
 const char *opt_ndb_distribution= ndb_distribution_names[ND_KEYHASH];
 enum ndb_distribution opt_ndb_distribution_id= ND_KEYHASH;
 
-/*
-  Provided for testing purposes to be able to run full test suite
-  with --ndbcluster option without getting warnings about cluster
-  not being connected
-*/
-my_bool ndbcluster_silent= 0;
-
 // Default value for parallelism
 static const int parallelism= 0;
 
@@ -5413,9 +5402,9 @@ void ha_ndbcluster::start_bulk_insert(ha
   {
     /* We don't know how many will be inserted, guess */
     m_rows_to_insert=
-      (m_autoincrement_prefetch > NDB_DEFAULT_AUTO_PREFETCH)
+      (m_autoincrement_prefetch > DEFAULT_AUTO_PREFETCH)
       ? m_autoincrement_prefetch
-      : NDB_DEFAULT_AUTO_PREFETCH;
+      : DEFAULT_AUTO_PREFETCH;
     m_autoincrement_prefetch= m_rows_to_insert;
   }
   else
@@ -8111,7 +8100,7 @@ ha_ndbcluster::ha_ndbcluster(handlerton 
   m_blobs_buffer(0),
   m_blobs_buffer_size(0),
   m_dupkey((uint) -1),
-  m_autoincrement_prefetch((ha_rows) NDB_DEFAULT_AUTO_PREFETCH),
+  m_autoincrement_prefetch(DEFAULT_AUTO_PREFETCH),
   m_cond(NULL),
   m_multi_cursor(NULL)
 {
@@ -8804,10 +8793,8 @@ err:
                              share->key, share->use_count));
     free_share(&share);
   }
-  /*
-    ndbcluster_silent - avoid "cluster disconnected error"
-  */
-  if (ndb_error.code && (!ndbcluster_silent || ndb_error.code != 4009))
+
+  if (ndb_error.code)
   {
     ERR_RETURN(ndb_error);
   }
@@ -8832,12 +8819,6 @@ int ndbcluster_table_exists_in_engine(ha
   NdbDictionary::Dictionary::List list;
   if (dict->listObjects(list, NdbDictionary::Object::UserTable) != 0)
   {
-    /*
-      ndbcluster_silent
-      - avoid "cluster failure" warning if cluster is not connected
-    */
-    if (ndbcluster_silent && dict->getNdbError().code == 4009)
-      DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
     ERR_RETURN(dict->getNdbError());
   }
   for (uint i= 0 ; i < list.count ; i++)
@@ -13492,6 +13473,14 @@ SHOW_VAR ndb_status_variables_export[]= 
 struct st_mysql_storage_engine ndbcluster_storage_engine=
 { MYSQL_HANDLERTON_INTERFACE_VERSION };
 
+
+#include "ha_ndbinfo.h"
+
+extern struct st_mysql_sys_var* ndbinfo_system_variables[];
+
+struct st_mysql_storage_engine ndbinfo_storage_engine=
+{ MYSQL_HANDLERTON_INTERFACE_VERSION };
+
 mysql_declare_plugin(ndbcluster)
 {
   MYSQL_STORAGE_ENGINE_PLUGIN,
@@ -13500,12 +13489,26 @@ mysql_declare_plugin(ndbcluster)
   "MySQL AB",
   "Clustered, fault-tolerant tables",
   PLUGIN_LICENSE_GPL,
-  ndbcluster_init, /* Plugin Init */
-  NULL, /* Plugin Deinit */
-  0x0100 /* 1.0 */,
+  ndbcluster_init,            /* plugin init */
+  NULL,                       /* plugin deinit */
+  0x0100,                     /* plugin version */
   ndb_status_variables_export,/* status variables                */
   NULL,                       /* system variables                */
   NULL                        /* config options                  */
+},
+{
+  MYSQL_STORAGE_ENGINE_PLUGIN,
+  &ndbinfo_storage_engine,
+  "ndbinfo",
+  "Sun Microsystems Inc.",
+  "MySQL Cluster system information storage engine",
+  PLUGIN_LICENSE_GPL,
+  ndbinfo_init,               /* plugin init */
+  ndbinfo_deinit,             /* plugin deinit */
+  0x0001,                     /* plugin version */
+  NULL,                       /* status variables */
+  ndbinfo_system_variables,   /* system variables */
+  NULL                        /* config options */
 }
 mysql_declare_plugin_end;
 

=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2009-10-07 17:38:23 +0000
+++ b/sql/ha_ndbcluster.h	2009-10-28 18:42:32 +0000
@@ -35,8 +35,6 @@
 #include <kernel/ndb_limits.h>
 
 #define NDB_HIDDEN_PRIMARY_KEY_LENGTH 8
-#define NDB_DEFAULT_AUTO_PREFETCH 32
-
 
 class Ndb;             // Forward declaration
 class NdbOperation;    // Forward declaration
@@ -51,12 +49,6 @@ class NdbInterpretedCode;
 class ha_ndbcluster_cond;
 class Ndb_event_data;
 
-// connectstring to cluster if given by mysqld
-extern const char *ndbcluster_connectstring;
-extern ulong ndb_cache_check_time;
-extern ulong ndb_report_thresh_binlog_epoch_slip;
-extern ulong ndb_report_thresh_binlog_mem_usage;
-
 typedef enum ndb_index_type {
   UNDEFINED_INDEX = 0,
   PRIMARY_KEY_INDEX = 1,

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	2009-10-23 19:20:27 +0000
+++ b/sql/ha_ndbcluster_binlog.cc	2009-10-28 18:58:25 +0000
@@ -43,8 +43,6 @@ extern my_bool opt_ndb_log_empty_epochs;
 extern my_bool opt_ndb_log_update_as_write;
 extern my_bool opt_ndb_log_updated_only;
 
-extern my_bool ndbcluster_silent;
-
 extern my_bool ndb_log_binlog_index;
 
 /*
@@ -704,21 +702,13 @@ static void ndbcluster_reset_slave(THD *
   char *end= strmov(buf, "DELETE FROM " NDB_REP_DB "." NDB_APPLY_TABLE);
   run_query(thd, buf, end, NULL, TRUE, FALSE);
   if (thd->main_da.is_error() &&
-      ((thd->main_da.sql_errno() == ER_NO_SUCH_TABLE) ||
-       (thd->main_da.sql_errno() == ER_OPEN_AS_READONLY && ndbcluster_silent)))
+      ((thd->main_da.sql_errno() == ER_NO_SUCH_TABLE)))
   {
     /*
       If table does not exist ignore the error as it
       is a consistant behavior
     */
     thd->main_da.reset_diagnostics_area();
-    /*
-      ndbcluster_silent
-      - avoid "no table mysql.ndb_apply_status" warning - ER_NO_SUCH_TABLE
-      - avoid "mysql.ndb_apply_status read only" warning - ER_OPEN_AS_READONLY
-    */
-    if (ndbcluster_silent)
-      mysql_reset_errors(thd, 1);
   }
 
   DBUG_VOID_RETURN;
@@ -872,11 +862,6 @@ static int ndbcluster_global_schema_lock
     DBUG_RETURN(0);
   }
 
-  /*
-    ndbcluster_silent - avoid "cluster disconnected error"
-  */
-  if (ndbcluster_silent)
-    report_cluster_disconnected= 0;
   if (ndb_error.code != 4009 || report_cluster_disconnected)
   {
     sql_print_warning("NDB: Could not acquire global schema lock (%d)%s",
@@ -5479,6 +5464,9 @@ enum Binlog_thread_state
   BCCC_restart= 2
 };
 
+extern ulong ndb_report_thresh_binlog_epoch_slip;
+extern ulong ndb_report_thresh_binlog_mem_usage;
+
 pthread_handler_t ndb_binlog_thread_func(void *arg)
 {
   THD *thd; /* needs to be first for thread_stack */

=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h	2009-09-21 14:23:46 +0000
+++ b/sql/ha_ndbcluster_binlog.h	2009-10-28 19:11:53 +0000
@@ -198,18 +198,11 @@ extern Ndb_cluster_connection* g_ndb_clu
 void ndbcluster_global_schema_lock_init();
 void ndbcluster_global_schema_lock_deinit();
 
-extern pthread_t ndb_binlog_thread;
-extern pthread_mutex_t injector_mutex;
-extern pthread_cond_t  injector_cond;
-
 extern unsigned char g_node_id_map[max_ndb_nodes];
-extern pthread_t ndb_util_thread;
 extern pthread_mutex_t LOCK_ndb_util_thread;
 extern pthread_cond_t COND_ndb_util_thread;
-extern int ndbcluster_util_inited;
 extern pthread_mutex_t ndbcluster_mutex;
 extern HASH ndbcluster_open_tables;
-extern long ndb_number_of_storage_nodes;
 
 /*
   Initialize the binlog part of the ndb handlerton

=== added file 'sql/ha_ndbinfo.cc'
--- a/sql/ha_ndbinfo.cc	1970-01-01 00:00:00 +0000
+++ b/sql/ha_ndbinfo.cc	2009-11-08 12:52:27 +0000
@@ -0,0 +1,518 @@
+/*
+   Copyright (C) 2009 Sun Microsystems Inc.
+   All rights reserved. Use is subject to license terms.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include "mysql_priv.h"
+#ifdef WITH_NDBCLUSTER_STORAGE_ENGINE
+#include "ha_ndbinfo.h"
+#include "../storage/ndb/src/ndbapi/NdbInfo.hpp"
+
+
+static MYSQL_THDVAR_UINT(
+  max_rows,                          /* name */
+  PLUGIN_VAR_RQCMDARG,
+  "Specify max number of rows to fetch per roundtrip to cluster",
+  NULL,                              /* check func. */
+  NULL,                              /* update func. */
+  10,                                /* default */
+  1,                                 /* min */
+  256,                               /* max */
+  0                                  /* block */
+);
+
+static MYSQL_THDVAR_UINT(
+  max_bytes,                         /* name */
+  PLUGIN_VAR_RQCMDARG,
+  "Specify approx. max number of bytes to fetch per roundtrip to cluster",
+  NULL,                              /* check func. */
+  NULL,                              /* update func. */
+  0,                                 /* default */
+  0,                                 /* min */
+  65535,                             /* max */
+  0                                  /* block */
+);
+
+static MYSQL_THDVAR_BOOL(
+  show_hidden,                       /* name */
+  PLUGIN_VAR_RQCMDARG,
+  "Control if tables should be visible or not",
+  NULL,                              /* check func. */
+  NULL,                              /* update func. */
+  FALSE                              /* default */
+);
+
+static char* ndbinfo_dbname = (char*)"ndbinfo";
+static MYSQL_SYSVAR_STR(
+  database,                         /* name */
+  ndbinfo_dbname,                   /* var */
+  PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
+  "Name of the database used by ndbinfo",
+  NULL,                             /* check func. */
+  NULL,                             /* update func. */
+  NULL                              /* default */
+);
+
+static char* table_prefix = (char*)"ndb$";
+static MYSQL_SYSVAR_STR(
+  table_prefix,                     /* name */
+  table_prefix,                     /* var */
+  PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
+  "Prefix to use for all virtual tables loaded from NDB",
+  NULL,                             /* check func. */
+  NULL,                             /* update func. */
+  NULL                              /* default */
+);
+
+
+
+static NdbInfo* g_ndbinfo;
+
+extern Ndb_cluster_connection* g_ndb_cluster_connection;
+
+static bool
+ndbcluster_is_disabled(void)
+{
+  /*
+    ndbinfo uses the same connection as ndbcluster
+    to avoid using up another nodeid, this also means that
+    if ndbcluster is not enabled, ndbinfo won't start
+  */
+  if (g_ndb_cluster_connection)
+    return false;
+  assert(g_ndbinfo == NULL);
+  return true;
+}
+
+static handler*
+create_handler(handlerton *hton, TABLE_SHARE *table, MEM_ROOT *mem_root)
+{
+  return new (mem_root) ha_ndbinfo(hton, table);
+}
+
+struct ha_ndbinfo_impl
+{
+  const NdbInfo::Table* m_table;
+  NdbInfoScanOperation* m_scan_op;
+  Vector<const NdbInfoRecAttr *> m_columns;
+
+  ha_ndbinfo_impl() :
+    m_table(NULL),
+    m_scan_op(NULL)
+  {
+  }
+};
+
+ha_ndbinfo::ha_ndbinfo(handlerton *hton, TABLE_SHARE *table_arg)
+: handler(hton, table_arg), m_impl(*new ha_ndbinfo_impl)
+{
+}
+
+ha_ndbinfo::~ha_ndbinfo()
+{
+  delete &m_impl;
+}
+
+static int err2mysql(int error)
+{
+  DBUG_ENTER("err2mysql");
+  DBUG_PRINT("enter", ("error: %d", error));
+  assert(error != 0);
+  switch(error)
+  {
+  case NdbInfo::ERR_ClusterFailure:
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+    break;
+  case NdbInfo::ERR_OutOfMemory:
+    DBUG_RETURN(HA_ERR_OUT_OF_MEM);
+    break;
+  default:
+    break;
+  }
+  push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                      ER_GET_ERRNO, ER(ER_GET_ERRNO), error);
+  DBUG_RETURN(HA_ERR_INTERNAL_ERROR);
+}
+
+int ha_ndbinfo::create(const char *name, TABLE *form,
+                       HA_CREATE_INFO *create_info)
+{
+  DBUG_ENTER("ha_ndbinfo::create");
+  DBUG_PRINT("enter", ("name: %s", name));
+
+  DBUG_RETURN(0);
+}
+
+bool ha_ndbinfo::is_open(void) const
+{
+  return m_impl.m_table != NULL;
+}
+
+int ha_ndbinfo::open(const char *name, int mode, uint test_if_locked)
+{
+  DBUG_ENTER("ha_ndbinfo::open");
+  DBUG_PRINT("enter", ("name: %s, mode: %d", name, mode));
+
+  assert(is_closed());
+
+  if (mode == O_RDWR)
+  {
+    if (table->db_stat & HA_TRY_READ_ONLY)
+    {
+      DBUG_PRINT("info", ("Telling server to use readonly mode"));
+      DBUG_RETURN(EROFS); // Read only fs
+    }
+    // Find any commands that does not allow open readonly
+    DBUG_ASSERT(false);
+  }
+
+  if (ndbcluster_is_disabled())
+  {
+    // Allow table to be opened with ndbcluster disabled
+    DBUG_RETURN(0);
+  }
+
+  /* Increase "ref_length" to allow a whole row to be stored in "ref" */
+  ref_length = 0;
+  for (uint i = 0; i < table->s->fields; i++)
+    ref_length += table->field[i]->pack_length();
+  DBUG_PRINT("info", ("ref_length: %u", ref_length));
+
+  int err = g_ndbinfo->openTable(name, &m_impl.m_table);
+  if (err)
+  {
+    if (err == NdbInfo::ERR_NoSuchTable)
+      DBUG_RETURN(HA_ERR_NO_SUCH_TABLE);
+    DBUG_RETURN(err2mysql(err));
+  }
+
+  DBUG_RETURN(0);
+}
+
+int ha_ndbinfo::close(void)
+{
+  DBUG_ENTER("ha_ndbinfo::close");
+
+  if (ndbcluster_is_disabled())
+    DBUG_RETURN(0);
+
+  assert(is_open());
+  if (m_impl.m_table)
+  {
+    g_ndbinfo->closeTable(m_impl.m_table);
+    m_impl.m_table = NULL;
+  }
+  DBUG_RETURN(0);
+}
+
+int ha_ndbinfo::rnd_init(bool scan)
+{
+  DBUG_ENTER("ha_ndbinfo::rnd_init");
+  DBUG_PRINT("info", ("scan: %d", scan));
+
+  if (ndbcluster_is_disabled())
+  {
+    push_warning(current_thd, MYSQL_ERROR::WARN_LEVEL_NOTE, 1,
+                 "'NDBINFO' has been started "
+                 "in limited mode since the 'NDBCLUSTER' "
+                 "engine is disabled - no rows can be returned");
+    DBUG_RETURN(0);
+  }
+
+  assert(is_open());
+  assert(m_impl.m_scan_op == NULL); // No scan already ongoing
+
+  if (!scan)
+  {
+    // Just an init to read using 'rnd_pos'
+    DBUG_PRINT("info", ("not scan"));
+    DBUG_RETURN(0);
+  }
+
+  THD* thd = current_thd;
+  int err;
+  NdbInfoScanOperation* scan_op = NULL;
+  if ((err = g_ndbinfo->createScanOperation(m_impl.m_table,
+                                            &scan_op,
+                                            THDVAR(thd, max_rows),
+                                            THDVAR(thd, max_bytes))) != 0)
+    DBUG_RETURN(err2mysql(err));
+
+  if ((err = scan_op->readTuples()) != 0)
+    DBUG_RETURN(err2mysql(err));
+
+  /* Read all columns specified in read_set */
+  TABLE_SHARE *table_share = table->s;
+  for (uint i = 0; i < table_share->fields; i++)
+  {
+    Field *field = table->field[i];
+    if (bitmap_is_set(table->read_set, i))
+      m_impl.m_columns.push_back(scan_op->getValue(field->field_name));
+    else
+      m_impl.m_columns.push_back(NULL);
+  }
+
+  if ((err = scan_op->execute()) != 0)
+    DBUG_RETURN(err2mysql(err));
+
+  m_impl.m_scan_op = scan_op;
+  DBUG_RETURN(0);
+}
+
+int ha_ndbinfo::rnd_end()
+{
+  DBUG_ENTER("ha_ndbinfo::rnd_end");
+
+  if (ndbcluster_is_disabled())
+    DBUG_RETURN(0);
+
+  assert(is_open());
+
+  if (m_impl.m_scan_op)
+  {
+    g_ndbinfo->releaseScanOperation(m_impl.m_scan_op);
+    m_impl.m_scan_op = NULL;
+  }
+  m_impl.m_columns.clear();
+
+  DBUG_RETURN(0);
+}
+
+int ha_ndbinfo::rnd_next(uchar *buf)
+{
+  int err;
+  DBUG_ENTER("ha_ndbinfo::rnd_next");
+
+  if (ndbcluster_is_disabled())
+    DBUG_RETURN(HA_ERR_END_OF_FILE);
+
+  assert(is_open());
+  assert(m_impl.m_scan_op);
+
+  if ((err = m_impl.m_scan_op->nextResult()) == 0)
+    DBUG_RETURN(HA_ERR_END_OF_FILE);
+
+  if (err != 1)
+    DBUG_RETURN(err2mysql(err));
+
+  unpack_record(buf);
+
+  DBUG_RETURN(0);
+}
+
+int ha_ndbinfo::rnd_pos(uchar *buf, uchar *pos)
+{
+  DBUG_ENTER("ha_ndbinfo::rnd_pos");
+  assert(is_open());
+  assert(m_impl.m_scan_op == NULL); // No scan started
+
+  /* Copy the saved row into "buf" and set all fields to not null */
+  memcpy(buf, pos, ref_length);
+  for (uint i = 0; i < table->s->fields; i++)
+    table->field[i]->set_notnull();
+
+  DBUG_RETURN(0);
+}
+
+void ha_ndbinfo::position(const uchar *record)
+{
+  DBUG_ENTER("ha_ndbinfo::position");
+  assert(is_open());
+  assert(m_impl.m_scan_op);
+
+  /* Save away the whole row in "ref" */
+  memcpy(ref, record, ref_length);
+
+  DBUG_VOID_RETURN;
+}
+
+int ha_ndbinfo::info(uint flag)
+{
+  DBUG_ENTER("ha_ndbinfo::info");
+  DBUG_PRINT("enter", ("flag: %d", flag));
+  DBUG_RETURN(0);
+}
+
+void
+ha_ndbinfo::unpack_record(uchar *dst_row)
+{
+  DBUG_ENTER("ha_ndbinfo::unpack_record");
+  my_ptrdiff_t dst_offset = dst_row - table->record[0];
+
+  TABLE_SHARE *table_share = table->s;
+  for (uint i = 0; i < table_share->fields; i++)
+  {
+    Field *field = table->field[i];
+    const NdbInfoRecAttr* record = m_impl.m_columns[i];
+    if (m_impl.m_columns[i])
+    {
+      field->set_notnull();
+      field->move_field_offset(dst_offset);
+      switch (field->type()) {
+
+      case (MYSQL_TYPE_VARCHAR):
+      {
+        DBUG_PRINT("info", ("str: %s", record->c_str()));
+        Field_varstring* vfield = (Field_varstring *) field;
+        /* Field_bit in DBUG requires the bit set in write_set for store(). */
+        my_bitmap_map *old_map =
+          dbug_tmp_use_all_columns(table, table->write_set);
+        (void)vfield->store(record->c_str(),
+                            min(record->length(), field->field_length)-1,
+                            field->charset());
+        dbug_tmp_restore_column_map(table->write_set, old_map);
+        break;
+      }
+
+      case (MYSQL_TYPE_LONG):
+      {
+        Uint32 val = record->u_32_value();
+        DBUG_PRINT("info", ("val: %d", val));
+        memcpy(field->ptr, &val, sizeof (Uint32));
+        break;
+      }
+
+      case (MYSQL_TYPE_LONGLONG):
+      {
+        Uint64 val = record->u_64_value();
+        DBUG_PRINT("info", ("val: %llu", val));
+        memcpy(field->ptr, &val, sizeof (Uint64));
+        break;
+      }
+
+      default:
+        sql_print_error("Found unexpected field type %u", field->type());
+        break;
+      }
+
+      field->move_field_offset(-dst_offset);
+    }
+    else
+    {
+      field->set_null();
+    }
+  }
+  DBUG_VOID_RETURN;
+}
+
+
+static int
+ndbinfo_find_files(handlerton *hton, THD *thd,
+                   const char *db, const char *path,
+                   const char *wild, bool dir, List<LEX_STRING> *files)
+{
+  DBUG_ENTER("ndbinfo_find_files");
+  DBUG_PRINT("enter", ("db: '%s', dir: %d", db, dir));
+
+  const bool show_hidden = THDVAR(thd, show_hidden);
+
+  if(show_hidden)
+    DBUG_RETURN(0); // Don't filter out anything
+
+  if (dir)
+    DBUG_RETURN(0); // Don't care about filtering databases
+
+  DBUG_ASSERT(db);
+  if (strcmp(db, ndbinfo_dbname))
+    DBUG_RETURN(0); // Only hide files in "our" db
+
+  /* Hide all files that start with "our" prefix */
+  LEX_STRING *file_name;
+  List_iterator<LEX_STRING> it(*files);
+  while ((file_name=it++))
+  {
+    if (is_prefix(file_name->str, table_prefix))
+    {
+      DBUG_PRINT("info", ("Hiding '%s'", file_name->str));
+      it.remove();
+    }
+  }
+
+  DBUG_RETURN(0);
+}
+
+
+handlerton* ndbinfo_hton;
+
+int ndbinfo_init(void *plugin)
+{
+  DBUG_ENTER("ndbinfo_init");
+
+  handlerton *hton = (handlerton *) plugin;
+  hton->create = create_handler;
+  hton->flags = HTON_TEMPORARY_NOT_SUPPORTED;
+  hton->find_files = ndbinfo_find_files;
+
+  ndbinfo_hton = hton;
+
+  if (ndbcluster_is_disabled())
+  {
+    // Starting in limited mode since ndbcluster is disabled
+     DBUG_RETURN(0);
+  }
+
+  char prefix[FN_REFLEN];
+  build_table_filename(prefix, sizeof(prefix) - 1,
+                       ndbinfo_dbname, table_prefix, "", 0);
+  DBUG_PRINT("info", ("prefix: '%s'", prefix));
+  assert(g_ndb_cluster_connection);
+  g_ndbinfo = new NdbInfo(g_ndb_cluster_connection, prefix,
+                          ndbinfo_dbname, table_prefix);
+  if (!g_ndbinfo)
+  {
+    sql_print_error("Failed to create NdbInfo");
+    DBUG_RETURN(1);
+  }
+
+  if (!g_ndbinfo->init())
+  {
+    sql_print_error("Failed to init NdbInfo");
+
+    delete g_ndbinfo;
+    g_ndbinfo = NULL;
+
+    DBUG_RETURN(1);
+  }
+
+  DBUG_RETURN(0);
+}
+
+int ndbinfo_deinit(void *plugin)
+{
+  DBUG_ENTER("ndbinfo_deinit");
+
+  if (g_ndbinfo)
+  {
+    delete g_ndbinfo;
+    g_ndbinfo = NULL;
+  }
+
+  DBUG_RETURN(0);
+}
+
+struct st_mysql_sys_var* ndbinfo_system_variables[]= {
+  MYSQL_SYSVAR(max_rows),
+  MYSQL_SYSVAR(max_bytes),
+  MYSQL_SYSVAR(show_hidden),
+  MYSQL_SYSVAR(database),
+  MYSQL_SYSVAR(table_prefix),
+
+  NULL
+};
+
+template class Vector<const NdbInfoRecAttr*>;
+
+#endif

=== added file 'sql/ha_ndbinfo.h'
--- a/sql/ha_ndbinfo.h	1970-01-01 00:00:00 +0000
+++ b/sql/ha_ndbinfo.h	2009-11-08 12:52:27 +0000
@@ -0,0 +1,73 @@
+/*
+   Copyright (C) 2009 Sun Microsystems Inc.
+   All rights reserved. Use is subject to license terms.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef HA_NDBINFO_H
+#define HA_NDBINFO_H
+
+#include <mysql/plugin.h>
+
+int ndbinfo_init(void *plugin);
+int ndbinfo_deinit(void *plugin);
+
+class ha_ndbinfo: public handler
+{
+public:
+  ha_ndbinfo(handlerton *hton, TABLE_SHARE *table_arg);
+  ~ha_ndbinfo();
+
+  const char *table_type() const { return "NDBINFO"; }
+  const char **bas_ext() const {
+    static const char *null[] = { NullS };
+    return null;
+  }
+  ulonglong table_flags() const {
+    return HA_REC_NOT_IN_SEQ | HA_NO_TRANSACTIONS;
+  }
+  ulong index_flags(uint inx, uint part, bool all_parts) const {
+    return 0;
+  }
+
+  int create(const char *name, TABLE *form,
+             HA_CREATE_INFO *create_info);
+
+  int open(const char *name, int mode, uint test_if_locked);
+  int close(void);
+
+  int rnd_init(bool scan);
+  int rnd_end();
+  int rnd_next(uchar *buf);
+  int rnd_pos(uchar *buf, uchar *pos);
+  void position(const uchar *record);
+  int info(uint);
+
+  THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
+                             enum thr_lock_type lock_type) {
+    return to;
+  }
+
+private:
+  void unpack_record(uchar *dst_row);
+
+  bool is_open(void) const;
+  bool is_closed(void) const { return ! is_open(); };
+
+  struct ha_ndbinfo_impl& m_impl;
+
+};
+
+#endif

=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt	2009-10-19 09:44:26 +0000
+++ b/storage/ndb/CMakeLists.txt	2009-11-08 12:52:27 +0000
@@ -43,6 +43,7 @@ SET(NDBCLUSTER_SOURCES
 	../../sql/ha_ndbcluster.cc
 	../../sql/ha_ndbcluster_cond.cc
 	../../sql/ha_ndbcluster_connection.cc
-	../../sql/ha_ndbcluster_binlog.cc)
+	../../sql/ha_ndbcluster_binlog.cc
+	../../sql/ha_ndbinfo.cc)
 SET(NDBCLUSTER_LIBS ndbclient)
 MYSQL_STORAGE_ENGINE(NDBCLUSTER)

=== modified file 'storage/ndb/include/Makefile.am'
--- a/storage/ndb/include/Makefile.am	2009-05-27 15:21:45 +0000
+++ b/storage/ndb/include/Makefile.am	2009-11-08 12:52:27 +0000
@@ -20,8 +20,7 @@ ndbinclude_HEADERS = \
 ndb_constants.h \
 ndb_init.h \
 ndb_types.h \
-ndb_version.h \
-ndbinfo.h
+ndb_version.h
 
 ndbapiinclude_HEADERS = \
 ndbapi/ndbapi_limits.h \

=== modified file 'storage/ndb/include/kernel/signaldata/DbinfoScan.hpp'
--- a/storage/ndb/include/kernel/signaldata/DbinfoScan.hpp	2008-11-10 11:44:02 +0000
+++ b/storage/ndb/include/kernel/signaldata/DbinfoScan.hpp	2009-11-08 12:52:27 +0000
@@ -1,4 +1,6 @@
-/* Copyright (C) 2007 MySQL AB
+/* 
+   Copyright (C) 2007 MySQL AB, 2009 Sun Microsystems, Inc.
+    All rights reserved. Use is subject to license terms.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -20,107 +22,62 @@
 
 struct DbinfoScanCursor
 {
-  Uint32 cur_requestInfo;
-  Uint32 cur_node;
-  Uint32 cur_block;
-  Uint32 cur_item;
+  Uint32 data[11];
 };
 
-/**
- * SENDER:  API,MGM
- * RECIVER: DBINFO
- */
-struct DbinfoScanReq
+struct DbinfoScan
 {
-  /* Reciver(s) */
-  friend class Dbinfo;
+  STATIC_CONST( SignalLength = 12 );
 
-  /* Sender(s) */
-
-  friend bool printDBINFO_SCANREQ(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiverBlockNo);
-
-  STATIC_CONST( SignalLength = 10 );
-  STATIC_CONST( SignalLengthWithCursor = 14 );
-//private:
-  Uint32 tableId;         // DBINFO table ID
-  Uint32 senderRef;       // API doing scan
-  Uint32 apiTxnId;        // ID unique to API.
-  Uint32 colBitmapLo;     // bitmap of what columns you want. (64bit)
-  Uint32 colBitmapHi;
-  Uint32 requestInfo;     // start, endofdata
-
-  STATIC_CONST( StartScan  = 0x1 );
-  STATIC_CONST( AllColumns = 0x2 );
-
-  Uint32 maxRows;
-  Uint32 maxBytes;
-  Uint32 rows_total;
-  Uint32 word_total;
-
-  union
-  {
-    Uint32 cursordata[1];
-    struct DbinfoScanCursor cursor;
-  };
-};
-
-/**
- * SENDER:  DBINFO
- * RECIVER: API,MGM
- */
-class DbinfoScanConf {
-  /* Reciver(s) */
-
-  /* Sender(s) */
-  friend class Dbinfo;
-
-  friend bool printDBINFO_SCANCONF(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiverBlockNo);
-
-public:
-  STATIC_CONST( SignalLength = 10 );
-  STATIC_CONST( SignalLengthWithCursor = 14 );
+  // API identifiers
+  Uint32 resultData;      // Will be returned in TransIdAI::connectPtr
+  Uint32 transId[2];      // ID unique to API
+  Uint32 resultRef;       // Where to send result rows
 
+  // Parameters for the scan
   Uint32 tableId;         // DBINFO table ID
-  Uint32 senderRef;       // API doing scan
-  Uint32 apiTxnId;        // ID unique to API.
-  Uint32 colBitmapLo;     // bitmap of what columns you want. (64bit)
-  Uint32 colBitmapHi;
-  Uint32 requestInfo;     // start, endofdata
-
-  STATIC_CONST( MoreData  = 0x1 );
-  STATIC_CONST( AllColumns = 0x2 );
-
-  Uint32 maxRows;
-  Uint32 maxBytes;
-  Uint32 rows_total;
-  Uint32 word_total;
+  Uint32 colBitmap[2];     // bitmap of what columns you want. (64bit)
+  Uint32 requestInfo;     // flags
+  Uint32 maxRows;         // Max number of rows to return per REQ
+  Uint32 maxBytes;        // Max number of bytes to return per REQ
+
+  // Result from the scan
+  Uint32 returnedRows;    // Number of rows returned for this CONF
+
+  // Cursor that contains data used by the kernel for keeping track
+  // of where it is, how many bytes or rows it has sent etc.
+  // Set to zero in last CONF to indicate that scan is finished
+  Uint32 cursor_sz;
+  // Cursor data of cursor_sz size follows
+  DbinfoScanCursor cursor;
+
+  static const Uint32* getCursorPtr(const DbinfoScan* sig) {
+    return sig->cursor.data;
+  }
+  static Uint32* getCursorPtrSend(DbinfoScan* sig) {
+    return sig->cursor.data;
+  }
 
-  union
-  {
-    Uint32 cursordata[1];
-    struct DbinfoScanCursor cursor;
-  };
 };
 
-/**
- * SENDER:  DBINFO
- * RECIVER: API,MGM
- */
-class DbinfoScanRef {
-  /* Reciver(s) */
-
-  /* Sender(s) */
-  friend class Dbinfo;
+typedef DbinfoScan DbinfoScanReq;
+typedef DbinfoScan DbinfoScanConf;
 
-  friend bool printDBINFO_SCANREF(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiverBlockNo);
+struct DbinfoScanRef
+{
+  STATIC_CONST( SignalLength = 5 );
 
-public:
-  STATIC_CONST( SignalLength = 3 );
+  // API identifiers
+  Uint32 resultData;      // Will be returned in TransIdAI::connectPtr
+  Uint32 transId[2];      // ID unique to API
+  Uint32 resultRef;       // Where to send result rows
 
-private:
-  Uint32 tableId;         // DBINFO table ID
-  Uint32 apiTxnId;        // ID unique to API.
   Uint32 errorCode;       // Error Code
+  enum ErrorCode
+  {
+    NoError = 0,
+    NoTable = 4800
+  };
 };
 
 #endif

=== modified file 'storage/ndb/include/kernel/signaldata/SignalData.hpp'
--- a/storage/ndb/include/kernel/signaldata/SignalData.hpp	2009-09-21 08:42:40 +0000
+++ b/storage/ndb/include/kernel/signaldata/SignalData.hpp	2009-11-08 12:52:27 +0000
@@ -281,4 +281,7 @@ GSN_PRINT_SIGNATURE(printAPI_VERSION_CON
 
 GSN_PRINT_SIGNATURE(printLOCAL_ROUTE_ORD);
 
+GSN_PRINT_SIGNATURE(printDBINFO_SCAN);
+GSN_PRINT_SIGNATURE(printDBINFO_SCAN_REF);
+
 #endif

=== modified file 'storage/ndb/include/mgmapi/mgmapi.h'
--- a/storage/ndb/include/mgmapi/mgmapi.h	2009-05-27 15:21:45 +0000
+++ b/storage/ndb/include/mgmapi/mgmapi.h	2009-11-08 12:52:27 +0000
@@ -1226,29 +1226,6 @@ extern "C" {
                           int len, char* str);
 
   /**
-   * Query NDB$INFO.
-   * On success, returns number of rows.
-   * ndb_mgm_ndbinfo_getrow() *MUST* be called that many times.
-   */
-  int ndb_mgm_ndbinfo(NdbMgmHandle handle, const char *query, int *rows);
-
-  /**
-   * Gets the column names for NDBINFO query
-   *
-   * Must be called after successful ndb_mgm_ndbinfo() but BEFORE
-   * ndb_mgm_getrows.
-   */
-  int ndb_mgm_ndbinfo_colcount(NdbMgmHandle h);
-  int ndb_mgm_ndbinfo_getcolums(NdbMgmHandle h, int n, int l, char** c);
-  char* ndb_mgm_ndbinfo_nextcolumn(char* row, int *len);
-
-  /**
-   * returns zero on success.
-   * Must be called after ndb_mgm_ndbinfo and after ndb_mgm_ndbinfo_getcolumns
-   */
-  int ndb_mgm_ndbinfo_getrow(NdbMgmHandle h, char* row, int len);
-
-  /**
    * Config iterator
    */
   typedef struct ndb_mgm_configuration_iterator ndb_mgm_configuration_iterator;

=== removed file 'storage/ndb/include/ndbinfo.h'
--- a/storage/ndb/include/ndbinfo.h	2008-11-10 11:44:02 +0000
+++ b/storage/ndb/include/ndbinfo.h	1970-01-01 00:00:00 +0000
@@ -1,134 +0,0 @@
-/* Copyright (C) 2007 MySQL AB
-
-   This program is free software; you can redistribute it and/or modify
-   it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; version 2 of the License.
-
-   This program is distributed in the hope that it will be useful,
-   but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-   GNU General Public License for more details.
-
-   You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
-
-#include <ndb_types.h>
-
-#ifndef __NDBINFO_H__
-#define __NDBINFO_H__
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-#define NDBINFO_TYPE_STRING 1
-#define NDBINFO_TYPE_NUMBER 2
-
-struct ndbinfo_column {
-  char name[50];
-  int coltype;
-};
-
-#define NDBINFO_CONSTANT_TABLE 0x1
-
-#define NDBINFO_TABLE_MEMBERS \
-  char name[50]; \
-  int ncols; \
-  int flags;
-
-struct ndbinfo_table {
-  NDBINFO_TABLE_MEMBERS
-  struct ndbinfo_column col[1];
-};
-
-#define DECLARE_NDBINFO_TABLE(var, num)         \
-struct ndbinfostruct##var {                     \
-  NDBINFO_TABLE_MEMBERS                         \
-  struct ndbinfo_column col[num];               \
-} var
-
-int ndbinfo_create_sql(struct ndbinfo_table *t, char* sql, int len);
-
-static inline const char* ndbinfo_coltype_to_string(int coltype)
-{
-  static const char* ndbinfo_type_string[]= {"NONE","VARCHAR(255)","BIGINT"};
-
-  if(coltype>3)
-    coltype= 0;
-
-  return ndbinfo_type_string[coltype];
-}
-
-struct dbinfo_row {
-  char *buf;
-  int   endrow;
-  int   blen;
-  int   c;
-};
-
-void dbinfo_write_row_init(struct dbinfo_row *r, char* buf, int len);
-
-int dbinfo_write_row_column(struct dbinfo_row *r, const char* col, int clen);
-
-int dbinfo_write_row_column_uint32(struct dbinfo_row *r, Uint32 value);
-
-/*
- * We need to call protected function of SimulatedBlock (sendSignal)
- * so easier to implement as macro...
- */
-#define dbinfo_send_row(signal, r, rl, apiTxnId, senderRef)             \
-  do {                                                                  \
-  TransIdAI *tidai= (TransIdAI*)signal->getDataPtrSend();               \
-  tidai->connectPtr= 0;                                                 \
-  tidai->transId[0]= apiTxnId;                                          \
-  tidai->transId[1]= 0;                                                 \
-  LinearSectionPtr ptr[3];                                              \
-  ptr[0].p= (Uint32*)r.buf;                                             \
-  ptr[0].sz= (Uint32)r.endrow;                                          \
-  rl.rows++;                                                            \
-  rl.bytes+=r.endrow;                                                   \
-  sendSignal(senderRef, GSN_DBINFO_TRANSID_AI, signal, 3, JBB, ptr, 1); \
-} while (0)
-
-#define dbinfo_ratelimit_sendconf(signal, req, rl, itemnumber)          \
-  do {                                                                  \
-  DbinfoScanConf *conf= (DbinfoScanConf*)signal->getDataPtrSend();      \
-  conf->tableId= (req).tableId;                                         \
-  conf->senderRef= (req).senderRef;                                     \
-  conf->apiTxnId= (req).apiTxnId;                                       \
-  conf->colBitmapLo= (req).colBitmapLo;                                 \
-  conf->colBitmapHi= (req).colBitmapHi;                                 \
-  conf->requestInfo= (req).requestInfo | DbinfoScanConf::MoreData;      \
-  conf->cursor.cur_requestInfo= 0;                                      \
-  conf->cursor.cur_node= getOwnNodeId();                                \
-  conf->cursor.cur_block= number();                                     \
-  conf->cursor.cur_item= (itemnumber);                                  \
-  conf->maxRows= (rl).maxRows;                                          \
-  conf->maxBytes= (rl).maxBytes;                                        \
-  conf->rows_total= (rl).rows_total + (rl).rows;                        \
-  conf->word_total= (rl).bytes_total+ (rl).bytes;                       \
-  sendSignal((req).senderRef, GSN_DBINFO_SCANCONF, signal,              \
-             DbinfoScanConf::SignalLengthWithCursor, JBB);              \
-} while (0)
-
-
-struct dbinfo_ratelimit {
-  Uint32 maxRows;
-  Uint32 maxBytes;
-  Uint32 rows_total;
-  Uint32 bytes_total;
-  Uint32 rows;
-  Uint32 bytes;
-};
-
-struct DbinfoScanReq;
-void dbinfo_ratelimit_init(struct dbinfo_ratelimit *rl, struct DbinfoScanReq *);
-
-int dbinfo_ratelimit_continue(struct dbinfo_ratelimit *rl);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif

=== modified file 'storage/ndb/include/util/BaseString.hpp'
--- a/storage/ndb/include/util/BaseString.hpp	2009-08-26 09:01:17 +0000
+++ b/storage/ndb/include/util/BaseString.hpp	2009-11-08 12:52:27 +0000
@@ -20,7 +20,7 @@
 #define __UTIL_BASESTRING_HPP_INCLUDED__
 
 #include <ndb_global.h>
-#include <Vector.hpp>
+#include <util/Vector.hpp>
 #include "Bitmask.hpp"
 
 /**

=== modified file 'storage/ndb/include/util/HashMap.hpp'
--- a/storage/ndb/include/util/HashMap.hpp	2009-05-13 09:06:43 +0000
+++ b/storage/ndb/include/util/HashMap.hpp	2009-11-08 12:52:27 +0000
@@ -150,10 +150,27 @@ public:
     return true;
   }
 
+  bool remove(size_t i) {
+    Entry* entry = (Entry*)my_hash_element(&m_hash, i);
+    if (entry == NULL)
+      return false;
+
+    if (my_hash_delete(&m_hash, (uchar*)entry))
+      return false;
+    return true;
+  }
+
   size_t entries(void) const {
     return m_hash.records;
   }
 
+  T* value(size_t i) const {
+    Entry* entry = (Entry*)my_hash_element((HASH*)&m_hash, i);
+    if (entry == NULL)
+      return NULL;
+    return &(entry->m_value);
+  }
+
 };
 
 #endif

=== modified file 'storage/ndb/include/util/Vector.hpp'
--- a/storage/ndb/include/util/Vector.hpp	2009-05-27 15:21:45 +0000
+++ b/storage/ndb/include/util/Vector.hpp	2009-11-08 12:52:27 +0000
@@ -20,7 +20,7 @@
 #define NDB_VECTOR_HPP
 
 #include <ndb_global.h>
-#include <NdbMutex.h>
+#include <portlib/NdbMutex.h>
 
 template<class T>
 class Vector {

=== modified file 'storage/ndb/src/common/util/CMakeLists.txt'
--- a/storage/ndb/src/common/util/CMakeLists.txt	2009-09-23 02:13:25 +0000
+++ b/storage/ndb/src/common/util/CMakeLists.txt	2009-11-08 12:52:27 +0000
@@ -53,7 +53,6 @@ ADD_LIBRARY(ndbgeneral STATIC
             ndb_opts.c
             basestring_vsnprintf.c
             Bitmask.cpp
-	    ndbinfo.c
 )
 TARGET_LINK_LIBRARIES(ndbgeneral zlib mysys ws2_32)
 

=== modified file 'storage/ndb/src/common/util/Makefile.am'
--- a/storage/ndb/src/common/util/Makefile.am	2009-06-03 16:04:23 +0000
+++ b/storage/ndb/src/common/util/Makefile.am	2009-11-08 12:52:27 +0000
@@ -28,8 +28,7 @@ libgeneral_la_SOURCES = \
             strdup.c \
             ConfigValues.cpp ndb_init.cpp basestring_vsnprintf.c \
             Bitmask.cpp \
-	    ndb_rand.c \
-	    ndbinfo.c
+	    ndb_rand.c
 
 INCLUDES_LOC = @ZLIB_INCLUDES@
 

=== removed file 'storage/ndb/src/common/util/ndbinfo.c'
--- a/storage/ndb/src/common/util/ndbinfo.c	2008-10-08 06:32:05 +0000
+++ b/storage/ndb/src/common/util/ndbinfo.c	1970-01-01 00:00:00 +0000
@@ -1,54 +0,0 @@
-/* Copyright (C) 2007 MySQL AB
-
-   This program is free software; you can redistribute it and/or modify
-   it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; version 2 of the License.
-
-   This program is distributed in the hope that it will be useful,
-   but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-   GNU General Public License for more details.
-
-   You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
-
-#include <stdio.h>
-#include <string.h>
-#include <errno.h>
-
-#include <ndb_global.h>
-#include <my_base.h>
-#include <m_string.h> /* for my_snprintf */
-
-#include <ndbinfo.h>
-
-int ndbinfo_create_sql(struct ndbinfo_table *t, char* sql, int len)
-{
-  int i;
-
-  my_snprintf(sql,len,"CREATE TABLE `%s` (", t->name);
-
-  len-=strlen(sql);
-  sql+=strlen(sql);
-  if(len<0)
-    return ENOMEM;
-
-  for(i=0;i<t->ncols;i++)
-  {
-    my_snprintf(sql,len,"\n\t`%s` %s,",
-             t->col[i].name, ndbinfo_coltype_to_string(t->col[i].coltype));
-    len-=strlen(sql);
-    sql+=strlen(sql);
-    if(len<0)
-      return ENOMEM;
-  }
-  *(--sql)='\0';
-  my_snprintf(sql,len,"\n) ENGINE=NDBINFO;");
-  len-=strlen(sql);
-  sql+=strlen(sql);
-  if(len<0)
-    return ENOMEM;
-
-  return 0;
-}

=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.cpp	2009-05-12 18:40:56 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.cpp	2009-11-08 12:52:27 +0000
@@ -83,6 +83,10 @@ LocalProxy::LocalProxy(BlockNumber block
   addRecSignal(GSN_DROP_TRIG_IMPL_REQ, &LocalProxy::execDROP_TRIG_IMPL_REQ);
   addRecSignal(GSN_DROP_TRIG_IMPL_CONF, &LocalProxy::execDROP_TRIG_IMPL_CONF);
   addRecSignal(GSN_DROP_TRIG_IMPL_REF, &LocalProxy::execDROP_TRIG_IMPL_REF);
+
+  // GSN_DBINFO_SCANREQ
+  addRecSignal(GSN_DBINFO_SCANREQ, &LocalProxy::execDBINFO_SCANREQ);
+  addRecSignal(GSN_DBINFO_SCANCONF, &LocalProxy::execDBINFO_SCANCONF);
 }
 
 LocalProxy::~LocalProxy()
@@ -971,4 +975,143 @@ LocalProxy::sendDROP_TRIG_IMPL_CONF(Sign
   ssRelease<Ss_DROP_TRIG_IMPL_REQ>(ssId);
 }
 
+// GSN_DBINFO_SCANREQ
+
+bool
+LocalProxy::find_next(Ndbinfo::ScanCursor* cursor) const
+{
+  jam();
+  const Uint32 node = refToNode(cursor->currRef);
+  const Uint32 block = refToMain(cursor->currRef);
+  Uint32 instance = refToInstance(cursor->currRef);
+
+  ndbrequire(node == getOwnNodeId());
+  ndbrequire(block == number());
+
+  if (instance++ < c_workers)
+  {
+    cursor->currRef = numberToRef(block, instance, node);
+    return true;
+  }
+
+  cursor->currRef = numberToRef(block, node);
+  return false;
+}
+
+
+
+void
+LocalProxy::execDBINFO_SCANREQ(Signal* signal)
+{
+  jamEntry();
+  const DbinfoScanReq* req = (const DbinfoScanReq*) signal->getDataPtr();
+  Uint32 signal_length = signal->getLength();
+  ndbrequire(signal_length == DbinfoScanReq::SignalLength+req->cursor_sz);
+
+  Ndbinfo::ScanCursor* cursor =
+    (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(req);
+
+  if (Ndbinfo::ScanCursor::getHasMoreData(cursor->flags) &&
+      cursor->saveCurrRef)
+  {
+    /* Continue in the saved block ref */
+    cursor->currRef = cursor->saveCurrRef;
+    cursor->saveCurrRef = 0;
+
+    // Set this block as sender and remember original sender
+    cursor->saveSenderRef = cursor->senderRef;
+    cursor->senderRef = reference();
+
+    sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
+               signal, signal_length, JBB);
+    return;
+  }
+
+  Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, false);
+
+  if (find_next(cursor))
+  {
+    jam();
+    ndbrequire(cursor->currRef);
+    ndbrequire(cursor->saveCurrRef == 0);
+
+    // Set this block as sender and remember original sender
+    cursor->saveSenderRef = cursor->senderRef;
+    cursor->senderRef = reference();
+
+    sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
+               signal, signal_length, JBB);
+    return;
+  }
+
+  /* Scan is done, send SCANCONF back to caller  */
+
+  /* Swap back saved senderRef */
+  const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
+  cursor->saveSenderRef = 0;
+
+  ndbrequire(cursor->currRef);
+  ndbrequire(cursor->saveCurrRef == 0);
+
+  ndbrequire(refToInstance(cursor->currRef) == 0);
+  sendSignal(cursor->senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
+  return;
+}
+
+void
+LocalProxy::execDBINFO_SCANCONF(Signal* signal)
+{
+  jamEntry();
+  const DbinfoScanConf* conf = (const DbinfoScanConf*)signal->getDataPtr();
+  Uint32 signal_length = signal->getLength();
+  ndbrequire(signal_length == DbinfoScanConf::SignalLength+conf->cursor_sz);
+
+  Ndbinfo::ScanCursor* cursor =
+    (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(conf);
+
+  if (Ndbinfo::ScanCursor::getHasMoreData(cursor->flags))
+  {
+    /* The underlying block want to continue */
+    jam();
+    
+    /* Swap back saved senderRef */
+    const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
+    cursor->saveSenderRef = 0;
+
+    /* Save currRef to continue with same instance again */
+    cursor->saveCurrRef = cursor->currRef;
+    cursor->currRef = reference();
+
+    sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
+    return;
+  } 
+  
+  /* The underlying block reported completed, find next if any */
+  if (find_next(cursor))
+  {
+    jam();
+
+    ndbrequire(cursor->senderRef == reference());
+    ndbrequire(cursor->saveSenderRef); // Should already be set
+
+    ndbrequire(cursor->saveCurrRef == 0);
+
+    sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
+               signal, signal_length, JBB);
+    return;
+  }
+
+  /* Scan in this block and its instances are completed */
+
+  /* Swap back saved senderRef */
+  const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
+  cursor->saveSenderRef = 0;
+
+  ndbrequire(cursor->currRef);
+  ndbrequire(cursor->saveCurrRef == 0);
+
+  sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
+  return;
+}
+
 BLOCK_FUNCTIONS(LocalProxy)

=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.hpp	2009-04-16 15:40:18 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.hpp	2009-11-08 12:52:27 +0000
@@ -28,6 +28,7 @@
 #include <signaldata/NFCompleteRep.hpp>
 #include <signaldata/CreateTrigImpl.hpp>
 #include <signaldata/DropTrigImpl.hpp>
+#include <signaldata/DbinfoScan.hpp>
 
 /*
  * Proxy blocks for MT LQH.
@@ -553,6 +554,11 @@ protected:
   void execDROP_TRIG_IMPL_CONF(Signal*);
   void execDROP_TRIG_IMPL_REF(Signal*);
   void sendDROP_TRIG_IMPL_CONF(Signal*, Uint32 ssId);
+
+  // GSN_DBINFO_SCANREQ
+  bool find_next(Ndbinfo::ScanCursor* cursor) const;
+  void execDBINFO_SCANREQ(Signal*);
+  void execDBINFO_SCANCONF(Signal*);
 };
 
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/Makefile.am'
--- a/storage/ndb/src/kernel/blocks/Makefile.am	2009-05-27 15:21:45 +0000
+++ b/storage/ndb/src/kernel/blocks/Makefile.am	2009-11-08 12:52:27 +0000
@@ -65,7 +65,6 @@ libblocks_a_SOURCES = tsman.cpp lgman.cp
   backup/BackupProxy.cpp \
   RestoreProxy.cpp \
   dbinfo/Dbinfo.cpp \
-  dbinfo/ndbinfo_helpers.cpp \
   dblqh/DblqhCommon.cpp \
   PgmanProxy.cpp \
   dbtup/DbtupClient.cpp

=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2009-11-04 16:49:02 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2009-11-08 12:53:35 +0000
@@ -63,8 +63,6 @@
 
 #include <signaldata/DbinfoScan.hpp>
 #include <signaldata/TransIdAI.hpp>
-#include <ndbinfo.h>
-#include <dbinfo/ndbinfo_tableids.h>
 
 #include <NdbTick.h>
 #include <dbtup/Dbtup.hpp>
@@ -667,53 +665,62 @@ void Backup::execDBINFO_SCANREQ(Signal *
 {
   jamEntry();
   DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
+  const Ndbinfo::ScanCursor* cursor =
+    (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(&req);
 
-  char buf[512];
-  struct dbinfo_ratelimit rl;
-  struct dbinfo_row r;
+  Ndbinfo::Ratelimit rl;
 
-  dbinfo_ratelimit_init(&rl, &req);
-
-  if(req.tableId == NDBINFO_BACKUP_RECORDS_TABLEID)
+  if(req.tableId == Ndbinfo::BACKUP_RECORDS_TABLEID)
   {
+#if 0
+// TODO
     BackupRecordPtr ptr LINT_SET_PTR;
-    for(c_backups.first(ptr); ptr.i != RNIL; c_backups.next(ptr))
+    ptr.i = cursor->data[0];
+    while(c_backups.get(ptr))
     {
-      dbinfo_write_row_init(&r, buf, sizeof(buf));
-      dbinfo_write_row_column_uint32(&r, getOwnNodeId());
-      dbinfo_write_row_column_uint32(&r, ptr.i);
-      dbinfo_write_row_column_uint32(&r, ptr.p->backupId);
-      dbinfo_write_row_column_uint32(&r, ptr.p->masterRef);
-      dbinfo_write_row_column_uint32(&r, ptr.p->clientRef);
-      dbinfo_write_row_column_uint32(&r, ptr.p->slaveState.getState());
-      dbinfo_write_row_column_uint32(&r, (Uint32)ptr.p->noOfBytes); // TODO
-      dbinfo_write_row_column_uint32(&r, (Uint32)ptr.p->noOfRecords); // TODO
-      dbinfo_write_row_column_uint32(&r, (Uint32)ptr.p->noOfLogBytes); // TODO
-      dbinfo_write_row_column_uint32(&r, (Uint32)ptr.p->noOfLogRecords); //TODO
-      dbinfo_write_row_column_uint32(&r, ptr.p->errorCode);
-      dbinfo_send_row(signal, r, rl, req.apiTxnId, req.senderRef);
-    }
-  }
-  else if( req.tableId == NDBINFO_BACKUP_PARAMETERS_TABLEID )
-  {
-    dbinfo_write_row_init(&r, buf, sizeof(buf));
-    dbinfo_write_row_column_uint32(&r, getOwnNodeId());
-    dbinfo_write_row_column_uint32(&r, m_curr_disk_write_speed);
-    dbinfo_write_row_column_uint32(&r, 4*m_words_written_this_period);
-    dbinfo_write_row_column_uint32(&r, m_overflow_disk_write);
-    dbinfo_write_row_column_uint32(&r, m_reset_delay_used);
-    dbinfo_write_row_column_uint32(&r, 0); // Uninteresting m_reset_disk_speed_time);
-    dbinfo_write_row_column_uint32(&r, c_backupPool.getSize());
-    dbinfo_write_row_column_uint32(&r, c_backupFilePool.getSize());
-    dbinfo_write_row_column_uint32(&r, c_tablePool.getSize());
-    dbinfo_write_row_column_uint32(&r, c_triggerPool.getSize());
-    dbinfo_write_row_column_uint32(&r, c_fragmentPool.getSize());
-    dbinfo_write_row_column_uint32(&r, c_pagePool.getSize());
-    dbinfo_write_row_column_uint32(&r, c_defaults.m_compressed_backup);
-    dbinfo_write_row_column_uint32(&r, c_defaults.m_compressed_lcp);
-    dbinfo_send_row(signal, r, rl, req.apiTxnId, req.senderRef);
+      Ndbinfo::Row row(signal, req);
+      row.write_uint32(getOwnNodeId());
+      row.write_uint32(ptr.i);
+      row.write_uint32(ptr.p->backupId);
+      row.write_uint32(ptr.p->masterRef);
+      row.write_uint32(ptr.p->clientRef);
+      row.write_uint32(ptr.p->slaveState.getState());
+      row.write_uint32((Uint32)ptr.p->noOfBytes); // TODO
+      row.write_uint32((Uint32)ptr.p->noOfRecords); // TODO
+      row.write_uint32((Uint32)ptr.p->noOfLogBytes); // TODO
+      row.write_uint32((Uint32)ptr.p->noOfLogRecords); //TODO
+      row.write_uint32(ptr.p->errorCode);
+      ndbinfo_send_row(signal, req, row, rl);
+      c_backups.next(ptr);
+      if (rl.need_break(req))
+      {
+        jam();
+        ndbinfo_send_scan_break(signal, req, rl, pool);
+        return;
+      }
+    }
+#endif
+  }
+  else if(req.tableId == Ndbinfo::BACKUP_PARAMETERS_TABLEID)
+  {
+    Ndbinfo::Row row(signal, req);
+    row.write_uint32(getOwnNodeId());
+    row.write_uint32(m_curr_disk_write_speed);
+    row.write_uint32(4*m_words_written_this_period);
+    row.write_uint32(m_overflow_disk_write);
+    row.write_uint32(m_reset_delay_used);
+    row.write_uint32(0); // Uninteresting m_reset_disk_speed_time);
+    row.write_uint32(c_backupPool.getSize());
+    row.write_uint32(c_backupFilePool.getSize());
+    row.write_uint32(c_tablePool.getSize());
+    row.write_uint32(c_triggerPool.getSize());
+    row.write_uint32(c_fragmentPool.getSize());
+    row.write_uint32(c_pagePool.getSize());
+    row.write_uint32(c_defaults.m_compressed_backup);
+    row.write_uint32(c_defaults.m_compressed_lcp);
+    ndbinfo_send_row(signal, req, row, rl);
   }
-  else if( req.tableId == NDBINFO_POOLS_TABLEID )
+  else if(req.tableId == Ndbinfo::POOLS_TABLEID)
   {
     struct {
       const char* poolname;
@@ -742,24 +749,30 @@ void Backup::execDBINFO_SCANREQ(Signal *
           { NULL, 0, 0}
         };
 
-    for(int i=0; pools[i].poolname; i++)
+    Uint32 pool = cursor->data[0];
+    BlockNumber bn = blockToMain(number());
+    while(pools[pool].poolname)
     {
-      dbinfo_write_row_init(&r, buf, sizeof(buf));
-      dbinfo_write_row_column_uint32(&r, getOwnNodeId());
-      const char *blockname= "BACKUP";
-      dbinfo_write_row_column(&r, blockname, strlen(blockname));
-      dbinfo_write_row_column(&r, pools[i].poolname, strlen(pools[i].poolname));
-      dbinfo_write_row_column_uint32(&r, pools[i].free);
-      dbinfo_write_row_column_uint32(&r, pools[i].size);
-      dbinfo_send_row(signal, r, rl, req.apiTxnId, req.senderRef);
+      jam();
+      Ndbinfo::Row row(signal, req);
+      row.write_uint32(getOwnNodeId());
+      row.write_uint32(bn);           // block number
+      row.write_uint32(instance()); // block instance
+      row.write_string(pools[pool].poolname);
+      row.write_uint32(pools[pool].free);
+      row.write_uint32(pools[pool].size);
+      ndbinfo_send_row(signal, req, row, rl);
+      pool++;
+      if (rl.need_break(req))
+      {
+        jam();
+        ndbinfo_send_scan_break(signal, req, rl, pool);
+        return;
+      }
     }
   }
 
-  DbinfoScanConf *conf= (DbinfoScanConf*)signal->getDataPtrSend();
-  memcpy(conf,&req, DbinfoScanReq::SignalLengthWithCursor * sizeof(Uint32));
-  conf->requestInfo &= ~(DbinfoScanConf::MoreData);
-  sendSignal(DBINFO_REF, GSN_DBINFO_SCANCONF,
-             signal, DbinfoScanConf::SignalLengthWithCursor, JBB);
+  ndbinfo_send_scan_conf(signal, req, rl);
 }
 
 bool

=== modified file 'storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp'
--- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp	2009-10-12 07:07:14 +0000
+++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp	2009-11-08 12:52:27 +0000
@@ -40,6 +40,7 @@
 #include <signaldata/DisconnectRep.hpp>
 #include <signaldata/EnableCom.hpp>
 #include <signaldata/RouteOrd.hpp>
+#include <signaldata/DbinfoScan.hpp>
 
 #include <EventLogger.hpp>
 #include <TimeQueue.hpp>
@@ -108,6 +109,7 @@ Cmvmi::Cmvmi(Block_context& ctx) :
 
   addRecSignal(GSN_CONTINUEB, &Cmvmi::execCONTINUEB);
   addRecSignal(GSN_ROUTE_ORD, &Cmvmi::execROUTE_ORD);
+  addRecSignal(GSN_DBINFO_SCANREQ, &Cmvmi::execDBINFO_SCANREQ);
   
   subscriberPool.setSize(5);
   
@@ -1299,6 +1301,57 @@ Cmvmi::execDUMP_STATE_ORD(Signal* signal
   }
 }//Cmvmi::execDUMP_STATE_ORD()
 
+
+void Cmvmi::execDBINFO_SCANREQ(Signal *signal)
+{
+  DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
+  const Ndbinfo::ScanCursor* cursor =
+    (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(&req);
+  Ndbinfo::Ratelimit rl;
+
+  jamEntry();
+
+  if(req.tableId == Ndbinfo::TRP_STATUS_TABLEID)
+  {
+    jam();
+    Uint32 rnode = cursor->data[0];
+    if (rnode == 0)
+      rnode++; // Skip node 0
+
+    while(rnode < MAX_NODES)
+    {
+      switch(getNodeInfo(rnode).m_type)
+      {
+      default:
+      {
+        jam();
+        Ndbinfo::Row row(signal, req);
+        row.write_uint32(getOwnNodeId()); // Node id
+        row.write_uint32(rnode); // Remote node id
+        row.write_uint32(globalTransporterRegistry.ioState(rnode)); // State
+        ndbinfo_send_row(signal, req, row, rl);
+       break;
+      }
+
+      case NodeInfo::INVALID:
+        jam();
+       break;
+      }
+
+      rnode++;
+      if (rl.need_break(req))
+      {
+        jam();
+        ndbinfo_send_scan_break(signal, req, rl, rnode);
+        return;
+      }
+    }
+  }
+
+  ndbinfo_send_scan_conf(signal, req, rl);
+}
+
+
 void
 Cmvmi::execNODE_START_REP(Signal* signal)
 {

=== modified file 'storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp'
--- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp	2009-10-12 07:07:14 +0000
+++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp	2009-11-08 12:52:27 +0000
@@ -73,7 +73,9 @@ private:
   void execCONTINUEB(Signal* signal);
 
   void execROUTE_ORD(Signal* signal);
-  
+
+  void execDBINFO_SCANREQ(Signal *signal);
+
   char theErrorMessage[256];
   void sendSTTORRY(Signal* signal);
 

=== modified file 'storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp	2009-09-22 14:34:26 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp	2009-11-08 12:52:27 +0000
@@ -36,12 +36,6 @@
 #include <signaldata/TransIdAI.hpp>
 #include <KeyDescriptor.hpp>
 
-#include <ndbinfo.h>
-#include <dbinfo/ndbinfo_tableids.h>
-
-// TO_DO_RONM is a label for comments on what needs to be improved in future versions
-// when more time is given.
-
 #ifdef VM_TRACE
 #define DEBUG(x) ndbout << "DBACC: "<< x << endl;
 #else
@@ -8244,31 +8238,23 @@ void Dbacc::execDBINFO_SCANREQ(Signal *s
   jamEntry();
   DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
 
-  char buf[512];
-  struct dbinfo_ratelimit rl;
-  struct dbinfo_row r;
-
-  dbinfo_ratelimit_init(&rl, &req);
-
-  if(req.tableId == NDBINFO_MEMUSAGE_TABLEID)
-  {
-    dbinfo_write_row_init(&r, buf, sizeof(buf));
-    const char *imstr= "IndexMemory";
-    dbinfo_write_row_column(&r, "IndexMemory", 11);
-    dbinfo_write_row_column_uint32(&r, getOwnNodeId());
-    Uint32 page_size_kb= sizeof(*rpPageptr.p);;
-    dbinfo_write_row_column(&r, (char*)&page_size_kb, 4); // 8kb
-    dbinfo_write_row_column_uint32(&r, cnoOfAllocatedPages); // alloced pages
-    dbinfo_write_row_column_uint32(&r, cpagesize); // number of pages
-    dbinfo_write_row_column(&r, "DBACC", strlen("DBACC"));
-    dbinfo_send_row(signal, r, rl, req.apiTxnId, req.senderRef);
-  }
-
-  DbinfoScanConf *conf= (DbinfoScanConf*)signal->getDataPtrSend();
-  memcpy(conf,&req, DbinfoScanReq::SignalLengthWithCursor * sizeof(Uint32));
-  conf->requestInfo &= ~(DbinfoScanConf::MoreData);
-  sendSignal(DBINFO_REF, GSN_DBINFO_SCANCONF,
-             signal, DbinfoScanConf::SignalLengthWithCursor, JBB);
+  Ndbinfo::Ratelimit rl;
+
+  if(req.tableId == Ndbinfo::MEMUSAGE_TABLEID)
+  {
+    jam();
+    Ndbinfo::Row row(signal, req);
+    row.write_uint32(getOwnNodeId());
+    row.write_uint32(blockToMain(number())); // block number
+    row.write_uint32(instance());            // block instance
+    row.write_string("IndexMemory");
+    row.write_uint32(sizeof(page8));
+    row.write_uint32(cnoOfAllocatedPages);   // alloced pages
+    row.write_uint32(cpagesize);             // number of pages
+    ndbinfo_send_row(signal, req, row, rl);
+  }
+
+  ndbinfo_send_scan_conf(signal, req, rl);
 }
 
 void

=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2009-10-30 10:05:35 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2009-11-08 12:53:35 +0000
@@ -99,8 +99,6 @@ extern EventLogger * g_eventLogger;
 
 #include <signaldata/DbinfoScan.hpp>
 #include <signaldata/TransIdAI.hpp>
-#include <ndbinfo.h>
-#include <dbinfo/ndbinfo_tableids.h>
 
 #define ZNOT_FOUND 626
 #define ZALREADYEXIST 630
@@ -277,15 +275,13 @@ Dbdict::execDUMP_STATE_ORD(Signal* signa
 void Dbdict::execDBINFO_SCANREQ(Signal *signal)
 {
   DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
-  char buf[512];
-  struct dbinfo_row r;
-  struct dbinfo_ratelimit rl;
-
-  dbinfo_ratelimit_init(&rl, &req);
+  const Ndbinfo::ScanCursor* cursor =
+    (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(&req);
+  Ndbinfo::Ratelimit rl;
 
   jamEntry();
 
-  if(req.tableId == NDBINFO_POOLS_TABLEID)
+  if(req.tableId == Ndbinfo::POOLS_TABLEID)
   {
     struct {
       const char* poolname;
@@ -368,24 +364,30 @@ void Dbdict::execDBINFO_SCANREQ(Signal *
           { NULL, 0, 0}
         };
 
-    for(int i=0; pools[i].poolname; i++)
+    Uint32 pool = cursor->data[0];
+    BlockNumber bn = blockToMain(number());
+    while(pools[pool].poolname)
     {
-      dbinfo_write_row_init(&r, buf, sizeof(buf));
-      dbinfo_write_row_column_uint32(&r, getOwnNodeId());
-      const char *blockname= "DBDICT";
-      dbinfo_write_row_column(&r, blockname, strlen(blockname));
-      dbinfo_write_row_column(&r, pools[i].poolname, strlen(pools[i].poolname));
-      dbinfo_write_row_column_uint32(&r, pools[i].free);
-      dbinfo_write_row_column_uint32(&r, pools[i].size);
-      dbinfo_send_row(signal, r, rl, req.apiTxnId, req.senderRef);
+      jam();
+      Ndbinfo::Row row(signal, req);
+      row.write_uint32(getOwnNodeId());
+      row.write_uint32(bn);           // block number
+      row.write_uint32(instance()); // block instance
+      row.write_string(pools[pool].poolname);
+      row.write_uint32(pools[pool].free);
+      row.write_uint32(pools[pool].size);
+      ndbinfo_send_row(signal, req, row, rl);
+      pool++;
+      if (rl.need_break(req))
+      {
+        jam();
+        ndbinfo_send_scan_break(signal, req, rl, pool);
+        return;
+      }
     }
   }
 
-  DbinfoScanConf *conf= (DbinfoScanConf*)signal->getDataPtrSend();
-  memcpy(conf,&req, DbinfoScanReq::SignalLengthWithCursor * sizeof(Uint32));
-  conf->requestInfo &= ~(DbinfoScanConf::MoreData);
-  sendSignal(DBINFO_REF, GSN_DBINFO_SCANCONF,
-             signal, DbinfoScanConf::SignalLengthWithCursor, JBB);
+  ndbinfo_send_scan_conf(signal, req, rl);
 }
 
 

=== modified file 'storage/ndb/src/kernel/blocks/dbinfo/CMakeLists.txt'
--- a/storage/ndb/src/kernel/blocks/dbinfo/CMakeLists.txt	2008-10-08 08:12:18 +0000
+++ b/storage/ndb/src/kernel/blocks/dbinfo/CMakeLists.txt	2009-11-08 12:52:27 +0000
@@ -18,7 +18,6 @@ INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/
 INCLUDE(${CMAKE_SOURCE_DIR}/storage/ndb/config/type_kernel.cmake)
 
 ADD_LIBRARY(ndbdbinfo STATIC
-		      Dbinfo.cpp
-		      ndbinfo_helpers.cpp
+	Dbinfo.cpp
 )
 

=== modified file 'storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp'
--- a/storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp	2009-10-08 11:15:24 +0000
+++ b/storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp	2009-11-08 12:52:27 +0000
@@ -1,4 +1,5 @@
-/* Copyright (C) 2007 MySQL AB
+/* Copyright (C) 2007-2008 MySQL AB, 2009 Sun Microsystems Inc.
+   All rights reserved. Use is subject to license terms.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -14,7 +15,6 @@
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
 #include "Dbinfo.hpp"
-#include <ndbinfo.h>
 
 #include <signaldata/NodeFailRep.hpp>
 #include <signaldata/ReadNodesConf.hpp>
@@ -22,11 +22,9 @@
 #include <signaldata/DumpStateOrd.hpp>
 #include <signaldata/DbinfoScan.hpp>
 #include <signaldata/TransIdAI.hpp>
-#include "ndbinfo_tables.h"
-#include "ndbinfo_tableids.h"
-#include <AttributeHeader.hpp>
 
-Uint32 dbinfo_blocks[] = { DBACC, DBTUP, BACKUP, DBTC, SUMA, DBUTIL, TRIX, DBTUX, DBDICT, 0};
+Uint32 dbinfo_blocks[] = { DBACC, DBTUP, BACKUP, DBTC, SUMA, DBUTIL,
+                           TRIX, DBTUX, DBDICT, CMVMI, DBLQH, 0};
 
 Dbinfo::Dbinfo(Block_context& ctx) :
   SimulatedBlock(DBINFO, ctx),
@@ -34,6 +32,8 @@ Dbinfo::Dbinfo(Block_context& ctx) :
 {
   BLOCK_CONSTRUCTOR(Dbinfo);
 
+  STATIC_ASSERT(sizeof(DbinfoScanCursor) == sizeof(Ndbinfo::ScanCursor));
+
   c_nodePool.setSize(MAX_NDB_NODES);
 
   /* Add Received Signals */
@@ -44,8 +44,6 @@ Dbinfo::Dbinfo(Block_context& ctx) :
   addRecSignal(GSN_DBINFO_SCANREQ, &Dbinfo::execDBINFO_SCANREQ);
   addRecSignal(GSN_DBINFO_SCANCONF, &Dbinfo::execDBINFO_SCANCONF);
 
-  addRecSignal(GSN_DBINFO_TRANSID_AI, &Dbinfo::execDBINFO_TRANSID_AI);
-
   addRecSignal(GSN_READ_NODESCONF, &Dbinfo::execREAD_NODESCONF);
   addRecSignal(GSN_NODE_FAILREP, &Dbinfo::execNODE_FAILREP);
   addRecSignal(GSN_INCL_NODEREQ, &Dbinfo::execINCL_NODEREQ);
@@ -102,68 +100,6 @@ void Dbinfo::sendSTTORRY(Signal* signal)
   sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 6, JBB);
 }
 
-/*
- * Executing DBINFO_TRANSID_AI is only for debugging
- * We use this as part of the DUMP interface
- * and during debugging.
- */
-void Dbinfo::execDBINFO_TRANSID_AI(Signal* signal)
-{
-  jamEntry();
-  TransIdAI *tidai= (TransIdAI*)signal->theData;
-
-  if(!assembleFragments(signal)){
-    return;
-  }
-
-  const Uint32 tableId= tidai->transId[0];
-  const int ncols = ndbinfo_tables[tableId]->ncols;
-
-  SectionHandle handle(this, signal);
-  SegmentedSectionPtr ptr;
-  handle.getSection(ptr, 0);
-
-  char rowbuf[1024];
-  char *row= rowbuf;
-  copy((Uint32*)rowbuf, ptr);
-
-  //Uint32 rowsz= ptr.sz;
-  int len;
-
-  for(int i=0; i<ncols; i++)
-  {
-    jam();
-    AttributeHeader ah(*(Uint32*)row);
-    row+=ah.getHeaderSize()*sizeof(Uint32);
-
-    ndbout << "AI " << ah.getAttributeId() << " ";
-    len= ah.getByteSize();
-    ndbout << "||" << len << "||" << " ";
-    ndbout << "Table " << tableId << " ::: ";
-    switch(ndbinfo_tables[tableId]->col[i].coltype)
-    {
-    case NDBINFO_TYPE_NUMBER:
-      jam();
-      ndbout << *(Uint32*)row;
-      row+= len;
-      break;
-    case NDBINFO_TYPE_STRING:
-      jam();
-      char b[512];
-      memcpy(b,row,len);
-      b[len]=0;
-      ndbout << b;
-      row+= len;
-      break;
-    default:
-      ndbassert(false);
-      break;
-    };
-    ndbout << endl;
-  }
-  releaseSections(handle);
-}
-
 void Dbinfo::execDUMP_STATE_ORD(Signal* signal)
 {
   jamEntry();
@@ -173,12 +109,10 @@ void Dbinfo::execDUMP_STATE_ORD(Signal* 
   case DumpStateOrd::DbinfoListTables:
     jam();
     ndbout_c("--- BEGIN NDB$INFO.TABLES ---");
-    char create_sql[512];
-    for(Uint32 i=0;i<number_ndbinfo_tables;i++)
+    for(int i = 0; i < Ndbinfo::getNumTables(); i++)
     {
-      ndbinfo_create_sql(ndbinfo_tables[i],
-                         create_sql, sizeof(create_sql));
-      ndbout_c("%d,%s,%s",i,ndbinfo_tables[i]->name,create_sql);
+      const Ndbinfo::Table& tab = Ndbinfo::getTable(i);
+      ndbout_c("%d,%s", i, tab.m.name);
     }
     ndbout_c("--- END NDB$INFO.TABLES ---");
     break;
@@ -186,334 +120,334 @@ void Dbinfo::execDUMP_STATE_ORD(Signal* 
   case DumpStateOrd::DbinfoListColumns:
     jam();
     ndbout_c("--- BEGIN NDB$INFO.COLUMNS ---");
-    for(Uint32 i=0;i<number_ndbinfo_tables;i++)
+    for(int i = 0; i < Ndbinfo::getNumTables(); i++)
     {
-      struct ndbinfo_table *t= ndbinfo_tables[i];
+      const Ndbinfo::Table& tab = Ndbinfo::getTable(i);
 
-      for(int j=0;j<t->ncols;j++)
-        ndbout_c("%d,%d,%s,%d",i,j,t->col[j].name,t->col[j].coltype);
+      for(int j = 0; j < tab.m.ncols; j++)
+        ndbout_c("%d,%d,%s,%d", i, j,
+                 tab.col[j].name, tab.col[j].coltype);
     }
     ndbout_c("--- END NDB$INFO.COLUMNS ---");
     break;
 
-  case DumpStateOrd::DbinfoScanTable:
-    jam();
-    const Uint32 tableId= signal->theData[1];
-
-    DbinfoScanReq *req = (DbinfoScanReq*)signal->theData;
-    req->tableId= tableId;
-    req->senderRef= reference();
-    req->apiTxnId= tableId;
-    req->requestInfo= DbinfoScanReq::AllColumns | DbinfoScanReq::StartScan;
-    req->colBitmapLo= ~0;
-    req->colBitmapHi= ~0;
-    req->maxRows= 2;
-    req->maxBytes= 0;
-    req->rows_total= 0;
-    req->word_total= 0;
-
-    ndbout_c("BEGIN DBINFO DUMP SCAN on %u",tableId);
-
-    sendSignal(reference(), GSN_DBINFO_SCANREQ,
-               signal, DbinfoScanReq::SignalLength, JBB);
-    break;
   };
 }
 
-void Dbinfo::execDBINFO_SCANREQ(Signal *signal)
+Uint32 Dbinfo::find_next_node(Uint32 node) const
 {
-  jamEntry();
-  DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
+  node++;
+  while(!c_aliveNodes.get(node) &&
+        node < MAX_NDB_NODES)
+     node++;
 
-  const Uint32 tableId= req.tableId;
-  const Uint32 senderRef= req.senderRef;
-  const Uint32 apiTxnId= req.apiTxnId;
-  //const Uint32 colBitmapLo= req.colBitmapLo;
-  //const Uint32 colBitmapHi= req.colBitmapHi;
-  //const Uint32 requestInfo= req.requestInfo;
+  if (node == MAX_NDB_NODES)
+    return 0;
+  return node;
+}
 
-  Uint32 i;
-  int j;
-  int startid= 0;
+Uint32 Dbinfo::find_next_block(Uint32 block) const
+{
+  int i = 0;
+  // Find current blocks position
+  while (dbinfo_blocks[i] != block &&
+         dbinfo_blocks[i] != 0)
+    i++;
 
-  int startTableId, startColumnId;
+  // Make sure current block was found
+  ndbrequire(dbinfo_blocks[i]);
 
-  int continue_sending;
+  // Return the next block(which might be 0)
+  return dbinfo_blocks[++i];
+}
 
-  char buf[512];
-  struct dbinfo_row r;
-  struct dbinfo_ratelimit rl;
+bool Dbinfo::find_next(Ndbinfo::ScanCursor* cursor) const
+{
+  Uint32 node = refToNode(cursor->currRef);
+  Uint32 block = refToBlock(cursor->currRef);
+  const Uint32 instance = refToInstance(cursor->currRef);
+  ndbrequire(instance == 0);
 
-  switch(req.tableId)
+  if (block)
   {
-  case NDBINFO_TABLES_TABLEID:
     jam();
-
-    char create_sql[512];
-
-    dbinfo_ratelimit_init(&rl, &req);
-
-    if(!(req.requestInfo & DbinfoScanReq::StartScan))
+    if (block == DBINFO)
     {
       jam();
-      startid= req.cursor.cur_item;
-    }
 
-    for(i=startid;dbinfo_ratelimit_continue(&rl) && i<number_ndbinfo_tables;i++)
-    {
-      jam();
-      dbinfo_write_row_init(&r, buf, sizeof(buf));
-      ndbinfo_create_sql(ndbinfo_tables[i],
-                         create_sql, sizeof(create_sql));
-
-      dbinfo_write_row_column(&r, (char*)&i, sizeof(i));
-      dbinfo_write_row_column(&r, ndbinfo_tables[i]->name,
-                              strlen(ndbinfo_tables[i]->name));
-      dbinfo_write_row_column(&r, create_sql, strlen(create_sql));
+      // Starting scan on this node
+      ndbrequire(node == getOwnNodeId());
 
-      dbinfo_send_row(signal,r,rl,apiTxnId,senderRef);
+      // Start on first block
+      cursor->currRef = numberToRef(dbinfo_blocks[0], node);
+      return true;
     }
 
-    if(!dbinfo_ratelimit_continue(&rl) && i < number_ndbinfo_tables)
+    // Find next block
+    block = find_next_block(block);
+    if (block)
     {
       jam();
-      dbinfo_ratelimit_sendconf(signal,req,rl,i);
-    }
-    else
-    {
-      jam();
-      DbinfoScanConf *conf= (DbinfoScanConf*)signal->getDataPtrSend();
-      conf->tableId= req.tableId;
-      conf->senderRef= req.senderRef;
-      conf->apiTxnId= req.apiTxnId;
-      conf->requestInfo= 0;
-      sendSignal(req.senderRef, GSN_DBINFO_SCANCONF, signal,
-                 DbinfoScanConf::SignalLength, JBB);
+      cursor->currRef = numberToRef(block, node);
+      return true;
     }
+  }
 
-    break;
+  node = find_next_node(node);
+  if (node)
+  {
+    jam();
+    block = DBINFO;
+    cursor->currRef = numberToRef(block, node);
+    return true;
+  }
 
-  case NDBINFO_COLUMNS_TABLEID:
+  // No more nodes -> done
+  cursor->currRef = 0;
+  return false;
+}
+
+void Dbinfo::execDBINFO_SCANREQ(Signal *signal)
+{
+  jamEntry();
+  DbinfoScanReq* req_ptr = (DbinfoScanReq*)signal->getDataPtrSend();
+  const Uint32 senderRef = signal->header.theSendersBlockRef;
+
+  // Copy signal on stack
+  DbinfoScanReq req = *req_ptr;
+
+  const Uint32 resultData = req.resultData;
+  const Uint32 transId0 = req.transId[0];
+  const Uint32 transId1 = req.transId[1];
+  const Uint32 resultRef = req.resultRef;
+
+  // Validate tableId
+  const Uint32 tableId = req.tableId;
+  if (tableId >= (Uint32)Ndbinfo::getNumTables())
+  {
+    jam();
+    DbinfoScanRef *ref= (DbinfoScanRef*)signal->getDataPtrSend();
+    ref->resultData = resultData;
+    ref->transId[0] = transId0;
+    ref->transId[1] = transId1;
+    ref->resultRef = resultRef;
+    ref->errorCode= DbinfoScanRef::NoTable;
+    sendSignal(senderRef, GSN_DBINFO_SCANREF, signal,
+               DbinfoScanRef::SignalLength, JBB);
+    return;
+  }
+
+  // TODO Check all scan parameters
+
+  Ndbinfo::ScanCursor* cursor =
+    (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(&req);
+  Uint32 signal_length = signal->getLength();
+  if (signal_length == DbinfoScanReq::SignalLength)
+  {
+    // Initialize cursor
     jam();
+    cursor->senderRef = senderRef;
+    cursor->saveSenderRef = 0;
+    cursor->currRef = 0;
+    cursor->saveCurrRef = 0;
+    // Reset all data holders
+    memset(cursor->data, 0, sizeof(cursor->data));
+    cursor->flags = 0;
+    cursor->totalRows = 0;
+    cursor->totalBytes = 0;
+    req.cursor_sz = Ndbinfo::ScanCursor::Length;
+    signal_length += req.cursor_sz;
+  }
+  ndbrequire(signal_length ==
+             DbinfoScanReq::SignalLength + Ndbinfo::ScanCursor::Length);
+  ndbrequire(req.cursor_sz == Ndbinfo::ScanCursor::Length);
 
-    startTableId= 0;
-    startColumnId= 0;
+  switch(tableId)
+  {
+  case Ndbinfo::TABLES_TABLEID:
+  {
+    jam();
 
-    if(!(req.requestInfo & DbinfoScanReq::StartScan))
+    Ndbinfo::Ratelimit rl;
+    Uint32 tableId = cursor->data[0];
+
+    while(tableId < (Uint32)Ndbinfo::getNumTables())
     {
       jam();
-      startTableId= req.cursor.cur_item >> 8;
-      startColumnId= req.cursor.cur_item & 0xFF;
+      const Ndbinfo::Table& tab = Ndbinfo::getTable(tableId);
+      Ndbinfo::Row row(signal, req);
+      row.write_uint32(tableId);
+      row.write_string(tab.m.name);
+      row.write_string(tab.m.comment);
+      ndbinfo_send_row(signal, req, row, rl);
+
+      tableId++;
+
+      if (rl.need_break(req))
+      {
+        jam();
+        ndbinfo_send_scan_break(signal, req, rl, tableId);
+        return;
+      }
     }
 
-    struct ndbinfo_table *t;
+    // All tables sent
+    req.cursor_sz = 0; // Close cursor
+    ndbinfo_send_scan_conf(signal, req, rl);
+    return;
+
+    break;
+  }
 
-    dbinfo_ratelimit_init(&rl, &req);
+  case Ndbinfo::COLUMNS_TABLEID:
+  {
+    jam();
 
-    continue_sending= 1;
+    Ndbinfo::Ratelimit rl;
+    Uint32 tableId = cursor->data[0];
+    Uint32 columnId = cursor->data[1];
 
-    for(i=startTableId; continue_sending && i<number_ndbinfo_tables; i++)
+    while(tableId < (Uint32)Ndbinfo::getNumTables())
     {
       jam();
-      t= ndbinfo_tables[i];
-
-      for(j=startColumnId; continue_sending && j<t->ncols;j++)
+      const Ndbinfo::Table& tab = Ndbinfo::getTable(tableId);
+      while(columnId < (Uint32)tab.m.ncols)
       {
         jam();
-        dbinfo_write_row_init(&r, buf, sizeof(buf));
-        dbinfo_write_row_column(&r, (char*)&i, sizeof(i));
-        dbinfo_write_row_column(&r, (char*)&j, sizeof(j));
-        dbinfo_write_row_column(&r, t->col[j].name, strlen(t->col[j].name));
-        const char* coltype_name= ndbinfo_coltype_to_string(t->col[j].coltype);
-        dbinfo_write_row_column(&r, coltype_name, strlen(coltype_name));
-        dbinfo_send_row(signal,r,rl, apiTxnId,senderRef);
+        Ndbinfo::Row row(signal, req);
+        row.write_uint32(tableId);
+        row.write_uint32(columnId);
+        row.write_string(tab.col[columnId].name);
+        row.write_uint32(tab.col[columnId].coltype);
+        row.write_string(tab.col[columnId].comment);
+        ndbinfo_send_row(signal, req, row, rl);
+
+        assert(columnId < 256);
+        columnId++;
 
-        continue_sending= dbinfo_ratelimit_continue(&rl);
+        if(rl.need_break(req))
+        {
+          jam();
+          ndbinfo_send_scan_break(signal, req, rl, tableId, columnId);
+          return;
+        }
       }
-      startColumnId= 0;
+      columnId = 0;
+      tableId++;
     }
 
-    if((i < number_ndbinfo_tables || j < t->ncols))
-    {
-      jam();
-      i--;
-      dbinfo_ratelimit_sendconf(signal, req, rl, (i << 8) | j);
-    }
-    else
-    {
-      jam();
-      DbinfoScanConf *conf= (DbinfoScanConf*)signal->getDataPtrSend();
-      conf->tableId= req.tableId;
-      conf->senderRef= req.senderRef;
-      conf->apiTxnId= req.apiTxnId;
-      conf->requestInfo= 0;
-      sendSignal(req.senderRef, GSN_DBINFO_SCANCONF, signal,
-                 DbinfoScanConf::SignalLength, JBB);
-    }
+    // All tables and columns sent
+    req.cursor_sz = 0; // Close cursor
+    ndbinfo_send_scan_conf(signal, req, rl);
 
     break;
+  }
 
   default:
+  {
     jam();
 
-    if(tableId > number_ndbinfo_tables)
-    {
-      jam();
-      DbinfoScanRef *ref= (DbinfoScanRef*)signal->getDataPtrSend();
-      ref->tableId= tableId;
-      ref->apiTxnId= apiTxnId;
-      ref->errorCode= 1;
-      sendSignal(senderRef, GSN_DBINFO_SCANREF, signal,
-                 DbinfoScanRef::SignalLength, JBB);
-      break;
-    }
-
     ndbassert(tableId > 1);
 
-    if(signal->getLength() == DbinfoScanReq::SignalLength)
-    {
-      /*
-       * We've gotten a request from application, first
-       * ScanReq signal. start from beginning
-       */
+    //printSignalHeader(stdout, signal->header, 99, 98, true);
+    //printDBINFO_SCAN(stdout, signal->theData, signal->getLength(), 0);
 
+    if (Ndbinfo::ScanCursor::getHasMoreData(cursor->flags) ||
+        find_next(cursor))
+    {
       jam();
+      ndbrequire(cursor->currRef);
 
-      DbinfoScanReq ireq= *(DbinfoScanReq*)signal->theData;
-      DbinfoScanReq *oreq= (DbinfoScanReq*)signal->getDataPtrSend();
-
-      memcpy(signal->getDataPtrSend(),&ireq,DbinfoScanReq::SignalLength*sizeof(Uint32));
-      oreq->cursor.cur_requestInfo= 0;
-      oreq->cursor.cur_node= 0;
-      oreq->cursor.cur_block= DBINFO;
-      oreq->cursor.cur_item= 0;
-
-      for(oreq->cursor.cur_node= 0;
-          !c_aliveNodes.get(oreq->cursor.cur_node);
-          oreq->cursor.cur_node++)
-        ;
+      // CONF or REF should be sent back here
+      cursor->senderRef = reference();
 
-      sendSignal(numberToRef(DBINFO,oreq->cursor.cur_node), GSN_DBINFO_SCANREQ,
-                 signal, DbinfoScanReq::SignalLengthWithCursor, JBB);
+      // Send SCANREQ
+      MEMCOPY_NO_WORDS(req_ptr,
+                       &req, signal_length);
+      sendSignal(cursor->currRef,
+                 GSN_DBINFO_SCANREQ,
+                 signal, signal_length, JBB);
     }
     else
     {
-      /**
-       * We have a cursor, so we need to continue scanning.
-       */
+      // Scan is done, send SCANCONF back to caller
       jam();
-      int next_dbinfo_block= 0;
-      if(req.cursor.cur_block != DBINFO)
-      {
-        while(dbinfo_blocks[next_dbinfo_block] != req.cursor.cur_block
-            && dbinfo_blocks[next_dbinfo_block] != 0)
-        {
-          jam();
-          next_dbinfo_block++;
-        }
-      }
-
-      DbinfoScanReq ireq= *(DbinfoScanReq*)signal->theData;
-      DbinfoScanReq *oreq= (DbinfoScanReq*)signal->getDataPtrSend();
-
-      memcpy(signal->getDataPtrSend(),&ireq,signal->getLength()*sizeof(Uint32));
-
-      oreq->cursor.cur_block= dbinfo_blocks[next_dbinfo_block];
-
-      sendSignal(numberToRef(oreq->cursor.cur_block, oreq->cursor.cur_node),
-                 GSN_DBINFO_SCANREQ,
-                 signal, signal->getLength(), JBB);
+      DbinfoScanConf *apiconf= (DbinfoScanConf*)signal->getDataPtrSend();
+      MEMCOPY_NO_WORDS(apiconf, &req, DbinfoScanConf::SignalLength);
+      // Set cursor_sz back to 0 to indicate end of scan
+      apiconf->cursor_sz = 0;
+      sendSignal(resultRef, GSN_DBINFO_SCANCONF, signal,
+                 DbinfoScanConf::SignalLength, JBB);
     }
     break;
-  };
+  }
+  }
 }
 
 void Dbinfo::execDBINFO_SCANCONF(Signal *signal)
 {
-  DbinfoScanConf conf= *(DbinfoScanConf*)signal->theData;
+  const DbinfoScanConf* conf_ptr= (const DbinfoScanConf*)signal->getDataPtr();
+  // Copy signal on stack
+  const DbinfoScanConf conf= *conf_ptr;
 
   jamEntry();
 
-  const Uint32 tableId= conf.tableId;
-  const Uint32 senderRef= conf.senderRef;
-  const Uint32 apiTxnId= conf.apiTxnId;
-  //const Uint32 colBitmapLo= conf.colBitmapLo;
-  //const Uint32 colBitmapHi= conf.colBitmapHi;
+  //printDBINFO_SCAN(stdout, signal->theData, signal->getLength(), 0);
 
-  DbinfoScanReq *oreq= (DbinfoScanReq*)signal->getDataPtrSend();
+  Uint32 signal_length = signal->getLength();
+  ndbrequire(signal_length ==
+             DbinfoScanReq::SignalLength+Ndbinfo::ScanCursor::Length);
+  ndbrequire(conf.cursor_sz == Ndbinfo::ScanCursor::Length);
 
-  memcpy(signal->getDataPtrSend(),&conf,signal->getLength()*sizeof(Uint32));
+  // Validate tableId
+  const Uint32 tableId= conf.tableId;
+  ndbassert(tableId < Ndbinfo::getNumTables());
 
-  if(conf.requestInfo & DbinfoScanConf::MoreData)
-  {
-    /*
-     * Continue a DUMP scan of DBINFO table (hit maxrows/maxbytes)
-     */
-    jam();
-    oreq->requestInfo &= ~(DbinfoScanReq::StartScan);
-    sendSignal(numberToRef(oreq->cursor.cur_block, oreq->cursor.cur_node),
-               GSN_DBINFO_SCANREQ,
-               signal, signal->getLength(), JBB);
-    return;
-  }
+  const Uint32 resultRef = conf.resultRef;
 
-  int next_dbinfo_block= 0;
+  // Copy cursor on stack
+  ndbrequire(conf.cursor_sz);
+  Ndbinfo::ScanCursor* cursor =
+    (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(&conf);
 
-  if(signal->getLength() == 3) // we have the ACK from a DUMP initiated scan
+  if (Ndbinfo::ScanCursor::getHasMoreData(cursor->flags) || conf.returnedRows)
   {
+    // Rate limit break, pass through to API
     jam();
-    ndbout_c("FINISHED DBINFO DUMP Scan on %u",signal->theData[0]);
+    ndbrequire(cursor->currRef);
+    DbinfoScanConf *apiconf = (DbinfoScanConf*) signal->getDataPtrSend();
+    MEMCOPY_NO_WORDS(apiconf, &conf, signal_length);
+    sendSignal(resultRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
     return;
   }
 
-  if(conf.cursor.cur_block != DBINFO)
+  if (find_next(cursor))
   {
     jam();
-    while(dbinfo_blocks[next_dbinfo_block] != conf.cursor.cur_block
-          && dbinfo_blocks[next_dbinfo_block] != 0)
-    {
-      jam();
-      next_dbinfo_block++;
-    }
-  }
+    ndbrequire(cursor->currRef);
 
-  next_dbinfo_block++;
+    // CONF or REF should be sent back here
+    cursor->senderRef = reference();
 
-  if(dbinfo_blocks[next_dbinfo_block]!=0)
-  {
-    jam();
-    oreq->cursor.cur_block= dbinfo_blocks[next_dbinfo_block];
-  }
-  else
-  {
-    for(oreq->cursor.cur_node++;
-        !c_aliveNodes.get(oreq->cursor.cur_node)
-          && oreq->cursor.cur_node < MAX_NDB_NODES;
-        oreq->cursor.cur_node++)
-      ;
-
-    if(oreq->cursor.cur_node < MAX_NDB_NODES)
-    {
-      jam();
-      oreq->cursor.cur_requestInfo= 0;
-      oreq->cursor.cur_block= DBINFO;
-      oreq->cursor.cur_item= 0;
-    }
-    else
-    {
-      jam();
-      DbinfoScanConf *apiconf= (DbinfoScanConf*)signal->getDataPtrSend();
-      apiconf->tableId= tableId;
-      apiconf->senderRef= senderRef;
-      apiconf->apiTxnId= apiTxnId;
-      sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, 3, JBB);
-      return;
-    }
+    // Send SCANREQ
+    MEMCOPY_NO_WORDS(signal->getDataPtrSend(),
+                     &conf, signal_length);
+    sendSignal(cursor->currRef,
+               GSN_DBINFO_SCANREQ,
+               signal, signal_length, JBB);
+    return;
   }
 
-  sendSignal(numberToRef(oreq->cursor.cur_block, oreq->cursor.cur_node),
-             GSN_DBINFO_SCANREQ,
-             signal, signal->getLength(), JBB);
+  // Scan is done, send SCANCONF back to caller
+  jam();
+  DbinfoScanConf *apiconf = (DbinfoScanConf*) signal->getDataPtrSend();
+  MEMCOPY_NO_WORDS(apiconf, &conf, DbinfoScanConf::SignalLength);
+
+  // Set cursor_sz back to 0 to indicate end of scan
+  apiconf->cursor_sz = 0;
+  sendSignal(resultRef, GSN_DBINFO_SCANCONF, signal,
+             DbinfoScanConf::SignalLength, JBB);
+  return;
 }
 
 

=== modified file 'storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.hpp'
--- a/storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.hpp	2008-10-05 07:14:37 +0000
+++ b/storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.hpp	2009-11-08 12:52:27 +0000
@@ -1,4 +1,5 @@
-/* Copyright (C) 2007 MySQL AB
+/* Copyright (C) 2007-2008 MySQL AB, 2009 Sun Microsystems Inc.
+   All rights reserved. Use is subject to license terms.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -49,8 +50,10 @@ protected:
   void execREAD_CONFIG_REQ(Signal*);
   void execDUMP_STATE_ORD(Signal* signal);
 
+  Uint32 find_next_node(Uint32 node) const;
+  Uint32 find_next_block(Uint32 block) const;
+  bool find_next(Ndbinfo::ScanCursor* cursor) const;
   void execDBINFO_SCANREQ(Signal *signal);
-  void execDBINFO_TRANSID_AI(Signal* signal);
   void execDBINFO_SCANCONF(Signal *signal);
 
   /* for maintaining c_aliveNodes */

=== removed file 'storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_helpers.cpp'
--- a/storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_helpers.cpp	2008-10-08 20:29:28 +0000
+++ b/storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_helpers.cpp	1970-01-01 00:00:00 +0000
@@ -1,56 +0,0 @@
-#include "Dbinfo.hpp"
-#include <ndbinfo.h>
-#include <AttributeHeader.hpp>
-#include <signaldata/DbinfoScan.hpp>
-
-void dbinfo_write_row_init(struct dbinfo_row *r, char* buf, int len)
-{
-  r->buf= buf;
-  r->endrow= 0;
-  r->blen= len;
-  r->c= 0;
-}
-
-int dbinfo_write_row_column(struct dbinfo_row *r, const char* col, int clen)
-{
-  AttributeHeader ah;
-  if(!(int(r->blen - r->endrow) >= int(clen+ah.getHeaderSize()*sizeof(Uint32))))
-  {
-    return -1; // Not enough room.
-  }
-  r->buf[r->endrow] = 0;
-
-  ah.setAttributeId(r->c++);
-  ah.setByteSize(clen);
-  ah.insertHeader((Uint32*)&r->buf[r->endrow]);
-
-  r->endrow+= ah.getHeaderSize()*sizeof(Uint32);
-
-  memcpy(&r->buf[r->endrow], col, clen);
-
-  r->endrow+=clen;
-  return 0;
-}
-
-int dbinfo_write_row_column_uint32(struct dbinfo_row *r, Uint32 value)
-{
-  return dbinfo_write_row_column(r, (char*)&value, sizeof(value));
-}
-
-void dbinfo_ratelimit_init(struct dbinfo_ratelimit *rl, struct DbinfoScanReq *r)
-{
-  rl->maxRows= r->maxRows;
-  rl->maxBytes= r->maxBytes;
-  rl->rows_total= r->rows_total;
-  rl->bytes_total= r->word_total;
-  rl->rows= 0;
-  rl->bytes= 0;
-}
-
-int dbinfo_ratelimit_continue(struct dbinfo_ratelimit *rl)
-{
-  if(((rl->maxRows==0) || (rl->maxRows > rl->rows))
-     && ((rl->maxBytes==0) || (rl->maxBytes > rl->bytes)) )
-    return 1;
-  return 0;
-}

=== removed file 'storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_tableids.h'
--- a/storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_tableids.h	2008-10-05 07:14:21 +0000
+++ b/storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_tableids.h	1970-01-01 00:00:00 +0000
@@ -1,21 +0,0 @@
-
-#ifndef NDBINFO_TABLEIDS_H
-#define NDBINFO_TABLEIDS_H
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-#define NDBINFO_TABLES_TABLEID    0
-#define NDBINFO_COLUMNS_TABLEID   1
-#define NDBINFO_MEMUSAGE_TABLEID  2
-#define NDBINFO_LOGDESTINATION_TABLEID  3
-#define NDBINFO_BACKUP_RECORDS_TABLEID 4
-#define NDBINFO_BACKUP_PARAMETERS_TABLEID 5
-#define NDBINFO_POOLS_TABLEID                     6
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif

=== removed file 'storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_tables.h'
--- a/storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_tables.h	2008-11-10 11:44:02 +0000
+++ b/storage/ndb/src/kernel/blocks/dbinfo/ndbinfo_tables.h	1970-01-01 00:00:00 +0000
@@ -1,132 +0,0 @@
-/* Copyright (C) 2007 MySQL AB
-
-   This program is free software; you can redistribute it and/or modify
-   it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; version 2 of the License.
-
-   This program is distributed in the hope that it will be useful,
-   but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-   GNU General Public License for more details.
-
-   You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
-
-#ifndef NDBINFO_TABLES_H
-#define NDBINFO_TABLES_H
-
-#include <ndbinfo.h>
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-/** Reserved for DBINFO only */
-DECLARE_NDBINFO_TABLE(ndbinfo_TABLES,3)
-     = { "TABLES", 3, 0,
-        {
-          {"TABLE_ID",  NDBINFO_TYPE_NUMBER},
-          {"TABLE_NAME",NDBINFO_TYPE_STRING},
-          {"CREATE_SQL",NDBINFO_TYPE_STRING},
-        }};
-
-/** Reserved for DBINFO only */
-DECLARE_NDBINFO_TABLE(ndbinfo_COLUMNS,4)
-     = { "COLUMNS", 4, 0,
-        {
-          {"TABLE_ID",    NDBINFO_TYPE_NUMBER},
-          {"COLUMN_ID",   NDBINFO_TYPE_NUMBER},
-          {"COLUMN_NAME", NDBINFO_TYPE_STRING},
-          {"COLUMN_TYPE", NDBINFO_TYPE_STRING},
-        }};
-
-DECLARE_NDBINFO_TABLE(ndbinfo_MEMUSAGE,6)
-     = { "MEMUSAGE", 6, 0,
-        {
-          {"RESOURCE_NAME",    NDBINFO_TYPE_STRING},
-          {"NODE_ID",          NDBINFO_TYPE_NUMBER},
-          {"PAGE_SIZE_KB",     NDBINFO_TYPE_NUMBER},
-          {"PAGES_USED",       NDBINFO_TYPE_NUMBER},
-          {"PAGES_TOTAL",      NDBINFO_TYPE_NUMBER},
-          {"BLOCK",            NDBINFO_TYPE_STRING},
-        }};
-
-DECLARE_NDBINFO_TABLE(ndbinfo_LOGDESTINATION,5) =
-{ "LOGDESTINATION", 5, 0,
- {
-   {"NODE_ID",NDBINFO_TYPE_NUMBER},
-   {"TYPE",NDBINFO_TYPE_STRING},
-   {"PARAMS",NDBINFO_TYPE_STRING},
-   {"CURRENT_SIZE",NDBINFO_TYPE_NUMBER},
-   {"MAX_SIZE",NDBINFO_TYPE_NUMBER},
- }
-};
-
-DECLARE_NDBINFO_TABLE(ndbinfo_BACKUP_RECORDS,11)
-= { "BACKUP_RECORDS", 11, 0,
-   {
-     {"NODE_ID",          NDBINFO_TYPE_NUMBER},
-     {"BACKUP_RECORD",    NDBINFO_TYPE_NUMBER},
-     {"BACKUP_ID",        NDBINFO_TYPE_NUMBER},
-     {"MASTER_REF",       NDBINFO_TYPE_NUMBER},
-     {"CLIENT_REF",       NDBINFO_TYPE_NUMBER},
-     {"STATE",            NDBINFO_TYPE_NUMBER},
-     {"BYTES",            NDBINFO_TYPE_NUMBER},
-     {"RECORDS",          NDBINFO_TYPE_NUMBER},
-     {"LOG_BYTES",        NDBINFO_TYPE_NUMBER},
-     {"LOG_RECORDS",      NDBINFO_TYPE_NUMBER},
-     {"ERROR_CODE",       NDBINFO_TYPE_NUMBER},
-   }
-};
-
-DECLARE_NDBINFO_TABLE(ndbinfo_BACKUP_PARAMETERS,14)
-= { "BACKUP_PARAMETERS", 14, 0,
-   {
-     {"NODE_ID",                  NDBINFO_TYPE_NUMBER},
-     {"CURRENT_DISK_WRITE_SPEED", NDBINFO_TYPE_NUMBER},
-     {"BYTES_WRITTEN_THIS_PERIOD",NDBINFO_TYPE_NUMBER},
-     {"OVERFLOW_DISK_WRITE",      NDBINFO_TYPE_NUMBER},
-     {"RESET_DELAY_USED",         NDBINFO_TYPE_NUMBER},
-     {"RESET_DISK_SPEED_TIME",    NDBINFO_TYPE_NUMBER},
-     {"BACKUP_POOL_SIZE",         NDBINFO_TYPE_NUMBER},
-     {"BACKUP_FILE_POOL_SIZE",    NDBINFO_TYPE_NUMBER},
-     {"TABLE_POOL_SIZE",          NDBINFO_TYPE_NUMBER},
-     {"TRIGGER_POOL_SIZE",        NDBINFO_TYPE_NUMBER},
-     {"FRAGMENT_POOL_SIZE",       NDBINFO_TYPE_NUMBER},
-     {"PAGE_POOL_SIZE",           NDBINFO_TYPE_NUMBER},
-     {"COMPRESSED_BACKUP",        NDBINFO_TYPE_NUMBER},
-     {"COMPRESSED_LCP",           NDBINFO_TYPE_NUMBER},
-   }
-};
-
-DECLARE_NDBINFO_TABLE(ndbinfo_POOLS,5)
-= { "POOLS", 5, 0,
-   {
-     {"NODE_ID",                  NDBINFO_TYPE_NUMBER},
-     {"BLOCK",                    NDBINFO_TYPE_STRING},
-     {"POOL_NAME",                NDBINFO_TYPE_STRING},
-     {"FREE",                     NDBINFO_TYPE_NUMBER},
-     {"SIZE",                     NDBINFO_TYPE_NUMBER},
-   }
-};
-
-static Uint32 number_ndbinfo_tables= 7;
-
-#define DBINFOTBL(x) (struct ndbinfo_table*)&x
-
-struct ndbinfo_table *ndbinfo_tables[] = {
-  DBINFOTBL(ndbinfo_TABLES),
-  DBINFOTBL(ndbinfo_COLUMNS),
-  DBINFOTBL(ndbinfo_MEMUSAGE),
-  DBINFOTBL(ndbinfo_LOGDESTINATION),
-  DBINFOTBL(ndbinfo_BACKUP_RECORDS),
-  DBINFOTBL(ndbinfo_BACKUP_PARAMETERS),
-  DBINFOTBL(ndbinfo_POOLS),
-};
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2009-10-23 11:41:13 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/Dblqh.hpp	2009-11-08 12:52:27 +0000
@@ -2106,6 +2106,7 @@ private:
   void execREAD_PSEUDO_REQ(Signal* signal);
   void execSIGNAL_DROPPED_REP(Signal* signal);
 
+  void execDBINFO_SCANREQ(Signal* signal); 
   void execDUMP_STATE_ORD(Signal* signal);
   void execACC_ABORTCONF(Signal* signal);
   void execNODE_FAILREP(Signal* signal);

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2009-10-20 20:15:56 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp	2009-11-08 12:52:27 +0000
@@ -374,6 +374,7 @@ Dblqh::Dblqh(Block_context& ctx, Uint32 
   addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &Dblqh::execSUB_GCP_COMPLETE_REP);
   addRecSignal(GSN_FSWRITEREQ,
                &Dblqh::execFSWRITEREQ);
+  addRecSignal(GSN_DBINFO_SCANREQ, &Dblqh::execDBINFO_SCANREQ);
 
   initData();
 

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2009-11-02 17:15:29 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2009-11-08 12:53:35 +0000
@@ -73,6 +73,7 @@
 #include <SectionReader.hpp>
 #include <signaldata/SignalDroppedRep.hpp>
 #include <signaldata/FsReadWriteReq.hpp>
+#include <signaldata/DbinfoScan.hpp>
 #include <NdbEnv.h>
 
 #include "../suma/Suma.hpp"
@@ -21343,6 +21344,61 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
 
 }//Dblqh::execDUMP_STATE_ORD()
 
+
+void Dblqh::execDBINFO_SCANREQ(Signal *signal)
+{
+  DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
+  const Ndbinfo::ScanCursor* cursor =
+    (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(&req);
+  Ndbinfo::Ratelimit rl;
+
+  jamEntry();
+
+  if(req.tableId == Ndbinfo::LOG_SPACE_TABLEID)
+  {
+    Uint32 logpart = cursor->data[0];
+    while(logpart < clogPartFileSize)
+    {
+      jam();
+
+      logPartPtr.i = logpart;
+      ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
+
+      LogFileRecordPtr logFile;
+      logFile.i = logPartPtr.p->currentLogfile;
+      ptrCheckGuard(logFile, clogFileFileSize, logFileRecord);
+
+      LogPosition head = { logFile.p->fileNo, logFile.p->currentMbyte };
+      LogPosition tail = { logPartPtr.p->logTailFileNo,
+                           logPartPtr.p->logTailMbyte};
+      Uint64 mb = free_log(head, tail, logPartPtr.p->noLogFiles, clogFileSize);
+      Uint64 total = logPartPtr.p->noLogFiles * Uint64(clogFileSize);
+      Uint64 high = 0; // TODO
+
+      Ndbinfo::Row row(signal, req);
+      row.write_uint32(0);              // log id, always 0 in LQH
+      row.write_uint32(0);              // log type, 0 = REDO
+      row.write_uint32(logpart);        // log part
+      row.write_uint32(getOwnNodeId());
+      row.write_uint64(total);          // total allocated
+      row.write_uint64((total-mb));     // currently in use
+      row.write_uint64(high);           // in use high water mark
+      ndbinfo_send_row(signal, req, row, rl);
+
+      if (rl.need_break(req))
+      {
+        jam();
+        ndbinfo_send_scan_break(signal, req, rl, logpart);
+        return;
+      }
+      logpart++;
+    }
+  }
+
+  ndbinfo_send_scan_conf(signal, req, rl);
+}
+
+
 /* **************************************************************** */
 /* ---------------------------------------------------------------- */
 /* ---------------------- TRIGGER HANDLING ------------------------ */

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2009-10-26 20:51:04 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2009-11-08 12:52:27 +0000
@@ -88,8 +88,6 @@
 
 #include <signaldata/DbinfoScan.hpp>
 #include <signaldata/TransIdAI.hpp>
-#include <ndbinfo.h>
-#include <dbinfo/ndbinfo_tableids.h>
 
 
 // Use DEBUG to print messages that should be
@@ -12374,14 +12372,13 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
 void Dbtc::execDBINFO_SCANREQ(Signal *signal)
 {
   DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
-  char buf[512];
-  struct dbinfo_row r;
-  struct dbinfo_ratelimit rl;
-  dbinfo_ratelimit_init(&rl, &req);
+  const Ndbinfo::ScanCursor* cursor =
+    (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(&req);
+  Ndbinfo::Ratelimit rl;
 
   jamEntry();
 
-  if(req.tableId == NDBINFO_POOLS_TABLEID)
+  if(req.tableId == Ndbinfo::POOLS_TABLEID)
   {
     struct {
       const char* poolname;
@@ -12410,62 +12407,30 @@ void Dbtc::execDBINFO_SCANREQ(Signal *si
           { NULL, 0, 0}
         };
 
-    for(int i=0; pools[i].poolname; i++)
-    {
-      dbinfo_write_row_init(&r, buf, sizeof(buf));
-      dbinfo_write_row_column_uint32(&r, getOwnNodeId());
-      const char *blockname= "DBTC";
-      dbinfo_write_row_column(&r, blockname, strlen(blockname));
-      dbinfo_write_row_column(&r, pools[i].poolname, strlen(pools[i].poolname));
-      dbinfo_write_row_column_uint32(&r, pools[i].free);
-      dbinfo_write_row_column_uint32(&r, pools[i].size);
-      dbinfo_send_row(signal, r, rl, req.apiTxnId, req.senderRef);
+    Uint32 pool = cursor->data[0];
+    BlockNumber bn = blockToMain(number());
+    while(pools[pool].poolname)
+    {
+      jam();
+      Ndbinfo::Row row(signal, req);
+      row.write_uint32(getOwnNodeId());
+      row.write_uint32(bn);           // block number
+      row.write_uint32(instance()); // block instance
+      row.write_string(pools[pool].poolname);
+      row.write_uint32(pools[pool].free);
+      row.write_uint32(pools[pool].size);
+      ndbinfo_send_row(signal, req, row, rl);
+      pool++;
+      if (rl.need_break(req))
+      {
+        jam();
+        ndbinfo_send_scan_break(signal, req, rl, pool);
+        return;
+      }
     }
   }
 
-  /*  if(req->tableId ==
-  dbinfo_ratelimit_init(&rl, &req);
-
-    if(!(req.requestInfo & DbinfoScanReq::StartScan))
-      startid= req.cur_item;
-
-    for(i=startid;dbinfo_ratelimit_continue(&rl) && i<number_ndbinfo_tables;i++)
-    {
-      jam();
-      dbinfo_write_row_init(&r, buf, sizeof(buf));
-      ndbinfo_create_sql(ndbinfo_tables[i],
-                         create_sql, sizeof(create_sql));
-
-      dbinfo_write_row_column(&r, (char*)&i, sizeof(i));
-      dbinfo_write_row_column(&r, ndbinfo_tables[i]->name,
-                              strlen(ndbinfo_tables[i]->name));
-      dbinfo_write_row_column(&r, create_sql, strlen(create_sql));
-
-      dbinfo_send_row(signal,r,rl,apiTxnId,senderRef);
-    }
-
-    if(!dbinfo_ratelimit_continue(&rl) && i < number_ndbinfo_tables)
-    {
-      jam();
-      dbinfo_ratelimit_sendconf(signal,req,rl,i);
-    }
-    else
-    {
-      DbinfoScanConf *conf= (DbinfoScanConf*)signal->getDataPtrSend();
-      conf->tableId= req.tableId;
-      conf->senderRef= req.senderRef;
-      conf->apiTxnId= req.apiTxnId;
-      conf->requestInfo= 0;
-      sendSignal(req.senderRef, GSN_DBINFO_SCANCONF, signal,
-                 DbinfoScanConf::SignalLength, JBB);
-    }
-  */
-
-  DbinfoScanConf *conf= (DbinfoScanConf*)signal->getDataPtrSend();
-  memcpy(conf,&req, DbinfoScanReq::SignalLengthWithCursor * sizeof(Uint32));
-  conf->requestInfo &= ~(DbinfoScanConf::MoreData);
-  sendSignal(DBINFO_REF, GSN_DBINFO_SCANCONF,
-             signal, DbinfoScanConf::SignalLengthWithCursor, JBB);
+  ndbinfo_send_scan_conf(signal, req, rl);
 }
 
 bool

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2009-10-27 12:08:44 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2009-11-08 12:52:27 +0000
@@ -3050,6 +3050,8 @@ private:
   Uint32 czero;
   Uint32 cCopyProcedure;
   Uint32 cCopyLastSeg;
+  Uint32 cTotPages;
+  Uint32 cTotNoFragPages;
 
  // A little bit bigger to cover overwrites in copy algorithms (16384 real size).
 #define ZATTR_BUFFER_SIZE 16384

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupDebug.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupDebug.cpp	2009-05-27 15:21:45 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupDebug.cpp	2009-11-08 12:52:27 +0000
@@ -30,8 +30,6 @@
 
 #include <signaldata/DbinfoScan.hpp>
 #include <signaldata/TransIdAI.hpp>
-#include <ndbinfo.h>
-#include <dbinfo/ndbinfo_tableids.h>
 
 
 /* **************************************************************** */
@@ -76,16 +74,28 @@ void Dbtup::execDBINFO_SCANREQ(Signal* s
 {
   jamEntry();
   DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
-  const Uint32 reqlength= signal->getLength();
+  const Ndbinfo::ScanCursor* cursor =
+    (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(&req);
 
-  char buf[1024];
-  struct dbinfo_row r;
-  struct dbinfo_ratelimit rl;
+  Ndbinfo::Ratelimit rl;
 
-  dbinfo_ratelimit_init(&rl, &req);
+  if(req.tableId == Ndbinfo::MEMUSAGE_TABLEID)
+  {
+    jam();
+    Ndbinfo::Row row(signal, req);
+    row.write_uint32(getOwnNodeId());
+    row.write_uint32(blockToMain(number())); // block number
+    row.write_uint32(instance());            // block instance
+    row.write_string("DataMemory");
+    row.write_uint32(sizeof(Page));
+    row.write_uint32(cTotNoFragPages);       // alloced pages
+    row.write_uint32(cTotPages);             // number of pages
+    ndbinfo_send_row(signal, req, row, rl);
 
-  if(req.tableId == NDBINFO_POOLS_TABLEID)
+  }
+  if(req.tableId == Ndbinfo::POOLS_TABLEID)
   {
+    jam();
     struct {
       const char* poolname;
       Uint32 free;
@@ -116,25 +126,55 @@ void Dbtup::execDBINFO_SCANREQ(Signal* s
           { NULL, 0, 0}
         };
 
-    for(int i=0; pools[i].poolname; i++)
+    Uint32 pool = cursor->data[0];
+    BlockNumber bn = blockToMain(number());
+    while(pools[pool].poolname)
+    {
+      jam();
+      Ndbinfo::Row row(signal, req);
+      row.write_uint32(getOwnNodeId());
+      row.write_uint32(bn);           // block number
+      row.write_uint32(instance()); // block instance
+      row.write_string(pools[pool].poolname);
+      row.write_uint32(pools[pool].free);
+      row.write_uint32(pools[pool].size);
+      ndbinfo_send_row(signal, req, row, rl);
+      pool++;
+      if (rl.need_break(req))
+      {
+        jam();
+        ndbinfo_send_scan_break(signal, req, rl, pool);
+        return;
+      }
+    }
+  }
+  if(req.tableId == Ndbinfo::TEST_TABLEID)
+  {
+    Uint32 counter = cursor->data[0];
+    BlockNumber bn = blockToMain(number());
+    while(counter < 1000)
     {
-      dbinfo_write_row_init(&r, buf, sizeof(buf));
-      dbinfo_write_row_column_uint32(&r, getOwnNodeId());
-      const char *blockname= "DBTUP";
-      dbinfo_write_row_column(&r, blockname, strlen(blockname));
-      dbinfo_write_row_column(&r, pools[i].poolname, strlen(pools[i].poolname));
-      dbinfo_write_row_column_uint32(&r, pools[i].free);
-      dbinfo_write_row_column_uint32(&r, pools[i].size);
-      dbinfo_send_row(signal, r, rl, req.apiTxnId, req.senderRef);
+      jam();
+      Ndbinfo::Row row(signal, req);
+      row.write_uint32(getOwnNodeId());
+      row.write_uint32(bn);           // block number
+      row.write_uint32(instance()); // block instance
+      row.write_uint32(counter);
+      Uint64 counter2 = counter;
+      counter2 = counter2 << 32;
+      row.write_uint64(counter2);
+      ndbinfo_send_row(signal, req, row, rl);
+      counter++;
+      if (rl.need_break(req))
+      {
+        jam();
+        ndbinfo_send_scan_break(signal, req, rl, counter);
+        return;
+      }
     }
   }
 
-  DbinfoScanConf *conf= (DbinfoScanConf*)signal->getDataPtrSend();
-  memcpy(conf,&req, reqlength * sizeof(Uint32));
-  conf->requestInfo &= ~(DbinfoScanConf::MoreData);
-  sendSignal(DBINFO_REF, GSN_DBINFO_SCANCONF,
-             signal, DbinfoScanConf::SignalLengthWithCursor, JBB);
-  ndbout_c("DBTUP done doing DBINFO");
+  ndbinfo_send_scan_conf(signal, req, rl);
 }
 
 #ifdef VM_TRACE

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2009-10-08 11:15:24 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2009-11-08 12:52:27 +0000
@@ -56,6 +56,7 @@ void Dbtup::initData() 
   // Records with constant sizes
   init_list_sizes();
   cpackedListIndex = 0;
+  cTotNoFragPages = 0;
 }//Dbtup::initData()
 
 Dbtup::Dbtup(Block_context& ctx, Uint32 instanceNumber)
@@ -484,6 +485,11 @@ void Dbtup::initRecords() 
     m_ctx.m_config.getOwnConfigIterator();
   ndbrequire(p != 0);
 
+  // Check available data memory pages
+  Uint64 dataMem;
+  ndb_mgm_get_int64_parameter(p, CFG_DB_DATA_MEM, &dataMem);
+  cTotPages = (Uint32)(dataMem / (ZWORDS_ON_PAGE * 4));
+
   // Records with dynamic sizes
   void* ptr = m_ctx.m_mm.get_memroot();
   c_page_pool.set((Page*)ptr, (Uint32)~0);

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupPageMap.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupPageMap.cpp	2009-05-27 12:11:46 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupPageMap.cpp	2009-11-08 12:52:27 +0000
@@ -262,7 +262,7 @@ Dbtup::allocFragPage(Fragrecord* regFrag
   }
   
   regFragPtr->noOfPages = cnt + 1;
-  
+  cTotNoFragPages++; 
   c_page_pool.getPtr(pagePtr);
   init_page(regFragPtr, pagePtr, pageId);
   
@@ -370,6 +370,7 @@ Dbtup::allocFragPage(Tablerec* tabPtrP, 
   }
   
   fragPtrP->noOfPages = cnt + 1;
+  cTotNoFragPages++; 
   if (page_no + 1 > max)
   {
     jam();
@@ -427,6 +428,7 @@ Dbtup::releaseFragPage(Fragrecord* fragP
   }
 
   fragPtrP->noOfPages = cnt - 1;
+  cTotNoFragPages++;
   if (DBUG_PAGE_MAP)
     ndbout_c("release(%u %u)@%s", logicalPageId, pagePtr.i, where);
   do_check_page_map(fragPtrP);

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp	2009-05-27 15:21:45 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxDebug.cpp	2009-11-08 12:52:27 +0000
@@ -21,20 +21,18 @@
 
 #include <signaldata/DbinfoScan.hpp>
 #include <signaldata/TransIdAI.hpp>
-#include <ndbinfo.h>
-#include <dbinfo/ndbinfo_tableids.h>
+
 
 void Dbtux::execDBINFO_SCANREQ(Signal *signal)
 {
   DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
-  char buf[512];
-  struct dbinfo_row r;
-  struct dbinfo_ratelimit rl;
-  dbinfo_ratelimit_init(&rl, &req);
+  const Ndbinfo::ScanCursor* cursor =
+    (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(&req);
+  Ndbinfo::Ratelimit rl;
 
   jamEntry();
 
-  if(req.tableId == NDBINFO_POOLS_TABLEID)
+  if(req.tableId == Ndbinfo::POOLS_TABLEID)
   {
     struct {
       const char* poolname;
@@ -66,24 +64,30 @@ void Dbtux::execDBINFO_SCANREQ(Signal *s
           { NULL, 0, 0}
         };
 
-    for(int i=0; pools[i].poolname; i++)
+    Uint32 pool = cursor->data[0];
+    BlockNumber bn = blockToMain(number());
+    while(pools[pool].poolname)
     {
-      dbinfo_write_row_init(&r, buf, sizeof(buf));
-      dbinfo_write_row_column_uint32(&r, getOwnNodeId());
-      const char *blockname= "DBTC";
-      dbinfo_write_row_column(&r, blockname, strlen(blockname));
-      dbinfo_write_row_column(&r, pools[i].poolname, strlen(pools[i].poolname));
-      dbinfo_write_row_column_uint32(&r, pools[i].free);
-      dbinfo_write_row_column_uint32(&r, pools[i].size);
-      dbinfo_send_row(signal, r, rl, req.apiTxnId, req.senderRef);
+      jam();
+      Ndbinfo::Row row(signal, req);
+      row.write_uint32(getOwnNodeId());
+      row.write_uint32(bn);           // block number
+      row.write_uint32(instance()); // block instance
+      row.write_string(pools[pool].poolname);
+      row.write_uint32(pools[pool].free);
+      row.write_uint32(pools[pool].size);
+      ndbinfo_send_row(signal, req, row, rl);
+      pool++;
+      if (rl.need_break(req))
+      {
+        jam();
+        ndbinfo_send_scan_break(signal, req, rl, pool);
+        return;
+      }
     }
   }
 
-  DbinfoScanConf *conf= (DbinfoScanConf*)signal->getDataPtrSend();
-  memcpy(conf,&req, DbinfoScanReq::SignalLengthWithCursor * sizeof(Uint32));
-  conf->requestInfo &= ~(DbinfoScanConf::MoreData);
-  sendSignal(DBINFO_REF, GSN_DBINFO_SCANCONF,
-             signal, DbinfoScanConf::SignalLengthWithCursor, JBB);
+  ndbinfo_send_scan_conf(signal, req, rl);
 }
 
 /*

=== modified file 'storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp'
--- a/storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp	2009-10-08 11:15:24 +0000
+++ b/storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp	2009-11-08 12:52:27 +0000
@@ -45,8 +45,6 @@
 
 #include <signaldata/DbinfoScan.hpp>
 #include <signaldata/TransIdAI.hpp>
-#include <ndbinfo.h>
-#include <dbinfo/ndbinfo_tableids.h>
 
 /**************************************************************************
  * ------------------------------------------------------------------------
@@ -761,15 +759,13 @@ DbUtil::execDUMP_STATE_ORD(Signal* signa
 void DbUtil::execDBINFO_SCANREQ(Signal *signal)
 {
   DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
-  char buf[512];
-  struct dbinfo_row r;
-  struct dbinfo_ratelimit rl;
-
-  dbinfo_ratelimit_init(&rl, &req);
+  const Ndbinfo::ScanCursor* cursor =
+    (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(&req);
+  Ndbinfo::Ratelimit rl;
 
   jamEntry();
 
-  if(req.tableId == NDBINFO_POOLS_TABLEID)
+  if(req.tableId == Ndbinfo::POOLS_TABLEID)
   {
     struct {
       const char* poolname;
@@ -801,24 +797,30 @@ void DbUtil::execDBINFO_SCANREQ(Signal *
           { NULL, 0, 0}
         };
 
-    for(int i=0; pools[i].poolname; i++)
+    Uint32 pool = cursor->data[0];
+    BlockNumber bn = blockToMain(number());
+    while(pools[pool].poolname)
     {
-      dbinfo_write_row_init(&r, buf, sizeof(buf));
-      dbinfo_write_row_column_uint32(&r, getOwnNodeId());
-      const char *blockname= "DBUTIL";
-      dbinfo_write_row_column(&r, blockname, strlen(blockname));
-      dbinfo_write_row_column(&r, pools[i].poolname, strlen(pools[i].poolname));
-      dbinfo_write_row_column_uint32(&r, pools[i].free);
-      dbinfo_write_row_column_uint32(&r, pools[i].size);
-      dbinfo_send_row(signal, r, rl, req.apiTxnId, req.senderRef);
+      jam();
+      Ndbinfo::Row row(signal, req);
+      row.write_uint32(getOwnNodeId());
+      row.write_uint32(bn);           // block number
+      row.write_uint32(instance()); // block instance
+      row.write_string(pools[pool].poolname);
+      row.write_uint32(pools[pool].free);
+      row.write_uint32(pools[pool].size);
+      ndbinfo_send_row(signal, req, row, rl);
+      pool++;
+      if (rl.need_break(req))
+      {
+        jam();
+        ndbinfo_send_scan_break(signal, req, rl, pool);
+        return;
+      }
     }
   }
 
-  DbinfoScanConf *conf= (DbinfoScanConf*)signal->getDataPtrSend();
-  memcpy(conf,&req, DbinfoScanReq::SignalLengthWithCursor * sizeof(Uint32));
-  conf->requestInfo &= ~(DbinfoScanConf::MoreData);
-  sendSignal(DBINFO_REF, GSN_DBINFO_SCANCONF,
-             signal, DbinfoScanConf::SignalLengthWithCursor, JBB);
+  ndbinfo_send_scan_conf(signal, req, rl);
 }
 void
 DbUtil::mutex_created(Signal* signal, Uint32 ptrI, Uint32 retVal){

=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.cpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2009-10-08 11:15:24 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2009-11-08 12:52:27 +0000
@@ -63,8 +63,6 @@
 
 #include <signaldata/DbinfoScan.hpp>
 #include <signaldata/TransIdAI.hpp>
-#include <ndbinfo.h>
-#include <dbinfo/ndbinfo_tableids.h>
 
 #include <EventLogger.hpp>
 extern EventLogger * g_eventLogger;
@@ -1544,15 +1542,13 @@ Suma::execDUMP_STATE_ORD(Signal* signal)
 void Suma::execDBINFO_SCANREQ(Signal *signal)
 {
   DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
-  char buf[512];
-  struct dbinfo_row r;
-  struct dbinfo_ratelimit rl;
-
-  dbinfo_ratelimit_init(&rl, &req);
+  const Ndbinfo::ScanCursor* cursor =
+    (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(&req);
+  Ndbinfo::Ratelimit rl;
 
   jamEntry();
 
-  if(req.tableId == NDBINFO_POOLS_TABLEID)
+  if(req.tableId == Ndbinfo::POOLS_TABLEID)
   {
     struct {
       const char* poolname;
@@ -1587,24 +1583,30 @@ void Suma::execDBINFO_SCANREQ(Signal *si
           { NULL, 0, 0}
         };
 
-    for(int i=0; pools[i].poolname; i++)
+    Uint32 pool = cursor->data[0];
+    BlockNumber bn = blockToMain(number());
+    while(pools[pool].poolname)
     {
-      dbinfo_write_row_init(&r, buf, sizeof(buf));
-      dbinfo_write_row_column_uint32(&r, getOwnNodeId());
-      const char *blockname= "SUMA";
-      dbinfo_write_row_column(&r, blockname, strlen(blockname));
-      dbinfo_write_row_column(&r, pools[i].poolname, strlen(pools[i].poolname));
-      dbinfo_write_row_column_uint32(&r, pools[i].free);
-      dbinfo_write_row_column_uint32(&r, pools[i].size);
-      dbinfo_send_row(signal, r, rl, req.apiTxnId, req.senderRef);
+      jam();
+      Ndbinfo::Row row(signal, req);
+      row.write_uint32(getOwnNodeId());
+      row.write_uint32(bn);           // block number
+      row.write_uint32(instance()); // block instance
+      row.write_string(pools[pool].poolname);
+      row.write_uint32(pools[pool].free);
+      row.write_uint32(pools[pool].size);
+      ndbinfo_send_row(signal, req, row, rl);
+      pool++;
+      if (rl.need_break(req))
+      {
+        jam();
+        ndbinfo_send_scan_break(signal, req, rl, pool);
+        return;
+      }
     }
   }
 
-  DbinfoScanConf *conf= (DbinfoScanConf*)signal->getDataPtrSend();
-  memcpy(conf,&req, DbinfoScanReq::SignalLengthWithCursor * sizeof(Uint32));
-  conf->requestInfo &= ~(DbinfoScanConf::MoreData);
-  sendSignal(DBINFO_REF, GSN_DBINFO_SCANCONF,
-             signal, DbinfoScanConf::SignalLengthWithCursor, JBB);
+  ndbinfo_send_scan_conf(signal, req, rl);
 }
 
 /*************************************************************

=== modified file 'storage/ndb/src/kernel/blocks/trix/Trix.cpp'
--- a/storage/ndb/src/kernel/blocks/trix/Trix.cpp	2009-05-27 15:21:45 +0000
+++ b/storage/ndb/src/kernel/blocks/trix/Trix.cpp	2009-11-08 12:52:27 +0000
@@ -39,9 +39,6 @@
 
 #include <signaldata/DbinfoScan.hpp>
 #include <signaldata/TransIdAI.hpp>
-#include <ndbinfo.h>
-#include <dbinfo/ndbinfo_tableids.h>
-
 #include <signaldata/WaitGCP.hpp>
 
 #define CONSTRAINT_VIOLATION 893
@@ -495,15 +492,13 @@ Trix::execDUMP_STATE_ORD(Signal* signal)
 void Trix::execDBINFO_SCANREQ(Signal *signal)
 {
   DbinfoScanReq req= *(DbinfoScanReq*)signal->theData;
-  char buf[512];
-  struct dbinfo_row r;
-  struct dbinfo_ratelimit rl;
-
-  dbinfo_ratelimit_init(&rl, &req);
+  const Ndbinfo::ScanCursor* cursor =
+    (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtr(&req);
+  Ndbinfo::Ratelimit rl;
 
   jamEntry();
 
-  if(req.tableId == NDBINFO_POOLS_TABLEID)
+  if(req.tableId == Ndbinfo::POOLS_TABLEID)
   {
     struct {
       const char* poolname;
@@ -520,24 +515,30 @@ void Trix::execDBINFO_SCANREQ(Signal *si
           { NULL, 0, 0}
         };
 
-    for(int i=0; pools[i].poolname; i++)
+    Uint32 pool = cursor->data[0];
+    BlockNumber bn = blockToMain(number());
+    while(pools[pool].poolname)
     {
-      dbinfo_write_row_init(&r, buf, sizeof(buf));
-      dbinfo_write_row_column_uint32(&r, getOwnNodeId());
-      const char *blockname= "TRIX";
-      dbinfo_write_row_column(&r, blockname, strlen(blockname));
-      dbinfo_write_row_column(&r, pools[i].poolname, strlen(pools[i].poolname));
-      dbinfo_write_row_column_uint32(&r, pools[i].free);
-      dbinfo_write_row_column_uint32(&r, pools[i].size);
-      dbinfo_send_row(signal, r, rl, req.apiTxnId, req.senderRef);
+      jam();
+      Ndbinfo::Row row(signal, req);
+      row.write_uint32(getOwnNodeId());
+      row.write_uint32(bn);           // block number
+      row.write_uint32(instance()); // block instance
+      row.write_string(pools[pool].poolname);
+      row.write_uint32(pools[pool].free);
+      row.write_uint32(pools[pool].size);
+      ndbinfo_send_row(signal, req, row, rl);
+      pool++;
+      if (rl.need_break(req))
+      {
+        jam();
+        ndbinfo_send_scan_break(signal, req, rl, pool);
+        return;
+      }
     }
   }
 
-  DbinfoScanConf *conf= (DbinfoScanConf*)signal->getDataPtrSend();
-  memcpy(conf,&req, DbinfoScanReq::SignalLengthWithCursor * sizeof(Uint32));
-  conf->requestInfo &= ~(DbinfoScanConf::MoreData);
-  sendSignal(DBINFO_REF, GSN_DBINFO_SCANCONF,
-             signal, DbinfoScanConf::SignalLengthWithCursor, JBB);
+  ndbinfo_send_scan_conf(signal, req, rl);
 }
 
 // Build index

=== modified file 'storage/ndb/src/kernel/vm/CMakeLists.txt'
--- a/storage/ndb/src/kernel/vm/CMakeLists.txt	2008-11-16 09:15:35 +0000
+++ b/storage/ndb/src/kernel/vm/CMakeLists.txt	2009-11-08 12:52:27 +0000
@@ -36,6 +36,7 @@ ADD_LIBRARY(ndbkernel STATIC
             WOPool.cpp
 	    GlobalData.cpp
 	    SafeMutex.cpp
+            Ndbinfo.cpp
 )
 ADD_LIBRARY(ndbsched STATIC
             TimeQueue.cpp

=== modified file 'storage/ndb/src/kernel/vm/Makefile.am'
--- a/storage/ndb/src/kernel/vm/Makefile.am	2009-05-27 15:21:45 +0000
+++ b/storage/ndb/src/kernel/vm/Makefile.am	2009-11-08 12:52:27 +0000
@@ -38,7 +38,8 @@ libkernel_a_SOURCES = \
 	ndbd_malloc_impl.cpp \
 	Pool.cpp WOPool.cpp RWPool.cpp DynArr256.cpp LockQueue.cpp \
 	GlobalData.cpp \
-	SafeMutex.cpp
+	SafeMutex.cpp \
+	Ndbinfo.cpp
 
 libsched_a_SOURCES = TimeQueue.cpp		\
                      ThreadConfig.cpp \

=== added file 'storage/ndb/src/kernel/vm/Ndbinfo.cpp'
--- a/storage/ndb/src/kernel/vm/Ndbinfo.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/Ndbinfo.cpp	2009-11-08 12:52:27 +0000
@@ -0,0 +1,129 @@
+/*
+   Copyright (C) 2009 Sun Microsystems Inc.
+   All rights reserved. Use is subject to license terms.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include "Ndbinfo.hpp"
+#include "SimulatedBlock.hpp"
+#include <kernel/AttributeHeader.hpp>
+#include <signaldata/TransIdAI.hpp>
+
+Ndbinfo::Row::Row(Signal* signal, DbinfoScanReq& req) :
+  col_counter(0),
+  m_req(req)
+{
+  // Use the "temporary" part of signal->theData as row buffer
+  start = signal->getDataPtrSend() + DbinfoScanReq::SignalLength;
+  const Uint32 data_sz = sizeof(signal->theData)/sizeof(signal->theData[0]);
+  end = signal->getDataPtrSend() + data_sz;
+  assert(start < end);
+
+  curr = start;
+}
+
+bool
+Ndbinfo::Row::check_buffer_space(AttributeHeader& ah) const
+{
+  const Uint32 needed =  ah.getHeaderSize() + ah.getDataSize();
+  const Uint32 avail = (Uint32)(end - curr);
+
+  if(needed > avail)
+  {
+    ndbout_c("Warning, too small row buffer for attribute: %d, "
+             "needed: %d, avail: %d", ah.getAttributeId(), needed, avail);
+    assert(false);
+    return false; // Not enough room in row buffer
+  }
+  return true;
+}
+
+void
+Ndbinfo::Row::check_attribute_type(AttributeHeader& ah, ColumnType type) const
+{
+  const Table& tab = getTable(m_req.tableId);
+  const Uint32 colid = ah.getAttributeId();
+  assert(colid < tab.m.ncols);
+  assert(tab.col[colid].coltype == type);
+}
+
+void
+Ndbinfo::Row::write_string(const char* str)
+{
+  const size_t clen = strlen(str) + 1;
+  // Create AttributeHeader
+  AttributeHeader ah(col_counter++, clen);
+  check_attribute_type(ah, Ndbinfo::String);
+  if (!check_buffer_space(ah))
+    return;
+
+  // Write AttributeHeader to buffer
+  ah.insertHeader(curr);
+  curr += ah.getHeaderSize();
+
+  // Write data to buffer
+  memcpy(curr, str, clen);
+  curr += ah.getDataSize();
+
+  assert(curr <= end);
+  return;
+}
+
+void
+Ndbinfo::Row::write_uint32(Uint32 value)
+{
+  // Create AttributeHeader
+  AttributeHeader ah(col_counter++, sizeof(Uint32));
+  check_attribute_type(ah, Ndbinfo::Number);
+  if (!check_buffer_space(ah))
+    return;
+
+  // Write AttributeHeader to buffer
+  ah.insertHeader(curr);
+  curr += ah.getHeaderSize();
+
+  // Write data to buffer
+  memcpy(curr, &value, sizeof(Uint32));
+  curr += ah.getDataSize();
+
+  assert(curr <= end);
+  return;
+}
+
+void
+Ndbinfo::Row::write_uint64(Uint64 value)
+{
+  // Create AttributeHeader
+  AttributeHeader ah(col_counter++, sizeof(Uint64));
+  check_attribute_type(ah, Ndbinfo::Number64);
+  if (!check_buffer_space(ah))
+    return;
+
+  // Write AttributeHeader to buffer
+  ah.insertHeader(curr);
+  curr += ah.getHeaderSize();
+
+  // Write data to buffer
+  memcpy(curr, &value, sizeof(Uint64));
+  curr += ah.getDataSize();
+
+  assert(curr <= end);
+  return;
+}
+
+
+#include "NdbinfoTables.cpp"
+
+

=== added file 'storage/ndb/src/kernel/vm/Ndbinfo.hpp'
--- a/storage/ndb/src/kernel/vm/Ndbinfo.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/Ndbinfo.hpp	2009-11-08 12:52:27 +0000
@@ -0,0 +1,161 @@
+/*
+   Copyright (C) 2009 Sun Microsystems Inc.
+   All rights reserved. Use is subject to license terms.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef KERNEL_NDBINFO_HPP
+#define KERNEL_NDBINFO_HPP
+
+#include <signaldata/DbinfoScan.hpp>
+
+class Ndbinfo {
+public:
+
+  enum ColumnType {
+    String = 1,
+    Number = 2,
+    Number64 = 3
+  };
+
+  struct Column {
+    const char* name;
+    ColumnType coltype;
+    const char* comment;
+  };
+
+  enum TableId {
+    TABLES_TABLEID =             0,
+    COLUMNS_TABLEID =            1,
+    MEMUSAGE_TABLEID =           2,
+    LOGDESTINATION_TABLEID =     3,
+    BACKUP_RECORDS_TABLEID =     4,
+    BACKUP_PARAMETERS_TABLEID =  5,
+    POOLS_TABLEID =              6,
+    TEST_TABLEID =               7,
+    TRP_STATUS_TABLEID =         8,
+    LOG_SPACE_TABLEID =          9
+  };
+
+  struct Table {
+    struct Members {
+      const char* name;
+      int ncols;
+      int flags;
+      const char* comment;
+    } m;
+    Column col[1];
+
+    int columns(void) const {
+      return m.ncols;
+    }
+  };
+  static int getNumTables();
+  static const Table& getTable(int i);
+  static const Table& getTable(Uint32 i);
+
+  class Row {
+    friend class SimulatedBlock;
+    Uint32* start;      // Start of row buffer
+    Uint32* curr;       // Current position in row buffer
+    Uint32* end;        // End of buffer
+    int col_counter;    // Current column counter
+    DbinfoScan& m_req;  // The scan parameters
+    Row();              // Not impl
+    Row(const Row&);    // Not impl
+  public:
+
+    Row(class Signal* signal, DbinfoScanReq& req);
+
+    Uint32 getLength(void) const {
+      return (Uint32)(curr - start);
+    }
+
+    Uint32* getDataPtr() const {
+      return start;
+    }
+
+    void write_string(const char* col);
+    void write_uint32(Uint32 value);
+    void write_uint64(Uint64 value);
+
+    int columns(void) const {
+      return col_counter;
+    }
+
+  private:
+    bool check_buffer_space(class AttributeHeader& ah) const;
+    void check_attribute_type(class AttributeHeader& ah, ColumnType) const;
+  };
+
+  struct ScanCursor
+  {
+    Uint32 senderRef;
+    Uint32 saveSenderRef;
+    Uint32 currRef; // The current node, block and instance
+    Uint32 saveCurrRef;
+    /**
+     * Flags
+     *
+     m = More data         - 1  Bit 1
+
+               1111111111222222222233
+     01234567890123456789012345678901
+     m
+    */
+    Uint32 flags;
+    Uint32 data[4];  // Cursor data
+
+    Uint32 totalRows;
+    Uint32 totalBytes;
+    STATIC_CONST( Length = 10 );
+
+    STATIC_CONST( MOREDATA_SHIFT = 0 );
+    STATIC_CONST( MOREDATA_MASK = 1 );
+
+    static bool getHasMoreData(const UintR & flags){
+      return (bool)((flags >> MOREDATA_SHIFT) & MOREDATA_MASK);
+    }
+    static void setHasMoreData(UintR & flags, bool value){
+      flags = (flags & ~(MOREDATA_MASK << MOREDATA_SHIFT)) |
+              ((value & MOREDATA_MASK) << MOREDATA_SHIFT);
+    }
+  };
+
+  class Ratelimit {
+    friend class SimulatedBlock;
+    Uint32 rows;
+    Uint32 bytes;
+    Ratelimit(const Ratelimit&);// Not impl
+  public:
+    Ratelimit() :
+      rows(0),
+      bytes(0){
+    }
+
+    bool need_break(const DbinfoScan& scan) const {
+      // Automatically limit the maxRows value
+      Uint32 maxRows = min(scan.maxRows ? scan.maxRows : 256, 256);
+
+      if (maxRows != 0 && rows >= maxRows)
+        return true; // More than max rows already sent
+      if (scan.maxBytes != 0 &&  bytes >= scan.maxBytes)
+        return true; // More than max bytes already sent
+      return false;
+    }
+  };
+};
+
+#endif

=== added file 'storage/ndb/src/kernel/vm/NdbinfoTables.cpp'
--- a/storage/ndb/src/kernel/vm/NdbinfoTables.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/kernel/vm/NdbinfoTables.cpp	2009-11-08 12:52:27 +0000
@@ -0,0 +1,193 @@
+/*
+   Copyright (C) 2009 Sun Microsystems Inc.
+   All rights reserved. Use is subject to license terms.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include "Ndbinfo.hpp"
+
+
+#define DECLARE_NDBINFO_TABLE(var, num)  \
+static const struct  {                   \
+  Ndbinfo::Table::Members m;             \
+  Ndbinfo::Column col[num];              \
+} ndbinfo_##var 
+
+
+DECLARE_NDBINFO_TABLE(TABLES,3) =
+{ "tables", 3, 0, "",
+  {
+    {"table_id",  Ndbinfo::Number, ""},
+    {"table_name",Ndbinfo::String, ""},
+    {"comment",   Ndbinfo::String, ""},
+  }};
+
+DECLARE_NDBINFO_TABLE(COLUMNS,5) =
+{ "columns", 5, 0, "",
+  {
+    {"table_id",    Ndbinfo::Number, ""},
+    {"column_id",   Ndbinfo::Number, ""},
+    {"column_name", Ndbinfo::String, ""},
+    {"column_type", Ndbinfo::Number, ""},
+    {"comment",     Ndbinfo::String, ""},
+  }};
+
+DECLARE_NDBINFO_TABLE(MEMUSAGE,7) =
+{ "memusage", 7, 0, "",
+  {
+    {"node_id",          Ndbinfo::Number, ""},
+    {"block_number",     Ndbinfo::Number, ""},
+    {"block_instance",   Ndbinfo::Number, ""},
+    {"resource_name",    Ndbinfo::String, ""},
+    {"page_size",        Ndbinfo::Number, ""},
+    {"pages_used",       Ndbinfo::Number, ""},
+    {"pages_total",      Ndbinfo::Number, ""},
+  }};
+
+DECLARE_NDBINFO_TABLE(LOGDESTINATION,5) =
+{ "logdestination", 5, 0, "",
+  {
+    {"node_id",          Ndbinfo::Number, ""},
+    {"type",             Ndbinfo::String, ""},
+    {"params",           Ndbinfo::String, ""},
+    {"current_size",     Ndbinfo::Number, ""},
+    {"max_size",         Ndbinfo::Number, ""},
+  }
+};
+
+DECLARE_NDBINFO_TABLE(BACKUP_RECORDS,11) =
+{ "backup_records", 11, 0, "",
+  {
+    {"node_id",          Ndbinfo::Number, ""},
+    {"backup_record",    Ndbinfo::Number, ""},
+    {"backup_id",        Ndbinfo::Number, ""},
+    {"master_ref",       Ndbinfo::Number, ""},
+    {"client_ref",       Ndbinfo::Number, ""},
+    {"state",            Ndbinfo::Number, ""},
+    {"bytes",            Ndbinfo::Number, ""},
+    {"records",          Ndbinfo::Number, ""},
+    {"log_bytes",        Ndbinfo::Number, ""},
+    {"log_records",      Ndbinfo::Number, ""},
+    {"error_code",       Ndbinfo::Number, ""},
+  }
+};
+
+DECLARE_NDBINFO_TABLE(BACKUP_PARAMETERS,14) =
+{ "backup_parameters", 14, 0, "",
+  {
+    {"node_id",                  Ndbinfo::Number, ""},
+    {"current_disk_write_speed", Ndbinfo::Number, ""},
+    {"bytes_written_this_period",Ndbinfo::Number, ""},
+    {"overflow_disk_write",      Ndbinfo::Number, ""},
+    {"reset_delay_used",         Ndbinfo::Number, ""},
+    {"reset_disk_speed_time",    Ndbinfo::Number, ""},
+    {"backup_pool_size",         Ndbinfo::Number, ""},
+    {"backup_file_pool_size",    Ndbinfo::Number, ""},
+    {"table_pool_size",          Ndbinfo::Number, ""},
+    {"trigger_pool_size",        Ndbinfo::Number, ""},
+    {"fragment_pool_size",       Ndbinfo::Number, ""},
+    {"page_pool_size",           Ndbinfo::Number, ""},
+    {"compressed_backup",        Ndbinfo::Number, ""},
+    {"compressed_lcp",           Ndbinfo::Number, ""},
+  }
+};
+
+DECLARE_NDBINFO_TABLE(POOLS,6) =
+{ "pools", 6, 0, "",
+  {
+    {"node_id",                  Ndbinfo::Number, ""},
+    {"block_number",             Ndbinfo::Number, ""},
+    {"block_instance",           Ndbinfo::Number, ""},
+    {"pool_name",                Ndbinfo::String, ""},
+    {"free",                     Ndbinfo::Number, ""},
+    {"size",                     Ndbinfo::Number, ""},
+  }
+};
+
+DECLARE_NDBINFO_TABLE(TEST,5) =
+{ "test", 5, 0, "",
+  {
+    {"node_id",                 Ndbinfo::Number, ""},
+    {"block_number",            Ndbinfo::Number, ""},
+    {"block_instance",          Ndbinfo::Number, ""},
+    {"counter",                 Ndbinfo::Number, ""},
+    {"counter2",                Ndbinfo::Number64, ""},
+  }
+};
+
+DECLARE_NDBINFO_TABLE(TRP_STATUS, 3) =
+{ "trp_status", 3, 0, "",
+  {
+    {"node_id",                 Ndbinfo::Number, ""},
+    {"remote_node_id",          Ndbinfo::Number, ""},
+    {"status",                  Ndbinfo::Number, ""},
+  }
+};
+
+DECLARE_NDBINFO_TABLE(LOG_SPACE, 7) =
+{ "log_space", 7, 0, "",
+  {
+    {"log_id",   Ndbinfo::Number, ""},
+    {"log_type", Ndbinfo::Number, "0 = REDO, 1 = DD-UNDO"},
+    {"log_part", Ndbinfo::Number, ""},
+    {"node_id",  Ndbinfo::Number, ""},
+    {"total",    Ndbinfo::Number64, "total allocated, Mb"},
+    {"used",     Ndbinfo::Number64, "currently in use, Mb"},
+    {"used_hi",  Ndbinfo::Number64, "in use high water mark, Mb"},
+  }
+};
+
+
+#define DBINFOTBL(x) Ndbinfo::x##_TABLEID, (Ndbinfo::Table*)&ndbinfo_##x
+
+static
+struct ndbinfo_table_list_entry {
+  Ndbinfo::TableId id;
+  Ndbinfo::Table * table;
+} ndbinfo_tables[] = {
+  // NOTE! the tables must be added to the list in the same order
+  // as they are in "enum TableId"
+  DBINFOTBL(TABLES),
+  DBINFOTBL(COLUMNS),
+  DBINFOTBL(MEMUSAGE),
+  DBINFOTBL(LOGDESTINATION),
+  DBINFOTBL(BACKUP_RECORDS),
+  DBINFOTBL(BACKUP_PARAMETERS),
+  DBINFOTBL(POOLS),
+  DBINFOTBL(TEST),
+  DBINFOTBL(TRP_STATUS),
+  DBINFOTBL(LOG_SPACE)
+};
+
+static int no_ndbinfo_tables =
+  sizeof(ndbinfo_tables) / sizeof(ndbinfo_tables[0]);
+
+
+int Ndbinfo::getNumTables(){
+  return no_ndbinfo_tables;
+}
+
+const Ndbinfo::Table& Ndbinfo::getTable(int i)
+{
+  assert(i >= 0 && i < number_ndbinfo_tables);
+  ndbinfo_table_list_entry& entry = ndbinfo_tables[i];
+  assert(entry.id == i);
+  return *entry.table;
+}
+
+const Ndbinfo::Table& Ndbinfo::getTable(Uint32 i)
+{
+  return getTable((int)i);
+}

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2009-11-04 16:49:02 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2009-11-08 12:53:35 +0000
@@ -43,6 +43,7 @@
 #include <signaldata/FsRef.hpp>
 #include <signaldata/SignalDroppedRep.hpp>
 #include <signaldata/LocalRouteOrd.hpp>
+#include <signaldata/TransIdAI.hpp>
 #include <DebuggerNames.hpp>
 #include "LongSignal.hpp"
 
@@ -4034,3 +4035,98 @@ SimulatedBlock::wakeup()
   globalTransporterRegistry.wakeup();
 #endif
 }
+
+
+void
+SimulatedBlock::ndbinfo_send_row(Signal* signal,
+                                 const DbinfoScanReq& req,
+                                 const Ndbinfo::Row& row,
+                                 Ndbinfo::Ratelimit& rl) const
+{
+  // Check correct number of columns against table
+  assert(row.columns() == Ndbinfo::getTable(req.tableId).columns());
+
+  TransIdAI *tidai= (TransIdAI*)signal->getDataPtrSend();
+  tidai->connectPtr= req.resultData;
+  tidai->transId[0]= req.transId[0];
+  tidai->transId[1]= req.transId[1];
+
+  LinearSectionPtr ptr[3];
+  ptr[0].p = row.getDataPtr();
+  ptr[0].sz = row.getLength();
+
+  rl.rows++;
+  rl.bytes += row.getLength();
+
+  sendSignal(req.resultRef, GSN_DBINFO_TRANSID_AI,
+             signal, TransIdAI::HeaderLength, JBB, ptr, 1);
+}
+
+
+void
+SimulatedBlock::ndbinfo_send_scan_break(Signal* signal,
+                                       DbinfoScanReq& req,
+                                       const Ndbinfo::Ratelimit& rl,
+                                       Uint32 data1, Uint32 data2,
+                                       Uint32 data3, Uint32 data4) const
+{
+  DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend();
+  const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
+  MEMCOPY_NO_WORDS(conf, &req, signal_length);
+
+  conf->returnedRows = rl.rows;
+
+  // Update the cursor with current item number
+  Ndbinfo::ScanCursor* cursor =
+    (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf);
+
+  cursor->data[0] = data1;
+  cursor->data[1] = data2;
+  cursor->data[2] = data3;
+  cursor->data[3] = data4;
+
+  // Increase number of rows and bytes sent to far
+  cursor->totalRows += rl.rows;
+  cursor->totalBytes += rl.bytes;
+
+  Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, true);
+
+  sendSignal(cursor->senderRef, GSN_DBINFO_SCANCONF, signal,
+             signal_length, JBB);
+}
+
+void
+SimulatedBlock::ndbinfo_send_scan_conf(Signal* signal,
+                                       DbinfoScanReq& req,
+                                       const Ndbinfo::Ratelimit& rl) const
+{
+  DbinfoScanConf* conf= (DbinfoScanConf*)signal->getDataPtrSend();
+  const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
+  Uint32 sender_ref = req.resultRef;
+  MEMCOPY_NO_WORDS(conf, &req, signal_length);
+
+  conf->returnedRows = rl.rows;
+
+  if (req.cursor_sz)
+  {
+    jam();
+    // Update the cursor with current item number
+    Ndbinfo::ScanCursor* cursor =
+      (Ndbinfo::ScanCursor*)DbinfoScan::getCursorPtrSend(conf);
+
+    // Reset all data holders
+    memset(cursor->data, 0, sizeof(cursor->data));
+
+    // Increase number of rows and bytes sent to far
+    cursor->totalRows += rl.rows;
+    cursor->totalBytes += rl.bytes;
+
+    Ndbinfo::ScanCursor::setHasMoreData(cursor->flags, false);
+
+    sender_ref = cursor->senderRef;
+
+  }
+  sendSignal(sender_ref, GSN_DBINFO_SCANCONF, signal,
+             signal_length, JBB);
+}
+

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2009-11-04 16:49:02 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2009-11-08 12:53:35 +0000
@@ -58,6 +58,8 @@
 #include "ndbd_malloc_impl.hpp"
 #include <blocks/record_types.hpp>
 
+#include "Ndbinfo.hpp"
+
 #ifdef VM_TRACE
 #define D(x) \
   do { \
@@ -931,6 +933,22 @@ public:
   void debugOutUnlock() { globalSignalLoggers.unlock(); }
   const char* debugOutTag(char* buf, int line);
 #endif
+
+  void ndbinfo_send_row(Signal* signal,
+                        const DbinfoScanReq& req,
+                        const Ndbinfo::Row& row,
+                        Ndbinfo::Ratelimit& rl) const;
+
+  void ndbinfo_send_scan_break(Signal* signal,
+                               DbinfoScanReq& req,
+                               const Ndbinfo::Ratelimit& rl,
+                               Uint32 data1, Uint32 data2 = 0,
+                               Uint32 data3 = 0, Uint32 data4 = 0) const;
+
+  void ndbinfo_send_scan_conf(Signal* signal,
+                              DbinfoScanReq& req,
+                              const Ndbinfo::Ratelimit& rl) const;
+
 };
 
 // outside blocks e.g. within a struct

=== modified file 'storage/ndb/src/mgmapi/mgmapi.cpp'
--- a/storage/ndb/src/mgmapi/mgmapi.cpp	2009-05-27 15:21:45 +0000
+++ b/storage/ndb/src/mgmapi/mgmapi.cpp	2009-11-08 12:52:27 +0000
@@ -3242,141 +3242,6 @@ int ndb_mgm_drop_nodegroup(NdbMgmHandle 
   DBUG_RETURN(res);
 }
 
-extern "C"
-int
-ndb_mgm_ndbinfo(NdbMgmHandle handle, const char* query, int *rows)
-{
-  int retval= 0;
-  DBUG_ENTER("ndb_mgm_ndbinfo");
-  CHECK_HANDLE(handle, 0);
-  CHECK_CONNECTED(handle, 0);
-
-  Properties args;
-  args.put("query", query);
-
-  const ParserRow<ParserDummy> reply[]= {
-    MGM_CMD("ndbinfo reply", NULL, ""),
-    MGM_ARG("error", Int, Mandatory, "Error code"),
-    MGM_ARG("rows", Int, Optional, "Row Count"),
-    MGM_END()
-  };
-
-  const Properties *prop;
-  prop = ndb_mgm_call(handle, reply, "ndbinfo", &args);
-  CHECK_REPLY(handle, prop, 0);
-
-  Uint32 ndbinfo_err=0;
-
-  if(!prop->get("error",&ndbinfo_err)){
-    fprintf(handle->errstream, "Unable to get error\n");
-    goto err;
-  }
-
-  if(ndbinfo_err)
-  {
-    retval= ndbinfo_err;
-    goto err;
-  }
-
-  Uint64 r;
-
-  if(!prop->get("rows",&r))
-  {
-    fprintf(handle->errstream, "Unable to get number of rows\n");
-    goto err;
-  }
-
-  *rows= (int)r;
-
-err:
-  delete prop;
-  DBUG_RETURN(retval);
-}
-
-int ndb_mgm_ndbinfo_colcount(NdbMgmHandle h)
-{
-  int n;
-  SocketInputStream in(h->socket, h->timeout);
-  char c[100];
-
-  in.gets(c,sizeof(c));
-
-  CHECK_TIMEDOUT_RET(h, in, in, -1);
-
-  n= atoi(c);
-
-  return n;
-}
-
-int ndb_mgm_ndbinfo_getcolums(NdbMgmHandle h,int n, int l, char** c)
-{
-  int i;
-  SocketInputStream in(h->socket, h->timeout);
-
-  for(i=0;i<n;i++)
-  {
-    in.gets(c[i],l);
-    CHECK_TIMEDOUT_RET(h, in, in, 1);
-
-    if(c[i][strlen(c[i])-1]=='\n')
-      c[i][strlen(c[i])-1]='\0';
-  }
-
-  return 0;
-}
-
-char* ndb_mgm_ndbinfo_nextcolumn(char* row, int *len)
-{
-  char *curr= row;
-  char *end= row;
-
-  if(curr[0]=='\'')
-  {
-    curr++;
-    end= strchr(curr,'\'');
-    if(!end)
-    {
-      if((end= strchr(curr,'\n')))
-      {
-        *end='\0';
-        *len= strlen(curr);
-        return curr;
-      }
-      return NULL;
-    }
-    *len= end - curr;
-    end++;
-  }
-  else
-  {
-    end= strchr(curr,',');
-    if(!end)
-    {
-      if((end= strchr(curr,'\n')))
-      {
-        *end='\0';
-        *len= strlen(curr);
-        return curr;
-      }
-      return NULL;
-    }
-    *len= end - curr;
-    end++;
-  }
-
-  return curr;
-}
-
-int ndb_mgm_ndbinfo_getrow(NdbMgmHandle h, char* row, int len)
-{
-  SocketInputStream in(h->socket, h->timeout);
-
-  in.gets(row,len);
-
-  CHECK_TIMEDOUT_RET(h, in, in, 1);
-
-  return 0;
-}
 
 NDB_SOCKET_TYPE _ndb_mgm_get_socket(NdbMgmHandle h)
 {

=== modified file 'storage/ndb/src/mgmclient/CommandInterpreter.cpp'
--- a/storage/ndb/src/mgmclient/CommandInterpreter.cpp	2009-09-08 15:12:34 +0000
+++ b/storage/ndb/src/mgmclient/CommandInterpreter.cpp	2009-11-08 12:52:27 +0000
@@ -140,8 +140,6 @@ public:
   int executeCreateNodeGroup(char* parameters);
   int executeDropNodeGroup(char* parameters);
 
-  int executeNdbInfo(const char* parameters);
-
 public:
   bool connect(bool interactive);
   bool disconnect();
@@ -1242,16 +1240,6 @@ CommandInterpreter::execute_impl(const c
     m_error = executeDropNodeGroup(allAfterFirstToken);
     DBUG_RETURN(true);
   }
-  else if(strcasecmp(firstToken, "NDBINFO")==0)
-  {
-    const char *q=NULL;
-    if(allAfterFirstToken==NULL)
-      q= "SELECT * FROM NDB$INFO.TABLES";
-    else
-      q= allAfterFirstToken;
-    m_error = executeNdbInfo(q);
-    DBUG_RETURN(true);
-  }
   else if (strcasecmp(firstToken, "ALL") == 0) {
     m_error = analyseAfterFirstToken(-1, allAfterFirstToken);
   } else {
@@ -2527,44 +2515,6 @@ CommandInterpreter::executeReport(int pr
   return -1;
 }
 
-int
-CommandInterpreter::executeNdbInfo(const char* parameters)
-{
-  int rows;
-  int r= ndb_mgm_ndbinfo(m_mgmsrv,parameters, &rows);
-
-  if(r)
-  {
-    ndbout_c("Error displaying NDBINFO: %d",r);
-    return -1;
-  }
-
-  int ncol= ndb_mgm_ndbinfo_colcount(m_mgmsrv);
-
-  char **cols= (char**)malloc(ncol*sizeof(char*));
-  for(int i=0;i<ncol;i++)
-    cols[i]= (char*) malloc(100*sizeof(char));
-
-  ndb_mgm_ndbinfo_getcolums(m_mgmsrv,ncol,100,cols);
-
-  for(int i=0;i<ncol;i++)
-  {
-    ndbout << cols[i] << "\t";
-    free(cols[i]);
-  }
-  ndbout << endl;
-
-  while(rows--)
-  {
-    char c[1024];
-    ndb_mgm_ndbinfo_getrow(m_mgmsrv, c, 1024);
-    c[strlen(c)-1]='\0';
-    ndbout_c(c);
-  }
-
-  free(cols);
-  return 0;
-}
 
 static void helpTextReportFn()
 {

=== modified file 'storage/ndb/src/mgmsrv/Makefile.am'
--- a/storage/ndb/src/mgmsrv/Makefile.am	2009-10-01 07:16:52 +0000
+++ b/storage/ndb/src/mgmsrv/Makefile.am	2009-11-08 12:52:27 +0000
@@ -30,7 +30,6 @@ ndb_mgmd_SOURCES = \
 	ConfigInfo.cpp \
 	InitConfigFileParser.cpp \
 	Config.cpp \
-	mgm_ndbinfo.cpp \
 	ConfigManager.cpp
 
 noinst_PROGRAMS = MgmConfig-t

=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp	2009-10-09 16:20:38 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp	2009-11-08 12:52:27 +0000
@@ -62,9 +62,6 @@
 
 #include <SignalSender.hpp>
 
-#include <ndbinfo.h>
-#include <AttributeHeader.hpp>
-
 int g_errorInsert;
 #define ERROR_INSERTED(x) (g_errorInsert == x)
 
@@ -2471,77 +2468,6 @@ const char* MgmtSrvr::getErrorText(int e
   return buf;
 }
 
-void MgmtSrvr::execDBINFO_SCANREQ(NdbApiSignal* signal)
-{
-#if 1
-  (void)signal;
-#else
-  DbinfoScanReq req= *(DbinfoScanReq*) signal->getDataPtr();
-
-  const Uint32 tableId= req.tableId;
-  const Uint32 senderRef= req.senderRef;
-  const Uint32 apiTxnId= req.apiTxnId;
-
-  DbinfoScanReq *oreq= (DbinfoScanReq*)signal->getDataPtrSend();
-
-  memcpy(signal->getDataPtrSend(),&req,signal->getLength()*sizeof(Uint32));
-
-  char buf[1024];
-  struct dbinfo_row r;
-  struct dbinfo_ratelimit rl;
-  int i;
-  int startid= 0;
-
-  switch(req.tableId)
-  {
-  case 3:
-//  case NDBINFO_LOGDESTINATION_TABLEID:
-/*    dbinfo_ratelimit_init(&rl, &req);
-
-    if(!(req.requestInfo & DbinfoScanReq::StartScan))
-      startid= req.cur_item;
-
-    for(i=startid;
-        dbinfo_ratelimit_continue(&rl)
-          && 3+getLogger()->getHandlerCount();
-        i++)
-    {
-      dbinfo_write_row_init(&r, buf, sizeof(buf));
-
-      LogHandler* lh= getLogger()->getHandler(i);
-      if(lh)
-      {
-        BaseString lparams;
-        const char *s;
-        lh->getParams(lparams);
-        dbinfo_write_row_column_uint32(&r, getOwnNodeId());
-        s= lh->handler_type();
-        dbinfo_write_row_column(&r, s, strlen(s));
-        s= lparams.c_str();
-        dbinfo_write_row_column(&r, s, strlen(s));
-        dbinfo_write_row_column_uint32(&r, lh->getCurrentSize());
-        dbinfo_write_row_column_uint32(&r, lh->getMaxSize());
-        dbinfo_send_row(signal,r,rl,apiTxnId,senderRef);
-      }
-    }
-    if(!dbinfo_ratelimit_continue(&rl) && i < number_ndbinfo_tables)
-    {
-      dbinfo_ratelimit_sendconf(signal,req,rl,i);
-    }
-    else
-    {
-      DbinfoScanConf *conf= (DbinfoScanConf*)signal->getDataPtrSend();
-      conf->tableId= req.tableId;
-      conf->senderRef= req.senderRef;
-      conf->apiTxnId= req.apiTxnId;
-      conf->requestInfo= 0;
-      sendSignal(req.senderRef, GSN_DBINFO_SCANCONF, signal,
-                 DbinfoScanConf::SignalLength, JBB);
-    }
-*/    break;
-  }
-#endif
-}
 
 void
 MgmtSrvr::handleReceivedSignal(NdbApiSignal* signal)
@@ -2567,10 +2493,6 @@ MgmtSrvr::handleReceivedSignal(NdbApiSig
   case GSN_TAKE_OVERTCCONF:
     break;
 
-  case GSN_DBINFO_SCANREQ:
-    execDBINFO_SCANREQ(signal);
-    break;
-
   default:
     g_eventLogger->error("Unknown signal received. SignalNumber: "
                          "%i from (%d, 0x%x)",
@@ -3639,235 +3561,6 @@ bool MgmtSrvr::connect_to_self()
   return true;
 }
 
-Logger* MgmtSrvr::getLogger()
-{
-  return g_eventLogger;
-}
-
-int MgmtSrvr::ndbinfo(BaseString table_name, Vector<BaseString> *cols, Vector<BaseString> *rows)
-{
-  int r= ENOENT;
-
-  if(m_ndbinfo_table_names.size()==0 || table_name=="TABLES" || table_name=="COLUMNS")
-  {
-    Vector<BaseString> tmp;
-    Vector<BaseString> tmp2;
-
-    /* Refresh NDBINFO metadata cache */
-    r= ndbinfo(0, &tmp, &tmp2);
-    r= ndbinfo(1, &tmp, &tmp2);
-
-    if(table_name=="TABLES")
-      return ndbinfo(0, cols, rows);
-    if(table_name=="COLUMNS")
-      return ndbinfo(1, cols, rows);
-
-  }
-
-  for(Uint32 i = 2; i<m_ndbinfo_table_names.size(); i++)
-  {
-    if(table_name == m_ndbinfo_table_names[i])
-      return ndbinfo(i, cols, rows);
-  }
-
-  return r;
-}
-
-int MgmtSrvr::ndbinfo(Uint32 tableId, 
-                      Vector<BaseString> *cols, Vector<BaseString> *rows)
-{
-  SignalSender ss(theFacade);
-  ss.lock();
-
-  SimpleSignal ssig;
-  DbinfoScanReq *req= CAST_PTR(DbinfoScanReq, ssig.getDataPtrSend());
-  req->tableId= tableId;
-  req->senderRef= ss.getOwnRef();
-  req->apiTxnId= 1;
-  req->requestInfo= DbinfoScanReq::AllColumns | DbinfoScanReq::StartScan;
-  req->colBitmapLo= ~0;
-  req->colBitmapHi= ~0;
-  req->maxRows= 2;
-  req->maxBytes= 0;
-  req->rows_total= 0;
-  req->word_total= 0;
-
-  ssig.set(ss, TestOrd::TraceAPI, DBINFO, GSN_DBINFO_SCANREQ,
-           DbinfoScanReq::SignalLength);
-
-  NodeId nodeId = m_master_node;
-  if (okToSendTo(nodeId, false) != 0)
-  {
-    bool next;
-    nodeId = m_master_node = 0;
-    while((next = getNextNodeId(&nodeId, NDB_MGM_NODE_TYPE_NDB)) == true &&
-          okToSendTo(nodeId, false) != 0);
-    if(!next)
-      return NO_CONTACT_WITH_DB_NODES;
-  }
-
-  int do_send= 1;
-
-  int ncols;
-  if(m_ndbinfo_column_types.size() >= tableId+1)
-  {
-    ncols= m_ndbinfo_column_types[tableId].size();
-    *cols= m_ndbinfo_column_names[tableId];
-  }
-  else
-  {
-    if(tableId==0)
-      ncols= 3;
-    else // tableid = 1
-      ncols= 4;
-  }
-
-  while(true)
-  {
-    if(do_send)
-    {
-      if(ss.sendSignal(nodeId, &ssig) != SEND_OK)
-        return SEND_OR_RECEIVE_FAILED;
-
-      do_send= 0;
-    }
-
-    SimpleSignal *signal= ss.waitFor();
-
-    int gsn= signal->readSignalNumber();
-
-    int len;
-    BaseString b, rowstr;
-    int i;
-    char *row;
-    Uint32 rowsz;
-    Uint32 rec_tableid;
-    int rec_colid;
-    DbinfoScanConf *conf;
-    Uint32 coltype;
-
-    switch(gsn)
-    {
-    case GSN_DBINFO_TRANSID_AI:
-      row= (char*)signal->ptr[0].p;
-      rowsz= signal->ptr[0].sz;
-
-      rowstr.clear();
-
-      for(i=0; i<ncols; i++)
-      {
-        AttributeHeader ah(*(Uint32*)row);
-        row+=ah.getHeaderSize()*sizeof(Uint32);
-        len= ah.getByteSize();
-
-        len= ah.getByteSize();
-
-        if(tableId==0)
-        {
-          if(i==0)
-            rec_tableid= *(Uint32*)row;
-          if(i==1)
-          {
-            b.assign(row,len);
-            m_ndbinfo_table_names.set(b, rec_tableid, b);
-            b.clear();
-          }
-        }
-
-        if(tableId==1)
-        {
-          if(i==0)
-            rec_tableid= *(Uint32*)row;
-          if(i==1)
-            rec_colid= *(Uint32*)row;
-          if(i==2)
-          {
-            if(m_ndbinfo_column_names.size() <= rec_tableid)
-            {
-              Vector<BaseString> v;
-              m_ndbinfo_column_names.fill(rec_tableid+1, v);
-            }
-            b.assign(row,len);
-            m_ndbinfo_column_names[rec_tableid].set(b,(unsigned)rec_colid,b);
-            b.clear();
-          }
-          if(i==3)
-          {
-            if(m_ndbinfo_column_types.size() <= rec_tableid)
-            {
-              Vector<Uint32> v;
-              m_ndbinfo_column_types.fill(rec_tableid+1, v);
-            }
-            coltype= (strncmp("BIGINT",row,len)==0)?2:1;
-
-            m_ndbinfo_column_types[rec_tableid].set(coltype,
-                                                    (unsigned)rec_colid,
-                                                    coltype);
-          }
-        }
-        
-        if(m_ndbinfo_column_types.size() > tableId
-           && m_ndbinfo_column_types[tableId].size() > (unsigned)i)
-        {
-          switch(m_ndbinfo_column_types[tableId][i])
-          {
-          case NDBINFO_TYPE_NUMBER:
-            rowstr.appfmt("%u",*(Uint32*)row);
-            break;
-          case NDBINFO_TYPE_STRING:
-            b.assign(row,len);
-            for(char*c= (char*)b.c_str(); *c!='\0'; c++)
-              if(*c=='\n')
-                *c= ' ';
-            rowstr.append(b);
-            b.clear();
-            break;
-          }
-
-          if(i!=ncols-1)
-            rowstr.append(",");
-        }
-
-        row+=len;
-
-      }
-
-      rows->push_back(rowstr);
-      rowstr.clear();
-
-      break;
-    case GSN_DBINFO_SCANCONF:
-      conf= (DbinfoScanConf*) signal->getDataPtr();
-
-      if(conf->requestInfo & DbinfoScanConf::MoreData)
-      {
-        memcpy(req,conf,signal->header.theLength*sizeof(Uint32));
-        req->requestInfo &= ~(DbinfoScanReq::StartScan);
-        ssig.set(ss, TestOrd::TraceAPI, req->cursor.cur_block, 
-                 GSN_DBINFO_SCANREQ, DbinfoScanReq::SignalLengthWithCursor);
-        nodeId= req->cursor.cur_node;
-
-        do_send= 1;
-        continue;
-      }
-      else
-      {
-        return 0;
-      }
-      break;
-    case GSN_API_REGCONF:
-    case GSN_TAKE_OVERTCCONF:
-      // Ignore;
-      break;
-    default:
-      report_unknown_signal(signal);
-      return SEND_OR_RECEIVE_FAILED;
-    }
-  }
-
-  return 0;
-}
-
 
 bool
 MgmtSrvr::change_config(Config& new_config, BaseString& msg)

=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.hpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.hpp	2009-05-27 15:21:45 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.hpp	2009-11-08 12:52:27 +0000
@@ -446,15 +446,6 @@ private:
   int check_nodes_stopping();
   int check_nodes_single_user();
 
-
-  Logger*  getLogger();
-
-  int ndbinfo(Uint32 tableId, Vector<BaseString> *cols, Vector<BaseString> *rows);
-
-  int ndbinfo(BaseString table_name,
-              Vector<BaseString> *cols,
-              Vector<BaseString> *rows);
-
   //**************************************************************************
 
   const MgmtOpts& m_opts;
@@ -463,10 +454,6 @@ private:
   Uint32 m_port;
   SocketServer m_socket_server;
 
-  Vector<BaseString> m_ndbinfo_table_names;
-  Vector< Vector<Uint32> > m_ndbinfo_column_types;
-  Vector< Vector<BaseString> > m_ndbinfo_column_names;
-
   NdbMutex* m_local_config_mutex;
   const Config* m_local_config;
 
@@ -483,7 +470,6 @@ private:
 
   void handleReceivedSignal(NdbApiSignal* signal);
   void handleStatus(NodeId nodeId, bool alive, bool nfComplete);
-  void execDBINFO_SCANREQ(NdbApiSignal* signal);
 
   /**
      Callback function installed into TransporterFacade, will be called

=== modified file 'storage/ndb/src/mgmsrv/Services.cpp'
--- a/storage/ndb/src/mgmsrv/Services.cpp	2009-10-27 16:50:41 +0000
+++ b/storage/ndb/src/mgmsrv/Services.cpp	2009-11-08 12:52:27 +0000
@@ -36,8 +36,6 @@
 #include <base64.h>
 #include <ndberror.h>
 
-#include <ndbinfo.h>
-
 extern bool g_StopServer;
 extern bool g_RestartServer;
 extern EventLogger * g_eventLogger;
@@ -292,9 +290,6 @@ ParserRow<MgmApiSession> commands[] = {
   MGM_CMD("drop nodegroup", &MgmApiSession::drop_nodegroup, ""),
     MGM_ARG("ng", Int, Mandatory, "Nodegroup"),
 
-  MGM_CMD("ndbinfo", &MgmApiSession::getNdbInfo, ""),
-    MGM_ARG("query", String, Mandatory, "SQL-Like Query"),
-
   MGM_CMD("show config", &MgmApiSession::showConfig, ""),
     MGM_ARG("Section", String, Optional, "Section name"),
     MGM_ARG("NodeId", Int, Optional, "Nodeid"),
@@ -2065,42 +2060,6 @@ done:
   m_output->println("");
 }
 
-int print_ndbinfo_table_mgm(struct ndbinfo_table* t, BaseString &out);
-
-void MgmApiSession::getNdbInfo(Parser_t::Context &ctx, Properties const &args)
-{
-  BaseString query;
-  args.get("query", query);
-
-  Vector<BaseString> columns;
-  Vector<BaseString> rows;
-
-  m_output->println("ndbinfo reply");
-
-  int r= m_mgmsrv.ndbinfo(query, &columns, &rows);
-
-  if(r)
-  {
-    m_output->println("error: %d",r);
-    m_output->println("");
-    return;
-  }
-
-  m_output->println("error: 0");
-  m_output->println("rows: %d",rows.size());
-  m_output->println("");
-
-  m_output->println("%d",columns.size());
-
-  for(unsigned i = 0; i < columns.size(); i++)
-    m_output->println(columns[i].c_str());
-
-  for(unsigned i = 0; i < rows.size(); i++)
-    m_output->println(rows[i].c_str());
-
-  return ;
-}
-
 
 void MgmApiSession::showConfig(Parser_t::Context &ctx, Properties const &args)
 {

=== removed file 'storage/ndb/src/mgmsrv/mgm_ndbinfo.cpp'
--- a/storage/ndb/src/mgmsrv/mgm_ndbinfo.cpp	2008-10-06 06:53:04 +0000
+++ b/storage/ndb/src/mgmsrv/mgm_ndbinfo.cpp	1970-01-01 00:00:00 +0000
@@ -1,14 +0,0 @@
-
-#include <BaseString.hpp>
-#include <ndbinfo.h>
-
-int print_ndbinfo_table_mgm(struct ndbinfo_table* t, BaseString &out)
-{
-  int i;
-  out.appfmt("%d\n",t->ncols);
-
-  for(i=0;i<t->ncols;i++)
-    out.appfmt("%s\n",t->col[i].name);
-
-  return 0;
-}

=== modified file 'storage/ndb/src/ndbapi/CMakeLists.txt'
--- a/storage/ndb/src/ndbapi/CMakeLists.txt	2009-10-19 11:56:57 +0000
+++ b/storage/ndb/src/ndbapi/CMakeLists.txt	2009-11-08 12:52:27 +0000
@@ -54,4 +54,7 @@ ADD_LIBRARY(ndbapi STATIC
             ndb_cluster_connection.cpp
             NdbBlob.cpp
             SignalSender.cpp
-            ObjectMap.cpp)
+            ObjectMap.cpp
+            NdbInfo.cpp
+            NdbInfoScanOperation.cpp)
+

=== modified file 'storage/ndb/src/ndbapi/Makefile.am'
--- a/storage/ndb/src/ndbapi/Makefile.am	2009-10-19 11:56:57 +0000
+++ b/storage/ndb/src/ndbapi/Makefile.am	2009-11-08 12:52:27 +0000
@@ -58,7 +58,9 @@ libndbapi_la_SOURCES = \
 	NdbIndexStat.cpp \
         SignalSender.cpp \
         ObjectMap.cpp \
-	NdbInterpretedCode.cpp
+	NdbInterpretedCode.cpp \
+	NdbInfo.cpp \
+	NdbInfoScanOperation.cpp
 
 INCLUDES_LOC = -I$(top_srcdir)/storage/ndb/src/mgmapi
 

=== added file 'storage/ndb/src/ndbapi/NdbInfo.cpp'
--- a/storage/ndb/src/ndbapi/NdbInfo.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/ndbapi/NdbInfo.cpp	2009-11-08 12:52:27 +0000
@@ -0,0 +1,523 @@
+/*
+   Copyright (C) 2009 Sun Microsystems Inc.
+   All rights reserved. Use is subject to license terms.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+#include "NdbInfo.hpp"
+
+#include <ndbapi/ndb_cluster_connection.hpp>
+
+NdbInfo::NdbInfo(class Ndb_cluster_connection* connection,
+                 const char* prefix, const char* dbname,
+                 const char* table_prefix) :
+  m_connect_count(connection->get_connect_count()),
+  m_connection(connection),
+  m_tables_table(NULL), m_columns_table(NULL),
+  m_prefix(prefix),
+  m_dbname(dbname),
+  m_table_prefix(table_prefix),
+  m_id_counter(0)
+{
+}
+
+bool NdbInfo::init(void)
+{
+  if (pthread_mutex_init(&m_mutex, MY_MUTEX_INIT_FAST))
+    return false;
+  if (!load_hardcoded_tables())
+    return false;
+  return true;
+}
+
+NdbInfo::~NdbInfo(void)
+{
+  pthread_mutex_destroy(&m_mutex);
+}
+
+BaseString NdbInfo::mysql_table_name(const char* table_name) const
+{
+  DBUG_ENTER("mysql_table_name");
+  BaseString mysql_name;
+  mysql_name.assfmt("%s%s", m_prefix.c_str(), table_name);
+  DBUG_PRINT("exit", ("mysql_name: %s", mysql_name.c_str()));
+  DBUG_RETURN(mysql_name);
+}
+
+bool NdbInfo::load_hardcoded_tables(void)
+{
+  {
+    Table tabs("tables", 0);
+    if (!tabs.addColumn(Column("table_id", 0, Column::Number)) ||
+        !tabs.addColumn(Column("table_name", 1, Column::String)) ||
+        !tabs.addColumn(Column("comment", 2, Column::String)))
+      return false;
+
+    BaseString hash_key = mysql_table_name(tabs.getName());
+    if (!m_tables.insert(hash_key.c_str(), tabs))
+      return false;
+    if (!m_tables.search(hash_key.c_str(), &m_tables_table))
+      return false;
+  }
+
+  {
+    Table cols("columns", 1);
+    if (!cols.addColumn(Column("table_id", 0, Column::Number)) ||
+        !cols.addColumn(Column("column_id", 1, Column::Number)) ||
+        !cols.addColumn(Column("column_name", 2, Column::String)) ||
+        !cols.addColumn(Column("column_type", 3, Column::String)) ||
+        !cols.addColumn(Column("comment", 4, Column::String)))
+      return false;
+
+    BaseString hash_key = mysql_table_name(cols.getName());
+    if (!m_tables.insert(hash_key.c_str(), cols))
+      return false;
+    if (!m_tables.search(hash_key.c_str(), &m_columns_table))
+      return false;
+  }
+
+  return true;
+}
+
+bool NdbInfo::addColumn(Uint32 tableId, Column aCol)
+{
+  Table * table = NULL;
+
+  // Find the table with correct id
+  for (size_t i = 0; i < m_tables.entries(); i++)
+  {
+    table = m_tables.value(i);
+    if (table->m_table_id == tableId)
+      break;
+  }
+
+  table->addColumn(aCol);
+
+  return true;
+}
+
+bool NdbInfo::load_ndbinfo_tables(void)
+{
+  DBUG_ENTER("load_ndbinfo_tables");
+  assert(m_tables_table && m_columns_table);
+
+  {
+    // Create tables by scanning the TABLES table
+    NdbInfoScanOperation* scanOp = NULL;
+    if (createScanOperation(m_tables_table, &scanOp) != 0)
+      DBUG_RETURN(false);
+
+    if (scanOp->readTuples() != 0)
+    {
+      releaseScanOperation(scanOp);
+      DBUG_RETURN(false);
+    }
+
+    const NdbInfoRecAttr *tableIdRes = scanOp->getValue("table_id");
+    const NdbInfoRecAttr *tableNameRes = scanOp->getValue("table_name");
+    if (!tableIdRes || !tableNameRes)
+    {
+      releaseScanOperation(scanOp);
+      DBUG_RETURN(false);
+    }
+
+    if (scanOp->execute() != 0)
+    {
+      releaseScanOperation(scanOp);
+      DBUG_RETURN(false);
+    }
+
+    int err;
+    while ((err = scanOp->nextResult()) == 1)
+    {
+      Uint32 tableId = tableIdRes->u_32_value();
+      const char * tableName = tableNameRes->c_str();
+      DBUG_PRINT("info", ("table: '%s', id: %u",
+                 tableName, tableId));
+      switch (tableId) {
+      case 0:
+        assert(strcmp(tableName, "tables") == 0);
+        break;
+      case 1:
+        assert(strcmp(tableName, "columns") == 0);
+        break;
+
+      default:
+        BaseString hash_key = mysql_table_name(tableName);
+        if (!m_tables.insert(hash_key.c_str(),
+                             Table(tableName, tableId)))
+        {
+          DBUG_PRINT("error", ("Failed to insert Table('%s', %u)",
+                     tableName, tableId));
+          releaseScanOperation(scanOp);
+          DBUG_RETURN(false);
+        }
+      }
+    }
+    releaseScanOperation(scanOp);
+
+   if (err != 0)
+      DBUG_RETURN(false);
+  }
+
+  {
+    // Fill tables with columns by scanning the COLUMNS table
+    NdbInfoScanOperation* scanOp = NULL;
+    if (createScanOperation(m_columns_table, &scanOp) != 0)
+      DBUG_RETURN(false);
+
+    if (scanOp->readTuples() != 0)
+    {
+      releaseScanOperation(scanOp);
+      DBUG_RETURN(false);
+    }
+
+    const NdbInfoRecAttr *tableIdRes = scanOp->getValue("table_id");
+    const NdbInfoRecAttr *columnIdRes = scanOp->getValue("column_id");
+    const NdbInfoRecAttr *columnNameRes = scanOp->getValue("column_name");
+    const NdbInfoRecAttr *columnTypeRes = scanOp->getValue("column_type");
+    if (!tableIdRes || !columnIdRes || !columnNameRes || !columnTypeRes)
+    {
+      releaseScanOperation(scanOp);
+      DBUG_RETURN(false);
+    }
+    if (scanOp->execute() != 0)
+    {
+      releaseScanOperation(scanOp);
+      DBUG_RETURN(false);
+    }
+
+    int err;
+    while ((err = scanOp->nextResult()) == 1)
+    {
+      Uint32 tableId = tableIdRes->u_32_value();
+      Uint32 columnId = columnIdRes->u_32_value();
+      const char * columnName = columnNameRes->c_str();
+      Uint32 columnType = columnTypeRes->u_32_value();
+      DBUG_PRINT("info",
+                 ("tableId: %u, columnId: %u, column: '%s', type: %d",
+                 tableId, columnId, columnName, columnType));
+      switch (tableId) {
+      case 0:
+      case 1:
+        // Ignore columns for TABLES and COLUMNS tables since
+        // those are already known(hardcoded)
+        break;
+
+      default:
+      {
+        Column::Type type;
+        switch(columnType)
+        {
+        case 1:
+          type = Column::String;
+          break;
+        case 2:
+          type = Column::Number;
+          break;
+        case 3:
+          type = Column::Number64;
+          break;
+        default:
+        {
+          DBUG_PRINT("error", ("Unknown columntype: %d", columnType));
+          releaseScanOperation(scanOp);
+          DBUG_RETURN(false);
+        }
+        }
+
+        Column column(columnName, columnId, type);
+
+        // Find the table with given id
+
+        if (!addColumn(tableId, column))
+        {
+          DBUG_PRINT("error", ("Failed to add column for %d, %d, '%s', %d)",
+                     tableId, columnId, columnName, columnType));
+          releaseScanOperation(scanOp);
+          DBUG_RETURN(false);
+        }
+        break;
+      }
+      }
+    }
+    releaseScanOperation(scanOp);
+
+    if (err != 0)
+      DBUG_RETURN(false);
+  }
+  DBUG_RETURN(true);
+}
+
+bool NdbInfo::load_tables()
+{
+  if (!load_ndbinfo_tables())
+  {
+    // Remove any dynamic tables that might have been partially created
+    flush_tables();
+    return false;
+  }
+
+  // After sucessfull load of the tables, set connect count
+  m_connect_count = m_connection->get_connect_count();
+  return true;
+}
+
+int NdbInfo::createScanOperation(const Table* table,
+                                 NdbInfoScanOperation** ret_scan_op,
+                                 Uint32 max_rows, Uint32 max_bytes)
+{
+  NdbInfoScanOperation* scan_op = new NdbInfoScanOperation(*this, m_connection,
+                                                           table, max_rows,
+                                                           max_bytes);
+  if (!scan_op)
+    return ERR_OutOfMemory;
+  // Global id counter, not critical if you get same id on two instances
+  // since reference is also part of the unique identifier.
+  if (!scan_op->init(m_id_counter++))
+  {
+    delete scan_op;
+    return ERR_ClusterFailure;
+  }
+
+  *ret_scan_op = scan_op;
+
+  return 0;
+}
+
+void NdbInfo::releaseScanOperation(NdbInfoScanOperation* scan_op) const
+{
+  delete scan_op;
+}
+
+void NdbInfo::flush_tables()
+{
+  // Delete all but the hardcoded tables
+  for (size_t i = NUM_HARDCODED_TABLES; i < m_tables.entries(); i++)
+    m_tables.remove(i);
+  assert(m_tables.entries() == NUM_HARDCODED_TABLES);
+}
+
+bool
+NdbInfo::check_tables()
+{
+  if (m_connection->get_connect_count() != m_connect_count)
+  {
+    // Connect count has changed -> flush the cached table definitions
+    flush_tables();
+  }
+  if (m_tables.entries() <= NUM_HARDCODED_TABLES)
+  {
+    // Global table cache is not loaded yet or has been
+    // flushed, try to load it
+    if (!load_tables())
+    {
+      return false;
+    }
+  }
+  // Make sure that some dynamic tables have been loaded
+  assert(m_tables.entries() > NUM_HARDCODED_TABLES);
+  return true;
+}
+
+
+int
+NdbInfo::openTable(const char* table_name,
+                   const NdbInfo::Table** table_copy)
+{
+  pthread_mutex_lock(&m_mutex);
+
+  if (!check_tables()){
+    pthread_mutex_unlock(&m_mutex);
+    return ERR_ClusterFailure;
+  }
+
+  Table* tab;
+  if (!m_tables.search(table_name, &tab))
+  {
+    // No such table existed
+    pthread_mutex_unlock(&m_mutex);
+    return ERR_NoSuchTable;
+  }
+
+  // Return a _copy_ of the table
+  *table_copy = new Table(*tab);
+
+  pthread_mutex_unlock(&m_mutex);
+  return 0;
+}
+
+int
+NdbInfo::openTable(Uint32 tableId,
+                   const NdbInfo::Table** table_copy)
+{
+  pthread_mutex_lock(&m_mutex);
+
+  if (!check_tables()){
+    pthread_mutex_unlock(&m_mutex);
+    return ERR_ClusterFailure;
+  }
+
+  // Find the table with correct id
+  const Table* table = NULL;
+  for (size_t i = 0; i < m_tables.entries(); i++)
+  {
+    const Table* tmp = m_tables.value(i);
+    if (tmp->m_table_id == tableId)
+    {
+      table = tmp;
+      break;
+    }
+  }
+  if (table == NULL)
+  {
+    // No such table existed
+    pthread_mutex_unlock(&m_mutex);
+    return ERR_NoSuchTable;
+  }
+
+  // Return a _copy_ of the table
+  *table_copy = new Table(*table);
+
+  pthread_mutex_unlock(&m_mutex);
+  return 0;
+}
+
+void NdbInfo::closeTable(const Table* table) {
+  delete table;
+}
+
+
+// Column
+
+NdbInfo::Column::Column(const char* name, Uint32 col_id,
+                                NdbInfo::Column::Type type) :
+  m_type(type),
+  m_column_id(col_id),
+  m_name(name)
+{
+}
+
+NdbInfo::Column::Column(const NdbInfo::Column & col)
+{
+  m_column_id = col.m_column_id;
+  m_name.assign(col.m_name);
+  m_type = col.m_type;
+}
+
+NdbInfo::Column &
+NdbInfo::Column::operator=(const NdbInfo::Column & col)
+{
+  m_column_id = col.m_column_id;
+  m_name.assign(col.m_name);
+  m_type = col.m_type;
+  return *this;
+}
+
+
+// Table
+
+NdbInfo::Table::Table(const char *name, Uint32 id) :
+  m_name(name),
+  m_table_id(id)
+{
+};
+
+NdbInfo::Table::Table(const NdbInfo::Table& tab)
+{
+  DBUG_ENTER("Table(const Table&");
+  m_table_id = tab.m_table_id;
+  m_name.assign(tab.m_name);
+  for (unsigned i = 0; i < tab.m_columns.size(); i++)
+    addColumn(*tab.m_columns[i]);
+  DBUG_VOID_RETURN;
+}
+
+const NdbInfo::Table &
+NdbInfo::Table::operator=(const NdbInfo::Table& tab)
+{
+  DBUG_ENTER("Table::operator=");
+  m_table_id = tab.m_table_id;
+  m_name.assign(tab.m_name);
+  for (unsigned i = 0; i < tab.m_columns.size(); i++)
+    addColumn(*tab.m_columns[i]);
+  DBUG_RETURN(*this);
+}
+
+NdbInfo::Table::~Table()
+{
+  for (unsigned i = 0; i < m_columns.size(); i++)
+    delete m_columns[i];
+};
+
+const char * NdbInfo::Table::getName() const
+{
+  return m_name.c_str();
+}
+
+Uint32 NdbInfo::Table::getTableId() const
+{
+  return m_table_id;
+}
+
+bool NdbInfo::Table::addColumn(const NdbInfo::Column aCol)
+{
+  NdbInfo::Column* col = new NdbInfo::Column(aCol);
+  if (col == NULL)
+  {
+    errno = ENOMEM;
+    return false;
+  }
+
+  if (m_columns.push_back(col))
+  {
+    delete col;
+    return false;
+  }
+  return true;
+}
+
+unsigned NdbInfo::Table::columns(void) const {
+  return m_columns.size();
+}
+
+const NdbInfo::Column*
+NdbInfo::Table::getColumn(const unsigned attributeId) const
+{
+  return (attributeId < m_columns.size()) ?
+    m_columns[attributeId]
+    : NULL;
+}
+
+const NdbInfo::Column* NdbInfo::Table::getColumn(const char * name) const
+{
+  DBUG_ENTER("Column::getColumn");
+  DBUG_PRINT("info", ("columns: %d", m_columns.size()));
+  const NdbInfo::Column* column = NULL;
+  for (uint i = 0; i < m_columns.size(); i++)
+  {
+    DBUG_PRINT("info", ("col: %d %s", i, m_columns[i]->m_name.c_str()));
+    if (strcmp(m_columns[i]->m_name.c_str(), name) == 0)
+    {
+      column = m_columns[i];
+      break;
+    }
+  }
+  DBUG_RETURN(column);
+}
+
+
+template class Vector<NdbInfo::Column*>;
+

=== added file 'storage/ndb/src/ndbapi/NdbInfo.hpp'
--- a/storage/ndb/src/ndbapi/NdbInfo.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/ndbapi/NdbInfo.hpp	2009-11-08 12:52:27 +0000
@@ -0,0 +1,129 @@
+/*
+   Copyright (C) 2009 Sun Microsystems Inc.
+   All rights reserved. Use is subject to license terms.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+#ifndef NDBINFO_HPP
+#define NDBINFO_HPP
+
+#include <ndb_global.h>
+#include <ndb_types.h>
+
+#include <util/Vector.hpp>
+#include <util/BaseString.hpp>
+#include <util/HashMap.hpp>
+
+class NdbInfo
+{
+public:
+
+  enum Error
+  {
+    ERR_NoError = 0,
+    ERR_NoSuchTable = 40,
+    ERR_OutOfMemory = 41,
+    ERR_ClusterFailure = 42,
+    ERR_WrongState = 43
+  };
+
+  struct Column
+  {
+  public:
+
+    enum Type
+    {
+      String = 1,
+      Number = 2,
+      Number64 = 3
+    } m_type;
+
+    Uint32 m_column_id;
+    BaseString m_name;
+
+    Column(const char* name, Uint32 col_id, Type type);
+    Column(const Column & col);
+    Column & operator=(const Column & col);
+  };
+
+  class Table
+  {
+  public:
+
+    Table(const char *name, Uint32 id);
+    Table(const Table& tab);
+    const Table & operator=(const Table& tab);
+    ~Table();
+
+    const char * getName() const;
+    Uint32 getTableId() const;
+
+    bool addColumn(const Column aCol);
+    unsigned columns(void) const;
+    const Column* getColumn(const unsigned attributeId) const;
+    const Column* getColumn(const char * name) const;
+
+  private:
+    friend class NdbInfo;
+    BaseString m_name;
+    Uint32 m_table_id;
+    Vector<Column*> m_columns;
+  };
+
+  NdbInfo(class Ndb_cluster_connection* connection,
+          const char* prefix, const char* dbname = "",
+          const char* table_prefix = "");
+  bool init(void);
+  ~NdbInfo();
+
+  void flush_tables();
+
+  int openTable(const char* table_name, const Table**);
+  int openTable(Uint32 tableId, const Table**);
+  void closeTable(const Table* table);
+
+  int createScanOperation(const Table*,
+                          class NdbInfoScanOperation**,
+                          Uint32 max_rows = 256, Uint32 max_bytes = 0);
+  void releaseScanOperation(class NdbInfoScanOperation*) const;
+
+private:
+  static const size_t NUM_HARDCODED_TABLES = 2;
+  unsigned m_connect_count;
+  class Ndb_cluster_connection* m_connection;
+  pthread_mutex_t m_mutex;
+  HashMap<BaseString, Table, BaseString_get_key> m_tables;
+  Table* m_tables_table;
+  Table* m_columns_table;
+  BaseString m_prefix;
+  BaseString m_dbname;
+  BaseString m_table_prefix;
+  Uint32 m_id_counter;
+
+  bool addColumn(Uint32 tableId, const Column aCol);
+
+  bool load_ndbinfo_tables();
+  bool load_hardcoded_tables(void);
+  bool load_tables();
+  bool check_tables();
+
+  BaseString mysql_table_name(const char* table_name) const;
+
+};
+
+#include "NdbInfoScanOperation.hpp"
+#include "NdbInfoRecAttr.hpp"
+
+#endif

=== added file 'storage/ndb/src/ndbapi/NdbInfoRecAttr.hpp'
--- a/storage/ndb/src/ndbapi/NdbInfoRecAttr.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/ndbapi/NdbInfoRecAttr.hpp	2009-11-08 12:52:27 +0000
@@ -0,0 +1,52 @@
+/*
+   Copyright (C) 2009 Sun Microsystems Inc.
+   All rights reserved. Use is subject to license terms.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+#ifndef NdbInfoRecAttr_H
+#define NdbInfoRecAttr_H
+
+class NdbInfoRecAttr {
+public:
+  Uint32 u_32_value() const {
+    assert(m_len == sizeof(Uint32));
+    return *((Uint32 *) m_data);
+  }
+
+  Uint64 u_64_value() const {
+    assert(m_len == sizeof(Uint64));
+    return *((Uint64 *) m_data);
+  }
+
+  const char* c_str() const {
+    assert(m_len > 0);
+    return m_data;
+  }
+
+  Uint32 length() const {
+    return m_len;
+  }
+
+protected:
+  friend class NdbInfoScanOperation;
+  NdbInfoRecAttr() : m_data(NULL), m_len(0) {};
+  ~NdbInfoRecAttr() {};
+private:
+  const char* m_data;
+  Uint32 m_len;
+};
+
+#endif

=== added file 'storage/ndb/src/ndbapi/NdbInfoScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbInfoScanOperation.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/ndbapi/NdbInfoScanOperation.cpp	2009-11-08 12:52:27 +0000
@@ -0,0 +1,411 @@
+/*
+   Copyright (C) 2009 Sun Microsystems Inc.
+   All rights reserved. Use is subject to license terms.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include "NdbInfo.hpp"
+#include "SignalSender.hpp"
+
+#include <AttributeHeader.hpp>
+#include <signaldata/DbinfoScan.hpp>
+#include <signaldata/TransIdAI.hpp>
+#include <signaldata/NFCompleteRep.hpp>
+
+#define CAST_PTR(X,Y) static_cast<X*>(static_cast<void*>(Y))
+#define CAST_CONSTPTR(X,Y) static_cast<const X*>(static_cast<const void*>(Y))
+
+NdbInfoScanOperation::NdbInfoScanOperation(const NdbInfo& info,
+                                           Ndb_cluster_connection* connection,
+                                           const NdbInfo::Table* table,
+                                           Uint32 max_rows, Uint32 max_bytes) :
+  m_info(info),
+  m_state(Undefined),
+  m_connection(connection),
+  m_signal_sender(NULL),
+  m_table(table),
+  m_node_id(0),
+  m_max_rows(max_rows),
+  m_max_bytes(max_bytes),
+  m_result_data(0x37),
+  m_received_rows(0)
+{
+}
+
+bool
+NdbInfoScanOperation::init(Uint32 id)
+{
+  DBUG_ENTER("NdbInfoScanoperation::init");
+  if (m_state != Undefined)
+    DBUG_RETURN(false);
+
+  m_signal_sender = new SignalSender(m_connection);
+  if (!m_signal_sender)
+    DBUG_RETURN(false);
+
+  m_transid0 = id;
+  m_transid1 = m_table->getTableId();
+  m_result_ref = m_signal_sender->getOwnRef();
+
+  for (unsigned i = 0; i < m_table->columns(); i++)
+    m_recAttrs.push_back(NULL);
+
+  m_state = Initial;
+  DBUG_RETURN(true);
+
+}
+
+NdbInfoScanOperation::~NdbInfoScanOperation()
+{
+  close();
+  delete m_signal_sender;
+}
+
+int
+NdbInfoScanOperation::readTuples()
+{
+  if (m_state != Initial)
+    return NdbInfo::ERR_WrongState;
+
+  m_state = Prepared;
+  return 0;
+}
+
+const NdbInfoRecAttr *
+NdbInfoScanOperation::getValue(const char * anAttrName)
+{
+  if (m_state != Prepared)
+    return NULL;
+
+  const NdbInfo::Column* column = m_table->getColumn(anAttrName);
+  if (!column)
+    return NULL;
+  return getValue(column->m_column_id);
+}
+
+const NdbInfoRecAttr *
+NdbInfoScanOperation::getValue(Uint32 anAttrId)
+{
+  if (m_state != Prepared)
+    return NULL;
+
+  if (anAttrId >= m_recAttrs.size())
+    return NULL;
+
+  NdbInfoRecAttr *recAttr = new NdbInfoRecAttr;
+  m_recAttrs[anAttrId] = recAttr;
+  return recAttr;
+}
+
+int NdbInfoScanOperation::execute()
+{
+  DBUG_ENTER("NdbInfoScanOperation::execute");
+  DBUG_PRINT("info", ("name: '%s', id: %d",
+             m_table->getName(), m_table->getTableId()));
+
+  if (m_state != Prepared)
+    DBUG_RETURN(-1);
+
+  assert(m_cursor.size() == 0);
+
+  m_signal_sender->lock();
+  int ret = sendDBINFO_SCANREQ();
+  m_signal_sender->unlock();
+
+  m_state = MoreData;
+
+  DBUG_RETURN(ret);
+}
+
+int
+NdbInfoScanOperation::sendDBINFO_SCANREQ(void)
+{
+  DBUG_ENTER("NdbInfoScanOperation::sendDBINFO_SCANREQ");
+
+  if (m_node_id == 0)
+  {
+    m_node_id = m_signal_sender->get_an_alive_node();
+    if(m_node_id == 0)
+      DBUG_RETURN(NdbInfo::ERR_ClusterFailure);
+  }
+
+  SimpleSignal ss;
+  DbinfoScanReq * req = CAST_PTR(DbinfoScanReq, ss.getDataPtrSend());
+
+  // API Identifiers
+  req->resultData = m_result_data;
+  req->transId[0] = m_transid0;
+  req->transId[1] = m_transid1;
+  req->resultRef = m_result_ref;
+
+  // Scan parameters
+  req->tableId = m_table->getTableId();
+  req->colBitmap[0] = ~0;
+  req->colBitmap[1] = ~0;
+  req->requestInfo = 0;
+  req->maxRows = m_max_rows;
+  req->maxBytes = m_max_bytes;
+
+  // Scan result
+  req->returnedRows = 0;
+
+  // Cursor data
+  Uint32* cursor_ptr = DbinfoScan::getCursorPtrSend(req);
+  for (unsigned i = 0; i < m_cursor.size(); i++)
+  {
+    *cursor_ptr = m_cursor[i];
+    DBUG_PRINT("info", ("cursor[%u]: 0x%x", i, m_cursor[i]));
+    cursor_ptr++;
+  }
+  req->cursor_sz = m_cursor.size();
+  m_cursor.clear();
+
+  assert(m_node_id);
+  Uint32 len = DbinfoScanReq::SignalLength + req->cursor_sz;
+  if (m_signal_sender->sendSignal(m_node_id, ss, DBINFO,
+                                  GSN_DBINFO_SCANREQ, len) != SEND_OK)
+    DBUG_RETURN(NdbInfo::ERR_ClusterFailure);
+
+  DBUG_RETURN(0);
+}
+
+int NdbInfoScanOperation::receive(void)
+{
+  DBUG_ENTER("NdbInfoScanOperation::receive");
+  while (true)
+  {
+    const SimpleSignal* sig = m_signal_sender->waitFor();
+    if (!sig)
+      DBUG_RETURN(-1);
+    //sig->print();
+
+    int sig_number = sig->readSignalNumber();
+    switch (sig_number) {
+
+    case GSN_DBINFO_TRANSID_AI:
+    {
+      int ret = execDBINFO_TRANSID_AI(sig);
+      if (ret == 0)
+        continue;
+      DBUG_RETURN(ret); // More data
+      break;
+    }
+
+    case GSN_DBINFO_SCANCONF:
+    {
+      int ret = execDBINFO_SCANCONF(sig);
+      if (ret > 0)
+        continue; // Wait for more data
+      DBUG_RETURN(ret);
+      break;
+    }
+
+    case GSN_DBINFO_SCANREF:
+    {
+      int ret = execDBINFO_SCANREF(sig);
+      if (ret == 0)
+        continue;
+      DBUG_RETURN(ret);
+      break;
+    }
+
+    case GSN_NODE_FAILREP:
+      // Ignore and wait for NF_COMPLETEREP
+      break;
+
+    case GSN_NF_COMPLETEREP:
+      DBUG_RETURN(-3);
+      break;
+
+    case GSN_SUB_GCP_COMPLETE_REP:
+    case GSN_API_REGCONF:
+    case GSN_TAKE_OVERTCCONF:
+      // ignore
+      break;
+
+    default:
+      DBUG_PRINT("error", ("Got unexpected signal: %d", sig_number));
+      assert(false);
+      break;
+    }
+  }
+  assert(false); // Should never come here
+  DBUG_RETURN(-1);
+}
+
+int
+NdbInfoScanOperation::nextResult()
+{
+  DBUG_ENTER("NdbInfoScanOperation::nextResult");
+
+  switch(m_state)
+  {
+  case MoreData:
+  {
+    m_signal_sender->lock();
+    int ret = receive();
+    m_signal_sender->unlock();
+    DBUG_RETURN(ret);
+    break;
+  }
+  case End:
+    DBUG_RETURN(0); // EOF
+    break;
+  default:
+    DBUG_RETURN(-1);
+    break;
+  }
+}
+
+void
+NdbInfoScanOperation::close()
+{
+  DBUG_ENTER("NdbInfoScanOperation::close");
+
+  for (unsigned i = 0; i < m_recAttrs.size(); i++)
+  {
+    if (m_recAttrs[i])
+    {
+      delete m_recAttrs[i];
+      m_recAttrs[i] = NULL;
+    }
+  }
+
+  DBUG_VOID_RETURN;
+}
+
+int
+NdbInfoScanOperation::execDBINFO_TRANSID_AI(const SimpleSignal * signal)
+{
+  DBUG_ENTER("NdbInfoScanOperation::execDBINFO_TRANSID_AI");
+  const TransIdAI* transid =
+          CAST_CONSTPTR(TransIdAI, signal->getDataPtr());
+  if (transid->connectPtr != m_result_data ||
+      transid->transId[0] != m_transid0 ||
+      transid->transId[1] != m_transid1)
+  {
+    // Drop signal that belongs to previous scan
+    DBUG_RETURN(0);
+  }
+  m_received_rows++;
+
+  const Uint32* start = signal->ptr[0].p;
+  const Uint32* end = start + signal->ptr[0].sz;
+
+  DBUG_PRINT("info", ("start: %p, end: %p", start, end));
+  for (unsigned col = 0; col < m_table->columns(); col++)
+  {
+
+    // Read attribute header
+    const AttributeHeader ah(*start);
+    const Uint32 len = ah.getByteSize();
+    DBUG_PRINT("info", ("col: %u, len: %u", col, len));
+
+    // Step past attribute header
+    start += ah.getHeaderSize();
+
+    NdbInfoRecAttr* attr = m_recAttrs[col];
+    if (attr)
+    {
+      // Update NdbInfoRecAttr pointer and length
+      attr->m_data = (const char*)start;
+      attr->m_len = len;
+    }
+
+    // Step to next attribute header
+    start += ah.getDataSize();
+
+    // No reading beyond end of signal size
+    assert(start <= end);
+  }
+  DBUG_RETURN(1);
+}
+
+int
+NdbInfoScanOperation::execDBINFO_SCANCONF(const SimpleSignal * sig)
+{
+  DBUG_ENTER("NdbInfoScanOperation::execDBINFO_SCANCONF");
+  const DbinfoScanConf* conf =
+          CAST_CONSTPTR(DbinfoScanConf, sig->getDataPtr());
+
+  if (conf->resultData != m_result_data ||
+      conf->transId[0] != m_transid0 ||
+      conf->transId[1] != m_transid1 ||
+      conf->resultRef != m_result_ref)
+  {
+    // Drop signal that belongs to previous scan
+    DBUG_RETURN(1); // Continue waiting
+  }
+  const Uint32 tableId = conf->tableId;
+  assert(tableId == m_table->getTableId());
+
+  // Assert all scan settings is unchanged
+  assert(conf->colBitmap[0] == (Uint32)~0);
+  assert(conf->colBitmap[1] == (Uint32)~0);
+  assert(conf->requestInfo == 0);
+  assert(conf->maxRows == m_max_rows);
+  assert(conf->maxBytes == m_max_bytes);
+
+  // Save cursor data
+  assert(m_cursor.size() == 0);
+  const Uint32* cursor_ptr = DbinfoScan::getCursorPtr(conf);
+  for (unsigned i = 0; i < conf->cursor_sz; i++)
+  {
+    m_cursor.push_back(*cursor_ptr);
+    DBUG_PRINT("info", ("cursor[%u]: 0x%x", i, m_cursor[i]));
+    cursor_ptr++;
+  }
+  assert(conf->cursor_sz == m_cursor.size());
+
+  if (conf->cursor_sz)
+  {
+    DBUG_PRINT("info", ("Request more data"));
+    int err = sendDBINFO_SCANREQ();
+    if (err != 0)
+    {
+      DBUG_PRINT("info", ("Failed to reuqest more data"));
+      m_state = Error;
+      DBUG_RETURN(err);
+    }
+
+    m_state = MoreData;
+    DBUG_RETURN(1);
+  }
+
+  m_state = End;
+  DBUG_RETURN(0); // EOF
+}
+
+int
+NdbInfoScanOperation::execDBINFO_SCANREF(const SimpleSignal * signal)
+{
+  DBUG_ENTER("NdbInfoScanOperation::execDBINFO_SCANREF");
+  const DbinfoScanRef* ref =
+          CAST_CONSTPTR(DbinfoScanRef, signal->getDataPtr());
+
+  if (ref->resultData != m_result_data ||
+      ref->transId[0] != m_transid0 ||
+      ref->transId[1] != m_transid1 ||
+      ref->resultRef != m_result_ref)
+  {
+    // Drop signal that belongs to previous scan
+    DBUG_RETURN(0); // Continue waiting
+  }
+
+  m_state = Error;
+  DBUG_RETURN(ref->errorCode);
+}
+
+template class Vector<NdbInfoRecAttr*>;

=== added file 'storage/ndb/src/ndbapi/NdbInfoScanOperation.hpp'
--- a/storage/ndb/src/ndbapi/NdbInfoScanOperation.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/ndbapi/NdbInfoScanOperation.hpp	2009-11-08 12:52:27 +0000
@@ -0,0 +1,65 @@
+/*
+   Copyright (C) 2009 Sun Microsystems Inc.
+   All rights reserved. Use is subject to license terms.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+#ifndef NDBINFOSCANOPERATION_H
+#define NDBINFOSCANOPERATION_H
+
+class NdbInfoScanOperation {
+public:
+  int readTuples();
+  const class NdbInfoRecAttr* getValue(const char * anAttrName);
+  const class NdbInfoRecAttr* getValue(Uint32 anAttrId);
+  int execute();
+  int nextResult();
+protected:
+  friend class NdbInfo;
+  NdbInfoScanOperation(const NdbInfo&,
+                       class Ndb_cluster_connection*,
+                       const NdbInfo::Table*,
+                       Uint32 max_rows, Uint32 max_bytes);
+  bool init(Uint32 id);
+  ~NdbInfoScanOperation();
+  void close();
+private:
+  int execDBINFO_TRANSID_AI(const struct SimpleSignal * signal);
+  int execDBINFO_SCANCONF(const struct SimpleSignal * signal);
+  int execDBINFO_SCANREF(const struct SimpleSignal * signal);
+  int sendDBINFO_SCANREQ();
+
+  int receive(void);
+
+  const NdbInfo& m_info;
+  enum State { Undefined, Initial, Prepared,
+               MoreData, End, Error } m_state;
+  class Ndb_cluster_connection* m_connection;
+  class SignalSender*           m_signal_sender;
+  const NdbInfo::Table*     m_table;
+  Vector<NdbInfoRecAttr*>       m_recAttrs;
+  Vector<Uint32>                m_cursor;
+  Uint32 m_node_id;
+  Uint32 m_transid0;
+  Uint32 m_transid1;
+  Uint32 m_result_ref;
+  Uint32 m_max_rows;
+  Uint32 m_max_bytes;
+  Uint32 m_result_data;
+  Uint32 m_received_rows;
+};
+
+
+#endif

=== modified file 'storage/ndb/test/ndbapi/CMakeLists.txt'
--- a/storage/ndb/test/ndbapi/CMakeLists.txt	2008-11-21 08:04:43 +0000
+++ b/storage/ndb/test/ndbapi/CMakeLists.txt	2009-11-08 12:52:27 +0000
@@ -65,7 +65,7 @@ ADD_EXECUTABLE(DbAsyncGenerator bench/ma
 #ADD_EXECUTABLE(test_event_multi_table test_event_multi_table.cpp)
 ADD_EXECUTABLE(testSRBank testSRBank.cpp)
 ADD_EXECUTABLE(test_event_merge test_event_merge.cpp)
-
+Add_EXECUTABLE(testNdbinfo testNdbinfo.cpp)
 ##testDict_INCLUDES = $(INCLUDES) -I$(top_srcdir)/ndb/include/kernel
 ##testIndex_INCLUDES = $(INCLUDES) -I$(top_srcdir)/ndb/include/kernel
 ##testSystemRestart_INCLUDES = $(INCLUDES) -I$(top_srcdir)/ndb/include/kernel

=== modified file 'storage/ndb/test/ndbapi/Makefile.am'
--- a/storage/ndb/test/ndbapi/Makefile.am	2009-09-16 12:53:49 +0000
+++ b/storage/ndb/test/ndbapi/Makefile.am	2009-11-08 12:52:27 +0000
@@ -63,6 +63,7 @@ ndbapi_50compat0 \
 ndbapi_50compat1 \
 testNDBT \
 testReconnect \
+testNdbinfo \
 NdbRepStress \
 msa
 
@@ -128,6 +129,7 @@ testNDBT_SOURCES = testNDBT.cpp
 testNDBT_LDADD = $(LDADD) $(top_srcdir)/libmysql_r/libmysqlclient_r.la
 testReconnect_SOURCES = testReconnect.cpp
 testReconnect_LDADD = $(LDADD) $(top_srcdir)/libmysql_r/libmysqlclient_r.la
+testNdbinfo_SOURCES = testNdbinfo.cpp
 testBitfield_SOURCES = testBitfield.cpp
 NdbRepStress_SOURCES = acrt/NdbRepStress.cpp
 DbCreate_SOURCES = bench/mainPopulate.cpp bench/dbPopulate.cpp bench/userInterface.cpp bench/dbPopulate.h bench/userInterface.h bench/testData.h bench/testDefinitions.h bench/ndb_schema.hpp bench/ndb_error.hpp

=== modified file 'storage/ndb/test/ndbapi/testMgm.cpp'
--- a/storage/ndb/test/ndbapi/testMgm.cpp	2009-10-27 12:52:57 +0000
+++ b/storage/ndb/test/ndbapi/testMgm.cpp	2009-11-08 12:52:27 +0000
@@ -704,154 +704,6 @@ int runGetConfigUntilStopped(NDBT_Contex
 }
 
 
-
-int getMgmLogInfo(NdbMgmHandle h, off_t *current_size, off_t *max_size)
-{
-  int r, ncol;
-  char rowbuf[1024];
-  char **cols;
-  int current_size_colnum= 0;
-  int max_size_colnum= 0;
-
-  int rows;
-  r= ndb_mgm_ndbinfo(h,"LOGDESTINATION", &rows);
-
-  ncol= ndb_mgm_ndbinfo_colcount(h);
-
-  cols= (char**)malloc(ncol*sizeof(char*));
-  for(int i=0;i<ncol;i++)
-    cols[i]= (char*) malloc(100*sizeof(char));
-
-  ndb_mgm_ndbinfo_getcolums(h,ncol,100,cols);
-
-  for(int i=0;i<ncol;i++)
-  {
-    if(strcmp(cols[i],"CURRENT_SIZE")==0)
-      current_size_colnum= i;
-    if(strcmp(cols[i],"MAX_SIZE")==0)
-      max_size_colnum= i;
-    free(cols[i]);
-  }
-  ndbout << endl;
-
-  while(r--)
-  {
-    ndb_mgm_ndbinfo_getrow(h, rowbuf, sizeof(rowbuf));
-    char *col= rowbuf;
-    for(int i=0;i<ncol;i++)
-    {
-      int len;
-      col= ndb_mgm_ndbinfo_nextcolumn(col, &len);
-      if(!col)
-        break;
-      if(col[len]=='\'')
-        col[len++]='\0';
-      col[len]='\0';
-      if(i==current_size_colnum)
-        *current_size= (off_t)strtoll(col,NULL,10);
-      if(i==max_size_colnum)
-        *max_size= (off_t)strtoll(col,NULL,10);
-      col= &col[len+1];
-    }
-
-  }
-
-  ndbout_c("CURRENT SIZE = %lu",*current_size);
-  ndbout_c("MAX SIZE = %lu",*max_size);
-
-  free(cols);
-
-  return 0;
-}
-
-int runTestMgmLogRotation(NDBT_Context* ctx, NDBT_Step* step)
-{
-  NdbMgmd mgmd;
-  const char *mgm= mgmd.getConnectString();
-  int result= NDBT_FAILED;
-  int mgmid= 0;
-  off_t current_size= 0, max_size= 0;
-  int i,j;
-
-  NdbMgmHandle h= NULL,h1,h2,h3,h4;
-  h= ndb_mgm_create_handle();
-  ndb_mgm_set_connectstring(h, mgm);
-
-  ndb_mgm_connect(h,0,0,0);
-
-  h1= ndb_mgm_create_handle();
-  ndb_mgm_set_connectstring(h1, mgm);
-  ndb_mgm_connect(h1,0,0,0);
-
-  h2= ndb_mgm_create_handle();
-  ndb_mgm_set_connectstring(h2, mgm);
-  ndb_mgm_connect(h2,0,0,0);
-
-  h3= ndb_mgm_create_handle();
-  ndb_mgm_set_connectstring(h3, mgm);
-  ndb_mgm_connect(h3,0,0,0);
-
-  h4= ndb_mgm_create_handle();
-  ndb_mgm_set_connectstring(h4, mgm);
-  ndb_mgm_connect(h4,0,0,0);
-
-
-  if(ndb_mgm_check_connection(h) < 0)
-  {
-    result= NDBT_FAILED;
-    goto done;
-  }
-
-  mgmid= ndb_mgm_get_mgmd_nodeid(h);
-
-  ndbout_c("Connected to MGM server at NodeID: %d",mgmid);
-
-  if(getMgmLogInfo(h, &current_size, &max_size))
-  {
-    result= NDBT_FAILED;
-    goto done;
-  }
-
-  for(i=0;i<max_size/4;i++)
-  {
-        Uint32 theData[25];
-        memset(theData,0,sizeof(theData));
-        EventReport *fake_event = (EventReport*)theData;
-
-        for(j=0;j<100;j++)
-        {
-          fake_event->setEventType((Ndb_logevent_type)j);
-          fake_event->setNodeId(j+100);
-          ndb_mgm_report_event(h, theData, 6);
-          fake_event->setNodeId(j+200);
-          ndb_mgm_report_event(h1, theData, 6);
-          fake_event->setNodeId(j+300);
-          ndb_mgm_report_event(h2, theData, 6);
-          fake_event->setNodeId(j+400);
-          ndb_mgm_report_event(h3, theData, 6);
-          fake_event->setNodeId(j+500);
-          ndb_mgm_report_event(h4, theData, 6);
-
-        }
-        off_t c,m;
-        getMgmLogInfo(h, &c, &m);
-        if(c < current_size)
-        {
-          result= NDBT_OK;
-          break;
-        }
-  }
-
-  return result;
-
-done:
-  if(h)
-    ndb_mgm_destroy_handle(&h);
-
-  return result;
-}
-
-
 int runTestStatus(NDBT_Context* ctx, NDBT_Step* step)
 {
   ndb_mgm_node_type types[2] = {
@@ -2530,13 +2382,6 @@ TESTCASE("TestSetConfigParallel",
 TESTCASE("GetConfig", "Run ndb_mgm_get_configuration in parallel"){
   STEPS(runGetConfig, 100);
 }
-#if 0
-TESTCASE("MgmLogRotation",
-	 "Test log rotation"){
-  INITIALIZER(runTestMgmLogRotation);
-
-}
-#endif
 TESTCASE("TestStatus",
 	 "Test status and status2"){
   INITIALIZER(runTestStatus);

=== added file 'storage/ndb/test/ndbapi/testNdbinfo.cpp'
--- a/storage/ndb/test/ndbapi/testNdbinfo.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/test/ndbapi/testNdbinfo.cpp	2009-11-08 12:52:27 +0000
@@ -0,0 +1,411 @@
+/*
+   Copyright (C) 2009 Sun Microsystems Inc.
+   All rights reserved. Use is subject to license terms.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include <NDBT.hpp>
+#include <NDBT_Test.hpp>
+#include "../../src/ndbapi/NdbInfo.hpp"
+
+
+int runTestNdbInfo(NDBT_Context* ctx, NDBT_Step* step)
+{
+  NdbInfo ndbinfo(&ctx->m_cluster_connection, "ndbinfo/");
+  if (!ndbinfo.init())
+  {
+    g_err << "ndbinfo.init failed" << endl;
+    return NDBT_FAILED;
+  }
+
+  const NdbInfo::Table* table;
+  if (ndbinfo.openTable("ndbinfo/tables", &table) != 0)
+  {
+    g_err << "Failed to openTable(tables)" << endl;
+    return NDBT_FAILED;
+  }
+
+  for (int l = 0; l < ctx->getNumLoops(); l++)
+  {
+
+    NdbInfoScanOperation* scanOp = NULL;
+    if (ndbinfo.createScanOperation(table, &scanOp))
+    {
+      g_err << "No NdbInfoScanOperation" << endl;
+      return NDBT_FAILED;
+    }
+
+    if (scanOp->readTuples() != 0)
+    {
+      g_err << "scanOp->readTuples failed" << endl;
+      return NDBT_FAILED;
+    }
+
+    const NdbInfoRecAttr* tableName = scanOp->getValue("table_name");
+    const NdbInfoRecAttr* comment = scanOp->getValue("comment");
+
+    if(scanOp->execute() != 0)
+    {
+      g_err << "scanOp->execute failed" << endl;
+      return NDBT_FAILED;
+    }
+
+    while(scanOp->nextResult() == 1)
+    {
+      g_info << "NAME: " << tableName->c_str() << endl;
+      g_info << "COMMENT: " << comment->c_str() << endl;
+    }
+    ndbinfo.releaseScanOperation(scanOp);
+  }
+
+  ndbinfo.closeTable(table);
+  return NDBT_OK;
+}
+
+int runScanAll(NDBT_Context* ctx, NDBT_Step* step)
+{
+  NdbInfo ndbinfo(&ctx->m_cluster_connection, "ndbinfo/");
+  if (!ndbinfo.init())
+  {
+    g_err << "ndbinfo.init failed" << endl;
+    return NDBT_FAILED;
+  }
+
+  Uint32 tableId = 0;
+  while(true) {
+    const NdbInfo::Table* table;
+
+    int err = ndbinfo.openTable(tableId, &table);
+    if (err == NdbInfo::ERR_NoSuchTable)
+    {
+      // No more tables -> return
+      return NDBT_OK;
+    }
+    else if (err != 0)
+    {
+      // Unexpected return code
+      g_err << "Failed to openTable(" << tableId << "), err: " << err << endl;
+      return NDBT_FAILED;
+    }
+    ndbout << "table("<<tableId<<"): " << table->getName() << endl;
+
+    for (int l = 0; l < ctx->getNumLoops(); l++)
+    {
+      NdbInfoScanOperation* scanOp = NULL;
+      if (ndbinfo.createScanOperation(table, &scanOp))
+      {
+        g_err << "No NdbInfoScanOperation" << endl;
+        return NDBT_FAILED;
+      }
+
+      if (scanOp->readTuples() != 0)
+      {
+        g_err << "scanOp->readTuples failed" << endl;
+        return NDBT_FAILED;
+      }
+
+      int columnId = 0;
+      while (scanOp->getValue(columnId))
+        columnId++;
+      // At least one column
+      assert(columnId >= 1);
+
+      if(scanOp->execute() != 0)
+      {
+        g_err << "scanOp->execute failed" << endl;
+        return NDBT_FAILED;
+      }
+
+      int row = 0;
+      while(scanOp->nextResult() == 1)
+        row++;
+      ndbout << "rows: " << row << endl;
+      ndbinfo.releaseScanOperation(scanOp);
+    }
+    ndbinfo.closeTable(table);
+    tableId++;
+  }
+
+  // Should never come here
+  assert(false);
+  return NDBT_FAILED;
+}
+
+int runScanStop(NDBT_Context* ctx, NDBT_Step* step)
+{
+  NdbInfo ndbinfo(&ctx->m_cluster_connection, "ndbinfo/");
+  if (!ndbinfo.init())
+  {
+    g_err << "ndbinfo.init failed" << endl;
+    return NDBT_FAILED;
+  }
+
+  Uint32 tableId = 0;
+  while(true) {
+    const NdbInfo::Table* table;
+
+    int err = ndbinfo.openTable(tableId, &table);
+    if (err == NdbInfo::ERR_NoSuchTable)
+    {
+      // No more tables -> return
+      return NDBT_OK;
+    }
+    else if (err != 0)
+    {
+      // Unexpected return code
+      g_err << "Failed to openTable(" << tableId << "), err: " << err << endl;
+      return NDBT_FAILED;
+    }
+    ndbout << "table: " << table->getName() << endl;
+
+    for (int l = 0; l < ctx->getNumLoops()*10; l++)
+    {
+      NdbInfoScanOperation* scanOp = NULL;
+      if (ndbinfo.createScanOperation(table, &scanOp))
+      {
+        g_err << "No NdbInfoScanOperation" << endl;
+        return NDBT_FAILED;
+      }
+
+      if (scanOp->readTuples() != 0)
+      {
+        g_err << "scanOp->readTuples failed" << endl;
+        return NDBT_FAILED;
+      }
+
+      int columnId = 0;
+      while (scanOp->getValue(columnId))
+        columnId++;
+      // At least one column
+      assert(columnId >= 1);
+
+      if(scanOp->execute() != 0)
+      {
+        g_err << "scanOp->execute failed" << endl;
+        return NDBT_FAILED;
+      }
+
+      int stopRow = rand() % 100;
+      int row = 0;
+      while(scanOp->nextResult() == 1)
+      {
+        row++;
+        if (row == stopRow)
+        {
+          ndbout_c("Aborting scan at row %d", stopRow);
+          break;
+        }
+      }
+      ndbinfo.releaseScanOperation(scanOp);
+    }
+    ndbinfo.closeTable(table);
+    tableId++;
+  }
+
+  // Should never come here
+  assert(false);
+  return NDBT_FAILED;
+}
+
+
+int runRatelimit(NDBT_Context* ctx, NDBT_Step* step)
+{
+  NdbInfo ndbinfo(&ctx->m_cluster_connection, "ndbinfo/");
+  if (!ndbinfo.init())
+  {
+    g_err << "ndbinfo.init failed" << endl;
+    return NDBT_FAILED;
+  }
+
+  Uint32 tableId = 0;
+  while(true) {
+
+    const NdbInfo::Table* table;
+
+    int err = ndbinfo.openTable(tableId, &table);
+    if (err == NdbInfo::ERR_NoSuchTable)
+    {
+      // No more tables -> return
+      return NDBT_OK;
+    }
+    else if (err != 0)
+    {
+      // Unexpected return code
+      g_err << "Failed to openTable(" << tableId << "), err: " << err << endl;
+      return NDBT_FAILED;
+    }
+    ndbout << "table: " << table->getName() << endl;
+    
+
+    struct { Uint32 rows; Uint32 bytes; } limits[] = {
+      { 0, 0 },
+      { 1, 0 }, { 2, 0 }, { 10, 0 }, { 37, 0 }, { 1000, 0 },
+      { 0, 1 }, { 0, 2 }, { 0, 10 }, { 0, 37 }, { 0, 1000 },
+      { 1, 1 }, { 2, 2 }, { 10, 10 }, { 37, 37 }, { 1000, 1000 }
+    };
+
+    int lastRows = 0;
+    for (int l = 0; l < (int)(sizeof(limits)/sizeof(limits[0])); l++)
+    {
+
+      Uint32 maxRows = limits[l].rows;
+      Uint32 maxBytes = limits[l].bytes;
+
+      NdbInfoScanOperation* scanOp = NULL;
+      if (ndbinfo.createScanOperation(table, &scanOp, maxRows, maxBytes))
+      {
+        g_err << "No NdbInfoScanOperation" << endl;
+        return NDBT_FAILED;
+      }
+
+      if (scanOp->readTuples() != 0)
+      {
+        g_err << "scanOp->readTuples failed" << endl;
+        return NDBT_FAILED;
+      }
+
+      int columnId = 0;
+      while (scanOp->getValue(columnId))
+        columnId++;
+      // At least one column
+      assert(columnId >= 1);
+
+      if(scanOp->execute() != 0)
+      {
+        g_err << "scanOp->execute failed" << endl;
+        return NDBT_FAILED;
+      }
+
+      int row = 0;
+      while(scanOp->nextResult() == 1)
+        row++;
+      ndbinfo.releaseScanOperation(scanOp);
+
+      ndbout_c("[%u,%u] rows: %d", maxRows, maxBytes, row);
+      if (lastRows != 0)
+      {
+        // Check that the number of rows is same as last round on same table
+        if (lastRows != row)
+        {
+          g_err << "Got different number of rows this round, expected: "
+                << lastRows << ", got: " << row << endl;
+          ndbinfo.closeTable(table);
+          return NDBT_FAILED;
+        }
+      }
+      lastRows = row;
+    }
+    ndbinfo.closeTable(table);
+    tableId++;
+  }
+
+  // Should never come here
+  assert(false);
+  return NDBT_FAILED;
+}
+
+int runTestTable(NDBT_Context* ctx, NDBT_Step* step)
+{
+  NdbInfo ndbinfo(&ctx->m_cluster_connection, "ndbinfo/");
+  if (!ndbinfo.init())
+  {
+    g_err << "ndbinfo.init failed" << endl;
+    return NDBT_FAILED;
+  }
+
+  const NdbInfo::Table* table;
+  if (ndbinfo.openTable("ndbinfo/test", &table) != 0)
+  {
+    g_err << "Failed to openTable(test)" << endl;
+    return NDBT_FAILED;
+  }
+
+  for (int l = 0; l < ctx->getNumLoops(); l++)
+  {
+
+    NdbInfoScanOperation* scanOp = NULL;
+    if (ndbinfo.createScanOperation(table, &scanOp))
+    {
+      g_err << "No NdbInfoScanOperation" << endl;
+      return NDBT_FAILED;
+    }
+
+    if (scanOp->readTuples() != 0)
+    {
+      g_err << "scanOp->readTuples failed" << endl;
+      return NDBT_FAILED;
+    }
+
+    const NdbInfoRecAttr* nodeId= scanOp->getValue("node_id");
+    const NdbInfoRecAttr* blockNumber= scanOp->getValue("block_number");
+    const NdbInfoRecAttr* blockInstance= scanOp->getValue("block_instance");
+    const NdbInfoRecAttr* counter= scanOp->getValue("counter");
+
+    if(scanOp->execute() != 0)
+    {
+      g_err << "scanOp->execute failed" << endl;
+      return NDBT_FAILED;
+    }
+
+    int rows = 0;
+    while(scanOp->nextResult() == 1)
+    {
+      rows++;//
+    }
+    ndbout << "rows: " << rows << endl;
+    ndbinfo.releaseScanOperation(scanOp);
+  }
+
+  ndbinfo.closeTable(table);
+  return NDBT_OK;
+}
+
+
+NDBT_TESTSUITE(testNdbinfo);
+TESTCASE("Ndbinfo",
+         "Test ndbapi interface to NDB$INFO"){
+  INITIALIZER(runTestNdbInfo);
+}
+TESTCASE("Ndbinfo10",
+         "Test ndbapi interface to NDB$INFO"){
+  STEPS(runTestNdbInfo, 10);
+}
+TESTCASE("ScanAll",
+         "Scan all colums of all table known to NdbInfo"){
+  STEPS(runScanAll, 1);
+}
+TESTCASE("ScanStop",
+         "Randomly stop the scan"){
+  STEPS(runScanStop, 1);
+}
+TESTCASE("Ratelimit",
+         "Scan wit different combinations of ratelimit"){
+  STEPS(runRatelimit, 1);
+}
+TESTCASE("TestTable",
+         "Scan the test table and make sure it returns correct number "
+          "of rows which will depend on how many TUP blocks are configured"){
+  STEP(runTestTable);
+}
+NDBT_TESTSUITE_END(testNdbinfo);
+
+
+int main(int argc, const char** argv){
+  ndb_init();
+  NDBT_TESTSUITE_INSTANCE(testNdbinfo);
+  testNdbinfo.setCreateTable(false);
+  testNdbinfo.setRunAllTables(true);
+  return testNdbinfo.execute(argc, argv);
+}

=== modified file 'storage/ndb/tools/CMakeLists.txt'
--- a/storage/ndb/tools/CMakeLists.txt	2008-08-21 14:52:24 +0000
+++ b/storage/ndb/tools/CMakeLists.txt	2009-11-08 12:52:27 +0000
@@ -53,3 +53,11 @@ SET(options "${options} -I${CMAKE_SOURCE
 #SET(options "${options} -DMYSQLCLUSTERDIR=\"\\\"\\\"\"")
 SET_TARGET_PROPERTIES(ndb_config PROPERTIES
                       COMPILE_FLAGS "${options}")
+
+# Build ndbinfo_sql and run it to create ndbinfo.sql
+ADD_EXECUTABLE(ndbinfo_sql ndbinfo_sql.cpp)a
+GET_TARGET_PROPERTY(NDBINFO_SQL_EXE comp_sql LOCATION)
+ADD_CUSTOM_COMMAND(OUTPUT ${PROJECT_SOURCE_DIR}/storage/ndb/tools/ndbinfo.sql
+                   COMMAND ${NDBINFO_SQL_EXE} > ndbinfo.sql
+                   DEPENDS ndbinfo_sql)
+

=== modified file 'storage/ndb/tools/Makefile.am'
--- a/storage/ndb/tools/Makefile.am	2009-05-27 15:21:45 +0000
+++ b/storage/ndb/tools/Makefile.am	2009-11-08 12:52:27 +0000
@@ -16,8 +16,12 @@
 
 EXTRA_DIST = CMakeLists.txt
 
+BUILT_SOURCES = ndbinfo.sql
+
+noinst_PROGRAMS = 	ndbinfo_sql
+
 dist_bin_SCRIPTS =	ndb_size.pl ndb_error_reporter
-dist_pkgdata_DATA =
+dist_pkgdata_DATA =	ndbinfo.sql
 
 ndbtools_PROGRAMS = \
   ndb_test_platform \
@@ -66,6 +70,13 @@ ndb_config_CXXFLAGS = -I$(top_srcdir)/st
 ndb_restore_LDADD = $(top_builddir)/storage/ndb/src/common/util/libndbazio.la \
                     $(LDADD)
 
+
+ndbinfo_sql_SOURCES = ndbinfo_sql.cpp
+
+# Build ndbinfo.sql by executing ndbinfo_sql
+ndbinfo.sql: ndbinfo_sql
+	$(builddir)/ndbinfo_sql$(EXEEXT) > $@
+
 include $(top_srcdir)/storage/ndb/config/common.mk.am
 include $(top_srcdir)/storage/ndb/config/type_ndbapitools.mk.am
 

=== added file 'storage/ndb/tools/ndbinfo_sql.cpp'
--- a/storage/ndb/tools/ndbinfo_sql.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/tools/ndbinfo_sql.cpp	2009-11-08 12:52:27 +0000
@@ -0,0 +1,235 @@
+/*
+   Copyright (C) 2009 Sun Microsystems Inc.
+   All rights reserved. Use is subject to license terms.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include <ndb_global.h>
+#include <ndb_opts.h>
+#include <util/BaseString.hpp>
+#include "../src/kernel/vm/NdbinfoTables.cpp"
+
+static char* opt_ndbinfo_db = (char*)"ndbinfo";
+static char* opt_table_prefix = (char*)"ndb$";
+
+static struct my_option
+my_long_options[] =
+{
+  { "database", 'd',
+    "Name of the database used by ndbinfo",
+    (uchar**) &opt_ndbinfo_db, (uchar**) &opt_ndbinfo_db, 0,
+    GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
+  { "database", 'd',
+    "Prefix to use for all virtual tables loaded from NDB",
+    (uchar**) &opt_table_prefix, (uchar**) &opt_table_prefix, 0,
+    GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
+  { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
+};
+
+
+struct view {
+  const char* name;
+  const char* sql;
+} views[] =
+{
+  { "trp_status",
+    "SELECT node_id, remote_node_id, "
+    " CASE status"
+    "  WHEN 0 THEN \"connected\""
+    "  WHEN 1 THEN \"connecting\""
+    "  WHEN 2 THEN \"disconnected\""
+    "  WHEN 3 THEN \"disconnecting\""
+    "  ELSE NULL "
+    " END AS status "
+    "FROM <NDBINFO_DB>.<TABLE_PREFIX>trp_status"
+   },
+  { "log_space",
+    "SELECT log_id, "
+    " CASE log_type"
+    "  WHEN 0 THEN \"REDO\""
+    "  WHEN 1 THEN \"DD-UNDO\""
+    "  ELSE NULL "
+    " END AS log_id "
+    " log_part, node_id, total, used"
+  } 
+};
+
+size_t num_views = sizeof(views)/sizeof(views[0]);
+
+
+struct replace {
+  const char* tag;
+  const char* string;
+} replaces[] =
+{
+  {"<TABLE_PREFIX>", opt_table_prefix},
+  {"<NDBINFO_DB>", opt_ndbinfo_db},
+};
+
+size_t num_replaces = sizeof(replaces)/sizeof(replaces[0]);
+
+
+BaseString replace_tags(const char* str)
+{
+  BaseString result(str);
+  for (size_t i = 0; i < num_replaces; i++)
+  {
+    Vector<BaseString> parts;
+    const char* p = result.c_str();
+    const char* tag = replaces[i].tag;
+
+    /* Split on <tag> */
+    const char* first;
+    while((first = strstr(p, tag)))
+    {
+      BaseString part;
+      part.assign(p, first - p);
+      parts.push_back(part);
+      p = first + strlen(tag);
+    }
+    parts.push_back(p);
+
+    /* Put back together */
+    BaseString res;
+    const char* separator = "";
+    for (unsigned j = 0; j < parts.size(); j++)
+    {
+      res.appfmt("%s%s", separator, parts[j].c_str());
+      separator = replaces[i].string;
+    }
+
+    /* Save result from this loop */
+    result = res;
+  }
+  return result;
+}
+
+
+int main(int argc, char** argv){
+
+  if ((handle_options(&argc, &argv, my_long_options, NULL)))
+    return 2;
+
+  printf("--\n");
+  printf("-- SQL commands for creating the tables in MySQL Server which \n");
+  printf("-- are used by the NDBINFO storage engine to access system \n");
+  printf("-- information and statistics from MySQL Cluster\n");
+  printf("--\n");
+
+  printf("CREATE DATABASE IF NOT EXISTS `%s`;\n\n", opt_ndbinfo_db);
+
+  printf("-- Only create tables if NDBINFO is enabled\n");
+  printf("SELECT @have_ndbinfo:= COUNT(*) FROM "
+                  "information_schema.engines WHERE engine='NDBINFO' "
+                  "AND support IN ('YES', 'DEFAULT');\n\n");
+
+  printf("-- drop any old views in %s\n", opt_ndbinfo_db);
+  for (size_t i = 0; i < num_views; i++)
+  {
+    printf("DROP VIEW IF EXISTS %s.%s;\n",
+            opt_ndbinfo_db, views[i].name);
+  }
+  printf("\n");
+
+  for (int i = 0; i < Ndbinfo::getNumTables(); i++)
+  {
+    const Ndbinfo::Table& table = Ndbinfo::getTable(i);
+
+    printf("-- %s.%s%s\n",
+            opt_ndbinfo_db, opt_table_prefix, table.m.name);
+
+    /* Drop the table if it exists */
+    printf("SET @str=IF(@have_ndbinfo,"
+                    "'DROP TABLE IF EXISTS `%s`.`%s%s`',"
+                    "'SET @dummy = 0');\n",
+            opt_ndbinfo_db, opt_table_prefix, table.m.name);
+    printf("PREPARE stmt FROM @str;\n");
+    printf("EXECUTE stmt;\n");
+    printf("DROP PREPARE stmt;\n\n");
+
+    /* Create the table */
+    BaseString sql;
+    sql.assfmt("CREATE TABLE `%s`.`%s%s` (",
+               opt_ndbinfo_db, opt_table_prefix, table.m.name);
+
+    const char* separator = "";
+    for(int j = 0; j < table.m.ncols ; j++)
+    {
+      const Ndbinfo::Column& col = table.col[j];
+
+      sql.appfmt("%s", separator);
+      separator = ",";
+
+      sql.appfmt("`%s` ", col.name);
+
+      switch(col.coltype)
+      {
+      case Ndbinfo::Number:
+        sql.appfmt("INT UNSIGNED");
+        break;
+      case Ndbinfo:: Number64:
+        sql.appfmt("BIGINT UNSIGNED");
+        break;
+      case Ndbinfo::String:
+        sql.appfmt("VARCHAR(512)");
+        break;
+      default:
+        fprintf(stderr, "unknown coltype: %d\n", col.coltype);
+        abort();
+        break;
+      }
+
+      if (col.comment[0] != '\0')
+        sql.appfmt(" COMMENT \"%s\"", col.comment);
+
+    }
+
+    sql.appfmt(") COMMENT=\"%s\" ENGINE=NDBINFO;", table.m.comment);
+
+    printf("SET @str=IF(@have_ndbinfo,'%s','SET @dummy = 0');\n", sql.c_str());
+    printf("PREPARE stmt FROM @str;\n");
+    printf("EXECUTE stmt;\n");
+    printf("DROP PREPARE stmt;\n\n");
+
+  }
+
+  printf("--\n");
+  printf("-- %s views\n", opt_ndbinfo_db);
+  printf("--\n\n");
+
+  for (size_t i = 0; i < num_views; i++)
+  {
+    view v = views[i];
+
+    printf("-- %s.%s\n", opt_ndbinfo_db, v.name);
+
+    BaseString view_sql = replace_tags(v.sql);
+
+    /* Create or replace the view */
+    BaseString sql;
+    sql.assfmt("CREATE OR REPLACE DEFINER=`root@localhost` "
+               "SQL SECURITY INVOKER VIEW `%s`.`%s` AS %s",
+               opt_ndbinfo_db, v.name, view_sql.c_str());
+
+    printf("SET @str=IF(@have_ndbinfo,'%s','SET @dummy = 0');\n",
+           sql.c_str());
+    printf("PREPARE stmt FROM @str;\n");
+    printf("EXECUTE stmt;\n");
+    printf("DROP PREPARE stmt;\n\n");
+  }
+
+  return 0;
+}
+

Attachment: [text/bzr-bundle] bzr/magnus.blaudd@sun.com-20091108125335-nn0jev237hi335d9.bundle
Thread
bzr commit into mysql-5.1-telco-7.0 branch (magnus.blaudd:3199)Magnus Blåudd8 Nov 2009