List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:September 13 2012 10:33am
Subject:bzr push into mysql-5.1-telco-7.1 branch (ole.john.aske:4604 to 4605)
View as plain text  
 4605 Ole John Aske	2012-09-13 [merge]
      Merge 7.0 -> 7.1

    modified:
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
 4604 Ole John Aske	2012-09-13 [merge]
      Merge 7.0 -> 7.1

    modified:
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-06-07 14:46:55 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-09-13 10:27:55 +0000
@@ -108,8 +108,15 @@ struct TransporterReceiveData
   NodeBitmask m_transporters;
 
   /**
-   * Bitmask of transporters that has data "carried over" since
-   *   last performReceive
+   * Bitmask of transporters having data awaiting to be received
+   * from its transporter.
+   */
+  NodeBitmask m_recv_transporters;
+
+  /**
+   * Bitmask of transporters that has already received data buffered
+   * inside its transporter. Possibly "carried over" from last 
+   * performReceive
    */
   NodeBitmask m_has_data_transporters;
 #if defined(HAVE_EPOLL_CREATE)
@@ -408,7 +415,6 @@ private:
 
 #ifdef ERROR_INSERT
   Bitmask<MAX_NTRANSPORTERS/32> m_blocked;
-  Bitmask<MAX_NTRANSPORTERS/32> m_blocked_with_data;
   Bitmask<MAX_NTRANSPORTERS/32> m_blocked_disconnected;
   int m_disconnect_errors[MAX_NTRANSPORTERS];
 #endif

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-09-13 09:36:35 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-09-13 10:28:59 +0000
@@ -225,7 +225,6 @@ TransporterRegistry::TransporterRegistry
 
 #ifdef ERROR_INSERT
   m_blocked.clear();
-  m_blocked_with_data.clear();
   m_blocked_disconnected.clear();
 #endif
   // Initialize member variables
@@ -1082,6 +1081,7 @@ TransporterRegistry::pollReceive(Uint32 
   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
 
   Uint32 retVal = 0;
+  recvdata.m_recv_transporters.clear();
 
   /**
    * If any transporters have left-over data that was not fully executed in
@@ -1131,20 +1131,12 @@ TransporterRegistry::pollReceive(Uint32 
       for (int i = 0; i < num_socket_events; i++)
       {
         const Uint32 trpid = recvdata.m_epoll_events[i].data.u32;
-#ifdef ERROR_INSERT
-        if (m_blocked.get(trpid))
-        {
-          /* Don't pull from socket now, wait till unblocked */
-          m_blocked_with_data.set(trpid);
-          continue;
-        }
-#endif
         /**
          * check that it's assigned to "us"
          */
         assert(recvdata.m_transporters.get(trpid));
 
-        recvdata.m_has_data_transporters.set(trpid);
+        recvdata.m_recv_transporters.set(trpid);
       }
     }
     else if (num_socket_events < 0)
@@ -1293,7 +1285,7 @@ TransporterRegistry::poll_TCP(Uint32 tim
       if (recvdata.m_socket_poller.has_read(0))
       {
         assert(recvdata.m_transporters.get(0));
-        recvdata.m_has_data_transporters.set((Uint32)0);
+        recvdata.m_recv_transporters.set((Uint32)0);
       }
     }
 
@@ -1303,16 +1295,8 @@ TransporterRegistry::poll_TCP(Uint32 tim
       if (idx[i] != MAX_NODES + 1)
       {
         Uint32 node_id = t->getRemoteNodeId();
-#ifdef ERROR_INSERT
-        if (m_blocked.get(node_id))
-        {
-          /* Don't pull from socket now, wait till unblocked */
-          m_blocked_with_data.set(node_id);
-          continue;
-        }
-#endif
         if (recvdata.m_socket_poller.has_read(idx[i]))
-          recvdata.m_has_data_transporters.set(node_id);
+          recvdata.m_recv_transporters.set(node_id);
       }
     }
   }
