List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:March 12 2009 6:52am
Subject:bzr commit into mysql-5.1-telco-7.0 branch (jonas:2934) Bug#43108
View as plain text  
#At file:///home/jonas/src/telco-6.4/

 2934 Jonas Oreland	2009-03-12
      ndb - bug#43108 - part I - remove LocalProxy::Node, send LCP_*_REP using local DIH instead
modified:
  storage/ndb/include/kernel/signaldata/LCP.hpp
  storage/ndb/src/kernel/blocks/LocalProxy.cpp
  storage/ndb/src/kernel/blocks/LocalProxy.hpp
  storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp

=== modified file 'storage/ndb/include/kernel/signaldata/LCP.hpp'
--- a/storage/ndb/include/kernel/signaldata/LCP.hpp	2009-03-11 17:37:23 +0000
+++ b/storage/ndb/include/kernel/signaldata/LCP.hpp	2009-03-12 06:52:39 +0000
@@ -123,6 +123,7 @@ struct LcpFragRep {
   friend bool printLCP_FRAG_REP(FILE *, const Uint32 *, Uint32, Uint16);  
 
   STATIC_CONST( SignalLength = 7 );
+  STATIC_CONST( BROADCAST_REQ = 0 );
 
   Uint32 nodeId;
   Uint32 lcpId;

=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.cpp	2009-02-23 14:05:03 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.cpp	2009-03-12 06:52:39 +0000
@@ -17,8 +17,7 @@
 #include "LocalProxy.hpp"
 
 LocalProxy::LocalProxy(BlockNumber blockNumber, Block_context& ctx) :
-  SimulatedBlock(blockNumber, ctx),
-  c_nodeList(c_nodePool)
+  SimulatedBlock(blockNumber, ctx)
 {
   BLOCK_CONSTRUCTOR(LocalProxy);
 
@@ -34,7 +33,6 @@ LocalProxy::LocalProxy(BlockNumber block
 
   c_typeOfStart = NodeState::ST_ILLEGAL_TYPE;
   c_masterNodeId = ZNIL;
-  c_nodePool.setSize(MAX_NDB_NODES);
 
   // GSN_READ_CONFIG_REQ
   addRecSignal(GSN_READ_CONFIG_REQ, &LocalProxy::execREAD_CONFIG_REQ, true);
@@ -486,33 +484,6 @@ LocalProxy::execREAD_NODESCONF(Signal* s
 
   const ReadNodesConf* conf = (const ReadNodesConf*)signal->getDataPtr();
 
-  ndbrequire(c_nodePool.getNoOfFree() == c_nodePool.getSize());
-  Uint32 count = 0;
-  Uint32 i;
-  for (i = 0; i < MAX_NDB_NODES; i++) {
-    if (NdbNodeBitmask::get(conf->allNodes, i)) {
-      jam();
-      count++;
-
-      NodePtr nodePtr;
-      bool ok = c_nodePool.seize(nodePtr);
-      ndbrequire(ok);
-      new (nodePtr.p) Node;
-
-      nodePtr.p->m_nodeId = i;
-      if (NdbNodeBitmask::get(conf->inactiveNodes, i)) {
-        jam();
-        nodePtr.p->m_alive = false;
-      } else {
-        jam();
-        nodePtr.p->m_alive = true;
-      }
-
-      c_nodeList.addLast(nodePtr);
-    }
-  }
-  ndbrequire(count != 0 && count == conf->noOfNodes);
-
   c_masterNodeId = conf->masterNodeId;
 
   switch (ss.m_gsn) {
@@ -551,20 +522,6 @@ LocalProxy::execNODE_FAILREP(Signal* sig
   NdbNodeBitmask mask;
   mask.assign(NdbNodeBitmask::Size, req->theNodes);
 
-  // proxy itself
-  NodePtr nodePtr;
-  c_nodeList.first(nodePtr);
-  ndbrequire(nodePtr.i != RNIL);
-  while (nodePtr.i != RNIL)
-  {
-    if (NdbNodeBitmask::get(req->theNodes, nodePtr.p->m_nodeId))
-    {
-      jam();
-      nodePtr.p->m_alive = false;
-    }
-    c_nodeList.next(nodePtr);
-  }
-
   // from each worker wait for ack for each failed node
   for (Uint32 i = 0; i < c_workers; i++)
   {
@@ -653,20 +610,6 @@ LocalProxy::execINCL_NODEREQ(Signal* sig
   ndbrequire(sizeof(ss.m_req) >= (ss.m_reqlength << 2));
   memcpy(&ss.m_req, signal->getDataPtr(), ss.m_reqlength << 2);
 
-  // proxy itself
-  NodePtr nodePtr;
-  c_nodeList.first(nodePtr);
-  ndbrequire(nodePtr.i != RNIL);
-  while (nodePtr.i != RNIL) {
-    jam();
-    if (ss.m_req.inclNodeId == nodePtr.p->m_nodeId) {
-      jam();
-      ndbrequire(!nodePtr.p->m_alive);
-      nodePtr.p->m_alive = true;
-    }
-    c_nodeList.next(nodePtr);
-  }
-
   sendREQ(signal, ss);
 }
 

=== modified file 'storage/ndb/src/kernel/blocks/LocalProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/LocalProxy.hpp	2009-01-30 13:07:48 +0000
+++ b/storage/ndb/src/kernel/blocks/LocalProxy.hpp	2009-03-12 06:52:39 +0000
@@ -304,23 +304,6 @@ protected:
   Uint32 c_typeOfStart;
   Uint32 c_masterNodeId;
 
-  struct Node {
-    Uint32 m_nodeId;
-    bool m_alive;
-    Node() {
-      m_nodeId = 0;
-      m_alive = false;
-    }
-    Uint32 nextList;
-    union {
-    Uint32 prevList;
-    Uint32 nextPool;
-    };
-  };
-  typedef Ptr<Node> NodePtr;
-  ArrayPool<Node> c_nodePool;
-  DLFifoList<Node> c_nodeList;
-
   // GSN_READ_CONFIG_REQ
   struct Ss_READ_CONFIG_REQ : SsSequential {
     ReadConfigReq m_req;

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2009-03-12 05:42:38 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2009-03-12 06:52:39 +0000
@@ -12030,6 +12030,32 @@ void Dbdih::sendLastLCP_FRAG_ORD(Signal*
 void Dbdih::execLCP_FRAG_REP(Signal* signal) 
 {
   jamEntry();
+
+  LcpFragRep * const lcpReport = (LcpFragRep *)&signal->theData[0];
+
+  /**
+   * Proxing LCP_FRAG_REP
+   */
+  const bool broadcast_req = lcpReport->nodeId == LcpFragRep::BROADCAST_REQ;
+  if (broadcast_req)
+  {
+    jam();
+    ndbrequire(refToNode(signal->getSendersBlockRef()) == getOwnNodeId());
+
+    /**
+     * Set correct nodeId
+     */
+    lcpReport->nodeId = getOwnNodeId();
+
+    NodeReceiverGroup rg(DBDIH, c_lcpState.m_participatingDIH);
+    rg.m_nodes.clear(getOwnNodeId());
+    sendSignal(rg, GSN_LCP_FRAG_REP, signal, signal->getLength(), JBB);  
+
+    /**
+     * and continue processing
+     */
+  }
+
   ndbrequire(c_lcpState.lcpStatus != LCP_STATUS_IDLE);
   
 #if 0
@@ -12038,7 +12064,6 @@ void Dbdih::execLCP_FRAG_REP(Signal* sig
 		    signal->length(), number());
 #endif  
 
-  LcpFragRep * const lcpReport = (LcpFragRep *)&signal->theData[0];
   Uint32 nodeId = lcpReport->nodeId;
   Uint32 tableId = lcpReport->tableId;
   Uint32 fragId = lcpReport->fragId;
@@ -12079,7 +12104,7 @@ void Dbdih::execLCP_FRAG_REP(Signal* sig
   CRASH_INSERTION2(7016, !isMaster());
   CRASH_INSERTION2(7191, (!isMaster() && tableId));
 
-  bool fromTimeQueue = (signal->senderBlockRef() == reference());
+  bool fromTimeQueue = (signal->senderBlockRef()==reference()&&!broadcast_req);
   
   TabRecordPtr tabPtr;
   tabPtr.i = tableId;
@@ -12596,6 +12621,26 @@ void Dbdih::execLCP_COMPLETE_REP(Signal*
 #endif
 
   LcpCompleteRep * rep = (LcpCompleteRep*)signal->getDataPtr();
+
+  if (rep->nodeId == LcpFragRep::BROADCAST_REQ)
+  {
+    jam();
+    ndbrequire(refToNode(signal->getSendersBlockRef()) == getOwnNodeId());
+    
+    /**
+     * Set correct nodeId
+     */
+    rep->nodeId = getOwnNodeId();
+
+    NodeReceiverGroup rg(DBDIH, c_lcpState.m_participatingDIH);
+    rg.m_nodes.clear(getOwnNodeId());
+    sendSignal(rg, GSN_LCP_COMPLETE_REP, signal, signal->getLength(), JBB);  
+    
+    /**
+     * and continue processing
+     */
+  }
+  
   Uint32 lcpId = rep->lcpId;
   Uint32 nodeId = rep->nodeId;
   Uint32 blockNo = rep->blockNo;

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2009-03-11 16:24:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2009-03-12 06:52:39 +0000
@@ -12402,22 +12402,15 @@ Dblqh::sendLCP_FRAG_REP(Signal * signal,
   lcpReport->maxGciCompleted = fragPtrP->maxGciCompletedInLcp;
   lcpReport->maxGciStarted = fragPtrP->maxGciInLcp;
   
-  if (!isNdbMtLqh()) {
-    for (Uint32 i = 0; i < cnoOfNodes; i++) {
-      jam();
-      Uint32 nodeId = cnodeData[i];
-      if(cnodeStatus[i] == ZNODE_UP){
-        jam();
-        BlockReference Tblockref = calcDihBlockRef(nodeId);
-        sendSignal(Tblockref, GSN_LCP_FRAG_REP, signal, 
-                   LcpFragRep::SignalLength, JBB);
-      }//if
-    }//for
-  } else {
+  Uint32 ref = DBDIH_REF;
+  if (isNdbMtLqh())
+  {
     jam();
-    sendSignal(DBLQH_REF, GSN_LCP_FRAG_REP, signal,
-               LcpFragRep::SignalLength, JBB);
+    ref = DBLQH_REF;
   }
+  lcpReport->nodeId = LcpFragRep::BROADCAST_REQ;
+  sendSignal(ref, GSN_LCP_FRAG_REP, signal,
+             LcpFragRep::SignalLength, JBB);
 }
 
 void Dblqh::contChkpNextFragLab(Signal* signal) 
@@ -12646,29 +12639,22 @@ void Dblqh::sendLCP_COMPLETE_REP(Signal*
   rep->lcpId = lcpId;
   rep->blockNo = DBLQH;
   
-  if (!isNdbMtLqh()) {
-    for (Uint32 i = 0; i < cnoOfNodes; i++) {
-      jam();
-      Uint32 nodeId = cnodeData[i];
-      if(cnodeStatus[i] == ZNODE_UP){
-        jam();
-        
-        BlockReference blockref = calcDihBlockRef(nodeId);
-        sendSignal(blockref, GSN_LCP_COMPLETE_REP, signal, 
-                   LcpCompleteRep::SignalLength, JBB);
-      }//if
-    }//for
-  } else {
+  Uint32 ref = DBDIH_REF;
+  if (isNdbMtLqh())
+  {
     jam();
-    sendSignal(DBLQH_REF, GSN_LCP_COMPLETE_REP, signal,
-               LcpCompleteRep::SignalLength, JBB);
+    ref = DBLQH_REF;
   }
+  rep->nodeId = LcpFragRep::BROADCAST_REQ;
 
+  sendSignal(ref, GSN_LCP_COMPLETE_REP, signal,
+             LcpCompleteRep::SignalLength, JBB);
+  
   if(lcpPtr.p->reportEmpty){
     jam();
     sendEMPTY_LCP_CONF(signal, true);
   }
-
+  
   if (cstartRecReq < SRR_FIRST_LCP_DONE)
   {
     jam();

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp	2008-12-22 09:40:33 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp	2009-03-12 06:52:39 +0000
@@ -469,7 +469,7 @@ DblqhProxy::execLCP_FRAG_REP(Signal* sig
 {
   ndbrequire(signal->getLength() == LcpFragRep::SignalLength);
 
-  const LcpFragRep* conf = (const LcpFragRep*)signal->getDataPtr();
+  LcpFragRep* conf = (LcpFragRep*)signal->getDataPtr();
   Uint32 ssId = getSsId(conf);
   Ss_LCP_FRAG_ORD& ss = ssFind<Ss_LCP_FRAG_ORD>(ssId);
 
@@ -479,19 +479,13 @@ DblqhProxy::execLCP_FRAG_REP(Signal* sig
   c_lcpRecord.m_frags++;
   D("LCP: rep" << V(conf->lcpId) << V(c_lcpRecord.m_frags));
 
-  NodePtr nodePtr;
-  c_nodeList.first(nodePtr);
-  ndbrequire(nodePtr.i != RNIL);
-  while (nodePtr.i != RNIL) {
-    if (nodePtr.p->m_alive) {
-      jam();
-      Uint32 nodeId = nodePtr.p->m_nodeId;
-      BlockReference dihRef = calcDihBlockRef(nodeId);
-      sendSignal(dihRef, GSN_LCP_FRAG_REP,
-                 signal, LcpFragRep::SignalLength, JBB);
-    }
-    c_nodeList.next(nodePtr);
-  }
+  /**
+   * But instead of broadcasting to all DIH's
+   *   send to local that will do the broadcast
+   */
+  conf->nodeId = LcpFragRep::BROADCAST_REQ;
+  sendSignal(DBDIH_REF, GSN_LCP_FRAG_REP,
+             signal, LcpFragRep::SignalLength, JBB);
 }
 
 // GSN_LCP_COMPLETE_ORD [ sub-op, fictional gsn ]
@@ -578,24 +572,13 @@ DblqhProxy::sendLCP_COMPLETE_REP(Signal*
     }
   }
 
-  NodePtr nodePtr;
-  c_nodeList.first(nodePtr);
-  ndbrequire(nodePtr.i != RNIL);
-  while (nodePtr.i != RNIL) {
-    if (nodePtr.p->m_alive) {
-      jam();
-      Uint32 nodeId = nodePtr.p->m_nodeId;
-      BlockReference dihRef = calcDihBlockRef(nodeId);
-
-      LcpCompleteRep* conf = (LcpCompleteRep*)signal->getDataPtrSend();
-      conf->nodeId = getOwnNodeId();
-      conf->blockNo = DBLQH;
-      conf->lcpId = ss.m_req.lcpId;
-      sendSignal(dihRef, GSN_LCP_COMPLETE_REP,
-                 signal, LcpCompleteRep::SignalLength, JBB);
-    }
-    c_nodeList.next(nodePtr);
-  }
+  
+  LcpCompleteRep* conf = (LcpCompleteRep*)signal->getDataPtrSend();
+  conf->nodeId = LcpFragRep::BROADCAST_REQ;
+  conf->blockNo = DBLQH;
+  conf->lcpId = ss.m_req.lcpId;
+  sendSignal(DBDIH_REF, GSN_LCP_COMPLETE_REP,
+             signal, LcpCompleteRep::SignalLength, JBB);
 
   for (i = 0; i < ss.BlockCnt; i++) {
     jam();

Thread
bzr commit into mysql-5.1-telco-7.0 branch (jonas:2934) Bug#43108Jonas Oreland12 Mar