#At file:///home/frazer/bzr/mysql-5.1-telco-6.2/
2993 Frazer Clement 2009-09-10
Bug#44607 : Fragmented signal node failure handling
modified:
storage/ndb/include/kernel/signaldata/ContinueFragmented.hpp
storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp
storage/ndb/include/kernel/signaldata/NodeFailRep.hpp
storage/ndb/src/kernel/blocks/backup/Backup.cpp
storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp
storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp
storage/ndb/src/kernel/blocks/dbutil/DbUtil.hpp
storage/ndb/src/kernel/blocks/lgman.cpp
storage/ndb/src/kernel/blocks/lgman.hpp
storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp
storage/ndb/src/kernel/blocks/suma/Suma.cpp
storage/ndb/src/kernel/blocks/suma/Suma.hpp
storage/ndb/src/kernel/blocks/tsman.cpp
storage/ndb/src/kernel/blocks/tsman.hpp
storage/ndb/src/kernel/vm/SimulatedBlock.cpp
storage/ndb/src/kernel/vm/SimulatedBlock.hpp
=== modified file 'storage/ndb/include/kernel/signaldata/ContinueFragmented.hpp'
--- a/storage/ndb/include/kernel/signaldata/ContinueFragmented.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/include/kernel/signaldata/ContinueFragmented.hpp 2009-09-10 14:49:18 +0000
@@ -32,7 +32,33 @@ class ContinueFragmented {
public:
private:
- Uint32 line;
+ enum {
+ CONTINUE_SENDING = 0,
+ CONTINUE_CLEANUP = 1
+ };
+
+ STATIC_CONST(CONTINUE_CLEANUP_FIXED_WORDS = 5);
+
+ enum {
+ RES_FRAGSEND = 0, /* Fragmented send lists */
+ RES_FRAGINFO = 1, /* Fragmented signal assembly hash */
+ RES_LAST = 2 /* Must be last */
+ };
+
+ Uint32 type;
+
+ union
+ {
+ Uint32 line; /* For CONTINUE_SENDING */
+ struct /* For CONTINUE_CLEANUP */
+ {
+ Uint32 failedNodeId;
+ Uint32 resource;
+ Uint32 cursor;
+ Uint32 elementsCleaned;
+ Uint32 callbackStart; /* Callback structure placed here */
+ };
+ };
};
#endif
=== modified file 'storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp'
--- a/storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/include/kernel/signaldata/DumpStateOrd.hpp 2009-09-10 14:49:18 +0000
@@ -124,6 +124,7 @@ public:
to be able to debug if events
for some reason does not end up
in clusterlog */
+ CmvmiTestLongSig = 2605, /* Long signal testing trigger */
LCPContinue = 5900,
// 7000 DIH
// 7001 DIH
=== modified file 'storage/ndb/include/kernel/signaldata/NodeFailRep.hpp'
--- a/storage/ndb/include/kernel/signaldata/NodeFailRep.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/include/kernel/signaldata/NodeFailRep.hpp 2009-09-10 14:49:18 +0000
@@ -24,7 +24,8 @@
/**
* This signals is sent by Qmgr to NdbCntr
- * and then from NdbCntr sent to: dih, dict, lqh, tc & API
+ * and then from NdbCntr sent to: dih, dict, lqh, tc, API
+ * and others
*/
struct NodeFailRep {
STATIC_CONST( SignalLength = 3 + NdbNodeBitmask::Size );
=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2009-09-10 14:49:18 +0000
@@ -850,6 +850,18 @@ Backup::execNODE_FAILREP(Signal* signal)
jam();
checkNodeFail(signal, ptr, newCoordinator, theFailedNodes);
}
+
+ /* Block level cleanup */
+ for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+ jam();
+ if(NdbNodeBitmask::get(theFailedNodes, i))
+ {
+ jam();
+ Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
+ ndbassert(elementsCleaned == 0); // Backup should have no distributed frag signals
+ (void) elementsCleaned; // Remove compiler warning
+ }//if
+ }//for
}
bool
=== modified file 'storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp'
--- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2009-09-01 10:50:11 +0000
+++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2009-09-10 14:49:18 +0000
@@ -1152,6 +1152,16 @@ Cmvmi::execDUMP_STATE_ORD(Signal* signal
sendSignal(reference(), GSN_TESTSIG, signal, 8, JBB, ptr, 2);
}
+ if (arg == DumpStateOrd::CmvmiTestLongSig)
+ {
+ /* Forward as GSN_TESTSIG to self */
+ Uint32 numArgs= signal->length() - 1;
+ memmove(signal->getDataPtrSend(),
+ signal->getDataPtrSend() + 1,
+ numArgs << 2);
+ sendSignal(reference(), GSN_TESTSIG, signal, numArgs, JBB);
+ }
+
#ifdef ERROR_INSERT
if (arg == 9000 || arg == 9002)
{
@@ -1265,6 +1275,379 @@ Cmvmi::execNODE_START_REP(Signal* signal
BLOCK_FUNCTIONS(Cmvmi)
+void
+Cmvmi::startFragmentedSend(Signal* signal,
+ Uint32 variant,
+ Uint32 numSigs,
+ NodeReceiverGroup rg)
+{
+ Uint32* sigData = signal->getDataPtrSend();
+ const Uint32 sigLength = 6;
+ const Uint32 sectionWords = 240;
+ Uint32 sectionData[ sectionWords ];
+
+ for (Uint32 i = 0; i < sectionWords; i++)
+ sectionData[ i ] = i;
+
+ const Uint32 secCount = 1;
+ LinearSectionPtr ptr[3];
+ ptr[0].sz = sectionWords;
+ ptr[0].p = §ionData[0];
+
+ for (Uint32 i = 0; i < numSigs; i++)
+ {
+ sigData[0] = variant;
+ sigData[1] = 31;
+ sigData[2] = 0;
+ sigData[3] = 1; // print
+ sigData[4] = 0;
+ sigData[5] = sectionWords;
+
+ if ((i & 1) == 0)
+ {
+ DEBUG("Starting linear fragmented send (" << i + 1
+ << "/" << numSigs << ")");
+
+ /* Linear send */
+ /* Todo : Avoid reading from invalid stackptr in CONTINUEB */
+ sendFragmentedSignal(rg,
+ GSN_TESTSIG,
+ signal,
+ sigLength,
+ JBB,
+ ptr,
+ secCount,
+ TheEmptyCallback,
+ 90); // messageSize
+ }
+ else
+ {
+ /* Segmented send */
+ DEBUG("Starting segmented fragmented send (" << i + 1
+ << "/" << numSigs << ")");
+ Ptr<SectionSegment> segPtr;
+ ndbrequire(import(segPtr, sectionData, sectionWords));
+ SegmentedSectionPtr ssPtr;
+ getSection(ssPtr, segPtr.i);
+
+ signal->header.m_noOfSections = 1;
+ signal->m_sectionPtr[0] = ssPtr;
+
+ sendFragmentedSignal(rg,
+ GSN_TESTSIG,
+ signal,
+ sigLength,
+ JBB,
+ TheEmptyCallback,
+ 90); // messageSize
+ }
+ }
+}
+
+void
+Cmvmi::testNodeFailureCleanupCallback(Signal* signal, Uint32 data, Uint32 elementsCleaned)
+{
+ DEBUG("testNodeFailureCleanupCallback");
+ DEBUG("Data : " << data
+ << " elementsCleaned : " << elementsCleaned);
+
+ debugPrintFragmentCounts();
+
+ Uint32 variant = data & 0xffff;
+ Uint32 testType = (data >> 16) & 0xffff;
+
+ DEBUG("Sending trigger(" << testType
+ << ") variant " << variant
+ << " to self to cleanup any fragments that arrived "
+ << "before send was cancelled");
+
+ Uint32* sigData = signal->getDataPtrSend();
+ sigData[0] = variant;
+ sigData[1] = testType;
+ sendSignal(reference(), GSN_TESTSIG, signal, 2, JBB);
+
+ return;
+}
+
+void
+Cmvmi::testFragmentedCleanup(Signal* signal, Uint32 testType, Uint32 variant)
+{
+ DEBUG("TestType " << testType << " variant " << variant);
+ debugPrintFragmentCounts();
+
+ /* Variants :
+ * Local fragmented send Multicast fragmented send
+ * 0 : Immediate cleanup Immediate cleanup
+ * 1 : Continued cleanup Immediate cleanup
+ * 2 : Immediate cleanup Continued cleanup
+ * 3 : Continued cleanup Continued cleanup
+ */
+ const Uint32 NUM_VARIANTS = 4;
+ if (variant >= NUM_VARIANTS)
+ {
+ DEBUG("Unsupported variant");
+ releaseSections(signal);
+ return;
+ }
+
+ /* Test from ndb_mgm with
+ * <node(s)> DUMP 2605 0 30
+ *
+ * Use
+ * <node(s)> DUMP 2605 0 39 to get fragment resource usage counts
+ * Use
+ * <node(s)> DUMP 2601 to get segment usage counts in clusterlog
+ */
+ if (testType == 30)
+ {
+ /* Send the first fragment of a fragmented signal to self
+ * Receiver will allocate assembly hash entries
+ * which must be freed when node failure cleanup
+ * executes later
+ */
+ const Uint32 sectionWords = 240;
+ Uint32 sectionData[ sectionWords ];
+
+ for (Uint32 i = 0; i < sectionWords; i++)
+ sectionData[ i ] = i;
+
+ const Uint32 secCount = 1;
+ LinearSectionPtr ptr[3];
+ ptr[0].sz = sectionWords;
+ ptr[0].p = §ionData[0];
+
+ /* Send signal with testType == 31 */
+ NodeReceiverGroup me(reference());
+ Uint32* sigData = signal->getDataPtrSend();
+ const Uint32 sigLength = 6;
+ const Uint32 numPartialSigs = 4;
+ /* Not too many as CMVMI's fragInfo hash is limited size */
+ // TODO : Consider making it debug-larger to get
+ // more coverage on CONTINUEB path
+
+ for (Uint32 i = 0; i < numPartialSigs; i++)
+ {
+ /* Fill in messy TESTSIG format */
+ sigData[0] = variant;
+ sigData[1] = 31;
+ sigData[2] = 0;
+ sigData[3] = 0; // print
+ sigData[4] = 0;
+ sigData[5] = sectionWords;
+
+ FragmentSendInfo fsi;
+
+ DEBUG("Sending first fragment to self");
+ sendFirstFragment(fsi,
+ me,
+ GSN_TESTSIG,
+ signal,
+ sigLength,
+ JBB,
+ ptr,
+ secCount,
+ 90); // FragmentLength
+
+ DEBUG("Cancelling remainder to free internal section");
+ fsi.m_status = FragmentSendInfo::SendCancelled;
+ sendNextLinearFragment(signal, fsi);
+ };
+
+ /* Ok, now send short signal with testType == 32
+ * to trigger 'remote-side' actions in middle of
+ * multiple fragment assembly
+ */
+ sigData[0] = variant;
+ sigData[1] = 32;
+
+ DEBUG("Sending node fail trigger to self");
+ sendSignal(me, GSN_TESTSIG, signal, 2, JBB);
+ return;
+ }
+
+ if (testType == 31)
+ {
+ /* Just release sections - execTESTSIG() has shown sections received */
+ releaseSections(signal);
+ return;
+ }
+
+ if (testType == 32)
+ {
+ /* 'Remote side' trigger to clean up fragmented signal resources */
+ BlockReference senderRef = signal->getSendersBlockRef();
+ Uint32 sendingNode = refToNode(senderRef);
+
+ /* Start sending some linear and fragmented responses to the
+ * sender, to exercise frag-send cleanup code when we execute
+ * node-failure later
+ */
+ DEBUG("Starting fragmented send using continueB back to self");
+
+ NodeReceiverGroup sender(senderRef);
+ startFragmentedSend(signal, variant, 6, sender);
+
+ debugPrintFragmentCounts();
+
+ Uint32 cbData= (((Uint32) 33) << 16) | variant;
+ Callback cb = { safe_cast(&Cmvmi::testNodeFailureCleanupCallback),
+ cbData };
+
+ Callback* cbPtr = NULL;
+
+ bool passCallback = variant & 1;
+
+ if (passCallback)
+ {
+ DEBUG("Running simBlock failure code WITH CALLBACK for node "
+ << sendingNode);
+ cbPtr = &cb;
+ }
+ else
+ {
+ DEBUG("Running simBlock failure code IMMEDIATELY (no callback) for node "
+ << sendingNode);
+ cbPtr = &TheEmptyCallback;
+ }
+
+ Uint32 elementsCleaned = simBlockNodeFailure(signal, sendingNode, *cbPtr);
+
+ DEBUG("Elements cleaned by call : " << elementsCleaned);
+
+ debugPrintFragmentCounts();
+
+ if (! passCallback)
+ {
+ DEBUG("Variant " << variant << " manually executing callback");
+ /* We call the callback inline here to continue processing */
+ testNodeFailureCleanupCallback(signal,
+ cbData,
+ elementsCleaned);
+ }
+
+ return;
+ }
+
+ if (testType == 33)
+ {
+ /* Original side - receive cleanup trigger from 'remote' side
+ * after node failure cleanup performed there. We may have
+ * fragments it managed to send before the cleanup completed
+ * so we'll get rid of them.
+ * This would not be necessary in reality as this node would
+ * be failed
+ */
+ Uint32 sendingNode = refToNode(signal->getSendersBlockRef());
+ DEBUG("Running simBlock failure code for node " << sendingNode);
+
+ Uint32 elementsCleaned = simBlockNodeFailure(signal, sendingNode);
+
+ DEBUG("Elements cleaned : " << elementsCleaned);
+
+ /* Should have no fragment resources in use now */
+ ndbrequire(debugPrintFragmentCounts() == 0);
+
+ /* Now use ReceiverGroup to multicast a fragmented signal to
+ * all database nodes
+ */
+ DEBUG("Starting to send fragmented continueB to all nodes inc. self : ");
+ NodeReceiverGroup allNodes(CMVMI, c_dbNodes);
+
+ unsigned nodeId = 0;
+ while((nodeId = c_dbNodes.find(nodeId+1)) != BitmaskImpl::NotFound)
+ {
+ DEBUG("Node " << nodeId);
+ }
+
+ startFragmentedSend(signal, variant, 8, allNodes);
+
+ debugPrintFragmentCounts();
+
+ Uint32 cbData= (((Uint32) 34) << 16) | variant;
+ Callback cb = { safe_cast(&Cmvmi::testNodeFailureCleanupCallback),
+ cbData };
+
+ Callback* cbPtr = NULL;
+
+ bool passCallback = variant & 2;
+
+ if (passCallback)
+ {
+ DEBUG("Running simBlock failure code for self WITH CALLBACK ("
+ << getOwnNodeId() << ")");
+ cbPtr= &cb;
+ }
+ else
+ {
+ DEBUG("Running simBlock failure code for self IMMEDIATELY (no callback) ("
+ << getOwnNodeId() << ")");
+ cbPtr= &TheEmptyCallback;
+ }
+
+
+ /* Fragmented signals being sent will have this node removed
+ * from their receiver group, but will keep sending to the
+ * other node(s).
+ * Other node(s) should therefore receive the complete signals.
+ * We will then receive only the first fragment of each of
+ * the signals which must be removed later.
+ */
+ elementsCleaned = simBlockNodeFailure(signal, getOwnNodeId(), *cbPtr);
+
+ DEBUG("Elements cleaned : " << elementsCleaned);
+
+ debugPrintFragmentCounts();
+
+ /* Callback will send a signal to self to clean up fragments that
+ * were sent to self before the send was cancelled.
+ * (Again, unnecessary in a 'real' situation
+ */
+ if (!passCallback)
+ {
+ DEBUG("Variant " << variant << " manually executing callback");
+
+ testNodeFailureCleanupCallback(signal,
+ cbData,
+ elementsCleaned);
+ }
+
+ return;
+ }
+
+ if (testType == 34)
+ {
+ /* Cleanup fragments which were sent before send was cancelled. */
+ Uint32 elementsCleaned = simBlockNodeFailure(signal, getOwnNodeId());
+
+ DEBUG("Elements cleaned " << elementsCleaned);
+
+ /* All FragInfo should be clear, may still be sending some
+ * to other node(s)
+ */
+ debugPrintFragmentCounts();
+
+ DEBUG("Variant " << variant << " completed.");
+
+ if (++variant < NUM_VARIANTS)
+ {
+ DEBUG("Re-executing with variant " << variant);
+ Uint32* sigData = signal->getDataPtrSend();
+ sigData[0] = variant;
+ sigData[1] = 30;
+ sendSignal(reference(), GSN_TESTSIG, signal, 2, JBB);
+ }
+// else
+// {
+// // Infinite loop to test for leaks
+// DEBUG("Back to zero");
+// Uint32* sigData = signal->getDataPtrSend();
+// sigData[0] = 0;
+// sigData[1] = 30;
+// sendSignal(reference(), GSN_TESTSIG, signal, 2, JBB);
+// }
+ }
+}
+
static Uint32 g_print;
static LinearSectionPtr g_test[3];
@@ -1337,6 +1720,16 @@ Cmvmi::execTESTSIG(Signal* signal){
NodeReceiverGroup rg(CMVMI, c_dbNodes);
+ /**
+ * Testing SimulatedBlock fragment assembly cleanup
+ */
+ if ((testType >= 30) &&
+ (testType < 40))
+ {
+ testFragmentedCleanup(signal, testType, ref);
+ return;
+ }
+
if(signal->getSendersBlockRef() == ref){
/**
* Signal from API (not via NodeReceiverGroup)
=== modified file 'storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp'
--- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp 2009-09-10 14:49:18 +0000
@@ -114,6 +114,9 @@ private:
Cmvmi(const Cmvmi &obj);
void operator = (const Cmvmi &);
+ void startFragmentedSend(Signal* signal, Uint32 variant, Uint32 numSigs, NodeReceiverGroup rg);
+ void testNodeFailureCleanupCallback(Signal* signal, Uint32 variant, Uint32 elementsCleaned);
+ void testFragmentedCleanup(Signal* signal, Uint32 testType, Uint32 variant);
void sendFragmentedComplete(Signal* signal, Uint32 data, Uint32 returnCode);
};
=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2009-09-09 08:57:13 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2009-09-10 14:49:18 +0000
@@ -3896,6 +3896,17 @@ Dbdict::restartDropObj_commit_complete_d
/* ---------------------------------------------------------------- */
/* **************************************************************** */
+void Dbdict::handleApiFailureCallback(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 ignoredRc)
+{
+ jamEntry();
+
+ signal->theData[0] = failedNodeId;
+ signal->theData[1] = reference();
+ sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
+}
+
/* ---------------------------------------------------------------- */
// We receive a report of an API that failed.
/* ---------------------------------------------------------------- */
@@ -3913,11 +3924,27 @@ void Dbdict::execAPI_FAILREQ(Signal* sig
}//if
#endif
- signal->theData[0] = failedApiNode;
- signal->theData[1] = reference();
- sendSignal(retRef, GSN_API_FAILCONF, signal, 2, JBB);
+ ndbrequire(retRef == QMGR_REF); // As callback hard-codes QMGR_REF
+ Callback cb = { safe_cast(&Dbdict::handleApiFailureCallback),
+ failedApiNode };
+ simBlockNodeFailure(signal, failedApiNode, cb);
}//execAPI_FAILREQ()
+void Dbdict::handleNdbdFailureCallback(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 ignoredRc)
+{
+ jamEntry();
+
+ /* Node failure handling is complete */
+ NFCompleteRep * const nfCompRep = (NFCompleteRep *)&signal->theData[0];
+ nfCompRep->blockNo = DBDICT;
+ nfCompRep->nodeId = getOwnNodeId();
+ nfCompRep->failedNodeId = failedNodeId;
+ sendSignal(DBDIH_REF, GSN_NF_COMPLETEREP, signal,
+ NFCompleteRep::SignalLength, JBB);
+}
+
/* ---------------------------------------------------------------- */
// We receive a report of one or more node failures of kernel nodes.
/* ---------------------------------------------------------------- */
@@ -3981,14 +4008,12 @@ void Dbdict::execNODE_FAILREP(Signal* si
c_nodes.getPtr(nodePtr, i);
nodePtr.p->nodeState = NodeRecord::NDB_NODE_DEAD;
- NFCompleteRep * const nfCompRep = (NFCompleteRep *)&signal->theData[0];
- nfCompRep->blockNo = DBDICT;
- nfCompRep->nodeId = getOwnNodeId();
- nfCompRep->failedNodeId = nodePtr.i;
- sendSignal(DBDIH_REF, GSN_NF_COMPLETEREP, signal,
- NFCompleteRep::SignalLength, JBB);
-
c_aliveNodes.clear(i);
+
+ Callback cb = {safe_cast(&Dbdict::handleNdbdFailureCallback),
+ i};
+
+ simBlockNodeFailure(signal, nodePtr.i, cb);
}//if
}//for
=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2009-08-21 09:35:23 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2009-09-10 14:49:18 +0000
@@ -2153,7 +2153,12 @@ private:
// NF handling
void removeStaleDictLocks(Signal* signal, const Uint32* theFailedNodes);
-
+ void handleNdbdFailureCallback(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 ignoredRc);
+ void handleApiFailureCallback(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 ignoredRc);
// Statement blocks
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2009-09-01 12:27:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2009-09-10 14:49:18 +0000
@@ -942,11 +942,12 @@ public:
enum NodeFailBits
{
- NF_TAKEOVER = 0x1,
- NF_CHECK_SCAN = 0x2,
- NF_CHECK_TRANSACTION = 0x4,
- NF_CHECK_DROP_TAB = 0x8,
- NF_NODE_FAIL_BITS = 0xF // All bits...
+ NF_TAKEOVER = 0x01,
+ NF_CHECK_SCAN = 0x02,
+ NF_CHECK_TRANSACTION = 0x04,
+ NF_CHECK_DROP_TAB = 0x08,
+ NF_BLOCK_HANDLE = 0x10,
+ NF_NODE_FAIL_BITS = 0x1F // All bits...
};
Uint32 m_nf_bits;
NdbNodeBitmask m_lqh_trans_conf;
@@ -1634,7 +1635,10 @@ private:
LocalDLList<ScanFragRec>::Head&);
void nodeFailCheckTransactions(Signal*,Uint32 transPtrI,Uint32 failedNodeId);
+ void ndbdFailBlockCleanupCallback(Signal* signal, Uint32 failedNodeId, Uint32 ignoredRc);
void checkNodeFailComplete(Signal* signal, Uint32 failedNodeId, Uint32 bit);
+
+ void apiFailBlockCleanupCallback(Signal* signal, Uint32 failedNodeId, Uint32 ignoredRc);
// Initialisation
void initData();
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2009-09-01 12:27:40 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2009-09-10 14:49:18 +0000
@@ -1157,13 +1157,15 @@ Dbtc::removeMarkerForFailedAPI(Signal* s
capiConnectClosing[nodeId]--;
if (capiConnectClosing[nodeId] == 0) {
jam();
+
/********************************************************************/
// No outstanding ABORT or COMMIT's of this failed API node.
- // We can respond with API_FAILCONF
+ // Perform SimulatedBlock level cleanup before sending
+ // API_FAILCONF
/********************************************************************/
- signal->theData[0] = nodeId;
- signal->theData[1] = cownref;
- sendSignal(capiFailRef, GSN_API_FAILCONF, signal, 2, JBB);
+ Callback cb = {safe_cast(&Dbtc::apiFailBlockCleanupCallback),
+ nodeId};
+ simBlockNodeFailure(signal, nodeId, cb);
}
return;
}
@@ -7378,6 +7380,9 @@ void Dbtc::execNODE_FAILREP(Signal* sign
checkScanActiveInFailedLqh(signal, 0, hostptr.i);
checkWaitDropTabFailedLqh(signal, hostptr.i, 0); // nodeid, tableid
nodeFailCheckTransactions(signal, 0, hostptr.i);
+ Callback cb = {safe_cast(&Dbtc::ndbdFailBlockCleanupCallback),
+ hostptr.i};
+ simBlockNodeFailure(signal, hostptr.i, cb);
}
}//Dbtc::execNODE_FAILREP()
@@ -7517,6 +7522,28 @@ Dbtc::nodeFailCheckTransactions(Signal*
}
}
+void
+Dbtc::ndbdFailBlockCleanupCallback(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 ignoredRc)
+{
+ jamEntry();
+
+ checkNodeFailComplete(signal, failedNodeId,
+ HostRecord::NF_BLOCK_HANDLE);
+}
+
+void
+Dbtc::apiFailBlockCleanupCallback(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 ignoredRc)
+{
+ jamEntry();
+
+ signal->theData[0] = failedNodeId;
+ signal->theData[1] = cownref;
+ sendSignal(capiFailRef, GSN_API_FAILCONF, signal, 2, JBB);
+}
void
Dbtc::checkScanFragList(Signal* signal,
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2009-09-04 10:15:28 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2009-09-10 14:49:18 +0000
@@ -1714,6 +1714,7 @@ private:
void execALTER_TAB_REQ(Signal* signal);
void execTUP_DEALLOCREQ(Signal* signal);
void execTUP_WRITELOG_REQ(Signal* signal);
+ void execNODE_FAILREP(Signal* signal);
// Ordered index related
void execBUILDINDXREQ(Signal* signal);
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2009-09-10 14:49:18 +0000
@@ -32,6 +32,7 @@
#include <signaldata/FsRemoveReq.hpp>
#include <signaldata/TupCommit.hpp>
#include <signaldata/TupKey.hpp>
+#include <signaldata/NodeFailRep.hpp>
#include <signaldata/DropTab.hpp>
#include <SLList.hpp>
@@ -65,6 +66,7 @@ Dbtup::Dbtup(Block_context& ctx, Pgman*
addRecSignal(GSN_DEBUG_SIG, &Dbtup::execDEBUG_SIG);
addRecSignal(GSN_CONTINUEB, &Dbtup::execCONTINUEB);
addRecSignal(GSN_LCP_FRAG_ORD, &Dbtup::execLCP_FRAG_ORD);
+ addRecSignal(GSN_NODE_FAILREP, &Dbtup::execNODE_FAILREP);
addRecSignal(GSN_DUMP_STATE_ORD, &Dbtup::execDUMP_STATE_ORD);
addRecSignal(GSN_SEND_PACKED, &Dbtup::execSEND_PACKED);
@@ -773,4 +775,22 @@ void Dbtup::releaseFragrec(FragrecordPtr
}//Dbtup::releaseFragrec()
+void Dbtup::execNODE_FAILREP(Signal* signal)
+{
+ jamEntry();
+ const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
+ NdbNodeBitmask failed;
+ failed.assign(NdbNodeBitmask::Size, rep->theNodes);
+
+ /* Block level cleanup */
+ for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+ jam();
+ if(failed.get(i)) {
+ jam();
+ Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
+ ndbassert(elementsCleaned == 0); // No distributed fragmented signals
+ (void) elementsCleaned; // Remove compiler warning
+ }//if
+ }//for
+}
=== modified file 'storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp'
--- a/storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp 2009-09-10 14:49:18 +0000
@@ -29,6 +29,7 @@
#include <signaldata/TcKeyFailConf.hpp>
#include <signaldata/GetTabInfo.hpp>
#include <signaldata/DictTabInfo.hpp>
+#include <signaldata/NodeFailRep.hpp>
#include <signaldata/UtilSequence.hpp>
#include <signaldata/UtilPrepare.hpp>
@@ -66,6 +67,7 @@ DbUtil::DbUtil(Block_context& ctx) :
addRecSignal(GSN_NDB_STTOR, &DbUtil::execNDB_STTOR);
addRecSignal(GSN_DUMP_STATE_ORD, &DbUtil::execDUMP_STATE_ORD);
addRecSignal(GSN_CONTINUEB, &DbUtil::execCONTINUEB);
+ addRecSignal(GSN_NODE_FAILREP, &DbUtil::execNODE_FAILREP);
//addRecSignal(GSN_TCSEIZEREF, &DbUtil::execTCSEIZEREF);
addRecSignal(GSN_TCSEIZECONF, &DbUtil::execTCSEIZECONF);
@@ -304,6 +306,25 @@ DbUtil::execCONTINUEB(Signal* signal){
}
void
+DbUtil::execNODE_FAILREP(Signal* signal){
+ jamEntry();
+ const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
+ NdbNodeBitmask failed;
+ failed.assign(NdbNodeBitmask::Size, rep->theNodes);
+
+ /* Block level cleanup */
+ for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+ jam();
+ if(failed.get(i)) {
+ jam();
+ Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
+ ndbassert(elementsCleaned == 0); // No distributed fragmented signals
+ (void) elementsCleaned; // Remove compiler warning
+ }//if
+ }//for
+}
+
+void
DbUtil::execDUMP_STATE_ORD(Signal* signal){
jamEntry();
=== modified file 'storage/ndb/src/kernel/blocks/dbutil/DbUtil.hpp'
--- a/storage/ndb/src/kernel/blocks/dbutil/DbUtil.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbutil/DbUtil.hpp 2009-09-10 14:49:18 +0000
@@ -76,6 +76,7 @@ protected:
void execNDB_STTOR(Signal* signal);
void execDUMP_STATE_ORD(Signal* signal);
void execCONTINUEB(Signal* signal);
+ void execNODE_FAILREP(Signal* signal);
/**
* Sequence Service : Public interface
=== modified file 'storage/ndb/src/kernel/blocks/lgman.cpp'
--- a/storage/ndb/src/kernel/blocks/lgman.cpp 2009-08-21 09:26:34 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.cpp 2009-09-10 14:49:18 +0000
@@ -29,6 +29,7 @@
#include <signaldata/SumaImpl.hpp>
#include <signaldata/LgmanContinueB.hpp>
#include <signaldata/GetTabInfo.hpp>
+#include <signaldata/NodeFailRep.hpp>
#include "ndbfs/Ndbfs.hpp"
#include "dbtup/Dbtup.hpp"
@@ -66,6 +67,7 @@ Lgman::Lgman(Block_context & ctx) :
addRecSignal(GSN_READ_CONFIG_REQ, &Lgman::execREAD_CONFIG_REQ);
addRecSignal(GSN_DUMP_STATE_ORD, &Lgman::execDUMP_STATE_ORD);
addRecSignal(GSN_CONTINUEB, &Lgman::execCONTINUEB);
+ addRecSignal(GSN_NODE_FAILREP, &Lgman::execNODE_FAILREP);
addRecSignal(GSN_CREATE_FILE_REQ, &Lgman::execCREATE_FILE_REQ);
addRecSignal(GSN_CREATE_FILEGROUP_REQ, &Lgman::execCREATE_FILEGROUP_REQ);
@@ -258,6 +260,26 @@ Lgman::execCONTINUEB(Signal* signal){
}
void
+Lgman::execNODE_FAILREP(Signal* signal)
+{
+ jamEntry();
+ const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
+ NdbNodeBitmask failed;
+ failed.assign(NdbNodeBitmask::Size, rep->theNodes);
+
+ /* Block level cleanup */
+ for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+ jam();
+ if(failed.get(i)) {
+ jam();
+ Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
+ ndbassert(elementsCleaned == 0); // No distributed fragmented signals
+ (void) elementsCleaned; // Remove compiler warning
+ }//if
+ }//for
+}
+
+void
Lgman::execDUMP_STATE_ORD(Signal* signal){
jamEntry();
if(signal->theData[0] == 12001)
=== modified file 'storage/ndb/src/kernel/blocks/lgman.hpp'
--- a/storage/ndb/src/kernel/blocks/lgman.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.hpp 2009-09-10 14:49:18 +0000
@@ -47,6 +47,7 @@ protected:
void execREAD_CONFIG_REQ(Signal* signal);
void execDUMP_STATE_ORD(Signal* signal);
void execCONTINUEB(Signal* signal);
+ void execNODE_FAILREP(Signal* signal);
void execCREATE_FILE_REQ(Signal* signal);
void execCREATE_FILEGROUP_REQ(Signal* signal);
=== modified file 'storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp 2009-08-03 11:28:27 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp 2009-09-10 14:49:18 +0000
@@ -1705,6 +1705,18 @@ void Ndbcntr::execNODE_FAILREP(Signal* s
sendSignal(QMGR_REF, GSN_NODE_FAILREP, signal,
NodeFailRep::SignalLength, JBB);
+ sendSignal(DBUTIL_REF, GSN_NODE_FAILREP, signal,
+ NodeFailRep::SignalLength, JBB);
+
+ sendSignal(DBTUP_REF, GSN_NODE_FAILREP, signal,
+ NodeFailRep::SignalLength, JBB);
+
+ sendSignal(TSMAN_REF, GSN_NODE_FAILREP, signal,
+ NodeFailRep::SignalLength, JBB);
+
+ sendSignal(LGMAN_REF, GSN_NODE_FAILREP, signal,
+ NodeFailRep::SignalLength, JBB);
+
if (c_stopRec.stopReq.senderRef)
{
jam();
=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.cpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2009-09-10 14:49:18 +0000
@@ -717,20 +717,24 @@ void Suma::execAPI_FAILREQ(Signal* signa
jamEntry();
DBUG_ENTER("Suma::execAPI_FAILREQ");
Uint32 failedApiNode = signal->theData[0];
- BlockReference retRef = signal->theData[1];
+ ndbrequire(signal->theData[1] == QMGR_REF); // As callback hard-codes QMGR
c_connected_nodes.clear(failedApiNode);
if (c_failedApiNodes.get(failedApiNode))
{
jam();
+ /* Being handled already, just conf */
goto CONF;
}
if (!c_subscriber_nodes.get(failedApiNode))
{
jam();
- goto CONF;
+ /* No Subscribers on that node, no SUMA
+ * specific work to do
+ */
+ goto BLOCK_CLEANUP;
}
c_failedApiNodes.set(failedApiNode);
@@ -744,15 +748,52 @@ void Suma::execAPI_FAILREQ(Signal* signa
sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
return;
+BLOCK_CLEANUP:
+ jam();
+ api_fail_block_cleanup(signal, failedApiNode);
+ DBUG_VOID_RETURN;
+
CONF:
+ jam();
signal->theData[0] = failedApiNode;
signal->theData[1] = reference();
- sendSignal(retRef, GSN_API_FAILCONF, signal, 2, JBB);
+ sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
DBUG_VOID_RETURN;
}//execAPI_FAILREQ()
void
+Suma::api_fail_block_cleanup_callback(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 elementsCleaned)
+{
+ jamEntry();
+
+ /* Suma should not have any block level elements
+ * to be cleaned (Fragmented send/receive structures etc.)
+ * As it only uses Fragmented send/receive locally
+ */
+ ndbassert(elementsCleaned == 0);
+
+ /* Node failure handling is complete */
+ signal->theData[0] = failedNodeId;
+ signal->theData[1] = reference();
+ sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
+ c_failedApiNodes.clear(failedNodeId);
+}
+
+void
+Suma::api_fail_block_cleanup(Signal* signal, Uint32 failedNode)
+{
+ jam();
+
+ Callback cb = {safe_cast(&Suma::api_fail_block_cleanup_callback),
+ failedNode};
+
+ simBlockNodeFailure(signal, failedNode, cb);
+}
+
+void
Suma::api_fail_gci_list(Signal* signal, Uint32 nodeId)
{
jam();
@@ -869,10 +910,7 @@ Suma::api_fail_subscriber_list(Signal* s
if (iter.curr.isNull())
{
jam();
- signal->theData[0] = nodeId;
- signal->theData[1] = reference();
- sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
- c_failedApiNodes.clear(nodeId);
+ api_fail_block_cleanup(signal, nodeId);
return;
}
@@ -981,10 +1019,9 @@ Suma::api_fail_subscription(Signal* sign
}
c_subOpPool.release(subOpPtr);
- signal->theData[0] = nodeId;
- signal->theData[1] = reference();
- sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
- c_failedApiNodes.clear(nodeId);
+
+ /* Now do block level cleanup */
+ api_fail_block_cleanup(signal, nodeId);
}
void
@@ -1059,6 +1096,17 @@ Suma::execNODE_FAILREP(Signal* signal){
}
}
}
+
+ /* Block level cleanup */
+ for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+ jam();
+ if(failed.get(i)) {
+ jam();
+ Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
+ ndbassert(elementsCleaned == 0); // As Suma has no remote fragmented signals
+ (void) elementsCleaned; // Avoid compiler error
+ }//if
+ }//for
c_alive_nodes.assign(tmp);
=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.hpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.hpp 2009-09-10 14:49:18 +0000
@@ -434,6 +434,10 @@ public:
void api_fail_gci_list(Signal*, Uint32 node);
void api_fail_subscriber_list(Signal*, Uint32 node);
void api_fail_subscription(Signal*);
+ void api_fail_block_cleanup(Signal* signal, Uint32 failedNode);
+ void api_fail_block_cleanup_callback(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 elementsCleaned);
void execSUB_GCP_COMPLETE_ACK(Signal* signal);
=== modified file 'storage/ndb/src/kernel/blocks/tsman.cpp'
--- a/storage/ndb/src/kernel/blocks/tsman.cpp 2009-08-21 09:26:34 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.cpp 2009-09-10 14:49:18 +0000
@@ -30,6 +30,7 @@
#include <signaldata/DumpStateOrd.hpp>
#include <signaldata/TsmanContinueB.hpp>
#include <signaldata/GetTabInfo.hpp>
+#include <signaldata/NodeFailRep.hpp>
#include <dbtup/Dbtup.hpp>
#define JONAS 0
@@ -60,6 +61,7 @@ Tsman::Tsman(Block_context& ctx,
addRecSignal(GSN_READ_CONFIG_REQ, &Tsman::execREAD_CONFIG_REQ);
addRecSignal(GSN_DUMP_STATE_ORD, &Tsman::execDUMP_STATE_ORD);
addRecSignal(GSN_CONTINUEB, &Tsman::execCONTINUEB);
+ addRecSignal(GSN_NODE_FAILREP, &Tsman::execNODE_FAILREP);
addRecSignal(GSN_CREATE_FILE_REQ, &Tsman::execCREATE_FILE_REQ);
addRecSignal(GSN_CREATE_FILEGROUP_REQ, &Tsman::execCREATE_FILEGROUP_REQ);
@@ -176,6 +178,26 @@ Tsman::execCONTINUEB(Signal* signal){
ndbrequire(false);
}
+void
+Tsman::execNODE_FAILREP(Signal* signal)
+{
+ jamEntry();
+ const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
+ NdbNodeBitmask failed;
+ failed.assign(NdbNodeBitmask::Size, rep->theNodes);
+
+ /* Block level cleanup */
+ for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+ jam();
+ if(failed.get(i)) {
+ jam();
+ Uint32 elementsCleaned = simBlockNodeFailure(signal, i); // No callback
+ ndbassert(elementsCleaned == 0); // No distributed fragmented signals
+ (void) elementsCleaned; // Remove compiler warning
+ }//if
+ }//for
+}
+
#ifdef VM_TRACE
struct TsmanChunk
{
=== modified file 'storage/ndb/src/kernel/blocks/tsman.hpp'
--- a/storage/ndb/src/kernel/blocks/tsman.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.hpp 2009-09-10 14:49:18 +0000
@@ -43,6 +43,7 @@ protected:
void execREAD_CONFIG_REQ(Signal* signal);
void execDUMP_STATE_ORD(Signal* signal);
void execCONTINUEB(Signal* signal);
+ void execNODE_FAILREP(Signal* signal);
void execCREATE_FILE_REQ(Signal* signal);
void execCREATE_FILEGROUP_REQ(Signal* signal);
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2009-09-10 14:49:18 +0000
@@ -906,53 +906,87 @@ SimulatedBlock::execSIGNAL_DROPPED_REP(S
void
SimulatedBlock::execCONTINUE_FRAGMENTED(Signal * signal){
ljamEntry();
-
- Ptr<FragmentSendInfo> fragPtr;
-
- c_segmentedFragmentSendList.first(fragPtr);
- for(; !fragPtr.isNull();){
+
+ ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+ ndbrequire(signal->getSendersBlockRef() == reference()); /* Paranoia */
+
+ switch (sig->type)
+ {
+ case ContinueFragmented::CONTINUE_SENDING :
+ {
ljam();
- Ptr<FragmentSendInfo> copyPtr = fragPtr;
- c_segmentedFragmentSendList.next(fragPtr);
+ Ptr<FragmentSendInfo> fragPtr;
- sendNextSegmentedFragment(signal, * copyPtr.p);
- if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
+ c_segmentedFragmentSendList.first(fragPtr);
+ for(; !fragPtr.isNull();){
ljam();
- if(copyPtr.p->m_callback.m_callbackFunction != 0) {
+ Ptr<FragmentSendInfo> copyPtr = fragPtr;
+ c_segmentedFragmentSendList.next(fragPtr);
+
+ sendNextSegmentedFragment(signal, * copyPtr.p);
+ if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
ljam();
- execute(signal, copyPtr.p->m_callback, 0);
- }//if
- c_segmentedFragmentSendList.release(copyPtr);
+ if(copyPtr.p->m_callback.m_callbackFunction != 0) {
+ ljam();
+ execute(signal, copyPtr.p->m_callback, 0);
+ }//if
+ c_segmentedFragmentSendList.release(copyPtr);
+ }
}
- }
-
- c_linearFragmentSendList.first(fragPtr);
- for(; !fragPtr.isNull();){
- ljam();
- Ptr<FragmentSendInfo> copyPtr = fragPtr;
- c_linearFragmentSendList.next(fragPtr);
- sendNextLinearFragment(signal, * copyPtr.p);
- if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
- ljam();
- if(copyPtr.p->m_callback.m_callbackFunction != 0) {
+ c_linearFragmentSendList.first(fragPtr);
+ for(; !fragPtr.isNull();){
+ ljam();
+ Ptr<FragmentSendInfo> copyPtr = fragPtr;
+ c_linearFragmentSendList.next(fragPtr);
+
+ sendNextLinearFragment(signal, * copyPtr.p);
+ if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
ljam();
- execute(signal, copyPtr.p->m_callback, 0);
- }//if
- c_linearFragmentSendList.release(copyPtr);
+ if(copyPtr.p->m_callback.m_callbackFunction != 0) {
+ ljam();
+ execute(signal, copyPtr.p->m_callback, 0);
+ }//if
+ c_linearFragmentSendList.release(copyPtr);
+ }
}
+
+ if(c_segmentedFragmentSendList.isEmpty() &&
+ c_linearFragmentSendList.isEmpty()){
+ ljam();
+ c_fragSenderRunning = false;
+ return;
+ }
+
+ sig->type = ContinueFragmented::CONTINUE_SENDING;
+ sig->line = __LINE__;
+ sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
+ break;
}
-
- if(c_segmentedFragmentSendList.isEmpty() &&
- c_linearFragmentSendList.isEmpty()){
+ case ContinueFragmented::CONTINUE_CLEANUP:
+ {
ljam();
- c_fragSenderRunning = false;
- return;
+
+ const Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
+ /* Check length of signal */
+ ndbassert(signal->getLength() ==
+ ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
+ callbackWords);
+
+ Callback cb;
+ memcpy(&cb, &sig->callbackStart, callbackWords << 2);
+
+ doNodeFailureCleanup(signal,
+ sig->failedNodeId,
+ sig->resource,
+ sig->cursor,
+ sig->elementsCleaned,
+ cb);
+ break;
+ }
+ default:
+ ndbrequire(false);
}
-
- ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
- sig->line = __LINE__;
- sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
}
void
@@ -1117,6 +1151,287 @@ SimulatedBlock::assembleFragments(Signal
return false;
}
+/**
+ * doCleanupFragInfo
+ * Iterate over block's Fragment assembly hash, looking
+ * for in-assembly fragments from the failed node
+ * Release these
+ * Returns after each scanned bucket to avoid consuming
+ * too much time.
+ *
+ * Parameters
+ * failedNodeId : Node id of failed node
+ * cursor : Hash bucket to start iteration from
+ * rtUnitsUsed : Total rt units used
+ * elementsCleaned : Number of elements cleaned
+ *
+ * Updates
+ * cursor : Hash bucket to continue iteration from
+ * rtUnitsUsed : += units used
+ * elementsCleaned : += elements cleaned
+ *
+ * Returns
+ * true if all FragInfo structs cleaned up
+ * false if more to do
+ */
+bool
+SimulatedBlock::doCleanupFragInfo(Uint32 failedNodeId,
+ Uint32& cursor,
+ Uint32& rtUnitsUsed,
+ Uint32& elementsCleaned)
+{
+ ljam();
+ DLHashTable<FragmentInfo>::Iterator iter;
+
+ c_fragmentInfoHash.next(cursor, iter);
+
+ const Uint32 startBucket = iter.bucket;
+
+ while (!iter.isNull() &&
+ (iter.bucket == startBucket))
+ {
+ ljam();
+
+ Ptr<FragmentInfo> curr = iter.curr;
+ c_fragmentInfoHash.next(iter);
+
+ FragmentInfo* fragInfo = curr.p;
+
+ if (refToNode(fragInfo->m_senderRef) == failedNodeId)
+ {
+ ljam();
+ /* We were assembling a fragmented signal from the
+ * failed node, discard the partially assembled
+ * sections and free the FragmentInfo hash entry
+ */
+ for(Uint32 s = 0; s<3; s++)
+ {
+ if (fragInfo->m_sectionPtrI[s] != RNIL)
+ {
+ ljam();
+ SegmentedSectionPtr ssptr;
+ getSection(ssptr, fragInfo->m_sectionPtrI[s]);
+ release(ssptr);
+ }
+ }
+
+ /* Release FragmentInfo hash element */
+ c_fragmentInfoHash.release(curr);
+
+ elementsCleaned++;
+ rtUnitsUsed+=3;
+ }
+
+ rtUnitsUsed++;
+ } // while
+
+ cursor = iter.bucket;
+ return iter.isNull();
+}
+
+bool
+SimulatedBlock::doCleanupFragSend(Uint32 failedNodeId,
+ Uint32& cursor,
+ Uint32& rtUnitsUsed,
+ Uint32& elementsCleaned)
+{
+ ljam();
+
+ Ptr<FragmentSendInfo> fragPtr;
+ const Uint32 NumSendLists = 2;
+ ndbrequire(cursor < NumSendLists);
+
+ DLList<FragmentSendInfo>* fragSendLists[ NumSendLists ] =
+ { &c_segmentedFragmentSendList,
+ &c_linearFragmentSendList };
+
+ DLList<FragmentSendInfo>* list = fragSendLists[ cursor ];
+
+ list->first(fragPtr);
+ for(; !fragPtr.isNull();){
+ ljam();
+ Ptr<FragmentSendInfo> copyPtr = fragPtr;
+ list->next(fragPtr);
+ rtUnitsUsed++;
+
+ NodeReceiverGroup& rg = copyPtr.p->m_nodeReceiverGroup;
+
+ if (rg.m_nodes.get(failedNodeId))
+ {
+ ljam();
+ /* Fragmented signal is being sent to node */
+ rg.m_nodes.clear(failedNodeId);
+
+ if (rg.m_nodes.isclear())
+ {
+ ljam();
+ /* No other nodes in receiver group - send
+ * is cancelled
+ * Will be cleaned up in the usual CONTINUE_FRAGMENTED
+ * handling code.
+ */
+ copyPtr.p->m_status = FragmentSendInfo::SendCancelled;
+ }
+ elementsCleaned++;
+ }
+ }
+
+ /* Next time we'll do the next list */
+ cursor++;
+
+ return (cursor == NumSendLists);
+}
+
+
+Uint32
+SimulatedBlock::doNodeFailureCleanup(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 resource,
+ Uint32 cursor,
+ Uint32 elementsCleaned,
+ Callback& cb)
+{
+ ljam();
+ const bool userCallback = (cb.m_callbackFunction != 0);
+ const Uint32 maxRtUnits = userCallback ?
+#ifdef VM_TRACE
+ 2 :
+#else
+ 16 :
+#endif
+ ~0; /* Must complete all processing in this call */
+
+ Uint32 rtUnitsUsed = 0;
+
+ /* Loop over resources, cleaning them up */
+ do
+ {
+ bool resourceDone= false;
+ switch(resource) {
+ case ContinueFragmented::RES_FRAGSEND:
+ {
+ ljam();
+ resourceDone = doCleanupFragSend(failedNodeId, cursor,
+ rtUnitsUsed, elementsCleaned);
+ break;
+ }
+ case ContinueFragmented::RES_FRAGINFO:
+ {
+ ljam();
+ resourceDone = doCleanupFragInfo(failedNodeId, cursor,
+ rtUnitsUsed, elementsCleaned);
+ break;
+ }
+ case ContinueFragmented::RES_LAST:
+ {
+ ljam();
+ /* Node failure processing complete, execute user callback if provided */
+ if (userCallback)
+ execute(signal, cb, elementsCleaned);
+
+ return elementsCleaned;
+ }
+ default:
+ ndbrequire(false);
+ }
+
+ /* Did we complete cleaning up this resource? */
+ if (resourceDone)
+ {
+ resource++;
+ cursor= 0;
+ }
+
+ } while (rtUnitsUsed <= maxRtUnits);
+
+ ljam();
+
+ /* Not yet completed failure handling.
+ * Must have exhausted RT units.
+ * Update cursor and re-invoke
+ */
+ ndbassert(userCallback);
+
+ /* Send signal to continue processing */
+
+ ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+ sig->type = ContinueFragmented::CONTINUE_CLEANUP;
+ sig->failedNodeId = failedNodeId;
+ sig->resource = resource;
+ sig->cursor = cursor;
+ sig->elementsCleaned= elementsCleaned;
+ Uint32 callbackWords = (sizeof(Callback) + 3) >> 2;
+ Uint32 sigLen = ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
+ callbackWords;
+ ndbassert(sigLen <= 25); // Should be STATIC_ASSERT
+ memcpy(&sig->callbackStart, &cb, callbackWords << 2);
+
+ sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, sigLen, JBB);
+
+ return elementsCleaned;
+}
+
+Uint32
+SimulatedBlock::simBlockNodeFailure(Signal* signal,
+ Uint32 failedNodeId,
+ Callback& cb)
+{
+ ljam();
+ return doNodeFailureCleanup(signal, failedNodeId, 0, 0, 0, cb);
+}
+
+Uint32
+SimulatedBlock::debugPrintFragmentCounts()
+{
+ const char* blockName = getBlockName(theNumber);
+ DLHashTable<FragmentInfo>::Iterator iter;
+ Uint32 fragmentInfoCount = 0;
+ c_fragmentInfoHash.first(iter);
+
+ while(!iter.isNull())
+ {
+ fragmentInfoCount++;
+ c_fragmentInfoHash.next(iter);
+ }
+
+ Ptr<FragmentSendInfo> ptr;
+ Uint32 linSendInfoCount = 0;
+
+ c_linearFragmentSendList.first(ptr);
+
+ while (!ptr.isNull())
+ {
+ linSendInfoCount++;
+ c_linearFragmentSendList.next(ptr);
+ }
+
+ Uint32 segSendInfoCount = 0;
+ c_segmentedFragmentSendList.first(ptr);
+
+ while (!ptr.isNull())
+ {
+ segSendInfoCount++;
+ c_segmentedFragmentSendList.next(ptr);
+ }
+
+ ndbout_c("%s : Fragment assembly hash entry count : %d",
+ blockName,
+ fragmentInfoCount);
+
+ ndbout_c("%s : Linear fragment send list size : %d",
+ blockName,
+ linSendInfoCount);
+
+ ndbout_c("%s : Segmented fragment send list size : %d",
+ blockName,
+ segSendInfoCount);
+
+ return fragmentInfoCount +
+ linSendInfoCount +
+ segSendInfoCount;
+}
+
+
bool
SimulatedBlock::sendFirstFragment(FragmentSendInfo & info,
NodeReceiverGroup rg,
@@ -1207,6 +1522,37 @@ void
SimulatedBlock::sendNextSegmentedFragment(Signal* signal,
FragmentSendInfo & info){
+ if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
+ {
+ /* Send was cancelled - all dest. nodes have failed
+ * since send was started
+ *
+ * Free any sections still to be sent
+ */
+ Uint32 secCount = 0;
+ SegmentedSectionPtr ssptr[3];
+ for (Uint32 s = 0; s < 3; s++)
+ {
+ Uint32 sectionI = info.m_sectionPtr[s].m_segmented.i;
+ if (sectionI != RNIL)
+ {
+ getSection(ssptr[secCount], sectionI);
+ info.m_sectionPtr[s].m_segmented.i = RNIL;
+ info.m_sectionPtr[s].m_segmented.p = NULL;
+ secCount++;
+ }
+ }
+
+ ::releaseSections(secCount, ssptr);
+
+ /* Free inline signal data storage section */
+ Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
+ g_sectionSegmentPool.release(inlineDataI);
+
+ info.m_status = FragmentSendInfo::SendComplete;
+ return;
+ }
+
/**
* Store "theData"
*/
@@ -1446,6 +1792,19 @@ void
SimulatedBlock::sendNextLinearFragment(Signal* signal,
FragmentSendInfo & info){
+ if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
+ {
+ /* Send was cancelled - all dest. nodes have failed
+ * since send was started
+ */
+ /* Free inline signal data storage section */
+ Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
+ g_sectionSegmentPool.release(inlineDataI);
+
+ info.m_status = FragmentSendInfo::SendComplete;
+ return;
+ }
+
/**
* Store "theData"
*/
@@ -1617,8 +1976,9 @@ SimulatedBlock::sendFragmentedSignal(Blo
if(!c_fragSenderRunning){
c_fragSenderRunning = true;
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+ sig->type = ContinueFragmented::CONTINUE_SENDING;
sig->line = __LINE__;
- sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
+ sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
}
}
@@ -1655,8 +2015,9 @@ SimulatedBlock::sendFragmentedSignal(Nod
if(!c_fragSenderRunning){
c_fragSenderRunning = true;
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+ sig->type = ContinueFragmented::CONTINUE_SENDING;
sig->line = __LINE__;
- sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
+ sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
}
}
@@ -1704,8 +2065,9 @@ SimulatedBlock::sendFragmentedSignal(Blo
if(!c_fragSenderRunning){
c_fragSenderRunning = true;
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+ sig->type = ContinueFragmented::CONTINUE_SENDING;
sig->line = __LINE__;
- sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
+ sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
}
}
@@ -1746,8 +2108,9 @@ SimulatedBlock::sendFragmentedSignal(Nod
if(!c_fragSenderRunning){
c_fragSenderRunning = true;
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+ sig->type = ContinueFragmented::CONTINUE_SENDING;
sig->line = __LINE__;
- sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
+ sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
}
}
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2009-09-10 14:49:18 +0000
@@ -237,6 +237,37 @@ protected:
Callback &,
Uint32 messageSize = 240);
+ /**
+ * simBlockNodeFailure
+ *
+ * Method must be called by blocks that send or receive
+ * remote Fragmented Signals when they detect a node
+ * (NDBD or API) failure.
+ * If the block needs to acknowledge or perform further
+ * processing after completing block-level node failure
+ * handling, it can supply a Callback which will be invoked
+ * when block-level node failure handling has completed.
+ * Otherwise TheEmptyCallback is used.
+ * If TheEmptyCallback is used, all failure handling is
+ * performed in the current timeslice, to avoid any
+ * races.
+ *
+ * Parameters
+ * signal : Current signal*
+ * failedNodeId : Node id of failed node
+ * cb : Callback to be executed when block-level
+ * node failure handling completed.
+ * TheEmptyCallback is passed if no further
+ * processing is required.
+ * Returns
+ * Number of 'resources' cleaned up in call.
+ * Callback return code is total resources cleaned up.
+ *
+ */
+ Uint32 simBlockNodeFailure(Signal* signal,
+ Uint32 failedNodeId,
+ Callback& cb = TheEmptyCallback);
+
/**********************************************************
* Fragmented signals structures
*/
@@ -273,7 +304,8 @@ protected:
enum Status {
SendNotComplete = 0,
- SendComplete = 1
+ SendComplete = 1,
+ SendCancelled = 2
};
Uint8 m_status;
Uint8 m_prio;
@@ -355,6 +387,23 @@ private:
const NodeId theNodeId;
const BlockNumber theNumber;
const BlockReference theReference;
+
+ Uint32 doNodeFailureCleanup(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 resource,
+ Uint32 cursor,
+ Uint32 elementsCleaned,
+ Callback& cb);
+
+ bool doCleanupFragInfo(Uint32 failedNodeId,
+ Uint32& cursor,
+ Uint32& rtUnitsUsed,
+ Uint32& elementsCleaned);
+
+ bool doCleanupFragSend(Uint32 failedNodeId,
+ Uint32& cursor,
+ Uint32& rtUnitsUsed,
+ Uint32& elementsCleaned);
protected:
Block_context m_ctx;
@@ -467,6 +516,9 @@ private:
ArrayPool<FragmentSendInfo> c_fragmentSendPool;
DLList<FragmentSendInfo> c_linearFragmentSendList;
DLList<FragmentSendInfo> c_segmentedFragmentSendList;
+
+protected:
+ Uint32 debugPrintFragmentCounts();
public:
class MutexManager {
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-6.2 branch (frazer:2993) Bug#44607 | Frazer Clement | 10 Sep |