From: Date: August 5 2008 1:59pm Subject: bzr commit into mysql-5.1-telco-6.3 branch (msvensson:2645) Bug#38563 List-Archive: http://lists.mysql.com/commits/50919 X-Bug: 38563 Message-Id: <20080805115910.C82A430E85A@pilot> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit #At file:///home/msvensson/mysql/6.3/ 2645 Magnus Svensson 2008-08-05 [merge] Merge bug#38563 modified: storage/ndb/include/util/OutputStream.hpp storage/ndb/include/util/SocketServer.hpp storage/ndb/src/common/util/OutputStream.cpp storage/ndb/src/common/util/SocketServer.cpp storage/ndb/src/mgmsrv/Services.cpp === modified file 'storage/ndb/include/util/OutputStream.hpp' --- a/storage/ndb/include/util/OutputStream.hpp 2007-03-26 21:32:12 +0000 +++ b/storage/ndb/include/util/OutputStream.hpp 2008-08-05 11:53:52 +0000 @@ -45,6 +45,7 @@ public: }; class SocketOutputStream : public OutputStream { +protected: NDB_SOCKET_TYPE m_socket; unsigned m_timeout_ms; bool m_timedout; @@ -59,6 +60,21 @@ public: int println(const char * fmt, ...); }; + +class BufferedSockOutputStream : public SocketOutputStream { + class UtilBuffer& m_buffer; +public: + BufferedSockOutputStream(NDB_SOCKET_TYPE socket, + unsigned write_timeout_ms = 1000); + virtual ~BufferedSockOutputStream(); + + int print(const char * fmt, ...); + int println(const char * fmt, ...); + + void flush(); +}; + + class NullOutputStream : public OutputStream { public: NullOutputStream() {} === modified file 'storage/ndb/include/util/SocketServer.hpp' --- a/storage/ndb/include/util/SocketServer.hpp 2007-01-27 01:46:45 +0000 +++ b/storage/ndb/include/util/SocketServer.hpp 2008-08-05 11:53:52 +0000 @@ -40,11 +40,14 @@ public: protected: friend class SocketServer; friend void* sessionThread_C(void*); - Session(NDB_SOCKET_TYPE sock): m_socket(sock) + Session(NDB_SOCKET_TYPE sock) : + m_stop(false), + m_stopped(false), + m_socket(sock), + m_refCount(0) { DBUG_ENTER("SocketServer::Session"); DBUG_PRINT("enter",("NDB_SOCKET: %d", m_socket)); - m_stop = m_stopped = false; DBUG_VOID_RETURN; } @@ -52,6 +55,7 @@ public: bool m_stopped; // Has the session stopped? NDB_SOCKET_TYPE m_socket; + unsigned m_refCount; }; /** === modified file 'storage/ndb/src/common/util/OutputStream.cpp' --- a/storage/ndb/src/common/util/OutputStream.cpp 2008-02-22 15:14:27 +0000 +++ b/storage/ndb/src/common/util/OutputStream.cpp 2008-08-05 11:53:52 +0000 @@ -42,10 +42,12 @@ FileOutputStream::println(const char * f } SocketOutputStream::SocketOutputStream(NDB_SOCKET_TYPE socket, - unsigned write_timeout_ms){ - m_socket = socket; - m_timeout_remain= m_timeout_ms = write_timeout_ms; - m_timedout= false; + unsigned write_timeout_ms) : + m_socket(socket), + m_timeout_ms(write_timeout_ms), + m_timedout(false), + m_timeout_remain(write_timeout_ms) +{ } int @@ -92,3 +94,82 @@ SocketOutputStream::println(const char * return ret; } + +#include +#include + +BufferedSockOutputStream::BufferedSockOutputStream(NDB_SOCKET_TYPE socket, + unsigned write_timeout_ms) : + SocketOutputStream(socket, write_timeout_ms), + m_buffer(*new UtilBuffer) +{ +} + +BufferedSockOutputStream::~BufferedSockOutputStream() +{ + delete &m_buffer; +} + +int +BufferedSockOutputStream::print(const char * fmt, ...){ + char buf[1]; + va_list ap; + int len; + char* pos; + + // Find out length of string + va_start(ap, fmt); + len = BaseString::vsnprintf(buf, sizeof(buf), fmt, ap); + va_end(ap); + + // Grow buffer so it can hold the string + if ((pos= (char*)m_buffer.append(len+1)) == 0) + return -1; + + // Print string to buffer + va_start(ap, fmt); + len = BaseString::vsnprintf((char*)pos, len+1, fmt, ap); + va_end(ap); + + return 0; +} + +int +BufferedSockOutputStream::println(const char * fmt, ...){ + char buf[1]; + va_list ap; + int len; + char* pos; + + // Find out length of string + va_start(ap, fmt); + len = BaseString::vsnprintf(buf, sizeof(buf), fmt, ap); + va_end(ap); + + // Grow buffer so it can hold the string and the new line + if ((pos= (char*)m_buffer.append(len+1)) == 0) + return -1; + + // Print string to buffer + va_start(ap, fmt); + len = BaseString::vsnprintf((char*)pos, len+1, fmt, ap); + va_end(ap); + + // Add newline + pos+= len; + *pos= '\n'; + + return 0; +} + +void BufferedSockOutputStream::flush(){ + int elapsed; + if (write_socket(m_socket, m_timeout_ms, &elapsed, + (const char*)m_buffer.get_data(), m_buffer.length()) != 0) + { + fprintf(stderr, "Failed to flush buffer to socket, errno: %d\n", errno); + } + + m_buffer.clear(); +} + === modified file 'storage/ndb/src/common/util/SocketServer.cpp' --- a/storage/ndb/src/common/util/SocketServer.cpp 2008-04-08 13:40:43 +0000 +++ b/storage/ndb/src/common/util/SocketServer.cpp 2008-08-05 11:59:01 +0000 @@ -28,17 +28,19 @@ SocketServer::SocketServer(unsigned maxSessions) : m_sessions(10), - m_services(5) + m_services(5), + m_maxSessions(maxSessions), + m_stopThread(false), + m_thread(0) { - m_thread = 0; - m_stopThread = false; - m_maxSessions = maxSessions; } SocketServer::~SocketServer() { unsigned i; for(i = 0; im_refCount == 0); + delete session; } for(i = 0; i= 0; i--){ - (*func)(m_sessions[i].m_session, data); + Vector session_pointers(m_sessions.size()); + for(unsigned i= 0; i < m_sessions.size(); i++){ + Session* session= m_sessions[i].m_session; + session_pointers.push_back(session); + session->m_refCount++; } m_session_mutex.unlock(); + + // Call the function on each session + for(unsigned i= 0; i < session_pointers.size(); i++){ + (*func)(session_pointers[i], data); + } + + // Release the sessions pointers and any stopped sessions + m_session_mutex.lock(); + for(unsigned i= 0; i < session_pointers.size(); i++){ + Session* session= session_pointers[i]; + assert(session->m_refCount > 0); + session->m_refCount--; + } + checkSessionsImpl(); + m_session_mutex.unlock(); } void @@ -297,14 +320,15 @@ SocketServer::checkSessionsImpl() { for(int i = m_sessions.size() - 1; i >= 0; i--) { - if(m_sessions[i].m_session->m_stopped) + if(m_sessions[i].m_session->m_stopped and + m_sessions[i].m_session->m_refCount == 0) { if(m_sessions[i].m_thread != 0) { void* ret; NdbThread_WaitFor(m_sessions[i].m_thread, &ret); NdbThread_Destroy(&m_sessions[i].m_thread); - } + } m_sessions[i].m_session->stopSession(); delete m_sessions[i].m_session; m_sessions.erase(i); @@ -319,7 +343,6 @@ SocketServer::stopSessions(bool wait){ for(i = m_sessions.size() - 1; i>=0; i--) { m_sessions[i].m_session->stopSession(); - m_sessions[i].m_session->m_stop = true; // to make sure } m_session_mutex.unlock(); @@ -365,3 +388,4 @@ sessionThread_C(void* _sc){ template class MutexVector; template class Vector; +template class Vector; === modified file 'storage/ndb/src/mgmsrv/Services.cpp' --- a/storage/ndb/src/mgmsrv/Services.cpp 2008-04-22 20:09:38 +0000 +++ b/storage/ndb/src/mgmsrv/Services.cpp 2008-08-05 11:59:01 +0000 @@ -299,7 +299,7 @@ MgmApiSession::MgmApiSession(class MgmtS { DBUG_ENTER("MgmApiSession::MgmApiSession"); m_input = new SocketInputStream(sock, 30000); - m_output = new SocketOutputStream(sock, 30000); + m_output = new BufferedSockOutputStream(sock, 30000); m_parser = new Parser_t(commands, *m_input, true, true, true); m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv); m_stopSelf= 0; @@ -366,6 +366,10 @@ MgmApiSession::runSession() stop= m_stop; NdbMutex_Unlock(m_mutex); + + // Send output from command to the client + m_output->flush(); + }; NdbMutex_Lock(m_mutex);