List:Commits« Previous MessageNext Message »
From:Jonas Oreland Date:November 4 2009 11:12am
Subject:bzr commit into mysql-5.1-telco-7.0-spj branch (jonas:2969)
View as plain text  
#At file:///home/jonas/src/70-spj/ based on revid:jonas@stripped

 2969 Jonas Oreland	2009-11-04 [merge]
      ndb - spj - merge main

    modified:
      storage/ndb/CMakeLists.txt
      storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp
      storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
      storage/ndb/src/ndbapi/NdbTransactionScan.cpp
=== modified file 'storage/ndb/CMakeLists.txt'
--- a/storage/ndb/CMakeLists.txt	2009-10-19 09:44:26 +0000
+++ b/storage/ndb/CMakeLists.txt	2009-11-03 10:11:56 +0000
@@ -38,7 +38,11 @@ IF(WITHOUT_PARTITION_STORAGE_ENGINE)
 ENDIF(WITHOUT_PARTITION_STORAGE_ENGINE)
 
 INCLUDE("${PROJECT_SOURCE_DIR}/storage/mysql_storage_engine.cmake")
-INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/storage/ndb/include)
+INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/storage/ndb/include
+                    ${CMAKE_SOURCE_DIR}/storage/ndb/src/ndbapi
+                    ${CMAKE_SOURCE_DIR}/storage/ndb/include/util
+)
+
 SET(NDBCLUSTER_SOURCES
 	../../sql/ha_ndbcluster.cc
 	../../sql/ha_ndbcluster_cond.cc

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2009-11-02 18:03:43 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/Dbspj.hpp	2009-11-04 10:58:11 +0000
@@ -247,11 +247,17 @@ public:
       /**
        * SCAN_FRAGCONF is received
        */
-      SF_STARTED = 2
+      SF_STARTED = 2,
+
+      /**
+       * SCAN_NEXTREQ(close) has been sent to datanodes
+       */
+      SF_CLOSING = 3
     };
 
-    Uint32 m_scan_state;     // Only valid is TreeNodeState >= TN_ACTIVE
+    Uint32 m_scan_state;     // Only valid if TreeNodeState >= TN_ACTIVE
     Uint32 m_scan_status;    // fragmentCompleted
+    bool   m_pending_close;  // SCAN_NEXTREQ(close) pending while SF_RUNNING
     /** True if signal has been received since sending 
      * last SCAN_FRAGREQ/SCAN_NEXTREQ*/
     bool   m_scan_fragconf_received; 
@@ -357,7 +363,7 @@ public:
 
       /**
        * Is attrinfo "constructed"
-       *   (implies key info will be disowned (by send-signal)
+       *   (implies attr info will be disowned (by send-signal)
        */
       T_ATTRINFO_CONSTRUCTED = 0x8,
 

=== modified file 'storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp'
--- a/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2009-11-04 11:10:35 +0000
+++ b/storage/ndb/src/kernel/blocks/dbspj/DbspjMain.cpp	2009-11-04 11:12:17 +0000
@@ -534,6 +534,7 @@ Dbspj::do_init(Request* requestP, const 
   requestP->m_transId[1] = req->transId2;
   requestP->m_node_mask.clear();
   requestP->m_rootResultData = req->resultData;
+  requestP->m_currentNodePtrI = RNIL;
 }
 
 void
@@ -950,32 +951,51 @@ Dbspj::execSCAN_NEXTREQ(Signal* signal)
   Ptr<TreeNode> treeNodePtr;
   m_treenode_pool.getPtr(treeNodePtr, requestPtr.p->m_currentNodePtrI);
 
-  if (req->closeFlag == ZTRUE &&                         // Requested close scan
-      treeNodePtr.p->m_scanfrag_data.m_scan_status == 2) // Is closed on LQH
+  if (treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_CLOSING)
   {
     jam();
-
     /**
-     * TODO this needs more elaborate *abort* handling
+     * Duplicate of a close request already sent to datanodes.
+     * Ignore this and wait for reply on pending request.
      */
-    ScanFragConf* conf = reinterpret_cast<ScanFragConf*>(signal->getDataPtrSend());
-
-    conf->senderData = requestPtr.p->m_senderData;
-    conf->transId1 = requestPtr.p->m_transId[0];
-    conf->transId2 = requestPtr.p->m_transId[1];
-    conf->completedOps = 0;
-    conf->fragmentCompleted = 2; // =ZSCAN_FRAG_CLOSED -> Finished...
-    conf->total_len = 0; // Not supported...
+    DEBUG("execSCAN_NEXTREQ, is SF_CLOSING -> ignore request");
+    return;
+  }
 
