List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:December 15 2010 1:55pm
Subject:bzr commit into mysql-5.1-telco-7.1 branch (jonas:4022)
View as plain text  
#At file:///home/jonas/src/telco-7.1/ based on revid:craig.russell@stripped

 4022 Jonas Oreland	2010-12-15 [merge]
      ndb - merge 70 to 71

    added:
      storage/ndb/src/common/transporter/Loopback_Transporter.cpp
      storage/ndb/src/common/transporter/Loopback_Transporter.hpp
    modified:
      storage/ndb/include/mgmcommon/IPCConfig.hpp
      storage/ndb/include/util/BaseString.hpp
      storage/ndb/src/common/mgmcommon/IPCConfig.cpp
      storage/ndb/src/common/transporter/CMakeLists.txt
      storage/ndb/src/common/transporter/Makefile.am
      storage/ndb/src/common/transporter/TCP_Transporter.cpp
      storage/ndb/src/common/transporter/TCP_Transporter.hpp
      storage/ndb/src/common/transporter/Transporter.hpp
      storage/ndb/src/common/transporter/TransporterRegistry.cpp
      storage/ndb/src/common/util/BaseString.cpp
      storage/ndb/src/mgmsrv/MgmtSrvr.cpp
      storage/ndb/src/ndbapi/ClusterMgr.cpp
      storage/ndb/src/ndbapi/ClusterMgr.hpp
      storage/ndb/src/ndbapi/SignalSender.cpp
      storage/ndb/src/ndbapi/TransporterFacade.cpp
      storage/ndb/src/ndbapi/trp_client.cpp
      storage/ndb/src/ndbapi/trp_client.hpp
=== modified file 'storage/ndb/include/mgmcommon/IPCConfig.hpp'
--- a/storage/ndb/include/mgmcommon/IPCConfig.hpp	2009-05-27 15:21:45 +0000
+++ b/storage/ndb/include/mgmcommon/IPCConfig.hpp	2010-12-15 06:07:35 +0000
@@ -33,7 +33,8 @@ struct IPCConfig
   */
   static bool configureTransporters(Uint32 nodeId,
                                     const struct ndb_mgm_configuration &,
-                                    class TransporterRegistry &);
+                                    class TransporterRegistry &,
+                                    bool transporter_to_self = false);
 };
 
 #endif // IPCConfig_H

=== modified file 'storage/ndb/include/util/BaseString.hpp'
--- a/storage/ndb/include/util/BaseString.hpp	2010-09-21 07:36:08 +0000
+++ b/storage/ndb/include/util/BaseString.hpp	2010-12-14 12:53:32 +0000
@@ -35,6 +35,9 @@ public:
   /** @brief Constructs a copy of a char * */
   BaseString(const char* s);
 
+  /** @brief Constructs a copy of a char * with length */
+  BaseString(const char* s, size_t len);
+
   /** @brief Constructs a copy of another BaseString */
   BaseString(const BaseString& str);
 

