List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:November 9 2010 2:50pm
Subject:bzr commit into mysql-5.1-telco-7.0-spj-scan-vs-scan branch
(ole.john.aske:3349)
View as plain text  
#At file:///net/fimafeng09/export/home/tmp/oleja/mysql/mysql-5.1-telco-7.0-spj-scan-scan/ based on revid:jan.wedvik@stripped

 3349 Ole John Aske	2010-11-09
      sps-svs: refactoring, no functional changes.
      
      Cleaned up and removed duplicated code in receiving result related to ::handleBatchComplete().
      
      - More logic related to counting of 'm_pendingFrags' and 'm_finalBatchFrags' which was
        duplicated in several signal ::execFOO methods has been moved into ::handleBatchComplete()
      
      - ::incrementPendingFrags() has been removed, and its logic integrated into ::handleBatchComplete
      
      - ::closeSingletonScans() (Which was not a 'close' at all) has been removed 
        by making receive of lookup queries more similar to scans.
      
      - Introduced ::findResultStream(Uint32 receiverId) which contains functionality 
        previously duplicated in ::execTRANSID_AI() and ::execSCAN_TABCONF().
      
      
      
      - 

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2010-11-08 15:18:22 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2010-11-09 14:50:00 +0000
@@ -1867,60 +1867,94 @@ NdbQueryImpl::awaitMoreResults(bool forc
      * the lock, because we know that the signal receiver thread will not
      * be accessing m_fullFrags at this time.
      */
-    NdbRootFragment* frag = m_fullFrags.pop();
-    if (frag==NULL)
+    NdbRootFragment* frag;
+    if ((frag=m_fullFrags.pop()) != NULL)
     {
-      /* Getting here means that either:
-       *  - No results was returned (TCKEYREF)
-       *  - There was no matching row for an inner join.
-       *  - or, the application called nextResult() twice for a lookup query.
-       */
-      assert(m_finalBatchFrags == getRootFragCount());
-      return FetchResult_noMoreData;
+      m_applFrags.add(*frag);
     }
-    else
+    assert(m_fullFrags.pop()==NULL); // Only one stream for lookups.
+
+    if (m_applFrags.getCurrent() != NULL)
     {
-      /* Move fragment from receiver thread's container to application 
-       *  thread's container.*/
-      assert(!frag->isEmpty());
-      m_applFrags.add(*frag);
-      assert(m_fullFrags.pop()==NULL); // Only one stream for lookups.
       return FetchResult_ok;
     }
+
+    /* Getting here means that either:
+     *  - No results was returned (TCKEYREF)
+     *  - There was no matching row for an inner join.
+     *  - or, the application called nextResult() twice for a lookup query.
+     */
+    assert(m_pendingFrags == 0);
+    assert(m_finalBatchFrags == getRootFragCount());
+    return FetchResult_noMoreData;
   } // if(m_queryDef.isScanQuery())
+
 } //NdbQueryImpl::awaitMoreResults
 
