List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:November 13 2012 10:28am
Subject:bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:5035 to 5036)
Bug#14709490
View as plain text  
 5036 Ole John Aske	2012-11-13
      Fix for bug#14709490: EXECUTION OF 'PUSHED JOINS' MAY EXHAUST JOB BUFFERS
      
      A pushed scan query, with multiple 'bushy' lookup children could flood
      the job buffers by producing multiple LQHKEYREQ's for each row
      returned from the scan. Normally we allows a 1::4 fanout between
      consumed and produced signals, but this is not sufficient for
      such a pushed join: There could be an (almost) unlimited number of rows
      returned from the scan (and multiple such executing in parallel)
      so we could overflow the jobbuffers by allowing any fanout > 1.
      
      This fix introduce sequentialization points in the SPJ execution:
      Any lookup children which otherwise would have been executed in parallel
      with another lookup sibling is tagged with T_EXEC_SEQUENTIAL.
      When such a TreeNode attempt to send a LQHKEYREQ, the operation is
      deferred (queued). It is later resumed when its parallel child
      completes (CONF or REF) one of its own REQuests.
      
      Thereby we have restricted the fanout for bushy scan-lookup joins
      to 1::1.
      
      Furthermore, it changes the 'pushed query planer' (>= V7.2) in the
      handler interface such that we now try to avoid some of the bushines
      among lookup childrens in the pushed query. 
      
      Due to the sequentialization this patch enforce among siblings, it
      is misleading to have an EXPLAINed query plan which show that siblings
      are executed in parallel. By also making them sequentially in the
      pushed join, we may take advantage of the selectivity of previously
      equi-joined children, such that non-matching branches in the query
      Tree are not executed.
      
      There are also the refactorications:
      
       1) Introduce Dbspj::cleanupBatch() which is intended to do all
          required resorce dealloc and required reinits in preparation for
          a new batch of result rows.
      
          Also removes 'reset' argument from releaseRequestBuffers and instead
          do required reset from callee (::cleanupBatch()) if required.
      
      2)  Moved Dbspj::registerActiveCursor to a place
          closer to where it is normally used, and closer to other functional
          dependent methods.

    modified:
      storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
 5035 magnus.blaudd@stripped	2012-11-12
      ndb
       - Add missing "extern opt_server_id_mask" declaration in mysql_priv.h (aka mysqld.h in the  future)

    modified:
      sql/mysql_priv.h
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2012-11-08 11:55:49 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2012-11-13 10:13:17 +0000
@@ -112,6 +112,8 @@
   struct Request;
   struct TreeNode;
   struct ScanFragHandle;
+  typedef DataBuffer2<14, LocalArenaPoolImpl> Correlation_list;
+  typedef LocalDataBuffer2<14, LocalArenaPoolImpl> Local_correlation_list;
   typedef DataBuffer2<14, LocalArenaPoolImpl> Dependency_map;
   typedef LocalDataBuffer2<14, LocalArenaPoolImpl> Local_dependency_map;
   typedef DataBuffer2<14, LocalArenaPoolImpl> PatternStore;
@@ -792,6 +794,28 @@
     Uint32 m_scanFragReq[ScanFragReq::SignalLength + 2];
   };
 
