#At file:///home/msvensson/mysql/6.4-wl4350/
3061 Magnus Svensson 2008-11-12
WL#4350 Reconfigure transporter
Make it possible to run IPCConfig::configureTransporters
also to reconfigure the transporters in TransporteRegistry.
If the transporter already exist, call "configure(conf)" on it.
Currently the Transporter only detect if the conf is same or
different. If different a warning about this will be printed
to log and the flag m_need_restart is set.
Also detect if a transporter should be removed and print a
warning about restart in same fashion as above.
modified:
storage/ndb/include/mgmcommon/IPCConfig.hpp
storage/ndb/include/transporter/TransporterDefinitions.hpp
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/src/common/mgmcommon/IPCConfig.cpp
storage/ndb/src/common/transporter/SCI_Transporter.cpp
storage/ndb/src/common/transporter/SCI_Transporter.hpp
storage/ndb/src/common/transporter/SHM_Transporter.cpp
storage/ndb/src/common/transporter/SHM_Transporter.hpp
storage/ndb/src/common/transporter/TCP_Transporter.cpp
storage/ndb/src/common/transporter/TCP_Transporter.hpp
storage/ndb/src/common/transporter/Transporter.cpp
storage/ndb/src/common/transporter/Transporter.hpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/vm/Configuration.cpp
storage/ndb/src/mgmsrv/MgmtSrvr.cpp
storage/ndb/src/mgmsrv/MgmtSrvr.hpp
storage/ndb/src/ndbapi/ClusterMgr.cpp
storage/ndb/src/ndbapi/ClusterMgr.hpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
per-file messages:
storage/ndb/include/transporter/TransporterDefinitions.hpp
Add TransporterType field to TransporterConfiguration
storage/ndb/include/transporter/TransporterRegistry.hpp
Use "localNodeId == 0" instead of nodeIdSpecified flag
Remove 'get_localNodeId'
Hide the factory functions for indivdual transporter types, instead one should
use 'configureTransporter'
storage/ndb/src/common/mgmcommon/IPCConfig.cpp
Detect if a transporter should be removed, print a warning and return false
to indicate restart would be needed.
Use 'configureTransporter' regardless of which type of transporter to create
storage/ndb/src/common/transporter/SCI_Transporter.cpp
Add function 'configure_derived' to be used to (re)configure a transporter
Currently only supports detection of "no change"
storage/ndb/src/common/transporter/SCI_Transporter.hpp
Add function 'configure_derived' to be used to (re)configure a transporter
Currently only supports detection of "no change"
storage/ndb/src/common/transporter/SHM_Transporter.cpp
Add function 'configure_derived' to be used to (re)configure a transporter
Currently only supports detection of "no change"
storage/ndb/src/common/transporter/SHM_Transporter.hpp
Add function 'configure_derived' to be used to (re)configure a transporter
Currently only supports detection of "no change"
storage/ndb/src/common/transporter/TCP_Transporter.cpp
Add function 'configure_derived' to be used to (re)configure a transporter
Currently only supports detection of "no change"
Add function to extract "overload_limit" from conf
storage/ndb/src/common/transporter/TCP_Transporter.hpp
Add function 'configure_derived' to be used to (re)configure a transporter
Currently only supports detection of "no change"
storage/ndb/src/common/transporter/Transporter.cpp
Add function 'configure' to be used to (re)configure a transporter
Currently only supports detection of "no change"
storage/ndb/src/common/transporter/Transporter.hpp
Add functions 'configure' and 'configure_derived'
storage/ndb/src/common/transporter/TransporterRegistry.cpp
Remove "nodeidSpecified", instead use "localNodeId == 0" which means the same thing
Move the code for case where "total_send_buffer == 0" into the function 'allocate_send_buffers'
so that the function reflects comment and reduces code duplication
Don't resize send buffer pages yet
Add function 'configureTransporter' as common function to call when creating a new Transporter
or reconfiguring an existing one.
Add 'print_transporters'
storage/ndb/src/kernel/vm/Configuration.cpp
Add call to 'globalTransporterRegistry->init' to "fixate" it's nodeid
Treat false from 'configureTransporters' during startup as fatal error.
Move code for "total_send_buffer == 0" into 'allocate_send_buffers'
storage/ndb/src/mgmsrv/MgmtSrvr.cpp
Add call to reconfigure TransporterFacade and it's subclasses when configuration changed
Set flag "m_need_restart" if "reconfigure" returns false
storage/ndb/src/mgmsrv/MgmtSrvr.hpp
Add "m_need_restart" flag.
storage/ndb/src/ndbapi/ClusterMgr.cpp
Mark all non existing nodes as not defined when reconfiguring ClusterMgr
storage/ndb/src/ndbapi/ClusterMgr.hpp
Rename 'init' to 'reconfigure'
storage/ndb/src/ndbapi/TransporterFacade.cpp
Move code to setup TransporterFacade from 'init/configure' to 'start_instance'
Use 'start_instance' to start and 'configure' to reconfigure TransporteFacade
Add calls to reconfigure Transporters, Arbitrator, ClusterMgr, scan timeout values and
send buffer pages.
Move code for "total_send_buffer == 0" into 'allocate_send_buffers'
storage/ndb/src/ndbapi/TransporterFacade.hpp
Rename 'init' to 'configure'
Use "NodeId" instead of int
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
Check return code from 'TransporterFacade::start_instance'
=== modified file 'storage/ndb/include/mgmcommon/IPCConfig.hpp'
--- a/storage/ndb/include/mgmcommon/IPCConfig.hpp 2008-11-11 07:25:25 +0000
+++ b/storage/ndb/include/mgmcommon/IPCConfig.hpp 2008-11-12 08:17:14 +0000
@@ -18,10 +18,19 @@
struct IPCConfig
{
- /* Returns the number of transporters configured */
- static Uint32 configureTransporters(Uint32 nodeId,
- const struct ndb_mgm_configuration &,
- class TransporterRegistry &);
+ /*
+ configure_transporters
+
+ Create and configure transporters in TransporterRegistry
+
+ Returns:
+ true - sucessfully created and (re)configured transporters
+ false - at least one transporter could not be created
+ or (re)configured
+ */
+ static bool configureTransporters(Uint32 nodeId,
+ const struct ndb_mgm_configuration &,
+ class TransporterRegistry &);
};
#endif // IPCConfig_H
=== modified file 'storage/ndb/include/transporter/TransporterDefinitions.hpp'
--- a/storage/ndb/include/transporter/TransporterDefinitions.hpp 2008-11-11 07:26:22 +0000
+++ b/storage/ndb/include/transporter/TransporterDefinitions.hpp 2008-11-12 08:17:14 +0000
@@ -40,6 +40,12 @@ enum SendStatus {
SEND_UNKNOWN_NODE = 5
};
+enum TransporterType {
+ tt_TCP_TRANSPORTER = 1,
+ tt_SCI_TRANSPORTER = 2,
+ tt_SHM_TRANSPORTER = 3
+};
+
/**
* Maximum message sizes
* ---------------------
@@ -75,6 +81,7 @@ struct TransporterConfiguration {
bool checksum;
bool signalId;
bool isMgmConnection; // is a mgm connection, requires transforming
+ TransporterType type;
union { // Transporter specific configuration information
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2008-11-06 10:17:49 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2008-11-12 08:17:14 +0000
@@ -51,12 +51,6 @@ enum IOState {
HaltIO = 3
};
-enum TransporterType {
- tt_TCP_TRANSPORTER = 1,
- tt_SCI_TRANSPORTER = 2,
- tt_SHM_TRANSPORTER = 3
- // ID 4 was OSE Transporter which has been removed. Don't use ID 4.
-};
static const char *performStateString[] =
{ "is connected",
@@ -198,17 +192,21 @@ public:
IOState ioState(NodeId nodeId);
void setIOState(NodeId nodeId, IOState state);
- /**
- * createTransporter
+private:
+
+ bool createTCPTransporter(TransporterConfiguration * config);
+ bool createSCITransporter(TransporterConfiguration * config);
+ bool createSHMTransporter(TransporterConfiguration * config);
+
+public:
+ /**
+ * configureTransporter
+ *
+ * Configure a transporter, ie. create new if it
+ * does not exist otherwise try to reconfigure it
*
- * If the config object indicates that the transporter
- * to be created will act as a server and no server is
- * started, startServer is called. A transporter of the selected kind
- * is created and it is put in the transporter arrays.
- */
- bool createTCPTransporter(struct TransporterConfiguration * config);
- bool createSCITransporter(struct TransporterConfiguration * config);
- bool createSHMTransporter(struct TransporterConfiguration * config);
+ */
+ bool configureTransporter(TransporterConfiguration * config);
/**
* Allocate send buffer for default send buffer handling.
@@ -324,8 +322,6 @@ public:
void add_transporter_interface(NodeId remoteNodeId, const char *interf,
int s_port); // signed port. <0 is dynamic
Transporter* get_transporter(NodeId nodeId);
- NodeId get_localNodeId() { return localNodeId; };
-
struct in_addr get_connect_address(NodeId node_id) const;
protected:
@@ -339,7 +335,6 @@ private:
int sendCounter;
NodeId localNodeId;
- bool nodeIdSpecified;
unsigned maxTransporters;
int nTransporters;
int nTCPTransporters;
@@ -498,6 +493,9 @@ public:
bool has_data_to_send(NodeId node);
void reset_send_buffer(NodeId node);
+
+ void print_transporters(const char* where, NdbOut& out = ndbout);
+
};
inline void
=== modified file 'storage/ndb/src/common/mgmcommon/IPCConfig.cpp'
--- a/storage/ndb/src/common/mgmcommon/IPCConfig.cpp 2008-11-11 07:25:25 +0000
+++ b/storage/ndb/src/common/mgmcommon/IPCConfig.cpp 2008-11-12 08:17:14 +0000
@@ -21,11 +21,12 @@
#include <mgmapi.h>
#include <mgmapi_configuration.hpp>
-Uint32
+bool
IPCConfig::configureTransporters(Uint32 nodeId,
- const struct ndb_mgm_configuration & config,
- class TransporterRegistry & tr){
- TransporterConfiguration conf;
+ const struct ndb_mgm_configuration & config,
+ class TransporterRegistry & tr)
+{
+ bool result= true;
DBUG_ENTER("IPCConfig::configureTransporters");
@@ -58,9 +59,23 @@ IPCConfig::configureTransporters(Uint32
}
}
- Uint32 noOfTransportersCreated= 0;
+
+ /* Remove transporter to nodes that does not exist anymore */
+ for (int i= 1; i < MAX_NODES; i++)
+ {
+ ndb_mgm_configuration_iterator iter(config, CFG_SECTION_NODE);
+ if (tr.get_transporter(i) && iter.find(CFG_NODE_ID, i))
+ {
+ // Transporter exist in TransporterResgistry but not
+ // in configuration
+ ndbout_c("The connection to node %d could not "
+ "be removed at this time", i);
+ result= false; // Need restart
+ }
+ }
+
+ TransporterConfiguration conf;
ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
-
for(iter.first(); iter.valid(); iter.next()){
bzero(&conf, sizeof(conf));
@@ -146,21 +161,22 @@ IPCConfig::configureTransporters(Uint32
if(iter.get(CFG_SHM_KEY, &conf.shm.shmKey)) break;
if(iter.get(CFG_SHM_BUFFER_MEM, &conf.shm.shmSize)) break;
- Uint32 tmp;
- if(iter.get(CFG_SHM_SIGNUM, &tmp)) break;
- conf.shm.signum= tmp;
-
- if(!tr.createSHMTransporter(&conf)){
- DBUG_PRINT("error", ("Failed to create SHM Transporter from %d to %d",
+ Uint32 signum;
+ if(iter.get(CFG_SHM_SIGNUM, &signum)) break;
+ conf.shm.signum= signum;
+
+ conf.type = tt_SHM_TRANSPORTER;
+
+ if(!tr.configureTransporter(&conf)){
+ DBUG_PRINT("error", ("Failed to configure SHM Transporter "
+ "from %d to %d",
conf.localNodeId, conf.remoteNodeId));
- ndbout << "Failed to create SHM Transporter from: "
- << conf.localNodeId << " to: " << conf.remoteNodeId << endl;
- } else {
- noOfTransportersCreated++;
+ ndbout_c("Failed to configure SHM Transporter to node %d",
+ conf.remoteNodeId);
+ result = false;
}
- DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, "
+ DBUG_PRINT("info", ("Configured SHM Transporter using shmkey %d, "
"buf size = %d", conf.shm.shmKey, conf.shm.shmSize));
-
break;
case CONNECTION_TYPE_SCI:
@@ -178,13 +194,16 @@ IPCConfig::configureTransporters(Uint32
} else {
conf.sci.nLocalAdapters = 2;
}
- if(!tr.createSCITransporter(&conf)){
- DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d",
+ conf.type = tt_SCI_TRANSPORTER;
+ if(!tr.configureTransporter(&conf)){
+ DBUG_PRINT("error", ("Failed to configure SCI Transporter "
+ "from %d to %d",
conf.localNodeId, conf.remoteNodeId));
- ndbout << "Failed to create SCI Transporter from: "
- << conf.localNodeId << " to: " << conf.remoteNodeId << endl;
+ ndbout_c("Failed to configure SCI Transporter to node %d",
+ conf.remoteNodeId);
+ result = false;
} else {
- DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, "
+ DBUG_PRINT("info", ("Configured SCI Transporter: Adapters = %d, "
"remote SCI node id %d",
conf.sci.nLocalAdapters, conf.sci.remoteSciNodeId0));
DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, "
@@ -196,8 +215,6 @@ IPCConfig::configureTransporters(Uint32
"second remote SCI node id = %d",
conf.sci.remoteSciNodeId1));
}
- noOfTransportersCreated++;
- continue;
}
break;
@@ -217,14 +234,15 @@ IPCConfig::configureTransporters(Uint32
iter.get(CFG_TCP_RCV_BUF_SIZE, &conf.tcp.tcpRcvBufSize);
iter.get(CFG_TCP_MAXSEG_SIZE, &conf.tcp.tcpMaxsegSize);
iter.get(CFG_CONNECTION_OVERLOAD, &conf.tcp.tcpOverloadLimit);
+
+ conf.type = tt_TCP_TRANSPORTER;
- if(!tr.createTCPTransporter(&conf)){
- ndbout << "Failed to create TCP Transporter from: "
- << nodeId << " to: " << remoteNodeId << endl;
- } else {
- noOfTransportersCreated++;
+ if(!tr.configureTransporter(&conf)){
+ ndbout_c("Failed to configure TCP Transporter to node %d",
+ conf.remoteNodeId);
+ result= false;
}
- DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, "
+ DBUG_PRINT("info", ("Configured TCP Transporter: sendBufferSize = %d, "
"maxReceiveSize = %d", conf.tcp.sendBufferSize,
conf.tcp.maxReceiveSize));
break;
@@ -235,6 +253,6 @@ IPCConfig::configureTransporters(Uint32
} // switch
} // for
- DBUG_RETURN(noOfTransportersCreated);
+ DBUG_RETURN(result);
}
=== modified file 'storage/ndb/src/common/transporter/SCI_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/SCI_Transporter.cpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/common/transporter/SCI_Transporter.cpp 2008-11-12 08:17:14 +0000
@@ -99,6 +99,20 @@ SCI_Transporter::SCI_Transporter(Transpo
DBUG_VOID_RETURN;
}
+
+bool
+SCI_Transporter::configure_derived(const TransporterConfiguration* conf)
+{
+ if (conf->sci.sendLimit == (m_PacketSize + 3)/4 &&
+ conf->sci.bufferSize == m_buffersize &&
+ conf->sci.nLocalAdapters == m_adapters &&
+ conf->sci.remoteSciNodeId0 == m_remoteNodes[0] &&
+ conf->sci.remoteSciNodeId1 == m_remoteNodes[1])
+ return true; // No change
+ return false; // Can't reconfigure
+}
+
+
void SCI_Transporter::disconnectImpl()
{
DBUG_ENTER("SCI_Transporter::disconnectImpl");
=== modified file 'storage/ndb/src/common/transporter/SCI_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/SCI_Transporter.hpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/common/transporter/SCI_Transporter.hpp 2008-11-12 08:17:14 +0000
@@ -144,6 +144,9 @@ private:
* Destructor. Disconnects the transporter.
*/
~SCI_Transporter();
+
+ virtual bool configure_derived(const TransporterConfiguration* conf);
+
bool m_mapped;
bool m_initLocal;
bool m_sciinit;
=== modified file 'storage/ndb/src/common/transporter/SHM_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/SHM_Transporter.cpp 2008-11-11 07:23:27 +0000
+++ b/storage/ndb/src/common/transporter/SHM_Transporter.cpp 2008-11-12 08:17:14 +0000
@@ -64,6 +64,18 @@ SHM_Transporter::SHM_Transporter(Transpo
m_signal_threshold = 4096;
}
+
+bool
+SHM_Transporter::configure_derived(const TransporterConfiguration* conf)
+{
+ if ((key_t)conf->shm.shmKey == shmKey &&
+ (int)conf->shm.shmSize == shmSize &&
+ conf->shm.signum == g_ndb_shm_signum)
+ return true; // No change
+ return false; // Can't reconfigure
+}
+
+
SHM_Transporter::~SHM_Transporter(){
doDisconnect();
}
=== modified file 'storage/ndb/src/common/transporter/SHM_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/SHM_Transporter.hpp 2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/SHM_Transporter.hpp 2008-11-12 08:17:14 +0000
@@ -49,6 +49,8 @@ public:
*/
virtual ~SHM_Transporter();
+ virtual bool configure_derived(const TransporterConfiguration* conf);
+
/**
* Do initialization
*/
=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2008-11-12 08:17:14 +0000
@@ -69,6 +69,16 @@ setIf(int& ref, Uint32 val, Uint32 def)
ref = def;
}
+
+static
+Uint32 overload_limit(const TransporterConfiguration* conf)
+{
+ return (conf->tcp.tcpOverloadLimit ?
+ conf->tcp.tcpOverloadLimit :
+ conf->tcp.sendBufferSize*4/5);
+}
+
+
TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
const TransporterConfiguration* conf)
:
@@ -99,10 +109,25 @@ TCP_Transporter::TCP_Transporter(Transpo
setIf(sockOptSndBufSize, conf->tcp.tcpSndBufSize, 71540);
setIf(sockOptTcpMaxSeg, conf->tcp.tcpMaxsegSize, 0);
- m_overload_limit = conf->tcp.tcpOverloadLimit ?
- conf->tcp.tcpOverloadLimit : conf->tcp.sendBufferSize*4/5;
+ m_overload_limit = overload_limit(conf);
}
+
+bool
+TCP_Transporter::configure_derived(const TransporterConfiguration* conf)
+{
+ if (conf->tcp.sendBufferSize == m_max_send_buffer &&
+ conf->tcp.maxReceiveSize == maxReceiveSize &&
+ conf->tcp.tcpSndBufSize == sockOptSndBufSize &&
+ conf->tcp.tcpRcvBufSize == sockOptRcvBufSize &&
+ conf->tcp.tcpMaxsegSize == sockOptTcpMaxSeg &&
+ overload_limit(conf) == m_overload_limit)
+ return true; // No change
+ndbout_c("configure_derived, can't reconfigure");
+ return false; // Can't reconfigure
+}
+
+
TCP_Transporter::~TCP_Transporter() {
// Disconnect
=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2008-11-12 08:17:14 +0000
@@ -45,10 +45,12 @@ class TCP_Transporter : public Transport
private:
// Initialize member variables
TCP_Transporter(TransporterRegistry&, const TransporterConfiguration* conf);
-
+
// Disconnect, delete send buffers and receive buffer
virtual ~TCP_Transporter();
+ virtual bool configure_derived(const TransporterConfiguration* conf);
+
/**
* Allocate buffers for sending and receiving
*/
=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp 2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.cpp 2008-11-12 08:17:14 +0000
@@ -102,10 +102,29 @@ Transporter::Transporter(TransporterRegi
}
Transporter::~Transporter(){
- if (m_socket_client)
- delete m_socket_client;
+ delete m_socket_client;
}
+
+bool
+Transporter::configure(const TransporterConfiguration* conf)
+{
+ if (configure_derived(conf) &&
+ conf->s_port == m_s_port &&
+ strcmp(conf->remoteHostName, remoteHostName) == 0 &&
+ strcmp(conf->localHostName, localHostName) == 0 &&
+ conf->remoteNodeId == remoteNodeId &&
+ conf->localNodeId == localNodeId &&
+ (conf->serverNodeId == conf->localNodeId) == isServer &&
+ conf->checksum == checksumUsed &&
+ conf->signalId == signalIdUsed &&
+ conf->isMgmConnection == isMgmConnection &&
+ conf->type == m_type)
+ return true; // No change
+ return false; // Can't reconfigure
+}
+
+
bool
Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
// all initial negotiation is done in TransporterRegistry::connect_server
=== modified file 'storage/ndb/src/common/transporter/Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Transporter.hpp 2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.hpp 2008-11-12 08:17:14 +0000
@@ -116,6 +116,9 @@ protected:
bool signalId,
Uint32 max_send_buffer);
+ virtual bool configure(const TransporterConfiguration* conf);
+ virtual bool configure_derived(const TransporterConfiguration* conf) = 0;
+
/**
* Blocking, for max timeOut milli seconds
* Returns true if connect succeded
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-11-07 08:52:22 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-11-12 08:17:14 +0000
@@ -76,6 +76,7 @@ TransporterRegistry::TransporterRegistry
unsigned _maxTransporters,
unsigned sizeOfLongSignalMemory) :
m_mgm_handle(0),
+ localNodeId(0),
m_transp_count(0),
m_use_default_send_buffer(use_default_send_buffer),
m_send_buffers(0), m_page_freelist(0), m_send_buffer_memory(0),
@@ -83,7 +84,6 @@ TransporterRegistry::TransporterRegistry
{
DBUG_ENTER("TransporterRegistry::TransporterRegistry");
- nodeIdSpecified = false;
maxTransporters = _maxTransporters;
sendCounter = 1;
@@ -154,6 +154,19 @@ TransporterRegistry::allocate_send_buffe
if (!m_use_default_send_buffer)
return;
+ if (total_send_buffer == 0)
+ total_send_buffer = get_total_max_send_buffer();
+
+ if (m_send_buffers)
+ {
+ /* Send buffers already allocated -> resize the buffer pages */
+ assert(m_send_buffer_memory);
+
+ // TODO resize send buffer pages
+
+ return;
+ }
+
/* Initialize transporter send buffers (initially empty). */
m_send_buffers = new SendBuffer[maxTransporters];
for (unsigned i = 0; i < maxTransporters; i++)
@@ -264,11 +277,13 @@ TransporterRegistry::disconnectAll(){
bool
TransporterRegistry::init(NodeId nodeId) {
DBUG_ENTER("TransporterRegistry::init");
- nodeIdSpecified = true;
+ assert(localNodeId == 0 ||
+ localNodeId == nodeId);
+
localNodeId = nodeId;
-
+
DEBUG("TransporterRegistry started node: " << localNodeId);
-
+
DBUG_RETURN(true);
}
@@ -361,20 +376,47 @@ TransporterRegistry::connect_server(NDB_
DBUG_RETURN(res);
}
+
+bool
+TransporterRegistry::configureTransporter(TransporterConfiguration *config)
+{
+ NodeId remoteNodeId = config->remoteNodeId;
+
+ assert(localNodeId);
+ assert(config->localNodeId == localNodeId);
+
+ if (remoteNodeId >= maxTransporters)
+ return false;
+
+ Transporter* t = theTransporters[remoteNodeId];
+ if(t != NULL)
+ {
+ // Transporter already exist, try to reconfigure it
+ return t->configure(config);
+ }
+
+ DEBUG("Configuring transporter from " << localNodeId
+ << " to " << remoteNodeId);
+
+ switch (config->type){
+ case tt_TCP_TRANSPORTER:
+ return createTCPTransporter(config);
+ case tt_SHM_TRANSPORTER:
+ return createSHMTransporter(config);
+ case tt_SCI_TRANSPORTER:
+ return createSCITransporter(config);
+ default:
+ abort();
+ break;
+ }
+ return false;
+}
+
+
bool
TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
#ifdef NDB_TCP_TRANSPORTER
- if(!nodeIdSpecified){
- init(config->localNodeId);
- }
-
- if(config->localNodeId != localNodeId)
- return false;
-
- if(theTransporters[config->remoteNodeId] != NULL)
- return false;
-
TCP_Transporter * t = new TCP_Transporter(*this, config);
if (t == NULL)
@@ -405,17 +447,7 @@ TransporterRegistry::createSCITransporte
if(!SCI_Transporter::initSCI())
abort();
-
- if(!nodeIdSpecified){
- init(config->localNodeId);
- }
-
- if(config->localNodeId != localNodeId)
- return false;
-
- if(theTransporters[config->remoteNodeId] != NULL)
- return false;
-
+
SCI_Transporter * t = new SCI_Transporter(*this,
config->localHostName,
config->remoteHostName,
@@ -457,13 +489,7 @@ bool
TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
DBUG_ENTER("TransporterRegistry::createTransporter SHM");
#ifdef NDB_SHM_TRANSPORTER
- if(!nodeIdSpecified){
- init(config->localNodeId);
- }
-
- if(config->localNodeId != localNodeId)
- return false;
-
+
if (!g_ndb_shm_signum) {
g_ndb_shm_signum= config->shm.signum;
DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
@@ -476,9 +502,6 @@ TransporterRegistry::createSHMTransporte
if(config->shm.signum != g_ndb_shm_signum)
return false;
-
- if(theTransporters[config->remoteNodeId] != NULL)
- return false;
SHM_Transporter * t = new SHM_Transporter(*this,
config->localHostName,
@@ -1270,8 +1293,12 @@ TransporterRegistry::ioState(NodeId node
void
TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
+ if (ioStates[nodeId] == state)
+ return;
+
DEBUG("TransporterRegistry::setIOState("
- << nodeId << ", " << state << ")");
+ << nodeId << ", " << state << ")");
+
ioStates[nodeId] = state;
}
@@ -1609,9 +1636,10 @@ bool
TransporterRegistry::start_service(SocketServer& socket_server)
{
DBUG_ENTER("TransporterRegistry::start_service");
- if (m_transporter_interface.size() > 0 && !nodeIdSpecified)
+ if (m_transporter_interface.size() > 0 &&
+ localNodeId == 0)
{
- g_eventLogger->error("TransporterRegistry::startReceiving: localNodeId not specified");
+ g_eventLogger->error("INTERNAL ERROR: not initialized");
DBUG_RETURN(false);
}
@@ -1723,6 +1751,7 @@ NdbOut & operator <<(NdbOut & out, Signa
Transporter*
TransporterRegistry::get_transporter(NodeId nodeId) {
+ assert(nodeId < maxTransporters);
return theTransporters[nodeId];
}
@@ -1772,7 +1801,7 @@ NDB_SOCKET_TYPE TransporterRegistry::con
for(unsigned int i=0;i < m_transporter_interface.size();i++)
if (m_transporter_interface[i].m_s_service_port < 0
&& ndb_mgm_set_connection_int_parameter(*h,
- get_localNodeId(),
+ localNodeId,
m_transporter_interface[i].m_remote_nodeId,
CFG_CONNECTION_SERVER_PORT,
m_transporter_interface[i].m_s_service_port,
@@ -2078,4 +2107,35 @@ TransporterRegistry::forceSend(NodeId no
return false;
}
+
+void
+TransporterRegistry::print_transporters(const char* where, NdbOut& out)
+{
+ out << where << " >>" << endl;
+
+ for(unsigned i = 0; i < maxTransporters; i++){
+ if(theTransporters[i] == NULL)
+ continue;
+
+ const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();
+
+ out << i << " "
+ << getPerformStateString(remoteNodeId) << " to node: "
+ << remoteNodeId << " at "
+ << inet_ntoa(get_connect_address(remoteNodeId)) << endl;
+ }
+
+ out << "<<" << endl;
+
+ for (size_t i= 0; i < m_transporter_interface.size(); i++){
+ Transporter_interface tf= m_transporter_interface[i];
+
+ out << i
+ << " remote node: " << tf.m_remote_nodeId
+ << " port: " << tf.m_s_service_port
+ << " interface: " << tf.m_interface << endl;
+ }
+}
+
+
template class Vector<TransporterRegistry::Transporter_interface>;
=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp 2008-11-06 16:29:57 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2008-11-12 08:17:14 +0000
@@ -409,14 +409,20 @@ Configuration::setupConfiguration(){
/**
* Configure transporters
*/
- {
- int res = IPCConfig::configureTransporters(globalData.ownId,
- * p,
- globalTransporterRegistry);
- if(res <= 0){
- ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched",
- "No transporters configured");
- }
+ if (!globalTransporterRegistry.init(globalData.ownId))
+ {
+ ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
+ "Invalid configuration fetched",
+ "Could not init transporter registry");
+ }
+
+ if (!IPCConfig::configureTransporters(globalData.ownId,
+ * p,
+ globalTransporterRegistry))
+ {
+ ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
+ "Invalid configuration fetched",
+ "Could not configure transporters");
}
/**
@@ -434,11 +440,7 @@ Configuration::setupConfiguration(){
}
Uint32 total_send_buffer = 0;
- if(iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer) ||
- total_send_buffer == 0)
- {
- total_send_buffer = globalTransporterRegistry.get_total_max_send_buffer();
- }
+ iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
globalTransporterRegistry.allocate_send_buffers(total_send_buffer);
if(iter.get(CFG_DB_NO_SAVE_MSGS, &_maxErrorLogs)){
=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2008-11-06 16:39:26 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2008-11-12 08:17:14 +0000
@@ -265,6 +265,7 @@ MgmtSrvr::MgmtSrvr(const MgmtOpts& opts,
m_local_config(NULL),
_ownReference(0),
m_config_manager(NULL),
+ m_need_restart(false),
theFacade(NULL),
_isStopThread(false),
_logLevelThreadSleep(500),
@@ -743,7 +744,16 @@ MgmtSrvr::config_changed(NodeId node_id,
setClusterLog(m_local_config);
- // TODO Magnus, Reload ClusterMgr::theNodes
+ if (theFacade)
+ {
+ if (!theFacade->configure(_ownNodeId,
+ m_local_config->m_configValues))
+ {
+ g_eventLogger->warning("Could not reconfigure everything online, "
+ "this node need a restart");
+ m_need_restart= true;
+ }
+ }
DBUG_VOID_RETURN;
}
=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.hpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2008-11-06 10:56:21 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2008-11-12 08:17:14 +0000
@@ -484,6 +484,8 @@ private:
class ConfigManager* m_config_manager;
+ bool m_need_restart;
+
NodeBitmask m_reserved_nodes;
struct in_addr m_connect_address[MAX_NODES];
=== modified file 'storage/ndb/src/ndbapi/ClusterMgr.cpp'
--- a/storage/ndb/src/ndbapi/ClusterMgr.cpp 2008-11-06 12:00:21 +0000
+++ b/storage/ndb/src/ndbapi/ClusterMgr.cpp 2008-11-12 08:17:14 +0000
@@ -79,7 +79,8 @@ ClusterMgr::~ClusterMgr()
}
void
-ClusterMgr::init(ndb_mgm_configuration_iterator & iter){
+ClusterMgr::configure(const ndb_mgm_configuration* config){
+ ndb_mgm_configuration_iterator iter(* config, CFG_SECTION_NODE);
for(iter.first(); iter.valid(); iter.next()){
Uint32 nodeId = 0;
if(iter.get(CFG_NODE_ID, &nodeId))
@@ -109,6 +110,15 @@ ClusterMgr::init(ndb_mgm_configuration_i
}
}
+ /* Mark all non existing nodes as not defined */
+ for(Uint32 i = 0; i<MAX_NODES; i++) {
+ if (iter.first())
+ continue;
+
+ if (iter.find(CFG_NODE_ID, i))
+ theNodes[i]= Node();
+ }
+
/* Init own node info */
Node &node= theNodes[theFacade.ownId()];
assert(node.defined);
=== modified file 'storage/ndb/src/ndbapi/ClusterMgr.hpp'
--- a/storage/ndb/src/ndbapi/ClusterMgr.hpp 2008-11-06 17:34:29 +0000
+++ b/storage/ndb/src/ndbapi/ClusterMgr.hpp 2008-11-12 08:17:14 +0000
@@ -38,7 +38,7 @@ class ClusterMgr {
public:
ClusterMgr(class TransporterFacade &);
~ClusterMgr();
- void init(struct ndb_mgm_configuration_iterator & config);
+ void configure(const ndb_mgm_configuration* config);
void reportConnected(NodeId nodeId);
void reportDisconnected(NodeId nodeId);
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp 2008-11-06 13:52:28 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp 2008-11-12 08:17:14 +0000
@@ -420,13 +420,43 @@ copy(Uint32 * & insertPtr,
*/
int
-TransporterFacade::start_instance(int nodeId,
- const ndb_mgm_configuration* props)
+TransporterFacade::start_instance(NodeId nodeId,
+ const ndb_mgm_configuration* conf)
{
- if (! init(nodeId, props)) {
+ assert(theOwnId == 0);
+ theOwnId = nodeId;
+
+ theTransporterRegistry = new TransporterRegistry(this);
+ if (theTransporterRegistry == NULL)
return -1;
- }
-
+
+ if (!theTransporterRegistry->init(nodeId))
+ return -1;
+
+ theClusterMgr = new ClusterMgr(*this);
+ if (theClusterMgr == NULL)
+ return -1;
+
+ if (!configure(nodeId, conf))
+ return -1;
+
+ if (!theTransporterRegistry->start_service(m_socket_server))
+ return -1;
+
+ theReceiveThread = NdbThread_Create(runReceiveResponse_C,
+ (void**)this,
+ 32768,
+ "ndb_receive",
+ NDB_THREAD_PRIO_LOW);
+
+ theSendThread = NdbThread_Create(runSendRequest_C,
+ (void**)this,
+ 32768,
+ "ndb_send",
+ NDB_THREAD_PRIO_LOW);
+
+ theClusterMgr->startThread();
+
/**
* Install signal handler for SIGPIPE
*
@@ -679,82 +709,89 @@ void TransporterFacade::init_cond_wait_q
TransporterFacade::TransporterFacade(GlobalDictCache *cache) :
theTransporterRegistry(0),
+ theOwnId(0),
+ theStartNodeId(1),
+ theClusterMgr(NULL),
+ theArbitMgr(NULL),
+ checkCounter(4),
+ currentSendLimit(1),
+ m_scan_batch_size(MAX_SCAN_BATCH_SIZE),
+ m_batch_byte_size(SCAN_BATCH_SIZE),
+ m_batch_size(DEF_BATCH_SIZE),
theStopReceive(0),
theSendThread(NULL),
theReceiveThread(NULL),
+ m_max_trans_id(0),
m_fragmented_signal_id(0),
m_globalDictCache(cache)
{
DBUG_ENTER("TransporterFacade::TransporterFacade");
init_cond_wait_queue();
poll_owner = NULL;
- theOwnId = 0;
theMutexPtr = NdbMutex_Create();
sendPerformedLastInterval = 0;
- checkCounter = 4;
- currentSendLimit = 1;
- theClusterMgr = NULL;
- theArbitMgr = NULL;
- theStartNodeId = 1;
- m_scan_batch_size= MAX_SCAN_BATCH_SIZE;
- m_batch_byte_size= SCAN_BATCH_SIZE;
- m_batch_size= DEF_BATCH_SIZE;
- m_max_trans_id = 0;
-
for (int i = 0; i < NO_API_FIXED_BLOCKS; i++)
m_fixed2dynamic[i]= RNIL;
- theClusterMgr = new ClusterMgr(* this);
-
#ifdef API_TRACE
apiSignalLog = 0;
#endif
DBUG_VOID_RETURN;
}
+
bool
-TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
+TransporterFacade::configure(NodeId nodeId,
+ const ndb_mgm_configuration* conf)
{
- DBUG_ENTER("TransporterFacade::init");
-
- theOwnId = nodeId;
- theTransporterRegistry = new TransporterRegistry(this);
+ DBUG_ENTER("TransporterFacade::configure");
- const int res = IPCConfig::configureTransporters(nodeId,
- * props,
- * theTransporterRegistry);
- if(res <= 0){
- TRP_DEBUG( "configureTransporters returned 0 or less" );
+ assert(theOwnId == nodeId);
+ assert(theTransporterRegistry);
+ assert(theClusterMgr);
+
+ // Configure transporters
+ if (!IPCConfig::configureTransporters(nodeId,
+ * conf,
+ * theTransporterRegistry))
DBUG_RETURN(false);
- }
-
- ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);
- iter.first();
- theClusterMgr->init(iter);
-
- iter.first();
- if(iter.find(CFG_NODE_ID, nodeId)){
- TRP_DEBUG( "Node info missing from config." );
+
+ // Configure cluster manager
+ theClusterMgr->configure(conf);
+
+ ndb_mgm_configuration_iterator iter(* conf, CFG_SECTION_NODE);
+ if(iter.find(CFG_NODE_ID, nodeId))
DBUG_RETURN(false);
- }
-
+
+ // Configure send buffers
Uint32 total_send_buffer = 0;
- if(iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer) ||
- total_send_buffer == 0)
- {
- total_send_buffer = theTransporterRegistry->get_total_max_send_buffer();
- }
+ iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
theTransporterRegistry->allocate_send_buffers(total_send_buffer);
+ // Configure arbitrator
Uint32 rank = 0;
- if(!iter.get(CFG_NODE_ARBIT_RANK, &rank) && rank>0){
- theArbitMgr = new ArbitMgr(* this);
+ iter.get(CFG_NODE_ARBIT_RANK, &rank);
+ if (rank > 0)
+ {
+ // The arbitrator should be active
+ if (!theArbitMgr)
+ theArbitMgr = new ArbitMgr(* this);
theArbitMgr->setRank(rank);
+
Uint32 delay = 0;
iter.get(CFG_NODE_ARBIT_DELAY, &delay);
theArbitMgr->setDelay(delay);
}
+ else if (theArbitMgr)
+ {
+ // No arbitrator should be started
+ theArbitMgr->doStop(NULL);
+ delete theArbitMgr;
+ theArbitMgr= NULL;
+ }
+
+ // Configure scan settings
Uint32 scan_batch_size= 0;
if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
m_scan_batch_size= scan_batch_size;
@@ -767,9 +804,9 @@ TransporterFacade::init(Uint32 nodeId, c
if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
m_batch_size= batch_size;
}
-
+
+ // Configure timeouts
Uint32 timeout = 120000;
- iter.first();
for (iter.first(); iter.valid(); iter.next())
{
Uint32 tmp1 = 0, tmp2 = 0;
@@ -781,24 +818,6 @@ TransporterFacade::init(Uint32 nodeId, c
}
m_waitfor_timeout = timeout;
- if (!theTransporterRegistry->start_service(m_socket_server)){
- ndbout_c("Unable to start theTransporterRegistry->start_service");
- DBUG_RETURN(false);
- }
-
- theReceiveThread = NdbThread_Create(runReceiveResponse_C,
- (void**)this,
- 32768,
- "ndb_receive",
- NDB_THREAD_PRIO_LOW);
-
- theSendThread = NdbThread_Create(runSendRequest_C,
- (void**)this,
- 32768,
- "ndb_send",
- NDB_THREAD_PRIO_LOW);
- theClusterMgr->startThread();
-
#ifdef API_TRACE
signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);
#endif
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp 2008-11-04 08:43:06 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp 2008-11-12 08:17:14 +0000
@@ -53,11 +53,16 @@ public:
STATIC_CONST( MAX_NO_THREADS = 4711 );
TransporterFacade(GlobalDictCache *cache);
virtual ~TransporterFacade();
- bool init(Uint32, const ndb_mgm_configuration *);
- int start_instance(int, const ndb_mgm_configuration*);
+ int start_instance(NodeId, const ndb_mgm_configuration*);
void stop_instance();
-
+
+ /*
+ (Re)configure the TransporterFacade
+ to a specific configuration
+ */
+ bool configure(NodeId, const ndb_mgm_configuration *);
+
/**
* Register this block for sending/receiving signals
* @blockNo block number to use, -1 => any blockNumber
@@ -222,8 +227,7 @@ private:
TransporterRegistry* theTransporterRegistry;
SocketServer m_socket_server;
int sendPerformedLastInterval;
- int theOwnId;
-
+ NodeId theOwnId;
NodeId theStartNodeId;
ClusterMgr* theClusterMgr;
=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2008-09-09 12:04:17 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2008-11-12 08:17:14 +0000
@@ -711,7 +711,9 @@ int Ndb_cluster_connection::connect(int
if(props == 0)
break;
- m_impl.m_transporter_facade->start_instance(nodeId, props);
+ if (m_impl.m_transporter_facade->start_instance(nodeId, props) < 0)
+ DBUG_RETURN(-1);
+
if (m_impl.init_nodes_vector(nodeId, *props))
{
ndbout_c("Ndb_cluster_connection::connect: malloc failure");
| Thread |
|---|
| • bzr commit into mysql-5.1 branch (msvensson:3061) WL#4350 | Magnus Svensson | 12 Nov |