List: Commits
From: magnus.blaudd
Date: April 11 2012 10:19am
Subject: bzr push into mysql-5.5-cluster-7.3 branch (magnus.blaudd:3876 to 3877)
 3877 magnus.blaudd@stripped	2012-04-11 [merge]
      Merge 7.2 -> 7.3

    removed:
      mysql-test/suite/ndb_rpl/t/ndb_rpl_relayrotate-slave.opt
    added:
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanResultDataImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordTableScanOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordUniqueKeyOperationImpl.java
    modified:
      mysql-test/suite/ndb/r/ndb_restore_misc.result
      mysql-test/suite/ndb/t/ndb_restore_misc.test
      mysql-test/suite/ndb_rpl/my.cnf
      mysql-test/suite/ndb_rpl/t/ndb_rpl_break_3_chain.cnf
      mysql-test/suite/rpl_ndb/my.cnf
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainTypeHandlerImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/QueryDomainTypeImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandler.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SessionSPI.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SmartValueHandler.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandlerFactory.java
      storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/BitTypesTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryMultiColumnIndexInTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/SchemaChangeTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/domaintypehandler/CrazyDomainTypeHandlerFactoryImpl.java
      storage/ndb/clusterj/clusterj-tie/logging.properties
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordInsertOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordKeyOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordResultDataImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerFactoryImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ResultDataImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ScanResultDataImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java
      storage/ndb/compile-cluster
      storage/ndb/include/util/Vector.hpp
      storage/ndb/memcache/extra/memcached/CMakeLists.txt
      storage/ndb/memcache/extra/memcached/config_tests.in
      storage/ndb/memcache/extra/memcached/daemon/memcached.c
      storage/ndb/memcache/include/Scheduler.h
      storage/ndb/memcache/include/ndb_engine.h
      storage/ndb/memcache/include/ndb_engine_errors.h
      storage/ndb/memcache/include/ndb_pipeline.h
      storage/ndb/memcache/include/ndbmemcache_config.in
      storage/ndb/memcache/src/ExternalValue.cc
      storage/ndb/memcache/src/ndb_engine.c
      storage/ndb/memcache/src/ndb_engine_errors.cc
      storage/ndb/memcache/src/ndb_error_logger.cc
      storage/ndb/memcache/src/ndb_pipeline.cc
      storage/ndb/memcache/src/ndb_worker.cc
      storage/ndb/memcache/src/schedulers/S_sched.cc
      storage/ndb/memcache/src/schedulers/S_sched.h
      storage/ndb/memcache/src/schedulers/Stockholm.cc
      storage/ndb/memcache/src/schedulers/Stockholm.h
      storage/ndb/memcache/unit/alloc.cc
      storage/ndb/src/ndbapi/NdbQueryBuilder.cpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
 3876 Mauritz Sundell	2012-04-10
      ndb - allow upgrade from 7.x to 7.3

    modified:
      storage/ndb/src/common/util/version.cpp
=== modified file 'mysql-test/suite/ndb/r/ndb_restore_misc.result'
--- a/mysql-test/suite/ndb/r/ndb_restore_misc.result	2011-08-17 10:36:01 +0000
+++ b/mysql-test/suite/ndb/r/ndb_restore_misc.result	2012-04-11 09:56:27 +0000
@@ -134,11 +134,12 @@ ForceVarPart: 1
 drop table t1_c,t2_c,t3_c,t4_c,t5_c,t6_c,t7_c,t8_c,t9_c,t10_c,t11_c;
 ForceVarPart: 0
 ForceVarPart: 1
-select * from information_schema.columns where table_name = "t1_c";
-TABLE_CATALOG	TABLE_SCHEMA	TABLE_NAME	COLUMN_NAME	ORDINAL_POSITION	COLUMN_DEFAULT	IS_NULLABLE	DATA_TYPE	CHARACTER_MAXIMUM_LENGTH	CHARACTER_OCTET_LENGTH	NUMERIC_PRECISION	NUMERIC_SCALE	CHARACTER_SET_NAME	COLLATION_NAME	COLUMN_TYPE	COLUMN_KEY	EXTRA	PRIVILEGES	COLUMN_COMMENT
-def	test	t1_c	capgoaledatta	1	NULL	NO	mediumint	NULL	NULL	7	0	NULL	NULL	mediumint(5) unsigned	PRI	auto_increment	#	
-def	test	t1_c	goaledatta	2		NO	char	2	2	NULL	NULL	latin1	latin1_swedish_ci	char(2)	PRI		#	
-def	test	t1_c	maturegarbagefa	3		NO	varchar	32	32	NULL	NULL	latin1	latin1_swedish_ci	varchar(32)	PRI		#	
+select TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, ORDINAL_POSITION, DATA_TYPE
+from information_schema.columns where table_name = "t1_c";
+TABLE_SCHEMA	TABLE_NAME	COLUMN_NAME	ORDINAL_POSITION	DATA_TYPE
+test	t1_c	capgoaledatta	1	mediumint
+test	t1_c	goaledatta	2	char
+test	t1_c	maturegarbagefa	3	varchar
 select count(*) from t1;
 count(*)
 5

=== modified file 'mysql-test/suite/ndb/t/ndb_restore_misc.test'
--- a/mysql-test/suite/ndb/t/ndb_restore_misc.test	2011-07-19 10:54:29 +0000
+++ b/mysql-test/suite/ndb/t/ndb_restore_misc.test	2012-04-11 07:17:49 +0000
@@ -192,9 +192,8 @@ source show_varpart.inc;
 # Bug #30667
 # ndb table discovery does not work correcly with information schema
 # - prior to bug fix this would yeild no output and a warning
-# (priviliges differ on embedded and server so replace)
---replace_column 18 #
-select * from information_schema.columns where table_name = "t1_c";
+select TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, ORDINAL_POSITION, DATA_TYPE
+  from information_schema.columns where table_name = "t1_c";
 
 # random output order??
 #show tables;

=== modified file 'mysql-test/suite/ndb_rpl/my.cnf'
--- a/mysql-test/suite/ndb_rpl/my.cnf	2011-12-09 11:57:48 +0000
+++ b/mysql-test/suite/ndb_rpl/my.cnf	2012-03-29 10:13:47 +0000
@@ -59,12 +59,6 @@ report-host=                  127.0.0.1
 report-port=                  @mysqld.1.slave.port
 report-user=                  root
 
-# Configure slave mysqld without innodb, and set myisam
-# as default storage engine(since innodb will be default
-# otherwise starting  from 5.5)
-loose-skip-innodb
-default-storage-engine=myisam
-
 skip-slave-start
 
 # Directory where slaves find the dumps generated by "load data"

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_break_3_chain.cnf'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_break_3_chain.cnf	2011-12-09 11:57:48 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_break_3_chain.cnf	2012-03-29 10:13:47 +0000
@@ -47,9 +47,6 @@ relay-log=                    cluster2-r
 log-slave-updates
 ndb-log-apply-status
 
-loose-skip-innodb
-default-storage-engine=myisam
-
 # Directory where slaves find the dumps generated by "load data"
 # on the server. The path need to have constant length otherwise
 # test results will vary, thus a relative path is used.
@@ -62,9 +59,6 @@ relay-log=                    cluster3-r
 log-slave-updates
 ndb-log-apply-status
 
-loose-skip-innodb
-default-storage-engine=myisam
-
 # Directory where slaves find the dumps generated by "load data"
 # on the server. The path need to have constant length otherwise
 # test results will vary, thus a relative path is used.

=== removed file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_relayrotate-slave.opt'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_relayrotate-slave.opt	2011-05-18 12:56:24 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_relayrotate-slave.opt	1970-01-01 00:00:00 +0000
@@ -1,3 +0,0 @@
---max_relay_log_size=16384
---loose-innodb
---log-warnings

=== modified file 'mysql-test/suite/rpl_ndb/my.cnf'
--- a/mysql-test/suite/rpl_ndb/my.cnf	2011-12-09 11:57:48 +0000
+++ b/mysql-test/suite/rpl_ndb/my.cnf	2012-03-29 10:13:47 +0000
@@ -59,12 +59,6 @@ report-host=                  127.0.0.1
 report-port=                  @mysqld.1.slave.port
 report-user=                  root
 
-# Configure slave mysqld without innodb, and set myisam
-# as default storage engine(since innodb will be default
-# otherwise starting  from 5.5)
-loose-skip-innodb
-default-storage-engine=myisam
-
 skip-slave-start
 
 # Directory where slaves find the dumps generated by "load data"

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java	2012-04-02 20:43:14 +0000
@@ -202,7 +202,7 @@ public class SessionImpl implements Sess
                 }
                 if (smartValueHandler.found()) {
                     // create a new proxy (or dynamic instance) with the smart value handler
-                    return domainTypeHandler.newInstance(smartValueHandler, db);
+                    return domainTypeHandler.newInstance(smartValueHandler);
                 } else {
                     // not found
                     return null;
@@ -289,6 +289,16 @@ public class SessionImpl implements Sess
         return instance;
     }
 
+    /** Create an instance from a result data row.
+     * @param resultData the result of a query
+     * @param domainTypeHandler the domain type handler
+     * @return the instance
+     */
+    public <T> T newInstance(ResultData resultData, DomainTypeHandler<T> domainTypeHandler) {
+        T result = domainTypeHandler.newInstance(resultData, db);
+        return result;
+    }
+
     /** Load the instance from the database into memory. Loading
      * is asynchronous and will be executed when an operation requiring
      * database access is executed: find, flush, or query. The instance must
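
The new SessionSPI.newInstance(ResultData, DomainTypeHandler) collapses the old
three-step materialization (newInstance, getValueHandler, objectSetValues) into a
single call, which lets the handler factory supply an NdbRecord-backed value
handler where one is available. A minimal sketch of a caller, using only the SPI
types this patch touches (the sketch's own class and method names are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    import com.mysql.clusterj.core.spi.DomainTypeHandler;
    import com.mysql.clusterj.core.spi.SessionSPI;
    import com.mysql.clusterj.core.store.ResultData;

    /** Sketch: materialize query results through the new SPI method. */
    public class ResultMaterializerSketch {
        /** Mirrors the new loop body in QueryDomainTypeImpl.getResultList(). */
        public static <T> List<T> toList(SessionSPI session, ResultData resultData,
                DomainTypeHandler<T> domainTypeHandler) {
            List<T> resultList = new ArrayList<T>();
            while (resultData.next()) {
                // one call replaces newInstance(cls) + getValueHandler + objectSetValues
                T row = session.newInstance(resultData, domainTypeHandler);
                resultList.add(row);
            }
            return resultList;
        }
    }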

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainTypeHandlerImpl.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainTypeHandlerImpl.java	2012-03-12 09:22:04 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainTypeHandlerImpl.java	2012-04-11 09:56:27 +0000
@@ -413,6 +413,10 @@ public abstract class AbstractDomainType
         throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
     }
 
+    public T newInstance(ResultData resultData, Db db) {
+        throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+    }
+
     public void objectMarkModified(ValueHandler handler, String fieldName) {
         throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
     }
@@ -450,7 +454,7 @@ public abstract class AbstractDomainType
         return reasons == null?null:reasons.toString();
     }
 
-    public T newInstance(ValueHandler valueHandler, Db db) {
+    public T newInstance(ValueHandler valueHandler) {
         throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
     }
 

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerImpl.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerImpl.java	2012-04-02 20:43:14 +0000
@@ -18,6 +18,7 @@
 package com.mysql.clusterj.core.metadata;
 
 import com.mysql.clusterj.core.spi.DomainFieldHandler;
+import com.mysql.clusterj.core.spi.SmartValueHandler;
 import com.mysql.clusterj.core.spi.ValueHandlerFactory;
 import com.mysql.clusterj.core.spi.ValueHandler;
 import com.mysql.clusterj.ClusterJException;
@@ -36,6 +37,7 @@ import com.mysql.clusterj.core.store.Db;
 import com.mysql.clusterj.core.store.Index;
 import com.mysql.clusterj.core.store.Dictionary;
 import com.mysql.clusterj.core.store.Operation;
+import com.mysql.clusterj.core.store.ResultData;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationHandler;
@@ -422,11 +424,22 @@ public class DomainTypeHandlerImpl<T> ex
     @Override
     public T newInstance(Db db) {
         ValueHandler valueHandler = valueHandlerFactory.getValueHandler(this, db);
-        return newInstance(valueHandler, db);
+        return newInstance(valueHandler);
+    }
+
+    /** Create a new domain type instance from the result.
+     * @param resultData the results from a database query
+     * @param db the Db
+     * @return the domain type instance
+     */
+    public T newInstance(ResultData resultData, Db db) {
+        ValueHandler valueHandler = valueHandlerFactory.getValueHandler(this, db, resultData);
+        T result = newInstance(valueHandler);
+        return result;
     }
 
     @Override
-    public T newInstance(ValueHandler valueHandler, Db db) {
+    public T newInstance(ValueHandler valueHandler) {
         T instance;
         try {
             if (dynamic) {
@@ -608,7 +621,7 @@ public class DomainTypeHandlerImpl<T> ex
         Object[] result = new Object[numberOfFields];
         int i = 0;
         for (Integer idFieldNumber: idFieldNumbers) {
-            result[idFieldNumber] = keyValues[i];
+            result[idFieldNumber] = keyValues[i++];
         }
         return result;
     }
@@ -624,6 +637,7 @@ public class DomainTypeHandlerImpl<T> ex
 
     /** Factory for default InvocationHandlerImpl */
     protected ValueHandlerFactory defaultInvocationHandlerFactory = new ValueHandlerFactory()  {
+
         public <V> ValueHandler getValueHandler(DomainTypeHandlerImpl<V> domainTypeHandler, Db db) {
             return new InvocationHandlerImpl<V>(domainTypeHandler);
         }
@@ -633,6 +647,13 @@ public class DomainTypeHandlerImpl<T> ex
             Object[] expandedKeyValues = expandKeyValues(keyValues);
             return new KeyValueHandlerImpl(expandedKeyValues);
         }
+
+        public <V> ValueHandler getValueHandler(
+                DomainTypeHandlerImpl<V> domainTypeHandler, Db db, ResultData resultData) {
+            ValueHandler result = new InvocationHandlerImpl<V>(domainTypeHandler);
+            objectSetValues(resultData, result);
+            return result;
+        }
     };
 
     public int getNumberOfTransientFields() {
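
ValueHandlerFactory gains a third getValueHandler overload that seeds a handler
from the current ResultData row. A conforming implementation can delegate to an
existing factory and then copy the row in, the same way the
defaultInvocationHandlerFactory above does. A stand-alone sketch of that shape
(the delegating class itself is hypothetical, not part of this patch):

    import com.mysql.clusterj.core.metadata.DomainTypeHandlerImpl;
    import com.mysql.clusterj.core.spi.ValueHandler;
    import com.mysql.clusterj.core.spi.ValueHandlerFactory;
    import com.mysql.clusterj.core.store.Db;
    import com.mysql.clusterj.core.store.ResultData;

    /** Sketch: satisfy the widened ValueHandlerFactory SPI by delegation. */
    public class DelegatingValueHandlerFactory implements ValueHandlerFactory {

        private final ValueHandlerFactory delegate;

        public DelegatingValueHandlerFactory(ValueHandlerFactory delegate) {
            this.delegate = delegate;
        }

        public <T> ValueHandler getValueHandler(
                DomainTypeHandlerImpl<T> domainTypeHandler, Db db) {
            return delegate.getValueHandler(domainTypeHandler, db);
        }

        public <T> ValueHandler getKeyValueHandler(
                DomainTypeHandlerImpl<T> domainTypeHandler, Db db, Object keyValues) {
            return delegate.getKeyValueHandler(domainTypeHandler, db, keyValues);
        }

        /** New in this patch: create a handler, then set values from the row. */
        public <T> ValueHandler getValueHandler(
                DomainTypeHandlerImpl<T> domainTypeHandler, Db db, ResultData resultData) {
            ValueHandler result = delegate.getValueHandler(domainTypeHandler, db);
            domainTypeHandler.objectSetValues(resultData, result);
            return result;
        }
    }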

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/QueryDomainTypeImpl.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/QueryDomainTypeImpl.java	2012-02-09 10:22:48 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/QueryDomainTypeImpl.java	2012-04-11 09:56:27 +0000
@@ -27,7 +27,6 @@ import com.mysql.clusterj.core.spi.Domai
 import com.mysql.clusterj.core.spi.DomainTypeHandler;
 import com.mysql.clusterj.core.spi.QueryExecutionContext;
 import com.mysql.clusterj.core.spi.SessionSPI;
-import com.mysql.clusterj.core.spi.ValueHandler;
 import com.mysql.clusterj.core.spi.ValueHandlerBatching;
 
 import com.mysql.clusterj.core.store.Index;
@@ -163,10 +162,7 @@ public class QueryDomainTypeImpl<T> impl
             ResultData resultData = getResultData(context);
             // put the result data into the result list
             while (resultData.next()) {
-                T row = (T) session.newInstance(cls);
-                ValueHandler handler =domainTypeHandler.getValueHandler(row);
-                // set values from result set into object
-                domainTypeHandler.objectSetValues(resultData, handler);
+                T row = session.newInstance(resultData, domainTypeHandler);
                 resultList.add(row);
             }
             session.endAutoTransaction();
@@ -206,6 +202,7 @@ public class QueryDomainTypeImpl<T> impl
         switch (scanType) {
 
             case PRIMARY_KEY: {
+                if (logger.isDetailEnabled()) logger.detail("Using primary key find for query.");
                 // perform a select operation
                 Operation op = session.getSelectOperation(domainTypeHandler.getStoreTable());
                 op.beginDefinition();
@@ -221,7 +218,7 @@ public class QueryDomainTypeImpl<T> impl
 
             case INDEX_SCAN: {
                 storeIndex = index.getStoreIndex();
-                if (logger.isDetailEnabled()) logger.detail("Using index scan with index " + index.getIndexName());
+                if (logger.isDetailEnabled()) logger.detail("Using index scan with ordered index " + index.getIndexName() + " for query.");
                 IndexScanOperation op;
                 // perform an index scan operation
                 if (index.isMultiRange()) {
@@ -231,27 +228,31 @@ public class QueryDomainTypeImpl<T> impl
                     op = session.getIndexScanOperation(storeIndex, domainTypeHandler.getStoreTable());
                     
                 }
+                op.beginDefinition();
                 // set the expected columns into the operation
                 domainTypeHandler.operationGetValues(op);
                 // set the bounds into the operation
                 index.operationSetBounds(context, op);
                 // set additional filter conditions
                 where.filterCmpValue(context, op);
+                op.endDefinition();
                 // execute the scan and get results
                 result = op.resultData();
                 break;
             }
 
             case TABLE_SCAN: {
-                if (logger.isDetailEnabled()) logger.detail("Using table scan");
+                if (logger.isDetailEnabled()) logger.detail("Using table scan for query.");
                 // perform a table scan operation
                 ScanOperation op = session.getTableScanOperation(domainTypeHandler.getStoreTable());
+                op.beginDefinition();
                 // set the expected columns into the operation
                 domainTypeHandler.operationGetValues(op);
-                // set the bounds into the operation
+                // set filter conditions into the operation
                 if (where != null) {
                     where.filterCmpValue(context, op);
                 }
+                op.endDefinition();
                 // execute the scan and get results
                 result = op.resultData();
                 break;
@@ -259,14 +260,16 @@ public class QueryDomainTypeImpl<T> impl
 
             case UNIQUE_KEY: {
                 storeIndex = index.getStoreIndex();
-                if (logger.isDetailEnabled()) logger.detail("Using unique lookup with index " + index.getIndexName());
+                if (logger.isDetailEnabled()) logger.detail("Using lookup with unique index " + index.getIndexName() + " for query.");
                 // perform a unique lookup operation
                 IndexOperation op = session.getUniqueIndexOperation(storeIndex, domainTypeHandler.getStoreTable());
+                op.beginDefinition();
                 // set the keys of the indexName into the operation
                 where.operationEqual(context, op);
                 // set the expected columns into the operation
                 //domainTypeHandler.operationGetValuesExcept(op, indexName);
                 domainTypeHandler.operationGetValues(op);
+                op.endDefinition();
                 // execute the select and get results
                 result = op.resultData();
                 break;
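
Every scan and lookup path now brackets its setup between op.beginDefinition()
and op.endDefinition(), so an NdbRecord-based operation can see the complete
definition (columns, bounds, filters) before executing. The shared shape,
sketched for a table scan (the filter step is optional, as in the patch; the
helper class is illustrative):

    import com.mysql.clusterj.core.store.ResultData;
    import com.mysql.clusterj.core.store.ScanOperation;

    /** Sketch: the define-then-execute bracket applied throughout this patch. */
    public class ScanBracketSketch {
        public static ResultData runTableScan(ScanOperation op) {
            op.beginDefinition();
            // ... domainTypeHandler.operationGetValues(op) and, if present,
            // where.filterCmpValue(context, op) would go here ...
            op.endDefinition();
            // execute the scan and get results
            return op.resultData();
        }
    }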

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandler.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandler.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandler.java	2012-04-02 20:43:14 +0000
@@ -93,6 +93,8 @@ public interface DomainTypeHandler<T> {
 
     public void setUnsupported(String reason);
 
-    public T newInstance(ValueHandler valueHandler, Db db);
+    public T newInstance(ValueHandler valueHandler);
+
+    public T newInstance(ResultData resultData, Db db);
 
 }

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SessionSPI.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SessionSPI.java	2011-11-22 22:01:23 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SessionSPI.java	2012-04-02 20:43:14 +0000
@@ -98,6 +98,8 @@ public interface SessionSPI extends Sess
 
     <T> QueryDomainType<T> createQueryDomainType(DomainTypeHandler<T> handler);
 
+    <T> T newInstance(ResultData resultData, DomainTypeHandler<T> domainTypeHandler);
+
     String getCoordinatedTransactionId();
 
     void setCoordinatedTransactionId(String coordinatedTransactionId);

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SmartValueHandler.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SmartValueHandler.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SmartValueHandler.java	2012-04-02 20:43:14 +0000
@@ -19,6 +19,7 @@ package com.mysql.clusterj.core.spi;
 
 import com.mysql.clusterj.core.store.ClusterTransaction;
 import com.mysql.clusterj.core.store.Operation;
+import com.mysql.clusterj.core.store.ResultData;
 
 /** SmartValueHandler is the interface that must be implemented for
  * operations that bypass the normal value handler and directly

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandlerFactory.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandlerFactory.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandlerFactory.java	2012-04-02 20:43:14 +0000
@@ -19,6 +19,7 @@ package com.mysql.clusterj.core.spi;
 
 import com.mysql.clusterj.core.metadata.DomainTypeHandlerImpl;
 import com.mysql.clusterj.core.store.Db;
+import com.mysql.clusterj.core.store.ResultData;
 
 /** ValueHandlerFactory allows a component to provide an alternative value handler.
  *
@@ -27,5 +28,8 @@ public interface ValueHandlerFactory {
 
     <T> ValueHandler getValueHandler(DomainTypeHandlerImpl<T> domainTypeHandler, Db db);
 
+    <T> ValueHandler getValueHandler(DomainTypeHandlerImpl<T> domainTypeHandler, Db db, ResultData resultData);
+
     <T> ValueHandler getKeyValueHandler(DomainTypeHandlerImpl<T> domainTypeHandler, Db db, Object keyValues);
+
 }

=== modified file 'storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java'
--- a/storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java	2012-04-02 20:43:14 +0000
@@ -255,7 +255,12 @@ public class NdbOpenJPADomainTypeHandler
                 local.message("ERR_Implementation_Should_Not_Occur"));
     }
 
-    public T newInstance(ValueHandler valueHandler, Db db) {
+    public T newInstance(ValueHandler valueHandler) {
+        throw new ClusterJFatalInternalException(
+                local.message("ERR_Implementation_Should_Not_Occur"));
+    }
+
+    public T newInstance(ResultData resultData,Db db) {
         throw new ClusterJFatalInternalException(
                 local.message("ERR_Implementation_Should_Not_Occur"));
     }

=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJTest.java	2012-03-05 22:28:15 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJTest.java	2012-04-04 06:22:39 +0000
@@ -23,6 +23,8 @@ import com.mysql.clusterj.Constants;
 import com.mysql.clusterj.Session;
 import com.mysql.clusterj.SessionFactory;
 import com.mysql.clusterj.Transaction;
+import com.mysql.clusterj.core.util.Logger;
+import com.mysql.clusterj.core.util.LoggerFactoryService;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -57,6 +59,10 @@ import junit.framework.TestCase;
  *
  */
 public abstract class AbstractClusterJTest extends TestCase {
+    /** My logger */
+    static final Logger logger = LoggerFactoryService.getFactory()
+            .getInstance("com.mysql.clusterj.test");
+
     protected static final String JDBC_DRIVER_NAME = "jdbc.driverName";
     protected static final String JDBC_URL = "jdbc.url";
     protected static Connection connection;
@@ -76,7 +82,7 @@ public abstract class AbstractClusterJTe
      *
      * Error messages collected during a test.
      */
-    private StringBuffer errorMessages;
+    protected StringBuffer errorMessages;
     /**
      *
      * A list of registered pc classes.

=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/BitTypesTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/BitTypesTest.java	2011-02-02 09:52:33 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/BitTypesTest.java	2012-04-07 02:06:08 +0000
@@ -1,5 +1,5 @@
 /*
-  Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
+  Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
 
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
@@ -80,6 +80,7 @@ public class BitTypesTest extends Abstra
             case 1: { // boolean
                 boolean data = (i % 2) == 0;
                 if (getDebug()) System.out.println("BitTypesTest.getColumnValue Column data for " + i + ", " + j
+                        + " " + columnDescriptors[j].getColumnName()
                         + "  is (boolean)" + data);
                 return data;
             }
@@ -90,6 +91,7 @@ public class BitTypesTest extends Abstra
                     data = (data * 2) + (int)(Math.random() * 2);
                 }
                 if (getDebug()) System.out.println("BitTypesTest.getColumnValue Column data for " + i + ", " + j
+                        + " " + columnDescriptors[j].getColumnName()
                         + "  is (byte)" + data);
                 return Byte.valueOf((byte)data);
             }
@@ -100,6 +102,7 @@ public class BitTypesTest extends Abstra
                     data = (data * 2) + (int)(Math.random() * 2);
                 }
                 if (getDebug()) System.out.println("BitTypesTest.getColumnValue Column data for " + i + ", " + j
+                        + " " + columnDescriptors[j].getColumnName()
                         + "  is (short)" + data);
                 return Short.valueOf((short)data);
             }
@@ -111,6 +114,7 @@ public class BitTypesTest extends Abstra
                     data = (data * 2) + ((int)(Math.random() * 2));
                 }
                 if (getDebug()) System.out.println("BitTypesTest.getColumnValue Column data for " + i + ", " + j
+                        + " " + columnDescriptors[j].getColumnName()
                         + "  is (int)" + data);
                 // TODO bug in JDBC handling high bit
                 data = Math.abs(data);
@@ -124,6 +128,7 @@ public class BitTypesTest extends Abstra
                     data = (data * 256) + (i * 16) + d;
                 }
                 if (getDebug()) System.out.println("BitTypesTest.getColumnValue Column data for " + i + ", " + j
+                        + " " + columnDescriptors[j].getColumnName()
                         + "  is (long)" + data);
                 return Long.valueOf(data);
             }
@@ -148,13 +153,14 @@ public class BitTypesTest extends Abstra
             errorIfNotEqual(where + " got failure on id for row " + i, i, actual[0]);
             for (int j = 1; j < expected.length; ++j) {
                 if (getDebug()) System.out.println("BitTypesTest.verify for " + i + ", " + j
+                        + " " + columnDescriptors[j - 1].getColumnName()
                         + "  is (" + actual[j].getClass().getName() + ")" + actual[j]);
                 switch (j) {
                     case 1: { // boolean
                         Boolean expectedColumn = (Boolean)expected[j];
                         Boolean actualColumn = (Boolean)actual[j];
                         errorIfNotEqual(where + " got failure on comparison of data for row "
-                                + i + " column " + j,
+                                + i + " column " + j + " " + columnDescriptors[j - 1].getColumnName(),
                                 expectedColumn, actualColumn);
                         break;
                     }
@@ -163,7 +169,7 @@ public class BitTypesTest extends Abstra
                         byte actualColumn = (Byte)actual[j];
                         // now compare bit by bit
                         errorIfNotEqual(where + " got failure on comparison of data for row "
-                                + i + " column " + j,
+                                + i + " column " + j + " " + columnDescriptors[j - 1].getColumnName(),
                                 Integer.toHexString(expectedColumn), Integer.toHexString(actualColumn));
                         break;
                     }
@@ -172,7 +178,7 @@ public class BitTypesTest extends Abstra
                         short actualColumn = (Short)actual[j];
                         // now compare bit by bit
                         errorIfNotEqual(where + " got failure on comparison of data for row "
-                                + i + " column " + j,
+                                + i + " column " + j + " " + columnDescriptors[j - 1].getColumnName(),
                                 Integer.toHexString(expectedColumn), Integer.toHexString(actualColumn));
                         break;
                     }
@@ -182,7 +188,7 @@ public class BitTypesTest extends Abstra
                         int actualColumn = (Integer)actual[j];
                         // now compare bit by bit
                         errorIfNotEqual(where + " got failure on comparison of data for row "
-                                + i + " column " + j,
+                                + i + " column " + j + " " + columnDescriptors[j - 1].getColumnName(),
                                 Integer.toHexString(expectedColumn), Integer.toHexString(actualColumn));
                         break;
                     }
@@ -192,7 +198,7 @@ public class BitTypesTest extends Abstra
                         long actualColumn = (Long)actual[j];
                         // now compare bit by bit
                         errorIfNotEqual(where + " got failure on comparison of data for row "
-                                + i + " column " + j,
+                                + i + " column " + j + " " + columnDescriptors[j - 1].getColumnName(),
                                 Long.toHexString(expectedColumn), Long.toHexString(actualColumn));
                         break;
                    }
@@ -209,9 +215,8 @@ public class BitTypesTest extends Abstra
     }
 
     public void testWriteNDBReadJDBC() {
-//        TODO: investigate platform dependency when reading via JDBC
-//        writeNDBreadJDBC();
-//        failOnError();
+        writeNDBreadJDBC();
+        failOnError();
    }
 
     public void testWriteNDBReadNDB() {
@@ -220,9 +225,8 @@ public class BitTypesTest extends Abstra
    }
 
     public void testWriteJDBCReadJDBC() {
-//      TODO: investigate platform dependency when reading via JDBC
-//        writeJDBCreadJDBC();
-//        failOnError();
+        writeJDBCreadJDBC();
+        failOnError();
    }
 
    static ColumnDescriptor bit1 = new ColumnDescriptor

=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java	2011-12-18 18:37:39 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java	2012-04-04 00:32:48 +0000
@@ -1,5 +1,5 @@
 /*
- *  Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+ *  Copyright (c) 2011, 2012, Oracle and/or its affiliates. All rights reserved.
  *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
@@ -17,6 +17,8 @@
 
 package testsuite.clusterj;
 
+import java.io.File;
+
 import com.mysql.clusterj.ClusterJHelper;
 import com.mysql.clusterj.Dbug;
 
@@ -25,7 +27,9 @@ import com.mysql.clusterj.Dbug;
  */
 public class DbugTest extends AbstractClusterJTest{
 
-    static String tmpFileName = System.getProperty("MYSQL_TMP_DIR", "/tmp") + "/clusterj-test-dbug";
+    private static final String TMP_DIR_NAME = System.getProperty("java.io.tmpdir");
+    private static final String FILE_SEPARATOR = File.separator;
+    private static final String TMP_FILE_NAME = TMP_DIR_NAME + FILE_SEPARATOR + "clusterj-test-dbug";
 
     public boolean getDebug() {
         return false;
@@ -46,7 +50,7 @@ public class DbugTest extends AbstractCl
             return;
         }
         String originalState = "t";
-        String newState = "d,jointx:o," + tmpFileName;
+        String newState = "d,jointx:o," + TMP_FILE_NAME;
         dbug.set(originalState);
         String actualState = dbug.get();
         errorIfNotEqual("Failed to set original state", originalState, actualState);
@@ -58,17 +62,17 @@ public class DbugTest extends AbstractCl
         errorIfNotEqual("Failed to pop original state", originalState, actualState);
 
         dbug = ClusterJHelper.newDbug();
-        dbug.output(tmpFileName).flush().debug(new String[] {"a", "b", "c", "d", "e", "f"}).push();
+        dbug.output(TMP_FILE_NAME).flush().debug(new String[] {"a", "b", "c", "d", "e", "f"}).push();
         actualState = dbug.get();
         // keywords are stored LIFO
-        errorIfNotEqual("Wrong state created", "d,f,e,d,c,b,a:O," + tmpFileName, actualState);
+        errorIfNotEqual("Wrong state created", "d,f,e,d,c,b,a:O," + TMP_FILE_NAME, actualState);
         dbug.pop();
 
         dbug = ClusterJHelper.newDbug();
-        dbug.append(tmpFileName).trace().debug("a,b,c,d,e,f").set();
+        dbug.append(TMP_FILE_NAME).trace().debug("a,b,c,d,e,f").set();
         actualState = dbug.get();
         // keywords are stored LIFO
-        errorIfNotEqual("Wrong state created", "d,f,e,d,c,b,a:a," + tmpFileName + ":t", actualState);
+        errorIfNotEqual("Wrong state created", "d,f,e,d,c,b,a:a," + TMP_FILE_NAME + ":t", actualState);
         dbug.pop();
 
         failOnError();
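
For reference, the Dbug fluent API exercised by this test can be driven
stand-alone with the same java.io.tmpdir scheme the patch switches to; the
expected state string follows the LIFO keyword order asserted above (sketch
only, and it assumes the native Dbug support is available):

    import java.io.File;

    import com.mysql.clusterj.ClusterJHelper;
    import com.mysql.clusterj.Dbug;

    /** Sketch: drive the Dbug fluent API outside the test harness. */
    public class DbugSketch {
        public static void main(String[] args) {
            String tmpFileName = System.getProperty("java.io.tmpdir")
                    + File.separator + "clusterj-test-dbug";
            Dbug dbug = ClusterJHelper.newDbug();
            // keywords are stored LIFO, so this prints
            // "d,f,e,d,c,b,a:O," + tmpFileName
            dbug.output(tmpFileName).flush()
                .debug(new String[] {"a", "b", "c", "d", "e", "f"})
                .push();
            System.out.println(dbug.get());
            dbug.pop();  // restore the previous debug state
        }
    }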

=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryMultiColumnIndexInTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryMultiColumnIndexInTest.java	2011-03-22 15:32:28 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryMultiColumnIndexInTest.java	2012-04-04 06:46:06 +0000
@@ -103,7 +103,7 @@ create table longintstringix (
     }
 
     public void testPrettyBigIn() {
-        int arraySize = 4096;
+        int arraySize = 20;
         Integer[] keys = new Integer[arraySize];
         for (int i = 0; i < arraySize; ++i) {
             keys[i] = i;

=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/SchemaChangeTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/SchemaChangeTest.java	2011-10-28 23:29:26 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/SchemaChangeTest.java	2012-04-04 06:22:39 +0000
@@ -44,6 +44,7 @@ public class SchemaChangeTest extends Ab
     }
 
     public void testFind() {
+        logger.info("PLEASE IGNORE THE FOLLOWING EXPECTED SEVERE ERROR.");
         // change the schema (drop a column)
         executeSQL(modifyTableStatement);
         try {
@@ -80,6 +81,7 @@ public class SchemaChangeTest extends Ab
                 session.find(StringTypes.class, 0);
             }
         }
+        logger.info("PLEASE IGNORE THE PRECEDING EXPECTED SEVERE ERROR.\n");
         failOnError();
     }
 

=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/domaintypehandler/CrazyDomainTypeHandlerFactoryImpl.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/domaintypehandler/CrazyDomainTypeHandlerFactoryImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/domaintypehandler/CrazyDomainTypeHandlerFactoryImpl.java	2012-04-02 20:43:14 +0000
@@ -110,7 +110,7 @@ public class CrazyDomainTypeHandlerFacto
                     throw new UnsupportedOperationException("Not supported yet.");
                 }
 
-                public T newInstance(ValueHandler valueHandler, Db db) {
+                public T newInstance(ValueHandler valueHandler) {
                     throw new UnsupportedOperationException("Not supported yet.");
                 }
 
@@ -208,6 +208,10 @@ public class CrazyDomainTypeHandlerFacto
                     throw new UnsupportedOperationException("Not supported yet.");
                }
 
+                public T newInstance(ResultData resultData, Db db) {
+                    throw new UnsupportedOperationException("Not supported yet.");
+                }
+
             };
         } else {
             return null;

=== modified file 'storage/ndb/clusterj/clusterj-tie/logging.properties'
--- a/storage/ndb/clusterj/clusterj-tie/logging.properties	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/logging.properties	2012-04-11 09:56:27 +0000
@@ -13,7 +13,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
-
+com.mysql.clusterj.test.level=INFO
 com.mysql.clusterj.bindings.level=INFO
 com.mysql.clusterj.core.level=INFO
 com.mysql.clusterj.core.metadata.level=INFO

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java	2012-03-06 16:51:07 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java	2012-04-02 20:43:14 +0000
@@ -18,6 +18,7 @@
 package com.mysql.clusterj.tie;
 
 import java.util.IdentityHashMap;
+import java.util.Iterator;
 import java.util.Map;
 
 import java.util.concurrent.ConcurrentHashMap;
@@ -33,6 +34,7 @@ import com.mysql.clusterj.ClusterJHelper
 
 import com.mysql.clusterj.core.spi.ValueHandlerFactory;
 import com.mysql.clusterj.core.store.Db;
+import com.mysql.clusterj.core.store.Index;
 import com.mysql.clusterj.core.store.Table;
 
 import com.mysql.clusterj.core.util.I18NHelper;
@@ -50,7 +52,7 @@ public class ClusterConnectionImpl
 
     /** My logger */
     static final Logger logger = LoggerFactoryService.getFactory()
-            .getInstance(com.mysql.clusterj.core.store.ClusterConnection.class);
+            .getInstance(ClusterConnectionImpl.class);
 
     /** Ndb_cluster_connection is wrapped by ClusterConnection */
     protected Ndb_cluster_connection clusterConnection;
@@ -195,7 +197,9 @@ public class ClusterConnectionImpl
     }
 
     /** 
-     * Get the cached NdbRecord implementation for this cluster connection.
+     * Get the cached NdbRecord implementation for the table
+     * used with this cluster connection. All columns are included
+     * in the NdbRecord.
      * Use a ConcurrentHashMap for best multithread performance.
      * There are three possibilities:
      * <ul><li>Case 1: return the already-cached NdbRecord
@@ -203,8 +207,7 @@ public class ClusterConnectionImpl
      * </li><li>Case 3: return the winner of a race with another thread
      * </li></ul>
      * @param storeTable the store table
-     * @param ndbDictionary the ndb dictionary
-     * @return the NdbRecordImpl
+     * @return the NdbRecordImpl for the table
      */
     protected NdbRecordImpl getCachedNdbRecordImpl(Table storeTable) {
         String tableName = storeTable.getName();
@@ -239,17 +242,81 @@ public class ClusterConnectionImpl
         }
     }
 
-    /** Remove the cached NdbRecord associated with this table. This allows schema change to work.
+    /** 
+     * Get the cached NdbRecord implementation for the index and table
+     * used with this cluster connection.
+     * The NdbRecordImpl is cached under the name tableName+indexName.
+     * Only the key columns are included in the NdbRecord.
+     * Use a ConcurrentHashMap for best multithread performance.
+     * There are three possibilities:
+     * <ul><li>Case 1: return the already-cached NdbRecord
+     * </li><li>Case 2: return a new instance created by this method
+     * </li><li>Case 3: return the winner of a race with another thread
+     * </li></ul>
+     * @param storeTable the store table
+     * @param storeIndex the store index
+     * @return the NdbRecordImpl for the index
+     */
+    protected NdbRecordImpl getCachedNdbRecordImpl(Index storeIndex, Table storeTable) {
+        String recordName = storeTable.getName() + "+" + storeIndex.getInternalName();
+        // find the NdbRecordImpl in the global cache
+        NdbRecordImpl result = ndbRecordImplMap.get(recordName);
+        if (result != null) {
+            // case 1
+            if (logger.isDebugEnabled())logger.debug("NdbRecordImpl found for " + recordName);
+            return result;
+        } else {
+            // dictionary is single thread
+            NdbRecordImpl newNdbRecordImpl;
+            synchronized (dictionaryForNdbRecord) {
+                // try again; another thread might have beat us
+                result = ndbRecordImplMap.get(recordName);
+                if (result != null) {
+                    return result;
+                }
+                newNdbRecordImpl = new NdbRecordImpl(storeIndex, storeTable, dictionaryForNdbRecord);   
+            }
+            NdbRecordImpl winner = ndbRecordImplMap.putIfAbsent(recordName, newNdbRecordImpl);
+            if (winner == null) {
+                // case 2: the previous value was null, so return the new (winning) value
+                if (logger.isDebugEnabled())logger.debug("NdbRecordImpl created for " + recordName);
+                return newNdbRecordImpl;
+            } else {
+                // case 3: another thread beat us, so return the winner and garbage collect ours
+                if (logger.isDebugEnabled())logger.debug("NdbRecordImpl lost race for " + recordName);
+                newNdbRecordImpl.releaseNdbRecord();
+                return winner;
+            }
+        }
+    }
+
+    /** Remove the cached NdbRecord(s) associated with this table. This allows schema change to work.
+     * All NdbRecords including any index NdbRecords will be removed. Index NdbRecords are named
+     * tableName+indexName.
      * @param tableName the name of the table
      */
     public void unloadSchema(String tableName) {
-        if (logger.isDebugEnabled())logger.debug("Removing cached NdbRecord for " + tableName);
-        NdbRecordImpl ndbRecordImpl = ndbRecordImplMap.remove(tableName);
-        if (ndbRecordImpl != null) {
-            ndbRecordImpl.releaseNdbRecord();
+        // synchronize to avoid multiple threads unloading schema simultaneously
+        // it is possible although unlikely that another thread is adding an entry while 
+        // we are removing entries; if this occurs an error will be signaled here
+        synchronized(ndbRecordImplMap) {
+            Iterator<Map.Entry<String, NdbRecordImpl>> iterator = ndbRecordImplMap.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<String, NdbRecordImpl> entry = iterator.next();
+                String key = entry.getKey();
+                if (key.startsWith(tableName)) {
+                    // remove all records whose key begins with the table name; this will remove index records also
+                    if (logger.isDebugEnabled())logger.debug("Removing cached NdbRecord for " + key);
+                    NdbRecordImpl record = entry.getValue();
+                    iterator.remove();
+                    if (record != null) {
+                        record.releaseNdbRecord();
+                    }
+                }
+            }
+            if (logger.isDebugEnabled())logger.debug("Removing dictionary entry for cached table " + tableName);
+            dictionaryForNdbRecord.removeCachedTable(tableName);
         }
-        if (logger.isDebugEnabled())logger.debug("Removing dictionary entry for cached table " + tableName);
-        dictionaryForNdbRecord.removeCachedTable(tableName);
     }
 
     public ValueHandlerFactory getSmartValueHandlerFactory() {
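
The caching above is the usual ConcurrentHashMap idiom: check the map, build a
candidate while holding a narrow lock (the dictionary is single-threaded),
publish with putIfAbsent, and release the loser's native NdbRecord if another
thread won the race. The pattern in isolation (a generic sketch; Releasable and
the factory callback are illustrative stand-ins, not ClusterJ types):

    import java.util.concurrent.Callable;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    /** Sketch: the cache-or-create-or-lose-the-race pattern used above. */
    public class RecordCacheSketch<V> {

        /** Stand-in for values holding native resources (like NdbRecordImpl). */
        public interface Releasable { void release(); }

        private final ConcurrentMap<String, V> cache =
                new ConcurrentHashMap<String, V>();
        private final Object creationLock = new Object();

        public V getOrCreate(String key, Callable<V> factory) throws Exception {
            V result = cache.get(key);
            if (result != null) {
                return result;                     // case 1: already cached
            }
            V candidate;
            synchronized (creationLock) {          // creation is single-threaded
                result = cache.get(key);           // another thread might have beat us
                if (result != null) {
                    return result;
                }
                candidate = factory.call();
            }
            V winner = cache.putIfAbsent(key, candidate);
            if (winner == null) {
                return candidate;                  // case 2: we published ours
            }
            if (candidate instanceof Releasable) { // case 3: lost the race
                ((Releasable) candidate).release();
            }
            return winner;
        }
    }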

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java	2012-04-02 20:43:14 +0000
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import com.mysql.clusterj.ClusterJDatastoreException;
-import com.mysql.clusterj.ClusterJException;
 import com.mysql.clusterj.ClusterJFatalInternalException;
 import com.mysql.clusterj.ClusterJHelper;
 import com.mysql.clusterj.LockMode;
@@ -55,6 +54,8 @@ import com.mysql.ndbjtie.ndbapi.NdbDicti
 import com.mysql.ndbjtie.ndbapi.NdbOperation.OperationOptionsConst;
 import com.mysql.ndbjtie.ndbapi.NdbOperationConst.AbortOption;
 import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanFlag;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanOptions;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanOptionsConst;
 
 /**
  *
@@ -243,12 +244,18 @@ class ClusterTransactionImpl implements 
 
     public IndexScanOperation getIndexScanOperation(Index storeIndex, Table storeTable) {
         enlist();
+        if (USE_NDBRECORD) {
+            return new NdbRecordIndexScanOperationImpl(this, storeIndex, storeTable, indexScanLockMode);
+        }
         IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
         handleError(ndbIndex, ndbDictionary);
         NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
         handleError(ndbOperation, ndbTransaction);
-        int lockMode = indexScanLockMode;
         int scanFlags = 0;
+        int lockMode = indexScanLockMode;
+        if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
+            scanFlags = ScanFlag.SF_KeyInfo;
+        }
         int parallel = 0;
         int batch = 0;
         int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
@@ -259,12 +266,19 @@ class ClusterTransactionImpl implements 
 
     public IndexScanOperation getIndexScanOperationMultiRange(Index storeIndex, Table storeTable) {
         enlist();
+        if (USE_NDBRECORD) {
+            return new NdbRecordIndexScanOperationImpl(this, storeIndex, storeTable, true, indexScanLockMode);
+        }
         IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
         handleError(ndbIndex, ndbDictionary);
         NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
         handleError(ndbOperation, ndbTransaction);
+        int scanFlags = 0;
         int lockMode = indexScanLockMode;
-        int scanFlags = ScanFlag.SF_MultiRange;
+        if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
+            scanFlags = ScanFlag.SF_KeyInfo;
+        }
+        scanFlags |= ScanFlag.SF_MultiRange;
         int parallel = 0;
         int batch = 0;
         int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
@@ -307,12 +321,18 @@ class ClusterTransactionImpl implements 
 
     public ScanOperation getTableScanOperation(Table storeTable) {
         enlist();
+        if (USE_NDBRECORD) {
+            return new NdbRecordTableScanOperationImpl(this, storeTable, tableScanLockMode);
+        }
         TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
         handleError(ndbTable, ndbDictionary);
         NdbScanOperation ndbScanOperation = ndbTransaction.getNdbScanOperation(ndbTable);
         handleError(ndbScanOperation, ndbTransaction);
         int lockMode = tableScanLockMode;
         int scanFlags = 0;
+        if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
+            scanFlags = ScanFlag.SF_KeyInfo;
+        }
         int parallel = 0;
         int batch = 0;
         int returnCode = ndbScanOperation.readTuples(lockMode, scanFlags, parallel, batch);
@@ -339,6 +359,9 @@ class ClusterTransactionImpl implements 
 
     public IndexOperation getUniqueIndexOperation(Index storeIndex, Table storeTable) {
         enlist();
+        if (USE_NDBRECORD) {
+            return new NdbRecordUniqueKeyOperationImpl(this, storeIndex, storeTable);
+        }
         IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
         handleError(ndbIndex, ndbDictionary);
         NdbIndexOperation ndbIndexOperation = ndbTransaction.getNdbIndexOperation(ndbIndex);
@@ -414,6 +437,33 @@ class ClusterTransactionImpl implements 
         return operation;
     }
 
+    /** Create a table scan operation using NdbRecord.
+     * 
+     * @param ndbRecord the NdbRecord for the result
+     * @param mask the columns to read
+     * @param options the scan options
+     * @return
+     */
+    public NdbScanOperation scanTable(NdbRecordConst ndbRecord, byte[] mask, ScanOptionsConst options) {
+        enlist();
+        int lockMode = tableScanLockMode;
+        NdbScanOperation operation = ndbTransaction.scanTable(ndbRecord, lockMode, mask, options, 0);
+        handleError(operation, ndbTransaction);
+        return operation;
+    }
+
+    /** Create a scan operation on the index using NdbRecord. 
+     * 
+     * @param ndbRecord the ndb record
+     * @param mask the mask that specifies which columns to read
+     * @param object scan options // TODO change this
+     * @return
+     */
+    public NdbIndexScanOperation scanIndex(NdbRecordConst key_record, NdbRecordConst result_record,
+            byte[] result_mask, ScanOptions scanOptions) {
+        return ndbTransaction.scanIndex(key_record, result_record, indexScanLockMode, result_mask, null, scanOptions, 0);
+    }
+
     /** Create an NdbOperation for delete using NdbRecord.
      * 
      * @param ndbRecord the NdbRecord
@@ -655,6 +705,16 @@ class ClusterTransactionImpl implements 
         return clusterConnectionImpl.getCachedNdbRecordImpl(storeTable);
     }
 
+    /** Get the cached NdbRecordImpl for this index and table. The NdbRecordImpl is cached in the
+     * cluster connection.
+     * @param storeTable the table
+     * @param storeIndex the index
+     * @return
+     */
+    protected NdbRecordImpl getCachedNdbRecordImpl(Index storeIndex, Table storeTable) {
+        return clusterConnectionImpl.getCachedNdbRecordImpl(storeIndex, storeTable);
+    }
+
     /** 
      * Add an operation to check for errors after execute.
      * @param op the operation to check
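
The same scan-flag computation now appears in three places above (index scan,
multi-range index scan, table scan): SF_KeyInfo is requested for any lock mode
other than committed read, presumably so key info is available for locking
scans, and SF_MultiRange is OR-ed in for multi-range scans. Factored into a
sketch (the helper class is illustrative):

    import com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode;
    import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanFlag;

    /** Sketch: the scan-flag computation repeated in this patch. */
    public class ScanFlagSketch {
        public static int scanFlags(int lockMode, boolean multiRange) {
            int scanFlags = 0;
            if (lockMode != LockMode.LM_CommittedRead) {
                scanFlags = ScanFlag.SF_KeyInfo;   // locking scan: request key info
            }
            if (multiRange) {
                scanFlags |= ScanFlag.SF_MultiRange;
            }
            return scanFlags;
        }
    }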

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java	2012-03-05 22:28:15 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java	2012-04-02 20:43:14 +0000
@@ -95,7 +95,10 @@ class DbImpl implements com.mysql.cluste
     }
 
     public void close() {
-        Ndb.delete(ndb);
+        if (ndb != null) {
+            Ndb.delete(ndb);
+            ndb = null;
+        }
         clusterConnection.close(this);
     }
 
@@ -142,7 +145,7 @@ class DbImpl implements com.mysql.cluste
     /** Enlist an NdbTransaction using table and key data to specify 
      * the transaction coordinator.
      * 
-     * @param table the table
+     * @param tableName the name of the table
      * @param keyParts the list of partition key parts
      * @return the ndbTransaction
      */
@@ -199,8 +202,8 @@ class DbImpl implements com.mysql.cluste
      * the transaction coordinator. This method is also used if
      * the key data is null.
      * 
-     * @param table the table
-     * @param keyParts the list of partition key parts
+     * @param tableName the name of the table
+     * @param partitionId the partition id
      * @return the ndbTransaction
      */
     public NdbTransaction enlist(String tableName, int partitionId) {

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java	2012-04-08 20:50:07 +0000
@@ -26,12 +26,13 @@ import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.List;
 
-import com.mysql.clusterj.ClusterJDatastoreException;
 import com.mysql.clusterj.ClusterJFatalInternalException;
 import com.mysql.clusterj.ClusterJFatalUserException;
 import com.mysql.clusterj.ClusterJUserException;
+import com.mysql.clusterj.ColumnType;
 
 import com.mysql.clusterj.core.store.Column;
+import com.mysql.clusterj.core.store.Index;
 import com.mysql.clusterj.core.store.Table;
 
 import com.mysql.clusterj.core.util.I18NHelper;
@@ -44,15 +45,23 @@ import com.mysql.ndbjtie.ndbapi.NdbRecor
 import com.mysql.ndbjtie.ndbapi.NdbRecordConst;
 import com.mysql.ndbjtie.ndbapi.NdbDictionary.ColumnConst;
 import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.IndexConst;
 import com.mysql.ndbjtie.ndbapi.NdbDictionary.RecordSpecification;
 import com.mysql.ndbjtie.ndbapi.NdbDictionary.RecordSpecificationArray;
 import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst;
 
 /**
- * Wrapper around an NdbRecord. The default implementation can be used for create, read, update, or delete
- * using an NdbRecord that defines every column in the table. After construction, the instance is
+ * Wrapper around an NdbRecord. Operations may use one or two instances.
+ * <ul><li>The table implementation can be used for create, read, update, or delete
+ * using an NdbRecord that defines every column in the table.
+ * </li><li>The index implementation for unique indexes can be used with a unique lookup operation.
+ * </li><li>The index implementation for ordered (non-unique) indexes can be used with an index scan operation.
+ * </li></ul>
+ * After construction, the instance is
  * read-only and can be shared among all threads that use the same cluster connection; and the size of the
- * buffer required for operations is available. The NdbRecord instance is released when the cluster
+ * buffer required for operations is available. 
+ * Methods on the instance generally require a buffer to be passed, which is modified by the method.
+ * The NdbRecord instance is released when the cluster
  * connection is closed or when schema change invalidates it. Column values can be set using a provided
  * buffer and buffer manager.
  */
@@ -79,12 +88,15 @@ public class NdbRecordImpl {
     private RecordSpecificationArray recordSpecificationArray;
 
     /** The NdbTable */
-    TableConst tableConst;
+    TableConst tableConst = null;
 
-    /** The size of the receive buffer for this operation */
+    /** The NdbIndex, which will be null for complete-table instances */
+    IndexConst indexConst = null;
+
+    /** The size of the buffer for this NdbRecord */
     protected int bufferSize;
 
-    /** The maximum column id for this operation */
+    /** The maximum column id for this NdbRecord */
     protected int maximumColumnId;
 
     /** The offsets into the buffer for each column */
@@ -115,14 +127,16 @@ public class NdbRecordImpl {
     private Dictionary ndbDictionary;
 
     /** Number of columns for this NdbRecord */
-    private int numberOfColumns;
+    private int numberOfTableColumns;
 
     /** These fields are only used during construction of the RecordSpecificationArray */
     int offset = 0;
     int nullablePosition = 0;
     byte[] defaultValues;
 
-    /** Constructor used for insert operations that do not need to read data.
+    private int[] recordSpecificationIndexes = null;
+
+    /** Constructor for table operations.
      * 
      * @param storeTable the store table
      * @param ndbDictionary the ndb dictionary
@@ -130,14 +144,41 @@ public class NdbRecordImpl {
     protected NdbRecordImpl(Table storeTable, Dictionary ndbDictionary) {
         this.ndbDictionary = ndbDictionary;
         this.tableConst = getNdbTable(storeTable.getName());
-        this.numberOfColumns = tableConst.getNoOfColumns();
-        this.recordSpecificationArray = RecordSpecificationArray.create(numberOfColumns);
-        this.offsets = new int[numberOfColumns];
-        this.lengths = new int[numberOfColumns];
-        this.nullbitBitInByte = new int[numberOfColumns];
-        this.nullbitByteOffset = new int[numberOfColumns];
-        this.storeColumns = new Column[numberOfColumns];
+        this.numberOfTableColumns = tableConst.getNoOfColumns();
+        this.recordSpecificationArray = RecordSpecificationArray.create(numberOfTableColumns);
+        recordSpecificationIndexes = new int[numberOfTableColumns];
+        this.offsets = new int[numberOfTableColumns];
+        this.lengths = new int[numberOfTableColumns];
+        this.nullbitBitInByte = new int[numberOfTableColumns];
+        this.nullbitByteOffset = new int[numberOfTableColumns];
+        this.storeColumns = new Column[numberOfTableColumns];
         this.ndbRecord = createNdbRecord(storeTable, ndbDictionary);
+        if (logger.isDetailEnabled()) logger.detail(storeTable.getName() + " " + dumpDefinition());
+        initializeDefaultBuffer();
+    }
+
+    /** Constructor for index operations. The NdbRecord defines only
+     * the columns of the index. 
+     * 
+     * @param storeIndex the store index
+     * @param storeTable the store table
+     * @param ndbDictionary the ndb dictionary
+     */
+    protected NdbRecordImpl(Index storeIndex, Table storeTable, Dictionary ndbDictionary) {
+        this.ndbDictionary = ndbDictionary;
+        this.tableConst = getNdbTable(storeTable.getName());
+        this.indexConst = getNdbIndex(storeIndex.getInternalName(), tableConst.getName());
+        this.numberOfTableColumns = tableConst.getNoOfColumns();
+        int numberOfIndexColumns = this.indexConst.getNoOfColumns();
+        this.recordSpecificationArray = RecordSpecificationArray.create(numberOfIndexColumns);
+        recordSpecificationIndexes = new int[numberOfTableColumns];
+        this.offsets = new int[numberOfTableColumns];
+        this.lengths = new int[numberOfTableColumns];
+        this.nullbitBitInByte = new int[numberOfTableColumns];
+        this.nullbitByteOffset = new int[numberOfTableColumns];
+        this.storeColumns = new Column[numberOfTableColumns];
+        this.ndbRecord = createNdbRecord(storeIndex, storeTable, ndbDictionary);
+        if (logger.isDetailEnabled()) logger.detail(storeIndex.getInternalName() + " " + dumpDefinition());
         initializeDefaultBuffer();
     }
 
@@ -152,9 +193,10 @@ public class NdbRecordImpl {
         zeros.order(ByteOrder.nativeOrder());
         // just to be sure, initialize with zeros
         zeros.put(defaultValues);
+        // not all columns are set at this point, so only check for those that are set
         for (Column storeColumn: storeColumns) {
             // nullable columns get the null bit set
-            if (storeColumn.getNullable()) {
+            if (storeColumn != null && storeColumn.getNullable()) {
                 setNull(zeros, storeColumn);
             }
         }
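
initializeDefaultBuffer() above builds a defaults array of zeros with the null bit preset for every nullable column, so a freshly initialized row buffer reports NULL for each nullable column until a setter clears the bit. A self-contained sketch of the idea, using an invented single-column layout and the buffer-reset sequence that initializeBuffer() (added in the next hunk) performs:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    public class DefaultBufferSketch {
        public static void main(String[] args) {
            int bufferSize = 16;
            // zeros everywhere, plus the null bit for one invented nullable
            // column with nullbitByteOffset = 0 and nullbitBitInByte = 1
            byte[] defaultValues = new byte[bufferSize];
            defaultValues[0] |= (byte) (1 << 1);

            ByteBuffer buffer = ByteBuffer.allocateDirect(bufferSize);
            // order, limit, rewind, fill, rewind
            buffer.order(ByteOrder.nativeOrder());
            buffer.limit(bufferSize);
            buffer.position(0);
            buffer.put(defaultValues);
            buffer.position(0);

            // the nullable column reads back as NULL until a setter resets the bit
            boolean isNull = (buffer.get(0) & (1 << 1)) != 0;
            System.out.println("column starts out null: " + isNull); // true
        }
    }
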
@@ -170,13 +212,22 @@ public class NdbRecordImpl {
      */
     protected ByteBuffer newBuffer() {
         ByteBuffer result = ByteBuffer.allocateDirect(bufferSize);
-        result.order(ByteOrder.nativeOrder());
-        result.put(defaultValues);
-        result.limit(bufferSize);
-        result.position(0);
+        initializeBuffer(result);
         return result;
     }
 
+    /** Initialize an already-allocated buffer with default values for all columns.
+     * 
+     * @param buffer the buffer to initialize
+     */
+    protected void initializeBuffer(ByteBuffer buffer) {
+        buffer.order(ByteOrder.nativeOrder());
+        buffer.limit(bufferSize);
+        buffer.position(0);
+        buffer.put(defaultValues);
+        buffer.position(0);
+    }
+
     public int setNull(ByteBuffer buffer, Column storeColumn) {
         int columnId = storeColumn.getColumnId();
         if (!storeColumn.getNullable()) {
@@ -217,11 +268,11 @@ public class NdbRecordImpl {
     public int setByte(ByteBuffer buffer, Column storeColumn, byte value) {
         resetNull(buffer, storeColumn);
         int columnId = storeColumn.getColumnId();
-        if (storeColumn.getLength() == 4) {
+        if (storeColumn.getType() == ColumnType.Bit) {
             // the byte is stored as a BIT array of four bytes
-            buffer.putInt(offsets[columnId], value);
+            buffer.putInt(offsets[columnId], value & 0xff);
         } else {
-            buffer.put(offsets[columnId], (byte)value);
+            buffer.put(offsets[columnId], value);
         }
         buffer.limit(bufferSize);
         buffer.position(0);
@@ -232,8 +283,13 @@ public class NdbRecordImpl {
         resetNull(buffer, storeColumn);
         int columnId = storeColumn.getColumnId();
         int offset = offsets[columnId];
-        int length = storeColumn.getLength() + storeColumn.getPrefixLength();
-        buffer.limit(offset + length);
+        int length = storeColumn.getLength();
+        int prefixLength = storeColumn.getPrefixLength();
+        if (length < value.length) {
+            throw new ClusterJUserException(local.message("ERR_Data_Too_Long",
+                    storeColumn.getName(), length, value.length));
+        }
+        buffer.limit(offset + prefixLength + length);
         buffer.position(offset);
         Utility.convertValue(buffer, storeColumn, value);
         buffer.limit(bufferSize);
@@ -287,7 +343,7 @@ public class NdbRecordImpl {
         int columnId = storeColumn.getColumnId();
         if (storeColumn.getLength() == 4) {
             // the short is stored as a BIT array of four bytes
-            buffer.putInt(offsets[columnId], value);
+            buffer.putInt(offsets[columnId], value & 0xffff);
         } else {
             buffer.putShort(offsets[columnId], (short)value);
         }
@@ -298,15 +354,16 @@ public class NdbRecordImpl {
         resetNull(buffer, storeColumn);
         int columnId = storeColumn.getColumnId();
         int offset = offsets[columnId];
-        int length = storeColumn.getLength() + storeColumn.getPrefixLength();
+        int prefixLength = storeColumn.getPrefixLength();
+        int length = storeColumn.getLength() + prefixLength;
         buffer.limit(offset + length);
         buffer.position(offset);
         // TODO provide the buffer to Utility.encode to avoid copying
         // for now, use the encode method to encode the value then copy it
         ByteBuffer converted = Utility.encode(value, storeColumn, bufferManager);
         if (length < converted.remaining()) {
-            throw new ClusterJUserException(local.message("ERR_Data_Too_Large",
-                    storeColumn.getName(), length, converted.remaining()));
+            throw new ClusterJUserException(local.message("ERR_Data_Too_Long",
+                    storeColumn.getName(), length - prefixLength, converted.remaining() - prefixLength));
         }
         buffer.put(converted);
         buffer.limit(bufferSize);
@@ -321,9 +378,9 @@ public class NdbRecordImpl {
 
     public byte getByte(ByteBuffer buffer, int columnId) {
         Column storeColumn = storeColumns[columnId];
-        if (storeColumn.getLength() == 4) {
+        if (storeColumn.getType() == ColumnType.Bit) {
             // the byte was stored in a BIT column as four bytes
-            return (byte)buffer.get(offsets[columnId]);
+            return (byte)(buffer.getInt(offsets[columnId]));
         } else {
             // the byte was stored as a byte
             return buffer.get(offsets[columnId]);
@@ -391,7 +448,7 @@ public class NdbRecordImpl {
         Column storeColumn = storeColumns[columnId];
         if (storeColumn.getLength() == 4) {
             // the short was stored in a BIT column as four bytes
-            return (short)buffer.get(offsets[columnId]);
+            return (short)buffer.getInt(offsets[columnId]);
         } else {
             // the short was stored as a short
             return buffer.getShort(offsets[columnId]);
@@ -599,8 +656,35 @@ public class NdbRecordImpl {
         }
     }
 
+    protected NdbRecord createNdbRecord(Index storeIndex, Table storeTable, Dictionary ndbDictionary) {
+        String[] columnNames = storeIndex.getColumnNames();
+        // analyze columns; sort into alignment buckets, allocate space in the buffer
+        // and build the record specification array
+        analyzeColumns(storeTable, columnNames);
+        // create the NdbRecord
+        NdbRecord result = ndbDictionary.createRecord(indexConst, tableConst, recordSpecificationArray,
+                columnNames.length, SIZEOF_RECORD_SPECIFICATION, 0);
+        // delete the RecordSpecificationArray since it is no longer needed
+        RecordSpecificationArray.delete(recordSpecificationArray);
+        handleError(result, ndbDictionary);
+        return result;
+    }
+
     protected NdbRecord createNdbRecord(Table storeTable, Dictionary ndbDictionary) {
         String[] columnNames = storeTable.getColumnNames();
+        // analyze columns; sort into alignment buckets, allocate space in the buffer,
+        // and build the record specification array
+        analyzeColumns(storeTable, columnNames);
+        // create the NdbRecord
+        NdbRecord result = ndbDictionary.createRecord(tableConst, recordSpecificationArray,
+                columnNames.length, SIZEOF_RECORD_SPECIFICATION, 0);
+        // delete the RecordSpecificationArray since it is no longer needed
+        RecordSpecificationArray.delete(recordSpecificationArray);
+        handleError(result, ndbDictionary);
+        return result;
+    }
+
+    private void analyzeColumns(Table storeTable, String[] columnNames) {
         List<Column> align8 = new ArrayList<Column>();
         List<Column> align4 = new ArrayList<Column>();
         List<Column> align2 = new ArrayList<Column>();
@@ -609,6 +693,8 @@ public class NdbRecordImpl {
         int i = 0;
         for (String columnName: columnNames) {
             Column storeColumn = storeTable.getColumn(columnName);
+            int columnId = storeColumn.getColumnId();
+            recordSpecificationIndexes[columnId] = i;
             if (logger.isDetailEnabled()) logger.detail("storeColumn: " + storeColumn.getName() + " id: " + storeColumn.getColumnId() + " index: " + i);
             lengths[i] = storeColumn.getLength();
             storeColumns[i++] = storeColumn;
@@ -671,28 +757,20 @@ public class NdbRecordImpl {
         offset = (7 + offset) / 8 * 8;
         nullIndicatorSize = offset;
         for (Column storeColumn: align8) {
-            handleColumn(8, storeColumn);
+            analyzeColumn(8, storeColumn);
         }
         for (Column storeColumn: align4) {
-            handleColumn(4, storeColumn);
+            analyzeColumn(4, storeColumn);
         }
         for (Column storeColumn: align2) {
-            handleColumn(2, storeColumn);
+            analyzeColumn(2, storeColumn);
         }
         for (Column storeColumn: align1) {
-            handleColumn(1, storeColumn);
+            analyzeColumn(1, storeColumn);
         }
         bufferSize = offset;
 
         if (logger.isDebugEnabled()) logger.debug(dumpDefinition());
-
-        // now create an NdbRecord
-        NdbRecord result = ndbDictionary.createRecord(tableConst, recordSpecificationArray,
-                numberOfColumns, SIZEOF_RECORD_SPECIFICATION, 0);
-        // delete the RecordSpecificationArray since it is no longer needed
-        RecordSpecificationArray.delete(recordSpecificationArray);
-        handleError(result, ndbDictionary);
-        return result;
     }
 
     /** Create a record specification for a column. Keep track of the offset into the buffer
@@ -701,9 +779,10 @@ public class NdbRecordImpl {
      * @param alignment the alignment for this column in the buffer
      * @param storeColumn the column
      */
-    private void handleColumn(int alignment, Column storeColumn) {
+    private void analyzeColumn(int alignment, Column storeColumn) {
         int columnId = storeColumn.getColumnId();
-        RecordSpecification recordSpecification = recordSpecificationArray.at(columnId);
+        int recordSpecificationIndex = recordSpecificationIndexes[columnId];
+        RecordSpecification recordSpecification = recordSpecificationArray.at(recordSpecificationIndex);
         ColumnConst columnConst = tableConst.getColumn(columnId);
         recordSpecification.column(columnConst);
         recordSpecification.offset(offset);
@@ -727,21 +806,23 @@ public class NdbRecordImpl {
     private String dumpDefinition() {
         StringBuilder builder = new StringBuilder(tableConst.getName());
         builder.append(" numberOfColumns: ");
-        builder.append(numberOfColumns);
+        builder.append(numberOfTableColumns);
         builder.append('\n');
-        for (int columnId = 0; columnId < numberOfColumns; ++columnId) {
+        for (int columnId = 0; columnId < numberOfTableColumns; ++columnId) {
             Column storeColumn = storeColumns[columnId];
-            builder.append(" column: ");
-            builder.append(storeColumn.getName());
-            builder.append(" offset: ");
-            builder.append(offsets[columnId]);
-            builder.append(" length: ");
-            builder.append(lengths[columnId]);
-            builder.append(" nullbitBitInByte: ");
-            builder.append(nullbitBitInByte[columnId]);
-            builder.append(" nullbitByteOffset: ");
-            builder.append(nullbitByteOffset[columnId]);
-            builder.append('\n');
+            if (storeColumn != null) {
+                builder.append(" column: ");
+                builder.append(storeColumn.getName());
+                builder.append(" offset: ");
+                builder.append(offsets[columnId]);
+                builder.append(" length: ");
+                builder.append(lengths[columnId]);
+                builder.append(" nullbitBitInByte: ");
+                builder.append(nullbitBitInByte[columnId]);
+                builder.append(" nullbitByteOffset: ");
+                builder.append(nullbitByteOffset[columnId]);
+                builder.append('\n');
+            }
         }
         return builder.toString();
     }
@@ -749,37 +830,39 @@ public class NdbRecordImpl {
     public String dumpValues(ByteBuffer data, byte[] mask) {
         StringBuilder builder = new StringBuilder(tableConst.getName());
         builder.append(" numberOfColumns: ");
-        builder.append(numberOfColumns);
+        builder.append(numberOfTableColumns);
         builder.append('\n');
-        for (int columnId = 0; columnId < numberOfColumns; ++columnId) {
+        for (int columnId = 0; columnId < numberOfTableColumns; ++columnId) {
             Column storeColumn = storeColumns[columnId];
-            builder.append(" column: ");
-            builder.append(storeColumn.getName());
-            builder.append(" offset: ");
-            builder.append(offsets[columnId]);
-            builder.append(" length: ");
-            builder.append(lengths[columnId]);
-            builder.append(" nullbitBitInByte: ");
-            int nullBitInByte = nullbitBitInByte[columnId];
-            builder.append(nullBitInByte);
-            builder.append(" nullbitByteOffset: ");
-            int nullByteOffset = nullbitByteOffset[columnId];
-            builder.append(nullByteOffset);
-            builder.append(" data: ");
-            int size = storeColumn.getColumnSpace() != 0 ? storeColumn.getColumnSpace():storeColumn.getSize();
-            int offset = offsets[columnId];
-            data.limit(bufferSize);
-            data.position(0);
-            for (int index = offset; index < offset + size; ++index) {
-                builder.append(String.format("%2x ", data.get(index)));
-            }
-            builder.append(" null: ");
-            builder.append(isNull(data, columnId));
-            builder.append(" present: ");
-            if (mask != null) {
-                builder.append(isPresent(mask, columnId));
+            if (storeColumn != null) {
+                builder.append(" column: ");
+                builder.append(storeColumn.getName());
+                builder.append(" offset: ");
+                builder.append(offsets[columnId]);
+                builder.append(" length: ");
+                builder.append(lengths[columnId]);
+                builder.append(" nullbitBitInByte: ");
+                int nullBitInByte = nullbitBitInByte[columnId];
+                builder.append(nullBitInByte);
+                builder.append(" nullbitByteOffset: ");
+                int nullByteOffset = nullbitByteOffset[columnId];
+                builder.append(nullByteOffset);
+                builder.append(" data: ");
+                int size = storeColumn.getColumnSpace() != 0 ? storeColumn.getColumnSpace():storeColumn.getSize();
+                int offset = offsets[columnId];
+                data.limit(bufferSize);
+                data.position(0);
+                for (int index = offset; index < offset + size; ++index) {
+                    builder.append(String.format("%2x ", data.get(index)));
+                }
+                builder.append(" null: ");
+                builder.append(isNull(data, columnId));
+                if (mask != null) {
+                    builder.append(" present: ");
+                    builder.append(isPresent(mask, columnId));
+                }
+                builder.append('\n');
             }
-            builder.append('\n');
         }
         data.position(0);
         return builder.toString();
@@ -791,9 +874,28 @@ public class NdbRecordImpl {
             // try the lower case table name
             ndbTable = ndbDictionary.getTable(tableName.toLowerCase());
         }
+        if (ndbTable == null) {
+            Utility.throwError(ndbTable, ndbDictionary.getNdbError(), tableName);
+        }
         return ndbTable;
     }
 
+    TableConst getNdbTable() {
+        return tableConst;
+    }
+
+    IndexConst getNdbIndex(String indexName, String tableName) {
+        IndexConst ndbIndex = ndbDictionary.getIndex(indexName, tableName);
+        if (ndbIndex == null) {
+            Utility.throwError(ndbIndex, ndbDictionary.getNdbError(), tableName + "+" + indexName);
+        }
+        return ndbIndex;
+    }
+
+    IndexConst getNdbIndex() {
+        return indexConst;
+    }
+
     public int getBufferSize() {
         return bufferSize;
     }
@@ -803,7 +905,7 @@ public class NdbRecordImpl {
     }
 
     public int getNumberOfColumns() {
-        return numberOfColumns;
+        return numberOfTableColumns;
     }
 
     protected void releaseNdbRecord() {

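The setByte/getByte and setShort/getShort changes in this file handle values stored in four-byte BIT slots. Masking with 0xff (or 0xffff for shorts) keeps Java's sign extension from writing the sign bit across the slot's upper bytes, and the reads now fetch the full int before narrowing. A standalone demonstration of the effect, independent of ClusterJ:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    public class BitColumnSignExtensionDemo {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocateDirect(4).order(ByteOrder.nativeOrder());
            byte value = (byte) 0x80; // -128 as a signed Java byte

            // without the mask, sign extension stores 0xFFFFFF80 in the slot
            buffer.putInt(0, value);
            System.out.printf("unmasked: %08x%n", buffer.getInt(0)); // ffffff80

            // with the mask, as in the fixed setByte(), only the low 8 bits land
            buffer.putInt(0, value & 0xff);
            System.out.printf("masked:   %08x%n", buffer.getInt(0)); // 00000080

            // reading back, as in the fixed getByte(): fetch the whole int,
            // then narrow to a byte, recovering the original value
            byte readBack = (byte) buffer.getInt(0);
            System.out.println("read back: " + readBack); // -128
        }
    }
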
=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java	2012-04-05 15:12:21 +0000
@@ -0,0 +1,359 @@
+/*
+   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.mysql.clusterj.ClusterJFatalInternalException;
+
+import com.mysql.clusterj.core.store.Column;
+import com.mysql.clusterj.core.store.Index;
+import com.mysql.clusterj.core.store.IndexScanOperation;
+import com.mysql.clusterj.core.store.Table;
+
+import com.mysql.ndbjtie.ndbapi.NdbIndexScanOperation;
+
+/** NdbRecordIndexScanOperationImpl performs index scans using NdbRecord.
+ * Two NdbRecordImpl instances are used: one to define the bounds (low and high)
+ * and one to define the result. The superclass NdbRecordScanOperationImpl
+ * holds the NdbRecordImpl and buffer that define and hold the result.
+ * <p>
+ * This instance declares and holds the bounds while they are being defined.
+ * Bounds are handled by creating two bound buffers: one for the low bound
+ * and a second for the high bound. While the bounds are being created, the
+ * number of columns and the strictness of the bound are recorded.
+ * <p>
+ * Bounds are calculated elsewhere based on the query parameters and delivered, in sequence,
+ * to this instance. Bounds are delivered for the most significant index column first,
+ * followed by the next most significant index column, until all columns that have bounds
+ * have been delivered. There may be more columns for the low bound versus the high bound,
+ * or vice versa. For each bound that is delivered, the method (assignBoundBuffer) determines 
+ * to which bound, low or high, the bound belongs. The column count is incremented
+ * for the appropriate bound buffer. The value is then applied to the bound buffer using
+ * the setXXX method of the NdbRecordImpl that manages the layout of the bound buffer.
+ * <p>
+ * The superclass declares and holds the filter while it is being defined.
+ * <p>
+ * At endDefinition, the filter is used to create the scanOptions which is
+ * passed to create the NdbIndexScanOperation. Then the bounds are set into
+ * the newly created NdbIndexScanOperation.
+ * The resulting NdbIndexScanOperation is iterated (scanned) by the NdbRecordResultDataImpl.
+ */
+public class NdbRecordIndexScanOperationImpl extends NdbRecordScanOperationImpl implements IndexScanOperation {
+
+    /** The ndb index scan operation */
+    private NdbIndexScanOperation ndbIndexScanOperation;
+
+    /** The range for this bound */
+    private int indexBoundRange = 0;
+
+    /** The buffer that contains low bounds for all index columns */
+    private ByteBuffer indexBoundLowBuffer = null;
+
+    /** The number of columns in the low bound */
+    private int indexBoundLowCount = 0;
+
+    /** Is the low bound strict? */
+    private boolean indexBoundLowStrict = false;
+
+    /** The buffer that contains high bounds for all index columns */
+    private ByteBuffer indexBoundHighBuffer = null;
+
+    /** The number of columns in the high bound */
+    private int indexBoundHighCount = 0;
+
+    /** Is the high bound strict? */
+    private boolean indexBoundHighStrict = false;
+
+    /** Is this an equal scan? */
+    private boolean equalScan = true;
+
+    /** The list of index bounds already defined; null for a single range */
+    List<NdbIndexScanOperation.IndexBound> ndbIndexBoundList = null;
+
+    public NdbRecordIndexScanOperationImpl(ClusterTransactionImpl clusterTransaction,
+            Index storeIndex, Table storeTable, int lockMode) {
+        this(clusterTransaction, storeIndex, storeTable, false, lockMode);
+    }
+
+    public NdbRecordIndexScanOperationImpl(ClusterTransactionImpl clusterTransaction,
+                Index storeIndex, Table storeTable, boolean multiRange, int lockMode) {
+        super(clusterTransaction, storeTable, lockMode);
+        this.multiRange = multiRange;
+        if (this.multiRange) {
+            ndbIndexBoundList = new ArrayList<NdbIndexScanOperation.IndexBound>();
+        }
+        ndbRecordKeys = clusterTransaction.getCachedNdbRecordImpl(storeIndex, storeTable);
+        keyBufferSize = ndbRecordKeys.bufferSize;
+        indexBoundLowBuffer = ndbRecordKeys.newBuffer();
+        indexBoundHighBuffer = ndbRecordKeys.newBuffer();
+    }
+
+    public void endDefinition() {
+        // get the scan options which also sets the filter
+        getScanOptions();
+        if (logger.isDetailEnabled()) logger.detail("scan options present " + dumpScanOptions(scanOptions.optionsPresent(), scanOptions.scan_flags()));
+
+        // create the scan operation
+        ndbIndexScanOperation = clusterTransaction.scanIndex(
+                ndbRecordKeys.getNdbRecord(), ndbRecordValues.getNdbRecord(), mask, scanOptions);
+        ndbOperation = ndbIndexScanOperation;
+
+        // set the bounds, either from the single indexBound or from multiple ranges
+        if (ndbIndexBoundList != null) {
+            if (logger.isDetailEnabled()) logger.detail("list size " + ndbIndexBoundList.size());
+            // apply all of the bounds to the operation
+            for (NdbIndexScanOperation.IndexBound ndbIndexBound: ndbIndexBoundList) {
+                int returnCode = ndbIndexScanOperation.setBound(ndbRecordKeys.getNdbRecord(), ndbIndexBound);
+                handleError(returnCode, ndbIndexScanOperation);
+            }
+        } else {
+            // only one range defined
+            NdbIndexScanOperation.IndexBound ndbIndexBound = getNdbIndexBound();
+            int returnCode = ndbIndexScanOperation.setBound(ndbRecordKeys.getNdbRecord(), ndbIndexBound);
+            handleError(returnCode, ndbIndexScanOperation);
+        }
+    }
+
+    public void setBoundBigInteger(Column storeColumn, BoundType type, BigInteger value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setBigInteger(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setBigInteger(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setBigInteger(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundByte(Column storeColumn, BoundType type, byte value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setByte(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setByte(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setByte(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundBytes(Column storeColumn, BoundType type, byte[] value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setBytes(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setBytes(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setBytes(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundDecimal(Column storeColumn, BoundType type, BigDecimal value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setDecimal(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setDecimal(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setDecimal(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundDouble(Column storeColumn, BoundType type, Double value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setDouble(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setDouble(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setDouble(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundFloat(Column storeColumn, BoundType type, Float value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setFloat(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setFloat(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setFloat(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundShort(Column storeColumn, BoundType type, short value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setShort(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setShort(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setShort(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundInt(Column storeColumn, BoundType type, Integer value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setInt(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setInt(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setInt(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundLong(Column storeColumn, BoundType type, long value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setLong(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setLong(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setLong(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundString(Column storeColumn, BoundType type, String value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setString(indexBoundLowBuffer, bufferManager, storeColumn, value);
+            ndbRecordKeys.setString(indexBoundHighBuffer, bufferManager, storeColumn, value);
+        } else {
+            ndbRecordKeys.setString(keyBuffer, bufferManager, storeColumn, value);
+        }
+    }
+
+    public void endBound(int rangeNumber) {
+        if (logger.isDetailEnabled()) logger.detail("range: " + rangeNumber);
+        indexBoundRange = rangeNumber;
+        ndbIndexBoundList.add(getNdbIndexBound());
+    }
+
+    private ByteBuffer assignBoundBuffer(BoundType type) {
+        switch (type) {
+            case BoundEQ:
+                indexBoundHighCount++;
+                indexBoundLowCount++;
+                return null;
+            case BoundGE:
+                equalScan = false;
+                indexBoundHighCount++;
+                return indexBoundHighBuffer;
+            case BoundGT:
+                equalScan = false;
+                indexBoundHighStrict = true;
+                indexBoundHighCount++;
+                return indexBoundHighBuffer;
+            case BoundLE:
+                equalScan = false;
+                indexBoundLowCount++;
+                return indexBoundLowBuffer;
+            case BoundLT:
+                equalScan = false;
+                indexBoundLowStrict = true;
+                indexBoundLowCount++;
+                return indexBoundLowBuffer;
+            default:
+                throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+        }
+    }
+
+    /** Create an ndb index bound from the current bounds, then reset the bound state for the next range.
+     * @return the index bound, or null if no bound columns were set
+     */
+    private NdbIndexScanOperation.IndexBound getNdbIndexBound() {
+        ByteBuffer reclaimed = null;
+        if (indexBoundLowCount + indexBoundHighCount > 0) {
+            if (indexBoundLowCount == 0) {
+                indexBoundLowBuffer = null;
+            } else {
+                indexBoundLowBuffer.limit(keyBufferSize);
+                indexBoundLowBuffer.position(0);
+            }
+            if (indexBoundHighCount == 0) {
+                indexBoundHighBuffer = null;
+            } else {
+                indexBoundHighBuffer.limit(keyBufferSize);
+                indexBoundHighBuffer.position(0);
+            }
+            if (equalScan) {
+                reclaimed = indexBoundLowBuffer;
+                indexBoundLowBuffer = indexBoundHighBuffer;
+            }
+            // set the index bound
+            NdbIndexScanOperation.IndexBound ndbindexBound = NdbIndexScanOperation.IndexBound.create();
+            ndbindexBound.low_key(indexBoundLowBuffer);
+            ndbindexBound.high_key(indexBoundHighBuffer);
+            ndbindexBound.low_key_count(indexBoundLowCount);
+            ndbindexBound.high_key_count(indexBoundHighCount);
+            ndbindexBound.low_inclusive(!indexBoundLowStrict);
+            ndbindexBound.high_inclusive(!indexBoundHighStrict);
+            ndbindexBound.range_no(indexBoundRange);
+            if (logger.isDetailEnabled()) logger.detail(
+                    " indexBoundLowCount: " + indexBoundLowCount + " indexBoundHighCount: " + indexBoundHighCount +
+                    " indexBoundLowStrict: " + indexBoundLowStrict + " indexBoundHighStrict: " + indexBoundHighStrict +
+                    " range: " + indexBoundRange
+                    );
+            // reset the index bound for the next range
+            // if equal bound, initialize and reuse previous buffer
+            if (reclaimed != null) {
+                indexBoundLowBuffer = reclaimed;
+                ndbRecordKeys.initializeBuffer(reclaimed);
+            } else {
+                indexBoundLowBuffer = ndbRecordKeys.newBuffer();
+            }
+            indexBoundHighBuffer = ndbRecordKeys.newBuffer();
+            indexBoundLowCount = 0;
+            indexBoundHighCount = 0;
+            indexBoundLowStrict = false;
+            indexBoundHighStrict = false;
+            indexBoundRange = 0;
+            equalScan = true;
+            return ndbindexBound;
+        } else {
+            return null;
+        }
+    }
+
+}

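The class comment above describes how bounds arrive column by column and are sorted into a low buffer and a high buffer. Below is a condensed, self-contained restatement of the bookkeeping in assignBoundBuffer, following the NDB API convention visible in that method (BoundLE/BoundLT describe the low end of the range, BoundGE/BoundGT the high end); the equal-scan buffer reuse is omitted for brevity:

    public class BoundBookkeepingSketch {
        enum BoundType { BoundEQ, BoundLE, BoundLT, BoundGE, BoundGT }

        static int lowCount, highCount;
        static boolean lowStrict, highStrict;

        // EQ counts toward both ends; LE/LT select the low end, GE/GT the
        // high end; the strict variants make that end of the range exclusive
        static void record(BoundType type) {
            switch (type) {
                case BoundEQ: lowCount++; highCount++; break;
                case BoundLE: lowCount++; break;
                case BoundLT: lowStrict = true; lowCount++; break;
                case BoundGE: highCount++; break;
                case BoundGT: highStrict = true; highCount++; break;
            }
        }

        public static void main(String[] args) {
            record(BoundType.BoundLE); // "a >= 5" delivers the low end, inclusive
            record(BoundType.BoundGT); // "a < 10" delivers the high end, strict
            System.out.println("low  count=" + lowCount + " inclusive=" + !lowStrict);
            System.out.println("high count=" + highCount + " inclusive=" + !highStrict);
        }
    }
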
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordInsertOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordInsertOperationImpl.java	2012-03-05 22:28:15 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordInsertOperationImpl.java	2012-04-02 20:43:14 +0000
@@ -23,20 +23,11 @@ public class NdbRecordInsertOperationImp
 
     public NdbRecordInsertOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable) {
         super(clusterTransaction, storeTable);
-        this.ndbRecordValues = clusterTransaction.getCachedNdbRecordImpl(storeTable);
         this.ndbRecordKeys = ndbRecordValues;
-        this.valueBufferSize = ndbRecordValues.getBufferSize();
-        this.numberOfColumns = ndbRecordValues.getNumberOfColumns();
-        this.blobs = new NdbRecordBlobImpl[this.numberOfColumns];
+        this.keyBuffer = valueBuffer;
         resetMask();
     }
 
-    public void beginDefinition() {
-        // allocate a buffer for the operation data
-        valueBuffer = ndbRecordValues.newBuffer();
-        keyBuffer = valueBuffer;
-    }
-
     public void endDefinition() {
         ndbOperation = insert(clusterTransaction);
     }

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordKeyOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordKeyOperationImpl.java	2012-03-05 22:28:15 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordKeyOperationImpl.java	2012-04-02 20:43:14 +0000
@@ -17,101 +17,31 @@
 
 package com.mysql.clusterj.tie;
 
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import com.mysql.clusterj.core.store.Column;
-import com.mysql.clusterj.core.store.ResultData;
 import com.mysql.clusterj.core.store.Table;
 
 public class NdbRecordKeyOperationImpl extends NdbRecordOperationImpl {
 
     public NdbRecordKeyOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable) {
         super(clusterTransaction, storeTable);
-        this.ndbRecordKeys = clusterTransaction.getCachedNdbRecordImpl(storeTable);
-        this.keyBufferSize = ndbRecordKeys.getBufferSize();
-        this.ndbRecordValues = clusterTransaction.getCachedNdbRecordImpl(storeTable);
-        this.valueBufferSize = ndbRecordValues.getBufferSize();
-        this.numberOfColumns = ndbRecordValues.getNumberOfColumns();
-        this.blobs = new NdbRecordBlobImpl[this.numberOfColumns];
-        resetMask();
-    }
-
-    public void beginDefinition() {
-        // allocate a buffer for the key data
-        keyBuffer = ByteBuffer.allocateDirect(keyBufferSize);
-        keyBuffer.order(ByteOrder.nativeOrder());
-        // allocate a buffer for the value result data
-        // TODO: we should not need another buffer
-        valueBuffer = ByteBuffer.allocateDirect(valueBufferSize);
-        valueBuffer.order(ByteOrder.nativeOrder());
-    }
-
-    /** Specify the columns to be used for the operation.
-     */
-    public void getValue(Column storeColumn) {
-        int columnId = storeColumn.getColumnId();
-        columnSet(columnId);
-    }
-
-    /**
-     * Mark this blob column to be read.
-     * @param storeColumn the store column
-     */
-    @Override
-    public void getBlob(Column storeColumn) {
-        // create an NdbRecordBlobImpl for the blob
-        int columnId = storeColumn.getColumnId();
-        columnSet(columnId);
-        NdbRecordBlobImpl blob = new NdbRecordBlobImpl(this, storeColumn);
-        blobs[columnId] = blob;
+        this.ndbRecordKeys = this.ndbRecordValues;
+        this.keyBufferSize = this.valueBufferSize;
+        this.keyBuffer = valueBuffer;
     }
 
     public void endDefinition() {
-        // position the key buffer at the beginning for ndbjtie
-        keyBuffer.position(0);
-        keyBuffer.limit(keyBufferSize);
         // position the value buffer at the beginning for ndbjtie
         valueBuffer.position(0);
         valueBuffer.limit(valueBufferSize);
         // create the key operation
         ndbOperation = clusterTransaction.readTuple(ndbRecordKeys.getNdbRecord(), keyBuffer,
                 ndbRecordValues.getNdbRecord(), valueBuffer, mask, null);
-        // set up a callback when this operation is executed
-        clusterTransaction.postExecuteCallback(new Runnable() {
-            public void run() {
-                for (int columnId = 0; columnId < numberOfColumns; ++columnId) {
-                    NdbRecordBlobImpl blob = blobs[columnId];
-                    if (blob != null) {
-                        blob.setNdbBlob();
-                    }
-                }
-            }
-        });
-    }
-
-    /** Construct a new ResultData using the saved column data and then execute the operation.
-     */
-    @Override
-    public ResultData resultData() {
-        return resultData(true);
-    }
-
-    /** Construct a new ResultData and if requested, execute the operation.
-     */
-    @Override
-    public ResultData resultData(boolean execute) {
-        NdbRecordResultDataImpl result =
-            new NdbRecordResultDataImpl(this);
-        if (execute) {
-            clusterTransaction.executeNoCommit(false, true);
-        }
-        return result;
+        // set the NdbBlob for all active blob columns
+        activateBlobs();
     }
 
     @Override
     public String toString() {
-        return " key " + tableName;
+        return " primary key " + tableName;
     }
 
 }

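With this change a primary-key read uses a single NdbRecord and a single ByteBuffer as both the key input and the row result, since the key columns are part of the row. A tiny illustration of the aliasing; the offset and value are invented:

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;

    public class SharedKeyValueBufferDemo {
        public static void main(String[] args) {
            ByteBuffer valueBuffer = ByteBuffer.allocateDirect(8).order(ByteOrder.nativeOrder());
            ByteBuffer keyBuffer = valueBuffer; // the aliasing done in the constructor above

            // writing the key column, as an equalInt call would at its column offset
            keyBuffer.putInt(0, 42);

            // the row image used for the read result sees the same bytes
            System.out.println(valueBuffer.getInt(0)); // 42
        }
    }
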
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java	2012-04-05 05:45:15 +0000
@@ -45,6 +45,8 @@ import com.mysql.ndbjtie.ndbapi.NdbDicti
 
 /**
  * Implementation of store operation that uses NdbRecord.
+ * Operations of the "equal" variety delegate to the key NdbRecordImpl.
+ * Operations of the "set" and "get" varieties delegate to the value NdbRecordImpl.
  */
 public class NdbRecordOperationImpl implements Operation {
 
@@ -68,7 +70,11 @@ public class NdbRecordOperationImpl impl
     /** The NdbRecord for values */
     protected NdbRecordImpl ndbRecordValues = null;
 
-    /** The mask for this operation, which contains a bit set for each column accessed */
+    /** The mask for this operation, which contains a bit set for each column referenced.
+     * For insert, this contains a bit for each column to be inserted.
+     * For update, this contains a bit for each column to be updated.
+     * For read/scan operations, this contains a bit for each column to be read.
+     */
     byte[] mask;
 
     /** The ByteBuffer containing keys */
@@ -89,27 +95,32 @@ public class NdbRecordOperationImpl impl
     /** The size of the value buffer for this operation */
     protected int valueBufferSize;
 
-    /** The size of the null indicator byte array */
-    protected int nullIndicatorSize;
-
     /** The buffer manager for string encode and decode */
     protected BufferManager bufferManager;
 
     /** The table name */
     protected String tableName;
 
+    /** The store table */
+    protected Table storeTable;
+
     /** The store columns. */
     protected Column[] storeColumns;
 
     /** The number of columns */
     int numberOfColumns;
 
-    /** Constructor used for smart value handler.
+    /** Constructor used by the smart value handler for new instances,
+     * before the cluster transaction is known. There is only one
+     * NdbRecord and one buffer, so all operations result in using
+     * the same buffer.
      * 
      * @param clusterConnection the cluster connection
+     * @param db the Db
      * @param storeTable the store table
      */
     public NdbRecordOperationImpl(ClusterConnectionImpl clusterConnection, Db db, Table storeTable) {
+        this.storeTable = storeTable;
         this.tableName = storeTable.getName();
         this.ndbRecordValues = clusterConnection.getCachedNdbRecordImpl(storeTable);
         this.ndbRecordKeys = ndbRecordValues;
@@ -124,18 +135,41 @@ public class NdbRecordOperationImpl impl
         resetMask();
     }
 
-    protected void resetMask() {
-        this.mask = new byte[1 + (numberOfColumns/8)];
-    }
-
-    /** Constructor used for insert and delete operations that do not need to read data.
+    /** Constructor used when the transaction is known.
      * 
      * @param clusterTransaction the cluster transaction
      */
     public NdbRecordOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable) {
-        this.tableName = storeTable.getName();
         this.clusterTransaction = clusterTransaction;
         this.bufferManager = clusterTransaction.getBufferManager();
+        this.tableName = storeTable.getName();
+        this.ndbRecordValues = clusterTransaction.getCachedNdbRecordImpl(storeTable);
+        this.valueBufferSize = ndbRecordValues.getBufferSize();
+        this.valueBuffer = ndbRecordValues.newBuffer();
+        this.numberOfColumns = ndbRecordValues.getNumberOfColumns();
+        this.blobs = new NdbRecordBlobImpl[this.numberOfColumns];
+        resetMask();
+    }
+
+    /** Constructor used to copy an existing NdbRecordOperationImpl for use with a SmartValueHandler.
+     * The value buffer is taken over by this instance and must no longer be used by the existing NdbRecordOperationImpl.
+     * 
+     * @param ndbRecordOperationImpl2 the existing NdbRecordOperationImpl with value buffer
+     */
+    public NdbRecordOperationImpl(NdbRecordOperationImpl ndbRecordOperationImpl2) {
+        this.ndbRecordValues = ndbRecordOperationImpl2.ndbRecordValues;
+        this.valueBufferSize = ndbRecordOperationImpl2.valueBufferSize;
+        this.ndbRecordKeys = ndbRecordValues;
+        this.keyBufferSize = ndbRecordKeys.bufferSize;
+        this.valueBuffer = ndbRecordOperationImpl2.valueBuffer;
+        this.keyBuffer = this.valueBuffer;
+        this.bufferManager = ndbRecordOperationImpl2.bufferManager;
+        this.tableName = ndbRecordOperationImpl2.tableName;
+        this.storeColumns = ndbRecordOperationImpl2.ndbRecordValues.storeColumns;
+        this.numberOfColumns = this.storeColumns.length;
+        this.blobs = new NdbRecordBlobImpl[this.numberOfColumns];
+        this.activeBlobs = ndbRecordOperationImpl2.activeBlobs;
+        resetMask();
     }
 
     public NdbOperationConst insert(ClusterTransactionImpl clusterTransactionImpl) {
@@ -206,6 +240,20 @@ public class NdbRecordOperationImpl impl
         }
     }
 
+    protected void resetMask() {
+        this.mask = new byte[1 + (numberOfColumns/8)];
+    }
+
+    public void allocateValueBuffer() {
+        this.valueBuffer = ndbRecordValues.newBuffer();
+    }
+
+    protected void activateBlobs() {
+        for (NdbRecordBlobImpl blob: activeBlobs) {
+            blob.setNdbBlob();
+        }
+    }
+
     public void equalBigInteger(Column storeColumn, BigInteger value) {
         int columnId = ndbRecordKeys.setBigInteger(keyBuffer, storeColumn, value);
         columnSet(columnId);
@@ -262,8 +310,7 @@ public class NdbRecordOperationImpl impl
     }
 
     public void getBlob(Column storeColumn) {
-        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
-                "NdbRecordOperationImpl.getBlob(Column)"));
+        getBlobHandle(storeColumn);
     }
 
     /**
@@ -285,11 +332,10 @@ public class NdbRecordOperationImpl impl
     }
 
     /** Specify the columns to be used for the operation.
-     * This is implemented by a subclass.
      */
     public void getValue(Column storeColumn) {
-        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
-                "NdbRecordOperationImpl.getValue(Column)"));
+        int columnId = storeColumn.getColumnId();
+        columnSet(columnId);
     }
 
     public void postExecuteCallback(Runnable callback) {
@@ -297,19 +343,20 @@ public class NdbRecordOperationImpl impl
     }
 
     /** Construct a new ResultData using the saved column data and then execute the operation.
-     * This is implemented by a subclass.
      */
     public ResultData resultData() {
-        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
-                "NdbRecordOperationImpl.resultData()"));
+        return resultData(true);
     }
 
     /** Construct a new ResultData and if requested, execute the operation.
-     * This is implemented by a subclass.
      */
     public ResultData resultData(boolean execute) {
-        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
-                "NdbRecordOperationImpl.resultData(boolean)"));
+        NdbRecordResultDataImpl result =
+            new NdbRecordResultDataImpl(this);
+        if (execute) {
+            clusterTransaction.executeNoCommit(false, true);
+        }
+        return result;
     }
 
     public void setBigInteger(Column storeColumn, BigInteger value) {
@@ -594,10 +641,6 @@ public class NdbRecordOperationImpl impl
         return ndbRecordValues.getLong(valueBuffer, columnId);
     }
 
-    public long getLong(Column storeColumn) {
-        return getLong(storeColumn.getColumnId());
-     }
-
     public float getFloat(int columnId) {
         return ndbRecordValues.getFloat(valueBuffer, columnId);
     }
@@ -718,19 +761,21 @@ public class NdbRecordOperationImpl impl
     }
 
     public void beginDefinition() {
-        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
-        "NdbRecordResultDataImpl.beginDefinition()"));
+        // by default, nothing to do
     }
 
     public void endDefinition() {
-        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
-        "NdbRecordResultDataImpl.endDefinition()"));
+        // by default, nothing to do
     }
 
     public String dumpValues() {
         return ndbRecordValues.dumpValues(valueBuffer, mask);
     }
 
+    public String dumpKeys() {
+        return ndbRecordKeys.dumpValues(keyBuffer, null);
+    }
+
     public boolean isModified(int columnId) {
         return ndbRecordValues.isPresent(mask, columnId);
     }
@@ -781,4 +826,16 @@ public class NdbRecordOperationImpl impl
         }
     }
 
+    /** Transform this NdbRecordOperationImpl into one that can be used to back a SmartValueHandler.
+     * For instances that are used in primary key or unique key operations, the same instance is used.
+     * Scans are handled by a subclass that overrides this method.
+     * 
+     * @return this NdbRecordOperationImpl
+     */
+    public NdbRecordOperationImpl transformNdbRecordOperationImpl() {
+        this.keyBuffer = valueBuffer;
+        resetModified();
+        return this;
+    }
+
 }

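resetMask() above allocates one bit per column. The bodies of columnSet and isPresent are not shown in this diff, so the bit layout below (bit columnId % 8 of byte columnId / 8) is an assumption; the sketch only illustrates the byte[1 + numberOfColumns/8] sizing and the per-column bit tests the mask supports:

    public class ColumnMaskSketch {
        // same sizing as resetMask(): one bit per column, rounded up
        static byte[] resetMask(int numberOfColumns) {
            return new byte[1 + (numberOfColumns / 8)];
        }

        // assumed layout: bit (columnId % 8) of byte (columnId / 8)
        static void columnSet(byte[] mask, int columnId) {
            mask[columnId / 8] |= (byte) (1 << (columnId % 8));
        }

        static boolean isPresent(byte[] mask, int columnId) {
            return (mask[columnId / 8] & (1 << (columnId % 8))) != 0;
        }

        public static void main(String[] args) {
            byte[] mask = resetMask(10); // two bytes cover ten columns
            columnSet(mask, 9);
            System.out.println(isPresent(mask, 9)); // true
            System.out.println(isPresent(mask, 3)); // false
        }
    }
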
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordResultDataImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordResultDataImpl.java	2012-03-05 22:28:15 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordResultDataImpl.java	2012-04-02 20:43:14 +0000
@@ -31,7 +31,7 @@ import com.mysql.clusterj.core.util.Logg
 import com.mysql.clusterj.core.util.LoggerFactoryService;
 
 /**
- *
+ * Handle the results of an operation using NdbRecord. 
  */
 class NdbRecordResultDataImpl implements ResultData {
 
@@ -43,28 +43,21 @@ class NdbRecordResultDataImpl implements
     static final Logger logger = LoggerFactoryService.getFactory()
             .getInstance(NdbRecordResultDataImpl.class);
 
-    /** Flags for iterating a scan */
-    protected final int RESULT_READY = 0;
-    protected final int SCAN_FINISHED = 1;
-    protected final int CACHE_EMPTY = 2;
-
     /** The NdbOperation that defines the result */
-    private NdbRecordOperationImpl operation = null;
+    protected NdbRecordOperationImpl operation = null;
 
     /** The flag indicating that there are no more results */
     private boolean nextDone;
 
-    /** Construct the ResultDataImpl based on an NdbRecordOperationImpl, and the 
-     * buffer manager to help with string columns.
+    /** Construct the ResultDataImpl based on an NdbRecordOperationImpl.
      * @param operation the NdbRecordOperationImpl
-     * @param bufferManager the buffer manager
      */
     public NdbRecordResultDataImpl(NdbRecordOperationImpl operation) {
         this.operation = operation;
     }
 
     public boolean next() {
-        // NdbOperation has exactly zero or one result. ScanResultDataImpl handles scans...
+        // NdbOperation has exactly zero or one result. NdbRecordScanResultDataImpl handles scans...
         // if the ndbOperation reports an error there is no result
         int errorCode = operation.errorCode();
         if (errorCode != 0) {
@@ -264,4 +257,12 @@ class NdbRecordResultDataImpl implements
         return null;
     }
 
+    /** Return an operation that can be used by SmartValueHandler.
+     * The operation retains the buffer holding the row data read by the operation.
+     * @return the operation
+     */
+    public NdbRecordOperationImpl transformOperation() {
+        return operation.transformNdbRecordOperationImpl();
+    }
+
 }

=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java	2012-04-02 20:43:14 +0000
@@ -0,0 +1,223 @@
+/*
+   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import com.mysql.clusterj.core.spi.QueryExecutionContext;
+import com.mysql.clusterj.core.store.ResultData;
+import com.mysql.clusterj.core.store.ScanFilter;
+import com.mysql.clusterj.core.store.ScanOperation;
+import com.mysql.clusterj.core.store.Table;
+
+import com.mysql.ndbjtie.ndbapi.NdbInterpretedCode;
+import com.mysql.ndbjtie.ndbapi.NdbOperationConst;
+import com.mysql.ndbjtie.ndbapi.NdbScanFilter;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanFlag;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanOptions;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanOptionsConst.Type;
+
+/** NdbRecordScanOperationImpl performs table and index scans using NdbRecord.
+ * The scans are set up by subclasses. After executing, the NdbRecordScanOperationImpl instance
+ * is owned and iterated (scanned) by NdbRecordScanResultDataImpl.
+ */
+public abstract class NdbRecordScanOperationImpl extends NdbRecordOperationImpl implements ScanOperation {
+
+    /** The ndb scan options */
+    ScanOptions scanOptions = null;
+
+    /** The ndb scan filter */
+    NdbScanFilter ndbScanFilter = null;
+
+    /** The ndb interpreted code used for filters */
+    NdbInterpretedCode ndbInterpretedCode = null;
+
+    /** Is this scan multi-range? */
+    protected boolean multiRange = false;
+
+    /** The lock mode for this operation */
+    int lockMode;
+
+    public NdbRecordScanOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable,
+            int lockMode) {
+        super(clusterTransaction, storeTable);
+        this.ndbRecordKeys = clusterTransaction.getCachedNdbRecordImpl(storeTable);
+        this.keyBufferSize = ndbRecordKeys.getBufferSize();
+        this.ndbRecordValues = clusterTransaction.getCachedNdbRecordImpl(storeTable);
+        this.valueBufferSize = ndbRecordValues.getBufferSize();
+        this.numberOfColumns = ndbRecordValues.getNumberOfColumns();
+        this.blobs = new NdbRecordBlobImpl[this.numberOfColumns];
+        this.lockMode = lockMode;
+        resetMask();
+    }
+
+    /** Construct a new ResultData and, if requested, execute the operation.
+     */
+    @Override
+    public ResultData resultData(boolean execute) {
+        NdbRecordResultDataImpl result =
+            new NdbRecordScanResultDataImpl(this);
+        if (execute) {
+            clusterTransaction.executeNoCommit(false, true);
+        }
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return " scan " + tableName;
+    }
+
+    /** Deallocate resources used by this scan after the scan is complete.
+     * 
+     */
+    public void close() {
+        if (ndbInterpretedCode != null) {
+            NdbInterpretedCode.delete(ndbInterpretedCode);
+        }
+        if (ndbScanFilter != null) {
+            NdbScanFilter.delete(ndbScanFilter);
+        }
+        if (scanOptions != null) {
+            ScanOptions.delete(scanOptions);
+        }
+        ((NdbScanOperation)ndbOperation).close(true, true);
+    }
+
+    public void deleteCurrentTuple() {
+        int returnCode = ((NdbScanOperation)ndbOperation).deleteCurrentTuple();
+        handleError(returnCode, ndbOperation);
+    }
+
+    /** Create scan options for this scan. 
+     * Scan options are used to set a filter into the NdbScanOperation,
+     * set the key info flag if using a lock mode that requires lock takeover, and set the multi-range flag.
+     */
+    protected void getScanOptions() {
+        long options = 0L;
+        int flags = 0;
+        if (multiRange | (ndbScanFilter != null) | 
+                (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead)) {
+
+            scanOptions = ScanOptions.create();
+            if (multiRange) {
+                flags |= ScanFlag.SF_MultiRange;
+                options |= (long)Type.SO_SCANFLAGS;
+                scanOptions.scan_flags(flags);
+            }
+            if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
+                flags |= ScanFlag.SF_KeyInfo;
+                options |= (long)Type.SO_SCANFLAGS;
+                scanOptions.scan_flags(flags);
+            }
+            if (ndbScanFilter != null) {
+                options |= (long)Type.SO_INTERPRETED;
+                scanOptions.interpretedCode(ndbScanFilter.getInterpretedCode());
+            }
+            
+            scanOptions.optionsPresent(options);
+        }
+        if (logger.isDebugEnabled()) logger.debug("ScanOptions: " + dumpScanOptions(options, flags));
+    }
+
+    protected String dumpScanOptions(long optionsPresent, int flags) {
+        StringBuilder builder = new StringBuilder();
+        if (0L != (optionsPresent & (long)Type.SO_BATCH)) builder.append("SO_BATCH ");
+        if (0L != (optionsPresent & (long)Type.SO_GETVALUE)) builder.append("SO_GETVALUE ");
+        if (0L != (optionsPresent & (long)Type.SO_PARALLEL)) builder.append("SO_PARALLEL ");
+        if (0L != (optionsPresent & (long)Type.SO_CUSTOMDATA)) builder.append("SO_CUSTOMDATA ");
+        if (0L != (optionsPresent & (long)Type.SO_INTERPRETED)) builder.append("SO_INTERPRETED ");
+        if (0L != (optionsPresent & (long)Type.SO_PARTITION_ID)) builder.append("SO_PARTITION_ID ");
+        if (0L != (optionsPresent & (long)Type.SO_SCANFLAGS)) {
+            builder.append("SO_SCANFLAGS(");
+            if (0 != (flags & ScanFlag.SF_KeyInfo)) builder.append("SF_KeyInfo ");
+            if (0 != (flags & ScanFlag.SF_Descending)) builder.append("SF_Descending ");
+            if (0 != (flags & ScanFlag.SF_DiskScan)) builder.append("SF_DiskScan ");
+            if (0 != (flags & ScanFlag.SF_MultiRange)) builder.append("SF_MultiRange ");
+            if (0 != (flags & ScanFlag.SF_OrderBy)) builder.append("SF_OrderBy ");
+            if (0 != (flags & ScanFlag.SF_ReadRangeNo)) builder.append("SF_ReadRangeNo ");
+            if (0 != (flags & ScanFlag.SF_TupScan)) builder.append("SF_TupScan ");
+            builder.append(")");
+        }
+        return builder.toString();
+    }
+
+    /** Create a scan filter for this scan.
+     * @param context the query execution context
+     * @return the ScanFilter to build the filter for the scan
+     */
+    public ScanFilter getScanFilter(QueryExecutionContext context) {
+        
+        ndbInterpretedCode = NdbInterpretedCode.create(ndbRecordValues.getNdbTable(), null, 0);
+        ndbScanFilter = NdbScanFilter.create(ndbInterpretedCode);
+        handleError(ndbScanFilter, ndbOperation);
+        ScanFilter scanFilter = new ScanFilterImpl(ndbScanFilter);
+        context.addFilter(scanFilter);
+        return scanFilter;
+    }
+
+    /** Get the next result from the scan.
+     * Only used for deletePersistentAll to scan the table and delete all rows.
+     */
+    public int nextResult(boolean fetch) {
+        int result = ((NdbScanOperation)ndbOperation).nextResult(fetch, false);
+        clusterTransaction.handleError(result);
+        return result;
+    }
+
+    /** Get the next result from the scan. Copy the data into a newly allocated result buffer.
+     * 
+     */
+    public int nextResultCopyOut(boolean fetch, boolean force) {
+        allocateValueBuffer();
+        int result = ((NdbScanOperation)ndbOperation).nextResultCopyOut(valueBuffer, fetch, force);
+        return result;
+    }
+
+    /** Transfer the lock on the current tuple to the original transaction.
+     * This allows that transaction to keep the result row locked until it completes.
+     * Only transfer the lock if the lock mode is not committed read
+     * (there is no lock held for committed read).
+     */
+    public void lockCurrentTuple() {
+        if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
+            NdbOperationConst op = ((NdbScanOperation)ndbOperation).lockCurrentTuple(
+                    clusterTransaction.ndbTransaction, ndbRecordValues.getNdbRecord(),
+                    null, null, null, 0);
+            if (op == null) {
+                Utility.throwError(op, ndbOperation.getNdbError());
+            }
+        }
+    }
+
+    /** Transform this NdbRecordOperationImpl into one that can be used to back a SmartValueHandler.
+     * For instances that are used in scans, create a new instance and allocate a new buffer
+     * to continue the scan.
+     * 
+     * @return the new NdbRecordOperationImpl
+     */
+    @Override
+    public NdbRecordOperationImpl transformNdbRecordOperationImpl() {
+        NdbRecordOperationImpl result = new NdbRecordOperationImpl(this);
+        // we gave away our buffers; get new ones for the next result
+        this.valueBuffer = ndbRecordValues.newBuffer();
+        this.keyBuffer = valueBuffer;
+        return result;
+    }
+
+}

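The scan-side override of the transform is the mirror image of the primary key case: the instance cannot be reused because the scan keeps running, so it surrenders the buffer holding the current row to the new operation and re-arms itself with a fresh one. A sketch of that hand-off (buffer size and pooling are simplified):

    import java.nio.ByteBuffer;

    // Hypothetical sketch of the scan-side transform: give the current row
    // buffer away, then allocate a new one so the scan can continue.
    class ScanOp {
        ByteBuffer valueBuffer = newBuffer();

        static ByteBuffer newBuffer() { return ByteBuffer.allocateDirect(64); }

        ByteBuffer transform() {
            ByteBuffer handedOff = valueBuffer; // now owned by the new operation
            valueBuffer = newBuffer();          // next result lands in fresh memory
            return handedOff;
        }
    }
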
=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanResultDataImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanResultDataImpl.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanResultDataImpl.java	2012-04-02 20:43:14 +0000
@@ -0,0 +1,87 @@
+/*
+   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import com.mysql.clusterj.core.util.I18NHelper;
+import com.mysql.clusterj.core.util.Logger;
+import com.mysql.clusterj.core.util.LoggerFactoryService;
+
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
+
+/**
+ * Handle the results of a scan using NdbRecord.
+ */
+class NdbRecordScanResultDataImpl extends NdbRecordResultDataImpl {
+
+    /** My message translator */
+    static final I18NHelper local = I18NHelper
+            .getInstance(NdbRecordScanResultDataImpl.class);
+
+    /** My logger */
+    static final Logger logger = LoggerFactoryService.getFactory()
+            .getInstance(NdbRecordScanResultDataImpl.class);
+
+    /** Flags for iterating a scan */
+    protected final int RESULT_READY = 0;
+    protected final int SCAN_FINISHED = 1;
+    protected final int CACHE_EMPTY = 2;
+
+    /** The NdbOperation that defines the result */
+    private NdbRecordScanOperationImpl scanOperation = null;
+
+    /** The NdbScanOperation */
+    private NdbScanOperation ndbScanOperation = null;
+
+    /** Construct the NdbRecordScanResultDataImpl based on an NdbRecordScanOperationImpl.
+     * When used with the compatibility operations, delegate to the NdbRecordOperationImpl
+     * to copy data.
+     * @param scanOperation the NdbRecordScanOperationImpl
+     */
+    public NdbRecordScanResultDataImpl(NdbRecordScanOperationImpl scanOperation) {
+        super(scanOperation);
+        this.scanOperation = scanOperation;
+        this.ndbScanOperation = (NdbScanOperation)scanOperation.ndbOperation;
+    }
+
+    @Override
+    public boolean next() {
+        // NdbScanOperation may have many results.
+        boolean done = false;
+        boolean fetch = false;
+        boolean force = true; // always true for scans
+        while (!done) {
+            int result = scanOperation.nextResultCopyOut(fetch, force);
+            switch (result) {
+                case RESULT_READY:
+                    // if scanning with locks, grab the lock for the current transaction
+                    scanOperation.lockCurrentTuple();
+                    return true;
+                case SCAN_FINISHED:
+                    ndbScanOperation.close(true, true);
+                    return false;
+                case CACHE_EMPTY:
+                    fetch = true;
+                    break;
+                default:
+                    Utility.throwError(result, ndbScanOperation.getNdbError());
+            }
+        }
+        return true; // this statement is needed to make the compiler happy but it's never executed
+    }
+
+}

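The while loop in next() is the standard NDB API scan idiom: nextResult() returns 0 when a row is already buffered locally, 1 when the scan is exhausted, and 2 when the local cache is empty and a fetch round-trip to the data nodes is needed. A self-contained sketch of the same state machine over a canned sequence of return codes (the constants mirror the ones declared above; everything else is hypothetical):

    // Hypothetical driver for the three-state scan loop used by next().
    class ScanLoop {
        static final int RESULT_READY = 0, SCAN_FINISHED = 1, CACHE_EMPTY = 2;
        private final int[] results;   // canned nextResult() return codes
        private int i = 0;

        ScanLoop(int... results) { this.results = results; }

        int nextResult(boolean fetch) { return results[i++]; }

        boolean next() {
            boolean fetch = false;     // first try the local cache
            while (true) {
                switch (nextResult(fetch)) {
                    case RESULT_READY:  return true;
                    case SCAN_FINISHED: return false;
                    case CACHE_EMPTY:   fetch = true; break; // go to the data nodes
                    default: throw new IllegalStateException("scan error");
                }
            }
        }

        public static void main(String[] args) {
            ScanLoop s = new ScanLoop(RESULT_READY, CACHE_EMPTY, RESULT_READY, SCAN_FINISHED);
            while (s.next()) System.out.println("row");
            // prints "row" twice, with one simulated fetch in between
        }
    }
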
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerFactoryImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerFactoryImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerFactoryImpl.java	2012-04-02 20:43:14 +0000
@@ -21,6 +21,7 @@ import com.mysql.clusterj.core.metadata.
 import com.mysql.clusterj.core.spi.ValueHandler;
 import com.mysql.clusterj.core.spi.ValueHandlerFactory;
 import com.mysql.clusterj.core.store.Db;
+import com.mysql.clusterj.core.store.ResultData;
 
 public class NdbRecordSmartValueHandlerFactoryImpl implements ValueHandlerFactory {
 
@@ -35,4 +36,11 @@ public class NdbRecordSmartValueHandlerF
         return result;
     }
 
+    public <T> ValueHandler getValueHandler(
+            DomainTypeHandlerImpl<T> domainTypeHandler, Db db, ResultData resultData) {
+        NdbRecordSmartValueHandlerImpl result;
+        result = new NdbRecordSmartValueHandlerImpl(domainTypeHandler, db, resultData);
+        return result;
+    }
+
 }

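The new factory overload is what lets query results be delivered as domain objects backed directly by the NdbRecord buffer they were scanned into. At the public ClusterJ API nothing changes; a typical query that would exercise this path (Employee is a hypothetical mapped interface, and the connect string and database are placeholders):

    import com.mysql.clusterj.ClusterJHelper;
    import com.mysql.clusterj.Query;
    import com.mysql.clusterj.Session;
    import com.mysql.clusterj.annotation.PersistenceCapable;
    import com.mysql.clusterj.annotation.PrimaryKey;
    import com.mysql.clusterj.query.QueryBuilder;
    import com.mysql.clusterj.query.QueryDomainType;
    import java.util.List;
    import java.util.Properties;

    public class ScanQueryExample {

        @PersistenceCapable(table = "employee")
        public interface Employee {
            @PrimaryKey int getId();
            void setId(int id);
            String getName();
            void setName(String name);
        }

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("com.mysql.clusterj.connectstring", "localhost:1186");
            props.put("com.mysql.clusterj.database", "test");
            Session session = ClusterJHelper.getSessionFactory(props).getSession();

            QueryBuilder builder = session.getQueryBuilder();
            QueryDomainType<Employee> dobj = builder.createQueryDefinition(Employee.class);
            dobj.where(dobj.get("name").equal(dobj.param("name")));
            Query<Employee> query = session.createQuery(dobj);
            query.setParameter("name", "Alice");
            // each result object can now wrap the scan's own row buffer
            List<Employee> results = query.getResultList();
            session.close();
        }
    }
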
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerImpl.java	2012-04-08 20:50:07 +0000
@@ -38,12 +38,11 @@ import com.mysql.clusterj.core.metadata.
 import com.mysql.clusterj.core.spi.DomainFieldHandler;
 import com.mysql.clusterj.core.spi.DomainTypeHandler;
 import com.mysql.clusterj.core.spi.SmartValueHandler;
-import com.mysql.clusterj.core.spi.ValueHandler;
 
 import com.mysql.clusterj.core.store.ClusterTransaction;
 import com.mysql.clusterj.core.store.Db;
 import com.mysql.clusterj.core.store.Operation;
-import com.mysql.clusterj.core.store.Table;
+import com.mysql.clusterj.core.store.ResultData;
 
 import com.mysql.clusterj.core.util.I18NHelper;
 import com.mysql.clusterj.core.util.Logger;
@@ -94,14 +93,10 @@ public class NdbRecordSmartValueHandlerI
 
     private Object proxy;
 
-    public NdbRecordSmartValueHandlerImpl(DomainTypeHandlerImpl<?> domainTypeHandler, Db db) {
+    public NdbRecordSmartValueHandlerImpl(DomainTypeHandlerImpl<?> domainTypeHandler) {
         this.domainTypeHandler = domainTypeHandler;
         this.domainFieldHandlers = domainTypeHandler.getFieldHandlers();
         fieldNumberToColumnNumberMap = domainTypeHandler.getFieldNumberToColumnNumberMap();
-
-        Table storeTable = domainTypeHandler.getStoreTable();
-        this.operation = ((DbImpl)db).newNdbRecordOperationImpl(storeTable);
-
         numberOfTransientFields = domainTypeHandler.getNumberOfTransientFields();
         transientModified = new boolean[numberOfTransientFields];
         if (numberOfTransientFields != 0) {
@@ -109,6 +104,16 @@ public class NdbRecordSmartValueHandlerI
         }
     }
 
+    public NdbRecordSmartValueHandlerImpl(DomainTypeHandlerImpl<?> domainTypeHandler, Db db) {
+        this(domainTypeHandler);
+        this.operation = ((DbImpl)db).newNdbRecordOperationImpl(domainTypeHandler.getStoreTable());
+    }
+
+    public NdbRecordSmartValueHandlerImpl(DomainTypeHandlerImpl<?> domainTypeHandler, Db db, ResultData resultData) {
+        this(domainTypeHandler);
+        this.operation = ((NdbRecordResultDataImpl)resultData).transformOperation();
+    }
+
     public Operation insert(ClusterTransaction clusterTransaction) {
         if (logger.isDetailEnabled()) logger.detail("smart insert for type: " + domainTypeHandler.getName()
                 + "\n" + operation.dumpValues());
@@ -199,8 +204,8 @@ public class NdbRecordSmartValueHandlerI
     }
 
     public boolean[] getBooleans(int fieldNumber) {
-        // TODO Auto-generated method stub
-        return null;
+        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
+                "NdbRecordSmartValueHandler.getBooleans(int)"));
     }
 
     public byte getByte(int fieldNumber) {
@@ -445,8 +450,8 @@ public class NdbRecordSmartValueHandlerI
     }
 
     public void setBooleans(int fieldNumber, boolean[] b) {
-        // TODO Auto-generated method stub
-        
+        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
+                "NdbRecordSmartValueHandler.setBooleans(int, boolean[])"));
     }
 
     public void setByte(int fieldNumber, byte value) {
@@ -686,8 +691,10 @@ public class NdbRecordSmartValueHandlerI
         int columnId = fieldNumberToColumnNumberMap[fieldNumber];
         if (columnId < 0) {
             transientValues[-1 - columnId] = value;
+            transientModified[-1 - columnId] = true;
+        } else {
+            domainFieldHandlers[fieldNumber].objectSetValue(value, this);
         }
-        domainFieldHandlers[fieldNumber].objectSetValue(value, this);
     }
 
     public Object invoke(Object proxy, Method method, Object[] args)

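The fix above makes transient and persistent fields mutually exclusive: a negative column id addresses the transient-value array (and now also marks the slot modified), while a non-negative id goes to the field handler as before; previously objectSetValue() ran even for transient fields. The negative-id encoding in isolation:

    // Standalone version of the dispatch fixed above: negative column ids
    // address the transient array (-1 -> slot 0, -2 -> slot 1, ...), others
    // are persistent columns. The storage here is a hypothetical stand-in.
    class FieldStore {
        Object[] transientValues = new Object[4];
        boolean[] transientModified = new boolean[4];
        Object[] columns = new Object[8];

        void set(int columnId, Object value) {
            if (columnId < 0) {
                transientValues[-1 - columnId] = value;
                transientModified[-1 - columnId] = true;
            } else {
                columns[columnId] = value;  // real code: objectSetValue()
            }
        }
    }
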
=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordTableScanOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordTableScanOperationImpl.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordTableScanOperationImpl.java	2012-04-02 20:43:14 +0000
@@ -0,0 +1,40 @@
+/*
+   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import com.mysql.clusterj.core.store.ScanOperation;
+import com.mysql.clusterj.core.store.Table;
+
+/** NdbRecordTableScanOperationImpl performs table scans using NdbRecord.
+ * Most methods are implemented in the superclass.
+ */
+public class NdbRecordTableScanOperationImpl extends NdbRecordScanOperationImpl implements ScanOperation {
+
+    public NdbRecordTableScanOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable,
+            int lockMode) {
+        super(clusterTransaction, storeTable, lockMode);
+    }
+
+    public void endDefinition() {
+        // get the scan options, which also set the filter into the operation
+        getScanOptions();
+        // create the ndb scan operation
+        ndbOperation = clusterTransaction.scanTable(ndbRecordValues.getNdbRecord(), mask, scanOptions);
+    }
+
+}

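endDefinition() is where the pieces from NdbRecordScanOperationImpl come together: the options built by getScanOptions() are handed to scanTable() along with the value NdbRecord and column mask. The flag packing itself is plain bit arithmetic; a self-contained sketch (the numeric flag values are illustrative, not the real ScanFlag constants):

    // Illustrative scan-option packing; only the OR-together pattern
    // mirrors getScanOptions(), the constant values are made up.
    public class ScanFlags {
        static final int SF_MULTI_RANGE  = 1 << 0;
        static final int SF_KEY_INFO     = 1 << 1;
        static final long SO_SCANFLAGS   = 1L << 0;
        static final long SO_INTERPRETED = 1L << 1;

        public static void main(String[] args) {
            boolean multiRange = true;
            boolean lockedRead = true;   // any mode other than committed read
            boolean hasFilter  = false;

            int flags = 0;
            long options = 0L;
            if (multiRange) { flags |= SF_MULTI_RANGE; options |= SO_SCANFLAGS; }
            if (lockedRead) { flags |= SF_KEY_INFO;    options |= SO_SCANFLAGS; }
            if (hasFilter)  {                          options |= SO_INTERPRETED; }
            System.out.printf("options=%x flags=%x%n", options, flags); // options=1 flags=3
        }
    }
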
=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordUniqueKeyOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordUniqueKeyOperationImpl.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordUniqueKeyOperationImpl.java	2012-04-02 20:43:14 +0000
@@ -0,0 +1,53 @@
+/*
+   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import com.mysql.clusterj.core.store.Index;
+import com.mysql.clusterj.core.store.IndexOperation;
+import com.mysql.clusterj.core.store.Table;
+
+public class NdbRecordUniqueKeyOperationImpl extends NdbRecordOperationImpl implements IndexOperation {
+
+    public NdbRecordUniqueKeyOperationImpl(ClusterTransactionImpl clusterTransaction, Index storeIndex, Table storeTable) {
+        super(clusterTransaction, storeTable);
+        this.ndbRecordKeys = clusterTransaction.getCachedNdbRecordImpl(storeIndex, storeTable);
+        this.keyBufferSize = ndbRecordKeys.getBufferSize();
+        // allocate a buffer for the key data
+        keyBuffer = ndbRecordKeys.newBuffer();
+    }
+
+    public void endDefinition() {
+        // position the key buffer at the beginning for ndbjtie
+        keyBuffer.limit(keyBufferSize);
+        keyBuffer.position(0);
+        // position the value buffer at the beginning for ndbjtie
+        valueBuffer.limit(valueBufferSize);
+        valueBuffer.position(0);
+        // create the key operation
+        ndbOperation = clusterTransaction.readTuple(ndbRecordKeys.getNdbRecord(), keyBuffer,
+                ndbRecordValues.getNdbRecord(), valueBuffer, mask, null);
+        // set the NdbBlob for all active blob columns
+        activateBlobs();
+    }
+
+    @Override
+    public String toString() {
+        return " unique key " + tableName;
+    }
+
+}

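The limit/position resets in endDefinition() matter because ndbjtie reads the direct buffers from their current position: after the key fields are written, the buffer must be rewound before readTuple() sees it. A small java.nio illustration of the same reset:

    import java.nio.ByteBuffer;

    public class BufferReset {
        public static void main(String[] args) {
            ByteBuffer keyBuffer = ByteBuffer.allocateDirect(16);
            keyBuffer.putInt(42);            // writing advances position to 4

            // rewind so the native side reads from the start of the key
            keyBuffer.limit(keyBuffer.capacity());
            keyBuffer.position(0);

            System.out.println(keyBuffer.getInt());  // 42
        }
    }
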
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ResultDataImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ResultDataImpl.java	2011-02-02 09:52:33 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ResultDataImpl.java	2012-04-05 05:45:15 +0000
@@ -128,12 +128,6 @@ class ResultDataImpl implements ResultDa
         }
     }
 
-    private NdbRecAttr getValue(NdbOperation ndbOperation2, int columnId,
-            ByteBuffer byteBuffer2) {
-        // TODO: to help profiling
-        return ndbOperation2.getValue(columnId, byteBuffer2);
-    }
-
     public boolean next() {
         // NdbOperation has exactly zero or one result. ScanResultDataImpl handles scans...
         NdbErrorConst error = ndbOperation.getNdbError();
@@ -215,7 +209,7 @@ class ResultDataImpl implements ResultDa
     public long getLong(Column storeColumn) {
         int index = storeColumn.getColumnId();
         NdbRecAttr ndbRecAttr = ndbRecAttrs[index];
-        return Utility.getLong(storeColumn, ndbRecAttr);
+        return Utility.getLong(storeColumn, ndbRecAttr.int64_value());
      }
 
     public float getFloat(int column) {
@@ -377,7 +371,7 @@ class ResultDataImpl implements ResultDa
     public Long getObjectLong(Column storeColumn) {
         int index = storeColumn.getColumnId();
         NdbRecAttr ndbRecAttr = ndbRecAttrs[index];
-        return (ndbRecAttr.isNULL() == 1)?null:Utility.getLong(storeColumn, ndbRecAttr);
+        return (ndbRecAttr.isNULL() == 1)?null:Utility.getLong(storeColumn, ndbRecAttr.int64_value());
     }
 
     public Float getObjectFloat(int column) {

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ScanResultDataImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ScanResultDataImpl.java	2011-02-02 09:52:33 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ScanResultDataImpl.java	2012-04-02 20:43:14 +0000
@@ -27,6 +27,7 @@ import com.mysql.clusterj.core.util.Logg
 import com.mysql.clusterj.tie.DbImpl.BufferManager;
 
 import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
+import com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode;
 
 /**
  *
@@ -67,6 +68,9 @@ class ScanResultDataImpl extends ResultD
             int result = ndbScanOperation.nextResult(fetch, force);
             switch (result) {
                 case RESULT_READY:
+                    if (ndbScanOperation.getLockMode() != LockMode.LM_CommittedRead) { 
+                        ndbScanOperation.lockCurrentTuple();
+                    }
                     return true;
                 case SCAN_FINISHED:
                     ndbScanOperation.close(true, true);

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java	2012-03-29 00:25:53 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java	2012-04-05 06:37:40 +0000
@@ -355,11 +355,23 @@ public class Utility {
                 case Timestamp:
                     return (value >> 32) * 1000L;
                 case Date:
-                    // the unsigned value is stored in the top 3 bytes
-                    return unpackDate((int)(value >>> 40));
+                    // the three high order bytes are the little endian representation
+                    // the original is zzyyxx0000000000 and the result is 0000000000xxyyzz
+                    long packedDate = 0L;
+                    packedDate |= (value & ffoooooooooooooo) >>> 56;
+                    packedDate |= (value & ooffoooooooooooo) >>> 40;
+                    // the xx byte is signed, so shift left 16 and arithmetic shift right 40
+                    packedDate |= ((value & ooooffoooooooooo) << 16) >> 40;
+                    return unpackDate((int)packedDate);
                 case Time:
-                    // the signed value is stored in the top 3 bytes
-                    return unpackTime((int)(value >> 40));
+                    // the three high order bytes are the little endian representation
+                    // the original is zzyyxx0000000000 and the result is 0000000000xxyyzz
+                    long packedTime = 0L;
+                    packedTime |= (value & ffoooooooooooooo) >>> 56;
+                    packedTime |= (value & ooffoooooooooooo) >>> 40;
+                    // the xx byte is signed, so shift left 16 and arithmetic shift right 40
+                    packedTime |= ((value & ooooffoooooooooo) << 16) >> 40;
+                    return unpackTime((int)packedTime);
                 default:
                     throw new ClusterJUserException(
                             local.message("ERR_Unsupported_Mapping", storeColumn.getType(), "long"));
@@ -685,9 +697,9 @@ public class Utility {
                 case Timestamp:
                     return value * 1000L;
                 case Date:
-                    return unpackDate((int)value);
+                    return unpackDate((int)(value));
                 case Time:
-                    return unpackTime((int)value);
+                    return unpackTime((int)(value));
                 default:
                     throw new ClusterJUserException(
                             local.message("ERR_Unsupported_Mapping", storeColumn.getType(), "long"));
@@ -2020,10 +2032,12 @@ public class Utility {
      * @param ndbRecAttr the NdbRecAttr
      * @return the long
      */
-    public static long getLong(Column storeColumn, NdbRecAttr ndbRecAttr) {
-        return endianManager.getLong(storeColumn, ndbRecAttr);
-    }
 
+    /** Convert a long value from storage.
+     * The value stored in the database might be a time, timestamp, date, bit array,
+     * or simply a long value. The converted value can be converted into a 
+     * time, timestamp, date, bit array, or long value.
+     */
     public static long getLong(Column storeColumn, long value) {
         return endianManager.getLong(storeColumn, value);
     }

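The Date/Time fix above is a 3-byte endianness repair: the packed value sits in the three high-order bytes of the long in little-endian order (zzyyxx00...), and the code reassembles it as 0x00xxyyzz, arithmetically shifting the signed xx byte so negative values survive. A standalone worked version, with explicit hex masks assumed to stand in for Utility's ffoooooooooooooo-style constants:

    // Standalone version of the 3-byte little-endian extraction; the masks
    // are assumed equivalents of Utility's ffoooooooooooooo,
    // ooffoooooooooooo and ooooffoooooooooo constants.
    public class Repack3Bytes {
        static long repack(long value) {
            long packed = 0L;
            packed |= (value & 0xff00000000000000L) >>> 56;  // zz -> low byte
            packed |= (value & 0x00ff000000000000L) >>> 40;  // yy -> middle byte
            // xx is signed: shift left 16, then arithmetic shift right 40
            packed |= ((value & 0x0000ff0000000000L) << 16) >> 40;
            return packed;
        }

        public static void main(String[] args) {
            long stored = 0x332211_0000000000L;        // zz=0x33 yy=0x22 xx=0x11
            System.out.printf("%x%n", repack(stored)); // prints 112233
        }
    }
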
=== modified file 'storage/ndb/compile-cluster'
--- a/storage/ndb/compile-cluster	2012-03-15 08:23:12 +0000
+++ b/storage/ndb/compile-cluster	2012-04-10 11:40:39 +0000
@@ -85,7 +85,7 @@ my $cmake_version_id;
   my @args;
   
   # Hardcoded options controlling how to build MySQL Server
-  # push(@args, "-DWITH_SSL=bundled");
+  push(@args, "-DWITH_SSL=yes");
  
   if ($opt_debug)
   {

=== modified file 'storage/ndb/include/util/Vector.hpp'
--- a/storage/ndb/include/util/Vector.hpp	2011-09-02 09:16:56 +0000
+++ b/storage/ndb/include/util/Vector.hpp	2012-04-11 08:45:06 +0000
@@ -24,7 +24,8 @@
 template<class T>
 class Vector {
 public:
-  Vector(int sz = 10);
+  Vector(unsigned sz = 10, unsigned inc_sz = 0);
+  int expand(unsigned sz);
   ~Vector();
 
   T& operator[](unsigned i);
@@ -32,7 +33,7 @@ public:
   unsigned size() const { return m_size; };
   
   int push_back(const T &);
-  void push(const T&, unsigned pos);
+  int push(const T&, unsigned pos);
   T& set(T&, unsigned pos, T& fill_obj);
   T& back();
   
@@ -63,40 +64,80 @@ private:
   unsigned m_arraySize;
 };
 
-template<class T>
-Vector<T>::Vector(int i){
-  m_items = new T[i];
+/**
+ * BEWARE: Constructing Vector with initial size > 0 is
+ * unsafe wrt. catching 'out of memory' errors.
+ * (C'tor doesn't return error code)
+ * Instead construct Vector with size==0, and then
+ * expand() it to the desired initial size.
+ */
+template<class T>
+Vector<T>::Vector(unsigned sz, unsigned inc_sz):
+  m_items(NULL),
+  m_size(0),
+  m_incSize((inc_sz > 0) ? inc_sz : 50),
+  m_arraySize(0)
+{
+  if (sz == 0)
+    return;
+
+  m_items = new T[sz];
   if (m_items == NULL)
   {
     errno = ENOMEM;
-    m_size = 0;
-    m_arraySize = 0;
-    m_incSize = 0;
     return;
   }
-  m_size = 0;
-  m_arraySize = i;
-  m_incSize = 50;
+  m_arraySize = sz;
+}
+
+template<class T>
+int
+Vector<T>::expand(unsigned sz){
+  if (sz <= m_size)
+    return 0;
+
+  T * tmp = new T[sz];
+  if(tmp == NULL)
+  {
+    errno = ENOMEM;
+    return -1;
+  }
+  for (unsigned i = 0; i < m_size; i++)
+    tmp[i] = m_items[i];
+  delete[] m_items;
+  m_items = tmp;
+  m_arraySize = sz;
+  return 0;
 }
 
+/**
+ * BEWARE: Copy-constructing a Vector is
+ * unsafe wrt. catching 'out of memory' errors.
+ * (C'tor doesn't return error code)
+ * Instead construct empty Vector (size==0),
+ * and then assign() it the initial contents.
+ */
 template<class T>
 Vector<T>::Vector(const Vector& src):
-  m_items(new T[src.m_size]),
-  m_size(src.m_size),
+  m_items(NULL),
+  m_size(0),
   m_incSize(src.m_incSize),
-  m_arraySize(src.m_size)
-  
+  m_arraySize(0)
 {
+  const unsigned sz = src.m_size;
+  if (sz == 0)
+    return;
+
+  m_items = new T[sz];
   if (unlikely(m_items == NULL)){
     errno = ENOMEM;
-    m_size = 0;
-    m_arraySize = 0;
-    m_incSize = 0;
     return;
   }
-  for(unsigned i = 0; i < m_size; i++){
+  for(unsigned i = 0; i < sz; i++){
     m_items[i] = src.m_items[i];
   }
+  m_arraySize = sz;
+  m_size = sz;
 }
 
 template<class T>
@@ -127,6 +168,8 @@ Vector<T>::operator[](unsigned i) const 
 template<class T>
 T &
 Vector<T>::back(){
+  if(m_size==0)
+    abort();
   return (* this)[m_size - 1];
 }
 
@@ -134,17 +177,9 @@ template<class T>
 int
 Vector<T>::push_back(const T & t){
   if(m_size == m_arraySize){
-    T * tmp = new T [m_arraySize + m_incSize];
-    if(tmp == NULL)
-    {
-      errno = ENOMEM;
-      return -1;
-    }
-    for (unsigned k = 0; k < m_size; k++)
-      tmp[k] = m_items[k];
-    delete[] m_items;
-    m_items = tmp;
-    m_arraySize = m_arraySize + m_incSize;
+    const int err = expand(m_arraySize + m_incSize);
+    if (unlikely(err))
+      return err;
   }
   m_items[m_size] = t;
   m_size++;
@@ -152,10 +187,12 @@ Vector<T>::push_back(const T & t){
 }
 
 template<class T>
-void
+int
 Vector<T>::push(const T & t, unsigned pos)
 {
-  push_back(t);
+  const int err = push_back(t);
+  if (unlikely(err))
+    return err;
   if (pos < m_size - 1)
   {
     for(unsigned i = m_size - 1; i > pos; i--)
@@ -164,13 +201,15 @@ Vector<T>::push(const T & t, unsigned po
     }
     m_items[pos] = t;
   }
+  return 0;
 }
 
 template<class T>
 T&
 Vector<T>::set(T & t, unsigned pos, T& fill_obj)
 {
-  fill(pos, fill_obj);
+  if (fill(pos, fill_obj))
+    abort();
   T& ret = m_items[pos];
   m_items[pos] = t;
   return ret;
@@ -196,19 +235,31 @@ Vector<T>::clear(){
 template<class T>
 int
 Vector<T>::fill(unsigned new_size, T & obj){
+  const int err = expand(new_size);
+  if (unlikely(err))
+    return err;
   while(m_size <= new_size)
     if (push_back(obj))
       return -1;
   return 0;
 }
 
+/**
+ * 'operator=' will 'abort()' on 'out of memory' errors.
+ *  You may prefer using '::assign()', which returns
+ *  an error code instead of aborting.
+ */
 template<class T>
 Vector<T>& 
 Vector<T>::operator=(const Vector<T>& obj){
   if(this != &obj){
     clear();
+    const int err = expand(obj.size());
+    if (unlikely(err))
+      abort();
     for(unsigned i = 0; i<obj.size(); i++){
-      push_back(obj[i]);
+      if (push_back(obj[i]))
+        abort();
     }
   }
   return * this;
@@ -218,12 +269,19 @@ template<class T>
 int
 Vector<T>::assign(const T* src, unsigned cnt)
 {
+  if (getBase() == src)
+    return 0;  // Self-assign is a NOOP
+
   clear();
+  const int err = expand(cnt);
+  if (unlikely(err))
+    return err;
+
   for (unsigned i = 0; i<cnt; i++)
   {
-    int ret;
-    if ((ret = push_back(src[i])))
-      return ret;
+    const int err = push_back(src[i]);
+    if (unlikely(err))
+      return err;
   }
   return 0;
 }
@@ -241,7 +299,8 @@ Vector<T>::equal(const Vector<T>& obj) c
 template<class T>
 class MutexVector : public NdbLockable {
 public:
-  MutexVector(int sz = 10);
+  MutexVector(unsigned sz = 10, unsigned inc_sz = 0);
+  int expand(unsigned sz);
   ~MutexVector();
 
   T& operator[](unsigned i);
@@ -260,26 +319,60 @@ public:
 
   int fill(unsigned new_size, T & obj);
 private:
+  // Don't allow copy and assignment of MutexVector
+  MutexVector(const MutexVector&); 
+  MutexVector<T>& operator=(const MutexVector<T>&);
+
   T * m_items;
   unsigned m_size;
   unsigned m_incSize;
   unsigned m_arraySize;
 };
 
-template<class T>
-MutexVector<T>::MutexVector(int i){
-  m_items = new T[i];
+/**
+ * BEWARE: Constructing MutexVector with initial size > 0 is
+ * unsafe wrt. catching 'out of memory' errors.
+ * (C'tor doesn't return error code)
+ * Instead construct MutexVector with size==0, and then
+ * expand() it to the desired initial size.
+ */
+template<class T>
+MutexVector<T>::MutexVector(unsigned sz, unsigned inc_sz):
+  m_items(NULL),
+  m_size(0),
+  m_incSize((inc_sz > 0) ? inc_sz : 50),
+  m_arraySize(0)
+{
+  if (sz == 0)
+    return;
+
+  m_items = new T[sz];
   if (m_items == NULL)
   {
     errno = ENOMEM;
-    m_size = 0;
-    m_arraySize = 0;
-    m_incSize = 0;
     return;
   }
-  m_size = 0;
-  m_arraySize = i;
-  m_incSize = 50;
+  m_arraySize = sz;
+}
+
+template<class T>
+int
+MutexVector<T>::expand(unsigned sz){
+  if (sz <= m_size)
+    return 0;
+
+  T * tmp = new T[sz];
+  if(tmp == NULL)
+  {
+    errno = ENOMEM;
+    return -1;
+  }
+  for (unsigned i = 0; i < m_size; i++)
+    tmp[i] = m_items[i];
+  delete[] m_items;
+  m_items = tmp;
+  m_arraySize = sz;
+  return 0;
 }
 
 template<class T>
@@ -310,6 +403,8 @@ MutexVector<T>::operator[](unsigned i) c
 template<class T>
 T &
 MutexVector<T>::back(){
+  if(m_size==0)
+    abort();
   return (* this)[m_size - 1];
 }
 
@@ -318,18 +413,12 @@ int
 MutexVector<T>::push_back(const T & t){
   lock();
   if(m_size == m_arraySize){
-    T * tmp = new T [m_arraySize + m_incSize];
-    if (tmp == NULL)
+    const int err = expand(m_arraySize + m_incSize);
+    if (unlikely(err))
     {
-      errno = ENOMEM;
       unlock();
-      return -1;
+      return err;
     }
-    for (unsigned k = 0; k < m_size; k++)
-      tmp[k] = m_items[k];
-    delete[] m_items;
-    m_items = tmp;
-    m_arraySize = m_arraySize + m_incSize;
   }
   m_items[m_size] = t;
   m_size++;
@@ -343,19 +432,13 @@ MutexVector<T>::push_back(const T & t, b
   if(lockMutex) 
     lock();
   if(m_size == m_arraySize){
-    T * tmp = new T [m_arraySize + m_incSize];
-    if (tmp == NULL)
+    const int err = expand(m_arraySize + m_incSize);
+    if (unlikely(err))
     {
-      errno = ENOMEM;
       if(lockMutex) 
         unlock();
-      return -1;
+      return err;
     }
-    for (unsigned k = 0; k < m_size; k++)
-      tmp[k] = m_items[k];
-    delete[] m_items;
-    m_items = tmp;
-    m_arraySize = m_arraySize + m_incSize;
   }
   m_items[m_size] = t;
   m_size++;

=== modified file 'storage/ndb/memcache/extra/memcached/CMakeLists.txt'
--- a/storage/ndb/memcache/extra/memcached/CMakeLists.txt	2011-09-23 00:00:18 +0000
+++ b/storage/ndb/memcache/extra/memcached/CMakeLists.txt	2012-03-31 15:10:44 +0000
@@ -84,6 +84,7 @@ CHECK_INCLUDE_FILE("netdb.h"         HAV
 CHECK_INCLUDE_FILE("sys/uio.h"       HAVE_SYS_UIO_H)
 CHECK_INCLUDE_FILE("signal.h"        HAVE_SIGIGNORE)
 CHECK_INCLUDE_FILES("sys/types.h;netinet/tcp.h" HAVE_NETINET_TCP_H)
+CHECK_FUNCTION_EXISTS(htonll         HAVE_HTONLL)
 
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config_tests.in 
                ${CMAKE_CURRENT_SOURCE_DIR}/config.h)

=== modified file 'storage/ndb/memcache/extra/memcached/config_tests.in'
--- a/storage/ndb/memcache/extra/memcached/config_tests.in	2011-09-20 05:13:01 +0000
+++ b/storage/ndb/memcache/extra/memcached/config_tests.in	2012-03-31 15:16:18 +0000
@@ -24,6 +24,7 @@
 #cmakedefine HAVE_SYS_UIO_H
 #cmakedefine HAVE_NETINET_TCP_H
 #cmakedefine HAVE_SIGIGNORE
+#cmakedefine HAVE_HTONLL
 
 #include "my_config.h"
 #include "config_static.h"

=== modified file 'storage/ndb/memcache/extra/memcached/daemon/memcached.c'
--- a/storage/ndb/memcache/extra/memcached/daemon/memcached.c	2011-09-20 05:13:01 +0000
+++ b/storage/ndb/memcache/extra/memcached/daemon/memcached.c	2012-04-06 00:06:41 +0000
@@ -1761,6 +1761,10 @@ static void process_bin_get(conn *c) {
     case ENGINE_NOT_MY_VBUCKET:
         write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
         break;
+    case ENGINE_TMPFAIL:
+        write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
+        break;
+
     default:
         /* @todo add proper error handling! */
         settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,

=== modified file 'storage/ndb/memcache/include/Scheduler.h'
--- a/storage/ndb/memcache/include/Scheduler.h	2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/include/Scheduler.h	2012-04-03 00:34:18 +0000
@@ -23,13 +23,21 @@
 #include "ndbmemcache_global.h"
 #include <memcached/types.h>
 
-#include "ndb_pipeline.h"
+#include "thread_identifier.h"
+
+
+typedef struct {
+  int nthreads;                /* number of worker threads */
+  int max_clients;             /* maximum number of client connections */
+  const char * config_string;  /* scheduler-specific configuration string */
+} scheduler_options;       
+
 
 #ifdef __cplusplus
-#include "NdbInstance.h"
-#include "Configuration.h"
-#include "thread_identifier.h"
 
+/* Forward declarations */
+class Configuration;
+class workitem;
 
 /* Scheduler is an interface */
 
@@ -44,10 +52,9 @@ public:
   /** init() is called from the main thread, 
       after configuration has been read. 
       threadnum: which thread this scheduler will eventually attach to 
-      nthreads: how many total threads will be initialized 
-      config_string: additional configuration string for scheduler   
+      options: struct specifying run-time options   
   */
-  virtual void init(int threadnum, int nthreads, const char *config_string) = 0;
+  virtual void init(int threadnum, const scheduler_options * options) = 0;
                     
   /** attach_thread() is called from each thread 
       at pipeline initialization time. */
@@ -87,9 +94,6 @@ public:
    */
   virtual bool global_reconfigure(Configuration *new_config) = 0;
     
-  /** each scheduler instance serves a single NDB pipeline 
-  */
-  ndb_pipeline *pipeline;
 };
 #endif
 

=== modified file 'storage/ndb/memcache/include/ndb_engine.h'
--- a/storage/ndb/memcache/include/ndb_engine.h	2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/include/ndb_engine.h	2012-04-03 00:34:18 +0000
@@ -62,6 +62,7 @@ struct ndb_engine {
   } startup_options;
   
   struct {
+    size_t maxconns;
     size_t nthreads;
     bool cas_enabled;  
     size_t verbose;

=== modified file 'storage/ndb/memcache/include/ndb_engine_errors.h'
--- a/storage/ndb/memcache/include/ndb_engine_errors.h	2012-03-23 19:01:07 +0000
+++ b/storage/ndb/memcache/include/ndb_engine_errors.h	2012-04-05 21:37:02 +0000
@@ -22,11 +22,22 @@
 
 #include <ndberror.h>
 
-/* Errors 9000 - 9099 are reported as "Scheduler Error" */
-extern ndberror_struct AppError9001_ReconfLock;
-extern ndberror_struct AppError9002_NoNDBs;
-extern ndberror_struct AppError9003_SyncClose;
+/* 
+   The NDB Engine for Memcached uses error codes 29000 - 29999 
+*/
 
-/* Errors 9100 and up are reported as "Memcached Error" */
+
+
+/*** Errors 290xx and 291xx are reported as "Scheduler Error" ***/
+
+/* 2900x: general scheduler error codes */
+extern ndberror_struct AppError29001_ReconfLock;
+extern ndberror_struct AppError29002_NoNDBs;
+
+/* 2902x: blocking NDB operations in worker thread */
+extern ndberror_struct AppError29023_SyncClose;
+extern ndberror_struct AppError29024_autogrow;
+
+/*** Errors 29200 and up are reported as "Memcached Error" ***/
 
 #endif

=== modified file 'storage/ndb/memcache/include/ndb_pipeline.h'
--- a/storage/ndb/memcache/include/ndb_pipeline.h	2012-03-23 23:33:32 +0000
+++ b/storage/ndb/memcache/include/ndb_pipeline.h	2012-04-05 21:00:05 +0000
@@ -26,14 +26,13 @@
 
 #include "workqueue.h"
 #include "ndb_engine.h"
+#include "Scheduler.h"
 
 /* This structure is used in both C and C++ code, requiring a small hack: */
 #ifdef __cplusplus
-/* forward declaration: */
-class Scheduler;
-#define C_OR_CPP_SCHEDULER Scheduler
+#define CPP_SCHEDULER Scheduler
 #else 
-#define C_OR_CPP_SCHEDULER void
+#define CPP_SCHEDULER void
 #endif
 
 /* In each pipeline there lives an allocator, which is used for workitems, 
@@ -83,9 +82,9 @@ typedef struct request_pipeline {
   unsigned int id;              /*! each pipeline has an id */
   unsigned int nworkitems;      /*! counter used to give each workitem an id */
   struct ndb_engine *engine;    
-  pthread_t engine_thread_id;   
+  pthread_t worker_thread_id;   
   allocator_slab_class alligator[ALLIGATOR_ARRAY_SIZE];  /*! an allocator */
-  C_OR_CPP_SCHEDULER *scheduler;
+  CPP_SCHEDULER *scheduler;
   memory_pool *pool;            /*! has the whole lifetime of the pipeline */
 } ndb_pipeline;
 
@@ -99,7 +98,7 @@ DECLARE_FUNCTIONS_WITH_C_LINKAGE
 ndb_pipeline * ndb_pipeline_initialize(struct ndb_engine *);
 
 /** create a generic request pipeline */
-ndb_pipeline * get_request_pipeline(int thd_id);
+ndb_pipeline * get_request_pipeline(int thd_id, struct ndb_engine *);
 
 /** call into a pipeline for its own statistics */
 void pipeline_add_stats(ndb_pipeline *, const char *key, ADD_STAT, const void *);
@@ -111,7 +110,7 @@ ENGINE_ERROR_CODE pipeline_flush_all(ndb
 /***** SCHEDULER APIS *****/
 
 /** Global initialization of scheduler, at startup time */
-void * scheduler_initialize(const char *name, int nthreads, int threadnum);
+void * scheduler_initialize(ndb_pipeline *, scheduler_options *);
 
 /** shutdown a scheduler */
 void scheduler_shutdown(ndb_pipeline *);

=== modified file 'storage/ndb/memcache/include/ndbmemcache_config.in'
--- a/storage/ndb/memcache/include/ndbmemcache_config.in	2011-10-14 08:26:28 +0000
+++ b/storage/ndb/memcache/include/ndbmemcache_config.in	2012-03-31 18:56:49 +0000
@@ -41,5 +41,6 @@
  
 #cmakedefine HAVE_FUNC_IN_CXX
 
+#cmakedefine HAVE_HTONLL
 
 #endif

=== modified file 'storage/ndb/memcache/src/ExternalValue.cc'
--- a/storage/ndb/memcache/src/ExternalValue.cc	2012-03-23 19:01:07 +0000
+++ b/storage/ndb/memcache/src/ExternalValue.cc	2012-04-05 21:00:05 +0000
@@ -22,6 +22,7 @@
 #include <assert.h>
 
 #include "workitem.h"
+#include "NdbInstance.h"
 #include "Operation.h"
 #include "Scheduler.h"
 #include "status_block.h"

=== modified file 'storage/ndb/memcache/src/ndb_engine.c'
--- a/storage/ndb/memcache/src/ndb_engine.c	2012-03-23 23:33:32 +0000
+++ b/storage/ndb/memcache/src/ndb_engine.c	2012-04-05 21:00:05 +0000
@@ -166,6 +166,7 @@ static ENGINE_ERROR_CODE ndb_initialize(
   ENGINE_ERROR_CODE return_status;
   struct ndb_engine *ndb_eng = ndb_handle(handle);
   struct default_engine *def_eng = default_handle(ndb_eng);
+  scheduler_options sched_opts;
   
   /* Process options for both the ndb engine and the default engine:  */
   read_cmdline_options(ndb_eng, def_eng, config_str);
@@ -212,6 +213,10 @@ static ENGINE_ERROR_CODE ndb_initialize(
   /* prefetch data dictionary objects */
   prefetch_dictionary_objects();
 
+  /* Build the scheduler options structure */
+  sched_opts.nthreads = ndb_eng->server_options.nthreads;
+  sched_opts.max_clients = ndb_eng->server_options.maxconns;
+
  /* Allocate and initialize the pipelines, and their schedulers.  
      This will take some time; each pipeline creates slab and pool allocators,
      and each scheduler may preallocate a large number of Ndb objects and 
@@ -224,9 +229,9 @@ static ENGINE_ERROR_CODE ndb_initialize(
   ndb_eng->pipelines  = malloc(nthreads * sizeof(void *));
   ndb_eng->schedulers = malloc(nthreads * sizeof(void *));
   for(i = 0 ; i < nthreads ; i++) {
-    ndb_eng->pipelines[i] = get_request_pipeline(i);
-    ndb_eng->schedulers[i] = 
-      scheduler_initialize(ndb_eng->startup_options.scheduler, nthreads, i);
+    ndb_eng->pipelines[i] = get_request_pipeline(i, ndb_eng);
+    ndb_eng->schedulers[i] = scheduler_initialize(ndb_eng->pipelines[i], 
+                                                  & sched_opts);
     if(ndb_eng->schedulers[i] == 0) {
       logger->log(LOG_WARNING, NULL, "Illegal scheduler: \"%s\"\n", 
                   ndb_eng->startup_options.scheduler); 
@@ -819,6 +824,9 @@ int fetch_core_settings(struct ndb_engin
     { .key = "cas_enabled",
       .datatype = DT_BOOL,
       .value.dt_bool = &engine->server_options.cas_enabled },
+    { .key = "maxconns",
+      .datatype = DT_SIZE,
+      .value.dt_size = &engine->server_options.maxconns },
     { .key = "num_threads",
       .datatype = DT_SIZE,
       .value.dt_size = &engine->server_options.nthreads },

=== modified file 'storage/ndb/memcache/src/ndb_engine_errors.cc'
--- a/storage/ndb/memcache/src/ndb_engine_errors.cc	2012-03-23 19:01:07 +0000
+++ b/storage/ndb/memcache/src/ndb_engine_errors.cc	2012-04-05 21:37:02 +0000
@@ -20,17 +20,22 @@
 
 #include <ndberror.h>
 
-ndberror_struct AppError9001_ReconfLock = 
-  { ndberror_st_temporary , ndberror_cl_application , 9001, -1,
+ndberror_struct AppError29001_ReconfLock = 
+  { ndberror_st_temporary , ndberror_cl_application , 29001, -1,
     "Could not obtain configuration read lock", 0
   };
 
-ndberror_struct AppError9002_NoNDBs =
-  { ndberror_st_temporary , ndberror_cl_application , 9002, -1,
+ndberror_struct AppError29002_NoNDBs =
+  { ndberror_st_temporary , ndberror_cl_application , 29002, -1,
     "No Ndb Instances in freelist", 0
   };
 
-ndberror_struct AppError9003_SyncClose =
-  { ndberror_st_temporary , ndberror_cl_application , 9003, -1,
+ndberror_struct AppError29023_SyncClose =
+  { ndberror_st_temporary , ndberror_cl_application , 29023, -1,
     "Waited for synchronous close of NDB transaction", 0 
   };
+
+ndberror_struct AppError29024_autogrow = 
+  { ndberror_st_temporary , ndberror_cl_application , 29024, -1,
+    "Out of Ndb instances, growing freelist", 0 
+  };

=== modified file 'storage/ndb/memcache/src/ndb_error_logger.cc'
--- a/storage/ndb/memcache/src/ndb_error_logger.cc	2012-03-07 03:57:36 +0000
+++ b/storage/ndb/memcache/src/ndb_error_logger.cc	2012-04-06 01:00:19 +0000
@@ -208,8 +208,10 @@ void ndb_error_logger_stats(ADD_STAT add
 
   for(i = 0 ; i < ERROR_HASH_TABLE_SIZE; i++) {
     for(sym = error_hash_table[i] ; sym != 0 ; sym = sym->next) { 
-      klen = sprintf(key, "NDB_Error_%d", sym->error_code);
-      vlen = sprintf(val, "%du", sym->count);
+      klen = sprintf(key, "%s_Error_%d", 
+                     (sym->error_code < 29000 ? "NDB" : "Engine"),
+                     sym->error_code);
+      vlen = sprintf(val, "%lu", (long unsigned int) sym->count);
       add_stat(key, klen, val, vlen, cookie);
     }
   }

=== modified file 'storage/ndb/memcache/src/ndb_pipeline.cc'
--- a/storage/ndb/memcache/src/ndb_pipeline.cc	2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/ndb_pipeline.cc	2012-04-03 00:34:18 +0000
@@ -71,7 +71,12 @@ void init_pool_header(allocation_referen
 
 /* The public API */
 
-/* initialize a new pipeline for an NDB engine thread */
+/* Attach a new pipeline to an NDB worker thread. 
+   Some initialization has already occurred when the main single-thread startup
+   called get_request_pipeline().  But this is the first call into a pipeline
+   from its worker thread.  It will initialize the thread's identifier, and 
+   attach the pipeline to its scheduler.
+*/
 ndb_pipeline * ndb_pipeline_initialize(struct ndb_engine *engine) {
   bool did_inc;
   unsigned int id;
@@ -85,14 +90,13 @@ ndb_pipeline * ndb_pipeline_initialize(s
   
   /* Fetch the partially initialized pipeline */
   ndb_pipeline * self = (ndb_pipeline *) engine->pipelines[id];
-  
+
+  /* Sanity checks */
   assert(self->id == id);
+  assert(self->engine == engine);
   
-  /* Set the pointer back to the engine */
-  self->engine = engine;
-  
-  /* And the thread id */
-  self->engine_thread_id = pthread_self(); 
+  /* Set the pthread id */
+  self->worker_thread_id = pthread_self(); 
 
   /* Create and set a thread identity */
   tid = (thread_identifier *) memory_pool_alloc(self->pool, sizeof(thread_identifier));
@@ -108,16 +112,20 @@ ndb_pipeline * ndb_pipeline_initialize(s
 }
 
 
-/* Allocate and initialize a generic request pipeline */
-ndb_pipeline * get_request_pipeline(int thd_id) { 
+/* Allocate and initialize a generic request pipeline.
+   In unit test code, this can be called with a NULL engine pointer -- 
+   it will still initialize a usable slab allocator and memory pool 
+   which can be tested.  
+*/
+ndb_pipeline * get_request_pipeline(int thd_id, struct ndb_engine *engine) { 
   /* Allocate the pipeline */
   ndb_pipeline *self = (ndb_pipeline *) malloc(sizeof(ndb_pipeline)); 
   
   /* Initialize */  
-  self->engine = 0;
+  self->engine = engine;
   self->id = thd_id;
   self->nworkitems = 0;
-
+    
   /* Say hi to the alligator */  
   init_allocator(self);
   
@@ -160,27 +168,27 @@ ENGINE_ERROR_CODE pipeline_flush_all(ndb
 
 /* The scheduler API */
 
-void * scheduler_initialize(const char *cf, int nthreads, int athread) {
+void * scheduler_initialize(ndb_pipeline *self, scheduler_options *options) {
   Scheduler *s = 0;
-  const char *sched_options = 0;
+  const char *cf = self->engine->startup_options.scheduler;
+  options->config_string = 0;
   
   if(cf == 0 || *cf == 0) {
     s = new DEFAULT_SCHEDULER;
   }
   else if(!strncasecmp(cf, "stockholm", 9)) {
     s = new Scheduler_stockholm;
-    sched_options = & cf[9];
+    options->config_string = & cf[9];
   }
   else if(!strncasecmp(cf,"S", 1)) {
     s = new S::SchedulerWorker;
-    sched_options = & cf[1];
+    options->config_string = & cf[1];
   }
   else {
     return NULL;
   }
-    
-  s->init(athread, nthreads, sched_options);
-  s->pipeline = 0;
+  
+  s->init(self->id, options);
 
   return (void *) s;
 }

=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc	2012-03-23 19:01:07 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc	2012-04-05 21:37:02 +0000
@@ -880,7 +880,7 @@ void worker_close(NdbTransaction *tx, wo
   nwaits_post = ndb->getClientStat(Ndb::WaitExecCompleteCount);
 
   if(nwaits_post > nwaits_pre) 
-    log_app_error(& AppError9003_SyncClose);
+    log_app_error(& AppError29023_SyncClose);
  
   if(wqitem->ext_val) 
     delete wqitem->ext_val;

=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc'
--- a/storage/ndb/memcache/src/schedulers/S_sched.cc	2012-03-22 19:24:32 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.cc	2012-04-06 01:00:19 +0000
@@ -64,13 +64,14 @@ S::SchedulerGlobal::SchedulerGlobal(Conf
 }
 
 
-void S::SchedulerGlobal::init(int _nthreads, const char *_config_string) {
+void S::SchedulerGlobal::init(const scheduler_options *sched_opts) {
   DEBUG_ENTER_METHOD("S::SchedulerGlobal::init");
 
   /* Set member variables */
-  nthreads = _nthreads;
-  config_string = _config_string;
+  nthreads = sched_opts->nthreads;
+  config_string = sched_opts->config_string;
   parse_config_string(nthreads, config_string);
+  options.max_clients = sched_opts->max_clients;
 
   /* Fetch or initialize clusters */
   nclusters = conf->nclusters;
@@ -104,8 +105,9 @@ void S::SchedulerGlobal::init(int _nthre
   
   /* Log message for startup */
   logger->log(LOG_WARNING, 0, "Scheduler: starting for %d cluster%s; "
-              "c%d,f%d,t%d", nclusters, nclusters == 1 ? "" : "s",
-              options.n_connections, options.force_send, options.send_timer);
+              "c%d,f%d,g%d,t%d", nclusters, nclusters == 1 ? "" : "s",
+              options.n_connections, options.force_send, 
+              options.auto_grow, options.send_timer);
 
   /* Now Running */
   running = true;
@@ -171,6 +173,7 @@ void S::SchedulerGlobal::parse_config_st
   options.n_connections = 0;   // 0 = n_connections based on db-stored config
   options.force_send = 0;      // 0 = force send always off
   options.send_timer = 1;      // 1 = 1 ms. timer in send thread
+  options.auto_grow = 1;       // 1 = allow NDB instance pool to grow on demand
 
   if(str) {
     const char *s = str;
@@ -188,6 +191,9 @@ void S::SchedulerGlobal::parse_config_st
         case 'f':
           options.force_send = value;
           break;
+        case 'g':
+          options.auto_grow = value;
+          break;
         case 't':
           options.send_timer = value;
           break;
@@ -214,6 +220,10 @@ void S::SchedulerGlobal::parse_config_st
     logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
     assert(options.send_timer >= 1 && options.send_timer <= 10);
   }
+  if(options.auto_grow < 0 || options.auto_grow > 1) {
+    logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
+    assert(options.auto_grow == 0 || options.auto_grow == 1);
+  }
 }
 
 
@@ -249,15 +259,14 @@ void S::SchedulerGlobal::add_stats(const
 /* SchedulerWorker methods */
 
 void S::SchedulerWorker::init(int my_thread, 
-                              int nthreads, 
-                              const char * config_string) {
+                              const scheduler_options * options) {
   /* On the first call in, initialize the SchedulerGlobal.
    * This will start the send & poll threads for each connection.
    */
   if(my_thread == 0) {
     sched_generation_number = 1;
     s_global = new SchedulerGlobal(& get_Configuration());
-    s_global->init(nthreads, config_string);
+    s_global->init(options);
   }
   
   /* Initialize member variables */
@@ -286,7 +295,7 @@ void S::SchedulerWorker::attach_thread(t
 ENGINE_ERROR_CODE S::SchedulerWorker::schedule(workitem *item) {
   int c = item->prefix_info.cluster_id;
   ENGINE_ERROR_CODE response_code;
-  NdbInstance *inst;
+  NdbInstance *inst = 0;
   S::WorkerConnection *wc;
   const KeyPrefix *pfx;
   
@@ -299,29 +308,41 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
     pthread_rwlock_unlock(& reconf_lock);
   }
   else {
-    log_app_error(& AppError9001_ReconfLock);
+    log_app_error(& AppError29001_ReconfLock);
     return ENGINE_TMPFAIL;
   }
   /* READ LOCK RELEASED */
   
   item->base.nsuffix = item->base.nkey - pfx->prefix_len;
   if(item->base.nsuffix == 0) return ENGINE_EINVAL; // key too short
-  
-  if(wc && wc->freelist) {
+ 
+  if(wc == 0) return ENGINE_FAILED;
+    
+  if(wc->freelist) {                 /* Get the next NDB from the freelist. */
     inst = wc->freelist;
     wc->freelist = inst->next;
   }
-  else {
-    /* No more free NDBs. 
-     Eventually Scheduler::io_completed() will run _in this thread_ and return 
-     an NDB to the freelist.  But no other thread can free one, so 
-     all we can do is return an error. 
-     (Or, alternately, the scheduler may be shutting down.)
-     */
-    log_app_error(& AppError9002_NoNDBs);
-    return ENGINE_TMPFAIL;
+  else {                             /* No free NDBs. */
+    if(wc->sendqueue->is_aborted()) {
+      return ENGINE_TMPFAIL;
+    }
+    else {                           /* Try to make an NdbInstance on the fly */
+      inst = wc->newNdbInstance();
+      if(inst) {
+        log_app_error(& AppError29024_autogrow);
+      }
+      else {
+        /* We have hit a hard maximum.  Eventually Scheduler::io_completed() 
+           will run _in this thread_ and return an NDB to the freelist.  
+           But no other thread can free one, so here we return an error. 
+         */
+        log_app_error(& AppError29002_NoNDBs);
+        return ENGINE_TMPFAIL;
+      }
+    }
   }
   
+  assert(inst);
   inst->link_workitem(item);
   
   // Fetch the query plan for this prefix.
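
The NDB acquisition path in schedule() after this change, condensed as pseudocode comments that mirror the patched logic above:

   // 1. Pop an instance from wc->freelist if one is available.
   // 2. Otherwise, if wc->sendqueue->is_aborted(), the scheduler is
   //    shutting down: return ENGINE_TMPFAIL.
   // 3. Otherwise try wc->newNdbInstance(); on success the growth is
   //    logged via AppError29024_autogrow and the new instance is used.
   // 4. If newNdbInstance() returns 0 the pool is at instances.max;
   //    log AppError29002_NoNDBs and return ENGINE_TMPFAIL until
   //    io_completed() returns an instance to the freelist.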
@@ -520,6 +541,18 @@ void S::Cluster::add_stats(const char *s
 
 /* WorkerConnection methods */
 
+
+NdbInstance * S::WorkerConnection::newNdbInstance() {
+  NdbInstance *inst = 0;
+  if(instances.current < instances.max) {
+    inst = new NdbInstance(conn->conn, 2);
+    instances.current++;
+    inst->id = ((id.thd + 1) * 10000) + instances.current;
+  }
+  return inst;
+}
+
+
 S::WorkerConnection::WorkerConnection(SchedulerGlobal *global,
                                       int thd_id, int cluster_id) {
   S::Cluster *cl = global->clusters[cluster_id];  
@@ -536,21 +569,25 @@ S::WorkerConnection::WorkerConnection(Sc
   plan_set = new ConnQueryPlanSet(conn->conn, conf->nprefixes);
   plan_set->buildSetForConfiguration(conf, cluster_id);
 
+  /* How many NDB instances to start initially */
+  instances.initial = conn->instances.initial / conn->n_workers;
+
+  /* Maximum size of send queue, and upper bound on NDB instances */
+  instances.max = conn->instances.max / conn->n_workers;
+
   /* Build the freelist */
   freelist = 0;
-  int my_ndb_inst = conn->nInst / conn->n_workers;
-  for(int j = 0 ; j < my_ndb_inst ; j++ ) {
-    NdbInstance *inst = new NdbInstance(conn->conn, 2);
-    inst->id = ((id.thd + 1) * 10000) + j + 1; 
+  for(instances.current = 0; instances.current < instances.initial; ) {
+    NdbInstance *inst = newNdbInstance();
     inst->next = freelist;
     freelist = inst;
   }
 
   DEBUG_PRINT("Cluster %d, connection %d (node %d), worker %d: %d NDBs.", 
-              id.cluster, id.conn, id.node, id.thd, my_ndb_inst);
+              id.cluster, id.conn, id.node, id.thd, instances.current);
   
   /* Initialize the sendqueue */
-  sendqueue = new Queue<NdbInstance>(my_ndb_inst);
+  sendqueue = new Queue<NdbInstance>(instances.max);
   
   /* Hoard a transaction (an API connect record) for each Ndb object.  This
    * first call to startTransaction() will send TC_SEIZEREQ and wait for a 
@@ -560,7 +597,7 @@ S::WorkerConnection::WorkerConnection(Sc
   QueryPlan *plan;
   const KeyPrefix *prefix = conf->getNextPrefixForCluster(id.cluster, NULL);
   if(prefix) {
-    NdbTransaction ** txlist = new NdbTransaction * [my_ndb_inst];
+    NdbTransaction ** txlist = new NdbTransaction * [instances.current];
     int i = 0;
 
     // Open them all.
@@ -573,7 +610,7 @@ S::WorkerConnection::WorkerConnection(Sc
     }
     
     // Close them all.
-    for(i = 0 ; i < my_ndb_inst ; i++) {
+    for(i = 0 ; i < instances.current ; i++) {
       txlist[i]->close();
     }    
     
@@ -642,17 +679,29 @@ S::Connection::Connection(S::Cluster & _
   n_workers = global->options.n_worker_threads / cluster.nconnections;
   if(n_total_workers % cluster.nconnections > id) n_workers += 1;  
 
-  /* How many NDB objects are needed? */
-  /* Note that this is used to configure hard limits on the size of the 
-   * waitgroup, the sentqueue, and the reschedulequeue -- and it will not be 
+  /* How many NDB objects are needed for the desired performance? */
+  double total_ndb_objects = conf->figureInFlightTransactions(cluster.cluster_id);
+  instances.initial = (int) (total_ndb_objects / cluster.nconnections);
+  while(instances.initial % n_workers) instances.initial++; // round up
+
+  /* The maximum number of NDB objects.
+   * This is used to configure hard limits on the size of the waitgroup, 
+   * the sentqueue, and the reschedulequeue -- and it will not be 
    * possible to increase those limits during online reconfig. 
    */
-  double total_ndb_objects = conf->figureInFlightTransactions(cluster.cluster_id);
-  nInst = (int) (total_ndb_objects / cluster.nconnections);
-  while(nInst % n_workers) nInst++; // round up
-    
+  instances.max = instances.initial;
+  // allow the pool to grow on demand? 
+  if(global->options.auto_grow)
+    instances.max = (int) (instances.max * 1.6);
+  // max_clients imposes a hard upper limit
+  if(instances.max > (global->options.max_clients / cluster.nconnections))
+    instances.max = global->options.max_clients / cluster.nconnections;
+  // instances.initial might also be subject to the max_clients limit
+  if(instances.initial > instances.max) 
+    instances.initial = instances.max;
+  
   /* Get a multi-wait Poll Group */
-  pollgroup = conn->create_ndb_wait_group(nInst);
+  pollgroup = conn->create_ndb_wait_group(instances.max);
       
   /* Initialize the statistics */
   stats.sent_operations = 0;
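
A worked example of the sizing arithmetic above; the input figures are illustrative, not taken from any configuration:

   // Suppose figureInFlightTransactions() yields 96 for this cluster,
   // with cluster.nconnections = 2 and n_workers = 5:
   //   instances.initial = (int)(96 / 2) = 48 -> rounded up to 50
   //                       (next multiple of n_workers)
   //   auto_grow on:  instances.max = (int)(50 * 1.6) = 80
   //   max_clients = 100: hard cap = 100 / 2 = 50 -> instances.max = 50
   //   instances.initial (50) is not above instances.max, so it stands.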
@@ -665,8 +714,8 @@ S::Connection::Connection(S::Cluster & _
   sem.counter = 0;
     
  /* Initialize the queues for sent and rescheduled items */
-  sentqueue = new Queue<NdbInstance>(nInst);
-  reschedulequeue = new Queue<NdbInstance>(nInst);
+  sentqueue = new Queue<NdbInstance>(instances.max);
+  reschedulequeue = new Queue<NdbInstance>(instances.max);
 }
 
 
@@ -726,6 +775,14 @@ void S::Connection::add_stats(const char
   klen = sprintf(key, "cl%d.conn%d.timeout_races", cluster.cluster_id, id);
   vlen = sprintf(val, "%llu", stats.timeout_races);
   add_stat(key, klen, val, vlen, cookie);
+  
+  klen = sprintf(key, "cl%d.conn%d.instances.initial", cluster.cluster_id, id);
+  vlen = sprintf(val, "%d", instances.initial);
+  add_stat(key, klen, val, vlen, cookie);
+
+  klen = sprintf(key, "cl%d.conn%d.instances.max", cluster.cluster_id, id);
+  vlen = sprintf(val, "%d", instances.max);
+  add_stat(key, klen, val, vlen, cookie);
 }                              
   
 

=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.h'
--- a/storage/ndb/memcache/src/schedulers/S_sched.h	2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.h	2012-04-06 00:12:54 +0000
@@ -61,7 +61,7 @@ class S::SchedulerGlobal {
 public:
   SchedulerGlobal(Configuration *);
   ~SchedulerGlobal() {};
-  void init(int threads, const char *config_string);
+  void init(const scheduler_options *options);
   void add_stats(const char *, ADD_STAT, const void *);
   void reconfigure(Configuration *);
   void shutdown();
@@ -82,6 +82,8 @@ public:
     int n_connections;     /** preferred number of NDB cluster connections */
     int force_send;        /** how to use NDB force-send */
     int send_timer;        /** milliseconds to set for adaptive send timer */
+    int auto_grow;         /** whether to allow NDB instance pool to grow */
+    int max_clients;       /** memcached max allowed connections */
   } options;
 
 private:
@@ -99,7 +101,7 @@ class S::SchedulerWorker : public Schedu
 public:  
   SchedulerWorker() {};
   ~SchedulerWorker() {};
-  void init(int threadnum, int nthreads, const char *config_string);
+  void init(int threadnum, const scheduler_options * sched_opts);
   void attach_thread(thread_identifier *);
   ENGINE_ERROR_CODE schedule(workitem *);
   void yield(workitem *) const {};
@@ -111,6 +113,7 @@ public:  
   
 private:
   int id;
+  ndb_pipeline *pipeline;
   SchedulerGlobal * m_global;
 };
 
@@ -159,9 +162,12 @@ private:
   Queue<NdbInstance> * reschedulequeue;
   int id;
   int node_id;
-  int nInst;
-  int n_total_workers;
-  int n_workers;
+  int n_total_workers;   /* same as SchedulerGlobal::options.n_worker_threads */
+  int n_workers;         /* number of workers for this connection */
+  struct {
+    int initial;         /* start with this many NDB instances */
+    int max;             /* scale up to this many */
+  } instances; 
   pthread_t send_thread_id;
   pthread_t poll_thread_id;  
   struct {
@@ -187,6 +193,7 @@ public:
   ~WorkerConnection();
   void shutdown();
   void reconfigure(Configuration *);
+  NdbInstance * newNdbInstance();
 
   struct { 
     int thd           : 8;
@@ -194,6 +201,11 @@ public:
     int conn          : 8;
     unsigned int node : 8;
   } id;
+  struct {
+    int initial;
+    int current;
+    int max;
+  } instances;
   S::Connection *conn;
   ConnQueryPlanSet *plan_set, *old_plan_set;
   NdbInstance *freelist;

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc	2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc	2012-04-03 00:34:18 +0000
@@ -46,14 +46,15 @@ extern "C" {
 }
 
 
-void Scheduler_stockholm::init(int my_thread, int nthreads, const char *config_string) {
+void Scheduler_stockholm::init(int my_thread, 
+                               const scheduler_options *options) {
   const Configuration & conf = get_Configuration();
 
   /* How many NDB instances are needed per cluster? */
   for(unsigned int c = 0 ; c < conf.nclusters ; c++) {
     ClusterConnectionPool *pool = conf.getConnectionPoolById(c);
     double total_ndb_objects = conf.figureInFlightTransactions(c);
-    cluster[c].nInst = (int) total_ndb_objects / nthreads;
+    cluster[c].nInst = (int) total_ndb_objects / options->nthreads;
     DEBUG_PRINT("cluster %d: %d TPS @ %d usec RTT ==> %d NDB instances.",
                 c, conf.max_tps, pool->usec_rtt, cluster[c].nInst);
   }

=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.h	2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.h	2012-04-03 00:34:18 +0000
@@ -44,7 +44,7 @@ class Scheduler_stockholm : public Sched
 public:
   Scheduler_stockholm() {};
   ~Scheduler_stockholm() {};
-  void init(int threadnum, int nthreads, const char *config_string);
+  void init(int threadnum, const scheduler_options *options);
   void attach_thread(thread_identifier *);
   ENGINE_ERROR_CODE schedule(workitem *);
   void yield(workitem *) const;                                       // inlined
@@ -56,6 +56,7 @@ public:
   bool global_reconfigure(Configuration *) { return false; } ;
 
 private:  
+  ndb_pipeline *pipeline;
   struct {
     struct workqueue *queue; 
     struct sched_stats_stockholm { 

=== modified file 'storage/ndb/memcache/unit/alloc.cc'
--- a/storage/ndb/memcache/unit/alloc.cc	2011-12-18 23:26:44 +0000
+++ b/storage/ndb/memcache/unit/alloc.cc	2012-04-03 00:34:18 +0000
@@ -27,14 +27,16 @@
 
 #include "all_tests.h"
 
+#define TEST_ALLOC_BLOCKS 34
+
 int run_allocator_test(QueryPlan *, Ndb *, int v) {
-  struct request_pipeline *p = get_request_pipeline(0);
+  struct request_pipeline *p = get_request_pipeline(0, NULL);
   
   memory_pool *p1 = pipeline_create_memory_pool(p);
   int sz = 13;
   uint tot = 0;
   void *v1, *v2;
-  for(int i = 0 ; i < 25 ; i++) {
+  for(int i = 0 ; i < TEST_ALLOC_BLOCKS ; i++) {
     v1 = memory_pool_alloc(p1, sz);     tot += sz;
     v2 = memory_pool_alloc(p1, sz + 1); tot += sz + 1;
     sz = (int) (sz * 1.25);
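
A rough estimate of what the extended loop now covers, derived only from the constants above:

   // sz starts at 13 and grows by 25% (with integer truncation) each
   // pass: 13, 16, 20, 25, ... reaching about 18,800 bytes on pass 34.
   // Each pass allocates sz and sz+1 bytes, so the pool absorbs on the
   // order of 190 KB in total; the old 25-pass loop topped out near
   // 2.5 KB per block, leaving the larger block classes untested.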

=== modified file 'storage/ndb/src/ndbapi/NdbQueryBuilder.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp	2012-03-30 13:18:59 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp	2012-04-11 10:13:24 +0000
@@ -702,24 +702,7 @@ NdbQueryOperationDef::getIndex() const
 NdbQueryBuilder* NdbQueryBuilder::create()
 {
   NdbQueryBuilderImpl* const impl = new NdbQueryBuilderImpl();
-  if (likely (impl != NULL))
-  {
-    if (likely(impl->getNdbError().code == 0))
-    {
-      return &impl->m_interface;
-    }
-    else
-    {
-      // Probably failed to create Vector instances.
-      assert(impl->getNdbError().code == Err_MemoryAlloc);
-      delete impl;
-      return NULL;
-    }
-  }
-  else
-  {
-    return NULL;
-  }
+  return (likely(impl!=NULL)) ? &impl->m_interface : NULL;
 }
 
 void NdbQueryBuilder::destroy()
@@ -1116,17 +1099,11 @@ NdbQueryBuilder::prepare()
 NdbQueryBuilderImpl::NdbQueryBuilderImpl()
 : m_interface(*this),
   m_error(),
-  m_operations(),
-  m_operands(),
+  m_operations(0),
+  m_operands(0),
   m_paramCnt(0),
   m_hasError(false)
-{
-  if (errno == ENOMEM)
-  {
-    // ENOMEM probably comes from Vector().
-    setErrorCode(Err_MemoryAlloc);
-  }
-}
+{}
 
 NdbQueryBuilderImpl::~NdbQueryBuilderImpl()
 {
@@ -1242,12 +1219,12 @@ NdbQueryDefImpl(const Vector<NdbQueryOpe
                 const Vector<NdbQueryOperandImpl*>& operands,
                 int& error)
  : m_interface(*this), 
-   m_operations(operations),
-   m_operands(operands)
+   m_operations(0),
+   m_operands(0)
 {
-  if (errno == ENOMEM)
+  if (m_operations.assign(operations) || m_operands.assign(operands))
   {
-    // Failed to allocate memory for m_operations or m_operands.
+    // Failed to allocate memory in Vector::assign().
     error = Err_MemoryAlloc;
     return;
   }
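
The error-handling pattern introduced here and in the constructors above, as a standalone sketch; Vector is the internal NDB API container, and the surrounding names are illustrative:

   // Construct containers with zero capacity so the constructor itself
   // performs no allocation, then do the fallible allocation explicitly:
   //
   //   Vector<Item*> m_items(0);        // no heap allocation here
   //   if (m_items.assign(src))         // non-zero return means OOM
   //   {
   //     error = Err_MemoryAlloc;
   //     return;
   //   }
   //
   // This replaces testing errno == ENOMEM after a default-constructed
   // Vector, which could misfire if errno happened to be set before the
   // constructor ran.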
@@ -1909,16 +1886,10 @@ NdbQueryOperationDefImpl::NdbQueryOperat
    m_opNo(opNo), m_internalOpNo(internalOpNo),
    m_options(options),
    m_parent(NULL), 
-   m_children(), 
-   m_params(),
-   m_spjProjection() 
+   m_children(0), 
+   m_params(0),
+   m_spjProjection(0) 
 {
-  if (unlikely(errno == ENOMEM))
-  {
-    // Heap allocation in Vector() must have failed.
-    error = Err_MemoryAlloc;
-    return;
-  }
   if (unlikely(m_internalOpNo >= NDB_SPJ_MAX_TREE_NODES))
   {
     error = QRY_DEFINITION_TOO_LARGE;

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2012-03-30 13:18:59 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2012-04-11 10:13:24 +0000
@@ -3788,7 +3788,7 @@ NdbQueryOperationImpl::NdbQueryOperation
   m_queryImpl(queryImpl),
   m_operationDef(def),
   m_parent(NULL),
-  m_children(def.getNoOfChildOperations()),
+  m_children(0),
  m_maxBatchRows(0),   // >0: User specified preferred value, ==0: Use default CFG values
   m_params(),
   m_resultBuffer(NULL),
@@ -3805,9 +3805,9 @@ NdbQueryOperationImpl::NdbQueryOperation
                 ? Parallelism_max : Parallelism_adaptive),
   m_rowSize(0xffffffff)
 { 
-  if (errno == ENOMEM)
+  if (m_children.expand(def.getNoOfChildOperations()))
   {
-    // Memory allocation in Vector() (for m_children) assumed to have failed.
+    // Memory allocation during Vector::expand() failed.
     queryImpl.setErrorCode(Err_MemoryAlloc);
     return;
   }

No bundle (reason: useless for push emails).