4605 Ole John Aske 2012-09-13 [merge]
Merge 7.0 -> 7.1
modified:
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
4604 Ole John Aske 2012-09-13 [merge]
Merge 7.0 -> 7.1
modified:
storage/ndb/src/common/transporter/TransporterRegistry.cpp
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-06-07 14:46:55 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-09-13 10:27:55 +0000
@@ -108,8 +108,15 @@ struct TransporterReceiveData
NodeBitmask m_transporters;
/**
- * Bitmask of transporters that has data "carried over" since
- * last performReceive
+ * Bitmask of transporters having data awaiting to be received
+ * from its transporter.
+ */
+ NodeBitmask m_recv_transporters;
+
+ /**
+ * Bitmask of transporters that has already received data buffered
+ * inside its transporter. Possibly "carried over" from last
+ * performReceive
*/
NodeBitmask m_has_data_transporters;
#if defined(HAVE_EPOLL_CREATE)
@@ -408,7 +415,6 @@ private:
#ifdef ERROR_INSERT
Bitmask<MAX_NTRANSPORTERS/32> m_blocked;
- Bitmask<MAX_NTRANSPORTERS/32> m_blocked_with_data;
Bitmask<MAX_NTRANSPORTERS/32> m_blocked_disconnected;
int m_disconnect_errors[MAX_NTRANSPORTERS];
#endif
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-09-13 09:36:35 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-09-13 10:28:59 +0000
@@ -225,7 +225,6 @@ TransporterRegistry::TransporterRegistry
#ifdef ERROR_INSERT
m_blocked.clear();
- m_blocked_with_data.clear();
m_blocked_disconnected.clear();
#endif
// Initialize member variables
@@ -1082,6 +1081,7 @@ TransporterRegistry::pollReceive(Uint32
assert((receiveHandle == &recvdata) || (receiveHandle == 0));
Uint32 retVal = 0;
+ recvdata.m_recv_transporters.clear();
/**
* If any transporters have left-over data that was not fully executed in
@@ -1131,20 +1131,12 @@ TransporterRegistry::pollReceive(Uint32
for (int i = 0; i < num_socket_events; i++)
{
const Uint32 trpid = recvdata.m_epoll_events[i].data.u32;
-#ifdef ERROR_INSERT
- if (m_blocked.get(trpid))
- {
- /* Don't pull from socket now, wait till unblocked */
- m_blocked_with_data.set(trpid);
- continue;
- }
-#endif
/**
* check that it's assigned to "us"
*/
assert(recvdata.m_transporters.get(trpid));
- recvdata.m_has_data_transporters.set(trpid);
+ recvdata.m_recv_transporters.set(trpid);
}
}
else if (num_socket_events < 0)
@@ -1293,7 +1285,7 @@ TransporterRegistry::poll_TCP(Uint32 tim
if (recvdata.m_socket_poller.has_read(0))
{
assert(recvdata.m_transporters.get(0));
- recvdata.m_has_data_transporters.set((Uint32)0);
+ recvdata.m_recv_transporters.set((Uint32)0);
}
}
@@ -1303,16 +1295,8 @@ TransporterRegistry::poll_TCP(Uint32 tim
if (idx[i] != MAX_NODES + 1)
{
Uint32 node_id = t->getRemoteNodeId();
-#ifdef ERROR_INSERT
- if (m_blocked.get(node_id))
- {
- /* Don't pull from socket now, wait till unblocked */
- m_blocked_with_data.set(node_id);
- continue;
- }
-#endif
if (recvdata.m_socket_poller.has_read(idx[i]))
- recvdata.m_has_data_transporters.set(node_id);
+ recvdata.m_recv_transporters.set(node_id);
}
}
}
@@ -1331,18 +1315,22 @@ TransporterRegistry::performReceive(Tran
bool hasReceived = false;
- if (recvdata.m_has_data_transporters.get(0))
+ if (recvdata.m_recv_transporters.get(0))
{
assert(recvdata.m_transporters.get(0));
assert(&recvdata == receiveHandle); // not used by ndbmtd
- recvdata.m_has_data_transporters.clear(Uint32(0));
+ recvdata.m_recv_transporters.clear(Uint32(0));
consume_extra_sockets();
}
#ifdef ERROR_INSERT
if (!m_blocked.isclear())
{
- if (recvdata.m_has_data_transporters.isclear())
+ /* Exclude receive from blocked sockets. */
+ recvdata.m_recv_transporters.bitANDC(m_blocked);
+
+ if (recvdata.m_recv_transporters.isclear() &&
+ recvdata.m_has_data_transporters.isclear())
{
/* poll sees data, but we want to ignore for now
* sleep a little to avoid busy loop
@@ -1353,6 +1341,35 @@ TransporterRegistry::performReceive(Tran
#endif
#ifdef NDB_TCP_TRANSPORTER
+ /**
+ * Receive data from transporters polled to have data.
+ * Add to set of transported having pending data.
+ */
+ for(Uint32 id = recvdata.m_recv_transporters.find_first();
+ id != BitmaskImpl::NotFound;
+ id = recvdata.m_recv_transporters.find_next(id + 1))
+ {
+ TCP_Transporter * t = (TCP_Transporter*)theTransporters[id];
+ assert(recvdata.m_transporters.get(id));
+
+ if (is_connected(id))
+ {
+ if (t->isConnected())
+ {
+ int nBytes = t->doReceive(recvdata);
+ if (nBytes > 0)
+ {
+ recvdata.transporter_recv_from(id);
+ recvdata.m_has_data_transporters.set(id);
+ }
+ }
+ }
+ }
+ recvdata.m_recv_transporters.clear();
+
+ /**
+ * Handle data either received above or pending from prev rounds.
+ */
for(Uint32 id = recvdata.m_has_data_transporters.find_first();
id != BitmaskImpl::NotFound;
id = recvdata.m_has_data_transporters.find_next(id + 1))
@@ -1366,16 +1383,20 @@ TransporterRegistry::performReceive(Tran
{
if (t->isConnected())
{
- t->doReceive(recvdata);
if (hasReceived)
recvdata.checkJobBuffer();
hasReceived = true;
Uint32 * ptr;
Uint32 sz = t->getReceiveData(&ptr);
- recvdata.transporter_recv_from(id);
Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id]);
- t->updateReceiveDataPtr(szUsed);
- hasdata = t->hasReceiveData();
+ if (likely(szUsed))
+ {
+ t->updateReceiveDataPtr(szUsed);
+ hasdata = t->hasReceiveData();
+ }
+ // else, we didn't unpack anything:
+ // Avail ReceiveData to short to be usefull, need to
+ // receive more before we can resume this transporter.
}
}
// If transporter still have data, make sure that it's remember to next time
@@ -1573,13 +1594,6 @@ TransporterRegistry::blockReceive(Transp
assert(!m_blocked.get(nodeId));
m_blocked.set(nodeId);
-
- if (recvdata.m_has_data_transporters.get(nodeId))
- {
- assert(!m_blocked_with_data.get(nodeId));
- m_blocked_with_data.set(nodeId);
- recvdata.m_has_data_transporters.clear(nodeId);
- }
}
void
@@ -1598,11 +1612,6 @@ TransporterRegistry::unblockReceive(Tran
m_blocked.clear(nodeId);
- if (m_blocked_with_data.get(nodeId))
- {
- recvdata.m_has_data_transporters.set(nodeId);
- }
-
if (m_blocked_disconnected.get(nodeId))
{
/* Process disconnect notification/handling now */
@@ -1753,6 +1762,7 @@ TransporterRegistry::report_disconnect(T
#endif
performStates[node_id] = DISCONNECTED;
+ recvdata.m_recv_transporters.clear(node_id);
recvdata.m_has_data_transporters.clear(node_id);
recvdata.reportDisconnect(node_id, errnum);
DBUG_VOID_RETURN;
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.1 branch (ole.john.aske:4604 to 4605) | Ole John Aske | 13 Sep |