List:NDB Connectors« Previous MessageNext Message »
From:Monty Taylor Date:July 3 2007 12:25am
Subject:Rev 136: Added support to NDB/J layer for async transactions. in http://bazaar.launchpad.net/~ndb-connectors/ndb-connectors/ndbjmerge
View as plain text  
------------------------------------------------------------
revno: 136
revision-id: mtaylor@stripped
parent: mtaylor@stripped
committer: Monty Taylor <mtaylor@stripped>
branch nick: ndbjmerge
timestamp: Mon 2007-07-02 17:21:23 -0700
message:
  Added support to NDB/J layer for async transactions. 
  Added ndbj.examples.TestAsync.java and ndbj.examples.TestBaseCallback.java.
  The code was already there to do the async from ndb/j in the Impl classes, but hadn't
been added to the public interfaces. 
added:
  java/com/mysql/cluster/ndbj/examples/TestAsync.java
testasync.java-20070703002120-jsnz1ul38pp1h3w3-1
  java/com/mysql/cluster/ndbj/examples/TestBaseCallback.java
testbasecallback.jav-20070703002120-jsnz1ul38pp1h3w3-2
modified:
  java/com/mysql/cluster/ndbj/Ndb.java ndb.java-20070517181935-98huwjarzuh25b30-2
  java/com/mysql/cluster/ndbj/NdbImpl.java ndbimpl.java-20070517181935-98huwjarzuh25b30-14
  java/com/mysql/cluster/ndbj/NdbScanOperation.java
ndbscanoperation.jav-20070517181935-98huwjarzuh25b30-25
  java/com/mysql/cluster/ndbj/NdbScanOperationImpl.java
ndbscanoperationimpl-20070517181935-98huwjarzuh25b30-26
  java/com/mysql/cluster/ndbj/NdbTransaction.java
ndbtransaction.java-20070517181935-98huwjarzuh25b30-27
  java/com/mysql/cluster/ndbj/NdbTransactionImpl.java
