From: Ole John Aske Date: August 17 2011 12:06pm Subject: bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:4461 to 4462) List-Archive: http://lists.mysql.com/commits/140672 Message-Id: <20110817120643.35897218@fimafeng09.norway.sun.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4462 Ole John Aske 2011-08-17 Refactoring as part of preparing SPJ results to be double (multi...?) buffered. 'class SharedFragStack', which was intended as a container for fragments which have received a complete batch of rows, is removed. It is replaced with a set of (mutex protected) methods in 'class NdbRootFragment' and OrderedFragSet::prepareMoreResults(). The rationale for this refactoring is that the SharedFragStack container wasn't a particularly good fit to represent the state when *multiple* batches of (double buffered) result sets may be available. modified: storage/ndb/src/ndbapi/NdbQueryOperation.cpp storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 4461 Jonas Oreland 2011-08-17 [merge] ndb - merge wl4124-new2 removed: mysql-test/suite/ndb/r/ndb_statistics.result mysql-test/suite/ndb/t/ndb_statistics.test added: mysql-test/suite/ndb/r/ndb_index_stat.result mysql-test/suite/ndb/r/ndb_statistics0.result mysql-test/suite/ndb/r/ndb_statistics1.result mysql-test/suite/ndb/t/ndb_index_stat.test mysql-test/suite/ndb/t/ndb_index_stat_enable.inc mysql-test/suite/ndb/t/ndb_statistics.inc mysql-test/suite/ndb/t/ndb_statistics0.test mysql-test/suite/ndb/t/ndb_statistics1.test modified: mysql-test/suite/ndb/r/ndb_restore_misc.result mysql-test/suite/ndb/t/ndb_restore_misc.test sql/ha_ndb_index_stat.cc sql/ha_ndb_index_stat.h sql/ha_ndbcluster.cc sql/ha_ndbcluster.h storage/ndb/include/ndb_constants.h storage/ndb/include/ndbapi/NdbIndexStat.hpp storage/ndb/include/util/NdbPack.hpp storage/ndb/src/common/util/NdbPack.cpp storage/ndb/src/ndbapi/NdbIndexStat.cpp storage/ndb/src/ndbapi/NdbIndexStatImpl.cpp 
storage/ndb/src/ndbapi/NdbIndexStatImpl.hpp storage/ndb/test/ndbapi/testIndexStat.cpp storage/ndb/tools/ndb_index_stat.cpp === modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 08:10:27 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 12:06:04 +0000 @@ -181,6 +181,8 @@ public: */ void init(NdbQueryImpl& query, Uint32 fragNo); + static void clear(NdbRootFragment* frags, Uint32 noOfFrags); + Uint32 getFragNo() const { return m_fragNo; } @@ -192,7 +194,11 @@ public: /** * Prepare for reading another batch of results. */ - void prepareResultSet(); + void grabNextResultSet(); // Need mutex lock + + bool hasReceivedMore() const; // Need mutex lock + + void setReceivedMore(); // Need mutex lock void incrOutstandingResults(Int32 delta) { @@ -273,12 +279,19 @@ private: NdbResultStream* m_resultStreams; /** - * The number of outstanding TCKEYREF or TRANSID_AI - * messages for the fragment. This includes both messages related to the + * Number of available prefetched ResultSets which are completely + * received. Will be made available for reading by calling + * ::grabNextResultSet() + */ + Uint32 m_availResultSets; // Need mutex + + /** + * The number of outstanding TCKEYREF or TRANSID_AI messages to receive + * for the fragment. This includes both messages related to the * root operation and any descendant operation that was instantiated as * a consequence of tuples found by the root operation. - * This number may temporarily be negative if e.g. TRANSID_AI arrives before - * SCAN_TABCONF. + * This number may temporarily be negative if e.g. TRANSID_AI arrives + * before SCAN_TABCONF. */ Int32 m_outstandingResults; @@ -287,7 +300,8 @@ private: * operation accesses (i.e. one for a lookup, all for a table scan). 
* * Each element is true iff a SCAN_TABCONF (for that fragment) or - * TCKEYCONF message has been received */ + * TCKEYCONF message has been received + */ bool m_confReceived; /** @@ -1034,6 +1048,7 @@ NdbRootFragment::NdbRootFragment(): m_query(NULL), m_fragNo(voidFragNo), m_resultStreams(NULL), + m_availResultSets(0), m_outstandingResults(0), m_confReceived(false), m_remainingScans(0), @@ -1094,6 +1109,45 @@ NdbRootFragment::getResultStream(Uint32 return m_resultStreams[operationNo]; } +/** + * Throw any pending ResultSets from specified rootFrags[] + */ +//static +void NdbRootFragment::clear(NdbRootFragment* rootFrags, Uint32 noOfFrags) +{ + if (rootFrags != NULL) + { + for (Uint32 fragNo = 0; fragNo < noOfFrags; fragNo++) + { + rootFrags[fragNo].m_availResultSets = 0; + } + } +} + +/** + * Signal that another complete ResultSet is available for + * this NdbRootFragment. + * Need mutex lock as 'm_availResultSets' is accesed both from + * receiver and application thread. + */ +void NdbRootFragment::setReceivedMore() +{ + assert(m_availResultSets==0); + m_availResultSets++; +} + +/** + * Check if another ResultSets has been received and is available + * for reading. It will be given to the application thread when it + * call ::grabNextResultSet(). + * Need mutex lock as 'm_availResultSets' is accesed both from + * receiver and application thread. + */ +bool NdbRootFragment::hasReceivedMore() const +{ + return (m_availResultSets > 0); +} + void NdbRootFragment::reset() { assert(m_fragNo!=voidFragNo); @@ -1114,8 +1168,17 @@ void NdbRootFragment::reset() m_confReceived = false; } -void NdbRootFragment::prepareResultSet() +/** + * Let the application thread takes ownership of an available + * ResultSet, prepare it for reading first row. + * Need mutex lock as 'm_availResultSets' is accesed both from + * receiver and application thread. 
+ */ +void NdbRootFragment::grabNextResultSet() { + assert(m_availResultSets>0); + m_availResultSets--; + NdbResultStream& rootStream = getResultStream(0); rootStream.prepareResultSet(m_remainingScans); @@ -1137,7 +1200,7 @@ void NdbRootFragment::setConfReceived(Ui bool NdbRootFragment::finalBatchReceived() const { - return getReceiverTcPtrI()==RNIL; + return m_confReceived && getReceiverTcPtrI()==RNIL; } bool NdbRootFragment::isEmpty() const @@ -1587,6 +1650,7 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio m_next(NULL), m_queryDef(&queryDef), m_error(), + m_errorReceived(0), m_transaction(trans), m_scanTransaction(NULL), m_operations(0), @@ -1596,7 +1660,6 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio m_rootFragCount(0), m_rootFrags(NULL), m_applFrags(), - m_fullFrags(), m_finalBatchFrags(0), m_num_bounds(0), m_shortestBound(0xffffffff), @@ -2060,10 +2123,9 @@ NdbQueryImpl::nextResult(bool fetchAllow NdbQuery::NextResultOutcome NdbQueryImpl::nextRootResult(bool fetchAllowed, bool forceSend) { - /* To minimize lock contention, each query has two separate root fragment - * conatiners (m_fullFrags and m_applFrags). m_applFrags is only - * accessed by the application thread, so it is safe to use it without - * locks. + /* To minimize lock contention, each query has the separate root fragment + * conatiner 'm_applFrags'. m_applFrags is only accessed by the application + * thread, so it is safe to use it without locks. */ while (m_state != EndOfData) // Or likely: return when 'gotRow' or error { @@ -2179,23 +2241,17 @@ NdbQueryImpl::awaitMoreResults(bool forc */ while (likely(!hasReceivedError())) { - /* m_fullFrags contains any fragments that are complete (for this batch) - * but have not yet been moved (under mutex protection) to - * m_applFrags. + /* Scan m_rootFrags (under mutex protection) for fragments + * which has received a complete batch. Add these to m_applFrags. 
*/ - NdbRootFragment* frag; - while ((frag=m_fullFrags.pop()) != NULL) - { - frag->prepareResultSet(); - m_applFrags.add(*frag); - } + m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount); if (m_applFrags.getCurrent() != NULL) { return FetchResult_ok; } - /* There are noe more available frament results available without - * first waiting for more to be received from datanodes + /* There are no more available fragment results available without + * first waiting for more to be received from the datanodes */ if (m_pendingFrags == 0) { @@ -2236,16 +2292,9 @@ NdbQueryImpl::awaitMoreResults(bool forc /* The root operation is a lookup. Lookups are guaranteed to be complete * before NdbTransaction::execute() returns. Therefore we do not set * the lock, because we know that the signal receiver thread will not - * be accessing m_fullFrags at this time. + * be accessing m_rootFrags at this time. */ - NdbRootFragment* frag; - if ((frag=m_fullFrags.pop()) != NULL) - { - frag->prepareResultSet(); - m_applFrags.add(*frag); - } - assert(m_fullFrags.pop()==NULL); // Only one stream for lookups. - + m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount); if (m_applFrags.getCurrent() != NULL) { return FetchResult_ok; @@ -2266,8 +2315,8 @@ NdbQueryImpl::awaitMoreResults(bool forc /* ::handleBatchComplete() is intended to be called when receiving signals only. - The PollGuard mutex is then set and the shared 'm_pendingFrags', - 'm_finalBatchFrags' and 'm_fullFrags' can safely be updated. + The PollGuard mutex is then set and the shared 'm_pendingFrags' and + 'm_finalBatchFrags' can safely be updated and ::setReceivedMore() signaled. returns: 'true' when application thread should be resumed. */ @@ -2286,7 +2335,7 @@ NdbQueryImpl::handleBatchComplete(NdbRoo * terminated the scan. We are about to close this query, * and didn't expect any more data - ignore it! 
*/ - if (likely(m_fullFrags.m_errorCode == 0)) + if (likely(m_errorReceived == 0)) { assert(rootFrag.isFragBatchComplete()); @@ -2300,10 +2349,10 @@ NdbQueryImpl::handleBatchComplete(NdbRoo assert(m_finalBatchFrags <= m_rootFragCount); } - /* When application thread ::awaitMoreResults() it will later be moved - * from m_fullFrags to m_applFrags under mutex protection. + /* When application thread ::awaitMoreResults() it will later be + * added to m_applFrags under mutex protection. */ - m_fullFrags.push(rootFrag); + rootFrag.setReceivedMore(); return true; } @@ -2327,7 +2376,7 @@ NdbQueryImpl::close(bool forceSend) } // Throw any pending results - m_fullFrags.clear(); + NdbRootFragment::clear(m_rootFrags,m_rootFragCount); m_applFrags.clear(); Ndb* const ndb = m_transaction.getNdb(); @@ -2423,7 +2472,7 @@ NdbQueryImpl::setFetchTerminated(int err } if (errorCode!=0) { - m_fullFrags.m_errorCode = errorCode; + m_errorReceived = errorCode; } m_pendingFrags = 0; } // NdbQueryImpl::setFetchTerminated() @@ -2436,9 +2485,9 @@ NdbQueryImpl::setFetchTerminated(int err bool NdbQueryImpl::hasReceivedError() { - if (unlikely(m_fullFrags.m_errorCode)) + if (unlikely(m_errorReceived)) { - setErrorCode(m_fullFrags.m_errorCode); + setErrorCode(m_errorReceived); return true; } return false; @@ -2543,8 +2592,7 @@ NdbQueryImpl::prepareSend() } // Allocate space for ptrs to NdbResultStream and NdbRootFragment objects. 
error = m_pointerAlloc.init(m_rootFragCount * - (SharedFragStack::pointersPerFragment + - OrderedFragSet::pointersPerFragment)); + (OrderedFragSet::pointersPerFragment)); if (error != 0) { setErrorCode(error); @@ -2631,21 +2679,17 @@ NdbQueryImpl::prepareSend() keyRec = getRoot().getQueryOperationDef().getIndex()->getDefaultRecord(); assert(keyRec!=NULL); } - if (unlikely((error = m_applFrags.prepare(m_pointerAlloc, - getRoot().getOrdering(), - m_rootFragCount, - keyRec, - getRoot().m_ndbRecord)) != 0) - || (error = m_fullFrags.prepare(m_pointerAlloc, - m_rootFragCount)) != 0) { - setErrorCode(error); - return -1; - } + m_applFrags.prepare(m_pointerAlloc, + getRoot().getOrdering(), + m_rootFragCount, + keyRec, + getRoot().m_ndbRecord); if (getQueryDef().isScanQuery()) { NdbRootFragment::buildReciverIdMap(m_rootFrags, m_rootFragCount); } + #ifdef TRACE_SERIALIZATION ndbout << "Serialized ATTRINFO : "; for(Uint32 i = 0; i < m_attrInfo.getSize(); i++){ @@ -3183,8 +3227,8 @@ NdbQueryImpl::closeTcCursor(bool forceSe } // while assert(m_pendingFrags==0); - m_fullFrags.clear(); // Throw any unhandled results - m_fullFrags.m_errorCode = 0; // Clear errors caused by previous fetching + NdbRootFragment::clear(m_rootFrags,m_rootFragCount); + m_errorReceived = 0; // Clear errors caused by previous fetching m_error.code = 0; if (m_finalBatchFrags < getRootFragCount()) // TC has an open scan cursor. @@ -3271,39 +3315,6 @@ int NdbQueryImpl::isPrunable(bool& pruna /**************** - * NdbQueryImpl::SharedFragStack methods. 
- ***************/ - -NdbQueryImpl::SharedFragStack::SharedFragStack(): - m_errorCode(0), - m_capacity(0), - m_current(-1), - m_array(NULL) -{} - -int -NdbQueryImpl::SharedFragStack::prepare(NdbBulkAllocator& allocator, - int capacity) -{ - assert(m_array==NULL); - assert(m_capacity==0); - if (capacity > 0) - { m_capacity = capacity; - m_array = - reinterpret_cast(allocator.allocObjMem(capacity)); - } - return 0; -} - -void -NdbQueryImpl::SharedFragStack::push(NdbRootFragment& frag) -{ - m_current++; - assert(m_current(allocator.allocObjMem(capacity)); bzero(m_activeFrags, capacity * sizeof(NdbRootFragment*)); + m_emptiedFrags = reinterpret_cast(allocator.allocObjMem(capacity)); bzero(m_emptiedFrags, capacity * sizeof(NdbRootFragment*)); @@ -3352,7 +3364,6 @@ NdbQueryImpl::OrderedFragSet::prepare(Nd m_ordering = ordering; m_keyRecord = keyRecord; m_resultRecord = resultRecord; - return 0; } // OrderedFragSet::prepare() @@ -3479,6 +3490,23 @@ NdbQueryImpl::OrderedFragSet::add(NdbRoo reorganize(); // Move into position } // OrderedFragSet::add() +void +NdbQueryImpl::OrderedFragSet::prepareMoreResults(NdbRootFragment rootFrags[], Uint32 cnt) +{ + for (Uint32 fragNo = 0; fragNo < cnt; fragNo++) + { + NdbRootFragment& rootFrag = rootFrags[fragNo]; + if (rootFrag.hasReceivedMore()) // Another ResultSet is available + { + rootFrag.grabNextResultSet(); // Get new ResultSet. + add(rootFrag); // Make avail. to appl. 
thread + } + } // for all 'rootFrags[]' + + assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount + <= m_capacity); +} // OrderedFragSet::prepareMoreResults() + void NdbQueryImpl::OrderedFragSet::clear() { m_activeFragCount = 0; === modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-17 08:10:27 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 2011-08-17 12:06:04 +0000 @@ -271,58 +271,13 @@ private: FetchResult_noMoreCache = 2 }; - /** A stack of NdbRootFragment pointers. - * NdbRootFragments which are 'BatchComplete' are pushed on this stack by - * the receiver thread, and later pop'ed into the application thread when - * it need more results to process. - * Due to this shared usage, the PollGuard mutex must be set before - * accessing SharedFragStack. + /** + * Container of root fragments that the application is currently + * iterating over. 'Owned' by application thread and can be accesed + * without requiring a mutex lock. + * RootFragments are appended to a OrderedFragSet by ::prepareMoreResults() + * */ - class SharedFragStack{ - public: - // For calculating need for dynamically allocated memory. - static const Uint32 pointersPerFragment = 1; - - explicit SharedFragStack(); - - /** - * Prepare internal datastructures. - * param[in] allocator For allocating arrays of pointers. - * param[in] capacity Max no of root fragments. - * @return 0 if ok, else errorcode - */ - int prepare(NdbBulkAllocator& allocator, int capacity); - - NdbRootFragment* pop() { - return m_current>=0 ? m_array[m_current--] : NULL; - } - - void push(NdbRootFragment& frag); - - int size() const { - return (m_current+1); - } - - void clear() { - m_current = -1; - } - - /** Possible error received from TC / datanodes. 
*/ - int m_errorCode; - - private: - /** Capacity of stack.*/ - int m_capacity; - - /** Index of current top of stack.*/ - int m_current; - NdbRootFragment** m_array; - - // No copying. - SharedFragStack(const SharedFragStack&); - SharedFragStack& operator=(const SharedFragStack&); - }; // class SharedFragStack - class OrderedFragSet{ public: // For calculating need for dynamically allocated memory. @@ -339,11 +294,20 @@ private: * param[in] capacity Max no of root fragments. * @return 0 if ok, else errorcode */ - int prepare(NdbBulkAllocator& allocator, - NdbQueryOptions::ScanOrdering ordering, - int capacity, - const NdbRecord* keyRecord, - const NdbRecord* resultRecord); + void prepare(NdbBulkAllocator& allocator, + NdbQueryOptions::ScanOrdering ordering, + int capacity, + const NdbRecord* keyRecord, + const NdbRecord* resultRecord); + + /** + * Add root fragments with completed ResultSets to this OrderedFragSet. + * The PollGuard mutex must locked, and under its protection + * completed root fragments are 'consumed' from rootFrags[] and + * added to OrderedFragSet where it become available for the + * application thread. + */ + void prepareMoreResults(NdbRootFragment rootFrags[], Uint32 cnt); // Need mutex lock /** Get the root fragment from which to read the next row.*/ NdbRootFragment* getCurrent() const; @@ -355,9 +319,6 @@ private: */ void reorganize(); - /** Add a complete fragment that has been received.*/ - void add(NdbRootFragment& frag); - /** Reset object to an empty state.*/ void clear(); @@ -397,6 +358,9 @@ private: OrderedFragSet(const OrderedFragSet&); OrderedFragSet& operator=(const OrderedFragSet&); + /** Add a complete fragment that has been received.*/ + void add(NdbRootFragment& frag); + /** For sorting fragment reads according to index value of first record. * Also f1f2, 0 if f1==f2, -1 if f1