List: Commits « Previous Message | Next Message »
From: Frazer Clement  Date: September 3 2009 3:51pm
Subject: bzr commit into mysql-5.1-telco-6.2 branch (frazer:2976) Bug#44607
View as plain text  
#At file:///home/frazer/bzr/mysql-5.1-telco-6.2/

 2976 Frazer Clement	2009-09-03
      Bug#44607 : Fragmented long signals need node failure handling
      modified:
        storage/ndb/include/kernel/signaldata/ContinueFragmented.hpp
        storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp
        storage/ndb/include/kernel/signaldata/NodeFailRep.hpp
        storage/ndb/src/kernel/blocks/backup/Backup.cpp
        storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp
        storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp
        storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
        storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
        storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
        storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
        storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
        storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
        storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp
        storage/ndb/src/kernel/blocks/dbutil/DbUtil.hpp
        storage/ndb/src/kernel/blocks/lgman.cpp
        storage/ndb/src/kernel/blocks/lgman.hpp
        storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp
        storage/ndb/src/kernel/blocks/suma/Suma.cpp
        storage/ndb/src/kernel/blocks/suma/Suma.hpp
        storage/ndb/src/kernel/blocks/tsman.cpp
        storage/ndb/src/kernel/blocks/tsman.hpp
        storage/ndb/src/kernel/vm/SimulatedBlock.cpp
        storage/ndb/src/kernel/vm/SimulatedBlock.hpp

=== modified file 'storage/ndb/include/kernel/signaldata/ContinueFragmented.hpp'
--- a/storage/ndb/include/kernel/signaldata/ContinueFragmented.hpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/include/kernel/signaldata/ContinueFragmented.hpp	2009-09-03 15:51:43 +0000
@@ -32,7 +32,33 @@ class ContinueFragmented {
 public:
   
 private:
-  Uint32 line;
+  enum {
+    CONTINUE_SENDING = 0,
+    CONTINUE_CLEANUP = 1
+  };
+  
+  STATIC_CONST(CONTINUE_CLEANUP_FIXED_WORDS = 5);
+
+  enum {
+    RES_FRAGSEND = 0, /* Fragmented send lists */
+    RES_FRAGINFO = 1, /* Fragmented signal assembly hash */
+    RES_LAST = 2      /* Must be last */
+  };
+
+  Uint32 type;
+  
+  union
+  {
+    Uint32 line;  /* For CONTINUE_SENDING */
+    struct        /* For CONTINUE_CLEANUP */
+    {
+      Uint32 failedNodeId;
+      Uint32 resource;
+      Uint32 cursor;
+      Uint32 elementsCleaned;
+      Uint32 callbackStart; /* Callback structure placed here */
+    };
+  };
 };
 
 #endif

=== modified file 'storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp'
--- a/storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp	2009-09-03 15:51:43 +0000
@@ -124,6 +124,7 @@ public:
                                       to be able to debug if events
                                       for some reason does not end up
                                       in clusterlog */
+    CmvmiTestLongSig = 2605,  /* Long signal testing trigger */
     LCPContinue = 5900,
     // 7000 DIH
     // 7001 DIH

=== modified file 'storage/ndb/include/kernel/signaldata/NodeFailRep.hpp'
--- a/storage/ndb/include/kernel/signaldata/NodeFailRep.hpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/include/kernel/signaldata/NodeFailRep.hpp	2009-09-03 15:51:43 +0000
@@ -24,7 +24,8 @@
 
 /**
  * This signals is sent by Qmgr to NdbCntr
- *   and then from NdbCntr sent to: dih, dict, lqh, tc & API
+ *   and then from NdbCntr sent to: dih, dict, lqh, tc, API
+ *   and others
  */
 struct NodeFailRep {
   STATIC_CONST( SignalLength = 3 + NdbNodeBitmask::Size );

=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp	2009-09-03 15:51:43 +0000
@@ -850,6 +850,18 @@ Backup::execNODE_FAILREP(Signal* signal)
     jam();
     checkNodeFail(signal, ptr, newCoordinator, theFailedNodes);
   }
+
+  /* Block level cleanup */
+  for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+    jam();
+    if(NdbNodeBitmask::get(theFailedNodes, i))
+    {
+      jam();
+      Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
+      ndbassert(elementsCleaned == 0); // Backup should have no distributed frag signals
+      (void) elementsCleaned; // Remove compiler warning
+    }//if
+  }//for
 }
 
 bool

=== modified file 'storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp'
--- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp	2009-09-01 10:50:11 +0000
+++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp	2009-09-03 15:51:43 +0000
@@ -1152,6 +1152,16 @@ Cmvmi::execDUMP_STATE_ORD(Signal* signal
     sendSignal(reference(), GSN_TESTSIG, signal, 8, JBB, ptr, 2);
   }
 
