From: Pekka Nousiainen Date: August 16 2011 6:27pm Subject: bzr push into mysql-5.1-telco-7.0-wl4124-new2 branch (pekka.nousiainen:4425 to 4426) List-Archive: http://lists.mysql.com/commits/140660 Message-Id: <20110816182712.5F02B55875@sama.localdomain> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4426 Pekka Nousiainen 2011-08-16 [merge] merge telco-7.0 to wl4124-new2 modified: mysql-test/r/group_by.result mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.cnf mysql-test/t/group_by.test sql/sql_select.cc storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp storage/ndb/src/ndbapi/NdbQueryOperation.cpp storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp 4425 Pekka Nousiainen 2011-08-16 wl#4124 x20_fix.diff no dtor is called due to useless pthread_exit(0) modified: sql/ha_ndb_index_stat.cc === modified file 'mysql-test/r/group_by.result' --- a/mysql-test/r/group_by.result 2010-10-29 08:23:06 +0000 +++ b/mysql-test/r/group_by.result 2011-08-16 10:20:19 +0000 @@ -1856,3 +1856,18 @@ COUNT(*) 2 DROP TABLE t1; # End of 5.1 tests +# +# Bug#12798270: ASSERTION `!TAB->SORTED' FAILED IN JOIN_READ_KEY2 +# +CREATE TABLE t1 (i int); +INSERT INTO t1 VALUES (1); +CREATE TABLE t2 (pk int PRIMARY KEY); +INSERT INTO t2 VALUES (10); +CREATE VIEW v1 AS SELECT t2.pk FROM t2; +SELECT v1.pk +FROM t1 LEFT JOIN v1 ON t1.i = v1.pk +GROUP BY v1.pk; +pk +DROP VIEW v1; +DROP TABLE t1,t2; +# End of Bug#12798270 === modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.cnf' --- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.cnf 2011-07-07 14:48:06 +0000 +++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.cnf 2011-07-11 10:40:00 +0000 @@ -25,14 +25,7 @@ skip-slave-start [mysqld.2.slave] server-id= 4 log-bin = sec-master-2 -master-host= 127.0.0.1 -master-port= @mysqld.2.1.port -master-password= @mysqld.2.1.#password -master-user= @mysqld.2.1.#user -master-connect-retry= 1 -init-rpl-role= slave skip-slave-start -ndb_connectstring= @mysql_cluster.slave.ndb_connectstring [ENV] === modified file 'mysql-test/t/group_by.test' --- a/mysql-test/t/group_by.test 2010-10-29 08:23:06 +0000 +++ b/mysql-test/t/group_by.test 2011-08-16 10:20:19 +0000 @@ -1248,3 +1248,24 @@ ON 1 WHERE t2.f1 > 1 GROUP BY t2.f1; DROP TABLE t1; --echo # End of 5.1 tests + +--echo # +--echo # Bug#12798270: ASSERTION `!TAB->SORTED' FAILED IN JOIN_READ_KEY2 +--echo # + +CREATE TABLE t1 (i int); +INSERT INTO t1 VALUES (1); + +CREATE TABLE t2 (pk int PRIMARY KEY); +INSERT INTO t2 VALUES (10); + +CREATE VIEW v1 AS SELECT t2.pk FROM t2; + +SELECT v1.pk +FROM t1 LEFT JOIN v1 ON t1.i = v1.pk +GROUP BY v1.pk; + +DROP VIEW v1; +DROP TABLE t1,t2; + +--echo # End of Bug#12798270 === modified file 'sql/sql_select.cc' --- a/sql/sql_select.cc 2011-06-30 15:59:25 +0000 +++ b/sql/sql_select.cc 2011-08-16 10:20:19 +0000 @@ -6876,7 +6876,15 @@ make_join_readinfo(JOIN *join, ulonglong (join->sort_by_table == (TABLE *) 1 && i != join->const_tables))) ordered_set= 1; +#ifdef MCP_BUG12798270 tab->sorted= sorted; +#else + /* + For eq_ref there is at most one join match for each row from + previous tables so ordering is not useful. + */ + tab->sorted= (tab->type != JT_EQ_REF) ? sorted : false; +#endif sorted= 0; // only first must be sorted table->status=STATUS_NO_RECORD; pick_table_access_method (tab); === modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp' --- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp 2011-08-16 08:47:35 +0000 @@ -155,19 +155,26 @@ Dbtux::setNodePref(TuxCtx & ctx, NodeHan { const Frag& frag = node.m_frag; const Index& index = *c_indexPool.getPtr(frag.m_indexId); - KeyData prefKey(index.m_keySpec, false, 0); - prefKey.set_buf(node.getPref(), index.m_prefBytes); + /* + * bug#12873640 + * Node prefix exists if it has non-zero number of attributes. It is + * then a partial instance of KeyData. If the prefix does not exist + * then set_buf() could overwrite m_pageId1 in first entry, causing + * random crash in TUP via readKeyAttrs(). + */ if (index.m_prefAttrs > 0) { + KeyData prefKey(index.m_keySpec, false, 0); + prefKey.set_buf(node.getPref(), index.m_prefBytes); jam(); readKeyAttrs(ctx, frag, node.getEnt(0), prefKey, index.m_prefAttrs); - } #ifdef VM_TRACE - if (debugFlags & DebugMaint) { - debugOut << "setNodePref: " << node; - debugOut << " " << prefKey.print(ctx.c_debugBuffer, DebugBufferBytes); - debugOut << endl; - } + if (debugFlags & DebugMaint) { + debugOut << "setNodePref: " << node; + debugOut << " " << prefKey.print(ctx.c_debugBuffer, DebugBufferBytes); + debugOut << endl; + } #endif + } } // node operations === modified file 'storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp' --- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp 2011-02-01 23:27:25 +0000 +++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp 2011-08-16 08:27:14 +0000 @@ -39,6 +39,16 @@ static const char * f_method = "MSms"; #endif #define MAX_CHUNKS 10 +#ifdef VM_TRACE +#ifndef NDBD_RANDOM_START_PAGE +#define NDBD_RANDOM_START_PAGE +#endif +#endif + +#ifdef NDBD_RANDOM_START_PAGE +static Uint32 g_random_start_page_id = 0; +#endif + /* * For muti-threaded ndbd, these calls are used for locking around * memory allocation operations. @@ -224,6 +234,16 @@ Ndbd_mem_manager::Ndbd_mem_manager() mt_mem_manager_init(); } +void* +Ndbd_mem_manager::get_memroot() const +{ +#ifdef NDBD_RANDOM_START_PAGE + return (void*)(m_base_page - g_random_start_page_id); +#else + return (void*)m_base_page; +#endif +} + /** * * resource 0 has following semantics: @@ -359,6 +379,29 @@ Ndbd_mem_manager::init(Uint32 *watchCoun } #endif +#ifdef NDBD_RANDOM_START_PAGE + /** + * In order to find bad-users of page-id's + * we add a random offset to the page-id's returned + * however, due to ZONE_LO that offset can't be that big + * (since we at get_page don't know if it's a HI/LO page) + */ + Uint32 max_rand_start = ZONE_LO_BOUND - 1; + if (max_rand_start > pages) + { + max_rand_start -= pages; + if (max_rand_start > 0x10000) + g_random_start_page_id = 0x10000 + (rand() % (max_rand_start - 0x10000)); + else if (max_rand_start) + g_random_start_page_id = rand() % max_rand_start; + + assert(Uint64(pages) + Uint64(g_random_start_page_id) <= 0xFFFFFFFF); + + ndbout_c("using g_random_start_page_id: %u (%.8x)", + g_random_start_page_id, g_random_start_page_id); + } +#endif + /** * Do malloc */ @@ -670,7 +713,7 @@ Ndbd_mem_manager::alloc(AllocZone zone, return; * pages = save; } - + alloc_impl(ZONE_LO, ret, pages, min); } @@ -870,7 +913,12 @@ Ndbd_mem_manager::alloc_page(Uint32 type check_resource_limits(m_resource_limit); mt_mem_manager_unlock(); +#ifdef NDBD_RANDOM_START_PAGE + *i += g_random_start_page_id; + return m_base_page + *i - g_random_start_page_id; +#else return m_base_page + *i; +#endif } } mt_mem_manager_unlock(); @@ -885,7 +933,11 @@ Ndbd_mem_manager::release_page(Uint32 ty mt_mem_manager_lock(); Resource_limit tot = m_resource_limit[0]; Resource_limit rl = m_resource_limit[idx]; - + +#ifdef NDBD_RANDOM_START_PAGE + i -= g_random_start_page_id; +#endif + Uint32 sub = (rl.m_curr <= rl.m_min) ? 1 : 0; // Over min ? release(i, 1); m_resource_limit[0].m_curr = tot.m_curr - 1; @@ -954,10 +1006,16 @@ Ndbd_mem_manager::alloc_pages(Uint32 typ m_resource_limit[idx].m_curr = rl.m_curr + req; check_resource_limits(m_resource_limit); mt_mem_manager_unlock(); +#ifdef NDBD_RANDOM_START_PAGE + *i += g_random_start_page_id; +#endif return ; } mt_mem_manager_unlock(); * cnt = req; +#ifdef NDBD_RANDOM_START_PAGE + *i += g_random_start_page_id; +#endif return; } @@ -969,7 +1027,11 @@ Ndbd_mem_manager::release_pages(Uint32 t mt_mem_manager_lock(); Resource_limit tot = m_resource_limit[0]; Resource_limit rl = m_resource_limit[idx]; - + +#ifdef NDBD_RANDOM_START_PAGE + i -= g_random_start_page_id; +#endif + release(i, cnt); Uint32 currnew = rl.m_curr - cnt; === modified file 'storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp' --- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp 2011-06-30 15:59:25 +0000 +++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp 2011-08-16 08:27:14 +0000 @@ -68,7 +68,7 @@ public: bool init(Uint32 *watchCounter, bool allow_alloc_less_than_requested = true); void map(Uint32 * watchCounter, bool memlock = false, Uint32 resources[] = 0); - void* get_memroot() const { return (void*)m_base_page;} + void* get_memroot() const; void dump() const ; === modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp' --- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-07-01 10:02:15 +0000 +++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp 2011-08-16 13:37:24 +0000 @@ -87,6 +87,32 @@ const bool traceSignals = false; * children. That way, results for child operations can be updated correctly * when the application iterates over the results of the root scan operation. */ +class TupleCorrelation +{ +public: + static const Uint32 wordCount = 1; + + explicit TupleCorrelation() + : m_correlation((tupleNotFound<<16) | tupleNotFound) + {} + + /** Conversion to/from Uint32 to store/fetch from buffers */ + explicit TupleCorrelation(Uint32 val) + : m_correlation(val) + {} + Uint32 toUint32() const + { return m_correlation; } + + Uint16 getTupleId() const + { return m_correlation & 0xffff;} + + Uint16 getParentTupleId() const + { return m_correlation >> 16;} + +private: + Uint32 m_correlation; +}; // class TupleCorrelation + class CorrelationData { public: @@ -99,18 +125,15 @@ public: assert(AttributeHeader(m_corrPart[0]).getAttributeId() == AttributeHeader::CORR_FACTOR64); assert(AttributeHeader(m_corrPart[0]).getByteSize() == 2*sizeof(Uint32)); - assert(getTupleId()> 16;} + const TupleCorrelation getTupleCorrelation() const + { return TupleCorrelation(m_corrPart[1]); } private: const Uint32* const m_corrPart; @@ -148,6 +171,8 @@ public: explicit NdbRootFragment(); + ~NdbRootFragment(); + /** * Initialize object. * @param query Enclosing query. @@ -164,6 +189,11 @@ public: */ void reset(); + /** + * Prepare for reading another batch of results. + */ + void prepareResultSet(); + void incrOutstandingResults(Int32 delta) { m_outstandingResults += delta; @@ -198,9 +228,14 @@ public: * @param operationNo The id of the operation. * @return The result stream for this root fragment. */ - NdbResultStream& getResultStream(Uint32 operationNo) const - { return m_query->getQueryOperation(operationNo).getResultStream(m_fragNo); } - + NdbResultStream& getResultStream(Uint32 operationNo) const; + + NdbResultStream& getResultStream(const NdbQueryOperationImpl& op) const + { return getResultStream(op.getQueryOperationDef().getQueryOperationIx()); } + + Uint32 getReceiverId() const; + Uint32 getReceiverTcPtrI() const; + /** * @return True if there are no more batches to be received for this fragment. */ @@ -212,6 +247,19 @@ public: */ bool isEmpty() const; + /** + * This method is used for marking which streams belonging to this + * NdbRootFragment which has remaining batches for a sub scan + * instantiated from the current batch of its parent operation. + */ + void setRemainingSubScans(Uint32 nodeMask) + { + m_remainingScans = nodeMask; + } + + /** Release resources after last row has been returned */ + void postFetchRelease(); + private: STATIC_CONST( voidFragNo = 0xffffffff); @@ -221,6 +269,9 @@ private: /** Number of the root operation fragment.*/ Uint32 m_fragNo; + /** For processing results originating from this root fragment (Array of).*/ + NdbResultStream* m_resultStreams; + /** * The number of outstanding TCKEYREF or TRANSID_AI * messages for the fragment. This includes both messages related to the @@ -239,6 +290,12 @@ private: * TCKEYCONF message has been received */ bool m_confReceived; + /** + * A bitmask of operation id's for which we will receive more + * ResultSets in a NEXTREQ. + */ + Uint32 m_remainingScans; + /** * Used for implementing a hash map from root receiver ids to a * NdbRootFragment instance. m_idMapHead is the index of the first @@ -272,26 +329,19 @@ public: * @param operation The operation for which we will receive results. * @param rootFragNo 0..n-1 when the root operation reads from n fragments. */ - explicit NdbResultStream(NdbQueryOperationImpl& operation, Uint32 rootFragNo); + explicit NdbResultStream(NdbQueryOperationImpl& operation, + NdbRootFragment& rootFrag); ~NdbResultStream(); /** * Prepare for receiving first results. - * @return possible error code. */ - int prepare(); + void prepare(); /** Prepare for receiving next batch of scan results. */ void reset(); - /** - * 0..n-1 if the root operation reads from n fragments. This stream holds data - * derived from one of those fragments. - */ - Uint32 getRootFragNo() const - { return m_rootFragNo; } - NdbReceiver& getReceiver() { return m_receiver; } @@ -301,6 +351,9 @@ public: Uint32 getRowCount() const { return m_rowCount; } + char* getRow(Uint32 tupleNo) const + { return (m_buffer + (tupleNo*m_rowSize)); } + /** * Process an incomming tuple for this stream. Extract parent and own tuple * ids and pass it on to m_receiver. @@ -308,12 +361,15 @@ public: * @param ptr buffer holding tuple. * @param len buffer length. */ - void execTRANSID_AI(const Uint32 *ptr, Uint32 len); + void execTRANSID_AI(const Uint32 *ptr, Uint32 len, + TupleCorrelation correlation); - /** A complete batch has been received for a fragment on this NdbResultStream, - * Update whatever required before the appl. are allowed to navigate the result. + /** + * A complete batch has been received for a fragment on this NdbResultStream, + * Update whatever required before the appl. are allowed to navigate the result. + * @return true if node and all its siblings have returned all rows. */ - void handleBatchComplete(); + bool prepareResultSet(Uint32 remainingScans); /** * Navigate within the current result batch to resp. first and next row. @@ -333,36 +389,33 @@ public: { return m_iterState == Iter_finished; } /** - * This method is - * used for marking a stream as holding the last batch of a sub scan. - * This means that it is the last batch of the scan that was instantiated - * from the current batch of its parent operation. - */ - void setSubScanCompletion(bool complete) - { - // Lookups should always be 'complete' - assert(complete || m_operation.getQueryOperationDef().isScanOperation()); - m_subScanComplete = complete; - } - - /** * This method - * returns true if this result stream holds the last batch of a sub scan + * returns true if this result stream holds the last batch of a sub scan. * This means that it is the last batch of the scan that was instantiated * from the current batch of its parent operation. */ - bool isSubScanComplete() const + bool isSubScanComplete(Uint32 remainingScans) const { - // Lookups should always be 'complete' - assert(m_subScanComplete || m_operation.getQueryOperationDef().isScanOperation()); - return m_subScanComplete; + /** + * Find the node number seen by the SPJ block. Since a unique index + * operation will have two distincts nodes in the tree used by the + * SPJ block, this number may be different from 'opNo'. + */ + const Uint32 internalOpNo = m_operation.getQueryOperationDef().getQueryOperationId(); + + const bool complete = !((remainingScans >> internalOpNo) & 1); + assert(complete || isScanResult()); // Lookups should always be 'complete' + return complete; } - /** Variant of isSubScanComplete() above which checks that this resultstream - * and all its descendants have consumed all batches of rows instantiated - * from their parent operation(s). - */ - bool isAllSubScansComplete() const; + bool isScanQuery() const + { return (m_properties & Is_Scan_Query); } + + bool isScanResult() const + { return (m_properties & Is_Scan_Result); } + + bool isInnerJoin() const + { return (m_properties & Is_Inner_Join); } /** For debugging.*/ friend NdbOut& operator<<(NdbOut& out, const NdbResultStream&); @@ -405,7 +458,7 @@ public: * that had no matching children.*/ Bitmask<(NDB_SPJ_MAX_TREE_NODES+31)/32> m_hasMatchingChild; - explicit TupleSet() + explicit TupleSet() : m_hash_head(tupleNotFound) {} private: @@ -415,22 +468,39 @@ public: }; private: - /** This stream handles results derived from the m_rootFragNo'th - * fragment of the root operation.*/ - const Uint32 m_rootFragNo; + /** + * This stream handles results derived from specified + * m_rootFrag of the root operation. + */ + const NdbRootFragment& m_rootFrag; + + /** Operation to which this resultStream belong.*/ + NdbQueryOperationImpl& m_operation; + + /** ResultStream for my parent operation, or NULL if I am root */ + NdbResultStream* const m_parent; + + const enum properties + { + Is_Scan_Query = 0x01, + Is_Scan_Result = 0x02, + Is_Inner_Join = 0x10 + } m_properties; /** The receiver object that unpacks transid_AI messages.*/ NdbReceiver m_receiver; - /** Max #rows which this stream may recieve in its buffer structures */ - Uint32 m_maxRows; + /** The buffers which we receive the results into */ + char* m_buffer; + + /** Used for checking if buffer overrun occurred. */ + Uint32* m_batchOverflowCheck; + + Uint32 m_rowSize; /** The number of transid_AI messages received.*/ Uint32 m_rowCount; - /** Operation to which this resultStream belong.*/ - NdbQueryOperationImpl& m_operation; - /** This is the state of the iterator used by firstResult(), nextResult().*/ enum { @@ -442,24 +512,19 @@ private: Iter_finished } m_iterState; - /** Tuple id of the current tuple, or 'tupleNotFound' if Iter_notStarted or Iter_finished. */ + /** + * Tuple id of the current tuple, or 'tupleNotFound' + * if Iter_notStarted or Iter_finished. + */ Uint16 m_currentRow; - /** - * This field is only used for result streams of scan operations. If set, - * it indicates that the stream is holding the last batch of a sub scan. - * This means that it is the last batch of the scan that was instantiated - * from the current batch of its parent operation. - */ - bool m_subScanComplete; + /** Max #rows which this stream may recieve in its TupleSet structures */ + Uint32 m_maxRows; + /** TupleSet contains the correlation between parent/childs */ TupleSet* m_tupleSet; - void clearTupleSet(); - - void setParentChildMap(Uint16 parentId, - Uint16 tupleId, - Uint16 tupleNo); + void buildResultCorrelations(); Uint16 getTupleId(Uint16 tupleNo) const { return (m_tupleSet) ? m_tupleSet[tupleNo].m_tupleId : 0; } @@ -525,15 +590,30 @@ void* NdbBulkAllocator::allocObjMem(Uint ///////// NdbResultStream methods /////////// ////////////////////////////////////////////// -NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation, Uint32 rootFragNo): - m_rootFragNo(rootFragNo), +NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation, + NdbRootFragment& rootFrag) +: + m_rootFrag(rootFrag), + m_operation(operation), + m_parent(operation.getParentOperation() + ? &rootFrag.getResultStream(*operation.getParentOperation()) + : NULL), + m_properties( + (enum properties) + ((operation.getQueryDef().isScanQuery() + ? Is_Scan_Query : 0) + | (operation.getQueryOperationDef().isScanOperation() + ? Is_Scan_Result : 0) + | (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll + ? Is_Inner_Join : 0))), m_receiver(operation.getQuery().getNdbTransaction().getNdb()), - m_maxRows(0), + m_buffer(NULL), + m_batchOverflowCheck(NULL), + m_rowSize(0), m_rowCount(0), - m_operation(operation), m_iterState(Iter_notStarted), m_currentRow(tupleNotFound), - m_subScanComplete(true), + m_maxRows(0), m_tupleSet(NULL) {}; @@ -545,41 +625,58 @@ NdbResultStream::~NdbResultStream() } } -int // Return 0 if ok, else errorcode +void NdbResultStream::prepare() { + const Uint32 rowSize = m_operation.getRowSize(); + NdbQueryImpl &query = m_operation.getQuery(); + + m_rowSize = rowSize; + /* Parent / child correlation is only relevant for scan type queries * Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups! * Neither is these structures required for operations not having respective * child or parent operations. */ - if (m_operation.getQueryDef().isScanQuery()) + if (isScanQuery()) { m_maxRows = m_operation.getMaxBatchRows(); m_tupleSet = - new (m_operation.getQuery().getTupleSetAlloc().allocObjMem(m_maxRows)) + new (query.getTupleSetAlloc().allocObjMem(m_maxRows)) TupleSet[m_maxRows]; - - clearTupleSet(); } else m_maxRows = 1; - return 0; -} //NdbResultStream::prepare + const int bufferSize = rowSize * m_maxRows; + NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc(); + m_buffer = reinterpret_cast(bufferAlloc.allocObjMem(bufferSize)); + // So that we can test for buffer overrun. + m_batchOverflowCheck = + reinterpret_cast(bufferAlloc.allocObjMem(sizeof(Uint32))); + *m_batchOverflowCheck = 0xacbd1234; + + m_receiver.init(NdbReceiver::NDB_QUERY_OPERATION, false, &m_operation); + m_receiver.do_setup_ndbrecord( + m_operation.getNdbRecord(), + m_maxRows, + 0 /*key_size*/, + 0 /*read_range_no*/, + rowSize, + m_buffer); +} //NdbResultStream::prepare void NdbResultStream::reset() { - assert (m_operation.getQueryDef().isScanQuery()); + assert (isScanQuery()); // Root scan-operation need a ScanTabConf to complete m_rowCount = 0; m_iterState = Iter_notStarted; m_currentRow = tupleNotFound; - clearTupleSet(); m_receiver.prepareSend(); /** * If this stream will get new rows in the next batch, then so will @@ -589,88 +686,14 @@ NdbResultStream::reset() childNo++) { NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo); - child.getResultStream(getRootFragNo()).reset(); + m_rootFrag.getResultStream(child).reset(); } } //NdbResultStream::reset -void -NdbResultStream::clearTupleSet() -{ - assert (m_operation.getQueryDef().isScanQuery()); - for (Uint32 i=0; i0)) { @@ -736,14 +759,10 @@ NdbResultStream::findNextTuple(Uint16 tu Uint16 NdbResultStream::firstResult() { - NdbQueryOperationImpl* parent = m_operation.getParentOperation(); - Uint16 parentId = tupleNotFound; - if (parent!=NULL) + if (m_parent!=NULL) { - const NdbResultStream& parentStream = parent->getResultStream(m_rootFragNo); - parentId = parentStream.getCurrentTupleId(); - + parentId = m_parent->getCurrentTupleId(); if (parentId == tupleNotFound) { m_currentRow = tupleNotFound; @@ -780,104 +799,195 @@ NdbResultStream::nextResult() } //NdbResultStream::nextResult() +/** + * Callback when a TRANSID_AI signal (receive row) is processed. + */ void -NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len) +NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len, + TupleCorrelation correlation) { - assert(m_iterState == Iter_notStarted); - if (m_operation.getQueryDef().isScanQuery()) - { - const CorrelationData correlData(ptr, len); - - assert(m_operation.getRoot().getResultStream(m_rootFragNo) - .m_receiver.getId() == correlData.getRootReceiverId()); - - m_receiver.execTRANSID_AI(ptr, len - CorrelationData::wordCount); + m_receiver.execTRANSID_AI(ptr, len); + m_rowCount++; + if (isScanQuery()) + { /** - * Keep correlation data between parent and child tuples. - * Since tuples may arrive in any order, we cannot match - * parent and child until all tuples (for this batch and - * root fragment) have arrived. + * Store TupleCorrelation as hidden value imm. after received row + * (NdbQueryOperationImpl::getRowSize() has reserved space for it) */ - setParentChildMap(m_operation.getParentOperation()==NULL - ? tupleNotFound - : correlData.getParentTupleId(), - correlData.getTupleId(), - m_rowCount); + Uint32* row_recv = reinterpret_cast(m_receiver.m_record.m_row); + row_recv[-1] = correlation.toUint32(); } - else - { - // Lookup query. - m_receiver.execTRANSID_AI(ptr, len); - } - m_rowCount++; - /* Set correct #rows received in the NdbReceiver. - */ - getReceiver().m_result_rows = getRowCount(); } // NdbResultStream::execTRANSID_AI() - /** - * A fresh batch of results has arrived for this ResultStream (and all its parent / childs) - * Filter away any result rows which should not be visible (yet) - Either due to incomplete - * child batches, or the join being an 'inner join'. - * Set result itterator state to 'before first' resultrow. + * Make preparations for another batch of result to be read: + * - Fill in parent/child result correlations in m_tupleSet[] + * - ... or reset m_tupleSet[] if we reuse the previous. + * - Apply inner/outer join filtering to remove non qualifying + * rows. */ -void -NdbResultStream::handleBatchComplete() +bool +NdbResultStream::prepareResultSet(Uint32 remainingScans) { - for (Uint32 tupleNo=0; tupleNo> 2) % noOfFrags; int current = frags[hash].m_idMapHead; assert(current < static_cast(noOfFrags)); - while (current >= 0 && - frags[current].getResultStream(0).getReceiver().getId() - != receiverId) + while (current >= 0 && frags[current].getReceiverId() != receiverId) { current = frags[current].m_idMapNext; assert(current < static_cast(noOfFrags)); @@ -924,18 +1033,65 @@ NdbRootFragment::receiverIdLookup(NdbRoo NdbRootFragment::NdbRootFragment(): m_query(NULL), m_fragNo(voidFragNo), + m_resultStreams(NULL), m_outstandingResults(0), m_confReceived(false), + m_remainingScans(0), m_idMapHead(-1), m_idMapNext(-1) { } +NdbRootFragment::~NdbRootFragment() +{ + assert(m_resultStreams==NULL); +} + void NdbRootFragment::init(NdbQueryImpl& query, Uint32 fragNo) { assert(m_fragNo==voidFragNo); m_query = &query; m_fragNo = fragNo; + + m_resultStreams = reinterpret_cast + (query.getResultStreamAlloc().allocObjMem(query.getNoOfOperations())); + assert(m_resultStreams!=NULL); + + for (unsigned opNo=0; opNogetNoOfOperations(); opNo++) + { + m_resultStreams[opNo].~NdbResultStream(); + } + } + /** + * Don't 'delete' the object as it was in-place constructed from + * ResultStreamAlloc'ed memory. Memory is released by + * ResultStreamAlloc::reset(). + */ + m_resultStreams = NULL; +} + +NdbResultStream& +NdbRootFragment::getResultStream(Uint32 operationNo) const +{ + assert(m_resultStreams); + return m_resultStreams[operationNo]; } void NdbRootFragment::reset() @@ -943,9 +1099,31 @@ void NdbRootFragment::reset() assert(m_fragNo!=voidFragNo); assert(m_outstandingResults == 0); assert(m_confReceived); + + for (unsigned opNo=0; opNogetNoOfOperations(); opNo++) + { + if (!m_resultStreams[opNo].isSubScanComplete(m_remainingScans)) + { + /** + * Reset m_resultStreams[] and all its descendants, since all these + * streams will get a new set of rows in the next batch. + */ + m_resultStreams[opNo].reset(); + } + } m_confReceived = false; } +void NdbRootFragment::prepareResultSet() +{ + NdbResultStream& rootStream = getResultStream(0); + rootStream.prepareResultSet(m_remainingScans); + + /* Position at the first (sorted?) row available from this fragments. + */ + rootStream.firstResult(); +} + void NdbRootFragment::setConfReceived() { /* For a query with a lookup root, there may be more than one TCKEYCONF @@ -958,14 +1136,31 @@ void NdbRootFragment::setConfReceived() bool NdbRootFragment::finalBatchReceived() const { - return getResultStream(0).getReceiver().m_tcPtrI==RNIL; + return getReceiverTcPtrI()==RNIL; } -bool NdbRootFragment::isEmpty() const +bool NdbRootFragment::isEmpty() const { return getResultStream(0).isEmpty(); } +/** + * SPJ requests are identified by the receiver-id of the + * *root* ResultStream for each RootFragment. Furthermore + * a NEXTREQ use the tcPtrI saved in this ResultStream to + * identify the 'cursor' to restart. + * + * We provide some convenient accessors for fetching this info + */ +Uint32 NdbRootFragment::getReceiverId() const +{ + return getResultStream(0).getReceiver().getId(); +} + +Uint32 NdbRootFragment::getReceiverTcPtrI() const +{ + return getResultStream(0).getReceiver().m_tcPtrI; +} /////////////////////////////////////////// ///////// NdbQuery API methods /////////// @@ -1475,7 +1670,14 @@ NdbQueryImpl::~NdbQueryImpl() void NdbQueryImpl::postFetchRelease() { - if (m_operations != NULL) { + if (m_rootFrags != NULL) + { + for (unsigned i=0; iprepareResultSet(); m_applFrags.add(*frag); } if (m_applFrags.getCurrent() != NULL) @@ -2040,6 +2243,7 @@ NdbQueryImpl::awaitMoreResults(bool forc NdbRootFragment* frag; if ((frag=m_fullFrags.pop()) != NULL) { + frag->prepareResultSet(); m_applFrags.add(*frag); } assert(m_fullFrags.pop()==NULL); // Only one stream for lookups. @@ -2070,15 +2274,15 @@ NdbQueryImpl::awaitMoreResults(bool forc returns: 'true' when application thread should be resumed. */ bool -NdbQueryImpl::handleBatchComplete(Uint32 fragNo) +NdbQueryImpl::handleBatchComplete(NdbRootFragment& rootFrag) { if (traceSignals) { - ndbout << "NdbQueryImpl::handleBatchComplete, fragNo=" << fragNo + ndbout << "NdbQueryImpl::handleBatchComplete" + << ", fragNo=" << rootFrag.getFragNo() << ", pendingFrags=" << (m_pendingFrags-1) << ", finalBatchFrags=" << m_finalBatchFrags << endl; } - bool resume = false; /* May received fragment data after a SCANREF() (timeout?) * terminated the scan. We are about to close this query, @@ -2086,8 +2290,6 @@ NdbQueryImpl::handleBatchComplete(Uint32 */ if (likely(m_fullFrags.m_errorCode == 0)) { - NdbQueryOperationImpl& root = getRoot(); - NdbRootFragment& rootFrag = m_rootFrags[fragNo]; assert(rootFrag.isFragBatchComplete()); assert(m_pendingFrags > 0); // Check against underflow. @@ -2100,34 +2302,14 @@ NdbQueryImpl::handleBatchComplete(Uint32 assert(m_finalBatchFrags <= m_rootFragCount); } - if (getQueryDef().isScanQuery()) - { - // Only required for scans - root.getResultStream(fragNo).handleBatchComplete(); - - // Only ordered scans has to wait until all pending completed - resume = (m_pendingFrags==0) || - (root.m_ordering==NdbQueryOptions::ScanOrdering_unordered); - } - else - { - assert(root.m_resultStreams[fragNo]->getReceiver().m_tcPtrI==RNIL); - assert(m_finalBatchFrags==1); - assert(m_pendingFrags==0); // Lookup query should be complete now. - resume = true; - } - - /* Position at the first (sorted?) row available from this fragments. - */ - root.m_resultStreams[fragNo]->firstResult(); - /* When application thread ::awaitMoreResults() it will later be moved * from m_fullFrags to m_applFrags under mutex protection. */ m_fullFrags.push(rootFrag); + return true; } - return resume; + return false; } // NdbQueryImpl::handleBatchComplete int @@ -2272,22 +2454,22 @@ NdbQueryImpl::execTCKEYCONF() ndbout << "NdbQueryImpl::execTCKEYCONF()" << endl; } assert(!getQueryDef().isScanQuery()); + NdbRootFragment& rootFrag = m_rootFrags[0]; // We will get 1 + #leaf-nodes TCKEYCONF for a lookup... - m_rootFrags[0].setConfReceived(); - m_rootFrags[0].incrOutstandingResults(-1); + rootFrag.setConfReceived(); + rootFrag.incrOutstandingResults(-1); bool ret = false; - if (m_rootFrags[0].isFragBatchComplete()) + if (rootFrag.isFragBatchComplete()) { - ret = handleBatchComplete(0); + ret = handleBatchComplete(rootFrag); } if (traceSignals) { ndbout << "NdbQueryImpl::execTCKEYCONF(): returns:" << ret << ", m_pendingFrags=" << m_pendingFrags - << ", *getRoot().m_resultStreams[0]=" - << *getRoot().m_resultStreams[0] + << ", rootStream= {" << rootFrag.getResultStream(0) << "}" << endl; } return ret; @@ -2364,9 +2546,7 @@ NdbQueryImpl::prepareSend() // Allocate space for ptrs to NdbResultStream and NdbRootFragment objects. error = m_pointerAlloc.init(m_rootFragCount * (SharedFragStack::pointersPerFragment + - OrderedFragSet::pointersPerFragment + - // Pointers to NdbResultStream objects. - getNoOfOperations())); + OrderedFragSet::pointersPerFragment)); if (error != 0) { setErrorCode(error); @@ -2377,21 +2557,22 @@ NdbQueryImpl::prepareSend() getRoot().calculateBatchedRows(NULL); getRoot().setBatchedRows(1); - /** Calculate total amount of row buffer space for all operations and - * fragments.*/ + /** + * Calculate total amount of row buffer space for all operations and + * fragments. + */ Uint32 totalBuffSize = 0; for (Uint32 opNo = 0; opNo < getNoOfOperations(); opNo++) { - NdbQueryOperationImpl& op = getQueryOperation(opNo); - - op.m_bufferSize = op.getRowSize() * op.getMaxBatchRows(); - totalBuffSize += op.m_bufferSize; + const NdbQueryOperationImpl& op = getQueryOperation(opNo); + totalBuffSize += (op.getRowSize() * op.getMaxBatchRows()); } - /** Add one word per operation for buffer overrun check. We add a word + /** + * Add one word per ResultStream for buffer overrun check. We add a word * rather than a byte to avoid possible alignment problems. */ - m_rowBufferAlloc.init(totalBuffSize * m_rootFragCount + - sizeof(Uint32) * getNoOfOperations()); + m_rowBufferAlloc.init(m_rootFragCount * + (totalBuffSize + (sizeof(Uint32) * getNoOfOperations())) ); if (getQueryDef().isScanQuery()) { Uint32 totalRows = 0; @@ -2406,15 +2587,28 @@ NdbQueryImpl::prepareSend() return -1; } } - // 1. Build receiver structures for each QueryOperation. - // 2. Fill in parameters (into ATTRINFO) for QueryTree. - // (Has to complete *after* ::prepareReceiver() as QueryTree params - // refer receiver id's.) - // + + /** + * Allocate and initialize fragment state variables. + * Will also cause a ResultStream object containing a + * NdbReceiver to be constructed for each operation in QueryTree + */ + m_rootFrags = new NdbRootFragment[m_rootFragCount]; + if (m_rootFrags == NULL) + { + setErrorCode(Err_MemoryAlloc); + return -1; + } + for (Uint32 i = 0; i= m_query.getRootFragCount()) + if (m_currFragNo >= m_fragCount) { + sz = 0; return NULL; } else { - const NdbQueryOperationImpl& root = m_query.getQueryOperation(0U); - while (sz < bufSize && - m_currFragNo < m_query.getRootFragCount()) + Uint32 cnt = 0; + while (cnt < bufSize && m_currFragNo < m_fragCount) { - m_receiverIds[sz] = root.getReceiver(m_currFragNo).getId(); - sz++; + m_receiverIds[cnt] = m_rootFrags[m_currFragNo].getReceiverId(); + cnt++; m_currFragNo++; } + sz = cnt; return m_receiverIds; } } @@ -2690,7 +2872,7 @@ NdbQueryImpl::doSend(int nodeId, bool la * Section 2 : Optional KEYINFO section */ GenericSectionPtr secs[3]; - InitialReceiverIdIterator receiverIdIter(*this); + InitialReceiverIdIterator receiverIdIter(m_rootFrags, m_rootFragCount); LinearSectionIterator attrInfoIter(m_attrInfo.addr(), m_attrInfo.getSize()); LinearSectionIterator keyInfoIter(m_keyInfo.addr(), m_keyInfo.getSize()); @@ -2831,28 +3013,11 @@ Remark: int NdbQueryImpl::sendFetchMore(NdbRootFragment& emptyFrag, bool forceSend) { - assert(getRoot().m_resultStreams!=NULL); assert(!emptyFrag.finalBatchReceived()); assert(getQueryDef().isScanQuery()); - const Uint32 fragNo = emptyFrag.getFragNo(); emptyFrag.reset(); - for (unsigned opNo=0; opNom_tcRef)); @@ -2868,8 +3033,7 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm scanNextReq->transId2 = (Uint32) (transId >> 32); tSignal.setLength(ScanNextReq::SignalLength); - const uint32 receiverId = - emptyFrag.getResultStream(0).getReceiver().m_tcPtrI; + const uint32 receiverId = emptyFrag.getReceiverTcPtrI(); LinearSectionIterator receiverIdIter(&receiverId ,1); GenericSectionPtr secs[1]; @@ -3302,9 +3466,9 @@ NdbQueryImpl::OrderedFragSet::getEmpty() bool NdbQueryImpl::OrderedFragSet::verifySortOrder() const { - for(int i = 0; i0: User specified prefered value, ==0: Use default CFG values - m_resultStreams(NULL), m_params(), - m_bufferSize(0), - m_batchOverflowCheck(NULL), m_resultBuffer(NULL), m_resultRef(NULL), m_isRowNull(true), @@ -3416,10 +3577,10 @@ NdbQueryOperationImpl::NdbQueryOperation NdbQueryOperationImpl::~NdbQueryOperationImpl() { - // We expect ::postFetchRelease to have deleted fetch related structures when fetch completed. - // Either by fetching through last row, or calling ::close() which forcefully terminates fetch - assert (m_batchOverflowCheck == NULL); - assert (m_resultStreams == NULL); + /** + * We expect ::postFetchRelease to have deleted fetch related structures when fetch completed. + * Either by fetching through last row, or calling ::close() which forcefully terminates fetch + */ assert (m_firstRecAttr == NULL); assert (m_interpretedCode == NULL); } //NdbQueryOperationImpl::~NdbQueryOperationImpl() @@ -3431,22 +3592,6 @@ NdbQueryOperationImpl::~NdbQueryOperatio void NdbQueryOperationImpl::postFetchRelease() { - // Buffer overrun check. - assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck == 0xacbd1234); - m_batchOverflowCheck = NULL; - - if (m_resultStreams != NULL) - { - for (int i = static_cast(getQuery().getRootFragCount())-1; i >= 0; i--) - { - if (m_resultStreams[i] != NULL) - { - m_resultStreams[i]->~NdbResultStream(); - } - } - } - m_resultStreams = NULL; - Ndb* const ndb = m_queryImpl.getNdbTransaction().getNdb(); NdbRecAttr* recAttr = m_firstRecAttr; while (recAttr != NULL) { @@ -3680,7 +3825,7 @@ NdbQueryOperationImpl::firstResult() if (rootFrag != NULL) { - NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()]; + NdbResultStream& resultStream = rootFrag->getResultStream(*this); if (resultStream.firstResult() != tupleNotFound) { fetchRow(resultStream); @@ -3720,7 +3865,7 @@ NdbQueryOperationImpl::nextResult(bool f const NdbRootFragment* rootFrag = m_queryImpl.m_applFrags.getCurrent(); if (rootFrag!=NULL) { - NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()]; + NdbResultStream& resultStream = rootFrag->getResultStream(*this); if (resultStream.nextResult() != tupleNotFound) { fetchRow(resultStream); @@ -3736,7 +3881,7 @@ NdbQueryOperationImpl::nextResult(bool f void NdbQueryOperationImpl::fetchRow(NdbResultStream& resultStream) { - const char* buff = resultStream.getReceiver().get_row(); + const char* buff = resultStream.getReceiver().peek_row(); assert(buff!=NULL || (m_firstRecAttr==NULL && m_ndbRecord==NULL)); m_isRowNull = false; @@ -4051,55 +4196,9 @@ NdbQueryOperationImpl::setBatchedRows(Ui } } - -int -NdbQueryOperationImpl::prepareReceiver() -{ - // Construct receiver streams and prepare them for receiving scan result - assert(m_resultStreams==NULL); - assert(m_queryImpl.getRootFragCount() > 0); - - m_resultStreams = reinterpret_cast - (getQuery().getPointerAlloc().allocObjMem(m_queryImpl.getRootFragCount())); - - for(Uint32 i = 0; iprepare(); - if (unlikely(error)) { - return error; - } - - m_resultStreams[i]->getReceiver().init(NdbReceiver::NDB_QUERY_OPERATION, - false, this); - char* const rowBuf = reinterpret_cast(getQuery().getRowBufferAlloc() - .allocObjMem(m_bufferSize)); - m_resultStreams[i]->getReceiver() - .do_setup_ndbrecord(m_ndbRecord, - getMaxBatchRows(), - 0 /*key_size*/, - 0 /*read_range_no*/, - getRowSize(), - rowBuf); - m_resultStreams[i]->getReceiver().prepareSend(); - } - // So that we can test for for buffer overrun. - m_batchOverflowCheck = - reinterpret_cast(getQuery().getRowBufferAlloc() - .allocObjMem(sizeof(Uint32))); - *m_batchOverflowCheck = 0xacbd1234; - return 0; -}//NdbQueryOperationImpl::prepareReceiver - int NdbQueryOperationImpl::prepareAttrInfo(Uint32Buffer& attrInfo) { - // ::prepareReceiver() need to complete first: - assert (m_resultStreams != NULL); - const NdbQueryOperationDefImpl& def = getQueryOperationDef(); /** @@ -4553,10 +4652,12 @@ NdbQueryOperationImpl::prepareLookupKeyI bool NdbQueryOperationImpl::execTRANSID_AI(const Uint32* ptr, Uint32 len) { + TupleCorrelation tupleCorrelation; NdbRootFragment* rootFrag = m_queryImpl.m_rootFrags; - Uint32 rootFragNo = 0; + if (getQueryDef().isScanQuery()) { + const CorrelationData correlData(ptr, len); const Uint32 receiverId = CorrelationData(ptr, len).getRootReceiverId(); /** receiverId holds the Id of the receiver of the corresponding stream @@ -4572,24 +4673,27 @@ NdbQueryOperationImpl::execTRANSID_AI(co assert(false); return false; } - rootFragNo = rootFrag->getFragNo(); + + // Extract tuple correlation. + tupleCorrelation = correlData.getTupleCorrelation(); + len -= CorrelationData::wordCount; } + if (traceSignals) { ndbout << "NdbQueryOperationImpl::execTRANSID_AI()" << ", operation no: " << getQueryOperationDef().getQueryOperationIx() - << ", fragment no: " << rootFragNo + << ", fragment no: " << rootFrag->getFragNo() << endl; } // Process result values. - m_resultStreams[rootFragNo]->execTRANSID_AI(ptr, len); - + rootFrag->getResultStream(*this).execTRANSID_AI(ptr, len, tupleCorrelation); rootFrag->incrOutstandingResults(-1); bool ret = false; if (rootFrag->isFragBatchComplete()) { - ret = m_queryImpl.handleBatchComplete(rootFragNo); + ret = m_queryImpl.handleBatchComplete(*rootFrag); } if (traceSignals) { @@ -4631,7 +4735,6 @@ NdbQueryOperationImpl::execTCKEYREF(cons } } - Uint32 rootFragNo = 0; NdbRootFragment& rootFrag = getQuery().m_rootFrags[0]; if (ref->errorCode != DbspjErr::NodeFailure) @@ -4656,13 +4759,12 @@ NdbQueryOperationImpl::execTCKEYREF(cons bool ret = false; if (rootFrag.isFragBatchComplete()) { - ret = m_queryImpl.handleBatchComplete(rootFragNo); + ret = m_queryImpl.handleBatchComplete(rootFrag); } if (traceSignals) { ndbout << "NdbQueryOperationImpl::execTCKEYREF(): returns:" << ret - << ", *getRoot().m_resultStreams[0] {" - << *getRoot().m_resultStreams[0] << "}" + << ", resultStream= {" << rootFrag.getResultStream(*this) << "}" << ", *this=" << *this << endl; } return ret; @@ -4695,47 +4797,24 @@ NdbQueryOperationImpl::execSCAN_TABCONF( return false; } rootFrag->setConfReceived(); + rootFrag->setRemainingSubScans(nodeMask); rootFrag->incrOutstandingResults(rowCount); // Handle for SCAN_NEXTREQ, RNIL -> EOF - NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()]; + NdbResultStream& resultStream = rootFrag->getResultStream(*this); resultStream.getReceiver().m_tcPtrI = tcPtrI; if(traceSignals){ - ndbout << " resultStream(root) {" << resultStream << "} fragNo" - << rootFrag->getFragNo() << endl; + ndbout << " resultStream {" << rootFrag->getResultStream(*this) + << "} fragNo" << rootFrag->getFragNo() + << endl; } - const NdbQueryDefImpl& queryDef = m_queryImpl.getQueryDef(); - /* Mark each scan node to indicate if the current batch is the last in the - * current sub-scan or not. - */ - for (Uint32 opNo = 0; opNo < queryDef.getNoOfOperations(); opNo++) - { - const NdbQueryOperationImpl& op = m_queryImpl.getQueryOperation(opNo); - /** - * Find the node number seen by the SPJ block. Since a unique index - * operation will have two distincts nodes in the tree used by the - * SPJ block, this number may be different from 'opNo'. - */ - const Uint32 internalOpNo = op.getQueryOperationDef().getQueryOperationId(); - assert(internalOpNo >= opNo); - const bool complete = ((nodeMask >> internalOpNo) & 1) == 0; - - // Lookups should always be 'complete' - assert(complete || op.getQueryOperationDef().isScanOperation()); - rootFrag->getResultStream(opNo).setSubScanCompletion(complete); - } - // Check that nodeMask does not have more bits than we have operations. - assert(nodeMask >> - (1+queryDef.getQueryOperation(queryDef.getNoOfOperations() - 1) - .getQueryOperationId()) == 0); - bool ret = false; if (rootFrag->isFragBatchComplete()) { /* This fragment is now complete */ - ret = m_queryImpl.handleBatchComplete(rootFrag->getFragNo()); + ret = m_queryImpl.handleBatchComplete(*rootFrag); } if (traceSignals) { ndbout << "NdbQueryOperationImpl::execSCAN_TABCONF():, returns:" << ret @@ -4871,13 +4950,6 @@ int NdbQueryOperationImpl::setBatchSize( return 0; } -NdbResultStream& -NdbQueryOperationImpl::getResultStream(Uint32 rootFragNo) const -{ - assert(rootFragNo < getQuery().getRootFragCount()); - return *m_resultStreams[rootFragNo]; -} - bool NdbQueryOperationImpl::hasInterpretedCode() const { @@ -4916,15 +4988,8 @@ NdbQueryOperationImpl::prepareInterprete Uint32 NdbQueryOperationImpl::getIdOfReceiver() const { - return m_resultStreams[0]->getReceiver().getId(); -} - - -const NdbReceiver& -NdbQueryOperationImpl::getReceiver(Uint32 recNo) const { - assert(recNogetReceiver(); + NdbRootFragment& rootFrag = m_queryImpl.m_rootFrags[0]; + return rootFrag.getResultStream(*this).getReceiver().getId(); } Uint32 NdbQueryOperationImpl::getRowSize() const @@ -4934,6 +4999,12 @@ Uint32 NdbQueryOperationImpl::getRowSize { m_rowSize = NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr, 0, false); + + const bool withCorrelation = getRoot().getQueryDef().isScanQuery(); + if (withCorrelation) + { + m_rowSize += TupleCorrelation::wordCount*sizeof(Uint32); + } } return m_rowSize; } @@ -4953,7 +5024,8 @@ NdbOut& operator<<(NdbOut& out, const Nd out << " m_queryImpl: " << &op.m_queryImpl; out << " m_operationDef: " << &op.m_operationDef; for(Uint32 i = 0; i