From: Ole John Aske Date: August 15 2011 12:39pm Subject: bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch (ole.john.aske:3530 to 3531) List-Archive: http://lists.mysql.com/commits/140639 Message-Id: <20110815123933.014AB218@fimafeng09.norway.sun.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3531 Ole John Aske 2011-08-15 [merge] Merge modified: storage/ndb/src/ndbapi/NdbQueryOperation.cpp 3530 Ole John Aske 2011-08-15 [merge] merge modified: mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.cnf storage/ndb/src/ndbapi/NdbQueryOperation.cpp === modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-15 12:27:33 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-15 12:38:54 +0000 @@ -201,6 +201,9 @@ public: NdbResultStream& getResultStream(Uint32 operationNo) const { return m_query->getQueryOperation(operationNo).getResultStream(m_fragNo); } + Uint32 getReceiverId() const; + Uint32 getReceiverTcPtrI() const; + /** * @return True if there are no more batches to be received for this fragment. */ @@ -876,8 +879,7 @@ void NdbRootFragment::buildReciverIdMap( { for(Uint32 fragNo = 0; fragNo < noOfFrags; fragNo++) { - const Uint32 receiverId = - frags[fragNo].getResultStream(0).getReceiver().getId(); + const Uint32 receiverId = frags[fragNo].getReceiverId(); /** * For reasons unknow, NdbObjectIdMap shifts ids two bits to the left, * so we must do the opposite to get a good hash distribution. @@ -890,6 +892,7 @@ void NdbRootFragment::buildReciverIdMap( } } +//static NdbRootFragment* NdbRootFragment::receiverIdLookup(NdbRootFragment* frags, Uint32 noOfFrags, @@ -903,9 +906,7 @@ NdbRootFragment::receiverIdLookup(NdbRoo const int hash = (receiverId >> 2) % noOfFrags; int current = frags[hash].m_idMapHead; assert(current < static_cast(noOfFrags)); - while (current >= 0 && - frags[current].getResultStream(0).getReceiver().getId() - != receiverId) + while (current >= 0 && frags[current].getReceiverId() != receiverId) { current = frags[current].m_idMapNext; assert(current < static_cast(noOfFrags)); @@ -958,14 +959,31 @@ void NdbRootFragment::setConfReceived() bool NdbRootFragment::finalBatchReceived() const { - return getResultStream(0).getReceiver().m_tcPtrI==RNIL; + return getReceiverTcPtrI()==RNIL; } -bool NdbRootFragment::isEmpty() const +bool NdbRootFragment::isEmpty() const { return getResultStream(0).isEmpty(); } +/** + * SPJ requests are identified by the receiver-id of the + * *root* ResultStream for each RootFragment. Furthermore + * a NEXTREQ use the tcPtrI saved in this ResultStream to + * identify the 'cursor' to restart. + * + * We provide some convenient accessors for fetching this info + */ +Uint32 NdbRootFragment::getReceiverId() const +{ + return getResultStream(0).getReceiver().getId(); +} + +Uint32 NdbRootFragment::getReceiverTcPtrI() const +{ + return getResultStream(0).getReceiver().m_tcPtrI; +} /////////////////////////////////////////// ///////// NdbQuery API methods /////////// @@ -2494,11 +2512,13 @@ class InitialReceiverIdIterator: public { public: - InitialReceiverIdIterator(const NdbQueryImpl& query) - :m_query(query), + InitialReceiverIdIterator(NdbRootFragment rootFrags[], + Uint32 cnt) + :m_rootFrags(rootFrags), + m_fragCount(cnt), m_currFragNo(0) {} - + virtual ~InitialReceiverIdIterator() {}; /** @@ -2519,8 +2539,11 @@ private: * improving efficiency. */ static const Uint32 bufSize = 16; - /** The query with the scan root operation that we list receiver ids for.*/ - const NdbQueryImpl& m_query; + + /** Set of root fragments which we want to itterate receiver ids for.*/ + NdbRootFragment* m_rootFrags; + const Uint32 m_fragCount; + /** The next fragment numnber to be processed. (Range for 0 to no of * fragments.)*/ Uint32 m_currFragNo; @@ -2530,25 +2553,25 @@ private: const Uint32* InitialReceiverIdIterator::getNextWords(Uint32& sz) { - sz = 0; /** * For the initial batch, we want to retrieve one batch for each fragment * whether it is a sorted scan or not. */ - if (m_currFragNo >= m_query.getRootFragCount()) + if (m_currFragNo >= m_fragCount) { + sz = 0; return NULL; } else { - const NdbQueryOperationImpl& root = m_query.getQueryOperation(0U); - while (sz < bufSize && - m_currFragNo < m_query.getRootFragCount()) + Uint32 cnt = 0; + while (cnt < bufSize && m_currFragNo < m_fragCount) { - m_receiverIds[sz] = root.getReceiver(m_currFragNo).getId(); - sz++; + m_receiverIds[cnt] = m_rootFrags[m_currFragNo].getReceiverId(); + cnt++; m_currFragNo++; } + sz = cnt; return m_receiverIds; } } @@ -2690,7 +2713,7 @@ NdbQueryImpl::doSend(int nodeId, bool la * Section 2 : Optional KEYINFO section */ GenericSectionPtr secs[3]; - InitialReceiverIdIterator receiverIdIter(*this); + InitialReceiverIdIterator receiverIdIter(m_rootFrags, m_rootFragCount); LinearSectionIterator attrInfoIter(m_attrInfo.addr(), m_attrInfo.getSize()); LinearSectionIterator keyInfoIter(m_keyInfo.addr(), m_keyInfo.getSize()); @@ -2868,8 +2891,7 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm scanNextReq->transId2 = (Uint32) (transId >> 32); tSignal.setLength(ScanNextReq::SignalLength); - const uint32 receiverId = - emptyFrag.getResultStream(0).getReceiver().m_tcPtrI; + const uint32 receiverId = emptyFrag.getReceiverTcPtrI(); LinearSectionIterator receiverIdIter(&receiverId ,1); GenericSectionPtr secs[1]; No bundle (reason: useless for push emails).