+  if (arg == DumpStateOrd::CmvmiTestLongSig)
+  {
+    /* Forward as GSN_TESTSIG to self */
+    Uint32 numArgs= signal->length() - 1;
+    memmove(signal->getDataPtrSend(), 
+            signal->getDataPtrSend() + 1, 
+            numArgs << 2);
+    sendSignal(reference(), GSN_TESTSIG, signal, numArgs, JBB);
+  }
+
 #ifdef ERROR_INSERT
   if (arg == 9000 || arg == 9002)
   {
@@ -1265,6 +1275,379 @@ Cmvmi::execNODE_START_REP(Signal* signal
 
 BLOCK_FUNCTIONS(Cmvmi)
 
+void
+Cmvmi::startFragmentedSend(Signal* signal,
+                           Uint32 variant,
+                           Uint32 numSigs,
+                           NodeReceiverGroup rg)
+{
+  Uint32* sigData = signal->getDataPtrSend();
+  const Uint32 sigLength = 6;
+  const Uint32 sectionWords = 240;
+  Uint32 sectionData[ sectionWords ];
+  
+  for (Uint32 i = 0; i < sectionWords; i++)
+    sectionData[ i ] = i;
+  
+  const Uint32 secCount = 1; 
+  LinearSectionPtr ptr[3];
+  ptr[0].sz = sectionWords;
+  ptr[0].p = &sectionData[0];
+
+  for (Uint32 i = 0; i < numSigs; i++)
+  {
+    sigData[0] = variant;
+    sigData[1] = 31;
+    sigData[2] = 0;
+    sigData[3] = 1; // print
+    sigData[4] = 0;
+    sigData[5] = sectionWords;
+    
+    if ((i & 1) == 0)
+    {
+      DEBUG("Starting linear fragmented send (" << i + 1
+            << "/" << numSigs << ")");
+
+      /* Linear send */
+      /* TODO : ptr[] refers to stack-local sectionData - avoid reading it in CONTINUEB after we return */
+      sendFragmentedSignal(rg,
+                           GSN_TESTSIG,
+                           signal,
+                           sigLength,
+                           JBB,
+                           ptr,
+                           secCount,
+                           TheEmptyCallback,
+                           90); // messageSize
+    }
+    else
+    {
+      /* Segmented send */
+      DEBUG("Starting segmented fragmented send (" << i + 1
+            << "/" << numSigs << ")");
+      Ptr<SectionSegment> segPtr;
+      ndbrequire(import(segPtr, sectionData, sectionWords));
+      SegmentedSectionPtr ssPtr;
+      getSection(ssPtr, segPtr.i);
+      
+      signal->header.m_noOfSections = 1;
+      signal->m_sectionPtr[0] = ssPtr;
+      
+      sendFragmentedSignal(rg,
+                           GSN_TESTSIG,
+                           signal,
+                           sigLength,
+                           JBB,
+                           TheEmptyCallback,
+                           90); // messageSize
+    }
+  }
+}
+
+void
+Cmvmi::testNodeFailureCleanupCallback(Signal* signal, Uint32 data, Uint32 elementsCleaned)
+{
+  DEBUG("testNodeFailureCleanupCallback");
+  DEBUG("Data : " << data 
+        << " elementsCleaned : " << elementsCleaned);
+
+  debugPrintFragmentCounts();
+
+  Uint32 variant = data & 0xffff;
+  Uint32 testType = (data >> 16) & 0xffff;
+
+  DEBUG("Sending trigger(" << testType 
+        << ") variant " << variant 
+        << " to self to cleanup any fragments that arrived "
+        << "before send was cancelled");
+
+  Uint32* sigData = signal->getDataPtrSend();
+  sigData[0] = variant;
+  sigData[1] = testType;
+  sendSignal(reference(), GSN_TESTSIG, signal, 2, JBB);
+  
+  return; 
+}
+
+void 
+Cmvmi::testFragmentedCleanup(Signal* signal, Uint32 testType, Uint32 variant)
+{
+  DEBUG("TestType " << testType << " variant " << variant);
+  debugPrintFragmentCounts();
+
+  /* Variants : 
+   *     Local fragmented send   Multicast fragmented send
+   * 0 : Immediate cleanup       Immediate cleanup
+   * 1 : Continued cleanup       Immediate cleanup
+   * 2 : Immediate cleanup       Continued cleanup
+   * 3 : Continued cleanup       Continued cleanup
+   */
+  const Uint32 NUM_VARIANTS = 4;
+  if (variant >= NUM_VARIANTS)
+  {
+    DEBUG("Unsupported variant");
+    releaseSections(signal);
+    return;
+  }
+
+  /* Test from ndb_mgm with
+   * <node(s)> DUMP 2605 0 30 
+   * 
+   * Use
+   * <node(s)> DUMP 2605 0 39 to get fragment resource usage counts
+   * Use
+   * <node(s)> DUMP 2601 to get segment usage counts in clusterlog
+   */
+  if (testType == 30)
+  {
+    /* Send the first fragment of a fragmented signal to self
+     * Receiver will allocate assembly hash entries
+     * which must be freed when node failure cleanup
+     * executes later
+     */
+    const Uint32 sectionWords = 240;
+    Uint32 sectionData[ sectionWords ];
+
+    for (Uint32 i = 0; i < sectionWords; i++)
+      sectionData[ i ] = i;
+
+    const Uint32 secCount = 1; 
+    LinearSectionPtr ptr[3];
+    ptr[0].sz = sectionWords;
+    ptr[0].p = &sectionData[0];
+
+    /* Send signal with testType == 31 */
+    NodeReceiverGroup me(reference());
+    Uint32* sigData = signal->getDataPtrSend();
+    const Uint32 sigLength = 6;
+    const Uint32 numPartialSigs = 4; 
+    /* Not too many, as CMVMI's fragInfo hash is of limited size */
+    // TODO : Consider making it debug-larger to get 
+    // more coverage on CONTINUEB path
+
+    for (Uint32 i = 0; i < numPartialSigs; i++)
+    {
+      /* Fill in messy TESTSIG format */
+      sigData[0] = variant;
+      sigData[1] = 31;
+      sigData[2] = 0;
+      sigData[3] = 0; // print
+      sigData[4] = 0;
+      sigData[5] = sectionWords;
+      
+      FragmentSendInfo fsi;
+      
+      DEBUG("Sending first fragment to self");
+      sendFirstFragment(fsi,
+                        me,
+                        GSN_TESTSIG,
+                        signal,
+                        sigLength,
+                        JBB,
+                        ptr,
+                        secCount,
+                        90); // FragmentLength
+
+      DEBUG("Cancelling remainder to free internal section");
+      fsi.m_status = FragmentSendInfo::SendCancelled;
+      sendNextLinearFragment(signal, fsi);
+    };
+
+    /* Ok, now send short signal with testType == 32
+     * to trigger 'remote-side' actions in middle of
+     * multiple fragment assembly
+     */
+    sigData[0] = variant;
+    sigData[1] = 32;
+
+    DEBUG("Sending node fail trigger to self");
+    sendSignal(me, GSN_TESTSIG, signal, 2, JBB);
+    return;
+  }
+
+  if (testType == 31)
+  {
+    /* Just release sections - execTESTSIG() has shown sections received */
+    releaseSections(signal);
+    return;
+  }
+
+  if (testType == 32)
+  {
+    /* 'Remote side' trigger to clean up fragmented signal resources */
+    BlockReference senderRef = signal->getSendersBlockRef();
+    Uint32 sendingNode = refToNode(senderRef);
+    
+    /* Start sending some linear and segmented fragmented responses to
+     * the sender, to exercise frag-send cleanup code when we execute
+     * node-failure later
+     */
+    DEBUG("Starting fragmented send using continueB back to self");
+
+    NodeReceiverGroup sender(senderRef);
+    startFragmentedSend(signal, variant, 6, sender);
+
+    debugPrintFragmentCounts();
+
+    Uint32 cbData= (((Uint32) 33) << 16) | variant;
+    Callback cb = { safe_cast(&Cmvmi::testNodeFailureCleanupCallback),
+                    cbData };
+
+    Callback* cbPtr = NULL;
+
+    bool passCallback = variant & 1;
+
+    if (passCallback)
+    {
+      DEBUG("Running simBlock failure code WITH CALLBACK for node " 
+            << sendingNode);
+      cbPtr = &cb;
+    }
+    else
+    {
+      DEBUG("Running simBlock failure code IMMEDIATELY (no callback) for node "
+            << sendingNode);
+      cbPtr = &TheEmptyCallback;
+    }
+
+    Uint32 elementsCleaned = simBlockNodeFailure(signal, sendingNode, *cbPtr);
+    
+    DEBUG("Elements cleaned by call : " << elementsCleaned);
+
+    debugPrintFragmentCounts();
+
+    if (! passCallback)
+    {
+      DEBUG("Variant " << variant << " manually executing callback");
+      /* We call the callback inline here to continue processing */
+      testNodeFailureCleanupCallback(signal, 
+                                     cbData,
+                                     elementsCleaned);
+    }
+
+    return;
+  }
+
+  if (testType == 33)
+  {
+    /* Original side - receive cleanup trigger from 'remote' side
+     * after node failure cleanup performed there.  We may have
+     * fragments it managed to send before the cleanup completed
+     * so we'll get rid of them.
+     * This would not be necessary in reality, as this node would
+     * have failed
+     */
+    Uint32 sendingNode = refToNode(signal->getSendersBlockRef());
+    DEBUG("Running simBlock failure code for node " << sendingNode);
+
+    Uint32 elementsCleaned = simBlockNodeFailure(signal, sendingNode);
+
+    DEBUG("Elements cleaned : " << elementsCleaned);
+
+    /* Should have no fragment resources in use now */
+    ndbrequire(debugPrintFragmentCounts() == 0);
+
+    /* Now use ReceiverGroup to multicast a fragmented signal to
+     * all database nodes
+     */
+    DEBUG("Starting to send fragmented continueB to all nodes inc. self : ");
+    NodeReceiverGroup allNodes(CMVMI, c_dbNodes);
+    
+    unsigned nodeId = 0;
+    while((nodeId = c_dbNodes.find(nodeId+1)) != BitmaskImpl::NotFound)
+    {
+      DEBUG("Node " << nodeId);
+    }
+
+    startFragmentedSend(signal, variant, 8, allNodes);
+
+    debugPrintFragmentCounts();
+
+    Uint32 cbData= (((Uint32) 34) << 16) | variant;
+    Callback cb = { safe_cast(&Cmvmi::testNodeFailureCleanupCallback),
+                    cbData };
+    
+    Callback* cbPtr = NULL;
+    
+    bool passCallback = variant & 2;
+
+    if (passCallback)
+    {
+      DEBUG("Running simBlock failure code for self WITH CALLBACK (" 
+            << getOwnNodeId() << ")");
+      cbPtr= &cb;
+    }
+    else
+    {
+      DEBUG("Running simBlock failure code for self IMMEDIATELY (no callback) ("
+            << getOwnNodeId() << ")");
+      cbPtr= &TheEmptyCallback;
+    }
+    
+
+    /* Fragmented signals being sent will have this node removed
+     * from their receiver group, but will keep sending to the 
+     * other node(s).
+     * Other node(s) should therefore receive the complete signals.
+     * We will then receive only the first fragment of each of 
+     * the signals which must be removed later.
+     */
+    elementsCleaned = simBlockNodeFailure(signal, getOwnNodeId(), *cbPtr);
+
+    DEBUG("Elements cleaned : " << elementsCleaned);
+    
+    debugPrintFragmentCounts();
+
+    /* Callback will send a signal to self to clean up fragments that 
+     * were sent to self before the send was cancelled.  
+     * (Again, unnecessary in a 'real' situation)
+     */
+    if (!passCallback)
+    {
+      DEBUG("Variant " << variant << " manually executing callback");
+
+      testNodeFailureCleanupCallback(signal,
+                                     cbData,
+                                     elementsCleaned);
+    }
+
+    return;
+  }
+  
+  if (testType == 34)
+  {
+    /* Cleanup fragments which were sent before send was cancelled. */
+    Uint32 elementsCleaned = simBlockNodeFailure(signal, getOwnNodeId());
+    
+    DEBUG("Elements cleaned " << elementsCleaned);
+    
+    /* All FragInfo should be clear, may still be sending some
+     * to other node(s)
+     */
+    debugPrintFragmentCounts();
+
+    DEBUG("Variant " << variant << " completed.");
+    
+    if (++variant < NUM_VARIANTS)
+    {
+      DEBUG("Re-executing with variant " << variant);
+      Uint32* sigData = signal->getDataPtrSend();
+      sigData[0] = variant;
+      sigData[1] = 30;
+      sendSignal(reference(), GSN_TESTSIG, signal, 2, JBB);
+    }
+//    else
+//    {
+//      // Infinite loop to test for leaks
+//       DEBUG("Back to zero");
+//       Uint32* sigData = signal->getDataPtrSend();
+//       sigData[0] = 0;
+//       sigData[1] = 30;
+//       sendSignal(reference(), GSN_TESTSIG, signal, 2, JBB);
+//    }
+  }
+}
+
 static Uint32 g_print;
 static LinearSectionPtr g_test[3];
 
@@ -1337,6 +1720,16 @@ Cmvmi::execTESTSIG(Signal* signal){
   
   NodeReceiverGroup rg(CMVMI, c_dbNodes);
 
+  /**
+   * Testing SimulatedBlock fragment assembly cleanup
+   */
+  if ((testType >= 30) &&
+      (testType < 40))
+  {
+    testFragmentedCleanup(signal, testType, ref);
+    return;
+  }
+
   if(signal->getSendersBlockRef() == ref){
     /**
      * Signal from API (not via NodeReceiverGroup)

=== modified file 'storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp'
--- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp	2009-09-03 15:51:43 +0000
@@ -114,6 +114,9 @@ private:
   Cmvmi(const Cmvmi &obj);
   void operator = (const Cmvmi &);
 
+  void startFragmentedSend(Signal* signal, Uint32 variant, Uint32 numSigs, NodeReceiverGroup rg);
+  void testNodeFailureCleanupCallback(Signal* signal, Uint32 variant, Uint32 elementsCleaned);
+  void testFragmentedCleanup(Signal* signal, Uint32 testType, Uint32 variant);
   void sendFragmentedComplete(Signal* signal, Uint32 data, Uint32 returnCode);
 };
 

=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2009-08-21 09:39:39 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp	2009-09-03 15:51:43 +0000
@@ -3893,6 +3893,17 @@ Dbdict::restartDropObj_commit_complete_d
 /* ---------------------------------------------------------------- */
 /* **************************************************************** */
 
+void Dbdict::handleApiFailureCallback(Signal* signal, 
+                                      Uint32 failedNodeId,
+                                      Uint32 ignoredRc)
+{
+  jamEntry();
+  
+  signal->theData[0] = failedNodeId;
+  signal->theData[1] = reference();
+  sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
+}
+
 /* ---------------------------------------------------------------- */
 // We receive a report of an API that failed.
 /* ---------------------------------------------------------------- */
@@ -3910,11 +3921,27 @@ void Dbdict::execAPI_FAILREQ(Signal* sig
   }//if
 #endif
 
-  signal->theData[0] = failedApiNode;
-  signal->theData[1] = reference();
-  sendSignal(retRef, GSN_API_FAILCONF, signal, 2, JBB);
+  ndbrequire(retRef == QMGR_REF); // As callback hard-codes QMGR_REF
+  Callback cb = { safe_cast(&Dbdict::handleApiFailureCallback),
+                  failedApiNode };
+  simBlockNodeFailure(signal, failedApiNode, cb);
 }//execAPI_FAILREQ()
 
+void Dbdict::handleNdbdFailureCallback(Signal* signal, 
+                                       Uint32 failedNodeId, 
+                                       Uint32 ignoredRc)
+{
+  jamEntry();
+
+  /* Node failure handling is complete */
+  NFCompleteRep * const nfCompRep = (NFCompleteRep *)&signal->theData[0];
+  nfCompRep->blockNo      = DBDICT;
+  nfCompRep->nodeId       = getOwnNodeId();
+  nfCompRep->failedNodeId = failedNodeId;
+  sendSignal(DBDIH_REF, GSN_NF_COMPLETEREP, signal, 
+             NFCompleteRep::SignalLength, JBB);
+}
+
 /* ---------------------------------------------------------------- */
 // We receive a report of one or more node failures of kernel nodes.
 /* ---------------------------------------------------------------- */
@@ -3978,14 +4005,12 @@ void Dbdict::execNODE_FAILREP(Signal* si
       c_nodes.getPtr(nodePtr, i);
 
       nodePtr.p->nodeState = NodeRecord::NDB_NODE_DEAD;
-      NFCompleteRep * const nfCompRep = (NFCompleteRep *)&signal->theData[0];
-      nfCompRep->blockNo      = DBDICT;
-      nfCompRep->nodeId       = getOwnNodeId();
-      nfCompRep->failedNodeId = nodePtr.i;
-      sendSignal(DBDIH_REF, GSN_NF_COMPLETEREP, signal, 
-		 NFCompleteRep::SignalLength, JBB);
-      
       c_aliveNodes.clear(i);
+
+      Callback cb = {safe_cast(&Dbdict::handleNdbdFailureCallback),
+                     i};
+
+      simBlockNodeFailure(signal, nodePtr.i, cb);
     }//if
   }//for
 

=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2009-08-21 09:35:23 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp	2009-09-03 15:51:43 +0000
@@ -2153,7 +2153,12 @@ private:
 
   // NF handling
   void removeStaleDictLocks(Signal* signal, const Uint32* theFailedNodes);
-
+  void handleNdbdFailureCallback(Signal* signal, 
+                                 Uint32 failedNodeId,
+                                 Uint32 ignoredRc);
+  void handleApiFailureCallback(Signal* signal,
+                                Uint32 failedNodeId,
+                                Uint32 ignoredRc);
 
   // Statement blocks
 

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2009-09-01 12:27:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp	2009-09-03 15:51:43 +0000
@@ -942,11 +942,12 @@ public:
 
     enum NodeFailBits
     {
-      NF_TAKEOVER          = 0x1,
-      NF_CHECK_SCAN        = 0x2,
-      NF_CHECK_TRANSACTION = 0x4,
-      NF_CHECK_DROP_TAB    = 0x8,
-      NF_NODE_FAIL_BITS    = 0xF // All bits...
+      NF_TAKEOVER          = 0x01,
+      NF_CHECK_SCAN        = 0x02,
+      NF_CHECK_TRANSACTION = 0x04,
+      NF_CHECK_DROP_TAB    = 0x08,
+      NF_BLOCK_HANDLE      = 0x10,
+      NF_NODE_FAIL_BITS    = 0x1F // All bits...
     };
     Uint32 m_nf_bits;
     NdbNodeBitmask m_lqh_trans_conf;
@@ -1634,7 +1635,10 @@ private:
 			 LocalDLList<ScanFragRec>::Head&);
 
   void nodeFailCheckTransactions(Signal*,Uint32 transPtrI,Uint32 failedNodeId);
+  void ndbdFailBlockCleanupCallback(Signal* signal, Uint32 failedNodeId, Uint32 ignoredRc);
   void checkNodeFailComplete(Signal* signal, Uint32 failedNodeId, Uint32 bit);
+
+  void apiFailBlockCleanupCallback(Signal* signal, Uint32 failedNodeId, Uint32 ignoredRc);
   
   // Initialisation
   void initData();

=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2009-09-01 12:27:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp	2009-09-03 15:51:43 +0000
@@ -1157,13 +1157,15 @@ Dbtc::removeMarkerForFailedAPI(Signal* s
       capiConnectClosing[nodeId]--;
       if (capiConnectClosing[nodeId] == 0) {
         jam();
+
         /********************************************************************/
         // No outstanding ABORT or COMMIT's of this failed API node. 
-        // We can respond with API_FAILCONF
+        // Perform SimulatedBlock level cleanup before sending
+        // API_FAILCONF
         /********************************************************************/
-        signal->theData[0] = nodeId;
-        signal->theData[1] = cownref;
-        sendSignal(capiFailRef, GSN_API_FAILCONF, signal, 2, JBB);
+        Callback cb = {safe_cast(&Dbtc::apiFailBlockCleanupCallback),
+                       nodeId};
+        simBlockNodeFailure(signal, nodeId, cb);
       }
       return;
     }
@@ -7378,6 +7380,9 @@ void Dbtc::execNODE_FAILREP(Signal* sign
     checkScanActiveInFailedLqh(signal, 0, hostptr.i);
     checkWaitDropTabFailedLqh(signal, hostptr.i, 0); // nodeid, tableid
     nodeFailCheckTransactions(signal, 0, hostptr.i);
+    Callback cb = {safe_cast(&Dbtc::ndbdFailBlockCleanupCallback), 
+                  hostptr.i};
+    simBlockNodeFailure(signal, hostptr.i, cb);
   }
 }//Dbtc::execNODE_FAILREP()
 
