List:Commits« Previous MessageNext Message »
From:Pekka Nousiainen Date:August 16 2011 6:27pm
Subject:bzr push into mysql-5.1-telco-7.0-wl4124-new2 branch (pekka.nousiainen:4425
to 4426)
View as plain text  
 4426 Pekka Nousiainen	2011-08-16 [merge]
      merge telco-7.0 to wl4124-new2

    modified:
      mysql-test/r/group_by.result
      mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.cnf
      mysql-test/t/group_by.test
      sql/sql_select.cc
      storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp
      storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp
      storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
 4425 Pekka Nousiainen	2011-08-16
      wl#4124 x20_fix.diff
      no dtor is called due to useless pthread_exit(0)

    modified:
      sql/ha_ndb_index_stat.cc
=== modified file 'mysql-test/r/group_by.result'
--- a/mysql-test/r/group_by.result	2010-10-29 08:23:06 +0000
+++ b/mysql-test/r/group_by.result	2011-08-16 10:20:19 +0000
@@ -1856,3 +1856,18 @@ COUNT(*)
 2
 DROP TABLE t1;
 # End of 5.1 tests
+#
+# Bug#12798270: ASSERTION `!TAB->SORTED' FAILED IN JOIN_READ_KEY2
+#
+CREATE TABLE t1 (i int);
+INSERT INTO t1 VALUES (1);
+CREATE TABLE t2 (pk int PRIMARY KEY);
+INSERT INTO t2 VALUES (10);
+CREATE VIEW v1 AS SELECT t2.pk FROM t2;
+SELECT v1.pk
+FROM t1 LEFT JOIN v1 ON t1.i = v1.pk 
+GROUP BY v1.pk;
+pk
+DROP VIEW v1;
+DROP TABLE t1,t2;
+# End of Bug#12798270

=== modified file 'mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.cnf'
--- a/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.cnf	2011-07-07 14:48:06 +0000
+++ b/mysql-test/suite/ndb_rpl/t/ndb_rpl_conflict_epoch.cnf	2011-07-11 10:40:00 +0000
@@ -25,14 +25,7 @@ skip-slave-start
 [mysqld.2.slave]
 server-id= 4
 log-bin = sec-master-2
-master-host=		127.0.0.1
-master-port=		@mysqld.2.1.port
-master-password=	@mysqld.2.1.#password
-master-user=		@mysqld.2.1.#user
-master-connect-retry=	1
-init-rpl-role=		slave
 skip-slave-start
-ndb_connectstring=	@mysql_cluster.slave.ndb_connectstring
 
 [ENV]
 

=== modified file 'mysql-test/t/group_by.test'
--- a/mysql-test/t/group_by.test	2010-10-29 08:23:06 +0000
+++ b/mysql-test/t/group_by.test	2011-08-16 10:20:19 +0000
@@ -1248,3 +1248,24 @@ ON 1 WHERE t2.f1 > 1 GROUP BY t2.f1;
 DROP TABLE t1;
 
 --echo # End of 5.1 tests
+
+--echo #
+--echo # Bug#12798270: ASSERTION `!TAB->SORTED' FAILED IN JOIN_READ_KEY2
+--echo #
+
+CREATE TABLE t1 (i int);
+INSERT INTO t1 VALUES (1);
+
+CREATE TABLE t2 (pk int PRIMARY KEY);
+INSERT INTO t2 VALUES (10);
+
+CREATE VIEW v1 AS SELECT t2.pk FROM t2;
+
+SELECT v1.pk
+FROM t1 LEFT JOIN v1 ON t1.i = v1.pk 
+GROUP BY v1.pk;
+
+DROP VIEW v1;
+DROP TABLE t1,t2;
+
+--echo # End of Bug#12798270

=== modified file 'sql/sql_select.cc'
--- a/sql/sql_select.cc	2011-06-30 15:59:25 +0000
+++ b/sql/sql_select.cc	2011-08-16 10:20:19 +0000
@@ -6876,7 +6876,15 @@ make_join_readinfo(JOIN *join, ulonglong
          (join->sort_by_table == (TABLE *) 1 && i != join->const_tables)))
       ordered_set= 1;
 
+#ifdef MCP_BUG12798270
     tab->sorted= sorted;
+#else
+    /*
+      For eq_ref there is at most one join match for each row from
+      previous tables so ordering is not useful.
+    */
+    tab->sorted= (tab->type != JT_EQ_REF) ? sorted : false;
+#endif
     sorted= 0;                                  // only first must be sorted
     table->status=STATUS_NO_RECORD;
     pick_table_access_method (tab);

=== modified file 'storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp'
--- a/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/blocks/dbtux/DbtuxNode.cpp	2011-08-16 08:47:35 +0000
@@ -155,19 +155,26 @@ Dbtux::setNodePref(TuxCtx & ctx, NodeHan
 {
   const Frag& frag = node.m_frag;
   const Index& index = *c_indexPool.getPtr(frag.m_indexId);
-  KeyData prefKey(index.m_keySpec, false, 0);
-  prefKey.set_buf(node.getPref(), index.m_prefBytes);
+  /*
+   * bug#12873640
+   * Node prefix exists if it has non-zero number of attributes.  It is
+   * then a partial instance of KeyData.  If the prefix does not exist
+   * then set_buf() could overwrite m_pageId1 in first entry, causing
+   * random crash in TUP via readKeyAttrs().
+   */
   if (index.m_prefAttrs > 0) {
+    KeyData prefKey(index.m_keySpec, false, 0);
+    prefKey.set_buf(node.getPref(), index.m_prefBytes);
     jam();
     readKeyAttrs(ctx, frag, node.getEnt(0), prefKey, index.m_prefAttrs);
-  }
 #ifdef VM_TRACE
-  if (debugFlags & DebugMaint) {
-    debugOut << "setNodePref: " << node;
-    debugOut << " " << prefKey.print(ctx.c_debugBuffer, DebugBufferBytes);
-    debugOut << endl;
-  }
+    if (debugFlags & DebugMaint) {
+      debugOut << "setNodePref: " << node;
+      debugOut << " " << prefKey.print(ctx.c_debugBuffer, DebugBufferBytes);
+      debugOut << endl;
+    }
 #endif
+  }
 }
 
 // node operations

=== modified file 'storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp'
--- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp	2011-02-01 23:27:25 +0000
+++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.cpp	2011-08-16 08:27:14 +0000
@@ -39,6 +39,16 @@ static const char * f_method = "MSms";
 #endif
 #define MAX_CHUNKS 10
 
+#ifdef VM_TRACE
+#ifndef NDBD_RANDOM_START_PAGE
+#define NDBD_RANDOM_START_PAGE
+#endif
+#endif
+
+#ifdef NDBD_RANDOM_START_PAGE
+static Uint32 g_random_start_page_id = 0;
+#endif
+
 /*
  * For muti-threaded ndbd, these calls are used for locking around
  * memory allocation operations.
@@ -224,6 +234,16 @@ Ndbd_mem_manager::Ndbd_mem_manager()
   mt_mem_manager_init();
 }
 
+void*
+Ndbd_mem_manager::get_memroot() const
+{
+#ifdef NDBD_RANDOM_START_PAGE
+  return (void*)(m_base_page - g_random_start_page_id);
+#else
+  return (void*)m_base_page;
+#endif
+}
+
 /**
  *
  * resource 0 has following semantics:
@@ -359,6 +379,29 @@ Ndbd_mem_manager::init(Uint32 *watchCoun
   }
 #endif
 
+#ifdef NDBD_RANDOM_START_PAGE
+  /**
+   * In order to find bad-users of page-id's
+   *   we add a random offset to the page-id's returned
+   *   however, due to ZONE_LO that offset can't be that big
+   *   (since we at get_page don't know if it's a HI/LO page)
+   */
+  Uint32 max_rand_start = ZONE_LO_BOUND - 1;
+  if (max_rand_start > pages)
+  {
+    max_rand_start -= pages;
+    if (max_rand_start > 0x10000)
+      g_random_start_page_id = 0x10000 + (rand() % (max_rand_start - 0x10000));
+    else if (max_rand_start)
+      g_random_start_page_id = rand() % max_rand_start;
+
+    assert(Uint64(pages) + Uint64(g_random_start_page_id) <= 0xFFFFFFFF);
+
+    ndbout_c("using g_random_start_page_id: %u (%.8x)",
+             g_random_start_page_id, g_random_start_page_id);
+  }
+#endif
+
   /**
    * Do malloc
    */
@@ -670,7 +713,7 @@ Ndbd_mem_manager::alloc(AllocZone zone,
       return;
     * pages = save;
   }
-  
+
   alloc_impl(ZONE_LO, ret, pages, min);
 }
 
@@ -870,7 +913,12 @@ Ndbd_mem_manager::alloc_page(Uint32 type
 
       check_resource_limits(m_resource_limit);
       mt_mem_manager_unlock();
+#ifdef NDBD_RANDOM_START_PAGE
+      *i += g_random_start_page_id;
+      return m_base_page + *i - g_random_start_page_id;
+#else
       return m_base_page + *i;
+#endif
     }
   }
   mt_mem_manager_unlock();
@@ -885,7 +933,11 @@ Ndbd_mem_manager::release_page(Uint32 ty
   mt_mem_manager_lock();
   Resource_limit tot = m_resource_limit[0];
   Resource_limit rl = m_resource_limit[idx];
-  
+
+#ifdef NDBD_RANDOM_START_PAGE
+  i -= g_random_start_page_id;
+#endif
+
   Uint32 sub = (rl.m_curr <= rl.m_min) ? 1 : 0; // Over min ?
   release(i, 1);
   m_resource_limit[0].m_curr = tot.m_curr - 1;
@@ -954,10 +1006,16 @@ Ndbd_mem_manager::alloc_pages(Uint32 typ
     m_resource_limit[idx].m_curr = rl.m_curr + req;
     check_resource_limits(m_resource_limit);
     mt_mem_manager_unlock();
+#ifdef NDBD_RANDOM_START_PAGE
+    *i += g_random_start_page_id;
+#endif
     return ;
   }
   mt_mem_manager_unlock();
   * cnt = req;
+#ifdef NDBD_RANDOM_START_PAGE
+  *i += g_random_start_page_id;
+#endif
   return;
 }
 
