List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:August 16 2011 8:03am
Subject:bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch
(ole.john.aske:3531 to 3532)
View as plain text  
 3532 Ole John Aske	2011-08-16 [merge]
      Merge

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
 3531 Ole John Aske	2011-08-15 [merge]
      Merge

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-15 12:38:54 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2011-08-16 08:02:39 +0000
@@ -148,6 +148,8 @@ public:
 
   explicit NdbRootFragment();
 
+  ~NdbRootFragment();
+
   /**
    * Initialize object.
    * @param query Enclosing query.
@@ -198,9 +200,11 @@ public:
    * @param operationNo The id of the operation.
    * @return The result stream for this root fragment.
    */
-  NdbResultStream& getResultStream(Uint32 operationNo) const
-  { return m_query->getQueryOperation(operationNo).getResultStream(m_fragNo); }
-  
+  NdbResultStream& getResultStream(Uint32 operationNo) const;
+
+  NdbResultStream& getResultStream(const NdbQueryOperationImpl& op) const
+  { return getResultStream(op.getQueryOperationDef().getQueryOperationIx()); }
+
   Uint32 getReceiverId() const;
   Uint32 getReceiverTcPtrI() const;
 
@@ -215,6 +219,9 @@ public:
    */
   bool isEmpty() const;
 
+  /** Release resources after last row has been returned */
+  void postFetchRelease();
+
 private:
   STATIC_CONST( voidFragNo = 0xffffffff);
 
@@ -224,6 +231,9 @@ private:
   /** Number of the root operation fragment.*/
   Uint32 m_fragNo;
 
+  /** For processing results originating from this root fragment (Array of).*/
+  NdbResultStream* m_resultStreams;
+
   /**
    * The number of outstanding TCKEYREF or TRANSID_AI 
    * messages for the fragment. This includes both messages related to the
@@ -275,26 +285,19 @@ public:
    * @param operation The operation for which we will receive results.
    * @param rootFragNo 0..n-1 when the root operation reads from n fragments.
    */
-  explicit NdbResultStream(NdbQueryOperationImpl& operation, Uint32 rootFragNo);
+  explicit NdbResultStream(NdbQueryOperationImpl& operation,
+                           NdbRootFragment& rootFrag);
 
   ~NdbResultStream();
 
   /** 
    * Prepare for receiving first results. 
-   * @return possible error code. 
    */
-  int prepare();
+  void prepare();
 
   /** Prepare for receiving next batch of scan results. */
   void reset();
     
-  /**
-   * 0..n-1 if the root operation reads from n fragments. This stream holds data
-   * derived from one of those fragments.
-   */
-  Uint32 getRootFragNo() const
-  { return m_rootFragNo; }
-
   NdbReceiver& getReceiver()
   { return m_receiver; }
 
@@ -418,22 +421,27 @@ public:
   };
 
 private:
-  /** This stream handles results derived from the m_rootFragNo'th 
-   * fragment of the root operation.*/
-  const Uint32 m_rootFragNo;
+  /**
+   * This stream handles results derived from specified 
+   * m_rootFrag of the root operation.
+   */
+  const NdbRootFragment& m_rootFrag;
+ 
+  /** Operation to which this resultStream belong.*/
+  NdbQueryOperationImpl& m_operation;
 
   /** The receiver object that unpacks transid_AI messages.*/
   NdbReceiver m_receiver;
 
+  /** Used for checking if buffer overrun occurred. */
+  Uint32* m_batchOverflowCheck;
+
   /** Max #rows which this stream may recieve in its buffer structures */
   Uint32 m_maxRows;
 
   /** The number of transid_AI messages received.*/
   Uint32 m_rowCount;
 
-  /** Operation to which this resultStream belong.*/
-  NdbQueryOperationImpl& m_operation;
-
   /** This is the state of the iterator used by firstResult(), nextResult().*/
   enum
   {
@@ -528,12 +536,15 @@ void* NdbBulkAllocator::allocObjMem(Uint
 /////////  NdbResultStream methods ///////////
 //////////////////////////////////////////////
 
-NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation, Uint32 rootFragNo):
-  m_rootFragNo(rootFragNo),
+NdbResultStream::NdbResultStream(NdbQueryOperationImpl& operation,
+                                 NdbRootFragment& rootFrag)
+:
+  m_rootFrag(rootFrag),
+  m_operation(operation),
   m_receiver(operation.getQuery().getNdbTransaction().getNdb()),
+  m_batchOverflowCheck(NULL),
   m_maxRows(0),
   m_rowCount(0),
-  m_operation(operation),
   m_iterState(Iter_notStarted),
   m_currentRow(tupleNotFound),
   m_subScanComplete(true),
@@ -548,9 +559,12 @@ NdbResultStream::~NdbResultStream()
   }
 }
 