-void 
+
+/*
+  ::handleBatchComplete() is intended to be called when receiving signals only.
+  The PollGuard mutex is then set and the shared 'm_pendingFrags', 
+  'm_finalBatchFrags' and 'm_fullFrags' can safely be updated.
+
+  returns: 'true' when application thread should be resumed.
+*/
+bool 
 NdbQueryImpl::handleBatchComplete(Uint32 fragNo)
 {
-  assert(m_rootFrags[fragNo].isFragBatchComplete());
-  getRoot().handleBatchComplete(fragNo);
+  if (traceSignals) {
+    ndbout << "NdbQueryImpl::handleBatchComplete, fragNo=" << fragNo
+           << ", pendingFrags=" << (m_pendingFrags-1)
+           << ", finalBatchFrags=" << m_finalBatchFrags
+           <<  endl;
+  }
+  bool resume = false;
+
+  /* May received fragment data after a SCANREF() (timeout?) 
+   * terminated the scan.  We are about to close this query, 
+   * and didn't expect any more data - ignore it!
+   */
+  if (likely(m_fullFrags.m_errorCode == 0))
+  {
+    NdbQueryOperationImpl& root = getRoot();
+    NdbRootFragment& rootFrag = m_rootFrags[fragNo];
+    assert(rootFrag.isFragBatchComplete());
 
-  // Position at the first (sorted?) row available from this fragments.
-  getRoot().m_resultStreams[fragNo]->firstResult();
-}
+    assert(m_pendingFrags > 0);                // Check against underflow.
+    assert(m_pendingFrags <= m_rootFragCount); // .... and overflow
+    m_pendingFrags--;
 
-void 
-NdbQueryImpl::closeSingletonScans()
-{
-  assert(!getQueryDef().isScanQuery());
-  for(Uint32 i = 0; i<getNoOfOperations(); i++){
-    NdbQueryOperationImpl& operation = getQueryOperation(i);
-    NdbResultStream& resultStream = *operation.m_resultStreams[0];
-    /** Now we have received all tuples for all operations. We can thus call
-     *  execSCANOPCONF() with the right row count.
+    if (rootFrag.finalBatchReceived())
+    {
+      m_finalBatchFrags++;
+      assert(m_finalBatchFrags <= m_rootFragCount);
+    }
+
+    if (getQueryDef().isScanQuery())
+    {
+      root.handleBatchComplete(fragNo);  // Only required for scans
+      resume = (m_pendingFrags==0) || true;
+    }
+    else
+    {
+      assert(root.m_resultStreams[fragNo]->getReceiver().m_tcPtrI==RNIL);
+      assert(m_finalBatchFrags==1);
+      assert(m_pendingFrags==0);  // Lookup query should be complete now.
+      resume = true;   
+    }
+
+    /* Position at the first (sorted?) row available from this fragments.
      */
-    resultStream.getReceiver()
-      .execSCANOPCONF(RNIL, 0, resultStream.getRowCount());
-  }
-  /* nextResult() will later move it from m_fullFrags to m_applFrags
-   * under mutex protection.
-   */
-  if (getRoot().m_resultStreams[0]->firstResult() != tupleNotFound) {
-    m_fullFrags.push(m_rootFrags[0]);
+    root.m_resultStreams[fragNo]->firstResult();
+
+    /* When application thread ::awaitMoreResults() it will later be moved
+     * from m_fullFrags to m_applFrags under mutex protection.
+     */
+    m_fullFrags.push(rootFrag);
   }
-  m_finalBatchFrags++;
-} //NdbQueryImpl::closeSingletonScans
+
+  return resume;
+} // NdbQueryImpl::handleBatchComplete
 
 int
 NdbQueryImpl::close(bool forceSend)
@@ -2042,12 +2076,9 @@ NdbQueryImpl::execTCKEYCONF()
   m_rootFrags[0].incrOutstandingResults(-1);
 
   bool ret = false;
-  if (m_rootFrags[0].isFragBatchComplete()) { 
-    /* If this root fragment is complete, verify that the query is also 
-     * complete for this batch.
-     */
-    ret = incrementPendingFrags(-1);
-    assert(ret);
+  if (m_rootFrags[0].isFragBatchComplete())
+  { 
+    ret = handleBatchComplete(0);
   }
 
   if (traceSignals) {
@@ -2058,7 +2089,7 @@ NdbQueryImpl::execTCKEYCONF()
            << endl;
   }
   return ret;
-}
+} // NdbQueryImpl::execTCKEYCONF
 
 void 
 NdbQueryImpl::execCLOSE_SCAN_REP(int errorCode, bool needClose)
@@ -2070,41 +2101,6 @@ NdbQueryImpl::execCLOSE_SCAN_REP(int err
   setFetchTerminated(errorCode,needClose);
 }
 