+  struct DeferredParentOps
+  {
+    /**
+     * m_correlations contains a list of Correlation Values (Uint32)
+     * which identifies parent rows which has been deferred. 
+     * m_pos are index into this array, identifying the next parent row
+     * for which to resume operation.
+     */
+    Correlation_list::Head m_correlations;
+    Uint16 m_pos; // Next row operation to resume 
+
+    DeferredParentOps() : m_correlations(), m_pos(0) {}
+
+    void init()  {
+      m_correlations.init();
+      m_pos = 0;
+    }
+    bool isEmpty() const {
+      return (m_pos == m_correlations.getSize());
+    }
+  };
+
   struct TreeNode_cursor_ptr
   {
     Uint32 nextList;
@@ -809,7 +833,8 @@
     TreeNode()
     : m_magic(MAGIC), m_state(TN_END),
       m_parentPtrI(RNIL), m_requestPtrI(RNIL),
-      m_ancestors()
+      m_ancestors(),
+      m_resumeEvents(0), m_resumePtrI(RNIL)
     {
     }
 
@@ -818,6 +843,7 @@
       m_info(0), m_bits(T_LEAF), m_state(TN_BUILDING),
       m_parentPtrI(RNIL), m_requestPtrI(request),
       m_ancestors(),
+      m_resumeEvents(0), m_resumePtrI(RNIL),
       nextList(RNIL), prevList(RNIL)
     {
 //    m_send.m_ref = 0;
@@ -945,10 +971,27 @@
        */
       T_SCAN_REPEATABLE = 0x4000,
 
+      /**
+       * Exec of a previous REQ must complete before we can proceed.
+       * A ResumeEvent will later resume exec. of this operation
+       */
+      T_EXEC_SEQUENTIAL = 0x8000,
+
       // End marker...
       T_END = 0
     };
 
+    /**
+     * Describe whether a LQHKEY-REF and/or CONF whould trigger a 
+     * exec resume of another TreeNode having T_EXEC_SEQUENTIAL.
+     * (Used as a bitmask)
+     */
+    enum TreeNodeResumeEvents
+    {
+      TN_RESUME_REF   = 0x01,
+      TN_RESUME_CONF  = 0x02
+    };
+
     bool isLeaf() const { return (m_bits & T_LEAF) != 0;}
 
     // table or index this TreeNode operates on, and its schemaVersion
@@ -974,6 +1017,22 @@
      */
     RowCollection m_rows;
 
+    /**
+     * T_EXEC_SEQUENTIAL cause execution of child operations to
+     * be deferred.  These operations are queued in the 'struct DeferredParentOps'
+     * Currently only Lookup operation might be deferred.
+     * Could later be extended to also cover index scans.
+     */
+    DeferredParentOps m_deferred;
+
+    /**
+     * Set of TreeNodeResumeEvents, possibly or'ed.
+     * Specify whether a REF or CONF will cause a resume
+     * of the TreeNode referred by 'm_resumePtrI'.
+     */
+    Uint32 m_resumeEvents;
+    Uint32 m_resumePtrI;
+
     union
     {
       LookupData m_lookup_data;
@@ -1251,6 +1310,7 @@
   const OpInfo* getOpInfo(Uint32 op);
   Uint32 build(Build_context&,Ptr<Request>,SectionReader&,SectionReader&);
   Uint32 initRowBuffers(Ptr<Request>);
+  void buildExecPlan(Ptr<Request>, Ptr<TreeNode> node, Ptr<TreeNode> next);
   void checkPrepareComplete(Signal*, Ptr<Request>, Uint32 cnt);
   void start(Signal*, Ptr<Request>);
   void checkBatchComplete(Signal*, Ptr<Request>, Uint32 cnt);
@@ -1259,13 +1319,14 @@
   void sendConf(Signal*, Ptr<Request>, bool is_complete);
   void complete(Signal*, Ptr<Request>);
   void cleanup(Ptr<Request>);
+  void cleanupBatch(Ptr<Request>);
   void abort(Signal*, Ptr<Request>, Uint32 errCode);
   Uint32 nodeFail(Signal*, Ptr<Request>, NdbNodeBitmask mask);
 
   Uint32 createNode(Build_context&, Ptr<Request>, Ptr<TreeNode> &);
   void reportBatchComplete(Signal*, Ptr<Request>, Ptr<TreeNode>);
   void releaseScanBuffers(Ptr<Request> requestPtr);
-  void releaseRequestBuffers(Ptr<Request> requestPtr, bool reset);
+  void releaseRequestBuffers(Ptr<Request> requestPtr);
   void releaseNodeRows(Ptr<Request> requestPtr, Ptr<TreeNode>);
   void registerActiveCursor(Ptr<Request>, Ptr<TreeNode>);
   void nodeFail_checkRequests(Signal*);
@@ -1368,6 +1429,7 @@
   Uint32 lookup_build(Build_context&,Ptr<Request>,
 		      const QueryNode*, const QueryNodeParameters*);
   void lookup_start(Signal*, Ptr<Request>, Ptr<TreeNode>);
+  void lookup_resume(Signal*, Ptr<Request>, Ptr<TreeNode>);
   void lookup_send(Signal*, Ptr<Request>, Ptr<TreeNode>);
   void lookup_execTRANSID_AI(Signal*, Ptr<Request>, Ptr<TreeNode>,
 			     const RowPtr&);
@@ -1375,6 +1437,7 @@
   void lookup_execLQHKEYCONF(Signal*, Ptr<Request>, Ptr<TreeNode>);
   void lookup_parent_row(Signal*, Ptr<Request>, Ptr<TreeNode>, const RowPtr &);
   void lookup_parent_batch_complete(Signal*, Ptr<Request>, Ptr<TreeNode>);
+  void lookup_row(Signal*, Ptr<Request>, Ptr<TreeNode>, const RowPtr &);
   void lookup_abort(Signal*, Ptr<Request>, Ptr<TreeNode>);
   Uint32 lookup_execNODE_FAILREP(Signal*signal, Ptr<Request>, Ptr<TreeNode>,
                                NdbNodeBitmask);

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2012-11-08 11:55:49 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2012-11-13 10:13:17 +0000
@@ -50,7 +50,7 @@
  * DEBUG options for different parts od SPJ block
  * Comment out those part you don't want DEBUG'ed.
  */
-#define DEBUG(x) ndbout << "DBSPJ: "<< x << endl;
+//#define DEBUG(x) ndbout << "DBSPJ: "<< x << endl;
 //#define DEBUG_DICT(x) ndbout << "DBSPJ: "<< x << endl;
 //#define DEBUG_LQHKEYREQ
 //#define DEBUG_SCAN_FRAGREQ
@@ -1314,7 +1314,35 @@
 Uint32
 Dbspj::initRowBuffers(Ptr<Request> requestPtr)
 {
-  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
+  jam();
+  /**
+   * Execution of scan request requires restrictions
+   * of how lookup-children issues their LQHKEYREQs:
+   * A large scan result with many parallel lookup
+   * siblings can easily flood the job buffers with too many
+   * REQs. So we set up an 'execution plan' for how a
+   * scan request should be executed:
+   *
+   * NOTE: It could make sense to do the same for a lookup Req.
+   * However, CONF/REF for these leafs operations are not 
+   * returned to SPJ. Thus, there are no way to know when
+   * the operation has completed, and other operation could
+   * be resumed.
+   *
+   * As a lookup request does not have the same potential for
+   * producing lots of LQHKEYREQs, we believe/hope the risk
+   * of flooding job buffers for a lookup request can be ignored.
+   */
+  if (requestPtr.p->isScan())
+  {
+    jam();
+    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
+    Ptr<TreeNode> treeRootPtr;
+
+    list.first(treeRootPtr);   // treeRootPtr is a scan
+    ndbrequire(!treeRootPtr.isNull());
+    buildExecPlan(requestPtr, treeRootPtr, NullTreeNodePtr);
+  }
 
   /**
    * Init ROW_BUFFERS iff Request has to buffer any rows.
@@ -1345,6 +1373,7 @@
       requestPtr.p->m_rowBuffer.init(BUFFER_STACK);
     }
 
+    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
     Ptr<TreeNode> treeNodePtr;
     for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
     {
@@ -1372,7 +1401,150 @@
   }
 
   return 0;
-}
+} // Dbspj::initRowBuffers
+
+/**
+ * buildExecPlan():
+ *   Decides the order/pace in which the different
+ *   TreeNodes should be executed.
+ *   Currently it is only used to insert sequentialization point in
+ *   the execution of bushy lookup-child nodes. (aka star-join).
+ *   This is done in order to avoid too many LQHKEYREQ-signals to
+ *   be sent which could overflow the job buffers.
+ *
+ *   For each branch of TreeNodes starting with a scan, we identify
+ *   any 'bushines' among its lookup children. We set up a left -> right
+ *   execution order among these such that:
+ *    - A child lookup operation can not be REQuested before we
+ *      either has executed a TRANSID_AI from the scan parent,
+ *      or executed a CONF / REF from another lookup child.
+ *    - When a lookup CONF or REF is executed, its TreeNode is 
+ *      annotated with 'resume' info which decides if/which TreeNode
+ *      we should execute next.
+ *
+ *   This will maintain a strict 1:1 fanout between incomming rows
+ *   being processed, and new row REQuest being produced.
+ *   Thus avoiding that large scan result will flood the jobb buffers 
+ *   with too many lookup requests.
+ *
+ * FUTURE:
+ *   For join children where child execution now is T_EXEC_SEQUENTIAL,
+ *   it should be relatively simple to extend SPJ to do 'inner join'.
+ *   As we at these sequential point knows wheteher the previous
+ *   joined children didn't found any matches, we can skip REQuesting
+ *   rows from other children having the same parent row.
+ */
+void
+Dbspj::buildExecPlan(Ptr<Request>  requestPtr,
+                     Ptr<TreeNode> treeNodePtr,
+                     Ptr<TreeNode> nextLookup)
+{
+  Uint32 lookupChildren[NDB_SPJ_MAX_TREE_NODES];
+  Uint32 lookupChildCnt = 0;
+
+  /**
+   * Need to iterate lookup childs in reverse order to set up 'next'
+   * operations. As this is not possible throught ConstDataBufferIterator,
+   * store any lookup childs into temp array childPtrI[].
+   * Scan childs are parents of new 'scan -> lookup' branches.
+   */
+  {
+    LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
+    Local_dependency_map childList(pool, treeNodePtr.p->m_dependent_nodes);
+    Dependency_map::ConstDataBufferIterator it;
+    for (childList.first(it); !it.isNull(); childList.next(it))
+    {
+      jam();
+      Ptr<TreeNode> childPtr;
+      m_treenode_pool.getPtr(childPtr, *it.data);
+
+      if (childPtr.p->m_info == &g_LookupOpInfo)
+      {
+        jam();
+        lookupChildren[lookupChildCnt++] = *it.data;
+      }
+      else
+      {
+        // Build a new plan starting from this scan operation
+        jam();
+        buildExecPlan(requestPtr, childPtr, NullTreeNodePtr);
+      }
+    }
+  }
+
+  /**
+   * Lookup children might have to wait for previous LQHKEYREQs to
+   * complete before they are allowed to send their own requests.
+   * (In order to not overfill jobb buffers)
+   */
+  if (treeNodePtr.p->m_info == &g_LookupOpInfo &&
+      !nextLookup.isNull())
+  {
+    jam();
+    /**
+     * Annotate that:
+     *  - 'nextLookup' is not allowed to start immediately.
+     *  - 'treeNode' restart 'nextLookup' when it completes 
+     */
+    nextLookup.p->m_bits |= TreeNode::T_EXEC_SEQUENTIAL;
+
+    if (lookupChildCnt==0)  //'isLeaf() or only scan children
+    {
+      jam();
+      treeNodePtr.p->m_resumeEvents = TreeNode::TN_RESUME_CONF |
+                                      TreeNode::TN_RESUME_REF;
+      DEBUG("ExecPlan: 'REF/CONF' from node " << treeNodePtr.p->m_node_no
+         << " resumes node " << nextLookup.p->m_node_no);
+    }
+    else
+    {
+      /**
+       * Will REQuest from one of its child lookups if CONF,
+       * so we don't resume another TreeNode in addition.
+       */
+      jam();
+      treeNodePtr.p->m_resumeEvents = TreeNode::TN_RESUME_REF;
+      DEBUG("ExecPlan: 'REF' from node " << treeNodePtr.p->m_node_no
+         << " resumes node " << nextLookup.p->m_node_no);
+    }
+    treeNodePtr.p->m_resumePtrI = nextLookup.i;
+
+    /**
+     * When we T_EXEC_SEQUENTIAL, TreeNode will iterate its
+     * parent rows in order to create new REQ's as previous
+     * are completed (CONF or REF).
+     *  - Prepare RowIterator for parent rows
+     *  - Buffer rows to be iterated in the parent node 
+     */
+    {
+      jam();
+
+      ndbassert(nextLookup.p->m_parentPtrI != RNIL);
+      Ptr<TreeNode> parentPtr;
+      m_treenode_pool.getPtr(parentPtr, nextLookup.p->m_parentPtrI);
+      parentPtr.p->m_bits |= TreeNode::T_ROW_BUFFER
+                           | TreeNode::T_ROW_BUFFER_MAP;
+      requestPtr.p->m_bits |= Request::RT_ROW_BUFFERS;
+
+      DEBUG("ExecPlan: rows from node " << parentPtr.p->m_node_no
+         << " are buffered");
+    }
+  }
+
+  /**
+   * Recursively build exec. plan for any lookup child.
+   */
+  for (int i = lookupChildCnt-1; i >= 0; i--)
+  {
+    jam();
+    Ptr<TreeNode> childPtr;
+    m_treenode_pool.getPtr(childPtr, lookupChildren[i]);
+    ndbassert(childPtr.p->m_info == &g_LookupOpInfo);
+
+    buildExecPlan(requestPtr, childPtr, nextLookup);
+    nextLookup = childPtr;
+  }
+} // Dbspj::buildExecPlan
 
 Uint32
 Dbspj::createNode(Build_context& ctx, Ptr<Request> requestPtr,
@@ -1547,26 +1719,18 @@
   {
     jam();
     /**
-     * request completed
+     * Entire Request completed
      */
     cleanup(requestPtr);
   }
-  else if ((requestPtr.p->m_bits & Request::RT_MULTI_SCAN) != 0)
-  {
-    jam();
-    /**
-     * release unneeded buffers as preparation for later SCAN_NEXTREQ
-     */
-    releaseScanBuffers(requestPtr);
-  }
-  else if ((requestPtr.p->m_bits & Request::RT_ROW_BUFFERS) != 0)
-  {
-    jam();
-    /**
-     * if not multiple scans in request, simply release all pages allocated
-     * for row buffers (all rows will be released anyway)
-     */
-    releaseRequestBuffers(requestPtr, true);
+  else
+  {
+    jam();
+    /**
+     * Cleanup the TreeNode branches getting another
+     * batch of result rows.
+     */
+    cleanupBatch(requestPtr);
   }
 }
 
@@ -1699,6 +1863,26 @@
 }
 
 void
+Dbspj::registerActiveCursor(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
+{
+  Uint32 bit = treeNodePtr.p->m_node_no;
+  ndbrequire(!requestPtr.p->m_active_nodes.get(bit));
+  requestPtr.p->m_active_nodes.set(bit);
+
+  Local_TreeNodeCursor_list list(m_treenode_pool, requestPtr.p->m_cursor_nodes);
+#ifdef VM_TRACE
+  {
+    Ptr<TreeNode> nodePtr;
+    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
+    {
+      ndbrequire(nodePtr.i != treeNodePtr.i);
+    }
+  }
+#endif
+  list.add(treeNodePtr);
+}
+
+void
 Dbspj::sendConf(Signal* signal, Ptr<Request> requestPtr, bool is_complete)
 {
   if (requestPtr.p->isScan())
@@ -1819,26 +2003,64 @@
   return 0;
 }
 
+/**
+ * Cleanup resources in preparation for a SCAN_NEXTREQ
+ * requesting a new batch of rows.
+ */
 void
-Dbspj::releaseScanBuffers(Ptr<Request> requestPtr)
+Dbspj::cleanupBatch(Ptr<Request> requestPtr)
 {
+  /**
+   * Needs to be atleast 1 active otherwise we should have
+   *   taken the Request cleanup "path" in batchComplete
+   */
+  ndbassert(requestPtr.p->m_cnt_active >= 1);
+
+  /**
+   * Release any buffered rows for the TreeNode branches
+   * getting new rows.
+   */
+  if ((requestPtr.p->m_bits & Request::RT_ROW_BUFFERS) != 0)
+  {
+    if ((requestPtr.p->m_bits & Request::RT_MULTI_SCAN) != 0)
+    {
+      jam();
+      /**
+       * A MULTI_SCAN may selectively retrieve rows from only
+       * some of the (scan-) branches in the Request.
+       * Selectively release from only these brances.
+       */
+      releaseScanBuffers(requestPtr);
+    }
+    else
+    {
+      jam();
+      /**
+       * if not multiple scans in request, simply release all pages allocated
+       * for row buffers (all rows will be released anyway)
+       */
+      // Root node should be the one and only being active
+      ndbassert(requestPtr.p->m_cnt_active == 1);
+      ndbassert(requestPtr.p->m_active_nodes.get(0));
+      releaseRequestBuffers(requestPtr);
+    }
+  } //RT_ROW_BUFFERS
+
+
   Ptr<TreeNode> treeNodePtr;
   Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
 
   for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
   {
     /**
-     * Release buffered rows for all treeNodes getting more rows
+     * Re-init row buffer structures for those treeNodes getting more rows
      * in the following NEXTREQ, including all its childs.
      */
     if (requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no) ||
         requestPtr.p->m_active_nodes.overlaps(treeNodePtr.p->m_ancestors))
     {
-      if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
-      {
-        jam();
-        releaseNodeRows(requestPtr, treeNodePtr);
-      }
+      jam();
+      treeNodePtr.p->m_rows.init();
     }
 
     /**
@@ -1848,6 +2070,21 @@
     if (requestPtr.p->m_active_nodes.overlaps(treeNodePtr.p->m_ancestors))
     {
       jam();
+      /**
+       * Common TreeNode cleanup:
+       * Release list of deferred operations which may refer 
+       * buffered rows released above.
+       */
+      LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
+      {
+        Local_correlation_list correlations(pool, treeNodePtr.p->m_deferred.m_correlations);
+        correlations.release();
+      }
+      treeNodePtr.p->m_deferred.init();
+
+      /**
+       * TreeNode-type specific cleanup.
+       */
       if (treeNodePtr.p->m_info->m_parent_batch_cleanup != 0)
       {
         jam();
@@ -1856,31 +2093,30 @@
       }
     }
   }
-  /**
-   * Needs to be atleast 1 active otherwise we should have
-   *   taken the cleanup "path" in batchComplete
-   */
-  ndbrequire(requestPtr.p->m_cnt_active >= 1);
 }
 
 void
-Dbspj::registerActiveCursor(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
+Dbspj::releaseScanBuffers(Ptr<Request> requestPtr)
 {
-  Uint32 bit = treeNodePtr.p->m_node_no;
-  ndbrequire(!requestPtr.p->m_active_nodes.get(bit));
-  requestPtr.p->m_active_nodes.set(bit);
+  Ptr<TreeNode> treeNodePtr;
+  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
 
-  Local_TreeNodeCursor_list list(m_treenode_pool, requestPtr.p->m_cursor_nodes);
-#ifdef VM_TRACE
+  for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
   {
-    Ptr<TreeNode> nodePtr;
-    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
+    /**
+     * Release buffered rows for all treeNodes getting more rows
+     * in the following NEXTREQ, including all its childs.
+     */
+    if (requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no) ||
+        requestPtr.p->m_active_nodes.overlaps(treeNodePtr.p->m_ancestors))
     {
-      ndbrequire(nodePtr.i != treeNodePtr.i);
+      if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
+      {
+        jam();
+        releaseNodeRows(requestPtr, treeNodePtr);
+      }
     }
   }
-#endif
-  list.add(treeNodePtr);
 }
 
 void
@@ -1906,7 +2142,6 @@
     releaseRow(treeNodePtr.p->m_rows, pos);
     cnt ++;
   }
-  treeNodePtr.p->m_rows.init();
   DEBUG("RowIterator: released " << cnt << " rows!");
 
   if (treeNodePtr.p->m_rows.m_type == RowCollection::COLLECTION_MAP)
@@ -1982,7 +2217,7 @@
 }
 
 void
