From: Ole John Aske Date: September 13 2012 10:33am Subject: bzr push into mysql-5.1-telco-7.1 branch (ole.john.aske:4604 to 4605) List-Archive: http://lists.mysql.com/commits/144768 Message-Id: <20120913103308.22430.49893.4605@fimafeng09.no.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4605 Ole John Aske 2012-09-13 [merge] Merge 7.0 -> 7.1 modified: storage/ndb/include/transporter/TransporterRegistry.hpp storage/ndb/src/common/transporter/TransporterRegistry.cpp 4604 Ole John Aske 2012-09-13 [merge] Merge 7.0 -> 7.1 modified: storage/ndb/src/common/transporter/TransporterRegistry.cpp === modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp' --- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-06-07 14:46:55 +0000 +++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2012-09-13 10:27:55 +0000 @@ -108,8 +108,15 @@ struct TransporterReceiveData NodeBitmask m_transporters; /** - * Bitmask of transporters that has data "carried over" since - * last performReceive + * Bitmask of transporters having data awaiting to be received + * from its transporter. + */ + NodeBitmask m_recv_transporters; + + /** + * Bitmask of transporters that has already received data buffered + * inside its transporter. Possibly "carried over" from last + * performReceive */ NodeBitmask m_has_data_transporters; #if defined(HAVE_EPOLL_CREATE) @@ -408,7 +415,6 @@ private: #ifdef ERROR_INSERT Bitmask m_blocked; - Bitmask m_blocked_with_data; Bitmask m_blocked_disconnected; int m_disconnect_errors[MAX_NTRANSPORTERS]; #endif === modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp' --- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-09-13 09:36:35 +0000 +++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2012-09-13 10:28:59 +0000 @@ -225,7 +225,6 @@ TransporterRegistry::TransporterRegistry #ifdef ERROR_INSERT m_blocked.clear(); - m_blocked_with_data.clear(); m_blocked_disconnected.clear(); #endif // Initialize member variables @@ -1082,6 +1081,7 @@ TransporterRegistry::pollReceive(Uint32 assert((receiveHandle == &recvdata) || (receiveHandle == 0)); Uint32 retVal = 0; + recvdata.m_recv_transporters.clear(); /** * If any transporters have left-over data that was not fully executed in @@ -1131,20 +1131,12 @@ TransporterRegistry::pollReceive(Uint32 for (int i = 0; i < num_socket_events; i++) { const Uint32 trpid = recvdata.m_epoll_events[i].data.u32; -#ifdef ERROR_INSERT - if (m_blocked.get(trpid)) - { - /* Don't pull from socket now, wait till unblocked */ - m_blocked_with_data.set(trpid); - continue; - } -#endif /** * check that it's assigned to "us" */ assert(recvdata.m_transporters.get(trpid)); - recvdata.m_has_data_transporters.set(trpid); + recvdata.m_recv_transporters.set(trpid); } } else if (num_socket_events < 0) @@ -1293,7 +1285,7 @@ TransporterRegistry::poll_TCP(Uint32 tim if (recvdata.m_socket_poller.has_read(0)) { assert(recvdata.m_transporters.get(0)); - recvdata.m_has_data_transporters.set((Uint32)0); + recvdata.m_recv_transporters.set((Uint32)0); } } @@ -1303,16 +1295,8 @@ TransporterRegistry::poll_TCP(Uint32 tim if (idx[i] != MAX_NODES + 1) { Uint32 node_id = t->getRemoteNodeId(); -#ifdef ERROR_INSERT - if (m_blocked.get(node_id)) - { - /* Don't pull from socket now, wait till unblocked */ - m_blocked_with_data.set(node_id); - continue; - } -#endif if (recvdata.m_socket_poller.has_read(idx[i])) - recvdata.m_has_data_transporters.set(node_id); + recvdata.m_recv_transporters.set(node_id); } } } @@ -1331,18 +1315,22 @@ TransporterRegistry::performReceive(Tran bool hasReceived = false; - if (recvdata.m_has_data_transporters.get(0)) + if (recvdata.m_recv_transporters.get(0)) { assert(recvdata.m_transporters.get(0)); assert(&recvdata == receiveHandle); // not used by ndbmtd - recvdata.m_has_data_transporters.clear(Uint32(0)); + recvdata.m_recv_transporters.clear(Uint32(0)); consume_extra_sockets(); } #ifdef ERROR_INSERT if (!m_blocked.isclear()) { - if (recvdata.m_has_data_transporters.isclear()) + /* Exclude receive from blocked sockets. */ + recvdata.m_recv_transporters.bitANDC(m_blocked); + + if (recvdata.m_recv_transporters.isclear() && + recvdata.m_has_data_transporters.isclear()) { /* poll sees data, but we want to ignore for now * sleep a little to avoid busy loop @@ -1353,6 +1341,35 @@ TransporterRegistry::performReceive(Tran #endif #ifdef NDB_TCP_TRANSPORTER + /** + * Receive data from transporters polled to have data. + * Add to set of transported having pending data. + */ + for(Uint32 id = recvdata.m_recv_transporters.find_first(); + id != BitmaskImpl::NotFound; + id = recvdata.m_recv_transporters.find_next(id + 1)) + { + TCP_Transporter * t = (TCP_Transporter*)theTransporters[id]; + assert(recvdata.m_transporters.get(id)); + + if (is_connected(id)) + { + if (t->isConnected()) + { + int nBytes = t->doReceive(recvdata); + if (nBytes > 0) + { + recvdata.transporter_recv_from(id); + recvdata.m_has_data_transporters.set(id); + } + } + } + } + recvdata.m_recv_transporters.clear(); + + /** + * Handle data either received above or pending from prev rounds. + */ for(Uint32 id = recvdata.m_has_data_transporters.find_first(); id != BitmaskImpl::NotFound; id = recvdata.m_has_data_transporters.find_next(id + 1)) @@ -1366,16 +1383,20 @@ TransporterRegistry::performReceive(Tran { if (t->isConnected()) { - t->doReceive(recvdata); if (hasReceived) recvdata.checkJobBuffer(); hasReceived = true; Uint32 * ptr; Uint32 sz = t->getReceiveData(&ptr); - recvdata.transporter_recv_from(id); Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id]); - t->updateReceiveDataPtr(szUsed); - hasdata = t->hasReceiveData(); + if (likely(szUsed)) + { + t->updateReceiveDataPtr(szUsed); + hasdata = t->hasReceiveData(); + } + // else, we didn't unpack anything: + // Avail ReceiveData to short to be usefull, need to + // receive more before we can resume this transporter. } } // If transporter still have data, make sure that it's remember to next time @@ -1573,13 +1594,6 @@ TransporterRegistry::blockReceive(Transp assert(!m_blocked.get(nodeId)); m_blocked.set(nodeId); - - if (recvdata.m_has_data_transporters.get(nodeId)) - { - assert(!m_blocked_with_data.get(nodeId)); - m_blocked_with_data.set(nodeId); - recvdata.m_has_data_transporters.clear(nodeId); - } } void @@ -1598,11 +1612,6 @@ TransporterRegistry::unblockReceive(Tran m_blocked.clear(nodeId); - if (m_blocked_with_data.get(nodeId)) - { - recvdata.m_has_data_transporters.set(nodeId); - } - if (m_blocked_disconnected.get(nodeId)) { /* Process disconnect notification/handling now */ @@ -1753,6 +1762,7 @@ TransporterRegistry::report_disconnect(T #endif performStates[node_id] = DISCONNECTED; + recvdata.m_recv_transporters.clear(node_id); recvdata.m_has_data_transporters.clear(node_id); recvdata.reportDisconnect(node_id, errnum); DBUG_VOID_RETURN; No bundle (reason: useless for push emails).