List:Commits« Previous MessageNext Message »
From:Martin Skold Date:June 2 2010 10:22am
Subject:bzr commit into mysql-5.1-telco-6.3 branch (Martin.Skold:3208) Bug#34303
Bug#54155 Bug#54168
View as plain text  
#At file:///home/marty/MySQL/mysql-5.1-telco-6.3/

 3208 Martin Skold	2010-06-02 [merge]
      Merge
      added:
        storage/ndb/include/portlib/ndb_socket_poller.h
      modified:
        mysql-test/t/ctype_cp932_binlog_stm.test
        storage/ndb/include/portlib/NdbTCP.h
        storage/ndb/include/transporter/TransporterRegistry.hpp
        storage/ndb/include/util/SocketClient.hpp
        storage/ndb/include/util/SocketServer.hpp
        storage/ndb/src/common/transporter/TCP_Transporter.cpp
        storage/ndb/src/common/transporter/TCP_Transporter.hpp
        storage/ndb/src/common/transporter/Transporter.cpp
        storage/ndb/src/common/transporter/TransporterRegistry.cpp
        storage/ndb/src/common/util/SocketClient.cpp
        storage/ndb/src/common/util/SocketServer.cpp
        storage/ndb/src/common/util/socket_io.cpp
        storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
        storage/ndb/src/kernel/vm/SimulatedBlock.cpp
        storage/ndb/src/kernel/vm/VMSignal.hpp
        storage/ndb/src/mgmapi/mgmapi.cpp

=== modified file 'mysql-test/t/ctype_cp932_binlog_stm.test'
--- a/mysql-test/t/ctype_cp932_binlog_stm.test	2009-08-26 10:59:55 +0000
+++ b/mysql-test/t/ctype_cp932_binlog_stm.test	2010-06-01 20:24:30 +0000
@@ -32,6 +32,10 @@ delimiter ;|
 # Note: 365 (depends on FD event size changes) is a magic position (found experimentally, depends on  
 # the log's contents) that caused the server crash.
 
+-- disable_query_log
+call mtr.add_suppression("Error in Log_event::read_log_event\\\(\\\): 'read error', data_len: 66124, event_type: 116");
+-- enable_query_log
+
 --error 1220
 SHOW BINLOG EVENTS FROM 365;
 

=== modified file 'storage/ndb/include/portlib/NdbTCP.h'
--- a/storage/ndb/include/portlib/NdbTCP.h	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/include/portlib/NdbTCP.h	2010-06-01 10:45:29 +0000
@@ -54,6 +54,26 @@
 
 #define NDB_SOCKLEN_T SOCKET_SIZE_TYPE
 
