List: Commits
From: Jonas Oreland
Date: August 25 2011 6:30am
Subject: bzr push into mysql-5.1-telco-7.1 branch (jonas.oreland:4259 to 4260)
 4260 Jonas Oreland	2011-08-25 [merge]
      ndb - merge 70 to 71

    modified:
      sql/ha_ndbcluster.cc
      storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperation.hpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
 4259 jonas oreland	2011-08-19 [merge]
      ndb - merge 70 to 71

    modified:
      storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
      storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2011-08-17 09:23:42 +0000
+++ b/sql/ha_ndbcluster.cc	2011-08-25 06:30:20 +0000
@@ -164,6 +164,11 @@ static MYSQL_THDVAR_ULONG(
   0                                  /* block */
 );
 
+#if NDB_VERSION_D < NDB_MAKE_VERSION(7,2,0)
+#define DEFAULT_NDB_INDEX_STAT_ENABLE FALSE
+#else
+#define DEFAULT_NDB_INDEX_STAT_ENABLE TRUE
+#endif
 
 static MYSQL_THDVAR_BOOL(
   index_stat_enable,                 /* name */
@@ -171,7 +176,7 @@ static MYSQL_THDVAR_BOOL(
   "Use ndb index statistics in query optimization.",
   NULL,                              /* check func. */
   NULL,                              /* update func. */
-  FALSE                              /* default */
+  DEFAULT_NDB_INDEX_STAT_ENABLE      /* default */
 );
 
 

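As background for the hunk above: NDB encodes a version as a single integer with major, minor, and build packed into separate bit fields, so the preprocessor test reduces to a plain integer comparison. A minimal standalone sketch of that packing scheme (the MAKE_VERSION macro here mirrors what NDB_MAKE_VERSION is assumed to do; it is not the NDB header itself):

#include <cstdio>

// Assumed packing: major in bits 16+, minor in bits 8-15, build in bits 0-7.
#define MAKE_VERSION(A,B,C) (((A) << 16) | ((B) << 8) | (C))

int main()
{
  const unsigned v710 = MAKE_VERSION(7,1,0);
  const unsigned v720 = MAKE_VERSION(7,2,0);
  // Any 7.1.x build compares below 7.2.0, so on 7.1 the index-stat
  // default resolves to FALSE, while 7.2 and later default to TRUE.
  printf("7.1.0 < 7.2.0 : %s\n", v710 < v720 ? "yes" : "no");
  return 0;
}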
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2011-07-04 16:30:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2011-08-25 06:30:20 +0000
@@ -530,21 +530,87 @@ public:
   typedef SLFifoListImpl<ScanFragHandle_pool, ScanFragHandle> ScanFragHandle_list;
   typedef LocalSLFifoListImpl<ScanFragHandle_pool, ScanFragHandle> Local_ScanFragHandle_list;
 
+  /**
+   * This class computes mean and standard deviation incrementally for a series
+   * of samples.
+   */
+  class IncrementalStatistics
+  {
+  public:
+    /**
+     * We cannot have a (non-trivial) constructor, since this class is used in
+     * unions.
+     */
+    void init()
+    {
+      m_mean = m_sumSquare = 0.0;
+      m_noOfSamples = 0;
+    }
+
+    // Add another sample.
+    void update(double sample);
+
+    double getMean() const { return m_mean; }
+
+    double getStdDev() const { 
+      return m_noOfSamples < 2 ? 0.0 : sqrt(m_sumSquare/(m_noOfSamples - 1));
+    }
+
+  private:
+    // Mean of all samples
+    double m_mean;
+    // Sum of squared differences from the current mean.
+    double m_sumSquare;
+    Uint32 m_noOfSamples;
+  }; // IncrementalStatistics
+
   struct ScanIndexData
   {
     Uint16 m_frags_complete;
     Uint16 m_frags_outstanding;
+    /**
+     * The number of fragments for which we have not yet sent SCAN_FRAGREQ but
+     * will eventually do so.
+     */
+    Uint16 m_frags_not_started;
     Uint32 m_rows_received;  // #execTRANSID_AI
     Uint32 m_rows_expecting; // Sum(ScanFragConf)
     Uint32 m_batch_chunks;   // #SCAN_FRAGREQ + #SCAN_NEXTREQ to retrieve batch
     Uint32 m_scanCookie;
     Uint32 m_fragCount;
+    // The number of fragments that we scan in parallel.
+    Uint32 m_parallelism;
+    /**
+     * True if this is the first instantiation of this operation. A child
+     * operation will be instantiated once for each batch of its parent.
+     */
+    bool m_firstExecution;
+    /**
+     * Mean and standard deviation for the optimal parallelism for earlier
+     * executions of this operation.
+     */
+    IncrementalStatistics m_parallelismStat;
+    // Total number of rows for the current execution of this operation.
+    Uint32 m_totalRows;
+    // Total number of bytes for the current execution of this operation.
+    Uint32 m_totalBytes;
+
     ScanFragHandle_list::HeadPOD m_fragments; // ScanFrag states
     union
     {
       PatternStore::HeadPOD m_prunePattern;
       Uint32 m_constPrunePtrI;
     };
+    /**
+     * Max number of rows seen in a batch. Used for calculating the number of
+     * rows per fragment in the next batch when using adaptive batch size.
+     */
+    Uint32 m_largestBatchRows;
+    /**
+     * Max number of bytes seen in a batch. Used for calculating the number of
+     * rows per fragment in the next batch when using adaptive batch size.
+     */
+    Uint32 m_largestBatchBytes;
     Uint32 m_scanFragReq[ScanFragReq::SignalLength + 2];
   };
 
@@ -1164,6 +1230,13 @@ private:
   void scanIndex_parent_row(Signal*,Ptr<Request>,Ptr<TreeNode>, const RowPtr&);
   void scanIndex_fixupBound(Ptr<ScanFragHandle> fragPtr, Uint32 ptrI, Uint32);
   void scanIndex_send(Signal*,Ptr<Request>,Ptr<TreeNode>);
+  void scanIndex_send(Signal* signal,
+                      Ptr<Request> requestPtr,
+                      Ptr<TreeNode> treeNodePtr,
+                      Uint32 noOfFrags,
+                      Uint32 bs_bytes,
+                      Uint32 bs_rows,
+                      Uint32& batchRange);
   void scanIndex_batchComplete(Signal* signal);
   Uint32 scanIndex_findFrag(Local_ScanFragHandle_list &, Ptr<ScanFragHandle>&,
                             Uint32 fragId);

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-07-04 16:30:34 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-08-25 06:30:20 +0000
@@ -4450,6 +4450,9 @@ Dbspj::parseScanIndex(Build_context& ctx
     data.m_fragments.init();
     data.m_frags_outstanding = 0;
     data.m_frags_complete = 0;
+    data.m_frags_not_started = 0;
+    data.m_parallelismStat.init();
+    data.m_firstExecution = true;
     data.m_batch_chunks = 0;
 
     err = parseDA(ctx, requestPtr, treeNodePtr,
@@ -5002,6 +5005,7 @@ Dbspj::scanIndex_parent_batch_complete(S
       }
     }
   }
+  data.m_frags_not_started = data.m_fragCount - data.m_frags_complete;
 
   if (data.m_frags_complete == data.m_fragCount)
   {
@@ -5015,7 +5019,120 @@ Dbspj::scanIndex_parent_batch_complete(S
   /**
    * When parent's batch is complete, we send our batch
    */
-  scanIndex_send(signal, requestPtr, treeNodePtr);
+  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
+  ndbassert(org->batch_size_rows >= data.m_fragCount - data.m_frags_complete);
+
+  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
+  {
+    jam();
+    data.m_parallelism = data.m_fragCount - data.m_frags_complete;
+  }
+  else if (data.m_firstExecution)
+  {
+    /**
+     * Having a high parallelism would allow us to fetch data from many
+     * fragments in parallel and thus reduce the number of round trips.
+     * On the other hand, we should set parallelism so low that we can fetch
+     * all data from a fragment in one batch if possible.
+     * Since this is the first execution, we do not know how many rows or bytes
+     * this operation is likely to return. Therefore we set parallelism to 1,
+     * since this gives the lowest penalty if our guess is wrong.
+     */
+    jam();
+    data.m_parallelism = 1;
+  }
+  else
+  {
+    jam();
+    /**
+     * Use statistics from earlier runs of this operation to estimate the
+     * initial parallelism. We use the mean minus two times the standard
+     * deviation to have a low risk of setting parallelism too high (as erring
+     * in the other direction is more costly).
+     */
+    Int32 parallelism = 
+      static_cast<Int32>(data.m_parallelismStat.getMean()
+                         - 2 * data.m_parallelismStat.getStdDev());
+
+    if (parallelism < 1)
+    {
+      jam();
+      parallelism = 1;
+    }
+    else if ((data.m_fragCount - data.m_frags_complete) % parallelism != 0)
+    {
+      jam();
+      /**
+       * Set parallelism such that we can expect to have similar
+       * parallelism in each batch. For example if there are 8 remaining
+       * fragments, then we should fetch 2 times 4 fragments rather than
+       * 7+1.
+       */
+      const Int32 roundTrips =
+        1 + (data.m_fragCount - data.m_frags_complete) / parallelism;
+      parallelism = (data.m_fragCount - data.m_frags_complete) / roundTrips;
+    }
+
+    data.m_parallelism = static_cast<Uint32>(parallelism);
+
+#ifdef DEBUG_SCAN_FRAGREQ
+    DEBUG("::scanIndex_send() starting index scan with parallelism="
+          << data.m_parallelism);
+#endif
+  }
+  ndbrequire(data.m_parallelism > 0);
+
+  const Uint32 bs_rows = org->batch_size_rows / data.m_parallelism;
+  const Uint32 bs_bytes = org->batch_size_bytes / data.m_parallelism;
+  ndbassert(bs_rows > 0);
+  ndbassert(bs_bytes > 0);
+
+  data.m_largestBatchRows = 0;
+  data.m_largestBatchBytes = 0;
+  data.m_totalRows = 0;
+  data.m_totalBytes = 0;
+
+  {
+    Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
+    Ptr<ScanFragHandle> fragPtr;
+    list.first(fragPtr);
+
+    while(!fragPtr.isNull())
+    {
+      ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED ||
+                fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE);
+      fragPtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED;
+      list.next(fragPtr);
+    }
+  }
+
+  Uint32 batchRange = 0;
+  scanIndex_send(signal,
+                 requestPtr,
+                 treeNodePtr,
+                 data.m_parallelism,
+                 bs_bytes,
+                 bs_rows,
+                 batchRange);
+
+  data.m_firstExecution = false;
+
+  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
+  {
+    ndbrequire((data.m_frags_outstanding + data.m_frags_complete) ==
+               data.m_fragCount);
+  }
+  else
+  {
+    ndbrequire(static_cast<Uint32>(data.m_frags_outstanding + 
+                                   data.m_frags_complete) <=
+               data.m_fragCount);
+  }
+
+  data.m_batch_chunks = 1;
+  requestPtr.p->m_cnt_active++;
+  requestPtr.p->m_outstanding++;
+  treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
 }
 
 void
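The initial-parallelism heuristic in the hunk above is compact enough to restate on its own. A standalone sketch of the same computation (a hypothetical free function, not the DBSPJ code itself; it assumes only the mean and standard deviation collected by IncrementalStatistics):

#include <algorithm>
#include <cstdint>

// Estimate initial parallelism as mean - 2*stddev, clamped to >= 1, then
// rounded down so that the remaining fragments split into groups of
// roughly equal size across round trips.
static uint32_t initialParallelism(double mean, double stdDev,
                                   uint32_t fragsRemaining)
{
  int32_t parallelism = static_cast<int32_t>(mean - 2 * stdDev);
  if (parallelism < 1)
  {
    parallelism = 1;
  }
  else if (fragsRemaining % parallelism != 0)
  {
    // E.g. 8 remaining fragments with an estimate of 7 become two round
    // trips of 4 fragments each (4+4) instead of the lopsided 7+1.
    const int32_t roundTrips = 1 + fragsRemaining / parallelism;
    parallelism = fragsRemaining / roundTrips;
  }
  return static_cast<uint32_t>(parallelism);
}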
@@ -5045,77 +5162,91 @@ Dbspj::scanIndex_parent_batch_repeat(Sig
   }
 }
 
+/**
+ * Ask for the first batch for a number of fragments.
+ */
 void
 Dbspj::scanIndex_send(Signal* signal,
                       Ptr<Request> requestPtr,
-                      Ptr<TreeNode> treeNodePtr)
+                      Ptr<TreeNode> treeNodePtr,
+                      Uint32 noOfFrags,
+                      Uint32 bs_bytes,
+                      Uint32 bs_rows,
+                      Uint32& batchRange)
 {
-  jam();
-
-  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
-  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
-
-  Uint32 cnt = 1;
-  Uint32 bs_rows = org->batch_size_rows;
-  Uint32 bs_bytes = org->batch_size_bytes;
-  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
-  {
-    jam();
-    cnt = data.m_fragCount - data.m_frags_complete;
-    ndbrequire(cnt > 0);
-
-    bs_rows /= cnt;
-    bs_bytes /= cnt;
-    ndbassert(bs_rows > 0);
-  }
-
   /**
    * if (m_bits & prunemask):
    * - Range keys sliced out to each ScanFragHandle
    * - Else, range keys kept on first (and only) ScanFragHandle
    */
-  Uint32 prunemask = TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE;
+  const bool prune = treeNodePtr.p->m_bits &
+    (TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE);
 
   /**
-   * Don't release keyInfo if it may be sent multiple times, eiter:
-   *   - Not pruned -> same keyInfo goes to all datanodes.
-   *   - Result handling is REPEAT_SCAN_RESULT and same batch may be 
-   *     repeated multiple times due to incomplete bushy X-scans.
-   *     (by ::scanIndex_parent_batch_repeat())
-   *
-   * When not released, ::scanIndex_parent_batch_cleanup() will 
-   * eventually release them when preparing arrival of a new parent batch.
+   * If scan is repeatable, we must make sure not to release range keys so
+   * that we can use them again in the next repetition.
    */
-  const bool release = ((treeNodePtr.p->m_bits & prunemask) != 0 &&
-                        (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) == 0);
+  const bool repeatable =
+    (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) != 0;
 
-  ScanFragReq* req = reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
+  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+  ndbassert(noOfFrags > 0);
+  ndbassert(data.m_frags_not_started >= noOfFrags);
+  ScanFragReq* const req =
+    reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
+  const ScanFragReq * const org
+    = reinterpret_cast<ScanFragReq*>(data.m_scanFragReq);
   memcpy(req, org, sizeof(data.m_scanFragReq));
   // req->variableData[0] // set below
   req->variableData[1] = requestPtr.p->m_rootResultData;
   req->batch_size_bytes = bs_bytes;
   req->batch_size_rows = bs_rows;
 
-  Ptr<ScanFragHandle> fragPtr;
+  Uint32 requestsSent = 0;
   Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
-
-  Uint32 keyInfoPtrI = RNIL;
+  Ptr<ScanFragHandle> fragPtr;
   list.first(fragPtr);
-  if ((treeNodePtr.p->m_bits & prunemask) == 0)
+  Uint32 keyInfoPtrI = fragPtr.p->m_rangePtrI;
+  ndbrequire(prune || keyInfoPtrI != RNIL);
+  /**
+   * Iterate over the list of fragments until we have sent as many
+   * SCAN_FRAGREQs as we should.
+   */
+  while (requestsSent < noOfFrags)
   {
     jam();
-    keyInfoPtrI = fragPtr.p->m_rangePtrI;
-    ndbrequire(keyInfoPtrI != RNIL);
-  }
+    ndbassert(!fragPtr.isNull());
 
-  Uint32 batchRange = 0;
-  for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr))
-  {
-    jam();
+    if (fragPtr.p->m_state != ScanFragHandle::SFH_NOT_STARTED)
+    {
+      // Skip forward to the frags that we should send.
+      jam();
+      list.next(fragPtr);
+      continue;
+    }
+
+    const Uint32 ref = fragPtr.p->m_ref;
+
+    if (noOfFrags==1 && !prune &&
+        data.m_frags_not_started == data.m_fragCount &&
+        refToNode(ref) != getOwnNodeId() &&
+        list.hasNext(fragPtr))
+    {
+      /**
+       * If we are doing a scan with adaptive parallelism and start with
+       * parallelism=1 then it makes sense to fetch a batch from a fragment on
+       * the local data node. The reason for this is that if that fragment
+       * contains few rows, we may be able to read from several fragments in
+       * parallel. Then we minimize the total number of round trips (to remote
+       * data nodes) if we fetch the first fragment batch locally.
+       */
+      jam();
+      list.next(fragPtr);
+      continue;
+    }
 
     SectionHandle handle(this);
 
-    Uint32 ref = fragPtr.p->m_ref;
     Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
 
     /**
@@ -5124,26 +5255,34 @@ Dbspj::scanIndex_send(Signal* signal,
     req->senderData = fragPtr.i;
     req->fragmentNoKeyLen = fragPtr.p->m_fragId;
 
-    if ((treeNodePtr.p->m_bits & prunemask))
+    if (prune)
     {
       jam();
       keyInfoPtrI = fragPtr.p->m_rangePtrI;
       if (keyInfoPtrI == RNIL)
       {
+        /**
+         * Since we use pruning, we can see that no parent rows would hash
+         * to this fragment.
+         */
         jam();
         fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
+        list.next(fragPtr);
         continue;
       }
-    }
-    if (release)
-    {
-      /**
-       * If we'll use sendSignal() and we need to send the attrInfo several
-       *   times, we need to copy them
-       */
-      Uint32 tmp = RNIL;
-      ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
-      attrInfoPtrI = tmp;
+
+      if (!repeatable)
+      {
+        /**
+         * If we'll use sendSignal() and we need to send the attrInfo several
+         * times, we need to copy them. (For repeatable or unpruned scans
+         * we use sendSignalNoRelease(), so we do not need to copy.)
+         */
+        jam();
+        Uint32 tmp = RNIL;
+        ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
+        attrInfoPtrI = tmp;
+      }
     }
 
     req->variableData[0] = batchRange;
@@ -5152,16 +5291,14 @@ Dbspj::scanIndex_send(Signal* signal,
     handle.m_cnt = 2;
 
 #if defined DEBUG_SCAN_FRAGREQ
-    {
-      ndbout_c("SCAN_FRAGREQ to %x", ref);
-      printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
-                        NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
-                        DBLQH);
-      printf("ATTRINFO: ");
-      print(handle.m_ptr[0], stdout);
-      printf("KEYINFO: ");
-      print(handle.m_ptr[1], stdout);
-    }
+    ndbout_c("SCAN_FRAGREQ to %x", ref);
+    printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
+                      NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
+                      DBLQH);
+    printf("ATTRINFO: ");
+    print(handle.m_ptr[0], stdout);
+    printf("KEYINFO: ");
+    print(handle.m_ptr[1], stdout);
 #endif
 
     if (refToNode(ref) == getOwnNodeId())
@@ -5173,8 +5310,13 @@ Dbspj::scanIndex_send(Signal* signal,
       c_Counters.incr_counter(CI_REMOTE_RANGE_SCANS_SENT, 1);
     }
 
-    if (release)
+    if (prune && !repeatable)
     {
+      /**
+       * For a non-repeatable pruned scan, key info is unique for each
+       * fragment and therefore cannot be reused, so we release key info
+       * right away.
+       */
       jam();
       sendSignal(ref, GSN_SCAN_FRAGREQ, signal,
                  NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
@@ -5183,32 +5325,24 @@ Dbspj::scanIndex_send(Signal* signal,
     }
     else
     {
+      /**
+       * Reuse key info for multiple fragments and/or multiple repetitions
+       * of the scan.
+       */
       jam();
       sendSignalNoRelease(ref, GSN_SCAN_FRAGREQ, signal,
                           NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
     }
     handle.clear();
 
-    i++;
     fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; // running
     data.m_frags_outstanding++;
     batchRange += bs_rows;
-  }
+    requestsSent++;
+    list.next(fragPtr);
+  } // while (requestsSent < noOfFrags)
 
-  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
-  {
-    ndbrequire(data.m_frags_outstanding == 
-               data.m_fragCount - data.m_frags_complete);
-  }
-  else
-  {
-    ndbrequire(data.m_frags_outstanding == 1);
-  }
-
-  data.m_batch_chunks = 1;
-  requestPtr.p->m_cnt_active++;
-  requestPtr.p->m_outstanding++;
-  treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
+  data.m_frags_not_started -= requestsSent;
 }
 
 void
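The two send paths above replace the old single 'release' flag; the rule can be captured in one predicate. A minimal sketch (hypothetical helper, not part of the patch):

// Key info is unique per fragment only for pruned scans, so it can be
// released as the signal is sent, unless the scan is repeatable, in
// which case the sections must survive for the next repetition.
static bool canReleaseKeyInfo(bool pruned, bool repeatable)
{
  return pruned && !repeatable;
}

Unpruned scans always share one key-info section across all fragments, which is why that branch uses sendSignalNoRelease() and leaves the release to scanIndex_parent_batch_cleanup(), as the removed comment noted.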
@@ -5282,6 +5416,10 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
   }
 
   requestPtr.p->m_rows += rows;
+  data.m_totalRows += rows;
+  data.m_totalBytes += conf->total_len;
+  data.m_largestBatchRows = MAX(data.m_largestBatchRows, rows);
+  data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, conf->total_len);
 
   if (!treeNodePtr.p->isLeaf())
   {
@@ -5302,7 +5440,9 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
     ndbrequire(data.m_frags_complete < data.m_fragCount);
     data.m_frags_complete++;
 
-    if (data.m_frags_complete == data.m_fragCount)
+    if (data.m_frags_complete == data.m_fragCount ||
+        ((requestPtr.p->m_state & Request::RS_ABORTING) != 0 &&
+         data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started)))
     {
       jam();
       ndbrequire(requestPtr.p->m_cnt_active);
@@ -5314,6 +5454,32 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
 
   if (data.m_frags_outstanding == 0)
   {
+    const ScanFragReq * const org
+      = reinterpret_cast<const ScanFragReq*>(data.m_scanFragReq);
+
+    if (data.m_frags_complete == data.m_fragCount)
+    {
+      jam();
+      /**
+       * Calculate what would have been the optimal parallelism for the
+       * scan instance that we have just completed, and update
+       * 'parallelismStat' with this value. We then use this statistic to set
+       * the initial parallelism for the next instance of this operation.
+       */
+      double parallelism = data.m_fragCount;
+      if (data.m_totalRows > 0)
+      {
+        parallelism = MIN(parallelism,
+                          double(org->batch_size_rows) / data.m_totalRows);
+      }
+      if (data.m_totalBytes > 0)
+      {
+        parallelism = MIN(parallelism,
+                          double(org->batch_size_bytes) / data.m_totalBytes);
+      }
+      data.m_parallelismStat.update(parallelism);
+    }
+
     /**
      * Don't reportBatchComplete to children if we're aborting...
      */
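The statistics update above asks: given the rows and bytes this scan actually produced, how many fragments could have been scanned in parallel while still fitting every batch within its budget? A worked sketch of the arithmetic (a hypothetical mirror, not the block code):

#include <algorithm>

// Optimal parallelism = the largest value such that the total rows and
// bytes would still have fit within the per-batch budgets.
static double optimalParallelism(double fragCount,
                                 double batchSizeRows, double totalRows,
                                 double batchSizeBytes, double totalBytes)
{
  double parallelism = fragCount;
  if (totalRows > 0)
    parallelism = std::min(parallelism, batchSizeRows / totalRows);
  if (totalBytes > 0)
    parallelism = std::min(parallelism, batchSizeBytes / totalBytes);
  return parallelism;
}

// Example: with a 250-row batch budget and a scan that returned 1000 rows
// in total, the sample is 0.25 -- even a single fragment overflows the
// batch, so the statistics steer future runs toward parallelism 1.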
@@ -5364,7 +5530,7 @@ Dbspj::scanIndex_execSCAN_FRAGREF(Signal
   ndbrequire(data.m_frags_outstanding > 0);
   data.m_frags_outstanding--;
 
-  if (data.m_frags_complete == data.m_fragCount)
+  if (data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
   {
     jam();
     ndbrequire(requestPtr.p->m_cnt_active);
@@ -5390,21 +5556,78 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
   jam();
 
   ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
 
   data.m_rows_received = 0;
   data.m_rows_expecting = 0;
   ndbassert(data.m_frags_outstanding == 0);
 
   ndbrequire(data.m_frags_complete < data.m_fragCount);
-  Uint32 cnt = data.m_fragCount - data.m_frags_complete;
   if ((treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) == 0)
   {
     jam();
-    cnt = 1;
+    /**
+     * Since fetching fewer but larger batches is more efficient, we
+     * set parallelism to the lowest value where we can still expect each
+     * batch to be full.
+     */
+    if (data.m_largestBatchRows < org->batch_size_rows/data.m_parallelism &&
+        data.m_largestBatchBytes < org->batch_size_bytes/data.m_parallelism)
+    {
+      jam();
+      data.m_parallelism = data.m_fragCount - data.m_frags_complete;
+      if (data.m_largestBatchRows > 0)
+      {
+        jam();
+        data.m_parallelism =
+          MIN(org->batch_size_rows / data.m_largestBatchRows,
+              data.m_parallelism);
+      }
+      if (data.m_largestBatchBytes > 0)
+      {
+        jam();
+        data.m_parallelism =
+          MIN(data.m_parallelism,
+              org->batch_size_bytes/data.m_largestBatchBytes);
+      }
+      if (data.m_frags_complete == 0 &&
+          data.m_frags_not_started % data.m_parallelism != 0)
+      {
+        jam();
+        /**
+         * Set parallelism such that we can expect to have similar
+         * parallelism in each batch. For example if there are 8 remaining
+         * fragments, then we should fetch 2 times 4 fragments rather than
+         * 7+1.
+         */
+        const Uint32 roundTrips =
+          1 + data.m_frags_not_started / data.m_parallelism;
+        data.m_parallelism = data.m_frags_not_started / roundTrips;
+      }
+    }
+    else
+    {
+      jam();
+      // We get full batches, so we should lower parallelism.
+      data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
+                               MAX(1, data.m_parallelism/2));
+    }
+    ndbassert(data.m_parallelism > 0);
+#ifdef DEBUG_SCAN_FRAGREQ
+    DEBUG("::scanIndex_execSCAN_NEXTREQ() Asking for new batches from " <<
+          data.m_parallelism <<
+          " fragments with " << org->batch_size_rows/data.m_parallelism <<
+          " rows and " << org->batch_size_bytes/data.m_parallelism <<
+          " bytes.");
+#endif
+  }
+  else
+  {
+    jam();
+    data.m_parallelism = data.m_fragCount - data.m_frags_complete;
   }
 
-  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
-  const Uint32 bs_rows = org->batch_size_rows/cnt;
+  const Uint32 bs_rows = org->batch_size_rows/data.m_parallelism;
   ndbassert(bs_rows > 0);
   ScanFragNextReq* req =
     reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
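Between batches, the non-parallel path above adjusts m_parallelism in both directions: it grows while batches come back underfilled and halves once they fill up. A condensed sketch of that adjustment (a hypothetical mirror; the equal-round-trip rounding from the hunk is omitted for brevity):

#include <algorithm>
#include <cstdint>

static uint32_t nextParallelism(uint32_t current, uint32_t fragsRemaining,
                                uint32_t batchRows, uint32_t largestBatchRows,
                                uint32_t batchBytes, uint32_t largestBatchBytes)
{
  if (largestBatchRows < batchRows / current &&
      largestBatchBytes < batchBytes / current)
  {
    // Batches were not full: raise parallelism as far as the largest
    // batch seen so far suggests will still fit the budgets.
    uint32_t p = fragsRemaining;
    if (largestBatchRows > 0)
      p = std::min(p, batchRows / largestBatchRows);
    if (largestBatchBytes > 0)
      p = std::min(p, batchBytes / largestBatchBytes);
    return p;
  }
  // Batches were full: back off, but keep at least one fragment scanning.
  return std::min(fragsRemaining, std::max<uint32_t>(1, current / 2));
}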
@@ -5413,20 +5636,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
   req->transId1 = requestPtr.p->m_transId[0];
   req->transId2 = requestPtr.p->m_transId[1];
   req->batch_size_rows = bs_rows;
-  req->batch_size_bytes = org->batch_size_bytes/cnt;
+  req->batch_size_bytes = org->batch_size_bytes/data.m_parallelism;
 
   Uint32 batchRange = 0;
   Ptr<ScanFragHandle> fragPtr;
+  Uint32 sentFragCount = 0;
+  {
+    /**
+     * First, ask for more data from fragments that are already started.
+     */
   Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
   list.first(fragPtr);
-  for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr))
+    while (sentFragCount < data.m_parallelism && !fragPtr.isNull())
   {
     jam();
+      ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ ||
+                fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE ||
+                fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED);
     if (fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ)
     {
       jam();
 
-      i++;
       data.m_frags_outstanding++;
       req->variableData[0] = batchRange;
       fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING;
@@ -5434,6 +5664,7 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
 
       DEBUG("scanIndex_execSCAN_NEXTREQ to: " << hex
             << treeNodePtr.p->m_send.m_ref
+              << ", m_node_no=" << treeNodePtr.p->m_node_no
             << ", senderData: " << req->senderData);
 
 #ifdef DEBUG_SCAN_FRAGREQ
@@ -5445,9 +5676,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
       sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal,
                  ScanFragNextReq::SignalLength + 1,
                  JBB);
+        sentFragCount++;
+      }
+      list.next(fragPtr);
     }
   }
 
+  if (sentFragCount < data.m_parallelism)
+  {
+    /**
+     * Then start new fragments until we reach data.m_parallelism.
+     */
+    jam();
+    ndbassert(data.m_frags_not_started != 0);
+    scanIndex_send(signal,
+                   requestPtr,
+                   treeNodePtr,
+                   data.m_parallelism - sentFragCount,
+                   org->batch_size_bytes/data.m_parallelism,
+                   bs_rows,
+                   batchRange);
+  }
   /**
    * cursor should not have been positioned here...
    *   unless we actually had something more to send.
@@ -5544,14 +5793,28 @@ Dbspj::scanIndex_abort(Signal* signal,
     }
   }
 
-  ndbrequire(cnt_waiting + cnt_scanning > 0);
   if (cnt_scanning == 0)
   {
-    /**
-     * If all were waiting...this should increase m_outstanding
-     */
-    jam();
-    requestPtr.p->m_outstanding++;
+    if (cnt_waiting > 0)
+    {
+      /**
+       * If all were waiting...this should increase m_outstanding
+       */
+      jam();
+      requestPtr.p->m_outstanding++;
+    }
+    else
+    {
+      /**
+       * All fragments are either complete or not yet started, so there is
+       * nothing to abort.
+       */
+      jam();
+      ndbassert(data.m_frags_not_started > 0);
+      ndbrequire(requestPtr.p->m_cnt_active);
+      requestPtr.p->m_cnt_active--;
+      treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
+    }
   }
 }
 
@@ -5603,6 +5866,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal
       jam();
       ndbrequire(data.m_frags_complete < data.m_fragCount);
       data.m_frags_complete++;
+      ndbrequire(data.m_frags_not_started > 0);
+      data.m_frags_not_started--;
       // fall through
     case ScanFragHandle::SFH_COMPLETE:
       jam();
@@ -5637,8 +5902,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal
     requestPtr.p->m_outstanding--;
   }
 
-  if (save1 != data.m_fragCount
-      && data.m_frags_complete == data.m_fragCount)
+  if (save1 != 0 &&
+      data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
   {
     jam();
     ndbrequire(requestPtr.p->m_cnt_active);
@@ -5654,7 +5919,8 @@ Dbspj::scanIndex_release_rangekeys(Ptr<R
                                    Ptr<TreeNode> treeNodePtr)
 {
   jam();
-  DEBUG("scanIndex_release_rangekeys(), m_node_no: " << treeNodePtr.p->m_node_no);
+  DEBUG("scanIndex_release_rangekeys(), tree node " << treeNodePtr.i
+          << " m_node_no: " << treeNodePtr.p->m_node_no);
 
   ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
   Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
@@ -7115,3 +7381,15 @@ void Dbspj::execDBINFO_SCANREQ(Signal *s
 
   ndbinfo_send_scan_conf(signal, req, rl);
 } // Dbspj::execDBINFO_SCANREQ(Signal *signal)
+
+void Dbspj::IncrementalStatistics::update(double sample)
+{
+  // Prevent wrap-around
+  if(m_noOfSamples < 0xffffffff)
+  {
+    m_noOfSamples++;
+    const double delta = sample - m_mean;
+    m_mean += delta/m_noOfSamples;
+  m_sumSquare += delta * (sample - m_mean);
+  }
+}

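The update() method just added is Welford's online algorithm: m_sumSquare accumulates the sum of squared deviations from the running mean in a single pass, and getStdDev() then divides by n-1 for the sample standard deviation. A quick standalone check against the two-pass textbook formula:

#include <cassert>
#include <cmath>
#include <cstdio>

int main()
{
  const double samples[] = {4.0, 7.0, 13.0, 16.0};
  double mean = 0.0, sumSquare = 0.0;
  int n = 0;
  for (int i = 0; i < 4; i++)  // same steps as IncrementalStatistics::update()
  {
    const double s = samples[i];
    n++;
    const double delta = s - mean;
    mean += delta / n;
    sumSquare += delta * (s - mean);
  }
  // Two-pass reference: mean = 10, sum of squared deviations = 90,
  // so the sample standard deviation is sqrt(90/3), about 5.477.
  printf("mean=%g stddev=%g\n", mean, sqrt(sumSquare / (n - 1)));
  assert(fabs(mean - 10.0) < 1e-9 && fabs(sumSquare - 90.0) < 1e-9);
  return 0;
}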
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-18 11:47:22 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-22 12:56:56 +0000
@@ -57,6 +57,7 @@ static const int Err_FunctionNotImplemen
 static const int Err_UnknownColumn = 4004;
 static const int Err_ReceiveTimedOut = 4008;
 static const int Err_NodeFailCausedAbort = 4028;
+static const int Err_ParameterError = 4118;
 static const int Err_SimpleDirtyReadFailed = 4119;
 static const int Err_WrongFieldLength = 4209;
 static const int Err_ReadTooMuch = 4257;
@@ -72,6 +73,21 @@ static const Uint16 tupleNotFound = 0xff
 /** Set to true to trace incomming signals.*/
 const bool traceSignals = false;
 
+enum
+{
+  /**
+   * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
+   * scan parallelism should be adaptive.
+   */
+  Parallelism_adaptive = 0xffff0000,
+
+  /**
+   * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
+   * all fragments should be scanned in parallel.
+   */
+  Parallelism_max = 0xffff0001
+};
+
 /**
  * A class for accessing the correlation data at the end of a tuple (for 
  * scan queries). These data have the following layout:
@@ -358,9 +374,6 @@ public:
   Uint32 getRowCount() const
   { return m_rowCount; }
 
-  char* getRow(Uint32 tupleNo) const
-  { return (m_buffer + (tupleNo*m_rowSize)); }
-
 private:
   /** No copying.*/
   NdbResultSet(const NdbResultSet&);
@@ -372,6 +385,9 @@ private:
   /** Used for checking if buffer overrun occurred. */
   Uint32* m_batchOverflowCheck;
 
+  /** Array of TupleCorrelations for all rows in m_buffer */
+  TupleCorrelation* m_correlations;
+
   Uint32 m_rowSize;
 
   /** The current #rows in 'm_buffer'.*/
@@ -415,8 +431,8 @@ public:
   const NdbReceiver& getReceiver() const
   { return m_receiver; }
 
-  const char* getCurrentRow() const
-  { return m_receiver.peek_row(); }
+  const char* getCurrentRow()
+  { return m_receiver.get_row(); }
 
   /**
    * Process an incomming tuple for this stream. Extract parent and own tuple 
@@ -653,6 +669,7 @@ void* NdbBulkAllocator::allocObjMem(Uint
 NdbResultSet::NdbResultSet() :
   m_buffer(NULL),
   m_batchOverflowCheck(NULL),
+  m_correlations(NULL),
   m_rowSize(0),
   m_rowCount(0)
 {}
@@ -672,6 +689,12 @@ NdbResultSet::init(NdbQueryImpl& query,
     m_batchOverflowCheck = 
       reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
     *m_batchOverflowCheck = 0xacbd1234;
+
+    if (query.getQueryDef().isScanQuery())
+    {
+      m_correlations = reinterpret_cast<TupleCorrelation*>
+                         (bufferAlloc.allocObjMem(maxRows*sizeof(TupleCorrelation)));
+    }
   }
 }
 
@@ -856,19 +879,17 @@ void
 NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len,
                                 TupleCorrelation correlation)
 {
-  m_receiver.execTRANSID_AI(ptr, len);
-  m_resultSets[m_recv].m_rowCount++;
-
+  NdbResultSet& receiveSet = m_resultSets[m_recv];
   if (isScanQuery())
   {
     /**
-     * Store TupleCorrelation as hidden value imm. after received row
-     * (NdbQueryOperationImpl::getRowSize() has reserved space for it)
+     * Store TupleCorrelation.
      */
-    Uint32* row_recv = reinterpret_cast<Uint32*>
-                         (m_receiver.m_record.m_row_recv);
-    row_recv[-1] = correlation.toUint32();
+    receiveSet.m_correlations[receiveSet.m_rowCount] = correlation;
   }
+
+  m_receiver.execTRANSID_AI(ptr, len);
+  receiveSet.m_rowCount++;
 } // NdbResultStream::execTRANSID_AI()
 
 /**
@@ -994,8 +1015,8 @@ NdbResultStream::prepareResultSet(Uint32
 
 /**
  * Fill m_tupleSet[] with correlation data between parent 
- * and child tuples. The 'TupleCorrelation' is stored as
- * and extra Uint32 after each row in the NdbResultSet
+ * and child tuples. The 'TupleCorrelation' is stored
+ * in an array of TupleCorrelations in each ResultSet
  * by execTRANSID_AI().
  *
  * NOTE: In order to reduce work done when holding the 
@@ -1024,12 +1045,9 @@ NdbResultStream::buildResultCorrelations
     /* Rebuild correlation & hashmap from 'readResult' */
     for (Uint32 tupleNo=0; tupleNo<readResult.m_rowCount; tupleNo++)
     {
-      const Uint32* row = (Uint32*)readResult.getRow(tupleNo+1);
-      const TupleCorrelation correlation(row[-1]);
-
-      const Uint16 tupleId  = correlation.getTupleId();
+      const Uint16 tupleId  = readResult.m_correlations[tupleNo].getTupleId();
       const Uint16 parentId = (m_parent!=NULL) 
-                                ? correlation.getParentTupleId()
+                                ? readResult.m_correlations[tupleNo].getParentTupleId()
                                 : tupleNotFound;
 
       m_tupleSet[tupleNo].m_skip     = false;
@@ -1490,6 +1508,14 @@ int NdbQueryOperation::setParallelism(Ui
   return m_impl.setParallelism(parallelism);
 }
 
+int NdbQueryOperation::setMaxParallelism(){
+  return m_impl.setMaxParallelism();
+}
+
+int NdbQueryOperation::setAdaptiveParallelism(){
+  return m_impl.setAdaptiveParallelism();
+}
+
 int NdbQueryOperation::setBatchSize(Uint32 batchSize){
   return m_impl.setBatchSize(batchSize);
 }
@@ -2623,16 +2649,17 @@ NdbQueryImpl::prepareSend()
   {
     /* For the first batch, we read from all fragments for both ordered 
      * and unordered scans.*/
-    if (getQueryOperation(0U).m_parallelism > 0)
+    if (getQueryOperation(0U).m_parallelism == Parallelism_max)
     {
       m_rootFragCount
-        = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
-              getQueryOperation(0U).m_parallelism);
+        = getRoot().getQueryOperationDef().getTable().getFragmentCount();
     }
     else
     {
+      assert(getQueryOperation(0U).m_parallelism != Parallelism_adaptive);
       m_rootFragCount
-        = getRoot().getQueryOperationDef().getTable().getFragmentCount();
+        = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
+              getQueryOperation(0U).m_parallelism);
     }
     Ndb* const ndb = m_transaction.getNdb();
 
@@ -2682,14 +2709,13 @@ NdbQueryImpl::prepareSend()
   for (Uint32 opNo = 0; opNo < getNoOfOperations(); opNo++)
   {
     const NdbQueryOperationImpl& op = getQueryOperation(opNo);
+    // Add space for m_correlations, m_buffer & m_batchOverflowCheck
+    totalBuffSize += (sizeof(TupleCorrelation) * op.getMaxBatchRows());
     totalBuffSize += (op.getRowSize() * op.getMaxBatchRows());
+    totalBuffSize += sizeof(Uint32); // Overflow check
   }
-  /**
-   * Add one word per ResultStream for buffer overrun check. We add a word
-   * rather than a byte to avoid possible alignment problems.
-   */
-  m_rowBufferAlloc.init(m_rootFragCount * 
-                       (totalBuffSize + (sizeof(Uint32) * getNoOfOperations())) );
+
+  m_rowBufferAlloc.init(m_rootFragCount * totalBuffSize);
   if (getQueryDef().isScanQuery())
   {
     Uint32 totalRows = 0;
@@ -3455,8 +3481,6 @@ NdbQueryImpl::OrderedFragSet::getCurrent
 { 
   if (m_ordering!=NdbQueryOptions::ScanOrdering_unordered)
   {
-    // Results should be ordered.
-    assert(verifySortOrder());
     /** 
      * Must have tuples for each (non-completed) fragment when doing ordered
      * scan.
@@ -3667,7 +3691,8 @@ NdbQueryOperationImpl::NdbQueryOperation
   m_ordering(NdbQueryOptions::ScanOrdering_unordered),
   m_interpretedCode(NULL),
   m_diskInUserProjection(false),
-  m_parallelism(0),
+  m_parallelism(def.getQueryOperationIx() == 0
+                ? Parallelism_max : Parallelism_adaptive),
   m_rowSize(0xffffffff)
 { 
   if (errno == ENOMEM)
@@ -4266,9 +4291,10 @@ NdbQueryOperationImpl
                                       m_ndbRecord,
                                       m_firstRecAttr,
                                       0, // Key size.
-                                      getRoot().m_parallelism > 0 ?
-                                      getRoot().m_parallelism :
-                                      m_queryImpl.getRootFragCount(),
+                                      getRoot().m_parallelism
+                                      == Parallelism_max ?
+                                      m_queryImpl.getRootFragCount() :
+                                      getRoot().m_parallelism,
                                       maxBatchRows,
                                       batchByteSize,
                                       firstBatchRows);
@@ -4454,7 +4480,12 @@ NdbQueryOperationImpl::prepareAttrInfo(U
                                       firstBatchRows);
     assert(batchRows==getMaxBatchRows());
     assert(batchRows==firstBatchRows);
-    requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL; // FIXME: SPJ always assume. SIP_PARALLEL
+    assert(m_parallelism == Parallelism_max ||
+           m_parallelism == Parallelism_adaptive);
+    if (m_parallelism == Parallelism_max)
+    {
+      requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL;
+    }
     param->requestInfo = requestInfo; 
     param->batchSize = ((Uint16)batchByteSize << 16) | (Uint16)firstBatchRows;
     param->resultData = getIdOfReceiver();
@@ -4786,7 +4817,7 @@ NdbQueryOperationImpl::execTRANSID_AI(co
   if (getQueryDef().isScanQuery())
   {
     const CorrelationData correlData(ptr, len);
-    const Uint32 receiverId = CorrelationData(ptr, len).getRootReceiverId();
+    const Uint32 receiverId = correlData.getRootReceiverId();
     
     /** receiverId holds the Id of the receiver of the corresponding stream
      * of the root operation. We can thus find the correct root fragment 
@@ -4958,7 +4989,7 @@ NdbQueryOperationImpl::setOrdering(NdbQu
     return -1;
   }
 
-  if (m_parallelism != 0)
+  if (m_parallelism != Parallelism_max)
   {
     getQuery().setErrorCode(QRY_SEQUENTIAL_SCAN_SORTED);
     return -1;
@@ -5053,10 +5084,40 @@ int NdbQueryOperationImpl::setParallelis
     getQuery().setErrorCode(Err_FunctionNotImplemented);
     return -1;
   }
+  else if (parallelism < 1 || parallelism > MAX_NDB_PARTITIONS)
+  {
+    getQuery().setErrorCode(Err_ParameterError);
+    return -1;
+  }
   m_parallelism = parallelism;
   return 0;
 }
 
+int NdbQueryOperationImpl::setMaxParallelism(){
+  if (!getQueryOperationDef().isScanOperation())
+  {
+    getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
+    return -1;
+  }
+  m_parallelism = Parallelism_max;
+  return 0;
+}
+
+int NdbQueryOperationImpl::setAdaptiveParallelism(){
+  if (!getQueryOperationDef().isScanOperation())
+  {
+    getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
+    return -1;
+  }
+  else if (getQueryOperationDef().getQueryOperationIx() == 0)
+  {
+    getQuery().setErrorCode(Err_FunctionNotImplemented);
+    return -1;
+  }
+  m_parallelism = Parallelism_adaptive;
+  return 0;
+}
+
 int NdbQueryOperationImpl::setBatchSize(Uint32 batchSize){
   if (!getQueryOperationDef().isScanOperation())
   {
@@ -5124,12 +5185,6 @@ Uint32 NdbQueryOperationImpl::getRowSize
   {
     m_rowSize = 
       NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr, 0, false);
-
-    const bool withCorrelation = getRoot().getQueryDef().isScanQuery();
-    if (withCorrelation)
-    {
-      m_rowSize += TupleCorrelation::wordCount*sizeof(Uint32);
-    }
   }
   return m_rowSize;
 }

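Two changes in this file interact: the TupleCorrelation word is no longer hidden after each row (see the getRowSize() hunk), and prepareSend() instead sizes a dedicated correlation array per operation. A sketch of the resulting per-fragment sizing arithmetic (hypothetical names; it assumes the one-word TupleCorrelation that the old code stored at row_recv[-1]):

#include <cstddef>
#include <stdint.h>

// Per root fragment: for each operation, the correlation array, the row
// buffer, and one overflow-check word.
static size_t bufferBytesPerFragment(const uint32_t* rowSize,
                                     const uint32_t* maxBatchRows,
                                     uint32_t noOfOperations)
{
  size_t total = 0;
  for (uint32_t op = 0; op < noOfOperations; op++)
  {
    total += sizeof(uint32_t) * maxBatchRows[op];  // TupleCorrelation array
    total += rowSize[op] * maxBatchRows[op];       // row buffer
    total += sizeof(uint32_t);                     // overflow-check word
  }
  return total;  // multiplied by m_rootFragCount in prepareSend()
}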
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.hpp	2011-06-20 13:25:48 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.hpp	2011-08-22 08:35:35 +0000
@@ -335,13 +335,29 @@ public:
   NdbQueryOptions::ScanOrdering getOrdering() const;
 
   /** 
-   * Set the number of fragments to be scanned in parallel for the root 
-   * operation of this query. This only applies to table scans and non-sorted
-   * scans of ordered indexes.
+   * Set the number of fragments to be scanned in parallel. This only applies
+   * to table scans and non-sorted scans of ordered indexes. This method is
+   * only implemented for the root scan operation.
    * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
    */
   int setParallelism(Uint32 parallelism);
 
+  /**
+   * Set the number of fragments to be scanned in parallel to the maximum
+   * possible value. This is the default for the root scan operation.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setMaxParallelism();
+
+  /**
+   * Let the system dynamically choose the number of fragments to scan in
+   * parallel. The system will try to choose a value that gives optimal
+   * performance. This is the default for all scans but the root scan. This
+   * method is only implemented for non-root scan operations.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setAdaptiveParallelism();
+
   /** Set the batch size (max rows per batch) for this operation. This
    * only applies to scan operations, as lookup operations always will
    * have the same batch size as its parent operation, or 1 if it is the

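For API users, the three setters in this header combine as follows. A hedged usage sketch (query construction and the handleError() helper are assumed context, not part of this patch):

// Root scan operations default to maximum parallelism; child scans
// default to adaptive parallelism chosen by the data nodes.
NdbQueryOperation* root  = query->getQueryOperation(0U);
NdbQueryOperation* child = query->getQueryOperation(1U);

if (root->setParallelism(4) != 0)          // scan 4 fragments at a time
  handleError(query->getNdbError());

if (root->setMaxParallelism() != 0)        // or restore the root default
  handleError(query->getNdbError());

if (child->setAdaptiveParallelism() != 0)  // child scans adapt per batch
  handleError(query->getNdbError());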
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-08-17 13:16:22 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-08-22 08:35:35 +0000
@@ -661,14 +661,30 @@ public:
   NdbQueryOptions::ScanOrdering getOrdering() const
   { return m_ordering; }
 
-   /** 
-   * Set the number of fragments to be scanned in parallel for the root 
-   * operation of this query. This only applies to table scans and non-sorted
-   * scans of ordered indexes.
+  /**
+   * Set the number of fragments to be scanned in parallel. This only applies
+   * to table scans and non-sorted scans of ordered indexes. This method is
+   * only implemented for the root scan operation.
    * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
    */
   int setParallelism(Uint32 parallelism);
 
+  /**
+   * Set the number of fragments to be scanned in parallel to the maximum
+   * possible value. This is the default for the root scan operation.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setMaxParallelism();
+
+  /**
+   * Let the system dynamically choose the number of fragments to scan in
+   * parallel. The system will try to choose a value that gives optimal
+   * performance. This is the default for all scans but the root scan. This
+   * method is only implemented for non-root scan operations.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setAdaptiveParallelism();
+
   /** Set the batch size (max rows per batch) for this operation. This
    * only applies to scan operations, as lookup operations always will
    * have the same batch size as its parent operation, or 1 if it is the

No bundle (reason: useless for push emails).