List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:August 16 2011 1:20pm
Subject:bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:4455 to 4456)
View as plain text  
 4456 Ole John Aske	2011-08-16
      SPJ: Refactoring of how & when the parent/child row correlation is handled:
      
      Previously the correlation ids were decoded, and the parent/child
      hashMaps built as part of execTRANSID_AI().
      
      This work is now deferred until a complete batch of result rows has been
      received - at that point it is handled by the thread that was waiting
      for the result batch, instead of by the receiver thread.
      
      Note: This patch is also a step in the direction of moving the building
      of the correlation hashmap entirely outside transporter-mutex-protected
      code.

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
 4455 Ole John Aske	2011-08-16
      Cherry-picked fix for Bug#12798270 into 'mysql-5.1-telco-7.0', as this branch also contained
      the (cherry-picked) fix for Bug#11764737, which made this problem surface.

    modified:
      mysql-test/r/group_by.result
      mysql-test/t/group_by.test
      sql/sql_select.cc
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-16 08:04:43 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-16 13:20:11 +0000
@@ -87,6 +87,32 @@ const bool traceSignals = false;
  * children. That way, results for child operations can be updated correctly
  * when the application iterates over the results of the root scan operation.
  */
+class TupleCorrelation
+{
+public:
+  static const Uint32 wordCount = 1;
+
+  explicit TupleCorrelation()
+  : m_correlation((tupleNotFound<<16) | tupleNotFound)
+  {}
+
+  /** Conversion to/from Uint32 to store/fetch from buffers */
+  explicit TupleCorrelation(Uint32 val)
+  : m_correlation(val)
+  {}
+  Uint32 toUint32() const 
+  { return m_correlation; }
+
+  Uint16 getTupleId() const
+  { return m_correlation & 0xffff;}
+
+  Uint16 getParentTupleId() const
+  { return m_correlation >> 16;}
+
+private:
+  Uint32 m_correlation;
+}; // class TupleCorrelation
+
 class CorrelationData
 {
 public:
@@ -99,18 +125,15 @@ public:
     assert(AttributeHeader(m_corrPart[0]).getAttributeId() 
            == AttributeHeader::CORR_FACTOR64);
     assert(AttributeHeader(m_corrPart[0]).getByteSize() == 2*sizeof(Uint32));
-    assert(getTupleId()<tupleNotFound);
-    assert(getParentTupleId()<tupleNotFound);
+    assert(getTupleCorrelation().getTupleId()<tupleNotFound);
+    assert(getTupleCorrelation().getParentTupleId()<tupleNotFound);
   }
 
   Uint32 getRootReceiverId() const
   { return m_corrPart[2];}
 
-  Uint16 getTupleId() const
-  { return m_corrPart[1] & 0xffff;}
-
-  Uint16 getParentTupleId() const
-  { return m_corrPart[1] >> 16;}
+  const TupleCorrelation getTupleCorrelation() const
+  { return TupleCorrelation(m_corrPart[1]); }
 
 private:
   const Uint32* const m_corrPart;
@@ -166,6 +189,11 @@ public:
    */
   void reset();
 
