List: Commits
From: Jonas Oreland    Date: August 25 2011 6:37am
Subject: bzr push into mysql-5.5-cluster branch (jonas.oreland:3437 to 3438)
 3438 Jonas Oreland	2011-08-25 [merge]
      ndb - merge 71 to 55

    modified:
      sql/ha_ndbcluster.cc
      storage/ndb/include/ndbapi/NdbReceiver.hpp
      storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
      storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperation.hpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
      storage/ndb/src/ndbapi/NdbReceiver.cpp
 3437 Jonas Oreland	2011-08-17
      ndb - The last crusade of the trailing share: remove the dependency on LOCK_open in ndbcluster_create_binlog_setup, allowing it to be used from anywhere. Use share->mutex to serialize the actual binlog setup. Patch for 5.5; plan to backport after test results.

    modified:
      sql/ha_ndbcluster.cc
      sql/ha_ndbcluster_binlog.cc
      sql/ha_ndbcluster_binlog.h
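
Aside, illustration only: the 3437 message above describes replacing the LOCK_open dependency with share->mutex to serialize binlog setup. The ha_ndbcluster_binlog.cc hunks themselves are not part of the diff below, so the following is only a rough sketch of that per-share locking pattern; the struct, flag and function names are stand-ins, not the real code.

  #include <pthread.h>

  /*
    Sketch only: the per-share serialization pattern the message describes.
    ShareSketch is a stand-in (the real NDB_SHARE and the setup body live in
    ha_ndbcluster_binlog.cc), and the 'binlog_setup_done' flag is a
    hypothetical simplification. The mutex is assumed to be initialized
    when the share is created.
  */
  struct ShareSketch
  {
    pthread_mutex_t mutex;     // assumed to play the role of share->mutex
    bool binlog_setup_done;    // guarded by 'mutex'
  };

  static int create_binlog_setup_sketch(ShareSketch *share)
  {
    pthread_mutex_lock(&share->mutex);   // serialize per share, no LOCK_open
    if (!share->binlog_setup_done)
    {
      /* ... perform the actual binlog setup for this share ... */
      share->binlog_setup_done = true;
    }
    pthread_mutex_unlock(&share->mutex);
    return 0;
  }
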
=== modified file 'sql/ha_ndbcluster.cc'
--- a/sql/ha_ndbcluster.cc	2011-08-17 12:22:49 +0000
+++ b/sql/ha_ndbcluster.cc	2011-08-25 06:35:39 +0000
@@ -160,6 +160,11 @@ static MYSQL_THDVAR_ULONG(
   0                                  /* block */
 );
 
+#if NDB_VERSION_D < NDB_MAKE_VERSION(7,2,0)
+#define DEFAULT_NDB_INDEX_STAT_ENABLE FALSE
+#else
+#define DEFAULT_NDB_INDEX_STAT_ENABLE TRUE
+#endif
 
 static MYSQL_THDVAR_BOOL(
   index_stat_enable,                 /* name */
@@ -167,7 +172,7 @@ static MYSQL_THDVAR_BOOL(
   "Use ndb index statistics in query optimization.",
   NULL,                              /* check func. */
   NULL,                              /* update func. */
-  FALSE                              /* default */
+  DEFAULT_NDB_INDEX_STAT_ENABLE      /* default */
 );
 
 

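Aside, illustration only: the hunk above makes the ndb_index_stat_enable default depend on the NDB version being compiled. Assuming NDB_MAKE_VERSION packs major/minor/build into a single integer (the usual encoding in ndb_version.h, stated here as an assumption), the comparison resolves as in this sketch:

  // Sketch only: assumed NDB_MAKE_VERSION encoding (major/minor/build packed
  // into one integer). Check ndb_version.h in the tree before relying on it.
  constexpr unsigned make_version(unsigned major, unsigned minor, unsigned build)
  { return (major << 16) | (minor << 8) | build; }

  // A 7.1.x build stays below 7.2.0, so index_stat_enable keeps defaulting to
  // FALSE there, while any 7.2.x build picks up the new TRUE default.
  static_assert(make_version(7,1,15) <  make_version(7,2,0), "pre-7.2 default");
  static_assert(make_version(7,2,1)  >= make_version(7,2,0), "7.2+ default");
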
=== modified file 'storage/ndb/include/ndbapi/NdbReceiver.hpp'
--- a/storage/ndb/include/ndbapi/NdbReceiver.hpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/include/ndbapi/NdbReceiver.hpp	2011-08-25 06:35:39 +0000
@@ -41,7 +41,6 @@ class NdbReceiver
   friend class NdbIndexScanOperation;
   friend class NdbTransaction;
   friend class NdbRootFragment;
-  friend class ReceiverIdIterator;
   friend int compare_ndbrecord(const NdbReceiver *r1,
                       const NdbReceiver *r2,
                       const NdbRecord *key_record,
@@ -83,6 +82,12 @@ public:
   
   void setErrorCode(int);
 
+  /* Prepare for receiving of rows into specified buffer */
+  void prepareReceive(char *buf);
+
+  /* Prepare for reading of rows from specified buffer */
+  void prepareRead(char *buf, Uint32 rows);
+
 private:
   Uint32 theMagicNumber;
   Ndb* m_ndb;
@@ -141,8 +146,9 @@ private:
   /* members used for NdbRecord operation. */
   struct {
     const NdbRecord *m_ndb_record;
-    char *m_row;
-    /* Block of memory used to receive all rows in a batch during scan. */
+    /* Destination to receive next row into. */
+    char *m_row_recv;
+    /* Block of memory used to read all rows in a batch during scan. */
     char *m_row_buffer;
     /*
       Offsets between two rows in m_row_buffer.
@@ -227,7 +233,7 @@ private:
                     const char * & data_ptr) const;
   int getScanAttrData(const char * & data, Uint32 & size, Uint32 & pos) const;
   /** Used by NdbQueryOperationImpl, where random access to rows is needed.*/
-  void setCurrentRow(Uint32 currentRow);
+  void setCurrentRow(char* buffer, Uint32 row);
   /** Used by NdbQueryOperationImpl.*/
   Uint32 getCurrentRow() const { return m_current_row; }
 };
@@ -260,7 +266,7 @@ NdbReceiver::prepareSend(){
   if (m_using_ndb_record)
   {
     if (m_type==NDB_SCANRECEIVER || m_type==NDB_QUERY_OPERATION)
-      m_record.m_row= m_record.m_row_buffer;
+      m_record.m_row_recv= m_record.m_row_buffer;
   }
   theCurrentRecAttr = theFirstRecAttr;
 }
@@ -288,9 +294,13 @@ NdbReceiver::execSCANOPCONF(Uint32 tcPtr
 
 inline
 void
-NdbReceiver::setCurrentRow(Uint32 currentRow)
+NdbReceiver::setCurrentRow(char* buffer, Uint32 row)
 {
-  m_current_row = currentRow;
+  m_record.m_row_buffer = buffer;
+  m_current_row = row;
+#ifdef assert
+  assert(m_current_row < m_result_rows);
+#endif
 }
 
 inline

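Aside, illustration only: the NdbReceiver changes above separate the destination rows are received into (m_row_recv) from the buffer rows are later read from (m_row_buffer), exposed through the new prepareReceive()/prepareRead() pair. The self-contained sketch below mirrors that receive-then-read shape; it is not the real class, and the member names and layout are simplified assumptions.

  #include <cassert>
  #include <cstring>

  // Sketch only: mirrors the prepareReceive()/prepareRead()/setCurrentRow()
  // shape added to NdbReceiver above; names and layout are simplified
  // assumptions, not the real class.
  class ReceiverSketch
  {
  public:
    // Receiver thread: direct incoming rows into 'buf' for this batch.
    void prepareReceive(char *buf)
    { m_recv = buf; m_received = 0; }

    void receiveRow(const char *row, unsigned rowSize)
    { std::memcpy(m_recv + m_received * rowSize, row, rowSize); m_received++; }

    // Application thread: read 'rows' completed rows back from 'buf'.
    void prepareRead(char *buf, unsigned rows)
    { m_read = buf; m_rows = rows; m_current = 0; }

    const char *setCurrentRow(unsigned row, unsigned rowSize)
    { assert(row < m_rows); m_current = row; return m_read + row * rowSize; }

  private:
    char *m_recv = nullptr;   // destination for incoming rows (cf. m_row_recv)
    char *m_read = nullptr;   // buffer rows are read from (cf. m_row_buffer)
    unsigned m_received = 0, m_rows = 0, m_current = 0;
  };
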
=== modified file 'storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp	2011-08-25 06:35:39 +0000
@@ -3880,6 +3880,8 @@ done:
     ndbassert(gci == restorableGCI);
     replicaPtr.p->m_restorable_gci = gci;
     Uint32 startGci = replicaPtr.p->maxGciCompleted[maxLcpIndex] + 1;
+    if (startGci > gci)
+      startGci = gci;
     ndbout_c("node: %d tab: %d frag: %d restore lcp: %u(idx: %u) maxGciStarted: %u maxGciCompleted: %u (restorable: %u(%u) newestRestorableGCI: %u)",
              takeOverPtr.p->toStartingNode,
              takeOverPtr.p->toCurrentTabref,
@@ -17543,7 +17545,8 @@ Dbdih::execDUMP_STATE_ORD(Signal* signal
     }
   }
 
-  if(arg == 7019 && signal->getLength() == 2)
+  if(arg == 7019 && signal->getLength() == 2 &&
+     signal->theData[1] < MAX_NDB_NODES)
   {
     char buf2[8+1];
     NodeRecordPtr nodePtr;

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2011-08-25 06:35:39 +0000
@@ -530,21 +530,87 @@ public:
   typedef SLFifoListImpl<ScanFragHandle_pool, ScanFragHandle> ScanFragHandle_list;
   typedef LocalSLFifoListImpl<ScanFragHandle_pool, ScanFragHandle> Local_ScanFragHandle_list;
 
+  /**
+   * This class computes mean and standard deviation incrementally for a series
+   * of samples.
+   */
+  class IncrementalStatistics
+  {
+  public:
+    /**
+     * We cannot have a (non-trivial) constructor, since this class is used in
+     * unions.
+     */
+    void init()
+    {
+      m_mean = m_sumSquare = 0.0;
+      m_noOfSamples = 0;
+    }
+
+    // Add another sample.
+    void update(double sample);
+
+    double getMean() const { return m_mean; }
+
+    double getStdDev() const { 
+      return m_noOfSamples < 2 ? 0.0 : sqrt(m_sumSquare/(m_noOfSamples - 1));
+    }
+
+  private:
+    // Mean of all samples
+    double m_mean;
+    //Sum of square of differences from the current mean.
+    double m_sumSquare;
+    Uint32 m_noOfSamples;
+  }; // IncrementalStatistics
+
   struct ScanIndexData
   {
     Uint16 m_frags_complete;
     Uint16 m_frags_outstanding;
+    /**
+     * The number of fragments for which we have not yet sent SCAN_FRAGREQ but
+     * will eventually do so.
+     */
+    Uint16 m_frags_not_started;
     Uint32 m_rows_received;  // #execTRANSID_AI
     Uint32 m_rows_expecting; // Sum(ScanFragConf)
     Uint32 m_batch_chunks;   // #SCAN_FRAGREQ + #SCAN_NEXTREQ to retrieve batch
     Uint32 m_scanCookie;
     Uint32 m_fragCount;
+    // The number of fragments that we scan in parallel.
+    Uint32 m_parallelism;
+    /**
+     * True if this is the first instantiation of this operation. A child
+     * operation will be instantiated once for each batch of its parent.
+     */
+    bool m_firstExecution;
+    /**
+     * Mean and standard deviation for the optimal parallelism for earlier
+     * executions of this operation.
+     */
+    IncrementalStatistics m_parallelismStat;
+    // Total number of rows for the current execution of this operation.
+    Uint32 m_totalRows;
+    // Total number of bytes for the current execution of this operation.
+    Uint32 m_totalBytes;
+
     ScanFragHandle_list::HeadPOD m_fragments; // ScanFrag states
     union
     {
       PatternStore::HeadPOD m_prunePattern;
       Uint32 m_constPrunePtrI;
     };
+    /**
+     * Max number of rows seen in a batch. Used for calculating the number of
+     * rows per fragment in the next batch when using adaptive batch size.
+     */
+    Uint32 m_largestBatchRows;
+    /**
+     * Max number of bytes seen in a batch. Used for calculating the number of
+     * rows per fragment in the next batch when using adaptive batch size.
+     */
+    Uint32 m_largestBatchBytes;
     Uint32 m_scanFragReq[ScanFragReq::SignalLength + 2];
   };
 
@@ -1164,6 +1230,13 @@ private:
   void scanIndex_parent_row(Signal*,Ptr<Request>,Ptr<TreeNode>, const RowPtr&);
   void scanIndex_fixupBound(Ptr<ScanFragHandle> fragPtr, Uint32 ptrI, Uint32);
   void scanIndex_send(Signal*,Ptr<Request>,Ptr<TreeNode>);
+  void scanIndex_send(Signal* signal,
+                      Ptr<Request> requestPtr,
+                      Ptr<TreeNode> treeNodePtr,
+                      Uint32 noOfFrags,
+                      Uint32 bs_bytes,
+                      Uint32 bs_rows,
+                      Uint32& batchRange);
   void scanIndex_batchComplete(Signal* signal);
   Uint32 scanIndex_findFrag(Local_ScanFragHandle_list &, Ptr<ScanFragHandle>&,
                             Uint32 fragId);

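Aside, illustration only: the IncrementalStatistics helper declared above keeps a running mean and a sum of squared deviations so getStdDev() can report a sample standard deviation without storing the samples; its update() recurrence appears at the end of the DbspjMain.cpp diff below. The stand-alone sketch here reproduces that idea with made-up sample values.

  #include <cmath>
  #include <cstdio>
  #include <initializer_list>

  // Sketch only: stand-alone copy of the IncrementalStatistics idea, so the
  // declaration above can be exercised outside the kernel block. The sample
  // values are invented for illustration.
  struct StatsSketch
  {
    double mean = 0.0, sumSquare = 0.0;
    unsigned samples = 0;

    void update(double sample)
    {
      samples++;
      const double delta = sample - mean;
      mean += delta / samples;
      sumSquare += delta * (sample - mean);   // running (Welford-style) update
    }
    double stdDev() const
    { return samples < 2 ? 0.0 : std::sqrt(sumSquare / (samples - 1)); }
  };

  int main()
  {
    StatsSketch parallelismStat;
    for (double sample : {4.0, 6.0, 5.0, 7.0})  // observed "optimal parallelism"
      parallelismStat.update(sample);

    // DbspjMain.cpp below uses mean - 2 * stddev as a conservative estimate.
    const double estimate = parallelismStat.mean - 2 * parallelismStat.stdDev();
    std::printf("mean=%.2f stddev=%.2f estimate=%.2f\n",
                parallelismStat.mean, parallelismStat.stdDev(), estimate);
    return 0;
  }
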
=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2011-08-25 06:35:39 +0000
@@ -4450,6 +4450,9 @@ Dbspj::parseScanIndex(Build_context& ctx
     data.m_fragments.init();
     data.m_frags_outstanding = 0;
     data.m_frags_complete = 0;
+    data.m_frags_not_started = 0;
+    data.m_parallelismStat.init();
+    data.m_firstExecution = true;
     data.m_batch_chunks = 0;
 
     err = parseDA(ctx, requestPtr, treeNodePtr,
@@ -5002,6 +5005,7 @@ Dbspj::scanIndex_parent_batch_complete(S
       }
     }
   }
+  data.m_frags_not_started = data.m_fragCount - data.m_frags_complete;
 
   if (data.m_frags_complete == data.m_fragCount)
   {
@@ -5015,7 +5019,120 @@ Dbspj::scanIndex_parent_batch_complete(S
   /**
    * When parent's batch is complete, we send our batch
    */
-  scanIndex_send(signal, requestPtr, treeNodePtr);
+  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
+  ndbassert(org->batch_size_rows >= data.m_fragCount - data.m_frags_complete);
+
+  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
+  {
+    jam();
+    data.m_parallelism = data.m_fragCount - data.m_frags_complete;
+  }
+  else if (data.m_firstExecution)
+  {
+    /**
+     * Having a high parallelism would allow us to fetch data from many
+     * fragments in parallel and thus reduce the number of round trips.
+     * On the other hand, we should set parallelism so low that we can fetch
+     * all data from a fragment in one batch if possible.
+     * Since this is the first execution, we do not know how many rows or bytes
+     * this operation is likely to return. Therefore we set parallelism to 1,
+     * since this gives the lowest penalty if our guess is wrong.
+     */
+    jam();
+    data.m_parallelism = 1;
+  }
+  else
+  {
+    jam();
+    /**
+     * Use statistics from earlier runs of this operation to estimate the
+     * initial parallelism. We use the mean minus two times the standard
+     * deviation to have a low risk of setting parallelism too high (as erring
+     * in the other direction is more costly).
+     */
+    Int32 parallelism = 
+      static_cast<Int32>(data.m_parallelismStat.getMean()
+                         - 2 * data.m_parallelismStat.getStdDev());
+
+    if (parallelism < 1)
+    {
+      jam();
+      parallelism = 1;
+    }
+    else if ((data.m_fragCount - data.m_frags_complete) % parallelism != 0)
+    {
+      jam();
+      /**
+       * Set parallelism such that we can expect to have similar
+       * parallelism in each batch. For example if there are 8 remaining
+       * fragments, then we should fetch 2 times 4 fragments rather than
+       * 7+1.
+       */
+      const Int32 roundTrips =
+        1 + (data.m_fragCount - data.m_frags_complete) / parallelism;
+      parallelism = (data.m_fragCount - data.m_frags_complete) / roundTrips;
+    }
+
+    data.m_parallelism = static_cast<Uint32>(parallelism);
+
+#ifdef DEBUG_SCAN_FRAGREQ
+    DEBUG("::scanIndex_send() starting index scan with parallelism="
+          << data.m_parallelism);
+#endif
+  }
+  ndbrequire(data.m_parallelism > 0);
+
+  const Uint32 bs_rows = org->batch_size_rows/ data.m_parallelism;
+  const Uint32 bs_bytes = org->batch_size_bytes / data.m_parallelism;
+  ndbassert(bs_rows > 0);
+  ndbassert(bs_bytes > 0);
+
+  data.m_largestBatchRows = 0;
+  data.m_largestBatchBytes = 0;
+  data.m_totalRows = 0;
+  data.m_totalBytes = 0;
+
+  {
+    Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
+    Ptr<ScanFragHandle> fragPtr;
+    list.first(fragPtr);
+
+    while(!fragPtr.isNull())
+    {
+      ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED ||
+                fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE);
+      fragPtr.p->m_state = ScanFragHandle::SFH_NOT_STARTED;
+      list.next(fragPtr);
+    }
+  }
+
+  Uint32 batchRange = 0;
+  scanIndex_send(signal,
+                 requestPtr,
+                 treeNodePtr,
+                 data.m_parallelism,
+                 bs_bytes,
+                 bs_rows,
+                 batchRange);
+
+  data.m_firstExecution = false;
+
+  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
+  {
+    ndbrequire((data.m_frags_outstanding + data.m_frags_complete) ==
+               data.m_fragCount);
+  }
+  else
+  {
+    ndbrequire(static_cast<Uint32>(data.m_frags_outstanding + 
+                                   data.m_frags_complete) <=
+               data.m_fragCount);
+  }
+
+  data.m_batch_chunks = 1;
+  requestPtr.p->m_cnt_active++;
+  requestPtr.p->m_outstanding++;
+  treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
 }
 
 void
@@ -5045,77 +5162,91 @@ Dbspj::scanIndex_parent_batch_repeat(Sig
   }
 }
 
+/**
+ * Ask for the first batch for a number of fragments.
+ */
 void
 Dbspj::scanIndex_send(Signal* signal,
                       Ptr<Request> requestPtr,
-                      Ptr<TreeNode> treeNodePtr)
+                      Ptr<TreeNode> treeNodePtr,
+                      Uint32 noOfFrags,
+                      Uint32 bs_bytes,
+                      Uint32 bs_rows,
+                      Uint32& batchRange)
 {
-  jam();
-
-  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
-  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
-
-  Uint32 cnt = 1;
-  Uint32 bs_rows = org->batch_size_rows;
-  Uint32 bs_bytes = org->batch_size_bytes;
-  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
-  {
-    jam();
-    cnt = data.m_fragCount - data.m_frags_complete;
-    ndbrequire(cnt > 0);
-
-    bs_rows /= cnt;
-    bs_bytes /= cnt;
-    ndbassert(bs_rows > 0);
-  }
-
   /**
    * if (m_bits & prunemask):
    * - Range keys sliced out to each ScanFragHandle
    * - Else, range keys kept on first (and only) ScanFragHandle
    */
-  Uint32 prunemask = TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE;
+  const bool prune = treeNodePtr.p->m_bits &
+    (TreeNode::T_PRUNE_PATTERN | TreeNode::T_CONST_PRUNE);
 
   /**
-   * Don't release keyInfo if it may be sent multiple times, eiter:
-   *   - Not pruned -> same keyInfo goes to all datanodes.
-   *   - Result handling is REPEAT_SCAN_RESULT and same batch may be 
-   *     repeated multiple times due to incomplete bushy X-scans.
-   *     (by ::scanIndex_parent_batch_repeat())
-   *
-   * When not released, ::scanIndex_parent_batch_cleanup() will 
-   * eventually release them when preparing arrival of a new parent batch.
+   * If scan is repeatable, we must make sure not to release range keys so
+   * that we can use them again in the next repetition.
    */
-  const bool release = ((treeNodePtr.p->m_bits & prunemask) != 0 &&
-                        (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) == 0);
+  const bool repeatable =
+    (treeNodePtr.p->m_bits & TreeNode::T_SCAN_REPEATABLE) != 0;
 
-  ScanFragReq* req = reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
+  ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+  ndbassert(noOfFrags > 0);
+  ndbassert(data.m_frags_not_started >= noOfFrags);
+  ScanFragReq* const req =
+    reinterpret_cast<ScanFragReq*>(signal->getDataPtrSend());
+  const ScanFragReq * const org
+    = reinterpret_cast<ScanFragReq*>(data.m_scanFragReq);
   memcpy(req, org, sizeof(data.m_scanFragReq));
   // req->variableData[0] // set below
   req->variableData[1] = requestPtr.p->m_rootResultData;
   req->batch_size_bytes = bs_bytes;
   req->batch_size_rows = bs_rows;
 
-  Ptr<ScanFragHandle> fragPtr;
+  Uint32 requestsSent = 0;
   Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
-
-  Uint32 keyInfoPtrI = RNIL;
+  Ptr<ScanFragHandle> fragPtr;
   list.first(fragPtr);
-  if ((treeNodePtr.p->m_bits & prunemask) == 0)
+  Uint32 keyInfoPtrI = fragPtr.p->m_rangePtrI;
+  ndbrequire(prune || keyInfoPtrI != RNIL);
+  /**
+   * Iterate over the list of fragments until we have sent as many
+   * SCAN_FRAGREQs as we should.
+   */
+  while (requestsSent < noOfFrags)
   {
     jam();
-    keyInfoPtrI = fragPtr.p->m_rangePtrI;
-    ndbrequire(keyInfoPtrI != RNIL);
-  }
+    ndbassert(!fragPtr.isNull());
 
-  Uint32 batchRange = 0;
-  for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr))
-  {
-    jam();
+    if (fragPtr.p->m_state != ScanFragHandle::SFH_NOT_STARTED)
+    {
+      // Skip forward to the frags that we should send.
+      jam();
+      list.next(fragPtr);
+      continue;
+    }
+
+    const Uint32 ref = fragPtr.p->m_ref;
+
+    if (noOfFrags==1 && !prune &&
+        data.m_frags_not_started == data.m_fragCount &&
+        refToNode(ref) != getOwnNodeId() &&
+        list.hasNext(fragPtr))
+    {
+      /**
+       * If we are doing a scan with adaptive parallelism and start with
+       * parallelism=1 then it makes sense to fetch a batch from a fragment on
+       * the local data node. The reason for this is that if that fragment
+       * contains few rows, we may be able to read from several fragments in
+       * parallel. Then we minimize the total number of round trips (to remote
+       * data nodes) if we fetch the first fragment batch locally.
+       */
+      jam();
+      list.next(fragPtr);
+      continue;
+    }
 
     SectionHandle handle(this);
 
-    Uint32 ref = fragPtr.p->m_ref;
     Uint32 attrInfoPtrI = treeNodePtr.p->m_send.m_attrInfoPtrI;
 
     /**
@@ -5124,26 +5255,34 @@ Dbspj::scanIndex_send(Signal* signal,
     req->senderData = fragPtr.i;
     req->fragmentNoKeyLen = fragPtr.p->m_fragId;
 
-    if ((treeNodePtr.p->m_bits & prunemask))
+    if (prune)
     {
       jam();
       keyInfoPtrI = fragPtr.p->m_rangePtrI;
       if (keyInfoPtrI == RNIL)
       {
+        /**
+         * Since we use pruning, we can see that no parent rows would hash
+         * to this fragment.
+         */
         jam();
         fragPtr.p->m_state = ScanFragHandle::SFH_COMPLETE;
+        list.next(fragPtr);
         continue;
       }
-    }
-    if (release)
-    {
-      /**
-       * If we'll use sendSignal() and we need to send the attrInfo several
-       *   times, we need to copy them
-       */
-      Uint32 tmp = RNIL;
-      ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
-      attrInfoPtrI = tmp;
+
+      if (!repeatable)
+      {
+        /**
+         * If we'll use sendSignal() and we need to send the attrInfo several
+         * times, we need to copy them. (For repeatable or unpruned scans
+         * we use sendSignalNoRelease(), so then we do not need to copy.)
+         */
+        jam();
+        Uint32 tmp = RNIL;
+        ndbrequire(dupSection(tmp, attrInfoPtrI)); // TODO handle error
+        attrInfoPtrI = tmp;
+      }
     }
 
     req->variableData[0] = batchRange;
@@ -5152,16 +5291,14 @@ Dbspj::scanIndex_send(Signal* signal,
     handle.m_cnt = 2;
 
 #if defined DEBUG_SCAN_FRAGREQ
-    {
-      ndbout_c("SCAN_FRAGREQ to %x", ref);
-      printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
-                        NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
-                        DBLQH);
-      printf("ATTRINFO: ");
-      print(handle.m_ptr[0], stdout);
-      printf("KEYINFO: ");
-      print(handle.m_ptr[1], stdout);
-    }
+    ndbout_c("SCAN_FRAGREQ to %x", ref);
+    printSCAN_FRAGREQ(stdout, signal->getDataPtrSend(),
+                      NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
+                      DBLQH);
+    printf("ATTRINFO: ");
+    print(handle.m_ptr[0], stdout);
+    printf("KEYINFO: ");
+    print(handle.m_ptr[1], stdout);
 #endif
 
     if (refToNode(ref) == getOwnNodeId())
@@ -5173,8 +5310,13 @@ Dbspj::scanIndex_send(Signal* signal,
       c_Counters.incr_counter(CI_REMOTE_RANGE_SCANS_SENT, 1);
     }
 
-    if (release)
+    if (prune && !repeatable)
     {
+      /**
+       * For a non-repeatable pruned scan, key info is unique for each
+       * fragment and therefore cannot be reused, so we release key info
+       * right away.
+       */
       jam();
       sendSignal(ref, GSN_SCAN_FRAGREQ, signal,
                  NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
@@ -5183,32 +5325,24 @@ Dbspj::scanIndex_send(Signal* signal,
     }
     else
     {
+      /**
+       * Reuse key info for multiple fragments and/or multiple repetitions
+       * of the scan.
+       */
       jam();
       sendSignalNoRelease(ref, GSN_SCAN_FRAGREQ, signal,
                           NDB_ARRAY_SIZE(data.m_scanFragReq), JBB, &handle);
     }
     handle.clear();
 
-    i++;
     fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING; // running
     data.m_frags_outstanding++;
     batchRange += bs_rows;
-  }
+    requestsSent++;
+    list.next(fragPtr);
+  } // while (requestsSent < noOfFrags)
 
-  if (treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL)
-  {
-    ndbrequire(data.m_frags_outstanding == 
-               data.m_fragCount - data.m_frags_complete);
-  }
-  else
-  {
-    ndbrequire(data.m_frags_outstanding == 1);
-  }
-
-  data.m_batch_chunks = 1;
-  requestPtr.p->m_cnt_active++;
-  requestPtr.p->m_outstanding++;
-  treeNodePtr.p->m_state = TreeNode::TN_ACTIVE;
+  data.m_frags_not_started -= requestsSent;
 }
 
 void
@@ -5282,6 +5416,10 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
   }
 
   requestPtr.p->m_rows += rows;
+  data.m_totalRows += rows;
+  data.m_totalBytes += conf->total_len;
+  data.m_largestBatchRows = MAX(data.m_largestBatchRows, rows);
+  data.m_largestBatchBytes = MAX(data.m_largestBatchBytes, conf->total_len);
 
   if (!treeNodePtr.p->isLeaf())
   {
@@ -5302,7 +5440,9 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
     ndbrequire(data.m_frags_complete < data.m_fragCount);
     data.m_frags_complete++;
 
-    if (data.m_frags_complete == data.m_fragCount)
+    if (data.m_frags_complete == data.m_fragCount ||
+        ((requestPtr.p->m_state & Request::RS_ABORTING) != 0 &&
+         data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started)))
     {
       jam();
       ndbrequire(requestPtr.p->m_cnt_active);
@@ -5314,6 +5454,32 @@ Dbspj::scanIndex_execSCAN_FRAGCONF(Signa
 
   if (data.m_frags_outstanding == 0)
   {
+    const ScanFragReq * const org
+      = reinterpret_cast<const ScanFragReq*>(data.m_scanFragReq);
+
+    if (data.m_frags_complete == data.m_fragCount)
+    {
+      jam();
+      /**
+       * Calculate what would have been the optimal parallelism for the
+       * scan instance that we have just completed, and update
+       * 'parallelismStat' with this value. We then use this statistics to set
+       * the initial parallelism for the next instance of this operation.
+       */
+      double parallelism = data.m_fragCount;
+      if (data.m_totalRows > 0)
+      {
+        parallelism = MIN(parallelism,
+                          double(org->batch_size_rows) / data.m_totalRows);
+      }
+      if (data.m_totalBytes > 0)
+      {
+        parallelism = MIN(parallelism,
+                          double(org->batch_size_bytes) / data.m_totalBytes);
+      }
+      data.m_parallelismStat.update(parallelism);
+    }
+
     /**
      * Don't reportBatchComplete to children if we're aborting...
      */
@@ -5364,7 +5530,7 @@ Dbspj::scanIndex_execSCAN_FRAGREF(Signal
   ndbrequire(data.m_frags_outstanding > 0);
   data.m_frags_outstanding--;
 
-  if (data.m_frags_complete == data.m_fragCount)
+  if (data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
   {
     jam();
     ndbrequire(requestPtr.p->m_cnt_active);
@@ -5390,21 +5556,78 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
   jam();
 
   ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
+  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
 
   data.m_rows_received = 0;
   data.m_rows_expecting = 0;
   ndbassert(data.m_frags_outstanding == 0);
 
   ndbrequire(data.m_frags_complete < data.m_fragCount);
-  Uint32 cnt = data.m_fragCount - data.m_frags_complete;
   if ((treeNodePtr.p->m_bits & TreeNode::T_SCAN_PARALLEL) == 0)
   {
     jam();
-    cnt = 1;
+    /**
+     * Since fetching few but large batches is more efficient, we
+     * set parallelism to the lowest value where we can still expect each
+     * batch to be full.
+     */
+    if (data.m_largestBatchRows < org->batch_size_rows/data.m_parallelism &&
+        data.m_largestBatchBytes < org->batch_size_bytes/data.m_parallelism)
+    {
+      jam();
+      data.m_parallelism = data.m_fragCount - data.m_frags_complete;
+      if (data.m_largestBatchRows > 0)
+      {
+        jam();
+        data.m_parallelism =
+          MIN(org->batch_size_rows / data.m_largestBatchRows,
+              data.m_parallelism);
+      }
+      if (data.m_largestBatchBytes > 0)
+      {
+        jam();
+        data.m_parallelism =
+          MIN(data.m_parallelism,
+              org->batch_size_bytes/data.m_largestBatchBytes);
+      }
+      if (data.m_frags_complete == 0 &&
+          data.m_frags_not_started % data.m_parallelism != 0)
+      {
+        jam();
+        /**
+         * Set parallelism such that we can expect to have similar
+         * parallelism in each batch. For example if there are 8 remaining
+         * fragments, then we should fetch 2 times 4 fragments rather than
+         * 7+1.
+         */
+        const Uint32 roundTrips =
+          1 + data.m_frags_not_started / data.m_parallelism;
+        data.m_parallelism = data.m_frags_not_started / roundTrips;
+      }
+    }
+    else
+    {
+      jam();
+      // We get full batches, so we should lower parallelism.
+      data.m_parallelism = MIN(data.m_fragCount - data.m_frags_complete,
+                               MAX(1, data.m_parallelism/2));
+    }
+    ndbassert(data.m_parallelism > 0);
+#ifdef DEBUG_SCAN_FRAGREQ
+    DEBUG("::scanIndex_execSCAN_NEXTREQ() Asking for new batches from " <<
+          data.m_parallelism <<
+          " fragments with " << org->batch_size_rows/data.m_parallelism <<
+          " rows and " << org->batch_size_bytes/data.m_parallelism <<
+          " bytes.");
+#endif
+  }
+  else
+  {
+    jam();
+    data.m_parallelism = data.m_fragCount - data.m_frags_complete;
   }
 
-  const ScanFragReq * org = (const ScanFragReq*)data.m_scanFragReq;
-  const Uint32 bs_rows = org->batch_size_rows/cnt;
+  const Uint32 bs_rows = org->batch_size_rows/data.m_parallelism;
   ndbassert(bs_rows > 0);
   ScanFragNextReq* req =
     reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
@@ -5413,20 +5636,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
   req->transId1 = requestPtr.p->m_transId[0];
   req->transId2 = requestPtr.p->m_transId[1];
   req->batch_size_rows = bs_rows;
-  req->batch_size_bytes = org->batch_size_bytes/cnt;
+  req->batch_size_bytes = org->batch_size_bytes/data.m_parallelism;
 
   Uint32 batchRange = 0;
   Ptr<ScanFragHandle> fragPtr;
+  Uint32 sentFragCount = 0;
+  {
+    /**
+     * First, ask for more data from fragments that are already started.
+     */
   Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
   list.first(fragPtr);
-  for (Uint32 i = 0; i < cnt && !fragPtr.isNull(); list.next(fragPtr))
+    while (sentFragCount < data.m_parallelism && !fragPtr.isNull())
   {
     jam();
+      ndbassert(fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ ||
+                fragPtr.p->m_state == ScanFragHandle::SFH_COMPLETE ||
+                fragPtr.p->m_state == ScanFragHandle::SFH_NOT_STARTED);
     if (fragPtr.p->m_state == ScanFragHandle::SFH_WAIT_NEXTREQ)
     {
       jam();
 
-      i++;
       data.m_frags_outstanding++;
       req->variableData[0] = batchRange;
       fragPtr.p->m_state = ScanFragHandle::SFH_SCANNING;
@@ -5434,6 +5664,7 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
 
       DEBUG("scanIndex_execSCAN_NEXTREQ to: " << hex
             << treeNodePtr.p->m_send.m_ref
+              << ", m_node_no=" << treeNodePtr.p->m_node_no
             << ", senderData: " << req->senderData);
 
 #ifdef DEBUG_SCAN_FRAGREQ
@@ -5445,9 +5676,27 @@ Dbspj::scanIndex_execSCAN_NEXTREQ(Signal
       sendSignal(fragPtr.p->m_ref, GSN_SCAN_NEXTREQ, signal,
                  ScanFragNextReq::SignalLength + 1,
                  JBB);
+        sentFragCount++;
+      }
+      list.next(fragPtr);
     }
   }
 
+  if (sentFragCount < data.m_parallelism)
+  {
+    /**
+     * Then start new fragments until we reach data.m_parallelism.
+     */
+    jam();
+    ndbassert(data.m_frags_not_started != 0);
+    scanIndex_send(signal,
+                   requestPtr,
+                   treeNodePtr,
+                   data.m_parallelism - sentFragCount,
+                   org->batch_size_bytes/data.m_parallelism,
+                   bs_rows,
+                   batchRange);
+  }
   /**
    * cursor should not have been positioned here...
    *   unless we actually had something more to send.
@@ -5544,14 +5793,28 @@ Dbspj::scanIndex_abort(Signal* signal,
     }
   }
 
-  ndbrequire(cnt_waiting + cnt_scanning > 0);
   if (cnt_scanning == 0)
   {
-    /**
-     * If all were waiting...this should increase m_outstanding
-     */
-    jam();
-    requestPtr.p->m_outstanding++;
+    if (cnt_waiting > 0)
+    {
+      /**
+       * If all were waiting...this should increase m_outstanding
+       */
+      jam();
+      requestPtr.p->m_outstanding++;
+    }
+    else
+    {
+      /**
+       * All fragments are either complete or not yet started, so there is
+       * nothing to abort.
+       */
+      jam();
+      ndbassert(data.m_frags_not_started > 0);
+      ndbrequire(requestPtr.p->m_cnt_active);
+      requestPtr.p->m_cnt_active--;
+      treeNodePtr.p->m_state = TreeNode::TN_INACTIVE;
+    }
   }
 }
 
@@ -5603,6 +5866,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal
       jam();
       ndbrequire(data.m_frags_complete < data.m_fragCount);
       data.m_frags_complete++;
+      ndbrequire(data.m_frags_not_started > 0);
+      data.m_frags_not_started--;
       // fall through
     case ScanFragHandle::SFH_COMPLETE:
       jam();
@@ -5637,8 +5902,8 @@ Dbspj::scanIndex_execNODE_FAILREP(Signal
     requestPtr.p->m_outstanding--;
   }
 
-  if (save1 != data.m_fragCount
-      && data.m_frags_complete == data.m_fragCount)
+  if (save1 != 0 &&
+      data.m_fragCount == (data.m_frags_complete + data.m_frags_not_started))
   {
     jam();
     ndbrequire(requestPtr.p->m_cnt_active);
@@ -5654,7 +5919,8 @@ Dbspj::scanIndex_release_rangekeys(Ptr<R
                                    Ptr<TreeNode> treeNodePtr)
 {
   jam();
-  DEBUG("scanIndex_release_rangekeys(), m_node_no: " << treeNodePtr.p->m_node_no);
+  DEBUG("scanIndex_release_rangekeys(), tree node " << treeNodePtr.i
+          << " m_node_no: " << treeNodePtr.p->m_node_no);
 
   ScanIndexData& data = treeNodePtr.p->m_scanindex_data;
   Local_ScanFragHandle_list list(m_scanfraghandle_pool, data.m_fragments);
@@ -7115,3 +7381,15 @@ void Dbspj::execDBINFO_SCANREQ(Signal *s
 
   ndbinfo_send_scan_conf(signal, req, rl);
 } // Dbspj::execDBINFO_SCANREQ(Signal *signal)
+
+void Dbspj::IncrementalStatistics::update(double sample)
+{
+  // Prevent wrap-around
+  if(m_noOfSamples < 0xffffffff)
+  {
+    m_noOfSamples++;
+    const double delta = sample - m_mean;
+    m_mean += delta/m_noOfSamples;
+    m_sumSquare +=  delta * (sample - m_mean);
+  }
+}

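Aside, illustration only: two parts of the adaptive parallelism logic above are easy to check in isolation, clamping the statistical estimate (mean minus two standard deviations, floored at 1) and rounding it down so the remaining fragments split into equal round trips. With 8 fragments left and an estimate near 7, that yields two rounds of 4 rather than 7+1, matching the comment in scanIndex_parent_batch_complete(). A small stand-alone sketch with illustrative numbers:

  #include <cstdio>

  // Sketch only: the estimate clamping and round-trip balancing used in
  // scanIndex_parent_batch_complete(), lifted out with illustrative numbers.
  static unsigned choose_parallelism(double mean, double stdDev,
                                     unsigned fragsRemaining)
  {
    int parallelism = static_cast<int>(mean - 2 * stdDev);  // conservative
    if (parallelism < 1)
      parallelism = 1;
    else if (fragsRemaining % parallelism != 0)
    {
      // Spread the remaining fragments over equal-sized round trips.
      const int roundTrips = 1 + fragsRemaining / parallelism;
      parallelism = fragsRemaining / roundTrips;
    }
    return static_cast<unsigned>(parallelism);
  }

  int main()
  {
    // 8 fragments left, estimate ~7: 1 + 8/7 = 2 round trips, 8/2 = 4 per round.
    std::printf("parallelism=%u\n", choose_parallelism(7.4, 0.2, 8));
    return 0;
  }
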
=== modified file 'storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp'
--- a/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/kernel/blocks/qmgr/QmgrMain.cpp	2011-08-25 06:35:39 +0000
@@ -2972,26 +2972,6 @@ void Qmgr::checkStartInterface(Signal* s
                                    nodePtr.p->m_failconf_blocks[3],
                                    nodePtr.p->m_failconf_blocks[4]);
               warningEvent("%s", buf);
-
-              for (Uint32 i = 0; i<NDB_ARRAY_SIZE(nodePtr.p->m_failconf_blocks);
-                   i++)
-              {
-                jam();
-                if (nodePtr.p->m_failconf_blocks[i] != 0)
-                {
-                  jam();
-                  signal->theData[0] = 7019;
-                  signal->theData[1] = nodePtr.i;
-                  sendSignal(numberToRef(nodePtr.p->m_failconf_blocks[i],
-                                         getOwnNodeId()),
-                             GSN_DUMP_STATE_ORD, signal, 2, JBB);
-                }
-                else
-                {
-                  jam();
-                  break;
-                }
-              }
             }
           }
 	}

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-17 10:36:01 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-25 06:35:39 +0000
@@ -57,6 +57,7 @@ static const int Err_FunctionNotImplemen
 static const int Err_UnknownColumn = 4004;
 static const int Err_ReceiveTimedOut = 4008;
 static const int Err_NodeFailCausedAbort = 4028;
+static const int Err_ParameterError = 4118;
 static const int Err_SimpleDirtyReadFailed = 4119;
 static const int Err_WrongFieldLength = 4209;
 static const int Err_ReadTooMuch = 4257;
@@ -72,6 +73,21 @@ static const Uint16 tupleNotFound = 0xff
 /** Set to true to trace incomming signals.*/
 const bool traceSignals = false;
 
+enum
+{
+  /**
+   * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
+   * scan parallelism should be adaptive.
+   */
+  Parallelism_adaptive = 0xffff0000,
+
+  /**
+   * Set NdbQueryOperationImpl::m_parallelism to this value to indicate that
+   * all fragments should be scanned in parallel.
+   */
+  Parallelism_max = 0xffff0001
+};
+
 /**
  * A class for accessing the correlation data at the end of a tuple (for 
  * scan queries). These data have the following layout:
@@ -181,18 +197,24 @@ public:
    */
   void init(NdbQueryImpl& query, Uint32 fragNo); 
 
+  static void clear(NdbRootFragment* frags, Uint32 noOfFrags);
+
   Uint32 getFragNo() const
   { return m_fragNo; }
 
   /**
-   * Prepare for receiving another batch.
+   * Prepare for receiving another batch of results.
    */
-  void reset();
+  void prepareNextReceiveSet();
 
   /**
    * Prepare for reading another batch of results.
    */
-  void prepareResultSet();
+  void grabNextResultSet();                     // Need mutex lock
+
+  bool hasReceivedMore() const;                 // Need mutex lock
+
+  void setReceivedMore();                       // Need mutex lock
 
   void incrOutstandingResults(Int32 delta)
   {
@@ -261,6 +283,10 @@ public:
   void postFetchRelease();
 
 private:
+  /** No copying.*/
+  NdbRootFragment(const NdbRootFragment&);
+  NdbRootFragment& operator=(const NdbRootFragment&);
+
   STATIC_CONST( voidFragNo = 0xffffffff);
 
   /** Enclosing query.*/
@@ -273,12 +299,19 @@ private:
   NdbResultStream* m_resultStreams;
 
   /**
-   * The number of outstanding TCKEYREF or TRANSID_AI 
-   * messages for the fragment. This includes both messages related to the
+   * Number of available prefetched ResultSets which are completely 
+   * received. Will be made available for reading by calling 
+   * ::grabNextResultSet()
+   */
+  Uint32 m_availResultSets;   // Need mutex
+
+  /**
+   * The number of outstanding TCKEYREF or TRANSID_AI messages to receive
+   * for the fragment. This includes both messages related to the
    * root operation and any descendant operation that was instantiated as
    * a consequence of tuples found by the root operation.
-   * This number may temporarily be negative if e.g. TRANSID_AI arrives before 
-   * SCAN_TABCONF. 
+   * This number may temporarily be negative if e.g. TRANSID_AI arrives 
+   * before SCAN_TABCONF. 
    */
   Int32 m_outstandingResults;
 
@@ -287,7 +320,8 @@ private:
    * operation accesses (i.e. one for a lookup, all for a table scan).
    *
    * Each element is true iff a SCAN_TABCONF (for that fragment) or 
-   * TCKEYCONF message has been received */
+   * TCKEYCONF message has been received
+   */
   bool m_confReceived;
 
   /**
@@ -311,6 +345,55 @@ private:
   int m_idMapNext;
 }; //NdbRootFragment
 
+/**
+ * 'class NdbResultSet' is a helper for 'class NdbResultStream'.
+ *  It manages the buffers which rows are received into and
+ *  read from.
+ */
+class NdbResultSet
+{
+  friend class NdbResultStream;
+
+public:
+  explicit NdbResultSet();
+
+  void init(NdbQueryImpl& query,
+            Uint32 maxRows, Uint32 rowSize);
+
+  void prepareReceive(NdbReceiver& receiver)
+  {
+    m_rowCount = 0;
+    receiver.prepareReceive(m_buffer);
+  }
+
+  void prepareRead(NdbReceiver& receiver)
+  {
+    receiver.prepareRead(m_buffer,m_rowCount);
+  }
+
+  Uint32 getRowCount() const
+  { return m_rowCount; }
+
+private:
+  /** No copying.*/
+  NdbResultSet(const NdbResultSet&);
+  NdbResultSet& operator=(const NdbResultSet&);
+
+  /** The buffers which we receive the results into */
+  char* m_buffer;
+
+  /** Used for checking if buffer overrun occurred. */
+  Uint32* m_batchOverflowCheck;
+
+  /** Array of TupleCorrelations for all rows in m_buffer */
+  TupleCorrelation* m_correlations;
+
+  Uint32 m_rowSize;
+
+  /** The current #rows in 'm_buffer'.*/
+  Uint32 m_rowCount;
+
+}; // class NdbResultSet
 
 /** 
  * This class manages the subset of result data for one operation that is 
@@ -340,7 +423,7 @@ public:
   void prepare();
 
   /** Prepare for receiving next batch of scan results. */
-  void reset();
+  void prepareNextReceiveSet();
     
   NdbReceiver& getReceiver()
   { return m_receiver; }
@@ -348,11 +431,8 @@ public:
   const NdbReceiver& getReceiver() const
   { return m_receiver; }
 
-  Uint32 getRowCount() const
-  { return m_rowCount; }
-
-  char* getRow(Uint32 tupleNo) const
-  { return (m_buffer + (tupleNo*m_rowSize)); }
+  const char* getCurrentRow()
+  { return m_receiver.get_row(); }
 
   /**
    * Process an incomming tuple for this stream. Extract parent and own tuple 
@@ -372,9 +452,9 @@ public:
   bool prepareResultSet(Uint32 remainingScans);
 
   /**
-   * Navigate within the current result batch to resp. first and next row.
+   * Navigate within the current ResultSet to resp. first and next row.
    * For non-parent operations in the pushed query, navigation is with respect
-   * to any preceding parents which results in this NdbResultStream depends on.
+   * to any preceding parents which the results in this ResultSet depend on.
    * Returns either the tupleNo within TupleSet[] which we navigated to, or 
    * tupleNotFound().
    */
@@ -490,25 +570,22 @@ private:
   /** The receiver object that unpacks transid_AI messages.*/
   NdbReceiver m_receiver;
 
-  /** The buffers which we receive the results into */
-  char* m_buffer;
-
-  /** Used for checking if buffer overrun occurred. */
-  Uint32* m_batchOverflowCheck;
-
-  Uint32 m_rowSize;
-
-  /** The number of transid_AI messages received.*/
-  Uint32 m_rowCount;
+  /**
+   * ResultSets are received into and read from this stream,
+   * intended to be extended into double-buffered ResultSets later.
+   */
+  NdbResultSet m_resultSets[1];
+  Uint32 m_read;  // We read from m_resultSets[m_read]
+  Uint32 m_recv;  // We receive into m_resultSets[m_recv]
 
   /** This is the state of the iterator used by firstResult(), nextResult().*/
   enum
   {
     /** The first row has not been fetched yet.*/
     Iter_notStarted,
-    /** Is iterating the resultset, (implies 'm_currentRow!=tupleNotFound').*/
+    /** Is iterating the ResultSet, (implies 'm_currentRow!=tupleNotFound').*/
     Iter_started,  
-    /** Last row for current batch has been returned.*/
+    /** Last row for current ResultSet has been returned.*/
     Iter_finished
   } m_iterState;
 
@@ -586,6 +663,41 @@ void* NdbBulkAllocator::allocObjMem(Uint
   return m_nextObjNo > m_maxObjs ? NULL : result;
 }
 
+///////////////////////////////////////////
+/////////  NdbResultSet methods ///////////
+///////////////////////////////////////////
+NdbResultSet::NdbResultSet() :
+  m_buffer(NULL),
+  m_batchOverflowCheck(NULL),
+  m_correlations(NULL),
+  m_rowSize(0),
+  m_rowCount(0)
+{}
+
+void
+NdbResultSet::init(NdbQueryImpl& query,
+                   Uint32 maxRows,
+                   Uint32 rowSize)
+{
+  m_rowSize = rowSize;
+  {
+    const int bufferSize = rowSize * maxRows;
+    NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
+    m_buffer = reinterpret_cast<char*>(bufferAlloc.allocObjMem(bufferSize));
+
+    // So that we can test for buffer overrun.
+    m_batchOverflowCheck = 
+      reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
+    *m_batchOverflowCheck = 0xacbd1234;
+
+    if (query.getQueryDef().isScanQuery())
+    {
+      m_correlations = reinterpret_cast<TupleCorrelation*>
+                         (bufferAlloc.allocObjMem(maxRows*sizeof(TupleCorrelation)));
+    }
+  }
+}
+
 //////////////////////////////////////////////
 /////////  NdbResultStream methods ///////////
 //////////////////////////////////////////////
@@ -607,10 +719,7 @@ NdbResultStream::NdbResultStream(NdbQuer
      | (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll
        ? Is_Inner_Join : 0))),
   m_receiver(operation.getQuery().getNdbTransaction().getNdb()),
-  m_buffer(NULL),
-  m_batchOverflowCheck(NULL),
-  m_rowSize(0),
-  m_rowCount(0),
+  m_resultSets(), m_read(0xffffffff), m_recv(0),
   m_iterState(Iter_notStarted),
   m_currentRow(tupleNotFound),
   m_maxRows(0),
@@ -631,12 +740,8 @@ NdbResultStream::prepare()
   const Uint32 rowSize = m_operation.getRowSize();
   NdbQueryImpl &query = m_operation.getQuery();
 
-  m_rowSize = rowSize;
-
   /* Parent / child correlation is only relevant for scan type queries
-   * Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups!
-   * Neither is these structures required for operations not having respective
-   * child or parent operations.
+   * Don't create a m_tupleSet with these correlation id's for lookups!
    */
   if (isScanQuery())
   {
@@ -648,14 +753,7 @@ NdbResultStream::prepare()
   else
     m_maxRows = 1;
 
-  const int bufferSize = rowSize * m_maxRows;
-  NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
-  m_buffer = reinterpret_cast<char*>(bufferAlloc.allocObjMem(bufferSize));
-
-  // So that we can test for buffer overrun.
-  m_batchOverflowCheck = 
-    reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
-  *m_batchOverflowCheck = 0xacbd1234;
+  m_resultSets[0].init(query, m_maxRows, rowSize);
 
   m_receiver.init(NdbReceiver::NDB_QUERY_OPERATION, false, &m_operation);
   m_receiver.do_setup_ndbrecord(
@@ -664,32 +762,9 @@ NdbResultStream::prepare()
                           0 /*key_size*/, 
                           0 /*read_range_no*/, 
                           rowSize,
-                          m_buffer);
+                          m_resultSets[m_recv].m_buffer);
 } //NdbResultStream::prepare
 
-void
-NdbResultStream::reset()
-{
-  assert (isScanQuery());
-
-  // Root scan-operation need a ScanTabConf to complete
-  m_rowCount = 0;
-  m_iterState = Iter_notStarted;
-  m_currentRow = tupleNotFound;
-
-  m_receiver.prepareSend();
-  /**
-   * If this stream will get new rows in the next batch, then so will
-   * all of its descendants.
-   */
-  for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations();
-       childNo++)
-  {
-    NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
-    m_rootFrag.getResultStream(child).reset();
-  }
-} //NdbResultStream::reset
-
 /** Locate, and return 'tupleNo', of first tuple with specified parentId.
  *  parentId == tupleNotFound is use as a special value for iterating results
  *  from the root operation in the order which they was inserted by 
@@ -703,11 +778,11 @@ NdbResultStream::findTupleWithParentId(U
 {
   assert ((parentId==tupleNotFound) == (m_parent==NULL));
 
-  if (likely(m_rowCount>0))
+  if (likely(m_resultSets[m_read].m_rowCount>0))
   {
     if (m_tupleSet==NULL)
     {
-      assert (m_rowCount <= 1);
+      assert (m_resultSets[m_read].m_rowCount <= 1);
       return 0;
     }
 
@@ -774,7 +849,7 @@ NdbResultStream::firstResult()
   if ((m_currentRow=findTupleWithParentId(parentId)) != tupleNotFound)
   {
     m_iterState = Iter_started;
-    m_receiver.setCurrentRow(m_currentRow);
+    m_receiver.setCurrentRow(m_resultSets[m_read].m_buffer, m_currentRow);
     return m_currentRow;
   }
 
@@ -782,7 +857,6 @@ NdbResultStream::firstResult()
   return tupleNotFound;
 } //NdbResultStream::firstResult()
 
-
 Uint16
 NdbResultStream::nextResult()
 {
@@ -791,14 +865,13 @@ NdbResultStream::nextResult()
       (m_currentRow=findNextTuple(m_currentRow)) != tupleNotFound)
   {
     m_iterState = Iter_started;
-    m_receiver.setCurrentRow(m_currentRow);
+    m_receiver.setCurrentRow(m_resultSets[m_read].m_buffer, m_currentRow);
     return m_currentRow;
   }
   m_iterState = Iter_finished;
   return tupleNotFound;
 } //NdbResultStream::nextResult()
 
-
 /**
  * Callback when a TRANSID_AI signal (receive row) is processed.
  */
@@ -806,21 +879,46 @@ void
 NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len,
                                 TupleCorrelation correlation)
 {
-  m_receiver.execTRANSID_AI(ptr, len);
-  m_rowCount++;
-
+  NdbResultSet& receiveSet = m_resultSets[m_recv];
   if (isScanQuery())
   {
     /**
-     * Store TupleCorrelation as hidden value imm. after received row
-     * (NdbQueryOperationImpl::getRowSize() has reserved space for it)
+     * Store TupleCorrelation.
      */
-    Uint32* row_recv = reinterpret_cast<Uint32*>(m_receiver.m_record.m_row);
-    row_recv[-1] = correlation.toUint32();
+    receiveSet.m_correlations[receiveSet.m_rowCount] = correlation;
   }
+
+  m_receiver.execTRANSID_AI(ptr, len);
+  receiveSet.m_rowCount++;
 } // NdbResultStream::execTRANSID_AI()
 
 /**
+ * Make preparations for another batch of results to be received.
+ * This NdbResultStream, and all its siblings, will receive a batch
+ * of results from the datanodes.
+ */
+void
+NdbResultStream::prepareNextReceiveSet()
+{
+  assert (isScanQuery());
+
+  m_iterState = Iter_notStarted;
+  m_currentRow = tupleNotFound;
+  m_resultSets[m_recv].prepareReceive(m_receiver);
+
+  /**
+   * If this stream will get new rows in the next batch, then so will
+   * all of its descendants.
+   */
+  for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations();
+       childNo++)
+  {
+    NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
+    m_rootFrag.getResultStream(child).prepareNextReceiveSet();
+  }
+} //NdbResultStream::prepareNextReceiveSet
+
+/**
  * Make preparations for another batch of result to be read:
  *  - Fill in parent/child result correlations in m_tupleSet[]
  *  - ... or reset m_tupleSet[] if we reuse the previous.
@@ -833,12 +931,16 @@ NdbResultStream::prepareResultSet(Uint32
   bool isComplete = isSubScanComplete(remainingScans); //Childs with more rows
   assert(isComplete || isScanResult());                //Lookups always 'complete'
 
-  // Set correct #rows received in the NdbReceiver.
-  getReceiver().m_result_rows = getRowCount();
+   m_read = m_recv;
+   NdbResultSet& readResult = m_resultSets[m_read];
+
+  // Set correct buffer and #rows received by this ResultSet.
+  readResult.prepareRead(m_receiver);
 
   /**
-   * Prepare NdbResultStream for reading - either the next received
-   * from datanodes or reuse current.
+   * Prepare NdbResultSet for reading - either the next received
+   * from datanodes or reuse the last as has been determined by 
+   * ::prepareNextReceiveSet()
    */
   if (m_tupleSet!=NULL)
   {
@@ -850,7 +952,7 @@ NdbResultStream::prepareResultSet(Uint32
     else
     {
       // Makes all rows in 'TupleSet' available (clear 'm_skip' flag)
-      for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+      for (Uint32 tupleNo=0; tupleNo<readResult.getRowCount(); tupleNo++)
       {
         m_tupleSet[tupleNo].m_skip = false;
       }
@@ -877,7 +979,7 @@ NdbResultStream::prepareResultSet(Uint32
 
     if (m_tupleSet!=NULL)
     {
-      for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+      for (Uint32 tupleNo=0; tupleNo<readResult.getRowCount(); tupleNo++)
       {
         if (!m_tupleSet[tupleNo].m_skip)
         {
@@ -913,21 +1015,24 @@ NdbResultStream::prepareResultSet(Uint32
 
 /**
  * Fill m_tupleSet[] with correlation data between parent 
- * and child tuples. The 'TupleCorrelation' is stored as
- * and extra Uint32 after each row received
+ * and child tuples. The 'TupleCorrelation' is stored
+ * in an array of TupleCorrelations in each ResultSet
  * by execTRANSID_AI().
  *
  * NOTE: In order to reduce work done when holding the 
  * transporter mutex, the 'TupleCorrelation' is only stored
  * in the buffer when it arrives. Later (here) we build the
  * correlation hashMap immediately before we prepare to 
- * read the NdbResultStream.
+ * read the NdbResultSet.
  */
 void 
 NdbResultStream::buildResultCorrelations()
 {
+  const NdbResultSet& readResult = m_resultSets[m_read];
+
   // Buffer overrun check.
-  assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck==0xacbd1234);
+  assert(readResult.m_batchOverflowCheck==NULL || 
+         *readResult.m_batchOverflowCheck==0xacbd1234);
 
 //if (m_tupleSet!=NULL)
   {
@@ -937,15 +1042,12 @@ NdbResultStream::buildResultCorrelations
       m_tupleSet[i].m_hash_head = tupleNotFound;
     }
 
-    /* Rebuild correlation & hashmap from received buffers */
-    for (Uint32 tupleNo=0; tupleNo<m_rowCount; tupleNo++)
+    /* Rebuild correlation & hashmap from 'readResult' */
+    for (Uint32 tupleNo=0; tupleNo<readResult.m_rowCount; tupleNo++)
     {
-      const Uint32* row = (Uint32*)getRow(tupleNo+1);
-      const TupleCorrelation correlation(row[-1]);
-
-      const Uint16 tupleId  = correlation.getTupleId();
+      const Uint16 tupleId  = readResult.m_correlations[tupleNo].getTupleId();
       const Uint16 parentId = (m_parent!=NULL) 
-                                ? correlation.getParentTupleId()
+                                ? readResult.m_correlations[tupleNo].getParentTupleId()
                                 : tupleNotFound;
 
       m_tupleSet[tupleNo].m_skip     = false;
@@ -1034,6 +1136,7 @@ NdbRootFragment::NdbRootFragment():
   m_query(NULL),
   m_fragNo(voidFragNo),
   m_resultStreams(NULL),
+  m_availResultSets(0),
   m_outstandingResults(0),
   m_confReceived(false),
   m_remainingScans(0),
@@ -1094,7 +1197,46 @@ NdbRootFragment::getResultStream(Uint32
   return m_resultStreams[operationNo];
 }
 
-void NdbRootFragment::reset()
+/**
+ * Throw any pending ResultSets from specified rootFrags[]
+ */
+//static
+void NdbRootFragment::clear(NdbRootFragment* rootFrags, Uint32 noOfFrags)
+{
+  if (rootFrags != NULL)
+  {
+    for (Uint32 fragNo = 0; fragNo < noOfFrags; fragNo++)
+    {
+      rootFrags[fragNo].m_availResultSets = 0;
+    }
+  }
+}
+
+/**
+ * Signal that another complete ResultSet is available for 
+ * this NdbRootFragment.
+ * Need mutex lock as 'm_availResultSets' is accessed both from
+ * receiver and application thread.
+ */
+void NdbRootFragment::setReceivedMore()
+{
+  assert(m_availResultSets==0);
+  m_availResultSets++;
+}
+
+/**
+ * Check if another ResultSet has been received and is available
+ * for reading. It will be given to the application thread when it
+ * calls ::grabNextResultSet().
+ * Need mutex lock as 'm_availResultSets' is accessed both from
+ * receiver and application thread.
+ */
+bool NdbRootFragment::hasReceivedMore() const
+{
+  return (m_availResultSets > 0);
+}
+
+void NdbRootFragment::prepareNextReceiveSet()
 {
   assert(m_fragNo!=voidFragNo);
   assert(m_outstandingResults == 0);
@@ -1102,20 +1244,30 @@ void NdbRootFragment::reset()
 
   for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++) 
   {
-    if (!m_resultStreams[opNo].isSubScanComplete(m_remainingScans))
+    NdbResultStream& resultStream = getResultStream(opNo);
+    if (!resultStream.isSubScanComplete(m_remainingScans))
     {
       /**
-       * Reset m_resultStreams[] and all its descendants, since all these
+       * Reset resultStream and all its descendants, since all these
        * streams will get a new set of rows in the next batch.
        */ 
-      m_resultStreams[opNo].reset();
+      resultStream.prepareNextReceiveSet();
     }
   }
   m_confReceived = false;
 }
 
-void NdbRootFragment::prepareResultSet()
+/**
+ * Let the application thread take ownership of an available
+ * ResultSet, and prepare it for reading the first row.
+ * Need mutex lock as 'm_availResultSets' is accessed both from
+ * receiver and application thread.
+ */
+void NdbRootFragment::grabNextResultSet()
 {
+  assert(m_availResultSets>0);
+  m_availResultSets--;
+
   NdbResultStream& rootStream = getResultStream(0);
   rootStream.prepareResultSet(m_remainingScans);  
 
@@ -1137,7 +1289,7 @@ void NdbRootFragment::setConfReceived(Ui
 
 bool NdbRootFragment::finalBatchReceived() const
 {
-  return getReceiverTcPtrI()==RNIL;
+  return m_confReceived && getReceiverTcPtrI()==RNIL;
 }
 
 bool NdbRootFragment::isEmpty() const
@@ -1356,6 +1508,14 @@ int NdbQueryOperation::setParallelism(Ui
   return m_impl.setParallelism(parallelism);
 }
 
+int NdbQueryOperation::setMaxParallelism(){
+  return m_impl.setMaxParallelism();
+}
+
+int NdbQueryOperation::setAdaptiveParallelism(){
+  return m_impl.setAdaptiveParallelism();
+}
+
 int NdbQueryOperation::setBatchSize(Uint32 batchSize){
   return m_impl.setBatchSize(batchSize);
 }
@@ -1587,6 +1747,7 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio
   m_next(NULL),
   m_queryDef(&queryDef),
   m_error(),
+  m_errorReceived(0),
   m_transaction(trans),
   m_scanTransaction(NULL),
   m_operations(0),
@@ -1596,7 +1757,6 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio
   m_rootFragCount(0),
   m_rootFrags(NULL),
   m_applFrags(),
-  m_fullFrags(),
   m_finalBatchFrags(0),
   m_num_bounds(0),
   m_shortestBound(0xffffffff),
@@ -2060,10 +2220,9 @@ NdbQueryImpl::nextResult(bool fetchAllow
 NdbQuery::NextResultOutcome
 NdbQueryImpl::nextRootResult(bool fetchAllowed, bool forceSend)
 {
-  /* To minimize lock contention, each query has two separate root fragment 
-   * conatiners (m_fullFrags and m_applFrags). m_applFrags is only
-   * accessed by the application thread, so it is safe to use it without 
-   * locks.
+  /* To minimize lock contention, each query has a separate root fragment
+   * container 'm_applFrags'. m_applFrags is only accessed by the application
+   * thread, so it is safe to use it without locks.
    */
   while (m_state != EndOfData)  // Or likely:  return when 'gotRow' or error
   {
@@ -2179,23 +2338,17 @@ NdbQueryImpl::awaitMoreResults(bool forc
        */
       while (likely(!hasReceivedError()))
       {
-        /* m_fullFrags contains any fragments that are complete (for this batch)
-         * but have not yet been moved (under mutex protection) to 
-         * m_applFrags.
+        /* Scan m_rootFrags (under mutex protection) for fragments
+         * which have received a complete batch. Add these to m_applFrags.
          */
-        NdbRootFragment* frag;
-        while ((frag=m_fullFrags.pop()) != NULL)
-        {
-          frag->prepareResultSet();
-          m_applFrags.add(*frag);
-        }
+        m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount);
         if (m_applFrags.getCurrent() != NULL)
         {
           return FetchResult_ok;
         }
 
-        /* There are noe more available frament results available without
-         * first waiting for more to be received from datanodes
+        /* There are no more fragment results available without
+         * first waiting for more to be received from the datanodes
          */
         if (m_pendingFrags == 0)
         {
@@ -2236,16 +2389,9 @@ NdbQueryImpl::awaitMoreResults(bool forc
     /* The root operation is a lookup. Lookups are guaranteed to be complete
      * before NdbTransaction::execute() returns. Therefore we do not set
      * the lock, because we know that the signal receiver thread will not
-     * be accessing m_fullFrags at this time.
+     * be accessing m_rootFrags at this time.
      */
-    NdbRootFragment* frag;
-    if ((frag=m_fullFrags.pop()) != NULL)
-    {
-      frag->prepareResultSet();
-      m_applFrags.add(*frag);
-    }
-    assert(m_fullFrags.pop()==NULL); // Only one stream for lookups.
-
+    m_applFrags.prepareMoreResults(m_rootFrags,m_rootFragCount);
     if (m_applFrags.getCurrent() != NULL)
     {
       return FetchResult_ok;
@@ -2266,8 +2412,8 @@ NdbQueryImpl::awaitMoreResults(bool forc
 
 /*
   ::handleBatchComplete() is intended to be called when receiving signals only.
-  The PollGuard mutex is then set and the shared 'm_pendingFrags', 
-  'm_finalBatchFrags' and 'm_fullFrags' can safely be updated.
+  The PollGuard mutex is then set and the shared 'm_pendingFrags' and
+  'm_finalBatchFrags' can safely be updated and ::setReceivedMore() signaled.
 
   returns: 'true' when application thread should be resumed.
 */
@@ -2286,7 +2432,7 @@ NdbQueryImpl::handleBatchComplete(NdbRoo
    * terminated the scan.  We are about to close this query, 
    * and didn't expect any more data - ignore it!
    */
-  if (likely(m_fullFrags.m_errorCode == 0))
+  if (likely(m_errorReceived == 0))
   {
     assert(rootFrag.isFragBatchComplete());
 
@@ -2300,10 +2446,10 @@ NdbQueryImpl::handleBatchComplete(NdbRoo
       assert(m_finalBatchFrags <= m_rootFragCount);
     }
 
-    /* When application thread ::awaitMoreResults() it will later be moved
-     * from m_fullFrags to m_applFrags under mutex protection.
+    /* When the application thread calls ::awaitMoreResults(), this root
+     * fragment will later be added to m_applFrags under mutex protection.
      */
-    m_fullFrags.push(rootFrag);
+    rootFrag.setReceivedMore();
     return true;
   }
 
@@ -2327,7 +2473,7 @@ NdbQueryImpl::close(bool forceSend)
     }
 
     // Throw any pending results
-    m_fullFrags.clear();
+    NdbRootFragment::clear(m_rootFrags,m_rootFragCount);
     m_applFrags.clear();
 
     Ndb* const ndb = m_transaction.getNdb();
@@ -2423,7 +2569,7 @@ NdbQueryImpl::setFetchTerminated(int err
   }
   if (errorCode!=0)
   {
-    m_fullFrags.m_errorCode = errorCode;
+    m_errorReceived = errorCode;
   }
   m_pendingFrags = 0;
 } // NdbQueryImpl::setFetchTerminated()
@@ -2436,9 +2582,9 @@ NdbQueryImpl::setFetchTerminated(int err
 bool
 NdbQueryImpl::hasReceivedError()
 {
-  if (unlikely(m_fullFrags.m_errorCode))
+  if (unlikely(m_errorReceived))
   {
-    setErrorCode(m_fullFrags.m_errorCode);
+    setErrorCode(m_errorReceived);
     return true;
   }
   return false;
@@ -2503,16 +2649,17 @@ NdbQueryImpl::prepareSend()
   {
     /* For the first batch, we read from all fragments for both ordered 
      * and unordered scans.*/
-    if (getQueryOperation(0U).m_parallelism > 0)
+    if (getQueryOperation(0U).m_parallelism == Parallelism_max)
     {
       m_rootFragCount
-        = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
-              getQueryOperation(0U).m_parallelism);
+        = getRoot().getQueryOperationDef().getTable().getFragmentCount();
     }
     else
     {
+      assert(getQueryOperation(0U).m_parallelism != Parallelism_adaptive);
       m_rootFragCount
-        = getRoot().getQueryOperationDef().getTable().getFragmentCount();
+        = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
+              getQueryOperation(0U).m_parallelism);
     }
     Ndb* const ndb = m_transaction.getNdb();
 
@@ -2543,8 +2690,7 @@ NdbQueryImpl::prepareSend()
   }
   // Allocate space for ptrs to NdbResultStream and NdbRootFragment objects.
   error = m_pointerAlloc.init(m_rootFragCount * 
-                              (SharedFragStack::pointersPerFragment +
-                               OrderedFragSet::pointersPerFragment));
+                              (OrderedFragSet::pointersPerFragment));
   if (error != 0)
   {
     setErrorCode(error);
@@ -2563,14 +2709,13 @@ NdbQueryImpl::prepareSend()
   for (Uint32 opNo = 0; opNo < getNoOfOperations(); opNo++)
   {
     const NdbQueryOperationImpl& op = getQueryOperation(opNo);
+    // Add space for m_correlations, m_buffer & m_batchOverflowCheck
+    totalBuffSize += (sizeof(TupleCorrelation) * op.getMaxBatchRows());
     totalBuffSize += (op.getRowSize() * op.getMaxBatchRows());
+    totalBuffSize += sizeof(Uint32); // Overflow check
   }
-  /**
-   * Add one word per ResultStream for buffer overrun check. We add a word
-   * rather than a byte to avoid possible alignment problems.
-   */
-  m_rowBufferAlloc.init(m_rootFragCount * 
-                       (totalBuffSize + (sizeof(Uint32) * getNoOfOperations())) );
+
+  m_rowBufferAlloc.init(m_rootFragCount * totalBuffSize);
   if (getQueryDef().isScanQuery())
   {
     Uint32 totalRows = 0;
@@ -2631,21 +2776,17 @@ NdbQueryImpl::prepareSend()
     keyRec = getRoot().getQueryOperationDef().getIndex()->getDefaultRecord();
     assert(keyRec!=NULL);
   }
-  if (unlikely((error = m_applFrags.prepare(m_pointerAlloc,
-                                            getRoot().getOrdering(),
-                                            m_rootFragCount, 
-                                            keyRec,
-                                            getRoot().m_ndbRecord)) != 0)
-      || (error = m_fullFrags.prepare(m_pointerAlloc,
-                                      m_rootFragCount)) != 0) {
-    setErrorCode(error);
-    return -1;
-  }
+  m_applFrags.prepare(m_pointerAlloc,
+                      getRoot().getOrdering(),
+                      m_rootFragCount, 
+                      keyRec,
+                      getRoot().m_ndbRecord);
 
   if (getQueryDef().isScanQuery())
   {
     NdbRootFragment::buildReciverIdMap(m_rootFrags, m_rootFragCount);
   }
+
 #ifdef TRACE_SERIALIZATION
   ndbout << "Serialized ATTRINFO : ";
   for(Uint32 i = 0; i < m_attrInfo.getSize(); i++){
@@ -3088,7 +3229,7 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
     NdbRootFragment* rootFrag = rootFrags[i];
     assert(rootFrag->isFragBatchComplete());
     assert(!rootFrag->finalBatchReceived());
-    rootFrag->reset();
+    rootFrag->prepareNextReceiveSet();
   }
 
   Ndb& ndb = *getNdbTransaction().getNdb();
@@ -3183,8 +3324,8 @@ NdbQueryImpl::closeTcCursor(bool forceSe
   } // while
 
   assert(m_pendingFrags==0);
-  m_fullFrags.clear();                         // Throw any unhandled results
-  m_fullFrags.m_errorCode = 0;                 // Clear errors caused by previous fetching
+  NdbRootFragment::clear(m_rootFrags,m_rootFragCount);
+  m_errorReceived = 0;                         // Clear errors caused by previous fetching
   m_error.code = 0;
 
   if (m_finalBatchFrags < getRootFragCount())  // TC has an open scan cursor.
@@ -3271,63 +3412,35 @@ int NdbQueryImpl::isPrunable(bool& pruna
 
 
 /****************
- * NdbQueryImpl::SharedFragStack methods.
- ***************/
-
-NdbQueryImpl::SharedFragStack::SharedFragStack():
-  m_errorCode(0),
-  m_capacity(0),
-  m_current(-1),
-  m_array(NULL)
-{}
-
-int
-NdbQueryImpl::SharedFragStack::prepare(NdbBulkAllocator& allocator,
-                                       int capacity)
-{
-  assert(m_array==NULL);
-  assert(m_capacity==0);
-  if (capacity > 0) 
-  { m_capacity = capacity;
-    m_array = 
-      reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity)); 
-  }
-  return 0;
-}
-
-void
-NdbQueryImpl::SharedFragStack::push(NdbRootFragment& frag)
-{
-  m_current++;
-  assert(m_current<m_capacity);
-  m_array[m_current] = &frag; 
-}
-
-/****************
  * NdbQueryImpl::OrderedFragSet methods.
  ***************/
 
 NdbQueryImpl::OrderedFragSet::OrderedFragSet():
   m_capacity(0),
   m_activeFragCount(0),
-  m_emptiedFragCount(0),
+  m_fetchMoreFragCount(0),
   m_finalFragCount(0),
   m_ordering(NdbQueryOptions::ScanOrdering_void),
   m_keyRecord(NULL),
   m_resultRecord(NULL),
   m_activeFrags(NULL),
-  m_emptiedFrags(NULL)
+  m_fetchMoreFrags(NULL)
 {
 }
 
 NdbQueryImpl::OrderedFragSet::~OrderedFragSet() 
 { 
   m_activeFrags = NULL;
-  m_emptiedFrags= NULL;
+  m_fetchMoreFrags = NULL;
 }
 
+void NdbQueryImpl::OrderedFragSet::clear() 
+{ 
+  m_activeFragCount = 0;
+  m_fetchMoreFragCount = 0; 
+}
 
-int
+void
 NdbQueryImpl::OrderedFragSet::prepare(NdbBulkAllocator& allocator,
                                       NdbQueryOptions::ScanOrdering ordering, 
                                       int capacity,                
@@ -3345,14 +3458,14 @@ NdbQueryImpl::OrderedFragSet::prepare(Nd
     m_activeFrags =  
       reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity)); 
     bzero(m_activeFrags, capacity * sizeof(NdbRootFragment*));
-    m_emptiedFrags = 
+
+    m_fetchMoreFrags = 
       reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
-    bzero(m_emptiedFrags, capacity * sizeof(NdbRootFragment*));
+    bzero(m_fetchMoreFrags, capacity * sizeof(NdbRootFragment*));
   }
   m_ordering = ordering;
   m_keyRecord = keyRecord;
   m_resultRecord = resultRecord;
-  return 0;
 } // OrderedFragSet::prepare()
 
 
@@ -3368,8 +3481,6 @@ NdbQueryImpl::OrderedFragSet::getCurrent
 { 
   if (m_ordering!=NdbQueryOptions::ScanOrdering_unordered)
   {
-    // Results should be ordered.
-    assert(verifySortOrder());
     /** 
      * Must have tuples for each (non-completed) fragment when doing ordered
      * scan.
@@ -3413,10 +3524,10 @@ NdbQueryImpl::OrderedFragSet::reorganize
     }
     else
     {
-      m_emptiedFrags[m_emptiedFragCount++] = frag;
+      m_fetchMoreFrags[m_fetchMoreFragCount++] = frag;
     }
     m_activeFragCount--;
-    assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount 
+    assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount 
            <= m_capacity);
 
     return;  // Remaining m_activeFrags[] are sorted
@@ -3466,7 +3577,7 @@ NdbQueryImpl::OrderedFragSet::reorganize
     }
     assert(verifySortOrder());
   }
-  assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount 
+  assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount 
          <= m_capacity);
 } // OrderedFragSet::reorganize()
 
@@ -3479,19 +3590,29 @@ NdbQueryImpl::OrderedFragSet::add(NdbRoo
   reorganize();                                // Move into position
 } // OrderedFragSet::add()
 
-void NdbQueryImpl::OrderedFragSet::clear() 
-{ 
-  m_activeFragCount = 0;
-  m_emptiedFragCount = 0; 
-  m_finalFragCount = 0;
-}
+void 
+NdbQueryImpl::OrderedFragSet::prepareMoreResults(NdbRootFragment rootFrags[], Uint32 cnt) 
+{
+  for (Uint32 fragNo = 0; fragNo < cnt; fragNo++)
+  {
+    NdbRootFragment& rootFrag = rootFrags[fragNo];
+    if (rootFrag.hasReceivedMore())   // Another ResultSet is available
+    {
+      rootFrag.grabNextResultSet();   // Get new ResultSet.
+      add(rootFrag);                  // Make avail. to appl. thread
+    }
+  } // for all 'rootFrags[]'
+
+  assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount 
+         <= m_capacity);
+} // OrderedFragSet::prepareMoreResults()
 
 Uint32 
 NdbQueryImpl::OrderedFragSet::getFetchMore(NdbRootFragment** &frags)
 {
-  const Uint32 cnt = m_emptiedFragCount;
-  frags = m_emptiedFrags;
-  m_emptiedFragCount = 0;
+  const int cnt = m_fetchMoreFragCount;
+  frags = m_fetchMoreFrags;
+  m_fetchMoreFragCount = 0;
   return cnt;
 }
 
@@ -3570,7 +3691,8 @@ NdbQueryOperationImpl::NdbQueryOperation
   m_ordering(NdbQueryOptions::ScanOrdering_unordered),
   m_interpretedCode(NULL),
   m_diskInUserProjection(false),
-  m_parallelism(0),
+  m_parallelism(def.getQueryOperationIx() == 0
+                ? Parallelism_max : Parallelism_adaptive),
   m_rowSize(0xffffffff)
 { 
   if (errno == ENOMEM)
@@ -3912,7 +4034,7 @@ NdbQueryOperationImpl::nextResult(bool f
 void 
 NdbQueryOperationImpl::fetchRow(NdbResultStream& resultStream)
 {
-  const char* buff = resultStream.getReceiver().peek_row();
+  const char* buff = resultStream.getCurrentRow();
   assert(buff!=NULL || (m_firstRecAttr==NULL && m_ndbRecord==NULL));
 
   m_isRowNull = false;
@@ -4169,9 +4291,10 @@ NdbQueryOperationImpl
                                       m_ndbRecord,
                                       m_firstRecAttr,
                                       0, // Key size.
-                                      getRoot().m_parallelism > 0 ?
-                                      getRoot().m_parallelism :
-                                      m_queryImpl.getRootFragCount(),
+                                      getRoot().m_parallelism
+                                      == Parallelism_max ?
+                                      m_queryImpl.getRootFragCount() :
+                                      getRoot().m_parallelism,
                                       maxBatchRows,
                                       batchByteSize,
                                       firstBatchRows);
@@ -4357,7 +4480,12 @@ NdbQueryOperationImpl::prepareAttrInfo(U
                                       firstBatchRows);
     assert(batchRows==getMaxBatchRows());
     assert(batchRows==firstBatchRows);
-    requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL; // FIXME: SPJ always assume. SIP_PARALLEL
+    assert(m_parallelism == Parallelism_max ||
+           m_parallelism == Parallelism_adaptive);
+    if (m_parallelism == Parallelism_max)
+    {
+      requestInfo |= QN_ScanIndexParameters::SIP_PARALLEL;
+    }
     param->requestInfo = requestInfo; 
     param->batchSize = ((Uint16)batchByteSize << 16) | (Uint16)firstBatchRows;
     param->resultData = getIdOfReceiver();
@@ -4689,7 +4817,7 @@ NdbQueryOperationImpl::execTRANSID_AI(co
   if (getQueryDef().isScanQuery())
   {
     const CorrelationData correlData(ptr, len);
-    const Uint32 receiverId = CorrelationData(ptr, len).getRootReceiverId();
+    const Uint32 receiverId = correlData.getRootReceiverId();
     
     /** receiverId holds the Id of the receiver of the corresponding stream
      * of the root operation. We can thus find the correct root fragment 
@@ -4861,7 +4989,7 @@ NdbQueryOperationImpl::setOrdering(NdbQu
     return -1;
   }
 
-  if (m_parallelism != 0)
+  if (m_parallelism != Parallelism_max)
   {
     getQuery().setErrorCode(QRY_SEQUENTIAL_SCAN_SORTED);
     return -1;
@@ -4956,10 +5084,40 @@ int NdbQueryOperationImpl::setParallelis
     getQuery().setErrorCode(Err_FunctionNotImplemented);
     return -1;
   }
+  else if (parallelism < 1 || parallelism > MAX_NDB_PARTITIONS)
+  {
+    getQuery().setErrorCode(Err_ParameterError);
+    return -1;
+  }
   m_parallelism = parallelism;
   return 0;
 }
 
+int NdbQueryOperationImpl::setMaxParallelism(){
+  if (!getQueryOperationDef().isScanOperation())
+  {
+    getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
+    return -1;
+  }
+  m_parallelism = Parallelism_max;
+  return 0;
+}
+
+int NdbQueryOperationImpl::setAdaptiveParallelism(){
+  if (!getQueryOperationDef().isScanOperation())
+  {
+    getQuery().setErrorCode(QRY_WRONG_OPERATION_TYPE);
+    return -1;
+  }
+  else if (getQueryOperationDef().getQueryOperationIx() == 0)
+  {
+    getQuery().setErrorCode(Err_FunctionNotImplemented);
+    return -1;
+  }
+  m_parallelism = Parallelism_adaptive;
+  return 0;
+}
+
 int NdbQueryOperationImpl::setBatchSize(Uint32 batchSize){
   if (!getQueryOperationDef().isScanOperation())
   {
@@ -5027,12 +5185,6 @@ Uint32 NdbQueryOperationImpl::getRowSize
   {
     m_rowSize = 
       NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr, 0, false);
-
-    const bool withCorrelation = getRoot().getQueryDef().isScanQuery();
-    if (withCorrelation)
-    {
-      m_rowSize += TupleCorrelation::wordCount*sizeof(Uint32);
-    }
   }
   return m_rowSize;
 }
@@ -5061,7 +5213,7 @@ NdbOut& operator<<(NdbOut& out, const Nd
 }
 
 NdbOut& operator<<(NdbOut& out, const NdbResultStream& stream){
-  out << " m_rowCount: " << stream.m_rowCount;
+  out << " received rows: " << stream.m_resultSets[stream.m_recv].getRowCount();
   return out;
 }
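
To summarize the pattern behind these NdbQueryOperation.cpp changes: the shared m_fullFrags stack is replaced by a per-fragment count of available ResultSets, which the receiver thread bumps via setReceivedMore() and the application thread drains via OrderedFragSet::prepareMoreResults()/grabNextResultSet() while the PollGuard mutex is held. Below is a minimal standalone C++ sketch of that handoff, not the real NDB classes; the per-object std::mutex and the vector of pointers are simplifying assumptions (the real code uses the single PollGuard mutex, taken by the caller).

  #include <cassert>
  #include <cstddef>
  #include <mutex>
  #include <vector>

  struct DemoFragment               // hypothetical stand-in for NdbRootFragment
  {
    std::mutex guard;               // stands in for the PollGuard mutex
    int availResultSets = 0;        // like m_availResultSets

    void setReceivedMore()          // receiver thread: another batch completed
    {
      std::lock_guard<std::mutex> lk(guard);
      availResultSets++;
    }
    bool hasReceivedMore()          // application thread: anything to grab?
    {
      std::lock_guard<std::mutex> lk(guard);
      return availResultSets > 0;
    }
    void grabNextResultSet()        // application thread takes ownership
    {
      std::lock_guard<std::mutex> lk(guard);
      assert(availResultSets > 0);
      availResultSets--;
    }
  };

  // Rough analogue of OrderedFragSet::prepareMoreResults(): collect fragments
  // with a complete batch so the application thread can iterate them lock-free.
  void prepareMoreResults(DemoFragment rootFrags[], std::size_t cnt,
                          std::vector<DemoFragment*>& applFrags)
  {
    for (std::size_t i = 0; i < cnt; i++)
    {
      if (rootFrags[i].hasReceivedMore())
      {
        rootFrags[i].grabNextResultSet();
        applFrags.push_back(&rootFrags[i]);
      }
    }
  }

With the ResultSets double buffered per fragment, the receiver thread can fill the next batch while the application thread is still reading the current one.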
 

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.hpp	2011-06-20 13:25:48 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.hpp	2011-08-22 08:35:35 +0000
@@ -335,13 +335,29 @@ public:
   NdbQueryOptions::ScanOrdering getOrdering() const;
 
   /** 
-   * Set the number of fragments to be scanned in parallel for the root 
-   * operation of this query. This only applies to table scans and non-sorted
-   * scans of ordered indexes.
+   * Set the number of fragments to be scanned in parallel. This only applies
+   * to table scans and non-sorted scans of ordered indexes. This method is
+   * only implemented for the root scan operation.
    * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
    */
   int setParallelism(Uint32 parallelism);
 
+  /**
+   * Set the number of fragments to be scanned in parallel to the maximum
+   * possible value. This is the default for the root scan operation.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setMaxParallelism();
+
+  /**
+   * Let the system dynamically choose the number of fragments to scan in
+   * parallel. The system will try to choose a value that gives optimal
+   * performance. This is the default for all scans but the root scan. This
+   * method is only implemented for non-root scan operations.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setAdaptiveParallelism();
+
   /** Set the batch size (max rows per batch) for this operation. This
    * only applies to scan operations, as lookup operations always will
    * have the same batch size as its parent operation, or 1 if it is the
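
For illustration, a possible (not authoritative) way an application could use the new setters; only setParallelism(), setMaxParallelism() and setAdaptiveParallelism() come from this patch, while the helper function, the way the operation pointers are obtained, the include path and the error handling are assumptions.

  #include "NdbQueryOperation.hpp"   // assumed include setup

  // Hypothetical helper: choose a parallelism mode for an already-defined
  // scan query. 'rootScan' / 'childScan' are assumed to point at the root
  // and a non-root scan operation of the same query.
  int configureParallelism(NdbQueryOperation* rootScan,
                           NdbQueryOperation* childScan,
                           Uint32 explicitFragments)   // 0 means "scan all"
  {
    // Non-root scans default to adaptive parallelism; setting it explicitly
    // is only valid for non-root scan operations.
    if (childScan->setAdaptiveParallelism() != 0)
      return -1;                     // inspect the query's NdbError for details

    if (explicitFragments == 0)
      return rootScan->setMaxParallelism();   // the default for the root scan

    // Otherwise pin the root scan to a fixed number of fragments
    // (must be >= 1 and <= MAX_NDB_PARTITIONS, and root-only).
    return rootScan->setParallelism(explicitFragments);
  }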

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-08-17 08:10:27 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-08-22 08:35:35 +0000
@@ -271,58 +271,13 @@ private:
     FetchResult_noMoreCache = 2
   };
 
-  /** A stack of NdbRootFragment pointers.
-   *  NdbRootFragments which are 'BatchComplete' are pushed on this stack by 
-   *  the receiver thread, and later pop'ed into the application thread when 
-   *  it need more results to process.
-   *  Due to this shared usage, the PollGuard mutex must be set before 
-   *  accessing SharedFragStack.
+  /**
+   * Container of root fragments that the application is currently
+   * iterating over. 'Owned' by the application thread and can be
+   * accessed without requiring a mutex lock.
+   * RootFragments are appended to an OrderedFragSet by
+   * ::prepareMoreResults().
    */
-  class SharedFragStack{
-  public:
-    // For calculating need for dynamically allocated memory.
-    static const Uint32 pointersPerFragment = 1;
-
-    explicit SharedFragStack();
-
-    /** 
-     * Prepare internal datastructures.
-     * param[in] allocator For allocating arrays of pointers.
-     * param[in] capacity Max no of root fragments.
-     * @return 0 if ok, else errorcode
-     */
-    int prepare(NdbBulkAllocator& allocator, int capacity);
-
-    NdbRootFragment* pop() { 
-      return m_current>=0 ? m_array[m_current--] : NULL;
-    }
-    
-    void push(NdbRootFragment& frag);
-
-    int size() const {
-      return (m_current+1);
-    }
-
-    void clear() {
-      m_current = -1;
-    }
-
-    /** Possible error received from TC / datanodes. */
-    int m_errorCode;
-
-  private:
-    /** Capacity of stack.*/
-    int m_capacity;
-
-    /** Index of current top of stack.*/
-    int m_current;
-    NdbRootFragment** m_array;
-
-    // No copying.
-    SharedFragStack(const SharedFragStack&);
-    SharedFragStack& operator=(const SharedFragStack&);
-  }; // class SharedFragStack
-
   class OrderedFragSet{
   public:
     // For calculating need for dynamically allocated memory.
@@ -339,11 +294,20 @@ private:
      * param[in] capacity Max no of root fragments.
      * @return 0 if ok, else errorcode
      */
-    int prepare(NdbBulkAllocator& allocator,
-                NdbQueryOptions::ScanOrdering ordering, 
-                int capacity,  
-                const NdbRecord* keyRecord,
-                const NdbRecord* resultRecord);
+    void prepare(NdbBulkAllocator& allocator,
+                 NdbQueryOptions::ScanOrdering ordering, 
+                 int capacity,  
+                 const NdbRecord* keyRecord,
+                 const NdbRecord* resultRecord);
+
+    /**
+     * Add root fragments with completed ResultSets to this OrderedFragSet.
+     * The PollGuard mutex must be locked, and under its protection
+     * completed root fragments are 'consumed' from rootFrags[] and
+     * added to the OrderedFragSet where they become available for the
+     * application thread.
+     */
+    void prepareMoreResults(NdbRootFragment rootFrags[], Uint32 cnt);  // Need mutex lock
 
     /** Get the root fragment from which to read the next row.*/
     NdbRootFragment* getCurrent() const;
@@ -355,9 +319,6 @@ private:
      */
     void reorganize();
 
-    /** Add a complete fragment that has been received.*/
-    void add(NdbRootFragment& frag);
-
     /** Reset object to an empty state.*/
     void clear();
 
@@ -372,14 +333,16 @@ private:
 
   private:
 
-    /** Max no of fragments.*/
+    /** No of fragments to read until '::finalBatchReceived()'.*/
     int m_capacity;
     /** Number of fragments in 'm_activeFrags'.*/
     int m_activeFragCount;
-    /** Number of fragments in 'm_emptiedFrags'. */
-    int m_emptiedFragCount;
-    /** Number of fragments where the final batch has been received
-     * and consumed.*/
+    /** Number of fragments in 'm_fetchMoreFrags'. */
+    int m_fetchMoreFragCount;
+    /**
+     * Number of fragments where the final batch has been received
+     * and consumed.
+     */
     int m_finalFragCount;
     /** Ordering of index scan result.*/
     NdbQueryOptions::ScanOrdering m_ordering;
@@ -387,15 +350,21 @@ private:
     const NdbRecord* m_keyRecord;
     /** Needed for comparing records when ordering results.*/
     const NdbRecord* m_resultRecord;
-    /** Fragments where some tuples in the current batch has not yet been 
-     * consumed.*/
+    /**
+     * Fragments where some tuples in the current ResultSet have not
+     * yet been consumed.
+     */
     NdbRootFragment** m_activeFrags;
-    /** Fragments where all tuples in the current batch have been consumed, 
-     * but where there are more batches to fetch.*/
-    NdbRootFragment** m_emptiedFrags;
-    // No copying.
-    OrderedFragSet(const OrderedFragSet&);
-    OrderedFragSet& operator=(const OrderedFragSet&);
+    /**
+     * Fragments from which we should request more ResultSets.
+     * Either because the current batch has been consumed, or because double
+     * buffering of result sets allows us to request another batch before the
+     * current one has been consumed.
+     */
+    NdbRootFragment** m_fetchMoreFrags;
+
+    /** Add a complete fragment that has been received.*/
+    void add(NdbRootFragment& frag);
 
     /** For sorting fragment reads according to index value of first record. 
      * Also f1<f2 if f2 has reached end of data and f1 has not.
@@ -405,6 +374,10 @@ private:
 
     /** For debugging purposes.*/
     bool verifySortOrder() const;
+
+    // No copying.
+    OrderedFragSet(const OrderedFragSet&);
+    OrderedFragSet& operator=(const OrderedFragSet&);
   }; // class OrderedFragSet
 
   /** The interface that is visible to the application developer.*/
@@ -433,6 +406,14 @@ private:
 
   /** Possible error status of this query.*/
   NdbError m_error;
+
+  /**
+   * Possible error received from TC / datanodes.
+   * Only access w/ PollGuard mutex as it is set by receiver thread.
+   * Checked and moved into 'm_error' with ::hasReceivedError().
+   */
+  int m_errorReceived;   // BEWARE: protect with PollGuard mutex
+
   /** Transaction in which this query instance executes.*/
   NdbTransaction& m_transaction;
 
@@ -452,7 +433,7 @@ private:
   Uint32 m_globalCursor;
 
   /** Number of root fragments not yet completed within the current batch.
-   *  Only access w/ PollGuard mutex as it is also updated by receiver threa 
+   *  Only access w/ PollGuard mutex as it is also updated by receiver thread 
    */
   Uint32 m_pendingFrags;  // BEWARE: protect with PollGuard mutex
 
@@ -473,11 +454,6 @@ private:
    */
   OrderedFragSet m_applFrags;
 
-  /** Root frgaments that have received a complete batch. Shared between 
-   *  application thread and receiving thread. Access should be mutex protected.
-   */
-  SharedFragStack m_fullFrags;  // BEWARE: protect with PollGuard mutex
-
   /** Number of root fragments for which confirmation for the final batch 
    * (with tcPtrI=RNIL) has been received. Observe that even if 
    * m_finalBatchFrags==m_rootFragCount, all tuples for the final batches may
@@ -557,7 +533,8 @@ private:
                     bool forceSend);
 
  /** Wait for more scan results which have already been REQuested to arrive.
-   * @return 0 if some rows did arrive, a negative value if there are errors (in m_error.code),
+   * @return 0 if some rows did arrive, a negative value if there are errors
+   * (in m_error.code),
    * and 1 if there are no more rows to receive.
    */
   FetchResult awaitMoreResults(bool forceSend);
@@ -684,14 +661,30 @@ public:
   NdbQueryOptions::ScanOrdering getOrdering() const
   { return m_ordering; }
 
-   /** 
-   * Set the number of fragments to be scanned in parallel for the root 
-   * operation of this query. This only applies to table scans and non-sorted
-   * scans of ordered indexes.
+  /**
+   * Set the number of fragments to be scanned in parallel. This only applies
+   * to table scans and non-sorted scans of ordered indexes. This method is
+   * only implemented for the root scan operation.
    * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
    */
   int setParallelism(Uint32 parallelism);
 
+  /**
+   * Set the number of fragments to be scanned in parallel to the maximum
+   * possible value. This is the default for the root scan operation.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setMaxParallelism();
+
+  /**
+   * Let the system dynamically choose the number of fragments to scan in
+   * parallel. The system will try to choose a value that gives optimal
+   * performance. This is the default for all scans but the root scan. This
+   * method is only implemented for non-root scan operations.
+   * @return 0 if ok, -1 in case of error (call getNdbError() for details.)
+   */
+  int setAdaptiveParallelism();
+
   /** Set the batch size (max rows per batch) for this operation. This
    * only applies to scan operations, as lookup operations always will
    * have the same batch size as its parent operation, or 1 if it is the
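
This header also replaces SharedFragStack::m_errorCode with NdbQueryImpl::m_errorReceived. A simplified, self-contained sketch of that propagation pattern follows; the types are illustrative stand-ins only, and the per-method locking is a simplification (the real code relies on the PollGuard already being held by the caller).

  #include <mutex>

  struct DemoQuery                     // hypothetical stand-in for NdbQueryImpl
  {
    std::mutex pollGuard;              // stands in for the PollGuard mutex
    int errorReceived = 0;             // like m_errorReceived (receiver side)
    int errorCode     = 0;             // like m_error.code (application side)

    // Receiver thread: record an error reported by TC / datanodes.
    void setFetchTerminated(int err)
    {
      std::lock_guard<std::mutex> lk(pollGuard);
      if (err != 0)
        errorReceived = err;
    }

    // Application thread: move a pending receiver error into the
    // user-visible error, as ::hasReceivedError() does in the patch.
    bool hasReceivedError()
    {
      std::lock_guard<std::mutex> lk(pollGuard);
      if (errorReceived != 0)
      {
        errorCode = errorReceived;
        return true;
      }
      return false;
    }
  };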

=== modified file 'storage/ndb/src/ndbapi/NdbReceiver.cpp'
--- a/storage/ndb/src/ndbapi/NdbReceiver.cpp	2011-07-05 12:46:07 +0000
+++ b/storage/ndb/src/ndbapi/NdbReceiver.cpp	2011-08-25 06:35:39 +0000
@@ -56,7 +56,7 @@ NdbReceiver::init(ReceiverType type, boo
   if (useRec)
   {
     m_record.m_ndb_record= NULL;
-    m_record.m_row= NULL;
+    m_record.m_row_recv= NULL;
     m_record.m_row_buffer= NULL;
     m_record.m_row_offset= 0;
     m_record.m_read_range_no= false;
@@ -118,11 +118,38 @@ NdbReceiver::getValues(const NdbRecord*
   assert(m_using_ndb_record);
 
   m_record.m_ndb_record= rec;
-  m_record.m_row= row_ptr;
+  m_record.m_row_recv= row_ptr;
   m_record.m_row_offset= rec->m_row_size;
 }
 
-#define KEY_ATTR_ID (~(Uint32)0)
+void
+NdbReceiver::prepareReceive(char *buf)
+{
+  /* Set pointers etc. to prepare for receiving the first row of the batch. */
+  assert(theMagicNumber == 0x11223344);
+  m_received_result_length = 0;
+  m_expected_result_length = 0;
+  if (m_using_ndb_record)
+  {
+    m_record.m_row_recv= buf;
+  }
+  theCurrentRecAttr = theFirstRecAttr;
+}
+
+void
+NdbReceiver::prepareRead(char *buf, Uint32 rows)
+{
+  /* Set pointers etc. to prepare for reading the first row of the batch. */
+  assert(theMagicNumber == 0x11223344);
+  m_current_row = 0;
+  m_result_rows = rows;
+  if (m_using_ndb_record)
+  {
+    m_record.m_row_buffer = buf;
+  }
+}
+
+ #define KEY_ATTR_ID (~(Uint32)0)
 
 /*
   Compute the batch size (rows between each NEXT_TABREQ / SCAN_TABCONF) to
@@ -219,7 +246,7 @@ NdbReceiver::do_setup_ndbrecord(const Nd
 {
   m_using_ndb_record= true;
   m_record.m_ndb_record= ndb_record;
-  m_record.m_row= row_buffer;
+  m_record.m_row_recv= row_buffer;
   m_record.m_row_buffer= row_buffer;
   m_record.m_row_offset= rowsize;
   m_record.m_read_range_no= read_range_no;
@@ -524,7 +551,7 @@ NdbReceiver::receive_packed_ndbrecord(Ui
       {
 	if (BitmaskImpl::get(bmlen, aDataPtr, ++i))
 	{
-          setRecToNULL(col, m_record.m_row);
+          setRecToNULL(col, m_record.m_row_recv);
 
           // Next column...
 	  continue;
@@ -668,7 +695,8 @@ NdbReceiver::execTRANSID_AI(const Uint32
         assert(m_record.m_read_range_no);
         assert(attrSize==4);
         assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size+attrSize);
-        memcpy(m_record.m_row+m_record.m_ndb_record->m_row_size, aDataPtr++, 4);
+        memcpy(m_record.m_row_recv+m_record.m_ndb_record->m_row_size, 
+               aDataPtr++, 4);
         aLength--;
         continue; // Next
       }
@@ -682,7 +710,7 @@ NdbReceiver::execTRANSID_AI(const Uint32
         assert (m_record.m_row_offset >= m_record.m_ndb_record->m_row_size);
         Uint32 len= receive_packed_ndbrecord(attrSize >> 2, // Bitmap length
                                              aDataPtr,
-                                             m_record.m_row);
+                                             m_record.m_row_recv);
         aDataPtr+= len;
         aLength-= len;
         continue;  // Next
@@ -709,13 +737,13 @@ NdbReceiver::execTRANSID_AI(const Uint32
         
         /* Save this extra getValue */
         save_pos+= sizeof(Uint32);
-        memcpy(m_record.m_row + m_record.m_row_offset - save_pos,
+        memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
                &attrSize, sizeof(Uint32));
         if (attrSize > 0)
         {
           save_pos+= attrSize;
           assert (save_pos<=m_record.m_row_offset);
-          memcpy(m_record.m_row + m_record.m_row_offset - save_pos,
+          memcpy(m_record.m_row_recv + m_record.m_row_offset - save_pos,
                  aDataPtr, attrSize);
         }
 
@@ -803,7 +831,7 @@ NdbReceiver::execTRANSID_AI(const Uint32
 
   if (m_using_ndb_record) {
     /* Move onto next row in scan buffer */
-    m_record.m_row+= m_record.m_row_offset;
+    m_record.m_row_recv+= m_record.m_row_offset;
   }
   return (tmp == exp || (exp > TcKeyConf::DirtyReadBit) ? 1 : 0);
 }
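
The new prepareReceive()/prepareRead() pair separates "buffer currently being filled by the receiver thread" from "buffer currently being read by the application thread". A hedged sketch of how they might be driven with two row buffers is shown below; the swap helper, buffer ownership and row-count bookkeeping are assumptions, and only the two member functions come from this patch (they are internal NDB API entry points, not application-level API).

  #include "NdbReceiver.hpp"   // assumed include setup

  // Assumed double-buffering driver: after a batch completes in 'recvBuf',
  // make it readable and hand the other buffer back for the next batch.
  void swapBatchBuffers(NdbReceiver& receiver,
                        char*& recvBuf, char*& readBuf,
                        Uint32 rowsInCompletedBatch)
  {
    receiver.prepareRead(recvBuf, rowsInCompletedBatch);  // read side
    receiver.prepareReceive(readBuf);                     // receive side

    char* tmp = recvBuf;                                  // swap roles
    recvBuf   = readBuf;
    readBuf   = tmp;
  }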

No bundle (reason: useless for push emails).