List:Commits« Previous MessageNext Message »
From:Magnus Svensson Date:August 5 2008 1:53pm
Subject:bzr commit into mysql-5.1-telco-6.2 branch (msvensson:2637) Bug#38563
View as plain text  
#At file:///home/msvensson/mysql/6.2/

 2637 Magnus Svensson	2008-08-05
      Bug#38563 ndb_mgmd performs network I/O with client(s) while holding mutex
modified:
  storage/ndb/include/util/OutputStream.hpp
  storage/ndb/include/util/SocketServer.hpp
  storage/ndb/src/common/util/OutputStream.cpp
  storage/ndb/src/common/util/SocketServer.cpp
  storage/ndb/src/mgmsrv/Services.cpp

per-file messages:
  storage/ndb/include/util/OutputStream.hpp
    Add BufferedSockOutputStream wich will buffer everything written to it until
    'flush' is called
  storage/ndb/include/util/SocketServer.hpp
    Add m_refcount used to keep track if anyone has copied a session pointer
  storage/ndb/src/common/util/OutputStream.cpp
    Add BufferedSockOutputStream wich will buffer everything written to it until
    'flush' is called
  storage/ndb/src/common/util/SocketServer.cpp
    Change 'foreachSession' to work on a copy of the session list. That way the
m_sessionmutex can be released while querying each session about it's status, thus
allowing new sessions to be accepted meanwhile.
  storage/ndb/src/mgmsrv/Services.cpp
    Use BufferedSockOutputStream when writing to clients, this way the whole result can be
    formatted and m_mutex can be released before performing network I/O with the client
=== modified file 'storage/ndb/include/util/OutputStream.hpp'
--- a/storage/ndb/include/util/OutputStream.hpp	2007-03-26 21:32:12 +0000
+++ b/storage/ndb/include/util/OutputStream.hpp	2008-08-05 11:53:52 +0000
@@ -45,6 +45,7 @@ public:
 };
 
 class SocketOutputStream : public OutputStream {
+protected:
   NDB_SOCKET_TYPE m_socket;
   unsigned m_timeout_ms;
   bool m_timedout;
@@ -59,6 +60,21 @@ public:
   int println(const char * fmt, ...);
 };
 