+  /**
+   * Prepare for reading another batch of results.
+   */
+  void prepareResultSet();
+
   void incrOutstandingResults(Int32 delta)
   {
     m_outstandingResults += delta;
@@ -307,6 +335,9 @@ public:
   Uint32 getRowCount() const
   { return m_rowCount; }
 
+  char* getRow(Uint32 tupleNo) const
+  { return (m_buffer + (tupleNo*m_rowSize)); }
+
   /**
    * Process an incomming tuple for this stream. Extract parent and own tuple 
    * ids and pass it on to m_receiver.
@@ -314,12 +345,15 @@ public:
    * @param ptr buffer holding tuple.
    * @param len buffer length.
    */
-  void execTRANSID_AI(const Uint32 *ptr, Uint32 len);
+  void execTRANSID_AI(const Uint32 *ptr, Uint32 len,
+                      TupleCorrelation correlation);
 
-  /** A complete batch has been received for a fragment on this NdbResultStream,
-   *  Update whatever required before the appl. are allowed to navigate the result.
+  /**
+   * A complete batch has been received for a fragment on this NdbResultStream,
+   * Update whatever required before the appl. are allowed to navigate the result.
+   * @return true if node and all its siblings have returned all rows.
    */ 
-  void handleBatchComplete();
+  bool prepareResultSet();
 
   /**
    * Navigate within the current result batch to resp. first and next row.
@@ -364,12 +398,6 @@ public:
     return m_subScanComplete; 
   }
 
-  /** Variant of isSubScanComplete() above which checks that this resultstream
-   * and all its descendants have consumed all batches of rows instantiated 
-   * from their parent operation(s).
-   */
-  bool isAllSubScansComplete() const;
-
   bool isScanQuery() const
   { return (m_properties & Is_Scan_Query); }
 
@@ -420,7 +448,7 @@ public:
      * that had no matching children.*/
     Bitmask<(NDB_SPJ_MAX_TREE_NODES+31)/32> m_hasMatchingChild;
 
-    explicit TupleSet()
+    explicit TupleSet() : m_hash_head(tupleNotFound)
     {}
 
   private:
@@ -452,9 +480,14 @@ private:
   /** The receiver object that unpacks transid_AI messages.*/
   NdbReceiver m_receiver;
 
+  /** The buffers which we receive the results into */
+  char* m_buffer;
+
   /** Used for checking if buffer overrun occurred. */
   Uint32* m_batchOverflowCheck;
 
+  Uint32 m_rowSize;
+
   /** The number of transid_AI messages received.*/
   Uint32 m_rowCount;
 
@@ -489,11 +522,7 @@ private:
   /** TupleSet contains the correlation between parent/childs */
   TupleSet* m_tupleSet;
 
-  void clearTupleSet();
-
-  void setParentChildMap(Uint16 parentId,
-                         Uint16 tupleId, 
-                         Uint16 tupleNo);
+  void buildResultCorrelations();
 
   Uint16 getTupleId(Uint16 tupleNo) const
   { return (m_tupleSet) ? m_tupleSet[tupleNo].m_tupleId : 0; }
@@ -576,7 +605,9 @@ NdbResultStream::NdbResultStream(NdbQuer
      | (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll
        ? Is_Inner_Join : 0))),
   m_receiver(operation.getQuery().getNdbTransaction().getNdb()),
+  m_buffer(NULL),
   m_batchOverflowCheck(NULL),
+  m_rowSize(0),
   m_rowCount(0),
   m_iterState(Iter_notStarted),
   m_currentRow(tupleNotFound),
@@ -599,6 +630,8 @@ NdbResultStream::prepare()
   const Uint32 rowSize = m_operation.getRowSize();
   NdbQueryImpl &query = m_operation.getQuery();
 
+  m_rowSize = rowSize;
+
   /* Parent / child correlation is only relevant for scan type queries
    * Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups!
    * Neither is these structures required for operations not having respective
@@ -610,16 +643,13 @@ NdbResultStream::prepare()
     m_tupleSet = 
       new (query.getTupleSetAlloc().allocObjMem(m_maxRows)) 
       TupleSet[m_maxRows];
-
-    clearTupleSet();
   }
   else
     m_maxRows = 1;
 
   const int bufferSize = rowSize * m_maxRows;
   NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
-  char* const rowBuf = reinterpret_cast<char*>(bufferAlloc
-                                               .allocObjMem(bufferSize));
+  m_buffer = reinterpret_cast<char*>(bufferAlloc.allocObjMem(bufferSize));
 
   // So that we can test for buffer overrun.
   m_batchOverflowCheck = 
@@ -633,7 +663,7 @@ NdbResultStream::prepare()
                           0 /*key_size*/, 
                           0 /*read_range_no*/, 
                           rowSize,
-                          rowBuf);
+                          m_buffer);
 } //NdbResultStream::prepare
 
 void
@@ -646,7 +676,6 @@ NdbResultStream::reset()
   m_iterState = Iter_notStarted;
   m_currentRow = tupleNotFound;
 
