List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:August 17 2011 1:28pm
Subject:bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch
(ole.john.aske:3540 to 3541)
View as plain text  
 3541 Ole John Aske	2011-08-17 [merge]
      Merge 70 -> 70-spj

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
 3540 Ole John Aske	2011-08-17 [merge]
      Merge

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-17 13:09:14 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-17 13:27:04 +0000
@@ -187,9 +187,9 @@ public:
   { return m_fragNo; }
 
   /**
-   * Prepare for receiving another batch.
+   * Prepare for receiving another batch of results.
    */
-  void reset();
+  void prepareNextReceiveSet();
 
   /**
    * Prepare for reading another batch of results.
@@ -403,7 +403,7 @@ public:
   void prepare();
 
   /** Prepare for receiving next batch of scan results. */
-  void reset();
+  void prepareNextReceiveSet();
     
   NdbReceiver& getReceiver()
   { return m_receiver; }
@@ -738,27 +738,6 @@ NdbResultStream::prepare()
                           m_resultSets[m_recv].m_buffer);
 } //NdbResultStream::prepare
 
-void
-NdbResultStream::reset()
-{
-  assert (isScanQuery());
-
-  m_iterState = Iter_notStarted;
-  m_currentRow = tupleNotFound;
-  m_resultSets[m_recv].prepareReceive(m_receiver);
-
-  /**
-   * If this stream will get new rows in the next batch, then so will
-   * all of its descendants.
-   */
-  for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations();
-       childNo++)
-  {
-    NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
-    m_rootFrag.getResultStream(child).reset();
-  }
-} //NdbResultStream::reset
-
 /** Locate, and return 'tupleNo', of first tuple with specified parentId.
  *  parentId == tupleNotFound is use as a special value for iterating results
  *  from the root operation in the order which they was inserted by 
@@ -851,7 +830,6 @@ NdbResultStream::firstResult()
   return tupleNotFound;
 } //NdbResultStream::firstResult()
 
-
 Uint16
 NdbResultStream::nextResult()
 {
@@ -867,7 +845,6 @@ NdbResultStream::nextResult()
   return tupleNotFound;
 } //NdbResultStream::nextResult()
 
-
 /**
  * Callback when a TRANSID_AI signal (receive row) is processed.
  */
@@ -891,6 +868,32 @@ NdbResultStream::execTRANSID_AI(const Ui
 } // NdbResultStream::execTRANSID_AI()
 
 /**
+ * Make preparation for another batch of results to be received.
+ * This NdbResultStream, and all its sibling will receive a batch
+ * of results from the datanodes.
+ */
+void
+NdbResultStream::prepareNextReceiveSet()
+{
+  assert (isScanQuery());
+
+  m_iterState = Iter_notStarted;
+  m_currentRow = tupleNotFound;
+  m_resultSets[m_recv].prepareReceive(m_receiver);
+
+  /**
+   * If this stream will get new rows in the next batch, then so will
+   * all of its descendants.
+   */
+  for (Uint32 childNo = 0; childNo < m_operation.getNoOfChildOperations();
+       childNo++)
+  {
+    NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
+    m_rootFrag.getResultStream(child).prepareNextReceiveSet();
+  }
+} //NdbResultStream::prepareNextReceiveSet
+
+/**
  * Make preparations for another batch of result to be read:
  *  - Fill in parent/child result correlations in m_tupleSet[]
  *  - ... or reset m_tupleSet[] if we reuse the previous.
@@ -911,7 +914,8 @@ NdbResultStream::prepareResultSet(Uint32
 
   /**
    * Prepare NdbResultSet for reading - either the next received
-   * from datanodes or reuse current.
+   * from datanodes or reuse the last as has been determined by 
+   * ::prepareNextReceiveSet()
    */
   if (m_tupleSet!=NULL)
   {
@@ -1210,7 +1214,7 @@ bool NdbRootFragment::hasReceivedMore() 
   return (m_availResultSets > 0);
 }
 
-void NdbRootFragment::reset()
+void NdbRootFragment::prepareNextReceiveSet()
 {
   assert(m_fragNo!=voidFragNo);
   assert(m_outstandingResults == 0);
@@ -1218,13 +1222,14 @@ void NdbRootFragment::reset()
 
   for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++) 
   {
-    if (!m_resultStreams[opNo].isSubScanComplete(m_remainingScans))
+    NdbResultStream& resultStream = getResultStream(opNo);
+    if (!resultStream.isSubScanComplete(m_remainingScans))
     {
       /**
-       * Reset m_resultStreams[] and all its descendants, since all these
+       * Reset resultStream and all its descendants, since all these
        * streams will get a new set of rows in the next batch.
        */ 
-      m_resultStreams[opNo].reset();
+      resultStream.prepareNextReceiveSet();
     }
   }
   m_confReceived = false;
@@ -3194,7 +3199,7 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
     NdbRootFragment* rootFrag = rootFrags[i];
     assert(rootFrag->isFragBatchComplete());
     assert(!rootFrag->finalBatchReceived());
-    rootFrag->reset();
+    rootFrag->prepareNextReceiveSet();
   }
 
   Ndb& ndb = *getNdbTransaction().getNdb();
