List:Commits« Previous MessageNext Message »
From:Magnus Svensson Date:January 9 2009 10:00am
Subject:bzr commit into mysql-5.1 branch (msvensson:3200)
View as plain text  
#At file:///home/msvensson/mysql/6.4/ based on revid:frazer@stripped

 3200 Magnus Svensson	2009-01-09 [merge]
      Merge
modified:
  storage/ndb/include/transporter/TransporterRegistry.hpp
  storage/ndb/include/util/SocketServer.hpp
  storage/ndb/src/common/transporter/Transporter.cpp
  storage/ndb/src/common/transporter/TransporterRegistry.cpp
  storage/ndb/src/common/util/SocketServer.cpp
  storage/ndb/src/mgmsrv/MgmtSrvr.cpp
  storage/ndb/src/mgmsrv/MgmtSrvr.hpp
  storage/ndb/src/mgmsrv/Services.cpp
  storage/ndb/src/ndbapi/TransporterFacade.cpp
  storage/ndb/src/ndbapi/TransporterFacade.hpp
  storage/ndb/test/include/NdbMgmd.hpp
  storage/ndb/test/ndbapi/testMgm.cpp

=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	2008-11-12 08:26:18 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	2009-01-08 15:32:09 +0000
@@ -111,9 +111,12 @@ public:
   bool init(NodeId localNodeId);
 
   /**
-   * after a connect from client, perform connection using correct transporter
-   */
-  bool connect_server(NDB_SOCKET_TYPE sockfd);
+     Handle the handshaking with a new client connection
+     on the server port.
+     NOTE! Connection should be closed if function
+     returns false
+  */
+  bool connect_server(NDB_SOCKET_TYPE sockfd) const;
 
   bool connect_client(NdbMgmHandle *h);
 

=== modified file 'storage/ndb/include/util/SocketServer.hpp'
--- a/storage/ndb/include/util/SocketServer.hpp	2008-11-06 10:17:49 +0000
+++ b/storage/ndb/include/util/SocketServer.hpp	2009-01-08 15:32:09 +0000
@@ -42,21 +42,20 @@ public:
     friend void* sessionThread_C(void*);
     Session(NDB_SOCKET_TYPE sock) :
       m_stop(false),
-      m_stopped(false),
       m_socket(sock),
-      m_refCount(0)
+      m_refCount(0),
+      m_thread_stopped(false)
       {
 	DBUG_ENTER("SocketServer::Session");
 	DBUG_PRINT("enter",("NDB_SOCKET: " MY_SOCKET_FORMAT,
                             MY_SOCKET_FORMAT_VALUE(m_socket)));
 	DBUG_VOID_RETURN;
       }
-    
     bool m_stop;    // Has the session been ordered to stop?
-    bool m_stopped; // Has the session stopped?
-    
     NDB_SOCKET_TYPE m_socket;
     unsigned m_refCount;