ndbtransactionimpl.j-20070517181935-98huwjarzuh25b30-28
=== added file 'java/com/mysql/cluster/ndbj/examples/TestAsync.java'
--- a/java/com/mysql/cluster/ndbj/examples/TestAsync.java	1970-01-01 00:00:00 +0000
+++ b/java/com/mysql/cluster/ndbj/examples/TestAsync.java	2007-07-03 00:21:23 +0000
@@ -0,0 +1,273 @@
+package com.mysql.cluster.ndbj.examples;
+
+import java.sql.*;
+
+import java.util.Date;
+import java.util.ArrayList;
+import com.mysql.cluster.ndbj.*;
+import com.mysql.cluster.errors.*;
+
+
+public class TestAsync { 
+
+  private static Date beginTime;
+  private static Date initEndTime;
+  private static Date endTime;
+  private static long minTime = 10;
+  private static long maxTime = 0;
+
+  private static int num_iter = 0; 
+  private static int INSERT_NUM = 0; 
+
+  static { 
+    System.loadLibrary("ndbj");
+  }
+
+
+  public static void main(String argv[]) throws SQLException{ 
+
+    if (argv.length<2) { 
+      System.out.println("Usage:\n\tjava test NUM_OF_ITERATIONS NUM_OF_ROWS "); 
+      System.exit(1);
+    }
+
+    num_iter = Integer.parseInt(argv[0]);
+    INSERT_NUM = Integer.parseInt(argv[1]);
+    int BATCH_SIZE=100;
+
+    //ndbapi.ndb_init();
+
+    /**************************************************************
+     * Connect to mysql server and create table                   *
+     **************************************************************/
+    
+    String table_name = "mytablename";
+    try { 
+      Class.forName("com.mysql.jdbc.Driver");
+    } catch (ClassNotFoundException e) { 
+      System.out.println("MySQL JDBC Driver not found"); 
+    }
+
+    Connection conn =
DriverManager.getConnection("jdbc:mysql://localhost/test?user=root");
+
+    System.out.println("Dropping and recreating schema");
+
+    Statement s = conn.createStatement(); 
+
+    
+    try { 
+      s.executeUpdate("DROP TABLE if exists " + table_name);
+    } catch (SQLException e) { 
+    }
+    try { 
+      s.executeUpdate("CREATE TABLE if not exists " +
+		      table_name + 
+		      "    (ATTR1 INT UNSIGNED," +
+		      "     ATTR2 INT UNSIGNED NOT NULL," +
+		      "     PRIMARY KEY (ATTR1) )" + 
+		      "  ENGINE=NDBCLUSTER");
+    } catch (SQLException e) { 
+    }
+    
+
+    /**************************************************************
+     * Connect to ndb cluster                                     *
+     **************************************************************/
+
+    System.out.println("connecting to cluster");
+
+  try { 
+    NdbClusterConnection connection = NdbFactory.createNdbClusterConnection();
+
+    try {
+    if (connection.connect(5,3,true)==-1) { 
+      System.out.println("Connect to cluster management server failed.");
+      System.exit(-1);
+    }
+    } catch (Exception e) { 
+      System.out.println(e.getMessage());
+      System.exit(-1);
+    }
+      
+
+    
+    try { 
+       connection.waitUntilReady(30,30); 
+    } catch (Exception e) { 
+        System.out.println("Cluster was not ready within 30 secs.");
+    	System.exit(-1);
+    }
+
+    Ndb myNdb = connection.createNdb( "test" ) ;
+
+    myNdb.init(4); 
+    
+    System.out.println("running tests:");
+
+    /**************************************************************************
+     * Fill table with tuples, using auto_increment IDs                       *
+     **************************************************************************/
+
+    beginTime = new Date(System.currentTimeMillis());
+
+
+    for(int t=0;t<Math.ceil(INSERT_NUM/BATCH_SIZE);t++) { 
+
+      NdbTransaction myTransaction = myNdb.startTransaction();
+
+      int val = ((t+1)*BATCH_SIZE)-INSERT_NUM;
+      int offset = 0;
+      if ( val > 0 ) {
+	offset = val; 
+      }
+
+      for(int i=0;i<BATCH_SIZE-offset;i++) { 
+
+    	  NdbOperation myOperation = myTransaction.getNdbOperation(table_name);
+    	  if (myOperation == null) { 
+    		  System.out.println(myTransaction.getNdbError().getMessage());
+    		  System.exit(1);
+    	  }
+
+    	  myOperation.insertTuple();
+    	  java.math.BigInteger auto_id = myNdb.getAutoIncrementValue(table_name,BATCH_SIZE);
+
+    	  myOperation.equal("ATTR1",auto_id);
+    	  myOperation.setLong("ATTR2", t*BATCH_SIZE+i);
+
+      }      
+
+      myTransaction.execute( NdbTransaction.ExecType.Commit,
NdbTransaction.AbortOption.AbortOnError, true ); 
+
+
+
+      myTransaction.close();
+
+
+    }
+    endTime = new Date(System.currentTimeMillis());
+    System.out.println("Insert time for " + INSERT_NUM + ":");
+    System.out.println("   " + (endTime.getTime() - beginTime.getTime()) + "ms"); 
+
+
+    /*********************************
+     * Get list of ids
+     *********************************/
+
+    System.out.println();
+    System.out.println("Getting list of ids");
+
+    NdbTransaction myTransaction = myNdb.startTransaction();
+
+    NdbScanOperation myScanOperation=myTransaction.getNdbScanOperation(table_name);
+
+    if (myScanOperation == null) { 
+      System.out.println(myTransaction.getNdbError().getMessage());
+    }
+    
+    
+    myScanOperation.readTuples(NdbOperation.LockMode.LM_CommittedRead);
+    
+    
+    myScanOperation.getValue("ATTR1");
+    
+
+	NdbResultSet rs = myScanOperation.resultData();
+    myTransaction.execute(NdbTransaction.ExecType.NoCommit);
+
+    ArrayList<Integer> ids = new ArrayList<Integer>();
+
+    while (true) { 
+      
+      if (myScanOperation.nextResult(true) != 0) { 
+	break;
+      }
+      
+      int random_id = rs.getInt("ATTR1");
+      ids.add(random_id);
+    }
+      
+    myTransaction.close();
+
+    /*******************
+     * Test NDB API Speed
+     *******************/
+    
+    System.out.println("Testing NDBAPI speed");
+    long foo = 0;
+
+    beginTime = new Date(System.currentTimeMillis());
+    for(int x=0;x<num_iter;x++){ 
+      
+      int rand_id = (int) (Math.random() * ids.size() ) ; 
+      int id_num = (int)ids.get(rand_id);
+
+      NdbTransaction myTrans = myNdb.startTransaction(); //(table_name,id_num);
+      
+      
+      NdbOperation myOper = myTrans.getNdbOperation(table_name);
+      myOper.readTuple(NdbOperation.LockMode.LM_Read);
+      
+      myOper.equal("ATTR1",id_num);
+      
+      myOper.getValue("ATTR2"); 
+      
+      
+      rs = myOper.resultData();
+      
+      myTrans.execute(NdbTransaction.ExecType.Commit); 
+
+
+      foo=rs.getInt("ATTR2");
+      myTrans.close();
+    }
+    endTime = new Date(System.currentTimeMillis());
+    System.out.println("NDBAPI Execution time for " + num_iter + ": ");
+    System.out.println("   "+(endTime.getTime() - beginTime.getTime())+"ms");
+    s.close();
+    
+
+    System.out.println("Testing NDBAPI async speed");
+
+    
+    ArrayList<TestBaseCallback> cbs = new ArrayList<TestBaseCallback>();
+
+    beginTime = new Date(System.currentTimeMillis());
+    for(int x=0;x<num_iter;x++){ 
+      
+      int rand_id = (int) (Math.random() * ids.size() ) ; 
+      int id_num = (int)ids.get(rand_id);
+
+      NdbTransaction myTrans = myNdb.startTransaction(); //table_name,id_num);
+      
+      
+      NdbOperation myOper = myTrans.getNdbOperation(table_name);
+      myOper.readTuple(NdbOperation.LockMode.LM_Read);
+      
+      myOper.equal("ATTR1",id_num);
+
+    
+      myOper.getValue("ATTR2"); 
+
+      rs = myOper.resultData();
+      TestBaseCallback cb = new TestBaseCallback(myNdb,rs);
+      
+      myTrans.executeAsynchPrepare(NdbTransaction.ExecType.Commit, cb );
+
+//      foo=myRecAttr.int32_value();
+//      myNdb.closeTransaction(myTrans);
+    }
+
+    myNdb.sendPollNdb(5000);
+    endTime = new Date(System.currentTimeMillis());
+    System.out.println("NDBAPI Execution time for " + num_iter + ": ");
+    System.out.println("   "+(endTime.getTime() - beginTime.getTime())+"ms");
+    s.close();
+    
+  } catch ( NdbApiException e ) { 
+    e.printStackTrace();
+    System.exit(-1);
+  }
+
+  }
+}

