From: Magnus Svensson Date: November 12 2008 8:17am Subject: bzr commit into mysql-5.1 branch (msvensson:3061) WL#4350 List-Archive: http://lists.mysql.com/commits/58520 Message-Id: <20081112081726.20DFD13412A@pilot> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit #At file:///home/msvensson/mysql/6.4-wl4350/ 3061 Magnus Svensson 2008-11-12 WL#4350 Reconfigure transporter Make it possible to run IPCConfig::configureTransporters also to reconfigure the transporters in TransporterRegistry. If the transporter already exists, call "configure(conf)" on it. Currently the Transporter only detects whether the conf is the same or different. If different, a warning about this will be printed to the log and the flag m_need_restart is set. Also detect if a transporter should be removed and print a warning about restart in the same fashion as above. modified: storage/ndb/include/mgmcommon/IPCConfig.hpp storage/ndb/include/transporter/TransporterDefinitions.hpp storage/ndb/include/transporter/TransporterRegistry.hpp storage/ndb/src/common/mgmcommon/IPCConfig.cpp storage/ndb/src/common/transporter/SCI_Transporter.cpp storage/ndb/src/common/transporter/SCI_Transporter.hpp storage/ndb/src/common/transporter/SHM_Transporter.cpp storage/ndb/src/common/transporter/SHM_Transporter.hpp storage/ndb/src/common/transporter/TCP_Transporter.cpp storage/ndb/src/common/transporter/TCP_Transporter.hpp storage/ndb/src/common/transporter/Transporter.cpp storage/ndb/src/common/transporter/Transporter.hpp storage/ndb/src/common/transporter/TransporterRegistry.cpp storage/ndb/src/kernel/vm/Configuration.cpp storage/ndb/src/mgmsrv/MgmtSrvr.cpp storage/ndb/src/mgmsrv/MgmtSrvr.hpp storage/ndb/src/ndbapi/ClusterMgr.cpp storage/ndb/src/ndbapi/ClusterMgr.hpp storage/ndb/src/ndbapi/TransporterFacade.cpp storage/ndb/src/ndbapi/TransporterFacade.hpp storage/ndb/src/ndbapi/ndb_cluster_connection.cpp per-file messages: storage/ndb/include/transporter/TransporterDefinitions.hpp Add 
TransporterType field to TransporterConfiguration storage/ndb/include/transporter/TransporterRegistry.hpp Use "localNodeId == 0" instead of nodeIdSpecified flag Remove 'get_localNodeId' Hide the factory functions for individual transporter types, instead one should use 'configureTransporter' storage/ndb/src/common/mgmcommon/IPCConfig.cpp Detect if a transporter should be removed, print a warning and return false to indicate restart would be needed. Use 'configureTransporter' regardless of which type of transporter to create storage/ndb/src/common/transporter/SCI_Transporter.cpp Add function 'configure_derived' to be used to (re)configure a transporter Currently only supports detection of "no change" storage/ndb/src/common/transporter/SCI_Transporter.hpp Add function 'configure_derived' to be used to (re)configure a transporter Currently only supports detection of "no change" storage/ndb/src/common/transporter/SHM_Transporter.cpp Add function 'configure_derived' to be used to (re)configure a transporter Currently only supports detection of "no change" storage/ndb/src/common/transporter/SHM_Transporter.hpp Add function 'configure_derived' to be used to (re)configure a transporter Currently only supports detection of "no change" storage/ndb/src/common/transporter/TCP_Transporter.cpp Add function 'configure_derived' to be used to (re)configure a transporter Currently only supports detection of "no change" Add function to extract "overload_limit" from conf storage/ndb/src/common/transporter/TCP_Transporter.hpp Add function 'configure_derived' to be used to (re)configure a transporter Currently only supports detection of "no change" storage/ndb/src/common/transporter/Transporter.cpp Add function 'configure' to be used to (re)configure a transporter Currently only supports detection of "no change" storage/ndb/src/common/transporter/Transporter.hpp Add functions 'configure' and 'configure_derived' storage/ndb/src/common/transporter/TransporterRegistry.cpp Remove 
"nodeIdSpecified", instead use "localNodeId == 0" which means the same thing Move the code for case where "total_send_buffer == 0" into the function 'allocate_send_buffers' so that the function reflects comment and reduces code duplication Don't resize send buffer pages yet Add function 'configureTransporter' as common function to call when creating a new Transporter or reconfiguring an existing one. Add 'print_transporters' storage/ndb/src/kernel/vm/Configuration.cpp Add call to 'globalTransporterRegistry.init' to "fixate" its nodeid Treat false from 'configureTransporters' during startup as fatal error. Move code for "total_send_buffer == 0" into 'allocate_send_buffers' storage/ndb/src/mgmsrv/MgmtSrvr.cpp Add call to reconfigure TransporterFacade and its subclasses when configuration changed Set flag "m_need_restart" if "configure" returns false storage/ndb/src/mgmsrv/MgmtSrvr.hpp Add "m_need_restart" flag. storage/ndb/src/ndbapi/ClusterMgr.cpp Mark all non existing nodes as not defined when reconfiguring ClusterMgr storage/ndb/src/ndbapi/ClusterMgr.hpp Rename 'init' to 'configure' storage/ndb/src/ndbapi/TransporterFacade.cpp Move code to setup TransporterFacade from 'init/configure' to 'start_instance' Use 'start_instance' to start and 'configure' to reconfigure TransporterFacade Add calls to reconfigure Transporters, Arbitrator, ClusterMgr, scan timeout values and send buffer pages. 
Move code for "total_send_buffer == 0" into 'allocate_send_buffers' storage/ndb/src/ndbapi/TransporterFacade.hpp Rename 'init' to 'configure' Use "NodeId" instead of int storage/ndb/src/ndbapi/ndb_cluster_connection.cpp Check return code from 'TransporterFacade::start_instance' === modified file 'storage/ndb/include/mgmcommon/IPCConfig.hpp' --- a/storage/ndb/include/mgmcommon/IPCConfig.hpp 2008-11-11 07:25:25 +0000 +++ b/storage/ndb/include/mgmcommon/IPCConfig.hpp 2008-11-12 08:17:14 +0000 @@ -18,10 +18,19 @@ struct IPCConfig { - /* Returns the number of transporters configured */ - static Uint32 configureTransporters(Uint32 nodeId, - const struct ndb_mgm_configuration &, - class TransporterRegistry &); + /* + configure_transporters + + Create and configure transporters in TransporterRegistry + + Returns: + true - sucessfully created and (re)configured transporters + false - at least one transporter could not be created + or (re)configured + */ + static bool configureTransporters(Uint32 nodeId, + const struct ndb_mgm_configuration &, + class TransporterRegistry &); }; #endif // IPCConfig_H === modified file 'storage/ndb/include/transporter/TransporterDefinitions.hpp' --- a/storage/ndb/include/transporter/TransporterDefinitions.hpp 2008-11-11 07:26:22 +0000 +++ b/storage/ndb/include/transporter/TransporterDefinitions.hpp 2008-11-12 08:17:14 +0000 @@ -40,6 +40,12 @@ enum SendStatus { SEND_UNKNOWN_NODE = 5 }; +enum TransporterType { + tt_TCP_TRANSPORTER = 1, + tt_SCI_TRANSPORTER = 2, + tt_SHM_TRANSPORTER = 3 +}; + /** * Maximum message sizes * --------------------- @@ -75,6 +81,7 @@ struct TransporterConfiguration { bool checksum; bool signalId; bool isMgmConnection; // is a mgm connection, requires transforming + TransporterType type; union { // Transporter specific configuration information === modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp' --- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2008-11-06 10:17:49 +0000 +++ 
b/storage/ndb/include/transporter/TransporterRegistry.hpp 2008-11-12 08:17:14 +0000 @@ -51,12 +51,6 @@ enum IOState { HaltIO = 3 }; -enum TransporterType { - tt_TCP_TRANSPORTER = 1, - tt_SCI_TRANSPORTER = 2, - tt_SHM_TRANSPORTER = 3 - // ID 4 was OSE Transporter which has been removed. Don't use ID 4. -}; static const char *performStateString[] = { "is connected", @@ -198,17 +192,21 @@ public: IOState ioState(NodeId nodeId); void setIOState(NodeId nodeId, IOState state); - /** - * createTransporter +private: + + bool createTCPTransporter(TransporterConfiguration * config); + bool createSCITransporter(TransporterConfiguration * config); + bool createSHMTransporter(TransporterConfiguration * config); + +public: + /** + * configureTransporter + * + * Configure a transporter, ie. create new if it + * does not exist otherwise try to reconfigure it * - * If the config object indicates that the transporter - * to be created will act as a server and no server is - * started, startServer is called. A transporter of the selected kind - * is created and it is put in the transporter arrays. - */ - bool createTCPTransporter(struct TransporterConfiguration * config); - bool createSCITransporter(struct TransporterConfiguration * config); - bool createSHMTransporter(struct TransporterConfiguration * config); + */ + bool configureTransporter(TransporterConfiguration * config); /** * Allocate send buffer for default send buffer handling. @@ -324,8 +322,6 @@ public: void add_transporter_interface(NodeId remoteNodeId, const char *interf, int s_port); // signed port. 
<0 is dynamic Transporter* get_transporter(NodeId nodeId); - NodeId get_localNodeId() { return localNodeId; }; - struct in_addr get_connect_address(NodeId node_id) const; protected: @@ -339,7 +335,6 @@ private: int sendCounter; NodeId localNodeId; - bool nodeIdSpecified; unsigned maxTransporters; int nTransporters; int nTCPTransporters; @@ -498,6 +493,9 @@ public: bool has_data_to_send(NodeId node); void reset_send_buffer(NodeId node); + + void print_transporters(const char* where, NdbOut& out = ndbout); + }; inline void === modified file 'storage/ndb/src/common/mgmcommon/IPCConfig.cpp' --- a/storage/ndb/src/common/mgmcommon/IPCConfig.cpp 2008-11-11 07:25:25 +0000 +++ b/storage/ndb/src/common/mgmcommon/IPCConfig.cpp 2008-11-12 08:17:14 +0000 @@ -21,11 +21,12 @@ #include #include -Uint32 +bool IPCConfig::configureTransporters(Uint32 nodeId, - const struct ndb_mgm_configuration & config, - class TransporterRegistry & tr){ - TransporterConfiguration conf; + const struct ndb_mgm_configuration & config, + class TransporterRegistry & tr) +{ + bool result= true; DBUG_ENTER("IPCConfig::configureTransporters"); @@ -58,9 +59,23 @@ IPCConfig::configureTransporters(Uint32 } } - Uint32 noOfTransportersCreated= 0; + + /* Remove transporter to nodes that does not exist anymore */ + for (int i= 1; i < MAX_NODES; i++) + { + ndb_mgm_configuration_iterator iter(config, CFG_SECTION_NODE); + if (tr.get_transporter(i) && iter.find(CFG_NODE_ID, i)) + { + // Transporter exist in TransporterResgistry but not + // in configuration + ndbout_c("The connection to node %d could not " + "be removed at this time", i); + result= false; // Need restart + } + } + + TransporterConfiguration conf; ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION); - for(iter.first(); iter.valid(); iter.next()){ bzero(&conf, sizeof(conf)); @@ -146,21 +161,22 @@ IPCConfig::configureTransporters(Uint32 if(iter.get(CFG_SHM_KEY, &conf.shm.shmKey)) break; if(iter.get(CFG_SHM_BUFFER_MEM, &conf.shm.shmSize)) 
break; - Uint32 tmp; - if(iter.get(CFG_SHM_SIGNUM, &tmp)) break; - conf.shm.signum= tmp; - - if(!tr.createSHMTransporter(&conf)){ - DBUG_PRINT("error", ("Failed to create SHM Transporter from %d to %d", + Uint32 signum; + if(iter.get(CFG_SHM_SIGNUM, &signum)) break; + conf.shm.signum= signum; + + conf.type = tt_SHM_TRANSPORTER; + + if(!tr.configureTransporter(&conf)){ + DBUG_PRINT("error", ("Failed to configure SHM Transporter " + "from %d to %d", conf.localNodeId, conf.remoteNodeId)); - ndbout << "Failed to create SHM Transporter from: " - << conf.localNodeId << " to: " << conf.remoteNodeId << endl; - } else { - noOfTransportersCreated++; + ndbout_c("Failed to configure SHM Transporter to node %d", + conf.remoteNodeId); + result = false; } - DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, " + DBUG_PRINT("info", ("Configured SHM Transporter using shmkey %d, " "buf size = %d", conf.shm.shmKey, conf.shm.shmSize)); - break; case CONNECTION_TYPE_SCI: @@ -178,13 +194,16 @@ IPCConfig::configureTransporters(Uint32 } else { conf.sci.nLocalAdapters = 2; } - if(!tr.createSCITransporter(&conf)){ - DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d", + conf.type = tt_SCI_TRANSPORTER; + if(!tr.configureTransporter(&conf)){ + DBUG_PRINT("error", ("Failed to configure SCI Transporter " + "from %d to %d", conf.localNodeId, conf.remoteNodeId)); - ndbout << "Failed to create SCI Transporter from: " - << conf.localNodeId << " to: " << conf.remoteNodeId << endl; + ndbout_c("Failed to configure SCI Transporter to node %d", + conf.remoteNodeId); + result = false; } else { - DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, " + DBUG_PRINT("info", ("Configured SCI Transporter: Adapters = %d, " "remote SCI node id %d", conf.sci.nLocalAdapters, conf.sci.remoteSciNodeId0)); DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, " @@ -196,8 +215,6 @@ IPCConfig::configureTransporters(Uint32 "second remote SCI node id = %d", 
conf.sci.remoteSciNodeId1)); } - noOfTransportersCreated++; - continue; } break; @@ -217,14 +234,15 @@ IPCConfig::configureTransporters(Uint32 iter.get(CFG_TCP_RCV_BUF_SIZE, &conf.tcp.tcpRcvBufSize); iter.get(CFG_TCP_MAXSEG_SIZE, &conf.tcp.tcpMaxsegSize); iter.get(CFG_CONNECTION_OVERLOAD, &conf.tcp.tcpOverloadLimit); + + conf.type = tt_TCP_TRANSPORTER; - if(!tr.createTCPTransporter(&conf)){ - ndbout << "Failed to create TCP Transporter from: " - << nodeId << " to: " << remoteNodeId << endl; - } else { - noOfTransportersCreated++; + if(!tr.configureTransporter(&conf)){ + ndbout_c("Failed to configure TCP Transporter to node %d", + conf.remoteNodeId); + result= false; } - DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, " + DBUG_PRINT("info", ("Configured TCP Transporter: sendBufferSize = %d, " "maxReceiveSize = %d", conf.tcp.sendBufferSize, conf.tcp.maxReceiveSize)); break; @@ -235,6 +253,6 @@ IPCConfig::configureTransporters(Uint32 } // switch } // for - DBUG_RETURN(noOfTransportersCreated); + DBUG_RETURN(result); } === modified file 'storage/ndb/src/common/transporter/SCI_Transporter.cpp' --- a/storage/ndb/src/common/transporter/SCI_Transporter.cpp 2008-05-29 15:06:11 +0000 +++ b/storage/ndb/src/common/transporter/SCI_Transporter.cpp 2008-11-12 08:17:14 +0000 @@ -99,6 +99,20 @@ SCI_Transporter::SCI_Transporter(Transpo DBUG_VOID_RETURN; } + +bool +SCI_Transporter::configure_derived(const TransporterConfiguration* conf) +{ + if (conf->sci.sendLimit == (m_PacketSize + 3)/4 && + conf->sci.bufferSize == m_buffersize && + conf->sci.nLocalAdapters == m_adapters && + conf->sci.remoteSciNodeId0 == m_remoteNodes[0] && + conf->sci.remoteSciNodeId1 == m_remoteNodes[1]) + return true; // No change + return false; // Can't reconfigure +} + + void SCI_Transporter::disconnectImpl() { DBUG_ENTER("SCI_Transporter::disconnectImpl"); === modified file 'storage/ndb/src/common/transporter/SCI_Transporter.hpp' --- 
a/storage/ndb/src/common/transporter/SCI_Transporter.hpp 2008-05-29 15:06:11 +0000 +++ b/storage/ndb/src/common/transporter/SCI_Transporter.hpp 2008-11-12 08:17:14 +0000 @@ -144,6 +144,9 @@ private: * Destructor. Disconnects the transporter. */ ~SCI_Transporter(); + + virtual bool configure_derived(const TransporterConfiguration* conf); + bool m_mapped; bool m_initLocal; bool m_sciinit; === modified file 'storage/ndb/src/common/transporter/SHM_Transporter.cpp' --- a/storage/ndb/src/common/transporter/SHM_Transporter.cpp 2008-11-11 07:23:27 +0000 +++ b/storage/ndb/src/common/transporter/SHM_Transporter.cpp 2008-11-12 08:17:14 +0000 @@ -64,6 +64,18 @@ SHM_Transporter::SHM_Transporter(Transpo m_signal_threshold = 4096; } + +bool +SHM_Transporter::configure_derived(const TransporterConfiguration* conf) +{ + if ((key_t)conf->shm.shmKey == shmKey && + (int)conf->shm.shmSize == shmSize && + conf->shm.signum == g_ndb_shm_signum) + return true; // No change + return false; // Can't reconfigure +} + + SHM_Transporter::~SHM_Transporter(){ doDisconnect(); } === modified file 'storage/ndb/src/common/transporter/SHM_Transporter.hpp' --- a/storage/ndb/src/common/transporter/SHM_Transporter.hpp 2008-10-27 17:56:57 +0000 +++ b/storage/ndb/src/common/transporter/SHM_Transporter.hpp 2008-11-12 08:17:14 +0000 @@ -49,6 +49,8 @@ public: */ virtual ~SHM_Transporter(); + virtual bool configure_derived(const TransporterConfiguration* conf); + /** * Do initialization */ === modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp' --- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2008-10-27 17:56:57 +0000 +++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2008-11-12 08:17:14 +0000 @@ -69,6 +69,16 @@ setIf(int& ref, Uint32 val, Uint32 def) ref = def; } + +static +Uint32 overload_limit(const TransporterConfiguration* conf) +{ + return (conf->tcp.tcpOverloadLimit ? 
+ conf->tcp.tcpOverloadLimit : + conf->tcp.sendBufferSize*4/5); +} + + TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg, const TransporterConfiguration* conf) : @@ -99,10 +109,25 @@ TCP_Transporter::TCP_Transporter(Transpo setIf(sockOptSndBufSize, conf->tcp.tcpSndBufSize, 71540); setIf(sockOptTcpMaxSeg, conf->tcp.tcpMaxsegSize, 0); - m_overload_limit = conf->tcp.tcpOverloadLimit ? - conf->tcp.tcpOverloadLimit : conf->tcp.sendBufferSize*4/5; + m_overload_limit = overload_limit(conf); } + +bool +TCP_Transporter::configure_derived(const TransporterConfiguration* conf) +{ + if (conf->tcp.sendBufferSize == m_max_send_buffer && + conf->tcp.maxReceiveSize == maxReceiveSize && + conf->tcp.tcpSndBufSize == sockOptSndBufSize && + conf->tcp.tcpRcvBufSize == sockOptRcvBufSize && + conf->tcp.tcpMaxsegSize == sockOptTcpMaxSeg && + overload_limit(conf) == m_overload_limit) + return true; // No change +ndbout_c("configure_derived, can't reconfigure"); + return false; // Can't reconfigure +} + + TCP_Transporter::~TCP_Transporter() { // Disconnect === modified file 'storage/ndb/src/common/transporter/TCP_Transporter.hpp' --- a/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2008-10-27 17:56:57 +0000 +++ b/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2008-11-12 08:17:14 +0000 @@ -45,10 +45,12 @@ class TCP_Transporter : public Transport private: // Initialize member variables TCP_Transporter(TransporterRegistry&, const TransporterConfiguration* conf); - + // Disconnect, delete send buffers and receive buffer virtual ~TCP_Transporter(); + virtual bool configure_derived(const TransporterConfiguration* conf); + /** * Allocate buffers for sending and receiving */ === modified file 'storage/ndb/src/common/transporter/Transporter.cpp' --- a/storage/ndb/src/common/transporter/Transporter.cpp 2008-10-27 17:56:57 +0000 +++ b/storage/ndb/src/common/transporter/Transporter.cpp 2008-11-12 08:17:14 +0000 @@ -102,10 +102,29 @@ Transporter::Transporter(TransporterRegi } 
Transporter::~Transporter(){ - if (m_socket_client) - delete m_socket_client; + delete m_socket_client; } + +bool +Transporter::configure(const TransporterConfiguration* conf) +{ + if (configure_derived(conf) && + conf->s_port == m_s_port && + strcmp(conf->remoteHostName, remoteHostName) == 0 && + strcmp(conf->localHostName, localHostName) == 0 && + conf->remoteNodeId == remoteNodeId && + conf->localNodeId == localNodeId && + (conf->serverNodeId == conf->localNodeId) == isServer && + conf->checksum == checksumUsed && + conf->signalId == signalIdUsed && + conf->isMgmConnection == isMgmConnection && + conf->type == m_type) + return true; // No change + return false; // Can't reconfigure +} + + bool Transporter::connect_server(NDB_SOCKET_TYPE sockfd) { // all initial negotiation is done in TransporterRegistry::connect_server === modified file 'storage/ndb/src/common/transporter/Transporter.hpp' --- a/storage/ndb/src/common/transporter/Transporter.hpp 2008-10-27 17:56:57 +0000 +++ b/storage/ndb/src/common/transporter/Transporter.hpp 2008-11-12 08:17:14 +0000 @@ -116,6 +116,9 @@ protected: bool signalId, Uint32 max_send_buffer); + virtual bool configure(const TransporterConfiguration* conf); + virtual bool configure_derived(const TransporterConfiguration* conf) = 0; + /** * Blocking, for max timeOut milli seconds * Returns true if connect succeded === modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp' --- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-11-07 08:52:22 +0000 +++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-11-12 08:17:14 +0000 @@ -76,6 +76,7 @@ TransporterRegistry::TransporterRegistry unsigned _maxTransporters, unsigned sizeOfLongSignalMemory) : m_mgm_handle(0), + localNodeId(0), m_transp_count(0), m_use_default_send_buffer(use_default_send_buffer), m_send_buffers(0), m_page_freelist(0), m_send_buffer_memory(0), @@ -83,7 +84,6 @@ TransporterRegistry::TransporterRegistry { 
DBUG_ENTER("TransporterRegistry::TransporterRegistry"); - nodeIdSpecified = false; maxTransporters = _maxTransporters; sendCounter = 1; @@ -154,6 +154,19 @@ TransporterRegistry::allocate_send_buffe if (!m_use_default_send_buffer) return; + if (total_send_buffer == 0) + total_send_buffer = get_total_max_send_buffer(); + + if (m_send_buffers) + { + /* Send buffers already allocated -> resize the buffer pages */ + assert(m_send_buffer_memory); + + // TODO resize send buffer pages + + return; + } + /* Initialize transporter send buffers (initially empty). */ m_send_buffers = new SendBuffer[maxTransporters]; for (unsigned i = 0; i < maxTransporters; i++) @@ -264,11 +277,13 @@ TransporterRegistry::disconnectAll(){ bool TransporterRegistry::init(NodeId nodeId) { DBUG_ENTER("TransporterRegistry::init"); - nodeIdSpecified = true; + assert(localNodeId == 0 || + localNodeId == nodeId); + localNodeId = nodeId; - + DEBUG("TransporterRegistry started node: " << localNodeId); - + DBUG_RETURN(true); } @@ -361,20 +376,47 @@ TransporterRegistry::connect_server(NDB_ DBUG_RETURN(res); } + +bool +TransporterRegistry::configureTransporter(TransporterConfiguration *config) +{ + NodeId remoteNodeId = config->remoteNodeId; + + assert(localNodeId); + assert(config->localNodeId == localNodeId); + + if (remoteNodeId >= maxTransporters) + return false; + + Transporter* t = theTransporters[remoteNodeId]; + if(t != NULL) + { + // Transporter already exist, try to reconfigure it + return t->configure(config); + } + + DEBUG("Configuring transporter from " << localNodeId + << " to " << remoteNodeId); + + switch (config->type){ + case tt_TCP_TRANSPORTER: + return createTCPTransporter(config); + case tt_SHM_TRANSPORTER: + return createSHMTransporter(config); + case tt_SCI_TRANSPORTER: + return createSCITransporter(config); + default: + abort(); + break; + } + return false; +} + + bool TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) { #ifdef NDB_TCP_TRANSPORTER - 
if(!nodeIdSpecified){ - init(config->localNodeId); - } - - if(config->localNodeId != localNodeId) - return false; - - if(theTransporters[config->remoteNodeId] != NULL) - return false; - TCP_Transporter * t = new TCP_Transporter(*this, config); if (t == NULL) @@ -405,17 +447,7 @@ TransporterRegistry::createSCITransporte if(!SCI_Transporter::initSCI()) abort(); - - if(!nodeIdSpecified){ - init(config->localNodeId); - } - - if(config->localNodeId != localNodeId) - return false; - - if(theTransporters[config->remoteNodeId] != NULL) - return false; - + SCI_Transporter * t = new SCI_Transporter(*this, config->localHostName, config->remoteHostName, @@ -457,13 +489,7 @@ bool TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) { DBUG_ENTER("TransporterRegistry::createTransporter SHM"); #ifdef NDB_SHM_TRANSPORTER - if(!nodeIdSpecified){ - init(config->localNodeId); - } - - if(config->localNodeId != localNodeId) - return false; - + if (!g_ndb_shm_signum) { g_ndb_shm_signum= config->shm.signum; DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum)); @@ -476,9 +502,6 @@ TransporterRegistry::createSHMTransporte if(config->shm.signum != g_ndb_shm_signum) return false; - - if(theTransporters[config->remoteNodeId] != NULL) - return false; SHM_Transporter * t = new SHM_Transporter(*this, config->localHostName, @@ -1270,8 +1293,12 @@ TransporterRegistry::ioState(NodeId node void TransporterRegistry::setIOState(NodeId nodeId, IOState state) { + if (ioStates[nodeId] == state) + return; + DEBUG("TransporterRegistry::setIOState(" - << nodeId << ", " << state << ")"); + << nodeId << ", " << state << ")"); + ioStates[nodeId] = state; } @@ -1609,9 +1636,10 @@ bool TransporterRegistry::start_service(SocketServer& socket_server) { DBUG_ENTER("TransporterRegistry::start_service"); - if (m_transporter_interface.size() > 0 && !nodeIdSpecified) + if (m_transporter_interface.size() > 0 && + localNodeId == 0) { - g_eventLogger->error("TransporterRegistry::startReceiving: 
localNodeId not specified"); + g_eventLogger->error("INTERNAL ERROR: not initialized"); DBUG_RETURN(false); } @@ -1723,6 +1751,7 @@ NdbOut & operator <<(NdbOut & out, Signa Transporter* TransporterRegistry::get_transporter(NodeId nodeId) { + assert(nodeId < maxTransporters); return theTransporters[nodeId]; } @@ -1772,7 +1801,7 @@ NDB_SOCKET_TYPE TransporterRegistry::con for(unsigned int i=0;i < m_transporter_interface.size();i++) if (m_transporter_interface[i].m_s_service_port < 0 && ndb_mgm_set_connection_int_parameter(*h, - get_localNodeId(), + localNodeId, m_transporter_interface[i].m_remote_nodeId, CFG_CONNECTION_SERVER_PORT, m_transporter_interface[i].m_s_service_port, @@ -2078,4 +2107,35 @@ TransporterRegistry::forceSend(NodeId no return false; } + +void +TransporterRegistry::print_transporters(const char* where, NdbOut& out) +{ + out << where << " >>" << endl; + + for(unsigned i = 0; i < maxTransporters; i++){ + if(theTransporters[i] == NULL) + continue; + + const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId(); + + out << i << " " + << getPerformStateString(remoteNodeId) << " to node: " + << remoteNodeId << " at " + << inet_ntoa(get_connect_address(remoteNodeId)) << endl; + } + + out << "<<" << endl; + + for (size_t i= 0; i < m_transporter_interface.size(); i++){ + Transporter_interface tf= m_transporter_interface[i]; + + out << i + << " remote node: " << tf.m_remote_nodeId + << " port: " << tf.m_s_service_port + << " interface: " << tf.m_interface << endl; + } +} + + template class Vector; === modified file 'storage/ndb/src/kernel/vm/Configuration.cpp' --- a/storage/ndb/src/kernel/vm/Configuration.cpp 2008-11-06 16:29:57 +0000 +++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2008-11-12 08:17:14 +0000 @@ -409,14 +409,20 @@ Configuration::setupConfiguration(){ /** * Configure transporters */ - { - int res = IPCConfig::configureTransporters(globalData.ownId, - * p, - globalTransporterRegistry); - if(res <= 0){ - ERROR_SET(fatal, 
NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched", - "No transporters configured"); - } + if (!globalTransporterRegistry.init(globalData.ownId)) + { + ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, + "Invalid configuration fetched", + "Could not init transporter registry"); + } + + if (!IPCConfig::configureTransporters(globalData.ownId, + * p, + globalTransporterRegistry)) + { + ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, + "Invalid configuration fetched", + "Could not configure transporters"); } /** @@ -434,11 +440,7 @@ Configuration::setupConfiguration(){ } Uint32 total_send_buffer = 0; - if(iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer) || - total_send_buffer == 0) - { - total_send_buffer = globalTransporterRegistry.get_total_max_send_buffer(); - } + iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer); globalTransporterRegistry.allocate_send_buffers(total_send_buffer); if(iter.get(CFG_DB_NO_SAVE_MSGS, &_maxErrorLogs)){ === modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp' --- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2008-11-06 16:39:26 +0000 +++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2008-11-12 08:17:14 +0000 @@ -265,6 +265,7 @@ MgmtSrvr::MgmtSrvr(const MgmtOpts& opts, m_local_config(NULL), _ownReference(0), m_config_manager(NULL), + m_need_restart(false), theFacade(NULL), _isStopThread(false), _logLevelThreadSleep(500), @@ -743,7 +744,16 @@ MgmtSrvr::config_changed(NodeId node_id, setClusterLog(m_local_config); - // TODO Magnus, Reload ClusterMgr::theNodes + if (theFacade) + { + if (!theFacade->configure(_ownNodeId, + m_local_config->m_configValues)) + { + g_eventLogger->warning("Could not reconfigure everything online, " + "this node need a restart"); + m_need_restart= true; + } + } DBUG_VOID_RETURN; } === modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.hpp' --- a/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2008-11-06 10:56:21 +0000 +++ b/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2008-11-12 08:17:14 +0000 @@ -484,6 +484,8 @@ private: class ConfigManager* 
m_config_manager; + bool m_need_restart; + NodeBitmask m_reserved_nodes; struct in_addr m_connect_address[MAX_NODES]; === modified file 'storage/ndb/src/ndbapi/ClusterMgr.cpp' --- a/storage/ndb/src/ndbapi/ClusterMgr.cpp 2008-11-06 12:00:21 +0000 +++ b/storage/ndb/src/ndbapi/ClusterMgr.cpp 2008-11-12 08:17:14 +0000 @@ -79,7 +79,8 @@ ClusterMgr::~ClusterMgr() } void -ClusterMgr::init(ndb_mgm_configuration_iterator & iter){ +ClusterMgr::configure(const ndb_mgm_configuration* config){ + ndb_mgm_configuration_iterator iter(* config, CFG_SECTION_NODE); for(iter.first(); iter.valid(); iter.next()){ Uint32 nodeId = 0; if(iter.get(CFG_NODE_ID, &nodeId)) @@ -109,6 +110,15 @@ ClusterMgr::init(ndb_mgm_configuration_i } } + /* Mark all non existing nodes as not defined */ + for(Uint32 i = 0; iinit(nodeId)) + return -1; + + theClusterMgr = new ClusterMgr(*this); + if (theClusterMgr == NULL) + return -1; + + if (!configure(nodeId, conf)) + return -1; + + if (!theTransporterRegistry->start_service(m_socket_server)) + return -1; + + theReceiveThread = NdbThread_Create(runReceiveResponse_C, + (void**)this, + 32768, + "ndb_receive", + NDB_THREAD_PRIO_LOW); + + theSendThread = NdbThread_Create(runSendRequest_C, + (void**)this, + 32768, + "ndb_send", + NDB_THREAD_PRIO_LOW); + + theClusterMgr->startThread(); + /** * Install signal handler for SIGPIPE * @@ -679,82 +709,89 @@ void TransporterFacade::init_cond_wait_q TransporterFacade::TransporterFacade(GlobalDictCache *cache) : theTransporterRegistry(0), + theOwnId(0), + theStartNodeId(1), + theClusterMgr(NULL), + theArbitMgr(NULL), + checkCounter(4), + currentSendLimit(1), + m_scan_batch_size(MAX_SCAN_BATCH_SIZE), + m_batch_byte_size(SCAN_BATCH_SIZE), + m_batch_size(DEF_BATCH_SIZE), theStopReceive(0), theSendThread(NULL), theReceiveThread(NULL), + m_max_trans_id(0), m_fragmented_signal_id(0), m_globalDictCache(cache) { DBUG_ENTER("TransporterFacade::TransporterFacade"); init_cond_wait_queue(); poll_owner = NULL; - theOwnId = 0; 
theMutexPtr = NdbMutex_Create(); sendPerformedLastInterval = 0; - checkCounter = 4; - currentSendLimit = 1; - theClusterMgr = NULL; - theArbitMgr = NULL; - theStartNodeId = 1; - m_scan_batch_size= MAX_SCAN_BATCH_SIZE; - m_batch_byte_size= SCAN_BATCH_SIZE; - m_batch_size= DEF_BATCH_SIZE; - m_max_trans_id = 0; - for (int i = 0; i < NO_API_FIXED_BLOCKS; i++) m_fixed2dynamic[i]= RNIL; - theClusterMgr = new ClusterMgr(* this); - #ifdef API_TRACE apiSignalLog = 0; #endif DBUG_VOID_RETURN; } + bool -TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props) +TransporterFacade::configure(NodeId nodeId, + const ndb_mgm_configuration* conf) { - DBUG_ENTER("TransporterFacade::init"); - - theOwnId = nodeId; - theTransporterRegistry = new TransporterRegistry(this); + DBUG_ENTER("TransporterFacade::configure"); - const int res = IPCConfig::configureTransporters(nodeId, - * props, - * theTransporterRegistry); - if(res <= 0){ - TRP_DEBUG( "configureTransporters returned 0 or less" ); + assert(theOwnId == nodeId); + assert(theTransporterRegistry); + assert(theClusterMgr); + + // Configure transporters + if (!IPCConfig::configureTransporters(nodeId, + * conf, + * theTransporterRegistry)) DBUG_RETURN(false); - } - - ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE); - iter.first(); - theClusterMgr->init(iter); - - iter.first(); - if(iter.find(CFG_NODE_ID, nodeId)){ - TRP_DEBUG( "Node info missing from config." 
); + + // Configure cluster manager + theClusterMgr->configure(conf); + + ndb_mgm_configuration_iterator iter(* conf, CFG_SECTION_NODE); + if(iter.find(CFG_NODE_ID, nodeId)) DBUG_RETURN(false); - } - + + // Configure send buffers Uint32 total_send_buffer = 0; - if(iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer) || - total_send_buffer == 0) - { - total_send_buffer = theTransporterRegistry->get_total_max_send_buffer(); - } + iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer); theTransporterRegistry->allocate_send_buffers(total_send_buffer); + // Configure arbitrator Uint32 rank = 0; - if(!iter.get(CFG_NODE_ARBIT_RANK, &rank) && rank>0){ - theArbitMgr = new ArbitMgr(* this); + iter.get(CFG_NODE_ARBIT_RANK, &rank); + if (rank > 0) + { + // The arbitrator should be active + if (!theArbitMgr) + theArbitMgr = new ArbitMgr(* this); theArbitMgr->setRank(rank); + Uint32 delay = 0; iter.get(CFG_NODE_ARBIT_DELAY, &delay); theArbitMgr->setDelay(delay); } + else if (theArbitMgr) + { + // No arbitrator should be started + theArbitMgr->doStop(NULL); + delete theArbitMgr; + theArbitMgr= NULL; + } + + // Configure scan settings Uint32 scan_batch_size= 0; if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) { m_scan_batch_size= scan_batch_size; @@ -767,9 +804,9 @@ TransporterFacade::init(Uint32 nodeId, c if (!iter.get(CFG_BATCH_SIZE, &batch_size)) { m_batch_size= batch_size; } - + + // Configure timeouts Uint32 timeout = 120000; - iter.first(); for (iter.first(); iter.valid(); iter.next()) { Uint32 tmp1 = 0, tmp2 = 0; @@ -781,24 +818,6 @@ TransporterFacade::init(Uint32 nodeId, c } m_waitfor_timeout = timeout; - if (!theTransporterRegistry->start_service(m_socket_server)){ - ndbout_c("Unable to start theTransporterRegistry->start_service"); - DBUG_RETURN(false); - } - - theReceiveThread = NdbThread_Create(runReceiveResponse_C, - (void**)this, - 32768, - "ndb_receive", - NDB_THREAD_PRIO_LOW); - - theSendThread = NdbThread_Create(runSendRequest_C, - (void**)this, 
- 32768, - "ndb_send", - NDB_THREAD_PRIO_LOW); - theClusterMgr->startThread(); - #ifdef API_TRACE signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut); #endif === modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp' --- a/storage/ndb/src/ndbapi/TransporterFacade.hpp 2008-11-04 08:43:06 +0000 +++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp 2008-11-12 08:17:14 +0000 @@ -53,11 +53,16 @@ public: STATIC_CONST( MAX_NO_THREADS = 4711 ); TransporterFacade(GlobalDictCache *cache); virtual ~TransporterFacade(); - bool init(Uint32, const ndb_mgm_configuration *); - int start_instance(int, const ndb_mgm_configuration*); + int start_instance(NodeId, const ndb_mgm_configuration*); void stop_instance(); - + + /* + (Re)configure the TransporterFacade + to a specific configuration + */ + bool configure(NodeId, const ndb_mgm_configuration *); + /** * Register this block for sending/receiving signals * @blockNo block number to use, -1 => any blockNumber @@ -222,8 +227,7 @@ private: TransporterRegistry* theTransporterRegistry; SocketServer m_socket_server; int sendPerformedLastInterval; - int theOwnId; - + NodeId theOwnId; NodeId theStartNodeId; ClusterMgr* theClusterMgr; === modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp' --- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2008-09-09 12:04:17 +0000 +++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2008-11-12 08:17:14 +0000 @@ -711,7 +711,9 @@ int Ndb_cluster_connection::connect(int if(props == 0) break; - m_impl.m_transporter_facade->start_instance(nodeId, props); + if (m_impl.m_transporter_facade->start_instance(nodeId, props) < 0) + DBUG_RETURN(-1); + if (m_impl.init_nodes_vector(nodeId, *props)) { ndbout_c("Ndb_cluster_connection::connect: malloc failure");