3076 Magnus Svensson 2008-11-12 [merge]
Merge
modified:
storage/ndb/config/common.mk.am
storage/ndb/include/mgmcommon/IPCConfig.hpp
storage/ndb/include/transporter/TransporterDefinitions.hpp
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/src/common/mgmcommon/IPCConfig.cpp
storage/ndb/src/common/transporter/SCI_Transporter.cpp
storage/ndb/src/common/transporter/SCI_Transporter.hpp
storage/ndb/src/common/transporter/SHM_Transporter.cpp
storage/ndb/src/common/transporter/SHM_Transporter.hpp
storage/ndb/src/common/transporter/TCP_Transporter.cpp
storage/ndb/src/common/transporter/TCP_Transporter.hpp
storage/ndb/src/common/transporter/Transporter.cpp
storage/ndb/src/common/transporter/Transporter.hpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/kernel/vm/Configuration.cpp
storage/ndb/src/kernel/vm/TransporterCallback.cpp
storage/ndb/src/mgmsrv/MgmtSrvr.cpp
storage/ndb/src/mgmsrv/MgmtSrvr.hpp
storage/ndb/src/ndbapi/ClusterMgr.cpp
storage/ndb/src/ndbapi/ClusterMgr.hpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
3075 Frazer Clement 2008-11-11 [merge]
Merge 6.3->6.4
modified:
storage/ndb/test/src/NDBT_Tables.cpp
=== modified file 'storage/ndb/config/common.mk.am'
--- a/storage/ndb/config/common.mk.am 2008-03-11 15:27:35 +0000
+++ b/storage/ndb/config/common.mk.am 2008-11-11 07:21:58 +0000
@@ -25,6 +25,4 @@ mgmapiincludedir = "$(pkgincludedir)/sto
INCLUDES = $(INCLUDES_LOC)
LDADD = $(LDADD_LOC)
DEFS = @DEFS@ @NDB_DEFS@ $(DEFS_LOC) $(NDB_EXTRA_FLAGS)
-NDB_CXXFLAGS=@ndb_cxxflags_fix@ $(NDB_CXXFLAGS_LOC)
-NDB_AM_CXXFLAGS:= $(AM_CXXFLAGS)
-AM_CXXFLAGS=$(NDB_AM_CXXFLAGS) $(NDB_CXXFLAGS)
+AM_CXXFLAGS= @ndb_cxxflags_fix@ $(NDB_CXXFLAGS_LOC)
=== modified file 'storage/ndb/include/mgmcommon/IPCConfig.hpp'
--- a/storage/ndb/include/mgmcommon/IPCConfig.hpp 2006-12-23 19:20:40 +0000
+++ b/storage/ndb/include/mgmcommon/IPCConfig.hpp 2008-11-12 08:17:14 +0000
@@ -1,4 +1,4 @@
-/* Copyright (C) 2003 MySQL AB
+/* Copyright (C) 2003-2008 MySQL AB, Sun Microsystems Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -16,67 +16,21 @@
#ifndef IPCConfig_H
#define IPCConfig_H
-#include <ndb_types.h>
-#include <ndb_limits.h>
-#include <kernel_types.h>
-#include <Properties.hpp>
-
-/**
- * @class IPCConfig
- * @brief Config transporters in TransporterRegistry using Properties config
- */
-class IPCConfig
+struct IPCConfig
{
-public:
- IPCConfig(Properties * props);
- ~IPCConfig();
-
- /** @return 0 for OK */
- int init();
-
- NodeId ownId() const;
-
- /** @return No of transporters configured */
- int configureTransporters(class TransporterRegistry * theTransporterRegistry);
-
- /**
- * Supply a nodeId,
- * and get next higher node id
- * @return false if none found, true otherwise
- *
- * getREPHBFrequency and getNodeType uses the last Id supplied to
- * getNextRemoteNodeId.
- */
- bool getNextRemoteNodeId(NodeId & nodeId) const;
- Uint32 getREPHBFrequency(NodeId id) const;
- const char* getNodeType(NodeId id) const;
-
- NodeId getNoOfRemoteNodes() const {
- return theNoOfRemoteNodes;
- }
-
- void print() const { props->print(); }
-
- static Uint32 configureTransporters(Uint32 nodeId,
- const struct ndb_mgm_configuration &,
- class TransporterRegistry &);
-
-private:
- NodeId the_ownId;
- Properties * props;
-
- bool addRemoteNodeId(NodeId nodeId);
- NodeId theNoOfRemoteNodes;
- NodeId theRemoteNodeIds[MAX_NODES];
-};
-
-inline
-NodeId
-IPCConfig::ownId() const
-{
- return the_ownId;
-}
+ /*
+ configure_transporters
+ Create and configure transporters in TransporterRegistry
+ Returns:
+ true - sucessfully created and (re)configured transporters
+ false - at least one transporter could not be created
+ or (re)configured
+ */
+ static bool configureTransporters(Uint32 nodeId,
+ const struct ndb_mgm_configuration &,
+ class TransporterRegistry &);
+};
#endif // IPCConfig_H
=== modified file 'storage/ndb/include/transporter/TransporterDefinitions.hpp'
--- a/storage/ndb/include/transporter/TransporterDefinitions.hpp 2008-08-27 14:34:22 +0000
+++ b/storage/ndb/include/transporter/TransporterDefinitions.hpp 2008-11-12 08:17:14 +0000
@@ -40,6 +40,12 @@ enum SendStatus {
SEND_UNKNOWN_NODE = 5
};
+enum TransporterType {
+ tt_TCP_TRANSPORTER = 1,
+ tt_SCI_TRANSPORTER = 2,
+ tt_SHM_TRANSPORTER = 3
+};
+
/**
* Maximum message sizes
* ---------------------
@@ -75,6 +81,7 @@ struct TransporterConfiguration {
bool checksum;
bool signalId;
bool isMgmConnection; // is a mgm connection, requires transforming
+ TransporterType type;
union { // Transporter specific configuration information
@@ -92,11 +99,6 @@ struct TransporterConfiguration {
Uint32 shmSize;
int signum;
} shm;
-
- struct {
- Uint32 prioASignalSize;
- Uint32 prioBSignalSize;
- } ose;
struct {
Uint32 sendLimit; // Packet size
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2008-11-10 09:28:28 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2008-11-12 08:26:18 +0000
@@ -51,12 +51,6 @@ enum IOState {
HaltIO = 3
};
-enum TransporterType {
- tt_TCP_TRANSPORTER = 1,
- tt_SCI_TRANSPORTER = 2,
- tt_SHM_TRANSPORTER = 3
- // ID 4 was OSE Transporter which has been removed. Don't use ID 4.
-};
static const char *performStateString[] =
{ "is connected",
@@ -198,17 +192,21 @@ public:
IOState ioState(NodeId nodeId);
void setIOState(NodeId nodeId, IOState state);
- /**
- * createTransporter
+private:
+
+ bool createTCPTransporter(TransporterConfiguration * config);
+ bool createSCITransporter(TransporterConfiguration * config);
+ bool createSHMTransporter(TransporterConfiguration * config);
+
+public:
+ /**
+ * configureTransporter
+ *
+ * Configure a transporter, ie. create new if it
+ * does not exist otherwise try to reconfigure it
*
- * If the config object indicates that the transporter
- * to be created will act as a server and no server is
- * started, startServer is called. A transporter of the selected kind
- * is created and it is put in the transporter arrays.
- */
- bool createTCPTransporter(struct TransporterConfiguration * config);
- bool createSCITransporter(struct TransporterConfiguration * config);
- bool createSHMTransporter(struct TransporterConfiguration * config);
+ */
+ bool configureTransporter(TransporterConfiguration * config);
/**
* Allocate send buffer for default send buffer handling.
@@ -324,8 +322,6 @@ public:
void add_transporter_interface(NodeId remoteNodeId, const char *interf,
int s_port); // signed port. <0 is dynamic
Transporter* get_transporter(NodeId nodeId);
- NodeId get_localNodeId() { return localNodeId; };
-
struct in_addr get_connect_address(NodeId node_id) const;
protected:
@@ -339,7 +335,6 @@ private:
int sendCounter;
NodeId localNodeId;
- bool nodeIdSpecified;
unsigned maxTransporters;
int nTransporters;
int nTCPTransporters;
@@ -498,6 +493,9 @@ public:
bool has_data_to_send(NodeId node);
void reset_send_buffer(NodeId node);
+
+ void print_transporters(const char* where, NdbOut& out = ndbout);
+
};
inline void
=== modified file 'storage/ndb/src/common/mgmcommon/IPCConfig.cpp'
--- a/storage/ndb/src/common/mgmcommon/IPCConfig.cpp 2008-08-21 06:33:53 +0000
+++ b/storage/ndb/src/common/mgmcommon/IPCConfig.cpp 2008-11-12 08:22:03 +0000
@@ -1,4 +1,4 @@
-/* Copyright (C) 2003 MySQL AB
+/* Copyright (C) 2003-2008 MySQL AB, Sun Microsystems Inc.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@@ -14,166 +14,47 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
-#include <ndb_opt_defaults.h>
#include <IPCConfig.hpp>
-#include <NdbOut.hpp>
-#include <NdbHost.h>
-#include <TransporterDefinitions.hpp>
#include <TransporterRegistry.hpp>
-#include <Properties.hpp>
+#include <mgmapi.h>
#include <mgmapi_configuration.hpp>
-#include <mgmapi_config_parameters.h>
-
-#if defined DEBUG_TRANSPORTER
-#define DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":"
<< t << endl;
-#else
-#define DEBUG(t)
-#endif
-
-IPCConfig::IPCConfig(Properties * p)
-{
- theNoOfRemoteNodes = 0;
- the_ownId = 0;
- if(p != 0)
- props = new Properties(* p);
- else
- props = 0;
-}
-IPCConfig::~IPCConfig()
+/* Return true if node with "nodeId" is a MGM node */
+static bool is_mgmd(Uint32 nodeId,
+ const struct ndb_mgm_configuration & config)
{
- if(props != 0){
- delete props;
- }
-}
-
-int
-IPCConfig::init(){
- Uint32 nodeId;
-
- if(props == 0) return -1;
- if(!props->get("LocalNodeId", &nodeId)) {
- DEBUG( "Did not find local node id." );
- return -1;
- }
- the_ownId = nodeId;
-
- Uint32 noOfConnections;
- if(!props->get("NoOfConnections", &noOfConnections)) {
- DEBUG( "Did not find noOfConnections." );
- return -1;
- }
-
- for(Uint32 i = 0; i<noOfConnections; i++){
- const Properties * tmp;
- Uint32 node1, node2;
-
- if(!props->get("Connection", i, &tmp)) {
- DEBUG( "Did not find Connection." );
- return -1;
- }
- if(!tmp->get("NodeId1", &node1)) {
- DEBUG( "Did not find NodeId1." );
- return -1;
- }
- if(!tmp->get("NodeId2", &node2)) {
- DEBUG( "Did not find NodeId2." );
- return -1;
- }
+ ndb_mgm_configuration_iterator iter(config, CFG_SECTION_NODE);
+ if (iter.find(CFG_NODE_ID, nodeId))
+ abort();
+ Uint32 type;
+ if(iter.get(CFG_TYPE_OF_SECTION, &type))
+ abort();
- if(node1 == the_ownId && node2 != the_ownId)
- if(!addRemoteNodeId(node2)) {
- DEBUG( "addRemoteNodeId(node2) failed." );
- return -1;
- }
-
- if(node1 != the_ownId && node2 == the_ownId)
- if(!addRemoteNodeId(node1)) {
- DEBUG( "addRemoteNodeId(node2) failed." );
- return -1;
- }
- }
- return 0;
+ return (type == NODE_TYPE_MGM);
}
-bool
-IPCConfig::addRemoteNodeId(NodeId nodeId){
- for(int i = 0; i<theNoOfRemoteNodes; i++)
- if(theRemoteNodeIds[i] == nodeId)
- return false;
- theRemoteNodeIds[theNoOfRemoteNodes] = nodeId;
- theNoOfRemoteNodes++;
- return true;
-}
-/**
- * Supply a nodeId,
- * and get next higher node id
- * Returns false if none found
- */
bool
-IPCConfig::getNextRemoteNodeId(NodeId & nodeId) const {
- NodeId returnNode = MAX_NODES + 1;
- for(int i = 0; i<theNoOfRemoteNodes; i++)
- if(theRemoteNodeIds[i] > nodeId){
- if(theRemoteNodeIds[i] < returnNode){
- returnNode = theRemoteNodeIds[i];
- }
- }
- if(returnNode == (MAX_NODES + 1))
- return false;
- nodeId = returnNode;
- return true;
-}
-
-
-Uint32
-IPCConfig::getREPHBFrequency(NodeId id) const {
- const Properties * tmp;
- Uint32 out;
-
- /**
- * Todo: Fix correct heartbeat
- */
- if (!props->get("Node", id, &tmp) ||
- !tmp->get("HeartbeatIntervalRepRep", &out)) {
- DEBUG("Illegal Node or HeartbeatIntervalRepRep in config.");
- out = 10000;
- }
-
- return out;
-}
-
-const char*
-IPCConfig::getNodeType(NodeId id) const {
- const char * out;
- const Properties * tmp;
-
- if (!props->get("Node", id, &tmp) || !tmp->get("Type", &out)) {
- DEBUG("Illegal Node or NodeType in config.");
- out = "Unknown";
- }
-
- return out;
-}
-
-#include <mgmapi.h>
-Uint32
IPCConfig::configureTransporters(Uint32 nodeId,
- const struct ndb_mgm_configuration & config,
- class TransporterRegistry & tr){
- TransporterConfiguration conf;
+ const struct ndb_mgm_configuration & config,
+ class TransporterRegistry & tr)
+{
+ bool result= true;
DBUG_ENTER("IPCConfig::configureTransporters");
- /**
- * Iterate over all MGM's an construct a connectstring
- * create mgm_handle and give it to the Transporter Registry
- */
+
+ if (!is_mgmd(nodeId, config))
{
+
+ /**
+ * Iterate over all MGM's and construct a connectstring
+ * create mgm_handle and give it to the Transporter Registry
+ */
+
const char *separator= "";
BaseString connect_string;
ndb_mgm_configuration_iterator iter(config, CFG_SECTION_NODE);
@@ -198,9 +79,23 @@ IPCConfig::configureTransporters(Uint32
}
}
- Uint32 noOfTransportersCreated= 0;
+
+ /* Remove transporter to nodes that does not exist anymore */
+ for (int i= 1; i < MAX_NODES; i++)
+ {
+ ndb_mgm_configuration_iterator iter(config, CFG_SECTION_NODE);
+ if (tr.get_transporter(i) && iter.find(CFG_NODE_ID, i))
+ {
+ // Transporter exist in TransporterResgistry but not
+ // in configuration
+ ndbout_c("The connection to node %d could not "
+ "be removed at this time", i);
+ result= false; // Need restart
+ }
+ }
+
+ TransporterConfiguration conf;
ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
-
for(iter.first(); iter.valid(); iter.next()){
bzero(&conf, sizeof(conf));
@@ -234,19 +129,11 @@ IPCConfig::configureTransporters(Uint32
Uint32 nodeIdServer= 0;
if(iter.get(CFG_CONNECTION_NODE_ID_SERVER, &nodeIdServer)) break;
- /*
- We check the node type.
- */
- Uint32 node1type, node2type;
- ndb_mgm_configuration_iterator node1iter(config, CFG_SECTION_NODE);
- ndb_mgm_configuration_iterator node2iter(config, CFG_SECTION_NODE);
- node1iter.find(CFG_NODE_ID,nodeId1);
- node2iter.find(CFG_NODE_ID,nodeId2);
- node1iter.get(CFG_TYPE_OF_SECTION,&node1type);
- node2iter.get(CFG_TYPE_OF_SECTION,&node2type);
-
- if(node1type==NODE_TYPE_MGM || node2type==NODE_TYPE_MGM)
+ if(is_mgmd(nodeId1, config) || is_mgmd(nodeId2, config))
+ {
+ // All connections with MGM uses the mgm port as server
conf.isMgmConnection= true;
+ }
else
conf.isMgmConnection= false;
@@ -286,21 +173,22 @@ IPCConfig::configureTransporters(Uint32
if(iter.get(CFG_SHM_KEY, &conf.shm.shmKey)) break;
if(iter.get(CFG_SHM_BUFFER_MEM, &conf.shm.shmSize)) break;
- Uint32 tmp;
- if(iter.get(CFG_SHM_SIGNUM, &tmp)) break;
- conf.shm.signum= tmp;
-
- if(!tr.createSHMTransporter(&conf)){
- DBUG_PRINT("error", ("Failed to create SHM Transporter from %d to %d",
+ Uint32 signum;
+ if(iter.get(CFG_SHM_SIGNUM, &signum)) break;
+ conf.shm.signum= signum;
+
+ conf.type = tt_SHM_TRANSPORTER;
+
+ if(!tr.configureTransporter(&conf)){
+ DBUG_PRINT("error", ("Failed to configure SHM Transporter "
+ "from %d to %d",
conf.localNodeId, conf.remoteNodeId));
- ndbout << "Failed to create SHM Transporter from: "
- << conf.localNodeId << " to: " << conf.remoteNodeId <<
endl;
- } else {
- noOfTransportersCreated++;
+ ndbout_c("Failed to configure SHM Transporter to node %d",
+ conf.remoteNodeId);
+ result = false;
}
- DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, "
+ DBUG_PRINT("info", ("Configured SHM Transporter using shmkey %d, "
"buf size = %d", conf.shm.shmKey, conf.shm.shmSize));
-
break;
case CONNECTION_TYPE_SCI:
@@ -318,13 +206,16 @@ IPCConfig::configureTransporters(Uint32
} else {
conf.sci.nLocalAdapters = 2;
}
- if(!tr.createSCITransporter(&conf)){
- DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d",
+ conf.type = tt_SCI_TRANSPORTER;
+ if(!tr.configureTransporter(&conf)){
+ DBUG_PRINT("error", ("Failed to configure SCI Transporter "
+ "from %d to %d",
conf.localNodeId, conf.remoteNodeId));
- ndbout << "Failed to create SCI Transporter from: "
- << conf.localNodeId << " to: " << conf.remoteNodeId <<
endl;
+ ndbout_c("Failed to configure SCI Transporter to node %d",
+ conf.remoteNodeId);
+ result = false;
} else {
- DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, "
+ DBUG_PRINT("info", ("Configured SCI Transporter: Adapters = %d, "
"remote SCI node id %d",
conf.sci.nLocalAdapters, conf.sci.remoteSciNodeId0));
DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, "
@@ -336,8 +227,6 @@ IPCConfig::configureTransporters(Uint32
"second remote SCI node id = %d",
conf.sci.remoteSciNodeId1));
}
- noOfTransportersCreated++;
- continue;
}
break;
@@ -357,14 +246,15 @@ IPCConfig::configureTransporters(Uint32
iter.get(CFG_TCP_RCV_BUF_SIZE, &conf.tcp.tcpRcvBufSize);
iter.get(CFG_TCP_MAXSEG_SIZE, &conf.tcp.tcpMaxsegSize);
iter.get(CFG_CONNECTION_OVERLOAD, &conf.tcp.tcpOverloadLimit);
+
+ conf.type = tt_TCP_TRANSPORTER;
- if(!tr.createTCPTransporter(&conf)){
- ndbout << "Failed to create TCP Transporter from: "
- << nodeId << " to: " << remoteNodeId << endl;
- } else {
- noOfTransportersCreated++;
+ if(!tr.configureTransporter(&conf)){
+ ndbout_c("Failed to configure TCP Transporter to node %d",
+ conf.remoteNodeId);
+ result= false;
}
- DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, "
+ DBUG_PRINT("info", ("Configured TCP Transporter: sendBufferSize = %d, "
"maxReceiveSize = %d", conf.tcp.sendBufferSize,
conf.tcp.maxReceiveSize));
break;
@@ -375,6 +265,6 @@ IPCConfig::configureTransporters(Uint32
} // switch
} // for
- DBUG_RETURN(noOfTransportersCreated);
+ DBUG_RETURN(result);
}
=== modified file 'storage/ndb/src/common/transporter/SCI_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/SCI_Transporter.cpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/common/transporter/SCI_Transporter.cpp 2008-11-12 08:17:14 +0000
@@ -99,6 +99,20 @@ SCI_Transporter::SCI_Transporter(Transpo
DBUG_VOID_RETURN;
}
+
+bool
+SCI_Transporter::configure_derived(const TransporterConfiguration* conf)
+{
+ if (conf->sci.sendLimit == (m_PacketSize + 3)/4 &&
+ conf->sci.bufferSize == m_buffersize &&
+ conf->sci.nLocalAdapters == m_adapters &&
+ conf->sci.remoteSciNodeId0 == m_remoteNodes[0] &&
+ conf->sci.remoteSciNodeId1 == m_remoteNodes[1])
+ return true; // No change
+ return false; // Can't reconfigure
+}
+
+
void SCI_Transporter::disconnectImpl()
{
DBUG_ENTER("SCI_Transporter::disconnectImpl");
=== modified file 'storage/ndb/src/common/transporter/SCI_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/SCI_Transporter.hpp 2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/common/transporter/SCI_Transporter.hpp 2008-11-12 08:17:14 +0000
@@ -144,6 +144,9 @@ private:
* Destructor. Disconnects the transporter.
*/
~SCI_Transporter();
+
+ virtual bool configure_derived(const TransporterConfiguration* conf);
+
bool m_mapped;
bool m_initLocal;
bool m_sciinit;
=== modified file 'storage/ndb/src/common/transporter/SHM_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/SHM_Transporter.cpp 2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/SHM_Transporter.cpp 2008-11-12 08:17:14 +0000
@@ -64,6 +64,18 @@ SHM_Transporter::SHM_Transporter(Transpo
m_signal_threshold = 4096;
}
+
+bool
+SHM_Transporter::configure_derived(const TransporterConfiguration* conf)
+{
+ if ((key_t)conf->shm.shmKey == shmKey &&
+ (int)conf->shm.shmSize == shmSize &&
+ conf->shm.signum == g_ndb_shm_signum)
+ return true; // No change
+ return false; // Can't reconfigure
+}
+
+
SHM_Transporter::~SHM_Transporter(){
doDisconnect();
}
@@ -131,19 +143,19 @@ SHM_Transporter::setupBuffers(){
#ifdef DEBUG_TRANSPORTER
printf("-- (%d - %d) - Server -\n", localNodeId, remoteNodeId);
- printf("Reader at: %d (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
- printf("sharedReadIndex1 at %d (%p) = %d\n",
+ printf("Reader at: %ld (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
+ printf("sharedReadIndex1 at %ld (%p) = %d\n",
(char*)sharedReadIndex1-shmBuf,
sharedReadIndex1, *sharedReadIndex1);
- printf("sharedWriteIndex1 at %d (%p) = %d\n",
+ printf("sharedWriteIndex1 at %ld (%p) = %d\n",
(char*)sharedWriteIndex1-shmBuf,
sharedWriteIndex1, *sharedWriteIndex1);
- printf("Writer at: %d (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
- printf("sharedReadIndex2 at %d (%p) = %d\n",
+ printf("Writer at: %ld (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
+ printf("sharedReadIndex2 at %ld (%p) = %d\n",
(char*)sharedReadIndex2-shmBuf,
sharedReadIndex2, *sharedReadIndex2);
- printf("sharedWriteIndex2 at %d (%p) = %d\n",
+ printf("sharedWriteIndex2 at %ld (%p) = %d\n",
(char*)sharedWriteIndex2-shmBuf,
sharedWriteIndex2, *sharedWriteIndex2);
@@ -171,19 +183,19 @@ SHM_Transporter::setupBuffers(){
* clientStatusFlag = 1;
#ifdef DEBUG_TRANSPORTER
printf("-- (%d - %d) - Client -\n", localNodeId, remoteNodeId);
- printf("Reader at: %d (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
- printf("sharedReadIndex2 at %d (%p) = %d\n",
+ printf("Reader at: %ld (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
+ printf("sharedReadIndex2 at %ld (%p) = %d\n",
(char*)sharedReadIndex2-shmBuf,
sharedReadIndex2, *sharedReadIndex2);
- printf("sharedWriteIndex2 at %d (%p) = %d\n",
+ printf("sharedWriteIndex2 at %ld (%p) = %d\n",
(char*)sharedWriteIndex2-shmBuf,
sharedWriteIndex2, *sharedWriteIndex2);
- printf("Writer at: %d (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
- printf("sharedReadIndex1 at %d (%p) = %d\n",
+ printf("Writer at: %ld (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
+ printf("sharedReadIndex1 at %ld (%p) = %d\n",
(char*)sharedReadIndex1-shmBuf,
sharedReadIndex1, *sharedReadIndex1);
- printf("sharedWriteIndex1 at %d (%p) = %d\n",
+ printf("sharedWriteIndex1 at %ld (%p) = %d\n",
(char*)sharedWriteIndex1-shmBuf,
sharedWriteIndex1, *sharedWriteIndex1);
=== modified file 'storage/ndb/src/common/transporter/SHM_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/SHM_Transporter.hpp 2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/SHM_Transporter.hpp 2008-11-12 08:17:14 +0000
@@ -49,6 +49,8 @@ public:
*/
virtual ~SHM_Transporter();
+ virtual bool configure_derived(const TransporterConfiguration* conf);
+
/**
* Do initialization
*/
=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2008-11-12 08:17:14 +0000
@@ -69,6 +69,16 @@ setIf(int& ref, Uint32 val, Uint32 def)
ref = def;
}
+
+static
+Uint32 overload_limit(const TransporterConfiguration* conf)
+{
+ return (conf->tcp.tcpOverloadLimit ?
+ conf->tcp.tcpOverloadLimit :
+ conf->tcp.sendBufferSize*4/5);
+}
+
+
TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
const TransporterConfiguration* conf)
:
@@ -99,10 +109,25 @@ TCP_Transporter::TCP_Transporter(Transpo
setIf(sockOptSndBufSize, conf->tcp.tcpSndBufSize, 71540);
setIf(sockOptTcpMaxSeg, conf->tcp.tcpMaxsegSize, 0);
- m_overload_limit = conf->tcp.tcpOverloadLimit ?
- conf->tcp.tcpOverloadLimit : conf->tcp.sendBufferSize*4/5;
+ m_overload_limit = overload_limit(conf);
}
+
+bool
+TCP_Transporter::configure_derived(const TransporterConfiguration* conf)
+{
+ if (conf->tcp.sendBufferSize == m_max_send_buffer &&
+ conf->tcp.maxReceiveSize == maxReceiveSize &&
+ conf->tcp.tcpSndBufSize == sockOptSndBufSize &&
+ conf->tcp.tcpRcvBufSize == sockOptRcvBufSize &&
+ conf->tcp.tcpMaxsegSize == sockOptTcpMaxSeg &&
+ overload_limit(conf) == m_overload_limit)
+ return true; // No change
+ndbout_c("configure_derived, can't reconfigure");
+ return false; // Can't reconfigure
+}
+
+
TCP_Transporter::~TCP_Transporter() {
// Disconnect
=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2008-11-12 08:17:14 +0000
@@ -45,10 +45,12 @@ class TCP_Transporter : public Transport
private:
// Initialize member variables
TCP_Transporter(TransporterRegistry&, const TransporterConfiguration* conf);
-
+
// Disconnect, delete send buffers and receive buffer
virtual ~TCP_Transporter();
+ virtual bool configure_derived(const TransporterConfiguration* conf);
+
/**
* Allocate buffers for sending and receiving
*/
=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp 2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.cpp 2008-11-12 08:17:14 +0000
@@ -102,10 +102,29 @@ Transporter::Transporter(TransporterRegi
}
Transporter::~Transporter(){
- if (m_socket_client)
- delete m_socket_client;
+ delete m_socket_client;
}
+
+bool
+Transporter::configure(const TransporterConfiguration* conf)
+{
+ if (configure_derived(conf) &&
+ conf->s_port == m_s_port &&
+ strcmp(conf->remoteHostName, remoteHostName) == 0 &&
+ strcmp(conf->localHostName, localHostName) == 0 &&
+ conf->remoteNodeId == remoteNodeId &&
+ conf->localNodeId == localNodeId &&
+ (conf->serverNodeId == conf->localNodeId) == isServer &&
+ conf->checksum == checksumUsed &&
+ conf->signalId == signalIdUsed &&
+ conf->isMgmConnection == isMgmConnection &&
+ conf->type == m_type)
+ return true; // No change
+ return false; // Can't reconfigure
+}
+
+
bool
Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
// all initial negotiation is done in TransporterRegistry::connect_server
=== modified file 'storage/ndb/src/common/transporter/Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Transporter.hpp 2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.hpp 2008-11-12 08:17:14 +0000
@@ -116,6 +116,9 @@ protected:
bool signalId,
Uint32 max_send_buffer);
+ virtual bool configure(const TransporterConfiguration* conf);
+ virtual bool configure_derived(const TransporterConfiguration* conf) = 0;
+
/**
* Blocking, for max timeOut milli seconds
* Returns true if connect succeded
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-11-10 09:28:28 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-11-12 08:26:18 +0000
@@ -76,6 +76,7 @@ TransporterRegistry::TransporterRegistry
unsigned _maxTransporters,
unsigned sizeOfLongSignalMemory) :
m_mgm_handle(0),
+ localNodeId(0),
m_transp_count(0),
m_use_default_send_buffer(use_default_send_buffer),
m_send_buffers(0), m_page_freelist(0), m_send_buffer_memory(0),
@@ -83,7 +84,6 @@ TransporterRegistry::TransporterRegistry
{
DBUG_ENTER("TransporterRegistry::TransporterRegistry");
- nodeIdSpecified = false;
maxTransporters = _maxTransporters;
sendCounter = 1;
@@ -154,6 +154,19 @@ TransporterRegistry::allocate_send_buffe
if (!m_use_default_send_buffer)
return;
+ if (total_send_buffer == 0)
+ total_send_buffer = get_total_max_send_buffer();
+
+ if (m_send_buffers)
+ {
+ /* Send buffers already allocated -> resize the buffer pages */
+ assert(m_send_buffer_memory);
+
+ // TODO resize send buffer pages
+
+ return;
+ }
+
/* Initialize transporter send buffers (initially empty). */
m_send_buffers = new SendBuffer[maxTransporters];
for (unsigned i = 0; i < maxTransporters; i++)
@@ -264,11 +277,13 @@ TransporterRegistry::disconnectAll(){
bool
TransporterRegistry::init(NodeId nodeId) {
DBUG_ENTER("TransporterRegistry::init");
- nodeIdSpecified = true;
+ assert(localNodeId == 0 ||
+ localNodeId == nodeId);
+
localNodeId = nodeId;
-
+
DEBUG("TransporterRegistry started node: " << localNodeId);
-
+
DBUG_RETURN(true);
}
@@ -361,20 +376,47 @@ TransporterRegistry::connect_server(NDB_
DBUG_RETURN(res);
}
+
+bool
+TransporterRegistry::configureTransporter(TransporterConfiguration *config)
+{
+ NodeId remoteNodeId = config->remoteNodeId;
+
+ assert(localNodeId);
+ assert(config->localNodeId == localNodeId);
+
+ if (remoteNodeId >= maxTransporters)
+ return false;
+
+ Transporter* t = theTransporters[remoteNodeId];
+ if(t != NULL)
+ {
+ // Transporter already exist, try to reconfigure it
+ return t->configure(config);
+ }
+
+ DEBUG("Configuring transporter from " << localNodeId
+ << " to " << remoteNodeId);
+
+ switch (config->type){
+ case tt_TCP_TRANSPORTER:
+ return createTCPTransporter(config);
+ case tt_SHM_TRANSPORTER:
+ return createSHMTransporter(config);
+ case tt_SCI_TRANSPORTER:
+ return createSCITransporter(config);
+ default:
+ abort();
+ break;
+ }
+ return false;
+}
+
+
bool
TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
#ifdef NDB_TCP_TRANSPORTER
- if(!nodeIdSpecified){
- init(config->localNodeId);
- }
-
- if(config->localNodeId != localNodeId)
- return false;
-
- if(theTransporters[config->remoteNodeId] != NULL)
- return false;
-
TCP_Transporter * t = new TCP_Transporter(*this, config);
if (t == NULL)
@@ -405,17 +447,7 @@ TransporterRegistry::createSCITransporte
if(!SCI_Transporter::initSCI())
abort();
-
- if(!nodeIdSpecified){
- init(config->localNodeId);
- }
-
- if(config->localNodeId != localNodeId)
- return false;
-
- if(theTransporters[config->remoteNodeId] != NULL)
- return false;
-
+
SCI_Transporter * t = new SCI_Transporter(*this,
config->localHostName,
config->remoteHostName,
@@ -457,13 +489,7 @@ bool
TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
DBUG_ENTER("TransporterRegistry::createTransporter SHM");
#ifdef NDB_SHM_TRANSPORTER
- if(!nodeIdSpecified){
- init(config->localNodeId);
- }
-
- if(config->localNodeId != localNodeId)
- return false;
-
+
if (!g_ndb_shm_signum) {
g_ndb_shm_signum= config->shm.signum;
DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
@@ -476,9 +502,6 @@ TransporterRegistry::createSHMTransporte
if(config->shm.signum != g_ndb_shm_signum)
return false;
-
- if(theTransporters[config->remoteNodeId] != NULL)
- return false;
SHM_Transporter * t = new SHM_Transporter(*this,
config->localHostName,
@@ -1270,8 +1293,12 @@ TransporterRegistry::ioState(NodeId node
void
TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
+ if (ioStates[nodeId] == state)
+ return;
+
DEBUG("TransporterRegistry::setIOState("
- << nodeId << ", " << state << ")");
+ << nodeId << ", " << state << ")");
+
ioStates[nodeId] = state;
}
@@ -1609,9 +1636,10 @@ bool
TransporterRegistry::start_service(SocketServer& socket_server)
{
DBUG_ENTER("TransporterRegistry::start_service");
- if (m_transporter_interface.size() > 0 && !nodeIdSpecified)
+ if (m_transporter_interface.size() > 0 &&
+ localNodeId == 0)
{
- g_eventLogger->error("TransporterRegistry::startReceiving: localNodeId not
specified");
+ g_eventLogger->error("INTERNAL ERROR: not initialized");
DBUG_RETURN(false);
}
@@ -1723,6 +1751,7 @@ NdbOut & operator <<(NdbOut & out, Signa
Transporter*
TransporterRegistry::get_transporter(NodeId nodeId) {
+ assert(nodeId < maxTransporters);
return theTransporters[nodeId];
}
@@ -1772,7 +1801,7 @@ NDB_SOCKET_TYPE TransporterRegistry::con
for(unsigned int i=0;i < m_transporter_interface.size();i++)
if (m_transporter_interface[i].m_s_service_port < 0
&& ndb_mgm_set_connection_int_parameter(*h,
- get_localNodeId(),
+ localNodeId,
m_transporter_interface[i].m_remote_nodeId,
CFG_CONNECTION_SERVER_PORT,
m_transporter_interface[i].m_s_service_port,
@@ -2078,4 +2107,35 @@ TransporterRegistry::forceSend(NodeId no
return false;
}
+
+void
+TransporterRegistry::print_transporters(const char* where, NdbOut& out)
+{
+ out << where << " >>" << endl;
+
+ for(unsigned i = 0; i < maxTransporters; i++){
+ if(theTransporters[i] == NULL)
+ continue;
+
+ const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();
+
+ out << i << " "
+ << getPerformStateString(remoteNodeId) << " to node: "
+ << remoteNodeId << " at "
+ << inet_ntoa(get_connect_address(remoteNodeId)) << endl;
+ }
+
+ out << "<<" << endl;
+
+ for (size_t i= 0; i < m_transporter_interface.size(); i++){
+ Transporter_interface tf= m_transporter_interface[i];
+
+ out << i
+ << " remote node: " << tf.m_remote_nodeId
+ << " port: " << tf.m_s_service_port
+ << " interface: " << tf.m_interface << endl;
+ }
+}
+
+
template class Vector<TransporterRegistry::Transporter_interface>;
=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp 2008-11-07 19:50:20 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp 2008-11-12 08:26:18 +0000
@@ -409,14 +409,20 @@ Configuration::setupConfiguration(){
/**
* Configure transporters
*/
- {
- int res = IPCConfig::configureTransporters(globalData.ownId,
- * p,
- globalTransporterRegistry);
- if(res <= 0){
- ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched",
- "No transporters configured");
- }
+ if (!globalTransporterRegistry.init(globalData.ownId))
+ {
+ ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
+ "Invalid configuration fetched",
+ "Could not init transporter registry");
+ }
+
+ if (!IPCConfig::configureTransporters(globalData.ownId,
+ * p,
+ globalTransporterRegistry))
+ {
+ ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
+ "Invalid configuration fetched",
+ "Could not configure transporters");
}
/**
@@ -434,11 +440,7 @@ Configuration::setupConfiguration(){
}
Uint32 total_send_buffer = 0;
- if(iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer) ||
- total_send_buffer == 0)
- {
- total_send_buffer = globalTransporterRegistry.get_total_max_send_buffer();
- }
+ iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
globalTransporterRegistry.allocate_send_buffers(total_send_buffer);
if(iter.get(CFG_DB_NO_SAVE_MSGS, &_maxErrorLogs)){
=== modified file 'storage/ndb/src/kernel/vm/TransporterCallback.cpp'
--- a/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-10-30 20:30:06 +0000
+++ b/storage/ndb/src/kernel/vm/TransporterCallback.cpp 2008-11-11 07:27:00 +0000
@@ -218,7 +218,7 @@ TransporterCallbackKernel::reportError(N
const char *info)
{
#ifdef DEBUG_TRANSPORTER
- ndbout_c("reportError (%d, 0x%x) %s", nodeId, errorCode, info ? info : "")
+ ndbout_c("reportError (%d, 0x%x) %s", nodeId, errorCode, info ? info : "");
#endif
DBUG_ENTER("reportError");
=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2008-11-10 11:50:45 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2008-11-12 08:26:18 +0000
@@ -265,6 +265,7 @@ MgmtSrvr::MgmtSrvr(const MgmtOpts& opts,
m_local_config(NULL),
_ownReference(0),
m_config_manager(NULL),
+ m_need_restart(false),
theFacade(NULL),
_isStopThread(false),
_logLevelThreadSleep(500),
@@ -743,7 +744,16 @@ MgmtSrvr::config_changed(NodeId node_id,
setClusterLog(m_local_config);
- // TODO Magnus, Reload ClusterMgr::theNodes
+ if (theFacade)
+ {
+ if (!theFacade->configure(_ownNodeId,
+ m_local_config->m_configValues))
+ {
+ g_eventLogger->warning("Could not reconfigure everything online, "
+ "this node need a restart");
+ m_need_restart= true;
+ }
+ }
DBUG_VOID_RETURN;
}
=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.hpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2008-11-06 10:56:21 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2008-11-12 08:17:14 +0000
@@ -484,6 +484,8 @@ private:
class ConfigManager* m_config_manager;
+ bool m_need_restart;
+
NodeBitmask m_reserved_nodes;
struct in_addr m_connect_address[MAX_NODES];
=== modified file 'storage/ndb/src/ndbapi/ClusterMgr.cpp'
--- a/storage/ndb/src/ndbapi/ClusterMgr.cpp 2008-11-06 12:00:21 +0000
+++ b/storage/ndb/src/ndbapi/ClusterMgr.cpp 2008-11-12 08:17:14 +0000
@@ -79,7 +79,8 @@ ClusterMgr::~ClusterMgr()
}
void
-ClusterMgr::init(ndb_mgm_configuration_iterator & iter){
+ClusterMgr::configure(const ndb_mgm_configuration* config){
+ ndb_mgm_configuration_iterator iter(* config, CFG_SECTION_NODE);
for(iter.first(); iter.valid(); iter.next()){
Uint32 nodeId = 0;
if(iter.get(CFG_NODE_ID, &nodeId))
@@ -109,6 +110,15 @@ ClusterMgr::init(ndb_mgm_configuration_i
}
}
+ /* Mark all non existing nodes as not defined */
+ for(Uint32 i = 0; i<MAX_NODES; i++) {
+ if (iter.first())
+ continue;
+
+ if (iter.find(CFG_NODE_ID, i))
+ theNodes[i]= Node();
+ }
+
/* Init own node info */
Node &node= theNodes[theFacade.ownId()];
assert(node.defined);
=== modified file 'storage/ndb/src/ndbapi/ClusterMgr.hpp'
--- a/storage/ndb/src/ndbapi/ClusterMgr.hpp 2008-11-06 17:34:29 +0000
+++ b/storage/ndb/src/ndbapi/ClusterMgr.hpp 2008-11-12 08:17:14 +0000
@@ -38,7 +38,7 @@ class ClusterMgr {
public:
ClusterMgr(class TransporterFacade &);
~ClusterMgr();
- void init(struct ndb_mgm_configuration_iterator & config);
+ void configure(const ndb_mgm_configuration* config);
void reportConnected(NodeId nodeId);
void reportDisconnected(NodeId nodeId);
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp 2008-11-06 13:52:28 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp 2008-11-12 08:17:14 +0000
@@ -420,13 +420,43 @@ copy(Uint32 * & insertPtr,
*/
int
-TransporterFacade::start_instance(int nodeId,
- const ndb_mgm_configuration* props)
+TransporterFacade::start_instance(NodeId nodeId,
+ const ndb_mgm_configuration* conf)
{
- if (! init(nodeId, props)) {
+ assert(theOwnId == 0);
+ theOwnId = nodeId;
+
+ theTransporterRegistry = new TransporterRegistry(this);
+ if (theTransporterRegistry == NULL)
return -1;
- }
-
+
+ if (!theTransporterRegistry->init(nodeId))
+ return -1;
+
+ theClusterMgr = new ClusterMgr(*this);
+ if (theClusterMgr == NULL)
+ return -1;
+
+ if (!configure(nodeId, conf))
+ return -1;
+
+ if (!theTransporterRegistry->start_service(m_socket_server))
+ return -1;
+
+ theReceiveThread = NdbThread_Create(runReceiveResponse_C,
+ (void**)this,
+ 32768,
+ "ndb_receive",
+ NDB_THREAD_PRIO_LOW);
+
+ theSendThread = NdbThread_Create(runSendRequest_C,
+ (void**)this,
+ 32768,
+ "ndb_send",
+ NDB_THREAD_PRIO_LOW);
+
+ theClusterMgr->startThread();
+
/**
* Install signal handler for SIGPIPE
*
@@ -679,82 +709,89 @@ void TransporterFacade::init_cond_wait_q
TransporterFacade::TransporterFacade(GlobalDictCache *cache) :
theTransporterRegistry(0),
+ theOwnId(0),
+ theStartNodeId(1),
+ theClusterMgr(NULL),
+ theArbitMgr(NULL),
+ checkCounter(4),
+ currentSendLimit(1),
+ m_scan_batch_size(MAX_SCAN_BATCH_SIZE),
+ m_batch_byte_size(SCAN_BATCH_SIZE),
+ m_batch_size(DEF_BATCH_SIZE),
theStopReceive(0),
theSendThread(NULL),
theReceiveThread(NULL),
+ m_max_trans_id(0),
m_fragmented_signal_id(0),
m_globalDictCache(cache)
{
DBUG_ENTER("TransporterFacade::TransporterFacade");
init_cond_wait_queue();
poll_owner = NULL;
- theOwnId = 0;
theMutexPtr = NdbMutex_Create();
sendPerformedLastInterval = 0;
- checkCounter = 4;
- currentSendLimit = 1;
- theClusterMgr = NULL;
- theArbitMgr = NULL;
- theStartNodeId = 1;
- m_scan_batch_size= MAX_SCAN_BATCH_SIZE;
- m_batch_byte_size= SCAN_BATCH_SIZE;
- m_batch_size= DEF_BATCH_SIZE;
- m_max_trans_id = 0;
-
for (int i = 0; i < NO_API_FIXED_BLOCKS; i++)
m_fixed2dynamic[i]= RNIL;
- theClusterMgr = new ClusterMgr(* this);
-
#ifdef API_TRACE
apiSignalLog = 0;
#endif
DBUG_VOID_RETURN;
}
+
bool
-TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
+TransporterFacade::configure(NodeId nodeId,
+ const ndb_mgm_configuration* conf)
{
- DBUG_ENTER("TransporterFacade::init");
-
- theOwnId = nodeId;
- theTransporterRegistry = new TransporterRegistry(this);
+ DBUG_ENTER("TransporterFacade::configure");
- const int res = IPCConfig::configureTransporters(nodeId,
- * props,
- * theTransporterRegistry);
- if(res <= 0){
- TRP_DEBUG( "configureTransporters returned 0 or less" );
+ assert(theOwnId == nodeId);
+ assert(theTransporterRegistry);
+ assert(theClusterMgr);
+
+ // Configure transporters
+ if (!IPCConfig::configureTransporters(nodeId,
+ * conf,
+ * theTransporterRegistry))
DBUG_RETURN(false);
- }
-
- ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);
- iter.first();
- theClusterMgr->init(iter);
-
- iter.first();
- if(iter.find(CFG_NODE_ID, nodeId)){
- TRP_DEBUG( "Node info missing from config." );
+
+ // Configure cluster manager
+ theClusterMgr->configure(conf);
+
+ ndb_mgm_configuration_iterator iter(* conf, CFG_SECTION_NODE);
+ if(iter.find(CFG_NODE_ID, nodeId))
DBUG_RETURN(false);
- }
-
+
+ // Configure send buffers
Uint32 total_send_buffer = 0;
- if(iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer) ||
- total_send_buffer == 0)
- {
- total_send_buffer = theTransporterRegistry->get_total_max_send_buffer();
- }
+ iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
theTransporterRegistry->allocate_send_buffers(total_send_buffer);
+ // Configure arbitrator
Uint32 rank = 0;
- if(!iter.get(CFG_NODE_ARBIT_RANK, &rank) && rank>0){
- theArbitMgr = new ArbitMgr(* this);
+ iter.get(CFG_NODE_ARBIT_RANK, &rank);
+ if (rank > 0)
+ {
+ // The arbitrator should be active
+ if (!theArbitMgr)
+ theArbitMgr = new ArbitMgr(* this);
theArbitMgr->setRank(rank);
+
Uint32 delay = 0;
iter.get(CFG_NODE_ARBIT_DELAY, &delay);
theArbitMgr->setDelay(delay);
}
+ else if (theArbitMgr)
+ {
+ // No arbitrator should be started
+ theArbitMgr->doStop(NULL);
+ delete theArbitMgr;
+ theArbitMgr= NULL;
+ }
+
+ // Configure scan settings
Uint32 scan_batch_size= 0;
if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
m_scan_batch_size= scan_batch_size;
@@ -767,9 +804,9 @@ TransporterFacade::init(Uint32 nodeId, c
if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
m_batch_size= batch_size;
}
-
+
+ // Configure timeouts
Uint32 timeout = 120000;
- iter.first();
for (iter.first(); iter.valid(); iter.next())
{
Uint32 tmp1 = 0, tmp2 = 0;
@@ -781,24 +818,6 @@ TransporterFacade::init(Uint32 nodeId, c
}
m_waitfor_timeout = timeout;
- if (!theTransporterRegistry->start_service(m_socket_server)){
- ndbout_c("Unable to start theTransporterRegistry->start_service");
- DBUG_RETURN(false);
- }
-
- theReceiveThread = NdbThread_Create(runReceiveResponse_C,
- (void**)this,
- 32768,
- "ndb_receive",
- NDB_THREAD_PRIO_LOW);
-
- theSendThread = NdbThread_Create(runSendRequest_C,
- (void**)this,
- 32768,
- "ndb_send",
- NDB_THREAD_PRIO_LOW);
- theClusterMgr->startThread();
-
#ifdef API_TRACE
signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);
#endif
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp 2008-11-04 08:43:06 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp 2008-11-12 08:17:14 +0000
@@ -53,11 +53,16 @@ public:
STATIC_CONST( MAX_NO_THREADS = 4711 );
TransporterFacade(GlobalDictCache *cache);
virtual ~TransporterFacade();
- bool init(Uint32, const ndb_mgm_configuration *);
- int start_instance(int, const ndb_mgm_configuration*);
+ int start_instance(NodeId, const ndb_mgm_configuration*);
void stop_instance();
-
+
+ /*
+ (Re)configure the TransporterFacade
+ to a specific configuration
+ */
+ bool configure(NodeId, const ndb_mgm_configuration *);
+
/**
* Register this block for sending/receiving signals
* @blockNo block number to use, -1 => any blockNumber
@@ -222,8 +227,7 @@ private:
TransporterRegistry* theTransporterRegistry;
SocketServer m_socket_server;
int sendPerformedLastInterval;
- int theOwnId;
-
+ NodeId theOwnId;
NodeId theStartNodeId;
ClusterMgr* theClusterMgr;
=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2008-09-09 12:04:17 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2008-11-12 08:17:14 +0000
@@ -711,7 +711,9 @@ int Ndb_cluster_connection::connect(int
if(props == 0)
break;
- m_impl.m_transporter_facade->start_instance(nodeId, props);
+ if (m_impl.m_transporter_facade->start_instance(nodeId, props) < 0)
+ DBUG_RETURN(-1);
+
if (m_impl.init_nodes_vector(nodeId, *props))
{
ndbout_c("Ndb_cluster_connection::connect: malloc failure");
| Thread |
|---|
| • bzr push into mysql-5.1 branch (msvensson:3075 to 3076) | Magnus Svensson | 12 Nov |