+  private:
+    bool m_thread_stopped; // Has the session thread stopped?
   };
   
   /**

=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp	2008-11-12 08:17:14 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.cpp	2009-01-08 15:32:09 +0000
@@ -148,12 +148,14 @@ Transporter::connect_server(NDB_SOCKET_T
   DBUG_RETURN(res);
 }
 
+
 bool
 Transporter::connect_client() {
   NDB_SOCKET_TYPE sockfd;
+  DBUG_ENTER("Transporter::connect_client");
 
   if(m_connected)
-    return true;
+    DBUG_RETURN(true);
 
   if(isMgmConnection)
   {
@@ -162,55 +164,70 @@ Transporter::connect_client() {
   else
   {
     if (!m_socket_client->init())
-    {
-      return false;
-    }
+      DBUG_RETURN(false);
+
     if (pre_connect_options(m_socket_client->m_sockfd) != 0)
-    {
-      return false;
-    }
+      DBUG_RETURN(false);
+
     if (strlen(localHostName) > 0)
     {
       if (m_socket_client->bind(localHostName, 0) != 0)
-	return false;
+        DBUG_RETURN(false);
     }
     sockfd= m_socket_client->connect();
   }
 
-  return connect_client(sockfd);
+  DBUG_RETURN(connect_client(sockfd));
 }
 
+
 bool
 Transporter::connect_client(NDB_SOCKET_TYPE sockfd) {
 
+  DBUG_ENTER("Transporter::connect_client(sockfd)");
+
   if(m_connected)
-    return true;
+  {
+    DBUG_PRINT("error", ("Already connected"));
+    DBUG_RETURN(true);
+  }
 
   if (!my_socket_valid(sockfd))
-    return false;
-
-  DBUG_ENTER("Transporter::connect_client");
+  {
+    DBUG_PRINT("error", ("Socket " MY_SOCKET_FORMAT " is not valid",
+                         MY_SOCKET_FORMAT_VALUE(sockfd)));
+    DBUG_RETURN(false);
+  }
 
-  DBUG_PRINT("info",("port %d isMgmConnection=%d",m_s_port,isMgmConnection));
+  DBUG_PRINT("info",("server port: %d, isMgmConnection: %d",
+                     m_s_port, isMgmConnection));
 
   get_callback_obj()->reset_send_buffer(remoteNodeId);
 
+  // Send "hello"
+  DBUG_PRINT("info", ("Sending own nodeid: %d and transporter type: %d",
+                      localNodeId, m_type));
   SocketOutputStream s_output(sockfd);
-  SocketInputStream s_input(sockfd);
-
-  // send info about own id
-  // send info about own transporter type
-
-  s_output.println("%d %d", localNodeId, m_type);
-  // get remote id
-  int nodeId, remote_transporter_type= -1;
+  if (s_output.println("%d %d", localNodeId, m_type) < 0)
+  {
+    DBUG_PRINT("error", ("Send of 'hello' failed"));
+    NDB_CLOSE_SOCKET(sockfd);
+    DBUG_RETURN(false);
+  }
 
+  // Read reply
+  DBUG_PRINT("info", ("Reading reply"));
   char buf[256];
-  if (s_input.gets(buf, 256) == 0) {
+  SocketInputStream s_input(sockfd);
+  if (s_input.gets(buf, 256) == 0)
+  {
+    DBUG_PRINT("error", ("Failed to read reply"));
     NDB_CLOSE_SOCKET(sockfd);
     DBUG_RETURN(false);
   }
 
+  // Parse reply
+  int nodeId, remote_transporter_type= -1;
   int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
   switch (r) {
   case 2:
@@ -220,34 +237,40 @@ Transporter::connect_client(NDB_SOCKET_T
     // ok, but with no checks on transporter configuration compatability
     break;
   default:
+    DBUG_PRINT("error", ("Failed to parse reply"));
     NDB_CLOSE_SOCKET(sockfd);
     DBUG_RETURN(false);
   }
 
-  DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
+  DBUG_PRINT("info", ("Reply, nodeId: %d, transporter type: %d",
 		      nodeId, remote_transporter_type));
 
-  if (remote_transporter_type != -1)
+  // Check nodeid
+  if (nodeId != remoteNodeId)
   {
-    if (remote_transporter_type != m_type)
-    {
-      DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
-			   m_type, remote_transporter_type));
-      NDB_CLOSE_SOCKET(sockfd);
-      g_eventLogger->error("Incompatible configuration: transporter type "
-                           "mismatch with node %d", nodeId);
-      DBUG_RETURN(false);
-    }
+    g_eventLogger->error("Connected to wrong nodeid: %d, expected: %d",
+                         nodeId, remoteNodeId);
+    NDB_CLOSE_SOCKET(sockfd);
+    DBUG_RETURN(false);
   }
-  else if (m_type == tt_SHM_TRANSPORTER)
+
+  // Check transporter type
+  if (remote_transporter_type != -1 &&
+      remote_transporter_type != m_type)
   {
-    g_eventLogger->warning("Unable to verify transporter compatability with node %d", nodeId);
+    g_eventLogger->error("Connection to node: %d uses different transporter "
+                         "type: %d, expected type: %d",
+                         nodeId, remote_transporter_type, m_type);
+    NDB_CLOSE_SOCKET(sockfd);
+    DBUG_RETURN(false);
   }
 
+  // Cache the connect address
   my_socket_connect_address(sockfd, &m_connect_address);
 
   bool res = connect_client_impl(sockfd);
-  if(res){
+  if (res)
+  {
     m_connected  = true;
     m_errorCount = 0;
   }

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2008-12-11 10:40:24 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2009-01-08 15:34:01 +0000
@@ -288,18 +288,20 @@ TransporterRegistry::init(NodeId nodeId)
 }
 
 bool
-TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd)
+TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd) const
 {
-  DBUG_ENTER("TransporterRegistry::connect_server");
+  DBUG_ENTER("TransporterRegistry::connect_server(sockfd)");
 
-  // read node id and transporter type from client
-  int nodeId, remote_transporter_type= -1;
+  // Read "hello" that consists of node id and transporter
+  // type from client
   SocketInputStream s_input(sockfd);
   char buf[11+1+11+1]; // <int> <int>
   if (s_input.gets(buf, sizeof(buf)) == 0) {
-    DBUG_PRINT("error", ("Could not get node id from client"));
+    DBUG_PRINT("error", ("Failed to read 'hello' from client"));
     DBUG_RETURN(false);
   }
+
+  int nodeId, remote_transporter_type= -1;
   int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
   switch (r) {
   case 2:
@@ -309,67 +311,65 @@ TransporterRegistry::connect_server(NDB_
     // ok, but with no checks on transporter configuration compatability
     break;
   default:
-    DBUG_PRINT("error", ("Error in node id from client"));
+    DBUG_PRINT("error", ("Failed to parse 'hello' from client, buf: '%.*s'",
+                         sizeof(buf), buf));
     DBUG_RETURN(false);
   }
 
-  DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
-		      nodeId,remote_transporter_type));
+  DBUG_PRINT("info", ("Client hello, nodeId: %d transporter type: %d",
+		      nodeId, remote_transporter_type));
 
-  //check that nodeid is valid and that there is an allocated transporter
-  if ( nodeId < 0 || nodeId >= (int)maxTransporters) {
-    DBUG_PRINT("error", ("Node id out of range from client"));
-    DBUG_RETURN(false);
-  }
-  if (theTransporters[nodeId] == 0) {
-      DBUG_PRINT("error", ("No transporter for this node id from client"));
-      DBUG_RETURN(false);
-  }
 
-  //check that the transporter should be connected
-  if (performStates[nodeId] != TransporterRegistry::CONNECTING) {
-    DBUG_PRINT("error", ("Transporter in wrong state for this node id from client"));
+  // Check that nodeid is in range before accessing the arrays
+  if (nodeId < 0 ||
+      nodeId >= (int)maxTransporters)
+  {
+    DBUG_PRINT("error", ("Out of range nodeId: %d from client",
+                         nodeId));
     DBUG_RETURN(false);
   }
 
+  // Check that transporter is allocated
   Transporter *t= theTransporters[nodeId];
+  if (t == 0)
+  {
+    DBUG_PRINT("error", ("No transporter available for node id %d", nodeId));
+    DBUG_RETURN(false);
+  }
 
-  // send info about own id (just as response to acknowledge connection)
-  // send info on own transporter type
-  SocketOutputStream s_output(sockfd);
-  s_output.println("%d %d", t->getLocalNodeId(), t->m_type);
-
-  if (remote_transporter_type != -1)
+  // Check that the transporter should be connecting
+  if (performStates[nodeId] != TransporterRegistry::CONNECTING)
   {
-    if (remote_transporter_type != t->m_type)
-    {
-      DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
-			   t->m_type, remote_transporter_type));
-      g_eventLogger->error("Incompatible configuration: Transporter type "
-                           "mismatch with node %d", nodeId);
+    DBUG_PRINT("error", ("Transporter for node id %d in wrong state",
+                         nodeId));
+    DBUG_RETURN(false);
+  }
 
-      // wait for socket close for 1 second to let message arrive at client
-      {
-	fd_set a_set;
-	FD_ZERO(&a_set);
-	my_FD_SET(sockfd, &a_set);
-	struct timeval timeout;
-	timeout.tv_sec  = 1; timeout.tv_usec = 0;
-	select(my_socket_nfds(sockfd,0)+1, &a_set, 0, 0, &timeout);
-      }
-      DBUG_RETURN(false);
-    }
+  // Check transporter type
+  if (remote_transporter_type != -1 &&
+      remote_transporter_type != t->m_type)
+  {
+    g_eventLogger->error("Connection from node: %d uses different transporter "
+                         "type: %d, expected type: %d",
+                         nodeId, remote_transporter_type, t->m_type);
+    DBUG_RETURN(false);
   }
-  else if (t->m_type == tt_SHM_TRANSPORTER)
+
+  // Send reply to client
+  SocketOutputStream s_output(sockfd);
+  if (s_output.println("%d %d", t->getLocalNodeId(), t->m_type) < 0)
   {
-    g_eventLogger->warning("Unable to verify transporter compatability with node %d", nodeId);
+    DBUG_PRINT("error", ("Send of reply failed"));
+    DBUG_RETURN(false);
   }
 
-  // setup transporter (transporter responsible for closing sockfd)
+  // Setup transporter (transporter responsible for closing sockfd)
   bool res = t->connect_server(sockfd);
 
   if (res && performStates[nodeId] != TransporterRegistry::CONNECTING)
   {
+    // Connection suceeded, but not connecting anymore, return
+    // false to close the connection
     DBUG_RETURN(false);
   }
 
@@ -1492,7 +1492,11 @@ TransporterRegistry::start_clients_threa
 	   * First, we try to connect (if we have a port number).
 	   */
 	  if (t->get_s_port())
