List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:January 19 2012 12:09pm
Subject:bzr push into mysql-5.1-telco-7.1 branch (jonas.oreland:4420 to 4421)
View as plain text  
 4421 Jonas Oreland	2012-01-19 [merge]
      ndb - merge 70 to 71

    modified:
      storage/ndb/include/transporter/TransporterRegistry.hpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
 4420 Martin Skold	2012-01-19 [merge]
      Merge

    modified:
      mysql-test/suite/ndb/r/ndb_condition_pushdown.result
      mysql-test/suite/ndb/t/ndb_condition_pushdown.test
      sql/ha_ndbcluster_cond.cc
      storage/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
      storage/ndb/src/kernel/error/ErrorReporter.cpp
      storage/ndb/src/kernel/vm/Emulator.cpp
      storage/ndb/src/kernel/vm/Emulator.hpp
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-01-16 13:56:30 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp	2012-01-19 11:48:54 +0000
@@ -103,6 +103,11 @@ struct TransporterReceiveData
   bool epoll_add(TCP_Transporter*);
 
   /**
+   * Bitmask of transporters currently handled by this instance
+   */
+  NodeBitmask m_transporters;
+
+  /**
    * Bitmask of transporters that has data "carried over" since
    *   last performReceive
    */

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-01-18 13:44:10 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2012-01-19 12:08:30 +0000
@@ -76,6 +76,13 @@ SocketServer::Session * TransporterServi
 
 TransporterReceiveData::TransporterReceiveData()
 {
+  /**
+   * With multi receiver threads
+   *   an interface to reassign these is needed...
+   */
+  m_transporters.set();            // Handle all
+  m_transporters.clear(Uint32(0)); // Except wakeup socket...
+
 #if defined(HAVE_EPOLL_CREATE)
   m_epoll_fd = -1;
   m_epoll_events = 0;
@@ -111,6 +118,7 @@ fallback:
 bool
 TransporterReceiveData::epoll_add(TCP_Transporter *t)
 {
+  assert(m_transporters.get(t->getRemoteNodeId()));
 #if defined(HAVE_EPOLL_CREATE)
   if (m_epoll_fd != -1)
   {
@@ -382,7 +390,7 @@ TransporterRegistry::init(NodeId nodeId)
 
   if (receiveHandle)
   {
-    if (!receiveHandle->init(maxTransporters))
+    if (!init(* receiveHandle))
       DBUG_RETURN(false);
   }
 
@@ -992,6 +1000,8 @@ TransporterRegistry::setup_wakeup_socket
     return true;
   }
 
+  assert(!recvdata.m_transporters.get(0));
+
   if (my_socketpair(m_extra_wakeup_sockets))
   {
     perror("socketpair failed!");
@@ -1025,6 +1035,7 @@ TransporterRegistry::setup_wakeup_socket
   }
 #endif
   m_has_extra_wakeup_socket = true;
+  recvdata.m_transporters.set(Uint32(0));
   return true;
 
 err:
@@ -1109,6 +1120,11 @@ TransporterRegistry::pollReceive(Uint32
           continue;
         }
 #endif
+        /**
+         * check that it's assigned to "us"
+         */
+        assert(recvdata.m_transporters.get(trpid));
+
         recvdata.m_has_data_transporters.set(trpid);
       }
     }
