From: Ole John Aske Date: August 17 2011 1:03pm Subject: bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:4464 to 4465) List-Archive: http://lists.mysql.com/commits/140677 Message-Id: <20110817130358.B45C3218@fimafeng09.norway.sun.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4465 Ole John Aske 2011-08-17 SPJ: Introduce the helper class NdbResultSet which manage the receive/read buffer for class NdbResultStream. The intention is to later change class NdbResultStream to have multiple NdbResultSets -> double buffered results. modified: storage/ndb/src/ndbapi/NdbQueryOperation.cpp 4464 Ole John Aske 2011-08-17 SPJ extensions:In preparation for implementing handling of double buffered result sets in class NdbReceiver, extend NdbReceiver with two new methods to resp. set buffer positions for receiving and reading result rows into/from the NdbReceiver. NOTE: Different receive and read position did already exists as part of the NdbReceiver - This patch only extend the interface to be able to set it from the outside. modified: storage/ndb/include/ndbapi/NdbReceiver.hpp storage/ndb/src/ndbapi/NdbQueryOperation.cpp storage/ndb/src/ndbapi/NdbReceiver.cpp === modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 12:36:56 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-17 13:03:23 +0000 @@ -325,6 +325,55 @@ private: int m_idMapNext; }; //NdbRootFragment +/** + * 'class NdbResultSet' is a helper for 'class NdbResultStream'. + * It manages the buffers which rows are received into and + * read from. + */ +class NdbResultSet +{ + friend class NdbResultStream; + +public: + explicit NdbResultSet(); + + void init(NdbQueryImpl& query, + Uint32 maxRows, Uint32 rowSize); + + void prepareReceive(NdbReceiver& receiver) + { + m_rowCount = 0; + receiver.prepareReceive(m_buffer); + } + + void prepareRead(NdbReceiver& receiver) + { + receiver.prepareRead(m_buffer,m_rowCount); + } + + Uint32 getRowCount() const + { return m_rowCount; } + + char* getRow(Uint32 tupleNo) const + { return (m_buffer + (tupleNo*m_rowSize)); } + +private: + /** No copying.*/ + NdbResultSet(const NdbResultSet&); + NdbResultSet& operator=(const NdbResultSet&); + + /** The buffers which we receive the results into */ + char* m_buffer; + + /** Used for checking if buffer overrun occurred. */ + Uint32* m_batchOverflowCheck; + + Uint32 m_rowSize; + + /** The current #rows in 'm_buffer'.*/ + Uint32 m_rowCount; + +}; // class NdbResultSet /** * This class manages the subset of result data for one operation that is @@ -362,11 +411,8 @@ public: const NdbReceiver& getReceiver() const { return m_receiver; } - Uint32 getRowCount() const - { return m_rowCount; } - - char* getRow(Uint32 tupleNo) const - { return (m_buffer + (tupleNo*m_rowSize)); } + const char* getCurrentRow() const + { return m_receiver.peek_row(); } /** * Process an incomming tuple for this stream. Extract parent and own tuple @@ -386,9 +432,9 @@ public: bool prepareResultSet(Uint32 remainingScans); /** - * Navigate within the current result batch to resp. first and next row. + * Navigate within the current ResultSet to resp. first and next row. * For non-parent operations in the pushed query, navigation is with respect - * to any preceding parents which results in this NdbResultStream depends on. + * to any preceding parents which results in this ResultSet depends on. * Returns either the tupleNo within TupleSet[] which we navigated to, or * tupleNotFound(). */ @@ -504,25 +550,22 @@ private: /** The receiver object that unpacks transid_AI messages.*/ NdbReceiver m_receiver; - /** The buffers which we receive the results into */ - char* m_buffer; - - /** Used for checking if buffer overrun occurred. */ - Uint32* m_batchOverflowCheck; - - Uint32 m_rowSize; - - /** The number of transid_AI messages received.*/ - Uint32 m_rowCount; + /** + * ResultSets are received into and read from this stream, + * intended to be extended into double buffered ResultSet later. + */ + NdbResultSet m_resultSets[1]; + Uint32 m_read; // We read from m_resultSets[m_read] + Uint32 m_recv; // We receive into m_resultSets[m_recv] /** This is the state of the iterator used by firstResult(), nextResult().*/ enum { /** The first row has not been fetched yet.*/ Iter_notStarted, - /** Is iterating the resultset, (implies 'm_currentRow!=tupleNotFound').*/ + /** Is iterating the ResultSet, (implies 'm_currentRow!=tupleNotFound').*/ Iter_started, - /** Last row for current batch has been returned.*/ + /** Last row for current ResultSet has been returned.*/ Iter_finished } m_iterState; @@ -600,6 +643,34 @@ void* NdbBulkAllocator::allocObjMem(Uint return m_nextObjNo > m_maxObjs ? NULL : result; } +/////////////////////////////////////////// +///////// NdbResultSet methods /////////// +/////////////////////////////////////////// +NdbResultSet::NdbResultSet() : + m_buffer(NULL), + m_batchOverflowCheck(NULL), + m_rowSize(0), + m_rowCount(0) +{} + +void +NdbResultSet::init(NdbQueryImpl& query, + Uint32 maxRows, + Uint32 rowSize) +{ + m_rowSize = rowSize; + { + const int bufferSize = rowSize * maxRows; + NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc(); + m_buffer = reinterpret_cast(bufferAlloc.allocObjMem(bufferSize)); + + // So that we can test for buffer overrun. + m_batchOverflowCheck = + reinterpret_cast(bufferAlloc.allocObjMem(sizeof(Uint32))); + *m_batchOverflowCheck = 0xacbd1234; + } +} + ////////////////////////////////////////////// ///////// NdbResultStream methods /////////// ////////////////////////////////////////////// @@ -621,10 +692,7 @@ NdbResultStream::NdbResultStream(NdbQuer | (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll ? Is_Inner_Join : 0))), m_receiver(operation.getQuery().getNdbTransaction().getNdb()), - m_buffer(NULL), - m_batchOverflowCheck(NULL), - m_rowSize(0), - m_rowCount(0), + m_resultSets(), m_read(0xffffffff), m_recv(0), m_iterState(Iter_notStarted), m_currentRow(tupleNotFound), m_maxRows(0), @@ -645,12 +713,8 @@ NdbResultStream::prepare() const Uint32 rowSize = m_operation.getRowSize(); NdbQueryImpl &query = m_operation.getQuery(); - m_rowSize = rowSize; - /* Parent / child correlation is only relevant for scan type queries - * Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups! - * Neither is these structures required for operations not having respective - * child or parent operations. + * Don't create a m_tupleSet with these correlation id's for lookups! */ if (isScanQuery()) { @@ -662,14 +726,7 @@ NdbResultStream::prepare() else m_maxRows = 1; - const int bufferSize = rowSize * m_maxRows; - NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc(); - m_buffer = reinterpret_cast(bufferAlloc.allocObjMem(bufferSize)); - - // So that we can test for buffer overrun. - m_batchOverflowCheck = - reinterpret_cast(bufferAlloc.allocObjMem(sizeof(Uint32))); - *m_batchOverflowCheck = 0xacbd1234; + m_resultSets[0].init(query, m_maxRows, rowSize); m_receiver.init(NdbReceiver::NDB_QUERY_OPERATION, false, &m_operation); m_receiver.do_setup_ndbrecord( @@ -678,7 +735,7 @@ NdbResultStream::prepare() 0 /*key_size*/, 0 /*read_range_no*/, rowSize, - m_buffer); + m_resultSets[m_recv].m_buffer); } //NdbResultStream::prepare void @@ -686,12 +743,10 @@ NdbResultStream::reset() { assert (isScanQuery()); - // Root scan-operation need a ScanTabConf to complete - m_rowCount = 0; m_iterState = Iter_notStarted; m_currentRow = tupleNotFound; + m_resultSets[m_recv].prepareReceive(m_receiver); - m_receiver.prepareSend(); /** * If this stream will get new rows in the next batch, then so will * all of its descendants. @@ -717,11 +772,11 @@ NdbResultStream::findTupleWithParentId(U { assert ((parentId==tupleNotFound) == (m_parent==NULL)); - if (likely(m_rowCount>0)) + if (likely(m_resultSets[m_read].m_rowCount>0)) { if (m_tupleSet==NULL) { - assert (m_rowCount <= 1); + assert (m_resultSets[m_read].m_rowCount <= 1); return 0; } @@ -788,7 +843,7 @@ NdbResultStream::firstResult() if ((m_currentRow=findTupleWithParentId(parentId)) != tupleNotFound) { m_iterState = Iter_started; - m_receiver.setCurrentRow(m_buffer, m_currentRow); + m_receiver.setCurrentRow(m_resultSets[m_read].m_buffer, m_currentRow); return m_currentRow; } @@ -805,7 +860,7 @@ NdbResultStream::nextResult() (m_currentRow=findNextTuple(m_currentRow)) != tupleNotFound) { m_iterState = Iter_started; - m_receiver.setCurrentRow(m_buffer, m_currentRow); + m_receiver.setCurrentRow(m_resultSets[m_read].m_buffer, m_currentRow); return m_currentRow; } m_iterState = Iter_finished; @@ -821,7 +876,7 @@ NdbResultStream::execTRANSID_AI(const Ui TupleCorrelation correlation) { m_receiver.execTRANSID_AI(ptr, len); - m_rowCount++; + m_resultSets[m_recv].m_rowCount++; if (isScanQuery()) { @@ -848,11 +903,14 @@ NdbResultStream::prepareResultSet(Uint32 bool isComplete = isSubScanComplete(remainingScans); //Childs with more rows assert(isComplete || isScanResult()); //Lookups always 'complete' - // Set correct #rows received in the NdbReceiver. - getReceiver().m_result_rows = getRowCount(); + m_read = m_recv; + NdbResultSet& readResult = m_resultSets[m_read]; + + // Set correct buffer and #rows received by this ResultSet. + readResult.prepareRead(m_receiver); /** - * Prepare NdbResultStream for reading - either the next received + * Prepare NdbResultSet for reading - either the next received * from datanodes or reuse current. */ if (m_tupleSet!=NULL) @@ -865,7 +923,7 @@ NdbResultStream::prepareResultSet(Uint32 else { // Makes all rows in 'TupleSet' available (clear 'm_skip' flag) - for (Uint32 tupleNo=0; tupleNo