@@ -1331,18 +1315,22 @@ TransporterRegistry::performReceive(Tran
 
   bool hasReceived = false;
 
-  if (recvdata.m_has_data_transporters.get(0))
+  if (recvdata.m_recv_transporters.get(0))
   {
     assert(recvdata.m_transporters.get(0));
     assert(&recvdata == receiveHandle); // not used by ndbmtd
-    recvdata.m_has_data_transporters.clear(Uint32(0));
+    recvdata.m_recv_transporters.clear(Uint32(0));
     consume_extra_sockets();
   }
 
 #ifdef ERROR_INSERT
   if (!m_blocked.isclear())
   {
-    if (recvdata.m_has_data_transporters.isclear())
+    /* Exclude receive from blocked sockets. */
+    recvdata.m_recv_transporters.bitANDC(m_blocked);
+
+    if (recvdata.m_recv_transporters.isclear()  &&
+        recvdata.m_has_data_transporters.isclear())
     {
         /* poll sees data, but we want to ignore for now
          * sleep a little to avoid busy loop
@@ -1353,6 +1341,35 @@ TransporterRegistry::performReceive(Tran
 #endif
 
 #ifdef NDB_TCP_TRANSPORTER
+  /**
+   * Receive data from transporters polled to have data.
+   * Add to set of transported having pending data.
+   */
+  for(Uint32 id = recvdata.m_recv_transporters.find_first();
+      id != BitmaskImpl::NotFound;
+      id = recvdata.m_recv_transporters.find_next(id + 1))
+  {
+    TCP_Transporter * t = (TCP_Transporter*)theTransporters[id];
+    assert(recvdata.m_transporters.get(id));
+
+    if (is_connected(id))
+    {
+      if (t->isConnected())
+      {
+        int nBytes = t->doReceive(recvdata);
+        if (nBytes > 0)
+        {
+          recvdata.transporter_recv_from(id);
+          recvdata.m_has_data_transporters.set(id);
+        }
+      }
+    }
+  }
+  recvdata.m_recv_transporters.clear();
+
+  /**
+   * Handle data either received above or pending from prev rounds.
+   */
   for(Uint32 id = recvdata.m_has_data_transporters.find_first();
       id != BitmaskImpl::NotFound;
       id = recvdata.m_has_data_transporters.find_next(id + 1))
@@ -1366,16 +1383,20 @@ TransporterRegistry::performReceive(Tran
     {
       if (t->isConnected())
       {
-        t->doReceive(recvdata);
         if (hasReceived)
           recvdata.checkJobBuffer();
         hasReceived = true;
         Uint32 * ptr;
         Uint32 sz = t->getReceiveData(&ptr);
-        recvdata.transporter_recv_from(id);
         Uint32 szUsed = unpack(recvdata, ptr, sz, id, ioStates[id]);
-        t->updateReceiveDataPtr(szUsed);
-        hasdata = t->hasReceiveData();
+        if (likely(szUsed))
+        {
+          t->updateReceiveDataPtr(szUsed);
+          hasdata = t->hasReceiveData();
+        }
+        // else, we didn't unpack anything:
+        //   Avail ReceiveData to short to be usefull, need to
+        //   receive more before we can resume this transporter.
       }
     }
     // If transporter still have data, make sure that it's remember to next time
@@ -1573,13 +1594,6 @@ TransporterRegistry::blockReceive(Transp
   assert(!m_blocked.get(nodeId));
 
   m_blocked.set(nodeId);
-
-  if (recvdata.m_has_data_transporters.get(nodeId))
-  {
-    assert(!m_blocked_with_data.get(nodeId));
-    m_blocked_with_data.set(nodeId);
-    recvdata.m_has_data_transporters.clear(nodeId);
-  }
 }
 
 void
@@ -1598,11 +1612,6 @@ TransporterRegistry::unblockReceive(Tran
 
   m_blocked.clear(nodeId);
 
-  if (m_blocked_with_data.get(nodeId))
-  {
-    recvdata.m_has_data_transporters.set(nodeId);
-  }
-
   if (m_blocked_disconnected.get(nodeId))
   {
     /* Process disconnect notification/handling now */
@@ -1753,6 +1762,7 @@ TransporterRegistry::report_disconnect(T
 #endif
 
   performStates[node_id] = DISCONNECTED;
+  recvdata.m_recv_transporters.clear(node_id);
   recvdata.m_has_data_transporters.clear(node_id);
   recvdata.reportDisconnect(node_id, errnum);
   DBUG_VOID_RETURN;

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.1 branch (ole.john.aske:4604 to 4605) Ole John Aske13 Sep