From: Ole John Aske Date: September 13 2012 10:33am Subject: bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:4974 to 4975) Bug#14525521 List-Archive: http://lists.mysql.com/commits/144767 X-Bug: 14525521 Message-Id: <20120913103313.22447.55586.4975@fimafeng09.no.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4975 Ole John Aske 2012-09-13 Fix for Bug#14525521 RECEIVER-THREAD MAY BUSY-WAIT FOR DATA TO BE RECEIVED This patch fixes two related problems wrt. how 'NodeBitmask m_has_data_transporters' is maintained: 1) If the remaining part of the already received data inside the transporter was insufficient to reconstruct the last signal, we should not count this node as 'm_has_data_transporters'. This will force us to wait for more data to be recv'ed before we can continue processing data from this transporter, and thus break the busy-loop. 2) As described in the bug report, 'm_has_data_transporters' mixes together nodes having data to be recv'ed from the socket into the local transporter buffers, and those nodes having 'leftover' data in the transporter buffers which has to be unpacked. 'NodeBitmask m_recv_transporters' has been introduced to keep track of those nodes which pollReceive() detected to have data to be recv'ed. After being recv'ed, 'm_has_data_transporters' will still represent those nodes with data to be unpacked. This patch also cleans up and simplifies the handling of 'blocked' transporters as a side effect of the above changes: As the 'has_recv' and 'has_data' state of the transporters are now represented in separate bitmasks, we no longer need to save blocked transporters with 'has_data' into 'm_blocked_with_data'. Instead we allow any blocked transporters which already 'has_data' (received data buffered in transporters) to drain these buffers. However, the blocked transporters are excluded from receiving any more data on these transporters. 
When unblocked, the next pollReceive() will detect the available data and allow it to be recv'ed. modified: storage/ndb/include/transporter/TransporterRegistry.hpp storage/ndb/src/common/transporter/TransporterRegistry.cpp 4974 Ole John Aske 2012-09-13 Fix for Bug#14525176 'DUMP 9992' TO SIMULATE BLOCKED TRANSPORTER, AFFECT INCORRECT NODE node_id, instead of index into transporter array, should be used to check/set the blocked transporters. modified: storage/ndb/src/common/transporter/TransporterRegistry.cpp === modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp' --- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-06-07 14:46:55 +0000 +++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-09-13 10:27:55 +0000 @@ -108,8 +108,15 @@ struct TransporterReceiveData NodeBitmask m_transporters; /** - * Bitmask of transporters that has data "carried over" since - * last performReceive + * Bitmask of transporters having data awaiting to be received + * from its transporter. + */ + NodeBitmask m_recv_transporters; + + /** + * Bitmask of transporters that has already received data buffered + * inside its transporter. 
Possibly "carried over" from last + * performReceive */ NodeBitmask m_has_data_transporters; #if defined(HAVE_EPOLL_CREATE) @@ -408,7 +415,6 @@ private: #ifdef ERROR_INSERT Bitmask m_blocked; - Bitmask m_blocked_with_data; Bitmask m_blocked_disconnected; int m_disconnect_errors[MAX_NTRANSPORTERS]; #endif === modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp' --- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-09-13 09:35:22 +0000 +++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-09-13 10:27:55 +0000 @@ -225,7 +225,6 @@ TransporterRegistry::TransporterRegistry #ifdef ERROR_INSERT m_blocked.clear(); - m_blocked_with_data.clear(); m_blocked_disconnected.clear(); #endif // Initialize member variables @@ -1082,6 +1081,7 @@ TransporterRegistry::pollReceive(Uint32 assert((receiveHandle == &recvdata) || (receiveHandle == 0)); Uint32 retVal = 0; + recvdata.m_recv_transporters.clear(); /** * If any transporters have left-over data that was not fully executed in @@ -1131,20 +1131,12 @@ TransporterRegistry::pollReceive(Uint32 for (int i = 0; i < num_socket_events; i++) { const Uint32 trpid = recvdata.m_epoll_events[i].data.u32; -#ifdef ERROR_INSERT - if (m_blocked.get(trpid)) - { - /* Don't pull from socket now, wait till unblocked */ - m_blocked_with_data.set(trpid); - continue; - } -#endif /** * check that it's assigned to "us" */ assert(recvdata.m_transporters.get(trpid)); - recvdata.m_has_data_transporters.set(trpid); + recvdata.m_recv_transporters.set(trpid); } } else if (num_socket_events < 0) @@ -1293,7 +1285,7 @@ TransporterRegistry::poll_TCP(Uint32 tim if (recvdata.m_socket_poller.has_read(0)) { assert(recvdata.m_transporters.get(0)); - recvdata.m_has_data_transporters.set((Uint32)0); + recvdata.m_recv_transporters.set((Uint32)0); } } @@ -1303,16 +1295,8 @@ TransporterRegistry::poll_TCP(Uint32 tim if (idx[i] != MAX_NODES + 1) { Uint32 node_id = t->getRemoteNodeId(); -#ifdef ERROR_INSERT - if 
(m_blocked.get(node_id)) - { - /* Don't pull from socket now, wait till unblocked */ - m_blocked_with_data.set(node_id); - continue; - } -#endif if (recvdata.m_socket_poller.has_read(idx[i])) - recvdata.m_has_data_transporters.set(node_id); + recvdata.m_recv_transporters.set(node_id); } } } @@ -1331,18 +1315,22 @@ TransporterRegistry::performReceive(Tran bool hasReceived = false; - if (recvdata.m_has_data_transporters.get(0)) + if (recvdata.m_recv_transporters.get(0)) { assert(recvdata.m_transporters.get(0)); assert(&recvdata == receiveHandle); // not used by ndbmtd - recvdata.m_has_data_transporters.clear(Uint32(0)); + recvdata.m_recv_transporters.clear(Uint32(0)); consume_extra_sockets(); } #ifdef ERROR_INSERT if (!m_blocked.isclear()) { - if (recvdata.m_has_data_transporters.isclear()) + /* Exclude receive from blocked sockets. */ + recvdata.m_recv_transporters.bitANDC(m_blocked); + + if (recvdata.m_recv_transporters.isclear() && + recvdata.m_has_data_transporters.isclear()) { /* poll sees data, but we want to ignore for now * sleep a little to avoid busy loop @@ -1353,6 +1341,35 @@ TransporterRegistry::performReceive(Tran #endif #ifdef NDB_TCP_TRANSPORTER + /** + * Receive data from transporters polled to have data. + * Add to set of transported having pending data. + */ + for(Uint32 id = recvdata.m_recv_transporters.find_first(); + id != BitmaskImpl::NotFound; + id = recvdata.m_recv_transporters.find_next(id + 1)) + { + TCP_Transporter * t = (TCP_Transporter*)theTransporters[id]; + assert(recvdata.m_transporters.get(id)); + + if (is_connected(id)) + { + if (t->isConnected()) + { + int nBytes = t->doReceive(recvdata); + if (nBytes > 0) + { + recvdata.transporter_recv_from(id); + recvdata.m_has_data_transporters.set(id); + } + } + } + } + recvdata.m_recv_transporters.clear(); + + /** + * Handle data either received above or pending from prev rounds. 
+ */ for(Uint32 id = recvdata.m_has_data_transporters.find_first(); id != BitmaskImpl::NotFound; id = recvdata.m_has_data_transporters.find_next(id + 1)) @@ -1366,16 +1383,20 @@ TransporterRegistry::performReceive(Tran { if (t->isConnected()) { - t->doReceive(recvdata); if (hasReceived) recvdata.checkJobBuffer(); hasReceived = true; Uint32 * ptr; Uint32 sz = t->getReceiveData(&ptr); - recvdata.transporter_recv_from(id); Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id]); - t->updateReceiveDataPtr(szUsed); - hasdata = t->hasReceiveData(); + if (likely(szUsed)) + { + t->updateReceiveDataPtr(szUsed); + hasdata = t->hasReceiveData(); + } + // else, we didn't unpack anything: + // Avail ReceiveData to short to be usefull, need to + // receive more before we can resume this transporter. } } // If transporter still have data, make sure that it's remember to next time @@ -1573,13 +1594,6 @@ TransporterRegistry::blockReceive(Transp assert(!m_blocked.get(nodeId)); m_blocked.set(nodeId); - - if (recvdata.m_has_data_transporters.get(nodeId)) - { - assert(!m_blocked_with_data.get(nodeId)); - m_blocked_with_data.set(nodeId); - recvdata.m_has_data_transporters.clear(nodeId); - } } void @@ -1598,11 +1612,6 @@ TransporterRegistry::unblockReceive(Tran m_blocked.clear(nodeId); - if (m_blocked_with_data.get(nodeId)) - { - recvdata.m_has_data_transporters.set(nodeId); - } - if (m_blocked_disconnected.get(nodeId)) { /* Process disconnect notification/handling now */ @@ -1753,6 +1762,7 @@ TransporterRegistry::report_disconnect(T #endif performStates[node_id] = DISCONNECTED; + recvdata.m_recv_transporters.clear(node_id); recvdata.m_has_data_transporters.clear(node_id); recvdata.reportDisconnect(node_id, errnum); DBUG_VOID_RETURN; No bundle (reason: useless for push emails).