=== added file 'java/com/mysql/cluster/ndbj/examples/TestBaseCallback.java'
--- a/java/com/mysql/cluster/ndbj/examples/TestBaseCallback.java	1970-01-01 00:00:00 +0000
+++ b/java/com/mysql/cluster/ndbj/examples/TestBaseCallback.java	2007-07-03 00:21:23 +0000
@@ -0,0 +1,24 @@
+package com.mysql.cluster.ndbj.examples;
+
+import com.mysql.cluster.ndbj.*;
+import com.mysql.cluster.errors.*;
+
+class TestBaseCallback extends com.mysql.cluster.ndbapi.BaseCallback { 
+
+  NdbResultSet myRs; 
+  Ndb myNdb; 
+
+  public TestBaseCallback(Ndb theNdb, NdbResultSet theRs) { 
+    this.myRs=theRs;
+    this.myNdb=theNdb; 
+  }
+
+  public void callback(int result, NdbTransaction myTrans) { 
+    System.out.println("result " + result);
+    try { 
+    	System.out.println("value " + this.myRs.getInt("ATTR2"));
+    } catch (NdbApiException e) { 
+    	System.out.println("Got an exception in the callback: " + e.getMessage());
+    }
+  };
+}

=== modified file 'java/com/mysql/cluster/ndbj/Ndb.java'
--- a/java/com/mysql/cluster/ndbj/Ndb.java	2007-05-20 09:20:25 +0000
+++ b/java/com/mysql/cluster/ndbj/Ndb.java	2007-07-03 00:21:23 +0000
@@ -1,5 +1,7 @@
 package com.mysql.cluster.ndbj;
 
