List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:November 13 2012 10:28am
Subject:bzr push into mysql-5.1-telco-7.1 branch (ole.john.aske:4660 to 4661)
View as plain text  
 4661 Ole John Aske	2012-11-13 [merge]
      Merge 7.0 -> 7.1

    modified:
      storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
 4660 magnus.blaudd@stripped	2012-11-12 [merge]
      Merge 7.0 -> 7.1

    modified:
      sql/mysql_priv.h
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2012-11-08 11:58:49 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2012-11-13 10:14:52 +0000
@@ -112,6 +112,8 @@
   struct Request;
   struct TreeNode;
   struct ScanFragHandle;
+  typedef DataBuffer2<14, LocalArenaPoolImpl> Correlation_list;
+  typedef LocalDataBuffer2<14, LocalArenaPoolImpl> Local_correlation_list;
   typedef DataBuffer2<14, LocalArenaPoolImpl> Dependency_map;
   typedef LocalDataBuffer2<14, LocalArenaPoolImpl> Local_dependency_map;
   typedef DataBuffer2<14, LocalArenaPoolImpl> PatternStore;
@@ -792,6 +794,28 @@
     Uint32 m_scanFragReq[ScanFragReq::SignalLength + 2];
   };
 
+  struct DeferredParentOps
+  {
+    /**
+     * m_correlations contains a list of Correlation Values (Uint32)
+     * which identify parent rows which have been deferred.
+     * m_pos is an index into this array, identifying the next parent row
+     * for which to resume operation.
+     */
+    Correlation_list::Head m_correlations;
+    Uint16 m_pos; // Next row operation to resume 
+
+    DeferredParentOps() : m_correlations(), m_pos(0) {}
+
+    void init()  {
+      m_correlations.init();
+      m_pos = 0;
+    }
+    bool isEmpty() const {
+      return (m_pos == m_correlations.getSize());
+    }
+  };
+
   struct TreeNode_cursor_ptr
   {
     Uint32 nextList;
@@ -809,7 +833,8 @@
     TreeNode()
     : m_magic(MAGIC), m_state(TN_END),
       m_parentPtrI(RNIL), m_requestPtrI(RNIL),
-      m_ancestors()
+      m_ancestors(),
+      m_resumeEvents(0), m_resumePtrI(RNIL)
     {
     }
 
@@ -818,6 +843,7 @@
       m_info(0), m_bits(T_LEAF), m_state(TN_BUILDING),
       m_parentPtrI(RNIL), m_requestPtrI(request),
       m_ancestors(),
+      m_resumeEvents(0), m_resumePtrI(RNIL),
       nextList(RNIL), prevList(RNIL)
     {
 //    m_send.m_ref = 0;
@@ -945,10 +971,27 @@
        */
       T_SCAN_REPEATABLE = 0x4000,
 
+      /**
+       * Exec of a previous REQ must complete before we can proceed.
+       * A ResumeEvent will later resume exec. of this operation
+       */
+      T_EXEC_SEQUENTIAL = 0x8000,
+
       // End marker...
       T_END = 0
     };
 
+    /**
+     * Describes whether an LQHKEY-REF and/or CONF would trigger an
+     * exec resume of another TreeNode having T_EXEC_SEQUENTIAL.
+     * (Used as a bitmask)
+     */
+    enum TreeNodeResumeEvents
+    {
+      TN_RESUME_REF   = 0x01,
+      TN_RESUME_CONF  = 0x02
+    };
+
     bool isLeaf() const { return (m_bits & T_LEAF) != 0;}
 
     // table or index this TreeNode operates on, and its schemaVersion
@@ -974,6 +1017,22 @@
      */
     RowCollection m_rows;
 
+    /**
+     * T_EXEC_SEQUENTIAL causes execution of child operations to
+     * be deferred.  These operations are queued in the 'struct DeferredParentOps'
+     * Currently only Lookup operations might be deferred.
+     * Could later be extended to also cover index scans.
+     */
+    DeferredParentOps m_deferred;
+
+    /**
+     * Set of TreeNodeResumeEvents, possibly or'ed.
+     * Specify whether a REF or CONF will cause a resume
+     * of the TreeNode referred by 'm_resumePtrI'.
+     */
+    Uint32 m_resumeEvents;
+    Uint32 m_resumePtrI;
+
     union
     {
       LookupData m_lookup_data;
@@ -1251,6 +1310,7 @@
   const OpInfo* getOpInfo(Uint32 op);
   Uint32 build(Build_context&,Ptr<Request>,SectionReader&,SectionReader&);
   Uint32 initRowBuffers(Ptr<Request>);
+  void buildExecPlan(Ptr<Request>, Ptr<TreeNode> node, Ptr<TreeNode> next);
   void checkPrepareComplete(Signal*, Ptr<Request>, Uint32 cnt);
   void start(Signal*, Ptr<Request>);
   void checkBatchComplete(Signal*, Ptr<Request>, Uint32 cnt);
@@ -1259,13 +1319,14 @@
   void sendConf(Signal*, Ptr<Request>, bool is_complete);
   void complete(Signal*, Ptr<Request>);
   void cleanup(Ptr<Request>);
+  void cleanupBatch(Ptr<Request>);
   void abort(Signal*, Ptr<Request>, Uint32 errCode);
   Uint32 nodeFail(Signal*, Ptr<Request>, NdbNodeBitmask mask);
 
   Uint32 createNode(Build_context&, Ptr<Request>, Ptr<TreeNode> &);
   void reportBatchComplete(Signal*, Ptr<Request>, Ptr<TreeNode>);
   void releaseScanBuffers(Ptr<Request> requestPtr);
-  void releaseRequestBuffers(Ptr<Request> requestPtr, bool reset);
+  void releaseRequestBuffers(Ptr<Request> requestPtr);
   void releaseNodeRows(Ptr<Request> requestPtr, Ptr<TreeNode>);
   void registerActiveCursor(Ptr<Request>, Ptr<TreeNode>);
   void nodeFail_checkRequests(Signal*);
@@ -1368,6 +1429,7 @@
   Uint32 lookup_build(Build_context&,Ptr<Request>,
 		      const QueryNode*, const QueryNodeParameters*);
   void lookup_start(Signal*, Ptr<Request>, Ptr<TreeNode>);
+  void lookup_resume(Signal*, Ptr<Request>, Ptr<TreeNode>);
   void lookup_send(Signal*, Ptr<Request>, Ptr<TreeNode>);
   void lookup_execTRANSID_AI(Signal*, Ptr<Request>, Ptr<TreeNode>,
 			     const RowPtr&);
@@ -1375,6 +1437,7 @@
   void lookup_execLQHKEYCONF(Signal*, Ptr<Request>, Ptr<TreeNode>);
   void lookup_parent_row(Signal*, Ptr<Request>, Ptr<TreeNode>, const RowPtr &);
   void lookup_parent_batch_complete(Signal*, Ptr<Request>, Ptr<TreeNode>);
+  void lookup_row(Signal*, Ptr<Request>, Ptr<TreeNode>, const RowPtr &);
   void lookup_abort(Signal*, Ptr<Request>, Ptr<TreeNode>);
   Uint32 lookup_execNODE_FAILREP(Signal*signal, Ptr<Request>, Ptr<TreeNode>,
                                NdbNodeBitmask);

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2012-11-08 11:58:49 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2012-11-13 10:14:52 +0000
@@ -50,7 +50,7 @@
 * DEBUG options for different parts of the SPJ block
  * Comment out those part you don't want DEBUG'ed.
  */
-#define DEBUG(x) ndbout << "DBSPJ: "<< x << endl;
+//#define DEBUG(x) ndbout << "DBSPJ: "<< x << endl;
 //#define DEBUG_DICT(x) ndbout << "DBSPJ: "<< x << endl;
 //#define DEBUG_LQHKEYREQ
 //#define DEBUG_SCAN_FRAGREQ
@@ -1314,7 +1314,35 @@
 Uint32
 Dbspj::initRowBuffers(Ptr<Request> requestPtr)
 {
-  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
+  jam();
+  /**
+   * Execution of scan request requires restrictions
+   * of how lookup-children issues their LQHKEYREQs:
+   * A large scan result with many parallel lookup
+   * siblings can easily flood the job buffers with too many
+   * REQs. So we set up an 'execution plan' for how a
+   * scan request should be executed:
+   *
+   * NOTE: It could make sense to do the same for a lookup Req.
+ * However, CONF/REF for these leaf operations are not
+ * returned to SPJ. Thus, there is no way to know when
+ * the operation has completed, and another operation could
+ * be resumed.
+   *
+   * As a lookup request does not have the same potential for
+   * producing lots of LQHKEYREQs, we believe/hope the risk
+   * of flooding job buffers for a lookup request can be ignored.
+   */
+  if (requestPtr.p->isScan())
+  {
+    jam();
+    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
+    Ptr<TreeNode> treeRootPtr;
+
+    list.first(treeRootPtr);   // treeRootPtr is a scan
+    ndbrequire(!treeRootPtr.isNull());
+    buildExecPlan(requestPtr, treeRootPtr, NullTreeNodePtr);
+  }
 
   /**
    * Init ROW_BUFFERS iff Request has to buffer any rows.
@@ -1345,6 +1373,7 @@
       requestPtr.p->m_rowBuffer.init(BUFFER_STACK);
     }
 
+    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
     Ptr<TreeNode> treeNodePtr;
     for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
     {
@@ -1372,7 +1401,150 @@
   }
 
   return 0;
-}
+} // Dbspj::initRowBuffers
+
+/**
+ * buildExecPlan():
+ *   Decides the order/pace in which the different
+ *   TreeNodes should be executed.
+ *   Currently it is only used to insert sequentialization point in
+ *   the execution of bushy lookup-child nodes. (aka star-join).
+ *   This is done in order to avoid too many LQHKEYREQ-signals to
+ *   be sent which could overflow the job buffers.
+ *
+ *   For each branch of TreeNodes starting with a scan, we identify
+ *   any 'bushiness' among its lookup children. We set up a left -> right
+ *   execution order among these such that:
+ *    - A child lookup operation cannot be REQuested before we have
+ *      either executed a TRANSID_AI from the scan parent,
+ *      or executed a CONF / REF from another lookup child.
+ *    - When a lookup CONF or REF is executed, its TreeNode is 
+ *      annotated with 'resume' info which decides if/which TreeNode
+ *      we should execute next.
+ *
+ *   This will maintain a strict 1:1 fanout between incoming rows
+ *   being processed and new row REQuests being produced.
+ *   This avoids having a large scan result flood the job buffers
+ *   with too many lookup requests.
+ *
+ * FUTURE:
+ *   For join children where child execution now is T_EXEC_SEQUENTIAL,
+ *   it should be relatively simple to extend SPJ to do 'inner join'.
+ *   As we at these sequential points know whether the previously
+ *   joined children found any matches, we can skip REQuesting
+ *   rows from other children having the same parent row.
+ */
+void
+Dbspj::buildExecPlan(Ptr<Request>  requestPtr,
+                     Ptr<TreeNode> treeNodePtr,
+                     Ptr<TreeNode> nextLookup)
+{
+  Uint32 lookupChildren[NDB_SPJ_MAX_TREE_NODES];
+  Uint32 lookupChildCnt = 0;
+
+  /**
+   * Need to iterate lookup children in reverse order to set up 'next'
+   * operations. As this is not possible through ConstDataBufferIterator,
+   * store any lookup children into temp array childPtrI[].
+   * Scan children are parents of new 'scan -> lookup' branches.
+   */
+  {
+    LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
+    Local_dependency_map childList(pool, treeNodePtr.p->m_dependent_nodes);
+    Dependency_map::ConstDataBufferIterator it;
+    for (childList.first(it); !it.isNull(); childList.next(it))
+    {
+      jam();
+      Ptr<TreeNode> childPtr;
+      m_treenode_pool.getPtr(childPtr, *it.data);
+
+      if (childPtr.p->m_info == &g_LookupOpInfo)
+      {
+        jam();
+        lookupChildren[lookupChildCnt++] = *it.data;
+      }
+      else
+      {
+        // Build a new plan starting from this scan operation
+        jam();
+        buildExecPlan(requestPtr, childPtr, NullTreeNodePtr);
+      }
+    }
+  }
+
+  /**
+   * Lookup children might have to wait for previous LQHKEYREQs to
+   * complete before they are allowed to send their own requests.
+   * (In order to not overfill job buffers)
+   */
+  if (treeNodePtr.p->m_info == &g_LookupOpInfo &&
+      !nextLookup.isNull())
+  {
+    jam();
+    /**
+     * Annotate that:
+     *  - 'nextLookup' is not allowed to start immediately.
+     *  - 'treeNode' restart 'nextLookup' when it completes 
+     */
+    nextLookup.p->m_bits |= TreeNode::T_EXEC_SEQUENTIAL;
+
+    if (lookupChildCnt==0)  //'isLeaf() or only scan children
+    {
+      jam();
+      treeNodePtr.p->m_resumeEvents = TreeNode::TN_RESUME_CONF |
+                                      TreeNode::TN_RESUME_REF;
+      DEBUG("ExecPlan: 'REF/CONF' from node " << treeNodePtr.p->m_node_no
+         << " resumes node " << nextLookup.p->m_node_no);
+    }
+    else
+    {
+      /**
+       * Will REQuest from one of its child lookups if CONF,
+       * so we don't resume another TreeNode in addition.
+       */
+      jam();
+      treeNodePtr.p->m_resumeEvents = TreeNode::TN_RESUME_REF;
+      DEBUG("ExecPlan: 'REF' from node " << treeNodePtr.p->m_node_no
+         << " resumes node " << nextLookup.p->m_node_no);
+    }
+    treeNodePtr.p->m_resumePtrI = nextLookup.i;
+
+    /**
+     * When we T_EXEC_SEQUENTIAL, TreeNode will iterate its
+     * parent rows in order to create new REQ's as previous
+     * are completed (CONF or REF).
+     *  - Prepare RowIterator for parent rows
+     *  - Buffer rows to be iterated in the parent node 
+     */
+    {
+      jam();
+
+      ndbassert(nextLookup.p->m_parentPtrI != RNIL);
+      Ptr<TreeNode> parentPtr;
+      m_treenode_pool.getPtr(parentPtr, nextLookup.p->m_parentPtrI);
+      parentPtr.p->m_bits |= TreeNode::T_ROW_BUFFER
+                           | TreeNode::T_ROW_BUFFER_MAP;
+      requestPtr.p->m_bits |= Request::RT_ROW_BUFFERS;
+
+      DEBUG("ExecPlan: rows from node " << parentPtr.p->m_node_no
+         << " are buffered");
+    }
+  }
+
+  /**
+   * Recursively build exec. plan for any lookup child.
+   */
+  for (int i = lookupChildCnt-1; i >= 0; i--)
+  {
+    jam();
+    Ptr<TreeNode> childPtr;
+    m_treenode_pool.getPtr(childPtr, lookupChildren[i]);
+    ndbassert(childPtr.p->m_info == &g_LookupOpInfo);
+
+    buildExecPlan(requestPtr, childPtr, nextLookup);
+    nextLookup = childPtr;
+  }
+} // Dbspj::buildExecPlan
 
 Uint32
 Dbspj::createNode(Build_context& ctx, Ptr<Request> requestPtr,
@@ -1547,26 +1719,18 @@
   {
     jam();
     /**
-     * request completed
+     * Entire Request completed
      */
     cleanup(requestPtr);
   }
-  else if ((requestPtr.p->m_bits & Request::RT_MULTI_SCAN) != 0)
-  {
-    jam();
-    /**
-     * release unneeded buffers as preparation for later SCAN_NEXTREQ
-     */
-    releaseScanBuffers(requestPtr);
-  }
-  else if ((requestPtr.p->m_bits & Request::RT_ROW_BUFFERS) != 0)
-  {
-    jam();
-    /**
-     * if not multiple scans in request, simply release all pages allocated
-     * for row buffers (all rows will be released anyway)
-     */
-    releaseRequestBuffers(requestPtr, true);
+  else
+  {
+    jam();
+    /**
+     * Cleanup the TreeNode branches getting another
+     * batch of result rows.
+     */
+    cleanupBatch(requestPtr);
   }
 }
 
@@ -1699,6 +1863,26 @@
 }
 
 void
+Dbspj::registerActiveCursor(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
+{
+  Uint32 bit = treeNodePtr.p->m_node_no;
+  ndbrequire(!requestPtr.p->m_active_nodes.get(bit));
+  requestPtr.p->m_active_nodes.set(bit);
+
+  Local_TreeNodeCursor_list list(m_treenode_pool, requestPtr.p->m_cursor_nodes);
+#ifdef VM_TRACE
+  {
+    Ptr<TreeNode> nodePtr;
+    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
+    {
+      ndbrequire(nodePtr.i != treeNodePtr.i);
+    }
+  }
+#endif
+  list.add(treeNodePtr);
+}
+
+void
 Dbspj::sendConf(Signal* signal, Ptr<Request> requestPtr, bool is_complete)
 {
   if (requestPtr.p->isScan())
@@ -1819,26 +2003,64 @@
   return 0;
 }
 
+/**
+ * Cleanup resources in preparation for a SCAN_NEXTREQ
+ * requesting a new batch of rows.
+ */
 void
-Dbspj::releaseScanBuffers(Ptr<Request> requestPtr)
+Dbspj::cleanupBatch(Ptr<Request> requestPtr)
 {
+  /**
+   * Needs to be at least 1 active otherwise we should have
+   *   taken the Request cleanup "path" in batchComplete
+   */
+  ndbassert(requestPtr.p->m_cnt_active >= 1);
+
+  /**
+   * Release any buffered rows for the TreeNode branches
+   * getting new rows.
+   */
+  if ((requestPtr.p->m_bits & Request::RT_ROW_BUFFERS) != 0)
+  {
+    if ((requestPtr.p->m_bits & Request::RT_MULTI_SCAN) != 0)
+    {
+      jam();
+      /**
+       * A MULTI_SCAN may selectively retrieve rows from only
+       * some of the (scan-) branches in the Request.
+       * Selectively release from only these branches.
+       */
+      releaseScanBuffers(requestPtr);
+    }
+    else
+    {
+      jam();
+      /**
+       * if not multiple scans in request, simply release all pages allocated
+       * for row buffers (all rows will be released anyway)
+       */
+      // Root node should be the one and only being active
+      ndbassert(requestPtr.p->m_cnt_active == 1);
+      ndbassert(requestPtr.p->m_active_nodes.get(0));
+      releaseRequestBuffers(requestPtr);
+    }
+  } //RT_ROW_BUFFERS
+
+
   Ptr<TreeNode> treeNodePtr;
   Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
 
   for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
   {
     /**
-     * Release buffered rows for all treeNodes getting more rows
+     * Re-init row buffer structures for those treeNodes getting more rows
      * in the following NEXTREQ, including all its childs.
      */
     if (requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no) ||
         requestPtr.p->m_active_nodes.overlaps(treeNodePtr.p->m_ancestors))
     {
-      if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
-      {
-        jam();
-        releaseNodeRows(requestPtr, treeNodePtr);
-      }
+      jam();
+      treeNodePtr.p->m_rows.init();
     }
 
     /**
@@ -1848,6 +2070,21 @@
     if (requestPtr.p->m_active_nodes.overlaps(treeNodePtr.p->m_ancestors))
     {
       jam();
+      /**
+       * Common TreeNode cleanup:
+       * Release list of deferred operations which may refer to
+       * buffered rows released above.
+       */
+      LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
+      {
+        Local_correlation_list correlations(pool, treeNodePtr.p->m_deferred.m_correlations);
+        correlations.release();
+      }
+      treeNodePtr.p->m_deferred.init();
+
+      /**
+       * TreeNode-type specific cleanup.
+       */
       if (treeNodePtr.p->m_info->m_parent_batch_cleanup != 0)
       {
         jam();
@@ -1856,31 +2093,30 @@
       }
     }
   }
-  /**
-   * Needs to be atleast 1 active otherwise we should have
-   *   taken the cleanup "path" in batchComplete
-   */
-  ndbrequire(requestPtr.p->m_cnt_active >= 1);
 }
 
 void
-Dbspj::registerActiveCursor(Ptr<Request> requestPtr, Ptr<TreeNode> treeNodePtr)
+Dbspj::releaseScanBuffers(Ptr<Request> requestPtr)
 {
-  Uint32 bit = treeNodePtr.p->m_node_no;
-  ndbrequire(!requestPtr.p->m_active_nodes.get(bit));
-  requestPtr.p->m_active_nodes.set(bit);
+  Ptr<TreeNode> treeNodePtr;
+  Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
 
-  Local_TreeNodeCursor_list list(m_treenode_pool, requestPtr.p->m_cursor_nodes);
-#ifdef VM_TRACE
+  for (list.first(treeNodePtr); !treeNodePtr.isNull(); list.next(treeNodePtr))
   {
-    Ptr<TreeNode> nodePtr;
-    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
+    /**
+     * Release buffered rows for all treeNodes getting more rows
+     * in the following NEXTREQ, including all their children.
+     */
+    if (requestPtr.p->m_active_nodes.get(treeNodePtr.p->m_node_no) ||
+        requestPtr.p->m_active_nodes.overlaps(treeNodePtr.p->m_ancestors))
     {
-      ndbrequire(nodePtr.i != treeNodePtr.i);
+      if (treeNodePtr.p->m_bits & TreeNode::T_ROW_BUFFER)
+      {
+        jam();
+        releaseNodeRows(requestPtr, treeNodePtr);
+      }
     }
   }
-#endif
-  list.add(treeNodePtr);
 }
 
 void
@@ -1906,7 +2142,6 @@
     releaseRow(treeNodePtr.p->m_rows, pos);
     cnt ++;
   }
-  treeNodePtr.p->m_rows.init();
   DEBUG("RowIterator: released " << cnt << " rows!");
 
   if (treeNodePtr.p->m_rows.m_type == RowCollection::COLLECTION_MAP)
@@ -1982,7 +2217,7 @@
 }
 
 void
