From: Date: August 5 2008 1:53pm Subject: bzr commit into mysql-5.1-telco-6.2 branch (msvensson:2637) Bug#38563 List-Archive: http://lists.mysql.com/commits/50918 X-Bug: 38563 Message-Id: <20080805115356.C5E9A30E85A@pilot> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit #At file:///home/msvensson/mysql/6.2/ 2637 Magnus Svensson 2008-08-05 Bug#38563 ndb_mgmd performs network I/O with client(s) while holding mutex modified: storage/ndb/include/util/OutputStream.hpp storage/ndb/include/util/SocketServer.hpp storage/ndb/src/common/util/OutputStream.cpp storage/ndb/src/common/util/SocketServer.cpp storage/ndb/src/mgmsrv/Services.cpp per-file messages: storage/ndb/include/util/OutputStream.hpp Add BufferedSockOutputStream wich will buffer everything written to it until 'flush' is called storage/ndb/include/util/SocketServer.hpp Add m_refcount used to keep track if anyone has copied a session pointer storage/ndb/src/common/util/OutputStream.cpp Add BufferedSockOutputStream wich will buffer everything written to it until 'flush' is called storage/ndb/src/common/util/SocketServer.cpp Change 'foreachSession' to work on a copy of the session list. That way the m_sessionmutex can be released while querying each session about it's status, thus allowing new sessions to be accepted meanwhile. storage/ndb/src/mgmsrv/Services.cpp Use BufferedSockOutputStream when writing to clients, this way the whole result can be formatted and m_mutex can be released before performing network I/O with the client === modified file 'storage/ndb/include/util/OutputStream.hpp' --- a/storage/ndb/include/util/OutputStream.hpp 2007-03-26 21:32:12 +0000 +++ b/storage/ndb/include/util/OutputStream.hpp 2008-08-05 11:53:52 +0000 @@ -45,6 +45,7 @@ public: }; class SocketOutputStream : public OutputStream { +protected: NDB_SOCKET_TYPE m_socket; unsigned m_timeout_ms; bool m_timedout; @@ -59,6 +60,21 @@ public: int println(const char * fmt, ...); }; + +class BufferedSockOutputStream : public SocketOutputStream { + class UtilBuffer& m_buffer; +public: + BufferedSockOutputStream(NDB_SOCKET_TYPE socket, + unsigned write_timeout_ms = 1000); + virtual ~BufferedSockOutputStream(); + + int print(const char * fmt, ...); + int println(const char * fmt, ...); + + void flush(); +}; + + class NullOutputStream : public OutputStream { public: NullOutputStream() {} === modified file 'storage/ndb/include/util/SocketServer.hpp' --- a/storage/ndb/include/util/SocketServer.hpp 2007-01-27 01:46:45 +0000 +++ b/storage/ndb/include/util/SocketServer.hpp 2008-08-05 11:53:52 +0000 @@ -40,11 +40,14 @@ public: protected: friend class SocketServer; friend void* sessionThread_C(void*); - Session(NDB_SOCKET_TYPE sock): m_socket(sock) + Session(NDB_SOCKET_TYPE sock) : + m_stop(false), + m_stopped(false), + m_socket(sock), + m_refCount(0) { DBUG_ENTER("SocketServer::Session"); DBUG_PRINT("enter",("NDB_SOCKET: %d", m_socket)); - m_stop = m_stopped = false; DBUG_VOID_RETURN; } @@ -52,6 +55,7 @@ public: bool m_stopped; // Has the session stopped? NDB_SOCKET_TYPE m_socket; + unsigned m_refCount; }; /** === modified file 'storage/ndb/src/common/util/OutputStream.cpp' --- a/storage/ndb/src/common/util/OutputStream.cpp 2008-02-22 15:14:27 +0000 +++ b/storage/ndb/src/common/util/OutputStream.cpp 2008-08-05 11:53:52 +0000 @@ -42,10 +42,12 @@ FileOutputStream::println(const char * f } SocketOutputStream::SocketOutputStream(NDB_SOCKET_TYPE socket, - unsigned write_timeout_ms){ - m_socket = socket; - m_timeout_remain= m_timeout_ms = write_timeout_ms; - m_timedout= false; + unsigned write_timeout_ms) : + m_socket(socket), + m_timeout_ms(write_timeout_ms), + m_timedout(false), + m_timeout_remain(write_timeout_ms) +{ } int @@ -92,3 +94,82 @@ SocketOutputStream::println(const char * return ret; } + +#include +#include + +BufferedSockOutputStream::BufferedSockOutputStream(NDB_SOCKET_TYPE socket, + unsigned write_timeout_ms) : + SocketOutputStream(socket, write_timeout_ms), + m_buffer(*new UtilBuffer) +{ +} + +BufferedSockOutputStream::~BufferedSockOutputStream() +{ + delete &m_buffer; +} + +int +BufferedSockOutputStream::print(const char * fmt, ...){ + char buf[1]; + va_list ap; + int len; + char* pos; + + // Find out length of string + va_start(ap, fmt); + len = BaseString::vsnprintf(buf, sizeof(buf), fmt, ap); + va_end(ap); + + // Grow buffer so it can hold the string + if ((pos= (char*)m_buffer.append(len+1)) == 0) + return -1; + + // Print string to buffer + va_start(ap, fmt); + len = BaseString::vsnprintf((char*)pos, len+1, fmt, ap); + va_end(ap); + + return 0; +} + +int +BufferedSockOutputStream::println(const char * fmt, ...){ + char buf[1]; + va_list ap; + int len; + char* pos; + + // Find out length of string + va_start(ap, fmt); + len = BaseString::vsnprintf(buf, sizeof(buf), fmt, ap); + va_end(ap); + + // Grow buffer so it can hold the string and the new line + if ((pos= (char*)m_buffer.append(len+1)) == 0) + return -1; + + // Print string to buffer + va_start(ap, fmt); + len = BaseString::vsnprintf((char*)pos, len+1, fmt, ap); + va_end(ap); + + // Add newline + pos+= len; + *pos= '\n'; + + return 0; +} + +void BufferedSockOutputStream::flush(){ + int elapsed; + if (write_socket(m_socket, m_timeout_ms, &elapsed, + (const char*)m_buffer.get_data(), m_buffer.length()) != 0) + { + fprintf(stderr, "Failed to flush buffer to socket, errno: %d\n", errno); + } + + m_buffer.clear(); +} + === modified file 'storage/ndb/src/common/util/SocketServer.cpp' --- a/storage/ndb/src/common/util/SocketServer.cpp 2008-02-26 17:38:43 +0000 +++ b/storage/ndb/src/common/util/SocketServer.cpp 2008-08-05 11:53:52 +0000 @@ -28,17 +28,19 @@ SocketServer::SocketServer(unsigned maxSessions) : m_sessions(10), - m_services(5) + m_services(5), + m_maxSessions(maxSessions), + m_stopThread(false), + m_thread(0) { - m_thread = 0; - m_stopThread = false; - m_maxSessions = maxSessions; } SocketServer::~SocketServer() { unsigned i; for(i = 0; im_refCount == 0); + delete session; } for(i = 0; i= 0; i--){ - (*func)(m_sessions[i].m_session, data); + Vector session_pointers(m_sessions.size()); + for(unsigned i= 0; i < m_sessions.size(); i++){ + Session* session= m_sessions[i].m_session; + session_pointers.push_back(session); + session->m_refCount++; } m_session_mutex.unlock(); + + // Call the function on each session + for(unsigned i= 0; i < session_pointers.size(); i++){ + (*func)(session_pointers[i], data); + } + + // Release the sessions pointers and any stopped sessions + m_session_mutex.lock(); + for(unsigned i= 0; i < session_pointers.size(); i++){ + Session* session= session_pointers[i]; + assert(session->m_refCount > 0); + session->m_refCount--; + } + checkSessionsImpl(); + m_session_mutex.unlock(); } void @@ -286,14 +309,15 @@ SocketServer::checkSessionsImpl() { for(int i = m_sessions.size() - 1; i >= 0; i--) { - if(m_sessions[i].m_session->m_stopped) + if(m_sessions[i].m_session->m_stopped and + m_sessions[i].m_session->m_refCount == 0) { if(m_sessions[i].m_thread != 0) { void* ret; NdbThread_WaitFor(m_sessions[i].m_thread, &ret); NdbThread_Destroy(&m_sessions[i].m_thread); - } + } m_sessions[i].m_session->stopSession(); delete m_sessions[i].m_session; m_sessions.erase(i); @@ -308,7 +332,6 @@ SocketServer::stopSessions(bool wait){ for(i = m_sessions.size() - 1; i>=0; i--) { m_sessions[i].m_session->stopSession(); - m_sessions[i].m_session->m_stop = true; // to make sure } m_session_mutex.unlock(); @@ -354,3 +377,4 @@ sessionThread_C(void* _sc){ template class MutexVector; template class Vector; +template class Vector; === modified file 'storage/ndb/src/mgmsrv/Services.cpp' --- a/storage/ndb/src/mgmsrv/Services.cpp 2008-04-22 19:36:05 +0000 +++ b/storage/ndb/src/mgmsrv/Services.cpp 2008-08-05 11:53:52 +0000 @@ -298,7 +298,7 @@ MgmApiSession::MgmApiSession(class MgmtS { DBUG_ENTER("MgmApiSession::MgmApiSession"); m_input = new SocketInputStream(sock, 30000); - m_output = new SocketOutputStream(sock, 30000); + m_output = new BufferedSockOutputStream(sock, 30000); m_parser = new Parser_t(commands, *m_input, true, true, true); m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv); m_stopSelf= 0; @@ -365,6 +365,10 @@ MgmApiSession::runSession() stop= m_stop; NdbMutex_Unlock(m_mutex); + + // Send output from command to the client + m_output->flush(); + }; NdbMutex_Lock(m_mutex);