4805 Jonas Oreland 2012-01-19
ndb - allow TransporterReceiveData to handle subset of transporters (for mt recv)
modified:
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
4804 Martin Skold 2012-01-19
Bug#13579318 LIKE SEARCH DOESN'T MATCH ANY ROWS ON A MULTI BYTE CHARSET COLUMN: Passing actual string length to scan filter predicate
modified:
mysql-test/suite/ndb/r/ndb_condition_pushdown.result
mysql-test/suite/ndb/t/ndb_condition_pushdown.test
sql/ha_ndbcluster_cond.cc
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-01-16 13:56:30 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-01-19 11:48:54 +0000
@@ -103,6 +103,11 @@ struct TransporterReceiveData
bool epoll_add(TCP_Transporter*);
/**
+ * Bitmask of transporters currently handled by this instance
+ */
+ NodeBitmask m_transporters;
+
+ /**
* Bitmask of transporters that has data "carried over" since
* last performReceive
*/
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-01-18 12:35:51 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-01-19 11:48:54 +0000
@@ -76,6 +76,13 @@ SocketServer::Session * TransporterServi
TransporterReceiveData::TransporterReceiveData()
{
+ /**
+ * With multi receiver threads
+ * an interface to reassign these is needed...
+ */
+ m_transporters.set(); // Handle all
+ m_transporters.clear(Uint32(0)); // Except wakeup socket...
+
#if defined(HAVE_EPOLL_CREATE)
m_epoll_fd = -1;
m_epoll_events = 0;
@@ -111,6 +118,7 @@ fallback:
bool
TransporterReceiveData::epoll_add(TCP_Transporter *t)
{
+ assert(m_transporters.get(t->getRemoteNodeId()));
#if defined(HAVE_EPOLL_CREATE)
if (m_epoll_fd != -1)
{
@@ -382,7 +390,7 @@ TransporterRegistry::init(NodeId nodeId)
if (receiveHandle)
{
- if (!receiveHandle->init(maxTransporters))
+ if (!init(* receiveHandle))
DBUG_RETURN(false);
}
@@ -992,6 +1000,8 @@ TransporterRegistry::setup_wakeup_socket
return true;
}
+ assert(!recvdata.m_transporters.get(0));
+
if (my_socketpair(m_extra_wakeup_sockets))
{
perror("socketpair failed!");
@@ -1025,6 +1035,7 @@ TransporterRegistry::setup_wakeup_socket
}
#endif
m_has_extra_wakeup_socket = true;
+ recvdata.m_transporters.set(Uint32(0));
return true;
err:
@@ -1109,6 +1120,11 @@ TransporterRegistry::pollReceive(Uint32
continue;
}
#endif
+ /**
+ * check that it's assigned to "us"
+ */
+ assert(recvdata.m_transporters.get(trpid));
+
recvdata.m_has_data_transporters.set(trpid);
}
}
@@ -1153,6 +1169,10 @@ TransporterRegistry::poll_SCI(Uint32 tim
{
SCI_Transporter * t = theSCITransporters[i];
Uint32 node_id = t->getRemoteNodeId();
+
+ if (!recvdata.m_transporters.get(nodeId))
+ continue;
+
if (t->isConnected() && is_connected(node_id))
{
if (t->hasDataToRead())
@@ -1182,6 +1202,10 @@ TransporterRegistry::poll_SHM(Uint32 tim
{
SHM_Transporter * t = theSHMTransporters[i];
Uint32 node_id = t->getRemoteNodeId();
+
+ if (!recvdata.m_transporters.get(node_id))
+ continue;
+
if (t->isConnected() && is_connected(node_id))
{
if (t->hasDataToRead())
@@ -1214,7 +1238,7 @@ TransporterRegistry::poll_TCP(Uint32 tim
recvdata.m_socket_poller.clear();
- if (m_has_extra_wakeup_socket)
+ if (m_has_extra_wakeup_socket && recvdata.m_transporters.get(0))
{
const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
@@ -1229,6 +1253,9 @@ TransporterRegistry::poll_TCP(Uint32 tim
const NDB_SOCKET_TYPE socket = t->getSocket();
Uint32 node_id = t->getRemoteNodeId();
+ if (!recvdata.m_transporters.get(node_id))
+ continue;
+
if (is_connected(node_id) && t->isConnected() && my_socket_valid(socket))
{
idx[i] = recvdata.m_socket_poller.add(socket, true, false, false);
@@ -1285,6 +1312,7 @@ TransporterRegistry::performReceive(Tran
if (recvdata.m_has_data_transporters.get(0))
{
+ assert(recvdata.m_transporters.get(0));
recvdata.m_has_data_transporters.clear(Uint32(0));
consume_extra_sockets();
}
@@ -1309,6 +1337,9 @@ TransporterRegistry::performReceive(Tran
{
bool hasdata = false;
TCP_Transporter * t = (TCP_Transporter*)theTransporters[id];
+
+ assert(recvdata.m_transporters.get(id));
+
if (is_connected(id))
{
if (t->isConnected())
@@ -1337,6 +1368,7 @@ TransporterRegistry::performReceive(Tran
{
SCI_Transporter *t = theSCITransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
+ assert(recvdata.m_transporters.get(nodeId));
if(is_connected(nodeId))
{
if(t->isConnected() && t->checkConnected())
@@ -1358,6 +1390,7 @@ TransporterRegistry::performReceive(Tran
{
SHM_Transporter *t = theSHMTransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
+ assert(recvdata.m_transporters.get(nodeId));
if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected())
{
@@ -1509,6 +1542,7 @@ TransporterRegistry::blockReceive(Transp
NodeId nodeId)
{
assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+ assert(recvdata.m_transporters.get(nodeId));
/* Check that node is not already blocked?
* Stop pulling from its socket (but track received data etc)
@@ -1531,6 +1565,7 @@ TransporterRegistry::unblockReceive(Tran
NodeId nodeId)
{
assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+ assert(recvdata.m_transporters.get(nodeId));
/* Check that node is blocked?
* Resume pulling from its socket
@@ -1644,6 +1679,7 @@ TransporterRegistry::report_connect(Tran
NodeId node_id)
{
assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+ assert(recvdata.m_transporters.get(node_id));
DBUG_ENTER("TransporterRegistry::report_connect");
DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
@@ -1677,6 +1713,7 @@ TransporterRegistry::report_disconnect(T
NodeId node_id, int errnum)
{
assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+ assert(recvdata.m_transporters.get(node_id));
DBUG_ENTER("TransporterRegistry::report_disconnect");
DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
@@ -1735,6 +1772,8 @@ TransporterRegistry::update_connections(
n++;
const NodeId nodeId = t->getRemoteNodeId();
+ if (!recvdata.m_transporters.get(nodeId))
+ continue;
TransporterError code = m_error_states[nodeId].m_code;
const char *info = m_error_states[nodeId].m_info;
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (jonas.oreland:4804 to 4805) | Jonas Oreland | 20 Jan |