List:Commits« Previous MessageNext Message »
From:Mikael Ronstrom Date:January 20 2012 4:04pm
Subject:bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3710 to 3717)
View as plain text  
 3717 Mikael Ronstrom	2012-01-20
      Sixth step of merge

    modified:
      storage/ndb/src/kernel/vm/TransporterCallback.cpp
      storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp
      storage/ndb/src/kernel/vm/mt.cpp
 3716 Mikael Ronstrom	2012-01-20
      Fifth step of merge

    removed:
      storage/ndb/include/util/Checksum.hpp
    modified:
      storage/ndb/src/common/transporter/TransporterInternalDefinitions.hpp
      storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp
      storage/ndb/src/kernel/vm/TransporterCallback.cpp
      storage/ndb/src/kernel/vm/mt.cpp
      storage/ndb/src/kernel/vm/mt_thr_config.cpp
 3715 Mikael Ronstrom	2012-01-20
      Fourth step of merge

    modified:
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/src/common/transporter/Packer.cpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/common/transporter/basictest/basicTransporterTest.cpp
      storage/ndb/src/common/transporter/perftest/perfTransporterTest.cpp
      storage/ndb/src/common/transporter/priotest/prioTransporterTest.cpp
      storage/ndb/src/kernel/ndbd.cpp
      storage/ndb/src/kernel/vm/Configuration.cpp
      storage/ndb/src/kernel/vm/TransporterCallback.cpp
      storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp
      storage/ndb/src/kernel/vm/mt.cpp
      storage/ndb/src/kernel/vm/mt.hpp
      storage/ndb/src/ndbapi/TransporterFacade.cpp
      storage/ndb/src/ndbapi/TransporterFacade.hpp
 3714 Mikael Ronstrom	2012-01-20
      Third merge step

    modified:
      storage/ndb/include/portlib/ndb_socket_poller.h
 3713 Mikael Ronstrom	2012-01-19
      Second step of merge

    modified:
      storage/ndb/include/transporter/TransporterCallback.hpp
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/include/util/Checksum.hpp
      storage/ndb/src/common/transporter/Packer.cpp
      storage/ndb/src/common/transporter/Transporter.cpp
      storage/ndb/src/common/transporter/Transporter.hpp
 3712 Mikael Ronstrom	2012-01-19 [merge]
      First step of merge

    added:
      mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc
    modified:
      VERSION
      mysql-test/mysql-test-run.pl
      mysql-test/suite/ndb/r/ndb_condition_pushdown.result
      mysql-test/suite/ndb/t/ndb_condition_pushdown.test
      mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result
      mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result
      mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test
      mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test
      mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_binlog.h
      sql/ha_ndbcluster_cond.cc
      sql/ndb_share.h
      sql/ndb_thd.cc
      storage/ndb/VERSION
      storage/ndb/include/debugger/SignalLoggerManager.hpp
      storage/ndb/include/kernel/ndb_limits.h
      storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp
      storage/ndb/include/ndb_types.h.in
      storage/ndb/include/ndbapi/Ndb.hpp
      storage/ndb/include/ndbapi/NdbDictionary.hpp
      storage/ndb/include/ndbapi/NdbOperation.hpp
      storage/ndb/include/ndbapi/NdbScanOperation.hpp
      storage/ndb/include/transporter/TransporterCallback.hpp
      storage/ndb/include/transporter/TransporterDefinitions.hpp
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/memcache/src/schedulers/S_sched.cc
      storage/ndb/memcache/unit/test_workqueue.c
      storage/ndb/src/common/debugger/EventLogger.cpp
      storage/ndb/src/common/debugger/SignalLoggerManager.cpp
      storage/ndb/src/common/transporter/Packer.cpp
      storage/ndb/src/common/transporter/TCP_Transporter.cpp
      storage/ndb/src/common/transporter/TCP_Transporter.hpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp
      storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
      storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
      storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
      storage/ndb/src/kernel/blocks/trpman.cpp
      storage/ndb/src/kernel/error/ErrorReporter.cpp
      storage/ndb/src/kernel/ndbd.cpp
      storage/ndb/src/kernel/vm/Configuration.cpp
      storage/ndb/src/kernel/vm/Emulator.cpp
      storage/ndb/src/kernel/vm/Emulator.hpp
      storage/ndb/src/kernel/vm/SectionReader.cpp
      storage/ndb/src/kernel/vm/SimplePropertiesSection.cpp
      storage/ndb/src/kernel/vm/TransporterCallback.cpp
      storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp
      storage/ndb/src/kernel/vm/mt.cpp
      storage/ndb/src/kernel/vm/mt.hpp
      storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
      storage/ndb/src/ndbapi/TransporterFacade.cpp
      storage/ndb/src/ndbapi/TransporterFacade.hpp
 3711 Mikael Ronstrom	2012-01-19 [merge]
      merge

    added:
      mysql-test/suite/ndb/r/ndb_bug13563280.result
      mysql-test/suite/ndb/t/ndb_bug13563280.test
    modified:
      sql/ha_ndbcluster_binlog.cc
      storage/ndb/include/kernel/GlobalSignalNumbers.h
      storage/ndb/include/kernel/NodeInfo.hpp
      storage/ndb/include/ndb_global.h
      storage/ndb/include/ndb_types.h.in
      storage/ndb/src/common/debugger/signaldata/SignalNames.cpp
      storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
      storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp
      storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp
      storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
      storage/ndb/src/kernel/blocks/trpman.cpp
      storage/ndb/src/kernel/blocks/trpman.hpp
      storage/ndb/src/kernel/vm/ArrayPool.hpp
      storage/ndb/src/kernel/vm/Configuration.cpp
      storage/ndb/src/kernel/vm/GlobalData.hpp
      storage/ndb/src/kernel/vm/SimulatedBlock.cpp
      storage/ndb/src/kernel/vm/TransporterCallback.cpp
      storage/ndb/src/kernel/vm/mt.cpp
      storage/ndb/src/mgmsrv/ConfigInfo.cpp
      storage/ndb/src/ndbapi/NdbDictionary.cpp
      storage/ndb/test/ndbapi/testNodeRestart.cpp
 3710 Mikael Ronstrom	2012-01-18
      Increase max number of attributes in flexAsynch

    modified:
      storage/ndb/test/ndbapi/flexAsynch.cpp
=== modified file 'VERSION'
--- a/VERSION	revid:mikael.ronstrom@stripped
+++ b/VERSION	revid:mikael.ronstrom@stripped
@@ -1,4 +1,4 @@
 MYSQL_VERSION_MAJOR=5
 MYSQL_VERSION_MINOR=5
 MYSQL_VERSION_PATCH=19
-MYSQL_VERSION_EXTRA=-ndb-7.2.4
+MYSQL_VERSION_EXTRA=-ndb-7.2.5

=== modified file 'mysql-test/mysql-test-run.pl'
--- a/mysql-test/mysql-test-run.pl	revid:mikael.ronstrom@stripped
+++ b/mysql-test/mysql-test-run.pl	revid:mikael.ronstrom@stripped
@@ -3055,13 +3055,17 @@ sub memcached_start {
   
   my $found_perl_source = my_find_file($basedir, 
      ["storage/ndb/memcache",        # source
-      "mysql-test/lib"],             # install 
+      "mysql-test/lib",              # install
+      "share/mysql-test/lib"],       # install 
       "memcached_path.pl", NOT_REQUIRED);
   
   my $found_so = my_find_file($bindir,
     ["storage/ndb/memcache/",       # source or build
-     "lib"],                        # install
+     "lib", "lib64"],               # install
     "ndb_engine.so", NOT_REQUIRED); 
+
+  mtr_verbose("Found memcache script: $found_perl_source");
+  mtr_verbose("Found memcache plugin: $found_so");
      
   my $mgm_host;
   my $mgm_port;

=== added file 'mysql-test/suite/ndb/r/ndb_bug13563280.result'
--- a/mysql-test/suite/ndb/r/ndb_bug13563280.result	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/r/ndb_bug13563280.result	revid:mikael.ronstrom@stripped
@@ -0,0 +1,13 @@
+create table t1 (a int, b text) engine=ndb;
+insert into t1 values (1,'xxx'),(2,'yyy'),(3,'zzz');
+select * from t1 order by a;
+a	b
+1	xxx
+2	yyy
+3	zzz
+select * from t1 order by a;
+a	b
+1	xxx
+2	yyy
+3	zzz
+drop table t1;

=== modified file 'mysql-test/suite/ndb/r/ndb_condition_pushdown.result'
--- a/mysql-test/suite/ndb/r/ndb_condition_pushdown.result	revid:mikael.ronstrom@stripped
+++ b/mysql-test/suite/ndb/r/ndb_condition_pushdown.result	revid:mikael.ronstrom@stripped
@@ -2393,5 +2393,16 @@ select b from mytab where a like -1 havi
 b
 1
 drop table mytab;
+create table t(a bigint unsigned not null primary key auto_increment, b varchar(100)) character set utf8 engine ndb;
+insert into t (b) values('abc'),('aaa'),('bbb'),('ccc');
+select * from t where b like 'a%';
+a	b
+1	abc
+2	aaa
+select * from t where b not like 'a%';
+a	b
+3	bbb
+4	ccc
+drop table t;
 set @@session.optimizer_switch = @old_ecpd;
 DROP TABLE t1,t2,t3,t4,t5;

=== added file 'mysql-test/suite/ndb/t/ndb_bug13563280.test'
--- a/mysql-test/suite/ndb/t/ndb_bug13563280.test	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb/t/ndb_bug13563280.test	revid:mikael.ronstrom@stripped
@@ -0,0 +1,15 @@
+--source include/have_ndb.inc
+--source include/have_log_bin.inc
+
+--disable_warnings
+create table t1 (a int, b text) engine=ndb;
+--enable_warnings
+
+insert into t1 values (1,'xxx'),(2,'yyy'),(3,'zzz');
+select * from t1 order by a;
+
+let $mysqld_name=mysqld.1.1;
+--source include/restart_mysqld.inc
+
+select * from t1 order by a;
+drop table t1;

=== modified file 'mysql-test/suite/ndb/t/ndb_condition_pushdown.test'
--- a/mysql-test/suite/ndb/t/ndb_condition_pushdown.test	revid:mikael.ronstrom@stripped
+++ b/mysql-test/suite/ndb/t/ndb_condition_pushdown.test	revid:mikael.ronstrom@stripped
@@ -2414,6 +2414,16 @@ select b from mytab where a like -1 havi
 
 drop table mytab;
 
+# Bug #13579318 LIKE SEARCH DOESN'T MATCH ANY ROWS ON A MULTI BYTE CHARSET COLUMN
+create table t(a bigint unsigned not null primary key auto_increment, b varchar(100)) character set utf8 engine ndb;
+
+insert into t (b) values('abc'),('aaa'),('bbb'),('ccc');
+--sorted_result
+select * from t where b like 'a%';
+--sorted_result
+select * from t where b not like 'a%';
+drop table t;
+
 set @@session.optimizer_switch = @old_ecpd;
 DROP TABLE t1,t2,t3,t4,t5;
 

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result	revid:mikael.ronstrom@stripped
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_conflict.result	revid:mikael.ronstrom@stripped
@@ -229,7 +229,51 @@ select server_id, master_server_id, coun
 server_id	master_server_id	count	a
 2	1	1	3
 drop table t2_max, t2_max$EX;
+drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`;
+delete from mysql.ndb_replication;
+Test that online table distribution sets up conflict functions and exceptions tables
+insert into mysql.ndb_replication values ("test", "t1allsame", 0, 7, "NDB$MAX(X)");
+insert into mysql.ndb_replication values ("test", "t2diffex", 1, 7, "NDB$OLD(X)");
+insert into mysql.ndb_replication values ("test", "t2diffex", 3, 7, "NDB$MAX(X)");
+insert into mysql.ndb_replication values ("test", "t3oneex", 3, 7, "NDB$EPOCH()");
+create table t2diffex$EX (
+server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+a int not null,
+primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
+create table t3oneex$EX (
+server_id int unsigned,
+master_server_id int unsigned,
+master_epoch bigint unsigned,
+count int unsigned,
+a int not null,
+primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
+create table t1allsame(a int primary key, b varchar(200), X int unsigned) engine=ndb;
+create table t2diffex(a int primary key, b varchar(200), X int unsigned) engine=ndb;
+create table t3oneex(a int primary key, b varchar(200)) engine=ndb;
+show variables like 'server_id';
+Variable_name	Value
+server_id	1
+MySQLD error output for server 1.1 matching pattern %NDB Slave%
+relevant
+[note] ndb slave: table test.t2diffex using conflict_fn ndb$old on attribute x.
+[note] ndb slave: table test.t2diffex logging exceptions to test.t2diffex$ex
+[note] ndb slave: table test.t1allsame using conflict_fn ndb$max on attribute x.
+[note] ndb slave: table test.t2_max using conflict_fn ndb$max on attribute x.
+show variables like 'server_id';
+Variable_name	Value
+server_id	3
+MySQLD error output for server 2.1 matching pattern %NDB Slave%
+relevant
+[warning] ndb slave: table test.t3oneex : no extra row author bits in table.
+[note] ndb slave: table test.t3oneex : cft_ndb_epoch[_trans], low epoch resolution
+[note] ndb slave: table test.t2diffex using conflict_fn ndb$max on attribute x.
+[note] ndb slave: table test.t2diffex logging exceptions to test.t2diffex$ex
+[note] ndb slave: table test.t1allsame using conflict_fn ndb$max on attribute x.
+[note] ndb slave: table test.t2_max using conflict_fn ndb$max on attribute x.
+drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX;
 "Cleanup"
 drop table mysql.ndb_replication;
-drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`;
 include/rpl_end.inc

=== modified file 'mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result'
--- a/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result	revid:mikael.ronstrom@stripped
+++ b/mysql-test/suite/ndb_rpl/r/ndb_rpl_rep_error.result	revid:mikael.ronstrom@stripped
@@ -15,7 +15,7 @@ delete from mysql.ndb_replication;
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)");
 create table t1 (a int key, X int) engine ndb;
 Warnings:
-Warning	1626	Error in parsing conflict function. Message: column 'X' has wrong datatype
+Warning	1626	Error in parsing conflict function. Message: Column 'X' has wrong datatype
 drop table t1;
 delete from mysql.ndb_replication;
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()");

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test	revid:mikael.ronstrom@stripped
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict.test	revid:mikael.ronstrom@stripped
@@ -2,7 +2,7 @@
 # Test engine native conflict resolution for ndb
 #
 #
---source include/have_ndb.inc
+--source include/have_multi_ndb.inc
 --source include/have_binlog_format_mixed_or_row.inc
 --source suite/ndb_rpl/ndb_master-slave.inc
 
@@ -307,14 +307,77 @@ select server_id, master_server_id, coun
 
 --connection master
 drop table t2_max, t2_max$EX;
+drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`;
+delete from mysql.ndb_replication;
+
+--echo Test that online table distribution sets up conflict functions and exceptions tables
+
+# t1allsame - Same on all servers, no exceptions table
+insert into mysql.ndb_replication values ("test", "t1allsame", 0, 7, "NDB$MAX(X)");
+
+# t2diffex - Different on each server with an exceptions table
+# Not a recommended configuration!
+insert into mysql.ndb_replication values ("test", "t2diffex", 1, 7, "NDB$OLD(X)");
+insert into mysql.ndb_replication values ("test", "t2diffex", 3, 7, "NDB$MAX(X)");
+
+# t3oneex - Only on one server with an exceptions table
+# Note that it's not defined on the server where it's created (not recommended)
+# so on the server where it's defined, we get an error due to having no extra
+# author bits
+#
+insert into mysql.ndb_replication values ("test", "t3oneex", 3, 7, "NDB$EPOCH()");
+
+# Create exception tables
+create table t2diffex$EX (
+   server_id int unsigned,
+   master_server_id int unsigned,
+   master_epoch bigint unsigned,
+   count int unsigned,
+   a int not null,
+   primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
+
+create table t3oneex$EX (
+   server_id int unsigned,
+   master_server_id int unsigned,
+   master_epoch bigint unsigned,
+   count int unsigned,
+   a int not null,
+   primary key(server_id, master_server_id, master_epoch, count)) engine ndb;
 
+# Now create actual tables on this server (id 1)
+create table t1allsame(a int primary key, b varchar(200), X int unsigned) engine=ndb;
+create table t2diffex(a int primary key, b varchar(200), X int unsigned) engine=ndb;
+create table t3oneex(a int primary key, b varchar(200)) engine=ndb;
+
+# Now examine server logs to see that conflict detection and
+# exceptions tables were setup as expected
+show variables like 'server_id';
+--let $server_num=1.1
+--let $pattern=%NDB Slave%
+--let $limit=4
+--source suite/ndb_rpl/t/show_mysqld_warnings.inc
+
+
+--connection server2
+--disable_query_log
+call mtr.add_suppression("NDB Slave: .* No extra row author bits in table.*");
+--enable_query_log
+
+show variables like 'server_id';
+--let $server_num=2.1
+--let $pattern=%NDB Slave%
+--let $limit=6
+
+--source suite/ndb_rpl/t/show_mysqld_warnings.inc
+
+--connection master
+drop table t3oneex, t2diffex, t1allsame, t3oneex$EX, t2diffex$EX;
 
 ###############
 --echo "Cleanup"
 
 --connection master
 drop table mysql.ndb_replication;
-drop table t1_old, `t1_old$EX`, t1_max, `t1_max$EX`, t1_max_delete_win, `t1_max_delete_win$EX`;
 --sync_slave_with_master
 
 --source include/rpl_end.inc

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test	revid:mikael.ronstrom@stripped
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.test	revid:mikael.ronstrom@stripped
@@ -231,6 +231,11 @@ insert into mysql.ndb_replication values
   ("test", "t3", 0, 7, "NDB\$EPOCH(32)"),
   ("test", "t4", 0, 7, "NDB\$EPOCH(-1)");
 
+--disable_query_log
+# Only need suppress here, as table creation fails due to this.
+call mtr.add_suppression("NDB Slave: .* Too many extra Gci bits at .*");
+--enable_query_log
+
 --error 1005
 create table test.t3 (a int primary key) engine=ndb;
 show warnings;

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test	revid:mikael.ronstrom@stripped
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_rep_error.test	revid:mikael.ronstrom@stripped
@@ -2,7 +2,7 @@
 # Some negative tests of the ndb_replication table
 #
 #
---source include/have_ndb.inc
+--source include/have_multi_ndb.inc
 --source include/have_binlog_format_mixed_or_row.inc
 --source suite/ndb_rpl/ndb_master-slave.inc
 
@@ -46,8 +46,25 @@ CREATE TABLE mysql.ndb_replication
 --enable_warnings
 --enable_query_log
 
+# Need suppressions on all servers where warnings/errors can be seen.
+--disable_query_log
+--connection server1
+call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*");
+call mtr.add_suppression("NDB Slave: .* has wrong datatype.*");
+call mtr.add_suppression("NDB Slave: .* missing function argument .*");
+call mtr.add_suppression("NDB Slave: .* missing \')\' .*");
+--connection server2
+call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*");
+call mtr.add_suppression("NDB Slave: .* has wrong datatype.*");
+call mtr.add_suppression("NDB Slave: .* missing function argument .*");
+call mtr.add_suppression("NDB Slave: .* missing \')\' .*");
+--connection default
+--enable_query_log
+
 # Non existant conflict_fn
 # gives error when creating table
+#call mtr.add_suppression("NDB Slave: .* unknown conflict resolution function .*");
+
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$X(X)");
 --error 1005
 create table t1 (a int key, X int) engine ndb;
@@ -56,6 +73,8 @@ delete from mysql.ndb_replication;
 
 # Column type cannot be used for this function
 # gives warning when creating table
+#call mtr.add_suppression("NDB Slave: .* has wrong datatype.*");
+
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X)");
 create table t1 (a int key, X int) engine ndb;
 drop table t1;
@@ -63,6 +82,8 @@ delete from mysql.ndb_replication;
 
 # Too few arguments
 # gives error when creating table
+#call mtr.add_suppression("NDB Slave: .* missing function argument .*");
+
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX()");
 --error 1005
 create table t1 (a int key, X int) engine ndb;
@@ -71,6 +92,7 @@ delete from mysql.ndb_replication;
 
 # Too many arguments
 # gives error when creating table
+#call mtr.add_suppression("NDB Slave: .* missing \')\' .*");
 insert into mysql.ndb_replication values ("test", "t1", 0, NULL, "NDB$MAX(X Y)");
 --error 1005
 create table t1 (a int key, X int) engine ndb;

=== added file 'mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc'
--- a/mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc	1970-01-01 00:00:00 +0000
+++ b/mysql-test/suite/ndb_rpl/t/show_mysqld_warnings.inc	revid:mikael.ronstrom@stripped
@@ -0,0 +1,10 @@
+--disable_query_log
+--echo MySQLD error output for server $server_num matching pattern $pattern
+create table errlog (a int auto_increment primary key, txt text) engine=myisam;
+
+--eval load data local infile "$MYSQLTEST_VARDIR/log/mysqld.$server_num.err" into table errlog columns terminated by '\n' (txt);
+
+--eval select replace( lower( right(txt, length(txt) - locate('[',txt) + 1)), '\r', '') as relevant from errlog where txt like '$pattern' order by a desc limit $limit;
+
+drop table errlog;
+--enable_query_log

=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	revid:mikael.ronstrom@stripped
+++ b/sql/ha_ndbcluster.cc	revid:mikael.ronstrom@stripped
@@ -2130,7 +2130,7 @@ int ha_ndbcluster::get_metadata(THD *thd
 
 #ifdef HAVE_NDB_BINLOG
   ndbcluster_read_binlog_replication(thd, ndb, m_share, m_table,
-                                     ::server_id, table, FALSE);
+                                     ::server_id, FALSE);
 #endif
 
   DBUG_RETURN(0);
@@ -5143,6 +5143,7 @@ ha_ndbcluster::prepare_conflict_detectio
   {
     res = conflict_fn->prep_func(m_share->m_cfn_share,
                                  op_type,
+                                 m_ndb_record,
                                  old_data,
                                  new_data,
                                  table->write_set,
@@ -5993,7 +5994,10 @@ handle_row_conflict(NDB_CONFLICT_FN_SHAR
         for (k= 0; k < nkey; k++)
         {
           DBUG_ASSERT(pk_row != NULL);
-          const uchar* data= pk_row + cfn_share->m_offset[k];
+          const uchar* data=
+            (const uchar*) NdbDictionary::getValuePtr(key_rec,
+                                                      (const char*) pk_row,
+                                                      cfn_share->m_key_attrids[k]);
           if (ex_op->setValue((Uint32)(fixed_cols + k), (const char*)data) == -1)
           {
             err= ex_op->getNdbError();
@@ -10020,7 +10024,6 @@ int ha_ndbcluster::create(const char *na
                                                           m_dbname,
                                                           m_tabname,
                                                           ::server_id,
-                                                          form,
                                                           &binlog_flags,
                                                           &conflict_fn,
                                                           args,
@@ -10484,7 +10487,6 @@ cleanup_failed:
         ndbcluster_apply_binlog_replication_info(thd,
                                                  share,
                                                  m_table,
-                                                 form,
                                                  conflict_fn,
                                                  args,
                                                  num_args,
@@ -10929,7 +10931,7 @@ int ha_ndbcluster::rename_table(const ch
 #ifdef HAVE_NDB_BINLOG
     if (share)
       ndbcluster_read_binlog_replication(thd, ndb, share, ndbtab,
-                                         ::server_id, NULL, TRUE);
+                                         ::server_id, TRUE);
 #endif
     /* always create an event for the table */
     String event_name(INJECTOR_EVENT_LEN);

=== modified file 'sql/ha_ndbcluster_binlog.cc'
--- a/sql/ha_ndbcluster_binlog.cc	revid:mikael.ronstrom@stripped
+++ b/sql/ha_ndbcluster_binlog.cc	revid:mikael.ronstrom@stripped
@@ -4004,7 +4004,6 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
                      const NDBTAB *ndbtab, uint field_index,
                      uint resolve_col_sz,
                      const st_conflict_fn_def* conflict_fn,
-                     TABLE *table,
                      uint8 flags)
 {
   DBUG_ENTER("slave_set_resolve_fn");
@@ -4024,8 +4023,6 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
   /* Calculate resolve col stuff (if relevant) */
   cfn_share->m_resolve_size= resolve_col_sz;
   cfn_share->m_resolve_column= field_index;
-  cfn_share->m_resolve_offset= (uint16)(table->field[field_index]->ptr -
-                                        table->record[0]);
   cfn_share->m_flags = flags;
 
   {
@@ -4074,8 +4071,11 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
               col->getNullable() == ex_col->getNullable();
             if (!ok)
               break;
-            cfn_share->m_offset[k]=
-              (uint16)(table->field[i]->ptr - table->record[0]);
+            /*
+               Store mapping of Exception table key# to
+               orig table attrid
+            */
+            cfn_share->m_key_attrids[k]= i;
             k++;
           }
         }
@@ -4086,9 +4086,9 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
           ndbtab_g.release();
           if (opt_ndb_extra_logging)
             sql_print_information("NDB Slave: Table %s.%s logging exceptions to %s.%s",
-                                  table->s->db.str,
-                                  table->s->table_name.str,
-                                  table->s->db.str,
+                                  share->db,
+                                  share->table_name,
+                                  share->db,
                                   ex_tab_name);
         }
         else
@@ -4121,6 +4121,7 @@ slave_set_resolve_fn(THD *thd, NDB_SHARE
 int
 row_conflict_fn_old(NDB_CONFLICT_FN_SHARE* cfn_share,
                     enum_conflicting_op_type op_type,
+                    const NdbRecord* data_record,
                     const uchar* old_data,
                     const uchar* new_data,
                     const MY_BITMAP* write_set,
@@ -4129,7 +4130,10 @@ row_conflict_fn_old(NDB_CONFLICT_FN_SHAR
   DBUG_ENTER("row_conflict_fn_old");
   uint32 resolve_column= cfn_share->m_resolve_column;
   uint32 resolve_size= cfn_share->m_resolve_size;
-  const uchar* field_ptr = old_data + cfn_share->m_resolve_offset;
+  const uchar* field_ptr = (const uchar*)
+    NdbDictionary::getValuePtr(data_record,
+                               (const char*) old_data,
+                               cfn_share->m_resolve_column);
 
   assert((resolve_size == 4) || (resolve_size == 8));
 
@@ -4197,6 +4201,7 @@ row_conflict_fn_old(NDB_CONFLICT_FN_SHAR
 int
 row_conflict_fn_max_update_only(NDB_CONFLICT_FN_SHARE* cfn_share,
                                 enum_conflicting_op_type op_type,
+                                const NdbRecord* data_record,
                                 const uchar* old_data,
                                 const uchar* new_data,
                                 const MY_BITMAP* write_set,
@@ -4205,7 +4210,10 @@ row_conflict_fn_max_update_only(NDB_CONF
   DBUG_ENTER("row_conflict_fn_max_update_only");
   uint32 resolve_column= cfn_share->m_resolve_column;
   uint32 resolve_size= cfn_share->m_resolve_size;
-  const uchar* field_ptr = new_data + cfn_share->m_resolve_offset;
+  const uchar* field_ptr = (const uchar*)
+    NdbDictionary::getValuePtr(data_record,
+                               (const char*) new_data,
+                               cfn_share->m_resolve_column);
 
   assert((resolve_size == 4) || (resolve_size == 8));
 
@@ -4284,6 +4292,7 @@ row_conflict_fn_max_update_only(NDB_CONF
 int
 row_conflict_fn_max(NDB_CONFLICT_FN_SHARE* cfn_share,
                     enum_conflicting_op_type op_type,
+                    const NdbRecord* data_record,
                     const uchar* old_data,
                     const uchar* new_data,
                     const MY_BITMAP* write_set,
@@ -4297,6 +4306,7 @@ row_conflict_fn_max(NDB_CONFLICT_FN_SHAR
   case UPDATE_ROW:
     return row_conflict_fn_max_update_only(cfn_share,
                                            op_type,
+                                           data_record,
                                            old_data,
                                            new_data,
                                            write_set,
@@ -4308,6 +4318,7 @@ row_conflict_fn_max(NDB_CONFLICT_FN_SHAR
      */
     return row_conflict_fn_old(cfn_share,
                                op_type,
+                               data_record,
                                old_data,
                                new_data,
                                write_set,
@@ -4336,6 +4347,7 @@ row_conflict_fn_max(NDB_CONFLICT_FN_SHAR
 int
 row_conflict_fn_max_del_win(NDB_CONFLICT_FN_SHARE* cfn_share,
                             enum_conflicting_op_type op_type,
+                            const NdbRecord* data_record,
                             const uchar* old_data,
                             const uchar* new_data,
                             const MY_BITMAP* write_set,
@@ -4349,6 +4361,7 @@ row_conflict_fn_max_del_win(NDB_CONFLICT
   case UPDATE_ROW:
     return row_conflict_fn_max_update_only(cfn_share,
                                            op_type,
+                                           data_record,
                                            old_data,
                                            new_data,
                                            write_set,
@@ -4373,6 +4386,7 @@ row_conflict_fn_max_del_win(NDB_CONFLICT
 int
 row_conflict_fn_epoch(NDB_CONFLICT_FN_SHARE* cfn_share,
                       enum_conflicting_op_type op_type,
+                      const NdbRecord* data_record,
                       const uchar* old_data,
                       const uchar* new_data,
                       const MY_BITMAP* write_set,
@@ -4468,7 +4482,6 @@ parse_conflict_fn_spec(const char* confl
                        const st_conflict_fn_def** conflict_fn,
                        st_conflict_fn_arg* args,
                        Uint32* max_args,
-                       const TABLE* table,
                        char *msg, uint msg_len)
 {
   DBUG_ENTER("parse_conflict_fn_spec");
@@ -4559,9 +4572,6 @@ parse_conflict_fn_spec(const char* confl
 
       uint len= (uint)(end_arg - start_arg);
       args[no_args].type=    type;
-      args[no_args].ptr=     start_arg;
-      args[no_args].len=     len;
-      args[no_args].fieldno= (uint32)-1;
  
       DBUG_PRINT("info", ("found argument %s %u", start_arg, len));
 
@@ -4570,20 +4580,13 @@ parse_conflict_fn_spec(const char* confl
       {
       case CFAT_COLUMN_NAME:
       {
-        /* find column in table */
-        DBUG_PRINT("info", ("searching for %s %u", start_arg, len));
-        TABLE_SHARE *table_s= table->s;
-        for (uint j= 0; j < table_s->fields; j++)
-        {
-          Field *field= table_s->field[j];
-          if (strncmp(start_arg, field->field_name, len) == 0 &&
-              field->field_name[len] == '\0')
-          {
-            DBUG_PRINT("info", ("found %s", field->field_name));
-            args[no_args].fieldno= j;
-            break;
-          }
-        }
+        /* Copy column name out into argument's buffer */
+        char* dest= &args[no_args].resolveColNameBuff[0];
+
+        memcpy(dest, start_arg, (len < (uint) NAME_CHAR_LEN ?
+                                 len :
+                                 NAME_CHAR_LEN));
+        dest[len]= '\0';
         break;
       }
       case CFAT_EXTRA_GCI_BITS:
@@ -4645,7 +4648,7 @@ parse_conflict_fn_spec(const char* confl
   }
   /* parse error */
   my_snprintf(msg, msg_len, "%s, %s at '%s'",
-           conflict_fn_spec, error_str, ptr);
+              conflict_fn_spec, error_str, ptr);
   DBUG_PRINT("info", ("%s", msg));
   DBUG_RETURN(-1);
 }
@@ -4654,7 +4657,6 @@ static int
 setup_conflict_fn(THD *thd, NDB_SHARE *share,
                   const NDBTAB *ndbtab,
                   char *msg, uint msg_len,
-                  TABLE *table,
                   const st_conflict_fn_def* conflict_fn,
                   const st_conflict_fn_arg* args,
                   const Uint32 num_args)
@@ -4676,38 +4678,65 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
       DBUG_RETURN(-1);
     }
 
+    /* Now try to find the column in the table */
+    int colNum = -1;
+    const char* resolveColName = args[0].resolveColNameBuff;
+    int resolveColNameLen = (int)strlen(resolveColName);
+
+    for (int j=0; j< ndbtab->getNoOfColumns(); j++)
+    {
+      const char* colName = ndbtab->getColumn(j)->getName();
+
+      if (strncmp(colName,
+                  resolveColName,
+                  resolveColNameLen) == 0 &&
+          colName[resolveColNameLen] == '\0')
+      {
+        colNum = j;
+        break;
+      }
+    }
+    if (colNum == -1)
+    {
+      my_snprintf(msg, msg_len,
+                  "Could not find resolve column %s.",
+                  resolveColName);
+      DBUG_PRINT("info", ("%s", msg));
+      DBUG_RETURN(-1);
+    }
+
     uint resolve_col_sz= 0;
 
     if (0 == (resolve_col_sz =
-              slave_check_resolve_col_type(ndbtab, args[0].fieldno)))
+              slave_check_resolve_col_type(ndbtab, colNum)))
     {
       /* wrong data type */
       slave_reset_conflict_fn(share);
       my_snprintf(msg, msg_len,
-                  "column '%s' has wrong datatype",
-                  table->s->field[args[0].fieldno]->field_name);
+                  "Column '%s' has wrong datatype",
+                  resolveColName);
       DBUG_PRINT("info", ("%s", msg));
       DBUG_RETURN(-1);
     }
 
     if (slave_set_resolve_fn(thd, share, ndbtab,
-                             args[0].fieldno, resolve_col_sz,
-                             conflict_fn, table, CFF_NONE))
+                             colNum, resolve_col_sz,
+                             conflict_fn, CFF_NONE))
     {
       my_snprintf(msg, msg_len,
-                  "unable to setup conflict resolution using column '%s'",
-                  table->s->field[args[0].fieldno]->field_name);
+                  "Unable to setup conflict resolution using column '%s'",
+                  resolveColName);
       DBUG_PRINT("info", ("%s", msg));
       DBUG_RETURN(-1);
     }
-    if (opt_ndb_extra_logging)
-    {
-       sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.",
-                             table->s->db.str,
-                             table->s->table_name.str,
-                             conflict_fn->name,
-                             table->s->field[args[0].fieldno]->field_name);
-    }
+
+    /* Success, update message */
+    my_snprintf(msg, msg_len,
+                "NDB Slave: Table %s.%s using conflict_fn %s on attribute %s.",
+                share->db,
+                share->table_name,
+                conflict_fn->name,
+                resolveColName);
     break;
   }
   case CFT_NDB_EPOCH:
@@ -4734,7 +4763,9 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
      * represent SavePeriod/EpochPeriod
      */
     if (ndbtab->getExtraRowGciBits() == 0)
-      sql_print_information("Ndb Slave : CFT_NDB_EPOCH[_TRANS], low epoch resolution");
+      sql_print_information("NDB Slave: Table %s.%s : CFT_NDB_EPOCH[_TRANS], low epoch resolution",
+                            share->db,
+                            share->table_name);
 
     if (ndbtab->getExtraRowAuthorBits() == 0)
     {
@@ -4746,20 +4777,20 @@ setup_conflict_fn(THD *thd, NDB_SHARE *s
     if (slave_set_resolve_fn(thd, share, ndbtab,
                              0, // field_no
                              0, // resolve_col_sz
-                             conflict_fn, table, CFF_REFRESH_ROWS))
+                             conflict_fn, CFF_REFRESH_ROWS))
     {
       my_snprintf(msg, msg_len,
                   "unable to setup conflict resolution");
       DBUG_PRINT("info", ("%s", msg));
       DBUG_RETURN(-1);
     }
-    if (opt_ndb_extra_logging)
-    {
-      sql_print_information("NDB Slave: Table %s.%s using conflict_fn %s.",
-                            table->s->db.str,
-                            table->s->table_name.str,
-                            conflict_fn->name);
-    }
+    /* Success, update message */
+    my_snprintf(msg, msg_len,
+                "NDB Slave: Table %s.%s using conflict_fn %s.",
+                share->db,
+                share->table_name,
+                conflict_fn->name);
+
     break;
   }
   case CFT_NUMBER_OF_CFTS:
@@ -5065,7 +5096,6 @@ ndbcluster_get_binlog_replication_info(T
                                        const char* db,
                                        const char* table_name,
                                        uint server_id,
-                                       const TABLE *table,
                                        Uint32* binlog_flags,
                                        const st_conflict_fn_def** conflict_fn,
                                        st_conflict_fn_arg* args,
@@ -5113,34 +5143,42 @@ ndbcluster_get_binlog_replication_info(T
     DBUG_RETURN(ER_NDB_REPLICATION_SCHEMA_ERROR);
   }
 
-  if (table != NULL)
+  if (conflict_fn_spec != NULL)
   {
-    if (conflict_fn_spec != NULL)
+    char tmp_buf[FN_REFLEN];
+
+    if (parse_conflict_fn_spec(conflict_fn_spec,
+                               conflict_fn,
+                               args,
+                               num_args,
+                               tmp_buf,
+                               sizeof(tmp_buf)) != 0)
     {
-      char tmp_buf[FN_REFLEN];
+        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+                          ER_CONFLICT_FN_PARSE_ERROR,
+                          ER(ER_CONFLICT_FN_PARSE_ERROR),
+                          tmp_buf);
 
-      if (parse_conflict_fn_spec(conflict_fn_spec,
-                                 conflict_fn,
-                                 args,
-                                 num_args,
-                                 table,
-                                 tmp_buf,
-                                 sizeof(tmp_buf)) != 0)
+      /*
+         Log as well, useful for contexts where the thd's stack of
+         warnings are ignored
+       */
+      if (opt_ndb_extra_logging)
       {
-        push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
-                            ER_CONFLICT_FN_PARSE_ERROR,
-                            ER(ER_CONFLICT_FN_PARSE_ERROR),
-                            tmp_buf);
-        DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR);
+        sql_print_warning("NDB Slave: Table %s.%s : Parse error on conflict fn : %s",
+                          db, table_name,
+                          tmp_buf);
       }
-    }
-    else
-    {
-      /* No conflict function specified */
-      conflict_fn= NULL;
-      num_args= 0;
+
+      DBUG_RETURN(ER_CONFLICT_FN_PARSE_ERROR);
     }
   }
+  else
+  {
+    /* No conflict function specified */
+    conflict_fn= NULL;
+    num_args= 0;
+  }
 
   DBUG_RETURN(0);
 }
@@ -5149,7 +5187,6 @@ int
 ndbcluster_apply_binlog_replication_info(THD *thd,
                                          NDB_SHARE *share,
                                          const NDBTAB* ndbtab,
-                                         TABLE* table,
                                          const st_conflict_fn_def* conflict_fn,
                                          const st_conflict_fn_arg* args,
                                          Uint32 num_args,
@@ -5170,15 +5207,32 @@ ndbcluster_apply_binlog_replication_info
     if (setup_conflict_fn(thd, share,
                           ndbtab,
                           tmp_buf, sizeof(tmp_buf),
-                          table,
                           conflict_fn,
                           args,
-                          num_args) != 0)
+                          num_args) == 0)
+    {
+      if (opt_ndb_extra_logging)
+      {
+        sql_print_information("%s", tmp_buf);
+      }
+    }
+    else
     {
+      /*
+        Dump setup failure message to error log
+        for cases where thd warning stack is
+        ignored
+      */
+      sql_print_warning("NDB Slave: Table %s.%s : %s",
+                        share->db,
+                        share->table_name,
+                        tmp_buf);
+
       push_warning_printf(thd, MYSQL_ERROR::WARN_LEVEL_WARN,
                           ER_CONFLICT_FN_PARSE_ERROR,
                           ER(ER_CONFLICT_FN_PARSE_ERROR),
                           tmp_buf);
+
       DBUG_RETURN(-1);
     }
   }
@@ -5196,7 +5250,6 @@ ndbcluster_read_binlog_replication(THD *
                                    NDB_SHARE *share,
                                    const NDBTAB *ndbtab,
                                    uint server_id,
-                                   TABLE *table,
                                    bool do_set_binlog_flags)
 {
   DBUG_ENTER("ndbcluster_read_binlog_replication");
@@ -5209,7 +5262,6 @@ ndbcluster_read_binlog_replication(THD *
                                               share->db,
                                               share->table_name,
                                               server_id,
-                                              table,
                                               &binlog_flags,
                                               &conflict_fn,
                                               args,
@@ -5217,7 +5269,6 @@ ndbcluster_read_binlog_replication(THD *
       (ndbcluster_apply_binlog_replication_info(thd,
                                                 share,
                                                 ndbtab,
-                                                table,
                                                 conflict_fn,
                                                 args,
                                                 num_args,
@@ -5322,11 +5373,18 @@ int ndbcluster_create_binlog_setup(THD *
     /*
      */
     ndbcluster_read_binlog_replication(thd, ndb, share, ndbtab,
-                                       ::server_id, NULL, TRUE);
+                                       ::server_id, TRUE);
 #endif
     /*
       check if logging turned off for this table
     */
+    if ((share->flags & NSF_HIDDEN_PK) &&
+        (share->flags & NSF_BLOB_FLAG) &&
+        !(share->flags & NSF_NO_BINLOG))
+    {
+      DBUG_PRINT("NDB_SHARE", ("NSF_HIDDEN_PK && NSF_BLOB_FLAG -> NSF_NO_BINLOG"));
+      share->flags |= NSF_NO_BINLOG;
+    }
     if (get_binlog_nologging(share))
     {
       if (opt_ndb_extra_logging)
@@ -6683,6 +6741,71 @@ void updateInjectorStats(Ndb* schemaNdb,
     dataNdb->getClientStat(Ndb::EventBytesRecvdCount);
 }
 
+/**
+   injectApplyStatusWriteRow
+
+   Inject a WRITE_ROW event on the ndb_apply_status table into
+   the Binlog.
+   This contains our server_id and the supplied epoch number.
+   When applied on the Slave it gives a transactional position
+   marker
+*/
+static
+bool
+injectApplyStatusWriteRow(injector::transaction& trans,
+                          ulonglong gci)
+{
+  DBUG_ENTER("injectApplyStatusWriteRow");
+  if (ndb_apply_status_share == NULL)
+  {
+    sql_print_error("NDB: Could not get apply status share");
+    DBUG_ASSERT(ndb_apply_status_share != NULL);
+    DBUG_RETURN(false);
+  }
+
+  /* Build row buffer for generated ndb_apply_status
+     WRITE_ROW event
+     First get the relevant table structure.
+  */
+  DBUG_ASSERT(!ndb_apply_status_share->event_data);
+  DBUG_ASSERT(ndb_apply_status_share->op);
+  Ndb_event_data* event_data=
+    (Ndb_event_data *) ndb_apply_status_share->op->getCustomData();
+  DBUG_ASSERT(event_data);
+  DBUG_ASSERT(event_data->shadow_table);
+  TABLE* apply_status_table= event_data->shadow_table;
+
+  /*
+    Intialize apply_status_table->record[0]
+  */
+  empty_record(apply_status_table);
+
+  apply_status_table->field[0]->store((longlong)::server_id, true);
+  apply_status_table->field[1]->store((longlong)gci, true);
+  apply_status_table->field[2]->store("", 0, &my_charset_bin);
+  apply_status_table->field[3]->store((longlong)0, true);
+  apply_status_table->field[4]->store((longlong)0, true);
+#ifndef DBUG_OFF
+  const LEX_STRING& name= apply_status_table->s->table_name;
+  DBUG_PRINT("info", ("use_table: %.*s",
+                      (int) name.length, name.str));
+#endif
+  injector::transaction::table tbl(apply_status_table, true);
+  int ret = trans.use_table(::server_id, tbl);
+  assert(ret == 0); NDB_IGNORE_VALUE(ret);
+
+  ret= trans.write_row(::server_id,
+                       injector::transaction::table(apply_status_table,
+                                                    true),
+                       &apply_status_table->s->all_set,
+                       apply_status_table->s->fields,
+                       apply_status_table->record[0]);
+
+  assert(ret == 0);
+
+  DBUG_RETURN(true);
+}
+
 
 extern ulong opt_ndb_report_thresh_binlog_epoch_slip;
 extern ulong opt_ndb_report_thresh_binlog_mem_usage;
@@ -7154,44 +7277,6 @@ restart_cluster_failure:
     {
       DBUG_PRINT("info", ("pollEvents res: %d", res));
       thd->proc_info= "Processing events";
-      uchar apply_status_buf[512];
-      TABLE *apply_status_table= NULL;
-      if (ndb_apply_status_share)
-      {
-        /*
-          We construct the buffer to write the apply status binlog
-          event here, as the table->record[0] buffer is referenced
-          by the apply status event operation, and will be filled
-          with data at the nextEvent call if the first event should
-          happen to be from the apply status table
-        */
-        Ndb_event_data *event_data= ndb_apply_status_share->event_data;
-        if (!event_data)
-        {
-          DBUG_ASSERT(ndb_apply_status_share->op);
-          event_data= 
-            (Ndb_event_data *) ndb_apply_status_share->op->getCustomData();
-          DBUG_ASSERT(event_data);
-        }
-        apply_status_table= event_data->shadow_table;
-
-        /* 
-           Intialize apply_status_table->record[0] 
-        */
-        empty_record(apply_status_table);
-
-        apply_status_table->field[0]->store((longlong)::server_id, true);
-        /*
-          gci is added later, just before writing to binlog as gci
-          is unknown here
-        */
-        apply_status_table->field[2]->store("", 0, &my_charset_bin);
-        apply_status_table->field[3]->store((longlong)0, true);
-        apply_status_table->field[4]->store((longlong)0, true);
-        DBUG_ASSERT(sizeof(apply_status_buf) >= apply_status_table->s->reclength);
-        memcpy(apply_status_buf, apply_status_table->record[0],
-               apply_status_table->s->reclength);
-      }
       NdbEventOperation *pOp= i_ndb->nextEvent();
       ndb_binlog_index_row _row;
       ndb_binlog_index_row *rows= &_row;
@@ -7314,35 +7399,11 @@ restart_cluster_failure:
         }
         if (trans.good())
         {
-          if (apply_status_table)
-          {
-#ifndef DBUG_OFF
-            const LEX_STRING& name= apply_status_table->s->table_name;
-            DBUG_PRINT("info", ("use_table: %.*s",
-                                (int) name.length, name.str));
-#endif
-            injector::transaction::table tbl(apply_status_table, true);
-            int ret = trans.use_table(::server_id, tbl);
-            assert(ret == 0); NDB_IGNORE_VALUE(ret);
-
-            /* add the gci to the record */
-            Field *field= apply_status_table->field[1];
-            my_ptrdiff_t row_offset=
-              (my_ptrdiff_t) (apply_status_buf - apply_status_table->record[0]);
-            field->move_field_offset(row_offset);
-            field->store((longlong)gci, true);
-            field->move_field_offset(-row_offset);
-
-            trans.write_row(::server_id,
-                            injector::transaction::table(apply_status_table,
-                                                         true),
-                            &apply_status_table->s->all_set,
-                            apply_status_table->s->fields,
-                            apply_status_buf);
-          }
-          else
+          /* Inject ndb_apply_status WRITE_ROW event */
+          if (!injectApplyStatusWriteRow(trans,
+                                         gci))
           {
-            sql_print_error("NDB: Could not get apply status share");
+            sql_print_error("NDB Binlog: Failed to inject apply status write row");
           }
         }
 

=== modified file 'sql/ha_ndbcluster_binlog.h'
--- a/sql/ha_ndbcluster_binlog.h	revid:mikael.ronstrom@stripped
+++ b/sql/ha_ndbcluster_binlog.h	revid:mikael.ronstrom@stripped
@@ -82,7 +82,6 @@ ndbcluster_get_binlog_replication_info(T
                                        const char* db,
                                        const char* table_name,
                                        uint server_id,
-                                       const TABLE *table,
                                        Uint32* binlog_flags,
                                        const st_conflict_fn_def** conflict_fn,
                                        st_conflict_fn_arg* args,
@@ -91,7 +90,6 @@ int
 ndbcluster_apply_binlog_replication_info(THD *thd,
                                          NDB_SHARE *share,
                                          const NDBTAB* ndbtab,
-                                         TABLE* table,
                                          const st_conflict_fn_def* conflict_fn,
                                          const st_conflict_fn_arg* args,
                                          Uint32 num_args,
@@ -102,7 +100,6 @@ ndbcluster_read_binlog_replication(THD *
                                    NDB_SHARE *share,
                                    const NDBTAB *ndbtab,
                                    uint server_id,
-                                   TABLE *table,
                                    bool do_set_binlog_flags);
 #endif
 int ndb_create_table_from_engine(THD *thd, const char *db,

=== modified file 'sql/ha_ndbcluster_cond.cc'
--- a/sql/ha_ndbcluster_cond.cc	revid:mikael.ronstrom@stripped
+++ b/sql/ha_ndbcluster_cond.cc	revid:mikael.ronstrom@stripped
@@ -1354,13 +1354,14 @@ ha_ndbcluster_cond::build_scan_filter_pr
       if (!value || !field) break;
       bool is_string= (value->qualification.value_type == Item::STRING_ITEM);
       // Save value in right format for the field type
-      uint32 len= value->save_in_field(field);
+      uint32 val_len= value->save_in_field(field);
       char buff[MAX_FIELD_WIDTH];
       String str(buff,sizeof(buff),field->get_field_charset());
-      if (len > field->get_field()->field_length)
-        str.set(value->get_val(), len, field->get_field_charset());
+      if (val_len > field->get_field()->field_length)
+        str.set(value->get_val(), val_len, field->get_field_charset());
       else
         field->get_field_val_str(&str);
+      uint32 len= str.length();
       const char *val=
         ((value->is_const_func() || value->is_cached()) && is_string)?
         str.ptr()
@@ -1382,13 +1383,14 @@ ha_ndbcluster_cond::build_scan_filter_pr
       if (!value || !field) break;
       bool is_string= (value->qualification.value_type == Item::STRING_ITEM);
       // Save value in right format for the field type
-      uint32 len= value->save_in_field(field);
+      uint32 val_len= value->save_in_field(field);
       char buff[MAX_FIELD_WIDTH];
       String str(buff,sizeof(buff),field->get_field_charset());
-      if (len > field->get_field()->field_length)
-        str.set(value->get_val(), len, field->get_field_charset());
+      if (val_len > field->get_field()->field_length)
+        str.set(value->get_val(), val_len, field->get_field_charset());
       else
         field->get_field_val_str(&str);
+      uint32 len= str.length();
       const char *val=
         ((value->is_const_func() || value->is_cached()) && is_string)?
         str.ptr()

=== modified file 'sql/ndb_share.h'
--- a/sql/ndb_share.h	revid:mikael.ronstrom@stripped
+++ b/sql/ndb_share.h	revid:mikael.ronstrom@stripped
@@ -22,6 +22,8 @@
 #include <my_alloc.h>        // MEM_ROOT
 #include <thr_lock.h>        // THR_LOCK
 #include <my_bitmap.h>       // MY_BITMAP
+#include <mysql_com.h>       // NAME_CHAR_LEN
+#include <sql_const.h>       // MAX_REF_PARTS
 
 #include <ndbapi/Ndb.hpp>    // Ndb::TupleIdRange
 
@@ -56,11 +58,9 @@ enum enum_conflict_fn_arg_type
 struct st_conflict_fn_arg
 {
   enum_conflict_fn_arg_type type;
-  const char *ptr;
-  uint32 len;
   union
   {
-    uint32 fieldno;      // CFAT_COLUMN_NAME
+    char resolveColNameBuff[ NAME_CHAR_LEN + 1 ]; // CFAT_COLUMN_NAME
     uint32 extraGciBits; // CFAT_EXTRA_GCI_BITS
   };
 };
@@ -88,6 +88,7 @@ enum enum_conflicting_op_type
 */
 typedef int (* prepare_detect_func) (struct st_ndbcluster_conflict_fn_share* cfn_share,
                                      enum_conflicting_op_type op_type,
+                                     const NdbRecord* data_record,
                                      const uchar* old_data,
                                      const uchar* new_data,
                                      const MY_BITMAP* write_set,
@@ -131,16 +132,21 @@ enum enum_conflict_fn_table_flags
   CFF_REFRESH_ROWS = 1
 };
 
+/*
+   Maximum supported key parts (16)
+   (Ndb supports 32, but MySQL has a lower limit)
+*/
+static const int NDB_MAX_KEY_PARTS = MAX_REF_PARTS;
+
 typedef struct st_ndbcluster_conflict_fn_share {
   const st_conflict_fn_def* m_conflict_fn;
 
   /* info about original table */
   uint8 m_pk_cols;
-  uint8 m_resolve_column;
+  uint16 m_resolve_column;
   uint8 m_resolve_size;
   uint8 m_flags;
-  uint16 m_offset[16];
-  uint16 m_resolve_offset;
+  uint16 m_key_attrids[ NDB_MAX_KEY_PARTS ];
 
   const NdbDictionary::Table *m_ex_tab;
   uint32 m_count;

=== modified file 'sql/ndb_thd.cc'
--- a/sql/ndb_thd.cc	revid:mikael.ronstrom@stripped
+++ b/sql/ndb_thd.cc	revid:mikael.ronstrom@stripped
@@ -15,6 +15,10 @@
    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
 */
 
+#ifndef MYSQL_SERVER
+#define MYSQL_SERVER
+#endif
+
 #include "ndb_thd.h"
 #include "ndb_thd_ndb.h"
 
@@ -44,10 +48,6 @@ Ndb* check_ndb_in_thd(THD* thd, bool val
   return thd_ndb->ndb;
 }
 
-#ifndef MYSQL_SERVER
-#define MYSQL_SERVER
-#endif
-
 #include <sql_class.h>
 
 void

=== modified file 'storage/ndb/VERSION'
--- a/storage/ndb/VERSION	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/VERSION	revid:mikael.ronstrom@stripped
@@ -2,7 +2,7 @@
 # Should be updated when creating a new NDB version
 NDB_VERSION_MAJOR=7
 NDB_VERSION_MINOR=2
-NDB_VERSION_BUILD=4
+NDB_VERSION_BUILD=5
 NDB_VERSION_STATUS=""
 NDB_SHARED_LIB_VERSION_MAJOR=6
 NDB_SHARED_LIB_VERSION_MINOR=0

=== modified file 'storage/ndb/include/debugger/SignalLoggerManager.hpp'
--- a/storage/ndb/include/debugger/SignalLoggerManager.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/debugger/SignalLoggerManager.hpp	revid:mikael.ronstrom@stripped
@@ -28,8 +28,10 @@
 
 #include <kernel_types.h>
 #include <BlockNumbers.h>
-#include <TransporterDefinitions.hpp>
 #include <RefConvert.hpp>
+#include <NdbMutex.h>
+
+struct SignalHeader;
 
 class SignalLoggerManager
 {

=== modified file 'storage/ndb/include/kernel/GlobalSignalNumbers.h'
--- a/storage/ndb/include/kernel/GlobalSignalNumbers.h	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/kernel/GlobalSignalNumbers.h	revid:mikael.ronstrom@stripped
@@ -454,9 +454,13 @@ extern const GlobalSignalNumber NO_OF_SI
 #define GSN_COPY_DATA_REF               337
 #define GSN_COPY_DATA_CONF              338
 
+/*
+   jonas 2012-01-10 unused currently...
+     so rename REQ to ORD
 #define GSN_OPEN_COMCONF                339
 #define GSN_OPEN_COMREF                 340
-#define GSN_OPEN_COMREQ                 341
+*/
+#define GSN_OPEN_COMORD                 341
 #define GSN_PACKED_SIGNAL               342
 #define GSN_PREP_FAILCONF               343
 #define GSN_PREP_FAILREF                344

=== modified file 'storage/ndb/include/kernel/NodeInfo.hpp'
--- a/storage/ndb/include/kernel/NodeInfo.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/kernel/NodeInfo.hpp	revid:mikael.ronstrom@stripped
@@ -37,18 +37,13 @@ public:
     INVALID = 255 ///< Invalid type
   };
   NodeType getType() const;
-  
+
   Uint32 m_version;       ///< Ndb version
   Uint32 m_mysql_version; ///< MySQL version
   Uint32 m_lqh_workers;   ///< LQH workers
   Uint32 m_type;          ///< Node type
   Uint32 m_connectCount;  ///< No of times connected
   Uint32 m_connected;     ///< Node is connected
-  /* Ensure the above variables reside on its own 64 byte cache line */
-  Uint32 m_not_used1[10];
-  Uint32 m_heartbeat_cnt; ///< Missed heartbeats
-  /* Ensure the m_heartbeat_cnt reside on its own 64 byte cache line */
-  Uint32 m_not_used2[15];
 
   friend NdbOut & operator<<(NdbOut&, const NodeInfo&); 
 };
@@ -61,7 +56,6 @@ NodeInfo::NodeInfo(){
   m_lqh_workers = 0;
   m_type = INVALID;
   m_connectCount = 0;
-  m_heartbeat_cnt= 0;
 }
 
 inline

=== modified file 'storage/ndb/include/kernel/ndb_limits.h'
--- a/storage/ndb/include/kernel/ndb_limits.h	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/kernel/ndb_limits.h	revid:mikael.ronstrom@stripped
@@ -126,11 +126,7 @@
 * signals.
 * This parameter is configurable, this is the default value.
 */
-#if NDB_VERSION_D < NDB_MAKE_VERSION(7,2,0)
-#define SCAN_BATCH_SIZE 32768
-#else
 #define SCAN_BATCH_SIZE 16384
-#endif
 /*
 * To protect the NDB API from overload we also define a maximum total
 * batch size from all nodes. This parameter should most likely be

=== modified file 'storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp'
--- a/storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/kernel/signaldata/SignalDroppedRep.hpp	revid:mikael.ronstrom@stripped
@@ -21,8 +21,8 @@
 
 #include "SignalData.hpp"
 
-class SignalDroppedRep {
-
+struct SignalDroppedRep
+{
   /**
    * Receiver(s)
    */
@@ -37,8 +37,7 @@ class SignalDroppedRep {
   friend class TransporterCallbackKernel;
 
   friend bool printSIGNAL_DROPPED_REP(FILE *, const Uint32 *, Uint32, Uint16);  
-public:
-private:
+
   Uint32 originalGsn;
   Uint32 originalLength;
   Uint32 originalSectionCount;

=== modified file 'storage/ndb/include/ndb_global.h'
--- a/storage/ndb/include/ndb_global.h	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/ndb_global.h	revid:mikael.ronstrom@stripped
@@ -249,6 +249,11 @@ extern "C" {
 #define ATTRIBUTE_NOINLINE
 #endif
 
+/**
+ * Pad to NDB_CL size
+ */
+#define NDB_CL_PADSZ(x) (NDB_CL - ((x) % NDB_CL))
+
 /*
  * require is like a normal assert, only it's always on (eg. in release)
  */

=== modified file 'storage/ndb/include/ndb_types.h.in'
--- a/storage/ndb/include/ndb_types.h.in	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/ndb_types.h.in	revid:mikael.ronstrom@stripped
@@ -22,6 +22,13 @@
 #ifndef NDB_TYPES_H
 #define NDB_TYPES_H
 
+/**
+ * sizeof cacheline (in bytes)
+ *
+ * TODO: Add configure check...
+ */
+#define NDB_CL 64
+
 #define NDB_SIZEOF_CHARP @NDB_SIZEOF_CHARP@
 #define NDB_SIZEOF_CHAR @NDB_SIZEOF_CHAR@
 #define NDB_SIZEOF_INT @NDB_SIZEOF_INT@
@@ -89,4 +96,76 @@ typedef unsigned int UintR;
 
 #include "ndb_constants.h"
 
+struct LinearSectionPtr
+{
+  Uint32 sz;
+  Uint32 * p;
+};
+
+struct SegmentedSectionPtrPOD
+{
+  Uint32 sz;
+  Uint32 i;
+  struct SectionSegment * p;
+
+#ifdef __cplusplus
+  void setNull() { p = 0;}
+  bool isNull() const { return p == 0;}
+  inline SegmentedSectionPtrPOD& assign(struct SegmentedSectionPtr&);
+#endif
+};
+
+struct SegmentedSectionPtr
+{
+  Uint32 sz;
+  Uint32 i;
+  struct SectionSegment * p;
+
+#ifdef __cplusplus
+  SegmentedSectionPtr() {}
+  SegmentedSectionPtr(Uint32 sz_arg, Uint32 i_arg,
+                      struct SectionSegment *p_arg)
+    :sz(sz_arg), i(i_arg), p(p_arg)
+  {}
+  SegmentedSectionPtr(const SegmentedSectionPtrPOD & src)
+    :sz(src.sz), i(src.i), p(src.p)
+  {}
+
+  void setNull() { p = 0;}
+  bool isNull() const { return p == 0;}
+#endif
+};
+
+#ifdef __cplusplus
+inline
+SegmentedSectionPtrPOD&
+SegmentedSectionPtrPOD::assign(struct SegmentedSectionPtr& src)
+{
+  this->i = src.i;
+  this->p = src.p;
+  this->sz = src.sz;
+  return *this;
+}
+#endif
+
+/* Abstract interface for iterating over
+ * words in a section
+ */
+#ifdef __cplusplus
+struct GenericSectionIterator
+{
+  virtual ~GenericSectionIterator() {};
+  virtual void reset()=0;
+  virtual const Uint32* getNextWords(Uint32& sz)=0;
+};
+#else
+struct GenericSectionIterator;
+#endif
+
+struct GenericSectionPtr
+{
+  Uint32 sz;
+  struct GenericSectionIterator* sectionIter;
+};
+
 #endif

=== modified file 'storage/ndb/include/ndbapi/Ndb.hpp'
--- a/storage/ndb/include/ndbapi/Ndb.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/ndbapi/Ndb.hpp	revid:mikael.ronstrom@stripped
@@ -1380,6 +1380,14 @@ public:
 
   struct PartitionSpec
   {
+    /*
+      Size of the PartitionSpec structure.
+    */
+    static inline Uint32 size()
+    {
+        return sizeof(PartitionSpec);
+    }
+
     enum SpecType
     {
       PS_NONE                = 0,

=== modified file 'storage/ndb/include/ndbapi/NdbDictionary.hpp'
--- a/storage/ndb/include/ndbapi/NdbDictionary.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/ndbapi/NdbDictionary.hpp	revid:mikael.ronstrom@stripped
@@ -1657,6 +1657,14 @@ public:
   };
   struct RecordSpecification {
     /*
+      Size of the RecordSpecification structure.
+    */
+    static inline Uint32 size()
+    {
+        return sizeof(RecordSpecification);
+    }
+
+    /*
       Column described by this entry (the column maximum size defines field
       size in row).
       Note that even when creating an NdbRecord for an index, the column

=== modified file 'storage/ndb/include/ndbapi/NdbOperation.hpp'
--- a/storage/ndb/include/ndbapi/NdbOperation.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/ndbapi/NdbOperation.hpp	revid:mikael.ronstrom@stripped
@@ -1029,6 +1029,14 @@ public:
   struct OperationOptions
   {
     /*
+      Size of the OperationOptions structure.
+    */
+    static inline Uint32 size()
+    {
+        return sizeof(OperationOptions);
+    }
+
+    /*
      * Which options are present.  See below for option details
      */
     Uint64 optionsPresent;

=== modified file 'storage/ndb/include/ndbapi/NdbScanOperation.hpp'
--- a/storage/ndb/include/ndbapi/NdbScanOperation.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/ndbapi/NdbScanOperation.hpp	revid:mikael.ronstrom@stripped
@@ -107,6 +107,14 @@ public:
    */
   struct ScanOptions
   {
+    /*
+      Size of the ScanOptions structure.
+    */
+    static inline Uint32 size()
+    {
+        return sizeof(ScanOptions);
+    }
+
     /* Which options are present - see below for possibilities */
     Uint64 optionsPresent;
 

=== modified file 'storage/ndb/include/portlib/ndb_socket_poller.h'
--- a/storage/ndb/include/portlib/ndb_socket_poller.h	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/portlib/ndb_socket_poller.h	revid:mikael.ronstrom@stripped
@@ -79,11 +79,7 @@ class ndb_socket_poller {
 
   int m_nfds; // Max fd number for 'select'
 #endif
-  /*
-   * Safety precaution to avoid false CPU cache sharing between
-   * different ndb_socket_poller instances in the same array.
-   */
-  Uint32 not_used[16];
+
 public:
 
   ndb_socket_poller(void) :

=== modified file 'storage/ndb/include/transporter/TransporterCallback.hpp'
--- a/storage/ndb/include/transporter/TransporterCallback.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/transporter/TransporterCallback.hpp	revid:mikael.ronstrom@stripped
@@ -31,277 +31,15 @@
  
 #include <kernel_types.h> 
 #include "TransporterDefinitions.hpp" 
+#include "TransporterRegistry.hpp"
  
- 
-#define TE_DO_DISCONNECT 0x8000
-
-enum TransporterError {
-  TE_NO_ERROR = 0,
-  /**
-   * TE_ERROR_CLOSING_SOCKET
-   *
-   *   Error found during closing of socket
-   *
-   * Recommended behavior: Ignore
-   */
-  TE_ERROR_CLOSING_SOCKET = 0x1,
-
-  /**
-   * TE_ERROR_IN_SELECT_BEFORE_ACCEPT
-   *
-   *   Error found during accept (just before)
-   *     The transporter will retry.
-   *
-   * Recommended behavior: Ignore
-   *   (or possible do setPerformState(PerformDisconnect)
-   */
-  TE_ERROR_IN_SELECT_BEFORE_ACCEPT = 0x2,
-
-  /**
-   * TE_INVALID_MESSAGE_LENGTH
-   *
-   *   Error found in message (message length)
-   *
-   * Recommended behavior: setPerformState(PerformDisconnect)
-   */
-  TE_INVALID_MESSAGE_LENGTH = 0x3 | TE_DO_DISCONNECT,
-
-  /**
-   * TE_INVALID_CHECKSUM
-   *
-   *   Error found in message (checksum)
-   *
-   * Recommended behavior: setPerformState(PerformDisonnect)
-   */
-  TE_INVALID_CHECKSUM = 0x4 | TE_DO_DISCONNECT,
-
-  /**
-   * TE_COULD_NOT_CREATE_SOCKET
-   *
-   *   Error found while creating socket
-   *
-   * Recommended behavior: setPerformState(PerformDisonnect)
-   */
-  TE_COULD_NOT_CREATE_SOCKET = 0x5,
-
-  /**
-   * TE_COULD_NOT_BIND_SOCKET
-   *
-   *   Error found while binding server socket
-   *
-   * Recommended behavior: setPerformState(PerformDisonnect)
-   */
-  TE_COULD_NOT_BIND_SOCKET = 0x6,
-
-  /**
-   * TE_LISTEN_FAILED
-   *
-   *   Error found while listening to server socket
-   *
-   * Recommended behavior: setPerformState(PerformDisonnect)
-   */
-  TE_LISTEN_FAILED = 0x7,
-
-  /**
-   * TE_ACCEPT_RETURN_ERROR
-   *
-   *   Error found during accept
-   *     The transporter will retry.
-   *
-   * Recommended behavior: Ignore
-   *   (or possible do setPerformState(PerformDisconnect)
-   */
-  TE_ACCEPT_RETURN_ERROR = 0x8
-
-  /**
-   * TE_SHM_DISCONNECT
-   *
-   *    The remote node has disconnected
-   *
-   * Recommended behavior: setPerformState(PerformDisonnect)
-   */
-  ,TE_SHM_DISCONNECT = 0xb | TE_DO_DISCONNECT
-
-  /**
-   * TE_SHM_IPC_STAT
-   *
-   *    Unable to check shm segment
-   *      probably because remote node
-   *      has disconnected and removed it
-   *
-   * Recommended behavior: setPerformState(PerformDisonnect)
-   */
-  ,TE_SHM_IPC_STAT = 0xc | TE_DO_DISCONNECT
-
-  /**
-   * Permanent error
-   */
-  ,TE_SHM_IPC_PERMANENT = 0x21
-
-  /**
-   * TE_SHM_UNABLE_TO_CREATE_SEGMENT
-   *
-   *    Unable to create shm segment
-   *      probably os something error
-   *
-   * Recommended behavior: setPerformState(PerformDisonnect)
-   */
-  ,TE_SHM_UNABLE_TO_CREATE_SEGMENT = 0xd
-
-  /**
-   * TE_SHM_UNABLE_TO_ATTACH_SEGMENT
-   *
-   *    Unable to attach shm segment
-   *      probably invalid group / user
-   *
-   * Recommended behavior: setPerformState(PerformDisonnect)
-   */
-  ,TE_SHM_UNABLE_TO_ATTACH_SEGMENT = 0xe
-
-  /**
-   * TE_SHM_UNABLE_TO_REMOVE_SEGMENT
-   *
-   *    Unable to remove shm segment
-   *
-   * Recommended behavior: Ignore (not much to do)
-   *                       Print warning to logfile
-   */
-  ,TE_SHM_UNABLE_TO_REMOVE_SEGMENT = 0xf
-
-  ,TE_TOO_SMALL_SIGID = 0x10
-  ,TE_TOO_LARGE_SIGID = 0x11
-  ,TE_WAIT_STACK_FULL = 0x12 | TE_DO_DISCONNECT
-  ,TE_RECEIVE_BUFFER_FULL = 0x13 | TE_DO_DISCONNECT
-
-  /**
-   * TE_SIGNAL_LOST_SEND_BUFFER_FULL
-   *
-   *   Send buffer is full, and trying to force send fails
-   *   a signal is dropped!! very bad very bad
-   *
-   */
-  ,TE_SIGNAL_LOST_SEND_BUFFER_FULL = 0x14 | TE_DO_DISCONNECT
-
-  /**
-   * TE_SIGNAL_LOST
-   *
-   *   Send failed for unknown reason
-   *   a signal is dropped!! very bad very bad
-   *
-   */
-  ,TE_SIGNAL_LOST = 0x15
-
-  /**
-   * TE_SEND_BUFFER_FULL
-   *
-   *   The send buffer was full, but sleeping for a while solved it
-   */
-  ,TE_SEND_BUFFER_FULL = 0x16
-
-  /**
-   * TE_SCI_UNABLE_TO_CLOSE_CHANNEL
-   *
-   *  Unable to close the sci channel and the resources allocated by
-   *  the sisci api.
-   */
-  ,TE_SCI_UNABLE_TO_CLOSE_CHANNEL = 0x22
-
-  /**
-   * TE_SCI_LINK_ERROR
-   *
-   *  There is no link from this node to the switch.
-   *  No point in continuing. Must check the connections.
-   * Recommended behavior: setPerformState(PerformDisonnect)
-   */
-  ,TE_SCI_LINK_ERROR = 0x0017
-
-  /**
-   * TE_SCI_UNABLE_TO_START_SEQUENCE
-   *
-   *  Could not start a sequence, because system resources
-   *  are exumed or no sequence has been created.
-   *  Recommended behavior: setPerformState(PerformDisonnect)
-   */
-  ,TE_SCI_UNABLE_TO_START_SEQUENCE = 0x18 | TE_DO_DISCONNECT
-
-  /**
-   * TE_SCI_UNABLE_TO_REMOVE_SEQUENCE
-   *
-   *  Could not remove a sequence
-   */
-  ,TE_SCI_UNABLE_TO_REMOVE_SEQUENCE = 0x19 | TE_DO_DISCONNECT
-
-  /**
-   * TE_SCI_UNABLE_TO_CREATE_SEQUENCE
-   *
-   *  Could not create a sequence, because system resources are
-   *  exempted. Must reboot.
-   *  Recommended behavior: setPerformState(PerformDisonnect)
-   */
-  ,TE_SCI_UNABLE_TO_CREATE_SEQUENCE = 0x1a | TE_DO_DISCONNECT
-
-  /**
-   * TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR
-   *
-   *  Tried to send data on redundant link but failed.
-   *  Recommended behavior: setPerformState(PerformDisonnect)
-   */
-  ,TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR = 0x1b | TE_DO_DISCONNECT
-
-  /**
-   * TE_SCI_CANNOT_INIT_LOCALSEGMENT
-   *
-   *  Cannot initialize local segment. A whole lot of things has
-   *  gone wrong (no system resources). Must reboot.
-   *  Recommended behavior: setPerformState(PerformDisonnect)
-   */
-  ,TE_SCI_CANNOT_INIT_LOCALSEGMENT = 0x1c | TE_DO_DISCONNECT
-
-  /**
-   * TE_SCI_CANNOT_MAP_REMOTESEGMENT
-   *
-   *  Cannot map remote segment. No system resources are left.
-   *  Must reboot system.
-   *  Recommended behavior: setPerformState(PerformDisonnect)
-   */
-  ,TE_SCI_CANNOT_MAP_REMOTESEGMENT = 0x1d | TE_DO_DISCONNECT
-
-  /**
-   * TE_SCI_UNABLE_TO_UNMAP_SEGMENT
-   *
-   *  Cannot free the resources used by this segment (step 1).
-   *  Recommended behavior: setPerformState(PerformDisonnect)
-   */
-  ,TE_SCI_UNABLE_TO_UNMAP_SEGMENT = 0x1e | TE_DO_DISCONNECT
-
-  /**
-   * TE_SCI_UNABLE_TO_REMOVE_SEGMENT
-   *
-   *  Cannot free the resources used by this segment (step 2).
-   *  Cannot guarantee that enough resources exist for NDB
-   *  to map more segment
-   *  Recommended behavior: setPerformState(PerformDisonnect)
-   */
-  ,TE_SCI_UNABLE_TO_REMOVE_SEGMENT = 0x1f  | TE_DO_DISCONNECT
-
-  /**
-   * TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT
-   *
-   *  Cannot disconnect from a remote segment.
-   *  Recommended behavior: setPerformState(PerformDisonnect)
-   */
-  ,TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT = 0x20 | TE_DO_DISCONNECT
-
-  /* Used 0x21 */
-  /* Used 0x22 */
-};
-
 /**
- * The TransporterCallback class encapsulates those aspects of the transporter
- * code that is specific to particular upper layer (NDB API, single-threaded
- * kernel, or multi-threaded kernel).
+ * The TransporterReceiveCallback class encapsulates
+ * the receive aspects of the transporter code that is
+ * specific to particular
+ * upper layer (NDB API, single-threaded kernel, or multi-threaded kernel).
  */
-class TransporterCallback {
+class TransporterReceiveHandle : public TransporterReceiveData {
 public:
   /**
    * This method is called to deliver a signal to the upper layer.
@@ -312,9 +50,7 @@ public:
   virtual void deliver_signal(SignalHeader * const header,
                               Uint8 prio,
                               Uint32 * const signalData,
-                              LinearSectionPtr ptr[3],
-                              Uint32 receiverThreadId,
-                              Uint32 receiverThreadNum) = 0;
+                              LinearSectionPtr ptr[3]) = 0;
 
   /**
    * This method is called regularly (currently after receive from each
@@ -328,16 +64,7 @@ public:
    *
    * The method should return non-zero if signals were execute, zero if not.
    */
-  virtual int checkJobBuffer(Uint32 recv_thread_id) = 0;
-
-  /**
-   * The transporter periodically calls this method, indicating the number
-   * of sends done to one NodeId, as well as total bytes sent.
-   *
-   * For multithreaded cases, this is only called while the send lock for the
-   * given node is held.
-   */
-  virtual void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes) = 0;
+  virtual int checkJobBuffer() = 0;
 
   /**
    * Same as reportSendLen(), but for received data.
@@ -381,6 +108,27 @@ public:
    */
   virtual void transporter_recv_from(NodeId node) = 0;
 
+  /**
+   *
+   */
+  virtual ~TransporterReceiveHandle() { };
+};
+
+/**
+ * The TransporterCallback class encapsulates those aspects of the transporter
+ * code that is specific to particular upper layer (NDB API, single-threaded
+ * kernel, or multi-threaded kernel).
+ */
+class TransporterCallback {
+public:
+  /**
+   * The transporter periodically calls this method, indicating the number
+   * of sends done to one NodeId, as well as total bytes sent.
+   *
+   * For multithreaded cases, this is only called while the send lock for the
+   * given node is held.
+   */
+  virtual void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes) = 0;
 
   /**
    * Locking (no-op in single-threaded VM).

=== modified file 'storage/ndb/include/transporter/TransporterDefinitions.hpp'
--- a/storage/ndb/include/transporter/TransporterDefinitions.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/transporter/TransporterDefinitions.hpp	revid:mikael.ronstrom@stripped
@@ -126,64 +126,269 @@ struct SignalHeader {	
   Uint8  m_fragmentInfo;
 }; /** 7x4 = 28 Bytes */
 
-struct LinearSectionPtr {
-  Uint32 sz;
-  Uint32 * p;
-};
-
-struct SegmentedSectionPtrPOD
-{
-  Uint32 sz;
-  Uint32 i;
-  struct SectionSegment * p;
-
-  void setNull() { p = 0;}
-  bool isNull() const { return p == 0;}
-  inline SegmentedSectionPtrPOD& assign(struct SegmentedSectionPtr&);
-};
+class NdbOut & operator <<(class NdbOut & out, SignalHeader & sh);
 
-struct SegmentedSectionPtr {
-  Uint32 sz;
-  Uint32 i;
-  struct SectionSegment * p;
-
-  SegmentedSectionPtr() {}
-  SegmentedSectionPtr(Uint32 sz_arg, Uint32 i_arg,
-                      struct SectionSegment *p_arg)
-    :sz(sz_arg), i(i_arg), p(p_arg)
-  {}
-  SegmentedSectionPtr(const SegmentedSectionPtrPOD & src)
-    :sz(src.sz), i(src.i), p(src.p)
-  {}
+#define TE_DO_DISCONNECT 0x8000
 
-  void setNull() { p = 0;}
-  bool isNull() const { return p == 0;}
-};
+enum TransporterError {
+  TE_NO_ERROR = 0,
+  /**
+   * TE_ERROR_CLOSING_SOCKET
+   *
+   *   Error found during closing of socket
+   *
+   * Recommended behavior: Ignore
+   */
+  TE_ERROR_CLOSING_SOCKET = 0x1,
+
+  /**
+   * TE_ERROR_IN_SELECT_BEFORE_ACCEPT
+   *
+   *   Error found during accept (just before)
+   *     The transporter will retry.
+   *
+   * Recommended behavior: Ignore
+   *   (or possible do setPerformState(PerformDisconnect)
+   */
+  TE_ERROR_IN_SELECT_BEFORE_ACCEPT = 0x2,
+
+  /**
+   * TE_INVALID_MESSAGE_LENGTH
+   *
+   *   Error found in message (message length)
+   *
+   * Recommended behavior: setPerformState(PerformDisconnect)
+   */
+  TE_INVALID_MESSAGE_LENGTH = 0x3 | TE_DO_DISCONNECT,
+
+  /**
+   * TE_INVALID_CHECKSUM
+   *
+   *   Error found in message (checksum)
+   *
+   * Recommended behavior: setPerformState(PerformDisonnect)
+   */
+  TE_INVALID_CHECKSUM = 0x4 | TE_DO_DISCONNECT,
+
+  /**
+   * TE_COULD_NOT_CREATE_SOCKET
+   *
+   *   Error found while creating socket
+   *
+   * Recommended behavior: setPerformState(PerformDisonnect)
+   */
+  TE_COULD_NOT_CREATE_SOCKET = 0x5,
+
+  /**
+   * TE_COULD_NOT_BIND_SOCKET
+   *
+   *   Error found while binding server socket
+   *
+   * Recommended behavior: setPerformState(PerformDisonnect)
+   */
+  TE_COULD_NOT_BIND_SOCKET = 0x6,
+
+  /**
+   * TE_LISTEN_FAILED
+   *
+   *   Error found while listening to server socket
+   *
+   * Recommended behavior: setPerformState(PerformDisonnect)
+   */
+  TE_LISTEN_FAILED = 0x7,
+
+  /**
+   * TE_ACCEPT_RETURN_ERROR
+   *
+   *   Error found during accept
+   *     The transporter will retry.
+   *
+   * Recommended behavior: Ignore
+   *   (or possible do setPerformState(PerformDisconnect)
+   */
+  TE_ACCEPT_RETURN_ERROR = 0x8
+
+  /**
+   * TE_SHM_DISCONNECT
+   *
+   *    The remote node has disconnected
+   *
+   * Recommended behavior: setPerformState(PerformDisonnect)
+   */
+  ,TE_SHM_DISCONNECT = 0xb | TE_DO_DISCONNECT
+
+  /**
+   * TE_SHM_IPC_STAT
+   *
+   *    Unable to check shm segment
+   *      probably because remote node
+   *      has disconnected and removed it
+   *
+   * Recommended behavior: setPerformState(PerformDisonnect)
+   */
+  ,TE_SHM_IPC_STAT = 0xc | TE_DO_DISCONNECT
+
+  /**
+   * Permanent error
+   */
+  ,TE_SHM_IPC_PERMANENT = 0x21
+
+  /**
+   * TE_SHM_UNABLE_TO_CREATE_SEGMENT
+   *
+   *    Unable to create shm segment
+   *      probably os something error
+   *
+   * Recommended behavior: setPerformState(PerformDisonnect)
+   */
+  ,TE_SHM_UNABLE_TO_CREATE_SEGMENT = 0xd
+
+  /**
+   * TE_SHM_UNABLE_TO_ATTACH_SEGMENT
+   *
+   *    Unable to attach shm segment
+   *      probably invalid group / user
+   *
+   * Recommended behavior: setPerformState(PerformDisonnect)
+   */
+  ,TE_SHM_UNABLE_TO_ATTACH_SEGMENT = 0xe
+
+  /**
+   * TE_SHM_UNABLE_TO_REMOVE_SEGMENT
+   *
+   *    Unable to remove shm segment
+   *
+   * Recommended behavior: Ignore (not much to do)
+   *                       Print warning to logfile
+   */
+  ,TE_SHM_UNABLE_TO_REMOVE_SEGMENT = 0xf
+
+  ,TE_TOO_SMALL_SIGID = 0x10
+  ,TE_TOO_LARGE_SIGID = 0x11
+  ,TE_WAIT_STACK_FULL = 0x12 | TE_DO_DISCONNECT
+  ,TE_RECEIVE_BUFFER_FULL = 0x13 | TE_DO_DISCONNECT
+
+  /**
+   * TE_SIGNAL_LOST_SEND_BUFFER_FULL
+   *
+   *   Send buffer is full, and trying to force send fails
+   *   a signal is dropped!! very bad very bad
+   *
+   */
+  ,TE_SIGNAL_LOST_SEND_BUFFER_FULL = 0x14 | TE_DO_DISCONNECT
+
+  /**
+   * TE_SIGNAL_LOST
+   *
+   *   Send failed for unknown reason
+   *   a signal is dropped!! very bad very bad
+   *
+   */
+  ,TE_SIGNAL_LOST = 0x15
+
+  /**
+   * TE_SEND_BUFFER_FULL
+   *
+   *   The send buffer was full, but sleeping for a while solved it
+   */
+  ,TE_SEND_BUFFER_FULL = 0x16
+
+  /**
+   * TE_SCI_UNABLE_TO_CLOSE_CHANNEL
+   *
+   *  Unable to close the sci channel and the resources allocated by
+   *  the sisci api.
+   */
+  ,TE_SCI_UNABLE_TO_CLOSE_CHANNEL = 0x22
+
+  /**
+   * TE_SCI_LINK_ERROR
+   *
+   *  There is no link from this node to the switch.
+   *  No point in continuing. Must check the connections.
+   * Recommended behavior: setPerformState(PerformDisonnect)
+   */
+  ,TE_SCI_LINK_ERROR = 0x0017
+
+  /**
+   * TE_SCI_UNABLE_TO_START_SEQUENCE
+   *
+   *  Could not start a sequence, because system resources
+   *  are exumed or no sequence has been created.
+   *  Recommended behavior: setPerformState(PerformDisonnect)
+   */
+  ,TE_SCI_UNABLE_TO_START_SEQUENCE = 0x18 | TE_DO_DISCONNECT
+
+  /**
+   * TE_SCI_UNABLE_TO_REMOVE_SEQUENCE
+   *
+   *  Could not remove a sequence
+   */
+  ,TE_SCI_UNABLE_TO_REMOVE_SEQUENCE = 0x19 | TE_DO_DISCONNECT
+
+  /**
+   * TE_SCI_UNABLE_TO_CREATE_SEQUENCE
+   *
+   *  Could not create a sequence, because system resources are
+   *  exempted. Must reboot.
+   *  Recommended behavior: setPerformState(PerformDisonnect)
+   */
+  ,TE_SCI_UNABLE_TO_CREATE_SEQUENCE = 0x1a | TE_DO_DISCONNECT
+
+  /**
+   * TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR
+   *
+   *  Tried to send data on redundant link but failed.
+   *  Recommended behavior: setPerformState(PerformDisonnect)
+   */
+  ,TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR = 0x1b | TE_DO_DISCONNECT
+
+  /**
+   * TE_SCI_CANNOT_INIT_LOCALSEGMENT
+   *
+   *  Cannot initialize local segment. A whole lot of things has
+   *  gone wrong (no system resources). Must reboot.
+   *  Recommended behavior: setPerformState(PerformDisonnect)
+   */
+  ,TE_SCI_CANNOT_INIT_LOCALSEGMENT = 0x1c | TE_DO_DISCONNECT
+
+  /**
+   * TE_SCI_CANNOT_MAP_REMOTESEGMENT
+   *
+   *  Cannot map remote segment. No system resources are left.
+   *  Must reboot system.
+   *  Recommended behavior: setPerformState(PerformDisonnect)
+   */
+  ,TE_SCI_CANNOT_MAP_REMOTESEGMENT = 0x1d | TE_DO_DISCONNECT
+
+  /**
+   * TE_SCI_UNABLE_TO_UNMAP_SEGMENT
+   *
+   *  Cannot free the resources used by this segment (step 1).
+   *  Recommended behavior: setPerformState(PerformDisonnect)
+   */
+  ,TE_SCI_UNABLE_TO_UNMAP_SEGMENT = 0x1e | TE_DO_DISCONNECT
+
+  /**
+   * TE_SCI_UNABLE_TO_REMOVE_SEGMENT
+   *
+   *  Cannot free the resources used by this segment (step 2).
+   *  Cannot guarantee that enough resources exist for NDB
+   *  to map more segment
+   *  Recommended behavior: setPerformState(PerformDisonnect)
+   */
+  ,TE_SCI_UNABLE_TO_REMOVE_SEGMENT = 0x1f  | TE_DO_DISCONNECT
+
+  /**
+   * TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT
+   *
+   *  Cannot disconnect from a remote segment.
+   *  Recommended behavior: setPerformState(PerformDisonnect)
+   */
+  ,TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT = 0x20 | TE_DO_DISCONNECT
 
-inline
-SegmentedSectionPtrPOD&
-SegmentedSectionPtrPOD::assign(struct SegmentedSectionPtr& src)
-{
-  this->i = src.i;
-  this->p = src.p;
-  this->sz = src.sz;
-  return *this;
-}
-
-/* Abstract interface for iterating over
- * words in a section
- */
-struct GenericSectionIterator {
-  virtual ~GenericSectionIterator() {};
-  virtual void reset()=0;
-  virtual const Uint32* getNextWords(Uint32& sz)=0;
+  /* Used 0x21 */
+  /* Used 0x22 */
 };
 
-struct GenericSectionPtr {
-  Uint32 sz;
-  GenericSectionIterator* sectionIter;
-};
-
-class NdbOut & operator <<(class NdbOut & out, SignalHeader & sh);
-
 #endif // Define of TransporterDefinitions_H

=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	revid:mikael.ronstrom@stripped
@@ -33,7 +33,6 @@
 #include <sys/epoll.h>
 #endif
 #include "TransporterDefinitions.hpp"
-#include "TransporterCallback.hpp"
 #include <SocketServer.hpp>
 #include <SocketClient.hpp>
 
@@ -85,6 +84,49 @@ public:
 };
 
 /**
+ * TransporterReceiveData
+ *
+ *   State for pollReceive/performReceive
+ *   Moved into own class to enable multi receive threads
+ */
+struct TransporterReceiveData
+{
+  TransporterReceiveData();
+  ~TransporterReceiveData();
+
+  bool init (unsigned maxTransporters);
+
+  /**
+   * Add a transporter to epoll_set
+   *   does nothing if epoll not active
+   */
+  bool epoll_add(TCP_Transporter*);
+
+  /**
+   * Bitmask of transporters currently handled by this instance
+   */
+  NodeBitmask m_transporters;
+
+  /**
+   * Bitmask of transporters that has data "carried over" since
+   *   last performReceive
+   */
+  NodeBitmask m_has_data_transporters;
+#if defined(HAVE_EPOLL_CREATE)
+  int m_epoll_fd;
+  struct epoll_event *m_epoll_events;
+  bool change_epoll(TCP_Transporter *t, bool add);
+#endif
+
+  /**
+   * Used in polling if exists TCP_Transporter
+   */
+  ndb_socket_poller m_socket_poller;
+};
+
+#include "TransporterCallback.hpp"
+
+/**
  * @class TransporterRegistry
  * @brief ...
  */
@@ -98,10 +140,9 @@ public:
   * Constructor
   */
   TransporterRegistry(TransporterCallback *callback,
+                      TransporterReceiveHandle * receiveHandle,
                       bool use_default_send_buffer = true,
-                      unsigned maxReceiveThreads = MAX_NDBMT_RECEIVE_THREADS,
-		      unsigned maxTransporters = MAX_NTRANSPORTERS, 
-		      unsigned sizeOfLongSignalMemory = 100);
+		      unsigned maxTransporters = MAX_NTRANSPORTERS);
 
   /**
    * this handle will be used in the client connect thread
@@ -114,6 +155,12 @@ public:
   bool init(NodeId localNodeId);
 
   /**
+   * Iff using non-default TransporterReceiveHandle's
+   *   they need to get initalized
+   */
+  bool init(TransporterReceiveHandle&);
+
+  /**
      Handle the handshaking with a new client connection
      on the server port.
      NOTE! Connection should be closed if function
@@ -155,29 +202,6 @@ public:
   struct NdbThread* start_clients();
   bool stop_clients();
   void start_clients_thread();
-  void update_connections(Uint32 receiver_thread_id = 0);
-
-  /**
-   * Assign the transporters to receiver threads
-   * Simple round-robin scheme, we could make this flexible in the future
-   * by synchronizing a change based on usage of the various transporters.
-   */
-  void assignRecvThreads(Uint32 num_receive_threads);
-  Uint32 getReceiveThreadId(Uint32 node);
-  void setReceiveThreadNum(Uint32 first_receiver_thread_num)
-  {
-    theFirstReceiverThreadNum = first_receiver_thread_num;
-  }
-  Uint32 getReceiveThreadNum(Uint32 node)
-  {
-    return getReceiveThreadId(node) + theFirstReceiverThreadNum;
-  }
-  void getReceiveThreadInfo(Uint32 node, Uint32 *id, Uint32 *num)
-  {
-    *id = getReceiveThreadId(node);
-    *num = *id + theFirstReceiverThreadNum;
-  }
-
 
   /**
    * Start/Stop receiving
@@ -211,8 +235,8 @@ public:
   void do_connect(NodeId node_id);
   void do_disconnect(NodeId node_id, int errnum = 0);
   bool is_connected(NodeId node_id) { return performStates[node_id] == CONNECTED; };
-  void report_connect(NodeId node_id);
-  void report_disconnect(NodeId node_id, int errnum);
+  void report_connect(TransporterReceiveHandle&, NodeId node_id);
+  void report_disconnect(TransporterReceiveHandle&, NodeId node_id, int errnum);
   void report_error(NodeId nodeId, TransporterError errorCode,
                     const char *errorInfo = 0);
   
@@ -328,20 +352,10 @@ public:
    *
    */
   void external_IO(Uint32 timeOutMillis);
-  
-  inline Uint32 pollReceive(Uint32 timeOutMillis,
-                            Uint32 receiver_thread_id = 0) {
-    return pollReceive(timeOutMillis,
-                       receiver_thread_id,
-                       m_has_data_transporters_ptr[receiver_thread_id]);
-  }
-  Uint32 pollReceive(Uint32 timeOutMillis,
-                     Uint32 receiver_thread_id,
-                     NodeBitmask* mask);
-  void performReceive(Uint32 receiver_thread_id = 0);
+
   int performSend(NodeId nodeId);
   void performSend();
-  
+
   /**
    * Force sending if more than or equal to sendLimit
    * number have asked for send. Returns 0 if not sending
@@ -353,13 +367,6 @@ public:
   void printState();
 #endif
 
-#ifdef ERROR_INSERT
-  /* Utils for testing latency issues */
-  bool isBlocked(NodeId nodeId);
-  void blockReceive(NodeId nodeId);
-  void unblockReceive(NodeId nodeId);
-#endif
-  
   class Transporter_interface {
   public:
     NodeId m_remote_nodeId;
@@ -371,14 +378,11 @@ public:
 		  		 int s_port);	// signed port. <0 is dynamic
   Transporter* get_transporter(NodeId nodeId);
   struct in_addr get_connect_address(NodeId node_id) const;
-  void registerBitmaskPtr(Uint32 recv_thread_id, NodeBitmask *mask)
-  {
-    m_has_data_transporters_ptr[recv_thread_id] = mask;
-  }
 protected:
   
 private:
   TransporterCallback *callbackObj;
+  TransporterReceiveHandle * receiveHandle;
 
   NdbMgmHandle m_mgm_handle;
 
@@ -401,17 +405,6 @@ private:
 #endif
 
   /**
-   * Bitmask of transporters that has data "carried over" since
-   *   last performReceive
-   */
-  NodeBitmask m_has_data_transporters;
-  NodeBitmask *m_has_data_transporters_ptr[MAX_NDBMT_RECEIVE_THREADS];
-#if defined(HAVE_EPOLL_CREATE)
-  int m_epoll_fd[MAX_NDBMT_RECEIVE_THREADS];
-  struct epoll_event *m_epoll_events[MAX_NDBMT_RECEIVE_THREADS];
-  bool change_epoll(TCP_Transporter *t, bool add);
-#endif
-  /**
    * Arrays holding all transporters in the order they are created
    */
   TCP_Transporter** theTCPTransporters;
@@ -440,21 +433,23 @@ private:
    * Overloaded bits, for fast check.
    */
   NodeBitmask m_status_overloaded;
- 
+
   /**
    * Unpack signal data.
    *
    * Defined in Packer.cpp.
    */
-  Uint32 unpack(Uint32 * readPtr,
-		Uint32 bufferSize,
-		NodeId remoteNodeId, 
-		IOState state);
-  
-  Uint32 * unpack(Uint32 * readPtr,
-		  Uint32 * eodPtr,
-		  NodeId remoteNodeId,
-		  IOState state);
+  Uint32 unpack(TransporterReceiveHandle&,
+                Uint32 * readPtr,
+                Uint32 bufferSize,
+                NodeId remoteNodeId,
+                IOState state);
+
+  Uint32 * unpack(TransporterReceiveHandle&,
+                  Uint32 * readPtr,
+                  Uint32 * eodPtr,
+                  NodeId remoteNodeId,
+                  IOState state);
 
   static Uint32 unpack_length_words(const Uint32 *readPtr, Uint32 maxWords);
   /** 
@@ -465,23 +460,21 @@ private:
    */
   void removeTransporter(NodeId nodeId);
 
-  /**
-   * Used in polling if exists TCP_Transporter
-   */
-  int tcpReadSelectReply;
-  Uint32 theFirstReceiverThreadNum;
-  ndb_socket_poller m_socket_poller[MAX_NDBMT_RECEIVE_THREADS];
-
-  Uint32 poll_TCP(Uint32 timeOutMillis, Uint32 recv_thread_id, NodeBitmask*);
-  Uint32 poll_SCI(Uint32 timeOutMillis, Uint32 recv_thread_id, NodeBitmask*);
-  Uint32 poll_SHM(Uint32 timeOutMillis, Uint32 recv_thread_id, NodeBitmask*);
+  Uint32 poll_TCP(Uint32 timeOutMillis, TransporterReceiveHandle&);
+  Uint32 poll_SCI(Uint32 timeOutMillis, TransporterReceiveHandle&);
+  Uint32 poll_SHM(Uint32 timeOutMillis, TransporterReceiveHandle&);
 
   int m_shm_own_pid;
   int m_transp_count;
 
 public:
-  bool setup_wakeup_socket();
+  bool setup_wakeup_socket(TransporterReceiveHandle&);
   void wakeup();
+
+  inline bool setup_wakeup_socket() {
+    assert(receiveHandle != 0);
+    return setup_wakeup_socket(* receiveHandle);
+  }
 private:
   bool m_has_extra_wakeup_socket;
   NDB_SOCKET_TYPE m_extra_wakeup_sockets[2];
@@ -562,6 +555,34 @@ public:
 
   void print_transporters(const char* where, NdbOut& out = ndbout);
 
+  /**
+   * Receiving
+   */
+  Uint32 pollReceive(Uint32 timeOutMillis, TransporterReceiveHandle& mask);
+  void performReceive(TransporterReceiveHandle&);
+  void update_connections(TransporterReceiveHandle&);
+
+  inline Uint32 pollReceive(Uint32 timeOutMillis) {
+    assert(receiveHandle != 0);
+    return pollReceive(timeOutMillis, * receiveHandle);
+  }
+
+  inline void performReceive() {
+    assert(receiveHandle != 0);
+    performReceive(* receiveHandle);
+  }
+
+  inline void update_connections() {
+    assert(receiveHandle != 0);
+    update_connections(* receiveHandle);
+  }
+
+#ifdef ERROR_INSERT
+  /* Utils for testing latency issues */
+  bool isBlocked(NodeId nodeId);
+  void blockReceive(TransporterReceiveHandle&, NodeId nodeId);
+  void unblockReceive(TransporterReceiveHandle&, NodeId nodeId);
+#endif
 };
 
 inline void

=== removed file 'storage/ndb/include/util/Checksum.hpp'
--- a/storage/ndb/include/util/Checksum.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/include/util/Checksum.hpp	1970-01-01 00:00:00 +0000
@@ -1,126 +0,0 @@
-/*
-   Copyright (c) 2012, Oracle and/or its affiliates. All rights reserved.
-
-   This program is free software; you can redistribute it and/or modify
-   it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; version 2 of the License.
-
-   This program is distributed in the hope that it will be useful,
-   but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-   GNU General Public License for more details.
-
-   You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
-*/
-
-#ifndef CHECKSUM_HPP
-#define CHECKSUM_HPP
-
-
-/**
-  Optimized XOR checksum calculation. Loop unrolling will 
-  reduce relative loop overhead and encourace usage of parallel
-  arithmetic adders which are common on most modern CPUs.
-*/
-inline
-Uint32
-computeXorChecksumShort(const Uint32 *buf, Uint32 words, Uint32 sum = 0)
-{
-  const Uint32 *end_unroll = buf + (words & ~3);
-  const Uint32 *end        = buf + words;
-
-  /**
-   * Aggregate as chunks of 4*Uint32 words:
-   * Take care if rewriting this part, code has intentionally
-   * been unrolled in order to take advantage of HW parallelism
-   * where there are multiple adders in the CPU core.
-   */
-  while (buf < end_unroll)
-  {
-    sum ^= buf[0] ^ buf[1] ^ buf[2] ^ buf[3];
-    buf += 4;
-  }
-  // Wrap up remaining part
-  while (buf < end)
-  {
-    sum ^= buf[0];
-    buf++;
-  }
-  return sum;
-}
-
-/**
-  Optimized XOR checksum calculation intended for longer strings.
-  Temporary aggregate XOR-sums into Uint64 which are folded into
-  Uint32 in the final stage.
-  Also unrool loop as above to take advantage of HW parallelism.
-  Callee is responsible for checking that there are sufficient 'words'
-  to be checksumed to complete at least a chunk of 4*Uint64 words.
-*/
-inline
-Uint32
-computeXorChecksumLong(const Uint32 *buf, Uint32 words, Uint32 sum = 0)
-{
-  unsigned int i = 0;
-
-  // Align to Uint64 boundary to optimize mem. access below
-  if (((size_t)(buf) % 8) != 0)
-  {
-    sum ^= buf[0];
-    buf++;
-    words--;
-  }
-
-  const Uint64 *p = reinterpret_cast<const Uint64*>(buf);
-  Uint64 sum64 = *p++;
-
-  const Uint32 words64 = (words/2) - 1;  // Rem. after init of sum64
-  const Uint64 *end = p + (words64 & ~3);
-
-  /**
-   * Aggregate as chunks of 4*Uint64 words:
-   * Take care if rewriting this part: code has intentionally
-   * been unrolled in order to take advantage of HW parallelism
-   * where there are multiple adders in the CPU core.
-   */
-  do
-  {
-    sum64 ^= p[0] ^ p[1] ^ p[2] ^ p[3];
-    p+=4;
-  } while (p < end);
-
-  // Wrap up last part which didn't fit in a 4*Uint64 chunk
-  end += (words64 % 4);
-  while (p < end)
-  {
-    sum64 ^= p[0];
-    p++;
-  }
-
-  // Fold temp Uint64 sum into a final Uint32 sum
-  sum ^= (Uint32)(sum64 & 0xffffffff) ^ 
-         (Uint32)(sum64 >> 32);
-  
-  // Append last odd Uint32 word
-  if ((words%2) != 0)
-    sum ^= buf[words-1];
-
-  return sum;
-}
-
-
-inline
-Uint32
-computeXorChecksum(const Uint32 *buf, Uint32 words, Uint32 sum = 0)
-{
-  if (words < 16)  // Decided by empirical experiments 
-    return computeXorChecksumShort(buf,words,sum);
-  else
-    return computeXorChecksumLong(buf,words,sum);
-}
-
-
-#endif // CHECKSUM_HPP
-

=== modified file 'storage/ndb/memcache/src/schedulers/S_sched.cc'
--- a/storage/ndb/memcache/src/schedulers/S_sched.cc	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/memcache/src/schedulers/S_sched.cc	revid:mikael.ronstrom@stripped
@@ -919,6 +919,8 @@ void * S::Connection::run_ndb_poll_threa
       }
     }
   }
+  return 0; /* not reached */
+  return 0; /* not reached */
 }
 
 

=== modified file 'storage/ndb/memcache/unit/test_workqueue.c'
--- a/storage/ndb/memcache/unit/test_workqueue.c	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/memcache/unit/test_workqueue.c	revid:mikael.ronstrom@stripped
@@ -17,6 +17,12 @@
  Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
  02110-1301  USA
  */
+
+#ifdef __GNUC__
+/* Required for useconds_t */
+#define _XOPEN_SOURCE 500
+#endif
+
 #include <unistd.h>
 #include <stdlib.h>
 #include <stdio.h>

=== modified file 'storage/ndb/src/common/debugger/EventLogger.cpp'
--- a/storage/ndb/src/common/debugger/EventLogger.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/debugger/EventLogger.cpp	revid:mikael.ronstrom@stripped
@@ -18,7 +18,7 @@
 #include <ndb_global.h>
 
 #include <EventLogger.hpp>
-#include <TransporterCallback.hpp>
+#include <TransporterDefinitions.hpp>
 
 #include <NdbConfig.h>
 #include <kernel/BlockNumbers.h>

=== modified file 'storage/ndb/src/common/debugger/SignalLoggerManager.cpp'
--- a/storage/ndb/src/common/debugger/SignalLoggerManager.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/debugger/SignalLoggerManager.cpp	revid:mikael.ronstrom@stripped
@@ -18,7 +18,7 @@
 #include <ndb_global.h>
 
 #include "SignalLoggerManager.hpp"
-#include <LongSignal.hpp>
+#include "TransporterDefinitions.hpp"
 #include <GlobalSignalNumbers.h>
 #include <DebuggerNames.hpp>
 #include <NdbTick.h>

=== modified file 'storage/ndb/src/common/debugger/signaldata/SignalNames.cpp'
--- a/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/debugger/signaldata/SignalNames.cpp	revid:mikael.ronstrom@stripped
@@ -259,9 +259,9 @@ const GsnName SignalNames [] = {
   ,{ GSN_NEXT_SCANREF,           "NEXT_SCANREF" }
   ,{ GSN_NEXT_SCANREQ,           "NEXT_SCANREQ" }
   ,{ GSN_NF_COMPLETEREP,         "NF_COMPLETEREP" }
-  ,{ GSN_OPEN_COMCONF,           "OPEN_COMCONF" }
-  ,{ GSN_OPEN_COMREF,            "OPEN_COMREF" }
-  ,{ GSN_OPEN_COMREQ,            "OPEN_COMREQ" }
+//  ,{ GSN_OPEN_COMCONF,           "OPEN_COMCONF" }
+//  ,{ GSN_OPEN_COMREF,            "OPEN_COMREF" }
+  ,{ GSN_OPEN_COMORD,            "OPEN_COMORD" }
   ,{ GSN_PACKED_SIGNAL,          "PACKED_SIGNAL" }
   ,{ GSN_PREP_FAILCONF,          "PREP_FAILCONF" }
   ,{ GSN_PREP_FAILREF,           "PREP_FAILREF" }

=== modified file 'storage/ndb/src/common/transporter/Packer.cpp'
--- a/storage/ndb/src/common/transporter/Packer.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/Packer.cpp	revid:mikael.ronstrom@stripped
@@ -29,18 +29,16 @@ Uint32 MAX_RECEIVED_SIGNALS = 1024;
 #endif
 
 Uint32
-TransporterRegistry::unpack(Uint32 * readPtr,
-			    Uint32 sizeOfData,
-			    NodeId remoteNodeId,
-			    IOState state) {
+TransporterRegistry::unpack(TransporterReceiveHandle & recvHandle,
+                            Uint32 * readPtr,
+                            Uint32 sizeOfData,
+                            NodeId remoteNodeId,
+                            IOState state) {
   SignalHeader signalHeader;
   LinearSectionPtr ptr[3];
   
   Uint32 usedData   = 0;
-  Uint32 loop_count = 0;
-  Uint32 receiverThreadNum;
-  Uint32 receiverThreadId;
-  getReceiveThreadInfo(remoteNodeId, &receiverThreadId, &receiverThreadNum);
+  Uint32 loop_count = 0; 
  
   if(state == NoHalt || state == HaltOutput){
     while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
@@ -115,8 +113,7 @@ TransporterRegistry::unpack(Uint32 * rea
 	sectionData += sz;
       }
 
-      callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr,
-                                  receiverThreadId, receiverThreadNum);
+      recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
       
       readPtr     += messageLen32;
       sizeOfData  -= messageLenBytes;
@@ -202,8 +199,7 @@ TransporterRegistry::unpack(Uint32 * rea
 	  sectionData += sz;
 	}
 
-	callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr,
-                                    receiverThreadId, receiverThreadNum);
+	recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
       } else {
 	DEBUG("prepareReceive(...) - Discarding message to block: "
 	      << rBlockNum << " from Node: " << remoteNodeId);
@@ -220,17 +216,14 @@ TransporterRegistry::unpack(Uint32 * rea
 }
 
 Uint32 *
-TransporterRegistry::unpack(Uint32 * readPtr,
-			    Uint32 * eodPtr,
-			    NodeId remoteNodeId,
-			    IOState state) {
+TransporterRegistry::unpack(TransporterReceiveHandle & recvHandle,
+                            Uint32 * readPtr,
+                            Uint32 * eodPtr,
+                            NodeId remoteNodeId,
+                            IOState state) {
   SignalHeader signalHeader;
   LinearSectionPtr ptr[3];
   Uint32 loop_count = 0;
-  Uint32 receiverThreadNum;
-  Uint32 receiverThreadId;
-  getReceiveThreadInfo(remoteNodeId, &receiverThreadId, &receiverThreadNum);
-
   if(state == NoHalt || state == HaltOutput){
     while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
       Uint32 word1 = readPtr[0];
@@ -298,8 +291,7 @@ TransporterRegistry::unpack(Uint32 * rea
 	sectionData += sz;
       }
       
-      callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr,
-                                  receiverThreadId, receiverThreadNum);
+      recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
       
       readPtr += messageLen32;
     }//while
@@ -376,8 +368,7 @@ TransporterRegistry::unpack(Uint32 * rea
 	  sectionData += sz;
 	}
 
-	callbackObj->deliver_signal(&signalHeader, prio, signalData, ptr,
-                                    receiverThreadId, receiverThreadNum);
+	recvHandle.deliver_signal(&signalHeader, prio, signalData, ptr);
       } else {
 	DEBUG("prepareReceive(...) - Discarding message to block: "
 	      << rBlockNum << " from Node: " << remoteNodeId);

=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp	revid:mikael.ronstrom@stripped
@@ -386,7 +386,8 @@ ok:
 }
 
 int
-TCP_Transporter::doReceive() {
+TCP_Transporter::doReceive(TransporterReceiveHandle& recvdata)
+{
   // Select-function must return the socket for read
   // before this method is called
   // It reads the external TCP/IP interface once
@@ -417,8 +418,8 @@ TCP_Transporter::doReceive() {
       receiveSize  += nBytesRead;
       
       if(receiveCount == reportFreq){
-	get_callback_obj()->reportReceiveLen(remoteNodeId,
-                                             receiveCount, receiveSize);
+        recvdata.reportReceiveLen(remoteNodeId,
+                                  receiveCount, receiveSize);
 	receiveCount = 0;
 	receiveSize  = 0;
       }

=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.hpp	revid:mikael.ronstrom@stripped
@@ -43,6 +43,7 @@ struct ReceiveBuffer {
 };
 
 class TCP_Transporter : public Transporter {
+  friend struct TransporterReceiveData;
   friend class TransporterRegistry;
   friend class Loopback_Transporter;
 private:
@@ -69,7 +70,7 @@ private:
    * It reads the external TCP/IP interface once 
    * and puts the data in the receiveBuffer
    */
-  int doReceive(); 
+  int doReceive(TransporterReceiveHandle&);
 
   /**
    * Returns socket (used for select)

=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/Transporter.cpp	revid:mikael.ronstrom@stripped
@@ -40,8 +40,7 @@ Transporter::Transporter(TransporterRegi
 			 int _byteorder, 
 			 bool _compression, bool _checksum, bool _signalId,
                          Uint32 max_send_buffer)
-  : m_s_port(s_port), m_receiver_thread_id(0),
-    remoteNodeId(rNodeId), localNodeId(lNodeId),
+  : m_s_port(s_port), remoteNodeId(rNodeId), localNodeId(lNodeId),
     isServer(lNodeId==serverNodeId),
     m_packer(_signalId, _checksum), m_max_send_buffer(max_send_buffer),
     m_overload_limit(0xFFFFFFFF), isMgmConnection(_isMgmConnection),

=== modified file 'storage/ndb/src/common/transporter/Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Transporter.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/Transporter.hpp	revid:mikael.ronstrom@stripped
@@ -92,16 +92,6 @@ public:
                                                  used >= m_overload_limit);
   }
 
-  Uint32 receiver_thread_id()
-  {
-    return m_receiver_thread_id;
-  }
-
-  void set_receiver_thread_id(Uint32 receiver_thread_id)
-  {
-    m_receiver_thread_id = receiver_thread_id;
-  }
-
   virtual int doSend() = 0;
 
   bool has_data_to_send()
@@ -154,8 +144,6 @@ protected:
 
   int m_s_port;
 
-  Uint32 m_receiver_thread_id;
-
   const NodeId remoteNodeId;
   const NodeId localNodeId;
   

=== modified file 'storage/ndb/src/common/transporter/TransporterInternalDefinitions.hpp'
--- a/storage/ndb/src/common/transporter/TransporterInternalDefinitions.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/TransporterInternalDefinitions.hpp	revid:mikael.ronstrom@stripped
@@ -19,8 +19,6 @@
 #ifndef TransporterInternalDefinitions_H
 #define TransporterInternalDefinitions_H
 
-#include <Checksum.hpp>
-
 #if defined DEBUG_TRANSPORTER || defined VM_TRACE
 #include <NdbOut.hpp>
 #endif
@@ -51,7 +49,10 @@
 inline
 Uint32
 computeChecksum(const Uint32 * const startOfData, int nWords) {
-  return computeXorChecksum(startOfData+1, nWords-1, startOfData[0]);
+  Uint32 chksum = startOfData[0];
+  for (int i=1; i < nWords; i++)
+    chksum ^= startOfData[i];
+  return chksum;
 }
 
 struct Protocol6 {

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	revid:mikael.ronstrom@stripped
@@ -74,11 +74,128 @@ SocketServer::Session * TransporterServi
   DBUG_RETURN(0);
 }
 
+TransporterReceiveData::TransporterReceiveData()
+{
+  /**
+   * With multi receiver threads
+   *   an interface to reassign these is needed...
+   */
+  m_transporters.set();            // Handle all
+  m_transporters.clear(Uint32(0)); // Except wakeup socket...
+
+#if defined(HAVE_EPOLL_CREATE)
+  m_epoll_fd = -1;
+  m_epoll_events = 0;
+#endif
+}
+
+bool
+TransporterReceiveData::init(unsigned maxTransporters)
+{
+  maxTransporters += 1; /* wakeup socket */
+#if defined(HAVE_EPOLL_CREATE)
+  m_epoll_fd = epoll_create(maxTransporters);
+  if (m_epoll_fd == -1)
+  {
+    perror("epoll_create failed... falling back to select!");
+    goto fallback;
+  }
+  m_epoll_events = new struct epoll_event[maxTransporters];
+  if (m_epoll_events == 0)
+  {
+    perror("Failed to alloc epoll-array... falling back to select!");
+    close(m_epoll_fd);
+    m_epoll_fd = -1;
+    goto fallback;
+  }
+  bzero(m_epoll_events, maxTransporters * sizeof(struct epoll_event));
+  return true;
+fallback:
+#endif
+  return m_socket_poller.set_max_count(maxTransporters);
+}
+
+bool
+TransporterReceiveData::epoll_add(TCP_Transporter *t)
+{
+  assert(m_transporters.get(t->getRemoteNodeId()));
+#if defined(HAVE_EPOLL_CREATE)
+  if (m_epoll_fd != -1)
+  {
+    bool add = true;
+    struct epoll_event event_poll;
+    bzero(&event_poll, sizeof(event_poll));
+    NDB_SOCKET_TYPE sock_fd = t->getSocket();
+    int node_id = t->getRemoteNodeId();
+    int op = EPOLL_CTL_ADD;
+    int ret_val, error;
+
+    if (!my_socket_valid(sock_fd))
+      return FALSE;
+
+    event_poll.data.u32 = t->getRemoteNodeId();
+    event_poll.events = EPOLLIN;
+    ret_val = epoll_ctl(m_epoll_fd, op, sock_fd.fd, &event_poll);
+    if (!ret_val)
+      goto ok;
+    error= errno;
+    if (error == ENOENT && !add)
+    {
+      /*
+       * Could be that socket was closed premature to this call.
+       * Not a problem that this occurs.
+       */
+      goto ok;
+    }
+    if (!add || (add && (error != ENOMEM)))
+    {
+      /*
+       * Serious problems, we are either using wrong parameters,
+       * have permission problems or the socket doesn't support
+       * epoll!!
+       */
+      ndbout_c("Failed to %s epollfd: %u fd " MY_SOCKET_FORMAT
+               " node %u to epoll-set,"
+               " errno: %u %s",
+               add ? "ADD" : "DEL",
+               m_epoll_fd,
+               MY_SOCKET_FORMAT_VALUE(sock_fd),
+               node_id,
+               error,
+               strerror(error));
+      abort();
+    }
+    ndbout << "We lacked memory to add the socket for node id ";
+    ndbout << node_id << endl;
+    return false;
+  }
+
+ok:
+#endif
+  return true;
+}
+
+TransporterReceiveData::~TransporterReceiveData()
+{
+#if defined(HAVE_EPOLL_CREATE)
+  if (m_epoll_fd != -1)
+  {
+    close(m_epoll_fd);
+    m_epoll_fd = -1;
+  }
+
+  if (m_epoll_events)
+  {
+    delete [] m_epoll_events;
+    m_epoll_events = 0;
+  }
+#endif
+}
+
 TransporterRegistry::TransporterRegistry(TransporterCallback *callback,
+                                         TransporterReceiveHandle * recvHandle,
                                          bool use_default_send_buffer,
-                                         unsigned maxReceiveThreads,
-					 unsigned _maxTransporters,
-					 unsigned sizeOfLongSignalMemory) :
+                                         unsigned _maxTransporters) :
   m_mgm_handle(0),
   localNodeId(0),
   m_transp_count(0),
@@ -86,9 +203,9 @@ TransporterRegistry::TransporterRegistry
   m_send_buffers(0), m_page_freelist(0), m_send_buffer_memory(0),
   m_total_max_send_buffer(0)
 {
-  Uint32 i;
   DBUG_ENTER("TransporterRegistry::TransporterRegistry");
 
+  receiveHandle = recvHandle;
   maxTransporters = _maxTransporters;
   sendCounter = 1;
   
@@ -103,47 +220,9 @@ TransporterRegistry::TransporterRegistry
   ioStates            = new IOState           [maxTransporters]; 
   m_disconnect_errnum = new int               [maxTransporters];
   m_error_states      = new ErrorState        [maxTransporters];
- 
+
   m_has_extra_wakeup_socket = false;
-  m_has_data_transporters_ptr[0] = &m_has_data_transporters;
-#if defined(HAVE_EPOLL_CREATE)
- for (Uint32 recv_thread_id = 0;
-      recv_thread_id < maxReceiveThreads;
-      recv_thread_id++)
- {
-   m_epoll_fd[recv_thread_id] = -1;
-   m_epoll_events[recv_thread_id]  = new struct epoll_event[maxTransporters];
-   m_epoll_fd[recv_thread_id] = epoll_create(maxTransporters);
-   if (m_epoll_fd[recv_thread_id] == -1 || !m_epoll_events[recv_thread_id])
-   {
-     /* Failure to allocate data or get epoll socket, abort */
-     perror("Failed to alloc epoll-array or calling epoll_create... falling back to select!");
-     for (i = 0; i <= recv_thread_id; i++)
-     {
-       if (m_epoll_fd[recv_thread_id] != -1)
-       {
-         close(m_epoll_fd[i]);
-         m_epoll_fd[i] = -1;
-       }
-       if (m_epoll_events[i])
-       {
-         delete [] m_epoll_events[i];
-         m_epoll_events[i] = 0;
-       }
-     }
-   }
-   else
-   {
-     memset((char*)m_epoll_events[recv_thread_id], 0,
-            maxTransporters * sizeof(struct epoll_event));
-   }
- }
- for (i = maxReceiveThreads; i < MAX_NDBMT_RECEIVE_THREADS; i++)
- {
-   m_epoll_events[i] = NULL;
-   m_epoll_fd[i] = (int)-1;
- }
-#endif
+
 #ifdef ERROR_INSERT
   m_blocked.clear();
   m_blocked_with_data.clear();
@@ -271,13 +350,6 @@ TransporterRegistry::~TransporterRegistr
   if (m_send_buffer_memory)
     delete[] m_send_buffer_memory;
 
-#if defined(HAVE_EPOLL_CREATE)
-  for (Uint32 i = 0; i < MAX_NDBMT_RECEIVE_THREADS; i++)
-  {
-    if (m_epoll_events[i]) delete [] m_epoll_events[i];
-    if (m_epoll_fd[i] != -1) close(m_epoll_fd[i]);
-  }
-#endif
   if (m_mgm_handle)
     ndb_mgm_destroy_handle(&m_mgm_handle);
 
@@ -316,16 +388,22 @@ TransporterRegistry::init(NodeId nodeId)
 
   DEBUG("TransporterRegistry started node: " << localNodeId);
 
-  for (Uint32 i = 0; i < MAX_NDBMT_RECEIVE_THREADS; i++)
+  if (receiveHandle)
   {
-    if (!m_socket_poller[i].set_max_count(maxTransporters +
-                                          1 /* wakeup socket */))
+    if (!init(* receiveHandle))
       DBUG_RETURN(false);
   }
+
   DBUG_RETURN(true);
 }
 
 bool
+TransporterRegistry::init(TransporterReceiveHandle& recvhandle)
+{
+  return recvhandle.init(maxTransporters);
+}
+
+bool
 TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd,
                                     BaseString & msg) const
 {
@@ -427,25 +505,10 @@ TransporterRegistry::connect_server(NDB_
     // false to close the connection
     DBUG_RETURN(false);
   }
+
   DBUG_RETURN(res);
 }
 
-void TransporterRegistry::assignRecvThreads(Uint32 num_receive_threads)
-{
-  Uint32 recv_thread_id= 0;
-  if (!num_receive_threads)
-    num_receive_threads = 1; //0 receive threads => 1 receive thread
-  for (Uint32 i = 0; i < maxTransporters; i++)
-  {
-    Transporter * t = theTransporters[i];
-    if (!t)
-      continue;
-    t->set_receiver_thread_id(recv_thread_id);
-    recv_thread_id++;
-    if (recv_thread_id == num_receive_threads)
-      recv_thread_id = 0;
-  }
-}
 
 bool
 TransporterRegistry::configureTransporter(TransporterConfiguration *config)
@@ -699,7 +762,8 @@ TransporterRegistry::prepareSend(Transpo
 	  return SEND_OK;
 	}
 
-	int sleepTime = 2;	
+        set_status_overloaded(nodeId, true);
+        int sleepTime = 2;
 
 	/**
 	 * @note: on linux/i386 the granularity is 10ms
@@ -782,12 +846,12 @@ TransporterRegistry::prepareSend(Transpo
 	  return SEND_OK;
 	}
 	
-	
 	/**
 	 * @note: on linux/i386 the granularity is 10ms
 	 *        so sleepTime = 2 generates a 10 ms sleep.
 	 */
-	int sleepTime = 2;
+        set_status_overloaded(nodeId, true);
+        int sleepTime = 2;
 	for(int i = 0; i<50; i++){
 	  if((nSHMTransporters+nSCITransporters) == 0)
 	    NdbSleep_MilliSleep(sleepTime); 
@@ -865,12 +929,12 @@ TransporterRegistry::prepareSend(Transpo
           return SEND_OK;
 	}
 
-
 	/**
 	 * @note: on linux/i386 the granularity is 10ms
 	 *        so sleepTime = 2 generates a 10 ms sleep.
 	 */
-        int sleepTime = 2;	
+        set_status_overloaded(nodeId, true);
+        int sleepTime = 2;
 	for(int i = 0; i<50; i++){
 	  if((nSHMTransporters+nSCITransporters) == 0)
 	    NdbSleep_MilliSleep(sleepTime); 
@@ -920,20 +984,24 @@ TransporterRegistry::external_IO(Uint32 
   // followed by the receive part where we expect to sleep for
   // a while.
   //-----------------------------------------------------------
-  if(pollReceive(timeOutMillis)){
-    performReceive();
+  if(pollReceive(timeOutMillis, * receiveHandle)){
+    performReceive(* receiveHandle);
   }
   performSend();
 }
 
 bool
-TransporterRegistry::setup_wakeup_socket()
+TransporterRegistry::setup_wakeup_socket(TransporterReceiveHandle& recvdata)
 {
+  assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
   if (m_has_extra_wakeup_socket)
   {
     return true;
   }
 
+  assert(!recvdata.m_transporters.get(0));
+
   if (my_socketpair(m_extra_wakeup_sockets))
   {
     perror("socketpair failed!");
@@ -947,15 +1015,15 @@ TransporterRegistry::setup_wakeup_socket
   }
 
 #if defined(HAVE_EPOLL_CREATE)
-  if (m_epoll_fd[0] != -1)
+  if (recvdata.m_epoll_fd != -1)
   {
     int sock = m_extra_wakeup_sockets[0].fd;
     struct epoll_event event_poll;
     bzero(&event_poll, sizeof(event_poll));
     event_poll.data.u32 = 0;
     event_poll.events = EPOLLIN;
-    /* Always use receiver thread id 0 for extra socket */
-    int ret_val = epoll_ctl(m_epoll_fd[0], EPOLL_CTL_ADD, sock, &event_poll);
+    int ret_val = epoll_ctl(recvdata.m_epoll_fd, EPOLL_CTL_ADD, sock,
+                            &event_poll);
     if (ret_val != 0)
     {
       int error= errno;
@@ -967,6 +1035,7 @@ TransporterRegistry::setup_wakeup_socket
   }
 #endif
   m_has_extra_wakeup_socket = true;
+  recvdata.m_transporters.set(Uint32(0));
   return true;
 
 err:
@@ -987,28 +1056,19 @@ TransporterRegistry::wakeup()
   }
 }
 
-Uint32 TransporterRegistry::getReceiveThreadId(Uint32 node)
-{
-  /* Always use receive thread id 0 for own node and not used
-     node ids */
-  if (theTransporters[node])
-    return theTransporters[node]->receiver_thread_id();
-  else
-    return 0; /* Returns an impossible thread id simply */
-}
-
 Uint32
 TransporterRegistry::pollReceive(Uint32 timeOutMillis,
-                                 Uint32 recv_thread_id,
-                                 NodeBitmask* mask)
+                                 TransporterReceiveHandle& recvdata)
 {
+  assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
   Uint32 retVal = 0;
 
   /**
    * If any transporters have left-over data that was not fully executed in
    * last loop, don't wait and return 'data available' even if nothing new
    */
-  if (!mask->isclear())
+  if (!recvdata.m_has_data_transporters.isclear())
   {
     timeOutMillis = 0;
     retVal = 1;
@@ -1022,7 +1082,7 @@ TransporterRegistry::pollReceive(Uint32 
 #ifdef NDB_SHM_TRANSPORTER
   if (nSHMTransporters > 0)
   {
-    Uint32 res = poll_SHM(0, recv_thread_id, mask);
+    Uint32 res = poll_SHM(0, recvdata);
     if(res)
     {
       retVal |= res;
@@ -1033,16 +1093,16 @@ TransporterRegistry::pollReceive(Uint32 
 
 #ifdef NDB_TCP_TRANSPORTER
 #if defined(HAVE_EPOLL_CREATE)
-  if (likely(m_epoll_fd[recv_thread_id] != -1))
+  if (likely(recvdata.m_epoll_fd != -1))
   {
+    int tcpReadSelectReply = 0;
     Uint32 num_trps = nTCPTransporters + (m_has_extra_wakeup_socket ? 1 : 0);
-    
+
     if (num_trps)
     {
-      tcpReadSelectReply = epoll_wait(m_epoll_fd[recv_thread_id],
-                                      m_epoll_events[recv_thread_id],
-                                      num_trps,
-                                      timeOutMillis);
+      tcpReadSelectReply = epoll_wait(recvdata.m_epoll_fd,
+                                      recvdata.m_epoll_events,
+                                      num_trps, timeOutMillis);
       retVal |= tcpReadSelectReply;
     }
 
@@ -1051,7 +1111,7 @@ TransporterRegistry::pollReceive(Uint32 
     {
       for (int i = 0; i < num_socket_events; i++)
       {
-        const Uint32 trpid = m_epoll_events[recv_thread_id][i].data.u32;
+        const Uint32 trpid = recvdata.m_epoll_events[i].data.u32;
 #ifdef ERROR_INSERT
         if (m_blocked.get(trpid))
         {
@@ -1060,7 +1120,12 @@ TransporterRegistry::pollReceive(Uint32 
           continue;
         }
 #endif
-        mask->set(trpid);
+        /**
+         * check that it's assigned to "us"
+         */
+        assert(recvdata.m_transporters.get(trpid));
+
+        recvdata.m_has_data_transporters.set(trpid);
       }
     }
     else if (num_socket_events < 0)
@@ -1073,20 +1138,18 @@ TransporterRegistry::pollReceive(Uint32 
   {
     if (nTCPTransporters > 0 || m_has_extra_wakeup_socket)
     {
-      retVal |= poll_TCP(timeOutMillis, recv_thread_id, mask);
+      retVal |= poll_TCP(timeOutMillis, recvdata);
     }
-    else
-      tcpReadSelectReply = 0;
   }
 #endif
 #ifdef NDB_SCI_TRANSPORTER
   if (nSCITransporters > 0)
-    retVal |= poll_SCI(timeOutMillis, recv_thread_id, mask);
+    retVal |= poll_SCI(timeOutMillis, recvdata);
 #endif
 #ifdef NDB_SHM_TRANSPORTER
   if (nSHMTransporters > 0)
   {
-    int res = poll_SHM(0, recv_thread_id, mask);
+    int res = poll_SHM(0, recvdata);
     retVal |= res;
   }
 #endif
@@ -1097,21 +1160,24 @@ TransporterRegistry::pollReceive(Uint32 
 #ifdef NDB_SCI_TRANSPORTER
 Uint32
 TransporterRegistry::poll_SCI(Uint32 timeOutMillis,
-                              Uint32 recv_thread_id,
-                              NodeBitmask* mask)
+                              TransporterReceiveHandle& recvdata)
 {
+  assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
   Uint32 retVal = 0;
   for (int i = 0; i < nSCITransporters; i++)
   {
     SCI_Transporter * t = theSCITransporters[i];
-    if (t->receiver_thread_id() != recv_thread_id)
-      continue;
     Uint32 node_id = t->getRemoteNodeId();
+
+    if (!recvdata.m_transporters.get(nodeId))
+      continue;
+
     if (t->isConnected() && is_connected(node_id))
     {
       if (t->hasDataToRead())
       {
-        mask->set(node_id);
+        recvdata.m_has_data_transporters.set(node_id);
 	retVal = 1;
       }
     }
@@ -1125,24 +1191,27 @@ TransporterRegistry::poll_SCI(Uint32 tim
 static int g_shm_counter = 0;
 Uint32
 TransporterRegistry::poll_SHM(Uint32 timeOutMillis,
-                              Uint32 recv_thread_id,
-                              NodeBitmask* mask)
-{  
+                              TransporterReceiveHandle& recvdata)
+{
+  assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
   Uint32 retVal = 0;
   for (int j = 0; j < 100; j++)
   {
     for (int i = 0; i<nSHMTransporters; i++)
     {
       SHM_Transporter * t = theSHMTransporters[i];
-      if (t->receiver_thread_id() != recv_thread_id)
-        continue;
       Uint32 node_id = t->getRemoteNodeId();
+
+      if (!recvdata.m_transporters.get(node_id))
+        continue;
+
       if (t->isConnected() && is_connected(node_id))
       {
 	if (t->hasDataToRead())
         {
           j = 100;
-          mask->set(node_id);
+          recvdata.m_has_data_transporters.set(node_id);
           retVal = 1;
 	}
       }
@@ -1161,34 +1230,35 @@ TransporterRegistry::poll_SHM(Uint32 tim
  * socket, which will be handled correctly in performReceive() (which _is_
  * protected by transporter locks on upper layer).
  */
-Uint32 
+Uint32
 TransporterRegistry::poll_TCP(Uint32 timeOutMillis,
-                              Uint32 recv_thread_id,
-                              NodeBitmask* mask)
+                              TransporterReceiveHandle& recvdata)
 {
-  ndb_socket_poller *socket_poller = &m_socket_poller[recv_thread_id];
+  assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
+  recvdata.m_socket_poller.clear();
 
-  socket_poller->clear();
-  if (recv_thread_id == 0 && m_has_extra_wakeup_socket)
+  if (m_has_extra_wakeup_socket && recvdata.m_transporters.get(0))
   {
     const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
 
     // Poll the wakup-socket for read
-    socket_poller->add(socket, true, false, false);
+    recvdata.m_socket_poller.add(socket, true, false, false);
   }
 
   Uint16 idx[MAX_NODES];
   for (int i = 0; i < nTCPTransporters; i++)
   {
     TCP_Transporter * t = theTCPTransporters[i];
-    if (t->receiver_thread_id() != recv_thread_id)
-      continue;
     const NDB_SOCKET_TYPE socket = t->getSocket();
     Uint32 node_id = t->getRemoteNodeId();
 
+    if (!recvdata.m_transporters.get(node_id))
+      continue;
+
     if (is_connected(node_id) && t->isConnected() && my_socket_valid(socket))
     {
-      idx[i] = socket_poller->add(socket, true, false, false);
+      idx[i] = recvdata.m_socket_poller.add(socket, true, false, false);
     }
     else
     {
@@ -1196,21 +1266,19 @@ TransporterRegistry::poll_TCP(Uint32 tim
     }
   }
 
-  tcpReadSelectReply = socket_poller->poll_unsafe(timeOutMillis);
+  int tcpReadSelectReply = recvdata.m_socket_poller.poll_unsafe(timeOutMillis);
 
   if (tcpReadSelectReply > 0)
   {
-    if (recv_thread_id == 0 && m_extra_wakeup_sockets)
+    if (m_extra_wakeup_sockets)
     {
-      if (socket_poller->has_read(0))
-        mask->set((Uint32)0);
+      if (recvdata.m_socket_poller.has_read(0))
+        recvdata.m_has_data_transporters.set((Uint32)0);
     }
 
     for (int i = 0; i < nTCPTransporters; i++)
     {
       TCP_Transporter * t = theTCPTransporters[i];
-      if (t->receiver_thread_id() != recv_thread_id)
-        continue;
       if (idx[i] != MAX_NODES + 1)
       {
         Uint32 node_id = t->getRemoteNodeId();
@@ -1222,8 +1290,8 @@ TransporterRegistry::poll_TCP(Uint32 tim
           continue;
         }
 #endif
-        if (socket_poller->has_read(idx[i]))
-          mask->set(node_id);
+        if (recvdata.m_socket_poller.has_read(idx[i]))
+          recvdata.m_has_data_transporters.set(node_id);
       }
     }
   }
@@ -1232,83 +1300,27 @@ TransporterRegistry::poll_TCP(Uint32 tim
 }
 #endif
 
-#if defined(HAVE_EPOLL_CREATE)
-bool
-TransporterRegistry::change_epoll(TCP_Transporter *t, bool add)
-{
-  struct epoll_event event_poll;
-  bzero(&event_poll, sizeof(event_poll));
-  NDB_SOCKET_TYPE sock_fd = t->getSocket();
-  int node_id = t->getRemoteNodeId();
-  int op = add ? EPOLL_CTL_ADD : EPOLL_CTL_DEL;
-  int ret_val, error;
-
-  if (!my_socket_valid(sock_fd))
-    return FALSE;
-
-  event_poll.data.u32 = t->getRemoteNodeId();
-  event_poll.events = EPOLLIN;
-  ret_val = epoll_ctl(m_epoll_fd[t->receiver_thread_id()],
-                      op, sock_fd.fd, &event_poll);
-  if (!ret_val)
-    goto ok;
-  error= errno;
-  if (error == ENOENT && !add)
-  {
-    /*
-     * Could be that socket was closed premature to this call.
-     * Not a problem that this occurs.
-     */
-    goto ok;
-  }
-  if (!add || (add && (error != ENOMEM)))
-  {
-    /*
-     * Serious problems, we are either using wrong parameters,
-     * have permission problems or the socket doesn't support
-     * epoll!!
-     */
-    ndbout_c("Failed to %s epollfd: %u fd " MY_SOCKET_FORMAT
-             " node %u to epoll-set,"
-             " errno: %u %s",
-             add ? "ADD" : "DEL",
-             m_epoll_fd[t->receiver_thread_id()],
-             MY_SOCKET_FORMAT_VALUE(sock_fd),
-             node_id,
-             error,
-             strerror(error));
-    abort();
-  }
-  ndbout << "We lacked memory to add the socket for node id ";
-  ndbout << node_id << endl;
-  return TRUE;
-
-ok:
-  return FALSE;
-}
-
-#endif
-
 /**
  * In multi-threaded cases, this must be protected by a global receive lock.
  */
 void
-TransporterRegistry::performReceive(Uint32 recv_thread_id)
+TransporterRegistry::performReceive(TransporterReceiveHandle& recvdata)
 {
+  assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
   bool hasReceived = false;
-  NodeBitmask *has_data_transporters =
-    m_has_data_transporters_ptr[recv_thread_id];
 
-  if (has_data_transporters->get(0))
+  if (recvdata.m_has_data_transporters.get(0))
   {
-    has_data_transporters->clear(Uint32(0));
+    assert(recvdata.m_transporters.get(0));
+    recvdata.m_has_data_transporters.clear(Uint32(0));
     consume_extra_sockets();
   }
 
 #ifdef ERROR_INSERT
   if (!m_blocked.isclear())
   {
-    if (has_data_transporters->isclear())
+    if (recvdata.m_has_data_transporters.isclear())
     {
         /* poll sees data, but we want to ignore for now
          * sleep a little to avoid busy loop
@@ -1319,29 +1331,33 @@ TransporterRegistry::performReceive(Uint
 #endif
 
 #ifdef NDB_TCP_TRANSPORTER
-  Uint32 id = 0;
-  while ((id = has_data_transporters->find(id + 1)) != BitmaskImpl::NotFound)
+  for(Uint32 id = recvdata.m_has_data_transporters.find_first();
+      id != BitmaskImpl::NotFound;
+      id = recvdata.m_has_data_transporters.find_next(id + 1))
   {
     bool hasdata = false;
     TCP_Transporter * t = (TCP_Transporter*)theTransporters[id];
+
+    assert(recvdata.m_transporters.get(id));
+
     if (is_connected(id))
     {
       if (t->isConnected())
       {
-        t->doReceive();
+        t->doReceive(recvdata);
         if (hasReceived)
-          callbackObj->checkJobBuffer(recv_thread_id);
+          recvdata.checkJobBuffer();
         hasReceived = true;
         Uint32 * ptr;
         Uint32 sz = t->getReceiveData(&ptr);
-        callbackObj->transporter_recv_from(id);
-        Uint32 szUsed = unpack(ptr, sz, id, ioStates[id]);
+        recvdata.transporter_recv_from(id);
+        Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id]);
         t->updateReceiveDataPtr(szUsed);
         hasdata = t->hasReceiveData();
       }
     }
     // If transporter still have data, make sure that it's remember to next time
-    has_data_transporters->set(id, hasdata);
+    recvdata.m_has_data_transporters.set(id, hasdata);
   }
 #endif
   
@@ -1351,15 +1367,14 @@ TransporterRegistry::performReceive(Uint
   for (int i=0; i<nSCITransporters; i++) 
   {
     SCI_Transporter  *t = theSCITransporters[i];
-    if (t->receiver_thread_id() != recv_thread_id)
-      continue;
     const NodeId nodeId = t->getRemoteNodeId();
+    assert(recvdata.m_transporters.get(nodeId));
     if(is_connected(nodeId))
     {
       if(t->isConnected() && t->checkConnected())
       {
         if (hasReceived)
-          callbackObj->checkJobBuffer(recv_thread_id);
+          callbackObj->checkJobBuffer();
         hasReceived = true;
         Uint32 * readPtr, * eodPtr;
         t->getReceivePtr(&readPtr, &eodPtr);
@@ -1374,19 +1389,19 @@ TransporterRegistry::performReceive(Uint
   for (int i=0; i<nSHMTransporters; i++) 
   {
     SHM_Transporter *t = theSHMTransporters[i];
-    if (t->receiver_thread_id() != recv_thread_id)
-      continue;
     const NodeId nodeId = t->getRemoteNodeId();
+    assert(recvdata.m_transporters.get(nodeId));
     if(is_connected(nodeId)){
       if(t->isConnected() && t->checkConnected())
       {
         if (hasReceived)
-          callbackObj->checkJobBuffer(recv_thread_id);
+          recvdata.checkJobBuffer();
         hasReceived = true;
         Uint32 * readPtr, * eodPtr;
         t->getReceivePtr(&readPtr, &eodPtr);
-        callbackObj->transporter_recv_from(nodeId);
-        Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
+        recvdata.transporter_recv_from(nodeId);
+        Uint32 *newPtr = unpack(recvdata,
+                                readPtr, eodPtr, nodeId, ioStates[nodeId]);
         t->updateReceivePtr(newPtr);
       }
     } 
@@ -1505,7 +1520,7 @@ void
 TransporterRegistry::printState(){
   ndbout << "-- TransporterRegistry -- " << endl << endl
 	 << "Transporters = " << nTransporters << endl;
-  for(Uint32 i = 0; i<maxTransporters; i++)
+  for(int i = 0; i<maxTransporters; i++)
     if(theTransporters[i] != NULL){
       const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();
       ndbout << "Transporter: " << remoteNodeId 
@@ -1523,8 +1538,12 @@ TransporterRegistry::isBlocked(NodeId no
 }
 
 void
-TransporterRegistry::blockReceive(NodeId nodeId)
+TransporterRegistry::blockReceive(TransporterReceiveHandle& recvdata,
+                                  NodeId nodeId)
 {
+  assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+  assert(recvdata.m_transporters.get(nodeId));
+
   /* Check that node is not already blocked?
    * Stop pulling from its socket (but track received data etc)
    */
@@ -1533,29 +1552,33 @@ TransporterRegistry::blockReceive(NodeId
 
   m_blocked.set(nodeId);
 
-  if (m_has_data_transporters_ptr[0]->get(nodeId))
+  if (recvdata.m_has_data_transporters.get(nodeId))
   {
     assert(!m_blocked_with_data.get(nodeId));
     m_blocked_with_data.set(nodeId);
-    m_has_data_transporters_ptr[0]->clear(nodeId);
+    recvdata.m_has_data_transporters.clear(nodeId);
   }
 }
 
 void
-TransporterRegistry::unblockReceive(NodeId nodeId)
+TransporterRegistry::unblockReceive(TransporterReceiveHandle& recvdata,
+                                    NodeId nodeId)
 {
+  assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+  assert(recvdata.m_transporters.get(nodeId));
+
   /* Check that node is blocked?
    * Resume pulling from its socket
    * Ensure in-flight data is processed if there was some
    */
   assert(m_blocked.get(nodeId));
-  assert(!m_has_data_transporters_ptr[0]->get(nodeId));
+  assert(!recvdata.m_has_data_transporters.get(nodeId));
 
   m_blocked.clear(nodeId);
 
   if (m_blocked_with_data.get(nodeId))
   {
-    m_has_data_transporters_ptr[0]->set(nodeId);
+    recvdata.m_has_data_transporters.set(nodeId);
   }
 
   if (m_blocked_disconnected.get(nodeId))
@@ -1563,7 +1586,7 @@ TransporterRegistry::unblockReceive(Node
     /* Process disconnect notification/handling now */
     m_blocked_disconnected.clear(nodeId);
 
-    report_disconnect(nodeId, m_disconnect_errors[nodeId]);
+    report_disconnect(recvdata, nodeId, m_disconnect_errors[nodeId]);
   }
 }
 #endif
@@ -1592,7 +1615,7 @@ run_start_clients_C(void * me)
 }
 
 /**
- * This method is used to initiate connection, called from the TRPMAN blockx.
+ * This method is used to initiate connection, called from the TRPMAN block.
  *
  * This works asynchronously, no actions are taken directly in the calling
  * thread.
@@ -1652,8 +1675,12 @@ TransporterRegistry::do_disconnect(NodeI
 }
 
 void
-TransporterRegistry::report_connect(NodeId node_id)
+TransporterRegistry::report_connect(TransporterReceiveHandle& recvdata,
+                                    NodeId node_id)
 {
+  assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+  assert(recvdata.m_transporters.get(node_id));
+
   DBUG_ENTER("TransporterRegistry::report_connect");
   DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
 
@@ -1666,26 +1693,28 @@ TransporterRegistry::report_connect(Node
   */
   callbackObj->reset_send_buffer(node_id, true);
 
-  performStates[node_id] = CONNECTED;
-#if defined(HAVE_EPOLL_CREATE)
-  /* If receive thread id 0 is using epoll, then all threads are using it */
-  if (likely(m_epoll_fd[0] != -1))
+  if (recvdata.epoll_add((TCP_Transporter*)theTransporters[node_id]))
   {
-    if (change_epoll((TCP_Transporter*)theTransporters[node_id],
-                     TRUE))
-    {
-      performStates[node_id] = DISCONNECTING;
-      DBUG_VOID_RETURN;
-    }
+    performStates[node_id] = CONNECTED;
+    recvdata.reportConnect(node_id);
+    DBUG_VOID_RETURN;
   }
-#endif
-  callbackObj->reportConnect(node_id);
+
+  /**
+   * Failed to add to epoll_set...
+   *   disconnect it (this is really really bad)
+   */
+  performStates[node_id] = DISCONNECTING;
   DBUG_VOID_RETURN;
 }
 
 void
-TransporterRegistry::report_disconnect(NodeId node_id, int errnum)
+TransporterRegistry::report_disconnect(TransporterReceiveHandle& recvdata,
+                                       NodeId node_id, int errnum)
 {
+  assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+  assert(recvdata.m_transporters.get(node_id));
+
   DBUG_ENTER("TransporterRegistry::report_disconnect");
   DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
 
@@ -1702,11 +1731,8 @@ TransporterRegistry::report_disconnect(N
 #endif
 
   performStates[node_id] = DISCONNECTED;
-  Uint32 recv_thread_id = getReceiveThreadId(node_id);
-  assert(recv_thread_id < MAX_NDBMT_RECEIVE_THREADS &&
-         m_has_data_transporters_ptr[recv_thread_id]);
-  m_has_data_transporters_ptr[recv_thread_id]->clear(node_id);
-  callbackObj->reportDisconnect(node_id, errnum);
+  recvdata.m_has_data_transporters.clear(node_id);
+  recvdata.reportDisconnect(node_id, errnum);
   DBUG_VOID_RETURN;
 }
 
@@ -1735,37 +1761,40 @@ TransporterRegistry::report_error(NodeId
  * connect and disconnect.
  */
 void
-TransporterRegistry::update_connections(Uint32 receiver_thread_id)
+TransporterRegistry::update_connections(TransporterReceiveHandle& recvdata)
 {
+  assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+
   for (int i= 0, n= 0; n < nTransporters; i++){
     Transporter * t = theTransporters[i];
     if (!t)
       continue;
     n++;
-    if (t->receiver_thread_id() != receiver_thread_id)
-      continue;
 
     const NodeId nodeId = t->getRemoteNodeId();
+    if (!recvdata.m_transporters.get(nodeId))
+      continue;
 
     TransporterError code = m_error_states[nodeId].m_code;
     const char *info = m_error_states[nodeId].m_info;
     if (code != TE_NO_ERROR && info != (const char *)~(UintPtr)0)
     {
-      callbackObj->reportError(nodeId, code, info);
+      recvdata.reportError(nodeId, code, info);
       m_error_states[nodeId].m_code = TE_NO_ERROR;
       m_error_states[nodeId].m_info = (const char *)~(UintPtr)0;
     }
+
     switch(performStates[nodeId]){
     case CONNECTED:
     case DISCONNECTED:
       break;
     case CONNECTING:
       if(t->isConnected())
-	report_connect(nodeId);
+	report_connect(recvdata, nodeId);
       break;
     case DISCONNECTING:
       if(!t->isConnected())
-	report_disconnect(nodeId, m_disconnect_errnum[nodeId]);
+	report_disconnect(recvdata, nodeId, m_disconnect_errnum[nodeId]);
       break;
     }
   }

=== modified file 'storage/ndb/src/common/transporter/basictest/basicTransporterTest.cpp'
--- a/storage/ndb/src/common/transporter/basictest/basicTransporterTest.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/basictest/basicTransporterTest.cpp	revid:mikael.ronstrom@stripped
@@ -396,12 +396,11 @@ reportDisconnect(void* callbackObj, Node
 }
 
 int
-checkJobBuffer(Uint32 recv_thread_id) {
+checkJobBuffer() {
   /** 
    * Check to see if jobbbuffers are starting to get full
    * and if so call doJob
    */
-  (void)recv_thread_id; /* Not used */
   return 0;
 }
 

=== modified file 'storage/ndb/src/common/transporter/perftest/perfTransporterTest.cpp'
--- a/storage/ndb/src/common/transporter/perftest/perfTransporterTest.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/perftest/perfTransporterTest.cpp	revid:mikael.ronstrom@stripped
@@ -601,12 +601,11 @@ reportDisconnect(void* callbackObj, Node
 
 
 int
-checkJobBuffer(Uint32 recv_thread_id) {
+checkJobBuffer() {
   /** 
    * Check to see if jobbbuffers are starting to get full
    * and if so call doJob
    */
-  (void)recv_thread_id;
   return 0;
 }
 

=== modified file 'storage/ndb/src/common/transporter/priotest/prioTransporterTest.cpp'
--- a/storage/ndb/src/common/transporter/priotest/prioTransporterTest.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/common/transporter/priotest/prioTransporterTest.cpp	revid:mikael.ronstrom@stripped
@@ -701,11 +701,10 @@ reportDisconnect(NodeId nodeId, Uint32 e
 
 
 int
-checkJobBuffer(Uint32 recv_thread_id) {
+checkJobBuffer() {
   /** 
    * Check to see if jobbbuffers are starting to get full
    * and if so call doJob
    */
-  (void)recv_thread_id;
   return 0;
 }

=== modified file 'storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp'
--- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp	revid:mikael.ronstrom@stripped
@@ -23,6 +23,7 @@
 #include <NdbMem.h>
 #include <NdbTick.h>
 
+#include <TransporterRegistry.hpp>
 #include <SignalLoggerManager.hpp>
 #include <FastScheduler.hpp>
 

=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	revid:mikael.ronstrom@stripped
@@ -31,7 +31,6 @@
 #include <SimpleProperties.hpp>
 #include <AttributeHeader.hpp>
 #include <KeyDescriptor.hpp>
-#include <Checksum.hpp>
 #include <signaldata/DictSchemaInfo.hpp>
 #include <signaldata/DictTabInfo.hpp>
 #include <signaldata/DropTabFile.hpp>
@@ -16532,8 +16531,11 @@ void Dbdict::createEvent_sendReply(Signa
       evntRecPtr.p->m_errorLine = __LINE__;
       evntRecPtr.p->m_errorNode = reference();
       jam();
-    } else
+    }
+    else
+    {
       jam();
+    }
   }
 
   // reference to API if master DICT
@@ -20748,7 +20750,10 @@ Dbdict::validateChecksum(const XSchemaFi
 
 Uint32
 Dbdict::computeChecksum(const Uint32 * src, Uint32 len){
-  return computeXorChecksum(src,len);
+  Uint32 ret = 0;
+  for(Uint32 i = 0; i<len; i++)
+    ret ^= src[i];
+  return ret;
 }
 
 SchemaFile::TableEntry *

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	revid:mikael.ronstrom@stripped
@@ -1326,7 +1326,7 @@ private:
     Uint32 clastVerifyQueue;
     Uint32 m_empty_done;
     Uint32 m_ref;
-    char pad[64 - sizeof(char*) - 4*sizeof(Uint32)];
+    char pad[NDB_CL_PADSZ(sizeof(void*) + 4 * sizeof(Uint32))];
   };
 
   bool isEmpty(const DIVERIFY_queue&);

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	revid:mikael.ronstrom@stripped
@@ -76,11 +76,16 @@
 #include <signaldata/SystemError.hpp>
 #include <signaldata/FireTrigOrd.hpp>
 #include <NdbEnv.h>
-#include <Checksum.hpp>
 
 #include "../suma/Suma.hpp"
 #include "DblqhCommon.hpp"
 
+/**
+ * overload handling...
+ * TODO: cleanup...from all sorts of perspective
+ */
+#include <TransporterRegistry.hpp>
+
 #include <EventLogger.hpp>
 extern EventLogger * g_eventLogger;
 
@@ -4278,9 +4283,10 @@ void Dblqh::seizeTcrec() 
   locTcConnectptr.p->connectState = TcConnectionrec::CONNECTED;
 }//Dblqh::seizeTcrec()
 
-bool Dblqh::checkTransporterOverloaded(Signal* signal,
-                                       const NodeBitmask& all,
-                                       const LqhKeyReq* req)
+bool
+Dblqh::checkTransporterOverloaded(Signal* signal,
+                                  const NodeBitmask& all,
+                                  const LqhKeyReq* req)
 {
   // nodes likely to be affected by this op
   NodeBitmask mask;
@@ -4397,16 +4403,29 @@ void Dblqh::execLQHKEYREQ(Signal* signal
 
   {
     const NodeBitmask& all = globalTransporterRegistry.get_status_overloaded();
-    if (unlikely((!all.isclear() &&
-                  checkTransporterOverloaded(signal, all, lqhKeyReq))) ||
-        ERROR_INSERTED_CLEAR(5047)) {
-      jam();
-      releaseSections(handle);
-      noFreeRecordLab(signal, lqhKeyReq, ZTRANSPORTER_OVERLOADED_ERROR);
-      return;
+    if (unlikely(!all.isclear()))
+    {
+      if (checkTransporterOverloaded(signal, all, lqhKeyReq))
+      {
+        /**
+         * TODO: We should have counters for this...
+         */
+        jam();
+        releaseSections(handle);
+        noFreeRecordLab(signal, lqhKeyReq, ZTRANSPORTER_OVERLOADED_ERROR);
+        return;
+      }
     }
   }
 
+  if (ERROR_INSERTED_CLEAR(5047))
+  {
+    jam();
+    releaseSections(handle);
+    noFreeRecordLab(signal, lqhKeyReq, ZTRANSPORTER_OVERLOADED_ERROR);
+    return;
+  }
+
   sig0 = lqhKeyReq->clientConnectPtr;
   if (cfirstfreeTcConrec != RNIL && !ERROR_INSERTED_CLEAR(5031)) {
     jamEntry();
@@ -9341,7 +9360,8 @@ void Dblqh::lqhTransNextLab(Signal* sign
 
       if (ERROR_INSERTED(5050))
       {
-        ndbout_c("send ZSCAN_MARKERS with 5s delay and killing master");
+        ndbout_c("send ZSCAN_MARKERS with 5s delay and killing master: %u",
+                 c_master_node_id);
         CLEAR_ERROR_INSERT_VALUE;
         signal->theData[0] = ZSCAN_MARKERS;
         signal->theData[1] = tcNodeFailptr.i;
@@ -9350,7 +9370,7 @@ void Dblqh::lqhTransNextLab(Signal* sign
         sendSignalWithDelay(cownref, GSN_CONTINUEB, signal, 5000, 4);
         
         signal->theData[0] = 9999;
-        sendSignal(numberToRef(CMVMI, c_master_node_id), 
+        sendSignal(numberToRef(CMVMI, c_error_insert_extra),
                    GSN_NDB_TAMPER, signal, 1, JBB);
         return;
       }
@@ -11263,6 +11283,21 @@ void Dblqh::scanTupkeyConfLab(Signal* si
   scanptr.p->m_curr_batch_size_bytes+= tdata4 * sizeof(Uint32);
   scanptr.p->m_curr_batch_size_rows = rows + 1;
   scanptr.p->m_last_row = tdata5;
+
+  const NodeBitmask& all = globalTransporterRegistry.get_status_overloaded();
+  if (unlikely(!all.isclear()))
+  {
+    if (all.get(refToNode(scanptr.p->scanApiBlockref)))
+    {
+      /**
+       * End scan batch if transporter-buffer are overloaded
+       *
+       * TODO: We should have counters for this...
+       */
+      tdata5 = 1;
+    }
+  }
+
   if (scanptr.p->check_scan_batch_completed() | tdata5){
     if (scanptr.p->scanLockHold == ZTRUE) {
       jam();
@@ -23390,6 +23425,12 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal
     }
   }
 
+  if (arg == 5050)
+  {
+#ifdef ERROR_INSERT
+    SET_ERROR_INSERT_VALUE2(5050, c_master_node_id);
+#endif
+  }
 }//Dblqh::execDUMP_STATE_ORD()
 
 
@@ -23695,10 +23736,8 @@ Dblqh::execDROP_TRIG_IMPL_REF(Signal* si
 Uint32 Dblqh::calcPageCheckSum(LogPageRecordPtr logP){
     Uint32 checkSum = 37;
 #ifdef VM_TRACE
-    checkSum = computeXorChecksum(
-                 logP.p->logPageWord + (ZPOS_CHECKSUM+1),
-                 ZPAGE_SIZE - (ZPOS_CHECKSUM+1),
-                 checkSum);
+    for (Uint32 i = (ZPOS_CHECKSUM+1); i<ZPAGE_SIZE; i++)
+      checkSum = logP.p->logPageWord[i] ^ checkSum;
 #endif
     return checkSum;  
   }

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	revid:mikael.ronstrom@stripped
@@ -1484,10 +1484,11 @@ private:
   void handleScanStop(Signal* signal, UintR aFailedNode);
   void initScanTcrec(Signal* signal);
   Uint32 initScanrec(ScanRecordPtr,  const class ScanTabReq*,
-		   const UintR scanParallel, 
-		   const UintR noOprecPerFrag,
-                   const Uint32 aiLength,
-                   const Uint32 keyLength);
+                     const UintR scanParallel,
+                     const UintR noOprecPerFrag,
+                     const Uint32 aiLength,
+                     const Uint32 keyLength,
+                     const Uint32 apiPtr[]);
   void initScanfragrec(Signal* signal);
   void releaseScanResources(Signal*, ScanRecordPtr, bool not_started = false);
   ScanRecordPtr seizeScanrec(Signal* signal);
@@ -1940,10 +1941,9 @@ private:
   UintR tconfig1;
   UintR tconfig2;
 
-  UintR cdata[32];
   UintR ctransidFailHash[512];
   UintR ctcConnectFailHash[1024];
-  
+
   /**
    * Commit Ack handling
    */

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	revid:mikael.ronstrom@stripped
@@ -88,6 +88,8 @@
 #include <signaldata/TransIdAI.hpp>
 #include <signaldata/CreateTab.hpp>
 
+#include <TransporterRegistry.hpp> // error 8035
+
 // Use DEBUG to print messages that should be
 // seen only when we debug the product
 #ifdef VM_TRACE
@@ -8380,7 +8382,8 @@ void Dbtc::execNODE_FAILREP(Signal* sign
   cfailure_nr = nodeFail->failNo;
   const Uint32 tnoOfNodes  = nodeFail->noOfNodes;
   const Uint32 tnewMasterId = nodeFail->masterNodeId;
-  
+  Uint32 cdata[MAX_NDB_NODES];
+
   arrGuard(tnoOfNodes, MAX_NDB_NODES);
   Uint32 i;
   int index = 0;
@@ -10413,7 +10416,8 @@ void Dbtc::execSCAN_TABREQ(Signal* signa
   SectionHandle handle(this, signal);
   SegmentedSectionPtr api_op_ptr;
   handle.getSection(api_op_ptr, 0);
-  copy(&cdata[0], api_op_ptr);
+  Uint32 * apiPtr = signal->theData+25; // temp storage
+  copy(apiPtr, api_op_ptr);
 
   Uint32 aiLength= 0;
   Uint32 keyLen= 0;
@@ -10539,7 +10543,8 @@ void Dbtc::execSCAN_TABREQ(Signal* signa
   ndbrequire(transP->apiScanRec == RNIL);
   ndbrequire(scanptr.p->scanApiRec == RNIL);
 
-  errCode = initScanrec(scanptr, scanTabReq, scanParallel, noOprecPerFrag, aiLength, keyLen);
+  errCode = initScanrec(scanptr, scanTabReq, scanParallel, noOprecPerFrag,
+                        aiLength, keyLen, apiPtr);
   if (unlikely(errCode))
   {
     jam();
@@ -10647,7 +10652,8 @@ Dbtc::initScanrec(ScanRecordPtr scanptr,
 		  UintR scanParallel,
 		  UintR noOprecPerFrag,
 		  Uint32 aiLength,
-		  Uint32 keyLength)
+		  Uint32 keyLength,
+                  const Uint32 apiPtr[])
 {
   const UintR ri = scanTabReq->requestInfo;
   scanptr.p->scanTcrec = tcConnectptr.i;
@@ -10699,7 +10705,7 @@ Dbtc::initScanrec(ScanRecordPtr scanptr,
     ptr.p->scanFragState = ScanFragRec::IDLE;
     ptr.p->scanRec = scanptr.i;
     ptr.p->scanFragId = 0;
-    ptr.p->m_apiPtr = cdata[i];
+    ptr.p->m_apiPtr = apiPtr[i];
   }//for
 
   (* (ScanTabReq::getRangeScanFlag(ri) ? 

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp	revid:mikael.ronstrom@stripped
@@ -29,7 +29,6 @@
 #include <signaldata/TupKey.hpp>
 #include <signaldata/AttrInfo.hpp>
 #include <NdbSqlUtil.hpp>
-#include <Checksum.hpp>
 
 // #define TRACE_INTERPRETER
 
@@ -132,14 +131,17 @@ Dbtup::calculateChecksum(Tuple_header* t
                          Tablerec* regTabPtr)
 {
   Uint32 checksum;
-  Uint32 rec_size, *tuple_header;
+  Uint32 i, rec_size, *tuple_header;
   rec_size= regTabPtr->m_offsets[MM].m_fix_header_size;
   tuple_header= tuple_ptr->m_data;
+  checksum= 0;
   // includes tupVersion
   //printf("%p - ", tuple_ptr);
-
-  checksum = computeXorChecksum(
-               tuple_header, rec_size-Tuple_header::HeaderSize);
+  
+  for (i= 0; i < rec_size-Tuple_header::HeaderSize; i++) {
+    checksum ^= tuple_header[i];
+    //printf("%.8x ", tuple_header[i]);
+  }
   
   //printf("-> %.8x\n", checksum);
 
@@ -156,7 +158,9 @@ Dbtup::calculateChecksum(Tuple_header* t
     vsize_words= calculate_total_var_size(req_struct->var_len_array,
                                           regTabPtr->no_var_attr);
     ndbassert(req_struct->var_data_end >= &var_data_part[vsize_words]);
-    checksum = computeXorChecksum(var_data_part,vsize_words,checksum);
+    for (i= 0; i < vsize_words; i++) {
+      checksum ^= var_data_part[i];
+    }
   }
 #endif
   return checksum;

=== modified file 'storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp	revid:mikael.ronstrom@stripped
@@ -575,6 +575,14 @@ private:
 #ifdef ERROR_INSERT
   Uint32 nodeFailCount;
 #endif
+
+  Uint32 get_hb_count(Uint32 nodeId) const {
+    return globalData.get_hb_count(nodeId);
+  }
+
+  Uint32& set_hb_count(Uint32 nodeId) {
+    return globalData.set_hb_count(nodeId);
+  }
 };
 
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrInit.cpp	revid:mikael.ronstrom@stripped
@@ -122,7 +122,7 @@ void Qmgr::initData() 
       nodePtr.p->phase = ZAPI_INACTIVE;
     }
 
-    setNodeInfo(nodePtr.i).m_heartbeat_cnt = cnt;
+    set_hb_count(nodePtr.i) = cnt;
     nodePtr.p->sendPrepFailReqStatus = Q_NOT_ACTIVE;
     nodePtr.p->sendCommitFailReqStatus = Q_NOT_ACTIVE;
     nodePtr.p->sendPresToStatus = Q_NOT_ACTIVE;

=== modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	revid:mikael.ronstrom@stripped
@@ -42,6 +42,8 @@
 #include <signaldata/DihRestart.hpp>
 #include <ndb_version.h>
 
+#include <TransporterRegistry.hpp> // Get connect address
+
 #include <EventLogger.hpp>
 extern EventLogger * g_eventLogger;
 
@@ -97,7 +99,7 @@ void Qmgr::execCM_HEARTBEAT(Signal* sign
   jamEntry();
   hbNodePtr.i = signal->theData[0];
   ptrCheckGuard(hbNodePtr, MAX_NDB_NODES, nodeRec);
-  setNodeInfo(hbNodePtr.i).m_heartbeat_cnt= 0;
+  set_hb_count(hbNodePtr.i) = 0;
   return;
 }//Qmgr::execCM_HEARTBEAT()
 
@@ -324,7 +326,7 @@ void Qmgr::execSTTOR(Signal* signal) 
       if (nodePtr.p->phase == ZAPI_INACTIVE)
       {
         jam();
-        setNodeInfo(nodePtr.i).m_heartbeat_cnt = 3;
+        set_hb_count(nodePtr.i) = 3;
         nodePtr.p->phase = ZFAIL_CLOSING;
         nodePtr.p->failState = NORMAL;
       }
@@ -576,9 +578,7 @@ void Qmgr::execCM_INFOCONF(Signal* signa
   signal->theData[0] = 0; // no answer
   signal->theData[1] = 0; // no id
   signal->theData[2] = NodeInfo::DB;
-  signal->theData[3] = 0;
-  signal->theData[4] = 3;
-  sendSignal(TRPMAN_REF, GSN_OPEN_COMREQ, signal, 5, JBB);
+  sendSignal(TRPMAN_REF, GSN_OPEN_COMORD, signal, 3, JBB);
 
   cpresident = ZNIL;
   cpresidentAlive = ZFALSE;
@@ -1958,10 +1958,7 @@ Qmgr::cmAddPrepare(Signal* signal, NodeR
     nodePtr.p->failState = NORMAL;
     signal->theData[0] = 0;
     signal->theData[1] = nodePtr.i;
-    signal->theData[2] = 0;
-    signal->theData[3] = 0;
-    signal->theData[4] = 2;
-    sendSignal(TRPMAN_REF, GSN_OPEN_COMREQ, signal, 5, JBB);
+    sendSignal(TRPMAN_REF, GSN_OPEN_COMORD, signal, 2, JBB);
 #endif
     return;
   case ZSTARTING:
@@ -2098,7 +2095,7 @@ void Qmgr::execCM_ADD(Signal* signal) 
     ndbrequire(addNodePtr.p->phase == ZSTARTING);
     addNodePtr.p->phase = ZRUNNING;
     m_connectivity_check.reportNodeConnect(addNodePtr.i);
-    setNodeInfo(addNodePtr.i).m_heartbeat_cnt= 0;
+    set_hb_count(addNodePtr.i) = 0;
     c_clusterNodes.set(addNodePtr.i);
     findNeighbours(signal, __LINE__);
 
@@ -2187,7 +2184,7 @@ Qmgr::joinedCluster(Signal* signal, Node
    * NODES IN THE CLUSTER.
    */
   nodePtr.p->phase = ZRUNNING;
-  setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0;
+  set_hb_count(nodePtr.i) = 0;
   findNeighbours(signal, __LINE__);
   c_clusterNodes.set(nodePtr.i);
   c_start.reset();
@@ -2434,7 +2431,7 @@ void Qmgr::findNeighbours(Signal* signal
        *---------------------------------------------------------------------*/
       fnNodePtr.i = cneighbourl;
       ptrCheckGuard(fnNodePtr, MAX_NDB_NODES, nodeRec);
-      setNodeInfo(fnNodePtr.i).m_heartbeat_cnt= 0;
+      set_hb_count(fnNodePtr.i) = 0;
     }//if
   }//if
 
@@ -2742,18 +2739,20 @@ void Qmgr::checkHeartbeat(Signal* signal
   }//if
   ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRec);
   
-  setNodeInfo(nodePtr.i).m_heartbeat_cnt++;
+  set_hb_count(nodePtr.i)++;
   ndbrequire(nodePtr.p->phase == ZRUNNING);
   ndbrequire(getNodeInfo(nodePtr.i).m_type == NodeInfo::DB);
 
-  if(getNodeInfo(nodePtr.i).m_heartbeat_cnt > 2){
+  if (get_hb_count(nodePtr.i) > 2)
+  {
     signal->theData[0] = NDB_LE_MissedHeartbeat;
     signal->theData[1] = nodePtr.i;
-    signal->theData[2] = getNodeInfo(nodePtr.i).m_heartbeat_cnt - 1;
+    signal->theData[2] = get_hb_count(nodePtr.i) - 1;
     sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB);
   }
 
-  if (getNodeInfo(nodePtr.i).m_heartbeat_cnt > 4) {
+  if (get_hb_count(nodePtr.i) > 4)
+  {
     jam();
     if (m_connectivity_check.getEnabled())
     {
@@ -2796,21 +2795,21 @@ void Qmgr::apiHbHandlingLab(Signal* sign
     if (c_connectedNodes.get(nodeId))
     {
       jam();
-      setNodeInfo(TnodePtr.i).m_heartbeat_cnt++;
-      
-      if(getNodeInfo(TnodePtr.i).m_heartbeat_cnt > 2)
+      set_hb_count(TnodePtr.i)++;
+
+      if (get_hb_count(TnodePtr.i) > 2)
       {
 	signal->theData[0] = NDB_LE_MissedHeartbeat;
 	signal->theData[1] = nodeId;
-	signal->theData[2] = getNodeInfo(TnodePtr.i).m_heartbeat_cnt - 1;
+	signal->theData[2] = get_hb_count(TnodePtr.i) - 1;
 	sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB);
       }
-      
-      if (getNodeInfo(TnodePtr.i).m_heartbeat_cnt > 4) 
+
+      if (get_hb_count(TnodePtr.i) > 4)
       {
         jam();
 	/*------------------------------------------------------------------*/
-	/* THE API NODE HAS NOT SENT ANY HEARTBEAT FOR THREE SECONDS. 
+	/* THE API NODE HAS NOT SENT ANY HEARTBEAT FOR THREE SECONDS.
 	 * WE WILL DISCONNECT FROM IT NOW.
 	 *------------------------------------------------------------------*/
 	/*------------------------------------------------------------------*/
@@ -2819,7 +2818,7 @@ void Qmgr::apiHbHandlingLab(Signal* sign
 	signal->theData[0] = NDB_LE_DeadDueToHeartbeat;
 	signal->theData[1] = nodeId;
 	sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
-        
+
         api_failed(signal, nodeId);
       }//if
     }//if
@@ -2848,16 +2847,16 @@ void Qmgr::checkStartInterface(Signal* s
     Uint32 type = getNodeInfo(nodePtr.i).m_type;
     if (nodePtr.p->phase == ZFAIL_CLOSING) {
       jam();
-      setNodeInfo(nodePtr.i).m_heartbeat_cnt++;
+      set_hb_count(nodePtr.i)++;
       if (c_connectedNodes.get(nodePtr.i)){
         jam();
 	/*-------------------------------------------------------------------*/
 	// We need to ensure that the connection is not restored until it has 
 	// been disconnected for at least three seconds.
 	/*-------------------------------------------------------------------*/
-        setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0;
+        set_hb_count(nodePtr.i) = 0;
       }//if
-      if ((getNodeInfo(nodePtr.i).m_heartbeat_cnt > 3)
+      if ((get_hb_count(nodePtr.i) > 3)
 	  && (nodePtr.p->failState == NORMAL)) {
 	/**------------------------------------------------------------------
 	 * WE HAVE DISCONNECTED THREE SECONDS AGO. WE ARE NOW READY TO 
@@ -2890,23 +2889,20 @@ void Qmgr::checkStartInterface(Signal* s
              * Dont allow API node to connect before c_allow_api_connect
              */
             jam();
-            setNodeInfo(nodePtr.i).m_heartbeat_cnt = 3;
+            set_hb_count(nodePtr.i) = 3;
             continue;
           }
         }
 
-        setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0;
+        set_hb_count(nodePtr.i) = 0;
         signal->theData[0] = 0;
         signal->theData[1] = nodePtr.i;
-        signal->theData[2] = 0;
-        signal->theData[3] = 0;
-        signal->theData[4] = 2;
-        sendSignal(TRPMAN_REF, GSN_OPEN_COMREQ, signal, 5, JBB);
+        sendSignal(TRPMAN_REF, GSN_OPEN_COMORD, signal, 2, JBB);
       }
       else
       {
         jam();
-        if(((getNodeInfo(nodePtr.i).m_heartbeat_cnt + 1) % 60) == 0)
+        if(((get_hb_count(nodePtr.i) + 1) % 60) == 0)
         {
           jam();
 	  char buf[256];
@@ -2917,10 +2913,10 @@ void Qmgr::checkStartInterface(Signal* s
                                  "Failure handling of node %d has not completed"
                                  " in %d min - state = %d",
                                  nodePtr.i,
-                                 (getNodeInfo(nodePtr.i).m_heartbeat_cnt+1)/60,
+                                 (get_hb_count(nodePtr.i)+1)/60,
                                  nodePtr.p->failState);
             warningEvent("%s", buf);
-            if (((getNodeInfo(nodePtr.i).m_heartbeat_cnt + 1) % 300) == 0)
+            if (((get_hb_count(nodePtr.i) + 1) % 300) == 0)
             {
               jam();
               /**
@@ -2938,7 +2934,7 @@ void Qmgr::checkStartInterface(Signal* s
                                  "Failure handling of api %u has not completed"
                                  " in %d min - state = %d",
                                  nodePtr.i,
-                                 (getNodeInfo(nodePtr.i).m_heartbeat_cnt+1)/60,
+                                 (get_hb_count(nodePtr.i)+1)/60,
                                  nodePtr.p->failState);
             warningEvent("%s", buf);
             if (nodePtr.p->failState == WAITING_FOR_API_FAILCONF)
@@ -3384,7 +3380,7 @@ void Qmgr::node_failed(Signal* signal, U
     /*---------------------------------------------------------------------*/
     failedNodePtr.p->failState = NORMAL;
     failedNodePtr.p->phase = ZFAIL_CLOSING;
-    setNodeInfo(failedNodePtr.i).m_heartbeat_cnt= 0;
+    set_hb_count(failedNodePtr.i) = 0;
 
     CloseComReqConf * const closeCom = 
       (CloseComReqConf *)&signal->theData[0];
@@ -3453,7 +3449,7 @@ Qmgr::api_failed(Signal* signal, Uint32 
 
   failedNodePtr.p->failState = initialState;
   failedNodePtr.p->phase = ZFAIL_CLOSING;
-  setNodeInfo(failedNodePtr.i).m_heartbeat_cnt= 0;
+  set_hb_count(failedNodePtr.i) = 0;
   setNodeInfo(failedNodePtr.i).m_version = 0;
   recompute_version_info(getNodeInfo(failedNodePtr.i).m_type);
   
@@ -3562,7 +3558,7 @@ void Qmgr::execAPI_REGREQ(Signal* signal
 
   setNodeInfo(apiNodePtr.i).m_version = version;
   setNodeInfo(apiNodePtr.i).m_mysql_version = mysql_version;
-  setNodeInfo(apiNodePtr.i).m_heartbeat_cnt= 0;
+  set_hb_count(apiNodePtr.i) = 0;
 
   NodeState state = getNodeState();
   if (apiNodePtr.p->phase == ZAPI_INACTIVE)
@@ -4131,7 +4127,7 @@ void Qmgr::handleApiCloseComConf(Signal*
          * Allow MGM do reconnect "directly"
          */
         jam();
-        setNodeInfo(failedNodePtr.i).m_heartbeat_cnt = 3;
+        set_hb_count(failedNodePtr.i) = 3;
       }
       
       /* Handled the single API node failure */
@@ -4532,7 +4528,7 @@ void Qmgr::execCOMMIT_FAILREQ(Signal* si
       ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRec);
       nodePtr.p->phase = ZFAIL_CLOSING;
       nodePtr.p->failState = WAITING_FOR_NDB_FAILCONF;
-      setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0;
+      set_hb_count(nodePtr.i) = 0;
       setNodeInfo(nodePtr.i).m_version = 0;
       c_clusterNodes.clear(nodePtr.i);
     }//for
@@ -4824,7 +4820,7 @@ void Qmgr::failReport(Signal* signal,
     failedNodePtr.p->sendPrepFailReqStatus = Q_NOT_ACTIVE;
     failedNodePtr.p->sendCommitFailReqStatus = Q_NOT_ACTIVE;
     failedNodePtr.p->sendPresToStatus = Q_NOT_ACTIVE;
-    setNodeInfo(failedNodePtr.i).m_heartbeat_cnt= 0;
+    set_hb_count(failedNodePtr.i) = 0;
     if (aSendFailRep == ZTRUE) {
       jam();
       if (failedNodePtr.i != getOwnNodeId()) {

=== modified file 'storage/ndb/src/kernel/blocks/trpman.cpp'
--- a/storage/ndb/src/kernel/blocks/trpman.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/trpman.cpp	revid:mikael.ronstrom@stripped
@@ -30,7 +30,7 @@ Trpman::Trpman(Block_context & ctx, Uint
 
   addRecSignal(GSN_CLOSE_COMREQ, &Trpman::execCLOSE_COMREQ);
   addRecSignal(GSN_CLOSE_COMCONF, &Trpman::execCLOSE_COMCONF);
-  addRecSignal(GSN_OPEN_COMREQ, &Trpman::execOPEN_COMREQ);
+  addRecSignal(GSN_OPEN_COMORD, &Trpman::execOPEN_COMORD);
   addRecSignal(GSN_ENABLE_COMREQ, &Trpman::execENABLE_COMREQ);
   addRecSignal(GSN_DISCONNECT_REP, &Trpman::execDISCONNECT_REP);
   addRecSignal(GSN_CONNECT_REP, &Trpman::execCONNECT_REP);
@@ -50,19 +50,26 @@ BLOCK_FUNCTIONS(Trpman)
 #ifdef ERROR_INSERT
 static NodeBitmask c_error_9000_nodes_mask;
 extern Uint32 MAX_RECEIVED_SIGNALS;
+
+class TransporterReceiveHandle *
+mt_get_trp_receive_handle(unsigned instance);
 #endif
 
 bool
 Trpman::handles_this_node(Uint32 nodeId)
 {
+#if MAX_NDBMT_RECEIVE_THREADS == 1
+  return true;
+#else
   if (globalData.ndbMtReceiveThreads <= (Uint32)1)
     return true;
   return (instance()==
           (globalTransporterRegistry.getReceiveThreadId(nodeId) + 1));
+#endif
 }
 
 void
-Trpman::execOPEN_COMREQ(Signal* signal)
+Trpman::execOPEN_COMORD(Signal* signal)
 {
   // Connect to the specifed NDB node, only QMGR allowed communication
   // so far with the node
@@ -70,10 +77,9 @@ Trpman::execOPEN_COMREQ(Signal* signal)
   const BlockReference userRef = signal->theData[0];
   Uint32 tStartingNode = signal->theData[1];
   Uint32 tData2 = signal->theData[2];
-  Uint32 tData3 = signal->theData[3];
-  Uint32 len = signal->theData[4];
   jamEntry();
 
+  const Uint32 len = signal->getLength();
   if (len == 2)
   {
 #ifdef ERROR_INSERT
@@ -125,14 +131,10 @@ Trpman::execOPEN_COMREQ(Signal* signal)
   }
 
 done:
-  if (userRef != 0)
-  {
-    jam();
-    signal->theData[0] = tStartingNode;
-    signal->theData[1] = tData2;
-    signal->theData[2] = tData3;
-    sendSignal(userRef, GSN_OPEN_COMCONF, signal, 3, JBA);
-  }
+  /**
+   * NO REPLY for now
+   */
+  (void)userRef;
 }
 
 void
@@ -148,7 +150,6 @@ Trpman::execCONNECT_REP(Signal *signal)
    * Inform QMGR that client has connected
    */
   signal->theData[0] = hostId;
-  ndbout_c("CONNECT_REP from node %u", hostId);
   if (ERROR_INSERTED(9005))
   {
     sendSignalWithDelay(QMGR_REF, GSN_CONNECT_REP, signal, 50, 1);
@@ -335,8 +336,8 @@ Trpman::execROUTE_ORD(Signal* signal)
 
   Uint32 nodeId = refToNode(dstRef);
 
-  if (((nodeId == 0) ||
-        getNodeInfo(nodeId).m_connected))
+  if (likely((nodeId == 0) ||
+             getNodeInfo(nodeId).m_connected))
   {
     jam();
     Uint32 secCount = handle.m_cnt;
@@ -471,15 +472,15 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
     CLEAR_ERROR_INSERT_VALUE;
     if (signal->getLength() == 1 || signal->theData[1])
     {
+      signal->header.theLength = 2;
       for (Uint32 i = 1; i<MAX_NODES; i++)
       {
-	if (c_error_9000_nodes_mask.get(i) &&
+        if (c_error_9000_nodes_mask.get(i) &&
             handles_this_node(i))
-	{
-	  signal->theData[0] = 0;
-	  signal->theData[1] = i;
-          signal->theData[4] = 2;
-          execOPEN_COMREQ(signal);
+        {
+          signal->theData[0] = 0;
+          signal->theData[1] = i;
+          execOPEN_COMORD(signal);
         }
       }
     }
@@ -526,6 +527,8 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
      (arg == 9992))    /* Block recv from nodeid */
   {
     bool block = (arg == 9992);
+    TransporterReceiveHandle * recvdata = mt_get_trp_receive_handle(instance());
+    assert(recvdata != 0);
     for (Uint32 n = 1; n < signal->getLength(); n++)
     {
       Uint32 nodeId = signal->theData[n];
@@ -538,14 +541,13 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
         if (block)
         {
           ndbout_c("TRPMAN : Blocking receive from node %u", nodeId);
-
-          globalTransporterRegistry.blockReceive(nodeId);
+          globalTransporterRegistry.blockReceive(*recvdata, nodeId);
         }
         else
         {
           ndbout_c("TRPMAN : Unblocking receive from node %u", nodeId);
 
-          globalTransporterRegistry.unblockReceive(nodeId);
+          globalTransporterRegistry.unblockReceive(*recvdata, nodeId);
         }
       }
       else
@@ -565,6 +567,8 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
                ((pattern == 1)? "Other side":"Unknown"));
     }
 
+    TransporterReceiveHandle * recvdata = mt_get_trp_receive_handle(instance());
+    assert(recvdata != 0);
     for (Uint32 node = 1; node < MAX_NDB_NODES; node++)
     {
       if (!handles_this_node(node))
@@ -594,7 +598,7 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
               break;
             }
             ndbout_c("TRPMAN : Blocking receive from node %u", node);
-            globalTransporterRegistry.blockReceive(node);
+            globalTransporterRegistry.blockReceive(*recvdata, node);
           }
         }
       }
@@ -602,6 +606,8 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
   }
   if (arg == 9991) /* Unblock recv from all blocked */
   {
+    TransporterReceiveHandle * recvdata = mt_get_trp_receive_handle(instance());
+    assert(recvdata != 0);
     for (Uint32 node = 1; node < MAX_NODES; node++)
     {
       if (!handles_this_node(node))
@@ -609,7 +615,7 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
       if (globalTransporterRegistry.isBlocked(node))
       {
         ndbout_c("CMVMI : Unblocking receive from node %u", node);
-        globalTransporterRegistry.unblockReceive(node);
+        globalTransporterRegistry.unblockReceive(*recvdata, node);
       }
     }
   }
@@ -619,12 +625,11 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
 TrpmanProxy::TrpmanProxy(Block_context & ctx) :
   LocalProxy(TRPMAN, ctx)
 {
-  addRecSignal(GSN_CLOSE_COMREQ, &TrpmanProxy::execCLOSE_COMREQ);
-  addRecSignal(GSN_OPEN_COMREQ, &TrpmanProxy::execOPEN_COMREQ);
+  addRecSignal(GSN_OPEN_COMORD, &TrpmanProxy::execOPEN_COMORD);
   addRecSignal(GSN_ENABLE_COMREQ, &TrpmanProxy::execENABLE_COMREQ);
-  addRecSignal(GSN_CLOSE_COMCONF, &TrpmanProxy::execCLOSE_COMCONF);
-  addRecSignal(GSN_OPEN_COMCONF, &TrpmanProxy::execOPEN_COMCONF);
   addRecSignal(GSN_ENABLE_COMCONF, &TrpmanProxy::execENABLE_COMCONF);
+  addRecSignal(GSN_CLOSE_COMREQ, &TrpmanProxy::execCLOSE_COMREQ);
+  addRecSignal(GSN_CLOSE_COMCONF, &TrpmanProxy::execCLOSE_COMCONF);
   addRecSignal(GSN_ROUTE_ORD, &TrpmanProxy::execROUTE_ORD);
 }
 
@@ -640,67 +645,23 @@ TrpmanProxy::newWorker(Uint32 instanceNo
 
 BLOCK_FUNCTIONS(TrpmanProxy);
 
-void
-TrpmanProxy::sendOPEN_COMREQ(Signal *signal, Uint32 ssId, SectionHandle*)
-{
-  jam();
-  Ss_OPEN_COMREQ& ss = ssFind<Ss_OPEN_COMREQ>(ssId);
-
-  signal->theData[0] = reference();
-  signal->theData[1] = ss.data1;
-  signal->theData[2] = ss.data2;
-  signal->theData[3] = ssId;
-  signal->theData[4] = ss.len;
-  sendSignal(workerRef(ss.m_worker), GSN_OPEN_COMREQ,
-             signal, 5, JBB);
-}
+// GSN_OPEN_COMORD
 
 void
-TrpmanProxy::execOPEN_COMREQ(Signal* signal)
+TrpmanProxy::execOPEN_COMORD(Signal* signal)
 {
   jamEntry();
-  Ss_OPEN_COMREQ& ss = ssSeize<Ss_OPEN_COMREQ>();
-
-  ss.save_ref = signal->theData[0];
-  ss.data1 = signal->theData[1];
-  ss.data2 = signal->theData[2];
-  ss.len = signal->theData[4];
-  sendREQ(signal, ss);
-}
 
-void
-TrpmanProxy::execOPEN_COMCONF(Signal* signal)
-{
-  Uint32 ssId = signal->theData[2];
-  jamEntry();
-  Ss_OPEN_COMREQ& ss = ssFind<Ss_OPEN_COMREQ>(ssId);
-  recvCONF(signal, ss);
-}
-
-void
-TrpmanProxy::sendOPEN_COMCONF(Signal *signal, Uint32 ssId)
-{
-  jam();
-  Ss_OPEN_COMREQ& ss = ssFind<Ss_OPEN_COMREQ>(ssId);
-
-  if (!lastReply(ss))
-  {
-    jam();
-    return;
-  }
-
-  if (ss.save_ref)
+  for (Uint32 i = 0; i<c_workers; i++)
   {
     jam();
-    signal->theData[0] = ss.save_ref;
-    signal->theData[1] = ss.data1;
-    signal->theData[2] = ss.data2;
-    sendSignal(ss.save_ref, GSN_OPEN_COMCONF, signal,
-               ss.len - 1, JBB);
+    sendSignal(workerRef(i), GSN_OPEN_COMORD, signal,
+               signal->getLength(), JBB);
   }
-  ssRelease<Ss_OPEN_COMREQ>(ssId);
 }
 
+// GSN_CLOSE_COMREQ
+
 void
 TrpmanProxy::execCLOSE_COMREQ(Signal* signal)
 {
@@ -749,11 +710,13 @@ TrpmanProxy::sendCLOSE_COMCONF(Signal *s
 
   CloseComReqConf* conf = (CloseComReqConf*)signal->getDataPtrSend();
   *conf = ss.m_req;
-  sendSignal(conf->xxxBlockRef, GSN_CLOSE_COMCONF, signal,
+  sendSignal(QMGR_REF, GSN_CLOSE_COMCONF, signal,
              CloseComReqConf::SignalLength, JBB);
   ssRelease<Ss_CLOSE_COMREQ>(ssId);
 }
 
+// GSN_ENABLE_COMREQ
+
 void
 TrpmanProxy::execENABLE_COMREQ(Signal* signal)
 {
@@ -807,6 +770,8 @@ TrpmanProxy::sendENABLE_COMCONF(Signal *
   ssRelease<Ss_ENABLE_COMREQ>(ssId);
 }
 
+// GSN_ROUTE_ORD
+
 void
 TrpmanProxy::execROUTE_ORD(Signal* signal)
 {
@@ -814,8 +779,12 @@ TrpmanProxy::execROUTE_ORD(Signal* signa
   Uint32 nodeId = ord->from;
   jamEntry();
 
-  ndbrequire(nodeId != 0);
+  ndbassert(nodeId != 0);
+#if MAX_NDBMT_RECEIVE_THREADS == 1
+  Uint32 workerId = 0;
+#else
   Uint32 workerId = globalTransporterRegistry.getReceiveThreadId(nodeId);
+#endif
   SectionHandle handle(this, signal);
   sendSignal(workerRef(workerId), GSN_ROUTE_ORD, signal,
              signal->getLength(), JBB, &handle);

=== modified file 'storage/ndb/src/kernel/blocks/trpman.hpp'
--- a/storage/ndb/src/kernel/blocks/trpman.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/blocks/trpman.hpp	revid:mikael.ronstrom@stripped
@@ -33,7 +33,7 @@ public:
 
   void execCLOSE_COMREQ(Signal *signal);
   void execCLOSE_COMCONF(Signal * signal);
-  void execOPEN_COMREQ(Signal *signal);
+  void execOPEN_COMORD(Signal *signal);
   void execENABLE_COMREQ(Signal *signal);
   void execDISCONNECT_REP(Signal *signal);
   void execCONNECT_REP(Signal *signal);
@@ -55,26 +55,8 @@ public:
   virtual ~TrpmanProxy();
   BLOCK_DEFINES(TrpmanProxy);
 
-  // GSN_OPEN_COMREQ
-  struct Ss_OPEN_COMREQ : SsParallel {
-    BlockReference save_ref;
-    Uint32 data1;
-    Uint32 data2;
-    Uint32 len;
-    Ss_OPEN_COMREQ() {
-      m_sendREQ = (SsFUNCREQ)&TrpmanProxy::sendOPEN_COMREQ;
-      m_sendCONF = (SsFUNCREP)&TrpmanProxy::sendOPEN_COMCONF;
-    }
-    enum { poolSize = MAX_NODES };
-    static SsPool<Ss_OPEN_COMREQ>& pool(LocalProxy* proxy) {
-      return ((TrpmanProxy*)proxy)->c_ss_OPEN_COMREQ;
-    }
-  };
-  SsPool<Ss_OPEN_COMREQ> c_ss_OPEN_COMREQ;
-  void execOPEN_COMREQ(Signal *signal);
-  void sendOPEN_COMREQ(Signal*, Uint32 ssId, SectionHandle*);
-  void execOPEN_COMCONF(Signal *signal);
-  void sendOPEN_COMCONF(Signal*, Uint32 ssId);
+  // GSN_OPEN_COMORD
+  void execOPEN_COMORD(Signal *signal);
 
   // GSN_CLOSE_COMREQ
   struct Ss_CLOSE_COMREQ : SsParallel {
@@ -118,4 +100,5 @@ public:
 protected:
   virtual SimulatedBlock* newWorker(Uint32 instanceNo);
 };
+
 #endif

=== modified file 'storage/ndb/src/kernel/error/ErrorReporter.cpp'
--- a/storage/ndb/src/kernel/error/ErrorReporter.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/error/ErrorReporter.cpp	revid:mikael.ronstrom@stripped
@@ -213,9 +213,6 @@ ErrorReporter::handleAssert(const char* 
 #ifdef NO_EMULATED_JAM
   BaseString::snprintf(refMessage, 100, "file: %s lineNo: %d",
 	   file, line);
-  jam = NULL;
-  jamIndex = 0;
-  jamBlockNumber = 0;
 #else
   const EmulatedJamBuffer *jamBuffer =
     (EmulatedJamBuffer *)NdbThread_GetTlsKey(NDB_THREAD_TLS_JAM);

=== modified file 'storage/ndb/src/kernel/ndbd.cpp'
--- a/storage/ndb/src/kernel/ndbd.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/ndbd.cpp	revid:mikael.ronstrom@stripped
@@ -29,6 +29,8 @@
 
 #include "ndbd.hpp"
 
+#include <TransporterRegistry.hpp>
+
 #include <ConfigRetriever.hpp>
 #include <LogLevel.hpp>
 
@@ -607,7 +609,6 @@ ndbd_run(bool foreground, int report_fd,
   }
 
   theConfig->setupConfiguration();
-  globalTransporterRegistry.assignRecvThreads(globalData.ndbMtReceiveThreads);
 
   if (get_multithreaded_config(globalEmulatorData))
     ndbd_exit(-1);

=== modified file 'storage/ndb/src/kernel/vm/ArrayPool.hpp'
--- a/storage/ndb/src/kernel/vm/ArrayPool.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/ArrayPool.hpp	revid:mikael.ronstrom@stripped
@@ -261,7 +261,7 @@ protected:
    * Protect here means to have them on separate CPU cache lines to
    * avoid false CPU cache line sharing.
    */
-  char protect_read_var[64 - (sizeof(Uint32) + sizeof(void*))];
+  char protect_read_var[NDB_CL_PADSZ(sizeof(Uint32) + sizeof(void*))];
   Uint32 firstFree;
   Uint32 noOfFree;
   Uint32 noOfFreeMin;

=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp	revid:mikael.ronstrom@stripped
@@ -17,6 +17,7 @@
 
 #include <ndb_global.h>
 
+#include <TransporterRegistry.hpp>
 #include "Configuration.hpp"
 #include <ErrorHandlingMacros.hpp>
 #include "GlobalData.hpp"
@@ -470,7 +471,11 @@ Configuration::setupConfiguration(){
       break;
 
     globalData.ndbMtTcThreads = m_thr_config.getThreadCount(THRConfig::T_TC);
-    globalData.ndbMtSendThreads = m_thr_config.getThreadCount(THRConfig::T_SEND);
+    globalData.ndbMtSendThreads =
+      m_thr_config.getThreadCount(THRConfig::T_SEND);
+    globalData.ndbMtReceiveThreads =
+      m_thr_config.getThreadCount(THRConfig::T_RECV);
+
     globalData.isNdbMtLqh = true;
     {
       if (m_thr_config.getMtClassic())
@@ -503,10 +508,6 @@ Configuration::setupConfiguration(){
 
     globalData.ndbMtLqhWorkers = workers;
     globalData.ndbMtLqhThreads = threads;
-    globalData.ndbMtSendThreads =
-      m_thr_config.getThreadCount(THRConfig::T_SEND);
-    globalData.ndbMtReceiveThreads =
-      m_thr_config.getThreadCount(THRConfig::T_RECV);
   } while (0);
 
   calcSizeAlt(cf);
@@ -681,8 +682,9 @@ Configuration::calcSizeAlt(ConfigValues 
   {
     lqhInstances = globalData.ndbMtLqhWorkers;
   }
+
   Uint32 tcInstances = 1;
-  if (globalData.ndbMtTcThreads)
+  if (globalData.ndbMtTcThreads > 1)
   {
     tcInstances = globalData.ndbMtTcThreads;
   }
@@ -814,7 +816,7 @@ Configuration::calcSizeAlt(ConfigValues 
     noOfLocalScanRecords = (noOfDBNodes * noOfScanRecords) + 
 #else
     noOfLocalScanRecords = tcInstances * lqhInstances *
-                           (noOfDBNodes * noOfScanRecords) +
+      (noOfDBNodes * noOfScanRecords) +
 #endif
       1 /* NR */ + 
       1 /* LCP */; 

=== modified file 'storage/ndb/src/kernel/vm/Emulator.cpp'
--- a/storage/ndb/src/kernel/vm/Emulator.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/Emulator.cpp	revid:mikael.ronstrom@stripped
@@ -79,7 +79,12 @@ EmulatorData::create(){
     Global jam() buffer, for non-multithreaded operation.
     For multithreaded ndbd, each thread will set a local jam buffer later.
   */
-  NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, (void *)&theEmulatedJamBuffer);
+#ifndef NO_EMULATED_JAM
+  void * jamBuffer = (void *)&theEmulatedJamBuffer;
+#else
+  void * jamBuffer = 0;
+#endif
+  NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, jamBuffer);
 
   NdbMem_Create();
 

=== modified file 'storage/ndb/src/kernel/vm/Emulator.hpp'
--- a/storage/ndb/src/kernel/vm/Emulator.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/Emulator.hpp	revid:mikael.ronstrom@stripped
@@ -26,7 +26,6 @@
 //
 //===========================================================================
 #include <kernel_types.h>
-#include <TransporterRegistry.hpp>
 
 extern class  JobTable            globalJobTable;
 extern class  TimeQueue           globalTimeQueue;
@@ -38,18 +37,17 @@ extern struct GlobalData          global
 extern class SignalLoggerManager globalSignalLoggers;
 #endif
 
-#ifndef NO_EMULATED_JAM
 /* EMULATED_JAM_SIZE must be a power of two, so JAM_MASK will work. */
 #define EMULATED_JAM_SIZE 1024
 #define JAM_MASK (EMULATED_JAM_SIZE - 1)
 
-struct EmulatedJamBuffer {
+struct EmulatedJamBuffer
+{
   Uint32 theEmulatedJamIndex;
   // last block entry, used in dumpJam() if jam contains no block entries
   Uint32 theEmulatedJamBlockNumber;
   Uint32 theEmulatedJam[EMULATED_JAM_SIZE];
 };
-#endif
 
 struct EmulatorData {
   class Configuration * theConfiguration;

=== modified file 'storage/ndb/src/kernel/vm/GlobalData.hpp'
--- a/storage/ndb/src/kernel/vm/GlobalData.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/GlobalData.hpp	revid:mikael.ronstrom@stripped
@@ -41,9 +41,10 @@ enum  restartStates {initial_state, 
                      perform_stop};
 
 struct GlobalData {
+  Uint32     m_hb_count[MAX_NODES];   // hb counters
   NodeInfo   m_nodeInfo[MAX_NODES];   // At top to ensure cache alignment
   Signal     VMSignals[1];            // Owned by FastScheduler::
-  Uint32     m_restart_seq;           // 
+  Uint32     m_restart_seq;           //
   NodeVersionInfo m_versionInfo;
   
   Uint64     internalMillisecCounter; // Owned by ThreadConfig::
@@ -91,6 +92,7 @@ struct GlobalData {
     ndbMtSendThreads = 0;
     ndbMtReceiveThreads = 0;
     ndbLogParts = 0;
+    bzero(m_hb_count, sizeof(m_hb_count));
 #ifdef GCP_TIMER_HACK
     gcp_timer_limit = 0;
 #endif
@@ -107,7 +109,18 @@ struct GlobalData {
   
   void           incrementWatchDogCounter(Uint32 place);
   Uint32 * getWatchDogPtr();
-  
+
+  Uint32 getBlockThreads() const {
+    return ndbMtLqhThreads + ndbMtTcThreads + ndbMtReceiveThreads;
+  }
+
+  Uint32 get_hb_count(Uint32 nodeId) const {
+    return m_hb_count[nodeId];
+  }
+
+  Uint32& set_hb_count(Uint32 nodeId) {
+    return m_hb_count[nodeId];
+  }
 private:
   Uint32     watchDog;
   SimulatedBlock* blockTable[NO_OF_BLOCKS]; // Owned by Dispatcher::

=== modified file 'storage/ndb/src/kernel/vm/SectionReader.cpp'
--- a/storage/ndb/src/kernel/vm/SectionReader.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/SectionReader.cpp	revid:mikael.ronstrom@stripped
@@ -17,7 +17,6 @@
 
 
 #include <SectionReader.hpp>
-#include <TransporterDefinitions.hpp>
 #include "LongSignal.hpp"
 
 #if 0

=== modified file 'storage/ndb/src/kernel/vm/SimplePropertiesSection.cpp'
--- a/storage/ndb/src/kernel/vm/SimplePropertiesSection.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/SimplePropertiesSection.cpp	revid:mikael.ronstrom@stripped
@@ -17,7 +17,6 @@
 */
 
 #include <SimpleProperties.hpp>
-#include <TransporterDefinitions.hpp>
 #include "LongSignal.hpp"
 #include "LongSignalImpl.hpp"
 #include "SimulatedBlock.hpp"

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	revid:mikael.ronstrom@stripped
@@ -161,15 +161,20 @@ SimulatedBlock::initCommon()
   this->getParam("FragmentInfoHash", &count);
   c_fragmentInfoHash.setSize(count);
 
-  count = MAX_THREADS_IN_BLOCK + 1;
+  Uint32 def = 5;
+#ifdef NDBD_MULTITHREADED
+  def += globalData.getBlockThreads();
+#endif
+
+  count = def;
   this->getParam("ActiveMutexes", &count);
   c_mutexMgr.setSize(count);
-  
-  count = MAX_THREADS_IN_BLOCK + 1;
+
+  count = def;
   this->getParam("ActiveCounters", &count);
   c_counterMgr.setSize(count);
 
-  count = MAX_THREADS_IN_BLOCK + 1;
+  count = def;
   this->getParam("ActiveThreadSync", &count);
   c_syncThreadPool.setSize(count);
 }

=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp	revid:mikael.ronstrom@stripped
@@ -17,7 +17,6 @@
 
 #include <ndb_global.h>
 
-#include <TransporterCallback.hpp>
 #include <TransporterRegistry.hpp>
 #include <FastScheduler.hpp>
 #include <Emulator.hpp>
@@ -80,17 +79,10 @@ const char *lookupConnectionError(Uint32
 #ifndef NDBD_MULTITHREADED
 extern TransporterRegistry globalTransporterRegistry; // Forward declaration
 
-class TransporterCallbackKernelNonMT : public TransporterCallbackKernel
+class TransporterCallbackKernelNonMT :
+  public TransporterCallback,
+  public TransporterReceiveHandleKernel
 {
-  /**
-   * Check to see if jobbbuffers are starting to get full
-   * and if so call doJob
-   */
-  int checkJobBuffer(Uint32 recv_thread_id)
-  {
-    (void)recv_thread_id;
-    return globalScheduler.checkDoJob();
-  }
   void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
   Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max)
   {
@@ -110,7 +102,8 @@ class TransporterCallbackKernelNonMT : p
   }
 };
 static TransporterCallbackKernelNonMT myTransporterCallback;
-TransporterRegistry globalTransporterRegistry(&myTransporterCallback);
+TransporterRegistry globalTransporterRegistry(&myTransporterCallback,
+                                              &myTransporterCallback);
 #endif
 
 #ifdef NDBD_MULTITHREADED
@@ -138,20 +131,17 @@ mt_set_section_chunk_size()
 #else
 void mt_init_receiver_cache(){}
 void mt_set_section_chunk_size(){}
-void init_receiver_cache(){}
 #endif
 
 void
-TransporterCallbackKernel::deliver_signal(SignalHeader * const header,
-                                          Uint8 prio,
-                                          Uint32 * const theData,
-                                          LinearSectionPtr ptr[3],
-                                          Uint32 recvThreadId,
-                                          Uint32 recvThreadNum)
+TransporterReceiveHandleKernel::deliver_signal(SignalHeader * const header,
+                                               Uint8 prio,
+                                               Uint32 * const theData,
+                                               LinearSectionPtr ptr[3])
 {
 #ifdef NDBD_MULTITHREADED
-  assert(recvThreadId < MAX_NDBMT_RECEIVE_THREADS);
-  SectionSegmentPool::Cache & cache = g_receiver_thread_cache[recvThreadId].cache_instance;
+  SectionSegmentPool::Cache & cache =
+    g_receiver_thread_cache[m_receiver_thread_idx].cache_instance;
 #endif
 
   const Uint32 secCount = header->m_noOfSections;
@@ -201,10 +191,10 @@ TransporterCallbackKernel::deliver_signa
     globalScheduler.execute(header, prio, theData, secPtrI);  
 #else
     if (prio == JBB)
-      sendlocal(recvThreadNum,
+      sendlocal(m_thr_no /* self */,
                 header, theData, secPtrI);
     else
-      sendprioa(recvThreadNum,
+      sendprioa(m_thr_no /* self */,
                 header, theData, secPtrI);
 
 #endif
@@ -238,12 +228,11 @@ TransporterCallbackKernel::deliver_signa
   globalScheduler.execute(header, prio, theData, secPtrI);    
 #else
   if (prio == JBB)
-    sendlocal(recvThreadNum,
+    sendlocal(m_thr_no /* self */,
               header, theData, NULL);
   else
-    sendprioa(recvThreadNum,
+    sendprioa(m_thr_no /* self */,
               header, theData, NULL);
-    
 #endif
 }
 
@@ -254,9 +243,9 @@ operator<<(NdbOut& out, const SectionSeg
 }
 
 void
-TransporterCallbackKernel::reportError(NodeId nodeId,
-                                       TransporterError errorCode,
-                                       const char *info)
+TransporterReceiveHandleKernel::reportError(NodeId nodeId,
+                                            TransporterError errorCode,
+                                            const char *info)
 {
 #ifdef DEBUG_TRANSPORTER
   ndbout_c("reportError (%d, 0x%x) %s", nodeId, errorCode, info ? info : "");
@@ -322,7 +311,7 @@ TransporterCallbackKernel::reportError(N
   Uint32 secPtr[3];
   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
 #else
-  sendprioa(globalTransporterRegistry.getReceiveThreadNum(nodeId),
+  sendprioa(m_thr_no /* self */,
             &signal.header, signal.theData, NULL);
 #endif
 
@@ -360,7 +349,7 @@ TransporterCallbackKernelNonMT::reportSe
  * Report average receive length in bytes (4096 last receives)
  */
 void
-TransporterCallbackKernel::reportReceiveLen(NodeId nodeId, Uint32 count,
+TransporterReceiveHandleKernel::reportReceiveLen(NodeId nodeId, Uint32 count,
                                             Uint64 bytes)
 {
 
@@ -380,7 +369,7 @@ TransporterCallbackKernel::reportReceive
   Uint32 secPtr[3];
   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
 #else
-  sendprioa(globalTransporterRegistry.getReceiveThreadNum(nodeId),
+  sendprioa(m_thr_no /* self */,
             &signal.header, signal.theData, NULL);
 #endif
 }
@@ -390,17 +379,16 @@ TransporterCallbackKernel::reportReceive
  */
 
 void
-TransporterCallbackKernel::reportConnect(NodeId nodeId)
+TransporterReceiveHandleKernel::reportConnect(NodeId nodeId)
 {
 
   SignalT<1> signal;
   memset(&signal.header, 0, sizeof(signal.header));
 
-#ifdef NDBD_MULTITHREADED
-  Uint32 trpman_instance = 1 +
-    globalTransporterRegistry.getReceiveThreadId(nodeId);
+#ifndef NDBD_MULTITHREADED
+  Uint32 trpman_instance = 1;
 #else
-  Uint32 trpman_instance = 0;
+  Uint32 trpman_instance = 1 /* proxy */ + m_receiver_thread_idx;
 #endif
   signal.header.theLength = 1;
   signal.header.theSendersSignalId = 0;
@@ -409,7 +397,7 @@ TransporterCallbackKernel::reportConnect
   signal.header.theVerId_signalNumber = GSN_CONNECT_REP;
 
   signal.theData[0] = nodeId;
-  
+
 #ifndef NDBD_MULTITHREADED
   Uint32 secPtr[3];
   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
@@ -418,7 +406,7 @@ TransporterCallbackKernel::reportConnect
    * The first argument to sendprioa is from which thread number this
    * signal is sent, it is always sent from a receive thread
    */
-  sendprioa(globalTransporterRegistry.getReceiveThreadNum(nodeId),
+  sendprioa(m_thr_no /* self */,
             &signal.header, signal.theData, NULL);
 #endif
 }
@@ -427,18 +415,17 @@ TransporterCallbackKernel::reportConnect
  * Report connection broken
  */
 void
-TransporterCallbackKernel::reportDisconnect(NodeId nodeId, Uint32 errNo)
+TransporterReceiveHandleKernel::reportDisconnect(NodeId nodeId, Uint32 errNo)
 {
   DBUG_ENTER("reportDisconnect");
 
   SignalT<sizeof(DisconnectRep)/4> signal;
   memset(&signal.header, 0, sizeof(signal.header));
 
-#ifdef NDBD_MULTITHREADED
-  Uint32 trpman_instance = 1 +
-    globalTransporterRegistry.getReceiveThreadId(nodeId);
+#ifndef NDBD_MULTITHREADED
+  Uint32 trpman_instance = 1;
 #else
-  Uint32 trpman_instance = 0;
+  Uint32 trpman_instance = 1 /* proxy */ + m_receiver_thread_idx;
 #endif
   signal.header.theLength = DisconnectRep::SignalLength;
   signal.header.theSendersSignalId = 0;
@@ -455,7 +442,7 @@ TransporterCallbackKernel::reportDisconn
   Uint32 secPtr[3];
   globalScheduler.execute(&signal.header, JBA, signal.theData, secPtr);
 #else
-  sendprioa(globalTransporterRegistry.getReceiveThreadNum(nodeId),
+  sendprioa(m_thr_no /* self */,
             &signal.header, signal.theData, NULL);
 #endif
 
@@ -487,9 +474,46 @@ SignalLoggerManager::printSegmentedSecti
     putc('\n', output);
 }
 
+/**
+ * Check to see if jobbbuffers are starting to get full
+ * and if so call doJob
+ */
+int
+TransporterReceiveHandleKernel::checkJobBuffer()
+{
+#ifndef NDBD_MULTITHREADED
+  return globalScheduler.checkDoJob();
+#else
+  return mt_checkDoJob(m_receiver_thread_idx);
+#endif
+}
+
 void
-TransporterCallbackKernel::transporter_recv_from(NodeId nodeId)
+TransporterReceiveHandleKernel::assign_nodes(Uint32 *recv_thread_idx_array)
 {
-  globalData.m_nodeInfo[nodeId].m_heartbeat_cnt= 0;
+  m_transporters.clear(); /* Clear all first */
+  for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++)
+  {
+    if (recv_thread_idx_array[nodeId] == m_receiver_thread_idx)
+      m_transporters.set(nodeId); /* Belongs to our receive thread */
+  }
   return;
 }
+
+void
+TransporterReceiveHandleKernel::transporter_recv_from(NodeId nodeId)
+{
+  if (globalData.get_hb_count(nodeId) != 0)
+  {
+    globalData.set_hb_count(nodeId) = 0;
+  }
+}
+
+#ifndef NDBD_MULTITHREADED
+class TransporterReceiveHandle *
+mt_get_trp_receive_handle(unsigned instance)
+{
+  assert(instance == 0);
+  return &myTransporterCallback;
+}
+#endif

=== modified file 'storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/TransporterCallbackKernel.hpp	revid:mikael.ronstrom@stripped
@@ -1,4 +1,4 @@
-/* Copyright (C) 2008 MySQL AB
+/* Copyright (c) 2008, 2012, Oracle and/or its affiliates. All rights reserved.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -13,21 +13,45 @@
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
-class TransporterCallbackKernel: public TransporterCallback
+#ifndef TRANSPORTER_CALLBACK_KERNEL_HPP
+#define TRANSPORTER_CALLBACK_KERNEL_HPP
+
+#include <TransporterCallback.hpp>
+
+class TransporterReceiveHandleKernel
+  : public TransporterReceiveHandle
 {
 public:
+#ifdef NDBD_MULTITHREADED
+  TransporterReceiveHandleKernel(Uint32 thr_no, Uint32 recv_thr_no) :
+    m_thr_no(thr_no), m_receiver_thread_idx(recv_thr_no) {}
+
+  /**
+   * m_thr_no == index in m_thr_data[]
+   */
+  Uint32 m_thr_no;
+
+  /**
+   * m_receiver_thread_idx == m_thr_no - firstReceiverThread ==
+   *   instance() - 1(proxy)
+   */
+  Uint32 m_receiver_thread_idx;
+#endif
+
   /* TransporterCallback interface. */
   void deliver_signal(SignalHeader * const header,
                       Uint8 prio,
                       Uint32 * const signalData,
-                      LinearSectionPtr ptr[3],
-                      Uint32 recvThreadId,
-                      Uint32 recvThreadNum);
+                      LinearSectionPtr ptr[3]);
   void reportReceiveLen(NodeId nodeId, Uint32 count, Uint64 bytes);
   void reportConnect(NodeId nodeId);
   void reportDisconnect(NodeId nodeId, Uint32 errNo);
   void reportError(NodeId nodeId, TransporterError errorCode,
                    const char *info = 0);
   void transporter_recv_from(NodeId node);
-  virtual ~TransporterCallbackKernel() { }
+  void assign_nodes(Uint32 *recv_thread_idx_array);
+  int checkJobBuffer();
+  virtual ~TransporterReceiveHandleKernel() { }
 };
+
+#endif

=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/mt.cpp	revid:mikael.ronstrom@stripped
@@ -1,4 +1,4 @@
-/* Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
+/* Copyright (c) 2008, 2011, 2012, Oracle and/or its affiliates. All rights reserved.
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
@@ -15,6 +15,8 @@
 
 #include <ndb_global.h>
 
+#define NDBD_MULTITHREADED
+
 #include <VMSignal.hpp>
 #include <kernel_types.h>
 #include <Prio.hpp>
@@ -24,6 +26,7 @@
 #include <GlobalData.hpp>
 #include <WatchDog.hpp>
 #include <TransporterDefinitions.hpp>
+#include <TransporterRegistry.hpp>
 #include "FastScheduler.hpp"
 #include "mt.hpp"
 #include <DebuggerNames.hpp>
@@ -49,9 +52,6 @@ GlobalData::mt_getBlock(BlockNumber bloc
 #define memcpy __builtin_memcpy
 #endif
 
-/* size of a cacheline */
-#define NDB_CL 64
-
 /* Constants found by benchmarks to be reasonable values. */
 
 /* Maximum number of signals to execute before sending to remote nodes. */
@@ -358,212 +358,6 @@ trylock(struct thr_spin_lock<SZ>* sl)
 #define thr_spin_lock thr_mutex
 #endif
 
-#if defined(HAVE_GCC_ATOMIC_BUILTINS) && defined(USE_FUTEX)
-
-/**
- * Adaptive locks: Combine spinlocks with FUTEX locks
- *
- * Has these properties:
- *  1) There is a 'fast track' not requiring any systems calls,
- *     where an atomic CAS (Compare And Swap) take or release
- *     the lock in the normal (?) uncontended case.
- *     
- *  2) If this fails, we are allowed to spinlock upto
- *     LOCK_MAX_SPINS waiting for the lock to become free.
- *     The rational here is to try to avoid expensive context
- *     switches.
- *
- *  3) If even this fail, FUTEXes are used to suspend/resume
- *     the thread.
- *
- * In addition there is the 'adaptive' part where we detect
- * in 2) that there are other lock waiters which failed to grab
- * lock while spining. (Lock is 'contended'). In these cases
- * other lock waiters will skip part 2),
- * and go directly from 1) -> 3).
- */
-
-/**
- * State transitions for lock is:
- * LOCK_FREE -> LOCK_TAKEN -> LOCK_CONTENDED -> LOCK_FREE
- * NOTE: It should never go directly from  LOCK_CONTENDED -> LOCK_TAKEN
- */
-#define LOCK_FREE      0   // Lock is free
-#define LOCK_TAKEN     1   // Lock taken, there might be spinlock waiters
-#define LOCK_CONTENDED 2   // Lock taken, and spinlock waiters might have suspended
-
-#define LOCK_MAX_SPINS (100*1000)
-
-class Adaptive_Lock
-{
-public:
-  void init()
-  {
-    m_state = LOCK_FREE;
-  }
-
-  void lock()
-  {
-    // Atomically set LOCK_TAKEN iff it was LOCK_FREE
-    Uint32 state = __sync_val_compare_and_swap(&m_state, LOCK_FREE, LOCK_TAKEN);
-    if (likely(state==LOCK_FREE))
-    {
-      return; // Immediate success
-    }
-    /**
-     * Has to wait for LOCK_FREE, we either:
-     */
-    if (likely(state==LOCK_TAKEN))
-    {
-      /**
-       * Spinlock for a while until it's LOCK_FREE, or giving up
-       * and declare LOCK_CONTENDED where we suspend this thread.
-       */
-      lock_wait();
-    }
-    else
-    {
-      /**
-       * If lock already was in a LOCK_CONTENDED state,
-       * we suspend directly.
-       */
-      lock_contended();
-    }
-  }
-
-  int trylock()
-  {
-    // Atomically set LOCK_TAKEN iff it was LOCK_FREE
-    if (likely(__sync_bool_compare_and_swap(&m_state, LOCK_FREE, LOCK_TAKEN)))
-    {
-      return 0; // success
-    }
-    return 1;
-  }
-
-  void unlock()
-  {
-    // We own the lock, so it's either LOCK_TAKEN or LOCK_CONTENDED
-    int state = __sync_fetch_and_sub(&m_state,1);
-    assert(state != LOCK_FREE);
-    if (unlikely(state == LOCK_CONTENDED))
-    {
-      // Important: we are still holding the lock!
-      m_state = LOCK_FREE; // not any more
-      // Wake up one thread (no fairness assumed)
-      futex_wake(&m_state);
-    }
-    // Else: There was no suspended waiters to wakeup
-  }
-
-private:
-  void lock_wait();
-  void lock_contended();
-
-  volatile unsigned int m_state; // LOCK_xxx state
-}; // class Adaptive_Lock
-
-
-ATTRIBUTE_NOINLINE
-void Adaptive_Lock::lock_wait()
-{
-  Uint32 spins = 0;
-  do
-  {
-    /**
-     * Inner part of loop is expected to run entirely in local L1 cache
-     * *not* writing or CAS'ing as this may steal memory bandwidth
-     * and invalidate cachelines in other L1 / L2 caches.
-     */
-    do
-    {
-      cpu_pause();
-      if (spins++ >= LOCK_MAX_SPINS)
-      {
-        // We give up waiting for it to become free, will suspend
-        lock_contended();
-        return;
-      }
-    } while (m_state != LOCK_FREE);
-
-    // Someone may steal the lock before our CAS!
-  } while(!__sync_bool_compare_and_swap(&m_state, LOCK_FREE, LOCK_TAKEN));
-
-  // LOCK_FREE -> LOCK_TAKEN succeeded -> Got lock
-}
-
-ATTRIBUTE_NOINLINE
-void Adaptive_Lock::lock_contended()
-{
-  /**
-   * Assume lock is still taken, try to make it LOCK_CONTENDED and wait
-   */
-  do
-  {
-    if (m_state == LOCK_CONTENDED ||
-        __sync_bool_compare_and_swap(&m_state, LOCK_TAKEN, LOCK_CONTENDED))
-    {
-      // let's wait, but only if the value is still LOCK_CONTENDED
-      futex_wait(&m_state, LOCK_CONTENDED, NULL);
-    }
-
-    /**
-     * Try (again) assuming the lock has become LOCK_FREE
-     * However, it could have been a spurious wakeup, or some
-     * other can steal the lock before we have CAS'ed it.
-     */
-  } while (!__sync_bool_compare_and_swap(&m_state, LOCK_FREE, LOCK_CONTENDED));
-
-  // LOCK_FREE -> LOCK_CONTENDED succeeded -> Got lock
-}
-
-template <unsigned SZ>
-struct thr_adaptive_lock
-{
-  thr_adaptive_lock(const char * name = 0)
-  {
-    m_mutex.init();
-    register_lock(this, name);
-  }
-
-  union {
-    Adaptive_Lock m_mutex;
-    char pad[SZ];
-  };
-};
-
-template <unsigned SZ>
-static
-inline
-void
-lock(struct thr_adaptive_lock<SZ>* sl)
-{
-  sl->m_mutex.lock();
-}
-
-template <unsigned SZ>
-static
-inline
-void
-unlock(struct thr_adaptive_lock<SZ>* sl)
-{
-  sl->m_mutex.unlock();
-}
-
-template <unsigned SZ>
-static
-inline
-int
-trylock(struct thr_adaptive_lock<SZ> * sl)
-{
-  return sl->m_mutex.trylock();
-}
-
-#else
-#define thr_adaptive_lock thr_mutex
-#endif // HAVE_GCC_ATOMIC_BUILTINS && USE_FUTEX
-
-
 template <unsigned SZ>
 struct thr_mutex
 {
@@ -1209,12 +1003,11 @@ struct mt_send_handle  : public Transpor
   virtual bool forceSend(NodeId node);
 };
 
-struct trp_callback : public TransporterCallbackKernel
+struct trp_callback : public TransporterCallback
 {
   trp_callback() {}
 
   /* Callback interface. */
-  int checkJobBuffer(Uint32 recv_thread_id);
   void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
   void lock_transporter(NodeId node);
   void unlock_transporter(NodeId node);
@@ -2431,10 +2224,10 @@ trp_callback::unlock_transporter(NodeId 
 }
 
 int
-trp_callback::checkJobBuffer(Uint32 recv_thread_id)
+mt_checkDoJob(Uint32 recv_thread_idx)
 {
   struct thr_repository* rep = &g_thr_repository;
-  if (unlikely(check_job_buffers(rep, recv_thread_id)))
+  if (unlikely(check_job_buffers(rep, recv_thread_idx)))
   {
     do 
     {
@@ -2465,7 +2258,7 @@ trp_callback::checkJobBuffer(Uint32 recv
       NdbSleep_MilliSleep(0);
 #endif
 
-    } while (check_job_buffers(rep, recv_thread_id));
+    } while (check_job_buffers(rep, recv_thread_idx));
   }
 
   return 0;
@@ -3579,7 +3372,7 @@ aligned_signal(unsigned char signal_buf[
 }
 
 /*
- * We only do receive in receiver threads, no other threads do receive.
+ * We only do receive in receiver thread(s), no other threads do receive.
  *
  * As part of the receive loop, we also periodically call update_connections()
  * (this way we are similar to single-threaded ndbd).
@@ -3588,6 +3381,14 @@ aligned_signal(unsigned char signal_buf[
  * receive loop; this way we avoid races between update_connections() and
  * TRPMAN calls into the transporters.
  */
+
+/**
+ * Array of pointers to TransporterReceiveHandleKernel
+ *   these are not used "in traffic"
+ */
+TransporterReceiveHandleKernel *
+  g_trp_receive_handle_ptr[MAX_NDBMT_RECEIVE_THREADS];
+
 extern "C"
 void *
 mt_receiver_thread_main(void *thr_arg)
@@ -3599,24 +3400,32 @@ mt_receiver_thread_main(void *thr_arg)
   unsigned thr_no = selfptr->m_thr_no;
   Uint32& watchDogCounter = selfptr->m_watchdog_counter;
   Uint32 thrSignalId = 0;
-  Uint32 recv_thread_id = thr_no - first_receiver_thread_no;
+  Uint32 recv_thread_idx = thr_no - first_receiver_thread_no;
   bool has_received = false;
   int cnt = 0;
-  NodeBitmask mask;
 
-  mask.clear();
-  /* Register our own bitmask to avoid false CPU cache sharing */
-  globalTransporterRegistry.registerBitmaskPtr(recv_thread_id, &mask);
   init_thread(selfptr);
   signal = aligned_signal(signal_buf, thr_no);
 
-  ndbout_c("Starting receiver thread id %u", recv_thread_id);
+  ndbout_c("Starting receiver thread id %u", recv_thread_idx);
+  /**
+   * Object that keeps track of our pollReceive-state
+   */
+  TransporterReceiveHandleKernel recvdata(thr_no, recv_thread_idx);
+  recvdata.assign_nodes(g_nodes_to_thr_map);
+  globalTransporterRegistry.init(recvdata);
+
+  /**
+   * Save pointer to this for management/error-insert
+   */
+  g_trp_receive_handle_ptr[recv_thread_idx] = &recvdata;
+
   while (globalData.theRestartFlag != perform_stop)
   {
     if (cnt == 0)
     {
       watchDogCounter = 5;
-      globalTransporterRegistry.update_connections(recv_thread_id);
+      globalTransporterRegistry.update_connections(recvdata);
     }
     cnt = (cnt + 1) & 15;
 
@@ -3638,14 +3447,14 @@ mt_receiver_thread_main(void *thr_arg)
     watchDogCounter = 7;
 
     has_received = false;
-    if (globalTransporterRegistry.pollReceive(1, recv_thread_id))
+    if (globalTransporterRegistry.pollReceive(1, recvdata))
     {
-      if (check_job_buffers(rep, recv_thread_id) == 0)
+      if (check_job_buffers(rep, recv_thread_idx) == 0)
       {
 	watchDogCounter = 8;
-        lock(&rep->m_receive_lock[recv_thread_id]);
-        globalTransporterRegistry.performReceive(recv_thread_id);
-        unlock(&rep->m_receive_lock[recv_thread_id]);
+        lock(&rep->m_receive_lock[recv_thread_idx]);
+        globalTransporterRegistry.performReceive(recvdata);
+        unlock(&rep->m_receive_lock[recv_thread_idx]);
         has_received = true;
       }
     }
@@ -4325,7 +4134,6 @@ ThreadConfig::init()
   Uint32 num_recv_threads = globalData.ndbMtReceiveThreads;
   first_receiver_thread_no =
     NUM_MAIN_THREADS + num_tc_threads + num_lqh_threads;
-  globalTransporterRegistry.setReceiveThreadNum(first_receiver_thread_no);
   num_threads = first_receiver_thread_no + num_recv_threads;
   require(num_threads <= MAX_BLOCK_THREADS);
 
@@ -4348,6 +4156,37 @@ setcpuaffinity(struct thr_repository* re
   }
 }
 
+/**
+ * Array for mapping nodes to receiver threads
+ */
+static NodeId g_node_to_recv_thr_map[MAX_NODES];
+
+static
+void
+mt_assign_receiver_threads(void)
+{
+  Uint32 num_recv_threads = globalData.ndbMtReceiveThreads;
+  Uint32 recv_thread_idx = 0;
+  for (nodeId = 0; nodeId < MAX_NODES; nodeId++)
+  {
+    Transporter *node_trp =
+      globalTransporterRegistry.get_transporter(nodeId);
+    if (node_trp)
+    {
+      g_node_to_recv_thr_map[nodeId] = recv_thread_idx;
+      recv_thread_idx++;
+      if (recv_thread_idx == num_recv_threads)
+        recv_thread_idx = 0;
+    }
+    else
+    {
+      /* Flag for no transporter */
+      g_node_to_recv_thr_map[nodeId] = MAX_NODES;
+    }
+  }
+  return;
+}
+
 void
 ThreadConfig::ipControlLoop(NdbThread* pThis, Uint32 thread_index)
 {
@@ -4377,7 +4216,10 @@ ThreadConfig::ipControlLoop(NdbThread* p
   /*
    * Start threads for all execution threads, except for the receiver
    * thread, which runs in the main thread.
+   * 
+   * Assign nodes to receiver threads before starting any threads.
    */
+  mt_assign_receiver_threads();
   for (thr_no = 0; thr_no < num_threads; thr_no++)
   {
     rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();
@@ -4979,6 +4821,16 @@ mt_get_thr_stat(class SimulatedBlock * b
   dst->local_sent_priob = selfptr->m_stat.m_priob_count;
 }
 
+TransporterReceiveHandle *
+mt_get_trp_receive_handle(unsigned instance)
+{
+  assert(instance > 0 && instance <= MAX_NDBMT_RECEIVE_THREADS);
+  if (instance > 0 && instance <= MAX_NDBMT_RECEIVE_THREADS)
+  {
+    return g_trp_receive_handle_ptr[instance - 1 /* proxy */];
+  }
+  return 0;
+}
 
 /**
  * Global data
@@ -4987,4 +4839,5 @@ struct thr_repository g_thr_repository;
 
 struct trp_callback g_trp_callback;
 
-TransporterRegistry globalTransporterRegistry(&g_trp_callback, false);
+TransporterRegistry globalTransporterRegistry(&g_trp_callback, NULL,
+                                              false);

=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/mt.hpp	revid:mikael.ronstrom@stripped
@@ -14,17 +14,11 @@
    along with this program; if not, write to the Free Software
    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
 
-#include <kernel_types.h>
-#include <TransporterDefinitions.hpp>
-
 #ifndef ndb_mt_hpp
 #define ndb_mt_hpp
 
-
-/*
-  For now, we use locks to only have one thread at the time running in the
-  transporter as sender, and only one as receiver.
-*/
+#include <kernel_types.h>
+#include <TransporterDefinitions.hpp>
 
 Uint32 mt_get_instance_count(Uint32 block);
 
@@ -54,6 +48,8 @@ SendStatus mt_send_remote(Uint32 self, c
 void mt_section_lock();
 void mt_section_unlock();
 
+int mt_checkDoJob();
+
 /**
  * Are we (not) multi threaded
  */
@@ -107,4 +103,11 @@ struct ndb_thr_stat
 void
 mt_get_thr_stat(class SimulatedBlock *, ndb_thr_stat* dst);
 
+/**
+ * Get TransporterReceiveHandle for a specific trpman instance
+ *   Currently used for error insert that block/unblock traffic
+ */
+class TransporterReceiveHandle *
+mt_get_trp_receive_handle(unsigned instance);
+
 #endif

=== modified file 'storage/ndb/src/kernel/vm/mt_thr_config.cpp'
--- a/storage/ndb/src/kernel/vm/mt_thr_config.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/kernel/vm/mt_thr_config.cpp	revid:mikael.ronstrom@stripped
@@ -152,222 +152,15 @@ THRConfig::do_parse(unsigned MaxNoOfExec
   case 1:
   case 2:
   case 3:
-    lqhthreads = 1; // Global + receiver + Local + LQH
+    lqhthreads = 1; // TC + receiver + SUMA + LQH
     break;
   case 4:
   case 5:
   case 6:
-    lqhthreads = 2; // Global + receiver + Local + 2 * LQH
-    break;
-  case 7:
-    /* 4 * LQH + receiver + Local(SUMA) + Global */
-    lqhthreads = 4;
-    break;
-  case 8:
-    /* 1 Send + 4 * LQH + receiver + Local(SUMA) + Global */
-    lqhthreads = 4;
-    sendthreads = 1;
-    break;
-  case 9:
-    /* 2 TC + 4 * LQH + Local(SUMA) + Global */
-    lqhthreads = 4;
-    tcthreads = 2;
-    break;
-  case 10:
-    /* 1 Send + 2 TC + 4 * LQH + Local(SUMA) + Global */
-    lqhthreads = 4;
-    tcthreads = 2;
-    sendthreads = 1;
-    break;
-  case 11:
-    /* 2 Send + 2 TC + 4 * LQH + receiver + Local(SUMA) + Global */
-    lqhthreads = 4;
-    sendthreads = 2;
-    tcthreads = 2;
-    break;
-  case 12:
-    /* 2 Send + 3 TC + 4 * LQH + Local(SUMA) + Global */
-    lqhthreads = 4;
-    tcthreads = 3;
-    sendthreads = 2;
-    break;
-  case 13:
-    /* 3 Send + 3 TC + 4 * LQH + Local(SUMA) + Global */
-    lqhthreads = 4;
-    tcthreads = 3;
-    sendthreads = 3;
-    break;
-  case 14:
-    /* 4 Send + 3 TC + 4 * LQH + Local(SUMA) + Global */
-    lqhthreads = 4;
-    tcthreads = 3;
-    sendthreads = 4;
-    break;
-  case 15:
-    /* 2 TC + 2 Send + 8 * LQH + Local + Global + receiver */
-    lqhthreads = 8;
-    sendthreads = 2;
-    tcthreads = 2;
-    break;
-  case 16:
-    /* 3 TC + 2 Send + 8 * LQH + Local + Global + receiver */
-    lqhthreads = 8;
-    sendthreads = 2;
-    tcthreads = 3;
-    break;
-  case 17:
-    /* 3 TC + 3 Send + 8 * LQH + Local + Global + receiver */
-    lqhthreads = 8;
-    sendthreads = 3;
-    tcthreads = 3;
-    break;
-  case 18:
-    /* 4 TC + 3 Send + 8 * LQH + Local + Global + receiver */
-    lqhthreads = 8;
-    sendthreads = 3;
-    tcthreads = 4;
-    break;
-  case 19:
-    /* 4 TC + 4 Send + 8 * LQH + Local + Global + receiver */
-    lqhthreads = 8;
-    sendthreads = 4;
-    tcthreads = 4;
-    break;
-  case 20:
-    /* 5 TC + 4 Send + 8 * LQH + Local + Global + receiver */
-    lqhthreads = 8;
-    sendthreads = 4;
-    tcthreads = 5;
-    break;
-  case 21:
-    /* 6 TC + 4 Send + 8 * LQH + Local + Global + receiver */
-    lqhthreads = 8;
-    sendthreads = 4;
-    tcthreads = 6;
-    break;
-  case 22:
-    /* 6 TC + 5 Send + 8 * LQH + Local + Global + receiver */
-    lqhthreads = 8;
-    sendthreads = 5;
-    tcthreads = 6;
-    break;
-  case 23:
-    /* 6 TC + 6 Send + 8 * LQH + Local + Global + receiver */
-    lqhthreads = 8;
-    sendthreads = 6;
-    tcthreads = 6;
-    break;
-  case 24:
-    /* 6 TC + 7 Send + 8 * LQH + Local + Global + receiver */
-    lqhthreads = 8;
-    sendthreads = 7;
-    tcthreads = 6;
-    break;
-  case 25:
-    /* 6 TC + 8 Send + 8 * LQH + Local + Global + receiver */
-    lqhthreads = 8;
-    sendthreads = 8;
-    tcthreads = 6;
-    break;
-  case 26:
-    /* 7 TC + 8 Send + 8 * LQH + Local + Global + receiver */
-    lqhthreads = 8;
-    sendthreads = 8;
-    tcthreads = 7;
-    break;
-  case 27:
-    /* 4 TC + 4 Send + 16 * LQH + Local + Global + receiver */
-    lqhthreads = 16;
-    sendthreads = 4;
-    tcthreads = 4;
-    break;
-  case 28:
-    /* 5 TC + 4 Send + 16 * LQH + Local + Global + receiver */
-    lqhthreads = 16;
-    sendthreads = 4;
-    tcthreads = 5;
-    break;
-  case 29:
-    /* 6 TC + 4 Send + 16 * LQH + Local + Global + receiver */
-    lqhthreads = 16;
-    sendthreads = 4;
-    tcthreads = 6;
-    break;
-  case 30:
-    /* 7 TC + 4 Send + 16 * LQH + Local + Global + receiver */
-    lqhthreads = 16;
-    sendthreads = 4;
-    tcthreads = 7;
-    break;
-  case 31:
-    /* 8 TC + 4 Send + 16 * LQH + Local + Global + receiver */
-    lqhthreads = 16;
-    sendthreads = 4;
-    tcthreads = 8;
-    break;
-  case 32:
-    /* 8 TC + 5 Send + 16 * LQH + Local + Global + receiver */
-    lqhthreads = 16;
-    sendthreads = 5;
-    tcthreads = 8;
-    break;
-  case 33:
-    /* 8 TC + 6 Send + 16 * LQH + Local + Global + receiver */
-    lqhthreads = 16;
-    sendthreads = 6;
-    tcthreads = 8;
-    break;
-  case 34:
-    /* 8 TC + 7 Send + 16 * LQH + Local + Global + receiver */
-    lqhthreads = 16;
-    sendthreads = 7;
-    tcthreads = 8;
-    break;
-  case 35:
-    /* 8 TC + 8 Send + 16 * LQH + Local + Global + receiver */
-    lqhthreads = 16;
-    sendthreads = 8;
-    tcthreads = 8;
-    break;
-  case 36:
-  case 37:
-  case 38:
-  case 39:
-    /* 12 TC + 8 Send + 16 * LQH + Local + Global + receiver */
-    lqhthreads = 16;
-    sendthreads = 8;
-    tcthreads = 12;
-    break;
-  case 40:
-  case 41:
-  case 42:
-  case 43:
-    /* 12 TC + 12 Send + 16 * LQH + Local + Global + receiver */
-    lqhthreads = 16;
-    sendthreads = 12;
-    tcthreads = 12;
-    break;
-  case 44:
-  case 45:
-  case 46:
-  case 47:
-    /* 12 TC + 12 Send + 16 * LQH + Local + Global + receiver */
-    lqhthreads = 16;
-    sendthreads = 12;
-    tcthreads = 16;
-    break;
-  case 48:
-  case 49:
-  case 50:
-  case 51:
-    /* 16 TC + 12 Send + 16 * LQH + Local + Global + receiver */
-    lqhthreads = 16;
-    sendthreads = 16;
-    tcthreads = 16;
+    lqhthreads = 2; // TC + receiver + SUMA + 2 * LQH
     break;
   default:
-    lqhthreads = 4;
-    assert(false);
+    lqhthreads = 4; // TC + receiver + SUMA + 4 * LQH
   }
 
   if (__ndbmt_lqh_threads)
@@ -1215,7 +1008,7 @@ THRConfigApplier::appendInfo(BaseString&
 const char *
 THRConfigApplier::getName(const unsigned short list[], unsigned cnt) const
 {
-  const THRConfig::T_Thread* thr = find_thread(list, cnt);
+  const T_Thread* thr = find_thread(list, cnt);
   assert(thr != 0);
   return getEntryName(thr->m_type);
 }

=== modified file 'storage/ndb/src/mgmsrv/ConfigInfo.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigInfo.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/mgmsrv/ConfigInfo.cpp	revid:mikael.ronstrom@stripped
@@ -1114,19 +1114,24 @@ const ConfigInfo::ParamInfo ConfigInfo::
     CFG_DB_NO_REDOLOG_PARTS,
     "NoOfFragmentLogParts",
     DB_TOKEN,
-    "Number of file sets of log files belonging to "DB_TOKEN_PRINT" node",
+    "Number of file groups of redo log files belonging to "DB_TOKEN_PRINT" node",
     ConfigInfo::CI_USED,
     CI_RESTART_INITIAL,
     ConfigInfo::CI_INT,
     "4",
     "4",
-    "16"},
+#if NDB_VERSION_D < NDB_MAKE_VERSION(7,2,0)
+    "4"
+#else
+    "16"
+#endif
+  },
 
   {
     CFG_DB_NO_REDOLOG_FILES,
     "NoOfFragmentLogFiles",
     DB_TOKEN,
-    "No of Redo log files in each of the file sets belonging to "DB_TOKEN_PRINT" node",
+    "No of Redo log files in each of the file group belonging to "DB_TOKEN_PRINT" node",
     ConfigInfo::CI_USED,
     CI_RESTART_INITIAL,
     ConfigInfo::CI_INT,

=== modified file 'storage/ndb/src/ndbapi/NdbDictionary.cpp'
--- a/storage/ndb/src/ndbapi/NdbDictionary.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/NdbDictionary.cpp	revid:mikael.ronstrom@stripped
@@ -3189,10 +3189,10 @@ NdbDictionary::printFormattedValue(NdbOu
       NdbBlob::unpackBlobHead(head, (const char*) val_p, c->getBlobVersion());
       out << head.length << ":";
       const unsigned char* p = val_p + head.headsize;
-      if ((unsigned int) c->getLength() < head.headsize)
+      if ((unsigned int) c->getInlineSize() < head.headsize)
         out << "***error***"; // really cannot happen
       else {
-        unsigned n = c->getLength() - head.headsize;
+        unsigned n = c->getInlineSize() - head.headsize;
         for (unsigned k = 0; k < n && k < head.length; k++) {
           if (c->getType() == NdbDictionary::Column::Blob)
             out.print("%02X", (int)p[k]);

=== modified file 'storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp	revid:mikael.ronstrom@stripped
@@ -20,10 +20,11 @@
 
 #include <NdbEventOperation.hpp>
 #include <signaldata/SumaImpl.hpp>
-#include <transporter/TransporterDefinitions.hpp>
 #include <NdbRecAttr.hpp>
 #include <AttributeHeader.hpp>
 #include <UtilBuffer.hpp>
+#include <Vector.hpp>
+#include <NdbMutex.h>
 
 #define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
 //#define EVENT_DEBUG

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp	revid:mikael.ronstrom@stripped
@@ -140,10 +140,12 @@ TransporterFacade::transporter_recv_from
  * 
  *****************************************************************************/
 
+/**
+ * Report connection broken
+ */
 int
-TransporterFacade::checkJobBuffer(Uint32 recv_thread_id)
+TransporterFacade::checkJobBuffer()
 {
-  (void)recv_thread_id;
   return 0;
 }
 
@@ -207,13 +209,9 @@ TRACE_GSN(Uint32 gsn)
 void
 TransporterFacade::deliver_signal(SignalHeader * const header,
                                   Uint8 prio, Uint32 * const theData,
-                                  LinearSectionPtr ptr[3],
-                                  Uint32 receiverThreadId,
-                                  Uint32 receiverThreadNum)
+                                  LinearSectionPtr ptr[3])
 {
   Uint32 tRecBlockNo = header->theReceiversBlockNumber;
-  (void)receiverThreadId; //Ignored in API
-  (void)receiverThreadNum; //Ignored in API
   
 #ifdef API_TRACE
   if(setSignalLog() && TRACE_GSN(header->theVerId_signalNumber)){
@@ -353,7 +351,7 @@ TransporterFacade::start_instance(NodeId
   (void)signal(SIGPIPE, SIG_IGN);
 #endif
 
-  theTransporterRegistry = new TransporterRegistry(this, true, 1);
+  theTransporterRegistry = new TransporterRegistry(this, this);
   if (theTransporterRegistry == NULL)
     return -1;
 

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp	revid:mikael.ronstrom@stripped
@@ -41,7 +41,9 @@ extern "C" {
   void* runReceiveResponse_C(void*);
 }
 
-class TransporterFacade : public TransporterCallback
+class TransporterFacade :
+  public TransporterCallback,
+  public TransporterReceiveHandle
 {
 public:
   /**
@@ -178,10 +180,8 @@ public:
   void deliver_signal(SignalHeader * const header,
                       Uint8 prio,
                       Uint32 * const signalData,
-                      LinearSectionPtr ptr[3],
-                      Uint32 recvThreadId,
-                      Uint32 recvThreadNum);
-  int checkJobBuffer(Uint32 recv_thread_id);
+                      LinearSectionPtr ptr[3]);
+  int checkJobBuffer();
   void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
   void reportReceiveLen(NodeId nodeId, Uint32 count, Uint64 bytes);
   void reportConnect(NodeId nodeId);

=== modified file 'storage/ndb/test/ndbapi/testNodeRestart.cpp'
--- a/storage/ndb/test/ndbapi/testNodeRestart.cpp	revid:mikael.ronstrom@stripped
+++ b/storage/ndb/test/ndbapi/testNodeRestart.cpp	revid:mikael.ronstrom@stripped
@@ -3188,15 +3188,12 @@ loop:
   int val2[] = { DumpStateOrd::CmvmiSetRestartOnErrorInsert, 1 };    
   res.dumpStateOneNode(master, val2, 2);
   res.dumpStateOneNode(victim, val2, 2);
-  
-  for (int i = 0; i<res.getNumDbNodes(); i++)
-  {
-    int nodeId = res.getDbNodeId(i);
-    res.insertErrorInNode(nodeId, 5050);
-  }
-  
+
+  int err5050[] = { 5050 };
+  res.dumpStateAllNodes(err5050, 1);
+
   res.insertErrorInNode(victim, 9999);
-  
+
   int nodes[2];
   nodes[0] = master;
   nodes[1] = victim;

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (mikael.ronstrom:3710 to 3717) Mikael Ronstrom21 Jan