List:Commits« Previous MessageNext Message »
From:Magnus Svensson Date:November 12 2008 8:17am
Subject:bzr commit into mysql-5.1 branch (msvensson:3061) WL#4350
View as plain text  
#At file:///home/msvensson/mysql/6.4-wl4350/

 3061 Magnus Svensson	2008-11-12
      WL#4350 Reconfigure transporter
      
      Make it possible to run IPCConfig::configureTransporters
      also to reconfigure the transporters in TransporteRegistry.
      
      If the transporter already exist, call "configure(conf)" on it.
      
      Currently the Transporter only detect if the conf is same or
      different. If different a warning about this will be printed
      to log and the flag m_need_restart is set.
      
      Also detect if a transporter should be removed and print a
      warning about restart in same fashion as above.
modified:
  storage/ndb/include/mgmcommon/IPCConfig.hpp
  storage/ndb/include/transporter/TransporterDefinitions.hpp
  storage/ndb/include/transporter/TransporterRegistry.hpp
  storage/ndb/src/common/mgmcommon/IPCConfig.cpp
  storage/ndb/src/common/transporter/SCI_Transporter.cpp
  storage/ndb/src/common/transporter/SCI_Transporter.hpp
  storage/ndb/src/common/transporter/SHM_Transporter.cpp
  storage/ndb/src/common/transporter/SHM_Transporter.hpp
  storage/ndb/src/common/transporter/TCP_Transporter.cpp
  storage/ndb/src/common/transporter/TCP_Transporter.hpp
  storage/ndb/src/common/transporter/Transporter.cpp
  storage/ndb/src/common/transporter/Transporter.hpp
  storage/ndb/src/common/transporter/TransporterRegistry.cpp
  storage/ndb/src/kernel/vm/Configuration.cpp
  storage/ndb/src/mgmsrv/MgmtSrvr.cpp
  storage/ndb/src/mgmsrv/MgmtSrvr.hpp
  storage/ndb/src/ndbapi/ClusterMgr.cpp
  storage/ndb/src/ndbapi/ClusterMgr.hpp
  storage/ndb/src/ndbapi/TransporterFacade.cpp
  storage/ndb/src/ndbapi/TransporterFacade.hpp
  storage/ndb/src/ndbapi/ndb_cluster_connection.cpp

per-file messages:
  storage/ndb/include/transporter/TransporterDefinitions.hpp
    Add TransporterType field to TransporterConfiguration
  storage/ndb/include/transporter/TransporterRegistry.hpp
    Use "localNodeId == 0" instead of nodeIdSpecified flag
    Remove 'get_localNodeId'
    Hide the factory functions for indivdual transporter types, instead one should
    use 'configureTransporter'
  storage/ndb/src/common/mgmcommon/IPCConfig.cpp
    Detect if a transporter should be removed, print a warning and return false
    to indicate restart would be needed.
    Use 'configureTransporter' regardless of which type of transporter to create
  storage/ndb/src/common/transporter/SCI_Transporter.cpp
    Add function 'configure_derived' to be used to (re)configure a transporter
    Currently only supports detection of "no change"
  storage/ndb/src/common/transporter/SCI_Transporter.hpp
    Add function 'configure_derived' to be used to (re)configure a transporter
    Currently only supports detection of "no change"
  storage/ndb/src/common/transporter/SHM_Transporter.cpp
    Add function 'configure_derived' to be used to (re)configure a transporter
    Currently only supports detection of "no change"
  storage/ndb/src/common/transporter/SHM_Transporter.hpp
    Add function 'configure_derived' to be used to (re)configure a transporter
    Currently only supports detection of "no change"
  storage/ndb/src/common/transporter/TCP_Transporter.cpp
    Add function 'configure_derived' to be used to (re)configure a transporter
    Currently only supports detection of "no change" 
    Add function to extract "overload_limit" from conf
  storage/ndb/src/common/transporter/TCP_Transporter.hpp
    Add function 'configure_derived' to be used to (re)configure a transporter
    Currently only supports detection of "no change"
  storage/ndb/src/common/transporter/Transporter.cpp
    Add function 'configure' to be used to (re)configure a transporter
    Currently only supports detection of "no change"
  storage/ndb/src/common/transporter/Transporter.hpp
    Add functions 'configure' and 'configure_derived'
  storage/ndb/src/common/transporter/TransporterRegistry.cpp
    Remove "nodeidSpecified", instead use "localNodeId == 0" which means the same thing
    Move the code for case where "total_send_buffer == 0" into the function 'allocate_send_buffers'
    so that the function reflects comment and reduces code duplication
    Don't resize send buffer pages yet
    Add function 'configureTransporter' as common function to call when creating a new Transporter
    or reconfiguring an existing one.
    Add 'print_transporters'
  storage/ndb/src/kernel/vm/Configuration.cpp
    Add call to 'globalTransporterRegistry->init' to "fixate"  it's nodeid
    Treat false from 'configureTransporters' during startup as fatal error.
    Move code for "total_send_buffer == 0" into 'allocate_send_buffers'
  storage/ndb/src/mgmsrv/MgmtSrvr.cpp
    Add call to reconfigure TransporterFacade and it's subclasses when configuration changed
    Set flag "m_need_restart" if "reconfigure" returns false
  storage/ndb/src/mgmsrv/MgmtSrvr.hpp
    Add "m_need_restart" flag.
  storage/ndb/src/ndbapi/ClusterMgr.cpp
    Mark all non existing nodes as not defined when reconfiguring ClusterMgr
  storage/ndb/src/ndbapi/ClusterMgr.hpp
    Rename 'init' to 'reconfigure'
  storage/ndb/src/ndbapi/TransporterFacade.cpp
    Move code to setup TransporterFacade from 'init/configure' to 'start_instance'
    Use 'start_instance' to start and 'configure' to reconfigure TransporteFacade
    Add calls to reconfigure Transporters, Arbitrator, ClusterMgr, scan timeout values and
    send buffer pages.
    Move code for "total_send_buffer == 0" into 'allocate_send_buffers'
  storage/ndb/src/ndbapi/TransporterFacade.hpp
    Rename 'init' to 'configure'
    Use "NodeId" instead of int
  storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
    Check return code from 'TransporterFacade::start_instance'