-  clearTupleSet();
   m_receiver.prepareSend();
   /**
    * If this stream will get new rows in the next batch, then so will
@@ -660,84 +689,10 @@ NdbResultStream::reset()
   }
 } //NdbResultStream::reset
 
-void
-NdbResultStream::clearTupleSet()
-{
-  assert (isScanQuery());
-  for (Uint32 i=0; i<m_maxRows; i++)
-  {
-    m_tupleSet[i].m_parentId = tupleNotFound;
-    m_tupleSet[i].m_tupleId  = tupleNotFound;
-    m_tupleSet[i].m_hash_head = tupleNotFound;
-    m_tupleSet[i].m_skip = false;
-    m_tupleSet[i].m_hasMatchingChild.clear();
-  }
-}
-
-bool
-NdbResultStream::isAllSubScansComplete() const
-{ 
-  // Lookups should always be 'complete'
-  assert(m_subScanComplete || isScanResult());
-
-  if (!m_subScanComplete)
-    return false;
-
-  for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations(); 
-       childNo++)
-  {
-    const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
-    const NdbResultStream& childStream = m_rootFrag.getResultStream(child);
-    if (!childStream.isAllSubScansComplete())
-      return false;
-  }
-  return true;
-} //NdbResultStream::isAllSubScansComplete
-
-
-void
-NdbResultStream::setParentChildMap(Uint16 parentId,
-                                   Uint16 tupleId, 
-                                   Uint16 tupleNo)
-{
-  assert (isScanQuery());
-  assert (tupleNo < m_maxRows);
-  assert (tupleId != tupleNotFound);
-
-  for (Uint32 i = 0; i < tupleNo; i++)
-  {
-    // Check that tuple id is unique.
-    assert (m_tupleSet[i].m_tupleId != tupleId); 
-  }
-  m_tupleSet[tupleNo].m_parentId = parentId;
-  m_tupleSet[tupleNo].m_tupleId  = tupleId;
-
-  const Uint16 hash = (parentId % m_maxRows);
-  if (parentId == tupleNotFound)
-  {
-    /* Root stream: Insert sequentially in hash_next to make it
-     * possible to use ::findTupleWithParentId() and ::findNextTuple()
-     * to navigate even the root operation.
-     */
-    assert (m_parent==NULL);
-    /* Link into m_hash_next in order to let ::findNextTuple() navigate correctly */
-    if (tupleNo==0)
-      m_tupleSet[hash].m_hash_head  = 0;
-    else
-      m_tupleSet[tupleNo-1].m_hash_next  = tupleNo;
-    m_tupleSet[tupleNo].m_hash_next  = tupleNotFound;
-  }
-  else
-  {
-    /* Insert parentId in HashMap */
-    m_tupleSet[tupleNo].m_hash_next = m_tupleSet[hash].m_hash_head;
-    m_tupleSet[hash].m_hash_head  = tupleNo;
-  }
-}
-
 /** Locate, and return 'tupleNo', of first tuple with specified parentId.
  *  parentId == tupleNotFound is use as a special value for iterating results
- *  from the root operation in the order which they was inserted by ::setParentChildMap()
+ *  from the root operation in the order which they was inserted by 
+ *  ::buildResultCorrelations()
  *
  *  Position of 'currentRow' is *not* updated and should be modified by callee
  *  if it want to keep the new position.
@@ -843,99 +798,188 @@ NdbResultStream::nextResult()
 } //NdbResultStream::nextResult()
 
 
+/**
+ * Callback when a TRANSID_AI signal (receive row) is processed.
+ */
 void
-NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len)
+NdbResultStream::execTRANSID_AI(const Uint32 *ptr, Uint32 len,
+                                TupleCorrelation correlation)
 {
-  assert(m_iterState == Iter_notStarted);
+  m_receiver.execTRANSID_AI(ptr, len);
+  m_rowCount++;
+
   if (isScanQuery())
   {
-    const CorrelationData correlData(ptr, len);
-
-    assert(m_rootFrag.getResultStream(0).m_receiver.getId() ==
-           correlData.getRootReceiverId());
-
-    m_receiver.execTRANSID_AI(ptr, len - CorrelationData::wordCount);
-
     /**
-     * Keep correlation data between parent and child tuples.
-     * Since tuples may arrive in any order, we cannot match
-     * parent and child until all tuples (for this batch and 
-     * root fragment) have arrived.
+     * Store TupleCorrelation as hidden value imm. after received row
+     * (NdbQueryOperationImpl::getRowSize() has reserved space for it)
      */
-    setParentChildMap(m_parent==NULL
-                      ? tupleNotFound
-                      : correlData.getParentTupleId(),
-                      correlData.getTupleId(),
-                      m_rowCount);
+    Uint32* row_recv = reinterpret_cast<Uint32*>(m_receiver.m_record.m_row);
+    row_recv[-1] = correlation.toUint32();
   }
-  else
-  {
-    // Lookup query.
-    m_receiver.execTRANSID_AI(ptr, len);
-  }
-  m_rowCount++;
-  /* Set correct #rows received in the NdbReceiver.
-   */
-  getReceiver().m_result_rows = getRowCount();
 } // NdbResultStream::execTRANSID_AI()
 