-/*
-  ::incrementPendingFrags() is intended to be called when receiving signals only.
-  The PollGuard mutex is then set and the shared 'm_pendingFrags' can safely be updated.
-*/
-bool 
-NdbQueryImpl::incrementPendingFrags(int increment)
-{
-  if (unlikely(m_fullFrags.m_errorCode != 0))
-  {
-    /* Received fragment data after a SCANREF() (timeout?) has terminated the scan.
-     * We are about to close this query, and didn't expect there to be anything 'pending'.
-     */
-    assert (m_pendingFrags==0);
-    return (m_pendingFrags==0);
-  }
-
-  m_pendingFrags += increment;
-  assert(m_pendingFrags < 1<<15);            // Check against underflow.
-  assert(m_pendingFrags <= m_rootFragCount); // .... and overflow
-
-  if (traceSignals) {
-    ndbout << "NdbQueryImpl::incrementPendingFrags(" << increment << "): "
-           << ", pendingFrags=" << m_pendingFrags <<  endl;
-  }
-
-  if (m_pendingFrags==0) {
-    if (!getQueryDef().isScanQuery()) {
-      closeSingletonScans();
-    }
-    return true;
-  } else {
-    return false;
-  }
-}
-
 int
 NdbQueryImpl::prepareSend()
 {
@@ -3506,9 +3502,9 @@ NdbQueryOperationImpl::handleBatchComple
     NdbQueryOperationImpl& child = getChildOperation(i);
     child.handleBatchComplete(fragNo);
   }
-
   m_resultStreams[fragNo]->handleBatchComplete();
-}
+
+} // NdbQueryOperationImpl::handleBatchComplete
   
 
 void 
@@ -4321,7 +4317,6 @@ NdbQueryOperationImpl::prepareLookupKeyI
     default:
       assert(false);
     }
-
   }
 
   if (unlikely(keyInfo.isMemoryExhausted())) {
@@ -4332,17 +4327,29 @@ NdbQueryOperationImpl::prepareLookupKeyI
 } // NdbQueryOperationImpl::prepareLookupKeyInfo
 
 
