From: Jonas Oreland Date: January 19 2012 11:49am Subject: bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4804 to 4805) List-Archive: http://lists.mysql.com/commits/142462 Message-Id: <20120119114926.3B58955C19E@perch.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4805 Jonas Oreland 2012-01-19 ndb - allow TransporterReceiveData to handle subset of transporters (for mt recv) modified: storage/ndb/include/transporter/TransporterRegistry.hpp storage/ndb/src/common/transporter/TransporterRegistry.cpp 4804 Martin Skold 2012-01-19 Bug#13579318 LIKE SEARCH DOESN'T MATCH ANY ROWS ON A MULTI BYTE CHARSET COLUMN: Passing actual string length to scan filter predicate modified: mysql-test/suite/ndb/r/ndb_condition_pushdown.result mysql-test/suite/ndb/t/ndb_condition_pushdown.test sql/ha_ndbcluster_cond.cc === modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp' --- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-01-16 13:56:30 +0000 +++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-01-19 11:48:54 +0000 @@ -103,6 +103,11 @@ struct TransporterReceiveData bool epoll_add(TCP_Transporter*); /** + * Bitmask of transporters currently handled by this instance + */ + NodeBitmask m_transporters; + + /** * Bitmask of transporters that has data "carried over" since * last performReceive */ === modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp' --- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-01-18 12:35:51 +0000 +++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-01-19 11:48:54 +0000 @@ -76,6 +76,13 @@ SocketServer::Session * TransporterServi TransporterReceiveData::TransporterReceiveData() { + /** + * With multi receiver threads + * an interface to reassign these is needed... + */ + m_transporters.set(); // Handle all + m_transporters.clear(Uint32(0)); // Except wakeup socket... + #if defined(HAVE_EPOLL_CREATE) m_epoll_fd = -1; m_epoll_events = 0; @@ -111,6 +118,7 @@ fallback: bool TransporterReceiveData::epoll_add(TCP_Transporter *t) { + assert(m_transporters.get(t->getRemoteNodeId())); #if defined(HAVE_EPOLL_CREATE) if (m_epoll_fd != -1) { @@ -382,7 +390,7 @@ TransporterRegistry::init(NodeId nodeId) if (receiveHandle) { - if (!receiveHandle->init(maxTransporters)) + if (!init(* receiveHandle)) DBUG_RETURN(false); } @@ -992,6 +1000,8 @@ TransporterRegistry::setup_wakeup_socket return true; } + assert(!recvdata.m_transporters.get(0)); + if (my_socketpair(m_extra_wakeup_sockets)) { perror("socketpair failed!"); @@ -1025,6 +1035,7 @@ TransporterRegistry::setup_wakeup_socket } #endif m_has_extra_wakeup_socket = true; + recvdata.m_transporters.set(Uint32(0)); return true; err: @@ -1109,6 +1120,11 @@ TransporterRegistry::pollReceive(Uint32 continue; } #endif + /** + * check that it's assigned to "us" + */ + assert(recvdata.m_transporters.get(trpid)); + recvdata.m_has_data_transporters.set(trpid); } } @@ -1153,6 +1169,10 @@ TransporterRegistry::poll_SCI(Uint32 tim { SCI_Transporter * t = theSCITransporters[i]; Uint32 node_id = t->getRemoteNodeId(); + + if (!recvdata.m_transporters.get(nodeId)) + continue; + if (t->isConnected() && is_connected(node_id)) { if (t->hasDataToRead()) @@ -1182,6 +1202,10 @@ TransporterRegistry::poll_SHM(Uint32 tim { SHM_Transporter * t = theSHMTransporters[i]; Uint32 node_id = t->getRemoteNodeId(); + + if (!recvdata.m_transporters.get(node_id)) + continue; + if (t->isConnected() && is_connected(node_id)) { if (t->hasDataToRead()) @@ -1214,7 +1238,7 @@ TransporterRegistry::poll_TCP(Uint32 tim recvdata.m_socket_poller.clear(); - if (m_has_extra_wakeup_socket) + if (m_has_extra_wakeup_socket && recvdata.m_transporters.get(0)) { const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0]; @@ -1229,6 +1253,9 @@ TransporterRegistry::poll_TCP(Uint32 tim const NDB_SOCKET_TYPE socket = t->getSocket(); Uint32 node_id = t->getRemoteNodeId(); + if (!recvdata.m_transporters.get(node_id)) + continue; + if (is_connected(node_id) && t->isConnected() && my_socket_valid(socket)) { idx[i] = recvdata.m_socket_poller.add(socket, true, false, false); @@ -1285,6 +1312,7 @@ TransporterRegistry::performReceive(Tran if (recvdata.m_has_data_transporters.get(0)) { + assert(recvdata.m_transporters.get(0)); recvdata.m_has_data_transporters.clear(Uint32(0)); consume_extra_sockets(); } @@ -1309,6 +1337,9 @@ TransporterRegistry::performReceive(Tran { bool hasdata = false; TCP_Transporter * t = (TCP_Transporter*)theTransporters[id]; + + assert(recvdata.m_transporters.get(id)); + if (is_connected(id)) { if (t->isConnected()) @@ -1337,6 +1368,7 @@ TransporterRegistry::performReceive(Tran { SCI_Transporter *t = theSCITransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); + assert(recvdata.m_transporters.get(nodeId)); if(is_connected(nodeId)) { if(t->isConnected() && t->checkConnected()) @@ -1358,6 +1390,7 @@ TransporterRegistry::performReceive(Tran { SHM_Transporter *t = theSHMTransporters[i]; const NodeId nodeId = t->getRemoteNodeId(); + assert(recvdata.m_transporters.get(nodeId)); if(is_connected(nodeId)){ if(t->isConnected() && t->checkConnected()) { @@ -1509,6 +1542,7 @@ TransporterRegistry::blockReceive(Transp NodeId nodeId) { assert((receiveHandle == &recvdata) || (receiveHandle == 0)); + assert(recvdata.m_transporters.get(nodeId)); /* Check that node is not already blocked? * Stop pulling from its socket (but track received data etc) @@ -1531,6 +1565,7 @@ TransporterRegistry::unblockReceive(Tran NodeId nodeId) { assert((receiveHandle == &recvdata) || (receiveHandle == 0)); + assert(recvdata.m_transporters.get(nodeId)); /* Check that node is blocked? * Resume pulling from its socket @@ -1644,6 +1679,7 @@ TransporterRegistry::report_connect(Tran NodeId node_id) { assert((receiveHandle == &recvdata) || (receiveHandle == 0)); + assert(recvdata.m_transporters.get(node_id)); DBUG_ENTER("TransporterRegistry::report_connect"); DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id)); @@ -1677,6 +1713,7 @@ TransporterRegistry::report_disconnect(T NodeId node_id, int errnum) { assert((receiveHandle == &recvdata) || (receiveHandle == 0)); + assert(recvdata.m_transporters.get(node_id)); DBUG_ENTER("TransporterRegistry::report_disconnect"); DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id)); @@ -1735,6 +1772,8 @@ TransporterRegistry::update_connections( n++; const NodeId nodeId = t->getRemoteNodeId(); + if (!recvdata.m_transporters.get(nodeId)) + continue; TransporterError code = m_error_states[nodeId].m_code; const char *info = m_error_states[nodeId].m_info; No bundle (reason: useless for push emails).