List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:September 13 2012 10:33am
Subject:bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:4974 to 4975)
Bug#14525521
View as plain text  
 4975 Ole John Aske	2012-09-13
      Fix for Bug#14525521 RECEIVER-THREAD MAY BUSY-WAIT FOR 
                           DATA TO BE RECEIVED
      
      This patch fixes two related problems wrt. how
      'NodeBitmask m_has_data_transporters' is maintained:
      
      1) If the remaining part of the already received data inside the
         transporter was insufficient to reconstruct the last signal,
         we should not count this node as 'm_has_data_transporters'.
         This will force us to wait for more data to be recv'ed before
         we can continue processing data from this transporter, and
         thus break the busy-loop.
      
      2) As described in the bug report, 'm_has_data_transporters' mix
         together nodes having data to be recv'ed from the socket into
         the local transporter buffers, and those nodes having 'leftover'
         data in the tranporter buffers which has to be unpacked.
      
         'NodeBitmask m_recv_transporters' has been introduced to keep
         track of those nodes which pollReceive() detected to have data
         to be recv'ed. After being recv'ed, 'm_has_data_transporters'
         will still represent those nodes with data to be unpacked.
      
      This patch also cleans up and simplify the handling of
      'blocked' transporters as a side effect of the above changes:
      
      As the 'has_recv' and 'has_data' state of the transporters now
      are represented on seperate bitmask, we no longer need to save
      blocked transporters with 'has_data' into 'm_blocked_with_data.
      
      Instead we allow any blocked transporters which already 'has_data'
      (Received data buffered in tranporters) to drain these buffers.
      However, the blocked transporters are excluded from receiving any
      more data on these transportere.
      
      When unblocked, the next pollReceive() will detect the available
      data and allow them to be recv'ed.

    modified:
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
 4974 Ole John Aske	2012-09-13
      Fix for Bug#14525176 'DUMP 9992' TO SIMULATE BLOCKED TRANSPORTER, AFFECT INCORRECT NODE
      
      node_id, instead if index into transporter array should be used to check/set
      the blocked transporters. 

    modified:
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-06-07 14:46:55 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-09-13 10:27:55 +0000
@@ -108,8 +108,15 @@ struct TransporterReceiveData
   NodeBitmask m_transporters;
 
   /**
-   * Bitmask of transporters that has data "carried over" since
-   *   last performReceive
+   * Bitmask of transporters having data awaiting to be received
+   * from its transporter.
+   */
+  NodeBitmask m_recv_transporters;
+
+  /**
+   * Bitmask of transporters that has already received data buffered
+   * inside its transporter. Possibly "carried over" from last 
+   * performReceive
    */
   NodeBitmask m_has_data_transporters;
 #if defined(HAVE_EPOLL_CREATE)
@@ -408,7 +415,6 @@ private:
 
 #ifdef ERROR_INSERT
   Bitmask<MAX_NTRANSPORTERS/32> m_blocked;
-  Bitmask<MAX_NTRANSPORTERS/32> m_blocked_with_data;
   Bitmask<MAX_NTRANSPORTERS/32> m_blocked_disconnected;
   int m_disconnect_errors[MAX_NTRANSPORTERS];
 #endif

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-09-13 09:35:22 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-09-13 10:27:55 +0000
@@ -225,7 +225,6 @@ TransporterRegistry::TransporterRegistry
 
 #ifdef ERROR_INSERT
   m_blocked.clear();
-  m_blocked_with_data.clear();
   m_blocked_disconnected.clear();
 #endif
   // Initialize member variables
@@ -1082,6 +1081,7 @@ TransporterRegistry::pollReceive(Uint32 
   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
 
   Uint32 retVal = 0;
+  recvdata.m_recv_transporters.clear();
 
   /**
    * If any transporters have left-over data that was not fully executed in
@@ -1131,20 +1131,12 @@ TransporterRegistry::pollReceive(Uint32 
       for (int i = 0; i < num_socket_events; i++)
       {
         const Uint32 trpid = recvdata.m_epoll_events[i].data.u32;
-#ifdef ERROR_INSERT
-        if (m_blocked.get(trpid))
-        {
-          /* Don't pull from socket now, wait till unblocked */
-          m_blocked_with_data.set(trpid);
-          continue;
-        }
-#endif
         /**
          * check that it's assigned to "us"
          */
         assert(recvdata.m_transporters.get(trpid));
 
-        recvdata.m_has_data_transporters.set(trpid);
+        recvdata.m_recv_transporters.set(trpid);
       }
     }
     else if (num_socket_events < 0)
@@ -1293,7 +1285,7 @@ TransporterRegistry::poll_TCP(Uint32 tim
       if (recvdata.m_socket_poller.has_read(0))
       {
         assert(recvdata.m_transporters.get(0));
-        recvdata.m_has_data_transporters.set((Uint32)0);
+        recvdata.m_recv_transporters.set((Uint32)0);
       }
     }
 
