4975 Ole John Aske 2012-09-13
Fix for Bug#14525521 RECEIVER-THREAD MAY BUSY-WAIT FOR
DATA TO BE RECEIVED
This patch fixes two related problems wrt. how
'NodeBitmask m_has_data_transporters' is maintained:
1) If the remaining part of the already received data inside the
transporter was insufficient to reconstruct the last signal,
we should not count this node as 'm_has_data_transporters'.
This will force us to wait for more data to be recv'ed before
we can continue processing data from this transporter, and
thus break the busy-loop.
2) As described in the bug report, 'm_has_data_transporters' mix
together nodes having data to be recv'ed from the socket into
the local transporter buffers, and those nodes having 'leftover'
data in the tranporter buffers which has to be unpacked.
'NodeBitmask m_recv_transporters' has been introduced to keep
track of those nodes which pollReceive() detected to have data
to be recv'ed. After being recv'ed, 'm_has_data_transporters'
will still represent those nodes with data to be unpacked.
This patch also cleans up and simplify the handling of
'blocked' transporters as a side effect of the above changes:
As the 'has_recv' and 'has_data' state of the transporters now
are represented on seperate bitmask, we no longer need to save
blocked transporters with 'has_data' into 'm_blocked_with_data.
Instead we allow any blocked transporters which already 'has_data'
(Received data buffered in tranporters) to drain these buffers.
However, the blocked transporters are excluded from receiving any
more data on these transportere.
When unblocked, the next pollReceive() will detect the available
data and allow them to be recv'ed.
modified:
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
4974 Ole John Aske 2012-09-13
Fix for Bug#14525176 'DUMP 9992' TO SIMULATE BLOCKED TRANSPORTER, AFFECT INCORRECT NODE
node_id, instead if index into transporter array should be used to check/set
the blocked transporters.
modified:
storage/ndb/src/common/transporter/TransporterRegistry.cpp
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-06-07 14:46:55 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-09-13 10:27:55 +0000
@@ -108,8 +108,15 @@ struct TransporterReceiveData
NodeBitmask m_transporters;
/**
- * Bitmask of transporters that has data "carried over" since
- * last performReceive
+ * Bitmask of transporters having data awaiting to be received
+ * from its transporter.
+ */
+ NodeBitmask m_recv_transporters;
+
+ /**
+ * Bitmask of transporters that has already received data buffered
+ * inside its transporter. Possibly "carried over" from last
+ * performReceive
*/
NodeBitmask m_has_data_transporters;
#if defined(HAVE_EPOLL_CREATE)
@@ -408,7 +415,6 @@ private:
#ifdef ERROR_INSERT
Bitmask<MAX_NTRANSPORTERS/32> m_blocked;
- Bitmask<MAX_NTRANSPORTERS/32> m_blocked_with_data;
Bitmask<MAX_NTRANSPORTERS/32> m_blocked_disconnected;
int m_disconnect_errors[MAX_NTRANSPORTERS];
#endif
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-09-13 09:35:22 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-09-13 10:27:55 +0000
@@ -225,7 +225,6 @@ TransporterRegistry::TransporterRegistry
#ifdef ERROR_INSERT
m_blocked.clear();
- m_blocked_with_data.clear();
m_blocked_disconnected.clear();
#endif
// Initialize member variables
@@ -1082,6 +1081,7 @@ TransporterRegistry::pollReceive(Uint32
assert((receiveHandle == &recvdata) || (receiveHandle == 0));
Uint32 retVal = 0;
+ recvdata.m_recv_transporters.clear();
/**
* If any transporters have left-over data that was not fully executed in
@@ -1131,20 +1131,12 @@ TransporterRegistry::pollReceive(Uint32
for (int i = 0; i < num_socket_events; i++)
{
const Uint32 trpid = recvdata.m_epoll_events[i].data.u32;
-#ifdef ERROR_INSERT
- if (m_blocked.get(trpid))
- {
- /* Don't pull from socket now, wait till unblocked */
- m_blocked_with_data.set(trpid);
- continue;
- }
-#endif
/**
* check that it's assigned to "us"
*/
assert(recvdata.m_transporters.get(trpid));
- recvdata.m_has_data_transporters.set(trpid);
+ recvdata.m_recv_transporters.set(trpid);
}
}
else if (num_socket_events < 0)
@@ -1293,7 +1285,7 @@ TransporterRegistry::poll_TCP(Uint32 tim
if (recvdata.m_socket_poller.has_read(0))
{
assert(recvdata.m_transporters.get(0));
- recvdata.m_has_data_transporters.set((Uint32)0);
+ recvdata.m_recv_transporters.set((Uint32)0);
}
}
@@ -1303,16 +1295,8 @@ TransporterRegistry::poll_TCP(Uint32 tim
if (idx[i] != MAX_NODES + 1)
{
Uint32 node_id = t->getRemoteNodeId();
-#ifdef ERROR_INSERT
- if (m_blocked.get(node_id))
- {
- /* Don't pull from socket now, wait till unblocked */
- m_blocked_with_data.set(node_id);
- continue;
- }
-#endif
if (recvdata.m_socket_poller.has_read(idx[i]))
- recvdata.m_has_data_transporters.set(node_id);
+ recvdata.m_recv_transporters.set(node_id);
}
}
}
@@ -1331,18 +1315,22 @@ TransporterRegistry::performReceive(Tran
bool hasReceived = false;
- if (recvdata.m_has_data_transporters.get(0))
+ if (recvdata.m_recv_transporters.get(0))
{
assert(recvdata.m_transporters.get(0));
assert(&recvdata == receiveHandle); // not used by ndbmtd
- recvdata.m_has_data_transporters.clear(Uint32(0));
+ recvdata.m_recv_transporters.clear(Uint32(0));
consume_extra_sockets();
}
#ifdef ERROR_INSERT
if (!m_blocked.isclear())
{
- if (recvdata.m_has_data_transporters.isclear())
+ /* Exclude receive from blocked sockets. */
+ recvdata.m_recv_transporters.bitANDC(m_blocked);
+
+ if (recvdata.m_recv_transporters.isclear() &&
+ recvdata.m_has_data_transporters.isclear())
{
/* poll sees data, but we want to ignore for now
* sleep a little to avoid busy loop
@@ -1353,6 +1341,35 @@ TransporterRegistry::performReceive(Tran
#endif
#ifdef NDB_TCP_TRANSPORTER
+ /**
+ * Receive data from transporters polled to have data.
+ * Add to set of transported having pending data.
+ */
+ for(Uint32 id = recvdata.m_recv_transporters.find_first();
+ id != BitmaskImpl::NotFound;
+ id = recvdata.m_recv_transporters.find_next(id + 1))
+ {
+ TCP_Transporter * t = (TCP_Transporter*)theTransporters[id];
+ assert(recvdata.m_transporters.get(id));
+
+ if (is_connected(id))
+ {
+ if (t->isConnected())
+ {
+ int nBytes = t->doReceive(recvdata);
+ if (nBytes > 0)
+ {
+ recvdata.transporter_recv_from(id);
+ recvdata.m_has_data_transporters.set(id);
+ }
+ }
+ }
+ }
+ recvdata.m_recv_transporters.clear();
+
+ /**
+ * Handle data either received above or pending from prev rounds.
+ */
for(Uint32 id = recvdata.m_has_data_transporters.find_first();
id != BitmaskImpl::NotFound;
id = recvdata.m_has_data_transporters.find_next(id + 1))
@@ -1366,16 +1383,20 @@ TransporterRegistry::performReceive(Tran
{
if (t->isConnected())
{
- t->doReceive(recvdata);
if (hasReceived)
recvdata.checkJobBuffer();
hasReceived = true;
Uint32 * ptr;
Uint32 sz = t->getReceiveData(&ptr);
- recvdata.transporter_recv_from(id);
Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id]);
- t->updateReceiveDataPtr(szUsed);
- hasdata = t->hasReceiveData();
+ if (likely(szUsed))
+ {
+ t->updateReceiveDataPtr(szUsed);
+ hasdata = t->hasReceiveData();
+ }
+ // else, we didn't unpack anything:
+ // Avail ReceiveData to short to be usefull, need to
+ // receive more before we can resume this transporter.
}
}
// If transporter still have data, make sure that it's remember to next time
@@ -1573,13 +1594,6 @@ TransporterRegistry::blockReceive(Transp
assert(!m_blocked.get(nodeId));
m_blocked.set(nodeId);
-
- if (recvdata.m_has_data_transporters.get(nodeId))
- {
- assert(!m_blocked_with_data.get(nodeId));
- m_blocked_with_data.set(nodeId);
- recvdata.m_has_data_transporters.clear(nodeId);
- }
}
void
@@ -1598,11 +1612,6 @@ TransporterRegistry::unblockReceive(Tran
m_blocked.clear(nodeId);
- if (m_blocked_with_data.get(nodeId))
- {
- recvdata.m_has_data_transporters.set(nodeId);
- }
-
if (m_blocked_disconnected.get(nodeId))
{
/* Process disconnect notification/handling now */
@@ -1753,6 +1762,7 @@ TransporterRegistry::report_disconnect(T
#endif
performStates[node_id] = DISCONNECTED;
+ recvdata.m_recv_transporters.clear(node_id);
recvdata.m_has_data_transporters.clear(node_id);
recvdata.reportDisconnect(node_id, errnum);
DBUG_VOID_RETURN;
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:4974 to 4975)Bug#14525521 | Ole John Aske | 13 Sep |