@@ -1153,6 +1169,10 @@ TransporterRegistry::poll_SCI(Uint32 tim
   {
     SCI_Transporter * t = theSCITransporters[i];
     Uint32 node_id = t->getRemoteNodeId();
+
+    if (!recvdata.m_transporters.get(nodeId))
+      continue;
+
     if (t->isConnected() && is_connected(node_id))
     {
       if (t->hasDataToRead())
@@ -1182,6 +1202,10 @@ TransporterRegistry::poll_SHM(Uint32 tim
     {
       SHM_Transporter * t = theSHMTransporters[i];
       Uint32 node_id = t->getRemoteNodeId();
+
+      if (!recvdata.m_transporters.get(node_id))
+        continue;
+
       if (t->isConnected() && is_connected(node_id))
       {
 	if (t->hasDataToRead())
@@ -1214,7 +1238,7 @@ TransporterRegistry::poll_TCP(Uint32 tim
 
   recvdata.m_socket_poller.clear();
 
-  if (m_has_extra_wakeup_socket)
+  if (m_has_extra_wakeup_socket && recvdata.m_transporters.get(0))
   {
     const NDB_SOCKET_TYPE socket = m_extra_wakeup_sockets[0];
 
@@ -1229,6 +1253,9 @@ TransporterRegistry::poll_TCP(Uint32 tim
     const NDB_SOCKET_TYPE socket = t->getSocket();
     Uint32 node_id = t->getRemoteNodeId();
 
+    if (!recvdata.m_transporters.get(node_id))
+      continue;
+
     if (is_connected(node_id) && t->isConnected() && my_socket_valid(socket))
     {
       idx[i] = recvdata.m_socket_poller.add(socket, true, false, false);
@@ -1285,6 +1312,7 @@ TransporterRegistry::performReceive(Tran
 
   if (recvdata.m_has_data_transporters.get(0))
   {
+    assert(recvdata.m_transporters.get(0));
     recvdata.m_has_data_transporters.clear(Uint32(0));
     consume_extra_sockets();
   }
@@ -1309,6 +1337,9 @@ TransporterRegistry::performReceive(Tran
   {
     bool hasdata = false;
     TCP_Transporter * t = (TCP_Transporter*)theTransporters[id];
+
+    assert(recvdata.m_transporters.get(id));
+
     if (is_connected(id))
     {
       if (t->isConnected())
@@ -1337,6 +1368,7 @@ TransporterRegistry::performReceive(Tran
   {
     SCI_Transporter  *t = theSCITransporters[i];
     const NodeId nodeId = t->getRemoteNodeId();
+    assert(recvdata.m_transporters.get(nodeId));
     if(is_connected(nodeId))
     {
       if(t->isConnected() && t->checkConnected())
@@ -1358,6 +1390,7 @@ TransporterRegistry::performReceive(Tran
   {
     SHM_Transporter *t = theSHMTransporters[i];
     const NodeId nodeId = t->getRemoteNodeId();
+    assert(recvdata.m_transporters.get(nodeId));
     if(is_connected(nodeId)){
       if(t->isConnected() && t->checkConnected())
       {
@@ -1509,6 +1542,7 @@ TransporterRegistry::blockReceive(Transp
                                   NodeId nodeId)
 {
   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+  assert(recvdata.m_transporters.get(nodeId));
 
   /* Check that node is not already blocked?
    * Stop pulling from its socket (but track received data etc)
@@ -1531,6 +1565,7 @@ TransporterRegistry::unblockReceive(Tran
                                     NodeId nodeId)
 {
   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+  assert(recvdata.m_transporters.get(nodeId));
 
   /* Check that node is blocked?
    * Resume pulling from its socket
@@ -1644,6 +1679,7 @@ TransporterRegistry::report_connect(Tran
                                     NodeId node_id)
 {
   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+  assert(recvdata.m_transporters.get(node_id));
 
   DBUG_ENTER("TransporterRegistry::report_connect");
   DBUG_PRINT("info",("performStates[%d]=CONNECTED",node_id));
@@ -1677,6 +1713,7 @@ TransporterRegistry::report_disconnect(T
                                        NodeId node_id, int errnum)
 {
   assert((receiveHandle == &recvdata) || (receiveHandle == 0));
+  assert(recvdata.m_transporters.get(node_id));
 
   DBUG_ENTER("TransporterRegistry::report_disconnect");
   DBUG_PRINT("info",("performStates[%d]=DISCONNECTED",node_id));
@@ -1735,6 +1772,8 @@ TransporterRegistry::update_connections(
     n++;
 
     const NodeId nodeId = t->getRemoteNodeId();
+    if (!recvdata.m_transporters.get(nodeId))
+      continue;
 
     TransporterError code = m_error_states[nodeId].m_code;
     const char *info = m_error_states[nodeId].m_info;

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.1 branch (jonas.oreland:4420 to 4421) Jonas Oreland20 Jan