4130 jonas oreland 2011-01-18
ndb - change cluster-mgr to handle transporter-callback by sending signals to self, meaning that all api nodes (previously only ndb_mgmd) have a loopback transporter to self
modified:
storage/ndb/include/kernel/signaldata/DisconnectRep.hpp
storage/ndb/src/mgmsrv/MgmtSrvr.cpp
storage/ndb/src/ndbapi/ClusterMgr.cpp
storage/ndb/src/ndbapi/ClusterMgr.hpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/trp_node.cpp
storage/ndb/src/ndbapi/trp_node.hpp
4129 jonas oreland 2011-01-18
ndb - cleanup ext_update_connections, and add check that node is not already connected in try_alloc
modified:
storage/ndb/src/mgmsrv/MgmtSrvr.cpp
storage/ndb/src/ndbapi/ClusterMgr.cpp
storage/ndb/src/ndbapi/ClusterMgr.hpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
4128 jonas oreland 2011-01-18
ndb - remove special handling of self (own-node-id), use same code as for others (using loopback transporter)
modified:
storage/ndb/src/mgmsrv/ConfigManager.cpp
storage/ndb/src/ndbapi/ClusterMgr.cpp
4127 jonas oreland 2011-01-18
ndb - put most offending program (currently) last
modified:
storage/ndb/test/ndbapi/testMgmd.cpp
=== modified file 'storage/ndb/include/kernel/signaldata/DisconnectRep.hpp'
--- a/storage/ndb/include/kernel/signaldata/DisconnectRep.hpp 2009-05-27 15:21:45 +0000
+++ b/storage/ndb/include/kernel/signaldata/DisconnectRep.hpp 2011-01-18 07:39:47 +0000
@@ -24,12 +24,14 @@
/**
*
*/
-class DisconnectRep {
+struct DisconnectRep
+{
/**
* Receiver(s)
*/
friend class Qmgr;
friend class Cmvmi; // Cmvmi
+ friend class ClusterMgr;
/**
* Senders
@@ -43,7 +45,6 @@ class DisconnectRep {
*/
friend bool printDISCONNECT_REP(FILE *, const Uint32 *, Uint32, Uint16);
-public:
STATIC_CONST( SignalLength = 2 );
enum ErrCode {
@@ -54,8 +55,6 @@ public:
TcReportNodeFailed = 0xFF000001
};
-private:
-
Uint32 nodeId;
Uint32 err;
};
=== modified file 'storage/ndb/src/mgmsrv/ConfigManager.cpp'
--- a/storage/ndb/src/mgmsrv/ConfigManager.cpp 2011-01-11 19:13:37 +0000
+++ b/storage/ndb/src/mgmsrv/ConfigManager.cpp 2011-01-18 07:35:35 +0000
@@ -1752,8 +1752,6 @@ ConfigManager::run()
// Build bitmaks of all mgm nodes in config
m_config->get_nodemask(m_all_mgm, NDB_MGM_NODE_TYPE_MGM);
- m_started.set(m_facade->ownId());
-
// exclude nowait-nodes from config change protcol
m_all_mgm.bitANDC(m_opts.nowait_nodes);
m_all_mgm.set(m_facade->ownId()); // Never exclude own node
=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2011-01-10 13:30:14 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2011-01-18 07:39:47 +0000
@@ -434,14 +434,6 @@ MgmtSrvr::start_transporter(const Config
DBUG_RETURN(false);
}
- /**
- * Wait for loopback interface to be enabled
- */
- while (!theFacade->ext_isConnected(_ownNodeId))
- {
- NdbSleep_MilliSleep(20);
- }
-
_ownReference = numberToRef(_blockNumber, _ownNodeId);
/*
@@ -2857,6 +2849,10 @@ MgmtSrvr::try_alloc(unsigned id, const c
const struct sockaddr *client_addr,
Uint32 timeout_ms)
{
+ if (theFacade && theFacade->ext_isConnected(id))
+ {
+ return -1;
+ }
if (client_addr != 0)
{
int res = alloc_node_id_req(id, type, timeout_ms);
=== modified file 'storage/ndb/src/ndbapi/ClusterMgr.cpp'
--- a/storage/ndb/src/ndbapi/ClusterMgr.cpp 2011-01-10 13:30:14 +0000
+++ b/storage/ndb/src/ndbapi/ClusterMgr.cpp 2011-01-18 07:39:47 +0000
@@ -141,12 +141,6 @@ ClusterMgr::configure(Uint32 nodeId,
theNodes[i]= Node();
}
- /* Init own node info */
- trp_node &node= theNodes[getOwnNodeId()];
- assert(node.defined);
- node.set_connected(true);
- node.set_confirmed(true);
-
#if 0
print_nodes("init");
#endif
@@ -181,12 +175,19 @@ void
ClusterMgr::startThread() {
Guard g(clusterMgrThreadMutex);
- theStop = 0;
+ theStop = -1;
theClusterMgrThread = NdbThread_Create(runClusterMgr_C,
(void**)this,
0, // default stack size
"ndb_clustermgr",
NDB_THREAD_PRIO_HIGH);
+ Uint32 cnt = 0;
+ while (theStop == -1 && cnt < 60)
+ {
+ NdbCondition_WaitTimeout(waitForHBCond, clusterMgrThreadMutex, 1000);
+ }
+
+ assert(theStop == 0);
}
void
@@ -262,7 +263,6 @@ ClusterMgr::forceHB()
req->mysql_version = NDB_MYSQL_VERSION_D;
{
- Guard g(clusterMgrThreadMutex);
lock();
int nodeId= 0;
for(int i=0;
@@ -289,15 +289,40 @@ ClusterMgr::forceHB()
}
void
-ClusterMgr::force_update_connections()
+ClusterMgr::startup()
{
- theFacade.lock_mutex();
- theFacade.theTransporterRegistry->update_connections();
- theFacade.unlock_mutex();
+ assert(theStop == -1);
+ Uint32 nodeId = getOwnNodeId();
+ Node & cm_node = theNodes[nodeId];
+ trp_node & theNode = cm_node;
+ assert(theNode.defined);
+ assert(theNode.is_connected() == false);
+
+ lock();
+ theFacade.doConnect(nodeId);
+ unlock();
+
+ for (Uint32 i = 0; i<3000; i++)
+ {
+ lock();
+ theFacade.theTransporterRegistry->update_connections();
+ unlock();
+ if (theNode.is_connected())
+ break;
+ NdbSleep_MilliSleep(20);
+ }
+
+ assert(theNode.is_connected());
+ Guard g(clusterMgrThreadMutex);
+ theStop = 0;
+ NdbCondition_Broadcast(waitForHBCond);
}
void
-ClusterMgr::threadMain( ){
+ClusterMgr::threadMain()
+{
+ startup();
+
NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
signal.theVerId_signalNumber = GSN_API_REGREQ;
@@ -309,6 +334,12 @@ ClusterMgr::threadMain( ){
req->version = NDB_VERSION;
req->mysql_version = NDB_MYSQL_VERSION_D;
+ NdbApiSignal nodeFail_signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
+ nodeFail_signal.theVerId_signalNumber = GSN_NODE_FAILREP;
+ nodeFail_signal.theReceiversBlockNumber = API_CLUSTERMGR;
+ nodeFail_signal.theTrace = 0;
+ nodeFail_signal.theLength = NodeFailRep::SignalLengthLong;
+
NDB_TICKS timeSlept = 100;
NDB_TICKS now = NdbTick_CurrentMillisecond();
@@ -346,7 +377,13 @@ ClusterMgr::threadMain( ){
m_cluster_state = CS_waiting_for_first_connect;
}
- lock();
+
+ NodeFailRep * nodeFailRep = CAST_PTR(NodeFailRep,
+ nodeFail_signal.getDataPtrSend());
+ nodeFailRep->noOfNodes = 0;
+ NodeBitmask::clear(nodeFailRep->theNodes);
+
+ trp_client::lock();
for (int i = 1; i < MAX_NODES; i++){
/**
* Send register request (heartbeat) to all available nodes
@@ -358,9 +395,6 @@ ClusterMgr::threadMain( ){
Node & cm_node = theNodes[nodeId];
trp_node & theNode = cm_node;
- if (nodeId == getOwnNodeId())
- continue;
-
if (!theNode.defined)
continue;
@@ -373,6 +407,15 @@ ClusterMgr::threadMain( ){
continue;
}
+ if (nodeId == getOwnNodeId() && theNode.is_confirmed())
+ {
+ /**
+ * Don't send HB to self more than once
+ * (once needed to avoid weird special cases in e.g ConfigManager)
+ */
+ continue;
+ }
+
cm_node.hbCounter += (Uint32)timeSlept;
if (cm_node.hbCounter >= m_max_api_reg_req_interval ||
cm_node.hbCounter >= cm_node.hbFrequency)
@@ -397,11 +440,18 @@ ClusterMgr::threadMain( ){
raw_sendSignal(&signal, nodeId);
}//if
- if (cm_node.hbMissed == 4 && cm_node.hbFrequency > 0){
- reportNodeFailed(i);
- }//if
+ if (cm_node.hbMissed == 4 && cm_node.hbFrequency > 0)
+ {
+ nodeFailRep->noOfNodes++;
+ NodeBitmask::set(nodeFailRep->theNodes, nodeId);
+ }
}
- unlock();
+
+ if (nodeFailRep->noOfNodes)
+ {
+ raw_sendSignal(&nodeFail_signal, getOwnNodeId());
+ }
+ trp_client::unlock();
}
}
@@ -418,20 +468,15 @@ ClusterMgr::trp_deliver_signal(const Ndb
break;
case GSN_API_REGCONF:
- {
- execAPI_REGCONF(theData);
-
- // Distribute signal to all threads/blocks
- theFacade.for_each(this, sig, ptr);
+ execAPI_REGCONF(sig, ptr);
break;
- }
case GSN_API_REGREF:
execAPI_REGREF(theData);
break;
case GSN_NODE_FAILREP:
- execNODE_FAILREP(theData);
+ execNODE_FAILREP(sig, ptr);
break;
case GSN_NF_COMPLETEREP:
@@ -499,6 +544,16 @@ ClusterMgr::trp_deliver_signal(const Ndb
theFacade.for_each(this, sig, ptr);
return;
}
+ case GSN_CONNECT_REP:
+ {
+ execCONNECT_REP(sig, ptr);
+ return;
+ }
+ case GSN_DISCONNECT_REP:
+ {
+ execDISCONNECT_REP(sig, ptr);
+ return;
+ }
default:
break;
@@ -628,8 +683,11 @@ ClusterMgr::execAPI_REGREQ(const Uint32
}
void
-ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
- const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0];
+ClusterMgr::execAPI_REGCONF(const NdbApiSignal * signal,
+ const LinearSectionPtr ptr[])
+{
+ const ApiRegConf * apiRegConf = CAST_CONSTPTR(ApiRegConf,
+ signal->getDataPtr());
const NodeId nodeId = refToNode(apiRegConf->qmgrRef);
#ifdef DEBUG_REG
@@ -688,6 +746,10 @@ ClusterMgr::execAPI_REGCONF(const Uint32
cm_node.hbCounter = 0;
cm_node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
+ // Distribute signal to all threads/blocks
+ // TODO only if state changed...
+ theFacade.for_each(this, signal, ptr);
+
check_wait_for_hb(nodeId);
}
@@ -742,17 +804,6 @@ ClusterMgr::execAPI_REGREF(const Uint32
check_wait_for_hb(nodeId);
}
-
-void
-ClusterMgr::execNODE_FAILREP(const Uint32 * theData){
- const NodeFailRep * const nodeFail = (NodeFailRep *)&theData[0];
- for(int i = 1; i < MAX_NDB_NODES; i++){
- if(NdbNodeBitmask::get(nodeFail->theNodes, i)){
- reportNodeFailed(i);
- }
- }
-}
-
void
ClusterMgr::execNF_COMPLETEREP(const NdbApiSignal* signal,
const LinearSectionPtr ptr[3])
@@ -781,6 +832,10 @@ ClusterMgr::reportConnected(NodeId nodeI
* us with the real time-out period to use.
*/
assert(nodeId > 0 && nodeId < MAX_NODES);
+ if (nodeId == getOwnNodeId())
+ {
+ noOfConnectedNodes--; // Don't count self...
+ }
noOfConnectedNodes++;
@@ -791,6 +846,8 @@ ClusterMgr::reportConnected(NodeId nodeI
cm_node.hbCounter = 0;
cm_node.hbFrequency = 0;
+ assert(theNode.is_connected() == false);
+
/**
* make sure the node itself is marked connected even
* if first API_REGCONF has not arrived
@@ -800,63 +857,112 @@ ClusterMgr::reportConnected(NodeId nodeI
theNode.m_info.m_version = 0;
theNode.compatible = true;
theNode.nfCompleteRep = true;
+ theNode.m_node_fail_rep = false;
theNode.m_state.startLevel = NodeState::SL_NOTHING;
theNode.minDbVersion = 0;
+ /**
+ * We know that we have clusterMgrThreadMutex and trp_client::mutex
+ * but we don't know if we are polling...and for_each can
+ * only be used by a poller...
+ *
+ * Send signal to self, so that we can do this when receiving a signal
+ */
NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
signal.theVerId_signalNumber = GSN_CONNECT_REP;
- signal.theReceiversBlockNumber = 0;
+ signal.theReceiversBlockNumber = API_CLUSTERMGR;
signal.theTrace = 0;
signal.theLength = 1;
signal.getDataPtrSend()[0] = nodeId;
- theFacade.for_each(this, &signal, 0);
+ raw_sendSignal(&signal, getOwnNodeId());
DBUG_VOID_RETURN;
}
void
+ClusterMgr::execCONNECT_REP(const NdbApiSignal* sig,
+ const LinearSectionPtr ptr[])
+{
+ theFacade.for_each(this, sig, 0);
+}
+
+void
+ClusterMgr::set_node_dead(trp_node& theNode)
+{
+ set_node_alive(theNode, false);
+ theNode.set_confirmed(false);
+ theNode.m_state.m_connected_nodes.clear();
+ theNode.m_state.startLevel = NodeState::SL_NOTHING;
+ theNode.m_info.m_connectCount ++;
+ theNode.nfCompleteRep = false;
+}
+
+void
ClusterMgr::reportDisconnected(NodeId nodeId)
{
assert(nodeId > 0 && nodeId < MAX_NODES);
assert(noOfConnectedNodes > 0);
- reportNodeFailed(nodeId, true);
+ /**
+ * We know that we have clusterMgrThreadMutex and trp_client::mutex
+ * but we don't know if we are polling...and for_each can
+ * only be used by a poller...
+ *
+ * Send signal to self, so that we can do this when receiving a signal
+ */
+ NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
+ signal.theVerId_signalNumber = GSN_DISCONNECT_REP;
+ signal.theReceiversBlockNumber = API_CLUSTERMGR;
+ signal.theTrace = 0;
+ signal.theLength = DisconnectRep::SignalLength;
+
+ DisconnectRep * rep = CAST_PTR(DisconnectRep, signal.getDataPtrSend());
+ rep->nodeId = nodeId;
+ rep->err = 0;
+ raw_sendSignal(&signal, getOwnNodeId());
}
void
-ClusterMgr::reportNodeFailed(NodeId nodeId, bool disconnect)
+ClusterMgr::execDISCONNECT_REP(const NdbApiSignal* sig,
+ const LinearSectionPtr ptr[])
{
- // Check array bounds + don't allow node 0 to be touched
+ const DisconnectRep * rep = CAST_CONSTPTR(DisconnectRep, sig->getDataPtr());
+ Uint32 nodeId = rep->nodeId;
+
assert(nodeId > 0 && nodeId < MAX_NODES);
Node & cm_node = theNodes[nodeId];
trp_node & theNode = cm_node;
- set_node_alive(theNode, false);
- theNode.m_info.m_connectCount ++;
+ bool node_failrep = theNode.m_node_fail_rep;
+ set_node_dead(theNode);
+ theNode.set_connected(false);
- if (disconnect)
- {
- noOfConnectedNodes--;
- theNode.set_confirmed(false);
- theNode.set_connected(false);
- theNode.m_state.m_connected_nodes.clear();
- }
-
- if (theNode.is_connected())
+ noOfConnectedNodes--;
+ if (noOfConnectedNodes == 0)
{
- theFacade.doDisconnect(nodeId);
+ if (!global_flag_skip_invalidate_cache &&
+ theFacade.m_globalDictCache)
+ {
+ theFacade.m_globalDictCache->lock();
+ theFacade.m_globalDictCache->invalidate_all();
+ theFacade.m_globalDictCache->unlock();
+ m_connect_count ++;
+ m_cluster_state = CS_waiting_for_clean_cache;
+ }
+
+ if (m_auto_reconnect == 0)
+ {
+ theStop = 2;
+ }
}
- if (theNode.m_info.getType() == NodeInfo::DB)
- recalcMinDbVersion();
-
- const bool report = (theNode.m_state.startLevel != NodeState::SL_NOTHING);
- theNode.m_state.startLevel = NodeState::SL_NOTHING;
-
- if (disconnect || report)
+ if (node_failrep == false)
{
+ /**
+ * Inform API
+ */
NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
signal.theVerId_signalNumber = GSN_NODE_FAILREP;
- signal.theReceiversBlockNumber = 0;
+ signal.theReceiversBlockNumber = API_CLUSTERMGR;
signal.theTrace = 0;
signal.theLength = NodeFailRep::SignalLengthLong;
@@ -866,27 +972,58 @@ ClusterMgr::reportNodeFailed(NodeId node
rep->noOfNodes = 1;
NodeBitmask::clear(rep->theNodes);
NodeBitmask::set(rep->theNodes, nodeId);
- theFacade.for_each(this, &signal, 0);
+ execNODE_FAILREP(&signal, 0);
}
+}
+
+void
+ClusterMgr::execNODE_FAILREP(const NdbApiSignal* sig,
+ const LinearSectionPtr ptr[])
+{
+ const NodeFailRep * rep = CAST_CONSTPTR(NodeFailRep, sig->getDataPtr());
+
+ NdbApiSignal signal(sig->theSendersBlockRef);
+ signal.theVerId_signalNumber = GSN_NODE_FAILREP;
+ signal.theReceiversBlockNumber = API_CLUSTERMGR;
+ signal.theTrace = 0;
+ signal.theLength = NodeFailRep::SignalLengthLong;
- if (noOfConnectedNodes == 0)
- {
- if (!global_flag_skip_invalidate_cache &&
- theFacade.m_globalDictCache)
+ NodeFailRep * copy = CAST_PTR(NodeFailRep, signal.getDataPtrSend());
+ copy->failNo = 0;
+ copy->masterNodeId = 0;
+ copy->noOfNodes = 0;
+ NodeBitmask::clear(copy->theNodes);
+
+ for (Uint32 i = NdbNodeBitmask::find_first(rep->theNodes);
+ i != NdbNodeBitmask::NotFound;
+ i = NdbNodeBitmask::find_next(rep->theNodes, i + 1))
+ {
+ Node & cm_node = theNodes[i];
+ trp_node & theNode = cm_node;
+
+ bool node_failrep = theNode.m_node_fail_rep;
+ bool connected = theNode.is_connected();
+ set_node_dead(theNode);
+
+ if (node_failrep == false)
{
- theFacade.m_globalDictCache->lock();
- theFacade.m_globalDictCache->invalidate_all();
- theFacade.m_globalDictCache->unlock();
- m_connect_count ++;
- m_cluster_state = CS_waiting_for_clean_cache;
+ theNode.m_node_fail_rep = true;
+ NodeBitmask::set(copy->theNodes, i);
+ copy->noOfNodes++;
}
- if (m_auto_reconnect == 0)
+ if (connected)
{
- theStop = 2;
+ theFacade.doDisconnect(i);
}
}
- theNode.nfCompleteRep = false;
+
+ recalcMinDbVersion();
+ if (copy->noOfNodes)
+ {
+ theFacade.for_each(this, &signal, 0); // report GSN_NODE_FAILREP
+ }
+
if (noOfAliveNodes == 0)
{
NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
@@ -913,7 +1050,6 @@ ClusterMgr::reportNodeFailed(NodeId node
}
}
-
void
ClusterMgr::print_nodes(const char* where, NdbOut& out)
{
@@ -1287,7 +1423,6 @@ ArbitMgr::sendSignalToQmgr(ArbitSignal&
#endif
{
- Guard g(m_clusterMgr.clusterMgrThreadMutex);
m_clusterMgr.lock();
m_clusterMgr.raw_sendSignal(&signal, aSignal.data.sender);
m_clusterMgr.unlock();
=== modified file 'storage/ndb/src/ndbapi/ClusterMgr.hpp'
--- a/storage/ndb/src/ndbapi/ClusterMgr.hpp 2011-01-10 13:30:14 +0000
+++ b/storage/ndb/src/ndbapi/ClusterMgr.hpp 2011-01-18 07:39:47 +0000
@@ -27,6 +27,7 @@
#include <signaldata/NodeStateSignalData.hpp>
#include "trp_client.hpp"
#include "trp_node.hpp"
+#include <signaldata/DisconnectRep.hpp>
extern "C" void* runClusterMgr_C(void * me);
@@ -56,9 +57,12 @@ public:
void set_max_api_reg_req_interval(unsigned int millisec) {
m_max_api_reg_req_interval = millisec;
}
- void force_update_connections();
+
+ void lock() { NdbMutex_Lock(clusterMgrThreadMutex); trp_client::lock(); }
+ void unlock() { trp_client::unlock();NdbMutex_Unlock(clusterMgrThreadMutex); }
private:
+ void startup();
void threadMain();
int theStop;
@@ -108,15 +112,15 @@ private:
*/
NdbMutex* clusterMgrThreadMutex;
- void reportNodeFailed(NodeId nodeId, bool disconnect = false);
-
/**
* Signals received
*/
void execAPI_REGREQ (const Uint32 * theData);
- void execAPI_REGCONF (const Uint32 * theData);
+ void execAPI_REGCONF (const NdbApiSignal*, const LinearSectionPtr ptr[]);
void execAPI_REGREF (const Uint32 * theData);
- void execNODE_FAILREP (const Uint32 * theData);
+ void execCONNECT_REP (const NdbApiSignal*, const LinearSectionPtr ptr[]);
+ void execDISCONNECT_REP(const NdbApiSignal*, const LinearSectionPtr ptr[]);
+ void execNODE_FAILREP (const NdbApiSignal*, const LinearSectionPtr ptr[]);
void execNF_COMPLETEREP(const NdbApiSignal*, const LinearSectionPtr ptr[]);
void check_wait_for_hb(NodeId nodeId);
@@ -139,6 +143,8 @@ private:
node.m_alive = alive;
}
+ void set_node_dead(trp_node&);
+
void print_nodes(const char* where, NdbOut& out = ndbout);
void recalcMinDbVersion();
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp 2011-01-10 13:30:14 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp 2011-01-18 07:39:47 +0000
@@ -485,9 +485,9 @@ void TransporterFacade::threadMainReceiv
#endif
while(!theStopReceive)
{
- NdbMutex_Lock(theMutexPtr);
+ theClusterMgr->lock();
theTransporterRegistry->update_connections();
- NdbMutex_Unlock(theMutexPtr);
+ theClusterMgr->unlock();
NdbSleep_MilliSleep(100);
}//while
theTransporterRegistry->stopReceiving();
@@ -599,14 +599,6 @@ TransporterFacade::do_connect_mgm(NodeId
}
}
- /**
- * Also setup Loopback Transporter
- */
- if (is_mgmd(nodeId, conf))
- {
- doConnect(nodeId);
- }
-
DBUG_RETURN(true);
}
@@ -624,7 +616,7 @@ TransporterFacade::configure(NodeId node
if (!IPCConfig::configureTransporters(nodeId,
* conf,
* theTransporterRegistry,
- is_mgmd(nodeId, conf)))
+ true))
DBUG_RETURN(false);
// Configure cluster manager
@@ -663,7 +655,12 @@ TransporterFacade::configure(NodeId node
// Open connection between MGM servers
if (!do_connect_mgm(nodeId, conf))
DBUG_RETURN(false);
-
+
+ /**
+ * Also setup Loopback Transporter
+ */
+ doConnect(nodeId);
+
DBUG_RETURN(true);
}
@@ -874,7 +871,9 @@ TransporterFacade::sendSignal(const NdbA
if (ss == SEND_OK)
{
assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
- aSignal->readSignalNumber() == GSN_API_REGREQ);
+ aSignal->readSignalNumber() == GSN_API_REGREQ ||
+ (aSignal->readSignalNumber() == GSN_CONNECT_REP &&
+ aNode == ownId()));
}
return (ss == SEND_OK ? 0 : -1);
}
@@ -1978,7 +1977,9 @@ TransporterFacade::ext_set_max_api_reg_r
void
TransporterFacade::ext_update_connections()
{
- theClusterMgr->force_update_connections();
+ theClusterMgr->lock();
+ theTransporterRegistry->update_connections();
+ theClusterMgr->unlock();
}
struct in_addr
@@ -1996,14 +1997,19 @@ TransporterFacade::ext_forceHB()
bool
TransporterFacade::ext_isConnected(NodeId aNodeId)
{
- return theTransporterRegistry->is_connected(aNodeId);
+ bool val;
+ theClusterMgr->lock();
+ val = theClusterMgr->theNodes[aNodeId].is_connected();
+ theClusterMgr->unlock();
+ return val;
}
void
TransporterFacade::ext_doConnect(int aNodeId)
{
- lock_mutex();
+ theClusterMgr->lock();
+ assert(theClusterMgr->theNodes[aNodeId].is_connected() == false);
doConnect(aNodeId);
- unlock_mutex();
+ theClusterMgr->unlock();
}
=== modified file 'storage/ndb/src/ndbapi/trp_node.cpp'
--- a/storage/ndb/src/ndbapi/trp_node.cpp 2010-12-21 11:43:32 +0000
+++ b/storage/ndb/src/ndbapi/trp_node.cpp 2011-01-18 07:39:47 +0000
@@ -21,7 +21,7 @@
trp_node::trp_node()
{
compatible = nfCompleteRep = true;
- m_connected = defined = m_alive = m_api_reg_conf = false;
+ m_connected = defined = m_alive = m_api_reg_conf = m_node_fail_rep = false;
bzero(&m_state, sizeof(m_state));
m_state.init();
m_state.startLevel = NodeState::SL_NOTHING;
@@ -37,6 +37,7 @@ trp_node::operator==(const trp_node& oth
defined == other.defined &&
m_alive == other.m_alive &&
m_api_reg_conf == other.m_api_reg_conf &&
+ m_node_fail_rep == other.m_node_fail_rep &&
minDbVersion == other.minDbVersion &&
memcmp(&m_state, &other.m_state, sizeof(m_state)) == 0);
}
@@ -50,6 +51,7 @@ operator<<(NdbOut& out, const trp_node&
<< ", connected: " << n.m_connected
<< ", api_reg_conf: " << n.m_api_reg_conf
<< ", alive: " << n.m_alive
+ << ", nodefailrep: " << n.m_node_fail_rep
<< ", nfCompleteRep: " << n.nfCompleteRep
<< ", minDbVersion: " << n.minDbVersion
<< ", state: " << n.m_state
=== modified file 'storage/ndb/src/ndbapi/trp_node.hpp'
--- a/storage/ndb/src/ndbapi/trp_node.hpp 2010-12-21 11:43:32 +0000
+++ b/storage/ndb/src/ndbapi/trp_node.hpp 2011-01-18 07:39:47 +0000
@@ -28,15 +28,21 @@ NdbOut& operator<<(NdbOut&, const struct
struct trp_node
{
trp_node();
- bool defined;
- bool compatible; // Version is compatible
- bool nfCompleteRep; // NF Complete Rep has arrived
- bool m_alive; // Node is alive
- Uint32 minDbVersion;
NodeInfo m_info;
NodeState m_state;
+ Uint32 minDbVersion;
+ bool defined;
+ bool compatible; // Version is compatible
+ bool nfCompleteRep; // NF Complete Rep has arrived
+ bool m_alive; // Node is alive
+ bool m_node_fail_rep;// NodeFailRep has arrived
+private:
+ bool m_connected; // Transporter connected
+ bool m_api_reg_conf;// API_REGCONF has arrived
+public:
+
void set_connected(bool connected) {
assert(defined);
m_connected = connected;
@@ -50,7 +56,8 @@ struct trp_node
}
void set_confirmed(bool confirmed) {
- assert(is_connected()); // Must be connected to change confirmed
+ if (confirmed)
+ assert(is_connected());
m_api_reg_conf = confirmed;
}
@@ -64,8 +71,6 @@ struct trp_node
bool operator==(const trp_node& other) const;
private:
- bool m_connected; // Transporter connected
- bool m_api_reg_conf;// API_REGCONF has arrived
friend NdbOut& operator<<(NdbOut&, const trp_node&);
};
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (jonas:4127 to 4130) | jonas oreland | 18 Jan |