-bool 
-NdbQueryOperationImpl::execTRANSID_AI(const Uint32* ptr, Uint32 len){
-  if (traceSignals) {
-    ndbout << "NdbQueryOperationImpl::execTRANSID_AI()" 
-           << " operation no: " 
-           << getQueryOperationDef().getQueryOperationIx() << endl;
+Uint32
+NdbQueryOperationImpl::findResultStream(Uint32 receiverId) const
+{
+  Uint32 rootFragNo;
+  Uint32 rootFragCount = getQuery().getRootFragCount();
+  assert(&getRoot() == this);
+
+  for (rootFragNo = 0; rootFragNo<rootFragCount; rootFragNo++)
+  {
+    if (m_resultStreams[rootFragNo]->getReceiver().getId() == receiverId)
+      return rootFragNo;
   }
-  bool ret = false;
-  NdbRootFragment* rootFrag = NULL;
 
-  if(getQueryDef().isScanQuery())
+  assert(false);
+  return rootFragCount;
+} // NdbQueryOperationImpl::findResultStream
+
+
+bool 
+NdbQueryOperationImpl::execTRANSID_AI(const Uint32* ptr, Uint32 len)
+{
+  Uint32 rootFragNo = 0;
+  if (getQueryDef().isScanQuery())
   {
     const Uint32 receiverId = CorrelationData(ptr, len).getRootReceiverId();
     
@@ -4350,50 +4357,26 @@ NdbQueryOperationImpl::execTRANSID_AI(co
      * of the root operation. We can thus find the correct root fragment 
      * number.
      */
-    Uint32 rootFragNo;
-    for(rootFragNo = 0; 
-        rootFragNo<getQuery().getRootFragCount() && 
-          getRoot().m_resultStreams[rootFragNo]->getReceiver().getId() 
-          != receiverId; 
-        rootFragNo++);
-    assert(rootFragNo<getQuery().getRootFragCount());
-
-    if (traceSignals) {
-      ndbout << "NdbQueryOperationImpl::execTRANSID_AI()" 
-             << " fragment no: " << rootFragNo
-             << endl;
-    }
-
-    // Process result values.
-    m_resultStreams[rootFragNo]->execTRANSID_AI(ptr, len);
-
-    rootFrag = &getQuery().m_rootFrags[rootFragNo];
-    rootFrag->incrOutstandingResults(-1);
-
-    if (rootFrag->isFragBatchComplete()) {
-      m_queryImpl.incrementPendingFrags(-1);
-      m_queryImpl.handleBatchComplete(rootFragNo);
-
-      /* nextResult() will later move it from m_fullFrags to m_applFrags
-       * under mutex protection.*/
-      m_queryImpl.m_fullFrags.push(*rootFrag);
-      // Wake up appl thread when we have data, or entire query batch completed.
-      ret = true;
-    }
+    rootFragNo = getRoot().findResultStream(receiverId);
+  }
+  if (traceSignals) {
+    ndbout << "NdbQueryOperationImpl::execTRANSID_AI()" 
+           << ", operation no: " << getQueryOperationDef().getQueryOperationIx()
+           << ", fragment no: " << rootFragNo
+           << endl;
+  }
+
+  // Process result values.
+  m_resultStreams[rootFragNo]->execTRANSID_AI(ptr, len);
+
+  NdbRootFragment& rootFrag = m_queryImpl.m_rootFrags[rootFragNo];
+  rootFrag.incrOutstandingResults(-1);
+
+  bool ret = false;
+  if (rootFrag.isFragBatchComplete())
+  {
+    ret = m_queryImpl.handleBatchComplete(rootFragNo);
   }
-  else
-  { // Lookup query
-    // The root operation is a lookup.
-    m_resultStreams[0]->execTRANSID_AI(ptr, len);
-
-    rootFrag = &getQuery().m_rootFrags[0];
-    rootFrag->incrOutstandingResults(-1);
-
-    if (rootFrag->isFragBatchComplete()) {
-      ret = m_queryImpl.incrementPendingFrags(-1);
-      assert(ret); // The query should be complete now.
-    }
-  } // end lookup
 
   if (traceSignals) {
     ndbout << "NdbQueryOperationImpl::execTRANSID_AI(): returns:" << ret
@@ -4404,7 +4387,8 @@ NdbQueryOperationImpl::execTRANSID_AI(co
 
 
 bool 
-NdbQueryOperationImpl::execTCKEYREF(const NdbApiSignal* aSignal){
+NdbQueryOperationImpl::execTCKEYREF(const NdbApiSignal* aSignal)
+{
   if (traceSignals) {
     ndbout << "NdbQueryOperationImpl::execTCKEYREF()" <<  endl;
   }
@@ -4432,6 +4416,9 @@ NdbQueryOperationImpl::execTCKEYREF(cons
     }
   }
 
+  Uint32 rootFragNo = 0;
+  NdbRootFragment& rootFrag = getQuery().m_rootFrags[0];
+
   if (ref->errorCode != DbspjErr::NodeFailure)
   {
     // Compensate for children results not produced.
@@ -4443,18 +4430,18 @@ NdbQueryOperationImpl::execTCKEYREF(cons
     {
       cnt += getNoOfLeafOperations();
     }
-    getQuery().m_rootFrags[0].incrOutstandingResults(- Int32(cnt));
+    rootFrag.incrOutstandingResults(- Int32(cnt));
   }
   else
   {
     // consider frag-batch complete
-    getQuery().m_rootFrags[0].clearOutstandingResults();
+    rootFrag.clearOutstandingResults();
   }
 
   bool ret = false;
-  if (getQuery().m_rootFrags[0].isFragBatchComplete()) { 
-    ret = m_queryImpl.incrementPendingFrags(-1);
-    assert(ret); // The query should be complete now.
+  if (rootFrag.isFragBatchComplete())
+  { 
+    ret = m_queryImpl.handleBatchComplete(rootFragNo);
   } 
 
   if (traceSignals) {
@@ -4482,14 +4469,9 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
   // For now, only the root operation may be a scan.
   assert(&getRoot() == this);
   assert(m_operationDef.isScanOperation());
-  Uint32 fragNo;
+
   // Find root fragment number.
-  for(fragNo = 0; 
-      fragNo<getQuery().getRootFragCount() && 
-        &getRoot().m_resultStreams[fragNo]
-        ->getReceiver() != receiver; 
-      fragNo++);
-  assert(fragNo<getQuery().getRootFragCount());
+  Uint32 fragNo = findResultStream(receiver->getId());
 
   NdbRootFragment& rootFrag = getQuery().m_rootFrags[fragNo];
   rootFrag.setConfReceived();
@@ -4499,10 +4481,6 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
   NdbResultStream& resultStream = *m_resultStreams[fragNo];
   resultStream.getReceiver().m_tcPtrI = tcPtrI;  
 
-  if (rootFrag.finalBatchReceived())
-  {
-    m_queryImpl.m_finalBatchFrags++;
-  }
   if(traceSignals){
     ndbout << "  resultStream(root) {" << resultStream << "} fragNo=" 
            << fragNo << endl;
@@ -4536,17 +4514,12 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
   // Check that nodeMask does not have more bits than we have operations. 
   UNUSED(finalOpDef);
   assert(nodeMask >> (1+finalOpDef.getQueryOperationId()) == 0);
+
   bool ret = false;
-  if (rootFrag.isFragBatchComplete()) {
-    /* This fragment is now complete*/
-    m_queryImpl.incrementPendingFrags(-1);
-    m_queryImpl.handleBatchComplete(fragNo);
-
-    /* nextResult() will later move it from m_fullFrags to m_applFrags
-     * under mutex protection.*/
-    m_queryImpl.m_fullFrags.push(rootFrag);
-    // Wake up now.
-    ret = true;
+  if (rootFrag.isFragBatchComplete())
+  {
+    /* This fragment is now complete */
+    ret = m_queryImpl.handleBatchComplete(fragNo);
   }
   if (traceSignals) {
     ndbout << "NdbQueryOperationImpl::execSCAN_TABCONF():, returns:" << ret

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2010-11-08 15:18:22 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2010-11-09 14:50:00 +0000
@@ -487,28 +487,18 @@ private:
    */
   int sendClose(int nodeId);
 
-  /** Close scan receivers used for lookups. (Since scans and lookups should
-   * have the same semantics for nextResult(), lookups use scan-type 
-   * NdbReceiver objects.)
-   */
-  void closeSingletonScans();
-
   const NdbQuery& getInterface() const
   { return m_interface; }
 
   NdbQueryOperationImpl& getRoot() const 
   { return getQueryOperation(0U); }
 
-  /** Count number of completed root fragments within this batch. 
-   *  @param increment Change in count of completed root frgaments.
-   *  @return True if batch is complete.
-   */
-  bool incrementPendingFrags(int increment);
-
   /** A complete batch has been received for a given root fragment
-   *  Update whatever required before the appl. is allowed to navigate the result.
+   *  Update whatever required before the appl. is allowed to navigate 
+   *  the result.
+   *  @return: 'true' if its time to resume appl. threads
    */ 
-  void handleBatchComplete(Uint32 rootFragNo);
+  bool handleBatchComplete(Uint32 rootFragNo);
 }; // class NdbQueryImpl
 
 
@@ -779,6 +769,8 @@ private:
    * NdbReceiver::m_query_operation_impl here.*/
   Uint32 getIdOfReceiver() const;
   
+  Uint32 findResultStream(Uint32 receiverId) const;
+
   /** 
    * If the operation has a scan filter, append the corresponding
    * interpreter code to a buffer.


Attachment: [text/bzr-bundle] bzr/ole.john.aske@oracle.com-20101109145000-dh3gh32h1lnrk4w3.bundle
Thread
bzr commit into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3349) Ole John Aske9 Nov