+          {
+            DBUG_PRINT("info", ("connecting to node %d using port %d",
+                                nodeId, t->get_s_port()));
 	    connected= t->connect_client();
+          }
 
 	  /**
 	   * If dynamic, get the port for connecting from the management server
@@ -1501,11 +1505,18 @@ TransporterRegistry::start_clients_threa
 	    int server_port= 0;
 	    struct ndb_mgm_reply mgm_reply;
 
+            DBUG_PRINT("info", ("connection to node %d should use "
+                                "dynamic port",
+                                nodeId));
+
 	    if(!ndb_mgm_is_connected(m_mgm_handle))
 	      ndb_mgm_connect(m_mgm_handle, 0, 0, 0);
-	    
+
 	    if(ndb_mgm_is_connected(m_mgm_handle))
 	    {
+              DBUG_PRINT("info", ("asking mgmd which port to use for node %d",
+                                  nodeId));
+
 	      int res=
 		ndb_mgm_get_connection_int_parameter(m_mgm_handle,
 						     t->getRemoteNodeId(),
@@ -1518,6 +1529,8 @@ TransporterRegistry::start_clients_threa
 				 t->getLocalNodeId(),res));
 	      if( res >= 0 )
 	      {
+                DBUG_PRINT("info", ("got port %d to use for connection to %d",
+                                    server_port, nodeId));
 		/**
 		 * Server_port == 0 just means that that a mgmt server
 		 * has not received a new port yet. Keep the old.
@@ -1527,11 +1540,15 @@ TransporterRegistry::start_clients_threa
 	      }
 	      else if(ndb_mgm_is_connected(m_mgm_handle))
 	      {
-                g_eventLogger->info("Failed to get dynamic port to connect to: %d", res);
+                DBUG_PRINT("info", ("Failed to get dynamic port, res: %d",
+                                    res));
+                g_eventLogger->info("Failed to get dynamic port, res: %d",
+                                    res);
 		ndb_mgm_disconnect(m_mgm_handle);
 	      }
 	      else
 	      {
+                DBUG_PRINT("info", ("mgmd close connection early"));
                 g_eventLogger->info
                   ("Management server closed connection early. "
                    "It is probably being shut down (or has problems). "
@@ -1762,6 +1779,7 @@ TransporterRegistry::get_transporter(Nod
   return theTransporters[nodeId];
 }
 
+
 bool TransporterRegistry::connect_client(NdbMgmHandle *h)
 {
   DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
@@ -1788,6 +1806,8 @@ bool TransporterRegistry::connect_client
   DBUG_RETURN(res);
 }
 
+
+
 /**
  * Given a connected NdbMgmHandle, turns it into a transporter
  * and returns the socket.
@@ -1796,46 +1816,58 @@ NDB_SOCKET_TYPE TransporterRegistry::con
 {
   struct ndb_mgm_reply mgm_reply;
   my_socket sockfd;
-
   my_socket_invalidate(&sockfd);
 
+  DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle)");
+
   if ( h==NULL || *h == NULL )
   {
     g_eventLogger->error("%s: %d", __FILE__, __LINE__);
-    return sockfd;
+    DBUG_RETURN(sockfd);
   }
 
   for(unsigned int i=0;i < m_transporter_interface.size();i++)
-    if (m_transporter_interface[i].m_s_service_port < 0
-	&& ndb_mgm_set_connection_int_parameter(*h,
-				   localNodeId,
+  {
+    if (m_transporter_interface[i].m_s_service_port >= 0)
+      continue;
+
+    DBUG_PRINT("info", ("Setting dynamic port %d for connection from node %d",
+                        m_transporter_interface[i].m_s_service_port,
+                        m_transporter_interface[i].m_remote_nodeId));
+
+    if (ndb_mgm_set_connection_int_parameter(*h,
+                                   localNodeId,
 				   m_transporter_interface[i].m_remote_nodeId,
 				   CFG_CONNECTION_SERVER_PORT,
 				   m_transporter_interface[i].m_s_service_port,
 				   &mgm_reply) < 0)
     {
+      DBUG_PRINT("error", ("Failed to set dynamic port"));
       g_eventLogger->error("Error: %s: %d",
                            ndb_mgm_get_latest_error_desc(*h),
                            ndb_mgm_get_latest_error(*h));
       g_eventLogger->error("%s: %d", __FILE__, __LINE__);
       ndb_mgm_destroy_handle(h);
-      return sockfd;
+      DBUG_RETURN(sockfd);
     }
+  }
 
   /**
    * convert_to_transporter also disposes of the handle (i.e. we don't leak
    * memory here.
    */
