4661 Ole John Aske 2012-11-13 [merge]
Merge 7.0 -> 7.1
modified:
storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
4660 magnus.blaudd@stripped 2012-11-12 [merge]
Merge 7.0 -> 7.1
modified:
sql/mysql_priv.h
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2012-11-08 11:58:49 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp 2012-11-13 10:14:52 +0000
@@ -112,6 +112,8 @@
struct Request;
struct TreeNode;
struct ScanFragHandle;
+ typedef DataBuffer2<14, LocalArenaPoolImpl> Correlation_list;
+ typedef LocalDataBuffer2<14, LocalArenaPoolImpl> Local_correlation_list;
typedef DataBuffer2<14, LocalArenaPoolImpl> Dependency_map;
typedef LocalDataBuffer2<14, LocalArenaPoolImpl> Local_dependency_map;
typedef DataBuffer2<14, LocalArenaPoolImpl> PatternStore;
@@ -792,6 +794,28 @@
Uint32 m_scanFragReq[ScanFragReq::SignalLength + 2];
};
+ struct DeferredParentOps
+ {
+ /**
+ * m_correlations contains a list of Correlation Values (Uint32)
+ * which identify the parent rows which have been deferred.
+ * m_pos is an index into this list, identifying the next parent row
+ * for which to resume operation.
+ */
+ Correlation_list::Head m_correlations;
+ Uint16 m_pos; // Next row operation to resume
+
+ DeferredParentOps() : m_correlations(), m_pos(0) {}
+
+ void init() {
+ m_correlations.init();
+ m_pos = 0;
+ }
+ bool isEmpty() const {
+ return (m_pos == m_correlations.getSize());
+ }
+ };
+
struct TreeNode_cursor_ptr
{
Uint32 nextList;
@@ -809,7 +833,8 @@
TreeNode()
: m_magic(MAGIC), m_state(TN_END),
m_parentPtrI(RNIL), m_requestPtrI(RNIL),
- m_ancestors()
+ m_ancestors(),
+ m_resumeEvents(0), m_resumePtrI(RNIL)
{
}
@@ -818,6 +843,7 @@
m_info(0), m_bits(T_LEAF), m_state(TN_BUILDING),
m_parentPtrI(RNIL), m_requestPtrI(request),
m_ancestors(),
+ m_resumeEvents(0), m_resumePtrI(RNIL),
nextList(RNIL), prevList(RNIL)
{
// m_send.m_ref = 0;
@@ -945,10 +971,27 @@
*/
T_SCAN_REPEATABLE = 0x4000,
+ /**
+ * Exec of a previous REQ must complete before we can proceed.
+ * A ResumeEvent will later resume exec. of this operation
+ */
+ T_EXEC_SEQUENTIAL = 0x8000,
+
// End marker...
T_END = 0
};
+ /**
+ * Describes whether a LQHKEY-REF and/or CONF would trigger an
+ * exec resume of another TreeNode having T_EXEC_SEQUENTIAL.
+ * (Used as a bitmask)
+ */
+ enum TreeNodeResumeEvents
+ {
+ TN_RESUME_REF = 0x01,
+ TN_RESUME_CONF = 0x02
+ };
+
bool isLeaf() const { return (m_bits & T_LEAF) != 0;}
// table or index this TreeNode operates on, and its schemaVersion
@@ -974,6 +1017,22 @@
*/
RowCollection m_rows;
+ /**
+ * T_EXEC_SEQUENTIAL causes execution of child operations to
+ * be deferred. These operations are queued in the 'struct DeferredParentOps'.
+ * Currently only Lookup operations might be deferred.
+ * Could later be extended to also cover index scans.
+ */
+ DeferredParentOps m_deferred;
+
+ /**
+ * Set of TreeNodeResumeEvents, possibly or'ed.
+ * Specifies whether a REF or CONF will cause a resume
+ * of the TreeNode referred to by 'm_resumePtrI'.
+ */
+ Uint32 m_resumeEvents;
+ Uint32 m_resumePtrI;
+
union
{
LookupData m_lookup_data;
@@ -1251,6 +1310,7 @@
const OpInfo* getOpInfo(Uint32 op);
Uint32 build(Build_context&,Ptr<Request>,SectionReader&,SectionReader&);
Uint32 initRowBuffers(Ptr<Request>);
+ void buildExecPlan(Ptr<Request>, Ptr<TreeNode> node, Ptr<TreeNode> next);
void checkPrepareComplete(Signal*, Ptr<Request>, Uint32 cnt);
void start(Signal*, Ptr<Request>);
void checkBatchComplete(Signal*, Ptr<Request>, Uint32 cnt);
@@ -1259,13 +1319,14 @@
void sendConf(Signal*, Ptr<Request>, bool is_complete);
void complete(Signal*, Ptr<Request>);
void cleanup(Ptr<Request>);
+ void cleanupBatch(Ptr<Request>);
void abort(Signal*, Ptr<Request>, Uint32 errCode);
Uint32 nodeFail(Signal*, Ptr<Request>, NdbNodeBitmask mask);
Uint32 createNode(Build_context&, Ptr<Request>, Ptr<TreeNode> &);
void reportBatchComplete(Signal*, Ptr<Request>, Ptr<TreeNode>);
void releaseScanBuffers(Ptr<Request> requestPtr);
- void releaseRequestBuffers(Ptr<Request> requestPtr, bool reset);
+ void releaseRequestBuffers(Ptr<Request> requestPtr);
void releaseNodeRows(Ptr<Request> requestPtr, Ptr<TreeNode>);
void registerActiveCursor(Ptr<Request>, Ptr<TreeNode>);
void nodeFail_checkRequests(Signal*);
@@ -1368,6 +1429,7 @@
Uint32 lookup_build(Build_context&,Ptr<Request>,
const QueryNode*, const QueryNodeParameters*);
void lookup_start(Signal*, Ptr<Request>, Ptr<TreeNode>);
+ void lookup_resume(Signal*, Ptr<Request>, Ptr<TreeNode>);
void lookup_send(Signal*, Ptr<Request>, Ptr<TreeNode>);
void lookup_execTRANSID_AI(Signal*, Ptr<Request>, Ptr<TreeNode>,
const RowPtr&);
@@ -1375,6 +1437,7 @@
void lookup_execLQHKEYCONF(Signal*, Ptr<Request>, Ptr<TreeNode>);
void lookup_parent_row(Signal*, Ptr<Request>, Ptr<TreeNode>, const RowPtr &);
void lookup_parent_batch_complete(Signal*, Ptr<Request>, Ptr<TreeNode>);
+ void lookup_row(Signal*, Ptr<Request>, Ptr<TreeNode>, const RowPtr &);
void lookup_abort(Signal*, Ptr<Request>, Ptr<TreeNode>);
Uint32 lookup_execNODE_FAILREP(Signal*signal, Ptr<Request>, Ptr<TreeNode>,
NdbNodeBitmask);
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2012-11-08 11:58:49 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp 2012-11-13 10:14:52 +0000
@@ -50,7 +50,7 @@
* DEBUG options for different parts od SPJ block
* Comment out those part you don't want DEBUG'ed.
*/
-#define DEBUG(x) ndbout << "DBSPJ: "<< x << endl;
+//#define DEBUG(x) ndbout << "DBSPJ: "<< x << endl;
//#define DEBUG_DICT(x) ndbout << "DBSPJ: "<< x << endl;
//#define DEBUG_LQHKEYREQ
//#define DEBUG_SCAN_FRAGREQ
@@ -1314,7 +1314,35 @@
Uint32
Dbspj::initRowBuffers(Ptr<Request> requestPtr)
{
- Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
+ jam();
+ /**
+ * Execution of a scan request requires restrictions
+ * on how lookup-children issue their LQHKEYREQs:
+ * A large scan result with many parallel lookup
+ * siblings can easily flood the job buffers with too many
+ * REQs. So we set up an 'execution plan' for how a
+ * scan request should be executed:
+ *
+ * NOTE: It could make sense to do the same for a lookup Req.
+ * However, CONF/REF for these leaf operations are not
+ * returned to SPJ. Thus, there is no way to know when
+ * the operation has completed, and another operation could
+ * be resumed.
+ *
+ * As a lookup request does not have the same potential for
+ * producing lots of LQHKEYREQs, we believe/hope the risk
+ * of flooding job buffers for a lookup request can be ignored.
+ */
+ if (requestPtr.p->isScan())
+ {
+ jam();
+ Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
+ Ptr<TreeNode> treeRootPtr;
+
+ list.first(treeRootPtr); // treeRootPtr is a scan
+ ndbrequire(!treeRootPtr.isNull());
+ buildExecPlan(requestPtr, treeRootPtr, NullTreeNodePtr);
+ }
/**
* Init ROW_BUFFERS iff Request has to buffer any rows.
@@ -1345,6 +1373,7 @@
requestPtr.p->m_rowBuffer.init(BUFFER_STACK);
}
+ Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
Ptr<TreeNode> treeNodePtr;
for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
{
@@ -1372,7 +1401,150 @@
}
return 0;
-}
+} // Dbspj::initRowBuffers
+
+/**
+ * buildExecPlan():
+ * Decides the order/pace in which the different
+ * TreeNodes should be executed.
+ * Currently it is only used to insert sequentialization points in
+ * the execution of bushy lookup-child nodes. (aka star-join).
+ * This is done in order to avoid too many LQHKEYREQ-signals to
+ * be sent which could overflow the job buffers.
+ *
+ * For each branch of TreeNodes starting with a scan, we identify
+ * any 'bushiness' among its lookup children. We set up a left -> right
+ * execution order among these such that:
+ * - A child lookup operation can not be REQuested before we
+ * have either executed a TRANSID_AI from the scan parent,
+ * or executed a CONF / REF from another lookup child.
+ * - When a lookup CONF or REF is executed, its TreeNode is
+ * annotated with 'resume' info which decides if/which TreeNode
+ * we should execute next.
+ *
+ * This will maintain a strict 1:1 fanout between incoming rows
+ * being processed, and new row REQuests being produced.
+ * Thus avoiding that a large scan result will flood the job buffers
+ * with too many lookup requests.
+ *
+ * FUTURE:
+ * For join children where child execution now is T_EXEC_SEQUENTIAL,
+ * it should be relatively simple to extend SPJ to do 'inner join'.
+ * As we at these sequential points know whether the previous
+ * joined children didn't find any matches, we can skip REQuesting
+ * rows from other children having the same parent row.
+ */
+void
+Dbspj::buildExecPlan(Ptr<Request> requestPtr,
+ Ptr<TreeNode> treeNodePtr,
+ Ptr<TreeNode> nextLookup)
+{
+ Uint32 lookupChildren[NDB_SPJ_MAX_TREE_NODES];
+ Uint32 lookupChildCnt = 0;
+
+ /**
+ * Need to iterate lookup children in reverse order to set up 'next'
+ * operations. As this is not possible through ConstDataBufferIterator,
+ * store any lookup children into the temp array lookupChildren[].
+ * Scan children are parents of new 'scan -> lookup' branches.
+ */
+ {
+ LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
+ Local_dependency_map childList(pool, treeNodePtr.p->m_dependent_nodes);
+ Dependency_map::ConstDataBufferIterator it;
+ for (childList.first(it); !it.isNull(); childList.next(it))
+ {
+ jam();
+ Ptr<TreeNode> childPtr;
+ m_treenode_pool.getPtr(childPtr, *it.data);
+
+ if (childPtr.p->m_info == &g_LookupOpInfo)
+ {
+ jam();
+ lookupChildren[lookupChildCnt++] = *it.data;
+ }
+ else
+ {
+ // Build a new plan starting from this scan operation
+ jam();
+ buildExecPlan(requestPtr, childPtr, NullTreeNodePtr);
+ }
+ }
+ }
+
+ /**
+ * Lookup children might have to wait for previous LQHKEYREQs to
+ * complete before they are allowed to send their own requests.
+ * (In order to not overfill job buffers)
+ */
+ if (treeNodePtr.p->m_info == &g_LookupOpInfo &&
+ !nextLookup.isNull())
+ {
+ jam();
+ /**
+ * Annotate that:
+ * - 'nextLookup' is not allowed to start immediately.
+ * - 'treeNode' restart 'nextLookup' when it completes
+ */
+ nextLookup.p->m_bits |= TreeNode::T_EXEC_SEQUENTIAL;
+
+ if (lookupChildCnt==0) //'isLeaf() or only scan children
+ {
+ jam();
+ treeNodePtr.p->m_resumeEvents = TreeNode::TN_RESUME_CONF |
+ TreeNode::TN_RESUME_REF;
+ DEBUG("ExecPlan: 'REF/CONF' from node " << treeNodePtr.p->m_node_no
+ << " resumes node " << nextLookup.p->m_node_no);
+ }
+ else
+ {
+ /**
+ * Will REQuest from one of its child lookups if CONF,
+ * so we don't resume another TreeNode in addition.
+ */
+ jam();
+ treeNodePtr.p->m_resumeEvents = TreeNode::TN_RESUME_REF;
+ DEBUG("ExecPlan: 'REF' from node " << treeNodePtr.p->m_node_no
+ << " resumes node " << nextLookup.p->m_node_no);
+ }
+ treeNodePtr.p->m_resumePtrI = nextLookup.i;
+
+ /**
+ * When we T_EXEC_SEQUENTIAL, TreeNode will iterate its
+ * parent rows in order to create new REQ's as previous
+ * are completed (CONF or REF).
+ * - Prepare RowIterator for parent rows
+ * - Buffer rows to be iterated in the parent node
+ */
+ {
+ jam();
+
+ ndbassert(nextLookup.p->m_parentPtrI != RNIL);
+ Ptr<TreeNode> parentPtr;
+ m_treenode_pool.getPtr(parentPtr, nextLookup.p->m_parentPtrI);
+ parentPtr.p->m_bits |= TreeNode::T_ROW_BUFFER
+ | TreeNode::T_ROW_BUFFER_MAP;
+ requestPtr.p->m_bits |= Request::RT_ROW_BUFFERS;
+
+ DEBUG("ExecPlan: rows from node " << parentPtr.p->m_node_no
+ << " are buffered");
+ }
+ }
+
+ /**
+ * Recursively build exec. plan for any lookup child.
+ */
+ for (int i = lookupChildCnt-1; i >= 0; i--)
+ {
+ jam();
+ Ptr<TreeNode> childPtr;
+ m_treenode_pool.getPtr(childPtr, lookupChildren[i]);
+ ndbassert(childPtr.p->m_info == &g_LookupOpInfo);
+
+ buildExecPlan(requestPtr, childPtr, nextLookup);
+ nextLookup = childPtr;
+ }
+} // Dbspj::buildExecPlan
Uint32
Dbspj::createNode(Build_context& ctx, Ptr<Request> requestPtr,
@@ -1547,26 +1719,18 @@
{
jam();
/**
- * request completed
+ * Entire Request completed
*/
cleanup(requestPtr);
}
- else if ((requestPtr.p->m_bits & Request::RT_MULTI_SCAN) != 0)
- {
- jam();
- /**
- * release unneeded buffers as preparation for later SCAN_NEXTREQ
- */
- releaseScanBuffers(requestPtr);
- }
- else if ((requestPtr.p->m_bits & Request::RT_ROW_BUFFERS) != 0)
- {
- jam();
- /**
- * if not multiple scans in request, simply release all pages allocated
- * for row buffers (all rows will be released anyway)
- */
- releaseRequestBuffers(requestPtr, true);
+ else
+ {
+ jam();
+ /**
+ * Cleanup the TreeNode branches getting another
+ * batch of result rows.
+ */
+ cleanupBatch(requestPtr);
}
}
@@ -1699,6 +1863,26 @@
}
void
+Dbspj::registerActiveCursor(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
+{
+ Uint32 bit = treeNodePtr.p->m_node_no;
+ ndbrequire(!requestPtr.p->m_active_nodes.get(bit));
+ requestPtr.p->m_active_nodes.set(bit);
+
+ Local_TreeNodeCursor_list list(m_treenode_pool, requestPtr.p->m_cursor_nodes);
+#ifdef VM_TRACE
+ {
+ Ptr<TreeNode> nodePtr;
+ for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
+ {
+ ndbrequire(nodePtr.i != treeNodePtr.i);
+ }
+ }
+#endif
+ list.add(treeNodePtr);
+}
+
+void
Dbspj::sendConf(Signal* signal, Ptr<Request> requestPtr, bool is_complete)
{
if (requestPtr.p->isScan())
@@ -1819,26 +2003,64 @@
return 0;
}
+/**
+ * Cleanup resources in preparation for a SCAN_NEXTREQ
+ * requesting a new batch of rows.
+ */
void
-Dbspj::releaseScanBuffers(Ptr<Request> requestPtr)
+Dbspj::cleanupBatch(Ptr<Request> requestPtr)
{
+ /**
+ * Needs to be at least 1 active otherwise we should have
+ * taken the Request cleanup "path" in batchComplete
+ */
+ ndbassert(requestPtr.p->m_cnt_active >= 1);
+
+ /**
+ * Release any buffered rows for the TreeNode branches
+ * getting new rows.
+ */
+ if ((requestPtr.p->m_bits & Request::RT_ROW_BUFFERS) != 0)
+ {
+ if ((requestPtr.p->m_bits & Request::RT_MULTI_SCAN) != 0)
+ {
+ jam();
+ /**
+ * A MULTI_SCAN may selectively retrieve rows from only
+ * some of the (scan-) branches in the Request.
+ * Selectively release from only these branches.
+ */
+ releaseScanBuffers(requestPtr);
+ }
+ else
+ {
+ jam();
+ /**
+ * if not multiple scans in request, simply release all pages allocated
+ * for row buffers (all rows will be released anyway)
+ */
+ // Root node should be the one and only being active
+ ndbassert(requestPtr.p->m_cnt_active == 1);
+ ndbassert(requestPtr.p->m_active_nodes.get(0));
+ releaseRequestBuffers(requestPtr);
+ }
+ } //RT_ROW_BUFFERS
+
+
Ptr<TreeNode> treeNodePtr;
Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
{
/**
- * Release buffered rows for all treeNodes getting more rows
+ * Re-init row buffer structures for those treeNodes getting more rows
* in the following NEXTREQ, including all its childs.
*/
if (requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no) ||
requestPtr.p->m_active_nodes.overlaps(treeNodePtr.p->m_ancestors))
{
- if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
- {
- jam();
- releaseNodeRows(requestPtr, treeNodePtr);
- }
+ jam();
+ treeNodePtr.p->m_rows.init();
}
/**
@@ -1848,6 +2070,21 @@
if (requestPtr.p->m_active_nodes.overlaps(treeNodePtr.p->m_ancestors))
{
jam();
+ /**
+ * Common TreeNode cleanup:
+ * Release list of deferred operations which may refer to
+ * buffered rows released above.
+ */
+ LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
+ {
+ Local_correlation_list correlations(pool, treeNodePtr.p->m_deferred.m_correlations);
+ correlations.release();
+ }
+ treeNodePtr.p->m_deferred.init();
+
+ /**
+ * TreeNode-type specific cleanup.
+ */
if (treeNodePtr.p->m_info->m_parent_batch_cleanup != 0)
{
jam();
@@ -1856,31 +2093,30 @@
}
}
}
- /**
- * Needs to be atleast 1 active otherwise we should have
- * taken the cleanup "path" in batchComplete
- */
- ndbrequire(requestPtr.p->m_cnt_active >= 1);
}
void
-Dbspj::registerActiveCursor(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
+Dbspj::releaseScanBuffers(Ptr<Request> requestPtr)
{
- Uint32 bit = treeNodePtr.p->m_node_no;
- ndbrequire(!requestPtr.p->m_active_nodes.get(bit));
- requestPtr.p->m_active_nodes.set(bit);
+ Ptr<TreeNode> treeNodePtr;
+ Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
- Local_TreeNodeCursor_list list(m_treenode_pool, requestPtr.p->m_cursor_nodes);
-#ifdef VM_TRACE
+ for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
{
- Ptr<TreeNode> nodePtr;
- for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
+ /**
+ * Release buffered rows for all treeNodes getting more rows
+ * in the following NEXTREQ, including all its childs.
+ */
+ if (requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no) ||
+ requestPtr.p->m_active_nodes.overlaps(treeNodePtr.p->m_ancestors))
{
- ndbrequire(nodePtr.i != treeNodePtr.i);
+ if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
+ {
+ jam();
+ releaseNodeRows(requestPtr, treeNodePtr);
+ }
}
}
-#endif
- list.add(treeNodePtr);
}
void
@@ -1906,7 +2142,6 @@
releaseRow(treeNodePtr.p->m_rows, pos);
cnt ++;
}
- treeNodePtr.p->m_rows.init();
DEBUG("RowIterator: released " << cnt << " rows!");
if (treeNodePtr.p->m_rows.m_type == RowCollection::COLLECTION_MAP)
@@ -1982,7 +2217,7 @@
}
void
-Dbspj::releaseRequestBuffers(Ptr<Request> requestPtr, bool reset)
+Dbspj::releaseRequestBuffers(Ptr<Request> requestPtr)
{
DEBUG("releaseRequestBuffers"
<< ", request: " << requestPtr.i
@@ -2006,17 +2241,6 @@
}
requestPtr.p->m_rowBuffer.reset();
}
-
- if (reset)
- {
- Ptr<TreeNode> nodePtr;
- Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
- for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
- {
- jam();
- nodePtr.p->m_rows.init();
- }
- }
}
void
@@ -2226,7 +2450,7 @@
jam();
m_lookup_request_hash.remove(requestPtr, *requestPtr.p);
}
- releaseRequestBuffers(requestPtr, false);
+ releaseRequestBuffers(requestPtr);
ArenaHead ah = requestPtr.p->m_arena;
m_request_pool.release(requestPtr);
m_arenaAllocator.release(ah);
@@ -2253,6 +2477,11 @@
pattern.release();
}
+ {
+ Local_correlation_list correlations(pool, treeNodePtr.p->m_deferred.m_correlations);
+ correlations.release();
+ }
+
if (treeNodePtr.p->m_send.m_keyInfoPtrI != RNIL)
{
jam();
@@ -3612,6 +3841,7 @@
treeNodePtr.p->m_lookup_data.m_outstanding--;
if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
+ && treeNodePtr.p->m_deferred.isEmpty()
&& treeNodePtr.p->m_lookup_data.m_parent_batch_complete
&& treeNodePtr.p->m_lookup_data.m_outstanding == 0)
{
@@ -3726,7 +3956,21 @@
treeNodePtr.p->m_lookup_data.m_outstanding -= cnt;
+ /**
+ * Another TreeNode is awaiting completion of this request
+ * before it can resume its operation.
+ */
+ if (treeNodePtr.p->m_resumeEvents & TreeNode::TN_RESUME_REF)
+ {
+ jam();
+ ndbassert(treeNodePtr.p->m_resumePtrI != RNIL);
+ Ptr<TreeNode> resumeTreeNodePtr;
+ m_treenode_pool.getPtr(resumeTreeNodePtr, treeNodePtr.p->m_resumePtrI);
+ lookup_resume(signal, requestPtr, resumeTreeNodePtr);
+ }
+
if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
+ && treeNodePtr.p->m_deferred.isEmpty()
&& treeNodePtr.p->m_lookup_data.m_parent_batch_complete
&& treeNodePtr.p->m_lookup_data.m_outstanding == 0)
{
@@ -3762,7 +4006,21 @@
treeNodePtr.p->m_lookup_data.m_outstanding--;
+ /**
+ * Another TreeNode is awaiting completion of this request
+ * before it can resume its operation.
+ */
+ if (treeNodePtr.p->m_resumeEvents & TreeNode::TN_RESUME_CONF)
+ {
+ jam();
+ ndbassert(treeNodePtr.p->m_resumePtrI != RNIL);
+ Ptr<TreeNode> resumeTreeNodePtr;
+ m_treenode_pool.getPtr(resumeTreeNodePtr, treeNodePtr.p->m_resumePtrI);
+ lookup_resume(signal, requestPtr, resumeTreeNodePtr);
+ }
+
if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
+ && treeNodePtr.p->m_deferred.isEmpty()
&& treeNodePtr.p->m_lookup_data.m_parent_batch_complete
&& treeNodePtr.p->m_lookup_data.m_outstanding == 0)
{
@@ -3786,6 +4044,99 @@
{
jam();
+ DEBUG("::lookup_parent_row"
+ << ", node: " << treeNodePtr.p->m_node_no);
+
+ if (treeNodePtr.p->m_bits & TreeNode::T_EXEC_SEQUENTIAL)
+ {
+ jam();
+ DEBUG("T_EXEC_SEQUENTIAL --> exec deferred");
+
+ /**
+ * Append correlation values of deferred parent row operations
+ * to a list / fifo. Upon resume, we will then be able to
+ * relocate all parent rows for which to resume operations.
+ */
+ LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
+ Local_pattern_store correlations(pool, treeNodePtr.p->m_deferred.m_correlations);
+ if (!correlations.append(&rowRef.m_src_correlation, 1))
+ {
+ jam();
+ abort(signal, requestPtr, DbspjErr::OutOfQueryMemory);
+ return;
+ }
+ return;
+ }
+
+ lookup_row(signal, requestPtr, treeNodePtr, rowRef);
+} // Dbspj::lookup_parent_row()
+
+/**
+ * lookup_resume() is a delayed lookup_parent_row.
+ * It will locate the next parent row now allowed to execute,
+ * and create a child lookup request for that row.
+ */
+void
+Dbspj::lookup_resume(Signal* signal,
+ Ptr<Request> requestPtr,
+ Ptr<TreeNode> treeNodePtr)
+{
+ DEBUG("::lookup_resume"
+ << ", node: " << treeNodePtr.p->m_node_no
+ );
+
+ ndbassert(treeNodePtr.p->m_bits & TreeNode::T_EXEC_SEQUENTIAL);
+ ndbassert(treeNodePtr.p->m_parentPtrI != RNIL);
+ ndbassert(!treeNodePtr.p->m_deferred.isEmpty());
+
+ if (unlikely(requestPtr.p->m_state & Request::RS_ABORTING))
+ {
+ jam();
+ return;
+ }
+
+ Uint32 corrVal;
+ {
+ LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
+ Local_pattern_store correlations(pool, treeNodePtr.p->m_deferred.m_correlations);
+
+ Local_pattern_store::DataBufferIterator it;
+ const bool valid = correlations.position(it, (Uint32)(treeNodePtr.p->m_deferred.m_pos++));
+ (void)valid; ndbassert(valid);
+ corrVal = *it.data;
+ }
+
+ Ptr<TreeNode> parentPtr;
+ m_treenode_pool.getPtr(parentPtr, treeNodePtr.p->m_parentPtrI);
+
+ // Set up RowPtr & RowRef for this parent row
+ RowPtr row;
+ row.m_src_node_ptrI = parentPtr.i;
+ row.m_src_correlation = corrVal;
+
+ ndbassert(parentPtr.p->m_rows.m_type == RowCollection::COLLECTION_MAP);
+ RowRef ref;
+ parentPtr.p->m_rows.m_map.copyto(ref);
+ const Uint32* const mapptr = get_row_ptr(ref);
+
+ // Relocate parent row from correlation value.
+ const Uint32 rowId = (corrVal & 0xFFFF);
+ parentPtr.p->m_rows.m_map.load(mapptr, rowId, ref);
+
+ const Uint32* const rowptr = get_row_ptr(ref);
+ setupRowPtr(parentPtr.p->m_rows, row, ref, rowptr);
+
+ lookup_row(signal, requestPtr, treeNodePtr, row);
+} // Dbspj::lookup_resume()
+
+void
+Dbspj::lookup_row(Signal* signal,
+ Ptr<Request> requestPtr,
+ Ptr<TreeNode> treeNodePtr,
+ const RowPtr & rowRef)
+{
+ jam();
+
/**
* Here we need to...
* 1) construct a key
@@ -3796,7 +4147,7 @@
const Uint32 tableId = treeNodePtr.p->m_tableOrIndexId;
const Uint32 corrVal = rowRef.m_src_correlation;
- DEBUG("::lookup_parent_row"
+ DEBUG("::lookup_row"
<< ", node: " << treeNodePtr.p->m_node_no);
do
@@ -3853,10 +4204,11 @@
* When the key contains NULL values, an EQ-match is impossible!
* Entire lookup request can therefore be eliminate as it is known
* to be REFused with errorCode = 626 (Row not found).
- * Different handling is required depening of request being a
- * scan or lookup:
+ * A scan request can eliminate these child LQHKEYREQs if REFs
+ * are not needed in order to handle TN_RESUME_REF.
*/
- if (requestPtr.p->isScan())
+ if (requestPtr.p->isScan() &&
+ (treeNodePtr.p->m_resumeEvents & TreeNode::TN_RESUME_REF) == 0)
{
/**
* Scan request: We can simply ignore lookup operation:
@@ -3868,7 +4220,7 @@
releaseSection(ptrI);
return; // Bailout, KEYREQ would have returned KEYREF(626) anyway
}
- else // isLookup()
+ else // isLookup() or 'need TN_RESUME_REF'
{
/**
* Ignored lookup request need a faked KEYREF for the lookup operation.
@@ -4053,7 +4405,9 @@
ndbassert(!treeNodePtr.p->m_lookup_data.m_parent_batch_complete);
treeNodePtr.p->m_lookup_data.m_parent_batch_complete = true;
+
if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
+ && treeNodePtr.p->m_deferred.isEmpty()
&& treeNodePtr.p->m_lookup_data.m_outstanding == 0)
{
jam();
@@ -4679,6 +5033,7 @@
}
}
}
+ ndbassert(treeNodePtr.p->m_resumePtrI == RNIL);
if (treeNodePtr.p->m_scanfrag_data.m_rows_received ==
treeNodePtr.p->m_scanfrag_data.m_rows_expecting)
@@ -6312,6 +6667,7 @@
ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
data.m_rows_received++;
+ ndbassert(treeNodePtr.p->m_resumePtrI == RNIL);
if (data.m_frags_outstanding == 0 &&
data.m_rows_received == data.m_rows_expecting)
No bundle (reason: useless for push emails).
| Thread |
|---|
| • bzr push into mysql-5.1-telco-7.1 branch (ole.john.aske:4660 to 4661) | Ole John Aske | 14 Nov |