-
 /**
- * A fresh batch of results has arrived for this ResultStream (and all its parent / childs)
- * Filter away any result rows which should not be visible (yet) - Either due to incomplete
- * child batches, or the join being an 'inner join'.
- * Set result itterator state to 'before first' resultrow.
+ * Make preparations for another batch of result to be read:
+ *  - Fill in parent/child result correlations in m_tupleSet[]
+ *  - ... or reset m_tupleSet[] if we reuse the previous.
+ *  - Apply inner/outer join filtering to remove non qualifying 
+ *    rows.
  */
-void 
-NdbResultStream::handleBatchComplete()
+bool 
+NdbResultStream::prepareResultSet()
 {
-  // Buffer overrun check.
-  assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck==0xacbd1234);
+  bool isComplete = isSubScanComplete();  //Childs with more rows
+  assert(isComplete || isScanResult());   //Lookups always 'complete'
+
+  // Set correct #rows received in the NdbReceiver.
+  getReceiver().m_result_rows = getRowCount();
 
-  for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+  /**
+   * Prepare NdbResultStream for reading - either the next received
+   * from datanodes or reuse current.
+   */
+  if (m_tupleSet!=NULL)
   {
-    m_tupleSet[tupleNo].m_skip = false;
+    const bool newResults = (m_iterState!=Iter_finished);
+    if (newResults)
+    {
+      buildResultCorrelations();
+    }
+    else
+    {
+      // Makes all rows in 'TupleSet' available (clear 'm_skip' flag)
+      for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+      {
+        m_tupleSet[tupleNo].m_skip = false;
+      }
+    }
   }
 
+  /**
+   * Recursively iterate all child results depth first. 
+   * Filter away any result rows which should not be visible (yet) - 
+   * Either due to incomplete child batches, or the join being an 'inner join'.
+   * Set result itterator state to 'before first' resultrow.
+   */
   for (Uint32 childNo=0; childNo < m_operation.getNoOfChildOperations(); childNo++)
   {
     const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
     NdbResultStream& childStream = m_rootFrag.getResultStream(child);
-    childStream.handleBatchComplete();
+    const bool allSubScansComplete = childStream.prepareResultSet();
+
+    Uint32 childId = child.getQueryOperationDef().getQueryOperationIx();
 
-    const bool isInnerJoin = childStream.isInnerJoin();
-    const bool allSubScansComplete = childStream.isAllSubScansComplete();
+    /* Condition 1) & 2) calc'ed outside loop, see comments further below: */
+    const bool skipNonMatches = !allSubScansComplete ||      // 1)
+                                childStream.isInnerJoin();   // 2)
 
-    for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+    if (m_tupleSet!=NULL)
     {
-      if (!m_tupleSet[tupleNo].m_skip)
+      for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
       {
-        Uint16 tupleId = getTupleId(tupleNo);
-        if (childStream.findTupleWithParentId(tupleId)!=tupleNotFound)
-          m_tupleSet[tupleNo].m_hasMatchingChild.set(childNo);
-
-        /////////////////////////////////
-        //  No child matched for this row. Making parent row visible
-        //  will cause a NULL (outer join) row to be produced.
-        //  Skip NULL row production when:
-        //    1) Some child batches are not complete; they may contain later matches.
-        //    2) A match was found in a previous batch.
-        //    3) Join type is 'inner join', skip as no child are matching.
-        //
-        else if (!allSubScansComplete                                 // 1)
-             ||  m_tupleSet[tupleNo].m_hasMatchingChild.get(childNo)  // 2)
-             ||  isInnerJoin)                                         // 3)
-          m_tupleSet[tupleNo].m_skip = true;
+        if (!m_tupleSet[tupleNo].m_skip)
+        {
+          Uint16 tupleId = getTupleId(tupleNo);
+          if (childStream.findTupleWithParentId(tupleId)!=tupleNotFound)
+            m_tupleSet[tupleNo].m_hasMatchingChild.set(childId);
+
+          /////////////////////////////////
+          //  No child matched for this row. Making parent row visible
+          //  will cause a NULL (outer join) row to be produced.
+          //  Skip NULL row production when:
+          //    1) Some child batches are not complete; they may contain later matches.
+          //    2) Join type is 'inner join', skip as no child are matching.
+          //    3) A match was found in a previous batch.
+          //  Condition 1) & 2) above is precalculated in 'bool skipNonMatches'
+          //
+          else if (skipNonMatches                                       // 1 & 2)
+               ||  m_tupleSet[tupleNo].m_hasMatchingChild.get(childId)) // 3)
+            m_tupleSet[tupleNo].m_skip = true;
+        }
       }
     }
+    isComplete &= allSubScansComplete;
   }