+
+class BufferedSockOutputStream : public SocketOutputStream {
+  class UtilBuffer& m_buffer;
+public:
+  BufferedSockOutputStream(NDB_SOCKET_TYPE socket,
+                           unsigned write_timeout_ms = 1000);
+  virtual ~BufferedSockOutputStream();
+
+  int print(const char * fmt, ...);
+  int println(const char * fmt, ...);
+
+  void flush();
+};
+
+
 class NullOutputStream : public OutputStream {
 public:
   NullOutputStream() {}

=== modified file 'storage/ndb/include/util/SocketServer.hpp'
--- a/storage/ndb/include/util/SocketServer.hpp	2007-01-27 01:46:45 +0000
+++ b/storage/ndb/include/util/SocketServer.hpp	2008-08-05 11:53:52 +0000
@@ -40,11 +40,14 @@ public:
   protected:
     friend class SocketServer;
     friend void* sessionThread_C(void*);
-    Session(NDB_SOCKET_TYPE sock): m_socket(sock)
+    Session(NDB_SOCKET_TYPE sock) :
+      m_stop(false),
+      m_stopped(false),
+      m_socket(sock),
+      m_refCount(0)
       {
 	DBUG_ENTER("SocketServer::Session");
 	DBUG_PRINT("enter",("NDB_SOCKET: %d", m_socket));
-	m_stop = m_stopped = false;
 	DBUG_VOID_RETURN;
       }
     
@@ -52,6 +55,7 @@ public:
     bool m_stopped; // Has the session stopped?
     
     NDB_SOCKET_TYPE m_socket;
+    unsigned m_refCount;
   };
   
   /**

=== modified file 'storage/ndb/src/common/util/OutputStream.cpp'
--- a/storage/ndb/src/common/util/OutputStream.cpp	2008-02-22 15:14:27 +0000
+++ b/storage/ndb/src/common/util/OutputStream.cpp	2008-08-05 11:53:52 +0000
@@ -42,10 +42,12 @@ FileOutputStream::println(const char * f
 }
 
 SocketOutputStream::SocketOutputStream(NDB_SOCKET_TYPE socket,
-				       unsigned write_timeout_ms){
-  m_socket = socket;
-  m_timeout_remain= m_timeout_ms = write_timeout_ms;
-  m_timedout= false;
+				       unsigned write_timeout_ms) :
+  m_socket(socket),
+  m_timeout_ms(write_timeout_ms),
+  m_timedout(false),
+  m_timeout_remain(write_timeout_ms)
+{
 }
 
 int
@@ -92,3 +94,82 @@ SocketOutputStream::println(const char *
 
   return ret;
 }
+
+#include <UtilBuffer.hpp>
+#include <BaseString.hpp>
+
+BufferedSockOutputStream::BufferedSockOutputStream(NDB_SOCKET_TYPE socket,
+                                                   unsigned write_timeout_ms) :
+  SocketOutputStream(socket, write_timeout_ms),
+  m_buffer(*new UtilBuffer)
+{
+}
+
+BufferedSockOutputStream::~BufferedSockOutputStream()
+{
+  delete &m_buffer;
+}
+
+int
+BufferedSockOutputStream::print(const char * fmt, ...){
+  char buf[1];
+  va_list ap;
+  int len;
+  char* pos;
+
+  // Find out length of string
+  va_start(ap, fmt);
+  len = BaseString::vsnprintf(buf, sizeof(buf), fmt, ap);
+  va_end(ap);
+
+  // Grow buffer so it can hold the string
+  if ((pos= (char*)m_buffer.append(len+1)) == 0)
+    return -1;
+
+  // Print string to buffer
+  va_start(ap, fmt);
+  len = BaseString::vsnprintf((char*)pos, len+1, fmt, ap);
+  va_end(ap);
+
+  return 0;
+}
+
+int
+BufferedSockOutputStream::println(const char * fmt, ...){
+  char buf[1];
+  va_list ap;
+  int len;
+  char* pos;
+
+  // Find out length of string
+  va_start(ap, fmt);
+  len = BaseString::vsnprintf(buf, sizeof(buf), fmt, ap);
+  va_end(ap);
+
+  // Grow buffer so it can hold the string and the new line
+  if ((pos= (char*)m_buffer.append(len+1)) == 0)
+    return -1;
+
+  // Print string to buffer
+  va_start(ap, fmt);
+  len = BaseString::vsnprintf((char*)pos, len+1, fmt, ap);
+  va_end(ap);
+
+  // Add newline
+  pos+= len;
+  *pos= '\n';
+
+  return 0;
+}
+
+void BufferedSockOutputStream::flush(){
+  int elapsed;
+  if (write_socket(m_socket, m_timeout_ms, &elapsed,
+                   (const char*)m_buffer.get_data(), m_buffer.length()) != 0)
+  {
+    fprintf(stderr, "Failed to flush buffer to socket, errno: %d\n", errno);
+  }
+
+  m_buffer.clear();
+}
+

=== modified file 'storage/ndb/src/common/util/SocketServer.cpp'
--- a/storage/ndb/src/common/util/SocketServer.cpp	2008-02-26 17:38:43 +0000
+++ b/storage/ndb/src/common/util/SocketServer.cpp	2008-08-05 11:53:52 +0000
@@ -28,17 +28,19 @@
 
 SocketServer::SocketServer(unsigned maxSessions) :
   m_sessions(10),
-  m_services(5)
+  m_services(5),
+  m_maxSessions(maxSessions),
+  m_stopThread(false),
+  m_thread(0)
 {
-  m_thread = 0;
-  m_stopThread = false;
-  m_maxSessions = maxSessions;
 }
 
 SocketServer::~SocketServer() {
   unsigned i;
   for(i = 0; i<m_sessions.size(); i++){
-    delete m_sessions[i].m_session;
+    Session* session= m_sessions[i].m_session;
+    assert(session->m_refCount == 0);
+    delete session;
   }
   for(i = 0; i<m_services.size(); i++){
     if(m_services[i].m_socket)
@@ -264,13 +266,34 @@ SocketServer::startSession(SessionInstan
 }
 
 void
-SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data)
+SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *),
+                             void *data)
 {
+  // Build a list of pointers to all active sessions
+  // and increase refcount on the sessions
   m_session_mutex.lock();
-  for(int i = m_sessions.size() - 1; i >= 0; i--){
-    (*func)(m_sessions[i].m_session, data);
+  Vector<Session*> session_pointers(m_sessions.size());
+  for(unsigned i= 0; i < m_sessions.size(); i++){
+    Session* session= m_sessions[i].m_session;
+    session_pointers.push_back(session);
+    session->m_refCount++;
   }
   m_session_mutex.unlock();
+
+  // Call the function on each session
+  for(unsigned i= 0; i < session_pointers.size(); i++){
+    (*func)(session_pointers[i], data);
+  }
+
+  // Release the sessions pointers and any stopped sessions
+  m_session_mutex.lock();
+  for(unsigned i= 0; i < session_pointers.size(); i++){
+    Session* session= session_pointers[i];
+    assert(session->m_refCount > 0);
+    session->m_refCount--;
+  }
+  checkSessionsImpl();
+  m_session_mutex.unlock();
 }
 
 void
@@ -286,14 +309,15 @@ SocketServer::checkSessionsImpl()
 {
   for(int i = m_sessions.size() - 1; i >= 0; i--)
   {
-    if(m_sessions[i].m_session->m_stopped)
+    if(m_sessions[i].m_session->m_stopped and
+       m_sessions[i].m_session->m_refCount == 0)
     {
       if(m_sessions[i].m_thread != 0)
       {
 	void* ret;
 	NdbThread_WaitFor(m_sessions[i].m_thread, &ret);
 	NdbThread_Destroy(&m_sessions[i].m_thread);
-      } 
+      }
       m_sessions[i].m_session->stopSession();
       delete m_sessions[i].m_session;
       m_sessions.erase(i);
@@ -308,7 +332,6 @@ SocketServer::stopSessions(bool wait){
   for(i = m_sessions.size() - 1; i>=0; i--)
   {
     m_sessions[i].m_session->stopSession();
-    m_sessions[i].m_session->m_stop = true; // to make sure
   }
   m_session_mutex.unlock();
   
@@ -354,3 +377,4 @@ sessionThread_C(void* _sc){
 
 template class MutexVector<SocketServer::ServiceInstance>;
 template class Vector<SocketServer::SessionInstance>;
+template class Vector<SocketServer::Session*>;

=== modified file 'storage/ndb/src/mgmsrv/Services.cpp'
--- a/storage/ndb/src/mgmsrv/Services.cpp	2008-04-22 19:36:05 +0000
+++ b/storage/ndb/src/mgmsrv/Services.cpp	2008-08-05 11:53:52 +0000
@@ -298,7 +298,7 @@ MgmApiSession::MgmApiSession(class MgmtS
 {
   DBUG_ENTER("MgmApiSession::MgmApiSession");
   m_input = new SocketInputStream(sock, 30000);
-  m_output = new SocketOutputStream(sock, 30000);
+  m_output = new BufferedSockOutputStream(sock, 30000);
   m_parser = new Parser_t(commands, *m_input, true, true, true);
   m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv);
   m_stopSelf= 0;
@@ -365,6 +365,10 @@ MgmApiSession::runSession()
 
     stop= m_stop;
     NdbMutex_Unlock(m_mutex);
+
+    // Send output from command to the client
+    m_output->flush();
+
   };
 
   NdbMutex_Lock(m_mutex);

Thread
bzr commit into mysql-5.1-telco-6.2 branch (msvensson:2637) Bug#38563Magnus Svensson5 Aug