List: Commits « Previous Message | Next Message »
From: magnus.blaudd  Date: April 11 2012 10:03am
Subject:bzr push into mysql-5.5-cluster-7.2 branch (magnus.blaudd:3880 to 3882)
View as plain text  
 3882 magnus.blaudd@stripped	2012-04-11 [merge]
      Merge 7.0->7.2

    removed:
      mysql-test/suite/ndb_rpl/t/ndb_rpl_relayrotate-slave.opt
    added:
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanResultDataImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordTableScanOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordUniqueKeyOperationImpl.java
    modified:
      mysql-test/suite/ndb/r/ndb_restore_misc.result
      mysql-test/suite/ndb/t/ndb_restore_misc.test
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainTypeHandlerImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/QueryDomainTypeImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandler.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SessionSPI.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SmartValueHandler.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandlerFactory.java
      storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/BitTypesTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryMultiColumnIndexInTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/SchemaChangeTest.java
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/domaintypehandler/CrazyDomainTypeHandlerFactoryImpl.java
      storage/ndb/clusterj/clusterj-tie/logging.properties
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordInsertOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordKeyOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordResultDataImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerFactoryImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ResultDataImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ScanResultDataImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java
      storage/ndb/src/common/util/version.cpp
 3881 magnus.blaudd@stripped	2012-04-11 [merge]
      Merge

    modified:
      mysql-test/suite/ndb_rpl/my.cnf
      mysql-test/suite/ndb_rpl/t/ndb_rpl_break_3_chain.cnf
      mysql-test/suite/rpl_ndb/my.cnf
      storage/ndb/compile-cluster
 3880 Ole John Aske	2012-04-11
      Part 2 fix for Bug#13945264: VECTOR TEMPLATE HAS NO SAFE WAY TO SIGNAL 'OUT OF MEMORY' ERRORS
            
      Changes usage of Vector in the SPJ API such that memory alloc failures
      from Vector are now detected by using Vector methods returning an 'int' != 0
      instead of checking 'errno == ENOMEM' 

    modified:
      storage/ndb/src/ndbapi/NdbQueryBuilder.cpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
=== modified file 'mysql-test/suite/ndb/r/ndb_restore_misc.result'
--- a/mysql-test/suite/ndb/r/ndb_restore_misc.result	2011-08-17 10:36:01 +0000
+++ b/mysql-test/suite/ndb/r/ndb_restore_misc.result	2012-04-11 09:56:27 +0000
@@ -134,11 +134,12 @@ ForceVarPart: 1
 drop table t1_c,t2_c,t3_c,t4_c,t5_c,t6_c,t7_c,t8_c,t9_c,t10_c,t11_c;
 ForceVarPart: 0
 ForceVarPart: 1
-select * from information_schema.columns where table_name = "t1_c";
-TABLE_CATALOG	TABLE_SCHEMA	TABLE_NAME	COLUMN_NAME	ORDINAL_POSITION	COLUMN_DEFAULT	IS_NULLABLE	DATA_TYPE	CHARACTER_MAXIMUM_LENGTH	CHARACTER_OCTET_LENGTH	NUMERIC_PRECISION	NUMERIC_SCALE	CHARACTER_SET_NAME	COLLATION_NAME	COLUMN_TYPE	COLUMN_KEY	EXTRA	PRIVILEGES	COLUMN_COMMENT
-def	test	t1_c	capgoaledatta	1	NULL	NO	mediumint	NULL	NULL	7	0	NULL	NULL	mediumint(5) unsigned	PRI	auto_increment	#	
-def	test	t1_c	goaledatta	2		NO	char	2	2	NULL	NULL	latin1	latin1_swedish_ci	char(2)	PRI		#	
-def	test	t1_c	maturegarbagefa	3		NO	varchar	32	32	NULL	NULL	latin1	latin1_swedish_ci	varchar(32)	PRI		#	
+select TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, ORDINAL_POSITION, DATA_TYPE
+from information_schema.columns where table_name = "t1_c";
+TABLE_SCHEMA	TABLE_NAME	COLUMN_NAME	ORDINAL_POSITION	DATA_TYPE
+test	t1_c	capgoaledatta	1	mediumint
+test	t1_c	goaledatta	2	char
+test	t1_c	maturegarbagefa	3	varchar
 select count(*) from t1;
 count(*)
 5

=== modified file 'mysql-test/suite/ndb/t/ndb_restore_misc.test'
--- a/mysql-test/suite/ndb/t/ndb_restore_misc.test	2011-07-19 10:54:29 +0000
+++ b/mysql-test/suite/ndb/t/ndb_restore_misc.test	2012-04-11 07:17:49 +0000
@@ -192,9 +192,8 @@ source show_varpart.inc;
 # Bug #30667
 # ndb table discovery does not work correcly with information schema
 # - prior to bug fix this would yeild no output and a warning
-# (priviliges differ on embedded and server so replace)
---replace_column 18 #
-select * from information_schema.columns where table_name = "t1_c";
+select TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, ORDINAL_POSITION, DATA_TYPE
+  from information_schema.columns where table_name = "t1_c";
 
 # random output order??
 #show tables;

=== modified file 'mysql-test/suite/ndb_rpl/my.cnf'
--- a/mysql-test/suite/ndb_rpl/my.cnf	2011-12-09 11:57:48 +0000
+++ b/mysql-test/suite/ndb_rpl/my.cnf	2012-03-29 10:13:47 +0000
@@ -59,12 +59,6 @@ report-host=                  127.0.0.1
 report-port=                  @mysqld.1.slave.port
 report-user=                  root
 
-# Configure slave mysqld without innodb, and set myisam
-# as default storage engine(since innodb will be default
-# otherwise starting  from 5.5)
-loose-skip-innodb
-default-storage-engine=myisam
-
 skip-slave-start
 
 # Directory where slaves find the dumps generated by "load data"

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_break_3_chain.cnf'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_break_3_chain.cnf	2011-12-09 11:57:48 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_break_3_chain.cnf	2012-03-29 10:13:47 +0000
@@ -47,9 +47,6 @@ relay-log=                    cluster2-r
 log-slave-updates
 ndb-log-apply-status
 
-loose-skip-innodb
-default-storage-engine=myisam
-
 # Directory where slaves find the dumps generated by "load data"
 # on the server. The path need to have constant length otherwise
 # test results will vary, thus a relative path is used.
@@ -62,9 +59,6 @@ relay-log=                    cluster3-r
 log-slave-updates
 ndb-log-apply-status
 
-loose-skip-innodb
-default-storage-engine=myisam
-
 # Directory where slaves find the dumps generated by "load data"
 # on the server. The path need to have constant length otherwise
 # test results will vary, thus a relative path is used.

=== removed file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_relayrotate-slave.opt'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_relayrotate-slave.opt	2011-05-18 12:56:24 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_relayrotate-slave.opt	1970-01-01 00:00:00 +0000
@@ -1,3 +0,0 @@
---max_relay_log_size=16384
---loose-innodb
---log-warnings

=== modified file 'mysql-test/suite/rpl_ndb/my.cnf'
--- a/mysql-test/suite/rpl_ndb/my.cnf	2011-12-09 11:57:48 +0000
+++ b/mysql-test/suite/rpl_ndb/my.cnf	2012-03-29 10:13:47 +0000
@@ -59,12 +59,6 @@ report-host=                  127.0.0.1
 report-port=                  @mysqld.1.slave.port
 report-user=                  root
 
-# Configure slave mysqld without innodb, and set myisam
-# as default storage engine(since innodb will be default
-# otherwise starting  from 5.5)
-loose-skip-innodb
-default-storage-engine=myisam
-
 skip-slave-start
 
 # Directory where slaves find the dumps generated by "load data"

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java	2012-04-02 20:43:14 +0000
@@ -202,7 +202,7 @@ public class SessionImpl implements Sess
                 }
                 if (smartValueHandler.found()) {
                     // create a new proxy (or dynamic instance) with the smart value handler
-                    return domainTypeHandler.newInstance(smartValueHandler, db);
+                    return domainTypeHandler.newInstance(smartValueHandler);
                 } else {
                     // not found
                     return null;
@@ -289,6 +289,16 @@ public class SessionImpl implements Sess
         return instance;
     }
 
+    /** Create an instance from a result data row.
+     * @param resultData the result of a query
+     * @param domainTypeHandler the domain type handler
+     * @return the instance
+     */
+    public <T> T newInstance(ResultData resultData, DomainTypeHandler<T> domainTypeHandler) {
+        T result = domainTypeHandler.newInstance(resultData, db);
+        return result;
+    }
+
     /** Load the instance from the database into memory. Loading
      * is asynchronous and will be executed when an operation requiring
      * database access is executed: find, flush, or query. The instance must

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainTypeHandlerImpl.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainTypeHandlerImpl.java	2012-03-12 09:22:04 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainTypeHandlerImpl.java	2012-04-11 09:56:27 +0000
@@ -413,6 +413,10 @@ public abstract class AbstractDomainType
         throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
     }
 
+    public T newInstance(ResultData resultData, Db db) {
+        throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+    }
+
     public void objectMarkModified(ValueHandler handler, String fieldName) {
         throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
     }
@@ -450,7 +454,7 @@ public abstract class AbstractDomainType
         return reasons == null?null:reasons.toString();
     }
 
-    public T newInstance(ValueHandler valueHandler, Db db) {
+    public T newInstance(ValueHandler valueHandler) {
         throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
     }
 

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerImpl.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerImpl.java	2012-04-02 20:43:14 +0000
@@ -18,6 +18,7 @@
 package com.mysql.clusterj.core.metadata;
 
 import com.mysql.clusterj.core.spi.DomainFieldHandler;
+import com.mysql.clusterj.core.spi.SmartValueHandler;
 import com.mysql.clusterj.core.spi.ValueHandlerFactory;
 import com.mysql.clusterj.core.spi.ValueHandler;
 import com.mysql.clusterj.ClusterJException;
@@ -36,6 +37,7 @@ import com.mysql.clusterj.core.store.Db;
 import com.mysql.clusterj.core.store.Index;
 import com.mysql.clusterj.core.store.Dictionary;
 import com.mysql.clusterj.core.store.Operation;
+import com.mysql.clusterj.core.store.ResultData;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationHandler;
@@ -422,11 +424,22 @@ public class DomainTypeHandlerImpl<T> ex
     @Override
     public T newInstance(Db db) {
         ValueHandler valueHandler = valueHandlerFactory.getValueHandler(this, db);
-        return newInstance(valueHandler, db);
+        return newInstance(valueHandler);
+    }
+
+    /** Create a new domain type instance from the result.
+     * @param resultData the results from a database query
+     * @param db the Db
+     * @return the domain type instance
+     */
+    public T newInstance(ResultData resultData, Db db) {
+        ValueHandler valueHandler = valueHandlerFactory.getValueHandler(this, db, resultData);
+        T result = newInstance(valueHandler);
+        return result;
     }
 
     @Override
