#At file:///home/msvensson/mysql/6.4/ based on revid:frazer@stripped
3200 Magnus Svensson 2009-01-09 [merge]
Merge
modified:
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/include/util/SocketServer.hpp
storage/ndb/src/common/transporter/Transporter.cpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/common/util/SocketServer.cpp
storage/ndb/src/mgmsrv/MgmtSrvr.cpp
storage/ndb/src/mgmsrv/MgmtSrvr.hpp
storage/ndb/src/mgmsrv/Services.cpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
storage/ndb/test/include/NdbMgmd.hpp
storage/ndb/test/ndbapi/testMgm.cpp
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2008-11-12 08:26:18 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2009-01-08 15:32:09 +0000
@@ -111,9 +111,12 @@ public:
bool init(NodeId localNodeId);
/**
- * after a connect from client, perform connection using correct transporter
- */
- bool connect_server(NDB_SOCKET_TYPE sockfd);
+ Handle the handshaking with a new client connection
+ on the server port.
+ NOTE! Connection should be closed if function
+ returns false
+ */
+ bool connect_server(NDB_SOCKET_TYPE sockfd) const;
bool connect_client(NdbMgmHandle *h);
=== modified file 'storage/ndb/include/util/SocketServer.hpp'
--- a/storage/ndb/include/util/SocketServer.hpp 2008-11-06 10:17:49 +0000
+++ b/storage/ndb/include/util/SocketServer.hpp 2009-01-08 15:32:09 +0000
@@ -42,21 +42,20 @@ public:
friend void* sessionThread_C(void*);
Session(NDB_SOCKET_TYPE sock) :
m_stop(false),
- m_stopped(false),
m_socket(sock),
- m_refCount(0)
+ m_refCount(0),
+ m_thread_stopped(false)
{
DBUG_ENTER("SocketServer::Session");
DBUG_PRINT("enter",("NDB_SOCKET: " MY_SOCKET_FORMAT,
MY_SOCKET_FORMAT_VALUE(m_socket)));
DBUG_VOID_RETURN;
}
-
bool m_stop; // Has the session been ordered to stop?
- bool m_stopped; // Has the session stopped?
-
NDB_SOCKET_TYPE m_socket;
unsigned m_refCount;
+ private:
+ bool m_thread_stopped; // Has the session thread stopped?
};
/**
=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp 2008-11-12 08:17:14 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.cpp 2009-01-08 15:32:09 +0000
@@ -148,12 +148,14 @@ Transporter::connect_server(NDB_SOCKET_T
DBUG_RETURN(res);
}
+
bool
Transporter::connect_client() {
NDB_SOCKET_TYPE sockfd;
+ DBUG_ENTER("Transporter::connect_client");
if(m_connected)
- return true;
+ DBUG_RETURN(true);
if(isMgmConnection)
{
@@ -162,55 +164,70 @@ Transporter::connect_client() {
else
{
if (!m_socket_client->init())
- {
- return false;
- }
+ DBUG_RETURN(false);
+
if (pre_connect_options(m_socket_client->m_sockfd) != 0)
- {
- return false;
- }
+ DBUG_RETURN(false);
+
if (strlen(localHostName) > 0)
{
if (m_socket_client->bind(localHostName, 0) != 0)
- return false;
+ DBUG_RETURN(false);
}
sockfd= m_socket_client->connect();
}
- return connect_client(sockfd);
+ DBUG_RETURN(connect_client(sockfd));
}
+
bool
Transporter::connect_client(NDB_SOCKET_TYPE sockfd) {
+ DBUG_ENTER("Transporter::connect_client(sockfd)");
+
if(m_connected)
- return true;
+ {
+ DBUG_PRINT("error", ("Already connected"));
+ DBUG_RETURN(true);
+ }
if (!my_socket_valid(sockfd))
- return false;
-
- DBUG_ENTER("Transporter::connect_client");
+ {
+ DBUG_PRINT("error", ("Socket " MY_SOCKET_FORMAT " is not valid",
+ MY_SOCKET_FORMAT_VALUE(sockfd)));
+ DBUG_RETURN(false);
+ }
- DBUG_PRINT("info",("port %d isMgmConnection=%d",m_s_port,isMgmConnection));
+ DBUG_PRINT("info",("server port: %d, isMgmConnection: %d",
+ m_s_port, isMgmConnection));
get_callback_obj()->reset_send_buffer(remoteNodeId);
+ // Send "hello"
+ DBUG_PRINT("info", ("Sending own nodeid: %d and transporter type: %d",
+ localNodeId, m_type));
SocketOutputStream s_output(sockfd);
- SocketInputStream s_input(sockfd);
-
- // send info about own id
- // send info about own transporter type
-
- s_output.println("%d %d", localNodeId, m_type);
- // get remote id
- int nodeId, remote_transporter_type= -1;
+ if (s_output.println("%d %d", localNodeId, m_type) < 0)
+ {
+ DBUG_PRINT("error", ("Send of 'hello' failed"));
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_RETURN(false);
+ }
+ // Read reply
+ DBUG_PRINT("info", ("Reading reply"));
char buf[256];
- if (s_input.gets(buf, 256) == 0) {
+ SocketInputStream s_input(sockfd);
+ if (s_input.gets(buf, 256) == 0)
+ {
+ DBUG_PRINT("error", ("Failed to read reply"));
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
+ // Parse reply
+ int nodeId, remote_transporter_type= -1;
int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
switch (r) {
case 2:
@@ -220,34 +237,40 @@ Transporter::connect_client(NDB_SOCKET_T
// ok, but with no checks on transporter configuration compatability
break;
default:
+ DBUG_PRINT("error", ("Failed to parse reply"));
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
- DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
+ DBUG_PRINT("info", ("Reply, nodeId: %d, transporter type: %d",
nodeId, remote_transporter_type));
- if (remote_transporter_type != -1)
+ // Check nodeid
+ if (nodeId != remoteNodeId)
{
- if (remote_transporter_type != m_type)
- {
- DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
- m_type, remote_transporter_type));
- NDB_CLOSE_SOCKET(sockfd);
- g_eventLogger->error("Incompatible configuration: transporter type "
- "mismatch with node %d", nodeId);
- DBUG_RETURN(false);
- }
+ g_eventLogger->error("Connected to wrong nodeid: %d, expected: %d",
+ nodeId, remoteNodeId);
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_RETURN(false);
}
- else if (m_type == tt_SHM_TRANSPORTER)
+
+ // Check transporter type
+ if (remote_transporter_type != -1 &&
+ remote_transporter_type != m_type)
{
- g_eventLogger->warning("Unable to verify transporter compatability with node %d", nodeId);
+ g_eventLogger->error("Connection to node: %d uses different transporter "
+ "type: %d, expected type: %d",
+ nodeId, remote_transporter_type, m_type);
+ NDB_CLOSE_SOCKET(sockfd);
+ DBUG_RETURN(false);
}
+ // Cache the connect address
my_socket_connect_address(sockfd, &m_connect_address);
bool res = connect_client_impl(sockfd);
- if(res){
+ if (res)
+ {
m_connected = true;
m_errorCount = 0;
}
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-12-11 10:40:24 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2009-01-08 15:34:01 +0000
@@ -288,18 +288,20 @@ TransporterRegistry::init(NodeId nodeId)
}
bool
-TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd)
+TransporterRegistry::connect_server(NDB_SOCKET_TYPE sockfd) const
{
- DBUG_ENTER("TransporterRegistry::connect_server");
+ DBUG_ENTER("TransporterRegistry::connect_server(sockfd)");
- // read node id and transporter type from client
- int nodeId, remote_transporter_type= -1;
+ // Read "hello" that consists of node id and transporter
+ // type from client
SocketInputStream s_input(sockfd);
char buf[11+1+11+1]; // <int> <int>
if (s_input.gets(buf, sizeof(buf)) == 0) {
- DBUG_PRINT("error", ("Could not get node id from client"));
+ DBUG_PRINT("error", ("Failed to read 'hello' from client"));
DBUG_RETURN(false);
}
+
+ int nodeId, remote_transporter_type= -1;
int r= sscanf(buf, "%d %d", &nodeId, &remote_transporter_type);
switch (r) {
case 2:
@@ -309,67 +311,65 @@ TransporterRegistry::connect_server(NDB_
// ok, but with no checks on transporter configuration compatability
break;
default:
- DBUG_PRINT("error", ("Error in node id from client"));
+ DBUG_PRINT("error", ("Failed to parse 'hello' from client, buf: '%.*s'",
+ sizeof(buf), buf));
DBUG_RETURN(false);
}
- DBUG_PRINT("info", ("nodeId=%d remote_transporter_type=%d",
- nodeId,remote_transporter_type));
+ DBUG_PRINT("info", ("Client hello, nodeId: %d transporter type: %d",
+ nodeId, remote_transporter_type));
- //check that nodeid is valid and that there is an allocated transporter
- if ( nodeId < 0 || nodeId >= (int)maxTransporters) {
- DBUG_PRINT("error", ("Node id out of range from client"));
- DBUG_RETURN(false);
- }
- if (theTransporters[nodeId] == 0) {
- DBUG_PRINT("error", ("No transporter for this node id from client"));
- DBUG_RETURN(false);
- }
- //check that the transporter should be connected
- if (performStates[nodeId] != TransporterRegistry::CONNECTING) {
- DBUG_PRINT("error", ("Transporter in wrong state for this node id from client"));
+ // Check that nodeid is in range before accessing the arrays
+ if (nodeId < 0 ||
+ nodeId >= (int)maxTransporters)
+ {
+ DBUG_PRINT("error", ("Out of range nodeId: %d from client",
+ nodeId));
DBUG_RETURN(false);
}
+ // Check that transporter is allocated
Transporter *t= theTransporters[nodeId];
+ if (t == 0)
+ {
+ DBUG_PRINT("error", ("No transporter available for node id %d", nodeId));
+ DBUG_RETURN(false);
+ }
- // send info about own id (just as response to acknowledge connection)
- // send info on own transporter type
- SocketOutputStream s_output(sockfd);
- s_output.println("%d %d", t->getLocalNodeId(), t->m_type);
-
- if (remote_transporter_type != -1)
+ // Check that the transporter should be connecting
+ if (performStates[nodeId] != TransporterRegistry::CONNECTING)
{
- if (remote_transporter_type != t->m_type)
- {
- DBUG_PRINT("error", ("Transporter types mismatch this=%d remote=%d",
- t->m_type, remote_transporter_type));
- g_eventLogger->error("Incompatible configuration: Transporter type "
- "mismatch with node %d", nodeId);
+ DBUG_PRINT("error", ("Transporter for node id %d in wrong state",
+ nodeId));
+ DBUG_RETURN(false);
+ }
- // wait for socket close for 1 second to let message arrive at client
- {
- fd_set a_set;
- FD_ZERO(&a_set);
- my_FD_SET(sockfd, &a_set);
- struct timeval timeout;
- timeout.tv_sec = 1; timeout.tv_usec = 0;
- select(my_socket_nfds(sockfd,0)+1, &a_set, 0, 0, &timeout);
- }
- DBUG_RETURN(false);
- }
+ // Check transporter type
+ if (remote_transporter_type != -1 &&
+ remote_transporter_type != t->m_type)
+ {
+ g_eventLogger->error("Connection from node: %d uses different transporter "
+ "type: %d, expected type: %d",
+ nodeId, remote_transporter_type, t->m_type);
+ DBUG_RETURN(false);
}
- else if (t->m_type == tt_SHM_TRANSPORTER)
+
+ // Send reply to client
+ SocketOutputStream s_output(sockfd);
+ if (s_output.println("%d %d", t->getLocalNodeId(), t->m_type) < 0)
{
- g_eventLogger->warning("Unable to verify transporter compatability with node %d", nodeId);
+ DBUG_PRINT("error", ("Send of reply failed"));
+ DBUG_RETURN(false);
}
- // setup transporter (transporter responsible for closing sockfd)
+ // Setup transporter (transporter responsible for closing sockfd)
bool res = t->connect_server(sockfd);
if (res && performStates[nodeId] != TransporterRegistry::CONNECTING)
{
+ // Connection suceeded, but not connecting anymore, return
+ // false to close the connection
DBUG_RETURN(false);
}
@@ -1492,7 +1492,11 @@ TransporterRegistry::start_clients_threa
* First, we try to connect (if we have a port number).
*/
if (t->get_s_port())
+ {
+ DBUG_PRINT("info", ("connecting to node %d using port %d",
+ nodeId, t->get_s_port()));
connected= t->connect_client();
+ }
/**
* If dynamic, get the port for connecting from the management server
@@ -1501,11 +1505,18 @@ TransporterRegistry::start_clients_threa
int server_port= 0;
struct ndb_mgm_reply mgm_reply;
+ DBUG_PRINT("info", ("connection to node %d should use "
+ "dynamic port",
+ nodeId));
+
if(!ndb_mgm_is_connected(m_mgm_handle))
ndb_mgm_connect(m_mgm_handle, 0, 0, 0);
-
+
if(ndb_mgm_is_connected(m_mgm_handle))
{
+ DBUG_PRINT("info", ("asking mgmd which port to use for node %d",
+ nodeId));
+
int res=
ndb_mgm_get_connection_int_parameter(m_mgm_handle,
t->getRemoteNodeId(),
@@ -1518,6 +1529,8 @@ TransporterRegistry::start_clients_threa
t->getLocalNodeId(),res));
if( res >= 0 )
{
+ DBUG_PRINT("info", ("got port %d to use for connection to %d",
+ server_port, nodeId));
/**
* Server_port == 0 just means that that a mgmt server
* has not received a new port yet. Keep the old.
@@ -1527,11 +1540,15 @@ TransporterRegistry::start_clients_threa
}
else if(ndb_mgm_is_connected(m_mgm_handle))
{
- g_eventLogger->info("Failed to get dynamic port to connect to: %d", res);
+ DBUG_PRINT("info", ("Failed to get dynamic port, res: %d",
+ res));
+ g_eventLogger->info("Failed to get dynamic port, res: %d",
+ res);
ndb_mgm_disconnect(m_mgm_handle);
}
else
{
+ DBUG_PRINT("info", ("mgmd close connection early"));
g_eventLogger->info
("Management server closed connection early. "
"It is probably being shut down (or has problems). "
@@ -1762,6 +1779,7 @@ TransporterRegistry::get_transporter(Nod
return theTransporters[nodeId];
}
+
bool TransporterRegistry::connect_client(NdbMgmHandle *h)
{
DBUG_ENTER("TransporterRegistry::connect_client(NdbMgmHandle)");
@@ -1788,6 +1806,8 @@ bool TransporterRegistry::connect_client
DBUG_RETURN(res);
}
+
+
/**
* Given a connected NdbMgmHandle, turns it into a transporter
* and returns the socket.
@@ -1796,46 +1816,58 @@ NDB_SOCKET_TYPE TransporterRegistry::con
{
struct ndb_mgm_reply mgm_reply;
my_socket sockfd;
-
my_socket_invalidate(&sockfd);
+ DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(NdbMgmHandle)");
+
if ( h==NULL || *h == NULL )
{
g_eventLogger->error("%s: %d", __FILE__, __LINE__);
- return sockfd;
+ DBUG_RETURN(sockfd);
}
for(unsigned int i=0;i < m_transporter_interface.size();i++)
- if (m_transporter_interface[i].m_s_service_port < 0
- && ndb_mgm_set_connection_int_parameter(*h,
- localNodeId,
+ {
+ if (m_transporter_interface[i].m_s_service_port >= 0)
+ continue;
+
+ DBUG_PRINT("info", ("Setting dynamic port %d for connection from node %d",
+ m_transporter_interface[i].m_s_service_port,
+ m_transporter_interface[i].m_remote_nodeId));
+
+ if (ndb_mgm_set_connection_int_parameter(*h,
+ localNodeId,
m_transporter_interface[i].m_remote_nodeId,
CFG_CONNECTION_SERVER_PORT,
m_transporter_interface[i].m_s_service_port,
&mgm_reply) < 0)
{
+ DBUG_PRINT("error", ("Failed to set dynamic port"));
g_eventLogger->error("Error: %s: %d",
ndb_mgm_get_latest_error_desc(*h),
ndb_mgm_get_latest_error(*h));
g_eventLogger->error("%s: %d", __FILE__, __LINE__);
ndb_mgm_destroy_handle(h);
- return sockfd;
+ DBUG_RETURN(sockfd);
}
+ }
/**
* convert_to_transporter also disposes of the handle (i.e. we don't leak
* memory here.
*/
+ DBUG_PRINT("info", ("Converting handle to transporter"));
sockfd= ndb_mgm_convert_to_transporter(h);
if (!my_socket_valid(sockfd))
{
+ DBUG_PRINT("error", ("Failed to convert to transporter"));
g_eventLogger->error("Error: %s: %d",
ndb_mgm_get_latest_error_desc(*h),
ndb_mgm_get_latest_error(*h));
g_eventLogger->error("%s: %d", __FILE__, __LINE__);
ndb_mgm_destroy_handle(h);
}
- return sockfd;
+ DBUG_RETURN(sockfd);
}
/**
@@ -1848,9 +1880,11 @@ NDB_SOCKET_TYPE TransporterRegistry::con
my_socket s;
my_socket_invalidate(&s);
+ DBUG_ENTER("TransporterRegistry::connect_ndb_mgmd(SocketClient)");
+
if ( h == NULL )
{
- return s;
+ DBUG_RETURN(s);
}
/**
@@ -1864,11 +1898,12 @@ NDB_SOCKET_TYPE TransporterRegistry::con
if(ndb_mgm_connect(h, 0, 0, 0)<0)
{
+ DBUG_PRINT("info", ("connection to mgmd failed"));
ndb_mgm_destroy_handle(&h);
- return s;
+ DBUG_RETURN(s);
}
- return connect_ndb_mgmd(&h);
+ DBUG_RETURN(connect_ndb_mgmd(&h));
}
/**
=== modified file 'storage/ndb/src/common/util/SocketServer.cpp'
--- a/storage/ndb/src/common/util/SocketServer.cpp 2008-11-06 10:17:49 +0000
+++ b/storage/ndb/src/common/util/SocketServer.cpp 2009-01-08 15:32:09 +0000
@@ -323,7 +323,7 @@ SocketServer::checkSessionsImpl()
{
for(int i = m_sessions.size() - 1; i >= 0; i--)
{
- if(m_sessions[i].m_session->m_stopped &&
+ if(m_sessions[i].m_session->m_thread_stopped &&
(m_sessions[i].m_session->m_refCount == 0))
{
if(m_sessions[i].m_thread != 0)
@@ -371,21 +371,16 @@ void*
sessionThread_C(void* _sc){
SocketServer::Session * si = (SocketServer::Session *)_sc;
- /**
- * may have m_stopped set if we're transforming a mgm
- * connection into a transporter connection.
- */
- if(!si->m_stopped)
- {
- if(!si->m_stop){
- si->m_stopped = false;
- si->runSession();
- } else {
- NDB_CLOSE_SOCKET(si->m_socket);
- }
- }
-
- si->m_stopped = true;
+ assert(si->m_thread_stopped == false);
+
+ if(!si->m_stop)
+ si->runSession();
+ else
+ NDB_CLOSE_SOCKET(si->m_socket);
+
+ // Mark the thread as stopped to allow the
+ // session resources to be released
+ si->m_thread_stopped = true;
return 0;
}
=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2008-12-19 15:27:02 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2009-01-08 15:41:27 +0000
@@ -3575,22 +3575,27 @@ MgmtSrvr::getConnectionDbParameter(int n
}
-void MgmtSrvr::transporter_connect(NDB_SOCKET_TYPE sockfd)
+bool MgmtSrvr::transporter_connect(NDB_SOCKET_TYPE sockfd)
{
- if (theFacade->get_registry()->connect_server(sockfd))
- {
- /**
- * Force an update_connections() so that the
- * ClusterMgr and TransporterFacade is up to date
- * with the new connection.
- * Important for correct node id reservation handling
- */
- NdbMutex_Lock(theFacade->theMutexPtr);
- theFacade->get_registry()->update_connections();
- NdbMutex_Unlock(theFacade->theMutexPtr);
- }
+ DBUG_ENTER("MgmtSrvr::transporter_connect");
+ TransporterRegistry* tr= theFacade->get_registry();
+ if (!tr->connect_server(sockfd))
+ DBUG_RETURN(false);
+
+ /*
+ Force an update_connections() so that the
+ ClusterMgr and TransporterFacade is up to date
+ with the new connection.
+ Important for correct node id reservation handling
+ */
+ theFacade->lock_mutex();
+ tr->update_connections();
+ theFacade->unlock_mutex();
+
+ DBUG_RETURN(true);
}
+
bool MgmtSrvr::connect_to_self()
{
BaseString buf;
@@ -4057,7 +4062,6 @@ MgmtSrvr::show_variables(NdbOut& out)
}
-
template class MutexVector<NodeId>;
template class MutexVector<Ndb_mgmd_event_service::Event_listener>;
template class Vector<EventSubscribeReq>;
=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.hpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2008-12-17 15:40:11 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2009-01-08 15:41:27 +0000
@@ -395,7 +395,7 @@ public:
int getConnectionDbParameter(int node1, int node2, int param,
int *value, BaseString& msg);
- void transporter_connect(NDB_SOCKET_TYPE sockfd);
+ bool transporter_connect(NDB_SOCKET_TYPE sockfd);
const char *get_connect_address(Uint32 node_id);
void get_connected_nodes(NodeBitmask &connected_nodes) const;
=== modified file 'storage/ndb/src/mgmsrv/Services.cpp'
--- a/storage/ndb/src/mgmsrv/Services.cpp 2009-01-07 09:55:03 +0000
+++ b/storage/ndb/src/mgmsrv/Services.cpp 2009-01-08 15:41:27 +0000
@@ -321,7 +321,7 @@ extern int g_errorInsert;
#define SLEEP_ERROR_INSERTED(x) if(ERROR_INSERTED(x)){NdbSleep_SecSleep(10);}
MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock, Uint64 session_id)
- : SocketServer::Session(sock), m_mgmsrv(mgm)
+ : SocketServer::Session(sock), m_mgmsrv(mgm), m_name("unknown:0")
{
DBUG_ENTER("MgmApiSession::MgmApiSession");
m_input = new SocketInputStream(sock, 30000);
@@ -336,9 +336,10 @@ MgmApiSession::MgmApiSession(class MgmtS
struct sockaddr_in addr;
SOCKET_SIZE_TYPE addrlen= sizeof(addr);
- my_getpeername(sock, (struct sockaddr*)&addr, &addrlen);
- m_name.assfmt("%s:%d", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
+ if (my_getpeername(sock, (struct sockaddr*)&addr, &addrlen) == 0)
+ m_name.assfmt("%s:%d", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
DBUG_PRINT("info", ("new connection from: %s", m_name.c_str()));
+
DBUG_VOID_RETURN;
}
@@ -1702,11 +1703,27 @@ void
MgmApiSession::transporter_connect(Parser_t::Context &ctx,
Properties const &args)
{
- m_mgmsrv.transporter_connect(m_socket);
+ if (!m_mgmsrv.transporter_connect(m_socket))
+ {
+ // Connection not allowed or failed
+ g_eventLogger->warning("Failed to convert connection "
+ "from '%s' to transporter",
+ name());
+
+ // Close the socket to indicate failure to other side
+ }
+ else
+ {
+ /*
+ Conversion to transporter suceeded
+ Stop this session thread and release resources
+ but don't close the socket, it's been taken over
+ by the transporter
+ */
+ my_socket_invalidate(&m_socket); // so nobody closes it
+ }
- m_stop= true;
- m_stopped= true; // force a stop (no closing socket)
- my_socket_invalidate(&m_socket); // so nobody closes it
+ m_stop= true; // Stop the session
}
void
@@ -2086,6 +2103,7 @@ MgmApiSession::show_variables(Parser_t::
}
+
template class MutexVector<int>;
template class Vector<ParserRow<MgmApiSession> const*>;
template class Vector<NDB_SOCKET_TYPE>;
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp 2009-01-07 09:48:44 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp 2009-01-09 09:34:36 +0000
@@ -751,6 +751,51 @@ TransporterFacade::TransporterFacade(Glo
}
+/* Return true if node with "nodeId" is a MGM node */
+static bool is_mgmd(Uint32 nodeId,
+ const ndb_mgm_configuration * conf)
+{
+ ndb_mgm_configuration_iterator iter(*conf, CFG_SECTION_NODE);
+ if (iter.find(CFG_NODE_ID, nodeId))
+ abort();
+ Uint32 type;
+ if(iter.get(CFG_TYPE_OF_SECTION, &type))
+ abort();
+
+ return (type == NODE_TYPE_MGM);
+}
+
+
+bool
+TransporterFacade::do_connect_mgm(NodeId nodeId,
+ const ndb_mgm_configuration* conf)
+{
+ // Allow other MGM nodes to connect
+ DBUG_ENTER("TransporterFacade::do_connect_mgm");
+ ndb_mgm_configuration_iterator iter(*conf, CFG_SECTION_CONNECTION);
+ for(iter.first(); iter.valid(); iter.next())
+ {
+ Uint32 nodeId1, nodeId2, remoteNodeId;
+ if (iter.get(CFG_CONNECTION_NODE_1, &nodeId1) ||
+ iter.get(CFG_CONNECTION_NODE_2, &nodeId2))
+ DBUG_RETURN(false);
+
+ // Skip connections where this node is not involved
+ if (nodeId1 != nodeId && nodeId2 != nodeId)
+ continue;
+
+ // If both sides are MGM, open connection
+ if(is_mgmd(nodeId1, conf) && is_mgmd(nodeId2, conf))
+ {
+ Uint32 remoteNodeId = (nodeId == nodeId1 ? nodeId2 : nodeId1);
+ DBUG_PRINT("info", ("opening connection to node %d", remoteNodeId));
+ doConnect(remoteNodeId);
+ }
+ }
+ DBUG_RETURN(true);
+}
+
+
bool
TransporterFacade::configure(NodeId nodeId,
const ndb_mgm_configuration* conf)
@@ -767,6 +812,10 @@ TransporterFacade::configure(NodeId node
* theTransporterRegistry))
DBUG_RETURN(false);
+ // Open connection between MGM servers
+ if (!do_connect_mgm(nodeId, conf))
+ DBUG_RETURN(false);
+
// Configure cluster manager
theClusterMgr->configure(conf);
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp 2008-11-12 08:17:14 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp 2009-01-09 09:34:36 +0000
@@ -256,6 +256,8 @@ private:
friend void* runReceiveResponse_C(void*);
friend void atexit_stop_instance();
+ bool do_connect_mgm(NodeId, const ndb_mgm_configuration*);
+
/**
* Block number handling
*/
=== modified file 'storage/ndb/test/include/NdbMgmd.hpp'
--- a/storage/ndb/test/include/NdbMgmd.hpp 2008-12-17 16:09:24 +0000
+++ b/storage/ndb/test/include/NdbMgmd.hpp 2009-01-08 15:32:09 +0000
@@ -70,6 +70,10 @@ public:
return m_handle;
}
+ NDB_SOCKET_TYPE socket(void) const {
+ return _ndb_mgm_get_socket(m_handle);
+ }
+
const char* getConnectString() const {
return m_connect_str.c_str();
}
@@ -125,6 +129,18 @@ public:
return true;
}
+ bool disconnect(void) {
+ if (ndb_mgm_disconnect(m_handle) != 0){
+ error("disconnect: ndb_mgm_disconnect failed");
+ return false;
+ }
+
+ ndb_mgm_destroy_handle(&m_handle);
+ m_handle = NULL;
+
+ return true;
+ }
+
bool restart(bool abort = false) {
if (!is_connected()){
error("restart: not connected");
@@ -159,7 +175,7 @@ public:
return false;
}
- SocketOutputStream out(_ndb_mgm_get_socket(m_handle));
+ SocketOutputStream out(socket());
if (out.println(cmd)){
error("call: println failed at line %d", __LINE__);
@@ -218,7 +234,7 @@ public:
}
BaseString buf;
- SocketInputStream2 in(_ndb_mgm_get_socket(m_handle));
+ SocketInputStream2 in(socket());
if (cmd_reply)
{
// Read the reply header and compare against "cmd_reply"
=== modified file 'storage/ndb/test/ndbapi/testMgm.cpp'
--- a/storage/ndb/test/ndbapi/testMgm.cpp 2008-12-17 16:18:23 +0000
+++ b/storage/ndb/test/ndbapi/testMgm.cpp 2009-01-08 15:32:09 +0000
@@ -1291,6 +1291,92 @@ int runTestGetVersionUntilStopped(NDBT_C
return result;
}
+
+
+static bool
+check_connection(NdbMgmd& mgmd)
+{
+ Properties args, reply;
+ mgmd.verbose(false); // Verbose off
+ bool result= mgmd.call("check connection", args,
+ "check connection reply", reply);
+ mgmd.verbose(); // Verbose on
+ return result;
+}
+
+
+static bool
+check_transporter_connect(NdbMgmd& mgmd, const char * hello)
+{
+ SocketOutputStream out(mgmd.socket());
+
+ // Call 'transporter connect'
+ if (out.println("transporter connect") ||
+ out.println(""))
+ {
+ g_err << "Send failed" << endl;
+ return false;
+ }
+
+ // Send the 'hello'
+ g_info << "Client hello: '" << hello << "'" << endl;
+ if (out.println(hello))
+ {
+ g_err << "Send hello '" << hello << "' failed" << endl;
+ return false;
+ }
+
+ // Should not be possible to read a reply now, socket
+ // should have been closed
+ if (check_connection(mgmd)){
+ g_err << "not disconnected" << endl;
+ return false;
+ }
+
+ // disconnect and connect again
+ if (!mgmd.disconnect())
+ return false;
+ if (!mgmd.connect())
+ return false;
+
+ return true;
+}
+
+
+int runTestTransporterConnect(NDBT_Context* ctx, NDBT_Step* step)
+{
+ NdbMgmd mgmd;
+
+ if (!mgmd.connect())
+ return NDBT_FAILED;
+
+ int result = NDBT_FAILED;
+ if (
+ // Junk hello strings
+ check_transporter_connect(mgmd, "hello") &&
+ check_transporter_connect(mgmd, "hello again") &&
+
+ // "Blow" the buffer
+ check_transporter_connect(mgmd, "string_longer_than_buf_1234567890") &&
+
+ // Out of range nodeid
+ check_transporter_connect(mgmd, "-1") &&
+ check_transporter_connect(mgmd, "-2 2") &&
+ check_transporter_connect(mgmd, "10000") &&
+ check_transporter_connect(mgmd, "99999 8") &&
+
+ // Valid nodeid, invalid transporter type
+ // Valid nodeid and transporter type, state != CONNECTING
+ // ^These are only possible to test by finding an existing
+ // NDB node that are not started and use its setting(s)
+
+ true)
+ result = NDBT_OK;
+
+ return result;
+}
+
+
static bool
show_config(NdbMgmd& mgmd,
const Properties& args,
@@ -1931,6 +2017,10 @@ TESTCASE("TestGetVersion",
"Test 'get version'"){
INITIALIZER(runTestGetVersion);
}
+TESTCASE("TestTransporterConnect",
+ "Test 'transporter connect'"){
+ INITIALIZER(runTestTransporterConnect);
+}
#ifdef NOT_YET
TESTCASE("TestRestartMgmd",
"Test restart of ndb_mgmd(s)"){
| Thread |
|---|
| • bzr commit into mysql-5.1 branch (msvensson:3200) | Magnus Svensson | 9 Jan |