From: Jonas Oreland Date: November 16 2011 4:33pm Subject: bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4669 to 4670) List-Archive: http://lists.mysql.com/commits/141999 Message-Id: <20111116163348.6031617125@perch.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4670 Jonas Oreland 2011-11-16 ndb - revert trpman...red in CluB...need to investigate why...(don't get red on local machine :-( modified: mysql-test/suite/ndb/r/ndbinfo.result mysql-test/suite/ndb/r/ndbinfo_dump.result storage/ndb/include/kernel/BlockNumbers.h storage/ndb/include/kernel/GlobalSignalNumbers.h storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp storage/ndb/include/kernel/signaldata/DisconnectRep.hpp storage/ndb/include/kernel/signaldata/EnableCom.hpp storage/ndb/include/kernel/signaldata/RouteOrd.hpp storage/ndb/src/common/debugger/BlockNames.cpp storage/ndb/src/kernel/SimBlockList.cpp storage/ndb/src/kernel/blocks/CMakeLists.txt storage/ndb/src/kernel/blocks/Makefile.am storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp storage/ndb/src/kernel/vm/Configuration.cpp storage/ndb/src/kernel/vm/TransporterCallback.cpp storage/ndb/src/kernel/vm/mt.cpp storage/ndb/src/kernel/vm/mt_thr_config.cpp 4669 Jonas Oreland 2011-11-16 ndb - this patch moves transporter handling logic out from cmvmi into trpman (transporter manager) added: storage/ndb/src/kernel/blocks/trpman.cpp storage/ndb/src/kernel/blocks/trpman.hpp modified: mysql-test/suite/ndb/r/ndbinfo.result mysql-test/suite/ndb/r/ndbinfo_dump.result storage/ndb/include/kernel/BlockNumbers.h storage/ndb/include/kernel/GlobalSignalNumbers.h storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp storage/ndb/include/kernel/signaldata/DisconnectRep.hpp storage/ndb/include/kernel/signaldata/EnableCom.hpp storage/ndb/include/kernel/signaldata/RouteOrd.hpp storage/ndb/src/common/debugger/BlockNames.cpp storage/ndb/src/kernel/SimBlockList.cpp storage/ndb/src/kernel/blocks/CMakeLists.txt storage/ndb/src/kernel/blocks/Makefile.am storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp storage/ndb/src/kernel/vm/Configuration.cpp storage/ndb/src/kernel/vm/TransporterCallback.cpp storage/ndb/src/kernel/vm/mt.cpp storage/ndb/src/kernel/vm/mt_thr_config.cpp === modified file 'mysql-test/suite/ndb/r/ndbinfo.result' --- a/mysql-test/suite/ndb/r/ndbinfo.result 2011-11-16 15:38:25 +0000 +++ b/mysql-test/suite/ndb/r/ndbinfo.result 2011-11-16 16:23:37 +0000 @@ -347,7 +347,6 @@ RESTORE SUMA THRMAN TRIX -TRPMAN TSMAN desc threadstat; Field Type Null Key Default Extra === modified file 'mysql-test/suite/ndb/r/ndbinfo_dump.result' --- a/mysql-test/suite/ndb/r/ndbinfo_dump.result 2011-11-16 15:38:25 +0000 +++ b/mysql-test/suite/ndb/r/ndbinfo_dump.result 2011-11-16 16:23:37 +0000 @@ -1,7 +1,7 @@ USE ndbinfo; select count(*) from blocks; count(*) -23 +22 select count(*) from blocks; count(*) -23 +22 === modified file 'storage/ndb/include/kernel/BlockNumbers.h' --- a/storage/ndb/include/kernel/BlockNumbers.h 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/include/kernel/BlockNumbers.h 2011-11-16 16:23:37 +0000 @@ -61,7 +61,6 @@ #define DBINFO 0x107 #define DBSPJ 0x108 #define THRMAN 0x109 -#define TRPMAN 0x10A const BlockReference BACKUP_REF = numberToRef(BACKUP, 0); const BlockReference DBTC_REF = numberToRef(DBTC, 0); @@ -85,7 +84,6 @@ const BlockReference RESTORE_REF = numbe const BlockReference DBINFO_REF = numberToRef(DBINFO, 0); const BlockReference DBSPJ_REF = numberToRef(DBSPJ, 0); const BlockReference THRMAN_REF = numberToRef(THRMAN, 0); -const BlockReference TRPMAN_REF = numberToRef(TRPMAN, 0); static inline void __hide_warnings_unused_ref_vars(void) { // Hide annoying warnings about unused variables @@ -96,11 +94,11 @@ static inline void __hide_warnings_unuse (void)DBUTIL_REF; (void)SUMA_REF; (void)DBTUX_REF; (void)TSMAN_REF; (void)LGMAN_REF; (void)PGMAN_REF; (void)RESTORE_REF; (void)DBINFO_REF; (void)DBSPJ_REF; - (void)THRMAN_REF; (void)TRPMAN_REF; + (void)THRMAN_REF; } const BlockNumber MIN_BLOCK_NO = BACKUP; -const BlockNumber MAX_BLOCK_NO = TRPMAN; +const BlockNumber MAX_BLOCK_NO = THRMAN; const BlockNumber NO_OF_BLOCKS = (MAX_BLOCK_NO - MIN_BLOCK_NO + 1); /** === modified file 'storage/ndb/include/kernel/GlobalSignalNumbers.h' --- a/storage/ndb/include/kernel/GlobalSignalNumbers.h 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/include/kernel/GlobalSignalNumbers.h 2011-11-16 16:23:37 +0000 @@ -592,7 +592,7 @@ extern const GlobalSignalNumber NO_OF_SI #define GSN_NODE_PING_REQ 461 /* distr. */ #define GSN_NODE_PING_CONF 462 /* distr. */ -#define GSN_CANCEL_SUBSCRIPTION_REQ 463 +/* 463 unused */ /* 464 unused */ #define GSN_DUMP_STATE_ORD 465 === modified file 'storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp' --- a/storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp 2011-11-16 16:23:37 +0000 @@ -34,7 +34,7 @@ class CloseComReqConf { * Sender(s) / Reciver(s) */ friend class Qmgr; - friend class Trpman; + friend class Cmvmi; /** * For printing === modified file 'storage/ndb/include/kernel/signaldata/DisconnectRep.hpp' --- a/storage/ndb/include/kernel/signaldata/DisconnectRep.hpp 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/include/kernel/signaldata/DisconnectRep.hpp 2011-11-16 16:23:37 +0000 @@ -29,7 +29,7 @@ struct DisconnectRep * Receiver(s) */ friend class Qmgr; - friend class Trpman; + friend class Cmvmi; // Cmvmi friend class ClusterMgr; /** === modified file 'storage/ndb/include/kernel/signaldata/EnableCom.hpp' --- a/storage/ndb/include/kernel/signaldata/EnableCom.hpp 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/include/kernel/signaldata/EnableCom.hpp 2011-11-16 16:23:37 +0000 @@ -20,7 +20,7 @@ class EnableComReq { friend class Qmgr; - friend class Trpman; + friend class Cmvmi; public: STATIC_CONST( SignalLength = 2 + NodeBitmask::Size ); @@ -33,7 +33,7 @@ private: class EnableComConf { friend class Qmgr; - friend class Trpman; + friend class Cmvmi; public: STATIC_CONST( SignalLength = 2 + NodeBitmask::Size ); === modified file 'storage/ndb/include/kernel/signaldata/RouteOrd.hpp' --- a/storage/ndb/include/kernel/signaldata/RouteOrd.hpp 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/include/kernel/signaldata/RouteOrd.hpp 2011-11-16 16:23:37 +0000 @@ -25,17 +25,13 @@ /** * Request to allocate node id */ -struct RouteOrd -{ +struct RouteOrd { STATIC_CONST( SignalLength = 4 ); Uint32 dstRef; Uint32 srcRef; Uint32 gsn; - union { - Uint32 cnt; - Uint32 from; - }; + Uint32 cnt; }; #endif === modified file 'storage/ndb/src/common/debugger/BlockNames.cpp' --- a/storage/ndb/src/common/debugger/BlockNames.cpp 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/src/common/debugger/BlockNames.cpp 2011-11-16 16:23:37 +0000 @@ -41,7 +41,6 @@ const BlockName BlockNames[] = { ,{ "DBINFO", DBINFO } ,{ "DBSPJ", DBSPJ } ,{ "THRMAN", THRMAN } - ,{ "TRPMAN", TRPMAN } }; const BlockNumber NO_OF_BLOCK_NAMES = sizeof(BlockNames) / sizeof(BlockName); === modified file 'storage/ndb/src/kernel/SimBlockList.cpp' --- a/storage/ndb/src/kernel/SimBlockList.cpp 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/src/kernel/SimBlockList.cpp 2011-11-16 16:23:37 +0000 @@ -52,7 +52,6 @@ #include #include #include -#include #include #ifndef VM_TRACE @@ -91,6 +90,10 @@ void * operator new (size_t sz, SIMBLOCK void SimBlockList::load(EmulatorData& data){ noOfBlocks = NO_OF_BLOCKS; +#define THR 1 +#ifndef THR + noOfBlocks--; +#endif theList = new SimulatedBlock * [noOfBlocks]; if (!theList) { @@ -162,15 +165,14 @@ SimBlockList::load(EmulatorData& data){ theList[20] = NEW_BLOCK(Dbspj)(ctx); else theList[20] = NEW_BLOCK(DbspjProxy)(ctx); +#ifdef THR if (NdbIsMultiThreaded() == false) theList[21] = NEW_BLOCK(Thrman)(ctx); else theList[21] = NEW_BLOCK(ThrmanProxy)(ctx); - if (NdbIsMultiThreaded() == false) - theList[22] = NEW_BLOCK(Trpman)(ctx); - else - theList[22] = NEW_BLOCK(TrpmanProxy)(ctx); - assert(NO_OF_BLOCKS == 23); + + assert(NO_OF_BLOCKS == 22); +#endif // Check that all blocks could be created for (int i = 0; i < noOfBlocks; i++) === modified file 'storage/ndb/src/kernel/blocks/CMakeLists.txt' --- a/storage/ndb/src/kernel/blocks/CMakeLists.txt 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/src/kernel/blocks/CMakeLists.txt 2011-11-16 16:23:37 +0000 @@ -73,8 +73,7 @@ ADD_LIBRARY(ndbblocks STATIC PgmanProxy.cpp dbtup/DbtupClient.cpp ${EXTRA_SRC} - thrman.cpp - trpman.cpp) + thrman.cpp) MYSQL_ADD_EXECUTABLE(ndb_print_file print_file.cpp === modified file 'storage/ndb/src/kernel/blocks/Makefile.am' --- a/storage/ndb/src/kernel/blocks/Makefile.am 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/src/kernel/blocks/Makefile.am 2011-11-16 16:23:37 +0000 @@ -69,8 +69,7 @@ libblocks_a_SOURCES = tsman.cpp lgman.cp PgmanProxy.cpp \ dbtup/DbtupClient.cpp \ dbtc/DbtcProxy.cpp \ - thrman.cpp \ - trpman.cpp + thrman.cpp ndbtools_PROGRAMS = ndb_print_file ndb_print_file_SOURCES = print_file.cpp diskpage.cpp dbtup/tuppage.cpp === modified file 'storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp' --- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2011-11-16 16:23:37 +0000 @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -32,9 +33,13 @@ #include #include #include +#include #include #include #include +#include +#include +#include #include #include #include @@ -86,11 +91,17 @@ Cmvmi::Cmvmi(Block_context& ctx) : mt_set_section_chunk_size(); // Add received signals + addRecSignal(GSN_CONNECT_REP, &Cmvmi::execCONNECT_REP); + addRecSignal(GSN_DISCONNECT_REP, &Cmvmi::execDISCONNECT_REP); + addRecSignal(GSN_NDB_TAMPER, &Cmvmi::execNDB_TAMPER, true); addRecSignal(GSN_SET_LOGLEVELORD, &Cmvmi::execSET_LOGLEVELORD); addRecSignal(GSN_EVENT_REP, &Cmvmi::execEVENT_REP); addRecSignal(GSN_STTOR, &Cmvmi::execSTTOR); addRecSignal(GSN_READ_CONFIG_REQ, &Cmvmi::execREAD_CONFIG_REQ); + addRecSignal(GSN_CLOSE_COMREQ, &Cmvmi::execCLOSE_COMREQ); + addRecSignal(GSN_ENABLE_COMREQ, &Cmvmi::execENABLE_COMREQ); + addRecSignal(GSN_OPEN_COMREQ, &Cmvmi::execOPEN_COMREQ); addRecSignal(GSN_TEST_ORD, &Cmvmi::execTEST_ORD); addRecSignal(GSN_TAMPER_ORD, &Cmvmi::execTAMPER_ORD); @@ -98,8 +109,6 @@ Cmvmi::Cmvmi(Block_context& ctx) : addRecSignal(GSN_START_ORD, &Cmvmi::execSTART_ORD); addRecSignal(GSN_EVENT_SUBSCRIBE_REQ, &Cmvmi::execEVENT_SUBSCRIBE_REQ); - addRecSignal(GSN_CANCEL_SUBSCRIPTION_REQ, - &Cmvmi::execCANCEL_SUBSCRIPTION_REQ); addRecSignal(GSN_DUMP_STATE_ORD, &Cmvmi::execDUMP_STATE_ORD); @@ -107,6 +116,7 @@ Cmvmi::Cmvmi(Block_context& ctx) : addRecSignal(GSN_NODE_START_REP, &Cmvmi::execNODE_START_REP, true); addRecSignal(GSN_CONTINUEB, &Cmvmi::execCONTINUEB); + addRecSignal(GSN_ROUTE_ORD, &Cmvmi::execROUTE_ORD); addRecSignal(GSN_DBINFO_SCANREQ, &Cmvmi::execDBINFO_SCANREQ); addRecSignal(GSN_SYNC_REQ, &Cmvmi::execSYNC_REQ, true); @@ -169,6 +179,11 @@ Cmvmi::~Cmvmi() m_shared_page_pool.clear(); } +#ifdef ERROR_INSERT +NodeBitmask c_error_9000_nodes_mask; +extern Uint32 MAX_RECEIVED_SIGNALS; +#endif + void Cmvmi::execNDB_TAMPER(Signal* signal) { jamEntry(); @@ -197,12 +212,21 @@ void Cmvmi::execNDB_TAMPER(Signal* signa } #endif +#ifdef ERROR_INSERT if (signal->theData[0] == 9003) { - // Migrated to TRPMAN + if (MAX_RECEIVED_SIGNALS < 1024) + { + MAX_RECEIVED_SIGNALS = 1024; + } + else + { + MAX_RECEIVED_SIGNALS = 1 + (rand() % 128); + } + ndbout_c("MAX_RECEIVED_SIGNALS: %d", MAX_RECEIVED_SIGNALS); CLEAR_ERROR_INSERT_VALUE; - sendSignal(TRPMAN_REF, GSN_NDB_TAMPER, signal, signal->getLength(),JBB); } +#endif }//execNDB_TAMPER() static Uint32 blocks[] = @@ -226,7 +250,6 @@ static Uint32 blocks[] = PGMAN_REF, DBINFO_REF, DBSPJ_REF, - TRPMAN_REF, 0 }; @@ -608,12 +631,11 @@ Cmvmi::execEVENT_SUBSCRIBE_REQ(Signal * } void -Cmvmi::execCANCEL_SUBSCRIPTION_REQ(Signal *signal){ +Cmvmi::cancelSubscription(NodeId nodeId){ SubscriberPtr ptr; - NodeId nodeId = signal->theData[0]; - subscribers.first(ptr); + while(ptr.i != RNIL){ Uint32 i = ptr.i; BlockReference blockRef = ptr.p->blockRef; @@ -816,20 +838,17 @@ void Cmvmi::execSTTOR(Signal* signal) #ifdef ERROR_INSERT if (ERROR_INSERTED(9004)) { - Uint32 tmp[25]; Uint32 len = signal->getLength(); - memcpy(tmp, signal->theData, sizeof(tmp)); - Uint32 db = c_dbNodes.find(0); if (db == getOwnNodeId()) db = c_dbNodes.find(db); - - DumpStateOrd * ord = (DumpStateOrd *)&signal->theData[0]; - ord->args[0] = 9005; // Active 9004 - ord->args[1] = db; - sendSignal(TRPMAN_REF, GSN_DUMP_STATE_ORD, signal, 2, JBB); + Uint32 i = c_error_9000_nodes_mask.find(0); + Uint32 tmp[25]; + memcpy(tmp, signal->theData, sizeof(tmp)); + signal->theData[0] = i; + sendSignal(calcQmgrBlockRef(db),GSN_API_FAILREQ, signal, 1, JBA); + ndbout_c("stopping %u using %u", i, db); CLEAR_ERROR_INSERT_VALUE; - memcpy(signal->theData, tmp, sizeof(tmp)); sendSignalWithDelay(reference(), GSN_STTOR, signal, 100, len); @@ -841,6 +860,220 @@ void Cmvmi::execSTTOR(Signal* signal) } } +void Cmvmi::execCLOSE_COMREQ(Signal* signal) +{ + // Close communication with the node and halt input/output from + // other blocks than QMGR + + CloseComReqConf * const closeCom = (CloseComReqConf *)&signal->theData[0]; + + const BlockReference userRef = closeCom->xxxBlockRef; + Uint32 requestType = closeCom->requestType; + Uint32 failNo = closeCom->failNo; +// Uint32 noOfNodes = closeCom->noOfNodes; + + jamEntry(); + for (unsigned i = 0; i < MAX_NODES; i++) + { + if(NodeBitmask::get(closeCom->theNodes, i)) + { + jam(); + + //----------------------------------------------------- + // Report that the connection to the node is closed + //----------------------------------------------------- + signal->theData[0] = NDB_LE_CommunicationClosed; + signal->theData[1] = i; + sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); + + globalTransporterRegistry.setIOState(i, HaltIO); + globalTransporterRegistry.do_disconnect(i); + } + } + if (requestType != CloseComReqConf::RT_NO_REPLY) + { + ndbassert((requestType == CloseComReqConf::RT_API_FAILURE) || + ((requestType == CloseComReqConf::RT_NODE_FAILURE) && + (failNo != 0))); + jam(); + CloseComReqConf* closeComConf = (CloseComReqConf *)signal->getDataPtrSend(); + closeComConf->xxxBlockRef = userRef; + closeComConf->requestType = requestType; + closeComConf->failNo = failNo; + + /* Note assumption that noOfNodes and theNodes + * bitmap is not trampled above + * signals received from the remote node. + */ + sendSignal(QMGR_REF, GSN_CLOSE_COMCONF, signal, 19, JBA); + } +} + +void Cmvmi::execOPEN_COMREQ(Signal* signal) +{ + // Connect to the specifed NDB node, only QMGR allowed communication + // so far with the node + + const BlockReference userRef = signal->theData[0]; + Uint32 tStartingNode = signal->theData[1]; + Uint32 tData2 = signal->theData[2]; + jamEntry(); + + const Uint32 len = signal->getLength(); + if(len == 2) + { +#ifdef ERROR_INSERT + if (! ((ERROR_INSERTED(9000) || ERROR_INSERTED(9002)) + && c_error_9000_nodes_mask.get(tStartingNode))) +#endif + { + globalTransporterRegistry.do_connect(tStartingNode); + globalTransporterRegistry.setIOState(tStartingNode, HaltIO); + + //----------------------------------------------------- + // Report that the connection to the node is opened + //----------------------------------------------------- + signal->theData[0] = NDB_LE_CommunicationOpened; + signal->theData[1] = tStartingNode; + sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); + //----------------------------------------------------- + } + } else { + for(unsigned int i = 1; i < MAX_NODES; i++ ) + { + jam(); + if (i != getOwnNodeId() && getNodeInfo(i).m_type == tData2) + { + jam(); + +#ifdef ERROR_INSERT + if ((ERROR_INSERTED(9000) || ERROR_INSERTED(9002)) + && c_error_9000_nodes_mask.get(i)) + continue; +#endif + globalTransporterRegistry.do_connect(i); + globalTransporterRegistry.setIOState(i, HaltIO); + + signal->theData[0] = NDB_LE_CommunicationOpened; + signal->theData[1] = i; + sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); + } + } + } + + if (userRef != 0) + { + jam(); + signal->theData[0] = tStartingNode; + signal->theData[1] = tData2; + sendSignal(userRef, GSN_OPEN_COMCONF, signal, len - 1,JBA); + } +} + +void Cmvmi::execENABLE_COMREQ(Signal* signal) +{ + jamEntry(); + const EnableComReq *enableComReq = (const EnableComReq *)signal->getDataPtr(); + + /* Need to copy out signal data to not clobber it with sendSignal(). */ + Uint32 senderRef = enableComReq->m_senderRef; + Uint32 senderData = enableComReq->m_senderData; + Uint32 nodes[NodeBitmask::Size]; + MEMCOPY_NO_WORDS(nodes, enableComReq->m_nodeIds, NodeBitmask::Size); + + /* Enable communication with all our NDB blocks to these nodes. */ + Uint32 search_from = 0; + for (;;) + { + Uint32 tStartingNode = NodeBitmask::find(nodes, search_from); + if (tStartingNode == NodeBitmask::NotFound) + break; + search_from = tStartingNode + 1; + + globalTransporterRegistry.setIOState(tStartingNode, NoHalt); + setNodeInfo(tStartingNode).m_connected = true; + + //----------------------------------------------------- + // Report that the version of the node + //----------------------------------------------------- + signal->theData[0] = NDB_LE_ConnectedApiVersion; + signal->theData[1] = tStartingNode; + signal->theData[2] = getNodeInfo(tStartingNode).m_version; + signal->theData[3] = getNodeInfo(tStartingNode).m_mysql_version; + + sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 4, JBB); + //----------------------------------------------------- + } + + EnableComConf *enableComConf = (EnableComConf *)signal->getDataPtrSend(); + enableComConf->m_senderRef = reference(); + enableComConf->m_senderData = senderData; + MEMCOPY_NO_WORDS(enableComConf->m_nodeIds, nodes, NodeBitmask::Size); + sendSignal(senderRef, GSN_ENABLE_COMCONF, signal, + EnableComConf::SignalLength, JBA); +} + +void Cmvmi::execDISCONNECT_REP(Signal *signal) +{ + const DisconnectRep * const rep = (DisconnectRep *)&signal->theData[0]; + const Uint32 hostId = rep->nodeId; + jamEntry(); + + setNodeInfo(hostId).m_connected = false; + setNodeInfo(hostId).m_connectCount++; + const NodeInfo::NodeType type = getNodeInfo(hostId).getType(); + ndbrequire(type != NodeInfo::INVALID); + + sendSignal(QMGR_REF, GSN_DISCONNECT_REP, signal, + DisconnectRep::SignalLength, JBA); + + cancelSubscription(hostId); + + signal->theData[0] = NDB_LE_Disconnected; + signal->theData[1] = hostId; + sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); +} + +void Cmvmi::execCONNECT_REP(Signal *signal){ + const Uint32 hostId = signal->theData[0]; + jamEntry(); + + const NodeInfo::NodeType type = (NodeInfo::NodeType)getNodeInfo(hostId).m_type; + ndbrequire(type != NodeInfo::INVALID); + globalData.m_nodeInfo[hostId].m_version = 0; + globalData.m_nodeInfo[hostId].m_mysql_version = 0; + + /** + * Inform QMGR that client has connected + */ + signal->theData[0] = hostId; + if (ERROR_INSERTED(9005)) + { + sendSignalWithDelay(QMGR_REF, GSN_CONNECT_REP, signal, 50, 1); + } + else + { + sendSignal(QMGR_REF, GSN_CONNECT_REP, signal, 1, JBA); + } + + /* Automatically subscribe events for MGM nodes. + */ + if(type == NodeInfo::MGM) + { + jam(); + globalTransporterRegistry.setIOState(hostId, NoHalt); + } + + //------------------------------------------ + // Also report this event to the Event handler + //------------------------------------------ + signal->theData[0] = NDB_LE_Connected; + signal->theData[1] = hostId; + signal->header.theLength = 2; + + execEVENT_REP(signal); +} + #ifdef VM_TRACE void modifySignalLogger(bool allBlocks, BlockNumber bno, @@ -1558,21 +1791,41 @@ Cmvmi::execDUMP_STATE_ORD(Signal* signal #ifdef ERROR_INSERT if (arg == 9000 || arg == 9002) { - // Migrated to TRPMAN - sendSignal(TRPMAN_REF, GSN_DUMP_STATE_ORD, signal, signal->getLength(),JBB); + SET_ERROR_INSERT_VALUE(arg); + for (Uint32 i = 1; igetLength(); i++) + c_error_9000_nodes_mask.set(signal->theData[i]); } + if (arg == 9001) { - // Migrated to TRPMAN - sendSignal(TRPMAN_REF, GSN_DUMP_STATE_ORD, signal, signal->getLength(),JBB); + CLEAR_ERROR_INSERT_VALUE; + if (signal->getLength() == 1 || signal->theData[1]) + { + for (Uint32 i = 0; itheData[0] = 0; + signal->theData[1] = i; + EXECUTE_DIRECT(CMVMI, GSN_OPEN_COMREQ, signal, 2); + } + } + } + c_error_9000_nodes_mask.clear(); } if (arg == 9004 && signal->getLength() == 2) { SET_ERROR_INSERT_VALUE(9004); + c_error_9000_nodes_mask.clear(); + c_error_9000_nodes_mask.set(signal->theData[1]); + } - // Migrated to TRPMAN - sendSignal(TRPMAN_REF, GSN_DUMP_STATE_ORD, signal, signal->getLength(),JBB); + if (arg == 9004 && signal->getLength() == 2) + { + SET_ERROR_INSERT_VALUE(9004); + c_error_9000_nodes_mask.clear(); + c_error_9000_nodes_mask.set(signal->theData[1]); } #endif @@ -1619,20 +1872,87 @@ Cmvmi::execDUMP_STATE_ORD(Signal* signal if((arg == 9993) || /* Unblock recv from nodeid */ (arg == 9992)) /* Block recv from nodeid */ { - // Migrated to TRPMAN - sendSignal(TRPMAN_REF, GSN_DUMP_STATE_ORD, signal, signal->getLength(),JBB); - } + bool block = (arg == 9992); + for (Uint32 n = 1; n < signal->getLength(); n++) + { + Uint32 nodeId = signal->theData[n]; + + if ((nodeId > 0) && + (nodeId < MAX_NODES)) + { + if (block) + { + ndbout_c("CMVMI : Blocking receive from node %u", nodeId); + + globalTransporterRegistry.blockReceive(nodeId); + } + else + { + ndbout_c("CMVMI : Unblocking receive from node %u", nodeId); + globalTransporterRegistry.unblockReceive(nodeId); + } + } + else + { + ndbout_c("CMVMI : Ignoring dump %u for node %u", + arg, nodeId); + } + } + } if (arg == 9990) /* Block recv from all ndbd matching pattern */ { - // Migrated to TRPMAN - sendSignal(TRPMAN_REF, GSN_DUMP_STATE_ORD, signal, signal->getLength(),JBB); - } + Uint32 pattern = 0; + if (signal->getLength() > 1) + { + pattern = signal->theData[1]; + ndbout_c("CMVMI : Blocking receive from all ndbds matching pattern -%s-", + ((pattern == 1)? "Other side":"Unknown")); + } + for (Uint32 node = 1; node < MAX_NDB_NODES; node++) + { + if (globalTransporterRegistry.is_connected(node)) + { + if (getNodeInfo(node).m_type == NodeInfo::DB) + { + if (!globalTransporterRegistry.isBlocked(node)) + { + switch (pattern) + { + case 1: + { + /* Match if given node is on 'other side' of + * 2-replica cluster + */ + if ((getOwnNodeId() & 1) != (node & 1)) + { + /* Node is on the 'other side', match */ + break; + } + /* Node is on 'my side', don't match */ + continue; + } + default: + break; + } + ndbout_c("CMVMI : Blocking receive from node %u", node); + globalTransporterRegistry.blockReceive(node); + } + } + } + } + } if (arg == 9991) /* Unblock recv from all blocked */ { - // Migrated to TRPMAN - sendSignal(TRPMAN_REF, GSN_DUMP_STATE_ORD, signal, signal->getLength(),JBB); + for (Uint32 node = 0; node < MAX_NODES; node++) + { + if (globalTransporterRegistry.isBlocked(node)) + { + ndbout_c("CMVMI : Unblocking receive from node %u", node); + globalTransporterRegistry.unblockReceive(node); + } + } } #endif @@ -1731,6 +2051,44 @@ void Cmvmi::execDBINFO_SCANREQ(Signal *s jamEntry(); switch(req.tableId){ + case Ndbinfo::TRANSPORTERS_TABLEID: + { + jam(); + Uint32 rnode = cursor->data[0]; + if (rnode == 0) + rnode++; // Skip node 0 + + while(rnode < MAX_NODES) + { + switch(getNodeInfo(rnode).m_type) + { + default: + { + jam(); + Ndbinfo::Row row(signal, req); + row.write_uint32(getOwnNodeId()); // Node id + row.write_uint32(rnode); // Remote node id + row.write_uint32(globalTransporterRegistry.getPerformState(rnode)); // State + ndbinfo_send_row(signal, req, row, rl); + break; + } + + case NodeInfo::INVALID: + jam(); + break; + } + + rnode++; + if (rl.need_break(req)) + { + jam(); + ndbinfo_send_scan_break(signal, req, rl, rnode); + return; + } + } + break; + } + case Ndbinfo::RESOURCES_TABLEID: { jam(); @@ -2777,6 +3135,69 @@ Cmvmi::reportIMUsage(Signal* signal, int sendSignal(ref, GSN_EVENT_REP, signal, 6, JBB); } + +/** + * execROUTE_ORD + * Allows other blocks to route signals as if they + * came from Cmvmi + * Useful in ndbmtd for synchronising signals w.r.t + * external signals received from other nodes which + * arrive from the same thread that runs CMVMI. + */ +void +Cmvmi::execROUTE_ORD(Signal* signal) +{ + jamEntry(); + if(!assembleFragments(signal)){ + jam(); + return; + } + + SectionHandle handle(this, signal); + + RouteOrd* ord = (RouteOrd*)signal->getDataPtr(); + Uint32 dstRef = ord->dstRef; + Uint32 srcRef = ord->srcRef; + Uint32 gsn = ord->gsn; + /* ord->cnt ignored */ + + Uint32 nodeId = refToNode(dstRef); + + if (likely((nodeId == 0) || + getNodeInfo(nodeId).m_connected)) + { + jam(); + Uint32 secCount = handle.m_cnt; + ndbrequire(secCount >= 1 && secCount <= 3); + + jamLine(secCount); + + /** + * Put section 0 in signal->theData + */ + Uint32 sigLen = handle.m_ptr[0].sz; + ndbrequire(sigLen <= 25); + copy(signal->theData, handle.m_ptr[0]); + + SegmentedSectionPtr save = handle.m_ptr[0]; + for (Uint32 i = 0; i < secCount - 1; i++) + handle.m_ptr[i] = handle.m_ptr[i+1]; + handle.m_cnt--; + + sendSignal(dstRef, gsn, signal, sigLen, JBB, &handle); + + handle.m_cnt = 1; + handle.m_ptr[0] = save; + releaseSections(handle); + return ; + } + + releaseSections(handle); + warningEvent("Unable to route GSN: %d from %x to %x", + gsn, srcRef, dstRef); +} + + void Cmvmi::execGET_CONFIG_REQ(Signal *signal) { jamEntry(); === modified file 'storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp' --- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp 2011-11-16 16:23:37 +0000 @@ -18,6 +18,7 @@ #ifndef Cmvmi_H_ #define Cmvmi_H_ +#include #include #include @@ -32,6 +33,16 @@ public: virtual ~Cmvmi(); private: + /** + * These methods used to be reportXXX + * + * But they in a nasty way intefere with the execution model + * they been turned in to exec-Method used via prio A signals + */ + void execDISCONNECT_REP(Signal*); + void execCONNECT_REP(Signal*); + +private: BLOCK_DEFINES(Cmvmi); // The signal processing functions @@ -40,6 +51,9 @@ private: void execEVENT_REP(Signal* signal); void execREAD_CONFIG_REQ(Signal* signal); void execSTTOR(Signal* signal); + void execCLOSE_COMREQ(Signal* signal); + void execENABLE_COMREQ(Signal* signal); + void execOPEN_COMREQ(Signal* signal); void execSIZEALT_ACK(Signal* signal); void execTEST_ORD(Signal* signal); @@ -50,13 +64,15 @@ private: void execDUMP_STATE_ORD(Signal* signal); void execEVENT_SUBSCRIBE_REQ(Signal *); - void execCANCEL_SUBSCRIPTION_REQ(Signal *); + void cancelSubscription(NodeId nodeId); void execTESTSIG(Signal* signal); void execNODE_START_REP(Signal* signal); void execCONTINUEB(Signal* signal); + void execROUTE_ORD(Signal* signal); + void execDBINFO_SCANREQ(Signal *signal); void execALLOC_MEM_REF(Signal*); === modified file 'storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp' --- a/storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp 2011-11-16 16:23:37 +0000 @@ -29,7 +29,7 @@ Uint32 dbinfo_blocks[] = { DBACC, DBTUP, BACKUP, DBTC, SUMA, DBUTIL, TRIX, DBTUX, DBDICT, CMVMI, DBLQH, LGMAN, - PGMAN, DBSPJ, THRMAN, TRPMAN, 0}; + PGMAN, DBSPJ, THRMAN, 0}; Dbinfo::Dbinfo(Block_context& ctx) : SimulatedBlock(DBINFO, ctx) === modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp' --- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2011-11-16 16:23:37 +0000 @@ -606,7 +606,7 @@ void Qmgr::execCM_INFOCONF(Signal* signa signal->theData[0] = 0; // no answer signal->theData[1] = 0; // no id signal->theData[2] = NodeInfo::DB; - sendSignal(TRPMAN_REF, GSN_OPEN_COMREQ, signal, 3, JBB); + sendSignal(CMVMI_REF, GSN_OPEN_COMREQ, signal, 3, JBB); cpresident = ZNIL; cpresidentAlive = ZFALSE; @@ -1987,7 +1987,7 @@ Qmgr::cmAddPrepare(Signal* signal, NodeR nodePtr.p->failState = NORMAL; signal->theData[0] = 0; signal->theData[1] = nodePtr.i; - sendSignal(TRPMAN_REF, GSN_OPEN_COMREQ, signal, 2, JBB); + sendSignal(CMVMI_REF, GSN_OPEN_COMREQ, signal, 2, JBA); #endif return; case ZSTARTING: @@ -2143,8 +2143,8 @@ void Qmgr::execCM_ADD(Signal* signal) enableComReq->m_senderData = ENABLE_COM_CM_ADD_COMMIT; NodeBitmask::clear(enableComReq->m_nodeIds); NodeBitmask::set(enableComReq->m_nodeIds, addNodePtr.i); - sendSignal(TRPMAN_REF, GSN_ENABLE_COMREQ, signal, - EnableComReq::SignalLength, JBB); + sendSignal(CMVMI_REF, GSN_ENABLE_COMREQ, signal, + EnableComReq::SignalLength, JBA); break; } case CmAdd::CommitNew: @@ -2249,8 +2249,8 @@ Qmgr::joinedCluster(Signal* signal, Node if (!NodeBitmask::isclear(enableComReq->m_nodeIds)) { jam(); - sendSignal(TRPMAN_REF, GSN_ENABLE_COMREQ, signal, - EnableComReq::SignalLength, JBB); + sendSignal(CMVMI_REF, GSN_ENABLE_COMREQ, signal, + EnableComReq::SignalLength, JBA); } else { @@ -2924,7 +2924,7 @@ void Qmgr::checkStartInterface(Signal* s setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0; signal->theData[0] = 0; signal->theData[1] = nodePtr.i; - sendSignal(TRPMAN_REF, GSN_OPEN_COMREQ, signal, 2, JBB); + sendSignal(CMVMI_REF, GSN_OPEN_COMREQ, signal, 2, JBA); } else { @@ -3019,7 +3019,6 @@ void Qmgr::sendApiFailReq(Signal* signal RouteOrd* routeOrd = (RouteOrd*) &signal->theData[0]; routeOrd->srcRef = reference(); routeOrd->gsn = GSN_API_FAILREQ; - routeOrd->from = failedNodeNo; NodeRecPtr failedNodePtr; failedNodePtr.i = failedNodeNo; @@ -3039,19 +3038,19 @@ void Qmgr::sendApiFailReq(Signal* signal jam(); add_failconf_block(failedNodePtr, DBTC); routeOrd->dstRef = DBTC_REF; - sendSignalNoRelease(TRPMAN_REF, GSN_ROUTE_ORD, signal, + sendSignalNoRelease(CMVMI_REF, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBA, &handle); add_failconf_block(failedNodePtr, DBDICT); routeOrd->dstRef = DBDICT_REF; - sendSignalNoRelease(TRPMAN_REF, GSN_ROUTE_ORD, signal, + sendSignalNoRelease(CMVMI_REF, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBA, &handle); add_failconf_block(failedNodePtr, DBSPJ); routeOrd->dstRef = DBSPJ_REF; - sendSignalNoRelease(TRPMAN_REF, GSN_ROUTE_ORD, signal, + sendSignalNoRelease(CMVMI_REF, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBA, &handle); } @@ -3059,7 +3058,7 @@ void Qmgr::sendApiFailReq(Signal* signal /* Suma always notified */ add_failconf_block(failedNodePtr, SUMA); routeOrd->dstRef = SUMA_REF; - sendSignal(TRPMAN_REF, GSN_ROUTE_ORD, signal, + sendSignal(CMVMI_REF, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBA, &handle); }//Qmgr::sendApiFailReq() @@ -3418,8 +3417,8 @@ void Qmgr::node_failed(Signal* signal, U closeCom->noOfNodes = 1; NodeBitmask::clear(closeCom->theNodes); NodeBitmask::set(closeCom->theNodes, failedNodePtr.i); - sendSignal(TRPMAN_REF, GSN_CLOSE_COMREQ, signal, - CloseComReqConf::SignalLength, JBB); + sendSignal(CMVMI_REF, GSN_CLOSE_COMREQ, signal, + CloseComReqConf::SignalLength, JBA); }//if return; } @@ -3487,8 +3486,8 @@ Qmgr::api_failed(Signal* signal, Uint32 closeCom->noOfNodes = 1; NodeBitmask::clear(closeCom->theNodes); NodeBitmask::set(closeCom->theNodes, failedNodePtr.i); - sendSignal(TRPMAN_REF, GSN_CLOSE_COMREQ, signal, - CloseComReqConf::SignalLength, JBB); + sendSignal(CMVMI_REF, GSN_CLOSE_COMREQ, signal, + CloseComReqConf::SignalLength, JBA); } // api_failed /**-------------------------------------------------------------------------- @@ -3597,8 +3596,8 @@ void Qmgr::execAPI_REGREQ(Signal* signal enableComReq->m_senderData = ENABLE_COM_API_REGREQ; NodeBitmask::clear(enableComReq->m_nodeIds); NodeBitmask::set(enableComReq->m_nodeIds, apiNodePtr.i); - sendSignal(TRPMAN_REF, GSN_ENABLE_COMREQ, signal, - EnableComReq::SignalLength, JBB); + sendSignal(CMVMI_REF, GSN_ENABLE_COMREQ, signal, + EnableComReq::SignalLength, JBA); return; } } @@ -4997,8 +4996,8 @@ void Qmgr::sendCloseComReq(Signal* signa NodeBitmask::set(closeCom->theNodes, nodeId); } - sendSignal(TRPMAN_REF, GSN_CLOSE_COMREQ, signal, - CloseComReqConf::SignalLength, JBB); + sendSignal(CMVMI_REF, GSN_CLOSE_COMREQ, signal, + CloseComReqConf::SignalLength, JBA); }//Qmgr::sendCloseComReq() === modified file 'storage/ndb/src/kernel/vm/Configuration.cpp' --- a/storage/ndb/src/kernel/vm/Configuration.cpp 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2011-11-16 16:23:37 +0000 @@ -1030,7 +1030,7 @@ Configuration::setLockCPU(NdbThread * pT } else if (!NdbIsMultiThreaded()) { - BlockNumber list[] = { DBDIH }; + BlockNumber list[] = { CMVMI }; res = m_thr_config.do_bind(pThread, list, 1); } === modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp' --- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2011-11-16 16:23:37 +0000 @@ -374,7 +374,7 @@ TransporterCallbackKernel::reportConnect signal.header.theLength = 1; signal.header.theSendersSignalId = 0; signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId); - signal.header.theReceiversBlockNumber = TRPMAN; + signal.header.theReceiversBlockNumber = CMVMI; signal.header.theVerId_signalNumber = GSN_CONNECT_REP; signal.theData[0] = nodeId; @@ -405,7 +405,7 @@ TransporterCallbackKernel::reportDisconn signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId); signal.header.theTrace = TestOrd::TraceDisconnect; signal.header.theVerId_signalNumber = GSN_DISCONNECT_REP; - signal.header.theReceiversBlockNumber = TRPMAN; + signal.header.theReceiversBlockNumber = CMVMI; DisconnectRep * rep = CAST_PTR(DisconnectRep, &signal.theData[0]); rep->nodeId = nodeId; === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/src/kernel/vm/mt.cpp 2011-11-16 16:23:37 +0000 @@ -2603,7 +2603,7 @@ mt_init_thr_map() /* Keep mt-classic assignments in MT LQH. */ const Uint32 thr_GLOBAL = 0; const Uint32 thr_LOCAL = 1; - //const Uint32 thr_RECEIVER = receiver_thread_no; + const Uint32 thr_RECEIVER = receiver_thread_no; add_thr_map(BACKUP, 0, thr_LOCAL); add_thr_map(DBTC, 0, thr_GLOBAL); @@ -2615,7 +2615,7 @@ mt_init_thr_map() add_thr_map(NDBCNTR, 0, thr_GLOBAL); add_thr_map(QMGR, 0, thr_GLOBAL); add_thr_map(NDBFS, 0, thr_GLOBAL); - add_thr_map(CMVMI, 0, thr_GLOBAL); + add_thr_map(CMVMI, 0, thr_RECEIVER); add_thr_map(TRIX, 0, thr_GLOBAL); add_thr_map(DBUTIL, 0, thr_GLOBAL); add_thr_map(SUMA, 0, thr_LOCAL); @@ -2627,7 +2627,6 @@ mt_init_thr_map() add_thr_map(DBINFO, 0, thr_LOCAL); add_thr_map(DBSPJ, 0, thr_GLOBAL); add_thr_map(THRMAN, 0, thr_GLOBAL); - add_thr_map(TRPMAN, 0, thr_GLOBAL); } Uint32 @@ -2649,8 +2648,6 @@ mt_get_instance_count(Uint32 block) case DBSPJ: return globalData.ndbMtTcThreads; break; - case TRPMAN: - return 1; case THRMAN: return num_threads; default: @@ -2688,9 +2685,6 @@ mt_add_thr_map(Uint32 block, Uint32 inst case DBSPJ: thr_no += num_lqh_threads + (instance - 1); break; - case TRPMAN: - thr_no = receiver_thread_no; - break; case THRMAN: thr_no = instance - 1; break; @@ -2821,9 +2815,14 @@ Uint32 receiverThreadId; * As part of the receive loop, we also periodically call update_connections() * (this way we are similar to single-threaded ndbd). * - * The TRPMAN block (and no other blocks) run in the same thread as this + * The CMVMI block (and no other blocks) run in the same thread as this * receive loop; this way we avoid races between update_connections() and - * TRPMAN calls into the transporters. + * CMVMI calls into the transporters. + * + * Note that with this setup, local signals to CMVMI cannot wake up the thread + * if it is sleeping on the receive sockets. Thus CMVMI local signal processing + * can be (slightly) delayed, however CMVMI is not really performance critical + * (hopefully). */ extern "C" void * === modified file 'storage/ndb/src/kernel/vm/mt_thr_config.cpp' --- a/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-11-16 15:38:25 +0000 +++ b/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-11-16 16:23:37 +0000 @@ -928,6 +928,10 @@ THRConfigApplier::find_thread(const unsi { return &m_threads[T_REP][instanceNo]; } + else if ((instanceNo = findBlock(CMVMI, instancelist, cnt)) >= 0) + { + return &m_threads[T_RECV][instanceNo]; + } else if ((instanceNo = findBlock(DBDIH, instancelist, cnt)) >= 0) { return &m_threads[T_MAIN][instanceNo]; @@ -940,10 +944,6 @@ THRConfigApplier::find_thread(const unsi { return &m_threads[T_LDM][instanceNo - 1]; // remove proxy... } - else if ((instanceNo = findBlock(TRPMAN, instancelist, cnt)) >= 0) - { - return &m_threads[T_RECV][instanceNo - 1]; // remove proxy - } return 0; } No bundle (reason: useless for push emails).