3877 magnus.blaudd@stripped 2012-04-11 [merge]
Merge 7.2 -> 7.3
removed:
mysql-test/suite/ndb_rpl/t/ndb_rpl_relayrotate-slave.opt
added:
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanResultDataImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordTableScanOperationImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordUniqueKeyOperationImpl.java
modified:
mysql-test/suite/ndb/r/ndb_restore_misc.result
mysql-test/suite/ndb/t/ndb_restore_misc.test
mysql-test/suite/ndb_rpl/my.cnf
mysql-test/suite/ndb_rpl/t/ndb_rpl_break_3_chain.cnf
mysql-test/suite/rpl_ndb/my.cnf
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainTypeHandlerImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/QueryDomainTypeImpl.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandler.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SessionSPI.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SmartValueHandler.java
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandlerFactory.java
storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJTest.java
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/BitTypesTest.java
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryMultiColumnIndexInTest.java
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/SchemaChangeTest.java
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/domaintypehandler/CrazyDomainTypeHandlerFactoryImpl.java
storage/ndb/clusterj/clusterj-tie/logging.properties
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordInsertOperationImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordKeyOperationImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordResultDataImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerFactoryImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ResultDataImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ScanResultDataImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java
storage/ndb/compile-cluster
storage/ndb/include/util/Vector.hpp
storage/ndb/memcache/extra/memcached/CMakeLists.txt
storage/ndb/memcache/extra/memcached/config_tests.in
storage/ndb/memcache/extra/memcached/daemon/memcached.c
storage/ndb/memcache/include/Scheduler.h
storage/ndb/memcache/include/ndb_engine.h
storage/ndb/memcache/include/ndb_engine_errors.h
storage/ndb/memcache/include/ndb_pipeline.h
storage/ndb/memcache/include/ndbmemcache_config.in
storage/ndb/memcache/src/ExternalValue.cc
storage/ndb/memcache/src/ndb_engine.c
storage/ndb/memcache/src/ndb_engine_errors.cc
storage/ndb/memcache/src/ndb_error_logger.cc
storage/ndb/memcache/src/ndb_pipeline.cc
storage/ndb/memcache/src/ndb_worker.cc
storage/ndb/memcache/src/schedulers/S_sched.cc
storage/ndb/memcache/src/schedulers/S_sched.h
storage/ndb/memcache/src/schedulers/Stockholm.cc
storage/ndb/memcache/src/schedulers/Stockholm.h
storage/ndb/memcache/unit/alloc.cc
storage/ndb/src/ndbapi/NdbQueryBuilder.cpp
storage/ndb/src/ndbapi/NdbQueryOperation.cpp
3876 Mauritz Sundell 2012-04-10
ndb - allow upgrade from 7.x to 7.3
modified:
storage/ndb/src/common/util/version.cpp
=== modified file 'mysql-test/suite/ndb/r/ndb_restore_misc.result'
--- a/mysql-test/suite/ndb/r/ndb_restore_misc.result 2011-08-17 10:36:01 +0000
+++ b/mysql-test/suite/ndb/r/ndb_restore_misc.result 2012-04-11 09:56:27 +0000
@@ -134,11 +134,12 @@ ForceVarPart: 1
drop table t1_c,t2_c,t3_c,t4_c,t5_c,t6_c,t7_c,t8_c,t9_c,t10_c,t11_c;
ForceVarPart: 0
ForceVarPart: 1
-select * from information_schema.columns where table_name = "t1_c";
-TABLE_CATALOG TABLE_SCHEMA TABLE_NAME COLUMN_NAME ORDINAL_POSITION COLUMN_DEFAULT IS_NULLABLE DATA_TYPE CHARACTER_MAXIMUM_LENGTH CHARACTER_OCTET_LENGTH NUMERIC_PRECISION NUMERIC_SCALE CHARACTER_SET_NAME COLLATION_NAME COLUMN_TYPE COLUMN_KEY EXTRA PRIVILEGES COLUMN_COMMENT
-def test t1_c capgoaledatta 1 NULL NO mediumint NULL NULL 7 0 NULL NULL mediumint(5) unsigned PRI auto_increment #
-def test t1_c goaledatta 2 NO char 2 2 NULL NULL latin1 latin1_swedish_ci char(2) PRI #
-def test t1_c maturegarbagefa 3 NO varchar 32 32 NULL NULL latin1 latin1_swedish_ci varchar(32) PRI #
+select TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, ORDINAL_POSITION, DATA_TYPE
+from information_schema.columns where table_name = "t1_c";
+TABLE_SCHEMA TABLE_NAME COLUMN_NAME ORDINAL_POSITION DATA_TYPE
+test t1_c capgoaledatta 1 mediumint
+test t1_c goaledatta 2 char
+test t1_c maturegarbagefa 3 varchar
select count(*) from t1;
count(*)
5
=== modified file 'mysql-test/suite/ndb/t/ndb_restore_misc.test'
--- a/mysql-test/suite/ndb/t/ndb_restore_misc.test 2011-07-19 10:54:29 +0000
+++ b/mysql-test/suite/ndb/t/ndb_restore_misc.test 2012-04-11 07:17:49 +0000
@@ -192,9 +192,8 @@ source show_varpart.inc;
# Bug #30667
# ndb table discovery does not work correcly with information schema
# - prior to bug fix this would yeild no output and a warning
-# (priviliges differ on embedded and server so replace)
---replace_column 18 #
-select * from information_schema.columns where table_name = "t1_c";
+select TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, ORDINAL_POSITION, DATA_TYPE
+ from information_schema.columns where table_name = "t1_c";
# random output order??
#show tables;
=== modified file 'mysql-test/suite/ndb_rpl/my.cnf'
--- a/mysql-test/suite/ndb_rpl/my.cnf 2011-12-09 11:57:48 +0000
+++ b/mysql-test/suite/ndb_rpl/my.cnf 2012-03-29 10:13:47 +0000
@@ -59,12 +59,6 @@ report-host= 127.0.0.1
report-port= @mysqld.1.slave.port
report-user= root
-# Configure slave mysqld without innodb, and set myisam
-# as default storage engine(since innodb will be default
-# otherwise starting from 5.5)
-loose-skip-innodb
-default-storage-engine=myisam
-
skip-slave-start
# Directory where slaves find the dumps generated by "load data"
=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_break_3_chain.cnf'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_break_3_chain.cnf 2011-12-09 11:57:48 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_break_3_chain.cnf 2012-03-29 10:13:47 +0000
@@ -47,9 +47,6 @@ relay-log= cluster2-r
log-slave-updates
ndb-log-apply-status
-loose-skip-innodb
-default-storage-engine=myisam
-
# Directory where slaves find the dumps generated by "load data"
# on the server. The path need to have constant length otherwise
# test results will vary, thus a relative path is used.
@@ -62,9 +59,6 @@ relay-log= cluster3-r
log-slave-updates
ndb-log-apply-status
-loose-skip-innodb
-default-storage-engine=myisam
-
# Directory where slaves find the dumps generated by "load data"
# on the server. The path need to have constant length otherwise
# test results will vary, thus a relative path is used.
=== removed file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_relayrotate-slave.opt'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_relayrotate-slave.opt 2011-05-18 12:56:24 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_relayrotate-slave.opt 1970-01-01 00:00:00 +0000
@@ -1,3 +0,0 @@
---max_relay_log_size=16384
---loose-innodb
---log-warnings
=== modified file 'mysql-test/suite/rpl_ndb/my.cnf'
--- a/mysql-test/suite/rpl_ndb/my.cnf 2011-12-09 11:57:48 +0000
+++ b/mysql-test/suite/rpl_ndb/my.cnf 2012-03-29 10:13:47 +0000
@@ -59,12 +59,6 @@ report-host= 127.0.0.1
report-port= @mysqld.1.slave.port
report-user= root
-# Configure slave mysqld without innodb, and set myisam
-# as default storage engine(since innodb will be default
-# otherwise starting from 5.5)
-loose-skip-innodb
-default-storage-engine=myisam
-
skip-slave-start
# Directory where slaves find the dumps generated by "load data"
=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java 2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java 2012-04-02 20:43:14 +0000
@@ -202,7 +202,7 @@ public class SessionImpl implements Sess
}
if (smartValueHandler.found()) {
// create a new proxy (or dynamic instance) with the smart value handler
- return domainTypeHandler.newInstance(smartValueHandler, db);
+ return domainTypeHandler.newInstance(smartValueHandler);
} else {
// not found
return null;
@@ -289,6 +289,16 @@ public class SessionImpl implements Sess
return instance;
}
+ /** Create an instance from a result data row.
+ * @param resultData the result of a query
+ * @param domainTypeHandler the domain type handler
+ * @return the instance
+ */
+ public <T> T newInstance(ResultData resultData, DomainTypeHandler<T> domainTypeHandler) {
+ T result = domainTypeHandler.newInstance(resultData, db);
+ return result;
+ }
+
/** Load the instance from the database into memory. Loading
* is asynchronous and will be executed when an operation requiring
* database access is executed: find, flush, or query. The instance must
=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainTypeHandlerImpl.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainTypeHandlerImpl.java 2012-03-12 09:22:04 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/AbstractDomainTypeHandlerImpl.java 2012-04-11 09:56:27 +0000
@@ -413,6 +413,10 @@ public abstract class AbstractDomainType
throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
}
+ public T newInstance(ResultData resultData, Db db) {
+ throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+ }
+
public void objectMarkModified(ValueHandler handler, String fieldName) {
throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
}
@@ -450,7 +454,7 @@ public abstract class AbstractDomainType
return reasons == null?null:reasons.toString();
}
- public T newInstance(ValueHandler valueHandler, Db db) {
+ public T newInstance(ValueHandler valueHandler) {
throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
}
=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerImpl.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerImpl.java 2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/metadata/DomainTypeHandlerImpl.java 2012-04-02 20:43:14 +0000
@@ -18,6 +18,7 @@
package com.mysql.clusterj.core.metadata;
import com.mysql.clusterj.core.spi.DomainFieldHandler;
+import com.mysql.clusterj.core.spi.SmartValueHandler;
import com.mysql.clusterj.core.spi.ValueHandlerFactory;
import com.mysql.clusterj.core.spi.ValueHandler;
import com.mysql.clusterj.ClusterJException;
@@ -36,6 +37,7 @@ import com.mysql.clusterj.core.store.Db;
import com.mysql.clusterj.core.store.Index;
import com.mysql.clusterj.core.store.Dictionary;
import com.mysql.clusterj.core.store.Operation;
+import com.mysql.clusterj.core.store.ResultData;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationHandler;
@@ -422,11 +424,22 @@ public class DomainTypeHandlerImpl<T> ex
@Override
public T newInstance(Db db) {
ValueHandler valueHandler = valueHandlerFactory.getValueHandler(this, db);
- return newInstance(valueHandler, db);
+ return newInstance(valueHandler);
+ }
+
+ /** Create a new domain type instance from the result.
+ * @param resultData the results from a database query
+ * @param db the Db
+ * @return the domain type instance
+ */
+ public T newInstance(ResultData resultData, Db db) {
+ ValueHandler valueHandler = valueHandlerFactory.getValueHandler(this, db, resultData);
+ T result = newInstance(valueHandler);
+ return result;
}
@Override
- public T newInstance(ValueHandler valueHandler, Db db) {
+ public T newInstance(ValueHandler valueHandler) {
T instance;
try {
if (dynamic) {
@@ -608,7 +621,7 @@ public class DomainTypeHandlerImpl<T> ex
Object[] result = new Object[numberOfFields];
int i = 0;
for (Integer idFieldNumber: idFieldNumbers) {
- result[idFieldNumber] = keyValues[i];
+ result[idFieldNumber] = keyValues[i++];
}
return result;
}
@@ -624,6 +637,7 @@ public class DomainTypeHandlerImpl<T> ex
/** Factory for default InvocationHandlerImpl */
protected ValueHandlerFactory defaultInvocationHandlerFactory = new ValueHandlerFactory() {
+
public <V> ValueHandler getValueHandler(DomainTypeHandlerImpl<V> domainTypeHandler, Db db) {
return new InvocationHandlerImpl<V>(domainTypeHandler);
}
@@ -633,6 +647,13 @@ public class DomainTypeHandlerImpl<T> ex
Object[] expandedKeyValues = expandKeyValues(keyValues);
return new KeyValueHandlerImpl(expandedKeyValues);
}
+
+ public <V> ValueHandler getValueHandler(
+ DomainTypeHandlerImpl<V> domainTypeHandler, Db db, ResultData resultData) {
+ ValueHandler result = new InvocationHandlerImpl<V>(domainTypeHandler);
+ objectSetValues(resultData, result);
+ return result;
+ }
};
public int getNumberOfTransientFields() {
=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/QueryDomainTypeImpl.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/QueryDomainTypeImpl.java 2012-02-09 10:22:48 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/query/QueryDomainTypeImpl.java 2012-04-11 09:56:27 +0000
@@ -27,7 +27,6 @@ import com.mysql.clusterj.core.spi.Domai
import com.mysql.clusterj.core.spi.DomainTypeHandler;
import com.mysql.clusterj.core.spi.QueryExecutionContext;
import com.mysql.clusterj.core.spi.SessionSPI;
-import com.mysql.clusterj.core.spi.ValueHandler;
import com.mysql.clusterj.core.spi.ValueHandlerBatching;
import com.mysql.clusterj.core.store.Index;
@@ -163,10 +162,7 @@ public class QueryDomainTypeImpl<T> impl
ResultData resultData = getResultData(context);
// put the result data into the result list
while (resultData.next()) {
- T row = (T) session.newInstance(cls);
- ValueHandler handler =domainTypeHandler.getValueHandler(row);
- // set values from result set into object
- domainTypeHandler.objectSetValues(resultData, handler);
+ T row = session.newInstance(resultData, domainTypeHandler);
resultList.add(row);
}
session.endAutoTransaction();
@@ -206,6 +202,7 @@ public class QueryDomainTypeImpl<T> impl
switch (scanType) {
case PRIMARY_KEY: {
+ if (logger.isDetailEnabled()) logger.detail("Using primary key find for query.");
// perform a select operation
Operation op = session.getSelectOperation(domainTypeHandler.getStoreTable());
op.beginDefinition();
@@ -221,7 +218,7 @@ public class QueryDomainTypeImpl<T> impl
case INDEX_SCAN: {
storeIndex = index.getStoreIndex();
- if (logger.isDetailEnabled()) logger.detail("Using index scan with index " + index.getIndexName());
+ if (logger.isDetailEnabled()) logger.detail("Using index scan with ordered index " + index.getIndexName() + " for query.");
IndexScanOperation op;
// perform an index scan operation
if (index.isMultiRange()) {
@@ -231,27 +228,31 @@ public class QueryDomainTypeImpl<T> impl
op = session.getIndexScanOperation(storeIndex, domainTypeHandler.getStoreTable());
}
+ op.beginDefinition();
// set the expected columns into the operation
domainTypeHandler.operationGetValues(op);
// set the bounds into the operation
index.operationSetBounds(context, op);
// set additional filter conditions
where.filterCmpValue(context, op);
+ op.endDefinition();
// execute the scan and get results
result = op.resultData();
break;
}
case TABLE_SCAN: {
- if (logger.isDetailEnabled()) logger.detail("Using table scan");
+ if (logger.isDetailEnabled()) logger.detail("Using table scan for query.");
// perform a table scan operation
ScanOperation op = session.getTableScanOperation(domainTypeHandler.getStoreTable());
+ op.beginDefinition();
// set the expected columns into the operation
domainTypeHandler.operationGetValues(op);
- // set the bounds into the operation
+ // set filter conditions into the operation
if (where != null) {
where.filterCmpValue(context, op);
}
+ op.endDefinition();
// execute the scan and get results
result = op.resultData();
break;
@@ -259,14 +260,16 @@ public class QueryDomainTypeImpl<T> impl
case UNIQUE_KEY: {
storeIndex = index.getStoreIndex();
- if (logger.isDetailEnabled()) logger.detail("Using unique lookup with index " + index.getIndexName());
+ if (logger.isDetailEnabled()) logger.detail("Using lookup with unique index " + index.getIndexName() + " for query.");
// perform a unique lookup operation
IndexOperation op = session.getUniqueIndexOperation(storeIndex, domainTypeHandler.getStoreTable());
+ op.beginDefinition();
// set the keys of the indexName into the operation
where.operationEqual(context, op);
// set the expected columns into the operation
//domainTypeHandler.operationGetValuesExcept(op, indexName);
domainTypeHandler.operationGetValues(op);
+ op.endDefinition();
// execute the select and get results
result = op.resultData();
break;
=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandler.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandler.java 2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/DomainTypeHandler.java 2012-04-02 20:43:14 +0000
@@ -93,6 +93,8 @@ public interface DomainTypeHandler<T> {
public void setUnsupported(String reason);
- public T newInstance(ValueHandler valueHandler, Db db);
+ public T newInstance(ValueHandler valueHandler);
+
+ public T newInstance(ResultData resultData, Db db);
}
=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SessionSPI.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SessionSPI.java 2011-11-22 22:01:23 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SessionSPI.java 2012-04-02 20:43:14 +0000
@@ -98,6 +98,8 @@ public interface SessionSPI extends Sess
<T> QueryDomainType<T> createQueryDomainType(DomainTypeHandler<T> handler);
+ <T> T newInstance(ResultData resultData, DomainTypeHandler<T> domainTypeHandler);
+
String getCoordinatedTransactionId();
void setCoordinatedTransactionId(String coordinatedTransactionId);
=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SmartValueHandler.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SmartValueHandler.java 2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/SmartValueHandler.java 2012-04-02 20:43:14 +0000
@@ -19,6 +19,7 @@ package com.mysql.clusterj.core.spi;
import com.mysql.clusterj.core.store.ClusterTransaction;
import com.mysql.clusterj.core.store.Operation;
+import com.mysql.clusterj.core.store.ResultData;
/** SmartValueHandler is the interface that must be implemented for
* operations that bypass the normal value handler and directly
=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandlerFactory.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandlerFactory.java 2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/spi/ValueHandlerFactory.java 2012-04-02 20:43:14 +0000
@@ -19,6 +19,7 @@ package com.mysql.clusterj.core.spi;
import com.mysql.clusterj.core.metadata.DomainTypeHandlerImpl;
import com.mysql.clusterj.core.store.Db;
+import com.mysql.clusterj.core.store.ResultData;
/** ValueHandlerFactory allows a component to provide an alternative value handler.
*
@@ -27,5 +28,8 @@ public interface ValueHandlerFactory {
<T> ValueHandler getValueHandler(DomainTypeHandlerImpl<T> domainTypeHandler, Db db);
+ <T> ValueHandler getValueHandler(DomainTypeHandlerImpl<T> domainTypeHandler, Db db, ResultData resultData);
+
<T> ValueHandler getKeyValueHandler(DomainTypeHandlerImpl<T> domainTypeHandler, Db db, Object keyValues);
+
}
=== modified file 'storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java'
--- a/storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java 2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-openjpa/src/main/java/com/mysql/clusterj/openjpa/NdbOpenJPADomainTypeHandlerImpl.java 2012-04-02 20:43:14 +0000
@@ -255,7 +255,12 @@ public class NdbOpenJPADomainTypeHandler
local.message("ERR_Implementation_Should_Not_Occur"));
}
- public T newInstance(ValueHandler valueHandler, Db db) {
+ public T newInstance(ValueHandler valueHandler) {
+ throw new ClusterJFatalInternalException(
+ local.message("ERR_Implementation_Should_Not_Occur"));
+ }
+
+ public T newInstance(ResultData resultData,Db db) {
throw new ClusterJFatalInternalException(
local.message("ERR_Implementation_Should_Not_Occur"));
}
=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJTest.java 2012-03-05 22:28:15 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/AbstractClusterJTest.java 2012-04-04 06:22:39 +0000
@@ -23,6 +23,8 @@ import com.mysql.clusterj.Constants;
import com.mysql.clusterj.Session;
import com.mysql.clusterj.SessionFactory;
import com.mysql.clusterj.Transaction;
+import com.mysql.clusterj.core.util.Logger;
+import com.mysql.clusterj.core.util.LoggerFactoryService;
import java.io.BufferedReader;
import java.io.File;
@@ -57,6 +59,10 @@ import junit.framework.TestCase;
*
*/
public abstract class AbstractClusterJTest extends TestCase {
+ /** My logger */
+ static final Logger logger = LoggerFactoryService.getFactory()
+ .getInstance("com.mysql.clusterj.test");
+
protected static final String JDBC_DRIVER_NAME = "jdbc.driverName";
protected static final String JDBC_URL = "jdbc.url";
protected static Connection connection;
@@ -76,7 +82,7 @@ public abstract class AbstractClusterJTe
*
* Error messages collected during a test.
*/
- private StringBuffer errorMessages;
+ protected StringBuffer errorMessages;
/**
*
* A list of registered pc classes.
=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/BitTypesTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/BitTypesTest.java 2011-02-02 09:52:33 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/BitTypesTest.java 2012-04-07 02:06:08 +0000
@@ -1,5 +1,5 @@
/*
- Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
+ Copyright (c) 2010, 2012, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -80,6 +80,7 @@ public class BitTypesTest extends Abstra
case 1: { // boolean
boolean data = (i % 2) == 0;
if (getDebug()) System.out.println("BitTypesTest.getColumnValue Column data for " + i + ", " + j
+ + " " + columnDescriptors[j].getColumnName()
+ " is (boolean)" + data);
return data;
}
@@ -90,6 +91,7 @@ public class BitTypesTest extends Abstra
data = (data * 2) + (int)(Math.random() * 2);
}
if (getDebug()) System.out.println("BitTypesTest.getColumnValue Column data for " + i + ", " + j
+ + " " + columnDescriptors[j].getColumnName()
+ " is (byte)" + data);
return Byte.valueOf((byte)data);
}
@@ -100,6 +102,7 @@ public class BitTypesTest extends Abstra
data = (data * 2) + (int)(Math.random() * 2);
}
if (getDebug()) System.out.println("BitTypesTest.getColumnValue Column data for " + i + ", " + j
+ + " " + columnDescriptors[j].getColumnName()
+ " is (short)" + data);
return Short.valueOf((short)data);
}
@@ -111,6 +114,7 @@ public class BitTypesTest extends Abstra
data = (data * 2) + ((int)(Math.random() * 2));
}
if (getDebug()) System.out.println("BitTypesTest.getColumnValue Column data for " + i + ", " + j
+ + " " + columnDescriptors[j].getColumnName()
+ " is (int)" + data);
// TODO bug in JDBC handling high bit
data = Math.abs(data);
@@ -124,6 +128,7 @@ public class BitTypesTest extends Abstra
data = (data * 256) + (i * 16) + d;
}
if (getDebug()) System.out.println("BitTypesTest.getColumnValue Column data for " + i + ", " + j
+ + " " + columnDescriptors[j].getColumnName()
+ " is (long)" + data);
return Long.valueOf(data);
}
@@ -148,13 +153,14 @@ public class BitTypesTest extends Abstra
errorIfNotEqual(where + " got failure on id for row " + i, i, actual[0]);
for (int j = 1; j < expected.length; ++j) {
if (getDebug()) System.out.println("BitTypesTest.verify for " + i + ", " + j
+ + " " + columnDescriptors[j - 1].getColumnName()
+ " is (" + actual[j].getClass().getName() + ")" + actual[j]);
switch (j) {
case 1: { // boolean
Boolean expectedColumn = (Boolean)expected[j];
Boolean actualColumn = (Boolean)actual[j];
errorIfNotEqual(where + " got failure on comparison of data for row "
- + i + " column " + j,
+ + i + " column " + j + " " + columnDescriptors[j - 1].getColumnName(),
expectedColumn, actualColumn);
break;
}
@@ -163,7 +169,7 @@ public class BitTypesTest extends Abstra
byte actualColumn = (Byte)actual[j];
// now compare bit by bit
errorIfNotEqual(where + " got failure on comparison of data for row "
- + i + " column " + j,
+ + i + " column " + j + " " + columnDescriptors[j - 1].getColumnName(),
Integer.toHexString(expectedColumn), Integer.toHexString(actualColumn));
break;
}
@@ -172,7 +178,7 @@ public class BitTypesTest extends Abstra
short actualColumn = (Short)actual[j];
// now compare bit by bit
errorIfNotEqual(where + " got failure on comparison of data for row "
- + i + " column " + j,
+ + i + " column " + j + " " + columnDescriptors[j - 1].getColumnName(),
Integer.toHexString(expectedColumn), Integer.toHexString(actualColumn));
break;
}
@@ -182,7 +188,7 @@ public class BitTypesTest extends Abstra
int actualColumn = (Integer)actual[j];
// now compare bit by bit
errorIfNotEqual(where + " got failure on comparison of data for row "
- + i + " column " + j,
+ + i + " column " + j + " " + columnDescriptors[j - 1].getColumnName(),
Integer.toHexString(expectedColumn), Integer.toHexString(actualColumn));
break;
}
@@ -192,7 +198,7 @@ public class BitTypesTest extends Abstra
long actualColumn = (Long)actual[j];
// now compare bit by bit
errorIfNotEqual(where + " got failure on comparison of data for row "
- + i + " column " + j,
+ + i + " column " + j + " " + columnDescriptors[j - 1].getColumnName(),
Long.toHexString(expectedColumn), Long.toHexString(actualColumn));
break;
}
@@ -209,9 +215,8 @@ public class BitTypesTest extends Abstra
}
public void testWriteNDBReadJDBC() {
-// TODO: investigate platform dependency when reading via JDBC
-// writeNDBreadJDBC();
-// failOnError();
+ writeNDBreadJDBC();
+ failOnError();
}
public void testWriteNDBReadNDB() {
@@ -220,9 +225,8 @@ public class BitTypesTest extends Abstra
}
public void testWriteJDBCReadJDBC() {
-// TODO: investigate platform dependency when reading via JDBC
-// writeJDBCreadJDBC();
-// failOnError();
+ writeJDBCreadJDBC();
+ failOnError();
}
static ColumnDescriptor bit1 = new ColumnDescriptor
=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java 2011-12-18 18:37:39 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/DbugTest.java 2012-04-04 00:32:48 +0000
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2011, 2012, Oracle and/or its affiliates. All rights reserved.
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
@@ -17,6 +17,8 @@
package testsuite.clusterj;
+import java.io.File;
+
import com.mysql.clusterj.ClusterJHelper;
import com.mysql.clusterj.Dbug;
@@ -25,7 +27,9 @@ import com.mysql.clusterj.Dbug;
*/
public class DbugTest extends AbstractClusterJTest{
- static String tmpFileName = System.getProperty("MYSQL_TMP_DIR", "/tmp") + "/clusterj-test-dbug";
+ private static final String TMP_DIR_NAME = System.getProperty("java.io.tmpdir");
+ private static final String FILE_SEPARATOR = File.separator;
+ private static final String TMP_FILE_NAME = TMP_DIR_NAME + FILE_SEPARATOR + "clusterj-test-dbug";
public boolean getDebug() {
return false;
@@ -46,7 +50,7 @@ public class DbugTest extends AbstractCl
return;
}
String originalState = "t";
- String newState = "d,jointx:o," + tmpFileName;
+ String newState = "d,jointx:o," + TMP_FILE_NAME;
dbug.set(originalState);
String actualState = dbug.get();
errorIfNotEqual("Failed to set original state", originalState, actualState);
@@ -58,17 +62,17 @@ public class DbugTest extends AbstractCl
errorIfNotEqual("Failed to pop original state", originalState, actualState);
dbug = ClusterJHelper.newDbug();
- dbug.output(tmpFileName).flush().debug(new String[] {"a", "b", "c", "d", "e", "f"}).push();
+ dbug.output(TMP_FILE_NAME).flush().debug(new String[] {"a", "b", "c", "d", "e", "f"}).push();
actualState = dbug.get();
// keywords are stored LIFO
- errorIfNotEqual("Wrong state created", "d,f,e,d,c,b,a:O," + tmpFileName, actualState);
+ errorIfNotEqual("Wrong state created", "d,f,e,d,c,b,a:O," + TMP_FILE_NAME, actualState);
dbug.pop();
dbug = ClusterJHelper.newDbug();
- dbug.append(tmpFileName).trace().debug("a,b,c,d,e,f").set();
+ dbug.append(TMP_FILE_NAME).trace().debug("a,b,c,d,e,f").set();
actualState = dbug.get();
// keywords are stored LIFO
- errorIfNotEqual("Wrong state created", "d,f,e,d,c,b,a:a," + tmpFileName + ":t", actualState);
+ errorIfNotEqual("Wrong state created", "d,f,e,d,c,b,a:a," + TMP_FILE_NAME + ":t", actualState);
dbug.pop();
failOnError();
=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryMultiColumnIndexInTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryMultiColumnIndexInTest.java 2011-03-22 15:32:28 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/QueryMultiColumnIndexInTest.java 2012-04-04 06:46:06 +0000
@@ -103,7 +103,7 @@ create table longintstringix (
}
public void testPrettyBigIn() {
- int arraySize = 4096;
+ int arraySize = 20;
Integer[] keys = new Integer[arraySize];
for (int i = 0; i < arraySize; ++i) {
keys[i] = i;
=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/SchemaChangeTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/SchemaChangeTest.java 2011-10-28 23:29:26 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/SchemaChangeTest.java 2012-04-04 06:22:39 +0000
@@ -44,6 +44,7 @@ public class SchemaChangeTest extends Ab
}
public void testFind() {
+ logger.info("PLEASE IGNORE THE FOLLOWING EXPECTED SEVERE ERROR.");
// change the schema (drop a column)
executeSQL(modifyTableStatement);
try {
@@ -80,6 +81,7 @@ public class SchemaChangeTest extends Ab
session.find(StringTypes.class, 0);
}
}
+ logger.info("PLEASE IGNORE THE PRECEDING EXPECTED SEVERE ERROR.\n");
failOnError();
}
=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/domaintypehandler/CrazyDomainTypeHandlerFactoryImpl.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/domaintypehandler/CrazyDomainTypeHandlerFactoryImpl.java 2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/domaintypehandler/CrazyDomainTypeHandlerFactoryImpl.java 2012-04-02 20:43:14 +0000
@@ -110,7 +110,7 @@ public class CrazyDomainTypeHandlerFacto
throw new UnsupportedOperationException("Not supported yet.");
}
- public T newInstance(ValueHandler valueHandler, Db db) {
+ public T newInstance(ValueHandler valueHandler) {
throw new UnsupportedOperationException("Not supported yet.");
}
@@ -208,6 +208,10 @@ public class CrazyDomainTypeHandlerFacto
throw new UnsupportedOperationException("Not supported yet.");
}
+ public T newInstance(ResultData resultData, Db db) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
};
} else {
return null;
=== modified file 'storage/ndb/clusterj/clusterj-tie/logging.properties'
--- a/storage/ndb/clusterj/clusterj-tie/logging.properties 2011-07-05 12:46:07 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/logging.properties 2012-04-11 09:56:27 +0000
@@ -13,7 +13,7 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
-
+com.mysql.clusterj.test.level=INFO
com.mysql.clusterj.bindings.level=INFO
com.mysql.clusterj.core.level=INFO
com.mysql.clusterj.core.metadata.level=INFO
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java 2012-03-06 16:51:07 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterConnectionImpl.java 2012-04-02 20:43:14 +0000
@@ -18,6 +18,7 @@
package com.mysql.clusterj.tie;
import java.util.IdentityHashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -33,6 +34,7 @@ import com.mysql.clusterj.ClusterJHelper
import com.mysql.clusterj.core.spi.ValueHandlerFactory;
import com.mysql.clusterj.core.store.Db;
+import com.mysql.clusterj.core.store.Index;
import com.mysql.clusterj.core.store.Table;
import com.mysql.clusterj.core.util.I18NHelper;
@@ -50,7 +52,7 @@ public class ClusterConnectionImpl
/** My logger */
static final Logger logger = LoggerFactoryService.getFactory()
- .getInstance(com.mysql.clusterj.core.store.ClusterConnection.class);
+ .getInstance(ClusterConnectionImpl.class);
/** Ndb_cluster_connection is wrapped by ClusterConnection */
protected Ndb_cluster_connection clusterConnection;
@@ -195,7 +197,9 @@ public class ClusterConnectionImpl
}
/**
- * Get the cached NdbRecord implementation for this cluster connection.
+ * Get the cached NdbRecord implementation for the table
+ * used with this cluster connection. All columns are included
+ * in the NdbRecord.
* Use a ConcurrentHashMap for best multithread performance.
* There are three possibilities:
* <ul><li>Case 1: return the already-cached NdbRecord
@@ -203,8 +207,7 @@ public class ClusterConnectionImpl
* </li><li>Case 3: return the winner of a race with another thread
* </li></ul>
* @param storeTable the store table
- * @param ndbDictionary the ndb dictionary
- * @return the NdbRecordImpl
+ * @return the NdbRecordImpl for the table
*/
protected NdbRecordImpl getCachedNdbRecordImpl(Table storeTable) {
String tableName = storeTable.getName();
@@ -239,17 +242,81 @@ public class ClusterConnectionImpl
}
}
- /** Remove the cached NdbRecord associated with this table. This allows schema change to work.
+ /**
+ * Get the cached NdbRecord implementation for the index and table
+ * used with this cluster connection.
+ * The NdbRecordImpl is cached under the name tableName+indexName.
+ * Only the key columns are included in the NdbRecord.
+ * Use a ConcurrentHashMap for best multithread performance.
+ * There are three possibilities:
+ * <ul><li>Case 1: return the already-cached NdbRecord
+ * </li><li>Case 2: return a new instance created by this method
+ * </li><li>Case 3: return the winner of a race with another thread
+ * </li></ul>
+ * @param storeTable the store table
+ * @param storeIndex the store index
+ * @return the NdbRecordImpl for the index
+ */
+ protected NdbRecordImpl getCachedNdbRecordImpl(Index storeIndex, Table storeTable) {
+ String recordName = storeTable.getName() + "+" + storeIndex.getInternalName();
+ // find the NdbRecordImpl in the global cache
+ NdbRecordImpl result = ndbRecordImplMap.get(recordName);
+ if (result != null) {
+ // case 1
+ if (logger.isDebugEnabled())logger.debug("NdbRecordImpl found for " + recordName);
+ return result;
+ } else {
+ // dictionary is single thread
+ NdbRecordImpl newNdbRecordImpl;
+ synchronized (dictionaryForNdbRecord) {
+ // try again; another thread might have beat us
+ result = ndbRecordImplMap.get(recordName);
+ if (result != null) {
+ return result;
+ }
+ newNdbRecordImpl = new NdbRecordImpl(storeIndex, storeTable, dictionaryForNdbRecord);
+ }
+ NdbRecordImpl winner = ndbRecordImplMap.putIfAbsent(recordName, newNdbRecordImpl);
+ if (winner == null) {
+ // case 2: the previous value was null, so return the new (winning) value
+ if (logger.isDebugEnabled())logger.debug("NdbRecordImpl created for " + recordName);
+ return newNdbRecordImpl;
+ } else {
+ // case 3: another thread beat us, so return the winner and garbage collect ours
+ if (logger.isDebugEnabled())logger.debug("NdbRecordImpl lost race for " + recordName);
+ newNdbRecordImpl.releaseNdbRecord();
+ return winner;
+ }
+ }
+ }
+
+ /** Remove the cached NdbRecord(s) associated with this table. This allows schema change to work.
+ * All NdbRecords including any index NdbRecords will be removed. Index NdbRecords are named
+ * tableName+indexName.
* @param tableName the name of the table
*/
public void unloadSchema(String tableName) {
- if (logger.isDebugEnabled())logger.debug("Removing cached NdbRecord for " + tableName);
- NdbRecordImpl ndbRecordImpl = ndbRecordImplMap.remove(tableName);
- if (ndbRecordImpl != null) {
- ndbRecordImpl.releaseNdbRecord();
+ // synchronize to avoid multiple threads unloading schema simultaneously
+ // it is possible although unlikely that another thread is adding an entry while
+ // we are removing entries; if this occurs an error will be signaled here
+ synchronized(ndbRecordImplMap) {
+ Iterator<Map.Entry<String, NdbRecordImpl>> iterator = ndbRecordImplMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, NdbRecordImpl> entry = iterator.next();
+ String key = entry.getKey();
+ if (key.startsWith(tableName)) {
+ // remove all records whose key begins with the table name; this will remove index records also
+ if (logger.isDebugEnabled())logger.debug("Removing cached NdbRecord for " + key);
+ NdbRecordImpl record = entry.getValue();
+ iterator.remove();
+ if (record != null) {
+ record.releaseNdbRecord();
+ }
+ }
+ }
+ if (logger.isDebugEnabled())logger.debug("Removing dictionary entry for cached table " + tableName);
+ dictionaryForNdbRecord.removeCachedTable(tableName);
}
- if (logger.isDebugEnabled())logger.debug("Removing dictionary entry for cached table " + tableName);
- dictionaryForNdbRecord.removeCachedTable(tableName);
}
public ValueHandlerFactory getSmartValueHandlerFactory() {
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java 2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java 2012-04-02 20:43:14 +0000
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.List;
import com.mysql.clusterj.ClusterJDatastoreException;
-import com.mysql.clusterj.ClusterJException;
import com.mysql.clusterj.ClusterJFatalInternalException;
import com.mysql.clusterj.ClusterJHelper;
import com.mysql.clusterj.LockMode;
@@ -55,6 +54,8 @@ import com.mysql.ndbjtie.ndbapi.NdbDicti
import com.mysql.ndbjtie.ndbapi.NdbOperation.OperationOptionsConst;
import com.mysql.ndbjtie.ndbapi.NdbOperationConst.AbortOption;
import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanFlag;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanOptions;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanOptionsConst;
/**
*
@@ -243,12 +244,18 @@ class ClusterTransactionImpl implements
public IndexScanOperation getIndexScanOperation(Index storeIndex, Table storeTable) {
enlist();
+ if (USE_NDBRECORD) {
+ return new NdbRecordIndexScanOperationImpl(this, storeIndex, storeTable, indexScanLockMode);
+ }
IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
handleError(ndbIndex, ndbDictionary);
NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
handleError(ndbOperation, ndbTransaction);
- int lockMode = indexScanLockMode;
int scanFlags = 0;
+ int lockMode = indexScanLockMode;
+ if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
+ scanFlags = ScanFlag.SF_KeyInfo;
+ }
int parallel = 0;
int batch = 0;
int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
@@ -259,12 +266,19 @@ class ClusterTransactionImpl implements
public IndexScanOperation getIndexScanOperationMultiRange(Index storeIndex, Table storeTable) {
enlist();
+ if (USE_NDBRECORD) {
+ return new NdbRecordIndexScanOperationImpl(this, storeIndex, storeTable, true, indexScanLockMode);
+ }
IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
handleError(ndbIndex, ndbDictionary);
NdbIndexScanOperation ndbOperation = ndbTransaction.getNdbIndexScanOperation(ndbIndex);
handleError(ndbOperation, ndbTransaction);
+ int scanFlags = 0;
int lockMode = indexScanLockMode;
- int scanFlags = ScanFlag.SF_MultiRange;
+ if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
+ scanFlags = ScanFlag.SF_KeyInfo;
+ }
+ scanFlags |= ScanFlag.SF_MultiRange;
int parallel = 0;
int batch = 0;
int returnCode = ndbOperation.readTuples(lockMode, scanFlags, parallel, batch);
@@ -307,12 +321,18 @@ class ClusterTransactionImpl implements
public ScanOperation getTableScanOperation(Table storeTable) {
enlist();
+ if (USE_NDBRECORD) {
+ return new NdbRecordTableScanOperationImpl(this, storeTable, tableScanLockMode);
+ }
TableConst ndbTable = ndbDictionary.getTable(storeTable.getName());
handleError(ndbTable, ndbDictionary);
NdbScanOperation ndbScanOperation = ndbTransaction.getNdbScanOperation(ndbTable);
handleError(ndbScanOperation, ndbTransaction);
int lockMode = tableScanLockMode;
int scanFlags = 0;
+ if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
+ scanFlags = ScanFlag.SF_KeyInfo;
+ }
int parallel = 0;
int batch = 0;
int returnCode = ndbScanOperation.readTuples(lockMode, scanFlags, parallel, batch);
@@ -339,6 +359,9 @@ class ClusterTransactionImpl implements
public IndexOperation getUniqueIndexOperation(Index storeIndex, Table storeTable) {
enlist();
+ if (USE_NDBRECORD) {
+ return new NdbRecordUniqueKeyOperationImpl(this, storeIndex, storeTable);
+ }
IndexConst ndbIndex = ndbDictionary.getIndex(storeIndex.getInternalName(), storeTable.getName());
handleError(ndbIndex, ndbDictionary);
NdbIndexOperation ndbIndexOperation = ndbTransaction.getNdbIndexOperation(ndbIndex);
@@ -414,6 +437,33 @@ class ClusterTransactionImpl implements
return operation;
}
+ /** Create a table scan operation using NdbRecord.
+ *
+ * @param ndbRecord the NdbRecord for the result
+ * @param mask the columns to read
+ * @param options the scan options
+ * @return
+ */
+ public NdbScanOperation scanTable(NdbRecordConst ndbRecord, byte[] mask, ScanOptionsConst options) {
+ enlist();
+ int lockMode = tableScanLockMode;
+ NdbScanOperation operation = ndbTransaction.scanTable(ndbRecord, lockMode, mask, options, 0);
+ handleError(operation, ndbTransaction);
+ return operation;
+ }
+
+ /** Create a scan operation on the index using NdbRecord.
+ *
+ * @param ndbRecord the ndb record
+ * @param mask the mask that specifies which columns to read
+ * @param object scan options // TODO change this
+ * @return
+ */
+ public NdbIndexScanOperation scanIndex(NdbRecordConst key_record, NdbRecordConst result_record,
+ byte[] result_mask, ScanOptions scanOptions) {
+ return ndbTransaction.scanIndex(key_record, result_record, indexScanLockMode, result_mask, null, scanOptions, 0);
+ }
+
/** Create an NdbOperation for delete using NdbRecord.
*
* @param ndbRecord the NdbRecord
@@ -655,6 +705,16 @@ class ClusterTransactionImpl implements
return clusterConnectionImpl.getCachedNdbRecordImpl(storeTable);
}
+ /** Get the cached NdbRecordImpl for this index and table. The NdbRecordImpl is cached in the
+ * cluster connection.
+ * @param storeTable the table
+ * @param storeIndex the index
+ * @return
+ */
+ protected NdbRecordImpl getCachedNdbRecordImpl(Index storeIndex, Table storeTable) {
+ return clusterConnectionImpl.getCachedNdbRecordImpl(storeIndex, storeTable);
+ }
+
/**
* Add an operation to check for errors after execute.
* @param op the operation to check
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java 2012-03-05 22:28:15 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java 2012-04-02 20:43:14 +0000
@@ -95,7 +95,10 @@ class DbImpl implements com.mysql.cluste
}
public void close() {
- Ndb.delete(ndb);
+ if (ndb != null) {
+ Ndb.delete(ndb);
+ ndb = null;
+ }
clusterConnection.close(this);
}
@@ -142,7 +145,7 @@ class DbImpl implements com.mysql.cluste
/** Enlist an NdbTransaction using table and key data to specify
* the transaction coordinator.
*
- * @param table the table
+ * @param tableName the name of the table
* @param keyParts the list of partition key parts
* @return the ndbTransaction
*/
@@ -199,8 +202,8 @@ class DbImpl implements com.mysql.cluste
* the transaction coordinator. This method is also used if
* the key data is null.
*
- * @param table the table
- * @param keyParts the list of partition key parts
+ * @param tableName the name of the table
+ * @param partitionId the partition id
* @return the ndbTransaction
*/
public NdbTransaction enlist(String tableName, int partitionId) {
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java 2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordImpl.java 2012-04-08 20:50:07 +0000
@@ -26,12 +26,13 @@ import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;
-import com.mysql.clusterj.ClusterJDatastoreException;
import com.mysql.clusterj.ClusterJFatalInternalException;
import com.mysql.clusterj.ClusterJFatalUserException;
import com.mysql.clusterj.ClusterJUserException;
+import com.mysql.clusterj.ColumnType;
import com.mysql.clusterj.core.store.Column;
+import com.mysql.clusterj.core.store.Index;
import com.mysql.clusterj.core.store.Table;
import com.mysql.clusterj.core.util.I18NHelper;
@@ -44,15 +45,23 @@ import com.mysql.ndbjtie.ndbapi.NdbRecor
import com.mysql.ndbjtie.ndbapi.NdbRecordConst;
import com.mysql.ndbjtie.ndbapi.NdbDictionary.ColumnConst;
import com.mysql.ndbjtie.ndbapi.NdbDictionary.Dictionary;
+import com.mysql.ndbjtie.ndbapi.NdbDictionary.IndexConst;
import com.mysql.ndbjtie.ndbapi.NdbDictionary.RecordSpecification;
import com.mysql.ndbjtie.ndbapi.NdbDictionary.RecordSpecificationArray;
import com.mysql.ndbjtie.ndbapi.NdbDictionary.TableConst;
/**
- * Wrapper around an NdbRecord. The default implementation can be used for create, read, update, or delete
- * using an NdbRecord that defines every column in the table. After construction, the instance is
+ * Wrapper around an NdbRecord. Operations may use one or two instances.
+ * <ul><li>The table implementation can be used for create, read, update, or delete
+ * using an NdbRecord that defines every column in the table.
+ * </li><li>The index implementation for unique indexes can be used with a unique lookup operation.
+ * </li><li>The index implementation for ordered (non-unique) indexes can be used with an index scan operation.
+ * </li></ul>
+ * After construction, the instance is
* read-only and can be shared among all threads that use the same cluster connection; and the size of the
- * buffer required for operations is available. The NdbRecord instance is released when the cluster
+ * buffer required for operations is available.
+ * Methods on the instance generally require a buffer to be passed, which is modified by the method.
+ * The NdbRecord instance is released when the cluster
* connection is closed or when schema change invalidates it. Column values can be set using a provided
* buffer and buffer manager.
*/
@@ -79,12 +88,15 @@ public class NdbRecordImpl {
private RecordSpecificationArray recordSpecificationArray;
/** The NdbTable */
- TableConst tableConst;
+ TableConst tableConst = null;
- /** The size of the receive buffer for this operation */
+ /** The NdbIndex, which will be null for complete-table instances */
+ IndexConst indexConst = null;
+
+ /** The size of the buffer for this NdbRecord */
protected int bufferSize;
- /** The maximum column id for this operation */
+ /** The maximum column id for this NdbRecord */
protected int maximumColumnId;
/** The offsets into the buffer for each column */
@@ -115,14 +127,16 @@ public class NdbRecordImpl {
private Dictionary ndbDictionary;
/** Number of columns for this NdbRecord */
- private int numberOfColumns;
+ private int numberOfTableColumns;
/** These fields are only used during construction of the RecordSpecificationArray */
int offset = 0;
int nullablePosition = 0;
byte[] defaultValues;
- /** Constructor used for insert operations that do not need to read data.
+ private int[] recordSpecificationIndexes = null;
+
+ /** Constructor for table operations.
*
* @param storeTable the store table
* @param ndbDictionary the ndb dictionary
@@ -130,14 +144,41 @@ public class NdbRecordImpl {
protected NdbRecordImpl(Table storeTable, Dictionary ndbDictionary) {
this.ndbDictionary = ndbDictionary;
this.tableConst = getNdbTable(storeTable.getName());
- this.numberOfColumns = tableConst.getNoOfColumns();
- this.recordSpecificationArray = RecordSpecificationArray.create(numberOfColumns);
- this.offsets = new int[numberOfColumns];
- this.lengths = new int[numberOfColumns];
- this.nullbitBitInByte = new int[numberOfColumns];
- this.nullbitByteOffset = new int[numberOfColumns];
- this.storeColumns = new Column[numberOfColumns];
+ this.numberOfTableColumns = tableConst.getNoOfColumns();
+ this.recordSpecificationArray = RecordSpecificationArray.create(numberOfTableColumns);
+ recordSpecificationIndexes = new int[numberOfTableColumns];
+ this.offsets = new int[numberOfTableColumns];
+ this.lengths = new int[numberOfTableColumns];
+ this.nullbitBitInByte = new int[numberOfTableColumns];
+ this.nullbitByteOffset = new int[numberOfTableColumns];
+ this.storeColumns = new Column[numberOfTableColumns];
this.ndbRecord = createNdbRecord(storeTable, ndbDictionary);
+ if (logger.isDetailEnabled()) logger.detail(storeTable.getName() + " " + dumpDefinition());
+ initializeDefaultBuffer();
+ }
+
+ /** Constructor for index operations. The NdbRecord has columns just for
+ * the columns in the index.
+ *
+ * @param storeIndex the store index
+ * @param storeTable the store table
+ * @param ndbDictionary the ndb dictionary
+ */
+ protected NdbRecordImpl(Index storeIndex, Table storeTable, Dictionary ndbDictionary) {
+ this.ndbDictionary = ndbDictionary;
+ this.tableConst = getNdbTable(storeTable.getName());
+ this.indexConst = getNdbIndex(storeIndex.getInternalName(), tableConst.getName());
+ this.numberOfTableColumns = tableConst.getNoOfColumns();
+ int numberOfIndexColumns = this.indexConst.getNoOfColumns();
+ this.recordSpecificationArray = RecordSpecificationArray.create(numberOfIndexColumns);
+ recordSpecificationIndexes = new int[numberOfTableColumns];
+ this.offsets = new int[numberOfTableColumns];
+ this.lengths = new int[numberOfTableColumns];
+ this.nullbitBitInByte = new int[numberOfTableColumns];
+ this.nullbitByteOffset = new int[numberOfTableColumns];
+ this.storeColumns = new Column[numberOfTableColumns];
+ this.ndbRecord = createNdbRecord(storeIndex, storeTable, ndbDictionary);
+ if (logger.isDetailEnabled()) logger.detail(storeIndex.getInternalName() + " " + dumpDefinition());
initializeDefaultBuffer();
}
@@ -152,9 +193,10 @@ public class NdbRecordImpl {
zeros.order(ByteOrder.nativeOrder());
// just to be sure, initialize with zeros
zeros.put(defaultValues);
+ // not all columns are set at this point, so only check for those that are set
for (Column storeColumn: storeColumns) {
// nullable columns get the null bit set
- if (storeColumn.getNullable()) {
+ if (storeColumn != null && storeColumn.getNullable()) {
setNull(zeros, storeColumn);
}
}
@@ -170,13 +212,22 @@ public class NdbRecordImpl {
*/
protected ByteBuffer newBuffer() {
ByteBuffer result = ByteBuffer.allocateDirect(bufferSize);
- result.order(ByteOrder.nativeOrder());
- result.put(defaultValues);
- result.limit(bufferSize);
- result.position(0);
+ initializeBuffer(result);
return result;
}
+ /** Initialize an already-allocated buffer with default values for all columns.
+ *
+ * @param buffer
+ */
+ protected void initializeBuffer(ByteBuffer buffer) {
+ buffer.order(ByteOrder.nativeOrder());
+ buffer.limit(bufferSize);
+ buffer.position(0);
+ buffer.put(defaultValues);
+ buffer.position(0);
+ }
+
public int setNull(ByteBuffer buffer, Column storeColumn) {
int columnId = storeColumn.getColumnId();
if (!storeColumn.getNullable()) {
@@ -217,11 +268,11 @@ public class NdbRecordImpl {
public int setByte(ByteBuffer buffer, Column storeColumn, byte value) {
resetNull(buffer, storeColumn);
int columnId = storeColumn.getColumnId();
- if (storeColumn.getLength() == 4) {
+ if (storeColumn.getType() == ColumnType.Bit) {
// the byte is stored as a BIT array of four bytes
- buffer.putInt(offsets[columnId], value);
+ buffer.putInt(offsets[columnId], value & 0xff);
} else {
- buffer.put(offsets[columnId], (byte)value);
+ buffer.put(offsets[columnId], value);
}
buffer.limit(bufferSize);
buffer.position(0);
@@ -232,8 +283,13 @@ public class NdbRecordImpl {
resetNull(buffer, storeColumn);
int columnId = storeColumn.getColumnId();
int offset = offsets[columnId];
- int length = storeColumn.getLength() + storeColumn.getPrefixLength();
- buffer.limit(offset + length);
+ int length = storeColumn.getLength();
+ int prefixLength = storeColumn.getPrefixLength();
+ if (length < value.length) {
+ throw new ClusterJUserException(local.message("ERR_Data_Too_Long",
+ storeColumn.getName(), length, value.length));
+ }
+ buffer.limit(offset + prefixLength + length);
buffer.position(offset);
Utility.convertValue(buffer, storeColumn, value);
buffer.limit(bufferSize);
@@ -287,7 +343,7 @@ public class NdbRecordImpl {
int columnId = storeColumn.getColumnId();
if (storeColumn.getLength() == 4) {
// the short is stored as a BIT array of four bytes
- buffer.putInt(offsets[columnId], value);
+ buffer.putInt(offsets[columnId], value & 0xffff);
} else {
buffer.putShort(offsets[columnId], (short)value);
}
@@ -298,15 +354,16 @@ public class NdbRecordImpl {
resetNull(buffer, storeColumn);
int columnId = storeColumn.getColumnId();
int offset = offsets[columnId];
- int length = storeColumn.getLength() + storeColumn.getPrefixLength();
+ int prefixLength = storeColumn.getPrefixLength();
+ int length = storeColumn.getLength() + prefixLength;
buffer.limit(offset + length);
buffer.position(offset);
// TODO provide the buffer to Utility.encode to avoid copying
// for now, use the encode method to encode the value then copy it
ByteBuffer converted = Utility.encode(value, storeColumn, bufferManager);
if (length < converted.remaining()) {
- throw new ClusterJUserException(local.message("ERR_Data_Too_Large",
- storeColumn.getName(), length, converted.remaining()));
+ throw new ClusterJUserException(local.message("ERR_Data_Too_Long",
+ storeColumn.getName(), length - prefixLength, converted.remaining() - prefixLength));
}
buffer.put(converted);
buffer.limit(bufferSize);
@@ -321,9 +378,9 @@ public class NdbRecordImpl {
public byte getByte(ByteBuffer buffer, int columnId) {
Column storeColumn = storeColumns[columnId];
- if (storeColumn.getLength() == 4) {
+ if (storeColumn.getType() == ColumnType.Bit) {
// the byte was stored in a BIT column as four bytes
- return (byte)buffer.get(offsets[columnId]);
+ return (byte)(buffer.getInt(offsets[columnId]));
} else {
// the byte was stored as a byte
return buffer.get(offsets[columnId]);
@@ -391,7 +448,7 @@ public class NdbRecordImpl {
Column storeColumn = storeColumns[columnId];
if (storeColumn.getLength() == 4) {
// the short was stored in a BIT column as four bytes
- return (short)buffer.get(offsets[columnId]);
+ return (short)buffer.getInt(offsets[columnId]);
} else {
// the short was stored as a short
return buffer.getShort(offsets[columnId]);
@@ -599,8 +656,35 @@ public class NdbRecordImpl {
}
}
+ protected NdbRecord createNdbRecord(Index storeIndex, Table storeTable, Dictionary ndbDictionary) {
+ String[] columnNames = storeIndex.getColumnNames();
+ // analyze columns; sort into alignment buckets, allocate space in the buffer
+ // and build the record specification array
+ analyzeColumns(storeTable, columnNames);
+ // create the NdbRecord
+ NdbRecord result = ndbDictionary.createRecord(indexConst, tableConst, recordSpecificationArray,
+ columnNames.length, SIZEOF_RECORD_SPECIFICATION, 0);
+ // delete the RecordSpecificationArray since it is no longer needed
+ RecordSpecificationArray.delete(recordSpecificationArray);
+ handleError(result, ndbDictionary);
+ return result;
+ }
+
protected NdbRecord createNdbRecord(Table storeTable, Dictionary ndbDictionary) {
String[] columnNames = storeTable.getColumnNames();
+ // analyze columns; sort into alignment buckets, allocate space in the buffer,
+ // and build the record specification array
+ analyzeColumns(storeTable, columnNames);
+ // create the NdbRecord
+ NdbRecord result = ndbDictionary.createRecord(tableConst, recordSpecificationArray,
+ columnNames.length, SIZEOF_RECORD_SPECIFICATION, 0);
+ // delete the RecordSpecificationArray since it is no longer needed
+ RecordSpecificationArray.delete(recordSpecificationArray);
+ handleError(result, ndbDictionary);
+ return result;
+ }
+
+ private void analyzeColumns(Table storeTable, String[] columnNames) {
List<Column> align8 = new ArrayList<Column>();
List<Column> align4 = new ArrayList<Column>();
List<Column> align2 = new ArrayList<Column>();
@@ -609,6 +693,8 @@ public class NdbRecordImpl {
int i = 0;
for (String columnName: columnNames) {
Column storeColumn = storeTable.getColumn(columnName);
+ int columnId = storeColumn.getColumnId();
+ recordSpecificationIndexes[columnId] = i;
if (logger.isDetailEnabled()) logger.detail("storeColumn: " + storeColumn.getName() + " id: " + storeColumn.getColumnId() + " index: " + i);
lengths[i] = storeColumn.getLength();
storeColumns[i++] = storeColumn;
@@ -671,28 +757,20 @@ public class NdbRecordImpl {
offset = (7 + offset) / 8 * 8;
nullIndicatorSize = offset;
for (Column storeColumn: align8) {
- handleColumn(8, storeColumn);
+ analyzeColumn(8, storeColumn);
}
for (Column storeColumn: align4) {
- handleColumn(4, storeColumn);
+ analyzeColumn(4, storeColumn);
}
for (Column storeColumn: align2) {
- handleColumn(2, storeColumn);
+ analyzeColumn(2, storeColumn);
}
for (Column storeColumn: align1) {
- handleColumn(1, storeColumn);
+ analyzeColumn(1, storeColumn);
}
bufferSize = offset;
if (logger.isDebugEnabled()) logger.debug(dumpDefinition());
-
- // now create an NdbRecord
- NdbRecord result = ndbDictionary.createRecord(tableConst, recordSpecificationArray,
- numberOfColumns, SIZEOF_RECORD_SPECIFICATION, 0);
- // delete the RecordSpecificationArray since it is no longer needed
- RecordSpecificationArray.delete(recordSpecificationArray);
- handleError(result, ndbDictionary);
- return result;
}
/** Create a record specification for a column. Keep track of the offset into the buffer
@@ -701,9 +779,10 @@ public class NdbRecordImpl {
* @param alignment the alignment for this column in the buffer
* @param storeColumn the column
*/
- private void handleColumn(int alignment, Column storeColumn) {
+ private void analyzeColumn(int alignment, Column storeColumn) {
int columnId = storeColumn.getColumnId();
- RecordSpecification recordSpecification = recordSpecificationArray.at(columnId);
+ int recordSpecificationIndex = recordSpecificationIndexes[columnId];
+ RecordSpecification recordSpecification = recordSpecificationArray.at(recordSpecificationIndex);
ColumnConst columnConst = tableConst.getColumn(columnId);
recordSpecification.column(columnConst);
recordSpecification.offset(offset);
@@ -727,21 +806,23 @@ public class NdbRecordImpl {
private String dumpDefinition() {
StringBuilder builder = new StringBuilder(tableConst.getName());
builder.append(" numberOfColumns: ");
- builder.append(numberOfColumns);
+ builder.append(numberOfTableColumns);
builder.append('\n');
- for (int columnId = 0; columnId < numberOfColumns; ++columnId) {
+ for (int columnId = 0; columnId < numberOfTableColumns; ++columnId) {
Column storeColumn = storeColumns[columnId];
- builder.append(" column: ");
- builder.append(storeColumn.getName());
- builder.append(" offset: ");
- builder.append(offsets[columnId]);
- builder.append(" length: ");
- builder.append(lengths[columnId]);
- builder.append(" nullbitBitInByte: ");
- builder.append(nullbitBitInByte[columnId]);
- builder.append(" nullbitByteOffset: ");
- builder.append(nullbitByteOffset[columnId]);
- builder.append('\n');
+ if (storeColumn != null) {
+ builder.append(" column: ");
+ builder.append(storeColumn.getName());
+ builder.append(" offset: ");
+ builder.append(offsets[columnId]);
+ builder.append(" length: ");
+ builder.append(lengths[columnId]);
+ builder.append(" nullbitBitInByte: ");
+ builder.append(nullbitBitInByte[columnId]);
+ builder.append(" nullbitByteOffset: ");
+ builder.append(nullbitByteOffset[columnId]);
+ builder.append('\n');
+ }
}
return builder.toString();
}
@@ -749,37 +830,39 @@ public class NdbRecordImpl {
public String dumpValues(ByteBuffer data, byte[] mask) {
StringBuilder builder = new StringBuilder(tableConst.getName());
builder.append(" numberOfColumns: ");
- builder.append(numberOfColumns);
+ builder.append(numberOfTableColumns);
builder.append('\n');
- for (int columnId = 0; columnId < numberOfColumns; ++columnId) {
+ for (int columnId = 0; columnId < numberOfTableColumns; ++columnId) {
Column storeColumn = storeColumns[columnId];
- builder.append(" column: ");
- builder.append(storeColumn.getName());
- builder.append(" offset: ");
- builder.append(offsets[columnId]);
- builder.append(" length: ");
- builder.append(lengths[columnId]);
- builder.append(" nullbitBitInByte: ");
- int nullBitInByte = nullbitBitInByte[columnId];
- builder.append(nullBitInByte);
- builder.append(" nullbitByteOffset: ");
- int nullByteOffset = nullbitByteOffset[columnId];
- builder.append(nullByteOffset);
- builder.append(" data: ");
- int size = storeColumn.getColumnSpace() != 0 ? storeColumn.getColumnSpace():storeColumn.getSize();
- int offset = offsets[columnId];
- data.limit(bufferSize);
- data.position(0);
- for (int index = offset; index < offset + size; ++index) {
- builder.append(String.format("%2x ", data.get(index)));
- }
- builder.append(" null: ");
- builder.append(isNull(data, columnId));
- builder.append(" present: ");
- if (mask != null) {
- builder.append(isPresent(mask, columnId));
+ if (storeColumn != null) {
+ builder.append(" column: ");
+ builder.append(storeColumn.getName());
+ builder.append(" offset: ");
+ builder.append(offsets[columnId]);
+ builder.append(" length: ");
+ builder.append(lengths[columnId]);
+ builder.append(" nullbitBitInByte: ");
+ int nullBitInByte = nullbitBitInByte[columnId];
+ builder.append(nullBitInByte);
+ builder.append(" nullbitByteOffset: ");
+ int nullByteOffset = nullbitByteOffset[columnId];
+ builder.append(nullByteOffset);
+ builder.append(" data: ");
+ int size = storeColumn.getColumnSpace() != 0 ? storeColumn.getColumnSpace():storeColumn.getSize();
+ int offset = offsets[columnId];
+ data.limit(bufferSize);
+ data.position(0);
+ for (int index = offset; index < offset + size; ++index) {
+ builder.append(String.format("%2x ", data.get(index)));
+ }
+ builder.append(" null: ");
+ builder.append(isNull(data, columnId));
+ if (mask != null) {
+ builder.append(" present: ");
+ builder.append(isPresent(mask, columnId));
+ }
+ builder.append('\n');
}
- builder.append('\n');
}
data.position(0);
return builder.toString();
@@ -791,9 +874,28 @@ public class NdbRecordImpl {
// try the lower case table name
ndbTable = ndbDictionary.getTable(tableName.toLowerCase());
}
+ if (ndbTable == null) {
+ Utility.throwError(ndbTable, ndbDictionary.getNdbError(), tableName);
+ }
return ndbTable;
}
+ TableConst getNdbTable() {
+ return tableConst;
+ }
+
+ IndexConst getNdbIndex(String indexName, String tableName) {
+ IndexConst ndbIndex = ndbDictionary.getIndex(indexName, tableName);
+ if (ndbIndex == null) {
+ Utility.throwError(ndbIndex, ndbDictionary.getNdbError(), tableName+ "+" + indexName);
+ }
+ return ndbIndex;
+ }
+
+ IndexConst getNdbIndex() {
+ return indexConst;
+ }
+
public int getBufferSize() {
return bufferSize;
}
@@ -803,7 +905,7 @@ public class NdbRecordImpl {
}
public int getNumberOfColumns() {
- return numberOfColumns;
+ return numberOfTableColumns;
}
protected void releaseNdbRecord() {
=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordIndexScanOperationImpl.java 2012-04-05 15:12:21 +0000
@@ -0,0 +1,359 @@
+/*
+ Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.mysql.clusterj.ClusterJFatalInternalException;
+
+import com.mysql.clusterj.core.store.Column;
+import com.mysql.clusterj.core.store.Index;
+import com.mysql.clusterj.core.store.IndexScanOperation;
+import com.mysql.clusterj.core.store.Table;
+
+import com.mysql.ndbjtie.ndbapi.NdbIndexScanOperation;
+
+/** NdbRecordIndexScanOperationImpl performs index scans using NdbRecord.
+ * Two NdbRecordImpl instances are used: one to define the bounds (low and high)
+ * and one to define the result. The superclass NdbRecordScanOperationImpl
+ * holds the NdbRecordImpl and buffer that define and hold the result.
+ * <p>
+ * This instance declares and holds the bounds while they are being defined.
+ * Bounds are handled by creating two bound buffers: one for the low bound
+ * and a second for the high bound. While the bounds are being created, the
+ * number of columns and the strictness of the bound are recorded.
+ * <p>
+ * Bounds are calculated elsewhere based on the query parameters and delivered, in sequence,
+ * to this instance. Bounds are delivered for the most significant index column first,
+ * followed by the next most significant index column, until all columns that have bounds
+ * have been delivered. There may be more columns for the low bound versus the high bound,
+ * or vice versa. For each bound that is delivered, the method (assignBoundBuffer) determines
+ * to which bound, low or high, the bound belongs. The column count is incremented
+ * for the appropriate bound buffer. The value is then applied to the bound buffer using
+ * the setXXX method of the NdbRecordImpl that manages the layout of the bound buffer.
+ * <p>
+ * The superclass declares and holds the filter while it is being defined.
+ * <p>
+ * At endDefinition, the filter is used to create the scanOptions which is
+ * passed to create the NdbIndexScanOperation. Then the bounds are set into
+ * the newly created NdbIndexScanOperation.
+ * The resulting NdbIndexScanOperation is iterated (scanned) by the NdbRecordResultDataImpl.
+ */
+public class NdbRecordIndexScanOperationImpl extends NdbRecordScanOperationImpl implements IndexScanOperation {
+
+ /** The ndb index scan operation */
+ private NdbIndexScanOperation ndbIndexScanOperation;
+
+ /** The range for this bound */
+ private int indexBoundRange = 0;
+
+ /** The buffer that contains low bounds for all index columns */
+ private ByteBuffer indexBoundLowBuffer = null;
+
+ /** The number of columns in the low bound */
+ private int indexBoundLowCount = 0;
+
+ /** Is the low bound strict? */
+ private boolean indexBoundLowStrict = false;
+
+ /** The buffer that contains high bounds for all index columns */
+ private ByteBuffer indexBoundHighBuffer = null;
+
+ /** The number of columns in the high bound */
+ private int indexBoundHighCount = 0;
+
+ /** Is the high bound strict? */
+ private boolean indexBoundHighStrict = false;
+
+ /** Is this an equal scan? */
+ private boolean equalScan = true;
+
+ /** The list of index bounds already defined; null for a single range */
+ List<NdbIndexScanOperation.IndexBound> ndbIndexBoundList = null;
+
+ public NdbRecordIndexScanOperationImpl(ClusterTransactionImpl clusterTransaction,
+ Index storeIndex, Table storeTable, int lockMode) {
+ this(clusterTransaction, storeIndex, storeTable, false, lockMode);
+ }
+
+ public NdbRecordIndexScanOperationImpl(ClusterTransactionImpl clusterTransaction,
+ Index storeIndex, Table storeTable, boolean multiRange, int lockMode) {
+ super(clusterTransaction, storeTable, lockMode);
+ this.multiRange = multiRange;
+ if (this.multiRange) {
+ ndbIndexBoundList = new ArrayList<NdbIndexScanOperation.IndexBound>();
+ }
+ ndbRecordKeys = clusterTransaction.getCachedNdbRecordImpl(storeIndex, storeTable);
+ keyBufferSize = ndbRecordKeys.bufferSize;
+ indexBoundLowBuffer = ndbRecordKeys.newBuffer();
+ indexBoundHighBuffer = ndbRecordKeys.newBuffer();
+ }
+
+ public void endDefinition() {
+ // get the scan options which also sets the filter
+ getScanOptions();
+ if (logger.isDetailEnabled()) logger.detail("scan options present " + dumpScanOptions(scanOptions.optionsPresent(), scanOptions.scan_flags()));
+
+ // create the scan operation
+ ndbIndexScanOperation = clusterTransaction.scanIndex(
+ ndbRecordKeys.getNdbRecord(), ndbRecordValues.getNdbRecord(), mask, scanOptions);
+ ndbOperation = ndbIndexScanOperation;
+
+ // set the bounds, either from the single indexBound or from multiple ranges
+ if (ndbIndexBoundList != null) {
+ if (logger.isDetailEnabled()) logger.detail("list size " + ndbIndexBoundList.size());
+ // apply all of the bounds to the operation
+ for (NdbIndexScanOperation.IndexBound ndbIndexBound: ndbIndexBoundList) {
+ int returnCode = ndbIndexScanOperation.setBound(ndbRecordKeys.getNdbRecord(), ndbIndexBound);
+ handleError(returnCode, ndbIndexScanOperation);
+ }
+ } else {
+ // only one range defined
+ NdbIndexScanOperation.IndexBound ndbIndexBound = getNdbIndexBound();
+ int returnCode = ndbIndexScanOperation.setBound(ndbRecordKeys.getNdbRecord(), ndbIndexBound);
+ handleError(returnCode, ndbIndexScanOperation);
+ }
+ }
+
+ public void setBoundBigInteger(Column storeColumn, BoundType type, BigInteger value) {
+ if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+ // calculate the bound data, the buffer, and the strictness
+ ByteBuffer keyBuffer = assignBoundBuffer(type);
+ if (keyBuffer == null) {
+ // BoundEQ put data into both buffers
+ ndbRecordKeys.setBigInteger(indexBoundLowBuffer, storeColumn, value);
+ ndbRecordKeys.setBigInteger(indexBoundHighBuffer, storeColumn, value);
+ } else {
+ ndbRecordKeys.setBigInteger(keyBuffer, storeColumn, value);
+ }
+ }
+
+ public void setBoundByte(Column storeColumn, BoundType type, byte value) {
+ if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+ // calculate the bound data, the buffer, and the strictness
+ ByteBuffer keyBuffer = assignBoundBuffer(type);
+ if (keyBuffer == null) {
+ // BoundEQ put data into both buffers
+ ndbRecordKeys.setByte(indexBoundLowBuffer, storeColumn, value);
+ ndbRecordKeys.setByte(indexBoundHighBuffer, storeColumn, value);
+ } else {
+ ndbRecordKeys.setByte(keyBuffer, storeColumn, value);
+ }
+ }
+
+ public void setBoundBytes(Column storeColumn, BoundType type, byte[] value) {
+ if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+ // calculate the bound data, the buffer, and the strictness
+ ByteBuffer keyBuffer = assignBoundBuffer(type);
+ if (keyBuffer == null) {
+ // BoundEQ put data into both buffers
+ ndbRecordKeys.setBytes(indexBoundLowBuffer, storeColumn, value);
+ ndbRecordKeys.setBytes(indexBoundHighBuffer, storeColumn, value);
+ } else {
+ ndbRecordKeys.setBytes(keyBuffer, storeColumn, value);
+ }
+ }
+
+ public void setBoundDecimal(Column storeColumn, BoundType type, BigDecimal value) {
+ if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+ // calculate the bound data, the buffer, and the strictness
+ ByteBuffer keyBuffer = assignBoundBuffer(type);
+ if (keyBuffer == null) {
+ // BoundEQ put data into both buffers
+ ndbRecordKeys.setDecimal(indexBoundLowBuffer, storeColumn, value);
+ ndbRecordKeys.setDecimal(indexBoundHighBuffer, storeColumn, value);
+ } else {
+ ndbRecordKeys.setDecimal(keyBuffer, storeColumn, value);
+ }
+ }
+
+ public void setBoundDouble(Column storeColumn, BoundType type, Double value) {
+ if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+ // calculate the bound data, the buffer, and the strictness
+ ByteBuffer keyBuffer = assignBoundBuffer(type);
+ if (keyBuffer == null) {
+ // BoundEQ put data into both buffers
+ ndbRecordKeys.setDouble(indexBoundLowBuffer, storeColumn, value);
+ ndbRecordKeys.setDouble(indexBoundHighBuffer, storeColumn, value);
+ } else {
+ ndbRecordKeys.setDouble(keyBuffer, storeColumn, value);
+ }
+ }
+
+ public void setBoundFloat(Column storeColumn, BoundType type, Float value) {
+ if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+ // calculate the bound data, the buffer, and the strictness
+ ByteBuffer keyBuffer = assignBoundBuffer(type);
+ if (keyBuffer == null) {
+ // BoundEQ put data into both buffers
+ ndbRecordKeys.setFloat(indexBoundLowBuffer, storeColumn, value);
+ ndbRecordKeys.setFloat(indexBoundHighBuffer, storeColumn, value);
+ } else {
+ ndbRecordKeys.setFloat(keyBuffer, storeColumn, value);
+ }
+ }
+
+ public void setBoundShort(Column storeColumn, BoundType type, short value) {
+ if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+ // calculate the bound data, the buffer, and the strictness
+ ByteBuffer keyBuffer = assignBoundBuffer(type);
+ if (keyBuffer == null) {
+ // BoundEQ put data into both buffers
+ ndbRecordKeys.setShort(indexBoundLowBuffer, storeColumn, value);
+ ndbRecordKeys.setShort(indexBoundHighBuffer, storeColumn, value);
+ } else {
+ ndbRecordKeys.setShort(keyBuffer, storeColumn, value);
+ }
+ }
+
+ public void setBoundInt(Column storeColumn, BoundType type, Integer value) {
+ if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+ // calculate the bound data, the buffer, and the strictness
+ ByteBuffer keyBuffer = assignBoundBuffer(type);
+ if (keyBuffer == null) {
+ // BoundEQ put data into both buffers
+ ndbRecordKeys.setInt(indexBoundLowBuffer, storeColumn, value);
+ ndbRecordKeys.setInt(indexBoundHighBuffer, storeColumn, value);
+ } else {
+ ndbRecordKeys.setInt(keyBuffer, storeColumn, value);
+ }
+ }
+
+ public void setBoundLong(Column storeColumn, BoundType type, long value) {
+ if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+ // calculate the bound data, the buffer, and the strictness
+ ByteBuffer keyBuffer = assignBoundBuffer(type);
+ if (keyBuffer == null) {
+ // BoundEQ put data into both buffers
+ ndbRecordKeys.setLong(indexBoundLowBuffer, storeColumn, value);
+ ndbRecordKeys.setLong(indexBoundHighBuffer, storeColumn, value);
+ } else {
+ ndbRecordKeys.setLong(keyBuffer, storeColumn, value);
+ }
+ }
+
+ public void setBoundString(Column storeColumn, BoundType type, String value) {
+ if (logger.isDetailEnabled()) logger.detail(storeColumn.getName() + " " + type + " " + value);
+ // calculate the bound data, the buffer, and the strictness
+ ByteBuffer keyBuffer = assignBoundBuffer(type);
+ if (keyBuffer == null) {
+ // BoundEQ put data into both buffers
+ ndbRecordKeys.setString(indexBoundLowBuffer, bufferManager, storeColumn, value);
+ ndbRecordKeys.setString(indexBoundHighBuffer, bufferManager, storeColumn, value);
+ } else {
+ ndbRecordKeys.setString(keyBuffer, bufferManager, storeColumn, value);
+ }
+ }
+
+ public void endBound(int rangeNumber) {
+ if (logger.isDetailEnabled()) logger.detail("range: " + rangeNumber);
+ indexBoundRange = rangeNumber;
+ ndbIndexBoundList.add(getNdbIndexBound());
+ }
+
+ private ByteBuffer assignBoundBuffer(BoundType type) {
+ switch (type) {
+ case BoundEQ:
+ indexBoundHighCount++;
+ indexBoundLowCount++;
+ return null;
+ case BoundGE:
+ equalScan = false;
+ indexBoundHighCount++;
+ return indexBoundHighBuffer;
+ case BoundGT:
+ equalScan = false;
+ indexBoundHighStrict = true;
+ indexBoundHighCount++;
+ return indexBoundHighBuffer;
+ case BoundLE:
+ equalScan = false;
+ indexBoundLowCount++;
+ return indexBoundLowBuffer;
+ case BoundLT:
+ equalScan = false;
+ indexBoundLowStrict = true;
+ indexBoundLowCount++;
+ return indexBoundLowBuffer;
+ default:
+ throw new ClusterJFatalInternalException(local.message("ERR_Implementation_Should_Not_Occur"));
+ }
+ }
+
+ /** Create an ndb index bound for the current bounds and clear the current bounds
+ *
+ */
+ private NdbIndexScanOperation.IndexBound getNdbIndexBound() {
+ ByteBuffer reclaimed = null;
+ if (indexBoundLowCount + indexBoundHighCount > 0) {
+ if (indexBoundLowCount == 0) {
+ indexBoundLowBuffer = null;
+ } else {
+ indexBoundLowBuffer.limit(keyBufferSize);
+ indexBoundLowBuffer.position(0);
+ }
+ if (indexBoundHighCount == 0) {
+ indexBoundHighBuffer = null;
+ } else {
+ indexBoundHighBuffer.limit(keyBufferSize);
+ indexBoundHighBuffer.position(0);
+ }
+ if (equalScan) {
+ reclaimed = indexBoundLowBuffer;
+ indexBoundLowBuffer = indexBoundHighBuffer;
+ }
+ // set the index bound
+ NdbIndexScanOperation.IndexBound ndbindexBound = NdbIndexScanOperation.IndexBound.create();
+ ndbindexBound.low_key(indexBoundLowBuffer);
+ ndbindexBound.high_key(indexBoundHighBuffer);
+ ndbindexBound.low_key_count(indexBoundLowCount);
+ ndbindexBound.high_key_count(indexBoundHighCount);
+ ndbindexBound.low_inclusive(!indexBoundLowStrict);
+ ndbindexBound.high_inclusive(!indexBoundHighStrict);
+ ndbindexBound.range_no(indexBoundRange);
+ if (logger.isDetailEnabled()) logger.detail(
+ " indexBoundLowCount: " + indexBoundLowCount + " indexBoundHighCount: " + indexBoundHighCount +
+ " indexBoundLowStrict: " + indexBoundLowStrict + " indexBoundHighStrict: " + indexBoundHighStrict +
+ " range: " + indexBoundRange
+ );
+ // reset the index bound for the next range
+ // if equal bound, initialize and reuse previous buffer
+ if (reclaimed != null) {
+ indexBoundLowBuffer = reclaimed;
+ ndbRecordKeys.initializeBuffer(reclaimed);
+ } else {
+ indexBoundLowBuffer = ndbRecordKeys.newBuffer();
+ }
+ indexBoundHighBuffer = ndbRecordKeys.newBuffer();
+ indexBoundLowCount = 0;
+ indexBoundHighCount = 0;
+ indexBoundLowStrict = false;
+ indexBoundHighStrict = false;
+ indexBoundRange = 0;
+ equalScan = true;
+ return ndbindexBound;
+ } else {
+ return null;
+ }
+ }
+
+}
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordInsertOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordInsertOperationImpl.java 2012-03-05 22:28:15 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordInsertOperationImpl.java 2012-04-02 20:43:14 +0000
@@ -23,20 +23,11 @@ public class NdbRecordInsertOperationImp
public NdbRecordInsertOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable) {
super(clusterTransaction, storeTable);
- this.ndbRecordValues = clusterTransaction.getCachedNdbRecordImpl(storeTable);
this.ndbRecordKeys = ndbRecordValues;
- this.valueBufferSize = ndbRecordValues.getBufferSize();
- this.numberOfColumns = ndbRecordValues.getNumberOfColumns();
- this.blobs = new NdbRecordBlobImpl[this.numberOfColumns];
+ this.keyBuffer = valueBuffer;
resetMask();
}
- public void beginDefinition() {
- // allocate a buffer for the operation data
- valueBuffer = ndbRecordValues.newBuffer();
- keyBuffer = valueBuffer;
- }
-
public void endDefinition() {
ndbOperation = insert(clusterTransaction);
}
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordKeyOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordKeyOperationImpl.java 2012-03-05 22:28:15 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordKeyOperationImpl.java 2012-04-02 20:43:14 +0000
@@ -17,101 +17,31 @@
package com.mysql.clusterj.tie;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import com.mysql.clusterj.core.store.Column;
-import com.mysql.clusterj.core.store.ResultData;
import com.mysql.clusterj.core.store.Table;
public class NdbRecordKeyOperationImpl extends NdbRecordOperationImpl {
public NdbRecordKeyOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable) {
super(clusterTransaction, storeTable);
- this.ndbRecordKeys = clusterTransaction.getCachedNdbRecordImpl(storeTable);
- this.keyBufferSize = ndbRecordKeys.getBufferSize();
- this.ndbRecordValues = clusterTransaction.getCachedNdbRecordImpl(storeTable);
- this.valueBufferSize = ndbRecordValues.getBufferSize();
- this.numberOfColumns = ndbRecordValues.getNumberOfColumns();
- this.blobs = new NdbRecordBlobImpl[this.numberOfColumns];
- resetMask();
- }
-
- public void beginDefinition() {
- // allocate a buffer for the key data
- keyBuffer = ByteBuffer.allocateDirect(keyBufferSize);
- keyBuffer.order(ByteOrder.nativeOrder());
- // allocate a buffer for the value result data
- // TODO: we should not need another buffer
- valueBuffer = ByteBuffer.allocateDirect(valueBufferSize);
- valueBuffer.order(ByteOrder.nativeOrder());
- }
-
- /** Specify the columns to be used for the operation.
- */
- public void getValue(Column storeColumn) {
- int columnId = storeColumn.getColumnId();
- columnSet(columnId);
- }
-
- /**
- * Mark this blob column to be read.
- * @param storeColumn the store column
- */
- @Override
- public void getBlob(Column storeColumn) {
- // create an NdbRecordBlobImpl for the blob
- int columnId = storeColumn.getColumnId();
- columnSet(columnId);
- NdbRecordBlobImpl blob = new NdbRecordBlobImpl(this, storeColumn);
- blobs[columnId] = blob;
+ this.ndbRecordKeys = this.ndbRecordValues;
+ this.keyBufferSize = this.valueBufferSize;
+ this.keyBuffer = valueBuffer;
}
public void endDefinition() {
- // position the key buffer at the beginning for ndbjtie
- keyBuffer.position(0);
- keyBuffer.limit(keyBufferSize);
// position the value buffer at the beginning for ndbjtie
valueBuffer.position(0);
valueBuffer.limit(valueBufferSize);
// create the key operation
ndbOperation = clusterTransaction.readTuple(ndbRecordKeys.getNdbRecord(), keyBuffer,
ndbRecordValues.getNdbRecord(), valueBuffer, mask, null);
- // set up a callback when this operation is executed
- clusterTransaction.postExecuteCallback(new Runnable() {
- public void run() {
- for (int columnId = 0; columnId < numberOfColumns; ++columnId) {
- NdbRecordBlobImpl blob = blobs[columnId];
- if (blob != null) {
- blob.setNdbBlob();
- }
- }
- }
- });
- }
-
- /** Construct a new ResultData using the saved column data and then execute the operation.
- */
- @Override
- public ResultData resultData() {
- return resultData(true);
- }
-
- /** Construct a new ResultData and if requested, execute the operation.
- */
- @Override
- public ResultData resultData(boolean execute) {
- NdbRecordResultDataImpl result =
- new NdbRecordResultDataImpl(this);
- if (execute) {
- clusterTransaction.executeNoCommit(false, true);
- }
- return result;
+ // set the NdbBlob for all active blob columns
+ activateBlobs();
}
@Override
public String toString() {
- return " key " + tableName;
+ return " primary key " + tableName;
}
}
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java 2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordOperationImpl.java 2012-04-05 05:45:15 +0000
@@ -45,6 +45,8 @@ import com.mysql.ndbjtie.ndbapi.NdbDicti
/**
* Implementation of store operation that uses NdbRecord.
+ * Operations of the "equal" variety delegate to the key NdbRecordImpl.
+ * Operations of the "set" and "get" varieties delegate to the value NdbRecordImpl.
*/
public class NdbRecordOperationImpl implements Operation {
@@ -68,7 +70,11 @@ public class NdbRecordOperationImpl impl
/** The NdbRecord for values */
protected NdbRecordImpl ndbRecordValues = null;
- /** The mask for this operation, which contains a bit set for each column accessed */
+ /** The mask for this operation, which contains a bit set for each column referenced.
+ * For insert, this contains a bit for each column to be inserted.
+ * For update, this contains a bit for each column to be updated.
+ * For read/scan operations, this contains a bit for each column to be read.
+ */
byte[] mask;
/** The ByteBuffer containing keys */
@@ -89,27 +95,32 @@ public class NdbRecordOperationImpl impl
/** The size of the value buffer for this operation */
protected int valueBufferSize;
- /** The size of the null indicator byte array */
- protected int nullIndicatorSize;
-
/** The buffer manager for string encode and decode */
protected BufferManager bufferManager;
/** The table name */
protected String tableName;
+ /** The store table */
+ protected Table storeTable;
+
/** The store columns. */
protected Column[] storeColumns;
/** The number of columns */
int numberOfColumns;
- /** Constructor used for smart value handler.
+ /** Constructor used for smart value handler for new instances,
+ * and the cluster transaction is not yet known. There is only one
+ * NdbRecord and one buffer, so all operations result in using
+ * the same buffer.
*
* @param clusterConnection the cluster connection
+ * @param db the Db
* @param storeTable the store table
*/
public NdbRecordOperationImpl(ClusterConnectionImpl clusterConnection, Db db, Table storeTable) {
+ this.storeTable = storeTable;
this.tableName = storeTable.getName();
this.ndbRecordValues = clusterConnection.getCachedNdbRecordImpl(storeTable);
this.ndbRecordKeys = ndbRecordValues;
@@ -124,18 +135,41 @@ public class NdbRecordOperationImpl impl
resetMask();
}
- protected void resetMask() {
- this.mask = new byte[1 + (numberOfColumns/8)];
- }
-
- /** Constructor used for insert and delete operations that do not need to read data.
+ /** Constructor used when the transaction is known.
*
* @param clusterTransaction the cluster transaction
*/
public NdbRecordOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable) {
- this.tableName = storeTable.getName();
this.clusterTransaction = clusterTransaction;
this.bufferManager = clusterTransaction.getBufferManager();
+ this.tableName = storeTable.getName();
+ this.ndbRecordValues = clusterTransaction.getCachedNdbRecordImpl(storeTable);
+ this.valueBufferSize = ndbRecordValues.getBufferSize();
+ this.valueBuffer = ndbRecordValues.newBuffer();
+ this.numberOfColumns = ndbRecordValues.getNumberOfColumns();
+ this.blobs = new NdbRecordBlobImpl[this.numberOfColumns];
+ resetMask();
+ }
+
+ /** Constructor used to copy an existing NdbRecordOperationImpl for use with a SmartValueHandler.
+ * The value buffer is copied and cannot be used by the existing NdbRecordOperationImpl.
+ *
+ * @param ndbRecordOperationImpl2 the existing NdbRecordOperationImpl with value buffer
+ */
+ public NdbRecordOperationImpl(NdbRecordOperationImpl ndbRecordOperationImpl2) {
+ this.ndbRecordValues = ndbRecordOperationImpl2.ndbRecordValues;
+ this.valueBufferSize = ndbRecordOperationImpl2.valueBufferSize;
+ this.ndbRecordKeys = ndbRecordValues;
+ this.keyBufferSize = ndbRecordKeys.bufferSize;
+ this.valueBuffer = ndbRecordOperationImpl2.valueBuffer;
+ this.keyBuffer = this.valueBuffer;
+ this.bufferManager = ndbRecordOperationImpl2.bufferManager;
+ this.tableName = ndbRecordOperationImpl2.tableName;
+ this.storeColumns = ndbRecordOperationImpl2.ndbRecordValues.storeColumns;
+ this.numberOfColumns = this.storeColumns.length;
+ this.blobs = new NdbRecordBlobImpl[this.numberOfColumns];
+ this.activeBlobs = ndbRecordOperationImpl2.activeBlobs;
+ resetMask();
}
public NdbOperationConst insert(ClusterTransactionImpl clusterTransactionImpl) {
@@ -206,6 +240,20 @@ public class NdbRecordOperationImpl impl
}
}
+ protected void resetMask() {
+ this.mask = new byte[1 + (numberOfColumns/8)];
+ }
+
+ public void allocateValueBuffer() {
+ this.valueBuffer = ndbRecordValues.newBuffer();
+ }
+
+ protected void activateBlobs() {
+ for (NdbRecordBlobImpl blob: activeBlobs) {
+ blob.setNdbBlob();
+ }
+ }
+
public void equalBigInteger(Column storeColumn, BigInteger value) {
int columnId = ndbRecordKeys.setBigInteger(keyBuffer, storeColumn, value);
columnSet(columnId);
@@ -262,8 +310,7 @@ public class NdbRecordOperationImpl impl
}
public void getBlob(Column storeColumn) {
- throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
- "NdbRecordOperationImpl.getBlob(Column)"));
+ getBlobHandle(storeColumn);
}
/**
@@ -285,11 +332,10 @@ public class NdbRecordOperationImpl impl
}
/** Specify the columns to be used for the operation.
- * This is implemented by a subclass.
*/
public void getValue(Column storeColumn) {
- throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
- "NdbRecordOperationImpl.getValue(Column)"));
+ int columnId = storeColumn.getColumnId();
+ columnSet(columnId);
}
public void postExecuteCallback(Runnable callback) {
@@ -297,19 +343,20 @@ public class NdbRecordOperationImpl impl
}
/** Construct a new ResultData using the saved column data and then execute the operation.
- * This is implemented by a subclass.
*/
public ResultData resultData() {
- throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
- "NdbRecordOperationImpl.resultData()"));
+ return resultData(true);
}
/** Construct a new ResultData and if requested, execute the operation.
- * This is implemented by a subclass.
*/
public ResultData resultData(boolean execute) {
- throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
- "NdbRecordOperationImpl.resultData(boolean)"));
+ NdbRecordResultDataImpl result =
+ new NdbRecordResultDataImpl(this);
+ if (execute) {
+ clusterTransaction.executeNoCommit(false, true);
+ }
+ return result;
}
public void setBigInteger(Column storeColumn, BigInteger value) {
@@ -594,10 +641,6 @@ public class NdbRecordOperationImpl impl
return ndbRecordValues.getLong(valueBuffer, columnId);
}
- public long getLong(Column storeColumn) {
- return getLong(storeColumn.getColumnId());
- }
-
public float getFloat(int columnId) {
return ndbRecordValues.getFloat(valueBuffer, columnId);
}
@@ -718,19 +761,21 @@ public class NdbRecordOperationImpl impl
}
public void beginDefinition() {
- throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
- "NdbRecordResultDataImpl.beginDefinition()"));
+ // by default, nothing to do
}
public void endDefinition() {
- throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
- "NdbRecordResultDataImpl.endDefinition()"));
+ // by default, nothing to do
}
public String dumpValues() {
return ndbRecordValues.dumpValues(valueBuffer, mask);
}
+ public String dumpKeys() {
+ return ndbRecordKeys.dumpValues(keyBuffer, null);
+ }
+
public boolean isModified(int columnId) {
return ndbRecordValues.isPresent(mask, columnId);
}
@@ -781,4 +826,16 @@ public class NdbRecordOperationImpl impl
}
}
+ /** Transform this NdbRecordOperationImpl into one that can be used to back a SmartValueHandler.
+ * For instances that are used in primary key or unique key operations, the same instance is used.
+ * Scans are handled by a subclass that overrides this method.
+ *
+ * @return this NdbRecordOperationImpl
+ */
+ public NdbRecordOperationImpl transformNdbRecordOperationImpl() {
+ this.keyBuffer = valueBuffer;
+ resetModified();
+ return this;
+ }
+
}
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordResultDataImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordResultDataImpl.java 2012-03-05 22:28:15 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordResultDataImpl.java 2012-04-02 20:43:14 +0000
@@ -31,7 +31,7 @@ import com.mysql.clusterj.core.util.Logg
import com.mysql.clusterj.core.util.LoggerFactoryService;
/**
- *
+ * Handle the results of an operation using NdbRecord.
*/
class NdbRecordResultDataImpl implements ResultData {
@@ -43,28 +43,21 @@ class NdbRecordResultDataImpl implements
static final Logger logger = LoggerFactoryService.getFactory()
.getInstance(NdbRecordResultDataImpl.class);
- /** Flags for iterating a scan */
- protected final int RESULT_READY = 0;
- protected final int SCAN_FINISHED = 1;
- protected final int CACHE_EMPTY = 2;
-
/** The NdbOperation that defines the result */
- private NdbRecordOperationImpl operation = null;
+ protected NdbRecordOperationImpl operation = null;
/** The flag indicating that there are no more results */
private boolean nextDone;
- /** Construct the ResultDataImpl based on an NdbRecordOperationImpl, and the
- * buffer manager to help with string columns.
+ /** Construct the ResultDataImpl based on an NdbRecordOperationImpl.
* @param operation the NdbRecordOperationImpl
- * @param bufferManager the buffer manager
*/
public NdbRecordResultDataImpl(NdbRecordOperationImpl operation) {
this.operation = operation;
}
public boolean next() {
- // NdbOperation has exactly zero or one result. ScanResultDataImpl handles scans...
+ // NdbOperation has exactly zero or one result. NdbRecordScanResultDataImpl handles scans...
// if the ndbOperation reports an error there is no result
int errorCode = operation.errorCode();
if (errorCode != 0) {
@@ -264,4 +257,12 @@ class NdbRecordResultDataImpl implements
return null;
}
+ /** Return an operation that can be used by SmartValueHandler.
+ * The operation contains the buffer with the row data from the operation.
+ * @return the operation
+ */
+ public NdbRecordOperationImpl transformOperation() {
+ return operation.transformNdbRecordOperationImpl();
+ }
+
}
=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanOperationImpl.java 2012-04-02 20:43:14 +0000
@@ -0,0 +1,223 @@
+/*
+ Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import com.mysql.clusterj.core.spi.QueryExecutionContext;
+import com.mysql.clusterj.core.store.ResultData;
+import com.mysql.clusterj.core.store.ScanFilter;
+import com.mysql.clusterj.core.store.ScanOperation;
+import com.mysql.clusterj.core.store.Table;
+
+import com.mysql.ndbjtie.ndbapi.NdbInterpretedCode;
+import com.mysql.ndbjtie.ndbapi.NdbOperationConst;
+import com.mysql.ndbjtie.ndbapi.NdbScanFilter;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanFlag;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanOptions;
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation.ScanOptionsConst.Type;
+
+/** NdbRecordScanOperationImpl performs table and index scans using NdbRecord.
+ * The scans are set up via subclasses. After executing, the NdbRecordScanOperationImpl instance
+ * is owned and iterated (scanned) by NdbRecordScanResultDataImpl.
+ */
+public abstract class NdbRecordScanOperationImpl extends NdbRecordOperationImpl implements ScanOperation {
+
+ /** The ndb scan options */
+ ScanOptions scanOptions = null;
+
+ /** The ndb scan filter */
+ NdbScanFilter ndbScanFilter = null;
+
+ /** The ndb interpreted code used for filters */
+ NdbInterpretedCode ndbInterpretedCode = null;
+
+ /** Is this scan multi-range? */
+ protected boolean multiRange = false;
+
+ /** The lock mode for this operation */
+ int lockMode;
+
+ public NdbRecordScanOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable,
+ int lockMode) {
+ super(clusterTransaction, storeTable);
+ this.ndbRecordKeys = clusterTransaction.getCachedNdbRecordImpl(storeTable);
+ this.keyBufferSize = ndbRecordKeys.getBufferSize();
+ this.ndbRecordValues = clusterTransaction.getCachedNdbRecordImpl(storeTable);
+ this.valueBufferSize = ndbRecordValues.getBufferSize();
+ this.numberOfColumns = ndbRecordValues.getNumberOfColumns();
+ this.blobs = new NdbRecordBlobImpl[this.numberOfColumns];
+ this.lockMode = lockMode;
+ resetMask();
+ }
+
+ /** Construct a new ResultData and if requested, execute the operation.
+ */
+ @Override
+ public ResultData resultData(boolean execute) {
+ NdbRecordResultDataImpl result =
+ new NdbRecordScanResultDataImpl(this);
+ if (execute) {
+ clusterTransaction.executeNoCommit(false, true);
+ }
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return " scan " + tableName;
+ }
+
+ /** Deallocate resources used in by this scan after the scan is complete.
+ *
+ */
+ public void close() {
+ if (ndbInterpretedCode != null) {
+ NdbInterpretedCode.delete(ndbInterpretedCode);
+ }
+ if (ndbScanFilter != null) {
+ NdbScanFilter.delete(ndbScanFilter);
+ }
+ if (scanOptions != null) {
+ ScanOptions.delete(scanOptions);
+ }
+ ((NdbScanOperation)ndbOperation).close(true, true);
+ }
+
+ public void deleteCurrentTuple() {
+ int returnCode = ((NdbScanOperation)ndbOperation).deleteCurrentTuple();
+ handleError(returnCode, ndbOperation);
+ }
+
+ /** Create scan options for this scan.
+ * Scan options are used to set a filter into the NdbScanOperation,
+ * set the key info flag if using a lock mode that requires lock takeover, and set the multi range flag.
+ */
+ protected void getScanOptions() {
+ long options = 0L;
+ int flags = 0;
+ if (multiRange | (ndbScanFilter != null) |
+ (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead)) {
+
+ scanOptions = ScanOptions.create();
+ if (multiRange) {
+ flags |= ScanFlag.SF_MultiRange;
+ options |= (long)Type.SO_SCANFLAGS;
+ scanOptions.scan_flags(flags);
+ }
+ if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
+ flags |= ScanFlag.SF_KeyInfo;
+ options |= (long)Type.SO_SCANFLAGS;
+ scanOptions.scan_flags(flags);
+ }
+ if (ndbScanFilter != null) {
+ options |= (long)Type.SO_INTERPRETED;
+ scanOptions.interpretedCode(ndbScanFilter.getInterpretedCode());
+ }
+
+ scanOptions.optionsPresent(options);
+ }
+ if (logger.isDebugEnabled()) logger.debug("ScanOptions: " + dumpScanOptions(options, flags));
+ }
+
+ protected String dumpScanOptions(long optionsPresent, int flags) {
+ StringBuilder builder = new StringBuilder();
+ if (0L != (optionsPresent & (long)Type.SO_BATCH)) builder.append("SO_BATCH ");
+ if (0L != (optionsPresent & (long)Type.SO_GETVALUE)) builder.append("SO_GETVALUE ");
+ if (0L != (optionsPresent & (long)Type.SO_PARALLEL)) builder.append("SO_PARALLEL ");
+ if (0L != (optionsPresent & (long)Type.SO_CUSTOMDATA)) builder.append("SO_CUSTOMDATA ");
+ if (0L != (optionsPresent & (long)Type.SO_INTERPRETED)) builder.append("SO_INTERPRETED ");
+ if (0L != (optionsPresent & (long)Type.SO_PARTITION_ID)) builder.append("SO_PARTITION_ID ");
+ if (0L != (optionsPresent & (long)Type.SO_SCANFLAGS)) {
+ builder.append("SO_SCANFLAGS(");
+ if (0 != (flags & ScanFlag.SF_KeyInfo)) builder.append("SF_KeyInfo ");
+ if (0 != (flags & ScanFlag.SF_Descending)) builder.append("SF_Descending ");
+ if (0 != (flags & ScanFlag.SF_DiskScan)) builder.append("SF_DiskScan ");
+ if (0 != (flags & ScanFlag.SF_MultiRange)) builder.append("SF_MultiRange ");
+ if (0 != (flags & ScanFlag.SF_OrderBy)) builder.append("SF_OrderBy ");
+ if (0 != (flags & ScanFlag.SF_ReadRangeNo)) builder.append("SF_ReadRangeNo ");
+ if (0 != (flags & ScanFlag.SF_TupScan)) builder.append("SF_TupScan ");
+ builder.append(")");
+ }
+ return builder.toString();
+ }
+
+ /** Create a scan filter for this scan.
+ * @param context the query execution context
+ * @return the ScanFilter to build the filter for the scan
+ */
+ public ScanFilter getScanFilter(QueryExecutionContext context) {
+
+ ndbInterpretedCode = NdbInterpretedCode.create(ndbRecordValues.getNdbTable(), null, 0);
+ ndbScanFilter = NdbScanFilter.create(ndbInterpretedCode);
+ handleError(ndbScanFilter, ndbOperation);
+ ScanFilter scanFilter = new ScanFilterImpl(ndbScanFilter);
+ context.addFilter(scanFilter);
+ return scanFilter;
+ }
+
+ /** Get the next result from the scan.
+ * Only used for deletePersistentAll to scan the table and delete all rows.
+ */
+ public int nextResult(boolean fetch) {
+ int result = ((NdbScanOperation)ndbOperation).nextResult(fetch, false);
+ clusterTransaction.handleError(result);
+ return result;
+ }
+
+ /** Get the next result from the scan. Copy the data into a newly allocated result buffer.
+ *
+ */
+ public int nextResultCopyOut(boolean fetch, boolean force) {
+ allocateValueBuffer();
+ int result = ((NdbScanOperation)ndbOperation).nextResultCopyOut(valueBuffer, fetch, force);
+ return result;
+ }
+
+ /** Transfer the lock on the current tuple to the original transaction.
+ * This allows the original transaction to keep the results locked until
+ * the original transaction completes.
+ * Only transfer the lock if the lock mode is not committed read
+ * (there is no lock held for committed read).
+ */
+ public void lockCurrentTuple() {
+ if (lockMode != com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode.LM_CommittedRead) {
+ NdbOperationConst op = ((NdbScanOperation)ndbOperation).lockCurrentTuple(
+ clusterTransaction.ndbTransaction, ndbRecordValues.getNdbRecord(),
+ null, null, null, 0);
+ if (op == null) {
+ Utility.throwError(op, ndbOperation.getNdbError());
+ }
+ }
+ }
+
+ /** Transform this NdbRecordOperationImpl into one that can be used to back a SmartValueHandler.
+ * For instances that are used in scans, create a new instance and allocate a new buffer
+ * to continue the scan.
+ *
+ * @return the NdbRecordOperationImpl
+ */
+ @Override
+ public NdbRecordOperationImpl transformNdbRecordOperationImpl() {
+ NdbRecordOperationImpl result = new NdbRecordOperationImpl(this);
+ // we gave away our buffers; get new ones for the next result
+ this.valueBuffer = ndbRecordValues.newBuffer();
+ this.keyBuffer = valueBuffer;
+ return result;
+ }
+
+}
=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanResultDataImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanResultDataImpl.java 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordScanResultDataImpl.java 2012-04-02 20:43:14 +0000
@@ -0,0 +1,87 @@
+/*
+ Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import com.mysql.clusterj.core.util.I18NHelper;
+import com.mysql.clusterj.core.util.Logger;
+import com.mysql.clusterj.core.util.LoggerFactoryService;
+
+import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
+
+/**
+ *
+ */
+class NdbRecordScanResultDataImpl extends NdbRecordResultDataImpl {
+
+ /** My message translator */
+ static final I18NHelper local = I18NHelper
+ .getInstance(NdbRecordScanResultDataImpl.class);
+
+ /** My logger */
+ static final Logger logger = LoggerFactoryService.getFactory()
+ .getInstance(NdbRecordScanResultDataImpl.class);
+
+ /** Flags for iterating a scan */
+ protected final int RESULT_READY = 0;
+ protected final int SCAN_FINISHED = 1;
+ protected final int CACHE_EMPTY = 2;
+
+ /** The NdbOperation that defines the result */
+ private NdbRecordScanOperationImpl scanOperation = null;
+
+ /** The NdbScanOperation */
+ private NdbScanOperation ndbScanOperation = null;
+
+ /** Construct the ResultDataImpl based on an NdbRecordOperationImpl.
+ * When used with the compatibility operations, delegate to the NdbRecordOperation
+ * to copy data.
+ * @param operation the NdbRecordOperationImpl
+ */
+ public NdbRecordScanResultDataImpl(NdbRecordScanOperationImpl scanOperation) {
+ super(scanOperation);
+ this.scanOperation = scanOperation;
+ this.ndbScanOperation = (NdbScanOperation)scanOperation.ndbOperation;
+ }
+
+ @Override
+ public boolean next() {
+ // NdbScanOperation may have many results.
+ boolean done = false;
+ boolean fetch = false;
+ boolean force = true; // always true for scans
+ while (!done) {
+ int result = scanOperation.nextResultCopyOut(fetch, force);
+ switch (result) {
+ case RESULT_READY:
+ // if scanning with locks, grab the lock for the current transaction
+ scanOperation.lockCurrentTuple();
+ return true;
+ case SCAN_FINISHED:
+ ndbScanOperation.close(true, true);
+ return false;
+ case CACHE_EMPTY:
+ fetch = true;
+ break;
+ default:
+ Utility.throwError(result, ndbScanOperation.getNdbError());
+ }
+ }
+ return true; // this statement is needed to make the compiler happy but it's never executed
+ }
+
+}
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerFactoryImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerFactoryImpl.java 2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerFactoryImpl.java 2012-04-02 20:43:14 +0000
@@ -21,6 +21,7 @@ import com.mysql.clusterj.core.metadata.
import com.mysql.clusterj.core.spi.ValueHandler;
import com.mysql.clusterj.core.spi.ValueHandlerFactory;
import com.mysql.clusterj.core.store.Db;
+import com.mysql.clusterj.core.store.ResultData;
public class NdbRecordSmartValueHandlerFactoryImpl implements ValueHandlerFactory {
@@ -35,4 +36,11 @@ public class NdbRecordSmartValueHandlerF
return result;
}
+ public <T> ValueHandler getValueHandler(
+ DomainTypeHandlerImpl<T> domainTypeHandler, Db db, ResultData resultData) {
+ NdbRecordSmartValueHandlerImpl result;
+ result = new NdbRecordSmartValueHandlerImpl(domainTypeHandler, db, resultData);
+ return result;
+ }
+
}
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerImpl.java 2012-03-10 19:39:46 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordSmartValueHandlerImpl.java 2012-04-08 20:50:07 +0000
@@ -38,12 +38,11 @@ import com.mysql.clusterj.core.metadata.
import com.mysql.clusterj.core.spi.DomainFieldHandler;
import com.mysql.clusterj.core.spi.DomainTypeHandler;
import com.mysql.clusterj.core.spi.SmartValueHandler;
-import com.mysql.clusterj.core.spi.ValueHandler;
import com.mysql.clusterj.core.store.ClusterTransaction;
import com.mysql.clusterj.core.store.Db;
import com.mysql.clusterj.core.store.Operation;
-import com.mysql.clusterj.core.store.Table;
+import com.mysql.clusterj.core.store.ResultData;
import com.mysql.clusterj.core.util.I18NHelper;
import com.mysql.clusterj.core.util.Logger;
@@ -94,14 +93,10 @@ public class NdbRecordSmartValueHandlerI
private Object proxy;
- public NdbRecordSmartValueHandlerImpl(DomainTypeHandlerImpl<?> domainTypeHandler, Db db) {
+ public NdbRecordSmartValueHandlerImpl(DomainTypeHandlerImpl<?> domainTypeHandler) {
this.domainTypeHandler = domainTypeHandler;
this.domainFieldHandlers = domainTypeHandler.getFieldHandlers();
fieldNumberToColumnNumberMap = domainTypeHandler.getFieldNumberToColumnNumberMap();
-
- Table storeTable = domainTypeHandler.getStoreTable();
- this.operation = ((DbImpl)db).newNdbRecordOperationImpl(storeTable);
-
numberOfTransientFields = domainTypeHandler.getNumberOfTransientFields();
transientModified = new boolean[numberOfTransientFields];
if (numberOfTransientFields != 0) {
@@ -109,6 +104,16 @@ public class NdbRecordSmartValueHandlerI
}
}
+ public NdbRecordSmartValueHandlerImpl(DomainTypeHandlerImpl<?> domainTypeHandler, Db db) {
+ this(domainTypeHandler);
+ this.operation = ((DbImpl)db).newNdbRecordOperationImpl(domainTypeHandler.getStoreTable());
+ }
+
+ public NdbRecordSmartValueHandlerImpl(DomainTypeHandlerImpl<?> domainTypeHandler, Db db, ResultData resultData) {
+ this(domainTypeHandler);
+ this.operation = ((NdbRecordResultDataImpl)resultData).transformOperation();
+ }
+
public Operation insert(ClusterTransaction clusterTransaction) {
if (logger.isDetailEnabled()) logger.detail("smart insert for type: " + domainTypeHandler.getName()
+ "\n" + operation.dumpValues());
@@ -199,8 +204,8 @@ public class NdbRecordSmartValueHandlerI
}
public boolean[] getBooleans(int fieldNumber) {
- // TODO Auto-generated method stub
- return null;
+ throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
+ "NdbRecordSmartValueHandler.getBooleans(int)"));
}
public byte getByte(int fieldNumber) {
@@ -445,8 +450,8 @@ public class NdbRecordSmartValueHandlerI
}
public void setBooleans(int fieldNumber, boolean[] b) {
- // TODO Auto-generated method stub
-
+ throw new ClusterJFatalInternalException(local.message("ERR_Method_Not_Implemented",
+ "NdbRecordSmartValueHandler.setBooleans(int, boolean[])"));
}
public void setByte(int fieldNumber, byte value) {
@@ -686,8 +691,10 @@ public class NdbRecordSmartValueHandlerI
int columnId = fieldNumberToColumnNumberMap[fieldNumber];
if (columnId < 0) {
transientValues[-1 - columnId] = value;
+ transientModified[-1 - columnId] = true;
+ } else {
+ domainFieldHandlers[fieldNumber].objectSetValue(value, this);
}
- domainFieldHandlers[fieldNumber].objectSetValue(value, this);
}
public Object invoke(Object proxy, Method method, Object[] args)
=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordTableScanOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordTableScanOperationImpl.java 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordTableScanOperationImpl.java 2012-04-02 20:43:14 +0000
@@ -0,0 +1,40 @@
+/*
+ Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import com.mysql.clusterj.core.store.ScanOperation;
+import com.mysql.clusterj.core.store.Table;
+
+/** NdbRecordTableScanOperationImpl performs table scans using NdbRecord.
+ * Most methods are implemented in the superclass.
+ */
+public class NdbRecordTableScanOperationImpl extends NdbRecordScanOperationImpl implements ScanOperation {
+
+ public NdbRecordTableScanOperationImpl(ClusterTransactionImpl clusterTransaction, Table storeTable,
+ int lockMode) {
+ super(clusterTransaction, storeTable, lockMode);
+ }
+
+ public void endDefinition() {
+ // get the scan options which also sets the filter
+ getScanOptions();
+ // create the ndb scan operation
+ ndbOperation = clusterTransaction.scanTable(ndbRecordValues.getNdbRecord(), mask, scanOptions);
+ }
+
+}
=== added file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordUniqueKeyOperationImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordUniqueKeyOperationImpl.java 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/NdbRecordUniqueKeyOperationImpl.java 2012-04-02 20:43:14 +0000
@@ -0,0 +1,53 @@
+/*
+ Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+package com.mysql.clusterj.tie;
+
+import com.mysql.clusterj.core.store.Index;
+import com.mysql.clusterj.core.store.IndexOperation;
+import com.mysql.clusterj.core.store.Table;
+
+public class NdbRecordUniqueKeyOperationImpl extends NdbRecordOperationImpl implements IndexOperation {
+
+ public NdbRecordUniqueKeyOperationImpl(ClusterTransactionImpl clusterTransaction, Index storeIndex, Table storeTable) {
+ super(clusterTransaction, storeTable);
+ this.ndbRecordKeys = clusterTransaction.getCachedNdbRecordImpl(storeIndex, storeTable);
+ this.keyBufferSize = ndbRecordKeys.getBufferSize();
+ // allocate a buffer for the key data
+ keyBuffer = ndbRecordKeys.newBuffer();
+ }
+
+ public void endDefinition() {
+ // position the key buffer at the beginning for ndbjtie
+ keyBuffer.limit(keyBufferSize);
+ keyBuffer.position(0);
+ // position the value buffer at the beginning for ndbjtie
+ valueBuffer.limit(valueBufferSize);
+ valueBuffer.position(0);
+ // create the key operation
+ ndbOperation = clusterTransaction.readTuple(ndbRecordKeys.getNdbRecord(), keyBuffer,
+ ndbRecordValues.getNdbRecord(), valueBuffer, mask, null);
+ // set the NdbBlob for all active blob columns
+ activateBlobs();
+ }
+
+ @Override
+ public String toString() {
+ return " unique key " + tableName;
+ }
+
+}
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ResultDataImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ResultDataImpl.java 2011-02-02 09:52:33 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ResultDataImpl.java 2012-04-05 05:45:15 +0000
@@ -128,12 +128,6 @@ class ResultDataImpl implements ResultDa
}
}
- private NdbRecAttr getValue(NdbOperation ndbOperation2, int columnId,
- ByteBuffer byteBuffer2) {
- // TODO: to help profiling
- return ndbOperation2.getValue(columnId, byteBuffer2);
- }
-
public boolean next() {
// NdbOperation has exactly zero or one result. ScanResultDataImpl handles scans...
NdbErrorConst error = ndbOperation.getNdbError();
@@ -215,7 +209,7 @@ class ResultDataImpl implements ResultDa
public long getLong(Column storeColumn) {
int index = storeColumn.getColumnId();
NdbRecAttr ndbRecAttr = ndbRecAttrs[index];
- return Utility.getLong(storeColumn, ndbRecAttr);
+ return Utility.getLong(storeColumn, ndbRecAttr.int64_value());
}
public float getFloat(int column) {
@@ -377,7 +371,7 @@ class ResultDataImpl implements ResultDa
public Long getObjectLong(Column storeColumn) {
int index = storeColumn.getColumnId();
NdbRecAttr ndbRecAttr = ndbRecAttrs[index];
- return (ndbRecAttr.isNULL() == 1)?null:Utility.getLong(storeColumn, ndbRecAttr);
+ return (ndbRecAttr.isNULL() == 1)?null:Utility.getLong(storeColumn, ndbRecAttr.int64_value());
}
public Float getObjectFloat(int column) {
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ScanResultDataImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ScanResultDataImpl.java 2011-02-02 09:52:33 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ScanResultDataImpl.java 2012-04-02 20:43:14 +0000
@@ -27,6 +27,7 @@ import com.mysql.clusterj.core.util.Logg
import com.mysql.clusterj.tie.DbImpl.BufferManager;
import com.mysql.ndbjtie.ndbapi.NdbScanOperation;
+import com.mysql.ndbjtie.ndbapi.NdbOperationConst.LockMode;
/**
*
@@ -67,6 +68,9 @@ class ScanResultDataImpl extends ResultD
int result = ndbScanOperation.nextResult(fetch, force);
switch (result) {
case RESULT_READY:
+ if (ndbScanOperation.getLockMode() != LockMode.LM_CommittedRead) {
+ ndbScanOperation.lockCurrentTuple();
+ }
return true;
case SCAN_FINISHED:
ndbScanOperation.close(true, true);
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java 2012-03-29 00:25:53 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/Utility.java 2012-04-05 06:37:40 +0000
@@ -355,11 +355,23 @@ public class Utility {
case Timestamp:
return (value >> 32) * 1000L;
case Date:
- // the unsigned value is stored in the top 3 bytes
- return unpackDate((int)(value >>> 40));
+ // the three high order bytes are the little endian representation
+ // the original is zzyyxx0000000000 and the result is 0000000000xxyyzz
+ long packedDate = 0L;
+ packedDate |= (value & ffoooooooooooooo) >>> 56;
+ packedDate |= (value & ooffoooooooooooo) >>> 40;
+ // the xx byte is signed, so shift left 16 and arithmetic shift right 40
+ packedDate |= ((value & ooooffoooooooooo) << 16) >> 40;
+ return unpackDate((int)packedDate);
case Time:
- // the signed value is stored in the top 3 bytes
- return unpackTime((int)(value >> 40));
+ // the three high order bytes are the little endian representation
+ // the original is zzyyxx0000000000 and the result is 0000000000xxyyzz
+ long packedTime = 0L;
+ packedTime |= (value & ffoooooooooooooo) >>> 56;
+ packedTime |= (value & ooffoooooooooooo) >>> 40;
+ // the xx byte is signed, so shift left 16 and arithmetic shift right 40
+ packedTime |= ((value & ooooffoooooooooo) << 16) >> 40;
+ return unpackTime((int)packedTime);
default:
throw new ClusterJUserException(
local.message("ERR_Unsupported_Mapping", storeColumn.getType(), "long"));
@@ -685,9 +697,9 @@ public class Utility {
case Timestamp:
return value * 1000L;
case Date:
- return unpackDate((int)value);
+ return unpackDate((int)(value));
case Time:
- return unpackTime((int)value);
+ return unpackTime((int)(value));
default:
throw new ClusterJUserException(
local.message("ERR_Unsupported_Mapping", storeColumn.getType(), "long"));
@@ -2020,10 +2032,12 @@ public class Utility {
* @param ndbRecAttr the NdbRecAttr
* @return the long
*/
- public static long getLong(Column storeColumn, NdbRecAttr ndbRecAttr) {
- return endianManager.getLong(storeColumn, ndbRecAttr);
- }
+ /** Convert a long value from storage.
+ * The value stored in the database might be a time, timestamp, date, bit array,
+ * or simply a long value. The converted value can be converted into a
+ * time, timestamp, date, bit array, or long value.
+ */
public static long getLong(Column storeColumn, long value) {
return endianManager.getLong(storeColumn, value);
}
=== modified file 'storage/ndb/compile-cluster'
--- a/storage/ndb/compile-cluster 2012-03-15 08:23:12 +0000
+++ b/storage/ndb/compile-cluster 2012-04-10 11:40:39 +0000
@@ -85,7 +85,7 @@ my $cmake_version_id;
my @args;
# Hardcoded options controlling how to build MySQL Server
- # push(@args, "-DWITH_SSL=bundled");
+ push(@args, "-DWITH_SSL=yes");
if ($opt_debug)
{
=== modified file 'storage/ndb/include/util/Vector.hpp'
--- a/storage/ndb/include/util/Vector.hpp 2011-09-02 09:16:56 +0000
+++ b/storage/ndb/include/util/Vector.hpp 2012-04-11 08:45:06 +0000
@@ -24,7 +24,8 @@
template<class T>
class Vector {
public:
- Vector(int sz = 10);
+ Vector(unsigned sz = 10, unsigned inc_sz = 0);
+ int expand(unsigned sz);
~Vector();
T& operator[](unsigned i);
@@ -32,7 +33,7 @@ public:
unsigned size() const { return m_size; };
int push_back(const T &);
- void push(const T&, unsigned pos);
+ int push(const T&, unsigned pos);
T& set(T&, unsigned pos, T& fill_obj);
T& back();
@@ -63,40 +64,80 @@ private:
unsigned m_arraySize;
};
-template<class T>
-Vector<T>::Vector(int i){
- m_items = new T[i];
+/**
+ * BEWARE: Constructing Vector with initial size > 0 is
+ * unsafe wrt. catching 'out of memory' errors.
+ * (C'tor doesn't return error code)
+ * Instead construct Vector with size==0, and then
+ * expand() it to the wanted initial size.
+ */
+template<class T>
+Vector<T>::Vector(unsigned sz, unsigned inc_sz):
+ m_items(NULL),
+ m_size(0),
+ m_incSize((inc_sz > 0) ? inc_sz : 50),
+ m_arraySize(0)
+{
+ if (sz == 0)
+ return;
+
+ m_items = new T[sz];
if (m_items == NULL)
{
errno = ENOMEM;
- m_size = 0;
- m_arraySize = 0;
- m_incSize = 0;
return;
}
- m_size = 0;
- m_arraySize = i;
- m_incSize = 50;
+ m_arraySize = sz;
+}
+
+template<class T>
+int
+Vector<T>::expand(unsigned sz){
+ if (sz <= m_size)
+ return 0;
+
+ T * tmp = new T[sz];
+ if(tmp == NULL)
+ {
+ errno = ENOMEM;
+ return -1;
+ }
+ for (unsigned i = 0; i < m_size; i++)
+ tmp[i] = m_items[i];
+ delete[] m_items;
+ m_items = tmp;
+ m_arraySize = sz;
+ return 0;
}
+/**
+ * BEWARE: Copy-constructing a Vector is
+ * unsafe wrt. catching 'out of memory' errors.
+ * (C'tor doesn't return error code)
+ * Instead construct empty Vector (size==0),
+ * and then assign() it the initial contents.
+ */
template<class T>
Vector<T>::Vector(const Vector& src):
- m_items(new T[src.m_size]),
- m_size(src.m_size),
+ m_items(NULL),
+ m_size(0),
m_incSize(src.m_incSize),
- m_arraySize(src.m_size)
-
+ m_arraySize(0)
{
+ const unsigned sz = src.m_size;
+ if (sz == 0)
+ return;
+
+ m_items = new T[sz];
if (unlikely(m_items == NULL)){
errno = ENOMEM;
- m_size = 0;
- m_arraySize = 0;
- m_incSize = 0;
return;
}
- for(unsigned i = 0; i < m_size; i++){
+ for(unsigned i = 0; i < sz; i++){
m_items[i] = src.m_items[i];
}
+ m_arraySize = sz;
+ m_size = sz;
}
template<class T>
@@ -127,6 +168,8 @@ Vector<T>::operator[](unsigned i) const
template<class T>
T &
Vector<T>::back(){
+ if(m_size==0)
+ abort();
return (* this)[m_size - 1];
}
@@ -134,17 +177,9 @@ template<class T>
int
Vector<T>::push_back(const T & t){
if(m_size == m_arraySize){
- T * tmp = new T [m_arraySize + m_incSize];
- if(tmp == NULL)
- {
- errno = ENOMEM;
- return -1;
- }
- for (unsigned k = 0; k < m_size; k++)
- tmp[k] = m_items[k];
- delete[] m_items;
- m_items = tmp;
- m_arraySize = m_arraySize + m_incSize;
+ const int err = expand(m_arraySize + m_incSize);
+ if (unlikely(err))
+ return err;
}
m_items[m_size] = t;
m_size++;
@@ -152,10 +187,12 @@ Vector<T>::push_back(const T & t){
}
template<class T>
-void
+int
Vector<T>::push(const T & t, unsigned pos)
{
- push_back(t);
+ const int err = push_back(t);
+ if (unlikely(err))
+ return err;
if (pos < m_size - 1)
{
for(unsigned i = m_size - 1; i > pos; i--)
@@ -164,13 +201,15 @@ Vector<T>::push(const T & t, unsigned po
}
m_items[pos] = t;
}
+ return 0;
}
template<class T>
T&
Vector<T>::set(T & t, unsigned pos, T& fill_obj)
{
- fill(pos, fill_obj);
+ if (fill(pos, fill_obj))
+ abort();
T& ret = m_items[pos];
m_items[pos] = t;
return ret;
@@ -196,19 +235,31 @@ Vector<T>::clear(){
template<class T>
int
Vector<T>::fill(unsigned new_size, T & obj){
+ const int err = expand(new_size);
+ if (unlikely(err))
+ return err;
while(m_size <= new_size)
if (push_back(obj))
return -1;
return 0;
}
+/**
+ * 'operator=' will 'abort()' on 'out of memory' errors.
+ * You may prefer using ::assign()' which returns
+ * an error code instead of aborting.
+ */
template<class T>
Vector<T>&
Vector<T>::operator=(const Vector<T>& obj){
if(this != &obj){
clear();
+ const int err = expand(obj.size());
+ if (unlikely(err))
+ abort();
for(unsigned i = 0; i<obj.size(); i++){
- push_back(obj[i]);
+ if (push_back(obj[i]))
+ abort();
}
}
return * this;
@@ -218,12 +269,19 @@ template<class T>
int
Vector<T>::assign(const T* src, unsigned cnt)
{
+ if (getBase() == src)
+ return 0; // Self-assign is a NOOP
+
clear();
+ const int err = expand(cnt);
+ if (unlikely(err))
+ return err;
+
for (unsigned i = 0; i<cnt; i++)
{
- int ret;
- if ((ret = push_back(src[i])))
- return ret;
+ const int err = push_back(src[i]);
+ if (unlikely(err))
+ return err;
}
return 0;
}
@@ -241,7 +299,8 @@ Vector<T>::equal(const Vector<T>& obj) c
template<class T>
class MutexVector : public NdbLockable {
public:
- MutexVector(int sz = 10);
+ MutexVector(unsigned sz = 10, unsigned inc_sz = 0);
+ int expand(unsigned sz);
~MutexVector();
T& operator[](unsigned i);
@@ -260,26 +319,60 @@ public:
int fill(unsigned new_size, T & obj);
private:
+ // Don't allow copy and assignment of MutexVector
+ MutexVector(const MutexVector&);
+ MutexVector<T>& operator=(const MutexVector<T>&);
+
T * m_items;
unsigned m_size;
unsigned m_incSize;
unsigned m_arraySize;
};
-template<class T>
-MutexVector<T>::MutexVector(int i){
- m_items = new T[i];
+/**
+ * BEWARE: Constructing MutexVector with initial size > 0 is
+ * unsafe wrt. catching 'out of memory' errors.
+ * (C'tor doesn't return error code)
+ * Instead construct MutexVector with size==0, and then
+ * expand() it to the wanted initial size.
+ */
+template<class T>
+MutexVector<T>::MutexVector(unsigned sz, unsigned inc_sz):
+ m_items(NULL),
+ m_size(0),
+ m_incSize((inc_sz > 0) ? inc_sz : 50),
+ m_arraySize(0)
+{
+ if (sz == 0)
+ return;
+
+ m_items = new T[sz];
if (m_items == NULL)
{
errno = ENOMEM;
- m_size = 0;
- m_arraySize = 0;
- m_incSize = 0;
return;
}
- m_size = 0;
- m_arraySize = i;
- m_incSize = 50;
+ m_arraySize = sz;
+}
+
+template<class T>
+int
+MutexVector<T>::expand(unsigned sz){
+ if (sz <= m_size)
+ return 0;
+
+ T * tmp = new T[sz];
+ if(tmp == NULL)
+ {
+ errno = ENOMEM;
+ return -1;
+ }
+ for (unsigned i = 0; i < m_size; i++)
+ tmp[i] = m_items[i];
+ delete[] m_items;
+ m_items = tmp;
+ m_arraySize = sz;
+ return 0;
}
template<class T>
@@ -310,6 +403,8 @@ MutexVector<T>::operator[](unsigned i) c
template<class T>
T &
MutexVector<T>::back(){
+ if(m_size==0)
+ abort();
return (* this)[m_size - 1];
}
@@ -318,18 +413,12 @@ int
MutexVector<T>::push_back(const T & t){
lock();
if(m_size == m_arraySize){
- T * tmp = new T [m_arraySize + m_incSize];
- if (tmp == NULL)
+ const int err = expand(m_arraySize + m_incSize);
+ if (unlikely(err))
{
- errno = ENOMEM;
unlock();
- return -1;
+ return err;
}
- for (unsigned k = 0; k < m_size; k++)
- tmp[k] = m_items[k];
- delete[] m_items;
- m_items = tmp;
- m_arraySize = m_arraySize + m_incSize;
}
m_items[m_size] = t;
m_size++;
@@ -343,19 +432,13 @@ MutexVector<T>::push_back(const T & t, b
if(lockMutex)
lock();
if(m_size == m_arraySize){
- T * tmp = new T [m_arraySize + m_incSize];
- if (tmp == NULL)
+ const int err = expand(m_arraySize + m_incSize);
+ if (unlikely(err))
{
- errno = ENOMEM;
if(lockMutex)
unlock();
- return -1;
+ return err;
}
- for (unsigned k = 0; k < m_size; k++)
- tmp[k] = m_items[k];
- delete[] m_items;
- m_items = tmp;
- m_arraySize = m_arraySize + m_incSize;
}
m_items[m_size] = t;
m_size++;
=== modified file 'storage/ndb/memcache/extra/memcached/CMakeLists.txt'
--- a/storage/ndb/memcache/extra/memcached/CMakeLists.txt 2011-09-23 00:00:18 +0000
+++ b/storage/ndb/memcache/extra/memcached/CMakeLists.txt 2012-03-31 15:10:44 +0000
@@ -84,6 +84,7 @@ CHECK_INCLUDE_FILE("netdb.h" HAV
CHECK_INCLUDE_FILE("sys/uio.h" HAVE_SYS_UIO_H)
CHECK_INCLUDE_FILE("signal.h" HAVE_SIGIGNORE)
CHECK_INCLUDE_FILES("sys/types.h;netinet/tcp.h" HAVE_NETINET_TCP_H)
+CHECK_FUNCTION_EXISTS(htonll HAVE_HTONLL)
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/config_tests.in
${CMAKE_CURRENT_SOURCE_DIR}/config.h)
=== modified file 'storage/ndb/memcache/extra/memcached/config_tests.in'
--- a/storage/ndb/memcache/extra/memcached/config_tests.in 2011-09-20 05:13:01 +0000
+++ b/storage/ndb/memcache/extra/memcached/config_tests.in 2012-03-31 15:16:18 +0000
@@ -24,6 +24,7 @@
#cmakedefine HAVE_SYS_UIO_H
#cmakedefine HAVE_NETINET_TCP_H
#cmakedefine HAVE_SIGIGNORE
+#cmakedefine HAVE_HTONLL
#include "my_config.h"
#include "config_static.h"
=== modified file 'storage/ndb/memcache/extra/memcached/daemon/memcached.c'
--- a/storage/ndb/memcache/extra/memcached/daemon/memcached.c 2011-09-20 05:13:01 +0000
+++ b/storage/ndb/memcache/extra/memcached/daemon/memcached.c 2012-04-06 00:06:41 +0000
@@ -1761,6 +1761,10 @@ static void process_bin_get(conn *c) {
case ENGINE_NOT_MY_VBUCKET:
write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
break;
+ case ENGINE_TMPFAIL:
+ write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
+ break;
+
default:
/* @todo add proper error handling! */
settings.extensions.logger->log(EXTENSION_LOG_WARNING, c,
=== modified file 'storage/ndb/memcache/include/Scheduler.h'
--- a/storage/ndb/memcache/include/Scheduler.h 2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/include/Scheduler.h 2012-04-03 00:34:18 +0000
@@ -23,13 +23,21 @@
#include "ndbmemcache_global.h"
#include <memcached/types.h>
-#include "ndb_pipeline.h"
+#include "thread_identifier.h"
+
+
+typedef struct {
+ int nthreads; /* number of worker threads */
+ int max_clients; /* maximum number of client connections */
+ const char * config_string; /* scheduler-specific configuration string */
+} scheduler_options;
+
#ifdef __cplusplus
-#include "NdbInstance.h"
-#include "Configuration.h"
-#include "thread_identifier.h"
+/* Forward declarations */
+class Configuration;
+class workitem;
/* Scheduler is an interface */
@@ -44,10 +52,9 @@ public:
/** init() is the called from the main thread,
after configuration has been read.
threadnum: which thread this scheduler will eventually attach to
- nthreads: how many total threads will be initialized
- config_string: additional configuration string for scheduler
+ options: struct specifying run-time options
*/
- virtual void init(int threadnum, int nthreads, const char *config_string) = 0;
+ virtual void init(int threadnum, const scheduler_options * options) = 0;
/** attach_thread() is called from each thread
at pipeline initialization time. */
@@ -87,9 +94,6 @@ public:
*/
virtual bool global_reconfigure(Configuration *new_config) = 0;
- /** each scheduler instance serves a single NDB pipeline
- */
- ndb_pipeline *pipeline;
};
#endif
=== modified file 'storage/ndb/memcache/include/ndb_engine.h'
--- a/storage/ndb/memcache/include/ndb_engine.h 2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/include/ndb_engine.h 2012-04-03 00:34:18 +0000
@@ -62,6 +62,7 @@ struct ndb_engine {
} startup_options;
struct {
+ size_t maxconns;
size_t nthreads;
bool cas_enabled;
size_t verbose;
=== modified file 'storage/ndb/memcache/include/ndb_engine_errors.h'
--- a/storage/ndb/memcache/include/ndb_engine_errors.h 2012-03-23 19:01:07 +0000
+++ b/storage/ndb/memcache/include/ndb_engine_errors.h 2012-04-05 21:37:02 +0000
@@ -22,11 +22,22 @@
#include <ndberror.h>
-/* Errors 9000 - 9099 are reported as "Scheduler Error" */
-extern ndberror_struct AppError9001_ReconfLock;
-extern ndberror_struct AppError9002_NoNDBs;
-extern ndberror_struct AppError9003_SyncClose;
+/*
+ The NDB Engine for Memcached uses error codes 29000 - 29999
+*/
-/* Errors 9100 and up are reported as "Memcached Error" */
+
+
+/*** Errors 290xx and 291xx are reported as "Scheduler Error" ***/
+
+/* 2900x: general scheduler error codes */
+extern ndberror_struct AppError29001_ReconfLock;
+extern ndberror_struct AppError29002_NoNDBs;
+
+/* 2902x: blocking NDB operations in worker thread */
+extern ndberror_struct AppError29023_SyncClose;
+extern ndberror_struct AppError29024_autogrow;
+
+/*** Errors 29200 and up are reported as "Memcached Error" ***/
#endif
=== modified file 'storage/ndb/memcache/include/ndb_pipeline.h'
--- a/storage/ndb/memcache/include/ndb_pipeline.h 2012-03-23 23:33:32 +0000
+++ b/storage/ndb/memcache/include/ndb_pipeline.h 2012-04-05 21:00:05 +0000
@@ -26,14 +26,13 @@
#include "workqueue.h"
#include "ndb_engine.h"
+#include "Scheduler.h"
/* This structure is used in both C and C++ code, requiring a small hack: */
#ifdef __cplusplus
-/* forward declaration: */
-class Scheduler;
-#define C_OR_CPP_SCHEDULER Scheduler
+#define CPP_SCHEDULER Scheduler
#else
-#define C_OR_CPP_SCHEDULER void
+#define CPP_SCHEDULER void
#endif
/* In each pipeline there lives an allocator, which is used for workitems,
@@ -83,9 +82,9 @@ typedef struct request_pipeline {
unsigned int id; /*! each pipeline has an id */
unsigned int nworkitems; /*! counter used to give each workitem an id */
struct ndb_engine *engine;
- pthread_t engine_thread_id;
+ pthread_t worker_thread_id;
allocator_slab_class alligator[ALLIGATOR_ARRAY_SIZE]; /*! an allocator */
- C_OR_CPP_SCHEDULER *scheduler;
+ CPP_SCHEDULER *scheduler;
memory_pool *pool; /*! has the whole lifetime of the pipeline */
} ndb_pipeline;
@@ -99,7 +98,7 @@ DECLARE_FUNCTIONS_WITH_C_LINKAGE
ndb_pipeline * ndb_pipeline_initialize(struct ndb_engine *);
/** create a generic request pipeline */
-ndb_pipeline * get_request_pipeline(int thd_id);
+ndb_pipeline * get_request_pipeline(int thd_id, struct ndb_engine *);
/** call into a pipeline for its own statistics */
void pipeline_add_stats(ndb_pipeline *, const char *key, ADD_STAT, const void *);
@@ -111,7 +110,7 @@ ENGINE_ERROR_CODE pipeline_flush_all(ndb
/***** SCHEDULER APIS *****/
/** Global initialization of scheduler, at startup time */
-void * scheduler_initialize(const char *name, int nthreads, int threadnum);
+void * scheduler_initialize(ndb_pipeline *, scheduler_options *);
/** shutdown a scheduler */
void scheduler_shutdown(ndb_pipeline *);
=== modified file 'storage/ndb/memcache/include/ndbmemcache_config.in'
--- a/storage/ndb/memcache/include/ndbmemcache_config.in 2011-10-14 08:26:28 +0000
+++ b/storage/ndb/memcache/include/ndbmemcache_config.in 2012-03-31 18:56:49 +0000
@@ -41,5 +41,6 @@
#cmakedefine HAVE_FUNC_IN_CXX
+#cmakedefine HAVE_HTONLL
#endif
=== modified file 'storage/ndb/memcache/src/ExternalValue.cc'
--- a/storage/ndb/memcache/src/ExternalValue.cc 2012-03-23 19:01:07 +0000
+++ b/storage/ndb/memcache/src/ExternalValue.cc 2012-04-05 21:00:05 +0000
@@ -22,6 +22,7 @@
#include <assert.h>
#include "workitem.h"
+#include "NdbInstance.h"
#include "Operation.h"
#include "Scheduler.h"
#include "status_block.h"
=== modified file 'storage/ndb/memcache/src/ndb_engine.c'
--- a/storage/ndb/memcache/src/ndb_engine.c 2012-03-23 23:33:32 +0000
+++ b/storage/ndb/memcache/src/ndb_engine.c 2012-04-05 21:00:05 +0000
@@ -166,6 +166,7 @@ static ENGINE_ERROR_CODE ndb_initialize(
ENGINE_ERROR_CODE return_status;
struct ndb_engine *ndb_eng = ndb_handle(handle);
struct default_engine *def_eng = default_handle(ndb_eng);
+ scheduler_options sched_opts;
/* Process options for both the ndb engine and the default engine: */
read_cmdline_options(ndb_eng, def_eng, config_str);
@@ -212,6 +213,10 @@ static ENGINE_ERROR_CODE ndb_initialize(
/* prefetch data dictionary objects */
prefetch_dictionary_objects();
+ /* Build the scheduler options structure */
+ sched_opts.nthreads = ndb_eng->server_options.nthreads;
+ sched_opts.max_clients = ndb_eng->server_options.maxconns;
+
/* Allocate and initailize the pipelines, and their schedulers.
This will take some time; each pipeline creates slab and pool allocators,
and each scheduler may preallocate a large number of Ndb objects and
@@ -224,9 +229,9 @@ static ENGINE_ERROR_CODE ndb_initialize(
ndb_eng->pipelines = malloc(nthreads * sizeof(void *));
ndb_eng->schedulers = malloc(nthreads * sizeof(void *));
for(i = 0 ; i < nthreads ; i++) {
- ndb_eng->pipelines[i] = get_request_pipeline(i);
- ndb_eng->schedulers[i] =
- scheduler_initialize(ndb_eng->startup_options.scheduler, nthreads, i);
+ ndb_eng->pipelines[i] = get_request_pipeline(i, ndb_eng);
+ ndb_eng->schedulers[i] = scheduler_initialize(ndb_eng->pipelines[i],
+ & sched_opts);
if(ndb_eng->schedulers[i] == 0) {
logger->log(LOG_WARNING, NULL, "Illegal scheduler: \"%s\"\n",
ndb_eng->startup_options.scheduler);
@@ -819,6 +824,9 @@ int fetch_core_settings(struct ndb_engin
{ .key = "cas_enabled",
.datatype = DT_BOOL,
.value.dt_bool = &engine->server_options.cas_enabled },
+ { .key = "maxconns",
+ .datatype = DT_SIZE,
+ .value.dt_size = &engine->server_options.maxconns },
{ .key = "num_threads",
.datatype = DT_SIZE,
.value.dt_size = &engine->server_options.nthreads },
=== modified file 'storage/ndb/memcache/src/ndb_engine_errors.cc'
--- a/storage/ndb/memcache/src/ndb_engine_errors.cc 2012-03-23 19:01:07 +0000
+++ b/storage/ndb/memcache/src/ndb_engine_errors.cc 2012-04-05 21:37:02 +0000
@@ -20,17 +20,22 @@
#include <ndberror.h>
-ndberror_struct AppError9001_ReconfLock =
- { ndberror_st_temporary , ndberror_cl_application , 9001, -1,
+ndberror_struct AppError29001_ReconfLock =
+ { ndberror_st_temporary , ndberror_cl_application , 29001, -1,
"Could not obtain configuration read lock", 0
};
-ndberror_struct AppError9002_NoNDBs =
- { ndberror_st_temporary , ndberror_cl_application , 9002, -1,
+ndberror_struct AppError29002_NoNDBs =
+ { ndberror_st_temporary , ndberror_cl_application , 29002, -1,
"No Ndb Instances in freelist", 0
};
-ndberror_struct AppError9003_SyncClose =
- { ndberror_st_temporary , ndberror_cl_application , 9003, -1,
+ndberror_struct AppError29023_SyncClose =
+ { ndberror_st_temporary , ndberror_cl_application , 29023, -1,
"Waited for synchronous close of NDB transaction", 0
};
+
+ndberror_struct AppError29024_autogrow =
+ { ndberror_st_temporary , ndberror_cl_application , 29024, -1,
+ "Out of Ndb instances, growing freelist", 0
+ };
=== modified file 'storage/ndb/memcache/src/ndb_error_logger.cc'
--- a/storage/ndb/memcache/src/ndb_error_logger.cc 2012-03-07 03:57:36 +0000
+++ b/storage/ndb/memcache/src/ndb_error_logger.cc 2012-04-06 01:00:19 +0000
@@ -208,8 +208,10 @@ void ndb_error_logger_stats(ADD_STAT add
for(i = 0 ; i < ERROR_HASH_TABLE_SIZE; i++) {
for(sym = error_hash_table[i] ; sym != 0 ; sym = sym->next) {
- klen = sprintf(key, "NDB_Error_%d", sym->error_code);
- vlen = sprintf(val, "%du", sym->count);
+ klen = sprintf(key, "%s_Error_%d",
+ (sym->error_code < 29000 ? "NDB" : "Engine"),
+ sym->error_code);
+ vlen = sprintf(val, "%lu", (long unsigned int) sym->count);
add_stat(key, klen, val, vlen, cookie);
}
}
=== modified file 'storage/ndb/memcache/src/ndb_pipeline.cc'
--- a/storage/ndb/memcache/src/ndb_pipeline.cc 2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/ndb_pipeline.cc 2012-04-03 00:34:18 +0000
@@ -71,7 +71,12 @@ void init_pool_header(allocation_referen
/* The public API */
-/* initialize a new pipeline for an NDB engine thread */
+/* Attach a new pipeline to an NDB worker thread.
+ Some initialization has already occured when the main single-thread startup
+ called get_request_pipeline(). But this is the first call into a pipeline
+ from its worker thread. It will initialize the thread's identifier, and
+ attach the pipeline to its scheduler.
+*/
ndb_pipeline * ndb_pipeline_initialize(struct ndb_engine *engine) {
bool did_inc;
unsigned int id;
@@ -85,14 +90,13 @@ ndb_pipeline * ndb_pipeline_initialize(s
/* Fetch the partially initialized pipeline */
ndb_pipeline * self = (ndb_pipeline *) engine->pipelines[id];
-
+
+ /* Sanity checks */
assert(self->id == id);
+ assert(self->engine == engine);
- /* Set the pointer back to the engine */
- self->engine = engine;
-
- /* And the thread id */
- self->engine_thread_id = pthread_self();
+ /* Set the pthread id */
+ self->worker_thread_id = pthread_self();
/* Create and set a thread identity */
tid = (thread_identifier *) memory_pool_alloc(self->pool, sizeof(thread_identifier));
@@ -108,16 +112,20 @@ ndb_pipeline * ndb_pipeline_initialize(s
}
-/* Allocate and initialize a generic request pipeline */
-ndb_pipeline * get_request_pipeline(int thd_id) {
+/* Allocate and initialize a generic request pipeline.
+ In unit test code, this can be called with a NULL engine pointer --
+ it will still initialize a usable slab allocator and memory pool
+ which can be tested.
+*/
+ndb_pipeline * get_request_pipeline(int thd_id, struct ndb_engine *engine) {
/* Allocate the pipeline */
ndb_pipeline *self = (ndb_pipeline *) malloc(sizeof(ndb_pipeline));
/* Initialize */
- self->engine = 0;
+ self->engine = engine;
self->id = thd_id;
self->nworkitems = 0;
-
+
/* Say hi to the alligator */
init_allocator(self);
@@ -160,27 +168,27 @@ ENGINE_ERROR_CODE pipeline_flush_all(ndb
/* The scheduler API */
-void * scheduler_initialize(const char *cf, int nthreads, int athread) {
+void * scheduler_initialize(ndb_pipeline *self, scheduler_options *options) {
Scheduler *s = 0;
- const char *sched_options = 0;
+ const char *cf = self->engine->startup_options.scheduler;
+ options->config_string = 0;
if(cf == 0 || *cf == 0) {
s = new DEFAULT_SCHEDULER;
}
else if(!strncasecmp(cf, "stockholm", 9)) {
s = new Scheduler_stockholm;
- sched_options = & cf[9];
+ options->config_string = & cf[9];
}
else if(!strncasecmp(cf,"S", 1)) {
s = new S::SchedulerWorker;
- sched_options = & cf[1];
+ options->config_string = & cf[1];
}
else {
return NULL;
}
-
- s->init(athread, nthreads, sched_options);
- s->pipeline = 0;
+
+ s->init(self->id, options);
return (void *) s;
}
=== modified file 'storage/ndb/memcache/src/ndb_worker.cc'
--- a/storage/ndb/memcache/src/ndb_worker.cc 2012-03-23 19:01:07 +0000
+++ b/storage/ndb/memcache/src/ndb_worker.cc 2012-04-05 21:37:02 +0000
@@ -880,7 +880,7 @@ void worker_close(NdbTransaction *tx, wo
nwaits_post = ndb->getClientStat(Ndb::WaitExecCompleteCount);
if(nwaits_post > nwaits_pre)
- log_app_error(& AppError9003_SyncClose);
+ log_app_error(& AppError29023_SyncClose);
if(wqitem->ext_val)
delete wqitem->ext_val;
=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc'
--- a/storage/ndb/memcache/src/schedulers/S_sched.cc 2012-03-22 19:24:32 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.cc 2012-04-06 01:00:19 +0000
@@ -64,13 +64,14 @@ S::SchedulerGlobal::SchedulerGlobal(Conf
}
-void S::SchedulerGlobal::init(int _nthreads, const char *_config_string) {
+void S::SchedulerGlobal::init(const scheduler_options *sched_opts) {
DEBUG_ENTER_METHOD("S::SchedulerGlobal::init");
/* Set member variables */
- nthreads = _nthreads;
- config_string = _config_string;
+ nthreads = sched_opts->nthreads;
+ config_string = sched_opts->config_string;
parse_config_string(nthreads, config_string);
+ options.max_clients = sched_opts->max_clients;
/* Fetch or initialize clusters */
nclusters = conf->nclusters;
@@ -104,8 +105,9 @@ void S::SchedulerGlobal::init(int _nthre
/* Log message for startup */
logger->log(LOG_WARNING, 0, "Scheduler: starting for %d cluster%s; "
- "c%d,f%d,t%d", nclusters, nclusters == 1 ? "" : "s",
- options.n_connections, options.force_send, options.send_timer);
+ "c%d,f%d,g%d,t%d", nclusters, nclusters == 1 ? "" : "s",
+ options.n_connections, options.force_send,
+ options.auto_grow, options.send_timer);
/* Now Running */
running = true;
@@ -171,6 +173,7 @@ void S::SchedulerGlobal::parse_config_st
options.n_connections = 0; // 0 = n_connections based on db-stored config
options.force_send = 0; // 0 = force send always off
options.send_timer = 1; // 1 = 1 ms. timer in send thread
+ options.auto_grow = 1; // 1 = allow NDB instance pool to grow on demand
if(str) {
const char *s = str;
@@ -188,6 +191,9 @@ void S::SchedulerGlobal::parse_config_st
case 'f':
options.force_send = value;
break;
+ case 'g':
+ options.auto_grow = value;
+ break;
case 't':
options.send_timer = value;
break;
@@ -214,6 +220,10 @@ void S::SchedulerGlobal::parse_config_st
logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
assert(options.send_timer >= 1 && options.send_timer <= 10);
}
+ if(options.auto_grow < 0 || options.auto_grow > 1) {
+ logger->log(LOG_WARNING, 0, "Invalid scheduler configuration.\n");
+ assert(options.auto_grow == 0 || options.auto_grow == 1);
+ }
}
@@ -249,15 +259,14 @@ void S::SchedulerGlobal::add_stats(const
/* SchedulerWorker methods */
void S::SchedulerWorker::init(int my_thread,
- int nthreads,
- const char * config_string) {
+ const scheduler_options * options) {
/* On the first call in, initialize the SchedulerGlobal.
* This will start the send & poll threads for each connection.
*/
if(my_thread == 0) {
sched_generation_number = 1;
s_global = new SchedulerGlobal(& get_Configuration());
- s_global->init(nthreads, config_string);
+ s_global->init(options);
}
/* Initialize member variables */
@@ -286,7 +295,7 @@ void S::SchedulerWorker::attach_thread(t
ENGINE_ERROR_CODE S::SchedulerWorker::schedule(workitem *item) {
int c = item->prefix_info.cluster_id;
ENGINE_ERROR_CODE response_code;
- NdbInstance *inst;
+ NdbInstance *inst = 0;
S::WorkerConnection *wc;
const KeyPrefix *pfx;
@@ -299,29 +308,41 @@ ENGINE_ERROR_CODE S::SchedulerWorker::sc
pthread_rwlock_unlock(& reconf_lock);
}
else {
- log_app_error(& AppError9001_ReconfLock);
+ log_app_error(& AppError29001_ReconfLock);
return ENGINE_TMPFAIL;
}
/* READ LOCK RELEASED */
item->base.nsuffix = item->base.nkey - pfx->prefix_len;
if(item->base.nsuffix == 0) return ENGINE_EINVAL; // key too short
-
- if(wc && wc->freelist) {
+
+ if(wc == 0) return ENGINE_FAILED;
+
+ if(wc->freelist) { /* Get the next NDB from the freelist. */
inst = wc->freelist;
wc->freelist = inst->next;
}
- else {
- /* No more free NDBs.
- Eventually Scheduler::io_completed() will run _in this thread_ and return
- an NDB to the freelist. But no other thread can free one, so
- all we can do is return an error.
- (Or, alternately, the scheduler may be shutting down.)
- */
- log_app_error(& AppError9002_NoNDBs);
- return ENGINE_TMPFAIL;
+ else { /* No free NDBs. */
+ if(wc->sendqueue->is_aborted()) {
+ return ENGINE_TMPFAIL;
+ }
+ else { /* Try to make an NdbInstance on the fly */
+ inst = wc->newNdbInstance();
+ if(inst) {
+ log_app_error(& AppError29024_autogrow);
+ }
+ else {
+ /* We have hit a hard maximum. Eventually Scheduler::io_completed()
+ will run _in this thread_ and return an NDB to the freelist.
+ But no other thread can free one, so here we return an error.
+ */
+ log_app_error(& AppError29002_NoNDBs);
+ return ENGINE_TMPFAIL;
+ }
+ }
}
+ assert(inst);
inst->link_workitem(item);
// Fetch the query plan for this prefix.
@@ -520,6 +541,18 @@ void S::Cluster::add_stats(const char *s
/* WorkerConnection methods */
+
+NdbInstance * S::WorkerConnection::newNdbInstance() {
+ NdbInstance *inst = 0;
+ if(instances.current < instances.max) {
+ inst = new NdbInstance(conn->conn, 2);
+ instances.current++;
+ inst->id = ((id.thd + 1) * 10000) + instances.current;
+ }
+ return inst;
+}
+
+
S::WorkerConnection::WorkerConnection(SchedulerGlobal *global,
int thd_id, int cluster_id) {
S::Cluster *cl = global->clusters[cluster_id];
@@ -536,21 +569,25 @@ S::WorkerConnection::WorkerConnection(Sc
plan_set = new ConnQueryPlanSet(conn->conn, conf->nprefixes);
plan_set->buildSetForConfiguration(conf, cluster_id);
+ /* How many NDB instances to start initially */
+ instances.initial = conn->instances.initial / conn->n_workers;
+
+ /* Maximum size of send queue, and upper bound on NDB instances */
+ instances.max = conn->instances.max / conn->n_workers;
+
/* Build the freelist */
freelist = 0;
- int my_ndb_inst = conn->nInst / conn->n_workers;
- for(int j = 0 ; j < my_ndb_inst ; j++ ) {
- NdbInstance *inst = new NdbInstance(conn->conn, 2);
- inst->id = ((id.thd + 1) * 10000) + j + 1;
+ for(instances.current = 0; instances.current < instances.initial; ) {
+ NdbInstance *inst = newNdbInstance();
inst->next = freelist;
freelist = inst;
}
DEBUG_PRINT("Cluster %d, connection %d (node %d), worker %d: %d NDBs.",
- id.cluster, id.conn, id.node, id.thd, my_ndb_inst);
+ id.cluster, id.conn, id.node, id.thd, instances.current);
/* Initialize the sendqueue */
- sendqueue = new Queue<NdbInstance>(my_ndb_inst);
+ sendqueue = new Queue<NdbInstance>(instances.max);
/* Hoard a transaction (an API connect record) for each Ndb object. This
* first call to startTransaction() will send TC_SEIZEREQ and wait for a
@@ -560,7 +597,7 @@ S::WorkerConnection::WorkerConnection(Sc
QueryPlan *plan;
const KeyPrefix *prefix = conf->getNextPrefixForCluster(id.cluster, NULL);
if(prefix) {
- NdbTransaction ** txlist = new NdbTransaction * [my_ndb_inst];
+ NdbTransaction ** txlist = new NdbTransaction * [instances.current];
int i = 0;
// Open them all.
@@ -573,7 +610,7 @@ S::WorkerConnection::WorkerConnection(Sc
}
// Close them all.
- for(i = 0 ; i < my_ndb_inst ; i++) {
+ for(i = 0 ; i < instances.current ; i++) {
txlist[i]->close();
}
@@ -642,17 +679,29 @@ S::Connection::Connection(S::Cluster & _
n_workers = global->options.n_worker_threads / cluster.nconnections;
if(n_total_workers % cluster.nconnections > id) n_workers += 1;
- /* How many NDB objects are needed? */
- /* Note that this is used to configure hard limits on the size of the
- * waitgroup, the sentqueue, and the reschedulequeue -- and it will not be
+ /* How many NDB objects are needed for the desired performance? */
+ double total_ndb_objects = conf->figureInFlightTransactions(cluster.cluster_id);
+ instances.initial = (int) (total_ndb_objects / cluster.nconnections);
+ while(instances.initial % n_workers) instances.initial++; // round up
+
+ /* The maximum number of NDB objects.
+ * This is used to configure hard limits on the size of the waitgroup,
+ * the sentqueue, and the reschedulequeue -- and it will not be
* possible to increase those limits during online reconfig.
*/
- double total_ndb_objects = conf->figureInFlightTransactions(cluster.cluster_id);
- nInst = (int) (total_ndb_objects / cluster.nconnections);
- while(nInst % n_workers) nInst++; // round up
-
+ instances.max = instances.initial;
+ // allow the pool to grow on demand?
+ if(global->options.auto_grow)
+ instances.max = (int) (instances.max * 1.6);
+ // max_clients imposes a hard upper limit
+ if(instances.max > (global->options.max_clients / cluster.nconnections))
+ instances.max = global->options.max_clients / cluster.nconnections;
+ // instances.initial might also be subject to the max_clients limit
+ if(instances.initial > instances.max)
+ instances.initial = instances.max;
+
/* Get a multi-wait Poll Group */
- pollgroup = conn->create_ndb_wait_group(nInst);
+ pollgroup = conn->create_ndb_wait_group(instances.max);
/* Initialize the statistics */
stats.sent_operations = 0;
@@ -665,8 +714,8 @@ S::Connection::Connection(S::Cluster & _
sem.counter = 0;
/* Initialize the queues for sent and resceduled items */
- sentqueue = new Queue<NdbInstance>(nInst);
- reschedulequeue = new Queue<NdbInstance>(nInst);
+ sentqueue = new Queue<NdbInstance>(instances.max);
+ reschedulequeue = new Queue<NdbInstance>(instances.max);
}
@@ -726,6 +775,14 @@ void S::Connection::add_stats(const char
klen = sprintf(key, "cl%d.conn%d.timeout_races", cluster.cluster_id, id);
vlen = sprintf(val, "%llu", stats.timeout_races);
add_stat(key, klen, val, vlen, cookie);
+
+ klen = sprintf(key, "cl%d.conn%d.instances.initial", cluster.cluster_id, id);
+ vlen = sprintf(val, "%d", instances.initial);
+ add_stat(key, klen, val, vlen, cookie);
+
+ klen = sprintf(key, "cl%d.conn%d.instances.max", cluster.cluster_id, id);
+ vlen = sprintf(val, "%d", instances.max);
+ add_stat(key, klen, val, vlen, cookie);
}
=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.h'
--- a/storage/ndb/memcache/src/schedulers/S_sched.h 2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/S_sched.h 2012-04-06 00:12:54 +0000
@@ -61,7 +61,7 @@ class S::SchedulerGlobal {
public:
SchedulerGlobal(Configuration *);
~SchedulerGlobal() {};
- void init(int threads, const char *config_string);
+ void init(const scheduler_options *options);
void add_stats(const char *, ADD_STAT, const void *);
void reconfigure(Configuration *);
void shutdown();
@@ -82,6 +82,8 @@ public:
int n_connections; /** preferred number of NDB cluster connections */
int force_send; /** how to use NDB force-send */
int send_timer; /** milliseconds to set for adaptive send timer */
+ int auto_grow; /** whether to allow NDB instance pool to grow */
+ int max_clients; /** memcached max allowed connections */
} options;
private:
@@ -99,7 +101,7 @@ class S::SchedulerWorker : public Schedu
public:
SchedulerWorker() {};
~SchedulerWorker() {};
- void init(int threadnum, int nthreads, const char *config_string);
+ void init(int threadnum, const scheduler_options * sched_opts);
void attach_thread(thread_identifier *);
ENGINE_ERROR_CODE schedule(workitem *);
void yield(workitem *) const {};
@@ -111,6 +113,7 @@ public:
private:
int id;
+ ndb_pipeline *pipeline;
SchedulerGlobal * m_global;
};
@@ -159,9 +162,12 @@ private:
Queue<NdbInstance> * reschedulequeue;
int id;
int node_id;
- int nInst;
- int n_total_workers;
- int n_workers;
+ int n_total_workers; /* same as SchedulerGlobal::options.n_worker_threads */
+ int n_workers; /* number of workers for this connection */
+ struct {
+ int initial; /* start with this many NDB instances */
+ int max; /* scale up to this many */
+ } instances;
pthread_t send_thread_id;
pthread_t poll_thread_id;
struct {
@@ -187,6 +193,7 @@ public:
~WorkerConnection();
void shutdown();
void reconfigure(Configuration *);
+ NdbInstance * newNdbInstance();
struct {
int thd : 8;
@@ -194,6 +201,11 @@ public:
int conn : 8;
unsigned int node : 8;
} id;
+ struct {
+ int initial;
+ int current;
+ int max;
+ } instances;
S::Connection *conn;
ConnQueryPlanSet *plan_set, *old_plan_set;
NdbInstance *freelist;
=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.cc'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.cc 2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.cc 2012-04-03 00:34:18 +0000
@@ -46,14 +46,15 @@ extern "C" {
}
-void Scheduler_stockholm::init(int my_thread, int nthreads, const char *config_string) {
+void Scheduler_stockholm::init(int my_thread,
+ const scheduler_options *options) {
const Configuration & conf = get_Configuration();
/* How many NDB instances are needed per cluster? */
for(unsigned int c = 0 ; c < conf.nclusters ; c++) {
ClusterConnectionPool *pool = conf.getConnectionPoolById(c);
double total_ndb_objects = conf.figureInFlightTransactions(c);
- cluster[c].nInst = (int) total_ndb_objects / nthreads;
+ cluster[c].nInst = (int) total_ndb_objects / options->nthreads;
DEBUG_PRINT("cluster %d: %d TPS @ %d usec RTT ==> %d NDB instances.",
c, conf.max_tps, pool->usec_rtt, cluster[c].nInst);
}
=== modified file 'storage/ndb/memcache/src/schedulers/Stockholm.h'
--- a/storage/ndb/memcache/src/schedulers/Stockholm.h 2012-03-07 01:22:53 +0000
+++ b/storage/ndb/memcache/src/schedulers/Stockholm.h 2012-04-03 00:34:18 +0000
@@ -44,7 +44,7 @@ class Scheduler_stockholm : public Sched
public:
Scheduler_stockholm() {};
~Scheduler_stockholm() {};
- void init(int threadnum, int nthreads, const char *config_string);
+ void init(int threadnum, const scheduler_options *options);
void attach_thread(thread_identifier *);
ENGINE_ERROR_CODE schedule(workitem *);
void yield(workitem *) const; // inlined
@@ -56,6 +56,7 @@ public:
bool global_reconfigure(Configuration *) { return false; } ;
private:
+ ndb_pipeline *pipeline;
struct {
struct workqueue *queue;
struct sched_stats_stockholm {
=== modified file 'storage/ndb/memcache/unit/alloc.cc'
--- a/storage/ndb/memcache/unit/alloc.cc 2011-12-18 23:26:44 +0000
+++ b/storage/ndb/memcache/unit/alloc.cc 2012-04-03 00:34:18 +0000
@@ -27,14 +27,16 @@
#include "all_tests.h"
+#define TEST_ALLOC_BLOCKS 34
+
int run_allocator_test(QueryPlan *, Ndb *, int v) {
- struct request_pipeline *p = get_request_pipeline(0);
+ struct request_pipeline *p = get_request_pipeline(0, NULL);
memory_pool *p1 = pipeline_create_memory_pool(p);
int sz = 13;
uint tot = 0;
void *v1, *v2;
- for(int i = 0 ; i < 25 ; i++) {
+ for(int i = 0 ; i < TEST_ALLOC_BLOCKS ; i++) {
v1 = memory_pool_alloc(p1, sz); tot += sz;
v2 = memory_pool_alloc(p1, sz + 1); tot += sz + 1;
sz = (int) (sz * 1.25);
=== modified file 'storage/ndb/src/ndbapi/NdbQueryBuilder.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp 2012-03-30 13:18:59 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryBuilder.cpp 2012-04-11 10:13:24 +0000
@@ -702,24 +702,7 @@ NdbQueryOperationDef::getIndex() const
NdbQueryBuilder* NdbQueryBuilder::create()
{
NdbQueryBuilderImpl* const impl = new NdbQueryBuilderImpl();
- if (likely (impl != NULL))
- {
- if (likely(impl->getNdbError().code == 0))
- {
- return &impl->m_interface;
- }
- else
- {
- // Probably failed to create Vector instances.
- assert(impl->getNdbError().code == Err_MemoryAlloc);
- delete impl;
- return NULL;
- }
- }
- else
- {
- return NULL;
- }
+ return (likely(impl!=NULL)) ? &impl->m_interface : NULL;
}
void NdbQueryBuilder::destroy()
@@ -1116,17 +1099,11 @@ NdbQueryBuilder::prepare()
NdbQueryBuilderImpl::NdbQueryBuilderImpl()
: m_interface(*this),
m_error(),
- m_operations(),
- m_operands(),
+ m_operations(0),
+ m_operands(0),
m_paramCnt(0),
m_hasError(false)
-{
- if (errno == ENOMEM)
- {
- // ENOMEM probably comes from Vector().
- setErrorCode(Err_MemoryAlloc);
- }
-}
+{}
NdbQueryBuilderImpl::~NdbQueryBuilderImpl()
{
@@ -1242,12 +1219,12 @@ NdbQueryDefImpl(const Vector<NdbQueryOpe
const Vector<NdbQueryOperandImpl*>& operands,
int& error)
: m_interface(*this),
- m_operations(operations),
- m_operands(operands)
+ m_operations(0),
+ m_operands(0)
{
- if (errno == ENOMEM)
+ if (m_operations.assign(operations) || m_operands.assign(operands))
{
- // Failed to allocate memory for m_operations or m_operands.
+ // Failed to allocate memory in Vector::assign().
error = Err_MemoryAlloc;
return;
}
@@ -1909,16 +1886,10 @@ NdbQueryOperationDefImpl::NdbQueryOperat
m_opNo(opNo), m_internalOpNo(internalOpNo),
m_options(options),
m_parent(NULL),
- m_children(),
- m_params(),
- m_spjProjection()
+ m_children(0),
+ m_params(0),
+ m_spjProjection(0)
{
- if (unlikely(errno == ENOMEM))
- {
- // Heap allocation in Vector() must have failed.
- error = Err_MemoryAlloc;
- return;
- }
if (unlikely(m_internalOpNo >= NDB_SPJ_MAX_TREE_NODES))
{
error = QRY_DEFINITION_TOO_LARGE;
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2012-03-30 13:18:59 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2012-04-11 10:13:24 +0000
@@ -3788,7 +3788,7 @@ NdbQueryOperationImpl::NdbQueryOperation
m_queryImpl(queryImpl),
m_operationDef(def),
m_parent(NULL),
- m_children(def.getNoOfChildOperations()),
+ m_children(0),
m_maxBatchRows(0), // >0: User specified prefered value, ==0: Use default CFG values
m_params(),
m_resultBuffer(NULL),
@@ -3805,9 +3805,9 @@ NdbQueryOperationImpl::NdbQueryOperation
? Parallelism_max : Parallelism_adaptive),
m_rowSize(0xffffffff)
{
- if (errno == ENOMEM)
+ if (m_children.expand(def.getNoOfChildOperations()))
{
- // Memory allocation in Vector() (for m_children) assumed to have failed.
+ // Memory allocation during Vector::expand() failed.
queryImpl.setErrorCode(Err_MemoryAlloc);
return;
}
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.3 branch (magnus.blaudd:3876 to 3877) | magnus.blaudd | 11 Apr |