@@ -1303,16 +1295,8 @@ TransporterRegistry::poll_TCP(Uint32 tim
       if (idx[i] != MAX_NODES + 1)
       {
         Uint32 node_id = t->getRemoteNodeId();
-#ifdef ERROR_INSERT
-        if (m_blocked.get(node_id))
-        {
-          /* Don't pull from socket now, wait till unblocked */
-          m_blocked_with_data.set(node_id);
-          continue;
-        }
-#endif
         if (recvdata.m_socket_poller.has_read(idx[i]))
-          recvdata.m_has_data_transporters.set(node_id);
+          recvdata.m_recv_transporters.set(node_id);
       }
     }
   }
@@ -1331,18 +1315,22 @@ TransporterRegistry::performReceive(Tran
 
   bool hasReceived = false;
 
-  if (recvdata.m_has_data_transporters.get(0))
+  if (recvdata.m_recv_transporters.get(0))
   {
     assert(recvdata.m_transporters.get(0));
     assert(&recvdata == receiveHandle); // not used by ndbmtd
-    recvdata.m_has_data_transporters.clear(Uint32(0));
+    recvdata.m_recv_transporters.clear(Uint32(0));
     consume_extra_sockets();
   }
 
 #ifdef ERROR_INSERT
   if (!m_blocked.isclear())
   {
-    if (recvdata.m_has_data_transporters.isclear())
+    /* Exclude receive from blocked sockets. */
+    recvdata.m_recv_transporters.bitANDC(m_blocked);
+
+    if (recvdata.m_recv_transporters.isclear()  &&
+        recvdata.m_has_data_transporters.isclear())
     {
         /* poll sees data, but we want to ignore for now
          * sleep a little to avoid busy loop
@@ -1353,6 +1341,35 @@ TransporterRegistry::performReceive(Tran
 #endif
 
 #ifdef NDB_TCP_TRANSPORTER
+  /**
+   * Receive data from transporters polled to have data.
+   * Add to set of transported having pending data.
+   */
+  for(Uint32 id = recvdata.m_recv_transporters.find_first();
+      id != BitmaskImpl::NotFound;
+      id = recvdata.m_recv_transporters.find_next(id + 1))
+  {
+    TCP_Transporter * t = (TCP_Transporter*)theTransporters[id];
+    assert(recvdata.m_transporters.get(id));
+
+    if (is_connected(id))
+    {
+      if (t->isConnected())
+      {
+        int nBytes = t->doReceive(recvdata);
+        if (nBytes > 0)
+        {
+          recvdata.transporter_recv_from(id);
+          recvdata.m_has_data_transporters.set(id);
+        }
+      }
+    }
+  }
+  recvdata.m_recv_transporters.clear();
+
+  /**
+   * Handle data either received above or pending from prev rounds.
+   */
   for(Uint32 id = recvdata.m_has_data_transporters.find_first();
       id != BitmaskImpl::NotFound;
       id = recvdata.m_has_data_transporters.find_next(id + 1))
@@ -1366,16 +1383,20 @@ TransporterRegistry::performReceive(Tran
     {
       if (t->isConnected())
       {
-        t->doReceive(recvdata);
         if (hasReceived)
           recvdata.checkJobBuffer();
         hasReceived = true;
         Uint32 * ptr;
         Uint32 sz = t->getReceiveData(&ptr);
-        recvdata.transporter_recv_from(id);
         Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id]);
-        t->updateReceiveDataPtr(szUsed);
-        hasdata = t->hasReceiveData();
+        if (likely(szUsed))
+        {
+          t->updateReceiveDataPtr(szUsed);
+          hasdata = t->hasReceiveData();
+        }
+        // else, we didn't unpack anything:
+        //   Avail ReceiveData to short to be usefull, need to
+        //   receive more before we can resume this transporter.
       }
     }
     // If transporter still have data, make sure that it's remember to next time
@@ -1573,13 +1594,6 @@ TransporterRegistry::blockReceive(Transp
   assert(!m_blocked.get(nodeId));
 
   m_blocked.set(nodeId);
-
-  if (recvdata.m_has_data_transporters.get(nodeId))
-  {
-    assert(!m_blocked_with_data.get(nodeId));
-    m_blocked_with_data.set(nodeId);
-    recvdata.m_has_data_transporters.clear(nodeId);
-  }
 }
 
 void
@@ -1598,11 +1612,6 @@ TransporterRegistry::unblockReceive(Tran
 
   m_blocked.clear(nodeId);
 
-  if (m_blocked_with_data.get(nodeId))
-  {
-    recvdata.m_has_data_transporters.set(nodeId);
-  }
-
   if (m_blocked_disconnected.get(nodeId))
   {
     /* Process disconnect notification/handling now */
@@ -1753,6 +1762,7 @@ TransporterRegistry::report_disconnect(T
 #endif
 
   performStates[node_id] = DISCONNECTED;
+  recvdata.m_recv_transporters.clear(node_id);
   recvdata.m_has_data_transporters.clear(node_id);
   recvdata.reportDisconnect(node_id, errnum);
   DBUG_VOID_RETURN;

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:4974 to 4975)Bug#14525521Ole John Aske13 Sep