#At file:///home/jonas/src/telco-7.1/ based on revid:martin.skold@stripped
3972 jonas oreland 2010-11-10 [merge]
ndb - merge 70 to 71
modified:
storage/ndb/include/ndbapi/NdbScanOperation.hpp
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/src/common/transporter/Packer.cpp
storage/ndb/src/common/transporter/Packer.hpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/ndbapi/Ndb.cpp
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
storage/ndb/src/ndbapi/NdbImpl.hpp
storage/ndb/src/ndbapi/NdbOperationExec.cpp
storage/ndb/src/ndbapi/NdbScanOperation.cpp
storage/ndb/src/ndbapi/NdbTransaction.cpp
storage/ndb/src/ndbapi/Ndbif.cpp
storage/ndb/src/ndbapi/SignalSender.hpp
storage/ndb/src/ndbapi/TransporterFacade.cpp
storage/ndb/src/ndbapi/TransporterFacade.hpp
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp
storage/ndb/src/ndbapi/trp_client.hpp
storage/ndb/tools/restore/Restore.cpp
storage/ndb/tools/restore/Restore.hpp
storage/ndb/tools/restore/consumer_restore.cpp
storage/ndb/tools/restore/restore_main.cpp
=== modified file 'storage/ndb/include/ndbapi/NdbScanOperation.hpp'
--- a/storage/ndb/include/ndbapi/NdbScanOperation.hpp 2010-06-08 14:37:15 +0000
+++ b/storage/ndb/include/ndbapi/NdbScanOperation.hpp 2010-11-09 20:40:03 +0000
@@ -495,7 +495,7 @@ protected:
bool fetchAllowed, bool forceSend);
virtual void release();
- int close_impl(class TransporterFacade*, bool forceSend,
+ int close_impl(bool forceSend,
PollGuard *poll_guard);
/* Helper for NdbScanFilter to allocate an InterpretedCode
=== modified file 'storage/ndb/include/transporter/TransporterRegistry.hpp'
--- a/storage/ndb/include/transporter/TransporterRegistry.hpp 2010-10-07 09:36:21 +0000
+++ b/storage/ndb/include/transporter/TransporterRegistry.hpp 2010-11-09 20:40:03 +0000
@@ -272,7 +272,7 @@ public:
const SignalHeader * const signalHeader, Uint8 prio,
const Uint32 * const signalData,
NodeId nodeId,
- GenericSectionPtr ptr[3]);
+ const GenericSectionPtr ptr[3]);
/**
* Backwards compatiple methods with default send buffer handling.
*/
@@ -294,7 +294,7 @@ public:
SendStatus prepareSend(const SignalHeader * const signalHeader, Uint8 prio,
const Uint32 * const signalData,
NodeId nodeId,
- GenericSectionPtr ptr[3])
+ const GenericSectionPtr ptr[3])
{
return prepareSend(this, signalHeader, prio, signalData, nodeId, ptr);
}
=== modified file 'storage/ndb/src/common/transporter/Packer.cpp'
--- a/storage/ndb/src/common/transporter/Packer.cpp 2010-11-04 14:57:04 +0000
+++ b/storage/ndb/src/common/transporter/Packer.cpp 2010-11-09 20:40:03 +0000
@@ -403,7 +403,7 @@ import(Uint32 * & insertPtr, const Linea
inline
void
-importGeneric(Uint32 * & insertPtr, GenericSectionPtr & ptr){
+importGeneric(Uint32 * & insertPtr, const GenericSectionPtr & ptr){
/* Use the section iterator to obtain the words in this section */
Uint32 remain= ptr.sz;
@@ -553,7 +553,7 @@ Packer::pack(Uint32 * insertPtr,
Uint32 prio,
const SignalHeader * header,
const Uint32 * theData,
- GenericSectionPtr ptr[3]) const {
+ const GenericSectionPtr ptr[3]) const {
Uint32 i;
Uint32 dataLen32 = header->theLength;
=== modified file 'storage/ndb/src/common/transporter/Packer.hpp'
--- a/storage/ndb/src/common/transporter/Packer.hpp 2009-05-27 15:21:45 +0000
+++ b/storage/ndb/src/common/transporter/Packer.hpp 2010-11-09 20:40:03 +0000
@@ -63,7 +63,7 @@ public:
Uint32 prio,
const SignalHeader* header,
const Uint32* data,
- GenericSectionPtr ptr[3]) const ;
+ const GenericSectionPtr ptr[3]) const ;
};
inline
=== modified file 'storage/ndb/src/common/transporter/TransporterRegistry.cpp'
--- a/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2010-10-07 09:36:21 +0000
+++ b/storage/ndb/src/common/transporter/TransporterRegistry.cpp 2010-11-09 20:40:03 +0000
@@ -760,7 +760,7 @@ TransporterRegistry::prepareSend(Transpo
Uint8 prio,
const Uint32 * const signalData,
NodeId nodeId,
- GenericSectionPtr ptr[3]){
+ const GenericSectionPtr ptr[3]){
Transporter *t = theTransporters[nodeId];
=== modified file 'storage/ndb/src/ndbapi/Ndb.cpp'
--- a/storage/ndb/src/ndbapi/Ndb.cpp 2010-09-30 14:27:18 +0000
+++ b/storage/ndb/src/ndbapi/Ndb.cpp 2010-11-09 20:40:03 +0000
@@ -144,8 +144,7 @@ Ndb::NDB_connect(Uint32 tNode)
DBUG_ENTER("Ndb::NDB_connect");
{
- TransporterFacade *tp = theImpl->m_transporter_facade;
- if (tp->get_node_stopping(tNode))
+ if (theImpl->get_node_stopping(tNode))
{
DBUG_RETURN(0);
}
@@ -968,8 +967,7 @@ Ndb::closeTransaction(NdbTransaction* aC
* NOTE: It's ok to call getNodeSequence() here wo/ having mutex,
*/
Uint32 nodeId = aConnection->getConnectedNodeId();
- TransporterFacade* tp = theImpl->m_transporter_facade;
- Uint32 seq = tp->getNodeSequence(nodeId);
+ Uint32 seq = theImpl->getNodeSequence(nodeId);
if (aConnection->theNodeSequence != seq)
{
aConnection->theReleaseOnClose = true;
=== modified file 'storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp'
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp 2010-11-01 10:11:47 +0000
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp 2010-11-09 20:40:03 +0000
@@ -2183,7 +2183,7 @@ int
NdbDictInterface::dictSignal(NdbApiSignal* sig,
LinearSectionPtr ptr[3], int secs,
int node_specification,
- WaitSignalType wst,
+ Uint32 wst,
int timeout, Uint32 RETRIES,
const int *errcodes, int temporaryMask)
{
@@ -2225,7 +2225,7 @@ NdbDictInterface::dictSignal(NdbApiSigna
Uint32 node;
switch(node_specification){
case 0:
- node = (getTransporter()->get_node_alive(m_masterNodeId) ? m_masterNodeId :
+ node = (m_impl->get_node_alive(m_masterNodeId) ? m_masterNodeId :
(m_masterNodeId = getTransporter()->get_an_alive_node()));
break;
case -1:
@@ -8116,9 +8116,9 @@ NdbDictInterface::checkAllNodeVersionsMi
{
for (Uint32 nodeId = 1; nodeId < MAX_NODES; nodeId++)
{
- if (getTransporter()->getIsDbNode(nodeId) &&
- getTransporter()->getIsNodeSendable(nodeId) &&
- (getTransporter()->getNodeNdbVersion(nodeId) <
+ if (m_impl->getIsDbNode(nodeId) &&
+ m_impl->getIsNodeSendable(nodeId) &&
+ (m_impl->getNodeNdbVersion(nodeId) <
minNdbVersion))
{
/* At least 1 sendable data node has lower-than-min
=== modified file 'storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp 2010-10-04 19:22:21 +0000
+++ b/storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp 2010-11-09 20:40:03 +0000
@@ -33,7 +33,6 @@
#include <Ndb.hpp>
#include "DictCache.hpp"
#include <signaldata/DictSignal.hpp>
-#include "NdbWaiter.hpp"
class ListTablesReq;
@@ -611,7 +610,7 @@ public:
// To abstract the stuff thats made in all create/drop/lists below
int dictSignal(NdbApiSignal* signal, LinearSectionPtr ptr[3], int secs,
int nodeId, // -1 any, 0 = master, >1 = specified
- WaitSignalType wst,
+ Uint32 waitsignaltype,
int timeout, Uint32 RETRIES,
const int *errcodes = 0, int temporaryMask = 0);
=== modified file 'storage/ndb/src/ndbapi/NdbImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbImpl.hpp 2010-10-01 10:08:29 +0000
+++ b/storage/ndb/src/ndbapi/NdbImpl.hpp 2010-11-09 20:40:03 +0000
@@ -30,6 +30,8 @@
#include "NdbDictionaryImpl.hpp"
#include "ObjectMap.hpp"
#include "trp_client.hpp"
+#include "trp_node.hpp"
+#include "NdbWaiter.hpp"
template <class T>
struct Ndb_free_list_t
@@ -62,7 +64,7 @@ public:
Ndb * m_next_ndb_object, * m_prev_ndb_object;
Ndb_cluster_connection_impl &m_ndb_cluster_connection;
- TransporterFacade *m_transporter_facade;
+ TransporterFacade * const m_transporter_facade;
NdbDictionaryImpl m_dictionary;
@@ -151,6 +153,17 @@ public:
virtual void trp_deliver_signal(const NdbApiSignal*,
const LinearSectionPtr p[3]);
virtual void trp_node_status(Uint32, Uint32);
+
+ // Is node available for running transactions
+ bool get_node_alive(NodeId nodeId) const;
+ bool get_node_stopping(NodeId nodeId) const;
+ bool getIsDbNode(NodeId nodeId) const;
+ bool getIsNodeSendable(NodeId nodeId) const;
+ Uint32 getNodeGrp(NodeId nodeId) const;
+ Uint32 getNodeSequence(NodeId nodeId) const;
+ Uint32 getNodeNdbVersion(NodeId nodeId) const;
+ Uint32 getMinDbNodeVersion() const;
+ bool check_send_size(Uint32 node_id, Uint32 send_size) const { return true;}
};
#ifdef VM_TRACE
@@ -372,4 +385,62 @@ Ndb_free_list_t<T>::release(Uint32 cnt,
#endif
}
+inline
+bool
+NdbImpl::getIsDbNode(NodeId n) const {
+ return
+ getNodeInfo(n).defined &&
+ getNodeInfo(n).m_info.m_type == NodeInfo::DB;
+}
+
+inline
+Uint32
+NdbImpl::getNodeGrp(NodeId n) const {
+ return getNodeInfo(n).m_state.nodeGroup;
+}
+
+
+inline
+bool
+NdbImpl::get_node_alive(NodeId n) const {
+ return getNodeInfo(n).m_alive;
+}
+
+inline
+bool
+NdbImpl::get_node_stopping(NodeId n) const {
+ const trp_node & node = getNodeInfo(n);
+ assert(node.m_info.getType() == NodeInfo::DB);
+ return (!node.m_state.getSingleUserMode() &&
+ node.m_state.startLevel >= NodeState::SL_STOPPING_1);
+}
+
+inline
+bool
+NdbImpl::getIsNodeSendable(NodeId n) const {
+ const trp_node & node = getNodeInfo(n);
+ const Uint32 startLevel = node.m_state.startLevel;
+ const NodeInfo::NodeType node_type = node.m_info.getType();
+ assert(node_type == NodeInfo::DB ||
+ node_type == NodeInfo::MGM);
+
+ return node.compatible && (startLevel == NodeState::SL_STARTED ||
+ startLevel == NodeState::SL_STOPPING_1 ||
+ node.m_state.getSingleUserMode() ||
+ node_type == NodeInfo::MGM);
+}
+
+inline
+Uint32
+NdbImpl::getNodeSequence(NodeId n) const {
+ return getNodeInfo(n).m_info.m_connectCount;
+}
+
+inline
+Uint32
+NdbImpl::getNodeNdbVersion(NodeId n) const
+{
+ return getNodeInfo(n).m_info.m_version;
+}
+
#endif
=== modified file 'storage/ndb/src/ndbapi/NdbOperationExec.cpp'
--- a/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2010-11-04 15:02:25 +0000
+++ b/storage/ndb/src/ndbapi/NdbOperationExec.cpp 2010-11-09 20:40:03 +0000
@@ -136,16 +136,15 @@ NdbOperation::doSendKeyReq(int aNodeId,
* we can send signal trains instead.
*/
NdbApiSignal* request = theTCREQ;
- TransporterFacade *tp = theNdb->theImpl->m_transporter_facade;
- Uint32 tcNodeVersion = tp->getNodeNdbVersion(aNodeId);
- bool forceShort = false;
- forceShort = theNdb->theImpl->forceShortRequests;
+ NdbImpl* impl = theNdb->theImpl;
+ Uint32 tcNodeVersion = impl->getNodeNdbVersion(aNodeId);
+ bool forceShort = impl->forceShortRequests;
bool sendLong = ( tcNodeVersion >= NDBD_LONG_TCKEYREQ ) &&
! forceShort;
if (sendLong)
{
- return tp->sendSignal(request, aNodeId, secs, numSecs);
+ return impl->sendSignal(request, aNodeId, secs, numSecs);
}
else
{
@@ -187,7 +186,7 @@ NdbOperation::doSendKeyReq(int aNodeId,
request->setLength(reqLen);
- if (tp->sendSignal(request, aNodeId) == -1)
+ if (impl->sendSignal(request, aNodeId) == -1)
return -1;
keyInfoLen -= keyInfoInReq;
@@ -209,7 +208,7 @@ NdbOperation::doSendKeyReq(int aNodeId,
keyInfoReader.copyNWords(&keyInfo->keyData[0], dataWords);
request->setLength(KeyInfo::HeaderLength + dataWords);
- if (tp->sendSignal(request, aNodeId) == -1)
+ if (impl->sendSignal(request, aNodeId) == -1)
return -1;
keyInfoLen-= dataWords;
@@ -233,7 +232,7 @@ NdbOperation::doSendKeyReq(int aNodeId,
attrInfoReader.copyNWords(&attrInfo->attrData[0], dataWords);
request->setLength(AttrInfo::HeaderLength + dataWords);
- if (tp->sendSignal(request, aNodeId) == -1)
+ if (impl->sendSignal(request, aNodeId) == -1)
return -1;
attrInfoLen-= dataWords;
=== modified file 'storage/ndb/src/ndbapi/NdbScanOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2010-10-20 17:41:22 +0000
+++ b/storage/ndb/src/ndbapi/NdbScanOperation.cpp 2010-11-09 20:40:03 +0000
@@ -1667,7 +1667,7 @@ NdbScanOperation::executeCursor(int node
* proceeding
*/
bool locked = false;
- TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
+ NdbImpl* theImpl = theNdb->theImpl;
int res = 0;
if (m_scanUsingOldApi && finaliseScanOldApi() == -1)
@@ -1679,12 +1679,12 @@ NdbScanOperation::executeCursor(int node
{
locked = true;
NdbTransaction * tCon = theNdbCon;
- NdbMutex_Lock(tp->theMutexPtr);
+ theImpl->lock();
Uint32 seq = tCon->theNodeSequence;
- if (tp->get_node_alive(nodeId) &&
- (tp->getNodeSequence(nodeId) == seq)) {
+ if (theImpl->get_node_alive(nodeId) &&
+ (theImpl->getNodeSequence(nodeId) == seq)) {
tCon->theMagicNumber = 0x37412619;
@@ -1698,8 +1698,8 @@ NdbScanOperation::executeCursor(int node
}
else
{
- if (!(tp->get_node_stopping(nodeId) &&
- (tp->getNodeSequence(nodeId) == seq)))
+ if (!(theImpl->get_node_stopping(nodeId) &&
+ (theImpl->getNodeSequence(nodeId) == seq)))
{
TRACE_DEBUG("The node is hard dead when attempting to start a scan");
setErrorCode(4029);
@@ -1731,7 +1731,7 @@ done:
}
if (locked)
- NdbMutex_Unlock(tp->theMutexPtr);
+ theImpl->unlock();
return res;
}
@@ -1845,15 +1845,15 @@ NdbScanOperation::nextResultNdbRecord(co
/* Now we have to wait for more rows (or end-of-file on all receivers). */
Uint32 nodeId = theNdbCon->theDBnode;
- Uint32 timeout= theNdb->theImpl->get_waitfor_timeout();
- TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
+ NdbImpl* theImpl = theNdb->theImpl;
+ Uint32 timeout= theImpl->get_waitfor_timeout();
int retVal= 2;
Uint32 idx, last;
/*
The rest needs to be done under mutex due to synchronization with receiver
thread.
*/
- PollGuard poll_guard(* theNdb->theImpl);
+ PollGuard poll_guard(* theImpl);
const Uint32 seq= theNdbCon->theNodeSequence;
@@ -1862,7 +1862,7 @@ NdbScanOperation::nextResultNdbRecord(co
goto err4;
}
- if(seq == tp->getNodeSequence(nodeId) &&
+ if(seq == theImpl->getNodeSequence(nodeId) &&
send_next_scan(m_current_api_receiver, false) == 0)
{
idx= m_current_api_receiver;
@@ -1888,7 +1888,7 @@ NdbScanOperation::nextResultNdbRecord(co
{
/* No completed... */
int ret_code= poll_guard.wait_scan(3*timeout, nodeId, forceSend);
- if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
+ if (ret_code == 0 && seq == theImpl->getNodeSequence(nodeId)) {
continue;
} else if(ret_code == -1){
retVal= -1;
@@ -1992,16 +1992,16 @@ NdbScanOperation::send_next_scan(Uint32
if(sent)
{
Uint32 nodeId = theNdbCon->theDBnode;
- TransporterFacade * tp = theNdb->theImpl->m_transporter_facade;
+ NdbImpl* impl = theNdb->theImpl;
if(cnt > 21){
tSignal.setLength(4);
LinearSectionPtr ptr[3];
ptr[0].p = prep_array;
ptr[0].sz = sent;
- ret = tp->sendSignal(&tSignal, nodeId, ptr, 1);
+ ret = impl->sendSignal(&tSignal, nodeId, ptr, 1);
} else {
tSignal.setLength(4+sent);
- ret = tp->sendSignal(&tSignal, nodeId);
+ ret = impl->sendSignal(&tSignal, nodeId);
}
}
m_sent_receivers_count = last + sent;
@@ -2045,7 +2045,6 @@ void NdbScanOperation::close(bool forceS
m_conf_receivers_count,
m_sent_receivers_count);
- TransporterFacade* tp = theNdb->theImpl->m_transporter_facade;
/*
The PollGuard has an implicit call of unlock_and_signal through the
~PollGuard method. This method is called implicitly by the compiler
@@ -2053,7 +2052,7 @@ void NdbScanOperation::close(bool forceS
break, continue or simply end of statement block
*/
PollGuard poll_guard(* theNdb->theImpl);
- close_impl(tp, forceSend, &poll_guard);
+ close_impl(forceSend, &poll_guard);
}
// Keep in local variables, as "this" might be destructed below
@@ -2424,21 +2423,19 @@ NdbScanOperation::doSendScan(int aProces
numSections= 3;
}
- TransporterFacade *tp = theNdb->theImpl->m_transporter_facade;
-
- Uint32 tcNodeVersion = tp->getNodeNdbVersion(aProcessorId);
- bool forceShort = false;
- forceShort = theNdb->theImpl->forceShortRequests;
+ NdbImpl* impl = theNdb->theImpl;
+ Uint32 tcNodeVersion = impl->getNodeNdbVersion(aProcessorId);
+ bool forceShort = impl->forceShortRequests;
bool sendLong = ( tcNodeVersion >= NDBD_LONG_SCANTABREQ) &&
! forceShort;
if (sendLong)
{
/* Send Fragmented as SCAN_TABREQ can be large */
- if (tp->sendFragmentedSignal(theSCAN_TABREQ,
- aProcessorId,
- &secs[0],
- numSections) == -1)
+ if (impl->sendFragmentedSignal(theSCAN_TABREQ,
+ aProcessorId,
+ &secs[0],
+ numSections) == -1)
{
setErrorCode(4002);
return -1;
@@ -2462,7 +2459,7 @@ NdbScanOperation::doSendScan(int aProces
scanTabReq->attrLenKeyLen = (keyInfoLen << 16) | attrInfoLen;
/* Send with receiver Ids as first and only section */
- if (tp->sendSignal(theSCAN_TABREQ, aProcessorId, &secs[0], 1) == -1)
+ if (impl->sendSignal(theSCAN_TABREQ, aProcessorId, &secs[0], 1) == -1)
{
setErrorCode(4002);
return -1;
@@ -2483,7 +2480,7 @@ NdbScanOperation::doSendScan(int aProces
keyInfoReader.copyNWords(&keyInfo->keyData[0], dataWords);
theSCAN_TABREQ->setLength(KeyInfo::HeaderLength + dataWords);
- if (tp->sendSignal(theSCAN_TABREQ, aProcessorId) == -1)
+ if (impl->sendSignal(theSCAN_TABREQ, aProcessorId) == -1)
{
setErrorCode(4002);
return -1;
@@ -2505,7 +2502,7 @@ NdbScanOperation::doSendScan(int aProces
attrInfoReader.copyNWords(&attrInfo->attrData[0], dataWords);
theSCAN_TABREQ->setLength(AttrInfo::HeaderLength + dataWords);
- if (tp->sendSignal(theSCAN_TABREQ, aProcessorId) == -1)
+ if (impl->sendSignal(theSCAN_TABREQ, aProcessorId) == -1)
{
setErrorCode(4002);
return -1;
@@ -3726,22 +3723,22 @@ NdbIndexScanOperation::ordered_insert_re
int
NdbIndexScanOperation::ordered_send_scan_wait_for_all(bool forceSend)
{
- Uint32 timeout= theNdb->theImpl->get_waitfor_timeout();
- TransporterFacade* tp= theNdb->theImpl->m_transporter_facade;
+ NdbImpl* impl = theNdb->theImpl;
+ Uint32 timeout= impl->get_waitfor_timeout();
- PollGuard poll_guard(* theNdb->theImpl);
+ PollGuard poll_guard(* impl);
if(theError.code)
return -1;
Uint32 seq= theNdbCon->theNodeSequence;
Uint32 nodeId= theNdbCon->theDBnode;
- if (seq == tp->getNodeSequence(nodeId) &&
+ if (seq == impl->getNodeSequence(nodeId) &&
!send_next_scan_ordered(m_current_api_receiver))
{
while (m_sent_receivers_count > 0 && !theError.code)
{
int ret_code= poll_guard.wait_scan(3*timeout, nodeId, forceSend);
- if (ret_code == 0 && seq == tp->getNodeSequence(nodeId))
+ if (ret_code == 0 && seq == impl->getNodeSequence(nodeId))
continue;
if(ret_code == -1){
setErrorCode(4008);
@@ -3814,21 +3811,21 @@ NdbIndexScanOperation::send_next_scan_or
m_sent_receivers_count = last + 1;
Uint32 nodeId = theNdbCon->theDBnode;
- TransporterFacade * tp = theNdb->theImpl->m_transporter_facade;
+ NdbImpl * impl = theNdb->theImpl;
tSignal.setLength(4+1);
- int ret= tp->sendSignal(&tSignal, nodeId);
+ int ret= impl->sendSignal(&tSignal, nodeId);
return ret;
}
int
-NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend,
- PollGuard *poll_guard)
+NdbScanOperation::close_impl(bool forceSend, PollGuard *poll_guard)
{
- Uint32 timeout= theNdb->theImpl->get_waitfor_timeout();
+ NdbImpl* impl = theNdb->theImpl;
+ Uint32 timeout= impl->get_waitfor_timeout();
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
- if(seq != tp->getNodeSequence(nodeId))
+ if (seq != impl->getNodeSequence(nodeId))
{
theNdbCon->theReleaseOnClose = true;
return -1;
=== modified file 'storage/ndb/src/ndbapi/NdbTransaction.cpp'
--- a/storage/ndb/src/ndbapi/NdbTransaction.cpp 2010-09-30 14:27:18 +0000
+++ b/storage/ndb/src/ndbapi/NdbTransaction.cpp 2010-11-09 20:40:03 +0000
@@ -843,10 +843,9 @@ NdbTransaction::sendTC_HBREP() // Send
tcHbRep->transId1 = tTransId1;
tcHbRep->transId2 = tTransId2;
- TransporterFacade *tp = theNdb->theImpl->m_transporter_facade;
- tp->lock_mutex();
- const int res = tp->sendSignal(tSignal,theDBnode);
- tp->unlock_mutex();
+ tNdb->theImpl->lock();
+ const int res = tNdb->theImpl->sendSignal(tSignal,theDBnode);
+ tNdb->theImpl->unlock();
tNdb->releaseSignal(tSignal);
if (res == -1){
@@ -949,7 +948,7 @@ NdbTransaction::sendROLLBACK() // S
*************************************************************************/
NdbApiSignal tSignal(tNdb->theMyRef);
Uint32 tTransId1, tTransId2;
- TransporterFacade *tp = theNdb->theImpl->m_transporter_facade;
+ NdbImpl * impl = theNdb->theImpl;
int tReturnCode;
tTransId1 = (Uint32) theTransactionId;
@@ -964,7 +963,7 @@ NdbTransaction::sendROLLBACK() // S
tSignal.setLength(tSignal.getLength() + 1); // + flags
tSignal.setData(0x1, 4); // potentially bad data
}
- tReturnCode = tp->sendSignal(&tSignal,theDBnode);
+ tReturnCode = impl->sendSignal(&tSignal,theDBnode);
if (tReturnCode != -1) {
theSendStatus = sendTC_ROLLBACK;
tNdb->insert_sent_list(this);
@@ -1002,7 +1001,7 @@ NdbTransaction::sendCOMMIT() // Send
{
NdbApiSignal tSignal(theNdb->theMyRef);
Uint32 tTransId1, tTransId2;
- TransporterFacade *tp = theNdb->theImpl->m_transporter_facade;
+ NdbImpl * impl = theNdb->theImpl;
int tReturnCode;
tTransId1 = (Uint32) theTransactionId;
@@ -1012,7 +1011,7 @@ NdbTransaction::sendCOMMIT() // Send
tSignal.setData(tTransId1, 2);
tSignal.setData(tTransId2, 3);
- tReturnCode = tp->sendSignal(&tSignal,theDBnode);
+ tReturnCode = impl->sendSignal(&tSignal,theDBnode);
if (tReturnCode != -1) {
theSendStatus = sendTC_COMMIT;
theNdb->insert_sent_list(this);
=== modified file 'storage/ndb/src/ndbapi/Ndbif.cpp'
--- a/storage/ndb/src/ndbapi/Ndbif.cpp 2010-10-06 12:35:34 +0000
+++ b/storage/ndb/src/ndbapi/Ndbif.cpp 2010-11-09 20:40:03 +0000
@@ -176,15 +176,9 @@ void Ndb::connected(Uint32 ref)
assert(theMyRef == numberToRef(theNdbBlockNumber, tmpTheNode));
}
- TransporterFacade * theFacade = theImpl->m_transporter_facade;
- int i, n= 0;
- for (i = 1; i < MAX_NDB_NODES; i++){
- if (theFacade->getIsDbNode(i)){
- theImpl->theDBnodes[n] = i;
- n++;
- }
- }
- theImpl->theNoOfDBnodes = n;
+ Uint32 cnt =
+ theImpl->m_ndb_cluster_connection.get_db_nodes(theImpl->theDBnodes);
+ theImpl->theNoOfDBnodes = cnt;
theFirstTransId += ((Uint64)tBlockNo << 52)+
((Uint64)tmpTheNode << 40);
@@ -1138,15 +1132,14 @@ Ndb::sendPrepTrans(int forceSend)
and we keep a small space for messages like that.
*/
Uint32 i;
- TransporterFacade* tp = theImpl->m_transporter_facade;
theCachedMinDbNodeVersion = theImpl->m_transporter_facade->getMinDbNodeVersion();
Uint32 no_of_prep_trans = theNoOfPreparedTransactions;
for (i = 0; i < no_of_prep_trans; i++) {
NdbTransaction * a_con = thePreparedTransactionsArray[i];
thePreparedTransactionsArray[i] = NULL;
Uint32 node_id = a_con->getConnectedNodeId();
- if ((tp->getNodeSequence(node_id) == a_con->theNodeSequence) &&
- (tp->get_node_alive(node_id) || tp->get_node_stopping(node_id)))
+ if ((theImpl->getNodeSequence(node_id) == a_con->theNodeSequence) &&
+ (theImpl->get_node_alive(node_id) || theImpl->get_node_stopping(node_id)))
{
/*
We will send if
@@ -1156,7 +1149,7 @@ Ndb::sendPrepTrans(int forceSend)
of all transactions and commits and thus we allow aborts and
commits to continue but not normal operations.
*/
- if (tp->check_send_size(node_id, a_con->get_send_size())) {
+ if (theImpl->check_send_size(node_id, a_con->get_send_size())) {
if (a_con->doSend() == 0) {
NDB_TICKS current_time = NdbTick_CurrentMillisecond();
a_con->theStartTransTime = current_time;
@@ -1209,11 +1202,7 @@ Ndb::sendPrepTrans(int forceSend)
insert_completed_list(a_con);
}//for
theNoOfPreparedTransactions = 0;
- if (forceSend == 0) {
- tp->checkForceSend(theNdbBlockNumber);
- } else if (forceSend == 1) {
- tp->forceSend(theNdbBlockNumber);
- }//if
+ theImpl->forceSend(forceSend);
return;
}//Ndb::sendPrepTrans()
@@ -1352,7 +1341,6 @@ Ndb::sendRecSignal(Uint16 node_id,
int return_code;
Uint32 read_conn_seq;
- TransporterFacade* tp = theImpl->m_transporter_facade;
Uint32 send_size = 1; // Always sends one signal only
// Protected area
/*
@@ -1362,14 +1350,14 @@ Ndb::sendRecSignal(Uint16 node_id,
break, continue or simply end of statement block
*/
PollGuard poll_guard(* theImpl);
- read_conn_seq= tp->getNodeSequence(node_id);
+ read_conn_seq= theImpl->getNodeSequence(node_id);
if (ret_conn_seq)
*ret_conn_seq= read_conn_seq;
- if ((tp->get_node_alive(node_id)) &&
+ if ((theImpl->get_node_alive(node_id)) &&
((read_conn_seq == conn_seq) ||
(conn_seq == 0))) {
- if (tp->check_send_size(node_id, send_size)) {
- return_code = tp->sendSignal(aSignal, node_id);
+ if (theImpl->check_send_size(node_id, send_size)) {
+ return_code = theImpl->sendSignal(aSignal, node_id);
if (return_code != -1) {
return poll_guard.wait_n_unlock(WAITFOR_RESPONSE_TIMEOUT,node_id,
aWaitState, false);
@@ -1380,7 +1368,7 @@ Ndb::sendRecSignal(Uint16 node_id,
return_code = -4;
}//if
} else {
- if ((tp->get_node_stopping(node_id)) &&
+ if ((theImpl->get_node_stopping(node_id)) &&
((read_conn_seq == conn_seq) ||
(conn_seq == 0))) {
return_code = -5;
@@ -1437,9 +1425,9 @@ NdbImpl::send_event_report(bool has_lock
m_ndb_cluster_connection.init_get_next_node(node_iter);
while ((tNode= m_ndb_cluster_connection.get_next_node(node_iter)))
{
- if(tp->get_node_alive(tNode))
+ if(get_node_alive(tNode))
{
- tp->sendSignal(&aSignal, tNode);
+ sendSignal(&aSignal, tNode);
goto done;
}
}
=== modified file 'storage/ndb/src/ndbapi/SignalSender.hpp'
--- a/storage/ndb/src/ndbapi/SignalSender.hpp 2010-10-13 12:22:51 +0000
+++ b/storage/ndb/src/ndbapi/SignalSender.hpp 2010-11-09 20:40:03 +0000
@@ -116,7 +116,7 @@ public:
Uint32 get_an_alive_node() const { return theFacade->get_an_alive_node(); }
Uint32 getAliveNode() const { return get_an_alive_node(); }
- bool get_node_alive(NodeId n) const { return theFacade->get_node_alive(n); }
+ bool get_node_alive(Uint32 n) const { return getNodeInfo(n).m_alive; }
private:
int m_blockNo;
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.cpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.cpp 2010-11-04 14:57:04 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.cpp 2010-11-09 20:40:03 +0000
@@ -18,8 +18,8 @@
#include <ndb_global.h>
#include <ndb_limits.h>
-#include "trp_client.hpp"
#include "TransporterFacade.hpp"
+#include "trp_client.hpp"
#include "ClusterMgr.hpp"
#include <IPCConfig.hpp>
#include <TransporterCallback.hpp>
@@ -1149,8 +1149,10 @@ public:
* boundaries to simplify reassembly in the kernel.
*/
int
-TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode,
- GenericSectionPtr ptr[3], Uint32 secs)
+TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal,
+ NodeId aNode,
+ const GenericSectionPtr ptr[3],
+ Uint32 secs)
{
unsigned i;
Uint32 totalSectionLength= 0;
@@ -1332,7 +1334,8 @@ TransporterFacade::sendFragmentedSignal(
int
TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode,
- LinearSectionPtr ptr[3], Uint32 secs)
+ const LinearSectionPtr ptr[3],
+ Uint32 secs)
{
/* Use the GenericSection variant of sendFragmentedSignal */
GenericSectionPtr tmpPtr[3];
@@ -1361,7 +1364,7 @@ TransporterFacade::sendFragmentedSignal(
int
TransporterFacade::sendSignal(NdbApiSignal* aSignal, NodeId aNode,
- LinearSectionPtr ptr[3], Uint32 secs){
+ const LinearSectionPtr ptr[3], Uint32 secs){
aSignal->m_noOfSections = secs;
if(getIsNodeSendable(aNode) == true){
#ifdef API_TRACE
@@ -1393,7 +1396,7 @@ TransporterFacade::sendSignal(NdbApiSign
int
TransporterFacade::sendSignal(NdbApiSignal* aSignal, NodeId aNode,
- GenericSectionPtr ptr[3], Uint32 secs){
+ const GenericSectionPtr ptr[3], Uint32 secs){
aSignal->m_noOfSections = secs;
if(getIsNodeSendable(aNode) == true){
#ifdef API_TRACE
=== modified file 'storage/ndb/src/ndbapi/TransporterFacade.hpp'
--- a/storage/ndb/src/ndbapi/TransporterFacade.hpp 2010-11-04 14:57:04 +0000
+++ b/storage/ndb/src/ndbapi/TransporterFacade.hpp 2010-11-09 20:40:03 +0000
@@ -81,28 +81,25 @@ public:
Uint32 get_active_ndb_objects() const;
// Only sends to nodes which are alive
+private:
int sendSignal(NdbApiSignal * signal, NodeId nodeId);
int sendSignal(NdbApiSignal*, NodeId,
- LinearSectionPtr ptr[3], Uint32 secs);
+ const LinearSectionPtr ptr[3], Uint32 secs);
int sendSignal(NdbApiSignal*, NodeId,
- GenericSectionPtr ptr[3], Uint32 secs);
+ const GenericSectionPtr ptr[3], Uint32 secs);
int sendFragmentedSignal(NdbApiSignal*, NodeId,
- LinearSectionPtr ptr[3], Uint32 secs);
+ const LinearSectionPtr ptr[3], Uint32 secs);
int sendFragmentedSignal(NdbApiSignal*, NodeId,
- GenericSectionPtr ptr[3], Uint32 secs);
+ const GenericSectionPtr ptr[3], Uint32 secs);
+public:
// Is node available for running transactions
+private:
bool get_node_alive(NodeId nodeId) const;
- bool get_node_stopping(NodeId nodeId) const;
- bool getIsDbNode(NodeId nodeId) const;
bool getIsNodeSendable(NodeId nodeId) const;
- Uint32 getNodeGrp(NodeId nodeId) const;
- Uint32 getNodeSequence(NodeId nodeId) const;
- Uint32 getNodeNdbVersion(NodeId nodeId) const;
- Uint32 getMinDbNodeVersion() const;
- // Is there space in sendBuffer to send messages
- bool check_send_size(Uint32 node_id, Uint32 send_size);
+public:
+ Uint32 getMinDbNodeVersion() const;
// My own processor id
NodeId ownId() const;
@@ -209,10 +206,8 @@ private:
friend class ArbitMgr;
friend class MgmtSrvr;
friend class SignalSender;
- friend class GrepPS;
- friend class ExtSender; ///< @todo Hack to be able to sendSignalUnCond
- friend class GrepSS;
friend class Ndb;
+ friend class Ndb_cluster_connection;
friend class Ndb_cluster_connection_impl;
friend class NdbTransaction;
friend class NdbDictInterface;
@@ -317,28 +312,6 @@ unsigned Ndb_cluster_connection_impl::ge
inline
bool
-TransporterFacade::check_send_size(Uint32 node_id, Uint32 send_size)
-{
- return true;
-}
-
-inline
-bool
-TransporterFacade::getIsDbNode(NodeId n) const {
- return
- theClusterMgr->getNodeInfo(n).defined &&
- theClusterMgr->getNodeInfo(n).m_info.m_type == NodeInfo::DB;
-}
-
-inline
-Uint32
-TransporterFacade::getNodeGrp(NodeId n) const {
- return theClusterMgr->getNodeInfo(n).m_state.nodeGroup;
-}
-
-
-inline
-bool
TransporterFacade::get_node_alive(NodeId n) const {
if (theClusterMgr)
{
@@ -355,15 +328,6 @@ TransporterFacade::hb_received(NodeId n)
inline
bool
-TransporterFacade::get_node_stopping(NodeId n) const {
- const trp_node & node = theClusterMgr->getNodeInfo(n);
- assert(node.m_info.getType() == NodeInfo::DB);
- return (!node.m_state.getSingleUserMode() &&
- node.m_state.startLevel >= NodeState::SL_STOPPING_1);
-}
-
-inline
-bool
TransporterFacade::getIsNodeSendable(NodeId n) const {
const trp_node & node = theClusterMgr->getNodeInfo(n);
const Uint32 startLevel = node.m_state.startLevel;
@@ -379,19 +343,6 @@ TransporterFacade::getIsNodeSendable(Nod
inline
Uint32
-TransporterFacade::getNodeSequence(NodeId n) const {
- return theClusterMgr->getNodeInfo(n).m_info.m_connectCount;
-}
-
-inline
-Uint32
-TransporterFacade::getNodeNdbVersion(NodeId n) const
-{
- return theClusterMgr->getNodeInfo(n).m_info.m_version;
-}
-
-inline
-Uint32
TransporterFacade::getMinDbNodeVersion() const
{
if (theClusterMgr)
=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection.cpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2010-11-08 12:22:42 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection.cpp 2010-11-09 20:40:03 +0000
@@ -622,6 +622,17 @@ Ndb_cluster_connection_impl::init_nodes_
DBUG_RETURN(0);
}
+Uint32
+Ndb_cluster_connection_impl::get_db_nodes(Uint8 arr[MAX_NDB_NODES]) const
+{
+ Uint32 cnt = (Uint32)m_all_nodes.size();
+ assert(cnt < MAX_NDB_NODES);
+ const Node *nodes = m_all_nodes.getBase();
+ for (Uint32 i = 0; i<cnt; i++)
+ arr[i] = (Uint8)nodes[i].id;
+ return cnt;
+}
+
int
Ndb_cluster_connection_impl::configure(Uint32 nodeId,
const ndb_mgm_configuration &config)
=== modified file 'storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp'
--- a/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp 2010-11-03 11:56:56 +0000
+++ b/storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp 2010-11-09 20:40:03 +0000
@@ -94,6 +94,7 @@ private:
int configure(Uint32 nodeid, const ndb_mgm_configuration &config);
void connect_thread();
void set_name(const char *name);
+ Uint32 get_db_nodes(Uint8 nodesarray[MAX_NDB_NODES]) const;
int connect(int no_retries,
int retry_delay_in_seconds,
=== modified file 'storage/ndb/src/ndbapi/trp_client.hpp'
--- a/storage/ndb/src/ndbapi/trp_client.hpp 2010-10-13 06:15:20 +0000
+++ b/storage/ndb/src/ndbapi/trp_client.hpp 2010-11-09 20:40:03 +0000
@@ -24,6 +24,7 @@
struct trp_node;
class NdbApiSignal;
struct LinearSectionPtr;
+struct GenericSectionPtr;
class trp_client
{
@@ -46,7 +47,18 @@ public:
void forceSend(int val = 1);
+ int sendSignal(NdbApiSignal * signal, Uint32 nodeId);
+ int sendSignal(NdbApiSignal*, Uint32, const LinearSectionPtr p[3], Uint32 s);
+ int sendSignal(NdbApiSignal*, Uint32, const GenericSectionPtr p[3], Uint32 s);
+ int sendFragmentedSignal(NdbApiSignal*, Uint32,
+ const LinearSectionPtr ptr[3], Uint32 secs);
+ int sendFragmentedSignal(NdbApiSignal*, Uint32,
+ const GenericSectionPtr ptr[3], Uint32 secs);
+
const trp_node & getNodeInfo(Uint32 i) const;
+
+ void lock();
+ void unlock();
private:
Uint32 m_blockNo;
TransporterFacade * m_facade;
@@ -92,4 +104,63 @@ private:
class NdbWaiter *m_waiter;
};
+#include "TransporterFacade.hpp"
+
+inline
+void
+trp_client::lock()
+{
+ assert(m_poll.m_locked == false);
+ NdbMutex_Lock(m_facade->theMutexPtr);
+ m_poll.m_locked = true;
+}
+
+inline
+void
+trp_client::unlock()
+{
+ assert(m_poll.m_locked == true);
+ NdbMutex_Unlock(m_facade->theMutexPtr);
+ m_poll.m_locked = false;
+}
+
+inline
+int
+trp_client::sendSignal(NdbApiSignal * signal, Uint32 nodeId)
+{
+ return m_facade->sendSignal(signal, nodeId);
+}
+
+inline
+int
+trp_client::sendSignal(NdbApiSignal * signal, Uint32 nodeId,
+ const LinearSectionPtr ptr[3], Uint32 secs)
+{
+ return m_facade->sendSignal(signal, nodeId, ptr, secs);
+}
+
+inline
+int
+trp_client::sendSignal(NdbApiSignal * signal, Uint32 nodeId,
+ const GenericSectionPtr ptr[3], Uint32 secs)
+{
+ return m_facade->sendSignal(signal, nodeId, ptr, secs);
+}
+
+inline
+int
+trp_client::sendFragmentedSignal(NdbApiSignal * signal, Uint32 nodeId,
+ const LinearSectionPtr ptr[3], Uint32 secs)
+{
+ return m_facade->sendFragmentedSignal(signal, nodeId, ptr, secs);
+}
+
+inline
+int
+trp_client::sendFragmentedSignal(NdbApiSignal * signal, Uint32 nodeId,
+ const GenericSectionPtr ptr[3], Uint32 secs)
+{
+ return m_facade->sendFragmentedSignal(signal, nodeId, ptr, secs);
+}
+
#endif
=== modified file 'storage/ndb/tools/restore/Restore.cpp'
--- a/storage/ndb/tools/restore/Restore.cpp 2010-10-22 06:52:58 +0000
+++ b/storage/ndb/tools/restore/Restore.cpp 2010-11-09 20:40:03 +0000
@@ -28,6 +28,7 @@
#include <signaldata/DictTabInfo.hpp>
#include <ndb_limits.h>
#include <NdbAutoPtr.hpp>
+#include "../src/ndbapi/NdbDictionaryImpl.hpp"
#include <sys/stat.h>
=== modified file 'storage/ndb/tools/restore/Restore.hpp'
--- a/storage/ndb/tools/restore/Restore.hpp 2010-10-22 06:52:58 +0000
+++ b/storage/ndb/tools/restore/Restore.hpp 2010-11-09 20:40:03 +0000
@@ -24,9 +24,9 @@
#include <ndb_global.h>
#include <NdbOut.hpp>
#include "../src/kernel/blocks/backup/BackupFormat.hpp"
-#include "../src/ndbapi/NdbDictionaryImpl.hpp"
#include <NdbApi.hpp>
#include <util/azlib.h>
+#include <util/UtilBuffer.hpp>
#include <ndb_version.h>
#include <version.h>
=== modified file 'storage/ndb/tools/restore/consumer_restore.cpp'
--- a/storage/ndb/tools/restore/consumer_restore.cpp 2010-10-22 08:22:32 +0000
+++ b/storage/ndb/tools/restore/consumer_restore.cpp 2010-11-09 20:40:03 +0000
@@ -18,11 +18,13 @@
#include <NDBT_ReturnCodes.h>
#include "consumer_restore.hpp"
+#include <kernel/ndb_limits.h>
#include <my_sys.h>
#include <NdbSleep.h>
#include <ndb_internal.hpp>
#include <ndb_logevent.h>
+#include "../src/ndbapi/NdbDictionaryImpl.hpp"
#define NDB_ANYVALUE_FOR_NOLOGGING 0x8000007f
=== modified file 'storage/ndb/tools/restore/restore_main.cpp'
--- a/storage/ndb/tools/restore/restore_main.cpp 2010-10-22 06:52:58 +0000
+++ b/storage/ndb/tools/restore/restore_main.cpp 2010-11-09 20:40:03 +0000
@@ -28,6 +28,7 @@
#include "consumer_restore.hpp"
#include "consumer_printer.hpp"
+#include "../src/ndbapi/NdbDictionaryImpl.hpp"
extern FilteredNdbOut err;
extern FilteredNdbOut info;
No bundle (reason: revision is a merge).
| Thread |
|---|
| • bzr commit into mysql-5.1-telco-7.1 branch (jonas:3972) | jonas oreland | 10 Nov |