+import java.math.BigInteger;
+
 import com.mysql.cluster.errors.*;
 /**
 *
@@ -107,5 +109,8 @@
      * ndb object is used.
      */
     public abstract void close();
+    
+    public BigInteger getAutoIncrementValue(String aTableName, long cacheSize) throws
NdbApiException;
 
+    public abstract int sendPollNdb(int aMillisecondNumber) throws NdbApiException;
 }
\ No newline at end of file

=== modified file 'java/com/mysql/cluster/ndbj/NdbImpl.java'
--- a/java/com/mysql/cluster/ndbj/NdbImpl.java	2007-06-25 18:38:46 +0000
+++ b/java/com/mysql/cluster/ndbj/NdbImpl.java	2007-07-03 00:21:23 +0000
@@ -1,5 +1,6 @@
 package com.mysql.cluster.ndbj;
 
+import java.math.BigInteger;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -302,5 +303,58 @@
 	public int pollEvents(int aMillisecondNumber) throws NdbApiException {
 		return ndbRef.pollEvents(aMillisecondNumber);
 	}
+
+	/**
+	 * @param aTableName
+	 * @param cacheSize
+	 * @return
+	 * @throws NdbApiException
+	 * @see com.mysql.cluster.ndbapi.Ndb#getAutoIncrementValue(java.lang.String, long)
+	 */
+	public BigInteger getAutoIncrementValue(String aTableName, long cacheSize) throws
NdbApiException {
+		return ndbRef.getAutoIncrementValue(aTableName, cacheSize);
+	}
+
+	/**
+	 * @return
+	 * @throws NdbApiException
+	 * @see com.mysql.cluster.ndbapi.Ndb#sendPollNdb()
+	 */
+	public int sendPollNdb() throws NdbApiException {
+		return ndbRef.sendPollNdb();
+	}
+
+	/**
+	 * @param aMillisecondNumber
+	 * @param minNoOfEventsToWakeup
+	 * @param forceSend
+	 * @return
+	 * @throws NdbApiException
+	 * @see com.mysql.cluster.ndbapi.Ndb#sendPollNdb(int, int, int)
+	 */
+	public int sendPollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup, int forceSend)
throws NdbApiException {
+		return ndbRef.sendPollNdb(aMillisecondNumber, minNoOfEventsToWakeup, forceSend);
+	}
+
+	/**
+	 * @param aMillisecondNumber
+	 * @param minNoOfEventsToWakeup
+	 * @return
+	 * @throws NdbApiException
+	 * @see com.mysql.cluster.ndbapi.Ndb#sendPollNdb(int, int)
+	 */
+	public int sendPollNdb(int aMillisecondNumber, int minNoOfEventsToWakeup) throws
NdbApiException {
+		return ndbRef.sendPollNdb(aMillisecondNumber, minNoOfEventsToWakeup);
+	}
+
+	/**
+	 * @param aMillisecondNumber
+	 * @return
+	 * @throws NdbApiException
+	 * @see com.mysql.cluster.ndbapi.Ndb#sendPollNdb(int)
+	 */
+	public int sendPollNdb(int aMillisecondNumber) throws NdbApiException {
+		return ndbRef.sendPollNdb(aMillisecondNumber);
+	}
 	
 }

=== modified file 'java/com/mysql/cluster/ndbj/NdbScanOperation.java'
--- a/java/com/mysql/cluster/ndbj/NdbScanOperation.java	2007-05-21 00:08:02 +0000
+++ b/java/com/mysql/cluster/ndbj/NdbScanOperation.java	2007-07-03 00:21:23 +0000
@@ -92,6 +92,7 @@
      * Use 0 to specify the maximum automatically.
      * @throws NdbApiException if the object has been closed or if readTuples failed
      */
+	public abstract void readTuples(LockMode mode) throws NdbApiException; 
     public abstract void readTuples(LockMode mode, long parallel,
             long batch) throws NdbApiException;
     

