From: Ole John Aske Date: August 17 2011 1:28pm Subject: bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch (ole.john.aske:3540 to 3541) List-Archive: http://lists.mysql.com/commits/140682 Message-Id: <20110817132800.B4B70218@fimafeng09.norway.sun.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 3541 Ole John Aske 2011-08-17 [merge] Merge 70 -> 70-spj modified: storage/ndb/src/ndbapi/NdbQueryOperation.cpp storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 3540 Ole John Aske 2011-08-17 [merge] Merge modified: storage/ndb/src/ndbapi/NdbQueryOperation.cpp === modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 13:09:14 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 13:27:04 +0000 @@ -187,9 +187,9 @@ public: { return m_fragNo; } /** - * Prepare for receiving another batch. + * Prepare for receiving another batch of results. */ - void reset(); + void prepareNextReceiveSet(); /** * Prepare for reading another batch of results. @@ -403,7 +403,7 @@ public: void prepare(); /** Prepare for receiving next batch of scan results. */ - void reset(); + void prepareNextReceiveSet(); NdbReceiver& getReceiver() { return m_receiver; } @@ -738,27 +738,6 @@ NdbResultStream::prepare() m_resultSets[m_recv].m_buffer); } //NdbResultStream::prepare -void -NdbResultStream::reset() -{ - assert (isScanQuery()); - - m_iterState = Iter_notStarted; - m_currentRow = tupleNotFound; - m_resultSets[m_recv].prepareReceive(m_receiver); - - /** - * If this stream will get new rows in the next batch, then so will - * all of its descendants. - */ - for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations(); - childNo++) - { - NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo); - m_rootFrag.getResultStream(child).reset(); - } -} //NdbResultStream::reset - /** Locate, and return 'tupleNo', of first tuple with specified parentId. * parentId == tupleNotFound is use as a special value for iterating results * from the root operation in the order which they was inserted by @@ -851,7 +830,6 @@ NdbResultStream::firstResult() return tupleNotFound; } //NdbResultStream::firstResult() - Uint16 NdbResultStream::nextResult() { @@ -867,7 +845,6 @@ NdbResultStream::nextResult() return tupleNotFound; } //NdbResultStream::nextResult() - /** * Callback when a TRANSID_AI signal (receive row) is processed. */ @@ -891,6 +868,32 @@ NdbResultStream::execTRANSID_AI(const Ui } // NdbResultStream::execTRANSID_AI() /** + * Make preparation for another batch of results to be received. + * This NdbResultStream, and all its sibling will receive a batch + * of results from the datanodes. + */ +void +NdbResultStream::prepareNextReceiveSet() +{ + assert (isScanQuery()); + + m_iterState = Iter_notStarted; + m_currentRow = tupleNotFound; + m_resultSets[m_recv].prepareReceive(m_receiver); + + /** + * If this stream will get new rows in the next batch, then so will + * all of its descendants. + */ + for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations(); + childNo++) + { + NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo); + m_rootFrag.getResultStream(child).prepareNextReceiveSet(); + } +} //NdbResultStream::prepareNextReceiveSet + +/** * Make preparations for another batch of result to be read: * - Fill in parent/child result correlations in m_tupleSet[] * - ... or reset m_tupleSet[] if we reuse the previous. @@ -911,7 +914,8 @@ NdbResultStream::prepareResultSet(Uint32 /** * Prepare NdbResultSet for reading - either the next received - * from datanodes or reuse current. + * from datanodes or reuse the last as has been determined by + * ::prepareNextReceiveSet() */ if (m_tupleSet!=NULL) { @@ -1210,7 +1214,7 @@ bool NdbRootFragment::hasReceivedMore() return (m_availResultSets > 0); } -void NdbRootFragment::reset() +void NdbRootFragment::prepareNextReceiveSet() { assert(m_fragNo!=voidFragNo); assert(m_outstandingResults == 0); @@ -1218,13 +1222,14 @@ void NdbRootFragment::reset() for (unsigned opNo=0; opNogetNoOfOperations(); opNo++) { - if (!m_resultStreams[opNo].isSubScanComplete(m_remainingScans)) + NdbResultStream& resultStream = getResultStream(opNo); + if (!resultStream.isSubScanComplete(m_remainingScans)) { /** - * Reset m_resultStreams[] and all its descendants, since all these + * Reset resultStream and all its descendants, since all these * streams will get a new set of rows in the next batch. */ - m_resultStreams[opNo].reset(); + resultStream.prepareNextReceiveSet(); } } m_confReceived = false; @@ -3194,7 +3199,7 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm NdbRootFragment* rootFrag = rootFrags[i]; assert(rootFrag->isFragBatchComplete()); assert(!rootFrag->finalBatchReceived()); - rootFrag->reset(); + rootFrag->prepareNextReceiveSet(); } Ndb& ndb = *getNdbTransaction().getNdb(); @@ -3383,22 +3388,27 @@ int NdbQueryImpl::isPrunable(bool& pruna NdbQueryImpl::OrderedFragSet::OrderedFragSet(): m_capacity(0), m_activeFragCount(0), - m_emptiedFragCount(0), + m_fetchMoreFragCount(0), m_finalFragCount(0), m_ordering(NdbQueryOptions::ScanOrdering_void), m_keyRecord(NULL), m_resultRecord(NULL), m_activeFrags(NULL), - m_emptiedFrags(NULL) + m_fetchMoreFrags(NULL) { } NdbQueryImpl::OrderedFragSet::~OrderedFragSet() { m_activeFrags = NULL; - m_emptiedFrags= NULL; + m_fetchMoreFrags = NULL; } +void NdbQueryImpl::OrderedFragSet::clear() +{ + m_activeFragCount = 0; + m_fetchMoreFragCount = 0; +} void NdbQueryImpl::OrderedFragSet::prepare(NdbBulkAllocator& allocator, @@ -3419,9 +3429,9 @@ NdbQueryImpl::OrderedFragSet::prepare(Nd reinterpret_cast(allocator.allocObjMem(capacity)); bzero(m_activeFrags, capacity * sizeof(NdbRootFragment*)); - m_emptiedFrags = + m_fetchMoreFrags = reinterpret_cast(allocator.allocObjMem(capacity)); - bzero(m_emptiedFrags, capacity * sizeof(NdbRootFragment*)); + bzero(m_fetchMoreFrags, capacity * sizeof(NdbRootFragment*)); } m_ordering = ordering; m_keyRecord = keyRecord; @@ -3486,10 +3496,10 @@ NdbQueryImpl::OrderedFragSet::reorganize } else { - m_emptiedFrags[m_emptiedFragCount++] = frag; + m_fetchMoreFrags[m_fetchMoreFragCount++] = frag; } m_activeFragCount--; - assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount + assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount <= m_capacity); return; // Remaining m_activeFrags[] are sorted @@ -3539,7 +3549,7 @@ NdbQueryImpl::OrderedFragSet::reorganize } assert(verifySortOrder()); } - assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount + assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount <= m_capacity); } // OrderedFragSet::reorganize() @@ -3565,23 +3575,16 @@ NdbQueryImpl::OrderedFragSet::prepareMor } } // for all 'rootFrags[]' - assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount + assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount <= m_capacity); } // OrderedFragSet::prepareMoreResults() -void NdbQueryImpl::OrderedFragSet::clear() -{ - m_activeFragCount = 0; - m_emptiedFragCount = 0; - m_finalFragCount = 0; -} - Uint32 NdbQueryImpl::OrderedFragSet::getFetchMore(NdbRootFragment** &frags) { - const Uint32 cnt = m_emptiedFragCount; - frags = m_emptiedFrags; - m_emptiedFragCount = 0; + const int cnt = m_fetchMoreFragCount; + frags = m_fetchMoreFrags; + m_fetchMoreFragCount = 0; return cnt; } === modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-17 12:06:04 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-17 13:16:22 +0000 @@ -333,14 +333,16 @@ private: private: - /** Max no of fragments.*/ + /** No of fragments to read until '::finalBatchReceived()'.*/ int m_capacity; /** Number of fragments in 'm_activeFrags'.*/ int m_activeFragCount; - /** Number of fragments in 'm_emptiedFrags'. */ - int m_emptiedFragCount; - /** Number of fragments where the final batch has been received - * and consumed.*/ + /** Number of fragments in 'm_fetchMoreFrags'. */ + int m_fetchMoreFragCount; + /** + * Number of fragments where the final batch has been received + * and consumed. + */ int m_finalFragCount; /** Ordering of index scan result.*/ NdbQueryOptions::ScanOrdering m_ordering; @@ -348,15 +350,18 @@ private: const NdbRecord* m_keyRecord; /** Needed for comparing records when ordering results.*/ const NdbRecord* m_resultRecord; - /** Fragments where some tuples in the current batch has not yet been - * consumed.*/ + /** + * Fragments where some tuples in the current ResultSet has not + * yet been consumed. + */ NdbRootFragment** m_activeFrags; - /** Fragments where all tuples in the current batch have been consumed, - * but where there are more batches to fetch.*/ - NdbRootFragment** m_emptiedFrags; - // No copying. - OrderedFragSet(const OrderedFragSet&); - OrderedFragSet& operator=(const OrderedFragSet&); + /** + * Fragments from which we should request more ResultSets. + * Either due to the current batch has been consumed, or double buffering + * of result sets allows us to request another batch before the current + * has been consumed. + */ + NdbRootFragment** m_fetchMoreFrags; /** Add a complete fragment that has been received.*/ void add(NdbRootFragment& frag); @@ -369,6 +374,10 @@ private: /** For debugging purposes.*/ bool verifySortOrder() const; + + // No copying. + OrderedFragSet(const OrderedFragSet&); + OrderedFragSet& operator=(const OrderedFragSet&); }; // class OrderedFragSet /** The interface that is visible to the application developer.*/ No bundle (reason: useless for push emails).