List:Commits« Previous MessageNext Message »
From:Craig L Russell Date:December 16 2010 5:03pm
Subject:bzr commit into mysql-5.1-telco-7.1 branch (Craig.Russell:4025)
View as plain text  
#At file:///Users/clr/ndb/bzr-repo/mysql-5.1-telco-7.1-coord/ based on revid:craig.russell@stripped

 4025 Craig L Russell	2010-12-16
      Add coordinated transaction support to clusterj

    added:
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CoordinatedTransactionVisibilityTest.java
      storage/ndb/clusterj/clusterj-tie/src/test/java/com/mysql/clusterj/tie/CoordinatedTransactionVisibilityTest.java
    modified:
      storage/ndb/clusterj/clusterj-api/pom.xml
      storage/ndb/clusterj/clusterj-bindings/pom.xml
      storage/ndb/clusterj/clusterj-core/pom.xml
      storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java
      storage/ndb/clusterj/clusterj-jpatest/pom.xml
      storage/ndb/clusterj/clusterj-openjpa/pom.xml
      storage/ndb/clusterj/clusterj-test/pom.xml
      storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CoordinatedTransactionIdVariableTest.java
      storage/ndb/clusterj/clusterj-tie/pom.xml
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java
      storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java
      storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/CoordinatedTransactionIdVariableTest.java
      storage/ndb/clusterj/pom.xml
=== modified file 'storage/ndb/clusterj/clusterj-api/pom.xml'
--- a/storage/ndb/clusterj/clusterj-api/pom.xml	2010-12-14 18:31:20 +0000
+++ b/storage/ndb/clusterj/clusterj-api/pom.xml	2010-12-16 17:03:32 +0000
@@ -22,7 +22,7 @@
   <groupId>com.mysql.clusterj</groupId>
   <artifactId>clusterj-api</artifactId>
   <packaging>bundle</packaging>
-  <version>7.1.10-SNAPSHOT</version>
+  <version>7.1.10-coord-SNAPSHOT</version>
   <name>ClusterJ API</name>
   <description>The API for ClusterJ</description>
   <build>

=== modified file 'storage/ndb/clusterj/clusterj-bindings/pom.xml'
--- a/storage/ndb/clusterj/clusterj-bindings/pom.xml	2010-12-14 18:31:20 +0000
+++ b/storage/ndb/clusterj/clusterj-bindings/pom.xml	2010-12-16 17:03:32 +0000
@@ -22,7 +22,7 @@
   <groupId>com.mysql.clusterj</groupId>
   <artifactId>clusterj-bindings</artifactId>
   <packaging>bundle</packaging>
-  <version>7.1.10-SNAPSHOT</version>
+  <version>7.1.10-coord-SNAPSHOT</version>
   <name>ClusterJ Bindings</name>
   <description>The ndb-bindings implementation of ClusterJ storage spi</description>
   <build>
@@ -113,13 +113,13 @@
     <dependency>
       <groupId>com.mysql.clusterj</groupId>
       <artifactId>clusterj-api</artifactId>
-      <version>7.1.10-SNAPSHOT</version>
+      <version>7.1.10-coord-SNAPSHOT</version>
       <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>com.mysql.clusterj</groupId>
       <artifactId>clusterj-core</artifactId>
-      <version>7.1.10-SNAPSHOT</version>
+      <version>7.1.10-coord-SNAPSHOT</version>
       <scope>compile</scope>
     </dependency>
     <dependency>
@@ -149,7 +149,7 @@
     <dependency>
     	<groupId>com.mysql.clusterj</groupId>
     	<artifactId>clusterj-test</artifactId>
-        <version>7.1.10-SNAPSHOT</version>
+        <version>7.1.10-coord-SNAPSHOT</version>
     	<scope>test</scope>
     </dependency>
   </dependencies>   

=== modified file 'storage/ndb/clusterj/clusterj-core/pom.xml'
--- a/storage/ndb/clusterj/clusterj-core/pom.xml	2010-12-14 18:31:20 +0000
+++ b/storage/ndb/clusterj/clusterj-core/pom.xml	2010-12-16 17:03:32 +0000
@@ -22,7 +22,7 @@
   <groupId>com.mysql.clusterj</groupId>
   <artifactId>clusterj-core</artifactId>
   <packaging>bundle</packaging>
-  <version>7.1.10-SNAPSHOT</version>
+  <version>7.1.10-coord-SNAPSHOT</version>
   <name>ClusterJ Core</name>
   <description>The core implementation of ClusterJ</description>
   <build>
@@ -89,7 +89,7 @@
     <dependency>
       <groupId>com.mysql.clusterj</groupId>
       <artifactId>clusterj-api</artifactId>
-      <version>7.1.10-SNAPSHOT</version>
+      <version>7.1.10-coord-SNAPSHOT</version>
       <scope>compile</scope>
     </dependency>
     <dependency>

=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java	2010-10-28 09:50:56 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java	2010-12-16 17:03:32 +0000
@@ -639,6 +639,7 @@ public class SessionImpl implements Sess
             clusterTransaction.close();
             clusterTransaction = null;
             partitionKey = null;
+            joinTransactionId = null;
         }
     }
 
@@ -668,6 +669,7 @@ public class SessionImpl implements Sess
             }
             clusterTransaction = null;
             partitionKey = null;
+            joinTransactionId = null;
         }
     }
 
