From: Ole John Aske Date: November 13 2012 10:28am Subject: bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:5035 to 5036) Bug#14709490 List-Archive: http://lists.mysql.com/commits/145254 X-Bug: 14709490 Message-Id: <20121113102844.536.53253.5036@khepri40.no.oracle.com> MIME-Version: 1.0 Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit 5036 Ole John Aske 2012-11-13 Fix for bug#14709490: EXECUTION OF 'PUSHED JOINS' MAY EXHAUST JOB BUFFERS A pushed scan query, with multiple 'bushy' lookup children could flood the job buffers by producing multiple LQHKEYREQ's for each row returned from the scan. Normally we allows a 1::4 fanout between consumed and produced signals, but this is not sufficient for such a pushed join: There could be an (almost) unlimited number of rows returned from the scan (and multiple such executing in parallel) so we could overflow the jobbuffers by allowing any fanout > 1. This fix introduce sequentialization points in the SPJ execution: Any lookup children which otherwise would have been executed in parallel with another lookup sibling is tagged with T_EXEC_SEQUENTIAL. When such a TreeNode attempt to send a LQHKEYREQ, the operation is deferred (queued). It is later resumed when its parallel child completes (CONF or REF) one of its own REQuests. Thereby we have restricted the fanout for bushy scan-lookup joins to 1::1. Furthermore, it changes the 'pushed query planer' (>= V7.2) in the handler interface such that we now try to avoid some of the bushines among lookup childrens in the pushed query. Due to the sequentialization this patch enforce among siblings, it is misleading to have an EXPLAINed query plan which show that siblings are executed in parallel. By also making them sequentially in the pushed join, we may take advantage of the selectivity of previously equi-joined children, such that non-matching branches in the query Tree are not executed. There are also the refactorications: 1) Introduce Dbspj::cleanupBatch() which is intended to do all required resorce dealloc and required reinits in preparation for a new batch of result rows. Also removes 'reset' argument from releaseRequestBuffers and instead do required reset from callee (::cleanupBatch()) if required. 2) Moved Dbspj::registerActiveCursor to a place closer to where it is normally used, and closer to other functional dependent methods. modified: storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 5035 magnus.blaudd@stripped 2012-11-12 ndb - Add missing "extern opt_server_id_mask" declaration in mysql_priv.h (aka mysqld.h in the future) modified: sql/mysql_priv.h === modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp' === modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp' --- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2012-11-08 11:55:49 +0000 +++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2012-11-13 10:13:17 +0000 @@ -112,6 +112,8 @@ struct Request; struct TreeNode; struct ScanFragHandle; + typedef DataBuffer2<14, LocalArenaPoolImpl> Correlation_list; + typedef LocalDataBuffer2<14, LocalArenaPoolImpl> Local_correlation_list; typedef DataBuffer2<14, LocalArenaPoolImpl> Dependency_map; typedef LocalDataBuffer2<14, LocalArenaPoolImpl> Local_dependency_map; typedef DataBuffer2<14, LocalArenaPoolImpl> PatternStore; @@ -792,6 +794,28 @@ Uint32 m_scanFragReq[ScanFragReq::SignalLength + 2]; }; + struct DeferredParentOps + { + /** + * m_correlations contains a list of Correlation Values (Uint32) + * which identifies parent rows which has been deferred. + * m_pos are index into this array, identifying the next parent row + * for which to resume operation. + */ + Correlation_list::Head m_correlations; + Uint16 m_pos; // Next row operation to resume + + DeferredParentOps() : m_correlations(), m_pos(0) {} + + void init() { + m_correlations.init(); + m_pos = 0; + } + bool isEmpty() const { + return (m_pos == m_correlations.getSize()); + } + }; + struct TreeNode_cursor_ptr { Uint32 nextList; @@ -809,7 +833,8 @@ TreeNode() : m_magic(MAGIC), m_state(TN_END), m_parentPtrI(RNIL), m_requestPtrI(RNIL), - m_ancestors() + m_ancestors(), + m_resumeEvents(0), m_resumePtrI(RNIL) { } @@ -818,6 +843,7 @@ m_info(0), m_bits(T_LEAF), m_state(TN_BUILDING), m_parentPtrI(RNIL), m_requestPtrI(request), m_ancestors(), + m_resumeEvents(0), m_resumePtrI(RNIL), nextList(RNIL), prevList(RNIL) { // m_send.m_ref = 0; @@ -945,10 +971,27 @@ */ T_SCAN_REPEATABLE = 0x4000, + /** + * Exec of a previous REQ must complete before we can proceed. + * A ResumeEvent will later resume exec. of this operation + */ + T_EXEC_SEQUENTIAL = 0x8000, + // End marker... T_END = 0 }; + /** + * Describe whether a LQHKEY-REF and/or CONF whould trigger a + * exec resume of another TreeNode having T_EXEC_SEQUENTIAL. + * (Used as a bitmask) + */ + enum TreeNodeResumeEvents + { + TN_RESUME_REF = 0x01, + TN_RESUME_CONF = 0x02 + }; + bool isLeaf() const { return (m_bits & T_LEAF) != 0;} // table or index this TreeNode operates on, and its schemaVersion @@ -974,6 +1017,22 @@ */ RowCollection m_rows; + /** + * T_EXEC_SEQUENTIAL cause execution of child operations to + * be deferred. These operations are queued in the 'struct DeferredParentOps' + * Currently only Lookup operation might be deferred. + * Could later be extended to also cover index scans. + */ + DeferredParentOps m_deferred; + + /** + * Set of TreeNodeResumeEvents, possibly or'ed. + * Specify whether a REF or CONF will cause a resume + * of the TreeNode referred by 'm_resumePtrI'. + */ + Uint32 m_resumeEvents; + Uint32 m_resumePtrI; + union { LookupData m_lookup_data; @@ -1251,6 +1310,7 @@ const OpInfo* getOpInfo(Uint32 op); Uint32 build(Build_context&,Ptr,SectionReader&,SectionReader&); Uint32 initRowBuffers(Ptr); + void buildExecPlan(Ptr, Ptr node, Ptr next); void checkPrepareComplete(Signal*, Ptr, Uint32 cnt); void start(Signal*, Ptr); void checkBatchComplete(Signal*, Ptr, Uint32 cnt); @@ -1259,13 +1319,14 @@ void sendConf(Signal*, Ptr, bool is_complete); void complete(Signal*, Ptr); void cleanup(Ptr); + void cleanupBatch(Ptr); void abort(Signal*, Ptr, Uint32 errCode); Uint32 nodeFail(Signal*, Ptr, NdbNodeBitmask mask); Uint32 createNode(Build_context&, Ptr, Ptr &); void reportBatchComplete(Signal*, Ptr, Ptr); void releaseScanBuffers(Ptr requestPtr); - void releaseRequestBuffers(Ptr requestPtr, bool reset); + void releaseRequestBuffers(Ptr requestPtr); void releaseNodeRows(Ptr requestPtr, Ptr); void registerActiveCursor(Ptr, Ptr); void nodeFail_checkRequests(Signal*); @@ -1368,6 +1429,7 @@ Uint32 lookup_build(Build_context&,Ptr, const QueryNode*, const QueryNodeParameters*); void lookup_start(Signal*, Ptr, Ptr); + void lookup_resume(Signal*, Ptr, Ptr); void lookup_send(Signal*, Ptr, Ptr); void lookup_execTRANSID_AI(Signal*, Ptr, Ptr, const RowPtr&); @@ -1375,6 +1437,7 @@ void lookup_execLQHKEYCONF(Signal*, Ptr, Ptr); void lookup_parent_row(Signal*, Ptr, Ptr, const RowPtr &); void lookup_parent_batch_complete(Signal*, Ptr, Ptr); + void lookup_row(Signal*, Ptr, Ptr, const RowPtr &); void lookup_abort(Signal*, Ptr, Ptr); Uint32 lookup_execNODE_FAILREP(Signal*signal, Ptr, Ptr, NdbNodeBitmask); === modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp' --- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2012-11-08 11:55:49 +0000 +++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2012-11-13 10:13:17 +0000 @@ -50,7 +50,7 @@ * DEBUG options for different parts od SPJ block * Comment out those part you don't want DEBUG'ed. */ -#define DEBUG(x) ndbout << "DBSPJ: "<< x << endl; +//#define DEBUG(x) ndbout << "DBSPJ: "<< x << endl; //#define DEBUG_DICT(x) ndbout << "DBSPJ: "<< x << endl; //#define DEBUG_LQHKEYREQ //#define DEBUG_SCAN_FRAGREQ @@ -1314,7 +1314,35 @@ Uint32 Dbspj::initRowBuffers(Ptr requestPtr) { - Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes); + jam(); + /** + * Execution of scan request requires restrictions + * of how lookup-children issues their LQHKEYREQs: + * A large scan result with many parallel lookup + * siblings can easily flood the job buffers with too many + * REQs. So we set up an 'execution plan' for how a + * scan request should be executed: + * + * NOTE: It could make sense to do the same for a lookup Req. + * However, CONF/REF for these leafs operations are not + * returned to SPJ. Thus, there are no way to know when + * the operation has completed, and other operation could + * be resumed. + * + * As a lookup request does not have the same potential for + * producing lots of LQHKEYREQs, we believe/hope the risk + * of flooding job buffers for a lookup request can be ignored. + */ + if (requestPtr.p->isScan()) + { + jam(); + Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes); + Ptr treeRootPtr; + + list.first(treeRootPtr); // treeRootPtr is a scan + ndbrequire(!treeRootPtr.isNull()); + buildExecPlan(requestPtr, treeRootPtr, NullTreeNodePtr); + } /** * Init ROW_BUFFERS iff Request has to buffer any rows. @@ -1345,6 +1373,7 @@ requestPtr.p->m_rowBuffer.init(BUFFER_STACK); } + Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes); Ptr treeNodePtr; for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr)) { @@ -1372,7 +1401,150 @@ } return 0; -} +} // Dbspj::initRowBuffers + +/** + * buildExecPlan(): + * Decides the order/pace in which the different + * TreeNodes should be executed. + * Currently it is only used to insert sequentialization point in + * the execution of bushy lookup-child nodes. (aka star-join). + * This is done in order to avoid too many LQHKEYREQ-signals to + * be sent which could overflow the job buffers. + * + * For each branch of TreeNodes starting with a scan, we identify + * any 'bushines' among its lookup children. We set up a left -> right + * execution order among these such that: + * - A child lookup operation can not be REQuested before we + * either has executed a TRANSID_AI from the scan parent, + * or executed a CONF / REF from another lookup child. + * - When a lookup CONF or REF is executed, its TreeNode is + * annotated with 'resume' info which decides if/which TreeNode + * we should execute next. + * + * This will maintain a strict 1:1 fanout between incomming rows + * being processed, and new row REQuest being produced. + * Thus avoiding that large scan result will flood the jobb buffers + * with too many lookup requests. + * + * FUTURE: + * For join children where child execution now is T_EXEC_SEQUENTIAL, + * it should be relatively simple to extend SPJ to do 'inner join'. + * As we at these sequential point knows wheteher the previous + * joined children didn't found any matches, we can skip REQuesting + * rows from other children having the same parent row. + */ +void +Dbspj::buildExecPlan(Ptr requestPtr, + Ptr treeNodePtr, + Ptr nextLookup) +{ + Uint32 lookupChildren[NDB_SPJ_MAX_TREE_NODES]; + Uint32 lookupChildCnt = 0; + + /** + * Need to iterate lookup childs in reverse order to set up 'next' + * operations. As this is not possible throught ConstDataBufferIterator, + * store any lookup childs into temp array childPtrI[]. + * Scan childs are parents of new 'scan -> lookup' branches. + */ + { + LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); + Local_dependency_map childList(pool, treeNodePtr.p->m_dependent_nodes); + Dependency_map::ConstDataBufferIterator it; + for (childList.first(it); !it.isNull(); childList.next(it)) + { + jam(); + Ptr childPtr; + m_treenode_pool.getPtr(childPtr, *it.data); + + if (childPtr.p->m_info == &g_LookupOpInfo) + { + jam(); + lookupChildren[lookupChildCnt++] = *it.data; + } + else + { + // Build a new plan starting from this scan operation + jam(); + buildExecPlan(requestPtr, childPtr, NullTreeNodePtr); + } + } + } + + /** + * Lookup children might have to wait for previous LQHKEYREQs to + * complete before they are allowed to send their own requests. + * (In order to not overfill jobb buffers) + */ + if (treeNodePtr.p->m_info == &g_LookupOpInfo && + !nextLookup.isNull()) + { + jam(); + /** + * Annotate that: + * - 'nextLookup' is not allowed to start immediately. + * - 'treeNode' restart 'nextLookup' when it completes + */ + nextLookup.p->m_bits |= TreeNode::T_EXEC_SEQUENTIAL; + + if (lookupChildCnt==0) //'isLeaf() or only scan children + { + jam(); + treeNodePtr.p->m_resumeEvents = TreeNode::TN_RESUME_CONF | + TreeNode::TN_RESUME_REF; + DEBUG("ExecPlan: 'REF/CONF' from node " << treeNodePtr.p->m_node_no + << " resumes node " << nextLookup.p->m_node_no); + } + else + { + /** + * Will REQuest from one of its child lookups if CONF, + * so we don't resume another TreeNode in addition. + */ + jam(); + treeNodePtr.p->m_resumeEvents = TreeNode::TN_RESUME_REF; + DEBUG("ExecPlan: 'REF' from node " << treeNodePtr.p->m_node_no + << " resumes node " << nextLookup.p->m_node_no); + } + treeNodePtr.p->m_resumePtrI = nextLookup.i; + + /** + * When we T_EXEC_SEQUENTIAL, TreeNode will iterate its + * parent rows in order to create new REQ's as previous + * are completed (CONF or REF). + * - Prepare RowIterator for parent rows + * - Buffer rows to be iterated in the parent node + */ + { + jam(); + + ndbassert(nextLookup.p->m_parentPtrI != RNIL); + Ptr parentPtr; + m_treenode_pool.getPtr(parentPtr, nextLookup.p->m_parentPtrI); + parentPtr.p->m_bits |= TreeNode::T_ROW_BUFFER + | TreeNode::T_ROW_BUFFER_MAP; + requestPtr.p->m_bits |= Request::RT_ROW_BUFFERS; + + DEBUG("ExecPlan: rows from node " << parentPtr.p->m_node_no + << " are buffered"); + } + } + + /** + * Recursively build exec. plan for any lookup child. + */ + for (int i = lookupChildCnt-1; i >= 0; i--) + { + jam(); + Ptr childPtr; + m_treenode_pool.getPtr(childPtr, lookupChildren[i]); + ndbassert(childPtr.p->m_info == &g_LookupOpInfo); + + buildExecPlan(requestPtr, childPtr, nextLookup); + nextLookup = childPtr; + } +} // Dbspj::buildExecPlan Uint32 Dbspj::createNode(Build_context& ctx, Ptr requestPtr, @@ -1547,26 +1719,18 @@ { jam(); /** - * request completed + * Entire Request completed */ cleanup(requestPtr); } - else if ((requestPtr.p->m_bits & Request::RT_MULTI_SCAN) != 0) - { - jam(); - /** - * release unneeded buffers as preparation for later SCAN_NEXTREQ - */ - releaseScanBuffers(requestPtr); - } - else if ((requestPtr.p->m_bits & Request::RT_ROW_BUFFERS) != 0) - { - jam(); - /** - * if not multiple scans in request, simply release all pages allocated - * for row buffers (all rows will be released anyway) - */ - releaseRequestBuffers(requestPtr, true); + else + { + jam(); + /** + * Cleanup the TreeNode branches getting another + * batch of result rows. + */ + cleanupBatch(requestPtr); } } @@ -1699,6 +1863,26 @@ } void +Dbspj::registerActiveCursor(Ptr requestPtr, Ptr treeNodePtr) +{ + Uint32 bit = treeNodePtr.p->m_node_no; + ndbrequire(!requestPtr.p->m_active_nodes.get(bit)); + requestPtr.p->m_active_nodes.set(bit); + + Local_TreeNodeCursor_list list(m_treenode_pool, requestPtr.p->m_cursor_nodes); +#ifdef VM_TRACE + { + Ptr nodePtr; + for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr)) + { + ndbrequire(nodePtr.i != treeNodePtr.i); + } + } +#endif + list.add(treeNodePtr); +} + +void Dbspj::sendConf(Signal* signal, Ptr requestPtr, bool is_complete) { if (requestPtr.p->isScan()) @@ -1819,26 +2003,64 @@ return 0; } +/** + * Cleanup resources in preparation for a SCAN_NEXTREQ + * requesting a new batch of rows. + */ void -Dbspj::releaseScanBuffers(Ptr requestPtr) +Dbspj::cleanupBatch(Ptr requestPtr) { + /** + * Needs to be atleast 1 active otherwise we should have + * taken the Request cleanup "path" in batchComplete + */ + ndbassert(requestPtr.p->m_cnt_active >= 1); + + /** + * Release any buffered rows for the TreeNode branches + * getting new rows. + */ + if ((requestPtr.p->m_bits & Request::RT_ROW_BUFFERS) != 0) + { + if ((requestPtr.p->m_bits & Request::RT_MULTI_SCAN) != 0) + { + jam(); + /** + * A MULTI_SCAN may selectively retrieve rows from only + * some of the (scan-) branches in the Request. + * Selectively release from only these brances. + */ + releaseScanBuffers(requestPtr); + } + else + { + jam(); + /** + * if not multiple scans in request, simply release all pages allocated + * for row buffers (all rows will be released anyway) + */ + // Root node should be the one and only being active + ndbassert(requestPtr.p->m_cnt_active == 1); + ndbassert(requestPtr.p->m_active_nodes.get(0)); + releaseRequestBuffers(requestPtr); + } + } //RT_ROW_BUFFERS + + Ptr treeNodePtr; Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes); for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr)) { /** - * Release buffered rows for all treeNodes getting more rows + * Re-init row buffer structures for those treeNodes getting more rows * in the following NEXTREQ, including all its childs. */ if (requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no) || requestPtr.p->m_active_nodes.overlaps(treeNodePtr.p->m_ancestors)) { - if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER) - { - jam(); - releaseNodeRows(requestPtr, treeNodePtr); - } + jam(); + treeNodePtr.p->m_rows.init(); } /** @@ -1848,6 +2070,21 @@ if (requestPtr.p->m_active_nodes.overlaps(treeNodePtr.p->m_ancestors)) { jam(); + /** + * Common TreeNode cleanup: + * Release list of deferred operations which may refer + * buffered rows released above. + */ + LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); + { + Local_correlation_list correlations(pool, treeNodePtr.p->m_deferred.m_correlations); + correlations.release(); + } + treeNodePtr.p->m_deferred.init(); + + /** + * TreeNode-type specific cleanup. + */ if (treeNodePtr.p->m_info->m_parent_batch_cleanup != 0) { jam(); @@ -1856,31 +2093,30 @@ } } } - /** - * Needs to be atleast 1 active otherwise we should have - * taken the cleanup "path" in batchComplete - */ - ndbrequire(requestPtr.p->m_cnt_active >= 1); } void -Dbspj::registerActiveCursor(Ptr requestPtr, Ptr treeNodePtr) +Dbspj::releaseScanBuffers(Ptr requestPtr) { - Uint32 bit = treeNodePtr.p->m_node_no; - ndbrequire(!requestPtr.p->m_active_nodes.get(bit)); - requestPtr.p->m_active_nodes.set(bit); + Ptr treeNodePtr; + Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes); - Local_TreeNodeCursor_list list(m_treenode_pool, requestPtr.p->m_cursor_nodes); -#ifdef VM_TRACE + for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr)) { - Ptr nodePtr; - for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr)) + /** + * Release buffered rows for all treeNodes getting more rows + * in the following NEXTREQ, including all its childs. + */ + if (requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no) || + requestPtr.p->m_active_nodes.overlaps(treeNodePtr.p->m_ancestors)) { - ndbrequire(nodePtr.i != treeNodePtr.i); + if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER) + { + jam(); + releaseNodeRows(requestPtr, treeNodePtr); + } } } -#endif - list.add(treeNodePtr); } void @@ -1906,7 +2142,6 @@ releaseRow(treeNodePtr.p->m_rows, pos); cnt ++; } - treeNodePtr.p->m_rows.init(); DEBUG("RowIterator: released " << cnt << " rows!"); if (treeNodePtr.p->m_rows.m_type == RowCollection::COLLECTION_MAP) @@ -1982,7 +2217,7 @@ } void -Dbspj::releaseRequestBuffers(Ptr requestPtr, bool reset) +Dbspj::releaseRequestBuffers(Ptr requestPtr) { DEBUG("releaseRequestBuffers" << ", request: " << requestPtr.i @@ -2006,17 +2241,6 @@ } requestPtr.p->m_rowBuffer.reset(); } - - if (reset) - { - Ptr nodePtr; - Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes); - for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr)) - { - jam(); - nodePtr.p->m_rows.init(); - } - } } void @@ -2226,7 +2450,7 @@ jam(); m_lookup_request_hash.remove(requestPtr, *requestPtr.p); } - releaseRequestBuffers(requestPtr, false); + releaseRequestBuffers(requestPtr); ArenaHead ah = requestPtr.p->m_arena; m_request_pool.release(requestPtr); m_arenaAllocator.release(ah); @@ -2253,6 +2477,11 @@ pattern.release(); } + { + Local_correlation_list correlations(pool, treeNodePtr.p->m_deferred.m_correlations); + correlations.release(); + } + if (treeNodePtr.p->m_send.m_keyInfoPtrI != RNIL) { jam(); @@ -3612,6 +3841,7 @@ treeNodePtr.p->m_lookup_data.m_outstanding--; if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE + && treeNodePtr.p->m_deferred.isEmpty() && treeNodePtr.p->m_lookup_data.m_parent_batch_complete && treeNodePtr.p->m_lookup_data.m_outstanding == 0) { @@ -3726,7 +3956,21 @@ treeNodePtr.p->m_lookup_data.m_outstanding -= cnt; + /** + * Another TreeNode awaited for completion of this request + * before it could resume its operation. + */ + if (treeNodePtr.p->m_resumeEvents & TreeNode::TN_RESUME_REF) + { + jam(); + ndbassert(treeNodePtr.p->m_resumePtrI != RNIL); + Ptr resumeTreeNodePtr; + m_treenode_pool.getPtr(resumeTreeNodePtr, treeNodePtr.p->m_resumePtrI); + lookup_resume(signal, requestPtr, resumeTreeNodePtr); + } + if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE + && treeNodePtr.p->m_deferred.isEmpty() && treeNodePtr.p->m_lookup_data.m_parent_batch_complete && treeNodePtr.p->m_lookup_data.m_outstanding == 0) { @@ -3762,7 +4006,21 @@ treeNodePtr.p->m_lookup_data.m_outstanding--; + /** + * Another TreeNode awaited for completion of this request + * before it could resume its operation. + */ + if (treeNodePtr.p->m_resumeEvents & TreeNode::TN_RESUME_CONF) + { + jam(); + ndbassert(treeNodePtr.p->m_resumePtrI != RNIL); + Ptr resumeTreeNodePtr; + m_treenode_pool.getPtr(resumeTreeNodePtr, treeNodePtr.p->m_resumePtrI); + lookup_resume(signal, requestPtr, resumeTreeNodePtr); + } + if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE + && treeNodePtr.p->m_deferred.isEmpty() && treeNodePtr.p->m_lookup_data.m_parent_batch_complete && treeNodePtr.p->m_lookup_data.m_outstanding == 0) { @@ -3786,6 +4044,99 @@ { jam(); + DEBUG("::lookup_parent_row" + << ", node: " << treeNodePtr.p->m_node_no); + + if (treeNodePtr.p->m_bits & TreeNode::T_EXEC_SEQUENTIAL) + { + jam(); + DEBUG("T_EXEC_SEQUENTIAL --> exec deferred"); + + /** + * Append correlation values of deferred parent row operations + * to a list / fifo. Upon resume, we will then be able to + * relocate all parent rows for which to resume operations. + */ + LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); + Local_pattern_store correlations(pool, treeNodePtr.p->m_deferred.m_correlations); + if (!correlations.append(&rowRef.m_src_correlation, 1)) + { + jam(); + abort(signal, requestPtr, DbspjErr::OutOfQueryMemory); + return; + } + return; + } + + lookup_row(signal, requestPtr, treeNodePtr, rowRef); +} // Dbspj::lookup_parent_row() + +/** + * lookup_resume() is a delayed lookup_parent_row. + * It will locate the next parent row now allowed to execute, + * and create a child lookup request for that row. + */ +void +Dbspj::lookup_resume(Signal* signal, + Ptr requestPtr, + Ptr treeNodePtr) +{ + DEBUG("::lookup_resume" + << ", node: " << treeNodePtr.p->m_node_no + ); + + ndbassert(treeNodePtr.p->m_bits & TreeNode::T_EXEC_SEQUENTIAL); + ndbassert(treeNodePtr.p->m_parentPtrI != RNIL); + ndbassert(!treeNodePtr.p->m_deferred.isEmpty()); + + if (unlikely(requestPtr.p->m_state & Request::RS_ABORTING)) + { + jam(); + return; + } + + Uint32 corrVal; + { + LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool); + Local_pattern_store correlations(pool, treeNodePtr.p->m_deferred.m_correlations); + + Local_pattern_store::DataBufferIterator it; + const bool valid = correlations.position(it, (Uint32)(treeNodePtr.p->m_deferred.m_pos++)); + (void)valid; ndbassert(valid); + corrVal = *it.data; + } + + Ptr parentPtr; + m_treenode_pool.getPtr(parentPtr, treeNodePtr.p->m_parentPtrI); + + // Set up RowPtr & RowRef for this parent row + RowPtr row; + row.m_src_node_ptrI = parentPtr.i; + row.m_src_correlation = corrVal; + + ndbassert(parentPtr.p->m_rows.m_type == RowCollection::COLLECTION_MAP); + RowRef ref; + parentPtr.p->m_rows.m_map.copyto(ref); + const Uint32* const mapptr = get_row_ptr(ref); + + // Relocate parent row from correlation value. + const Uint32 rowId = (corrVal & 0xFFFF); + parentPtr.p->m_rows.m_map.load(mapptr, rowId, ref); + + const Uint32* const rowptr = get_row_ptr(ref); + setupRowPtr(parentPtr.p->m_rows, row, ref, rowptr); + + lookup_row(signal, requestPtr, treeNodePtr, row); +} // Dbspj::lookup_resume() + +void +Dbspj::lookup_row(Signal* signal, + Ptr requestPtr, + Ptr treeNodePtr, + const RowPtr & rowRef) +{ + jam(); + /** * Here we need to... * 1) construct a key @@ -3796,7 +4147,7 @@ const Uint32 tableId = treeNodePtr.p->m_tableOrIndexId; const Uint32 corrVal = rowRef.m_src_correlation; - DEBUG("::lookup_parent_row" + DEBUG("::lookup_row" << ", node: " << treeNodePtr.p->m_node_no); do @@ -3853,10 +4204,11 @@ * When the key contains NULL values, an EQ-match is impossible! * Entire lookup request can therefore be eliminate as it is known * to be REFused with errorCode = 626 (Row not found). - * Different handling is required depening of request being a - * scan or lookup: + * Scan request can elliminate these child LQHKEYREQs if REFs + * are not needed in order to handle TN_RESUME_REF. */ - if (requestPtr.p->isScan()) + if (requestPtr.p->isScan() && + (treeNodePtr.p->m_resumeEvents & TreeNode::TN_RESUME_REF) == 0) { /** * Scan request: We can simply ignore lookup operation: @@ -3868,7 +4220,7 @@ releaseSection(ptrI); return; // Bailout, KEYREQ would have returned KEYREF(626) anyway } - else // isLookup() + else // isLookup() or 'need TN_RESUME_REF' { /** * Ignored lookup request need a faked KEYREF for the lookup operation. @@ -4053,7 +4405,9 @@ ndbassert(!treeNodePtr.p->m_lookup_data.m_parent_batch_complete); treeNodePtr.p->m_lookup_data.m_parent_batch_complete = true; + if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE + && treeNodePtr.p->m_deferred.isEmpty() && treeNodePtr.p->m_lookup_data.m_outstanding == 0) { jam(); @@ -4679,6 +5033,7 @@ } } } + ndbassert(treeNodePtr.p->m_resumePtrI == RNIL); if (treeNodePtr.p->m_scanfrag_data.m_rows_received == treeNodePtr.p->m_scanfrag_data.m_rows_expecting) @@ -6312,6 +6667,7 @@ ScanIndexData& data = treeNodePtr.p->m_scanindex_data; data.m_rows_received++; + ndbassert(treeNodePtr.p->m_resumePtrI == RNIL); if (data.m_frags_outstanding == 0 && data.m_rows_received == data.m_rows_expecting) No bundle (reason: useless for push emails).