+  DBUG_PRINT("info", ("Converting handle to transporter"));
   sockfd= ndb_mgm_convert_to_transporter(h);
   if (!my_socket_valid(sockfd))
   {
+    DBUG_PRINT("error", ("Failed to convert to transporter"));
     g_eventLogger->error("Error: %s: %d",
                          ndb_mgm_get_latest_error_desc(*h),
                          ndb_mgm_get_latest_error(*h));
     g_eventLogger->error("%s: %d", __FILE__, __LINE__);
     ndb_mgm_destroy_handle(h);
   }
-  return sockfd;
+  DBUG_RETURN(sockfd);
 }
 
 /**
@@ -1848,9 +1880,11 @@ NDB_SOCKET_TYPE TransporterRegistry::con
   my_socket s;
   my_socket_invalidate(&s);
 
+  DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(SocketClient)");
+
   if ( h == NULL )
   {
-    return s;
+    DBUG_RETURN(s);
   }
 
   /**
@@ -1864,11 +1898,12 @@ NDB_SOCKET_TYPE TransporterRegistry::con
 
   if(ndb_mgm_connect(h, 0, 0, 0)<0)
   {
+    DBUG_PRINT("info", ("connection to mgmd failed"));
     ndb_mgm_destroy_handle(&h);
-    return s;
+    DBUG_RETURN(s);
   }
 
-  return connect_ndb_mgmd(&h);
+  DBUG_RETURN(connect_ndb_mgmd(&h));
 }
 
 /**

=== modified file 'storage/ndb/src/common/util/SocketServer.cpp'
--- a/storage/ndb/src/common/util/SocketServer.cpp	2008-11-06 10:17:49 +0000
+++ b/storage/ndb/src/common/util/SocketServer.cpp	2009-01-08 15:32:09 +0000
@@ -323,7 +323,7 @@ SocketServer::checkSessionsImpl()
 {
   for(int i = m_sessions.size() - 1; i >= 0; i--)
   {
-    if(m_sessions[i].m_session->m_stopped &&
+    if(m_sessions[i].m_session->m_thread_stopped &&
        (m_sessions[i].m_session->m_refCount == 0))
     {
       if(m_sessions[i].m_thread != 0)
@@ -371,21 +371,16 @@ void* 
 sessionThread_C(void* _sc){
   SocketServer::Session * si = (SocketServer::Session *)_sc;
 
-  /**
-   * may have m_stopped set if we're transforming a mgm
-   * connection into a transporter connection.
-   */
-  if(!si->m_stopped)
-  {
-    if(!si->m_stop){
-      si->m_stopped = false;
-      si->runSession();
-    } else {
-      NDB_CLOSE_SOCKET(si->m_socket);
-    }
-  }
-  
-  si->m_stopped = true;
+  assert(si->m_thread_stopped == false);
+
+  if(!si->m_stop)
+    si->runSession();
+  else
+    NDB_CLOSE_SOCKET(si->m_socket);
+
+  // Mark the thread as stopped to allow the
+  // session resources to be released
+  si->m_thread_stopped = true;
   return 0;
 }
 

