#At file:///home/marty/MySQL/mysql-5.1-telco-6.3/
3208 Martin Skold 2010-06-02 [merge]
Merge
added:
storage/ndb/include/portlib/ndb_socket_poller.h
modified:
mysql-test/t/ctype_cp932_binlog_stm.test
storage/ndb/include/portlib/NdbTCP.h
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/include/util/SocketClient.hpp
storage/ndb/include/util/SocketServer.hpp
storage/ndb/src/common/transporter/TCP_Transporter.cpp
storage/ndb/src/common/transporter/TCP_Transporter.hpp
storage/ndb/src/common/transporter/Transporter.cpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/common/util/SocketClient.cpp
storage/ndb/src/common/util/SocketServer.cpp
storage/ndb/src/common/util/socket_io.cpp
storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp
storage/ndb/src/kernel/vm/SimulatedBlock.cpp
storage/ndb/src/kernel/vm/VMSignal.hpp
storage/ndb/src/mgmapi/mgmapi.cpp
=== modified file 'mysql-test/t/ctype_cp932_binlog_stm.test'
--- a/mysql-test/t/ctype_cp932_binlog_stm.test 2009-08-26 10:59:55 +0000
+++ b/mysql-test/t/ctype_cp932_binlog_stm.test 2010-06-01 20:24:30 +0000
@@ -32,6 +32,10 @@ delimiter ;|
# Note: 365 (depends on FD event size changes) is a magic position (found experimentally, depends on
# the log's contents) that caused the server crash.
+-- disable_query_log
+call mtr.add_suppression("Error in Log_event::read_log_event\\\(\\\): 'read error', data_len: 66124, event_type: 116");
+-- enable_query_log
+
--error 1220
SHOW BINLOG EVENTS FROM 365;
=== modified file 'storage/ndb/include/portlib/NdbTCP.h'
--- a/storage/ndb/include/portlib/NdbTCP.h 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/include/portlib/NdbTCP.h 2010-06-01 10:45:29 +0000
@@ -54,6 +54,26 @@
#define NDB_SOCKLEN_T SOCKET_SIZE_TYPE
+
+/* Forward compatibility functions */
+
+typedef NDB_SOCKET_TYPE ndb_socket_t;
+typedef NDB_SOCKET_TYPE ndb_native_socket_t;
+
+static inline bool
+my_socket_valid(ndb_socket_t s)
+{
+ return (s != NDB_INVALID_SOCKET);
+}
+
+static inline ndb_native_socket_t
+ndb_socket_get_native(ndb_socket_t s)
+{
+ return s;
+}
+
+#include <portlib/ndb_socket_poller.h>
+
#ifdef __cplusplus
extern "C" {
#endif
=== added file 'storage/ndb/include/portlib/ndb_socket_poller.h'
--- a/storage/ndb/include/portlib/ndb_socket_poller.h 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/include/portlib/ndb_socket_poller.h 2010-06-01 10:45:29 +0000
@@ -0,0 +1,287 @@
+/*
+ Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef NDB_SOCKET_POLLER_H
+#define NDB_SOCKET_POLLER_H
+
+/*
+ Portability layer used for waiting on socket events
+*/
+
+class ndb_socket_poller {
+ // Max number of fds the list can hold, defaults to 1 and
+ // can be dynamically expanded by calling 'set_max_count'
+ unsigned m_max_count;
+
+ // Current number of fds in the list
+ unsigned m_count;
+
+#ifdef HAVE_POLL
+ // The list of pollfds, initial size is 1 and m_pfds will
+ // then point at m_one_pfd. After dynamic expand points at
+ // dynamic list of pollfds
+ struct pollfd m_one_pfd;
+ struct pollfd* m_pfds;
+#else
+#if defined(_WIN32)
+ // Utility functions for dynamically expanding the fd_set
+ // on Windows to get around the hardcoded FD_SETSIZE limit.
+ static bool
+ set_max_count(fd_set* set, fd_set* static_set, unsigned count) {
+ void* ptr = malloc(sizeof(fd_set) + count-1*sizeof(SOCKET));
+ if (!ptr)
+ return false;
+ if (set != static_set)
+ free(set);
+ set = (fd_set*)ptr;
+ clear(set);
+ return true;
+ }
+
+ static void
+ set_fd(fd_set* set, SOCKET s) {
+ // Avoid use of FD_SET since it silently drop
+ // sockets when FD_SETSIZE fd_count is reached
+ set->fd_array[set->fd_count++] = s;
+ }
+
+ static void
+ clear(fd_set* set) {
+ FD_ZERO(set);
+ }
+#endif
+ fd_set m_one_read_set;
+ fd_set m_one_write_set;
+ fd_set m_one_excp_set;
+ fd_set* m_read_set;
+ fd_set* m_write_set;
+ fd_set* m_excp_set;
+
+ // Mapping from "index" to "fd"
+ ndb_native_socket_t m_one_fd;
+ ndb_native_socket_t* m_fds;
+
+ int m_nfds; // Max fd number for 'select'
+#endif
+
+public:
+
+ ndb_socket_poller(void) :
+ m_max_count(1)
+#ifdef HAVE_POLL
+ , m_pfds(&m_one_pfd)
+#else
+ , m_read_set(&m_one_read_set)
+ , m_write_set(&m_one_write_set)
+ , m_excp_set(&m_one_excp_set)
+ , m_fds(&m_one_fd)
+#endif
+ {
+ clear();
+ }
+
+ void clear(void) {
+ m_count = 0;
+#ifndef HAVE_POLL
+ FD_ZERO(m_read_set);
+ FD_ZERO(m_write_set);
+ FD_ZERO(m_excp_set);
+ m_nfds = 0;
+#endif
+ }
+
+ ~ndb_socket_poller() {
+#ifdef HAVE_POLL
+ if (m_pfds != &m_one_pfd)
+ delete[] m_pfds;
+#else
+#ifdef _WIN32
+ if (m_read_set != &m_one_read_set)
+ free(m_read_set);
+ if (m_write_set != &m_one_write_set)
+ free(m_write_set);
+ if (m_excp_set != &m_one_excp_set)
+ free(m_excp_set);
+#endif
+ if (m_fds != &m_one_fd)
+ delete[] m_fds;
+#endif
+ }
+
+ bool set_max_count(unsigned count) {
+ if (count <= m_max_count)
+ {
+ // Ignore decrease or setting same value
+ return true;
+ }
+#ifdef HAVE_POLL
+ struct pollfd* pfds = new struct pollfd[count];
+ if (pfds == NULL)
+ return false;
+ if (m_pfds != &m_one_pfd)
+ delete[] m_pfds;
+ m_pfds = pfds;
+#else
+#if defined(_WIN32)
+ if (count > FD_SETSIZE)
+ {
+ // Expand the arrays above the builtin FD_SETSIZE
+ if (!set_max_count(m_read_set, &m_one_read_set, count) ||
+ !set_max_count(m_write_set, &m_one_write_set, count) ||
+ !set_max_count(m_excp_set, &m_one_excp_set, count))
+ return false;
+ }
+#endif
+ ndb_native_socket_t* fds = new ndb_native_socket_t[count];
+ if (fds == NULL)
+ return false;
+ if (m_fds != &m_one_fd)
+ delete[] m_fds;
+ m_fds = fds;
+#endif
+ m_max_count = count;
+ return true;
+ }
+
+ unsigned add(ndb_socket_t sock, bool read, bool write, bool error) {
+ const unsigned index = m_count;
+#ifdef HAVE_POLL
+ assert(m_count < m_max_count);
+ struct pollfd &pfd = m_pfds[m_count++];
+ pfd.fd = ndb_socket_get_native(sock);
+
+ short events = 0;
+ if (read)
+ events |= POLLIN;
+ if (write)
+ events |= POLLOUT;
+ if (error)
+ events |= POLLPRI;
+ pfd.events = events;
+
+ pfd.revents = 0;
+#else
+#if defined(_WIN32)
+ if (read)
+ set_fd(m_read_set, ndb_socket_get_native(sock));
+ if (write)
+ set_fd(m_write_set, ndb_socket_get_native(sock));
+ if (error)
+ set_fd(m_excp_set, ndb_socket_get_native(sock));
+ // Not counting nfds on Windows since select ignores it anyway
+ assert(m_nfds == 0);
+#else
+ if (fd < 0 || fd >= FD_SETSIZE)
+ {
+ fprintf(stderr, "Maximum value for FD_SETSIZE: %d exceeded when"
+ "trying to add fd: %d", FD_SETSIZE, fd);
+ fflush(stderr);
+ abort();
+ }
+ if (read)
+ FD_SET(fd, m_read_set);
+ if (write)
+ FD_SET(fd, m_write_set);
+ if (error)
+ FD_SET(fd, m_excp_set);
+ if (fd > m_nfds)
+ m_nfds = fd;
+#endif
+ // Maintain mapping from index to fd
+ m_fds[m_count++] = ndb_socket_get_native(sock);
+#endif
+ assert(m_count > index);
+ return index;
+ }
+
+ unsigned count(void) const {
+ return m_count;
+ }
+
+ bool is_socket_equal(unsigned index, ndb_socket_t socket) const {
+ assert(index < m_count);
+ assert(m_count <= m_max_count);
+#ifdef HAVE_POLL
+ return (m_pfds[index].fd == ndb_socket_get_native(socket));
+#else
+ return (m_fds[index] == ndb_socket_get_native(socket));
+#endif
+ }
+
+ bool has_read(unsigned index) const {
+ assert(index < m_count);
+ assert(m_count <= m_max_count);
+#ifdef HAVE_POLL
+ return (m_pfds[index].revents & POLLIN);
+#else
+ return FD_ISSET(m_fds[index], m_read_set);
+#endif
+ }
+
+ bool has_write(unsigned index) const {
+ assert(index < m_count);
+ assert(m_count <= m_max_count);
+#ifdef HAVE_POLL
+ return (m_pfds[index].revents & POLLOUT);
+#else
+ return FD_ISSET(m_fds[index], m_write_set);
+#endif
+ }
+
+ int poll(int timeout) {
+#ifdef HAVE_POLL
+ return ::poll(m_pfds, m_count, timeout);
+#else
+
+#ifdef _WIN32
+ if (m_count == 0)
+ {
+ // Windows does not sleep on 'select' with 0 sockets
+ Sleep(timeout);
+ return 0; // Timeout occured
+ }
+#endif
+
+ struct timeval tv;
+ tv.tv_sec = (timeout / 1000);
+ tv.tv_usec = (timeout % 1000) * 1000;
+
+ return select(m_nfds+1, m_read_set, m_write_set, m_excp_set,
+ timeout == -1 ? NULL : &tv);
+#endif
+ }
+};
+
+
+static inline
+int
+ndb_poll(ndb_socket_t sock,
+ bool read, bool write, bool error, int timeout_millis)
+{
+ ndb_socket_poller poller;
+ (void)poller.add(sock, read, write, error);
+
+ const int res = poller.poll(timeout_millis);
+ if (res <= 0)
+ return res;
+
+ assert(res >= 1);
+
+ return res;
+}
+
+#endif
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2009-10-20 18:19:00 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2010-06-01 10:45:29 +0000
@@ -362,8 +362,8 @@ private:
* Used in polling if exists TCP_Transporter
*/
int tcpReadSelectReply;
- fd_set tcpReadset;
-
+ ndb_socket_poller m_socket_poller;
+
Uint32 poll_TCP(Uint32 timeOutMillis);
Uint32 poll_SCI(Uint32 timeOutMillis);
Uint32 poll_SHM(Uint32 timeOutMillis);
=== modified file 'storage/ndb/include/util/SocketClient.hpp'
--- a/storage/ndb/include/util/SocketClient.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/include/util/SocketClient.hpp 2010-06-01 10:45:29 +0000
@@ -25,7 +25,7 @@ class SocketAuthenticator;
class SocketClient
{
struct sockaddr_in m_servaddr;
- unsigned int m_connect_timeout_sec;
+ unsigned int m_connect_timeout_millisec;
unsigned short m_port;
char *m_server_name;
SocketAuthenticator *m_auth;
@@ -37,8 +37,8 @@ public:
m_port = port;
m_servaddr.sin_port = htons(m_port);
};
- void set_connect_timeout(unsigned int s) {
- m_connect_timeout_sec= s;
+ void set_connect_timeout(unsigned int timeout_millisec) {
+ m_connect_timeout_millisec = timeout_millisec;
}
unsigned short get_port() { return m_port; };
char *get_server_name() { return m_server_name; };
=== modified file 'storage/ndb/include/util/SocketServer.hpp'
--- a/storage/ndb/include/util/SocketServer.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/include/util/SocketServer.hpp 2010-06-01 10:45:29 +0000
@@ -127,9 +127,10 @@ private:
NdbLockable m_session_mutex;
Vector<SessionInstance> m_sessions;
MutexVector<ServiceInstance> m_services;
+ ndb_socket_poller m_services_poller;
unsigned m_maxSessions;
-
- void doAccept();
+
+ bool doAccept();
void checkSessionsImpl();
void startSession(SessionInstance &);
=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2009-12-15 15:37:38 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2010-06-01 10:45:29 +0000
@@ -86,7 +86,8 @@ TCP_Transporter::TCP_Transporter(Transpo
0, false,
conf->checksum,
conf->signalId),
- m_sendBuffer(conf->tcp.sendBufferSize)
+ m_sendBuffer(conf->tcp.sendBufferSize),
+ m_poll_index(~0)
{
maxReceiveSize = conf->tcp.maxReceiveSize;
@@ -257,20 +258,15 @@ TCP_Transporter::setSocketNonBlocking(ND
#endif
bool
-TCP_Transporter::sendIsPossible(struct timeval * timeout) {
- if(theSocket != NDB_INVALID_SOCKET){
- fd_set writeset;
- FD_ZERO(&writeset);
- FD_SET(theSocket, &writeset);
-
- int selectReply = select(theSocket + 1, NULL, &writeset, NULL, timeout);
+TCP_Transporter::sendIsPossible(int timeout_millisec) const
+{
+ if (!my_socket_valid(theSocket))
+ return false;
- if ((selectReply > 0) && FD_ISSET(theSocket, &writeset))
- return true;
- else
- return false;
- }
- return false;
+ if (ndb_poll(theSocket, false, true, false, timeout_millisec) <= 0)
+ return false; // Timeout or error occured
+
+ return true;
}
Uint32
@@ -283,15 +279,13 @@ Uint32 *
TCP_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio){
Uint32 * insertPtr = m_sendBuffer.getInsertPtr(lenBytes);
-
- struct timeval timeout = {0, 10000};
if (insertPtr == 0) {
//-------------------------------------------------
// Buffer was completely full. We have severe problems.
// We will attempt to wait for a small time
//-------------------------------------------------
- if(sendIsPossible(&timeout)) {
+ if(sendIsPossible(10)) {
//-------------------------------------------------
// Send is possible after the small timeout.
//-------------------------------------------------
@@ -326,8 +320,7 @@ TCP_Transporter::updateWritePtr(Uint32 l
// we will not worry since we will soon be back for
// a renewed trial.
//-------------------------------------------------
- struct timeval no_timeout = {0,0};
- if(sendIsPossible(&no_timeout)) {
+ if(sendIsPossible(0)) {
//-------------------------------------------------
// Send was possible, attempt at a send.
//-------------------------------------------------
=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2010-06-01 10:45:29 +0000
@@ -139,7 +139,7 @@ private:
static bool setSocketNonBlocking(NDB_SOCKET_TYPE aSocket);
virtual int pre_connect_options(NDB_SOCKET_TYPE aSocket);
- bool sendIsPossible(struct timeval * timeout);
+ bool sendIsPossible(int timeout_millisec) const;
/**
* Statistics
@@ -157,6 +157,10 @@ private:
*/
Uint32 overloadedPct;
void update_status_overloaded();
+
+ unsigned m_poll_index;
+ void set_poll_index(unsigned index) { m_poll_index = index; };
+ unsigned get_poll_index(void) const { return m_poll_index; };
};
inline
=== modified file 'storage/ndb/src/common/transporter/Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Transporter.cpp 2009-08-03 12:54:45 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.cpp 2010-06-01 10:45:29 +0000
@@ -89,7 +89,7 @@ Transporter::Transporter(TransporterRegi
new SocketAuthSimple("ndbd",
"ndbd passwd"));
- m_socket_client->set_connect_timeout((m_timeOutMillis+999)/1000);
+ m_socket_client->set_connect_timeout(m_timeOutMillis);
}
DBUG_VOID_RETURN;
}
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2010-05-07 13:08:39 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2010-06-01 11:47:29 +0000
@@ -218,7 +218,11 @@ TransporterRegistry::init(NodeId nodeId)
localNodeId = nodeId;
DEBUG("TransporterRegistry started node: " << localNodeId);
-
+
+ if (!m_socket_poller.set_max_count(maxTransporters +
+ 1 /* wakeup socket */))
+ DBUG_RETURN(false);
+
DBUG_RETURN(true);
}
@@ -284,15 +288,6 @@ TransporterRegistry::connect_server(NDB_
g_eventLogger->error("Incompatible configuration: Transporter type "
"mismatch with node %d", nodeId);
- // wait for socket close for 1 second to let message arrive at client
- {
- fd_set a_set;
- FD_ZERO(&a_set);
- FD_SET(sockfd, &a_set);
- struct timeval timeout;
- timeout.tv_sec = 1; timeout.tv_usec = 0;
- select(sockfd+1, &a_set, 0, 0, &timeout);
- }
DBUG_RETURN(false);
}
}
@@ -877,68 +872,38 @@ Uint32
TransporterRegistry::poll_TCP(Uint32 timeOutMillis)
{
bool hasdata = false;
- if (false && nTCPTransporters == 0)
- {
- tcpReadSelectReply = 0;
- return 0;
- }
-
- NDB_SOCKET_TYPE maxSocketValue = -1;
-
- // Needed for TCP/IP connections
- // The read- and writeset are used by select
-
- FD_ZERO(&tcpReadset);
- // Prepare for sending and receiving
- for (int i = 0; i < nTCPTransporters; i++) {
- TCP_Transporter * t = theTCPTransporters[i];
- Uint32 node_id= t->getRemoteNodeId();
-
- // If the transporter is connected
- if (is_connected(node_id) && t->isConnected()) {
-
- const NDB_SOCKET_TYPE socket = t->getSocket();
- // Find the highest socket value. It will be used by select
- if (socket > maxSocketValue)
- maxSocketValue = socket;
-
- // Put the connected transporters in the socket read-set
- FD_SET(socket, &tcpReadset);
- }
- hasdata |= t->hasReceiveData();
- }
-
+ m_socket_poller.clear();
+
if (m_has_extra_wakeup_socket)
{
const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
- if (socket > maxSocketValue)
- maxSocketValue = socket;
- // Put the wakup-socket in the read-set
- FD_SET(socket, &tcpReadset);
+ // Poll the wakup-socket for read
+ m_socket_poller.add(socket, true, false, false);
}
- timeOutMillis = hasdata ? 0 : timeOutMillis;
-
- struct timeval timeout;
- timeout.tv_sec = timeOutMillis / 1000;
- timeout.tv_usec = (timeOutMillis % 1000) * 1000;
+ for (int i = 0; i < nTCPTransporters; i++)
+ {
+ unsigned index = ~0;
+ TCP_Transporter * t = theTCPTransporters[i];
+ const Uint32 node_id = t->getRemoteNodeId();
+ const NDB_SOCKET_TYPE socket = t->getSocket();
- // The highest socket value plus one
- maxSocketValue++;
-
- tcpReadSelectReply = select(maxSocketValue, &tcpReadset, 0, 0, &timeout);
- if(false && tcpReadSelectReply == -1 && errno == EINTR)
- g_eventLogger->info("woke-up by signal");
+ if (is_connected(node_id) && t->isConnected() &&
+ my_socket_valid(socket))
+ {
+ // Poll the connected transporter for read
+ index = m_socket_poller.add(socket, true, false, false);
+ }
+ // Remember the index into poll list
+ t->set_poll_index(index);
-#ifdef NDB_WIN32
- if(tcpReadSelectReply == SOCKET_ERROR)
- {
- NdbSleep_MilliSleep(timeOutMillis);
+ hasdata |= t->hasReceiveData();
}
-#endif
-
+
+ tcpReadSelectReply = m_socket_poller.poll(hasdata ? 0 : timeOutMillis);
+
return tcpReadSelectReply || hasdata;
}
#endif
@@ -1057,8 +1022,10 @@ TransporterRegistry::performReceive()
{
if (m_has_extra_wakeup_socket)
{
- if (FD_ISSET(m_extra_wakeup_sockets[0], &tcpReadset))
+ // The wakeupsocket is always added first => use index 0
+ if (m_socket_poller.has_read(0))
{
+ assert(m_socket_poller.is_socket_equal(0, m_extra_wakeup_sockets[0]));
consume_extra_sockets();
}
}
@@ -1068,14 +1035,19 @@ TransporterRegistry::performReceive()
checkJobBuffer();
TCP_Transporter *t = theTCPTransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
- const NDB_SOCKET_TYPE socket = t->getSocket();
+
if(is_connected(nodeId)){
if(t->isConnected())
{
- if (FD_ISSET(socket, &tcpReadset))
+ const unsigned index = t->get_poll_index();
+ if (index != (unsigned)~0 &&
+ m_socket_poller.has_read(index))
{
+ assert(m_socket_poller.is_socket_equal(index, t->getSocket()));
t->doReceive();
}
+ // Reset the index into poll list
+ t->set_poll_index(~0);
if (t->hasReceiveData())
{
=== modified file 'storage/ndb/src/common/util/SocketClient.cpp'
--- a/storage/ndb/src/common/util/SocketClient.cpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/common/util/SocketClient.cpp 2010-06-01 10:45:29 +0000
@@ -18,18 +18,17 @@
#include <ndb_global.h>
-#include <NdbOut.hpp>
#include <SocketClient.hpp>
#include <SocketAuthenticator.hpp>
-SocketClient::SocketClient(const char *server_name, unsigned short port, SocketAuthenticator *sa)
+SocketClient::SocketClient(const char *server_name, unsigned short port, SocketAuthenticator *sa) :
+ m_connect_timeout_millisec(0) // Blocking connect by default
{
m_auth= sa;
m_port= port;
m_server_name= server_name ? strdup(server_name) : 0;
m_sockfd= NDB_INVALID_SOCKET;
- m_connect_timeout_sec= 0;
}
SocketClient::~SocketClient()
@@ -108,19 +107,10 @@ SocketClient::bind(const char* bindaddre
NDB_SOCKET_TYPE
SocketClient::connect(const char *toaddress, unsigned short toport)
{
- fd_set rset, wset;
- struct timeval tval;
- int r;
- bool use_timeout;
- SOCKOPT_OPTLEN_TYPE len;
- int flags;
-
- if (m_sockfd == NDB_INVALID_SOCKET)
+ if (!my_socket_valid(m_sockfd))
{
- if (!init()) {
-#ifdef VM_TRACE
- ndbout << "SocketClient::connect() failed " << m_server_name << " " << m_port << endl;
-#endif
+ if (!init())
+ {
return NDB_INVALID_SOCKET;
}
}
@@ -139,52 +129,53 @@ SocketClient::connect(const char *toaddr
return NDB_INVALID_SOCKET;
}
- flags= fcntl(m_sockfd, F_GETFL, 0);
+ // Set socket non blocking
+ const int flags= fcntl(m_sockfd, F_GETFL, 0);
fcntl(m_sockfd, F_SETFL, flags | O_NONBLOCK);
- r= ::connect(m_sockfd, (struct sockaddr*) &m_servaddr, sizeof(m_servaddr));
-
+ // Start non blocking connect
+ int r = ::connect(m_sockfd,
+ (struct sockaddr*) &m_servaddr, sizeof(m_servaddr));
if (r == 0)
goto done; // connected immediately.
if (r < 0 && (errno != EINPROGRESS)) {
+ // Start of non blocking connect failed
NDB_CLOSE_SOCKET(m_sockfd);
m_sockfd= NDB_INVALID_SOCKET;
return NDB_INVALID_SOCKET;
}
- FD_ZERO(&rset);
- FD_SET(m_sockfd, &rset);
- wset= rset;
- tval.tv_sec= m_connect_timeout_sec;
- tval.tv_usec= 0;
- use_timeout= m_connect_timeout_sec;
-
- if ((r= select(m_sockfd+1, &rset, &wset, NULL,
- use_timeout? &tval : NULL)) == 0)
+ if (ndb_poll(m_sockfd, true, true, true,
+ m_connect_timeout_millisec > 0 ?
+ m_connect_timeout_millisec : -1) <= 0)
{
+ // Nothing has happened on the socket after timeout
+ // or an error occured
NDB_CLOSE_SOCKET(m_sockfd);
m_sockfd= NDB_INVALID_SOCKET;
return NDB_INVALID_SOCKET;
}
- if (FD_ISSET(m_sockfd, &rset) || FD_ISSET(m_sockfd, &wset))
+ // Activity detected on the socket
+
{
- len= sizeof(r);
- if (getsockopt(m_sockfd, SOL_SOCKET, SO_ERROR, &r, &len) < 0 || r)
+ // Check socket level error code
+ int so_error = 0;
+ SOCKOPT_OPTLEN_TYPE len= sizeof(so_error);
+ if (getsockopt(m_sockfd, SOL_SOCKET, SO_ERROR, &so_error, &len) < 0)
+ {
+ NDB_CLOSE_SOCKET(m_sockfd);
+ m_sockfd= NDB_INVALID_SOCKET;
+ return NDB_INVALID_SOCKET;
+ }
+
+ if (so_error)
{
- // Solaris got an error... different than others
NDB_CLOSE_SOCKET(m_sockfd);
m_sockfd= NDB_INVALID_SOCKET;
return NDB_INVALID_SOCKET;
}
- }
- else
- {
- // select error, probably m_sockfd not set.
- NDB_CLOSE_SOCKET(m_sockfd);
- m_sockfd= NDB_INVALID_SOCKET;
- return NDB_INVALID_SOCKET;
}
done:
=== modified file 'storage/ndb/src/common/util/SocketServer.cpp'
--- a/storage/ndb/src/common/util/SocketServer.cpp 2009-09-08 12:23:44 +0000
+++ b/storage/ndb/src/common/util/SocketServer.cpp 2010-06-01 10:45:29 +0000
@@ -152,60 +152,77 @@ SocketServer::setup(SocketServer::Servic
i.m_service = service;
m_services.push_back(i);
+ // Increase size to allow polling all listening ports
+ m_services_poller.set_max_count(m_services.size());
+
*port = ntohs(servaddr.sin_port);
DBUG_RETURN(true);
}
-void
-SocketServer::doAccept(){
- fd_set readSet, exceptionSet;
- FD_ZERO(&readSet);
- FD_ZERO(&exceptionSet);
-
+
+bool
+SocketServer::doAccept()
+{
m_services.lock();
- int maxSock = 0;
- for (unsigned i = 0; i < m_services.size(); i++){
- const NDB_SOCKET_TYPE s = m_services[i].m_socket;
- FD_SET(s, &readSet);
- FD_SET(s, &exceptionSet);
- maxSock = (maxSock > s ? maxSock : s);
- }
- struct timeval timeout;
- timeout.tv_sec = 1;
- timeout.tv_usec = 0;
-
- if(select(maxSock + 1, &readSet, 0, &exceptionSet, &timeout) > 0){
- for (unsigned i = 0; i < m_services.size(); i++){
- ServiceInstance & si = m_services[i];
-
- if(FD_ISSET(si.m_socket, &readSet)){
- NDB_SOCKET_TYPE childSock = accept(si.m_socket, 0, 0);
- if(childSock == NDB_INVALID_SOCKET){
- continue;
- }
-
- SessionInstance s;
- s.m_service = si.m_service;
- s.m_session = si.m_service->newSession(childSock);
- if(s.m_session != 0)
- {
- m_session_mutex.lock();
- m_sessions.push_back(s);
- startSession(m_sessions.back());
- m_session_mutex.unlock();
- }
-
- continue;
- }
-
- if(FD_ISSET(si.m_socket, &exceptionSet)){
- DEBUG("socket in the exceptionSet");
- continue;
- }
+
+ m_services_poller.clear();
+ for (unsigned i = 0; i < m_services.size(); i++)
+ {
+ m_services_poller.add(m_services[i].m_socket, true, false, true);
+ }
+ assert(m_services.size() == m_services_poller.count());
+
+ const int accept_timeout_ms = 1000;
+ const int ret = m_services_poller.poll(accept_timeout_ms);
+ if (ret < 0)
+ {
+ // Error occured, indicate error to caller by returning false
+ m_services.unlock();
+ return false;
+ }
+
+ if (ret == 0)
+ {
+ // Timeout occured
+ m_services.unlock();
+ return true;
+ }
+
+ bool result = true;
+ for (unsigned i = 0; i < m_services_poller.count(); i++)
+ {
+ const bool has_read = m_services_poller.has_read(i);
+
+ if (!has_read)
+ continue; // Ignore events where read flag wasn't set
+
+ ServiceInstance & si = m_services[i];
+ assert(m_services_poller.is_socket_equal(i, si.m_socket));
+
+ const NDB_SOCKET_TYPE childSock = accept(si.m_socket, 0, 0);
+ if (!my_socket_valid(childSock))
+ {
+ // Could not 'accept' socket(maybe at max fds), indicate error
+ // to caller by returning false
+ result = false;
+ continue;
+ }
+
+ SessionInstance s;
+ s.m_service = si.m_service;
+ s.m_session = si.m_service->newSession(childSock);
+ if (s.m_session != 0)
+ {
+ m_session_mutex.lock();
+ m_sessions.push_back(s);
+ startSession(m_sessions.back());
+ m_session_mutex.unlock();
}
}
+
m_services.unlock();
+ return result;
}
extern "C"
@@ -260,11 +277,16 @@ SocketServer::doRun(){
while(!m_stopThread){
m_session_mutex.lock();
checkSessionsImpl();
- if(m_sessions.size() < m_maxSessions){
- m_session_mutex.unlock();
- doAccept();
- } else {
- m_session_mutex.unlock();
+ m_session_mutex.unlock();
+
+ if(m_sessions.size() >= m_maxSessions){
+ // Don't accept more connections yet
+ NdbSleep_MilliSleep(200);
+ continue;
+ }
+
+ if (!doAccept()){
+ // accept failed, step back
NdbSleep_MilliSleep(200);
}
}
=== modified file 'storage/ndb/src/common/util/socket_io.cpp'
--- a/storage/ndb/src/common/util/socket_io.cpp 2009-05-26 18:53:34 +0000
+++ b/storage/ndb/src/common/util/socket_io.cpp 2010-06-01 10:45:29 +0000
@@ -29,16 +29,8 @@ read_socket(NDB_SOCKET_TYPE socket, int
char * buf, int buflen){
if(buflen < 1)
return 0;
-
- fd_set readset;
- FD_ZERO(&readset);
- FD_SET(socket, &readset);
-
- struct timeval timeout;
- timeout.tv_sec = (timeout_millis / 1000);
- timeout.tv_usec = (timeout_millis % 1000) * 1000;
- const int selectRes = select(socket + 1, &readset, 0, 0, &timeout);
+ const int selectRes = ndb_poll(socket, true, false, false, timeout_millis);
if(selectRes == 0)
return 0;
@@ -56,20 +48,11 @@ readln_socket(NDB_SOCKET_TYPE socket, in
if(buflen <= 1)
return 0;
- fd_set readset;
- FD_ZERO(&readset);
- FD_SET(socket, &readset);
-
- struct timeval timeout;
- timeout.tv_sec = (timeout_millis / 1000);
- timeout.tv_usec = (timeout_millis % 1000) * 1000;
-
if(mutex)
NdbMutex_Unlock(mutex);
Uint64 tick= NdbTick_CurrentMillisecond();
- const int selectRes = select(socket + 1, &readset, 0, 0, &timeout);
-
- *time= NdbTick_CurrentMillisecond() - tick;
+ int selectRes = ndb_poll(socket, true, false, false, timeout_millis);
+ *time= (int)(NdbTick_CurrentMillisecond() - tick);
if(mutex)
NdbMutex_Lock(mutex);
@@ -134,14 +117,10 @@ readln_socket(NDB_SOCKET_TYPE socket, in
tmp -= t;
}
- FD_ZERO(&readset);
- FD_SET(socket, &readset);
- timeout.tv_sec = ((timeout_millis - *time) / 1000);
- timeout.tv_usec = ((timeout_millis - *time) % 1000) * 1000;
-
tick= NdbTick_CurrentMillisecond();
- const int selectRes = select(socket + 1, &readset, 0, 0, &timeout);
- *time= NdbTick_CurrentMillisecond() - tick;
+ int selectRes = ndb_poll(socket, true, false, false,
+ timeout_millis - *time);
+ *time= (int)(NdbTick_CurrentMillisecond() - tick);
if(selectRes != 1){
return -1;
@@ -155,17 +134,10 @@ extern "C"
int
write_socket(NDB_SOCKET_TYPE socket, int timeout_millis, int *time,
const char buf[], int len){
- fd_set writeset;
- FD_ZERO(&writeset);
- FD_SET(socket, &writeset);
- struct timeval timeout;
- timeout.tv_sec = (timeout_millis / 1000);
- timeout.tv_usec = (timeout_millis % 1000) * 1000;
-
Uint64 tick= NdbTick_CurrentMillisecond();
- const int selectRes = select(socket + 1, 0, &writeset, 0, &timeout);
- *time= NdbTick_CurrentMillisecond() - tick;
+ const int selectRes = ndb_poll(socket, false, true, false, timeout_millis);
+ *time= (int)(NdbTick_CurrentMillisecond() - tick);
if(selectRes != 1){
return -1;
@@ -182,15 +154,11 @@ write_socket(NDB_SOCKET_TYPE socket, int
if(len == 0)
break;
-
- FD_ZERO(&writeset);
- FD_SET(socket, &writeset);
- timeout.tv_sec = ((timeout_millis - *time) / 1000);
- timeout.tv_usec = ((timeout_millis - *time) % 1000) * 1000;
Uint64 tick= NdbTick_CurrentMillisecond();
- const int selectRes2 = select(socket + 1, 0, &writeset, 0, &timeout);
- *time= NdbTick_CurrentMillisecond() - tick;
+ const int selectRes2 = ndb_poll(socket, false, true, false,
+ timeout_millis - *time);
+ *time= (int)(NdbTick_CurrentMillisecond() - tick);
if(selectRes2 != 1){
return -1;
=== modified file 'storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp 2009-12-14 10:58:03 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp 2010-06-02 09:12:29 +0000
@@ -924,7 +924,7 @@ bool Dbtup::readTriggerInfo(TupTriggerDa
numAttrsToRead = setAttrIds(attributeMask, regTabPtr->m_no_of_attributes,
&readBuffer[0]);
}
- ndbrequire(numAttrsToRead < MAX_ATTRIBUTES_IN_TABLE);
+ ndbrequire(numAttrsToRead <= MAX_ATTRIBUTES_IN_TABLE);
//--------------------------------------------------------------------
// Read Main tuple values
//--------------------------------------------------------------------
=== modified file 'storage/ndb/src/kernel/vm/SimulatedBlock.cpp'
--- a/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2010-01-25 15:36:04 +0000
+++ b/storage/ndb/src/kernel/vm/SimulatedBlock.cpp 2010-06-02 07:36:20 +0000
@@ -2398,7 +2398,9 @@ SimulatedBlock::sendFragmentedSignal(Blo
}
tmp.p->m_callback = c;
- if(!c_fragSenderRunning){
+ if(!c_fragSenderRunning)
+ {
+ SaveSignal<2> save(signal);
c_fragSenderRunning = true;
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
sig->type = ContinueFragmented::CONTINUE_SENDING;
@@ -2439,7 +2441,9 @@ SimulatedBlock::sendFragmentedSignal(Nod
}
tmp.p->m_callback = c;
- if(!c_fragSenderRunning){
+ if(!c_fragSenderRunning)
+ {
+ SaveSignal<2> save(signal);
c_fragSenderRunning = true;
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
sig->type = ContinueFragmented::CONTINUE_SENDING;
@@ -2489,7 +2493,9 @@ SimulatedBlock::sendFragmentedSignal(Blo
}
tmp.p->m_callback = c;
- if(!c_fragSenderRunning){
+ if(!c_fragSenderRunning)
+ {
+ SaveSignal<2> save(signal);
c_fragSenderRunning = true;
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
sig->type = ContinueFragmented::CONTINUE_SENDING;
@@ -2532,7 +2538,9 @@ SimulatedBlock::sendFragmentedSignal(Nod
}
tmp.p->m_callback = c;
- if(!c_fragSenderRunning){
+ if(!c_fragSenderRunning)
+ {
+ SaveSignal<2> save(signal);
c_fragSenderRunning = true;
ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend();
sig->type = ContinueFragmented::CONTINUE_SENDING;
=== modified file 'storage/ndb/src/kernel/vm/VMSignal.hpp'
--- a/storage/ndb/src/kernel/vm/VMSignal.hpp 2009-10-21 08:40:24 +0000
+++ b/storage/ndb/src/kernel/vm/VMSignal.hpp 2010-06-02 07:36:20 +0000
@@ -120,6 +120,37 @@ public:
void garbage_register();
};
+template<Uint32 len>
+class SaveSignal
+{
+ Uint32 m_copy[len];
+ Signal * m_signal;
+
+public:
+ SaveSignal(Signal* signal) {
+ save(signal);
+ }
+
+ void save(Signal* signal) {
+ m_signal = signal;
+ for (Uint32 i = 0; i<len; i++)
+ m_copy[i] = m_signal->theData[i];
+ }
+
+ void clear() { m_signal = 0;}
+
+ void restore() {
+ for (Uint32 i = 0; i<len; i++)
+ m_signal->theData[i] = m_copy[i];
+ }
+
+ ~SaveSignal() {
+ if (m_signal)
+ restore();
+ clear();
+ }
+};
+
inline
Uint32
Signal::getLength() const {
=== modified file 'storage/ndb/src/mgmapi/mgmapi.cpp'
--- a/storage/ndb/src/mgmapi/mgmapi.cpp 2010-03-04 14:31:16 +0000
+++ b/storage/ndb/src/mgmapi/mgmapi.cpp 2010-06-01 10:45:29 +0000
@@ -562,7 +562,7 @@ ndb_mgm_connect(NdbMgmHandle handle, int
SocketClient s(0, 0);
const char *bind_address= NULL;
unsigned short bind_address_port= 0;
- s.set_connect_timeout((handle->timeout+999)/1000);
+ s.set_connect_timeout(handle->timeout);
if (!s.init())
{
fprintf(handle->errstream,
@@ -1764,7 +1764,7 @@ ndb_mgm_listen_event_internal(NdbMgmHand
int port= ndb_mgm_get_connected_port(handle);
const char *bind_address= ndb_mgm_get_connected_bind_address(handle);
SocketClient s(0, 0);
- s.set_connect_timeout((handle->timeout+999)/1000);
+ s.set_connect_timeout(handle->timeout);
if (!s.init())
{
fprintf(handle->errstream, "Unable to create socket");
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-6.3 branch (Martin.Skold:3208) Bug#34303Bug#54155 Bug#54168 | Martin Skold | 2 Jun |