List:Commits« Previous MessageNext Message »
From:Craig L Russell Date:January 23 2012 12:47am
Subject:bzr push into mysql-5.1-telco-7.1 branch (Craig.Russell:4428 to 4429)
View as plain text  
 4429 Craig L Russell	2012-01-22
      Add NdbRecord support for clusterj.
      This implementation supports only insert.
      The original implementation using NdbOperation is still available by setting the variable
      com.mysql.clusterj.UseNdbRecord to a value other than "true" via -D option or environment.

    added:
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java
    modified:
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java
      storage/ndb/clusterj/clusterj-tie/Makefile.am
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DictionaryImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java
      storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties
 4428 Craig L Russell	2012-01-20
      clean up after test

    modified:
      mysql-test/suite/ndb/t/clusterj.test
=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java	2011-11-22 22:01:23 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java	2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
 /*
-   Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
+   Copyright (c) 2009, 2012, Oracle and/or its affiliates. All rights reserved.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -392,8 +392,10 @@ public class SessionImpl implements Sess
             storeTable = domainTypeHandler.getStoreTable();
             op = clusterTransaction.getInsertOperation(storeTable);
             // set all values in the operation, keys first
+            op.beginDefinition();
             domainTypeHandler.operationSetKeys(valueHandler, op);
             domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op);
+            op.endDefinition();
             // reset modified bits in instance
             domainTypeHandler.objectResetModified(valueHandler);
         } catch (ClusterJUserException cjuex) {

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java	2011-02-03 14:37:50 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java	2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
 /*
-   Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
+   Copyright (c) 2009, 2012, Oracle and/or its affiliates. All rights reserved.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -85,4 +85,8 @@ public interface Operation {
 
     public void setString(Column storeColumn, String string);
 
+    public void beginDefinition();
+
+    public void endDefinition();
+
 }

=== modified file 'storage/ndb/clusterj/clusterj-tie/Makefile.am'
--- a/storage/ndb/clusterj/clusterj-tie/Makefile.am	2011-12-14 19:26:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/Makefile.am	2012-01-23 00:44:39 +0000
@@ -1,4 +1,4 @@
-#   Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
+#   Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
 #
 #   This program is free software; you can redistribute it and/or modify
 #   it under the terms of the GNU General Public License as published by
@@ -38,6 +38,9 @@ clusterj_tie_java= \
   $(clusterj_tie_src)/com/mysql/clusterj/tie/IndexImpl.java \
   $(clusterj_tie_src)/com/mysql/clusterj/tie/IndexOperationImpl.java \
   $(clusterj_tie_src)/com/mysql/clusterj/tie/IndexScanOperationImpl.java \
+  $(clusterj_tie_src)/com/mysql/clusterj/tie/NdbRecordBlobImpl.java \
+  $(clusterj_tie_src)/com/mysql/clusterj/tie/NdbRecordImpl.java \
+  $(clusterj_tie_src)/com/mysql/clusterj/tie/NdbRecordOperationImpl.java \
   $(clusterj_tie_src)/com/mysql/clusterj/tie/OperationImpl.java \
   $(clusterj_tie_src)/com/mysql/clusterj/tie/ResultDataImpl.java \
   $(clusterj_tie_src)/com/mysql/clusterj/tie/ScanFilterImpl.java \

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java	2011-06-30 16:04:23 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java	2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
 /*
- *  Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
+ *  Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
  *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
@@ -42,7 +42,11 @@ class BlobImpl implements Blob {
     static final Logger logger = LoggerFactoryService.getFactory()
             .getInstance(BlobImpl.class);
 
-    private NdbBlob ndbBlob;
+    protected NdbBlob ndbBlob;
+
+    public BlobImpl() {
+        // this is only for NdbRecordBlobImpl constructor when there is no ndbBlob available yet
+    }
 
     public BlobImpl(NdbBlob blob) {
         this.ndbBlob = blob;

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java	2011-12-18 11:47:59 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java	2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
 /*
- *  Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
+ *  Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
  *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
@@ -20,13 +20,18 @@ package com.mysql.clusterj.tie;
 import java.util.IdentityHashMap;
 import java.util.Map;
 
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 import com.mysql.ndbjtie.ndbapi.Ndb;
 import com.mysql.ndbjtie.ndbapi.Ndb_cluster_connection;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
 
 import com.mysql.clusterj.ClusterJDatastoreException;
 import com.mysql.clusterj.ClusterJFatalInternalException;
 
 import com.mysql.clusterj.core.store.Db;
+import com.mysql.clusterj.core.store.Table;
 
 import com.mysql.clusterj.core.util.I18NHelper;
 import com.mysql.clusterj.core.util.Logger;
@@ -57,6 +62,12 @@ public class ClusterConnectionImpl
     /** All dbs given out by this cluster connection */
     private Map<DbImpl, Object> dbs = new IdentityHashMap<DbImpl, Object>();
 