-  m_currentRow = tupleNotFound;
+
+  // Set current position 'before first'
   m_iterState = Iter_notStarted;
-} // NdbResultStream::handleBatchComplete()
+  m_currentRow = tupleNotFound;
+
+  return isComplete; 
+} // NdbResultStream::prepareResultSet()
+
+
+/**
+ * Fill m_tupleSet[] with correlation data between parent 
+ * and child tuples. The 'TupleCorrelation' is stored as
+ * and extra Uint32 after each row received
+ * by execTRANSID_AI().
+ *
+ * NOTE: In order to reduce work done when holding the 
+ * transporter mutex, the 'TupleCorrelation' is only stored
+ * in the buffer when it arrives. Later (here) we build the
+ * correlation hashMap immediately before we prepare to 
+ * read the NdbResultStream.
+ */
+void 
+NdbResultStream::buildResultCorrelations()
+{
+  // Buffer overrun check.
+  assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck==0xacbd1234);
+
+//if (m_tupleSet!=NULL)
+  {
+    /* Clear the hashmap structures */
+    for (Uint32 i=0; i<m_maxRows; i++)
+    {
+      m_tupleSet[i].m_hash_head = tupleNotFound;
+    }
+
+    /* Rebuild correlation & hashmap from received buffers */
+    for (Uint32 tupleNo=0; tupleNo<m_rowCount; tupleNo++)
+    {
+      const Uint32* row = (Uint32*)getRow(tupleNo+1);
+      const TupleCorrelation correlation(row[-1]);
+
+      const Uint16 tupleId  = correlation.getTupleId();
+      const Uint16 parentId = (m_parent!=NULL) 
+                                ? correlation.getParentTupleId()
+                                : tupleNotFound;
+
+      m_tupleSet[tupleNo].m_skip     = false;
+      m_tupleSet[tupleNo].m_parentId = parentId;
+      m_tupleSet[tupleNo].m_tupleId  = tupleId;
+      m_tupleSet[tupleNo].m_hasMatchingChild.clear();
+
+      /* Insert into parentId-hashmap */
+      const Uint16 hash = (parentId % m_maxRows);
+      if (m_parent==NULL)
+      {
+        /* Root stream: Insert sequentially in hash_next to make it
+         * possible to use ::findTupleWithParentId() and ::findNextTuple()
+         * to navigate even the root operation.
+         */
+        /* Link into m_hash_next in order to let ::findNextTuple() navigate correctly */
+        if (tupleNo==0)
+          m_tupleSet[hash].m_hash_head  = tupleNo;
+        else
+          m_tupleSet[tupleNo-1].m_hash_next  = tupleNo;
+        m_tupleSet[tupleNo].m_hash_next  = tupleNotFound;
+      }
+      else
+      {
+        /* Insert parentId in HashMap */
+        m_tupleSet[tupleNo].m_hash_next = m_tupleSet[hash].m_hash_head;
+        m_tupleSet[hash].m_hash_head  = tupleNo;
+      }
+    }
+  }
+} // NdbResultStream::buildResultCorrelations
 
 
 ///////////////////////////////////////////
-/////////  NdbRootFragment methods ///////////
+////////  NdbRootFragment methods /////////
 ///////////////////////////////////////////
 void NdbRootFragment::buildReciverIdMap(NdbRootFragment* frags, 
                                         Uint32 noOfFrags)
@@ -1056,6 +1100,16 @@ void NdbRootFragment::reset()
   m_confReceived = false;
 }
 
