From: Jonas Oreland Date: January 23 2012 7:41am Subject: bzr push into mysql-5.5-cluster-7.2 branch (jonas.oreland:3777 to 3778) List-Archive: http://lists.mysql.com/commits/142506 Message-Id: <20120123074138.5236555C278@perch.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3778 Jonas Oreland 2012-01-23 [merge] ndb - merge 71 to 72 added: storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java modified: storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DictionaryImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties 3777 jonas oreland 2012-01-21 [merge] ndb - merge 71 to 72 added: storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/StressTest.java storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/StressTest.java modified: mysql-test/suite/ndb/t/clusterj.test storage/ndb/clusterj/clusterj-api/pom.xml storage/ndb/clusterj/clusterj-bindings/pom.xml storage/ndb/clusterj/clusterj-core/pom.xml storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainFieldHandlerImpl.java storage/ndb/clusterj/clusterj-jdbc/pom.xml storage/ndb/clusterj/clusterj-jpatest/pom.xml storage/ndb/clusterj/clusterj-openjpa/pom.xml storage/ndb/clusterj/clusterj-test/pom.xml storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DynamicObjectTest.java storage/ndb/clusterj/clusterj-test/src/main/resources/schema.sql storage/ndb/clusterj/clusterj-tie/pom.xml storage/ndb/clusterj/pom.xml === modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java' --- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java 2011-11-22 22:01:23 +0000 +++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java 2012-01-23 00:44:39 +0000 @@ -1,5 +1,5 @@ /* - Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved. + Copyright (c) 2009, 2012, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -392,8 +392,10 @@ public class SessionImpl implements Sess storeTable = domainTypeHandler.getStoreTable(); op = clusterTransaction.getInsertOperation(storeTable); // set all values in the operation, keys first + op.beginDefinition(); domainTypeHandler.operationSetKeys(valueHandler, op); domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op); + op.endDefinition(); // reset modified bits in instance domainTypeHandler.objectResetModified(valueHandler); } catch (ClusterJUserException cjuex) { === modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java' --- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java 2011-02-03 14:37:50 +0000 +++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java 2012-01-23 00:44:39 +0000 @@ -1,5 +1,5 @@ /* - Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved. + Copyright (c) 2009, 2012, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -85,4 +85,8 @@ public interface Operation { public void setString(Column storeColumn, String string); + public void beginDefinition(); + + public void endDefinition(); + } === modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java' --- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java 2011-07-05 12:46:07 +0000 +++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java 2012-01-23 07:39:42 +0000 @@ -1,6 +1,5 @@ /* - * Copyright 2010 Sun Microsystems, Inc. - * All rights reserved. Use is subject to license terms. + * Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -43,7 +42,11 @@ class BlobImpl implements Blob { static final Logger logger = LoggerFactoryService.getFactory() .getInstance(BlobImpl.class); - private NdbBlob ndbBlob; + protected NdbBlob ndbBlob; + + public BlobImpl() { + // this is only for NdbRecordBlobImpl constructor when there is no ndbBlob available yet + } public BlobImpl(NdbBlob blob) { this.ndbBlob = blob; === modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java' --- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java 2011-12-18 11:47:59 +0000 +++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java 2012-01-23 00:44:39 +0000 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -20,13 +20,18 @@ package com.mysql.clusterj.tie; import java.util.IdentityHashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + import com.mysql.ndbjtie.ndbapi.Ndb; import com.mysql.ndbjtie.ndbapi.Ndb_cluster_connection; +import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary; import com.mysql.clusterj.ClusterJDatastoreException; import com.mysql.clusterj.ClusterJFatalInternalException; import com.mysql.clusterj.core.store.Db; +import com.mysql.clusterj.core.store.Table; import com.mysql.clusterj.core.util.I18NHelper; import com.mysql.clusterj.core.util.Logger; @@ -57,6 +62,12 @@ public class ClusterConnectionImpl /** All dbs given out by this cluster connection */ private Map dbs = new IdentityHashMap(); + /** The map of table name to NdbRecordImpl */ + private ConcurrentMap ndbRecordImplMap = new ConcurrentHashMap(); + + /** The dictionary used to create NdbRecords */ + Dictionary dictionaryForNdbRecord = null; + /** Connect to the MySQL Cluster * * @param connectString the connect string @@ -83,6 +94,14 @@ public class ClusterConnectionImpl synchronized(this) { ndb = Ndb.create(clusterConnection, database, "def"); handleError(ndb, clusterConnection, connectString, nodeId); + if (dictionaryForNdbRecord == null) { + // create a dictionary for NdbRecord + Ndb ndbForNdbRecord = Ndb.create(clusterConnection, database, "def"); + handleError(ndbForNdbRecord, clusterConnection, connectString, nodeId); + DbImpl dbForNdbRecord = new DbImpl(this, ndbForNdbRecord, maxTransactions); + dbs.put(dbForNdbRecord, null); + dictionaryForNdbRecord = dbForNdbRecord.getNdbDictionary(); + } } DbImpl result = new DbImpl(this, ndb, maxTransactions); dbs.put(result, null); @@ -143,6 +162,16 @@ public class ClusterConnectionImpl public void close() { if (clusterConnection != null) { logger.info(local.message("INFO_Close_Cluster_Connection", connectString, nodeId)); + for (NdbRecordImpl ndbRecord: ndbRecordImplMap.values()) { + ndbRecord.releaseNdbRecord(); + } + ndbRecordImplMap.clear(); + if (dbs.size() != 0) { + Map dbsToClose = new IdentityHashMap(dbs); + for (Db db: dbsToClose.keySet()) { + db.close(); + } + } Ndb_cluster_connection.delete(clusterConnection); clusterConnection = null; } @@ -153,7 +182,43 @@ public class ClusterConnectionImpl } public int dbCount() { - return dbs.size(); + // one of the dbs is for the NdbRecord dictionary if it is not null + int dbForNdbRecord = (dictionaryForNdbRecord == null)?0:1; + return dbs.size() - dbForNdbRecord; + } + + /** + * Get the cached NdbRecord implementation for this cluster connection. + * There are three possibilities: + *
  • Case 1: return the already-cached NdbRecord + *
  • Case 2: return a new instance created by this method + *
  • Case 3: return the winner of a race with another thread + *