-int  // Return 0 if ok, else errorcode
+void
 NdbResultStream::prepare()
 {
+  const Uint32 rowSize = m_operation.getRowSize();
+  NdbQueryImpl &query = m_operation.getQuery();
+
   /* Parent / child correlation is only relevant for scan type queries
    * Don't create m_parentTupleId[] and m_childTupleIdx[] for lookups!
    * Neither is these structures required for operations not having respective
@@ -560,7 +574,7 @@ NdbResultStream::prepare()
   {
     m_maxRows  = m_operation.getMaxBatchRows();
     m_tupleSet = 
-      new (m_operation.getQuery().getTupleSetAlloc().allocObjMem(m_maxRows)) 
+      new (query.getTupleSetAlloc().allocObjMem(m_maxRows)) 
       TupleSet[m_maxRows];
 
     clearTupleSet();
@@ -568,9 +582,25 @@ NdbResultStream::prepare()
   else
     m_maxRows = 1;
 
-  return 0;
-} //NdbResultStream::prepare
+  const int bufferSize = rowSize * m_maxRows;
+  NdbBulkAllocator& bufferAlloc = query.getRowBufferAlloc();
+  char* const rowBuf = reinterpret_cast<char*>(bufferAlloc
+                                               .allocObjMem(bufferSize));
+
+  // So that we can test for buffer overrun.
+  m_batchOverflowCheck = 
+    reinterpret_cast<Uint32*>(bufferAlloc.allocObjMem(sizeof(Uint32)));
+  *m_batchOverflowCheck = 0xacbd1234;
 
+  m_receiver.init(NdbReceiver::NDB_QUERY_OPERATION, false, &m_operation);
+  m_receiver.do_setup_ndbrecord(
+                          m_operation.getNdbRecord(),
+                          m_maxRows, 
+                          0 /*key_size*/, 
+                          0 /*read_range_no*/, 
+                          rowSize,
+                          rowBuf);
+} //NdbResultStream::prepare
 
 void
 NdbResultStream::reset()
@@ -592,7 +622,7 @@ NdbResultStream::reset()
        childNo++)
   {
     NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
-    child.getResultStream(getRootFragNo()).reset();
+    m_rootFrag.getResultStream(child).reset();
   }
 } //NdbResultStream::reset
 
@@ -623,7 +653,7 @@ NdbResultStream::isAllSubScansComplete()
        childNo++)
   {
     const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
-    const NdbResultStream& childStream = child.getResultStream(getRootFragNo());
+    const NdbResultStream& childStream = m_rootFrag.getResultStream(child);
     if (!childStream.isAllSubScansComplete())
       return false;
   }