@@ -969,7 +1027,11 @@ Ndbd_mem_manager::release_pages(Uint32 t
   mt_mem_manager_lock();
   Resource_limit tot = m_resource_limit[0];
   Resource_limit rl = m_resource_limit[idx];
-  
+
+#ifdef NDBD_RANDOM_START_PAGE
+  i -= g_random_start_page_id;
+#endif
+
   release(i, cnt);
 
   Uint32 currnew = rl.m_curr - cnt;

=== modified file 'storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp'
--- a/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp	2011-06-30 15:59:25 +0000
+++ b/storage/ndb/src/kernel/vm/ndbd_malloc_impl.hpp	2011-08-16 08:27:14 +0000
@@ -68,7 +68,7 @@ public:
 
   bool init(Uint32 *watchCounter, bool allow_alloc_less_than_requested = true);
   void map(Uint32 * watchCounter, bool memlock = false, Uint32 resources[] = 0);
-  void* get_memroot() const { return (void*)m_base_page;}
+  void* get_memroot() const;
   
   void dump() const ;
   

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-07-01 10:02:15 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-16 13:37:24 +0000
@@ -87,6 +87,32 @@ const bool traceSignals = false;
  * children. That way, results for child operations can be updated correctly
  * when the application iterates over the results of the root scan operation.
  */
+class TupleCorrelation
+{
+public:
+  static const Uint32 wordCount = 1;
+
+  explicit TupleCorrelation()
+  : m_correlation((tupleNotFound<<16) | tupleNotFound)
+  {}
+
+  /** Conversion to/from Uint32 to store/fetch from buffers */
+  explicit TupleCorrelation(Uint32 val)
+  : m_correlation(val)
+  {}
+  Uint32 toUint32() const 
+  { return m_correlation; }
+
+  Uint16 getTupleId() const
+  { return m_correlation & 0xffff;}
+
+  Uint16 getParentTupleId() const
+  { return m_correlation >> 16;}
+
+private:
+  Uint32 m_correlation;
+}; // class TupleCorrelation
+
 class CorrelationData
 {
 public:
@@ -99,18 +125,15 @@ public:
     assert(AttributeHeader(m_corrPart[0]).getAttributeId() 
            == AttributeHeader::CORR_FACTOR64);
     assert(AttributeHeader(m_corrPart[0]).getByteSize() == 2*sizeof(Uint32));
-    assert(getTupleId()<tupleNotFound);
-    assert(getParentTupleId()<tupleNotFound);
+    assert(getTupleCorrelation().getTupleId()<tupleNotFound);
+    assert(getTupleCorrelation().getParentTupleId()<tupleNotFound);
   }
 
   Uint32 getRootReceiverId() const
   { return m_corrPart[2];}
 
-  Uint16 getTupleId() const
-  { return m_corrPart[1] & 0xffff;}
-
-  Uint16 getParentTupleId() const
-  { return m_corrPart[1] >> 16;}
+  const TupleCorrelation getTupleCorrelation() const
+  { return TupleCorrelation(m_corrPart[1]); }
 
 private:
   const Uint32* const m_corrPart;
@@ -148,6 +171,8 @@ public:
 
   explicit NdbRootFragment();
 
+  ~NdbRootFragment();
+
   /**
    * Initialize object.
    * @param query Enclosing query.
@@ -164,6 +189,11 @@ public:
    */
   void reset();
 
+  /**
+   * Prepare for reading another batch of results.
+   */
+  void prepareResultSet();
+
   void incrOutstandingResults(Int32 delta)
   {
     m_outstandingResults += delta;
@@ -198,9 +228,14 @@ public:
    * @param operationNo The id of the operation.
    * @return The result stream for this root fragment.
    */
-  NdbResultStream& getResultStream(Uint32 operationNo) const
-  { return m_query->getQueryOperation(operationNo).getResultStream(m_fragNo); }
-  
+  NdbResultStream& getResultStream(Uint32 operationNo) const;
+
+  NdbResultStream& getResultStream(const NdbQueryOperationImpl& op) const
+  { return getResultStream(op.getQueryOperationDef().getQueryOperationIx()); }
+
+  Uint32 getReceiverId() const;
+  Uint32 getReceiverTcPtrI() const;
+
   /**
    * @return True if there are no more batches to be received for this fragment.
    */
@@ -212,6 +247,19 @@ public:
    */
   bool isEmpty() const;
 
+  /** 
+   * This method is used for marking which streams belonging to this
+   * NdbRootFragment which has remaining batches for a sub scan
+   * instantiated from the current batch of its parent operation.
+   */
+  void setRemainingSubScans(Uint32 nodeMask)
+  { 
+    m_remainingScans = nodeMask;
+  }
+
+  /** Release resources after last row has been returned */
+  void postFetchRelease();
+
 private:
   STATIC_CONST( voidFragNo = 0xffffffff);
 
@@ -221,6 +269,9 @@ private:
   /** Number of the root operation fragment.*/
   Uint32 m_fragNo;
 
+  /** For processing results originating from this root fragment (Array of).*/
+  NdbResultStream* m_resultStreams;
+
   /**
    * The number of outstanding TCKEYREF or TRANSID_AI 
    * messages for the fragment. This includes both messages related to the
@@ -239,6 +290,12 @@ private:
    * TCKEYCONF message has been received */
   bool m_confReceived;
 
+  /**
+   * A bitmask of operation id's for which we will receive more
+   * ResultSets in a NEXTREQ.
+   */
+  Uint32 m_remainingScans;
+
   /** 
    * Used for implementing a hash map from root receiver ids to a 
    * NdbRootFragment instance. m_idMapHead is the index of the first
@@ -272,26 +329,19 @@ public:
    * @param operation The operation for which we will receive results.
    * @param rootFragNo 0..n-1 when the root operation reads from n fragments.
    */
-  explicit NdbResultStream(NdbQueryOperationImpl& operation, Uint32 rootFragNo);
+  explicit NdbResultStream(NdbQueryOperationImpl& operation,
+                           NdbRootFragment& rootFrag);
 
   ~NdbResultStream();
 
   /** 
    * Prepare for receiving first results. 
-   * @return possible error code. 
    */
-  int prepare();
+  void prepare();
 
   /** Prepare for receiving next batch of scan results. */
   void reset();
     
-  /**
-   * 0..n-1 if the root operation reads from n fragments. This stream holds data
-   * derived from one of those fragments.
-   */
-  Uint32 getRootFragNo() const
-  { return m_rootFragNo; }
-
   NdbReceiver& getReceiver()
   { return m_receiver; }
 
@@ -301,6 +351,9 @@ public:
   Uint32 getRowCount() const
   { return m_rowCount; }
 
+  char* getRow(Uint32 tupleNo) const
+  { return (m_buffer + (tupleNo*m_rowSize)); }
+
   /**
    * Process an incomming tuple for this stream. Extract parent and own tuple 
    * ids and pass it on to m_receiver.
@@ -308,12 +361,15 @@ public:
    * @param ptr buffer holding tuple.
    * @param len buffer length.
    */
-  void execTRANSID_AI(const Uint32 *ptr, Uint32 len);
+  void execTRANSID_AI(const Uint32 *ptr, Uint32 len,
+                      TupleCorrelation correlation);
 
-  /** A complete batch has been received for a fragment on this NdbResultStream,
-   *  Update whatever required before the appl. are allowed to navigate the result.
+  /**
+   * A complete batch has been received for a fragment on this NdbResultStream,
+   * Update whatever required before the appl. are allowed to navigate the result.
+   * @return true if node and all its siblings have returned all rows.
    */ 
-  void handleBatchComplete();
+  bool prepareResultSet(Uint32 remainingScans);
 
   /**
    * Navigate within the current result batch to resp. first and next row.
@@ -333,36 +389,33 @@ public:
   { return m_iterState == Iter_finished; }
 
   /** 
-   * This method is
-   * used for marking a stream as holding the last batch of a sub scan. 
-   * This means that it is the last batch of the scan that was instantiated 
-   * from the current batch of its parent operation.
-   */
-  void setSubScanCompletion(bool complete)
-  { 
-    // Lookups should always be 'complete'
-    assert(complete || m_operation.getQueryOperationDef().isScanOperation());
-    m_subScanComplete = complete; 
-  }
-
-  /** 
    * This method 
-   * returns true if this result stream holds the last batch of a sub scan
+   * returns true if this result stream holds the last batch of a sub scan.
    * This means that it is the last batch of the scan that was instantiated 
    * from the current batch of its parent operation.
    */
-  bool isSubScanComplete() const
+  bool isSubScanComplete(Uint32 remainingScans) const
   { 
-    // Lookups should always be 'complete'
-    assert(m_subScanComplete || m_operation.getQueryOperationDef().isScanOperation());
-    return m_subScanComplete; 
+    /**
+     * Find the node number seen by the SPJ block. Since a unique index
+     * operation will have two distincts nodes in the tree used by the
+     * SPJ block, this number may be different from 'opNo'.
+     */
+    const Uint32 internalOpNo = m_operation.getQueryOperationDef().getQueryOperationId();
+
+    const bool complete = !((remainingScans >> internalOpNo) & 1);
+    assert(complete || isScanResult());    // Lookups should always be 'complete'
+    return complete; 
   }
 
-  /** Variant of isSubScanComplete() above which checks that this resultstream
-   * and all its descendants have consumed all batches of rows instantiated 
-   * from their parent operation(s).
-   */
-  bool isAllSubScansComplete() const;
+  bool isScanQuery() const
+  { return (m_properties & Is_Scan_Query); }
+
+  bool isScanResult() const
+  { return (m_properties & Is_Scan_Result); }
+
+  bool isInnerJoin() const
+  { return (m_properties & Is_Inner_Join); }
 
   /** For debugging.*/
   friend NdbOut& operator<<(NdbOut& out, const NdbResultStream&);
@@ -405,7 +458,7 @@ public:
      * that had no matching children.*/
     Bitmask<(NDB_SPJ_MAX_TREE_NODES+31)/32> m_hasMatchingChild;
 
-    explicit TupleSet()
+    explicit TupleSet() : m_hash_head(tupleNotFound)
     {}
 
   private:
@@ -415,22 +468,39 @@ public:
   };
 
 private:
-  /** This stream handles results derived from the m_rootFragNo'th 
-   * fragment of the root operation.*/
-  const Uint32 m_rootFragNo;
+  /**
+   * This stream handles results derived from specified 
+   * m_rootFrag of the root operation.
+   */
+  const NdbRootFragment& m_rootFrag;
+ 
+  /** Operation to which this resultStream belong.*/
+  NdbQueryOperationImpl& m_operation;
+
+  /** ResultStream for my parent operation, or NULL if I am root */
+  NdbResultStream* const m_parent;
+
+  const enum properties
+  {
+    Is_Scan_Query = 0x01,
+    Is_Scan_Result = 0x02,
+    Is_Inner_Join = 0x10
+  } m_properties;
 
   /** The receiver object that unpacks transid_AI messages.*/
   NdbReceiver m_receiver;
 
-  /** Max #rows which this stream may recieve in its buffer structures */
-  Uint32 m_maxRows;
+  /** The buffers which we receive the results into */
+  char* m_buffer;
+
+  /** Used for checking if buffer overrun occurred. */
+  Uint32* m_batchOverflowCheck;
+
+  Uint32 m_rowSize;
 
   /** The number of transid_AI messages received.*/
   Uint32 m_rowCount;
 
-  /** Operation to which this resultStream belong.*/
-  NdbQueryOperationImpl& m_operation;
-
   /** This is the state of the iterator used by firstResult(), nextResult().*/
   enum
   {
@@ -442,24 +512,19 @@ private:
     Iter_finished
   } m_iterState;
 
-  /** Tuple id of the current tuple, or 'tupleNotFound' if Iter_notStarted or Iter_finished. */
+  /**
+   * Tuple id of the current tuple, or 'tupleNotFound'
+   * if Iter_notStarted or Iter_finished. 
+   */
   Uint16 m_currentRow;
   
-  /** 
-   * This field is only used for result streams of scan operations. If set,
-   * it indicates that the stream is holding the last batch of a sub scan. 
-   * This means that it is the last batch of the scan that was instantiated 
-   * from the current batch of its parent operation.
-   */
-  bool m_subScanComplete;
+  /** Max #rows which this stream may recieve in its TupleSet structures */
+  Uint32 m_maxRows;
 
+  /** TupleSet contains the correlation between parent/childs */
   TupleSet* m_tupleSet;
 
-  void clearTupleSet();
-
-  void setParentChildMap(Uint16 parentId,
-                         Uint16 tupleId, 
-                         Uint16 tupleNo);
+  void buildResultCorrelations();
 
   Uint16 getTupleId(Uint16 tupleNo) const
   { return (m_tupleSet) ? m_tupleSet[tupleNo].m_tupleId : 0; }
@@ -525,15 +590,30 @@ void* NdbBulkAllocator::allocObjMem(Uint
 /////////  NdbResultStream methods ///////////
 //////////////////////////////////////////////
 
-NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation, Uint32 rootFragNo):
-  m_rootFragNo(rootFragNo),
+NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation,
+                                 NdbRootFragment& rootFrag)
+:
+  m_rootFrag(rootFrag),
+  m_operation(operation),
+  m_parent(operation.getParentOperation()
+        ? &rootFrag.getResultStream(*operation.getParentOperation())
+        : NULL),
+  m_properties(
+    (enum properties)
+     ((operation.getQueryDef().isScanQuery()
+       ? Is_Scan_Query : 0)
+     | (operation.getQueryOperationDef().isScanOperation()
+       ? Is_Scan_Result : 0)
+     | (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll
+       ? Is_Inner_Join : 0))),
   m_receiver(operation.getQuery().getNdbTransaction().getNdb()),
