#At file:///home/frazer/bzr/mysql-5.1-telco-6.2/
2962 Frazer Clement 2009-08-12
Bug#44607 Ndb : Fragmented long signals need node failure handling code
modified:
storage/ndb/include/kernel/signaldata/ContinueFragmented.hpp
storage/ndb/include/kernel/signaldata/NodeFailRep.hpp
storage/ndb/src/kernel/blocks/backup/Backup.cpp
storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp
storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp
storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp
storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp
storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp
storage/ndb/src/kernel/blocks/dbutil/DbUtil.hpp
storage/ndb/src/kernel/blocks/lgman.cpp
storage/ndb/src/kernel/blocks/lgman.hpp
storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp
storage/ndb/src/kernel/blocks/suma/Suma.cpp
storage/ndb/src/kernel/blocks/suma/Suma.hpp
storage/ndb/src/kernel/blocks/tsman.cpp
storage/ndb/src/kernel/blocks/tsman.hpp
storage/ndb/src/kernel/vm/SimulatedBlock.cpp
storage/ndb/src/kernel/vm/SimulatedBlock.hpp
=== modified file 'storage/ndb/include/kernel/signaldata/ContinueFragmented.hpp'
--- a/storage/ndb/include/kernel/signaldata/ContinueFragmented.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/include/kernel/signaldata/ContinueFragmented.hpp 2009-08-12 13:24:40 +0000
@@ -32,7 +32,32 @@ class ContinueFragmented {
public:
private:
- Uint32 line;
+ enum {
+ CONTINUE_SENDING = 0,
+ CONTINUE_CLEANUP = 1
+ };
+
+ STATIC_CONST(CONTINUE_CLEANUP_FIXED_WORDS= 4);
+
+ enum {
+ RES_FRAGINFO = 0, /* Fragmented signal assembly hash */
+ RES_FRAGSEND = 1, /* Fragmented send lists */
+ RES_LAST = 2 /* Must be last */
+ };
+
+ Uint32 type;
+
+ union
+ {
+ Uint32 line; /* For CONTINUE_SENDING */
+ struct /* For CONTINUE_CLEANUP */
+ {
+ Uint32 failedNodeId;
+ Uint32 resource;
+ Uint32 cursor;
+ Uint32 callbackStart; /* Callback structure placed here */
+ };
+ };
};
#endif
=== modified file 'storage/ndb/include/kernel/signaldata/NodeFailRep.hpp'
--- a/storage/ndb/include/kernel/signaldata/NodeFailRep.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/include/kernel/signaldata/NodeFailRep.hpp 2009-08-12 13:24:40 +0000
@@ -24,7 +24,8 @@
/**
* This signals is sent by Qmgr to NdbCntr
- * and then from NdbCntr sent to: dih, dict, lqh, tc & API
+ * and then from NdbCntr sent to: dih, dict, lqh, tc, API
+ * and others
*/
struct NodeFailRep {
STATIC_CONST( SignalLength = 3 + NdbNodeBitmask::Size );
=== modified file 'storage/ndb/src/kernel/blocks/backup/Backup.cpp'
--- a/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/backup/Backup.cpp 2009-08-12 13:24:40 +0000
@@ -850,6 +850,16 @@ Backup::execNODE_FAILREP(Signal* signal)
jam();
checkNodeFail(signal, ptr, newCoordinator, theFailedNodes);
}
+
+ /* Block level cleanup */
+ for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+ jam();
+ if(NdbNodeBitmask::get(theFailedNodes, i))
+ {
+ jam();
+ handleNodeFailure(signal, i); // No callback
+ }//if
+ }//for
}
bool
=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp 2009-08-12 13:24:40 +0000
@@ -3749,6 +3749,18 @@ Dbdict::restartDropObj_commit_complete_d
/* ---------------------------------------------------------------- */
/* **************************************************************** */
+void Dbdict::handleApiFailureCallback(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 ignoredRc)
+{
+ jam();
+ ndbassert(ignoredRc == 0);
+
+ signal->theData[0] = failedNodeId;
+ signal->theData[1] = reference();
+ sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
+}
+
/* ---------------------------------------------------------------- */
// We receive a report of an API that failed.
/* ---------------------------------------------------------------- */
@@ -3766,11 +3778,27 @@ void Dbdict::execAPI_FAILREQ(Signal* sig
}//if
#endif
- signal->theData[0] = failedApiNode;
- signal->theData[1] = reference();
- sendSignal(retRef, GSN_API_FAILCONF, signal, 2, JBB);
+ ndbassert(retRef == QMGR_REF);
+ Callback cb= { safe_cast(&Dbdict::handleApiFailureCallback),
+ failedApiNode };
+ handleNodeFailure(signal, failedApiNode, cb);
}//execAPI_FAILREQ()
+void Dbdict::handleNdbdFailureCallback(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 ignoredRc)
+{
+ jam();
+
+ /* Node failure handling is complete */
+ NFCompleteRep * const nfCompRep = (NFCompleteRep *)&signal->theData[0];
+ nfCompRep->blockNo = DBDICT;
+ nfCompRep->nodeId = getOwnNodeId();
+ nfCompRep->failedNodeId = failedNodeId;
+ sendSignal(DBDIH_REF, GSN_NF_COMPLETEREP, signal,
+ NFCompleteRep::SignalLength, JBB);
+}
+
/* ---------------------------------------------------------------- */
// We receive a report of one or more node failures of kernel nodes.
/* ---------------------------------------------------------------- */
@@ -3834,14 +3862,12 @@ void Dbdict::execNODE_FAILREP(Signal* si
c_nodes.getPtr(nodePtr, i);
nodePtr.p->nodeState = NodeRecord::NDB_NODE_DEAD;
- NFCompleteRep * const nfCompRep = (NFCompleteRep *)&signal->theData[0];
- nfCompRep->blockNo = DBDICT;
- nfCompRep->nodeId = getOwnNodeId();
- nfCompRep->failedNodeId = nodePtr.i;
- sendSignal(DBDIH_REF, GSN_NF_COMPLETEREP, signal,
- NFCompleteRep::SignalLength, JBB);
-
c_aliveNodes.clear(i);
+
+ Callback cb = {safe_cast(&Dbdict::handleNdbdFailureCallback),
+ i};
+
+ handleNodeFailure(signal, nodePtr.i, cb);
}//if
}//for
=== modified file 'storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp'
--- a/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdict/Dbdict.hpp 2009-08-12 13:24:40 +0000
@@ -2151,7 +2151,12 @@ private:
// NF handling
void removeStaleDictLocks(Signal* signal, const Uint32* theFailedNodes);
-
+ void handleNdbdFailureCallback(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 ignoredRc);
+ void handleApiFailureCallback(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 ignoredRc);
// Statement blocks
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/Dbtc.hpp 2009-08-12 13:24:40 +0000
@@ -942,11 +942,12 @@ public:
enum NodeFailBits
{
- NF_TAKEOVER = 0x1,
- NF_CHECK_SCAN = 0x2,
- NF_CHECK_TRANSACTION = 0x4,
- NF_CHECK_DROP_TAB = 0x8,
- NF_NODE_FAIL_BITS = 0xF // All bits...
+ NF_TAKEOVER = 0x01,
+ NF_CHECK_SCAN = 0x02,
+ NF_CHECK_TRANSACTION = 0x04,
+ NF_CHECK_DROP_TAB = 0x08,
+ NF_BLOCK_HANDLE = 0x10,
+ NF_NODE_FAIL_BITS = 0x1F // All bits...
};
Uint32 m_nf_bits;
NdbNodeBitmask m_lqh_trans_conf;
@@ -1634,7 +1635,10 @@ private:
LocalDLList<ScanFragRec>::Head&);
void nodeFailCheckTransactions(Signal*,Uint32 transPtrI,Uint32 failedNodeId);
+ void ndbdFailBlockCleanupCallback(Signal* signal, Uint32 failedNodeId, Uint32 ignoredRc);
void checkNodeFailComplete(Signal* signal, Uint32 failedNodeId, Uint32 bit);
+
+ void apiFailBlockCleanupCallback(Signal* signal, Uint32 failedNodeId, Uint32 ignoredRc);
// Initialisation
void initData();
=== modified file 'storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2009-08-04 10:52:04 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp 2009-08-12 13:24:40 +0000
@@ -1100,9 +1100,11 @@ Dbtc::handleFailedApiNode(Signal* signal
/**
* Finished with scanning connection record
*
- * Now scan markers
+ * Now perform block level node failure cleanup
*/
- removeMarkerForFailedAPI(signal, TapiFailedNode, 0);
+ Callback cb= {safe_cast(&Dbtc::apiFailBlockCleanupCallback),
+ TapiFailedNode};
+ handleNodeFailure(signal, TapiFailedNode, cb);
return;
}//if
} while (TloopCount++ < 256);
@@ -7352,6 +7354,9 @@ void Dbtc::execNODE_FAILREP(Signal* sign
checkScanActiveInFailedLqh(signal, 0, hostptr.i);
checkWaitDropTabFailedLqh(signal, hostptr.i, 0); // nodeid, tableid
nodeFailCheckTransactions(signal, 0, hostptr.i);
+ Callback cb= {safe_cast(&Dbtc::ndbdFailBlockCleanupCallback),
+ hostptr.i};
+ handleNodeFailure(signal, hostptr.i, cb);
}
}//Dbtc::execNODE_FAILREP()
@@ -7491,6 +7496,28 @@ Dbtc::nodeFailCheckTransactions(Signal*
}
}
+void
+Dbtc::ndbdFailBlockCleanupCallback(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 ignoredRc)
+{
+ jamEntry();
+ ndbassert(ignoredRc == 0);
+
+ checkNodeFailComplete(signal, failedNodeId,
+ HostRecord::NF_BLOCK_HANDLE);
+}
+
+void
+Dbtc::apiFailBlockCleanupCallback(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 ignoredRc)
+{
+ jamEntry();
+ ndbassert(ignoredRc == 0);
+
+ removeMarkerForFailedAPI(signal, failedNodeId, 0);
+}
void
Dbtc::checkScanFragList(Signal* signal,
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp 2009-08-12 13:24:40 +0000
@@ -1714,6 +1714,7 @@ private:
void execALTER_TAB_REQ(Signal* signal);
void execTUP_DEALLOCREQ(Signal* signal);
void execTUP_WRITELOG_REQ(Signal* signal);
+ void execNODE_FAILREP(Signal* signal);
// Ordered index related
void execBUILDINDXREQ(Signal* signal);
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp 2009-08-12 13:24:40 +0000
@@ -32,6 +32,7 @@
#include <signaldata/FsRemoveReq.hpp>
#include <signaldata/TupCommit.hpp>
#include <signaldata/TupKey.hpp>
+#include <signaldata/NodeFailRep.hpp>
#include <signaldata/DropTab.hpp>
#include <SLList.hpp>
@@ -65,6 +66,7 @@ Dbtup::Dbtup(Block_context& ctx, Pgman*
addRecSignal(GSN_DEBUG_SIG, &Dbtup::execDEBUG_SIG);
addRecSignal(GSN_CONTINUEB, &Dbtup::execCONTINUEB);
addRecSignal(GSN_LCP_FRAG_ORD, &Dbtup::execLCP_FRAG_ORD);
+ addRecSignal(GSN_NODE_FAILREP, &Dbtup::execNODE_FAILREP);
addRecSignal(GSN_DUMP_STATE_ORD, &Dbtup::execDUMP_STATE_ORD);
addRecSignal(GSN_SEND_PACKED, &Dbtup::execSEND_PACKED);
@@ -773,4 +775,20 @@ void Dbtup::releaseFragrec(FragrecordPtr
}//Dbtup::releaseFragrec()
+void Dbtup::execNODE_FAILREP(Signal* signal)
+{
+ jamEntry();
+ const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
+ NdbNodeBitmask failed;
+ failed.assign(NdbNodeBitmask::Size, rep->theNodes);
+
+ /* Block level cleanup */
+ for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+ jam();
+ if(failed.get(i)) {
+ jam();
+ handleNodeFailure(signal, i); // No callback
+ }//if
+ }//for
+}
=== modified file 'storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp'
--- a/storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbutil/DbUtil.cpp 2009-08-12 13:24:40 +0000
@@ -29,6 +29,7 @@
#include <signaldata/TcKeyFailConf.hpp>
#include <signaldata/GetTabInfo.hpp>
#include <signaldata/DictTabInfo.hpp>
+#include <signaldata/NodeFailRep.hpp>
#include <signaldata/UtilSequence.hpp>
#include <signaldata/UtilPrepare.hpp>
@@ -66,6 +67,7 @@ DbUtil::DbUtil(Block_context& ctx) :
addRecSignal(GSN_NDB_STTOR, &DbUtil::execNDB_STTOR);
addRecSignal(GSN_DUMP_STATE_ORD, &DbUtil::execDUMP_STATE_ORD);
addRecSignal(GSN_CONTINUEB, &DbUtil::execCONTINUEB);
+ addRecSignal(GSN_NODE_FAILREP, &DbUtil::execNODE_FAILREP);
//addRecSignal(GSN_TCSEIZEREF, &DbUtil::execTCSEIZEREF);
addRecSignal(GSN_TCSEIZECONF, &DbUtil::execTCSEIZECONF);
@@ -304,6 +306,23 @@ DbUtil::execCONTINUEB(Signal* signal){
}
void
+DbUtil::execNODE_FAILREP(Signal* signal){
+ jamEntry();
+ const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
+ NdbNodeBitmask failed;
+ failed.assign(NdbNodeBitmask::Size, rep->theNodes);
+
+ /* Block level cleanup */
+ for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+ jam();
+ if(failed.get(i)) {
+ jam();
+ handleNodeFailure(signal, i); // No callback
+ }//if
+ }//for
+}
+
+void
DbUtil::execDUMP_STATE_ORD(Signal* signal){
jamEntry();
=== modified file 'storage/ndb/src/kernel/blocks/dbutil/DbUtil.hpp'
--- a/storage/ndb/src/kernel/blocks/dbutil/DbUtil.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbutil/DbUtil.hpp 2009-08-12 13:24:40 +0000
@@ -76,6 +76,7 @@ protected:
void execNDB_STTOR(Signal* signal);
void execDUMP_STATE_ORD(Signal* signal);
void execCONTINUEB(Signal* signal);
+ void execNODE_FAILREP(Signal* signal);
/**
* Sequence Service : Public interface
=== modified file 'storage/ndb/src/kernel/blocks/lgman.cpp'
--- a/storage/ndb/src/kernel/blocks/lgman.cpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.cpp 2009-08-12 13:24:40 +0000
@@ -29,6 +29,7 @@
#include <signaldata/SumaImpl.hpp>
#include <signaldata/LgmanContinueB.hpp>
#include <signaldata/GetTabInfo.hpp>
+#include <signaldata/NodeFailRep.hpp>
#include "ndbfs/Ndbfs.hpp"
#include "dbtup/Dbtup.hpp"
@@ -66,6 +67,7 @@ Lgman::Lgman(Block_context & ctx) :
addRecSignal(GSN_READ_CONFIG_REQ, &Lgman::execREAD_CONFIG_REQ);
addRecSignal(GSN_DUMP_STATE_ORD, &Lgman::execDUMP_STATE_ORD);
addRecSignal(GSN_CONTINUEB, &Lgman::execCONTINUEB);
+ addRecSignal(GSN_NODE_FAILREP, &Lgman::execNODE_FAILREP);
addRecSignal(GSN_CREATE_FILE_REQ, &Lgman::execCREATE_FILE_REQ);
addRecSignal(GSN_CREATE_FILEGROUP_REQ, &Lgman::execCREATE_FILEGROUP_REQ);
@@ -258,6 +260,24 @@ Lgman::execCONTINUEB(Signal* signal){
}
void
+Lgman::execNODE_FAILREP(Signal* signal)
+{
+ jamEntry();
+ const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
+ NdbNodeBitmask failed;
+ failed.assign(NdbNodeBitmask::Size, rep->theNodes);
+
+ /* Block level cleanup */
+ for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+ jam();
+ if(failed.get(i)) {
+ jam();
+ handleNodeFailure(signal, i); // No callback
+ }//if
+ }//for
+}
+
+void
Lgman::execDUMP_STATE_ORD(Signal* signal){
jamEntry();
if(signal->theData[0] == 12001)
=== modified file 'storage/ndb/src/kernel/blocks/lgman.hpp'
--- a/storage/ndb/src/kernel/blocks/lgman.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/lgman.hpp 2009-08-12 13:24:40 +0000
@@ -47,6 +47,7 @@ protected:
void execREAD_CONFIG_REQ(Signal* signal);
void execDUMP_STATE_ORD(Signal* signal);
void execCONTINUEB(Signal* signal);
+ void execNODE_FAILREP(Signal* signal);
void execCREATE_FILE_REQ(Signal* signal);
void execCREATE_FILEGROUP_REQ(Signal* signal);
=== modified file 'storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp 2009-08-03 11:28:27 +0000
+++ b/storage/ndb/src/kernel/blocks/ndbcntr/NdbcntrMain.cpp 2009-08-12 13:24:40 +0000
@@ -1705,6 +1705,18 @@ void Ndbcntr::execNODE_FAILREP(Signal* s
sendSignal(QMGR_REF, GSN_NODE_FAILREP, signal,
NodeFailRep::SignalLength, JBB);
+ sendSignal(DBUTIL_REF, GSN_NODE_FAILREP, signal,
+ NodeFailRep::SignalLength, JBB);
+
+ sendSignal(DBTUP_REF, GSN_NODE_FAILREP, signal,
+ NodeFailRep::SignalLength, JBB);
+
+ sendSignal(TSMAN_REF, GSN_NODE_FAILREP, signal,
+ NodeFailRep::SignalLength, JBB);
+
+ sendSignal(LGMAN_REF, GSN_NODE_FAILREP, signal,
+ NodeFailRep::SignalLength, JBB);
+
if (c_stopRec.stopReq.senderRef)
{
jam();
=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.cpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.cpp 2009-08-12 13:24:40 +0000
@@ -717,20 +717,24 @@ void Suma::execAPI_FAILREQ(Signal* signa
jamEntry();
DBUG_ENTER("Suma::execAPI_FAILREQ");
Uint32 failedApiNode = signal->theData[0];
- BlockReference retRef = signal->theData[1];
+ ndbassert(signal->theData[1] == QMGR_REF);
c_connected_nodes.clear(failedApiNode);
if (c_failedApiNodes.get(failedApiNode))
{
jam();
+ /* Failure of this node is already being handled, just send API_FAILCONF */
goto CONF;
}
if (!c_subscriber_nodes.get(failedApiNode))
{
jam();
- goto CONF;
+ /* No subscribers on that node, so there is no
+ * SUMA-specific work to do
+ */
+ goto BLOCK_CLEANUP;
}
c_failedApiNodes.set(failedApiNode);
@@ -744,15 +748,48 @@ void Suma::execAPI_FAILREQ(Signal* signa
sendSignal(SUMA_REF, GSN_CONTINUEB, signal, 2, JBB);
return;
+BLOCK_CLEANUP:
+ jam();
+ api_fail_block_cleanup(signal, failedApiNode);
+ DBUG_VOID_RETURN;
+
CONF:
+ jam();
signal->theData[0] = failedApiNode;
signal->theData[1] = reference();
- sendSignal(retRef, GSN_API_FAILCONF, signal, 2, JBB);
+ sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
DBUG_VOID_RETURN;
}//execAPI_FAILREQ()
void
+Suma::api_fail_block_cleanup_callback(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 ignoredRc)
+{
+ jamEntry();
+
+ ndbassert(ignoredRc == 0);
+
+ /* Node failure handling is complete */
+ signal->theData[0] = failedNodeId;
+ signal->theData[1] = reference();
+ sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
+ c_failedApiNodes.clear(failedNodeId);
+}
+
+void
+Suma::api_fail_block_cleanup(Signal* signal, Uint32 failedNode)
+{
+ jam();
+
+ Callback cb= {safe_cast(&Suma::api_fail_block_cleanup_callback),
+ failedNode};
+
+ handleNodeFailure(signal, failedNode, cb);
+}
+
+void
Suma::api_fail_gci_list(Signal* signal, Uint32 nodeId)
{
jam();
@@ -869,10 +906,7 @@ Suma::api_fail_subscriber_list(Signal* s
if (iter.curr.isNull())
{
jam();
- signal->theData[0] = nodeId;
- signal->theData[1] = reference();
- sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
- c_failedApiNodes.clear(nodeId);
+ api_fail_block_cleanup(signal, nodeId);
return;
}
@@ -981,10 +1015,9 @@ Suma::api_fail_subscription(Signal* sign
}
c_subOpPool.release(subOpPtr);
- signal->theData[0] = nodeId;
- signal->theData[1] = reference();
- sendSignal(QMGR_REF, GSN_API_FAILCONF, signal, 2, JBB);
- c_failedApiNodes.clear(nodeId);
+
+ /* Now do block level cleanup */
+ api_fail_block_cleanup(signal, nodeId);
}
void
@@ -1059,6 +1092,15 @@ Suma::execNODE_FAILREP(Signal* signal){
}
}
}
+
+ /* Block level cleanup */
+ for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+ jam();
+ if(failed.get(i)) {
+ jam();
+ handleNodeFailure(signal, i); // No callback
+ }//if
+ }//for
c_alive_nodes.assign(tmp);
=== modified file 'storage/ndb/src/kernel/blocks/suma/Suma.hpp'
--- a/storage/ndb/src/kernel/blocks/suma/Suma.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/suma/Suma.hpp 2009-08-12 13:24:40 +0000
@@ -434,6 +434,10 @@ public:
void api_fail_gci_list(Signal*, Uint32 node);
void api_fail_subscriber_list(Signal*, Uint32 node);
void api_fail_subscription(Signal*);
+ void api_fail_block_cleanup(Signal* signal, Uint32 failedNode);
+ void api_fail_block_cleanup_callback(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 ignoredRc);
void execSUB_GCP_COMPLETE_ACK(Signal* signal);
=== modified file 'storage/ndb/src/kernel/blocks/tsman.cpp'
--- a/storage/ndb/src/kernel/blocks/tsman.cpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.cpp 2009-08-12 13:24:40 +0000
@@ -30,6 +30,7 @@
#include <signaldata/DumpStateOrd.hpp>
#include <signaldata/TsmanContinueB.hpp>
#include <signaldata/GetTabInfo.hpp>
+#include <signaldata/NodeFailRep.hpp>
#include <dbtup/Dbtup.hpp>
#define JONAS 0
@@ -60,6 +61,7 @@ Tsman::Tsman(Block_context& ctx,
addRecSignal(GSN_READ_CONFIG_REQ, &Tsman::execREAD_CONFIG_REQ);
addRecSignal(GSN_DUMP_STATE_ORD, &Tsman::execDUMP_STATE_ORD);
addRecSignal(GSN_CONTINUEB, &Tsman::execCONTINUEB);
+ addRecSignal(GSN_NODE_FAILREP, &Tsman::execNODE_FAILREP);
addRecSignal(GSN_CREATE_FILE_REQ, &Tsman::execCREATE_FILE_REQ);
addRecSignal(GSN_CREATE_FILEGROUP_REQ, &Tsman::execCREATE_FILEGROUP_REQ);
@@ -176,6 +178,24 @@ Tsman::execCONTINUEB(Signal* signal){
ndbrequire(false);
}
+void
+Tsman::execNODE_FAILREP(Signal* signal)
+{
+ jamEntry();
+ const NodeFailRep * rep = (NodeFailRep*)signal->getDataPtr();
+ NdbNodeBitmask failed;
+ failed.assign(NdbNodeBitmask::Size, rep->theNodes);
+
+ /* Block level cleanup */
+ for(unsigned i = 1; i < MAX_NDB_NODES; i++) {
+ jam();
+ if(failed.get(i)) {
+ jam();
+ handleNodeFailure(signal, i); // No callback
+ }//if
+ }//for
+}
+
#ifdef VM_TRACE
struct TsmanChunk
{
=== modified file 'storage/ndb/src/kernel/blocks/tsman.hpp'
--- a/storage/ndb/src/kernel/blocks/tsman.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/blocks/tsman.hpp 2009-08-12 13:24:40 +0000
@@ -43,6 +43,7 @@ protected:
void execREAD_CONFIG_REQ(Signal* signal);
void execDUMP_STATE_ORD(Signal* signal);
void execCONTINUEB(Signal* signal);
+ void execNODE_FAILREP(Signal* signal);
void execCREATE_FILE_REQ(Signal* signal);
void execCREATE_FILEGROUP_REQ(Signal* signal);
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2009-08-12 13:24:40 +0000
@@ -906,53 +906,86 @@ SimulatedBlock::execSIGNAL_DROPPED_REP(S
void
SimulatedBlock::execCONTINUE_FRAGMENTED(Signal * signal){
ljamEntry();
-
- Ptr<FragmentSendInfo> fragPtr;
-
- c_segmentedFragmentSendList.first(fragPtr);
- for(; !fragPtr.isNull();){
+
+ ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+ ndbrequire(signal->getSendersBlockRef() == reference()); /* Paranoia */
+
+ switch (sig->type)
+ {
+ case ContinueFragmented::CONTINUE_SENDING :
+ {
ljam();
- Ptr<FragmentSendInfo> copyPtr = fragPtr;
- c_segmentedFragmentSendList.next(fragPtr);
+ Ptr<FragmentSendInfo> fragPtr;
- sendNextSegmentedFragment(signal, * copyPtr.p);
- if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
+ c_segmentedFragmentSendList.first(fragPtr);
+ for(; !fragPtr.isNull();){
ljam();
- if(copyPtr.p->m_callback.m_callbackFunction != 0) {
+ Ptr<FragmentSendInfo> copyPtr = fragPtr;
+ c_segmentedFragmentSendList.next(fragPtr);
+
+ sendNextSegmentedFragment(signal, * copyPtr.p);
+ if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
ljam();
- execute(signal, copyPtr.p->m_callback, 0);
- }//if
- c_segmentedFragmentSendList.release(copyPtr);
+ if(copyPtr.p->m_callback.m_callbackFunction != 0) {
+ ljam();
+ execute(signal, copyPtr.p->m_callback, 0);
+ }//if
+ c_segmentedFragmentSendList.release(copyPtr);
+ }
}
- }
-
- c_linearFragmentSendList.first(fragPtr);
- for(; !fragPtr.isNull();){
- ljam();
- Ptr<FragmentSendInfo> copyPtr = fragPtr;
- c_linearFragmentSendList.next(fragPtr);
- sendNextLinearFragment(signal, * copyPtr.p);
- if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
- ljam();
- if(copyPtr.p->m_callback.m_callbackFunction != 0) {
+ c_linearFragmentSendList.first(fragPtr);
+ for(; !fragPtr.isNull();){
+ ljam();
+ Ptr<FragmentSendInfo> copyPtr = fragPtr;
+ c_linearFragmentSendList.next(fragPtr);
+
+ sendNextLinearFragment(signal, * copyPtr.p);
+ if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
ljam();
- execute(signal, copyPtr.p->m_callback, 0);
- }//if
- c_linearFragmentSendList.release(copyPtr);
+ if(copyPtr.p->m_callback.m_callbackFunction != 0) {
+ ljam();
+ execute(signal, copyPtr.p->m_callback, 0);
+ }//if
+ c_linearFragmentSendList.release(copyPtr);
+ }
}
+
+ if(c_segmentedFragmentSendList.isEmpty() &&
+ c_linearFragmentSendList.isEmpty()){
+ ljam();
+ c_fragSenderRunning = false;
+ return;
+ }
+
+ sig->type= ContinueFragmented::CONTINUE_SENDING;
+ sig->line = __LINE__;
+ sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
+ break;
}
-
- if(c_segmentedFragmentSendList.isEmpty() &&
- c_linearFragmentSendList.isEmpty()){
+ case ContinueFragmented::CONTINUE_CLEANUP:
+ {
ljam();
- c_fragSenderRunning = false;
- return;
+
+ const Uint32 callbackWords= (sizeof(Callback) + 3) >> 2;
+ /* Check length of signal */
+ ndbassert(signal->getLength() ==
+ ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
+ callbackWords);
+
+ Callback cb;
+ memcpy(&cb, &sig->callbackStart, callbackWords << 2);
+
+ doNodeFailureCleanup(signal,
+ sig->failedNodeId,
+ sig->resource,
+ sig->cursor,
+ cb);
+ break;
+ }
+ default:
+ ndbrequire(false);
}
-
- ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
- sig->line = __LINE__;
- sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
}
void
@@ -1117,6 +1150,243 @@ SimulatedBlock::assembleFragments(Signal
return false;
}
+/**
+ * doCleanupFragInfo
+ * Iterates over the block's fragment assembly hash, looking
+ * for partially assembled fragmented signals from the failed
+ * node, and releases their sections and hash entries.
+ * Returns after each scanned hash bucket.
+ *
+ * Parameters
+ * failedNodeId : Node id of failed node
+ * cursor : Hash bucket to start iteration from (0 = start of hash)
+ * Updates
+ * cursor : Hash bucket to continue iteration from
+ * 0 if iteration complete
+ * Returns
+ * Number of Rt units consumed
+ */
+Uint32
+SimulatedBlock::doCleanupFragInfo(Uint32 failedNodeId,
+ Uint32& cursor)
+{
+ DLHashTable<FragmentInfo>::Iterator iter;
+ bool haveElement= false;
+ Uint32 rtUnitsUsed= 0;
+
+ if (cursor == 0)
+ {
+ /* Start from beginning of fragment info hash */
+ haveElement= c_fragmentInfoHash.first(iter);
+ }
+ else
+ {
+ /* Start from fragment info hash bucket given by cursor */
+ haveElement= c_fragmentInfoHash.next(cursor, iter);
+ }
+
+ if (haveElement)
+ {
+ Uint32 currBucket;
+ /* Have at least one entry to look at now */
+ do
+ {
+ FragmentInfo* fragInfo= iter.curr.p;
+
+ if (refToNode(fragInfo->m_senderRef) == failedNodeId)
+ {
+ /* We were assembling a fragmented signal from the
+ * failed node, discard the partially assembled
+ * sections and free the FragmentInfo hash entry
+ */
+ for(Uint32 s= 0; s<3; s++)
+ {
+ if (fragInfo->m_sectionPtrI[s] != RNIL)
+ {
+ SegmentedSectionPtr ssptr;
+ getSection(ssptr, fragInfo->m_sectionPtrI[s]);
+ release(ssptr);
+ }
+ }
+
+ /* Note that we require next() to work after release() */
+#ifdef VM_TRACE
+ DLHashTable<FragmentInfo>::Iterator test1, test2;
+ test1= test2= iter;
+ c_fragmentInfoHash.next(test1);
+#endif
+
+ c_fragmentInfoHash.release(iter.curr);
+
+#ifdef VM_TRACE
+ c_fragmentInfoHash.next(test2);
+ ndbassert(test1.curr.i == test2.curr.i);
+#endif
+ rtUnitsUsed+=3;
+ }
+
+ rtUnitsUsed++;
+ currBucket= iter.bucket;
+
+ } while (c_fragmentInfoHash.next(iter) && // Have another element to process
+ // and NOT
+ ! (iter.bucket != currBucket)); // On bucket boundary
+
+ if (! iter.isNull())
+ {
+ /* Have another element to process, but we're on a bucket boundary
+ * Return to caller to decide if we've done enough.
+ */
+ ndbrequire(iter.bucket > cursor); // Require progress through buckets
+
+ cursor= iter.bucket;
+ return rtUnitsUsed;
+ }
+ } // if (haveElement)
+
+ cursor= 0;
+ return rtUnitsUsed;
+}
+
+
+Uint32
+SimulatedBlock::doCleanupFragSend(Uint32 failedNodeId,
+ Uint32& cursor)
+{
+ ljam();
+
+ Ptr<FragmentSendInfo> fragPtr;
+ Uint32 sendsFound= 0;
+ ndbrequire(cursor < 2);
+
+ DLList<FragmentSendInfo>* fragSendLists[2]=
+ { &c_segmentedFragmentSendList,
+ &c_linearFragmentSendList };
+
+ DLList<FragmentSendInfo>* list= fragSendLists[ cursor ];
+
+ list->first(fragPtr);
+ for(; !fragPtr.isNull();){
+ ljam();
+ Ptr<FragmentSendInfo> copyPtr = fragPtr;
+ list->next(fragPtr);
+ sendsFound++;
+
+ NodeReceiverGroup& rg= copyPtr.p->m_nodeReceiverGroup;
+
+ if (rg.m_nodes.get(failedNodeId))
+ {
+ ljam();
+ /* Fragmented signal is being sent to node */
+ rg.m_nodes.clear(failedNodeId);
+
+ if (rg.m_nodes.isclear())
+ {
+ ljam();
+ /* No other nodes in receiver group - send
+ * is cancelled
+ * Will be cleaned up in the usual CONTINUE_FRAGMENTED
+ * handling code.
+ */
+ copyPtr.p->m_status= FragmentSendInfo::SendCancelled;
+ }
+ }
+ }
+
+ /* Move from segmented to linear, linear to finished */
+ cursor= (cursor + 1) & 1;
+
+ return sendsFound;
+}
+
+void
+SimulatedBlock::doNodeFailureCleanup(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 resource,
+ Uint32 cursor,
+ Callback& cb)
+{
+ ljam();
+ const bool userCallback= (cb.m_callbackFunction != 0);
+ const Uint32 maxRtUnits= userCallback ?
+ 16 :
+ ~0; /* Must complete all processing in this call */
+
+ Uint32 rtUnitsUsed= 0;
+
+ /* Loop over resources, cleaning them up */
+ while (resource < ContinueFragmented::RES_LAST)
+ {
+ switch(resource) {
+ case ContinueFragmented::RES_FRAGINFO:
+ {
+ ljam();
+ rtUnitsUsed+= doCleanupFragInfo(failedNodeId, cursor);
+ break;
+ }
+ case ContinueFragmented::RES_FRAGSEND:
+ {
+ ljam();
+ rtUnitsUsed+= doCleanupFragSend(failedNodeId, cursor);
+ break;
+ }
+ default:
+ ndbrequire(false);
+ }
+
+ /* Did we complete cleaning up this resource? */
+ if (cursor == 0)
+ resource++;
+
+ /* Did we consume all available time? */
+ if (rtUnitsUsed > maxRtUnits)
+ break;
+ }
+
+ if (resource == ContinueFragmented::RES_LAST)
+ {
+ ljam();
+ /* Node failure processing complete, execute user callback if provided */
+ if (userCallback)
+ execute(signal, cb, 0);
+ }
+ else
+ {
+ ljam();
+ /* Not yet completed failure handling.
+ * Must have exhausted RT units.
+ * Update cursor and re-invoke
+ */
+ ndbrequire(rtUnitsUsed > maxRtUnits);
+ ndbrequire(userCallback);
+
+ /* Send signal to continue processing */
+
+ ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+ sig->type= ContinueFragmented::CONTINUE_CLEANUP;
+ sig->failedNodeId= failedNodeId;
+ sig->resource= resource;
+ sig->cursor= cursor;
+ Uint32 callbackWords= (sizeof(Callback) + 3) >> 2;
+ Uint32 sigLen= ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
+ callbackWords;
+ ndbassert(sigLen <= 25); // Should be STATIC_ASSERT
+ memcpy(&sig->callbackStart, &cb, callbackWords << 2);
+
+ sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, sigLen, JBB);
+ }
+}
+
+void
+SimulatedBlock::handleNodeFailure(Signal* signal,
+ Uint32 failedNodeId,
+ Callback& cb)
+{
+ doNodeFailureCleanup(signal, failedNodeId, 0, 0, cb);
+}
+
+
+
bool
SimulatedBlock::sendFirstFragment(FragmentSendInfo & info,
NodeReceiverGroup rg,
@@ -1207,6 +1477,29 @@ void
SimulatedBlock::sendNextSegmentedFragment(Signal* signal,
FragmentSendInfo & info){
+ if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
+ {
+ /* Send was cancelled - all dest. nodes have failed
+ * since send was started
+ *
+ * Free any sections still to be sent
+ */
+ for (Uint32 s= 0; s < 3; s++)
+ {
+ Uint32& sectionI= info.m_sectionPtr[s].m_segmented.i;
+ if (sectionI != RNIL)
+ g_sectionSegmentPool.release(sectionI);
+ sectionI= RNIL;
+ }
+
+ /* Free inline signal data storage section */
+ Uint32 inlineDataI= info.m_theDataSection.p[info.m_theDataSection.sz];
+ g_sectionSegmentPool.release(inlineDataI);
+
+ info.m_status = FragmentSendInfo::SendComplete;
+ return;
+ }
+
/**
* Store "theData"
*/
@@ -1446,6 +1739,19 @@ void
SimulatedBlock::sendNextLinearFragment(Signal* signal,
FragmentSendInfo & info){
+ if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
+ {
+ /* Send was cancelled - all dest. nodes have failed
+ * since send was started
+ */
+ /* Free inline signal data storage section */
+ Uint32 inlineDataI= info.m_theDataSection.p[info.m_theDataSection.sz];
+ g_sectionSegmentPool.release(inlineDataI);
+
+ info.m_status = FragmentSendInfo::SendComplete;
+ return;
+ }
+
/**
* Store "theData"
*/
@@ -1617,8 +1923,9 @@ SimulatedBlock::sendFragmentedSignal(Blo
if(!c_fragSenderRunning){
c_fragSenderRunning = true;
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+ sig->type= ContinueFragmented::CONTINUE_SENDING;
sig->line = __LINE__;
- sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
+ sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
}
}
@@ -1655,8 +1962,9 @@ SimulatedBlock::sendFragmentedSignal(Nod
if(!c_fragSenderRunning){
c_fragSenderRunning = true;
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+ sig->type= ContinueFragmented::CONTINUE_SENDING;
sig->line = __LINE__;
- sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
+ sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
}
}
@@ -1704,8 +2012,9 @@ SimulatedBlock::sendFragmentedSignal(Blo
if(!c_fragSenderRunning){
c_fragSenderRunning = true;
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+ sig->type= ContinueFragmented::CONTINUE_SENDING;
sig->line = __LINE__;
- sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
+ sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
}
}
@@ -1746,8 +2055,9 @@ SimulatedBlock::sendFragmentedSignal(Nod
if(!c_fragSenderRunning){
c_fragSenderRunning = true;
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
+ sig->type= ContinueFragmented::CONTINUE_SENDING;
sig->line = __LINE__;
- sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB);
+ sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
}
}
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.hpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.hpp 2009-08-12 13:24:40 +0000
@@ -237,6 +237,43 @@ protected:
Callback &,
Uint32 messageSize = 240);
+ /**
+ * handleNodeFailure
+ *
+ * Must be called by blocks that send or receive fragmented
+ * signals when they detect a node (NDBD or API) failure.
+ * If the block needs to acknowledge or perform further
+ * processing after completing block-level node failure
+ * handling, it can supply a Callback which will be invoked
+ * when block-level node failure handling has completed.
+ * Otherwise TheEmptyCallback is used.
+ * If TheEmptyCallback is used, all failure handling is
+ * performed in the current timeslice, to avoid any
+ * races.
+ *
+ * Parameters
+ * signal : Current signal*
+ * failedNodeId : Node id of failed node
+ * cb : Callback to be executed when block-level
+ * node failure handling completed.
+ * TheEmptyCallback is passed if no further
+ * processing is required.
+ */
+ void handleNodeFailure(Signal* signal,
+ Uint32 failedNodeId,
+ Callback& cb = TheEmptyCallback);
+
+ void doNodeFailureCleanup(Signal* signal,
+ Uint32 failedNodeId,
+ Uint32 resource,
+ Uint32 cursor,
+ Callback& cb);
+
+ Uint32 doCleanupFragInfo(Uint32 failedNodeId,
+ Uint32& cursor);
+ Uint32 doCleanupFragSend(Uint32 failedNodeId,
+ Uint32& cursor);
+
/**********************************************************
* Fragmented signals structures
*/
@@ -273,7 +310,8 @@ protected:
enum Status {
SendNotComplete = 0,
- SendComplete = 1
+ SendComplete = 1,
+ SendCancelled = 2
};
Uint8 m_status;
Uint8 m_prio;
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-6.2 branch (frazer:2962) Bug#44607 | Frazer Clement | 12 Aug |