2715 Pekka Nousiainen 2008-08-11 [merge]
merge
modified:
storage/ndb/include/ndbapi/NdbScanOperation.hpp
storage/ndb/src/ndbapi/NdbScanOperation.cpp
2714 Pekka Nousiainen 2008-08-11 [merge]
merge
renamed:
mysql-test/t/ndb_log_update_as_write_basic.test => mysql-test/suite/ndb/t/ndb_log_update_as_write_basic.test
mysql-test/t/ndb_log_updated_only_basic.test => mysql-test/suite/ndb/t/ndb_log_updated_only_basic.test
modified:
mysql-test/r/ndb_log_update_as_write_basic.result
mysql-test/r/ndb_log_updated_only_basic.result
mysql-test/suite/rpl/r/rpl_heartbeat.result
mysql-test/suite/rpl_ndb/r/rpl_ndb_rep_error.result
storage/ndb/src/kernel/blocks/ERROR_codes.txt
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
storage/ndb/src/kernel/blocks/dbtc/DbtcInit.cpp
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/src/kernel/blocks/ndbcntr/Ndbcntr.hpp
storage/ndb/src/kernel/blocks/qmgr/Qmgr.hpp
storage/ndb/src/kernel/vm/SimulatedBlock.cpp
storage/ndb/src/kernel/vm/pc.hpp
storage/ndb/test/ndbapi/test_event.cpp
storage/ndb/test/run-test/daily-basic-tests.txt
mysql-test/suite/ndb/t/ndb_log_update_as_write_basic.test
mysql-test/suite/ndb/t/ndb_log_updated_only_basic.test
2713 Pekka Nousiainen 2008-08-11
wl#4391 12.diff
Drop table.
modified:
storage/ndb/include/kernel/signaldata/PrepDropTab.hpp
storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.cpp
storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp
storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.hpp
2712 Pekka Nousiainen 2008-08-11
wl#4391 11.diff
GCP.
modified:
storage/ndb/include/kernel/signaldata/GCP.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp
2711 Pekka Nousiainen 2008-08-11
wl#4391 10.diff
BACKUP create sequence.
modified:
storage/ndb/include/kernel/signaldata/UtilSequence.hpp
storage/ndb/src/kernel/blocks/backup/Backup.cpp
storage/ndb/src/kernel/blocks/backup/BackupProxy.cpp
storage/ndb/src/kernel/blocks/backup/BackupProxy.hpp
2710 Pekka Nousiainen 2008-08-11
wl#4391 09.diff
LCP (no log part assignment yet).
modified:
storage/ndb/include/kernel/signaldata/LCP.hpp
storage/ndb/src/kernel/blocks/backup/Backup.cpp
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp
storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupStoredProcDef.cpp
2709 Pekka Nousiainen 2008-08-11
wl#4391 08.diff
PK/SCAN TC->LQH/inst un-PACKED.
modified:
storage/ndb/include/kernel/signaldata/DiGetNodes.hpp
storage/ndb/include/kernel/signaldata/DihScanTab.hpp
storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp
storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
2708 Pekka Nousiainen 2008-08-11
wl#4391 07.diff
Create table (required for IS).
added:
storage/ndb/include/kernel/signaldata/TabCommit.hpp
modified:
storage/ndb/include/kernel/signaldata/LqhFrag.hpp
storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp
storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp
2707 Pekka Nousiainen 2008-08-11
wl#4391 06.diff
Misc small signals.
modified:
storage/ndb/src/kernel/blocks/LocalProxy.cpp
storage/ndb/src/kernel/blocks/LocalProxy.hpp
=== modified file 'storage/ndb/include/kernel/kernel_config_parameters.h'
--- a/storage/ndb/include/kernel/kernel_config_parameters.h 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/include/kernel/kernel_config_parameters.h 2008-08-11 11:21:36 +0000
@@ -63,7 +63,7 @@
#define CFG_TUX_ATTRIBUTE (PRIVATE_BASE + 42)
#define CFG_TUX_SCAN_OP (PRIVATE_BASE + 43)
-#define CFG_NDBMT_WORKERS (PRIVATE_BASE + 44)
-#define CFG_NDBMT_THREADS (PRIVATE_BASE + 45)
+#define CFG_NDBMT_LQH_WORKERS (PRIVATE_BASE + 44)
+#define CFG_NDBMT_LQH_THREADS (PRIVATE_BASE + 45)
#endif
=== modified file 'storage/ndb/include/kernel/ndb_limits.h'
--- a/storage/ndb/include/kernel/ndb_limits.h 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/include/kernel/ndb_limits.h 2008-08-11 11:21:36 +0000
@@ -181,7 +181,7 @@
#define NDBMT_BLOCK_INSTANCE_BITS 7
#define NDBMT_BLOCK_INSTANCE_MASK 0xFE00
-#define MAX_NDBMT_WORKERS 4
-#define MAX_NDBMT_THREADS 4
+#define MAX_NDBMT_LQH_WORKERS 4
+#define MAX_NDBMT_LQH_THREADS 4
#endif
=== modified file 'storage/ndb/include/kernel/signaldata/DiGetNodes.hpp'
--- a/storage/ndb/include/kernel/signaldata/DiGetNodes.hpp 2008-06-05 20:19:01 +0000
+++ b/storage/ndb/include/kernel/signaldata/DiGetNodes.hpp 2008-08-11 11:44:42 +0000
@@ -32,13 +32,14 @@ struct DiGetNodesConf {
*/
friend class Dbdih;
- STATIC_CONST( SignalLength = 3 + MAX_REPLICAS );
+ STATIC_CONST( SignalLength = 4 + MAX_REPLICAS );
STATIC_CONST( REORG_MOVING = 0x80000000);
Uint32 zero;
Uint32 fragId;
Uint32 reqinfo;
- Uint32 nodes[MAX_REPLICAS];
+ Uint32 instanceKey;
+ Uint32 nodes[MAX_REPLICAS]; //+1
};
/**
*
=== modified file 'storage/ndb/include/kernel/signaldata/DihScanTab.hpp'
--- a/storage/ndb/include/kernel/signaldata/DihScanTab.hpp 2008-06-05 20:31:21 +0000
+++ b/storage/ndb/include/kernel/signaldata/DihScanTab.hpp 2008-08-11 11:44:42 +0000
@@ -66,6 +66,7 @@ struct DihScanGetNodesConf
Uint32 count;
Uint32 tableId;
Uint32 fragId;
+ Uint32 instanceKey;
};
/**
=== modified file 'storage/ndb/include/kernel/signaldata/GCP.hpp'
--- a/storage/ndb/include/kernel/signaldata/GCP.hpp 2008-01-01 12:45:11 +0000
+++ b/storage/ndb/include/kernel/signaldata/GCP.hpp 2008-08-11 11:51:25 +0000
@@ -91,6 +91,7 @@ class GCPSaveReq // Distr. DIH-LQH
* Reciver(s)
*/
friend class Dblqh;
+ friend class DblqhProxy;
friend bool printGCPSaveReq(FILE * output, const Uint32 * theData,
Uint32 len, Uint16 receiverBlockNo);
@@ -109,6 +110,7 @@ class GCPSaveRef // Distr. LQH-DIH
* Sender(s)
*/
friend class Dblqh;
+ friend class DblqhProxy;
/**
* Reciver(s)
@@ -139,6 +141,7 @@ class GCPSaveConf // Distr. LQH-DIH
* Sender(s)
*/
friend class Dblqh;
+ friend class DblqhProxy;
/**
* Reciver(s)
=== modified file 'storage/ndb/include/kernel/signaldata/LCP.hpp'
--- a/storage/ndb/include/kernel/signaldata/LCP.hpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/kernel/signaldata/LCP.hpp 2008-08-11 11:48:10 +0000
@@ -92,6 +92,7 @@ class LcpFragOrd {
* Receiver(s)
*/
friend class Dblqh;
+ friend class DblqhProxy;
friend bool printLCP_FRAG_ORD(FILE *, const Uint32 *, Uint32, Uint16);
public:
@@ -117,6 +118,7 @@ class LcpFragRep {
* Sender(s)
*/
friend class Dblqh;
+ friend class DblqhProxy;
friend bool printLCP_FRAG_REP(FILE *, const Uint32 *, Uint32, Uint16);
public:
@@ -142,6 +144,7 @@ class LcpCompleteRep {
* Sender(s)
*/
friend class Dblqh;
+ friend class DblqhProxy;
friend bool printLCP_COMPLETE_REP(FILE *, const Uint32 *, Uint32, Uint16);
public:
=== modified file 'storage/ndb/include/kernel/signaldata/LqhFrag.hpp'
--- a/storage/ndb/include/kernel/signaldata/LqhFrag.hpp 2008-06-05 20:19:01 +0000
+++ b/storage/ndb/include/kernel/signaldata/LqhFrag.hpp 2008-08-11 11:42:06 +0000
@@ -103,6 +103,7 @@ class LqhFragReq {
* Receiver(s)
*/
friend class Dblqh;
+ friend class DblqhProxy;
friend bool printLQH_FRAG_REQ(FILE *, const Uint32 *, Uint32, Uint16);
@@ -150,6 +151,7 @@ class LqhFragConf {
* Sender(s)
*/
friend class Dblqh;
+ friend class DblqhProxy;
/**
* Receiver(s)
@@ -173,6 +175,7 @@ class LqhFragRef {
* Sender(s)
*/
friend class Dblqh;
+ friend class DblqhProxy;
/**
* Receiver(s)
@@ -202,6 +205,7 @@ class LqhAddAttrReq {
* Receiver(s)
*/
friend class Dblqh;
+ friend class DblqhProxy;
friend bool printLQH_ADD_ATTR_REQ(FILE *, const Uint32 *, Uint32, Uint16);
public:
@@ -226,6 +230,7 @@ class LqhAddAttrRef {
* Sender(s)
*/
friend class Dblqh;
+ friend class DblqhProxy;
/**
* Receiver(s)
@@ -246,6 +251,7 @@ class LqhAddAttrConf {
* Sender(s)
*/
friend class Dblqh;
+ friend class DblqhProxy;
/**
* Receiver(s)
=== modified file 'storage/ndb/include/kernel/signaldata/NdbSttor.hpp'
--- a/storage/ndb/include/kernel/signaldata/NdbSttor.hpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/kernel/signaldata/NdbSttor.hpp 2008-08-11 11:37:57 +0000
@@ -37,6 +37,7 @@ class NdbSttor {
friend class Backup;
friend class Suma;
friend class Grep;
+ friend class LocalProxy;
friend bool printNDB_STTOR(FILE *, const Uint32 *, Uint32, Uint16);
public:
@@ -72,6 +73,7 @@ class NdbSttorry {
friend class Backup;
friend class Suma;
friend class Grep;
+ friend class LocalProxy;
friend bool printNDB_STTORRY(FILE *, const Uint32 *, Uint32, Uint16);
public:
=== modified file 'storage/ndb/include/kernel/signaldata/PrepDropTab.hpp'
--- a/storage/ndb/include/kernel/signaldata/PrepDropTab.hpp 2008-02-06 20:00:10 +0000
+++ b/storage/ndb/include/kernel/signaldata/PrepDropTab.hpp 2008-08-11 11:53:43 +0000
@@ -29,6 +29,7 @@ class PrepDropTabReq {
*/
friend class Dbtc;
friend class Dblqh;
+ friend class DblqhProxy;
friend class Dbdih;
friend bool printPREP_DROP_TAB_REQ(FILE *, const Uint32 *, Uint32, Uint16);
@@ -48,6 +49,7 @@ class PrepDropTabConf {
*/
friend class Dbtc;
friend class Dblqh;
+ friend class DblqhProxy;
friend class Dbdih;
/**
@@ -71,6 +73,7 @@ class PrepDropTabRef {
*/
friend class Dbtc;
friend class Dblqh;
+ friend class DblqhProxy;
friend class Dbdih;
/**
=== modified file 'storage/ndb/include/kernel/signaldata/ReadNodesConf.hpp'
--- a/storage/ndb/include/kernel/signaldata/ReadNodesConf.hpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/kernel/signaldata/ReadNodesConf.hpp 2008-08-11 11:37:57 +0000
@@ -46,6 +46,7 @@ class ReadNodesConf {
friend class Backup;
friend class Suma;
friend class Grep;
+ friend class LocalProxy;
friend bool printREAD_NODES_CONF(FILE*, const Uint32 *, Uint32, Uint16);
public:
=== added file 'storage/ndb/include/kernel/signaldata/TabCommit.hpp'
--- a/storage/ndb/include/kernel/signaldata/TabCommit.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/kernel/signaldata/TabCommit.hpp 2008-08-11 11:42:06 +0000
@@ -0,0 +1,42 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#ifndef TAB_COMMIT_HPP
+#define TAB_COMMIT_HPP
+
+struct TabCommitReq {
+ enum { SignalLength = 3 };
+ Uint32 senderData;
+ Uint32 senderRef;
+ Uint32 tableId;
+};
+
+struct TabCommitConf {
+ enum { SignalLength = 3 };
+ Uint32 senderData;
+ Uint32 nodeId;
+ Uint32 tableId;
+};
+
+struct TabCommitRef {
+ enum { SignalLength = 5 };
+ Uint32 senderData;
+ Uint32 nodeId;
+ Uint32 tableId;
+ Uint32 errorCode;
+ Uint32 tableStatus;
+};
+
+#endif
=== modified file 'storage/ndb/include/kernel/signaldata/UtilSequence.hpp'
--- a/storage/ndb/include/kernel/signaldata/UtilSequence.hpp 2008-03-03 11:12:37 +0000
+++ b/storage/ndb/include/kernel/signaldata/UtilSequence.hpp 2008-08-11 11:49:35 +0000
@@ -29,6 +29,7 @@ class UtilSequenceReq {
* Sender
*/
friend class Backup;
+ friend class BackupProxy;
friend class Suma;
friend bool printUTIL_SEQUENCE_REQ(FILE *, const Uint32 *, Uint32, Uint16);
=== modified file 'storage/ndb/src/kernel/SimBlockList.cpp'
--- a/storage/ndb/src/kernel/SimBlockList.cpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/SimBlockList.cpp 2008-08-11 11:21:36 +0000
@@ -75,9 +75,6 @@ void * operator new (size_t sz, SIMBLOCK
#define NEW_BLOCK(B) new(A_VALUE) B
#endif
-extern bool g_ndbMt;
-extern bool g_ndbMtLqh;
-
void
SimBlockList::load(EmulatorData& data){
noOfBlocks = NO_OF_BLOCKS;
@@ -102,10 +99,12 @@ SimBlockList::load(EmulatorData& data){
}
}
+ const bool mtLqh = globalData.isNdbMtLqh;
+
theList[0] = pg = NEW_BLOCK(Pgman)(ctx);
theList[1] = lg = NEW_BLOCK(Lgman)(ctx);
theList[2] = ts = NEW_BLOCK(Tsman)(ctx, pg, lg);
- if (!g_ndbMtLqh)
+ if (!mtLqh)
theList[3] = NEW_BLOCK(Dbacc)(ctx);
else
theList[3] = NEW_BLOCK(DbaccProxy)(ctx);
@@ -113,29 +112,29 @@ SimBlockList::load(EmulatorData& data){
theList[5] = fs;
theList[6] = dbdict = NEW_BLOCK(Dbdict)(ctx);
theList[7] = dbdih = NEW_BLOCK(Dbdih)(ctx);
- if (!g_ndbMtLqh)
+ if (!mtLqh)
theList[8] = NEW_BLOCK(Dblqh)(ctx);
else
theList[8] = NEW_BLOCK(DblqhProxy)(ctx);
theList[9] = NEW_BLOCK(Dbtc)(ctx);
- if (!g_ndbMtLqh)
+ if (!mtLqh)
theList[10] = NEW_BLOCK(Dbtup)(ctx, pg);
else
theList[10] = NEW_BLOCK(DbtupProxy)(ctx);//wl4391_todo pg
theList[11] = NEW_BLOCK(Ndbcntr)(ctx);
theList[12] = NEW_BLOCK(Qmgr)(ctx);
theList[13] = NEW_BLOCK(Trix)(ctx);
- if (!g_ndbMtLqh)
+ if (!mtLqh)
theList[14] = NEW_BLOCK(Backup)(ctx);
else
theList[14] = NEW_BLOCK(BackupProxy)(ctx);
theList[15] = NEW_BLOCK(DbUtil)(ctx);
theList[16] = NEW_BLOCK(Suma)(ctx);
- if (!g_ndbMtLqh)
+ if (!mtLqh)
theList[17] = NEW_BLOCK(Dbtux)(ctx);
else
theList[17] = NEW_BLOCK(DbtuxProxy)(ctx);
- if (!g_ndbMtLqh)
+ if (!mtLqh)
theList[18] = NEW_BLOCK(Restore)(ctx);
else
theList[18] = NEW_BLOCK(RestoreProxy)(ctx);
=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.cpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.cpp 2008-08-11 11:39:17 +0000
@@ -17,7 +17,8 @@
#include "LocalProxy.hpp"
LocalProxy::LocalProxy(BlockNumber blockNumber, Block_context& ctx) :
- SimulatedBlock(blockNumber, ctx)
+ SimulatedBlock(blockNumber, ctx),
+ c_nodeList(c_nodePool)
{
BLOCK_CONSTRUCTOR(LocalProxy);
@@ -28,9 +29,36 @@ LocalProxy::LocalProxy(BlockNumber block
for (i = 0; i < MaxWorkers; i++)
c_worker[i] = 0;
+ c_ssIdSeq = 0;
+
+ c_typeOfStart = NodeState::ST_ILLEGAL_TYPE;
+ c_masterNodeId = ZNIL;
+ c_nodePool.setSize(MAX_NDB_NODES);
+
// GSN_READ_CONFIG_REQ
addRecSignal(GSN_READ_CONFIG_REQ, &LocalProxy::execREAD_CONFIG_REQ, true);
addRecSignal(GSN_READ_CONFIG_CONF, &LocalProxy::execREAD_CONFIG_CONF, true);
+
+ // GSN_STTOR
+ addRecSignal(GSN_STTOR, &LocalProxy::execSTTOR);
+ addRecSignal(GSN_STTORRY, &LocalProxy::execSTTORRY);
+
+ // GSN_NDB_STTOR
+ addRecSignal(GSN_NDB_STTOR, &LocalProxy::execNDB_STTOR);
+ addRecSignal(GSN_NDB_STTORRY, &LocalProxy::execNDB_STTORRY);
+
+ // GSN_READ_NODESREQ
+ addRecSignal(GSN_READ_NODESCONF, &LocalProxy::execREAD_NODESCONF);
+ addRecSignal(GSN_READ_NODESREF, &LocalProxy::execREAD_NODESREF);
+
+ // GSN_DUMP_STATE_ORD
+ addRecSignal(GSN_DUMP_STATE_ORD, &LocalProxy::execDUMP_STATE_ORD);
+
+ // GSN_NDB_TAMPER
+ addRecSignal(GSN_NDB_TAMPER, &LocalProxy::execNDB_TAMPER, true);
+
+ // GSN_TIME_SIGNAL
+ addRecSignal(GSN_TIME_SIGNAL, &LocalProxy::execTIME_SIGNAL);
}
LocalProxy::~LocalProxy()
@@ -38,21 +66,137 @@ LocalProxy::~LocalProxy()
// dtor of main block deletes workers
}
+// support routines
+
+void
+LocalProxy::sendREQ(Signal* signal, SsSequential& ss)
+{
+ ss.m_worker = 0;
+ ndbrequire(ss.m_sendREQ != 0);
+ (this->*ss.m_sendREQ)(signal, ss.m_ssId);
+}
+
+void
+LocalProxy::recvCONF(Signal* signal, SsSequential& ss)
+{
+ ndbrequire(ss.m_sendCONF != 0);
+ (this->*ss.m_sendCONF)(signal, ss.m_ssId);
+
+ ss.m_worker++;
+ if (ss.m_worker < c_workers) {
+ jam();
+ ndbrequire(ss.m_sendREQ != 0);
+ (this->*ss.m_sendREQ)(signal, ss.m_ssId);
+ return;
+ }
+}
+
+void
+LocalProxy::recvREF(Signal* signal, SsSequential& ss, Uint32 error)
+{
+ ndbrequire(error != 0);
+ if (ss.m_error == 0)
+ ss.m_error = error;
+ recvCONF(signal, ss);
+}
+
+void
+LocalProxy::skipReq(SsSequential& ss)
+{
+}
+
+bool
+LocalProxy::firstReply(const SsSequential& ss)
+{
+ return ss.m_worker == 0;
+}
+
+bool
+LocalProxy::lastReply(const SsSequential& ss)
+{
+ return ss.m_worker + 1 == c_workers;
+}
+
+void
+LocalProxy::sendREQ(Signal* signal, SsParallel& ss)
+{
+ ndbrequire(ss.m_sendREQ != 0);
+
+ ss.m_workerMask.clear();
+ ss.m_worker = 0;
+ while (ss.m_worker < c_workers) {
+ jam();
+ ss.m_workerMask.set(ss.m_worker);
+ (this->*ss.m_sendREQ)(signal, ss.m_ssId);
+ ss.m_worker++;
+ }
+}
+
+void
+LocalProxy::recvCONF(Signal* signal, SsParallel& ss)
+{
+ ndbrequire(ss.m_sendCONF != 0);
+
+ BlockReference ref = signal->getSendersBlockRef();
+ ndbrequire(refToMain(ref) == number());
+
+ ss.m_worker = refToInstance(ref) - 1;
+ ndbrequire(ref == workerRef(ss.m_worker));
+ ndbrequire(ss.m_worker < c_workers);
+ ndbrequire(ss.m_workerMask.get(ss.m_worker));
+ ss.m_workerMask.clear(ss.m_worker);
+
+ (this->*ss.m_sendCONF)(signal, ss.m_ssId);
+}
+
+void
+LocalProxy::recvREF(Signal* signal, SsParallel& ss, Uint32 error)
+{
+ ndbrequire(error != 0);
+ if (ss.m_error == 0)
+ ss.m_error = error;
+ recvCONF(signal, ss);
+}
+
+void
+LocalProxy::skipReq(SsParallel& ss)
+{
+ ndbrequire(ss.m_workerMask.get(ss.m_worker));
+ ss.m_workerMask.clear(ss.m_worker);
+}
+
+bool
+LocalProxy::firstReply(const SsParallel& ss)
+{
+ const WorkerMask& mask = ss.m_workerMask;
+ const Uint32 count = mask.count();
+
+ // recvCONF has cleared current worker
+ ndbrequire(ss.m_worker < c_workers);
+ ndbrequire(!mask.get(ss.m_worker));
+ ndbrequire(count < c_workers);
+ return count + 1 == c_workers;
+}
+
+bool
+LocalProxy::lastReply(const SsParallel& ss)
+{
+ return ss.m_workerMask.isclear();
+}
+
// GSN_READ_CONFIG_REQ
void
LocalProxy::execREAD_CONFIG_REQ(Signal* signal)
{
- Ss_READ_CONFIG_REQ& ss = c_ss_READ_CONFIG_REQ;
- ndbrequire(!ss.m_active);
- ss.m_active = true;
+ Ss_READ_CONFIG_REQ& ss = ssSeize<Ss_READ_CONFIG_REQ>();
const ReadConfigReq* req = (const ReadConfigReq*)signal->getDataPtr();
- ss.m_readConfigReq = *req;
- ndbrequire(ss.m_readConfigReq.noOfParameters == 0);
+ ss.m_req = *req;
+ ndbrequire(ss.m_req.noOfParameters == 0);
- const Uint32 workers = globalData.ndbmtWorkers;
- const Uint32 threads = globalData.ndbmtThreads;
+ const Uint32 workers = globalData.ndbMtLqhWorkers;
+ const Uint32 threads = globalData.ndbMtLqhThreads;
Uint32 i;
for (i = 0; i < workers; i++) {
@@ -62,7 +206,7 @@ LocalProxy::execREAD_CONFIG_REQ(Signal*
ndbrequire(this->getInstance(instanceNo) == worker);
c_worker[i] = worker;
- add_worker_thr_map(number(), instanceNo);
+ add_lqh_worker_thr_map(number(), instanceNo);
}
// set after instances are created (sendpacked)
@@ -70,52 +214,327 @@ LocalProxy::execREAD_CONFIG_REQ(Signal*
c_threads = threads;
// run sequentially due to big mallocs and initializations
- sendREAD_CONFIG_REQ(signal, 0);
+ sendREQ(signal, ss);
}
void
-LocalProxy::sendREAD_CONFIG_REQ(Signal* signal, Uint32 i)
+LocalProxy::sendREAD_CONFIG_REQ(Signal* signal, Uint32 ssId)
{
- Ss_READ_CONFIG_REQ& ss = c_ss_READ_CONFIG_REQ;
+ Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
ReadConfigReq* req = (ReadConfigReq*)signal->getDataPtrSend();
req->senderRef = reference();
- req->senderData = i;
+ req->senderData = ssId;
req->noOfParameters = 0;
- sendSignal(workerRef(i), GSN_READ_CONFIG_REQ,
+ sendSignal(workerRef(ss.m_worker), GSN_READ_CONFIG_REQ,
signal, ReadConfigReq::SignalLength, JBB);
- // for verification only
- ss.m_worker = i;
}
void
LocalProxy::execREAD_CONFIG_CONF(Signal* signal)
{
- Ss_READ_CONFIG_REQ& ss = c_ss_READ_CONFIG_REQ;
- ndbrequire(ss.m_active);
-
const ReadConfigConf* conf = (const ReadConfigConf*)signal->getDataPtr();
- ndbrequire(ss.m_worker == conf->senderData);
- if (ss.m_worker + 1 < c_workers) {
- jam();
- sendREAD_CONFIG_REQ(signal, ss.m_worker + 1);
+ Uint32 ssId = conf->senderData;
+ Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
+ recvCONF(signal, ss);
+}
+
+void
+LocalProxy::sendREAD_CONFIG_CONF(Signal* signal, Uint32 ssId)
+{
+ Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
+
+ if (!lastReply(ss))
return;
+
+ ReadConfigConf* conf = (ReadConfigConf*)signal->getDataPtrSend();
+ conf->senderRef = reference();
+ conf->senderData = ss.m_req.senderData;
+ sendSignal(ss.m_req.senderRef, GSN_READ_CONFIG_CONF,
+ signal, ReadConfigConf::SignalLength, JBB);
+
+ ssRelease<Ss_READ_CONFIG_REQ>(ssId);
+}
+
+// GSN_STTOR
+
+void
+LocalProxy::execSTTOR(Signal* signal)
+{
+ Ss_STTOR& ss = ssSeize<Ss_STTOR>(1);
+
+ const Uint32 startphase = signal->theData[1];
+ const Uint32 typeOfStart = signal->theData[7];
+
+ if (startphase == 3) {
+ jam();
+ c_typeOfStart = typeOfStart;
}
- sendREAD_CONFIG_CONF(signal);
- ss.m_active = false;
+ ss.m_reqlength = signal->getLength();
+ memcpy(ss.m_reqdata, signal->getDataPtr(), ss.m_reqlength << 2);
+
+ callSTTOR(signal);
}
void
-LocalProxy::sendREAD_CONFIG_CONF(Signal* signal)
+LocalProxy::callSTTOR(Signal* signal)
{
- Ss_READ_CONFIG_REQ& ss = c_ss_READ_CONFIG_REQ;
+ backSTTOR(signal);
+}
- ReadConfigConf* conf = (ReadConfigConf*)signal->getDataPtrSend();
+void
+LocalProxy::backSTTOR(Signal* signal)
+{
+ Ss_STTOR& ss = ssFind<Ss_STTOR>(1);
+ sendREQ(signal, ss);
+}
+
+void
+LocalProxy::sendSTTOR(Signal* signal, Uint32 ssId)
+{
+ Ss_STTOR& ss = ssFind<Ss_STTOR>(ssId);
+
+ memcpy(signal->getDataPtrSend(), ss.m_reqdata, ss.m_reqlength << 2);
+ sendSignal(workerRef(ss.m_worker), GSN_STTOR,
+ signal, ss.m_reqlength, JBB);
+}
+
+void
+LocalProxy::execSTTORRY(Signal* signal)
+{
+ Ss_STTOR& ss = ssFind<Ss_STTOR>(1);
+ recvCONF(signal, ss);
+}
+
+void
+LocalProxy::sendSTTORRY(Signal* signal, Uint32 ssId)
+{
+ Ss_STTOR& ss = ssFind<Ss_STTOR>(ssId);
+
+ const Uint32 conflength = signal->getLength();
+ const Uint32* confdata = signal->getDataPtr();
+
+ // the reply is identical from all
+ if (firstReply(ss)) {
+ ss.m_conflength = conflength;
+ memcpy(ss.m_confdata, confdata, conflength << 2);
+ } else {
+ ndbrequire(ss.m_conflength == conflength);
+ ndbrequire(memcmp(ss.m_confdata, confdata, conflength << 2) == 0);
+ }
+
+ if (!lastReply(ss))
+ return;
+
+ memcpy(signal->getDataPtrSend(), ss.m_confdata, ss.m_conflength << 2);
+ sendSignal(NDBCNTR_REF, GSN_STTORRY,
+ signal, ss.m_conflength, JBB);
+
+ ssRelease<Ss_STTOR>(ssId);
+}
+
+// GSN_NDB_STTOR
+
+void
+LocalProxy::execNDB_STTOR(Signal* signal)
+{
+ Ss_NDB_STTOR& ss = ssSeize<Ss_NDB_STTOR>(1);
+
+ const NdbSttor* req = (const NdbSttor*)signal->getDataPtr();
+ ss.m_req = *req;
+
+ callNDB_STTOR(signal);
+}
+
+void
+LocalProxy::callNDB_STTOR(Signal* signal)
+{
+ backNDB_STTOR(signal);
+}
+
+void
+LocalProxy::backNDB_STTOR(Signal* signal)
+{
+ Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(1);
+ sendREQ(signal, ss);
+}
+
+void
+LocalProxy::sendNDB_STTOR(Signal* signal, Uint32 ssId)
+{
+ Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(ssId);
+
+ NdbSttor* req = (NdbSttor*)signal->getDataPtrSend();
+ *req = ss.m_req;
+ req->senderRef = reference();
+ sendSignal(workerRef(ss.m_worker), GSN_NDB_STTOR,
+ signal, ss.m_reqlength, JBB);
+}
+
+void
+LocalProxy::execNDB_STTORRY(Signal* signal)
+{
+ Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(1);
+
+ // the reply contains only senderRef
+ const NdbSttorry* conf = (const NdbSttorry*)signal->getDataPtr();
+ ndbrequire(conf->senderRef == signal->getSendersBlockRef());
+ recvCONF(signal, ss);
+}
+
+void
+LocalProxy::sendNDB_STTORRY(Signal* signal, Uint32 ssId)
+{
+ Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(ssId);
+
+ if (!lastReply(ss))
+ return;
+
+ NdbSttorry* conf = (NdbSttorry*)signal->getDataPtrSend();
conf->senderRef = reference();
- conf->senderData = ss.m_readConfigReq.senderData;
- sendSignal(ss.m_readConfigReq.senderRef, GSN_READ_CONFIG_CONF,
- signal, ReadConfigConf::SignalLength, JBB);
+ sendSignal(NDBCNTR_REF, GSN_NDB_STTORRY,
+ signal, NdbSttorry::SignalLength, JBB);
+
+ ssRelease<Ss_NDB_STTOR>(ssId);
+}
+
+// GSN_READ_NODESREQ
+
+void
+LocalProxy::sendREAD_NODESREQ(Signal* signal)
+{
+ signal->theData[0] = reference();
+ sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
+}
+
+void
+LocalProxy::execREAD_NODESCONF(Signal* signal)
+{
+ Ss_READ_NODES_REQ& ss = c_ss_READ_NODESREQ;
+
+ const ReadNodesConf* conf = (const ReadNodesConf*)signal->getDataPtr();
+
+ ndbrequire(c_nodePool.getNoOfFree() == c_nodePool.getSize());
+ Uint32 count = 0;
+ Uint32 i;
+ for (i = 0; i < MAX_NDB_NODES; i++) {
+ if (NdbNodeBitmask::get(conf->allNodes, i)) {
+ jam();
+ count++;
+
+ NodePtr nodePtr;
+ bool ok = c_nodePool.seize(nodePtr);
+ ndbrequire(ok);
+ new (nodePtr.p) Node;
+
+ nodePtr.p->m_nodeId = i;
+ if (NdbNodeBitmask::get(conf->inactiveNodes, i)) {
+ jam();
+ nodePtr.p->m_alive = false;
+ } else {
+ jam();
+ nodePtr.p->m_alive = true;
+ }
+
+ c_nodeList.addLast(nodePtr);
+ }
+ }
+ ndbrequire(count != 0 && count == conf->noOfNodes);
+
+ c_masterNodeId = conf->masterNodeId;
+
+ switch (ss.m_gsn) {
+ case GSN_STTOR:
+ backSTTOR(signal);
+ break;
+ case GSN_NDB_STTOR:
+ backNDB_STTOR(signal);
+ break;
+ default:
+ ndbrequire(false);
+ break;
+ }
+
+ ss.m_gsn = 0;
+}
+
+void
+LocalProxy::execREAD_NODESREF(Signal* signal)
+{
+ Ss_READ_NODES_REQ& ss = c_ss_READ_NODESREQ;
+ ndbrequire(ss.m_gsn != 0);
+ ndbrequire(false);
+}
+
+// GSN_DUMP_STATE_ORD
+
+void
+LocalProxy::execDUMP_STATE_ORD(Signal* signal)
+{
+ Ss_DUMP_STATE_ORD& ss = ssSeize<Ss_DUMP_STATE_ORD>();
+
+ ss.m_reqlength = signal->getLength();
+ memcpy(ss.m_reqdata, signal->getDataPtr(), ss.m_reqlength << 2);
+ sendREQ(signal, ss);
+ ssRelease<Ss_DUMP_STATE_ORD>(ss);
+}
+
+void
+LocalProxy::sendDUMP_STATE_ORD(Signal* signal, Uint32 ssId)
+{
+ Ss_DUMP_STATE_ORD& ss = ssFind<Ss_DUMP_STATE_ORD>(ssId);
+
+ memcpy(signal->getDataPtrSend(), ss.m_reqdata, ss.m_reqlength << 2);
+ sendSignal(workerRef(ss.m_worker), GSN_DUMP_STATE_ORD,
+ signal, ss.m_reqlength, JBB);
+}
+
+// GSN_NDB_TAMPER
+
+void
+LocalProxy::execNDB_TAMPER(Signal* signal)
+{
+ Ss_NDB_TAMPER& ss = ssSeize<Ss_NDB_TAMPER>();
+
+ ndbrequire(signal->getLength() == 1);
+ ss.m_errorInsert = signal->theData[0];
+
+ SimulatedBlock::execNDB_TAMPER(signal);
+ sendREQ(signal, ss);
+ ssRelease<Ss_NDB_TAMPER>(ss);
+}
+
+void
+LocalProxy::sendNDB_TAMPER(Signal* signal, Uint32 ssId)
+{
+ Ss_NDB_TAMPER& ss = ssFind<Ss_NDB_TAMPER>(ssId);
+
+ signal->theData[0] = ss.m_errorInsert;
+ sendSignal(workerRef(ss.m_worker), GSN_NDB_TAMPER,
+ signal, 1, JBB);
+}
+
+// GSN_TIME_SIGNAL
+
+void
+LocalProxy::execTIME_SIGNAL(Signal* signal)
+{
+ Ss_TIME_SIGNAL& ss = ssSeize<Ss_TIME_SIGNAL>();
+
+ // could use same for MT TC
+ ndbrequire(number() == DBLQH);
+ sendREQ(signal, ss);
+ ssRelease<Ss_TIME_SIGNAL>(ss);
+}
+
+void
+LocalProxy::sendTIME_SIGNAL(Signal* signal, Uint32 ssId)
+{
+ Ss_TIME_SIGNAL& ss = ssFind<Ss_TIME_SIGNAL>(ssId);
+ signal->theData[0] = 0;
+ sendSignal(workerRef(ss.m_worker), GSN_TIME_SIGNAL,
+ signal, 1, JBB);
}
BLOCK_FUNCTIONS(LocalProxy)
=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.hpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.hpp 2008-08-11 11:39:17 +0000
@@ -18,7 +18,11 @@
#include <pc.hpp>
#include <SimulatedBlock.hpp>
+#include <Bitmask.hpp>
+#include <DLFifoList.hpp>
#include <signaldata/ReadConfig.hpp>
+#include <signaldata/NdbSttor.hpp>
+#include <signaldata/ReadNodesConf.hpp>
/*
* Proxy blocks for MT LQH.
@@ -38,7 +42,8 @@ public:
BLOCK_DEFINES(LocalProxy);
protected:
- enum { MaxWorkers = MAX_NDBMT_WORKERS };
+ enum { MaxWorkers = MAX_NDBMT_LQH_WORKERS };
+ typedef Bitmask<MaxWorkers> WorkerMask;
Uint32 c_workers;
Uint32 c_threads;
SimulatedBlock* c_worker[MaxWorkers];
@@ -47,23 +52,285 @@ protected:
// worker index to worker ref
BlockReference workerRef(Uint32 i) {
+ ndbrequire(i < c_workers);
return numberToRef(number(), 1 + i, getOwnNodeId());
}
+ // support routines and classes ("Ss" = signal state)
+
+ typedef void (LocalProxy::*SsFUNC)(Signal*, Uint32 ssId);
+
+ struct SsCommon {
+ Uint32 m_ssId; // unique id in SsPool (below)
+ SsFUNC m_sendREQ; // from proxy to worker
+ SsFUNC m_sendCONF; // from proxy to caller
+ Uint32 m_worker; // current worker
+ Uint32 m_error;
+ SsCommon() {
+ m_ssId = 0;
+ m_sendREQ = 0;
+ m_sendCONF = 0;
+ m_worker = 0;
+ m_error = 0;
+ }
+ };
+
+ // run workers sequentially
+ struct SsSequential : SsCommon {
+ };
+ void sendREQ(Signal*, SsSequential& ss);
+ void recvCONF(Signal*, SsSequential& ss);
+ void recvREF(Signal*, SsSequential& ss, Uint32 error);
+ // for use in sendREQ
+ void skipReq(SsSequential& ss);
+ // for use in sendCONF
+ bool firstReply(const SsSequential& ss);
+ bool lastReply(const SsSequential& ss);
+
+ // run workers in parallel
+ struct SsParallel : SsCommon {
+ WorkerMask m_workerMask;
+ SsParallel() {
+ m_workerMask.clear();
+ }
+ };
+ void sendREQ(Signal*, SsParallel& ss);
+ void recvCONF(Signal*, SsParallel& ss);
+ void recvREF(Signal*, SsParallel& ss, Uint32 error);
+ // for use in sendREQ
+ void skipReq(SsParallel& ss);
+ // for use in sendCONF
+ bool firstReply(const SsParallel& ss);
+ bool lastReply(const SsParallel& ss);
+
+ /*
+ * Ss instances are seized from a pool. Each pool is simply an array
+ * of Ss instances. Usually poolSize is 1. Some signals need a few
+ * more but the heavy stuff (query/DML) by-passes the proxy.
+ *
+ * Each Ss instance has a unique Uint32 ssId. If there are multiple
+ * instances then ssId must be computable from signal data. One option
+ * often is to use a generated ssId and set it as senderData,
+ */
+
+ template <class Ss>
+ struct SsPool {
+ Ss m_pool[Ss::poolSize];
+ };
+
+ Uint32 c_ssIdSeq;
+
+ // convenient for adding non-zero high nibble
+ enum { SsIdBase = (1 << 28) };
+
+ template <class Ss>
+ Ss& ssSeize() {
+ const Uint32 base = (1 << 28);
+ const Uint32 mask = (1 << 28) - 1;
+ Uint32 ssId = base | c_ssIdSeq;
+ c_ssIdSeq = (c_ssIdSeq + 1) & mask;
+ return ssSeize<Ss>(ssId);
+ }
+
+ template <class Ss>
+ Ss& ssSeize(Uint32 ssId) {
+ SsPool<Ss>& sp = Ss::pool(this);
+ ndbrequire(ssId != 0);
+ Ss* ssptr = 0;
+ for (Uint32 i = 0; i < Ss::poolSize; i++) {
+ Ss& ss = sp.m_pool[i];
+ ndbrequire(ss.m_ssId != ssId);
+ if (ss.m_ssId == 0 && ssptr == 0) {
+ new (&ss) Ss;
+ ss.m_ssId = ssId;
+ ssptr = &ss;
+ // keep looping to verify ssId is unique
+ }
+ }
+ ndbrequire(ssptr != 0);
+ return *ssptr;
+ }
+
+ template <class Ss>
+ Ss& ssFind(Uint32 ssId) {
+ SsPool<Ss>& sp = Ss::pool(this);
+ ndbrequire(ssId != 0);
+ Ss* ssptr = 0;
+ for (Uint32 i = 0; i < Ss::poolSize; i++) {
+ Ss& ss = sp.m_pool[i];
+ if (ss.m_ssId == ssId) {
+ ssptr = &ss;
+ break;
+ }
+ }
+ ndbrequire(ssptr != 0);
+ return *ssptr;
+ }
+
+ template <class Ss>
+ void ssRelease(Uint32 ssId) {
+ SsPool<Ss>& sp = Ss::pool(this);
+ ndbrequire(ssId != 0);
+ Ss* ssptr = 0;
+ for (Uint32 i = 0; i < Ss::poolSize; i++) {
+ Ss& ss = sp.m_pool[i];
+ if (ss.m_ssId == ssId) {
+ ss.m_ssId = 0;
+ ssptr = &ss;
+ break;
+ }
+ }
+ ndbrequire(ssptr != 0);
+ }
+
+ template <class Ss>
+ void ssRelease(Ss& ss) {
+ ssRelease<Ss>(ss.m_ssId);
+ }
+
+ // system info
+
+ Uint32 c_typeOfStart;
+ Uint32 c_masterNodeId;
+
+ struct Node {
+ Uint32 m_nodeId;
+ bool m_alive;
+ Node() {
+ m_nodeId = 0;
+ m_alive = false;
+ }
+ Uint32 nextList;
+ union {
+ Uint32 prevList;
+ Uint32 nextPool;
+ };
+ };
+ typedef Ptr<Node> NodePtr;
+ ArrayPool<Node> c_nodePool;
+ DLFifoList<Node> c_nodeList;
+
// GSN_READ_CONFIG_REQ
- struct Ss_READ_CONFIG_REQ {
- bool m_active;
- Uint32 m_worker;
- ReadConfigReq m_readConfigReq;
- Ss_READ_CONFIG_REQ() :
- m_active(false)
- {}
+ struct Ss_READ_CONFIG_REQ : SsSequential {
+ ReadConfigReq m_req;
+ Ss_READ_CONFIG_REQ() {
+ m_sendREQ = &LocalProxy::sendREAD_CONFIG_REQ;
+ m_sendCONF = &LocalProxy::sendREAD_CONFIG_CONF;
+ }
+ enum { poolSize = 1 };
+ static SsPool<Ss_READ_CONFIG_REQ>& pool(LocalProxy* proxy) {
+ return proxy->c_ss_READ_CONFIG_REQ;
+ }
};
- Ss_READ_CONFIG_REQ c_ss_READ_CONFIG_REQ;
+ SsPool<Ss_READ_CONFIG_REQ> c_ss_READ_CONFIG_REQ;
void execREAD_CONFIG_REQ(Signal*);
- void sendREAD_CONFIG_REQ(Signal*, Uint32 i);
+ void sendREAD_CONFIG_REQ(Signal*, Uint32 ssId);
void execREAD_CONFIG_CONF(Signal*);
- void sendREAD_CONFIG_CONF(Signal*);
+ void sendREAD_CONFIG_CONF(Signal*, Uint32 ssId);
+
+ // GSN_STTOR
+ struct Ss_STTOR : SsParallel {
+ Uint32 m_reqlength;
+ Uint32 m_reqdata[25];
+ Uint32 m_conflength;
+ Uint32 m_confdata[25];
+ Ss_STTOR() {
+ m_sendREQ = &LocalProxy::sendSTTOR;
+ m_sendCONF = &LocalProxy::sendSTTORRY;
+ }
+ enum { poolSize = 1 };
+ static SsPool<Ss_STTOR>& pool(LocalProxy* proxy) {
+ return proxy->c_ss_STTOR;
+ }
+ };
+ SsPool<Ss_STTOR> c_ss_STTOR;
+ void execSTTOR(Signal*);
+ virtual void callSTTOR(Signal*);
+ void backSTTOR(Signal*);
+ void sendSTTOR(Signal*, Uint32 ssId);
+ void execSTTORRY(Signal*);
+ void sendSTTORRY(Signal*, Uint32 ssId);
+
+ // GSN_NDB_STTOR
+ struct Ss_NDB_STTOR : SsParallel {
+ NdbSttor m_req;
+ enum { m_reqlength = sizeof(NdbSttor) >> 2 };
+ Ss_NDB_STTOR() {
+ m_sendREQ = &LocalProxy::sendNDB_STTOR;
+ m_sendCONF = &LocalProxy::sendNDB_STTORRY;
+ }
+ enum { poolSize = 1 };
+ static SsPool<Ss_NDB_STTOR>& pool(LocalProxy* proxy) {
+ return proxy->c_ss_NDB_STTOR;
+ }
+ };
+ SsPool<Ss_NDB_STTOR> c_ss_NDB_STTOR;
+ void execNDB_STTOR(Signal*);
+ virtual void callNDB_STTOR(Signal*);
+ void backNDB_STTOR(Signal*);
+ void sendNDB_STTOR(Signal*, Uint32 ssId);
+ void execNDB_STTORRY(Signal*);
+ void sendNDB_STTORRY(Signal*, Uint32 ssId);
+
+ // GSN_READ_NODESREQ
+ struct Ss_READ_NODES_REQ {
+ GlobalSignalNumber m_gsn; // STTOR or NDB_STTOR
+ Ss_READ_NODES_REQ() {
+ m_gsn = 0;
+ }
+ };
+ Ss_READ_NODES_REQ c_ss_READ_NODESREQ;
+ void sendREAD_NODESREQ(Signal*);
+ void execREAD_NODESCONF(Signal*);
+ void execREAD_NODESREF(Signal*);
+
+ // GSN_DUMP_STATE_ORD
+ struct Ss_DUMP_STATE_ORD : SsParallel {
+ Uint32 m_reqlength;
+ Uint32 m_reqdata[25];
+ Ss_DUMP_STATE_ORD() {
+ m_sendREQ = &LocalProxy::sendDUMP_STATE_ORD;
+ m_sendCONF = 0;
+ }
+ enum { poolSize = 1 };
+ static SsPool<Ss_DUMP_STATE_ORD>& pool(LocalProxy* proxy) {
+ return proxy->c_ss_DUMP_STATE_ORD;
+ }
+ };
+ SsPool<Ss_DUMP_STATE_ORD> c_ss_DUMP_STATE_ORD;
+ void execDUMP_STATE_ORD(Signal*);
+ void sendDUMP_STATE_ORD(Signal*, Uint32 ssId);
+
+ // GSN_NDB_TAMPER
+ struct Ss_NDB_TAMPER : SsParallel {
+ Uint32 m_errorInsert;
+ Ss_NDB_TAMPER() {
+ m_sendREQ = &LocalProxy::sendNDB_TAMPER;
+ m_sendCONF = 0;
+ }
+ enum { poolSize = 1 };
+ static SsPool<Ss_NDB_TAMPER>& pool(LocalProxy* proxy) {
+ return proxy->c_ss_NDB_TAMPER;
+ }
+ };
+ SsPool<Ss_NDB_TAMPER> c_ss_NDB_TAMPER;
+ void execNDB_TAMPER(Signal*);
+ void sendNDB_TAMPER(Signal*, Uint32 ssId);
+
+ // GSN_TIME_SIGNAL
+ struct Ss_TIME_SIGNAL : SsParallel {
+ Ss_TIME_SIGNAL() {
+ m_sendREQ = &LocalProxy::sendTIME_SIGNAL;
+ m_sendCONF = 0;
+ }
+ enum { poolSize = 1 };
+ static SsPool<Ss_TIME_SIGNAL>& pool(LocalProxy* proxy) {
+ return proxy->c_ss_TIME_SIGNAL;
+ }
+ };
+ SsPool<Ss_TIME_SIGNAL> c_ss_TIME_SIGNAL;
+ void execTIME_SIGNAL(Signal*);
+ void sendTIME_SIGNAL(Signal*, Uint32 ssId);
};
#endif
=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2008-06-09 14:32:01 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2008-08-11 11:49:35 +0000
@@ -94,7 +94,7 @@ Backup::execSTTOR(Signal* signal)
m_reset_disk_speed_time = NdbTick_CurrentMillisecond();
m_reset_delay_used = Backup::DISK_SPEED_CHECK_DELAY;
signal->theData[0] = BackupContinueB::RESET_DISK_SPEED_COUNTER;
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal,
+ sendSignalWithDelay(reference(), GSN_CONTINUEB, signal,
Backup::DISK_SPEED_CHECK_DELAY, 1);
}
if (startphase == 3) {
@@ -111,7 +111,7 @@ Backup::execSTTOR(Signal* signal)
}
if(startphase == 7 && g_TypeOfStart == NodeState::ST_INITIAL_START &&
- c_masterNodeId == getOwnNodeId()){
+ c_masterNodeId == getOwnNodeId() && !isNdbMtLqh()){
jam();
createSequence(signal);
return;
@@ -163,7 +163,8 @@ Backup::sendSTTORRY(Signal* signal)
signal->theData[4] = 3;
signal->theData[5] = 7;
signal->theData[6] = 255; // No more start phases from missra
- sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 7, JBB);
+ BlockReference cntrRef = !isNdbMtLqh() ? NDBCNTR_REF : BACKUP_REF;
+ sendSignal(cntrRef, GSN_STTORRY, signal, 7, JBB);
}
void
@@ -216,7 +217,7 @@ Backup::execCONTINUEB(Signal* signal)
delay_time = Backup::DISK_SPEED_CHECK_DELAY - (sig_delay - delay_time);
m_reset_delay_used= delay_time;
signal->theData[0] = BackupContinueB::RESET_DISK_SPEED_COUNTER;
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, delay_time, 1);
+ sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, delay_time, 1);
#if 0
ndbout << "Signal delay was = " << sig_delay;
ndbout << " Current time = " << curr_time << endl;
@@ -343,7 +344,7 @@ Backup::execCONTINUEB(Signal* signal)
signal->theData[0] = BackupContinueB::BUFFER_FULL_META;
signal->theData[1] = Tdata1;
signal->theData[2] = Tdata2;
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 3);
+ sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 3);
return;
}//if
@@ -375,7 +376,8 @@ Backup::execCONTINUEB(Signal* signal)
ndbout_c("Resuming backup");
memmove(signal->theData, signal->theData + 1,
4*ScanFragNextReq::SignalLength);
- sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal,
+ BlockReference lqhRef = calcInstanceBlockRef(DBLQH);
+ sendSignal(lqhRef, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB);
return ;
}
@@ -3232,7 +3234,7 @@ Backup::openFilesReply(Signal* signal,
signal->theData[0] = BackupContinueB::BUFFER_FULL_META;
signal->theData[1] = ptr.i;
signal->theData[2] = tabPtr.i;
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 3);
+ sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 3);
return;
}
@@ -3433,7 +3435,7 @@ Backup::afterGetTabinfoLockTab(Signal *s
signal->theData[0] = BackupContinueB::BUFFER_FULL_META;
signal->theData[1] = ptr.i;
signal->theData[2] = tabPtr.i;
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 3);
+ sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 3);
return;
}
@@ -3539,7 +3541,7 @@ Backup::parseTableDescription(Signal* si
if (lcp)
{
- Dbtup* tup = (Dbtup*)globalData.getBlock(DBTUP);
+ Dbtup* tup = (Dbtup*)globalData.getBlock(DBTUP, instance());
tabPtr.p->maxRecordSize = 1 + tup->get_max_lcp_record_size(tmpTab.TableId);
}
else
@@ -3844,7 +3846,8 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal*
req->clientOpPtr= filePtr.i;
req->batch_size_rows= parallelism;
req->batch_size_bytes= 0;
- sendSignal(DBLQH_REF, GSN_SCAN_FRAGREQ, signal,
+ BlockReference lqhRef = calcInstanceBlockRef(DBLQH);
+ sendSignal(lqhRef, GSN_SCAN_FRAGREQ, signal,
ScanFragReq::SignalLength, JBB);
signal->theData[0] = filePtr.i;
@@ -3862,7 +3865,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal*
memcpy(signal->theData + dataPos, table.attrInfo, 4*table.attrInfoLen);
dataPos += table.attrInfoLen;
ndbassert(dataPos < 25);
- sendSignal(DBLQH_REF, GSN_ATTRINFO, signal, dataPos, JBB);
+ sendSignal(lqhRef, GSN_ATTRINFO, signal, dataPos, JBB);
}
}
@@ -4154,6 +4157,7 @@ void
Backup::checkScan(Signal* signal, BackupFilePtr filePtr)
{
OperationRecord & op = filePtr.p->operation;
+ BlockReference lqhRef = calcInstanceBlockRef(DBLQH);
if(filePtr.p->errorCode != 0)
{
@@ -4168,7 +4172,7 @@ Backup::checkScan(Signal* signal, Backup
req->closeFlag = 1;
req->transId1 = 0;
req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8);
- sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal,
+ sendSignal(lqhRef, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB);
return;
}//if
@@ -4200,12 +4204,12 @@ Backup::checkScan(Signal* signal, Backup
return;
}
if(ERROR_INSERTED(10032))
- sendSignalWithDelay(DBLQH_REF, GSN_SCAN_NEXTREQ, signal,
+ sendSignalWithDelay(lqhRef, GSN_SCAN_NEXTREQ, signal,
100, ScanFragNextReq::SignalLength);
else if(ERROR_INSERTED(10033))
{
SET_ERROR_INSERT_VALUE(10032);
- sendSignalWithDelay(DBLQH_REF, GSN_SCAN_NEXTREQ, signal,
+ sendSignalWithDelay(lqhRef, GSN_SCAN_NEXTREQ, signal,
10000, ScanFragNextReq::SignalLength);
BackupRecordPtr ptr LINT_SET_PTR;
@@ -4220,7 +4224,7 @@ Backup::checkScan(Signal* signal, Backup
}
else
{
- sendSignal(DBLQH_REF, GSN_SCAN_NEXTREQ, signal,
+ sendSignal(lqhRef, GSN_SCAN_NEXTREQ, signal,
ScanFragNextReq::SignalLength, JBB);
/*
@@ -4394,7 +4398,7 @@ Backup::checkFile(Signal* signal, Backup
jam();
signal->theData[0] = BackupContinueB::BUFFER_UNDERFLOW;
signal->theData[1] = filePtr.i;
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 20, 2);
+ sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 20, 2);
return;
}
else if (sz > 0)
@@ -5299,7 +5303,7 @@ Backup::lcp_open_file_done(Signal* signa
signal->theData[0] = BackupContinueB::START_FILE_THREAD;
signal->theData[1] = filePtr.i;
signal->theData[2] = __LINE__;
- sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 3);
+ sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100, 3);
}
void
=== modified file 'storage/ndb/src/kernel/blocks/backup/BackupProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/BackupProxy.cpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/BackupProxy.cpp 2008-08-11 11:49:35 +0000
@@ -19,6 +19,9 @@
BackupProxy::BackupProxy(Block_context& ctx) :
LocalProxy(BACKUP, ctx)
{
+ // GSN_STTOR
+ addRecSignal(GSN_UTIL_SEQUENCE_CONF, &BackupProxy::execUTIL_SEQUENCE_CONF);
+ addRecSignal(GSN_UTIL_SEQUENCE_REF, &BackupProxy::execUTIL_SEQUENCE_REF);
}
BackupProxy::~BackupProxy()
@@ -31,4 +34,60 @@ BackupProxy::newWorker(Uint32 instanceNo
return new Backup(m_ctx, instanceNo);
}
+// GSN_STTOR
+
+void
+BackupProxy::callSTTOR(Signal* signal)
+{
+ Ss_READ_NODES_REQ& ss = c_ss_READ_NODESREQ;
+ ndbrequire(ss.m_gsn == 0);
+
+ const Uint32 startPhase = signal->theData[1];
+ switch (startPhase) {
+ case 3:
+ ss.m_gsn = GSN_STTOR;
+ sendREAD_NODESREQ(signal);
+ break;
+ case 7:
+ if (c_typeOfStart == NodeState::ST_INITIAL_START &&
+ c_masterNodeId == getOwnNodeId()) {
+ jam();
+ sendUTIL_SEQUENCE_REQ(signal);
+ return;
+ }
+ backSTTOR(signal);
+ break;
+ default:
+ backSTTOR(signal);
+ break;
+ }
+}
+
+static const Uint32 BACKUP_SEQUENCE = 0x1F000000;
+
+void
+BackupProxy::sendUTIL_SEQUENCE_REQ(Signal* signal)
+{
+ UtilSequenceReq* req = (UtilSequenceReq*)signal->getDataPtrSend();
+
+ req->senderData = RNIL;
+ req->sequenceId = BACKUP_SEQUENCE;
+ req->requestType = UtilSequenceReq::Create;
+
+ sendSignal(DBUTIL_REF, GSN_UTIL_SEQUENCE_REQ,
+ signal, UtilSequenceReq::SignalLength, JBB);
+}
+
+void
+BackupProxy::execUTIL_SEQUENCE_CONF(Signal* signal)
+{
+ backSTTOR(signal);
+}
+
+void
+BackupProxy::execUTIL_SEQUENCE_REF(Signal* signal)
+{
+ ndbrequire(false);
+}
+
BLOCK_FUNCTIONS(BackupProxy)
=== modified file 'storage/ndb/src/kernel/blocks/backup/BackupProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/backup/BackupProxy.hpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/BackupProxy.hpp 2008-08-11 11:49:35 +0000
@@ -17,6 +17,7 @@
#define NDB_BACKUP_PROXY_HPP
#include <LocalProxy.hpp>
+#include <signaldata/UtilSequence.hpp>
class BackupProxy : public LocalProxy {
public:
@@ -26,6 +27,12 @@ public:
protected:
virtual SimulatedBlock* newWorker(Uint32 instanceNo);
+
+ // GSN_STTOR
+ virtual void callSTTOR(Signal*);
+ void sendUTIL_SEQUENCE_REQ(Signal*);
+ void execUTIL_SEQUENCE_CONF(Signal*);
+ void execUTIL_SEQUENCE_REF(Signal*);
};
#endif
=== modified file 'storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2008-06-05 20:19:01 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccMain.cpp 2008-08-11 11:53:43 +0000
@@ -231,8 +231,8 @@ void Dbacc::execSTTOR(Signal* signal)
switch (tstartphase) {
case 1:
jam();
- ndbrequire((c_tup = (Dbtup*)globalData.getBlock(DBTUP)) != 0);
- ndbrequire((c_lqh = (Dblqh*)globalData.getBlock(DBLQH)) != 0);
+ ndbrequire((c_tup = (Dbtup*)globalData.getBlock(DBTUP, instance())) != 0);
+ ndbrequire((c_lqh = (Dblqh*)globalData.getBlock(DBLQH, instance())) != 0);
break;
}
tuserblockref = signal->theData[3];
@@ -247,7 +247,7 @@ void Dbacc::execSTTOR(Signal* signal)
void Dbacc::ndbrestart1Lab(Signal* signal)
{
cmynodeid = globalData.ownId;
- cownBlockref = numberToRef(DBACC, cmynodeid);
+ cownBlockref = calcInstanceBlockRef(DBACC);
czero = 0;
cminusOne = czero - 1;
ctest = 0;
@@ -384,7 +384,8 @@ void Dbacc::sttorrysignalLab(Signal* sig
/* SIGNAL VERSION NUMBER */
signal->theData[3] = ZSPH1;
signal->theData[4] = 255;
- sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 5, JBB);
+ BlockReference cntrRef = !isNdbMtLqh() ? NDBCNTR_REF : DBACC_REF;
+ sendSignal(cntrRef, GSN_STTORRY, signal, 5, JBB);
/* END OF START PHASES */
return;
}//Dbacc::sttorrysignalLab()
@@ -730,8 +731,8 @@ void Dbacc::releaseRootFragResources(Sig
tabPtr.i = tableId;
ptrCheckGuard(tabPtr, ctablesize, tabrec);
- //XXX ugly
- if (refToBlock(tabPtr.p->tabUserRef) == DBDICT)
+ const BlockNumber dictBlock = !isNdbMtLqh() ? DBDICT : DBACC;
+ if (refToBlock(tabPtr.p->tabUserRef) == dictBlock)
{
jam();
for (Uint32 i = 0; i < MAX_FRAG_PER_NODE; i++) {
@@ -6395,7 +6396,7 @@ void Dbacc::execNEXT_SCANREQ(Signal* sig
if (tscanNextFlag == NextScanReq::ZSCAN_COMMIT) {
jam();
signal->theData[0] = scanPtr.p->scanUserptr;
- Uint32 blockNo = refToBlock(scanPtr.p->scanUserblockref);
+ Uint32 blockNo = refToMain(scanPtr.p->scanUserblockref);
EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, 1);
return;
}//if
@@ -7394,7 +7395,7 @@ bool Dbacc::searchScanContainer(Signal*
void Dbacc::sendNextScanConf(Signal* signal)
{
scanPtr.p->scanTimer = scanPtr.p->scanContinuebCounter;
- Uint32 blockNo = refToBlock(scanPtr.p->scanUserblockref);
+ Uint32 blockNo = refToMain(scanPtr.p->scanUserblockref);
jam();
/** ---------------------------------------------------------------------
* LQH WILL NOT HAVE ANY USE OF THE TUPLE KEY LENGTH IN THIS CASE AND
=== modified file 'storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.cpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.cpp 2008-08-11 11:53:43 +0000
@@ -19,6 +19,9 @@
DbaccProxy::DbaccProxy(Block_context& ctx) :
LocalProxy(DBACC, ctx)
{
+ // GSN_DROP_TAB_REQ
+ addRecSignal(GSN_DROP_TAB_REQ, &DbaccProxy::execDROP_TAB_REQ);
+ addRecSignal(GSN_DROP_TAB_CONF, &DbaccProxy::execDROP_TAB_CONF);
}
DbaccProxy::~DbaccProxy()
@@ -31,4 +34,63 @@ DbaccProxy::newWorker(Uint32 instanceNo)
return new Dbacc(m_ctx, instanceNo);
}
+// GSN_DROP_TAB_REQ
+
+void
+DbaccProxy::execDROP_TAB_REQ(Signal* signal)
+{
+ const DropTabReq* req = (const DropTabReq*)signal->getDataPtr();
+ Uint32 ssId = getSsId(req);
+ Ss_DROP_TAB_REQ& ss = ssSeize<Ss_DROP_TAB_REQ>(ssId);
+ ss.m_req = *req;
+ ndbrequire(signal->getLength() == DropTabReq::SignalLength);
+ sendREQ(signal, ss);
+}
+
+void
+DbaccProxy::sendDROP_TAB_REQ(Signal* signal, Uint32 ssId)
+{
+ Ss_DROP_TAB_REQ& ss = ssFind<Ss_DROP_TAB_REQ>(ssId);
+
+ DropTabReq* req = (DropTabReq*)signal->getDataPtrSend();
+ *req = ss.m_req;
+ req->senderRef = reference();
+ req->senderData = ssId; // redundant since tableId is used
+ sendSignal(workerRef(ss.m_worker), GSN_DROP_TAB_REQ,
+ signal, DropTabReq::SignalLength, JBB);
+}
+
+void
+DbaccProxy::execDROP_TAB_CONF(Signal* signal)
+{
+ const DropTabConf* conf = (const DropTabConf*)signal->getDataPtr();
+ Uint32 ssId = getSsId(conf);
+ Ss_DROP_TAB_REQ& ss = ssFind<Ss_DROP_TAB_REQ>(ssId);
+ recvCONF(signal, ss);
+}
+
+void
+DbaccProxy::sendDROP_TAB_CONF(Signal* signal, Uint32 ssId)
+{
+ Ss_DROP_TAB_REQ& ss = ssFind<Ss_DROP_TAB_REQ>(ssId);
+ BlockReference dictRef = ss.m_req.senderRef;
+
+ if (!lastReply(ss))
+ return;
+
+ if (ss.m_error == 0) {
+ jam();
+ DropTabConf* conf = (DropTabConf*)signal->getDataPtrSend();
+ conf->senderRef = reference();
+ conf->senderData = ss.m_req.senderData;
+ conf->tableId = ss.m_req.tableId;
+ sendSignal(dictRef, GSN_DROP_TAB_CONF,
+ signal, DropTabConf::SignalLength, JBB);
+ } else {
+ ndbrequire(false);
+ }
+
+ ssRelease<Ss_DROP_TAB_REQ>(ssId);
+}
+
BLOCK_FUNCTIONS(DbaccProxy)
=== modified file 'storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.hpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dbacc/DbaccProxy.hpp 2008-08-11 11:53:43 +0000
@@ -17,6 +17,7 @@
#define NDB_DBACC_PROXY_HPP
#include <LocalProxy.hpp>
+#include <signaldata/DropTab.hpp>
class DbaccProxy : public LocalProxy {
public:
@@ -26,6 +27,30 @@ public:
protected:
virtual SimulatedBlock* newWorker(Uint32 instanceNo);
+
+ // GSN_DROP_TAB_REQ
+ struct Ss_DROP_TAB_REQ : SsParallel {
+ DropTabReq m_req;
+ Ss_DROP_TAB_REQ() {
+ m_sendREQ = (SsFUNC)&DbaccProxy::sendDROP_TAB_REQ;
+ m_sendCONF = (SsFUNC)&DbaccProxy::sendDROP_TAB_CONF;
+ }
+ enum { poolSize = 1 };
+ static SsPool<Ss_DROP_TAB_REQ>& pool(LocalProxy* proxy) {
+ return ((DbaccProxy*)proxy)->c_ss_DROP_TAB_REQ;
+ }
+ };
+ SsPool<Ss_DROP_TAB_REQ> c_ss_DROP_TAB_REQ;
+ Uint32 getSsId(const DropTabReq* req) {
+ return SsIdBase | req->tableId;
+ }
+ Uint32 getSsId(const DropTabConf* conf) {
+ return SsIdBase | conf->tableId;
+ }
+ void execDROP_TAB_REQ(Signal*);
+ void sendDROP_TAB_REQ(Signal*, Uint32 ssId);
+ void execDROP_TAB_CONF(Signal*);
+ void sendDROP_TAB_CONF(Signal*, Uint32 ssId);
};
#endif
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp 2008-08-11 11:21:36 +0000
@@ -1779,10 +1779,11 @@ public:
Uint32 dihGetInstanceKey(FragmentstorePtr tFragPtr) {
ndbrequire(!tFragPtr.isNull());
Uint32 log_part_id = tFragPtr.p->m_log_part_id;
- Uint32 instanceKey = 1 + log_part_id % MAX_NDBMT_WORKERS;
+ Uint32 instanceKey = 1 + log_part_id % MAX_NDBMT_LQH_WORKERS;
return instanceKey;
}
Uint32 dihGetInstanceKey(Uint32 tabId, Uint32 fragId);
+ Uint32 dihGetLogPartId(Uint32 tabId, Uint32 fragId);
};
#if (DIH_CDATA_SIZE < _SYSFILE_SIZE32)
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2008-08-11 11:30:18 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp 2008-08-11 12:48:03 +0000
@@ -8099,6 +8099,7 @@ void Dbdih::execDIGETNODESREQ(Signal* si
conf->zero = 0;
conf->reqinfo = sig2;
conf->fragId = fragId;
+ conf->instanceKey = dihGetInstanceKey(fragPtr);
if (unlikely(newFragId != RNIL))
{
@@ -8352,6 +8353,7 @@ void Dbdih::execDIH_SCAN_GET_NODES_REQ(S
conf->count = count;
conf->tableId = tabPtr.i;
conf->fragId = fragId;
+ conf->instanceKey = dihGetInstanceKey(fragPtr);
sendSignal(senderRef, GSN_DIH_SCAN_GET_NODES_CONF, signal,
DihScanGetNodesConf::SignalLength, JBB);
}//Dbdih::execDIGETPRIMREQ()
@@ -12024,7 +12026,9 @@ Dbdih::sendLCP_FRAG_ORD(Signal* signal,
replicaPtr.i = info.replicaPtr;
ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
- BlockReference ref = calcLqhBlockRef(replicaPtr.p->procNode);
+ // address LQH/instance directly
+ Uint32 instanceKey = dihGetInstanceKey(info.tableId, info.fragId);
+ BlockReference ref = numberToRef(DBLQH, instanceKey, replicaPtr.p->procNode);
if (ERROR_INSERTED(7193) && replicaPtr.p->procNode == getOwnNodeId())
{
@@ -16944,3 +16948,15 @@ Dbdih::dihGetInstanceKey(Uint32 tabId, U
Uint32 instanceKey = dihGetInstanceKey(tFragPtr);
return instanceKey;
}
+
+Uint32
+Dbdih::dihGetLogPartId(Uint32 tabId, Uint32 fragId)
+{
+ TabRecordPtr tTabPtr;
+ tTabPtr.i = tabId;
+ ptrCheckGuard(tTabPtr, ctabFileSize, tabRecord);
+ FragmentstorePtr tFragPtr;
+ getFragstore(tTabPtr.p, fragId, tFragPtr);
+ Uint32 logPartId = tFragPtr.p->m_log_part_id;
+ return logPartId;
+}
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-08-11 11:31:48 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp 2008-08-11 12:48:03 +0000
@@ -539,8 +539,8 @@ void Dblqh::execSTTOR(Signal* signal)
case ZSTART_PHASE1:
jam();
cstartPhase = tstartPhase;
- c_tup = (Dbtup*)globalData.getBlock(DBTUP);
- c_acc = (Dbacc*)globalData.getBlock(DBACC);
+ c_tup = (Dbtup*)globalData.getBlock(DBTUP, instance());
+ c_acc = (Dbacc*)globalData.getBlock(DBACC, instance());
ndbrequire(c_tup != 0 && c_acc != 0);
sendsttorryLab(signal);
@@ -589,7 +589,8 @@ Dblqh::define_backup(Signal* signal)
req->nodes.set(getOwnNodeId());
req->backupDataLen = ~0;
- sendSignal(BACKUP_REF, GSN_DEFINE_BACKUP_REQ, signal,
+ BlockReference backupRef = calcInstanceBlockRef(BACKUP);
+ sendSignal(backupRef, GSN_DEFINE_BACKUP_REQ, signal,
DefineBackupReq::SignalLength, JBB);
}
@@ -708,14 +709,16 @@ void Dblqh::startphase1Lab(Signal* signa
/* ------- INITIATE ALL RECORDS ------- */
cownNodeid = ownNodeId;
- caccBlockref = calcAccBlockRef (cownNodeid);
- ctupBlockref = calcTupBlockRef (cownNodeid);
- ctuxBlockref = calcTuxBlockRef (cownNodeid);
- cownref = calcLqhBlockRef (cownNodeid);
+ caccBlockref = calcInstanceBlockRef(DBACC);
+ ctupBlockref = calcInstanceBlockRef(DBTUP);
+ ctuxBlockref = calcInstanceBlockRef(DBTUX);
+ cownref = calcInstanceBlockRef(DBLQH);
+ ndbassert(cownref == reference());
for (Ti = 0; Ti < chostFileSize; Ti++) {
ThostPtr.i = Ti;
ptrCheckGuard(ThostPtr, chostFileSize, hostRecord);
- ThostPtr.p->hostLqhBlockRef = calcLqhBlockRef(ThostPtr.i);
+ // wl4391_todo using own instance() does not work with mixed versions
+ ThostPtr.p->hostLqhBlockRef = calcInstanceBlockRef(DBLQH, ThostPtr.i);
ThostPtr.p->hostTcBlockRef = calcTcBlockRef(ThostPtr.i);
ThostPtr.p->inPackedList = false;
ThostPtr.p->noOfPackedWordsLqh = 0;
@@ -974,7 +977,8 @@ void Dblqh::startphase6Lab(Signal* signa
void Dblqh::sendNdbSttorryLab(Signal* signal)
{
signal->theData[0] = cownref;
- sendSignal(NDBCNTR_REF, GSN_NDB_STTORRY, signal, 1, JBB);
+ BlockReference cntrRef = !isNdbMtLqh() ? NDBCNTR_REF : DBLQH_REF;
+ sendSignal(cntrRef, GSN_NDB_STTORRY, signal, 1, JBB);
return;
}//Dblqh::sendNdbSttorryLab()
@@ -989,7 +993,8 @@ void Dblqh::sendsttorryLab(Signal* signa
signal->theData[3] = ZSTART_PHASE1;
signal->theData[4] = 4;
signal->theData[5] = 255;
- sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 6, JBB);
+ BlockReference cntrRef = !isNdbMtLqh() ? NDBCNTR_REF : DBLQH_REF;
+ sendSignal(cntrRef, GSN_STTORRY, signal, 6, JBB);
return;
}//Dblqh::sendsttorryLab()
@@ -1156,7 +1161,7 @@ Dblqh::sendCreateTabReq(Signal* signal,
req->senderRef = reference();
req->senderData = addfragptr.i;
- Uint32 ref = DBTUP_REF;
+ Uint32 ref = calcInstanceBlockRef(DBTUP);
switch(addfragptr.p->addfragStatus){
case AddFragRecord::WAIT_TUP:
if (DictTabInfo::isOrderedIndex(tabPtr.p->tableType))
@@ -1171,7 +1176,7 @@ Dblqh::sendCreateTabReq(Signal* signal,
jam();
ndbrequire(req->noOfAttributes >= 2);
req->noOfAttributes--;
- ref = DBTUX_REF;
+ ref = calcInstanceBlockRef(DBTUX);
break;
default:
jamLine(addfragptr.p->addfragStatus);
@@ -1437,7 +1442,8 @@ Dblqh::sendAddAttrReq(Signal* signal)
tupreq->attrId = attrId;
tupreq->attrDescriptor = entry.attrDescriptor;
tupreq->extTypeInfo = entry.extTypeInfo;
- sendSignal(DBTUP_REF, GSN_TUP_ADD_ATTRREQ,
+ BlockReference tupRef = calcInstanceBlockRef(DBTUP);
+ sendSignal(tupRef, GSN_TUP_ADD_ATTRREQ,
signal, TupAddAttrReq::SignalLength, JBB);
return;
}
@@ -1467,7 +1473,8 @@ Dblqh::sendAddAttrReq(Signal* signal)
tuxreq->attrDescriptor = entry.attrDescriptor;
tuxreq->extTypeInfo = entry.extTypeInfo;
tuxreq->primaryAttrId = primaryAttrId;
- sendSignal(DBTUX_REF, GSN_TUX_ADD_ATTRREQ,
+ BlockReference tuxRef = calcInstanceBlockRef(DBTUX);
+ sendSignal(tuxRef, GSN_TUX_ADD_ATTRREQ,
signal, TuxAddAttrReq::SignalLength, JBB);
return;
}
@@ -2674,7 +2681,7 @@ Dblqh::execREMOVE_MARKER_ORD(Signal* sig
CommitAckMarkerPtr removedPtr;
m_commitAckMarkerHash.remove(removedPtr, key);
-#if defined VM_TRACE || defined ERROR_INSERT
+#if (defined VM_TRACE || defined ERROR_INSERT) && defined(wl4391_todo)
ndbrequire(removedPtr.i != RNIL);
m_commitAckMarkerPool.release(removedPtr);
#else
@@ -4390,7 +4397,7 @@ Dblqh::exec_acckeyreq(Signal* signal, Tc
TRACE_OP(regTcPtr.p, "ACC");
- EXECUTE_DIRECT(refToBlock(regTcPtr.p->tcAccBlockref), GSN_ACCKEYREQ,
+ EXECUTE_DIRECT(refToMain(regTcPtr.p->tcAccBlockref), GSN_ACCKEYREQ,
signal, 7 + regTcPtr.p->primKeyLen);
if (signal->theData[0] < RNIL) {
signal->theData[0] = regTcPtr.i;
@@ -5067,7 +5074,7 @@ Dblqh::acckeyconf_tupkeyreq(Signal* sign
sig1 = regTcPtr->transid[0];
sig2 = regTcPtr->transid[1];
sig3 = regFragptrP->tupFragptr;
- Uint32 tup = refToBlock(regTcPtr->tcTupBlockref);
+ Uint32 tup = refToMain(regTcPtr->tcTupBlockref);
tupKeyReq->storedProcedure = sig0;
tupKeyReq->transId1 = sig1;
@@ -5799,7 +5806,8 @@ void Dblqh::packLqhkeyreqLab(Signal* sig
lqhKeyReq->variableData[nextPos + 0] = sig0;
nextPos += LqhKeyReq::getGCIFlag(Treqinfo);
- BlockReference lqhRef = calcLqhBlockRef(regTcPtr->nextReplica);
+ // wl4391_todo for mixed versions must recompute full instance key here
+ BlockReference lqhRef = calcInstanceBlockRef(DBLQH, regTcPtr->nextReplica);
if (likely(sendLongReq))
{
@@ -6747,7 +6755,7 @@ void Dblqh::commitReqLab(Signal* signal,
jam();
regTcPtr->transactionState = TcConnectionrec::PREPARED_RECEIVED_COMMIT;
TcConnectionrecPtr saveTcPtr = tcConnectptr;
- Uint32 blockNo = refToBlock(regTcPtr->tcTupBlockref);
+ Uint32 blockNo = refToMain(regTcPtr->tcTupBlockref);
signal->theData[0] = regTcPtr->tupConnectrec;
signal->theData[1] = gci_hi;
signal->theData[2] = gci_lo;
@@ -6928,7 +6936,7 @@ void Dblqh::commitContinueAfterBlockedLa
TupCommitReq * const tupCommitReq =
(TupCommitReq *)signal->getDataPtrSend();
Uint32 sig0 = regTcPtr.p->tupConnectrec;
- Uint32 tup = refToBlock(regTcPtr.p->tcTupBlockref);
+ Uint32 tup = refToMain(regTcPtr.p->tcTupBlockref);
jam();
tupCommitReq->opPtr = sig0;
tupCommitReq->gci_hi = regTcPtr.p->gci_hi;
@@ -6968,7 +6976,7 @@ void Dblqh::commitContinueAfterBlockedLa
TRACE_OP(regTcPtr.p, "ACC_COMMITREQ");
- Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
+ Uint32 acc = refToMain(regTcPtr.p->tcAccBlockref);
signal->theData[0] = regTcPtr.p->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
@@ -6976,7 +6984,7 @@ void Dblqh::commitContinueAfterBlockedLa
if(!dirtyOp){
TRACE_OP(regTcPtr.p, "ACC_COMMITREQ");
- Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
+ Uint32 acc = refToMain(regTcPtr.p->tcAccBlockref);
signal->theData[0] = regTcPtr.p->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
}
@@ -9097,7 +9105,7 @@ void Dblqh::continueAfterReceivingAllAiL
AccScanReq::setReadCommittedFlag(req->requestInfo, scanptr.p->readCommitted);
AccScanReq::setDescendingFlag(req->requestInfo, scanptr.p->descending);
- if (refToBlock(tcConnectptr.p->clientBlockref) == BACKUP)
+ if (refToMain(tcConnectptr.p->clientBlockref) == BACKUP)
{
if (scanptr.p->lcpScan)
{
@@ -9787,7 +9795,7 @@ Dblqh::next_scanconf_tupkeyreq(Signal* s
tupKeyReq->savePointId = regTcPtr->savePointId;
tupKeyReq->disk_page= disk_page;
tupKeyReq->attrInfoIVal= RNIL;
- Uint32 blockNo = refToBlock(regTcPtr->tcTupBlockref);
+ Uint32 blockNo = refToMain(regTcPtr->tcTupBlockref);
EXECUTE_DIRECT(blockNo, GSN_TUPKEYREQ, signal,
TupKeyReq::SignalLength);
}
@@ -12097,13 +12105,17 @@ void Dblqh::execLCP_PREPARE_CONF(Signal*
LcpFragOrd *ord= (LcpFragOrd*)signal->getDataPtrSend();
lcpPtr.p->firstFragmentFlag= false;
*ord = lcpPtr.p->currentFragment.lcpFragOrd;
- EXECUTE_DIRECT(PGMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
+ // wl4391_todo DD
+ if (!isNdbMtLqh())
+ EXECUTE_DIRECT(PGMAN, GSN_LCP_FRAG_ORD, signal, signal->length(), 0);
jamEntry();
/**
* First fragment mean that last LCP is complete :-)
*/
- EXECUTE_DIRECT(TSMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
+ // wl4391_todo DD
+ if (!isNdbMtLqh())
+ EXECUTE_DIRECT(TSMAN, GSN_LCP_FRAG_ORD, signal, signal->length(), 0);
jamEntry();
}
@@ -12130,7 +12142,9 @@ void Dblqh::execLCP_PREPARE_CONF(Signal*
{
LcpFragOrd *ord= (LcpFragOrd*)signal->getDataPtrSend();
*ord = lcpPtr.p->currentFragment.lcpFragOrd;
- EXECUTE_DIRECT(LGMAN, GSN_LCP_FRAG_ORD, signal, signal->length());
+ // wl4391_todo DD
+ if (!isNdbMtLqh())
+ EXECUTE_DIRECT(LGMAN, GSN_LCP_FRAG_ORD, signal, signal->length(), 0);
jamEntry();
*ord = lcpPtr.p->currentFragment.lcpFragOrd;
@@ -12167,7 +12181,8 @@ void Dblqh::execLCP_PREPARE_CONF(Signal*
}
else
{
- sendSignal(BACKUP_REF, GSN_BACKUP_FRAGMENT_REQ, signal,
+ BlockReference backupRef = calcInstanceBlockRef(BACKUP);
+ sendSignal(backupRef, GSN_BACKUP_FRAGMENT_REQ, signal,
BackupFragmentReq::SignalLength, JBB);
}
}
@@ -12236,7 +12251,7 @@ Dblqh::sendLCP_FRAG_REP(Signal * signal,
jam();
BlockReference Tblockref = calcDihBlockRef(nodeId);
sendSignal(Tblockref, GSN_LCP_FRAG_REP, signal,
- LcpFragRep::SignalLength, JBB);
+ LcpFragRep::SignalLength, JBB);
}//if
}//for
}
@@ -12328,7 +12343,8 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* si
req->lcpId = lcpPtr.p->currentFragment.lcpFragOrd.lcpId % MAX_LCP_STORED;
req->backupPtr = m_backup_ptr;
req->backupId = lcpPtr.p->currentFragment.lcpFragOrd.lcpId;
- sendSignal(BACKUP_REF, GSN_LCP_PREPARE_REQ, signal,
+ BlockReference backupRef = calcInstanceBlockRef(BACKUP);
+ sendSignal(backupRef, GSN_LCP_PREPARE_REQ, signal,
LcpPrepareReq::SignalLength, JBB);
}//Dblqh::sendLCP_FRAGIDREQ()
@@ -12388,21 +12404,32 @@ void Dblqh::completeLcpRoundLab(Signal*
req->senderRef= reference();
req->backupPtr= m_backup_ptr;
req->backupId= lcpId;
- sendSignal(BACKUP_REF, GSN_END_LCP_REQ, signal,
+
+ BlockReference backupRef = calcInstanceBlockRef(BACKUP);
+ sendSignal(backupRef, GSN_END_LCP_REQ, signal,
EndLcpReq::SignalLength, JBB);
-
+ // wl4391_todo DD
+ if (!isNdbMtLqh())
sendSignal(PGMAN_REF, GSN_END_LCP_REQ, signal,
EndLcpReq::SignalLength, JBB);
+ // wl4391_todo DD
+ if (!isNdbMtLqh())
sendSignal(LGMAN_REF, GSN_END_LCP_REQ, signal,
EndLcpReq::SignalLength, JBB);
-
- EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, EndLcpReq::SignalLength);
+
+ // wl4391_todo DD
+ if (!isNdbMtLqh())
+ EXECUTE_DIRECT(TSMAN, GSN_END_LCP_REQ, signal, EndLcpReq::SignalLength, 0);
jamEntry();
-
+
lcpPtr.i = 0;
ptrAss(lcpPtr, lcpRecord);
+ // wl4391_todo DD
+ if (!isNdbMtLqh())
lcpPtr.p->m_outstanding = 3;
+ else
+ lcpPtr.p->m_outstanding = 1;
return;
}//Dblqh::completeLcpRoundLab()
@@ -12441,17 +12468,23 @@ void Dblqh::sendLCP_COMPLETE_REP(Signal*
rep->lcpId = lcpId;
rep->blockNo = DBLQH;
- for (Uint32 i = 0; i < cnoOfNodes; i++) {
- jam();
- Uint32 nodeId = cnodeData[i];
- if(cnodeStatus[i] == ZNODE_UP){
+ if (!isNdbMtLqh()) {
+ for (Uint32 i = 0; i < cnoOfNodes; i++) {
jam();
-
- BlockReference blockref = calcDihBlockRef(nodeId);
- sendSignal(blockref, GSN_LCP_COMPLETE_REP, signal,
- LcpCompleteRep::SignalLength, JBB);
- }//if
- }//for
+ Uint32 nodeId = cnodeData[i];
+ if(cnodeStatus[i] == ZNODE_UP){
+ jam();
+
+ BlockReference blockref = calcDihBlockRef(nodeId);
+ sendSignal(blockref, GSN_LCP_COMPLETE_REP, signal,
+ LcpCompleteRep::SignalLength, JBB);
+ }//if
+ }//for
+ } else {
+ jam();
+ sendSignal(DBLQH_REF, GSN_LCP_COMPLETE_REP, signal,
+ LcpCompleteRep::SignalLength, JBB);
+ }
if(lcpPtr.p->reportEmpty){
jam();
@@ -17758,7 +17791,7 @@ void Dblqh::initialiseRecordsLab(Signal*
signal->theData[2] = 0;
signal->theData[3] = retRef;
signal->theData[4] = retData;
- sendSignal(DBLQH_REF, GSN_CONTINUEB, signal, 5, JBB);
+ sendSignal(reference(), GSN_CONTINUEB, signal, 5, JBB);
return;
}//Dblqh::initialiseRecordsLab()
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp 2008-08-11 11:53:43 +0000
@@ -21,6 +21,45 @@ DblqhProxy::DblqhProxy(Block_context& ct
{
// GSN_SEND_PACKED
addRecSignal(GSN_SEND_PACKED, &DblqhProxy::execSEND_PACKED);
+
+ // GSN_CREATE_TAB_REQ
+ addRecSignal(GSN_CREATE_TAB_REQ, &DblqhProxy::execCREATE_TAB_REQ);
+ addRecSignal(GSN_CREATE_TAB_CONF, &DblqhProxy::execCREATE_TAB_CONF);
+ addRecSignal(GSN_CREATE_TAB_REF, &DblqhProxy::execCREATE_TAB_REF);
+
+ // GSN_LQHADDATTREQ
+ addRecSignal(GSN_LQHADDATTREQ, &DblqhProxy::execLQHADDATTREQ);
+ addRecSignal(GSN_LQHADDATTCONF, &DblqhProxy::execLQHADDATTCONF);
+ addRecSignal(GSN_LQHADDATTREF, &DblqhProxy::execLQHADDATTREF);
+
+ // GSN_LQHFRAGREQ
+ addRecSignal(GSN_LQHFRAGREQ, &DblqhProxy::execLQHFRAGREQ);
+ addRecSignal(GSN_LQHFRAGCONF, &DblqhProxy::execLQHFRAGCONF);
+ addRecSignal(GSN_LQHFRAGREF, &DblqhProxy::execLQHFRAGREF);
+
+ // GSN_TAB_COMMITREQ
+ addRecSignal(GSN_TAB_COMMITREQ, &DblqhProxy::execTAB_COMMITREQ);
+ addRecSignal(GSN_TAB_COMMITCONF, &DblqhProxy::execTAB_COMMITCONF);
+ addRecSignal(GSN_TAB_COMMITREF, &DblqhProxy::execTAB_COMMITREF);
+
+ // GSN_PREP_DROP_TAB_REQ
+ addRecSignal(GSN_PREP_DROP_TAB_REQ, &DblqhProxy::execPREP_DROP_TAB_REQ);
+ addRecSignal(GSN_PREP_DROP_TAB_CONF, &DblqhProxy::execPREP_DROP_TAB_CONF);
+ addRecSignal(GSN_PREP_DROP_TAB_REF, &DblqhProxy::execPREP_DROP_TAB_REF);
+
+ // GSN_DROP_TAB_REQ
+ addRecSignal(GSN_DROP_TAB_REQ, &DblqhProxy::execDROP_TAB_REQ);
+ addRecSignal(GSN_DROP_TAB_CONF, &DblqhProxy::execDROP_TAB_CONF);
+ addRecSignal(GSN_DROP_TAB_REF, &DblqhProxy::execDROP_TAB_REF);
+
+ // GSN_LCP_FRAG_ORD
+ addRecSignal(GSN_LCP_FRAG_ORD, &DblqhProxy::execLCP_FRAG_ORD);
+ addRecSignal(GSN_LCP_COMPLETE_REP, &DblqhProxy::execLCP_COMPLETE_REP);
+
+ // GSN_GCP_SAVEREQ
+ addRecSignal(GSN_GCP_SAVEREQ, &DblqhProxy::execGCP_SAVEREQ);
+ addRecSignal(GSN_GCP_SAVECONF, &DblqhProxy::execGCP_SAVECONF);
+ addRecSignal(GSN_GCP_SAVEREF, &DblqhProxy::execGCP_SAVEREF);
}
DblqhProxy::~DblqhProxy()
@@ -46,4 +85,653 @@ DblqhProxy::execSEND_PACKED(Signal* sign
}
}
+// GSN_NDB_STTOR
+
+void
+DblqhProxy::callNDB_STTOR(Signal* signal)
+{
+ Ss_READ_NODES_REQ& ss = c_ss_READ_NODESREQ;
+ ndbrequire(ss.m_gsn == 0);
+
+ const Uint32 startPhase = signal->theData[2];
+ switch (startPhase) {
+ case 3:
+ ss.m_gsn = GSN_NDB_STTOR;
+ sendREAD_NODESREQ(signal);
+ break;
+ default:
+ backNDB_STTOR(signal);
+ break;
+ }
+}
+
+// GSN_CREATE_TAB_REQ
+
+// there is no consistent LQH connect pointer to use as ssId
+
+void
+DblqhProxy::execCREATE_TAB_REQ(Signal* signal)
+{
+ Ss_CREATE_TAB_REQ& ss = ssSeize<Ss_CREATE_TAB_REQ>(1);
+
+ const CreateTabReq* req = (const CreateTabReq*)signal->getDataPtr();
+ ss.m_req = *req;
+ ndbrequire(signal->getLength() == CreateTabReq::SignalLengthLDM);
+
+ sendREQ(signal, ss);
+}
+
+void
+DblqhProxy::sendCREATE_TAB_REQ(Signal* signal, Uint32 ssId)
+{
+ Ss_CREATE_TAB_REQ& ss = ssFind<Ss_CREATE_TAB_REQ>(ssId);
+
+ CreateTabReq* req = (CreateTabReq*)signal->getDataPtrSend();
+ *req = ss.m_req;
+ req->senderRef = reference();
+ req->senderData = ssId;
+ sendSignal(workerRef(ss.m_worker), GSN_CREATE_TAB_REQ,
+ signal, CreateTabReq::SignalLengthLDM, JBB);
+}
+
+void
+DblqhProxy::execCREATE_TAB_CONF(Signal* signal)
+{
+ const CreateTabConf* conf = (const CreateTabConf*)signal->getDataPtr();
+ Uint32 ssId = conf->senderData;
+ Ss_CREATE_TAB_REQ& ss = ssFind<Ss_CREATE_TAB_REQ>(ssId);
+ recvCONF(signal, ss);
+}
+
+void
+DblqhProxy::execCREATE_TAB_REF(Signal* signal)
+{
+ const CreateTabRef* ref = (const CreateTabRef*)signal->getDataPtr();
+ Uint32 ssId = ref->senderData;
+ Ss_CREATE_TAB_REQ& ss = ssFind<Ss_CREATE_TAB_REQ>(ssId);
+ recvREF(signal, ss, ref->errorCode);
+}
+
+void
+DblqhProxy::sendCREATE_TAB_CONF(Signal* signal, Uint32 ssId)
+{
+ Ss_CREATE_TAB_REQ& ss = ssFind<Ss_CREATE_TAB_REQ>(ssId);
+ BlockReference dictRef = ss.m_req.senderRef;
+
+ {
+ const CreateTabConf* conf = (const CreateTabConf*)signal->getDataPtr();
+ ss.m_lqhConnectPtr[ss.m_worker] = conf->lqhConnectPtr;
+ }
+
+ if (!lastReply(ss))
+ return;
+
+ if (ss.m_error == 0) {
+ jam();
+ CreateTabConf* conf = (CreateTabConf*)signal->getDataPtrSend();
+ conf->senderRef = reference();
+ conf->senderData = ss.m_req.senderData;
+ conf->lqhConnectPtr = ssId;
+ sendSignal(dictRef, GSN_CREATE_TAB_CONF,
+ signal, CreateTabConf::SignalLength, JBB);
+ } else {
+ CreateTabRef* ref = (CreateTabRef*)signal->getDataPtrSend();
+ ref->senderRef = reference();
+ ref->senderData = ss.m_req.senderData;
+ ref->errorCode = ss.m_error;
+ ref->errorLine = 0;
+ ref->errorKey = 0;
+ ref->errorStatus = 0;
+ sendSignal(dictRef, GSN_CREATE_TAB_REF,
+ signal, CreateTabRef::SignalLength, JBB);
+ ssRelease<Ss_CREATE_TAB_REQ>(ssId);
+ }
+}
+
+// GSN_LQHADDATTREQ [ sub-op ]
+
+void
+DblqhProxy::execLQHADDATTREQ(Signal* signal)
+{
+ const LqhAddAttrReq* req = (const LqhAddAttrReq*)signal->getDataPtr();
+ Uint32 ssId = req->lqhFragPtr;
+ Ss_LQHADDATTREQ& ss = ssSeize<Ss_LQHADDATTREQ>(ssId);
+
+ const Uint32 reqlength =
+ LqhAddAttrReq::HeaderLength +
+ req->noOfAttributes * LqhAddAttrReq::EntryLength;
+ ndbrequire(signal->getLength() == reqlength);
+ memcpy(&ss.m_req, req, reqlength << 2);
+ ss.m_reqlength = reqlength;
+
+ sendREQ(signal, ss);
+}
+
+void
+DblqhProxy::sendLQHADDATTREQ(Signal* signal, Uint32 ssId)
+{
+ Ss_LQHADDATTREQ& ss = ssFind<Ss_LQHADDATTREQ>(ssId);
+ Ss_CREATE_TAB_REQ& ss_main = ssFind<Ss_CREATE_TAB_REQ>(ssId);
+
+ LqhAddAttrReq* req = (LqhAddAttrReq*)signal->getDataPtrSend();
+ const Uint32 reqlength = ss.m_reqlength;
+ memcpy(req, &ss.m_req, reqlength << 2);
+ req->lqhFragPtr = ss_main.m_lqhConnectPtr[ss.m_worker];
+ req->noOfAttributes = ss.m_req.noOfAttributes;
+ req->senderData = ssId;
+ req->senderAttrPtr = ss.m_req.senderAttrPtr;
+ sendSignal(workerRef(ss.m_worker), GSN_LQHADDATTREQ,
+ signal, reqlength, JBB);
+}
+
+void
+DblqhProxy::execLQHADDATTCONF(Signal* signal)
+{
+ const LqhAddAttrConf* conf = (const LqhAddAttrConf*)signal->getDataPtr();
+ Uint32 ssId = conf->senderData;
+ Ss_LQHADDATTREQ& ss = ssFind<Ss_LQHADDATTREQ>(ssId);
+ recvCONF(signal, ss);
+}
+
+void
+DblqhProxy::execLQHADDATTREF(Signal* signal)
+{
+ const LqhAddAttrRef* ref = (const LqhAddAttrRef*)signal->getDataPtr();
+ Uint32 ssId = ref->senderData;
+ Ss_LQHADDATTREQ& ss = ssFind<Ss_LQHADDATTREQ>(ssId);
+ recvREF(signal, ss, ref->errorCode);
+}
+
+void
+DblqhProxy::sendLQHADDATTCONF(Signal* signal, Uint32 ssId)
+{
+ Ss_LQHADDATTREQ& ss = ssFind<Ss_LQHADDATTREQ>(ssId);
+ Ss_CREATE_TAB_REQ& ss_main = ssFind<Ss_CREATE_TAB_REQ>(ssId);
+ BlockReference dictRef = ss_main.m_req.senderRef;
+
+ if (!lastReply(ss))
+ return;
+
+ if (ss.m_error == 0) {
+ LqhAddAttrConf* conf = (LqhAddAttrConf*)signal->getDataPtrSend();
+ conf->senderData = ss.m_req.senderData;
+ conf->senderAttrPtr = ss.m_req.senderAttrPtr;
+ sendSignal(dictRef, GSN_LQHADDATTCONF,
+ signal, LqhAddAttrConf::SignalLength, JBB);
+ } else {
+ jam();
+ LqhAddAttrRef* ref = (LqhAddAttrRef*)signal->getDataPtrSend();
+ ref->senderData = ss.m_req.senderData;
+ ref->errorCode = ss.m_error;
+ sendSignal(dictRef, GSN_LQHADDATTREF,
+ signal, LqhAddAttrRef::SignalLength, JBB);
+ ssRelease<Ss_CREATE_TAB_REQ>(ssId);
+ }
+
+ ssRelease<Ss_LQHADDATTREQ>(ssId);
+}
+
+// GSN_LQHFRAGREQ [ sub-op ]
+
+void
+DblqhProxy::execLQHFRAGREQ(Signal* signal)
+{
+ Ss_LQHFRAGREQ& ss = ssSeize<Ss_LQHFRAGREQ>(1); // lost connection
+
+ const LqhFragReq* req = (const LqhFragReq*)signal->getDataPtr();
+ ss.m_req = *req;
+ sendREQ(signal, ss);
+}
+
+void
+DblqhProxy::sendLQHFRAGREQ(Signal* signal, Uint32 ssId)
+{
+ Ss_LQHFRAGREQ& ss = ssFind<Ss_LQHFRAGREQ>(ssId);
+
+ LqhFragReq* req = (LqhFragReq*)signal->getDataPtrSend();
+ *req = ss.m_req;
+
+ if (!isLogPartOwner(ss.m_worker, req->logPartId)) {
+ jam();
+ skipReq(ss);
+ return;
+ }
+
+ req->senderRef = reference();
+ req->senderData = ssId;
+ sendSignal(workerRef(ss.m_worker), GSN_LQHFRAGREQ,
+ signal, LqhFragReq::SignalLength, JBB);
+}
+
+void
+DblqhProxy::execLQHFRAGCONF(Signal* signal)
+{
+ const LqhFragConf* conf = (const LqhFragConf*)signal->getDataPtr();
+ Uint32 ssId = conf->senderData;
+ Ss_LQHFRAGREQ& ss = ssFind<Ss_LQHFRAGREQ>(ssId);
+ recvCONF(signal, ss);
+}
+
+void
+DblqhProxy::execLQHFRAGREF(Signal* signal)
+{
+ const LqhFragRef* ref = (const LqhFragRef*)signal->getDataPtr();
+ Uint32 ssId = ref->senderData;
+ Ss_LQHFRAGREQ& ss = ssFind<Ss_LQHFRAGREQ>(ssId);
+ recvREF(signal, ss, ref->errorCode);
+}
+
+void
+DblqhProxy::sendLQHFRAGCONF(Signal* signal, Uint32 ssId)
+{
+ Ss_LQHFRAGREQ& ss = ssFind<Ss_LQHFRAGREQ>(ssId);
+ Ss_CREATE_TAB_REQ& ss_main = ssFind<Ss_CREATE_TAB_REQ>(ssId);
+ BlockReference dictRef = ss_main.m_req.senderRef;
+
+ if (!lastReply(ss))
+ return;
+
+ if (ss.m_error == 0) {
+ LqhFragConf* conf = (LqhFragConf*)signal->getDataPtrSend();
+ conf->senderData = ss.m_req.senderData;
+ conf->lqhFragPtr = RNIL; //wl4391_todo
+ conf->tableId = ss.m_req.tableId;
+ conf->fragId = ss.m_req.fragId;
+ conf->changeMask = 0;
+ sendSignal(dictRef, GSN_LQHFRAGCONF,
+ signal, LqhFragConf::SignalLength, JBB);
+ } else {
+ jam();
+ LqhFragRef* ref = (LqhFragRef*)signal->getDataPtrSend();
+ ref->senderData = ss.m_req.senderData;
+ ref->errorCode = ss.m_error;
+ ref->tableId = ss.m_req.tableId;
+ ref->fragId = ss.m_req.fragId;
+ ref->requestInfo = 0;
+ ref->changeMask = 0;
+ sendSignal(dictRef, GSN_LQHFRAGREF,
+ signal, LqhFragRef::SignalLength, JBB);
+ ssRelease<Ss_CREATE_TAB_REQ>(ssId);
+ }
+
+ ssRelease<Ss_LQHFRAGREQ>(ssId);
+}
+
+// GSN_TAB_COMMITREQ
+
+void
+DblqhProxy::execTAB_COMMITREQ(Signal* signal)
+{
+ Ss_TAB_COMMITREQ& ss = ssSeize<Ss_TAB_COMMITREQ>(1); // lost connection
+
+ const TabCommitReq* req = (const TabCommitReq*)signal->getDataPtr();
+ ss.m_req = *req;
+ sendREQ(signal, ss);
+}
+
+void
+DblqhProxy::sendTAB_COMMITREQ(Signal* signal, Uint32 ssId)
+{
+ Ss_TAB_COMMITREQ& ss = ssFind<Ss_TAB_COMMITREQ>(ssId);
+
+ TabCommitReq* req = (TabCommitReq*)signal->getDataPtrSend();
+ req->senderRef = reference();
+ req->senderData = ssId;
+ req->tableId = ss.m_req.tableId;
+ sendSignal(workerRef(ss.m_worker), GSN_TAB_COMMITREQ,
+ signal, TabCommitReq::SignalLength, JBB);
+}
+
+void
+DblqhProxy::execTAB_COMMITCONF(Signal* signal)
+{
+ const TabCommitConf* conf = (TabCommitConf*)signal->getDataPtr();
+ Uint32 ssId = conf->senderData;
+ Ss_TAB_COMMITREQ& ss = ssFind<Ss_TAB_COMMITREQ>(ssId);
+ recvCONF(signal, ss);
+}
+
+void
+DblqhProxy::execTAB_COMMITREF(Signal* signal)
+{
+ const TabCommitRef* ref = (TabCommitRef*)signal->getDataPtr();
+ Uint32 ssId = ref->senderData;
+ Ss_TAB_COMMITREQ& ss = ssFind<Ss_TAB_COMMITREQ>(ssId);
+
+ // wl4391_todo omit extra info now since DBDICT only does ndbrequire
+ recvREF(signal, ss, ref->errorCode);
+}
+
+void
+DblqhProxy::sendTAB_COMMITCONF(Signal* signal, Uint32 ssId)
+{
+ Ss_TAB_COMMITREQ& ss = ssFind<Ss_TAB_COMMITREQ>(ssId);
+ Ss_CREATE_TAB_REQ& ss_main = ssFind<Ss_CREATE_TAB_REQ>(ssId);
+ BlockReference dictRef = ss_main.m_req.senderRef;
+
+ if (!lastReply(ss))
+ return;
+
+ if (ss.m_error == 0) {
+ jam();
+ TabCommitConf* conf = (TabCommitConf*)signal->getDataPtrSend();
+ conf->senderData = ss.m_req.senderData;
+ conf->nodeId = getOwnNodeId();
+ conf->tableId = ss.m_req.tableId;
+ sendSignal(dictRef, GSN_TAB_COMMITCONF,
+ signal, TabCommitConf::SignalLength, JBB);
+ } else {
+ jam();
+ TabCommitRef* ref = (TabCommitRef*)signal->getDataPtrSend();
+ ref->senderData = ss.m_req.senderData;
+ ref->nodeId = getOwnNodeId();
+ ref->tableId = ss.m_req.tableId;
+ sendSignal(dictRef, GSN_TAB_COMMITREF,
+ signal, TabCommitRef::SignalLength, JBB);
+ return;
+ }
+
+ ssRelease<Ss_CREATE_TAB_REQ>(ssId);
+ ssRelease<Ss_TAB_COMMITREQ>(ssId);
+}
+
+// GSN_LCP_FRAG_ORD
+
+void
+DblqhProxy::execLCP_FRAG_ORD(Signal* signal)
+{
+ const LcpFragOrd* req = (const LcpFragOrd*)signal->getDataPtr();
+ ndbrequire(req->lastFragmentFlag);
+ execLCP_COMPLETE_ORD(signal);
+}
+
+// GSN_LCP_COMPLETE_ORD [ fictional gsn ]
+
+void
+DblqhProxy::execLCP_COMPLETE_ORD(Signal* signal)
+{
+ const LcpFragOrd* req = (const LcpFragOrd*)signal->getDataPtr();
+ Uint32 ssId = getSsId(req);
+ Ss_LCP_COMPLETE_ORD& ss = ssSeize<Ss_LCP_COMPLETE_ORD>(ssId);
+ ss.m_req = *req;
+ sendREQ(signal, ss);
+}
+
+void
+DblqhProxy::sendLCP_COMPLETE_ORD(Signal* signal, Uint32 ssId)
+{
+ Ss_LCP_COMPLETE_ORD& ss = ssFind<Ss_LCP_COMPLETE_ORD>(ssId);
+
+ LcpFragOrd* req = (LcpFragOrd*)signal->getDataPtrSend();
+ *req = ss.m_req;
+ sendSignal(workerRef(ss.m_worker), GSN_LCP_FRAG_ORD,
+ signal, LcpFragOrd::SignalLength, JBB);
+}
+
+void
+DblqhProxy::execLCP_COMPLETE_REP(Signal* signal)
+{
+ const LcpCompleteRep* conf = (const LcpCompleteRep*)signal->getDataPtr();
+ Uint32 ssId = getSsId(conf);
+ Ss_LCP_COMPLETE_ORD& ss = ssFind<Ss_LCP_COMPLETE_ORD>(ssId);
+ recvCONF(signal, ss);
+}
+
+void
+DblqhProxy::sendLCP_COMPLETE_REP(Signal* signal, Uint32 ssId)
+{
+ Ss_LCP_COMPLETE_ORD& ss = ssFind<Ss_LCP_COMPLETE_ORD>(ssId);
+
+ if (!lastReply(ss))
+ return;
+
+ NodePtr nodePtr;
+ c_nodeList.first(nodePtr);
+ ndbrequire(nodePtr.i != RNIL);
+ while (nodePtr.i != RNIL) {
+ if (nodePtr.p->m_alive) {
+ Uint32 nodeId = nodePtr.p->m_nodeId;
+ BlockReference dihRef = calcDihBlockRef(nodeId);
+
+ LcpCompleteRep* conf = (LcpCompleteRep*)signal->getDataPtrSend();
+ conf->nodeId = getOwnNodeId();
+ conf->blockNo = DBLQH;
+ conf->lcpId = ss.m_req.lcpId;
+ sendSignal(dihRef, GSN_LCP_COMPLETE_REP,
+ signal, LcpCompleteRep::SignalLength, JBB);
+ }
+ c_nodeList.next(nodePtr);
+ }
+
+ ssRelease<Ss_LCP_COMPLETE_ORD>(ssId);
+}
+
+// GSN_GCP_SAVEREQ
+
+void
+DblqhProxy::execGCP_SAVEREQ(Signal* signal)
+{
+ const GCPSaveReq* req = (const GCPSaveReq*)signal->getDataPtr();
+ Uint32 ssId = getSsId(req);
+ Ss_GCP_SAVEREQ& ss = ssSeize<Ss_GCP_SAVEREQ>(ssId);
+ ss.m_req = *req;
+ sendREQ(signal, ss);
+}
+
+void
+DblqhProxy::sendGCP_SAVEREQ(Signal* signal, Uint32 ssId)
+{
+ Ss_GCP_SAVEREQ& ss = ssFind<Ss_GCP_SAVEREQ>(ssId);
+
+ GCPSaveReq* req = (GCPSaveReq*)signal->getDataPtrSend();
+ *req = ss.m_req;
+
+ req->dihBlockRef = reference();
+ req->dihPtr = ss.m_worker;
+ sendSignal(workerRef(ss.m_worker), GSN_GCP_SAVEREQ,
+ signal, GCPSaveReq::SignalLength, JBB);
+}
+
+void
+DblqhProxy::execGCP_SAVECONF(Signal* signal)
+{
+ const GCPSaveConf* conf = (const GCPSaveConf*)signal->getDataPtr();
+ Uint32 ssId = getSsId(conf);
+ Ss_GCP_SAVEREQ& ss = ssFind<Ss_GCP_SAVEREQ>(ssId);
+ recvCONF(signal, ss);
+}
+
+void
+DblqhProxy::execGCP_SAVEREF(Signal* signal)
+{
+ const GCPSaveRef* ref = (const GCPSaveRef*)signal->getDataPtr();
+ Uint32 ssId = getSsId(ref);
+ Ss_GCP_SAVEREQ& ss = ssFind<Ss_GCP_SAVEREQ>(ssId);
+
+ if (ss.m_error != 0) {
+ // wl4391_todo check
+ ndbrequire(ss.m_error == ref->errorCode);
+ }
+ recvREF(signal, ss, ref->errorCode);
+}
+
+void
+DblqhProxy::sendGCP_SAVECONF(Signal* signal, Uint32 ssId)
+{
+ Ss_GCP_SAVEREQ& ss = ssFind<Ss_GCP_SAVEREQ>(ssId);
+
+ if (!lastReply(ss))
+ return;
+
+ if (ss.m_error == 0) {
+ GCPSaveConf* conf = (GCPSaveConf*)signal->getDataPtrSend();
+ conf->dihPtr = ss.m_req.dihPtr;
+ conf->nodeId = getOwnNodeId();
+ conf->gci = ss.m_req.gci;
+ sendSignal(ss.m_req.dihBlockRef, GSN_GCP_SAVECONF,
+ signal, GCPSaveConf::SignalLength, JBB);
+ } else {
+ jam();
+ GCPSaveRef* ref = (GCPSaveRef*)signal->getDataPtrSend();
+ ref->dihPtr = ss.m_req.dihPtr;
+ ref->nodeId = getOwnNodeId();
+ ref->gci = ss.m_req.gci;
+ ref->errorCode = ss.m_error;
+ sendSignal(ss.m_req.dihBlockRef, GSN_GCP_SAVEREF,
+ signal, GCPSaveRef::SignalLength, JBB);
+ }
+
+ ssRelease<Ss_GCP_SAVEREQ>(ssId);
+}
+
+// GSN_PREP_DROP_TAB_REQ
+
+void
+DblqhProxy::execPREP_DROP_TAB_REQ(Signal* signal)
+{
+ const PrepDropTabReq* req = (const PrepDropTabReq*)signal->getDataPtr();
+ Uint32 ssId = getSsId(req);
+ Ss_PREP_DROP_TAB_REQ& ss = ssSeize<Ss_PREP_DROP_TAB_REQ>(ssId);
+ ss.m_req = *req;
+ ndbrequire(signal->getLength() == PrepDropTabReq::SignalLength);
+ sendREQ(signal, ss);
+}
+
+void
+DblqhProxy::sendPREP_DROP_TAB_REQ(Signal* signal, Uint32 ssId)
+{
+ Ss_PREP_DROP_TAB_REQ& ss = ssFind<Ss_PREP_DROP_TAB_REQ>(ssId);
+
+ PrepDropTabReq* req = (PrepDropTabReq*)signal->getDataPtrSend();
+ *req = ss.m_req;
+ req->senderRef = reference();
+ req->senderData = ssId; // redundant since tableId is used
+ sendSignal(workerRef(ss.m_worker), GSN_PREP_DROP_TAB_REQ,
+ signal, PrepDropTabReq::SignalLength, JBB);
+}
+
+void
+DblqhProxy::execPREP_DROP_TAB_CONF(Signal* signal)
+{
+ const PrepDropTabConf* conf = (const PrepDropTabConf*)signal->getDataPtr();
+ Uint32 ssId = getSsId(conf);
+ Ss_PREP_DROP_TAB_REQ& ss = ssFind<Ss_PREP_DROP_TAB_REQ>(ssId);
+ recvCONF(signal, ss);
+}
+
+void
+DblqhProxy::execPREP_DROP_TAB_REF(Signal* signal)
+{
+ const PrepDropTabRef* ref = (const PrepDropTabRef*)signal->getDataPtr();
+ Uint32 ssId = getSsId(ref);
+ Ss_PREP_DROP_TAB_REQ& ss = ssFind<Ss_PREP_DROP_TAB_REQ>(ssId);
+ recvREF(signal, ss, ref->errorCode);
+}
+
+void
+DblqhProxy::sendPREP_DROP_TAB_CONF(Signal* signal, Uint32 ssId)
+{
+ Ss_PREP_DROP_TAB_REQ& ss = ssFind<Ss_PREP_DROP_TAB_REQ>(ssId);
+ BlockReference dictRef = ss.m_req.senderRef;
+
+ if (!lastReply(ss))
+ return;
+
+ if (ss.m_error == 0) {
+ jam();
+ PrepDropTabConf* conf = (PrepDropTabConf*)signal->getDataPtrSend();
+ conf->senderRef = reference();
+ conf->senderData = ss.m_req.senderData;
+ conf->tableId = ss.m_req.tableId;
+ sendSignal(dictRef, GSN_PREP_DROP_TAB_CONF,
+ signal, PrepDropTabConf::SignalLength, JBB);
+ } else {
+ jam();
+ PrepDropTabRef* ref = (PrepDropTabRef*)signal->getDataPtrSend();
+ ref->senderRef = reference();
+ ref->senderData = ss.m_req.senderData;
+ ref->tableId = ss.m_req.tableId;
+ ref->errorCode = ss.m_error;
+ sendSignal(dictRef, GSN_PREP_DROP_TAB_REF,
+ signal, PrepDropTabRef::SignalLength, JBB);
+ }
+
+ ssRelease<Ss_PREP_DROP_TAB_REQ>(ssId);
+}
+
+// GSN_DROP_TAB_REQ
+
+void
+DblqhProxy::execDROP_TAB_REQ(Signal* signal)
+{
+ const DropTabReq* req = (const DropTabReq*)signal->getDataPtr();
+ Uint32 ssId = getSsId(req);
+ Ss_DROP_TAB_REQ& ss = ssSeize<Ss_DROP_TAB_REQ>(ssId);
+ ss.m_req = *req;
+ ndbrequire(signal->getLength() == DropTabReq::SignalLength);
+ sendREQ(signal, ss);
+}
+
+void
+DblqhProxy::sendDROP_TAB_REQ(Signal* signal, Uint32 ssId)
+{
+ Ss_DROP_TAB_REQ& ss = ssFind<Ss_DROP_TAB_REQ>(ssId);
+
+ DropTabReq* req = (DropTabReq*)signal->getDataPtrSend();
+ *req = ss.m_req;
+ req->senderRef = reference();
+ req->senderData = ssId; // redundant since tableId is used
+ sendSignal(workerRef(ss.m_worker), GSN_DROP_TAB_REQ,
+ signal, DropTabReq::SignalLength, JBB);
+}
+
+void
+DblqhProxy::execDROP_TAB_CONF(Signal* signal)
+{
+ const DropTabConf* conf = (const DropTabConf*)signal->getDataPtr();
+ Uint32 ssId = getSsId(conf);
+ Ss_DROP_TAB_REQ& ss = ssFind<Ss_DROP_TAB_REQ>(ssId);
+ recvCONF(signal, ss);
+}
+
+void
+DblqhProxy::execDROP_TAB_REF(Signal* signal)
+{
+ const DropTabRef* ref = (const DropTabRef*)signal->getDataPtr();
+ Uint32 ssId = getSsId(ref);
+ Ss_DROP_TAB_REQ& ss = ssFind<Ss_DROP_TAB_REQ>(ssId);
+ recvREF(signal, ss, ref->errorCode);
+}
+
+void
+DblqhProxy::sendDROP_TAB_CONF(Signal* signal, Uint32 ssId)
+{
+ Ss_DROP_TAB_REQ& ss = ssFind<Ss_DROP_TAB_REQ>(ssId);
+ BlockReference dictRef = ss.m_req.senderRef;
+
+ if (!lastReply(ss))
+ return;
+
+ if (ss.m_error == 0) {
+ jam();
+ DropTabConf* conf = (DropTabConf*)signal->getDataPtrSend();
+ conf->senderRef = reference();
+ conf->senderData = ss.m_req.senderData;
+ conf->tableId = ss.m_req.tableId;
+ sendSignal(dictRef, GSN_DROP_TAB_CONF,
+ signal, DropTabConf::SignalLength, JBB);
+ } else {
+ jam();
+ DropTabRef* ref = (DropTabRef*)signal->getDataPtrSend();
+ ref->senderRef = reference();
+ ref->senderData = ss.m_req.senderData;
+ ref->tableId = ss.m_req.tableId;
+ ref->errorCode = ss.m_error;
+ sendSignal(dictRef, GSN_DROP_TAB_CONF,
+ signal, DropTabConf::SignalLength, JBB);
+ }
+
+ ssRelease<Ss_DROP_TAB_REQ>(ssId);
+}
+
BLOCK_FUNCTIONS(DblqhProxy)
=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp 2008-08-11 11:53:43 +0000
@@ -17,6 +17,13 @@
#define NDB_DBLQH_PROXY_HPP
#include <LocalProxy.hpp>
+#include <signaldata/CreateTab.hpp>
+#include <signaldata/LqhFrag.hpp>
+#include <signaldata/TabCommit.hpp>
+#include <signaldata/PrepDropTab.hpp>
+#include <signaldata/DropTab.hpp>
+#include <signaldata/LCP.hpp>
+#include <signaldata/GCP.hpp>
class DblqhProxy : public LocalProxy {
public:
@@ -24,11 +31,203 @@ public:
virtual ~DblqhProxy();
BLOCK_DEFINES(DblqhProxy);
+protected:
+ virtual SimulatedBlock* newWorker(Uint32 instanceNo);
+
// GSN_SEND_PACKED
void execSEND_PACKED(Signal*);
-protected:
- virtual SimulatedBlock* newWorker(Uint32 instanceNo);
+ // GSN_NDB_STTOR
+ virtual void callNDB_STTOR(Signal*);
+
+ // GSN_CREATE_TAB_REQ
+ struct Ss_CREATE_TAB_REQ : SsParallel {
+ CreateTabReq m_req;
+ Uint32 m_lqhConnectPtr[MaxWorkers];
+ Ss_CREATE_TAB_REQ() {
+ m_sendREQ = (SsFUNC)&DblqhProxy::sendCREATE_TAB_REQ;
+ m_sendCONF = (SsFUNC)&DblqhProxy::sendCREATE_TAB_CONF;
+ }
+ enum { poolSize = 1 };
+ static SsPool<Ss_CREATE_TAB_REQ>& pool(LocalProxy* proxy) {
+ return ((DblqhProxy*)proxy)->c_ss_CREATE_TAB_REQ;
+ }
+ };
+ SsPool<Ss_CREATE_TAB_REQ> c_ss_CREATE_TAB_REQ;
+ void execCREATE_TAB_REQ(Signal*);
+ void sendCREATE_TAB_REQ(Signal*, Uint32 ssId);
+ void execCREATE_TAB_CONF(Signal*);
+ void execCREATE_TAB_REF(Signal*);
+ void sendCREATE_TAB_CONF(Signal*, Uint32 ssId);
+
+ // GSN_LQHADDATTREQ [ sub-op ]
+ struct Ss_LQHADDATTREQ : SsParallel {
+ LqhAddAttrReq m_req;
+ Uint32 m_reqlength;
+ Ss_LQHADDATTREQ() {
+ m_sendREQ = (SsFUNC)&DblqhProxy::sendLQHADDATTREQ;
+ m_sendCONF = (SsFUNC)&DblqhProxy::sendLQHADDATTCONF;
+ }
+ enum { poolSize = 1 };
+ static SsPool<Ss_LQHADDATTREQ>& pool(LocalProxy* proxy) {
+ return ((DblqhProxy*)proxy)->c_ss_LQHADDATTREQ;
+ }
+ };
+ SsPool<Ss_LQHADDATTREQ> c_ss_LQHADDATTREQ;
+ void execLQHADDATTREQ(Signal*);
+ void sendLQHADDATTREQ(Signal*, Uint32 ssId);
+ void execLQHADDATTCONF(Signal*);
+ void execLQHADDATTREF(Signal*);
+ void sendLQHADDATTCONF(Signal*, Uint32 ssId);
+
+ // GSN_LQHFRAGREQ [ sub-op ]
+ struct Ss_LQHFRAGREQ : SsParallel {
+ LqhFragReq m_req;
+ Ss_LQHFRAGREQ() {
+ m_sendREQ = (SsFUNC)&DblqhProxy::sendLQHFRAGREQ;
+ m_sendCONF = (SsFUNC)&DblqhProxy::sendLQHFRAGCONF;
+ }
+ enum { poolSize = 1 };
+ static SsPool<Ss_LQHFRAGREQ>& pool(LocalProxy* proxy) {
+ return ((DblqhProxy*)proxy)->c_ss_LQHFRAGREQ;
+ }
+ };
+ SsPool<Ss_LQHFRAGREQ> c_ss_LQHFRAGREQ;
+ void execLQHFRAGREQ(Signal*);
+ void sendLQHFRAGREQ(Signal*, Uint32 ssId);
+ void execLQHFRAGCONF(Signal*);
+ void execLQHFRAGREF(Signal*);
+ void sendLQHFRAGCONF(Signal*, Uint32 ssId);
+
+ // GSN_TAB_COMMITREQ [ sub-op ]
+ struct Ss_TAB_COMMITREQ : SsParallel {
+ TabCommitReq m_req;
+ Ss_TAB_COMMITREQ() {
+ m_sendREQ = (SsFUNC)&DblqhProxy::sendTAB_COMMITREQ;
+ m_sendCONF = (SsFUNC)&DblqhProxy::sendTAB_COMMITCONF;
+ }
+ enum { poolSize = 1 };
+ static SsPool<Ss_TAB_COMMITREQ>& pool(LocalProxy* proxy) {
+ return ((DblqhProxy*)proxy)->c_ss_TAB_COMMITREQ;
+ }
+ };
+ SsPool<Ss_TAB_COMMITREQ> c_ss_TAB_COMMITREQ;
+ void execTAB_COMMITREQ(Signal*);
+ void sendTAB_COMMITREQ(Signal*, Uint32 ssId);
+ void execTAB_COMMITCONF(Signal*);
+ void execTAB_COMMITREF(Signal*);
+ void sendTAB_COMMITCONF(Signal*, Uint32 ssId);
+
+ // GSN_LCP_FRAG_ORD
+ void execLCP_FRAG_ORD(Signal*);
+
+ // GSN_LCP_COMPLETE_ORD [ fictional gsn ]
+ struct Ss_LCP_COMPLETE_ORD : SsParallel {
+ LcpFragOrd m_req;
+ Ss_LCP_COMPLETE_ORD(){
+ m_sendREQ = (SsFUNC)&DblqhProxy::sendLCP_COMPLETE_ORD;
+ m_sendCONF = (SsFUNC)&DblqhProxy::sendLCP_COMPLETE_REP;
+ }
+ enum { poolSize = 1 };
+ static SsPool<Ss_LCP_COMPLETE_ORD>& pool(LocalProxy* proxy) {
+ return ((DblqhProxy*)proxy)->c_ss_LCP_COMPLETE_ORD;
+ }
+ };
+ SsPool<Ss_LCP_COMPLETE_ORD> c_ss_LCP_COMPLETE_ORD;
+ static Uint32 getSsId(const LcpFragOrd* req) {
+ return SsIdBase | (req->lcpId & 0xFFFF);
+ }
+ static Uint32 getSsId(const LcpCompleteRep* conf) {
+ return SsIdBase | (conf->lcpId & 0xFFFF);
+ }
+ void execLCP_COMPLETE_ORD(Signal*);
+ void sendLCP_COMPLETE_ORD(Signal*, Uint32 ssId);
+ void execLCP_COMPLETE_REP(Signal*);
+ void sendLCP_COMPLETE_REP(Signal*, Uint32 ssId);
+
+ // GSN_GCP_SAVEREQ
+ struct Ss_GCP_SAVEREQ : SsParallel {
+ GCPSaveReq m_req;
+ Ss_GCP_SAVEREQ() {
+ m_sendREQ = (SsFUNC)&DblqhProxy::sendGCP_SAVEREQ;
+ m_sendCONF = (SsFUNC)&DblqhProxy::sendGCP_SAVECONF;
+ }
+ enum { poolSize = 1 };
+ static SsPool<Ss_GCP_SAVEREQ>& pool(LocalProxy* proxy) {
+ return ((DblqhProxy*)proxy)->c_ss_GCP_SAVEREQ;
+ }
+ };
+ SsPool<Ss_GCP_SAVEREQ> c_ss_GCP_SAVEREQ;
+ Uint32 getSsId(const GCPSaveReq* req) {
+ return SsIdBase | (req->gci & 0xFFFF);
+ }
+ Uint32 getSsId(const GCPSaveConf* conf) {
+ return SsIdBase | (conf->gci & 0xFFFF);
+ }
+ Uint32 getSsId(const GCPSaveRef* ref) {
+ return SsIdBase | (ref->gci & 0xFFFF);
+ }
+ void execGCP_SAVEREQ(Signal*);
+ void sendGCP_SAVEREQ(Signal*, Uint32 ssId);
+ void execGCP_SAVECONF(Signal*);
+ void execGCP_SAVEREF(Signal*);
+ void sendGCP_SAVECONF(Signal*, Uint32 ssId);
+
+ // GSN_PREP_DROP_TAB_REQ
+ struct Ss_PREP_DROP_TAB_REQ : SsParallel {
+ PrepDropTabReq m_req;
+ Ss_PREP_DROP_TAB_REQ() {
+ m_sendREQ = (SsFUNC)&DblqhProxy::sendPREP_DROP_TAB_REQ;
+ m_sendCONF = (SsFUNC)&DblqhProxy::sendPREP_DROP_TAB_CONF;
+ }
+ enum { poolSize = 1 };
+ static SsPool<Ss_PREP_DROP_TAB_REQ>& pool(LocalProxy* proxy) {
+ return ((DblqhProxy*)proxy)->c_ss_PREP_DROP_TAB_REQ;
+ }
+ };
+ SsPool<Ss_PREP_DROP_TAB_REQ> c_ss_PREP_DROP_TAB_REQ;
+ Uint32 getSsId(const PrepDropTabReq* req) {
+ return SsIdBase | req->tableId;
+ }
+ Uint32 getSsId(const PrepDropTabConf* conf) {
+ return SsIdBase | conf->tableId;
+ }
+ Uint32 getSsId(const PrepDropTabRef* ref) {
+ return SsIdBase | ref->tableId;
+ }
+ void execPREP_DROP_TAB_REQ(Signal*);
+ void sendPREP_DROP_TAB_REQ(Signal*, Uint32 ssId);
+ void execPREP_DROP_TAB_CONF(Signal*);
+ void execPREP_DROP_TAB_REF(Signal*);
+ void sendPREP_DROP_TAB_CONF(Signal*, Uint32 ssId);
+
+ // GSN_DROP_TAB_REQ
+ struct Ss_DROP_TAB_REQ : SsParallel {
+ DropTabReq m_req;
+ Ss_DROP_TAB_REQ() {
+ m_sendREQ = (SsFUNC)&DblqhProxy::sendDROP_TAB_REQ;
+ m_sendCONF = (SsFUNC)&DblqhProxy::sendDROP_TAB_CONF;
+ };
+ enum { poolSize = 1 };
+ static SsPool<Ss_DROP_TAB_REQ>& pool(LocalProxy* proxy) {
+ return ((DblqhProxy*)proxy)->c_ss_DROP_TAB_REQ;
+ }
+ };
+ SsPool<Ss_DROP_TAB_REQ> c_ss_DROP_TAB_REQ;
+ Uint32 getSsId(const DropTabReq* req) {
+ return SsIdBase | req->tableId;
+ }
+ Uint32 getSsId(const DropTabConf* conf) {
+ return SsIdBase | conf->tableId;
+ }
+ Uint32 getSsId(const DropTabRef* ref) {
+ return SsIdBase | ref->tableId;
+ }
+ void execDROP_TAB_REQ(Signal*);
+ void sendDROP_TAB_REQ(Signal*, Uint32 ssId);
+ void execDROP_TAB_CONF(Signal*);
+ void execDROP_TAB_REF(Signal*);
+ void sendDROP_TAB_CONF(Signal*, Uint32 ssId);
};
#endif
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2008-08-11 11:30:18 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2008-08-11 12:48:03 +0000
@@ -851,6 +851,8 @@ public:
UintR savePointId;
Uint16 tcNodedata[4];
+ /* Instance key to send to LQH. Receiver maps it to actual instance. */
+ Uint16 lqhInstanceKey;
// Trigger data
FiredTriggerPtr accumulatingTriggerData;
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-08-11 11:30:18 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2008-08-11 12:48:03 +0000
@@ -3270,6 +3270,9 @@ void Dbtc::tckeyreq050Lab(Signal* signal
regTcPtr->tcNodedata[2] = Tdata5;
regTcPtr->tcNodedata[3] = Tdata6;
+ UintR Tdata7 = conf->instanceKey;
+ regTcPtr->lqhInstanceKey = Tdata7;
+
Uint8 Toperation = regTcPtr->operation;
Uint8 Tdirty = regTcPtr->dirtyOp;
tnoOfBackup = tnodeinfo & 3;
@@ -3392,7 +3395,9 @@ void Dbtc::attrinfoDihReceivedLab(Signal
{
jam();
arrGuard(Tnode, MAX_NDB_NODES);
- packLqhkeyreq(signal, calcLqhBlockRef(Tnode));
+ Uint32 instanceKey = regTcPtr->lqhInstanceKey;
+ BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
+ packLqhkeyreq(signal, lqhRef);
}
else
{
@@ -4989,6 +4994,29 @@ void Dbtc::sendCommitLqh(Signal* signal,
ApiConnectRecord * const regApiPtr = apiConnectptr.p;
Thostptr.i = regTcPtr->lastLqhNodeId;
ptrCheckGuard(Thostptr, ThostFilesize, hostRecord);
+
+ UintR Tdata1 = regTcPtr->lastLqhCon;
+ UintR Tdata2 = Uint32(regApiPtr->globalcheckpointid >> 32);
+ UintR Tdata3 = regApiPtr->transid[0];
+ UintR Tdata4 = regApiPtr->transid[1];
+ UintR Tdata5 = Uint32(regApiPtr->globalcheckpointid);
+
+ // wl4391_todo testing own config is wrong for mixed versions
+ bool send_unpacked = isNdbMtLqh();
+ if (send_unpacked) {
+ Uint32* data = signal->getDataPtrSend();
+ data[0] = Tdata1;
+ data[1] = Tdata2;
+ data[2] = Tdata3;
+ data[3] = Tdata4;
+ data[4] = Tdata5;
+ Uint32 Tnode = Thostptr.i;
+ Uint32 instanceKey = regTcPtr->lqhInstanceKey;
+ BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
+ sendSignal(lqhRef, GSN_COMMIT, signal, 5, JBB);
+ return;
+ }
+
if (Thostptr.p->noOfPackedWordsLqh > 25 - 5) {
jam();
sendPackedSignalLqh(signal, Thostptr.p);
@@ -4998,12 +5026,6 @@ void Dbtc::sendCommitLqh(Signal* signal,
}//if
UintR Tindex = Thostptr.p->noOfPackedWordsLqh;
UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex];
- UintR Tdata1 = regTcPtr->lastLqhCon;
- UintR Tdata2 = Uint32(regApiPtr->globalcheckpointid >> 32);
- UintR Tdata3 = regApiPtr->transid[0];
- UintR Tdata4 = regApiPtr->transid[1];
- UintR Tdata5 = Uint32(regApiPtr->globalcheckpointid);
-
TDataPtr[0] = Tdata1 | (ZCOMMIT << 28);
TDataPtr[1] = Tdata2;
TDataPtr[2] = Tdata3;
@@ -5364,8 +5386,26 @@ void Dbtc::sendCompleteLqh(Signal* signa
HostRecordPtr Thostptr;
UintR ThostFilesize = chostFilesize;
ApiConnectRecord * const regApiPtr = apiConnectptr.p;
- Thostptr.i = regTcPtr->lastLqhNodeId;
+ Thostptr.i = regTcPtr->lastLqhNodeId; //last???
ptrCheckGuard(Thostptr, ThostFilesize, hostRecord);
+
+ UintR Tdata1 = regTcPtr->lastLqhCon;
+ UintR Tdata2 = regApiPtr->transid[0];
+ UintR Tdata3 = regApiPtr->transid[1];
+
+ bool send_unpacked = isNdbMtLqh();
+ if (send_unpacked) {
+ Uint32* data = signal->getDataPtrSend();
+ data[0] = Tdata1;
+ data[1] = Tdata2;
+ data[2] = Tdata3;
+ Uint32 Tnode = Thostptr.i;
+ Uint32 instanceKey = regTcPtr->lqhInstanceKey;
+ BlockReference lqhRef = numberToRef(DBLQH, instanceKey, Tnode);
+ sendSignal(lqhRef, GSN_COMPLETE, signal, 3, JBB);
+ return;
+ }
+
if (Thostptr.p->noOfPackedWordsLqh > 22) {
jam();
sendPackedSignalLqh(signal, Thostptr.p);
@@ -5373,14 +5413,9 @@ void Dbtc::sendCompleteLqh(Signal* signa
jam();
updatePackedList(signal, Thostptr.p, Thostptr.i);
}//if
-
UintR Tindex = Thostptr.p->noOfPackedWordsLqh;
UintR* TDataPtr = &Thostptr.p->packedWordsLqh[Tindex];
- UintR Tdata1 = regTcPtr->lastLqhCon | (ZCOMPLETE << 28);
- UintR Tdata2 = regApiPtr->transid[0];
- UintR Tdata3 = regApiPtr->transid[1];
-
- TDataPtr[0] = Tdata1;
+ TDataPtr[0] = Tdata1 | (ZCOMPLETE << 28);
TDataPtr[1] = Tdata2;
TDataPtr[2] = Tdata3;
Thostptr.p->noOfPackedWordsLqh = Tindex + 3;
@@ -5433,6 +5468,27 @@ Dbtc::sendRemoveMarker(Signal* signal,
hostPtr.i = nodeId;
ptrCheckGuard(hostPtr, ThostFilesize, hostRecord);
+ UintR Tdata1 = 0;
+ UintR Tdata2 = transid1;
+ UintR Tdata3 = transid2;
+
+ bool send_unpacked = isNdbMtLqh();
+ if (send_unpacked) {
+ Uint32* data = signal->getDataPtrSend();
+ data[0] = Tdata1;
+ data[1] = Tdata2;
+ data[2] = Tdata3;
+ Uint32 Tnode = hostPtr.i;
+ Uint32 i;
+ for (i = 0; i < MAX_NDBMT_LQH_WORKERS; i++) {
+ // wl4391_todo skip workers not part of tx
+ Uint32 instanceKey = 1 + i;
+ BlockReference ref = numberToRef(DBLQH, instanceKey, Tnode);
+ sendSignal(ref, GSN_REMOVE_MARKER_ORD, signal, 3, JBB);
+ }
+ return;
+ }
+
if (hostPtr.p->noOfPackedWordsLqh > (25 - 3)){
jam();
sendPackedSignalLqh(signal, hostPtr.p);
@@ -5444,9 +5500,9 @@ Dbtc::sendRemoveMarker(Signal* signal,
UintR numWord = hostPtr.p->noOfPackedWordsLqh;
UintR* dataPtr = &hostPtr.p->packedWordsLqh[numWord];
- dataPtr[0] = (ZREMOVE_MARKER << 28);
- dataPtr[1] = transid1;
- dataPtr[2] = transid2;
+ dataPtr[0] = Tdata1 | (ZREMOVE_MARKER << 28);
+ dataPtr[1] = Tdata2;
+ dataPtr[2] = Tdata3;
hostPtr.p->noOfPackedWordsLqh = numWord + 3;
}
@@ -6674,7 +6730,8 @@ int Dbtc::releaseAndAbort(Signal* signal
/* ************< */
/* ABORT < */
/* ************< */
- tblockref = calcLqhBlockRef(localHostptr.i);
+ Uint32 instanceKey = tcConnectptr.p->lqhInstanceKey;
+ tblockref = numberToRef(DBLQH, instanceKey, localHostptr.i);
signal->theData[0] = tcConnectptr.i;
signal->theData[1] = cownref;
signal->theData[2] = apiConnectptr.p->transid[0];
@@ -8756,7 +8813,8 @@ void Dbtc::toCommitHandlingLab(Signal* s
ptrCheckGuard(hostptr, chostFilesize, hostRecord);
if (hostptr.p->hostStatus == HS_ALIVE) {
jam();
- tblockref = calcLqhBlockRef(hostptr.i);
+ Uint32 instanceKey = tcConnectptr.p->lqhInstanceKey;
+ tblockref = numberToRef(DBLQH, instanceKey, hostptr.i);
setApiConTimer(apiConnectptr.i, ctcTimer, __LINE__);
apiConnectptr.p->apiConnectstate = CS_WAIT_COMMIT_CONF;
apiConnectptr.p->timeOutCounter = 0;
@@ -8902,7 +8960,8 @@ void Dbtc::toCompleteHandlingLab(Signal*
ptrCheckGuard(hostptr, chostFilesize, hostRecord);
if (hostptr.p->hostStatus == HS_ALIVE) {
jam();
- tblockref = calcLqhBlockRef(hostptr.i);
+ Uint32 instanceKey = tcConnectptr.p->lqhInstanceKey;
+ tblockref = numberToRef(DBLQH, instanceKey, hostptr.i);
setApiConTimer(apiConnectptr.i, ctcTimer, __LINE__);
tcConnectptr.p->tcConnectstate = OS_WAIT_COMPLETE_CONF;
apiConnectptr.p->apiConnectstate = CS_WAIT_COMPLETE_CONF;
@@ -10269,7 +10328,8 @@ void Dbtc::execDIH_SCAN_GET_NODES_CONF(S
/*empty*/;
break;
}//switch
- Uint32 ref = calcLqhBlockRef(tnodeid);
+ Uint32 instanceKey = conf->instanceKey;
+ Uint32 ref = numberToRef(DBLQH, instanceKey, tnodeid);
scanFragptr.p->lqhBlockref = ref;
scanFragptr.p->m_connectCount = getNodeInfo(tnodeid).m_connectCount;
sendScanFragReq(signal, scanptr.p, scanFragptr.p);
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp 2008-04-09 06:15:37 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupBuffer.cpp 2008-08-11 11:48:10 +0000
@@ -245,10 +245,19 @@ void Dbtup::sendReadAttrinfo(Signal* sig
*
* The UTIL/TC blocks are in another thread (in multi-threaded ndbd), so
* must use sendSignal().
+ *
+ * In MT LQH only LQH and BACKUP are in same thread.
*/
- if (block != DBUTIL && block != DBTC)
+ BlockNumber blockMain = blockToMain(block);
+ if (blockMain == DBLQH || blockMain == BACKUP)
+ {
+ EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex);
+ jamEntry();
+ }
+ else if (blockMain == SUMA)
{
- EXECUTE_DIRECT(block, GSN_TRANSID_AI, signal, 3 + ToutBufIndex);
+ // wl4391_todo not MT safe
+ EXECUTE_DIRECT(blockMain, GSN_TRANSID_AI, signal, 3 + ToutBufIndex, 0);
jamEntry();
}
else
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2008-08-11 11:44:42 +0000
@@ -290,10 +290,10 @@ void Dbtup::execSTTOR(Signal* signal)
switch (startPhase) {
case ZSTARTPHASE1:
jam();
- ndbrequire((c_lqh= (Dblqh*)globalData.getBlock(DBLQH)) != 0);
+ ndbrequire((c_lqh= (Dblqh*)globalData.getBlock(DBLQH, instance())) != 0);
ndbrequire((c_tsman= (Tsman*)globalData.getBlock(TSMAN)) != 0);
ndbrequire((c_lgman= (Lgman*)globalData.getBlock(LGMAN)) != 0);
- cownref = calcTupBlockRef(0);
+ cownref = calcInstanceBlockRef(DBTUP);
break;
default:
jam();
@@ -304,7 +304,8 @@ void Dbtup::execSTTOR(Signal* signal)
signal->theData[2] = 2;
signal->theData[3] = ZSTARTPHASE1;
signal->theData[4] = 255;
- sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 5, JBB);
+ BlockReference cntrRef = !isNdbMtLqh() ? NDBCNTR_REF : DBTUP_REF;
+ sendSignal(cntrRef, GSN_STTORRY, signal, 5, JBB);
return;
}//Dbtup::execSTTOR()
@@ -533,7 +534,7 @@ void Dbtup::execNDB_STTOR(Signal* signal
case ZSTARTPHASE1:
jam();
cownNodeId = ownNodeId;
- cownref = calcTupBlockRef(ownNodeId);
+ cownref = calcInstanceBlockRef(DBTUP);
break;
case ZSTARTPHASE2:
jam();
@@ -561,7 +562,8 @@ void Dbtup::execNDB_STTOR(Signal* signal
break;
}//switch
signal->theData[0] = cownref;
- sendSignal(cndbcntrRef, GSN_NDB_STTORRY, signal, 1, JBB);
+ BlockReference cntrRef = !isNdbMtLqh() ? NDBCNTR_REF : DBTUP_REF;
+ sendSignal(cntrRef, GSN_NDB_STTORRY, signal, 1, JBB);
}//Dbtup::execNDB_STTOR()
void Dbtup::startphase3Lab(Signal* signal, Uint32 config1, Uint32 config2)
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.cpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.cpp 2008-08-11 11:53:43 +0000
@@ -20,6 +20,10 @@ DbtupProxy::DbtupProxy(Block_context& ct
LocalProxy(DBTUP, ctx)
{
addRecSignal(GSN_SEND_PACKED, &DbtupProxy::execSEND_PACKED);
+
+ // GSN_DROP_TAB_REQ
+ addRecSignal(GSN_DROP_TAB_REQ, &DbtupProxy::execDROP_TAB_REQ);
+ addRecSignal(GSN_DROP_TAB_CONF, &DbtupProxy::execDROP_TAB_CONF);
}
DbtupProxy::~DbtupProxy()
@@ -44,4 +48,63 @@ DbtupProxy::execSEND_PACKED(Signal* sign
}
}
+// GSN_DROP_TAB_REQ
+
+void
+DbtupProxy::execDROP_TAB_REQ(Signal* signal)
+{
+ const DropTabReq* req = (const DropTabReq*)signal->getDataPtr();
+ Uint32 ssId = getSsId(req);
+ Ss_DROP_TAB_REQ& ss = ssSeize<Ss_DROP_TAB_REQ>(ssId);
+ ss.m_req = *req;
+ ndbrequire(signal->getLength() == DropTabReq::SignalLength);
+ sendREQ(signal, ss);
+}
+
+void
+DbtupProxy::sendDROP_TAB_REQ(Signal* signal, Uint32 ssId)
+{
+ Ss_DROP_TAB_REQ& ss = ssFind<Ss_DROP_TAB_REQ>(ssId);
+
+ DropTabReq* req = (DropTabReq*)signal->getDataPtrSend();
+ *req = ss.m_req;
+ req->senderRef = reference();
+ req->senderData = ssId; // redundant since tableId is used
+ sendSignal(workerRef(ss.m_worker), GSN_DROP_TAB_REQ,
+ signal, DropTabReq::SignalLength, JBB);
+}
+
+void
+DbtupProxy::execDROP_TAB_CONF(Signal* signal)
+{
+ const DropTabConf* conf = (const DropTabConf*)signal->getDataPtr();
+ Uint32 ssId = getSsId(conf);
+ Ss_DROP_TAB_REQ& ss = ssFind<Ss_DROP_TAB_REQ>(ssId);
+ recvCONF(signal, ss);
+}
+
+void
+DbtupProxy::sendDROP_TAB_CONF(Signal* signal, Uint32 ssId)
+{
+ Ss_DROP_TAB_REQ& ss = ssFind<Ss_DROP_TAB_REQ>(ssId);
+ BlockReference dictRef = ss.m_req.senderRef;
+
+ if (!lastReply(ss))
+ return;
+
+ if (ss.m_error == 0) {
+ jam();
+ DropTabConf* conf = (DropTabConf*)signal->getDataPtrSend();
+ conf->senderRef = reference();
+ conf->senderData = ss.m_req.senderData;
+ conf->tableId = ss.m_req.tableId;
+ sendSignal(dictRef, GSN_DROP_TAB_CONF,
+ signal, DropTabConf::SignalLength, JBB);
+ } else {
+ ndbrequire(false);
+ }
+
+ ssRelease<Ss_DROP_TAB_REQ>(ssId);
+}
+
BLOCK_FUNCTIONS(DbtupProxy)
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.hpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupProxy.hpp 2008-08-11 11:53:43 +0000
@@ -17,6 +17,7 @@
#define NDB_DBTUP_PROXY
#include <LocalProxy.hpp>
+#include <signaldata/DropTab.hpp>
class DbtupProxy : public LocalProxy {
public:
@@ -29,6 +30,30 @@ protected:
// GSN_SEND_PACKED
void execSEND_PACKED(Signal*);
+
+ // GSN_DROP_TAB_REQ
+ struct Ss_DROP_TAB_REQ : SsParallel {
+ DropTabReq m_req;
+ Ss_DROP_TAB_REQ() {
+ m_sendREQ = (SsFUNC)&DbtupProxy::sendDROP_TAB_REQ;
+ m_sendCONF = (SsFUNC)&DbtupProxy::sendDROP_TAB_CONF;
+ }
+ enum { poolSize = 1 };
+ static SsPool<Ss_DROP_TAB_REQ>& pool(LocalProxy* proxy) {
+ return ((DbtupProxy*)proxy)->c_ss_DROP_TAB_REQ;
+ }
+ };
+ SsPool<Ss_DROP_TAB_REQ> c_ss_DROP_TAB_REQ;
+ Uint32 getSsId(const DropTabReq* req) {
+ return SsIdBase | req->tableId;
+ }
+ Uint32 getSsId(const DropTabConf* conf) {
+ return SsIdBase | conf->tableId;
+ }
+ void execDROP_TAB_REQ(Signal*);
+ void sendDROP_TAB_REQ(Signal*, Uint32 ssId);
+ void execDROP_TAB_CONF(Signal*);
+ void sendDROP_TAB_CONF(Signal*, Uint32 ssId);
};
#endif
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp 2008-03-25 15:47:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupScan.cpp 2008-08-11 11:48:10 +0000
@@ -412,7 +412,7 @@ Dbtup::scanReply(Signal* signal, ScanOpP
sendSignal(scan.m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB);
} else {
- Uint32 blockNo = refToBlock(scan.m_userRef);
+ Uint32 blockNo = refToMain(scan.m_userRef);
EXECUTE_DIRECT(blockNo, GSN_NEXT_SCANCONF, signal, signalLength);
jamEntry();
}
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupStoredProcDef.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupStoredProcDef.cpp 2008-01-31 13:47:50 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupStoredProcDef.cpp 2008-08-11 11:48:10 +0000
@@ -81,7 +81,8 @@ void Dbtup::deleteScanProcedure(Signal*
set_trans_state(regOperPtr, TRANS_IDLE);
signal->theData[0] = regOperPtr->userpointer;
signal->theData[1] = storedProcId;
- sendSignal(DBLQH_REF, GSN_STORED_PROCCONF, signal, 2, JBB);
+ BlockReference lqhRef = calcInstanceBlockRef(DBLQH);
+ sendSignal(lqhRef, GSN_STORED_PROCCONF, signal, 2, JBB);
}//Dbtup::deleteScanProcedure()
void Dbtup::scanProcedure(Signal* signal,
@@ -212,7 +213,8 @@ bool Dbtup::storedProcedureAttrInfo(Sign
set_trans_state(regOperPtr, TRANS_IDLE);
signal->theData[0] = regOperPtr->userpointer;
signal->theData[1] = storedPtr.i;
- sendSignal(DBLQH_REF, GSN_STORED_PROCCONF, signal, 2, JBB);
+ BlockReference lqhRef = calcInstanceBlockRef(DBLQH);
+ sendSignal(lqhRef, GSN_STORED_PROCCONF, signal, 2, JBB);
return true;
}//Dbtup::storedProcedureAttrInfo()
@@ -232,6 +234,7 @@ void Dbtup::storedSeizeAttrinbufrecError
signal->theData[0] = regOperPtr->userpointer;
signal->theData[1] = errorCode;
signal->theData[2] = regOperPtr->storedProcPtr;
- sendSignal(DBLQH_REF, GSN_STORED_PROCREF, signal, 3, JBB);
+ BlockReference lqhRef = calcInstanceBlockRef(DBLQH);
+ sendSignal(lqhRef, GSN_STORED_PROCREF, signal, 3, JBB);
}//Dbtup::storedSeizeAttrinbufrecErrorLab()
=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxGen.cpp 2008-08-11 11:37:57 +0000
@@ -152,7 +152,8 @@ Dbtux::execSTTOR(Signal* signal)
signal->theData[4] = 3; // for c_typeOfStart
signal->theData[5] = 7; // for c_internalStartPhase
signal->theData[6] = 255;
- sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 7, JBB);
+ BlockReference cntrRef = !isNdbMtLqh() ? NDBCNTR_REF : DBTUX_REF;
+ sendSignal(cntrRef, GSN_STTORRY, signal, 7, JBB);
}
void
=== modified file 'storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp 2007-12-23 12:52:25 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp 2008-08-11 11:21:36 +0000
@@ -313,11 +313,13 @@ Ndbfs::readWriteRequest(int action, Sign
Uint16 filePointer = (Uint16)fsRWReq->filePointer;
const UintR userPointer = fsRWReq->userPointer;
const BlockReference userRef = fsRWReq->userReference;
- const BlockNumber blockNumber = refToBlock(userRef);
+ const BlockNumber blockNumber = refToMain(userRef);
+ const Uint32 instanceNumber = refToInstance(userRef);
AsyncFile* openFile = theOpenFiles.find(filePointer);
- const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsRWReq->varIndex];
+ const NewVARIABLE *myBaseAddrRef =
+ &getBat(blockNumber, instanceNumber)[fsRWReq->varIndex];
UintPtr tPageSize;
UintPtr tClusterSize;
UintPtr tNRR;
@@ -343,7 +345,7 @@ Ndbfs::readWriteRequest(int action, Sign
if(format != FsReadWriteReq::fsFormatGlobalPage &&
format != FsReadWriteReq::fsFormatSharedPage)
{
- if (fsRWReq->varIndex >= getBatSize(blockNumber)) {
+ if (fsRWReq->varIndex >= getBatSize(blockNumber, instanceNumber)) {
jam();// Ensure that a valid variable is used
errorCode = FsRef::fsErrInvalidParameters;
goto error;
@@ -561,12 +563,14 @@ Ndbfs::execFSAPPENDREQ(Signal * signal)
const Uint16 filePointer = (Uint16)fsReq->filePointer;
const UintR userPointer = fsReq->userPointer;
const BlockReference userRef = fsReq->userReference;
- const BlockNumber blockNumber = refToBlock(userRef);
+ const BlockNumber blockNumber = refToMain(userRef);
+ const Uint32 instanceNumber = refToInstance(userRef);
FsRef::NdbfsErrorCodeType errorCode;
AsyncFile* openFile = theOpenFiles.find(filePointer);
- const NewVARIABLE *myBaseAddrRef = &getBat(blockNumber)[fsReq->varIndex];
+ const NewVARIABLE *myBaseAddrRef =
+ &getBat(blockNumber, instanceNumber)[fsReq->varIndex];
const Uint32* tWA = (const Uint32*)myBaseAddrRef->WA;
const Uint32 tSz = myBaseAddrRef->nrr;
@@ -587,7 +591,7 @@ Ndbfs::execFSAPPENDREQ(Signal * signal)
goto error;
}
- if (fsReq->varIndex >= getBatSize(blockNumber)) {
+ if (fsReq->varIndex >= getBatSize(blockNumber, instanceNumber)) {
jam();// Ensure that a valid variable is used
errorCode = FsRef::fsErrInvalidParameters;
goto error;
=== modified file 'storage/ndb/src/kernel/blocks/restore.cpp'
--- a/storage/ndb/src/kernel/blocks/restore.cpp 2008-07-26 05:13:40 +0000
+++ b/storage/ndb/src/kernel/blocks/restore.cpp 2008-08-11 11:37:57 +0000
@@ -190,7 +190,8 @@ Restore::sendSTTORRY(Signal* signal){
signal->theData[3] = 1;
signal->theData[4] = 3;
signal->theData[5] = 255; // No more start phases from missra
- sendSignal(NDBCNTR_REF, GSN_STTORRY, signal, 6, JBB);
+ BlockReference cntrRef = !isNdbMtLqh() ? NDBCNTR_REF : RESTORE_REF;
+ sendSignal(cntrRef, GSN_STTORRY, signal, 6, JBB);
}
void
=== modified file 'storage/ndb/src/kernel/main.cpp'
--- a/storage/ndb/src/kernel/main.cpp 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/main.cpp 2008-08-11 11:21:36 +0000
@@ -286,23 +286,20 @@ init_global_memory_manager(EmulatorData
return 0; // Success
}
-bool g_ndbMt = false;
-bool g_ndbMtLqh = false;
-
static int
get_multithreaded_config(EmulatorData& ed)
{
- // multithreaded is compiled in ndbd/ndbmtd
- g_ndbMt = SimulatedBlock::isMultiThreaded();
- if (!g_ndbMt)
+ // multithreaded is compiled in ndbd/ndbmtd for now
+ globalData.isNdbMt = SimulatedBlock::isMultiThreaded();
+ if (!globalData.isNdbMt)
return 0;
// mt lqh via environment during development
{
const char* p = NdbEnv_GetEnv("NDB_MT_LQH", (char*)0, 0);
if (p != 0 && strchr("1Y", p[0]) != 0)
- g_ndbMtLqh = true;
- if (!g_ndbMtLqh)
+ globalData.isNdbMtLqh = true;
+ if (!globalData.isNdbMtLqh)
return 0;
}
@@ -315,8 +312,8 @@ get_multithreaded_config(EmulatorData& e
Uint32 workers = 0;
Uint32 threads = 0;
- if (ndb_mgm_get_int_parameter(p, CFG_NDBMT_WORKERS, &workers) ||
- ndb_mgm_get_int_parameter(p, CFG_NDBMT_THREADS, &threads))
+ if (ndb_mgm_get_int_parameter(p, CFG_NDBMT_LQH_WORKERS, &workers) ||
+ ndb_mgm_get_int_parameter(p, CFG_NDBMT_LQH_THREADS, &threads))
{
g_eventLogger->alert("Failed to get CFG_NDBMT parameters from "
"config, exiting.");
@@ -326,12 +323,12 @@ get_multithreaded_config(EmulatorData& e
ndbout << "NDBMT: workers=" << workers
<< " threads=" << threads << endl;
- assert(workers != 0 && workers <= MAX_NDBMT_WORKERS);
- assert(threads != 0 && threads <= MAX_NDBMT_THREADS);
+ assert(workers != 0 && workers <= MAX_NDBMT_LQH_WORKERS);
+ assert(threads != 0 && threads <= MAX_NDBMT_LQH_THREADS);
assert(workers % threads == 0);
- globalData.ndbmtWorkers = workers;
- globalData.ndbmtThreads = threads;
+ globalData.ndbMtLqhWorkers = workers;
+ globalData.ndbMtLqhThreads = threads;
return 0;
}
=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2008-08-11 11:21:36 +0000
@@ -1003,11 +1003,11 @@ Configuration::calcSizeAlt(ConfigValues
{
uint workers = 4;
uint threads = 2;
- assert(workers <= MAX_NDBMT_WORKERS);
- assert(threads <= MAX_NDBMT_THREADS);
+ assert(workers <= MAX_NDBMT_LQH_WORKERS);
+ assert(threads <= MAX_NDBMT_LQH_THREADS);
assert(workers % threads == 0);
- cfg.put(CFG_NDBMT_WORKERS, workers);
- cfg.put(CFG_NDBMT_THREADS, threads);
+ cfg.put(CFG_NDBMT_LQH_WORKERS, workers);
+ cfg.put(CFG_NDBMT_LQH_THREADS, threads);
}
m_ownConfig = (ndb_mgm_configuration*)cfg.getConfigValues();
=== modified file 'storage/ndb/src/kernel/vm/FastScheduler.cpp'
--- a/storage/ndb/src/kernel/vm/FastScheduler.cpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/kernel/vm/FastScheduler.cpp 2008-08-11 11:21:36 +0000
@@ -99,7 +99,8 @@ FastScheduler::doJob()
// signal->garbage_register();
// To ensure we find bugs quickly
register Uint32 gsnbnr = theJobBuffers[tHighPrio].retrieve(signal);
- register BlockNumber reg_bnr = gsnbnr & 0xFFF;
+ // also strip any instance bits since this is non-MT code
+ register BlockNumber reg_bnr = gsnbnr & NDBMT_BLOCK_MASK;
register GlobalSignalNumber reg_gsn = gsnbnr >> 16;
globalData.incrementWatchDogCounter(1);
if (reg_bnr > 0) {
@@ -411,6 +412,8 @@ void FastScheduler::dumpSignalMemory(Uin
ReadPtr[tLevel]--;
theJobBuffers[tLevel].retrieveDump(&signal, ReadPtr[tLevel]);
+ // strip instance bits since this in non-MT code
+ signal.header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
print_restart(output, &signal, tLevel);
if (tJob == 0)
=== modified file 'storage/ndb/src/kernel/vm/GlobalData.hpp'
--- a/storage/ndb/src/kernel/vm/GlobalData.hpp 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/vm/GlobalData.hpp 2008-08-11 11:21:36 +0000
@@ -68,15 +68,19 @@ struct GlobalData {
Uint32 sendPackedActivated;
Uint32 activateSendPacked;
- Uint32 ndbmtWorkers;
- Uint32 ndbmtThreads;
+ bool isNdbMt; // ndbd multithreaded, no workers
+ bool isNdbMtLqh; // ndbd multithreaded, LQH workers
+ Uint32 ndbMtLqhWorkers;
+ Uint32 ndbMtLqhThreads;
GlobalData(){
theSignalId = 0;
theStartLevel = NodeState::SL_NOTHING;
theRestartFlag = perform_start;
- ndbmtWorkers = 0;
- ndbmtThreads = 0;
+ isNdbMt = false;
+ isNdbMtLqh = false;
+ ndbMtLqhWorkers = 0;
+ ndbMtLqhThreads = 0;
#ifdef GCP_TIMER_HACK
gcp_timer_limit = 0;
#endif
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2008-08-11 11:30:18 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2008-08-11 12:48:03 +0000
@@ -225,6 +225,25 @@ SimulatedBlock::getInstanceKey(Uint32 ta
return instanceKey;
}
+Uint32
+SimulatedBlock::getLogPartId(Uint32 tabId, Uint32 fragId)
+{
+ Dbdih* dbdih = (Dbdih*)globalData.getBlock(DBDIH);
+ ndbrequire(dbdih != 0);
+ Uint32 logPartId = dbdih->dihGetLogPartId(tabId, fragId);
+ return logPartId;
+}
+
+bool
+SimulatedBlock::isLogPartOwner(Uint32 worker, Uint32 logPartId)
+{
+ if (!globalData.isNdbMtLqh)
+ return true;
+ Uint32 workers = globalData.ndbMtLqhWorkers;
+ ndbrequire(workers != 0 && worker < workers);
+ return worker == logPartId % workers;
+}
+
void
SimulatedBlock::assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer,
Uint32 *watchDogCounter)
@@ -1300,6 +1319,7 @@ SimulatedBlock::freeBat(){
const NewVARIABLE *
SimulatedBlock::getBat(Uint16 blockNo, Uint32 instanceNo){
+ assert(blockNo == blockToMain(blockNo));
SimulatedBlock * sb = globalData.getBlock(blockNo);
if (sb != 0 && instanceNo != 0)
sb = sb->getInstance(instanceNo);
@@ -1310,6 +1330,7 @@ SimulatedBlock::getBat(Uint16 blockNo, U
Uint16
SimulatedBlock::getBatSize(Uint16 blockNo, Uint32 instanceNo){
+ assert(blockNo == blockToMain(blockNo));
SimulatedBlock * sb = globalData.getBlock(blockNo);
if (sb != 0 && instanceNo != 0)
sb = sb->getInstance(instanceNo);
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2008-08-11 11:21:36 +0000
@@ -158,7 +158,6 @@ public:
}
return 0;
}
- Uint32 getInstanceKey(Uint32 tabId, Uint32 fragId);
/* Setup state of a block object for executing in a particular thread. */
void assignToThread(Uint32 threadId, EmulatedJamBuffer *jamBuffer,
@@ -167,6 +166,20 @@ public:
uint32 getThreadId() const { return m_threadId; }
static bool isMultiThreaded();
+ /* Configuration based alternative. Applies only to this node */
+ static bool isNdbMt() { return globalData.isNdbMt; }
+ static bool isNdbMtLqh() { return globalData.isNdbMtLqh; }
+
+ /*
+ * Instance key (1-4, even if not MT LQH) is set in receiver block ref.
+ * The receiver maps it to a real instance (0, if not MT LQH).
+ */
+ Uint32 getInstanceKey(Uint32 tabId, Uint32 fragId);
+
+ /* MT LQH log parts info for use by this node */
+ Uint32 getLogPartId(Uint32 tabId, Uint32 fragId);
+ bool isLogPartOwner(Uint32 worker, Uint32 logPartId);
+
public:
typedef void (SimulatedBlock::* CallbackFunction)(class Signal*,
Uint32 callbackData,
@@ -442,7 +455,7 @@ private:
* In MT LQH main instance is the LQH proxy and the others ("workers")
* are real LQHs run by multiple threads.
*/
- enum { MaxInstances = 1 + MAX_NDBMT_WORKERS };
+ enum { MaxInstances = 1 + MAX_NDBMT_LQH_WORKERS };
Uint32 theInstanceCount; // set in main
SimulatedBlock** theInstanceList; // set in main, indexed by instance
SimulatedBlock* theMainInstance; // set in all
@@ -464,9 +477,9 @@ protected:
NewVARIABLE* allocateBat(int batSize);
void freeBat();
static const NewVARIABLE* getBat (BlockNumber blockNo,
- Uint32 instanceNo = 0);
+ Uint32 instanceNo);
static Uint16 getBatSize(BlockNumber blockNo,
- Uint32 instanceNo = 0);
+ Uint32 instanceNo);
static BlockReference calcTcBlockRef (NodeId aNode);
static BlockReference calcLqhBlockRef (NodeId aNode);
@@ -483,6 +496,12 @@ protected:
static BlockReference calcApiClusterMgrBlockRef (NodeId aNode);
+ // matching instance on same node e.g. LQH-ACC-TUP
+ BlockReference calcInstanceBlockRef(BlockNumber aBlock);
+
+ // matching instance on another node e.g. LQH-LQH
+ BlockReference calcInstanceBlockRef(BlockNumber aBlock, NodeId aNode);
+
/**
* allocRecord
* Allocates memory for the datastructures where ndb keeps the data
@@ -832,6 +851,18 @@ SimulatedBlock::calcApiClusterMgrBlockRe
}
inline
+BlockReference
+SimulatedBlock::calcInstanceBlockRef(BlockNumber aBlock){
+ return numberToRef(aBlock, instance(), getOwnNodeId());
+}
+
+inline
+BlockReference
+SimulatedBlock::calcInstanceBlockRef(BlockNumber aBlock, NodeId aNodeId){
+ return numberToRef(aBlock, instance(), aNodeId);
+}
+
+inline
const NodeState &
SimulatedBlock::getNodeState() const {
return theNodeState;
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-08-07 11:52:50 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-08-11 11:21:36 +0000
@@ -202,13 +202,7 @@ TransporterCallbackKernel::deliver_signa
const Uint32 secCount = header->m_noOfSections;
const Uint32 length = header->theLength;
- /*
- * Strip instance bits if not multithreaded. This is also
- * done in versions prior to MT LQH, to simplify online upgrade.
- */
-#ifndef NDBD_MULTITHREADED
- header->theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
-#endif
+ // if this node is not MT LQH then instance bits are stripped at execute
#ifdef TRACE_DISTRIBUTED
ndbout_c("recv: %s(%d) from (%s, %d)",
=== modified file 'storage/ndb/src/kernel/vm/dummy_nonmt.cpp'
--- a/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/vm/dummy_nonmt.cpp 2008-08-11 11:21:36 +0000
@@ -8,7 +8,7 @@ add_thr_map(Uint32, Uint32, Uint32)
}
void
-add_worker_thr_map(Uint32, Uint32)
+add_lqh_worker_thr_map(Uint32, Uint32)
{
assert(false);
}
=== modified file 'storage/ndb/src/kernel/vm/mt.cpp'
--- a/storage/ndb/src/kernel/vm/mt.cpp 2008-08-09 09:59:50 +0000
+++ b/storage/ndb/src/kernel/vm/mt.cpp 2008-08-11 11:21:36 +0000
@@ -43,9 +43,9 @@ static const Uint32 MAX_SIGNALS_PER_JB =
//#define NDB_MT_LOCK_TO_CPU
-#define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_WORKERS)
+#define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_LQH_WORKERS)
#define NUM_MAIN_THREADS 2 // except receiver
-#define MAX_THREADS (NUM_MAIN_THREADS + MAX_NDBMT_THREADS + 1)
+#define MAX_THREADS (NUM_MAIN_THREADS + MAX_NDBMT_LQH_THREADS + 1)
static Uint32 ndbmt_workers = 0;
static Uint32 ndbmt_threads = 0;
@@ -1783,10 +1783,19 @@ execute_signals(thr_data *selfptr, thr_j
if(siglen>16)
__builtin_prefetch (read_buffer->m_data + read_pos + 32, 0, 3);
Uint32 bno = blockToMain(s->theReceiversBlockNumber);
- Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
+ SimulatedBlock* block = globalData.getBlock(bno);
+
+ if (ndbmt_workers != 0) { // MT LQH
+ Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
+ if (ino != 0) {
+ // map instance key to real instance
+ ino = 1 + (ino - 1) % ndbmt_workers;
+ block = block->getInstance(ino);
+ assert(block != 0);
+ }
+ }
+
Uint32 gsn = s->theVerId_signalNumber;
- SimulatedBlock * main = globalData.getBlock(bno);
- SimulatedBlock * block = main->getInstance(ino);
*watchDogCounter = 1;
/* Must update original buffer so signal dump will see it. */
s->theSignalId = (*signalIdCounter)++;
@@ -1926,7 +1935,7 @@ add_main_thr_map()
// workers added by LocalProxy
void
-add_worker_thr_map(Uint32 block, Uint32 instance)
+add_lqh_worker_thr_map(Uint32 block, Uint32 instance)
{
assert(instance != 0);
Uint32 i = instance - 1;
@@ -2187,11 +2196,13 @@ sendlocal(Uint32 self, const SignalHeade
const Uint32 secPtr[3])
{
Uint32 block = blockToMain(s->theReceiversBlockNumber);
- Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
+ Uint32 instance = 0;
- // map on receiver side
- if (instance != 0)
- instance = 1 + (instance - 1) % ndbmt_workers;
+ if (ndbmt_workers != 0) { // MT LQH
+ instance = blockToInstance(s->theReceiversBlockNumber);
+ if (instance != 0)
+ instance = 1 + (instance - 1) % ndbmt_workers;
+ }
/*
* Max number of signals to put into job buffer before flushing the buffer
@@ -2223,11 +2234,13 @@ sendprioa(Uint32 self, const SignalHeade
const Uint32 secPtr[3])
{
Uint32 block = blockToMain(s->theReceiversBlockNumber);
- Uint32 instance = blockToInstance(s->theReceiversBlockNumber);
+ Uint32 instance = 0;
- // map on receiver side
- if (instance != 0)
- instance = 1 + (instance - 1) % ndbmt_workers;
+ if (ndbmt_workers != 0) { // MT LQH
+ instance = blockToInstance(s->theReceiversBlockNumber);
+ if (instance != 0)
+ instance = 1 + (instance - 1) % ndbmt_workers;
+ }
Uint32 dst = block2ThreadId(block, instance);
struct thr_repository* rep = &g_thr_repository;
@@ -2615,8 +2628,8 @@ ThreadConfig::~ThreadConfig()
void
ThreadConfig::init(EmulatorData *emulatorData)
{
- ndbmt_workers = globalData.ndbmtWorkers;
- ndbmt_threads = globalData.ndbmtThreads;
+ ndbmt_workers = globalData.ndbMtLqhWorkers;
+ ndbmt_threads = globalData.ndbMtLqhThreads;
num_threads = NUM_MAIN_THREADS + ndbmt_threads + 1;
assert(num_threads <= MAX_THREADS);
receiver_thread_no = num_threads - 1;
@@ -3005,6 +3018,9 @@ FastScheduler::dumpSignalMemory(Uint32 t
if (siglen > 25)
siglen = 25; // Sanity check
memcpy(&signal.header, s, 4*siglen);
+ // instance number in trace file is confusing if not MT LQH
+ if (ndbmt_workers == 0)
+ signal.header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
const Uint32 *posptr = reinterpret_cast<const Uint32 *>(s);
signal.m_sectionPtrI[0] = posptr[siglen + 0];
=== modified file 'storage/ndb/src/kernel/vm/mt.hpp'
--- a/storage/ndb/src/kernel/vm/mt.hpp 2008-07-25 05:48:32 +0000
+++ b/storage/ndb/src/kernel/vm/mt.hpp 2008-08-11 11:21:36 +0000
@@ -16,7 +16,7 @@ extern Uint32 receiverThreadId;
/* Assign block instance to thread */
void add_thr_map(Uint32 block, Uint32 instance, Uint32 thr_no);
-void add_worker_thr_map(Uint32 block, Uint32 instance);
+void add_lqh_worker_thr_map(Uint32 block, Uint32 instance);
void sendlocal(Uint32 self, const struct SignalHeader *s,
const Uint32 *data, const Uint32 secPtr[3]);
| Thread |
|---|
| • bzr push into mysql-5.1 branch (pekka:2707 to 2715) WL#4391 | Pekka Nousiainen | 12 Aug |