@@ -744,7 +774,7 @@ NdbResultStream::firstResult()
   Uint16 parentId = tupleNotFound;
   if (parent!=NULL)
   {
-    const NdbResultStream& parentStream = parent->getResultStream(m_rootFragNo);
+    const NdbResultStream& parentStream = m_rootFrag.getResultStream(*parent);
     parentId = parentStream.getCurrentTupleId();
 
     if (parentId == tupleNotFound)
@@ -791,8 +821,8 @@ NdbResultStream::execTRANSID_AI(const Ui
   {
     const CorrelationData correlData(ptr, len);
 
-    assert(m_operation.getRoot().getResultStream(m_rootFragNo)
-           .m_receiver.getId() == correlData.getRootReceiverId());
+    assert(m_rootFrag.getResultStream(0).m_receiver.getId() ==
+           correlData.getRootReceiverId());
 
     m_receiver.execTRANSID_AI(ptr, len - CorrelationData::wordCount);
 
@@ -829,6 +859,9 @@ NdbResultStream::execTRANSID_AI(const Ui
 void 
 NdbResultStream::handleBatchComplete()
 {
+  // Buffer overrun check.
+  assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck==0xacbd1234);
+
   for (Uint32 tupleNo=0; tupleNo<getRowCount(); tupleNo++)
   {
     m_tupleSet[tupleNo].m_skip = false;
@@ -837,7 +870,7 @@ NdbResultStream::handleBatchComplete()
   for (Uint32 childNo=0; childNo < m_operation.getNoOfChildOperations(); childNo++)
   {
     const NdbQueryOperationImpl& child = m_operation.getChildOperation(childNo);
-    NdbResultStream& childStream = child.getResultStream(m_rootFragNo);
+    NdbResultStream& childStream = m_rootFrag.getResultStream(child);
     childStream.handleBatchComplete();
 
     const bool isInnerJoin = child.getQueryOperationDef().getMatchType() != NdbQueryOptions::MatchAll;
@@ -925,6 +958,7 @@ NdbRootFragment::receiverIdLookup(NdbRoo
 NdbRootFragment::NdbRootFragment():
   m_query(NULL),
   m_fragNo(voidFragNo),
+  m_resultStreams(NULL),
   m_outstandingResults(0),
   m_confReceived(false),
   m_idMapHead(-1),
@@ -932,11 +966,56 @@ NdbRootFragment::NdbRootFragment():
 {
 }
 
+NdbRootFragment::~NdbRootFragment()
+{
+  assert(m_resultStreams==NULL);
+}
+
 void NdbRootFragment::init(NdbQueryImpl& query, Uint32 fragNo)
 {
   assert(m_fragNo==voidFragNo);
   m_query = &query;
   m_fragNo = fragNo;
+
+  m_resultStreams = reinterpret_cast<NdbResultStream*>
+     (query.getResultStreamAlloc().allocObjMem(query.getNoOfOperations()));
+  assert(m_resultStreams!=NULL);
+
+  for (unsigned opNo=0; opNo<query.getNoOfOperations(); opNo++) 
+  {
+    NdbQueryOperationImpl& op = query.getQueryOperation(opNo);
+    new (&m_resultStreams[opNo]) NdbResultStream(op,*this);
+    m_resultStreams[opNo].prepare();
+  }
+}
+
+/**
+ * Release what we want need anymore after last available row has been 
+ * returned from datanodes.
+ */ 
+void
+NdbRootFragment::postFetchRelease()
+{
+  if (m_resultStreams != NULL)
+  { 
+    for (unsigned opNo=0; opNo<m_query->getNoOfOperations(); opNo++) 
+    {
+      m_resultStreams[opNo].~NdbResultStream();
+    }
+  }
+  /**
+   * Don't 'delete' the object as it was in-place constructed from
+   * ResultStreamAlloc'ed memory. Memory is released by
+   * ResultStreamAlloc::reset().
+   */
+  m_resultStreams = NULL;
+}
+
+NdbResultStream&
+NdbRootFragment::getResultStream(Uint32 operationNo) const
+{
+  assert(m_resultStreams);
+  return m_resultStreams[operationNo];
 }
 
 void NdbRootFragment::reset()
@@ -1493,7 +1572,14 @@ NdbQueryImpl::~NdbQueryImpl()
 void
 NdbQueryImpl::postFetchRelease()
 {
-  if (m_operations != NULL) {
+  if (m_rootFrags != NULL)
+  {
+    for (unsigned i=0; i<m_rootFragCount; i++)
+    { m_rootFrags[i].postFetchRelease();
+    }
+  }
+  if (m_operations != NULL)
+  {
     for (unsigned i=0; i<m_countOperations; i++)
     { m_operations[i].postFetchRelease();
     }
@@ -2088,10 +2174,11 @@ NdbQueryImpl::awaitMoreResults(bool forc
   returns: 'true' when application thread should be resumed.
 */
 bool 
-NdbQueryImpl::handleBatchComplete(Uint32 fragNo)
+NdbQueryImpl::handleBatchComplete(NdbRootFragment& rootFrag)
 {
   if (traceSignals) {
-    ndbout << "NdbQueryImpl::handleBatchComplete, fragNo=" << fragNo
+    ndbout << "NdbQueryImpl::handleBatchComplete"
+           << ", fragNo=" << rootFrag.getFragNo()
            << ", pendingFrags=" << (m_pendingFrags-1)
            << ", finalBatchFrags=" << m_finalBatchFrags
            <<  endl;
@@ -2105,7 +2192,6 @@ NdbQueryImpl::handleBatchComplete(Uint32
   if (likely(m_fullFrags.m_errorCode == 0))
   {
     NdbQueryOperationImpl& root = getRoot();
-    NdbRootFragment& rootFrag = m_rootFrags[fragNo];
     assert(rootFrag.isFragBatchComplete());
 
     assert(m_pendingFrags > 0);                // Check against underflow.
@@ -2121,7 +2207,7 @@ NdbQueryImpl::handleBatchComplete(Uint32
     if (getQueryDef().isScanQuery())
     {
       // Only required for scans
-      root.getResultStream(fragNo).handleBatchComplete();  
+      rootFrag.getResultStream(root).handleBatchComplete();  
 
       // Only ordered scans has to wait until all pending completed
       resume = (m_pendingFrags==0) ||
@@ -2129,7 +2215,7 @@ NdbQueryImpl::handleBatchComplete(Uint32
     }
     else
     {
-      assert(root.m_resultStreams[fragNo]->getReceiver().m_tcPtrI==RNIL);
+      assert(rootFrag.getResultStream(root).getReceiver().m_tcPtrI==RNIL);
       assert(m_finalBatchFrags==1);
       assert(m_pendingFrags==0);  // Lookup query should be complete now.
       resume = true;   
@@ -2137,7 +2223,7 @@ NdbQueryImpl::handleBatchComplete(Uint32
 
     /* Position at the first (sorted?) row available from this fragments.
      */
-    root.m_resultStreams[fragNo]->firstResult();
+    rootFrag.getResultStream(root).firstResult();
 
     /* When application thread ::awaitMoreResults() it will later be moved
      * from m_fullFrags to m_applFrags under mutex protection.
@@ -2290,22 +2376,22 @@ NdbQueryImpl::execTCKEYCONF()
     ndbout << "NdbQueryImpl::execTCKEYCONF()" << endl;
   }
   assert(!getQueryDef().isScanQuery());
+  NdbRootFragment& rootFrag = m_rootFrags[0];
 
   // We will get 1 + #leaf-nodes TCKEYCONF for a lookup...
-  m_rootFrags[0].setConfReceived();
-  m_rootFrags[0].incrOutstandingResults(-1);
+  rootFrag.setConfReceived();
+  rootFrag.incrOutstandingResults(-1);
 
   bool ret = false;
-  if (m_rootFrags[0].isFragBatchComplete())
+  if (rootFrag.isFragBatchComplete())
   { 
-    ret = handleBatchComplete(0);
+    ret = handleBatchComplete(rootFrag);
   }
 
   if (traceSignals) {
     ndbout << "NdbQueryImpl::execTCKEYCONF(): returns:" << ret
            << ", m_pendingFrags=" << m_pendingFrags
-           << ", *getRoot().m_resultStreams[0]=" 
-           << *getRoot().m_resultStreams[0]
+           << ", rootStream= {" << rootFrag.getResultStream(0) << "}"
            << endl;
   }
   return ret;
@@ -2382,9 +2468,7 @@ NdbQueryImpl::prepareSend()
   // Allocate space for ptrs to NdbResultStream and NdbRootFragment objects.
   error = m_pointerAlloc.init(m_rootFragCount * 
                               (SharedFragStack::pointersPerFragment +
-                               OrderedFragSet::pointersPerFragment +
-                               // Pointers to NdbResultStream objects.
-                               getNoOfOperations()));
+                               OrderedFragSet::pointersPerFragment));
   if (error != 0)
   {
     setErrorCode(error);
@@ -2395,21 +2479,22 @@ NdbQueryImpl::prepareSend()
   getRoot().calculateBatchedRows(NULL);
   getRoot().setBatchedRows(1);
 
-  /** Calculate total amount of row buffer space for all operations and
-   * fragments.*/
+  /**
+   * Calculate total amount of row buffer space for all operations and
+   * fragments.
+   */
   Uint32 totalBuffSize = 0;
   for (Uint32 opNo = 0; opNo < getNoOfOperations(); opNo++)
   {
-    NdbQueryOperationImpl& op = getQueryOperation(opNo);
-
-    op.m_bufferSize = op.getRowSize() * op.getMaxBatchRows();
-    totalBuffSize += op.m_bufferSize;
+    const NdbQueryOperationImpl& op = getQueryOperation(opNo);
+    totalBuffSize += (op.getRowSize() * op.getMaxBatchRows());
   }
-  /** Add one word per operation for buffer overrun check. We add a word
+  /**
+   * Add one word per ResultStream for buffer overrun check. We add a word
    * rather than a byte to avoid possible alignment problems.
    */
-  m_rowBufferAlloc.init(totalBuffSize * m_rootFragCount + 
-                        sizeof(Uint32) * getNoOfOperations());
+  m_rowBufferAlloc.init(m_rootFragCount * 
+                       (totalBuffSize + (sizeof(Uint32) * getNoOfOperations())) );
   if (getQueryDef().isScanQuery())
   {
     Uint32 totalRows = 0;
@@ -2424,15 +2509,28 @@ NdbQueryImpl::prepareSend()
       return -1;
     }
   }
-  // 1. Build receiver structures for each QueryOperation.
-  // 2. Fill in parameters (into ATTRINFO) for QueryTree.
-  //    (Has to complete *after* ::prepareReceiver() as QueryTree params
-  //     refer receiver id's.)
-  //
+
+  /**
+   * Allocate and initialize fragment state variables.
+   * Will also cause a ResultStream object containing a 
+   * NdbReceiver to be constructed for each operation in QueryTree
+   */
+  m_rootFrags = new NdbRootFragment[m_rootFragCount];
+  if (m_rootFrags == NULL)
+  {
+    setErrorCode(Err_MemoryAlloc);
+    return -1;
+  }
+  for (Uint32 i = 0; i<m_rootFragCount; i++)
+  {
+    m_rootFrags[i].init(*this, i); // Set fragment number.
+  }
+
+  // Fill in parameters (into ATTRINFO) for QueryTree.
   for (Uint32 i = 0; i < m_countOperations; i++) {
-    int error;
-    if (unlikely((error = m_operations[i].prepareReceiver()) != 0)
-              || (error = m_operations[i].prepareAttrInfo(m_attrInfo)) != 0) {
+    const int error = m_operations[i].prepareAttrInfo(m_attrInfo);
+    if (unlikely(error))
+    {
       setErrorCode(error);
       return -1;
     }
@@ -2468,23 +2566,6 @@ NdbQueryImpl::prepareSend()
     return -1;
   }
 
-  /**
-   * Allocate and initialize fragment state variables.
-   */
-  m_rootFrags = new NdbRootFragment[m_rootFragCount];
-  if(m_rootFrags == NULL)
-  {
-    setErrorCode(Err_MemoryAlloc);
-    return -1;
-  }
-  else
-  {
-    for(Uint32 i = 0; i<m_rootFragCount; i++)
-    {
-      m_rootFrags[i].init(*this, i); // Set fragment number.
-    }
-  }
-
   if (getQueryDef().isScanQuery())
   {
     NdbRootFragment::buildReciverIdMap(m_rootFrags, m_rootFragCount);
@@ -2854,17 +2935,15 @@ Remark:
 int
 NdbQueryImpl::sendFetchMore(NdbRootFragment& emptyFrag, bool forceSend)
 {
-  assert(getRoot().m_resultStreams!=NULL);
   assert(!emptyFrag.finalBatchReceived());
   assert(getQueryDef().isScanQuery());
 
-  const Uint32 fragNo = emptyFrag.getFragNo();
   emptyFrag.reset();
 
   for (unsigned opNo=0; opNo<m_countOperations; opNo++) 
   {
     NdbResultStream& resultStream = 
-       getQueryOperation(opNo).getResultStream(fragNo);
+       emptyFrag.getResultStream(opNo);
 
     if (!resultStream.isSubScanComplete())
     {
@@ -3386,10 +3465,7 @@ NdbQueryOperationImpl::NdbQueryOperation
   m_parent(NULL),
   m_children(def.getNoOfChildOperations()),
   m_maxBatchRows(0),   // >0: User specified prefered value, ==0: Use default CFG values
-  m_resultStreams(NULL),
   m_params(),
-  m_bufferSize(0),
-  m_batchOverflowCheck(NULL),
   m_resultBuffer(NULL),
   m_resultRef(NULL),
   m_isRowNull(true),
@@ -3438,10 +3514,10 @@ NdbQueryOperationImpl::NdbQueryOperation
 
 NdbQueryOperationImpl::~NdbQueryOperationImpl()
 {
-  // We expect ::postFetchRelease to have deleted fetch related structures when fetch completed.
-  // Either by fetching through last row, or calling ::close() which forcefully terminates fetch
-  assert (m_batchOverflowCheck == NULL);
-  assert (m_resultStreams == NULL);
+  /**
+   * We expect ::postFetchRelease to have deleted fetch related structures when fetch completed.
+   * Either by fetching through last row, or calling ::close() which forcefully terminates fetch
+   */
   assert (m_firstRecAttr == NULL);
   assert (m_interpretedCode == NULL);
 } //NdbQueryOperationImpl::~NdbQueryOperationImpl()
@@ -3453,22 +3529,6 @@ NdbQueryOperationImpl::~NdbQueryOperatio
 void
 NdbQueryOperationImpl::postFetchRelease()
 {
-  // Buffer overrun check.
-  assert(m_batchOverflowCheck==NULL || *m_batchOverflowCheck ==  0xacbd1234);
-  m_batchOverflowCheck = NULL;
-  
-  if (m_resultStreams != NULL)
-  { 
-    for (int i = static_cast<int>(getQuery().getRootFragCount())-1; i >= 0; i--)
-    {
-      if (m_resultStreams[i] != NULL)
-      {
-        m_resultStreams[i]->~NdbResultStream();
-      }
-    }
-  }
-  m_resultStreams = NULL;
-
   Ndb* const ndb = m_queryImpl.getNdbTransaction().getNdb();
   NdbRecAttr* recAttr = m_firstRecAttr;
   while (recAttr != NULL) {
@@ -3702,7 +3762,7 @@ NdbQueryOperationImpl::firstResult()
 
   if (rootFrag != NULL)
   {
-    NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()];
+    NdbResultStream& resultStream = rootFrag->getResultStream(*this);
     if (resultStream.firstResult() != tupleNotFound)
     {
       fetchRow(resultStream);
@@ -3742,7 +3802,7 @@ NdbQueryOperationImpl::nextResult(bool f
     const NdbRootFragment* rootFrag = m_queryImpl.m_applFrags.getCurrent();
     if (rootFrag!=NULL)
     {
-      NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()];
+      NdbResultStream& resultStream = rootFrag->getResultStream(*this);
       if (resultStream.nextResult() != tupleNotFound)
       {
         fetchRow(resultStream);
@@ -4073,55 +4133,9 @@ NdbQueryOperationImpl::setBatchedRows(Ui
   }
 }
 
-
-int 
-NdbQueryOperationImpl::prepareReceiver()
-{
-  // Construct receiver streams and prepare them for receiving scan result
-  assert(m_resultStreams==NULL);
-  assert(m_queryImpl.getRootFragCount() > 0);
-
-  m_resultStreams = reinterpret_cast<NdbResultStream**>
-    (getQuery().getPointerAlloc().allocObjMem(m_queryImpl.getRootFragCount())); 
-
-  for(Uint32 i = 0; i<m_queryImpl.getRootFragCount(); i++) {
-    m_resultStreams[i] = NULL;  // Init to legal contents for d'tor
-  }
-  for(Uint32 i = 0; i<m_queryImpl.getRootFragCount(); i++) {
-    m_resultStreams[i] = new (getQuery().m_resultStreamAlloc.allocObjMem(1))
-      NdbResultStream(*this, i);
-    const int error = m_resultStreams[i]->prepare();
-    if (unlikely(error)) {
-      return error;
-    }
-
-    m_resultStreams[i]->getReceiver().init(NdbReceiver::NDB_QUERY_OPERATION, 
-                                        false, this);
-    char* const rowBuf = reinterpret_cast<char*>(getQuery().getRowBufferAlloc()
-                                                 .allocObjMem(m_bufferSize));
-    m_resultStreams[i]->getReceiver()
-      .do_setup_ndbrecord(m_ndbRecord,
-                          getMaxBatchRows(), 
-                          0 /*key_size*/, 
-                          0 /*read_range_no*/, 
-                          getRowSize(),
-                          rowBuf);
-    m_resultStreams[i]->getReceiver().prepareSend();
-  }
-  // So that we can test for for buffer overrun.
-  m_batchOverflowCheck = 
-    reinterpret_cast<Uint32*>(getQuery().getRowBufferAlloc()
-                              .allocObjMem(sizeof(Uint32)));
-  *m_batchOverflowCheck = 0xacbd1234;
-  return 0;
-}//NdbQueryOperationImpl::prepareReceiver
-
 int 
 NdbQueryOperationImpl::prepareAttrInfo(Uint32Buffer& attrInfo)
 {
-  // ::prepareReceiver() need to complete first:
-  assert (m_resultStreams != NULL);
-
   const NdbQueryOperationDefImpl& def = getQueryOperationDef();
 
   /**
@@ -4604,14 +4618,14 @@ NdbQueryOperationImpl::execTRANSID_AI(co
   }
 
   // Process result values.
-  m_resultStreams[rootFragNo]->execTRANSID_AI(ptr, len);
+  rootFrag->getResultStream(*this).execTRANSID_AI(ptr, len);
 
   rootFrag->incrOutstandingResults(-1);
 
   bool ret = false;
   if (rootFrag->isFragBatchComplete())
   {
-    ret = m_queryImpl.handleBatchComplete(rootFragNo);
+    ret = m_queryImpl.handleBatchComplete(*rootFrag);
   }
 
   if (traceSignals) {
@@ -4653,7 +4667,6 @@ NdbQueryOperationImpl::execTCKEYREF(cons
     }
   }
 
-  Uint32 rootFragNo = 0;
   NdbRootFragment& rootFrag = getQuery().m_rootFrags[0];
 
   if (ref->errorCode != DbspjErr::NodeFailure)
@@ -4678,13 +4691,12 @@ NdbQueryOperationImpl::execTCKEYREF(cons
   bool ret = false;
   if (rootFrag.isFragBatchComplete())
   { 
-    ret = m_queryImpl.handleBatchComplete(rootFragNo);
+    ret = m_queryImpl.handleBatchComplete(rootFrag);
   } 
 
   if (traceSignals) {
     ndbout << "NdbQueryOperationImpl::execTCKEYREF(): returns:" << ret
-           << ", *getRoot().m_resultStreams[0] {" 
-           << *getRoot().m_resultStreams[0] << "}"
+           << ", resultStream= {" << rootFrag.getResultStream(*this) << "}"
            << ", *this=" << *this <<  endl;
   }
   return ret;
@@ -4720,12 +4732,13 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
   rootFrag->incrOutstandingResults(rowCount);
 
   // Handle for SCAN_NEXTREQ, RNIL -> EOF
-  NdbResultStream& resultStream = *m_resultStreams[rootFrag->getFragNo()];
+  NdbResultStream& resultStream = rootFrag->getResultStream(*this);
   resultStream.getReceiver().m_tcPtrI = tcPtrI;  
 
   if(traceSignals){
-    ndbout << "  resultStream(root) {" << resultStream << "} fragNo" 
-           << rootFrag->getFragNo() << endl;
+    ndbout << "  resultStream {" << rootFrag->getResultStream(*this)
+           << "} fragNo" << rootFrag->getFragNo()
+           << endl;
   }
 
   const NdbQueryDefImpl& queryDef = m_queryImpl.getQueryDef();
@@ -4757,7 +4770,7 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
   if (rootFrag->isFragBatchComplete())
   {
     /* This fragment is now complete */
-    ret = m_queryImpl.handleBatchComplete(rootFrag->getFragNo());
+    ret = m_queryImpl.handleBatchComplete(*rootFrag);
   }
   if (traceSignals) {
     ndbout << "NdbQueryOperationImpl::execSCAN_TABCONF():, returns:" << ret
@@ -4893,13 +4906,6 @@ int NdbQueryOperationImpl::setBatchSize(
   return 0;
 }
 
-NdbResultStream& 
-NdbQueryOperationImpl::getResultStream(Uint32 rootFragNo) const
-{
-  assert(rootFragNo < getQuery().getRootFragCount());
-  return *m_resultStreams[rootFragNo];
-}
-
 bool
 NdbQueryOperationImpl::hasInterpretedCode() const
 {
@@ -4938,15 +4944,8 @@ NdbQueryOperationImpl::prepareInterprete
 
 Uint32 
 NdbQueryOperationImpl::getIdOfReceiver() const {
-  return m_resultStreams[0]->getReceiver().getId();
-}
-
-
-const NdbReceiver& 
-NdbQueryOperationImpl::getReceiver(Uint32 recNo) const {
-  assert(recNo<getQuery().getRootFragCount());
-  assert(m_resultStreams!=NULL);
-  return m_resultStreams[recNo]->getReceiver();
+  NdbRootFragment& rootFrag = m_queryImpl.m_rootFrags[0];
+  return rootFrag.getResultStream(*this).getReceiver().getId();
 }
 
 Uint32 NdbQueryOperationImpl::getRowSize() const
@@ -4975,7 +4974,8 @@ NdbOut& operator<<(NdbOut& out, const Nd
   out << "  m_queryImpl: " << &op.m_queryImpl;
   out << "  m_operationDef: " << &op.m_operationDef;
   for(Uint32 i = 0; i<op.m_queryImpl.getRootFragCount(); i++){
-    out << "  m_resultStream[" << i << "]{" << *op.m_resultStreams[i] << "}";
+    NdbRootFragment& rootFrag = op.m_queryImpl.m_rootFrags[i];
+    out << "  m_resultStream[" << i << "]{" << rootFrag.getResultStream(op) << "}";
   }
   out << " m_isRowNull " << op.m_isRowNull;
   out << " ]";

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-06-20 13:25:48 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2011-08-16 07:56:53 +0000
@@ -247,9 +247,15 @@ public:
   Uint32 getRootFragCount() const
   { return m_rootFragCount; }
 
+  NdbBulkAllocator& getResultStreamAlloc()
+  { return m_resultStreamAlloc; }
+
   NdbBulkAllocator& getTupleSetAlloc()
   { return m_tupleSetAlloc; }
 
+  NdbBulkAllocator& getRowBufferAlloc()
+  { return m_rowBufferAlloc; }
+
 private:
   /** Possible return values from NdbQueryImpl::awaitMoreResults. 
    * A subset of the integer values also matches those returned
@@ -577,13 +583,10 @@ private:
    *  the result.
    *  @return: 'true' if its time to resume appl. threads
    */ 
-  bool handleBatchComplete(Uint32 rootFragNo);
+  bool handleBatchComplete(NdbRootFragment& rootFrag);
 
   NdbBulkAllocator& getPointerAlloc()
   { return m_pointerAlloc; }
-  
-  NdbBulkAllocator& getRowBufferAlloc()
-  { return m_rowBufferAlloc; }
 
 }; // class NdbQueryImpl
 
@@ -705,10 +708,6 @@ public:
   int setInterpretedCode(const NdbInterpretedCode& code);
   bool hasInterpretedCode() const;
 
-  NdbResultStream& getResultStream(Uint32 rootFragNo) const;
-
-  const NdbReceiver& getReceiver(Uint32 rootFragNo) const;
-
   /** Verify magic number.*/
   bool checkMagicNumber() const
   { return m_magic == MAGIC; }
@@ -719,6 +718,12 @@ public:
   Uint32 getMaxBatchRows() const
   { return m_maxBatchRows; }
 
+  /** Get size of row as required to buffer it. */  
+  Uint32 getRowSize() const;
+
+  const NdbRecord* getNdbRecord() const
+  { return m_ndbRecord; }
+
 private:
 
   STATIC_CONST (MAGIC = 0xfade1234);
@@ -742,16 +747,9 @@ private:
   /** Max rows (per resultStream) in a scan batch.*/
   Uint32 m_maxBatchRows;
 
-  /** For processing results from this operation (Array of).*/
-  NdbResultStream** m_resultStreams;
   /** Buffer for parameters in serialized format */
   Uint32Buffer m_params;
 
-  /** Buffer size allocated for *each* ResultStream/Receiver when 
-   *  fetching results.*/
-  Uint32 m_bufferSize;
-  /** Used for checking if buffer overrun occurred. */
-  Uint32* m_batchOverflowCheck;
   /** User specified buffer for final storage of result.*/
   char* m_resultBuffer;
   /** User specified pointer to application pointer that should be 
@@ -819,9 +817,6 @@ private:
   Uint32 calculateBatchedRows(const NdbQueryOperationImpl* closestScan);
   void setBatchedRows(Uint32 batchedRows);
 
-  /** Construct and prepare receiver streams for result processing. */
-  int prepareReceiver();
-
   /** Prepare ATTRINFO for execution. (Add execution params++)
    *  @return possible error code.*/
   int prepareAttrInfo(Uint32Buffer& attrInfo);
@@ -863,7 +858,6 @@ private:
   bool diskInUserProjection() const
   { return m_diskInUserProjection; }
 
-  Uint32 getRowSize() const;
 }; // class NdbQueryOperationImpl
 
 

No bundle (reason: useless for push emails).
Thread
bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3531 to 3532) Ole John Aske17 Aug