------------------------------------------------------------
revno: 136
revision-id: mtaylor@stripped
parent: mtaylor@stripped
committer: Monty Taylor <mtaylor@stripped>
branch nick: ndbjmerge
timestamp: Mon 2007-07-02 17:21:23 -0700
message:
Added support to NDB/J layer for async transactions.
Added ndbj.examples.TestAsync.java and ndbj.examples.TestBaseCallback.java.
The code was already there to do the async from ndb/j in the Impl classes, but hadn't
been added to the public interfaces.
added:
java/com/mysql/cluster/ndbj/examples/TestAsync.java
testasync.java-20070703002120-jsnz1ul38pp1h3w3-1
java/com/mysql/cluster/ndbj/examples/TestBaseCallback.java
testbasecallback.jav-20070703002120-jsnz1ul38pp1h3w3-2
modified:
java/com/mysql/cluster/ndbj/Ndb.java ndb.java-20070517181935-98huwjarzuh25b30-2
java/com/mysql/cluster/ndbj/NdbImpl.java ndbimpl.java-20070517181935-98huwjarzuh25b30-14
java/com/mysql/cluster/ndbj/NdbScanOperation.java
ndbscanoperation.jav-20070517181935-98huwjarzuh25b30-25
java/com/mysql/cluster/ndbj/NdbScanOperationImpl.java
ndbscanoperationimpl-20070517181935-98huwjarzuh25b30-26
java/com/mysql/cluster/ndbj/NdbTransaction.java
ndbtransaction.java-20070517181935-98huwjarzuh25b30-27
java/com/mysql/cluster/ndbj/NdbTransactionImpl.java
ndbtransactionimpl.j-20070517181935-98huwjarzuh25b30-28
=== added file 'java/com/mysql/cluster/ndbj/examples/TestAsync.java'
--- a/java/com/mysql/cluster/ndbj/examples/TestAsync.java 1970-01-01 00:00:00 +0000
+++ b/java/com/mysql/cluster/ndbj/examples/TestAsync.java 2007-07-03 00:21:23 +0000
@@ -0,0 +1,273 @@
+package com.mysql.cluster.ndbj.examples;
+
+import java.sql.*;
+
+import java.util.Date;
+import java.util.ArrayList;
+import com.mysql.cluster.ndbj.*;
+import com.mysql.cluster.errors.*;
+
+
+public class TestAsync {
+
+ private static Date beginTime;
+ private static Date initEndTime;
+ private static Date endTime;
+ private static long minTime = 10;
+ private static long maxTime = 0;
+
+ private static int num_iter = 0;
+ private static int INSERT_NUM = 0;
+
+ static {
+ System.loadLibrary("ndbj");
+ }
+
+
+ public static void main(String argv[]) throws SQLException{
+
+ if (argv.length<2) {
+ System.out.println("Usage:\n\tjava test NUM_OF_ITERATIONS NUM_OF_ROWS ");
+ System.exit(1);
+ }
+
+ num_iter = Integer.parseInt(argv[0]);
+ INSERT_NUM = Integer.parseInt(argv[1]);
+ int BATCH_SIZE=100;
+
+ //ndbapi.ndb_init();
+
+ /**************************************************************
+ * Connect to mysql server and create table *
+ **************************************************************/
+
+ String table_name = "mytablename";
+ try {
+ Class.forName("com.mysql.jdbc.Driver");
+ } catch (ClassNotFoundException e) {
+ System.out.println("MySQL JDBC Driver not found");
+ }
+
+ Connection conn =
DriverManager.getConnection("jdbc:mysql://localhost/test?user=root");
+
+ System.out.println("Dropping and recreating schema");
+
+ Statement s = conn.createStatement();
+
+
+ try {
+ s.executeUpdate("DROP TABLE if exists " + table_name);
+ } catch (SQLException e) {
+ }
+ try {
+ s.executeUpdate("CREATE TABLE if not exists " +
+ table_name +
+ " (ATTR1 INT UNSIGNED," +
+ " ATTR2 INT UNSIGNED NOT NULL," +
+ " PRIMARY KEY (ATTR1) )" +
+ " ENGINE=NDBCLUSTER");
+ } catch (SQLException e) {
+ }
+
+
+ /**************************************************************
+ * Connect to ndb cluster *
+ **************************************************************/
+
+ System.out.println("connecting to cluster");
+
+ try {
+ NdbClusterConnection connection = NdbFactory.createNdbClusterConnection();
+
+ try {
+ if (connection.connect(5,3,true)==-1) {
+ System.out.println("Connect to cluster management server failed.");
+ System.exit(-1);
+ }
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ System.exit(-1);
+ }
+
+
+
+ try {
+ connection.waitUntilReady(30,30);
+ } catch (Exception e) {
+ System.out.println("Cluster was not ready within 30 secs.");
+ System.exit(-1);
+ }
+
+ Ndb myNdb = connection.createNdb( "test" ) ;
+
+ myNdb.init(4);
+
+ System.out.println("running tests:");
+
+ /**************************************************************************
+ * Fill table with tuples, using auto_increment IDs *
+ **************************************************************************/
+
+ beginTime = new Date(System.currentTimeMillis());
+
+
+ for(int t=0;t<Math.ceil(INSERT_NUM/BATCH_SIZE);t++) {
+
+ NdbTransaction myTransaction = myNdb.startTransaction();
+
+ int val = ((t+1)*BATCH_SIZE)-INSERT_NUM;
+ int offset = 0;
+ if ( val > 0 ) {
+ offset = val;
+ }
+
+ for(int i=0;i<BATCH_SIZE-offset;i++) {
+
+ NdbOperation myOperation = myTransaction.getNdbOperation(table_name);
+ if (myOperation == null) {
+ System.out.println(myTransaction.getNdbError().getMessage());
+ System.exit(1);
+ }
+
+ myOperation.insertTuple();
+ java.math.BigInteger auto_id = myNdb.getAutoIncrementValue(table_name,BATCH_SIZE);
+
+ myOperation.equal("ATTR1",auto_id);
+ myOperation.setLong("ATTR2", t*BATCH_SIZE+i);
+
+ }
+
+ myTransaction.execute( NdbTransaction.ExecType.Commit,
NdbTransaction.AbortOption.AbortOnError, true );
+
+
+
+ myTransaction.close();
+
+
+ }
+ endTime = new Date(System.currentTimeMillis());
+ System.out.println("Insert time for " + INSERT_NUM + ":");
+ System.out.println(" " + (endTime.getTime() - beginTime.getTime()) + "ms");
+
+
+ /*********************************
+ * Get list of ids
+ *********************************/
+
+ System.out.println();
+ System.out.println("Getting list of ids");
+
+ NdbTransaction myTransaction = myNdb.startTransaction();
+
+ NdbScanOperation myScanOperation=myTransaction.getNdbScanOperation(table_name);
+
+ if (myScanOperation == null) {
+ System.out.println(myTransaction.getNdbError().getMessage());
+ }
+
+
+ myScanOperation.readTuples(NdbOperation.LockMode.LM_CommittedRead);
+
+
+ myScanOperation.getValue("ATTR1");
+
+
+ NdbResultSet rs = myScanOperation.resultData();
+ myTransaction.execute(NdbTransaction.ExecType.NoCommit);
+
+ ArrayList<Integer> ids = new ArrayList<Integer>();
+
+ while (true) {
+
+ if (myScanOperation.nextResult(true) != 0) {
+ break;
+ }
+
+ int random_id = rs.getInt("ATTR1");
+ ids.add(random_id);
+ }
+
+ myTransaction.close();
+
+ /*******************
+ * Test NDB API Speed
+ *******************/
+
+ System.out.println("Testing NDBAPI speed");
+ long foo = 0;
+
+ beginTime = new Date(System.currentTimeMillis());
+ for(int x=0;x<num_iter;x++){
+
+ int rand_id = (int) (Math.random() * ids.size() ) ;
+ int id_num = (int)ids.get(rand_id);
+
+ NdbTransaction myTrans = myNdb.startTransaction(); //(table_name,id_num);
+
+
+ NdbOperation myOper = myTrans.getNdbOperation(table_name);
+ myOper.readTuple(NdbOperation.LockMode.LM_Read);
+
+ myOper.equal("ATTR1",id_num);
+
+ myOper.getValue("ATTR2");
+
+
+ rs = myOper.resultData();
+
+ myTrans.execute(NdbTransaction.ExecType.Commit);
+
+
+ foo=rs.getInt("ATTR2");
+ myTrans.close();
+ }
+ endTime = new Date(System.currentTimeMillis());
+ System.out.println("NDBAPI Execution time for " + num_iter + ": ");
+ System.out.println(" "+(endTime.getTime() - beginTime.getTime())+"ms");
+ s.close();
+
+
+ System.out.println("Testing NDBAPI async speed");
+
+
+ ArrayList<TestBaseCallback> cbs = new ArrayList<TestBaseCallback>();
+
+ beginTime = new Date(System.currentTimeMillis());
+ for(int x=0;x<num_iter;x++){
+
+ int rand_id = (int) (Math.random() * ids.size() ) ;
+ int id_num = (int)ids.get(rand_id);
+
+ NdbTransaction myTrans = myNdb.startTransaction(); //table_name,id_num);
+
+
+ NdbOperation myOper = myTrans.getNdbOperation(table_name);
+ myOper.readTuple(NdbOperation.LockMode.LM_Read);
+
+ myOper.equal("ATTR1",id_num);
+
+
+ myOper.getValue("ATTR2");
+
+ rs = myOper.resultData();
+ TestBaseCallback cb = new TestBaseCallback(myNdb,rs);
+
+ myTrans.executeAsynchPrepare(NdbTransaction.ExecType.Commit, cb );
+
+// foo=myRecAttr.int32_value();
+// myNdb.closeTransaction(myTrans);
+ }
+
+ myNdb.sendPollNdb(5000);
+ endTime = new Date(System.currentTimeMillis());
+ System.out.println("NDBAPI Execution time for " + num_iter + ": ");
+ System.out.println(" "+(endTime.getTime() - beginTime.getTime())+"ms");
+ s.close();
+
+ } catch ( NdbApiException e ) {
+ e.printStackTrace();
+ System.exit(-1);
+ }
+
+ }
+}
=== added file 'java/com/mysql/cluster/ndbj/examples/TestBaseCallback.java'
--- a/java/com/mysql/cluster/ndbj/examples/TestBaseCallback.java 1970-01-01 00:00:00 +0000
+++ b/java/com/mysql/cluster/ndbj/examples/TestBaseCallback.java 2007-07-03 00:21:23 +0000
@@ -0,0 +1,24 @@
+package com.mysql.cluster.ndbj.examples;
+
+import com.mysql.cluster.ndbj.*;
+import com.mysql.cluster.errors.*;
+
+class TestBaseCallback extends com.mysql.cluster.ndbapi.BaseCallback {
+
+ NdbResultSet myRs;
+ Ndb myNdb;
+
+ public TestBaseCallback(Ndb theNdb, NdbResultSet theRs) {
+ this.myRs=theRs;
+ this.myNdb=theNdb;
+ }
+
+ public void callback(int result, NdbTransaction myTrans) {
+ System.out.println("result " + result);
+ try {
+ System.out.println("value " + this.myRs.getInt("ATTR2"));
+ } catch (NdbApiException e) {
+ System.out.println("Got an exception in the callback: " + e.getMessage());
+ }
+ };
+}
=== modified file 'java/com/mysql/cluster/ndbj/Ndb.java'
--- a/java/com/mysql/cluster/ndbj/Ndb.java 2007-05-20 09:20:25 +0000
+++ b/java/com/mysql/cluster/ndbj/Ndb.java 2007-07-03 00:21:23 +0000
@@ -1,5 +1,7 @@
package com.mysql.cluster.ndbj;
+import java.math.BigInteger;
+
import com.mysql.cluster.errors.*;
/**
*
@@ -107,5 +109,8 @@
* ndb object is used.
*/
public abstract void close();
+
+ public BigInteger getAutoIncrementValue(String aTableName, long cacheSize) throws
NdbApiException;
+ public abstract int sendPollNdb(int aMillisecondNumber) throws NdbApiException;
}
\ No newline at end of file
=== modified file 'java/com/mysql/cluster/ndbj/NdbImpl.java'
--- a/java/com/mysql/cluster/ndbj/NdbImpl.java 2007-06-25 18:38:46 +0000
+++ b/java/com/mysql/cluster/ndbj/NdbImpl.java 2007-07-03 00:21:23 +0000
@@ -1,5 +1,6 @@
package com.mysql.cluster.ndbj;
+import java.math.BigInteger;
import java.util.HashSet;
import java.util.Set;
@@ -302,5 +303,58 @@
public int pollEvents(int aMillisecondNumber) throws NdbApiException {
return ndbRef.pollEvents(aMillisecondNumber);
}
+
+ /**
+ * @param aTableName
+ * @param cacheSize
+ * @return
+ * @throws NdbApiException
+ * @see com.mysql.cluster.ndbapi.Ndb#getAutoIncrementValue(java.lang.String, long)
+ */
+ public BigInteger getAutoIncrementValue(String aTableName, long cacheSize) throws
NdbApiException {
+ return ndbRef.getAutoIncrementValue(aTableName, cacheSize);
+ }
+
+ /**
+ * @return
+ * @throws NdbApiException
+ * @see com.mysql.cluster.ndbapi.Ndb#sendPollNdb()
+ */
+ public int sendPollNdb() throws NdbApiException {
+ return ndbRef.sendPollNdb();
+ }
+
+ /**
+ * @param aMillisecondNumber
+ * @param minNoOfEventsToWakeup
+ * @param forceSend
+ * @return
+ * @throws NdbApiException
+ * @see com.mysql.cluster.ndbapi.Ndb#sendPollNdb(int, int, int)
+ */
+ public int sendPollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup, int forceSend)
throws NdbApiException {
+ return ndbRef.sendPollNdb(aMillisecondNumber, minNoOfEventsToWakeup, forceSend);
+ }
+
+ /**
+ * @param aMillisecondNumber
+ * @param minNoOfEventsToWakeup
+ * @return
+ * @throws NdbApiException
+ * @see com.mysql.cluster.ndbapi.Ndb#sendPollNdb(int, int)
+ */
+ public int sendPollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup) throws
NdbApiException {
+ return ndbRef.sendPollNdb(aMillisecondNumber, minNoOfEventsToWakeup);
+ }
+
+ /**
+ * @param aMillisecondNumber
+ * @return
+ * @throws NdbApiException
+ * @see com.mysql.cluster.ndbapi.Ndb#sendPollNdb(int)
+ */
+ public int sendPollNdb(int aMillisecondNumber) throws NdbApiException {
+ return ndbRef.sendPollNdb(aMillisecondNumber);
+ }
}
=== modified file 'java/com/mysql/cluster/ndbj/NdbScanOperation.java'
--- a/java/com/mysql/cluster/ndbj/NdbScanOperation.java 2007-05-21 00:08:02 +0000
+++ b/java/com/mysql/cluster/ndbj/NdbScanOperation.java 2007-07-03 00:21:23 +0000
@@ -92,6 +92,7 @@
* Use 0 to specify the maximum automatically.
* @throws NdbApiException if the object has been closed or if readTuples failed
*/
+ public abstract void readTuples(LockMode mode) throws NdbApiException;
public abstract void readTuples(LockMode mode, long parallel,
long batch) throws NdbApiException;
=== modified file 'java/com/mysql/cluster/ndbj/NdbScanOperationImpl.java'
--- a/java/com/mysql/cluster/ndbj/NdbScanOperationImpl.java 2007-05-21 00:08:02 +0000
+++ b/java/com/mysql/cluster/ndbj/NdbScanOperationImpl.java 2007-07-03 00:21:23 +0000
@@ -1,6 +1,7 @@
package com.mysql.cluster.ndbj;
import com.mysql.cluster.errors.*;
+import com.mysql.cluster.ndbapi.NdbOperation.LockMode;
import java.util.ArrayList;
@@ -270,6 +271,16 @@
}
+ /**
+ * @param lock_mode
+ * @return
+ * @throws NdbApiException
+ * @see
com.mysql.cluster.ndbapi.NdbScanOperation#readTuples(com.mysql.cluster.ndbapi.NdbOperation.LockMode)
+ */
+ public void readTuples(LockMode lock_mode) throws NdbApiException {
+ scanOpRef.readTuples(com.mysql.cluster.ndbapi.NdbScanOperation.LockMode.swigToEnum(lock_mode.type));
+ }
+
=== modified file 'java/com/mysql/cluster/ndbj/NdbTransaction.java'
--- a/java/com/mysql/cluster/ndbj/NdbTransaction.java 2007-06-29 15:17:26 +0000
+++ b/java/com/mysql/cluster/ndbj/NdbTransaction.java 2007-07-03 00:21:23 +0000
@@ -1,6 +1,8 @@
package com.mysql.cluster.ndbj;
import com.mysql.cluster.errors.*;
+import com.mysql.cluster.ndbapi.BaseCallback;
+
import java.math.BigInteger;
/**
@@ -207,7 +209,12 @@
* @throws IllegalStateException if the close method has already been called on the
object.
*/
public abstract void execute(ExecType execType, AbortOption abortOption, boolean force)
throws NdbApiException;
+ public abstract void execute(ExecType execType, AbortOption abortOption) throws
NdbApiException;
+ public abstract void execute(ExecType execType) throws NdbApiException;
+ public void executeAsynchPrepare(ExecType execType, BaseCallback cb,
+ AbortOption abortOption);
+ public void executeAsynchPrepare(ExecType execType, BaseCallback cb);
/**
* Creates a NdbOperation (primary key operation) and adds it to the transaction.
* @param tableName name of the schema
=== modified file 'java/com/mysql/cluster/ndbj/NdbTransactionImpl.java'
--- a/java/com/mysql/cluster/ndbj/NdbTransactionImpl.java 2007-06-29 15:17:26 +0000
+++ b/java/com/mysql/cluster/ndbj/NdbTransactionImpl.java 2007-07-03 00:21:23 +0000
@@ -299,9 +299,9 @@
* @see
com.mysql.cluster.ndbapi.NdbTransaction#execute(com.mysql.cluster.ndbapi.ExecType,
* com.mysql.cluster.ndbapi.AbortOption)
*/
- public int execute(ExecType execType, AbortOption abortOption)
+ public void execute(ExecType execType, AbortOption abortOption)
throws NdbApiException {
- return transRef.execute(
+ transRef.execute(
com.mysql.cluster.ndbapi.ExecType.swigToEnum(execType.getEnumValue()),
com.mysql.cluster.ndbapi.AbortOption.swigToEnum(abortOption.getEnumValue())
);
@@ -313,8 +313,8 @@
* @throws NdbApiException
* @see
com.mysql.cluster.ndbapi.NdbTransaction#execute(com.mysql.cluster.ndbapi.ExecType)
*/
- public int execute(ExecType execType) throws NdbApiException {
- return transRef.execute(
+ public void execute(ExecType execType) throws NdbApiException {
+ transRef.execute(
com.mysql.cluster.ndbapi.ExecType.swigToEnum(execType.getEnumValue()));
}
| Thread |
|---|
| • Rev 136: Added support to NDB/J layer for async transactions. in http://bazaar.launchpad.net/~ndb-connectors/ndb-connectors/ndbjmerge | Monty Taylor | 3 Jul |