List:Commits« Previous MessageNext Message »
From:jonas Date:March 14 2008 8:21am
Subject:bk commit into 5.1 tree (jonas:1.2570)
View as plain text  
Below is the list of changes that have just been committed into a local
5.1 repository of jonas.  When jonas does a push these changes
will be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html

ChangeSet@stripped, 2008-03-14 08:21:27+01:00, jonas@stripped +4 -0
  ndb - add support for epoll

  configure.in@stripped, 2008-03-14 08:21:22+01:00, jonas@stripped +1 -1
    ndb - add support for epoll

  storage/ndb/include/transporter/TransporterRegistry.hpp@stripped, 2008-03-14 08:21:23+01:00,
jonas@stripped +18 -1
    ndb - add support for epoll

  storage/ndb/src/common/transporter/TCP_Transporter.cpp@stripped, 2008-03-14 08:21:23+01:00,
jonas@stripped +1 -0
    ndb - add support for epoll

  storage/ndb/src/common/transporter/TransporterRegistry.cpp@stripped, 2008-03-14
08:21:23+01:00, jonas@stripped +151 -1
    ndb - add support for epoll

diff -Nrup a/configure.in b/configure.in
--- a/configure.in	2008-02-20 22:12:19 +01:00
+++ b/configure.in	2008-03-14 08:21:22 +01:00
@@ -2030,7 +2030,7 @@ AC_FUNC_UTIME_NULL
 AC_FUNC_VPRINTF
 
 AC_CHECK_FUNCS(alarm bcmp bfill bmove bsearch bzero \
-  chsize cuserid fchmod fcntl \
+  chsize cuserid epoll_create fchmod fcntl \
   fconvert fdatasync finite fpresetsticky fpsetmask fsync ftruncate \
   getcwd gethostbyaddr_r gethostbyname_r getpass getpassphrase getpwnam \
   getpwuid getrlimit getrusage getwd index initgroups isnan \
diff -Nrup a/storage/ndb/include/transporter/TransporterRegistry.hpp
b/storage/ndb/include/transporter/TransporterRegistry.hpp
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	2007-08-28 11:43:22 +02:00
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	2008-03-14 08:21:23 +01:00
@@ -27,6 +27,9 @@
 #ifndef TransporterRegistry_H
 #define TransporterRegistry_H
 
+#if defined(HAVE_EPOLL_CREATE)
+#include <sys/epoll.h>
+#endif
 #include "TransporterDefinitions.hpp"
 #include <SocketServer.hpp>
 #include <SocketClient.hpp>
@@ -273,7 +276,7 @@ public:
   Transporter* get_transporter(NodeId nodeId);
   NodeId get_localNodeId() { return localNodeId; };
 
-
+  void remove_from_epoll(NodeId node_id);
   struct in_addr get_connect_address(NodeId node_id) const;
 protected:
   
@@ -294,6 +297,20 @@ private:
   int nSCITransporters;
   int nSHMTransporters;
 
+#if defined(HAVE_EPOLL_CREATE)
+  typedef Bitmask<MAX_NTRANSPORTERS/32> TransporterMask;
+
+  int m_epoll_fd;
+  struct epoll_event *m_epoll_events;
+  bool change_epoll(TCP_Transporter *t, bool add);
+  void get_tcp_data(TCP_Transporter *t);
+
+  /**
+   * Bitmask of transporters that has data "carried over" since
+   *   last performReceive
+   */
+  TransporterMask m_has_data_transporters;
+#endif
   /**
    * Arrays holding all transporters in the order they are created
    */
diff -Nrup a/storage/ndb/src/common/transporter/TCP_Transporter.cpp
b/storage/ndb/src/common/transporter/TCP_Transporter.cpp
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2008-01-24 12:20:55 +01:00
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2008-03-14 08:21:23 +01:00
@@ -455,6 +455,7 @@ TCP_Transporter::doReceive() {
 void
 TCP_Transporter::disconnectImpl() {
   if(theSocket != NDB_INVALID_SOCKET){
+    m_transporter_registry.remove_from_epoll(remoteNodeId);
     if(NDB_CLOSE_SOCKET(theSocket) < 0){
       report_error(TE_ERROR_CLOSING_SOCKET);
     }
diff -Nrup a/storage/ndb/src/common/transporter/TransporterRegistry.cpp
b/storage/ndb/src/common/transporter/TransporterRegistry.cpp
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2007-11-14 13:25:45
+01:00
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2008-03-14 08:21:23
+01:00
@@ -93,7 +93,20 @@ TransporterRegistry::TransporterRegistry
   theTransporters     = new Transporter     * [maxTransporters];
   performStates       = new PerformState      [maxTransporters];
   ioStates            = new IOState           [maxTransporters]; 
-  
+ 
+#if defined(HAVE_EPOLL_CREATE)
+ m_epoll_fd = 0;
+ m_epoll_events       = new struct epoll_event[maxTransporters];
+ m_epoll_fd = epoll_create(maxTransporters);
+ if (m_epoll_fd == -1 || !m_epoll_events)
+ {
+   /* Failure to allocate data or get epoll socket, abort */
+   perror("Failed to alloc epoll-array or calling epoll_create...giving up!");
+   abort();
+ }
+ memset((char*)m_epoll_events, 0,
+        maxTransporters * sizeof(struct epoll_event));
+#endif
   // Initialize member variables
   nTransporters    = 0;
   nTCPTransporters = 0;
@@ -149,6 +162,10 @@ TransporterRegistry::~TransporterRegistr
   delete[] performStates;
   delete[] ioStates;
 
+#if defined(HAVE_EPOLL_CREATE)
+  delete [] m_epoll_events;
+  close(m_epoll_fd);
+#endif
   if (m_mgm_handle)
     ndb_mgm_destroy_handle(&m_mgm_handle);
 
@@ -681,6 +698,26 @@ TransporterRegistry::pollReceive(Uint32 
 #endif
 
 #ifdef NDB_TCP_TRANSPORTER
+#if defined(HAVE_EPOLL_CREATE)
+  Uint32 num_trps = nTCPTransporters;
+  /**
+   * If any transporters have left-over data that was not fully executed in
+   * last loop, don't wait and return 'data available' even if nothing new
+   * from epoll.
+   */
+  if (!m_has_data_transporters.isclear())
+  {
+    timeOutMillis = 0;
+    retVal = 1;
+  }
+  
+  if (num_trps)
+  {
+    tcpReadSelectReply = epoll_wait(m_epoll_fd, m_epoll_events,
+                                    num_trps, timeOutMillis);
+    retVal |= tcpReadSelectReply;
+  }
+#else
   if(nTCPTransporters > 0 || retVal == 0)
   {
     retVal |= poll_TCP(timeOutMillis);
@@ -688,6 +725,7 @@ TransporterRegistry::pollReceive(Uint32 
   else
     tcpReadSelectReply = 0;
 #endif
+#endif
 #ifdef NDB_SCI_TRANSPORTER
   if(nSCITransporters > 0)
     retVal |= poll_SCI(timeOutMillis);
@@ -802,11 +840,113 @@ TransporterRegistry::poll_TCP(Uint32 tim
 }
 #endif
 
+void
+TransporterRegistry::remove_from_epoll(NodeId node_id)
+{
+#if defined(HAVE_EPOLL_CREATE)
+  DBUG_ENTER("TransporterRegistry::remove_from_epoll");
+  change_epoll((TCP_Transporter*)theTransporters[node_id], FALSE);
+  m_has_data_transporters.clear(node_id);
+  DBUG_VOID_RETURN;
+#else
+  (void)node_id;
+  return;
+#endif
+}
+
+#if defined(HAVE_EPOLL_CREATE)
+bool
+TransporterRegistry::change_epoll(TCP_Transporter *t, bool add)
+{
+  struct epoll_event event_poll;
+  int sock_fd = t->getSocket();
+  int node_id = t->getRemoteNodeId();
+  int op = add ? EPOLL_CTL_ADD : EPOLL_CTL_DEL;
+  int ret_val, error;
+
+  if (sock_fd == NDB_INVALID_SOCKET)
+    return FALSE;
+
+  event_poll.data.u32 = t->getRemoteNodeId();
+  event_poll.events = EPOLLIN;
+  ret_val = epoll_ctl(m_epoll_fd, op, sock_fd, &event_poll);
+  if (!ret_val)
+    goto ok;
+  error= errno;
+  if (error == ENOENT && !add)
+  {
+    /*
+     * Could be that socket was closed premature to this call.
+     * Not a problem that this occurs.
+     */
+    goto ok;
+  }
+  if (!add || (add && (error != ENOMEM)))
+  {
+    /*
+     * Serious problems, we are either using wrong parameters,
+     * have permission problems or the socket doesn't support
+     * epoll!!
+     */
+    perror("Failed to add fd to epoll-set...giving up!");
+    abort();
+  }
+  ndbout << "We lacked memory to add the socket for node id ";
+  ndbout << node_id << endl;
+  return TRUE;
+
+ok:
+  return FALSE;
+}
+
+void
+TransporterRegistry::get_tcp_data(TCP_Transporter *t)
+{
+  const NodeId node_id = t->getRemoteNodeId();
+  bool hasdata = false;
+  checkJobBuffer();
+  if (is_connected(node_id) && t->isConnected())
+  {
+    t->doReceive();
+    
+    Uint32 *ptr;
+    Uint32 sz = t->getReceiveData(&ptr);
+    transporter_recv_from(callbackObj, node_id);
+    Uint32 szUsed = unpack(ptr, sz, node_id, ioStates[node_id]);
+    t->updateReceiveDataPtr(szUsed);
+    hasdata = t->hasReceiveData();
+  }
+  m_has_data_transporters.set(node_id, hasdata);
+}
+
+#endif
 
 void
 TransporterRegistry::performReceive()
 {
 #ifdef NDB_TCP_TRANSPORTER
+#if defined(HAVE_EPOLL_CREATE)
+  int num_socket_events = tcpReadSelectReply;
+  int i;
+
+  if (num_socket_events > 0)
+  {
+    for (i = 0; i < num_socket_events; i++)
+    {
+      m_has_data_transporters.set(m_epoll_events[i].data.u32);
+    }
+  }
+  else if (num_socket_events < 0)
+  {
+    assert(errno == EINTR);
+  }
+  
+  Uint32 id = 0;
+  while ((id = m_has_data_transporters.find(id + 1)) != BitmaskImpl::NotFound)
+  {
+    get_tcp_data((TCP_Transporter*)theTransporters[id]);
+  }
+#else
   for (int i=0; i<nTCPTransporters; i++) 
   {
     checkJobBuffer();
@@ -833,6 +973,7 @@ TransporterRegistry::performReceive()
     }
   }
 #endif
+#endif
   
 #ifdef NDB_SCI_TRANSPORTER
   //performReceive
@@ -1027,6 +1168,14 @@ TransporterRegistry::report_connect(Node
   DBUG_ENTER("TransporterRegistry::report_connect");
   DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
   performStates[node_id] = CONNECTED;
+#if defined(HAVE_EPOLL_CREATE)
+  if (change_epoll((TCP_Transporter*)theTransporters[node_id],
+                   TRUE))
+  {
+    performStates[node_id] = DISCONNECTING;
+    DBUG_VOID_RETURN;
+  }
+#endif
   reportConnect(callbackObj, node_id);
   DBUG_VOID_RETURN;
 }
@@ -1037,6 +1186,7 @@ TransporterRegistry::report_disconnect(N
   DBUG_ENTER("TransporterRegistry::report_disconnect");
   DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
   performStates[node_id] = DISCONNECTED;
+  remove_from_epoll(node_id);
   reportDisconnect(callbackObj, node_id, errnum);
   DBUG_VOID_RETURN;
 }
Thread
bk commit into 5.1 tree (jonas:1.2570)jonas14 Mar