#At file:///home/jonas/src/telco-7.1/ based on revid:craig.russell@stripped
4022 Jonas Oreland 2010-12-15 [merge]
ndb - merge 70 to 71
added:
storage/ndb/src/common/transporter/Loopback_Transporter.cpp
storage/ndb/src/common/transporter/Loopback_Transporter.hpp
modified:
storage/ndb/include/mgmcommon/IPCConfig.hpp
storage/ndb/include/util/BaseString.hpp
storage/ndb/src/common/mgmcommon/IPCConfig.cpp
storage/ndb/src/common/transporter/CMakeLists.txt
storage/ndb/src/common/transporter/Makefile.am
storage/ndb/src/common/transporter/TCP_Transporter.cpp
storage/ndb/src/common/transporter/TCP_Transporter.hpp
storage/ndb/src/common/transporter/Transporter.hpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/common/util/BaseString.cpp
storage/ndb/src/mgmsrv/MgmtSrvr.cpp
storage/ndb/src/ndbapi/ClusterMgr.cpp
storage/ndb/src/ndbapi/ClusterMgr.hpp
storage/ndb/src/ndbapi/SignalSender.cpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/trp_client.cpp
storage/ndb/src/ndbapi/trp_client.hpp
=== modified file 'storage/ndb/include/mgmcommon/IPCConfig.hpp'
--- a/storage/ndb/include/mgmcommon/IPCConfig.hpp 2009-05-27 15:21:45 +0000
+++ b/storage/ndb/include/mgmcommon/IPCConfig.hpp 2010-12-15 06:07:35 +0000
@@ -33,7 +33,8 @@ struct IPCConfig
*/
static bool configureTransporters(Uint32 nodeId,
const struct ndb_mgm_configuration &,
- class TransporterRegistry &);
+ class TransporterRegistry &,
+ bool transporter_to_self = false);
};
#endif // IPCConfig_H
=== modified file 'storage/ndb/include/util/BaseString.hpp'
--- a/storage/ndb/include/util/BaseString.hpp 2010-09-21 07:36:08 +0000
+++ b/storage/ndb/include/util/BaseString.hpp 2010-12-14 12:53:32 +0000
@@ -35,6 +35,9 @@ public:
/** @brief Constructs a copy of a char * */
BaseString(const char* s);
+ /** @brief Constructs a copy of a char * with length */
+ BaseString(const char* s, size_t len);
+
/** @brief Constructs a copy of another BaseString */
BaseString(const BaseString& str);
=== modified file 'storage/ndb/src/common/mgmcommon/IPCConfig.cpp'
--- a/storage/ndb/src/common/mgmcommon/IPCConfig.cpp 2009-05-27 15:21:45 +0000
+++ b/storage/ndb/src/common/mgmcommon/IPCConfig.cpp 2010-12-15 06:07:35 +0000
@@ -43,7 +43,8 @@ static bool is_mgmd(Uint32 nodeId,
bool
IPCConfig::configureTransporters(Uint32 nodeId,
const struct ndb_mgm_configuration & config,
- class TransporterRegistry & tr)
+ class TransporterRegistry & tr,
+ bool transporter_to_self)
{
bool result= true;
@@ -98,6 +99,7 @@ IPCConfig::configureTransporters(Uint32
}
TransporterConfiguration conf;
+ TransporterConfiguration loopback_conf;
ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
for(iter.first(); iter.valid(); iter.next()){
@@ -110,6 +112,11 @@ IPCConfig::configureTransporters(Uint32
if(nodeId1 != nodeId && nodeId2 != nodeId) continue;
remoteNodeId = (nodeId == nodeId1 ? nodeId2 : nodeId1);
+ if (nodeId1 == nodeId && nodeId2 == nodeId)
+ {
+ transporter_to_self = false; // One already present..ignore extra arg
+ }
+
{
const char * host1= 0, * host2= 0;
iter.get(CFG_CONNECTION_HOSTNAME_1, &host1);
@@ -260,6 +267,7 @@ IPCConfig::configureTransporters(Uint32
DBUG_PRINT("info", ("Configured TCP Transporter: sendBufferSize = %d, "
"maxReceiveSize = %d", conf.tcp.sendBufferSize,
conf.tcp.maxReceiveSize));
+ loopback_conf = conf; // reuse it...
break;
default:
ndbout << "Unknown transporter type from: " << nodeId <<
@@ -268,6 +276,21 @@ IPCConfig::configureTransporters(Uint32
} // switch
} // for
+ if (transporter_to_self)
+ {
+ loopback_conf.remoteNodeId = nodeId;
+ loopback_conf.localNodeId = nodeId;
+ loopback_conf.serverNodeId = 0; // always client
+ loopback_conf.remoteHostName = "localhost";
+ loopback_conf.localHostName = "localhost";
+ loopback_conf.s_port = 1; // prevent asking ndb_mgmd for port...
+ if (!tr.configureTransporter(&loopback_conf))
+ {
+ ndbout_c("Failed to configure Loopback Transporter");
+ result= false;
+ }
+ }
+
DBUG_RETURN(result);
}
=== modified file 'storage/ndb/src/common/transporter/CMakeLists.txt'
--- a/storage/ndb/src/common/transporter/CMakeLists.txt 2008-08-21 05:05:59 +0000
+++ b/storage/ndb/src/common/transporter/CMakeLists.txt 2010-12-15 06:07:35 +0000
@@ -23,6 +23,6 @@ INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/
${NDB_SCI_INCLUDES})
ADD_LIBRARY(ndbtransport STATIC
- Transporter.cpp TCP_Transporter.cpp
+ Transporter.cpp TCP_Transporter.cpp Loopback_Transporter.cpp
TransporterRegistry.cpp Packer.cpp)
=== added file 'storage/ndb/src/common/transporter/Loopback_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/Loopback_Transporter.cpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/common/transporter/Loopback_Transporter.cpp 2010-12-15 06:07:35 +0000
@@ -0,0 +1,194 @@
+/*
+ Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#include <ndb_global.h>
+
+#include "Loopback_Transporter.hpp"
+#include <NdbOut.hpp>
+#include <NdbSleep.h>
+
+#include <EventLogger.hpp>
+extern EventLogger * g_eventLogger;
+// End of stuff to be moved
+
+
+Loopback_Transporter::Loopback_Transporter(TransporterRegistry &t_reg,
+ const TransporterConfiguration* conf)
+ : TCP_Transporter(t_reg, conf)
+{
+ assert(isServer == false);
+}
+
+Loopback_Transporter::~Loopback_Transporter()
+{
+}
+
+bool
+Loopback_Transporter::connect_client()
+{
+ NDB_SOCKET_TYPE pair[2];
+ if (my_socketpair(pair))
+ {
+ perror("socketpair failed!");
+ return false;
+ }
+
+ if (!TCP_Transporter::setSocketNonBlocking(pair[0]) ||
+ !TCP_Transporter::setSocketNonBlocking(pair[1]))
+ {
+ goto err;
+ }
+
+ theSocket = pair[0];
+ m_send_socket = pair[1];
+ m_connected = true;
+ return true;
+
+err:
+ my_socket_close(pair[0]);
+ my_socket_close(pair[1]);
+ return false;
+}
+
+void
+Loopback_Transporter::disconnectImpl()
+{
+ NDB_SOCKET_TYPE pair[] = { theSocket, m_send_socket };
+
+ get_callback_obj()->lock_transporter(remoteNodeId);
+
+ receiveBuffer.clear();
+ my_socket_invalidate(&theSocket);
+ my_socket_invalidate(&m_send_socket);
+
+ get_callback_obj()->unlock_transporter(remoteNodeId);
+
+ if (my_socket_valid(pair[0]))
+ my_socket_close(pair[0]);
+
+ if (my_socket_valid(pair[1]))
+ my_socket_close(pair[1]);
+}
+
+bool
+Loopback_Transporter::send_is_possible(int timeout_millisec) const
+{
+ return TCP_Transporter::send_is_possible(m_send_socket, timeout_millisec);
+}
+
+#define DISCONNECT_ERRNO(e, sz) ((sz == 0) || \
+ (!((sz == -1) && ((e == SOCKET_EAGAIN) || (e == SOCKET_EWOULDBLOCK) || (e == SOCKET_EINTR)))))
+
+int
+Loopback_Transporter::doSend() {
+ struct iovec iov[64];
+ Uint32 cnt = fetch_send_iovec_data(iov, NDB_ARRAY_SIZE(iov));
+
+ if (cnt == 0)
+ {
+ return 0;
+ }
+
+ Uint32 sum = 0;
+ for(Uint32 i = 0; i<cnt; i++)
+ {
+ assert(iov[i].iov_len);
+ sum += iov[i].iov_len;
+ }
+
+ Uint32 pos = 0;
+ Uint32 sum_sent = 0;
+ Uint32 send_cnt = 0;
+ Uint32 remain = sum;
+
+ if (cnt == NDB_ARRAY_SIZE(iov))
+ {
+ // If pulling all iov's make sure that we never return everyting
+ // flushed
+ sum++;
+ }
+
+ while (send_cnt < 5)
+ {
+ send_cnt++;
+ Uint32 iovcnt = cnt > m_os_max_iovec ? m_os_max_iovec : cnt;
+ int nBytesSent = (int)my_socket_writev(m_send_socket, iov+pos, iovcnt);
+ assert(nBytesSent <= (int)remain);
+
+ if (Uint32(nBytesSent) == remain)
+ {
+ sum_sent += nBytesSent;
+ goto ok;
+ }
+ else if (nBytesSent > 0)
+ {
+ sum_sent += nBytesSent;
+ remain -= nBytesSent;
+
+ /**
+ * Forward in iovec
+ */
+ while (Uint32(nBytesSent) >= iov[pos].iov_len)
+ {
+ assert(iov[pos].iov_len > 0);
+ nBytesSent -= iov[pos].iov_len;
+ pos++;
+ cnt--;
+ }
+
+ if (nBytesSent)
+ {
+ assert(iov[pos].iov_len > Uint32(nBytesSent));
+ iov[pos].iov_len -= nBytesSent;
+ iov[pos].iov_base = ((char*)(iov[pos].iov_base))+nBytesSent;
+ }
+ continue;
+ }
+ else
+ {
+ int err = my_socket_errno();
+ if (!(DISCONNECT_ERRNO(err, nBytesSent)))
+ {
+ if (sum_sent)
+ {
+ goto ok;
+ }
+ else
+ {
+ return remain;
+ }
+ }
+
+ do_disconnect(err);
+ return 0;
+ }
+ }
+
+ok:
+ assert(sum >= sum_sent);
+ iovec_data_sent(sum_sent);
+ sendCount += send_cnt;
+ sendSize += sum_sent;
+ if(sendCount >= reportFreq)
+ {
+ get_callback_obj()->reportSendLen(remoteNodeId, sendCount, sendSize);
+ sendCount = 0;
+ sendSize = 0;
+ }
+
+ return sum - sum_sent; // 0 if every thing flushed else >0
+}
=== added file 'storage/ndb/src/common/transporter/Loopback_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Loopback_Transporter.hpp 1970-01-01 00:00:00 +0000
+++ b/storage/ndb/src/common/transporter/Loopback_Transporter.hpp 2010-12-15 06:07:35 +0000
@@ -0,0 +1,70 @@
+/*
+ Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; version 2 of the License.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+*/
+
+#ifndef LOOPBACK_TRANSPORTER_HPP
+#define LOOPBACK_TRANSPORTER_HPP
+
+#include "TCP_Transporter.hpp"
+
+/**
+ * This implements a connection to self,
+ * by using a socketpair...
+ * where theSocket is the receive part, and m_send_socket is the write part
+ */
+class Loopback_Transporter : public TCP_Transporter
+{
+ friend class TransporterRegistry;
+private:
+ // Initialize member variables
+ Loopback_Transporter(TransporterRegistry&,
+ const TransporterConfiguration* conf);
+
+ // Disconnect, delete send buffers and receive buffer
+ virtual ~Loopback_Transporter();
+
+ /**
+ * overloads TCP_Transporter::doSend
+ */
+ virtual int doSend();
+
+ /**
+ * setup socket pair
+ * @overload Transporter::connect_client()
+ */
+ virtual bool connect_client();
+
+ /**
+ * @overload TCP_Transporter::disconnectImpl
+ */
+ virtual void disconnectImpl();
+
+protected:
+
+private:
+ /**
+ * m_send_socket is used to send
+ * theSocket (in base class) is used for receive
+ */
+ NDB_SOCKET_TYPE m_send_socket;
+
+ /**
+ * overloads TCP_Transporter::send_is_possible
+ */
+ virtual bool send_is_possible(int timeout_millisec) const;
+};
+
+#endif // Define of TCP_Transporter_H
=== modified file 'storage/ndb/src/common/transporter/Makefile.am'
--- a/storage/ndb/src/common/transporter/Makefile.am 2010-08-06 08:19:19 +0000
+++ b/storage/ndb/src/common/transporter/Makefile.am 2010-12-15 06:07:35 +0000
@@ -21,6 +21,7 @@ noinst_LTLIBRARIES = libtransporter.la
libtransporter_la_SOURCES = \
Transporter.cpp \
TCP_Transporter.cpp \
+ Loopback_Transporter.cpp \
TransporterRegistry.cpp \
Packer.cpp
=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.cpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2010-10-07 09:36:21 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.cpp 2010-12-15 06:07:35 +0000
@@ -254,13 +254,19 @@ bool TCP_Transporter::setSocketNonBlocki
bool
TCP_Transporter::send_is_possible(int timeout_millisec) const
{
+ return send_is_possible(theSocket, timeout_millisec);
+}
+
+bool
+TCP_Transporter::send_is_possible(NDB_SOCKET_TYPE fd,int timeout_millisec) const
+{
ndb_socket_poller poller;
- if (!my_socket_valid(theSocket))
+ if (!my_socket_valid(fd))
return false;
poller.clear();
- poller.add(theSocket, false, true, false);
+ poller.add(fd, false, true, false);
if (poller.poll_unsafe(timeout_millisec) <= 0)
return false; // Timeout or error occured
=== modified file 'storage/ndb/src/common/transporter/TCP_Transporter.hpp'
--- a/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2010-10-07 09:36:21 +0000
+++ b/storage/ndb/src/common/transporter/TCP_Transporter.hpp 2010-12-15 06:07:35 +0000
@@ -45,6 +45,7 @@ struct ReceiveBuffer {
class TCP_Transporter : public Transporter {
friend class TransporterRegistry;
+ friend class Loopback_Transporter;
private:
// Initialize member variables
TCP_Transporter(TransporterRegistry&, const TransporterConfiguration* conf);
@@ -128,6 +129,7 @@ private:
virtual int pre_connect_options(NDB_SOCKET_TYPE aSocket);
bool send_is_possible(int timeout_millisec) const;
+ bool send_is_possible(NDB_SOCKET_TYPE fd, int timeout_millisec) const;
/**
* Statistics
=== modified file 'storage/ndb/src/common/transporter/Transporter.hpp'
--- a/storage/ndb/src/common/transporter/Transporter.hpp 2010-06-01 12:19:50 +0000
+++ b/storage/ndb/src/common/transporter/Transporter.hpp 2010-12-15 06:07:35 +0000
@@ -48,7 +48,7 @@ public:
* None blocking
* Use isConnected() to check status
*/
- bool connect_client();
+ virtual bool connect_client();
bool connect_client(NDB_SOCKET_TYPE sockfd);
bool connect_server(NDB_SOCKET_TYPE socket);
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2010-11-09 20:40:03 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2010-12-15 06:07:35 +0000
@@ -26,6 +26,7 @@
#ifdef NDB_TCP_TRANSPORTER
#include "TCP_Transporter.hpp"
+#include "Loopback_Transporter.hpp"
#endif
#ifdef NDB_SCI_TRANSPORTER
@@ -429,7 +430,15 @@ bool
TransporterRegistry::createTCPTransporter(TransporterConfiguration *config) {
#ifdef NDB_TCP_TRANSPORTER
- TCP_Transporter * t = new TCP_Transporter(*this, config);
+ TCP_Transporter * t = 0;
+ if (config->remoteNodeId == config->localNodeId)
+ {
+ t = new Loopback_Transporter(* this, config);
+ }
+ else
+ {
+ t = new TCP_Transporter(*this, config);
+ }
if (t == NULL)
return false;
=== modified file 'storage/ndb/src/common/util/BaseString.cpp'
--- a/storage/ndb/src/common/util/BaseString.cpp 2010-11-28 11:34:01 +0000
+++ b/storage/ndb/src/common/util/BaseString.cpp 2010-12-14 12:53:32 +0000
@@ -54,6 +54,26 @@ BaseString::BaseString(const char* s)
m_len = n;
}
+BaseString::BaseString(const char * s, size_t n)
+{
+ if (s == NULL || n == 0)
+ {
+ m_chr = NULL;
+ m_len = 0;
+ return;
+ }
+ m_chr = new char[n + 1];
+ if (m_chr == NULL)
+ {
+ errno = ENOMEM;
+ m_len = 0;
+ return;
+ }
+ memcpy(m_chr, s, n);
+ m_chr[n] = 0;
+ m_len = n;
+}
+
BaseString::BaseString(const BaseString& str)
{
const char* const s = str.m_chr;
=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2010-11-30 10:00:52 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2010-12-15 06:07:35 +0000
@@ -432,6 +432,14 @@ MgmtSrvr::start_transporter(const Config
DBUG_RETURN(false);
}
+ /**
+ * Wait for loopback interface to be enabled
+ */
+ while (!theFacade->isConnected(_ownNodeId))
+ {
+ NdbSleep_MilliSleep(20);
+ }
+
_ownReference = numberToRef(_blockNumber, _ownNodeId);
/*
@@ -768,6 +776,8 @@ MgmtSrvr::~MgmtSrvr()
m_config_manager= 0;
}
+ this->close(); // close trp_client before stopping TransporterFacade
+
// Stop transporter
if(theFacade != 0){
theFacade->stop_instance();
=== modified file 'storage/ndb/src/ndbapi/ClusterMgr.cpp'
--- a/storage/ndb/src/ndbapi/ClusterMgr.cpp 2010-10-15 13:04:50 +0000
+++ b/storage/ndb/src/ndbapi/ClusterMgr.cpp 2010-12-15 06:07:35 +0000
@@ -91,6 +91,7 @@ ClusterMgr::~ClusterMgr()
delete theArbitMgr;
theArbitMgr = 0;
}
+ this->close(); // disconnect from TransporterFacade
NdbCondition_Destroy(waitForHBCond);
NdbMutex_Destroy(clusterMgrThreadMutex);
DBUG_VOID_RETURN;
@@ -140,7 +141,7 @@ ClusterMgr::configure(Uint32 nodeId,
}
/* Init own node info */
- Node &node= theNodes[theFacade.ownId()];
+ Node &node= theNodes[getOwnNodeId()];
assert(node.defined);
node.set_connected(true);
node.set_confirmed(true);
@@ -159,7 +160,7 @@ ClusterMgr::configure(Uint32 nodeId,
{
// The arbitrator should be active
if (!theArbitMgr)
- theArbitMgr = new ArbitMgr(theFacade);
+ theArbitMgr = new ArbitMgr(* this);
theArbitMgr->setRank(rank);
Uint32 delay = 0;
@@ -190,9 +191,11 @@ ClusterMgr::startThread() {
void
ClusterMgr::doStop( ){
DBUG_ENTER("ClusterMgr::doStop");
- Guard g(clusterMgrThreadMutex);
- if(theStop == 1){
- DBUG_VOID_RETURN;
+ {
+ Guard g(clusterMgrThreadMutex);
+ if(theStop == 1){
+ DBUG_VOID_RETURN;
+ }
}
void *status;
@@ -294,11 +297,23 @@ ClusterMgr::threadMain( ){
NDB_TICKS timeSlept = 100;
NDB_TICKS now = NdbTick_CurrentMillisecond();
- while(!theStop){
-
- /* Sleep at least 100ms between each heartbet check */
+ while(!theStop)
+ {
+ /* Sleep at 100ms between each heartbeat check */
NDB_TICKS before = now;
- NdbSleep_MilliSleep(100);
+ for (Uint32 i = 0; i<10; i++)
+ {
+ NdbSleep_MilliSleep(10);
+ {
+ Guard g(clusterMgrThreadMutex);
+ /**
+ * Protect from ArbitMgr sending signals while we poll
+ */
+ start_poll();
+ do_poll(0);
+ complete_poll();
+ }
+ }
now = NdbTick_CurrentMillisecond();
timeSlept = (now - before);
@@ -327,6 +342,9 @@ ClusterMgr::threadMain( ){
assert(nodeId > 0 && nodeId < MAX_NODES);
Node & theNode = theNodes[nodeId];
+ if (nodeId == getOwnNodeId())
+ continue;
+
if (!theNode.defined)
continue;
@@ -867,8 +885,8 @@ ClusterMgr::print_nodes(const char* wher
/******************************************************************************
* Arbitrator
******************************************************************************/
-ArbitMgr::ArbitMgr(TransporterFacade & _fac)
- : theFacade(_fac)
+ArbitMgr::ArbitMgr(ClusterMgr & c)
+ : m_clusterMgr(c)
{
DBUG_ENTER("ArbitMgr::ArbitMgr");
@@ -1184,7 +1202,7 @@ ArbitMgr::sendStopRep(ArbitSignal& aSign
void
ArbitMgr::sendSignalToQmgr(ArbitSignal& aSignal)
{
- NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
+ NdbApiSignal signal(numberToRef(API_CLUSTERMGR, m_clusterMgr.getOwnNodeId()));
signal.theVerId_signalNumber = aSignal.gsn;
signal.theReceiversBlockNumber = QMGR;
@@ -1193,7 +1211,7 @@ ArbitMgr::sendSignalToQmgr(ArbitSignal&
ArbitSignalData* sd = CAST_PTR(ArbitSignalData, signal.getDataPtrSend());
- sd->sender = numberToRef(API_CLUSTERMGR, theFacade.ownId());
+ sd->sender = numberToRef(API_CLUSTERMGR, m_clusterMgr.getOwnNodeId());
sd->code = aSignal.data.code;
sd->node = aSignal.data.node;
sd->ticket = aSignal.data.ticket;
@@ -1211,8 +1229,11 @@ ArbitMgr::sendSignalToQmgr(ArbitSignal&
ndbout << endl;
#endif
- theFacade.lock_mutex();
- theFacade.sendSignalUnCond(&signal, aSignal.data.sender);
- theFacade.unlock_mutex();
+ {
+ Guard g(m_clusterMgr.clusterMgrThreadMutex);
+ m_clusterMgr.lock();
+ m_clusterMgr.raw_sendSignal(&signal, aSignal.data.sender);
+ m_clusterMgr.unlock();
+ }
}
=== modified file 'storage/ndb/src/ndbapi/ClusterMgr.hpp'
--- a/storage/ndb/src/ndbapi/ClusterMgr.hpp 2010-10-13 06:15:20 +0000
+++ b/storage/ndb/src/ndbapi/ClusterMgr.hpp 2010-12-14 19:26:50 +0000
@@ -37,6 +37,7 @@ extern "C" void* runClusterMgr_C(void *
class ClusterMgr : public trp_client
{
friend class TransporterFacade;
+ friend class ArbitMgr;
friend void* runClusterMgr_C(void * me);
public:
ClusterMgr(class TransporterFacade &);
@@ -182,7 +183,7 @@ extern "C" void* runArbitMgr_C(void* me)
class ArbitMgr
{
public:
- ArbitMgr(class TransporterFacade &);
+ ArbitMgr(class ClusterMgr &);
~ArbitMgr();
inline void setRank(unsigned n) { theRank = n; }
@@ -195,7 +196,7 @@ public:
friend void* runArbitMgr_C(void* me);
private:
- class TransporterFacade & theFacade;
+ class ClusterMgr & m_clusterMgr;
unsigned theRank;
unsigned theDelay;
=== modified file 'storage/ndb/src/ndbapi/SignalSender.cpp'
--- a/storage/ndb/src/ndbapi/SignalSender.cpp 2010-10-13 12:22:51 +0000
+++ b/storage/ndb/src/ndbapi/SignalSender.cpp 2010-12-15 06:07:35 +0000
@@ -191,11 +191,6 @@ SignalSender::sendFragmentedSignal(Uint1
Uint32 len)
{
sig.set(*this, TestOrd::TraceAPI, recBlock, gsn, len);
- if (nodeId == theFacade->ownId())
- {
- // No need to fragment when sending to own node
- return sendSignal(nodeId, &sig);
- }
return theFacade->sendFragmentedSignal((NdbApiSignal*)&sig.header,
nodeId,
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp 2010-12-03 05:53:47 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp 2010-12-15 06:07:35 +0000
@@ -482,22 +482,12 @@ void TransporterFacade::threadMainReceiv
#ifdef NDB_SHM_TRANSPORTER
NdbThread_set_shm_sigmask(TRUE);
#endif
- NdbMutex_Lock(theMutexPtr);
- theTransporterRegistry->update_connections();
- NdbMutex_Unlock(theMutexPtr);
- while(!theStopReceive) {
- for(int i = 0; i<10; i++){
- NdbSleep_MilliSleep(10);
- NdbMutex_Lock(theMutexPtr);
- if (m_poll_owner == NULL)
- {
- external_poll(0);
- }
- NdbMutex_Unlock(theMutexPtr);
- }
+ while(!theStopReceive)
+ {
NdbMutex_Lock(theMutexPtr);
theTransporterRegistry->update_connections();
NdbMutex_Unlock(theMutexPtr);
+ NdbSleep_MilliSleep(100);
}//while
theTransporterRegistry->stopReceiving();
}
@@ -607,10 +597,18 @@ TransporterFacade::do_connect_mgm(NodeId
doConnect(remoteNodeId);
}
}
+
+ /**
+ * Also setup Loopback Transporter
+ */
+ if (is_mgmd(nodeId, conf))
+ {
+ doConnect(nodeId);
+ }
+
DBUG_RETURN(true);
}
-
bool
TransporterFacade::configure(NodeId nodeId,
const ndb_mgm_configuration* conf)
@@ -624,7 +622,8 @@ TransporterFacade::configure(NodeId node
// Configure transporters
if (!IPCConfig::configureTransporters(nodeId,
* conf,
- * theTransporterRegistry))
+ * theTransporterRegistry,
+ is_mgmd(nodeId, conf)))
DBUG_RETURN(false);
// Configure cluster manager
@@ -1725,17 +1724,6 @@ SignalSender::sendSignal(Uint16 nodeId,
}
#endif
- if (nodeId == theFacade->ownId())
- {
- SignalHeader tmp= s->header;
- tmp.theSendersBlockRef = getOwnRef();
- theFacade->deliver_signal(&tmp,
- 1, // JBB
- (Uint32*)&s->theData[0],
- (LinearSectionPtr*)&s->ptr[0]);
- return SEND_OK;
- }
-
SendStatus ss =
theFacade->theTransporterRegistry->prepareSend(&s->header,
1, // JBB
=== modified file 'storage/ndb/src/ndbapi/trp_client.cpp'
--- a/storage/ndb/src/ndbapi/trp_client.cpp 2010-10-06 12:35:34 +0000
+++ b/storage/ndb/src/ndbapi/trp_client.cpp 2010-12-14 09:10:45 +0000
@@ -47,6 +47,12 @@ trp_client::open(TransporterFacade* tf,
return res;
}
+Uint32
+trp_client::getOwnNodeId() const
+{
+ return m_facade->theOwnId;
+}
+
void
trp_client::close()
{
=== modified file 'storage/ndb/src/ndbapi/trp_client.hpp'
--- a/storage/ndb/src/ndbapi/trp_client.hpp 2010-12-03 05:53:47 +0000
+++ b/storage/ndb/src/ndbapi/trp_client.hpp 2010-12-14 09:10:45 +0000
@@ -61,6 +61,8 @@ public:
void lock();
void unlock();
+
+ Uint32 getOwnNodeId() const;
private:
Uint32 m_blockNo;
TransporterFacade * m_facade;
@@ -112,8 +114,8 @@ inline
void
trp_client::lock()
{
- assert(m_poll.m_locked == false);
NdbMutex_Lock(m_facade->theMutexPtr);
+ assert(m_poll.m_locked == false);
m_poll.m_locked = true;
}
@@ -122,8 +124,8 @@ void
trp_client::unlock()
{
assert(m_poll.m_locked == true);
- NdbMutex_Unlock(m_facade->theMutexPtr);
m_poll.m_locked = false;
+ NdbMutex_Unlock(m_facade->theMutexPtr);
}
inline
No bundle (reason: revision is a merge).
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.1 branch (jonas:4022) | Jonas Oreland | 15 Dec |