List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:September 12 2012 1:07pm
Subject:bzr push into mysql-5.1-telco-7.0-coord branch (jonas.oreland:4961 to 4962)
View as plain text  
 4962 Jonas Oreland	2012-09-12
      commit for Club

    added:
      mysql-test/suite/ndb/r/ndb_coord.result
      mysql-test/suite/ndb/r/ndb_coord2.result
      mysql-test/suite/ndb/r/ndb_coord3.result
      mysql-test/suite/ndb/r/ndb_coord4.result
      mysql-test/suite/ndb/r/ndb_coord_commit_rollback.result
      mysql-test/suite/ndb/r/ndb_coord_err.result
      mysql-test/suite/ndb/t/ndb_coord.test
      mysql-test/suite/ndb/t/ndb_coord2.test
      mysql-test/suite/ndb/t/ndb_coord3.test
      mysql-test/suite/ndb/t/ndb_coord4.test
      mysql-test/suite/ndb/t/ndb_coord_commit_rollback.test
      mysql-test/suite/ndb/t/ndb_coord_err.test
    modified:
      mysql-test/suite/ndb/r/ndb_basic.result
      mysql-test/suite/ndb/r/ndbinfo.result
      mysql-test/suite/ndb/t/ndbinfo.test
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster.h
      storage/ndb/include/kernel/signaldata/ScanTab.hpp
      storage/ndb/include/kernel/signaldata/SignalData.hpp
      storage/ndb/include/kernel/signaldata/TcCommit.hpp
      storage/ndb/include/kernel/signaldata/TcKeyConf.hpp
      storage/ndb/include/kernel/signaldata/TcKeyReq.hpp
      storage/ndb/include/kernel/signaldata/TcRollbackRep.hpp
      storage/ndb/include/ndb_version.h.in
      storage/ndb/include/ndbapi/Ndb.hpp
      storage/ndb/include/ndbapi/NdbScanOperation.hpp
      storage/ndb/include/ndbapi/NdbTransaction.hpp
      storage/ndb/include/ndbapi/ndbapi_limits.h
      storage/ndb/src/common/debugger/signaldata/Makefile.am
      storage/ndb/src/common/debugger/signaldata/ScanTab.cpp
      storage/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp
      storage/ndb/src/common/debugger/signaldata/TcKeyConf.cpp
      storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp
      storage/ndb/src/common/debugger/signaldata/TcRollbackRep.cpp
      storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
      storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
      storage/ndb/src/ndbapi/Ndb.cpp
      storage/ndb/src/ndbapi/NdbApiSignal.cpp
      storage/ndb/src/ndbapi/NdbOperationExec.cpp
      storage/ndb/src/ndbapi/NdbScanOperation.cpp
      storage/ndb/src/ndbapi/NdbTransaction.cpp
      storage/ndb/src/ndbapi/Ndberr.cpp
      storage/ndb/src/ndbapi/ndberror.c
      storage/ndb/test/ndbapi/Makefile.am
      storage/ndb/test/ndbapi/testBasic.cpp
      storage/ndb/test/ndbapi/testNdbApi.cpp
      storage/ndb/test/ndbapi/testScan.cpp
      storage/ndb/test/run-test/daily-basic-tests.txt
 4961 Jonas Oreland	2012-09-12 [merge]
      merge main

    modified:
      mysql-test/mysql-test-run.pl
      mysql-test/r/ctype_cp932_binlog_stm.result
      mysql-test/t/ctype_cp932_binlog_stm.test
      storage/ndb/include/kernel/signaldata/DbspjErr.hpp
      storage/ndb/include/kernel/signaldata/DihScanTab.hpp
      storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp
      storage/ndb/include/kernel/signaldata/SchemaTrans.hpp
      storage/ndb/include/kernel/signaldata/TcContinueB.hpp
      storage/ndb/include/ndb_version.h.in
      storage/ndb/include/util/NdbOut.hpp
      storage/ndb/src/kernel/blocks/backup/Backup.cpp
      storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
      storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
      storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
      storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjProxy.hpp
      storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
      storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
      storage/ndb/src/kernel/blocks/suma/Suma.cpp
      storage/ndb/src/mgmapi/LocalConfig.cpp
      storage/ndb/src/mgmapi/LocalConfig.hpp
      storage/ndb/src/ndbapi/ndberror.c
      storage/ndb/test/include/NDBT_Stats.hpp
      storage/ndb/test/ndbapi/testDict.cpp
      storage/ndb/test/ndbapi/testRedo.cpp
      storage/ndb/test/ndbapi/testScan.cpp
      storage/ndb/test/run-test/daily-basic-tests.txt
=== modified file '.bzr-mysql/default.conf'
--- a/.bzr-mysql/default.conf	2012-06-07 09:06:47 +0000
+++ b/.bzr-mysql/default.conf	2012-08-22 09:41:37 +0000
@@ -1,4 +1,4 @@
 [MYSQL]
 post_commit_to = commits@stripped
 post_push_to = commits@stripped
-tree_name = mysql-5.1-telco-7.0
+tree_name = mysql-5.1-telco-7.0-coord

=== modified file 'mysql-test/suite/ndb/r/ndb_basic.result'
--- a/mysql-test/suite/ndb/r/ndb_basic.result	2012-06-11 10:23:01 +0000
+++ b/mysql-test/suite/ndb/r/ndb_basic.result	2012-09-12 13:07:04 +0000
@@ -112,6 +112,7 @@ ndb_index_stat_cache_entries	#
 ndb_index_stat_enable	#
 ndb_index_stat_option	#
 ndb_index_stat_update_freq	#
+ndb_join_transaction_id	#
 ndb_log_apply_status	#
 ndb_log_bin	#
 ndb_log_binlog_index	#
@@ -128,6 +129,7 @@ ndb_report_thresh_binlog_epoch_slip	#
 ndb_report_thresh_binlog_mem_usage	#
 ndb_table_no_logging	#
 ndb_table_temporary	#
+ndb_transaction_id	#
 ndb_use_copying_alter_table	#
 ndb_use_exact_count	#
 ndb_use_transactions	#

=== added file 'mysql-test/suite/ndb/r/ndb_coord.result'
--- a/mysql-test/suite/ndb/r/ndb_coord.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_coord.result	2012-09-12 13:07:04 +0000
@@ -0,0 +1,96 @@
+DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7;
+drop database if exists mysqltest;
+CREATE TABLE t1 (
+pk INT NOT NULL PRIMARY KEY,
+attr1 INT NOT NULL,
+attr2 INT,
+attr3 VARCHAR(10),
+unique(attr2)
+) ENGINE=ndbcluster;
+begin;
+insert into t1 values(1,1,1,'a');
+begin;
+select * from t1;
+pk	attr1	attr2	attr3
+1	1	1	a
+select * from t1 where pk = 1;
+pk	attr1	attr2	attr3
+1	1	1	a
+select * from t1 where attr2 = 1;
+pk	attr1	attr2	attr3
+1	1	1	a
+select * from t1 where pk >= 1;
+pk	attr1	attr2	attr3
+1	1	1	a
+delete from t1;
+select * from t1;
+pk	attr1	attr2	attr3
+select * from t1 where pk = 1;
+pk	attr1	attr2	attr3
+select * from t1 where attr2 = 1;
+pk	attr1	attr2	attr3
+select * from t1 where pk >= 1;
+pk	attr1	attr2	attr3
+insert into t1 values(2,2,2,'b');
+select * from t1;
+pk	attr1	attr2	attr3
+2	2	2	b
+select * from t1 where pk = 1;
+pk	attr1	attr2	attr3
+select * from t1 where attr2 = 1;
+pk	attr1	attr2	attr3
+select * from t1 where pk >= 1;
+pk	attr1	attr2	attr3
+2	2	2	b
+select * from t1 where pk = 2;
+pk	attr1	attr2	attr3
+2	2	2	b
+select * from t1 where attr2 = 2;
+pk	attr1	attr2	attr3
+2	2	2	b
+select * from t1 where pk >= 2;
+pk	attr1	attr2	attr3
+2	2	2	b
+update t1 set attr3 = 'c';
+select * from t1;
+pk	attr1	attr2	attr3
+2	2	2	c
+select * from t1 where pk = 1;
+pk	attr1	attr2	attr3
+select * from t1 where attr2 = 1;
+pk	attr1	attr2	attr3
+select * from t1 where pk >= 1;
+pk	attr1	attr2	attr3
+2	2	2	c
+select * from t1 where pk = 2;
+pk	attr1	attr2	attr3
+2	2	2	c
+select * from t1 where attr2 = 2;
+pk	attr1	attr2	attr3
+2	2	2	c
+select * from t1 where pk >= 2;
+pk	attr1	attr2	attr3
+2	2	2	c
+update t1 set attr2 = 1;
+select * from t1;
+pk	attr1	attr2	attr3
+2	2	1	c
+select * from t1 where pk = 1;
+pk	attr1	attr2	attr3
+select * from t1 where attr2 = 1;
+pk	attr1	attr2	attr3
+2	2	1	c
+select * from t1 where pk >= 1;
+pk	attr1	attr2	attr3
+2	2	1	c
+select * from t1 where pk = 2;
+pk	attr1	attr2	attr3
+2	2	1	c
+select * from t1 where attr2 = 2;
+pk	attr1	attr2	attr3
+select * from t1 where pk >= 2;
+pk	attr1	attr2	attr3
+2	2	1	c
+rollback;
+rollback;
+drop table t1;

=== added file 'mysql-test/suite/ndb/r/ndb_coord2.result'
--- a/mysql-test/suite/ndb/r/ndb_coord2.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_coord2.result	2012-09-12 13:07:04 +0000
@@ -0,0 +1,182 @@
+CREATE TABLE t1 (
+pk INT NOT NULL PRIMARY KEY,
+attr1 INT NOT NULL,
+attr2 INT,
+attr3 VARCHAR(10),
+unique(attr2)
+) ENGINE=ndbcluster;
+insert into t1 values(1,1,1,'1');
+insert into t1 values(2,2,2,'2');
+insert into t1 values(3,3,3,'3');
+insert into t1 values(4,4,4,'4');
+OI5-JI6-JR1-OS6-JS5
+begin;
+insert into t1 values(5,5,5,'5');
+begin;
+set ndb_join_transaction_id='<tid>';
+insert into t1 values(6,6,6,'6');
+key read should find record
+select * from t1 where pk = 1;
+pk	attr1	attr2	attr3
+1	1	1	1
+index scan should find record inserted by other
+select * from t1 where pk > 5 and pk <= 6;
+pk	attr1	attr2	attr3
+6	6	6	6
+index scan should find record inserted by other
+select * from t1 where pk > 4 and pk <= 5;
+pk	attr1	attr2	attr3
+5	5	5	5
+commit;
+commit;
+OI7-JI8-OR8-JS7
+begin;
+insert into t1 values(7,7,7,'7');
+begin;
+set ndb_join_transaction_id='<tid>';
+insert into t1 values(8,8,8,'8');
+key read should find record inserted by other
+select * from t1 where pk = 8;
+pk	attr1	attr2	attr3
+8	8	8	8
+index scan should find record inserted by other
+select * from t1 where pk > 6 and pk <= 7;
+pk	attr1	attr2	attr3
+7	7	7	7
+commit;
+commit;
+OS1-JI10-OS10
+begin;
+select * from t1 where pk > 0 and pk <= 1;
+pk	attr1	attr2	attr3
+1	1	1	1
+begin;
+set ndb_join_transaction_id='<tid>';
+insert into t1 values(10,10,10,'10');
+index scan should find record inserted by other
+select * from t1 where pk > 9 and pk <= 10;
+pk	attr1	attr2	attr3
+10	10	10	10
+commit;
+commit;
+OS1-JR2-OI11-JR11
+begin;
+select * from t1 where pk > 0 and pk <= 1;
+pk	attr1	attr2	attr3
+1	1	1	1
+begin;
+set ndb_join_transaction_id='<tid>';
+select * from t1 where pk = 2;
+pk	attr1	attr2	attr3
+2	2	2	2
+insert into t1 values(11,11,11,'11');
+key read should find record inserted by other
+select * from t1 where pk = 11;
+pk	attr1	attr2	attr3
+11	11	11	11
+commit;
+commit;
+OS1-JI12-OS12-OI13-JR13
+begin;
+select * from t1 where pk > 0 and pk <= 1;
+pk	attr1	attr2	attr3
+1	1	1	1
+begin;
+set ndb_join_transaction_id='<tid>';
+insert into t1 values(12,12,12,'12');
+index scan should find record inserted by other
+select * from t1 where pk > 11 and pk <= 12;
+pk	attr1	attr2	attr3
+12	12	12	12
+insert into t1 values(13,13,13,'13');
+key read should find record inserted by other
+select * from t1 where pk = 13;
+pk	attr1	attr2	attr3
+13	13	13	13
+commit;
+commit;
+OI14-JS14-JI15-OS15
+begin;
+insert into t1 values(14,14,14,'14');
+begin;
+set ndb_join_transaction_id='<tid>';
+select * from t1 where pk > 13 and pk <=14;
+pk	attr1	attr2	attr3
+14	14	14	14
+insert into t1 values(15,15,15,'15');
+index scan should find record inserted by other
+select * from t1 where pk > 14 and pk <= 15;
+pk	attr1	attr2	attr3
+15	15	15	15
+commit;
+commit;
+OS1-JS2-OI17-JR17
+begin;
+select * from t1 where pk <= 1;
+pk	attr1	attr2	attr3
+1	1	1	1
+begin;
+set ndb_join_transaction_id='<tid>';
+select * from t1 where pk > 1 and pk <= 2;
+pk	attr1	attr2	attr3
+2	2	2	2
+insert into t1 values(17,17,17,'17');
+key read should find record inserted by other
+select * from t1 where pk = 17;
+pk	attr1	attr2	attr3
+17	17	17	17
+commit;
+commit;
+OS1-JS2-OI19-JS19
+begin;
+select * from t1 where pk <= 1;
+pk	attr1	attr2	attr3
+1	1	1	1
+begin;
+set ndb_join_transaction_id='<tid>';
+select * from t1 where pk > 1 and pk <= 2;
+pk	attr1	attr2	attr3
+2	2	2	2
+insert into t1 values(19,19,19,'19');
+index scan should find record inserted by other
+select * from t1 where pk > 18 and pk <= 19;
+pk	attr1	attr2	attr3
+19	19	19	19
+commit;
+commit;
+OS1-JS2-JI20-OS20
+begin;
+select * from t1 where pk <= 1;
+pk	attr1	attr2	attr3
+1	1	1	1
+begin;
+set ndb_join_transaction_id='<tid>';
+index scan should find record
+select * from t1 where pk > 1 and pk <= 2;
+pk	attr1	attr2	attr3
+2	2	2	2
+insert into t1 values(20,20,20,'20');
+index scan should find record inserted by other
+select * from t1 where pk > 19 and pk <= 20;
+pk	attr1	attr2	attr3
+20	20	20	20
+commit;
+commit;
+OS1-JS2-JI22-OR22
+begin;
+select * from t1 where pk <= 1;
+pk	attr1	attr2	attr3
+1	1	1	1
+begin;
+set ndb_join_transaction_id='<tid>';
+select * from t1 where pk > 1 and pk <= 2;
+pk	attr1	attr2	attr3
+2	2	2	2
+insert into t1 values(22,22,22,'22');
+key read should find record inserted by other
+select * from t1 where pk = 22;
+pk	attr1	attr2	attr3
+22	22	22	22
+commit;
+commit;
+drop table t1;

=== added file 'mysql-test/suite/ndb/r/ndb_coord3.result'
--- a/mysql-test/suite/ndb/r/ndb_coord3.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_coord3.result	2012-09-12 13:07:04 +0000
@@ -0,0 +1,37 @@
+CREATE TABLE t1 (
+pk INT NOT NULL PRIMARY KEY,
+attr1 INT NOT NULL,
+attr2 INT,
+attr3 VARCHAR(10),
+unique(attr2)
+) ENGINE=ndbcluster;
+insert into t1 values(1,1,1,'1');
+select @@ndb_transaction_id is NULL;
+@@ndb_transaction_id is NULL
+1
+begin;
+select @@ndb_transaction_id is NULL;
+@@ndb_transaction_id is NULL
+1
+select * from t1 where pk = 1;
+pk	attr1	attr2	attr3
+1	1	1	1
+select @@ndb_transaction_id is not NULL;
+@@ndb_transaction_id is not NULL
+1
+rollback;
+select @@ndb_transaction_id is NULL;
+@@ndb_transaction_id is NULL
+1
+begin;
+select * from t1 where pk = 1;
+pk	attr1	attr2	attr3
+1	1	1	1
+select @@ndb_transaction_id is not NULL;
+@@ndb_transaction_id is not NULL
+1
+commit;
+select @@ndb_transaction_id is NULL;
+@@ndb_transaction_id is NULL
+1
+drop table t1;

=== added file 'mysql-test/suite/ndb/r/ndb_coord4.result'
--- a/mysql-test/suite/ndb/r/ndb_coord4.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_coord4.result	2012-09-12 13:07:04 +0000
@@ -0,0 +1,70 @@
+CREATE TABLE t1 (
+pk INT NOT NULL PRIMARY KEY,
+attr1 INT NOT NULL,
+attr2 INT,
+attr3 VARCHAR(10),
+unique(attr2)
+) ENGINE=ndbcluster;
+insert into t1 values(1,1,1,'1');
+begin;
+insert into t1 values(2,2,2,'2');
+begin;
+set ndb_join_transaction_id='<tid>';
+update t1 set attr1 = 3 where pk = 2;
+select * from t1 where pk = 2;
+pk	attr1	attr2	attr3
+2	3	2	2
+commit;
+commit;
+select * from t1 where pk = 2;
+pk	attr1	attr2	attr3
+2	3	2	2
+begin;
+insert into t1 values(3,3,3,'3');
+begin;
+set ndb_join_transaction_id='<tid>';
+update t1 set attr1 = 4 where pk = 3;
+select * from t1 where pk = 3;
+pk	attr1	attr2	attr3
+3	4	3	3
+commit;
+commit;
+select * from t1 where pk = 3;
+pk	attr1	attr2	attr3
+3	4	3	3
+begin;
+insert into t1 values(4,4,4,'4');
+begin;
+set ndb_join_transaction_id='<tid>';
+update t1 set attr1 = 5 where pk = 4;
+select * from t1 where pk = 4;
+pk	attr1	attr2	attr3
+4	5	4	4
+rollback;
+rollback;
+select * from t1 where pk = 4;
+pk	attr1	attr2	attr3
+begin;
+insert into t1 values(5,5,5,'5');
+begin;
+set ndb_join_transaction_id='<tid>';
+update t1 set attr1 = 6 where pk = 5;
+select * from t1 where pk = 5;
+pk	attr1	attr2	attr3
+5	6	5	5
+rollback;
+rollback;
+select * from t1 where pk = 5;
+pk	attr1	attr2	attr3
+rollback;
+delete from t1;
+begin;
+select * from t1 where pk = 1;
+pk	attr1	attr2	attr3
+begin;
+set ndb_join_transaction_id='<tid>';
+update t1 set attr1 = 5 where pk = 1;
+update t1 set attr1 = 5 where pk = 1;
+rollback;
+rollback;
+drop table t1;

=== added file 'mysql-test/suite/ndb/r/ndb_coord_commit_rollback.result'
--- a/mysql-test/suite/ndb/r/ndb_coord_commit_rollback.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_coord_commit_rollback.result	2012-09-12 13:07:04 +0000
@@ -0,0 +1,37 @@
+CREATE TABLE t1 (
+pk INT NOT NULL PRIMARY KEY,
+attr1 INT NOT NULL,
+attr2 INT,
+attr3 VARCHAR(10),
+unique(attr2)
+) ENGINE=ndbcluster;
+insert into t1 values(1,1,1,'1');
+begin;
+insert into t1 values(2,2,2,'2');
+begin;
+set ndb_join_transaction_id='<tid>';
+update t1 set attr1 = 3 where pk = 2;
+commit;
+rollback;
+ERROR HY000: Got error 260 'Coordinated transaction already aborted' from NDBCLUSTER
+select * from t1 where pk = 2;
+pk	attr1	attr2	attr3
+begin;
+insert into t1 values(2,2,2,'2');
+begin;
+set ndb_join_transaction_id='<tid>';
+update t1 set attr1 = 3 where pk = 2;
+rollback;
+commit;
+ERROR HY000: Got error 260 'Coordinated transaction already aborted' from NDBCLUSTER
+select * from t1 where pk = 2;
+pk	attr1	attr2	attr3
+begin;
+select * from t1 where pk = 1000 for update;
+pk	attr1	attr2	attr3
+begin;
+set ndb_join_transaction_id='<tid>';
+insert into t1 values(2,2,2,'2');
+commit;
+commit;
+drop table t1;

=== added file 'mysql-test/suite/ndb/r/ndb_coord_err.result'
--- a/mysql-test/suite/ndb/r/ndb_coord_err.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_coord_err.result	2012-09-12 13:07:04 +0000
@@ -0,0 +1,29 @@
+CREATE TABLE t1 (
+pk INT NOT NULL PRIMARY KEY,
+attr1 INT NOT NULL,
+attr2 INT,
+attr3 VARCHAR(10),
+unique(attr2)
+) ENGINE=ndbcluster;
+set ndb_join_transaction_id='kalleanka';
+ERROR HY000: Incorrect arguments to SET
+show warnings;
+Level	Code	Message
+Error	1210	Malformed coordinated transaction identifier (correct is "%x:%x:%x:%x:%x")
+Error	1210	Incorrect arguments to SET
+set ndb_join_transaction_id='99:99:99:99:99';
+ERROR HY000: Incorrect arguments to SET
+show warnings;
+Level	Code	Message
+Error	1210	Malformed coordinated transaction identifier (node id out of range)
+Error	1210	Incorrect arguments to SET
+begin;
+insert into t1 values(1,1,1,'a');
+ERROR HY000: Incorrect arguments to SET
+begin;
+set ndb_join_transaction_id=NULL;
+select @@ndb_join_transaction_id;
+@@ndb_join_transaction_id
+NULL
+rollback;
+drop table t1;

=== modified file 'mysql-test/suite/ndb/r/ndbinfo.result'
--- a/mysql-test/suite/ndb/r/ndbinfo.result	2011-12-13 15:24:44 +0000
+++ b/mysql-test/suite/ndb/r/ndbinfo.result	2012-09-12 13:07:04 +0000
@@ -432,8 +432,10 @@ insert into t1 values (1);
 select state, count_operations, outstanding_operations,
 IF(client_node_id <= 255, "<client_node_id>", "<incorrect node id>") 
   client_node_id
-from server_transactions;
+from server_transactions order by 1,2,3,4;
 state	count_operations	outstanding_operations	client_node_id
+Started	0	0	<client_node_id>
+Started	0	0	<client_node_id>
 Started	1	0	<client_node_id>
 select node_id, operation_type, state,
 IF(tc_node_id <= 48, "<tc_node_id>", "<incorrect nodeid>") tc_node_id,

=== added file 'mysql-test/suite/ndb/t/ndb_coord.test'
--- a/mysql-test/suite/ndb/t/ndb_coord.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_coord.test	2012-09-12 13:07:04 +0000
@@ -0,0 +1,94 @@
+-- source include/have_ndb.inc
+
+--disable_warnings
+DROP TABLE IF EXISTS t1,t2,t3,t4,t5,t6,t7;
+drop database if exists mysqltest;
+--enable_warnings
+
+#
+# Create a normal table with primary key
+#
+CREATE TABLE t1 (
+  pk INT NOT NULL PRIMARY KEY,
+  attr1 INT NOT NULL,
+  attr2 INT,
+  attr3 VARCHAR(10),
+  unique(attr2)
+) ENGINE=ndbcluster;
+
+connect (con1,localhost,root,,test);
+connect (con2,localhost,root,,test);
+
+connection con1;
+begin;
+insert into t1 values(1,1,1,'a');
+let $tid = `select @@ndb_transaction_id`;
+
+connection con2;
+begin;
+disable_query_log;
+eval set ndb_join_transaction_id='$tid';
+enable_query_log;
+
+connection con1;
+select * from t1;
+select * from t1 where pk = 1;
+select * from t1 where attr2 = 1;
+select * from t1 where pk >= 1;
+
+connection con1;
+delete from t1;
+
+connection con2;
+select * from t1;
+select * from t1 where pk = 1;
+select * from t1 where attr2 = 1;
+select * from t1 where pk >= 1;
+
+connection con1;
+insert into t1 values(2,2,2,'b');
+
+connection con2;
+select * from t1;
+select * from t1 where pk = 1;
+select * from t1 where attr2 = 1;
+select * from t1 where pk >= 1;
+
+select * from t1 where pk = 2;
+select * from t1 where attr2 = 2;
+select * from t1 where pk >= 2;
+
+connection con1;
+update t1 set attr3 = 'c';
+
+connection con2;
+select * from t1;
+select * from t1 where pk = 1;
+select * from t1 where attr2 = 1;
+select * from t1 where pk >= 1;
+
+select * from t1 where pk = 2;
+select * from t1 where attr2 = 2;
+select * from t1 where pk >= 2;
+
+connection con1;
+update t1 set attr2 = 1;
+
+connection con2;
+select * from t1;
+select * from t1 where pk = 1;
+select * from t1 where attr2 = 1;
+select * from t1 where pk >= 1;
+
+select * from t1 where pk = 2;
+select * from t1 where attr2 = 2;
+select * from t1 where pk >= 2;
+
+connection con1;
+rollback;
+
+connection con2;
+rollback;
+
+connection con1;
+drop table t1;