@@ -7517,6 +7522,28 @@ Dbtc::nodeFailCheckTransactions(Signal* 
   }
 }
 
+void
+Dbtc::ndbdFailBlockCleanupCallback(Signal* signal,
+                                   Uint32 failedNodeId,
+                                   Uint32 ignoredRc)
+{
+  jamEntry();
+  
+  checkNodeFailComplete(signal, failedNodeId,
+                        HostRecord::NF_BLOCK_HANDLE);
+}
+
+void
+Dbtc::apiFailBlockCleanupCallback(Signal* signal,
+                                  Uint32 failedNodeId,
+                                  Uint32 ignoredRc)
+{
+  jamEntry();
+  
+  signal->theData[0] = failedNodeId;
+  signal->theData[1] = cownref;
+  sendSignal(capiFailRef, GSN_API_FAILCONF, signal, 2, JBB);
+}
 
 void
 Dbtc::checkScanFragList(Signal* signal,

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp	2009-09-03 15:51:43 +0000
@@ -1714,6 +1714,7 @@ private:
   void execALTER_TAB_REQ(Signal* signal);
   void execTUP_DEALLOCREQ(Signal* signal);
   void execTUP_WRITELOG_REQ(Signal* signal);
+  void execNODE_FAILREP(Signal* signal);
 
   // Ordered index related
   void execBUILDINDXREQ(Signal* signal);

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp	2009-09-03 15:51:43 +0000
@@ -32,6 +32,7 @@
 #include <signaldata/FsRemoveReq.hpp>
 #include <signaldata/TupCommit.hpp>
 #include <signaldata/TupKey.hpp>
+#include <signaldata/NodeFailRep.hpp>
 
 #include <signaldata/DropTab.hpp>
 #include <SLList.hpp>
@@ -65,6 +66,7 @@ Dbtup::Dbtup(Block_context& ctx, Pgman* 
   addRecSignal(GSN_DEBUG_SIG, &Dbtup::execDEBUG_SIG);
   addRecSignal(GSN_CONTINUEB, &Dbtup::execCONTINUEB);
   addRecSignal(GSN_LCP_FRAG_ORD, &Dbtup::execLCP_FRAG_ORD);
+  addRecSignal(GSN_NODE_FAILREP, &Dbtup::execNODE_FAILREP);
 
   addRecSignal(GSN_DUMP_STATE_ORD, &Dbtup::execDUMP_STATE_ORD);
   addRecSignal(GSN_SEND_PACKED, &Dbtup::execSEND_PACKED);
@@ -773,4 +775,22 @@ void Dbtup::releaseFragrec(FragrecordPtr
 }//Dbtup::releaseFragrec()
 
 
+void Dbtup::execNODE_FAILREP(Signal* signal)
+{
+  jamEntry();
+  const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
+  NdbNodeBitmask failed; 
+  failed.assign(NdbNodeBitmask::Size, rep->theNodes);
+
+  /* Block level cleanup */
+  for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+    jam();
+    if(failed.get(i)) {
+      jam();
+      Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
+      ndbassert(elementsCleaned == 0); // No distributed fragmented signals
+      (void) elementsCleaned; // Remove compiler warning
+    }//if
+  }//for
+}
 

=== modified file 'storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp'
--- a/storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp	2009-09-03 15:51:43 +0000
@@ -29,6 +29,7 @@
 #include <signaldata/TcKeyFailConf.hpp>
 #include <signaldata/GetTabInfo.hpp>
 #include <signaldata/DictTabInfo.hpp>
+#include <signaldata/NodeFailRep.hpp>
 
 #include <signaldata/UtilSequence.hpp>
 #include <signaldata/UtilPrepare.hpp>
@@ -66,6 +67,7 @@ DbUtil::DbUtil(Block_context& ctx) :
   addRecSignal(GSN_NDB_STTOR, &DbUtil::execNDB_STTOR);
   addRecSignal(GSN_DUMP_STATE_ORD, &DbUtil::execDUMP_STATE_ORD);
   addRecSignal(GSN_CONTINUEB, &DbUtil::execCONTINUEB);
+  addRecSignal(GSN_NODE_FAILREP, &DbUtil::execNODE_FAILREP);
   
   //addRecSignal(GSN_TCSEIZEREF, &DbUtil::execTCSEIZEREF);
   addRecSignal(GSN_TCSEIZECONF, &DbUtil::execTCSEIZECONF);
@@ -304,6 +306,25 @@ DbUtil::execCONTINUEB(Signal* signal){
 }
 
 void
+DbUtil::execNODE_FAILREP(Signal* signal){
+  jamEntry();
+  const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
+  NdbNodeBitmask failed; 
+  failed.assign(NdbNodeBitmask::Size, rep->theNodes);
+
+  /* Block level cleanup */
+  for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+    jam();
+    if(failed.get(i)) {
+      jam();
+      Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
+      ndbassert(elementsCleaned == 0); // No distributed fragmented signals
+      (void) elementsCleaned; // Remove compiler warning
+    }//if
+  }//for
+}
+
+void
 DbUtil::execDUMP_STATE_ORD(Signal* signal){
   jamEntry();
 

=== modified file 'storage/ndb/src/kernel/blocks/dbutil/DbUtil.hpp'
--- a/storage/ndb/src/kernel/blocks/dbutil/DbUtil.hpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbutil/DbUtil.hpp	2009-09-03 15:51:43 +0000
@@ -76,6 +76,7 @@ protected:
   void execNDB_STTOR(Signal* signal);
   void execDUMP_STATE_ORD(Signal* signal);
   void execCONTINUEB(Signal* signal);
+  void execNODE_FAILREP(Signal* signal);
 
   /**
    * Sequence Service : Public interface

=== modified file 'storage/ndb/src/kernel/blocks/lgman.cpp'
--- a/storage/ndb/src/kernel/blocks/lgman.cpp	2009-08-21 09:26:34 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.cpp	2009-09-03 15:51:43 +0000
@@ -29,6 +29,7 @@
 #include <signaldata/SumaImpl.hpp>
 #include <signaldata/LgmanContinueB.hpp>
 #include <signaldata/GetTabInfo.hpp>
+#include <signaldata/NodeFailRep.hpp>
 #include "ndbfs/Ndbfs.hpp"
 #include "dbtup/Dbtup.hpp"
 
@@ -66,6 +67,7 @@ Lgman::Lgman(Block_context & ctx) :
   addRecSignal(GSN_READ_CONFIG_REQ, &Lgman::execREAD_CONFIG_REQ);
   addRecSignal(GSN_DUMP_STATE_ORD, &Lgman::execDUMP_STATE_ORD);
   addRecSignal(GSN_CONTINUEB, &Lgman::execCONTINUEB);
+  addRecSignal(GSN_NODE_FAILREP, &Lgman::execNODE_FAILREP);
 
   addRecSignal(GSN_CREATE_FILE_REQ, &Lgman::execCREATE_FILE_REQ);
   addRecSignal(GSN_CREATE_FILEGROUP_REQ, &Lgman::execCREATE_FILEGROUP_REQ);
@@ -258,6 +260,26 @@ Lgman::execCONTINUEB(Signal* signal){
 }
 
 void
+Lgman::execNODE_FAILREP(Signal* signal)
+{
+  jamEntry();
+  const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
+  NdbNodeBitmask failed; 
+  failed.assign(NdbNodeBitmask::Size, rep->theNodes);
+
+  /* Block level cleanup */
+  for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+    jam();
+    if(failed.get(i)) {
+      jam();
+      Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
+      ndbassert(elementsCleaned == 0); // No distributed fragmented signals
+      (void) elementsCleaned; // Remove compiler warning
+    }//if
+  }//for
+}
+
+void
 Lgman::execDUMP_STATE_ORD(Signal* signal){
   jamEntry();
   if(signal->theData[0] == 12001)

=== modified file 'storage/ndb/src/kernel/blocks/lgman.hpp'
--- a/storage/ndb/src/kernel/blocks/lgman.hpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.hpp	2009-09-03 15:51:43 +0000
@@ -47,6 +47,7 @@ protected:
   void execREAD_CONFIG_REQ(Signal* signal);
   void execDUMP_STATE_ORD(Signal* signal);
   void execCONTINUEB(Signal* signal);
+  void execNODE_FAILREP(Signal* signal);
   
   void execCREATE_FILE_REQ(Signal* signal);
   void execCREATE_FILEGROUP_REQ(Signal* signal);

=== modified file 'storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp	2009-08-03 11:28:27 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp	2009-09-03 15:51:43 +0000
@@ -1705,6 +1705,18 @@ void Ndbcntr::execNODE_FAILREP(Signal* s
   sendSignal(QMGR_REF, GSN_NODE_FAILREP, signal,
 	     NodeFailRep::SignalLength, JBB);
 
+  sendSignal(DBUTIL_REF, GSN_NODE_FAILREP, signal,
+             NodeFailRep::SignalLength, JBB);
+
+  sendSignal(DBTUP_REF, GSN_NODE_FAILREP, signal,
+             NodeFailRep::SignalLength, JBB);
+
+  sendSignal(TSMAN_REF, GSN_NODE_FAILREP, signal,
+             NodeFailRep::SignalLength, JBB);
+
+  sendSignal(LGMAN_REF, GSN_NODE_FAILREP, signal,
+             NodeFailRep::SignalLength, JBB);
+
   if (c_stopRec.stopReq.senderRef)
   {
     jam();

=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.cpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp	2009-09-03 15:51:43 +0000
@@ -717,20 +717,24 @@ void Suma::execAPI_FAILREQ(Signal* signa
   jamEntry();
   DBUG_ENTER("Suma::execAPI_FAILREQ");
   Uint32 failedApiNode = signal->theData[0];
-  BlockReference retRef = signal->theData[1];
+  ndbrequire(signal->theData[1] == QMGR_REF); // As callback hard-codes QMGR
 
   c_connected_nodes.clear(failedApiNode);
 
   if (c_failedApiNodes.get(failedApiNode))
   {
     jam();
+    /* Being handled already, just conf */
     goto CONF;
   }
 
   if (!c_subscriber_nodes.get(failedApiNode))
   {
     jam();
-    goto CONF;
+    /* No Subscribers on that node, no SUMA 
+     * specific work to do
+     */
+    goto BLOCK_CLEANUP;
   }
 
   c_failedApiNodes.set(failedApiNode);
@@ -744,15 +748,52 @@ void Suma::execAPI_FAILREQ(Signal* signa
   sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
   return;
 
+BLOCK_CLEANUP:
+  jam();
+  api_fail_block_cleanup(signal, failedApiNode);
+  DBUG_VOID_RETURN;
+
 CONF:
+  jam();
   signal->theData[0] = failedApiNode;
   signal->theData[1] = reference();
-  sendSignal(retRef, GSN_API_FAILCONF, signal, 2, JBB);
+  sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
 
   DBUG_VOID_RETURN;
 }//execAPI_FAILREQ()
 
 void
+Suma::api_fail_block_cleanup_callback(Signal* signal,
+                                      Uint32 failedNodeId,
+                                      Uint32 elementsCleaned)
+{
+  jamEntry();
+
+  /* Suma should not have any block level elements
+   * to be cleaned (Fragmented send/receive structures etc.),
+   * as it only uses Fragmented send/receive locally.
+   */
+  ndbassert(elementsCleaned == 0);
+
+  /* Node failure handling is complete */
+  signal->theData[0] = failedNodeId;
+  signal->theData[1] = reference();
+  sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
+  c_failedApiNodes.clear(failedNodeId);
+}
+
+void
+Suma::api_fail_block_cleanup(Signal* signal, Uint32 failedNode)
+{
+  jam();
+
+  Callback cb = {safe_cast(&Suma::api_fail_block_cleanup_callback),
+                 failedNode};
+  
+  simBlockNodeFailure(signal, failedNode, cb);
+}
+
+void
 Suma::api_fail_gci_list(Signal* signal, Uint32 nodeId)
 {
   jam();
@@ -869,10 +910,7 @@ Suma::api_fail_subscriber_list(Signal* s
   if (iter.curr.isNull())
   {
     jam();
-    signal->theData[0] = nodeId;
-    signal->theData[1] = reference();
-    sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
-    c_failedApiNodes.clear(nodeId);
+    api_fail_block_cleanup(signal, nodeId);
     return;
   }
 
@@ -981,10 +1019,9 @@ Suma::api_fail_subscription(Signal* sign
   }
 
   c_subOpPool.release(subOpPtr);
-  signal->theData[0] = nodeId;
-  signal->theData[1] = reference();
-  sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
-  c_failedApiNodes.clear(nodeId);
+
+  /* Now do block level cleanup */
+  api_fail_block_cleanup(signal, nodeId);
 }
 
 void
@@ -1059,6 +1096,17 @@ Suma::execNODE_FAILREP(Signal* signal){
       }
     }
   }
+
+  /* Block level cleanup */
+  for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+    jam();
+    if(failed.get(i)) {
+      jam();
+      Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
+      ndbassert(elementsCleaned == 0); // As Suma has no remote fragmented signals
+      (void) elementsCleaned; // Avoid compiler warning (unused variable)
+    }//if
+  }//for
   
   c_alive_nodes.assign(tmp);
   

=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.hpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.hpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.hpp	2009-09-03 15:51:43 +0000
@@ -434,6 +434,10 @@ public:
   void api_fail_gci_list(Signal*, Uint32 node);
   void api_fail_subscriber_list(Signal*, Uint32 node);
   void api_fail_subscription(Signal*);
+  void api_fail_block_cleanup(Signal* signal, Uint32 failedNode);
+  void api_fail_block_cleanup_callback(Signal* signal,
+                                       Uint32 failedNodeId,
+                                       Uint32 elementsCleaned);
 
   void execSUB_GCP_COMPLETE_ACK(Signal* signal);
 

=== modified file 'storage/ndb/src/kernel/blocks/tsman.cpp'
--- a/storage/ndb/src/kernel/blocks/tsman.cpp	2009-08-21 09:26:34 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.cpp	2009-09-03 15:51:43 +0000
@@ -30,6 +30,7 @@
 #include <signaldata/DumpStateOrd.hpp>
 #include <signaldata/TsmanContinueB.hpp>
 #include <signaldata/GetTabInfo.hpp>
+#include <signaldata/NodeFailRep.hpp>
 #include <dbtup/Dbtup.hpp>
 
 #define JONAS 0
@@ -60,6 +61,7 @@ Tsman::Tsman(Block_context& ctx,
   addRecSignal(GSN_READ_CONFIG_REQ, &Tsman::execREAD_CONFIG_REQ);
   addRecSignal(GSN_DUMP_STATE_ORD, &Tsman::execDUMP_STATE_ORD);
   addRecSignal(GSN_CONTINUEB, &Tsman::execCONTINUEB);
+  addRecSignal(GSN_NODE_FAILREP, &Tsman::execNODE_FAILREP);
 
   addRecSignal(GSN_CREATE_FILE_REQ, &Tsman::execCREATE_FILE_REQ);
   addRecSignal(GSN_CREATE_FILEGROUP_REQ, &Tsman::execCREATE_FILEGROUP_REQ);
@@ -176,6 +178,26 @@ Tsman::execCONTINUEB(Signal* signal){
   ndbrequire(false);
 }
 
+void
+Tsman::execNODE_FAILREP(Signal* signal)
+{
+  jamEntry();
+  const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
+  NdbNodeBitmask failed; 
+  failed.assign(NdbNodeBitmask::Size, rep->theNodes);
+
+  /* Block level cleanup */
+  for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+    jam();
+    if(failed.get(i)) {
+      jam();
+      Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
+      ndbassert(elementsCleaned == 0); // No distributed fragmented signals
+      (void) elementsCleaned; // Remove compiler warning
+    }//if
+  }//for
+}
+
 #ifdef VM_TRACE
 struct TsmanChunk
 { 

=== modified file 'storage/ndb/src/kernel/blocks/tsman.hpp'
--- a/storage/ndb/src/kernel/blocks/tsman.hpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.hpp	2009-09-03 15:51:43 +0000
@@ -43,6 +43,7 @@ protected:
   void execREAD_CONFIG_REQ(Signal* signal);
   void execDUMP_STATE_ORD(Signal* signal);
   void execCONTINUEB(Signal* signal);
+  void execNODE_FAILREP(Signal* signal);
 
   void execCREATE_FILE_REQ(Signal* signal);
   void execCREATE_FILEGROUP_REQ(Signal* signal);

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2009-09-03 15:51:43 +0000
@@ -906,53 +906,87 @@ SimulatedBlock::execSIGNAL_DROPPED_REP(S
 void
 SimulatedBlock::execCONTINUE_FRAGMENTED(Signal * signal){
   ljamEntry();
-  
-  Ptr<FragmentSendInfo> fragPtr;
-  
-  c_segmentedFragmentSendList.first(fragPtr);  
-  for(; !fragPtr.isNull();){
+
+  ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+  ndbrequire(signal->getSendersBlockRef() == reference()); /* Paranoia */
+
+  switch (sig->type)
+  {
+  case ContinueFragmented::CONTINUE_SENDING :
+  {
     ljam();
-    Ptr<FragmentSendInfo> copyPtr = fragPtr;
-    c_segmentedFragmentSendList.next(fragPtr);
+    Ptr<FragmentSendInfo> fragPtr;
     
-    sendNextSegmentedFragment(signal, * copyPtr.p);
-    if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
+    c_segmentedFragmentSendList.first(fragPtr);  
+    for(; !fragPtr.isNull();){
       ljam();
-      if(copyPtr.p->m_callback.m_callbackFunction != 0) {
+      Ptr<FragmentSendInfo> copyPtr = fragPtr;
+      c_segmentedFragmentSendList.next(fragPtr);
+      
+      sendNextSegmentedFragment(signal, * copyPtr.p);
+      if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
         ljam();
-	execute(signal, copyPtr.p->m_callback, 0);
-      }//if
-      c_segmentedFragmentSendList.release(copyPtr);
+        if(copyPtr.p->m_callback.m_callbackFunction != 0) {
+          ljam();
+          execute(signal, copyPtr.p->m_callback, 0);
+        }//if
+        c_segmentedFragmentSendList.release(copyPtr);
+      }
     }
-  }
-  
-  c_linearFragmentSendList.first(fragPtr);  
-  for(; !fragPtr.isNull();){
-    ljam(); 
-    Ptr<FragmentSendInfo> copyPtr = fragPtr;
-    c_linearFragmentSendList.next(fragPtr);
     
-    sendNextLinearFragment(signal, * copyPtr.p);
-    if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
-      ljam();
-      if(copyPtr.p->m_callback.m_callbackFunction != 0) {
+    c_linearFragmentSendList.first(fragPtr);  
+    for(; !fragPtr.isNull();){
+      ljam(); 
+      Ptr<FragmentSendInfo> copyPtr = fragPtr;
+      c_linearFragmentSendList.next(fragPtr);
+      
+      sendNextLinearFragment(signal, * copyPtr.p);
+      if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
         ljam();
-	execute(signal, copyPtr.p->m_callback, 0);
-      }//if
-      c_linearFragmentSendList.release(copyPtr);
+        if(copyPtr.p->m_callback.m_callbackFunction != 0) {
+          ljam();
+          execute(signal, copyPtr.p->m_callback, 0);
+        }//if
+        c_linearFragmentSendList.release(copyPtr);
+      }
     }
+    
+    if(c_segmentedFragmentSendList.isEmpty() && 
+       c_linearFragmentSendList.isEmpty()){
+      ljam();
+      c_fragSenderRunning = false;
+      return;
+    }
+    
+    sig->type = ContinueFragmented::CONTINUE_SENDING;
+    sig->line = __LINE__;
+    sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
+    break;
   }
-  
-  if(c_segmentedFragmentSendList.isEmpty() && 
-     c_linearFragmentSendList.isEmpty()){
+  case ContinueFragmented::CONTINUE_CLEANUP:
+  {
     ljam();
-    c_fragSenderRunning = false;
-    return;
+    
+    const Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
+    /* Check length of signal */
+    ndbassert(signal->getLength() ==
+              ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS + 
+              callbackWords);
+    
+    Callback cb;
+    memcpy(&cb, &sig->callbackStart, callbackWords << 2);
+
+    doNodeFailureCleanup(signal,
+                         sig->failedNodeId,
+                         sig->resource,
+                         sig->cursor,
+                         sig->elementsCleaned,
+                         cb);
+    break;
+  }
+  default:
+    ndbrequire(false);
   }
-  
-  ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
-  sig->line = __LINE__;
-  sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
 }
 
 void
@@ -1117,6 +1151,287 @@ SimulatedBlock::assembleFragments(Signal
   return false;
 }
 
+/**
+ * doCleanupFragInfo
+ * Iterates over the block's fragment assembly hash, looking
+ * for partially assembled fragmented signals from the failed
+ * node, and releases them.
+ * Returns after scanning a single hash bucket, to avoid
+ * consuming too much time in one call.
+ *
+ * Parameters
+ *   failedNodeId    : Node id of failed node
+ *   cursor          : Hash bucket to start iteration from
+ *   rtUnitsUsed     : Running total of rt units used
+ *   elementsCleaned : Running total of elements cleaned
+ *
+ * Updates
+ *   cursor          : Hash bucket to continue iteration from
+ *   rtUnitsUsed     : += units used in this call
+ *   elementsCleaned : += elements cleaned in this call
+ *
+ * Returns
+ *   true  if all FragInfo structs have been cleaned up
+ *   false if there is more to do
+ */
+bool
+SimulatedBlock::doCleanupFragInfo(Uint32 failedNodeId,
+                                  Uint32& cursor,
+                                  Uint32& rtUnitsUsed,
+                                  Uint32& elementsCleaned)
+{
+  ljam();
+  DLHashTable<FragmentInfo>::Iterator iter;
+  
+  c_fragmentInfoHash.next(cursor, iter);
+
+  const Uint32 startBucket = iter.bucket;
+
+  while (!iter.isNull() &&
+         (iter.bucket == startBucket))
+  {
+    ljam();
+
+    Ptr<FragmentInfo> curr = iter.curr;
+    c_fragmentInfoHash.next(iter);
+
+    FragmentInfo* fragInfo = curr.p;
+    
+    if (refToNode(fragInfo->m_senderRef) == failedNodeId)
+    {
+      ljam();
+      /* We were assembling a fragmented signal from the
+       * failed node, discard the partially assembled
+       * sections and free the FragmentInfo hash entry
+       */
+      for(Uint32 s = 0; s<3; s++)
+      {
+        if (fragInfo->m_sectionPtrI[s] != RNIL)
+        {
+          ljam();
+          SegmentedSectionPtr ssptr;
+          getSection(ssptr, fragInfo->m_sectionPtrI[s]);
+          release(ssptr);
+        }
+      }
+      
+      /* Release FragmentInfo hash element */
+      c_fragmentInfoHash.release(curr);
+
+      elementsCleaned++;
+      rtUnitsUsed+=3;
+    }
+      
+    rtUnitsUsed++;
+  } // while
+   
+  cursor = iter.bucket;
+  return iter.isNull();
+}
+
+bool
+SimulatedBlock::doCleanupFragSend(Uint32 failedNodeId,
+                                  Uint32& cursor,
+                                  Uint32& rtUnitsUsed,
+                                  Uint32& elementsCleaned)
+{
+  ljam();
+  
+  Ptr<FragmentSendInfo> fragPtr;
+  const Uint32 NumSendLists = 2;
+  ndbrequire(cursor < NumSendLists);
+
+  DLList<FragmentSendInfo>* fragSendLists[ NumSendLists ] =
+    { &c_segmentedFragmentSendList,
+      &c_linearFragmentSendList };
+  
+  DLList<FragmentSendInfo>* list = fragSendLists[ cursor ];
+  
+  list->first(fragPtr);  
+  for(; !fragPtr.isNull();){
+    ljam();
+    Ptr<FragmentSendInfo> copyPtr = fragPtr;
+    list->next(fragPtr);
+    rtUnitsUsed++;
+
+    NodeReceiverGroup& rg = copyPtr.p->m_nodeReceiverGroup;
+    
+    if (rg.m_nodes.get(failedNodeId))
+    {
+      ljam();
+      /* Fragmented signal is being sent to node */
+      rg.m_nodes.clear(failedNodeId);
+      
+      if (rg.m_nodes.isclear())
+      {
+        ljam();
+        /* No other nodes in receiver group - send
+         * is cancelled.
+         * It will be cleaned up in the usual
+         * CONTINUE_FRAGMENTED handling code.
+         */
+        copyPtr.p->m_status = FragmentSendInfo::SendCancelled;
+      }
+      elementsCleaned++;
+    }
+  }
+
+  /* Next time we'll do the next list */
+  cursor++;
+  
+  return (cursor == NumSendLists);
+}
+
+
+Uint32
+SimulatedBlock::doNodeFailureCleanup(Signal* signal,
+                                     Uint32 failedNodeId,
+                                     Uint32 resource,
+                                     Uint32 cursor,
+                                     Uint32 elementsCleaned,
+                                     Callback& cb)
+{
+  ljam();
+  const bool userCallback = (cb.m_callbackFunction != 0);
+  const Uint32 maxRtUnits = userCallback ?
+#ifdef VM_TRACE
+    2 :
+#else
+    16 :
+#endif 
+    ~0; /* Must complete all processing in this call */
+  
+  Uint32 rtUnitsUsed = 0;
+
+  /* Loop over resources, cleaning them up */
+  do
+  {
+    bool resourceDone= false;
+    switch(resource) {
+    case ContinueFragmented::RES_FRAGSEND:
+    {
+      ljam();
+      resourceDone = doCleanupFragSend(failedNodeId, cursor,
+                                       rtUnitsUsed, elementsCleaned);
+      break;
+    }
+    case ContinueFragmented::RES_FRAGINFO:
+    {
+      ljam();
+      resourceDone = doCleanupFragInfo(failedNodeId, cursor, 
+                                       rtUnitsUsed, elementsCleaned);
+      break;
+    }
+    case ContinueFragmented::RES_LAST:
+    {
+      ljam();
+      /* Node failure processing complete, execute user callback if provided */
+      if (userCallback)
+        execute(signal, cb, elementsCleaned);
+      
+      return elementsCleaned;
+    }
+    default:
+      ndbrequire(false);
+    }
+
+    /* Did we complete cleaning up this resource? */
+    if (resourceDone)
+    {
+      resource++;
+      cursor= 0;
+    }
+
+  } while (rtUnitsUsed <= maxRtUnits);
+  
+  ljam();
+
+  /* Not yet completed failure handling.
+   * Must have exhausted RT units.  
+   * Update cursor and re-invoke
+   */
+  ndbassert(userCallback);
+  
+  /* Send signal to continue processing */
+  
+  ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+  sig->type = ContinueFragmented::CONTINUE_CLEANUP;
+  sig->failedNodeId = failedNodeId;
+  sig->resource = resource;
+  sig->cursor = cursor;
+  sig->elementsCleaned= elementsCleaned;
+  Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
+  Uint32 sigLen = ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS + 
+    callbackWords;
+  ndbassert(sigLen <= 25); // Should be STATIC_ASSERT
+  memcpy(&sig->callbackStart, &cb, callbackWords << 2);
+  
+  sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, sigLen, JBB);
+
+  return elementsCleaned;
+}
+  
+Uint32
+SimulatedBlock::simBlockNodeFailure(Signal* signal,
+                                    Uint32 failedNodeId, 
+                                    Callback& cb)
+{
+  ljam();
+  return doNodeFailureCleanup(signal, failedNodeId, 0, 0, 0, cb);
+}
+
+Uint32
+SimulatedBlock::debugPrintFragmentCounts()
+{
+  const char* blockName = getBlockName(theNumber);
+  DLHashTable<FragmentInfo>::Iterator iter;
+  Uint32 fragmentInfoCount = 0;
+  c_fragmentInfoHash.first(iter);
+  
+  while(!iter.isNull())
+  {
+    fragmentInfoCount++;
+    c_fragmentInfoHash.next(iter);
+  }
+  
+  Ptr<FragmentSendInfo> ptr;
+  Uint32 linSendInfoCount = 0;
+
+  c_linearFragmentSendList.first(ptr);
+  
+  while (!ptr.isNull())
+  {
+    linSendInfoCount++;
+    c_linearFragmentSendList.next(ptr);
+  }
+  
+  Uint32 segSendInfoCount = 0;
+  c_segmentedFragmentSendList.first(ptr);
+  
+  while (!ptr.isNull())
+  {
+    segSendInfoCount++;
+    c_segmentedFragmentSendList.next(ptr);
+  }
+
+  ndbout_c("%s : Fragment assembly hash entry count : %d", 
+           blockName,
+           fragmentInfoCount);
+
+  ndbout_c("%s : Linear fragment send list size : %d", 
+           blockName,
+           linSendInfoCount);
+
+  ndbout_c("%s : Segmented fragment send list size : %d", 
+           blockName,
+           segSendInfoCount);
+
+  return fragmentInfoCount + 
+    linSendInfoCount +
+    segSendInfoCount;
+}
+
+
 bool
 SimulatedBlock::sendFirstFragment(FragmentSendInfo & info,
 				  NodeReceiverGroup rg, 
@@ -1207,6 +1522,37 @@ void
 SimulatedBlock::sendNextSegmentedFragment(Signal* signal,
 					  FragmentSendInfo & info){
   
+  if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
+  {
+    /* Send was cancelled - all dest. nodes have failed
+     * since send was started
+     *
+     * Free any sections still to be sent
+     */
+    Uint32 secCount = 0;
+    SegmentedSectionPtr ssptr[3];
+    for (Uint32 s = 0; s < 3; s++)
+    {
+      Uint32 sectionI = info.m_sectionPtr[s].m_segmented.i;
+      if (sectionI != RNIL)
+      {
+        getSection(ssptr[secCount], sectionI);
+        info.m_sectionPtr[s].m_segmented.i = RNIL;
+        info.m_sectionPtr[s].m_segmented.p = NULL;
+        secCount++;
+      }
+    }
+    
+    ::releaseSections(secCount, ssptr);
+    
+    /* Free inline signal data storage section */
+    Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
+    g_sectionSegmentPool.release(inlineDataI);
+    
+    info.m_status = FragmentSendInfo::SendComplete;
+    return;
+  }
+
   /**
    * Store "theData"
    */
@@ -1446,6 +1792,19 @@ void
 SimulatedBlock::sendNextLinearFragment(Signal* signal,
 				       FragmentSendInfo & info){
   
+  if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
+  {
+    /* Send was cancelled - all dest. nodes have failed
+     * since send was started
+     */
+    /* Free inline signal data storage section */
+    Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
+    g_sectionSegmentPool.release(inlineDataI);
+    
+    info.m_status = FragmentSendInfo::SendComplete;
+    return;
+  }
+
   /**
    * Store "theData"
    */
@@ -1617,8 +1976,9 @@ SimulatedBlock::sendFragmentedSignal(Blo
   if(!c_fragSenderRunning){
     c_fragSenderRunning = true;
     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+    sig->type = ContinueFragmented::CONTINUE_SENDING;
     sig->line = __LINE__;
-    sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
+    sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
   }
 }
 
@@ -1655,8 +2015,9 @@ SimulatedBlock::sendFragmentedSignal(Nod
   if(!c_fragSenderRunning){
     c_fragSenderRunning = true;
     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+    sig->type = ContinueFragmented::CONTINUE_SENDING;
     sig->line = __LINE__;
-    sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
+    sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
   }
 }
 
@@ -1704,8 +2065,9 @@ SimulatedBlock::sendFragmentedSignal(Blo
   if(!c_fragSenderRunning){
     c_fragSenderRunning = true;
     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+    sig->type = ContinueFragmented::CONTINUE_SENDING;
     sig->line = __LINE__;
-    sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
+    sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
   }
 }
 
@@ -1746,8 +2108,9 @@ SimulatedBlock::sendFragmentedSignal(Nod
   if(!c_fragSenderRunning){
     c_fragSenderRunning = true;
     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+    sig->type = ContinueFragmented::CONTINUE_SENDING;
     sig->line = __LINE__;
-    sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
+    sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
   }
 }
 

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp	2009-09-03 15:51:43 +0000
@@ -237,6 +237,37 @@ protected:
 			    Callback &,
 			    Uint32 messageSize = 240);
 
+  /**
+   * simBlockNodeFailure
+   *
+   * Method must be called by blocks that send or receive 
+   * remote Fragmented Signals when they detect a node 
+   * (NDBD or API) failure.
+   * If the block needs to acknowledge or perform further
+   * processing after completing block-level node failure 
+   * handling, it can supply a Callback which will be invoked 
+   * when block-level node failure handling has completed.
+   * Otherwise TheEmptyCallback is used.
+   * If TheEmptyCallback is used, all failure handling is
+   * performed in the current timeslice, to avoid any
+   * races.
+   * 
+   * Parameters
+   *   signal       : Current signal*
+   *   failedNodeId : Node id of failed node
+   *   cb           : Callback to be executed when block-level
+   *                  node failure handling completed.
+   *                  TheEmptyCallback is passed if no further
+   *                  processing is required.
+   * Returns
+   *   Number of elements cleaned up within this call.  If a Callback
+   *   is supplied, its return code is the total number cleaned up.
+   *   
+   */
+  Uint32 simBlockNodeFailure(Signal* signal,
+                             Uint32 failedNodeId,
+                             Callback& cb = TheEmptyCallback);
+
   /**********************************************************
    * Fragmented signals structures
    */
@@ -273,7 +304,8 @@ protected:
     
     enum Status {
       SendNotComplete = 0,
-      SendComplete    = 1
+      SendComplete    = 1,
+      SendCancelled   = 2
     };
     Uint8  m_status;
     Uint8  m_prio;
@@ -355,6 +387,23 @@ private:
   const NodeId         theNodeId;
   const BlockNumber    theNumber;
   const BlockReference theReference;
+
+  Uint32 doNodeFailureCleanup(Signal* signal,
+                              Uint32 failedNodeId,
+                              Uint32 resource,
+                              Uint32 cursor,
+                              Uint32 elementsCleaned,
+                              Callback& cb);
+
+  bool doCleanupFragInfo(Uint32 failedNodeId,
+                         Uint32& cursor,
+                         Uint32& rtUnitsUsed,
+                         Uint32& elementsCleaned);
+
+  bool doCleanupFragSend(Uint32 failedNodeId,
+                         Uint32& cursor,
+                         Uint32& rtUnitsUsed,
+                         Uint32& elementsCleaned);
   
 protected:
   Block_context m_ctx;
@@ -467,6 +516,9 @@ private:
   ArrayPool<FragmentSendInfo> c_fragmentSendPool;
   DLList<FragmentSendInfo> c_linearFragmentSendList;
   DLList<FragmentSendInfo> c_segmentedFragmentSendList;
+
+protected:
+  Uint32 debugPrintFragmentCounts();
   
 public: 
   class MutexManager {

Thread
bzr commit into mysql-5.1-telco-6.2 branch (frazer:2976) Bug#44607 - Frazer Clement - 3 Sep