-    DEBUG("execSCAN_NEXTREQ(close), fragmentCompleted:" << conf->fragmentCompleted);
+  if (req->closeFlag == ZTRUE)                             // Requested close scan
+  {
+    if (treeNodePtr.p->m_scanfrag_data.m_scan_status == 2) // Is closed on LQH
+    {
+      jam();
+      ndbassert (treeNodePtr.p->m_scanfrag_data.m_scan_state != ScanFragData::SF_RUNNING)
 
-    sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGCONF, signal,
-               ScanFragConf::SignalLength, JBB);
+      ScanFragConf* conf = reinterpret_cast<ScanFragConf*>(signal->getDataPtrSend());
+      conf->senderData = requestPtr.p->m_senderData;
+      conf->transId1 = requestPtr.p->m_transId[0];
+      conf->transId2 = requestPtr.p->m_transId[1];
+      conf->completedOps = 0;
+      conf->fragmentCompleted = 2; // =ZSCAN_FRAG_CLOSED -> Finished...
+      conf->total_len = 0; // Not supported...
+
+      DEBUG("execSCAN_NEXTREQ(close), LQH has conf'ed 'w/ ZSCAN_FRAG_CLOSED");
+      sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGCONF, signal,
+                 ScanFragConf::SignalLength, JBB);
 
-    cleanup(requestPtr);
-    return;
+      cleanup(requestPtr);
+      return;
+    }
+    else if (treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_RUNNING)
+    {
+      jam();
+      DEBUG("execSCAN_NEXTREQ, make PENDING CLOSE");
+      treeNodePtr.p->m_scanfrag_data.m_pending_close = true;
+      return;
+    }
+    // else; fallthrough & send to datanodes:
   }
 
