3814 Jonas Oreland 2010-10-01
ndb - remove ExecuteFunction/NodeStatusFunction and replace with virtual class instead
modified:
storage/ndb/src/mgmsrv/MgmtSrvr.cpp
storage/ndb/src/mgmsrv/MgmtSrvr.hpp
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
storage/ndb/src/ndbapi/NdbImpl.hpp
storage/ndb/src/ndbapi/Ndbif.cpp
storage/ndb/src/ndbapi/SignalSender.cpp
storage/ndb/src/ndbapi/SignalSender.hpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
3813 Jonas Oreland 2010-09-30
ndb - compiler warnings
modified:
storage/ndb/test/ndbapi/testRestartGci.cpp
=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.cpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2010-09-29 13:25:19 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.cpp 2010-10-01 07:38:17 +0000
@@ -408,9 +408,7 @@ MgmtSrvr::start_transporter(const Config
Register ourself at TransporterFacade to be able to receive signals
and to be notified when a database process has died.
*/
- if ((_blockNumber= theFacade->open(this,
- signalReceivedNotification,
- nodeStatusNotification)) == -1)
+ if ((_blockNumber= theFacade->open(this)) == -1)
{
g_eventLogger->error("Failed to open block in TransporterFacade");
theFacade->stop_instance();
@@ -2457,7 +2455,8 @@ const char* MgmtSrvr::getErrorText(int e
void
-MgmtSrvr::handleReceivedSignal(const NdbApiSignal* signal)
+MgmtSrvr::trp_deliver_signal(const NdbApiSignal* signal,
+ const LinearSectionPtr ptr[3])
{
int gsn = signal->readSignalNumber();
@@ -2492,16 +2491,7 @@ MgmtSrvr::handleReceivedSignal(const Ndb
void
-MgmtSrvr::signalReceivedNotification(void* mgmtSrvr,
- const NdbApiSignal* signal,
- const LinearSectionPtr ptr[3])
-{
- ((MgmtSrvr*)mgmtSrvr)->handleReceivedSignal(signal);
-}
-
-
-void
-MgmtSrvr::handleStatus(NodeId nodeId, bool alive, bool nfComplete)
+MgmtSrvr::trp_node_status(Uint32 nodeId, bool alive, bool nfComplete)
{
DBUG_ENTER("MgmtSrvr::handleStatus");
DBUG_PRINT("enter",("nodeid: %d, alive: %d, nfComplete: %d",
@@ -2532,15 +2522,6 @@ MgmtSrvr::handleStatus(NodeId nodeId, bo
DBUG_VOID_RETURN;
}
-
-void
-MgmtSrvr::nodeStatusNotification(void* mgmSrv, Uint32 nodeId,
- bool alive, bool nfComplete)
-{
- ((MgmtSrvr*)mgmSrv)->handleStatus(nodeId, alive, nfComplete);
-}
-
-
enum ndb_mgm_node_type
MgmtSrvr::getNodeType(NodeId nodeId) const
{
=== modified file 'storage/ndb/src/mgmsrv/MgmtSrvr.hpp'
--- a/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2010-09-29 13:25:19 +0000
+++ b/storage/ndb/src/mgmsrv/MgmtSrvr.hpp 2010-10-01 07:38:17 +0000
@@ -73,7 +73,7 @@ public:
@class MgmtSrvr
@brief Main class for the management server.
*/
-class MgmtSrvr : private ConfigSubscriber {
+class MgmtSrvr : private ConfigSubscriber, public trp_client {
public:
// some compilers need all of this
@@ -469,30 +469,12 @@ private:
NodeBitmask m_reserved_nodes;
struct in_addr m_connect_address[MAX_NODES];
- void handleReceivedSignal(const NdbApiSignal* signal);
- void handleStatus(NodeId nodeId, bool alive, bool nfComplete);
-
/**
- Callback function installed into TransporterFacade, will be called
- once for each new signal received to the MgmtSrvr.
- @param mgmtSrvr: The MgmtSrvr object which shall recieve the signal.
- @param signal: The received signal.
- @param ptr: The long part(s) of the signal
- */
- static void signalReceivedNotification(void* mgmtSrvr,
- const NdbApiSignal* signal,
- const struct LinearSectionPtr ptr[3]);
-
- /**
- Callback function installed into TransporterFacade, will be called
- when status of a node changes.
- @param mgmtSrvr: The MgmtSrvr object which shall receive
- the notification.
- @param processId: Id of the node whose status changed.
- @param alive: true if the other node is alive
+ * trp_client interface
*/
- static void nodeStatusNotification(void* mgmSrv, Uint32 nodeId,
- bool alive, bool nfCompleted);
+ virtual void trp_deliver_signal(const NdbApiSignal* signal,
+ const struct LinearSectionPtr ptr[3]);
+ virtual void trp_node_status(Uint32 nodeId, bool alive, bool nfCompleted);
/**
* An event from <i>nodeId</i> has arrived
=== modified file 'storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp 2010-09-30 14:27:18 +0000
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp 2010-10-01 07:38:17 +0000
@@ -713,6 +713,7 @@ private:
class TransporterFacade * m_transporter;
friend class Ndb;
+ friend class NdbImpl;
friend class NdbDictionaryImpl;
static void execSignal(void* dictImpl,
const class NdbApiSignal* signal,
=== modified file 'storage/ndb/src/ndbapi/NdbImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbImpl.hpp 2010-09-30 10:36:47 +0000
+++ b/storage/ndb/src/ndbapi/NdbImpl.hpp 2010-10-01 07:38:17 +0000
@@ -29,6 +29,7 @@
#include "ndb_cluster_connection_impl.hpp"
#include "NdbDictionaryImpl.hpp"
#include "ObjectMap.hpp"
+#include "trp_client.hpp"
template <class T>
struct Ndb_free_list_t
@@ -49,7 +50,8 @@ struct Ndb_free_list_t
/**
* Private parts of the Ndb object (corresponding to Ndb.hpp in public API)
*/
-class NdbImpl {
+class NdbImpl : public trp_client
+{
public:
NdbImpl(Ndb_cluster_connection *, Ndb&);
~NdbImpl();
@@ -142,6 +144,13 @@ public:
Ndb_free_list_t<NdbOperation> theOpIdleList;
Ndb_free_list_t<NdbIndexOperation> theIndexOpIdleList;
Ndb_free_list_t<NdbTransaction> theConIdleList;
+
+ /**
+ * trp_client interface
+ */
+ virtual void trp_deliver_signal(const NdbApiSignal*,
+ const LinearSectionPtr p[3]);
+ virtual void trp_node_status(Uint32, bool nodeAlive, bool nfComplete);
};
#ifdef VM_TRACE
=== modified file 'storage/ndb/src/ndbapi/Ndbif.cpp'
--- a/storage/ndb/src/ndbapi/Ndbif.cpp 2010-09-30 14:27:18 +0000
+++ b/storage/ndb/src/ndbapi/Ndbif.cpp 2010-10-01 07:38:17 +0000
@@ -73,9 +73,8 @@ Ndb::init(int aMaxNoOfTransactions)
TransporterFacade * theFacade = theImpl->m_transporter_facade;
theFacade->lock_mutex();
- const int tBlockNo = theFacade->open(this,
- executeMessage,
- statusMessage);
+ const int tBlockNo = theFacade->open(theImpl);
+
if ( tBlockNo == -1 ) {
theError.code = 4105;
theFacade->unlock_mutex();
@@ -162,12 +161,10 @@ Ndb::releaseTransactionArrays()
}//Ndb::releaseTransactionArrays()
void
-Ndb::executeMessage(void* NdbObject,
- const NdbApiSignal * aSignal,
- const LinearSectionPtr ptr[3])
+NdbImpl::trp_deliver_signal(const NdbApiSignal * aSignal,
+ const LinearSectionPtr ptr[3])
{
- Ndb* tNdb = (Ndb*)NdbObject;
- tNdb->handleReceivedSignal(aSignal, ptr);
+ m_ndb.handleReceivedSignal(aSignal, ptr);
}
void Ndb::connected(Uint32 ref)
@@ -215,12 +212,12 @@ void Ndb::report_node_connected(Uint32 n
}
void
-Ndb::statusMessage(void* NdbObject, Uint32 a_node, bool alive, bool nfComplete)
+NdbImpl::trp_node_status(Uint32 a_node, bool alive, bool nfComplete)
{
DBUG_ENTER("Ndb::statusMessage");
DBUG_PRINT("info", ("a_node: %u alive: %u nfComplete: %u",
a_node, alive, nfComplete));
- Ndb* tNdb = (Ndb*)NdbObject;
+ Ndb* tNdb = (Ndb*)&m_ndb;
if (alive) {
if (nfComplete) {
// cluster connect, a_node == own reference
=== modified file 'storage/ndb/src/ndbapi/SignalSender.cpp'
--- a/storage/ndb/src/ndbapi/SignalSender.cpp 2010-09-30 11:52:49 +0000
+++ b/storage/ndb/src/ndbapi/SignalSender.cpp 2010-10-01 07:38:17 +0000
@@ -81,7 +81,7 @@ SignalSender::SignalSender(TransporterFa
m_cond = NdbCondition_Create();
theFacade = facade;
lock();
- m_blockNo = theFacade->open(this, execSignal, execNodeStatus, blockNo);
+ m_blockNo = theFacade->open(this, blockNo);
unlock();
assert(m_blockNo > 0);
}
@@ -91,7 +91,7 @@ SignalSender::SignalSender(Ndb_cluster_c
m_cond = NdbCondition_Create();
theFacade = connection->m_impl.m_transporter_facade;
lock();
- m_blockNo = theFacade->open(this, execSignal, execNodeStatus, -1);
+ m_blockNo = theFacade->open(this, -1);
unlock();
assert(m_blockNo > 0);
}
@@ -257,9 +257,9 @@ SignalSender::waitFor(Uint32 timeOutMill
#include <NdbApiSignal.hpp>
void
-SignalSender::execSignal(void* signalSender,
- const NdbApiSignal* signal,
- const struct LinearSectionPtr ptr[3]){
+SignalSender::trp_deliver_signal(const NdbApiSignal* signal,
+ const struct LinearSectionPtr ptr[3])
+{
SimpleSignal * s = new SimpleSignal(true);
s->header = * signal;
memcpy(&s->theData[0], signal->getDataPtr(), 4 * s->header.theLength);
@@ -268,23 +268,20 @@ SignalSender::execSignal(void* signalSen
s->ptr[i].sz = ptr[i].sz;
memcpy(s->ptr[i].p, ptr[i].p, 4 * ptr[i].sz);
}
- SignalSender * ss = (SignalSender*)signalSender;
- ss->m_jobBuffer.push_back(s);
- NdbCondition_Signal(ss->m_cond);
+ m_jobBuffer.push_back(s);
+ NdbCondition_Signal(m_cond);
}
void
-SignalSender::execNodeStatus(void* signalSender,
- Uint32 nodeId,
- bool alive,
- bool nfCompleted){
+SignalSender::trp_node_status(Uint32 nodeId,
+ bool alive,
+ bool nfCompleted){
if (alive) {
// node connected
return;
}
SimpleSignal * s = new SimpleSignal(true);
- SignalSender * ss = (SignalSender*)signalSender;
// node disconnected
if(nfCompleted)
@@ -309,13 +306,13 @@ SignalSender::execNodeStatus(void* signa
NdbNodeBitmask::clear(rep->theNodes);
// Mark ndb nodes as failed in bitmask
- const ClusterMgr::Node node= ss->getNodeInfo(nodeId);
+ const ClusterMgr::Node node= getNodeInfo(nodeId);
if (node.m_info.getType() == NodeInfo::DB)
NdbNodeBitmask::set(rep->theNodes, nodeId);
}
- ss->m_jobBuffer.push_back(s);
- NdbCondition_Signal(ss->m_cond);
+ m_jobBuffer.push_back(s);
+ NdbCondition_Signal(m_cond);
}
=== modified file 'storage/ndb/src/ndbapi/SignalSender.hpp'
--- a/storage/ndb/src/ndbapi/SignalSender.hpp 2010-09-29 13:25:19 +0000
+++ b/storage/ndb/src/ndbapi/SignalSender.hpp 2010-10-01 07:38:17 +0000
@@ -21,6 +21,7 @@
#include <ndb_global.h>
#include "TransporterFacade.hpp"
+#include "trp_client.hpp"
#include <Vector.hpp>
#include <signaldata/TestOrd.hpp>
@@ -84,7 +85,7 @@ private:
bool deallocSections;
};
-class SignalSender {
+class SignalSender : public trp_client {
public:
SignalSender(TransporterFacade *facade, int blockNo = -1);
SignalSender(Ndb_cluster_connection* connection);
@@ -119,13 +120,16 @@ private:
int m_blockNo;
TransporterFacade * theFacade;
- static void execSignal(void* signalSender,
- const NdbApiSignal* signal,
- const struct LinearSectionPtr ptr[3]);
+public:
+ /**
+ * trp_client interface
+ */
+ virtual void trp_deliver_signal(const NdbApiSignal* signal,
+ const struct LinearSectionPtr ptr[3]);
- static void execNodeStatus(void* signalSender, Uint32 nodeId,
- bool alive, bool nfCompleted);
+ virtual void trp_node_status(Uint32 nodeId, bool alive, bool nfCompleted);
+private:
int m_lock;
struct NdbCondition * m_cond;
Vector<SimpleSignal *> m_jobBuffer;
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp 2010-09-30 11:52:49 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp 2010-10-01 07:38:17 +0000
@@ -17,8 +17,8 @@
*/
#include <ndb_global.h>
-#include <my_pthread.h>
#include <ndb_limits.h>
+#include "trp_client.hpp"
#include "TransporterFacade.hpp"
#include "ClusterMgr.hpp"
#include <IPCConfig.hpp>
@@ -211,24 +211,24 @@ TransporterFacade::deliver_signal(Signal
Uint8 prio, Uint32 * const theData,
LinearSectionPtr ptr[3])
{
-
- TransporterFacade::ThreadData::Object_Execute oe;
Uint32 tRecBlockNo = header->theReceiversBlockNumber;
#ifdef API_TRACE
if(setSignalLog() && TRACE_GSN(header->theVerId_signalNumber)){
signalLogger.executeSignal(* header,
- prio,
+ prio,
theData,
- ownId(),
+ ownId(),
ptr, header->m_noOfSections);
signalLogger.flushSignalLog();
}
#endif
- if (tRecBlockNo >= MIN_API_BLOCK_NO) {
- oe = m_threads.get(tRecBlockNo);
- if (oe.m_object != 0 && oe.m_executeFunction != 0) {
+ if (tRecBlockNo >= MIN_API_BLOCK_NO)
+ {
+ trp_client * clnt = m_threads.get(tRecBlockNo);
+ if (clnt != 0)
+ {
/**
* Handle received signal immediately to avoid any unnecessary
* copying of data, allocation of memory and other things. Copying
@@ -243,9 +243,11 @@ TransporterFacade::deliver_signal(Signal
NdbApiSignal tmpSignal(*header);
NdbApiSignal * tSignal = &tmpSignal;
tSignal->setDataPtr(theData);
- (* oe.m_executeFunction) (oe.m_object, tSignal, ptr);
+ clnt->trp_deliver_signal(tSignal, ptr);
}//if
- } else if (tRecBlockNo == API_PACKED) {
+ }
+ else if (tRecBlockNo == API_PACKED)
+ {
/**
* Block number == 2047 is used to signal a signal that consists of
* multiple instances of the same signal. This is an effort to
@@ -263,150 +265,162 @@ TransporterFacade::deliver_signal(Signal
Tsent++;
Uint32 TpacketLen = (Theader & 0x1F) + 3;
tRecBlockNo = Theader >> 16;
- if (TpacketLen <= 25) {
- if ((TpacketLen + Tsent) <= Tlength) {
- /**
- * Set the data length of the signal and the receivers block
- * reference and then call the API.
- */
- header->theLength = TpacketLen;
- header->theReceiversBlockNumber = tRecBlockNo;
- Uint32* tDataPtr = &theData[Tsent];
- Tsent += TpacketLen;
- if (tRecBlockNo >= MIN_API_BLOCK_NO) {
- oe = m_threads.get(tRecBlockNo);
- if(oe.m_object != 0 && oe.m_executeFunction != 0){
- NdbApiSignal tmpSignal(*header);
- NdbApiSignal * tSignal = &tmpSignal;
- tSignal->setDataPtr(tDataPtr);
- (*oe.m_executeFunction)(oe.m_object, tSignal, 0);
- }
- }
- }
+ if (TpacketLen <= 25)
+ {
+ if ((TpacketLen + Tsent) <= Tlength)
+ {
+ /**
+ * Set the data length of the signal and the receivers block
+ * reference and then call the API.
+ */
+ header->theLength = TpacketLen;
+ header->theReceiversBlockNumber = tRecBlockNo;
+ Uint32* tDataPtr = &theData[Tsent];
+ Tsent += TpacketLen;
+ if (tRecBlockNo >= MIN_API_BLOCK_NO)
+ {
+ trp_client * clnt = m_threads.get(tRecBlockNo);
+ if(clnt != 0)
+ {
+ NdbApiSignal tmpSignal(*header);
+ NdbApiSignal * tSignal = &tmpSignal;
+ tSignal->setDataPtr(tDataPtr);
+ clnt->trp_deliver_signal(tSignal, 0);
+ }
+ }
+ }
+ }
+ }
+ return;
+ }
+ else if (tRecBlockNo == API_CLUSTERMGR)
+ {
+ /**
+ * The signal was aimed for the Cluster Manager.
+ * We handle it immediately here.
+ */
+ ClusterMgr * clusterMgr = theClusterMgr;
+ const Uint32 gsn = header->theVerId_signalNumber;
+
+ switch (gsn){
+ case GSN_API_REGREQ:
+ clusterMgr->execAPI_REGREQ(theData);
+ break;
+
+ case GSN_API_REGCONF:
+ {
+ clusterMgr->execAPI_REGCONF(theData);
+
+ // Distribute signal to all threads/blocks
+ NdbApiSignal tSignal(* header);
+ tSignal.setDataPtr(theData);
+ for_each(&tSignal, ptr);
+ break;
+ }
+
+ case GSN_API_REGREF:
+ clusterMgr->execAPI_REGREF(theData);
+ break;
+
+ case GSN_NODE_FAILREP:
+ clusterMgr->execNODE_FAILREP(theData);
+ break;
+
+ case GSN_NF_COMPLETEREP:
+ clusterMgr->execNF_COMPLETEREP(theData);
+ break;
+
+ case GSN_ARBIT_STARTREQ:
+ if (theArbitMgr != NULL)
+ theArbitMgr->doStart(theData);
+ break;
+
+ case GSN_ARBIT_CHOOSEREQ:
+ if (theArbitMgr != NULL)
+ theArbitMgr->doChoose(theData);
+ break;
+
+ case GSN_ARBIT_STOPORD:
+ if(theArbitMgr != NULL)
+ theArbitMgr->doStop(theData);
+ break;
+
+ case GSN_ALTER_TABLE_REP:
+ {
+ if (m_globalDictCache == NULL)
+ break;
+ const AlterTableRep* rep = (const AlterTableRep*)theData;
+ m_globalDictCache->lock();
+ m_globalDictCache->
+ alter_table_rep((const char*)ptr[0].p,
+ rep->tableId,
+ rep->tableVersion,
+ rep->changeType == AlterTableRep::CT_ALTERED);
+ m_globalDictCache->unlock();
+ break;
+ }
+ case GSN_SUB_GCP_COMPLETE_REP:
+ {
+ /**
+ * Report
+ */
+ NdbApiSignal tSignal(* header);
+ tSignal.setDataPtr(theData);
+ for_each(&tSignal, ptr);
+
+ /**
+ * Reply
+ */
+ {
+ Uint32* send= tSignal.getDataPtrSend();
+ memcpy(send, theData, tSignal.getLength() << 2);
+ CAST_PTR(SubGcpCompleteAck, send)->rep.senderRef =
+ numberToRef(API_CLUSTERMGR, theOwnId);
+ Uint32 ref= header->theSendersBlockRef;
+ Uint32 aNodeId= refToNode(ref);
+ tSignal.theReceiversBlockNumber= refToBlock(ref);
+ tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACK;
+ sendSignalUnCond(&tSignal, aNodeId);
}
+ break;
+ }
+ case GSN_TAKE_OVERTCCONF:
+ {
+ /**
+ * Report
+ */
+ NdbApiSignal tSignal(* header);
+ tSignal.setDataPtr(theData);
+ for_each(&tSignal, ptr);
+ return;
+ }
+ default:
+ break;
+
}
return;
- } else if (tRecBlockNo == API_CLUSTERMGR) {
- /**
- * The signal was aimed for the Cluster Manager.
- * We handle it immediately here.
- */
- ClusterMgr * clusterMgr = theClusterMgr;
- const Uint32 gsn = header->theVerId_signalNumber;
-
- switch (gsn){
- case GSN_API_REGREQ:
- clusterMgr->execAPI_REGREQ(theData);
- break;
-
- case GSN_API_REGCONF:
- {
- clusterMgr->execAPI_REGCONF(theData);
-
- // Distribute signal to all threads/blocks
- NdbApiSignal tSignal(* header);
- tSignal.setDataPtr(theData);
- for_each(&tSignal, ptr);
- break;
- }
-
- case GSN_API_REGREF:
- clusterMgr->execAPI_REGREF(theData);
- break;
-
- case GSN_NODE_FAILREP:
- clusterMgr->execNODE_FAILREP(theData);
- break;
-
- case GSN_NF_COMPLETEREP:
- clusterMgr->execNF_COMPLETEREP(theData);
- break;
-
- case GSN_ARBIT_STARTREQ:
- if (theArbitMgr != NULL)
- theArbitMgr->doStart(theData);
- break;
-
- case GSN_ARBIT_CHOOSEREQ:
- if (theArbitMgr != NULL)
- theArbitMgr->doChoose(theData);
- break;
-
- case GSN_ARBIT_STOPORD:
- if(theArbitMgr != NULL)
- theArbitMgr->doStop(theData);
- break;
-
- case GSN_ALTER_TABLE_REP:
- {
- if (m_globalDictCache == NULL)
- break;
- const AlterTableRep* rep = (const AlterTableRep*)theData;
- m_globalDictCache->lock();
- m_globalDictCache->
- alter_table_rep((const char*)ptr[0].p,
- rep->tableId,
- rep->tableVersion,
- rep->changeType == AlterTableRep::CT_ALTERED);
- m_globalDictCache->unlock();
- break;
- }
- case GSN_SUB_GCP_COMPLETE_REP:
- {
- /**
- * Report
- */
- NdbApiSignal tSignal(* header);
- tSignal.setDataPtr(theData);
- for_each(&tSignal, ptr);
-
- /**
- * Reply
- */
- {
- Uint32* send= tSignal.getDataPtrSend();
- memcpy(send, theData, tSignal.getLength() << 2);
- CAST_PTR(SubGcpCompleteAck, send)->rep.senderRef =
- numberToRef(API_CLUSTERMGR, theOwnId);
- Uint32 ref= header->theSendersBlockRef;
- Uint32 aNodeId= refToNode(ref);
- tSignal.theReceiversBlockNumber= refToBlock(ref);
- tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACK;
- sendSignalUnCond(&tSignal, aNodeId);
- }
- break;
- }
- case GSN_TAKE_OVERTCCONF:
- {
- /**
- * Report
- */
- NdbApiSignal tSignal(* header);
- tSignal.setDataPtr(theData);
- for_each(&tSignal, ptr);
- return;
- }
- default:
- break;
-
- }
- return;
- } else if (tRecBlockNo >= MIN_API_FIXED_BLOCK_NO &&
- tRecBlockNo <= MAX_API_FIXED_BLOCK_NO) {
+ }
+ else if (tRecBlockNo >= MIN_API_FIXED_BLOCK_NO &&
+ tRecBlockNo <= MAX_API_FIXED_BLOCK_NO)
+ {
Uint32 dynamic= m_fixed2dynamic[tRecBlockNo - MIN_API_FIXED_BLOCK_NO];
- oe = m_threads.get(dynamic);
- if (oe.m_object != 0 && oe.m_executeFunction != 0) {
+ trp_client * clnt = m_threads.get(dynamic);
+ if (clnt != 0)
+ {
NdbApiSignal tmpSignal(*header);
NdbApiSignal * tSignal = &tmpSignal;
tSignal->setDataPtr(theData);
- (* oe.m_executeFunction) (oe.m_object, tSignal, ptr);
+ clnt->trp_deliver_signal(tSignal, ptr);
}//if
- } else {
- ; // Ignore all other block numbers.
- if(header->theVerId_signalNumber != GSN_API_REGREQ) {
+ }
+ else
+ {
+ // Ignore all other block numbers.
+ if(header->theVerId_signalNumber != GSN_API_REGREQ)
+ {
TRP_DEBUG( "TransporterFacade received signal to unknown block no." );
ndbout << "BLOCK NO: " << tRecBlockNo << " sig "
- << header->theVerId_signalNumber << endl;
+ << header->theVerId_signalNumber << endl;
abort();
}
}
@@ -877,13 +891,12 @@ void
TransporterFacade::for_each(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
{
Uint32 sz = m_threads.m_statusNext.size();
- TransporterFacade::ThreadData::Object_Execute oe;
for (Uint32 i = 0; i < sz ; i ++)
{
- oe = m_threads.m_objectExecute[i];
- if (m_threads.getInUse(i))
+ trp_client * clnt = m_threads.m_objectExecute[i];
+ if (clnt != 0)
{
- (* oe.m_executeFunction) (oe.m_object, aSignal, ptr);
+ clnt->trp_deliver_signal(aSignal, ptr);
}
}
}
@@ -893,11 +906,12 @@ TransporterFacade::connected()
{
DBUG_ENTER("TransporterFacade::connected");
Uint32 sz = m_threads.m_statusNext.size();
- for (Uint32 i = 0; i < sz ; i ++) {
- if (m_threads.getInUse(i)){
- void * obj = m_threads.m_objectExecute[i].m_object;
- NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
- (*RegPC) (obj, numberToRef(indexToNumber(i), theOwnId), true, true);
+ for (Uint32 i = 0; i < sz ; i ++)
+ {
+ trp_client * clnt = m_threads.m_objectExecute[i];
+ if (clnt != 0)
+ {
+ clnt->trp_node_status(numberToRef(indexToNumber(i), theOwnId), true, true);
}
}
DBUG_VOID_RETURN;
@@ -917,11 +931,12 @@ TransporterFacade::ReportNodeDead(NodeId
* might not have noticed the failure.
*/
Uint32 sz = m_threads.m_statusNext.size();
- for (Uint32 i = 0; i < sz ; i ++) {
- if (m_threads.getInUse(i)){
- void * obj = m_threads.m_objectExecute[i].m_object;
- NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
- (*RegPC) (obj, tNodeId, false, false);
+ for (Uint32 i = 0; i < sz ; i ++)
+ {
+ trp_client * clnt = m_threads.m_objectExecute[i];
+ if (clnt != 0)
+ {
+ clnt->trp_node_status(tNodeId, false, false);
}
}
DBUG_VOID_RETURN;
@@ -942,11 +957,12 @@ TransporterFacade::ReportNodeFailureComp
DBUG_ENTER("TransporterFacade::ReportNodeFailureComplete");
DBUG_PRINT("enter",("nodeid= %d", tNodeId));
Uint32 sz = m_threads.m_statusNext.size();
- for (Uint32 i = 0; i < sz ; i ++) {
- if (m_threads.getInUse(i)){
- void * obj = m_threads.m_objectExecute[i].m_object;
- NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
- (*RegPC) (obj, tNodeId, false, true);
+ for (Uint32 i = 0; i < sz ; i ++)
+ {
+ trp_client * clnt = m_threads.m_objectExecute[i];
+ if (clnt != 0)
+ {
+ clnt->trp_node_status(tNodeId, false, true);
}
}
DBUG_VOID_RETURN;
@@ -965,11 +981,12 @@ TransporterFacade::ReportNodeAlive(NodeI
* might not have noticed the failure.
*/
Uint32 sz = m_threads.m_statusNext.size();
- for (Uint32 i = 0; i < sz ; i ++) {
- if (m_threads.getInUse(i)){
- void * obj = m_threads.m_objectExecute[i].m_object;
- NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
- (*RegPC) (obj, tNodeId, true, false);
+ for (Uint32 i = 0; i < sz ; i ++)
+ {
+ trp_client * clnt = m_threads.m_objectExecute[i];
+ if (clnt != 0)
+ {
+ clnt->trp_node_status(tNodeId, true, false);
}
}
}
@@ -990,29 +1007,30 @@ TransporterFacade::close_local(BlockNumb
}
int
-TransporterFacade::open(void* objRef,
- ExecuteFunction fun,
- NodeStatusFunction statusFun,
- int blockNo)
+TransporterFacade::open(trp_client * clnt, int blockNo)
{
DBUG_ENTER("TransporterFacade::open");
- int r= m_threads.open(objRef, fun, statusFun);
+ int r= m_threads.open(clnt);
if (r < 0)
+ {
DBUG_RETURN(r);
+ }
- if (unlikely(blockNo != -1)){
+ if (unlikely(blockNo != -1))
+ {
// Using fixed block number, add fixed->dymamic mapping
- Uint32 fixed_index= blockNo - MIN_API_FIXED_BLOCK_NO;
-
+ Uint32 fixed_index = blockNo - MIN_API_FIXED_BLOCK_NO;
+
assert(blockNo >= MIN_API_FIXED_BLOCK_NO &&
fixed_index <= NO_API_FIXED_BLOCKS);
-
+
m_fixed2dynamic[fixed_index]= r;
}
#if 1
- if (theOwnId > 0) {
- (*statusFun)(objRef, numberToRef(r, theOwnId), true, true);
+ if (theOwnId > 0)
+ {
+ clnt->trp_node_status(numberToRef(r, theOwnId), true, true);
}
#endif
DBUG_RETURN(r);
@@ -1746,12 +1764,10 @@ TransporterFacade::ThreadData::ThreadDat
void
TransporterFacade::ThreadData::expand(Uint32 size){
- Object_Execute oe = { 0 ,0 };
- NodeStatusFunction fun = 0;
+ trp_client * oe = 0;
const Uint32 sz = m_statusNext.size();
m_objectExecute.fill(sz + size, oe);
- m_statusFunction.fill(sz + size, fun);
for(Uint32 i = 0; i<size; i++){
m_statusNext.push_back(sz + i + 1);
}
@@ -1762,9 +1778,7 @@ TransporterFacade::ThreadData::expand(Ui
int
-TransporterFacade::ThreadData::open(void* objRef,
- ExecuteFunction fun,
- NodeStatusFunction fun2)
+TransporterFacade::ThreadData::open(trp_client * clnt)
{
Uint32 nextFree = m_firstFree;
@@ -1780,11 +1794,8 @@ TransporterFacade::ThreadData::open(void
m_use_cnt++;
m_firstFree = m_statusNext[nextFree];
- Object_Execute oe = { objRef , fun };
-
m_statusNext[nextFree] = INACTIVE;
- m_objectExecute[nextFree] = oe;
- m_statusFunction[nextFree] = fun2;
+ m_objectExecute[nextFree] = clnt;
return indexToNumber(nextFree);
}
@@ -1792,14 +1803,12 @@ TransporterFacade::ThreadData::open(void
int
TransporterFacade::ThreadData::close(int number){
number= numberToIndex(number);
- assert(getInUse(number));
+ assert(m_objectExecute[number] != 0);
m_statusNext[number] = m_firstFree;
assert(m_use_cnt);
m_use_cnt--;
m_firstFree = number;
- Object_Execute oe = { 0, 0 };
- m_objectExecute[number] = oe;
- m_statusFunction[number] = 0;
+ m_objectExecute[number] = 0;
return 0;
}
@@ -1982,8 +1991,7 @@ void PollGuard::unlock_and_signal()
m_locked=false;
}
-template class Vector<NodeStatusFunction>;
-template class Vector<TransporterFacade::ThreadData::Object_Execute>;
+template class Vector<trp_client*>;
#include "SignalSender.hpp"
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp 2010-09-30 11:52:49 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp 2010-10-01 07:38:17 +0000
@@ -35,9 +35,7 @@ struct ndb_mgm_configuration;
class Ndb;
class NdbApiSignal;
class NdbWaiter;
-
-typedef void (* ExecuteFunction)(void *, const NdbApiSignal *, const LinearSectionPtr ptr[3]);
-typedef void (* NodeStatusFunction)(void *, Uint32, bool nodeAlive, bool nfComplete);
+class trp_client;
extern "C" {
void* runSendRequest_C(void*);
@@ -69,8 +67,7 @@ public:
* @blockNo block number to use, -1 => any blockNumber
* @return BlockNumber or -1 for failure
*/
- int open(void* objRef, ExecuteFunction, NodeStatusFunction,
- int blockNo = -1);
+ int open(trp_client*, int blockNo = -1);
// Close this block number
int close(BlockNumber blockNumber);
@@ -262,43 +259,22 @@ private:
ThreadData(Uint32 initialSize = 32);
- /**
- * Split "object" into 3 list
- * This to improve locality
- * when iterating over lists
- */
- struct Object_Execute {
- void * m_object;
- ExecuteFunction m_executeFunction;
- };
- struct NodeStatus_NextFree {
- NodeStatusFunction m_statusFunction;
- };
-
Uint32 m_use_cnt;
Uint32 m_firstFree;
Vector<Uint32> m_statusNext;
- Vector<Object_Execute> m_objectExecute;
- Vector<NodeStatusFunction> m_statusFunction;
+ Vector<trp_client*> m_objectExecute;
- int open(void* objRef, ExecuteFunction, NodeStatusFunction);
+ int open(trp_client*);
int close(int number);
void expand(Uint32 size);
- inline Object_Execute get(Uint16 blockNo) const {
+ inline trp_client* get(Uint16 blockNo) const {
blockNo -= MIN_API_BLOCK_NO;
- if(likely (blockNo < m_objectExecute.size())){
- return m_objectExecute[blockNo];
+ if(likely (blockNo < m_objectExecute.size()))
+ {
+ return m_objectExecute.getBase()[blockNo];
}
- Object_Execute oe = { 0, 0 };
- return oe;
- }
-
- /**
- * Is the block number used currently
- */
- inline bool getInUse(Uint16 index) const {
- return (m_statusNext[index] & (1 << 16)) != 0;
+ return 0;
}
} m_threads;
Attachment: [text/bzr-bundle] bzr/jonas@mysql.com-20101001073817-m85t3bdn75a7jh6m.bundle
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.0 branch (jonas:3813 to 3814) | Jonas Oreland | 1 Oct |