=== modified file 'storage/ndb/include/mgmcommon/IPCConfig.hpp'
--- a/storage/ndb/include/mgmcommon/IPCConfig.hpp	2008-11-11 07:25:25 +0000
+++ b/storage/ndb/include/mgmcommon/IPCConfig.hpp	2008-11-12 08:17:14 +0000
@@ -18,10 +18,19 @@
 
 struct IPCConfig
 {
-  /* Returns the number of transporters configured */
-  static Uint32 configureTransporters(Uint32 nodeId,
-				      const struct ndb_mgm_configuration &,
-				      class TransporterRegistry &);
+  /*
+    configure_transporters
+
+    Create and configure transporters in TransporterRegistry
+
+    Returns:
+      true  - sucessfully created and (re)configured transporters
+      false - at least one transporter could not be created
+              or (re)configured
+  */
+  static bool configureTransporters(Uint32 nodeId,
+                                    const struct ndb_mgm_configuration &,
+                                    class TransporterRegistry &);
 };
 
 #endif // IPCConfig_H

=== modified file 'storage/ndb/include/transporter/TransporterDefinitions.hpp'
--- a/storage/ndb/include/transporter/TransporterDefinitions.hpp	2008-11-11 07:26:22 +0000
+++ b/storage/ndb/include/transporter/TransporterDefinitions.hpp	2008-11-12 08:17:14 +0000
@@ -40,6 +40,12 @@ enum SendStatus { 
   SEND_UNKNOWN_NODE = 5
 };
 
+enum TransporterType {
+  tt_TCP_TRANSPORTER = 1,
+  tt_SCI_TRANSPORTER = 2,
+  tt_SHM_TRANSPORTER = 3
+};
+
 /**
  * Maximum message sizes
  * ---------------------
@@ -75,6 +81,7 @@ struct TransporterConfiguration {
   bool checksum;
   bool signalId;
   bool isMgmConnection; // is a mgm connection, requires transforming
+  TransporterType type;
 
   union { // Transporter specific configuration information
 

=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	2008-11-06 10:17:49 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	2008-11-12 08:17:14 +0000
@@ -51,12 +51,6 @@ enum IOState {
   HaltIO     = 3
 };
 
-enum TransporterType {
-  tt_TCP_TRANSPORTER = 1,
-  tt_SCI_TRANSPORTER = 2,
-  tt_SHM_TRANSPORTER = 3
-  // ID 4 was OSE Transporter which has been removed. Don't use ID 4.
-};
 
 static const char *performStateString[] = 
   { "is connected",
@@ -198,17 +192,21 @@ public:
   IOState ioState(NodeId nodeId);
   void setIOState(NodeId nodeId, IOState state);
 
-  /** 
-   * createTransporter
+private:
+
+  bool createTCPTransporter(TransporterConfiguration * config);
+  bool createSCITransporter(TransporterConfiguration * config);
+  bool createSHMTransporter(TransporterConfiguration * config);
+
+public:
+  /**
+   *   configureTransporter
+   *
+   *   Configure a transporter, ie. create new if it
+   *   does not exist otherwise try to reconfigure it
    *
-   * If the config object indicates that the transporter
-   * to be created will act as a server and no server is
-   * started, startServer is called. A transporter of the selected kind
-   * is created and it is put in the transporter arrays.
-   */
-  bool createTCPTransporter(struct TransporterConfiguration * config);
-  bool createSCITransporter(struct TransporterConfiguration * config);
-  bool createSHMTransporter(struct TransporterConfiguration * config);
+   */
+  bool configureTransporter(TransporterConfiguration * config);
 
   /**
    * Allocate send buffer for default send buffer handling.
@@ -324,8 +322,6 @@ public:
   void add_transporter_interface(NodeId remoteNodeId, const char *interf,
 		  		 int s_port);	// signed port. <0 is dynamic
   Transporter* get_transporter(NodeId nodeId);
-  NodeId get_localNodeId() { return localNodeId; };
-
   struct in_addr get_connect_address(NodeId node_id) const;
 protected:
   
@@ -339,7 +335,6 @@ private:
 
   int sendCounter;
   NodeId localNodeId;
-  bool nodeIdSpecified;
   unsigned maxTransporters;
   int nTransporters;
   int nTCPTransporters;
@@ -498,6 +493,9 @@ public:
   bool has_data_to_send(NodeId node);
 
   void reset_send_buffer(NodeId node);
+
+  void print_transporters(const char* where, NdbOut& out = ndbout);
+
 };
 
 inline void

=== modified file 'storage/ndb/src/common/mgmcommon/IPCConfig.cpp'
--- a/storage/ndb/src/common/mgmcommon/IPCConfig.cpp	2008-11-11 07:25:25 +0000
+++ b/storage/ndb/src/common/mgmcommon/IPCConfig.cpp	2008-11-12 08:17:14 +0000
@@ -21,11 +21,12 @@
 #include <mgmapi.h>
 #include <mgmapi_configuration.hpp>
 
-Uint32
+bool
 IPCConfig::configureTransporters(Uint32 nodeId,
-				 const struct ndb_mgm_configuration & config,
-				 class TransporterRegistry & tr){
-  TransporterConfiguration conf;
+                                 const struct ndb_mgm_configuration & config,
+                                 class TransporterRegistry & tr)
+{
+  bool result= true;
 
   DBUG_ENTER("IPCConfig::configureTransporters");
 
@@ -58,9 +59,23 @@ IPCConfig::configureTransporters(Uint32 
     }
   }
 
-  Uint32 noOfTransportersCreated= 0;
+
+  /* Remove transporter to nodes that does not exist anymore */
+  for (int i= 1; i < MAX_NODES; i++)
+  {
+    ndb_mgm_configuration_iterator iter(config, CFG_SECTION_NODE);
+    if (tr.get_transporter(i) && iter.find(CFG_NODE_ID, i))
+    {
+      // Transporter exist in TransporterResgistry but not
+      // in configuration
+      ndbout_c("The connection to node %d could not "
+               "be removed at this time", i);
+      result= false; // Need restart
+    }
+  }
+
+  TransporterConfiguration conf;
   ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