-    public T newInstance(ValueHandler valueHandler, Db db) {
+    public T newInstance(ValueHandler valueHandler) {
         T instance;
         try {
             if (dynamic) {
@@ -608,7 +621,7 @@ public class DomainTypeHandlerImpl<T> ex
         Object[] result = new Object[numberOfFields];
         int i = 0;
         for (Integer idFieldNumber: idFieldNumbers) {
-            result[idFieldNumber] = keyValues[i];
+            result[idFieldNumber] = keyValues[i++];
         }
         return result;
     }
@@ -624,6 +637,7 @@ public class DomainTypeHandlerImpl<T> ex
 
     /** Factory for default InvocationHandlerImpl */
     protected ValueHandlerFactory defaultInvocationHandlerFactory = new ValueHandlerFactory()  {
+
         public <V> ValueHandler getValueHandler(DomainTypeHandlerImpl<V> domainTypeHandler, Db db) {
             return new InvocationHandlerImpl<V>(domainTypeHandler);
         }
@@ -633,6 +647,13 @@ public class DomainTypeHandlerImpl<T> ex
             Object[] expandedKeyValues = expandKeyValues(keyValues);
             return new KeyValueHandlerImpl(expandedKeyValues);
         }
+
+        public <V> ValueHandler getValueHandler(
+                DomainTypeHandlerImpl<V> domainTypeHandler, Db db, ResultData resultData) {
+            ValueHandler result = new InvocationHandlerImpl<V>(domainTypeHandler);
+            objectSetValues(resultData, result);
+            return result;
+        }
     };
 
     public int getNumberOfTransientFields() {

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/QueryDomainTypeImpl.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/QueryDomainTypeImpl.java	2012-02-09 10:22:48 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/QueryDomainTypeImpl.java	2012-04-11 09:56:27 +0000
@@ -27,7 +27,6 @@ import com.mysql.clusterj.core.spi.Domai
 import com.mysql.clusterj.core.spi.DomainTypeHandler;
 import com.mysql.clusterj.core.spi.QueryExecutionContext;
 import com.mysql.clusterj.core.spi.SessionSPI;
-import com.mysql.clusterj.core.spi.ValueHandler;
 import com.mysql.clusterj.core.spi.ValueHandlerBatching;
 
 import com.mysql.clusterj.core.store.Index;
@@ -163,10 +162,7 @@ public class QueryDomainTypeImpl<T> impl
             ResultData resultData = getResultData(context);
             // put the result data into the result list
             while (resultData.next()) {
-                T row = (T) session.newInstance(cls);
-                ValueHandler handler =domainTypeHandler.getValueHandler(row);
-                // set values from result set into object
-                domainTypeHandler.objectSetValues(resultData, handler);
+                T row = session.newInstance(resultData, domainTypeHandler);
                 resultList.add(row);
             }
             session.endAutoTransaction();
@@ -206,6 +202,7 @@ public class QueryDomainTypeImpl<T> impl
         switch (scanType) {
 
             case PRIMARY_KEY: {
+                if (logger.isDetailEnabled()) logger.detail("Using primary key find for query.");
                 // perform a select operation
                 Operation op = session.getSelectOperation(domainTypeHandler.getStoreTable());
                 op.beginDefinition();
@@ -221,7 +218,7 @@ public class QueryDomainTypeImpl<T> impl
 
             case INDEX_SCAN: {
                 storeIndex = index.getStoreIndex();
-                if (logger.isDetailEnabled()) logger.detail("Using index scan with index " + index.getIndexName());
+                if (logger.isDetailEnabled()) logger.detail("Using index scan with ordered index " + index.getIndexName() + " for query.");
                 IndexScanOperation op;
                 // perform an index scan operation
                 if (index.isMultiRange()) {
@@ -231,27 +228,31 @@ public class QueryDomainTypeImpl<T> impl
                     op = session.getIndexScanOperation(storeIndex, domainTypeHandler.getStoreTable());
                     
                 }
+                op.beginDefinition();
                 // set the expected columns into the operation
                 domainTypeHandler.operationGetValues(op);
                 // set the bounds into the operation
                 index.operationSetBounds(context, op);
                 // set additional filter conditions
                 where.filterCmpValue(context, op);
+                op.endDefinition();
                 // execute the scan and get results
                 result = op.resultData();
                 break;
             }
 
             case TABLE_SCAN: {
-                if (logger.isDetailEnabled()) logger.detail("Using table scan");
+                if (logger.isDetailEnabled()) logger.detail("Using table scan for query.");
                 // perform a table scan operation
                 ScanOperation op = session.getTableScanOperation(domainTypeHandler.getStoreTable());
+                op.beginDefinition();
                 // set the expected columns into the operation
                 domainTypeHandler.operationGetValues(op);
-                // set the bounds into the operation
+                // set filter conditions into the operation
                 if (where != null) {
                     where.filterCmpValue(context, op);
                 }
+                op.endDefinition();
                 // execute the scan and get results
                 result = op.resultData();
                 break;
@@ -259,14 +260,16 @@ public class QueryDomainTypeImpl<T> impl
 
             case UNIQUE_KEY: {
                 storeIndex = index.getStoreIndex();
-                if (logger.isDetailEnabled()) logger.detail("Using unique lookup with index " + index.getIndexName());
+                if (logger.isDetailEnabled()) logger.detail("Using lookup with unique index " + index.getIndexName() + " for query.");
                 // perform a unique lookup operation
                 IndexOperation op = session.getUniqueIndexOperation(storeIndex, domainTypeHandler.getStoreTable());
+                op.beginDefinition();
                 // set the keys of the indexName into the operation
                 where.operationEqual(context, op);
                 // set the expected columns into the operation
                 //domainTypeHandler.operationGetValuesExcept(op, indexName);
                 domainTypeHandler.operationGetValues(op);
+                op.endDefinition();
                 // execute the select and get results
                 result = op.resultData();
                 break;

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandler.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandler.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandler.java	2012-04-02 20:43:14 +0000
@@ -93,6 +93,8 @@ public interface DomainTypeHandler<T> {
 
     public void setUnsupported(String reason);
 
-    public T newInstance(ValueHandler valueHandler, Db db);
+    public T newInstance(ValueHandler valueHandler);
+
+    public T newInstance(ResultData resultData, Db db);
 
 }

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SessionSPI.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SessionSPI.java	2011-11-22 22:01:23 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SessionSPI.java	2012-04-02 20:43:14 +0000
@@ -98,6 +98,8 @@ public interface SessionSPI extends Sess
 
     <T> QueryDomainType<T> createQueryDomainType(DomainTypeHandler<T> handler);
 
+    <T> T newInstance(ResultData resultData, DomainTypeHandler<T> domainTypeHandler);
+
     String getCoordinatedTransactionId();
 
     void setCoordinatedTransactionId(String coordinatedTransactionId);

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SmartValueHandler.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SmartValueHandler.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SmartValueHandler.java	2012-04-02 20:43:14 +0000
@@ -19,6 +19,7 @@ package com.mysql.clusterj.core.spi;
 
 import com.mysql.clusterj.core.store.ClusterTransaction;
 import com.mysql.clusterj.core.store.Operation;
+import com.mysql.clusterj.core.store.ResultData;
 
 /** SmartValueHandler is the interface that must be implemented for
  * operations that bypass the normal value handler and directly

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandlerFactory.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandlerFactory.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandlerFactory.java	2012-04-02 20:43:14 +0000
@@ -19,6 +19,7 @@ package com.mysql.clusterj.core.spi;
 
 import com.mysql.clusterj.core.metadata.DomainTypeHandlerImpl;
 import com.mysql.clusterj.core.store.Db;
+import com.mysql.clusterj.core.store.ResultData;
 
 /** ValueHandlerFactory allows a component to provide an alternative value handler.
  *
@@ -27,5 +28,8 @@ public interface ValueHandlerFactory {
 
     <T> ValueHandler getValueHandler(DomainTypeHandlerImpl<T> domainTypeHandler, Db db);
 
+    <T> ValueHandler getValueHandler(DomainTypeHandlerImpl<T> domainTypeHandler, Db db, ResultData resultData);
+
     <T> ValueHandler getKeyValueHandler(DomainTypeHandlerImpl<T> domainTypeHandler, Db db, Object keyValues);
+
 }

=== modified file 'storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java'
--- a/storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java	2012-04-02 20:43:14 +0000
@@ -255,7 +255,12 @@ public class NdbOpenJPADomainTypeHandler
                 local.message("ERR_Implementation_Should_Not_Occur"));
     }
 
-    public T newInstance(ValueHandler valueHandler, Db db) {
+    public T newInstance(ValueHandler valueHandler) {
+        throw new ClusterJFatalInternalException(
+                local.message("ERR_Implementation_Should_Not_Occur"));
+    }
+
+    public T newInstance(ResultData resultData,Db db) {
         throw new ClusterJFatalInternalException(
                 local.message("ERR_Implementation_Should_Not_Occur"));
     }

=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJTest.java	2012-03-05 22:28:15 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJTest.java	2012-04-04 06:22:39 +0000
@@ -23,6 +23,8 @@ import com.mysql.clusterj.Constants;
 import com.mysql.clusterj.Session;
 import com.mysql.clusterj.SessionFactory;
 import com.mysql.clusterj.Transaction;
+import com.mysql.clusterj.core.util.Logger;
+import com.mysql.clusterj.core.util.LoggerFactoryService;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -57,6 +59,10 @@ import junit.framework.TestCase;
  *
  */
 public abstract class AbstractClusterJTest extends TestCase {
+    /** My logger */
+    static final Logger logger = LoggerFactoryService.getFactory()
+            .getInstance("com.mysql.clusterj.test");
+
     protected static final String JDBC_DRIVER_NAME = "jdbc.driverName";
     protected static final String JDBC_URL = "jdbc.url";
     protected static Connection connection;
@@ -76,7 +82,7 @@ public abstract class AbstractClusterJTe
      *
      * Error messages collected during a test.
      */
-    private StringBuffer errorMessages;
+    protected StringBuffer errorMessages;
     /**
      *
      * A list of registered pc classes.

=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/BitTypesTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/BitTypesTest.java	2011-02-02 09:52:33 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/BitTypesTest.java	2012-04-07 02:06:08 +0000
@@ -1,5 +1,5 @@
 /*
-  Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
+  Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
 
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
@@ -80,6 +80,7 @@ public class BitTypesTest extends Abstra
             case 1: { // boolean
                 boolean data = (i % 2) == 0;
                 if (getDebug()) System.out.println("BitTypesTest.getColumnValue Column data for " + i + ", " + j
+                        + " " + columnDescriptors[j].getColumnName()
                         + "  is (boolean)" + data);
                 return data;
             }
@@ -90,6 +91,7 @@ public class BitTypesTest extends Abstra
                     data = (data * 2) + (int)(Math.random() * 2);
                 }
                 if (getDebug()) System.out.println("BitTypesTest.getColumnValue Column data for " + i + ", " + j
+                        + " " + columnDescriptors[j].getColumnName()
                         + "  is (byte)" + data);
                 return Byte.valueOf((byte)data);
             }
@@ -100,6 +102,7 @@ public class BitTypesTest extends Abstra
                     data = (data * 2) + (int)(Math.random() * 2);
                 }
                 if (getDebug()) System.out.println("BitTypesTest.getColumnValue Column data for " + i + ", " + j
+                        + " " + columnDescriptors[j].getColumnName()
                         + "  is (short)" + data);
                 return Short.valueOf((short)data);
             }
@@ -111,6 +114,7 @@ public class BitTypesTest extends Abstra
                     data = (data * 2) + ((int)(Math.random() * 2));
                 }
                 if (getDebug()) System.out.println("BitTypesTest.getColumnValue Column data for " + i + ", " + j
+                        + " " + columnDescriptors[j].getColumnName()
                         + "  is (int)" + data);
                 // TODO bug in JDBC handling high bit
                 data = Math.abs(data);
@@ -124,6 +128,7 @@ public class BitTypesTest extends Abstra
                     data = (data * 256) + (i * 16) + d;
                 }
                 if (getDebug()) System.out.println("BitTypesTest.getColumnValue Column data for " + i + ", " + j
+                        + " " + columnDescriptors[j].getColumnName()
                         + "  is (long)" + data);
                 return Long.valueOf(data);
             }
@@ -148,13 +153,14 @@ public class BitTypesTest extends Abstra
             errorIfNotEqual(where + " got failure on id for row " + i, i, actual[0]);
             for (int j = 1; j < expected.length; ++j) {
                 if (getDebug()) System.out.println("BitTypesTest.verify for " + i + ", " + j
+                        + " " + columnDescriptors[j - 1].getColumnName()
                         + "  is (" + actual[j].getClass().getName() + ")" + actual[j]);
                 switch (j) {
                     case 1: { // boolean
                         Boolean expectedColumn = (Boolean)expected[j];
                         Boolean actualColumn = (Boolean)actual[j];
                         errorIfNotEqual(where + " got failure on comparison of data for row "
-                                + i + " column " + j,
+                                + i + " column " + j + " " + columnDescriptors[j - 1].getColumnName(),
                                 expectedColumn, actualColumn);
                         break;
                     }
@@ -163,7 +169,7 @@ public class BitTypesTest extends Abstra
                         byte actualColumn = (Byte)actual[j];
                         // now compare bit by bit
                         errorIfNotEqual(where + " got failure on comparison of data for row "
-                                + i + " column " + j,
+                                + i + " column " + j + " " + columnDescriptors[j - 1].getColumnName(),
                                 Integer.toHexString(expectedColumn), Integer.toHexString(actualColumn));
                         break;
                     }
@@ -172,7 +178,7 @@ public class BitTypesTest extends Abstra
                         short actualColumn = (Short)actual[j];
                         // now compare bit by bit
                         errorIfNotEqual(where + " got failure on comparison of data for row "
-                                + i + " column " + j,
+                                + i + " column " + j + " " + columnDescriptors[j - 1].getColumnName(),
                                 Integer.toHexString(expectedColumn), Integer.toHexString(actualColumn));
                         break;
                     }
@@ -182,7 +188,7 @@ public class BitTypesTest extends Abstra
                         int actualColumn = (Integer)actual[j];
                         // now compare bit by bit
                         errorIfNotEqual(where + " got failure on comparison of data for row "
-                                + i + " column " + j,
+                                + i + " column " + j + " " + columnDescriptors[j - 1].getColumnName(),
                                 Integer.toHexString(expectedColumn), Integer.toHexString(actualColumn));
                         break;
                     }
@@ -192,7 +198,7 @@ public class BitTypesTest extends Abstra
                         long actualColumn = (Long)actual[j];
                         // now compare bit by bit
                         errorIfNotEqual(where + " got failure on comparison of data for row "
-                                + i + " column " + j,
+                                + i + " column " + j + " " + columnDescriptors[j - 1].getColumnName(),
                                 Long.toHexString(expectedColumn), Long.toHexString(actualColumn));
                         break;
                    }
@@ -209,9 +215,8 @@ public class BitTypesTest extends Abstra
     }
 
     public void testWriteNDBReadJDBC() {
-//        TODO: investigate platform dependency when reading via JDBC
-//        writeNDBreadJDBC();
-//        failOnError();
+        writeNDBreadJDBC();
+        failOnError();
    }
 
     public void testWriteNDBReadNDB() {
@@ -220,9 +225,8 @@ public class BitTypesTest extends Abstra
    }
 
     public void testWriteJDBCReadJDBC() {
-//      TODO: investigate platform dependency when reading via JDBC
-//        writeJDBCreadJDBC();
-//        failOnError();
+        writeJDBCreadJDBC();
+        failOnError();
    }
 
    static ColumnDescriptor bit1 = new ColumnDescriptor

=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java	2011-12-18 18:37:39 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java	2012-04-04 00:32:48 +0000
@@ -1,5 +1,5 @@
 /*
- *  Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+ *  Copyright (c) 2011, 2012, Oracle and/or its affiliates. All rights reserved.
  *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
@@ -17,6 +17,8 @@
 
 package testsuite.clusterj;
 
+import java.io.File;
+
 import com.mysql.clusterj.ClusterJHelper;
 import com.mysql.clusterj.Dbug;
 
@@ -25,7 +27,9 @@ import com.mysql.clusterj.Dbug;
  */
 public class DbugTest extends AbstractClusterJTest{
 
-    static String tmpFileName = System.getProperty("MYSQL_TMP_DIR", "/tmp") + "/clusterj-test-dbug";
+    private static final String TMP_DIR_NAME = System.getProperty("java.io.tmpdir");
+    private static final String FILE_SEPARATOR = File.separator;
+    private static final String TMP_FILE_NAME = TMP_DIR_NAME + FILE_SEPARATOR + "clusterj-test-dbug";
 
     public boolean getDebug() {
         return false;
@@ -46,7 +50,7 @@ public class DbugTest extends AbstractCl
             return;
         }
         String originalState = "t";
-        String newState = "d,jointx:o," + tmpFileName;
+        String newState = "d,jointx:o," + TMP_FILE_NAME;
         dbug.set(originalState);
         String actualState = dbug.get();
         errorIfNotEqual("Failed to set original state", originalState, actualState);
@@ -58,17 +62,17 @@ public class DbugTest extends AbstractCl
         errorIfNotEqual("Failed to pop original state", originalState, actualState);
 
         dbug = ClusterJHelper.newDbug();
-        dbug.output(tmpFileName).flush().debug(new String[] {"a", "b", "c", "d", "e", "f"}).push();
+        dbug.output(TMP_FILE_NAME).flush().debug(new String[] {"a", "b", "c", "d", "e", "f"}).push();
         actualState = dbug.get();
         // keywords are stored LIFO
-        errorIfNotEqual("Wrong state created", "d,f,e,d,c,b,a:O," + tmpFileName, actualState);
+        errorIfNotEqual("Wrong state created", "d,f,e,d,c,b,a:O," + TMP_FILE_NAME, actualState);
         dbug.pop();
 
         dbug = ClusterJHelper.newDbug();
-        dbug.append(tmpFileName).trace().debug("a,b,c,d,e,f").set();
+        dbug.append(TMP_FILE_NAME).trace().debug("a,b,c,d,e,f").set();
         actualState = dbug.get();
         // keywords are stored LIFO
-        errorIfNotEqual("Wrong state created", "d,f,e,d,c,b,a:a," + tmpFileName + ":t", actualState);
+        errorIfNotEqual("Wrong state created", "d,f,e,d,c,b,a:a," + TMP_FILE_NAME + ":t", actualState);
         dbug.pop();
 
         failOnError();

=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryMultiColumnIndexInTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryMultiColumnIndexInTest.java	2011-03-22 15:32:28 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryMultiColumnIndexInTest.java	2012-04-04 06:46:06 +0000
@@ -103,7 +103,7 @@ create table longintstringix (
     }
 
     public void testPrettyBigIn() {
-        int arraySize = 4096;
+        int arraySize = 20;
         Integer[] keys = new Integer[arraySize];
         for (int i = 0; i < arraySize; ++i) {
             keys[i] = i;

=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/SchemaChangeTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/SchemaChangeTest.java	2011-10-28 23:29:26 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/SchemaChangeTest.java	2012-04-04 06:22:39 +0000
@@ -44,6 +44,7 @@ public class SchemaChangeTest extends Ab
     }
 
     public void testFind() {
+        logger.info("PLEASE IGNORE THE FOLLOWING EXPECTED SEVERE ERROR.");
         // change the schema (drop a column)
         executeSQL(modifyTableStatement);
         try {
@@ -80,6 +81,7 @@ public class SchemaChangeTest extends Ab
                 session.find(StringTypes.class, 0);
             }
         }
+        logger.info("PLEASE IGNORE THE PRECEDING EXPECTED SEVERE ERROR.\n");
         failOnError();
     }
 

=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/domaintypehandler/CrazyDomainTypeHandlerFactoryImpl.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/domaintypehandler/CrazyDomainTypeHandlerFactoryImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/domaintypehandler/CrazyDomainTypeHandlerFactoryImpl.java	2012-04-02 20:43:14 +0000
@@ -110,7 +110,7 @@ public class CrazyDomainTypeHandlerFacto
                     throw new UnsupportedOperationException("Not supported yet.");
                 }
 
-                public T newInstance(ValueHandler valueHandler, Db db) {
+                public T newInstance(ValueHandler valueHandler) {
                     throw new UnsupportedOperationException("Not supported yet.");
                 }
 
@@ -208,6 +208,10 @@ public class CrazyDomainTypeHandlerFacto
                     throw new UnsupportedOperationException("Not supported yet.");
                }
 
+                public T newInstance(ResultData resultData, Db db) {
+                    throw new UnsupportedOperationException("Not supported yet.");
+                }
+
             };
         } else {
             return null;

=== modified file 'storage/ndb/clusterj/clusterj-tie/logging.properties'
--- a/storage/ndb/clusterj/clusterj-tie/logging.properties	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/logging.properties	2012-04-11 09:56:27 +0000
@@ -13,7 +13,7 @@
 # You should have received a copy of the GNU General Public License
 # along with this program; if not, write to the Free Software
 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
-
+com.mysql.clusterj.test.level=INFO
 com.mysql.clusterj.bindings.level=INFO
 com.mysql.clusterj.core.level=INFO
 com.mysql.clusterj.core.metadata.level=INFO

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java	2012-03-06 16:51:07 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java	2012-04-02 20:43:14 +0000
@@ -18,6 +18,7 @@
 package com.mysql.clusterj.tie;
 
 import java.util.IdentityHashMap;
+import java.util.Iterator;
 import java.util.Map;
 
 import java.util.concurrent.ConcurrentHashMap;
@@ -33,6 +34,7 @@ import com.mysql.clusterj.ClusterJHelper
 
 import com.mysql.clusterj.core.spi.ValueHandlerFactory;
 import com.mysql.clusterj.core.store.Db;
+import com.mysql.clusterj.core.store.Index;
 import com.mysql.clusterj.core.store.Table;
 
 import com.mysql.clusterj.core.util.I18NHelper;
@@ -50,7 +52,7 @@ public class ClusterConnectionImpl
 
     /** My logger */
     static final Logger logger = LoggerFactoryService.getFactory()
-            .getInstance(com.mysql.clusterj.core.store.ClusterConnection.class);
+            .getInstance(ClusterConnectionImpl.class);
 
     /** Ndb_cluster_connection is wrapped by ClusterConnection */
     protected Ndb_cluster_connection clusterConnection;
@@ -195,7 +197,9 @@ public class ClusterConnectionImpl
     }
 
     /** 
-     * Get the cached NdbRecord implementation for this cluster connection.
+     * Get the cached NdbRecord implementation for the table
+     * used with this cluster connection. All columns are included
+     * in the NdbRecord.
      * Use a ConcurrentHashMap for best multithread performance.
      * There are three possibilities:
      * <ul><li>Case 1: return the already-cached NdbRecord
@@ -203,8 +207,7 @@ public class ClusterConnectionImpl
      * </li><li>Case 3: return the winner of a race with another thread
      * </li></ul>
      * @param storeTable the store table
-     * @param ndbDictionary the ndb dictionary
-     * @return the NdbRecordImpl
+     * @return the NdbRecordImpl for the table
      */
     protected NdbRecordImpl getCachedNdbRecordImpl(Table storeTable) {
         String tableName = storeTable.getName();
@@ -239,17 +242,81 @@ public class ClusterConnectionImpl
         }
     }
 
-    /** Remove the cached NdbRecord associated with this table. This allows schema change to work.
+    /** 
+     * Get the cached NdbRecord implementation for the index and table
+     * used with this cluster connection.
+     * The NdbRecordImpl is cached under the name tableName+indexName.
+     * Only the key columns are included in the NdbRecord.
+     * Use a ConcurrentHashMap for best multithread performance.
+     * There are three possibilities:
+     * <ul><li>Case 1: return the already-cached NdbRecord
+     * </li><li>Case 2: return a new instance created by this method
+     * </li><li>Case 3: return the winner of a race with another thread
+     * </li></ul>
+     * @param storeTable the store table
+     * @param storeIndex the store index
+     * @return the NdbRecordImpl for the index
+     */
+    protected NdbRecordImpl getCachedNdbRecordImpl(Index storeIndex, Table storeTable) {
+        String recordName = storeTable.getName() + "+" + storeIndex.getInternalName();
+        // find the NdbRecordImpl in the global cache
+        NdbRecordImpl result = ndbRecordImplMap.get(recordName);
+        if (result != null) {
+            // case 1
+            if (logger.isDebugEnabled())logger.debug("NdbRecordImpl found for " + recordName);
+            return result;
+        } else {
+            // dictionary is single thread
+            NdbRecordImpl newNdbRecordImpl;
+            synchronized (dictionaryForNdbRecord) {
+                // try again; another thread might have beat us
+                result = ndbRecordImplMap.get(recordName);
+                if (result != null) {
+                    return result;
+                }
+                newNdbRecordImpl = new NdbRecordImpl(storeIndex, storeTable, dictionaryForNdbRecord);   
+            }
+            NdbRecordImpl winner = ndbRecordImplMap.putIfAbsent(recordName, newNdbRecordImpl);
+            if (winner == null) {
+                // case 2: the previous value was null, so return the new (winning) value
+                if (logger.isDebugEnabled())logger.debug("NdbRecordImpl created for " + recordName);
+                return newNdbRecordImpl;
+            } else {
+                // case 3: another thread beat us, so return the winner and garbage collect ours
+                if (logger.isDebugEnabled())logger.debug("NdbRecordImpl lost race for " + recordName);
+                newNdbRecordImpl.releaseNdbRecord();
+                return winner;
+            }
+        }
+    }
+
+    /** Remove the cached NdbRecord(s) associated with this table. This allows schema change to work.
+     * All NdbRecords including any index NdbRecords will be removed. Index NdbRecords are named
+     * tableName+indexName.
      * @param tableName the name of the table
      */
     public void unloadSchema(String tableName) {
-        if (logger.isDebugEnabled())logger.debug("Removing cached NdbRecord for " + tableName);
-        NdbRecordImpl ndbRecordImpl = ndbRecordImplMap.remove(tableName);
-        if (ndbRecordImpl != null) {
-            ndbRecordImpl.releaseNdbRecord();
+        // synchronize to avoid multiple threads unloading schema simultaneously
+        // it is possible although unlikely that another thread is adding an entry while 
+        // we are removing entries; if this occurs an error will be signaled here
+        synchronized(ndbRecordImplMap) {
+            Iterator<Map.Entry<String, NdbRecordImpl>> iterator = ndbRecordImplMap.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<String, NdbRecordImpl> entry = iterator.next();
+                String key = entry.getKey();
+                if (key.startsWith(tableName)) {
+                    // remove all records whose key begins with the table name; this will remove index records also
+                    if (logger.isDebugEnabled())logger.debug("Removing cached NdbRecord for " + key);
+                    NdbRecordImpl record = entry.getValue();
+                    iterator.remove();
+                    if (record != null) {
+                        record.releaseNdbRecord();
+                    }
+                }
+            }
+            if (logger.isDebugEnabled())logger.debug("Removing dictionary entry for cached table " + tableName);
+            dictionaryForNdbRecord.removeCachedTable(tableName);
         }
-        if (logger.isDebugEnabled())logger.debug("Removing dictionary entry for cached table " + tableName);
-        dictionaryForNdbRecord.removeCachedTable(tableName);
     }
 
     public ValueHandlerFactory getSmartValueHandlerFactory() {

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java	2012-04-02 20:43:14 +0000
@@ -22,7 +22,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 import com.mysql.clusterj.ClusterJDatastoreException;
-import com.mysql.clusterj.ClusterJException;
 import com.mysql.clusterj.ClusterJFatalInternalException;
 import com.mysql.clusterj.ClusterJHelper;
 import com.mysql.clusterj.LockMode;
@@ -55,6 +54,8 @@ import com.mysql.ndbjtie.ndbapi.NdbDicti
 import com.mysql.ndbjtie.ndbapi.NdbOperation.OperationOptionsConst;
 import com.mysql.ndbjtie.ndbapi.NdbOperationConst.AbortOption;
 import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanFlag;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanOptions;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanOptionsConst;
 
 /**
  *
@@ -243,12 +244,18 @@ class ClusterTransactionImpl implements 
 
     public IndexScanOperation getIndexScanOperation(Index storeIndex, Table storeTable) {
         enlist();
+        if (USE_NDBRECORD) {
+            return new NdbRecordIndexScanOperationImpl(this, storeIndex, storeTable, indexScanLockMode);
+        }
         IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
         handleError(ndbIndex, ndbDictionary);
         NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
         handleError(ndbOperation, ndbTransaction);
-        int lockMode = indexScanLockMode;
         int scanFlags = 0;
+        int lockMode = indexScanLockMode;
+        if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
+            scanFlags = ScanFlag.SF_KeyInfo;
+        }
         int parallel = 0;
         int batch = 0;
         int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
@@ -259,12 +266,19 @@ class ClusterTransactionImpl implements 
 
     public IndexScanOperation getIndexScanOperationMultiRange(Index storeIndex, Table storeTable) {
         enlist();
+        if (USE_NDBRECORD) {
+            return new NdbRecordIndexScanOperationImpl(this, storeIndex, storeTable, true, indexScanLockMode);
+        }
         IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
         handleError(ndbIndex, ndbDictionary);
         NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
         handleError(ndbOperation, ndbTransaction);
+        int scanFlags = 0;
         int lockMode = indexScanLockMode;
-        int scanFlags = ScanFlag.SF_MultiRange;
+        if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
+            scanFlags = ScanFlag.SF_KeyInfo;
+        }
+        scanFlags |= ScanFlag.SF_MultiRange;
         int parallel = 0;
         int batch = 0;
         int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
@@ -307,12 +321,18 @@ class ClusterTransactionImpl implements 
 
     public ScanOperation getTableScanOperation(Table storeTable) {
         enlist();
+        if (USE_NDBRECORD) {
+            return new NdbRecordTableScanOperationImpl(this, storeTable, tableScanLockMode);
+        }
         TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
         handleError(ndbTable, ndbDictionary);
         NdbScanOperation ndbScanOperation = ndbTransaction.getNdbScanOperation(ndbTable);
         handleError(ndbScanOperation, ndbTransaction);
         int lockMode = tableScanLockMode;
         int scanFlags = 0;
+        if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
+            scanFlags = ScanFlag.SF_KeyInfo;
+        }
         int parallel = 0;
         int batch = 0;
         int returnCode = ndbScanOperation.readTuples(lockMode, scanFlags, parallel, batch);
@@ -339,6 +359,9 @@ class ClusterTransactionImpl implements 
 
     public IndexOperation getUniqueIndexOperation(Index storeIndex, Table storeTable) {
         enlist();
+        if (USE_NDBRECORD) {
+            return new NdbRecordUniqueKeyOperationImpl(this, storeIndex, storeTable);
+        }
         IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
         handleError(ndbIndex, ndbDictionary);
         NdbIndexOperation ndbIndexOperation = ndbTransaction.getNdbIndexOperation(ndbIndex);
@@ -414,6 +437,33 @@ class ClusterTransactionImpl implements 
         return operation;
     }
 
+    /** Create a table scan operation using NdbRecord.
+     * 
+     * @param ndbRecord the NdbRecord for the result
+     * @param mask the columns to read
+     * @param options the scan options
+     * @return the NdbScanOperation for the table scan
+     */
+    public NdbScanOperation scanTable(NdbRecordConst ndbRecord, byte[] mask, ScanOptionsConst options) {
+        enlist();
+        int lockMode = tableScanLockMode;
+        NdbScanOperation operation = ndbTransaction.scanTable(ndbRecord, lockMode, mask, options, 0);
+        handleError(operation, ndbTransaction);
+        return operation;
+    }
+
+    /** Create a scan operation on the index using NdbRecord. 
+     * @param key_record the NdbRecord describing the index key columns
+     * @param result_record the NdbRecord describing the result columns
+     * @param result_mask the mask that specifies which columns to read
+     * @param scanOptions the scan options
+     * @return the NdbIndexScanOperation for the index scan
+     */
+    public NdbIndexScanOperation scanIndex(NdbRecordConst key_record, NdbRecordConst result_record,
+            byte[] result_mask, ScanOptions scanOptions) {
+        return ndbTransaction.scanIndex(key_record, result_record, indexScanLockMode, result_mask, null, scanOptions, 0);
+    }
+
     /** Create an NdbOperation for delete using NdbRecord.
      * 
      * @param ndbRecord the NdbRecord
@@ -655,6 +705,16 @@ class ClusterTransactionImpl implements 
         return clusterConnectionImpl.getCachedNdbRecordImpl(storeTable);
     }
 
+    /** Get the cached NdbRecordImpl for this index and table. The NdbRecordImpl is cached in the
+     * cluster connection.
+     * @param storeIndex the index
+     * @param storeTable the table
+     * @return the cached NdbRecordImpl for the index and table
+     */
+    protected NdbRecordImpl getCachedNdbRecordImpl(Index storeIndex, Table storeTable) {
+        return clusterConnectionImpl.getCachedNdbRecordImpl(storeIndex, storeTable);
+    }
+
     /** 
      * Add an operation to check for errors after execute.
      * @param op the operation to check

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java	2012-03-05 22:28:15 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java	2012-04-02 20:43:14 +0000
@@ -95,7 +95,10 @@ class DbImpl implements com.mysql.cluste
     }
 
     public void close() {
-        Ndb.delete(ndb);
+        if (ndb != null) {
+            Ndb.delete(ndb);
+            ndb = null;
+        }
         clusterConnection.close(this);
     }
 
@@ -142,7 +145,7 @@ class DbImpl implements com.mysql.cluste
     /** Enlist an NdbTransaction using table and key data to specify 
      * the transaction coordinator.
      * 
-     * @param table the table
+     * @param tableName the name of the table
      * @param keyParts the list of partition key parts
      * @return the ndbTransaction
      */
@@ -199,8 +202,8 @@ class DbImpl implements com.mysql.cluste
      * the transaction coordinator. This method is also used if
      * the key data is null.
      * 
-     * @param table the table
-     * @param keyParts the list of partition key parts
+     * @param tableName the name of the table
+     * @param partitionId the partition id
      * @return the ndbTransaction
      */
     public NdbTransaction enlist(String tableName, int partitionId) {

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java	2012-04-08 20:50:07 +0000
@@ -26,12 +26,13 @@ import java.nio.ByteOrder;
 import java.util.ArrayList;
 import java.util.List;
 
-import com.mysql.clusterj.ClusterJDatastoreException;
 import com.mysql.clusterj.ClusterJFatalInternalException;
 import com.mysql.clusterj.ClusterJFatalUserException;
 import com.mysql.clusterj.ClusterJUserException;
+import com.mysql.clusterj.ColumnType;
 
 import com.mysql.clusterj.core.store.Column;
+import com.mysql.clusterj.core.store.Index;
 import com.mysql.clusterj.core.store.Table;
 
 import com.mysql.clusterj.core.util.I18NHelper;
@@ -44,15 +45,23 @@ import com.mysql.ndbjtie.ndbapi.NdbRecor
 import com.mysql.ndbjtie.ndbapi.NdbRecordConst;
 import com.mysql.ndbjtie.ndbapi.NdbDictionary.ColumnConst;
 import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.IndexConst;
 import com.mysql.ndbjtie.ndbapi.NdbDictionary.RecordSpecification;
 import com.mysql.ndbjtie.ndbapi.NdbDictionary.RecordSpecificationArray;
 import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst;
 
 /**
- * Wrapper around an NdbRecord. The default implementation can be used for create, read, update, or delete
- * using an NdbRecord that defines every column in the table. After construction, the instance is
+ * Wrapper around an NdbRecord. Operations may use one or two instances.
+ * <ul><li>The table implementation can be used for create, read, update, or delete
+ * using an NdbRecord that defines every column in the table.
+ * </li><li>The index implementation for unique indexes can be used with a unique lookup operation.
+ * </li><li>The index implementation for ordered (non-unique) indexes can be used with an index scan operation.
+ * </li></ul>
+ * After construction, the instance is
  * read-only and can be shared among all threads that use the same cluster connection; and the size of the
- * buffer required for operations is available. The NdbRecord instance is released when the cluster
+ * buffer required for operations is available. 
+ * Methods on the instance generally require a buffer to be passed, which is modified by the method.
+ * The NdbRecord instance is released when the cluster
  * connection is closed or when schema change invalidates it. Column values can be set using a provided
  * buffer and buffer manager.
  */
@@ -79,12 +88,15 @@ public class NdbRecordImpl {
     private RecordSpecificationArray recordSpecificationArray;
 
     /** The NdbTable */
-    TableConst tableConst;
+    TableConst tableConst = null;
 
-    /** The size of the receive buffer for this operation */
+    /** The NdbIndex, which will be null for complete-table instances */
+    IndexConst indexConst = null;
+
+    /** The size of the buffer for this NdbRecord */
     protected int bufferSize;
 
-    /** The maximum column id for this operation */
+    /** The maximum column id for this NdbRecord */
     protected int maximumColumnId;
 
     /** The offsets into the buffer for each column */
@@ -115,14 +127,16 @@ public class NdbRecordImpl {
     private Dictionary ndbDictionary;
 
     /** Number of columns for this NdbRecord */
-    private int numberOfColumns;
+    private int numberOfTableColumns;
 
     /** These fields are only used during construction of the RecordSpecificationArray */
     int offset = 0;
     int nullablePosition = 0;
     byte[] defaultValues;
 
-    /** Constructor used for insert operations that do not need to read data.
+    private int[] recordSpecificationIndexes = null;
+
+    /** Constructor for table operations.
      * 
      * @param storeTable the store table
      * @param ndbDictionary the ndb dictionary
@@ -130,14 +144,41 @@ public class NdbRecordImpl {
     protected NdbRecordImpl(Table storeTable, Dictionary ndbDictionary) {
         this.ndbDictionary = ndbDictionary;
         this.tableConst = getNdbTable(storeTable.getName());
-        this.numberOfColumns = tableConst.getNoOfColumns();
-        this.recordSpecificationArray = RecordSpecificationArray.create(numberOfColumns);
-        this.offsets = new int[numberOfColumns];
-        this.lengths = new int[numberOfColumns];
-        this.nullbitBitInByte = new int[numberOfColumns];
-        this.nullbitByteOffset = new int[numberOfColumns];
-        this.storeColumns = new Column[numberOfColumns];
+        this.numberOfTableColumns = tableConst.getNoOfColumns();
+        this.recordSpecificationArray = RecordSpecificationArray.create(numberOfTableColumns);
+        recordSpecificationIndexes = new int[numberOfTableColumns];
+        this.offsets = new int[numberOfTableColumns];
+        this.lengths = new int[numberOfTableColumns];
+        this.nullbitBitInByte = new int[numberOfTableColumns];
+        this.nullbitByteOffset = new int[numberOfTableColumns];
+        this.storeColumns = new Column[numberOfTableColumns];
         this.ndbRecord = createNdbRecord(storeTable, ndbDictionary);
+        if (logger.isDetailEnabled()) logger.detail(storeTable.getName() + " " + dumpDefinition());
+        initializeDefaultBuffer();
+    }
+
+    /** Constructor for index operations. The NdbRecord has columns just for
+     * the columns in the index. 
+     * 
+     * @param storeIndex the store index
+     * @param storeTable the store table
+     * @param ndbDictionary the ndb dictionary
+     */
+    protected NdbRecordImpl(Index storeIndex, Table storeTable, Dictionary ndbDictionary) {
+        this.ndbDictionary = ndbDictionary;
+        this.tableConst = getNdbTable(storeTable.getName());
+        this.indexConst = getNdbIndex(storeIndex.getInternalName(), tableConst.getName());
+        this.numberOfTableColumns = tableConst.getNoOfColumns();
+        int numberOfIndexColumns = this.indexConst.getNoOfColumns();
+        this.recordSpecificationArray = RecordSpecificationArray.create(numberOfIndexColumns);
+        recordSpecificationIndexes = new int[numberOfTableColumns];
+        this.offsets = new int[numberOfTableColumns];
+        this.lengths = new int[numberOfTableColumns];
+        this.nullbitBitInByte = new int[numberOfTableColumns];
+        this.nullbitByteOffset = new int[numberOfTableColumns];
+        this.storeColumns = new Column[numberOfTableColumns];
+        this.ndbRecord = createNdbRecord(storeIndex, storeTable, ndbDictionary);
+        if (logger.isDetailEnabled()) logger.detail(storeIndex.getInternalName() + " " + dumpDefinition());
         initializeDefaultBuffer();
     }
 
@@ -152,9 +193,10 @@ public class NdbRecordImpl {
         zeros.order(ByteOrder.nativeOrder());
         // just to be sure, initialize with zeros
         zeros.put(defaultValues);
+        // not all columns are set at this point, so only check for those that are set
         for (Column storeColumn: storeColumns) {
             // nullable columns get the null bit set
-            if (storeColumn.getNullable()) {
+            if (storeColumn != null && storeColumn.getNullable()) {
                 setNull(zeros, storeColumn);
             }
         }
@@ -170,13 +212,22 @@ public class NdbRecordImpl {
      */
     protected ByteBuffer newBuffer() {
         ByteBuffer result = ByteBuffer.allocateDirect(bufferSize);
-        result.order(ByteOrder.nativeOrder());
-        result.put(defaultValues);
-        result.limit(bufferSize);
-        result.position(0);
+        initializeBuffer(result);
         return result;
     }
 
+    /** Initialize an already-allocated buffer with default values for all columns.
+     * 
+     * @param buffer
+     */
+    protected void initializeBuffer(ByteBuffer buffer) {
+        buffer.order(ByteOrder.nativeOrder());
+        buffer.limit(bufferSize);
+        buffer.position(0);
+        buffer.put(defaultValues);
+        buffer.position(0);
+    }
+
     public int setNull(ByteBuffer buffer, Column storeColumn) {
         int columnId = storeColumn.getColumnId();
         if (!storeColumn.getNullable()) {
@@ -217,11 +268,11 @@ public class NdbRecordImpl {
     public int setByte(ByteBuffer buffer, Column storeColumn, byte value) {
         resetNull(buffer, storeColumn);
         int columnId = storeColumn.getColumnId();
-        if (storeColumn.getLength() == 4) {
+        if (storeColumn.getType() == ColumnType.Bit) {
             // the byte is stored as a BIT array of four bytes
-            buffer.putInt(offsets[columnId], value);
+            buffer.putInt(offsets[columnId], value & 0xff);
         } else {
-            buffer.put(offsets[columnId], (byte)value);
+            buffer.put(offsets[columnId], value);
         }
         buffer.limit(bufferSize);
         buffer.position(0);
@@ -232,8 +283,13 @@ public class NdbRecordImpl {
         resetNull(buffer, storeColumn);
         int columnId = storeColumn.getColumnId();
         int offset = offsets[columnId];
-        int length = storeColumn.getLength() + storeColumn.getPrefixLength();
-        buffer.limit(offset + length);
+        int length = storeColumn.getLength();
+        int prefixLength = storeColumn.getPrefixLength();
+        if (length < value.length) {
+            throw new ClusterJUserException(local.message("ERR_Data_Too_Long",
+                    storeColumn.getName(), length, value.length));
+        }
+        buffer.limit(offset + prefixLength + length);
         buffer.position(offset);
         Utility.convertValue(buffer, storeColumn, value);
         buffer.limit(bufferSize);
@@ -287,7 +343,7 @@ public class NdbRecordImpl {
         int columnId = storeColumn.getColumnId();
         if (storeColumn.getLength() == 4) {
             // the short is stored as a BIT array of four bytes
-            buffer.putInt(offsets[columnId], value);
+            buffer.putInt(offsets[columnId], value & 0xffff);
         } else {
             buffer.putShort(offsets[columnId], (short)value);
         }
@@ -298,15 +354,16 @@ public class NdbRecordImpl {
         resetNull(buffer, storeColumn);
         int columnId = storeColumn.getColumnId();
         int offset = offsets[columnId];
-        int length = storeColumn.getLength() + storeColumn.getPrefixLength();
+        int prefixLength = storeColumn.getPrefixLength();
+        int length = storeColumn.getLength() + prefixLength;
         buffer.limit(offset + length);
         buffer.position(offset);
         // TODO provide the buffer to Utility.encode to avoid copying
         // for now, use the encode method to encode the value then copy it
         ByteBuffer converted = Utility.encode(value, storeColumn, bufferManager);
         if (length < converted.remaining()) {
-            throw new ClusterJUserException(local.message("ERR_Data_Too_Large",
-                    storeColumn.getName(), length, converted.remaining()));
+            throw new ClusterJUserException(local.message("ERR_Data_Too_Long",
+                    storeColumn.getName(), length - prefixLength, converted.remaining() - prefixLength));
         }
         buffer.put(converted);
         buffer.limit(bufferSize);
@@ -321,9 +378,9 @@ public class NdbRecordImpl {
 
     public byte getByte(ByteBuffer buffer, int columnId) {
         Column storeColumn = storeColumns[columnId];
-        if (storeColumn.getLength() == 4) {
+        if (storeColumn.getType() == ColumnType.Bit) {
             // the byte was stored in a BIT column as four bytes
-            return (byte)buffer.get(offsets[columnId]);
+            return (byte)(buffer.getInt(offsets[columnId]));
         } else {
             // the byte was stored as a byte
             return buffer.get(offsets[columnId]);
@@ -391,7 +448,7 @@ public class NdbRecordImpl {
         Column storeColumn = storeColumns[columnId];
         if (storeColumn.getLength() == 4) {
             // the short was stored in a BIT column as four bytes
-            return (short)buffer.get(offsets[columnId]);
+            return (short)buffer.getInt(offsets[columnId]);
         } else {
             // the short was stored as a short
             return buffer.getShort(offsets[columnId]);
@@ -599,8 +656,35 @@ public class NdbRecordImpl {
         }
     }
 
+    /** Create an NdbRecord covering the columns of an index.
+     * The index columns are analyzed (alignment buckets, buffer offsets,
+     * record specifications) and the ndb dictionary creates the record.
+     * @param storeIndex the index whose columns define the record
+     * @param storeTable the table to which the index belongs
+     * @param ndbDictionary the ndb dictionary used to create the record
+     * @return the newly created NdbRecord
+     */
+    protected NdbRecord createNdbRecord(Index storeIndex, Table storeTable, Dictionary ndbDictionary) {
+        String[] columnNames = storeIndex.getColumnNames();
+        // analyze columns; sort into alignment buckets, allocate space in the buffer
+        // and build the record specification array
+        analyzeColumns(storeTable, columnNames);
+        // create the NdbRecord
+        NdbRecord result = ndbDictionary.createRecord(indexConst, tableConst, recordSpecificationArray,
+                columnNames.length, SIZEOF_RECORD_SPECIFICATION, 0);
+        // delete the RecordSpecificationArray since it is no longer needed
+        RecordSpecificationArray.delete(recordSpecificationArray);
+        handleError(result, ndbDictionary);
+        return result;
+    }
+
     protected NdbRecord createNdbRecord(Table storeTable, Dictionary ndbDictionary) {
+        // record covering every column of the table (contrast with the index
+        // variant above, which covers only the index columns)
         String[] columnNames = storeTable.getColumnNames();
+        // analyze columns; sort into alignment buckets, allocate space in the buffer,
+        // and build the record specification array
+        analyzeColumns(storeTable, columnNames);
+        // create the NdbRecord
+        NdbRecord result = ndbDictionary.createRecord(tableConst, recordSpecificationArray,
+                columnNames.length, SIZEOF_RECORD_SPECIFICATION, 0);
+        // delete the RecordSpecificationArray since it is no longer needed
+        RecordSpecificationArray.delete(recordSpecificationArray);
+        handleError(result, ndbDictionary);
+        return result;
+    }
+
+    private void analyzeColumns(Table storeTable, String[] columnNames) {
         List<Column> align8 = new ArrayList<Column>();
         List<Column> align4 = new ArrayList<Column>();
         List<Column> align2 = new ArrayList<Column>();
@@ -609,6 +693,8 @@ public class NdbRecordImpl {
         int i = 0;
         for (String columnName: columnNames) {
             Column storeColumn = storeTable.getColumn(columnName);
+            int columnId = storeColumn.getColumnId();
+            recordSpecificationIndexes[columnId] = i;
             if (logger.isDetailEnabled()) logger.detail("storeColumn: " + storeColumn.getName() + " id: " + storeColumn.getColumnId() + " index: " + i);
             lengths[i] = storeColumn.getLength();
             storeColumns[i++] = storeColumn;
@@ -671,28 +757,20 @@ public class NdbRecordImpl {
         offset = (7 + offset) / 8 * 8;
         nullIndicatorSize = offset;
         for (Column storeColumn: align8) {
-            handleColumn(8, storeColumn);
+            analyzeColumn(8, storeColumn);
         }
         for (Column storeColumn: align4) {
-            handleColumn(4, storeColumn);
+            analyzeColumn(4, storeColumn);
         }
         for (Column storeColumn: align2) {
-            handleColumn(2, storeColumn);
+            analyzeColumn(2, storeColumn);
         }
         for (Column storeColumn: align1) {
-            handleColumn(1, storeColumn);
+            analyzeColumn(1, storeColumn);
         }
         bufferSize = offset;
 
         if (logger.isDebugEnabled()) logger.debug(dumpDefinition());
-
-        // now create an NdbRecord
-        NdbRecord result = ndbDictionary.createRecord(tableConst, recordSpecificationArray,
-                numberOfColumns, SIZEOF_RECORD_SPECIFICATION, 0);
-        // delete the RecordSpecificationArray since it is no longer needed
-        RecordSpecificationArray.delete(recordSpecificationArray);
-        handleError(result, ndbDictionary);
-        return result;
     }
 
     /** Create a record specification for a column. Keep track of the offset into the buffer
@@ -701,9 +779,10 @@ public class NdbRecordImpl {
      * @param alignment the alignment for this column in the buffer
      * @param storeColumn the column
      */
-    private void handleColumn(int alignment, Column storeColumn) {
+    private void analyzeColumn(int alignment, Column storeColumn) {
         int columnId = storeColumn.getColumnId();
-        RecordSpecification recordSpecification = recordSpecificationArray.at(columnId);
+        int recordSpecificationIndex = recordSpecificationIndexes[columnId];
+        RecordSpecification recordSpecification = recordSpecificationArray.at(recordSpecificationIndex);
         ColumnConst columnConst = tableConst.getColumn(columnId);
         recordSpecification.column(columnConst);
         recordSpecification.offset(offset);
@@ -727,21 +806,23 @@ public class NdbRecordImpl {
     private String dumpDefinition() {
         StringBuilder builder = new StringBuilder(tableConst.getName());
         builder.append(" numberOfColumns: ");
-        builder.append(numberOfColumns);
+        builder.append(numberOfTableColumns);
         builder.append('\n');
-        for (int columnId = 0; columnId < numberOfColumns; ++columnId) {
+        for (int columnId = 0; columnId < numberOfTableColumns; ++columnId) {
             Column storeColumn = storeColumns[columnId];
-            builder.append(" column: ");
-            builder.append(storeColumn.getName());
-            builder.append(" offset: ");
-            builder.append(offsets[columnId]);
-            builder.append(" length: ");
-            builder.append(lengths[columnId]);
-            builder.append(" nullbitBitInByte: ");
-            builder.append(nullbitBitInByte[columnId]);
-            builder.append(" nullbitByteOffset: ");
-            builder.append(nullbitByteOffset[columnId]);
-            builder.append('\n');
+            // storeColumns may be sparse (presumably for index records that
+            // cover only a subset of table columns — confirm); dump only the
+            // columns that are actually set
+            if (storeColumn != null) {
+                builder.append(" column: ");
+                builder.append(storeColumn.getName());
+                builder.append(" offset: ");
+                builder.append(offsets[columnId]);
+                builder.append(" length: ");
+                builder.append(lengths[columnId]);
+                builder.append(" nullbitBitInByte: ");
+                builder.append(nullbitBitInByte[columnId]);
+                builder.append(" nullbitByteOffset: ");
+                builder.append(nullbitByteOffset[columnId]);
+                builder.append('\n');
+            }
         }
         return builder.toString();
     }
@@ -749,37 +830,39 @@ public class NdbRecordImpl {
     public String dumpValues(ByteBuffer data, byte[] mask) {
         StringBuilder builder = new StringBuilder(tableConst.getName());
         builder.append(" numberOfColumns: ");
-        builder.append(numberOfColumns);
+        builder.append(numberOfTableColumns);
         builder.append('\n');
-        for (int columnId = 0; columnId < numberOfColumns; ++columnId) {
+        for (int columnId = 0; columnId < numberOfTableColumns; ++columnId) {
             Column storeColumn = storeColumns[columnId];
-            builder.append(" column: ");
-            builder.append(storeColumn.getName());
-            builder.append(" offset: ");
-            builder.append(offsets[columnId]);
-            builder.append(" length: ");
-            builder.append(lengths[columnId]);
-            builder.append(" nullbitBitInByte: ");
-            int nullBitInByte = nullbitBitInByte[columnId];
-            builder.append(nullBitInByte);
-            builder.append(" nullbitByteOffset: ");
-            int nullByteOffset = nullbitByteOffset[columnId];
-            builder.append(nullByteOffset);
-            builder.append(" data: ");
-            int size = storeColumn.getColumnSpace() != 0 ? storeColumn.getColumnSpace():storeColumn.getSize();
-            int offset = offsets[columnId];
-            data.limit(bufferSize);
-            data.position(0);
-            for (int index = offset; index < offset + size; ++index) {
-                builder.append(String.format("%2x ", data.get(index)));
-            }
-            builder.append(" null: ");
-            builder.append(isNull(data, columnId));
-            builder.append(" present: ");
-            if (mask != null) {
-                builder.append(isPresent(mask, columnId));
+            if (storeColumn != null) {
+                builder.append(" column: ");
+                builder.append(storeColumn.getName());
+                builder.append(" offset: ");
+                builder.append(offsets[columnId]);
+                builder.append(" length: ");
+                builder.append(lengths[columnId]);
+                builder.append(" nullbitBitInByte: ");
+                int nullBitInByte = nullbitBitInByte[columnId];
+                builder.append(nullBitInByte);
+                builder.append(" nullbitByteOffset: ");
+                int nullByteOffset = nullbitByteOffset[columnId];
+                builder.append(nullByteOffset);
+                builder.append(" data: ");
+                int size = storeColumn.getColumnSpace() != 0 ? storeColumn.getColumnSpace():storeColumn.getSize();
+                int offset = offsets[columnId];
+                data.limit(bufferSize);
+                data.position(0);
+                for (int index = offset; index < offset + size; ++index) {
+                    builder.append(String.format("%2x ", data.get(index)));
+                }
+                builder.append(" null: ");
+                builder.append(isNull(data, columnId));
+                if (mask != null) {
+                    builder.append(" present: ");
+                    builder.append(isPresent(mask, columnId));
+                }
+                builder.append('\n');
             }
-            builder.append('\n');
         }
         data.position(0);
         return builder.toString();
@@ -791,9 +874,28 @@ public class NdbRecordImpl {
             // try the lower case table name
             ndbTable = ndbDictionary.getTable(tableName.toLowerCase());
         }
+        if (ndbTable == null) {
+            Utility.throwError(ndbTable, ndbDictionary.getNdbError(), tableName);
+        }
         return ndbTable;
     }
 
+    /** Return the ndb table metadata backing this record. */
+    TableConst getNdbTable() {
+        return tableConst;
+    }
+
+    /** Look up an index by name in the ndb dictionary.
+     * Reports the dictionary's ndb error (via Utility.throwError) if the
+     * index cannot be found.
+     * @param indexName the name of the index
+     * @param tableName the name of the table to which the index belongs
+     * @return the index metadata
+     */
+    IndexConst getNdbIndex(String indexName, String tableName) {
+        IndexConst ndbIndex = ndbDictionary.getIndex(indexName, tableName);
+        if (ndbIndex == null) {
+            Utility.throwError(ndbIndex, ndbDictionary.getNdbError(),  tableName+ "+" + indexName);
+        }
+        return ndbIndex;
+    }
+
+    /** Return the ndb index backing this record; presumably null for
+     * table-based (non-index) records — confirm. */
+    IndexConst getNdbIndex() {
+        return indexConst;
+    }
+
     public int getBufferSize() {
         return bufferSize;
     }
@@ -803,7 +905,7 @@ public class NdbRecordImpl {
     }
 
     public int getNumberOfColumns() {
+        // renamed field: counts all columns of the table, not only those
+        // covered by this record — confirm against field declaration
-        return numberOfColumns;
+        return numberOfTableColumns;
     }
 
     protected void releaseNdbRecord() {

=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java	2012-04-05 15:12:21 +0000
@@ -0,0 +1,359 @@
+/*
+   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.mysql.clusterj.ClusterJFatalInternalException;
+
+import com.mysql.clusterj.core.store.Column;
+import com.mysql.clusterj.core.store.Index;
+import com.mysql.clusterj.core.store.IndexScanOperation;
+import com.mysql.clusterj.core.store.Table;
+
+import com.mysql.ndbjtie.ndbapi.NdbIndexScanOperation;
+
+/** NdbRecordIndexScanOperationImpl performs index scans using NdbRecord.
+ * Two NdbRecordImpl instances are used: one to define the bounds (low and high)
+ * and one to define the result. The superclass NdbRecordScanOperationImpl
+ * holds the NdbRecordImpl and buffer that define and hold the result.
+ * <p>
+ * This instance declares and holds the bounds while they are being defined.
+ * Bounds are handled by creating two bound buffers: one for the low bound
+ * and a second for the high bound. While the bounds are being created, the
+ * number of columns and the strictness of the bound are recorded.
+ * <p>
+ * Bounds are calculated elsewhere based on the query parameters and delivered, in sequence,
+ * to this instance. Bounds are delivered for the most significant index column first,
+ * followed by the next most significant index column, until all columns that have bounds
+ * have been delivered. There may be more columns for the low bound versus the high bound,
+ * or vice versa. For each bound that is delivered, the method (assignBoundBuffer) determines 
+ * to which bound, low or high, the bound belongs. The column count is incremented
+ * for the appropriate bound buffer. The value is then applied to the bound buffer using
+ * the setXXX method of the NdbRecordImpl that manages the layout of the bound buffer.
+ * <p>
+ * The superclass declares and holds the filter while it is being defined.
+ * <p>
+ * At endDefinition, the filter is used to create the scanOptions which is
+ * passed to create the NdbIndexScanOperation. Then the bounds are set into
+ * the newly created NdbIndexScanOperation.
+ * The resulting NdbIndexScanOperation is iterated (scanned) by the NdbRecordResultDataImpl.
+ */
+public class NdbRecordIndexScanOperationImpl extends NdbRecordScanOperationImpl implements IndexScanOperation {
+
+    /** The ndb index scan operation */
+    private NdbIndexScanOperation ndbIndexScanOperation;
+
+    /** The range for this bound */
+    private int indexBoundRange = 0;
+
+    /** The buffer that contains low bounds for all index columns */
+    private ByteBuffer indexBoundLowBuffer = null;
+
+    /** The number of columns in the low bound */
+    private int indexBoundLowCount = 0;
+
+    /** Is the low bound strict? */
+    private boolean indexBoundLowStrict = false;
+
+    /** The buffer that contains high bounds for all index columns */
+    private ByteBuffer indexBoundHighBuffer = null;
+
+    /** The number of columns in the high bound */
+    private int indexBoundHighCount = 0;
+
+    /** Is the high bound strict? */
+    private boolean indexBoundHighStrict = false;
+
+    /** Is this an equal scan? */
+    private boolean equalScan = true;
+
+    /** The list of index bounds already defined; null for a single range */
+    List<NdbIndexScanOperation.IndexBound> ndbIndexBoundList = null;
+
+    /** Convenience constructor for a single-range scan (multiRange false). */
+    public NdbRecordIndexScanOperationImpl(ClusterTransactionImpl clusterTransaction,
+            Index storeIndex, Table storeTable, int lockMode) {
+        this(clusterTransaction, storeIndex, storeTable, false, lockMode);
+    }
+
+    public NdbRecordIndexScanOperationImpl(ClusterTransactionImpl clusterTransaction,
+                Index storeIndex, Table storeTable, boolean multiRange, int lockMode) {
+        super(clusterTransaction, storeTable, lockMode);
+        this.multiRange = multiRange;
+        if (this.multiRange) {
+            ndbIndexBoundList = new ArrayList<NdbIndexScanOperation.IndexBound>();
+        }
+        // the cached key record describes the layout of both bound buffers
+        ndbRecordKeys = clusterTransaction.getCachedNdbRecordImpl(storeIndex, storeTable);
+        keyBufferSize = ndbRecordKeys.bufferSize;
+        indexBoundLowBuffer = ndbRecordKeys.newBuffer();
+        indexBoundHighBuffer = ndbRecordKeys.newBuffer();
+    }
+
+    public void endDefinition() {
+        // get the scan options which also sets the filter
+        getScanOptions();
+        if (logger.isDetailEnabled()) logger.detail("scan options present " + dumpScanOptions(scanOptions.optionsPresent(), scanOptions.scan_flags()));
+
+        // create the scan operation
+        ndbIndexScanOperation = clusterTransaction.scanIndex(
+                ndbRecordKeys.getNdbRecord(), ndbRecordValues.getNdbRecord(), mask, scanOptions);
+        ndbOperation = ndbIndexScanOperation;
+
+        // set the bounds, either from the single indexBound or from multiple ranges
+        if (ndbIndexBoundList != null) {
+            if (logger.isDetailEnabled()) logger.detail("list size " + ndbIndexBoundList.size());
+            // apply all of the bounds to the operation
+            for (NdbIndexScanOperation.IndexBound ndbIndexBound: ndbIndexBoundList) {
+                int returnCode = ndbIndexScanOperation.setBound(ndbRecordKeys.getNdbRecord(), ndbIndexBound);
+                handleError(returnCode, ndbIndexScanOperation);
+            }
+        } else {
+            // only one range defined
+            // NOTE(review): getNdbIndexBound() returns null when no bounds
+            // were set — confirm setBound accepts a null IndexBound
+            NdbIndexScanOperation.IndexBound ndbIndexBound = getNdbIndexBound();
+            int returnCode = ndbIndexScanOperation.setBound(ndbRecordKeys.getNdbRecord(), ndbIndexBound);
+            handleError(returnCode, ndbIndexScanOperation);
+        }
+    }
+
+    public void setBoundBigInteger(Column storeColumn, BoundType type, BigInteger value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setBigInteger(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setBigInteger(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setBigInteger(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundByte(Column storeColumn, BoundType type, byte value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setByte(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setByte(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setByte(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundBytes(Column storeColumn, BoundType type, byte[] value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setBytes(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setBytes(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setBytes(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundDecimal(Column storeColumn, BoundType type, BigDecimal value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setDecimal(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setDecimal(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setDecimal(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundDouble(Column storeColumn, BoundType type, Double value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setDouble(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setDouble(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setDouble(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundFloat(Column storeColumn, BoundType type, Float value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setFloat(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setFloat(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setFloat(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundShort(Column storeColumn, BoundType type, short value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setShort(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setShort(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setShort(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundInt(Column storeColumn, BoundType type, Integer value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setInt(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setInt(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setInt(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundLong(Column storeColumn, BoundType type, long value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setLong(indexBoundLowBuffer, storeColumn, value);
+            ndbRecordKeys.setLong(indexBoundHighBuffer, storeColumn, value);
+        } else {
+            ndbRecordKeys.setLong(keyBuffer, storeColumn, value);
+        }
+    }
+
+    public void setBoundString(Column storeColumn, BoundType type, String value) {
+        if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+        // calculate the bound data, the buffer, and the strictness
+        ByteBuffer keyBuffer = assignBoundBuffer(type);
+        if (keyBuffer == null) {
+            // BoundEQ put data into both buffers
+            ndbRecordKeys.setString(indexBoundLowBuffer, bufferManager, storeColumn, value);
+            ndbRecordKeys.setString(indexBoundHighBuffer, bufferManager, storeColumn, value);
+        } else {
+            ndbRecordKeys.setString(keyBuffer, bufferManager, storeColumn, value);
+        }
+    }
+
+    public void endBound(int rangeNumber) {
+        if (logger.isDetailEnabled()) logger.detail("range: " + rangeNumber);
+        indexBoundRange = rangeNumber;
+        // NOTE(review): ndbIndexBoundList is created only when multiRange is
+        // true; endBound appears to assume a multi-range scan — confirm callers
+        ndbIndexBoundList.add(getNdbIndexBound());
+    }
+
+    /** Select the bound buffer for a bound type and record its strictness.
+     * Per the NDB API BoundType naming (the bound is LE/GE the column values),
+     * BoundLE/BoundLT select the low bound buffer and BoundGE/BoundGT select
+     * the high bound buffer. BoundEQ returns null, meaning the value applies
+     * to both buffers; any other type marks the scan as not an equal scan.
+     */
+    private ByteBuffer assignBoundBuffer(BoundType type) {
+        switch (type) {
+            case BoundEQ:
+                indexBoundHighCount++;
+                indexBoundLowCount++;
+                return null;
+            case BoundGE:
+                equalScan = false;
+                indexBoundHighCount++;
+                return indexBoundHighBuffer;
+            case BoundGT:
+                equalScan = false;
+                indexBoundHighStrict = true;
+                indexBoundHighCount++;
+                return indexBoundHighBuffer;
+            case BoundLE:
+                equalScan = false;
+                indexBoundLowCount++;
+                return indexBoundLowBuffer;
+            case BoundLT:
+                equalScan = false;
+                indexBoundLowStrict = true;
+                indexBoundLowCount++;
+                return indexBoundLowBuffer;
+            default:
+                throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+        }
+    }
+
+    /** Create an ndb index bound for the current bounds and clear the current bounds
+     * 
+     */
+    private NdbIndexScanOperation.IndexBound getNdbIndexBound() {
+        ByteBuffer reclaimed = null;
+        if (indexBoundLowCount + indexBoundHighCount > 0) {
+            // a bound with no columns is passed to ndb as a null buffer;
+            // otherwise rewind the buffer for reading
+            if (indexBoundLowCount == 0) {
+                indexBoundLowBuffer =  null;
+            } else {
+                indexBoundLowBuffer.limit(keyBufferSize);
+                indexBoundLowBuffer.position(0);
+            }
+            if (indexBoundHighCount == 0) {
+                indexBoundHighBuffer =  null;
+            } else {
+                indexBoundHighBuffer.limit(keyBufferSize);
+                indexBoundHighBuffer.position(0);
+            }
+            if (equalScan) {
+                // for an equal scan both buffers hold identical data; point
+                // the low key at the high buffer and reclaim the low buffer
+                reclaimed = indexBoundLowBuffer;
+                indexBoundLowBuffer = indexBoundHighBuffer;
+            }
+            // set the index bound
+            NdbIndexScanOperation.IndexBound ndbindexBound = NdbIndexScanOperation.IndexBound.create();
+            ndbindexBound.low_key(indexBoundLowBuffer);
+            ndbindexBound.high_key(indexBoundHighBuffer);
+            ndbindexBound.low_key_count(indexBoundLowCount);
+            ndbindexBound.high_key_count(indexBoundHighCount);
+            ndbindexBound.low_inclusive(!indexBoundLowStrict);
+            ndbindexBound.high_inclusive(!indexBoundHighStrict);
+            ndbindexBound.range_no(indexBoundRange);
+            if (logger.isDetailEnabled()) logger.detail(
+                    " indexBoundLowCount: " + indexBoundLowCount + " indexBoundHighCount: " + indexBoundHighCount +
+                    " indexBoundLowStrict: " + indexBoundLowStrict + " indexBoundHighStrict: " + indexBoundHighStrict +
+                    " range: " + indexBoundRange
+                    );
+            // reset the index bound for the next range
+            // if equal bound, initialize and reuse previous buffer
+            if (reclaimed != null) {
+                indexBoundLowBuffer = reclaimed;
+                ndbRecordKeys.initializeBuffer(reclaimed);
+            } else {
+                indexBoundLowBuffer = ndbRecordKeys.newBuffer();
+            }
+            indexBoundHighBuffer = ndbRecordKeys.newBuffer();
+            indexBoundLowCount = 0;
+            indexBoundHighCount = 0;
+            indexBoundLowStrict = false;
+            indexBoundHighStrict = false;
+            indexBoundRange = 0;
+            equalScan = true;
+            return ndbindexBound;
+        } else {
+            // no bounds were defined for this range
+            return null;
+        }
+    }
+
+}

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordInsertOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordInsertOperationImpl.java	2012-03-05 22:28:15 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordInsertOperationImpl.java	2012-04-02 20:43:14 +0000
@@ -23,20 +23,11 @@ public class NdbRecordInsertOperationImp
 
     public NdbRecordInsertOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable) {
         super(clusterTransaction, storeTable);
-        this.ndbRecordValues = clusterTransaction.getCachedNdbRecordImpl(storeTable);
         this.ndbRecordKeys = ndbRecordValues;
-        this.valueBufferSize = ndbRecordValues.getBufferSize();
-        this.numberOfColumns = ndbRecordValues.getNumberOfColumns();
-        this.blobs = new NdbRecordBlobImpl[this.numberOfColumns];
+        this.keyBuffer = valueBuffer;
         resetMask();
     }
 
-    public void beginDefinition() {
-        // allocate a buffer for the operation data
-        valueBuffer = ndbRecordValues.newBuffer();
-        keyBuffer = valueBuffer;
-    }
-
     public void endDefinition() {
         ndbOperation = insert(clusterTransaction);
     }

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordKeyOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordKeyOperationImpl.java	2012-03-05 22:28:15 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordKeyOperationImpl.java	2012-04-02 20:43:14 +0000
@@ -17,101 +17,31 @@
 
 package com.mysql.clusterj.tie;
 
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import com.mysql.clusterj.core.store.Column;
-import com.mysql.clusterj.core.store.ResultData;
 import com.mysql.clusterj.core.store.Table;
 
 public class NdbRecordKeyOperationImpl extends NdbRecordOperationImpl {
 
     public NdbRecordKeyOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable) {
         super(clusterTransaction, storeTable);
-        this.ndbRecordKeys = clusterTransaction.getCachedNdbRecordImpl(storeTable);
-        this.keyBufferSize = ndbRecordKeys.getBufferSize();
-        this.ndbRecordValues = clusterTransaction.getCachedNdbRecordImpl(storeTable);
-        this.valueBufferSize = ndbRecordValues.getBufferSize();
-        this.numberOfColumns = ndbRecordValues.getNumberOfColumns();
-        this.blobs = new NdbRecordBlobImpl[this.numberOfColumns];
-        resetMask();
-    }
-
-    public void beginDefinition() {
-        // allocate a buffer for the key data
-        keyBuffer = ByteBuffer.allocateDirect(keyBufferSize);
-        keyBuffer.order(ByteOrder.nativeOrder());
-        // allocate a buffer for the value result data
-        // TODO: we should not need another buffer
-        valueBuffer = ByteBuffer.allocateDirect(valueBufferSize);
-        valueBuffer.order(ByteOrder.nativeOrder());
-    }
-
-    /** Specify the columns to be used for the operation.
-     */
-    public void getValue(Column storeColumn) {
-        int columnId = storeColumn.getColumnId();
-        columnSet(columnId);
-    }
-
-    /**
-     * Mark this blob column to be read.
-     * @param storeColumn the store column
-     */
-    @Override
-    public void getBlob(Column storeColumn) {
-        // create an NdbRecordBlobImpl for the blob
-        int columnId = storeColumn.getColumnId();
-        columnSet(columnId);
-        NdbRecordBlobImpl blob = new NdbRecordBlobImpl(this, storeColumn);
-        blobs[columnId] = blob;
+        this.ndbRecordKeys = this.ndbRecordValues;
+        this.keyBufferSize = this.valueBufferSize;
+        this.keyBuffer = valueBuffer;
     }
 
     public void endDefinition() {
-        // position the key buffer at the beginning for ndbjtie
-        keyBuffer.position(0);
-        keyBuffer.limit(keyBufferSize);
         // position the value buffer at the beginning for ndbjtie
         valueBuffer.position(0);
         valueBuffer.limit(valueBufferSize);
         // create the key operation
         ndbOperation = clusterTransaction.readTuple(ndbRecordKeys.getNdbRecord(), keyBuffer,
                 ndbRecordValues.getNdbRecord(), valueBuffer, mask, null);
-        // set up a callback when this operation is executed
-        clusterTransaction.postExecuteCallback(new Runnable() {
-            public void run() {
-                for (int columnId = 0; columnId < numberOfColumns; ++columnId) {
-                    NdbRecordBlobImpl blob = blobs[columnId];
-                    if (blob != null) {
-                        blob.setNdbBlob();
-                    }
-                }
-            }
-        });
-    }
-
-    /** Construct a new ResultData using the saved column data and then execute the operation.
-     */
-    @Override
-    public ResultData resultData() {
-        return resultData(true);
-    }
-
-    /** Construct a new ResultData and if requested, execute the operation.
-     */
-    @Override
-    public ResultData resultData(boolean execute) {
-        NdbRecordResultDataImpl result =
-            new NdbRecordResultDataImpl(this);
-        if (execute) {
-            clusterTransaction.executeNoCommit(false, true);
-        }
-        return result;
+        // set the NdbBlob for all active blob columns
+        activateBlobs();
     }
 
     @Override
     public String toString() {
-        return " key " + tableName;
+        return " primary key " + tableName;
     }
 
 }

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java	2012-04-05 05:45:15 +0000
@@ -45,6 +45,8 @@ import com.mysql.ndbjtie.ndbapi.NdbDicti
 
 /**
  * Implementation of store operation that uses NdbRecord.
+ * Operations of the "equal" variety delegate to the key NdbRecordImpl.
+ * Operations of the "set" and "get" varieties delegate to the value NdbRecordImpl.
  */
 public class NdbRecordOperationImpl implements Operation {
 
@@ -68,7 +70,11 @@ public class NdbRecordOperationImpl impl
     /** The NdbRecord for values */
     protected NdbRecordImpl ndbRecordValues = null;
 
-    /** The mask for this operation, which contains a bit set for each column accessed */
+    /** The mask for this operation, which contains a bit set for each column referenced.
+     * For insert, this contains a bit for each column to be inserted.
+     * For update, this contains a bit for each column to be updated.
+     * For read/scan operations, this contains a bit for each column to be read.
+     */
     byte[] mask;
 
     /** The ByteBuffer containing keys */
@@ -89,27 +95,32 @@ public class NdbRecordOperationImpl impl
     /** The size of the value buffer for this operation */
     protected int valueBufferSize;
 
-    /** The size of the null indicator byte array */
-    protected int nullIndicatorSize;
-
     /** The buffer manager for string encode and decode */
     protected BufferManager bufferManager;
 
     /** The table name */
     protected String tableName;
 
+    /** The store table */
+    protected Table storeTable;
+
     /** The store columns. */
     protected Column[] storeColumns;
 
     /** The number of columns */
     int numberOfColumns;
 
-    /** Constructor used for smart value handler.
+    /** Constructor used for smart value handler for new instances,
+     * and the cluster transaction is not yet known. There is only one
+     * NdbRecord and one buffer, so all operations result in using
+     * the same buffer.
      * 
      * @param clusterConnection the cluster connection
+     * @param db the Db
      * @param storeTable the store table
      */
     public NdbRecordOperationImpl(ClusterConnectionImpl clusterConnection, Db db, Table storeTable) {
+        this.storeTable = storeTable;
         this.tableName = storeTable.getName();
         this.ndbRecordValues = clusterConnection.getCachedNdbRecordImpl(storeTable);
         this.ndbRecordKeys = ndbRecordValues;
@@ -124,18 +135,41 @@ public class NdbRecordOperationImpl impl
         resetMask();
     }
 
-    protected void resetMask() {
-        this.mask = new byte[1 + (numberOfColumns/8)];
-    }
-
-    /** Constructor used for insert and delete operations that do not need to read data.
+    /** Constructor used when the transaction is known.
      * 
      * @param clusterTransaction the cluster transaction
      */
     public NdbRecordOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable) {
-        this.tableName = storeTable.getName();
         this.clusterTransaction = clusterTransaction;
         this.bufferManager = clusterTransaction.getBufferManager();
+        this.tableName = storeTable.getName();
+        this.ndbRecordValues = clusterTransaction.getCachedNdbRecordImpl(storeTable);
+        this.valueBufferSize = ndbRecordValues.getBufferSize();
+        this.valueBuffer = ndbRecordValues.newBuffer();
+        this.numberOfColumns = ndbRecordValues.getNumberOfColumns();
+        this.blobs = new NdbRecordBlobImpl[this.numberOfColumns];
+        resetMask();
+    }
+
+    /** Constructor used to copy an existing NdbRecordOperationImpl for use with a SmartValueHandler.
+     * The value buffer is taken over by the new instance and can no longer be used by the existing NdbRecordOperationImpl.
+     * 
+     * @param ndbRecordOperationImpl2 the existing NdbRecordOperationImpl with value buffer
+     */
+    public NdbRecordOperationImpl(NdbRecordOperationImpl ndbRecordOperationImpl2) {
+        this.ndbRecordValues = ndbRecordOperationImpl2.ndbRecordValues;
+        this.valueBufferSize = ndbRecordOperationImpl2.valueBufferSize;
+        this.ndbRecordKeys = ndbRecordValues;
+        this.keyBufferSize = ndbRecordKeys.bufferSize;
+        this.valueBuffer = ndbRecordOperationImpl2.valueBuffer;
+        this.keyBuffer = this.valueBuffer;
+        this.bufferManager = ndbRecordOperationImpl2.bufferManager;
+        this.tableName = ndbRecordOperationImpl2.tableName;
+        this.storeColumns = ndbRecordOperationImpl2.ndbRecordValues.storeColumns;
+        this.numberOfColumns = this.storeColumns.length;
+        this.blobs = new NdbRecordBlobImpl[this.numberOfColumns];
+        this.activeBlobs = ndbRecordOperationImpl2.activeBlobs;
+        resetMask();
     }
 
     public NdbOperationConst insert(ClusterTransactionImpl clusterTransactionImpl) {
@@ -206,6 +240,20 @@ public class NdbRecordOperationImpl impl
         }
     }
 
+    protected void resetMask() {
+        this.mask = new byte[1 + (numberOfColumns/8)];
+    }
+
+    public void allocateValueBuffer() {
+        this.valueBuffer = ndbRecordValues.newBuffer();
+    }
+
+    protected void activateBlobs() {
+        for (NdbRecordBlobImpl blob: activeBlobs) {
+            blob.setNdbBlob();
+        }
+    }
+
     public void equalBigInteger(Column storeColumn, BigInteger value) {
         int columnId = ndbRecordKeys.setBigInteger(keyBuffer, storeColumn, value);
         columnSet(columnId);
@@ -262,8 +310,7 @@ public class NdbRecordOperationImpl impl
     }
 
     public void getBlob(Column storeColumn) {
-        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
-                "NdbRecordOperationImpl.getBlob(Column)"));
+        getBlobHandle(storeColumn);
     }
 
     /**
@@ -285,11 +332,10 @@ public class NdbRecordOperationImpl impl
     }
 
     /** Specify the columns to be used for the operation.
-     * This is implemented by a subclass.
      */
     public void getValue(Column storeColumn) {
-        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
-                "NdbRecordOperationImpl.getValue(Column)"));
+        int columnId = storeColumn.getColumnId();
+        columnSet(columnId);
     }
 
     public void postExecuteCallback(Runnable callback) {
@@ -297,19 +343,20 @@ public class NdbRecordOperationImpl impl
     }
 
     /** Construct a new ResultData using the saved column data and then execute the operation.
-     * This is implemented by a subclass.
      */
     public ResultData resultData() {
-        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
-                "NdbRecordOperationImpl.resultData()"));
+        return resultData(true);
     }
 
     /** Construct a new ResultData and if requested, execute the operation.
-     * This is implemented by a subclass.
      */
     public ResultData resultData(boolean execute) {
-        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
-                "NdbRecordOperationImpl.resultData(boolean)"));
+        NdbRecordResultDataImpl result =
+            new NdbRecordResultDataImpl(this);
+        if (execute) {
+            clusterTransaction.executeNoCommit(false, true);
+        }
+        return result;
     }
 
     public void setBigInteger(Column storeColumn, BigInteger value) {
@@ -594,10 +641,6 @@ public class NdbRecordOperationImpl impl
         return ndbRecordValues.getLong(valueBuffer, columnId);
     }
 
-    public long getLong(Column storeColumn) {
-        return getLong(storeColumn.getColumnId());
-     }
-
     public float getFloat(int columnId) {
         return ndbRecordValues.getFloat(valueBuffer, columnId);
     }
@@ -718,19 +761,21 @@ public class NdbRecordOperationImpl impl
     }
 
     public void beginDefinition() {
-        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
-        "NdbRecordResultDataImpl.beginDefinition()"));
+        // by default, nothing to do
     }
 
     public void endDefinition() {
-        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
-        "NdbRecordResultDataImpl.endDefinition()"));
+        // by default, nothing to do
     }
 
     public String dumpValues() {
         return ndbRecordValues.dumpValues(valueBuffer, mask);
     }
 
+    public String dumpKeys() {
+        return ndbRecordKeys.dumpValues(keyBuffer, null);
+    }
+
     public boolean isModified(int columnId) {
         return ndbRecordValues.isPresent(mask, columnId);
     }
@@ -781,4 +826,16 @@ public class NdbRecordOperationImpl impl
         }
     }
 
+    /** Transform this NdbRecordOperationImpl into one that can be used to back a SmartValueHandler.
+     * For instances that are used in primary key or unique key operations, the same instance is used.
+     * Scans are handled by a subclass that overrides this method.
+     * 
+     * @return this NdbRecordOperationImpl
+     */
+    public NdbRecordOperationImpl transformNdbRecordOperationImpl() {
+        this.keyBuffer = valueBuffer;
+        resetModified();
+        return this;
+    }
+
 }

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordResultDataImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordResultDataImpl.java	2012-03-05 22:28:15 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordResultDataImpl.java	2012-04-02 20:43:14 +0000
@@ -31,7 +31,7 @@ import com.mysql.clusterj.core.util.Logg
 import com.mysql.clusterj.core.util.LoggerFactoryService;
 
 /**
- *
+ * Handle the results of an operation using NdbRecord. 
  */
 class NdbRecordResultDataImpl implements ResultData {
 
@@ -43,28 +43,21 @@ class NdbRecordResultDataImpl implements
     static final Logger logger = LoggerFactoryService.getFactory()
             .getInstance(NdbRecordResultDataImpl.class);
 
-    /** Flags for iterating a scan */
-    protected final int RESULT_READY = 0;
-    protected final int SCAN_FINISHED = 1;
-    protected final int CACHE_EMPTY = 2;
-
     /** The NdbOperation that defines the result */
-    private NdbRecordOperationImpl operation = null;
+    protected NdbRecordOperationImpl operation = null;
 
     /** The flag indicating that there are no more results */
     private boolean nextDone;
 
-    /** Construct the ResultDataImpl based on an NdbRecordOperationImpl, and the 
-     * buffer manager to help with string columns.
+    /** Construct the ResultDataImpl based on an NdbRecordOperationImpl.
      * @param operation the NdbRecordOperationImpl
-     * @param bufferManager the buffer manager
      */
     public NdbRecordResultDataImpl(NdbRecordOperationImpl operation) {
         this.operation = operation;
     }
 
     public boolean next() {
-        // NdbOperation has exactly zero or one result. ScanResultDataImpl handles scans...
+        // NdbOperation has exactly zero or one result. NdbRecordScanResultDataImpl handles scans...
         // if the ndbOperation reports an error there is no result
         int errorCode = operation.errorCode();
         if (errorCode != 0) {
@@ -264,4 +257,12 @@ class NdbRecordResultDataImpl implements
         return null;
     }
 
+    /** Return an operation that can be used by SmartValueHandler.
+     * The operation contains the buffer with the row data from the operation.
+     * @return the operation
+     */
+    public NdbRecordOperationImpl transformOperation() {
+        return operation.transformNdbRecordOperationImpl();
+    }
+
 }

=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java	2012-04-02 20:43:14 +0000
@@ -0,0 +1,223 @@
+/*
+   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import com.mysql.clusterj.core.spi.QueryExecutionContext;
+import com.mysql.clusterj.core.store.ResultData;
+import com.mysql.clusterj.core.store.ScanFilter;
+import com.mysql.clusterj.core.store.ScanOperation;
+import com.mysql.clusterj.core.store.Table;
+
+import com.mysql.ndbjtie.ndbapi.NdbInterpretedCode;
+import com.mysql.ndbjtie.ndbapi.NdbOperationConst;
+import com.mysql.ndbjtie.ndbapi.NdbScanFilter;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanFlag;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanOptions;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanOptionsConst.Type;
+
+/** NdbRecordScanOperationImpl performs table and index scans using NdbRecord.
+ * The scans are set up via subclasses. After executing, the NdbRecordScanOperationImpl instance
+ * is owned and iterated (scanned) by NdbRecordScanResultDataImpl.
+ */
+public abstract class NdbRecordScanOperationImpl extends NdbRecordOperationImpl implements ScanOperation {
+
+    /** The ndb scan options */
+    ScanOptions scanOptions = null;
+
+    /** The ndb scan filter */
+    NdbScanFilter ndbScanFilter = null;
+
+    /** The ndb interpreted code used for filters */
+    NdbInterpretedCode ndbInterpretedCode = null;
+
+    /** Is this scan multi-range? */
+    protected boolean multiRange = false;
+
+    /** The lock mode for this operation */
+    int lockMode;
+
+    public NdbRecordScanOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable,
+            int lockMode) {
+        super(clusterTransaction, storeTable);
+        this.ndbRecordKeys = clusterTransaction.getCachedNdbRecordImpl(storeTable);
+        this.keyBufferSize = ndbRecordKeys.getBufferSize();
+        this.ndbRecordValues = clusterTransaction.getCachedNdbRecordImpl(storeTable);
+        this.valueBufferSize = ndbRecordValues.getBufferSize();
+        this.numberOfColumns = ndbRecordValues.getNumberOfColumns();
+        this.blobs = new NdbRecordBlobImpl[this.numberOfColumns];
+        this.lockMode = lockMode;
+        resetMask();
+    }
+
+    /** Construct a new ResultData and if requested, execute the operation.
+     */
+    @Override
+    public ResultData resultData(boolean execute) {
+        NdbRecordResultDataImpl result =
+            new NdbRecordScanResultDataImpl(this);
+        if (execute) {
+            clusterTransaction.executeNoCommit(false, true);
+        }
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return " scan " + tableName;
+    }
+
+    /** Deallocate resources used by this scan after the scan is complete.
+     * 
+     */
+    public void close() {
+        if (ndbInterpretedCode != null) {
+            NdbInterpretedCode.delete(ndbInterpretedCode);
+        }
+        if (ndbScanFilter != null) {
+            NdbScanFilter.delete(ndbScanFilter);
+        }
+        if (scanOptions != null) {
+            ScanOptions.delete(scanOptions);
+        }
+        ((NdbScanOperation)ndbOperation).close(true, true);
+    }
+
+    public void deleteCurrentTuple() {
+        int returnCode = ((NdbScanOperation)ndbOperation).deleteCurrentTuple();
+        handleError(returnCode, ndbOperation);
+    }
+
+    /** Create scan options for this scan. 
+     * Scan options are used to set a filter into the NdbScanOperation,
+     * set the key info flag if using a lock mode that requires lock takeover, and set the multi range flag.
+     */
+    protected void getScanOptions() {
+        long options = 0L;
+        int flags = 0;
+        if (multiRange | (ndbScanFilter != null) | 
+                (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead)) {
+
+            scanOptions = ScanOptions.create();
+            if (multiRange) {
+                flags |= ScanFlag.SF_MultiRange;
+                options |= (long)Type.SO_SCANFLAGS;
+                scanOptions.scan_flags(flags);
+            }
+            if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
+                flags |= ScanFlag.SF_KeyInfo;
+                options |= (long)Type.SO_SCANFLAGS;
+                scanOptions.scan_flags(flags);
+            }
+            if (ndbScanFilter != null) {
+                options |= (long)Type.SO_INTERPRETED;
+                scanOptions.interpretedCode(ndbScanFilter.getInterpretedCode());
+            }
+            
+            scanOptions.optionsPresent(options);
+        }
+        if (logger.isDebugEnabled()) logger.debug("ScanOptions: " + dumpScanOptions(options, flags));
+    }
+
+    protected String dumpScanOptions(long optionsPresent, int flags) {
+        StringBuilder builder = new StringBuilder();
+        if (0L != (optionsPresent & (long)Type.SO_BATCH)) builder.append("SO_BATCH ");
+        if (0L != (optionsPresent & (long)Type.SO_GETVALUE)) builder.append("SO_GETVALUE ");
+        if (0L != (optionsPresent & (long)Type.SO_PARALLEL)) builder.append("SO_PARALLEL ");
+        if (0L != (optionsPresent & (long)Type.SO_CUSTOMDATA)) builder.append("SO_CUSTOMDATA ");
+        if (0L != (optionsPresent & (long)Type.SO_INTERPRETED)) builder.append("SO_INTERPRETED ");
+        if (0L != (optionsPresent & (long)Type.SO_PARTITION_ID)) builder.append("SO_PARTITION_ID ");
+        if (0L != (optionsPresent & (long)Type.SO_SCANFLAGS)) {
+            builder.append("SO_SCANFLAGS(");
+            if (0 != (flags & ScanFlag.SF_KeyInfo)) builder.append("SF_KeyInfo ");
+            if (0 != (flags & ScanFlag.SF_Descending)) builder.append("SF_Descending ");
+            if (0 != (flags & ScanFlag.SF_DiskScan)) builder.append("SF_DiskScan ");
+            if (0 != (flags & ScanFlag.SF_MultiRange)) builder.append("SF_MultiRange ");
+            if (0 != (flags & ScanFlag.SF_OrderBy)) builder.append("SF_OrderBy ");
+            if (0 != (flags & ScanFlag.SF_ReadRangeNo)) builder.append("SF_ReadRangeNo ");
+            if (0 != (flags & ScanFlag.SF_TupScan)) builder.append("SF_TupScan ");
+            builder.append(")");
+        }
+        return builder.toString();
+    }
+
+    /** Create a scan filter for this scan.
+     * @param context the query execution context
+     * @return the ScanFilter to build the filter for the scan
+     */
+    public ScanFilter getScanFilter(QueryExecutionContext context) {
+        
+        ndbInterpretedCode = NdbInterpretedCode.create(ndbRecordValues.getNdbTable(), null, 0);
+        ndbScanFilter = NdbScanFilter.create(ndbInterpretedCode);
+        handleError(ndbScanFilter, ndbOperation);
+        ScanFilter scanFilter = new ScanFilterImpl(ndbScanFilter);
+        context.addFilter(scanFilter);
+        return scanFilter;
+    }
+
+    /** Get the next result from the scan.
+     * Only used for deletePersistentAll to scan the table and delete all rows.
+     */
+    public int nextResult(boolean fetch) {
+        int result = ((NdbScanOperation)ndbOperation).nextResult(fetch, false);
+        clusterTransaction.handleError(result);
+        return result;
+    }
+
+    /** Get the next result from the scan. Copy the data into a newly allocated result buffer.
+     * 
+     */
+    public int nextResultCopyOut(boolean fetch, boolean force) {
+        allocateValueBuffer();
+        int result = ((NdbScanOperation)ndbOperation).nextResultCopyOut(valueBuffer, fetch, force);
+        return result;
+    }
+
+    /** Transfer the lock on the current tuple to the original transaction.
+     * This allows the original transaction to keep the results locked until
+     * the original transaction completes.
+     * Only transfer the lock if the lock mode is not committed read
+     * (there is no lock held for committed read).
+     */
+    public void lockCurrentTuple() {
+        if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
+            NdbOperationConst op = ((NdbScanOperation)ndbOperation).lockCurrentTuple(
+                    clusterTransaction.ndbTransaction, ndbRecordValues.getNdbRecord(),
+                    null, null, null, 0);
+            if (op == null) {
+                Utility.throwError(op, ndbOperation.getNdbError());
+            }
+        }
+    }
+
+    /** Transform this NdbRecordOperationImpl into one that can be used to back a SmartValueHandler.
+     * For instances that are used in scans, create a new instance and allocate a new buffer
+     * to continue the scan.
+     * 
+     * @return the NdbRecordOperationImpl
+     */
+    @Override
+    public NdbRecordOperationImpl transformNdbRecordOperationImpl() {
+        NdbRecordOperationImpl result = new NdbRecordOperationImpl(this);
+        // we gave away our buffers; get new ones for the next result
+        this.valueBuffer = ndbRecordValues.newBuffer();
+        this.keyBuffer = valueBuffer;
+        return result;
+    }
+
+}

=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanResultDataImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanResultDataImpl.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanResultDataImpl.java	2012-04-02 20:43:14 +0000
@@ -0,0 +1,87 @@
+/*
+   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import com.mysql.clusterj.core.util.I18NHelper;
+import com.mysql.clusterj.core.util.Logger;
+import com.mysql.clusterj.core.util.LoggerFactoryService;
+
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
+
+/**
+ *
+ */
+class NdbRecordScanResultDataImpl extends NdbRecordResultDataImpl {
+
+    /** My message translator */
+    static final I18NHelper local = I18NHelper
+            .getInstance(NdbRecordScanResultDataImpl.class);
+
+    /** My logger */
+    static final Logger logger = LoggerFactoryService.getFactory()
+            .getInstance(NdbRecordScanResultDataImpl.class);
+
+    /** Flags for iterating a scan */
+    protected final int RESULT_READY = 0;
+    protected final int SCAN_FINISHED = 1;
+    protected final int CACHE_EMPTY = 2;
+
+    /** The NdbRecordScanOperationImpl that defines the result */
+    private NdbRecordScanOperationImpl scanOperation = null;
+
+    /** The NdbScanOperation */
+    private NdbScanOperation ndbScanOperation = null;
+
+    /** Construct the ResultDataImpl based on an NdbRecordOperationImpl.
+     * When used with the compatibility operations, delegate to the NdbRecordOperation
+     * to copy data.
+     * @param scanOperation the NdbRecordScanOperationImpl
+     */
+    public NdbRecordScanResultDataImpl(NdbRecordScanOperationImpl scanOperation) {
+        super(scanOperation);
+        this.scanOperation = scanOperation;
+        this.ndbScanOperation = (NdbScanOperation)scanOperation.ndbOperation;
+    }
+
+    @Override
+    public boolean next() {
+        // NdbScanOperation may have many results.
+        boolean done = false;
+        boolean fetch = false;
+        boolean force = true; // always true for scans
+        while (!done) {
+            int result = scanOperation.nextResultCopyOut(fetch, force);
+            switch (result) {
+                case RESULT_READY:
+                    // if scanning with locks, grab the lock for the current transaction
+                    scanOperation.lockCurrentTuple();
+                    return true;
+                case SCAN_FINISHED:
+                    ndbScanOperation.close(true, true);
+                    return false;
+                case CACHE_EMPTY:
+                    fetch = true;
+                    break;
+                default:
+                    Utility.throwError(result, ndbScanOperation.getNdbError());
+            }
+        }
+        return true; // this statement is needed to make the compiler happy but it's never executed
+    }
+
+}

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerFactoryImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerFactoryImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerFactoryImpl.java	2012-04-02 20:43:14 +0000
@@ -21,6 +21,7 @@ import com.mysql.clusterj.core.metadata.
 import com.mysql.clusterj.core.spi.ValueHandler;
 import com.mysql.clusterj.core.spi.ValueHandlerFactory;
 import com.mysql.clusterj.core.store.Db;
+import com.mysql.clusterj.core.store.ResultData;
 
 public class NdbRecordSmartValueHandlerFactoryImpl implements ValueHandlerFactory {
 
@@ -35,4 +36,11 @@ public class NdbRecordSmartValueHandlerF
         return result;
     }
 
+    public <T> ValueHandler getValueHandler(
+            DomainTypeHandlerImpl<T> domainTypeHandler, Db db, ResultData resultData) {
+        NdbRecordSmartValueHandlerImpl result;
+        result = new NdbRecordSmartValueHandlerImpl(domainTypeHandler, db, resultData);
+        return result;
+    }
+
 }

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerImpl.java	2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerImpl.java	2012-04-08 20:50:07 +0000
@@ -38,12 +38,11 @@ import com.mysql.clusterj.core.metadata.
 import com.mysql.clusterj.core.spi.DomainFieldHandler;
 import com.mysql.clusterj.core.spi.DomainTypeHandler;
 import com.mysql.clusterj.core.spi.SmartValueHandler;
-import com.mysql.clusterj.core.spi.ValueHandler;
 
 import com.mysql.clusterj.core.store.ClusterTransaction;
 import com.mysql.clusterj.core.store.Db;
 import com.mysql.clusterj.core.store.Operation;
-import com.mysql.clusterj.core.store.Table;
+import com.mysql.clusterj.core.store.ResultData;
 
 import com.mysql.clusterj.core.util.I18NHelper;
 import com.mysql.clusterj.core.util.Logger;
@@ -94,14 +93,10 @@ public class NdbRecordSmartValueHandlerI
 
     private Object proxy;
 
-    public NdbRecordSmartValueHandlerImpl(DomainTypeHandlerImpl<?> domainTypeHandler, Db db) {
+    public NdbRecordSmartValueHandlerImpl(DomainTypeHandlerImpl<?> domainTypeHandler) {
         this.domainTypeHandler = domainTypeHandler;
         this.domainFieldHandlers = domainTypeHandler.getFieldHandlers();
         fieldNumberToColumnNumberMap = domainTypeHandler.getFieldNumberToColumnNumberMap();
-
-        Table storeTable = domainTypeHandler.getStoreTable();
-        this.operation = ((DbImpl)db).newNdbRecordOperationImpl(storeTable);
-
         numberOfTransientFields = domainTypeHandler.getNumberOfTransientFields();
         transientModified = new boolean[numberOfTransientFields];
         if (numberOfTransientFields != 0) {
@@ -109,6 +104,16 @@ public class NdbRecordSmartValueHandlerI
         }
     }
 
+    public NdbRecordSmartValueHandlerImpl(DomainTypeHandlerImpl<?> domainTypeHandler, Db db) {
+        this(domainTypeHandler);
+        this.operation = ((DbImpl)db).newNdbRecordOperationImpl(domainTypeHandler.getStoreTable());
+    }
+
+    public NdbRecordSmartValueHandlerImpl(DomainTypeHandlerImpl<?> domainTypeHandler, Db db, ResultData resultData) {
+        this(domainTypeHandler);
+        this.operation = ((NdbRecordResultDataImpl)resultData).transformOperation();
+    }
+
     public Operation insert(ClusterTransaction clusterTransaction) {
         if (logger.isDetailEnabled()) logger.detail("smart insert for type: " + domainTypeHandler.getName()
                 + "\n" + operation.dumpValues());
@@ -199,8 +204,8 @@ public class NdbRecordSmartValueHandlerI
     }
 
     public boolean[] getBooleans(int fieldNumber) {
-        // TODO Auto-generated method stub
-        return null;
+        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
+                "NdbRecordSmartValueHandler.getBooleans(int)"));
     }
 
     public byte getByte(int fieldNumber) {
@@ -445,8 +450,8 @@ public class NdbRecordSmartValueHandlerI
     }
 
     public void setBooleans(int fieldNumber, boolean[] b) {
-        // TODO Auto-generated method stub
-        
+        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
+                "NdbRecordSmartValueHandler.setBooleans(int, boolean[])"));
     }
 
     public void setByte(int fieldNumber, byte value) {
@@ -686,8 +691,10 @@ public class NdbRecordSmartValueHandlerI
         int columnId = fieldNumberToColumnNumberMap[fieldNumber];
         if (columnId < 0) {
             transientValues[-1 - columnId] = value;
+            transientModified[-1 - columnId] = true;
+        } else {
+            domainFieldHandlers[fieldNumber].objectSetValue(value, this);
         }
-        domainFieldHandlers[fieldNumber].objectSetValue(value, this);
     }
 
     public Object invoke(Object proxy, Method method, Object[] args)

=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordTableScanOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordTableScanOperationImpl.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordTableScanOperationImpl.java	2012-04-02 20:43:14 +0000
@@ -0,0 +1,40 @@
+/*
+   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import com.mysql.clusterj.core.store.ScanOperation;
+import com.mysql.clusterj.core.store.Table;
+
+/** NdbRecordTableScanOperationImpl performs table scans using NdbRecord.
+ * Most methods are implemented in the superclass.
+ */
+public class NdbRecordTableScanOperationImpl extends NdbRecordScanOperationImpl implements ScanOperation {
+
+    public NdbRecordTableScanOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable,
+            int lockMode) {
+        super(clusterTransaction, storeTable, lockMode);
+    }
+
+    public void endDefinition() {
+        // get the scan options which also sets the filter
+        getScanOptions();
+        // create the ndb scan operation
+        ndbOperation = clusterTransaction.scanTable(ndbRecordValues.getNdbRecord(), mask, scanOptions);
+    }
+
+}

=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordUniqueKeyOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordUniqueKeyOperationImpl.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordUniqueKeyOperationImpl.java	2012-04-02 20:43:14 +0000
@@ -0,0 +1,53 @@
+/*
+   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import com.mysql.clusterj.core.store.Index;
+import com.mysql.clusterj.core.store.IndexOperation;
+import com.mysql.clusterj.core.store.Table;
+
+public class NdbRecordUniqueKeyOperationImpl extends NdbRecordOperationImpl implements IndexOperation {
+
+    public NdbRecordUniqueKeyOperationImpl(ClusterTransactionImpl clusterTransaction, Index storeIndex, Table storeTable) {
+        super(clusterTransaction, storeTable);
+        this.ndbRecordKeys = clusterTransaction.getCachedNdbRecordImpl(storeIndex, storeTable);
+        this.keyBufferSize = ndbRecordKeys.getBufferSize();
+        // allocate a buffer for the key data
+        keyBuffer = ndbRecordKeys.newBuffer();
+    }
+
+    public void endDefinition() {
+        // position the key buffer at the beginning for ndbjtie
+        keyBuffer.limit(keyBufferSize);
+        keyBuffer.position(0);
+        // position the value buffer at the beginning for ndbjtie
+        valueBuffer.limit(valueBufferSize);
+        valueBuffer.position(0);
+        // create the key operation
+        ndbOperation = clusterTransaction.readTuple(ndbRecordKeys.getNdbRecord(), keyBuffer,
+                ndbRecordValues.getNdbRecord(), valueBuffer, mask, null);
+        // set the NdbBlob for all active blob columns
+        activateBlobs();
+    }
+
+    @Override
+    public String toString() {
+        return " unique key " + tableName;
+    }
+
+}

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ResultDataImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ResultDataImpl.java	2011-02-02 09:52:33 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ResultDataImpl.java	2012-04-05 05:45:15 +0000
@@ -128,12 +128,6 @@ class ResultDataImpl implements ResultDa
         }
     }
 
-    private NdbRecAttr getValue(NdbOperation ndbOperation2, int columnId,
-            ByteBuffer byteBuffer2) {
-        // TODO: to help profiling
-        return ndbOperation2.getValue(columnId, byteBuffer2);
-    }
-
     public boolean next() {
         // NdbOperation has exactly zero or one result. ScanResultDataImpl handles scans...
         NdbErrorConst error = ndbOperation.getNdbError();
@@ -215,7 +209,7 @@ class ResultDataImpl implements ResultDa
     public long getLong(Column storeColumn) {
         int index = storeColumn.getColumnId();
         NdbRecAttr ndbRecAttr = ndbRecAttrs[index];
-        return Utility.getLong(storeColumn, ndbRecAttr);
+        return Utility.getLong(storeColumn, ndbRecAttr.int64_value());
      }
 
     public float getFloat(int column) {
@@ -377,7 +371,7 @@ class ResultDataImpl implements ResultDa
     public Long getObjectLong(Column storeColumn) {
         int index = storeColumn.getColumnId();
         NdbRecAttr ndbRecAttr = ndbRecAttrs[index];
-        return (ndbRecAttr.isNULL() == 1)?null:Utility.getLong(storeColumn, ndbRecAttr);
+        return (ndbRecAttr.isNULL() == 1)?null:Utility.getLong(storeColumn, ndbRecAttr.int64_value());
     }
 
     public Float getObjectFloat(int column) {

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ScanResultDataImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ScanResultDataImpl.java	2011-02-02 09:52:33 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ScanResultDataImpl.java	2012-04-02 20:43:14 +0000
@@ -27,6 +27,7 @@ import com.mysql.clusterj.core.util.Logg
 import com.mysql.clusterj.tie.DbImpl.BufferManager;
 
 import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
+import com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode;
 
 /**
  *
@@ -67,6 +68,9 @@ class ScanResultDataImpl extends ResultD
             int result = ndbScanOperation.nextResult(fetch, force);
             switch (result) {
                 case RESULT_READY:
+                    if (ndbScanOperation.getLockMode() != LockMode.LM_CommittedRead) { 
+                        ndbScanOperation.lockCurrentTuple();
+                    }
                     return true;
                 case SCAN_FINISHED:
                     ndbScanOperation.close(true, true);

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java	2012-03-29 00:25:53 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java	2012-04-05 06:37:40 +0000
@@ -355,11 +355,23 @@ public class Utility {
                 case Timestamp:
                     return (value >> 32) * 1000L;
                 case Date:
-                    // the unsigned value is stored in the top 3 bytes
-                    return unpackDate((int)(value >>> 40));
+                    // the three high order bytes are the little endian representation
+                    // the original is zzyyxx0000000000 and the result is 0000000000xxyyzz
+                    long packedDate = 0L;
+                    packedDate |= (value & ffoooooooooooooo) >>> 56;
+                    packedDate |= (value & ooffoooooooooooo) >>> 40;
+                    // the xx byte is signed, so shift left 16 and arithmetic shift right 40
+                    packedDate |= ((value & ooooffoooooooooo) << 16) >> 40;
+                    return unpackDate((int)packedDate);
                 case Time:
-                    // the signed value is stored in the top 3 bytes
-                    return unpackTime((int)(value >> 40));
+                    // the three high order bytes are the little endian representation
+                    // the original is zzyyxx0000000000 and the result is 0000000000xxyyzz
+                    long packedTime = 0L;
+                    packedTime |= (value & ffoooooooooooooo) >>> 56;
+                    packedTime |= (value & ooffoooooooooooo) >>> 40;
+                    // the xx byte is signed, so shift left 16 and arithmetic shift right 40
+                    packedTime |= ((value & ooooffoooooooooo) << 16) >> 40;
+                    return unpackTime((int)packedTime);
                 default:
                     throw new ClusterJUserException(
                             local.message("ERR_Unsupported_Mapping", storeColumn.getType(), "long"));
@@ -685,9 +697,9 @@ public class Utility {
                 case Timestamp:
                     return value * 1000L;
                 case Date:
-                    return unpackDate((int)value);
+                    return unpackDate((int)(value));
                 case Time:
-                    return unpackTime((int)value);
+                    return unpackTime((int)(value));
                 default:
                     throw new ClusterJUserException(
                             local.message("ERR_Unsupported_Mapping", storeColumn.getType(), "long"));
@@ -2020,10 +2032,12 @@ public class Utility {
      * @param ndbRecAttr the NdbRecAttr
      * @return the long
      */
-    public static long getLong(Column storeColumn, NdbRecAttr ndbRecAttr) {
-        return endianManager.getLong(storeColumn, ndbRecAttr);
-    }
 
+    /** Convert a long value from storage.
+     * The value stored in the database might be a time, timestamp, date, bit array,
+     * or simply a long value. The returned value can be interpreted as a
+     * time, timestamp, date, bit array, or long value.
+     */
     public static long getLong(Column storeColumn, long value) {
         return endianManager.getLong(storeColumn, value);
     }

=== modified file 'storage/ndb/compile-cluster'
--- a/storage/ndb/compile-cluster	2012-03-15 08:23:12 +0000
+++ b/storage/ndb/compile-cluster	2012-04-10 11:40:39 +0000
@@ -85,7 +85,7 @@ my $cmake_version_id;
   my @args;
   
   # Hardcoded options controlling how to build MySQL Server
-  # push(@args, "-DWITH_SSL=bundled");
+  push(@args, "-DWITH_SSL=yes");
  
   if ($opt_debug)
   {

=== modified file 'storage/ndb/src/common/util/version.cpp'
--- a/storage/ndb/src/common/util/version.cpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/common/util/version.cpp	2012-04-11 09:56:27 +0000
@@ -96,6 +96,11 @@ struct NdbUpGradeCompatible {
 };
 
 struct NdbUpGradeCompatible ndbCompatibleTable_full[] = {
+  { MAKE_VERSION(7,3,NDB_VERSION_BUILD), MAKE_VERSION(7,3,0), UG_Range },
+  { MAKE_VERSION(7,3,NDB_VERSION_BUILD), MAKE_VERSION(7,2,0), UG_Range },
+  { MAKE_VERSION(7,3,NDB_VERSION_BUILD), MAKE_VERSION(7,1,0), UG_Range },
+  { MAKE_VERSION(7,3,NDB_VERSION_BUILD), MAKE_VERSION(7,0,0), UG_Range },
+
   { MAKE_VERSION(7,2,NDB_VERSION_BUILD), MAKE_VERSION(7,2,0), UG_Range },
   { MAKE_VERSION(7,2,NDB_VERSION_BUILD), MAKE_VERSION(7,1,0), UG_Range },
   { MAKE_VERSION(7,2,NDB_VERSION_BUILD), MAKE_VERSION(7,0,0), UG_Range },

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (magnus.blaudd:3880 to 3882) magnus.blaudd11 Apr