=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp	2008-12-19 15:27:02 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp	2009-01-08 15:41:27 +0000
@@ -3575,22 +3575,27 @@ MgmtSrvr::getConnectionDbParameter(int n
 }
 
 
-void MgmtSrvr::transporter_connect(NDB_SOCKET_TYPE sockfd)
+bool MgmtSrvr::transporter_connect(NDB_SOCKET_TYPE sockfd)
 {
-  if (theFacade->get_registry()->connect_server(sockfd))
-  {
-    /**
-     * Force an update_connections() so that the
-     * ClusterMgr and TransporterFacade is up to date
-     * with the new connection.
-     * Important for correct node id reservation handling
-     */
-    NdbMutex_Lock(theFacade->theMutexPtr);
-    theFacade->get_registry()->update_connections();
-    NdbMutex_Unlock(theFacade->theMutexPtr);
-  }
+  DBUG_ENTER("MgmtSrvr::transporter_connect");
+  TransporterRegistry* tr= theFacade->get_registry();
+  if (!tr->connect_server(sockfd))
+    DBUG_RETURN(false);
+
+  /*
+    Force an update_connections() so that the
+    ClusterMgr and TransporterFacade is up to date
+    with the new connection.
+    Important for correct node id reservation handling
+  */
+  theFacade->lock_mutex();
+  tr->update_connections();
+  theFacade->unlock_mutex();
+
+  DBUG_RETURN(true);
 }
 
