List:Commits« Previous MessageNext Message »
From:Ole John Aske Date:October 29 2010 9:21pm
Subject:bzr commit into mysql-5.1-telco-7.0-spj-scan-vs-scan branch
(ole.john.aske:3331)
View as plain text  
#At file:///home/oleja/mysql/mysql-5.1-telco-7.0-spj-scan-scan/ based on revid:jonas@stripped

 3331 Ole John Aske	2010-10-29
      Recommit after resolving merge conflicts:
      
      spj-svs: Fixed mutex / mt concurrency problem when receiving fragment data.
      
      Ensure that the class NdbQueryImpl member fields m_pendingFrags & m_finalBatchFrags 
      are only accessed when the PollGuard mutex is locked.

    modified:
      storage/ndb/src/ndbapi/NdbQueryOperation.cpp
      storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp
=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperation.cpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2010-10-26 12:41:00 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperation.cpp	2010-10-29 21:21:45 +0000
@@ -1743,7 +1743,6 @@ NdbQueryImpl::awaitMoreResults(bool forc
   /* Check if there are any more completed fragments available.*/
   if (m_queryDef.isScanQuery())
   {
-    assert (m_state==Executing);
     assert (m_scanTransaction);
 
     Ndb* const ndb = m_transaction.getNdb();
@@ -1756,6 +1755,7 @@ NdbQueryImpl::awaitMoreResults(bool forc
          * receiver thread.
          */
         PollGuard poll_guard(*ndb->theImpl);
+        assert (m_state==Executing);
 
         /* m_fullFrags contains any fragments that are complete (for this batch)
          * but have not yet been moved (under mutex protection) to 
@@ -1998,12 +1998,17 @@ NdbQueryImpl::execCLOSE_SCAN_REP(bool ne
   }
 }
 
-
+/*
+  ::incrementPendingFrags() is intended to be called when receiving signals only.
+  The PollGuard mutex is then set and the shared 'm_pendingFrags' can safely be updated.
+*/
 bool 
 NdbQueryImpl::incrementPendingFrags(int increment)
 {
   m_pendingFrags += increment;
-  assert(m_pendingFrags < 1<<15); // Check against underflow.
+  assert(m_pendingFrags < 1<<15);            // Check against underflow.
+  assert(m_pendingFrags <= m_rootFragCount); // .... and overflow
+
   if (traceSignals) {
     ndbout << "NdbQueryImpl::incrementPendingFrags(" << increment << "): "
            << ", pendingFrags=" << m_pendingFrags <<  endl;
@@ -2032,8 +2037,6 @@ NdbQueryImpl::prepareSend()
     return -1;
   }
 
-  assert (m_pendingFrags==0);
-
   // Determine execution parameters 'batch size'.
   // May be user specified (TODO), and/or,  limited/specified by config values
   //
@@ -2043,13 +2046,13 @@ NdbQueryImpl::prepareSend()
      * and unordered scans.*/
     if (getQueryOperation(0U).m_parallelism > 0)
     {
-      m_pendingFrags = m_rootFragCount 
+      m_rootFragCount
         = MIN(getRoot().getQueryOperationDef().getTable().getFragmentCount(),
               getQueryOperation(0U).m_parallelism);
     }
     else
     {
-      m_pendingFrags = m_rootFragCount 
+      m_rootFragCount
         = getRoot().getQueryOperationDef().getTable().getFragmentCount();
     }
     Ndb* const ndb = m_transaction.getNdb();
@@ -2070,7 +2073,7 @@ NdbQueryImpl::prepareSend()
   }
   else  // Lookup query
   {
-    m_pendingFrags = m_rootFragCount = 1;
+    m_rootFragCount = 1;
   }
 
   // Some preparation for later batchsize calculations pr. (sub) scan
@@ -2112,10 +2115,10 @@ NdbQueryImpl::prepareSend()
   }
   int error;
   if (unlikely((error = m_applFrags.prepare(getRoot().getOrdering(),
-                                              m_pendingFrags, 
+                                              m_rootFragCount, 
                                               keyRec,
                                               getRoot().m_ndbRecord)) != 0)
-            || (error = m_fullFrags.prepare(m_pendingFrags)) != 0) {
+            || (error = m_fullFrags.prepare(m_rootFragCount)) != 0) {
     setErrorCodeAbort(error);
     return -1;
   }
@@ -2147,6 +2150,7 @@ NdbQueryImpl::prepareSend()
   ndbout << endl;
 #endif
 
+  assert (m_pendingFrags==0);
   m_state = Prepared;
   return 0;
 } // NdbQueryImpl::prepareSend
@@ -2220,7 +2224,10 @@ const Uint32* InitialReceiverIdIterator:
   
 
 /******************************************************************************
-int doSend()
+int doSend()    Send serialized queryTree and parameters encapsulated in 
+                either a SCAN_TABREQ or TCKEYREQ to TC.
+
+NOTE:           The TransporterFacade mutex is already set by callee.
 
 Return Value:   Return >0 : send was succesful, returns number of signals sent
                 Return -1: In all other case.   
@@ -2461,6 +2468,9 @@ NdbQueryImpl::doSend(int nodeId, bool la
                                           getNoOfLeafOperations());
   } // if
 
+  assert (m_pendingFrags==0);
+  m_pendingFrags = m_rootFragCount;
+
   // Shrink memory footprint by removing structures not required after ::execute()
   m_keyInfo.releaseExtend();
   m_attrInfo.releaseExtend();
@@ -2494,9 +2504,6 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
   assert(!emptyFrag.finalBatchReceived());
   assert(m_queryDef.isScanQuery());
 
-  m_pendingFrags++;
-  assert(m_pendingFrags <= getRootFragCount());
-
   emptyFrag.reset();
 
   for (unsigned opNo=0; opNo<m_countOperations; opNo++) 
@@ -2562,6 +2569,8 @@ NdbQueryImpl::sendFetchMore(NdbRootFragm
     // Error: 'Send to NDB failed'
     return -1;
   }
+  m_pendingFrags++;
+  assert(m_pendingFrags <= getRootFragCount());
 
   if (forceSend)
   {
@@ -2663,6 +2672,10 @@ NdbQueryImpl::closeTcCursor(bool forceSe
   return 0;
 } //NdbQueryImpl::closeTcCursor
 
+
+/*
+  This method is called with the PollGuard mutex held on the transporter.
+*/
 int
 NdbQueryImpl::sendClose(int nodeId)
 {
@@ -2692,6 +2705,10 @@ NdbQueryImpl::sendClose(int nodeId)
 } // NdbQueryImpl::sendClose()
 
 
+/*
+  This method is always called with PollGuard mutex held on the transporter.
+  (As m_pendingFrags is accessed both from API sender and receiver)
+*/
 bool 
 NdbQueryImpl::isBatchComplete() const {
 #ifndef NDEBUG

=== modified file 'storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp'
--- a/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2010-10-12 13:00:44 +0000
+++ b/storage/ndb/src/ndbapi/NdbQueryOperationImpl.hpp	2010-10-29 21:21:45 +0000
@@ -364,8 +364,10 @@ private:
    */
   Uint32 m_globalCursor;
 
-  /** Number of root fragments not yet completed within the current batch.*/
-  Uint32 m_pendingFrags;
+  /** Number of root fragments not yet completed within the current batch.
+   *  Only access w/ PollGuard mutex as it is also updated by receiver threa 
+   */
+  Uint32 m_pendingFrags;  // BEWARE: protect with PollGuard mutex
 
   /** Number of fragments to be read by the root operation. (1 if root 
    * operation is a lookup)*/
@@ -394,7 +396,7 @@ private:
    * m_finalBatchFrags==m_rootFragCount, all tuples for the final batches may
    * still not have been received (i.e. m_pendingFrags>0).
    */
-  Uint32 m_finalBatchFrags;
+  Uint32 m_finalBatchFrags; // BEWARE: protect with PollGuard mutex
 
   /** Number of IndexBounds set by API (index scans only) */
   Uint32 m_num_bounds;


Attachment: [text/bzr-bundle] bzr/ole.john.aske@oracle.com-20101029212145-ip0tz1guh8iwghol.bundle
Thread
bzr commit into mysql-5.1-telco-7.0-spj-scan-vs-scan branch(ole.john.aske:3331) Ole John Aske29 Oct