+ * @param storeTable the store table + * @param ndbDictionary the ndb dictionary + * @return the NdbRecordImpl + */ + protected NdbRecordImpl getCachedNdbRecordImpl(Table storeTable) { + String tableName = storeTable.getName(); + // find the NdbRecordImpl in the global cache + NdbRecordImpl result = ndbRecordImplMap.get(tableName); + if (result != null) { + // case 1 + return result; + } else { + NdbRecordImpl newNdbRecordImpl = new NdbRecordImpl(storeTable, dictionaryForNdbRecord); + NdbRecordImpl winner = ndbRecordImplMap.putIfAbsent(tableName, newNdbRecordImpl); + if (winner == null) { + // case 2: the previous value was null, so return the new (winning) value + if (logger.isDebugEnabled())logger.debug("NdbRecordImpl created for " + tableName); + return newNdbRecordImpl; + } else { + // case 3: another thread beat us, so return the winner and garbage collect ours + if (logger.isDebugEnabled())logger.debug("NdbRecordImpl lost race for " + tableName); + newNdbRecordImpl.releaseNdbRecord(); + return winner; + } + } } } === modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java' --- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java 2011-12-18 11:47:59 +0000 +++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java 2012-01-23 00:44:39 +0000 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -22,6 +22,10 @@ import com.mysql.clusterj.core.store.Clu import com.mysql.clusterj.core.util.I18NHelper; import com.mysql.clusterj.core.util.Logger; import com.mysql.clusterj.core.util.LoggerFactoryService; +import com.mysql.ndbjtie.ndbapi.Ndb; +import com.mysql.ndbjtie.ndbapi.NdbDictionary; +import com.mysql.ndbjtie.ndbapi.NdbOperation; +import com.mysql.ndbjtie.ndbapi.NdbScanOperation; /** * @@ -43,6 +47,18 @@ public class ClusterConnectionServiceImp /** Load the ndbclient system library only once */ static boolean ndbclientLoaded = false; + /** Size of OperationOptions, needed for some ndb apis */ + static int SIZEOF_OPERATION_OPTIONS; + + /** Size of PartitionSpec, needed for some ndb apis */ + static int SIZEOF_PARTITION_SPEC; + + /** Size of RecordSpecification, needed for some ndb apis */ + static int SIZEOF_RECORD_SPECIFICATION; + + /** Size of ScanOptions, needed for some ndb apis */ + static int SIZEOF_SCAN_OPTIONS; + static protected void loadSystemLibrary(String name) { // this is not thread-protected so it might be executed multiple times but no matter if (ndbclientLoaded) { @@ -52,6 +68,11 @@ public class ClusterConnectionServiceImp System.loadLibrary(name); // initialize the charset map Utility.getCharsetMap(); + // get the size information for Ndb structs as needed by some ndb apis + SIZEOF_OPERATION_OPTIONS = NdbOperation.OperationOptions.size(); + SIZEOF_PARTITION_SPEC = Ndb.PartitionSpec.size(); + SIZEOF_RECORD_SPECIFICATION = NdbDictionary.RecordSpecification.size(); + SIZEOF_SCAN_OPTIONS = NdbScanOperation.ScanOptions.size(); ndbclientLoaded = true; } catch (Throwable e) { String path = getLoadLibraryPath(); === modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java' --- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java 2011-12-05 22:07:02 +0000 +++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java 2012-01-23 00:44:39 +0000 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2009, 2012, Oracle and/or its affiliates. All rights reserved. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -17,6 +17,7 @@ package com.mysql.clusterj.tie; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -42,11 +43,14 @@ import com.mysql.ndbjtie.ndbapi.NdbError import com.mysql.ndbjtie.ndbapi.NdbIndexOperation; import com.mysql.ndbjtie.ndbapi.NdbIndexScanOperation; import com.mysql.ndbjtie.ndbapi.NdbOperation; +import com.mysql.ndbjtie.ndbapi.NdbOperationConst; +import com.mysql.ndbjtie.ndbapi.NdbRecordConst; import com.mysql.ndbjtie.ndbapi.NdbScanOperation; import com.mysql.ndbjtie.ndbapi.NdbTransaction; import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary; import com.mysql.ndbjtie.ndbapi.NdbDictionary.IndexConst; import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst; +import com.mysql.ndbjtie.ndbapi.NdbOperation.OperationOptionsConst; import com.mysql.ndbjtie.ndbapi.NdbOperationConst.AbortOption; import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanFlag; @@ -63,9 +67,15 @@ class ClusterTransactionImpl implements static final Logger logger = LoggerFactoryService.getFactory() .getInstance(ClusterTransactionImpl.class); + protected final static String USE_NDBRECORD_NAME = "com.mysql.clusterj.UseNdbRecord"; + private static boolean USE_NDBRECORD = getUseNdbRecord(); + protected NdbTransaction ndbTransaction; private List postExecuteCallbacks = new ArrayList(); + /** The cluster connection for this transaction */ + protected ClusterConnectionImpl clusterConnectionImpl; + /** The DbImpl associated with this NdbTransaction */ protected DbImpl db; @@ -104,8 +114,10 @@ class ClusterTransactionImpl implements private BufferManager bufferManager; - public ClusterTransactionImpl(DbImpl db, Dictionary ndbDictionary, String joinTransactionId) { + public ClusterTransactionImpl(ClusterConnectionImpl clusterConnectionImpl, + DbImpl db, Dictionary ndbDictionary, String joinTransactionId) { this.db = db; + this.clusterConnectionImpl = clusterConnectionImpl; this.ndbDictionary = ndbDictionary; this.joinTransactionId = joinTransactionId; this.bufferManager = db.getBufferManager(); @@ -209,13 +221,16 @@ class ClusterTransactionImpl implements public Operation getInsertOperation(Table storeTable) { enlist(); + if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName()); + if (USE_NDBRECORD) { + return new NdbRecordOperationImpl(this, storeTable); + } TableConst ndbTable = ndbDictionary.getTable(storeTable.getName()); handleError(ndbTable, ndbDictionary); NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable); handleError(ndbOperation, ndbTransaction); int returnCode = ndbOperation.insertTuple(); handleError(returnCode, ndbTransaction); - if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName()); return new OperationImpl(ndbOperation, this); } @@ -373,6 +388,22 @@ class ClusterTransactionImpl implements return new OperationImpl(storeTable, ndbOperation, this); } + /** Create an NdbOperation for insert using NdbRecord. + * + * @param ndbRecord the NdbRecord + * @param buffer the buffer with data for the operation + * @param mask the mask of column values already set in the buffer + * @param options the OperationOptions for this operation + * @param i + * @return + */ + public NdbOperationConst insertTuple(NdbRecordConst ndbRecord, + ByteBuffer buffer, byte[] mask, OperationOptionsConst options) { + NdbOperationConst operation = ndbTransaction.insertTuple(ndbRecord, buffer, mask, options, 0); + handleError(operation, ndbTransaction); + return operation; + } + public void postExecuteCallback(Runnable callback) { postExecuteCallbacks.add(callback); } @@ -520,4 +551,21 @@ class ClusterTransactionImpl implements return bufferManager; } + protected NdbRecordImpl getCachedNdbRecordImpl(Table storeTable) { + return clusterConnectionImpl.getCachedNdbRecordImpl(storeTable); + } + + /** Get the UseNdbRecord property from either the environment or system properties. + * + * @return the system property if it is set via -D or the system environment + */ + protected static boolean getUseNdbRecord() { + String useNdbRecordENV = System.getenv(USE_NDBRECORD_NAME); + // system property overrides environment variable + boolean result = System.getProperty(USE_NDBRECORD_NAME, useNdbRecordENV==null?"true":useNdbRecordENV) + .equals("true")?true:false; + logger.info("useNdbRecordENV: " + useNdbRecordENV + " UseNdbRecord: " + result); + return result; + } + } === modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java' --- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java 2011-03-23 22:41:01 +0000 +++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java 2012-01-23 00:44:39 +0000 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -209,7 +209,7 @@ class ColumnImpl implements Column { break; case ColumnConst.Type.Timestamp: this.prefixLength = 0; - this.columnSpace = 4; + this.columnSpace = 0; break; default: throw new ClusterJFatalInternalException( local.message("ERR_Unknown_Column_Type", === modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java' --- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java 2011-12-05 22:07:02 +0000 +++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java 2012-01-23 00:44:39 +0000 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -32,7 +32,6 @@ import com.mysql.ndbjtie.ndbapi.NdbDicti import com.mysql.clusterj.ClusterJDatastoreException; import com.mysql.clusterj.ClusterJFatalInternalException; -import com.mysql.clusterj.core.store.ClusterConnection; import com.mysql.clusterj.core.store.ClusterTransaction; import com.mysql.clusterj.core.util.I18NHelper; @@ -82,9 +81,9 @@ class DbImpl implements com.mysql.cluste private DictionaryImpl dictionary; /** The ClusterConnection */ - private ClusterConnection clusterConnection; + private ClusterConnectionImpl clusterConnection; - public DbImpl(ClusterConnection clusterConnection, Ndb ndb, int maxTransactions) { + public DbImpl(ClusterConnectionImpl clusterConnection, Ndb ndb, int maxTransactions) { this.clusterConnection = clusterConnection; this.ndb = ndb; int returnCode = ndb.init(maxTransactions); @@ -103,8 +102,12 @@ class DbImpl implements com.mysql.cluste return dictionary; } + public Dictionary getNdbDictionary() { + return ndbDictionary; + } + public ClusterTransaction startTransaction(String joinTransactionId) { - return new ClusterTransactionImpl(this, ndbDictionary, joinTransactionId); + return new ClusterTransactionImpl(clusterConnection, this, ndbDictionary, joinTransactionId); } protected void handleError(int returnCode, Ndb ndb) { === modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DictionaryImpl.java' --- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DictionaryImpl.java 2011-10-27 23:43:25 +0000 +++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DictionaryImpl.java 2012-01-23 00:44:39 +0000 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -127,4 +127,8 @@ class DictionaryImpl implements com.mysq ndbDictionary.removeCachedTable(tableName); } + public Dictionary getNdbDictionary() { + return ndbDictionary; + } + } === added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java' --- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java 2012-01-23 00:44:39 +0000 @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; version 2 of the License. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +package com.mysql.clusterj.tie; + +import com.mysql.clusterj.core.store.Column; + +import com.mysql.clusterj.core.util.I18NHelper; +import com.mysql.clusterj.core.util.Logger; +import com.mysql.clusterj.core.util.LoggerFactoryService; + +/** + * NdbRecord blob handling defers the acquisition of an NdbBlob until the NdbOperation + * is created. At that time, this implementation will get the NdbBlob from its NdbOperation. + * Operations on the NdbBlob are delegated to the parent (by inheritance). + */ +class NdbRecordBlobImpl extends BlobImpl { + + /** My message translator */ + static final I18NHelper local = I18NHelper + .getInstance(NdbRecordBlobImpl.class); + + /** My logger */ + static final Logger logger = LoggerFactoryService.getFactory() + .getInstance(NdbRecordBlobImpl.class); + + /** The store column for this blob */ + private Column storeColumn; + + /** The operation */ + private NdbRecordOperationImpl operation; + + public NdbRecordBlobImpl(NdbRecordOperationImpl operation, Column storeColumn) { + this.storeColumn = storeColumn; + this.operation = operation; + } + + protected void setNdbBlob() { + this.ndbBlob = operation.getNdbBlob(storeColumn); + } + +} === added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java' --- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java 2012-01-23 00:44:39 +0000 @@ -0,0 +1,375 @@ +/* + Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +package com.mysql.clusterj.tie; + +import java.math.BigDecimal; +import java.math.BigInteger; + +import java.nio.ByteBuffer; + +import java.util.ArrayList; +import java.util.List; + +import com.mysql.clusterj.ClusterJFatalInternalException; +import com.mysql.clusterj.ClusterJFatalUserException; + +import com.mysql.clusterj.core.store.Column; +import com.mysql.clusterj.core.store.Table; +import com.mysql.clusterj.core.util.I18NHelper; +import com.mysql.clusterj.core.util.Logger; +import com.mysql.clusterj.core.util.LoggerFactoryService; + +import com.mysql.clusterj.tie.DbImpl.BufferManager; + +import com.mysql.ndbjtie.ndbapi.NdbRecord; +import com.mysql.ndbjtie.ndbapi.NdbRecordConst; +import com.mysql.ndbjtie.ndbapi.NdbDictionary.ColumnConst; +import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary; +import com.mysql.ndbjtie.ndbapi.NdbDictionary.RecordSpecification; +import com.mysql.ndbjtie.ndbapi.NdbDictionary.RecordSpecificationArray; +import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst; + +/** + * Wrapper around an NdbRecord. The default implementation can be used for insert, + * using an NdbRecord that defines every column in the table. After construction, the instance is + * read-only and can be shared among all threads that use the same cluster connection; and the size of the + * buffer required for operations is available. The NdbRecord instance is released when the cluster + * connection is closed. Column values can be set using a provided + * buffer and buffer manager. + */ +public class NdbRecordImpl { + + /** My message translator */ + static final I18NHelper local = I18NHelper + .getInstance(NdbRecordImpl.class); + + /** My logger */ + static final Logger logger = LoggerFactoryService.getFactory() + .getInstance(NdbRecordImpl.class); + + /** The size of the NdbRecord struct */ + protected final static int SIZEOF_RECORD_SPECIFICATION = ClusterConnectionServiceImpl.SIZEOF_RECORD_SPECIFICATION; + + /** The NdbRecord for this operation */ + private NdbRecord ndbRecord = null; + + /** The store columns for this operation */ + protected List storeColumns = new ArrayList(); + + /** The RecordSpecificationArray used to define the columns in the NdbRecord */ + private RecordSpecificationArray recordSpecificationArray; + + /** The NdbTable */ + TableConst tableConst; + + /** The size of the receive buffer for this operation (may be zero for non-read operations) */ + protected int bufferSize; + + /** The maximum column id for this operation (may be zero for non-read operations) */ + protected int maximumColumnId; + + /** The offsets into the buffer for each column (may be null for non-read operations) */ + protected int[] offsets; + + /** Values for setting column mask and null bit mask */ + protected final static byte[] BIT_IN_BYTE_MASK = new byte[] {1, 2, 4, 8, 16, 32, 64, -128}; + + /** The position in the null indicator for the field */ + protected int nullablePositions[] = null; + + /** The null indicator for the field bit in the byte */ + protected int nullbitBitInByte[] = null; + + /** The null indicator for the field byte offset*/ + protected int nullbitByteOffset[] = null; + + /** The maximum length of any column in this operation */ + protected int maximumColumnLength; + + /** The dictionary used to create (and release) the NdbRecord */ + private Dictionary ndbDictionary; + + /** Number of columns for this NdbRecord */ + private int numberOfColumns; + + /** These fields are only used during construction of the RecordSpecificationArray */ + int offset = 0; + int nullablePosition = 0; + + /** Constructor used for insert operations that do not need to read data. + * + * @param storeTable the store table + * @param ndbDictionary the ndb dictionary + */ + protected NdbRecordImpl(Table storeTable, Dictionary ndbDictionary) { + this.ndbDictionary = ndbDictionary; + this.tableConst = getNdbTable(storeTable.getName()); + this.numberOfColumns = tableConst.getNoOfColumns(); + this.recordSpecificationArray = RecordSpecificationArray.create(numberOfColumns); + this.offsets = new int[numberOfColumns]; + this.nullablePositions = new int[numberOfColumns]; + this.nullbitBitInByte = new int[numberOfColumns]; + this.nullbitByteOffset = new int[numberOfColumns]; + this.ndbRecord = createNdbRecord(storeTable, ndbDictionary); + } + + + public int setBigInteger(ByteBuffer buffer, Column storeColumn, BigInteger value) { + int columnId = storeColumn.getColumnId(); + int newPosition = offsets[columnId]; + buffer.position(newPosition); + ByteBuffer bigIntegerBuffer = Utility.convertValue(storeColumn, value); + buffer.put(bigIntegerBuffer); + return columnId; + } + + public int setByte(ByteBuffer buffer, Column storeColumn, byte value) { + int columnId = storeColumn.getColumnId(); + buffer.put(offsets[columnId], value); + return columnId; + } + + public int setBytes(ByteBuffer buffer, Column storeColumn, byte[] value) { + int columnId = storeColumn.getColumnId(); + int newPosition = offsets[columnId]; + buffer.position(newPosition); + Utility.convertValue(buffer, storeColumn, value); + return columnId; + } + + public int setDecimal(ByteBuffer buffer, Column storeColumn, BigDecimal value) { + int columnId = storeColumn.getColumnId(); + int newPosition = offsets[columnId]; + buffer.position(newPosition); + ByteBuffer decimalBuffer = Utility.convertValue(storeColumn, value); + buffer.put(decimalBuffer); + return columnId; + } + + public int setDouble(ByteBuffer buffer, Column storeColumn, Double value) { + int columnId = storeColumn.getColumnId(); + buffer.putDouble(offsets[columnId], value); + return columnId; + } + + public int setFloat(ByteBuffer buffer, Column storeColumn, Float value) { + int columnId = storeColumn.getColumnId(); + buffer.putFloat(offsets[columnId], value); + return columnId; + } + + public int setInt(ByteBuffer buffer, Column storeColumn, Integer value) { + int columnId = storeColumn.getColumnId(); + buffer.putInt(offsets[columnId], value); + return columnId; + } + + public int setLong(ByteBuffer buffer, Column storeColumn, long value) { + int columnId = storeColumn.getColumnId(); + long storeValue = Utility.convertLongValueForStorage(storeColumn, value); + buffer.putLong(offsets[columnId], storeValue); + return columnId; + } + + public int setNull(ByteBuffer buffer, Column storeColumn) { + int columnId = storeColumn.getColumnId(); + int index = nullbitByteOffset[columnId]; + byte mask = BIT_IN_BYTE_MASK[nullbitBitInByte[columnId]]; + byte nullbyte = buffer.get(index); + buffer.put(index, (byte)(nullbyte|mask)); + return columnId; + } + + public int setShort(ByteBuffer buffer, Column storeColumn, Short value) { + int columnId = storeColumn.getColumnId(); + buffer.putShort(offsets[columnId], value); + return columnId; + } + + public int setString(ByteBuffer buffer, BufferManager bufferManager, Column storeColumn, String value) { + int columnId = storeColumn.getColumnId(); + buffer.position(offsets[columnId]); + // for now, use the encode method to encode the value then copy it + ByteBuffer converted = Utility.encode(value, storeColumn, bufferManager); + buffer.put(converted); + return columnId; + } + + protected static void handleError(Object object, Dictionary ndbDictionary) { + if (object != null) { + return; + } else { + Utility.throwError(null, ndbDictionary.getNdbError()); + } + } + + protected NdbRecord createNdbRecord(Table storeTable, Dictionary ndbDictionary) { + String[] columnNames = storeTable.getColumnNames(); + List align8 = new ArrayList(); + List align4 = new ArrayList(); + List align2 = new ArrayList(); + List align1 = new ArrayList(); + List nullables = new ArrayList(); + for (String columnName: columnNames) { + Column storeColumn = storeTable.getColumn(columnName); + storeColumns.add(storeColumn); + // for each column, put into alignment bucket + switch (storeColumn.getType()) { + case Bigint: + case Bigunsigned: + case Bit: + case Blob: + case Date: + case Datetime: + case Double: + case Text: + case Time: + case Timestamp: + align8.add(storeColumn); + break; + case Binary: + case Char: + case Decimal: + case Decimalunsigned: + case Longvarbinary: + case Longvarchar: + case Olddecimal: + case Olddecimalunsigned: + case Tinyint: + case Tinyunsigned: + case Varbinary: + case Varchar: + align1.add(storeColumn); + break; + case Float: + case Int: + case Mediumint: + case Mediumunsigned: + case Unsigned: + align4.add(storeColumn); + break; + case Smallint: + case Smallunsigned: + case Year: + align2.add(storeColumn); + break; + case Undefined: + throw new ClusterJFatalUserException(local.message("ERR_Unknown_Column_Type", + storeTable.getName(), columnName, storeColumn.getType())); + default: + throw new ClusterJFatalInternalException(local.message("ERR_Unknown_Column_Type", + storeTable.getName(), columnName, storeColumn.getType())); + } + if (storeColumn.getNullable()) { + nullables.add(storeColumn); + } + } + // for each column, allocate space in the buffer, starting with align8 and ending with align1 + // null indicators are allocated first, with one bit per nullable column + // nullable columns take one bit each + offset = nullables.size() + 7 / 8; + // align the first column following the nullable column indicators to 8 + offset = (7 + offset) / 8 * 8; + for (Column storeColumn: align8) { + handleColumn(8, storeColumn); + } + for (Column storeColumn: align4) { + handleColumn(4, storeColumn); + } + for (Column storeColumn: align2) { + handleColumn(2, storeColumn); + } + for (Column storeColumn: align1) { + handleColumn(1, storeColumn); + } + bufferSize = offset; + + // now create an NdbRecord + NdbRecord result = ndbDictionary.createRecord(tableConst, recordSpecificationArray, + numberOfColumns, SIZEOF_RECORD_SPECIFICATION, 0); + // delete the RecordSpecificationArray since it is no longer needed + RecordSpecificationArray.delete(recordSpecificationArray); + handleError(result, ndbDictionary); + return result; + } + + /** Create a record specification for a column. Keep track of the offset into the buffer + * and the null indicator position for each column. + * + * @param alignment the alignment for this column in the buffer + * @param storeColumn the column + */ + private void handleColumn(int alignment, Column storeColumn) { + int columnId = storeColumn.getColumnId(); + RecordSpecification recordSpecification = recordSpecificationArray.at(columnId); + ColumnConst columnConst = tableConst.getColumn(columnId); + recordSpecification.column(columnConst); + recordSpecification.offset(offset); + offsets[columnId] = offset; + int columnSpace = storeColumn.getColumnSpace(); + offset += (columnSpace==0)?8:columnSpace; + if (storeColumn.getNullable()) { + nullablePositions[columnId] = nullablePosition++; + int nullbitByteOffsetValue = nullablePosition/8; + int nullbitBitInByteValue = nullablePosition - nullablePosition / 8 * 8; + nullbitBitInByte[columnId] = nullbitBitInByteValue; + nullbitByteOffset[columnId] = nullbitByteOffsetValue; + recordSpecification.nullbit_byte_offset(nullbitByteOffsetValue); + recordSpecification.nullbit_bit_in_byte(nullbitBitInByteValue); + } else { + recordSpecification.nullbit_byte_offset(0); + recordSpecification.nullbit_bit_in_byte(0); + } + if (logger.isDetailEnabled()) logger.detail( + "column: " + storeColumn.getName() + + " columnSpace: " + columnSpace + + " offset: " + offsets[columnId] + + " nullable position: " + nullablePositions[columnId] + + " nullbitByteOffset: " + nullbitByteOffset[columnId] + + " nullbitBitInByte: " + nullbitBitInByte[columnId]); + } + + TableConst getNdbTable(String tableName) { + TableConst ndbTable = ndbDictionary.getTable(tableName); + if (ndbTable == null) { + // try the lower case table name + ndbTable = ndbDictionary.getTable(tableName.toLowerCase()); + } + return ndbTable; + } + + public int getBufferSize() { + return bufferSize; + } + + public NdbRecordConst getNdbRecord() { + return ndbRecord; + } + + public int getNumberOfColumns() { + return numberOfColumns; + } + + protected void releaseNdbRecord() { + if (ndbRecord != null) { + ndbDictionary.releaseRecord(ndbRecord); + ndbRecord = null; + } + } + +} === added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java' --- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java 2012-01-23 00:44:39 +0000 @@ -0,0 +1,318 @@ +/* + Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + */ + +package com.mysql.clusterj.tie; + +import java.math.BigDecimal; +import java.math.BigInteger; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import java.util.ArrayList; +import java.util.List; + +import com.mysql.clusterj.ClusterJFatalInternalException; + +import com.mysql.clusterj.core.store.Blob; +import com.mysql.clusterj.core.store.Column; +import com.mysql.clusterj.core.store.Operation; +import com.mysql.clusterj.core.store.ResultData; +import com.mysql.clusterj.core.store.Table; +import com.mysql.clusterj.core.util.I18NHelper; +import com.mysql.clusterj.core.util.Logger; +import com.mysql.clusterj.core.util.LoggerFactoryService; + +import com.mysql.clusterj.tie.DbImpl.BufferManager; + +import com.mysql.ndbjtie.ndbapi.NdbBlob; +import com.mysql.ndbjtie.ndbapi.NdbOperationConst; +import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary; + +/** + * Implementation of store operation that uses NdbRecord. + */ +class NdbRecordOperationImpl implements Operation { + + /** My message translator */ + static final I18NHelper local = I18NHelper + .getInstance(NdbRecordOperationImpl.class); + + /** My logger */ + static final Logger logger = LoggerFactoryService.getFactory() + .getInstance(NdbRecordOperationImpl.class); + + /** The ClusterTransaction that this operation belongs to */ + protected ClusterTransactionImpl clusterTransaction; + + /** The NdbOperation wrapped by this object */ + private NdbOperationConst ndbOperation = null; + + /** The NdbRecord for this operation */ + private NdbRecordImpl ndbRecordImpl = null; + + /** The mask for this operation, which contains a bit set for each column to be inserted */ + byte[] mask; + + /** The ByteBuffer containing all of the data */ + ByteBuffer buffer = null; + + /** Blobs for this NdbRecord */ + protected NdbRecordBlobImpl[] blobs = null; + + /** Blobs that have been accessed for this operation */ + protected List activeBlobs = new ArrayList(); + + /** The size of the receive buffer for this operation (may be zero for non-read operations) */ + protected int bufferSize; + + /** The number of columns for this operation */ + protected int numberOfColumns; + + protected BufferManager bufferManager; + + /** Constructor used for insert and delete operations that do not need to read data. + * + * @param clusterTransaction the cluster transaction + * @param transaction the ndb transaction + * @param storeTable the store table + */ + public NdbRecordOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable) { + this.ndbRecordImpl = clusterTransaction.getCachedNdbRecordImpl(storeTable); + this.bufferSize = ndbRecordImpl.getBufferSize(); + this.numberOfColumns = ndbRecordImpl.getNumberOfColumns(); + this.blobs = new NdbRecordBlobImpl[this.numberOfColumns]; + this.clusterTransaction = clusterTransaction; + this.bufferManager = clusterTransaction.getBufferManager(); + } + + public void equalBigInteger(Column storeColumn, BigInteger value) { + setBigInteger(storeColumn, value); + } + + public void equalBoolean(Column storeColumn, boolean booleanValue) { + setBoolean(storeColumn, booleanValue); + } + + public void equalByte(Column storeColumn, byte value) { + setByte(storeColumn, value); + } + + public void equalBytes(Column storeColumn, byte[] value) { + setBytes(storeColumn, value); + } + + public void equalDecimal(Column storeColumn, BigDecimal value) { + setDecimal(storeColumn, value); + } + + public void equalDouble(Column storeColumn, double value) { + setDouble(storeColumn, value); + } + + public void equalFloat(Column storeColumn, float value) { + setFloat(storeColumn, value); + } + + public void equalInt(Column storeColumn, int value) { + setInt(storeColumn, value); + } + + public void equalShort(Column storeColumn, short value) { + setShort(storeColumn, value); + } + + public void equalLong(Column storeColumn, long value) { + setLong(storeColumn, value); + } + + public void equalString(Column storeColumn, String value) { + setString(storeColumn, value); + } + + public void getBlob(Column storeColumn) { + throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented", "getBlob")); + } + + /** + * Get the blob handle for this column. The same column will return the same blob handle + * regardless of how many times it is called. + * @param storeColumn the store column + * @return the blob handle + */ + public Blob getBlobHandle(Column storeColumn) { + int columnId = storeColumn.getColumnId(); + NdbRecordBlobImpl result = blobs[columnId]; + if (result == null) { + columnSet(columnId); + result = new NdbRecordBlobImpl(this, storeColumn); + blobs[columnId] = result; + activeBlobs.add(result); + } + return result; + } + + /** Specify the columns to be used for the operation. + */ + public void getValue(Column storeColumn) { + throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented", "getValue")); + } + + public void postExecuteCallback(Runnable callback) { + clusterTransaction.postExecuteCallback(callback); + } + + /** Construct a new ResultData using the saved column data and then execute the operation. + */ + public ResultData resultData() { + throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented", "resultData")); + } + + /** Construct a new ResultData and if requested, execute the operation. + */ + public ResultData resultData(boolean execute) { + throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented", "resultData")); + } + + public void setBigInteger(Column storeColumn, BigInteger value) { + int columnId = ndbRecordImpl.setBigInteger(buffer, storeColumn, value); + columnSet(columnId); + } + + public void setBoolean(Column storeColumn, Boolean booleanValue) { + byte value = (booleanValue?(byte)0x01:(byte)0x00); + setByte(storeColumn, value); + } + + public void setByte(Column storeColumn, byte value) { + int columnId = ndbRecordImpl.setByte(buffer, storeColumn, value); + columnSet(columnId); + } + + public void setBytes(Column storeColumn, byte[] value) { + int columnId = ndbRecordImpl.setBytes(buffer, storeColumn, value); + columnSet(columnId); + } + + public void setDecimal(Column storeColumn, BigDecimal value) { + int columnId = ndbRecordImpl.setDecimal(buffer, storeColumn, value); + columnSet(columnId); + } + + public void setDouble(Column storeColumn, Double value) { + int columnId = ndbRecordImpl.setDouble(buffer, storeColumn, value); + columnSet(columnId); + } + + public void setFloat(Column storeColumn, Float value) { + int columnId = ndbRecordImpl.setFloat(buffer, storeColumn, value); + columnSet(columnId); + } + + public void setInt(Column storeColumn, Integer value) { + int columnId = ndbRecordImpl.setInt(buffer, storeColumn, value); + columnSet(columnId); + } + + public void setLong(Column storeColumn, long value) { + int columnId = ndbRecordImpl.setLong(buffer, storeColumn, value); + columnSet(columnId); + } + + public void setNull(Column storeColumn) { + int columnId = ndbRecordImpl.setNull(buffer, storeColumn); + columnSet(columnId); + } + + public void setShort(Column storeColumn, Short value) { + int columnId = ndbRecordImpl.setShort(buffer, storeColumn, value); + columnSet(columnId); + } + + public void setString(Column storeColumn, String value) { + int columnId = ndbRecordImpl.setString(buffer, bufferManager, storeColumn, value); + columnSet(columnId); + } + + public int errorCode() { + return ndbOperation.getNdbError().code(); + } + + protected static void handleError(int returnCode, NdbOperationConst ndbOperation2) { + if (returnCode == 0) { + return; + } else { + Utility.throwError(returnCode, ndbOperation2.getNdbError()); + } + } + + protected static void handleError(Object object, NdbOperationConst ndbOperation) { + if (object != null) { + return; + } else { + Utility.throwError(null, ndbOperation.getNdbError()); + } + } + + protected static void handleError(Object object, Dictionary ndbDictionary) { + if (object != null) { + return; + } else { + Utility.throwError(null, ndbDictionary.getNdbError()); + } + } + + public void beginDefinition() { + // allocate a buffer for the operation data + buffer = ByteBuffer.allocateDirect(bufferSize); + // use platform's native byte ordering + buffer.order(ByteOrder.nativeOrder()); + mask = new byte[1 + (numberOfColumns/8)]; + } + + public void endDefinition() { + // create the insert operation + buffer.position(0); + buffer.limit(bufferSize); + // create the insert operation + ndbOperation = clusterTransaction.insertTuple(ndbRecordImpl.getNdbRecord(), buffer, mask, null); + // now set the NdbBlob into the blobs + for (NdbRecordBlobImpl blob: activeBlobs) { + if (blob != null) { + blob.setNdbBlob(); + } + } + } + + public NdbBlob getNdbBlob(Column storeColumn) { + NdbBlob result = ndbOperation.getBlobHandle(storeColumn.getColumnId()); + handleError(result, ndbOperation); + return result; + } + + /** + * Set this column into the mask for NdbRecord operation. + * @param columnId the column id + */ + private void columnSet(int columnId) { + int byteOffset = columnId / 8; + int bitInByte = columnId - (byteOffset * 8); + mask[byteOffset] |= NdbRecordImpl.BIT_IN_BYTE_MASK[bitInByte]; + + } + +} === modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java' --- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java 2011-02-03 14:37:50 +0000 +++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java 2012-01-23 00:44:39 +0000 @@ -1,5 +1,5 @@ /* - Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved. + Copyright (c) 2009, 2012, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -263,7 +263,7 @@ class OperationImpl implements Operation public void setLong(Column storeColumn, long value) { long storeValue = Utility.convertLongValueForStorage(storeColumn, value); - int returnCode = ndbOperation.setValue(storeColumn.getName(), storeValue); + int returnCode = ndbOperation.setValue(storeColumn.getColumnId(), storeValue); handleError(returnCode, ndbOperation); } @@ -305,4 +305,12 @@ class OperationImpl implements Operation } } + public void beginDefinition() { + // nothing to do + } + + public void endDefinition() { + // nothing to do + } + } === modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java' --- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java 2011-07-05 06:00:46 +0000 +++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java 2012-01-23 00:44:39 +0000 @@ -1,5 +1,5 @@ /* - * Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by @@ -792,9 +792,22 @@ public class Utility { * @return the ByteBuffer */ public static ByteBuffer convertValue(Column storeColumn, byte[] value) { + int requiredLength = storeColumn.getColumnSpace(); + ByteBuffer result = ByteBuffer.allocateDirect(requiredLength); + convertValue(result, storeColumn, value); + result.flip(); + return result; + } + + /** Convert the parameter value and store it in a given ByteBuffer that can be passed to ndbjtie. + * + * @param buffer the buffer, positioned at the location to store the value + * @param storeColumn the column definition + * @param value the value to be converted + */ + public static void convertValue(ByteBuffer buffer, Column storeColumn, byte[] value) { int dataLength = value.length; int prefixLength = storeColumn.getPrefixLength(); - ByteBuffer result; switch (prefixLength) { case 0: int requiredLength = storeColumn.getColumnSpace(); @@ -803,12 +816,10 @@ public class Utility { local.message("ERR_Data_Too_Long", storeColumn.getName(), requiredLength, dataLength)); } else { - result = ByteBuffer.allocateDirect(requiredLength); - result.order(ByteOrder.nativeOrder()); - result.put(value); + buffer.put(value); if (dataLength < requiredLength) { // pad with 0x00 on right - result.put(ZERO_PAD, 0, requiredLength - dataLength); + buffer.put(ZERO_PAD, 0, requiredLength - dataLength); } } break; @@ -818,10 +829,8 @@ public class Utility { local.message("ERR_Data_Too_Long", storeColumn.getName(), "255", dataLength)); } - result = ByteBuffer.allocateDirect(prefixLength + dataLength); - result.order(ByteOrder.nativeOrder()); - result.put((byte)dataLength); - result.put(value); + buffer.put((byte)dataLength); + buffer.put(value); break; case 2: if (dataLength > 8000) { @@ -829,19 +838,15 @@ public class Utility { local.message("ERR_Data_Too_Long", storeColumn.getName(), "8000", dataLength)); } - result = ByteBuffer.allocateDirect(prefixLength + dataLength); - result.order(ByteOrder.nativeOrder()); - result.put((byte)(dataLength%256)); - result.put((byte)(dataLength/256)); - result.put(value); + buffer.put((byte)(dataLength%256)); + buffer.put((byte)(dataLength/256)); + buffer.put(value); break; default: throw new ClusterJFatalInternalException( local.message("ERR_Unknown_Prefix_Length", prefixLength, storeColumn.getName())); } - result.flip(); - return result; } /** Convert a BigDecimal value to the binary decimal form used by MySQL. === modified file 'storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties' --- a/storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties 2011-06-29 18:02:39 +0000 +++ b/storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties 2012-01-23 00:44:39 +0000 @@ -28,6 +28,7 @@ ERR_Cannot_Create_Cluster_Connection:Can ERR_Duplicate_NdbRecAttr_In_List:Duplicate NdbRecAttr for column {0} column id {1}. ERR_No_Operation_In_Result:There is no ndbOperation in the ResultData. ERR_Not_Implemented:Not implemented +ERR_Method_Not_Implemented:Not implemented: {0}. ERR_NdbJTie:Error in NdbJTie: returnCode {0}, code {1}, mysqlCode {2}, \ status {3}, classification {4}, message {5} {6}. ERR_Invalid_Prefix_Length:The prefix length {0} is invalid. No bundle (reason: useless for push emails).