+
+/* Forward compatibility functions */
+
+typedef NDB_SOCKET_TYPE ndb_socket_t;
+typedef NDB_SOCKET_TYPE ndb_native_socket_t;
+
+static inline bool
+my_socket_valid(ndb_socket_t s)
+{
+  return (s != NDB_INVALID_SOCKET);
+}
+
+static inline ndb_native_socket_t
+ndb_socket_get_native(ndb_socket_t s)
+{
+  return s;
+}
+
+#include <portlib/ndb_socket_poller.h>
+
 #ifdef	__cplusplus
 extern "C" {
 #endif

=== added file 'storage/ndb/include/portlib/ndb_socket_poller.h'
--- a/storage/ndb/include/portlib/ndb_socket_poller.h	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/portlib/ndb_socket_poller.h	2010-06-01 10:45:29 +0000
@@ -0,0 +1,287 @@
+/*
+   Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef NDB_SOCKET_POLLER_H
+#define NDB_SOCKET_POLLER_H
+
+/*
+  Portability layer used for waiting on socket events
+*/
+
+class ndb_socket_poller {
+  // Max number of fds the list can hold, defaults to 1 and
+  // can be dynamically expanded by calling 'set_max_count'
+  unsigned m_max_count;
+
+  // Current number of fds in the list
+  unsigned m_count;
+
+#ifdef HAVE_POLL
+  // The list of pollfds, initial size is 1 and m_pfds will
+  // then point at m_one_pfd. After dynamic expand points at
+  // dynamic list of pollfds
+  struct pollfd m_one_pfd;
+  struct pollfd* m_pfds;
+#else
+#if defined(_WIN32)
+  // Utility functions for dynamically expanding the fd_set
+  // on Windows to get around the hardcoded FD_SETSIZE limit.
+  static bool
+  set_max_count(fd_set* set, fd_set* static_set, unsigned count) {
+    void* ptr = malloc(sizeof(fd_set) + count-1*sizeof(SOCKET));
+    if (!ptr)
+      return false;
+    if (set != static_set)
+      free(set);
+    set = (fd_set*)ptr;
+    clear(set);
+    return true;
+  }
+
+  static void
+  set_fd(fd_set* set, SOCKET s) {
+    // Avoid use of FD_SET since it silently drop
+    // sockets when FD_SETSIZE fd_count is reached
+    set->fd_array[set->fd_count++] = s;
+  }
+
+  static void
+  clear(fd_set* set) {
+    FD_ZERO(set);
+  }
+#endif
+  fd_set m_one_read_set;
+  fd_set m_one_write_set;
+  fd_set m_one_excp_set;
+  fd_set* m_read_set;
+  fd_set* m_write_set;
+  fd_set* m_excp_set;
+
+  // Mapping from "index" to "fd"
+  ndb_native_socket_t m_one_fd;
+  ndb_native_socket_t* m_fds;
+
+  int m_nfds; // Max fd number for 'select'
+#endif
+
+public:
+
+  ndb_socket_poller(void) :
+    m_max_count(1)
+#ifdef HAVE_POLL
+    , m_pfds(&m_one_pfd)
+#else
+    , m_read_set(&m_one_read_set)
+    , m_write_set(&m_one_write_set)
+    , m_excp_set(&m_one_excp_set)
+    , m_fds(&m_one_fd)
+#endif
+  {
+    clear();
+  }
+
+  void clear(void) {
+    m_count = 0;
+#ifndef HAVE_POLL
+    FD_ZERO(m_read_set);
+    FD_ZERO(m_write_set);
+    FD_ZERO(m_excp_set);
+    m_nfds = 0;
+#endif
+  }
+
+  ~ndb_socket_poller() {
+#ifdef HAVE_POLL
+    if (m_pfds != &m_one_pfd)
+      delete[] m_pfds;
+#else
+#ifdef _WIN32
+    if (m_read_set != &m_one_read_set)
+      free(m_read_set);
+    if (m_write_set != &m_one_write_set)
+      free(m_write_set);
+    if (m_excp_set != &m_one_excp_set)
+      free(m_excp_set);
+#endif
+    if (m_fds != &m_one_fd)
+      delete[] m_fds;
+#endif
+    }
+
+  bool set_max_count(unsigned count) {
+    if (count <= m_max_count)
+    {
+      // Ignore decrease or setting same value
+      return true;
+    }
+#ifdef HAVE_POLL
+    struct pollfd* pfds = new struct pollfd[count];
+    if (pfds == NULL)
+      return false;
+    if (m_pfds != &m_one_pfd)
+      delete[] m_pfds;
+    m_pfds = pfds;
+#else
+#if defined(_WIN32)
+    if (count > FD_SETSIZE)
+    {
+      // Expand the arrays above the builtin FD_SETSIZE
+      if (!set_max_count(m_read_set, &m_one_read_set, count) ||
+          !set_max_count(m_write_set, &m_one_write_set, count) ||
+          !set_max_count(m_excp_set, &m_one_excp_set, count))
+        return false;
+    }
+#endif
+    ndb_native_socket_t* fds = new ndb_native_socket_t[count];
+    if (fds == NULL)
+      return false;
+    if (m_fds != &m_one_fd)
+      delete[] m_fds;
+    m_fds = fds;
+#endif
+    m_max_count = count;
+    return true;
+  }
+
+  unsigned add(ndb_socket_t sock, bool read, bool write, bool error) {
+    const unsigned index = m_count;
+#ifdef HAVE_POLL
+    assert(m_count < m_max_count);
+    struct pollfd &pfd = m_pfds[m_count++];
+    pfd.fd = ndb_socket_get_native(sock);
+
+    short events = 0;
+    if (read)
+      events |= POLLIN;
+    if (write)
+      events |= POLLOUT;
+    if (error)
+      events |= POLLPRI;
+    pfd.events = events;
+
+    pfd.revents = 0;
+#else
+#if defined(_WIN32)
+    if (read)
+      set_fd(m_read_set, ndb_socket_get_native(sock));
+    if (write)
+      set_fd(m_write_set, ndb_socket_get_native(sock));
+    if (error)
+      set_fd(m_excp_set, ndb_socket_get_native(sock));
+    // Not counting nfds on Windows since select ignores it anyway
+    assert(m_nfds == 0);
+#else
+    if (fd < 0 || fd >= FD_SETSIZE)
+    {
+      fprintf(stderr, "Maximum value for FD_SETSIZE: %d exceeded when"
+        "trying to add fd: %d", FD_SETSIZE, fd);
+      fflush(stderr);
+      abort();
+    }
+    if (read)
+      FD_SET(fd, m_read_set);
+    if (write)
+      FD_SET(fd, m_write_set);
+    if (error)
+      FD_SET(fd, m_excp_set);
+    if (fd > m_nfds)
+      m_nfds = fd;
+#endif
+    // Maintain mapping from index to fd
+    m_fds[m_count++] = ndb_socket_get_native(sock);
+#endif
+    assert(m_count > index);
+    return index;
+  }
+
+  unsigned count(void) const {
+    return m_count;
+  }
+
+  bool is_socket_equal(unsigned index, ndb_socket_t socket) const {
+    assert(index < m_count);
+    assert(m_count <= m_max_count);
+#ifdef HAVE_POLL
+    return (m_pfds[index].fd == ndb_socket_get_native(socket));
+#else
+    return (m_fds[index] == ndb_socket_get_native(socket));
+#endif
+  }
+
+  bool has_read(unsigned index) const {
+    assert(index < m_count);
+    assert(m_count <= m_max_count);
+#ifdef HAVE_POLL
+    return (m_pfds[index].revents & POLLIN);
+#else
+    return FD_ISSET(m_fds[index], m_read_set);
+#endif
+  }
+
+  bool has_write(unsigned index) const {
+    assert(index < m_count);
+    assert(m_count <= m_max_count);
+#ifdef HAVE_POLL
+    return (m_pfds[index].revents & POLLOUT);
+#else
+    return FD_ISSET(m_fds[index], m_write_set);
+#endif
+  }
+
+  int poll(int timeout) {
+#ifdef HAVE_POLL
+    return ::poll(m_pfds, m_count, timeout);
+#else
+
+#ifdef _WIN32
+    if (m_count == 0)
+    {
+      // Windows does not sleep on 'select' with 0 sockets
+      Sleep(timeout);
+      return 0; // Timeout occured
+    }
+#endif
+
+    struct timeval tv;
+    tv.tv_sec  = (timeout / 1000);
+    tv.tv_usec = (timeout % 1000) * 1000;
+
+    return select(m_nfds+1, m_read_set, m_write_set, m_excp_set,
+                  timeout == -1 ? NULL : &tv);
+#endif
+  }
+};
+
+
+static inline
+int
+ndb_poll(ndb_socket_t sock,
+         bool read, bool write, bool error, int timeout_millis)
+{
+  ndb_socket_poller poller;
+  (void)poller.add(sock, read, write, error);
+
+  const int res = poller.poll(timeout_millis);
+  if (res <= 0)
+    return res;
+
+  assert(res >= 1);
+
+  return res;
+}
+
+#endif

=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	2009-10-20 18:19:00 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	2010-06-01 10:45:29 +0000
@@ -362,8 +362,8 @@ private:
    * Used in polling if exists TCP_Transporter
    */
   int tcpReadSelectReply;
-  fd_set tcpReadset;
-  
+  ndb_socket_poller m_socket_poller;
+
   Uint32 poll_TCP(Uint32 timeOutMillis);
   Uint32 poll_SCI(Uint32 timeOutMillis);
   Uint32 poll_SHM(Uint32 timeOutMillis);

=== modified file 'storage/ndb/include/util/SocketClient.hpp'
--- a/storage/ndb/include/util/SocketClient.hpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/include/util/SocketClient.hpp	2010-06-01 10:45:29 +0000
@@ -25,7 +25,7 @@ class SocketAuthenticator;
 class SocketClient
 {
   struct sockaddr_in m_servaddr;
-  unsigned int m_connect_timeout_sec;
+  unsigned int m_connect_timeout_millisec;
   unsigned short m_port;
   char *m_server_name;
   SocketAuthenticator *m_auth;
@@ -37,8 +37,8 @@ public:
     m_port = port;
     m_servaddr.sin_port = htons(m_port);
   };
-  void set_connect_timeout(unsigned int s) {
-    m_connect_timeout_sec= s;
+  void set_connect_timeout(unsigned int timeout_millisec) {
+    m_connect_timeout_millisec = timeout_millisec;
   }
   unsigned short get_port() { return m_port; };
   char *get_server_name() { return m_server_name; };

=== modified file 'storage/ndb/include/util/SocketServer.hpp'
--- a/storage/ndb/include/util/SocketServer.hpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/include/util/SocketServer.hpp	2010-06-01 10:45:29 +0000
@@ -127,9 +127,10 @@ private:
   NdbLockable m_session_mutex;
   Vector<SessionInstance> m_sessions;
   MutexVector<ServiceInstance> m_services;
+  ndb_socket_poller m_services_poller;
   unsigned m_maxSessions;
-  
-  void doAccept();
+
+  bool doAccept();
   void checkSessionsImpl();
   void startSession(SessionInstance &);
   

=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2009-12-15 15:37:38 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2010-06-01 10:45:29 +0000
@@ -86,7 +86,8 @@ TCP_Transporter::TCP_Transporter(Transpo
 	      0, false, 
 	      conf->checksum,
 	      conf->signalId),
-  m_sendBuffer(conf->tcp.sendBufferSize)
+  m_sendBuffer(conf->tcp.sendBufferSize),
+  m_poll_index(~0)
 {
   maxReceiveSize = conf->tcp.maxReceiveSize;
   
@@ -257,20 +258,15 @@ TCP_Transporter::setSocketNonBlocking(ND
 #endif
 
 bool
-TCP_Transporter::sendIsPossible(struct timeval * timeout) {
-  if(theSocket != NDB_INVALID_SOCKET){
-    fd_set   writeset;
-    FD_ZERO(&writeset);
-    FD_SET(theSocket, &writeset);
-    
-    int selectReply = select(theSocket + 1, NULL, &writeset, NULL, timeout);
+TCP_Transporter::sendIsPossible(int timeout_millisec) const
+{
+  if (!my_socket_valid(theSocket))
+    return false;
 
-    if ((selectReply > 0) && FD_ISSET(theSocket, &writeset)) 
-      return true;
-    else
-      return false;
-  }
-  return false;
+  if (ndb_poll(theSocket, false, true, false, timeout_millisec) <= 0)
+    return false; // Timeout or error occured
+
+  return true;
 }
 
 Uint32
@@ -283,15 +279,13 @@ Uint32 *
 TCP_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio){
   
   Uint32 * insertPtr = m_sendBuffer.getInsertPtr(lenBytes);
-  
-  struct timeval timeout = {0, 10000};
 
   if (insertPtr == 0) {
     //-------------------------------------------------
     // Buffer was completely full. We have severe problems.
     // We will attempt to wait for a small time
     //-------------------------------------------------
-    if(sendIsPossible(&timeout)) {
+    if(sendIsPossible(10)) {
       //-------------------------------------------------
       // Send is possible after the small timeout.
       //-------------------------------------------------
@@ -326,8 +320,7 @@ TCP_Transporter::updateWritePtr(Uint32 l
     // we will not worry since we will soon be back for
     // a renewed trial.
     //-------------------------------------------------
-    struct timeval no_timeout = {0,0};
-    if(sendIsPossible(&no_timeout)) {
+    if(sendIsPossible(0)) {
       //-------------------------------------------------
       // Send was possible, attempt at a send.
       //-------------------------------------------------

=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.hpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.hpp	2010-06-01 10:45:29 +0000
@@ -139,7 +139,7 @@ private:
   static bool setSocketNonBlocking(NDB_SOCKET_TYPE aSocket);
   virtual int pre_connect_options(NDB_SOCKET_TYPE aSocket);
   
-  bool sendIsPossible(struct timeval * timeout);
+  bool sendIsPossible(int timeout_millisec) const;
 
   /**
    * Statistics
@@ -157,6 +157,10 @@ private:
    */
   Uint32 overloadedPct;
   void update_status_overloaded();
+
+  unsigned m_poll_index;
+  void set_poll_index(unsigned index) { m_poll_index = index; };
+  unsigned get_poll_index(void) const { return m_poll_index; };
 };
 
 inline

=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp	2009-08-03 12:54:45 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.cpp	2010-06-01 10:45:29 +0000
@@ -89,7 +89,7 @@ Transporter::Transporter(TransporterRegi
 				      new SocketAuthSimple("ndbd",
 							   "ndbd passwd"));
 
-    m_socket_client->set_connect_timeout((m_timeOutMillis+999)/1000);
+    m_socket_client->set_connect_timeout(m_timeOutMillis);
   }
   DBUG_VOID_RETURN;
 }

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2010-05-07 13:08:39 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2010-06-01 11:47:29 +0000
@@ -218,7 +218,11 @@ TransporterRegistry::init(NodeId nodeId)
   localNodeId = nodeId;
   
   DEBUG("TransporterRegistry started node: " << localNodeId);
-  
+
+  if (!m_socket_poller.set_max_count(maxTransporters +
+				     1 /* wakeup socket */))
+    DBUG_RETURN(false);
+
   DBUG_RETURN(true);
 }
 
@@ -284,15 +288,6 @@ TransporterRegistry::connect_server(NDB_
       g_eventLogger->error("Incompatible configuration: Transporter type "
                            "mismatch with node %d", nodeId);
 
-      // wait for socket close for 1 second to let message arrive at client
-      {
-	fd_set a_set;
-	FD_ZERO(&a_set);
-	FD_SET(sockfd, &a_set);
-	struct timeval timeout;
-	timeout.tv_sec  = 1; timeout.tv_usec = 0;
-	select(sockfd+1, &a_set, 0, 0, &timeout);
-      }
       DBUG_RETURN(false);
     }
   }
@@ -877,68 +872,38 @@ Uint32 
 TransporterRegistry::poll_TCP(Uint32 timeOutMillis)
 {
   bool hasdata = false;
-  if (false && nTCPTransporters == 0)
-  {
-    tcpReadSelectReply = 0;
-    return 0;
-  }
-  
-  NDB_SOCKET_TYPE maxSocketValue = -1;
-  
-  // Needed for TCP/IP connections
-  // The read- and writeset are used by select
-  
-  FD_ZERO(&tcpReadset);
 
-  // Prepare for sending and receiving
-  for (int i = 0; i < nTCPTransporters; i++) {
-    TCP_Transporter * t = theTCPTransporters[i];
-    Uint32 node_id= t->getRemoteNodeId();
-    
-    // If the transporter is connected
-    if (is_connected(node_id) && t->isConnected()) {
-      
-      const NDB_SOCKET_TYPE socket = t->getSocket();
-      // Find the highest socket value. It will be used by select
-      if (socket > maxSocketValue)
-	maxSocketValue = socket;
-      
-      // Put the connected transporters in the socket read-set 
-      FD_SET(socket, &tcpReadset);
-    }
-    hasdata |= t->hasReceiveData();
-  }
-  
+  m_socket_poller.clear();
+
   if (m_has_extra_wakeup_socket)
   {
     const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
-    if (socket > maxSocketValue)
-      maxSocketValue = socket;
 
-    // Put the wakup-socket in the read-set
-    FD_SET(socket, &tcpReadset);
+    // Poll the wakup-socket for read
+    m_socket_poller.add(socket, true, false, false);
   }
 
-  timeOutMillis = hasdata ? 0 : timeOutMillis;
-  
-  struct timeval timeout;
-  timeout.tv_sec  = timeOutMillis / 1000;
-  timeout.tv_usec = (timeOutMillis % 1000) * 1000;
+  for (int i = 0; i < nTCPTransporters; i++)
+  {
+    unsigned index = ~0;
+    TCP_Transporter * t = theTCPTransporters[i];
+    const Uint32 node_id = t->getRemoteNodeId();
+    const NDB_SOCKET_TYPE socket = t->getSocket();
 
-  // The highest socket value plus one
-  maxSocketValue++; 
-  
-  tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout);  
-  if(false && tcpReadSelectReply == -1 && errno == EINTR)
-    g_eventLogger->info("woke-up by signal");
+    if (is_connected(node_id) && t->isConnected() &&
+        my_socket_valid(socket))
+    {
+      // Poll the connected transporter for read
+      index = m_socket_poller.add(socket, true, false, false);
+    }
+    // Remember the index into poll list
+    t->set_poll_index(index);
 
-#ifdef NDB_WIN32
-  if(tcpReadSelectReply == SOCKET_ERROR)
-  {
-    NdbSleep_MilliSleep(timeOutMillis);
+    hasdata |= t->hasReceiveData();
   }
-#endif
-  
+
+  tcpReadSelectReply = m_socket_poller.poll(hasdata ? 0 : timeOutMillis);
+
   return tcpReadSelectReply || hasdata;
 }
 #endif
@@ -1057,8 +1022,10 @@ TransporterRegistry::performReceive()
   {
     if (m_has_extra_wakeup_socket)
     {
-      if (FD_ISSET(m_extra_wakeup_sockets[0], &tcpReadset))
+      // The wakeupsocket is always added first => use index 0
+      if (m_socket_poller.has_read(0))
       {
+        assert(m_socket_poller.is_socket_equal(0, m_extra_wakeup_sockets[0]));
         consume_extra_sockets();
       }
     }
@@ -1068,14 +1035,19 @@ TransporterRegistry::performReceive()
       checkJobBuffer();
       TCP_Transporter *t = theTCPTransporters[i];
       const NodeId nodeId = t->getRemoteNodeId();
-      const NDB_SOCKET_TYPE socket    = t->getSocket();
+
       if(is_connected(nodeId)){
         if(t->isConnected())
         {
-          if (FD_ISSET(socket, &tcpReadset))
+          const unsigned index = t->get_poll_index();
+          if (index != (unsigned)~0 &&
+              m_socket_poller.has_read(index))
           {
+            assert(m_socket_poller.is_socket_equal(index, t->getSocket()));
             t->doReceive();
           }
+	  // Reset the index into poll list
+	  t->set_poll_index(~0);
           
           if (t->hasReceiveData())
           {

=== modified file 'storage/ndb/src/common/util/SocketClient.cpp'
--- a/storage/ndb/src/common/util/SocketClient.cpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/common/util/SocketClient.cpp	2010-06-01 10:45:29 +0000
@@ -18,18 +18,17 @@
 
 
 #include <ndb_global.h>
-#include <NdbOut.hpp>
 
 #include <SocketClient.hpp>
 #include <SocketAuthenticator.hpp>
 
-SocketClient::SocketClient(const char *server_name, unsigned short port, SocketAuthenticator *sa)
+SocketClient::SocketClient(const char *server_name, unsigned short port, SocketAuthenticator *sa) :
+  m_connect_timeout_millisec(0) // Blocking connect by default
 {
   m_auth= sa;
   m_port= port;
   m_server_name= server_name ? strdup(server_name) : 0;
   m_sockfd= NDB_INVALID_SOCKET;
-  m_connect_timeout_sec= 0;
 }
 
 SocketClient::~SocketClient()
@@ -108,19 +107,10 @@ SocketClient::bind(const char* bindaddre
 NDB_SOCKET_TYPE
 SocketClient::connect(const char *toaddress, unsigned short toport)
 {
-  fd_set rset, wset;
-  struct timeval tval;
-  int r;
-  bool use_timeout;
-  SOCKOPT_OPTLEN_TYPE len;
-  int flags;
-
-  if (m_sockfd == NDB_INVALID_SOCKET)
+  if (!my_socket_valid(m_sockfd))
   {
-    if (!init()) {
-#ifdef VM_TRACE
-      ndbout << "SocketClient::connect() failed " << m_server_name << " " << m_port << endl;
-#endif
+    if (!init())
+    {
       return NDB_INVALID_SOCKET;
     }
   }
@@ -139,52 +129,53 @@ SocketClient::connect(const char *toaddr
       return NDB_INVALID_SOCKET;
   }
 
-  flags= fcntl(m_sockfd, F_GETFL, 0);
+  // Set socket non blocking
+  const int flags= fcntl(m_sockfd, F_GETFL, 0);
   fcntl(m_sockfd, F_SETFL, flags | O_NONBLOCK);
 
-  r= ::connect(m_sockfd, (struct sockaddr*) &m_servaddr, sizeof(m_servaddr));
-
+  // Start non blocking connect
+  int r = ::connect(m_sockfd,
+		    (struct sockaddr*) &m_servaddr, sizeof(m_servaddr));
   if (r == 0)
     goto done; // connected immediately.
 
   if (r < 0 && (errno != EINPROGRESS)) {
+    // Start of non blocking connect failed
     NDB_CLOSE_SOCKET(m_sockfd);
     m_sockfd= NDB_INVALID_SOCKET;
     return NDB_INVALID_SOCKET;
   }
 
-  FD_ZERO(&rset);
-  FD_SET(m_sockfd, &rset);
-  wset= rset;
-  tval.tv_sec= m_connect_timeout_sec;
-  tval.tv_usec= 0;
-  use_timeout= m_connect_timeout_sec;
-
-  if ((r= select(m_sockfd+1, &rset, &wset, NULL,
-                 use_timeout? &tval : NULL)) == 0)
+  if (ndb_poll(m_sockfd, true, true, true,
+               m_connect_timeout_millisec > 0 ?
+               m_connect_timeout_millisec : -1) <= 0)
   {
+    // Nothing has happened on the socket after timeout
+    // or an error occured
     NDB_CLOSE_SOCKET(m_sockfd);
     m_sockfd= NDB_INVALID_SOCKET;
     return NDB_INVALID_SOCKET;
   }
 
-  if (FD_ISSET(m_sockfd, &rset) || FD_ISSET(m_sockfd, &wset))
+  // Activity detected on the socket
+
   {
-    len= sizeof(r);
-    if (getsockopt(m_sockfd, SOL_SOCKET, SO_ERROR, &r, &len) < 0 || r)
+    // Check socket level error code
+    int so_error = 0;
+    SOCKOPT_OPTLEN_TYPE len= sizeof(so_error);
+    if (getsockopt(m_sockfd, SOL_SOCKET, SO_ERROR, &so_error, &len) < 0)
+    {
+      NDB_CLOSE_SOCKET(m_sockfd);
+      m_sockfd= NDB_INVALID_SOCKET;
+      return NDB_INVALID_SOCKET;
+    }
+
+    if (so_error)
     {
-      // Solaris got an error... different than others
       NDB_CLOSE_SOCKET(m_sockfd);
       m_sockfd= NDB_INVALID_SOCKET;
       return NDB_INVALID_SOCKET;
     }
-  }
-  else
-  {
-    // select error, probably m_sockfd not set.
-    NDB_CLOSE_SOCKET(m_sockfd);
-    m_sockfd= NDB_INVALID_SOCKET;
-    return NDB_INVALID_SOCKET;
   }
 
 done:

=== modified file 'storage/ndb/src/common/util/SocketServer.cpp'
--- a/storage/ndb/src/common/util/SocketServer.cpp	2009-09-08 12:23:44 +0000
+++ b/storage/ndb/src/common/util/SocketServer.cpp	2010-06-01 10:45:29 +0000
@@ -152,60 +152,77 @@ SocketServer::setup(SocketServer::Servic
   i.m_service = service;
   m_services.push_back(i);
 
+  // Increase size to allow polling all listening ports
+  m_services_poller.set_max_count(m_services.size());
+
   *port = ntohs(servaddr.sin_port);
 
   DBUG_RETURN(true);
 }
 
-void
-SocketServer::doAccept(){
-  fd_set readSet, exceptionSet;
-  FD_ZERO(&readSet);
-  FD_ZERO(&exceptionSet);
-  
+
+bool
+SocketServer::doAccept()
+{
   m_services.lock();
-  int maxSock = 0;
-  for (unsigned i = 0; i < m_services.size(); i++){
-    const NDB_SOCKET_TYPE s = m_services[i].m_socket;
-    FD_SET(s, &readSet);
-    FD_SET(s, &exceptionSet);
-    maxSock = (maxSock > s ? maxSock : s);
-  }
-  struct timeval timeout;
-  timeout.tv_sec  = 1;
-  timeout.tv_usec = 0;
-  
-  if(select(maxSock + 1, &readSet, 0, &exceptionSet, &timeout) > 0){
-    for (unsigned i = 0; i < m_services.size(); i++){
-      ServiceInstance & si = m_services[i];
-      
-      if(FD_ISSET(si.m_socket, &readSet)){
-	NDB_SOCKET_TYPE childSock = accept(si.m_socket, 0, 0);
-	if(childSock == NDB_INVALID_SOCKET){
-	  continue;
-	}
-	
-	SessionInstance s;
-	s.m_service = si.m_service;
-	s.m_session = si.m_service->newSession(childSock);
-	if(s.m_session != 0)
-	{
-	  m_session_mutex.lock();
-	  m_sessions.push_back(s);
-	  startSession(m_sessions.back());
-	  m_session_mutex.unlock();
-	}
-	
-	continue;
-      }      
-      
-      if(FD_ISSET(si.m_socket, &exceptionSet)){
-	DEBUG("socket in the exceptionSet");
-	continue;
-      }
+
+  m_services_poller.clear();
+  for (unsigned i = 0; i < m_services.size(); i++)
+  {
+    m_services_poller.add(m_services[i].m_socket, true, false, true);
+  }
+  assert(m_services.size() == m_services_poller.count());
+
+  const int accept_timeout_ms = 1000;
+  const int ret = m_services_poller.poll(accept_timeout_ms);
+  if (ret < 0)
+  {
+    // Error occured, indicate error to caller by returning false
+    m_services.unlock();
+    return false;
+  }
+
+  if (ret == 0)
+  {
+    // Timeout occured
+    m_services.unlock();
+    return true;
+  }
+
+  bool result = true;
+  for (unsigned i = 0; i < m_services_poller.count(); i++)
+  {
+    const bool has_read = m_services_poller.has_read(i);
+
+    if (!has_read)
+      continue; // Ignore events where read flag wasn't set
+
+    ServiceInstance & si = m_services[i];
+    assert(m_services_poller.is_socket_equal(i, si.m_socket));
+
+    const NDB_SOCKET_TYPE childSock = accept(si.m_socket, 0, 0);
+    if (!my_socket_valid(childSock))
+    {
+      // Could not 'accept' socket(maybe at max fds), indicate error
+      // to caller by returning false
+      result = false;
+      continue;
+    }
+
+    SessionInstance s;
+    s.m_service = si.m_service;
+    s.m_session = si.m_service->newSession(childSock);
+    if (s.m_session != 0)
+    {
+      m_session_mutex.lock();
+      m_sessions.push_back(s);
+      startSession(m_sessions.back());
+      m_session_mutex.unlock();
     }
   }
+
   m_services.unlock();
+  return result;
 }
 
 extern "C"
@@ -260,11 +277,16 @@ SocketServer::doRun(){
   while(!m_stopThread){
     m_session_mutex.lock();
     checkSessionsImpl();
-    if(m_sessions.size() < m_maxSessions){
-      m_session_mutex.unlock();
-      doAccept();
-    } else {
-      m_session_mutex.unlock();
+    m_session_mutex.unlock();
+
+    if(m_sessions.size() >= m_maxSessions){
+      // Don't accept more connections yet
+      NdbSleep_MilliSleep(200);
+      continue;
+    }
+
+    if (!doAccept()){
+      // accept failed, step back
       NdbSleep_MilliSleep(200);
     }
   }

=== modified file 'storage/ndb/src/common/util/socket_io.cpp'
--- a/storage/ndb/src/common/util/socket_io.cpp	2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/common/util/socket_io.cpp	2010-06-01 10:45:29 +0000
@@ -29,16 +29,8 @@ read_socket(NDB_SOCKET_TYPE socket, int 
 	    char * buf, int buflen){
   if(buflen < 1)
     return 0;
-  
-  fd_set readset;
-  FD_ZERO(&readset);
-  FD_SET(socket, &readset);
-  
-  struct timeval timeout;
-  timeout.tv_sec  = (timeout_millis / 1000);
-  timeout.tv_usec = (timeout_millis % 1000) * 1000;
 
-  const int selectRes = select(socket + 1, &readset, 0, 0, &timeout);
+  const int selectRes = ndb_poll(socket, true, false, false, timeout_millis);
   if(selectRes == 0)
     return 0;
   
@@ -56,20 +48,11 @@ readln_socket(NDB_SOCKET_TYPE socket, in
   if(buflen <= 1)
     return 0;
 
-  fd_set readset;
-  FD_ZERO(&readset);
-  FD_SET(socket, &readset);
-
-  struct timeval timeout;
-  timeout.tv_sec  = (timeout_millis / 1000);
-  timeout.tv_usec = (timeout_millis % 1000) * 1000;
-
   if(mutex)
     NdbMutex_Unlock(mutex);
   Uint64 tick= NdbTick_CurrentMillisecond();
-  const int selectRes = select(socket + 1, &readset, 0, 0, &timeout);
-
-  *time= NdbTick_CurrentMillisecond() - tick;
+  int selectRes = ndb_poll(socket, true, false, false, timeout_millis);
+  *time= (int)(NdbTick_CurrentMillisecond() - tick);
   if(mutex)
     NdbMutex_Lock(mutex);
 
@@ -134,14 +117,10 @@ readln_socket(NDB_SOCKET_TYPE socket, in
       tmp -= t;
     }
 
-    FD_ZERO(&readset);
-    FD_SET(socket, &readset);
-    timeout.tv_sec  = ((timeout_millis - *time) / 1000);
-    timeout.tv_usec = ((timeout_millis - *time) % 1000) * 1000;
-
     tick= NdbTick_CurrentMillisecond();
-    const int selectRes = select(socket + 1, &readset, 0, 0, &timeout);
-    *time= NdbTick_CurrentMillisecond() - tick;
+    int selectRes = ndb_poll(socket, true, false, false,
+                             timeout_millis - *time);
+    *time= (int)(NdbTick_CurrentMillisecond() - tick);
 
     if(selectRes != 1){
       return -1;
@@ -155,17 +134,10 @@ extern "C"
 int
 write_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time,
 	     const char buf[], int len){
-  fd_set writeset;
-  FD_ZERO(&writeset);
-  FD_SET(socket, &writeset);
-  struct timeval timeout;
-  timeout.tv_sec  = (timeout_millis / 1000);
-  timeout.tv_usec = (timeout_millis % 1000) * 1000;
-
 
   Uint64 tick= NdbTick_CurrentMillisecond();
-  const int selectRes = select(socket + 1, 0, &writeset, 0, &timeout);
-  *time= NdbTick_CurrentMillisecond() - tick;
+  const int selectRes = ndb_poll(socket, false, true, false, timeout_millis);
+  *time= (int)(NdbTick_CurrentMillisecond() - tick);
 
   if(selectRes != 1){
     return -1;
@@ -182,15 +154,11 @@ write_socket(NDB_SOCKET_TYPE socket, int
     
     if(len == 0)
       break;
-    
-    FD_ZERO(&writeset);
-    FD_SET(socket, &writeset);
-    timeout.tv_sec  = ((timeout_millis - *time) / 1000);
-    timeout.tv_usec = ((timeout_millis - *time) % 1000) * 1000;
 
     Uint64 tick= NdbTick_CurrentMillisecond();
-    const int selectRes2 = select(socket + 1, 0, &writeset, 0, &timeout);
-    *time= NdbTick_CurrentMillisecond() - tick;
+    const int selectRes2 = ndb_poll(socket, false, true, false,
+                                    timeout_millis - *time);
+    *time= (int)(NdbTick_CurrentMillisecond() - tick);
 
     if(selectRes2 != 1){
       return -1;

=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp	2009-12-14 10:58:03 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp	2010-06-02 09:12:29 +0000
@@ -924,7 +924,7 @@ bool Dbtup::readTriggerInfo(TupTriggerDa
     numAttrsToRead = setAttrIds(attributeMask, regTabPtr->m_no_of_attributes,
                                 &readBuffer[0]);
   }
-  ndbrequire(numAttrsToRead < MAX_ATTRIBUTES_IN_TABLE);
+  ndbrequire(numAttrsToRead <= MAX_ATTRIBUTES_IN_TABLE);
 //--------------------------------------------------------------------
 // Read Main tuple values
 //--------------------------------------------------------------------

=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2010-01-25 15:36:04 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp	2010-06-02 07:36:20 +0000
@@ -2398,7 +2398,9 @@ SimulatedBlock::sendFragmentedSignal(Blo
   }
   tmp.p->m_callback = c;
 
-  if(!c_fragSenderRunning){
+  if(!c_fragSenderRunning)
+  {
+    SaveSignal<2> save(signal);
     c_fragSenderRunning = true;
     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
     sig->type = ContinueFragmented::CONTINUE_SENDING;
@@ -2439,7 +2441,9 @@ SimulatedBlock::sendFragmentedSignal(Nod
   }
   tmp.p->m_callback = c;
 
-  if(!c_fragSenderRunning){
+  if(!c_fragSenderRunning)
+  {
+    SaveSignal<2> save(signal);
     c_fragSenderRunning = true;
     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
     sig->type = ContinueFragmented::CONTINUE_SENDING;
@@ -2489,7 +2493,9 @@ SimulatedBlock::sendFragmentedSignal(Blo
   }
   tmp.p->m_callback = c;
   
-  if(!c_fragSenderRunning){
+  if(!c_fragSenderRunning)
+  {
+    SaveSignal<2> save(signal);
     c_fragSenderRunning = true;
     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
     sig->type = ContinueFragmented::CONTINUE_SENDING;
@@ -2532,7 +2538,9 @@ SimulatedBlock::sendFragmentedSignal(Nod
   }
   tmp.p->m_callback = c;
   
-  if(!c_fragSenderRunning){
+  if(!c_fragSenderRunning)
+  {
+    SaveSignal<2> save(signal);
     c_fragSenderRunning = true;
     ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
     sig->type = ContinueFragmented::CONTINUE_SENDING;

=== modified file 'storage/ndb/src/kernel/vm/VMSignal.hpp'
--- a/storage/ndb/src/kernel/vm/VMSignal.hpp	2009-10-21 08:40:24 +0000
+++ b/storage/ndb/src/kernel/vm/VMSignal.hpp	2010-06-02 07:36:20 +0000
@@ -120,6 +120,37 @@ public:
   void garbage_register();
 };
 
+template<Uint32 len>
+class SaveSignal 
+{
+  Uint32 m_copy[len];
+  Signal * m_signal;
+
+public:
+  SaveSignal(Signal* signal) {
+    save(signal);
+  }
+
+  void save(Signal* signal) {
+    m_signal = signal;
+    for (Uint32 i = 0; i<len; i++)
+      m_copy[i] = m_signal->theData[i];
+  }
+
+  void clear() { m_signal = 0;}
+
+  void restore() {
+    for (Uint32 i = 0; i<len; i++)
+      m_signal->theData[i] = m_copy[i];
+  }
+
+  ~SaveSignal() {
+    if (m_signal)
+      restore();
+    clear();
+  }
+};
+
 inline
 Uint32
 Signal::getLength() const {

=== modified file 'storage/ndb/src/mgmapi/mgmapi.cpp'
--- a/storage/ndb/src/mgmapi/mgmapi.cpp	2010-03-04 14:31:16 +0000
+++ b/storage/ndb/src/mgmapi/mgmapi.cpp	2010-06-01 10:45:29 +0000
@@ -562,7 +562,7 @@ ndb_mgm_connect(NdbMgmHandle handle, int
       SocketClient s(0, 0);
       const char *bind_address= NULL;
       unsigned short bind_address_port= 0;
-      s.set_connect_timeout((handle->timeout+999)/1000);
+      s.set_connect_timeout(handle->timeout);
       if (!s.init())
       {
         fprintf(handle->errstream, 
@@ -1764,7 +1764,7 @@ ndb_mgm_listen_event_internal(NdbMgmHand
   int port= ndb_mgm_get_connected_port(handle);
   const char *bind_address= ndb_mgm_get_connected_bind_address(handle);
   SocketClient s(0, 0);
-  s.set_connect_timeout((handle->timeout+999)/1000);
+  s.set_connect_timeout(handle->timeout);
   if (!s.init())
   {
     fprintf(handle->errstream, "Unable to create socket");

Thread
bzr commit into mysql-5.1-telco-6.3 branch (Martin.Skold:3208) Bug#34303Bug#54155 Bug#54168Martin Skold2 Jun