From: Jonas Oreland Date: November 16 2011 3:39pm Subject: bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4668 to 4669) List-Archive: http://lists.mysql.com/commits/141996 Message-Id: <20111116153907.E8855165C7@perch.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4669 Jonas Oreland 2011-11-16 ndb - this patch moves transporter handling logic out from cmvmi into trpman (transporter manager) added: storage/ndb/src/kernel/blocks/trpman.cpp storage/ndb/src/kernel/blocks/trpman.hpp modified: mysql-test/suite/ndb/r/ndbinfo.result mysql-test/suite/ndb/r/ndbinfo_dump.result storage/ndb/include/kernel/BlockNumbers.h storage/ndb/include/kernel/GlobalSignalNumbers.h storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp storage/ndb/include/kernel/signaldata/DisconnectRep.hpp storage/ndb/include/kernel/signaldata/EnableCom.hpp storage/ndb/include/kernel/signaldata/RouteOrd.hpp storage/ndb/src/common/debugger/BlockNames.cpp storage/ndb/src/kernel/SimBlockList.cpp storage/ndb/src/kernel/blocks/CMakeLists.txt storage/ndb/src/kernel/blocks/Makefile.am storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp storage/ndb/src/kernel/vm/Configuration.cpp storage/ndb/src/kernel/vm/TransporterCallback.cpp storage/ndb/src/kernel/vm/mt.cpp storage/ndb/src/kernel/vm/mt_thr_config.cpp 4668 Jonas Oreland 2011-11-16 ndb - further cleanups wrt to CFG_DB_NO_REDOLOG_PARTS modified: storage/ndb/include/kernel/ndb_limits.h storage/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp storage/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp storage/ndb/src/kernel/blocks/ndbfs/Ndbfs.cpp storage/ndb/src/kernel/ndbd.cpp === modified file 'mysql-test/suite/ndb/r/ndbinfo.result' --- a/mysql-test/suite/ndb/r/ndbinfo.result 2011-10-28 09:56:57 +0000 +++ b/mysql-test/suite/ndb/r/ndbinfo.result 2011-11-16 15:38:25 +0000 @@ -347,6 +347,7 @@ RESTORE SUMA THRMAN TRIX +TRPMAN TSMAN desc threadstat; Field Type Null Key Default Extra === modified file 'mysql-test/suite/ndb/r/ndbinfo_dump.result' --- a/mysql-test/suite/ndb/r/ndbinfo_dump.result 2011-10-07 09:28:24 +0000 +++ b/mysql-test/suite/ndb/r/ndbinfo_dump.result 2011-11-16 15:38:25 +0000 @@ -1,7 +1,7 @@ USE ndbinfo; select count(*) from blocks; count(*) -22 +23 select count(*) from blocks; count(*) -22 +23 === modified file 'storage/ndb/include/kernel/BlockNumbers.h' --- a/storage/ndb/include/kernel/BlockNumbers.h 2011-10-07 08:07:21 +0000 +++ b/storage/ndb/include/kernel/BlockNumbers.h 2011-11-16 15:38:25 +0000 @@ -61,6 +61,7 @@ #define DBINFO 0x107 #define DBSPJ 0x108 #define THRMAN 0x109 +#define TRPMAN 0x10A const BlockReference BACKUP_REF = numberToRef(BACKUP, 0); const BlockReference DBTC_REF = numberToRef(DBTC, 0); @@ -84,6 +85,7 @@ const BlockReference RESTORE_REF = numbe const BlockReference DBINFO_REF = numberToRef(DBINFO, 0); const BlockReference DBSPJ_REF = numberToRef(DBSPJ, 0); const BlockReference THRMAN_REF = numberToRef(THRMAN, 0); +const BlockReference TRPMAN_REF = numberToRef(TRPMAN, 0); static inline void __hide_warnings_unused_ref_vars(void) { // Hide annoying warnings about unused variables @@ -94,11 +96,11 @@ static inline void __hide_warnings_unuse (void)DBUTIL_REF; (void)SUMA_REF; (void)DBTUX_REF; (void)TSMAN_REF; (void)LGMAN_REF; (void)PGMAN_REF; (void)RESTORE_REF; (void)DBINFO_REF; (void)DBSPJ_REF; - (void)THRMAN_REF; + (void)THRMAN_REF; (void)TRPMAN_REF; } const BlockNumber MIN_BLOCK_NO = BACKUP; -const BlockNumber MAX_BLOCK_NO = THRMAN; +const BlockNumber MAX_BLOCK_NO = TRPMAN; const BlockNumber NO_OF_BLOCKS = (MAX_BLOCK_NO - MIN_BLOCK_NO + 1); /** === modified file 'storage/ndb/include/kernel/GlobalSignalNumbers.h' --- a/storage/ndb/include/kernel/GlobalSignalNumbers.h 2011-07-04 13:37:56 +0000 +++ b/storage/ndb/include/kernel/GlobalSignalNumbers.h 2011-11-16 15:38:25 +0000 @@ -592,7 +592,7 @@ extern const GlobalSignalNumber NO_OF_SI #define GSN_NODE_PING_REQ 461 /* distr. */ #define GSN_NODE_PING_CONF 462 /* distr. */ -/* 463 unused */ +#define GSN_CANCEL_SUBSCRIPTION_REQ 463 /* 464 unused */ #define GSN_DUMP_STATE_ORD 465 === modified file 'storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp' --- a/storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp 2011-06-30 15:55:35 +0000 +++ b/storage/ndb/include/kernel/signaldata/CloseComReqConf.hpp 2011-11-16 15:38:25 +0000 @@ -34,7 +34,7 @@ class CloseComReqConf { * Sender(s) / Reciver(s) */ friend class Qmgr; - friend class Cmvmi; + friend class Trpman; /** * For printing === modified file 'storage/ndb/include/kernel/signaldata/DisconnectRep.hpp' --- a/storage/ndb/include/kernel/signaldata/DisconnectRep.hpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/include/kernel/signaldata/DisconnectRep.hpp 2011-11-16 15:38:25 +0000 @@ -29,7 +29,7 @@ struct DisconnectRep * Receiver(s) */ friend class Qmgr; - friend class Cmvmi; // Cmvmi + friend class Trpman; friend class ClusterMgr; /** === modified file 'storage/ndb/include/kernel/signaldata/EnableCom.hpp' --- a/storage/ndb/include/kernel/signaldata/EnableCom.hpp 2008-05-23 09:26:56 +0000 +++ b/storage/ndb/include/kernel/signaldata/EnableCom.hpp 2011-11-16 15:38:25 +0000 @@ -20,7 +20,7 @@ class EnableComReq { friend class Qmgr; - friend class Cmvmi; + friend class Trpman; public: STATIC_CONST( SignalLength = 2 + NodeBitmask::Size ); @@ -33,7 +33,7 @@ private: class EnableComConf { friend class Qmgr; - friend class Cmvmi; + friend class Trpman; public: STATIC_CONST( SignalLength = 2 + NodeBitmask::Size ); === modified file 'storage/ndb/include/kernel/signaldata/RouteOrd.hpp' --- a/storage/ndb/include/kernel/signaldata/RouteOrd.hpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/include/kernel/signaldata/RouteOrd.hpp 2011-11-16 15:38:25 +0000 @@ -25,13 +25,17 @@ /** * Request to allocate node id */ -struct RouteOrd { +struct RouteOrd +{ STATIC_CONST( SignalLength = 4 ); Uint32 dstRef; Uint32 srcRef; Uint32 gsn; - Uint32 cnt; + union { + Uint32 cnt; + Uint32 from; + }; }; #endif === modified file 'storage/ndb/src/common/debugger/BlockNames.cpp' --- a/storage/ndb/src/common/debugger/BlockNames.cpp 2011-10-07 08:07:21 +0000 +++ b/storage/ndb/src/common/debugger/BlockNames.cpp 2011-11-16 15:38:25 +0000 @@ -41,6 +41,7 @@ const BlockName BlockNames[] = { ,{ "DBINFO", DBINFO } ,{ "DBSPJ", DBSPJ } ,{ "THRMAN", THRMAN } + ,{ "TRPMAN", TRPMAN } }; const BlockNumber NO_OF_BLOCK_NAMES = sizeof(BlockNames) / sizeof(BlockName); === modified file 'storage/ndb/src/kernel/SimBlockList.cpp' --- a/storage/ndb/src/kernel/SimBlockList.cpp 2011-10-07 08:07:21 +0000 +++ b/storage/ndb/src/kernel/SimBlockList.cpp 2011-11-16 15:38:25 +0000 @@ -52,6 +52,7 @@ #include #include #include +#include #include #ifndef VM_TRACE @@ -90,10 +91,6 @@ void * operator new (size_t sz, SIMBLOCK void SimBlockList::load(EmulatorData& data){ noOfBlocks = NO_OF_BLOCKS; -#define THR 1 -#ifndef THR - noOfBlocks--; -#endif theList = new SimulatedBlock * [noOfBlocks]; if (!theList) { @@ -165,14 +162,15 @@ SimBlockList::load(EmulatorData& data){ theList[20] = NEW_BLOCK(Dbspj)(ctx); else theList[20] = NEW_BLOCK(DbspjProxy)(ctx); -#ifdef THR if (NdbIsMultiThreaded() == false) theList[21] = NEW_BLOCK(Thrman)(ctx); else theList[21] = NEW_BLOCK(ThrmanProxy)(ctx); - - assert(NO_OF_BLOCKS == 22); -#endif + if (NdbIsMultiThreaded() == false) + theList[22] = NEW_BLOCK(Trpman)(ctx); + else + theList[22] = NEW_BLOCK(TrpmanProxy)(ctx); + assert(NO_OF_BLOCKS == 23); // Check that all blocks could be created for (int i = 0; i < noOfBlocks; i++) === modified file 'storage/ndb/src/kernel/blocks/CMakeLists.txt' --- a/storage/ndb/src/kernel/blocks/CMakeLists.txt 2011-10-07 08:07:21 +0000 +++ b/storage/ndb/src/kernel/blocks/CMakeLists.txt 2011-11-16 15:38:25 +0000 @@ -73,7 +73,8 @@ ADD_LIBRARY(ndbblocks STATIC PgmanProxy.cpp dbtup/DbtupClient.cpp ${EXTRA_SRC} - thrman.cpp) + thrman.cpp + trpman.cpp) MYSQL_ADD_EXECUTABLE(ndb_print_file print_file.cpp === modified file 'storage/ndb/src/kernel/blocks/Makefile.am' --- a/storage/ndb/src/kernel/blocks/Makefile.am 2011-10-07 08:07:21 +0000 +++ b/storage/ndb/src/kernel/blocks/Makefile.am 2011-11-16 15:38:25 +0000 @@ -69,7 +69,8 @@ libblocks_a_SOURCES = tsman.cpp lgman.cp PgmanProxy.cpp \ dbtup/DbtupClient.cpp \ dbtc/DbtcProxy.cpp \ - thrman.cpp + thrman.cpp \ + trpman.cpp ndbtools_PROGRAMS = ndb_print_file ndb_print_file_SOURCES = print_file.cpp diskpage.cpp dbtup/tuppage.cpp === modified file 'storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp' --- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2011-10-14 13:24:26 +0000 +++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.cpp 2011-11-16 15:38:25 +0000 @@ -19,7 +19,6 @@ #include #include -#include #include #include #include @@ -33,13 +32,9 @@ #include #include #include -#include #include #include #include -#include -#include -#include #include #include #include @@ -91,17 +86,11 @@ Cmvmi::Cmvmi(Block_context& ctx) : mt_set_section_chunk_size(); // Add received signals - addRecSignal(GSN_CONNECT_REP, &Cmvmi::execCONNECT_REP); - addRecSignal(GSN_DISCONNECT_REP, &Cmvmi::execDISCONNECT_REP); - addRecSignal(GSN_NDB_TAMPER, &Cmvmi::execNDB_TAMPER, true); addRecSignal(GSN_SET_LOGLEVELORD, &Cmvmi::execSET_LOGLEVELORD); addRecSignal(GSN_EVENT_REP, &Cmvmi::execEVENT_REP); addRecSignal(GSN_STTOR, &Cmvmi::execSTTOR); addRecSignal(GSN_READ_CONFIG_REQ, &Cmvmi::execREAD_CONFIG_REQ); - addRecSignal(GSN_CLOSE_COMREQ, &Cmvmi::execCLOSE_COMREQ); - addRecSignal(GSN_ENABLE_COMREQ, &Cmvmi::execENABLE_COMREQ); - addRecSignal(GSN_OPEN_COMREQ, &Cmvmi::execOPEN_COMREQ); addRecSignal(GSN_TEST_ORD, &Cmvmi::execTEST_ORD); addRecSignal(GSN_TAMPER_ORD, &Cmvmi::execTAMPER_ORD); @@ -109,6 +98,8 @@ Cmvmi::Cmvmi(Block_context& ctx) : addRecSignal(GSN_START_ORD, &Cmvmi::execSTART_ORD); addRecSignal(GSN_EVENT_SUBSCRIBE_REQ, &Cmvmi::execEVENT_SUBSCRIBE_REQ); + addRecSignal(GSN_CANCEL_SUBSCRIPTION_REQ, + &Cmvmi::execCANCEL_SUBSCRIPTION_REQ); addRecSignal(GSN_DUMP_STATE_ORD, &Cmvmi::execDUMP_STATE_ORD); @@ -116,7 +107,6 @@ Cmvmi::Cmvmi(Block_context& ctx) : addRecSignal(GSN_NODE_START_REP, &Cmvmi::execNODE_START_REP, true); addRecSignal(GSN_CONTINUEB, &Cmvmi::execCONTINUEB); - addRecSignal(GSN_ROUTE_ORD, &Cmvmi::execROUTE_ORD); addRecSignal(GSN_DBINFO_SCANREQ, &Cmvmi::execDBINFO_SCANREQ); addRecSignal(GSN_SYNC_REQ, &Cmvmi::execSYNC_REQ, true); @@ -179,11 +169,6 @@ Cmvmi::~Cmvmi() m_shared_page_pool.clear(); } -#ifdef ERROR_INSERT -NodeBitmask c_error_9000_nodes_mask; -extern Uint32 MAX_RECEIVED_SIGNALS; -#endif - void Cmvmi::execNDB_TAMPER(Signal* signal) { jamEntry(); @@ -212,21 +197,12 @@ void Cmvmi::execNDB_TAMPER(Signal* signa } #endif -#ifdef ERROR_INSERT if (signal->theData[0] == 9003) { - if (MAX_RECEIVED_SIGNALS < 1024) - { - MAX_RECEIVED_SIGNALS = 1024; - } - else - { - MAX_RECEIVED_SIGNALS = 1 + (rand() % 128); - } - ndbout_c("MAX_RECEIVED_SIGNALS: %d", MAX_RECEIVED_SIGNALS); + // Migrated to TRPMAN CLEAR_ERROR_INSERT_VALUE; + sendSignal(TRPMAN_REF, GSN_NDB_TAMPER, signal, signal->getLength(),JBB); } -#endif }//execNDB_TAMPER() static Uint32 blocks[] = @@ -250,6 +226,7 @@ static Uint32 blocks[] = PGMAN_REF, DBINFO_REF, DBSPJ_REF, + TRPMAN_REF, 0 }; @@ -631,11 +608,12 @@ Cmvmi::execEVENT_SUBSCRIBE_REQ(Signal * } void -Cmvmi::cancelSubscription(NodeId nodeId){ +Cmvmi::execCANCEL_SUBSCRIPTION_REQ(Signal *signal){ SubscriberPtr ptr; + NodeId nodeId = signal->theData[0]; + subscribers.first(ptr); - while(ptr.i != RNIL){ Uint32 i = ptr.i; BlockReference blockRef = ptr.p->blockRef; @@ -838,17 +816,20 @@ void Cmvmi::execSTTOR(Signal* signal) #ifdef ERROR_INSERT if (ERROR_INSERTED(9004)) { + Uint32 tmp[25]; Uint32 len = signal->getLength(); + memcpy(tmp, signal->theData, sizeof(tmp)); + Uint32 db = c_dbNodes.find(0); if (db == getOwnNodeId()) db = c_dbNodes.find(db); - Uint32 i = c_error_9000_nodes_mask.find(0); - Uint32 tmp[25]; - memcpy(tmp, signal->theData, sizeof(tmp)); - signal->theData[0] = i; - sendSignal(calcQmgrBlockRef(db),GSN_API_FAILREQ, signal, 1, JBA); - ndbout_c("stopping %u using %u", i, db); + + DumpStateOrd * ord = (DumpStateOrd *)&signal->theData[0]; + ord->args[0] = 9005; // Active 9004 + ord->args[1] = db; + sendSignal(TRPMAN_REF, GSN_DUMP_STATE_ORD, signal, 2, JBB); CLEAR_ERROR_INSERT_VALUE; + memcpy(signal->theData, tmp, sizeof(tmp)); sendSignalWithDelay(reference(), GSN_STTOR, signal, 100, len); @@ -860,220 +841,6 @@ void Cmvmi::execSTTOR(Signal* signal) } } -void Cmvmi::execCLOSE_COMREQ(Signal* signal) -{ - // Close communication with the node and halt input/output from - // other blocks than QMGR - - CloseComReqConf * const closeCom = (CloseComReqConf *)&signal->theData[0]; - - const BlockReference userRef = closeCom->xxxBlockRef; - Uint32 requestType = closeCom->requestType; - Uint32 failNo = closeCom->failNo; -// Uint32 noOfNodes = closeCom->noOfNodes; - - jamEntry(); - for (unsigned i = 0; i < MAX_NODES; i++) - { - if(NodeBitmask::get(closeCom->theNodes, i)) - { - jam(); - - //----------------------------------------------------- - // Report that the connection to the node is closed - //----------------------------------------------------- - signal->theData[0] = NDB_LE_CommunicationClosed; - signal->theData[1] = i; - sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); - - globalTransporterRegistry.setIOState(i, HaltIO); - globalTransporterRegistry.do_disconnect(i); - } - } - if (requestType != CloseComReqConf::RT_NO_REPLY) - { - ndbassert((requestType == CloseComReqConf::RT_API_FAILURE) || - ((requestType == CloseComReqConf::RT_NODE_FAILURE) && - (failNo != 0))); - jam(); - CloseComReqConf* closeComConf = (CloseComReqConf *)signal->getDataPtrSend(); - closeComConf->xxxBlockRef = userRef; - closeComConf->requestType = requestType; - closeComConf->failNo = failNo; - - /* Note assumption that noOfNodes and theNodes - * bitmap is not trampled above - * signals received from the remote node. - */ - sendSignal(QMGR_REF, GSN_CLOSE_COMCONF, signal, 19, JBA); - } -} - -void Cmvmi::execOPEN_COMREQ(Signal* signal) -{ - // Connect to the specifed NDB node, only QMGR allowed communication - // so far with the node - - const BlockReference userRef = signal->theData[0]; - Uint32 tStartingNode = signal->theData[1]; - Uint32 tData2 = signal->theData[2]; - jamEntry(); - - const Uint32 len = signal->getLength(); - if(len == 2) - { -#ifdef ERROR_INSERT - if (! ((ERROR_INSERTED(9000) || ERROR_INSERTED(9002)) - && c_error_9000_nodes_mask.get(tStartingNode))) -#endif - { - globalTransporterRegistry.do_connect(tStartingNode); - globalTransporterRegistry.setIOState(tStartingNode, HaltIO); - - //----------------------------------------------------- - // Report that the connection to the node is opened - //----------------------------------------------------- - signal->theData[0] = NDB_LE_CommunicationOpened; - signal->theData[1] = tStartingNode; - sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); - //----------------------------------------------------- - } - } else { - for(unsigned int i = 1; i < MAX_NODES; i++ ) - { - jam(); - if (i != getOwnNodeId() && getNodeInfo(i).m_type == tData2) - { - jam(); - -#ifdef ERROR_INSERT - if ((ERROR_INSERTED(9000) || ERROR_INSERTED(9002)) - && c_error_9000_nodes_mask.get(i)) - continue; -#endif - globalTransporterRegistry.do_connect(i); - globalTransporterRegistry.setIOState(i, HaltIO); - - signal->theData[0] = NDB_LE_CommunicationOpened; - signal->theData[1] = i; - sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); - } - } - } - - if (userRef != 0) - { - jam(); - signal->theData[0] = tStartingNode; - signal->theData[1] = tData2; - sendSignal(userRef, GSN_OPEN_COMCONF, signal, len - 1,JBA); - } -} - -void Cmvmi::execENABLE_COMREQ(Signal* signal) -{ - jamEntry(); - const EnableComReq *enableComReq = (const EnableComReq *)signal->getDataPtr(); - - /* Need to copy out signal data to not clobber it with sendSignal(). */ - Uint32 senderRef = enableComReq->m_senderRef; - Uint32 senderData = enableComReq->m_senderData; - Uint32 nodes[NodeBitmask::Size]; - MEMCOPY_NO_WORDS(nodes, enableComReq->m_nodeIds, NodeBitmask::Size); - - /* Enable communication with all our NDB blocks to these nodes. */ - Uint32 search_from = 0; - for (;;) - { - Uint32 tStartingNode = NodeBitmask::find(nodes, search_from); - if (tStartingNode == NodeBitmask::NotFound) - break; - search_from = tStartingNode + 1; - - globalTransporterRegistry.setIOState(tStartingNode, NoHalt); - setNodeInfo(tStartingNode).m_connected = true; - - //----------------------------------------------------- - // Report that the version of the node - //----------------------------------------------------- - signal->theData[0] = NDB_LE_ConnectedApiVersion; - signal->theData[1] = tStartingNode; - signal->theData[2] = getNodeInfo(tStartingNode).m_version; - signal->theData[3] = getNodeInfo(tStartingNode).m_mysql_version; - - sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 4, JBB); - //----------------------------------------------------- - } - - EnableComConf *enableComConf = (EnableComConf *)signal->getDataPtrSend(); - enableComConf->m_senderRef = reference(); - enableComConf->m_senderData = senderData; - MEMCOPY_NO_WORDS(enableComConf->m_nodeIds, nodes, NodeBitmask::Size); - sendSignal(senderRef, GSN_ENABLE_COMCONF, signal, - EnableComConf::SignalLength, JBA); -} - -void Cmvmi::execDISCONNECT_REP(Signal *signal) -{ - const DisconnectRep * const rep = (DisconnectRep *)&signal->theData[0]; - const Uint32 hostId = rep->nodeId; - jamEntry(); - - setNodeInfo(hostId).m_connected = false; - setNodeInfo(hostId).m_connectCount++; - const NodeInfo::NodeType type = getNodeInfo(hostId).getType(); - ndbrequire(type != NodeInfo::INVALID); - - sendSignal(QMGR_REF, GSN_DISCONNECT_REP, signal, - DisconnectRep::SignalLength, JBA); - - cancelSubscription(hostId); - - signal->theData[0] = NDB_LE_Disconnected; - signal->theData[1] = hostId; - sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); -} - -void Cmvmi::execCONNECT_REP(Signal *signal){ - const Uint32 hostId = signal->theData[0]; - jamEntry(); - - const NodeInfo::NodeType type = (NodeInfo::NodeType)getNodeInfo(hostId).m_type; - ndbrequire(type != NodeInfo::INVALID); - globalData.m_nodeInfo[hostId].m_version = 0; - globalData.m_nodeInfo[hostId].m_mysql_version = 0; - - /** - * Inform QMGR that client has connected - */ - signal->theData[0] = hostId; - if (ERROR_INSERTED(9005)) - { - sendSignalWithDelay(QMGR_REF, GSN_CONNECT_REP, signal, 50, 1); - } - else - { - sendSignal(QMGR_REF, GSN_CONNECT_REP, signal, 1, JBA); - } - - /* Automatically subscribe events for MGM nodes. - */ - if(type == NodeInfo::MGM) - { - jam(); - globalTransporterRegistry.setIOState(hostId, NoHalt); - } - - //------------------------------------------ - // Also report this event to the Event handler - //------------------------------------------ - signal->theData[0] = NDB_LE_Connected; - signal->theData[1] = hostId; - signal->header.theLength = 2; - - execEVENT_REP(signal); -} - #ifdef VM_TRACE void modifySignalLogger(bool allBlocks, BlockNumber bno, @@ -1791,41 +1558,21 @@ Cmvmi::execDUMP_STATE_ORD(Signal* signal #ifdef ERROR_INSERT if (arg == 9000 || arg == 9002) { - SET_ERROR_INSERT_VALUE(arg); - for (Uint32 i = 1; igetLength(); i++) - c_error_9000_nodes_mask.set(signal->theData[i]); + // Migrated to TRPMAN + sendSignal(TRPMAN_REF, GSN_DUMP_STATE_ORD, signal, signal->getLength(),JBB); } - if (arg == 9001) { - CLEAR_ERROR_INSERT_VALUE; - if (signal->getLength() == 1 || signal->theData[1]) - { - for (Uint32 i = 0; itheData[0] = 0; - signal->theData[1] = i; - EXECUTE_DIRECT(CMVMI, GSN_OPEN_COMREQ, signal, 2); - } - } - } - c_error_9000_nodes_mask.clear(); + // Migrated to TRPMAN + sendSignal(TRPMAN_REF, GSN_DUMP_STATE_ORD, signal, signal->getLength(),JBB); } if (arg == 9004 && signal->getLength() == 2) { SET_ERROR_INSERT_VALUE(9004); - c_error_9000_nodes_mask.clear(); - c_error_9000_nodes_mask.set(signal->theData[1]); - } - if (arg == 9004 && signal->getLength() == 2) - { - SET_ERROR_INSERT_VALUE(9004); - c_error_9000_nodes_mask.clear(); - c_error_9000_nodes_mask.set(signal->theData[1]); + // Migrated to TRPMAN + sendSignal(TRPMAN_REF, GSN_DUMP_STATE_ORD, signal, signal->getLength(),JBB); } #endif @@ -1872,87 +1619,20 @@ Cmvmi::execDUMP_STATE_ORD(Signal* signal if((arg == 9993) || /* Unblock recv from nodeid */ (arg == 9992)) /* Block recv from nodeid */ { - bool block = (arg == 9992); - for (Uint32 n = 1; n < signal->getLength(); n++) - { - Uint32 nodeId = signal->theData[n]; - - if ((nodeId > 0) && - (nodeId < MAX_NODES)) - { - if (block) - { - ndbout_c("CMVMI : Blocking receive from node %u", nodeId); - - globalTransporterRegistry.blockReceive(nodeId); - } - else - { - ndbout_c("CMVMI : Unblocking receive from node %u", nodeId); - - globalTransporterRegistry.unblockReceive(nodeId); - } - } - else - { - ndbout_c("CMVMI : Ignoring dump %u for node %u", - arg, nodeId); - } - } + // Migrated to TRPMAN + sendSignal(TRPMAN_REF, GSN_DUMP_STATE_ORD, signal, signal->getLength(),JBB); } + if (arg == 9990) /* Block recv from all ndbd matching pattern */ { - Uint32 pattern = 0; - if (signal->getLength() > 1) - { - pattern = signal->theData[1]; - ndbout_c("CMVMI : Blocking receive from all ndbds matching pattern -%s-", - ((pattern == 1)? "Other side":"Unknown")); - } - - for (Uint32 node = 1; node < MAX_NDB_NODES; node++) - { - if (globalTransporterRegistry.is_connected(node)) - { - if (getNodeInfo(node).m_type == NodeInfo::DB) - { - if (!globalTransporterRegistry.isBlocked(node)) - { - switch (pattern) - { - case 1: - { - /* Match if given node is on 'other side' of - * 2-replica cluster - */ - if ((getOwnNodeId() & 1) != (node & 1)) - { - /* Node is on the 'other side', match */ - break; - } - /* Node is on 'my side', don't match */ - continue; - } - default: - break; - } - ndbout_c("CMVMI : Blocking receive from node %u", node); - globalTransporterRegistry.blockReceive(node); - } - } - } - } + // Migrated to TRPMAN + sendSignal(TRPMAN_REF, GSN_DUMP_STATE_ORD, signal, signal->getLength(),JBB); } + if (arg == 9991) /* Unblock recv from all blocked */ { - for (Uint32 node = 0; node < MAX_NODES; node++) - { - if (globalTransporterRegistry.isBlocked(node)) - { - ndbout_c("CMVMI : Unblocking receive from node %u", node); - globalTransporterRegistry.unblockReceive(node); - } - } + // Migrated to TRPMAN + sendSignal(TRPMAN_REF, GSN_DUMP_STATE_ORD, signal, signal->getLength(),JBB); } #endif @@ -2051,44 +1731,6 @@ void Cmvmi::execDBINFO_SCANREQ(Signal *s jamEntry(); switch(req.tableId){ - case Ndbinfo::TRANSPORTERS_TABLEID: - { - jam(); - Uint32 rnode = cursor->data[0]; - if (rnode == 0) - rnode++; // Skip node 0 - - while(rnode < MAX_NODES) - { - switch(getNodeInfo(rnode).m_type) - { - default: - { - jam(); - Ndbinfo::Row row(signal, req); - row.write_uint32(getOwnNodeId()); // Node id - row.write_uint32(rnode); // Remote node id - row.write_uint32(globalTransporterRegistry.getPerformState(rnode)); // State - ndbinfo_send_row(signal, req, row, rl); - break; - } - - case NodeInfo::INVALID: - jam(); - break; - } - - rnode++; - if (rl.need_break(req)) - { - jam(); - ndbinfo_send_scan_break(signal, req, rl, rnode); - return; - } - } - break; - } - case Ndbinfo::RESOURCES_TABLEID: { jam(); @@ -3135,69 +2777,6 @@ Cmvmi::reportIMUsage(Signal* signal, int sendSignal(ref, GSN_EVENT_REP, signal, 6, JBB); } - -/** - * execROUTE_ORD - * Allows other blocks to route signals as if they - * came from Cmvmi - * Useful in ndbmtd for synchronising signals w.r.t - * external signals received from other nodes which - * arrive from the same thread that runs CMVMI. - */ -void -Cmvmi::execROUTE_ORD(Signal* signal) -{ - jamEntry(); - if(!assembleFragments(signal)){ - jam(); - return; - } - - SectionHandle handle(this, signal); - - RouteOrd* ord = (RouteOrd*)signal->getDataPtr(); - Uint32 dstRef = ord->dstRef; - Uint32 srcRef = ord->srcRef; - Uint32 gsn = ord->gsn; - /* ord->cnt ignored */ - - Uint32 nodeId = refToNode(dstRef); - - if (likely((nodeId == 0) || - getNodeInfo(nodeId).m_connected)) - { - jam(); - Uint32 secCount = handle.m_cnt; - ndbrequire(secCount >= 1 && secCount <= 3); - - jamLine(secCount); - - /** - * Put section 0 in signal->theData - */ - Uint32 sigLen = handle.m_ptr[0].sz; - ndbrequire(sigLen <= 25); - copy(signal->theData, handle.m_ptr[0]); - - SegmentedSectionPtr save = handle.m_ptr[0]; - for (Uint32 i = 0; i < secCount - 1; i++) - handle.m_ptr[i] = handle.m_ptr[i+1]; - handle.m_cnt--; - - sendSignal(dstRef, gsn, signal, sigLen, JBB, &handle); - - handle.m_cnt = 1; - handle.m_ptr[0] = save; - releaseSections(handle); - return ; - } - - releaseSections(handle); - warningEvent("Unable to route GSN: %d from %x to %x", - gsn, srcRef, dstRef); -} - - void Cmvmi::execGET_CONFIG_REQ(Signal *signal) { jamEntry(); === modified file 'storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp' --- a/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp 2011-07-04 13:37:56 +0000 +++ b/storage/ndb/src/kernel/blocks/cmvmi/Cmvmi.hpp 2011-11-16 15:38:25 +0000 @@ -18,7 +18,6 @@ #ifndef Cmvmi_H_ #define Cmvmi_H_ -#include #include #include @@ -33,16 +32,6 @@ public: virtual ~Cmvmi(); private: - /** - * These methods used to be reportXXX - * - * But they in a nasty way intefere with the execution model - * they been turned in to exec-Method used via prio A signals - */ - void execDISCONNECT_REP(Signal*); - void execCONNECT_REP(Signal*); - -private: BLOCK_DEFINES(Cmvmi); // The signal processing functions @@ -51,9 +40,6 @@ private: void execEVENT_REP(Signal* signal); void execREAD_CONFIG_REQ(Signal* signal); void execSTTOR(Signal* signal); - void execCLOSE_COMREQ(Signal* signal); - void execENABLE_COMREQ(Signal* signal); - void execOPEN_COMREQ(Signal* signal); void execSIZEALT_ACK(Signal* signal); void execTEST_ORD(Signal* signal); @@ -64,15 +50,13 @@ private: void execDUMP_STATE_ORD(Signal* signal); void execEVENT_SUBSCRIBE_REQ(Signal *); - void cancelSubscription(NodeId nodeId); + void execCANCEL_SUBSCRIPTION_REQ(Signal *); void execTESTSIG(Signal* signal); void execNODE_START_REP(Signal* signal); void execCONTINUEB(Signal* signal); - void execROUTE_ORD(Signal* signal); - void execDBINFO_SCANREQ(Signal *signal); void execALLOC_MEM_REF(Signal*); === modified file 'storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp' --- a/storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp 2011-10-07 08:07:21 +0000 +++ b/storage/ndb/src/kernel/blocks/dbinfo/Dbinfo.cpp 2011-11-16 15:38:25 +0000 @@ -29,7 +29,7 @@ Uint32 dbinfo_blocks[] = { DBACC, DBTUP, BACKUP, DBTC, SUMA, DBUTIL, TRIX, DBTUX, DBDICT, CMVMI, DBLQH, LGMAN, - PGMAN, DBSPJ, THRMAN, 0}; + PGMAN, DBSPJ, THRMAN, TRPMAN, 0}; Dbinfo::Dbinfo(Block_context& ctx) : SimulatedBlock(DBINFO, ctx) === modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp' --- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2011-10-20 12:35:16 +0000 +++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp 2011-11-16 15:38:25 +0000 @@ -606,7 +606,7 @@ void Qmgr::execCM_INFOCONF(Signal* signa signal->theData[0] = 0; // no answer signal->theData[1] = 0; // no id signal->theData[2] = NodeInfo::DB; - sendSignal(CMVMI_REF, GSN_OPEN_COMREQ, signal, 3, JBB); + sendSignal(TRPMAN_REF, GSN_OPEN_COMREQ, signal, 3, JBB); cpresident = ZNIL; cpresidentAlive = ZFALSE; @@ -1987,7 +1987,7 @@ Qmgr::cmAddPrepare(Signal* signal, NodeR nodePtr.p->failState = NORMAL; signal->theData[0] = 0; signal->theData[1] = nodePtr.i; - sendSignal(CMVMI_REF, GSN_OPEN_COMREQ, signal, 2, JBA); + sendSignal(TRPMAN_REF, GSN_OPEN_COMREQ, signal, 2, JBB); #endif return; case ZSTARTING: @@ -2143,8 +2143,8 @@ void Qmgr::execCM_ADD(Signal* signal) enableComReq->m_senderData = ENABLE_COM_CM_ADD_COMMIT; NodeBitmask::clear(enableComReq->m_nodeIds); NodeBitmask::set(enableComReq->m_nodeIds, addNodePtr.i); - sendSignal(CMVMI_REF, GSN_ENABLE_COMREQ, signal, - EnableComReq::SignalLength, JBA); + sendSignal(TRPMAN_REF, GSN_ENABLE_COMREQ, signal, + EnableComReq::SignalLength, JBB); break; } case CmAdd::CommitNew: @@ -2249,8 +2249,8 @@ Qmgr::joinedCluster(Signal* signal, Node if (!NodeBitmask::isclear(enableComReq->m_nodeIds)) { jam(); - sendSignal(CMVMI_REF, GSN_ENABLE_COMREQ, signal, - EnableComReq::SignalLength, JBA); + sendSignal(TRPMAN_REF, GSN_ENABLE_COMREQ, signal, + EnableComReq::SignalLength, JBB); } else { @@ -2924,7 +2924,7 @@ void Qmgr::checkStartInterface(Signal* s setNodeInfo(nodePtr.i).m_heartbeat_cnt= 0; signal->theData[0] = 0; signal->theData[1] = nodePtr.i; - sendSignal(CMVMI_REF, GSN_OPEN_COMREQ, signal, 2, JBA); + sendSignal(TRPMAN_REF, GSN_OPEN_COMREQ, signal, 2, JBB); } else { @@ -3019,6 +3019,7 @@ void Qmgr::sendApiFailReq(Signal* signal RouteOrd* routeOrd = (RouteOrd*) &signal->theData[0]; routeOrd->srcRef = reference(); routeOrd->gsn = GSN_API_FAILREQ; + routeOrd->from = failedNodeNo; NodeRecPtr failedNodePtr; failedNodePtr.i = failedNodeNo; @@ -3038,19 +3039,19 @@ void Qmgr::sendApiFailReq(Signal* signal jam(); add_failconf_block(failedNodePtr, DBTC); routeOrd->dstRef = DBTC_REF; - sendSignalNoRelease(CMVMI_REF, GSN_ROUTE_ORD, signal, + sendSignalNoRelease(TRPMAN_REF, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBA, &handle); add_failconf_block(failedNodePtr, DBDICT); routeOrd->dstRef = DBDICT_REF; - sendSignalNoRelease(CMVMI_REF, GSN_ROUTE_ORD, signal, + sendSignalNoRelease(TRPMAN_REF, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBA, &handle); add_failconf_block(failedNodePtr, DBSPJ); routeOrd->dstRef = DBSPJ_REF; - sendSignalNoRelease(CMVMI_REF, GSN_ROUTE_ORD, signal, + sendSignalNoRelease(TRPMAN_REF, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBA, &handle); } @@ -3058,7 +3059,7 @@ void Qmgr::sendApiFailReq(Signal* signal /* Suma always notified */ add_failconf_block(failedNodePtr, SUMA); routeOrd->dstRef = SUMA_REF; - sendSignal(CMVMI_REF, GSN_ROUTE_ORD, signal, + sendSignal(TRPMAN_REF, GSN_ROUTE_ORD, signal, RouteOrd::SignalLength, JBA, &handle); }//Qmgr::sendApiFailReq() @@ -3417,8 +3418,8 @@ void Qmgr::node_failed(Signal* signal, U closeCom->noOfNodes = 1; NodeBitmask::clear(closeCom->theNodes); NodeBitmask::set(closeCom->theNodes, failedNodePtr.i); - sendSignal(CMVMI_REF, GSN_CLOSE_COMREQ, signal, - CloseComReqConf::SignalLength, JBA); + sendSignal(TRPMAN_REF, GSN_CLOSE_COMREQ, signal, + CloseComReqConf::SignalLength, JBB); }//if return; } @@ -3486,8 +3487,8 @@ Qmgr::api_failed(Signal* signal, Uint32 closeCom->noOfNodes = 1; NodeBitmask::clear(closeCom->theNodes); NodeBitmask::set(closeCom->theNodes, failedNodePtr.i); - sendSignal(CMVMI_REF, GSN_CLOSE_COMREQ, signal, - CloseComReqConf::SignalLength, JBA); + sendSignal(TRPMAN_REF, GSN_CLOSE_COMREQ, signal, + CloseComReqConf::SignalLength, JBB); } // api_failed /**-------------------------------------------------------------------------- @@ -3596,8 +3597,8 @@ void Qmgr::execAPI_REGREQ(Signal* signal enableComReq->m_senderData = ENABLE_COM_API_REGREQ; NodeBitmask::clear(enableComReq->m_nodeIds); NodeBitmask::set(enableComReq->m_nodeIds, apiNodePtr.i); - sendSignal(CMVMI_REF, GSN_ENABLE_COMREQ, signal, - EnableComReq::SignalLength, JBA); + sendSignal(TRPMAN_REF, GSN_ENABLE_COMREQ, signal, + EnableComReq::SignalLength, JBB); return; } } @@ -4996,8 +4997,8 @@ void Qmgr::sendCloseComReq(Signal* signa NodeBitmask::set(closeCom->theNodes, nodeId); } - sendSignal(CMVMI_REF, GSN_CLOSE_COMREQ, signal, - CloseComReqConf::SignalLength, JBA); + sendSignal(TRPMAN_REF, GSN_CLOSE_COMREQ, signal, + CloseComReqConf::SignalLength, JBB); }//Qmgr::sendCloseComReq() === added file 'storage/ndb/src/kernel/blocks/trpman.cpp' --- a/storage/ndb/src/kernel/blocks/trpman.cpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/blocks/trpman.cpp 2011-11-16 15:38:25 +0000 @@ -0,0 +1,648 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include "trpman.hpp" +#include +#include +#include +#include +#include +#include + +Trpman::Trpman(Block_context & ctx, Uint32 instanceno) : + SimulatedBlock(TRPMAN, ctx, instanceno) +{ + BLOCK_CONSTRUCTOR(Trpman); + + addRecSignal(GSN_CLOSE_COMREQ, &Trpman::execCLOSE_COMREQ); + addRecSignal(GSN_OPEN_COMREQ, &Trpman::execOPEN_COMREQ); + addRecSignal(GSN_ENABLE_COMREQ, &Trpman::execENABLE_COMREQ); + addRecSignal(GSN_DISCONNECT_REP, &Trpman::execDISCONNECT_REP); + addRecSignal(GSN_CONNECT_REP, &Trpman::execCONNECT_REP); + addRecSignal(GSN_ROUTE_ORD, &Trpman::execROUTE_ORD); + + addRecSignal(GSN_NDB_TAMPER, &Trpman::execNDB_TAMPER, true); + addRecSignal(GSN_DUMP_STATE_ORD, &Trpman::execDUMP_STATE_ORD); + addRecSignal(GSN_DBINFO_SCANREQ, &Trpman::execDBINFO_SCANREQ); +} + +Trpman::~Trpman() +{ +} + +BLOCK_FUNCTIONS(Trpman) + +#ifdef ERROR_INSERT +NodeBitmask c_error_9000_nodes_mask; +extern Uint32 MAX_RECEIVED_SIGNALS; +#endif + +void +Trpman::execOPEN_COMREQ(Signal* signal) +{ + // Connect to the specifed NDB node, only QMGR allowed communication + // so far with the node + + const BlockReference userRef = signal->theData[0]; + Uint32 tStartingNode = signal->theData[1]; + Uint32 tData2 = signal->theData[2]; + jamEntry(); + + const Uint32 len = signal->getLength(); + if (len == 2) + { +#ifdef ERROR_INSERT + if (! ((ERROR_INSERTED(9000) || ERROR_INSERTED(9002)) + && c_error_9000_nodes_mask.get(tStartingNode))) +#endif + { + globalTransporterRegistry.do_connect(tStartingNode); + globalTransporterRegistry.setIOState(tStartingNode, HaltIO); + + //----------------------------------------------------- + // Report that the connection to the node is opened + //----------------------------------------------------- + signal->theData[0] = NDB_LE_CommunicationOpened; + signal->theData[1] = tStartingNode; + sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); + //----------------------------------------------------- + } + } + else + { + for(unsigned int i = 1; i < MAX_NODES; i++ ) + { + jam(); + if (i != getOwnNodeId() && getNodeInfo(i).m_type == tData2) + { + jam(); + +#ifdef ERROR_INSERT + if ((ERROR_INSERTED(9000) || ERROR_INSERTED(9002)) + && c_error_9000_nodes_mask.get(i)) + continue; +#endif + globalTransporterRegistry.do_connect(i); + globalTransporterRegistry.setIOState(i, HaltIO); + + signal->theData[0] = NDB_LE_CommunicationOpened; + signal->theData[1] = i; + sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); + } + } + } + + if (userRef != 0) + { + jam(); + signal->theData[0] = tStartingNode; + signal->theData[1] = tData2; + sendSignal(userRef, GSN_OPEN_COMCONF, signal, len - 1,JBA); + } +} + +void +Trpman::execCONNECT_REP(Signal *signal) +{ + const Uint32 hostId = signal->theData[0]; + jamEntry(); + + const NodeInfo::NodeType type = (NodeInfo::NodeType)getNodeInfo(hostId).m_type; + ndbrequire(type != NodeInfo::INVALID); + globalData.m_nodeInfo[hostId].m_version = 0; + globalData.m_nodeInfo[hostId].m_mysql_version = 0; + + /** + * Inform QMGR that client has connected + */ + signal->theData[0] = hostId; + if (ERROR_INSERTED(9005)) + { + sendSignalWithDelay(QMGR_REF, GSN_CONNECT_REP, signal, 50, 1); + } + else + { + sendSignal(QMGR_REF, GSN_CONNECT_REP, signal, 1, JBA); + } + + /* Automatically subscribe events for MGM nodes. + */ + if (type == NodeInfo::MGM) + { + jam(); + globalTransporterRegistry.setIOState(hostId, NoHalt); + } + + //------------------------------------------ + // Also report this event to the Event handler + //------------------------------------------ + signal->theData[0] = NDB_LE_Connected; + signal->theData[1] = hostId; + sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); +} + +void +Trpman::execCLOSE_COMREQ(Signal* signal) +{ + // Close communication with the node and halt input/output from + // other blocks than QMGR + + CloseComReqConf * const closeCom = (CloseComReqConf *)&signal->theData[0]; + + const BlockReference userRef = closeCom->xxxBlockRef; + Uint32 requestType = closeCom->requestType; + Uint32 failNo = closeCom->failNo; +// Uint32 noOfNodes = closeCom->noOfNodes; + + jamEntry(); + for (unsigned i = 0; i < MAX_NODES; i++) + { + if (NodeBitmask::get(closeCom->theNodes, i)) + { + jam(); + + //----------------------------------------------------- + // Report that the connection to the node is closed + //----------------------------------------------------- + signal->theData[0] = NDB_LE_CommunicationClosed; + signal->theData[1] = i; + sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); + + globalTransporterRegistry.setIOState(i, HaltIO); + globalTransporterRegistry.do_disconnect(i); + } + } + + if (requestType != CloseComReqConf::RT_NO_REPLY) + { + ndbassert((requestType == CloseComReqConf::RT_API_FAILURE) || + ((requestType == CloseComReqConf::RT_NODE_FAILURE) && + (failNo != 0))); + jam(); + CloseComReqConf* closeComConf = (CloseComReqConf *)signal->getDataPtrSend(); + closeComConf->xxxBlockRef = userRef; + closeComConf->requestType = requestType; + closeComConf->failNo = failNo; + + /* Note assumption that noOfNodes and theNodes + * bitmap is not trampled above + * signals received from the remote node. + */ + sendSignal(QMGR_REF, GSN_CLOSE_COMCONF, signal, 19, JBA); + } +} + +void +Trpman::execENABLE_COMREQ(Signal* signal) +{ + jamEntry(); + const EnableComReq *enableComReq = (const EnableComReq *)signal->getDataPtr(); + + /* Need to copy out signal data to not clobber it with sendSignal(). */ + Uint32 senderRef = enableComReq->m_senderRef; + Uint32 senderData = enableComReq->m_senderData; + Uint32 nodes[NodeBitmask::Size]; + MEMCOPY_NO_WORDS(nodes, enableComReq->m_nodeIds, NodeBitmask::Size); + + /* Enable communication with all our NDB blocks to these nodes. */ + Uint32 search_from = 0; + for (;;) + { + Uint32 tStartingNode = NodeBitmask::find(nodes, search_from); + if (tStartingNode == NodeBitmask::NotFound) + break; + search_from = tStartingNode + 1; + + globalTransporterRegistry.setIOState(tStartingNode, NoHalt); + setNodeInfo(tStartingNode).m_connected = true; + + //----------------------------------------------------- + // Report that the version of the node + //----------------------------------------------------- + signal->theData[0] = NDB_LE_ConnectedApiVersion; + signal->theData[1] = tStartingNode; + signal->theData[2] = getNodeInfo(tStartingNode).m_version; + signal->theData[3] = getNodeInfo(tStartingNode).m_mysql_version; + + sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 4, JBB); + //----------------------------------------------------- + } + + EnableComConf *enableComConf = (EnableComConf *)signal->getDataPtrSend(); + enableComConf->m_senderRef = reference(); + enableComConf->m_senderData = senderData; + MEMCOPY_NO_WORDS(enableComConf->m_nodeIds, nodes, NodeBitmask::Size); + sendSignal(senderRef, GSN_ENABLE_COMCONF, signal, + EnableComConf::SignalLength, JBA); +} + +void +Trpman::execDISCONNECT_REP(Signal *signal) +{ + const DisconnectRep * const rep = (DisconnectRep *)&signal->theData[0]; + const Uint32 hostId = rep->nodeId; + jamEntry(); + + setNodeInfo(hostId).m_connected = false; + setNodeInfo(hostId).m_connectCount++; + const NodeInfo::NodeType type = getNodeInfo(hostId).getType(); + ndbrequire(type != NodeInfo::INVALID); + + sendSignal(QMGR_REF, GSN_DISCONNECT_REP, signal, + DisconnectRep::SignalLength, JBA); + + signal->theData[0] = hostId; + sendSignal(CMVMI_REF, GSN_CANCEL_SUBSCRIPTION_REQ, signal, 1, JBB); + + signal->theData[0] = NDB_LE_Disconnected; + signal->theData[1] = hostId; + sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); +} + +/** + * execROUTE_ORD + * Allows other blocks to route signals as if they + * came from TRPMAN + * Useful in ndbmtd for synchronising signals w.r.t + * external signals received from other nodes which + * arrive from the same thread that runs TRPMAN + */ +void +Trpman::execROUTE_ORD(Signal* signal) +{ + jamEntry(); + if (!assembleFragments(signal)) + { + jam(); + return; + } + + SectionHandle handle(this, signal); + + RouteOrd* ord = (RouteOrd*)signal->getDataPtr(); + Uint32 dstRef = ord->dstRef; + Uint32 srcRef = ord->srcRef; + Uint32 gsn = ord->gsn; + /* ord->cnt ignored */ + + Uint32 nodeId = refToNode(dstRef); + + if (likely((nodeId == 0) || + getNodeInfo(nodeId).m_connected)) + { + jam(); + Uint32 secCount = handle.m_cnt; + ndbrequire(secCount >= 1 && secCount <= 3); + + jamLine(secCount); + + /** + * Put section 0 in signal->theData + */ + Uint32 sigLen = handle.m_ptr[0].sz; + ndbrequire(sigLen <= 25); + copy(signal->theData, handle.m_ptr[0]); + + SegmentedSectionPtr save = handle.m_ptr[0]; + for (Uint32 i = 0; i < secCount - 1; i++) + handle.m_ptr[i] = handle.m_ptr[i+1]; + handle.m_cnt--; + + sendSignal(dstRef, gsn, signal, sigLen, JBB, &handle); + + handle.m_cnt = 1; + handle.m_ptr[0] = save; + releaseSections(handle); + return ; + } + + releaseSections(handle); + warningEvent("Unable to route GSN: %d from %x to %x", + gsn, srcRef, dstRef); +} + +void +Trpman::execDBINFO_SCANREQ(Signal *signal) +{ + DbinfoScanReq req= *(DbinfoScanReq*)signal->theData; + const Ndbinfo::ScanCursor* cursor = + CAST_CONSTPTR(Ndbinfo::ScanCursor, DbinfoScan::getCursorPtr(&req)); + Ndbinfo::Ratelimit rl; + + jamEntry(); + + switch(req.tableId){ + case Ndbinfo::TRANSPORTERS_TABLEID: + { + jam(); + Uint32 rnode = cursor->data[0]; + if (rnode == 0) + rnode++; // Skip node 0 + + while (rnode < MAX_NODES) + { + switch(getNodeInfo(rnode).m_type) + { + default: + { + jam(); + Ndbinfo::Row row(signal, req); + row.write_uint32(getOwnNodeId()); // Node id + row.write_uint32(rnode); // Remote node id + row.write_uint32(globalTransporterRegistry.getPerformState(rnode)); // State + ndbinfo_send_row(signal, req, row, rl); + break; + } + + case NodeInfo::INVALID: + jam(); + break; + } + + rnode++; + if (rl.need_break(req)) + { + jam(); + ndbinfo_send_scan_break(signal, req, rl, rnode); + return; + } + } + break; + } + + default: + break; + } + + ndbinfo_send_scan_conf(signal, req, rl); +} + +void +Trpman::execNDB_TAMPER(Signal* signal) +{ + jamEntry(); +#ifdef ERROR_INSERT + if (signal->theData[0] == 9003) + { + if (MAX_RECEIVED_SIGNALS < 1024) + { + MAX_RECEIVED_SIGNALS = 1024; + } + else + { + MAX_RECEIVED_SIGNALS = 1 + (rand() % 128); + } + ndbout_c("MAX_RECEIVED_SIGNALS: %d", MAX_RECEIVED_SIGNALS); + CLEAR_ERROR_INSERT_VALUE; + } +#endif +}//execNDB_TAMPER() + +void +Trpman::execDUMP_STATE_ORD(Signal* signal) +{ + DumpStateOrd * const & dumpState = (DumpStateOrd *)&signal->theData[0]; + Uint32 arg = dumpState->args[0]; (void)arg; + +#ifdef ERROR_INSERT + if (arg == 9000 || arg == 9002) + { + SET_ERROR_INSERT_VALUE(arg); + for (Uint32 i = 1; igetLength(); i++) + c_error_9000_nodes_mask.set(signal->theData[i]); + } + + if (arg == 9001) + { + CLEAR_ERROR_INSERT_VALUE; + if (signal->getLength() == 1 || signal->theData[1]) + { + for (Uint32 i = 0; itheData[0] = 0; + signal->theData[1] = i; + execOPEN_COMREQ(signal); + } + } + } + c_error_9000_nodes_mask.clear(); + } + + if (arg == 9004 && signal->getLength() == 2) + { + SET_ERROR_INSERT_VALUE(9004); + c_error_9000_nodes_mask.clear(); + c_error_9000_nodes_mask.set(signal->theData[1]); + } + + if (arg == 9005 && signal->getLength() == 2 && ERROR_INSERTED(9004)) + { + Uint32 db = signal->theData[1]; + Uint32 i = c_error_9000_nodes_mask.find(0); + signal->theData[0] = i; + sendSignal(calcQmgrBlockRef(db),GSN_API_FAILREQ, signal, 1, JBA); + ndbout_c("stopping %u using %u", i, db); + CLEAR_ERROR_INSERT_VALUE; + } +#endif + +#ifdef ERROR_INSERT + /* dump 9992 + * On Target NodeId, block receiving signals from NodeId list + * + * dump 9993 + * On Target NodeId, resume receiving signals from NodeId list + * + * dump 9991 + * On Target NodeId, resume receiving signals from any blocked node + * + * + * See also code in QMGR for blocking receive from nodes based + * on HB roles. + * + */ + if((arg == 9993) || /* Unblock recv from nodeid */ + (arg == 9992)) /* Block recv from nodeid */ + { + bool block = (arg == 9992); + for (Uint32 n = 1; n < signal->getLength(); n++) + { + Uint32 nodeId = signal->theData[n]; + + if ((nodeId > 0) && + (nodeId < MAX_NODES)) + { + if (block) + { + ndbout_c("CMVMI : Blocking receive from node %u", nodeId); + + globalTransporterRegistry.blockReceive(nodeId); + } + else + { + ndbout_c("CMVMI : Unblocking receive from node %u", nodeId); + + globalTransporterRegistry.unblockReceive(nodeId); + } + } + else + { + ndbout_c("CMVMI : Ignoring dump %u for node %u", + arg, nodeId); + } + } + } + if (arg == 9990) /* Block recv from all ndbd matching pattern */ + { + Uint32 pattern = 0; + if (signal->getLength() > 1) + { + pattern = signal->theData[1]; + ndbout_c("CMVMI : Blocking receive from all ndbds matching pattern -%s-", + ((pattern == 1)? "Other side":"Unknown")); + } + + for (Uint32 node = 1; node < MAX_NDB_NODES; node++) + { + if (globalTransporterRegistry.is_connected(node)) + { + if (getNodeInfo(node).m_type == NodeInfo::DB) + { + if (!globalTransporterRegistry.isBlocked(node)) + { + switch (pattern) + { + case 1: + { + /* Match if given node is on 'other side' of + * 2-replica cluster + */ + if ((getOwnNodeId() & 1) != (node & 1)) + { + /* Node is on the 'other side', match */ + break; + } + /* Node is on 'my side', don't match */ + continue; + } + default: + break; + } + ndbout_c("CMVMI : Blocking receive from node %u", node); + globalTransporterRegistry.blockReceive(node); + } + } + } + } + } + if (arg == 9991) /* Unblock recv from all blocked */ + { + for (Uint32 node = 0; node < MAX_NODES; node++) + { + if (globalTransporterRegistry.isBlocked(node)) + { + ndbout_c("CMVMI : Unblocking receive from node %u", node); + globalTransporterRegistry.unblockReceive(node); + } + } + } +#endif +} + +TrpmanProxy::TrpmanProxy(Block_context & ctx) : + LocalProxy(TRPMAN, ctx) +{ + addRecSignal(GSN_CLOSE_COMREQ, &TrpmanProxy::execCLOSE_COMREQ); + addRecSignal(GSN_OPEN_COMREQ, &TrpmanProxy::execOPEN_COMREQ); + addRecSignal(GSN_ENABLE_COMREQ, &TrpmanProxy::execENABLE_COMREQ); + addRecSignal(GSN_DISCONNECT_REP, &TrpmanProxy::execDISCONNECT_REP); + addRecSignal(GSN_CONNECT_REP, &TrpmanProxy::execCONNECT_REP); + addRecSignal(GSN_ROUTE_ORD, &TrpmanProxy::execROUTE_ORD); +} + +TrpmanProxy::~TrpmanProxy() +{ +} + +SimulatedBlock* +TrpmanProxy::newWorker(Uint32 instanceNo) +{ + return new Trpman(m_ctx, instanceNo); +} + +BLOCK_FUNCTIONS(TrpmanProxy); + +/** + * TODO TrpmanProxy need to have operation records + * to support splicing a request onto several Trpman-instances + * according to how receive-threads are assigned to instances + */ +void +TrpmanProxy::execOPEN_COMREQ(Signal* signal) +{ + jamEntry(); + SectionHandle handle(this, signal); + sendSignal(workerRef(0), GSN_OPEN_COMREQ, signal, + signal->getLength(), JBB, &handle); +} + +void +TrpmanProxy::execCONNECT_REP(Signal *signal) +{ + jamEntry(); + SectionHandle handle(this, signal); + sendSignal(workerRef(0), GSN_CONNECT_REP, signal, + signal->getLength(), JBB, &handle); +} + +void +TrpmanProxy::execCLOSE_COMREQ(Signal* signal) +{ + jamEntry(); + SectionHandle handle(this, signal); + sendSignal(workerRef(0), GSN_CLOSE_COMREQ, signal, + signal->getLength(), JBB, &handle); +} + +void +TrpmanProxy::execENABLE_COMREQ(Signal* signal) +{ + jamEntry(); + SectionHandle handle(this, signal); + sendSignal(workerRef(0), GSN_ENABLE_COMREQ, signal, + signal->getLength(), JBB, &handle); +} + +void +TrpmanProxy::execDISCONNECT_REP(Signal *signal) +{ + jamEntry(); + SectionHandle handle(this, signal); + sendSignal(workerRef(0), GSN_DISCONNECT_REP, signal, + signal->getLength(), JBB, &handle); +} + +void +TrpmanProxy::execROUTE_ORD(Signal* signal) +{ + jamEntry(); + SectionHandle handle(this, signal); + sendSignal(workerRef(0), GSN_ROUTE_ORD, signal, + signal->getLength(), JBB, &handle); +} === added file 'storage/ndb/src/kernel/blocks/trpman.hpp' --- a/storage/ndb/src/kernel/blocks/trpman.hpp 1970-01-01 00:00:00 +0000 +++ b/storage/ndb/src/kernel/blocks/trpman.hpp 2011-11-16 15:38:25 +0000 @@ -0,0 +1,66 @@ +/* + Copyright (c) 2011, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#ifndef TRPMAN_H +#define TRPMAN_H + +#include +#include +#include + +class Trpman : public SimulatedBlock +{ +public: + Trpman(Block_context& ctx, Uint32 instanceNumber = 0); + virtual ~Trpman(); + BLOCK_DEFINES(Trpman); + + void execCLOSE_COMREQ(Signal *signal); + void execOPEN_COMREQ(Signal *signal); + void execENABLE_COMREQ(Signal *signal); + void execDISCONNECT_REP(Signal *signal); + void execCONNECT_REP(Signal *signal); + void execROUTE_ORD(Signal* signal); + + void execDBINFO_SCANREQ(Signal*); + + void execNDB_TAMPER(Signal*); + void execDUMP_STATE_ORD(Signal*); +protected: + +}; + +class TrpmanProxy : public LocalProxy +{ +public: + TrpmanProxy(Block_context& ctx); + virtual ~TrpmanProxy(); + BLOCK_DEFINES(TrpmanProxy); + + void execCLOSE_COMREQ(Signal *signal); + void execOPEN_COMREQ(Signal *signal); + void execENABLE_COMREQ(Signal *signal); + void execDISCONNECT_REP(Signal *signal); + void execCONNECT_REP(Signal *signal); + void execROUTE_ORD(Signal* signal); + + void execNDB_TAMPER(Signal*); + void execDUMP_STATE_ORD(Signal*); +protected: + virtual SimulatedBlock* newWorker(Uint32 instanceNo); +}; +#endif === modified file 'storage/ndb/src/kernel/vm/Configuration.cpp' --- a/storage/ndb/src/kernel/vm/Configuration.cpp 2011-09-23 09:13:22 +0000 +++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2011-11-16 15:38:25 +0000 @@ -1030,7 +1030,7 @@ Configuration::setLockCPU(NdbThread * pT } else if (!NdbIsMultiThreaded()) { - BlockNumber list[] = { CMVMI }; + BlockNumber list[] = { DBDIH }; res = m_thr_config.do_bind(pThread, list, 1); } === modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp' --- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2011-11-16 15:38:25 +0000 @@ -374,7 +374,7 @@ TransporterCallbackKernel::reportConnect signal.header.theLength = 1; signal.header.theSendersSignalId = 0; signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId); - signal.header.theReceiversBlockNumber = CMVMI; + signal.header.theReceiversBlockNumber = TRPMAN; signal.header.theVerId_signalNumber = GSN_CONNECT_REP; signal.theData[0] = nodeId; @@ -405,7 +405,7 @@ TransporterCallbackKernel::reportDisconn signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId); signal.header.theTrace = TestOrd::TraceDisconnect; signal.header.theVerId_signalNumber = GSN_DISCONNECT_REP; - signal.header.theReceiversBlockNumber = CMVMI; + signal.header.theReceiversBlockNumber = TRPMAN; DisconnectRep * rep = CAST_PTR(DisconnectRep, &signal.theData[0]); rep->nodeId = nodeId; === modified file 'storage/ndb/src/kernel/vm/mt.cpp' --- a/storage/ndb/src/kernel/vm/mt.cpp 2011-11-11 17:59:49 +0000 +++ b/storage/ndb/src/kernel/vm/mt.cpp 2011-11-16 15:38:25 +0000 @@ -2603,7 +2603,7 @@ mt_init_thr_map() /* Keep mt-classic assignments in MT LQH. */ const Uint32 thr_GLOBAL = 0; const Uint32 thr_LOCAL = 1; - const Uint32 thr_RECEIVER = receiver_thread_no; + //const Uint32 thr_RECEIVER = receiver_thread_no; add_thr_map(BACKUP, 0, thr_LOCAL); add_thr_map(DBTC, 0, thr_GLOBAL); @@ -2615,7 +2615,7 @@ mt_init_thr_map() add_thr_map(NDBCNTR, 0, thr_GLOBAL); add_thr_map(QMGR, 0, thr_GLOBAL); add_thr_map(NDBFS, 0, thr_GLOBAL); - add_thr_map(CMVMI, 0, thr_RECEIVER); + add_thr_map(CMVMI, 0, thr_GLOBAL); add_thr_map(TRIX, 0, thr_GLOBAL); add_thr_map(DBUTIL, 0, thr_GLOBAL); add_thr_map(SUMA, 0, thr_LOCAL); @@ -2627,6 +2627,7 @@ mt_init_thr_map() add_thr_map(DBINFO, 0, thr_LOCAL); add_thr_map(DBSPJ, 0, thr_GLOBAL); add_thr_map(THRMAN, 0, thr_GLOBAL); + add_thr_map(TRPMAN, 0, thr_GLOBAL); } Uint32 @@ -2648,6 +2649,8 @@ mt_get_instance_count(Uint32 block) case DBSPJ: return globalData.ndbMtTcThreads; break; + case TRPMAN: + return 1; case THRMAN: return num_threads; default: @@ -2685,6 +2688,9 @@ mt_add_thr_map(Uint32 block, Uint32 inst case DBSPJ: thr_no += num_lqh_threads + (instance - 1); break; + case TRPMAN: + thr_no = receiver_thread_no; + break; case THRMAN: thr_no = instance - 1; break; @@ -2815,14 +2821,9 @@ Uint32 receiverThreadId; * As part of the receive loop, we also periodically call update_connections() * (this way we are similar to single-threaded ndbd). * - * The CMVMI block (and no other blocks) run in the same thread as this + * The TRPMAN block (and no other blocks) run in the same thread as this * receive loop; this way we avoid races between update_connections() and - * CMVMI calls into the transporters. - * - * Note that with this setup, local signals to CMVMI cannot wake up the thread - * if it is sleeping on the receive sockets. Thus CMVMI local signal processing - * can be (slightly) delayed, however CMVMI is not really performance critical - * (hopefully). + * TRPMAN calls into the transporters. */ extern "C" void * === modified file 'storage/ndb/src/kernel/vm/mt_thr_config.cpp' --- a/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-10-07 08:07:21 +0000 +++ b/storage/ndb/src/kernel/vm/mt_thr_config.cpp 2011-11-16 15:38:25 +0000 @@ -928,10 +928,6 @@ THRConfigApplier::find_thread(const unsi { return &m_threads[T_REP][instanceNo]; } - else if ((instanceNo = findBlock(CMVMI, instancelist, cnt)) >= 0) - { - return &m_threads[T_RECV][instanceNo]; - } else if ((instanceNo = findBlock(DBDIH, instancelist, cnt)) >= 0) { return &m_threads[T_MAIN][instanceNo]; @@ -944,6 +940,10 @@ THRConfigApplier::find_thread(const unsi { return &m_threads[T_LDM][instanceNo - 1]; // remove proxy... } + else if ((instanceNo = findBlock(TRPMAN, instancelist, cnt)) >= 0) + { + return &m_threads[T_RECV][instanceNo - 1]; // remove proxy + } return 0; } No bundle (reason: useless for push emails).