-  m_maxRows(0),
+  m_buffer(NULL),
+  m_batchOverflowCheck(NULL),
+  m_rowSize(0),
   m_rowCount(0),
-  m_operation(operation),
   m_iterState(Iter_notStarted),
   m_currentRow(tupleNotFound),
-  m_subScanComplete(true),
+  m_maxRows(0),
   m_tupleSet(NULL)
 {};
 
@@ -545,41 +625,58 @@ NdbResultStream::~NdbResultStream()
   }
 }
 
-int  // Return 0 if ok, else errorcode
+void
 NdbResultStream::prepare()
 {
+  const Uint32 rowSize = m_operation.getRowSize();
+  NdbQueryImpl &query = m_operation.getQuery();
+
+  m_rowSize = rowSize;
+
   /* Parent / child correlation is only relevant for scan type queries
    * Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups!
    * Neither is these structures required for operations not having respective
    * child or parent operations.
    */
-  if (m_operation.getQueryDef().isScanQuery())
+  if (isScanQuery())
   {
     m_maxRows  = m_operation.getMaxBatchRows();
     m_tupleSet = 
-      new (m_operation.getQuery().getTupleSetAlloc().allocObjMem(m_maxRows)) 
+      new (query.getTupleSetAlloc().allocObjMem(m_maxRows)) 
       TupleSet[m_maxRows];
-
-    clearTupleSet();
   }
   else
     m_maxRows = 1;
 
-  return 0;
-} //NdbResultStream::prepare
+  const int bufferSize = rowSize * m_maxRows;
+  NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
+  m_buffer = reinterpret_cast<char*>(bufferAlloc.allocObjMem(bufferSize));
 
+  // So that we can test for buffer overrun.
+  m_batchOverflowCheck = 
+    reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
+  *m_batchOverflowCheck = 0xacbd1234;
+
+  m_receiver.init(NdbReceiver::NDB_QUERY_OPERATION, false, &m_operation);
+  m_receiver.do_setup_ndbrecord(
+                          m_operation.getNdbRecord(),
+                          m_maxRows, 
+                          0 /*key_size*/, 
+                          0 /*read_range_no*/, 
+                          rowSize,
+                          m_buffer);
+} //NdbResultStream::prepare
 
 void
 NdbResultStream::reset()
 {
-  assert (m_operation.getQueryDef().isScanQuery());
+  assert (isScanQuery());
 
   // Root scan-operation need a ScanTabConf to complete
   m_rowCount = 0;
   m_iterState = Iter_notStarted;
   m_currentRow = tupleNotFound;
 
-  clearTupleSet();
   m_receiver.prepareSend();
   /**
    * If this stream will get new rows in the next batch, then so will
@@ -589,88 +686,14 @@ NdbResultStream::reset()
        childNo++)
   {
     NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
-    child.getResultStream(getRootFragNo()).reset();
+    m_rootFrag.getResultStream(child).reset();
   }
 } //NdbResultStream::reset
 
-void
-NdbResultStream::clearTupleSet()
-{
-  assert (m_operation.getQueryDef().isScanQuery());
-  for (Uint32 i=0; i<m_maxRows; i++)
-  {
-    m_tupleSet[i].m_parentId = tupleNotFound;
-    m_tupleSet[i].m_tupleId  = tupleNotFound;
-    m_tupleSet[i].m_hash_head = tupleNotFound;
-    m_tupleSet[i].m_skip = false;
-    m_tupleSet[i].m_hasMatchingChild.clear();
-  }
-}
-
-bool
-NdbResultStream::isAllSubScansComplete() const
-{ 
-  // Lookups should always be 'complete'
-  assert(m_subScanComplete || m_operation.getQueryOperationDef().isScanOperation());
-
-  if (!m_subScanComplete)
-    return false;
-
-  for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations(); 
-       childNo++)
-  {
-    const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
-    const NdbResultStream& childStream = child.getResultStream(getRootFragNo());
-    if (!childStream.isAllSubScansComplete())
-      return false;
-  }
-  return true;
-} //NdbResultStream::isAllSubScansComplete
-
-
-void
-NdbResultStream::setParentChildMap(Uint16 parentId,
-                                   Uint16 tupleId, 
-                                   Uint16 tupleNo)
-{
-  assert (m_operation.getQueryDef().isScanQuery());
-  assert (tupleNo < m_maxRows);
-  assert (tupleId != tupleNotFound);
-
-  for (Uint32 i = 0; i < tupleNo; i++)
-  {
-    // Check that tuple id is unique.
-    assert (m_tupleSet[i].m_tupleId != tupleId); 
-  }
-  m_tupleSet[tupleNo].m_parentId = parentId;
-  m_tupleSet[tupleNo].m_tupleId  = tupleId;
-
-  const Uint16 hash = (parentId % m_maxRows);
-  if (parentId == tupleNotFound)
-  {
-    /* Root stream: Insert sequentially in hash_next to make it
-     * possible to use ::findTupleWithParentId() and ::findNextTuple()
-     * to navigate even the root operation.
-     */
-    assert (m_operation.getParentOperation()==NULL);
-    /* Link into m_hash_next in order to let ::findNextTuple() navigate correctly */
-    if (tupleNo==0)
-      m_tupleSet[hash].m_hash_head  = 0;
-    else
-      m_tupleSet[tupleNo-1].m_hash_next  = tupleNo;
-    m_tupleSet[tupleNo].m_hash_next  = tupleNotFound;
-  }
-  else
-  {
-    /* Insert parentId in HashMap */
-    m_tupleSet[tupleNo].m_hash_next = m_tupleSet[hash].m_hash_head;
-    m_tupleSet[hash].m_hash_head  = tupleNo;
-  }
-}
-
 /** Locate, and return 'tupleNo', of first tuple with specified parentId.
  *  parentId == tupleNotFound is use as a special value for iterating results
- *  from the root operation in the order which they was inserted by ::setParentChildMap()
+ *  from the root operation in the order which they was inserted by 
+ *  ::buildResultCorrelations()
  *
  *  Position of 'currentRow' is *not* updated and should be modified by callee
  *  if it want to keep the new position.
@@ -678,7 +701,7 @@ NdbResultStream::setParentChildMap(Uint1
 Uint16
 NdbResultStream::findTupleWithParentId(Uint16 parentId) const
 {
-  assert ((parentId==tupleNotFound) == (m_operation.getParentOperation()==NULL));
+  assert ((parentId==tupleNotFound) == (m_parent==NULL));
 
   if (likely(m_rowCount>0))
   {
@@ -736,14 +759,10 @@ NdbResultStream::findNextTuple(Uint16 tu
 Uint16
 NdbResultStream::firstResult()
 {
-  NdbQueryOperationImpl* parent = m_operation.getParentOperation();
-
   Uint16 parentId = tupleNotFound;
-  if (parent!=NULL)
+  if (m_parent!=NULL)
   {
-    const NdbResultStream& parentStream = parent->getResultStream(m_rootFragNo);
-    parentId = parentStream.getCurrentTupleId();
-
+    parentId = m_parent->getCurrentTupleId();
     if (parentId == tupleNotFound)
     {
       m_currentRow = tupleNotFound;
@@ -780,104 +799,195 @@ NdbResultStream::nextResult()
 } //NdbResultStream::nextResult()
 
 
+/**
+ * Callback when a TRANSID_AI signal (receive row) is processed.
+ */
 void
-NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len)
+NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len,
+                                TupleCorrelation correlation)
 {
-  assert(m_iterState == Iter_notStarted);
-  if (m_operation.getQueryDef().isScanQuery())
-  {
-    const CorrelationData correlData(ptr, len);
-
-    assert(m_operation.getRoot().getResultStream(m_rootFragNo)
-           .m_receiver.getId() == correlData.getRootReceiverId());
-
-    m_receiver.execTRANSID_AI(ptr, len - CorrelationData::wordCount);
+  m_receiver.execTRANSID_AI(ptr, len);
+  m_rowCount++;
 
+  if (isScanQuery())
+  {
     /**
-     * Keep correlation data between parent and child tuples.
-     * Since tuples may arrive in any order, we cannot match
-     * parent and child until all tuples (for this batch and 
-     * root fragment) have arrived.
+     * Store TupleCorrelation as hidden value imm. after received row
+     * (NdbQueryOperationImpl::getRowSize() has reserved space for it)
      */
-    setParentChildMap(m_operation.getParentOperation()==NULL
-                      ? tupleNotFound
-                      : correlData.getParentTupleId(),
-                      correlData.getTupleId(),
-                      m_rowCount);
+    Uint32* row_recv = reinterpret_cast<Uint32*>(m_receiver.m_record.m_row);
+    row_recv[-1] = correlation.toUint32();
   }
-  else
-  {
-    // Lookup query.
-    m_receiver.execTRANSID_AI(ptr, len);
-  }
-  m_rowCount++;
-  /* Set correct #rows received in the NdbReceiver.
-   */
-  getReceiver().m_result_rows = getRowCount();
 } // NdbResultStream::execTRANSID_AI()
 
-
 /**
- * A fresh batch of results has arrived for this ResultStream (and all its parent / childs)
- * Filter away any result rows which should not be visible (yet) - Either due to incomplete
- * child batches, or the join being an 'inner join'.
- * Set result itterator state to 'before first' resultrow.
+ * Make preparations for another batch of result to be read:
+ *  - Fill in parent/child result correlations in m_tupleSet[]
+ *  - ... or reset m_tupleSet[] if we reuse the previous.
+ *  - Apply inner/outer join filtering to remove non qualifying 
+ *    rows.
  */
-void 
-NdbResultStream::handleBatchComplete()
+bool 
+NdbResultStream::prepareResultSet(Uint32 remainingScans)
 {
-  for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+  bool isComplete = isSubScanComplete(remainingScans); //Childs with more rows
+  assert(isComplete || isScanResult());                //Lookups always 'complete'
+
+  // Set correct #rows received in the NdbReceiver.
+  getReceiver().m_result_rows = getRowCount();
+
+  /**
+   * Prepare NdbResultStream for reading - either the next received
+   * from datanodes or reuse current.
+   */
+  if (m_tupleSet!=NULL)
   {
-    m_tupleSet[tupleNo].m_skip = false;
+    const bool newResults = (m_iterState!=Iter_finished);
+    if (newResults)
+    {
+      buildResultCorrelations();
+    }
+    else
+    {
+      // Makes all rows in 'TupleSet' available (clear 'm_skip' flag)
+      for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+      {
+        m_tupleSet[tupleNo].m_skip = false;
+      }
+    }
   }
 
+  /**
+   * Recursively iterate all child results depth first. 
+   * Filter away any result rows which should not be visible (yet) - 
+   * Either due to incomplete child batches, or the join being an 'inner join'.
+   * Set result itterator state to 'before first' resultrow.
+   */
   for (Uint32 childNo=0; childNo < m_operation.getNoOfChildOperations(); childNo++)
   {
     const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
-    NdbResultStream& childStream = child.getResultStream(m_rootFragNo);
-    childStream.handleBatchComplete();
+    NdbResultStream& childStream = m_rootFrag.getResultStream(child);
+    const bool allSubScansComplete = childStream.prepareResultSet(remainingScans);
 
-    const bool isInnerJoin = child.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll;
-    const bool allSubScansComplete = childStream.isAllSubScansComplete();
+    Uint32 childId = child.getQueryOperationDef().getQueryOperationIx();
 
-    for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+    /* Condition 1) & 2) calc'ed outside loop, see comments further below: */
+    const bool skipNonMatches = !allSubScansComplete ||      // 1)
+                                childStream.isInnerJoin();   // 2)
+
+    if (m_tupleSet!=NULL)
     {
-      if (!m_tupleSet[tupleNo].m_skip)
+      for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
       {
-        Uint16 tupleId = getTupleId(tupleNo);
-        if (childStream.findTupleWithParentId(tupleId)!=tupleNotFound)
-          m_tupleSet[tupleNo].m_hasMatchingChild.set(childNo);
-
-        /////////////////////////////////
-        //  No child matched for this row. Making parent row visible
-        //  will cause a NULL (outer join) row to be produced.
-        //  Skip NULL row production when:
-        //    1) Some child batches are not complete; they may contain later matches.
-        //    2) A match was found in a previous batch.
-        //    3) Join type is 'inner join', skip as no child are matching.
-        //
-        else if (!allSubScansComplete                                 // 1)
-             ||  m_tupleSet[tupleNo].m_hasMatchingChild.get(childNo)  // 2)
-             ||  isInnerJoin)                                         // 3)
-          m_tupleSet[tupleNo].m_skip = true;
+        if (!m_tupleSet[tupleNo].m_skip)
+        {
+          Uint16 tupleId = getTupleId(tupleNo);
+          if (childStream.findTupleWithParentId(tupleId)!=tupleNotFound)
+            m_tupleSet[tupleNo].m_hasMatchingChild.set(childId);
+
+          /////////////////////////////////
+          //  No child matched for this row. Making parent row visible
+          //  will cause a NULL (outer join) row to be produced.
+          //  Skip NULL row production when:
+          //    1) Some child batches are not complete; they may contain later matches.
+          //    2) Join type is 'inner join', skip as no child are matching.
+          //    3) A match was found in a previous batch.
+          //  Condition 1) & 2) above is precalculated in 'bool skipNonMatches'
+          //
+          else if (skipNonMatches                                       // 1 & 2)
+               ||  m_tupleSet[tupleNo].m_hasMatchingChild.get(childId)) // 3)
+            m_tupleSet[tupleNo].m_skip = true;
+        }
       }
     }
+    isComplete &= allSubScansComplete;
   }
-  m_currentRow = tupleNotFound;
+
+  // Set current position 'before first'
   m_iterState = Iter_notStarted;
-} // NdbResultStream::handleBatchComplete()
+  m_currentRow = tupleNotFound;
+
+  return isComplete; 
+} // NdbResultStream::prepareResultSet()
+
+
+/**
+ * Fill m_tupleSet[] with correlation data between parent 
+ * and child tuples. The 'TupleCorrelation' is stored as
+ * and extra Uint32 after each row received
+ * by execTRANSID_AI().
+ *
+ * NOTE: In order to reduce work done when holding the 
+ * transporter mutex, the 'TupleCorrelation' is only stored
+ * in the buffer when it arrives. Later (here) we build the
+ * correlation hashMap immediately before we prepare to 
+ * read the NdbResultStream.
+ */
+void 
+NdbResultStream::buildResultCorrelations()
+{
+  // Buffer overrun check.
+  assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck==0xacbd1234);
+
+//if (m_tupleSet!=NULL)
+  {
+    /* Clear the hashmap structures */
+    for (Uint32 i=0; i<m_maxRows; i++)
+    {
+      m_tupleSet[i].m_hash_head = tupleNotFound;
+    }
+
+    /* Rebuild correlation & hashmap from received buffers */
+    for (Uint32 tupleNo=0; tupleNo<m_rowCount; tupleNo++)
+    {
+      const Uint32* row = (Uint32*)getRow(tupleNo+1);
+      const TupleCorrelation correlation(row[-1]);
+
+      const Uint16 tupleId  = correlation.getTupleId();
+      const Uint16 parentId = (m_parent!=NULL) 
+                                ? correlation.getParentTupleId()
+                                : tupleNotFound;
+
+      m_tupleSet[tupleNo].m_skip     = false;
+      m_tupleSet[tupleNo].m_parentId = parentId;
+      m_tupleSet[tupleNo].m_tupleId  = tupleId;
+      m_tupleSet[tupleNo].m_hasMatchingChild.clear();
+
+      /* Insert into parentId-hashmap */
+      const Uint16 hash = (parentId % m_maxRows);
+      if (m_parent==NULL)
+      {
+        /* Root stream: Insert sequentially in hash_next to make it
+         * possible to use ::findTupleWithParentId() and ::findNextTuple()
+         * to navigate even the root operation.
+         */
+        /* Link into m_hash_next in order to let ::findNextTuple() navigate correctly */
+        if (tupleNo==0)
+          m_tupleSet[hash].m_hash_head  = tupleNo;
+        else
+          m_tupleSet[tupleNo-1].m_hash_next  = tupleNo;
+        m_tupleSet[tupleNo].m_hash_next  = tupleNotFound;
+      }
+      else
+      {
+        /* Insert parentId in HashMap */
+        m_tupleSet[tupleNo].m_hash_next = m_tupleSet[hash].m_hash_head;
+        m_tupleSet[hash].m_hash_head  = tupleNo;
+      }
+    }
+  }
+} // NdbResultStream::buildResultCorrelations
 
 
 ///////////////////////////////////////////
