4429 Craig L Russell 2012-01-22
Add NdbRecord support for clusterj.
This implementation supports only insert.
The original implementation using NdbOperation is still available by setting the variable
com.mysql.clusterj.UseNdbRecord to a value other than "true" via -D option or environment.
added:
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java
modified:
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java
storage/ndb/clusterj/clusterj-tie/Makefile.am
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DictionaryImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java
storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties
4428 Craig L Russell 2012-01-20
clean up after test
modified:
mysql-test/suite/ndb/t/clusterj.test
=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java 2011-11-22 22:01:23 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java 2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
+ Copyright (c) 2009, 2012, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -392,8 +392,10 @@ public class SessionImpl implements Sess
storeTable = domainTypeHandler.getStoreTable();
op = clusterTransaction.getInsertOperation(storeTable);
// set all values in the operation, keys first
+ op.beginDefinition();
domainTypeHandler.operationSetKeys(valueHandler, op);
domainTypeHandler.operationSetModifiedNonPKValues(valueHandler, op);
+ op.endDefinition();
// reset modified bits in instance
domainTypeHandler.objectResetModified(valueHandler);
} catch (ClusterJUserException cjuex) {
=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java 2011-02-03 14:37:50 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/store/Operation.java 2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
+ Copyright (c) 2009, 2012, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -85,4 +85,8 @@ public interface Operation {
public void setString(Column storeColumn, String string);
+ public void beginDefinition();
+
+ public void endDefinition();
+
}
=== modified file 'storage/ndb/clusterj/clusterj-tie/Makefile.am'
--- a/storage/ndb/clusterj/clusterj-tie/Makefile.am 2011-12-14 19:26:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/Makefile.am 2012-01-23 00:44:39 +0000
@@ -1,4 +1,4 @@
-# Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
+# Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -38,6 +38,9 @@ clusterj_tie_java= \
$(clusterj_tie_src)/com/mysql/clusterj/tie/IndexImpl.java \
$(clusterj_tie_src)/com/mysql/clusterj/tie/IndexOperationImpl.java \
$(clusterj_tie_src)/com/mysql/clusterj/tie/IndexScanOperationImpl.java \
+ $(clusterj_tie_src)/com/mysql/clusterj/tie/NdbRecordBlobImpl.java \
+ $(clusterj_tie_src)/com/mysql/clusterj/tie/NdbRecordImpl.java \
+ $(clusterj_tie_src)/com/mysql/clusterj/tie/NdbRecordOperationImpl.java \
$(clusterj_tie_src)/com/mysql/clusterj/tie/OperationImpl.java \
$(clusterj_tie_src)/com/mysql/clusterj/tie/ResultDataImpl.java \
$(clusterj_tie_src)/com/mysql/clusterj/tie/ScanFilterImpl.java \
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java 2011-06-30 16:04:23 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/BlobImpl.java 2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -42,7 +42,11 @@ class BlobImpl implements Blob {
static final Logger logger = LoggerFactoryService.getFactory()
.getInstance(BlobImpl.class);
- private NdbBlob ndbBlob;
+ protected NdbBlob ndbBlob;
+
+ public BlobImpl() {
+ // this is only for NdbRecordBlobImpl constructor when there is no ndbBlob available yet
+ }
public BlobImpl(NdbBlob blob) {
this.ndbBlob = blob;
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java 2011-12-18 11:47:59 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java 2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -20,13 +20,18 @@ package com.mysql.clusterj.tie;
import java.util.IdentityHashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
import com.mysql.ndbjtie.ndbapi.Ndb;
import com.mysql.ndbjtie.ndbapi.Ndb_cluster_connection;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
import com.mysql.clusterj.ClusterJDatastoreException;
import com.mysql.clusterj.ClusterJFatalInternalException;
import com.mysql.clusterj.core.store.Db;
+import com.mysql.clusterj.core.store.Table;
import com.mysql.clusterj.core.util.I18NHelper;
import com.mysql.clusterj.core.util.Logger;
@@ -57,6 +62,12 @@ public class ClusterConnectionImpl
/** All dbs given out by this cluster connection */
private Map<DbImpl, Object> dbs = new IdentityHashMap<DbImpl, Object>();
+ /** The map of table name to NdbRecordImpl */
+ private ConcurrentMap<String, NdbRecordImpl> ndbRecordImplMap = new ConcurrentHashMap<String, NdbRecordImpl>();
+
+ /** The dictionary used to create NdbRecords */
+ Dictionary dictionaryForNdbRecord = null;
+
/** Connect to the MySQL Cluster
*
* @param connectString the connect string
@@ -83,6 +94,14 @@ public class ClusterConnectionImpl
synchronized(this) {
ndb = Ndb.create(clusterConnection, database, "def");
handleError(ndb, clusterConnection, connectString, nodeId);
+ if (dictionaryForNdbRecord == null) {
+ // create a dictionary for NdbRecord
+ Ndb ndbForNdbRecord = Ndb.create(clusterConnection, database, "def");
+ handleError(ndbForNdbRecord, clusterConnection, connectString, nodeId);
+ DbImpl dbForNdbRecord = new DbImpl(this, ndbForNdbRecord, maxTransactions);
+ dbs.put(dbForNdbRecord, null);
+ dictionaryForNdbRecord = dbForNdbRecord.getNdbDictionary();
+ }
}
DbImpl result = new DbImpl(this, ndb, maxTransactions);
dbs.put(result, null);
@@ -143,6 +162,16 @@ public class ClusterConnectionImpl
public void close() {
if (clusterConnection != null) {
logger.info(local.message("INFO_Close_Cluster_Connection", connectString, nodeId));
+ for (NdbRecordImpl ndbRecord: ndbRecordImplMap.values()) {
+ ndbRecord.releaseNdbRecord();
+ }
+ ndbRecordImplMap.clear();
+ if (dbs.size() != 0) {
+ Map<DbImpl, Object> dbsToClose = new IdentityHashMap<DbImpl, Object>(dbs);
+ for (Db db: dbsToClose.keySet()) {
+ db.close();
+ }
+ }
Ndb_cluster_connection.delete(clusterConnection);
clusterConnection = null;
}
@@ -153,7 +182,43 @@ public class ClusterConnectionImpl
}
public int dbCount() {
- return dbs.size();
+ // one of the dbs is for the NdbRecord dictionary if it is not null
+ int dbForNdbRecord = (dictionaryForNdbRecord == null)?0:1;
+ return dbs.size() - dbForNdbRecord;
+ }
+
+ /**
+ * Get the cached NdbRecord implementation for this cluster connection.
+ * There are three possibilities:
+ * <ul><li>Case 1: return the already-cached NdbRecord
+ * </li><li>Case 2: return a new instance created by this method
+ * </li><li>Case 3: return the winner of a race with another thread
+ * </li></ul>
+ * @param storeTable the store table
+ * @param ndbDictionary the ndb dictionary
+ * @return the NdbRecordImpl
+ */
+ protected NdbRecordImpl getCachedNdbRecordImpl(Table storeTable) {
+ String tableName = storeTable.getName();
+ // find the NdbRecordImpl in the global cache
+ NdbRecordImpl result = ndbRecordImplMap.get(tableName);
+ if (result != null) {
+ // case 1
+ return result;
+ } else {
+ NdbRecordImpl newNdbRecordImpl = new NdbRecordImpl(storeTable, dictionaryForNdbRecord);
+ NdbRecordImpl winner = ndbRecordImplMap.putIfAbsent(tableName, newNdbRecordImpl);
+ if (winner == null) {
+ // case 2: the previous value was null, so return the new (winning) value
+ if (logger.isDebugEnabled())logger.debug("NdbRecordImpl created for " + tableName);
+ return newNdbRecordImpl;
+ } else {
+ // case 3: another thread beat us, so return the winner and garbage collect ours
+ if (logger.isDebugEnabled())logger.debug("NdbRecordImpl lost race for " + tableName);
+ newNdbRecordImpl.releaseNdbRecord();
+ return winner;
+ }
+ }
}
}
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java 2011-12-18 11:47:59 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionServiceImpl.java 2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -22,6 +22,10 @@ import com.mysql.clusterj.core.store.Clu
import com.mysql.clusterj.core.util.I18NHelper;
import com.mysql.clusterj.core.util.Logger;
import com.mysql.clusterj.core.util.LoggerFactoryService;
+import com.mysql.ndbjtie.ndbapi.Ndb;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary;
+import com.mysql.ndbjtie.ndbapi.NdbOperation;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
/**
*
@@ -43,6 +47,18 @@ public class ClusterConnectionServiceImp
/** Load the ndbclient system library only once */
static boolean ndbclientLoaded = false;
+ /** Size of OperationOptions, needed for some ndb apis */
+ static int SIZEOF_OPERATION_OPTIONS;
+
+ /** Size of PartitionSpec, needed for some ndb apis */
+ static int SIZEOF_PARTITION_SPEC;
+
+ /** Size of RecordSpecification, needed for some ndb apis */
+ static int SIZEOF_RECORD_SPECIFICATION;
+
+ /** Size of ScanOptions, needed for some ndb apis */
+ static int SIZEOF_SCAN_OPTIONS;
+
static protected void loadSystemLibrary(String name) {
// this is not thread-protected so it might be executed multiple times but no matter
if (ndbclientLoaded) {
@@ -52,6 +68,11 @@ public class ClusterConnectionServiceImp
System.loadLibrary(name);
// initialize the charset map
Utility.getCharsetMap();
+ // get the size information for Ndb structs as needed by some ndb apis
+ SIZEOF_OPERATION_OPTIONS = NdbOperation.OperationOptions.size();
+ SIZEOF_PARTITION_SPEC = Ndb.PartitionSpec.size();
+ SIZEOF_RECORD_SPECIFICATION = NdbDictionary.RecordSpecification.size();
+ SIZEOF_SCAN_OPTIONS = NdbScanOperation.ScanOptions.size();
ndbclientLoaded = true;
} catch (Throwable e) {
String path = getLoadLibraryPath();
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java 2011-12-05 22:07:02 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java 2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2009, 2012, Oracle and/or its affiliates. All rights reserved.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -17,6 +17,7 @@
package com.mysql.clusterj.tie;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -42,11 +43,14 @@ import com.mysql.ndbjtie.ndbapi.NdbError
import com.mysql.ndbjtie.ndbapi.NdbIndexOperation;
import com.mysql.ndbjtie.ndbapi.NdbIndexScanOperation;
import com.mysql.ndbjtie.ndbapi.NdbOperation;
+import com.mysql.ndbjtie.ndbapi.NdbOperationConst;
+import com.mysql.ndbjtie.ndbapi.NdbRecordConst;
import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
import com.mysql.ndbjtie.ndbapi.NdbTransaction;
import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
import com.mysql.ndbjtie.ndbapi.NdbDictionary.IndexConst;
import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst;
+import com.mysql.ndbjtie.ndbapi.NdbOperation.OperationOptionsConst;
import com.mysql.ndbjtie.ndbapi.NdbOperationConst.AbortOption;
import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanFlag;
@@ -63,9 +67,15 @@ class ClusterTransactionImpl implements
static final Logger logger = LoggerFactoryService.getFactory()
.getInstance(ClusterTransactionImpl.class);
+ protected final static String USE_NDBRECORD_NAME = "com.mysql.clusterj.UseNdbRecord";
+ private static boolean USE_NDBRECORD = getUseNdbRecord();
+
protected NdbTransaction ndbTransaction;
private List<Runnable> postExecuteCallbacks = new ArrayList<Runnable>();
+ /** The cluster connection for this transaction */
+ protected ClusterConnectionImpl clusterConnectionImpl;
+
/** The DbImpl associated with this NdbTransaction */
protected DbImpl db;
@@ -104,8 +114,10 @@ class ClusterTransactionImpl implements
private BufferManager bufferManager;
- public ClusterTransactionImpl(DbImpl db, Dictionary ndbDictionary, String joinTransactionId) {
+ public ClusterTransactionImpl(ClusterConnectionImpl clusterConnectionImpl,
+ DbImpl db, Dictionary ndbDictionary, String joinTransactionId) {
this.db = db;
+ this.clusterConnectionImpl = clusterConnectionImpl;
this.ndbDictionary = ndbDictionary;
this.joinTransactionId = joinTransactionId;
this.bufferManager = db.getBufferManager();
@@ -209,13 +221,16 @@ class ClusterTransactionImpl implements
public Operation getInsertOperation(Table storeTable) {
enlist();
+ if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
+ if (USE_NDBRECORD) {
+ return new NdbRecordOperationImpl(this, storeTable);
+ }
TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
handleError(ndbTable, ndbDictionary);
NdbOperation ndbOperation = ndbTransaction.getNdbOperation(ndbTable);
handleError(ndbOperation, ndbTransaction);
int returnCode = ndbOperation.insertTuple();
handleError(returnCode, ndbTransaction);
- if (logger.isTraceEnabled()) logger.trace("Table: " + storeTable.getName());
return new OperationImpl(ndbOperation, this);
}
@@ -373,6 +388,22 @@ class ClusterTransactionImpl implements
return new OperationImpl(storeTable, ndbOperation, this);
}
+ /** Create an NdbOperation for insert using NdbRecord.
+ *
+ * @param ndbRecord the NdbRecord
+ * @param buffer the buffer with data for the operation
+ * @param mask the mask of column values already set in the buffer
+ * @param options the OperationOptions for this operation
+ * @param i
+ * @return
+ */
+ public NdbOperationConst insertTuple(NdbRecordConst ndbRecord,
+ ByteBuffer buffer, byte[] mask, OperationOptionsConst options) {
+ NdbOperationConst operation = ndbTransaction.insertTuple(ndbRecord, buffer, mask, options, 0);
+ handleError(operation, ndbTransaction);
+ return operation;
+ }
+
public void postExecuteCallback(Runnable callback) {
postExecuteCallbacks.add(callback);
}
@@ -520,4 +551,21 @@ class ClusterTransactionImpl implements
return bufferManager;
}
+ protected NdbRecordImpl getCachedNdbRecordImpl(Table storeTable) {
+ return clusterConnectionImpl.getCachedNdbRecordImpl(storeTable);
+ }
+
+ /** Get the UseNdbRecord property from either the environment or system properties.
+ *
+ * @return the system property if it is set via -D or the system environment
+ */
+ protected static boolean getUseNdbRecord() {
+ String useNdbRecordENV = System.getenv(USE_NDBRECORD_NAME);
+ // system property overrides environment variable
+ boolean result = System.getProperty(USE_NDBRECORD_NAME, useNdbRecordENV==null?"true":useNdbRecordENV)
+ .equals("true")?true:false;
+ logger.info("useNdbRecordENV: " + useNdbRecordENV + " UseNdbRecord: " + result);
+ return result;
+ }
+
}
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java 2011-03-23 22:41:01 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ColumnImpl.java 2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -209,7 +209,7 @@ class ColumnImpl implements Column {
break;
case ColumnConst.Type.Timestamp:
this.prefixLength = 0;
- this.columnSpace = 4;
+ this.columnSpace = 0;
break;
default: throw new ClusterJFatalInternalException(
local.message("ERR_Unknown_Column_Type",
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java 2011-12-05 22:07:02 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java 2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -32,7 +32,6 @@ import com.mysql.ndbjtie.ndbapi.NdbDicti
import com.mysql.clusterj.ClusterJDatastoreException;
import com.mysql.clusterj.ClusterJFatalInternalException;
-import com.mysql.clusterj.core.store.ClusterConnection;
import com.mysql.clusterj.core.store.ClusterTransaction;
import com.mysql.clusterj.core.util.I18NHelper;
@@ -82,9 +81,9 @@ class DbImpl implements com.mysql.cluste
private DictionaryImpl dictionary;
/** The ClusterConnection */
- private ClusterConnection clusterConnection;
+ private ClusterConnectionImpl clusterConnection;
- public DbImpl(ClusterConnection clusterConnection, Ndb ndb, int maxTransactions) {
+ public DbImpl(ClusterConnectionImpl clusterConnection, Ndb ndb, int maxTransactions) {
this.clusterConnection = clusterConnection;
this.ndb = ndb;
int returnCode = ndb.init(maxTransactions);
@@ -103,8 +102,12 @@ class DbImpl implements com.mysql.cluste
return dictionary;
}
+ public Dictionary getNdbDictionary() {
+ return ndbDictionary;
+ }
+
public ClusterTransaction startTransaction(String joinTransactionId) {
- return new ClusterTransactionImpl(this, ndbDictionary, joinTransactionId);
+ return new ClusterTransactionImpl(clusterConnection, this, ndbDictionary, joinTransactionId);
}
protected void handleError(int returnCode, Ndb ndb) {
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DictionaryImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DictionaryImpl.java 2011-10-27 23:43:25 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DictionaryImpl.java 2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -127,4 +127,8 @@ class DictionaryImpl implements com.mysq
ndbDictionary.removeCachedTable(tableName);
}
+ public Dictionary getNdbDictionary() {
+ return ndbDictionary;
+ }
+
}
=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordBlobImpl.java 2012-01-23 00:44:39 +0000
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import com.mysql.clusterj.core.store.Column;
+
+import com.mysql.clusterj.core.util.I18NHelper;
+import com.mysql.clusterj.core.util.Logger;
+import com.mysql.clusterj.core.util.LoggerFactoryService;
+
+/**
+ * NdbRecord blob handling defers the acquisition of an NdbBlob until the NdbOperation
+ * is created. At that time, this implementation will get the NdbBlob from its NdbOperation.
+ * Operations on the NdbBlob are delegated to the parent (by inheritance).
+ */
+class NdbRecordBlobImpl extends BlobImpl {
+
+ /** My message translator */
+ static final I18NHelper local = I18NHelper
+ .getInstance(NdbRecordBlobImpl.class);
+
+ /** My logger */
+ static final Logger logger = LoggerFactoryService.getFactory()
+ .getInstance(NdbRecordBlobImpl.class);
+
+ /** The store column for this blob */
+ private Column storeColumn;
+
+ /** The operation */
+ private NdbRecordOperationImpl operation;
+
+ public NdbRecordBlobImpl(NdbRecordOperationImpl operation, Column storeColumn) {
+ this.storeColumn = storeColumn;
+ this.operation = operation;
+ }
+
+ protected void setNdbBlob() {
+ this.ndbBlob = operation.getNdbBlob(storeColumn);
+ }
+
+}
=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java 2012-01-23 00:44:39 +0000
@@ -0,0 +1,375 @@
+/*
+ Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import java.nio.ByteBuffer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.mysql.clusterj.ClusterJFatalInternalException;
+import com.mysql.clusterj.ClusterJFatalUserException;
+
+import com.mysql.clusterj.core.store.Column;
+import com.mysql.clusterj.core.store.Table;
+import com.mysql.clusterj.core.util.I18NHelper;
+import com.mysql.clusterj.core.util.Logger;
+import com.mysql.clusterj.core.util.LoggerFactoryService;
+
+import com.mysql.clusterj.tie.DbImpl.BufferManager;
+
+import com.mysql.ndbjtie.ndbapi.NdbRecord;
+import com.mysql.ndbjtie.ndbapi.NdbRecordConst;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.ColumnConst;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.RecordSpecification;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.RecordSpecificationArray;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst;
+
+/**
+ * Wrapper around an NdbRecord. The default implementation can be used for insert,
+ * using an NdbRecord that defines every column in the table. After construction, the instance is
+ * read-only and can be shared among all threads that use the same cluster connection; and the size of the
+ * buffer required for operations is available. The NdbRecord instance is released when the cluster
+ * connection is closed. Column values can be set using a provided
+ * buffer and buffer manager.
+ */
+public class NdbRecordImpl {
+
+ /** My message translator */
+ static final I18NHelper local = I18NHelper
+ .getInstance(NdbRecordImpl.class);
+
+ /** My logger */
+ static final Logger logger = LoggerFactoryService.getFactory()
+ .getInstance(NdbRecordImpl.class);
+
+ /** The size of the NdbRecord struct */
+ protected final static int SIZEOF_RECORD_SPECIFICATION = ClusterConnectionServiceImpl.SIZEOF_RECORD_SPECIFICATION;
+
+ /** The NdbRecord for this operation */
+ private NdbRecord ndbRecord = null;
+
+ /** The store columns for this operation */
+ protected List<Column> storeColumns = new ArrayList<Column>();
+
+ /** The RecordSpecificationArray used to define the columns in the NdbRecord */
+ private RecordSpecificationArray recordSpecificationArray;
+
+ /** The NdbTable */
+ TableConst tableConst;
+
+ /** The size of the receive buffer for this operation (may be zero for non-read operations) */
+ protected int bufferSize;
+
+ /** The maximum column id for this operation (may be zero for non-read operations) */
+ protected int maximumColumnId;
+
+ /** The offsets into the buffer for each column (may be null for non-read operations) */
+ protected int[] offsets;
+
+ /** Values for setting column mask and null bit mask */
+ protected final static byte[] BIT_IN_BYTE_MASK = new byte[] {1, 2, 4, 8, 16, 32, 64, -128};
+
+ /** The position in the null indicator for the field */
+ protected int nullablePositions[] = null;
+
+ /** The null indicator for the field bit in the byte */
+ protected int nullbitBitInByte[] = null;
+
+ /** The null indicator for the field byte offset*/
+ protected int nullbitByteOffset[] = null;
+
+ /** The maximum length of any column in this operation */
+ protected int maximumColumnLength;
+
+ /** The dictionary used to create (and release) the NdbRecord */
+ private Dictionary ndbDictionary;
+
+ /** Number of columns for this NdbRecord */
+ private int numberOfColumns;
+
+ /** These fields are only used during construction of the RecordSpecificationArray */
+ int offset = 0;
+ int nullablePosition = 0;
+
+ /** Constructor used for insert operations that do not need to read data.
+ *
+ * @param storeTable the store table
+ * @param ndbDictionary the ndb dictionary
+ */
+ protected NdbRecordImpl(Table storeTable, Dictionary ndbDictionary) {
+ this.ndbDictionary = ndbDictionary;
+ this.tableConst = getNdbTable(storeTable.getName());
+ this.numberOfColumns = tableConst.getNoOfColumns();
+ this.recordSpecificationArray = RecordSpecificationArray.create(numberOfColumns);
+ this.offsets = new int[numberOfColumns];
+ this.nullablePositions = new int[numberOfColumns];
+ this.nullbitBitInByte = new int[numberOfColumns];
+ this.nullbitByteOffset = new int[numberOfColumns];
+ this.ndbRecord = createNdbRecord(storeTable, ndbDictionary);
+ }
+
+
+ public int setBigInteger(ByteBuffer buffer, Column storeColumn, BigInteger value) {
+ int columnId = storeColumn.getColumnId();
+ int newPosition = offsets[columnId];
+ buffer.position(newPosition);
+ ByteBuffer bigIntegerBuffer = Utility.convertValue(storeColumn, value);
+ buffer.put(bigIntegerBuffer);
+ return columnId;
+ }
+
+ public int setByte(ByteBuffer buffer, Column storeColumn, byte value) {
+ int columnId = storeColumn.getColumnId();
+ buffer.put(offsets[columnId], value);
+ return columnId;
+ }
+
+ public int setBytes(ByteBuffer buffer, Column storeColumn, byte[] value) {
+ int columnId = storeColumn.getColumnId();
+ int newPosition = offsets[columnId];
+ buffer.position(newPosition);
+ Utility.convertValue(buffer, storeColumn, value);
+ return columnId;
+ }
+
+ public int setDecimal(ByteBuffer buffer, Column storeColumn, BigDecimal value) {
+ int columnId = storeColumn.getColumnId();
+ int newPosition = offsets[columnId];
+ buffer.position(newPosition);
+ ByteBuffer decimalBuffer = Utility.convertValue(storeColumn, value);
+ buffer.put(decimalBuffer);
+ return columnId;
+ }
+
+ public int setDouble(ByteBuffer buffer, Column storeColumn, Double value) {
+ int columnId = storeColumn.getColumnId();
+ buffer.putDouble(offsets[columnId], value);
+ return columnId;
+ }
+
+ public int setFloat(ByteBuffer buffer, Column storeColumn, Float value) {
+ int columnId = storeColumn.getColumnId();
+ buffer.putFloat(offsets[columnId], value);
+ return columnId;
+ }
+
+ public int setInt(ByteBuffer buffer, Column storeColumn, Integer value) {
+ int columnId = storeColumn.getColumnId();
+ buffer.putInt(offsets[columnId], value);
+ return columnId;
+ }
+
+ public int setLong(ByteBuffer buffer, Column storeColumn, long value) {
+ int columnId = storeColumn.getColumnId();
+ long storeValue = Utility.convertLongValueForStorage(storeColumn, value);
+ buffer.putLong(offsets[columnId], storeValue);
+ return columnId;
+ }
+
+ public int setNull(ByteBuffer buffer, Column storeColumn) {
+ int columnId = storeColumn.getColumnId();
+ int index = nullbitByteOffset[columnId];
+ byte mask = BIT_IN_BYTE_MASK[nullbitBitInByte[columnId]];
+ byte nullbyte = buffer.get(index);
+ buffer.put(index, (byte)(nullbyte|mask));
+ return columnId;
+ }
+
+ public int setShort(ByteBuffer buffer, Column storeColumn, Short value) {
+ int columnId = storeColumn.getColumnId();
+ buffer.putShort(offsets[columnId], value);
+ return columnId;
+ }
+
+ public int setString(ByteBuffer buffer, BufferManager bufferManager, Column storeColumn, String value) {
+ int columnId = storeColumn.getColumnId();
+ buffer.position(offsets[columnId]);
+ // for now, use the encode method to encode the value then copy it
+ ByteBuffer converted = Utility.encode(value, storeColumn, bufferManager);
+ buffer.put(converted);
+ return columnId;
+ }
+
+ protected static void handleError(Object object, Dictionary ndbDictionary) {
+ if (object != null) {
+ return;
+ } else {
+ Utility.throwError(null, ndbDictionary.getNdbError());
+ }
+ }
+
+ protected NdbRecord createNdbRecord(Table storeTable, Dictionary ndbDictionary) {
+ String[] columnNames = storeTable.getColumnNames();
+ List<Column> align8 = new ArrayList<Column>();
+ List<Column> align4 = new ArrayList<Column>();
+ List<Column> align2 = new ArrayList<Column>();
+ List<Column> align1 = new ArrayList<Column>();
+ List<Column> nullables = new ArrayList<Column>();
+ for (String columnName: columnNames) {
+ Column storeColumn = storeTable.getColumn(columnName);
+ storeColumns.add(storeColumn);
+ // for each column, put into alignment bucket
+ switch (storeColumn.getType()) {
+ case Bigint:
+ case Bigunsigned:
+ case Bit:
+ case Blob:
+ case Date:
+ case Datetime:
+ case Double:
+ case Text:
+ case Time:
+ case Timestamp:
+ align8.add(storeColumn);
+ break;
+ case Binary:
+ case Char:
+ case Decimal:
+ case Decimalunsigned:
+ case Longvarbinary:
+ case Longvarchar:
+ case Olddecimal:
+ case Olddecimalunsigned:
+ case Tinyint:
+ case Tinyunsigned:
+ case Varbinary:
+ case Varchar:
+ align1.add(storeColumn);
+ break;
+ case Float:
+ case Int:
+ case Mediumint:
+ case Mediumunsigned:
+ case Unsigned:
+ align4.add(storeColumn);
+ break;
+ case Smallint:
+ case Smallunsigned:
+ case Year:
+ align2.add(storeColumn);
+ break;
+ case Undefined:
+ throw new ClusterJFatalUserException(local.message("ERR_Unknown_Column_Type",
+ storeTable.getName(), columnName, storeColumn.getType()));
+ default:
+ throw new ClusterJFatalInternalException(local.message("ERR_Unknown_Column_Type",
+ storeTable.getName(), columnName, storeColumn.getType()));
+ }
+ if (storeColumn.getNullable()) {
+ nullables.add(storeColumn);
+ }
+ }
+ // for each column, allocate space in the buffer, starting with align8 and ending with align1
+ // null indicators are allocated first, with one bit per nullable column
+ // nullable columns take one bit each
+ offset = nullables.size() + 7 / 8;
+ // align the first column following the nullable column indicators to 8
+ offset = (7 + offset) / 8 * 8;
+ for (Column storeColumn: align8) {
+ handleColumn(8, storeColumn);
+ }
+ for (Column storeColumn: align4) {
+ handleColumn(4, storeColumn);
+ }
+ for (Column storeColumn: align2) {
+ handleColumn(2, storeColumn);
+ }
+ for (Column storeColumn: align1) {
+ handleColumn(1, storeColumn);
+ }
+ bufferSize = offset;
+
+ // now create an NdbRecord
+ NdbRecord result = ndbDictionary.createRecord(tableConst, recordSpecificationArray,
+ numberOfColumns, SIZEOF_RECORD_SPECIFICATION, 0);
+ // delete the RecordSpecificationArray since it is no longer needed
+ RecordSpecificationArray.delete(recordSpecificationArray);
+ handleError(result, ndbDictionary);
+ return result;
+ }
+
+ /** Create a record specification for a column. Keep track of the offset into the buffer
+ * and the null indicator position for each column.
+ *
+ * @param alignment the alignment for this column in the buffer
+ * @param storeColumn the column
+ */
+ private void handleColumn(int alignment, Column storeColumn) {
+ int columnId = storeColumn.getColumnId();
+ RecordSpecification recordSpecification = recordSpecificationArray.at(columnId);
+ ColumnConst columnConst = tableConst.getColumn(columnId);
+ recordSpecification.column(columnConst);
+ recordSpecification.offset(offset);
+ offsets[columnId] = offset;
+ int columnSpace = storeColumn.getColumnSpace();
+ offset += (columnSpace==0)?8:columnSpace;
+ if (storeColumn.getNullable()) {
+ nullablePositions[columnId] = nullablePosition++;
+ int nullbitByteOffsetValue = nullablePosition/8;
+ int nullbitBitInByteValue = nullablePosition - nullablePosition / 8 * 8;
+ nullbitBitInByte[columnId] = nullbitBitInByteValue;
+ nullbitByteOffset[columnId] = nullbitByteOffsetValue;
+ recordSpecification.nullbit_byte_offset(nullbitByteOffsetValue);
+ recordSpecification.nullbit_bit_in_byte(nullbitBitInByteValue);
+ } else {
+ recordSpecification.nullbit_byte_offset(0);
+ recordSpecification.nullbit_bit_in_byte(0);
+ }
+ if (logger.isDetailEnabled()) logger.detail(
+ "column: " + storeColumn.getName()
+ + " columnSpace: " + columnSpace
+ + " offset: " + offsets[columnId]
+ + " nullable position: " + nullablePositions[columnId]
+ + " nullbitByteOffset: " + nullbitByteOffset[columnId]
+ + " nullbitBitInByte: " + nullbitBitInByte[columnId]);
+ }
+
+ TableConst getNdbTable(String tableName) {
+ TableConst ndbTable = ndbDictionary.getTable(tableName);
+ if (ndbTable == null) {
+ // try the lower case table name
+ ndbTable = ndbDictionary.getTable(tableName.toLowerCase());
+ }
+ return ndbTable;
+ }
+
+ public int getBufferSize() {
+ return bufferSize;
+ }
+
+ public NdbRecordConst getNdbRecord() {
+ return ndbRecord;
+ }
+
+ public int getNumberOfColumns() {
+ return numberOfColumns;
+ }
+
+ protected void releaseNdbRecord() {
+ if (ndbRecord != null) {
+ ndbDictionary.releaseRecord(ndbRecord);
+ ndbRecord = null;
+ }
+ }
+
+}
=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java 2012-01-23 00:44:39 +0000
@@ -0,0 +1,318 @@
+/*
+ Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.mysql.clusterj.ClusterJFatalInternalException;
+
+import com.mysql.clusterj.core.store.Blob;
+import com.mysql.clusterj.core.store.Column;
+import com.mysql.clusterj.core.store.Operation;
+import com.mysql.clusterj.core.store.ResultData;
+import com.mysql.clusterj.core.store.Table;
+import com.mysql.clusterj.core.util.I18NHelper;
+import com.mysql.clusterj.core.util.Logger;
+import com.mysql.clusterj.core.util.LoggerFactoryService;
+
+import com.mysql.clusterj.tie.DbImpl.BufferManager;
+
+import com.mysql.ndbjtie.ndbapi.NdbBlob;
+import com.mysql.ndbjtie.ndbapi.NdbOperationConst;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
+
+/**
+ * Implementation of store operation that uses NdbRecord.
+ */
+class NdbRecordOperationImpl implements Operation {
+
+ /** My message translator */
+ static final I18NHelper local = I18NHelper
+ .getInstance(NdbRecordOperationImpl.class);
+
+ /** My logger */
+ static final Logger logger = LoggerFactoryService.getFactory()
+ .getInstance(NdbRecordOperationImpl.class);
+
+ /** The ClusterTransaction that this operation belongs to */
+ protected ClusterTransactionImpl clusterTransaction;
+
+ /** The NdbOperation wrapped by this object */
+ private NdbOperationConst ndbOperation = null;
+
+ /** The NdbRecord for this operation */
+ private NdbRecordImpl ndbRecordImpl = null;
+
+ /** The mask for this operation, which contains a bit set for each column to be inserted */
+ byte[] mask;
+
+ /** The ByteBuffer containing all of the data */
+ ByteBuffer buffer = null;
+
+ /** Blobs for this NdbRecord */
+ protected NdbRecordBlobImpl[] blobs = null;
+
+ /** Blobs that have been accessed for this operation */
+ protected List<NdbRecordBlobImpl> activeBlobs = new ArrayList<NdbRecordBlobImpl>();
+
+ /** The size of the receive buffer for this operation (may be zero for non-read operations) */
+ protected int bufferSize;
+
+ /** The number of columns for this operation */
+ protected int numberOfColumns;
+
+ protected BufferManager bufferManager;
+
+ /** Constructor used for insert and delete operations that do not need to read data.
+ *
+ * @param clusterTransaction the cluster transaction
+ * @param transaction the ndb transaction
+ * @param storeTable the store table
+ */
+ public NdbRecordOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable) {
+ this.ndbRecordImpl = clusterTransaction.getCachedNdbRecordImpl(storeTable);
+ this.bufferSize = ndbRecordImpl.getBufferSize();
+ this.numberOfColumns = ndbRecordImpl.getNumberOfColumns();
+ this.blobs = new NdbRecordBlobImpl[this.numberOfColumns];
+ this.clusterTransaction = clusterTransaction;
+ this.bufferManager = clusterTransaction.getBufferManager();
+ }
+
+ public void equalBigInteger(Column storeColumn, BigInteger value) {
+ setBigInteger(storeColumn, value);
+ }
+
+ public void equalBoolean(Column storeColumn, boolean booleanValue) {
+ setBoolean(storeColumn, booleanValue);
+ }
+
+ public void equalByte(Column storeColumn, byte value) {
+ setByte(storeColumn, value);
+ }
+
+ public void equalBytes(Column storeColumn, byte[] value) {
+ setBytes(storeColumn, value);
+ }
+
+ public void equalDecimal(Column storeColumn, BigDecimal value) {
+ setDecimal(storeColumn, value);
+ }
+
+ public void equalDouble(Column storeColumn, double value) {
+ setDouble(storeColumn, value);
+ }
+
+ public void equalFloat(Column storeColumn, float value) {
+ setFloat(storeColumn, value);
+ }
+
+ public void equalInt(Column storeColumn, int value) {
+ setInt(storeColumn, value);
+ }
+
+ public void equalShort(Column storeColumn, short value) {
+ setShort(storeColumn, value);
+ }
+
+ public void equalLong(Column storeColumn, long value) {
+ setLong(storeColumn, value);
+ }
+
+ public void equalString(Column storeColumn, String value) {
+ setString(storeColumn, value);
+ }
+
+ public void getBlob(Column storeColumn) {
+ throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented", "getBlob"));
+ }
+
+ /**
+ * Get the blob handle for this column. The same column will return the same blob handle
+ * regardless of how many times it is called.
+ * @param storeColumn the store column
+ * @return the blob handle
+ */
+ public Blob getBlobHandle(Column storeColumn) {
+ int columnId = storeColumn.getColumnId();
+ NdbRecordBlobImpl result = blobs[columnId];
+ if (result == null) {
+ columnSet(columnId);
+ result = new NdbRecordBlobImpl(this, storeColumn);
+ blobs[columnId] = result;
+ activeBlobs.add(result);
+ }
+ return result;
+ }
+
+ /** Specify the columns to be used for the operation.
+ */
+ public void getValue(Column storeColumn) {
+ throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented", "getValue"));
+ }
+
+ public void postExecuteCallback(Runnable callback) {
+ clusterTransaction.postExecuteCallback(callback);
+ }
+
+ /** Construct a new ResultData using the saved column data and then execute the operation.
+ */
+ public ResultData resultData() {
+ throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented", "resultData"));
+ }
+
+ /** Construct a new ResultData and if requested, execute the operation.
+ */
+ public ResultData resultData(boolean execute) {
+ throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented", "resultData"));
+ }
+
+ public void setBigInteger(Column storeColumn, BigInteger value) {
+ int columnId = ndbRecordImpl.setBigInteger(buffer, storeColumn, value);
+ columnSet(columnId);
+ }
+
+ public void setBoolean(Column storeColumn, Boolean booleanValue) {
+ byte value = (booleanValue?(byte)0x01:(byte)0x00);
+ setByte(storeColumn, value);
+ }
+
+ public void setByte(Column storeColumn, byte value) {
+ int columnId = ndbRecordImpl.setByte(buffer, storeColumn, value);
+ columnSet(columnId);
+ }
+
+ public void setBytes(Column storeColumn, byte[] value) {
+ int columnId = ndbRecordImpl.setBytes(buffer, storeColumn, value);
+ columnSet(columnId);
+ }
+
+ public void setDecimal(Column storeColumn, BigDecimal value) {
+ int columnId = ndbRecordImpl.setDecimal(buffer, storeColumn, value);
+ columnSet(columnId);
+ }
+
+ public void setDouble(Column storeColumn, Double value) {
+ int columnId = ndbRecordImpl.setDouble(buffer, storeColumn, value);
+ columnSet(columnId);
+ }
+
+ public void setFloat(Column storeColumn, Float value) {
+ int columnId = ndbRecordImpl.setFloat(buffer, storeColumn, value);
+ columnSet(columnId);
+ }
+
+ public void setInt(Column storeColumn, Integer value) {
+ int columnId = ndbRecordImpl.setInt(buffer, storeColumn, value);
+ columnSet(columnId);
+ }
+
+ public void setLong(Column storeColumn, long value) {
+ int columnId = ndbRecordImpl.setLong(buffer, storeColumn, value);
+ columnSet(columnId);
+ }
+
+ public void setNull(Column storeColumn) {
+ int columnId = ndbRecordImpl.setNull(buffer, storeColumn);
+ columnSet(columnId);
+ }
+
+ public void setShort(Column storeColumn, Short value) {
+ int columnId = ndbRecordImpl.setShort(buffer, storeColumn, value);
+ columnSet(columnId);
+ }
+
+ public void setString(Column storeColumn, String value) {
+ int columnId = ndbRecordImpl.setString(buffer, bufferManager, storeColumn, value);
+ columnSet(columnId);
+ }
+
+ public int errorCode() {
+ return ndbOperation.getNdbError().code();
+ }
+
+ protected static void handleError(int returnCode, NdbOperationConst ndbOperation2) {
+ if (returnCode == 0) {
+ return;
+ } else {
+ Utility.throwError(returnCode, ndbOperation2.getNdbError());
+ }
+ }
+
+ protected static void handleError(Object object, NdbOperationConst ndbOperation) {
+ if (object != null) {
+ return;
+ } else {
+ Utility.throwError(null, ndbOperation.getNdbError());
+ }
+ }
+
+ protected static void handleError(Object object, Dictionary ndbDictionary) {
+ if (object != null) {
+ return;
+ } else {
+ Utility.throwError(null, ndbDictionary.getNdbError());
+ }
+ }
+
+ public void beginDefinition() {
+ // allocate a buffer for the operation data
+ buffer = ByteBuffer.allocateDirect(bufferSize);
+ // use platform's native byte ordering
+ buffer.order(ByteOrder.nativeOrder());
+ mask = new byte[1 + (numberOfColumns/8)];
+ }
+
+ public void endDefinition() {
+ // create the insert operation
+ buffer.position(0);
+ buffer.limit(bufferSize);
+ // create the insert operation
+ ndbOperation = clusterTransaction.insertTuple(ndbRecordImpl.getNdbRecord(), buffer, mask, null);
+ // now set the NdbBlob into the blobs
+ for (NdbRecordBlobImpl blob: activeBlobs) {
+ if (blob != null) {
+ blob.setNdbBlob();
+ }
+ }
+ }
+
+ public NdbBlob getNdbBlob(Column storeColumn) {
+ NdbBlob result = ndbOperation.getBlobHandle(storeColumn.getColumnId());
+ handleError(result, ndbOperation);
+ return result;
+ }
+
+ /**
+ * Set this column into the mask for NdbRecord operation.
+ * @param columnId the column id
+ */
+ private void columnSet(int columnId) {
+ int byteOffset = columnId / 8;
+ int bitInByte = columnId - (byteOffset * 8);
+ mask[byteOffset] |= NdbRecordImpl.BIT_IN_BYTE_MASK[bitInByte];
+
+ }
+
+}
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java 2011-02-03 14:37:50 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/OperationImpl.java 2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2009, 2011, Oracle and/or its affiliates. All rights reserved.
+ Copyright (c) 2009, 2012, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -263,7 +263,7 @@ class OperationImpl implements Operation
public void setLong(Column storeColumn, long value) {
long storeValue = Utility.convertLongValueForStorage(storeColumn, value);
- int returnCode = ndbOperation.setValue(storeColumn.getName(), storeValue);
+ int returnCode = ndbOperation.setValue(storeColumn.getColumnId(), storeValue);
handleError(returnCode, ndbOperation);
}
@@ -305,4 +305,12 @@ class OperationImpl implements Operation
}
}
+ public void beginDefinition() {
+ // nothing to do
+ }
+
+ public void endDefinition() {
+ // nothing to do
+ }
+
}
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java 2011-07-05 06:00:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java 2012-01-23 00:44:39 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2010, 2011, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -792,9 +792,22 @@ public class Utility {
* @return the ByteBuffer
*/
public static ByteBuffer convertValue(Column storeColumn, byte[] value) {
+ int requiredLength = storeColumn.getColumnSpace();
+ ByteBuffer result = ByteBuffer.allocateDirect(requiredLength);
+ convertValue(result, storeColumn, value);
+ result.flip();
+ return result;
+ }
+
+ /** Convert the parameter value and store it in a given ByteBuffer that can be passed to ndbjtie.
+ *
+ * @param buffer the buffer, positioned at the location to store the value
+ * @param storeColumn the column definition
+ * @param value the value to be converted
+ */
+ public static void convertValue(ByteBuffer buffer, Column storeColumn, byte[] value) {
int dataLength = value.length;
int prefixLength = storeColumn.getPrefixLength();
- ByteBuffer result;
switch (prefixLength) {
case 0:
int requiredLength = storeColumn.getColumnSpace();
@@ -803,12 +816,10 @@ public class Utility {
local.message("ERR_Data_Too_Long",
storeColumn.getName(), requiredLength, dataLength));
} else {
- result = ByteBuffer.allocateDirect(requiredLength);
- result.order(ByteOrder.nativeOrder());
- result.put(value);
+ buffer.put(value);
if (dataLength < requiredLength) {
// pad with 0x00 on right
- result.put(ZERO_PAD, 0, requiredLength - dataLength);
+ buffer.put(ZERO_PAD, 0, requiredLength - dataLength);
}
}
break;
@@ -818,10 +829,8 @@ public class Utility {
local.message("ERR_Data_Too_Long",
storeColumn.getName(), "255", dataLength));
}
- result = ByteBuffer.allocateDirect(prefixLength + dataLength);
- result.order(ByteOrder.nativeOrder());
- result.put((byte)dataLength);
- result.put(value);
+ buffer.put((byte)dataLength);
+ buffer.put(value);
break;
case 2:
if (dataLength > 8000) {
@@ -829,19 +838,15 @@ public class Utility {
local.message("ERR_Data_Too_Long",
storeColumn.getName(), "8000", dataLength));
}
- result = ByteBuffer.allocateDirect(prefixLength + dataLength);
- result.order(ByteOrder.nativeOrder());
- result.put((byte)(dataLength%256));
- result.put((byte)(dataLength/256));
- result.put(value);
+ buffer.put((byte)(dataLength%256));
+ buffer.put((byte)(dataLength/256));
+ buffer.put(value);
break;
default:
throw new ClusterJFatalInternalException(
local.message("ERR_Unknown_Prefix_Length",
prefixLength, storeColumn.getName()));
}
- result.flip();
- return result;
}
/** Convert a BigDecimal value to the binary decimal form used by MySQL.
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties 2011-06-29 18:02:39 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/resources/com/mysql/clusterj/tie/Bundle.properties 2012-01-23 00:44:39 +0000
@@ -28,6 +28,7 @@ ERR_Cannot_Create_Cluster_Connection:Can
ERR_Duplicate_NdbRecAttr_In_List:Duplicate NdbRecAttr for column {0} column id {1}.
ERR_No_Operation_In_Result:There is no ndbOperation in the ResultData.
ERR_Not_Implemented:Not implemented
+ERR_Method_Not_Implemented:Not implemented: {0}.
ERR_NdbJTie:Error in NdbJTie: returnCode {0}, code {1}, mysqlCode {2}, \
status {3}, classification {4}, message {5} {6}.
ERR_Invalid_Prefix_Length:The prefix length {0} is invalid.
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.1 branch (Craig.Russell:4428 to 4429) | Craig L Russell | 23 Jan |