-  
   for(iter.first(); iter.valid(); iter.next()){
     
     bzero(&conf, sizeof(conf));
@@ -146,21 +161,22 @@ IPCConfig::configureTransporters(Uint32 
       if(iter.get(CFG_SHM_KEY, &conf.shm.shmKey)) break;
       if(iter.get(CFG_SHM_BUFFER_MEM, &conf.shm.shmSize)) break;
 
-      Uint32 tmp;
-      if(iter.get(CFG_SHM_SIGNUM, &tmp)) break;
-      conf.shm.signum= tmp;
-
-      if(!tr.createSHMTransporter(&conf)){
-        DBUG_PRINT("error", ("Failed to create SHM Transporter from %d to %d",
+      Uint32 signum;
+      if(iter.get(CFG_SHM_SIGNUM, &signum)) break;
+      conf.shm.signum= signum;
+
+      conf.type = tt_SHM_TRANSPORTER;
+
+      if(!tr.configureTransporter(&conf)){
+        DBUG_PRINT("error", ("Failed to configure SHM Transporter "
+                             "from %d to %d",
 	           conf.localNodeId, conf.remoteNodeId));
-	ndbout << "Failed to create SHM Transporter from: " 
-	       << conf.localNodeId << " to: " << conf.remoteNodeId << endl;
-      } else {
-	noOfTransportersCreated++;
+	ndbout_c("Failed to configure SHM Transporter to node %d",
+                conf.remoteNodeId);
+        result = false;
       }
-      DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, "
+      DBUG_PRINT("info", ("Configured SHM Transporter using shmkey %d, "
 			  "buf size = %d", conf.shm.shmKey, conf.shm.shmSize));
-
       break;
 
     case CONNECTION_TYPE_SCI:
@@ -178,13 +194,16 @@ IPCConfig::configureTransporters(Uint32 
       } else {
         conf.sci.nLocalAdapters = 2;
       }
-     if(!tr.createSCITransporter(&conf)){
-        DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d",
+      conf.type = tt_SCI_TRANSPORTER;
+      if(!tr.configureTransporter(&conf)){
+        DBUG_PRINT("error", ("Failed to configure SCI Transporter "
+                             "from %d to %d",
 	           conf.localNodeId, conf.remoteNodeId));
-	ndbout << "Failed to create SCI Transporter from: " 
-	       << conf.localNodeId << " to: " << conf.remoteNodeId << endl;
+	ndbout_c("Failed to configure SCI Transporter to node %d",
+                 conf.remoteNodeId);
+        result = false;
       } else {
-        DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, "
+        DBUG_PRINT("info", ("Configured SCI Transporter: Adapters = %d, "
 			    "remote SCI node id %d",
                    conf.sci.nLocalAdapters, conf.sci.remoteSciNodeId0));
         DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, "
@@ -196,8 +215,6 @@ IPCConfig::configureTransporters(Uint32 
 			      "second remote SCI node id = %d",
 			      conf.sci.remoteSciNodeId1)); 
         }
-	noOfTransportersCreated++;
-	continue;
       }
      break;
 
@@ -217,14 +234,15 @@ IPCConfig::configureTransporters(Uint32 
       iter.get(CFG_TCP_RCV_BUF_SIZE, &conf.tcp.tcpRcvBufSize);
       iter.get(CFG_TCP_MAXSEG_SIZE, &conf.tcp.tcpMaxsegSize);
       iter.get(CFG_CONNECTION_OVERLOAD, &conf.tcp.tcpOverloadLimit);
+
+      conf.type = tt_TCP_TRANSPORTER;
       
-      if(!tr.createTCPTransporter(&conf)){
-	ndbout << "Failed to create TCP Transporter from: " 
-	       << nodeId << " to: " << remoteNodeId << endl;
-      } else {
-	noOfTransportersCreated++;
+      if(!tr.configureTransporter(&conf)){
+	ndbout_c("Failed to configure TCP Transporter to node %d",
+                 conf.remoteNodeId);
+        result= false;
       }
-      DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, "
+      DBUG_PRINT("info", ("Configured TCP Transporter: sendBufferSize = %d, "
 			  "maxReceiveSize = %d", conf.tcp.sendBufferSize,
 			  conf.tcp.maxReceiveSize));
       break;
@@ -235,6 +253,6 @@ IPCConfig::configureTransporters(Uint32 
     } // switch
   } // for
 
-  DBUG_RETURN(noOfTransportersCreated);
+  DBUG_RETURN(result);
 }
   

=== modified file 'storage/ndb/src/common/transporter/SCI_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/SCI_Transporter.cpp	2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/common/transporter/SCI_Transporter.cpp	2008-11-12 08:17:14 +0000
@@ -99,6 +99,20 @@ SCI_Transporter::SCI_Transporter(Transpo
   DBUG_VOID_RETURN;
 } 
  
+
+bool
+SCI_Transporter::configure_derived(const TransporterConfiguration* conf)
+{
+  if (conf->sci.sendLimit == (m_PacketSize + 3)/4 &&
+      conf->sci.bufferSize == m_buffersize &&
+      conf->sci.nLocalAdapters == m_adapters &&
+      conf->sci.remoteSciNodeId0 == m_remoteNodes[0] &&
+      conf->sci.remoteSciNodeId1 == m_remoteNodes[1])
+    return true; // No change
+  return false; // Can't reconfigure
+}
+
+
 void SCI_Transporter::disconnectImpl() 
 { 
   DBUG_ENTER("SCI_Transporter::disconnectImpl");

=== modified file 'storage/ndb/src/common/transporter/SCI_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/SCI_Transporter.hpp	2008-05-29 15:06:11 +0000
+++ b/storage/ndb/src/common/transporter/SCI_Transporter.hpp	2008-11-12 08:17:14 +0000
@@ -144,6 +144,9 @@ private: 
    * Destructor. Disconnects the transporter. 
    */ 
 	~SCI_Transporter();    
+
+  virtual bool configure_derived(const TransporterConfiguration* conf);
+
   bool m_mapped; 
   bool m_initLocal; 
   bool m_sciinit; 

=== modified file 'storage/ndb/src/common/transporter/SHM_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/SHM_Transporter.cpp	2008-11-11 07:23:27 +0000
+++ b/storage/ndb/src/common/transporter/SHM_Transporter.cpp	2008-11-12 08:17:14 +0000
@@ -64,6 +64,18 @@ SHM_Transporter::SHM_Transporter(Transpo
   m_signal_threshold = 4096;
 }
 
+
+bool
+SHM_Transporter::configure_derived(const TransporterConfiguration* conf)
+{
+  if ((key_t)conf->shm.shmKey == shmKey &&
+      (int)conf->shm.shmSize == shmSize &&
+      conf->shm.signum == g_ndb_shm_signum)
+    return true; // No change
+  return false; // Can't reconfigure
+}
+
+
 SHM_Transporter::~SHM_Transporter(){
   doDisconnect();
 }

=== modified file 'storage/ndb/src/common/transporter/SHM_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/SHM_Transporter.hpp	2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/SHM_Transporter.hpp	2008-11-12 08:17:14 +0000
@@ -49,6 +49,8 @@ public:
    */
   virtual ~SHM_Transporter();
   
+  virtual bool configure_derived(const TransporterConfiguration* conf);
+
   /**
    * Do initialization
    */

=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2008-11-12 08:17:14 +0000
@@ -69,6 +69,16 @@ setIf(int& ref, Uint32 val, Uint32 def)
     ref = def;
 }
 
+
+static
+Uint32 overload_limit(const TransporterConfiguration* conf)
+{
+  return (conf->tcp.tcpOverloadLimit ?
+          conf->tcp.tcpOverloadLimit :
+          conf->tcp.sendBufferSize*4/5);
+}
+
+
 TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
 				 const TransporterConfiguration* conf)
   :
@@ -99,10 +109,25 @@ TCP_Transporter::TCP_Transporter(Transpo
   setIf(sockOptSndBufSize, conf->tcp.tcpSndBufSize, 71540);
   setIf(sockOptTcpMaxSeg, conf->tcp.tcpMaxsegSize, 0);
 
-  m_overload_limit = conf->tcp.tcpOverloadLimit ?
-    conf->tcp.tcpOverloadLimit : conf->tcp.sendBufferSize*4/5;
+  m_overload_limit = overload_limit(conf);
 }
 
+
+bool
+TCP_Transporter::configure_derived(const TransporterConfiguration* conf)
+{
+  if (conf->tcp.sendBufferSize == m_max_send_buffer &&
+      conf->tcp.maxReceiveSize == maxReceiveSize &&
+      conf->tcp.tcpSndBufSize == sockOptSndBufSize &&
+      conf->tcp.tcpRcvBufSize == sockOptRcvBufSize &&
+      conf->tcp.tcpMaxsegSize == sockOptTcpMaxSeg &&
+      overload_limit(conf) == m_overload_limit)
+    return true; // No change
+ndbout_c("configure_derived, can't reconfigure");
+  return false; // Can't reconfigure
+}
+
+
 TCP_Transporter::~TCP_Transporter() {
   
   // Disconnect

=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.hpp	2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.hpp	2008-11-12 08:17:14 +0000
@@ -45,10 +45,12 @@ class TCP_Transporter : public Transport
 private:
   // Initialize member variables
   TCP_Transporter(TransporterRegistry&, const TransporterConfiguration* conf);
-  
+
   // Disconnect, delete send buffers and receive buffer
   virtual ~TCP_Transporter();
 
+  virtual bool configure_derived(const TransporterConfiguration* conf);
+
   /**
    * Allocate buffers for sending and receiving
    */

=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp	2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.cpp	2008-11-12 08:17:14 +0000
@@ -102,10 +102,29 @@ Transporter::Transporter(TransporterRegi
 }
 
 Transporter::~Transporter(){
-  if (m_socket_client)
-    delete m_socket_client;
+  delete m_socket_client;
 }
 
+
+bool
+Transporter::configure(const TransporterConfiguration* conf)
+{
+  if (configure_derived(conf) &&
+      conf->s_port == m_s_port &&
+      strcmp(conf->remoteHostName, remoteHostName) == 0 &&
+      strcmp(conf->localHostName, localHostName) == 0 &&
+      conf->remoteNodeId == remoteNodeId &&
+      conf->localNodeId == localNodeId &&
+      (conf->serverNodeId == conf->localNodeId) == isServer &&
+      conf->checksum == checksumUsed &&
+      conf->signalId == signalIdUsed &&
+      conf->isMgmConnection == isMgmConnection &&
+      conf->type == m_type)
+    return true; // No change
+  return false; // Can't reconfigure
+}
+
+
 bool
 Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
   // all initial negotiation is done in TransporterRegistry::connect_server

=== modified file 'storage/ndb/src/common/transporter/Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Transporter.hpp	2008-10-27 17:56:57 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.hpp	2008-11-12 08:17:14 +0000
@@ -116,6 +116,9 @@ protected:
 	      bool signalId,
               Uint32 max_send_buffer);
 
+  virtual bool configure(const TransporterConfiguration* conf);
+  virtual bool configure_derived(const TransporterConfiguration* conf) = 0;
+
   /**
    * Blocking, for max timeOut milli seconds
    *   Returns true if connect succeded

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2008-11-07 08:52:22 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2008-11-12 08:17:14 +0000
@@ -76,6 +76,7 @@ TransporterRegistry::TransporterRegistry
 					 unsigned _maxTransporters,
 					 unsigned sizeOfLongSignalMemory) :
   m_mgm_handle(0),
+  localNodeId(0),
   m_transp_count(0),
   m_use_default_send_buffer(use_default_send_buffer),
   m_send_buffers(0), m_page_freelist(0), m_send_buffer_memory(0),
@@ -83,7 +84,6 @@ TransporterRegistry::TransporterRegistry
 {
   DBUG_ENTER("TransporterRegistry::TransporterRegistry");
 
-  nodeIdSpecified = false;
   maxTransporters = _maxTransporters;
   sendCounter = 1;
   
@@ -154,6 +154,19 @@ TransporterRegistry::allocate_send_buffe
   if (!m_use_default_send_buffer)
     return;
 
+  if (total_send_buffer == 0)
+    total_send_buffer = get_total_max_send_buffer();
+
+  if (m_send_buffers)
+  {
+    /* Send buffers already allocated -> resize the buffer pages */
+    assert(m_send_buffer_memory);
+
+    // TODO resize send buffer pages
+
+    return;
+  }
+
   /* Initialize transporter send buffers (initially empty). */
   m_send_buffers = new SendBuffer[maxTransporters];
   for (unsigned i = 0; i < maxTransporters; i++)
@@ -264,11 +277,13 @@ TransporterRegistry::disconnectAll(){
 bool
 TransporterRegistry::init(NodeId nodeId) {
   DBUG_ENTER("TransporterRegistry::init");
-  nodeIdSpecified = true;
+  assert(localNodeId == 0 ||
+         localNodeId == nodeId);
+
   localNodeId = nodeId;
-  
+
   DEBUG("TransporterRegistry started node: " << localNodeId);
-  
+
   DBUG_RETURN(true);
 }
 
@@ -361,20 +376,47 @@ TransporterRegistry::connect_server(NDB_
   DBUG_RETURN(res);
 }
 
+
+bool
+TransporterRegistry::configureTransporter(TransporterConfiguration *config)
+{
+  NodeId remoteNodeId = config->remoteNodeId;
+
+  assert(localNodeId);
+  assert(config->localNodeId == localNodeId);
+
+  if (remoteNodeId >= maxTransporters)
+    return false;
+
+  Transporter* t = theTransporters[remoteNodeId];
+  if(t != NULL)
+  {
+    // Transporter already exist, try to reconfigure it
+    return t->configure(config);
+  }
+
+  DEBUG("Configuring transporter from " << localNodeId
+	<< " to " << remoteNodeId);
+
+  switch (config->type){
+  case tt_TCP_TRANSPORTER:
+    return createTCPTransporter(config);
+  case tt_SHM_TRANSPORTER:
+    return createSHMTransporter(config);
+  case tt_SCI_TRANSPORTER:
+    return createSCITransporter(config);
+  default:
+    abort();
+    break;
+  }
+  return false;
+}
+
+
 bool
 TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
 #ifdef NDB_TCP_TRANSPORTER
 
-  if(!nodeIdSpecified){
-    init(config->localNodeId);
-  }
-  
-  if(config->localNodeId != localNodeId) 
-    return false;
-  
-  if(theTransporters[config->remoteNodeId] != NULL)
-    return false;
-   
   TCP_Transporter * t = new TCP_Transporter(*this, config);
 
   if (t == NULL) 
@@ -405,17 +447,7 @@ TransporterRegistry::createSCITransporte
 
   if(!SCI_Transporter::initSCI())
     abort();
-  
-  if(!nodeIdSpecified){
-    init(config->localNodeId);
-  }
-  
-  if(config->localNodeId != localNodeId)
-    return false;
- 
-  if(theTransporters[config->remoteNodeId] != NULL)
-    return false;
- 
+
   SCI_Transporter * t = new SCI_Transporter(*this,
                                             config->localHostName,
                                             config->remoteHostName,
@@ -457,13 +489,7 @@ bool
 TransporterRegistry::createSHMTransporter(TransporterConfiguration *config) {
   DBUG_ENTER("TransporterRegistry::createTransporter SHM");
 #ifdef NDB_SHM_TRANSPORTER
-  if(!nodeIdSpecified){
-    init(config->localNodeId);
-  }
-  
-  if(config->localNodeId != localNodeId)
-    return false;
-  
+
   if (!g_ndb_shm_signum) {
     g_ndb_shm_signum= config->shm.signum;
     DBUG_PRINT("info",("Block signum %d",g_ndb_shm_signum));
@@ -476,9 +502,6 @@ TransporterRegistry::createSHMTransporte
 
   if(config->shm.signum != g_ndb_shm_signum)
     return false;
-  
-  if(theTransporters[config->remoteNodeId] != NULL)
-    return false;
 
   SHM_Transporter * t = new SHM_Transporter(*this,
 					    config->localHostName,
@@ -1270,8 +1293,12 @@ TransporterRegistry::ioState(NodeId node
 
 void
 TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
+  if (ioStates[nodeId] == state)
+    return;
+
   DEBUG("TransporterRegistry::setIOState("
-	<< nodeId << ", " << state << ")");
+        << nodeId << ", " << state << ")");
+
   ioStates[nodeId] = state;
 }
 
@@ -1609,9 +1636,10 @@ bool
 TransporterRegistry::start_service(SocketServer& socket_server)
 {
   DBUG_ENTER("TransporterRegistry::start_service");
-  if (m_transporter_interface.size() > 0 && !nodeIdSpecified)
+  if (m_transporter_interface.size() > 0 &&
+      localNodeId == 0)
   {
-    g_eventLogger->error("TransporterRegistry::startReceiving: localNodeId not specified");
+    g_eventLogger->error("INTERNAL ERROR: not initialized");
     DBUG_RETURN(false);
   }
 
@@ -1723,6 +1751,7 @@ NdbOut & operator <<(NdbOut & out, Signa
 
 Transporter*
 TransporterRegistry::get_transporter(NodeId nodeId) {
+  assert(nodeId < maxTransporters);
   return theTransporters[nodeId];
 }
 
@@ -1772,7 +1801,7 @@ NDB_SOCKET_TYPE TransporterRegistry::con
   for(unsigned int i=0;i < m_transporter_interface.size();i++)
     if (m_transporter_interface[i].m_s_service_port < 0
 	&& ndb_mgm_set_connection_int_parameter(*h,
-				   get_localNodeId(),
+				   localNodeId,
 				   m_transporter_interface[i].m_remote_nodeId,
 				   CFG_CONNECTION_SERVER_PORT,
 				   m_transporter_interface[i].m_s_service_port,
@@ -2078,4 +2107,35 @@ TransporterRegistry::forceSend(NodeId no
     return false;
 }
 
+
+void
+TransporterRegistry::print_transporters(const char* where, NdbOut& out)
+{
+  out << where << " >>" << endl;
+
+  for(unsigned i = 0; i < maxTransporters; i++){
+    if(theTransporters[i] == NULL)
+      continue;
+
+    const NodeId remoteNodeId = theTransporters[i]->getRemoteNodeId();
+
+    out << i << " "
+        << getPerformStateString(remoteNodeId) << " to node: "
+        << remoteNodeId << " at "
+        << inet_ntoa(get_connect_address(remoteNodeId)) << endl;
+  }
+
+  out << "<<" << endl;
+
+  for (size_t i= 0; i < m_transporter_interface.size(); i++){
+    Transporter_interface tf= m_transporter_interface[i];
+
+    out << i
+        << " remote node: " << tf.m_remote_nodeId
+        << " port: " << tf.m_s_service_port
+        << " interface: " << tf.m_interface << endl;
+  }
+}
+
+
 template class Vector<TransporterRegistry::Transporter_interface>;

=== modified file 'storage/ndb/src/kernel/vm/Configuration.cpp'
--- a/storage/ndb/src/kernel/vm/Configuration.cpp	2008-11-06 16:29:57 +0000
+++ b/storage/ndb/src/kernel/vm/Configuration.cpp	2008-11-12 08:17:14 +0000
@@ -409,14 +409,20 @@ Configuration::setupConfiguration(){
   /**
    * Configure transporters
    */
-  {  
-    int res = IPCConfig::configureTransporters(globalData.ownId,
-					       * p, 
-					       globalTransporterRegistry);
-    if(res <= 0){
-      ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched", 
-		"No transporters configured");
-    }
+  if (!globalTransporterRegistry.init(globalData.ownId))
+  {
+    ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
+              "Invalid configuration fetched",
+              "Could not init transporter registry");
+  }
+
+  if (!IPCConfig::configureTransporters(globalData.ownId,
+                                        * p,
+                                        globalTransporterRegistry))
+  {
+    ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
+              "Invalid configuration fetched",
+              "Could not configure transporters");
   }
 
   /**
@@ -434,11 +440,7 @@ Configuration::setupConfiguration(){
   }
 
   Uint32 total_send_buffer = 0;
-  if(iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer) ||
-     total_send_buffer == 0)
-  {
-    total_send_buffer = globalTransporterRegistry.get_total_max_send_buffer();
-  }
+  iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
   globalTransporterRegistry.allocate_send_buffers(total_send_buffer);
   
   if(iter.get(CFG_DB_NO_SAVE_MSGS, &_maxErrorLogs)){

=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp	2008-11-06 16:39:26 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp	2008-11-12 08:17:14 +0000
@@ -265,6 +265,7 @@ MgmtSrvr::MgmtSrvr(const MgmtOpts& opts,
   m_local_config(NULL),
   _ownReference(0),
   m_config_manager(NULL),
+  m_need_restart(false),
   theFacade(NULL),
   _isStopThread(false),
   _logLevelThreadSleep(500),
@@ -743,7 +744,16 @@ MgmtSrvr::config_changed(NodeId node_id,
 
   setClusterLog(m_local_config);
 
-  // TODO Magnus, Reload ClusterMgr::theNodes
+  if (theFacade)
+  {
+    if (!theFacade->configure(_ownNodeId,
+                              m_local_config->m_configValues))
+    {
+      g_eventLogger->warning("Could not reconfigure everything online, "
+                             "this node need a restart");
+      m_need_restart= true;
+    }
+  }
 
   DBUG_VOID_RETURN;
 }

=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.hpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.hpp	2008-11-06 10:56:21 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.hpp	2008-11-12 08:17:14 +0000
@@ -484,6 +484,8 @@ private:
 
   class ConfigManager* m_config_manager;
 
+  bool m_need_restart;
+
   NodeBitmask m_reserved_nodes;
   struct in_addr m_connect_address[MAX_NODES];
 

=== modified file 'storage/ndb/src/ndbapi/ClusterMgr.cpp'
--- a/storage/ndb/src/ndbapi/ClusterMgr.cpp	2008-11-06 12:00:21 +0000
+++ b/storage/ndb/src/ndbapi/ClusterMgr.cpp	2008-11-12 08:17:14 +0000
@@ -79,7 +79,8 @@ ClusterMgr::~ClusterMgr()
 }
 
 void
-ClusterMgr::init(ndb_mgm_configuration_iterator & iter){
+ClusterMgr::configure(const ndb_mgm_configuration* config){
+  ndb_mgm_configuration_iterator iter(* config, CFG_SECTION_NODE);
   for(iter.first(); iter.valid(); iter.next()){
     Uint32 nodeId = 0;
     if(iter.get(CFG_NODE_ID, &nodeId))
@@ -109,6 +110,15 @@ ClusterMgr::init(ndb_mgm_configuration_i
     }
   }
 
+  /* Mark all non existing nodes as not defined */
+  for(Uint32 i = 0; i<MAX_NODES; i++) {
+    if (iter.first())
+      continue;
+
+    if (iter.find(CFG_NODE_ID, i))
+      theNodes[i]= Node();
+  }
+
   /* Init own node info */
   Node &node= theNodes[theFacade.ownId()];
   assert(node.defined);

=== modified file 'storage/ndb/src/ndbapi/ClusterMgr.hpp'
--- a/storage/ndb/src/ndbapi/ClusterMgr.hpp	2008-11-06 17:34:29 +0000
+++ b/storage/ndb/src/ndbapi/ClusterMgr.hpp	2008-11-12 08:17:14 +0000
@@ -38,7 +38,7 @@ class ClusterMgr {
 public:
   ClusterMgr(class TransporterFacade &);
   ~ClusterMgr();
-  void init(struct ndb_mgm_configuration_iterator & config);
+  void configure(const ndb_mgm_configuration* config);
   
   void reportConnected(NodeId nodeId);
   void reportDisconnected(NodeId nodeId);

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp	2008-11-06 13:52:28 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp	2008-11-12 08:17:14 +0000
@@ -420,13 +420,43 @@ copy(Uint32 * & insertPtr, 
  */
 
 int
-TransporterFacade::start_instance(int nodeId, 
-				  const ndb_mgm_configuration* props)
+TransporterFacade::start_instance(NodeId nodeId,
+                                  const ndb_mgm_configuration* conf)
 {
-  if (! init(nodeId, props)) {
+  assert(theOwnId == 0);
+  theOwnId = nodeId;
+
+  theTransporterRegistry = new TransporterRegistry(this);
+  if (theTransporterRegistry == NULL)
     return -1;
-  }
-  
+
+  if (!theTransporterRegistry->init(nodeId))
+    return -1;
+
+  theClusterMgr = new ClusterMgr(*this);
+  if (theClusterMgr == NULL)
+    return -1;
+
+  if (!configure(nodeId, conf))
+    return -1;
+
+  if (!theTransporterRegistry->start_service(m_socket_server))
+    return -1;
+
+  theReceiveThread = NdbThread_Create(runReceiveResponse_C,
+                                      (void**)this,
+                                      32768,
+                                      "ndb_receive",
+                                      NDB_THREAD_PRIO_LOW);
+
+  theSendThread = NdbThread_Create(runSendRequest_C,
+                                   (void**)this,
+                                   32768,
+                                   "ndb_send",
+                                   NDB_THREAD_PRIO_LOW);
+
+  theClusterMgr->startThread();
+
   /**
    * Install signal handler for SIGPIPE
    *
@@ -679,82 +709,89 @@ void TransporterFacade::init_cond_wait_q
 
 TransporterFacade::TransporterFacade(GlobalDictCache *cache) :
   theTransporterRegistry(0),
+  theOwnId(0),
+  theStartNodeId(1),
+  theClusterMgr(NULL),
+  theArbitMgr(NULL),
+  checkCounter(4),
+  currentSendLimit(1),
+  m_scan_batch_size(MAX_SCAN_BATCH_SIZE),
+  m_batch_byte_size(SCAN_BATCH_SIZE),
+  m_batch_size(DEF_BATCH_SIZE),
   theStopReceive(0),
   theSendThread(NULL),
   theReceiveThread(NULL),
+  m_max_trans_id(0),
   m_fragmented_signal_id(0),
   m_globalDictCache(cache)
 {
   DBUG_ENTER("TransporterFacade::TransporterFacade");
   init_cond_wait_queue();
   poll_owner = NULL;
-  theOwnId = 0;
   theMutexPtr = NdbMutex_Create();
   sendPerformedLastInterval = 0;
 
-  checkCounter = 4;
-  currentSendLimit = 1;
-  theClusterMgr = NULL;
-  theArbitMgr = NULL;
-  theStartNodeId = 1;
-  m_scan_batch_size= MAX_SCAN_BATCH_SIZE;
-  m_batch_byte_size= SCAN_BATCH_SIZE;
-  m_batch_size= DEF_BATCH_SIZE;
-  m_max_trans_id = 0;
-
   for (int i = 0; i < NO_API_FIXED_BLOCKS; i++)
     m_fixed2dynamic[i]= RNIL;
 
-  theClusterMgr = new ClusterMgr(* this);
-
 #ifdef API_TRACE
   apiSignalLog = 0;
 #endif
   DBUG_VOID_RETURN;
 }
 
+
 bool
-TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
+TransporterFacade::configure(NodeId nodeId,
+                             const ndb_mgm_configuration* conf)
 {
-  DBUG_ENTER("TransporterFacade::init");
-
-  theOwnId = nodeId;
-  theTransporterRegistry = new TransporterRegistry(this);
+  DBUG_ENTER("TransporterFacade::configure");
 
-  const int res = IPCConfig::configureTransporters(nodeId, 
-						   * props, 
-						   * theTransporterRegistry);
-  if(res <= 0){
-    TRP_DEBUG( "configureTransporters returned 0 or less" );
+  assert(theOwnId == nodeId);
+  assert(theTransporterRegistry);
+  assert(theClusterMgr);
+
+  // Configure transporters
+  if (!IPCConfig::configureTransporters(nodeId,
+                                        * conf,
+                                        * theTransporterRegistry))
     DBUG_RETURN(false);
-  }
-  
-  ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);
-  iter.first();
-  theClusterMgr->init(iter);
-  
-  iter.first();
-  if(iter.find(CFG_NODE_ID, nodeId)){
-    TRP_DEBUG( "Node info missing from config." );
+
+  // Configure cluster manager
+  theClusterMgr->configure(conf);
+
+  ndb_mgm_configuration_iterator iter(* conf, CFG_SECTION_NODE);
+  if(iter.find(CFG_NODE_ID, nodeId))
     DBUG_RETURN(false);
-  }
-  
+
+  // Configure send buffers
   Uint32 total_send_buffer = 0;
-  if(iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer) ||
-     total_send_buffer == 0)
-  {
-    total_send_buffer = theTransporterRegistry->get_total_max_send_buffer();
-  }
+  iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
   theTransporterRegistry->allocate_send_buffers(total_send_buffer);
 
+  // Configure arbitrator
   Uint32 rank = 0;
-  if(!iter.get(CFG_NODE_ARBIT_RANK, &rank) && rank>0){
-    theArbitMgr = new ArbitMgr(* this);
+  iter.get(CFG_NODE_ARBIT_RANK, &rank);
+  if (rank > 0)
+  {
+    // The arbitrator should be active
+    if (!theArbitMgr)
+      theArbitMgr = new ArbitMgr(* this);
     theArbitMgr->setRank(rank);
+
     Uint32 delay = 0;
     iter.get(CFG_NODE_ARBIT_DELAY, &delay);
     theArbitMgr->setDelay(delay);
   }
+  else if (theArbitMgr)
+  {
+    // No arbitrator should be started
+    theArbitMgr->doStop(NULL);
+    delete theArbitMgr;
+    theArbitMgr= NULL;
+  }
+
+  // Configure scan settings
   Uint32 scan_batch_size= 0;
   if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
     m_scan_batch_size= scan_batch_size;
@@ -767,9 +804,9 @@ TransporterFacade::init(Uint32 nodeId, c
   if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
     m_batch_size= batch_size;
   }
-  
+
+  // Configure timeouts
   Uint32 timeout = 120000;
-  iter.first();
   for (iter.first(); iter.valid(); iter.next())
   {
     Uint32 tmp1 = 0, tmp2 = 0;
@@ -781,24 +818,6 @@ TransporterFacade::init(Uint32 nodeId, c
   }
   m_waitfor_timeout = timeout;
   
-  if (!theTransporterRegistry->start_service(m_socket_server)){
-    ndbout_c("Unable to start theTransporterRegistry->start_service");
-    DBUG_RETURN(false);
-  }
-
-  theReceiveThread = NdbThread_Create(runReceiveResponse_C,
-                                      (void**)this,
-                                      32768,
-                                      "ndb_receive",
-                                      NDB_THREAD_PRIO_LOW);
-
-  theSendThread = NdbThread_Create(runSendRequest_C,
-                                   (void**)this,
-                                   32768,
-                                   "ndb_send",
-                                   NDB_THREAD_PRIO_LOW);
-  theClusterMgr->startThread();
-  
 #ifdef API_TRACE
   signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);
 #endif

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp	2008-11-04 08:43:06 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp	2008-11-12 08:17:14 +0000
@@ -53,11 +53,16 @@ public:
   STATIC_CONST( MAX_NO_THREADS = 4711 );
   TransporterFacade(GlobalDictCache *cache);
   virtual ~TransporterFacade();
-  bool init(Uint32, const ndb_mgm_configuration *);
 
-  int start_instance(int, const ndb_mgm_configuration*);
+  int start_instance(NodeId, const ndb_mgm_configuration*);
   void stop_instance();
-  
+
+  /*
+    (Re)configure the TransporterFacade
+    to a specific configuration
+  */
+  bool configure(NodeId, const ndb_mgm_configuration *);
+
   /**
    * Register this block for sending/receiving signals
    * @blockNo block number to use, -1 => any blockNumber
@@ -222,8 +227,7 @@ private:
   TransporterRegistry* theTransporterRegistry;
   SocketServer m_socket_server;
   int sendPerformedLastInterval;
-  int theOwnId;
-
+  NodeId theOwnId;
   NodeId theStartNodeId;
 
   ClusterMgr* theClusterMgr;

=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	2008-09-09 12:04:17 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp	2008-11-12 08:17:14 +0000
@@ -711,7 +711,9 @@ int Ndb_cluster_connection::connect(int 
     if(props == 0)
       break;
 
-    m_impl.m_transporter_facade->start_instance(nodeId, props);
+    if (m_impl.m_transporter_facade->start_instance(nodeId, props) < 0)
+      DBUG_RETURN(-1);
+
     if (m_impl.init_nodes_vector(nodeId, *props))
     {
       ndbout_c("Ndb_cluster_connection::connect: malloc failure");

Thread
bzr commit into mysql-5.1 branch (msvensson:3061) WL#4350Magnus Svensson12 Nov