@@ -1054,16 +1056,17 @@ public class SessionImpl implements Sess
     }
 
     public void flush() {
-        if (logger.isDetailEnabled()) logger.detail("flush changes with changeList: " + changeList);
-        if (changeList.isEmpty()) {
-            return;
-        }
-        for (StateManager sm: changeList) {
-            sm.flush(this);
+        if (logger.isDetailEnabled()) logger.detail("flush changes with changeList size: " + changeList.size());
+        if (!changeList.isEmpty()) {
+            for (StateManager sm: changeList) {
+                sm.flush(this);
+            }
+            changeList.clear();
         }
         // now flush changes to the back end
-        clusterTransaction.executeNoCommit();
-        changeList.clear();
+        if (clusterTransaction != null) {
+            clusterTransaction.executeNoCommit();
+        }
     }
 
     public List getChangeList() {
@@ -1128,7 +1131,7 @@ public class SessionImpl implements Sess
      * @return the coordinatedTransactionId
      */
     public String getCoordinatedTransactionId() {
-        return clusterTransaction.getCoordinatedTransactionId();
+        return clusterTransaction==null?null:clusterTransaction.getCoordinatedTransactionId();
     }
 
     /** Set the coordinatedTransactionId for the next transaction. This
@@ -1136,7 +1139,11 @@ public class SessionImpl implements Sess
      * @param coordinatedTransactionId the coordinatedTransactionId
      */
     public void setCoordinatedTransactionId(String coordinatedTransactionId) {
-        clusterTransaction.setCoordinatedTransactionId(coordinatedTransactionId);
+        if (clusterTransaction != null) {
+            throw new ClusterJUserException(
+                    local.message("ERR_Cannot_Set_Join_Transaction_Id_After_Transaction_Begin"));
+        }
+        joinTransactionId = coordinatedTransactionId;
     }
 
     /** Set the lock mode for subsequent operations. The lock mode takes effect immediately

=== modified file 'storage/ndb/clusterj/clusterj-jpatest/pom.xml'
--- a/storage/ndb/clusterj/clusterj-jpatest/pom.xml	2010-12-14 18:31:20 +0000
+++ b/storage/ndb/clusterj/clusterj-jpatest/pom.xml	2010-12-16 17:03:32 +0000
@@ -29,7 +29,7 @@
     <modelVersion>4.0.0</modelVersion>
     <groupId>com.mysql.clusterj</groupId>
     <artifactId>clusterj-jpatest</artifactId>
-    <version>7.1.10-SNAPSHOT</version>
+    <version>7.1.10-coord-SNAPSHOT</version>
     <packaging>jar</packaging>
     <name>ClusterJ JPA Integration Tests</name>
 

=== modified file 'storage/ndb/clusterj/clusterj-openjpa/pom.xml'
--- a/storage/ndb/clusterj/clusterj-openjpa/pom.xml	2010-12-14 18:31:20 +0000
+++ b/storage/ndb/clusterj/clusterj-openjpa/pom.xml	2010-12-16 17:03:32 +0000
@@ -25,7 +25,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.mysql.clusterj</groupId>
   <artifactId>clusterj-openjpa</artifactId>
-  <version>7.1.10-SNAPSHOT</version>
+  <version>7.1.10-coord-SNAPSHOT</version>
   <packaging>bundle</packaging>
   <name>ClusterJ OpenJPA Integration</name>
 
@@ -122,24 +122,24 @@
     <dependency>
       <groupId>com.mysql.clusterj</groupId>
       <artifactId>clusterj-jpatest</artifactId>
-      <version>7.1.10-SNAPSHOT</version>
+      <version>7.1.10-coord-SNAPSHOT</version>
     </dependency>
     <dependency>
       <groupId>com.mysql.clusterj</groupId>
       <artifactId>clusterj-api</artifactId>
-      <version>7.1.10-SNAPSHOT</version>
+      <version>7.1.10-coord-SNAPSHOT</version>
       <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>com.mysql.clusterj</groupId>
       <artifactId>clusterj-core</artifactId>
-      <version>7.1.10-SNAPSHOT</version>
+      <version>7.1.10-coord-SNAPSHOT</version>
       <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>com.mysql.clusterj</groupId>
       <artifactId>clusterj-tie</artifactId>
-      <version>7.1.10-SNAPSHOT</version>
+      <version>7.1.10-coord-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
     <dependency>

=== modified file 'storage/ndb/clusterj/clusterj-test/pom.xml'
--- a/storage/ndb/clusterj/clusterj-test/pom.xml	2010-12-14 18:31:20 +0000
+++ b/storage/ndb/clusterj/clusterj-test/pom.xml	2010-12-16 17:03:32 +0000
@@ -22,7 +22,7 @@
   <groupId>com.mysql.clusterj</groupId>
   <artifactId>clusterj-test</artifactId>
   <packaging>jar</packaging>
-  <version>7.1.10-SNAPSHOT</version>
+  <version>7.1.10-coord-SNAPSHOT</version>
   <name>ClusterJ Test Suite</name>
   <build>
     <plugins>
@@ -91,13 +91,13 @@
     <dependency>
       <groupId>com.mysql.clusterj</groupId>
       <artifactId>clusterj-api</artifactId>
-      <version>7.1.10-SNAPSHOT</version>
+      <version>7.1.10-coord-SNAPSHOT</version>
       <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>com.mysql.clusterj</groupId>
       <artifactId>clusterj-core</artifactId>
-      <version>7.1.10-SNAPSHOT</version>
+      <version>7.1.10-coord-SNAPSHOT</version>
     </dependency>
     <dependency>
       <groupId>junit</groupId>

=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CoordinatedTransactionIdVariableTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CoordinatedTransactionIdVariableTest.java	2010-09-14 09:43:52 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CoordinatedTransactionIdVariableTest.java	2010-12-16 17:03:32 +0000
@@ -21,20 +21,20 @@ package testsuite.clusterj;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-
-import org.junit.Ignore;
+import java.sql.Types;
 
 /** Test that mysql session variable ndb_coordinated_transaction_id can be
  * read and written by jdbc.
  */
-@Ignore
 public class CoordinatedTransactionIdVariableTest extends AbstractClusterJTest {
 
     /** Format is Uint32+Uint32:Uint64 */
     private String newId = "1+1:9000000000000099";
+    private String badIdTooLong = "123456789012345678901234567890123456";
+    private String badIdTooShort = "1+1";
     private String sqlQuery = "select id from t_basic where id = 0";
-    private String getTransactionIdVariableName = "@@ndb_transaction_id";
-    private String setTransactionIdVariableName = "@@ndb_join_transaction_id";
+    private String transactionIdVariableName = "@@ndb_transaction_id";
+    private String joinTransactionIdVariableName = "@@ndb_join_transaction_id";
 
     @Override
     protected void localSetUp() {
@@ -49,42 +49,110 @@ public class CoordinatedTransactionIdVar
         return false;
     }
 
-    /** Verify that the initial value of the variable ndb_coordinated_transaction_id is null.
+    /** Verify that the initial value of the variable transaction_id is null.
      */
-    public void checkInitialValue() {
+    public void checkTransactionIdInitialValue() {
         getConnection();
-        String id = getJDBCCoordinatedTransactionId("checkInitialValue");
-        errorIfNotEqual("Coordinated transaction id must default to null.", null, id);
+        String id = getJDBCTransactionId("checkTransactionIdInitialValue");
+        errorIfNotEqual("Transaction id must default to null.", null, id);
     }
 
-    /** Try to set the ndb_coordinated_transaction_id variable to a new value
-     * and verify that it can be read back.
+    /** Verify that the initial value of the variable join_transaction_id is null.
      */
-    public void checkNewValue() {
+    public void checkJoinTransactionIdInitialValue() {
         getConnection();
-        // set the coordinated_transaction_id to some random value
-        setJDBCCoordinatedTransactionId("checkNewValue", newId);
-        String id = getJDBCCoordinatedTransactionId("checkNewValue");
+        String id = getJDBCJoinTransactionId("checkJoinTransactionIdInitialValue");
+        errorIfNotEqual("Join transaction id must default to null.", null, id);
+    }
+
+    /** Verify that you cannot set the value of the variable transaction_id.
+     */
+    public void checkSetTransactionId() {
+        getConnection();
+        setAutoCommit(connection, false);
+        // try set the transaction_id to a good value; expect error 1238 "read only variable"
+        setJDBCVariable("checkSetTransactionId", transactionIdVariableName, newId, 1238);
+        // close the connection so the value isn't accidentally used by a new transaction
+        closeConnection();
+    }
+
+    /** Try to set the join_transaction_id variable to a good value
+     * and verify that it can be read back. A null string is used to reset the value.
+     */
+    public void checkNewIdResetWithNullString() {
+        getConnection();
+        setAutoCommit(connection, false);
+        // set the coordinated_transaction_id to some value
+        setJDBCVariable("checkNewId", joinTransactionIdVariableName, newId, 0);
+        String id = getJDBCJoinTransactionId("checkNewId");
         errorIfNotEqual("failed to set coordinated transaction id.", newId, id);
-        executeJDBCQuery("checkNewValue");
+        setJDBCVariable("checkNewId", joinTransactionIdVariableName, null, 0);
+        id = getJDBCJoinTransactionId("checkNewId");
+        errorIfNotEqual("failed to set coordinated transaction id to null.", null, id);
         // close the connection so the value isn't accidentally used by a new transaction
         closeConnection();
     }
 
-    /** Verify that after an ndb transaction is started the coordinated transaction id is not null
+    /** Try to set the join_transaction_id variable to a good value
+     * and verify that it can be read back. An empty string is used to reset the value.
+     */
+    public void checkNewIdResetWithEmptyString() {
+        getConnection();
+        setAutoCommit(connection, false);
+        // set the coordinated_transaction_id to some value
+        setJDBCVariable("checkNewId", joinTransactionIdVariableName, newId, 0);
+        String id = getJDBCJoinTransactionId("checkNewId");
+        errorIfNotEqual("failed to set coordinated transaction id.", newId, id);
+        setJDBCVariable("checkNewId", joinTransactionIdVariableName, "", 0);
+        id = getJDBCJoinTransactionId("checkNewId");
+        errorIfNotEqual("failed to set coordinated transaction id to null.", null, id);
+        // close the connection so the value isn't accidentally used by a new transaction
+        closeConnection();
+    }
+
+    /** Try to set the join_transaction_id variable to a bad value
+     * and verify that an exception is thrown.
+     */
+    public void checkBadIdTooLong() {
+        getConnection();
+        setAutoCommit(connection, false);
+        // set the join_transaction_id to a bad value and expect 1210 "Incorrect arguments to SET"
+        setJDBCVariable("checkBadIdTooLong", joinTransactionIdVariableName, badIdTooLong, 1210);
+        String id = getJDBCJoinTransactionId("checkBadIdTooLong");
+        errorIfNotEqual("failed to set coordinated transaction id.", null, id);
+        // close the connection so the value isn't accidentally used by a new transaction
+        closeConnection();
+    }
+
+    /** Try to set the join_transaction_id variable to a bad value
+     * and verify that an exception is thrown.
+     */
+    public void checkBadIdTooShort() {
+        getConnection();
+        setAutoCommit(connection, false);
+        // set the join_transaction_id to a bad value and expect 1210 "Incorrect arguments to SET"
+        setJDBCVariable("checkBadIdTooShort", joinTransactionIdVariableName, badIdTooShort, 1210);
+        String id = getJDBCJoinTransactionId("checkBadIdTooShort");
+        errorIfNotEqual("failed to set coordinated transaction id.", null, id);
+        // close the connection so the value isn't accidentally used by a new transaction
+        closeConnection();
+    }
+
+    /** Verify that after an ndb transaction is started the transaction id is not null
      * and is null after commit.
      */
     public void checkIdAfterTransactionStartAndCommit() {
         getConnection();
-        // execute a query statement that will cause the server to start an ndb transaction
+        setAutoCommit(connection, false);
+       // execute a query statement that will cause the server to start an ndb transaction
         executeJDBCQuery("checkIdAfterTransactionStartAndCommit");
         // the coordinated transaction id should now be available
-        String id = getJDBCCoordinatedTransactionId("checkIdAfterTransactionStartAndCommit");
+        String id = getJDBCTransactionId("checkIdAfterTransactionStartAndCommit");
         // we can only test for not null since we cannot predict the transaction id
-        errorIfEqual("Coordinated transaction must not be null after transaction start", null, id);
+        errorIfEqual("Transaction id must not be null after transaction start.", null, id);
         commitConnection();
-        id = getJDBCCoordinatedTransactionId("checkIdAfterTransactionStartAndCommit");
-        errorIfNotEqual("Coordinated transaction id must be null after commit.", null, id);
+        id = getJDBCTransactionId("checkIdAfterTransactionStartAndCommit");
+        errorIfNotEqual("Transaction id must be null after commit.", null, id);
         }
 
     /** Verify that after an ndb transaction is started the coordinated transaction id is not null
@@ -92,15 +160,16 @@ public class CoordinatedTransactionIdVar
      */
     public void checkIdAfterTransactionStartAndRollback() {
         getConnection();
+        setAutoCommit(connection, false);
         // execute a query statement that will cause the server to start an ndb transaction
         executeJDBCQuery("checkIdAfterTransactionStartAndRollback");
         // the coordinated transaction id should now be available
-        String id = getJDBCCoordinatedTransactionId("checkIdAfterTransactionStartAndRollback");
+        String id = getJDBCTransactionId("checkIdAfterTransactionStartAndRollback");
         // we can only test for not null since we cannot predict the transaction id
-        errorIfEqual("Coordinated transaction must not be null after transaction start", null, id);
+        errorIfEqual("Transaction must not be null after transaction start.", null, id);
         rollbackConnection();
-        id = getJDBCCoordinatedTransactionId("checkIdAfterTransactionStartAndRollback");
-        errorIfNotEqual("Coordinated transaction id must be null after rollback.", null, id);
+        id = getJDBCTransactionId("checkIdAfterTransactionStartAndRollback");
+        errorIfNotEqual("Transaction id must be null after rollback.", null, id);
         }
 
     /** Execute a SQL query. Throw away the results. Keep the transaction open.
@@ -133,38 +202,60 @@ public class CoordinatedTransactionIdVar
         }
     }
 
-    /** Set the coordinated_transaction_id variable in the server.
+    /** Set the variable in the server.
+     * @param where the context
+     * @param variableName the name of the server variable
      * @param newId the id to set
      */
-    protected void setJDBCCoordinatedTransactionId(String where, String newId) {
-        if (newId == null) {
-            fail(where + " test case error: coordinated transaction id must not be null.");
-        }
+    protected void setJDBCVariable(String where, String variableName, String newId, int expectedErrorCode) {
         try {
-            String setSql = "set " + setTransactionIdVariableName + " = '" + newId + "'";
-            PreparedStatement setCoordinatedTransactionIdStatement = connection.prepareStatement(setSql);
-            boolean result = setCoordinatedTransactionIdStatement.execute();
-            errorIfNotEqual(where + " set coordinated transaction id returned true.", false, result);
+            String setSql = "set " + variableName + " = ?";
+            PreparedStatement setVariableStatement = connection.prepareStatement(setSql);
+            if (newId == null) {
+                setVariableStatement.setNull(1, Types.VARCHAR);
+            } else {
+                setVariableStatement.setString(1, newId);
+            }
+            boolean resultNotUpdateCount = setVariableStatement.execute();
+            errorIfNotEqual(where + " set join transaction id returned true.", false, resultNotUpdateCount);
+            errorIfNotEqual(where + " set join transaction id failed to throw expected exception " +
+                    expectedErrorCode + " for " + newId, expectedErrorCode, 0);
         } catch (SQLException e) {
-            error(where + " caught exception on set coordinated transaction id:", e);
+            int errorCode = e.getErrorCode();
+            errorIfNotEqual(where + " caught wrong exception on set coordinated transaction id:" +
+                    " errorCode: " + errorCode + " SQLState: " + e.getSQLState(), expectedErrorCode, errorCode);
         }
     }
 
-    /** Get the coordinated_transaction_id variable from the server.
+    /** Get the join_transaction_id variable from the server.
+     * @return the id from the server
+     */
+    protected String getJDBCJoinTransactionId(String where) {
+        String getId = "select " + joinTransactionIdVariableName;
+        String result = executeSelect(where, getId);
+        return result;
+    }
+
+    /** Get the join_transaction_id variable from the server.
      * @return the id from the server
      */
-    protected String getJDBCCoordinatedTransactionId(String where) {
-        String getId = "select " + getTransactionIdVariableName;
+    protected String getJDBCTransactionId(String where) {
+        String getId = "select " + transactionIdVariableName;
+        String result = executeSelect(where, getId);
+        return result;
+    }
+
+    private String executeSelect(String where, String getId) {
         String result = null;
         try {
             PreparedStatement getCoordinatedTransactionIdStatement = connection.prepareStatement(getId);
             ResultSet rs = getCoordinatedTransactionIdStatement.executeQuery();
             boolean hasResult = rs.next();
-            errorIfNotEqual(where + " select coordinated transaction id returned false.", true, hasResult);
+            errorIfNotEqual(where + " select " + getId + " returned false.", true, hasResult);
             result = rs.getString(1);
-            if (getDebug()) System.out.println(where + " getJDBCCoordinatedTransactionId returns " + result);
+            if (getDebug()) System.out.println(where + " " + getId + " returns " + result);
         } catch (SQLException e) {
-            error(where + " caught exception on get coordinated transaction id.", e);
+            error(where + " caught exception on select " + getId + ".", e);
         }
         return result;
     }

=== added file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CoordinatedTransactionVisibilityTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CoordinatedTransactionVisibilityTest.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CoordinatedTransactionVisibilityTest.java	2010-12-16 17:03:32 +0000
@@ -0,0 +1,1437 @@
+/*
+ *  Copyright (C) 2010 Sun Microsystems, Inc.
+ *  All rights reserved. Use is subject to license terms.
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; version 2 of the License.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+ */
+
+package testsuite.clusterj;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import testsuite.clusterj.model.AllPrimitives;
+import testsuite.clusterj.model.IdBase;
+
+import com.mysql.clusterj.Query;
+import com.mysql.clusterj.core.query.EqualPredicateImpl;
+import com.mysql.clusterj.core.spi.SessionSPI;
+import com.mysql.clusterj.query.PredicateOperand;
+import com.mysql.clusterj.query.QueryBuilder;
+import com.mysql.clusterj.query.QueryDomainType;
+
+
+/**
+ * Tests coordinated transaction methods interacting between sessions. The sessions can be
+ * any combination of: clusterj sessions, normal jdbc sessions, and clusterjdbc sessions.
+ * This test class defines interfaces that allow any of the three session types to be tested
+ * against the others. The actual tests to be run are defined in the subclasses specific
+ * to each project.
+ * 
+ * While it is possible to test the interactions of sessions that are not joined to the
+ * same transaction, these tests usually result in deadlock timeouts and are not useful
+ * for the purpose of joined transaction testing.
+ * 
+ * The following are representative tests on sessions T1 and T2. This list is not
+ * exhaustive and additional tests may be written to cover other interesting cases.
+ * 
+ * 1. Create an instance in T1. Join the same transaction in T2.
+ * Verify that you can find the new instance by primary key, unique key, index scan, and table scan in T1.
+ * Verify that you can find the new instance by primary key, unique key, index scan, and table scan in T2.
+ * 
+ * 2. Delete an instance in T1. Join the same transaction in T2.
+ * Verify that you cannot find it by primary key, unique key, index scan, and table scan in T1.
+ * Verify that you cannot find it by primary key, unique key, index scan, and table scan in T2.
+ * 
+ * 3. Update an instance in T1. Join the same transaction in T2.
+ * Verify that you can find it by primary key, unique key, index scan, and table scan in T1.
+ * Verify that you can find it by primary key, unique key, index scan, and table scan in T2.
+ *
+ * 4. 
+ */
+public class CoordinatedTransactionVisibilityTest extends AbstractClusterJModelTest{
+
+    protected static final String SET_TRANSACTION_ID_VARIABLE_NAME = "@@ndb_join_transaction_id";
+
+    protected static final String GET_TRANSACTION_ID_VARIABLE_NAME = "@@ndb_transaction_id";
+
+    /** The id (and non-id values) for the inserted row from transaction 1 */
+    private int INSERT_ID1 = 101;
+
+    /** The id (and non-id values) for the inserted row from transaction 2 */
+    private int INSERT_ID2 = 102;
+
+    /** The id for the deleted row for transaction 1 */
+    private int DELETE_ID1 = 1;
+
+    /** The id for the deleted row for transaction 2 */
+    private int DELETE_ID2 = 2;
+
+    /** The id for the updated row */
+    private int UPDATE_ID = 1;
+
+    /** The id for the row to find*/
+    private int FIND_ID = 1;
+
+    /** The id values for the updated row */
+    private int UPDATED_VALUE = 10;
+
+    /** The column names */
+    private String[] columnNames = new String[] {
+            "id", 
+            "int_not_null_hash",
+            "int_not_null_btree",
+            "int_not_null_both",
+            "int_not_null_none",
+            "int_null_hash",
+            "int_null_btree",
+            "int_null_both",
+            "int_null_none",
+
+            "byte_not_null_hash",
+            "byte_not_null_btree",
+            "byte_not_null_both",
+            "byte_not_null_none",
+            "byte_null_hash",
+            "byte_null_btree",
+            "byte_null_both",
+            "byte_null_none",
+
+            "short_not_null_hash",
+            "short_not_null_btree",
+            "short_not_null_both",
+            "short_not_null_none",
+            "short_null_hash",
+            "short_null_btree",
+            "short_null_both",
+            "short_null_none",
+
+            "long_not_null_hash",
+            "long_not_null_btree",
+            "long_not_null_both",
+            "long_not_null_none",
+            "long_null_hash",
+            "long_null_btree",
+            "long_null_both",
+            "long_null_none"
+
+    };
+
+    private int NUMBER_OF_INSTANCES = 3;
+
+    private boolean testFind = true;
+    private boolean testLookup = true;
+    private boolean testIndexScan = true;
+    private boolean testTableScan = true;
+
+    public void localSetUp() {
+        super.localSetUp();
+        session = sessionFactory.getSession();
+        createAllPrimitivesInstances(NUMBER_OF_INSTANCES);
+        session.deletePersistentAll(AllPrimitives.class);
+        session.makePersistentAll(instances);
+    }
+
+    @Override
+    public boolean getDebug() {
+        return true;
+    }
+
+    /** Subclasses must override this method to provide the name of the table for the test */
+    protected String getTableName() {
+        return "allprimitives";
+    }
+
+    public void clusterjVersusClusterj() {
+        String where = "clusterjVersusClusterj";
+        AbstractSession session1 = newClusterJSession(newSession());
+        if (getDebug()) System.out.println("session1: " + session1);
+        AbstractSession session2 = newClusterJSession(newSession());
+        if (getDebug()) System.out.println("session2: " + session2);
+        runTests(where, session1, session2);
+    }
+
+    public void jdbcVersusJdbc() {
+        String where = "jdbcVersusJdbc";
+        AbstractSession session1 = newJdbcSession(newConnection(where));
+        if (getDebug()) System.out.println("session1: " + session1);
+        AbstractSession session2 = newJdbcSession(newConnection(where));
+        if (getDebug()) System.out.println("session2: " + session2);
+        runTests(where, session1, session2);
+    }
+
+    public void clusterjVersusJdbc() {
+        String where = "clusterjVersusJdbc";
+        AbstractSession session1 = newClusterJSession(newSession());
+        if (getDebug()) System.out.println("session1: " + session1);
+        AbstractSession session2 = newJdbcSession(newConnection(where));
+        if (getDebug()) System.out.println("session2: " + session2);
+        runTests(where, session1, session2);
+    }
+
+    public void jdbcVersusClusterj() {
+        String where = "jdbcVersusClusterj";
+        AbstractSession session1 = newJdbcSession(newConnection(where));
+        if (getDebug()) System.out.println("session2: " + session1);
+        AbstractSession session2 = newClusterJSession(newSession());
+        if (getDebug()) System.out.println("session1: " + session2);
+        runTests(where, session1, session2);
+    }
+
+    protected void runTests(String where, AbstractSession session1, AbstractSession session2) {
+        findVersusFind(where, session1, session2);
+        findVersusLookup(where, session1, session2);
+        findVersusIndexScan(where, session1, session2);
+        findVersusTableScan(where, session1, session2);
+
+        insertVersusFind(where, session1, session2);
+        insertVersusLookup(where, session1, session2);
+        insertVersusIndexScan(where, session1, session2);
+        insertVersusTableScan(where, session1, session2);
+
+        insertInsertVersusFind(where, session1, session2);
+        insertInsertVersusLookup(where, session1, session2);
+        insertInsertVersusIndexScan(where, session1, session2);
+        insertInsertVersusTableScan(where, session1, session2);
+
+        deleteVersusFind(where, session1, session2);
+        deleteVersusLookup(where, session1, session2);
+        deleteVersusIndexScan(where, session1, session2);
+        deleteVersusTableScan(where, session1, session2);
+
+        deleteDeleteVersusFind(where, session1, session2);
+        deleteDeleteVersusLookup(where, session1, session2);
+        deleteDeleteVersusIndexScan(where, session1, session2);
+        deleteDeleteVersusTableScan(where, session1, session2);
+
+        // cannot run update versus find because there's no API to change primary key in clusterj
+        updateVersusLookup(where, session1, session2);
+        updateVersusIndexScan(where, session1, session2);
+        updateVersusTableScan(where, session1, session2);
+
+        indexScanVersusIndexScan(where, session1, session2);
+        indexScanVersusTableScan(where, session1, session2);
+        tableScanVersusIndexScan(where, session1, session2);
+        tableScanVersusTableScan(where, session1, session2);
+
+        indexScanVersusInsert(where, session1, session2);
+        indexScanVersusDelete(where, session1, session2);
+        indexScanVersusUpdate(where, session1, session2);
+        
+        tableScanVersusInsert(where, session1, session2);
+        tableScanVersusDelete(where, session1, session2);
+        tableScanVersusUpdate(where, session1, session2);
+        
+    }
+
+    protected void findVersusFind(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " findVersusFind";
+        try {
+            session1.begin(now + " original transaction");
+            session1.find(now + " original transaction", FIND_ID, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.find(now + " joined transaction", FIND_ID, true);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void findVersusLookup(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " findVersusLookup";
+        try {
+            session1.begin(now + " original transaction");
+            session1.find(now + " original transaction", FIND_ID, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.lookup(now + " joined transaction", FIND_ID, true);
+            session1.commit(now);
+            session2.commit(now);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void findVersusIndexScan(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " findVersusIndexScan";
+        try {
+            session1.begin(now + " original transaction");
+            session1.find(now + " original transaction", FIND_ID, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.indexScan(now + " joined transaction", FIND_ID, true);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void findVersusTableScan(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " findVersusTableScan";
+        try {
+            session1.begin(now + " original transaction");
+            session1.find(now + " original transaction", FIND_ID, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.tableScan(now + " joined transaction", FIND_ID, true);
+            session1.commit(now);
+            session2.commit(now);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void insertVersusFind(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " insertVersusFind";
+        try {
+            session1.begin(now + " original transaction");
+            session1.insert(now, INSERT_ID1);
+            session1.find(now + " original transaction", INSERT_ID1, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.find(now + " joined transaction", INSERT_ID1, true);
+            session1.commit(now);
+            session2.commit(now);
+            session1.delete(now, INSERT_ID1);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void insertVersusLookup(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " insertVersusLookup";
+        try {
+            session1.begin(now + " original transaction");
+            session1.insert(now, INSERT_ID1);
+            session1.lookup(now + " original transaction", INSERT_ID1, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.lookup(now + " joined transaction", INSERT_ID1, true);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void insertVersusIndexScan(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " insertVersusIndexScan";
+        try {
+            session1.begin(now + " original transaction");
+            session1.insert(now, INSERT_ID1);
+            session1.indexScan(now + " original transaction", INSERT_ID1, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.indexScan(now + " joined transaction", INSERT_ID1, true);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void insertVersusTableScan(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " insertVersusTableScan";
+        try {
+            session1.begin(now + " original transaction");
+            session1.insert(now, INSERT_ID1);
+            session1.tableScan(now + " original transaction", INSERT_ID1, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.tableScan(now + " joined transaction", INSERT_ID1, true);
+            session1.commit(now);
+            session2.commit(now);
+            session1.delete(now, INSERT_ID1);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void insertInsertVersusFind(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " insertInsertVersusFind";
+        try {
+            session1.begin(now + " original transaction");
+            session1.insert(now, INSERT_ID1);
+            session1.find(now + " original transaction", INSERT_ID1, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.find(now + " joined transaction", INSERT_ID1, true);
+            session2.insert(now, INSERT_ID2);
+            session2.find(now + " joined transaction", INSERT_ID2, true);
+            session1.find(now + " original transaction", INSERT_ID2, true);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void insertInsertVersusLookup(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " insertInsertVersusLookup";
+        try {
+            session1.begin(now + " original transaction");
+            session1.insert(now, INSERT_ID1);
+            session1.lookup(now + " original transaction", INSERT_ID1, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.lookup(now + " joined transaction", INSERT_ID1, true);
+            session2.insert(now, INSERT_ID2);
+            session2.lookup(now + " joined transaction", INSERT_ID2, true);
+            session1.lookup(now + " original transaction", INSERT_ID2, true);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void insertInsertVersusIndexScan(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " insertInsertVersusIndexScan";
+        try {
+            session1.begin(now + " original transaction");
+            session1.insert(now, INSERT_ID1);
+            session1.indexScan(now + " original transaction", INSERT_ID1, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.indexScan(now + " joined transaction", INSERT_ID1, true);
+            session2.insert(now, INSERT_ID2);
+            session2.indexScan(now + " joined transaction", INSERT_ID2, true);
+            session1.indexScan(now + " original transaction", INSERT_ID2, true);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void insertInsertVersusTableScan(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " insertInsertVersusTableScan";
+        try {
+            session1.begin(now + " original transaction");
+            session1.insert(now, INSERT_ID1);
+            session1.tableScan(now + " original transaction", INSERT_ID1, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.tableScan(now + " joined transaction", INSERT_ID1, true);
+            session2.insert(now, INSERT_ID2);
+            session2.tableScan(now + " joined transaction", INSERT_ID2, true);
+            session1.tableScan(now + " original transaction", INSERT_ID2, true);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void updateVersusLookup(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " updateVersusLookup";
+        try {
+            session1.begin(now + " original transaction");
+            session1.update(now, UPDATE_ID, UPDATED_VALUE);
+            session1.lookup(now + " original transaction", UPDATE_ID, false);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.lookup(now + " joined transaction", UPDATE_ID, false);
+            session1.commit(now);
+            session2.commit(now);
+            session1.update(now, UPDATE_ID, UPDATE_ID);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void updateVersusIndexScan(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " updateVersusIndexScan";
+        try {
+            session1.begin(now + " original transaction");
+            session1.update(now, UPDATE_ID, UPDATED_VALUE);
+            session1.indexScan(now + " original transaction", UPDATE_ID, false);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.indexScan(now + " joined transaction", UPDATE_ID, false);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void updateVersusTableScan(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " updateVersusTableScan";
+        try {
+            session1.begin(now + " original transaction");
+            session1.update(now, UPDATE_ID, UPDATED_VALUE);
+            session1.tableScan(now + " original transaction", UPDATE_ID, false);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.tableScan(now + " joined transaction", UPDATE_ID, false);
+            session1.commit(now);
+            session2.commit(now);
+            session1.update(now, UPDATE_ID, UPDATE_ID);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void deleteVersusFind(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " deleteVersusFind";
+        try {
+            session1.begin(now + " original transaction");
+            session1.delete(now, DELETE_ID1);
+            session1.find(now + " original transaction", DELETE_ID1, false);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.find(now + " joined transaction", DELETE_ID1, false);
+            session1.commit(now);
+            session2.commit(now);
+            session1.insert(now, DELETE_ID1);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void deleteVersusLookup(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " deleteVersusLookup";
+        try {
+            session1.begin(now + " original transaction");
+            session1.delete(now, DELETE_ID1);
+            session1.lookup(now + " original transaction", DELETE_ID1, false);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.lookup(now + " joined transaction", DELETE_ID1, false);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void deleteVersusIndexScan(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " deleteVersusIndexScan";
+        try {
+            session1.begin(now + " original transaction");
+            session1.delete(now, DELETE_ID1);
+            session1.indexScan(now + " original transaction", DELETE_ID1, false);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.indexScan(now + " joined transaction", DELETE_ID1, false);
+            session1.commit(now);
+            session2.commit(now);
+            session1.insert(now, DELETE_ID1);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void deleteVersusTableScan(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " deleteVersusTableScan";
+        try {
+            session1.begin(now + " original transaction");
+            session1.delete(now, DELETE_ID1);
+            session1.tableScan(now + " original transaction", DELETE_ID1, false);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.tableScan(now + " joined transaction", DELETE_ID1, false);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void deleteDeleteVersusFind(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " deleteDeleteVersusFind";
+        try {
+            session1.begin(now + " original transaction");
+            session1.delete(now, DELETE_ID1);
+            session1.find(now + " original transaction", DELETE_ID1, false);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.find(now + " joined transaction", DELETE_ID1, false);
+            session2.delete(now, DELETE_ID2);
+            session2.find(now + " joined transaction", DELETE_ID2, false);
+            session1.find(now + " original transaction", DELETE_ID2, false);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void deleteDeleteVersusLookup(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " deleteDeleteVersusLookup";
+        try {
+            session1.begin(now + " original transaction");
+            session1.delete(now, DELETE_ID1);
+            session1.lookup(now + " original transaction", DELETE_ID1, false);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.lookup(now + " joined transaction", DELETE_ID1, false);
+            session2.delete(now, DELETE_ID2);
+            session2.lookup(now + " joined transaction", DELETE_ID2, false);
+            session1.lookup(now + " original transaction", DELETE_ID2, false);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void deleteDeleteVersusIndexScan(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " deleteDeleteVersusIndexScan";
+        try {
+            session1.begin(now + " original transaction");
+            session1.delete(now, DELETE_ID1);
+            session1.indexScan(now + " original transaction", DELETE_ID1, false);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.indexScan(now + " joined transaction", DELETE_ID1, false);
+            session2.delete(now, DELETE_ID2);
+            session2.indexScan(now + " joined transaction", DELETE_ID2, false);
+            session1.indexScan(now + " original transaction", DELETE_ID2, false);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void deleteDeleteVersusTableScan(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " deleteDeleteVersusTableScan";
+        try {
+            session1.begin(now + " original transaction");
+            session1.delete(now, DELETE_ID1);
+            session1.tableScan(now + " original transaction", DELETE_ID1, false);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.tableScan(now + " joined transaction", DELETE_ID1, false);
+            session2.delete(now, DELETE_ID2);
+            session2.tableScan(now + " joined transaction", DELETE_ID2, false);
+            session1.tableScan(now + " original transaction", DELETE_ID2, false);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void indexScanVersusIndexScan(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " indexScanVersusIndexScan";
+        try {
+            session1.begin(now + " original transaction");
+            session1.indexScan(now + " original transaction", FIND_ID, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.indexScan(now + " joined transaction", FIND_ID, true);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void indexScanVersusTableScan(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " indexScanVersusTableScan";
+        try {
+            session1.begin(now + " original transaction");
+            session1.indexScan(now + " original transaction", FIND_ID, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.tableScan(now + " joined transaction", FIND_ID, true);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void tableScanVersusIndexScan(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " tableScanVersusIndexScan";
+        try {
+            session1.begin(now + " original transaction");
+            session1.tableScan(now + " original transaction", FIND_ID, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.indexScan(now + " joined transaction", FIND_ID, true);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void tableScanVersusTableScan(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " tableScanVersusTableScan";
+        try {
+            session1.begin(now + " original transaction");
+            session1.tableScan(now + " original transaction", FIND_ID, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.tableScan(now + " joined transaction", FIND_ID, true);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void indexScanVersusInsert(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " indexScanVersusInsert";
+        try {
+            session1.begin(now + " original transaction");
+            session1.indexScan(now + " original transaction", FIND_ID, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.insert(now + " joined transaction", INSERT_ID2);
+            session1.find(now + " original transaction", INSERT_ID2, true);
+            session2.find(now + " joined transaction", INSERT_ID2, true);
+            session2.commit(now + " joined transaction");
+            session1.commit(now + " original transaction");
+            session2.delete(now + " joined transaction", INSERT_ID2);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void indexScanVersusDelete(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " indexScanVersusDelete";
+        try {
+            session1.begin(now + " original transaction");
+            session1.indexScan(now + " original transaction", FIND_ID, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.delete(now + " joined transaction", DELETE_ID2);
+            session1.lookup(now + " original transaction", DELETE_ID2, false);
+            session2.lookup(now + " joined transaction", DELETE_ID2, false);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void indexScanVersusUpdate(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " indexScanVersusUpdate";
+        try {
+            session1.begin(now + " original transaction");
+            session1.indexScan(now + " original transaction", FIND_ID, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.update(now + " joined transaction", UPDATE_ID, UPDATED_VALUE);
+            session1.lookup(now + " original transaction", UPDATE_ID, false);
+            session2.lookup(now + " joined transaction", UPDATE_ID, false);
+            session2.commit(now + " joined transaction");
+            session1.commit(now + " original transaction");
+            session2.update(now + " joined transaction", UPDATE_ID, UPDATE_ID);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void tableScanVersusInsert(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " tableScanVersusInsert";
+        try {
+            session1.begin(now + " original transaction");
+            session1.tableScan(now + " original transaction", FIND_ID, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.insert(now + " joined transaction", INSERT_ID2);
+            session1.find(now + " original transaction", INSERT_ID2, true);
+            session2.find(now + " joined transaction", INSERT_ID2, true);
+            session2.commit(now + " joined transaction");
+            session1.commit(now + " original transaction");
+            session2.delete(now + " joined transaction", INSERT_ID2);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void tableScanVersusDelete(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " tableScanVersusDelete";
+        try {
+            session1.begin(now + " original transaction");
+            session1.tableScan(now + " original transaction", FIND_ID, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.delete(now + " joined transaction", DELETE_ID2);
+            session1.lookup(now + " original transaction", DELETE_ID2, false);
+            session2.lookup(now + " joined transaction", DELETE_ID2, false);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    protected void tableScanVersusUpdate(String where, AbstractSession session1, AbstractSession session2) {
+        String now = where + " tableScanVersusUpdate";
+        try {
+            session1.begin(now + " original transaction");
+            session1.tableScan(now + " original transaction", FIND_ID, true);
+            String transactionId = session1.getTransactionId(now);
+            if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+            session2.begin(now + " joined transaction", transactionId);
+            session2.update(now + " joined transaction", UPDATE_ID, UPDATED_VALUE);
+            session1.lookup(now + " original transaction", UPDATE_ID, false);
+            session2.lookup(now + " joined transaction", UPDATE_ID, false);
+            session2.commit(now + " joined transaction");
+            session1.commit(now + " original transaction");
+            session2.update(now + " joined transaction", UPDATE_ID, UPDATE_ID);
+        } catch (Exception ex) {
+            error(now, ex);
+        } finally {
+            session1.rollback(now);
+            session2.rollback(now);
+        }
+    }
+
+    public AbstractSession newClusterJSession(final SessionSPI session) {
+        return new AbstractSession() {
+
+            public void begin(String where) throws Exception {
+                session.begin();
+            }
+
+            public void begin(String where, String txid) throws Exception {
+                session.setCoordinatedTransactionId(txid);
+                session.begin();
+            }
+
+            public void delete(String where, int value) throws Exception {
+                session.deletePersistent(AllPrimitives.class, value);
+                session.flush();
+            }
+
+            public void find(String where, int value, boolean expectSuccess) throws Exception {
+                AllPrimitives result = session.find(AllPrimitives.class, value);
+                checkResult(where + " find", value, expectSuccess, result);
+            }
+
+            public String getTransactionId(String where) throws Exception {
+                return session.getCoordinatedTransactionId();
+            }
+
+            public void indexScan(String where, int value, boolean expectSuccess) throws Exception {
+                List<AllPrimitives> result = query(where + " index scan", AllPrimitives.class, "int_not_null_btree", value);
+                if (getDebug()) System.out.println(where + " index scan " + result.size());
+                checkResults(where + " index scan", value, expectSuccess, result);
+            }
+
+            public void insert(String where, int value) throws Exception {
+                AllPrimitives instance = createAllPrimitiveInstance(session, value);
+                session.makePersistent(instance);
+                session.flush();
+            }
+
+            public void lookup(String where, int value, boolean expectSuccess) throws Exception {
+                List<AllPrimitives> result = query(where + " lookup", AllPrimitives.class, "int_not_null_hash", value);
+                checkResults(where + " lookup", value, expectSuccess, result);
+            }
+
+            public void commit(String where) {
+                if (session.currentTransaction().isActive()) session.commit();
+            }
+
+            public void rollback(String where) {
+                if (session.currentTransaction().isActive()) session.rollback();
+            }
+
+            public void tableScan(String where, int value, boolean expectSuccess) throws Exception {
+                List<AllPrimitives> result = query(where + " table scan", AllPrimitives.class, "int_not_null_none", value);
+                if (getDebug()) System.out.println(where + " table scan " + result.size());
+                checkResults(where + " table scan", value, expectSuccess, result);
+            }
+
+            public void update(String where, int value, int updatedValue) throws Exception {
+                AllPrimitives instance = session.find(AllPrimitives.class, value);
+                initialize(instance, updatedValue);
+                // restore id so update works
+                instance.setId(value);
+                session.updatePersistent(instance);
+                session.flush();
+            }
+
+            private <T extends IdBase> List<T> query(String where, Class<T> cls, String fieldName, int value) {
+                List<T> results = null;
+                QueryBuilder builder = session.getQueryBuilder();
+                QueryDomainType<T> queryDomainType = builder.createQueryDefinition(cls);
+                PredicateOperand field = queryDomainType.get(fieldName);
+                PredicateOperand parameter = queryDomainType.param(fieldName);
+                queryDomainType.where(field.equal(parameter));
+                Query<T> query = session.createQuery(queryDomainType);
+                query.setParameter(fieldName, value);
+                results = query.getResultList();
+                return results;
+            }
+
+            private <T extends IdBase> void checkResults(
+                    String where, int value, boolean expectSuccess, List<T> results) {
+                int size = results.size();
+                if (size > 1) {
+                    error(where + " returned wrong number of results: " + size);
+                    return;
+                }
+                T result = null;
+                if (size == 1) {
+                    result = results.get(0);
+                }
+                checkResult(where, value, expectSuccess, result);
+            }
+
+            private <T extends IdBase> void checkResult(
+                    String where, int value, boolean expectSuccess, T result) {
+                if (result != null) {
+                    int actual = result.getId();
+                    if (expectSuccess) {
+                        if (actual != value) {
+                            error(where + " returned wrong row; expected " + value + " got " + actual + ".");
+                        } else if (getDebug()) error (where + " CORRECTLY RETURNED ROW " + value + ".");
+                    } else {
+                        error(where + " returned a row that was unexpected with value " + value + ".");
+                    }
+                } else {
+                    if (expectSuccess) {
+                        error(where + " failed to find an expected row with id " + value + ".");
+                    } else if (getDebug()) error (where + " CORRECTLY DIDN'T RETURN ROW " + value + ".");
+                }
+            }
+
+            };
+        }
+
+    /** AbstractSession to support Jdbc. This session does not automatically
+     * use coordinated transactions. Set transaction id uses the mysql set command
+     * to set the transaction id.
+     * @param connection the jdbc connection to use
+     */
+    public AbstractSession newJdbcSession(final Connection connection) {
+        if (getDebug()) System.out.println("Connection: " + connection);
+        return new AbstractSession() {
+
+            private final String SQL_SHOW_WARNINGS = "SHOW WARNINGS";
+
+            public void begin(String where) throws Exception {
+                connection.setAutoCommit(false);
+            }
+
+            public void delete(String where, int value) throws Exception {
+                deleteBySingleColumn(where + " delete", connection, value, true);
+            }
+
+            public void find(String where, int value, boolean expectSuccess) throws Exception {
+                queryBySingleColumn(where + " find", connection, "id", "=", value, expectSuccess);
+            }
+
+            public void indexScan(String where, int value, boolean expectSuccess) throws Exception {
+                queryBySingleColumn(where + " indexScan", connection, "int_not_null_btree", "=", value, expectSuccess);
+            }
+
+            public void insert(String where, int value) throws Exception {
+                Integer[] values = new Integer[columnNames.length];
+                for (int i = 0; i < columnNames.length; ++i) {
+                    values[i] = value;
+                }
+                insertJDBC(where + " insert", connection, columnNames, values);
+            }
+
+            public void lookup(String where, int value, boolean expectSuccess) throws Exception {
+                queryBySingleColumn(where + " lookup", connection, "int_not_null_hash", "=", value, expectSuccess);
+            }
+
+            public void commit(String where) {
+                try {
+                    if (!connection.isClosed() && !connection.getAutoCommit()) {
+                        connection.commit();
+                    }
+                } catch (SQLException e) {
+                    error(where + " SQLException from commit: " + e.getMessage());
+                } finally {
+                    try {
+                        connection.setAutoCommit(true);
+                    } catch (SQLException e) {
+                        // ignore; can't fail ;-)
+                    }
+                }
+
+            }
+
+            public void rollback(String where) {
+                try {
+                    if (!connection.isClosed() && !connection.getAutoCommit()) {
+                        connection.rollback();
+                   }
+                } catch (SQLException e) {
+                    error(where + " SQLException from rollback: " + e.getMessage());
+                } finally {
+                    try {
+                        connection.setAutoCommit(true);
+                    } catch (SQLException e) {
+                        // ignore; can't fail ;-)
+                    }
+                }
+            }
+
+            public void tableScan(String where, int value, boolean expectSuccess) throws Exception {
+                queryBySingleColumn(where + " tableScan", connection, "int_not_null_none", "=", value, expectSuccess);
+            }
+
+            public void begin(String where, String txid) throws Exception {
+                try {
+                    connection.setAutoCommit(false);
+                    String sql = "set " + SET_TRANSACTION_ID_VARIABLE_NAME + " = ?";
+                    if (getDebug()) System.out.println(where + " sql: " + sql + " with parameter " + txid);
+                    PreparedStatement preparedStatement = connection.prepareStatement(sql);
+                    preparedStatement.setString(1, txid);
+                    int result = preparedStatement.executeUpdate();
+                    if (result != 0) {
+                        error(where + " setTransactionId failed to return any results.");
+                    }
+                } catch (SQLException e) {
+                    // get more information with SHOW WARNINGS
+                    PreparedStatement preparedStatement = connection.prepareStatement(SQL_SHOW_WARNINGS);
+                    ResultSet resultSet = preparedStatement.executeQuery();
+                    String warnings = "";
+                    while (resultSet.next()) {
+                        warnings += " level: " + resultSet.getString(1)
+                        + " code: " + resultSet.getString(2)
+                        + " message: " + resultSet.getString(3)
+                        + "\n";
+                        error(where + " " + warnings);
+                    }
+                    throw new RuntimeException(where + " begin with txid: " + txid + " " + e.getMessage() + warnings);
+                }
+            }
+
+            public String getTransactionId(String where) throws Exception {
+                PreparedStatement preparedStatement = connection.prepareStatement("select " + GET_TRANSACTION_ID_VARIABLE_NAME);
+                ResultSet rs = preparedStatement.executeQuery();
+                if (!rs.next()) {
+                    error(where + " getTransactionId failed to return any results.");
+                }
+                String result = rs.getString(1);
+                if (getDebug()) System.out.println(where + " getTransactionId returned " + result);
+                return result;
+            }
+
+            public void update(String where, int key, int updatedValue) throws Exception {
+                // don't update id column which is the first column name in columnNames
+                Integer[] values = new Integer[columnNames.length];
+                for (int i = 0; i < columnNames.length; ++i) {
+                    values[i] = updatedValue;
+                }
+                updateJDBC(where + " insert", connection, key, columnNames, values);
+            }
+            
+        };
+    }
+
+
+    /** Abstract clusterj sessions, jdbc connections, and clusterjdbc connections.
+     * All operations are defined in this interface. Implementations should result in 
+     * the same operations being performed on the underlying ndb transaction.
+     * find is a primary key operation.
+     * insert is a primary key operation.
+     * lookup is a unique key operation.
+     * delete is a primary key operation.
+     * indexScan is an index scan operation.
+     * tableScan is a table scan operation.
+     */
+    protected interface AbstractSession {
+
+        /** Initialize this session. Set autocommit to false. Begin a transaction.
+         */
+        void begin(String where) throws Exception;
+
+        /** Initialize this session. Set autocommit to false. Begin a transaction
+         * with the specified txid.
+         * @param txid the transaction id for the new transaction
+         */
+        void begin(String where, String txid) throws Exception;
+
+        /** Get the transaction id for this session.
+         * @param where the context
+         * @return the txid
+         */
+        String getTransactionId(String where) throws Exception;
+
+        /** Insert a new row or instance as specified by the INSERT_ID variable.
+         * @param where the context
+         * @param id the row to delete
+         */
+        void insert(String where, int value) throws Exception;
+        
+        /** Delete a row or instance as specified by the DELETE_ID variable.
+         * @param where the context
+         * @param value the row to delete
+         */
+        void delete(String where, int value) throws Exception;
+        
+        /** Update a row or instance as specified by the UPDATE_ID variable.
+         * @param where the context
+         * @param value the row to update
+         * @param updatedValue the new value for all fields
+         */
+        void update(String where, int value, int updatedValue) throws Exception;
+        
+        /** Find a row by primary key specified by the FIND_ID variable.
+         * @param where the context
+         * @param value the row to find
+         * @param expectSuccess whether the row is expected to be found
+         */
+        void find(String where, int value, boolean expectSuccess) throws Exception;
+
+        /** Look up a row by unique key specified by the FIND_ID variable.
+         * @param where the context
+         * @param value the row to look up by unique key
+         * @param expectSuccess whether the row is expected to be found
+         */
+        void lookup(String where, int value, boolean expectSuccess) throws Exception;
+
+        /** Find a row by index scan specified by the FIND_ID variable.
+         * @param where the context
+         * @param id the row to find by index scan
+         * @param expectSuccess whether the row is expected to be found
+         */
+        void indexScan(String where, int value, boolean expectSuccess) throws Exception;
+
+        /** Find a row by table scan specified by the FIND_ID variable.
+         * @param where the context
+         * @param id the row to find by table scan
+         * @param expectSuccess whether the row is expected to be found
+         */
+        void tableScan(String where, int value, boolean expectSuccess) throws Exception;
+
+        /** Commit this session.
+         * @param where the context
+         */
+        void commit(String where);
+
+        /** Clean up this session. If a transaction is active, rollback.
+         * @param where the context
+         */
+        void rollback(String where);
+
+    }
+
+
+    private SessionSPI newSession() {
+        SessionSPI result = (SessionSPI) sessionFactory.getSession();
+        return result;
+    }
+
+    private void insertJDBC(String where, Connection connection, String[] columnNames, Integer[] values) {
+        if (columnNames.length != values.length) {
+            throw new RuntimeException(where + " Error: columnNames and values must have the same length.");
+        }
+        String now = where + " insert";
+        int result = 0;
+        try {
+            StringBuilder buffer = new StringBuilder(" INSERT INTO ");
+            buffer.append(getTableName());
+            buffer.append("(");
+            String separator = "";
+            StringBuilder buffer2 = new StringBuilder("VALUES (");
+            for (String columnName: columnNames) {
+                buffer.append(separator);
+                buffer.append(columnName);
+                buffer2.append(separator);
+                buffer2.append("?");
+                separator = ", ";
+            }
+            buffer.append(")");
+            buffer2.append(")");
+            buffer.append(buffer2);
+            String sql = buffer.toString();
+            if (debug) System.out.println(where + sql);
+            PreparedStatement preparedStatement = connection.prepareStatement(sql);
+            int i = 0;
+            for (Integer value: values) {
+                preparedStatement.setInt(++i, value);
+            }
+            result = preparedStatement.executeUpdate();
+        } catch (SQLException e) {
+            if (getDebug()) e.printStackTrace();
+            throw new RuntimeException(now + " Error: insert threw SQLException "
+                    + e.getMessage());
+        }
+        if (result != 1) {
+            throw new RuntimeException(now + " Error: insert failed to insert row; insert returned " + result); 
+        }
+    }
+
+    private void updateJDBC(String where, Connection connection, int key, String[] columnNames, Integer[] values) {
+        if (columnNames.length != values.length) {
+            throw new RuntimeException(where + " Error: columnNames and values must have the same length.");
+        }
+        // the first name is the id column which gets put at the end
+        String now = where + " update";
+        int result = 0;
+        try {
+            StringBuilder buffer = new StringBuilder(" UPDATE ");
+            buffer.append(getTableName());
+            buffer.append(" SET ");
+            String separator = "";
+            // skip past id column name
+            for (int i = 1; i < columnNames.length; ++i) {
+                buffer.append(separator);
+                buffer.append(columnNames[i]);
+                buffer.append(" = ?");
+                separator = ", ";
+            }
+            buffer.append(" WHERE ");
+            buffer.append(columnNames[0]);
+            buffer.append(" = ?");
+            String sql = buffer.toString();
+            if (debug) System.out.println(where + sql);
+            PreparedStatement preparedStatement = connection.prepareStatement(sql);
+            int i = 0;
+            for (Integer value: values) {
+                preparedStatement.setInt(++i, value);
+            }
+            preparedStatement.setInt(i, key);
+            result = preparedStatement.executeUpdate();
+        } catch (SQLException e) {
+            if (getDebug()) e.printStackTrace();
+            throw new RuntimeException(now + " Error: update threw SQLException "
+                    + e.getMessage());
+        }
+        if (result != 1) {
+            throw new RuntimeException(now + " Error: update failed to update row; update returned " + result); 
+        }
+    }
+
+    /**
+     * @param connection
+     * @param id
+     * @param expectSuccess
+     * @param where
+     */
+    private void queryBySingleColumn(String where, Connection connection, 
+            String columnName, String operator, int value,
+            boolean expectSuccess) {
+        String now = where + " query " + operator + " by column " + columnName + " with value " + value;
+        try {
+            // SELECT id from allprimitives where id = ?
+            StringBuilder buffer = new StringBuilder("SELECT id FROM ");
+            buffer.append(getTableName());
+            buffer.append(" WHERE ");
+            buffer.append(columnName);
+            buffer.append(" ");
+            buffer.append(operator);
+            buffer.append(" ?");
+            String sql = buffer.toString();
+            if (debug) System.out.println(sql);
+            PreparedStatement preparedStatement = connection.prepareStatement(sql);
+            preparedStatement.setInt(1, value);
+            ResultSet rs = preparedStatement.executeQuery();
+            if (rs.next()) {
+                int idColumn = rs.getInt(1);
+                if (!expectSuccess) {
+                    error(now + " incorrectly found instance with id " + idColumn + ".");
+                } else {
+                    if (idColumn != value) {
+                        error(now + " found wrong instance with id " + idColumn + "."); 
+                    } else if (getDebug()) error (where + " CORRECTLY RETURNED ROW " + value + ".");
+                }
+            } else {
+                if (expectSuccess) {
+                    error(now + " failed to find instance.");
+                } else if (getDebug()) error (where + " CORRECTLY DIDN'T RETURN ROW " + value + ".");
+            }
+        } catch (Exception e) {
+            error(now + " Exception " + e.getMessage());
+            session.currentTransaction().setRollbackOnly();
+        }
+    }
+
+    /**
+     * @param connection
+     * @param id
+     * @param expectSuccess
+     * @param where
+     */
+    private void deleteBySingleColumn(String where, Connection connection, 
+            int value, boolean expectSuccess) {
+        String now = where + " delete where id =  " + value;
+        try {
+            // DELETE from allprimitives where id = ?
+            StringBuilder buffer = new StringBuilder("DELETE FROM ");
+            buffer.append(getTableName());
+            buffer.append(" WHERE id = ?");
+            String sql = buffer.toString();
+            if (debug) System.out.println(sql);
+            PreparedStatement preparedStatement = connection.prepareStatement(sql);
+            preparedStatement.setInt(1, value);
+            int result = preparedStatement.executeUpdate();
+            if (result == 1) {
+                if (!expectSuccess) {
+                    error(now + " incorrectly deleted instance.");
+                }
+            } else {
+                if (expectSuccess) {
+                    error(now + " failed to delete instance.");
+                }
+            }
+        } catch (Exception e) {
+            error(now + " resulted in Exception " + e.getMessage());
+            session.currentTransaction().setRollbackOnly();
+        }
+    }
+
+    private void resetCoordinatedTransactionId(String where, Connection connection) {
+        String sql = "set " + SET_TRANSACTION_ID_VARIABLE_NAME + " = ''";
+        PreparedStatement preparedStatement;
+        try {
+            preparedStatement = connection.prepareStatement(sql);
+            int result = preparedStatement.executeUpdate();
+            if (result != 0) {
+                error(where + " resetTransactionId failed to return any results.");
+            }
+        } catch (SQLException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+    }
+
+    private void rollback(String where, Connection connection) {
+        // if active, roll back
+        try {
+            connection.rollback();
+        } catch (SQLException e) {
+            // ignore
+        }
+        try {
+            connection.setAutoCommit(true);
+        } catch (SQLException e) {
+            // ignore
+        }
+    }
+
+    private Connection newConnection(String where) {
+        Connection result = getConnection();
+        rollback(where, connection);
+        resetCoordinatedTransactionId(where + " newConnection", result);
+        connection = null;
+        return result;
+    }
+
+    private void printTransactionState(SessionSPI session, String where) {
+        System.out.println(where + " coordinatedTransactionIdentifier: "
+                + session.getCoordinatedTransactionId()
+                + " enlisted?: " + session.isEnlisted());
+    }
+
+    private void waitFor(int millis) {
+        Object monitor = new Object();
+        synchronized(monitor) {
+            try {
+                monitor.wait(millis);
+            } catch (InterruptedException e) {
+                return;
+            }
+        }
+    }
+}

=== modified file 'storage/ndb/clusterj/clusterj-tie/pom.xml'
--- a/storage/ndb/clusterj/clusterj-tie/pom.xml	2010-12-14 18:31:20 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/pom.xml	2010-12-16 17:03:32 +0000
@@ -22,7 +22,7 @@
   <groupId>com.mysql.clusterj</groupId>
   <artifactId>clusterj-tie</artifactId>
   <packaging>bundle</packaging>
-  <version>7.1.10-SNAPSHOT</version>
+  <version>7.1.10-coord-SNAPSHOT</version>
   <name>ClusterJ Tie</name>
   <description>The ndbj-tie implementation of ClusterJ storage spi</description>
   <build>
@@ -123,13 +123,13 @@
     <dependency>
       <groupId>com.mysql.clusterj</groupId>
       <artifactId>clusterj-api</artifactId>
-      <version>7.1.10-SNAPSHOT</version>
+      <version>7.1.10-coord-SNAPSHOT</version>
       <scope>compile</scope>
     </dependency>
     <dependency>
       <groupId>com.mysql.clusterj</groupId>
       <artifactId>clusterj-core</artifactId>
-      <version>7.1.10-SNAPSHOT</version>
+      <version>7.1.10-coord-SNAPSHOT</version>
       <scope>compile</scope>
     </dependency>
     <dependency>
@@ -153,13 +153,13 @@
     <dependency>
     	<groupId>com.mysql.clusterj</groupId>
     	<artifactId>clusterj-test</artifactId>
-        <version>7.1.10-SNAPSHOT</version>
+        <version>7.1.10-coord-SNAPSHOT</version>
     	<scope>test</scope>
     </dependency>
     <dependency>
       <groupId>ndbjtie</groupId>
       <artifactId>ndbjtie</artifactId>
-      <version>7.1.10-SNAPSHOT</version>
+      <version>7.1.10-coord-SNAPSHOT</version>
       <scope>compile</scope>
     </dependency>
   </dependencies>   

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java	2010-10-28 09:50:56 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java	2010-12-16 17:03:32 +0000
@@ -78,7 +78,7 @@ class ClusterTransactionImpl implements 
     private Dictionary ndbDictionary;
 
     /** The coordinated transaction identifier */
-    private String coordinatedTransactionId = null;
+    private String transactionId = null;
 
     /** Is getCoordinatedTransactionId supported? True until proven false. */
     private static boolean supportsGetCoordinatedTransactionId = true;
@@ -135,11 +135,11 @@ class ClusterTransactionImpl implements 
      */
     private void enlist() {
         if (ndbTransaction == null) {
-            if (coordinatedTransactionId != null) {
-                ndbTransaction = db.joinTransaction(coordinatedTransactionId);
+            if (joinTransactionId != null) {
+                ndbTransaction = db.joinTransaction(joinTransactionId);
             } else {
                 ndbTransaction = partitionKey.enlist(db);
-                getCoordinatedTransactionId(db);
+                getTransactionId(db);
             }
         }
     }
@@ -400,11 +400,11 @@ class ClusterTransactionImpl implements 
     }
 
     public String getCoordinatedTransactionId() {
-        return coordinatedTransactionId;
+        return transactionId;
     }
 
     /** Get the coordinated transaction id if possible and update the field with
-     * the id. If running on a back level system (prior to 7.1.6 for the ndbjtie
+     * the id. If running on a back level system (prior to 7.1.10 for the ndbjtie
      * and native library) the ndbTransaction.getCoordinatedTransactionId() method
      * will throw an Error of some kind (java.lang.NoSuchMethodError or
      * java.lang.UnsatisfiedLinkError) and this will cause this instance
@@ -412,16 +412,14 @@ class ClusterTransactionImpl implements 
      * supportsGetCoordinatedTransactionId) to never try again.
      * @param db the DbImpl instance
      */
-    private void getCoordinatedTransactionId(DbImpl db) {
+    private void getTransactionId(DbImpl db) {
         try {
             if (supportsGetCoordinatedTransactionId) {
-// not implemented quite yet...
-//                ByteBuffer buffer = db.getCoordinatedTransactionIdBuffer();
-//                coordinatedTransactionId = ndbTransaction.
-//                        getCoordinatedTransactionId(buffer, buffer.capacity());
+                ByteBuffer buffer = db.getCoordinatedTransactionIdBuffer();
+                transactionId = ndbTransaction.
+                        getCoordinatedTransactionId(buffer, buffer.capacity());
                 if (logger.isDetailEnabled()) logger.detail("CoordinatedTransactionId: "
-                        + coordinatedTransactionId);
-                throw new ClusterJFatalInternalException("Not Implemented");
+                        + transactionId);
             }
         } catch (Throwable t) {
             // oops, don't do this again
@@ -429,8 +427,8 @@ class ClusterTransactionImpl implements 
         }
     }
 
-    public void setCoordinatedTransactionId(String coordinatedTransactionId) {
-        this.coordinatedTransactionId = coordinatedTransactionId;
+    public void setCoordinatedTransactionId(String joinTransactionId) {
+        this.joinTransactionId = joinTransactionId;
     }
 
     public void setLockMode(LockMode lockmode) {

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java	2010-10-28 09:50:56 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java	2010-12-16 17:03:32 +0000
@@ -62,11 +62,11 @@ class DbImpl implements com.mysql.cluste
 
     // TODO change the allocation to a constant in ndbjtie
     /** The size of the coordinated transaction identifier buffer */
-    private int coordinatedTransactionIdBufferSize = 26;
+    private final static int COORDINATED_TRANSACTION_ID_SIZE = 36;
 
     /** The coordinated transaction identifier buffer */
     private ByteBuffer coordinatedTransactionIdBuffer =
-            ByteBuffer.allocateDirect(coordinatedTransactionIdBufferSize);
+            ByteBuffer.allocateDirect(COORDINATED_TRANSACTION_ID_SIZE);
 
     // TODO change the allocation to something reasonable
     /** The partition key scratch buffer */
@@ -224,12 +224,11 @@ class DbImpl implements com.mysql.cluste
      * @return a transaction joined to the existing transaction
      */
     public NdbTransaction joinTransaction(String coordinatedTransactionId) {
-        if (logger.isDetailEnabled()) logger.detail("CoordinatedTransactionId: "
+        if (logger.isDetailEnabled()) logger.detail("JoinTransactionId: "
                 + coordinatedTransactionId);
-//        NdbTransaction result = ndb.joinTransaction(coordinatedTransactionId);
-//        handleError(result, ndb);
-//        return result;
-        throw new ClusterJFatalInternalException("Not Implemented");
+        NdbTransaction result = ndb.joinTransaction(coordinatedTransactionId);
+        handleError(result, ndb);
+        return result;
     }
 
     /** Get the buffer manager for this DbImpl. All operations that need byte buffers

=== added file 'storage/ndb/clusterj/clusterj-tie/src/test/java/com/mysql/clusterj/tie/CoordinatedTransactionVisibilityTest.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/test/java/com/mysql/clusterj/tie/CoordinatedTransactionVisibilityTest.java	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/test/java/com/mysql/clusterj/tie/CoordinatedTransactionVisibilityTest.java	2010-12-16 17:03:32 +0000
@@ -0,0 +1,36 @@
+/*
+   Copyright (C) 2009 Sun Microsystems Inc.
+   All rights reserved. Use is subject to license terms.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+package com.mysql.clusterj.tie;
+
+public class CoordinatedTransactionVisibilityTest extends testsuite.clusterj.CoordinatedTransactionVisibilityTest {
+
+    public void test() {
+        clusterjVersusClusterj();
+        clusterjVersusJdbc();
+        jdbcVersusClusterj();
+        jdbcVersusJdbc();
+        failOnError();
+    }
+
+    @Override
+    public boolean getDebug() {
+        return false;
+    }
+
+}

=== modified file 'storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/CoordinatedTransactionIdVariableTest.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/CoordinatedTransactionIdVariableTest.java	2010-09-13 10:48:19 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/CoordinatedTransactionIdVariableTest.java	2010-12-16 17:03:32 +0000
@@ -3,11 +3,20 @@ package testsuite.clusterj.tie;
 public class CoordinatedTransactionIdVariableTest extends testsuite.clusterj.CoordinatedTransactionIdVariableTest {
 
     public void test() {
-        // checkInitialValue();
-        // causes crash in ndbd currently
-        // checkNewValue();
-        // checkIdAfterTransactionStartAndCommit();
-        // checkIdAfterTransactionStartAndRollback();
-        // failOnError();
+        checkTransactionIdInitialValue();
+        checkJoinTransactionIdInitialValue();
+        checkNewIdResetWithEmptyString();
+        checkNewIdResetWithNullString();
+        checkBadIdTooLong();
+        checkBadIdTooShort();
+        checkIdAfterTransactionStartAndCommit();
+        checkIdAfterTransactionStartAndRollback();
+        failOnError();
     }
+
+    @Override
+    public boolean getDebug() {
+        return false;
+    }
+
 }

=== modified file 'storage/ndb/clusterj/pom.xml'
--- a/storage/ndb/clusterj/pom.xml	2010-12-14 18:31:20 +0000
+++ b/storage/ndb/clusterj/pom.xml	2010-12-16 17:03:32 +0000
@@ -22,7 +22,7 @@
   <groupId>com.mysql.clusterj</groupId>
   <artifactId>clusterj-aggregate</artifactId>
   <packaging>pom</packaging>
-  <version>7.1.10-SNAPSHOT</version>
+  <version>7.1.10-coord-SNAPSHOT</version>
   <name>ClusterJ Aggregate</name>
   <description>The aggregate maven project of ClusterJ</description>
   <modules>


Attachment: [text/bzr-bundle] bzr/craig.russell@oracle.com-20101216170332-d5t1qz6o1l4aepdj.bundle
Thread
bzr commit into mysql-5.1-telco-7.1 branch (Craig.Russell:4025)Craig L Russell16 Dec