#At file:///home/msvensson/mysql/6.3/
2645 Magnus Svensson 2008-08-05 [merge]
Merge bug#38563
modified:
storage/ndb/include/util/OutputStream.hpp
storage/ndb/include/util/SocketServer.hpp
storage/ndb/src/common/util/OutputStream.cpp
storage/ndb/src/common/util/SocketServer.cpp
storage/ndb/src/mgmsrv/Services.cpp
=== modified file 'storage/ndb/include/util/OutputStream.hpp'
--- a/storage/ndb/include/util/OutputStream.hpp 2007-03-26 21:32:12 +0000
+++ b/storage/ndb/include/util/OutputStream.hpp 2008-08-05 11:53:52 +0000
@@ -45,6 +45,7 @@ public:
};
class SocketOutputStream : public OutputStream {
+protected:
NDB_SOCKET_TYPE m_socket;
unsigned m_timeout_ms;
bool m_timedout;
@@ -59,6 +60,21 @@ public:
int println(const char * fmt, ...);
};
+
+class BufferedSockOutputStream : public SocketOutputStream {
+ class UtilBuffer& m_buffer;
+public:
+ BufferedSockOutputStream(NDB_SOCKET_TYPE socket,
+ unsigned write_timeout_ms = 1000);
+ virtual ~BufferedSockOutputStream();
+
+ int print(const char * fmt, ...);
+ int println(const char * fmt, ...);
+
+ void flush();
+};
+
+
class NullOutputStream : public OutputStream {
public:
NullOutputStream() {}
=== modified file 'storage/ndb/include/util/SocketServer.hpp'
--- a/storage/ndb/include/util/SocketServer.hpp 2007-01-27 01:46:45 +0000
+++ b/storage/ndb/include/util/SocketServer.hpp 2008-08-05 11:53:52 +0000
@@ -40,11 +40,14 @@ public:
protected:
friend class SocketServer;
friend void* sessionThread_C(void*);
- Session(NDB_SOCKET_TYPE sock): m_socket(sock)
+ Session(NDB_SOCKET_TYPE sock) :
+ m_stop(false),
+ m_stopped(false),
+ m_socket(sock),
+ m_refCount(0)
{
DBUG_ENTER("SocketServer::Session");
DBUG_PRINT("enter",("NDB_SOCKET: %d", m_socket));
- m_stop = m_stopped = false;
DBUG_VOID_RETURN;
}
@@ -52,6 +55,7 @@ public:
bool m_stopped; // Has the session stopped?
NDB_SOCKET_TYPE m_socket;
+ unsigned m_refCount;
};
/**
=== modified file 'storage/ndb/src/common/util/OutputStream.cpp'
--- a/storage/ndb/src/common/util/OutputStream.cpp 2008-02-22 15:14:27 +0000
+++ b/storage/ndb/src/common/util/OutputStream.cpp 2008-08-05 11:53:52 +0000
@@ -42,10 +42,12 @@ FileOutputStream::println(const char * f
}
SocketOutputStream::SocketOutputStream(NDB_SOCKET_TYPE socket,
- unsigned write_timeout_ms){
- m_socket = socket;
- m_timeout_remain= m_timeout_ms = write_timeout_ms;
- m_timedout= false;
+ unsigned write_timeout_ms) :
+ m_socket(socket),
+ m_timeout_ms(write_timeout_ms),
+ m_timedout(false),
+ m_timeout_remain(write_timeout_ms)
+{
}
int
@@ -92,3 +94,82 @@ SocketOutputStream::println(const char *
return ret;
}
+
+#include <UtilBuffer.hpp>
+#include <BaseString.hpp>
+
+BufferedSockOutputStream::BufferedSockOutputStream(NDB_SOCKET_TYPE socket,
+ unsigned write_timeout_ms) :
+ SocketOutputStream(socket, write_timeout_ms),
+ m_buffer(*new UtilBuffer)
+{
+}
+
+BufferedSockOutputStream::~BufferedSockOutputStream()
+{
+ delete &m_buffer;
+}
+
+int
+BufferedSockOutputStream::print(const char * fmt, ...){
+ char buf[1];
+ va_list ap;
+ int len;
+ char* pos;
+
+ // Find out length of string
+ va_start(ap, fmt);
+ len = BaseString::vsnprintf(buf, sizeof(buf), fmt, ap);
+ va_end(ap);
+
+ // Grow buffer so it can hold the string
+ if ((pos= (char*)m_buffer.append(len+1)) == 0)
+ return -1;
+
+ // Print string to buffer
+ va_start(ap, fmt);
+ len = BaseString::vsnprintf((char*)pos, len+1, fmt, ap);
+ va_end(ap);
+
+ return 0;
+}
+
+int
+BufferedSockOutputStream::println(const char * fmt, ...){
+ char buf[1];
+ va_list ap;
+ int len;
+ char* pos;
+
+ // Find out length of string
+ va_start(ap, fmt);
+ len = BaseString::vsnprintf(buf, sizeof(buf), fmt, ap);
+ va_end(ap);
+
+ // Grow buffer so it can hold the string and the new line
+ if ((pos= (char*)m_buffer.append(len+1)) == 0)
+ return -1;
+
+ // Print string to buffer
+ va_start(ap, fmt);
+ len = BaseString::vsnprintf((char*)pos, len+1, fmt, ap);
+ va_end(ap);
+
+ // Add newline
+ pos+= len;
+ *pos= '\n';
+
+ return 0;
+}
+
+void BufferedSockOutputStream::flush(){
+ int elapsed;
+ if (write_socket(m_socket, m_timeout_ms, &elapsed,
+ (const char*)m_buffer.get_data(), m_buffer.length()) != 0)
+ {
+ fprintf(stderr, "Failed to flush buffer to socket, errno: %d\n", errno);
+ }
+
+ m_buffer.clear();
+}
+
=== modified file 'storage/ndb/src/common/util/SocketServer.cpp'
--- a/storage/ndb/src/common/util/SocketServer.cpp 2008-04-08 13:40:43 +0000
+++ b/storage/ndb/src/common/util/SocketServer.cpp 2008-08-05 11:59:01 +0000
@@ -28,17 +28,19 @@
SocketServer::SocketServer(unsigned maxSessions) :
m_sessions(10),
- m_services(5)
+ m_services(5),
+ m_maxSessions(maxSessions),
+ m_stopThread(false),
+ m_thread(0)
{
- m_thread = 0;
- m_stopThread = false;
- m_maxSessions = maxSessions;
}
SocketServer::~SocketServer() {
unsigned i;
for(i = 0; i<m_sessions.size(); i++){
- delete m_sessions[i].m_session;
+ Session* session= m_sessions[i].m_session;
+ assert(session->m_refCount == 0);
+ delete session;
}
for(i = 0; i<m_services.size(); i++){
if(m_services[i].m_socket)
@@ -275,13 +277,34 @@ SocketServer::startSession(SessionInstan
}
void
-SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data)
+SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *),
+ void *data)
{
+ // Build a list of pointers to all active sessions
+ // and increase refcount on the sessions
m_session_mutex.lock();
- for(int i = m_sessions.size() - 1; i >= 0; i--){
- (*func)(m_sessions[i].m_session, data);
+ Vector<Session*> session_pointers(m_sessions.size());
+ for(unsigned i= 0; i < m_sessions.size(); i++){
+ Session* session= m_sessions[i].m_session;
+ session_pointers.push_back(session);
+ session->m_refCount++;
}
m_session_mutex.unlock();
+
+ // Call the function on each session
+ for(unsigned i= 0; i < session_pointers.size(); i++){
+ (*func)(session_pointers[i], data);
+ }
+
+ // Release the sessions pointers and any stopped sessions
+ m_session_mutex.lock();
+ for(unsigned i= 0; i < session_pointers.size(); i++){
+ Session* session= session_pointers[i];
+ assert(session->m_refCount > 0);
+ session->m_refCount--;
+ }
+ checkSessionsImpl();
+ m_session_mutex.unlock();
}
void
@@ -297,14 +320,15 @@ SocketServer::checkSessionsImpl()
{
for(int i = m_sessions.size() - 1; i >= 0; i--)
{
- if(m_sessions[i].m_session->m_stopped)
+ if(m_sessions[i].m_session->m_stopped and
+ m_sessions[i].m_session->m_refCount == 0)
{
if(m_sessions[i].m_thread != 0)
{
void* ret;
NdbThread_WaitFor(m_sessions[i].m_thread, &ret);
NdbThread_Destroy(&m_sessions[i].m_thread);
- }
+ }
m_sessions[i].m_session->stopSession();
delete m_sessions[i].m_session;
m_sessions.erase(i);
@@ -319,7 +343,6 @@ SocketServer::stopSessions(bool wait){
for(i = m_sessions.size() - 1; i>=0; i--)
{
m_sessions[i].m_session->stopSession();
- m_sessions[i].m_session->m_stop = true; // to make sure
}
m_session_mutex.unlock();
@@ -365,3 +388,4 @@ sessionThread_C(void* _sc){
template class MutexVector<SocketServer::ServiceInstance>;
template class Vector<SocketServer::SessionInstance>;
+template class Vector<SocketServer::Session*>;
=== modified file 'storage/ndb/src/mgmsrv/Services.cpp'
--- a/storage/ndb/src/mgmsrv/Services.cpp 2008-04-22 20:09:38 +0000
+++ b/storage/ndb/src/mgmsrv/Services.cpp 2008-08-05 11:59:01 +0000
@@ -299,7 +299,7 @@ MgmApiSession::MgmApiSession(class MgmtS
{
DBUG_ENTER("MgmApiSession::MgmApiSession");
m_input = new SocketInputStream(sock, 30000);
- m_output = new SocketOutputStream(sock, 30000);
+ m_output = new BufferedSockOutputStream(sock, 30000);
m_parser = new Parser_t(commands, *m_input, true, true, true);
m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv);
m_stopSelf= 0;
@@ -366,6 +366,10 @@ MgmApiSession::runSession()
stop= m_stop;
NdbMutex_Unlock(m_mutex);
+
+ // Send output from command to the client
+ m_output->flush();
+
};
NdbMutex_Lock(m_mutex);
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-6.3 branch (msvensson:2645) Bug#38563 | Magnus Svensson | 5 Aug |