#At file:///Users/clr/ndb/bzr-repo/mysql-5.1-telco-7.1-coord/ based on revid:craig.russell@stripped
4025 Craig L Russell 2010-12-16
Add coordinated transaction support to clusterj
added:
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CoordinatedTransactionVisibilityTest.java
storage/ndb/clusterj/clusterj-tie/src/test/java/com/mysql/clusterj/tie/CoordinatedTransactionVisibilityTest.java
modified:
storage/ndb/clusterj/clusterj-api/pom.xml
storage/ndb/clusterj/clusterj-bindings/pom.xml
storage/ndb/clusterj/clusterj-core/pom.xml
storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java
storage/ndb/clusterj/clusterj-jpatest/pom.xml
storage/ndb/clusterj/clusterj-openjpa/pom.xml
storage/ndb/clusterj/clusterj-test/pom.xml
storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CoordinatedTransactionIdVariableTest.java
storage/ndb/clusterj/clusterj-tie/pom.xml
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java
storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java
storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/CoordinatedTransactionIdVariableTest.java
storage/ndb/clusterj/pom.xml
=== modified file 'storage/ndb/clusterj/clusterj-api/pom.xml'
--- a/storage/ndb/clusterj/clusterj-api/pom.xml 2010-12-14 18:31:20 +0000
+++ b/storage/ndb/clusterj/clusterj-api/pom.xml 2010-12-16 17:03:32 +0000
@@ -22,7 +22,7 @@
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-api</artifactId>
<packaging>bundle</packaging>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<name>ClusterJ API</name>
<description>The API for ClusterJ</description>
<build>
=== modified file 'storage/ndb/clusterj/clusterj-bindings/pom.xml'
--- a/storage/ndb/clusterj/clusterj-bindings/pom.xml 2010-12-14 18:31:20 +0000
+++ b/storage/ndb/clusterj/clusterj-bindings/pom.xml 2010-12-16 17:03:32 +0000
@@ -22,7 +22,7 @@
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-bindings</artifactId>
<packaging>bundle</packaging>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<name>ClusterJ Bindings</name>
<description>The ndb-bindings implementation of ClusterJ storage spi</description>
<build>
@@ -113,13 +113,13 @@
<dependency>
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-api</artifactId>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-core</artifactId>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
@@ -149,7 +149,7 @@
<dependency>
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-test</artifactId>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
=== modified file 'storage/ndb/clusterj/clusterj-core/pom.xml'
--- a/storage/ndb/clusterj/clusterj-core/pom.xml 2010-12-14 18:31:20 +0000
+++ b/storage/ndb/clusterj/clusterj-core/pom.xml 2010-12-16 17:03:32 +0000
@@ -22,7 +22,7 @@
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-core</artifactId>
<packaging>bundle</packaging>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<name>ClusterJ Core</name>
<description>The core implementation of ClusterJ</description>
<build>
@@ -89,7 +89,7 @@
<dependency>
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-api</artifactId>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
=== modified file 'storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java'
--- a/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java 2010-10-28 09:50:56 +0000
+++ b/storage/ndb/clusterj/clusterj-core/src/main/java/com/mysql/clusterj/core/SessionImpl.java 2010-12-16 17:03:32 +0000
@@ -639,6 +639,7 @@ public class SessionImpl implements Sess
clusterTransaction.close();
clusterTransaction = null;
partitionKey = null;
+ joinTransactionId = null;
}
}
@@ -668,6 +669,7 @@ public class SessionImpl implements Sess
}
clusterTransaction = null;
partitionKey = null;
+ joinTransactionId = null;
}
}
@@ -1054,16 +1056,17 @@ public class SessionImpl implements Sess
}
public void flush() {
- if (logger.isDetailEnabled()) logger.detail("flush changes with changeList: " + changeList);
- if (changeList.isEmpty()) {
- return;
- }
- for (StateManager sm: changeList) {
- sm.flush(this);
+ if (logger.isDetailEnabled()) logger.detail("flush changes with changeList size: " + changeList.size());
+ if (!changeList.isEmpty()) {
+ for (StateManager sm: changeList) {
+ sm.flush(this);
+ }
+ changeList.clear();
}
// now flush changes to the back end
- clusterTransaction.executeNoCommit();
- changeList.clear();
+ if (clusterTransaction != null) {
+ clusterTransaction.executeNoCommit();
+ }
}
public List getChangeList() {
@@ -1128,7 +1131,7 @@ public class SessionImpl implements Sess
* @return the coordinatedTransactionId
*/
public String getCoordinatedTransactionId() {
- return clusterTransaction.getCoordinatedTransactionId();
+ return clusterTransaction==null?null:clusterTransaction.getCoordinatedTransactionId();
}
/** Set the coordinatedTransactionId for the next transaction. This
@@ -1136,7 +1139,11 @@ public class SessionImpl implements Sess
* @param coordinatedTransactionId the coordinatedTransactionId
*/
public void setCoordinatedTransactionId(String coordinatedTransactionId) {
- clusterTransaction.setCoordinatedTransactionId(coordinatedTransactionId);
+ if (clusterTransaction != null) {
+ throw new ClusterJUserException(
+ local.message("ERR_Cannot_Set_Join_Transaction_Id_After_Transaction_Begin"));
+ }
+ joinTransactionId = coordinatedTransactionId;
}
/** Set the lock mode for subsequent operations. The lock mode takes effect immediately
=== modified file 'storage/ndb/clusterj/clusterj-jpatest/pom.xml'
--- a/storage/ndb/clusterj/clusterj-jpatest/pom.xml 2010-12-14 18:31:20 +0000
+++ b/storage/ndb/clusterj/clusterj-jpatest/pom.xml 2010-12-16 17:03:32 +0000
@@ -29,7 +29,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-jpatest</artifactId>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<packaging>jar</packaging>
<name>ClusterJ JPA Integration Tests</name>
=== modified file 'storage/ndb/clusterj/clusterj-openjpa/pom.xml'
--- a/storage/ndb/clusterj/clusterj-openjpa/pom.xml 2010-12-14 18:31:20 +0000
+++ b/storage/ndb/clusterj/clusterj-openjpa/pom.xml 2010-12-16 17:03:32 +0000
@@ -25,7 +25,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-openjpa</artifactId>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<packaging>bundle</packaging>
<name>ClusterJ OpenJPA Integration</name>
@@ -122,24 +122,24 @@
<dependency>
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-jpatest</artifactId>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-api</artifactId>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-core</artifactId>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-tie</artifactId>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
=== modified file 'storage/ndb/clusterj/clusterj-test/pom.xml'
--- a/storage/ndb/clusterj/clusterj-test/pom.xml 2010-12-14 18:31:20 +0000
+++ b/storage/ndb/clusterj/clusterj-test/pom.xml 2010-12-16 17:03:32 +0000
@@ -22,7 +22,7 @@
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-test</artifactId>
<packaging>jar</packaging>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<name>ClusterJ Test Suite</name>
<build>
<plugins>
@@ -91,13 +91,13 @@
<dependency>
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-api</artifactId>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-core</artifactId>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
=== modified file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CoordinatedTransactionIdVariableTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CoordinatedTransactionIdVariableTest.java 2010-09-14 09:43:52 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CoordinatedTransactionIdVariableTest.java 2010-12-16 17:03:32 +0000
@@ -21,20 +21,20 @@ package testsuite.clusterj;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-
-import org.junit.Ignore;
+import java.sql.Types;
/** Test that mysql session variable ndb_coordinated_transaction_id can be
* read and written by jdbc.
*/
-@Ignore
public class CoordinatedTransactionIdVariableTest extends AbstractClusterJTest {
/** Format is Uint32+Uint32:Uint64 */
private String newId = "1+1:9000000000000099";
+ private String badIdTooLong = "123456789012345678901234567890123456";
+ private String badIdTooShort = "1+1";
private String sqlQuery = "select id from t_basic where id = 0";
- private String getTransactionIdVariableName = "@@ndb_transaction_id";
- private String setTransactionIdVariableName = "@@ndb_join_transaction_id";
+ private String transactionIdVariableName = "@@ndb_transaction_id";
+ private String joinTransactionIdVariableName = "@@ndb_join_transaction_id";
@Override
protected void localSetUp() {
@@ -49,42 +49,110 @@ public class CoordinatedTransactionIdVar
return false;
}
- /** Verify that the initial value of the variable ndb_coordinated_transaction_id is null.
+ /** Verify that the initial value of the variable transaction_id is null.
*/
- public void checkInitialValue() {
+ public void checkTransactionIdInitialValue() {
getConnection();
- String id = getJDBCCoordinatedTransactionId("checkInitialValue");
- errorIfNotEqual("Coordinated transaction id must default to null.", null, id);
+ String id = getJDBCTransactionId("checkTransactionIdInitialValue");
+ errorIfNotEqual("Transaction id must default to null.", null, id);
}
- /** Try to set the ndb_coordinated_transaction_id variable to a new value
- * and verify that it can be read back.
+ /** Verify that the initial value of the variable join_transaction_id is null.
*/
- public void checkNewValue() {
+ public void checkJoinTransactionIdInitialValue() {
getConnection();
- // set the coordinated_transaction_id to some random value
- setJDBCCoordinatedTransactionId("checkNewValue", newId);
- String id = getJDBCCoordinatedTransactionId("checkNewValue");
+ String id = getJDBCJoinTransactionId("checkJoinTransactionIdInitialValue");
+ errorIfNotEqual("Join transaction id must default to null.", null, id);
+ }
+
+ /** Verify that you cannot set the value of the variable transaction_id.
+ */
+ public void checkSetTransactionId() {
+ getConnection();
+ setAutoCommit(connection, false);
+ // try set the transaction_id to a good value; expect error 1238 "read only variable"
+ setJDBCVariable("checkSetTransactionId", transactionIdVariableName, newId, 1238);
+ // close the connection so the value isn't accidentally used by a new transaction
+ closeConnection();
+ }
+
+ /** Try to set the join_transaction_id variable to a good value
+ * and verify that it can be read back. A null string is used to reset the value.
+ */
+ public void checkNewIdResetWithNullString() {
+ getConnection();
+ setAutoCommit(connection, false);
+ // set the coordinated_transaction_id to some value
+ setJDBCVariable("checkNewId", joinTransactionIdVariableName, newId, 0);
+ String id = getJDBCJoinTransactionId("checkNewId");
errorIfNotEqual("failed to set coordinated transaction id.", newId, id);
- executeJDBCQuery("checkNewValue");
+ setJDBCVariable("checkNewId", joinTransactionIdVariableName, null, 0);
+ id = getJDBCJoinTransactionId("checkNewId");
+ errorIfNotEqual("failed to set coordinated transaction id to null.", null, id);
// close the connection so the value isn't accidentally used by a new transaction
closeConnection();
}
- /** Verify that after an ndb transaction is started the coordinated transaction id is not null
+ /** Try to set the join_transaction_id variable to a good value
+ * and verify that it can be read back. An empty string is used to reset the value.
+ */
+ public void checkNewIdResetWithEmptyString() {
+ getConnection();
+ setAutoCommit(connection, false);
+ // set the coordinated_transaction_id to some value
+ setJDBCVariable("checkNewId", joinTransactionIdVariableName, newId, 0);
+ String id = getJDBCJoinTransactionId("checkNewId");
+ errorIfNotEqual("failed to set coordinated transaction id.", newId, id);
+ setJDBCVariable("checkNewId", joinTransactionIdVariableName, "", 0);
+ id = getJDBCJoinTransactionId("checkNewId");
+ errorIfNotEqual("failed to set coordinated transaction id to null.", null, id);
+ // close the connection so the value isn't accidentally used by a new transaction
+ closeConnection();
+ }
+
+ /** Try to set the join_transaction_id variable to a bad value
+ * and verify that an exception is thrown.
+ */
+ public void checkBadIdTooLong() {
+ getConnection();
+ setAutoCommit(connection, false);
+ // set the join_transaction_id to a bad value and expect 1210 "Incorrect arguments to SET"
+ setJDBCVariable("checkBadIdTooLong", joinTransactionIdVariableName, badIdTooLong, 1210);
+ String id = getJDBCJoinTransactionId("checkBadIdTooLong");
+ errorIfNotEqual("failed to set coordinated transaction id.", null, id);
+ // close the connection so the value isn't accidentally used by a new transaction
+ closeConnection();
+ }
+
+ /** Try to set the join_transaction_id variable to a bad value
+ * and verify that an exception is thrown.
+ */
+ public void checkBadIdTooShort() {
+ getConnection();
+ setAutoCommit(connection, false);
+ // set the join_transaction_id to a bad value and expect 1210 "Incorrect arguments to SET"
+ setJDBCVariable("checkBadIdTooShort", joinTransactionIdVariableName, badIdTooShort, 1210);
+ String id = getJDBCJoinTransactionId("checkBadIdTooShort");
+ errorIfNotEqual("failed to set coordinated transaction id.", null, id);
+ // close the connection so the value isn't accidentally used by a new transaction
+ closeConnection();
+ }
+
+ /** Verify that after an ndb transaction is started the transaction id is not null
* and is null after commit.
*/
public void checkIdAfterTransactionStartAndCommit() {
getConnection();
- // execute a query statement that will cause the server to start an ndb transaction
+ setAutoCommit(connection, false);
+ // execute a query statement that will cause the server to start an ndb transaction
executeJDBCQuery("checkIdAfterTransactionStartAndCommit");
// the coordinated transaction id should now be available
- String id = getJDBCCoordinatedTransactionId("checkIdAfterTransactionStartAndCommit");
+ String id = getJDBCTransactionId("checkIdAfterTransactionStartAndCommit");
// we can only test for not null since we cannot predict the transaction id
- errorIfEqual("Coordinated transaction must not be null after transaction start", null, id);
+ errorIfEqual("Transaction id must not be null after transaction start.", null, id);
commitConnection();
- id = getJDBCCoordinatedTransactionId("checkIdAfterTransactionStartAndCommit");
- errorIfNotEqual("Coordinated transaction id must be null after commit.", null, id);
+ id = getJDBCTransactionId("checkIdAfterTransactionStartAndCommit");
+ errorIfNotEqual("Transaction id must be null after commit.", null, id);
}
/** Verify that after an ndb transaction is started the coordinated transaction id is not null
@@ -92,15 +160,16 @@ public class CoordinatedTransactionIdVar
*/
public void checkIdAfterTransactionStartAndRollback() {
getConnection();
+ setAutoCommit(connection, false);
// execute a query statement that will cause the server to start an ndb transaction
executeJDBCQuery("checkIdAfterTransactionStartAndRollback");
// the coordinated transaction id should now be available
- String id = getJDBCCoordinatedTransactionId("checkIdAfterTransactionStartAndRollback");
+ String id = getJDBCTransactionId("checkIdAfterTransactionStartAndRollback");
// we can only test for not null since we cannot predict the transaction id
- errorIfEqual("Coordinated transaction must not be null after transaction start", null, id);
+ errorIfEqual("Transaction must not be null after transaction start.", null, id);
rollbackConnection();
- id = getJDBCCoordinatedTransactionId("checkIdAfterTransactionStartAndRollback");
- errorIfNotEqual("Coordinated transaction id must be null after rollback.", null, id);
+ id = getJDBCTransactionId("checkIdAfterTransactionStartAndRollback");
+ errorIfNotEqual("Transaction id must be null after rollback.", null, id);
}
/** Execute a SQL query. Throw away the results. Keep the transaction open.
@@ -133,38 +202,60 @@ public class CoordinatedTransactionIdVar
}
}
- /** Set the coordinated_transaction_id variable in the server.
+ /** Set the variable in the server.
+ * @param where the context
+ * @param variableName the name of the server variable
* @param newId the id to set
*/
- protected void setJDBCCoordinatedTransactionId(String where, String newId) {
- if (newId == null) {
- fail(where + " test case error: coordinated transaction id must not be null.");
- }
+ protected void setJDBCVariable(String where, String variableName, String newId, int expectedErrorCode) {
try {
- String setSql = "set " + setTransactionIdVariableName + " = '" + newId + "'";
- PreparedStatement setCoordinatedTransactionIdStatement = connection.prepareStatement(setSql);
- boolean result = setCoordinatedTransactionIdStatement.execute();
- errorIfNotEqual(where + " set coordinated transaction id returned true.", false, result);
+ String setSql = "set " + variableName + " = ?";
+ PreparedStatement setVariableStatement = connection.prepareStatement(setSql);
+ if (newId == null) {
+ setVariableStatement.setNull(1, Types.VARCHAR);
+ } else {
+ setVariableStatement.setString(1, newId);
+ }
+ boolean resultNotUpdateCount = setVariableStatement.execute();
+ errorIfNotEqual(where + " set join transaction id returned true.", false, resultNotUpdateCount);
+ errorIfNotEqual(where + " set join transaction id failed to throw expected exception " +
+ expectedErrorCode + " for " + newId, expectedErrorCode, 0);
} catch (SQLException e) {
- error(where + " caught exception on set coordinated transaction id:", e);
+ int errorCode = e.getErrorCode();
+ errorIfNotEqual(where + " caught wrong exception on set coordinated transaction id:" +
+ " errorCode: " + errorCode + " SQLState: " + e.getSQLState(), expectedErrorCode, errorCode);
}
}
- /** Get the coordinated_transaction_id variable from the server.
+ /** Get the join_transaction_id variable from the server.
+ * @return the id from the server
+ */
+ protected String getJDBCJoinTransactionId(String where) {
+ String getId = "select " + joinTransactionIdVariableName;
+ String result = executeSelect(where, getId);
+ return result;
+ }
+
+ /** Get the join_transaction_id variable from the server.
* @return the id from the server
*/
- protected String getJDBCCoordinatedTransactionId(String where) {
- String getId = "select " + getTransactionIdVariableName;
+ protected String getJDBCTransactionId(String where) {
+ String getId = "select " + transactionIdVariableName;
+ String result = executeSelect(where, getId);
+ return result;
+ }
+
+ private String executeSelect(String where, String getId) {
String result = null;
try {
PreparedStatement getCoordinatedTransactionIdStatement = connection.prepareStatement(getId);
ResultSet rs = getCoordinatedTransactionIdStatement.executeQuery();
boolean hasResult = rs.next();
- errorIfNotEqual(where + " select coordinated transaction id returned false.", true, hasResult);
+ errorIfNotEqual(where + " select " + getId + " returned false.", true, hasResult);
result = rs.getString(1);
- if (getDebug()) System.out.println(where + " getJDBCCoordinatedTransactionId returns " + result);
+ if (getDebug()) System.out.println(where + " " + getId + " returns " + result);
} catch (SQLException e) {
- error(where + " caught exception on get coordinated transaction id.", e);
+ error(where + " caught exception on select " + getId + ".", e);
}
return result;
}
=== added file 'storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CoordinatedTransactionVisibilityTest.java'
--- a/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CoordinatedTransactionVisibilityTest.java 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-test/src/main/java/testsuite/clusterj/CoordinatedTransactionVisibilityTest.java 2010-12-16 17:03:32 +0000
@@ -0,0 +1,1437 @@
+/*
+ * Copyright (C) 2010 Sun Microsystems, Inc.
+ * All rights reserved. Use is subject to license terms.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; version 2 of the License.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+package testsuite.clusterj;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import testsuite.clusterj.model.AllPrimitives;
+import testsuite.clusterj.model.IdBase;
+
+import com.mysql.clusterj.Query;
+import com.mysql.clusterj.core.query.EqualPredicateImpl;
+import com.mysql.clusterj.core.spi.SessionSPI;
+import com.mysql.clusterj.query.PredicateOperand;
+import com.mysql.clusterj.query.QueryBuilder;
+import com.mysql.clusterj.query.QueryDomainType;
+
+
+/**
+ * Tests coordinated transaction methods interacting between sessions. The sessions can be
+ * any combination of: clusterj sessions, normal jdbc sessions, and clusterjdbc sessions.
+ * This test class defines interfaces that allow any of the three session types to be tested
+ * against the others. The actual tests to be run are defined in the subclasses specific
+ * to each project.
+ *
+ * While it is possible to test the interactions of sessions that are not joined to the
+ * same transaction, these tests usually result in deadlock timeouts and are not useful
+ * for the purpose of joined transaction testing.
+ *
+ * The following are representative tests on sessions T1 and T2. This list is not
+ * exhaustive and additional tests may be written to cover other interesting cases.
+ *
+ * 1. Create an instance in T1. Join the same transaction in T2.
+ * Verify that you can find the new instance by primary key, unique key, index scan, and table scan in T1.
+ * Verify that you can find the new instance by primary key, unique key, index scan, and table scan in T2.
+ *
+ * 2. Delete an instance in T1. Join the same transaction in T2.
+ * Verify that you cannot find it by primary key, unique key, index scan, and table scan in T1.
+ * Verify that you cannot find it by primary key, unique key, index scan, and table scan in T2.
+ *
+ * 3. Update an instance in T1. Join the same transaction in T2.
+ * Verify that you can find it by primary key, unique key, index scan, and table scan in T1.
+ * Verify that you can find it by primary key, unique key, index scan, and table scan in T2.
+ *
+ * 4.
+ */
+public class CoordinatedTransactionVisibilityTest extends AbstractClusterJModelTest{
+
+ protected static final String SET_TRANSACTION_ID_VARIABLE_NAME = "@@ndb_join_transaction_id";
+
+ protected static final String GET_TRANSACTION_ID_VARIABLE_NAME = "@@ndb_transaction_id";
+
+ /** The id (and non-id values) for the inserted row from transaction 1 */
+ private int INSERT_ID1 = 101;
+
+ /** The id (and non-id values) for the inserted row from transaction 2 */
+ private int INSERT_ID2 = 102;
+
+ /** The id for the deleted row for transaction 1 */
+ private int DELETE_ID1 = 1;
+
+ /** The id for the deleted row for transaction 2 */
+ private int DELETE_ID2 = 2;
+
+ /** The id for the updated row */
+ private int UPDATE_ID = 1;
+
+ /** The id for the row to find*/
+ private int FIND_ID = 1;
+
+ /** The id values for the updated row */
+ private int UPDATED_VALUE = 10;
+
+ /** The column names */
+ private String[] columnNames = new String[] {
+ "id",
+ "int_not_null_hash",
+ "int_not_null_btree",
+ "int_not_null_both",
+ "int_not_null_none",
+ "int_null_hash",
+ "int_null_btree",
+ "int_null_both",
+ "int_null_none",
+
+ "byte_not_null_hash",
+ "byte_not_null_btree",
+ "byte_not_null_both",
+ "byte_not_null_none",
+ "byte_null_hash",
+ "byte_null_btree",
+ "byte_null_both",
+ "byte_null_none",
+
+ "short_not_null_hash",
+ "short_not_null_btree",
+ "short_not_null_both",
+ "short_not_null_none",
+ "short_null_hash",
+ "short_null_btree",
+ "short_null_both",
+ "short_null_none",
+
+ "long_not_null_hash",
+ "long_not_null_btree",
+ "long_not_null_both",
+ "long_not_null_none",
+ "long_null_hash",
+ "long_null_btree",
+ "long_null_both",
+ "long_null_none"
+
+ };
+
+ private int NUMBER_OF_INSTANCES = 3;
+
+ private boolean testFind = true;
+ private boolean testLookup = true;
+ private boolean testIndexScan = true;
+ private boolean testTableScan = true;
+
+ public void localSetUp() {
+ super.localSetUp();
+ session = sessionFactory.getSession();
+ createAllPrimitivesInstances(NUMBER_OF_INSTANCES);
+ session.deletePersistentAll(AllPrimitives.class);
+ session.makePersistentAll(instances);
+ }
+
+ @Override
+ public boolean getDebug() {
+ return true;
+ }
+
+ /** Subclasses must override this method to provide the name of the table for the test */
+ protected String getTableName() {
+ return "allprimitives";
+ }
+
+ public void clusterjVersusClusterj() {
+ String where = "clusterjVersusClusterj";
+ AbstractSession session1 = newClusterJSession(newSession());
+ if (getDebug()) System.out.println("session1: " + session1);
+ AbstractSession session2 = newClusterJSession(newSession());
+ if (getDebug()) System.out.println("session2: " + session2);
+ runTests(where, session1, session2);
+ }
+
+ public void jdbcVersusJdbc() {
+ String where = "jdbcVersusJdbc";
+ AbstractSession session1 = newJdbcSession(newConnection(where));
+ if (getDebug()) System.out.println("session1: " + session1);
+ AbstractSession session2 = newJdbcSession(newConnection(where));
+ if (getDebug()) System.out.println("session2: " + session2);
+ runTests(where, session1, session2);
+ }
+
+ public void clusterjVersusJdbc() {
+ String where = "clusterjVersusJdbc";
+ AbstractSession session1 = newClusterJSession(newSession());
+ if (getDebug()) System.out.println("session1: " + session1);
+ AbstractSession session2 = newJdbcSession(newConnection(where));
+ if (getDebug()) System.out.println("session2: " + session2);
+ runTests(where, session1, session2);
+ }
+
+ public void jdbcVersusClusterj() {
+ String where = "jdbcVersusClusterj";
+ AbstractSession session1 = newJdbcSession(newConnection(where));
+ if (getDebug()) System.out.println("session2: " + session1);
+ AbstractSession session2 = newClusterJSession(newSession());
+ if (getDebug()) System.out.println("session1: " + session2);
+ runTests(where, session1, session2);
+ }
+
+ protected void runTests(String where, AbstractSession session1, AbstractSession session2) {
+ findVersusFind(where, session1, session2);
+ findVersusLookup(where, session1, session2);
+ findVersusIndexScan(where, session1, session2);
+ findVersusTableScan(where, session1, session2);
+
+ insertVersusFind(where, session1, session2);
+ insertVersusLookup(where, session1, session2);
+ insertVersusIndexScan(where, session1, session2);
+ insertVersusTableScan(where, session1, session2);
+
+ insertInsertVersusFind(where, session1, session2);
+ insertInsertVersusLookup(where, session1, session2);
+ insertInsertVersusIndexScan(where, session1, session2);
+ insertInsertVersusTableScan(where, session1, session2);
+
+ deleteVersusFind(where, session1, session2);
+ deleteVersusLookup(where, session1, session2);
+ deleteVersusIndexScan(where, session1, session2);
+ deleteVersusTableScan(where, session1, session2);
+
+ deleteDeleteVersusFind(where, session1, session2);
+ deleteDeleteVersusLookup(where, session1, session2);
+ deleteDeleteVersusIndexScan(where, session1, session2);
+ deleteDeleteVersusTableScan(where, session1, session2);
+
+ // cannot run update versus find because there's no API to change primary key in clusterj
+ updateVersusLookup(where, session1, session2);
+ updateVersusIndexScan(where, session1, session2);
+ updateVersusTableScan(where, session1, session2);
+
+ indexScanVersusIndexScan(where, session1, session2);
+ indexScanVersusTableScan(where, session1, session2);
+ tableScanVersusIndexScan(where, session1, session2);
+ tableScanVersusTableScan(where, session1, session2);
+
+ indexScanVersusInsert(where, session1, session2);
+ indexScanVersusDelete(where, session1, session2);
+ indexScanVersusUpdate(where, session1, session2);
+
+ tableScanVersusInsert(where, session1, session2);
+ tableScanVersusDelete(where, session1, session2);
+ tableScanVersusUpdate(where, session1, session2);
+
+ }
+
+ protected void findVersusFind(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " findVersusFind";
+ try {
+ session1.begin(now + " original transaction");
+ session1.find(now + " original transaction", FIND_ID, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.find(now + " joined transaction", FIND_ID, true);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void findVersusLookup(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " findVersusLookup";
+ try {
+ session1.begin(now + " original transaction");
+ session1.find(now + " original transaction", FIND_ID, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.lookup(now + " joined transaction", FIND_ID, true);
+ session1.commit(now);
+ session2.commit(now);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void findVersusIndexScan(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " findVersusIndexScan";
+ try {
+ session1.begin(now + " original transaction");
+ session1.find(now + " original transaction", FIND_ID, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.indexScan(now + " joined transaction", FIND_ID, true);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void findVersusTableScan(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " findVersusTableScan";
+ try {
+ session1.begin(now + " original transaction");
+ session1.find(now + " original transaction", FIND_ID, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.tableScan(now + " joined transaction", FIND_ID, true);
+ session1.commit(now);
+ session2.commit(now);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void insertVersusFind(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " insertVersusFind";
+ try {
+ session1.begin(now + " original transaction");
+ session1.insert(now, INSERT_ID1);
+ session1.find(now + " original transaction", INSERT_ID1, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.find(now + " joined transaction", INSERT_ID1, true);
+ session1.commit(now);
+ session2.commit(now);
+ session1.delete(now, INSERT_ID1);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void insertVersusLookup(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " insertVersusLookup";
+ try {
+ session1.begin(now + " original transaction");
+ session1.insert(now, INSERT_ID1);
+ session1.lookup(now + " original transaction", INSERT_ID1, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.lookup(now + " joined transaction", INSERT_ID1, true);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void insertVersusIndexScan(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " insertVersusIndexScan";
+ try {
+ session1.begin(now + " original transaction");
+ session1.insert(now, INSERT_ID1);
+ session1.indexScan(now + " original transaction", INSERT_ID1, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.indexScan(now + " joined transaction", INSERT_ID1, true);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void insertVersusTableScan(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " insertVersusTableScan";
+ try {
+ session1.begin(now + " original transaction");
+ session1.insert(now, INSERT_ID1);
+ session1.tableScan(now + " original transaction", INSERT_ID1, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.tableScan(now + " joined transaction", INSERT_ID1, true);
+ session1.commit(now);
+ session2.commit(now);
+ session1.delete(now, INSERT_ID1);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void insertInsertVersusFind(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " insertInsertVersusFind";
+ try {
+ session1.begin(now + " original transaction");
+ session1.insert(now, INSERT_ID1);
+ session1.find(now + " original transaction", INSERT_ID1, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.find(now + " joined transaction", INSERT_ID1, true);
+ session2.insert(now, INSERT_ID2);
+ session2.find(now + " joined transaction", INSERT_ID2, true);
+ session1.find(now + " original transaction", INSERT_ID2, true);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void insertInsertVersusLookup(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " insertInsertVersusLookup";
+ try {
+ session1.begin(now + " original transaction");
+ session1.insert(now, INSERT_ID1);
+ session1.lookup(now + " original transaction", INSERT_ID1, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.lookup(now + " joined transaction", INSERT_ID1, true);
+ session2.insert(now, INSERT_ID2);
+ session2.lookup(now + " joined transaction", INSERT_ID2, true);
+ session1.lookup(now + " original transaction", INSERT_ID2, true);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void insertInsertVersusIndexScan(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " insertInsertVersusIndexScan";
+ try {
+ session1.begin(now + " original transaction");
+ session1.insert(now, INSERT_ID1);
+ session1.indexScan(now + " original transaction", INSERT_ID1, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.indexScan(now + " joined transaction", INSERT_ID1, true);
+ session2.insert(now, INSERT_ID2);
+ session2.indexScan(now + " joined transaction", INSERT_ID2, true);
+ session1.indexScan(now + " original transaction", INSERT_ID2, true);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void insertInsertVersusTableScan(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " insertInsertVersusTableScan";
+ try {
+ session1.begin(now + " original transaction");
+ session1.insert(now, INSERT_ID1);
+ session1.tableScan(now + " original transaction", INSERT_ID1, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.tableScan(now + " joined transaction", INSERT_ID1, true);
+ session2.insert(now, INSERT_ID2);
+ session2.tableScan(now + " joined transaction", INSERT_ID2, true);
+ session1.tableScan(now + " original transaction", INSERT_ID2, true);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void updateVersusLookup(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " updateVersusLookup";
+ try {
+ session1.begin(now + " original transaction");
+ session1.update(now, UPDATE_ID, UPDATED_VALUE);
+ session1.lookup(now + " original transaction", UPDATE_ID, false);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.lookup(now + " joined transaction", UPDATE_ID, false);
+ session1.commit(now);
+ session2.commit(now);
+ session1.update(now, UPDATE_ID, UPDATE_ID);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void updateVersusIndexScan(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " updateVersusIndexScan";
+ try {
+ session1.begin(now + " original transaction");
+ session1.update(now, UPDATE_ID, UPDATED_VALUE);
+ session1.indexScan(now + " original transaction", UPDATE_ID, false);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.indexScan(now + " joined transaction", UPDATE_ID, false);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void updateVersusTableScan(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " updateVersusTableScan";
+ try {
+ session1.begin(now + " original transaction");
+ session1.update(now, UPDATE_ID, UPDATED_VALUE);
+ session1.tableScan(now + " original transaction", UPDATE_ID, false);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.tableScan(now + " joined transaction", UPDATE_ID, false);
+ session1.commit(now);
+ session2.commit(now);
+ session1.update(now, UPDATE_ID, UPDATE_ID);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void deleteVersusFind(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " deleteVersusFind";
+ try {
+ session1.begin(now + " original transaction");
+ session1.delete(now, DELETE_ID1);
+ session1.find(now + " original transaction", DELETE_ID1, false);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.find(now + " joined transaction", DELETE_ID1, false);
+ session1.commit(now);
+ session2.commit(now);
+ session1.insert(now, DELETE_ID1);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void deleteVersusLookup(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " deleteVersusLookup";
+ try {
+ session1.begin(now + " original transaction");
+ session1.delete(now, DELETE_ID1);
+ session1.lookup(now + " original transaction", DELETE_ID1, false);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.lookup(now + " joined transaction", DELETE_ID1, false);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void deleteVersusIndexScan(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " deleteVersusIndexScan";
+ try {
+ session1.begin(now + " original transaction");
+ session1.delete(now, DELETE_ID1);
+ session1.indexScan(now + " original transaction", DELETE_ID1, false);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.indexScan(now + " joined transaction", DELETE_ID1, false);
+ session1.commit(now);
+ session2.commit(now);
+ session1.insert(now, DELETE_ID1);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void deleteVersusTableScan(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " deleteVersusTableScan";
+ try {
+ session1.begin(now + " original transaction");
+ session1.delete(now, DELETE_ID1);
+ session1.tableScan(now + " original transaction", DELETE_ID1, false);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.tableScan(now + " joined transaction", DELETE_ID1, false);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void deleteDeleteVersusFind(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " deleteDeleteVersusFind";
+ try {
+ session1.begin(now + " original transaction");
+ session1.delete(now, DELETE_ID1);
+ session1.find(now + " original transaction", DELETE_ID1, false);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.find(now + " joined transaction", DELETE_ID1, false);
+ session2.delete(now, DELETE_ID2);
+ session2.find(now + " joined transaction", DELETE_ID2, false);
+ session1.find(now + " original transaction", DELETE_ID2, false);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void deleteDeleteVersusLookup(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " deleteDeleteVersusLookup";
+ try {
+ session1.begin(now + " original transaction");
+ session1.delete(now, DELETE_ID1);
+ session1.lookup(now + " original transaction", DELETE_ID1, false);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.lookup(now + " joined transaction", DELETE_ID1, false);
+ session2.delete(now, DELETE_ID2);
+ session2.lookup(now + " joined transaction", DELETE_ID2, false);
+ session1.lookup(now + " original transaction", DELETE_ID2, false);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void deleteDeleteVersusIndexScan(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " deleteDeleteVersusIndexScan";
+ try {
+ session1.begin(now + " original transaction");
+ session1.delete(now, DELETE_ID1);
+ session1.indexScan(now + " original transaction", DELETE_ID1, false);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.indexScan(now + " joined transaction", DELETE_ID1, false);
+ session2.delete(now, DELETE_ID2);
+ session2.indexScan(now + " joined transaction", DELETE_ID2, false);
+ session1.indexScan(now + " original transaction", DELETE_ID2, false);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void deleteDeleteVersusTableScan(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " deleteDeleteVersusTableScan";
+ try {
+ session1.begin(now + " original transaction");
+ session1.delete(now, DELETE_ID1);
+ session1.tableScan(now + " original transaction", DELETE_ID1, false);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.tableScan(now + " joined transaction", DELETE_ID1, false);
+ session2.delete(now, DELETE_ID2);
+ session2.tableScan(now + " joined transaction", DELETE_ID2, false);
+ session1.tableScan(now + " original transaction", DELETE_ID2, false);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void indexScanVersusIndexScan(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " indexScanVersusIndexScan";
+ try {
+ session1.begin(now + " original transaction");
+ session1.indexScan(now + " original transaction", FIND_ID, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.indexScan(now + " joined transaction", FIND_ID, true);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void indexScanVersusTableScan(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " indexScanVersusTableScan";
+ try {
+ session1.begin(now + " original transaction");
+ session1.indexScan(now + " original transaction", FIND_ID, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.tableScan(now + " joined transaction", FIND_ID, true);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void tableScanVersusIndexScan(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " tableScanVersusIndexScan";
+ try {
+ session1.begin(now + " original transaction");
+ session1.tableScan(now + " original transaction", FIND_ID, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.indexScan(now + " joined transaction", FIND_ID, true);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void tableScanVersusTableScan(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " tableScanVersusTableScan";
+ try {
+ session1.begin(now + " original transaction");
+ session1.tableScan(now + " original transaction", FIND_ID, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.tableScan(now + " joined transaction", FIND_ID, true);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void indexScanVersusInsert(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " indexScanVersusInsert";
+ try {
+ session1.begin(now + " original transaction");
+ session1.indexScan(now + " original transaction", FIND_ID, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.insert(now + " joined transaction", INSERT_ID2);
+ session1.find(now + " original transaction", INSERT_ID2, true);
+ session2.find(now + " joined transaction", INSERT_ID2, true);
+ session2.commit(now + " joined transaction");
+ session1.commit(now + " original transaction");
+ session2.delete(now + " joined transaction", INSERT_ID2);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void indexScanVersusDelete(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " indexScanVersusDelete";
+ try {
+ session1.begin(now + " original transaction");
+ session1.indexScan(now + " original transaction", FIND_ID, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.delete(now + " joined transaction", DELETE_ID2);
+ session1.lookup(now + " original transaction", DELETE_ID2, false);
+ session2.lookup(now + " joined transaction", DELETE_ID2, false);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void indexScanVersusUpdate(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " indexScanVersusUpdate";
+ try {
+ session1.begin(now + " original transaction");
+ session1.indexScan(now + " original transaction", FIND_ID, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.update(now + " joined transaction", UPDATE_ID, UPDATED_VALUE);
+ session1.lookup(now + " original transaction", UPDATE_ID, false);
+ session2.lookup(now + " joined transaction", UPDATE_ID, false);
+ session2.commit(now + " joined transaction");
+ session1.commit(now + " original transaction");
+ session2.update(now + " joined transaction", UPDATE_ID, UPDATE_ID);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void tableScanVersusInsert(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " tableScanVersusInsert";
+ try {
+ session1.begin(now + " original transaction");
+ session1.tableScan(now + " original transaction", FIND_ID, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.insert(now + " joined transaction", INSERT_ID2);
+ session1.find(now + " original transaction", INSERT_ID2, true);
+ session2.find(now + " joined transaction", INSERT_ID2, true);
+ session2.commit(now + " joined transaction");
+ session1.commit(now + " original transaction");
+ session2.delete(now + " joined transaction", INSERT_ID2);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void tableScanVersusDelete(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " tableScanVersusDelete";
+ try {
+ session1.begin(now + " original transaction");
+ session1.tableScan(now + " original transaction", FIND_ID, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.delete(now + " joined transaction", DELETE_ID2);
+ session1.lookup(now + " original transaction", DELETE_ID2, false);
+ session2.lookup(now + " joined transaction", DELETE_ID2, false);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ protected void tableScanVersusUpdate(String where, AbstractSession session1, AbstractSession session2) {
+ String now = where + " tableScanVersusUpdate";
+ try {
+ session1.begin(now + " original transaction");
+ session1.tableScan(now + " original transaction", FIND_ID, true);
+ String transactionId = session1.getTransactionId(now);
+ if (getDebug()) System.out.println(now + " transactionId: " + transactionId);
+ session2.begin(now + " joined transaction", transactionId);
+ session2.update(now + " joined transaction", UPDATE_ID, UPDATED_VALUE);
+ session1.lookup(now + " original transaction", UPDATE_ID, false);
+ session2.lookup(now + " joined transaction", UPDATE_ID, false);
+ session2.commit(now + " joined transaction");
+ session1.commit(now + " original transaction");
+ session2.update(now + " joined transaction", UPDATE_ID, UPDATE_ID);
+ } catch (Exception ex) {
+ error(now, ex);
+ } finally {
+ session1.rollback(now);
+ session2.rollback(now);
+ }
+ }
+
+ public AbstractSession newClusterJSession(final SessionSPI session) {
+ return new AbstractSession() {
+
+ public void begin(String where) throws Exception {
+ session.begin();
+ }
+
+ public void begin(String where, String txid) throws Exception {
+ session.setCoordinatedTransactionId(txid);
+ session.begin();
+ }
+
+ public void delete(String where, int value) throws Exception {
+ session.deletePersistent(AllPrimitives.class, value);
+ session.flush();
+ }
+
+ public void find(String where, int value, boolean expectSuccess) throws Exception {
+ AllPrimitives result = session.find(AllPrimitives.class, value);
+ checkResult(where + " find", value, expectSuccess, result);
+ }
+
+ public String getTransactionId(String where) throws Exception {
+ return session.getCoordinatedTransactionId();
+ }
+
+ public void indexScan(String where, int value, boolean expectSuccess) throws Exception {
+ List<AllPrimitives> result = query(where + " index scan", AllPrimitives.class, "int_not_null_btree", value);
+ if (getDebug()) System.out.println(where + " index scan " + result.size());
+ checkResults(where + " index scan", value, expectSuccess, result);
+ }
+
+ public void insert(String where, int value) throws Exception {
+ AllPrimitives instance = createAllPrimitiveInstance(session, value);
+ session.makePersistent(instance);
+ session.flush();
+ }
+
+ public void lookup(String where, int value, boolean expectSuccess) throws Exception {
+ List<AllPrimitives> result = query(where + " lookup", AllPrimitives.class, "int_not_null_hash", value);
+ checkResults(where + " lookup", value, expectSuccess, result);
+ }
+
+ public void commit(String where) {
+ if (session.currentTransaction().isActive()) session.commit();
+ }
+
+ public void rollback(String where) {
+ if (session.currentTransaction().isActive()) session.rollback();
+ }
+
+ public void tableScan(String where, int value, boolean expectSuccess) throws Exception {
+ List<AllPrimitives> result = query(where + " table scan", AllPrimitives.class, "int_not_null_none", value);
+ if (getDebug()) System.out.println(where + " table scan " + result.size());
+ checkResults(where + " table scan", value, expectSuccess, result);
+ }
+
+ public void update(String where, int value, int updatedValue) throws Exception {
+ AllPrimitives instance = session.find(AllPrimitives.class, value);
+ initialize(instance, updatedValue);
+ // restore id so update works
+ instance.setId(value);
+ session.updatePersistent(instance);
+ session.flush();
+ }
+
+ private <T extends IdBase> List<T> query(String where, Class<T> cls, String fieldName, int value) {
+ List<T> results = null;
+ QueryBuilder builder = session.getQueryBuilder();
+ QueryDomainType<T> queryDomainType = builder.createQueryDefinition(cls);
+ PredicateOperand field = queryDomainType.get(fieldName);
+ PredicateOperand parameter = queryDomainType.param(fieldName);
+ queryDomainType.where(field.equal(parameter));
+ Query<T> query = session.createQuery(queryDomainType);
+ query.setParameter(fieldName, value);
+ results = query.getResultList();
+ return results;
+ }
+
+ private <T extends IdBase> void checkResults(
+ String where, int value, boolean expectSuccess, List<T> results) {
+ int size = results.size();
+ if (size > 1) {
+ error(where + " returned wrong number of results: " + size);
+ return;
+ }
+ T result = null;
+ if (size == 1) {
+ result = results.get(0);
+ }
+ checkResult(where, value, expectSuccess, result);
+ }
+
+ private <T extends IdBase> void checkResult(
+ String where, int value, boolean expectSuccess, T result) {
+ if (result != null) {
+ int actual = result.getId();
+ if (expectSuccess) {
+ if (actual != value) {
+ error(where + " returned wrong row; expected " + value + " got " + actual + ".");
+ } else if (getDebug()) error (where + " CORRECTLY RETURNED ROW " + value + ".");
+ } else {
+ error(where + " returned a row that was unexpected with value " + value + ".");
+ }
+ } else {
+ if (expectSuccess) {
+ error(where + " failed to find an expected row with id " + value + ".");
+ } else if (getDebug()) error (where + " CORRECTLY DIDN'T RETURN ROW " + value + ".");
+ }
+ }
+
+ };
+ }
+
+ /** AbstractSession to support Jdbc. This session does not automatically
+ * use coordinated transactions. Set transaction id uses the mysql set command
+ * to set the transaction id.
+ * @param connection the jdbc connection to use
+ */
+ public AbstractSession newJdbcSession(final Connection connection) {
+ if (getDebug()) System.out.println("Connection: " + connection);
+ return new AbstractSession() {
+
+ private final String SQL_SHOW_WARNINGS = "SHOW WARNINGS";
+
+ public void begin(String where) throws Exception {
+ connection.setAutoCommit(false);
+ }
+
+ public void delete(String where, int value) throws Exception {
+ deleteBySingleColumn(where + " delete", connection, value, true);
+ }
+
+ public void find(String where, int value, boolean expectSuccess) throws Exception {
+ queryBySingleColumn(where + " find", connection, "id", "=", value, expectSuccess);
+ }
+
+ public void indexScan(String where, int value, boolean expectSuccess) throws Exception {
+ queryBySingleColumn(where + " indexScan", connection, "int_not_null_btree", "=", value, expectSuccess);
+ }
+
+ public void insert(String where, int value) throws Exception {
+ Integer[] values = new Integer[columnNames.length];
+ for (int i = 0; i < columnNames.length; ++i) {
+ values[i] = value;
+ }
+ insertJDBC(where + " insert", connection, columnNames, values);
+ }
+
+ public void lookup(String where, int value, boolean expectSuccess) throws Exception {
+ queryBySingleColumn(where + " lookup", connection, "int_not_null_hash", "=", value, expectSuccess);
+ }
+
+ public void commit(String where) {
+ try {
+ if (!connection.isClosed() && !connection.getAutoCommit()) {
+ connection.commit();
+ }
+ } catch (SQLException e) {
+ error(where + " SQLException from commit: " + e.getMessage());
+ } finally {
+ try {
+ connection.setAutoCommit(true);
+ } catch (SQLException e) {
+ // ignore; can't fail ;-)
+ }
+ }
+
+ }
+
+ public void rollback(String where) {
+ try {
+ if (!connection.isClosed() && !connection.getAutoCommit()) {
+ connection.rollback();
+ }
+ } catch (SQLException e) {
+ error(where + " SQLException from rollback: " + e.getMessage());
+ } finally {
+ try {
+ connection.setAutoCommit(true);
+ } catch (SQLException e) {
+ // ignore; can't fail ;-)
+ }
+ }
+ }
+
+ public void tableScan(String where, int value, boolean expectSuccess) throws Exception {
+ queryBySingleColumn(where + " tableScan", connection, "int_not_null_none", "=", value, expectSuccess);
+ }
+
+ public void begin(String where, String txid) throws Exception {
+ try {
+ connection.setAutoCommit(false);
+ String sql = "set " + SET_TRANSACTION_ID_VARIABLE_NAME + " = ?";
+ if (getDebug()) System.out.println(where + " sql: " + sql + " with parameter " + txid);
+ PreparedStatement preparedStatement = connection.prepareStatement(sql);
+ preparedStatement.setString(1, txid);
+ int result = preparedStatement.executeUpdate();
+ if (result != 0) {
+ error(where + " setTransactionId failed to return any results.");
+ }
+ } catch (SQLException e) {
+ // get more information with SHOW WARNINGS
+ PreparedStatement preparedStatement = connection.prepareStatement(SQL_SHOW_WARNINGS);
+ ResultSet resultSet = preparedStatement.executeQuery();
+ String warnings = "";
+ while (resultSet.next()) {
+ warnings += " level: " + resultSet.getString(1)
+ + " code: " + resultSet.getString(2)
+ + " message: " + resultSet.getString(3)
+ + "\n";
+ error(where + " " + warnings);
+ }
+ throw new RuntimeException(where + " begin with txid: " + txid + " " + e.getMessage() + warnings);
+ }
+ }
+
+ public String getTransactionId(String where) throws Exception {
+ PreparedStatement preparedStatement = connection.prepareStatement("select " + GET_TRANSACTION_ID_VARIABLE_NAME);
+ ResultSet rs = preparedStatement.executeQuery();
+ if (!rs.next()) {
+ error(where + " getTransactionId failed to return any results.");
+ }
+ String result = rs.getString(1);
+ if (getDebug()) System.out.println(where + " getTransactionId returned " + result);
+ return result;
+ }
+
+ public void update(String where, int key, int updatedValue) throws Exception {
+ // don't update id column which is the first column name in columnNames
+ Integer[] values = new Integer[columnNames.length];
+ for (int i = 0; i < columnNames.length; ++i) {
+ values[i] = updatedValue;
+ }
+ updateJDBC(where + " insert", connection, key, columnNames, values);
+ }
+
+ };
+ }
+
+
+ /** Abstract clusterj sessions, jdbc connections, and clusterjdbc connections.
+ * All operations are defined in this interface. Implementations should result in
+ * the same operations being performed on the underlying ndb transaction.
+ * find is a primary key operation.
+ * insert is a primary key operation.
+ * lookup is a unique key operation.
+ * delete is a primary key operation.
+ * indexScan is an index scan operation.
+ * tableScan is a table scan operation.
+ */
+ protected interface AbstractSession {
+
+ /** Initialize this session. Set autocommit to false. Begin a transaction.
+ */
+ void begin(String where) throws Exception;
+
+ /** Initialize this session. Set autocommit to false. Begin a transaction
+ * with the specified txid.
+ * @param txid the transaction id for the new transaction
+ */
+ void begin(String where, String txid) throws Exception;
+
+ /** Get the transaction id for this session.
+ * @param where the context
+ * @return the txid
+ */
+ String getTransactionId(String where) throws Exception;
+
+ /** Insert a new row or instance as specified by the INSERT_ID variable.
+ * @param where the context
+ * @param id the row to delete
+ */
+ void insert(String where, int value) throws Exception;
+
+ /** Delete a row or instance as specified by the DELETE_ID variable.
+ * @param where the context
+ * @param value the row to delete
+ */
+ void delete(String where, int value) throws Exception;
+
+ /** Update a row or instance as specified by the UPDATE_ID variable.
+ * @param where the context
+ * @param value the row to update
+ * @param updatedValue the new value for all fields
+ */
+ void update(String where, int value, int updatedValue) throws Exception;
+
+ /** Find a row by primary key specified by the FIND_ID variable.
+ * @param where the context
+ * @param value the row to find
+ * @param expectSuccess whether the row is expected to be found
+ */
+ void find(String where, int value, boolean expectSuccess) throws Exception;
+
+ /** Look up a row by unique key specified by the FIND_ID variable.
+ * @param where the context
+ * @param value the row to look up by unique key
+ * @param expectSuccess whether the row is expected to be found
+ */
+ void lookup(String where, int value, boolean expectSuccess) throws Exception;
+
+ /** Find a row by index scan specified by the FIND_ID variable.
+ * @param where the context
+ * @param id the row to find by index scan
+ * @param expectSuccess whether the row is expected to be found
+ */
+ void indexScan(String where, int value, boolean expectSuccess) throws Exception;
+
+ /** Find a row by table scan specified by the FIND_ID variable.
+ * @param where the context
+ * @param id the row to find by table scan
+ * @param expectSuccess whether the row is expected to be found
+ */
+ void tableScan(String where, int value, boolean expectSuccess) throws Exception;
+
+ /** Commit this session.
+ * @param where the context
+ */
+ void commit(String where);
+
+ /** Clean up this session. If a transaction is active, rollback.
+ * @param where the context
+ */
+ void rollback(String where);
+
+ }
+
+
+ private SessionSPI newSession() {
+ SessionSPI result = (SessionSPI) sessionFactory.getSession();
+ return result;
+ }
+
+ private void insertJDBC(String where, Connection connection, String[] columnNames, Integer[] values) {
+ if (columnNames.length != values.length) {
+ throw new RuntimeException(where + " Error: columnNames and values must have the same length.");
+ }
+ String now = where + " insert";
+ int result = 0;
+ try {
+ StringBuilder buffer = new StringBuilder(" INSERT INTO ");
+ buffer.append(getTableName());
+ buffer.append("(");
+ String separator = "";
+ StringBuilder buffer2 = new StringBuilder("VALUES (");
+ for (String columnName: columnNames) {
+ buffer.append(separator);
+ buffer.append(columnName);
+ buffer2.append(separator);
+ buffer2.append("?");
+ separator = ", ";
+ }
+ buffer.append(")");
+ buffer2.append(")");
+ buffer.append(buffer2);
+ String sql = buffer.toString();
+ if (debug) System.out.println(where + sql);
+ PreparedStatement preparedStatement = connection.prepareStatement(sql);
+ int i = 0;
+ for (Integer value: values) {
+ preparedStatement.setInt(++i, value);
+ }
+ result = preparedStatement.executeUpdate();
+ } catch (SQLException e) {
+ if (getDebug()) e.printStackTrace();
+ throw new RuntimeException(now + " Error: insert threw SQLException "
+ + e.getMessage());
+ }
+ if (result != 1) {
+ throw new RuntimeException(now + " Error: insert failed to insert row; insert returned " + result);
+ }
+ }
+
+ private void updateJDBC(String where, Connection connection, int key, String[] columnNames, Integer[] values) {
+ if (columnNames.length != values.length) {
+ throw new RuntimeException(where + " Error: columnNames and values must have the same length.");
+ }
+ // the first name is the id column which gets put at the end
+ String now = where + " update";
+ int result = 0;
+ try {
+ StringBuilder buffer = new StringBuilder(" UPDATE ");
+ buffer.append(getTableName());
+ buffer.append(" SET ");
+ String separator = "";
+ // skip past id column name
+ for (int i = 1; i < columnNames.length; ++i) {
+ buffer.append(separator);
+ buffer.append(columnNames[i]);
+ buffer.append(" = ?");
+ separator = ", ";
+ }
+ buffer.append(" WHERE ");
+ buffer.append(columnNames[0]);
+ buffer.append(" = ?");
+ String sql = buffer.toString();
+ if (debug) System.out.println(where + sql);
+ PreparedStatement preparedStatement = connection.prepareStatement(sql);
+ int i = 0;
+ for (Integer value: values) {
+ preparedStatement.setInt(++i, value);
+ }
+ preparedStatement.setInt(i, key);
+ result = preparedStatement.executeUpdate();
+ } catch (SQLException e) {
+ if (getDebug()) e.printStackTrace();
+ throw new RuntimeException(now + " Error: update threw SQLException "
+ + e.getMessage());
+ }
+ if (result != 1) {
+ throw new RuntimeException(now + " Error: update failed to update row; update returned " + result);
+ }
+ }
+
+ /**
+ * @param connection
+ * @param id
+ * @param expectSuccess
+ * @param where
+ */
+ private void queryBySingleColumn(String where, Connection connection,
+ String columnName, String operator, int value,
+ boolean expectSuccess) {
+ String now = where + " query " + operator + " by column " + columnName + " with value " + value;
+ try {
+ // SELECT id from allprimitives where id = ?
+ StringBuilder buffer = new StringBuilder("SELECT id FROM ");
+ buffer.append(getTableName());
+ buffer.append(" WHERE ");
+ buffer.append(columnName);
+ buffer.append(" ");
+ buffer.append(operator);
+ buffer.append(" ?");
+ String sql = buffer.toString();
+ if (debug) System.out.println(sql);
+ PreparedStatement preparedStatement = connection.prepareStatement(sql);
+ preparedStatement.setInt(1, value);
+ ResultSet rs = preparedStatement.executeQuery();
+ if (rs.next()) {
+ int idColumn = rs.getInt(1);
+ if (!expectSuccess) {
+ error(now + " incorrectly found instance with id " + idColumn + ".");
+ } else {
+ if (idColumn != value) {
+ error(now + " found wrong instance with id " + idColumn + ".");
+ } else if (getDebug()) error (where + " CORRECTLY RETURNED ROW " + value + ".");
+ }
+ } else {
+ if (expectSuccess) {
+ error(now + " failed to find instance.");
+ } else if (getDebug()) error (where + " CORRECTLY DIDN'T RETURN ROW " + value + ".");
+ }
+ } catch (Exception e) {
+ error(now + " Exception " + e.getMessage());
+ session.currentTransaction().setRollbackOnly();
+ }
+ }
+
+ /**
+ * @param connection
+ * @param id
+ * @param expectSuccess
+ * @param where
+ */
+ private void deleteBySingleColumn(String where, Connection connection,
+ int value, boolean expectSuccess) {
+ String now = where + " delete where id = " + value;
+ try {
+ // DELETE from allprimitives where id = ?
+ StringBuilder buffer = new StringBuilder("DELETE FROM ");
+ buffer.append(getTableName());
+ buffer.append(" WHERE id = ?");
+ String sql = buffer.toString();
+ if (debug) System.out.println(sql);
+ PreparedStatement preparedStatement = connection.prepareStatement(sql);
+ preparedStatement.setInt(1, value);
+ int result = preparedStatement.executeUpdate();
+ if (result == 1) {
+ if (!expectSuccess) {
+ error(now + " incorrectly deleted instance.");
+ }
+ } else {
+ if (expectSuccess) {
+ error(now + " failed to delete instance.");
+ }
+ }
+ } catch (Exception e) {
+ error(now + " resulted in Exception " + e.getMessage());
+ session.currentTransaction().setRollbackOnly();
+ }
+ }
+
+ private void resetCoordinatedTransactionId(String where, Connection connection) {
+ String sql = "set " + SET_TRANSACTION_ID_VARIABLE_NAME + " = ''";
+ PreparedStatement preparedStatement;
+ try {
+ preparedStatement = connection.prepareStatement(sql);
+ int result = preparedStatement.executeUpdate();
+ if (result != 0) {
+ error(where + " resetTransactionId failed to return any results.");
+ }
+ } catch (SQLException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ private void rollback(String where, Connection connection) {
+ // if active, roll back
+ try {
+ connection.rollback();
+ } catch (SQLException e) {
+ // ignore
+ }
+ try {
+ connection.setAutoCommit(true);
+ } catch (SQLException e) {
+ // ignore
+ }
+ }
+
+ private Connection newConnection(String where) {
+ Connection result = getConnection();
+ rollback(where, connection);
+ resetCoordinatedTransactionId(where + " newConnection", result);
+ connection = null;
+ return result;
+ }
+
+ private void printTransactionState(SessionSPI session, String where) {
+ System.out.println(where + " coordinatedTransactionIdentifier: "
+ + session.getCoordinatedTransactionId()
+ + " enlisted?: " + session.isEnlisted());
+ }
+
+ private void waitFor(int millis) {
+ Object monitor = new Object();
+ synchronized(monitor) {
+ try {
+ monitor.wait(millis);
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+ }
+}
=== modified file 'storage/ndb/clusterj/clusterj-tie/pom.xml'
--- a/storage/ndb/clusterj/clusterj-tie/pom.xml 2010-12-14 18:31:20 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/pom.xml 2010-12-16 17:03:32 +0000
@@ -22,7 +22,7 @@
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-tie</artifactId>
<packaging>bundle</packaging>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<name>ClusterJ Tie</name>
<description>The ndbj-tie implementation of ClusterJ storage spi</description>
<build>
@@ -123,13 +123,13 @@
<dependency>
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-api</artifactId>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-core</artifactId>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
<dependency>
@@ -153,13 +153,13 @@
<dependency>
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-test</artifactId>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ndbjtie</groupId>
<artifactId>ndbjtie</artifactId>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java 2010-10-28 09:50:56 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/ClusterTransactionImpl.java 2010-12-16 17:03:32 +0000
@@ -78,7 +78,7 @@ class ClusterTransactionImpl implements
private Dictionary ndbDictionary;
/** The coordinated transaction identifier */
- private String coordinatedTransactionId = null;
+ private String transactionId = null;
/** Is getCoordinatedTransactionId supported? True until proven false. */
private static boolean supportsGetCoordinatedTransactionId = true;
@@ -135,11 +135,11 @@ class ClusterTransactionImpl implements
*/
private void enlist() {
if (ndbTransaction == null) {
- if (coordinatedTransactionId != null) {
- ndbTransaction = db.joinTransaction(coordinatedTransactionId);
+ if (joinTransactionId != null) {
+ ndbTransaction = db.joinTransaction(joinTransactionId);
} else {
ndbTransaction = partitionKey.enlist(db);
- getCoordinatedTransactionId(db);
+ getTransactionId(db);
}
}
}
@@ -400,11 +400,11 @@ class ClusterTransactionImpl implements
}
public String getCoordinatedTransactionId() {
- return coordinatedTransactionId;
+ return transactionId;
}
/** Get the coordinated transaction id if possible and update the field with
- * the id. If running on a back level system (prior to 7.1.6 for the ndbjtie
+ * the id. If running on a back level system (prior to 7.1.10 for the ndbjtie
* and native library) the ndbTransaction.getCoordinatedTransactionId() method
* will throw an Error of some kind (java.lang.NoSuchMethodError or
* java.lang.UnsatisfiedLinkError) and this will cause this instance
@@ -412,16 +412,14 @@ class ClusterTransactionImpl implements
* supportsGetCoordinatedTransactionId) to never try again.
* @param db the DbImpl instance
*/
- private void getCoordinatedTransactionId(DbImpl db) {
+ private void getTransactionId(DbImpl db) {
try {
if (supportsGetCoordinatedTransactionId) {
-// not implemented quite yet...
-// ByteBuffer buffer = db.getCoordinatedTransactionIdBuffer();
-// coordinatedTransactionId = ndbTransaction.
-// getCoordinatedTransactionId(buffer, buffer.capacity());
+ ByteBuffer buffer = db.getCoordinatedTransactionIdBuffer();
+ transactionId = ndbTransaction.
+ getCoordinatedTransactionId(buffer, buffer.capacity());
if (logger.isDetailEnabled()) logger.detail("CoordinatedTransactionId: "
- + coordinatedTransactionId);
- throw new ClusterJFatalInternalException("Not Implemented");
+ + transactionId);
}
} catch (Throwable t) {
// oops, don't do this again
@@ -429,8 +427,8 @@ class ClusterTransactionImpl implements
}
}
- public void setCoordinatedTransactionId(String coordinatedTransactionId) {
- this.coordinatedTransactionId = coordinatedTransactionId;
+ public void setCoordinatedTransactionId(String joinTransactionId) {
+ this.joinTransactionId = joinTransactionId;
}
public void setLockMode(LockMode lockmode) {
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java 2010-10-28 09:50:56 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/main/java/com/mysql/clusterj/tie/DbImpl.java 2010-12-16 17:03:32 +0000
@@ -62,11 +62,11 @@ class DbImpl implements com.mysql.cluste
// TODO change the allocation to a constant in ndbjtie
/** The size of the coordinated transaction identifier buffer */
- private int coordinatedTransactionIdBufferSize = 26;
+ private final static int COORDINATED_TRANSACTION_ID_SIZE = 36;
/** The coordinated transaction identifier buffer */
private ByteBuffer coordinatedTransactionIdBuffer =
- ByteBuffer.allocateDirect(coordinatedTransactionIdBufferSize);
+ ByteBuffer.allocateDirect(COORDINATED_TRANSACTION_ID_SIZE);
// TODO change the allocation to something reasonable
/** The partition key scratch buffer */
@@ -224,12 +224,11 @@ class DbImpl implements com.mysql.cluste
* @return a transaction joined to the existing transaction
*/
public NdbTransaction joinTransaction(String coordinatedTransactionId) {
- if (logger.isDetailEnabled()) logger.detail("CoordinatedTransactionId: "
+ if (logger.isDetailEnabled()) logger.detail("JoinTransactionId: "
+ coordinatedTransactionId);
-// NdbTransaction result = ndb.joinTransaction(coordinatedTransactionId);
-// handleError(result, ndb);
-// return result;
- throw new ClusterJFatalInternalException("Not Implemented");
+ NdbTransaction result = ndb.joinTransaction(coordinatedTransactionId);
+ handleError(result, ndb);
+ return result;
}
/** Get the buffer manager for this DbImpl. All operations that need byte buffers
=== added file 'storage/ndb/clusterj/clusterj-tie/src/test/java/com/mysql/clusterj/tie/CoordinatedTransactionVisibilityTest.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/test/java/com/mysql/clusterj/tie/CoordinatedTransactionVisibilityTest.java 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/test/java/com/mysql/clusterj/tie/CoordinatedTransactionVisibilityTest.java 2010-12-16 17:03:32 +0000
@@ -0,0 +1,36 @@
+/*
+ Copyright (C) 2009 Sun Microsystems Inc.
+ All rights reserved. Use is subject to license terms.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+package com.mysql.clusterj.tie;
+
+public class CoordinatedTransactionVisibilityTest extends testsuite.clusterj.CoordinatedTransactionVisibilityTest {
+
+ public void test() {
+ clusterjVersusClusterj();
+ clusterjVersusJdbc();
+ jdbcVersusClusterj();
+ jdbcVersusJdbc();
+ failOnError();
+ }
+
+ @Override
+ public boolean getDebug() {
+ return false;
+ }
+
+}
=== modified file 'storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/CoordinatedTransactionIdVariableTest.java'
--- a/storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/CoordinatedTransactionIdVariableTest.java 2010-09-13 10:48:19 +0000
+++ b/storage/ndb/clusterj/clusterj-tie/src/test/java/testsuite/clusterj/tie/CoordinatedTransactionIdVariableTest.java 2010-12-16 17:03:32 +0000
@@ -3,11 +3,20 @@ package testsuite.clusterj.tie;
public class CoordinatedTransactionIdVariableTest extends testsuite.clusterj.CoordinatedTransactionIdVariableTest {
public void test() {
- // checkInitialValue();
- // causes crash in ndbd currently
- // checkNewValue();
- // checkIdAfterTransactionStartAndCommit();
- // checkIdAfterTransactionStartAndRollback();
- // failOnError();
+ checkTransactionIdInitialValue();
+ checkJoinTransactionIdInitialValue();
+ checkNewIdResetWithEmptyString();
+ checkNewIdResetWithNullString();
+ checkBadIdTooLong();
+ checkBadIdTooShort();
+ checkIdAfterTransactionStartAndCommit();
+ checkIdAfterTransactionStartAndRollback();
+ failOnError();
}
+
+ @Override
+ public boolean getDebug() {
+ return false;
+ }
+
}
=== modified file 'storage/ndb/clusterj/pom.xml'
--- a/storage/ndb/clusterj/pom.xml 2010-12-14 18:31:20 +0000
+++ b/storage/ndb/clusterj/pom.xml 2010-12-16 17:03:32 +0000
@@ -22,7 +22,7 @@
<groupId>com.mysql.clusterj</groupId>
<artifactId>clusterj-aggregate</artifactId>
<packaging>pom</packaging>
- <version>7.1.10-SNAPSHOT</version>
+ <version>7.1.10-coord-SNAPSHOT</version>
<name>ClusterJ Aggregate</name>
<description>The aggregate maven project of ClusterJ</description>
<modules>
Attachment: [text/bzr-bundle] bzr/craig.russell@oracle.com-20101216170332-d5t1qz6o1l4aepdj.bundle
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.1 branch (Craig.Russell:4025) | Craig L Russell | 16 Dec |