-/////////  NdbRootFragment methods ///////////
+////////  NdbRootFragment methods /////////
 ///////////////////////////////////////////
 void NdbRootFragment::buildReciverIdMap(NdbRootFragment* frags, 
                                         Uint32 noOfFrags)
 {
   for(Uint32 fragNo = 0; fragNo < noOfFrags; fragNo++)
   {
-    const Uint32 receiverId = 
-      frags[fragNo].getResultStream(0).getReceiver().getId();
+    const Uint32 receiverId = frags[fragNo].getReceiverId();
     /** 
      * For reasons unknow, NdbObjectIdMap shifts ids two bits to the left,
      * so we must do the opposite to get a good hash distribution.
@@ -890,6 +1000,7 @@ void NdbRootFragment::buildReciverIdMap(
   } 
 }
 
+//static
 NdbRootFragment* 
 NdbRootFragment::receiverIdLookup(NdbRootFragment* frags, 
                                   Uint32 noOfFrags, 
@@ -903,9 +1014,7 @@ NdbRootFragment::receiverIdLookup(NdbRoo
   const int hash = (receiverId >> 2) % noOfFrags;
   int current = frags[hash].m_idMapHead;
   assert(current < static_cast<int>(noOfFrags));
-  while (current >= 0 && 
-         frags[current].getResultStream(0).getReceiver().getId() 
-         != receiverId)
+  while (current >= 0 && frags[current].getReceiverId() != receiverId)
   {
     current = frags[current].m_idMapNext;
     assert(current < static_cast<int>(noOfFrags));
@@ -924,18 +1033,65 @@ NdbRootFragment::receiverIdLookup(NdbRoo
 NdbRootFragment::NdbRootFragment():
   m_query(NULL),
   m_fragNo(voidFragNo),
+  m_resultStreams(NULL),
   m_outstandingResults(0),
   m_confReceived(false),
+  m_remainingScans(0),
   m_idMapHead(-1),
   m_idMapNext(-1)
 {
 }
 
+NdbRootFragment::~NdbRootFragment()
+{
+  assert(m_resultStreams==NULL);
+}
+
 void NdbRootFragment::init(NdbQueryImpl& query, Uint32 fragNo)
 {
   assert(m_fragNo==voidFragNo);
   m_query = &query;
   m_fragNo = fragNo;
+
+  m_resultStreams = reinterpret_cast<NdbResultStream*>
+     (query.getResultStreamAlloc().allocObjMem(query.getNoOfOperations()));
+  assert(m_resultStreams!=NULL);
+
+  for (unsigned opNo=0; opNo<query.getNoOfOperations(); opNo++) 
+  {
+    NdbQueryOperationImpl& op = query.getQueryOperation(opNo);
+    new (&m_resultStreams[opNo]) NdbResultStream(op,*this);
+    m_resultStreams[opNo].prepare();
+  }
+}
+
+/**
+ * Release what we want need anymore after last available row has been 
+ * returned from datanodes.
+ */ 
+void
+NdbRootFragment::postFetchRelease()
+{
+  if (m_resultStreams != NULL)
+  { 
+    for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++) 
+    {
+      m_resultStreams[opNo].~NdbResultStream();
+    }
+  }
+  /**
+   * Don't 'delete' the object as it was in-place constructed from
+   * ResultStreamAlloc'ed memory. Memory is released by
+   * ResultStreamAlloc::reset().
+   */
+  m_resultStreams = NULL;
+}
+
+NdbResultStream&
+NdbRootFragment::getResultStream(Uint32 operationNo) const
+{
+  assert(m_resultStreams);
+  return m_resultStreams[operationNo];
 }
 
 void NdbRootFragment::reset()
@@ -943,9 +1099,31 @@ void NdbRootFragment::reset()
   assert(m_fragNo!=voidFragNo);
   assert(m_outstandingResults == 0);
   assert(m_confReceived);
+
+  for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++) 
+  {
+    if (!m_resultStreams[opNo].isSubScanComplete(m_remainingScans))
+    {
+      /**
+       * Reset m_resultStreams[] and all its descendants, since all these
+       * streams will get a new set of rows in the next batch.
+       */ 
+      m_resultStreams[opNo].reset();
+    }
+  }
   m_confReceived = false;
 }
 
