List:Commits« Previous MessageNext Message »
From:Pekka Nousiainen Date:December 1 2008 6:07pm
Subject:bzr commit into mysql-5.1 branch (pekka:3145) WL#4391
View as plain text  
#At file:///export/space/pekka/ndb/version/my51-wl4391/

 3145 Pekka Nousiainen	2008-12-01
      wl#4391 33e_mixed.diff
      mixed - SR
modified:
  storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp
  storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-12-01 18:06:29 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2008-12-01 18:06:58 +0000
@@ -15302,10 +15302,15 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
      *    WE NEED TO SEND THOSE SIGNALS EVEN IF WE HAVE NOT REQUESTED 
      *    ANY FRAGMENTS PARTICIPATE IN THIS PHASE.
      * --------------------------------------------------------------------- */
-    BlockNumber lqhBlockNo = numberToBlock(DBLQH, instance());
-    NodeReceiverGroup rg(lqhBlockNo, m_sr_nodes);
     signal->theData[0] = cownNodeid;
-    sendSignal(rg, GSN_EXEC_SRREQ, signal, 1, JBB);
+    if (!isNdbMtLqh()) {
+      NodeReceiverGroup rg(DBLQH, m_sr_nodes);
+      sendSignal(rg, GSN_EXEC_SRREQ, signal, 1, JBB);
+    } else {
+      const Uint32 sz = NdbNodeBitmask::Size;
+      m_sr_nodes.copyto(sz, &signal->theData[1]);
+      sendSignal(DBLQH_REF, GSN_EXEC_SRREQ, signal, 1 + sz, JBB);
+    }
     return;
   } else {
     jam();
@@ -15319,7 +15324,9 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
       
       Uint32 index = csrPhasesCompleted;
       arrGuard(index, MAX_LOG_EXEC);
-      BlockReference ref = calcInstanceBlockRef(DBLQH, fragptr.p->srLqhLognode[index]);
+      Uint32 Tnode = fragptr.p->srLqhLognode[index];
+      Uint32 instanceKey = fragptr.p->lqhInstanceKey;
+      BlockReference ref = numberToRef(DBLQH, instanceKey, Tnode);
       fragptr.p->srStatus = Fragrecord::SS_STARTED;
 
       /* --------------------------------------------------------------------
@@ -15336,7 +15343,6 @@ void Dblqh::execSTART_EXEC_SR(Signal* si
       execFragReq->lastGci = fragptr.p->srLastGci[index];
       sendSignal(ref, GSN_EXEC_FRAGREQ, signal, 
 		 ExecFragReq::SignalLength, JBB);
-
     }
     signal->theData[0] = next;
     sendSignal(cownref, GSN_START_EXEC_SR, signal, 1, JBB);
@@ -15426,6 +15432,13 @@ void Dblqh::execEXEC_FRAGREF(Signal* sig
 /* *************** */
 void Dblqh::execEXEC_SRCONF(Signal* signal) 
 {
+  // wl4391_todo workaround until timing fixed
+  if (cnoOutstandingExecFragReq != 0) {
+    ndbout << "delay:" << V(cnoOutstandingExecFragReq) << "\n";
+    sendSignalWithDelay(reference(), GSN_EXEC_SRCONF,
+                        signal, 10, signal->getLength());
+    return;
+  }
   jamEntry();
   Uint32 nodeId = signal->theData[0];
   arrGuard(nodeId, MAX_NDB_NODES);
@@ -16791,9 +16804,14 @@ void Dblqh::srPhase3Comp(Signal* signal)
   jamEntry();
 
   signal->theData[0] = cownNodeid;
-  BlockNumber lqhBlockNo = numberToBlock(DBLQH, instance());
-  NodeReceiverGroup rg(lqhBlockNo, m_sr_nodes);
-  sendSignal(rg, GSN_EXEC_SRCONF, signal, 1, JBB);
+  if (!isNdbMtLqh()) {
+    NodeReceiverGroup rg(DBLQH, m_sr_nodes);
+    sendSignal(rg, GSN_EXEC_SRCONF, signal, 1, JBB);
+  } else {
+    const Uint32 sz = NdbNodeBitmask::Size;
+    m_sr_nodes.copyto(sz, &signal->theData[1]);
+    sendSignal(DBLQH_REF, GSN_EXEC_SRCONF, signal, 1 + sz, JBB);
+  }
   return;
 }//Dblqh::srPhase3Comp()
 

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp	2008-11-16 15:28:22 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.cpp	2008-12-01 18:06:58 +0000
@@ -84,6 +84,10 @@ DblqhProxy::DblqhProxy(Block_context& ct
 
   // GSN_SUB_GCP_COMPLETE_REP
   addRecSignal(GSN_SUB_GCP_COMPLETE_REP, &DblqhProxy::execSUB_GCP_COMPLETE_REP);
+
+  // GSN_EXEC_SRREQ
+  addRecSignal(GSN_EXEC_SRREQ, &DblqhProxy::execEXEC_SRREQ);
+  addRecSignal(GSN_EXEC_SRCONF, &DblqhProxy::execEXEC_SRCONF);
 }
 
 DblqhProxy::~DblqhProxy()
@@ -1329,4 +1333,112 @@ DblqhProxy::sendEMPTY_LCP_CONF(Signal* s
   ssRelease<Ss_EMPTY_LCP_REQ>(ssId);
 }
 
+// GSN_EXEC_SR_1 [fictional gsn ]
+
+void
+DblqhProxy::execEXEC_SRREQ(Signal* signal)
+{
+  const BlockReference senderRef = signal->getSendersBlockRef();
+
+  if (refToInstance(senderRef) != 0) {
+    jam();
+    execEXEC_SR_2(signal, GSN_EXEC_SRREQ);
+    return;
+  }
+
+  execEXEC_SR_1(signal, GSN_EXEC_SRREQ);
+}
+
+void
+DblqhProxy::execEXEC_SRCONF(Signal* signal)
+{
+  const BlockReference senderRef = signal->getSendersBlockRef();
+
+  if (refToInstance(senderRef) != 0) {
+    jam();
+    execEXEC_SR_2(signal, GSN_EXEC_SRCONF);
+    return;
+  }
+
+  execEXEC_SR_1(signal, GSN_EXEC_SRCONF);
+}
+
+void
+DblqhProxy::execEXEC_SR_1(Signal* signal, GlobalSignalNumber gsn)
+{
+  ndbrequire(signal->getLength() == Ss_EXEC_SR_1::Sig::SignalLength);
+
+  const Ss_EXEC_SR_1::Sig* sig =
+    (const Ss_EXEC_SR_1::Sig*)signal->getDataPtr();
+  Uint32 ssId = getSsId(sig);
+  Ss_EXEC_SR_1& ss = ssSeize<Ss_EXEC_SR_1>(ssId);
+  ss.m_gsn = gsn;
+  ss.m_sig = *sig;
+
+  sendREQ(signal, ss);
+  ssRelease<Ss_EXEC_SR_1>(ss);
+}
+
+void
+DblqhProxy::sendEXEC_SR_1(Signal* signal, Uint32 ssId)
+{
+  Ss_EXEC_SR_1& ss = ssFind<Ss_EXEC_SR_1>(ssId);
+  signal->theData[0] = ss.m_sig.nodeId;
+  sendSignal(workerRef(ss.m_worker), ss.m_gsn, signal, 1, JBB);
+}
+
+// GSN_EXEC_SRREQ_2 [ fictional gsn ]
+
+void
+DblqhProxy::execEXEC_SR_2(Signal* signal, GlobalSignalNumber gsn)
+{
+  ndbrequire(signal->getLength() == Ss_EXEC_SR_2::Sig::SignalLength);
+
+  const Ss_EXEC_SR_2::Sig* sig =
+    (const Ss_EXEC_SR_2::Sig*)signal->getDataPtr();
+  Uint32 ssId = getSsId(sig);
+
+  bool found = false;
+  Ss_EXEC_SR_2& ss = ssFindSeize<Ss_EXEC_SR_2>(ssId, &found);
+  if (!found) {
+    jam();
+    setMask(ss);
+  }
+
+  ndbrequire(sig->nodeId == getOwnNodeId());
+  if (ss.m_sigcount == 0) {
+    jam();
+    ss.m_gsn = gsn;
+    ss.m_sig = *sig;
+  } else {
+    jam();
+    ndbrequire(ss.m_gsn == gsn);
+    ndbrequire(memcmp(&ss.m_sig, sig, sizeof(*sig)) == 0);
+  }
+  ss.m_sigcount++;
+
+  // reversed roles
+  recvCONF(signal, ss);
+}
+
+void
+DblqhProxy::sendEXEC_SR_2(Signal* signal, Uint32 ssId)
+{
+  Ss_EXEC_SR_2& ss = ssFind<Ss_EXEC_SR_2>(ssId);
+
+  if (!lastReply(ss)) {
+    jam();
+    return;
+  }
+
+  NodeBitmask nodes;
+  nodes.assign(NdbNodeBitmask::Size, ss.m_sig.sr_nodes);
+  NodeReceiverGroup rg(DBLQH, nodes);
+
+  signal->theData[0] = ss.m_sig.nodeId;
+  sendSignal(rg, ss.m_gsn, signal, 1, JBB);
+
+  ssRelease<Ss_EXEC_SR_2>(ssId);
+}
+
 BLOCK_FUNCTIONS(DblqhProxy)

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp	2008-11-16 15:28:22 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhProxy.hpp	2008-12-01 18:06:58 +0000
@@ -380,9 +380,7 @@ protected:
 
   // GSN_START_RECREQ_2 [ sub-op, fictional gsn ]
   struct Ss_START_RECREQ_2 : SsParallel {
-#ifdef VM_TRACE
     static const char* name() { return "START_RECREQ_2"; }
-#endif
     struct Req {
       enum { SignalLength = 2 };
       Uint32 lcpId;
@@ -458,6 +456,71 @@ protected:
   void execEMPTY_LCP_CONF(Signal*);
   void execEMPTY_LCP_REF(Signal*);
   void sendEMPTY_LCP_CONF(Signal*, Uint32 ssId);
+
+  // GSN_EXEC_SR_1 [ fictional gsn ]
+  struct Ss_EXEC_SR_1 : SsParallel {
+    /*
+     * Handle EXEC_SRREQ and EXEC_SRCONF.  These are broadcast
+     * signals (not REQ/CONF).  EXEC_SR_1 receives one signal and
+     * sends it to its workers.  EXEC_SR_2 waits for signal from
+     * all workers and broadcasts it to all nodes.  These are
+     * required to handle mixed versions (non-mt, mt-lqh-1,2,4).
+     */
+    static const char* name() { return "EXEC_SR_1"; }
+    struct Sig {
+      enum { SignalLength = 1 };
+      Uint32 nodeId;
+    };
+    GlobalSignalNumber m_gsn;
+    Sig m_sig;
+    Ss_EXEC_SR_1() {
+      m_sendREQ = (SsFUNC)&DblqhProxy::sendEXEC_SR_1;
+      m_sendCONF = (SsFUNC)0;
+      m_gsn = 0;
+    };
+    enum { poolSize = 1 };
+    static SsPool<Ss_EXEC_SR_1>& pool(LocalProxy* proxy) {
+      return ((DblqhProxy*)proxy)->c_ss_EXEC_SR_1;
+    }
+  };
+  SsPool<Ss_EXEC_SR_1> c_ss_EXEC_SR_1;
+  Uint32 getSsId(const Ss_EXEC_SR_1::Sig* sig) {
+    return SsIdBase | refToNode(sig->nodeId);
+  };
+  void execEXEC_SRREQ(Signal*);
+  void execEXEC_SRCONF(Signal*);
+  void execEXEC_SR_1(Signal*, GlobalSignalNumber gsn);
+  void sendEXEC_SR_1(Signal*, Uint32 ssId);
+
+  // GSN_EXEC_SR_2 [ fictional gsn ]
+  struct Ss_EXEC_SR_2 : SsParallel {
+    static const char* name() { return "EXEC_SR_2"; }
+    struct Sig {
+      enum { SignalLength = 1 + NdbNodeBitmask::Size };
+      Uint32 nodeId;
+      Uint32 sr_nodes[NdbNodeBitmask::Size]; // local signal so ok to add
+    };
+    GlobalSignalNumber m_gsn;
+    Uint32 m_sigcount;
+    Sig m_sig; // all signals must be identical
+    Ss_EXEC_SR_2() {
+      // reversed roles
+      m_sendREQ = (SsFUNC)0;
+      m_sendCONF = (SsFUNC)&DblqhProxy::sendEXEC_SR_2;
+      m_gsn = 0;
+      m_sigcount = 0;
+    };
+    enum { poolSize = 1 };
+    static SsPool<Ss_EXEC_SR_2>& pool(LocalProxy* proxy) {
+      return ((DblqhProxy*)proxy)->c_ss_EXEC_SR_2;
+    }
+  };
+  SsPool<Ss_EXEC_SR_2> c_ss_EXEC_SR_2;
+  Uint32 getSsId(const Ss_EXEC_SR_2::Sig* sig) {
+    return SsIdBase | refToNode(sig->nodeId);
+  };
+  void execEXEC_SR_2(Signal*, GlobalSignalNumber gsn);
+  void sendEXEC_SR_2(Signal*, Uint32 ssId);
 };
 
 #endif

Thread
bzr commit into mysql-5.1 branch (pekka:3145) WL#4391Pekka Nousiainen1 Dec