MySQL Lists are EOL. Please join:

List: Commits « Previous Message | Next Message »
From: Jonas Oreland  Date: November 6 2009 12:12pm
Subject: bzr push into mysql-5.1-telco-6.3 branch (jonas:3158 to 3159)
View as plain text  
 3159 Jonas Oreland	2009-11-06
      lcp-cleanup (commit for autotest)
      
      This patch changes LCP to adhere to the T-protocol design,
      i.e., distributed communication happens only between instances of the
        same block (in this case DIH <-> DIH), and inter-block communication
        is node-local. This design is best for protocols that are not
        performance-critical, i.e., everything except transactions.
      
      In this case, it changes so that
      - LCP_FRAG_ORD is sent to remote DIH (instead of remote LQH)
        which forwards to local LQH
      
      - LCP_FRAG_REP is sent to local DIH (instead of remote DIH)
        which forwards to remote DIHs
      
      - LCP_COMPLETE_REP is sent to local DIH (instead of remote DIH)
        which forwards to remote DIHs
      
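      - EMPTY_LCP_REQ is sent to remote DIH (instead of remote LQH)
        which forwards to local LQH
      - EMPTY_LCP_REP (new, node-local) is sent to local DIH, which forwards
        it as EMPTY_LCP_CONF to remote DIHs (instead of LQH sending
        EMPTY_LCP_CONF directly to remote DIH)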
      
      This work accomplishes the following:
      1) it aligns code with 7.0 (which has done several of these changes due to MT)
      2) it prepares code for a future L-LCP, in the sense that handling upgrade
         will be easier (LQH does not need to be changed at all)
      
      The LCP_FRAG_ORD/EMPTY_LCP_REQ rerouting is made conditional on ndbd
      versions (i.e., it handles upgrades)
      
      ---
       storage/ndb/include/kernel/GlobalSignalNumbers.h   |    1 
       storage/ndb/include/kernel/signaldata/EmptyLcp.hpp |   13 ++
       storage/ndb/include/kernel/signaldata/LCP.hpp      |    2 
       storage/ndb/include/ndb_version.h.in               |   25 +++++
       storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp      |    3 
       storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp  |    3 
       storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp  |   99 ++++++++++++++++++++-
       storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp  |   52 ++---------
       8 files changed, 156 insertions(+), 42 deletions(-)

    modified:
      storage/ndb/include/kernel/GlobalSignalNumbers.h
      storage/ndb/include/kernel/signaldata/EmptyLcp.hpp
      storage/ndb/include/kernel/signaldata/LCP.hpp
      storage/ndb/include/ndb_version.h.in
      storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp
      storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp
      storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
      storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
 3158 Jonas Oreland	2009-11-05
      ndb - bug#48584 - fix master lcp take-over bug

    modified:
      storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
=== modified file 'storage/ndb/include/kernel/GlobalSignalNumbers.h'
--- a/storage/ndb/include/kernel/GlobalSignalNumbers.h	2009-10-12 06:21:54 +0000
+++ b/storage/ndb/include/kernel/GlobalSignalNumbers.h	2009-11-06 12:11:52 +0000
@@ -317,6 +317,7 @@ extern const GlobalSignalNumber NO_OF_SI
 
 #define GSN_EMPTY_LCP_REQ               223
 #define GSN_EMPTY_LCP_CONF              224
+#define GSN_EMPTY_LCP_REP               430 /* local */
 
 #define GSN_SCHEMA_INFO                 225
 #define GSN_SCHEMA_INFOCONF             226

=== modified file 'storage/ndb/include/kernel/signaldata/EmptyLcp.hpp'
--- a/storage/ndb/include/kernel/signaldata/EmptyLcp.hpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/include/kernel/signaldata/EmptyLcp.hpp	2009-11-06 12:11:52 +0000
@@ -76,4 +76,17 @@ private:
   Uint32 idle;
 };
 
+/**
+ * This is an envelope signal
+ *   sent from LQH to local DIH, that will forward it as a
+ *   EMPTY_LCP_CONF to avoid race condition with LCP_FRAG_REP
+ *   which is now routed via local DIH
+ */
+struct EmptyLcpRep
+{
+  STATIC_CONST( SignalLength = NdbNodeBitmask::Size );
+  Uint32 receiverGroup[NdbNodeBitmask::Size];
+  Uint32 conf[EmptyLcpConf::SignalLength];
+};
+
 #endif

=== modified file 'storage/ndb/include/kernel/signaldata/LCP.hpp'
--- a/storage/ndb/include/kernel/signaldata/LCP.hpp	2009-05-27 12:11:46 +0000
+++ b/storage/ndb/include/kernel/signaldata/LCP.hpp	2009-11-06 12:11:52 +0000
@@ -123,6 +123,7 @@ struct LcpFragRep {
   friend bool printLCP_FRAG_REP(FILE *, const Uint32 *, Uint32, Uint16);  
 
   STATIC_CONST( SignalLength = 7 );
+  STATIC_CONST( BROADCAST_REQ = 0 );
 
   Uint32 nodeId;
   Uint32 lcpId;
@@ -147,6 +148,7 @@ class LcpCompleteRep {
   friend bool printLCP_COMPLETE_REP(FILE *, const Uint32 *, Uint32, Uint16);  
 public:
   STATIC_CONST( SignalLength = 3 );
+  STATIC_CONST( BROADCAST_REQ = 0 );
   
 private:
   Uint32 nodeId;

=== modified file 'storage/ndb/include/ndb_version.h.in'
--- a/storage/ndb/include/ndb_version.h.in	2009-11-02 17:09:12 +0000
+++ b/storage/ndb/include/ndb_version.h.in	2009-11-06 12:11:52 +0000
@@ -286,4 +286,29 @@ ndb_delayed_copy_active_req(Uint32 x)
   }
 }
 
+#define NDBD_SANE_LCP_0_63 NDB_MAKE_VERSION(6,3,29)
+#define NDBD_SANE_LCP_0_70 NDB_MAKE_VERSION(7,0,10)
+
+static
+inline
+int
+ndb_sane_lcp_0(Uint32 x)
+{
+  if (x >= NDB_VERSION_D)
+    return 1;
+
+  {
+    const Uint32 major = (x >> 16) & 0xFF;
+
+    if (major >= 6)
+    {
+      return x >= NDBD_SANE_LCP_0_63;
+    }
+    else
+    {
+      return x >= NDBD_SANE_LCP_0_70;
+    }
+  }
+}
+
 #endif

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2009-11-02 17:09:12 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/Dbdih.hpp	2009-11-06 12:11:52 +0000
@@ -624,6 +624,8 @@ private:
   void execDUMP_STATE_ORD(Signal *);
   void execNDB_TAMPER(Signal *);
   void execDEBUG_SIG(Signal *);
+  void execEMPTY_LCP_REQ(Signal*);
+  void execEMPTY_LCP_REP(Signal*);
   void execEMPTY_LCP_CONF(Signal *);
   void execMASTER_GCPREF(Signal *);
   void execMASTER_GCPREQ(Signal *);
@@ -676,6 +678,7 @@ private:
   void execTC_CLOPSIZECONF(Signal *);
 
   int handle_invalid_lcp_no(const class LcpFragRep*, ReplicaRecordPtr);
+  void execLCP_FRAG_ORD(Signal *);
   void execLCP_FRAG_REP(Signal *);
   void execLCP_COMPLETE_REP(Signal *);
   void execSTART_LCP_REQ(Signal *);

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp	2009-09-09 08:20:02 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihInit.cpp	2009-11-06 12:11:52 +0000
@@ -138,6 +138,8 @@ Dbdih::Dbdih(Block_context& ctx):
   addRecSignal(GSN_MASTER_GCPREQ, &Dbdih::execMASTER_GCPREQ);
   addRecSignal(GSN_MASTER_GCPREF, &Dbdih::execMASTER_GCPREF);
   addRecSignal(GSN_MASTER_GCPCONF, &Dbdih::execMASTER_GCPCONF);
+  addRecSignal(GSN_EMPTY_LCP_REQ, &Dbdih::execEMPTY_LCP_REQ);
+  addRecSignal(GSN_EMPTY_LCP_REP, &Dbdih::execEMPTY_LCP_REP);
   addRecSignal(GSN_EMPTY_LCP_CONF, &Dbdih::execEMPTY_LCP_CONF);
   addRecSignal(GSN_MASTER_LCPREQ, &Dbdih::execMASTER_LCPREQ);
   addRecSignal(GSN_MASTER_LCPREF, &Dbdih::execMASTER_LCPREF);
@@ -188,6 +190,7 @@ Dbdih::Dbdih(Block_context& ctx):
   addRecSignal(GSN_TC_CLOPSIZECONF, &Dbdih::execTC_CLOPSIZECONF);
 
   addRecSignal(GSN_LCP_COMPLETE_REP, &Dbdih::execLCP_COMPLETE_REP);
+  addRecSignal(GSN_LCP_FRAG_ORD, &Dbdih::execLCP_FRAG_ORD);
   addRecSignal(GSN_LCP_FRAG_REP, &Dbdih::execLCP_FRAG_REP);
   addRecSignal(GSN_START_LCP_REQ, &Dbdih::execSTART_LCP_REQ);
   addRecSignal(GSN_START_LCP_CONF, &Dbdih::execSTART_LCP_CONF);

=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2009-11-05 20:46:28 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2009-11-06 12:11:52 +0000
@@ -177,6 +177,11 @@ void Dbdih::sendDIH_SWITCH_REPLICA_REQ(S
 void Dbdih::sendEMPTY_LCP_REQ(Signal* signal, Uint32 nodeId, Uint32 extra)
 {
   BlockReference ref = calcLqhBlockRef(nodeId);
+  if (ndb_sane_lcp_0(getNodeInfo(nodeId).m_version))
+  {
+    jam();
+    ref = calcDihBlockRef(nodeId);
+  }
   sendSignal(ref, GSN_EMPTY_LCP_REQ, signal, EmptyLcpReq::SignalLength, JBB);
 }//Dbdih::sendEMPTY_LCPREQ()
 
@@ -6181,6 +6186,32 @@ void Dbdih::startLcpTakeOverLab(Signal* 
   /*--------------------------------------------------------------------*/
 }//Dbdih::startLcpTakeOver()
 
+void
+Dbdih::execEMPTY_LCP_REQ(Signal* signal)
+{
+  jamEntry();
+  sendSignal(DBLQH_REF, GSN_EMPTY_LCP_REQ, signal, signal->getLength(), JBB);
+}
+
+void
+Dbdih::execEMPTY_LCP_REP(Signal* signal)
+{
+  jamEntry();
+  EmptyLcpRep* rep = (EmptyLcpRep*)signal->getDataPtr();
+
+  Uint32 len = signal->getLength();
+  ndbrequire(len > EmptyLcpRep::SignalLength);
+  len -= EmptyLcpRep::SignalLength;
+
+  NdbNodeBitmask nodes;
+  nodes.assign(NdbNodeBitmask::Size, rep->receiverGroup);
+  NodeReceiverGroup rg (DBDIH, nodes);
+  memmove(signal->getDataPtrSend(),
+          signal->getDataPtr()+EmptyLcpRep::SignalLength, 4*len);
+
+  sendSignal(rg, GSN_EMPTY_LCP_CONF, signal, len, JBB);
+}
+
 void Dbdih::execEMPTY_LCP_CONF(Signal* signal)
 {
   jamEntry();
@@ -11508,6 +11539,11 @@ void Dbdih::sendLastLCP_FRAG_ORD(Signal*
 
       CRASH_INSERTION(7193);
       BlockReference ref = calcLqhBlockRef(nodePtr.i);
+      if (ndb_sane_lcp_0(getNodeInfo(nodePtr.i).m_version))
+      {
+        jam();
+        ref = calcDihBlockRef(nodePtr.i);
+      }
       sendSignal(ref, GSN_LCP_FRAG_ORD, signal,LcpFragOrd::SignalLength, JBB);
     }
   }
@@ -11517,6 +11553,13 @@ void Dbdih::sendLastLCP_FRAG_ORD(Signal*
   }
 }//Dbdih::sendLastLCP_FRAGORD()
 
+void
+Dbdih::execLCP_FRAG_ORD(Signal* signal)
+{
+  jamEntry();
+  sendSignal(DBLQH_REF, GSN_LCP_FRAG_ORD, signal, signal->getLength(), JBB);
+}
+
 /* ------------------------------------------------------------------------- */
 /*       A FRAGMENT REPLICA HAS COMPLETED EXECUTING ITS LOCAL CHECKPOINT.    */
 /*       CHECK IF ALL REPLICAS IN THE TABLE HAVE COMPLETED. IF SO STORE THE  */
@@ -11526,6 +11569,29 @@ void Dbdih::sendLastLCP_FRAG_ORD(Signal*
 void Dbdih::execLCP_FRAG_REP(Signal* signal) 
 {
   jamEntry();
+
+  LcpFragRep * const lcpReport = (LcpFragRep *)&signal->theData[0];
+
+  const bool broadcast_req = lcpReport->nodeId == LcpFragRep::BROADCAST_REQ;
+  if (broadcast_req)
+  {
+    jam();
+    ndbrequire(refToNode(signal->getSendersBlockRef()) == getOwnNodeId());
+
+    /**
+     * Set correct nodeId
+     */
+    lcpReport->nodeId = getOwnNodeId();
+
+    NodeReceiverGroup rg(DBDIH, c_lcpState.m_participatingDIH);
+    rg.m_nodes.clear(getOwnNodeId());
+    sendSignal(rg, GSN_LCP_FRAG_REP, signal, signal->getLength(), JBB);
+
+    /**
+     * and continue processing
+     */
+  }
+
   ndbrequire(c_lcpState.lcpStatus != LCP_STATUS_IDLE);
   
 #if 0
@@ -11534,7 +11600,6 @@ void Dbdih::execLCP_FRAG_REP(Signal* sig
 		    signal->length(), number());
 #endif  
 
-  LcpFragRep * const lcpReport = (LcpFragRep *)&signal->theData[0];
   Uint32 nodeId = lcpReport->nodeId;
   Uint32 tableId = lcpReport->tableId;
   Uint32 fragId = lcpReport->fragId;
@@ -11575,7 +11640,7 @@ void Dbdih::execLCP_FRAG_REP(Signal* sig
   CRASH_INSERTION2(7016, !isMaster());
   CRASH_INSERTION2(7191, (!isMaster() && tableId));
 
-  bool fromTimeQueue = (signal->senderBlockRef() == reference());
+  bool fromTimeQueue = (signal->senderBlockRef() ==reference())&&!broadcast_req;
   
   TabRecordPtr tabPtr;
   tabPtr.i = tableId;
@@ -11957,7 +12022,12 @@ Dbdih::sendLCP_FRAG_ORD(Signal* signal, 
   ptrCheckGuard(replicaPtr, creplicaFileSize, replicaRecord);
   
   BlockReference ref = calcLqhBlockRef(replicaPtr.p->procNode);
-  
+  if (ndb_sane_lcp_0(getNodeInfo(replicaPtr.p->procNode).m_version))
+  {
+    jam();
+    ref = calcDihBlockRef(replicaPtr.p->procNode);
+  }
+
   if (ERROR_INSERTED(7193) && replicaPtr.p->procNode == getOwnNodeId())
   {
     return;
@@ -12081,6 +12151,28 @@ void Dbdih::execLCP_COMPLETE_REP(Signal*
 {
   jamEntry();
 
+  LcpCompleteRep * rep = (LcpCompleteRep*)signal->getDataPtr();
+
+  const bool broadcast_req = rep->nodeId == LcpFragRep::BROADCAST_REQ;
+  if (broadcast_req)
+  {
+    jam();
+    ndbrequire(refToNode(signal->getSendersBlockRef()) == getOwnNodeId());
+
+    /**
+     * Set correct nodeId
+     */
+    rep->nodeId = getOwnNodeId();
+
+    NodeReceiverGroup rg(DBDIH, c_lcpState.m_participatingDIH);
+    rg.m_nodes.clear(getOwnNodeId());
+    sendSignal(rg, GSN_LCP_COMPLETE_REP, signal, signal->getLength(), JBB);
+
+    /**
+     * and continue processing
+     */
+  }
+
   CRASH_INSERTION(7191);
 
 #if 0
@@ -12090,7 +12182,6 @@ void Dbdih::execLCP_COMPLETE_REP(Signal*
 			signal->length(), number());
 #endif
 
-  LcpCompleteRep * rep = (LcpCompleteRep*)signal->getDataPtr();
   Uint32 lcpId = rep->lcpId;
   Uint32 nodeId = rep->nodeId;
   Uint32 blockNo = rep->blockNo;

=== modified file 'storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2009-11-02 17:09:12 +0000
+++ b/storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp	2009-11-06 12:11:52 +0000
@@ -12019,24 +12019,16 @@ Dblqh::sendLCP_FRAG_REP(Signal * signal,
   
   ndbrequire(fragOrd.lcpFragOrd.lcpNo < MAX_LCP_STORED);
   LcpFragRep * const lcpReport = (LcpFragRep *)&signal->theData[0];
-  lcpReport->nodeId = cownNodeid;
+  lcpReport->nodeId = LcpFragRep::BROADCAST_REQ;
   lcpReport->lcpId = fragOrd.lcpFragOrd.lcpId;
   lcpReport->lcpNo = fragOrd.lcpFragOrd.lcpNo;
   lcpReport->tableId = fragOrd.lcpFragOrd.tableId;
   lcpReport->fragId = fragOrd.lcpFragOrd.fragmentId;
   lcpReport->maxGciCompleted = fragPtrP->maxGciCompletedInLcp;
   lcpReport->maxGciStarted = fragPtrP->maxGciInLcp;
-  
-  for (Uint32 i = 0; i < cnoOfNodes; i++) {
-    jam();
-    Uint32 nodeId = cnodeData[i];
-    if(cnodeStatus[i] == ZNODE_UP){
-      jam();
-      BlockReference Tblockref = calcDihBlockRef(nodeId);
-      sendSignal(Tblockref, GSN_LCP_FRAG_REP, signal, 
-		 LcpFragRep::SignalLength, JBB);
-    }//if
-  }//for
+
+  sendSignal(DBDIH_REF, GSN_LCP_FRAG_REP, signal,
+             LcpFragRep::SignalLength, JBB);
 }
 
 void Dblqh::contChkpNextFragLab(Signal* signal) 
@@ -12133,8 +12125,9 @@ void Dblqh::sendLCP_FRAGIDREQ(Signal* si
 
 void Dblqh::sendEMPTY_LCP_CONF(Signal* signal, bool idle)
 {
-  
-  EmptyLcpConf * const rep = (EmptyLcpConf*)&signal->theData[0];
+  EmptyLcpRep * sig = (EmptyLcpRep*)signal->getDataPtrSend();
+  EmptyLcpConf * rep = (EmptyLcpConf*)sig->conf;
+
   /* ----------------------------------------------------------------------
    *       We have been requested to report when there are no more local
    *       waiting to be started or ongoing. In this signal we also report
@@ -12156,18 +12149,10 @@ void Dblqh::sendEMPTY_LCP_CONF(Signal* s
     rep->lcpNo = ~0;
     rep->lcpId = c_lcpId;
   }
-  
-  for (Uint32 i = 0; i < cnoOfNodes; i++) {
-    jam();
-    Uint32 nodeId = cnodeData[i];
-    if (lcpPtr.p->m_EMPTY_LCP_REQ.get(nodeId)) {
-      jam();
-      
-      BlockReference blockref = calcDihBlockRef(nodeId);
-      sendSignal(blockref, GSN_EMPTY_LCP_CONF, signal, 
-		 EmptyLcpConf::SignalLength, JBB);
-    }//if
-  }//for
+
+  lcpPtr.p->m_EMPTY_LCP_REQ.copyto(NdbNodeBitmask::Size, sig->receiverGroup);
+  sendSignal(DBDIH_REF, GSN_EMPTY_LCP_REP, signal,
+             EmptyLcpRep::SignalLength + EmptyLcpConf::SignalLength, JBB);
 
   lcpPtr.p->reportEmpty = false;
   lcpPtr.p->m_EMPTY_LCP_REQ.clear();
@@ -12235,21 +12220,12 @@ void Dblqh::sendLCP_COMPLETE_REP(Signal*
   lcpPtr.p->firstFragmentFlag = false;
   
   LcpCompleteRep* rep = (LcpCompleteRep*)signal->getDataPtrSend();
-  rep->nodeId = getOwnNodeId();
+  rep->nodeId = LcpCompleteRep::BROADCAST_REQ;
   rep->lcpId = lcpId;
   rep->blockNo = DBLQH;
   
-  for (Uint32 i = 0; i < cnoOfNodes; i++) {
-    jam();
-    Uint32 nodeId = cnodeData[i];
-    if(cnodeStatus[i] == ZNODE_UP){
-      jam();
-      
-      BlockReference blockref = calcDihBlockRef(nodeId);
-      sendSignal(blockref, GSN_LCP_COMPLETE_REP, signal, 
-		 LcpCompleteRep::SignalLength, JBB);
-    }//if
-  }//for
+  sendSignal(DBDIH_REF, GSN_LCP_COMPLETE_REP, signal,
+             LcpCompleteRep::SignalLength, JBB);
 
   if(lcpPtr.p->reportEmpty){
     jam();


Attachment: [text/bzr-bundle] bzr/jonas@mysql.com-20091106121152-6520tsfcbek77sa7.bundle
Thread
bzr push into mysql-5.1-telco-6.3 branch (jonas:3158 to 3159)Jonas Oreland6 Nov