List: Commits
From: Jan Wedvik
Date: August 22 2011 8:36am
Subject: bzr push into mysql-5.1-telco-7.0 branch (jan.wedvik:4471 to 4472)
 4472 Jan Wedvik	2011-08-22
      This commit implements adaptive parallelism for (non-root) scan operations
      (i.e. index scans). So far, such scans have always been executed with
      maximal parallelism, meaning that all fragments were scanned in parallel.
      This also meant that the available batch size would have to be divided by the
      number of fragments. For index scans with large result sets, this was 
      inefficient, since it meant asking for many small batches.
      
      As an example, assume that a table scan of t1 is followed by an index scan on 
      t2 where the index has poor selectivity. The query is then effectively a cross
      join. When such queries were tested on distributed clusters with
      multi-threaded ndbd, they would typically run five to six times slower
      than with query pushdown disabled.
      
      This commit will therefore try to set an optimal parallelism, depending on the
      size of the scan result set. This eliminates the performance regression in 
      the test cases described above.
      
      This works as follows:
      
      * The first time an index scan is started, it uses parallelism=1 for
      the first batch.
      
      * For the subsequent batches, the SPJ block will try to set the parallelism 
      such that all rows from a single fragment will fit in one batch. If this
      is not possible, parallelism will remain 1.
      
      * Whenever the index scan is started again (typically after receiving a new
      batch for its parent), the SPJ block will try to guess the optimal parallelism
      based on statistics from previous runs. 'Optimal' means that one can expect
      all rows from a given fragment to be retrieved in one batch if possible.
      
      See also the quilt series published on 2011-08-18.
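      
      As a rough illustration of the approach described above, the initial
      parallelism estimate boils down to roughly the following (a simplified
      sketch of the logic this patch adds to
      Dbspj::scanIndex_parent_batch_complete(); the function and variable names
      are illustrative, and plain C++ integer types are used instead of the NDB
      kernel typedefs):
      
      // Choose the initial parallelism for a new execution of the scan, based
      // on the mean and standard deviation of the optimal parallelism observed
      // in earlier executions, and on the number of fragments left to scan.
      unsigned chooseInitialParallelism(double mean, double stdDev,
                                        unsigned fragsRemaining)
      {
        // Bias the estimate low (mean - 2*stddev), since choosing a
        // parallelism that is too high is the more costly mistake.
        int parallelism = static_cast<int>(mean - 2 * stdDev);
      
        if (parallelism < 1)
        {
          parallelism = 1;
        }
        else if (fragsRemaining % parallelism != 0)
        {
          // Spread the remaining fragments evenly over the expected number of
          // round trips, e.g. 8 remaining fragments become 2 x 4 rather than
          // 7 + 1.
          const int roundTrips = 1 + fragsRemaining / parallelism;
          parallelism = fragsRemaining / roundTrips;
        }
        return static_cast<unsigned>(parallelism);
      }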

    modified:
      storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperation.hpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
 4471 jonas oreland	2011-08-19 [merge]
      ndb - merge 63 to 70

    modified:
      storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
      storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2011-08-22 08:35:35 +0000
@@ -530,21 +530,87 @@ public:
   typedef SLFifoListImpl<ScanFragHandle_pool, ScanFragHandle> ScanFragHandle_list;
   typedef LocalSLFifoListImpl<ScanFragHandle_pool, ScanFragHandle> Local_ScanFragHandle_list;
 
+  /**
+   * This class computes mean and standard deviation incrementally for a series
+   * of samples.
+   */
+  class IncrementalStatistics
+  {
+  public:
+    /**
+     * We cannot have a (non-trivial) constructor, since this class is used in
+     * unions.
+     */
+    void init()
+    {
+      m_mean = m_sumSquare = 0.0;
+      m_noOfSamples = 0;
+    }
+
+    // Add another sample.
+    void update(double sample);
+
+    double getMean() const { return m_mean; }
+
+    double getStdDev() const { 
+      return m_noOfSamples < 2 ? 0.0 : sqrt(m_sumSquare/(m_noOfSamples - 1));
+    }
+
+  private:
+    // Mean of all samples
+    double m_mean;
+    // Sum of squared differences from the current mean.
+    double m_sumSquare;
+    Uint32 m_noOfSamples;
+  }; // IncrementalStatistics
+
   struct ScanIndexData
   {
     Uint16 m_frags_complete;
     Uint16 m_frags_outstanding;
+    /**
+     * The number of fragments for which we have not yet sent SCAN_FRAGREQ but
+     * will eventually do so.
+     */
+    Uint16 m_frags_not_started;
     Uint32 m_rows_received;  // #execTRANSID_AI
     Uint32 m_rows_expecting; // Sum(ScanFragConf)
     Uint32 m_batch_chunks;   // #SCAN_FRAGREQ + #SCAN_NEXTREQ to retrieve batch
     Uint32 m_scanCookie;
     Uint32 m_fragCount;
+    // The number of fragments that we scan in parallel.
+    Uint32 m_parallelism;
+    /**
+     * True if this is the first instantiation of this operation. A child
+     * operation will be instantiated once for each batch of its parent.
+     */
+    bool m_firstExecution;
+    /**
+     * Mean and standard deviation for the optimal parallelism for earlier
+     * executions of this operation.
+     */
+    IncrementalStatistics m_parallelismStat;
+    // Total number of rows for the current execution of this operation.
+    Uint32 m_totalRows;
+    // Total number of bytes for the current execution of this operation.
+    Uint32 m_totalBytes;
+
     ScanFragHandle_list::HeadPOD m_fragments; // ScanFrag states
     union
     {
       PatternStore::HeadPOD m_prunePattern;
       Uint32 m_constPrunePtrI;
     };
+    /**
+     * Max number of rows seen in a batch. Used for calculating the number of
+     * rows per fragment in the next batch when using adaptive batch size.
+     */
+    Uint32 m_largestBatchRows;
+    /**
+     * Max number of bytes seen in a batch. Used for calculating the number of
+     * rows per fragment in the next batch when using adaptive batch size.
+     */
+    Uint32 m_largestBatchBytes;
     Uint32 m_scanFragReq[ScanFragReq::SignalLength + 2];
   };
 
@@ -1164,6 +1230,13 @@ private:
   void scanIndex_parent_row(Signal*,Ptr<Request>,Ptr<TreeNode>, const RowPtr&);
   void scanIndex_fixupBound(Ptr<ScanFragHandle> fragPtr, Uint32 ptrI, Uint32);
   void scanIndex_send(Signal*,Ptr<Request>,Ptr<TreeNode>);
+  void scanIndex_send(Signal* signal,
+                      Ptr<Request> requestPtr,
+                      Ptr<TreeNode> treeNodePtr,
+                      Uint32 noOfFrags,
+                      Uint32 bs_bytes,
+                      Uint32 bs_rows,
+                      Uint32& batchRange);
   void scanIndex_batchComplete(Signal* signal);
   Uint32 scanIndex_findFrag(Local_ScanFragHandle_list &, Ptr<ScanFragHandle>&,
                             Uint32 fragId);

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-07-04 13:37:56 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-08-22 08:35:35 +0000
@@ -4450,6 +4450,9 @@ Dbspj::parseScanIndex(Build_context& ctx
     data.m_fragments.init();
     data.m_frags_outstanding = 0;
     data.m_frags_complete = 0;
+    data.m_frags_not_started = 0;
+    data.m_parallelismStat.init();
+    data.m_firstExecution = true;
     data.m_batch_chunks = 0;
 
     err = parseDA(ctx, requestPtr, treeNodePtr,
@@ -5002,6 +5005,7 @@ Dbspj::scanIndex_parent_batch_complete(S
       }
     }
   }
+  data.m_frags_not_started = data.m_fragCount - data.m_frags_complete;
 
   if (data.m_frags_complete == data.m_fragCount)
   {
@@ -5015,7 +5019,118 @@ Dbspj::scanIndex_parent_batch_complete(S
   /**
    * When parent's batch is complete, we send our batch
    */
-  scanIndex_send(signal, requestPtr, treeNodePtr);
+  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
+  ndbassert(org->batch_size_rows >= data.m_fragCount - data.m_frags_complete);
+
+  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
+  {
+    jam();
+    data.m_parallelism = data.m_fragCount - data.m_frags_complete;
+  }
+  else if (data.m_firstExecution)
+  {
+    /**
+     * Having a high parallelism would allow us to fetch data from many
+     * fragments in parallel and thus reduce the number of round trips.
+     * On the other hand, we should set parallelism so low that we can fetch
+     * all data from a fragment in one batch if possible.
+     * Since this is the first execution, we do not know how many rows or bytes
+     * this operation is likely to return. Therefore we set parallelism to 1,
+     * since this gives the lowest penalty if our guess is wrong.
+     */
+    jam();
+    data.m_parallelism = 1;
+  }
+  else
+  {
+    jam();
+    /**
+     * Use statistics from earlier runs of this operation to estimate the
+     * initial parallelism. We use the mean minus two times the standard
+     * deviation to have a low risk of setting the parallelism too high
+     * (since erring on the high side is more costly than on the low side).
+     */
+    Int32 parallelism = data.m_parallelismStat.getMean()
+      - 2 * data.m_parallelismStat.getStdDev();
+
+    if (parallelism < 1)
+    {
+      jam();
+      parallelism = 1;
+    }
+    else if ((data.m_fragCount - data.m_frags_complete) % parallelism != 0)
+    {
+      jam();
+      /**
+       * Set parallelism such that we can expect to have similar
+       * parallelism in each batch. For example if there are 8 remaining
+       * fragments, then we should fetch 2 times 4 fragments rather than
+       * 7+1.
+       */
+      const Int32 roundTrips =
+        1 + (data.m_fragCount - data.m_frags_complete) / parallelism;
+      parallelism = (data.m_fragCount - data.m_frags_complete) / roundTrips;
+    }
+
+    data.m_parallelism = static_cast<Uint32>(parallelism);
+
+#ifdef DEBUG_SCAN_FRAGREQ
+    DEBUG("::scanIndex_send() starting index scan with parallelism="
+          << data.m_parallelism);
+#endif
+  }
+  ndbrequire(data.m_parallelism > 0);
+
+  const Uint32 bs_rows = org->batch_size_rows/ data.m_parallelism;
+  const Uint32 bs_bytes = org->batch_size_bytes / data.m_parallelism;
+  ndbassert(bs_rows > 0);
+  ndbassert(bs_bytes > 0);
+
+  data.m_largestBatchRows = 0;
+  data.m_largestBatchBytes = 0;
+  data.m_totalRows = 0;
+  data.m_totalBytes = 0;
+
+  {
+    Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
+    Ptr<ScanFragHandle> fragPtr;
+    list.first(fragPtr);
+
+    while(!fragPtr.isNull())
+    {
+      ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED ||
+                fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE);
+      fragPtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED;
+      list.next(fragPtr);
+    }
+  }
+
+  Uint32 batchRange = 0;
+  scanIndex_send(signal,
+                 requestPtr,
+                 treeNodePtr,
+                 data.m_parallelism,
+                 bs_bytes,
+                 bs_rows,
+                 batchRange);
+
+  data.m_firstExecution = false;
+
+  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
+  {
+    ndbrequire((data.m_frags_outstanding + data.m_frags_complete) ==
+               data.m_fragCount);
+  }
+  else
+  {
+    ndbrequire((data.m_frags_outstanding + data.m_frags_complete) <=
+               data.m_fragCount);
+  }
+
+  data.m_batch_chunks = 1;
+  requestPtr.p->m_cnt_active++;
+  requestPtr.p->m_outstanding++;
+  treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
 }
 
 void
@@ -5045,77 +5160,91 @@ Dbspj::scanIndex_parent_batch_repeat(Sig
   }
 }
 
+/**
+ * Ask for the first batch for a number of fragments.
+ */
 void
 Dbspj::scanIndex_send(Signal* signal,
                       Ptr<Request> requestPtr,
-                      Ptr<TreeNode> treeNodePtr)
+                      Ptr<TreeNode> treeNodePtr,
+                      Uint32 noOfFrags,
+                      Uint32 bs_bytes,
+                      Uint32 bs_rows,
+                      Uint32& batchRange)
 {
-  jam();
-
-  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
-  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
-
-  Uint32 cnt = 1;
-  Uint32 bs_rows = org->batch_size_rows;
-  Uint32 bs_bytes = org->batch_size_bytes;
-  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
-  {
-    jam();
-    cnt = data.m_fragCount - data.m_frags_complete;
-    ndbrequire(cnt > 0);
-
-    bs_rows /= cnt;
-    bs_bytes /= cnt;
-    ndbassert(bs_rows > 0);
-  }
-
   /**
    * if (m_bits & prunemask):
    * - Range keys sliced out to each ScanFragHandle
    * - Else, range keys kept on first (and only) ScanFragHandle
    */
-  Uint32 prunemask = TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE;
+  const bool prune = treeNodePtr.p->m_bits &
+    (TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE);
 
   /**
-   * Don't release keyInfo if it may be sent multiple times, eiter:
-   *   - Not pruned -> same keyInfo goes to all datanodes.
-   *   - Result handling is REPEAT_SCAN_RESULT and same batch may be 
-   *     repeated multiple times due to incomplete bushy X-scans.
-   *     (by ::scanIndex_parent_batch_repeat())
-   *
-   * When not released, ::scanIndex_parent_batch_cleanup() will 
-   * eventually release them when preparing arrival of a new parent batch.
+   * If the scan is repeatable, we must make sure not to release the range keys,
+   * so that we can use them again in the next repetition.
    */
-  const bool release = ((treeNodePtr.p->m_bits & prunemask) != 0 &&
-                        (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) == 0);
+  const bool repeatable =
+    (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) != 0;
 
-  ScanFragReq* req = reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
+  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+  ndbassert(noOfFrags > 0);
+  ndbassert(data.m_frags_not_started >= noOfFrags);
+  ScanFragReq* const req =
+    reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
+  const ScanFragReq * const org
+    = reinterpret_cast<ScanFragReq*>(data.m_scanFragReq);
   memcpy(req, org, sizeof(data.m_scanFragReq));
   // req->variableData[0] // set below
   req->variableData[1] = requestPtr.p->m_rootResultData;
   req->batch_size_bytes = bs_bytes;
   req->batch_size_rows = bs_rows;
 
-  Ptr<ScanFragHandle> fragPtr;
+  Uint32 requestsSent = 0;
   Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
-
-  Uint32 keyInfoPtrI = RNIL;
+  Ptr<ScanFragHandle> fragPtr;
   list.first(fragPtr);
-  if ((treeNodePtr.p->m_bits & prunemask) == 0)
+  Uint32 keyInfoPtrI = fragPtr.p->m_rangePtrI;
+  ndbrequire(prune || keyInfoPtrI != RNIL);
+  /**
+   * Iterate over the list of fragments until we have sent as many
+   * SCAN_FRAGREQs as we should.
+   */
+  while (requestsSent < noOfFrags)
   {
     jam();
-    keyInfoPtrI = fragPtr.p->m_rangePtrI;
-    ndbrequire(keyInfoPtrI != RNIL);
-  }
+    ndbassert(!fragPtr.isNull());
 
-  Uint32 batchRange = 0;
-  for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr))
-  {
-    jam();
+    if (fragPtr.p->m_state != ScanFragHandle::SFH_NOT_STARTED)
+    {
+      // Skip forward to the frags that we should send.
+      jam();
+      list.next(fragPtr);
+      continue;
+    }
+
+    const Uint32 ref = fragPtr.p->m_ref;
+
+    if (noOfFrags==1 && !prune &&
+        data.m_frags_not_started == data.m_fragCount &&
+        refToNode(ref) != getOwnNodeId() &&
+        list.hasNext(fragPtr))
+    {
+      /**
+       * If we are doing a scan with adaptive parallelism and start with
+       * parallelism=1 then it makes sense to fetch a batch from a fragment on
+       * the local data node. The reason for this is that if that fragment
+       * contains few rows, we may be able to read from several fragments in
+       * parallel. Then we minimize the total number of round trips (to remote
+       * data nodes) if we fetch the first fragment batch locally.
+       */
+      jam();
+      list.next(fragPtr);
+      continue;
+    }
 
     SectionHandle handle(this);
 
-    Uint32 ref = fragPtr.p->m_ref;
     Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
 
     /**
@@ -5124,26 +5253,34 @@ Dbspj::scanIndex_send(Signal* signal,
     req->senderData = fragPtr.i;
     req->fragmentNoKeyLen = fragPtr.p->m_fragId;
 
-    if ((treeNodePtr.p->m_bits & prunemask))
+    if (prune)
     {
       jam();
       keyInfoPtrI = fragPtr.p->m_rangePtrI;
       if (keyInfoPtrI == RNIL)
       {
+        /**
+         * Since we use pruning, we can see that no parent rows would hash
+         * to this fragment.
+         */
         jam();
         fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
+        list.next(fragPtr);
         continue;
       }
-    }
-    if (release)
-    {
-      /**
-       * If we'll use sendSignal() and we need to send the attrInfo several
-       *   times, we need to copy them
-       */
-      Uint32 tmp = RNIL;
-      ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
-      attrInfoPtrI = tmp;
+
+      if (!repeatable)
+      {
+        /**
+         * If we'll use sendSignal() and we need to send the attrInfo several
+         * times, we need to copy them. (For repeatable or unpruned scans
+         * we use sendSignalNoRelease(), so then we do not need to copy.)
+         */
+        jam();
+        Uint32 tmp = RNIL;
+        ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
+        attrInfoPtrI = tmp;
+      }
     }
 
     req->variableData[0] = batchRange;
@@ -5152,16 +5289,14 @@ Dbspj::scanIndex_send(Signal* signal,
     handle.m_cnt = 2;
 
 #if defined DEBUG_SCAN_FRAGREQ
-    {
-      ndbout_c("SCAN_FRAGREQ to %x", ref);
-      printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
-                        NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
-                        DBLQH);
-      printf("ATTRINFO: ");
-      print(handle.m_ptr[0], stdout);
-      printf("KEYINFO: ");
-      print(handle.m_ptr[1], stdout);
-    }
+    ndbout_c("SCAN_FRAGREQ to %x", ref);
+    printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
+                      NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
+                      DBLQH);
+    printf("ATTRINFO: ");
+    print(handle.m_ptr[0], stdout);
+    printf("KEYINFO: ");
+    print(handle.m_ptr[1], stdout);
 #endif
 
     if (refToNode(ref) == getOwnNodeId())
@@ -5173,8 +5308,13 @@ Dbspj::scanIndex_send(Signal* signal,
       c_Counters.incr_counter(CI_REMOTE_RANGE_SCANS_SENT, 1);
     }
 
-    if (release)
+    if (prune && !repeatable)
     {
+      /**
+       * For a non-repeatable pruned scan, key info is unique for each
+       * fragment and therefore cannot be reused, so we release key info
+       * right away.
+       */
       jam();
       sendSignal(ref, GSN_SCAN_FRAGREQ, signal,
                  NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
@@ -5183,32 +5323,24 @@ Dbspj::scanIndex_send(Signal* signal,
     }
     else
     {
+      /**
+       * Reuse key info for multiple fragments and/or multiple repetitions
+       * of the scan.
+       */
       jam();
       sendSignalNoRelease(ref, GSN_SCAN_FRAGREQ, signal,
                           NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
     }
     handle.clear();
 
-    i++;
     fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; // running
     data.m_frags_outstanding++;
     batchRange += bs_rows;
-  }
+    requestsSent++;
+    list.next(fragPtr);
+  } // while (requestsSent < noOfFrags)
 
-  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
-  {
-    ndbrequire(data.m_frags_outstanding == 
-               data.m_fragCount - data.m_frags_complete);
-  }
-  else
-  {
-    ndbrequire(data.m_frags_outstanding == 1);
-  }
-
-  data.m_batch_chunks = 1;
-  requestPtr.p->m_cnt_active++;
-  requestPtr.p->m_outstanding++;
-  treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
+  data.m_frags_not_started -= requestsSent;
 }
 
 void
@@ -5282,6 +5414,10 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
   }
 
   requestPtr.p->m_rows += rows;
+  data.m_totalRows += rows;
+  data.m_totalBytes += conf->total_len;
+  data.m_largestBatchRows = MAX(data.m_largestBatchRows, rows);
+  data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, conf->total_len);
 
   if (!treeNodePtr.p->isLeaf())
   {
@@ -5302,7 +5438,9 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
     ndbrequire(data.m_frags_complete < data.m_fragCount);
     data.m_frags_complete++;
 
-    if (data.m_frags_complete == data.m_fragCount)
+    if (data.m_frags_complete == data.m_fragCount ||
+        ((requestPtr.p->m_state & Request::RS_ABORTING) != 0 &&
+         data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started)))
     {
       jam();
       ndbrequire(requestPtr.p->m_cnt_active);
@@ -5314,6 +5452,32 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
 
   if (data.m_frags_outstanding == 0)
   {
+    const ScanFragReq * const org
+      = reinterpret_cast<const ScanFragReq*>(data.m_scanFragReq);
+
+    if (data.m_frags_complete == data.m_fragCount)
+    {
+      jam();
+      /**
+       * Calculate what would have been the optimal parallelism for the
+       * scan instance that we have just completed, and update
+       * 'parallelismStat' with this value. We then use these statistics to set
+       * the initial parallelism for the next instance of this operation.
+       */
+      double parallelism = data.m_fragCount;
+      if (data.m_totalRows > 0)
+      {
+        parallelism = MIN(parallelism,
+                          double(org->batch_size_rows) / data.m_totalRows);
+      }
+      if (data.m_totalBytes > 0)
+      {
+        parallelism = MIN(parallelism,
+                          double(org->batch_size_bytes) / data.m_totalBytes);
+      }
+      data.m_parallelismStat.update(parallelism);
+    }
+
     /**
      * Don't reportBatchComplete to children if we're aborting...
      */
@@ -5364,7 +5528,7 @@ Dbspj::scanIndex_execSCAN_FRAGREF(Signal
   ndbrequire(data.m_frags_outstanding > 0);
   data.m_frags_outstanding--;
 
-  if (data.m_frags_complete == data.m_fragCount)
+  if (data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
   {
     jam();
     ndbrequire(requestPtr.p->m_cnt_active);
@@ -5390,21 +5554,78 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
   jam();
 
   ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
 
   data.m_rows_received = 0;
   data.m_rows_expecting = 0;
   ndbassert(data.m_frags_outstanding == 0);
 
   ndbrequire(data.m_frags_complete < data.m_fragCount);
-  Uint32 cnt = data.m_fragCount - data.m_frags_complete;
   if ((treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) == 0)
   {
     jam();
-    cnt = 1;
+    /**
+     * Since fetching few but large batches is more efficient, we
+     * set parallelism to the lowest value where we can still expect each
+     * batch to be full.
+     */
+    if (data.m_largestBatchRows < org->batch_size_rows/data.m_parallelism &&
+        data.m_largestBatchBytes < org->batch_size_bytes/data.m_parallelism)
+    {
+      jam();
+      data.m_parallelism = data.m_fragCount - data.m_frags_complete;
+      if (data.m_largestBatchRows > 0)
+      {
+        jam();
+        data.m_parallelism =
+          MIN(org->batch_size_rows / data.m_largestBatchRows,
+              data.m_parallelism);
+      }
+      if (data.m_largestBatchBytes > 0)
+      {
+        jam();
+        data.m_parallelism =
+          MIN(data.m_parallelism,
+              org->batch_size_bytes/data.m_largestBatchBytes);
+      }
+      if (data.m_frags_complete == 0 &&
+          data.m_frags_not_started % data.m_parallelism != 0)
+      {
+        jam();
+        /**
+         * Set parallelism such that we can expect to have similar
+         * parallelism in each batch. For example if there are 8 remaining
+         * fragments, then we should fetch 2 times 4 fragments rather than
+         * 7+1.
+         */
+        const Uint32 roundTrips =
+          1 + data.m_frags_not_started / data.m_parallelism;
+        data.m_parallelism = data.m_frags_not_started / roundTrips;
+      }
+    }
+    else
+    {
+      jam();
+      // We get full batches, so we should lower parallelism.
+      data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
+                               MAX(1, data.m_parallelism/2));
+    }
+    ndbassert(data.m_parallelism > 0);
+#ifdef DEBUG_SCAN_FRAGREQ
+    DEBUG("::scanIndex_execSCAN_NEXTREQ() Asking for new batches from " <<
+          data.m_parallelism <<
+          " fragments with " << org->batch_size_rows/data.m_parallelism <<
+          " rows and " << org->batch_size_bytes/data.m_parallelism <<
+          " bytes.");
+#endif
+  }
+  else
+  {
+    jam();
+    data.m_parallelism = data.m_fragCount - data.m_frags_complete;
   }
 
-  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
-  const Uint32 bs_rows = org->batch_size_rows/cnt;
+  const Uint32 bs_rows = org->batch_size_rows/data.m_parallelism;
   ndbassert(bs_rows > 0);
   ScanFragNextReq* req =
     reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
@@ -5413,20 +5634,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
   req->transId1 = requestPtr.p->m_transId[0];
   req->transId2 = requestPtr.p->m_transId[1];
   req->batch_size_rows = bs_rows;
-  req->batch_size_bytes = org->batch_size_bytes/cnt;
+  req->batch_size_bytes = org->batch_size_bytes/data.m_parallelism;
 
   Uint32 batchRange = 0;
   Ptr<ScanFragHandle> fragPtr;
+  Uint32 sentFragCount = 0;
+  {
+    /**
+     * First, ask for more data from fragments that are already started.
+     */
   Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
   list.first(fragPtr);
-  for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr))
+    while (sentFragCount < data.m_parallelism && !fragPtr.isNull())
   {
     jam();
+      ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ ||
+                fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE ||
+                fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED);
     if (fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ)
     {
       jam();
 
-      i++;
       data.m_frags_outstanding++;
       req->variableData[0] = batchRange;
       fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING;
@@ -5434,6 +5662,7 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
 
       DEBUG("scanIndex_execSCAN_NEXTREQ to: " << hex
             << treeNodePtr.p->m_send.m_ref
+              << ", m_node_no=" << treeNodePtr.p->m_node_no
             << ", senderData: " << req->senderData);
 
 #ifdef DEBUG_SCAN_FRAGREQ
@@ -5445,9 +5674,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
       sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal,
                  ScanFragNextReq::SignalLength + 1,
                  JBB);
+        sentFragCount++;
+      }
+      list.next(fragPtr);
     }
   }
 
+  if (sentFragCount < data.m_parallelism)
+  {
+    /**
+     * Then start new fragments until we reach data.m_parallelism.
+     */
+    jam();
+    ndbassert(data.m_frags_not_started != 0);
+    scanIndex_send(signal,
+                   requestPtr,
+                   treeNodePtr,
+                   data.m_parallelism - sentFragCount,
+                   org->batch_size_bytes/data.m_parallelism,
+                   bs_rows,
+                   batchRange);
+  }
   /**
    * cursor should not have been positioned here...
    *   unless we actually had something more to send.
@@ -5544,14 +5791,28 @@ Dbspj::scanIndex_abort(Signal* signal,
     }
   }
 
-  ndbrequire(cnt_waiting + cnt_scanning > 0);
   if (cnt_scanning == 0)
   {
-    /**
-     * If all were waiting...this should increase m_outstanding
-     */
-    jam();
-    requestPtr.p->m_outstanding++;
+    if (cnt_waiting > 0)
+    {
+      /**
+       * If all were waiting...this should increase m_outstanding
+       */
+      jam();
+      requestPtr.p->m_outstanding++;
+    }
+    else
+    {
+      /**
+       * All fragments are either complete or not yet started, so there is
+       * nothing to abort.
+       */
+      jam();
+      ndbassert(data.m_frags_not_started > 0);
+      ndbrequire(requestPtr.p->m_cnt_active);
+      requestPtr.p->m_cnt_active--;
+      treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
+    }
   }
 }
 
@@ -5603,6 +5864,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal
       jam();
       ndbrequire(data.m_frags_complete < data.m_fragCount);
       data.m_frags_complete++;
+      ndbrequire(data.m_frags_not_started > 0);
+      data.m_frags_not_started--;
       // fall through
     case ScanFragHandle::SFH_COMPLETE:
       jam();
@@ -5637,8 +5900,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal
     requestPtr.p->m_outstanding--;
   }
 
-  if (save1 != data.m_fragCount
-      && data.m_frags_complete == data.m_fragCount)
+  if (save1 != 0 &&
+      data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
   {
     jam();
     ndbrequire(requestPtr.p->m_cnt_active);
@@ -5654,7 +5917,8 @@ Dbspj::scanIndex_release_rangekeys(Ptr<R
                                    Ptr<TreeNode> treeNodePtr)
 {
   jam();
-  DEBUG("scanIndex_release_rangekeys(), m_node_no: " << treeNodePtr.p->m_node_no);
+  DEBUG("scanIndex_release_rangekeys(), tree node " << treeNodePtr.i
+          << " m_node_no: " << treeNodePtr.p->m_node_no);
 
   ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
   Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
@@ -7115,3 +7379,15 @@ void Dbspj::execDBINFO_SCANREQ(Signal *s
 
   ndbinfo_send_scan_conf(signal, req, rl);
 } // Dbspj::execDBINFO_SCANREQ(Signal *signal)
+
+void Dbspj::IncrementalStatistics::update(double sample)
+{
+  // Prevent wrap-around
+  if(m_noOfSamples < 0xffffffff)
+  {
+    m_noOfSamples++;
+    const double delta = sample - m_mean;
+    m_mean += delta/m_noOfSamples;
+  m_sumSquare += delta * (sample - m_mean);
+  }
+}
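
The update() added above is the standard Welford recurrence for maintaining a
running mean and a sum of squared deviations. A minimal standalone equivalent
(a sketch only, using plain C++ types and standard I/O instead of the kernel
typedefs) behaves like this:

#include <cmath>
#include <cstdio>

// Standalone version of the same recurrence: keep a running mean and the
// sum of squared deviations from the current mean.
struct RunningStats
{
  double mean;
  double sumSquare;
  unsigned long n;

  RunningStats() : mean(0.0), sumSquare(0.0), n(0) {}

  void update(double sample)
  {
    n++;
    const double delta = sample - mean;
    mean += delta / n;
    sumSquare += delta * (sample - mean);
  }

  // Sample standard deviation, as in IncrementalStatistics::getStdDev().
  double stdDev() const
  {
    return n < 2 ? 0.0 : std::sqrt(sumSquare / (n - 1));
  }
};

int main()
{
  RunningStats s;
  const double samples[] = { 2, 4, 4, 4, 5, 5, 7, 9 };
  for (unsigned i = 0; i < sizeof(samples) / sizeof(samples[0]); i++)
    s.update(samples[i]);
  // Mean is 5.0; the sample standard deviation is about 2.14 for these data.
  std::printf("mean=%f stddev=%f\n", s.mean, s.stdDev());
  return 0;
}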

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-18 11:47:22 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-22 08:35:35 +0000
@@ -57,6 +57,7 @@ static const int Err_FunctionNotImplemen
 static const int Err_UnknownColumn = 4004;
 static const int Err_ReceiveTimedOut = 4008;
 static const int Err_NodeFailCausedAbort = 4028;
+static const int Err_ParameterError = 4118;
 static const int Err_SimpleDirtyReadFailed = 4119;
 static const int Err_WrongFieldLength = 4209;
 static const int Err_ReadTooMuch = 4257;
@@ -72,6 +73,21 @@ static const Uint16 tupleNotFound = 0xff
 /** Set to true to trace incomming signals.*/
 const bool traceSignals = false;
 
+enum
+{
+  /**
+   * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
+   * scan parallelism should be adaptive.
+   */
+  Parallelism_adaptive = 0xffff0000,
+
+  /**
+   * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
+   * all fragments should be scanned in parallel.
+   */
+  Parallelism_max = 0xffff0001
+};
+
 /**
  * A class for accessing the correlation data at the end of a tuple (for 
  * scan queries). These data have the following layout:
@@ -1490,6 +1506,14 @@ int NdbQueryOperation::setParallelism(Ui
   return m_impl.setParallelism(parallelism);
 }
 
+int NdbQueryOperation::setMaxParallelism(){
+  return m_impl.setMaxParallelism();
+}
+
+int NdbQueryOperation::setAdaptiveParallelism(){
+  return m_impl.setAdaptiveParallelism();
+}
+
 int NdbQueryOperation::setBatchSize(Uint32 batchSize){
   return m_impl.setBatchSize(batchSize);
 }
@@ -2623,16 +2647,17 @@ NdbQueryImpl::prepareSend()
   {
     /* For the first batch, we read from all fragments for both ordered 
      * and unordered scans.*/
-    if (getQueryOperation(0U).m_parallelism > 0)
+    if (getQueryOperation(0U).m_parallelism == Parallelism_max)
     {
       m_rootFragCount
-        = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
-              getQueryOperation(0U).m_parallelism);
+        = getRoot().getQueryOperationDef().getTable().getFragmentCount();
     }
     else
     {
+      assert(getQueryOperation(0U).m_parallelism != Parallelism_adaptive);
       m_rootFragCount
-        = getRoot().getQueryOperationDef().getTable().getFragmentCount();
+        = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
+              getQueryOperation(0U).m_parallelism);
     }
     Ndb* const ndb = m_transaction.getNdb();
 
@@ -3667,7 +3692,8 @@ NdbQueryOperationImpl::NdbQueryOperation
   m_ordering(NdbQueryOptions::ScanOrdering_unordered),
   m_interpretedCode(NULL),
   m_diskInUserProjection(false),
-  m_parallelism(0),
+  m_parallelism(def.getQueryOperationIx() == 0
+                ? Parallelism_max : Parallelism_adaptive),
   m_rowSize(0xffffffff)
 { 
   if (errno == ENOMEM)
@@ -4266,9 +4292,10 @@ NdbQueryOperationImpl
                                       m_ndbRecord,
                                       m_firstRecAttr,
                                       0, // Key size.
-                                      getRoot().m_parallelism > 0 ?
-                                      getRoot().m_parallelism :
-                                      m_queryImpl.getRootFragCount(),
+                                      getRoot().m_parallelism
+                                      == Parallelism_max ?
+                                      m_queryImpl.getRootFragCount() :
+                                      getRoot().m_parallelism,
                                       maxBatchRows,
                                       batchByteSize,
                                       firstBatchRows);
@@ -4454,7 +4481,12 @@ NdbQueryOperationImpl::prepareAttrInfo(U
                                       firstBatchRows);
     assert(batchRows==getMaxBatchRows());
     assert(batchRows==firstBatchRows);
-    requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL; // FIXME: SPJ always assume. SIP_PARALLEL
+    assert(m_parallelism == Parallelism_max ||
+           m_parallelism == Parallelism_adaptive);
+    if (m_parallelism == Parallelism_max)
+    {
+      requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL;
+    }
     param->requestInfo = requestInfo; 
     param->batchSize = ((Uint16)batchByteSize << 16) | (Uint16)firstBatchRows;
     param->resultData = getIdOfReceiver();
@@ -4958,7 +4990,7 @@ NdbQueryOperationImpl::setOrdering(NdbQu
     return -1;
   }
 
-  if (m_parallelism != 0)
+  if (m_parallelism != Parallelism_max)
   {
     getQuery().setErrorCode(QRY_SEQUENTIAL_SCAN_SORTED);
     return -1;
@@ -5053,10 +5085,40 @@ int NdbQueryOperationImpl::setParallelis
     getQuery().setErrorCode(Err_FunctionNotImplemented);
     return -1;
   }
+  else if (parallelism < 1 || parallelism > MAX_NDB_PARTITIONS)
+  {
+    getQuery().setErrorCode(Err_ParameterError);
+    return -1;
+  }
   m_parallelism = parallelism;
   return 0;
 }
 
+int NdbQueryOperationImpl::setMaxParallelism(){
+  if (!getQueryOperationDef().isScanOperation())
+  {
+    getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
+    return -1;
+  }
+  m_parallelism = Parallelism_max;
+  return 0;
+}
+
+int NdbQueryOperationImpl::setAdaptiveParallelism(){
+  if (!getQueryOperationDef().isScanOperation())
+  {
+    getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
+    return -1;
+  }
+  else if (getQueryOperationDef().getQueryOperationIx() == 0)
+  {
+    getQuery().setErrorCode(Err_FunctionNotImplemented);
+    return -1;
+  }
+  m_parallelism = Parallelism_adaptive;
+  return 0;
+}
+
 int NdbQueryOperationImpl::setBatchSize(Uint32 batchSize){
   if (!getQueryOperationDef().isScanOperation())
   {

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.hpp	2011-06-20 13:25:48 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.hpp	2011-08-22 08:35:35 +0000
@@ -335,13 +335,29 @@ public:
   NdbQueryOptions::ScanOrdering getOrdering() const;
 
   /** 
-   * Set the number of fragments to be scanned in parallel for the root 
-   * operation of this query. This only applies to table scans and non-sorted
-   * scans of ordered indexes.
+   * Set the number of fragments to be scanned in parallel. This only applies
+   * to table scans and non-sorted scans of ordered indexes. This method is
+   * only implemented for the root scan operation.
    * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
    */
   int setParallelism(Uint32 parallelism);
 
+  /**
+   * Set the number of fragments to be scanned in parallel to the maximum
+   * possible value. This is the default for the root scan operation.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setMaxParallelism();
+
+  /**
+   * Let the system dynamically choose the number of fragments to scan in
+   * parallel. The system will try to choose a value that gives optimal
+   * performance. This is the default for all scans but the root scan. This
+   * method is only implemented for non-root scan operations.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setAdaptiveParallelism();
+
   /** Set the batch size (max rows per batch) for this operation. This
    * only applies to scan operations, as lookup operations always will
    * have the same batch size as its parent operation, or 1 if it is the

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-08-17 13:16:22 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-08-22 08:35:35 +0000
@@ -661,14 +661,30 @@ public:
   NdbQueryOptions::ScanOrdering getOrdering() const
   { return m_ordering; }
 
-   /** 
-   * Set the number of fragments to be scanned in parallel for the root 
-   * operation of this query. This only applies to table scans and non-sorted
-   * scans of ordered indexes.
+  /**
+   * Set the number of fragments to be scanned in parallel. This only applies
+   * to table scans and non-sorted scans of ordered indexes. This method is
+   * only implemented for the root scan operation.
    * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
    */
   int setParallelism(Uint32 parallelism);
 
+  /**
+   * Set the number of fragments to be scanned in parallel to the maximum
+   * possible value. This is the default for the root scan operation.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setMaxParallelism();
+
+  /**
+   * Let the system dynamically choose the number of fragments to scan in
+   * parallel. The system will try to choose a value that gives optimal
+   * performance. This is the default for all scans but the root scan. This
+   * method is only implemented for non-root scan operations.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setAdaptiveParallelism();
+
   /** Set the batch size (max rows per batch) for this operation. This
    * only applies to scan operations, as lookup operations always will
    * have the same batch size as its parent operation, or 1 if it is the

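For illustration, client code could use the new parallelism hints roughly as
follows (a hypothetical sketch: only setMaxParallelism()/setAdaptiveParallelism()
come from this patch, while the include path and the way the NdbQueryOperation
pointers are obtained are assumptions):

#include "NdbQueryOperation.hpp"

// Hypothetical helper: 'root' is the root scan operation of a pushed query
// and 'child' is a non-root index scan operation below it.
int configureParallelism(NdbQueryOperation* root, NdbQueryOperation* child)
{
  // Keep the old behaviour for the root scan: read all fragments in parallel.
  if (root->setMaxParallelism() != 0)
    return -1;

  // Let SPJ choose the parallelism of the child index scan adaptively, based
  // on statistics from earlier batches (the default for non-root scans).
  if (child->setAdaptiveParallelism() != 0)
    return -1;

  return 0;
}
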
No bundle (reason: useless for push emails).