From: Ole John Aske  Date: November 5, 2010 2:45pm
Subject: bzr commit into mysql-5.1-telco-7.0-spj-scan-vs-scan branch (ole.john.aske:3340)
#At file:///net/fimafeng09/export/home/tmp/oleja/mysql/mysql-5.1-telco-7.0-spj-scan-scan/ based on revid:jonas@stripped

 3340 Ole John Aske	2010-11-05
      spj-svs: Fixed improper handling of SCANREFs, timeouts and node failures
        while wait_scan() waits for more results to arrive.
      
      The fix introduces the member field 'm_errorCode' in class SharedFragStack. m_errorCode is
      used to temporarily store errors received from TC / datanodes. As these errors are received
      asynchronously by the receiver thread, they should not be stored directly into m_error.code.
      
      Instead we check m_errorCode for errors whenever we have locked the PollGuard mutex. At
      this point any received errors may be moved into 'm_error.code', where they are made
      visible and returned to the application.
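
      For illustration, here is the hand-off pattern as a self-contained C++
      sketch. std::mutex stands in for the PollGuard mutex, and the types are
      simplified stand-ins mirroring the names in the patch, not the actual
      NDB API classes:

        #include <mutex>

        struct SharedFragStack {
          int m_errorCode = 0;   // written by the receiver thread, under mutex
        };

        struct Error { int code = 0; };

        class Query {
        public:
          // Receiver thread: park the error; never write m_error.code directly.
          void onAsyncError(int code) {
            std::lock_guard<std::mutex> g(m_pollMutex);
            m_fullFrags.m_errorCode = code;
          }

          // Application thread, called with m_pollMutex held: promote any
          // parked error into the application-visible m_error.code.
          bool hasReceivedError() {
            if (m_fullFrags.m_errorCode != 0) {
              m_error.code = m_fullFrags.m_errorCode;
              return true;
            }
            return false;
          }

        private:
          std::mutex m_pollMutex;        // stands in for the PollGuard mutex
          SharedFragStack m_fullFrags;
          Error m_error;                 // m_error.code is returned to the app
        };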
      
      This fix also contains several fixes for ::closeTcCursor() related to when we should
      wait for pending results, and when to send a 'close' to TC.
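
      In outline, ::closeTcCursor() now proceeds as below (condensed from the
      patch that follows; the per-waitResult handling is folded into the loop
      conditions for brevity):

        /* 1: Drain results already REQuested for the current batch fetch. */
        while (m_pendingFrags > 0 && !hasReceivedError())
        {
          poll_guard.wait_scan(3*timeout,
                               m_transaction.getConnectedNodeId(), forceSend);
          // timeout / node failure -> setFetchTerminated(<error code>, false)
        }

        /* 2: Only if TC still holds an open scan cursor, request a close... */
        if (m_finalBatchFrags < getRootFragCount())
        {
          sendClose(m_transaction.getConnectedNodeId()); // SCAN_NEXTREQ(close)

          /* 3: ...and wait for the close to be confirmed. */
          while (m_pendingFrags > 0 && !hasReceivedError())
          {
            poll_guard.wait_scan(3*timeout,
                                 m_transaction.getConnectedNodeId(), forceSend);
          }
        }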
      
      Furthermore, execTRANSID_AI has been enhanced to ignore counting of 'pending' fragments
      if we are in an error situation. These are 'leftover' TRANSID_AIs arriving after a SCANREF,
      but before TC and LQH have been requested to close their cursors.
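
      Condensed from ::incrementPendingFrags() in the patch below, this is the
      guard that ignores such leftover TRANSID_AIs:

        if (unlikely(m_fullFrags.m_errorCode != 0))
        {
          // A SCANREF (or timeout) has already terminated the scan; this is
          // a leftover TRANSID_AI which must not count as 'pending'.
          assert(m_pendingFrags == 0);
          return (m_pendingFrags == 0);
        }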

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
      storage/ndb/src/ndbapi/NdbTransactionScan.cpp
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2010-11-02 11:36:28 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2010-11-05 14:45:27 +0000
@@ -47,7 +47,7 @@ STATIC_CONST(Err_MemoryAlloc = 4000);
 STATIC_CONST(Err_SendFailed = 4002);
 STATIC_CONST(Err_FunctionNotImplemented = 4003);
 STATIC_CONST(Err_UnknownColumn = 4004);
-STATIC_CONST(Err_ReceiveFromNdbFailed = 4008);
+STATIC_CONST(Err_ReceiveTimedOut = 4008);
 STATIC_CONST(Err_NodeFailCausedAbort = 4028);
 STATIC_CONST(Err_WrongFieldLength = 4209);
 STATIC_CONST(Err_MixRecAttrAndRecord = 4284);
@@ -1688,76 +1688,77 @@ NdbQueryImpl::nextRootResult(bool fetchA
    * accessed by the application thread, so it is safe to use it without 
    * locks.
    */
-  while (m_state != EndOfData)  // Or likely:  return when 'gotRow'
+  while (m_state != EndOfData)  // Or likely:  return when 'gotRow' or error
   {
     const NdbRootFragment* rootFrag = m_applFrags.getCurrent();
     if (unlikely(rootFrag==NULL))
     {
       /* m_applFrags is empty, so we cannot get more results without 
        * possibly blocking.
+       *
+       * ::awaitMoreResults() will either copy fragments that are already
+       * complete (under mutex protection), or block until data
+       * previously requested arrives.
        */
-      if (fetchAllowed)
-      {
-        /* fetchMoreResults() will either copy fragments that are already
-         * complete (under mutex protection), or block until more data arrives.
-         */
-        const FetchResult fetchResult = awaitMoreResults(forceSend);
-        switch (fetchResult) {
-
-        case FetchResult_ok:
-          break;
-
-        case FetchResult_otherError:
-          assert (m_error.code != 0);
-          setErrorCode(m_error.code);
-          return NdbQuery::NextResult_error;
-
-        case FetchResult_sendFail:
-          setErrorCode(Err_NodeFailCausedAbort);
-          return NdbQuery::NextResult_error;
-
-        case FetchResult_nodeFail:
-          setErrorCode(Err_NodeFailCausedAbort);
-          return NdbQuery::NextResult_error;
+      const FetchResult fetchResult = awaitMoreResults(forceSend);
+      switch (fetchResult) {
 
-        case FetchResult_timeOut:
-          setErrorCode(Err_ReceiveFromNdbFailed);
-          return NdbQuery::NextResult_error;
+      case FetchResult_ok:          // OK - got data w/o error
+        assert (m_error.code == 0);
+        rootFrag = m_applFrags.getCurrent();
+        assert (rootFrag!=NULL);
+        break;
 
-        case FetchResult_scanComplete:
-          getRoot().nullifyResult();
+      case FetchResult_noMoreData:  // No data, no error
+        assert (m_error.code == 0);
+        assert (m_applFrags.getCurrent()==NULL);
+        getRoot().nullifyResult();
+        if (!fetchAllowed)
+        {
+          return NdbQuery::NextResult_bufferEmpty;
+        }
+        else if (m_finalBatchFrags == getRootFragCount())
+        {
+          m_state = EndOfData;
+          postFetchRelease();
           return NdbQuery::NextResult_scanComplete;
-
-        default:
-          assert(false);
         }
+        break;  // ::sendFetchMore() will request more results
+
+      case FetchResult_gotError:    // Error in 'm_error.code'
+        assert (m_error.code != 0);
+        return NdbQuery::NextResult_error;
+
+      default:
+        assert(false);
       }
-      else
-      { 
-        // There are no more cached records in NdbApi
-        getRoot().nullifyResult();
-        return NdbQuery::NextResult_bufferEmpty; 
-      }
-      rootFrag = m_applFrags.getCurrent();
     }
     else
     {
       assert(rootFrag->isFragBatchComplete());
       rootFrag->getResultStream(0).nextResult();   // Consume current
-      m_applFrags.reorganize();
+      m_applFrags.reorganize();                    // Calculate new current
       // Reorg. may update 'current' RootFragment
       rootFrag = m_applFrags.getCurrent();
+    }
 
+    /**
+     * If allowed to request more rows from the datanodes, we do this asynchronously
+     * and request more rows as soon as we have consumed all rows from a
+     * fragment. ::awaitMoreResults() may eventually block and wait for these
+     * when required.
+     */
+    if (fetchAllowed)
+    {
       // Ask for a new batch if we emptied one.
-      NdbRootFragment* const emptyFrag = m_applFrags.getEmpty();
-      if (emptyFrag != NULL)
+      NdbRootFragment* emptyFrag = m_applFrags.getEmpty();
+      while (emptyFrag != NULL)
       {
         if (sendFetchMore(*emptyFrag, forceSend) != 0)
         {
-          setErrorCode(Err_NodeFailCausedAbort); // Node fail
           return NdbQuery::NextResult_error;
         }        
-        assert(m_applFrags.getEmpty() == NULL);
+        emptyFrag = m_applFrags.getEmpty();
       }
     }
 
@@ -1767,13 +1768,18 @@ NdbQueryImpl::nextRootResult(bool fetchA
       getRoot().fetchRow(rootFrag->getResultStream(0));
       return NdbQuery::NextResult_gotRow;
     }
-  }
+  } // m_state != EndOfData
 
   assert (m_state == EndOfData);
   return NdbQuery::NextResult_scanComplete;
 } //NdbQueryImpl::nextRootResult()
 
 
+/**
+ * Wait for more scan results which have already been REQuested to arrive.
+ * @return 0 if some rows did arrive, a negative value if there are errors (in m_error.code),
+ * and 1 if there are no more rows to receive.
+ */
 NdbQueryImpl::FetchResult
 NdbQueryImpl::awaitMoreResults(bool forceSend)
 {
@@ -1783,83 +1789,77 @@ NdbQueryImpl::awaitMoreResults(bool forc
   if (m_queryDef.isScanQuery())
   {
     assert (m_scanTransaction);
+    assert (m_state==Executing);
 
     Ndb* const ndb = m_transaction.getNdb();
-    Uint32 timeout = ndb->theImpl->get_waitfor_timeout();
-
-    while (likely(m_error.code==0))
     {
-      {
-        /* This part needs to be done under mutex due to synchronization with 
-         * receiver thread.
-         */
-        PollGuard poll_guard(*ndb->theImpl);
-        assert (m_state==Executing);
+      /* This part needs to be done under mutex due to synchronization with 
+       * receiver thread.
+       */
+      PollGuard poll_guard(*ndb->theImpl);
 
+      /* There may be pending (asynchronously received, mutex protected) errors
+       * from TC / datanodes. Propagate these into m_error.code in 'API space'.
+       */
+      while (likely(!hasReceivedError()))
+      {
         /* m_fullFrags contains any fragments that are complete (for this batch)
          * but have not yet been moved (under mutex protection) to 
          * m_applFrags.
          */
-        if (m_fullFrags.size()==0) {
-          if (m_pendingFrags == 0)
-          {
-            assert(m_finalBatchFrags == getRootFragCount());
-            m_state = EndOfData;
-            postFetchRelease();
-            return FetchResult_scanComplete;
-          }
-          else
-          {
-            /* More results are on the way, so we wait for them.*/
-            const FetchResult waitResult = static_cast<FetchResult>
-              (poll_guard.wait_scan(3*timeout, 
-                                    m_transaction.getConnectedNodeId(), 
-                                    forceSend));
-            if (waitResult != FetchResult_ok)
-            {
-              return waitResult;
-            }
-          }
-        } // if (m_fullFrags.size()==0)
-  
         NdbRootFragment* frag;
         while ((frag=m_fullFrags.pop()) != NULL)
         {
           m_applFrags.add(*frag);
         }
-      }
+        if (m_applFrags.getCurrent() != NULL)
+        {
+          return FetchResult_ok;
+        }
 
-      /**
-       * We may get batches that are empty but still not the final batch
-       * for that fragment. If so, ask for a new batch.
-       */
-      NdbRootFragment* emptyFrag = m_applFrags.getEmpty();
-      while (emptyFrag != NULL)
-      {
-        if (unlikely(sendFetchMore(*emptyFrag, forceSend) != 0))
+        /* There are no more fragment results available without
+         * first waiting for more to be received from the datanodes.
+         */
+        if (m_pendingFrags == 0)
         {
-          return FetchResult_sendFail;
-        }        
-        emptyFrag = m_applFrags.getEmpty();
-      }
+          // 'No more *pending* results', ::sendFetchMore() may make more available
+          return FetchResult_noMoreData;
+        }
 
-      if (m_applFrags.getCurrent() != NULL)
-      {
-        return FetchResult_ok;
-      }
+        /* More results are on the way, so we wait for them.*/
+        const FetchResult waitResult = static_cast<FetchResult>
+          (poll_guard.wait_scan(3*ndb->theImpl->get_waitfor_timeout(), 
+                                m_transaction.getConnectedNodeId(), 
+                                forceSend));
 
-      /* Getting here is not an error. PollGuard::wait_scan() will return
-       * when a complete batch (for a fragment) is available for *any* active 
-       * scan in this transaction. So we must wait again for the next arriving 
-       * batch.
-       */
-    } // while(likely(m_error.code==0))
+        switch (waitResult) {
+        case FetchResult_ok:  // SCAN_TABREF, may have setFetchTerminated() w/ errors
+          break;
+        case FetchResult_timeOut:
+          setFetchTerminated(Err_ReceiveTimedOut,false);
+          break;
+        case FetchResult_nodeFail:
+          setFetchTerminated(Err_NodeFailCausedAbort,false);
+          break;
+        default:
+          assert(false);
+        }
+        assert (!m_error.code);
 
-    // 'while' terminated by m_error.code
-    assert (m_error.code);
-    return FetchResult_otherError;
+        /* Getting here is not an error. PollGuard::wait_scan() will return
+         * when a complete batch (for a fragment) is available for *any* active 
+         * scan in this transaction. So we must wait again for the next arriving 
+         * batch.
+         */
+      } // while(!hasReceivedError())
+    } // Terminates scope of 'PollGuard'
 
-  } else { // is a Lookup query
+    // Fall through only if ::hasReceivedError()
+    assert (m_error.code);
+    return FetchResult_gotError;
+  }
+  else // is a Lookup query
+  {
     /* The root operation is a lookup. Lookups are guaranteed to be complete
      * before NdbTransaction::execute() returns. Therefore we do not set
      * the lock, because we know that the signal receiver thread will not
@@ -1873,9 +1873,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
        *  - There was no matching row for an inner join.
        *  - or, the application called nextResult() twice for a lookup query.
        */
-      m_state = EndOfData;
-      postFetchRelease();
-      return FetchResult_scanComplete;
+      return FetchResult_noMoreData;
     }
     else
     {
@@ -1887,7 +1885,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
       return FetchResult_ok;
     }
   } // if(m_queryDef.isScanQuery())
-} //NdbQueryImpl::fetchMoreResults
+} //NdbQueryImpl::awaitMoreResults
 
 void 
 NdbQueryImpl::handleBatchComplete(Uint32 fragNo)
@@ -1918,6 +1916,7 @@ NdbQueryImpl::closeSingletonScans()
   if (getRoot().m_resultStreams[0]->firstResult() != tupleNotFound) {
     m_fullFrags.push(m_rootFrags[0]);
   }
+  m_finalBatchFrags++;
 } //NdbQueryImpl::closeSingletonScans
 
 int
@@ -1991,6 +1990,42 @@ NdbQueryImpl::setErrorCodeAbort(int aErr
 }
 
 
+/*
+ * ::setFetchTerminated() should only be called with the mutex locked.
+ * Register result fetching as completed (possibly prematurely, w/ errorCode).
+ */
+void
+NdbQueryImpl::setFetchTerminated(int errorCode, bool needClose)
+{
+  assert(m_finalBatchFrags < getRootFragCount());
+  if (!needClose)
+  {
+    m_finalBatchFrags = getRootFragCount();
+  }
+  if (errorCode!=0)
+  {
+    m_fullFrags.m_errorCode = errorCode;
+  }
+  m_pendingFrags = 0;
+} // NdbQueryImpl::setFetchTerminated()
+
+
+/* There may be pending (asynchronously received, mutex protected) errors
+ * from TC / datanodes. Propagate these into 'API space'.
+ * ::hasReceivedError() should only be called with the mutex locked.
+ */
+bool
+NdbQueryImpl::hasReceivedError()
+{
+  if (unlikely(m_fullFrags.m_errorCode))
+  {
+    setErrorCode(m_fullFrags.m_errorCode);
+    return true;
+  }
+  return false;
+} // NdbQueryImpl::hasReceivedError
+
+
 bool 
 NdbQueryImpl::execTCKEYCONF()
 {
@@ -2023,18 +2058,13 @@ NdbQueryImpl::execTCKEYCONF()
 }
 
 void 
-NdbQueryImpl::execCLOSE_SCAN_REP(bool needClose)
+NdbQueryImpl::execCLOSE_SCAN_REP(int errorCode, bool needClose)
 {
   if (traceSignals)
   {
     ndbout << "NdbQueryImpl::execCLOSE_SCAN_REP()" << endl;
   }
-  assert(m_finalBatchFrags < getRootFragCount());
-  m_pendingFrags = 0;
-  if(!needClose)
-  {
-    m_finalBatchFrags = getRootFragCount();
-  }
+  setFetchTerminated(errorCode,needClose);
 }
 
 /*
@@ -2044,6 +2074,15 @@ NdbQueryImpl::execCLOSE_SCAN_REP(bool ne
 bool 
 NdbQueryImpl::incrementPendingFrags(int increment)
 {
+  if (unlikely(m_fullFrags.m_errorCode != 0))
+  {
+    /* Received fragment data after a SCANREF() (timeout?) has terminated the scan.
+     * We are about to close this query, and didn't expect there to be anything 'pending'.
+     */
+    assert (m_pendingFrags==0);
+    return (m_pendingFrags==0);
+  }
+
   m_pendingFrags += increment;
   assert(m_pendingFrags < 1<<15);            // Check against underflow.
   assert(m_pendingFrags <= m_rootFragCount); // .... and overflow
@@ -2530,7 +2569,6 @@ NdbQueryImpl::doSend(int nodeId, bool la
 int sendFetchMore() - Fetch another scan batch, optionaly closing the scan
                 
                 Request another batch of rows to be retrieved from the scan.
-                Transporter mutex is locked before this method is called. 
 
 Return Value:   0 if send succeeded, -1 otherwise.
 Parameters:     emptyFrag: Root frgament for which to ask for another batch.
@@ -2600,6 +2638,12 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
    * receiver thread.
    */
   PollGuard poll_guard(*ndb.theImpl);
+
+  if (unlikely(hasReceivedError()))
+  {
+    // Errors arrived in between ::awaitMoreResults() releasing the mutex and ::sendFetchMore() grabbing it
+    return -1;
+  }
   const int res = 
     facade->sendSignal(&tSignal, 
                        getNdbTransaction().getConnectedNodeId(), 
@@ -2607,7 +2651,7 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
                        1);
   if (unlikely(res == -1)) 
   {
-    // Error: 'Send to NDB failed'
+    setErrorCode(Err_NodeFailCausedAbort);
     return -1;
   }
   m_pendingFrags++;
@@ -2639,40 +2683,49 @@ NdbQueryImpl::closeTcCursor(bool forceSe
    */
   PollGuard poll_guard(*ndb->theImpl);
 
+//Uint32 seq = m_transaction->theNodeSequence;
+//if (seq != tp->getNodeSequence(m_transaction.getConnectedNodeId())) // TODO
+//{}
+
   /* Wait for outstanding scan results from current batch fetch */
-  while (m_error.code==0 && !isBatchComplete())
+  while (m_pendingFrags > 0)
   {
     const FetchResult waitResult = static_cast<FetchResult>
           (poll_guard.wait_scan(3*timeout, 
                                 m_transaction.getConnectedNodeId(), 
                                 forceSend));
     switch (waitResult) {
-    case FetchResult_ok:
+    case FetchResult_ok:  // SCAN_TABREF, may have setFetchTerminated() w/ errors
       break;
-    case FetchResult_nodeFail:
-      setErrorCode(Err_NodeFailCausedAbort);  // Node fail
-      return -1;
     case FetchResult_timeOut:
-      setErrorCode(Err_ReceiveFromNdbFailed); // Timeout
-      return -1;
+      setFetchTerminated(Err_ReceiveTimedOut,false);
+      break;
+    case FetchResult_nodeFail:
+      setFetchTerminated(Err_NodeFailCausedAbort,false);
+      break;
     default:
       assert(false);
     }
+    if (hasReceivedError())
+    {
+      break;
+    }
   } // while
 
-  assert(m_pendingFrags==0 || m_error.code != 0);
-  m_error.code = 0;  // Ignore possible errorcode caused by previous fetching
-
-  /* Discard pending result in order to not confuse later counting of m_finalBatchFrags */
-  m_fullFrags.clear();
+  assert(m_pendingFrags==0);
+  m_fullFrags.clear();                         // Throw any unhandled results
+  m_fullFrags.m_errorCode = 0;                 // Clear errors caused by previous fetching
+  m_error.code = 0;
 
-  if (m_finalBatchFrags<getRootFragCount())  // TC has an open scan cursor.
+  if (m_finalBatchFrags < getRootFragCount())  // TC has an open scan cursor.
   {
-    /* Send SCANREQ(close) */
+    /* Send SCAN_NEXTREQ(close) */
     const int error = sendClose(m_transaction.getConnectedNodeId());
     if (unlikely(error))
       return error;
 
+    assert(m_finalBatchFrags+m_pendingFrags==getRootFragCount());
+
     /* Wait for close to be confirmed: */
     while (m_pendingFrags > 0)
     {
@@ -2682,49 +2735,35 @@ NdbQueryImpl::closeTcCursor(bool forceSe
                                   forceSend));
       switch (waitResult) {
       case FetchResult_ok:
-        if (unlikely(m_error.code))   // Close request itself failed, keep error
-        {
-          setErrorCode(m_error.code);
-          return -1;
-        }
-        NdbRootFragment* frag;
-        while ((frag=m_fullFrags.pop()) != NULL)
-        {
-          if (frag->finalBatchReceived())
-          {
-            // This was the final batch for that root fragment.
-            m_finalBatchFrags++;
-          }
-        }
         break;
-      case FetchResult_nodeFail:
-        setErrorCode(Err_NodeFailCausedAbort);  // Node fail
-        return -1;
       case FetchResult_timeOut:
-        setErrorCode(Err_ReceiveFromNdbFailed); // Timeout
-        return -1;
+        setFetchTerminated(Err_ReceiveTimedOut,false);
+        break;
+      case FetchResult_nodeFail:
+        setFetchTerminated(Err_NodeFailCausedAbort,false);
+        break;
       default:
         assert(false);
       }
+      if (hasReceivedError())
+      {
+        break;
+      }
     } // while
   } // if
-  assert(m_finalBatchFrags == getRootFragCount());
 
   return 0;
 } //NdbQueryImpl::closeTcCursor
 
 
 /*
-  This method is called with the PollGuard mutex held on the transporter.
-*/
+ * This method is called with the PollGuard mutex held on the transporter.
+ */
 int
 NdbQueryImpl::sendClose(int nodeId)
 {
   assert(m_finalBatchFrags < getRootFragCount());
-
   m_pendingFrags = getRootFragCount() - m_finalBatchFrags;
-  assert(m_pendingFrags > 0);
-  assert(m_pendingFrags < 1<<15); // Check against underflow.
 
   Ndb& ndb = *m_transaction.getNdb();
   NdbApiSignal tSignal(&ndb);
@@ -2746,28 +2785,6 @@ NdbQueryImpl::sendClose(int nodeId)
 } // NdbQueryImpl::sendClose()
 
 
-/*
-  This method is always called with PollGuard mutex held on the transporter.
-  (As m_pendingFrags is accessed both from API sender and receiver)
-*/
-bool 
-NdbQueryImpl::isBatchComplete() const {
-#ifndef NDEBUG
-  if (!m_error.code)
-  {
-    Uint32 count = 0;
-    for(Uint32 i = 0; i < getRootFragCount(); i++){
-      if(!m_rootFrags[i].isFragBatchComplete()){
-        count++;
-      }
-    }
-    assert(count == m_pendingFrags);
-  }
-#endif
-  return m_pendingFrags == 0;
-}
-
-
 int NdbQueryImpl::isPrunable(bool& prunable)
 {
   if (m_prunability == Prune_Unknown)
@@ -2788,17 +2805,18 @@ int NdbQueryImpl::isPrunable(bool& pruna
 
 
 /****************
- * NdbQueryImpl::FragStack methods.
+ * NdbQueryImpl::SharedFragStack methods.
  ***************/
 
-NdbQueryImpl::FragStack::FragStack():
+NdbQueryImpl::SharedFragStack::SharedFragStack():
+  m_errorCode(0),
   m_capacity(0),
   m_current(-1),
-  m_array(NULL){
-}
+  m_array(NULL)
+{}
 
 int
-NdbQueryImpl::FragStack::prepare(int capacity)
+NdbQueryImpl::SharedFragStack::prepare(int capacity)
 {
   assert(m_array==NULL);
   assert(m_capacity==0);
@@ -2812,7 +2830,8 @@ NdbQueryImpl::FragStack::prepare(int cap
 }
 
 void
-NdbQueryImpl::FragStack::push(NdbRootFragment& frag){
+NdbQueryImpl::SharedFragStack::push(NdbRootFragment& frag)
+{
   m_current++;
   assert(m_current<m_capacity);
   m_array[m_current] = &frag; 
@@ -4323,7 +4342,8 @@ NdbQueryOperationImpl::execTRANSID_AI(co
   bool ret = false;
   NdbRootFragment* rootFrag = NULL;
 
-  if(getQueryDef().isScanQuery()){
+  if(getQueryDef().isScanQuery())
+  {
     const Uint32 receiverId = CorrelationData(ptr, len).getRootReceiverId();
     
     /** receiverId holds the Id of the receiver of the corresponding stream
@@ -4360,7 +4380,9 @@ NdbQueryOperationImpl::execTRANSID_AI(co
       // Wake up appl thread when we have data, or entire query batch completed.
       ret = true;
     }
-  } else { // Lookup query
+  }
+  else
+  { // Lookup query
     // The root operation is a lookup.
     m_resultStreams[0]->execTRANSID_AI(ptr, len);
 

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2010-10-29 21:21:45 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2010-11-05 14:45:27 +0000
@@ -152,7 +152,7 @@ public:
   bool execTCKEYCONF();
 
   /** Process SCAN_TABCONF w/ EndOfData which is a 'Close Scan Reply'. */
-  void execCLOSE_SCAN_REP(bool isClosed);
+  void execCLOSE_SCAN_REP(int errorCode, bool needClose);
 
   /** Determines if query has completed and may be garbage collected
    *  A query is not considder complete until the client has 
@@ -199,24 +199,30 @@ public:
   { return m_rootFragCount; }
 
 private:
-  /** Possible return values from NdbQuery::awaitMoreResults. Integer values
-   * matches those returned from PoolGuard::waitScan().
+  /** Possible return values from NdbQueryImpl::awaitMoreResults. Integer values
+   * match those returned from PollGuard::wait_scan().
    */
   enum FetchResult{
-    FetchResult_otherError = -4,
+    FetchResult_gotError = -4,  // There is an error available in 'm_error.code'
     FetchResult_sendFail = -3,
     FetchResult_nodeFail = -2,
     FetchResult_timeOut = -1,
     FetchResult_ok = 0,
-    FetchResult_scanComplete = 1
+    FetchResult_noMoreData = 1
   };
 
-  /** A stack of NdbRootFragment pointers. */
-  class FragStack{
+  /** A stack of NdbRootFragment pointers.
+   *  NdbRootFragments which are 'BatchComplete' are pushed onto this stack by 
+   *  the receiver thread, and later popped by the application thread when 
+   *  it needs more results to process.
+   *  Due to this shared usage, the PollGuard mutex must be held before 
+   *  accessing the SharedFragStack.
+   */
+  class SharedFragStack{
   public:
-    explicit FragStack();
+    explicit SharedFragStack();
 
-    ~FragStack() {
+    ~SharedFragStack() {
       delete[] m_array;
     }
 
@@ -238,16 +244,21 @@ private:
       m_current = -1;
     }
 
+    /** Possible error received from TC / datanodes. */
+    int m_errorCode;
+
   private:
     /** Capacity of stack.*/
     int m_capacity;
+
     /** Index of current top of stack.*/
     int m_current;
     NdbRootFragment** m_array;
+
     // No copying.
-    FragStack(const FragStack&);
-    FragStack& operator=(const FragStack&);
-  }; // class FragStack
+    SharedFragStack(const SharedFragStack&);
+    SharedFragStack& operator=(const SharedFragStack&);
+  }; // class SharedFragStack
 
   class OrderedFragSet{
   public:
@@ -389,7 +400,7 @@ private:
   /** Root frgaments that have received a complete batch. Shared between 
    *  application thread and receiving thread. Access should be mutex protected.
    */
-  FragStack m_fullFrags;
+  SharedFragStack m_fullFrags;  // BEWARE: protect with PollGuard mutex
 
   /** Number of root fragments for which confirmation for the final batch 
    * (with tcPtrI=RNIL) has been received. Observe that even if 
@@ -448,14 +459,24 @@ private:
   /** Navigate to the next result from the root operation. */
   NdbQuery::NextResultOutcome nextRootResult(bool fetchAllowed, bool forceSend);
 
-  /** Get more scan results, ask datanodes for the next batch if necessary.*/
-  FetchResult awaitMoreResults(bool forceSend);
-
   /** Send SCAN_NEXTREQ signal to fetch another batch from a scan query
    * @return 0 if send succeeded, -1 otherwise.
    */
   int sendFetchMore(NdbRootFragment& emptyFrag, bool forceSend);
 
+  /** Wait for more scan results which have already been REQuested to arrive.
+   * @return 0 if some rows did arrive, a negative value if there are errors (in m_error.code),
+   * and 1 if there are no more rows to receive.
+   */
+  FetchResult awaitMoreResults(bool forceSend);
+
+  /** Check if we have received an error from TC or the datanodes.
+   * @return 'true' if an error is pending, 'false' otherwise.
+   */
+  bool hasReceivedError();                                   // Need mutex lock
+
+  void setFetchTerminated(int aErrorCode, bool needClose);   // Need mutex lock
+
   /** Close cursor on TC */
   int closeTcCursor(bool forceSend);
 
@@ -482,9 +503,6 @@ private:
    */
   bool incrementPendingFrags(int increment);
 
-  /** Check if batch is complete (no outstanding messages).*/
-  bool isBatchComplete() const;
-
   /** A complete batch has been received for a given root fragment
    *  Update whatever required before the appl. is allowed to navigate the result.
    */ 

=== modified file 'storage/ndb/src/ndbapi/NdbTransactionScan.cpp'
--- a/storage/ndb/src/ndbapi/NdbTransactionScan.cpp	2010-10-11 13:18:51 +0000
+++ b/storage/ndb/src/ndbapi/NdbTransactionScan.cpp	2010-11-05 14:45:27 +0000
@@ -55,8 +55,7 @@ NdbTransaction::receiveSCAN_TABREF(const
 
     } else {
       assert (m_scanningQuery);
-      m_scanningQuery->execCLOSE_SCAN_REP(ref->closeNeeded);
-      m_scanningQuery->setErrorCode(ref->errorCode);
+      m_scanningQuery->execCLOSE_SCAN_REP(ref->errorCode, ref->closeNeeded);
       if(!ref->closeNeeded){
         return 0;
       }
@@ -100,7 +99,7 @@ NdbTransaction::receiveSCAN_TABCONF(cons
         theScanningOp->execCLOSE_SCAN_REP();
       } else {
         assert (m_scanningQuery);
-        m_scanningQuery->execCLOSE_SCAN_REP(false);
+        m_scanningQuery->execCLOSE_SCAN_REP(0, false);
       }
       return 1; // -> Finished
     }


Attachment: [text/bzr-bundle] bzr/ole.john.aske@oracle.com-20101105144527-rsl314qx5w7slhyr.bundle