List:Commits« Previous MessageNext Message »
From:jonas oreland Date:January 10 2012 6:23pm
Subject:bzr push into mysql-5.5-cluster-7.2 branch (jonas.oreland:3757 to 3758)
View as plain text  
 3758 jonas oreland	2012-01-10 [merge]
      ndb - merge 71 to 72

    modified:
      storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp
      storage/ndb/include/kernel/signaldata/EnableCom.hpp
      storage/ndb/src/kernel/blocks/trpman.cpp
      storage/ndb/src/kernel/blocks/trpman.hpp
      storage/ndb/src/kernel/vm/Configuration.cpp
      storage/ndb/src/kernel/vm/GlobalData.hpp
 3757 Jonas Oreland	2012-01-10 [merge]
      ndb - merge 71 to 72

    modified:
      storage/ndb/include/kernel/GlobalSignalNumbers.h
      storage/ndb/include/ndb_types.h.in
      storage/ndb/src/common/debugger/signaldata/SignalNames.cpp
      storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
      storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
      storage/ndb/src/kernel/blocks/trpman.cpp
      storage/ndb/src/kernel/blocks/trpman.hpp
      storage/ndb/src/kernel/vm/ArrayPool.hpp
      storage/ndb/src/kernel/vm/mt.cpp
=== modified file 'storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp'
--- a/storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp	2011-11-22 20:11:29 +0000
+++ b/storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp	2012-01-10 18:21:38 +0000
@@ -35,6 +35,7 @@ class CloseComReqConf {
    */
   friend class Qmgr;
   friend class Trpman;
+  friend class TrpmanProxy;
 
   /**
    * For printing

=== modified file 'storage/ndb/include/kernel/signaldata/EnableCom.hpp'
--- a/storage/ndb/include/kernel/signaldata/EnableCom.hpp	2011-11-22 13:03:47 +0000
+++ b/storage/ndb/include/kernel/signaldata/EnableCom.hpp	2012-01-10 13:54:55 +0000
@@ -21,6 +21,7 @@
 class EnableComReq  {
   friend class Qmgr;
   friend class Trpman;
+  friend class TrpmanProxy;
 
 public:
   STATIC_CONST( SignalLength = 2 + NodeBitmask::Size );
@@ -34,6 +35,7 @@ private:
 class EnableComConf  {
   friend class Qmgr;
   friend class Trpman;
+  friend class TrpmanProxy;
   friend class Cmvmi;
 
 public:

=== modified file 'storage/ndb/src/kernel/blocks/trpman.cpp'
--- a/storage/ndb/src/kernel/blocks/trpman.cpp	2012-01-10 10:59:37 +0000
+++ b/storage/ndb/src/kernel/blocks/trpman.cpp	2012-01-10 13:54:55 +0000
@@ -29,6 +29,7 @@ Trpman::Trpman(Block_context & ctx, Uint
   BLOCK_CONSTRUCTOR(Trpman);
 
   addRecSignal(GSN_CLOSE_COMREQ, &Trpman::execCLOSE_COMREQ);
+  addRecSignal(GSN_CLOSE_COMCONF, &Trpman::execCLOSE_COMCONF);
   addRecSignal(GSN_OPEN_COMORD, &Trpman::execOPEN_COMORD);
   addRecSignal(GSN_ENABLE_COMREQ, &Trpman::execENABLE_COMREQ);
   addRecSignal(GSN_DISCONNECT_REP, &Trpman::execDISCONNECT_REP);
@@ -51,11 +52,17 @@ static NodeBitmask c_error_9000_nodes_ma
 extern Uint32 MAX_RECEIVED_SIGNALS;
 #endif
 
-static
 bool
-handles_this_node(Uint32 nodeId)
+Trpman::handles_this_node(Uint32 nodeId)
 {
+#if MAX_NDBMT_RECEIVE_THREADS == 1
   return true;
+#else
+  if (globalData.ndbMtReceiveThreads <= (Uint32)1)
+    return true;
+  return (instance()==
+          (globalTransporterRegistry.getReceiveThreadId(nodeId) + 1));
+#endif
 }
 
 void
@@ -213,7 +220,7 @@ Trpman::execCLOSE_COMREQ(Signal* signal)
      * bitmap is not trampled above
      * signals received from the remote node.
      */
-    sendSignal(QMGR_REF, GSN_CLOSE_COMCONF, signal, 19, JBA);
+    sendSignal(TRPMAN_REF, GSN_CLOSE_COMCONF, signal, 19, JBA);
   }
 }
 
