List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:November 8 2010 12:13pm
Subject:bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch
(ole.john.aske:3342 to 3343)
View as plain text  
 3343 Ole John Aske	2010-11-08
      spj-svs: Introduce usage of 'Node sequence' as part of nodefailure detection.
      
      Implemented based on current implementation in NdbScanOperation.cpp

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
 3342 Ole John Aske	2010-11-08
      spj-svs: Corrected a potential mutex issue accessing 'm_finalBatchFrags' within ::nextRootResult)
      
      m_finalBatchFrags requires the PullGuard mutex to be locked before accesing it. However, in 
      ::nextRootResult() it was accessed wo/ mutex.
      
      Moved the mutex violating logic inside :awaitMoreResults() where the mutex is locked,
      and extended the return values from ::awaitMoreResults() to include 
      'FetchResult_noMoreCache'.

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2010-11-08 09:38:33 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2010-11-08 12:12:08 +0000
@@ -1793,12 +1793,12 @@ NdbQueryImpl::awaitMoreResults(bool forc
     assert (m_scanTransaction);
     assert (m_state==Executing);
 
-    Ndb* const ndb = m_transaction.getNdb();
+    NdbImpl* const ndb = m_transaction.getNdb()->theImpl;
     {
       /* This part needs to be done under mutex due to synchronization with 
        * receiver thread.
        */
-      PollGuard poll_guard(*ndb->theImpl);
+      PollGuard poll_guard(*ndb);
 
       /* There may be pending (asynchronous received, mutex protected) errors
        * from TC / datanodes. Propogate these into m_error.code in 'API space'.
@@ -1829,31 +1829,27 @@ NdbQueryImpl::awaitMoreResults(bool forc
                                                           : FetchResult_noMoreData;
         }
 
+        TransporterFacade* tp = ndb->m_transporter_facade;
+        const Uint32 timeout  = ndb->get_waitfor_timeout();
+        const Uint32 nodeId   = m_transaction.getConnectedNodeId();
+        const Uint32 seq      = m_transaction.theNodeSequence;
+
         /* More results are on the way, so we wait for them.*/
         const FetchResult waitResult = static_cast<FetchResult>
-          (poll_guard.wait_scan(3*ndb->theImpl->get_waitfor_timeout(), 
-                                m_transaction.getConnectedNodeId(), 
+          (poll_guard.wait_scan(3*timeout, 
+                                nodeId, 
                                 forceSend));
 
-        switch (waitResult) {
-        case FetchResult_ok:  // SCAN_TABREF, may have setFetchTerminated() w/ errors
-          break;
-        case FetchResult_timeOut:
+        if (tp->getNodeSequence(nodeId) != seq)
+          setFetchTerminated(Err_NodeFailCausedAbort,false);
+        else if (likely(waitResult == FetchResult_ok))
+          continue;
+        else if (waitResult == FetchResult_timeOut)
           setFetchTerminated(Err_ReceiveTimedOut,false);
-          break;
-        case FetchResult_nodeFail:
+        else
           setFetchTerminated(Err_NodeFailCausedAbort,false);
-          break;
-        default:
-          assert(false);
-        }
-        assert (!m_error.code);
 
-        /* Getting here is not an error. PollGuard::wait_scan() will return
-         * when a complete batch (for a fragment) is available for *any* active 
-         * scan in this transaction. So we must wait again for the next arriving 
-         * batch.
-         */
+        assert (!m_error.code);
       } // while(!hasReceivedError())
     } // Terminates scope of 'PollGuard'
 
@@ -2636,7 +2632,9 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
   secs[ScanNextReq::ReceiverIdsSectionNum].sectionIter = &receiverIdIter;
   secs[ScanNextReq::ReceiverIdsSectionNum].sz = 1;
   
-  TransporterFacade* const facade = ndb.theImpl->m_transporter_facade;
+  TransporterFacade* const tp = ndb.theImpl->m_transporter_facade;
+  Uint32 nodeId = m_transaction.getConnectedNodeId();
+  Uint32 seq    = m_transaction.theNodeSequence;
 
   /* This part needs to be done under mutex due to synchronization with 
    * receiver thread.
@@ -2648,29 +2646,24 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
     // Errors arrived inbetween ::await released mutex, and fetchMore grabbed it
     return -1;
   }
-  const int res = 
-    facade->sendSignal(&tSignal, 
-                       getNdbTransaction().getConnectedNodeId(), 
-                       secs, 
-                       1);
-  if (unlikely(res == -1)) 
+  if (tp->getNodeSequence(nodeId) != seq ||
+      tp->sendSignal(&tSignal, nodeId, secs, 1) != 0)
   {
     setErrorCode(Err_NodeFailCausedAbort);
     return -1;
   }
-  m_pendingFrags++;
-  assert(m_pendingFrags <= getRootFragCount());
-
   if (forceSend)
   {
     // Flush signals to TC.
-    facade->forceSend(ndb.theNdbBlockNumber);
+    tp->forceSend(ndb.theNdbBlockNumber);
   }
   else
   {
-    facade->checkForceSend(ndb.theNdbBlockNumber);
+    tp->checkForceSend(ndb.theNdbBlockNumber);
   }
 
+  m_pendingFrags++;
+  assert(m_pendingFrags <= getRootFragCount());
   return 0;
 } // NdbQueryImpl::sendFetchMore()
 
@@ -2679,36 +2672,38 @@ NdbQueryImpl::closeTcCursor(bool forceSe
 {
   assert (m_queryDef.isScanQuery());
 
-  Ndb* const ndb = m_transaction.getNdb();
-  Uint32 timeout = ndb->theImpl->get_waitfor_timeout();
+  NdbImpl* const ndb = m_transaction.getNdb()->theImpl;
+
+  TransporterFacade* tp = ndb->m_transporter_facade;
+  const Uint32 timeout  = ndb->get_waitfor_timeout();
+  const Uint32 nodeId   = m_transaction.getConnectedNodeId();
+  const Uint32 seq      = m_transaction.theNodeSequence;
 
   /* This part needs to be done under mutex due to synchronization with 
    * receiver thread.
    */
-  PollGuard poll_guard(*ndb->theImpl);
+  PollGuard poll_guard(*ndb);
 
-//Uint32 seq = m_transaction->theNodeSequence;
-//if (seq != tp->getNodeSequence(m_transaction.getConnectedNodeId())) // TODO
-//{}
+  if (unlikely(tp->getNodeSequence(nodeId) != seq))
+  {
+    setErrorCode(Err_NodeFailCausedAbort);
+    return -1;  // Transporter disconnected and reconnected, no need to close
+  }
 
   /* Wait for outstanding scan results from current batch fetch */
   while (m_pendingFrags > 0)
   {
-    const FetchResult waitResult = static_cast<FetchResult>
-          (poll_guard.wait_scan(3*timeout, 
-                                m_transaction.getConnectedNodeId(), 
-                                forceSend));
-    switch (waitResult) {
-    case FetchResult_ok:  // SCAN_TABREF, may have setFetchTerminated() w/ errors
-      break;
-    case FetchResult_timeOut:
-      setFetchTerminated(Err_ReceiveTimedOut,false);
-      break;
-    case FetchResult_nodeFail:
+    const FetchResult result = static_cast<FetchResult>
+        (poll_guard.wait_scan(3*timeout, nodeId, forceSend));
+
+    if (unlikely(tp->getNodeSequence(nodeId) != seq))
       setFetchTerminated(Err_NodeFailCausedAbort,false);
-      break;
-    default:
-      assert(false);
+    else if (unlikely(result != FetchResult_ok))
+    {
+      if (result == FetchResult_timeOut)
+        setFetchTerminated(Err_ReceiveTimedOut,false);
+      else
+        setFetchTerminated(Err_NodeFailCausedAbort,false);
     }
     if (hasReceivedError())
     {
@@ -2733,21 +2728,17 @@ NdbQueryImpl::closeTcCursor(bool forceSe
     /* Wait for close to be confirmed: */
     while (m_pendingFrags > 0)
     {
-      const FetchResult waitResult = static_cast<FetchResult>
-            (poll_guard.wait_scan(3*timeout, 
-                                  m_transaction.getConnectedNodeId(), 
-                                  forceSend));
-      switch (waitResult) {
-      case FetchResult_ok:
-        break;
-      case FetchResult_timeOut:
-        setFetchTerminated(Err_ReceiveTimedOut,false);
-        break;
-      case FetchResult_nodeFail:
+      const FetchResult result = static_cast<FetchResult>
+          (poll_guard.wait_scan(3*timeout, nodeId, forceSend));
+
+      if (unlikely(tp->getNodeSequence(nodeId) != seq))
         setFetchTerminated(Err_NodeFailCausedAbort,false);
-        break;
-      default:
-        assert(false);
+      if (unlikely(result != FetchResult_ok))
+      {
+        if (result == FetchResult_timeOut)
+          setFetchTerminated(Err_ReceiveTimedOut,false);
+        else
+          setFetchTerminated(Err_NodeFailCausedAbort,false);
       }
       if (hasReceivedError())
       {


Attachment: [text/bzr-bundle] bzr/ole.john.aske@oracle.com-20101108121208-itigmpnjo5cvk395.bundle
Thread
bzr push into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3342 to 3343) Ole John Aske8 Nov