=== modified file 'java/com/mysql/cluster/ndbj/NdbScanOperationImpl.java'
--- a/java/com/mysql/cluster/ndbj/NdbScanOperationImpl.java	2007-05-21 00:08:02 +0000
+++ b/java/com/mysql/cluster/ndbj/NdbScanOperationImpl.java	2007-07-03 00:21:23 +0000
@@ -1,6 +1,7 @@
 package com.mysql.cluster.ndbj;
 
 import com.mysql.cluster.errors.*;
+import com.mysql.cluster.ndbapi.NdbOperation.LockMode;
 
 import java.util.ArrayList;
 
@@ -270,6 +271,16 @@
 
 	}
 
+	/**
+	 * @param lock_mode
+	 * @return
+	 * @throws NdbApiException
+	 * @see
com.mysql.cluster.ndbapi.NdbScanOperation#readTuples(com.mysql.cluster.ndbapi.NdbOperation.LockMode)
+	 */
+	public void readTuples(LockMode lock_mode) throws NdbApiException {
+		scanOpRef.readTuples(com.mysql.cluster.ndbapi.NdbScanOperation.LockMode.swigToEnum(lock_mode.type));
+	}
+
 
 
 

=== modified file 'java/com/mysql/cluster/ndbj/NdbTransaction.java'
--- a/java/com/mysql/cluster/ndbj/NdbTransaction.java	2007-06-29 15:17:26 +0000
+++ b/java/com/mysql/cluster/ndbj/NdbTransaction.java	2007-07-03 00:21:23 +0000
@@ -1,6 +1,8 @@
 package com.mysql.cluster.ndbj;
 
 import com.mysql.cluster.errors.*;
+import com.mysql.cluster.ndbapi.BaseCallback;
+
 import java.math.BigInteger;
 
 /**
@@ -207,7 +209,12 @@
 	 * @throws IllegalStateException if the close method has already been called on the
object.
 	 */
 	public abstract void execute(ExecType execType, AbortOption abortOption, boolean force)
throws NdbApiException;
+	public abstract void execute(ExecType execType, AbortOption abortOption) throws
NdbApiException;
+	public abstract void execute(ExecType execType) throws NdbApiException;
 
+	public void executeAsynchPrepare(ExecType execType, BaseCallback cb,
+			AbortOption abortOption);
+	public void executeAsynchPrepare(ExecType execType, BaseCallback cb);
 	/**
 	 * Creates a NdbOperation (primary key operation) and adds it to the transaction.
 	 * @param tableName name of the schema

=== modified file 'java/com/mysql/cluster/ndbj/NdbTransactionImpl.java'
--- a/java/com/mysql/cluster/ndbj/NdbTransactionImpl.java	2007-06-29 15:17:26 +0000
+++ b/java/com/mysql/cluster/ndbj/NdbTransactionImpl.java	2007-07-03 00:21:23 +0000
@@ -299,9 +299,9 @@
 	 * @see
com.mysql.cluster.ndbapi.NdbTransaction#execute(com.mysql.cluster.ndbapi.ExecType,
 	 *      com.mysql.cluster.ndbapi.AbortOption)
 	 */
-	public int execute(ExecType execType, AbortOption abortOption)
+	public void execute(ExecType execType, AbortOption abortOption)
 			throws NdbApiException {
-		return transRef.execute(
+		transRef.execute(
 				com.mysql.cluster.ndbapi.ExecType.swigToEnum(execType.getEnumValue()), 
 				com.mysql.cluster.ndbapi.AbortOption.swigToEnum(abortOption.getEnumValue())
 				);
@@ -313,8 +313,8 @@
 	 * @throws NdbApiException
 	 * @see
com.mysql.cluster.ndbapi.NdbTransaction#execute(com.mysql.cluster.ndbapi.ExecType)
 	 */
-	public int execute(ExecType execType) throws NdbApiException {
-		return transRef.execute(
+	public void execute(ExecType execType) throws NdbApiException {
+		transRef.execute(
 				com.mysql.cluster.ndbapi.ExecType.swigToEnum(execType.getEnumValue())); 
 				
 	}

Thread
Rev 136: Added support to NDB/J layer for async transactions. in http://bazaar.launchpad.net/~ndb-connectors/ndb-connectors/ndbjmergeMonty Taylor3 Jul