From: Ole John Aske Date: August 16 2011 7:57am Subject: bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:4450 to 4451) List-Archive: http://lists.mysql.com/commits/140640 Message-Id: <20110816075734.58F00218@fimafeng09.norway.sun.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4451 Ole John Aske 2011-08-16 SPJ refactoring: Changed ownership of m_resultStreams[] from class NdbQueryOperationImpl to class NdbRootFragment. modified: storage/ndb/src/ndbapi/NdbQueryOperation.cpp storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 4450 Ole John Aske 2011-08-15 SPJ: Refactoring of 'class InitialReceiverIdIterator': - Make it less dependent of 'class NdbQueryImpl', instead construct it from the NdbRootFragment[] which should receive the result. - Let it use the recently introduced NdbRootFragment::getReceiverId() - Change ::getNextWords() such that it doesn't update argument 'Uint32& sz' for each iteration - Instead set it immediately before return. modified: storage/ndb/src/ndbapi/NdbQueryOperation.cpp === modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-15 12:36:29 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-16 07:56:53 +0000 @@ -148,6 +148,8 @@ public: explicit NdbRootFragment(); + ~NdbRootFragment(); + /** * Initialize object. * @param query Enclosing query. @@ -198,9 +200,11 @@ public: * @param operationNo The id of the operation. * @return The result stream for this root fragment. */ - NdbResultStream& getResultStream(Uint32 operationNo) const - { return m_query->getQueryOperation(operationNo).getResultStream(m_fragNo); } - + NdbResultStream& getResultStream(Uint32 operationNo) const; + + NdbResultStream& getResultStream(const NdbQueryOperationImpl& op) const + { return getResultStream(op.getQueryOperationDef().getQueryOperationIx()); } + Uint32 getReceiverId() const; Uint32 getReceiverTcPtrI() const; @@ -215,6 +219,9 @@ public: */ bool isEmpty() const; + /** Release resources after last row has been returned */ + void postFetchRelease(); + private: STATIC_CONST( voidFragNo = 0xffffffff); @@ -224,6 +231,9 @@ private: /** Number of the root operation fragment.*/ Uint32 m_fragNo; + /** For processing results originating from this root fragment (Array of).*/ + NdbResultStream* m_resultStreams; + /** * The number of outstanding TCKEYREF or TRANSID_AI * messages for the fragment. This includes both messages related to the @@ -275,26 +285,19 @@ public: * @param operation The operation for which we will receive results. * @param rootFragNo 0..n-1 when the root operation reads from n fragments. */ - explicit NdbResultStream(NdbQueryOperationImpl& operation, Uint32 rootFragNo); + explicit NdbResultStream(NdbQueryOperationImpl& operation, + NdbRootFragment& rootFrag); ~NdbResultStream(); /** * Prepare for receiving first results. - * @return possible error code. */ - int prepare(); + void prepare(); /** Prepare for receiving next batch of scan results. */ void reset(); - /** - * 0..n-1 if the root operation reads from n fragments. This stream holds data - * derived from one of those fragments. - */ - Uint32 getRootFragNo() const - { return m_rootFragNo; } - NdbReceiver& getReceiver() { return m_receiver; } @@ -418,22 +421,27 @@ public: }; private: - /** This stream handles results derived from the m_rootFragNo'th - * fragment of the root operation.*/ - const Uint32 m_rootFragNo; + /** + * This stream handles results derived from specified + * m_rootFrag of the root operation. + */ + const NdbRootFragment& m_rootFrag; + + /** Operation to which this resultStream belong.*/ + NdbQueryOperationImpl& m_operation; /** The receiver object that unpacks transid_AI messages.*/ NdbReceiver m_receiver; + /** Used for checking if buffer overrun occurred. */ + Uint32* m_batchOverflowCheck; + /** Max #rows which this stream may recieve in its buffer structures */ Uint32 m_maxRows; /** The number of transid_AI messages received.*/ Uint32 m_rowCount; - /** Operation to which this resultStream belong.*/ - NdbQueryOperationImpl& m_operation; - /** This is the state of the iterator used by firstResult(), nextResult().*/ enum { @@ -528,12 +536,15 @@ void* NdbBulkAllocator::allocObjMem(Uint ///////// NdbResultStream methods /////////// ////////////////////////////////////////////// -NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation, Uint32 rootFragNo): - m_rootFragNo(rootFragNo), +NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation, + NdbRootFragment& rootFrag) +: + m_rootFrag(rootFrag), + m_operation(operation), m_receiver(operation.getQuery().getNdbTransaction().getNdb()), + m_batchOverflowCheck(NULL), m_maxRows(0), m_rowCount(0), - m_operation(operation), m_iterState(Iter_notStarted), m_currentRow(tupleNotFound), m_subScanComplete(true), @@ -548,9 +559,12 @@ NdbResultStream::~NdbResultStream() } } -int // Return 0 if ok, else errorcode +void NdbResultStream::prepare() { + const Uint32 rowSize = m_operation.getRowSize(); + NdbQueryImpl &query = m_operation.getQuery(); + /* Parent / child correlation is only relevant for scan type queries * Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups! * Neither is these structures required for operations not having respective @@ -560,7 +574,7 @@ NdbResultStream::prepare() { m_maxRows = m_operation.getMaxBatchRows(); m_tupleSet = - new (m_operation.getQuery().getTupleSetAlloc().allocObjMem(m_maxRows)) + new (query.getTupleSetAlloc().allocObjMem(m_maxRows)) TupleSet[m_maxRows]; clearTupleSet(); @@ -568,9 +582,25 @@ NdbResultStream::prepare() else m_maxRows = 1; - return 0; -} //NdbResultStream::prepare + const int bufferSize = rowSize * m_maxRows; + NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc(); + char* const rowBuf = reinterpret_cast(bufferAlloc + .allocObjMem(bufferSize)); + + // So that we can test for buffer overrun. + m_batchOverflowCheck = + reinterpret_cast(bufferAlloc.allocObjMem(sizeof(Uint32))); + *m_batchOverflowCheck = 0xacbd1234; + m_receiver.init(NdbReceiver::NDB_QUERY_OPERATION, false, &m_operation); + m_receiver.do_setup_ndbrecord( + m_operation.getNdbRecord(), + m_maxRows, + 0 /*key_size*/, + 0 /*read_range_no*/, + rowSize, + rowBuf); +} //NdbResultStream::prepare void NdbResultStream::reset() @@ -592,7 +622,7 @@ NdbResultStream::reset() childNo++) { NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo); - child.getResultStream(getRootFragNo()).reset(); + m_rootFrag.getResultStream(child).reset(); } } //NdbResultStream::reset @@ -623,7 +653,7 @@ NdbResultStream::isAllSubScansComplete() childNo++) { const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo); - const NdbResultStream& childStream = child.getResultStream(getRootFragNo()); + const NdbResultStream& childStream = m_rootFrag.getResultStream(child); if (!childStream.isAllSubScansComplete()) return false; } @@ -744,7 +774,7 @@ NdbResultStream::firstResult() Uint16 parentId = tupleNotFound; if (parent!=NULL) { - const NdbResultStream& parentStream = parent->getResultStream(m_rootFragNo); + const NdbResultStream& parentStream = m_rootFrag.getResultStream(*parent); parentId = parentStream.getCurrentTupleId(); if (parentId == tupleNotFound) @@ -791,8 +821,8 @@ NdbResultStream::execTRANSID_AI(const Ui { const CorrelationData correlData(ptr, len); - assert(m_operation.getRoot().getResultStream(m_rootFragNo) - .m_receiver.getId() == correlData.getRootReceiverId()); + assert(m_rootFrag.getResultStream(0).m_receiver.getId() == + correlData.getRootReceiverId()); m_receiver.execTRANSID_AI(ptr, len - CorrelationData::wordCount); @@ -829,6 +859,9 @@ NdbResultStream::execTRANSID_AI(const Ui void NdbResultStream::handleBatchComplete() { + // Buffer overrun check. + assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck==0xacbd1234); + for (Uint32 tupleNo=0; tupleNo + (query.getResultStreamAlloc().allocObjMem(query.getNoOfOperations())); + assert(m_resultStreams!=NULL); + + for (unsigned opNo=0; opNogetNoOfOperations(); opNo++) + { + m_resultStreams[opNo].~NdbResultStream(); + } + } + /** + * Don't 'delete' the object as it was in-place constructed from + * ResultStreamAlloc'ed memory. Memory is released by + * ResultStreamAlloc::reset(). + */ + m_resultStreams = NULL; +} + +NdbResultStream& +NdbRootFragment::getResultStream(Uint32 operationNo) const +{ + assert(m_resultStreams); + return m_resultStreams[operationNo]; } void NdbRootFragment::reset() @@ -1493,7 +1572,14 @@ NdbQueryImpl::~NdbQueryImpl() void NdbQueryImpl::postFetchRelease() { - if (m_operations != NULL) { + if (m_rootFrags != NULL) + { + for (unsigned i=0; i0: User specified prefered value, ==0: Use default CFG values - m_resultStreams(NULL), m_params(), - m_bufferSize(0), - m_batchOverflowCheck(NULL), m_resultBuffer(NULL), m_resultRef(NULL), m_isRowNull(true), @@ -3438,10 +3514,10 @@ NdbQueryOperationImpl::NdbQueryOperation NdbQueryOperationImpl::~NdbQueryOperationImpl() { - // We expect ::postFetchRelease to have deleted fetch related structures when fetch completed. - // Either by fetching through last row, or calling ::close() which forcefully terminates fetch - assert (m_batchOverflowCheck == NULL); - assert (m_resultStreams == NULL); + /** + * We expect ::postFetchRelease to have deleted fetch related structures when fetch completed. + * Either by fetching through last row, or calling ::close() which forcefully terminates fetch + */ assert (m_firstRecAttr == NULL); assert (m_interpretedCode == NULL); } //NdbQueryOperationImpl::~NdbQueryOperationImpl() @@ -3453,22 +3529,6 @@ NdbQueryOperationImpl::~NdbQueryOperatio void NdbQueryOperationImpl::postFetchRelease() { - // Buffer overrun check. - assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck == 0xacbd1234); - m_batchOverflowCheck = NULL; - - if (m_resultStreams != NULL) - { - for (int i = static_cast(getQuery().getRootFragCount())-1; i >= 0; i--) - { - if (m_resultStreams[i] != NULL) - { - m_resultStreams[i]->~NdbResultStream(); - } - } - } - m_resultStreams = NULL; - Ndb* const ndb = m_queryImpl.getNdbTransaction().getNdb(); NdbRecAttr* recAttr = m_firstRecAttr; while (recAttr != NULL) { @@ -3702,7 +3762,7 @@ NdbQueryOperationImpl::firstResult() if (rootFrag != NULL) { - NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()]; + NdbResultStream& resultStream = rootFrag->getResultStream(*this); if (resultStream.firstResult() != tupleNotFound) { fetchRow(resultStream); @@ -3742,7 +3802,7 @@ NdbQueryOperationImpl::nextResult(bool f const NdbRootFragment* rootFrag = m_queryImpl.m_applFrags.getCurrent(); if (rootFrag!=NULL) { - NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()]; + NdbResultStream& resultStream = rootFrag->getResultStream(*this); if (resultStream.nextResult() != tupleNotFound) { fetchRow(resultStream); @@ -4073,55 +4133,9 @@ NdbQueryOperationImpl::setBatchedRows(Ui } } - -int -NdbQueryOperationImpl::prepareReceiver() -{ - // Construct receiver streams and prepare them for receiving scan result - assert(m_resultStreams==NULL); - assert(m_queryImpl.getRootFragCount() > 0); - - m_resultStreams = reinterpret_cast - (getQuery().getPointerAlloc().allocObjMem(m_queryImpl.getRootFragCount())); - - for(Uint32 i = 0; iprepare(); - if (unlikely(error)) { - return error; - } - - m_resultStreams[i]->getReceiver().init(NdbReceiver::NDB_QUERY_OPERATION, - false, this); - char* const rowBuf = reinterpret_cast(getQuery().getRowBufferAlloc() - .allocObjMem(m_bufferSize)); - m_resultStreams[i]->getReceiver() - .do_setup_ndbrecord(m_ndbRecord, - getMaxBatchRows(), - 0 /*key_size*/, - 0 /*read_range_no*/, - getRowSize(), - rowBuf); - m_resultStreams[i]->getReceiver().prepareSend(); - } - // So that we can test for for buffer overrun. - m_batchOverflowCheck = - reinterpret_cast(getQuery().getRowBufferAlloc() - .allocObjMem(sizeof(Uint32))); - *m_batchOverflowCheck = 0xacbd1234; - return 0; -}//NdbQueryOperationImpl::prepareReceiver - int NdbQueryOperationImpl::prepareAttrInfo(Uint32Buffer& attrInfo) { - // ::prepareReceiver() need to complete first: - assert (m_resultStreams != NULL); - const NdbQueryOperationDefImpl& def = getQueryOperationDef(); /** @@ -4604,14 +4618,14 @@ NdbQueryOperationImpl::execTRANSID_AI(co } // Process result values. - m_resultStreams[rootFragNo]->execTRANSID_AI(ptr, len); + rootFrag->getResultStream(*this).execTRANSID_AI(ptr, len); rootFrag->incrOutstandingResults(-1); bool ret = false; if (rootFrag->isFragBatchComplete()) { - ret = m_queryImpl.handleBatchComplete(rootFragNo); + ret = m_queryImpl.handleBatchComplete(*rootFrag); } if (traceSignals) { @@ -4653,7 +4667,6 @@ NdbQueryOperationImpl::execTCKEYREF(cons } } - Uint32 rootFragNo = 0; NdbRootFragment& rootFrag = getQuery().m_rootFrags[0]; if (ref->errorCode != DbspjErr::NodeFailure) @@ -4678,13 +4691,12 @@ NdbQueryOperationImpl::execTCKEYREF(cons bool ret = false; if (rootFrag.isFragBatchComplete()) { - ret = m_queryImpl.handleBatchComplete(rootFragNo); + ret = m_queryImpl.handleBatchComplete(rootFrag); } if (traceSignals) { ndbout << "NdbQueryOperationImpl::execTCKEYREF(): returns:" << ret - << ", *getRoot().m_resultStreams[0] {" - << *getRoot().m_resultStreams[0] << "}" + << ", resultStream= {" << rootFrag.getResultStream(*this) << "}" << ", *this=" << *this << endl; } return ret; @@ -4720,12 +4732,13 @@ NdbQueryOperationImpl::execSCAN_TABCONF( rootFrag->incrOutstandingResults(rowCount); // Handle for SCAN_NEXTREQ, RNIL -> EOF - NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()]; + NdbResultStream& resultStream = rootFrag->getResultStream(*this); resultStream.getReceiver().m_tcPtrI = tcPtrI; if(traceSignals){ - ndbout << " resultStream(root) {" << resultStream << "} fragNo" - << rootFrag->getFragNo() << endl; + ndbout << " resultStream {" << rootFrag->getResultStream(*this) + << "} fragNo" << rootFrag->getFragNo() + << endl; } const NdbQueryDefImpl& queryDef = m_queryImpl.getQueryDef(); @@ -4757,7 +4770,7 @@ NdbQueryOperationImpl::execSCAN_TABCONF( if (rootFrag->isFragBatchComplete()) { /* This fragment is now complete */ - ret = m_queryImpl.handleBatchComplete(rootFrag->getFragNo()); + ret = m_queryImpl.handleBatchComplete(*rootFrag); } if (traceSignals) { ndbout << "NdbQueryOperationImpl::execSCAN_TABCONF():, returns:" << ret @@ -4893,13 +4906,6 @@ int NdbQueryOperationImpl::setBatchSize( return 0; } -NdbResultStream& -NdbQueryOperationImpl::getResultStream(Uint32 rootFragNo) const -{ - assert(rootFragNo < getQuery().getRootFragCount()); - return *m_resultStreams[rootFragNo]; -} - bool NdbQueryOperationImpl::hasInterpretedCode() const { @@ -4938,15 +4944,8 @@ NdbQueryOperationImpl::prepareInterprete Uint32 NdbQueryOperationImpl::getIdOfReceiver() const { - return m_resultStreams[0]->getReceiver().getId(); -} - - -const NdbReceiver& -NdbQueryOperationImpl::getReceiver(Uint32 recNo) const { - assert(recNogetReceiver(); + NdbRootFragment& rootFrag = m_queryImpl.m_rootFrags[0]; + return rootFrag.getResultStream(*this).getReceiver().getId(); } Uint32 NdbQueryOperationImpl::getRowSize() const @@ -4975,7 +4974,8 @@ NdbOut& operator<<(NdbOut& out, const Nd out << " m_queryImpl: " << &op.m_queryImpl; out << " m_operationDef: " << &op.m_operationDef; for(Uint32 i = 0; i