From: Ole John Aske Date: August 16 2011 1:20pm Subject: bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:4455 to 4456) List-Archive: http://lists.mysql.com/commits/140652 Message-Id: <20110816132045.2DAD6218@fimafeng09.norway.sun.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4456 Ole John Aske 2011-08-16 SPJ: Refactoring of how & when the parent/child row correlation is handled: Previously the correlation id's was decoded, and the parent/child hashMaps built as part of execTRANSID_AI(). This work is now deferred until a complete batch of result rows has been received - and at this point it is handled by the thread which was waiting for the result batch instead of the receiver thread. Note: This patch is also a step in the direction of moving building of the correlation hashmap entirely outside transporter mutex protected code. modified: storage/ndb/src/ndbapi/NdbQueryOperation.cpp 4455 Ole John Aske 2011-08-16 Cherry picked fix for Bug#12798270 into 'mysql-5.1-telco-7.0' as this branch also contained the (cherry picked) fix for Bug 11764737 which made this problem to surface. modified: mysql-test/r/group_by.result mysql-test/t/group_by.test sql/sql_select.cc === modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-16 08:04:43 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-16 13:20:11 +0000 @@ -87,6 +87,32 @@ const bool traceSignals = false; * children. That way, results for child operations can be updated correctly * when the application iterates over the results of the root scan operation. */ +class TupleCorrelation +{ +public: + static const Uint32 wordCount = 1; + + explicit TupleCorrelation() + : m_correlation((tupleNotFound<<16) | tupleNotFound) + {} + + /** Conversion to/from Uint32 to store/fetch from buffers */ + explicit TupleCorrelation(Uint32 val) + : m_correlation(val) + {} + Uint32 toUint32() const + { return m_correlation; } + + Uint16 getTupleId() const + { return m_correlation & 0xffff;} + + Uint16 getParentTupleId() const + { return m_correlation >> 16;} + +private: + Uint32 m_correlation; +}; // class TupleCorrelation + class CorrelationData { public: @@ -99,18 +125,15 @@ public: assert(AttributeHeader(m_corrPart[0]).getAttributeId() == AttributeHeader::CORR_FACTOR64); assert(AttributeHeader(m_corrPart[0]).getByteSize() == 2*sizeof(Uint32)); - assert(getTupleId()> 16;} + const TupleCorrelation getTupleCorrelation() const + { return TupleCorrelation(m_corrPart[1]); } private: const Uint32* const m_corrPart; @@ -166,6 +189,11 @@ public: */ void reset(); + /** + * Prepare for reading another batch of results. + */ + void prepareResultSet(); + void incrOutstandingResults(Int32 delta) { m_outstandingResults += delta; @@ -307,6 +335,9 @@ public: Uint32 getRowCount() const { return m_rowCount; } + char* getRow(Uint32 tupleNo) const + { return (m_buffer + (tupleNo*m_rowSize)); } + /** * Process an incomming tuple for this stream. Extract parent and own tuple * ids and pass it on to m_receiver. @@ -314,12 +345,15 @@ public: * @param ptr buffer holding tuple. * @param len buffer length. */ - void execTRANSID_AI(const Uint32 *ptr, Uint32 len); + void execTRANSID_AI(const Uint32 *ptr, Uint32 len, + TupleCorrelation correlation); - /** A complete batch has been received for a fragment on this NdbResultStream, - * Update whatever required before the appl. are allowed to navigate the result. + /** + * A complete batch has been received for a fragment on this NdbResultStream, + * Update whatever required before the appl. are allowed to navigate the result. + * @return true if node and all its siblings have returned all rows. */ - void handleBatchComplete(); + bool prepareResultSet(); /** * Navigate within the current result batch to resp. first and next row. @@ -364,12 +398,6 @@ public: return m_subScanComplete; } - /** Variant of isSubScanComplete() above which checks that this resultstream - * and all its descendants have consumed all batches of rows instantiated - * from their parent operation(s). - */ - bool isAllSubScansComplete() const; - bool isScanQuery() const { return (m_properties & Is_Scan_Query); } @@ -420,7 +448,7 @@ public: * that had no matching children.*/ Bitmask<(NDB_SPJ_MAX_TREE_NODES+31)/32> m_hasMatchingChild; - explicit TupleSet() + explicit TupleSet() : m_hash_head(tupleNotFound) {} private: @@ -452,9 +480,14 @@ private: /** The receiver object that unpacks transid_AI messages.*/ NdbReceiver m_receiver; + /** The buffers which we receive the results into */ + char* m_buffer; + /** Used for checking if buffer overrun occurred. */ Uint32* m_batchOverflowCheck; + Uint32 m_rowSize; + /** The number of transid_AI messages received.*/ Uint32 m_rowCount; @@ -489,11 +522,7 @@ private: /** TupleSet contains the correlation between parent/childs */ TupleSet* m_tupleSet; - void clearTupleSet(); - - void setParentChildMap(Uint16 parentId, - Uint16 tupleId, - Uint16 tupleNo); + void buildResultCorrelations(); Uint16 getTupleId(Uint16 tupleNo) const { return (m_tupleSet) ? m_tupleSet[tupleNo].m_tupleId : 0; } @@ -576,7 +605,9 @@ NdbResultStream::NdbResultStream(NdbQuer | (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll ? Is_Inner_Join : 0))), m_receiver(operation.getQuery().getNdbTransaction().getNdb()), + m_buffer(NULL), m_batchOverflowCheck(NULL), + m_rowSize(0), m_rowCount(0), m_iterState(Iter_notStarted), m_currentRow(tupleNotFound), @@ -599,6 +630,8 @@ NdbResultStream::prepare() const Uint32 rowSize = m_operation.getRowSize(); NdbQueryImpl &query = m_operation.getQuery(); + m_rowSize = rowSize; + /* Parent / child correlation is only relevant for scan type queries * Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups! * Neither is these structures required for operations not having respective @@ -610,16 +643,13 @@ NdbResultStream::prepare() m_tupleSet = new (query.getTupleSetAlloc().allocObjMem(m_maxRows)) TupleSet[m_maxRows]; - - clearTupleSet(); } else m_maxRows = 1; const int bufferSize = rowSize * m_maxRows; NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc(); - char* const rowBuf = reinterpret_cast(bufferAlloc - .allocObjMem(bufferSize)); + m_buffer = reinterpret_cast(bufferAlloc.allocObjMem(bufferSize)); // So that we can test for buffer overrun. m_batchOverflowCheck = @@ -633,7 +663,7 @@ NdbResultStream::prepare() 0 /*key_size*/, 0 /*read_range_no*/, rowSize, - rowBuf); + m_buffer); } //NdbResultStream::prepare void @@ -646,7 +676,6 @@ NdbResultStream::reset() m_iterState = Iter_notStarted; m_currentRow = tupleNotFound; - clearTupleSet(); m_receiver.prepareSend(); /** * If this stream will get new rows in the next batch, then so will @@ -660,84 +689,10 @@ NdbResultStream::reset() } } //NdbResultStream::reset -void -NdbResultStream::clearTupleSet() -{ - assert (isScanQuery()); - for (Uint32 i=0; i