From: Ole John Aske Date: November 13 2012 10:28am Subject: bzr push into mysql-5.1-telco-7.1 branch (ole.john.aske:4660 to 4661) List-Archive: http://lists.mysql.com/commits/145255 Message-Id: <20121113102830.526.93802.4661@khepri40.no.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 4661 Ole John Aske 2012-11-13 [merge] Merge 7.0 -> 7.1 modified: storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 4660 magnus.blaudd@stripped 2012-11-12 [merge] Merge 7.0 -> 7.1 modified: sql/mysql_priv.h === modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp' === modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp' --- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2012-11-08 11:58:49 +0000 +++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2012-11-13 10:14:52 +0000 @@ -112,6 +112,8 @@ struct Request; struct TreeNode; struct ScanFragHandle; + typedef DataBuffer2<14, LocalArenaPoolImpl> Correlation_list; + typedef LocalDataBuffer2<14, LocalArenaPoolImpl> Local_correlation_list; typedef DataBuffer2<14, LocalArenaPoolImpl> Dependency_map; typedef LocalDataBuffer2<14, LocalArenaPoolImpl> Local_dependency_map; typedef DataBuffer2<14, LocalArenaPoolImpl> PatternStore; @@ -792,6 +794,28 @@ Uint32 m_scanFragReq[ScanFragReq::SignalLength + 2]; }; + struct DeferredParentOps + { + /** + * m_correlations contains a list of Correlation Values (Uint32) + * which identifies parent rows which has been deferred. + * m_pos are index into this array, identifying the next parent row + * for which to resume operation. + */ + Correlation_list::Head m_correlations; + Uint16 m_pos; // Next row operation to resume + + DeferredParentOps() : m_correlations(), m_pos(0) {} + + void init() { + m_correlations.init(); + m_pos = 0; + } + bool isEmpty() const { + return (m_pos == m_correlations.getSize()); + } + }; + struct TreeNode_cursor_ptr { Uint32 nextList; @@ -809,7 +833,8 @@ TreeNode() : m_magic(MAGIC), m_state(TN_END), m_parentPtrI(RNIL), m_requestPtrI(RNIL), - m_ancestors() + m_ancestors(), + m_resumeEvents(0), m_resumePtrI(RNIL) { } @@ -818,6 +843,7 @@ m_info(0), m_bits(T_LEAF), m_state(TN_BUILDING), m_parentPtrI(RNIL), m_requestPtrI(request), m_ancestors(), + m_resumeEvents(0), m_resumePtrI(RNIL), nextList(RNIL), prevList(RNIL) { // m_send.m_ref = 0; @@ -945,10 +971,27 @@ */ T_SCAN_REPEATABLE = 0x4000, + /** + * Exec of a previous REQ must complete before we can proceed. + * A ResumeEvent will later resume exec. of this operation + */ + T_EXEC_SEQUENTIAL = 0x8000, + // End marker... T_END = 0 }; + /** + * Describe whether a LQHKEY-REF and/or CONF whould trigger a + * exec resume of another TreeNode having T_EXEC_SEQUENTIAL. + * (Used as a bitmask) + */ + enum TreeNodeResumeEvents + { + TN_RESUME_REF = 0x01, + TN_RESUME_CONF = 0x02 + }; + bool isLeaf() const { return (m_bits & T_LEAF) != 0;} // table or index this TreeNode operates on, and its schemaVersion @@ -974,6 +1017,22 @@ */ RowCollection m_rows; + /** + * T_EXEC_SEQUENTIAL cause execution of child operations to + * be deferred. These operations are queued in the 'struct DeferredParentOps' + * Currently only Lookup operation might be deferred. + * Could later be extended to also cover index scans. + */ + DeferredParentOps m_deferred; + + /** + * Set of TreeNodeResumeEvents, possibly or'ed. + * Specify whether a REF or CONF will cause a resume + * of the TreeNode referred by 'm_resumePtrI'. + */ + Uint32 m_resumeEvents; + Uint32 m_resumePtrI; + union { LookupData m_lookup_data; @@ -1251,6 +1310,7 @@ const OpInfo* getOpInfo(Uint32 op); Uint32 build(Build_context&,Ptr,SectionReader&,SectionReader&); Uint32 initRowBuffers(Ptr); + void buildExecPlan(Ptr, Ptr node, Ptr next); void checkPrepareComplete(Signal*, Ptr, Uint32 cnt); void start(Signal*, Ptr); void checkBatchComplete(Signal*, Ptr, Uint32 cnt); @@ -1259,13 +1319,14 @@ void sendConf(Signal*, Ptr, bool is_complete); void complete(Signal*, Ptr); void cleanup(Ptr); + void cleanupBatch(Ptr); void abort(Signal*, Ptr, Uint32 errCode); Uint32 nodeFail(Signal*, Ptr, NdbNodeBitmask mask); Uint32 createNode(Build_context&, Ptr, Ptr &); void reportBatchComplete(Signal*, Ptr, Ptr); void releaseScanBuffers(Ptr requestPtr); - void releaseRequestBuffers(Ptr requestPtr, bool reset); + void releaseRequestBuffers(Ptr requestPtr); void releaseNodeRows(Ptr requestPtr, Ptr); void registerActiveCursor(Ptr, Ptr); void nodeFail_checkRequests(Signal*); @@ -1368,6 +1429,7 @@ Uint32 lookup_build(Build_context&,Ptr, const QueryNode*, const QueryNodeParameters*); void lookup_start(Signal*, Ptr, Ptr); + void lookup_resume(Signal*, Ptr, Ptr); void lookup_send(Signal*, Ptr, Ptr); void lookup_execTRANSID_AI(Signal*, Ptr, Ptr, const RowPtr&); @@ -1375,6 +1437,7 @@ void lookup_execLQHKEYCONF(Signal*, Ptr, Ptr); void lookup_parent_row(Signal*, Ptr, Ptr, const RowPtr &); void lookup_parent_batch_complete(Signal*, Ptr, Ptr); + void lookup_row(Signal*, Ptr, Ptr, const RowPtr &); void lookup_abort(Signal*, Ptr, Ptr); Uint32 lookup_execNODE_FAILREP(Signal*signal, Ptr, Ptr, NdbNodeBitmask); === modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2012-11-08 11:58:49 +0000 +++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2012-11-13 10:14:52 +0000 @@ -50,7 +50,7 @@ * DEBUG options for different parts od SPJ block * Comment out those part you don't want DEBUG'ed. */ -#define DEBUG(x) ndbout << "DBSPJ: "<< x << endl; +//#define DEBUG(x) ndbout << "DBSPJ: "<< x << endl; //#define DEBUG_DICT(x) ndbout << "DBSPJ: "<< x << endl; //#define DEBUG_LQHKEYREQ //#define DEBUG_SCAN_FRAGREQ @@ -1314,7 +1314,35 @@ Uint32 Dbspj::initRowBuffers(Ptr requestPtr) { - Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes); + jam(); + /** + * Execution of scan request requires restrictions + * of how lookup-children issues their LQHKEYREQs: + * A large scan result with many parallel lookup + * siblings can easily flood the job buffers with too many + * REQs. So we set up an 'execution plan' for how a + * scan request should be executed: + * + * NOTE: It could make sense to do the same for a lookup Req. + * However, CONF/REF for these leafs operations are not + * returned to SPJ. Thus, there are no way to know when + * the operation has completed, and other operation could + * be resumed. + * + * As a lookup request does not have the same potential for + * producing lots of LQHKEYREQs, we believe/hope the risk + * of flooding job buffers for a lookup request can be ignored. + */ + if (requestPtr.p->isScan()) + { + jam(); + Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes); + Ptr treeRootPtr; + + list.first(treeRootPtr); // treeRootPtr is a scan + ndbrequire(!treeRootPtr.isNull()); + buildExecPlan(requestPtr, treeRootPtr, NullTreeNodePtr); + } /** * Init ROW_BUFFERS iff Request has to buffer any rows. @@ -1345,6 +1373,7 @@ requestPtr.p->m_rowBuffer.init(BUFFER_STACK); } + Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes); Ptr treeNodePtr; for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr)) { @@ -1372,7 +1401,150 @@ } return 0; -} +} // Dbspj::initRowBuffers + +/** + * buildExecPlan(): + * Decides the order/pace in which the different + * TreeNodes should be executed. + * Currently it is only used to insert sequentialization point in + * the execution of bushy lookup-child nodes. (aka star-join). + * This is done in order to avoid too many LQHKEYREQ-signals to + * be sent which could overflow the job buffers. + * + * For each branch of TreeNodes starting with a scan, we identify + * any 'bushines' among its lookup children. We set up a left -> right + * execution order among these such that: + * - A child lookup operation can not be REQuested before we + * either has executed a TRANSID_AI from the scan parent, + * or executed a CONF / REF from another lookup child. + * - When a lookup CONF or REF is executed, its TreeNode is + * annotated with 'resume' info which decides if/which TreeNode + * we should execute next. + * + * This will maintain a strict 1:1 fanout between incomming rows + * being processed, and new row REQuest being produced. + * Thus avoiding that large scan result will flood the jobb buffers + * with too many lookup requests. + * + * FUTURE: + * For join children where child execution now is T_EXEC_SEQUENTIAL, + * it should be relatively simple to extend SPJ to do 'inner join'. + * As we at these sequential point knows wheteher the previous + * joined children didn't found any matches, we can skip REQuesting + * rows from other children having the same parent row. + */ +void +Dbspj::buildExecPlan(Ptr requestPtr, + Ptr treeNodePtr, + Ptr nextLookup) +{ + Uint32 lookupChildren[NDB_SPJ_MAX_TREE_NODES]; + Uint32 lookupChildCnt = 0; + + /** + * Need to iterate lookup childs in reverse order to set up 'next' + * operations. As this is not possible throught ConstDataBufferIterator, + * store any lookup childs into temp array childPtrI[]. + * Scan childs are parents of new 'scan -> lookup' branches. + */ + { + LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); + Local_dependency_map childList(pool, treeNodePtr.p->m_dependent_nodes); + Dependency_map::ConstDataBufferIterator it; + for (childList.first(it); !it.isNull(); childList.next(it)) + { + jam(); + Ptr childPtr; + m_treenode_pool.getPtr(childPtr, *it.data); + + if (childPtr.p->m_info == &g_LookupOpInfo) + { + jam(); + lookupChildren[lookupChildCnt++] = *it.data; + } + else + { + // Build a new plan starting from this scan operation + jam(); + buildExecPlan(requestPtr, childPtr, NullTreeNodePtr); + } + } + } + + /** + * Lookup children might have to wait for previous LQHKEYREQs to + * complete before they are allowed to send their own requests. + * (In order to not overfill jobb buffers) + */ + if (treeNodePtr.p->m_info == &g_LookupOpInfo && + !nextLookup.isNull()) + { + jam(); + /** + * Annotate that: + * - 'nextLookup' is not allowed to start immediately. + * - 'treeNode' restart 'nextLookup' when it completes + */ + nextLookup.p->m_bits |= TreeNode::T_EXEC_SEQUENTIAL; + + if (lookupChildCnt==0) //'isLeaf() or only scan children + { + jam(); + treeNodePtr.p->m_resumeEvents = TreeNode::TN_RESUME_CONF | + TreeNode::TN_RESUME_REF; + DEBUG("ExecPlan: 'REF/CONF' from node " << treeNodePtr.p->m_node_no + << " resumes node " << nextLookup.p->m_node_no); + } + else + { + /** + * Will REQuest from one of its child lookups if CONF, + * so we don't resume another TreeNode in addition. + */ + jam(); + treeNodePtr.p->m_resumeEvents = TreeNode::TN_RESUME_REF; + DEBUG("ExecPlan: 'REF' from node " << treeNodePtr.p->m_node_no + << " resumes node " << nextLookup.p->m_node_no); + } + treeNodePtr.p->m_resumePtrI = nextLookup.i; + + /** + * When we T_EXEC_SEQUENTIAL, TreeNode will iterate its + * parent rows in order to create new REQ's as previous + * are completed (CONF or REF). + * - Prepare RowIterator for parent rows + * - Buffer rows to be iterated in the parent node + */ + { + jam(); + + ndbassert(nextLookup.p->m_parentPtrI != RNIL); + Ptr parentPtr; + m_treenode_pool.getPtr(parentPtr, nextLookup.p->m_parentPtrI); + parentPtr.p->m_bits |= TreeNode::T_ROW_BUFFER + | TreeNode::T_ROW_BUFFER_MAP; + requestPtr.p->m_bits |= Request::RT_ROW_BUFFERS; + + DEBUG("ExecPlan: rows from node " << parentPtr.p->m_node_no + << " are buffered"); + } + } + + /** + * Recursively build exec. plan for any lookup child. + */ + for (int i = lookupChildCnt-1; i >= 0; i--) + { + jam(); + Ptr childPtr; + m_treenode_pool.getPtr(childPtr, lookupChildren[i]); + ndbassert(childPtr.p->m_info == &g_LookupOpInfo); + + buildExecPlan(requestPtr, childPtr, nextLookup); + nextLookup = childPtr; + } +} // Dbspj::buildExecPlan Uint32 Dbspj::createNode(Build_context& ctx, Ptr requestPtr, @@ -1547,26 +1719,18 @@ { jam(); /** - * request completed + * Entire Request completed */ cleanup(requestPtr); } - else if ((requestPtr.p->m_bits & Request::RT_MULTI_SCAN) != 0) - { - jam(); - /** - * release unneeded buffers as preparation for later SCAN_NEXTREQ - */ - releaseScanBuffers(requestPtr); - } - else if ((requestPtr.p->m_bits & Request::RT_ROW_BUFFERS) != 0) - { - jam(); - /** - * if not multiple scans in request, simply release all pages allocated - * for row buffers (all rows will be released anyway) - */ - releaseRequestBuffers(requestPtr, true); + else + { + jam(); + /** + * Cleanup the TreeNode branches getting another + * batch of result rows. + */ + cleanupBatch(requestPtr); } } @@ -1699,6 +1863,26 @@ } void +Dbspj::registerActiveCursor(Ptr requestPtr, Ptr treeNodePtr) +{ + Uint32 bit = treeNodePtr.p->m_node_no; + ndbrequire(!requestPtr.p->m_active_nodes.get(bit)); + requestPtr.p->m_active_nodes.set(bit); + + Local_TreeNodeCursor_list list(m_treenode_pool, requestPtr.p->m_cursor_nodes); +#ifdef VM_TRACE + { + Ptr nodePtr; + for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr)) + { + ndbrequire(nodePtr.i != treeNodePtr.i); + } + } +#endif + list.add(treeNodePtr); +} + +void Dbspj::sendConf(Signal* signal, Ptr requestPtr, bool is_complete) { if (requestPtr.p->isScan()) @@ -1819,26 +2003,64 @@ return 0; } +/** + * Cleanup resources in preparation for a SCAN_NEXTREQ + * requesting a new batch of rows. + */ void -Dbspj::releaseScanBuffers(Ptr requestPtr) +Dbspj::cleanupBatch(Ptr requestPtr) { + /** + * Needs to be atleast 1 active otherwise we should have + * taken the Request cleanup "path" in batchComplete + */ + ndbassert(requestPtr.p->m_cnt_active >= 1); + + /** + * Release any buffered rows for the TreeNode branches + * getting new rows. + */ + if ((requestPtr.p->m_bits & Request::RT_ROW_BUFFERS) != 0) + { + if ((requestPtr.p->m_bits & Request::RT_MULTI_SCAN) != 0) + { + jam(); + /** + * A MULTI_SCAN may selectively retrieve rows from only + * some of the (scan-) branches in the Request. + * Selectively release from only these brances. + */ + releaseScanBuffers(requestPtr); + } + else + { + jam(); + /** + * if not multiple scans in request, simply release all pages allocated + * for row buffers (all rows will be released anyway) + */ + // Root node should be the one and only being active + ndbassert(requestPtr.p->m_cnt_active == 1); + ndbassert(requestPtr.p->m_active_nodes.get(0)); + releaseRequestBuffers(requestPtr); + } + } //RT_ROW_BUFFERS + + Ptr treeNodePtr; Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes); for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr)) { /** - * Release buffered rows for all treeNodes getting more rows + * Re-init row buffer structures for those treeNodes getting more rows * in the following NEXTREQ, including all its childs. */ if (requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no) || requestPtr.p->m_active_nodes.overlaps(treeNodePtr.p->m_ancestors)) { - if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER) - { - jam(); - releaseNodeRows(requestPtr, treeNodePtr); - } + jam(); + treeNodePtr.p->m_rows.init(); } /** @@ -1848,6 +2070,21 @@ if (requestPtr.p->m_active_nodes.overlaps(treeNodePtr.p->m_ancestors)) { jam(); + /** + * Common TreeNode cleanup: + * Release list of deferred operations which may refer + * buffered rows released above. + */ + LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); + { + Local_correlation_list correlations(pool, treeNodePtr.p->m_deferred.m_correlations); + correlations.release(); + } + treeNodePtr.p->m_deferred.init(); + + /** + * TreeNode-type specific cleanup. + */ if (treeNodePtr.p->m_info->m_parent_batch_cleanup != 0) { jam(); @@ -1856,31 +2093,30 @@ } } } - /** - * Needs to be atleast 1 active otherwise we should have - * taken the cleanup "path" in batchComplete - */ - ndbrequire(requestPtr.p->m_cnt_active >= 1); } void -Dbspj::registerActiveCursor(Ptr requestPtr, Ptr treeNodePtr) +Dbspj::releaseScanBuffers(Ptr requestPtr) { - Uint32 bit = treeNodePtr.p->m_node_no; - ndbrequire(!requestPtr.p->m_active_nodes.get(bit)); - requestPtr.p->m_active_nodes.set(bit); + Ptr treeNodePtr; + Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes); - Local_TreeNodeCursor_list list(m_treenode_pool, requestPtr.p->m_cursor_nodes); -#ifdef VM_TRACE + for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr)) { - Ptr nodePtr; - for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr)) + /** + * Release buffered rows for all treeNodes getting more rows + * in the following NEXTREQ, including all its childs. + */ + if (requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no) || + requestPtr.p->m_active_nodes.overlaps(treeNodePtr.p->m_ancestors)) { - ndbrequire(nodePtr.i != treeNodePtr.i); + if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER) + { + jam(); + releaseNodeRows(requestPtr, treeNodePtr); + } } } -#endif - list.add(treeNodePtr); } void @@ -1906,7 +2142,6 @@ releaseRow(treeNodePtr.p->m_rows, pos); cnt ++; } - treeNodePtr.p->m_rows.init(); DEBUG("RowIterator: released " << cnt << " rows!"); if (treeNodePtr.p->m_rows.m_type == RowCollection::COLLECTION_MAP) @@ -1982,7 +2217,7 @@ } void -Dbspj::releaseRequestBuffers(Ptr requestPtr, bool reset) +Dbspj::releaseRequestBuffers(Ptr requestPtr) { DEBUG("releaseRequestBuffers" << ", request: " << requestPtr.i @@ -2006,17 +2241,6 @@ } requestPtr.p->m_rowBuffer.reset(); } - - if (reset) - { - Ptr nodePtr; - Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes); - for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr)) - { - jam(); - nodePtr.p->m_rows.init(); - } - } } void @@ -2226,7 +2450,7 @@ jam(); m_lookup_request_hash.remove(requestPtr, *requestPtr.p); } - releaseRequestBuffers(requestPtr, false); + releaseRequestBuffers(requestPtr); ArenaHead ah = requestPtr.p->m_arena; m_request_pool.release(requestPtr); m_arenaAllocator.release(ah); @@ -2253,6 +2477,11 @@ pattern.release(); } + { + Local_correlation_list correlations(pool, treeNodePtr.p->m_deferred.m_correlations); + correlations.release(); + } + if (treeNodePtr.p->m_send.m_keyInfoPtrI != RNIL) { jam(); @@ -3612,6 +3841,7 @@ treeNodePtr.p->m_lookup_data.m_outstanding--; if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE + && treeNodePtr.p->m_deferred.isEmpty() && treeNodePtr.p->m_lookup_data.m_parent_batch_complete && treeNodePtr.p->m_lookup_data.m_outstanding == 0) { @@ -3726,7 +3956,21 @@ treeNodePtr.p->m_lookup_data.m_outstanding -= cnt; + /** + * Another TreeNode awaited for completion of this request + * before it could resume its operation. + */ + if (treeNodePtr.p->m_resumeEvents & TreeNode::TN_RESUME_REF) + { + jam(); + ndbassert(treeNodePtr.p->m_resumePtrI != RNIL); + Ptr resumeTreeNodePtr; + m_treenode_pool.getPtr(resumeTreeNodePtr, treeNodePtr.p->m_resumePtrI); + lookup_resume(signal, requestPtr, resumeTreeNodePtr); + } + if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE + && treeNodePtr.p->m_deferred.isEmpty() && treeNodePtr.p->m_lookup_data.m_parent_batch_complete && treeNodePtr.p->m_lookup_data.m_outstanding == 0) { @@ -3762,7 +4006,21 @@ treeNodePtr.p->m_lookup_data.m_outstanding--; + /** + * Another TreeNode awaited for completion of this request + * before it could resume its operation. + */ + if (treeNodePtr.p->m_resumeEvents & TreeNode::TN_RESUME_CONF) + { + jam(); + ndbassert(treeNodePtr.p->m_resumePtrI != RNIL); + Ptr resumeTreeNodePtr; + m_treenode_pool.getPtr(resumeTreeNodePtr, treeNodePtr.p->m_resumePtrI); + lookup_resume(signal, requestPtr, resumeTreeNodePtr); + } + if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE + && treeNodePtr.p->m_deferred.isEmpty() && treeNodePtr.p->m_lookup_data.m_parent_batch_complete && treeNodePtr.p->m_lookup_data.m_outstanding == 0) { @@ -3786,6 +4044,99 @@ { jam(); + DEBUG("::lookup_parent_row" + << ", node: " << treeNodePtr.p->m_node_no); + + if (treeNodePtr.p->m_bits & TreeNode::T_EXEC_SEQUENTIAL) + { + jam(); + DEBUG("T_EXEC_SEQUENTIAL --> exec deferred"); + + /** + * Append correlation values of deferred parent row operations + * to a list / fifo. Upon resume, we will then be able to + * relocate all parent rows for which to resume operations. + */ + LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); + Local_pattern_store correlations(pool, treeNodePtr.p->m_deferred.m_correlations); + if (!correlations.append(&rowRef.m_src_correlation, 1)) + { + jam(); + abort(signal, requestPtr, DbspjErr::OutOfQueryMemory); + return; + } + return; + } + + lookup_row(signal, requestPtr, treeNodePtr, rowRef); +} // Dbspj::lookup_parent_row() + +/** + * lookup_resume() is a delayed lookup_parent_row. + * It will locate the next parent row now allowed to execute, + * and create a child lookup request for that row. + */ +void +Dbspj::lookup_resume(Signal* signal, + Ptr requestPtr, + Ptr treeNodePtr) +{ + DEBUG("::lookup_resume" + << ", node: " << treeNodePtr.p->m_node_no + ); + + ndbassert(treeNodePtr.p->m_bits & TreeNode::T_EXEC_SEQUENTIAL); + ndbassert(treeNodePtr.p->m_parentPtrI != RNIL); + ndbassert(!treeNodePtr.p->m_deferred.isEmpty()); + + if (unlikely(requestPtr.p->m_state & Request::RS_ABORTING)) + { + jam(); + return; + } + + Uint32 corrVal; + { + LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); + Local_pattern_store correlations(pool, treeNodePtr.p->m_deferred.m_correlations); + + Local_pattern_store::DataBufferIterator it; + const bool valid = correlations.position(it, (Uint32)(treeNodePtr.p->m_deferred.m_pos++)); + (void)valid; ndbassert(valid); + corrVal = *it.data; + } + + Ptr parentPtr; + m_treenode_pool.getPtr(parentPtr, treeNodePtr.p->m_parentPtrI); + + // Set up RowPtr & RowRef for this parent row + RowPtr row; + row.m_src_node_ptrI = parentPtr.i; + row.m_src_correlation = corrVal; + + ndbassert(parentPtr.p->m_rows.m_type == RowCollection::COLLECTION_MAP); + RowRef ref; + parentPtr.p->m_rows.m_map.copyto(ref); + const Uint32* const mapptr = get_row_ptr(ref); + + // Relocate parent row from correlation value. + const Uint32 rowId = (corrVal & 0xFFFF); + parentPtr.p->m_rows.m_map.load(mapptr, rowId, ref); + + const Uint32* const rowptr = get_row_ptr(ref); + setupRowPtr(parentPtr.p->m_rows, row, ref, rowptr); + + lookup_row(signal, requestPtr, treeNodePtr, row); +} // Dbspj::lookup_resume() + +void +Dbspj::lookup_row(Signal* signal, + Ptr requestPtr, + Ptr treeNodePtr, + const RowPtr & rowRef) +{ + jam(); + /** * Here we need to... * 1) construct a key @@ -3796,7 +4147,7 @@ const Uint32 tableId = treeNodePtr.p->m_tableOrIndexId; const Uint32 corrVal = rowRef.m_src_correlation; - DEBUG("::lookup_parent_row" + DEBUG("::lookup_row" << ", node: " << treeNodePtr.p->m_node_no); do @@ -3853,10 +4204,11 @@ * When the key contains NULL values, an EQ-match is impossible! * Entire lookup request can therefore be eliminate as it is known * to be REFused with errorCode = 626 (Row not found). - * Different handling is required depening of request being a - * scan or lookup: + * Scan request can elliminate these child LQHKEYREQs if REFs + * are not needed in order to handle TN_RESUME_REF. */ - if (requestPtr.p->isScan()) + if (requestPtr.p->isScan() && + (treeNodePtr.p->m_resumeEvents & TreeNode::TN_RESUME_REF) == 0) { /** * Scan request: We can simply ignore lookup operation: @@ -3868,7 +4220,7 @@ releaseSection(ptrI); return; // Bailout, KEYREQ would have returned KEYREF(626) anyway } - else // isLookup() + else // isLookup() or 'need TN_RESUME_REF' { /** * Ignored lookup request need a faked KEYREF for the lookup operation. @@ -4053,7 +4405,9 @@ ndbassert(!treeNodePtr.p->m_lookup_data.m_parent_batch_complete); treeNodePtr.p->m_lookup_data.m_parent_batch_complete = true; + if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE + && treeNodePtr.p->m_deferred.isEmpty() && treeNodePtr.p->m_lookup_data.m_outstanding == 0) { jam(); @@ -4679,6 +5033,7 @@ } } } + ndbassert(treeNodePtr.p->m_resumePtrI == RNIL); if (treeNodePtr.p->m_scanfrag_data.m_rows_received == treeNodePtr.p->m_scanfrag_data.m_rows_expecting) @@ -6312,6 +6667,7 @@ ScanIndexData& data = treeNodePtr.p->m_scanindex_data; data.m_rows_received++; + ndbassert(treeNodePtr.p->m_resumePtrI == RNIL); if (data.m_frags_outstanding == 0 && data.m_rows_received == data.m_rows_expecting) No bundle (reason: useless for push emails).