@@ -3383,22 +3388,27 @@ int NdbQueryImpl::isPrunable(bool& pruna
 NdbQueryImpl::OrderedFragSet::OrderedFragSet():
   m_capacity(0),
   m_activeFragCount(0),
-  m_emptiedFragCount(0),
+  m_fetchMoreFragCount(0),
   m_finalFragCount(0),
   m_ordering(NdbQueryOptions::ScanOrdering_void),
   m_keyRecord(NULL),
   m_resultRecord(NULL),
   m_activeFrags(NULL),
-  m_emptiedFrags(NULL)
+  m_fetchMoreFrags(NULL)
 {
 }
 
 NdbQueryImpl::OrderedFragSet::~OrderedFragSet() 
 { 
   m_activeFrags = NULL;
-  m_emptiedFrags= NULL;
+  m_fetchMoreFrags = NULL;
 }
 
+void NdbQueryImpl::OrderedFragSet::clear() 
+{ 
+  m_activeFragCount = 0;
+  m_fetchMoreFragCount = 0; 
+}
 
 void
 NdbQueryImpl::OrderedFragSet::prepare(NdbBulkAllocator& allocator,
@@ -3419,9 +3429,9 @@ NdbQueryImpl::OrderedFragSet::prepare(Nd
       reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity)); 
     bzero(m_activeFrags, capacity * sizeof(NdbRootFragment*));
 
-    m_emptiedFrags = 
+    m_fetchMoreFrags = 
       reinterpret_cast<NdbRootFragment**>(allocator.allocObjMem(capacity));
-    bzero(m_emptiedFrags, capacity * sizeof(NdbRootFragment*));
+    bzero(m_fetchMoreFrags, capacity * sizeof(NdbRootFragment*));
   }
   m_ordering = ordering;
   m_keyRecord = keyRecord;
@@ -3486,10 +3496,10 @@ NdbQueryImpl::OrderedFragSet::reorganize
     }
     else
     {
-      m_emptiedFrags[m_emptiedFragCount++] = frag;
+      m_fetchMoreFrags[m_fetchMoreFragCount++] = frag;
     }
     m_activeFragCount--;
-    assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount 
+    assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount 
            <= m_capacity);
 
     return;  // Remaining m_activeFrags[] are sorted
@@ -3539,7 +3549,7 @@ NdbQueryImpl::OrderedFragSet::reorganize
     }
     assert(verifySortOrder());
   }
-  assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount 
+  assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount 
          <= m_capacity);
 } // OrderedFragSet::reorganize()
 
@@ -3565,23 +3575,16 @@ NdbQueryImpl::OrderedFragSet::prepareMor
     }
   } // for all 'rootFrags[]'
 
-  assert(m_activeFragCount + m_emptiedFragCount + m_finalFragCount 
+  assert(m_activeFragCount + m_fetchMoreFragCount + m_finalFragCount 
          <= m_capacity);
 } // OrderedFragSet::prepareMoreResults()
 
-void NdbQueryImpl::OrderedFragSet::clear() 
-{ 
-  m_activeFragCount = 0;
-  m_emptiedFragCount = 0; 
-  m_finalFragCount = 0;
-}
-
 Uint32 
 NdbQueryImpl::OrderedFragSet::getFetchMore(NdbRootFragment** &frags)
 {
-  const Uint32 cnt = m_emptiedFragCount;
-  frags = m_emptiedFrags;
-  m_emptiedFragCount = 0;
+  const int cnt = m_fetchMoreFragCount;
+  frags = m_fetchMoreFrags;
+  m_fetchMoreFragCount = 0;
   return cnt;
 }
 

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-08-17 12:06:04 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-08-17 13:16:22 +0000
@@ -333,14 +333,16 @@ private:
 
   private:
 
-    /** Max no of fragments.*/
+    /** No of fragments to read until '::finalBatchReceived()'.*/
     int m_capacity;
     /** Number of fragments in 'm_activeFrags'.*/
     int m_activeFragCount;
-    /** Number of fragments in 'm_emptiedFrags'. */
-    int m_emptiedFragCount;
-    /** Number of fragments where the final batch has been received
-     * and consumed.*/
+    /** Number of fragments in 'm_fetchMoreFrags'. */
+    int m_fetchMoreFragCount;
+    /**
+     * Number of fragments where the final batch has been received
+     * and consumed.
+     */
     int m_finalFragCount;
     /** Ordering of index scan result.*/
     NdbQueryOptions::ScanOrdering m_ordering;
@@ -348,15 +350,18 @@ private:
     const NdbRecord* m_keyRecord;
     /** Needed for comparing records when ordering results.*/
     const NdbRecord* m_resultRecord;
-    /** Fragments where some tuples in the current batch has not yet been 
-     * consumed.*/
+    /**
+     * Fragments where some tuples in the current ResultSet has not 
+     * yet been consumed.
+     */
     NdbRootFragment** m_activeFrags;
-    /** Fragments where all tuples in the current batch have been consumed, 
-     * but where there are more batches to fetch.*/
-    NdbRootFragment** m_emptiedFrags;
-    // No copying.
-    OrderedFragSet(const OrderedFragSet&);
-    OrderedFragSet& operator=(const OrderedFragSet&);
+    /**
+     * Fragments from which we should request more ResultSets.
+     * Either due to the current batch has been consumed, or double buffering
+     * of result sets allows us to request another batch before the current
+     * has been consumed.
+     */
+    NdbRootFragment** m_fetchMoreFrags;
 
     /** Add a complete fragment that has been received.*/
     void add(NdbRootFragment& frag);
@@ -369,6 +374,10 @@ private:
 
     /** For debugging purposes.*/
     bool verifySortOrder() const;
+
+    // No copying.
+    OrderedFragSet(const OrderedFragSet&);
+    OrderedFragSet& operator=(const OrderedFragSet&);
   }; // class OrderedFragSet
 
   /** The interface that is visible to the application developer.*/

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3540 to 3541) Ole John Aske22 Aug