=== modified file 'storage/ndb/src/common/mgmcommon/IPCConfig.cpp'
--- a/storage/ndb/src/common/mgmcommon/IPCConfig.cpp	2009-05-27 15:21:45 +0000
+++ b/storage/ndb/src/common/mgmcommon/IPCConfig.cpp	2010-12-15 06:07:35 +0000
@@ -43,7 +43,8 @@ static bool is_mgmd(Uint32 nodeId,
 bool
 IPCConfig::configureTransporters(Uint32 nodeId,
                                  const struct ndb_mgm_configuration & config,
-                                 class TransporterRegistry & tr)
+                                 class TransporterRegistry & tr,
+                                 bool transporter_to_self)
 {
   bool result= true;
 
@@ -98,6 +99,7 @@ IPCConfig::configureTransporters(Uint32 
   }
 
   TransporterConfiguration conf;
+  TransporterConfiguration loopback_conf;
   ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
   for(iter.first(); iter.valid(); iter.next()){
     
@@ -110,6 +112,11 @@ IPCConfig::configureTransporters(Uint32 
     if(nodeId1 != nodeId && nodeId2 != nodeId) continue;
     remoteNodeId = (nodeId == nodeId1 ? nodeId2 : nodeId1);
 
+    if (nodeId1 == nodeId && nodeId2 == nodeId)
+    {
+      transporter_to_self = false; // One already present..ignore extra arg
+    }
+
     {
       const char * host1= 0, * host2= 0;
       iter.get(CFG_CONNECTION_HOSTNAME_1, &host1);
@@ -260,6 +267,7 @@ IPCConfig::configureTransporters(Uint32 
       DBUG_PRINT("info", ("Configured TCP Transporter: sendBufferSize = %d, "
 			  "maxReceiveSize = %d", conf.tcp.sendBufferSize,
 			  conf.tcp.maxReceiveSize));
+      loopback_conf = conf; // reuse it...
       break;
     default:
       ndbout << "Unknown transporter type from: " << nodeId << 
@@ -268,6 +276,21 @@ IPCConfig::configureTransporters(Uint32 
     } // switch
   } // for
 
+  if (transporter_to_self)
+  {
+    loopback_conf.remoteNodeId = nodeId;
+    loopback_conf.localNodeId = nodeId;
+    loopback_conf.serverNodeId = 0; // always client
+    loopback_conf.remoteHostName = "localhost";
+    loopback_conf.localHostName = "localhost";
+    loopback_conf.s_port = 1; // prevent asking ndb_mgmd for port...
+    if (!tr.configureTransporter(&loopback_conf))
+    {
+      ndbout_c("Failed to configure Loopback Transporter");
+      result= false;
+    }
+  }
+
   DBUG_RETURN(result);
 }
   

=== modified file 'storage/ndb/src/common/transporter/CMakeLists.txt'
--- a/storage/ndb/src/common/transporter/CMakeLists.txt	2008-08-21 05:05:59 +0000
+++ b/storage/ndb/src/common/transporter/CMakeLists.txt	2010-12-15 06:07:35 +0000
@@ -23,6 +23,6 @@ INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/
                     ${NDB_SCI_INCLUDES})
 
 ADD_LIBRARY(ndbtransport STATIC
-            Transporter.cpp TCP_Transporter.cpp
+            Transporter.cpp TCP_Transporter.cpp Loopback_Transporter.cpp
             TransporterRegistry.cpp Packer.cpp)
 

=== added file 'storage/ndb/src/common/transporter/Loopback_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Loopback_Transporter.cpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/common/transporter/Loopback_Transporter.cpp	2010-12-15 06:07:35 +0000
@@ -0,0 +1,194 @@
+/*
+   Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#include <ndb_global.h>
+
+#include "Loopback_Transporter.hpp"
+#include <NdbOut.hpp>
+#include <NdbSleep.h>
+
+#include <EventLogger.hpp>
+extern EventLogger * g_eventLogger;
+// End of stuff to be moved
+
+
+Loopback_Transporter::Loopback_Transporter(TransporterRegistry &t_reg,
+                                           const TransporterConfiguration* conf)
+  : TCP_Transporter(t_reg, conf)
+{
+  assert(isServer == false);
+}
+
+Loopback_Transporter::~Loopback_Transporter()
+{
+}
+
+bool
+Loopback_Transporter::connect_client()
+{
+  NDB_SOCKET_TYPE pair[2];
+  if (my_socketpair(pair))
+  {
+    perror("socketpair failed!");
+    return false;
+  }
+
+  if (!TCP_Transporter::setSocketNonBlocking(pair[0]) ||
+      !TCP_Transporter::setSocketNonBlocking(pair[1]))
+  {
+    goto err;
+  }
+
+  theSocket = pair[0];
+  m_send_socket = pair[1];
+  m_connected = true;
+  return true;
+
+err:
+  my_socket_close(pair[0]);
+  my_socket_close(pair[1]);
+  return false;
+}
+
+void
+Loopback_Transporter::disconnectImpl()
+{
+  NDB_SOCKET_TYPE pair[] = { theSocket, m_send_socket };
+
+  get_callback_obj()->lock_transporter(remoteNodeId);
+
+  receiveBuffer.clear();
+  my_socket_invalidate(&theSocket);
+  my_socket_invalidate(&m_send_socket);
+
+  get_callback_obj()->unlock_transporter(remoteNodeId);
+
+  if (my_socket_valid(pair[0]))
+    my_socket_close(pair[0]);
+
+  if (my_socket_valid(pair[1]))
+    my_socket_close(pair[1]);
+}
+
+bool
+Loopback_Transporter::send_is_possible(int timeout_millisec) const
+{
+  return TCP_Transporter::send_is_possible(m_send_socket, timeout_millisec);
+}
+
+#define DISCONNECT_ERRNO(e, sz) ((sz == 0) || \
+                                 (!((sz == -1) && ((e == SOCKET_EAGAIN) || (e == SOCKET_EWOULDBLOCK) || (e == SOCKET_EINTR)))))
+
+int
+Loopback_Transporter::doSend() {
+  struct iovec iov[64];
+  Uint32 cnt = fetch_send_iovec_data(iov, NDB_ARRAY_SIZE(iov));
+
+  if (cnt == 0)
+  {
+    return 0;
+  }
+
+  Uint32 sum = 0;
+  for(Uint32 i = 0; i<cnt; i++)
+  {
+    assert(iov[i].iov_len);
+    sum += iov[i].iov_len;
+  }
+
+  Uint32 pos = 0;
+  Uint32 sum_sent = 0;
+  Uint32 send_cnt = 0;
+  Uint32 remain = sum;
+
+  if (cnt == NDB_ARRAY_SIZE(iov))
+  {
+    // If pulling all iov's make sure that we never return everyting
+    // flushed
+    sum++;
+  }
+
+  while (send_cnt < 5)
+  {
+    send_cnt++;
+    Uint32 iovcnt = cnt > m_os_max_iovec ? m_os_max_iovec : cnt;
+    int nBytesSent = (int)my_socket_writev(m_send_socket, iov+pos, iovcnt);
+    assert(nBytesSent <= (int)remain);
+
+    if (Uint32(nBytesSent) == remain)
+    {
+      sum_sent += nBytesSent;
+      goto ok;
+    }
+    else if (nBytesSent > 0)
+    {
+      sum_sent += nBytesSent;
+      remain -= nBytesSent;
+
+      /**
+       * Forward in iovec
+       */
+      while (Uint32(nBytesSent) >= iov[pos].iov_len)
+      {
+        assert(iov[pos].iov_len > 0);
+        nBytesSent -= iov[pos].iov_len;
+        pos++;
+        cnt--;
+      }
+
+      if (nBytesSent)
+      {
+        assert(iov[pos].iov_len > Uint32(nBytesSent));
+        iov[pos].iov_len -= nBytesSent;
+        iov[pos].iov_base = ((char*)(iov[pos].iov_base))+nBytesSent;
+      }
+      continue;
+    }
+    else
+    {
+      int err = my_socket_errno();
+      if (!(DISCONNECT_ERRNO(err, nBytesSent)))
+      {
+        if (sum_sent)
+        {
+          goto ok;
+        }
+        else
+        {
+          return remain;
+        }
+      }
+
+      do_disconnect(err);
+      return 0;
+    }
+  }
+
+ok:
+  assert(sum >= sum_sent);
+  iovec_data_sent(sum_sent);
+  sendCount += send_cnt;
+  sendSize  += sum_sent;
+  if(sendCount >= reportFreq)
+  {
+    get_callback_obj()->reportSendLen(remoteNodeId, sendCount, sendSize);
+    sendCount = 0;
+    sendSize  = 0;
+  }
+
+  return sum - sum_sent; // 0 if every thing flushed else >0
+}