+    /** The map of table name to NdbRecordImpl */
+    private ConcurrentMap<String, NdbRecordImpl> ndbRecordImplMap = new ConcurrentHashMap<String, NdbRecordImpl>();
+
+    /** The dictionary used to create NdbRecords */
+    Dictionary dictionaryForNdbRecord = null;
+
     /** Connect to the MySQL Cluster
      * 
      * @param connectString the connect string
@@ -83,6 +94,14 @@ public class ClusterConnectionImpl
         synchronized(this) {
             ndb = Ndb.create(clusterConnection, database, "def");
             handleError(ndb, clusterConnection, connectString, nodeId);
+            if (dictionaryForNdbRecord == null) {
+                // create a dictionary for NdbRecord
+                Ndb ndbForNdbRecord = Ndb.create(clusterConnection, database, "def");
+                handleError(ndbForNdbRecord, clusterConnection, connectString, nodeId);
+                DbImpl dbForNdbRecord = new DbImpl(this, ndbForNdbRecord, maxTransactions);
+                dbs.put(dbForNdbRecord, null);
+                dictionaryForNdbRecord = dbForNdbRecord.getNdbDictionary();
+            }
         }
         DbImpl result = new DbImpl(this, ndb, maxTransactions);
         dbs.put(result, null);
@@ -143,6 +162,16 @@ public class ClusterConnectionImpl
     public void close() {
         if (clusterConnection != null) {
             logger.info(local.message("INFO_Close_Cluster_Connection", connectString, nodeId));
+            for (NdbRecordImpl ndbRecord: ndbRecordImplMap.values()) {
+                ndbRecord.releaseNdbRecord();
+            }
+            ndbRecordImplMap.clear();
+            if (dbs.size() != 0) {
+                Map<DbImpl, Object> dbsToClose = new IdentityHashMap<DbImpl, Object>(dbs);
+                for (Db db: dbsToClose.keySet()) {
+                    db.close();
+                }
+            }
             Ndb_cluster_connection.delete(clusterConnection);
             clusterConnection = null;
         }
@@ -153,7 +182,43 @@ public class ClusterConnectionImpl
     }
 
     public int dbCount() {
-        return dbs.size();
+        // one of the dbs is for the NdbRecord dictionary if it is not null
+        int dbForNdbRecord = (dictionaryForNdbRecord == null)?0:1;
+        return dbs.size() - dbForNdbRecord;
+    }
+
+    /** 
+     * Get the cached NdbRecord implementation for this cluster connection.
+     * There are three possibilities:
+     * <ul><li>Case 1: return the already-cached NdbRecord
+     * </li><li>Case 2: return a new instance created by this method
+     * </li><li>Case 3: return the winner of a race with another thread
+     * </li></ul>
+     * @param storeTable the store table
+     * @param ndbDictionary the ndb dictionary
+     * @return the NdbRecordImpl
+     */
+    protected NdbRecordImpl getCachedNdbRecordImpl(Table storeTable) {
+        String tableName = storeTable.getName();
+        // find the NdbRecordImpl in the global cache
+        NdbRecordImpl result = ndbRecordImplMap.get(tableName);
+        if (result != null) {
+            // case 1
+            return result;
+        } else {
+            NdbRecordImpl newNdbRecordImpl = new NdbRecordImpl(storeTable, dictionaryForNdbRecord);
+            NdbRecordImpl winner = ndbRecordImplMap.putIfAbsent(tableName, newNdbRecordImpl);
+            if (winner == null) {
+                // case 2: the previous value was null, so return the new (winning) value
+                if (logger.isDebugEnabled())logger.debug("NdbRecordImpl created for " + tableName);
+                return newNdbRecordImpl;
+            } else {
+                // case 3: another thread beat us, so return the winner and garbage collect ours
+                if (logger.isDebugEnabled())logger.debug("NdbRecordImpl lost race for " + tableName);
+                newNdbRecordImpl.releaseNdbRecord();
+                return winner;
+            }
+        }
     }
 
 }

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java	2011-12-18 11:47:59 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java	2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
 /*
- *  Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
+ *  Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
  *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
@@ -22,6 +22,10 @@ import com.mysql.clusterj.core.store.Clu
 import com.mysql.clusterj.core.util.I18NHelper;
 import com.mysql.clusterj.core.util.Logger;
 import com.mysql.clusterj.core.util.LoggerFactoryService;
+import com.mysql.ndbjtie.ndbapi.Ndb;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary;
+import com.mysql.ndbjtie.ndbapi.NdbOperation;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
 
 /**
  *
@@ -43,6 +47,18 @@ public class ClusterConnectionServiceImp
     /** Load the ndbclient system library only once */
     static boolean ndbclientLoaded = false;
 