-Dbspj::releaseRequestBuffers(Ptr<Request> requestPtr, bool reset)
+Dbspj::releaseRequestBuffers(Ptr<Request> requestPtr)
 {
   DEBUG("releaseRequestBuffers"
      << ", request: " << requestPtr.i
@@ -2006,17 +2241,6 @@
     }
     requestPtr.p->m_rowBuffer.reset();
   }
-
-  if (reset)
-  {
-    Ptr<TreeNode> nodePtr;
-    Local_TreeNode_list list(m_treenode_pool, requestPtr.p->m_nodes);
-    for (list.first(nodePtr); !nodePtr.isNull(); list.next(nodePtr))
-    {
-      jam();
-      nodePtr.p->m_rows.init();
-    }
-  }
 }
 
 void
@@ -2226,7 +2450,7 @@
     jam();
     m_lookup_request_hash.remove(requestPtr, *requestPtr.p);
   }
-  releaseRequestBuffers(requestPtr, false);
+  releaseRequestBuffers(requestPtr);
   ArenaHead ah = requestPtr.p->m_arena;
   m_request_pool.release(requestPtr);
   m_arenaAllocator.release(ah);
@@ -2253,6 +2477,11 @@
     pattern.release();
   }
 
+  {
+    Local_correlation_list correlations(pool, treeNodePtr.p->m_deferred.m_correlations);
+    correlations.release();
+  }
+
   if (treeNodePtr.p->m_send.m_keyInfoPtrI != RNIL)
   {
     jam();
@@ -3612,6 +3841,7 @@
   treeNodePtr.p->m_lookup_data.m_outstanding--;
 
   if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
+      && treeNodePtr.p->m_deferred.isEmpty()
       && treeNodePtr.p->m_lookup_data.m_parent_batch_complete
       && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
   {
@@ -3726,7 +3956,21 @@
 
   treeNodePtr.p->m_lookup_data.m_outstanding -= cnt;
 
+  /**
+   * Another TreeNode awaited for completion of this request
+   * before it could resume its operation.
+   */
+  if (treeNodePtr.p->m_resumeEvents & TreeNode::TN_RESUME_REF)
+  {
+    jam();
+    ndbassert(treeNodePtr.p->m_resumePtrI != RNIL);
+    Ptr<TreeNode> resumeTreeNodePtr;
+    m_treenode_pool.getPtr(resumeTreeNodePtr, treeNodePtr.p->m_resumePtrI);
+    lookup_resume(signal, requestPtr, resumeTreeNodePtr);
+  }
+
   if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
+      && treeNodePtr.p->m_deferred.isEmpty()
       && treeNodePtr.p->m_lookup_data.m_parent_batch_complete
       && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
   {
@@ -3762,7 +4006,21 @@
 
   treeNodePtr.p->m_lookup_data.m_outstanding--;
 
+  /**
+   * Another TreeNode awaited for completion of this request
+   * before it could resume its operation.
+   */
+  if (treeNodePtr.p->m_resumeEvents & TreeNode::TN_RESUME_CONF)
+  {
+    jam();
+    ndbassert(treeNodePtr.p->m_resumePtrI != RNIL);
+    Ptr<TreeNode> resumeTreeNodePtr;
+    m_treenode_pool.getPtr(resumeTreeNodePtr, treeNodePtr.p->m_resumePtrI);
+    lookup_resume(signal, requestPtr, resumeTreeNodePtr);
+  }
+
   if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
+      && treeNodePtr.p->m_deferred.isEmpty()
       && treeNodePtr.p->m_lookup_data.m_parent_batch_complete
       && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
   {
@@ -3786,6 +4044,99 @@
 {
   jam();
 
+  DEBUG("::lookup_parent_row"
+     << ", node: " << treeNodePtr.p->m_node_no);
+
+  if (treeNodePtr.p->m_bits & TreeNode::T_EXEC_SEQUENTIAL)
+  {
+    jam();
+    DEBUG("T_EXEC_SEQUENTIAL --> exec deferred");
+
+    /**
+     * Append correlation values of deferred parent row operations
+     * to a list / fifo. Upon resume, we will then be able to 
+     * relocate all parent rows for which to resume operations.
+     */
+    LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
+    Local_pattern_store correlations(pool, treeNodePtr.p->m_deferred.m_correlations);
+    if (!correlations.append(&rowRef.m_src_correlation, 1))
+    {
+      jam();
+      abort(signal, requestPtr, DbspjErr::OutOfQueryMemory);
+      return;
+    }
+    return;
+  }
+
+  lookup_row(signal, requestPtr, treeNodePtr, rowRef);
+} // Dbspj::lookup_parent_row()
+
+/**
+ * lookup_resume() is a delayed lookup_parent_row.
+ * It will locate the next parent row now allowed to execute,
+ * and create a child lookup request for that row.
+ */
+void
+Dbspj::lookup_resume(Signal* signal,
+                     Ptr<Request> requestPtr,
+                     Ptr<TreeNode> treeNodePtr)
+{
+  DEBUG("::lookup_resume"
+     << ", node: " << treeNodePtr.p->m_node_no
+  );
+
+  ndbassert(treeNodePtr.p->m_bits & TreeNode::T_EXEC_SEQUENTIAL);
+  ndbassert(treeNodePtr.p->m_parentPtrI != RNIL);
+  ndbassert(!treeNodePtr.p->m_deferred.isEmpty());
+
+  if (unlikely(requestPtr.p->m_state & Request::RS_ABORTING))
+  {
+    jam();
+    return;
+  }
+
+  Uint32 corrVal;
+  {
+    LocalArenaPoolImpl pool(requestPtr.p->m_arena, m_dependency_map_pool);
+    Local_pattern_store correlations(pool, treeNodePtr.p->m_deferred.m_correlations);
+
+    Local_pattern_store::DataBufferIterator it;
+    const bool valid = correlations.position(it, (Uint32)(treeNodePtr.p->m_deferred.m_pos++));
+    (void)valid; ndbassert(valid);
+    corrVal = *it.data;
+  }
+
+  Ptr<TreeNode> parentPtr;
+  m_treenode_pool.getPtr(parentPtr, treeNodePtr.p->m_parentPtrI);
+
+  // Set up RowPtr & RowRef for this parent row
+  RowPtr row;
+  row.m_src_node_ptrI = parentPtr.i;
+  row.m_src_correlation = corrVal;
+
+  ndbassert(parentPtr.p->m_rows.m_type == RowCollection::COLLECTION_MAP);
+  RowRef ref;
+  parentPtr.p->m_rows.m_map.copyto(ref);
+  const Uint32* const mapptr = get_row_ptr(ref);
+
+  // Relocate parent row from correlation value.
+  const Uint32 rowId = (corrVal & 0xFFFF);
+  parentPtr.p->m_rows.m_map.load(mapptr, rowId, ref);
+
+  const Uint32* const rowptr = get_row_ptr(ref);
+  setupRowPtr(parentPtr.p->m_rows, row, ref, rowptr);
+
+  lookup_row(signal, requestPtr, treeNodePtr, row);
+} // Dbspj::lookup_resume()
+
+void
+Dbspj::lookup_row(Signal* signal,
+                         Ptr<Request> requestPtr,
+                         Ptr<TreeNode> treeNodePtr,
+                         const RowPtr & rowRef)
+{
+  jam();
+
   /**
    * Here we need to...
    *   1) construct a key
@@ -3796,7 +4147,7 @@
   const Uint32 tableId = treeNodePtr.p->m_tableOrIndexId;
   const Uint32 corrVal = rowRef.m_src_correlation;
 
-  DEBUG("::lookup_parent_row"
+  DEBUG("::lookup_row"
      << ", node: " << treeNodePtr.p->m_node_no);
 
   do
@@ -3853,10 +4204,11 @@
          * When the key contains NULL values, an EQ-match is impossible!
          * Entire lookup request can therefore be eliminate as it is known
          * to be REFused with errorCode = 626 (Row not found).
-         * Different handling is required depening of request being a
-         * scan or lookup:
+         * Scan request can eliminate these child LQHKEYREQs if REFs
+         * are not needed in order to handle TN_RESUME_REF.
          */
-        if (requestPtr.p->isScan())
+        if (requestPtr.p->isScan() &&
+            (treeNodePtr.p->m_resumeEvents & TreeNode::TN_RESUME_REF) == 0)
         {
           /**
            * Scan request: We can simply ignore lookup operation:
@@ -3868,7 +4220,7 @@
           releaseSection(ptrI);
           return;  // Bailout, KEYREQ would have returned KEYREF(626) anyway
         }
-        else  // isLookup()
+        else  // isLookup() or 'need TN_RESUME_REF'
         {
           /**
            * Ignored lookup request need a faked KEYREF for the lookup operation.
@@ -4053,7 +4405,9 @@
 
   ndbassert(!treeNodePtr.p->m_lookup_data.m_parent_batch_complete);
   treeNodePtr.p->m_lookup_data.m_parent_batch_complete = true;
+
   if (treeNodePtr.p->m_bits & TreeNode::T_REPORT_BATCH_COMPLETE
+      && treeNodePtr.p->m_deferred.isEmpty()
       && treeNodePtr.p->m_lookup_data.m_outstanding == 0)
   {
     jam();
@@ -4679,6 +5033,7 @@
       }
     }
   }
+  ndbassert(treeNodePtr.p->m_resumePtrI == RNIL);
 
   if (treeNodePtr.p->m_scanfrag_data.m_rows_received ==
       treeNodePtr.p->m_scanfrag_data.m_rows_expecting)
@@ -6312,6 +6667,7 @@
 
   ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
   data.m_rows_received++;
+  ndbassert(treeNodePtr.p->m_resumePtrI == RNIL);
 
   if (data.m_frags_outstanding == 0 &&
       data.m_rows_received == data.m_rows_expecting)

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.1 branch (ole.john.aske:4660 to 4661) Ole John Aske14 Nov