+
 bool MgmtSrvr::connect_to_self()
 {
   BaseString buf;
@@ -4057,7 +4062,6 @@ MgmtSrvr::show_variables(NdbOut& out)
 }
 
 
-
 template class MutexVector<NodeId>;
 template class MutexVector<Ndb_mgmd_event_service::Event_listener>;
 template class Vector<EventSubscribeReq>;

=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.hpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.hpp	2008-12-17 15:40:11 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.hpp	2009-01-08 15:41:27 +0000
@@ -395,7 +395,7 @@ public:
   int getConnectionDbParameter(int node1, int node2, int param,
 			       int *value, BaseString& msg);
 
-  void transporter_connect(NDB_SOCKET_TYPE sockfd);
+  bool transporter_connect(NDB_SOCKET_TYPE sockfd);
 
   const char *get_connect_address(Uint32 node_id);
   void get_connected_nodes(NodeBitmask &connected_nodes) const;

=== modified file 'storage/ndb/src/mgmsrv/Services.cpp'
--- a/storage/ndb/src/mgmsrv/Services.cpp	2009-01-07 09:55:03 +0000
+++ b/storage/ndb/src/mgmsrv/Services.cpp	2009-01-08 15:41:27 +0000
@@ -321,7 +321,7 @@ extern int g_errorInsert;
 #define SLEEP_ERROR_INSERTED(x) if(ERROR_INSERTED(x)){NdbSleep_SecSleep(10);}
 
 MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock, Uint64 session_id)