+void NdbRootFragment::prepareResultSet()
+{
+  NdbResultStream& rootStream = getResultStream(0);
+  rootStream.prepareResultSet();  
+
+  /* Position at the first (sorted?) row available from this fragments.
+   */
+  rootStream.firstResult();
+}
+
 void NdbRootFragment::setConfReceived()
 { 
   /* For a query with a lookup root, there may be more than one TCKEYCONF
@@ -2120,6 +2174,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
         NdbRootFragment* frag;
         while ((frag=m_fullFrags.pop()) != NULL)
         {
+          frag->prepareResultSet();
           m_applFrags.add(*frag);
         }
         if (m_applFrags.getCurrent() != NULL)
@@ -2174,6 +2229,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
     NdbRootFragment* frag;
     if ((frag=m_fullFrags.pop()) != NULL)
     {
+      frag->prepareResultSet();
       m_applFrags.add(*frag);
     }
     assert(m_fullFrags.pop()==NULL); // Only one stream for lookups.
@@ -2213,7 +2269,6 @@ NdbQueryImpl::handleBatchComplete(NdbRoo
            << ", finalBatchFrags=" << m_finalBatchFrags
            <<  endl;
   }
-  bool resume = false;
 
   /* May received fragment data after a SCANREF() (timeout?) 
    * terminated the scan.  We are about to close this query, 
@@ -2221,7 +2276,6 @@ NdbQueryImpl::handleBatchComplete(NdbRoo
    */
   if (likely(m_fullFrags.m_errorCode == 0))
   {
-    NdbQueryOperationImpl& root = getRoot();
     assert(rootFrag.isFragBatchComplete());
 
     assert(m_pendingFrags > 0);                // Check against underflow.
@@ -2234,34 +2288,14 @@ NdbQueryImpl::handleBatchComplete(NdbRoo
       assert(m_finalBatchFrags <= m_rootFragCount);
     }
 
-    if (getQueryDef().isScanQuery())
-    {
-      // Only required for scans
-      rootFrag.getResultStream(root).handleBatchComplete();  
-
-      // Only ordered scans has to wait until all pending completed
-      resume = (m_pendingFrags==0) ||
-               (root.m_ordering==NdbQueryOptions::ScanOrdering_unordered);
-    }
-    else
-    {
-      assert(rootFrag.getResultStream(root).getReceiver().m_tcPtrI==RNIL);
-      assert(m_finalBatchFrags==1);
-      assert(m_pendingFrags==0);  // Lookup query should be complete now.
-      resume = true;   
-    }
-
-    /* Position at the first (sorted?) row available from this fragments.
-     */
-    rootFrag.getResultStream(root).firstResult();
-
     /* When application thread ::awaitMoreResults() it will later be moved
      * from m_fullFrags to m_applFrags under mutex protection.
      */
     m_fullFrags.push(rootFrag);
+    return true;
   }
 
-  return resume;
+  return false;
 } // NdbQueryImpl::handleBatchComplete
 
 int
@@ -4619,10 +4653,12 @@ NdbQueryOperationImpl::prepareLookupKeyI
 bool 
 NdbQueryOperationImpl::execTRANSID_AI(const Uint32* ptr, Uint32 len)
 {
+  TupleCorrelation tupleCorrelation;
   NdbRootFragment* rootFrag = m_queryImpl.m_rootFrags;
-  Uint32 rootFragNo = 0;
+
   if (getQueryDef().isScanQuery())
   {
+    const CorrelationData correlData(ptr, len);
     const Uint32 receiverId = CorrelationData(ptr, len).getRootReceiverId();
     
     /** receiverId holds the Id of the receiver of the corresponding stream
@@ -4638,18 +4674,21 @@ NdbQueryOperationImpl::execTRANSID_AI(co
       assert(false);
       return false;
     }
-    rootFragNo = rootFrag->getFragNo();
+
+    // Extract tuple correlation.
+    tupleCorrelation = correlData.getTupleCorrelation();
+    len -= CorrelationData::wordCount;
   }
+
   if (traceSignals) {
     ndbout << "NdbQueryOperationImpl::execTRANSID_AI()" 
            << ", operation no: " << getQueryOperationDef().getQueryOperationIx()
-           << ", fragment no: " << rootFragNo
+           << ", fragment no: " << rootFrag->getFragNo()
            << endl;
   }
 
   // Process result values.
-  rootFrag->getResultStream(*this).execTRANSID_AI(ptr, len);
-
+  rootFrag->getResultStream(*this).execTRANSID_AI(ptr, len, tupleCorrelation);
   rootFrag->incrOutstandingResults(-1);
 
   bool ret = false;
@@ -4985,6 +5024,12 @@ Uint32 NdbQueryOperationImpl::getRowSize
   {
     m_rowSize = 
       NdbReceiver::ndbrecord_rowsize(m_ndbRecord, m_firstRecAttr, 0, false);
+
+    const bool withCorrelation = getRoot().getQueryDef().isScanQuery();
+    if (withCorrelation)
+    {
+      m_rowSize += TupleCorrelation::wordCount*sizeof(Uint32);
+    }
   }
   return m_rowSize;
 }

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:4455 to 4456) Ole John Aske17 Aug