@@ -236,13 +243,13 @@ Trpman::execENABLE_COMREQ(Signal* signal
   const EnableComReq *enableComReq = (const EnableComReq *)signal->getDataPtr();
 
   /* Need to copy out signal data to not clobber it with sendSignal(). */
-  Uint32 senderRef = enableComReq->m_senderRef;
+  BlockReference senderRef = enableComReq->m_senderRef;
   Uint32 senderData = enableComReq->m_senderData;
   Uint32 nodes[NodeBitmask::Size];
   MEMCOPY_NO_WORDS(nodes, enableComReq->m_nodeIds, NodeBitmask::Size);
 
   /* Enable communication with all our NDB blocks to these nodes. */
-  Uint32 search_from = 0;
+  Uint32 search_from = 1;
   for (;;)
   {
     Uint32 tStartingNode = NodeBitmask::find(nodes, search_from);
@@ -610,9 +617,11 @@ Trpman::execDUMP_STATE_ORD(Signal* signa
 TrpmanProxy::TrpmanProxy(Block_context & ctx) :
   LocalProxy(TRPMAN, ctx)
 {
-  addRecSignal(GSN_CLOSE_COMREQ, &TrpmanProxy::execCLOSE_COMREQ);
   addRecSignal(GSN_OPEN_COMORD, &TrpmanProxy::execOPEN_COMORD);
   addRecSignal(GSN_ENABLE_COMREQ, &TrpmanProxy::execENABLE_COMREQ);
+  addRecSignal(GSN_ENABLE_COMCONF, &TrpmanProxy::execENABLE_COMCONF);
+  addRecSignal(GSN_CLOSE_COMREQ, &TrpmanProxy::execCLOSE_COMREQ);
+  addRecSignal(GSN_CLOSE_COMCONF, &TrpmanProxy::execCLOSE_COMCONF);
   addRecSignal(GSN_ROUTE_ORD, &TrpmanProxy::execROUTE_ORD);
 }
 
@@ -628,43 +637,147 @@ TrpmanProxy::newWorker(Uint32 instanceNo
 
 BLOCK_FUNCTIONS(TrpmanProxy);
 
-/**
- * TODO TrpmanProxy need to have operation records
- *      to support splicing a request onto several Trpman-instances
- *      according to how receive-threads are assigned to instances
- */
+// GSN_OPEN_COMORD
+
 void
 TrpmanProxy::execOPEN_COMORD(Signal* signal)
 {
   jamEntry();
-  SectionHandle handle(this, signal);
-  sendSignal(workerRef(0), GSN_OPEN_COMORD, signal,
-             signal->getLength(), JBB, &handle);
+
+  for (Uint32 i = 0; i<c_workers; i++)
+  {
+    jam();
+    sendSignal(workerRef(i), GSN_OPEN_COMORD, signal,
+               signal->getLength(), JBB);
+  }
 }
 
+// GSN_CLOSE_COMREQ
+
 void
 TrpmanProxy::execCLOSE_COMREQ(Signal* signal)
 {
   jamEntry();
-  SectionHandle handle(this, signal);
-  sendSignal(workerRef(0), GSN_CLOSE_COMREQ, signal,
-             signal->getLength(), JBB, &handle);
+  Ss_CLOSE_COMREQ& ss = ssSeize<Ss_CLOSE_COMREQ>();
+  const CloseComReqConf* req = (const CloseComReqConf*)signal->getDataPtr();
+  ss.m_req = *req;
+  sendREQ(signal, ss);
+}
+
+void
+TrpmanProxy::sendCLOSE_COMREQ(Signal *signal, Uint32 ssId, SectionHandle*)
+{
+  jam();
+  Ss_CLOSE_COMREQ& ss = ssFind<Ss_CLOSE_COMREQ>(ssId);
+  CloseComReqConf* req = (CloseComReqConf*)signal->getDataPtrSend();
+
+  *req = ss.m_req;
+  req->xxxBlockRef = reference();
+  req->failNo = ssId;
+  sendSignal(workerRef(ss.m_worker), GSN_CLOSE_COMREQ, signal,
+             CloseComReqConf::SignalLength, JBB);
+}
+
+void
+TrpmanProxy::execCLOSE_COMCONF(Signal* signal)
+{
+  const CloseComReqConf* conf = (const CloseComReqConf*)signal->getDataPtr();
+  Uint32 ssId = conf->failNo;
+  jamEntry();
+  Ss_CLOSE_COMREQ& ss = ssFind<Ss_CLOSE_COMREQ>(ssId);
+  recvCONF(signal, ss);
+}
+
+void
+TrpmanProxy::sendCLOSE_COMCONF(Signal *signal, Uint32 ssId)
+{
+  jam();
+  Ss_CLOSE_COMREQ& ss = ssFind<Ss_CLOSE_COMREQ>(ssId);
+
+  if (!lastReply(ss))
+  {
+    jam();
+    return;
+  }
+
+  CloseComReqConf* conf = (CloseComReqConf*)signal->getDataPtrSend();
+  *conf = ss.m_req;
+  sendSignal(conf->xxxBlockRef, GSN_CLOSE_COMCONF, signal,
+             CloseComReqConf::SignalLength, JBB);
+  ssRelease<Ss_CLOSE_COMREQ>(ssId);
 }
 
+// GSN_ENABLE_COMREQ
+
 void
 TrpmanProxy::execENABLE_COMREQ(Signal* signal)
 {
   jamEntry();
-  SectionHandle handle(this, signal);
-  sendSignal(workerRef(0), GSN_ENABLE_COMREQ, signal,
-             signal->getLength(), JBB, &handle);
+  Ss_ENABLE_COMREQ& ss = ssSeize<Ss_ENABLE_COMREQ>();
+  const EnableComReq* req = (const EnableComReq*)signal->getDataPtr();
+  ss.m_req = *req;
+  sendREQ(signal, ss);
+}
+
+void
+TrpmanProxy::sendENABLE_COMREQ(Signal *signal, Uint32 ssId, SectionHandle*)
+{
+  jam();
+  Ss_ENABLE_COMREQ& ss = ssFind<Ss_ENABLE_COMREQ>(ssId);
+  EnableComReq* req = (EnableComReq*)signal->getDataPtrSend();
+
+  *req = ss.m_req;
+  req->m_senderRef = reference();
+  req->m_senderData = ssId;
+  sendSignal(workerRef(ss.m_worker), GSN_ENABLE_COMREQ, signal,
+             EnableComReq::SignalLength, JBB);
 }
 
 void
+TrpmanProxy::execENABLE_COMCONF(Signal* signal)
+{
+  const EnableComConf* conf = (const EnableComConf*)signal->getDataPtr();
+  Uint32 ssId = conf->m_senderData;
+  jamEntry();
+  Ss_ENABLE_COMREQ& ss = ssFind<Ss_ENABLE_COMREQ>(ssId);
+  recvCONF(signal, ss);
+}
+
+void
+TrpmanProxy::sendENABLE_COMCONF(Signal *signal, Uint32 ssId)
+{
+  jam();
+  Ss_ENABLE_COMREQ& ss = ssFind<Ss_ENABLE_COMREQ>(ssId);
+
+  if (!lastReply(ss))
+  {
+    jam();
+    return;
+  }
+
+  EnableComReq* conf = (EnableComReq*)signal->getDataPtr();
+  *conf = ss.m_req;
+  sendSignal(conf->m_senderRef, GSN_ENABLE_COMCONF, signal,
+             EnableComReq::SignalLength, JBB);
+  ssRelease<Ss_ENABLE_COMREQ>(ssId);
+}
+
+// GSN_ROUTE_ORD
+
+void
 TrpmanProxy::execROUTE_ORD(Signal* signal)
 {
+  RouteOrd* ord = (RouteOrd*)signal->getDataPtr();
+  Uint32 nodeId = ord->from;
   jamEntry();
+
+  ndbassert(nodeId != 0);
+#if MAX_NDBMT_RECEIVE_THREADS == 1
+  Uint32 workerId = 0;
+#else
+  Uint32 workerId = globalTransporterRegistry.getReceiveThreadId(nodeId);
+#endif
   SectionHandle handle(this, signal);
-  sendSignal(workerRef(0), GSN_ROUTE_ORD, signal,
+  sendSignal(workerRef(workerId), GSN_ROUTE_ORD, signal,
              signal->getLength(), JBB, &handle);
 }

=== modified file 'storage/ndb/src/kernel/blocks/trpman.hpp'
--- a/storage/ndb/src/kernel/blocks/trpman.hpp	2012-01-10 10:59:37 +0000
+++ b/storage/ndb/src/kernel/blocks/trpman.hpp	2012-01-10 13:54:55 +0000
@@ -21,6 +21,8 @@
 #include <pc.hpp>
 #include <SimulatedBlock.hpp>
 #include <LocalProxy.hpp>
+#include <signaldata/EnableCom.hpp>
+#include <signaldata/CloseComReqConf.hpp>
 
 class Trpman : public SimulatedBlock
 {
@@ -42,7 +44,8 @@ public:
   void execNDB_TAMPER(Signal*);
   void execDUMP_STATE_ORD(Signal*);
 protected:
-
+private:
+  bool handles_this_node(Uint32 nodeId);
 };
 
 class TrpmanProxy : public LocalProxy
@@ -52,14 +55,50 @@ public:
   virtual ~TrpmanProxy();
   BLOCK_DEFINES(TrpmanProxy);
 
-  void execCLOSE_COMREQ(Signal *signal);
+  // GSN_OPEN_COMORD
   void execOPEN_COMORD(Signal *signal);
+
+  // GSN_CLOSE_COMREQ
+  struct Ss_CLOSE_COMREQ : SsParallel {
+    CloseComReqConf m_req;
+    Ss_CLOSE_COMREQ() {
+      m_sendREQ = (SsFUNCREQ)&TrpmanProxy::sendCLOSE_COMREQ;
+      m_sendCONF = (SsFUNCREP)&TrpmanProxy::sendCLOSE_COMCONF;
+    }
+    enum { poolSize = MAX_NODES };
+    static SsPool<Ss_CLOSE_COMREQ>& pool(LocalProxy* proxy) {
+      return ((TrpmanProxy*)proxy)->c_ss_CLOSE_COMREQ;
+    }
+  };
+  SsPool<Ss_CLOSE_COMREQ> c_ss_CLOSE_COMREQ;
+  void execCLOSE_COMREQ(Signal *signal);
+  void sendCLOSE_COMREQ(Signal*, Uint32 ssId, SectionHandle*);
+  void execCLOSE_COMCONF(Signal *signal);
+  void sendCLOSE_COMCONF(Signal*, Uint32 ssId);
+
+  // GSN_ENABLE_COMREQ
+  struct Ss_ENABLE_COMREQ : SsParallel {
+    EnableComReq m_req;
+    Ss_ENABLE_COMREQ() {
+      m_sendREQ = (SsFUNCREQ)&TrpmanProxy::sendENABLE_COMREQ;
+      m_sendCONF = (SsFUNCREP)&TrpmanProxy::sendENABLE_COMCONF;
+    }
+    enum { poolSize = MAX_NODES };
+    static SsPool<Ss_ENABLE_COMREQ>& pool(LocalProxy* proxy) {
+      return ((TrpmanProxy*)proxy)->c_ss_ENABLE_COMREQ;
+    }
+  };
+  SsPool<Ss_ENABLE_COMREQ> c_ss_ENABLE_COMREQ;
   void execENABLE_COMREQ(Signal *signal);
-  void execROUTE_ORD(Signal* signal);
+  void sendENABLE_COMREQ(Signal*, Uint32 ssId, SectionHandle*);
+  void execENABLE_COMCONF(Signal *signal);
+  void sendENABLE_COMCONF(Signal*, Uint32 ssId);
 
+  void execROUTE_ORD(Signal* signal);
   void execNDB_TAMPER(Signal*);
   void execDUMP_STATE_ORD(Signal*);
 protected:
   virtual SimulatedBlock* newWorker(Uint32 instanceNo);
 };
+
 #endif

=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp	2011-11-22 20:11:29 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp	2012-01-10 18:21:38 +0000
@@ -470,6 +470,11 @@ Configuration::setupConfiguration(){
       break;
 
     globalData.ndbMtTcThreads = m_thr_config.getThreadCount(THRConfig::T_TC);
+    globalData.ndbMtSendThreads =
+      m_thr_config.getThreadCount(THRConfig::T_SEND);
+    globalData.ndbMtReceiveThreads =
+      m_thr_config.getThreadCount(THRConfig::T_RECV);
+
     globalData.isNdbMtLqh = true;
     {
       if (m_thr_config.getMtClassic())

=== modified file 'storage/ndb/src/kernel/vm/GlobalData.hpp'
--- a/storage/ndb/src/kernel/vm/GlobalData.hpp	2011-11-14 12:02:56 +0000
+++ b/storage/ndb/src/kernel/vm/GlobalData.hpp	2012-01-10 13:54:55 +0000
@@ -41,10 +41,10 @@ enum  restartStates {initial_state,
                      perform_stop};
 
 struct GlobalData {
+  NodeInfo   m_nodeInfo[MAX_NODES];   // At top to ensure cache alignment
+  Signal     VMSignals[1];            // Owned by FastScheduler::
   Uint32     m_restart_seq;           // 
   NodeVersionInfo m_versionInfo;
-  NodeInfo   m_nodeInfo[MAX_NODES];
-  Signal     VMSignals[1];            // Owned by FastScheduler::
   
   Uint64     internalMillisecCounter; // Owned by ThreadConfig::
   Uint32     highestAvailablePrio;    // Owned by FastScheduler::
@@ -75,6 +75,8 @@ struct GlobalData {
   Uint32     ndbMtLqhWorkers;
   Uint32     ndbMtLqhThreads;
   Uint32     ndbMtTcThreads;
+  Uint32     ndbMtSendThreads;
+  Uint32     ndbMtReceiveThreads;
   Uint32     ndbLogParts;
   
   GlobalData(){ 
@@ -86,6 +88,8 @@ struct GlobalData {
     ndbMtLqhWorkers = 0;
     ndbMtLqhThreads = 0;
     ndbMtTcThreads = 0;
+    ndbMtSendThreads = 0;
+    ndbMtReceiveThreads = 0;
     ndbLogParts = 0;
 #ifdef GCP_TIMER_HACK
     gcp_timer_limit = 0;

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.5-cluster-7.2 branch (jonas.oreland:3757 to 3758) jonas oreland10 Jan