List: Commits
From: Ole John Aske
Date: August 17 2011 1:03pm
Subject: bzr push into mysql-5.1-telco-7.0 branch (ole.john.aske:4464 to 4465)
 4465 Ole John Aske	2011-08-17
      SPJ: Introduce the helper class NdbResultSet, which manages the
      receive/read buffer for class NdbResultStream.
            
      The intention is to later change class NdbResultStream to have
      multiple NdbResultSets -> double buffered results. (A rough sketch
      of the idea follows the file list below.)

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
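
      For illustration, a minimal, self-contained sketch of the idea follows,
      assuming plain C++ and std::vector in place of NDB's own allocators and
      row buffer handling. The 'Sketch*' class names and the flip() helper are
      hypothetical; init(), prepareReceive(), getRow(), m_read and m_recv
      mirror the diff further down. In this revision only a one-element
      m_resultSets[] exists, so prepareResultSet() simply does 'm_read = m_recv'.

      // Minimal sketch of the NdbResultSet idea; not the actual patch code.
      #include <vector>

      typedef unsigned int Uint32;

      class SketchResultSet
      {
      public:
        SketchResultSet() : m_rowSize(0), m_rowCount(0) {}

        // The real init() also takes the query, for its row buffer allocator.
        void init(Uint32 maxRows, Uint32 rowSize)
        {
          m_rowSize = rowSize;
          m_buffer.resize(maxRows * rowSize);
        }

        // Reset before a new batch is received into this set.
        void prepareReceive()
        { m_rowCount = 0; }

        char* getRow(Uint32 tupleNo)
        { return &m_buffer[tupleNo * m_rowSize]; }

        Uint32 getRowCount() const
        { return m_rowCount; }

        // Stands in for the row count bump done in execTRANSID_AI().
        void addRow()
        { m_rowCount++; }

      private:
        std::vector<char> m_buffer;   // rows are received into and read from this buffer
        Uint32 m_rowSize;
        Uint32 m_rowCount;
      };

      // Sketch of the intended double buffering: receive the next batch into
      // one set while the previous batch is still being read from the other.
      class SketchResultStream
      {
      public:
        SketchResultStream() : m_read(0), m_recv(1) {}

        SketchResultSet& recvSet() { return m_resultSets[m_recv]; }
        SketchResultSet& readSet() { return m_resultSets[m_read]; }

        // When a received batch is complete, start reading it and let the
        // other set be reused for receiving. (The single-buffered patch below
        // simply does 'm_read = m_recv' on its one-element array instead.)
        void flip()
        {
          const Uint32 tmp = m_read;
          m_read = m_recv;
          m_recv = tmp;
          m_resultSets[m_recv].prepareReceive();
        }

      private:
        SketchResultSet m_resultSets[2];
        Uint32 m_read;   // we read from m_resultSets[m_read]
        Uint32 m_recv;   // we receive into m_resultSets[m_recv]
      };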
 4464 Ole John Aske	2011-08-17
      SPJ extensions: In preparation for implementing handling of double buffered result
      sets in class NdbReceiver, extend NdbReceiver with two new methods which set the
      buffer positions for, respectively, receiving result rows into and reading result
      rows from the NdbReceiver.
            
      NOTE: Separate receive and read positions already existed
      as part of the NdbReceiver - this patch only extends the
      interface so that they can be set from the outside. (A rough
      sketch of the two methods follows the file list below.)

    modified:
      storage/ndb/include/ndbapi/NdbReceiver.hpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbReceiver.cpp
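
      The two methods can be seen at their call sites in the diff for revision
      4465 below, as receiver.prepareReceive(buffer) and
      receiver.prepareRead(buffer, rowCount). Here is a rough sketch of what
      such setters amount to; the signatures are taken from those call sites,
      and all member names other than m_result_rows (which the diff shows
      exists in NdbReceiver) are assumptions.

      typedef unsigned int Uint32;

      // Sketch only: not the actual NdbReceiver implementation.
      class SketchReceiver
      {
      public:
        SketchReceiver()
          : m_recv_buffer(0), m_read_buffer(0),
            m_result_rows(0), m_current_row(0)
        {}

        // Set the receive position: the buffer the next batch of rows
        // is unpacked into.
        void prepareReceive(char* buf)
        {
          m_recv_buffer = buf;
        }

        // Set the read position: the buffer, and the number of rows in it,
        // that result rows are subsequently read from.
        void prepareRead(char* buf, Uint32 rowCount)
        {
          m_read_buffer = buf;
          m_result_rows = rowCount;
          m_current_row = 0;
        }

      private:
        char*  m_recv_buffer;   // receive position (hypothetical name)
        char*  m_read_buffer;   // read position (hypothetical name)
        Uint32 m_result_rows;   // #rows available for reading
        Uint32 m_current_row;   // next row handed out to the reader
      };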
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-17 12:36:56 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-17 13:03:23 +0000
@@ -325,6 +325,55 @@ private:
   int m_idMapNext;
 }; //NdbRootFragment
 
+/**
+ * 'class NdbResultSet' is a helper for 'class NdbResultStream'.
+ *  It manages the buffers which rows are received into and
+ *  read from.
+ */
+class NdbResultSet
+{
+  friend class NdbResultStream;
+
+public:
+  explicit NdbResultSet();
+
+  void init(NdbQueryImpl& query,
+            Uint32 maxRows, Uint32 rowSize);
+
+  void prepareReceive(NdbReceiver& receiver)
+  {
+    m_rowCount = 0;
+    receiver.prepareReceive(m_buffer);
+  }
+
+  void prepareRead(NdbReceiver& receiver)
+  {
+    receiver.prepareRead(m_buffer,m_rowCount);
+  }
+
+  Uint32 getRowCount() const
+  { return m_rowCount; }
+
+  char* getRow(Uint32 tupleNo) const
+  { return (m_buffer + (tupleNo*m_rowSize)); }
+
+private:
+  /** No copying.*/
+  NdbResultSet(const NdbResultSet&);
+  NdbResultSet& operator=(const NdbResultSet&);
+
+  /** The buffers which we receive the results into */
+  char* m_buffer;
+
+  /** Used for checking if buffer overrun occurred. */
+  Uint32* m_batchOverflowCheck;
+
+  Uint32 m_rowSize;
+
+  /** The current #rows in 'm_buffer'.*/
+  Uint32 m_rowCount;
+
+}; // class NdbResultSet
 
 /** 
  * This class manages the subset of result data for one operation that is 
@@ -362,11 +411,8 @@ public:
   const NdbReceiver& getReceiver() const
   { return m_receiver; }
 
-  Uint32 getRowCount() const
-  { return m_rowCount; }
-
-  char* getRow(Uint32 tupleNo) const
-  { return (m_buffer + (tupleNo*m_rowSize)); }
+  const char* getCurrentRow() const
+  { return m_receiver.peek_row(); }
 
   /**
    * Process an incomming tuple for this stream. Extract parent and own tuple 
@@ -386,9 +432,9 @@ public:
   bool prepareResultSet(Uint32 remainingScans);
 
   /**
-   * Navigate within the current result batch to resp. first and next row.
+   * Navigate within the current ResultSet to resp. first and next row.
    * For non-parent operations in the pushed query, navigation is with respect
-   * to any preceding parents which results in this NdbResultStream depends on.
+   * to any preceding parents which results in this ResultSet depends on.
    * Returns either the tupleNo within TupleSet[] which we navigated to, or 
    * tupleNotFound().
    */
@@ -504,25 +550,22 @@ private:
   /** The receiver object that unpacks transid_AI messages.*/
   NdbReceiver m_receiver;
 
-  /** The buffers which we receive the results into */
-  char* m_buffer;
-
-  /** Used for checking if buffer overrun occurred. */
-  Uint32* m_batchOverflowCheck;
-
-  Uint32 m_rowSize;
-
-  /** The number of transid_AI messages received.*/
-  Uint32 m_rowCount;
+  /**
+   * The ResultSets which rows are received into and read from by this stream.
+   * Intended to be extended into double buffered ResultSets later.
+   */
+  NdbResultSet m_resultSets[1];
+  Uint32 m_read;  // We read from m_resultSets[m_read]
+  Uint32 m_recv;  // We receive into m_resultSets[m_recv]
 
   /** This is the state of the iterator used by firstResult(), nextResult().*/
   enum
   {
     /** The first row has not been fetched yet.*/
     Iter_notStarted,
-    /** Is iterating the resultset, (implies 'm_currentRow!=tupleNotFound').*/
+    /** Is iterating the ResultSet, (implies 'm_currentRow!=tupleNotFound').*/
     Iter_started,  
-    /** Last row for current batch has been returned.*/
+    /** Last row for current ResultSet has been returned.*/
     Iter_finished
   } m_iterState;
 
@@ -600,6 +643,34 @@ void* NdbBulkAllocator::allocObjMem(Uint
   return m_nextObjNo > m_maxObjs ? NULL : result;
 }
 
+///////////////////////////////////////////
+/////////  NdbResultSet methods ///////////
+///////////////////////////////////////////
+NdbResultSet::NdbResultSet() :
+  m_buffer(NULL),
+  m_batchOverflowCheck(NULL),
+  m_rowSize(0),
+  m_rowCount(0)
+{}
+
+void
+NdbResultSet::init(NdbQueryImpl& query,
+                   Uint32 maxRows,
+                   Uint32 rowSize)
+{
+  m_rowSize = rowSize;
+  {
+    const int bufferSize = rowSize * maxRows;
+    NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
+    m_buffer = reinterpret_cast<char*>(bufferAlloc.allocObjMem(bufferSize));
+
+    // So that we can test for buffer overrun.
+    m_batchOverflowCheck = 
+      reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
+    *m_batchOverflowCheck = 0xacbd1234;
+  }
+}
+
 //////////////////////////////////////////////
 /////////  NdbResultStream methods ///////////
 //////////////////////////////////////////////
@@ -621,10 +692,7 @@ NdbResultStream::NdbResultStream(NdbQuer
      | (operation.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll
        ? Is_Inner_Join : 0))),
   m_receiver(operation.getQuery().getNdbTransaction().getNdb()),
-  m_buffer(NULL),
-  m_batchOverflowCheck(NULL),
-  m_rowSize(0),
-  m_rowCount(0),
+  m_resultSets(), m_read(0xffffffff), m_recv(0),
   m_iterState(Iter_notStarted),
   m_currentRow(tupleNotFound),
   m_maxRows(0),
@@ -645,12 +713,8 @@ NdbResultStream::prepare()
   const Uint32 rowSize = m_operation.getRowSize();
   NdbQueryImpl &query = m_operation.getQuery();
 
-  m_rowSize = rowSize;
-
   /* Parent / child correlation is only relevant for scan type queries
-   * Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups!
-   * Neither is these structures required for operations not having respective
-   * child or parent operations.
+   * Don't create a m_tupleSet with these correlation id's for lookups!
    */
   if (isScanQuery())
   {
@@ -662,14 +726,7 @@ NdbResultStream::prepare()
   else
     m_maxRows = 1;
 
-  const int bufferSize = rowSize * m_maxRows;
-  NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
-  m_buffer = reinterpret_cast<char*>(bufferAlloc.allocObjMem(bufferSize));
-
-  // So that we can test for buffer overrun.
-  m_batchOverflowCheck = 
-    reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
-  *m_batchOverflowCheck = 0xacbd1234;
+  m_resultSets[0].init(query, m_maxRows, rowSize);
 
   m_receiver.init(NdbReceiver::NDB_QUERY_OPERATION, false, &m_operation);
   m_receiver.do_setup_ndbrecord(
@@ -678,7 +735,7 @@ NdbResultStream::prepare()
                           0 /*key_size*/, 
                           0 /*read_range_no*/, 
                           rowSize,
-                          m_buffer);
+                          m_resultSets[m_recv].m_buffer);
 } //NdbResultStream::prepare
 
 void
@@ -686,12 +743,10 @@ NdbResultStream::reset()
 {
   assert (isScanQuery());
 
-  // Root scan-operation need a ScanTabConf to complete
-  m_rowCount = 0;
   m_iterState = Iter_notStarted;
   m_currentRow = tupleNotFound;
+  m_resultSets[m_recv].prepareReceive(m_receiver);
 
-  m_receiver.prepareSend();
   /**
    * If this stream will get new rows in the next batch, then so will
    * all of its descendants.
@@ -717,11 +772,11 @@ NdbResultStream::findTupleWithParentId(U
 {
   assert ((parentId==tupleNotFound) == (m_parent==NULL));
 
-  if (likely(m_rowCount>0))
+  if (likely(m_resultSets[m_read].m_rowCount>0))
   {
     if (m_tupleSet==NULL)
     {
-      assert (m_rowCount <= 1);
+      assert (m_resultSets[m_read].m_rowCount <= 1);
       return 0;
     }
 
@@ -788,7 +843,7 @@ NdbResultStream::firstResult()
   if ((m_currentRow=findTupleWithParentId(parentId)) != tupleNotFound)
   {
     m_iterState = Iter_started;
-    m_receiver.setCurrentRow(m_buffer, m_currentRow);
+    m_receiver.setCurrentRow(m_resultSets[m_read].m_buffer, m_currentRow);
     return m_currentRow;
   }
 
@@ -805,7 +860,7 @@ NdbResultStream::nextResult()
       (m_currentRow=findNextTuple(m_currentRow)) != tupleNotFound)
   {
     m_iterState = Iter_started;
-    m_receiver.setCurrentRow(m_buffer, m_currentRow);
+    m_receiver.setCurrentRow(m_resultSets[m_read].m_buffer, m_currentRow);
     return m_currentRow;
   }
   m_iterState = Iter_finished;
@@ -821,7 +876,7 @@ NdbResultStream::execTRANSID_AI(const Ui
                                 TupleCorrelation correlation)
 {
   m_receiver.execTRANSID_AI(ptr, len);
-  m_rowCount++;
+  m_resultSets[m_recv].m_rowCount++;
 
   if (isScanQuery())
   {
@@ -848,11 +903,14 @@ NdbResultStream::prepareResultSet(Uint32
   bool isComplete = isSubScanComplete(remainingScans); //Childs with more rows
   assert(isComplete || isScanResult());                //Lookups always 'complete'
 
-  // Set correct #rows received in the NdbReceiver.
-  getReceiver().m_result_rows = getRowCount();
+  m_read = m_recv;
+  NdbResultSet& readResult = m_resultSets[m_read];
+
+  // Set correct buffer and #rows received by this ResultSet.
+  readResult.prepareRead(m_receiver);
 
   /**
-   * Prepare NdbResultStream for reading - either the next received
+   * Prepare NdbResultSet for reading - either the next received
    * from datanodes or reuse current.
    */
   if (m_tupleSet!=NULL)
@@ -865,7 +923,7 @@ NdbResultStream::prepareResultSet(Uint32
     else
     {
       // Makes all rows in 'TupleSet' available (clear 'm_skip' flag)
-      for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+      for (Uint32 tupleNo=0; tupleNo<readResult.getRowCount(); tupleNo++)
       {
         m_tupleSet[tupleNo].m_skip = false;
       }
@@ -892,7 +950,7 @@ NdbResultStream::prepareResultSet(Uint32
 
     if (m_tupleSet!=NULL)
     {
-      for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
+      for (Uint32 tupleNo=0; tupleNo<readResult.getRowCount(); tupleNo++)
       {
         if (!m_tupleSet[tupleNo].m_skip)
         {
@@ -929,20 +987,23 @@ NdbResultStream::prepareResultSet(Uint32
 /**
  * Fill m_tupleSet[] with correlation data between parent 
  * and child tuples. The 'TupleCorrelation' is stored as
- * and extra Uint32 after each row received
+ * an extra Uint32 after each row in the NdbResultSet
  * by execTRANSID_AI().
  *
  * NOTE: In order to reduce work done when holding the 
  * transporter mutex, the 'TupleCorrelation' is only stored
  * in the buffer when it arrives. Later (here) we build the
  * correlation hashMap immediately before we prepare to 
- * read the NdbResultStream.
+ * read the NdbResultSet.
  */
 void 
 NdbResultStream::buildResultCorrelations()
 {
+  const NdbResultSet& readResult = m_resultSets[m_read];
+
   // Buffer overrun check.
-  assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck==0xacbd1234);
+  assert(readResult.m_batchOverflowCheck==NULL || 
+         *readResult.m_batchOverflowCheck==0xacbd1234);
 
 //if (m_tupleSet!=NULL)
   {
@@ -952,10 +1013,10 @@ NdbResultStream::buildResultCorrelations
       m_tupleSet[i].m_hash_head = tupleNotFound;
     }
 
-    /* Rebuild correlation & hashmap from received buffers */
-    for (Uint32 tupleNo=0; tupleNo<m_rowCount; tupleNo++)
+    /* Rebuild correlation & hashmap from 'readResult' */
+    for (Uint32 tupleNo=0; tupleNo<readResult.m_rowCount; tupleNo++)
     {
-      const Uint32* row = (Uint32*)getRow(tupleNo+1);
+      const Uint32* row = (Uint32*)readResult.getRow(tupleNo+1);
       const TupleCorrelation correlation(row[-1]);
 
       const Uint16 tupleId  = correlation.getTupleId();
@@ -3941,7 +4002,7 @@ NdbQueryOperationImpl::nextResult(bool f
 void 
 NdbQueryOperationImpl::fetchRow(NdbResultStream& resultStream)
 {
-  const char* buff = resultStream.getReceiver().peek_row();
+  const char* buff = resultStream.getCurrentRow();
   assert(buff!=NULL || (m_firstRecAttr==NULL && m_ndbRecord==NULL));
 
   m_isRowNull = false;
@@ -5090,7 +5151,7 @@ NdbOut& operator<<(NdbOut& out, const Nd
 }
 
 NdbOut& operator<<(NdbOut& out, const NdbResultStream& stream){
-  out << " m_rowCount: " << stream.m_rowCount;
+  out << " received rows: " << stream.m_resultSets[stream.m_recv].getRowCount();
   return out;
 }
 

No bundle (reason: useless for push emails).