+void NdbRootFragment::prepareResultSet()
+{
+  NdbResultStream& rootStream = getResultStream(0);
+  rootStream.prepareResultSet(m_remainingScans);  
+
+  /* Position at the first (sorted?) row available from this fragments.
+   */
+  rootStream.firstResult();
+}
+
 void NdbRootFragment::setConfReceived()
 { 
   /* For a query with a lookup root, there may be more than one TCKEYCONF
@@ -958,14 +1136,31 @@ void NdbRootFragment::setConfReceived()
 
 bool NdbRootFragment::finalBatchReceived() const
 {
-  return getResultStream(0).getReceiver().m_tcPtrI==RNIL;
+  return getReceiverTcPtrI()==RNIL;
 }
 
-bool  NdbRootFragment::isEmpty() const
+bool NdbRootFragment::isEmpty() const
 { 
   return getResultStream(0).isEmpty();
 }
 
+/**
+ * SPJ requests are identified by the receiver-id of the
+ * *root* ResultStream for each RootFragment. Furthermore
+ * a NEXTREQ use the tcPtrI saved in this ResultStream to
+ * identify the 'cursor' to restart.
+ *
+ * We provide some convenient accessors for fetching this info 
+ */
+Uint32 NdbRootFragment::getReceiverId() const
+{
+  return getResultStream(0).getReceiver().getId();
+}
+
+Uint32 NdbRootFragment::getReceiverTcPtrI() const
+{
+  return getResultStream(0).getReceiver().m_tcPtrI;
+}
 
 ///////////////////////////////////////////
 /////////  NdbQuery API methods ///////////
@@ -1475,7 +1670,14 @@ NdbQueryImpl::~NdbQueryImpl()
 void
 NdbQueryImpl::postFetchRelease()
 {
-  if (m_operations != NULL) {
+  if (m_rootFrags != NULL)
+  {
+    for (unsigned i=0; i<m_rootFragCount; i++)
+    { m_rootFrags[i].postFetchRelease();
+    }
+  }
+  if (m_operations != NULL)
+  {
     for (unsigned i=0; i<m_countOperations; i++)
     { m_operations[i].postFetchRelease();
     }
@@ -1986,6 +2188,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
         NdbRootFragment* frag;
         while ((frag=m_fullFrags.pop()) != NULL)
         {
+          frag->prepareResultSet();
           m_applFrags.add(*frag);
         }
         if (m_applFrags.getCurrent() != NULL)
@@ -2040,6 +2243,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
     NdbRootFragment* frag;
     if ((frag=m_fullFrags.pop()) != NULL)
     {
+      frag->prepareResultSet();
       m_applFrags.add(*frag);
     }
     assert(m_fullFrags.pop()==NULL); // Only one stream for lookups.
@@ -2070,15 +2274,15 @@ NdbQueryImpl::awaitMoreResults(bool forc
   returns: 'true' when application thread should be resumed.
 */
 bool 
-NdbQueryImpl::handleBatchComplete(Uint32 fragNo)
+NdbQueryImpl::handleBatchComplete(NdbRootFragment& rootFrag)
 {
   if (traceSignals) {
-    ndbout << "NdbQueryImpl::handleBatchComplete, fragNo=" << fragNo
+    ndbout << "NdbQueryImpl::handleBatchComplete"
+           << ", fragNo=" << rootFrag.getFragNo()
            << ", pendingFrags=" << (m_pendingFrags-1)
            << ", finalBatchFrags=" << m_finalBatchFrags
            <<  endl;
   }
-  bool resume = false;
 
   /* May received fragment data after a SCANREF() (timeout?) 
    * terminated the scan.  We are about to close this query, 
@@ -2086,8 +2290,6 @@ NdbQueryImpl::handleBatchComplete(Uint32
    */
   if (likely(m_fullFrags.m_errorCode == 0))
   {
-    NdbQueryOperationImpl& root = getRoot();
-    NdbRootFragment& rootFrag = m_rootFrags[fragNo];
     assert(rootFrag.isFragBatchComplete());
 
     assert(m_pendingFrags > 0);                // Check against underflow.
@@ -2100,34 +2302,14 @@ NdbQueryImpl::handleBatchComplete(Uint32
       assert(m_finalBatchFrags <= m_rootFragCount);
     }
 
-    if (getQueryDef().isScanQuery())
-    {
-      // Only required for scans
-      root.getResultStream(fragNo).handleBatchComplete();  
-
-      // Only ordered scans has to wait until all pending completed
-      resume = (m_pendingFrags==0) ||
-               (root.m_ordering==NdbQueryOptions::ScanOrdering_unordered);
-    }
-    else
-    {
-      assert(root.m_resultStreams[fragNo]->getReceiver().m_tcPtrI==RNIL);
-      assert(m_finalBatchFrags==1);
-      assert(m_pendingFrags==0);  // Lookup query should be complete now.
-      resume = true;   
-    }
-
-    /* Position at the first (sorted?) row available from this fragments.
-     */
-    root.m_resultStreams[fragNo]->firstResult();
-
     /* When application thread ::awaitMoreResults() it will later be moved
      * from m_fullFrags to m_applFrags under mutex protection.
      */
     m_fullFrags.push(rootFrag);
+    return true;
   }
 
-  return resume;
+  return false;
 } // NdbQueryImpl::handleBatchComplete
 
 int
@@ -2272,22 +2454,22 @@ NdbQueryImpl::execTCKEYCONF()
     ndbout << "NdbQueryImpl::execTCKEYCONF()" << endl;
   }
   assert(!getQueryDef().isScanQuery());
+  NdbRootFragment& rootFrag = m_rootFrags[0];
 
   // We will get 1 + #leaf-nodes TCKEYCONF for a lookup...
-  m_rootFrags[0].setConfReceived();
-  m_rootFrags[0].incrOutstandingResults(-1);
+  rootFrag.setConfReceived();
+  rootFrag.incrOutstandingResults(-1);
 
   bool ret = false;
-  if (m_rootFrags[0].isFragBatchComplete())
+  if (rootFrag.isFragBatchComplete())
   { 
-    ret = handleBatchComplete(0);
+    ret = handleBatchComplete(rootFrag);
   }
 
   if (traceSignals) {
     ndbout << "NdbQueryImpl::execTCKEYCONF(): returns:" << ret
            << ", m_pendingFrags=" << m_pendingFrags
-           << ", *getRoot().m_resultStreams[0]=" 
-           << *getRoot().m_resultStreams[0]
+           << ", rootStream= {" << rootFrag.getResultStream(0) << "}"
            << endl;
   }
   return ret;
@@ -2364,9 +2546,7 @@ NdbQueryImpl::prepareSend()
   // Allocate space for ptrs to NdbResultStream and NdbRootFragment objects.
   error = m_pointerAlloc.init(m_rootFragCount * 
                               (SharedFragStack::pointersPerFragment +
-                               OrderedFragSet::pointersPerFragment +
-                               // Pointers to NdbResultStream objects.
-                               getNoOfOperations()));
+                               OrderedFragSet::pointersPerFragment));
   if (error != 0)
   {
     setErrorCode(error);
@@ -2377,21 +2557,22 @@ NdbQueryImpl::prepareSend()
   getRoot().calculateBatchedRows(NULL);
   getRoot().setBatchedRows(1);
 
-  /** Calculate total amount of row buffer space for all operations and
-   * fragments.*/
+  /**
+   * Calculate total amount of row buffer space for all operations and
+   * fragments.
+   */
   Uint32 totalBuffSize = 0;
   for (Uint32 opNo = 0; opNo < getNoOfOperations(); opNo++)
   {
-    NdbQueryOperationImpl& op = getQueryOperation(opNo);
-
-    op.m_bufferSize = op.getRowSize() * op.getMaxBatchRows();
-    totalBuffSize += op.m_bufferSize;
+    const NdbQueryOperationImpl& op = getQueryOperation(opNo);
+    totalBuffSize += (op.getRowSize() * op.getMaxBatchRows());
   }
-  /** Add one word per operation for buffer overrun check. We add a word
+  /**
+   * Add one word per ResultStream for buffer overrun check. We add a word
    * rather than a byte to avoid possible alignment problems.
    */
-  m_rowBufferAlloc.init(totalBuffSize * m_rootFragCount + 
-                        sizeof(Uint32) * getNoOfOperations());
+  m_rowBufferAlloc.init(m_rootFragCount * 
+                       (totalBuffSize + (sizeof(Uint32) * getNoOfOperations())) );
   if (getQueryDef().isScanQuery())
   {
     Uint32 totalRows = 0;
@@ -2406,15 +2587,28 @@ NdbQueryImpl::prepareSend()
       return -1;
     }
   }
-  // 1. Build receiver structures for each QueryOperation.
-  // 2. Fill in parameters (into ATTRINFO) for QueryTree.
-  //    (Has to complete *after* ::prepareReceiver() as QueryTree params
-  //     refer receiver id's.)
-  //
+
+  /**
+   * Allocate and initialize fragment state variables.
+   * Will also cause a ResultStream object containing a 
+   * NdbReceiver to be constructed for each operation in QueryTree
+   */
+  m_rootFrags = new NdbRootFragment[m_rootFragCount];
+  if (m_rootFrags == NULL)
+  {
+    setErrorCode(Err_MemoryAlloc);
+    return -1;
+  }
+  for (Uint32 i = 0; i<m_rootFragCount; i++)
+  {
+    m_rootFrags[i].init(*this, i); // Set fragment number.
+  }
+
+  // Fill in parameters (into ATTRINFO) for QueryTree.
   for (Uint32 i = 0; i < m_countOperations; i++) {
-    int error;
-    if (unlikely((error = m_operations[i].prepareReceiver()) != 0)
-              || (error = m_operations[i].prepareAttrInfo(m_attrInfo)) != 0) {
+    const int error = m_operations[i].prepareAttrInfo(m_attrInfo);
+    if (unlikely(error))
+    {
       setErrorCode(error);
       return -1;
     }
@@ -2450,23 +2644,6 @@ NdbQueryImpl::prepareSend()
     return -1;
   }
 
-  /**
-   * Allocate and initialize fragment state variables.
-   */
-  m_rootFrags = new NdbRootFragment[m_rootFragCount];
-  if(m_rootFrags == NULL)
-  {
-    setErrorCode(Err_MemoryAlloc);
-    return -1;
-  }
-  else
-  {
-    for(Uint32 i = 0; i<m_rootFragCount; i++)
-    {
-      m_rootFrags[i].init(*this, i); // Set fragment number.
-    }
-  }
-
   if (getQueryDef().isScanQuery())
   {
     NdbRootFragment::buildReciverIdMap(m_rootFrags, m_rootFragCount);
@@ -2494,11 +2671,13 @@ class InitialReceiverIdIterator: public
 {
 public:
   
-  InitialReceiverIdIterator(const NdbQueryImpl& query)
-    :m_query(query),
+  InitialReceiverIdIterator(NdbRootFragment rootFrags[],
+                            Uint32 cnt)
+    :m_rootFrags(rootFrags),
+     m_fragCount(cnt),
      m_currFragNo(0)
   {}
-  
+
   virtual ~InitialReceiverIdIterator() {};
   
   /**
@@ -2519,8 +2698,11 @@ private:
    * improving efficiency.
    */
   static const Uint32 bufSize = 16;
-  /** The query with the scan root operation that we list receiver ids for.*/
-  const NdbQueryImpl& m_query;
+
+  /** Set of root fragments which we want to itterate receiver ids for.*/
+  NdbRootFragment* m_rootFrags;
+  const Uint32 m_fragCount;
+
   /** The next fragment numnber to be processed. (Range for 0 to no of 
    * fragments.)*/
   Uint32 m_currFragNo;
@@ -2530,25 +2712,25 @@ private:
 
 const Uint32* InitialReceiverIdIterator::getNextWords(Uint32& sz)
 {
-  sz = 0;
   /**
    * For the initial batch, we want to retrieve one batch for each fragment
    * whether it is a sorted scan or not.
    */
-  if (m_currFragNo >= m_query.getRootFragCount())
+  if (m_currFragNo >= m_fragCount)
   {
+    sz = 0;
     return NULL;
   }
   else
   {
-    const NdbQueryOperationImpl& root = m_query.getQueryOperation(0U);
-    while (sz < bufSize && 
-           m_currFragNo < m_query.getRootFragCount())
+    Uint32 cnt = 0;
+    while (cnt < bufSize && m_currFragNo < m_fragCount)
     {
-      m_receiverIds[sz] = root.getReceiver(m_currFragNo).getId();
-      sz++;
+      m_receiverIds[cnt] = m_rootFrags[m_currFragNo].getReceiverId();
+      cnt++;
       m_currFragNo++;
     }
+    sz = cnt;
     return m_receiverIds;
   }
 }
@@ -2690,7 +2872,7 @@ NdbQueryImpl::doSend(int nodeId, bool la
      * Section 2 : Optional KEYINFO section
      */
     GenericSectionPtr secs[3];
-    InitialReceiverIdIterator receiverIdIter(*this);
+    InitialReceiverIdIterator receiverIdIter(m_rootFrags, m_rootFragCount);
     LinearSectionIterator attrInfoIter(m_attrInfo.addr(), m_attrInfo.getSize());
     LinearSectionIterator keyInfoIter(m_keyInfo.addr(), m_keyInfo.getSize());
  
@@ -2831,28 +3013,11 @@ Remark:
 int
 NdbQueryImpl::sendFetchMore(NdbRootFragment& emptyFrag, bool forceSend)
 {
-  assert(getRoot().m_resultStreams!=NULL);
   assert(!emptyFrag.finalBatchReceived());
   assert(getQueryDef().isScanQuery());
 
-  const Uint32 fragNo = emptyFrag.getFragNo();
   emptyFrag.reset();
 
-  for (unsigned opNo=0; opNo<m_countOperations; opNo++) 
-  {
-    NdbResultStream& resultStream = 
-       getQueryOperation(opNo).getResultStream(fragNo);
-
-    if (!resultStream.isSubScanComplete())
-    {
-      /**
-       * Reset resultstream and all its descendants, since all these
-       * streams will get a new set of rows in the next batch.
-       */ 
-      resultStream.reset();
-    }
-  }
-
   Ndb& ndb = *getNdbTransaction().getNdb();
   NdbApiSignal tSignal(&ndb);
   tSignal.setSignal(GSN_SCAN_NEXTREQ, refToBlock(m_scanTransaction->m_tcRef));
@@ -2868,8 +3033,7 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
   scanNextReq->transId2 = (Uint32) (transId >> 32);
   tSignal.setLength(ScanNextReq::SignalLength);
 
-  const uint32 receiverId = 
-    emptyFrag.getResultStream(0).getReceiver().m_tcPtrI;
+  const uint32 receiverId = emptyFrag.getReceiverTcPtrI();
   LinearSectionIterator receiverIdIter(&receiverId ,1);
 
   GenericSectionPtr secs[1];
@@ -3302,9 +3466,9 @@ NdbQueryImpl::OrderedFragSet::getEmpty()
 bool 
 NdbQueryImpl::OrderedFragSet::verifySortOrder() const
 {
-  for(int i = 0; i<m_activeFragCount-2; i++)
+  for (int i = 0; i<m_activeFragCount-1; i++)
   {
-    if(compare(*m_activeFrags[i], *m_activeFrags[i+1]) < 0)
+    if (compare(*m_activeFrags[i], *m_activeFrags[i+1]) < 0)
     {
       assert(false);
       return false;
@@ -3364,10 +3528,7 @@ NdbQueryOperationImpl::NdbQueryOperation
   m_parent(NULL),
   m_children(def.getNoOfChildOperations()),
   m_maxBatchRows(0),   // >0: User specified prefered value, ==0: Use default CFG values
-  m_resultStreams(NULL),
   m_params(),
-  m_bufferSize(0),
-  m_batchOverflowCheck(NULL),
   m_resultBuffer(NULL),
   m_resultRef(NULL),
   m_isRowNull(true),
@@ -3416,10 +3577,10 @@ NdbQueryOperationImpl::NdbQueryOperation
 
 NdbQueryOperationImpl::~NdbQueryOperationImpl()
 {
-  // We expect ::postFetchRelease to have deleted fetch related structures when fetch completed.
-  // Either by fetching through last row, or calling ::close() which forcefully terminates fetch
-  assert (m_batchOverflowCheck == NULL);
-  assert (m_resultStreams == NULL);
+  /**
+   * We expect ::postFetchRelease to have deleted fetch related structures when fetch completed.
+   * Either by fetching through last row, or calling ::close() which forcefully terminates fetch
+   */
   assert (m_firstRecAttr == NULL);
   assert (m_interpretedCode == NULL);
 } //NdbQueryOperationImpl::~NdbQueryOperationImpl()
@@ -3431,22 +3592,6 @@ NdbQueryOperationImpl::~NdbQueryOperatio
 void
 NdbQueryOperationImpl::postFetchRelease()
 {
-  // Buffer overrun check.
-  assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck ==  0xacbd1234);
-  m_batchOverflowCheck = NULL;
-  
-  if (m_resultStreams != NULL)
-  { 
-    for (int i = static_cast<int>(getQuery().getRootFragCount())-1; i >= 0; i--)
-    {
-      if (m_resultStreams[i] != NULL)
-      {
-        m_resultStreams[i]->~NdbResultStream();
-      }
-    }
-  }
-  m_resultStreams = NULL;
-
   Ndb* const ndb = m_queryImpl.getNdbTransaction().getNdb();
   NdbRecAttr* recAttr = m_firstRecAttr;
   while (recAttr != NULL) {
@@ -3680,7 +3825,7 @@ NdbQueryOperationImpl::firstResult()
 
   if (rootFrag != NULL)
   {
-    NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()];
+    NdbResultStream& resultStream = rootFrag->getResultStream(*this);
     if (resultStream.firstResult() != tupleNotFound)
     {
       fetchRow(resultStream);
@@ -3720,7 +3865,7 @@ NdbQueryOperationImpl::nextResult(bool f
     const NdbRootFragment* rootFrag = m_queryImpl.m_applFrags.getCurrent();
     if (rootFrag!=NULL)
     {
-      NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()];
+      NdbResultStream& resultStream = rootFrag->getResultStream(*this);
       if (resultStream.nextResult() != tupleNotFound)
       {
         fetchRow(resultStream);
@@ -3736,7 +3881,7 @@ NdbQueryOperationImpl::nextResult(bool f
 void 
 NdbQueryOperationImpl::fetchRow(NdbResultStream& resultStream)
 {
-  const char* buff = resultStream.getReceiver().get_row();
+  const char* buff = resultStream.getReceiver().peek_row();
   assert(buff!=NULL || (m_firstRecAttr==NULL && m_ndbRecord==NULL));
 
   m_isRowNull = false;
@@ -4051,55 +4196,9 @@ NdbQueryOperationImpl::setBatchedRows(Ui
   }
 }
 
-
-int 
-NdbQueryOperationImpl::prepareReceiver()
-{
-  // Construct receiver streams and prepare them for receiving scan result
-  assert(m_resultStreams==NULL);
-  assert(m_queryImpl.getRootFragCount() > 0);
-
-  m_resultStreams = reinterpret_cast<NdbResultStream**>
-    (getQuery().getPointerAlloc().allocObjMem(m_queryImpl.getRootFragCount())); 
-
-  for(Uint32 i = 0; i<m_queryImpl.getRootFragCount(); i++) {
-    m_resultStreams[i] = NULL;  // Init to legal contents for d'tor
-  }
-  for(Uint32 i = 0; i<m_queryImpl.getRootFragCount(); i++) {
-    m_resultStreams[i] = new (getQuery().m_resultStreamAlloc.allocObjMem(1))
-      NdbResultStream(*this, i);
-    const int error = m_resultStreams[i]->prepare();
-    if (unlikely(error)) {
-      return error;
-    }
-
-    m_resultStreams[i]->getReceiver().init(NdbReceiver::NDB_QUERY_OPERATION, 
-                                        false, this);
-    char* const rowBuf = reinterpret_cast<char*>(getQuery().getRowBufferAlloc()
-                                                 .allocObjMem(m_bufferSize));
-    m_resultStreams[i]->getReceiver()
-      .do_setup_ndbrecord(m_ndbRecord,
-                          getMaxBatchRows(), 
-                          0 /*key_size*/, 
-                          0 /*read_range_no*/, 
-                          getRowSize(),
-                          rowBuf);
-    m_resultStreams[i]->getReceiver().prepareSend();
-  }
-  // So that we can test for for buffer overrun.
-  m_batchOverflowCheck = 
-    reinterpret_cast<Uint32*>(getQuery().getRowBufferAlloc()
-                              .allocObjMem(sizeof(Uint32)));
-  *m_batchOverflowCheck = 0xacbd1234;
-  return 0;
-}//NdbQueryOperationImpl::prepareReceiver
-
 int 
 NdbQueryOperationImpl::prepareAttrInfo(Uint32Buffer& attrInfo)
 {
-  // ::prepareReceiver() need to complete first:
-  assert (m_resultStreams != NULL);
-
   const NdbQueryOperationDefImpl& def = getQueryOperationDef();
 
   /**
@@ -4553,10 +4652,12 @@ NdbQueryOperationImpl::prepareLookupKeyI
 bool 
 NdbQueryOperationImpl::execTRANSID_AI(const Uint32* ptr, Uint32 len)
 {
+  TupleCorrelation tupleCorrelation;
   NdbRootFragment* rootFrag = m_queryImpl.m_rootFrags;
-  Uint32 rootFragNo = 0;
+
   if (getQueryDef().isScanQuery())
   {
+    const CorrelationData correlData(ptr, len);
     const Uint32 receiverId = CorrelationData(ptr, len).getRootReceiverId();
     
     /** receiverId holds the Id of the receiver of the corresponding stream
@@ -4572,24 +4673,27 @@ NdbQueryOperationImpl::execTRANSID_AI(co
       assert(false);
       return false;
     }
-    rootFragNo = rootFrag->getFragNo();
+
+    // Extract tuple correlation.
+    tupleCorrelation = correlData.getTupleCorrelation();
+    len -= CorrelationData::wordCount;
   }
+
   if (traceSignals) {
     ndbout << "NdbQueryOperationImpl::execTRANSID_AI()" 
            << ", operation no: " << getQueryOperationDef().getQueryOperationIx()
-           << ", fragment no: " << rootFragNo
+           << ", fragment no: " << rootFrag->getFragNo()
            << endl;
   }
 
   // Process result values.
-  m_resultStreams[rootFragNo]->execTRANSID_AI(ptr, len);
-
+  rootFrag->getResultStream(*this).execTRANSID_AI(ptr, len, tupleCorrelation);
   rootFrag->incrOutstandingResults(-1);
 
   bool ret = false;
   if (rootFrag->isFragBatchComplete())
   {
-    ret = m_queryImpl.handleBatchComplete(rootFragNo);
+    ret = m_queryImpl.handleBatchComplete(*rootFrag);
   }
 
   if (traceSignals) {
@@ -4631,7 +4735,6 @@ NdbQueryOperationImpl::execTCKEYREF(cons
     }
   }
 
-  Uint32 rootFragNo = 0;
   NdbRootFragment& rootFrag = getQuery().m_rootFrags[0];
 
   if (ref->errorCode != DbspjErr::NodeFailure)
@@ -4656,13 +4759,12 @@ NdbQueryOperationImpl::execTCKEYREF(cons
   bool ret = false;
   if (rootFrag.isFragBatchComplete())
   { 
-    ret = m_queryImpl.handleBatchComplete(rootFragNo);
+    ret = m_queryImpl.handleBatchComplete(rootFrag);
   } 
 
   if (traceSignals) {
     ndbout << "NdbQueryOperationImpl::execTCKEYREF(): returns:" << ret
-           << ", *getRoot().m_resultStreams[0] {" 
-           << *getRoot().m_resultStreams[0] << "}"
+           << ", resultStream= {" << rootFrag.getResultStream(*this) << "}"
            << ", *this=" << *this <<  endl;
   }
   return ret;
@@ -4695,47 +4797,24 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
     return false;
   }
   rootFrag->setConfReceived();
+  rootFrag->setRemainingSubScans(nodeMask);
   rootFrag->incrOutstandingResults(rowCount);
 
   // Handle for SCAN_NEXTREQ, RNIL -> EOF
-  NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()];
+  NdbResultStream& resultStream = rootFrag->getResultStream(*this);
   resultStream.getReceiver().m_tcPtrI = tcPtrI;  
 
   if(traceSignals){
-    ndbout << "  resultStream(root) {" << resultStream << "} fragNo" 
-           << rootFrag->getFragNo() << endl;
+    ndbout << "  resultStream {" << rootFrag->getResultStream(*this)
+           << "} fragNo" << rootFrag->getFragNo()
+           << endl;
   }
 
-  const NdbQueryDefImpl& queryDef = m_queryImpl.getQueryDef();
-  /* Mark each scan node to indicate if the current batch is the last in the
-   * current sub-scan or not.
-   */
-  for (Uint32 opNo = 0; opNo < queryDef.getNoOfOperations(); opNo++)
-  {
-    const NdbQueryOperationImpl& op = m_queryImpl.getQueryOperation(opNo);
-    /**
-     * Find the node number seen by the SPJ block. Since a unique index
-     * operation will have two distincts nodes in the tree used by the
-     * SPJ block, this number may be different from 'opNo'.
-     */
-    const Uint32 internalOpNo = op.getQueryOperationDef().getQueryOperationId();
-    assert(internalOpNo >= opNo);
-    const bool complete = ((nodeMask >> internalOpNo) & 1) == 0;
-
-    // Lookups should always be 'complete'
-    assert(complete ||  op.getQueryOperationDef().isScanOperation());
-    rootFrag->getResultStream(opNo).setSubScanCompletion(complete);
-  }
-  // Check that nodeMask does not have more bits than we have operations. 
-  assert(nodeMask >> 
-         (1+queryDef.getQueryOperation(queryDef.getNoOfOperations() - 1)
-          .getQueryOperationId()) == 0);
-
   bool ret = false;
   if (rootFrag->isFragBatchComplete())
   {
     /* This fragment is now complete */
-    ret = m_queryImpl.handleBatchComplete(rootFrag->getFragNo());
+    ret = m_queryImpl.handleBatchComplete(*rootFrag);
   }
   if (traceSignals) {
     ndbout << "NdbQueryOperationImpl::execSCAN_TABCONF():, returns:" << ret
@@ -4871,13 +4950,6 @@ int NdbQueryOperationImpl::setBatchSize(
   return 0;
 }
 
-NdbResultStream& 
-NdbQueryOperationImpl::getResultStream(Uint32 rootFragNo) const
-{
-  assert(rootFragNo < getQuery().getRootFragCount());
-  return *m_resultStreams[rootFragNo];
-}
-
 bool
 NdbQueryOperationImpl::hasInterpretedCode() const
 {
@@ -4916,15 +4988,8 @@ NdbQueryOperationImpl::prepareInterprete
 
 Uint32 
 NdbQueryOperationImpl::getIdOfReceiver() const {
-  return m_resultStreams[0]->getReceiver().getId();
-}
-
-
-const NdbReceiver& 
-NdbQueryOperationImpl::getReceiver(Uint32 recNo) const {
-  assert(recNo<getQuery().getRootFragCount());
-  assert(m_resultStreams!=NULL);
-  return m_resultStreams[recNo]->getReceiver();
+  NdbRootFragment& rootFrag = m_queryImpl.m_rootFrags[0];
+  return rootFrag.getResultStream(*this).getReceiver().getId();
 }
 
 Uint32 NdbQueryOperationImpl::getRowSize() const
@@ -4934,6 +4999,12 @@ Uint32 NdbQueryOperationImpl::getRowSize
   {
     m_rowSize = 
       NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr, 0, false);
+
+    const bool withCorrelation = getRoot().getQueryDef().isScanQuery();
+    if (withCorrelation)
+    {
+      m_rowSize += TupleCorrelation::wordCount*sizeof(Uint32);
+    }
   }
   return m_rowSize;
 }
@@ -4953,7 +5024,8 @@ NdbOut& operator<<(NdbOut& out, const Nd
   out << "  m_queryImpl: " << &op.m_queryImpl;
   out << "  m_operationDef: " << &op.m_operationDef;
   for(Uint32 i = 0; i<op.m_queryImpl.getRootFragCount(); i++){
-    out << "  m_resultStream[" << i << "]{" << *op.m_resultStreams[i] << "}";
+    NdbRootFragment& rootFrag = op.m_queryImpl.m_rootFrags[i];
+    out << "  m_resultStream[" << i << "]{" << rootFrag.getResultStream(op) << "}";
   }
   out << " m_isRowNull " << op.m_isRowNull;
   out << " ]";

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-06-20 13:25:48 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-08-16 07:56:53 +0000
@@ -247,9 +247,15 @@ public:
   Uint32 getRootFragCount() const
   { return m_rootFragCount; }
 
+  NdbBulkAllocator& getResultStreamAlloc()
+  { return m_resultStreamAlloc; }
+
   NdbBulkAllocator& getTupleSetAlloc()
   { return m_tupleSetAlloc; }
 
+  NdbBulkAllocator& getRowBufferAlloc()
+  { return m_rowBufferAlloc; }
+
 private:
   /** Possible return values from NdbQueryImpl::awaitMoreResults. 
    * A subset of the integer values also matches those returned
@@ -577,13 +583,10 @@ private:
    *  the result.
    *  @return: 'true' if its time to resume appl. threads
    */ 
-  bool handleBatchComplete(Uint32 rootFragNo);
+  bool handleBatchComplete(NdbRootFragment& rootFrag);
 
   NdbBulkAllocator& getPointerAlloc()
   { return m_pointerAlloc; }
-  
-  NdbBulkAllocator& getRowBufferAlloc()
-  { return m_rowBufferAlloc; }
 
 }; // class NdbQueryImpl
 
@@ -705,10 +708,6 @@ public:
   int setInterpretedCode(const NdbInterpretedCode& code);
   bool hasInterpretedCode() const;
 
-  NdbResultStream& getResultStream(Uint32 rootFragNo) const;
-
-  const NdbReceiver& getReceiver(Uint32 rootFragNo) const;
-
   /** Verify magic number.*/
   bool checkMagicNumber() const
   { return m_magic == MAGIC; }
@@ -719,6 +718,12 @@ public:
   Uint32 getMaxBatchRows() const
   { return m_maxBatchRows; }
 
+  /** Get size of row as required to buffer it. */  
+  Uint32 getRowSize() const;
+
+  const NdbRecord* getNdbRecord() const
+  { return m_ndbRecord; }
+
 private:
 
   STATIC_CONST (MAGIC = 0xfade1234);
@@ -742,16 +747,9 @@ private:
   /** Max rows (per resultStream) in a scan batch.*/
   Uint32 m_maxBatchRows;
 
-  /** For processing results from this operation (Array of).*/
-  NdbResultStream** m_resultStreams;
   /** Buffer for parameters in serialized format */
   Uint32Buffer m_params;
 
-  /** Buffer size allocated for *each* ResultStream/Receiver when 
-   *  fetching results.*/
-  Uint32 m_bufferSize;
-  /** Used for checking if buffer overrun occurred. */
-  Uint32* m_batchOverflowCheck;
   /** User specified buffer for final storage of result.*/
   char* m_resultBuffer;
   /** User specified pointer to application pointer that should be 
@@ -819,9 +817,6 @@ private:
   Uint32 calculateBatchedRows(const NdbQueryOperationImpl* closestScan);
   void setBatchedRows(Uint32 batchedRows);
 
-  /** Construct and prepare receiver streams for result processing. */
-  int prepareReceiver();
-
   /** Prepare ATTRINFO for execution. (Add execution params++)
    *  @return possible error code.*/
   int prepareAttrInfo(Uint32Buffer& attrInfo);
@@ -863,7 +858,6 @@ private:
   bool diskInUserProjection() const
   { return m_diskInUserProjection; }
 
-  Uint32 getRowSize() const;
 }; // class NdbQueryOperationImpl
 
 

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0-wl4124-new2 branch (pekka.nousiainen:4425to 4426) Pekka Nousiainen17 Aug