+    /** Size of OperationOptions, needed for some ndb apis */
+    static int SIZEOF_OPERATION_OPTIONS;
+
+    /** Size of PartitionSpec, needed for some ndb apis */
+    static int SIZEOF_PARTITION_SPEC;
+
+    /** Size of RecordSpecification, needed for some ndb apis */
+    static int SIZEOF_RECORD_SPECIFICATION;
+
+    /** Size of ScanOptions, needed for some ndb apis */
+    static int SIZEOF_SCAN_OPTIONS;
+
     static protected void loadSystemLibrary(String name) {
         // this is not thread-protected so it might be executed multiple times but no matter
         if (ndbclientLoaded) {
@@ -52,6 +68,11 @@ public class ClusterConnectionServiceImp
             System.loadLibrary(name);
             // initialize the charset map
             Utility.getCharsetMap();
+            // get the size information for Ndb structs as needed by some ndb apis
+            SIZEOF_OPERATION_OPTIONS = NdbOperation.OperationOptions.size();
+            SIZEOF_PARTITION_SPEC = Ndb.PartitionSpec.size();
+            SIZEOF_RECORD_SPECIFICATION = NdbDictionary.RecordSpecification.size();
+            SIZEOF_SCAN_OPTIONS = NdbScanOperation.ScanOptions.size();
             ndbclientLoaded = true;
         } catch (Throwable e) {
             String path = getLoadLibraryPath();

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java	2011-12-05 22:07:02 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java	2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
 /*
- *  Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
+ *  Copyright (c) 2009, 2012, Oracle and/or its affiliates. All rights reserved.
  *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
@@ -17,6 +17,7 @@
 
 package com.mysql.clusterj.tie;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -42,11 +43,14 @@ import com.mysql.ndbjtie.ndbapi.NdbError
 import com.mysql.ndbjtie.ndbapi.NdbIndexOperation;
 import com.mysql.ndbjtie.ndbapi.NdbIndexScanOperation;
 import com.mysql.ndbjtie.ndbapi.NdbOperation;
+import com.mysql.ndbjtie.ndbapi.NdbOperationConst;
+import com.mysql.ndbjtie.ndbapi.NdbRecordConst;
 import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
 import com.mysql.ndbjtie.ndbapi.NdbTransaction;
 import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
 import com.mysql.ndbjtie.ndbapi.NdbDictionary.IndexConst;
 import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst;
+import com.mysql.ndbjtie.ndbapi.NdbOperation.OperationOptionsConst;
 import com.mysql.ndbjtie.ndbapi.NdbOperationConst.AbortOption;
 import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanFlag;
 
@@ -63,9 +67,15 @@ class ClusterTransactionImpl implements 
     static final Logger logger = LoggerFactoryService.getFactory()
             .getInstance(ClusterTransactionImpl.class);
 
+    protected final static String USE_NDBRECORD_NAME = "com.mysql.clusterj.UseNdbRecord";
+    private static boolean USE_NDBRECORD = getUseNdbRecord();
+
     protected NdbTransaction ndbTransaction;
     private List<Runnable> postExecuteCallbacks = new ArrayList<Runnable>();
 
+    /** The cluster connection for this transaction */
+    protected ClusterConnectionImpl clusterConnectionImpl;
+
     /** The DbImpl associated with this NdbTransaction */
     protected DbImpl db;
 
@@ -104,8 +114,10 @@ class ClusterTransactionImpl implements 
 
     private BufferManager bufferManager;
 
-    public ClusterTransactionImpl(DbImpl db, Dictionary ndbDictionary, String joinTransactionId) {
+    public ClusterTransactionImpl(ClusterConnectionImpl clusterConnectionImpl,
+            DbImpl db, Dictionary ndbDictionary, String joinTransactionId) {
         this.db = db;
+        this.clusterConnectionImpl = clusterConnectionImpl;
         this.ndbDictionary = ndbDictionary;
         this.joinTransactionId = joinTransactionId;
         this.bufferManager = db.getBufferManager();
@@ -209,13 +221,16 @@ class ClusterTransactionImpl implements 
 
     public Operation getInsertOperation(Table storeTable) {
         enlist();
+        if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
+        if (USE_NDBRECORD) {
+            return new NdbRecordOperationImpl(this, storeTable);
+        }
         TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
         handleError(ndbTable, ndbDictionary);
         NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
         handleError(ndbOperation, ndbTransaction);
         int returnCode = ndbOperation.insertTuple();
         handleError(returnCode, ndbTransaction);
-        if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
         return new OperationImpl(ndbOperation, this);
     }
 
@@ -373,6 +388,22 @@ class ClusterTransactionImpl implements 
         return new OperationImpl(storeTable, ndbOperation, this);
     }
 
+    /** Create an NdbOperation for insert using NdbRecord.
+     * 
+     * @param ndbRecord the NdbRecord
+     * @param buffer the buffer with data for the operation
+     * @param mask the mask of column values already set in the buffer
+     * @param options the OperationOptions for this operation
+     * @param i 
+     * @return
+     */
+    public NdbOperationConst insertTuple(NdbRecordConst ndbRecord,
+            ByteBuffer buffer, byte[] mask, OperationOptionsConst options) {
+        NdbOperationConst operation = ndbTransaction.insertTuple(ndbRecord, buffer, mask, options, 0);
+        handleError(operation, ndbTransaction);
+        return operation;
+    }
+
     public void postExecuteCallback(Runnable callback) {
         postExecuteCallbacks.add(callback);
     }
@@ -520,4 +551,21 @@ class ClusterTransactionImpl implements 
         return bufferManager;
     }
 
+    protected NdbRecordImpl getCachedNdbRecordImpl(Table storeTable) {
+        return clusterConnectionImpl.getCachedNdbRecordImpl(storeTable);
+    }
+
+    /** Get the UseNdbRecord property from either the environment or system properties.
+     * 
+     * @return the system property if it is set via -D or the system environment
+     */
+    protected static boolean getUseNdbRecord() {
+        String useNdbRecordENV = System.getenv(USE_NDBRECORD_NAME);
+        // system property overrides environment variable
+        boolean result = System.getProperty(USE_NDBRECORD_NAME, useNdbRecordENV==null?"true":useNdbRecordENV)
+                .equals("true")?true:false;
+        logger.info("useNdbRecordENV: " + useNdbRecordENV + " UseNdbRecord: " + result);
+        return result;
+    }
+
 }

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java	2011-03-23 22:41:01 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java	2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
 /*
- *  Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
+ *  Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
  *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
@@ -209,7 +209,7 @@ class ColumnImpl implements Column {
                 break;
             case ColumnConst.Type.Timestamp:
                 this.prefixLength = 0;
-                this.columnSpace = 4;
+                this.columnSpace = 0;
                 break;
             default: throw new ClusterJFatalInternalException(
                     local.message("ERR_Unknown_Column_Type",

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java	2011-12-05 22:07:02 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java	2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
 /*
- *  Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
+ *  Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
  *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
@@ -32,7 +32,6 @@ import com.mysql.ndbjtie.ndbapi.NdbDicti
 
 import com.mysql.clusterj.ClusterJDatastoreException;
 import com.mysql.clusterj.ClusterJFatalInternalException;
-import com.mysql.clusterj.core.store.ClusterConnection;
 import com.mysql.clusterj.core.store.ClusterTransaction;
 
 import com.mysql.clusterj.core.util.I18NHelper;
@@ -82,9 +81,9 @@ class DbImpl implements com.mysql.cluste
     private DictionaryImpl dictionary;
 
     /** The ClusterConnection */
-    private ClusterConnection clusterConnection;
+    private ClusterConnectionImpl clusterConnection;
 
-    public DbImpl(ClusterConnection clusterConnection, Ndb ndb, int maxTransactions) {
+    public DbImpl(ClusterConnectionImpl clusterConnection, Ndb ndb, int maxTransactions) {
         this.clusterConnection = clusterConnection;
         this.ndb = ndb;
         int returnCode = ndb.init(maxTransactions);
@@ -103,8 +102,12 @@ class DbImpl implements com.mysql.cluste
         return dictionary;
     }
 
+    public Dictionary getNdbDictionary() {
+        return ndbDictionary;
+    }
+
     public ClusterTransaction startTransaction(String joinTransactionId) {
-        return new ClusterTransactionImpl(this, ndbDictionary, joinTransactionId);
+        return new ClusterTransactionImpl(clusterConnection, this, ndbDictionary, joinTransactionId);
     }
 
     protected void handleError(int returnCode, Ndb ndb) {

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DictionaryImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DictionaryImpl.java	2011-10-27 23:43:25 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DictionaryImpl.java	2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
 /*
- *  Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
+ *  Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
  *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
@@ -127,4 +127,8 @@ class DictionaryImpl implements com.mysq
         ndbDictionary.removeCachedTable(tableName);
     }
 
+    public Dictionary getNdbDictionary() {
+        return ndbDictionary;
+    }
+
 }

=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java	2012-01-23 00:44:39 +0000
@@ -0,0 +1,56 @@
+/*
+ *  Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; version 2 of the License.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import com.mysql.clusterj.core.store.Column;
+
+import com.mysql.clusterj.core.util.I18NHelper;
+import com.mysql.clusterj.core.util.Logger;
+import com.mysql.clusterj.core.util.LoggerFactoryService;
+
+/**
+ * NdbRecord blob handling defers the acquisition of an NdbBlob until the NdbOperation
+ * is created. At that time, this implementation will get the NdbBlob from its NdbOperation.
+ * Operations on the NdbBlob are delegated to the parent (by inheritance).
+ */
+class NdbRecordBlobImpl extends BlobImpl {
+
+    /** My message translator */
+    static final I18NHelper local = I18NHelper
+            .getInstance(NdbRecordBlobImpl.class);
+
+    /** My logger */
+    static final Logger logger = LoggerFactoryService.getFactory()
+            .getInstance(NdbRecordBlobImpl.class);
+
+    /** The store column for this blob */
+    private Column storeColumn;
+
+    /** The operation */
+    private NdbRecordOperationImpl operation;
+
+    public NdbRecordBlobImpl(NdbRecordOperationImpl operation, Column storeColumn) {
+        this.storeColumn = storeColumn;
+        this.operation = operation;
+    }
+
+    protected void setNdbBlob() {
+        this.ndbBlob = operation.getNdbBlob(storeColumn);
+    }
+
+}

=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java	2012-01-23 00:44:39 +0000
@@ -0,0 +1,375 @@
+/*
+   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import java.nio.ByteBuffer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.mysql.clusterj.ClusterJFatalInternalException;
+import com.mysql.clusterj.ClusterJFatalUserException;
+
+import com.mysql.clusterj.core.store.Column;
+import com.mysql.clusterj.core.store.Table;
+import com.mysql.clusterj.core.util.I18NHelper;
+import com.mysql.clusterj.core.util.Logger;
+import com.mysql.clusterj.core.util.LoggerFactoryService;
+
+import com.mysql.clusterj.tie.DbImpl.BufferManager;
+
+import com.mysql.ndbjtie.ndbapi.NdbRecord;
+import com.mysql.ndbjtie.ndbapi.NdbRecordConst;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.ColumnConst;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.RecordSpecification;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.RecordSpecificationArray;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst;
+
+/**
+ * Wrapper around an NdbRecord. The default implementation can be used for insert,
+ * using an NdbRecord that defines every column in the table. After construction, the instance is
+ * read-only and can be shared among all threads that use the same cluster connection; and the size of the
+ * buffer required for operations is available. The NdbRecord instance is released when the cluster
+ * connection is closed. Column values can be set using a provided
+ * buffer and buffer manager.
+ */
+public class NdbRecordImpl {
+
+    /** My message translator */
+    static final I18NHelper local = I18NHelper
+            .getInstance(NdbRecordImpl.class);
+
+    /** My logger */
+    static final Logger logger = LoggerFactoryService.getFactory()
+            .getInstance(NdbRecordImpl.class);
+
+    /** The size of the NdbRecord struct */
+    protected final static int SIZEOF_RECORD_SPECIFICATION = ClusterConnectionServiceImpl.SIZEOF_RECORD_SPECIFICATION;
+
+    /** The NdbRecord for this operation */
+    private NdbRecord ndbRecord = null;
+
+    /** The store columns for this operation */
+    protected List<Column> storeColumns = new ArrayList<Column>();
+
+    /** The RecordSpecificationArray used to define the columns in the NdbRecord */
+    private RecordSpecificationArray recordSpecificationArray;
+
+    /** The NdbTable */
+    TableConst tableConst;
+
+    /** The size of the receive buffer for this operation (may be zero for non-read operations) */
+    protected int bufferSize;
+
+    /** The maximum column id for this operation (may be zero for non-read operations) */
+    protected int maximumColumnId;
+
+    /** The offsets into the buffer for each column (may be null for non-read operations) */
+    protected int[] offsets;
+
+    /** Values for setting column mask and null bit mask */
+    protected final static byte[] BIT_IN_BYTE_MASK = new byte[] {1, 2, 4, 8, 16, 32, 64, -128};
+
+    /** The position in the null indicator for the field */
+    protected int nullablePositions[] = null;
+
+    /** The null indicator for the field bit in the byte */
+    protected int nullbitBitInByte[] = null;
+
+    /** The null indicator for the field byte offset*/
+    protected int nullbitByteOffset[] = null;
+
+    /** The maximum length of any column in this operation */
+    protected int maximumColumnLength;
+
+    /** The dictionary used to create (and release) the NdbRecord */
+    private Dictionary ndbDictionary;
+
+    /** Number of columns for this NdbRecord */
+    private int numberOfColumns;
+
+    /** These fields are only used during construction of the RecordSpecificationArray */
+    int offset = 0;
+    int nullablePosition = 0;
+
+    /** Constructor used for insert operations that do not need to read data.
+     * 
+     * @param storeTable the store table
+     * @param ndbDictionary the ndb dictionary
+     */
+    protected NdbRecordImpl(Table storeTable, Dictionary ndbDictionary) {
+        this.ndbDictionary = ndbDictionary;
+        this.tableConst = getNdbTable(storeTable.getName());
+        this.numberOfColumns = tableConst.getNoOfColumns();
+        this.recordSpecificationArray = RecordSpecificationArray.create(numberOfColumns);
+        this.offsets = new int[numberOfColumns];
+        this.nullablePositions = new int[numberOfColumns];
+        this.nullbitBitInByte = new int[numberOfColumns];
+        this.nullbitByteOffset = new int[numberOfColumns];
+        this.ndbRecord = createNdbRecord(storeTable, ndbDictionary);
+    }
+
+
+    public int setBigInteger(ByteBuffer buffer, Column storeColumn, BigInteger value) {
+        int columnId = storeColumn.getColumnId();
+        int newPosition = offsets[columnId];
+        buffer.position(newPosition);
+        ByteBuffer bigIntegerBuffer = Utility.convertValue(storeColumn, value);
+        buffer.put(bigIntegerBuffer);
+        return columnId;
+    }
+
+    public int setByte(ByteBuffer buffer, Column storeColumn, byte value) {
+        int columnId = storeColumn.getColumnId();
+        buffer.put(offsets[columnId], value);
+        return columnId;
+    }
+
+    public int setBytes(ByteBuffer buffer, Column storeColumn, byte[] value) {
+        int columnId = storeColumn.getColumnId();
+        int newPosition = offsets[columnId];
+        buffer.position(newPosition);
+        Utility.convertValue(buffer, storeColumn, value);
+        return columnId;
+    }
+
+    public int setDecimal(ByteBuffer buffer, Column storeColumn, BigDecimal value) {
+        int columnId = storeColumn.getColumnId();
+        int newPosition = offsets[columnId];
+        buffer.position(newPosition);
+        ByteBuffer decimalBuffer = Utility.convertValue(storeColumn, value);
+        buffer.put(decimalBuffer);
+        return columnId;
+    }
+
+    public int setDouble(ByteBuffer buffer, Column storeColumn, Double value) {
+        int columnId = storeColumn.getColumnId();
+        buffer.putDouble(offsets[columnId], value);
+        return columnId;
+    }
+
+    public int setFloat(ByteBuffer buffer, Column storeColumn, Float value) {
+        int columnId = storeColumn.getColumnId();
+        buffer.putFloat(offsets[columnId], value);
+        return columnId;
+    }
+
+    public int setInt(ByteBuffer buffer, Column storeColumn, Integer value) {
+        int columnId = storeColumn.getColumnId();
+        buffer.putInt(offsets[columnId], value);
+        return columnId;
+    }
+
+    public int setLong(ByteBuffer buffer, Column storeColumn, long value) {
+        int columnId = storeColumn.getColumnId();
+        long storeValue = Utility.convertLongValueForStorage(storeColumn, value);
+        buffer.putLong(offsets[columnId], storeValue);
+        return columnId;
+    }
+
+    public int setNull(ByteBuffer buffer, Column storeColumn) {
+        int columnId = storeColumn.getColumnId();
+        int index = nullbitByteOffset[columnId];
+        byte mask = BIT_IN_BYTE_MASK[nullbitBitInByte[columnId]];
+        byte nullbyte = buffer.get(index);
+        buffer.put(index, (byte)(nullbyte|mask));
+        return columnId;
+    }
+
+    public int setShort(ByteBuffer buffer, Column storeColumn, Short value) {
+        int columnId = storeColumn.getColumnId();
+        buffer.putShort(offsets[columnId], value);
+        return columnId;
+    }
+
+    public int setString(ByteBuffer buffer, BufferManager bufferManager, Column storeColumn, String value) {
+        int columnId = storeColumn.getColumnId();
+        buffer.position(offsets[columnId]);
+        // for now, use the encode method to encode the value then copy it
+        ByteBuffer converted = Utility.encode(value, storeColumn, bufferManager);
+        buffer.put(converted);
+        return columnId;
+    }
+
+    protected static void handleError(Object object, Dictionary ndbDictionary) {
+        if (object != null) {
+            return;
+        } else {
+            Utility.throwError(null, ndbDictionary.getNdbError());
+        }
+    }
+
+    protected NdbRecord createNdbRecord(Table storeTable, Dictionary ndbDictionary) {
+        String[] columnNames = storeTable.getColumnNames();
+        List<Column> align8 = new ArrayList<Column>();
+        List<Column> align4 = new ArrayList<Column>();
+        List<Column> align2 = new ArrayList<Column>();
+        List<Column> align1 = new ArrayList<Column>();
+        List<Column> nullables = new ArrayList<Column>();
+        for (String columnName: columnNames) {
+            Column storeColumn = storeTable.getColumn(columnName);
+            storeColumns.add(storeColumn);
+            // for each column, put into alignment bucket
+            switch (storeColumn.getType()) {
+                case Bigint:
+                case Bigunsigned:
+                case Bit:
+                case Blob:
+                case Date:
+                case Datetime:
+                case Double:
+                case Text:
+                case Time:
+                case Timestamp:
+                    align8.add(storeColumn);
+                    break;
+                case Binary:
+                case Char:
+                case Decimal:
+                case Decimalunsigned:
+                case Longvarbinary:
+                case Longvarchar:
+                case Olddecimal:
+                case Olddecimalunsigned:
+                case Tinyint:
+                case Tinyunsigned:
+                case Varbinary:
+                case Varchar:
+                    align1.add(storeColumn);
+                    break;
+                case Float:
+                case Int:
+                case Mediumint:
+                case Mediumunsigned:
+                case Unsigned:
+                    align4.add(storeColumn);
+                    break;
+                case Smallint:
+                case Smallunsigned:
+                case Year:
+                    align2.add(storeColumn);
+                    break;
+                case Undefined:
+                    throw new ClusterJFatalUserException(local.message("ERR_Unknown_Column_Type",
+                            storeTable.getName(), columnName, storeColumn.getType()));
+                default:
+                    throw new ClusterJFatalInternalException(local.message("ERR_Unknown_Column_Type",
+                            storeTable.getName(), columnName, storeColumn.getType()));
+            }
+            if (storeColumn.getNullable()) {
+                nullables.add(storeColumn);
+            }
+        }
+        // for each column, allocate space in the buffer, starting with align8 and ending with align1
+        // null indicators are allocated first, with one bit per nullable column
+        // nullable columns take one bit each
+        offset = nullables.size() + 7 / 8;
+        // align the first column following the nullable column indicators to 8
+        offset = (7 + offset) / 8 * 8;
+        for (Column storeColumn: align8) {
+            handleColumn(8, storeColumn);
+        }
+        for (Column storeColumn: align4) {
+            handleColumn(4, storeColumn);
+        }
+        for (Column storeColumn: align2) {
+            handleColumn(2, storeColumn);
+        }
+        for (Column storeColumn: align1) {
+            handleColumn(1, storeColumn);
+        }
+        bufferSize = offset;
+
+        // now create an NdbRecord
+        NdbRecord result = ndbDictionary.createRecord(tableConst, recordSpecificationArray,
+                numberOfColumns, SIZEOF_RECORD_SPECIFICATION, 0);
+        // delete the RecordSpecificationArray since it is no longer needed
+        RecordSpecificationArray.delete(recordSpecificationArray);
+        handleError(result, ndbDictionary);
+        return result;
+    }
+
+    /** Create a record specification for a column. Keep track of the offset into the buffer
+     * and the null indicator position for each column.
+     * 
+     * @param alignment the alignment for this column in the buffer
+     * @param storeColumn the column
+     */
+    private void handleColumn(int alignment, Column storeColumn) {
+        int columnId = storeColumn.getColumnId();
+        RecordSpecification recordSpecification = recordSpecificationArray.at(columnId);
+        ColumnConst columnConst = tableConst.getColumn(columnId);
+        recordSpecification.column(columnConst);
+        recordSpecification.offset(offset);
+        offsets[columnId] = offset;
+        int columnSpace = storeColumn.getColumnSpace();
+        offset += (columnSpace==0)?8:columnSpace;
+        if (storeColumn.getNullable()) {
+            nullablePositions[columnId] = nullablePosition++;
+            int nullbitByteOffsetValue = nullablePosition/8;
+            int nullbitBitInByteValue = nullablePosition - nullablePosition / 8 * 8;
+            nullbitBitInByte[columnId] = nullbitBitInByteValue;
+            nullbitByteOffset[columnId] = nullbitByteOffsetValue;
+            recordSpecification.nullbit_byte_offset(nullbitByteOffsetValue);
+            recordSpecification.nullbit_bit_in_byte(nullbitBitInByteValue);
+        } else {
+            recordSpecification.nullbit_byte_offset(0);
+            recordSpecification.nullbit_bit_in_byte(0);
+        }
+        if (logger.isDetailEnabled()) logger.detail(
+                "column: " + storeColumn.getName()
+                + " columnSpace: " + columnSpace 
+                + " offset: " + offsets[columnId]
+                + " nullable position: " + nullablePositions[columnId]
+                + " nullbitByteOffset: " + nullbitByteOffset[columnId]
+                + " nullbitBitInByte: " +  nullbitBitInByte[columnId]);
+    }
+
+    TableConst getNdbTable(String tableName) {
+        TableConst ndbTable = ndbDictionary.getTable(tableName);
+        if (ndbTable == null) {
+            // try the lower case table name
+            ndbTable = ndbDictionary.getTable(tableName.toLowerCase());
+        }
+        return ndbTable;
+    }
+
+    public int getBufferSize() {
+        return bufferSize;
+    }
+
+    public NdbRecordConst getNdbRecord() {
+        return ndbRecord;
+    }
+
+    public int getNumberOfColumns() {
+        return numberOfColumns;
+    }
+
+    protected void releaseNdbRecord() {
+        if (ndbRecord != null) {
+            ndbDictionary.releaseRecord(ndbRecord);
+            ndbRecord = null;
+        }
+    }
+
+}

=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java	2012-01-23 00:44:39 +0000
@@ -0,0 +1,318 @@
+/*
+   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.mysql.clusterj.ClusterJFatalInternalException;
+
+import com.mysql.clusterj.core.store.Blob;
+import com.mysql.clusterj.core.store.Column;
+import com.mysql.clusterj.core.store.Operation;
+import com.mysql.clusterj.core.store.ResultData;
+import com.mysql.clusterj.core.store.Table;
+import com.mysql.clusterj.core.util.I18NHelper;
+import com.mysql.clusterj.core.util.Logger;
+import com.mysql.clusterj.core.util.LoggerFactoryService;
+
+import com.mysql.clusterj.tie.DbImpl.BufferManager;
+
+import com.mysql.ndbjtie.ndbapi.NdbBlob;
+import com.mysql.ndbjtie.ndbapi.NdbOperationConst;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
+
+/**
+ * Implementation of store operation that uses NdbRecord.
+ */
+class NdbRecordOperationImpl implements Operation {
+
+    /** My message translator */
+    static final I18NHelper local = I18NHelper
+            .getInstance(NdbRecordOperationImpl.class);
+
+    /** My logger */
+    static final Logger logger = LoggerFactoryService.getFactory()
+            .getInstance(NdbRecordOperationImpl.class);
+
+    /** The ClusterTransaction that this operation belongs to */
+    protected ClusterTransactionImpl clusterTransaction;
+
+    /** The NdbOperation wrapped by this object */
+    private NdbOperationConst ndbOperation = null;
+
+    /** The NdbRecord for this operation */
+    private NdbRecordImpl ndbRecordImpl = null;
+
+    /** The mask for this operation, which contains a bit set for each column to be inserted */
+    byte[] mask;
+
+    /** The ByteBuffer containing all of the data */
+    ByteBuffer buffer = null;
+
+    /** Blobs for this NdbRecord */
+    protected NdbRecordBlobImpl[] blobs = null;
+
+    /** Blobs that have been accessed for this operation */
+    protected List<NdbRecordBlobImpl> activeBlobs = new ArrayList<NdbRecordBlobImpl>();
+
+    /** The size of the receive buffer for this operation (may be zero for non-read operations) */
+    protected int bufferSize;
+
+    /** The number of columns for this operation */
+    protected int numberOfColumns;
+
+    protected BufferManager bufferManager;
+
+    /** Constructor used for insert and delete operations that do not need to read data.
+     * 
+     * @param clusterTransaction the cluster transaction
+     * @param transaction the ndb transaction
+     * @param storeTable the store table
+     */
+    public NdbRecordOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable) {
+        this.ndbRecordImpl = clusterTransaction.getCachedNdbRecordImpl(storeTable);
+        this.bufferSize = ndbRecordImpl.getBufferSize();
+        this.numberOfColumns = ndbRecordImpl.getNumberOfColumns();
+        this.blobs = new NdbRecordBlobImpl[this.numberOfColumns];
+        this.clusterTransaction = clusterTransaction;
+        this.bufferManager = clusterTransaction.getBufferManager();
+    }
+
+    public void equalBigInteger(Column storeColumn, BigInteger value) {
+        setBigInteger(storeColumn, value);
+    }
+
+    public void equalBoolean(Column storeColumn, boolean booleanValue) {
+        setBoolean(storeColumn, booleanValue);
+    }
+
+    public void equalByte(Column storeColumn, byte value) {
+        setByte(storeColumn, value);
+    }
+
+    public void equalBytes(Column storeColumn, byte[] value) {
+        setBytes(storeColumn, value);
+   }
+
+    public void equalDecimal(Column storeColumn, BigDecimal value) {
+        setDecimal(storeColumn, value);
+    }
+
+    public void equalDouble(Column storeColumn, double value) {
+        setDouble(storeColumn, value);
+    }
+
+    public void equalFloat(Column storeColumn, float value) {
+        setFloat(storeColumn, value);
+    }
+
+    public void equalInt(Column storeColumn, int value) {
+        setInt(storeColumn, value);
+    }
+
+    public void equalShort(Column storeColumn, short value) {
+        setShort(storeColumn, value);
+    }
+
+    public void equalLong(Column storeColumn, long value) {
+        setLong(storeColumn, value);
+    }
+
+    public void equalString(Column storeColumn, String value) {
+        setString(storeColumn, value);
+    }
+
+    public void getBlob(Column storeColumn) {
+        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented", "getBlob"));
+    }
+
+    /**
+     * Get the blob handle for this column. The same column will return the same blob handle
+     * regardless of how many times it is called.
+     * @param storeColumn the store column
+     * @return the blob handle
+     */
+    public Blob getBlobHandle(Column storeColumn) {
+        int columnId = storeColumn.getColumnId();
+        NdbRecordBlobImpl result = blobs[columnId];
+        if (result == null) {
+            columnSet(columnId);
+            result = new NdbRecordBlobImpl(this, storeColumn);
+            blobs[columnId] = result;
+            activeBlobs.add(result);
+        }
+        return result;
+    }
+
+    /** Specify the columns to be used for the operation.
+     */
+    public void getValue(Column storeColumn) {
+        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented", "getValue"));
+    }
+
+    public void postExecuteCallback(Runnable callback) {
+        clusterTransaction.postExecuteCallback(callback);
+    }
+
+    /** Construct a new ResultData using the saved column data and then execute the operation.
+     */
+    public ResultData resultData() {
+        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented", "resultData"));
+    }
+
+    /** Construct a new ResultData and if requested, execute the operation.
+     */
+    public ResultData resultData(boolean execute) {
+        throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented", "resultData"));
+    }
+
+    public void setBigInteger(Column storeColumn, BigInteger value) {
+        int columnId = ndbRecordImpl.setBigInteger(buffer, storeColumn, value);
+        columnSet(columnId);
+    }
+
+    public void setBoolean(Column storeColumn, Boolean booleanValue) {
+        byte value = (booleanValue?(byte)0x01:(byte)0x00);
+        setByte(storeColumn, value);
+    }
+
+    public void setByte(Column storeColumn, byte value) {
+        int columnId = ndbRecordImpl.setByte(buffer, storeColumn, value);
+        columnSet(columnId);
+    }
+
+    public void setBytes(Column storeColumn, byte[] value) {
+        int columnId = ndbRecordImpl.setBytes(buffer, storeColumn, value);
+        columnSet(columnId);
+    }
+
+    public void setDecimal(Column storeColumn, BigDecimal value) {
+        int columnId = ndbRecordImpl.setDecimal(buffer, storeColumn, value);
+        columnSet(columnId);
+    }
+
+    public void setDouble(Column storeColumn, Double value) {
+        int columnId = ndbRecordImpl.setDouble(buffer, storeColumn, value);
+        columnSet(columnId);
+    }
+
+    public void setFloat(Column storeColumn, Float value) {
+        int columnId = ndbRecordImpl.setFloat(buffer, storeColumn, value);
+        columnSet(columnId);
+    }
+
+    public void setInt(Column storeColumn, Integer value) {
+        int columnId = ndbRecordImpl.setInt(buffer, storeColumn, value);
+        columnSet(columnId);
+    }
+
+    public void setLong(Column storeColumn, long value) {
+        int columnId = ndbRecordImpl.setLong(buffer, storeColumn, value);
+        columnSet(columnId);
+    }
+
+    public void setNull(Column storeColumn) {
+        int columnId = ndbRecordImpl.setNull(buffer, storeColumn);
+        columnSet(columnId);
+    }
+
+    public void setShort(Column storeColumn, Short value) {
+        int columnId = ndbRecordImpl.setShort(buffer, storeColumn, value);
+        columnSet(columnId);
+    }
+
+    public void setString(Column storeColumn, String value) {
+        int columnId = ndbRecordImpl.setString(buffer, bufferManager, storeColumn, value);
+        columnSet(columnId);
+    }
+
+    public int errorCode() {
+        return ndbOperation.getNdbError().code();
+    }
+
+    protected static void handleError(int returnCode, NdbOperationConst ndbOperation2) {
+        if (returnCode == 0) {
+            return;
+        } else {
+            Utility.throwError(returnCode, ndbOperation2.getNdbError());
+        }
+    }
+
+    protected static void handleError(Object object, NdbOperationConst ndbOperation) {
+        if (object != null) {
+            return;
+        } else {
+            Utility.throwError(null, ndbOperation.getNdbError());
+        }
+    }
+
+    protected static void handleError(Object object, Dictionary ndbDictionary) {
+        if (object != null) {
+            return;
+        } else {
+            Utility.throwError(null, ndbDictionary.getNdbError());
+        }
+    }
+
+    public void beginDefinition() {
+        // allocate a buffer for the operation data
+        buffer = ByteBuffer.allocateDirect(bufferSize);
+        // use platform's native byte ordering
+        buffer.order(ByteOrder.nativeOrder());
+        mask = new byte[1 + (numberOfColumns/8)];
+    }
+
+    public void endDefinition() {
+        // create the insert operation
+        buffer.position(0);
+        buffer.limit(bufferSize);
+        // create the insert operation
+        ndbOperation = clusterTransaction.insertTuple(ndbRecordImpl.getNdbRecord(), buffer, mask, null);
+        // now set the NdbBlob into the blobs
+        for (NdbRecordBlobImpl blob: activeBlobs) {
+            if (blob != null) {
+                blob.setNdbBlob();
+            }
+        }
+    }
+
+    public NdbBlob getNdbBlob(Column storeColumn) {
+        NdbBlob result = ndbOperation.getBlobHandle(storeColumn.getColumnId());
+        handleError(result, ndbOperation);
+        return result;
+    }
+
+    /**
+     * Set this column into the mask for NdbRecord operation.
+     * @param columnId the column id
+     */
+    private void columnSet(int columnId) {
+        int byteOffset = columnId / 8;
+        int bitInByte = columnId - (byteOffset * 8);
+        mask[byteOffset] |= NdbRecordImpl.BIT_IN_BYTE_MASK[bitInByte];
+        
+    }
+
+}

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java	2011-02-03 14:37:50 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java	2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
 /*
-   Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
+   Copyright (c) 2009, 2012, Oracle and/or its affiliates. All rights reserved.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -263,7 +263,7 @@ class OperationImpl implements Operation
 
     public void setLong(Column storeColumn, long value) {
         long storeValue = Utility.convertLongValueForStorage(storeColumn, value);
-        int returnCode = ndbOperation.setValue(storeColumn.getName(), storeValue);
+        int returnCode = ndbOperation.setValue(storeColumn.getColumnId(), storeValue);
         handleError(returnCode, ndbOperation);
     }
 
@@ -305,4 +305,12 @@ class OperationImpl implements Operation
         }
     }
 
+    public void beginDefinition() {
+        // nothing to do
+    }
+
+    public void endDefinition() {
+        // nothing to do
+    }
+
 }

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java	2011-07-05 06:00:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java	2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
 /*
- *  Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
+ *  Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
  *
  *  This program is free software; you can redistribute it and/or modify
  *  it under the terms of the GNU General Public License as published by
@@ -792,9 +792,22 @@ public class Utility {
      * @return the ByteBuffer
      */
     public static ByteBuffer convertValue(Column storeColumn, byte[] value) {
+        int requiredLength = storeColumn.getColumnSpace();
+        ByteBuffer result = ByteBuffer.allocateDirect(requiredLength);
+        convertValue(result, storeColumn, value);
+        result.flip();
+        return result;
+    }
+
+    /** Convert the parameter value and store it in a given ByteBuffer that can be passed to ndbjtie.
+     * 
+     * @param buffer the buffer, positioned at the location to store the value
+     * @param storeColumn the column definition
+     * @param value the value to be converted
+     */
+    public static void convertValue(ByteBuffer buffer, Column storeColumn, byte[] value) {
         int dataLength = value.length;
         int prefixLength = storeColumn.getPrefixLength();
-        ByteBuffer result;
         switch (prefixLength) {
             case 0:
                 int requiredLength = storeColumn.getColumnSpace();
@@ -803,12 +816,10 @@ public class Utility {
                             local.message("ERR_Data_Too_Long",
                             storeColumn.getName(), requiredLength, dataLength));
                 } else {
-                    result = ByteBuffer.allocateDirect(requiredLength);
-                    result.order(ByteOrder.nativeOrder());
-                    result.put(value);
+                    buffer.put(value);
                     if (dataLength < requiredLength) {
                         // pad with 0x00 on right
-                        result.put(ZERO_PAD, 0, requiredLength - dataLength);
+                        buffer.put(ZERO_PAD, 0, requiredLength - dataLength);
                     }
                 }
                 break;
@@ -818,10 +829,8 @@ public class Utility {
                             local.message("ERR_Data_Too_Long",
                             storeColumn.getName(), "255", dataLength));
                 }
-                result = ByteBuffer.allocateDirect(prefixLength + dataLength);
-                result.order(ByteOrder.nativeOrder());
-                result.put((byte)dataLength);
-                result.put(value);
+                buffer.put((byte)dataLength);
+                buffer.put(value);
                 break;
             case 2:
                 if (dataLength > 8000) {
@@ -829,19 +838,15 @@ public class Utility {
                             local.message("ERR_Data_Too_Long",
                             storeColumn.getName(), "8000", dataLength));
                 }
-                result = ByteBuffer.allocateDirect(prefixLength + dataLength);
-                result.order(ByteOrder.nativeOrder());
-                result.put((byte)(dataLength%256));
-                result.put((byte)(dataLength/256));
-                result.put(value);
+                buffer.put((byte)(dataLength%256));
+                buffer.put((byte)(dataLength/256));
+                buffer.put(value);
                 break;
             default: 
                     throw new ClusterJFatalInternalException(
                             local.message("ERR_Unknown_Prefix_Length",
                             prefixLength, storeColumn.getName()));
         }
-        result.flip();
-        return result;
     }
 
     /** Convert a BigDecimal value to the binary decimal form used by MySQL.

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties	2011-06-29 18:02:39 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties	2012-01-23 00:44:39 +0000
@@ -28,6 +28,7 @@ ERR_Cannot_Create_Cluster_Connection:Can
 ERR_Duplicate_NdbRecAttr_In_List:Duplicate NdbRecAttr for column {0} column id {1}.
 ERR_No_Operation_In_Result:There is no ndbOperation in the ResultData.
 ERR_Not_Implemented:Not implemented
+ERR_Method_Not_Implemented:Not implemented: {0}.
 ERR_NdbJTie:Error in NdbJTie: returnCode {0}, code {1}, mysqlCode {2}, \
 status {3}, classification {4}, message {5} {6}.
 ERR_Invalid_Prefix_Length:The prefix length {0} is invalid.

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.1 branch (Craig.Russell:4428 to 4429) Craig L Russell23 Jan