=== added file 'mysql-test/suite/ndb/t/ndb_coord2.test'
--- a/mysql-test/suite/ndb/t/ndb_coord2.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_coord2.test	2012-09-12 13:07:04 +0000
@@ -0,0 +1,386 @@
+-- source include/have_ndb.inc
+
+# The rationale for the tests is that there are different cases for joining
+# transactions depending on whether the original transaction is a key op
+# or a scan op, and the joining transaction is a key op or a scan op.
+# Subsequently, the original transaction can perform a key op or a scan op.
+# The four base cases have up to four sub cases. Each sub case exercises
+# a different code path in the tc node.
+
+# Once both the original transaction and the joining transaction have
+# executed key ops, there is no longer any unique code to be executed in tc.
+
+#
+# Create a normal table with primary key
+#
+CREATE TABLE t1 (
+  pk INT NOT NULL PRIMARY KEY,
+  attr1 INT NOT NULL,
+  attr2 INT,
+  attr3 VARCHAR(10),
+  unique(attr2)
+) ENGINE=ndbcluster;
+insert into t1 values(1,1,1,'1');
+insert into t1 values(2,2,2,'2');
+insert into t1 values(3,3,3,'3');
+insert into t1 values(4,4,4,'4');
+
+connect (con1,localhost,root,,test);
+connect (con2,localhost,root,,test);
+
+# A. Joining transaction key op to original transaction key op OK-JK
+# OK-JK-JK-OS-JS
+--echo OI5-JI6-JR1-OS6-JS5
+
+# OI5
+connection con1;
+begin;
+insert into t1 values(5,5,5,'5');
+let $tid = `select @@ndb_transaction_id`;
+
+# JI6
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+insert into t1 values(6,6,6,'6');
+
+# JR1
+connection con2;
+--echo key read should find record
+select * from t1 where pk = 1;
+
+# OS6
+connection con1;
+--echo index scan should find record inserted by other
+select * from t1 where pk > 5 and pk <= 6;
+
+# JS5
+connection con2;
+--echo index scan should find record inserted by other
+select * from t1 where pk > 4 and pk <= 5;
+
+connection con1;
+--send commit
+
+connection con2;
+commit;
+
+connection con1;
+--reap
+
+# OK-JK-OK-JS
+--echo OI7-JI8-OR8-JS7
+
+# OI7
+connection con1;
+begin;
+insert into t1 values(7,7,7,'7');
+let $tid = `select @@ndb_transaction_id`;
+
+# JI8
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+insert into t1 values(8,8,8,'8');
+
+# OR8
+connection con1;
+--echo key read should find record inserted by other
+select * from t1 where pk = 8;
+
+# JS7
+connection con2;
+--echo index scan should find record inserted by other
+select * from t1 where pk > 6 and pk <= 7;
+
+connection con1;
+--send commit
+
+connection con2;
+commit;
+
+connection con1;
+--reap
+
+# B. Joining transaction key op to original transaction scan op OS-JK
+# OS-JK-OS
+--echo OS1-JI10-OS10
+
+# OS1
+connection con1;
+begin;
+select * from t1 where pk > 0 and pk <= 1;
+let $tid = `select @@ndb_transaction_id`;
+
+# JI10
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+insert into t1 values(10,10,10,'10');
+
+# OS10
+connection con1;
+--echo index scan should find record inserted by other
+select * from t1 where pk > 9 and pk <= 10;
+
+connection con1;
+--send commit
+
+connection con2;
+commit;
+
+connection con1;
+--reap
+
+# OS-JK-OK-JK
+--echo OS1-JR2-OI11-JR11
+
+# OS1
+connection con1;
+begin;
+select * from t1 where pk > 0 and pk <= 1;
+let $tid = `select @@ndb_transaction_id`;
+
+# JR2
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+select * from t1 where pk = 2;
+
+# OI11
+connection con1;
+insert into t1 values(11,11,11,'11');
+
+# JR11
+connection con2;
+--echo key read should find record inserted by other
+select * from t1 where pk = 11;
+
+connection con1;
+--send commit
+
+connection con2;
+commit;
+
+connection con1;
+--reap
+
+# OS-JK-OS-OK-JK
+--echo OS1-JI12-OS12-OI13-JR13
+
+# OS1
+connection con1;
+begin;
+select * from t1 where pk > 0 and pk <= 1;
+let $tid = `select @@ndb_transaction_id`;
+
+# JI12
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+insert into t1 values(12,12,12,'12');
+
+# OS12
+connection con1;
+--echo index scan should find record inserted by other
+select * from t1 where pk > 11 and pk <= 12;
+
+# OI13
+connection con1;
+insert into t1 values(13,13,13,'13');
+
+# JR13
+connection con2;
+--echo key read should find record inserted by other
+select * from t1 where pk = 13;
+
+connection con1;
+--send commit
+
+connection con2;
+commit;
+
+connection con1;
+--reap
+
+# C. Joining transaction scan op to original transaction key op OK-JS
+# OK-JS-JK-OS
+--echo OI14-JS14-JI15-OS15
+
+# OI14
+connection con1;
+begin;
+insert into t1 values(14,14,14,'14');
+let $tid = `select @@ndb_transaction_id`;
+
+# JS14
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+select * from t1 where pk > 13 and pk <=14;
+
+# JI15
+connection con2;
+insert into t1 values(15,15,15,'15');
+
+# OS15
+connection con1;
+--echo index scan should find record inserted by other
+select * from t1 where pk > 14 and pk <= 15;
+
+connection con1;
+--send commit
+
+connection con2;
+commit;
+
+connection con1;
+--reap
+
+# D. Joining transaction scan op to original transaction scan op OS-JS
+# OS-JS-OK-JK
+--echo OS1-JS2-OI17-JR17
+
+# OS1
+connection con1;
+begin;
+select * from t1 where pk <= 1;
+let $tid = `select @@ndb_transaction_id`;
+
+# JS2
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+select * from t1 where pk > 1 and pk <= 2;
+
+# OI17
+connection con1;
+insert into t1 values(17,17,17,'17');
+
+# JR17
+connection con2;
+--echo key read should find record inserted by other
+select * from t1 where pk = 17;
+
+connection con1;
+--send commit
+
+connection con2;
+commit;
+
+connection con1;
+--reap
+
+# OS-JS-OK-JS
+--echo OS1-JS2-OI19-JS19
+
+# OS1
+connection con1;
+begin;
+select * from t1 where pk <= 1;
+let $tid = `select @@ndb_transaction_id`;
+
+# JS2
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+select * from t1 where pk > 1 and pk <= 2;
+
+# OI19
+connection con1;
+insert into t1 values(19,19,19,'19');
+
+# JS19
+connection con2;
+--echo index scan should find record inserted by other
+select * from t1 where pk > 18 and pk <= 19;
+
+connection con1;
+--send commit
+
+connection con2;
+commit;
+
+connection con1;
+--reap
+
+# OS-JS-JK-OK
+--echo OS1-JS2-JI20-OS20
+
+# OS1
+connection con1;
+begin;
+select * from t1 where pk <= 1;
+let $tid = `select @@ndb_transaction_id`;
+
+# JS2
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+--echo index scan should find record
+select * from t1 where pk > 1 and pk <= 2;
+
+# JI20
+connection con2;
+insert into t1 values(20,20,20,'20');
+
+# OS20
+connection con1;
+--echo index scan should find record inserted by other
+select * from t1 where pk > 19 and pk <= 20;
+
+connection con1;
+--send commit
+
+connection con2;
+commit;
+
+connection con1;
+--reap
+
+# OS-JS-JK-OS
+--echo OS1-JS2-JI22-OR22
+
+# OS1
+connection con1;
+begin;
+select * from t1 where pk <= 1;
+let $tid = `select @@ndb_transaction_id`;
+
+# JS2
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+select * from t1 where pk > 1 and pk <= 2;
+
+# JI22
+connection con2;
+insert into t1 values(22,22,22,'22');
+
+# OR22
+connection con1;
+--echo key read should find record inserted by other
+select * from t1 where pk = 22;
+
+connection con1;
+--send commit
+
+connection con2;
+commit;
+
+connection con1;
+--reap
+
+connection con1;
+drop table t1;
+

=== added file 'mysql-test/suite/ndb/t/ndb_coord3.test'
--- a/mysql-test/suite/ndb/t/ndb_coord3.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_coord3.test	2012-09-12 13:07:04 +0000
@@ -0,0 +1,40 @@
+-- source include/have_ndb.inc
+
+#
+# Create a normal table with primary key
+#
+CREATE TABLE t1 (
+  pk INT NOT NULL PRIMARY KEY,
+  attr1 INT NOT NULL,
+  attr2 INT,
+  attr3 VARCHAR(10),
+  unique(attr2)
+) ENGINE=ndbcluster;
+insert into t1 values(1,1,1,'1');
+
+# verify transaction id is NULL before begin
+select @@ndb_transaction_id is NULL;
+
+# verify transaction id is NULL after begin before any transaction operations
+begin;
+select @@ndb_transaction_id is NULL;
+
+# verify transaction id is not null after transactional operation
+select * from t1 where pk = 1;
+select @@ndb_transaction_id is not NULL;
+
+# verify transaction id is NULL after rollback
+rollback;
+select @@ndb_transaction_id is NULL;
+
+# verify transaction id is not null after transactional operation
+begin;
+select * from t1 where pk = 1;
+select @@ndb_transaction_id is not NULL;
+
+# verify transaction id is NULL after commit
+commit;
+select @@ndb_transaction_id is NULL;
+
+drop table t1;
+

=== added file 'mysql-test/suite/ndb/t/ndb_coord4.test'
--- a/mysql-test/suite/ndb/t/ndb_coord4.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_coord4.test	2012-09-12 13:07:04 +0000
@@ -0,0 +1,181 @@
+-- source include/have_ndb.inc
+
+#
+# Create a normal table with primary key
+#
+CREATE TABLE t1 (
+  pk INT NOT NULL PRIMARY KEY,
+  attr1 INT NOT NULL,
+  attr2 INT,
+  attr3 VARCHAR(10),
+  unique(attr2)
+) ENGINE=ndbcluster;
+insert into t1 values(1,1,1,'1');
+
+connect (con1,localhost,root,,test);
+connect (con2,localhost,root,,test);
+
+# A. insert original, update joined, commit original, commit joined
+
+# insert in original transaction
+
+connection con1;
+begin;
+insert into t1 values(2,2,2,'2');
+let $tid = `select @@ndb_transaction_id`;
+
+# update in joined transaction
+
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+update t1 set attr1 = 3 where pk = 2;
+select * from t1 where pk = 2;
+
+# commit original transaction
+
+connection con1;
+--send commit
+
+# commit joined transaction
+
+connection con2;
+commit;
+
+connection con1;
+--reap
+
+connection con2;
+
+# select in new transaction
+
+select * from t1 where pk = 2;
+
+# B. insert original, update joined, commit joined, commit original
+
+# insert in original transaction
+
+connection con1;
+begin;
+insert into t1 values(3,3,3,'3');
+let $tid = `select @@ndb_transaction_id`;
+
+# update in joined transaction
+
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+update t1 set attr1 = 4 where pk = 3;
+select * from t1 where pk = 3;
+
+# commit joined transaction
+
+connection con2;
+--send commit
+
+# commit original transaction
+
+connection con1;
+commit;
+
+connection con2;
+--reap
+
+connection con1;
+
+# select in new transaction
+
+select * from t1 where pk = 3;
+
+# C. insert original, update joined, roll back original, roll back joined
+
+# insert in original transaction
+
+connection con1;
+begin;
+insert into t1 values(4,4,4,'4');
+let $tid = `select @@ndb_transaction_id`;
+
+# update in joined transaction
+
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+update t1 set attr1 = 5 where pk = 4;
+select * from t1 where pk = 4;
+
+# roll back original transaction
+
+connection con1;
+rollback;
+
+# roll back joined transaction
+
+connection con2;
+rollback;
+
+# select in new transaction no results because insert row 4 was rolled back
+
+select * from t1 where pk = 4;
+
+# D. insert original, update joined, roll back joined, roll back original
+
+# insert in original transaction
+
+connection con1;
+begin;
+insert into t1 values(5,5,5,'5');
+let $tid = `select @@ndb_transaction_id`;
+
+# update in joined transaction
+
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+update t1 set attr1 = 6 where pk = 5;
+select * from t1 where pk = 5;
+
+# roll back joined transaction
+
+connection con2;
+rollback;
+
+# roll back original transaction
+
+connection con1;
+rollback;
+
+# select in new transaction no results because insert row 5 was rolled back
+
+select * from t1 where pk = 5;
+rollback;
+
+#
+#
+#
+delete from t1;
+
+connection con1;
+begin;
+select * from t1 where pk = 1;
+let $tid = `select @@ndb_transaction_id`;
+
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+update t1 set attr1 = 5 where pk = 1;
+
+connection con1;
+update t1 set attr1 = 5 where pk = 1;
+rollback;
+
+connection con2;
+rollback;
+
+drop table t1;
+

=== added file 'mysql-test/suite/ndb/t/ndb_coord_commit_rollback.test'
--- a/mysql-test/suite/ndb/t/ndb_coord_commit_rollback.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_coord_commit_rollback.test	2012-09-12 13:07:04 +0000
@@ -0,0 +1,100 @@
+-- source include/have_ndb.inc
+
+#
+# Create a normal table with primary key
+#
+CREATE TABLE t1 (
+  pk INT NOT NULL PRIMARY KEY,
+  attr1 INT NOT NULL,
+  attr2 INT,
+  attr3 VARCHAR(10),
+  unique(attr2)
+) ENGINE=ndbcluster;
+insert into t1 values(1,1,1,'1');
+
+connect (con1,localhost,root,,test);
+connect (con2,localhost,root,,test);
+
+##
+# insert in original, update in joined, commit original, roll back joined
+#
+connection con1;
+begin;
+insert into t1 values(2,2,2,'2');
+let $tid = `select @@ndb_transaction_id`;
+
+# update in joined transaction
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+update t1 set attr1 = 3 where pk = 2;
+
+# commit original transaction
+connection con1;
+--send commit
+
+# roll back joined transaction
+connection con2;
+rollback;
+
+# expect original to get error from commit
+connection con1;
+--error 1296
+--reap
+
+# verify
+select * from t1 where pk = 2;
+
+##
+# insert in original, update in joined, rollback original, commit joined
+#
+connection con1;
+begin;
+insert into t1 values(2,2,2,'2');
+let $tid = `select @@ndb_transaction_id`;
+
+# update in joined transaction
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+update t1 set attr1 = 3 where pk = 2;
+
+# rollback original transaction
+connection con1;
+rollback;
+
+# commit joined transaction (expect error)
+connection con2;
+--error 1296
+commit;
+
+connection con1;
+
+# verify
+select * from t1 where pk = 2;
+
+#
+# verify that a select for update (even if it finds no row)
+#   is a true participant
+#
+connection con1;
+begin;
+select * from t1 where pk = 1000 for update;
+let $tid = `select @@ndb_transaction_id`;
+
+connection con2;
+begin;
+--replace_result $tid <tid>
+eval set ndb_join_transaction_id='$tid';
+insert into t1 values(2,2,2,'2');
+--send commit
+
+connection con1;
+commit;
+
+connection con2;
+--reap
+
+drop table t1;

=== added file 'mysql-test/suite/ndb/t/ndb_coord_err.test'
--- a/mysql-test/suite/ndb/t/ndb_coord_err.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_coord_err.test	2012-09-12 13:07:04 +0000
@@ -0,0 +1,60 @@
+-- source include/have_ndb.inc
+
+#
+# Create a normal table with primary key
+#
+CREATE TABLE t1 (
+  pk INT NOT NULL PRIMARY KEY,
+  attr1 INT NOT NULL,
+  attr2 INT,
+  attr3 VARCHAR(10),
+  unique(attr2)
+) ENGINE=ndbcluster;
+
+connect (con1,localhost,root,,test);
+connect (con2,localhost,root,,test);
+
+connection con1;
+
+--error 1210
+set ndb_join_transaction_id='kalleanka';
+show warnings;
+
+--error 1210
+set ndb_join_transaction_id='99:99:99:99:99';
+show warnings;
+
+begin;
+insert into t1 values(1,1,1,'a');
+let $tid = `select @@ndb_transaction_id`;
+
+disable_query_log;
+--error 1210
+eval set ndb_join_transaction_id='$tid';
+# warning message include transid which is unstable...
+# so skip show warnings
+#
+# show warnings;
+enable_query_log;
+
+connection con2;
+begin;
+disable_query_log;
+# ok...(before we started trans (first op))
+eval set ndb_join_transaction_id='$tid';
+enable_query_log;
+
+disable_query_log;
+# ok...(before we started trans (first op))
+eval set ndb_join_transaction_id='$tid';
+enable_query_log;
+
+# ok...(before we started trans (first op))
+set ndb_join_transaction_id=NULL;
+
+select @@ndb_join_transaction_id;
+
+connection con1;
+rollback;
+
+drop table t1;

=== modified file 'mysql-test/suite/ndb/t/ndbinfo.test'
--- a/mysql-test/suite/ndb/t/ndbinfo.test	2011-12-01 09:10:51 +0000
+++ b/mysql-test/suite/ndb/t/ndbinfo.test	2012-09-12 13:07:04 +0000
@@ -218,7 +218,7 @@ insert into t1 values (1);
 select state, count_operations, outstanding_operations,
 IF(client_node_id <= 255, "<client_node_id>", "<incorrect node id>") 
   client_node_id
-from server_transactions;
+from server_transactions order by 1,2,3,4;
 select node_id, operation_type, state,
 IF(tc_node_id <= 48, "<tc_node_id>", "<incorrect nodeid>") tc_node_id,
 IF(client_node_id <= 255, "<client_node_id>", "<incorrect node id>") 

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2012-06-13 18:49:49 +0000
+++ b/sql/ha_ndbcluster.cc	2012-09-12 13:07:04 +0000
@@ -70,6 +70,35 @@ static char* opt_ndb_index_stat_option;
 static char* opt_ndb_connectstring;
 static uint opt_ndb_nodeid;
 
+static int join_transaction_id_var_check_func(MYSQL_THD thd,
+                                              struct st_mysql_sys_var *,
+                                              void *, struct st_mysql_value *);
+static void join_transaction_id_var_update_func(MYSQL_THD thd,
+                                                struct st_mysql_sys_var *,
+                                                void *, const void *);
+
+/* The value of this variable will always be NULL */
+/* or a pointer to thd_ndb->m_join_transaction_id */
+static MYSQL_THDVAR_STR(
+  join_transaction_id,                 /* name */
+  PLUGIN_VAR_NOCMDOPT,
+  "Join an existing ndb transaction.",
+  join_transaction_id_var_check_func,  /* use special check func. */
+  join_transaction_id_var_update_func, /* use special update func. */
+  NULL                                 /* default value */
+);
+
+/* The value of this variable will always be NULL */
+/* or a pointer to thd_ndb->m_transaction_id */
+static MYSQL_THDVAR_STR(
+  transaction_id,                    /* name */
+  PLUGIN_VAR_READONLY | PLUGIN_VAR_NOCMDOPT,
+  "Get the current ndb transaction.",
+  NULL,                              /* check func. */
+  NULL,                              /* update func. */
+  NULL                               /* default value */
+);
+
 static MYSQL_THDVAR_UINT(
   autoincrement_prefetch_sz,         /* name */
   PLUGIN_VAR_RQCMDARG,
@@ -1188,6 +1217,8 @@ Thd_ndb::Thd_ndb()
   global_schema_lock_count= 0;
   global_schema_lock_error= 0;
   init_alloc_root(&m_batch_mem_root, BATCH_FLUSH_SIZE/4, 0);
+  bzero(m_transaction_id, sizeof(m_transaction_id));
+  bzero(m_join_transaction_id, sizeof(m_join_transaction_id));
 }
 
 Thd_ndb::~Thd_ndb()
@@ -3855,7 +3886,7 @@ int ha_ndbcluster::full_table_scan(const
   bool use_set_part_id= FALSE;
   NdbOperation::GetValueSpec gets[2];
 
-  DBUG_ENTER("full_table_scan");  
+  DBUG_ENTER("ha_ndbcluster::full_table_scan");
   DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
 
   if (m_use_partition_pruning && m_user_defined_partitioning)
@@ -7465,6 +7496,10 @@ int ha_ndbcluster::external_lock(THD *th
           thd_ndb->ndb->closeTransaction(thd_ndb->trans);
           thd_ndb->trans= NULL;
           thd_ndb->m_handler= NULL;
+          // reset the coordinated transaction
+          DBUG_PRINT("jointx", ("Commit is resetting transaction id."));
+          THDVAR(thd, join_transaction_id) = NULL;
+          THDVAR(thd, transaction_id) = NULL;
         }
       }
     }
