3758 jonas oreland 2012-01-10 [merge]
ndb - merge 71 to 72
modified:
storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp
storage/ndb/include/kernel/signaldata/EnableCom.hpp
storage/ndb/src/kernel/blocks/trpman.cpp
storage/ndb/src/kernel/blocks/trpman.hpp
storage/ndb/src/kernel/vm/Configuration.cpp
storage/ndb/src/kernel/vm/GlobalData.hpp
3757 Jonas Oreland 2012-01-10 [merge]
ndb - merge 71 to 72
modified:
storage/ndb/include/kernel/GlobalSignalNumbers.h
storage/ndb/include/ndb_types.h.in
storage/ndb/src/common/debugger/signaldata/SignalNames.cpp
storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
storage/ndb/src/kernel/blocks/trpman.cpp
storage/ndb/src/kernel/blocks/trpman.hpp
storage/ndb/src/kernel/vm/ArrayPool.hpp
storage/ndb/src/kernel/vm/mt.cpp
=== modified file 'storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp'
--- a/storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp 2011-11-22 20:11:29 +0000
+++ b/storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp 2012-01-10 18:21:38 +0000
@@ -35,6 +35,7 @@ class CloseComReqConf {
*/
friend class Qmgr;
friend class Trpman;
+ friend class TrpmanProxy;
/**
* For printing
=== modified file 'storage/ndb/include/kernel/signaldata/EnableCom.hpp'
--- a/storage/ndb/include/kernel/signaldata/EnableCom.hpp 2011-11-22 13:03:47 +0000
+++ b/storage/ndb/include/kernel/signaldata/EnableCom.hpp 2012-01-10 13:54:55 +0000
@@ -21,6 +21,7 @@
class EnableComReq {
friend class Qmgr;
friend class Trpman;
+ friend class TrpmanProxy;
public:
STATIC_CONST( SignalLength = 2 + NodeBitmask::Size );
@@ -34,6 +35,7 @@ private:
class EnableComConf {
friend class Qmgr;
friend class Trpman;
+ friend class TrpmanProxy;
friend class Cmvmi;
public:
=== modified file 'storage/ndb/src/kernel/blocks/trpman.cpp'
--- a/storage/ndb/src/kernel/blocks/trpman.cpp 2012-01-10 10:59:37 +0000
+++ b/storage/ndb/src/kernel/blocks/trpman.cpp 2012-01-10 13:54:55 +0000
@@ -29,6 +29,7 @@ Trpman::Trpman(Block_context & ctx, Uint
BLOCK_CONSTRUCTOR(Trpman);
addRecSignal(GSN_CLOSE_COMREQ, &Trpman::execCLOSE_COMREQ);
+ addRecSignal(GSN_CLOSE_COMCONF, &Trpman::execCLOSE_COMCONF);
addRecSignal(GSN_OPEN_COMORD, &Trpman::execOPEN_COMORD);
addRecSignal(GSN_ENABLE_COMREQ, &Trpman::execENABLE_COMREQ);
addRecSignal(GSN_DISCONNECT_REP, &Trpman::execDISCONNECT_REP);
@@ -51,11 +52,17 @@ static NodeBitmask c_error_9000_nodes_ma
extern Uint32 MAX_RECEIVED_SIGNALS;
#endif
-static
bool
-handles_this_node(Uint32 nodeId)
+Trpman::handles_this_node(Uint32 nodeId)
{
+#if MAX_NDBMT_RECEIVE_THREADS == 1
return true;
+#else
+ if (globalData.ndbMtReceiveThreads <= (Uint32)1)
+ return true;
+ return (instance()==
+ (globalTransporterRegistry.getReceiveThreadId(nodeId) + 1));
+#endif
}
void
@@ -213,7 +220,7 @@ Trpman::execCLOSE_COMREQ(Signal* signal)
* bitmap is not trampled above
* signals received from the remote node.
*/
- sendSignal(QMGR_REF, GSN_CLOSE_COMCONF, signal, 19, JBA);
+ sendSignal(TRPMAN_REF, GSN_CLOSE_COMCONF, signal, 19, JBA);
}
}
@@ -236,13 +243,13 @@ Trpman::execENABLE_COMREQ(Signal* signal
const EnableComReq *enableComReq = (const EnableComReq *)signal->getDataPtr();
/* Need to copy out signal data to not clobber it with sendSignal(). */
- Uint32 senderRef = enableComReq->m_senderRef;
+ BlockReference senderRef = enableComReq->m_senderRef;
Uint32 senderData = enableComReq->m_senderData;
Uint32 nodes[NodeBitmask::Size];
MEMCOPY_NO_WORDS(nodes, enableComReq->m_nodeIds, NodeBitmask::Size);
/* Enable communication with all our NDB blocks to these nodes. */
- Uint32 search_from = 0;
+ Uint32 search_from = 1;
for (;;)
{
Uint32 tStartingNode = NodeBitmask::find(nodes, search_from);
@@ -610,9 +617,11 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
TrpmanProxy::TrpmanProxy(Block_context & ctx) :
LocalProxy(TRPMAN, ctx)
{
- addRecSignal(GSN_CLOSE_COMREQ, &TrpmanProxy::execCLOSE_COMREQ);
addRecSignal(GSN_OPEN_COMORD, &TrpmanProxy::execOPEN_COMORD);
addRecSignal(GSN_ENABLE_COMREQ, &TrpmanProxy::execENABLE_COMREQ);
+ addRecSignal(GSN_ENABLE_COMCONF, &TrpmanProxy::execENABLE_COMCONF);
+ addRecSignal(GSN_CLOSE_COMREQ, &TrpmanProxy::execCLOSE_COMREQ);
+ addRecSignal(GSN_CLOSE_COMCONF, &TrpmanProxy::execCLOSE_COMCONF);
addRecSignal(GSN_ROUTE_ORD, &TrpmanProxy::execROUTE_ORD);
}
@@ -628,43 +637,147 @@ TrpmanProxy::newWorker(Uint32 instanceNo
BLOCK_FUNCTIONS(TrpmanProxy);
-/**
- * TODO TrpmanProxy need to have operation records
- * to support splicing a request onto several Trpman-instances
- * according to how receive-threads are assigned to instances
- */
+// GSN_OPEN_COMORD
+
void
TrpmanProxy::execOPEN_COMORD(Signal* signal)
{
jamEntry();
- SectionHandle handle(this, signal);
- sendSignal(workerRef(0), GSN_OPEN_COMORD, signal,
- signal->getLength(), JBB, &handle);
+
+ for (Uint32 i = 0; i<c_workers; i++)
+ {
+ jam();
+ sendSignal(workerRef(i), GSN_OPEN_COMORD, signal,
+ signal->getLength(), JBB);
+ }
}
+// GSN_CLOSE_COMREQ
+
void
TrpmanProxy::execCLOSE_COMREQ(Signal* signal)
{
jamEntry();
- SectionHandle handle(this, signal);
- sendSignal(workerRef(0), GSN_CLOSE_COMREQ, signal,
- signal->getLength(), JBB, &handle);
+ Ss_CLOSE_COMREQ& ss = ssSeize<Ss_CLOSE_COMREQ>();
+ const CloseComReqConf* req = (const CloseComReqConf*)signal->getDataPtr();
+ ss.m_req = *req;
+ sendREQ(signal, ss);
+}
+
+void
+TrpmanProxy::sendCLOSE_COMREQ(Signal *signal, Uint32 ssId, SectionHandle*)
+{
+ jam();
+ Ss_CLOSE_COMREQ& ss = ssFind<Ss_CLOSE_COMREQ>(ssId);
+ CloseComReqConf* req = (CloseComReqConf*)signal->getDataPtrSend();
+
+ *req = ss.m_req;
+ req->xxxBlockRef = reference();
+ req->failNo = ssId;
+ sendSignal(workerRef(ss.m_worker), GSN_CLOSE_COMREQ, signal,
+ CloseComReqConf::SignalLength, JBB);
+}
+
+void
+TrpmanProxy::execCLOSE_COMCONF(Signal* signal)
+{
+ const CloseComReqConf* conf = (const CloseComReqConf*)signal->getDataPtr();
+ Uint32 ssId = conf->failNo;
+ jamEntry();
+ Ss_CLOSE_COMREQ& ss = ssFind<Ss_CLOSE_COMREQ>(ssId);
+ recvCONF(signal, ss);
+}
+
+void
+TrpmanProxy::sendCLOSE_COMCONF(Signal *signal, Uint32 ssId)
+{
+ jam();
+ Ss_CLOSE_COMREQ& ss = ssFind<Ss_CLOSE_COMREQ>(ssId);
+
+ if (!lastReply(ss))
+ {
+ jam();
+ return;
+ }
+
+ CloseComReqConf* conf = (CloseComReqConf*)signal->getDataPtrSend();
+ *conf = ss.m_req;
+ sendSignal(conf->xxxBlockRef, GSN_CLOSE_COMCONF, signal,
+ CloseComReqConf::SignalLength, JBB);
+ ssRelease<Ss_CLOSE_COMREQ>(ssId);
}
+// GSN_ENABLE_COMREQ
+
void
TrpmanProxy::execENABLE_COMREQ(Signal* signal)
{
jamEntry();
- SectionHandle handle(this, signal);
- sendSignal(workerRef(0), GSN_ENABLE_COMREQ, signal,
- signal->getLength(), JBB, &handle);
+ Ss_ENABLE_COMREQ& ss = ssSeize<Ss_ENABLE_COMREQ>();
+ const EnableComReq* req = (const EnableComReq*)signal->getDataPtr();
+ ss.m_req = *req;
+ sendREQ(signal, ss);
+}
+
+void
+TrpmanProxy::sendENABLE_COMREQ(Signal *signal, Uint32 ssId, SectionHandle*)
+{
+ jam();
+ Ss_ENABLE_COMREQ& ss = ssFind<Ss_ENABLE_COMREQ>(ssId);
+ EnableComReq* req = (EnableComReq*)signal->getDataPtrSend();
+
+ *req = ss.m_req;
+ req->m_senderRef = reference();
+ req->m_senderData = ssId;
+ sendSignal(workerRef(ss.m_worker), GSN_ENABLE_COMREQ, signal,
+ EnableComReq::SignalLength, JBB);
}
void
+TrpmanProxy::execENABLE_COMCONF(Signal* signal)
+{
+ const EnableComConf* conf = (const EnableComConf*)signal->getDataPtr();
+ Uint32 ssId = conf->m_senderData;
+ jamEntry();
+ Ss_ENABLE_COMREQ& ss = ssFind<Ss_ENABLE_COMREQ>(ssId);
+ recvCONF(signal, ss);
+}
+
+void
+TrpmanProxy::sendENABLE_COMCONF(Signal *signal, Uint32 ssId)
+{
+ jam();
+ Ss_ENABLE_COMREQ& ss = ssFind<Ss_ENABLE_COMREQ>(ssId);
+
+ if (!lastReply(ss))
+ {
+ jam();
+ return;
+ }
+
+ EnableComReq* conf = (EnableComReq*)signal->getDataPtr();
+ *conf = ss.m_req;
+ sendSignal(conf->m_senderRef, GSN_ENABLE_COMCONF, signal,
+ EnableComReq::SignalLength, JBB);
+ ssRelease<Ss_ENABLE_COMREQ>(ssId);
+}
+
+// GSN_ROUTE_ORD
+
+void
TrpmanProxy::execROUTE_ORD(Signal* signal)
{
+ RouteOrd* ord = (RouteOrd*)signal->getDataPtr();
+ Uint32 nodeId = ord->from;
jamEntry();
+
+ ndbassert(nodeId != 0);
+#if MAX_NDBMT_RECEIVE_THREADS == 1
+ Uint32 workerId = 0;
+#else
+ Uint32 workerId = globalTransporterRegistry.getReceiveThreadId(nodeId);
+#endif
SectionHandle handle(this, signal);
- sendSignal(workerRef(0), GSN_ROUTE_ORD, signal,
+ sendSignal(workerRef(workerId), GSN_ROUTE_ORD, signal,
signal->getLength(), JBB, &handle);
}
=== modified file 'storage/ndb/src/kernel/blocks/trpman.hpp'
--- a/storage/ndb/src/kernel/blocks/trpman.hpp 2012-01-10 10:59:37 +0000
+++ b/storage/ndb/src/kernel/blocks/trpman.hpp 2012-01-10 13:54:55 +0000
@@ -21,6 +21,8 @@
#include <pc.hpp>
#include <SimulatedBlock.hpp>
#include <LocalProxy.hpp>
+#include <signaldata/EnableCom.hpp>
+#include <signaldata/CloseComReqConf.hpp>
class Trpman : public SimulatedBlock
{
@@ -42,7 +44,8 @@ public:
void execNDB_TAMPER(Signal*);
void execDUMP_STATE_ORD(Signal*);
protected:
-
+private:
+ bool handles_this_node(Uint32 nodeId);
};
class TrpmanProxy : public LocalProxy
@@ -52,14 +55,50 @@ public:
virtual ~TrpmanProxy();
BLOCK_DEFINES(TrpmanProxy);
- void execCLOSE_COMREQ(Signal *signal);
+ // GSN_OPEN_COMORD
void execOPEN_COMORD(Signal *signal);
+
+ // GSN_CLOSE_COMREQ
+ struct Ss_CLOSE_COMREQ : SsParallel {
+ CloseComReqConf m_req;
+ Ss_CLOSE_COMREQ() {
+ m_sendREQ = (SsFUNCREQ)&TrpmanProxy::sendCLOSE_COMREQ;
+ m_sendCONF = (SsFUNCREP)&TrpmanProxy::sendCLOSE_COMCONF;
+ }
+ enum { poolSize = MAX_NODES };
+ static SsPool<Ss_CLOSE_COMREQ>& pool(LocalProxy* proxy) {
+ return ((TrpmanProxy*)proxy)->c_ss_CLOSE_COMREQ;
+ }
+ };
+ SsPool<Ss_CLOSE_COMREQ> c_ss_CLOSE_COMREQ;
+ void execCLOSE_COMREQ(Signal *signal);
+ void sendCLOSE_COMREQ(Signal*, Uint32 ssId, SectionHandle*);
+ void execCLOSE_COMCONF(Signal *signal);
+ void sendCLOSE_COMCONF(Signal*, Uint32 ssId);
+
+ // GSN_ENABLE_COMREQ
+ struct Ss_ENABLE_COMREQ : SsParallel {
+ EnableComReq m_req;
+ Ss_ENABLE_COMREQ() {
+ m_sendREQ = (SsFUNCREQ)&TrpmanProxy::sendENABLE_COMREQ;
+ m_sendCONF = (SsFUNCREP)&TrpmanProxy::sendENABLE_COMCONF;
+ }
+ enum { poolSize = MAX_NODES };
+ static SsPool<Ss_ENABLE_COMREQ>& pool(LocalProxy* proxy) {
+ return ((TrpmanProxy*)proxy)->c_ss_ENABLE_COMREQ;
+ }
+ };
+ SsPool<Ss_ENABLE_COMREQ> c_ss_ENABLE_COMREQ;
void execENABLE_COMREQ(Signal *signal);
- void execROUTE_ORD(Signal* signal);
+ void sendENABLE_COMREQ(Signal*, Uint32 ssId, SectionHandle*);
+ void execENABLE_COMCONF(Signal *signal);
+ void sendENABLE_COMCONF(Signal*, Uint32 ssId);
+ void execROUTE_ORD(Signal* signal);
void execNDB_TAMPER(Signal*);
void execDUMP_STATE_ORD(Signal*);
protected:
virtual SimulatedBlock* newWorker(Uint32 instanceNo);
};
+
#endif
=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp 2011-11-22 20:11:29 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2012-01-10 18:21:38 +0000
@@ -470,6 +470,11 @@ Configuration::setupConfiguration(){
break;
globalData.ndbMtTcThreads = m_thr_config.getThreadCount(THRConfig::T_TC);
+ globalData.ndbMtSendThreads =
+ m_thr_config.getThreadCount(THRConfig::T_SEND);
+ globalData.ndbMtReceiveThreads =
+ m_thr_config.getThreadCount(THRConfig::T_RECV);
+
globalData.isNdbMtLqh = true;
{
if (m_thr_config.getMtClassic())
=== modified file 'storage/ndb/src/kernel/vm/GlobalData.hpp'
--- a/storage/ndb/src/kernel/vm/GlobalData.hpp 2011-11-14 12:02:56 +0000
+++ b/storage/ndb/src/kernel/vm/GlobalData.hpp 2012-01-10 13:54:55 +0000
@@ -41,10 +41,10 @@ enum restartStates {initial_state,
perform_stop};
struct GlobalData {
+ NodeInfo m_nodeInfo[MAX_NODES]; // At top to ensure cache alignment
+ Signal VMSignals[1]; // Owned by FastScheduler::
Uint32 m_restart_seq; //
NodeVersionInfo m_versionInfo;
- NodeInfo m_nodeInfo[MAX_NODES];
- Signal VMSignals[1]; // Owned by FastScheduler::
Uint64 internalMillisecCounter; // Owned by ThreadConfig::
Uint32 highestAvailablePrio; // Owned by FastScheduler::
@@ -75,6 +75,8 @@ struct GlobalData {
Uint32 ndbMtLqhWorkers;
Uint32 ndbMtLqhThreads;
Uint32 ndbMtTcThreads;
+ Uint32 ndbMtSendThreads;
+ Uint32 ndbMtReceiveThreads;
Uint32 ndbLogParts;
GlobalData(){
@@ -86,6 +88,8 @@ struct GlobalData {
ndbMtLqhWorkers = 0;
ndbMtLqhThreads = 0;
ndbMtTcThreads = 0;
+ ndbMtSendThreads = 0;
+ ndbMtReceiveThreads = 0;
ndbLogParts = 0;
#ifdef GCP_TIMER_HACK
gcp_timer_limit = 0;
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.5-cluster-7.2 branch (jonas.oreland:3757 to 3758) | jonas oreland | 10 Jan |