=== added file 'storage/ndb/src/common/transporter/Loopback_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Loopback_Transporter.hpp	1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/common/transporter/Loopback_Transporter.hpp	2010-12-15 06:07:35 +0000
@@ -0,0 +1,70 @@
+/*
+   Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; version 2 of the License.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
+*/
+
+#ifndef LOOPBACK_TRANSPORTER_HPP
+#define LOOPBACK_TRANSPORTER_HPP
+
+#include "TCP_Transporter.hpp"
+
+/**
+ * This implements a connection to self,
+ *   by using a socketpair...
+ * where theSocket is the receive part, and m_send_socket is the write part
+ */
+class Loopback_Transporter : public TCP_Transporter
+{
+  friend class TransporterRegistry;
+private:
+  // Initialize member variables
+  Loopback_Transporter(TransporterRegistry&,
+                       const TransporterConfiguration* conf);
+
+  // Disconnect, delete send buffers and receive buffer
+  virtual ~Loopback_Transporter();
+
+  /**
+   * overloads TCP_Transporter::doSend
+   */
+  virtual int doSend();
+
+  /**
+   * setup socket pair
+   * @overload Transporter::connect_client()
+   */
+  virtual bool connect_client();
+
+  /**
+   * @overload TCP_Transporter::disconnectImpl
+   */
+  virtual void disconnectImpl();
+
+protected:
+
+private:
+  /**
+   * m_send_socket is used to send
+   * theSocket (in base class) is used for receive
+   */
+  NDB_SOCKET_TYPE m_send_socket;
+
+  /**
+   * overloads TCP_Transporter::send_is_possible
+   */
+  virtual bool send_is_possible(int timeout_millisec) const;
+};
+
+#endif // Define of TCP_Transporter_H