@@ -7560,6 +7595,8 @@ ha_ndbcluster::start_transaction_row(con
 {
   NdbTransaction *trans;
   DBUG_ENTER("ha_ndbcluster::start_transaction_row");
+  THD *thd = current_thd;
+  char * buffer = THDVAR(thd, join_transaction_id);
   DBUG_ASSERT(m_thd_ndb);
   DBUG_ASSERT(m_thd_ndb->trans == NULL);
 
@@ -7567,6 +7604,13 @@ ha_ndbcluster::start_transaction_row(con
 
   Ndb *ndb= m_thd_ndb->ndb;
 
+  // if coordinated transaction id is set (by client) use it now
+  if (buffer)
+  {
+    DBUG_RETURN(join_transaction(thd, ndb, buffer, error));
+  }
+
+  // no transaction to join
   Uint64 tmp[(MAX_KEY_SIZE_IN_WORDS*MAX_XFRM_MULTIPLY) >> 1];
   char *buf= (char*)&tmp[0];
   trans= ndb->startTransaction(ndb_record,
@@ -7575,6 +7619,8 @@ ha_ndbcluster::start_transaction_row(con
 
   if (trans)
   {
+    set_coordinated_transaction_id(thd, m_thd_ndb, trans);
+
     m_thd_ndb->m_transaction_hint_count[trans->getConnectedNodeId()]++;
     DBUG_PRINT("info", ("Delayed allocation of TC"));
     DBUG_RETURN(m_thd_ndb->trans= trans);
@@ -7591,12 +7637,22 @@ ha_ndbcluster::start_transaction_key(uin
 {
   NdbTransaction *trans;
   DBUG_ENTER("ha_ndbcluster::start_transaction_key");
+  THD* thd = current_thd;
+  char * buffer = THDVAR(thd, join_transaction_id);
   DBUG_ASSERT(m_thd_ndb);
   DBUG_ASSERT(m_thd_ndb->trans == NULL);
 
   transaction_checks(table->in_use, m_thd_ndb);
 
   Ndb *ndb= m_thd_ndb->ndb;
+
+  // if join transaction id is set (by client) use it now
+  if (buffer)
+  {
+    DBUG_RETURN(join_transaction(thd, ndb, buffer, error));
+  }
+
+  // no transaction to join
   const NdbRecord *key_rec= m_index[inx_no].ndb_unique_record_key;
 
   Uint64 tmp[(MAX_KEY_SIZE_IN_WORDS*MAX_XFRM_MULTIPLY) >> 1];
@@ -7607,6 +7663,8 @@ ha_ndbcluster::start_transaction_key(uin
 
   if (trans)
   {
+    set_coordinated_transaction_id(thd, m_thd_ndb, trans);
+
     m_thd_ndb->m_transaction_hint_count[trans->getConnectedNodeId()]++;
     DBUG_PRINT("info", ("Delayed allocation of TC"));
     DBUG_RETURN(m_thd_ndb->trans= trans);
@@ -7621,15 +7679,27 @@ ha_ndbcluster::start_transaction(int &er
 {
   NdbTransaction *trans;
   DBUG_ENTER("ha_ndbcluster::start_transaction");
+  THD* thd = current_thd;
+  char * buffer = THDVAR(thd, join_transaction_id);
 
   DBUG_ASSERT(m_thd_ndb);
   DBUG_ASSERT(m_thd_ndb->trans == NULL);
 
   transaction_checks(table->in_use, m_thd_ndb);
+  // if join transaction id is set (by client) use it now
+  if (buffer)
+  {
+    Ndb *ndb = m_thd_ndb->ndb;
+    DBUG_RETURN(join_transaction(thd, ndb, buffer, error));
+  }
+
+  // no transaction to join
   const uint opti_node_select= THDVAR(table->in_use, optimized_node_selection);
   m_thd_ndb->connection->set_optimized_node_selection(opti_node_select & 1);
   if ((trans= m_thd_ndb->ndb->startTransaction()))
   {
+    set_coordinated_transaction_id(thd, m_thd_ndb, trans);
+
     m_thd_ndb->m_transaction_no_hint_count[trans->getConnectedNodeId()]++;
     DBUG_PRINT("info", ("Delayed allocation of TC"));
     DBUG_RETURN(m_thd_ndb->trans= trans);
@@ -7644,13 +7714,25 @@ ha_ndbcluster::start_transaction_part_id
 {
   NdbTransaction *trans;
   DBUG_ENTER("ha_ndbcluster::start_transaction_part_id");
+  THD* thd = current_thd;
+  char * buffer = THDVAR(thd, join_transaction_id);
 
   DBUG_ASSERT(m_thd_ndb);
   DBUG_ASSERT(m_thd_ndb->trans == NULL);
 
   transaction_checks(table->in_use, m_thd_ndb);
+  // if join transaction id is set (by client) use it now
+  if (buffer)
+  {
+    Ndb *ndb = m_thd_ndb->ndb;
+    DBUG_RETURN(join_transaction(thd, ndb, buffer, error));
+  }
+
+  // no coordinated transaction in progress
   if ((trans= m_thd_ndb->ndb->startTransaction(m_table, part_id)))
   {
+    set_coordinated_transaction_id(thd, m_thd_ndb, trans);
+
     m_thd_ndb->m_transaction_hint_count[trans->getConnectedNodeId()]++;
     DBUG_PRINT("info", ("Delayed allocation of TC"));
     DBUG_RETURN(m_thd_ndb->trans= trans);
@@ -7825,6 +7907,8 @@ int ndbcluster_commit(handlerton *hton,
   ndb->closeTransaction(trans);
   thd_ndb->trans= NULL;
   thd_ndb->m_handler= NULL;
+  THDVAR(thd, join_transaction_id) = NULL;
+  THDVAR(thd, transaction_id) = NULL;
 
   /* Clear commit_count for tables changed by transaction */
   NDB_SHARE* share;
@@ -7883,6 +7967,8 @@ static int ndbcluster_rollback(handlerto
     my_error(ER_WARN_ENGINE_TRANSACTION_ROLLBACK, MYF(0), "NDB");
     DBUG_RETURN(0);
   }
+  // reset the coordinated transaction only if real rollback
+  THDVAR(thd, join_transaction_id) = NULL;
   thd_ndb->save_point_count= 0;
   if (thd->slave_thread)
     g_ndb_slave_state.atTransactionAbort();
@@ -7901,6 +7987,8 @@ static int ndbcluster_rollback(handlerto
   ndb->closeTransaction(trans);
   thd_ndb->trans= NULL;
   thd_ndb->m_handler= NULL;
+  THDVAR(thd, join_transaction_id) = NULL;
+  THDVAR(thd, transaction_id) = NULL;
 
   /* Clear list of tables changed by transaction */
   NDB_SHARE* share;
@@ -13374,6 +13462,8 @@ ndb_get_table_statistics(THD *thd, ha_nd
     pOp->close(TRUE);
 
     ndb->closeTransaction(pTrans);
+    THDVAR(thd, join_transaction_id) = NULL;
+    THDVAR(thd, transaction_id) = NULL;
 
     ndbstat->row_count= sum_rows;
     ndbstat->commit_count= sum_commits;
@@ -13416,6 +13506,8 @@ retry:
     {
       ndb->closeTransaction(pTrans);
       pTrans= NULL;
+      THDVAR(thd, join_transaction_id) = NULL;
+      THDVAR(thd, transaction_id) = NULL;
     }
     if (error.status == NdbError::TemporaryError &&
         retries-- && !thd->killed)
@@ -16799,6 +16891,8 @@ static struct st_mysql_sys_var* system_v
   MYSQL_SYSVAR(nodeid),
   MYSQL_SYSVAR(blob_read_batch_bytes),
   MYSQL_SYSVAR(blob_write_batch_bytes),
+  MYSQL_SYSVAR(join_transaction_id),
+  MYSQL_SYSVAR(transaction_id),
   MYSQL_SYSVAR(deferred_constraints),
 #ifndef DBUG_OFF
   MYSQL_SYSVAR(dbg_check_shares),
@@ -16835,4 +16929,131 @@ ndbinfo_plugin, /* ndbinfo plugin */
 i_s_ndb_transid_mysql_connection_map_plugin
 mysql_declare_plugin_end;
 
+
+/**
+ * coordinated transaction stuff
+ */
+NdbTransaction *
+ha_ndbcluster::join_transaction(THD * thd, Ndb *ndb, char * buffer,
+                                int & error)
+{
+  DBUG_ENTER("ha_ndbcluster::join_transaction");
+  DBUG_PRINT("jointx", ("join_transaction_id: %s", buffer));
+
+  NdbTransaction *trans = ndb->joinTransaction(buffer);
+  if (trans)
+  {
+    THDVAR(thd, transaction_id) = buffer;
+    m_thd_ndb->m_transaction_hint_count[trans->getConnectedNodeId()]++;
+    DBUG_RETURN(m_thd_ndb->trans= trans);
+  }
+
+  ERR_SET(m_thd_ndb->ndb->getNdbError(), error);
+  DBUG_RETURN(NULL);
+}
+
+void
+ha_ndbcluster::set_coordinated_transaction_id(THD * thd,
+                                              Thd_ndb * dst,
+                                              const NdbTransaction * trans)
+{
+  DBUG_ENTER("ha_ndbcluster::set_coordinated_transaction_id");
+  // set the transaction id into the session variable
+  trans->getCoordinatedTransactionId(dst->m_transaction_id,
+                                     sizeof(dst->m_transaction_id));
+  THDVAR(thd, transaction_id) = dst->m_transaction_id;
+
+  DBUG_PRINT("jointx", ("new transaction_id is: %s",
+                        THDVAR(thd, transaction_id)));
+  DBUG_VOID_RETURN;
+}
+
+static
+int
+join_transaction_id_var_check_func(MYSQL_THD thd,
+                                   struct st_mysql_sys_var *var,
+                                   void *save,
+                                   struct st_mysql_value *value)
+{
+  DBUG_ENTER("ha_ndbcluster::join_transaction_id_var_check_func");
+  // allocate a Thd_ndb and an Ndb since we're going to need them
+  Ndb *ndb= check_ndb_in_thd(thd);
+  if (!ndb)
+  {
+    DBUG_RETURN(HA_ERR_NO_CONNECTION);
+  }
+  char *transaction_id = THDVAR(thd, transaction_id);
+  if (transaction_id)
+  {
+    DBUG_PRINT("jointx",
+               ("error: transaction_id variable already set to %s; "
+                "transaction already started.", transaction_id));
+    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                        ER_LOCK_OR_ACTIVE_TRANSACTION,
+                        "transaction_id variable already set to %s; "
+                        "transaction already started.", transaction_id);
+    DBUG_RETURN(ER_LOCK_OR_ACTIVE_TRANSACTION);
+  }
+  int length;
+  // get the length and pointer to the proposed value provided by the user
+  const char *join_transaction_id= value->val_str(value, NULL, &length);
+  DBUG_PRINT("jointx", ("join_transaction_id length: %d value: %s",
+    length, join_transaction_id));
+  // if value is null or is an empty string reset the join_transaction_id
+  if ((!join_transaction_id) || (length == 0))
+  {
+    *((char**)save)= NULL;
+    DBUG_RETURN(0);
+  }
+
+  // check to make sure the join transaction value is legal
+  // decompose the coordinatedTransactionIdentifier into nodeId, tcConPtr
+  // and transId
+  Uint32 scannedNodeId = 0;
+  Uint32 scannedInstance = 0;
+  Uint32 scannedTCConPtr = 0;
+  Uint64 scannedTransId = 0;
+  NdbError errObj;
+
+  int err= Ndb::scanJoinTransactionId(join_transaction_id,
+                                      scannedNodeId, scannedInstance,
+                                      scannedTCConPtr, scannedTransId,
+                                      errObj);
+  DBUG_PRINT("jointx",
+             ("scan of join_transaction_id got %d numbers: %x, %x, %x and %llx",
+              err, scannedNodeId, scannedInstance,
+	      scannedTCConPtr, scannedTransId));
+
+  if (err != 0)
+  {
+    // join_transaction_id is not valid
+    push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
+                        errObj.mysql_code,
+                        errObj.message);
+    DBUG_RETURN(errObj.mysql_code);
+  }
+
+  // get the Thd_ndb
+  Thd_ndb *thd_ndb= get_thd_ndb(thd);
+  // value is has the right format;
+  // copy it to the Thd_ndb; length has already been checked
+  strcpy(thd_ndb->m_join_transaction_id, join_transaction_id);
+  // save it for the update function
+  *((char**)save)= thd_ndb->m_join_transaction_id;
+  DBUG_RETURN(0);
+}
+
+static
+void
+join_transaction_id_var_update_func(MYSQL_THD thd,
+                                    struct st_mysql_sys_var *var,
+                                    void *var_ptr, const void *save)
+{
+  DBUG_ENTER("ha_ndbcluster::join_transaction_id_var_update_func");
+  DBUG_PRINT("jointx", ("saved join_transaction_id %s.", *((char**)save)));
+  // save the pointer to thd_ndb->m_join_transaction_id in the variable
+  *((char**)var_ptr)= *((char**)save);
+  DBUG_VOID_RETURN;
+}
+
 #endif

=== modified file 'sql/ha_ndbcluster.h'
--- a/sql/ha_ndbcluster.h	2012-06-07 09:06:47 +0000
+++ b/sql/ha_ndbcluster.h	2012-09-12 13:07:04 +0000
@@ -331,6 +331,8 @@ class Thd_ndb
   unsigned m_connect_count;
   bool valid_ndb(void);
   bool recycle_ndb(THD* thd);
+  char m_join_transaction_id[NDBAPI_MAX_COORD_TRANSACTION_ID_SIZE];
+  char m_transaction_id[NDBAPI_MAX_COORD_TRANSACTION_ID_SIZE];
 };
 
 struct st_ndb_status {
@@ -768,6 +770,10 @@ private:
                                         const uchar *key_data,
                                         int &error);
 
+  NdbTransaction *join_transaction(THD *, Ndb*, char * buffer, int & error);
+  void set_coordinated_transaction_id(THD*, Thd_ndb* dst,
+                                      const NdbTransaction* src);
+
   friend int check_completed_operations_pre_commit(Thd_ndb*,
                                                    NdbTransaction*,
                                                    const NdbOperation*,

=== modified file 'storage/ndb/include/kernel/signaldata/ScanTab.hpp'
--- a/storage/ndb/include/kernel/signaldata/ScanTab.hpp	2012-06-21 12:33:15 +0000
+++ b/storage/ndb/include/kernel/signaldata/ScanTab.hpp	2012-09-12 13:07:04 +0000
@@ -89,6 +89,7 @@ private:
    * Optional
    */
   Uint32 distributionKey;
+  Uint32 coordinatedConnectPtr;
 
   /**
    * Get:ers for requestInfo
@@ -107,6 +108,7 @@ private:
   static Uint32 getViaSPJFlag(const Uint32 & requestInfo);
   static Uint32 getPassAllConfsFlag(const Uint32 & requestInfo);
   static Uint32 get4WordConf(const Uint32&);
+  static UintR getCoordinatedTransactionFlag(const UintR & requestInfo);
 
   /**
    * Set:ers for requestInfo
@@ -126,6 +128,7 @@ private:
   static void setViaSPJFlag(Uint32 & requestInfo, Uint32 val);
   static void setPassAllConfsFlag(Uint32 & requestInfo, Uint32 val);
   static void set4WordConf(Uint32 & requestInfo, Uint32 val);
+  static void setCoordinatedTransactionFlag(UintR & requestInfo, UintR val);
 };
 
 /**
@@ -156,10 +159,11 @@ private:
  j = Via SPJ flag          - 1  Bit 27
  a = Pass all confs flag   - 1  Bit 28
  f = 4 word conf           - 1  Bit 29
+ c = Coordinated Tx   flag - 1  Bit 30
 
            1111111111222222222233
  01234567890123456789012345678901
- pppppppplnhcktzxbbbbbbbbbbdjaf
+ pppppppplnhcktzxbbbbbbbbbbdjafc
 */
 
 #define PARALLEL_SHIFT     (0)
@@ -199,6 +203,9 @@ private:
 #define SCAN_PASS_CONF_SHIFT (28)
 #define SCAN_4WORD_CONF_SHIFT (29)
 
+#define SCAN_COORDINATED_SHIFT (30)
+#define SCAN_COORDINATED_MASK  (1)
+
 inline
 Uint8
 ScanTabReq::getParallelism(const UintR & requestInfo){
@@ -398,6 +405,21 @@ ScanTabReq::set4WordConf(UintR & request
   requestInfo |= (flag << SCAN_4WORD_CONF_SHIFT);
 }
 
+inline
+UintR
+ScanTabReq::getCoordinatedTransactionFlag(const UintR & requestInfo){
+  return (requestInfo >> SCAN_COORDINATED_SHIFT) & SCAN_COORDINATED_MASK;
+}
+
+inline
+void
+ScanTabReq::setCoordinatedTransactionFlag(UintR & requestInfo, Uint32 flag){
+  ASSERT_BOOL(flag, "TcKeyReq::setCoordFlag");
+  requestInfo= (requestInfo & ~(SCAN_COORDINATED_MASK << SCAN_COORDINATED_SHIFT)) |
+               ((flag & SCAN_COORDINATED_MASK) << SCAN_COORDINATED_SHIFT);
+}
+
+
 /**
  * 
  * SENDER:  Dbtc
@@ -405,9 +427,9 @@ ScanTabReq::set4WordConf(UintR & request
  */
 class ScanTabConf {
   /**
-   * Reciver(s) 
+   * Receiver(s)
    */
-  friend class NdbTransaction;         // Reciver
+  friend class NdbTransaction;         // Receiver
 
   /**
    * Sender(s)

=== modified file 'storage/ndb/include/kernel/signaldata/SignalData.hpp'
--- a/storage/ndb/include/kernel/signaldata/SignalData.hpp	2012-05-21 22:27:28 +0000
+++ b/storage/ndb/include/kernel/signaldata/SignalData.hpp	2012-09-12 13:07:04 +0000
@@ -325,4 +325,7 @@ GSN_PRINT_SIGNATURE(printLCP_STATUS_REQ)
 GSN_PRINT_SIGNATURE(printLCP_STATUS_CONF);
 GSN_PRINT_SIGNATURE(printLCP_STATUS_REF);
 
+GSN_PRINT_SIGNATURE(printTC_COMMITREQ);
+GSN_PRINT_SIGNATURE(printTCROLLBACKREQ);
+
 #endif

=== modified file 'storage/ndb/include/kernel/signaldata/TcCommit.hpp'
--- a/storage/ndb/include/kernel/signaldata/TcCommit.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/TcCommit.hpp	2012-09-12 13:07:04 +0000
@@ -22,6 +22,25 @@
 #include "SignalData.hpp"
 
 /**
+ * This is signal is sent from API to TC
+ *   requesting a transaction commit (or leave)
+ */
+struct TcCommitReq
+{
+  Uint32 apiConnectPtr;
+  Uint32 transId1;
+  Uint32 transId2;
+  Uint32 flags; // 0 == normal, 1 == commit or leave
+
+  enum
+  {
+    COMMIT_OR_LEAVE = 0x1
+  };
+
+  STATIC_CONST( SignalLength = 4 );
+};
+
+/**
  * This is signal is sent from TC to API
  * It means that the transaction was committed
  */

=== modified file 'storage/ndb/include/kernel/signaldata/TcKeyConf.hpp'
--- a/storage/ndb/include/kernel/signaldata/TcKeyConf.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/TcKeyConf.hpp	2012-09-12 13:07:04 +0000
@@ -82,6 +82,7 @@ private:
   static Uint32 getNoOfOperations(const Uint32 & confInfo);
   static Uint32 getCommitFlag(const Uint32 & confInfo);
   static bool getMarkerFlag(const Uint32 & confInfo);
+  static Uint32 getLeaveFlag(const Uint32 & confInfo);
   
   /**
    * Set:ers for confInfo
@@ -89,6 +90,7 @@ private:
   static void setCommitFlag(Uint32 & confInfo, Uint8 flag);
   static void setNoOfOperations(Uint32 & confInfo, Uint32 noOfOps);
   static void setMarkerFlag(Uint32 & confInfo, Uint32 flag);
+  static void setLeaveFlag(Uint32 & confInfo, Uint32 flag);
 };
 
 inline
@@ -131,4 +133,20 @@ TcKeyConf::setMarkerFlag(Uint32 & confIn
   confInfo |= (flag << 17);
 }
 
+inline
+Uint32
+TcKeyConf::getLeaveFlag(const Uint32 & confInfo)
+{
+  return (confInfo >> 18) & 1;
+}
+
+inline
+void
+TcKeyConf::setLeaveFlag(Uint32 & confInfo, Uint32 flag)
+{
+  ASSERT_BOOL(flag, "TcKeyConf::setLeaveFlag");
+  confInfo |= (flag << 18);
+}
+
+
 #endif

=== modified file 'storage/ndb/include/kernel/signaldata/TcKeyReq.hpp'
--- a/storage/ndb/include/kernel/signaldata/TcKeyReq.hpp	2011-09-05 17:12:17 +0000
+++ b/storage/ndb/include/kernel/signaldata/TcKeyReq.hpp	2012-09-12 13:07:04 +0000
@@ -217,6 +217,12 @@ private:
   static void setDeferredConstraints(UintR & requestInfo, UintR val);
 
   /**
+   * Commit or leave
+   */
+  static UintR getCommitOrLeaveFlag(const UintR & requestInfo);
+  static void setCommitOrLeaveFlag(UintR & requestInfo, UintR val);
+
+  /**
    * Set:ers for scanInfo
    */
   static void setTakeOverScanFlag(UintR & scanInfo, Uint8 flag);
@@ -248,11 +254,13 @@ private:
  x = Coordinated Tx flag   - 1  Bit 16
  q = Queue on redo problem - 1  Bit 9
  D = deferred constraint   - 1  Bit 17
+ L = Commit or Leave       - 1 Bit 18
+     (only valid if Commit is set)
 
            1111111111222222222233
  01234567890123456789012345678901
  dnb cooop lsyyeiaaarkkkkkkkkkkkk  (Short TCKEYREQ)
- dnbvcooopqlsyyeixD r              (Long TCKEYREQ)
+ dnbvcooopqlsyyeixDLr              (Long TCKEYREQ)
 */
 
 #define TCKEY_NODISK_SHIFT (1)
@@ -283,6 +291,7 @@ private:
 
 #define TC_COORDINATED_SHIFT (16)
 #define TC_DEFERRED_CONSTAINTS_SHIFT (17)
+#define TC_COMMIT_OR_LEAVE_SHIFT (18)
 
 /**
  * Scan Info
@@ -653,5 +662,18 @@ TcKeyReq::getDeferredConstraints(const U
   return (requestInfo >> TC_DEFERRED_CONSTAINTS_SHIFT) & 1;
 }
 
+inline
+void
+TcKeyReq::setCommitOrLeaveFlag(UintR & requestInfo, UintR val){
+  ASSERT_BOOL(val, "TcKeyReq::setCommitOrLeaveFlag");
+  requestInfo |= (val << TC_COMMIT_OR_LEAVE_SHIFT);
+}
+
+inline
+UintR
+TcKeyReq::getCommitOrLeaveFlag(const UintR & requestInfo){
+  return (requestInfo >> TC_COMMIT_OR_LEAVE_SHIFT) & 1;
+}
+
 
 #endif

=== modified file 'storage/ndb/include/kernel/signaldata/TcRollbackRep.hpp'
--- a/storage/ndb/include/kernel/signaldata/TcRollbackRep.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/kernel/signaldata/TcRollbackRep.hpp	2012-09-12 13:07:04 +0000
@@ -21,6 +21,21 @@
 
 #include "SignalData.hpp"
 
+struct TcRollbackReq
+{
+  Uint32 apiConnectPtr;
+  Uint32 transId1;
+  Uint32 transId2;
+  Uint32 flags; // 1 = maybe bad
+
+  enum
+  {
+    MAYBE_BAD = 0x1
+  };
+
+  STATIC_CONST( SignalLength = 4 );
+};
+
 class TcRollbackRep {
   /**
    * Sender(s)

=== modified file 'storage/ndb/include/ndb_version.h.in'
--- a/storage/ndb/include/ndb_version.h.in	2012-08-24 11:46:00 +0000
+++ b/storage/ndb/include/ndb_version.h.in	2012-09-12 13:07:04 +0000
@@ -809,4 +809,48 @@ ndb_join_pushdown(Uint32 x)
   return x >= NDBD_JOIN_PUSHDOWN;
 }
 
+#define NDBD_SCAN_START_TRANS_70 NDB_MAKE_VERSION(7,0,36)
+#define NDBD_SCAN_START_TRANS_71 NDB_MAKE_VERSION(7,1,24)
+#define NDBD_SCAN_START_TRANS_72 NDB_MAKE_VERSION(7,2,8)
+
+static
+inline
+int
+ndbd_scan_start_trans(Uint32 x)
+{
+  const Uint32 major = (x >> 16) & 0xFF;
+  const Uint32 minor = (x >>  8) & 0xFF;
+
+  if (major == 7 && minor < 2)
+  {
+    if (minor == 0)
+      return x >= NDBD_SCAN_START_TRANS_70;
+    else if (minor == 1)
+      return x >= NDBD_SCAN_START_TRANS_71;
+  }
+  return x >= NDBD_SCAN_START_TRANS_72;
+}
+
+#define NDBD_COORDINATED_TRANS_VERSION_70 NDB_MAKE_VERSION(7,0,36)
+#define NDBD_COORDINATED_TRANS_VERSION_71 NDB_MAKE_VERSION(7,1,24)
+#define NDBD_COORDINATED_TRANS_VERSION_72 NDB_MAKE_VERSION(7,2,8)
+
+static
+inline
+int
+ndbd_coordinated_transactions(Uint32 x)
+{
+  const Uint32 major = (x >> 16) & 0xFF;
+  const Uint32 minor = (x >>  8) & 0xFF;
+
+  if (major == 7 && minor < 2)
+  {
+    if (minor == 0)
+      return x >= NDBD_COORDINATED_TRANS_VERSION_70;
+    else if (minor == 1)
+      return x >= NDBD_COORDINATED_TRANS_VERSION_71;
+  }
+  return x >= NDBD_COORDINATED_TRANS_VERSION_72;
+}
+
 #endif

=== modified file 'storage/ndb/include/ndbapi/Ndb.hpp'
--- a/storage/ndb/include/ndbapi/Ndb.hpp	2012-06-07 14:49:35 +0000
+++ b/storage/ndb/include/ndbapi/Ndb.hpp	2012-09-12 13:07:04 +0000
@@ -1480,6 +1480,14 @@ public:
                                    Uint32 partitionId);
 
   /**
+   * Join an existing transaction that is already active on a node.
+   * Use the coordinated transaction identifier obtained from the
+   * existing transaction via
+   * NdbTransaction::getCoordinatedTransactionIdentifier.
+   */
+  NdbTransaction* joinTransaction(const char* coordinatedTransactionIdentifier);
+
+  /**
    * Compute distribution hash value given table/keys
    *
    * @param  hashvalueptr - OUT, is set to hashvalue if return value is 0
@@ -1503,6 +1511,13 @@ public:
                          const NdbDictionary::Table*, 
                          const struct Key_part_ptr * keyData,
                          void* xfrmbuf = 0, Uint32 xfrmbuflen = 0);
+
+  static int scanJoinTransactionId(const char *input,
+                                   Uint32 &nodeId,
+                                   Uint32 &instance,
+                                   Uint32 &apiConnect,
+                                   Uint64 &transid,
+                                   NdbError& errObj);
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
   static int computeHash(Uint32* hashvalueptr,
                          const NdbRecord *keyRec, const char *keyData,
@@ -1846,6 +1861,11 @@ private:
   NdbTransaction*  startTransactionLocal(Uint32 aPrio, Uint32 aNode,
                                          Uint32 instance);
 
+  NdbTransaction*  startTransactionLocal(Uint32 aPrio,
+                                         Uint32 aNodeId, Uint32 instance,
+                                         Uint64 aTransactionId,
+                                         Uint32 coordConnectPtr);
+
 // Connect the connection object to the Database.
   int NDB_connect(Uint32 tNode, Uint32 instance);
   NdbTransaction* doConnect(Uint32 nodeId, Uint32 instance);

=== modified file 'storage/ndb/include/ndbapi/NdbScanOperation.hpp'
--- a/storage/ndb/include/ndbapi/NdbScanOperation.hpp	2012-06-13 12:28:41 +0000
+++ b/storage/ndb/include/ndbapi/NdbScanOperation.hpp	2012-09-12 13:07:04 +0000
@@ -587,6 +587,7 @@ protected:
   int doSendScan(int ProcessorId);
   int finaliseScanOldApi();
   int prepareSendScan(Uint32 TC_ConnectPtr, Uint64 TransactionId);
+  int setCoordinatedConnectPtr(Uint32 connectPtr);
   
   int fix_receivers(Uint32 parallel);
   void reset_receivers(Uint32 parallel, Uint32 ordered);

=== modified file 'storage/ndb/include/ndbapi/NdbTransaction.hpp'
--- a/storage/ndb/include/ndbapi/NdbTransaction.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/ndbapi/NdbTransaction.hpp	2012-09-12 13:07:04 +0000
@@ -65,7 +65,8 @@ enum ExecType {
   Prepare,
   NoCommit,
   Commit,
-  Rollback
+  Rollback,
+  CommitOrLeave  // leave a coordinated transaction, or if last commit
 };
 #endif
 
@@ -191,6 +192,11 @@ public:
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
     = ::Rollback
 #endif
+    ,CommitOrLeave              ///< If part of joined transction, leave it
+                                ///< else commit
+#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
+    = ::CommitOrLeave
+#endif
   };
 
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
@@ -506,6 +512,17 @@ public:
 #endif
   void close();
 
+  /**
+   * Compute the transaction id that can be used to
+   * join this transaction from a different client node.
+   * See Ndb::joinTransaction.
+   * @param  buffer a buffer of at least length MAX_JOIN_TRANSACTION_ID_SIZE
+   * @param length the length of the buffer
+   * @return a string representing the transaction
+   *         null if length is less than required
+   */
+  const char* getCoordinatedTransactionId(char* buffer, int length) const;
+
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
   /**
    * Restart transaction
@@ -1204,6 +1221,8 @@ private:
   NdbTransaction(const NdbTransaction&); // Not impl.
   NdbTransaction&operator=(const NdbTransaction&);
 
+  void scanOpStarted(); // called by scan on main trans when starting
+
   // Query operation (aka multicursor)
   NdbQueryImpl* m_firstQuery;        // First query in defining list.
   NdbQueryImpl* m_firstExecQuery;    // First query to send for execution
@@ -1216,6 +1235,33 @@ private:
   NdbQueryImpl* m_scanningQuery;
 
   Uint32 m_tcRef;
+
+  /**
+   * Flags
+   */
+  enum Flags
+  {
+    /**
+     * Has a scan started transaction so that it does not need start-flag in
+     *   tc{key/idx}req
+     */
+    SCAN_STARTED = 0x1
+
+    /**
+     * Commit OR_LEAVE
+     *   for coordinated transactions,
+     *   leave (iff not last)
+     */
+    ,OR_LEAVE = 0x2
+  };
+  Uint32 m_flags;
+
+  /**
+   * Iff coordinated
+   */
+  Uint32 m_coordinatedConnectPtr;
+
+  // ADD LAST FOR ABI COMPABILITY
 };
 
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL

=== modified file 'storage/ndb/include/ndbapi/ndbapi_limits.h'
--- a/storage/ndb/include/ndbapi/ndbapi_limits.h	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/include/ndbapi/ndbapi_limits.h	2012-09-12 13:07:04 +0000
@@ -31,4 +31,15 @@
 /* TUP ZATTR_BUFFER_SIZE 16384 (minus 1) minus place for getValue()s */
 #define NDB_MAX_SCANFILTER_SIZE_IN_WORDS (16384 - 1 - 1024)
 
+/**
+ * Max number of characters in transaction id
+ * Id is string made up of Hex ASCII with : separators
+ * and a terminating null character
+ */
+#define NDBAPI_MAX_COORD_TRANSACTION_ID_SIZE ((5 * 8) + 4 + /* null */ 1)
+
+#define NDBAPI_COORD_TRANSACTION_ID_FORMAT "%x:%x:%x:%x:%x"
+
+#define NDBAPI_COORD_TRANSACTION_ID_FORMAT_PRINT "%%x:%%x:%%x:%%x:%%x"
+
 #endif

=== modified file 'storage/ndb/src/common/debugger/signaldata/Makefile.am'
--- a/storage/ndb/src/common/debugger/signaldata/Makefile.am	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/Makefile.am	2012-09-12 13:07:04 +0000
@@ -20,6 +20,7 @@ noinst_LTLIBRARIES = libsignaldataprint.
 libsignaldataprint_la_SOURCES = \
           TcKeyReq.cpp TcKeyConf.cpp TcKeyRef.cpp \
 	  TcRollbackRep.cpp \
+          TcCommit.cpp \
           TupKey.cpp TupCommit.cpp LqhKey.cpp \
           FsOpenReq.cpp FsCloseReq.cpp FsRef.cpp FsConf.cpp FsReadWriteReq.cpp\
           SignalDataPrint.cpp SignalNames.cpp \

=== modified file 'storage/ndb/src/common/debugger/signaldata/ScanTab.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/ScanTab.cpp	2011-09-29 11:31:28 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/ScanTab.cpp	2012-09-12 13:07:04 +0000
@@ -47,7 +47,13 @@ printSCANTABREQ(FILE * output, const Uin
   
   if(sig->getDistributionKeyFlag(requestInfo))
     fprintf(output, " DKey: %x", sig->distributionKey);
-  
+
+  if(sig->getCoordinatedTransactionFlag(requestInfo))
+  {
+    Uint32 offset = (sig->getDistributionKeyFlag(requestInfo) ? 1 : 0);
+    fprintf(output, " JoinTCRec: 0x%x", (&sig->distributionKey)[offset]);
+  }
+
   Uint32 keyLen = (sig->attrLenKeyLen >> 16);
   Uint32 attrLen = (sig->attrLenKeyLen & 0xFFFF);
   fprintf(output, " attrLen: %d, keyLen: %d tableId: %d, tableSchemaVer: %d\n",

=== modified file 'storage/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp	2012-05-21 22:27:28 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/SignalDataPrint.cpp	2012-09-12 13:07:04 +0000
@@ -276,6 +276,9 @@ SignalDataPrintFunctions[] = {
   ,{ GSN_LCP_STATUS_CONF, printLCP_STATUS_CONF }
   ,{ GSN_LCP_STATUS_REF, printLCP_STATUS_REF }
 
+  ,{ GSN_TC_COMMITREQ, printTC_COMMITREQ }
+  ,{ GSN_TCROLLBACKREQ, printTCROLLBACKREQ }
+
   ,{ 0, 0 }
 };
 

=== modified file 'storage/ndb/src/common/debugger/signaldata/TcKeyConf.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/TcKeyConf.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/TcKeyConf.cpp	2012-09-12 13:07:04 +0000
@@ -50,10 +50,12 @@ printTCKEYCONF(FILE * output, const Uint
             sig->gci_hi, *(Uint32*)&sig->operations[noOfOp],
             sig->transId1, sig->transId2);
     
-    fprintf(output, " noOfOperations: %u, commitFlag: %s, markerFlag: %s\n", 
+    fprintf(output,
+            " noOfOperations: %u, commitFlag: %s, markerFlag: %s leave: %u\n",
 	    noOfOp,
-	  (TcKeyConf::getCommitFlag(confInfo) == 0)?"false":"true",
-	    (TcKeyConf::getMarkerFlag(confInfo) == 0)?"false":"true");
+            (TcKeyConf::getCommitFlag(confInfo) == 0)?"false":"true",
+	    (TcKeyConf::getMarkerFlag(confInfo) == 0)?"false":"true",
+            TcKeyConf::getLeaveFlag(confInfo));
     fprintf(output, "Operations:\n");
     for(i = 0; i < noOfOp; i++) {
       if(sig->operations[i].attrInfoLen > TcKeyConf::DirtyReadBit)

=== modified file 'storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/TcKeyReq.cpp	2012-09-12 13:07:04 +0000
@@ -49,7 +49,14 @@ printTCKEYREQ(FILE * output, const Uint3
       fprintf(output, "Execute ");
     }
     if(sig->getCommitFlag(requestInfo)){
-      fprintf(output, "Commit ");
+      if (sig->getCommitOrLeaveFlag(requestInfo))
+      {
+        fprintf(output, "CommitOrLeave ");
+      }
+      else
+      {
+        fprintf(output, "Commit ");
+      }
     }
     if (sig->getNoDiskFlag(requestInfo)) {
       fprintf(output, "NoDisk ");
@@ -77,6 +84,9 @@ printTCKEYREQ(FILE * output, const Uint3
     if(sig->getViaSPJFlag(sig->requestInfo)){
       fprintf(output, " spj");
     }
+    if(sig->getCoordinatedTransactionFlag(sig->requestInfo)){
+      fprintf(output, "Coordinated ");
+    }
     if(sig->getQueueOnRedoProblemFlag(sig->requestInfo))
       fprintf(output, "Queue ");
 

=== modified file 'storage/ndb/src/common/debugger/signaldata/TcRollbackRep.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/TcRollbackRep.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/common/debugger/signaldata/TcRollbackRep.cpp	2012-09-12 13:07:04 +0000
@@ -19,6 +19,28 @@
 #include <signaldata/TcRollbackRep.hpp>
 
 bool
+printTCROLLBACKREQ(FILE * output, const Uint32 * theData, Uint32 len,
+                   Uint16 receiverBlockNo)
+{
+  const TcRollbackReq * sig = CAST_CONSTPTR(TcRollbackReq, theData);
+  fprintf(output, " apiConnectPtr: H\'%.8x transId(1, 2): (H\'%.8x, H\'%.8x)\n",
+	  sig->apiConnectPtr, sig->transId1, sig->transId2);
+
+  if (len == TcRollbackReq::SignalLength)
+  {
+    fprintf(output, " Flags:");
+    Uint32 flags = sig->flags;
+    if (flags & TcRollbackReq::MAYBE_BAD)
+      fprintf(output, " MaybeBad");
+    fprintf(output, "\n");
+
+    return true;
+  }
+
+  return false;
+}
+
+bool
 printTCROLLBACKREP(FILE * output, const Uint32 * theData, Uint32 len, Uint16 receiverBlockNo){
   fprintf(output, "Signal data: ");
   Uint32 i = 0;

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2012-08-24 12:00:05 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2012-09-12 13:07:04 +0000
@@ -102,6 +102,12 @@
 #define ZWRONG_SCHEMA_VERSION_ERROR 241 // Also Scan
 #define ZSCAN_NODE_ERROR 250
 #define ZTRANS_STATUS_ERROR 253
+
+#define ZCOORD_ALREADY_LINKED        257
+#define ZCOORD_TRANSID_NO_MATCH      258
+#define ZCOORD_TRANS_ALREADY_STARTED 259
+#define ZCOORD_ABORTED               260
+
 #define ZTIME_OUT_ERROR 266
 #define ZSIMPLE_READ_WITHOUT_AI 271
 #define ZNO_AI_WITH_UPDATE 272
@@ -195,6 +201,17 @@ public:
      * Waiting for FIRE_TRIG_CONF/REF (or operations generated by this)
      */
     CS_WAIT_FIRE_TRIG_REQ = 27
+
+    /**
+     * Coordinated transaction, this ApiConnectRecord has received
+     *   commit-req, but not all in chain has
+     */
+    ,CS_COORD_TRANS_WAIT_COMMIT_REQ = 28
+
+    /**
+     * Coordinated transaction, is committing...waiting for commit to complete
+     */
+    ,CS_COORD_TRANS_WAIT_COMMITTING
   };
 
 #ifndef DBTC_STATE_EXTRACT
@@ -746,6 +763,17 @@ public:
       TF_COMMIT_ACK_MARKER_RECEIVED = 8,
       TF_DEFERRED_CONSTRAINTS = 16, // check constraints in deferred fashion
       TF_DEFERRED_UK_TRIGGERS = 32, // trans has deferred UK triggers
+
+      /**
+       * TF_NO_COMMIT_NEEDED
+       *   Transactions that has never (tried) acquired any lock
+       *   does not need commit/rollback. ndbapi knows this and
+       *   optimizes this away
+       */
+      TF_NO_COMMIT_NEEDED = 128,
+
+      TF_COMMIT_OR_LEAVE = 256,     //
+
       TF_END = 0
     };
     Uint32 m_flags;
@@ -813,6 +841,12 @@ public:
     UintR tcIndxSendArray[6];
     DLList<TcIndexOperation> theSeizedIndexOperations;
 
+    /**
+     * Next in the circular single-linked list of ApiConnectRecord
+     * coordinated with us
+     */
+    UintR nextCoordApiConnect;
+
 #ifdef ERROR_INSERT
     Uint32 continueBCount;  // ERROR_INSERT 8082
 #endif
@@ -822,6 +856,10 @@ public:
       return apiConnectstate == CS_SEND_FIRE_TRIG_REQ ||
         apiConnectstate == CS_WAIT_FIRE_TRIG_REQ ;
     }
+
+    bool getNoCommitNeeded() const {
+      return (m_flags & TF_NO_COMMIT_NEEDED) != 0;
+    }
   };
   
   typedef Ptr<ApiConnectRecord> ApiConnectRecordPtr;
@@ -1464,7 +1502,7 @@ private:
   Uint32 sendFireTrigReqLqh(Signal*, Ptr<TcConnectRecord>, Uint32 pass);
 
   void sendTCKEY_FAILREF(Signal* signal, ApiConnectRecord *);
-  void sendTCKEY_FAILCONF(Signal* signal, ApiConnectRecord *);
+  void sendTCKEY_FAILCONF(Signal* signal, ApiConnectRecordPtr);
   void routeTCKEY_FAILREFCONF(Signal* signal, const ApiConnectRecord *, 
 			      Uint32 gsn, Uint32 len);
   void execTCKEY_FAILREFCONF_R(Signal* signal);
@@ -1568,9 +1606,9 @@ private:
   void handleFailedOperation(Signal* signal,
 			     const LqhKeyRef * const lqhKeyRef, 
 			     bool gotLqhKeyRef);
-  void markOperationAborted(ApiConnectRecord * const regApiPtr,
+  void markOperationAborted(ApiConnectRecordPtr regApiPtr,
 			    TcConnectRecord * const regTcPtr);
-  void clearCommitAckMarker(ApiConnectRecord * const regApiPtr,
+  void clearCommitAckMarker(ApiConnectRecordPtr regApiPtr,
 			    TcConnectRecord * const regTcPtr);
   // Trigger and index handling
   int saveINDXKEYINFO(Signal* signal,
@@ -1725,11 +1763,35 @@ private:
 
   void apiFailBlockCleanupCallback(Signal* signal, Uint32 failedNodeId, Uint32 ignoredRc);
   bool isRefreshSupported() const;
-  
+
+  Uint32 coordTrans_join(ApiConnectRecordPtr newTrans,
+                         Uint32 existingTransPtrI,
+                         Uint32 firstTcConnect);
+  void coordTrans_unlink(ApiConnectRecordPtr existingTrans);
+  void coordTrans_setSavePointId(ApiConnectRecordPtr apiRec, Uint32 newValue);
+  bool coordTrans_searchMarker(ApiConnectRecordPtr apiRec);
+  void coordTrans_markerReceived(ApiConnectRecordPtr apiRec);
+  void coordTrans_clearMarker(ApiConnectRecordPtr);
+
+  void coordTrans_merge(ApiConnectRecordPtr dst, ApiConnectRecordPtr src);
+  void coordTrans_abort(Signal*, ApiConnectRecordPtr);
+  bool coordTrans_commit(ApiConnectRecordPtr);
+  void coordTrans_sendApiCommit(Signal*, ApiConnectRecordPtr);
+
   // Initialisation
   void initData();
   void initRecords();
 
+  /**
+   * Check if a scan can be started on this ApiConnectRecord
+   */
+  Uint32 checkStateStartScan(const ApiConnectRecord*);
+
+  /**
+   * Check if a scan can be started on this buddy ApiConnectRecord
+   */
+  Uint32 checkBuddyStateStartScan(const ApiConnectRecord*);
+
 protected:
   virtual bool getParam(const char* name, Uint32* count);
   
@@ -1974,7 +2036,15 @@ public:
     Uint32 prevHash;
     Uint32 apiConnectPtr;
     Uint16 apiNodeId;
-    NdbNodeBitmask m_commit_ack_marker_nodes; 
+
+    /**
+     * When using coordinated transactions,
+     *   multiple ApiConnectRecord's share the same CommitAckMarker
+     *   This variable keeps count of how many
+     */
+    Uint16 apiConnectCount;
+
+    NdbNodeBitmask m_commit_ack_marker_nodes;
 
     inline bool equal(const CommitAckMarker & p) const {
       return ((p.transid1 == transid1) && (p.transid2 == transid2));

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2012-08-24 12:00:05 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2012-09-12 13:07:04 +0000
@@ -1014,6 +1014,7 @@ Dbtc::handleFailedApiNode(Signal* signal
       case CS_PREPARE_TO_COMMIT:
       case CS_COMMITTING:
       case CS_COMMIT_SENT:
+      case CS_COORD_TRANS_WAIT_COMMITTING:
         /*********************************************************************/
         // These states indicate that an abort process or commit process is 
         // already ongoing. We will set a state in the api record indicating 
@@ -1047,6 +1048,7 @@ Dbtc::handleFailedApiNode(Signal* signal
       case CS_REC_COMMITTING:
       case CS_RECEIVING:
       case CS_STARTED:
+      case CS_COORD_TRANS_WAIT_COMMIT_REQ:
         /*********************************************************************/
         // The api record was in the process of performing a transaction but 
         // had not yet sent all information. 
@@ -1158,17 +1160,19 @@ Dbtc::removeMarkerForFailedAPI(Signal* s
       return;
     }
     
-    if(iter.curr.p->apiNodeId == nodeId){
+    if(iter.curr.p->apiNodeId == nodeId)
+    {
       jam();
-      
+
       /**
        * Check so that the record is not still in use
        *
+       * transactions that gets aborted, automatically clear their markers
+       * transactions that gets committed, keep refcount until they commit
+       *   don't remove the record until that happens...
        */
-      ApiConnectRecordPtr apiConnectPtr;
-      apiConnectPtr.i = iter.curr.p->apiConnectPtr;
-      ptrCheckGuard(apiConnectPtr, capiConnectFilesize, apiConnectRecord);
-      if(apiConnectPtr.p->commitAckMarker == iter.curr.i){
+      if (iter.curr.p->apiConnectCount > 0)
+      {
 	jam();
         /**
          * The record is still active
@@ -1346,7 +1350,6 @@ void Dbtc::execTCRELEASEREQ(Signal* sign
 	   apiConnectptr.p->firstTcConnect == RNIL))
       {
         jam();                                   /* JUST REPLY OK */
-	apiConnectptr.p->m_transaction_nodes.clear();
         releaseApiCon(signal, apiConnectptr.i);
         signal->theData[0] = tuserpointer;
         sendSignal(tapiBlockref,
@@ -2399,6 +2402,7 @@ void Dbtc::initApiConnectRec(Signal* sig
 
   tc_clearbit(regApiPtr->m_flags,
               ApiConnectRecord::TF_DEFERRED_CONSTRAINTS);
+  regApiPtr->m_flags |= ApiConnectRecord::TF_NO_COMMIT_NEEDED;
   c_counters.ctransCount++;
 
 #ifdef ERROR_INSERT
@@ -2406,6 +2410,11 @@ void Dbtc::initApiConnectRec(Signal* sig
 #endif
 
   regApiPtr->m_write_count = 0;
+  ndbassert(regApiPtr->nextCoordApiConnect == RNIL);
+  regApiPtr->nextCoordApiConnect = RNIL;
+
+  tc_clearbit(regApiPtr->m_flags,
+              ApiConnectRecord::TF_COMMIT_OR_LEAVE);
 }//Dbtc::initApiConnectRec()
 
 int
@@ -2503,6 +2512,7 @@ void Dbtc::execTCKEYREQ(Signal* signal)
   const TcKeyReq * const tcKeyReq = (TcKeyReq *)signal->getDataPtr();
   UintR Treqinfo;
   SectionHandle handle(this, signal);
+  const bool isLongTcKeyReq = (handle.m_cnt != 0);
 
   jamEntry();
   /*-------------------------------------------------------------------------
@@ -2551,6 +2561,8 @@ void Dbtc::execTCKEYREQ(Signal* signal)
   Uint32 TstartFlag = TcKeyReq::getStartFlag(Treqinfo);
   Uint32 TexecFlag =
     TcKeyReq::getExecuteFlag(Treqinfo) ? ApiConnectRecord::TF_EXEC_FLAG : 0;
+  Uint32 TcoordFlag =
+    isLongTcKeyReq ? TcKeyReq::getCoordinatedTransactionFlag(Treqinfo) : 0;
 
   Uint8 Tspecial_op_flags = regApiPtr->m_special_op_flags;
   bool isIndexOpReturn = tc_testbit(regApiPtr->m_flags,
@@ -2591,14 +2603,27 @@ void Dbtc::execTCKEYREQ(Signal* signal)
   }
   break;
   case CS_STARTED:
-    if(TstartFlag == 1 && regApiPtr->firstTcConnect == RNIL)
+    if (TstartFlag == 1 &&
+        regApiPtr->getNoCommitNeeded() &&
+        /** this part "should not be needed" */
+        regApiPtr->firstTcConnect == RNIL)
     {
       /**
        * If last operation in last transaction was a simple/dirty read
        *  it does not have to be committed or rollbacked hence,
        *  the state will be CS_STARTED
+       *
+       * For this case we (optionally) uncoordinate here
+       *
        */
       jam();
+
+      if (regApiPtr->nextCoordApiConnect != RNIL)
+      {
+        // jam() inside coordTrans_unlink
+        coordTrans_unlink(apiConnectptr);
+      }
+
       if (unlikely(getNodeState().getSingleUserMode()) &&
           getNodeState().getSingleUserApi() != sendersNodeId &&
           !localTabptr.p->singleUserMode)
@@ -2754,7 +2779,7 @@ void Dbtc::execTCKEYREQ(Signal* signal)
   regCachePtr->apiVersionNo = TapiVersionNo;
 
   /* If we have any sections at all then this is a long TCKEYREQ */
-  regCachePtr->isLongTcKeyReq= ( handle.m_cnt != 0 );
+  regCachePtr->isLongTcKeyReq= isLongTcKeyReq;
 
   UintR TapiConnectptrIndex = apiConnectptr.i;
   UintR TsenderData = tcKeyReq->senderData;
@@ -2797,6 +2822,7 @@ void Dbtc::execTCKEYREQ(Signal* signal)
 
   if (regCachePtr->isLongTcKeyReq)
   {
+    jam();
     SegmentedSectionPtr keyInfoSec;
     if (handle.getSection(keyInfoSec, TcKeyReq::KeyInfoSectionNum))
       TkeyLength= keyInfoSec.sz;
@@ -2812,6 +2838,7 @@ void Dbtc::execTCKEYREQ(Signal* signal)
   }
   else
   {
+    jam();
     TkeyLength = TcKeyReq::getKeyLength(Treqinfo);
     TattrLen= TcKeyReq::getAttrinfoLen(tcKeyReq->attrLen);
     titcLenAiInTckeyreq = TcKeyReq::getAIInTcKeyReq(Treqinfo);
@@ -2842,11 +2869,6 @@ void Dbtc::execTCKEYREQ(Signal* signal)
   ndbassert(isExecutingTrigger || 
             (regApiPtr->immediateTriggerId == RNIL));
 
-  if (TexecFlag){
-    Uint32 currSPId = regApiPtr->currSavePointId;
-    regApiPtr->currSavePointId = ++currSPId;
-  }
-
   regCachePtr->attrlength = TattrLen;
   c_counters.cattrinfoCount += TattrLen;
 
@@ -2891,10 +2913,12 @@ void Dbtc::execTCKEYREQ(Signal* signal)
   // The next step is to read the upto three conditional words.
   //-------------------------------------------------------------
   Uint32 TkeyIndex;
+  Uint32 coordApiConnectPtr = RNIL;
   Uint32* TOptionalDataPtr = (Uint32*)&tcKeyReq->scanInfo;
   {
     Uint32  TDistrGHIndex    = TcKeyReq::getScanIndFlag(Treqinfo);
     Uint32  TDistrKeyIndex   = TDistrGHIndex;
+    Uint32  TCoordApiIndex   = TDistrKeyIndex + TDistrKeyFlag;
 
     Uint32 TscanInfo = TcKeyReq::getTakeOverScanInfo(TOptionalDataPtr[0]);
 
@@ -2902,12 +2926,55 @@ void Dbtc::execTCKEYREQ(Signal* signal)
     regCachePtr->scanInfo = TscanInfo;
 
     regCachePtr->distributionKey = TOptionalDataPtr[TDistrKeyIndex];
+    coordApiConnectPtr = TOptionalDataPtr[TCoordApiIndex];
 
-    TkeyIndex = TDistrKeyIndex + TDistrKeyFlag;
+    TkeyIndex = TDistrKeyIndex + TDistrKeyFlag + TcoordFlag;
   }
 
-  regCachePtr->m_no_hash = false;
+  /**
+   * Check coordinated transaction
+   */
+  if (TcoordFlag)
+  {
+    jam();
+
+    Uint32 errCode = coordTrans_join(apiConnectptr,
+                                     coordApiConnectPtr,
+                                     tcConnectptr.i);
+    if (unlikely(errCode != 0))
+    {
+      jam();
+      terrorCode = errCode;
+      releaseSections(handle);
+      releaseAtErrorLab(signal);
+      return;
+    }
+  }
+
+  /**
+   * Handle savepoints
+   * note: must be after checking of coordinated transctions
+   */
+  Uint32 currSPId = regApiPtr->currSavePointId;
+  regTcPtr->savePointId = currSPId;
+  if (TexecFlag)
+  {
+    jam();
+    if (regApiPtr->nextCoordApiConnect == RNIL)
+    {
+      regApiPtr->currSavePointId = currSPId + 1;
+    }
+    else
+    {
+      // jam inside coordTrans_setSavePointId
+      coordTrans_setSavePointId(apiConnectptr, currSPId + 1);
+    }
+  }
 
+  /**
+   * Check hashing
+   */
+  regCachePtr->m_no_hash = false;
   if (TOperationType == ZUNLOCK)
   {
     /* Unlock op has distribution key containing
@@ -3020,8 +3087,11 @@ void Dbtc::execTCKEYREQ(Signal* signal)
     if (!tc_testbit(regApiPtr->m_flags,
                     ApiConnectRecord::TF_COMMIT_ACK_MARKER_RECEIVED))
     {
-      if(regApiPtr->commitAckMarker != RNIL)
+      if (regApiPtr->commitAckMarker != RNIL)
+      {
+    marker_found:
         regTcPtr->commitAckMarker = regApiPtr->commitAckMarker;
+      }
       else
       {
         jam();
@@ -3033,6 +3103,17 @@ void Dbtc::execTCKEYREQ(Signal* signal)
           return;
         }
 
+        if (regApiPtr->nextCoordApiConnect != RNIL)
+        {
+          /**
+           * Check if any of the other coordinated transactions has
+           *   a marker
+           */
+          // jam(); inside coordTrans_searchMarker
+          if (coordTrans_searchMarker(apiConnectptr))
+            goto marker_found;
+        }
+
         if (!m_commitAckMarkerHash.seize(tmp))
         {
           TCKEY_abort(signal, 56);
@@ -3046,6 +3127,7 @@ void Dbtc::execTCKEYREQ(Signal* signal)
           tmp.p->transid2      = tcKeyReq->transId2;
           tmp.p->apiNodeId     = refToNode(regApiPtr->ndbapiBlockref);
           tmp.p->apiConnectPtr = TapiIndex;
+          tmp.p->apiConnectCount = 1;
           tmp.p->m_commit_ack_marker_nodes.clear();
 #if defined VM_TRACE || defined ERROR_INSERT
 	  {
@@ -3097,6 +3179,7 @@ void Dbtc::execTCKEYREQ(Signal* signal)
     }//switch
   }//if
   
+  Uint32 TcommitOrLeave = TcKeyReq::getCommitOrLeaveFlag(Treqinfo);
   Uint32 TabortOption = TcKeyReq::getAbortOption(Treqinfo);
   regTcPtr->m_execAbortOption = TabortOption;
   
@@ -3107,6 +3190,8 @@ void Dbtc::execTCKEYREQ(Signal* signal)
   if (TcKeyReq::getCommitFlag(Treqinfo) == 1) {
     ndbrequire(TexecuteFlag);
     regApiPtr->apiConnectstate = CS_REC_COMMITTING;
+    regApiPtr->m_flags |=
+      TcommitOrLeave == 0 ? 0 : ApiConnectRecord::TF_COMMIT_OR_LEAVE;
   } else {
     /* ---------------------------------------------------------------------
      *       PREPARE TRANSACTION IS NOT IMPLEMENTED YET.
@@ -3154,6 +3239,762 @@ void Dbtc::execTCKEYREQ(Signal* signal)
   return;
 }//Dbtc::execTCKEYREQ()
 
+Uint32
+Dbtc::coordTrans_join(ApiConnectRecordPtr newTrans,
+                      Uint32 existingTransPtrI,
+                      Uint32 firstTcConnect)
+{
+  jam();
+
+  if (newTrans.p->nextCoordApiConnect != RNIL)
+  {
+    jam();
+    return ZCOORD_ALREADY_LINKED;
+  }
+
+  if (newTrans.p->firstTcConnect != firstTcConnect)
+  {
+    jam();
+    return ZCOORD_TRANS_ALREADY_STARTED;
+  }
+
+  /**
+   * Initial link...coordinated with self...
+   *   no need to check transid etc...
+   */
+  if (newTrans.i == existingTransPtrI)
+  {
+    jam();
+    ndbout_c("initial coord: %u", newTrans.i);
+    newTrans.p->nextCoordApiConnect = newTrans.i;
+    return 0;
+  }
+
+  ApiConnectRecordPtr existingTrans;
+  existingTrans.i = existingTransPtrI;
+  ptrCheckGuard(existingTrans, capiConnectFilesize, apiConnectRecord);
+
+  // check that new transaction has the same transaction id
+  // as the existing transaction
+  UintR existingTransid1 = existingTrans.p->transid[0];
+  UintR existingTransid2 = existingTrans.p->transid[1];
+  UintR newTransid1 = newTrans.p->transid[0];
+  UintR newTransid2 = newTrans.p->transid[1];
+
+  // one of the transaction ids has to match or there is an error
+  if (!(existingTransid1 == newTransid1 && existingTransid2 == newTransid2))
+  {
+    jam();
+    return ZCOORD_TRANSID_NO_MATCH;
+  }
+
+  /**
+   * TODO check state of transaction we're joining!!!
+   */
+
+  /**
+   * Link into chain
+   */
+  if  (existingTrans.p->nextCoordApiConnect == RNIL)
+  {
+    jam();
+    existingTrans.p->nextCoordApiConnect = newTrans.i;
+  }
+  else
+  {
+    jam();
+    /**
+     * Find the end of the existing chain.
+     */
+    ApiConnectRecordPtr endPtr;
+    endPtr.i = existingTrans.p->nextCoordApiConnect;
+    ptrCheckGuard(endPtr, capiConnectFilesize, apiConnectRecord);
+    while (endPtr.p->nextCoordApiConnect != existingTrans.i)
+    {
+      jam();
+      endPtr.i = endPtr.p->nextCoordApiConnect;
+      ptrCheckGuard(endPtr, capiConnectFilesize, apiConnectRecord);
+    }
+
+    /* Found the end; now link end of existing chain to new transaction */
+    endPtr.p->nextCoordApiConnect = newTrans.i;
+  }
+
+  /* Link new transaction to existing */
+  newTrans.p->nextCoordApiConnect = existingTrans.i;
+
+  /* Set the existing savepoint id into the new transaction */
+  newTrans.p->currSavePointId = existingTrans.p->currSavePointId;
+
+  /* Set marker received (if applicable) */
+  newTrans.p->m_flags |=
+    (existingTrans.p->m_flags&ApiConnectRecord::TF_COMMIT_ACK_MARKER_RECEIVED);
+
+  return 0;
+}
+
+void
+Dbtc::coordTrans_unlink(ApiConnectRecordPtr existingTrans)
+{
+  jam();
+  UintR nextCoordApiConnect = existingTrans.p->nextCoordApiConnect;
+  if (nextCoordApiConnect == RNIL)
+  {
+    jam();
+    return;
+  }
+
+  // Find the end of the existing chain.
+  ApiConnectRecordPtr endPtr;
+  endPtr.i = nextCoordApiConnect;
+  ptrCheckGuard(endPtr, capiConnectFilesize, apiConnectRecord);
+
+  if (endPtr.p->nextCoordApiConnect == existingTrans.i)
+  {
+    jam();
+    /**
+     * Special case...if chain contains only 2...uncoordinate both of them...
+     */
+    endPtr.p->nextCoordApiConnect = RNIL;
+  }
+  else
+  {
+    jam();
+    // find the end
+    while (endPtr.p->nextCoordApiConnect != existingTrans.i)
+    {
+      jam();
+      endPtr.i = endPtr.p->nextCoordApiConnect;
+      ptrCheckGuard(endPtr, capiConnectFilesize, apiConnectRecord);
+    }
+
+    /**
+     * Found the end
+     * now link end of existing chain to previous next api connect ptr
+     */
+    endPtr.p->nextCoordApiConnect = nextCoordApiConnect;
+  }
+
+  // now unlink the parameter
+  existingTrans.p->nextCoordApiConnect = RNIL;
+
+  return;
+}
+
+/**
+ * Set the savepoint id for all apiConnect records linked with the parameter.
+ */
+void
+Dbtc::coordTrans_setSavePointId(ApiConnectRecordPtr apiRec, Uint32 newValue)
+{
+  jam();
+  apiRec.p->currSavePointId = newValue;
+
+  if (apiRec.p->nextCoordApiConnect != RNIL)
+  {
+    jam();
+    ApiConnectRecordPtr coordRec;
+    coordRec.i = apiRec.p->nextCoordApiConnect;
+
+    /**
+     * TODO : Check state of coordinated ApiConnectRecords?
+     * TODO : Check that we don't set the savepoint lower?
+     */
+    while (coordRec.i != apiRec.i)
+    {
+      jam();
+      ptrCheckGuard(coordRec, capiConnectFilesize, apiConnectRecord);
+      coordRec.p->currSavePointId = newValue;
+      coordRec.i = coordRec.p->nextCoordApiConnect;
+    }
+  }
+}
+
+/**
+ * See if any in the chain has a markter,
+ *   if so update "self" to use this marker too
+ */
+bool
+Dbtc::coordTrans_searchMarker(ApiConnectRecordPtr apiRec)
+{
+  jam();
+  ApiConnectRecordPtr coordRec;
+  coordRec.i = apiRec.p->nextCoordApiConnect;
+
+  CommitAckMarkerPtr tmp;
+  while (coordRec.i != apiRec.i)
+  {
+    jam();
+    ptrCheckGuard(coordRec, capiConnectFilesize, apiConnectRecord);
+
+    tmp.i = coordRec.p->commitAckMarker;
+    coordRec.i = coordRec.p->nextCoordApiConnect; // next
+
+    if (tmp.i != RNIL)
+    {
+      jam();
+      apiRec.p->commitAckMarker = tmp.i;
+      m_commitAckMarkerPool.getPtr(tmp);
+      tmp.p->apiConnectCount++;
+      return true;
+    }
+  }
+
+  return false;
+}
+
+/**
+ * Set TF_COMMIT_ACK_MARKER_RECEIVED on all transactions in chain
+ */
+void
+Dbtc::coordTrans_markerReceived(ApiConnectRecordPtr apiRec)
+{
+  if (tc_testbit(apiRec.p->m_flags,
+                 ApiConnectRecord::TF_COMMIT_ACK_MARKER_RECEIVED))
+  {
+    jam();
+    return;
+  }
+
+  apiRec.p->m_flags |= ApiConnectRecord::TF_COMMIT_ACK_MARKER_RECEIVED;
+
+  ApiConnectRecordPtr coordRec;
+  coordRec.i = apiRec.p->nextCoordApiConnect;
+
+  while (coordRec.i != apiRec.i)
+  {
+    jam();
+    ptrCheckGuard(coordRec, capiConnectFilesize, apiConnectRecord);
+
+    coordRec.i = coordRec.p->nextCoordApiConnect; // next
+    coordRec.p->m_flags |= ApiConnectRecord::TF_COMMIT_ACK_MARKER_RECEIVED;
+  }
+}
+
+void
+Dbtc::coordTrans_clearMarker(ApiConnectRecordPtr apiRec)
+{
+  if (apiRec.p->no_commit_ack_markers > 0)
+  {
+    jam();
+    return;
+  }
+
+  CommitAckMarkerPtr tmp;
+  m_commitAckMarkerPool.getPtr(tmp, apiRec.p->commitAckMarker);
+
+  ndbrequire(tmp.p->apiConnectCount > 0);
+  tmp.p->apiConnectCount--;
+
+  if (tmp.p->apiConnectCount > 0)
+  {
+    jam();
+    return;
+  }
+
+  apiRec.p->commitAckMarker = RNIL;
+  ndbassert(!tc_testbit(apiRec.p->m_flags,
+                        ApiConnectRecord::TF_COMMIT_ACK_MARKER_RECEIVED));
+  tc_clearbit(apiRec.p->m_flags,
+              ApiConnectRecord::TF_COMMIT_ACK_MARKER_RECEIVED);
+
+  ApiConnectRecordPtr coordRec;
+  coordRec.i = apiRec.p->nextCoordApiConnect;
+
+  while (coordRec.i != apiRec.i)
+  {
+    jam();
+    ptrCheckGuard(coordRec, capiConnectFilesize, apiConnectRecord);
+
+    coordRec.i = coordRec.p->nextCoordApiConnect; // next
+
+    ndbrequire(coordRec.p->no_commit_ack_markers == 0);
+    coordRec.p->commitAckMarker = RNIL;
+    ndbassert(!tc_testbit(coordRec.p->m_flags,
+                          ApiConnectRecord::TF_COMMIT_ACK_MARKER_RECEIVED));
+    tc_clearbit(coordRec.p->m_flags,
+                ApiConnectRecord::TF_COMMIT_ACK_MARKER_RECEIVED);
+  }
+
+  m_commitAckMarkerHash.release(tmp);
+}
+
+void
+Dbtc::coordTrans_merge(ApiConnectRecordPtr dstPtr,
+                       ApiConnectRecordPtr srcPtr)
+{
+  if (srcPtr.i == dstPtr.i)
+  {
+    jam();
+    return;
+  }
+
+  /**
+   * move all relevant information (for commit/abort) from srcPtr onto dstPtr
+   */
+
+  /**
+   * Operations...
+   */
+  Uint32 src_firstTcConnect = srcPtr.p->firstTcConnect;
+  Uint32 src_lastTcConnect = srcPtr.p->lastTcConnect;
+  Uint32 dst_firstTcConnect = dstPtr.p->firstTcConnect;
+  Uint32 dst_lastTcConnect = dstPtr.p->lastTcConnect;
+
+  if (src_firstTcConnect == RNIL)
+  {
+    jam();
+    ndbassert(src_lastTcConnect == RNIL);
+  }
+  else
+  {
+    jam();
+    TcConnectRecordPtr srcFirst;
+
+    srcFirst.i = src_firstTcConnect;
+    ptrCheckGuard(srcFirst, ctcConnectFilesize, tcConnectRecord);
+
+    if (dst_firstTcConnect == RNIL)
+    {
+      jam();
+      ndbassert(dst_lastTcConnect == RNIL);
+      dstPtr.p->firstTcConnect = src_firstTcConnect;
+      dstPtr.p->lastTcConnect = src_lastTcConnect;
+    }
+    else
+    {
+      jam();
+      TcConnectRecordPtr dstLast;
+      dstLast.i = dst_lastTcConnect;
+      ptrCheckGuard(dstLast, ctcConnectFilesize, tcConnectRecord);
+      ndbrequire(dstLast.p->nextTcConnect == RNIL);
+      dstLast.p->nextTcConnect = srcFirst.i;
+      dstPtr.p->lastTcConnect = src_lastTcConnect;
+    }
+
+    ndbrequire(srcFirst.p->prevTcConnect == RNIL);
+    srcFirst.p->prevTcConnect = dst_lastTcConnect;
+
+    /**
+     * This is seriously not optimal...but it will have to do for now
+     */
+    while (srcFirst.i != RNIL)
+    {
+      jam();
+      ptrCheckGuard(srcFirst, ctcConnectFilesize, tcConnectRecord);
+      srcFirst.p->apiConnect = dstPtr.i;
+      srcFirst.i = srcFirst.p->nextTcConnect;
+    }
+  }
+
+  srcPtr.p->firstTcConnect = RNIL;
+  srcPtr.p->lastTcConnect = RNIL;
+
+  dstPtr.p->lqhkeyreqrec += srcPtr.p->lqhkeyreqrec;
+  dstPtr.p->lqhkeyconfrec += srcPtr.p->lqhkeyconfrec;
+
+  srcPtr.p->lqhkeyreqrec = 0;
+  srcPtr.p->lqhkeyconfrec = 0;
+
+  /**
+   * Marker info
+   */
+  if (srcPtr.p->commitAckMarker != RNIL)
+  {
+    jam();
+    if (dstPtr.p->commitAckMarker == RNIL)
+    {
+      jam();
+      /**
+       * No need to modify use count...
+       *   ref-count is taken over
+       */
+      dstPtr.p->commitAckMarker = srcPtr.p->commitAckMarker;
+    }
+    else if (dstPtr.p->commitAckMarker == srcPtr.p->commitAckMarker)
+    {
+      jam();
+      /**
+       * decrease ref-count...
+       */
+      CommitAckMarkerPtr tmp;
+      m_commitAckMarkerPool.getPtr(tmp, srcPtr.p->commitAckMarker);
+      ndbrequire(tmp.p->apiConnectCount > 1);
+      tmp.p->apiConnectCount--;
+    }
+    else
+    {
+      /**
+       * this shouldn't happen...(two apiconnectrecord in same coordinated
+       *   transaction but with different markers...)
+       *
+       */
+      ndbrequire(false);
+    }
+  }
+  dstPtr.p->no_commit_ack_markers += srcPtr.p->no_commit_ack_markers;
+  srcPtr.p->commitAckMarker = RNIL;
+  srcPtr.p->no_commit_ack_markers = 0;
+
+  /**
+   * Some interesting state bits...
+   */
+  Uint32 mask =
+    ApiConnectRecord::TF_COMMIT_ACK_MARKER_RECEIVED |
+    ApiConnectRecord::TF_DEFERRED_CONSTRAINTS       |
+    ApiConnectRecord::TF_DEFERRED_UK_TRIGGERS;
+
+  dstPtr.p->m_flags |= (srcPtr.p->m_flags & mask);
+  srcPtr.p->m_flags &= ~mask;
+
+  /**
+   * node mask
+   */
+  dstPtr.p->m_transaction_nodes.bitOR(srcPtr.p->m_transaction_nodes);
+  srcPtr.p->m_transaction_nodes.clear();
+
+  /**
+   * singleUserMode...
+   */
+  dstPtr.p->singleUserMode |= srcPtr.p->singleUserMode;
+  srcPtr.p->singleUserMode = 0;
+
+  /*
+   */
+  releaseAllSeizedIndexOperations(srcPtr.p);
+}
+
+void
+Dbtc::coordTrans_abort(Signal * signal, ApiConnectRecordPtr apiRec)
+{
+  BaseString msg;
+  msg.appfmt("coordTrans_abort(%u 0x%.8x %.8x ref: %u req/conf: %u/%u/%u)",
+             apiRec.i,
+             apiRec.p->transid[0],
+             apiRec.p->transid[1],
+             refToBlock(apiRec.p->ndbapiBlockref),
+             apiRec.p->lqhkeyreqrec,
+             apiRec.p->lqhkeyconfrec,
+             apiRec.p->firstTcConnect);
+
+  /**
+   * save global variable
+   */
+  ApiConnectRecordPtr save = apiConnectptr;
+  Uint32 saveTerror = terrorCode;
+  Uint32 useTerrorCode = 0;
+
+  if (apiRec.p->returnsignal == RS_TCROLLBACKCONF)
+  {
+    jam();
+    /**
+     * requested abort
+     */
+    useTerrorCode = ZCOORD_ABORTED;
+  }
+  else if (apiRec.p->returncode != 0)
+  {
+    jam();
+    /**
+     * use same error code as transaction getting the abort
+     */
+    useTerrorCode = apiRec.p->returncode;
+  }
+  else
+  {
+    jam();
+    /**
+     * E.g handleFailedApiNode gets into this path
+     *       use generic error code
+     */
+    useTerrorCode = ZCOORD_ABORTED;
+  }
+
+  /**
+   * merge other transactions in chain into this(apiRec),
+   *   unlink chain
+   *   mark them as aborted
+   *   perform the actual abort on this(apiRec)
+   */
+  ApiConnectRecordPtr coordRec;
+  coordRec.i = apiRec.p->nextCoordApiConnect;
+
+  while (coordRec.i != apiRec.i)
+  {
+    jam();
+    ptrCheckGuard(coordRec, capiConnectFilesize, apiConnectRecord);
+
+    ndbassert(coordRec.p->transid[0] == apiRec.p->transid[0]);
+    ndbassert(coordRec.p->transid[1] == apiRec.p->transid[1]);
+
+    Uint32 next = coordRec.p->nextCoordApiConnect; // next
+    coordRec.p->nextCoordApiConnect = RNIL;
+
+    msg.appfmt("- (%u ref: %u req/conf: %u/%u/%u)",
+               coordRec.i, refToBlock(coordRec.p->ndbapiBlockref),
+               coordRec.p->lqhkeyreqrec,
+               coordRec.p->lqhkeyconfrec,
+               coordRec.p->firstTcConnect);
+
+    coordTrans_merge(apiRec, coordRec);
+
+    /**
+     * set global variable
+     */
+    terrorCode = useTerrorCode;
+    apiConnectptr = coordRec;
+    abortErrorLab(signal);
+
+    coordRec.i = next;
+  }
+
+#ifdef VM_TRACE
+  if (apiRec.p->commitAckMarker != RNIL)
+  {
+    CommitAckMarkerPtr tmp;
+    m_commitAckMarkerPool.getPtr(tmp, apiRec.p->commitAckMarker);
+    ndbrequire(tmp.p->apiConnectCount == 1);
+  }
+#endif
+
+  if (0)
+  {
+    printf("%s (%u/%u/%u)\n",
+           msg.c_str(),
+           apiRec.p->lqhkeyreqrec,
+           apiRec.p->lqhkeyconfrec,
+           apiRec.p->firstTcConnect);
+    fflush(stdout);
+  }
+
+  apiRec.p->nextCoordApiConnect = RNIL;
+
+  /**
+   * Restore global variable
+   */
+  apiConnectptr = save;
+  terrorCode = saveTerror;
+}
+
+bool
+Dbtc::coordTrans_commit(ApiConnectRecordPtr apiRec)
+{
+  /**
+   * has API ordered CommitOrLeave
+   */
+  const bool orleave =
+    (apiRec.p->m_flags & ApiConnectRecord::TF_COMMIT_OR_LEAVE) != 0;
+
+  apiRec.p->apiConnectstate = CS_COORD_TRANS_WAIT_COMMIT_REQ;
+
+  ApiConnectRecordPtr coordRec;
+  coordRec.i = apiRec.p->nextCoordApiConnect;
+
+  /**
+   * will this be a leave or a commit
+   */
+  bool doleave = false;
+  while (doleave == false && coordRec.i != apiRec.i)
+  {
+    jam();
+    ptrCheckGuard(coordRec, capiConnectFilesize, apiConnectRecord);
+    Uint32 next = coordRec.p->nextCoordApiConnect;
+
+    if (coordRec.p->apiConnectstate == CS_COORD_TRANS_WAIT_COMMIT_REQ)
+    {
+      jam();
+      /**
+       * OK
+       */
+    }
+    else if (coordRec.p->getNoCommitNeeded())
+    {
+      jam();
+
+      /**
+       * Special case. Ignore the committed-read/scan only transaction
+       *   when deciding if everyone is ready or not
+       */
+    }
+    else if (orleave)
+    {
+      jam();
+      /**
+       * This is a "target" to dump our stuff onto
+       */
+      doleave = true;
+    }
+    else
+    {
+      jam();
+      return false;
+    }
+    coordRec.i = next;
+  }
+
+  /**
+   * Everyone is ready...
+   *   merge...
+   */
+  coordRec.i = apiRec.p->nextCoordApiConnect;
+  while (coordRec.i != apiRec.i)
+  {
+    jam();
+    ptrCheckGuard(coordRec, capiConnectFilesize, apiConnectRecord);
+    Uint32 next = coordRec.p->nextCoordApiConnect; // next
+
+    if (coordRec.p->getNoCommitNeeded())
+    {
+      jam();
+
+      if (!doleave)
+      {
+        jam();
+        /**
+         * Special case. Unlink the committed-read/scan only transaction
+         *   when starting to commit
+         */
+        coordTrans_unlink(coordRec);
+
+        /**
+         * and set it too aborting...to prevent new operations being added
+         *   to it
+         */
+        setApiConTimer(coordRec.i, 0, __LINE__);
+        coordRec.p->apiConnectstate = CS_ABORTING;
+        coordRec.p->abortState = AS_IDLE;
+        coordRec.p->m_transaction_nodes.clear();
+      }
+    }
+    else if (doleave)
+    {
+      jam();
+
+      /**
+       *
+       * We found a transaction in chain that is not ready to commit,
+       *   instead of block waiting on more commit's
+       *   leave the transaction
+       */
+
+      /**
+       * Move all state from apiRec to coordRec
+       */
+      coordTrans_merge(coordRec, apiRec);
+
+      /**
+       * leave chain
+       */
+      coordTrans_unlink(apiRec);
+
+      /**
+       * we're done
+       */
+      return true;
+    }
+    else
+    {
+      jam();
+      coordTrans_merge(apiRec, coordRec);
+      coordRec.p->apiConnectstate = CS_COORD_TRANS_WAIT_COMMITTING;
+    }
+
+    coordRec.i = next;
+  }
+
+  /**
+   *
+   */
+  return true;
+}
+
+void
+Dbtc::coordTrans_sendApiCommit(Signal * signal, ApiConnectRecordPtr apiRec)
+{
+  /**
+   * save global variable
+   */
+  ApiConnectRecordPtr save = apiConnectptr;
+
+  Uint32 transId1 = apiRec.p->transid[0];
+  Uint32 transId2 = apiRec.p->transid[1];
+  Uint64 gci = apiRec.p->globalcheckpointid;
+  Uint32 gci_hi = Uint32(apiRec.p->globalcheckpointid >> 32);
+  Uint32 gci_lo = Uint32(apiRec.p->globalcheckpointid);
+
+  ApiConnectRecordPtr coordRec;
+  coordRec.i = apiRec.p->nextCoordApiConnect;
+  while (coordRec.i != apiRec.i)
+  {
+    jam();
+    ptrCheckGuard(coordRec, capiConnectFilesize, apiConnectRecord);
+    ndbrequire(coordRec.p->apiConnectstate == CS_COORD_TRANS_WAIT_COMMITTING);
+    ndbrequire(coordRec.p->commitAckMarker == RNIL);
+
+    switch(coordRec.p->returnsignal) {
+    case RS_TCKEYCONF:
+      jam();
+      /**
+       * set global variable
+       */
+      apiConnectptr = coordRec;
+      coordRec.p->globalcheckpointid = gci;
+      sendtckeyconf(signal, 1);
+      break;
+    case RS_TC_COMMITCONF:
+    {
+      jam();
+      TcCommitConf * commitConf = CAST_PTR(TcCommitConf,
+                                           signal->getDataPtrSend());
+
+      commitConf->apiConnectPtr = coordRec.p->ndbapiConnect;
+      commitConf->transId1 = transId1;
+      commitConf->transId2 = transId2;
+      commitConf->gci_hi = gci_hi;
+      commitConf->gci_lo = gci_lo;
+
+      sendSignal(coordRec.p->ndbapiBlockref, GSN_TC_COMMITCONF, signal,
+                 TcCommitConf::SignalLength, JBB);
+      break;
+    }
+    default:
+      ndbassert(false);
+    }
+
+    /**
+     * From copyApi
+     */
+    setApiConTimer(coordRec.i, 0, __LINE__);
+    coordRec.p->apiConnectstate = CS_CONNECTED;
+    ndbassert(coordRec.p->commitAckMarker == RNIL);
+    ndbassert(coordRec.p->firstTcConnect == RNIL);
+    ndbassert(coordRec.p->lastTcConnect == RNIL);
+    ndbassert(coordRec.p->singleUserMode == 0);
+    ndbassert(coordRec.p->m_transaction_nodes.isclear());
+    ndbassert(coordRec.p->theSeizedIndexOperations.isEmpty());
+
+    coordRec.p->commitAckMarker = RNIL;
+    coordRec.p->firstTcConnect = RNIL;
+    coordRec.p->lastTcConnect = RNIL;
+    coordRec.p->singleUserMode = 0;
+    coordRec.p->m_transaction_nodes.clear();
+    releaseAllSeizedIndexOperations(coordRec.p);
+
+    coordRec.i = coordRec.p->nextCoordApiConnect; // next
+    coordRec.p->nextCoordApiConnect = RNIL;       // unlink
+  }
+
+  /**
+   * uncoordinate
+   */
+  apiRec.p->nextCoordApiConnect = RNIL;
+
+  /**
+   * Restore global variable
+   */
+  apiConnectptr = save;
+}
+
 static
 void
 handle_reorg_trigger(DiGetNodesConf * conf)
@@ -3534,7 +4375,7 @@ void Dbtc::attrinfoDihReceivedLab(Signal
     releaseAttrinfo();
     regApiPtr->lqhkeyreqrec--;
     unlinkReadyTcCon(signal);
-    clearCommitAckMarker(regApiPtr, regTcPtr);
+    clearCommitAckMarker(apiConnectptr, regTcPtr);
     releaseTcCon();
 
     if (trigOp != RNIL)
@@ -3907,6 +4748,7 @@ void Dbtc::packLqhkeyreq040Lab(Signal* s
     return;
   }//if
   regTcPtr->tcConnectstate = OS_OPERATING;
+  tc_clearbit(regApiPtr->m_flags, ApiConnectRecord::TF_NO_COMMIT_NEEDED);
   return;
 }//Dbtc::packLqhkeyreq040Lab()
 
@@ -3968,7 +4810,7 @@ void Dbtc::releaseDirtyRead(Signal* sign
      * Special case of lqhKeyConf_checkTransactionState:
      * - commit with zero operations: handle only for simple read
      */
-    sendtckeyconf(signal, state == CS_START_COMMITTING);
+    sendtckeyconf(signal, state == CS_START_COMMITTING ? 1 : 2);
     regApiPtr.p->apiConnectstate = 
       (state == CS_START_COMMITTING ? CS_CONNECTED : state);
     setApiConTimer(regApiPtr.i, 0, __LINE__);
@@ -4318,9 +5160,7 @@ void Dbtc::execLQHKEYCONF(Signal* signal
   }//if
 
 #ifdef ERROR_INSERT
-  if (ERROR_INSERTED(8029)) {
-    systemErrorLab(signal, __LINE__);
-  }//if
+  CRASH_INSERTION(8029);
   if (ERROR_INSERTED(8003)) {
     if (regApiPtr.p->apiConnectstate == CS_STARTED) {
       CLEAR_ERROR_INSERT_VALUE;
@@ -4390,13 +5230,25 @@ void Dbtc::execLQHKEYCONF(Signal* signal
     const Uint32 noOfLqhs = regTcPtr->noOfNodes;
     CommitAckMarker * tmp = m_commitAckMarkerHash.getPtr(commitAckMarker);
     jam();
-    regApiPtr.p->m_flags |= ApiConnectRecord::TF_COMMIT_ACK_MARKER_RECEIVED;
+
     /**
      * Populate LQH array
      */
     for(Uint32 i = 0; i < noOfLqhs; i++)
       tmp->m_commit_ack_marker_nodes.set(regTcPtr->tcNodedata[i]);
+
+    if (regApiPtr.p->nextCoordApiConnect != RNIL)
+    {
+      jam();
+      coordTrans_markerReceived(regApiPtr);
+    }
+
+    /**
+     * unconditionally set (regardless if coord or not)
+     */
+    regApiPtr.p->m_flags |= ApiConnectRecord::TF_COMMIT_ACK_MARKER_RECEIVED;
   }
+
   if (regTcPtr->isIndexOp(regTcPtr->m_special_op_flags)) {
     jam();
     // This was an internal TCKEYREQ
@@ -4941,6 +5793,20 @@ void Dbtc::diverify010Lab(Signal* signal
     systemErrorLab(signal, __LINE__);
   }//if
 
+  if (regApiPtr->nextCoordApiConnect != RNIL)
+  {
+    jam();
+    if (!coordTrans_commit(apiConnectptr))
+    {
+      jam();
+      /**
+       * We need wait for commit-req on the other transactions in the chain
+       */
+      setApiConTimer(apiConnectptr.i, ctcTimer, __LINE__);
+      return;
+    }
+  }
+
   if (tc_testbit(regApiPtr->m_flags, ApiConnectRecord::TF_DEFERRED_UK_TRIGGERS))
   {
     jam();
@@ -4987,14 +5853,64 @@ void Dbtc::diverify010Lab(Signal* signal
       return;
     }
   }
-  else
+
+  if (regApiPtr->nextCoordApiConnect != RNIL)
+  {
+    jam();
+    coordTrans_sendApiCommit(signal, apiConnectptr);
+  }
+
+  /**
+   * Empty commit
+   */
+  switch(regApiPtr->returnsignal){
+  case RS_TCKEYCONF:
   {
     jam();
     sendtckeyconf(signal, 1);
-    regApiPtr->apiConnectstate = CS_CONNECTED;
-    regApiPtr->m_transaction_nodes.clear();
-    setApiConTimer(apiConnectptr.i, 0,__LINE__);
+    break;
+  }
+  case RS_TC_COMMITCONF:
+  {
+    jam();
+    /**
+     * empty trans should not have a marker...
+     * but if it does...so be it
+     */
+    ndbassert(regApiPtr->commitAckMarker == RNIL);
+    Uint32 markerBit = (regApiPtr->commitAckMarker == RNIL) ? 0 : 1;
+
+    TcCommitConf * commitConf = CAST_PTR(TcCommitConf,
+                                         signal->getDataPtrSend());
+
+    commitConf->apiConnectPtr = regApiPtr->ndbapiConnect | markerBit;
+    commitConf->transId1 = regApiPtr->transid[0];
+    commitConf->transId2 = regApiPtr->transid[1];
+    commitConf->gci_hi = Uint32(regApiPtr->globalcheckpointid >> 32);
+    commitConf->gci_lo = Uint32(regApiPtr->globalcheckpointid);
+    sendSignal(regApiPtr->ndbapiBlockref, GSN_TC_COMMITCONF, signal,
+	       TcCommitConf::SignalLength, JBB);
+    break;
+  }
+  case RS_NO_RETURN:
+  {
+    jam();
+    // maybe apifailreq...
+    break;
+  }
+  case RS_TCROLLBACKREP:
+  case RS_TCROLLBACKCONF:
+  {
+    jam();
+    // WTF ?? maybe apifailreq...
+    ndbassert(false);
+    break;
   }
+  }
+
+  regApiPtr->apiConnectstate = CS_CONNECTED;
+  regApiPtr->m_transaction_nodes.clear();
+  setApiConTimer(apiConnectptr.i, 0,__LINE__);
 }//Dbtc::diverify010Lab()
 
 /* ------------------------------------------------------------------------- */
@@ -5452,10 +6368,7 @@ void Dbtc::execCOMMITTED(Signal* signal)
     /*-------------------------------------------------------*/
     return;
   }//if
-  if (ERROR_INSERTED(8020)) {
-    jam();
-    systemErrorLab(signal, __LINE__);
-  }//if
+  CRASH_INSERTION(8020);
   /*-------------------------------------------------------*/
   /* THE ENTIRE TRANSACTION IS NOW COMMITED                */
   /* NOW WE NEED TO SEND THE RESPONSE TO THE APPLICATION.  */
@@ -5487,6 +6400,7 @@ Ptr<Dbtc::ApiConnectRecord>
 Dbtc::sendApiCommit(Signal* signal)
 {
   ApiConnectRecordPtr regApiPtr = apiConnectptr;
+  const Uint32 marker = regApiPtr.p->commitAckMarker;
 
   if (ERROR_INSERTED(8055))
   {
@@ -5506,6 +6420,12 @@ Dbtc::sendApiCommit(Signal* signal)
     goto err8055;
   }
 
+  if (regApiPtr.p->nextCoordApiConnect != RNIL)
+  {
+    jam();
+    coordTrans_sendApiCommit(signal, regApiPtr);
+  }
+
   if (regApiPtr.p->returnsignal == RS_TCKEYCONF)
   {
     if (ERROR_INSERTED(8054))
@@ -5519,16 +6439,16 @@ Dbtc::sendApiCommit(Signal* signal)
       sendtckeyconf(signal, 1);
     }
   }
-  else if (regApiPtr.p->returnsignal == RS_TC_COMMITCONF) 
+  else if (regApiPtr.p->returnsignal == RS_TC_COMMITCONF)
   {
     jam();
     TcCommitConf * const commitConf = (TcCommitConf *)&signal->theData[0];
-    if(regApiPtr.p->commitAckMarker == RNIL)
+    if (marker == RNIL)
     {
       jam();
       commitConf->apiConnectPtr = regApiPtr.p->ndbapiConnect;
-    } 
-    else 
+    }
+    else
     {
       jam();
       commitConf->apiConnectPtr = regApiPtr.p->ndbapiConnect | 1;
@@ -5540,7 +6460,7 @@ Dbtc::sendApiCommit(Signal* signal)
 
     sendSignal(regApiPtr.p->ndbapiBlockref, GSN_TC_COMMITCONF, signal,
 	       TcCommitConf::SignalLength, JBB);
-  } 
+  }
   else if (regApiPtr.p->returnsignal == RS_NO_RETURN) 
   {
     jam();
@@ -5551,6 +6471,22 @@ Dbtc::sendApiCommit(Signal* signal)
     return regApiPtr;
   }//if
 
+  if (marker != RNIL)
+  {
+    jam();
+    /**
+     * This is the place were we "disown" the commit-ack-marker
+     *   and handover responsibility to ndbapi.
+     *
+     * There should be exactly one reference...
+     *   (the participant performing the actual commit)
+     */
+    CommitAckMarkerPtr tmp;
+    m_commitAckMarkerPool.getPtr(tmp, marker);
+    ndbrequire(tmp.p->apiConnectCount == 1);
+    tmp.p->apiConnectCount = 0;
+  }
+
 err8055:
   Ptr<ApiConnectRecord> copyPtr;
   UintR TapiConnectFilesize = capiConnectFilesize;
@@ -6374,7 +7310,7 @@ void Dbtc::execLQHKEYREF(Signal* signal)
          * An failing op in LQH, never leaves the commit ack marker around
          * TODO: This can be bug in ordinary code too!!!
          */
-        clearCommitAckMarker(regApiPtr, regTcPtr);
+        clearCommitAckMarker(apiConnectptr, regTcPtr);
 
         unlinkReadyTcCon(signal);
         releaseTcCon();
@@ -6384,7 +7320,7 @@ void Dbtc::execLQHKEYREF(Signal* signal)
       }
       
   do_abort:
-      markOperationAborted(regApiPtr, regTcPtr);
+      markOperationAborted(apiConnectptr, regTcPtr);
       
       if(regApiPtr->apiConnectstate == CS_ABORTING){
 	/**
@@ -6464,33 +7400,44 @@ void Dbtc::execLQHKEYREF(Signal* signal)
   return;
 }//Dbtc::execLQHKEYREF()
 
-void Dbtc::clearCommitAckMarker(ApiConnectRecord * const regApiPtr,
+void Dbtc::clearCommitAckMarker(ApiConnectRecordPtr regApiPtr,
 				TcConnectRecord * const regTcPtr)
 {
   const Uint32 commitAckMarker = regTcPtr->commitAckMarker;
-  if (regApiPtr->commitAckMarker == RNIL)
+  if (regApiPtr.p->commitAckMarker == RNIL)
   {
     ndbassert(commitAckMarker == RNIL);
   }
 
-  if(commitAckMarker != RNIL)
+  if (commitAckMarker != RNIL)
   {
     jam();
-    ndbassert(regApiPtr->commitAckMarker == commitAckMarker);
-    ndbrequire(regApiPtr->no_commit_ack_markers > 0);
-    regApiPtr->no_commit_ack_markers--;
+    ndbassert(regApiPtr.p->commitAckMarker == commitAckMarker);
+    ndbrequire(regApiPtr.p->no_commit_ack_markers > 0);
+    regApiPtr.p->no_commit_ack_markers--;
     regTcPtr->commitAckMarker = RNIL;
-    if (regApiPtr->no_commit_ack_markers == 0)
+
+    if (regApiPtr.p->nextCoordApiConnect != RNIL)
+    {
+      jam();
+      coordTrans_clearMarker(regApiPtr);
+    }
+    else if (regApiPtr.p->no_commit_ack_markers == 0)
     {
-      regApiPtr->commitAckMarker = RNIL;
-      tc_clearbit(regApiPtr->m_flags,
+      jam();
+      regApiPtr.p->commitAckMarker = RNIL;
+      tc_clearbit(regApiPtr.p->m_flags,
                   ApiConnectRecord::TF_COMMIT_ACK_MARKER_RECEIVED);
-      m_commitAckMarkerHash.release(commitAckMarker);
+
+      CommitAckMarkerPtr tmp;
+      m_commitAckMarkerPool.getPtr(tmp, commitAckMarker);
+      ndbrequire(tmp.p->apiConnectCount == 1); // or else nextCoordApiConnect
+      m_commitAckMarkerHash.release(tmp);
     }
   }
 }
 
-void Dbtc::markOperationAborted(ApiConnectRecord * const regApiPtr,
+void Dbtc::markOperationAborted(ApiConnectRecordPtr regApiPtr,
 				TcConnectRecord * const regTcPtr)
 {
   /*------------------------------------------------------------------------
@@ -6513,11 +7460,20 @@ void Dbtc::execTC_COMMITREQ(Signal* sign
   UintR compare_transid1, compare_transid2;
 
   jamEntry();
-  apiConnectptr.i = signal->theData[0];
+  const TcCommitReq * req = CAST_CONSTPTR(TcCommitReq, signal->getDataPtr());
+  Uint32 flags = req->flags;
+
+  if (signal->getLength() < TcCommitReq::SignalLength)
+  {
+    jam();
+    flags = 0;
+  }
+
+  apiConnectptr.i = req->apiConnectPtr;
   if (apiConnectptr.i < capiConnectFilesize) {
     ptrAss(apiConnectptr, apiConnectRecord);
-    compare_transid1 = apiConnectptr.p->transid[0] ^ signal->theData[1];
-    compare_transid2 = apiConnectptr.p->transid[1] ^ signal->theData[2];
+    compare_transid1 = apiConnectptr.p->transid[0] ^ req->transId1;
+    compare_transid2 = apiConnectptr.p->transid[1] ^ req->transId2;
     compare_transid1 = compare_transid1 | compare_transid2;
     if (compare_transid1 != 0) {
       jam();
@@ -6530,14 +7486,18 @@ void Dbtc::execTC_COMMITREQ(Signal* sign
     const Uint32 apiBlockRef   = regApiPtr->ndbapiBlockref;
     const Uint32 transId1      = regApiPtr->transid[0];
     const Uint32 transId2      = regApiPtr->transid[1];
+    const bool leave = (flags & TcCommitReq::COMMIT_OR_LEAVE);
+
     Uint32 errorCode           = 0;
 
     regApiPtr->m_flags |= ApiConnectRecord::TF_EXEC_FLAG;
+    regApiPtr->m_flags |= leave == false ? 0 :
+      ApiConnectRecord::TF_COMMIT_OR_LEAVE;
+
     switch (regApiPtr->apiConnectstate) {
     case CS_STARTED:
       tcConnectptr.i = regApiPtr->firstTcConnect;
-      if (tcConnectptr.i != RNIL) {
-        ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
+      if (tcConnectptr.i != RNIL || regApiPtr->nextCoordApiConnect != RNIL) {
         if (regApiPtr->lqhkeyconfrec == regApiPtr->lqhkeyreqrec) {
           jam();
           /*******************************************************************/
@@ -6560,7 +7520,9 @@ void Dbtc::execTC_COMMITREQ(Signal* sign
           errorCode = ZTRANS_STATUS_ERROR;
           abort010Lab(signal);
         }//if
-      } else {
+      }
+      else
+      {
         jam();
         /**
          * No operations, accept commit
@@ -6655,25 +7617,34 @@ void Dbtc::execTCROLLBACKREQ(Signal* sig
 
   jamEntry();
 
-  if(unlikely((signal->getLength() >= 4) && (signal->theData[3] & 0x1)))
+  const TcRollbackReq * req = CAST_CONSTPTR(TcRollbackReq,signal->getDataPtr());
+  Uint32 flags = req->flags;
+  if (signal->getLength() < TcRollbackReq::SignalLength)
+  {
+    jam();
+    flags = 0;
+  }
+
+  if (unlikely(flags & TcRollbackReq::MAYBE_BAD))
   {
     ndbout_c("Trying to roll back potentially bad txn\n");
     potentiallyBad= true;
   }
 
-  apiConnectptr.i = signal->theData[0];
+  apiConnectptr.i = req->apiConnectPtr;
   if (apiConnectptr.i >= capiConnectFilesize) {
     goto TC_ROLL_warning;
   }//if
   ptrAss(apiConnectptr, apiConnectRecord);
-  compare_transid1 = apiConnectptr.p->transid[0] ^ signal->theData[1];
-  compare_transid2 = apiConnectptr.p->transid[1] ^ signal->theData[2];
+  compare_transid1 = apiConnectptr.p->transid[0] ^ req->transId1;
+  compare_transid2 = apiConnectptr.p->transid[1] ^ req->transId2;
   compare_transid1 = compare_transid1 | compare_transid2;
   if (compare_transid1 != 0) {
     jam();
     return;
   }//if
 
+  terrorCode = 0;
   apiConnectptr.p->m_flags |= ApiConnectRecord::TF_EXEC_FLAG;
   switch (apiConnectptr.p->apiConnectstate) {
   case CS_STARTED:
@@ -6719,6 +7690,10 @@ void Dbtc::execTCROLLBACKREQ(Signal* sig
     jam();
     if (apiConnectptr.p->abortState == AS_IDLE) {
       jam();
+      /**
+       * all transactions should unlink already when setting CS_ABORTING
+       */
+      ndbassert(apiConnectptr.p->nextCoordApiConnect == RNIL);
       signal->theData[0] = apiConnectptr.p->ndbapiConnect;
       signal->theData[1] = apiConnectptr.p->transid[0];
       signal->theData[2] = apiConnectptr.p->transid[1];
@@ -7056,15 +8031,13 @@ void Dbtc::execABORTED(Signal* signal)
     warningReport(signal, 1);
     return;
   }//if
-  if (ERROR_INSERTED(8024)) {
-    jam();
-    systemErrorLab(signal, __LINE__);
-  }//if
+
+  CRASH_INSERTION(8024);
 
   /**
    * Release marker
    */
-  clearCommitAckMarker(apiConnectptr.p, tcConnectptr.p);
+  clearCommitAckMarker(apiConnectptr, tcConnectptr.p);
 
   Uint32 i;
   Uint32 Tfound = 0;
@@ -7151,6 +8124,13 @@ void Dbtc::abortErrorLab(Signal* signal)
 void Dbtc::abort010Lab(Signal* signal) 
 {
   ApiConnectRecord * transP = apiConnectptr.p;
+
+  if (transP->nextCoordApiConnect != RNIL)
+  {
+    jam();
+    coordTrans_abort(signal, apiConnectptr);
+  }
+
   if (transP->apiConnectstate == CS_ABORTING && transP->abortState != AS_IDLE){
     jam();
     return;
@@ -7564,6 +8544,26 @@ void Dbtc::timeOutFoundLab(Signal* signa
     apiConnectptr.p->returncode = errCode;
     abort010Lab(signal);
     return;
+  case CS_COORD_TRANS_WAIT_COMMIT_REQ:
+  {
+    jam();
+    /**
+     * How long should a transaction be allowed to wait in this state...
+     *   In syncronous ndbapi, we wait for (atleast) 3 * 120s (6 minutes)
+     *   Skip the 3x and wait for 2 minutes
+     */
+    Uint32 timeout_sec = 120;
+    Uint32 timeout_val = (timeout_sec * 1000) / 10; // measured in 100ms
+    if ((ctcTimer - getApiConTimer(apiConnectptr.i)) <= timeout_val)
+    {
+      jam();
+      return;
+    }
+    apiConnectptr.p->returnsignal = RS_TCROLLBACKREP;
+    apiConnectptr.p->returncode = errCode;
+    abort010Lab(signal);
+    return;
+  }
   case CS_RECEIVING:
   case CS_REC_COMMITTING:
   case CS_START_COMMITTING:
@@ -7591,6 +8591,9 @@ void Dbtc::timeOutFoundLab(Signal* signa
     // We are simply waiting for a signal in the job buffer. Only extreme
     // conditions should get us here. We ignore it.
     /*------------------------------------------------------------------*/
+
+  case CS_COORD_TRANS_WAIT_COMMITTING: // commit ongoing
+
   case CS_PREPARE_TO_COMMIT:
   {
     jam();
@@ -9091,7 +10094,7 @@ void Dbtc::completeTransAtTakeOverDoOne(
     /*       COMMITTED TO THE APPLICATION AND CONTINUE WITH THE   */
     /*       COMPLETE PHASE.                                      */
     /*------------------------------------------------------------*/
-    sendTCKEY_FAILCONF(signal, apiConnectptr.p);
+    sendTCKEY_FAILCONF(signal, apiConnectptr);
     tcConnectptr.i = apiConnectptr.p->firstTcConnect;
     ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
     apiConnectptr.p->currentTcConnect = tcConnectptr.i;
@@ -9148,7 +10151,7 @@ void Dbtc::completeTransAtTakeOverDoOne(
     break;
   case CS_FAIL_COMPLETED:
     jam();
-    sendTCKEY_FAILCONF(signal, apiConnectptr.p);
+    sendTCKEY_FAILCONF(signal, apiConnectptr);
     
     signal->theData[0] = TcContinueB::ZCOMPLETE_TRANS_AT_TAKE_OVER;
     signal->theData[1] = (UintR)apiConnectptr.p->takeOverRec;
@@ -9198,19 +10201,19 @@ Dbtc::sendTCKEY_FAILREF(Signal* signal,
 }
 
 void 
-Dbtc::sendTCKEY_FAILCONF(Signal* signal, ApiConnectRecord * regApiPtr){
+Dbtc::sendTCKEY_FAILCONF(Signal* signal, ApiConnectRecordPtr regApiPtr){
   jam();
   TcKeyFailConf * const failConf = (TcKeyFailConf *)&signal->theData[0];
   
-  const Uint32 ref = regApiPtr->ndbapiBlockref;
-  const Uint32 marker = regApiPtr->commitAckMarker;
+  const Uint32 ref = regApiPtr.p->ndbapiBlockref;
+  const Uint32 marker = regApiPtr.p->commitAckMarker;
   const Uint32 nodeId = refToNode(ref);
   if(ref != 0)
   {
     jam();
-    failConf->apiConnectPtr = regApiPtr->ndbapiConnect | (marker != RNIL);
-    failConf->transId1 = regApiPtr->transid[0];
-    failConf->transId2 = regApiPtr->transid[1];
+    failConf->apiConnectPtr = regApiPtr.p->ndbapiConnect | (marker != RNIL);
+    failConf->transId1 = regApiPtr.p->transid[0];
+    failConf->transId2 = regApiPtr.p->transid[1];
     
     bool connectedToNode = getNodeInfo(nodeId).m_connected;
     if (likely(connectedToNode))
@@ -9221,11 +10224,28 @@ Dbtc::sendTCKEY_FAILCONF(Signal* signal,
     }
     else
     {
-      routeTCKEY_FAILREFCONF(signal, regApiPtr,
+      routeTCKEY_FAILREFCONF(signal, regApiPtr.p,
 			     GSN_TCKEY_FAILCONF, TcKeyFailConf::SignalLength);
     }
   }
-  regApiPtr->commitAckMarker = RNIL;
+
+  if (marker != RNIL)
+  {
+    jam();
+    /**
+     * This is the place were we "disown" the commit-ack-marker
+     *   and handover responsibility to ndbapi.
+     *
+     * There should be exactly one reference...
+     *   (the participant performing the actual commit)
+     */
+    CommitAckMarkerPtr tmp;
+    m_commitAckMarkerPool.getPtr(tmp, marker);
+    ndbrequire(tmp.p->apiConnectCount == 1);
+    tmp.p->apiConnectCount = 0;
+  }
+
+  regApiPtr.p->commitAckMarker = RNIL;
 }
 
 void
@@ -9574,7 +10594,7 @@ void Dbtc::toCommitHandlingLab(Signal* s
 	/*------------------------------------------------------------*/
         if (apiConnectptr.p->takeOverRec != (Uint8)Z8NIL) {
           jam();
-	  sendTCKEY_FAILCONF(signal, apiConnectptr.p);
+	  sendTCKEY_FAILCONF(signal, apiConnectptr);
 	} else {
           jam();
           apiConnectptr = sendApiCommit(signal);
@@ -9882,6 +10902,7 @@ void Dbtc::initApiConnectFail(Signal* si
     tmp.p->m_commit_ack_marker_nodes.clear();
     tmp.p->m_commit_ack_marker_nodes.set(tnodeid);
     tmp.p->apiConnectPtr = apiConnectptr.i;
+    tmp.p->apiConnectCount = 1;
 
 #if defined VM_TRACE || defined ERROR_INSERT
     {
@@ -10045,6 +11066,7 @@ void Dbtc::updateApiStateFail(Signal* si
       tmp.p->transid2      = ttransid2;
       tmp.p->apiNodeId     = refToNode(tapplRef);
       tmp.p->apiConnectPtr = apiConnectptr.i;
+      tmp.p->apiConnectCount = 1;
       tmp.p->m_commit_ack_marker_nodes.clear();
 #if defined VM_TRACE || defined ERROR_INSERT
       {
@@ -10405,7 +11427,94 @@ bool Dbtc::testFragmentDrop(Signal* sign
   is out of synch.
    
   * ######################################################################## */
-void Dbtc::execSCAN_TABREQ(Signal* signal) 
+
+
+/**
+ * Check state of transaction record to see if it's correct to start a scan
+ *   on it
+ */
+Uint32
+Dbtc::checkStateStartScan(const ApiConnectRecord * transP)
+{
+  switch(transP->apiConnectstate) {
+  case CS_CONNECTED:
+    return 0;
+
+  case CS_ABORTING:
+    jam();
+    if (transP->abortState == AS_IDLE)
+    {
+      jam();
+      /**
+       * a completed abort...
+       */
+      return 0;
+    }
+    break;
+  case CS_STARTED:
+    jam();
+    if (transP->getNoCommitNeeded())
+    {
+      jam();
+      // left over from simple/dirty read
+      return 0;
+    }
+    break;
+  default:
+    /**
+     * everything else is NOK
+     */
+    jamLine(transP->apiConnectstate);
+    break;
+  }
+
+  return ZSTATE_ERROR;
+}
+
+/**
+ * Check state of buddy record when starting a scan...
+ */
+Uint32
+Dbtc::checkBuddyStateStartScan(const ApiConnectRecord* buddyP)
+{
+  /**
+   * TODO: make "default" return an error...
+   */
+  switch(buddyP->apiConnectstate) {
+  case CS_CONNECTED:
+    jam();
+    return 0;
+  case CS_ABORTING:
+    jam();
+    if (buddyP->abortState == AS_IDLE)
+    {
+      jam();
+      /**
+       * a completed abort...
+       */
+      return 0;
+    }
+    if (buddyP->returncode)
+    {
+      jam();
+      return buddyP->returncode;
+    }
+    else
+    {
+      jam();
+      return ZSTATE_ERROR;
+    }
+  case CS_STARTED:
+    jam();
+    return 0;
+  default:
+    break;
+  }
+
+  return 0;
+}
+
+void Dbtc::execSCAN_TABREQ(Signal* signal)
 {
   jamEntry();
 
@@ -10437,9 +11546,13 @@ void Dbtc::execSCAN_TABREQ(Signal* signa
   const Uint32 transid2 = scanTabReq->transId2;
   const Uint32 tmpXX = scanTabReq->buddyConPtr;
   const Uint32 buddyPtr = (tmpXX == 0xFFFFFFFF ? RNIL : tmpXX);
+  const Uint32 coordIndex = ScanTabReq::getDistributionKeyFlag(ri);
+  const Uint32 coordTransPtrI = (&scanTabReq->distributionKey)[coordIndex];
   Uint32 currSavePointId = 0;
-  
+
   Uint32 noOprecPerFrag = ScanTabReq::getScanBatch(ri);
+  const Uint32 coordFlag = ScanTabReq::getCoordinatedTransactionFlag(ri);
+  bool didCoordinate = false;
   Uint32 errCode;
   ScanRecordPtr scanptr;
 
@@ -10498,22 +11611,11 @@ void Dbtc::execSCAN_TABREQ(Signal* signa
   ptrAss(apiConnectptr, apiConnectRecord);
   ApiConnectRecord * transP = apiConnectptr.p;
 
-  if (transP->apiConnectstate != CS_CONNECTED) {
+  if ((errCode = checkStateStartScan(transP)) != 0)
+  {
     jam();
-    // could be left over from TCKEYREQ rollback
-    if (transP->apiConnectstate == CS_ABORTING &&
-	transP->abortState == AS_IDLE) {
-      jam();
-    } else if(transP->apiConnectstate == CS_STARTED && 
-	      transP->firstTcConnect == RNIL){
-      jam();
-      // left over from simple/dirty read
-    } else {
-      jam();
-      jamLine(transP->apiConnectstate);
-      errCode = ZSTATE_ERROR;
-      goto SCAN_TAB_error_no_state_change;
-    }
+    jamLine(transP->apiConnectstate);
+    goto SCAN_TAB_error_no_state_change;
   }
 
   if(tabptr.i >= ctabrecFilesize)
@@ -10530,26 +11632,73 @@ void Dbtc::execSCAN_TABREQ(Signal* signa
       (cfirstfreeScanrec == RNIL)) {
     goto SCAN_error_check;
   }
-  if (buddyPtr != RNIL) {
+
+  if (buddyPtr != RNIL)
+  {
     jam();
     ApiConnectRecordPtr buddyApiPtr;
     buddyApiPtr.i = buddyPtr;
     ptrCheckGuard(buddyApiPtr, capiConnectFilesize, apiConnectRecord);
+
     if ((transid1 == buddyApiPtr.p->transid[0]) &&
-	(transid2 == buddyApiPtr.p->transid[1])) {
+	(transid2 == buddyApiPtr.p->transid[1]))
+    {
       jam();
-      
-      if (buddyApiPtr.p->apiConnectstate == CS_ABORTING) {
-	// transaction has been aborted
+
+      if ((errCode = checkBuddyStateStartScan(buddyApiPtr.p)) != 0)
+      {
+        /**
+         * Incorrect state of buddy transaction
+         */
 	jam();
-	errCode = buddyApiPtr.p->returncode;
 	goto SCAN_TAB_error;
       }//if
       currSavePointId = buddyApiPtr.p->currSavePointId;
-      buddyApiPtr.p->currSavePointId++;
+      coordTrans_setSavePointId(buddyApiPtr, currSavePointId + 1);
+    }
+    else if ((errCode = checkStateStartScan(buddyApiPtr.p)) == 0)
+    {
+      jam();
+      Uint32 nodeId = refToNode(transP->ndbapiBlockref);
+      Uint32 version = getNodeInfo(nodeId).m_version;
+      if (ndbd_scan_start_trans(version))
+      {
+        jam();
+        /**
+         * Init transaction record
+         */
+        buddyApiPtr.p->apiConnectstate = CS_STARTED;
+        {
+          SaveSignal<TcKeyReq::StaticLength> tmp(signal);
+          TcKeyReq * tcKeyReq = CAST_PTR(TcKeyReq, signal->getDataPtrSend());
+          tcKeyReq->transId1 = transid1;
+          tcKeyReq->transId2 = transid2;
+          initApiConnectRec(signal, buddyApiPtr.p); // uses Signal object
+        }
+      }
+
+      if (coordFlag)
+      {
+        jam();
+        errCode = coordTrans_join(buddyApiPtr, coordTransPtrI, RNIL);
+        if (unlikely(errCode != 0))
+        {
+          jam();
+          goto SCAN_TAB_error;
+        }
+
+        didCoordinate = true;
+        currSavePointId = buddyApiPtr.p->currSavePointId;
+        coordTrans_setSavePointId(buddyApiPtr, currSavePointId + 1);
+      }
+    }
+    else
+    {
+      jam();
+      goto SCAN_TAB_error;
     }
   }
-  
+
   if (getNodeState().startLevel == NodeState::SL_SINGLEUSER &&
       getNodeState().getSingleUserApi() !=
       refToNode(apiConnectptr.p->ndbapiBlockref))
@@ -10677,7 +11826,15 @@ SCAN_TAB_error:
   transP->transid[1] = transid2;
  
 SCAN_TAB_error_no_state_change:
-  
+  if (didCoordinate)
+  {
+    jam();
+    ApiConnectRecordPtr buddyApiPtr;
+    buddyApiPtr.i = buddyPtr;
+    ptrCheckGuard(buddyApiPtr, capiConnectFilesize, apiConnectRecord);
+    coordTrans_unlink(buddyApiPtr);
+  }
+
   releaseSections(handle);
 
   ScanTabRef * ref = (ScanTabRef*)&signal->theData[0];
@@ -12468,6 +13625,7 @@ void Dbtc::initApiConnect(Signal* signal
     apiConnectptr.p->m_transaction_nodes.clear();
     apiConnectptr.p->singleUserMode = 0;
     apiConnectptr.p->apiCopyRecord = RNIL;
+    apiConnectptr.p->nextCoordApiConnect = RNIL;
   }//for
   apiConnectptr.i = tiacTmp - 1;
   ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
@@ -12498,6 +13656,7 @@ void Dbtc::initApiConnect(Signal* signal
       apiConnectptr.p->m_transaction_nodes.clear();
       apiConnectptr.p->singleUserMode = 0;
       apiConnectptr.p->apiCopyRecord = RNIL;
+      apiConnectptr.p->nextCoordApiConnect = RNIL;
     }//for
   apiConnectptr.i = (2 * tiacTmp) - 1;
   ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
@@ -12528,6 +13687,7 @@ void Dbtc::initApiConnect(Signal* signal
     apiConnectptr.p->m_transaction_nodes.clear();
     apiConnectptr.p->singleUserMode = 0;
     apiConnectptr.p->apiCopyRecord = RNIL;
+    apiConnectptr.p->nextCoordApiConnect = RNIL;
   }//for
   apiConnectptr.i = (3 * tiacTmp) - 1;
   ptrCheckGuard(apiConnectptr, capiConnectFilesize, apiConnectRecord);
@@ -12812,7 +13972,7 @@ void Dbtc::releaseAbortResources(Signal*
     jam();
     ptrCheckGuard(tcConnectptr, ctcConnectFilesize, tcConnectRecord);
     // Clear any markers that were set in CS_RECEIVING state
-    clearCommitAckMarker(apiConnectptr.p, tcConnectptr.p);
+    clearCommitAckMarker(apiConnectptr, tcConnectptr.p);
     rarTcConnectptr.i = tcConnectptr.p->nextTcConnect;
     releaseTcCon();
     tcConnectptr.i = rarTcConnectptr.i;
@@ -12822,6 +13982,17 @@ void Dbtc::releaseAbortResources(Signal*
   if (marker != RNIL)
   {
     jam();
+#ifdef VM_TRACE
+    {
+      /**
+       * only one reference since coordTrans_abort
+       * merges all references...
+       */
+      CommitAckMarkerPtr tmp;
+      m_commitAckMarkerPool.getPtr(tmp, marker);
+      ndbassert(tmp.p->apiConnectCount == 1);
+    }
+#endif
     m_commitAckMarkerHash.release(marker);
     apiConnectptr.p->commitAckMarker = RNIL;
   }
@@ -12909,9 +14080,9 @@ void Dbtc::releaseApiCon(Signal* signal,
   cfirstfreeApiConnect = TlocalApiConnectptr.i;
   setApiConTimer(TlocalApiConnectptr.i, 0, __LINE__);
   TlocalApiConnectptr.p->apiConnectstate = CS_DISCONNECTED;
-  ndbassert(TlocalApiConnectptr.p->m_transaction_nodes.isclear());
-  ndbassert(TlocalApiConnectptr.p->apiScanRec == RNIL);
+  TlocalApiConnectptr.p->m_transaction_nodes.clear();
   TlocalApiConnectptr.p->ndbapiBlockref = 0;
+  ndbassert(TlocalApiConnectptr.p->apiScanRec == RNIL);
 }//Dbtc::releaseApiCon()
 
 void Dbtc::releaseApiConnectFail(Signal* signal) 
@@ -13078,8 +14249,8 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
     CommitAckMarkerIterator iter;
     for(m_commitAckMarkerHash.first(iter); iter.curr.i != RNIL;
         m_commitAckMarkerHash.next(iter)){
-      infoEvent("CommitAckMarker: i = %d (0x%x, 0x%x)"
-                " Api: %d %x %x %x %x bucket = %d",
+      infoEvent("CommitAckMarker: i = %d (0x%.8x, 0x%.8x)"
+                " Api: %d (%x %x %x %x) bucket = %d cnt: %u",
                 iter.curr.i,
                 iter.curr.p->transid1,
                 iter.curr.p->transid2,
@@ -13088,7 +14259,8 @@ Dbtc::execDUMP_STATE_ORD(Signal* signal)
                 iter.curr.p->m_commit_ack_marker_nodes.getWord(1),
                 iter.curr.p->m_commit_ack_marker_nodes.getWord(2),
                 iter.curr.p->m_commit_ack_marker_nodes.getWord(3),
-                iter.bucket);
+                iter.bucket,
+                iter.curr.p->apiConnectCount);
     }
     return;
   }
@@ -13719,6 +14891,8 @@ Dbtc::ndbinfo_write_trans(Ndbinfo::Row &
     outstanding = transPtr.p->counter;
     break;
   case CS_PREPARE_TO_COMMIT:
+  case CS_COORD_TRANS_WAIT_COMMIT_REQ:
+  case CS_COORD_TRANS_WAIT_COMMITTING:
     break;
   case CS_START_SCAN:
     // TODO
@@ -14514,9 +15688,17 @@ void Dbtc::execTCINDXREQ(Signal* signal)
     // previous transaction.
     releaseAllSeizedIndexOperations(regApiPtr);
 
-    regApiPtr->apiConnectstate = CS_STARTED;
-    regApiPtr->transid[0] = tcIndxReq->transId1;
-    regApiPtr->transid[1] = tcIndxReq->transId2;
+    {
+      Uint32 transid1 = tcIndxReq->transId1;
+      Uint32 transid2 = tcIndxReq->transId2;
+
+      SaveSignal<TcKeyReq::StaticLength> tmp(signal);
+      TcKeyReq * tcKeyReq = CAST_PTR(TcKeyReq, signal->getDataPtrSend());
+      tcKeyReq->transId1 = transid1;
+      tcKeyReq->transId2 = transid2;
+      initApiConnectRec(signal, regApiPtr); // uses Signal object
+      regApiPtr->apiConnectstate = CS_STARTED;
+    }
   }//if
 
   if (getNodeState().startLevel == NodeState::SL_SINGLEUSER &&

=== modified file 'storage/ndb/src/ndbapi/Ndb.cpp'
--- a/storage/ndb/src/ndbapi/Ndb.cpp	2012-06-07 14:49:35 +0000
+++ b/storage/ndb/src/ndbapi/Ndb.cpp	2012-09-12 13:07:04 +0000
@@ -330,8 +330,6 @@ int
 Ndb::waitUntilReady(int timeout)
 {
   DBUG_ENTER("Ndb::waitUntilReady");
-  int secondsCounter = 0;
-  int milliCounter = 0;
 
   if (theInitState != Initialised) {
     // Ndb::init is not called
@@ -339,27 +337,53 @@ Ndb::waitUntilReady(int timeout)
     DBUG_RETURN(-1);
   }
 
-  while (theNode == 0) {
-    if (secondsCounter >= timeout)
+  Uint64 now = NdbTick_CurrentMillisecond();
+  Uint64 end = now + 1000 * (Uint64)timeout;
+  while (theNode == 0)
+  {
+    if (now > end)
     {
       theError.code = 4269;
       DBUG_RETURN(-1);
     }
     NdbSleep_MilliSleep(100);
-    milliCounter += 100;
-    if (milliCounter >= 1000) {
-      secondsCounter++;
-      milliCounter = 0;
-    }//if
+    now = NdbTick_CurrentMillisecond();
   }
 
-  if (theImpl->m_ndb_cluster_connection.wait_until_ready
-      (timeout-secondsCounter,30) < 0)
+  Uint64 remain = end - now;
+
+  int res = theImpl->m_ndb_cluster_connection.wait_until_ready(remain/1000, 0);
+  if (res < 0)
   {
+    /**
+     * No nodes alive...give up
+     */
     theError.code = 4009;
     DBUG_RETURN(-1);
   }
 
+  if (res > 0)
+  {
+    /**
+     * Some nodes alive...check if we have remaining time to wait
+     */
+    now = NdbTick_CurrentMillisecond();
+    if (now >= end)
+    {
+      /**
+       * No remaining time...but atleast one node found => return 0
+       */
+    }
+    else
+    {
+      /**
+       * Some time remaining...use it
+       */
+      remain = end - now;
+      theImpl->m_ndb_cluster_connection.wait_until_ready(0, remain/1000);
+    }
+  }
+
   DBUG_RETURN(0);
 }
 
@@ -766,6 +790,7 @@ Ndb::startTransaction(const NdbDictionar
                        (long) (trans ? trans->getTransactionId() : 0)));
     DBUG_RETURN(trans);
   }
+  theError.code = 4256; // "Must call Ndb::init() before this function"
   DBUG_RETURN(NULL);
 }
 
@@ -836,6 +861,7 @@ Ndb::startTransaction(const NdbDictionar
       DBUG_RETURN(trans);
     }
   } else {
+    theError.code = 4256; // "Must call Ndb::init() before this function"
     DBUG_RETURN(NULL);
   }//if
 }//Ndb::startTransaction()
@@ -891,8 +917,76 @@ Ndb::hupp(NdbTransaction* pBuddyTrans)
 }//Ndb::hupp()
 
 NdbTransaction*
+Ndb::joinTransaction(const char* coordinatedTransactionIdentifier)
+{
+  // decompose the coordinatedTransactionIdentifier into nodeId, TCConPtr and transId
+  Uint32 scannedNodeId;
+  Uint32 scannedInstance;
+  Uint32 scannedTCConPtr;
+  Uint64 scannedTransId;
+  DBUG_ENTER("Ndb::joinTransaction");
+  DBUG_PRINT("join", ("parameter: %s", coordinatedTransactionIdentifier));
+
+  /**
+   * Note that all TC's need to be of correct version
+   *   since tc-takeover is affected (a little)
+   */
+  if (unlikely(!ndbd_coordinated_transactions(getMinDbNodeVersion())))
+  {
+    /* Function not implemented yet */
+    theError.code = 4003;
+    DBUG_RETURN(NULL);
+  }
+  int err = scanJoinTransactionId(coordinatedTransactionIdentifier,
+                                  scannedNodeId, scannedInstance,
+                                  scannedTCConPtr, scannedTransId,
+                                  theError);
+
+  DBUG_PRINT("join", ("Scan of join_transaction_id found numbers: %x, %x, %x and %llx\n",
+                      scannedNodeId, scannedInstance, scannedTCConPtr, scannedTransId));
+  if (err != 0)
+  {
+    // did not find nodeId, instance, tcConPtr, and transId
+    // in the coordinated_transaction_id
+    DBUG_RETURN(NULL);
+  }
+  NdbTransaction *result= startTransactionLocal(0,
+                                                scannedNodeId,
+                                                scannedInstance,
+                                                scannedTransId,
+                                                scannedTCConPtr);
+
+  if (result != 0)
+  {
+    /**
+     * Verify (cause startTransactionLocal can ignore args)
+     */
+    if ((result->getConnectedNodeId() != scannedNodeId) ||
+        (refToInstance(result->m_tcRef) != scannedInstance))
+    {
+      closeTransaction(result);
+      theError.code = 4332;
+      DBUG_RETURN(NULL);
+    }
+  }
+
+  DBUG_PRINT("join", ("success"));
+  DBUG_RETURN(result);
+}//Ndb::joinTransaction()
+
+NdbTransaction*
 Ndb::startTransactionLocal(Uint32 aPriority, Uint32 nodeId, Uint32 instance)
 {
+  Uint64 tFirstTransId = allocate_transaction_id();
+  return startTransactionLocal(aPriority, nodeId, 0, tFirstTransId, RNIL);
+}//Ndb::startTransactionLocal()
+
+NdbTransaction*
+Ndb::startTransactionLocal(Uint32 aPriority,
+                           Uint32 nodeId, Uint32 instance,
+                           Uint64 tFirstTransId,
+                           Uint32 coordinatedTcConPtr)
+{
 #ifdef VM_TRACE
   char buf[255];
   const char* val = NdbEnv_GetEnv("NDB_TRANSACTION_NODE_ID", buf, 255);
@@ -902,7 +996,7 @@ Ndb::startTransactionLocal(Uint32 aPrior
 #endif
 
   DBUG_ENTER("Ndb::startTransactionLocal");
-  DBUG_PRINT("enter", ("nodeid: %d", nodeId));
+  DBUG_PRINT("join", ("Connection request to nodeid: %x; instance: %x coordTc: %x; transid: %llx", nodeId, instance, coordinatedTcConPtr, tFirstTransId));
 
   if(unlikely(theRemainingStartTransactions == 0))
   {
@@ -911,7 +1005,6 @@ Ndb::startTransactionLocal(Uint32 aPrior
   }
   
   NdbTransaction* tConnection;
-  Uint64 tFirstTransId = theFirstTransId;
   tConnection = doConnect(nodeId, instance);
   if (tConnection == NULL) {
     DBUG_RETURN(NULL);
@@ -928,21 +1021,14 @@ Ndb::startTransactionLocal(Uint32 aPrior
   tConnection->next(tConNext);   // Add the active connection object
   tConnection->setTransactionId(tFirstTransId);
   tConnection->thePriority = aPriority;
-  if ((tFirstTransId & 0xFFFFFFFF) == 0xFFFFFFFF) {
-    //---------------------------------------------------
-// Transaction id rolling round. We will start from
-// consecutive identity 0 again.
-//---------------------------------------------------
-    theFirstTransId = ((tFirstTransId >> 32) << 32);      
-  } else {
-    theFirstTransId = tFirstTransId + 1;
-  }//if
+  tConnection->m_coordinatedConnectPtr = coordinatedTcConPtr;
 #ifdef VM_TRACE
   if (tConnection->theListState != NdbTransaction::NotInList) {
     printState("startTransactionLocal %lx", (long)tConnection);
     abort();
   }
 #endif
+  DBUG_PRINT("join", ("Connection success to nodeid: %x; instance: %x; coordTc: %x; transid: %llx", nodeId, instance, coordinatedTcConPtr, tFirstTransId));
   DBUG_RETURN(tConnection);
 }//Ndb::startTransactionLocal()
 
@@ -2295,6 +2381,44 @@ Ndb::getNdbErrorDetail(const NdbError& e
   DBUG_RETURN(NULL);
 }
 
+extern void NdbUpdateError(NdbError & err);
+
+int
+Ndb::scanJoinTransactionId(const char *input,
+                           Uint32 &nodeId, Uint32 & instance,
+                           Uint32 &apiConnect, Uint64 &transid,
+                           NdbError & error)
+{
+  DBUG_ENTER("Ndb::scanJoinTransactionId");
+  DBUG_PRINT("enter", ("input: %s", input));
+
+  Uint32 transId1, transId2;
+  int res = sscanf(input, NDBAPI_COORD_TRANSACTION_ID_FORMAT,
+                   &nodeId,
+                   &instance,
+                   &apiConnect,
+                   &transId1,
+                   &transId2);
+
+  if (res != 5)
+  {
+    error.code = 4330; // malformed string
+    NdbUpdateError(error);
+    DBUG_RETURN(1);
+  }
+
+  if (nodeId == 0 || nodeId > MAX_DATA_NODE_ID)
+  {
+    error.code = 4331; // incorrect node id...
+    NdbUpdateError(error);
+    DBUG_RETURN(1);
+  }
+
+  transid = transId1 + (Uint64(transId2) << 32);
+
+  DBUG_RETURN(0); // 0 == OK
+}
+
 void
 Ndb::setCustomData(void* _customDataPtr)
 {

=== modified file 'storage/ndb/src/ndbapi/NdbApiSignal.cpp'
--- a/storage/ndb/src/ndbapi/NdbApiSignal.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/ndbapi/NdbApiSignal.cpp	2012-09-12 13:07:04 +0000
@@ -36,6 +36,8 @@
 #include <signaldata/IndxAttrInfo.hpp>
 #include <signaldata/TcHbRep.hpp>
 #include <signaldata/ScanTab.hpp>
+#include <signaldata/TcCommit.hpp>
+#include <signaldata/TcRollbackRep.hpp>
 
 #include <NdbOut.hpp>
 
@@ -175,7 +177,7 @@ NdbApiSignal::setSignal(int aNdbSignalTy
       theTrace                = TestOrd::TraceAPI;
       theReceiversBlockNumber = receiversBlockNo;
       theVerId_signalNumber   = GSN_TCROLLBACKREQ;
-      theLength               = 3;
+      theLength               = TcRollbackReq::SignalLength;
     }
     break;
 
@@ -195,7 +197,7 @@ NdbApiSignal::setSignal(int aNdbSignalTy
       theTrace                = TestOrd::TraceAPI;
       theReceiversBlockNumber = receiversBlockNo;
       theVerId_signalNumber   = GSN_TC_COMMITREQ;
-      theLength               = 3;
+      theLength               = TcCommitReq::SignalLength;
     }
     break;
 

=== modified file 'storage/ndb/src/ndbapi/NdbOperationExec.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperationExec.cpp	2011-10-22 09:38:48 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperationExec.cpp	2012-09-12 13:07:04 +0000
@@ -508,29 +508,41 @@ NdbOperation::prepareSend(Uint32 aTC_Con
   
   Uint8 tDistrKeyIndicator = theDistrKeyIndicator_;
   Uint8 tScanIndicator = theScanInfo & 1;
+  Uint32 coordinatedConnectPtr = theNdbCon->m_coordinatedConnectPtr;
+  Uint32 tCoordinatedIndicator = (tStartIndicator &&
+                            (coordinatedConnectPtr != RNIL));
 
+  Uint32 orLeaveFlag = tCommitIndicator &&
+    (theNdbCon->m_flags & NdbTransaction::OR_LEAVE);
   tcKeyReq->setDistributionKeyFlag(tReqInfo, tDistrKeyIndicator);
   tcKeyReq->setScanIndFlag(tReqInfo, tScanIndicator);
 
+  tcKeyReq->setCoordinatedTransactionFlag(tReqInfo, tCoordinatedIndicator);
+  tcKeyReq->setCommitOrLeaveFlag(tReqInfo, orLeaveFlag);
+
   tcKeyReq->requestInfo  = tReqInfo;
 
 //-------------------------------------------------------------
-// The next step is to fill in the upto three conditional words.
+// The next step is to fill in up to three conditional words.
 //-------------------------------------------------------------
   Uint32* tOptionalDataPtr = &tcKeyReq->scanInfo;
   Uint32 tDistrGHIndex = tScanIndicator;
   Uint32 tDistrKeyIndex = tDistrGHIndex;
+  Uint32 tCoordinatedIndex = tScanIndicator + tDistrKeyIndex;
 
   Uint32 tScanInfo = theScanInfo;
   Uint32 tDistrKey = theDistributionKey;
 
   tOptionalDataPtr[0] = tScanInfo;
   tOptionalDataPtr[tDistrKeyIndex] = tDistrKey;
-
+  if (tCoordinatedIndicator)
+  {
+    tOptionalDataPtr[tCoordinatedIndex] = coordinatedConnectPtr;
+  }
   theTCREQ->setLength(TcKeyReq::StaticLength + 
                       tDistrKeyIndex +         // 1 for scan info present
-                      theDistrKeyIndicator_);  // 1 for distr key present
-
+                      theDistrKeyIndicator_ +  // 1 for distr key present
+                      tCoordinatedIndicator);        // 1 for coordinated con ptr present
   /* Ensure the signal objects have the correct length
    * information
    */
@@ -1429,6 +1441,26 @@ NdbOperation::prepareSendNdbRecord(Abort
   TcKeyReq::setSimpleFlag(tcKeyReq->requestInfo, theSimpleIndicator);
   TcKeyReq::setDirtyFlag(tcKeyReq->requestInfo, theDirtyIndicator);
 
+  Uint32 coordinatedConnectPtr = theNdbCon->m_coordinatedConnectPtr;
+  Uint32 coordinatedIndicator = (theStartIndicator &&
+                           (coordinatedConnectPtr != RNIL));
+  Uint32 orLeaveFlag = theCommitIndicator &&
+    (theNdbCon->m_flags & NdbTransaction::OR_LEAVE);
+
+  TcKeyReq::setCoordinatedTransactionFlag(tcKeyReq->requestInfo, coordinatedIndicator);
+  TcKeyReq::setCommitOrLeaveFlag(tcKeyReq->requestInfo, orLeaveFlag);
+  if (coordinatedIndicator)
+  {
+    /* Add the Coordinated Connect Ptr to the end of the optional
+     * data
+     */
+    Uint32 coordinatedOffset =
+      (TcKeyReq::getScanIndFlag(tcKeyReq->requestInfo) ? 1 : 0) +
+      (TcKeyReq::getDistributionKeyFlag(tcKeyReq->requestInfo) ? 1 : 0);
+
+    (&tcKeyReq->scanInfo)[coordinatedOffset] = coordinatedConnectPtr;
+    theTCREQ->setLength(TcKeyReq::StaticLength + coordinatedOffset + 1);
+  }
   TcKeyReq::setQueueOnRedoProblemFlag(tcKeyReq->requestInfo, tQueable);
   TcKeyReq::setDeferredConstraints(tcKeyReq->requestInfo, tDeferred);
 

=== modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2012-06-26 09:18:05 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp	2012-09-12 13:07:04 +0000
@@ -1673,6 +1673,30 @@ NdbScanOperation::getFirstATTRINFOScan()
 }
 
 int
+NdbScanOperation::setCoordinatedConnectPtr(Uint32 connectPtr)
+{
+  /* Here we set the Coordinated flag in the request info,
+   * and add the Coordinated connect ptr to the signal
+   */
+  assert(connectPtr != RNIL);
+  ScanTabReq* scanTabReq = (ScanTabReq*) theSCAN_TABREQ->getDataPtrSend();
+
+  ScanTabReq::setCoordinatedTransactionFlag(scanTabReq->requestInfo, 1);
+
+  /* Add it as word 0 or 1 of optional data, depending on whether
+   * we also have a distribution key
+   */
+  Uint32 coordinatedPtrIndex = ScanTabReq::getDistributionKeyFlag(scanTabReq->requestInfo) ?
+    1 : 0;
+  (&scanTabReq->distributionKey)[coordinatedPtrIndex] = connectPtr;
+
+  assert(theSCAN_TABREQ->getLength() == (ScanTabReq::StaticLength + coordinatedPtrIndex));
+  theSCAN_TABREQ->setLength(ScanTabReq::StaticLength + coordinatedPtrIndex + 1);
+
+  return 0;
+}
+
+int
 NdbScanOperation::executeCursor(int nodeId)
 {
   /*
@@ -1689,6 +1713,18 @@ NdbScanOperation::executeCursor(int node
     goto done;
   }
 
+  if (m_transConnection->m_coordinatedConnectPtr != RNIL)
+  {
+    /* We need to indicate that this scan is in a transaction
+     * which should be coordinated with another.
+     */
+    if (setCoordinatedConnectPtr(m_transConnection->m_coordinatedConnectPtr) != 0)
+    {
+      res = -1;
+      goto done;
+    }
+  }
+
   {
     locked = true;
     NdbTransaction * tCon = theNdbCon;
@@ -1708,7 +1744,12 @@ NdbScanOperation::executeCursor(int node
       }
       
       m_executed= true; // Mark operation as executed
-    } 
+
+      /**
+       * A scan now also modifies buddy transaction to started
+       */
+      m_transConnection->scanOpStarted();
+    }
     else
     {
       if (!(theImpl->get_node_stopping(nodeId) &&

=== modified file 'storage/ndb/src/ndbapi/NdbTransaction.cpp'
--- a/storage/ndb/src/ndbapi/NdbTransaction.cpp	2011-10-22 09:38:48 +0000
+++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp	2012-09-12 13:07:04 +0000
@@ -85,7 +85,9 @@ NdbTransaction::NdbTransaction( Ndb* aNd
   pendingBlobWriteBytes(0),
   m_theFirstLockHandle(NULL),
   m_theLastLockHandle(NULL),
-  m_tcRef(numberToRef(DBTC, 0))
+  m_tcRef(numberToRef(DBTC, 0)),
+  m_flags(0),
+  m_coordinatedConnectPtr(RNIL)
 {
   theListState = NotInList;
   theError.code = 0;
@@ -177,8 +179,10 @@ NdbTransaction::init()
       return -1;
     }
   }
-  return 0;
 
+  m_flags = 0;
+  m_coordinatedConnectPtr = RNIL;
+  return 0;
 }//NdbTransaction::init()
 
 /*****************************************************************************
@@ -244,6 +248,7 @@ NdbTransaction::restart(){
     theCommitStatus = Started;
     theCompletionStatus = NotCompleted;
     theTransactionIsStarted = false;
+    m_flags = 0;
     DBUG_RETURN(0);
   }
   DBUG_PRINT("error",("theCompletionStatus != CompletedSuccess"));
@@ -695,6 +700,12 @@ NdbTransaction::executeAsynchPrepare(Ndb
   DBUG_PRINT("enter", ("aTypeOfExec: %d, aCallback: 0x%lx, anyObject: Ox%lx",
 		       aTypeOfExec, (long) aCallback, (long) anyObject));
 
+  if (aTypeOfExec == NdbTransaction::CommitOrLeave)
+  {
+    m_flags |= OR_LEAVE;
+    aTypeOfExec = NdbTransaction::Commit;
+  }
+
   /**
    * Reset error.code on execute
    */
@@ -753,6 +764,8 @@ NdbTransaction::executeAsynchPrepare(Ndb
   }
 
   bool tTransactionIsStarted = theTransactionIsStarted;
+  bool tSetStartFlag = (m_flags & NdbTransaction::SCAN_STARTED) == 0;
+
   NdbOperation*	tLastOp = theLastOpInList;
   Ndb* tNdb = theNdb;
   CommitStatusType tCommitStatus = theCommitStatus;
@@ -842,10 +855,13 @@ NdbTransaction::executeAsynchPrepare(Ndb
      * on a query root lookup, and the commit indicator on a non-linked 
      * lookup.
      */
-    if (lastLookupQuery != NULL) {
-      getFirstLookupQuery(m_firstQuery)->setStartIndicator();
-    } else if (tFirstOp != NULL) {
-      tFirstOp->setStartIndicator();
+    if (tSetStartFlag)
+    {
+      if (lastLookupQuery != NULL) {
+        getFirstLookupQuery(m_firstQuery)->setStartIndicator();
+      } else if (tFirstOp != NULL) {
+        tFirstOp->setStartIndicator();
+      }
     }
 
     if (tFirstOp != NULL) {
@@ -963,6 +979,43 @@ void NdbTransaction::close()
   theNdb->closeTransaction(this);
 }
 
+/**
+ * Compute the transaction identifier that can be used to join this transaction
+ * from a different client Ndb on the same or different client node.
+ * The NdbTransaction instance is not modified by this method.
+ * @param  buffer a buffer of at least length MAX_TRANSACTION_ID_SIZE
+ * @param length the length of the buffer
+ * @return character encoded hex string representing nodeId and transaction id
+ *         null (error) if length is less than MAX_TRANSACTION_ID_SIZE
+ */
+const char*
+NdbTransaction::getCoordinatedTransactionId(char *buffer, int length) const {
+  DBUG_ENTER("NdbTransaction::getCoordinatedTransactionId");
+  if (buffer && length >= NDBAPI_MAX_COORD_TRANSACTION_ID_SIZE)
+  {
+    // compute representation of theDBnode, theMyRef, and theTransactionId
+    // separate node, ref, and transaction id
+    // put terminating null at end
+    sprintf(buffer,
+            NDBAPI_COORD_TRANSACTION_ID_FORMAT,
+            theDBnode,
+            refToInstance(m_tcRef),
+            theTCConPtr,
+            Uint32(theTransactionId),
+            Uint32(theTransactionId >> 32));
+    DBUG_PRINT("info",
+               ("theDBnode: 0x%x instance: 0x%x theTCConPtr: 0x%x, "
+                "theTransactionId: 0x%.8x 0x%.8x\n",
+    		theDBnode, refToInstance(m_tcRef),
+                theTCConPtr,
+                Uint32(theTransactionId),
+                Uint32(theTransactionId >> 32)));
+    DBUG_RETURN(buffer);
+  }
+
+  DBUG_RETURN(NULL);
+}//NdbTransaction::getCoordinatedTransactionId
+
 int NdbTransaction::refresh()
 {
   for(NdbIndexScanOperation* scan_op = m_firstExecutedScanOp;
@@ -1149,14 +1202,15 @@ NdbTransaction::sendROLLBACK()      // S
     tTransId1 = (Uint32) theTransactionId;
     tTransId2 = (Uint32) (theTransactionId >> 32);
     tSignal.setSignal(GSN_TCROLLBACKREQ, refToBlock(m_tcRef));
-    tSignal.setData(theTCConPtr, 1);
-    tSignal.setData(tTransId1, 2);
-    tSignal.setData(tTransId2, 3);
+    TcRollbackReq * req = CAST_PTR(TcRollbackReq, tSignal.getDataPtrSend());
+    req->apiConnectPtr = theTCConPtr;
+    req->transId1 = tTransId1;
+    req->transId2 = tTransId2;
+    req->flags = 0;
     if(theError.code == 4012)
     {
       g_eventLogger->error("Sending TCROLLBACKREQ with Bad flag");
-      tSignal.setLength(tSignal.getLength() + 1); // + flags
-      tSignal.setData(0x1, 4); // potentially bad data
+      req->flags |= TcRollbackReq::MAYBE_BAD; // potentially bad data
     }
     tReturnCode = impl->sendSignal(&tSignal,theDBnode);
     if (tReturnCode != -1) {
@@ -1202,10 +1256,17 @@ NdbTransaction::sendCOMMIT()    // Send
   tTransId1 = (Uint32) theTransactionId;
   tTransId2 = (Uint32) (theTransactionId >> 32);
   tSignal.setSignal(GSN_TC_COMMITREQ, refToBlock(m_tcRef));
-  tSignal.setData(theTCConPtr, 1);
-  tSignal.setData(tTransId1, 2);
-  tSignal.setData(tTransId2, 3);
-      
+  TcCommitReq * req = CAST_PTR(TcCommitReq, tSignal.getDataPtrSend());
+  req->apiConnectPtr = theTCConPtr;
+  req->transId1 = tTransId1;
+  req->transId2 = tTransId2;
+  req->flags = 0;
+
+  if (m_flags & OR_LEAVE)
+  {
+    req->flags |= TcCommitReq::COMMIT_OR_LEAVE;
+  }
+
   tReturnCode = impl->sendSignal(&tSignal,theDBnode);
   if (tReturnCode != -1) {
     theSendStatus = sendTC_COMMIT;
@@ -1441,6 +1502,7 @@ Remark:         Get an operation from Nd
 NdbOperation*
 NdbTransaction::getNdbOperation(const char* aTableName)
 {
+  theError.code = 0;
   if (theCommitStatus == Started){
     NdbTableImpl* table = theNdb->theDictionary->getTable(aTableName);
     if (table != 0){
@@ -1475,6 +1537,7 @@ NdbTransaction::getNdbOperation(const Nd
                                 NdbOperation* aNextOp,
                                 bool useRec)
 { 
+  theError.code = 0;
   NdbOperation* tOp;
 
   if (theScanningOp != NULL || m_scanningQuery != NULL){
@@ -1521,6 +1584,7 @@ NdbTransaction::getNdbOperation(const Nd
 
 NdbOperation* NdbTransaction::getNdbOperation(const NdbDictionary::Table * table)
 {
+  theError.code = 0;
   if (table)
     return getNdbOperation(& NdbTableImpl::getImpl(*table));
   else
@@ -1542,6 +1606,7 @@ Remark:         Get an operation from Nd
 NdbScanOperation*
 NdbTransaction::getNdbScanOperation(const char* aTableName)
 {
+  theError.code = 0;
   if (theCommitStatus == Started){
     NdbTableImpl* tab = theNdb->theDictionary->getTable(aTableName);
     if (tab != 0){
@@ -1571,6 +1636,7 @@ NdbIndexScanOperation*
 NdbTransaction::getNdbIndexScanOperation(const char* anIndexName, 
 					const char* aTableName)
 {
+  theError.code = 0;
   NdbIndexImpl* index = 
     theNdb->theDictionary->getIndex(anIndexName, aTableName);
   if (index == 0)
@@ -1592,6 +1658,7 @@ NdbIndexScanOperation*
 NdbTransaction::getNdbIndexScanOperation(const NdbIndexImpl* index,
 					const NdbTableImpl* table)
 {
+  theError.code = 0;
   if (theCommitStatus == Started){
     const NdbTableImpl * indexTable = index->getIndexTable();
     if (indexTable != 0){
@@ -1616,6 +1683,7 @@ NdbTransaction::getNdbIndexScanOperation
 NdbIndexScanOperation* 
 NdbTransaction::getNdbIndexScanOperation(const NdbDictionary::Index * index)
 { 
+  theError.code = 0;
   if (index)
   {
     /* This fetches the underlying table being indexed. */
@@ -1636,6 +1704,7 @@ NdbIndexScanOperation*
 NdbTransaction::getNdbIndexScanOperation(const NdbDictionary::Index * index,
 					const NdbDictionary::Table * table)
 {
+  theError.code = 0;
   if (index && table)
     return getNdbIndexScanOperation(& NdbIndexImpl::getImpl(*index),
 				    & NdbTableImpl::getImpl(*table));
@@ -1656,6 +1725,7 @@ Remark:         Get an operation from Nd
 NdbIndexScanOperation*
 NdbTransaction::getNdbScanOperation(const NdbTableImpl * tab)
 { 
+  theError.code = 0;
   NdbIndexScanOperation* tOp;
   
   tOp = theNdb->getScanOperation();
@@ -1705,6 +1775,7 @@ NdbTransaction::define_scan_op(NdbIndexS
 NdbScanOperation* 
 NdbTransaction::getNdbScanOperation(const NdbDictionary::Table * table)
 {
+  theError.code = 0;
   if (table)
     return getNdbScanOperation(& NdbTableImpl::getImpl(*table));
   else
@@ -1729,6 +1800,7 @@ NdbIndexOperation*
 NdbTransaction::getNdbIndexOperation(const char* anIndexName, 
                                     const char* aTableName)
 {
+  theError.code = 0;
   if (theCommitStatus == Started) {
     NdbTableImpl * table = theNdb->theDictionary->getTable(aTableName);
     NdbIndexImpl * index;
@@ -1784,6 +1856,7 @@ NdbTransaction::getNdbIndexOperation(con
                                      NdbOperation* aNextOp,
                                      bool useRec)
 { 
+  theError.code = 0;
   NdbIndexOperation* tOp;
   
   tOp = theNdb->getIndexOperation();
@@ -1826,6 +1899,7 @@ NdbTransaction::getNdbIndexOperation(con
 NdbIndexOperation* 
 NdbTransaction::getNdbIndexOperation(const NdbDictionary::Index * index)
 { 
+  theError.code = 0;
   if (index)
   {
     const NdbDictionary::Table *table=
@@ -1845,6 +1919,7 @@ NdbIndexOperation*
 NdbTransaction::getNdbIndexOperation(const NdbDictionary::Index * index,
 				    const NdbDictionary::Table * table)
 {
+  theError.code = 0;
   if (index && table)
     return getNdbIndexOperation(& NdbIndexImpl::getImpl(*index),
 				& NdbTableImpl::getImpl(*table));
@@ -3239,3 +3314,18 @@ NdbTransaction::releaseLockHandle(const
 
   return 0;
 }
+
+void
+NdbTransaction::scanOpStarted()
+{
+  /**
+   * just inform that StartFlag does not need to be set in TCKEYREQ
+   *   for nodes that has this behaviour :-)
+   */
+  Uint32 nodeId = refToNode(m_tcRef);
+  Uint32 version = theNdb->theImpl->getNodeInfo(nodeId).m_info.m_version;
+  if (ndbd_scan_start_trans(version))
+  {
+    m_flags |= NdbTransaction::SCAN_STARTED;
+  }
+}

=== modified file 'storage/ndb/src/ndbapi/Ndberr.cpp'
--- a/storage/ndb/src/ndbapi/Ndberr.cpp	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/ndbapi/Ndberr.cpp	2012-09-12 13:07:04 +0000
@@ -25,6 +25,12 @@ update(const NdbError & _err){
   error = NdbError(ndberror);
 }
 
+void
+NdbUpdateError(NdbError & err)
+{
+  update(err);
+}
+
 const 
 NdbError & 
 Ndb::getNdbError(int code){

=== modified file 'storage/ndb/src/ndbapi/ndberror.c'
--- a/storage/ndb/src/ndbapi/ndberror.c	2012-08-24 12:00:05 +0000
+++ b/storage/ndb/src/ndbapi/ndberror.c	2012-09-12 13:07:04 +0000
@@ -24,6 +24,7 @@
 
 #include "../mgmsrv/ndb_mgmd_error.h"
 
+#include <ndbapi_limits.h>
 #include "NdbQueryBuilderImpl.hpp"
 
 typedef struct ErrorBundle {
@@ -352,6 +353,11 @@ ErrorBundle ErrorCodes[] = {
   { 324,  DMEC, AE, "Invalid node(s) specified for new nodegroup, no node in nodegroup is started" },
   { 417,  DMEC, AE, "Bad operation reference - double unlock" },
 
+  { 257,  DMEC, AE, "Unable to join coordinated transaction" },
+  { 258,  DMEC, AE, "Unable to join coordinated transaction" },
+  { 259,  DMEC, AE, "Unable to join coordinated transaction" },
+  { 260,  DMEC, AE, "Coordinated transaction already aborted" },
+
   /** 
    * Scan application errors
    */
@@ -789,6 +795,14 @@ ErrorBundle ErrorCodes[] = {
   { 2815, DMEC, TR, "Error in reading files, please check file system" },
   {  920, DMEC, AE, "Row operation defined after refreshTuple()" },
 
+  { 4330, 1210, AE,
+    "Malformed coordinated transaction identifier "
+    "(correct is \"" NDBAPI_COORD_TRANSACTION_ID_FORMAT_PRINT "\")" },
+  { 4331, 1210, AE,
+    "Malformed coordinated transaction identifier (node id out of range)" },
+  { 4332, DMEC, TR,
+    "Failed to join transaction. Unable to get connection to node/instance" },
+
   /**
    * NdbQueryBuilder API errors
    */

=== modified file 'storage/ndb/test/ndbapi/Makefile.am'
--- a/storage/ndb/test/ndbapi/Makefile.am	2012-08-13 13:03:29 +0000
+++ b/storage/ndb/test/ndbapi/Makefile.am	2012-09-12 13:07:04 +0000
@@ -68,7 +68,8 @@ testReconnect \
 testNdbinfo \
 NdbRepStress \
 msa \
-testRedo
+testRedo \
+testCoordTrans
 
 EXTRA_PROGRAMS = \
  test_event \
@@ -151,6 +152,7 @@ testIndexStat_SOURCES = testIndexStat.cp
 reorg_tab_SOURCES = reorg_tab.cpp
 msa_SOURCES = msa.cpp
 testRedo_SOURCES = testRedo.cpp
+testCoordTrans_SOURCES = testCoordTrans.cpp
 
 ndbapi_50compat0_CPPFLAGS = -DNDBAPI_50_COMPAT
 ndbapi_50compat0_SOURCES = ndbapi_50compat0.cpp

=== modified file 'storage/ndb/test/ndbapi/testBasic.cpp'
--- a/storage/ndb/test/ndbapi/testBasic.cpp	2012-06-25 12:55:27 +0000
+++ b/storage/ndb/test/ndbapi/testBasic.cpp	2012-09-12 13:07:04 +0000
@@ -3277,6 +3277,71 @@ runRefreshLocking(NDBT_Context* ctx, NDB
   return NDBT_OK;
 }
 
+int
+runBugXXX(NDBT_Context* ctx, NDBT_Step* step)
+{
+  Ndb* pNdb = GETNDB(step);
+  const NdbDictionary::Table * pTab = ctx->getTab();
+  NdbRestarter res;
+
+  int nodes = res.getNumDbNodes();
+  if (nodes < 2)
+  {
+    return NDBT_OK;
+  }
+
+  HugoOperations hugoOps(*pTab);
+  hugoOps.startTransaction(pNdb);
+
+  int row_no = 1;
+  int row_count = 100;
+  /**
+   * insert
+   */
+  ndbout_c("insert+commit");
+  hugoOps.pkInsertRecord(pNdb, row_no, row_count);
+  hugoOps.execute_Commit(pNdb);
+  hugoOps.closeTransaction(pNdb);
+
+  /**
+   * this should fail...
+   */
+  ndbout_c("failed-insert (AO_IgnoreError) + nocommit");
+  hugoOps.startTransaction(pNdb);
+  hugoOps.pkInsertRecord(pNdb, row_no, row_count);
+  hugoOps.execute_NoCommit(pNdb, AO_IgnoreError);
+
+  /**
+   * perform CommittedRead (execute-bit set)
+   */
+  ndbout_c("CommittedRead");
+  hugoOps.pkReadRecord(pNdb, row_no, row_count, NdbOperation::LM_CommittedRead);
+  hugoOps.execute_NoCommit(pNdb);
+
+  /**
+   * Kill other node...will cause our transaction to be aborted
+   *   but we're not expecting a reply
+   */
+  ndbout_c("kill non-tc node");
+  int tcNodeId = hugoOps.getTransaction()->getConnectedNodeId();
+  int nodeId = tcNodeId;
+  while (nodeId == tcNodeId)
+  {
+    nodeId = res.getDbNodeId(rand() % nodes);
+  }
+
+  res.restartOneDbNode(nodeId, false, true, true);
+  res.waitNodesNoStart(&nodeId, 1);
+  res.startNodes(&nodeId, 1);
+  res.waitNodesStarted(&nodeId, 1);
+
+  if (hugoOps.execute_Commit(pNdb) == 0)
+  {
+    return NDBT_FAILED;
+  }
+
+  return NDBT_OK;
+}
 
 NDBT_TESTSUITE(testBasic);
 TESTCASE("PkInsert", 
@@ -3651,6 +3716,10 @@ TESTCASE("RefreshLocking",
 {
   INITIALIZER(runRefreshLocking);
 }
+TESTCASE("BugXXX", "")
+{
+  INITIALIZER(runBugXXX);
+}
 NDBT_TESTSUITE_END(testBasic);
 
 #if 0

=== modified file 'storage/ndb/test/ndbapi/testNdbApi.cpp'
--- a/storage/ndb/test/ndbapi/testNdbApi.cpp	2012-03-14 07:26:13 +0000
+++ b/storage/ndb/test/ndbapi/testNdbApi.cpp	2012-09-12 13:07:04 +0000
@@ -601,6 +601,105 @@ int runTestWaitUntilReady(NDBT_Context*
   return NDBT_OK;
 }
 
+static
+int
+testwaituntilready(NDBT_Context * ctx, int expect)
+{
+  const int waitArg[] = { 0, 1, 5, 30 };
+
+  int result = NDBT_OK;
+  const int slack = 5; // seconds
+
+  for (unsigned i = 0; i < NDB_ARRAY_SIZE(waitArg); i++)
+  {
+    Ndb pNdb(&ctx->m_cluster_connection, "TEST_DB");
+    pNdb.init();
+    printf("waitUntilReady(%d)...", waitArg[i]); fflush(stdout);
+    Uint64 time = NdbTick_CurrentMillisecond();
+    int res = pNdb.waitUntilReady(waitArg[i]);
+    Uint64 elapsed = NdbTick_CurrentMillisecond() - time;
+    int waitTime = (int)(elapsed / 1000); // sec is fine
+
+    bool ok;
+    if (res == 0)
+    {
+      /**
+       * ok => ok to wait up until waitTime (modulo slack)
+       */
+      ok = waitTime <= waitArg[i] + slack;
+    }
+    else
+    {
+      /**
+       * nok => must wait waitArg[i] +/- slack
+       */
+      ok =
+        (waitTime + slack >= waitArg[i]) &&
+        (waitTime <= waitArg[i] + slack);
+    }
+    if (res != expect)
+      ok = false;
+
+    ndbout_c(" => %d (expect: %d) time: %ds : %s",
+             res,
+             expect,
+             waitTime,
+             ok ? "OK" : "NOK");
+
+    if (!ok)
+    {
+      result = NDBT_FAILED;
+    }
+  }
+
+  return result;
+}
+
+int
+runTestWaitUntilReady2(NDBT_Context* ctx, NDBT_Step* step)
+{
+  int result;
+  ndbout_c("testing with cluster up");
+  if ((result = testwaituntilready(ctx, 0)) != NDBT_OK)
+    return result;
+
+  ndbout_c("testing with cluster down");
+  printf("shutting down cluster..."); fflush(stdout);
+  NdbRestarter res;
+  res.restartAll(false, true, true);
+  res.waitClusterNoStart();
+  ndbout_c("done");
+
+  if ((result = testwaituntilready(ctx, -1)) != NDBT_OK)
+    return result;
+
+  printf("starting cluster again..."); fflush(stdout);
+  res.startAll();
+  res.waitClusterStarted();
+  ndbout_c("done");
+
+  if (res.getNumDbNodes() < 2)
+  {
+    return NDBT_OK;
+  }
+
+  ndbout_c("testing with one node down");
+  int nodeId = res.getDbNodeId(0);
+  printf("shutting down node %d...", nodeId); fflush(stdout);
+  res.restartOneDbNode(nodeId, false, true, true);
+  res.waitNodesNoStart(&nodeId, 1);
+  ndbout_c("done");
+
+  result = testwaituntilready(ctx, 0);
+
+  printf("starting node again..."); fflush(stdout);
+  res.startAll();
+  res.waitClusterStarted();
+  ndbout_c("done");
+
+  return NDBT_OK;
+}
+
 int runGetNdbOperationNoTab(NDBT_Context* ctx, NDBT_Step* step){
 
   Ndb* pNdb = new Ndb(&ctx->m_cluster_connection, "TEST_DB");
@@ -5641,6 +5740,11 @@ TESTCASE("WaitUntilReady",
 	"without an init'ed Ndb\n"){ 
   INITIALIZER(runTestWaitUntilReady);
 }
+TESTCASE("WaitUntilReady2",
+         "Check timeout handling with Ndb::waitUntilReady")
+{
+  INITIALIZER(runTestWaitUntilReady2);
+}
 TESTCASE("GetOperationNoTab", 
 	"Call getNdbOperation on a table that does not exist\n"){ 
   INITIALIZER(runGetNdbOperationNoTab);

=== modified file 'storage/ndb/test/ndbapi/testScan.cpp'
--- a/storage/ndb/test/ndbapi/testScan.cpp	2012-08-24 12:00:05 +0000
+++ b/storage/ndb/test/ndbapi/testScan.cpp	2012-09-12 13:07:04 +0000
@@ -1776,6 +1776,46 @@ runExtraNextResult(NDBT_Context* ctx, ND
   return result;
 }
 
+int
+runBugXXU(NDBT_Context* ctx, NDBT_Step* step)
+{
+  Ndb * pNdb = GETNDB(step);
+  HugoOperations hugoOps(*ctx->getTab());
+
+  unsigned row_no = 1;
+  unsigned row_count = 1;
+
+  NdbRestarter res;
+  hugoOps.startTransaction(pNdb);
+  hugoOps.pkReadRecord(pNdb, row_no, row_count, NdbOperation::LM_CommittedRead);
+  hugoOps.execute_NoCommit(pNdb);
+
+  NdbTransaction * pTrans = hugoOps.getTransaction();
+  int nodeId = (int)pTrans->getConnectedNodeId();
+
+  res.restartOneDbNode(nodeId, false, true, true);
+  res.waitNodesNoStart(&nodeId, 1);
+  res.startNodes(&nodeId, 1);
+
+  NdbScanOperation * pOp = pTrans->getNdbScanOperation(ctx->getTab());
+  if (pOp != 0)
+  {
+    ndbout_c("%u", __LINE__);
+    return NDBT_FAILED;
+  }
+
+  if (pTrans->getNdbError().status != NdbError::TemporaryError)
+  {
+    ndbout_c("%u", __LINE__);
+    ERR(pTrans->getNdbError());
+    return NDBT_FAILED;
+  }
+
+  res.waitClusterStarted();
+
+  return NDBT_OK;
+}
+
 NDBT_TESTSUITE(testScan);
 TESTCASE("ScanRead", 
 	 "Verify scan requirement: It should be possible "\
@@ -2364,6 +2404,10 @@ TESTCASE("extraNextResultBug11748194",
 {
   INITIALIZER(runExtraNextResult);
 }
+TESTCASE("BugXXU", "")
+{
+  INITIALIZER(runBugXXU);
+}
 NDBT_TESTSUITE_END(testScan);
 
 int main(int argc, const char** argv){

=== modified file 'storage/ndb/test/run-test/daily-basic-tests.txt'
--- a/storage/ndb/test/run-test/daily-basic-tests.txt	2012-08-22 10:49:42 +0000
+++ b/storage/ndb/test/run-test/daily-basic-tests.txt	2012-09-12 13:07:04 +0000
@@ -246,6 +246,10 @@ cmd: testBasic
 args: -n RollbackNothing T1 T6 D1 D2 
 
 max-time: 500
+cmd: testBasic
+args: -n BugXXX T1
+
+max-time: 500
 cmd: testBasicAsynch
 args: -n PkInsertAsynch 
 
@@ -621,6 +625,10 @@ args: -n Bug36124 T1
 
 max-time: 500
 cmd: testScan
+args: -n BugXXU T1
+
+max-time: 500
+cmd: testScan
 args: -n Bug54945 T1
 
 max-time: 600
@@ -872,6 +880,10 @@ args: -n WaitUntilReady T1 T6 T13
 
 max-time: 500
 cmd: testNdbApi
+args: -n WaitUntilReady2 T1
+
+max-time: 500
+cmd: testNdbApi
 args: -n GetOperationNoTab T6 
 
 max-time: 500
@@ -1883,3 +1895,23 @@ max-time: 600
 cmd: testRedo
 args: -n RestartFD -l 2 T1
 
+max-time: 600
+cmd: testCoordTrans
+args: -n Basic T1
+
+max-time: 600
+cmd: testCoordTrans
+args: -n Multi T1
+
+max-time: 600
+cmd: testCoordTrans
+args: -n BasicAndDisconnect T1
+
+max-time: 600
+cmd: testCoordTrans
+args: -n BasicAndNF T1
+
+max-time: 600
+cmd: testCoordTrans
+args: -n MultiNF T1
+

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0-coord branch (jonas.oreland:4961 to 4962) Jonas Oreland12 Sep