-  : SocketServer::Session(sock), m_mgmsrv(mgm)
+  : SocketServer::Session(sock), m_mgmsrv(mgm), m_name("unknown:0")
 {
   DBUG_ENTER("MgmApiSession::MgmApiSession");
   m_input = new SocketInputStream(sock, 30000);
@@ -336,9 +336,10 @@ MgmApiSession::MgmApiSession(class MgmtS
 
   struct sockaddr_in addr;
   SOCKET_SIZE_TYPE addrlen= sizeof(addr);
-  my_getpeername(sock, (struct sockaddr*)&addr, &addrlen);
-  m_name.assfmt("%s:%d", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
+  if (my_getpeername(sock, (struct sockaddr*)&addr, &addrlen) == 0)
+    m_name.assfmt("%s:%d", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
   DBUG_PRINT("info", ("new connection from: %s", m_name.c_str()));
+
   DBUG_VOID_RETURN;
 }
 
@@ -1702,11 +1703,27 @@ void
 MgmApiSession::transporter_connect(Parser_t::Context &ctx,
 				   Properties const &args)
 {
-  m_mgmsrv.transporter_connect(m_socket);
+  if (!m_mgmsrv.transporter_connect(m_socket))
+  {
+    // Connection not allowed or failed
+    g_eventLogger->warning("Failed to convert connection "
+                           "from '%s' to transporter",
+                           name());
+
+    // Close the socket to indicate failure to other side
+  }
+  else
+  {
+    /*
+      Conversion to transporter suceeded
+      Stop this session thread and release resources
+      but don't close the socket, it's been taken over
+      by the transporter
+    */
+    my_socket_invalidate(&m_socket);   // so nobody closes it
+  }
 
-  m_stop= true;
-  m_stopped= true; // force a stop (no closing socket)
-  my_socket_invalidate(&m_socket);   // so nobody closes it
+  m_stop= true; // Stop the session
 }
 
 void
@@ -2086,6 +2103,7 @@ MgmApiSession::show_variables(Parser_t::
 
 }
 
+
 template class MutexVector<int>;
 template class Vector<ParserRow<MgmApiSession> const*>;
 template class Vector<NDB_SOCKET_TYPE>;

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp	2009-01-07 09:48:44 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp	2009-01-09 09:34:36 +0000
@@ -751,6 +751,51 @@ TransporterFacade::TransporterFacade(Glo
 }
 
 
+/* Return true if node with "nodeId" is a MGM node */
+static bool is_mgmd(Uint32 nodeId,
+                    const ndb_mgm_configuration * conf)
+{
+  ndb_mgm_configuration_iterator iter(*conf, CFG_SECTION_NODE);
+  if (iter.find(CFG_NODE_ID, nodeId))
+    abort();
+  Uint32 type;
+  if(iter.get(CFG_TYPE_OF_SECTION, &type))
+    abort();
+
+  return (type == NODE_TYPE_MGM);
+}
+
+
+bool
+TransporterFacade::do_connect_mgm(NodeId nodeId,
+                                  const ndb_mgm_configuration* conf)
+{
+  // Allow other MGM nodes to connect
+  DBUG_ENTER("TransporterFacade::do_connect_mgm");
+  ndb_mgm_configuration_iterator iter(*conf, CFG_SECTION_CONNECTION);
+  for(iter.first(); iter.valid(); iter.next())
+  {
+    Uint32 nodeId1, nodeId2, remoteNodeId;
+    if (iter.get(CFG_CONNECTION_NODE_1, &nodeId1) ||
+        iter.get(CFG_CONNECTION_NODE_2, &nodeId2))
+      DBUG_RETURN(false);
+
+    // Skip connections where this node is not involved
+    if (nodeId1 != nodeId && nodeId2 != nodeId)
+      continue;
+
+    // If both sides are MGM, open connection
+    if(is_mgmd(nodeId1, conf) && is_mgmd(nodeId2, conf))
+    {
+      Uint32 remoteNodeId = (nodeId == nodeId1 ? nodeId2 : nodeId1);
+      DBUG_PRINT("info", ("opening connection to node %d", remoteNodeId));
+      doConnect(remoteNodeId);
+    }
+  }
+  DBUG_RETURN(true);
+}
+
+
 bool
 TransporterFacade::configure(NodeId nodeId,
                              const ndb_mgm_configuration* conf)
@@ -767,6 +812,10 @@ TransporterFacade::configure(NodeId node
                                         * theTransporterRegistry))
     DBUG_RETURN(false);
 
+  // Open connection between MGM servers
+  if (!do_connect_mgm(nodeId, conf))
+    DBUG_RETURN(false);
+
   // Configure cluster manager
   theClusterMgr->configure(conf);
 

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp	2008-11-12 08:17:14 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp	2009-01-09 09:34:36 +0000
@@ -256,6 +256,8 @@ private:
   friend void* runReceiveResponse_C(void*);
   friend void atexit_stop_instance();
 
+  bool do_connect_mgm(NodeId, const ndb_mgm_configuration*);
+
   /**
    * Block number handling
    */

=== modified file 'storage/ndb/test/include/NdbMgmd.hpp'
--- a/storage/ndb/test/include/NdbMgmd.hpp	2008-12-17 16:09:24 +0000
+++ b/storage/ndb/test/include/NdbMgmd.hpp	2009-01-08 15:32:09 +0000
@@ -70,6 +70,10 @@ public:
     return m_handle;
   }
 
+  NDB_SOCKET_TYPE socket(void) const {
+    return _ndb_mgm_get_socket(m_handle);
+  }
+
   const char* getConnectString() const {
     return m_connect_str.c_str();
   }
@@ -125,6 +129,18 @@ public:
     return true;
   }
 