-Dbspj::releaseRequestBuffers(Ptr<Request> requestPtr, bool reset)
+Dbspj::releaseRequestBuffers(Ptr<Request> requestPtr)
 {
   DEBUG("releaseRequestBuffers"
      << ", request: " << requestPtr.i
@@ -2006,17 +2241,6 @@
     }
     requestPtr.p->m_rowBuffer.reset();
   }
-
-  if (reset)
-  {
-    Ptr<TreeNode> nodePtr;
-    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
-    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
-    {
-      jam();
-      nodePtr.p->m_rows.init();
-    }
-  }
 }
 
 void
@@ -2226,7 +2450,7 @@
     jam();
     m_lookup_request_hash.remove(requestPtr, *requestPtr.p);
   }
-  releaseRequestBuffers(requestPtr, false);
+  releaseRequestBuffers(requestPtr);
   ArenaHead ah = requestPtr.p->m_arena;
   m_request_pool.release(requestPtr);
   m_arenaAllocator.release(ah);
@@ -2253,6 +2477,11 @@
     pattern.release();
   }
 
+  {
+    Local_correlation_list correlations(pool, treeNodePtr.p->m_deferred.m_correlations);
+    correlations.release();
+  }
+
   if (treeNodePtr.p->m_send.m_keyInfoPtrI != RNIL)
   {
     jam();
@@ -3612,6 +3841,7 @@
   treeNodePtr.p->m_lookup_data.m_outstanding--;
 
   if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
+      && treeNodePtr.p->m_deferred.isEmpty()
       && treeNodePtr.p->m_lookup_data.m_parent_batch_complete
       && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
   {
@@ -3726,7 +3956,21 @@
 
   treeNodePtr.p->m_lookup_data.m_outstanding -= cnt;
 
+  /**
+   * Another TreeNode awaited for completion of this request
+   * before it could resume its operation.
+   */
+  if (treeNodePtr.p->m_resumeEvents & TreeNode::TN_RESUME_REF)
+  {
+    jam();
+    ndbassert(treeNodePtr.p->m_resumePtrI != RNIL);
+    Ptr<TreeNode> resumeTreeNodePtr;
+    m_treenode_pool.getPtr(resumeTreeNodePtr, treeNodePtr.p->m_resumePtrI);
+    lookup_resume(signal, requestPtr, resumeTreeNodePtr);
+  }
+
   if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
+      && treeNodePtr.p->m_deferred.isEmpty()
       && treeNodePtr.p->m_lookup_data.m_parent_batch_complete
       && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
   {
@@ -3762,7 +4006,21 @@
 
   treeNodePtr.p->m_lookup_data.m_outstanding--;
 
+  /**
+   * Another TreeNode awaited for completion of this request
+   * before it could resume its operation.
+   */
+  if (treeNodePtr.p->m_resumeEvents & TreeNode::TN_RESUME_CONF)
+  {
+    jam();
+    ndbassert(treeNodePtr.p->m_resumePtrI != RNIL);
+    Ptr<TreeNode> resumeTreeNodePtr;
+    m_treenode_pool.getPtr(resumeTreeNodePtr, treeNodePtr.p->m_resumePtrI);
+    lookup_resume(signal, requestPtr, resumeTreeNodePtr);
+  }
+
   if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
+      && treeNodePtr.p->m_deferred.isEmpty()
       && treeNodePtr.p->m_lookup_data.m_parent_batch_complete
       && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
   {
@@ -3786,6 +4044,99 @@
 {
   jam();
 
+  DEBUG("::lookup_parent_row"
+     << ", node: " << treeNodePtr.p->m_node_no);
+
+  if (treeNodePtr.p->m_bits & TreeNode::T_EXEC_SEQUENTIAL)
+  {
+    jam();
+    DEBUG("T_EXEC_SEQUENTIAL --> exec deferred");
+
+    /**
+     * Append correlation values of deferred parent row operations
+     * to a list / fifo. Upon resume, we will then be able to 
+     * relocate all parent rows for which to resume operations.
+     */
+    LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
+    Local_pattern_store correlations(pool, treeNodePtr.p->m_deferred.m_correlations);
+    if (!correlations.append(&rowRef.m_src_correlation, 1))
+    {
+      jam();
+      abort(signal, requestPtr, DbspjErr::OutOfQueryMemory);
+      return;
+    }
+    return;
+  }
+
+  lookup_row(signal, requestPtr, treeNodePtr, rowRef);
+} // Dbspj::lookup_parent_row()
+
+/**
+ * lookup_resume() is a delayed lookup_parent_row.
+ * It will locate the next parent row now allowed to execute,
+ * and create a child lookup request for that row.
+ */
+void
+Dbspj::lookup_resume(Signal* signal,
+                     Ptr<Request> requestPtr,
+                     Ptr<TreeNode> treeNodePtr)
+{
+  DEBUG("::lookup_resume"
+     << ", node: " << treeNodePtr.p->m_node_no
+  );
+
+  ndbassert(treeNodePtr.p->m_bits & TreeNode::T_EXEC_SEQUENTIAL);
+  ndbassert(treeNodePtr.p->m_parentPtrI != RNIL);
+  ndbassert(!treeNodePtr.p->m_deferred.isEmpty());
+
+  if (unlikely(requestPtr.p->m_state & Request::RS_ABORTING))
+  {
+    jam();
+    return;
+  }
+
+  Uint32 corrVal;
+  {
+    LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
+    Local_pattern_store correlations(pool, treeNodePtr.p->m_deferred.m_correlations);
+
+    Local_pattern_store::DataBufferIterator it;
+    const bool valid = correlations.position(it, (Uint32)(treeNodePtr.p->m_deferred.m_pos++));
+    (void)valid; ndbassert(valid);
+    corrVal = *it.data;
+  }
+
+  Ptr<TreeNode> parentPtr;
+  m_treenode_pool.getPtr(parentPtr, treeNodePtr.p->m_parentPtrI);
+
+  // Set up RowPtr & RowRef for this parent row
+  RowPtr row;
+  row.m_src_node_ptrI = parentPtr.i;
+  row.m_src_correlation = corrVal;
+
+  ndbassert(parentPtr.p->m_rows.m_type == RowCollection::COLLECTION_MAP);
+  RowRef ref;
+  parentPtr.p->m_rows.m_map.copyto(ref);
+  const Uint32* const mapptr = get_row_ptr(ref);
+
+  // Relocate parent row from correlation value.
+  const Uint32 rowId = (corrVal & 0xFFFF);
+  parentPtr.p->m_rows.m_map.load(mapptr, rowId, ref);
+
+  const Uint32* const rowptr = get_row_ptr(ref);
+  setupRowPtr(parentPtr.p->m_rows, row, ref, rowptr);
+
+  lookup_row(signal, requestPtr, treeNodePtr, row);
+} // Dbspj::lookup_resume()
+
+void
+Dbspj::lookup_row(Signal* signal,
+                         Ptr<Request> requestPtr,
+                         Ptr<TreeNode> treeNodePtr,
+                         const RowPtr & rowRef)
+{
+  jam();
+
   /**
    * Here we need to...
    *   1) construct a key
@@ -3796,7 +4147,7 @@
   const Uint32 tableId = treeNodePtr.p->m_tableOrIndexId;
   const Uint32 corrVal = rowRef.m_src_correlation;
 
-  DEBUG("::lookup_parent_row"
+  DEBUG("::lookup_row"
      << ", node: " << treeNodePtr.p->m_node_no);
 
   do
@@ -3853,10 +4204,11 @@
          * When the key contains NULL values, an EQ-match is impossible!
          * Entire lookup request can therefore be eliminate as it is known
          * to be REFused with errorCode = 626 (Row not found).
-         * Different handling is required depening of request being a
-         * scan or lookup:
+         * Scan request can elliminate these child LQHKEYREQs if REFs 
+         * are not needed in order to handle TN_RESUME_REF.
          */
-        if (requestPtr.p->isScan())
+        if (requestPtr.p->isScan() &&
+            (treeNodePtr.p->m_resumeEvents & TreeNode::TN_RESUME_REF) == 0)
         {
           /**
            * Scan request: We can simply ignore lookup operation:
@@ -3868,7 +4220,7 @@
           releaseSection(ptrI);
           return;  // Bailout, KEYREQ would have returned KEYREF(626) anyway
         }
-        else  // isLookup()
+        else  // isLookup() or 'need TN_RESUME_REF'
         {
           /**
            * Ignored lookup request need a faked KEYREF for the lookup operation.
@@ -4053,7 +4405,9 @@
 
   ndbassert(!treeNodePtr.p->m_lookup_data.m_parent_batch_complete);
   treeNodePtr.p->m_lookup_data.m_parent_batch_complete = true;
+
   if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
+      && treeNodePtr.p->m_deferred.isEmpty()
       && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
   {
     jam();
@@ -4679,6 +5033,7 @@
       }
     }
   }
+  ndbassert(treeNodePtr.p->m_resumePtrI == RNIL);
 
   if (treeNodePtr.p->m_scanfrag_data.m_rows_received ==
       treeNodePtr.p->m_scanfrag_data.m_rows_expecting)
@@ -6312,6 +6667,7 @@
 
   ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
   data.m_rows_received++;
+  ndbassert(treeNodePtr.p->m_resumePtrI == RNIL);
 
   if (data.m_frags_outstanding == 0 &&
       data.m_rows_received == data.m_rows_expecting)

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:5035 to 5036)Bug#14709490Ole John Aske14 Nov