Below is the list of changes that have just been committed into a local
5.1 repository of jonas. When jonas does a push these changes
will be propagated to the main repository and, within 24 hours after the
push, to the public repository.
For information on how to access the public repository
see http://dev.mysql.com/doc/mysql/en/installing-source-tree.html
ChangeSet@stripped, 2008-03-14 08:21:27+01:00, jonas@stripped +4 -0
ndb - add support for epoll
configure.in@stripped, 2008-03-14 08:21:22+01:00, jonas@stripped +1 -1
ndb - add support for epoll
storage/ndb/include/transporter/TransporterRegistry.hpp@stripped, 2008-03-14 08:21:23+01:00,
jonas@stripped +18 -1
ndb - add support for epoll
storage/ndb/src/common/transporter/TCP_Transporter.cpp@stripped, 2008-03-14 08:21:23+01:00,
jonas@stripped +1 -0
ndb - add support for epoll
storage/ndb/src/common/transporter/TransporterRegistry.cpp@stripped, 2008-03-14
08:21:23+01:00, jonas@stripped +151 -1
ndb - add support for epoll
diff -Nrup a/configure.in b/configure.in
--- a/configure.in 2008-02-20 22:12:19 +01:00
+++ b/configure.in 2008-03-14 08:21:22 +01:00
@@ -2030,7 +2030,7 @@ AC_FUNC_UTIME_NULL
AC_FUNC_VPRINTF
AC_CHECK_FUNCS(alarm bcmp bfill bmove bsearch bzero \
- chsize cuserid fchmod fcntl \
+ chsize cuserid epoll_create fchmod fcntl \
fconvert fdatasync finite fpresetsticky fpsetmask fsync ftruncate \
getcwd gethostbyaddr_r gethostbyname_r getpass getpassphrase getpwnam \
getpwuid getrlimit getrusage getwd index initgroups isnan \
diff -Nrup a/storage/ndb/include/transporter/TransporterRegistry.hpp
b/storage/ndb/include/transporter/TransporterRegistry.hpp
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2007-08-28 11:43:22 +02:00
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2008-03-14 08:21:23 +01:00
@@ -27,6 +27,9 @@
#ifndef TransporterRegistry_H
#define TransporterRegistry_H
+#if defined(HAVE_EPOLL_CREATE)
+#include <sys/epoll.h>
+#endif
#include "TransporterDefinitions.hpp"
#include <SocketServer.hpp>
#include <SocketClient.hpp>
@@ -273,7 +276,7 @@ public:
Transporter* get_transporter(NodeId nodeId);
NodeId get_localNodeId() { return localNodeId; };
-
+ void remove_from_epoll(NodeId node_id);
struct in_addr get_connect_address(NodeId node_id) const;
protected:
@@ -294,6 +297,20 @@ private:
int nSCITransporters;
int nSHMTransporters;
+#if defined(HAVE_EPOLL_CREATE)
+ typedef Bitmask<MAX_NTRANSPORTERS/32> TransporterMask;
+
+ int m_epoll_fd;
+ struct epoll_event *m_epoll_events;
+ bool change_epoll(TCP_Transporter *t, bool add);
+ void get_tcp_data(TCP_Transporter *t);
+
+ /**
+ * Bitmask of transporters that has data "carried over" since
+ * last performReceive
+ */
+ TransporterMask m_has_data_transporters;
+#endif
/**
* Arrays holding all transporters in the order they are created
*/
diff -Nrup a/storage/ndb/src/common/transporter/TCP_Transporter.cpp
b/storage/ndb/src/common/transporter/TCP_Transporter.cpp
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2008-01-24 12:20:55 +01:00
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2008-03-14 08:21:23 +01:00
@@ -455,6 +455,7 @@ TCP_Transporter::doReceive() {
void
TCP_Transporter::disconnectImpl() {
if(theSocket != NDB_INVALID_SOCKET){
+ m_transporter_registry.remove_from_epoll(remoteNodeId);
if(NDB_CLOSE_SOCKET(theSocket) < 0){
report_error(TE_ERROR_CLOSING_SOCKET);
}
diff -Nrup a/storage/ndb/src/common/transporter/TransporterRegistry.cpp
b/storage/ndb/src/common/transporter/TransporterRegistry.cpp
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2007-11-14 13:25:45
+01:00
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2008-03-14 08:21:23
+01:00
@@ -93,7 +93,20 @@ TransporterRegistry::TransporterRegistry
theTransporters = new Transporter * [maxTransporters];
performStates = new PerformState [maxTransporters];
ioStates = new IOState [maxTransporters];
-
+
+#if defined(HAVE_EPOLL_CREATE)
+ m_epoll_fd = 0;
+ m_epoll_events = new struct epoll_event[maxTransporters];
+ m_epoll_fd = epoll_create(maxTransporters);
+ if (m_epoll_fd == -1 || !m_epoll_events)
+ {
+ /* Failure to allocate data or get epoll socket, abort */
+ perror("Failed to alloc epoll-array or calling epoll_create...giving up!");
+ abort();
+ }
+ memset((char*)m_epoll_events, 0,
+ maxTransporters * sizeof(struct epoll_event));
+#endif
// Initialize member variables
nTransporters = 0;
nTCPTransporters = 0;
@@ -149,6 +162,10 @@ TransporterRegistry::~TransporterRegistr
delete[] performStates;
delete[] ioStates;
+#if defined(HAVE_EPOLL_CREATE)
+ delete [] m_epoll_events;
+ close(m_epoll_fd);
+#endif
if (m_mgm_handle)
ndb_mgm_destroy_handle(&m_mgm_handle);
@@ -681,6 +698,26 @@ TransporterRegistry::pollReceive(Uint32
#endif
#ifdef NDB_TCP_TRANSPORTER
+#if defined(HAVE_EPOLL_CREATE)
+ Uint32 num_trps = nTCPTransporters;
+ /**
+ * If any transporters have left-over data that was not fully executed in
+ * last loop, don't wait and return 'data available' even if nothing new
+ * from epoll.
+ */
+ if (!m_has_data_transporters.isclear())
+ {
+ timeOutMillis = 0;
+ retVal = 1;
+ }
+
+ if (num_trps)
+ {
+ tcpReadSelectReply = epoll_wait(m_epoll_fd, m_epoll_events,
+ num_trps, timeOutMillis);
+ retVal |= tcpReadSelectReply;
+ }
+#else
if(nTCPTransporters > 0 || retVal == 0)
{
retVal |= poll_TCP(timeOutMillis);
@@ -688,6 +725,7 @@ TransporterRegistry::pollReceive(Uint32
else
tcpReadSelectReply = 0;
#endif
+#endif
#ifdef NDB_SCI_TRANSPORTER
if(nSCITransporters > 0)
retVal |= poll_SCI(timeOutMillis);
@@ -802,11 +840,113 @@ TransporterRegistry::poll_TCP(Uint32 tim
}
#endif
+void
+TransporterRegistry::remove_from_epoll(NodeId node_id)
+{
+#if defined(HAVE_EPOLL_CREATE)
+ DBUG_ENTER("TransporterRegistry::remove_from_epoll");
+ change_epoll((TCP_Transporter*)theTransporters[node_id], FALSE);
+ m_has_data_transporters.clear(node_id);
+ DBUG_VOID_RETURN;
+#else
+ (void)node_id;
+ return;
+#endif
+}
+
+#if defined(HAVE_EPOLL_CREATE)
+bool
+TransporterRegistry::change_epoll(TCP_Transporter *t, bool add)
+{
+ struct epoll_event event_poll;
+ int sock_fd = t->getSocket();
+ int node_id = t->getRemoteNodeId();
+ int op = add ? EPOLL_CTL_ADD : EPOLL_CTL_DEL;
+ int ret_val, error;
+
+ if (sock_fd == NDB_INVALID_SOCKET)
+ return FALSE;
+
+ event_poll.data.u32 = t->getRemoteNodeId();
+ event_poll.events = EPOLLIN;
+ ret_val = epoll_ctl(m_epoll_fd, op, sock_fd, &event_poll);
+ if (!ret_val)
+ goto ok;
+ error= errno;
+ if (error == ENOENT && !add)
+ {
+ /*
+ * Could be that socket was closed premature to this call.
+ * Not a problem that this occurs.
+ */
+ goto ok;
+ }
+ if (!add || (add && (error != ENOMEM)))
+ {
+ /*
+ * Serious problems, we are either using wrong parameters,
+ * have permission problems or the socket doesn't support
+ * epoll!!
+ */
+ perror("Failed to add fd to epoll-set...giving up!");
+ abort();
+ }
+ ndbout << "We lacked memory to add the socket for node id ";
+ ndbout << node_id << endl;
+ return TRUE;
+
+ok:
+ return FALSE;
+}
+
+void
+TransporterRegistry::get_tcp_data(TCP_Transporter *t)
+{
+ const NodeId node_id = t->getRemoteNodeId();
+ bool hasdata = false;
+ checkJobBuffer();
+ if (is_connected(node_id) && t->isConnected())
+ {
+ t->doReceive();
+
+ Uint32 *ptr;
+ Uint32 sz = t->getReceiveData(&ptr);
+ transporter_recv_from(callbackObj, node_id);
+ Uint32 szUsed = unpack(ptr, sz, node_id, ioStates[node_id]);
+ t->updateReceiveDataPtr(szUsed);
+ hasdata = t->hasReceiveData();
+ }
+ m_has_data_transporters.set(node_id, hasdata);
+}
+
+#endif
void
TransporterRegistry::performReceive()
{
#ifdef NDB_TCP_TRANSPORTER
+#if defined(HAVE_EPOLL_CREATE)
+ int num_socket_events = tcpReadSelectReply;
+ int i;
+
+ if (num_socket_events > 0)
+ {
+ for (i = 0; i < num_socket_events; i++)
+ {
+ m_has_data_transporters.set(m_epoll_events[i].data.u32);
+ }
+ }
+ else if (num_socket_events < 0)
+ {
+ assert(errno == EINTR);
+ }
+
+ Uint32 id = 0;
+ while ((id = m_has_data_transporters.find(id + 1)) != BitmaskImpl::NotFound)
+ {
+ get_tcp_data((TCP_Transporter*)theTransporters[id]);
+ }
+#else
for (int i=0; i<nTCPTransporters; i++)
{
checkJobBuffer();
@@ -833,6 +973,7 @@ TransporterRegistry::performReceive()
}
}
#endif
+#endif
#ifdef NDB_SCI_TRANSPORTER
//performReceive
@@ -1027,6 +1168,14 @@ TransporterRegistry::report_connect(Node
DBUG_ENTER("TransporterRegistry::report_connect");
DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
performStates[node_id] = CONNECTED;
+#if defined(HAVE_EPOLL_CREATE)
+ if (change_epoll((TCP_Transporter*)theTransporters[node_id],
+ TRUE))
+ {
+ performStates[node_id] = DISCONNECTING;
+ DBUG_VOID_RETURN;
+ }
+#endif
reportConnect(callbackObj, node_id);
DBUG_VOID_RETURN;
}
@@ -1037,6 +1186,7 @@ TransporterRegistry::report_disconnect(N
DBUG_ENTER("TransporterRegistry::report_disconnect");
DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
performStates[node_id] = DISCONNECTED;
+ remove_from_epoll(node_id);
reportDisconnect(callbackObj, node_id, errnum);
DBUG_VOID_RETURN;
}
| Thread |
|---|
| • bk commit into 5.1 tree (jonas:1.2570) | jonas | 14 Mar |