+  ndbassert (!treeNodePtr.p->m_scanfrag_data.m_pending_close);
+  ndbassert (treeNodePtr.p->m_scanfrag_data.m_scan_status != 2)
   ndbrequire(treeNodePtr.p->m_info != 0 &&
              treeNodePtr.p->m_info->m_execSCAN_NEXTREQ != 0);
   (this->*(treeNodePtr.p->m_info->m_execSCAN_NEXTREQ))(signal,
@@ -1765,6 +1785,10 @@ Dbspj::scanFrag_build(Build_context& ctx
     treeNodePtr.p->m_info = &g_ScanFragOpInfo;
     treeNodePtr.p->m_bits |= TreeNode::T_ATTR_INTERPRETED;
 
+    treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_IDLE;
+    treeNodePtr.p->m_scanfrag_data.m_scan_status = 0;
+    treeNodePtr.p->m_scanfrag_data.m_pending_close = false;
+
     ScanFragReq*dst=(ScanFragReq*)treeNodePtr.p->m_scanfrag_data.m_scanFragReq;
     dst->senderData = treeNodePtr.i;
     dst->resultRef = reference();
@@ -1807,6 +1831,7 @@ Dbspj::scanFrag_build(Build_context& ctx
     DEBUG("param len: " << param->len);
     if (unlikely(param->len < QN_ScanFragParameters::NodeSize))
     {
+      jam();
       DEBUG_CRASH();
       break;
     }
@@ -1826,6 +1851,7 @@ Dbspj::scanFrag_build(Build_context& ctx
                   nodeDA, treeBits, paramDA, paramBits);
     if (unlikely(err != 0))
     {
+      jam();
       DEBUG_CRASH();
       break;
     }
@@ -2012,6 +2038,7 @@ Dbspj::scanFrag_send(Signal* signal,
 	     NDB_ARRAY_SIZE(treeNodePtr.p->m_scanfrag_data.m_scanFragReq),
              JBB, &handle);
 
+  ndbassert (!treeNodePtr.p->m_scanfrag_data.m_pending_close);
   treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_RUNNING;
   treeNodePtr.p->m_scanfrag_data.m_scan_status = 0;
   treeNodePtr.p->m_scanfrag_data.m_scan_fragconf_received = false;
@@ -2022,6 +2049,12 @@ Dbspj::scanFrag_send(Signal* signal,
   treeNodePtr.p->m_scanfrag_data.m_descendant_keyrefs_received = 0;
   treeNodePtr.p->m_scanfrag_data.m_descendant_keyreqs_sent = 0;
   treeNodePtr.p->m_scanfrag_data.m_missing_descendant_rows = 0;
+
+  /**
+   * Save position where next-scan-req should continue or close
+   */
+  treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_RUNNING;
+  requestPtr.p->m_currentNodePtrI = treeNodePtr.i;
 }
 
 /** Return true if scan batch is complete. This happens when all scan 
@@ -2076,9 +2109,6 @@ Dbspj::scanFrag_execSCAN_FRAGREF(Signal*
                                  Ptr<Request> requestPtr,
                                  Ptr<TreeNode> treeNodePtr)
 {
-  /**
-   * TODO
-   */
   const ScanFragRef* const rep = reinterpret_cast<const ScanFragRef*>(signal->getDataPtr());
   Uint32 errCode = rep->errorCode;
 
@@ -2102,9 +2132,29 @@ Dbspj::scanFrag_execSCAN_FRAGREF(Signal*
   sendSignal(requestPtr.p->m_senderRef, GSN_SCAN_FRAGREF, signal,
 	     ScanFragRef::SignalLength, JBB);
 
-  // TODO: Cleanup operation on SPJ block
+  treeNodePtr.p->m_scanfrag_data.m_scan_fragconf_received = true;
+//treeNodePtr.p->m_scanfrag_data.m_scan_status = 2;  // (2=ZSCAN_FRAG_CLOSED)
+  ndbassert (isScanComplete(treeNodePtr.p->m_scanfrag_data));
 
-//ndbrequire(false);
+  /**
+   * SCAN_FRAGREF implies that datanodes closed the cursor.
+   *  -> Pending close is effectively a NOOP, reset it
+   */
+  if (treeNodePtr.p->m_scanfrag_data.m_pending_close)
+  {
+    jam();
+    treeNodePtr.p->m_scanfrag_data.m_pending_close = false;
+    DEBUG(" SCAN_FRAGREF, had pending close which can be ignored (is closed)");
+  }
+
+  /**
+   * Cleanup operation on SPJ block, remove all allocated resources.
+   */
+  {
+    jam();
+    treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_IDLE;
+    nodeFinished(signal, requestPtr, treeNodePtr);
+  }
 }
 
 
@@ -2143,8 +2193,34 @@ Dbspj::scanFrag_batch_complete(Signal* s
                                Ptr<TreeNode> treeNodePtr)
 {
   DEBUG("scanFrag_batch_complete()");
-  ndbrequire(treeNodePtr.p->m_scanfrag_data.m_scan_state ==
-             ScanFragData::SF_RUNNING);
+
+  if (treeNodePtr.p->m_scanfrag_data.m_pending_close)
+  {
+    jam();
+    ndbrequire(treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_RUNNING);
+    treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_STARTED;
+
+    DEBUG("scanFrag_batch_complete() - has pending close, ignore this reply, request close");
+
+    ScanFragNextReq* req = reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
+
+    /**
+     * SCAN_NEXTREQ(close) was requested while we where waiting for 
+     * datanodes to complete this request. 
+     *   - Send close request to LQH now.
+     *   - Suppress reply to TC/API, will reply later when close is conf'ed
+     */
+    req->closeFlag = ZTRUE;
+    req->senderData = treeNodePtr.i;
+    req->transId1 = requestPtr.p->m_transId[0];
+    req->transId2 = requestPtr.p->m_transId[1];
+    req->batch_size_rows = 0;
+    req->batch_size_bytes = 0;
+
+    treeNodePtr.p->m_scanfrag_data.m_pending_close = false;
+    scanFrag_execSCAN_NEXTREQ(signal, requestPtr, treeNodePtr);
+    return;
+  }
 
   /**
    * one batch complete...
@@ -2166,6 +2242,8 @@ Dbspj::scanFrag_batch_complete(Signal* s
   if (treeNodePtr.p->m_scanfrag_data.m_scan_status == 2)
   {
     jam();
+    ndbrequire(treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_RUNNING ||
+               treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_CLOSING);
     /**
      * EOF for scan
      */
@@ -2175,11 +2253,12 @@ Dbspj::scanFrag_batch_complete(Signal* s
   else
   {
     jam();
+    ndbrequire(treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_RUNNING);
     /**
-     * Save position where next-scan-req should continue
+     * Check position where next-scan-req should continue
      */
     treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_STARTED;
-    requestPtr.p->m_currentNodePtrI = treeNodePtr.i;
+    assert(requestPtr.p->m_currentNodePtrI == treeNodePtr.i);
   }
 }
 
@@ -2199,6 +2278,7 @@ Dbspj::scanFrag_execSCAN_NEXTREQ(Signal*
                                  Ptr<TreeNode> treeNodePtr)
 {
   jamEntry();
+  ndbassert (treeNodePtr.p->m_scanfrag_data.m_scan_state == ScanFragData::SF_STARTED);
 
   ScanFragNextReq* nextReq = reinterpret_cast<ScanFragNextReq*>(signal->getDataPtrSend());
   nextReq->senderData = treeNodePtr.i;
@@ -2214,7 +2294,10 @@ Dbspj::scanFrag_execSCAN_NEXTREQ(Signal*
              ScanFragNextReq::SignalLength, 
              JBB);
 
-  treeNodePtr.p->m_scanfrag_data.m_scan_state = ScanFragData::SF_RUNNING;
+  treeNodePtr.p->m_scanfrag_data.m_scan_state = (nextReq->closeFlag == ZTRUE)
+    ? ScanFragData::SF_CLOSING 
+    : ScanFragData::SF_RUNNING;
+
   treeNodePtr.p->m_scanfrag_data.m_scan_status = 0;
   treeNodePtr.p->m_scanfrag_data.m_scan_fragconf_received = false;
   treeNodePtr.p->m_scanfrag_data.m_rows_received = 0;

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2009-10-29 15:04:38 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2009-11-03 12:21:47 +0000
@@ -469,6 +469,9 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio
                            const NdbQueryDefImpl& queryDef):
   m_interface(*this),
   m_state(Initial),
+  m_tcState(Inactive),
+  m_next(NULL),
+  m_queryDef(queryDef),
   m_error(),
   m_transaction(trans),
   m_scanTransaction(NULL),
@@ -476,8 +479,6 @@ NdbQueryImpl::NdbQueryImpl(NdbTransactio
   m_countOperations(0),
   m_tcKeyConfReceived(false),
   m_pendingStreams(0),
-  m_next(NULL),
-  m_queryDef(queryDef),
   m_parallelism(0),
   m_maxBatchRows(0),
   m_applStreams(),
@@ -652,18 +653,21 @@ NdbQueryImpl::nextResult(bool fetchAllow
     m_applStreams.pop();
   }
 
-  if (unlikely(m_applStreams.top()==NULL)) {
+  if (unlikely(m_applStreams.top()==NULL))
+  {
     /* m_applStreams is empty, so we cannot get more results without 
-     * possibly blocking.*/
-
-    if (fetchAllowed) {
+     * possibly blocking.
+     */
+    if (fetchAllowed)
+    {
       /* fetchMoreResults() will either copy streams that are already
-       * complete (under mutex protection), or block until more data arrives.*/
+       * complete (under mutex protection), or block until more data arrives.
+       */
       const FetchResult fetchResult = fetchMoreResults(forceSend);
       switch (fetchResult) {
       case FetchResult_otherError:
-        // FIXME: copy semantics from NdbScanOperation.
-        setErrorCode(Err_NodeFailCausedAbort); // Node fail
+        assert (m_error.code != 0);
+        setErrorCode(m_error.code);
         return NdbQuery::NextResult_error;
       case FetchResult_sendFail:
         // FIXME: copy semantics from NdbScanOperation.
@@ -758,7 +762,6 @@ NdbQueryImpl::nextResult(bool fetchAllow
     }
   }
 
-  m_state = Fetching;
   return NdbQuery::NextResult_gotRow;
 } //NdbQueryImpl::nextResult
 
@@ -771,7 +774,7 @@ NdbQueryImpl::fetchMoreResults(bool forc
   /* Check if there are any more completed streams available.*/
   if(m_queryDef.isScanQuery()){
     
-    assert (m_state==Executing || m_state==Fetching);
+    assert (m_state==Executing);
     assert (m_scanTransaction);
 
     Ndb* const ndb = m_transaction.getNdb();
@@ -782,14 +785,16 @@ NdbQueryImpl::fetchMoreResults(bool forc
     PollGuard poll_guard(facade,
                          &ndb->theImpl->theWaiter,
                          ndb->theNdbBlockNumber);
-    while(true){
+
+    while (likely(m_error.code==0))
+    {
       /* m_fullStreams contains any streams that are complete (for this batch)
        * but have not yet been moved (under mutex protection) to 
        * m_applStreams.*/
       if(m_fullStreams.top()==NULL){
         if(getRoot().isBatchComplete()){
           // Request another scan batch, may already be at EOF
-          const int sent = sendFetchMore(m_transaction.getConnectedNodeId(),false);
+          const int sent = sendFetchMore(m_transaction.getConnectedNodeId());
           if (sent==0) {  // EOF reached?
             m_state = EndOfData;
             postFetchRelease();
@@ -797,7 +802,8 @@ NdbQueryImpl::fetchMoreResults(bool forc
           } else if (unlikely(sent<0)) {
             return FetchResult_sendFail;
           }
-        }
+        } //if (isBatchComplete...
+
         /* More results are on the way, so we wait for them.*/
         const FetchResult waitResult = static_cast<FetchResult>
           (poll_guard.wait_scan(3*facade->m_waitfor_timeout, 
@@ -809,7 +815,7 @@ NdbQueryImpl::fetchMoreResults(bool forc
       } // if (m_fullStreams.top()==NULL)
 
       // Assert: No sporious wakeups w/ neither resultdata, nor EOF:
-      assert (m_fullStreams.top()!=NULL || getRoot().isBatchComplete());
+      assert (m_fullStreams.top()!=NULL || getRoot().isBatchComplete() || m_error.code);
 
       /* Move full streams from receiver thread's container to application 
        *  thread's container.*/
@@ -823,9 +829,13 @@ NdbQueryImpl::fetchMoreResults(bool forc
       }
 
       // Only expect to end up here if another ::sendFetchMore() is required
-      assert (getRoot().isBatchComplete());
+      assert (getRoot().isBatchComplete() || m_error.code);
     } // while(true)
 
+    // 'while' terminated by m_error.code
+    assert (m_error.code);
+    return FetchResult_otherError;
+
   } else { // is a Lookup query
     /* The root operation is a lookup. Lookups are guaranteed to be complete
      * before NdbTransaction::execute() returns. Therefore we do not set
@@ -885,90 +895,22 @@ NdbQueryImpl::closeSingletonScans()
 int
 NdbQueryImpl::close(bool forceSend)
 {
+  int res = 0;
+
   assert (m_state >= Initial && m_state < Destructed);
   Ndb* const ndb = m_transaction.getNdb();
 
-  // TODO?: Unsure if we may also need to send a close to datanodes if we
-  //        have an errorcondition on this query
-
-  if (m_state >= Executing  && m_state < EndOfData) {
-    if (m_queryDef.isScanQuery()) {
-      assert (m_scanTransaction != NULL);
-
-      TransporterFacade* const facade = ndb->theImpl->m_transporter_facade;
-
-      /* This part needs to be done under mutex due to synchronization with 
-       * receiver thread.
-       */
-      PollGuard poll_guard(facade,
-                           &ndb->theImpl->theWaiter,
-                           ndb->theNdbBlockNumber);
-
-      /* Wait for outstanding scan results from current batch fetch */
-      while (!getRoot().isBatchComplete())
-      {
-        const FetchResult waitResult = static_cast<FetchResult>
-              (poll_guard.wait_scan(3*facade->m_waitfor_timeout, 
-                                    m_transaction.getConnectedNodeId(), 
-                                    forceSend));
-        switch (waitResult) {
-        case FetchResult_ok:
-          break;
-        case FetchResult_nodeFail:
-          setErrorCode(Err_NodeFailCausedAbort);  // Node fail
-          return -1;
-        case FetchResult_timeOut:
-          setErrorCode(Err_ReceiveFromNdbFailed); // Timeout
-          return -1;
-        default:
-          assert(false);
-        }
-      } // while
-
-      m_fullStreams.clear();
-      m_applStreams.clear();
-
-      /* Send SCANREQ(close) */
-      const int sent = sendFetchMore(m_transaction.getConnectedNodeId(), true);
-      if (unlikely(sent<0))
-        return -1;
-
-      /* Wait for close to be confirm: */
-      while (sent > 0 && !getRoot().isBatchComplete())
-      {
-        const FetchResult waitResult = static_cast<FetchResult>
-              (poll_guard.wait_scan(3*facade->m_waitfor_timeout, 
-                                    m_transaction.getConnectedNodeId(), 
-                                    forceSend));
-        switch (waitResult) {
-        case FetchResult_ok:
-          break;
-        case FetchResult_nodeFail:
-          setErrorCode(Err_NodeFailCausedAbort);  // Node fail
-          return -1;
-        case FetchResult_timeOut:
-          setErrorCode(Err_ReceiveFromNdbFailed); // Timeout
-          return -1;
-        default:
-          assert(false);
-        }
-      } // while
-
-    } else { // Lookup query
-      // Not likely to require any action to close lookups on TC
-    }
-    // Throw any pending results
-    m_fullStreams.clear();
-    m_applStreams.clear();
-    m_state = EndOfData;
-
-  } else { // Not executed query, nor fetched any rows
-    // Should not have any pending result rows in this state:
-    assert(m_fullStreams.top() == NULL);
-    assert(m_applStreams.top() == NULL);
+  if (m_tcState > Inactive && m_tcState != Completed)
+  {
+    res = closeTcCursor(forceSend);
   }
 
-  if (m_scanTransaction != NULL) {
+  // Throw any pending results
+  m_fullStreams.clear();
+  m_applStreams.clear();
+
+  if (m_scanTransaction != NULL)
+  {
     assert (m_state != Closed);
     assert (m_scanTransaction->m_scanningQuery == this);
     m_scanTransaction->m_scanningQuery = NULL;
@@ -978,10 +920,11 @@ NdbQueryImpl::close(bool forceSend)
   }
 
   postFetchRelease();
-  m_state = Closed;
-  return 0;
+  m_state = Closed;  // Even if it was previously 'Failed' it is closed now!
+  return res;
 } //NdbQueryImpl::close
 
+
 void
 NdbQueryImpl::release()
 { 
@@ -1025,15 +968,17 @@ NdbQueryImpl::execTCKEYCONF()
 }
 
 void 
-NdbQueryImpl::execCLOSE_SCAN_REP()
+NdbQueryImpl::execCLOSE_SCAN_REP(bool needClose)
 {
-  if(traceSignals){
+  if(traceSignals)
+  {
     ndbout << "NdbQueryImpl::execCLOSE_SCAN_REP()" << endl;
   }
-  if (m_state < EndOfData)
-    m_state = EndOfData;
+  assert (m_tcState < Completed);
+  m_tcState = (needClose) ? FetchMore : Completed;
 }
 
+
 bool 
 NdbQueryImpl::countPendingStreams(int increment)
 {
@@ -1217,7 +1162,7 @@ NdbQueryImpl::doSend(int nodeId, bool la
     Uint32 parallel = m_parallelism;
 
     bool tupScan = (scan_flags & NdbScanOperation::SF_TupScan);
-    bool rangeScan= false;
+    bool rangeScan = false;
 
     bool   isPruned;
     Uint32 hashValue;
@@ -1331,6 +1276,7 @@ NdbQueryImpl::doSend(int nodeId, bool la
       setErrorCodeAbort(Err_SendFailed);  // Error: 'Send to NDB failed'
       return FetchResult_sendFail;
     }
+    m_tcState = Fetching;
 
   } else {  // Lookup query
 
@@ -1409,6 +1355,7 @@ NdbQueryImpl::doSend(int nodeId, bool la
       return FetchResult_sendFail;
     }
     m_transaction.OpSent();
+    m_tcState = Completed;
   } // if
 
   // Shrink memory footprint by removing structures not required after ::execute()
@@ -1436,18 +1383,18 @@ Return Value:   Return >0 : send was suc
                 Return =0 : No more rows is available -> EOF
                 Return -1: In all other case.   
 Parameters:     nodeId: Receiving processor node
-                closeFlag: Close the scan
 Remark:
 ******************************************************************************/
 int
-NdbQueryImpl::sendFetchMore(int nodeId, bool closeFlag)
+NdbQueryImpl::sendFetchMore(int nodeId)
 {
-
-  Uint32 receivers[64];  // TODO: 64 is a temp hack
   Uint32 sent = 0;
   NdbQueryOperationImpl& root = getRoot();
-  for(unsigned i = 0; i<m_parallelism; i++) {
+  Uint32 receivers[64];  // TODO: 64 is a temp hack
 
+  assert (root.m_resultStreams!=NULL);
+  for(unsigned i = 0; i<m_parallelism; i++)
+  {
     const Uint32 tcPtrI = root.getReceiver(i).m_tcPtrI;
     if (tcPtrI != RNIL) {
       receivers[sent++] = tcPtrI;
@@ -1459,10 +1406,16 @@ NdbQueryImpl::sendFetchMore(int nodeId, 
 
 //printf("::sendFetchMore, to nodeId:%d, sent:%d\n", nodeId, sent);
   if (sent==0)
+  {
+    assert (m_tcState != FetchMore);
+    m_tcState = Completed;
     return 0;
+  }
 
   m_pendingStreams = 0;
   m_tcKeyConfReceived = false;
+  assert (m_tcState == FetchMore);
+  m_tcState = Fetching;
 
   Ndb& ndb = *m_transaction.getNdb();
   NdbApiSignal tSignal(&ndb);
@@ -1473,7 +1426,7 @@ NdbQueryImpl::sendFetchMore(int nodeId, 
   const Uint64 transId = m_scanTransaction->getTransactionId();
 
   scanNextReq->apiConnectPtr = m_scanTransaction->theTCConPtr;
-  scanNextReq->stopScan = closeFlag;
+  scanNextReq->stopScan = 0;
   scanNextReq->transId1 = (Uint32) transId;
   scanNextReq->transId2 = (Uint32) (transId >> 32);
   tSignal.setLength(ScanNextReq::SignalLength);
@@ -1492,6 +1445,109 @@ NdbQueryImpl::sendFetchMore(int nodeId, 
   return sent;
 } // NdbQueryImpl::sendFetchMore()
 
+int
+NdbQueryImpl::closeTcCursor(bool forceSend)
+{
+  assert (m_queryDef.isScanQuery());
+
+  Ndb* const ndb = m_transaction.getNdb();
+  TransporterFacade* const facade = ndb->theImpl->m_transporter_facade;
+
+  /* This part needs to be done under mutex due to synchronization with 
+   * receiver thread.
+   */
+  PollGuard poll_guard(facade,
+                       &ndb->theImpl->theWaiter,
+                       ndb->theNdbBlockNumber);
+
+  /* Wait for outstanding scan results from current batch fetch */
+  while (!getRoot().isBatchComplete() && m_error.code==0)
+  {
+    const FetchResult waitResult = static_cast<FetchResult>
+          (poll_guard.wait_scan(3*facade->m_waitfor_timeout, 
+                                m_transaction.getConnectedNodeId(), 
+                                forceSend));
+    switch (waitResult) {
+    case FetchResult_ok:
+      break;
+    case FetchResult_nodeFail:
+      setErrorCode(Err_NodeFailCausedAbort);  // Node fail
+      return -1;
+    case FetchResult_timeOut:
+      setErrorCode(Err_ReceiveFromNdbFailed); // Timeout
+      return -1;
+    default:
+      assert(false);
+    }
+  } // while
+
+  m_error.code = 0;  // Ignore possible errorcode caused by previous fetching
+
+  if (m_tcState == FetchMore)  // TC has an open scan cursor.
+  {
+    /* Send SCANREQ(close) */
+    const int error = sendClose(m_transaction.getConnectedNodeId());
+    if (unlikely(error))
+      return error;
+
+    /* Wait for close to be confirmed: */
+    while (m_tcState != Completed)
+    {
+      const FetchResult waitResult = static_cast<FetchResult>
+            (poll_guard.wait_scan(3*facade->m_waitfor_timeout, 
+                                  m_transaction.getConnectedNodeId(), 
+                                  forceSend));
+      switch (waitResult) {
+      case FetchResult_ok:
+        if (unlikely(m_error.code))   // Close request itself failed, keep error
+        {
+          setErrorCode(m_error.code);
+          return -1;
+        }
+        break;
+      case FetchResult_nodeFail:
+        setErrorCode(Err_NodeFailCausedAbort);  // Node fail
+        return -1;
+      case FetchResult_timeOut:
+        setErrorCode(Err_ReceiveFromNdbFailed); // Timeout
+        return -1;
+      default:
+        assert(false);
+      }
+    } // while
+  } // if
+
+  m_tcState = Completed;
+  return 0;
+} //NdbQueryImpl::closeTcCursor
+
+int
+NdbQueryImpl::sendClose(int nodeId)
+{
+  m_pendingStreams = 0;
+  m_tcKeyConfReceived = false;
+  assert (m_tcState == FetchMore);
+  m_tcState = Fetching;
+
+  Ndb& ndb = *m_transaction.getNdb();
+  NdbApiSignal tSignal(&ndb);
+  tSignal.setSignal(GSN_SCAN_NEXTREQ);
+  ScanNextReq * const scanNextReq = CAST_PTR(ScanNextReq, tSignal.getDataPtrSend());
+
+  assert (m_scanTransaction);
+  const Uint64 transId = m_scanTransaction->getTransactionId();
+
+  scanNextReq->apiConnectPtr = m_scanTransaction->theTCConPtr;
+  scanNextReq->stopScan = true;
+  scanNextReq->transId1 = (Uint32) transId;
+  scanNextReq->transId2 = (Uint32) (transId >> 32);
+  tSignal.setLength(ScanNextReq::SignalLength);
+
+  TransporterFacade* tp = ndb.theImpl->m_transporter_facade;
+  return tp->sendSignal(&tSignal, nodeId);
+
+} // NdbQueryImpl::sendClose()
+
 
 NdbQueryImpl::StreamStack::StreamStack():
   m_size(0),
@@ -1748,7 +1804,8 @@ NdbQueryOperationImpl::setResultRowRef (
 }
 
 void
-NdbQueryOperationImpl::fetchRecAttrResults(Uint32 streamNo){
+NdbQueryOperationImpl::fetchRecAttrResults(Uint32 streamNo)
+{
   NdbRecAttr* recAttr = m_firstRecAttr;
   Uint32 posInRow = 0;
   while(recAttr != NULL){
@@ -1766,7 +1823,8 @@ NdbQueryOperationImpl::fetchRecAttrResul
 }
 
 void 
-NdbQueryOperationImpl::updateChildResult(Uint32 streamNo, Uint32 rowNo){
+NdbQueryOperationImpl::updateChildResult(Uint32 streamNo, Uint32 rowNo)
+{
   if (rowNo==tupleNotFound) {
     /* This operation gave no result for the current parent tuple.*/ 
     m_isRowNull = true;
@@ -2231,7 +2289,7 @@ NdbQueryOperationImpl::execTRANSID_AI(co
       m_queryImpl.countPendingStreams(1);
     }
     return false;
-  }
+  } // end lookup
 } //NdbQueryOperationImpl::execTRANSID_AI
 
 
@@ -2285,6 +2343,9 @@ NdbQueryOperationImpl::execSCAN_TABCONF(
   resultStream.m_pendingResults += rowCount;
 
   resultStream.m_receiver.m_tcPtrI = tcPtrI;  // Handle for SCAN_NEXTREQ, RNIL -> EOF
+  if (tcPtrI != RNIL) {
+    m_queryImpl.m_tcState = NdbQueryImpl::FetchMore;
+  }
 
   if(traceSignals){
     ndbout << "  resultStream(root) {" << resultStream << "}" << endl;
@@ -2363,10 +2424,7 @@ NdbQueryOperationImpl::getIdOfReceiver()
 
 bool 
 NdbQueryOperationImpl::isBatchComplete() const {
-  if (m_queryImpl.m_state >= NdbQueryImpl::EndOfData) {
-//  printf("Query state:%d (>= EndOfData) -> BatchCompleted\n", m_queryImpl.m_state);
-    return true;
-  }
+  assert(m_resultStreams!=NULL);
   for(Uint32 i = 0; i < m_queryImpl.getParallelism(); i++){
     if(!m_resultStreams[i]->isBatchComplete()){
       return false;
@@ -2379,6 +2437,7 @@ NdbQueryOperationImpl::isBatchComplete()
 const NdbReceiver& 
 NdbQueryOperationImpl::getReceiver(Uint32 recNo) const {
   assert(recNo<getQuery().getParallelism());
+  assert(m_resultStreams!=NULL);
   return m_resultStreams[recNo]->m_receiver;
 }
 

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2009-10-29 15:04:38 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2009-11-03 12:21:47 +0000
@@ -127,11 +127,6 @@ public:
    */
   int doSend(int aNodeId, bool lastFlag);
 
-  /** Send SCAN_NEXTREQ signal to fetch another batch from a scan query
-   *  @return #signals sent, -1 if error.
-   */
-  int sendFetchMore(int nodeId, bool lastFlag);
-
   NdbQuery& getInterface()
   { return m_interface; }
 
@@ -154,7 +149,7 @@ public:
   bool execTCKEYCONF();
 
   /** Process SCAN_TABCONF w/ EndOfData which is a 'Close Scan Reply'. */
-  void execCLOSE_SCAN_REP();
+  void execCLOSE_SCAN_REP(bool isClosed);
 
   /** Determines if query has completed and may be garbage collected
    *  A query is considder complete when it either:
@@ -224,18 +219,29 @@ private:
   /** The interface that is visible to the application developer.*/
   NdbQuery m_interface;
 
-  enum {
-    Initial,
-    Defined,
-    Prepared,
-    Executing,
-    Fetching,
-    EndOfData,
-    Closed,
-    Failed,
+  enum {        // State of NdbQuery in API
+    Initial,    // Constructed object, assiciated with a defined query
+    Defined,    // Parameter values has been assigned
+    Prepared,   // KeyInfo & AttrInfo prepared for execution
+    Executing,  // Signal with exec. req. sent to TC
+    EndOfData,  // All results rows consumed
+    Closed,     // Query has been ::close()'ed 
+    Failed,     
     Destructed
   } m_state;
 
+  enum {        // Assumed state of query cursor in TC block
+    Inactive,   // Execution not started at TC
+    Fetching,
+    FetchMore,
+    Completed
+  } m_tcState;
+
+  /** Next query in same transaction.*/
+  NdbQueryImpl* m_next;
+  /** Definition of this query.*/
+  const NdbQueryDefImpl& m_queryDef;
+
   /** Possible error status of this query.*/
   NdbError m_error;
   /** Transaction in which this query instance executes.*/
@@ -253,12 +259,9 @@ private:
 
   /** True if a TCKEYCONF message has been received for this query.*/
   bool m_tcKeyConfReceived;
+
   /** Number of streams not yet completed within the current batch.*/
   Uint32 m_pendingStreams;
-  /** Next query in same transaction.*/
-  NdbQueryImpl* m_next;
-  /** Definition of this query.*/
-  const NdbQueryDefImpl& m_queryDef;
 
   /** Number of fragments to be scanned in parallel. (1 if root operation is 
    *  a lookup)*/
@@ -296,6 +299,19 @@ private:
   /** Get more scan results, ask for the next batch if necessary.*/
   FetchResult fetchMoreResults(bool forceSend);
 
+  /** Send SCAN_NEXTREQ signal to fetch another batch from a scan query
+   *  @return #signals sent, -1 if error.
+   */
+  int sendFetchMore(int nodeId);
+
+  /** Close cursor on TC */
+  int closeTcCursor(bool forceSend);
+
+  /** Send SCAN_NEXTREQ(close) signal to close cursor on TC and datanodes.
+   *  @return #signals sent, -1 if error.
+   */
+  int sendClose(int nodeId);
+
   /** Close scan receivers used for lookups. (Since scans and lookups should
    * have the same semantics for nextResult(), lookups use scan-type 
    * NdbReceiver objects.)

=== modified file 'storage/ndb/src/ndbapi/NdbTransactionScan.cpp'
--- a/storage/ndb/src/ndbapi/NdbTransactionScan.cpp	2009-10-21 11:45:41 +0000
+++ b/storage/ndb/src/ndbapi/NdbTransactionScan.cpp	2009-11-03 12:21:47 +0000
@@ -47,8 +47,8 @@ NdbTransaction::receiveSCAN_TABREF(NdbAp
   
   if (checkState_TransId(&ref->transId1)) {
     if (theScanningOp) {
-      theScanningOp->setErrorCode(ref->errorCode);
       theScanningOp->execCLOSE_SCAN_REP();
+      theScanningOp->setErrorCode(ref->errorCode);
       if(!ref->closeNeeded){
         return 0;
       }
@@ -63,8 +63,8 @@ NdbTransaction::receiveSCAN_TABREF(NdbAp
 
     } else {
       assert (m_scanningQuery);
+      m_scanningQuery->execCLOSE_SCAN_REP(ref->closeNeeded);
       m_scanningQuery->setErrorCode(ref->errorCode);
-      m_scanningQuery->execCLOSE_SCAN_REP();
       if(!ref->closeNeeded){
         return 0;
       }
@@ -108,7 +108,7 @@ NdbTransaction::receiveSCAN_TABCONF(NdbA
         theScanningOp->execCLOSE_SCAN_REP();
       } else {
         assert (m_scanningQuery);
-        m_scanningQuery->execCLOSE_SCAN_REP();
+        m_scanningQuery->execCLOSE_SCAN_REP(false);
       }
       return 1; // -> Finished
     }


Attachment: [text/bzr-bundle] bzr/jonas@mysql.com-20091104111217-5gs4rcrc7ggu3gie.bundle
Thread
bzr commit into mysql-5.1-telco-7.0-spj branch (jonas:2969)Jonas Oreland4 Nov