+  bool disconnect(void) {
+    if (ndb_mgm_disconnect(m_handle) != 0){
+      error("disconnect: ndb_mgm_disconnect failed");
+      return false;
+    }
+
+    ndb_mgm_destroy_handle(&m_handle);
+    m_handle = NULL;
+
+    return true;
+  }
+
   bool restart(bool abort = false) {
     if (!is_connected()){
       error("restart: not connected");
@@ -159,7 +175,7 @@ public:
       return false;
     }
 
-    SocketOutputStream out(_ndb_mgm_get_socket(m_handle));
+    SocketOutputStream out(socket());
 
     if (out.println(cmd)){
       error("call: println failed at line %d", __LINE__);
@@ -218,7 +234,7 @@ public:
     }
 
     BaseString buf;
-    SocketInputStream2 in(_ndb_mgm_get_socket(m_handle));
+    SocketInputStream2 in(socket());
     if (cmd_reply)
     {
       // Read the reply header and compare against "cmd_reply"

=== modified file 'storage/ndb/test/ndbapi/testMgm.cpp'
--- a/storage/ndb/test/ndbapi/testMgm.cpp	2008-12-17 16:18:23 +0000
+++ b/storage/ndb/test/ndbapi/testMgm.cpp	2009-01-08 15:32:09 +0000
@@ -1291,6 +1291,92 @@ int runTestGetVersionUntilStopped(NDBT_C
   return result;
 }
 
+
+
+static bool
+check_connection(NdbMgmd& mgmd)
+{
+  Properties args, reply;
+  mgmd.verbose(false); // Verbose off
+  bool result= mgmd.call("check connection", args,
+                         "check connection reply", reply);
+  mgmd.verbose(); // Verbose on
+  return result;
+}
+
+
+static bool
+check_transporter_connect(NdbMgmd& mgmd, const char * hello)
+{
+  SocketOutputStream out(mgmd.socket());
+
+  // Call 'transporter connect'
+  if (out.println("transporter connect") ||
+      out.println(""))
+  {
+    g_err << "Send failed" << endl;
+    return false;
+  }
+
+  // Send the 'hello'
+  g_info << "Client hello: '" << hello << "'" << endl;
+  if (out.println(hello))
+  {
+    g_err << "Send hello '" << hello << "' failed" << endl;
+    return false;
+  }
+
+  // Should not be possible to read a reply now, socket
+  // should have been closed
+  if (check_connection(mgmd)){
+    g_err << "not disconnected" << endl;
+    return false;
+  }
+
+  // disconnect and connect again
+  if (!mgmd.disconnect())
+    return false;
+  if (!mgmd.connect())
+    return false;
+
+  return true;
+}
+
+
+int runTestTransporterConnect(NDBT_Context* ctx, NDBT_Step* step)
+{
+  NdbMgmd mgmd;
+
+  if (!mgmd.connect())
+    return NDBT_FAILED;
+
+  int result = NDBT_FAILED;
+  if (
+      // Junk hello strings
+      check_transporter_connect(mgmd, "hello") &&
+      check_transporter_connect(mgmd, "hello again") &&
+
+      // "Blow" the buffer
+      check_transporter_connect(mgmd, "string_longer_than_buf_1234567890") &&
+
+      // Out of range nodeid
+      check_transporter_connect(mgmd, "-1") &&
+      check_transporter_connect(mgmd, "-2 2") &&
+      check_transporter_connect(mgmd, "10000") &&
+      check_transporter_connect(mgmd, "99999 8") &&
+
+      // Valid nodeid, invalid transporter type
+      // Valid nodeid and transporter type, state != CONNECTING
+      // ^These are only possible to test by finding an existing
+      //  NDB node that are not started and use its setting(s)
+
+      true)
+   result = NDBT_OK;
+
+  return result;
+}
+
+
 static bool
 show_config(NdbMgmd& mgmd,
             const Properties& args,
@@ -1931,6 +2017,10 @@ TESTCASE("TestGetVersion",
 	 "Test 'get version'"){
   INITIALIZER(runTestGetVersion);
 }
+TESTCASE("TestTransporterConnect",
+	 "Test 'transporter connect'"){
+  INITIALIZER(runTestTransporterConnect);
+}
 #ifdef NOT_YET
 TESTCASE("TestRestartMgmd",
         "Test restart of ndb_mgmd(s)"){

Thread
bzr commit into mysql-5.1 branch (msvensson:3200) Magnus Svensson9 Jan