=== modified file 'storage/ndb/src/common/transporter/Makefile.am'
--- a/storage/ndb/src/common/transporter/Makefile.am	2010-08-06 08:19:19 +0000
+++ b/storage/ndb/src/common/transporter/Makefile.am	2010-12-15 06:07:35 +0000
@@ -21,6 +21,7 @@ noinst_LTLIBRARIES = libtransporter.la
 libtransporter_la_SOURCES = \
 	Transporter.cpp \
 	TCP_Transporter.cpp \
+	Loopback_Transporter.cpp \
 	TransporterRegistry.cpp \
         Packer.cpp
 

=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2010-10-07 09:36:21 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp	2010-12-15 06:07:35 +0000
@@ -254,13 +254,19 @@ bool TCP_Transporter::setSocketNonBlocki
 bool
 TCP_Transporter::send_is_possible(int timeout_millisec) const
 {
+  return send_is_possible(theSocket, timeout_millisec);
+}
+
+bool
+TCP_Transporter::send_is_possible(NDB_SOCKET_TYPE fd,int timeout_millisec) const
+{
   ndb_socket_poller poller;
 
-  if (!my_socket_valid(theSocket))
+  if (!my_socket_valid(fd))
     return false;
 
   poller.clear();
-  poller.add(theSocket, false, true, false);
+  poller.add(fd, false, true, false);
 
   if (poller.poll_unsafe(timeout_millisec) <= 0)
     return false; // Timeout or error occured

=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.hpp	2010-10-07 09:36:21 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.hpp	2010-12-15 06:07:35 +0000
@@ -45,6 +45,7 @@ struct ReceiveBuffer {
 
 class TCP_Transporter : public Transporter {
   friend class TransporterRegistry;
+  friend class Loopback_Transporter;
 private:
   // Initialize member variables
   TCP_Transporter(TransporterRegistry&, const TransporterConfiguration* conf);
@@ -128,6 +129,7 @@ private:
   virtual int pre_connect_options(NDB_SOCKET_TYPE aSocket);
   
   bool send_is_possible(int timeout_millisec) const;
+  bool send_is_possible(NDB_SOCKET_TYPE fd, int timeout_millisec) const;
 
   /**
    * Statistics

=== modified file 'storage/ndb/src/common/transporter/Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Transporter.hpp	2010-06-01 12:19:50 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.hpp	2010-12-15 06:07:35 +0000
@@ -48,7 +48,7 @@ public:
    * None blocking
    *    Use isConnected() to check status
    */
-  bool connect_client();
+  virtual bool connect_client();
   bool connect_client(NDB_SOCKET_TYPE sockfd);
   bool connect_server(NDB_SOCKET_TYPE socket);
 

=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2010-11-09 20:40:03 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp	2010-12-15 06:07:35 +0000
@@ -26,6 +26,7 @@
 
 #ifdef NDB_TCP_TRANSPORTER
 #include "TCP_Transporter.hpp"
+#include "Loopback_Transporter.hpp"
 #endif
 
 #ifdef NDB_SCI_TRANSPORTER
@@ -429,7 +430,15 @@ bool
 TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
 #ifdef NDB_TCP_TRANSPORTER
 
-  TCP_Transporter * t = new TCP_Transporter(*this, config);
+  TCP_Transporter * t = 0;
+  if (config->remoteNodeId == config->localNodeId)
+  {
+    t = new Loopback_Transporter(* this, config);
+  }
+  else
+  {
+    t = new TCP_Transporter(*this, config);
+  }
 
   if (t == NULL) 
     return false;

=== modified file 'storage/ndb/src/common/util/BaseString.cpp'
--- a/storage/ndb/src/common/util/BaseString.cpp	2010-11-28 11:34:01 +0000
+++ b/storage/ndb/src/common/util/BaseString.cpp	2010-12-14 12:53:32 +0000
@@ -54,6 +54,26 @@ BaseString::BaseString(const char* s)
     m_len = n;
 }
 
+BaseString::BaseString(const char * s, size_t n)
+{
+  if (s == NULL || n == 0)
+  {
+    m_chr = NULL;
+    m_len = 0;
+    return;
+  }
+  m_chr = new char[n + 1];
+  if (m_chr == NULL)
+  {
+    errno = ENOMEM;
+    m_len = 0;
+    return;
+  }
+  memcpy(m_chr, s, n);
+  m_chr[n] = 0;
+  m_len = n;
+}
+
 BaseString::BaseString(const BaseString& str)
 {
     const char* const s = str.m_chr;

=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp	2010-11-30 10:00:52 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp	2010-12-15 06:07:35 +0000
@@ -432,6 +432,14 @@ MgmtSrvr::start_transporter(const Config
     DBUG_RETURN(false);
   }
 
+  /**
+   * Wait for loopback interface to be enabled
+   */
+  while (!theFacade->isConnected(_ownNodeId))
+  {
+    NdbSleep_MilliSleep(20);
+  }
+
   _ownReference = numberToRef(_blockNumber, _ownNodeId);
 
   /*
@@ -768,6 +776,8 @@ MgmtSrvr::~MgmtSrvr()
     m_config_manager= 0;
   }
 
+  this->close(); // close trp_client before stopping TransporterFacade
+
   // Stop transporter
   if(theFacade != 0){
     theFacade->stop_instance();

=== modified file 'storage/ndb/src/ndbapi/ClusterMgr.cpp'
--- a/storage/ndb/src/ndbapi/ClusterMgr.cpp	2010-10-15 13:04:50 +0000
+++ b/storage/ndb/src/ndbapi/ClusterMgr.cpp	2010-12-15 06:07:35 +0000
@@ -91,6 +91,7 @@ ClusterMgr::~ClusterMgr()
     delete theArbitMgr;
     theArbitMgr = 0;
   }
+  this->close(); // disconnect from TransporterFacade
   NdbCondition_Destroy(waitForHBCond);
   NdbMutex_Destroy(clusterMgrThreadMutex);
   DBUG_VOID_RETURN;
@@ -140,7 +141,7 @@ ClusterMgr::configure(Uint32 nodeId,
   }
 
   /* Init own node info */
-  Node &node= theNodes[theFacade.ownId()];
+  Node &node= theNodes[getOwnNodeId()];
   assert(node.defined);
   node.set_connected(true);
   node.set_confirmed(true);
@@ -159,7 +160,7 @@ ClusterMgr::configure(Uint32 nodeId,
   {
     // The arbitrator should be active
     if (!theArbitMgr)
-      theArbitMgr = new ArbitMgr(theFacade);
+      theArbitMgr = new ArbitMgr(* this);
     theArbitMgr->setRank(rank);
 
     Uint32 delay = 0;
@@ -190,9 +191,11 @@ ClusterMgr::startThread() {
 void
 ClusterMgr::doStop( ){
   DBUG_ENTER("ClusterMgr::doStop");
-  Guard g(clusterMgrThreadMutex);
-  if(theStop == 1){
-    DBUG_VOID_RETURN;
+  {
+    Guard g(clusterMgrThreadMutex);
+    if(theStop == 1){
+      DBUG_VOID_RETURN;
+    }
   }
 
   void *status;
@@ -294,11 +297,23 @@ ClusterMgr::threadMain( ){
   NDB_TICKS timeSlept = 100;
   NDB_TICKS now = NdbTick_CurrentMillisecond();
 
-  while(!theStop){
-
-    /* Sleep at least 100ms between each heartbet check */
+  while(!theStop)
+  {
+    /* Sleep at 100ms between each heartbeat check */
     NDB_TICKS before = now;
-    NdbSleep_MilliSleep(100);
+    for (Uint32 i = 0; i<10; i++)
+    {
+      NdbSleep_MilliSleep(10);
+      {
+        Guard g(clusterMgrThreadMutex);
+        /**
+         * Protect from ArbitMgr sending signals while we poll
+         */
+        start_poll();
+        do_poll(0);
+        complete_poll();
+      }
+    }
     now = NdbTick_CurrentMillisecond();
     timeSlept = (now - before);
 
@@ -327,6 +342,9 @@ ClusterMgr::threadMain( ){
       assert(nodeId > 0 && nodeId < MAX_NODES);
       Node & theNode = theNodes[nodeId];
       
+      if (nodeId == getOwnNodeId())
+        continue;
+
       if (!theNode.defined)
 	continue;
 
@@ -867,8 +885,8 @@ ClusterMgr::print_nodes(const char* wher
 /******************************************************************************
  * Arbitrator
  ******************************************************************************/
-ArbitMgr::ArbitMgr(TransporterFacade & _fac)
-  : theFacade(_fac)
+ArbitMgr::ArbitMgr(ClusterMgr & c)
+  : m_clusterMgr(c)
 {
   DBUG_ENTER("ArbitMgr::ArbitMgr");
 
@@ -1184,7 +1202,7 @@ ArbitMgr::sendStopRep(ArbitSignal& aSign
 void
 ArbitMgr::sendSignalToQmgr(ArbitSignal& aSignal)
 {
-  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
+  NdbApiSignal signal(numberToRef(API_CLUSTERMGR, m_clusterMgr.getOwnNodeId()));
 
   signal.theVerId_signalNumber = aSignal.gsn;
   signal.theReceiversBlockNumber = QMGR;
@@ -1193,7 +1211,7 @@ ArbitMgr::sendSignalToQmgr(ArbitSignal& 
 
   ArbitSignalData* sd = CAST_PTR(ArbitSignalData, signal.getDataPtrSend());
 
-  sd->sender = numberToRef(API_CLUSTERMGR, theFacade.ownId());
+  sd->sender = numberToRef(API_CLUSTERMGR, m_clusterMgr.getOwnNodeId());
   sd->code = aSignal.data.code;
   sd->node = aSignal.data.node;
   sd->ticket = aSignal.data.ticket;
@@ -1211,8 +1229,11 @@ ArbitMgr::sendSignalToQmgr(ArbitSignal& 
   ndbout << endl;
 #endif
 
-  theFacade.lock_mutex();
-  theFacade.sendSignalUnCond(&signal, aSignal.data.sender);
-  theFacade.unlock_mutex();
+  {
+    Guard g(m_clusterMgr.clusterMgrThreadMutex);
+    m_clusterMgr.lock();
+    m_clusterMgr.raw_sendSignal(&signal, aSignal.data.sender);
+    m_clusterMgr.unlock();
+  }
 }
 

=== modified file 'storage/ndb/src/ndbapi/ClusterMgr.hpp'
--- a/storage/ndb/src/ndbapi/ClusterMgr.hpp	2010-10-13 06:15:20 +0000
+++ b/storage/ndb/src/ndbapi/ClusterMgr.hpp	2010-12-14 19:26:50 +0000
@@ -37,6 +37,7 @@ extern "C" void* runClusterMgr_C(void * 
 class ClusterMgr : public trp_client
 {
   friend class TransporterFacade;
+  friend class ArbitMgr;
   friend void* runClusterMgr_C(void * me);
 public:
   ClusterMgr(class TransporterFacade &);
@@ -182,7 +183,7 @@ extern "C" void* runArbitMgr_C(void* me)
 class ArbitMgr
 {
 public:
-  ArbitMgr(class TransporterFacade &);
+  ArbitMgr(class ClusterMgr &);
   ~ArbitMgr();
 
   inline void setRank(unsigned n) { theRank = n; }
@@ -195,7 +196,7 @@ public:
   friend void* runArbitMgr_C(void* me);
 
 private:
-  class TransporterFacade & theFacade;
+  class ClusterMgr & m_clusterMgr;
   unsigned theRank;
   unsigned theDelay;
 

=== modified file 'storage/ndb/src/ndbapi/SignalSender.cpp'
--- a/storage/ndb/src/ndbapi/SignalSender.cpp	2010-10-13 12:22:51 +0000
+++ b/storage/ndb/src/ndbapi/SignalSender.cpp	2010-12-15 06:07:35 +0000
@@ -191,11 +191,6 @@ SignalSender::sendFragmentedSignal(Uint1
                                    Uint32 len)
 {
   sig.set(*this, TestOrd::TraceAPI, recBlock, gsn, len);
-  if (nodeId == theFacade->ownId())
-  {
-    // No need to fragment when sending to own node
-    return sendSignal(nodeId, &sig);
-  }
 
   return theFacade->sendFragmentedSignal((NdbApiSignal*)&sig.header,
                                          nodeId,

=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp	2010-12-03 05:53:47 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp	2010-12-15 06:07:35 +0000
@@ -482,22 +482,12 @@ void TransporterFacade::threadMainReceiv
 #ifdef NDB_SHM_TRANSPORTER
   NdbThread_set_shm_sigmask(TRUE);
 #endif
-  NdbMutex_Lock(theMutexPtr);
-  theTransporterRegistry->update_connections();
-  NdbMutex_Unlock(theMutexPtr);
-  while(!theStopReceive) {
-    for(int i = 0; i<10; i++){
-      NdbSleep_MilliSleep(10);
-      NdbMutex_Lock(theMutexPtr);
-      if (m_poll_owner == NULL)
-      {
-        external_poll(0);
-      }
-      NdbMutex_Unlock(theMutexPtr);
-    }
+  while(!theStopReceive)
+  {
     NdbMutex_Lock(theMutexPtr);
     theTransporterRegistry->update_connections();
     NdbMutex_Unlock(theMutexPtr);
+    NdbSleep_MilliSleep(100);
   }//while
   theTransporterRegistry->stopReceiving();
 }
@@ -607,10 +597,18 @@ TransporterFacade::do_connect_mgm(NodeId
       doConnect(remoteNodeId);
     }
   }
+
+  /**
+   * Also setup Loopback Transporter
+   */
+  if (is_mgmd(nodeId, conf))
+  {
+    doConnect(nodeId);
+  }
+
   DBUG_RETURN(true);
 }
 
-
 bool
 TransporterFacade::configure(NodeId nodeId,
                              const ndb_mgm_configuration* conf)
@@ -624,7 +622,8 @@ TransporterFacade::configure(NodeId node
   // Configure transporters
   if (!IPCConfig::configureTransporters(nodeId,
                                         * conf,
-                                        * theTransporterRegistry))
+                                        * theTransporterRegistry,
+                                        is_mgmd(nodeId, conf)))
     DBUG_RETURN(false);
 
   // Configure cluster manager
@@ -1725,17 +1724,6 @@ SignalSender::sendSignal(Uint16 nodeId, 
   }
 #endif
 
-  if (nodeId == theFacade->ownId())
-  {
-    SignalHeader tmp= s->header;
-    tmp.theSendersBlockRef = getOwnRef();
-    theFacade->deliver_signal(&tmp,
-                              1, // JBB
-                              (Uint32*)&s->theData[0],
-                              (LinearSectionPtr*)&s->ptr[0]);
-    return SEND_OK;
-  }
-
   SendStatus ss = 
     theFacade->theTransporterRegistry->prepareSend(&s->header,
                                                    1, // JBB

=== modified file 'storage/ndb/src/ndbapi/trp_client.cpp'
--- a/storage/ndb/src/ndbapi/trp_client.cpp	2010-10-06 12:35:34 +0000
+++ b/storage/ndb/src/ndbapi/trp_client.cpp	2010-12-14 09:10:45 +0000
@@ -47,6 +47,12 @@ trp_client::open(TransporterFacade* tf, 
   return res;
 }
 
+Uint32
+trp_client::getOwnNodeId() const
+{
+  return m_facade->theOwnId;
+}
+
 void
 trp_client::close()
 {

=== modified file 'storage/ndb/src/ndbapi/trp_client.hpp'
--- a/storage/ndb/src/ndbapi/trp_client.hpp	2010-12-03 05:53:47 +0000
+++ b/storage/ndb/src/ndbapi/trp_client.hpp	2010-12-14 09:10:45 +0000
@@ -61,6 +61,8 @@ public:
 
   void lock();
   void unlock();
+
+  Uint32 getOwnNodeId() const;
 private:
   Uint32 m_blockNo;
   TransporterFacade * m_facade;
@@ -112,8 +114,8 @@ inline
 void
 trp_client::lock()
 {
-  assert(m_poll.m_locked == false);
   NdbMutex_Lock(m_facade->theMutexPtr);
+  assert(m_poll.m_locked == false);
   m_poll.m_locked = true;
 }
 
@@ -122,8 +124,8 @@ void
 trp_client::unlock()
 {
   assert(m_poll.m_locked == true);
-  NdbMutex_Unlock(m_facade->theMutexPtr);
   m_poll.m_locked = false;
+  NdbMutex_Unlock(m_facade->theMutexPtr);
 }
 
 inline

No bundle (reason: revision is a merge).
Thread
bzr commit into mysql-5.1-telco-7.1 branch (jonas:4022) Jonas Oreland15 Dec